V3 jsonschema validation: admin_actions

This patch adds jsonschema validation for below admin_actions API's
* POST /v3/{project_id}/volumes/{volume_id}/action
* POST /v3/{project_id}/snapshots/{snapshot_id}/action
* POST /v3/{project_id}/backups/{backup_id}/action

Partial-Implements: bp json-schema-validation

Change-Id: Ib9057fe57c1b9cdcd0d924b029f03da3589507a2
This commit is contained in:
Neha Alhat 2017-12-08 18:17:36 +05:30
parent 5dcf4f52ad
commit 51c293dba3
5 changed files with 402 additions and 108 deletions

View File

@ -14,22 +14,22 @@
from oslo_log import log as logging
import oslo_messaging as messaging
from oslo_utils import strutils
from six.moves import http_client
import webob
from webob import exc
from cinder.api import common
from cinder.api import extensions
from cinder.api import microversions as mv
from cinder.api.openstack import wsgi
from cinder.api.schemas import admin_actions
from cinder.api import validation
from cinder import backup
from cinder import db
from cinder import exception
from cinder.i18n import _
from cinder import objects
from cinder.objects import fields
from cinder import rpc
from cinder import utils
from cinder import volume
@ -43,13 +43,6 @@ class AdminController(wsgi.Controller):
# FIXME(clayg): this will be hard to keep up-to-date
# Concrete classes can expand or over-ride
valid_status = set(['creating',
'available',
'deleting',
'error',
'error_deleting',
'error_managing',
'managing', ])
def __init__(self, *args, **kwargs):
super(AdminController, self).__init__(*args, **kwargs)
@ -67,16 +60,8 @@ class AdminController(wsgi.Controller):
def _delete(self, *args, **kwargs):
raise NotImplementedError()
def validate_update(self, body):
update = {}
try:
update['status'] = body['status'].lower()
except (TypeError, KeyError):
raise exc.HTTPBadRequest(explanation=_("Must specify 'status'"))
if update['status'] not in self.valid_status:
raise exc.HTTPBadRequest(
explanation=_("Must specify a valid status"))
return update
def validate_update(self, req, body):
raise NotImplementedError()
def authorize(self, context, action_name, target_obj=None):
context.authorize(
@ -107,7 +92,7 @@ class AdminController(wsgi.Controller):
'attached_mode')
context = req.environ['cinder.context']
update = self.validate_update(body['os-reset_status'])
update = self.validate_update(req, body=body)
msg = "Updating %(resource)s '%(id)s' with '%(update)r'"
LOG.debug(msg, {'resource': self.resource_name, 'id': id,
'update': update})
@ -142,20 +127,6 @@ class VolumeAdminController(AdminController):
collection = 'volumes'
# FIXME(jdg): We're appending additional valid status
# entries to the set we declare in the parent class
# this doesn't make a ton of sense, we should probably
# look at the structure of this whole process again
# Perhaps we don't even want any definitions in the abstract
# parent class?
valid_status = AdminController.valid_status.union(
('attaching', 'in-use', 'detaching', 'maintenance'))
valid_attach_status = (fields.VolumeAttachStatus.ATTACHED,
fields.VolumeAttachStatus.DETACHED,)
valid_migration_status = ('migrating', 'error',
'success', 'completing',
'none', 'starting',)
def _update(self, *args, **kwargs):
context = args[0]
volume_id = args[1]
@ -169,53 +140,38 @@ class VolumeAdminController(AdminController):
def _delete(self, *args, **kwargs):
return self.volume_api.delete(*args, **kwargs)
def validate_update(self, body):
@validation.schema(admin_actions.reset)
def validate_update(self, req, body):
update = {}
body = body['os-reset_status']
status = body.get('status', None)
attach_status = body.get('attach_status', None)
migration_status = body.get('migration_status', None)
valid = False
if status:
valid = True
update = super(VolumeAdminController, self).validate_update(body)
update['status'] = status.lower()
if attach_status:
valid = True
update['attach_status'] = attach_status.lower()
if update['attach_status'] not in self.valid_attach_status:
raise exc.HTTPBadRequest(
explanation=_("Must specify a valid attach status"))
if migration_status:
valid = True
update['migration_status'] = migration_status.lower()
if update['migration_status'] not in self.valid_migration_status:
raise exc.HTTPBadRequest(
explanation=_("Must specify a valid migration status"))
if update['migration_status'] == 'none':
update['migration_status'] = None
if not valid:
raise exc.HTTPBadRequest(
explanation=_("Must specify 'status', 'attach_status' "
"or 'migration_status' for update."))
return update
@wsgi.response(http_client.ACCEPTED)
@wsgi.action('os-force_detach')
@validation.schema(admin_actions.force_detach)
def _force_detach(self, req, id, body):
"""Roll back a bad detach after the volume been disconnected."""
context = req.environ['cinder.context']
# Not found exception will be handled at the wsgi level
volume = self._get(context, id)
self.authorize(context, 'force_detach', target_obj=volume)
try:
connector = body['os-force_detach'].get('connector', None)
except AttributeError:
msg = _("Invalid value '%s' for "
"os-force_detach.") % body['os-force_detach']
raise webob.exc.HTTPBadRequest(explanation=msg)
connector = body['os-force_detach'].get('connector', None)
try:
self.volume_api.terminate_connection(context, volume, connector)
except exception.VolumeBackendAPIException as error:
@ -242,6 +198,8 @@ class VolumeAdminController(AdminController):
@wsgi.response(http_client.ACCEPTED)
@wsgi.action('os-migrate_volume')
@validation.schema(admin_actions.migrate_volume, '2.0', '3.15')
@validation.schema(admin_actions.migrate_volume_v316, '3.16')
def _migrate_volume(self, req, id, body):
"""Migrate a volume to the specified host."""
context = req.environ['cinder.context']
@ -252,12 +210,15 @@ class VolumeAdminController(AdminController):
cluster_name, host = common.get_cluster_host(req, params,
mv.VOLUME_MIGRATE_CLUSTER)
force_host_copy = utils.get_bool_param('force_host_copy', params)
lock_volume = utils.get_bool_param('lock_volume', params)
force_host_copy = strutils.bool_from_string(params.get(
'force_host_copy', False), strict=True)
lock_volume = strutils.bool_from_string(params.get(
'lock_volume', False), strict=True)
self.volume_api.migrate_volume(context, volume, host, cluster_name,
force_host_copy, lock_volume)
@wsgi.action('os-migrate_volume_completion')
@validation.schema(admin_actions.migrate_volume_completion)
def _migrate_volume_completion(self, req, id, body):
"""Complete an in-progress migration."""
context = req.environ['cinder.context']
@ -265,11 +226,7 @@ class VolumeAdminController(AdminController):
volume = self._get(context, id)
self.authorize(context, 'migrate_volume_completion', target_obj=volume)
params = body['os-migrate_volume_completion']
try:
new_volume_id = params['new_volume']
except KeyError:
raise exc.HTTPBadRequest(
explanation=_("Must specify 'new_volume'"))
new_volume_id = params['new_volume']
# Not found exception will be handled at the wsgi level
new_volume = self._get(context, new_volume_id)
error = params.get('error', False)
@ -282,7 +239,12 @@ class SnapshotAdminController(AdminController):
"""AdminController for Snapshots."""
collection = 'snapshots'
valid_status = fields.SnapshotStatus.ALL
@validation.schema(admin_actions.reset_status_snapshot)
def validate_update(self, req, body):
status = body['os-reset_status']['status']
update = {'status': status.lower()}
return update
def _update(self, *args, **kwargs):
context = args[0]
@ -305,10 +267,6 @@ class BackupAdminController(AdminController):
collection = 'backups'
valid_status = set(['available',
'error'
])
def _get(self, *args, **kwargs):
return self.backup_api.get(*args, **kwargs)
@ -317,10 +275,12 @@ class BackupAdminController(AdminController):
@wsgi.response(http_client.ACCEPTED)
@wsgi.action('os-reset_status')
@validation.schema(admin_actions.reset_status_backup)
def _reset_status(self, req, id, body):
"""Reset status on the resource."""
context = req.environ['cinder.context']
update = self.validate_update(body['os-reset_status'])
status = body['os-reset_status']['status']
update = {'status': status.lower()}
msg = "Updating %(resource)s '%(id)s' with '%(update)r'"
LOG.debug(msg, {'resource': self.resource_name, 'id': id,
'update': update})

View File

@ -0,0 +1,141 @@
# Copyright (C) 2018 NTT DATA
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Schema for V3 admin_actions API.
"""
import copy
from cinder.api.validation import parameter_types
reset = {
'type': 'object',
'properties': {
'os-reset_status': {
'type': 'object',
'format': 'validate_volume_reset_body',
'properties': {
'status': {'type': ['string', 'null'],
'format': 'volume_status'},
'attach_status': {'type': ['string', 'null'],
'format': 'volume_attach_status'},
'migration_status': {'type': ['string', 'null'],
'format': 'volume_migration_status'},
},
'additionalProperties': False,
},
},
'required': ['os-reset_status'],
'additionalProperties': False,
}
force_detach = {
'type': 'object',
'properties': {
'os-force_detach': {
'type': 'object',
'properties': {
'connector': {'type': ['string', 'object', 'null']},
'attachment_id': {'type': ['string', 'null']}
},
'additionalProperties': False,
},
},
'required': ['os-force_detach'],
'additionalProperties': False,
}
migrate_volume = {
'type': 'object',
'properties': {
'os-migrate_volume': {
'type': 'object',
'properties': {
'host': {'type': 'string', 'maxLength': 255},
'force_host_copy': parameter_types.boolean,
'lock_volume': parameter_types.boolean,
},
'required': ['host'],
'additionalProperties': False,
},
},
'required': ['os-migrate_volume'],
'additionalProperties': False,
}
migrate_volume_v316 = {
'type': 'object',
'properties': {
'os-migrate_volume': {
'type': 'object',
'properties': {
'host': {'type': ['string', 'null'],
'maxLength': 255},
'force_host_copy': parameter_types.boolean,
'lock_volume': parameter_types.boolean,
'cluster': parameter_types.name_allow_zero_min_length,
},
'additionalProperties': False,
},
},
'required': ['os-migrate_volume'],
'additionalProperties': False,
}
migrate_volume_completion = {
'type': 'object',
'properties': {
'os-migrate_volume_completion': {
'type': 'object',
'properties': {
'new_volume': parameter_types.uuid,
'error': {'type': ['string', 'null', 'boolean']},
},
'required': ['new_volume'],
'additionalProperties': False,
},
},
'required': ['os-migrate_volume_completion'],
'additionalProperties': False,
}
reset_status_backup = {
'type': 'object',
'properties': {
'os-reset_status': {
'type': 'object',
'properties': {
'status': {'type': 'string',
'format': 'backup_status'},
},
'required': ['status'],
'additionalProperties': False,
},
},
'required': ['os-reset_status'],
'additionalProperties': False,
}
reset_status_snapshot = copy.deepcopy(reset_status_backup)
reset_status_snapshot['properties']['os-reset_status'][
'properties']['status']['format'] = 'snapshot_status'

View File

@ -296,6 +296,82 @@ def _validate_log_level(level):
return True
@jsonschema.FormatChecker.cls_checks('validate_volume_reset_body')
def _validate_volume_reset_body(instance):
status = instance.get('status')
attach_status = instance.get('attach_status')
migration_status = instance.get('migration_status')
if not status and not attach_status and not migration_status:
msg = _("Must specify 'status', 'attach_status' or 'migration_status'"
" for update.")
raise exception.InvalidParameterValue(err=msg)
return True
@jsonschema.FormatChecker.cls_checks('volume_status')
def _validate_volume_status(param_value):
if param_value and param_value.lower() not in c_fields.VolumeStatus.ALL:
msg = _("Volume status: %(status)s is invalid, "
"valid statuses are: "
"%(valid)s.") % {'status': param_value,
'valid': c_fields.VolumeStatus.ALL}
raise exception.InvalidParameterValue(err=msg)
return True
@jsonschema.FormatChecker.cls_checks('volume_attach_status')
def _validate_volume_attach_status(param_value):
valid_attach_status = [c_fields.VolumeAttachStatus.ATTACHED,
c_fields.VolumeAttachStatus.DETACHED]
if param_value and param_value.lower() not in valid_attach_status:
msg = _("Volume attach status: %(status)s is invalid, "
"valid statuses are: "
"%(valid)s.") % {'status': param_value,
'valid': valid_attach_status}
raise exception.InvalidParameterValue(err=msg)
return True
@jsonschema.FormatChecker.cls_checks('volume_migration_status')
def _validate_volume_migration_status(param_value):
if param_value and (
param_value.lower() not in c_fields.VolumeMigrationStatus.ALL):
msg = _("Volume migration status: %(status)s is invalid, "
"valid statuses are: "
"%(valid)s.") % {'status': param_value,
'valid': c_fields.VolumeMigrationStatus.ALL}
raise exception.InvalidParameterValue(err=msg)
return True
@jsonschema.FormatChecker.cls_checks('snapshot_status')
def _validate_snapshot_status(param_value):
if not param_value or (
param_value.lower() not in c_fields.SnapshotStatus.ALL):
msg = _("Snapshot status: %(status)s is invalid, "
"valid statuses are: "
"%(valid)s.") % {'status': param_value,
'valid': c_fields.SnapshotStatus.ALL}
raise exception.InvalidParameterValue(err=msg)
return True
@jsonschema.FormatChecker.cls_checks('backup_status')
def _validate_backup_status(param_value):
valid_status = [c_fields.BackupStatus.AVAILABLE,
c_fields.BackupStatus.ERROR]
if not param_value or (
param_value.lower() not in valid_status):
msg = _("Backup status: %(status)s is invalid, "
"valid statuses are: "
"%(valid)s.") % {'status': param_value,
'valid': valid_status}
raise exception.InvalidParameterValue(err=msg)
return True
class FormatChecker(jsonschema.FormatChecker):
"""A FormatChecker can output the message from cause exception

View File

@ -165,5 +165,42 @@ class VolumeAttachStatusField(BaseEnumField):
AUTO_TYPE = VolumeAttachStatus()
class VolumeStatus(BaseCinderEnum):
CREATING = 'creating'
AVAILABLE = 'available'
DELETING = 'deleting'
ERROR = 'error'
ERROR_DELETING = 'error_deleting'
ERROR_MANAGING = 'error_managing'
MANAGING = 'managing'
ATTACHING = 'attaching'
IN_USE = 'in-use'
DETACHING = 'detaching'
MAINTENANCE = 'maintenance'
ALL = (CREATING, AVAILABLE, DELETING, ERROR, ERROR_DELETING,
ERROR_MANAGING, MANAGING, ATTACHING, IN_USE, DETACHING,
MAINTENANCE)
class VolumeStatusField(BaseEnumField):
AUTO_TYPE = VolumeStatus()
class VolumeMigrationStatus(BaseCinderEnum):
MIGRATING = 'migrating'
ERROR = 'error'
SUCCESS = 'success'
COMPLETING = 'completing'
NONE = 'none'
STARTING = 'starting'
ALL = (MIGRATING, ERROR, SUCCESS, COMPLETING, NONE, STARTING)
class VolumeMigrationStatusField(BaseEnumField):
AUTO_TYPE = VolumeStatus()
class DictOfNullableField(fields.AutoTypedField):
AUTO_TYPE = fields.Dict(fields.FieldType(), nullable=True)

View File

@ -20,7 +20,6 @@ from oslo_serialization import jsonutils
from oslo_utils import timeutils
from six.moves import http_client
import webob
from webob import exc
from cinder.api.contrib import admin_actions
from cinder.api import microversions as mv
@ -150,25 +149,45 @@ class AdminActionsTest(BaseAdminTest):
backup['id'],
updated_status)
def test_valid_updates(self):
@ddt.data({'os-reset_status': {'status': 'creating'}},
{'os-reset_status': {'status': 'available'}},
{'os-reset_status': {'status': 'deleting'}},
{'os-reset_status': {'status': 'error'}},
{'os-reset_status': {'status': 'error_deleting'}},
{'os-reset_status': {'attach_status':
fields.VolumeAttachStatus.DETACHED}},
{'os-reset_status': {'attach_status':
fields.VolumeAttachStatus.ATTACHED}},
{'os-reset_status': {'migration_status': 'migrating'}},
{'os-reset_status': {'migration_status': 'completing'}},
{'os-reset_status': {'migration_status': 'error'}},
{'os-reset_status': {'migration_status': 'none'}},
{'os-reset_status': {'migration_status': 'starting'}})
def test_valid_updates(self, body):
req = webob.Request.blank('/v3/%s/volumes/%s/action' % (
fake.PROJECT_ID, id))
req.method = 'POST'
req.headers['content-type'] = 'application/json'
req.environ['cinder.context'] = self.ctx
req.api_version_request = mv.get_api_version(mv.BASE_VERSION)
vac = self.controller
vac.validate_update(req, body=body)
vac.validate_update({'status': 'creating'})
vac.validate_update({'status': 'available'})
vac.validate_update({'status': 'deleting'})
vac.validate_update({'status': 'error'})
vac.validate_update({'status': 'error_deleting'})
vac.validate_update({'attach_status':
fields.VolumeAttachStatus.DETACHED})
vac.validate_update({'attach_status':
fields.VolumeAttachStatus.ATTACHED})
vac.validate_update({'migration_status': 'migrating'})
vac.validate_update({'migration_status': 'error'})
vac.validate_update({'migration_status': 'completing'})
vac.validate_update({'migration_status': 'none'})
vac.validate_update({'migration_status': 'starting'})
@ddt.data({'os-reset_status': {'status': None}},
{'os-reset_status': {'attach_status': None}},
{'os-reset_status': {'migration_status': None}},
{'os-reset_status': {'status': "", 'attach_status': "",
"migration_status": ""}})
def test_invalid_updates(self, body):
req = webob.Request.blank('/v3/%s/volumes/%s/action' % (
fake.PROJECT_ID, id))
req.method = 'POST'
req.headers['content-type'] = 'application/json'
req.environ['cinder.context'] = self.ctx
req.api_version_request = mv.get_api_version(mv.BASE_VERSION)
vac = self.controller
self.assertRaises(exception.InvalidParameterValue, vac.validate_update,
req, body=body)
def test_reset_attach_status(self):
volume = db.volume_create(self.ctx,
@ -292,15 +311,15 @@ class AdminActionsTest(BaseAdminTest):
self.assertEqual(http_client.ACCEPTED, resp.status_int)
def test_invalid_status_for_backup(self):
@ddt.data({'status': None}, {'status': 'restoring'})
def test_invalid_status_for_backup(self, status):
volume = db.volume_create(self.ctx,
{'status': 'available', 'host': 'test',
'provider_location': '', 'size': 1})
backup = db.backup_create(self.ctx, {'status': 'available',
'volume_id': volume['id']})
resp = self._issue_backup_reset(self.ctx,
backup,
{'status': 'restoring'})
backup, status)
self.assertEqual(http_client.BAD_REQUEST, resp.status_int)
def test_backup_reset_status_with_invalid_backup(self):
@ -321,6 +340,26 @@ class AdminActionsTest(BaseAdminTest):
# Should raise 404 if backup doesn't exist.
self.assertEqual(http_client.NOT_FOUND, resp.status_int)
@ddt.data({'os-reset_status': {}})
def test_backup_reset_status_with_invalid_body(self, body):
volume = db.volume_create(self.ctx,
{'status': 'available', 'host': 'test',
'provider_location': '', 'size': 1})
backup = db.backup_create(self.ctx,
{'status': fields.BackupStatus.AVAILABLE,
'volume_id': volume['id'],
'user_id': fake.USER_ID,
'project_id': fake.PROJECT_ID})
req = webob.Request.blank('/v2/%s/%s/%s/action' % (
fake.PROJECT_ID, 'backups', backup['id']))
req.method = 'POST'
req.headers['content-type'] = 'application/json'
req.body = jsonutils.dump_as_bytes(body)
req.environ['cinder.context'] = self.ctx
resp = req.get_response(app())
self.assertEqual(http_client.BAD_REQUEST, resp.status_int)
def test_malformed_reset_status_body(self):
volume = db.volume_create(self.ctx, {'status': 'available', 'size': 1})
@ -447,7 +486,8 @@ class AdminActionsTest(BaseAdminTest):
snapshot = objects.Snapshot.get_by_id(self.ctx, snapshot['id'])
self.assertEqual(fields.SnapshotStatus.ERROR, snapshot.status)
def test_invalid_status_for_snapshot(self):
@ddt.data({'status': None}, {'status': 'attaching'})
def test_invalid_status_for_snapshot(self, updated_status):
volume = db.volume_create(self.ctx,
{'status': 'available', 'host': 'test',
'provider_location': '', 'size': 1})
@ -457,12 +497,31 @@ class AdminActionsTest(BaseAdminTest):
snapshot.create()
self.addCleanup(snapshot.destroy)
resp = self._issue_snapshot_reset(self.ctx, snapshot,
{'status': 'attaching'})
resp = self._issue_snapshot_reset(self.ctx, snapshot, updated_status)
self.assertEqual(http_client.BAD_REQUEST, resp.status_int)
self.assertEqual(fields.SnapshotStatus.AVAILABLE, snapshot.status)
@ddt.data({'os-reset_status': {}})
def test_snapshot_reset_status_with_invalid_body(self, body):
volume = db.volume_create(self.ctx,
{'status': 'available', 'host': 'test',
'provider_location': '', 'size': 1})
snapshot = objects.Snapshot(self.ctx,
status=fields.SnapshotStatus.AVAILABLE,
volume_id=volume['id'])
snapshot.create()
self.addCleanup(snapshot.destroy)
req = webob.Request.blank('/v2/%s/%s/%s/action' % (
fake.PROJECT_ID, 'snapshots', snapshot['id']))
req.method = 'POST'
req.headers['content-type'] = 'application/json'
req.body = jsonutils.dump_as_bytes(body)
req.environ['cinder.context'] = self.ctx
resp = req.get_response(app())
self.assertEqual(http_client.BAD_REQUEST, resp.status_int)
def test_force_delete(self):
# current status is creating
volume = self._create_volume(self.ctx, {'size': 1, 'host': None})
@ -587,14 +646,15 @@ class AdminActionsTest(BaseAdminTest):
cluster=cluster)
def _migrate_volume_exec(self, ctx, volume, host, expected_status,
force_host_copy=False):
force_host_copy=False, lock_volume=False):
# build request to migrate to host
req = webob.Request.blank('/v2/%s/volumes/%s/action' % (
fake.PROJECT_ID, volume['id']))
req.method = 'POST'
req.headers['content-type'] = 'application/json'
body = {'os-migrate_volume': {'host': host,
'force_host_copy': force_host_copy}}
'force_host_copy': force_host_copy,
'lock_volume': lock_volume}}
req.body = jsonutils.dump_as_bytes(body)
req.environ['cinder.context'] = ctx
resp = req.get_response(app())
@ -690,12 +750,29 @@ class AdminActionsTest(BaseAdminTest):
self.addCleanup(snap.destroy)
self._migrate_volume_exec(self.ctx, volume, host, expected_status)
def test_migrate_volume_bad_force_host_copy(self):
@ddt.data('force_host_copy', None, ' true ', 0)
def test_migrate_volume_bad_force_host_copy(self, force_host_copy):
expected_status = http_client.BAD_REQUEST
host = 'test2'
volume = self._migrate_volume_prep()
self._migrate_volume_exec(self.ctx, volume, host, expected_status,
force_host_copy='foo')
force_host_copy=force_host_copy)
@ddt.data('lock_volume', None, ' true ', 0)
def test_migrate_volume_bad_lock_volume(self, lock_volume):
expected_status = http_client.BAD_REQUEST
host = 'test2'
volume = self._migrate_volume_prep()
self._migrate_volume_exec(self.ctx, volume, host, expected_status,
lock_volume=lock_volume)
@ddt.data('true', False, '1', '0')
def test_migrate_volume_valid_lock_volume(self, lock_volume):
expected_status = http_client.ACCEPTED
host = 'test2'
volume = self._migrate_volume_prep()
self._migrate_volume_exec(self.ctx, volume, host, expected_status,
lock_volume=lock_volume)
def _migrate_volume_comp_exec(self, ctx, volume, new_volume, error,
expected_status, expected_id, no_body=False):
@ -776,16 +853,19 @@ class AdminActionsTest(BaseAdminTest):
self._migrate_volume_comp_exec(self.ctx, volume, new_volume, False,
expected_status, expected_id)
def test_backup_reset_valid_updates(self):
vac = admin_actions.BackupAdminController()
vac.validate_update({'status': 'available'})
vac.validate_update({'status': 'error'})
self.assertRaises(exc.HTTPBadRequest,
vac.validate_update,
{'status': 'restoring'})
self.assertRaises(exc.HTTPBadRequest,
vac.validate_update,
{'status': 'creating'})
def test_migrate_volume_comp_no_new_volume(self):
volume = db.volume_create(self.ctx, {'id': fake.VOLUME_ID})
req = webob.Request.blank('/v2/%s/volumes/%s/action' % (
fake.PROJECT_ID, volume['id']))
req.method = 'POST'
req.headers['content-type'] = 'application/json'
body = {'os-migrate_volume_completion': {'error': False}}
req.body = jsonutils.dump_as_bytes(body)
req.environ['cinder.context'] = self.ctx
resp = req.get_response(app())
res_dict = jsonutils.loads(resp.body)
self.assertEqual(http_client.BAD_REQUEST,
res_dict['badRequest']['code'])
@mock.patch('cinder.backup.rpcapi.BackupAPI.delete_backup', mock.Mock())
@mock.patch('cinder.db.service_get_all')