api: Separate volume, snapshot and volume attachments

These all belong in separate files. Make it so.

We also rename the volume_attachment schema file to volume_attachments,
to better link it to the actual API code, and tweak an error message to
fix some capitalization.

Change-Id: Iffefc263bbf19d18137207c0432c16fdb3c513f9
Signed-off-by: Stephen Finucane <stephenfin@redhat.com>
This commit is contained in:
Stephen Finucane
2025-07-29 15:05:08 +01:00
parent 2e666e768a
commit a8651eaff3
95 changed files with 3276 additions and 2907 deletions

View File

@@ -45,12 +45,12 @@ Response
**Example List volume attachments for an instance: JSON response**
.. literalinclude:: ../../doc/api_samples/os-volumes/list-volume-attachments-resp.json
.. literalinclude:: ../../doc/api_samples/os-volume_attachments/list-volume-attachments-resp.json
:language: javascript
**Example List tagged volume attachments for an instance (v2.89): JSON response**
.. literalinclude:: ../../doc/api_samples/os-volumes/v2.89/list-volume-attachments-resp.json
.. literalinclude:: ../../doc/api_samples/os-volume_attachments/v2.89/list-volume-attachments-resp.json
:language: javascript
Attach a volume to an instance
@@ -90,17 +90,17 @@ Request
**Example Attach a volume to an instance: JSON request**
.. literalinclude:: ../../doc/api_samples/os-volumes/attach-volume-to-server-req.json
.. literalinclude:: ../../doc/api_samples/os-volume_attachments/attach-volume-to-server-req.json
:language: javascript
**Example Attach a volume to an instance and tag it (v2.49): JSON request**
.. literalinclude:: ../../doc/api_samples/os-volumes/v2.49/attach-volume-to-server-req.json
.. literalinclude:: ../../doc/api_samples/os-volume_attachments/v2.49/attach-volume-to-server-req.json
:language: javascript
**Example Attach a volume to an instance with "delete_on_termination" (v2.79): JSON request**
.. literalinclude:: ../../doc/api_samples/os-volumes/v2.79/attach-volume-to-server-req.json
.. literalinclude:: ../../doc/api_samples/os-volume_attachments/v2.79/attach-volume-to-server-req.json
:language: javascript
Response
@@ -118,17 +118,17 @@ Response
**Example Attach a volume to an instance: JSON response**
.. literalinclude:: ../../doc/api_samples/os-volumes/attach-volume-to-server-resp.json
.. literalinclude:: ../../doc/api_samples/os-volume_attachments/attach-volume-to-server-resp.json
:language: javascript
**Example Attach a tagged volume to an instance (v2.70): JSON response**
.. literalinclude:: ../../doc/api_samples/os-volumes/v2.70/attach-volume-to-server-resp.json
.. literalinclude:: ../../doc/api_samples/os-volume_attachments/v2.70/attach-volume-to-server-resp.json
:language: javascript
**Example Attach a volume with "delete_on_termination" (v2.79): JSON response**
.. literalinclude:: ../../doc/api_samples/os-volumes/v2.79/attach-volume-to-server-resp.json
.. literalinclude:: ../../doc/api_samples/os-volume_attachments/v2.79/attach-volume-to-server-resp.json
:language: javascript
Show a detail of a volume attachment
@@ -167,12 +167,12 @@ Response
**Example Show a detail of a volume attachment: JSON response**
.. literalinclude:: ../../doc/api_samples/os-volumes/volume-attachment-detail-resp.json
.. literalinclude:: ../../doc/api_samples/os-volume_attachments/volume-attachment-detail-resp.json
:language: javascript
**Example Show a detail of a tagged volume attachment (v2.89): JSON response**
.. literalinclude:: ../../doc/api_samples/os-volumes/v2.89/volume-attachment-detail-resp.json
.. literalinclude:: ../../doc/api_samples/os-volume_attachments/v2.89/volume-attachment-detail-resp.json
:language: javascript
Update a volume attachment
@@ -233,7 +233,7 @@ Request
**Example Update a volume attachment (v2.85): JSON request**
.. literalinclude:: ../../doc/api_samples/os-volumes/v2.85/update-volume-attachment-delete-flag-req.json
.. literalinclude:: ../../doc/api_samples/os-volume_attachments/v2.85/update-volume-attachment-delete-flag-req.json
:language: javascript
Response

View File

@@ -279,7 +279,7 @@ Response
**Example List Snapshots**
.. literalinclude:: ../../doc/api_samples/os-volumes/snapshots-list-resp.json
.. literalinclude:: ../../doc/api_samples/os-snapshots/snapshots-list-resp.json
:language: javascript
Create Snapshot
@@ -306,7 +306,7 @@ Request
**Example Create Snapshot**
.. literalinclude:: ../../doc/api_samples/os-volumes/snapshot-create-req.json
.. literalinclude:: ../../doc/api_samples/os-snapshots/snapshot-create-req.json
:language: javascript
Response
@@ -325,7 +325,7 @@ Response
**Example Create Snapshot**
.. literalinclude:: ../../doc/api_samples/os-volumes/snapshot-create-resp.json
.. literalinclude:: ../../doc/api_samples/os-snapshots/snapshot-create-resp.json
:language: javascript
List Snapshots With Details
@@ -365,7 +365,7 @@ Response
**Example List Snapshots With Details**
.. literalinclude:: ../../doc/api_samples/os-volumes/snapshots-detail-resp.json
.. literalinclude:: ../../doc/api_samples/os-snapshots/snapshots-detail-resp.json
:language: javascript
Show Snapshot Details
@@ -404,7 +404,7 @@ Response
**Example Show Snapshot Details**
.. literalinclude:: ../../doc/api_samples/os-volumes/snapshots-show-resp.json
.. literalinclude:: ../../doc/api_samples/os-snapshots/snapshots-show-resp.json
:language: javascript
Delete Snapshot

View File

@@ -79,10 +79,12 @@ from nova.api.openstack.compute import servers
from nova.api.openstack.compute import services
from nova.api.openstack.compute import shelve
from nova.api.openstack.compute import simple_tenant_usage
from nova.api.openstack.compute import snapshots
from nova.api.openstack.compute import suspend_server
from nova.api.openstack.compute import tenant_networks
from nova.api.openstack.compute import versionsV21
from nova.api.openstack.compute import virtual_interfaces
from nova.api.openstack.compute import volume_attachments
from nova.api.openstack.compute import volumes
from nova.api.openstack import wsgi
from nova.api import wsgi as base_wsgi
@@ -321,7 +323,7 @@ server_topology_controller = functools.partial(_create_controller,
server_topology.ServerTopologyController, [])
server_volume_attachments_controller = functools.partial(_create_controller,
volumes.VolumeAttachmentController, [])
volume_attachments.VolumeAttachmentController, [])
services_controller = functools.partial(_create_controller,
@@ -333,7 +335,7 @@ simple_tenant_usage_controller = functools.partial(_create_controller,
snapshots_controller = functools.partial(_create_controller,
volumes.SnapshotController, [])
snapshots.SnapshotController, [])
tenant_networks_controller = functools.partial(_create_controller,

View File

@@ -0,0 +1,53 @@
# Copyright 2014 IBM Corporation. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from nova.api.validation import parameter_types
create = {
'type': 'object',
'properties': {
'snapshot': {
'type': 'object',
'properties': {
'volume_id': {'type': 'string'},
'force': parameter_types.boolean,
'display_name': {'type': 'string'},
'display_description': {'type': 'string'},
},
'required': ['volume_id'],
'additionalProperties': False,
},
},
'required': ['snapshot'],
'additionalProperties': False,
}
index_query = {
'type': 'object',
'properties': {
'limit': parameter_types.multi_params(
parameter_types.non_negative_integer),
'offset': parameter_types.multi_params(
parameter_types.non_negative_integer)
},
'additionalProperties': True
}
detail_query = index_query
show_query = {
'type': 'object',
'properties': {},
'additionalProperties': True
}

View File

@@ -1,18 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# TODO(stephenfin): Remove additionalProperties in a future API version
show_query = {
'type': 'object',
'properties': {},
'additionalProperties': True,
}

View File

@@ -0,0 +1,102 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import copy
from nova.api.validation import parameter_types
create = {
'type': 'object',
'properties': {
'volumeAttachment': {
'type': 'object',
'properties': {
'volumeId': parameter_types.volume_id,
'device': {
'type': ['string', 'null'],
# NOTE: The validation pattern from match_device() in
# nova/block_device.py.
'pattern': '(^/dev/x{0,1}[a-z]{0,1}d{0,1})([a-z]+)[0-9]*$'
},
},
'required': ['volumeId'],
'additionalProperties': False,
},
},
'required': ['volumeAttachment'],
'additionalProperties': False,
}
create_v249 = copy.deepcopy(create)
create_v249['properties']['volumeAttachment'][
'properties']['tag'] = parameter_types.tag
create_v279 = copy.deepcopy(create_v249)
create_v279['properties']['volumeAttachment'][
'properties']['delete_on_termination'] = parameter_types.boolean
update = copy.deepcopy(create)
del update['properties']['volumeAttachment']['properties']['device']
# NOTE(brinzhang): Allow attachment_id, serverId, device, tag, and
# delete_on_termination (i.e., follow the content of the GET response)
# to be specified for RESTfulness, even though we will not allow updating
# all of them.
update_v285 = {
'type': 'object',
'properties': {
'volumeAttachment': {
'type': 'object',
'properties': {
'volumeId': parameter_types.volume_id,
'device': {
'type': ['string', 'null'],
# NOTE: The validation pattern from match_device() in
# nova/block_device.py.
'pattern': '(^/dev/x{0,1}[a-z]{0,1}d{0,1})([a-z]+)[0-9]*$'
},
'tag': parameter_types.tag,
'delete_on_termination': parameter_types.boolean,
'serverId': parameter_types.server_id,
'id': parameter_types.attachment_id
},
'required': ['volumeId'],
'additionalProperties': False,
},
},
'required': ['volumeAttachment'],
'additionalProperties': False,
}
index_query = {
'type': 'object',
'properties': {
'limit': parameter_types.multi_params(
parameter_types.non_negative_integer),
'offset': parameter_types.multi_params(
parameter_types.non_negative_integer)
},
# NOTE(gmann): This is kept True to keep backward compatibility.
# As of now Schema validation stripped out the additional parameters and
# does not raise 400. In microversion 2.75, we have blocked the additional
# parameters.
'additionalProperties': True
}
index_query_v275 = copy.deepcopy(index_query)
index_query_v275['additionalProperties'] = False
# TODO(stephenfin): Remove additionalProperties in a future API version
show_query = {
'type': 'object',
'properties': {},
'additionalProperties': True,
}

View File

@@ -12,8 +12,6 @@
# License for the specific language governing permissions and limitations
# under the License.
import copy
from nova.api.validation import parameter_types
create = {
@@ -43,88 +41,6 @@ create = {
}
snapshot_create = {
'type': 'object',
'properties': {
'snapshot': {
'type': 'object',
'properties': {
'volume_id': {'type': 'string'},
'force': parameter_types.boolean,
'display_name': {'type': 'string'},
'display_description': {'type': 'string'},
},
'required': ['volume_id'],
'additionalProperties': False,
},
},
'required': ['snapshot'],
'additionalProperties': False,
}
create_volume_attachment = {
'type': 'object',
'properties': {
'volumeAttachment': {
'type': 'object',
'properties': {
'volumeId': parameter_types.volume_id,
'device': {
'type': ['string', 'null'],
# NOTE: The validation pattern from match_device() in
# nova/block_device.py.
'pattern': '(^/dev/x{0,1}[a-z]{0,1}d{0,1})([a-z]+)[0-9]*$'
},
},
'required': ['volumeId'],
'additionalProperties': False,
},
},
'required': ['volumeAttachment'],
'additionalProperties': False,
}
create_volume_attachment_v249 = copy.deepcopy(create_volume_attachment)
create_volume_attachment_v249['properties']['volumeAttachment'][
'properties']['tag'] = parameter_types.tag
create_volume_attachment_v279 = copy.deepcopy(create_volume_attachment_v249)
create_volume_attachment_v279['properties']['volumeAttachment'][
'properties']['delete_on_termination'] = parameter_types.boolean
update_volume_attachment = copy.deepcopy(create_volume_attachment)
del update_volume_attachment['properties']['volumeAttachment'][
'properties']['device']
# NOTE(brinzhang): Allow attachment_id, serverId, device, tag, and
# delete_on_termination (i.e., follow the content of the GET response)
# to be specified for RESTfulness, even though we will not allow updating
# all of them.
update_volume_attachment_v285 = {
'type': 'object',
'properties': {
'volumeAttachment': {
'type': 'object',
'properties': {
'volumeId': parameter_types.volume_id,
'device': {
'type': ['string', 'null'],
# NOTE: The validation pattern from match_device() in
# nova/block_device.py.
'pattern': '(^/dev/x{0,1}[a-z]{0,1}d{0,1})([a-z]+)[0-9]*$'
},
'tag': parameter_types.tag,
'delete_on_termination': parameter_types.boolean,
'serverId': parameter_types.server_id,
'id': parameter_types.attachment_id
},
'required': ['volumeId'],
'additionalProperties': False,
},
},
'required': ['volumeAttachment'],
'additionalProperties': False,
}
index_query = {
'type': 'object',
'properties': {
@@ -133,28 +49,13 @@ index_query = {
'offset': parameter_types.multi_params(
parameter_types.non_negative_integer)
},
# NOTE(gmann): This is kept True to keep backward compatibility.
# As of now Schema validation stripped out the additional parameters and
# does not raise 400. In microversion 2.75, we have blocked the additional
# parameters.
'additionalProperties': True
}
detail_query = index_query
index_query_275 = copy.deepcopy(index_query)
index_query_275['additionalProperties'] = False
# TODO(stephenfin): Remove additionalProperties in a future API version
show_query = {
'type': 'object',
'properties': {},
'additionalProperties': True
}
# TODO(stephenfin): Remove additionalProperties in a future API version
snapshot_show_query = {
'type': 'object',
'properties': {},
'additionalProperties': True
}

View File

@@ -0,0 +1,151 @@
# Copyright 2011 Justin Santa Barbara
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""The volume snapshots extension."""
from oslo_utils import strutils
from webob import exc
from nova.api.openstack.api_version_request \
import MAX_PROXY_API_SUPPORT_VERSION
from nova.api.openstack import common
from nova.api.openstack.compute.schemas import snapshots as schema
from nova.api.openstack import wsgi
from nova.api import validation
from nova import exception
from nova.policies import volumes as vol_policies
from nova.volume import cinder
def _translate_snapshot_detail_view(context, vol):
"""Maps keys for snapshots details view."""
return _translate_snapshot_summary_view(context, vol)
def _translate_snapshot_summary_view(context, vol):
"""Maps keys for snapshots summary view."""
d = {}
d['id'] = vol['id']
d['volumeId'] = vol['volume_id']
d['status'] = vol['status']
# NOTE(gagupta): We map volume_size as the snapshot size
d['size'] = vol['volume_size']
d['createdAt'] = vol['created_at']
d['displayName'] = vol['display_name']
d['displayDescription'] = vol['display_description']
return d
class SnapshotController(wsgi.Controller):
"""The Snapshots API controller for the OpenStack API."""
def __init__(self):
super().__init__()
self.volume_api = cinder.API()
@wsgi.api_version("2.1", MAX_PROXY_API_SUPPORT_VERSION)
@wsgi.expected_errors(404)
@validation.query_schema(schema.show_query)
def show(self, req, id):
"""Return data about the given snapshot."""
context = req.environ['nova.context']
context.can(
vol_policies.POLICY_NAME % 'snapshots:show',
target={'project_id': context.project_id})
try:
vol = self.volume_api.get_snapshot(context, id)
except exception.SnapshotNotFound as e:
raise exc.HTTPNotFound(explanation=e.format_message())
return {'snapshot': _translate_snapshot_detail_view(context, vol)}
@wsgi.api_version("2.1", MAX_PROXY_API_SUPPORT_VERSION)
@wsgi.response(202)
@wsgi.expected_errors(404)
def delete(self, req, id):
"""Delete a snapshot."""
context = req.environ['nova.context']
context.can(
vol_policies.POLICY_NAME % 'snapshots:delete',
target={'project_id': context.project_id})
try:
self.volume_api.delete_snapshot(context, id)
except exception.SnapshotNotFound as e:
raise exc.HTTPNotFound(explanation=e.format_message())
@wsgi.api_version("2.1", MAX_PROXY_API_SUPPORT_VERSION)
@wsgi.expected_errors(())
@validation.query_schema(schema.index_query)
def index(self, req):
"""Returns a summary list of snapshots."""
context = req.environ['nova.context']
context.can(
vol_policies.POLICY_NAME % 'snapshots:list',
target={'project_id': context.project_id})
return self._items(req, entity_maker=_translate_snapshot_summary_view)
@wsgi.api_version("2.1", MAX_PROXY_API_SUPPORT_VERSION)
@wsgi.expected_errors(())
@validation.query_schema(schema.detail_query)
def detail(self, req):
"""Returns a detailed list of snapshots."""
context = req.environ['nova.context']
context.can(
vol_policies.POLICY_NAME % 'snapshots:detail',
target={'project_id': context.project_id})
return self._items(req, entity_maker=_translate_snapshot_detail_view)
def _items(self, req, entity_maker):
"""Returns a list of snapshots, transformed through entity_maker."""
context = req.environ['nova.context']
snapshots = self.volume_api.get_all_snapshots(context)
limited_list = common.limited(snapshots, req)
res = [entity_maker(context, snapshot) for snapshot in limited_list]
return {'snapshots': res}
@wsgi.api_version("2.1", MAX_PROXY_API_SUPPORT_VERSION)
@wsgi.expected_errors((400, 403))
@validation.schema(schema.create)
def create(self, req, body):
"""Creates a new snapshot."""
context = req.environ['nova.context']
context.can(
vol_policies.POLICY_NAME % 'snapshots:create',
target={'project_id': context.project_id})
snapshot = body['snapshot']
volume_id = snapshot['volume_id']
force = snapshot.get('force', False)
force = strutils.bool_from_string(force, strict=True)
if force:
create_func = self.volume_api.create_snapshot_force
else:
create_func = self.volume_api.create_snapshot
try:
new_snapshot = create_func(
context, volume_id,
snapshot.get('display_name'),
snapshot.get('display_description'))
except exception.OverQuota as e:
raise exc.HTTPForbidden(explanation=e.format_message())
retval = _translate_snapshot_detail_view(context, new_snapshot)
return {'snapshot': retval}

View File

@@ -0,0 +1,402 @@
# Copyright 2011 Justin Santa Barbara
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""The volume attachments extension."""
from oslo_utils import strutils
from webob import exc
from nova.api.openstack import api_version_request
from nova.api.openstack import common
from nova.api.openstack.compute.schemas import volume_attachments as schema
from nova.api.openstack import wsgi
from nova.api import validation
from nova.compute import api as compute
from nova.compute import vm_states
from nova import exception
from nova.i18n import _
from nova import objects
from nova.policies import volumes_attachments as va_policies
from nova.volume import cinder
def _translate_attachment_detail_view(
bdm,
show_tag=False,
show_delete_on_termination=False,
show_attachment_id_bdm_uuid=False,
):
"""Maps keys for attachment details view.
:param bdm: BlockDeviceMapping object for an attached volume
:param show_tag: True if the "tag" field should be in the response, False
to exclude the "tag" field from the response
:param show_delete_on_termination: True if the "delete_on_termination"
field should be in the response, False to exclude the
"delete_on_termination" field from the response
:param show_attachment_id_bdm_uuid: True if the "attachment_id" and
"bdm_uuid" fields should be in the response. Also controls when the
"id" field is included.
"""
d = {}
if not show_attachment_id_bdm_uuid:
d['id'] = bdm.volume_id
d['volumeId'] = bdm.volume_id
d['serverId'] = bdm.instance_uuid
if bdm.device_name:
d['device'] = bdm.device_name
if show_tag:
d['tag'] = bdm.tag
if show_delete_on_termination:
d['delete_on_termination'] = bdm.delete_on_termination
if show_attachment_id_bdm_uuid:
d['attachment_id'] = bdm.attachment_id
d['bdm_uuid'] = bdm.uuid
return d
def _check_request_version(req, min_version, method, server_id, server_state):
if api_version_request.is_supported(req, min_version):
return
exc_inv = exception.InstanceInvalidState(
attr='vm_state',
instance_uuid=server_id,
state=server_state,
method=method)
common.raise_http_conflict_for_instance_invalid_state(
exc_inv, method, server_id)
class VolumeAttachmentController(wsgi.Controller):
"""The volume attachment API controller for the OpenStack API.
A child resource of the server. Note that we use the volume id
as the ID of the attachment (though this is not guaranteed externally)
"""
def __init__(self):
super().__init__()
self.compute_api = compute.API()
self.volume_api = cinder.API()
@wsgi.expected_errors(404)
@validation.query_schema(schema.index_query, '2.0', '2.74')
@validation.query_schema(schema.index_query_v275, '2.75')
def index(self, req, server_id):
"""Returns the list of volume attachments for a given instance."""
context = req.environ['nova.context']
instance = common.get_instance(self.compute_api, context, server_id)
context.can(
va_policies.POLICY_ROOT % 'index',
target={'project_id': instance.project_id})
bdms = objects.BlockDeviceMappingList.get_by_instance_uuid(
context, instance.uuid)
limited_list = common.limited(bdms, req)
results = []
show_tag = api_version_request.is_supported(req, '2.70')
show_delete_on_termination = api_version_request.is_supported(
req, '2.79')
show_attachment_id_bdm_uuid = api_version_request.is_supported(
req, '2.89')
for bdm in limited_list:
if bdm.volume_id:
va = _translate_attachment_detail_view(
bdm,
show_tag=show_tag,
show_delete_on_termination=show_delete_on_termination,
show_attachment_id_bdm_uuid=show_attachment_id_bdm_uuid,
)
results.append(va)
return {'volumeAttachments': results}
@wsgi.expected_errors(404)
@validation.query_schema(schema.show_query)
def show(self, req, server_id, id):
"""Return data about the given volume attachment."""
context = req.environ['nova.context']
instance = common.get_instance(self.compute_api, context, server_id)
context.can(
va_policies.POLICY_ROOT % 'show',
target={'project_id': instance.project_id})
volume_id = id
try:
bdm = objects.BlockDeviceMapping.get_by_volume_and_instance(
context, volume_id, instance.uuid)
except exception.VolumeBDMNotFound:
msg = _(
"Instance %(instance)s is not attached "
"to volume %(volume)s"
) % {'instance': server_id, 'volume': volume_id}
raise exc.HTTPNotFound(explanation=msg)
show_tag = api_version_request.is_supported(req, '2.70')
show_delete_on_termination = api_version_request.is_supported(
req, '2.79')
show_attachment_id_bdm_uuid = api_version_request.is_supported(
req, '2.89')
return {
'volumeAttachment': _translate_attachment_detail_view(
bdm,
show_tag=show_tag,
show_delete_on_termination=show_delete_on_termination,
show_attachment_id_bdm_uuid=show_attachment_id_bdm_uuid,
)
}
# TODO(mriedem): This API should return a 202 instead of a 200 response.
@wsgi.expected_errors((400, 403, 404, 409))
@validation.schema(schema.create, '2.0', '2.48')
@validation.schema(schema.create_v249, '2.49', '2.78')
@validation.schema(schema.create_v279, '2.79')
def create(self, req, server_id, body):
"""Attach a volume to an instance."""
context = req.environ['nova.context']
instance = common.get_instance(self.compute_api, context, server_id)
context.can(
va_policies.POLICY_ROOT % 'create',
target={'project_id': instance.project_id})
volume_id = body['volumeAttachment']['volumeId']
device = body['volumeAttachment'].get('device')
tag = body['volumeAttachment'].get('tag')
delete_on_termination = body['volumeAttachment'].get(
'delete_on_termination', False)
if instance.vm_state in (
vm_states.SHELVED, vm_states.SHELVED_OFFLOADED,
):
_check_request_version(
req, '2.20', 'attach_volume', server_id, instance.vm_state)
try:
supports_multiattach = common.supports_multiattach_volume(req)
device = self.compute_api.attach_volume(
context, instance, volume_id, device, tag=tag,
supports_multiattach=supports_multiattach,
delete_on_termination=delete_on_termination)
except exception.VolumeNotFound as e:
raise exc.HTTPNotFound(explanation=e.format_message())
except (exception.InstanceIsLocked, exception.DevicePathInUse) as e:
raise exc.HTTPConflict(explanation=e.format_message())
except exception.InstanceInvalidState as state_error:
common.raise_http_conflict_for_instance_invalid_state(
state_error, 'attach_volume', server_id)
except (
exception.InvalidVolume,
exception.InvalidDevicePath,
exception.InvalidInput,
exception.VolumeTaggedAttachNotSupported,
exception.MultiattachNotSupportedOldMicroversion,
exception.MultiattachToShelvedNotSupported,
) as e:
raise exc.HTTPBadRequest(explanation=e.format_message())
except exception.TooManyDiskDevices as e:
raise exc.HTTPForbidden(explanation=e.format_message())
# The attach is async
# NOTE(mriedem): It would be nice to use
# _translate_attachment_summary_view here but that does not include
# the 'device' key if device is None or the empty string which would
# be a backward incompatible change.
attachment = {}
attachment['id'] = volume_id
attachment['serverId'] = server_id
attachment['volumeId'] = volume_id
attachment['device'] = device
if api_version_request.is_supported(req, '2.70'):
attachment['tag'] = tag
if api_version_request.is_supported(req, '2.79'):
attachment['delete_on_termination'] = delete_on_termination
return {'volumeAttachment': attachment}
def _update_volume_swap(self, req, instance, id, body):
context = req.environ['nova.context']
old_volume_id = id
try:
old_volume = self.volume_api.get(context, old_volume_id)
except exception.VolumeNotFound as e:
raise exc.HTTPNotFound(explanation=e.format_message())
if (
'migration_status' not in old_volume or
old_volume['migration_status'] in (None, '')
):
message = (
f"volume {old_volume_id} is not migrating; this API "
f"should only be called by Cinder")
raise exc.HTTPConflict(explanation=message)
new_volume_id = body['volumeAttachment']['volumeId']
try:
new_volume = self.volume_api.get(context, new_volume_id)
except exception.VolumeNotFound as e:
# NOTE: This BadRequest is different from the above NotFound even
# though the same VolumeNotFound exception. This is intentional
# because new_volume_id is specified in a request body and if a
# nonexistent resource in the body (not URI) the code should be
# 400 Bad Request as API-WG guideline. On the other hand,
# old_volume_id is specified with URI. So it is valid to return
# NotFound response if that is not existent.
raise exc.HTTPBadRequest(explanation=e.format_message())
try:
self.compute_api.swap_volume(
context, instance, old_volume, new_volume)
except exception.VolumeBDMNotFound as e:
raise exc.HTTPNotFound(explanation=e.format_message())
except (
exception.InvalidVolume,
exception.MultiattachSwapVolumeNotSupported,
) as e:
raise exc.HTTPBadRequest(explanation=e.format_message())
except exception.InstanceIsLocked as e:
raise exc.HTTPConflict(explanation=e.format_message())
except exception.InstanceInvalidState as state_error:
common.raise_http_conflict_for_instance_invalid_state(
state_error, 'swap_volume', instance.uuid)
def _update_volume_regular(self, req, instance, id, body):
context = req.environ['nova.context']
att = body['volumeAttachment']
# NOTE(danms): We may be doing an update of regular parameters in
# the midst of a swap operation, so to find the original BDM, we need
# to use the old volume ID, which is the one in the path.
volume_id = id
try:
bdm = objects.BlockDeviceMapping.get_by_volume_and_instance(
context, volume_id, instance.uuid)
# NOTE(danms): The attachment id is just the (current) volume id
if 'id' in att and att['id'] != volume_id:
raise exc.HTTPBadRequest(
explanation='The id property is not mutable')
if 'serverId' in att and att['serverId'] != instance.uuid:
raise exc.HTTPBadRequest(
explanation='The serverId property is not mutable')
if 'device' in att and att['device'] != bdm.device_name:
raise exc.HTTPBadRequest(
explanation='The device property is not mutable')
if 'tag' in att and att['tag'] != bdm.tag:
raise exc.HTTPBadRequest(
explanation='The tag property is not mutable')
if 'delete_on_termination' in att:
bdm.delete_on_termination = strutils.bool_from_string(
att['delete_on_termination'], strict=True)
bdm.save()
except exception.VolumeBDMNotFound as e:
raise exc.HTTPNotFound(explanation=e.format_message())
@wsgi.response(202)
@wsgi.expected_errors((400, 404, 409))
@validation.schema(schema.update, '2.0', '2.84')
@validation.schema(schema.update_v285, '2.85')
def update(self, req, server_id, id, body):
context = req.environ['nova.context']
instance = common.get_instance(self.compute_api, context, server_id)
attachment = body['volumeAttachment']
volume_id = attachment['volumeId']
only_swap = not api_version_request.is_supported(req, '2.85')
# NOTE(brinzhang): If the 'volumeId' requested by the user is
# different from the 'id' in the url path, or only swap is allowed by
# the microversion, we should check the swap volume policy.
# otherwise, check the volume update policy.
# NOTE(gmann) We pass empty target to policy enforcement. This API
# is called by cinder which does not have correct project_id where
# server belongs to. By passing the empty target, we make sure that
# we do not check the requester project_id and allow users with
# allowed role to perform the swap volume.
if only_swap or id != volume_id:
context.can(va_policies.POLICY_ROOT % 'swap', target={})
else:
context.can(
va_policies.POLICY_ROOT % 'update',
target={'project_id': instance.project_id})
if only_swap:
# NOTE(danms): Original behavior is always call swap on PUT
self._update_volume_swap(req, instance, id, body)
else:
# NOTE(danms): New behavior is update any supported attachment
# properties first, and then call swap if volumeId differs
self._update_volume_regular(req, instance, id, body)
if id != volume_id:
self._update_volume_swap(req, instance, id, body)
@wsgi.response(202)
@wsgi.expected_errors((400, 403, 404, 409))
def delete(self, req, server_id, id):
"""Detach a volume from an instance."""
context = req.environ['nova.context']
instance = common.get_instance(
self.compute_api, context, server_id,
expected_attrs=['device_metadata'])
context.can(
va_policies.POLICY_ROOT % 'delete',
target={'project_id': instance.project_id})
volume_id = id
if instance.vm_state in (
vm_states.SHELVED, vm_states.SHELVED_OFFLOADED
):
_check_request_version(
req, '2.20', 'detach_volume', server_id, instance.vm_state)
try:
volume = self.volume_api.get(context, volume_id)
except exception.VolumeNotFound as e:
raise exc.HTTPNotFound(explanation=e.format_message())
try:
bdm = objects.BlockDeviceMapping.get_by_volume_and_instance(
context, volume_id, instance.uuid)
except exception.VolumeBDMNotFound:
msg = _(
"Instance %(instance)s is not attached "
"to volume %(volume)s"
) % {'instance': server_id, 'volume': volume_id}
raise exc.HTTPNotFound(explanation=msg)
if bdm.is_root:
msg = _("Cannot detach a root device volume")
raise exc.HTTPBadRequest(explanation=msg)
try:
self.compute_api.detach_volume(context, instance, volume)
except exception.InvalidVolume as e:
raise exc.HTTPBadRequest(explanation=e.format_message())
except exception.InvalidInput as e:
raise exc.HTTPBadRequest(explanation=e.format_message())
except (exception.InstanceIsLocked, exception.ServiceUnavailable) as e:
raise exc.HTTPConflict(explanation=e.format_message())
except exception.InstanceInvalidState as state_error:
common.raise_http_conflict_for_instance_invalid_state(
state_error, 'detach_volume', server_id)

View File

@@ -15,35 +15,22 @@
"""The volumes extension."""
from oslo_utils import strutils
from webob import exc
from nova.api.openstack import api_version_request
from nova.api.openstack.api_version_request \
import MAX_PROXY_API_SUPPORT_VERSION
from nova.api.openstack import common
from nova.api.openstack.compute.schemas import volume_attachment as volume_attachment_schema # noqa: E501
from nova.api.openstack.compute.schemas import volumes as volumes_schema
from nova.api.openstack.compute.schemas import volumes as schema
from nova.api.openstack import wsgi
from nova.api import validation
from nova.compute import api as compute
from nova.compute import vm_states
from nova import exception
from nova.i18n import _
from nova import objects
from nova.policies import volumes as vol_policies
from nova.policies import volumes_attachments as va_policies
from nova.volume import cinder
def _translate_volume_detail_view(context, vol):
"""Maps keys for volumes details view."""
d = _translate_volume_summary_view(context, vol)
# No additional data / lookups at the moment
return d
return _translate_volume_summary_view(context, vol)
def _translate_volume_summary_view(context, vol):
@@ -106,17 +93,18 @@ class VolumeController(wsgi.Controller):
"""The Volumes API controller for the OpenStack API."""
def __init__(self):
super(VolumeController, self).__init__()
super().__init__()
self.volume_api = cinder.API()
@wsgi.api_version("2.1", MAX_PROXY_API_SUPPORT_VERSION)
@wsgi.expected_errors(404)
@validation.query_schema(volumes_schema.show_query)
@validation.query_schema(schema.show_query)
def show(self, req, id):
"""Return data about the given volume."""
context = req.environ['nova.context']
context.can(vol_policies.POLICY_NAME % 'show',
target={'project_id': context.project_id})
context.can(
vol_policies.POLICY_NAME % 'show',
target={'project_id': context.project_id})
try:
vol = self.volume_api.get(context, id)
@@ -131,8 +119,9 @@ class VolumeController(wsgi.Controller):
def delete(self, req, id):
"""Delete a volume."""
context = req.environ['nova.context']
context.can(vol_policies.POLICY_NAME % 'delete',
target={'project_id': context.project_id})
context.can(
vol_policies.POLICY_NAME % 'delete',
target={'project_id': context.project_id})
try:
self.volume_api.delete(context, id)
@@ -143,22 +132,24 @@ class VolumeController(wsgi.Controller):
@wsgi.api_version("2.1", MAX_PROXY_API_SUPPORT_VERSION)
@wsgi.expected_errors(())
@validation.query_schema(volumes_schema.index_query)
@validation.query_schema(schema.index_query)
def index(self, req):
"""Returns a summary list of volumes."""
context = req.environ['nova.context']
context.can(vol_policies.POLICY_NAME % 'list',
target={'project_id': context.project_id})
context.can(
vol_policies.POLICY_NAME % 'list',
target={'project_id': context.project_id})
return self._items(req, entity_maker=_translate_volume_summary_view)
@wsgi.api_version("2.1", MAX_PROXY_API_SUPPORT_VERSION)
@wsgi.expected_errors(())
@validation.query_schema(volumes_schema.detail_query)
@validation.query_schema(schema.detail_query)
def detail(self, req):
"""Returns a detailed list of volumes."""
context = req.environ['nova.context']
context.can(vol_policies.POLICY_NAME % 'detail',
target={'project_id': context.project_id})
context.can(
vol_policies.POLICY_NAME % 'detail',
target={'project_id': context.project_id})
return self._items(req, entity_maker=_translate_volume_detail_view)
def _items(self, req, entity_maker):
@@ -172,7 +163,7 @@ class VolumeController(wsgi.Controller):
@wsgi.api_version("2.1", MAX_PROXY_API_SUPPORT_VERSION)
@wsgi.expected_errors((400, 403, 404))
@validation.schema(volumes_schema.create)
@validation.schema(schema.create)
def create(self, req, body):
"""Creates a new volume."""
context = req.environ['nova.context']
@@ -208,8 +199,7 @@ class VolumeController(wsgi.Controller):
snapshot=snapshot,
volume_type=vol_type,
metadata=metadata,
availability_zone=availability_zone
)
availability_zone=availability_zone)
except exception.InvalidInput as err:
raise exc.HTTPBadRequest(explanation=err.format_message())
except exception.OverQuota as err:
@@ -224,479 +214,3 @@ class VolumeController(wsgi.Controller):
location = '%s/%s' % (req.url, new_volume['id'])
return wsgi.ResponseObject(result, headers=dict(location=location))
def _translate_attachment_detail_view(
bdm,
show_tag=False,
show_delete_on_termination=False,
show_attachment_id_bdm_uuid=False,
):
"""Maps keys for attachment details view.
:param bdm: BlockDeviceMapping object for an attached volume
:param show_tag: True if the "tag" field should be in the response, False
to exclude the "tag" field from the response
:param show_delete_on_termination: True if the "delete_on_termination"
field should be in the response, False to exclude the
"delete_on_termination" field from the response
:param show_attachment_id_bdm_uuid: True if the "attachment_id" and
"bdm_uuid" fields should be in the response. Also controls when the
"id" field is included.
"""
d = {}
if not show_attachment_id_bdm_uuid:
d['id'] = bdm.volume_id
d['volumeId'] = bdm.volume_id
d['serverId'] = bdm.instance_uuid
if bdm.device_name:
d['device'] = bdm.device_name
if show_tag:
d['tag'] = bdm.tag
if show_delete_on_termination:
d['delete_on_termination'] = bdm.delete_on_termination
if show_attachment_id_bdm_uuid:
d['attachment_id'] = bdm.attachment_id
d['bdm_uuid'] = bdm.uuid
return d
def _check_request_version(req, min_version, method, server_id, server_state):
if not api_version_request.is_supported(req, min_version):
exc_inv = exception.InstanceInvalidState(
attr='vm_state',
instance_uuid=server_id,
state=server_state,
method=method)
common.raise_http_conflict_for_instance_invalid_state(
exc_inv,
method,
server_id)
class VolumeAttachmentController(wsgi.Controller):
"""The volume attachment API controller for the OpenStack API.
A child resource of the server. Note that we use the volume id
as the ID of the attachment (though this is not guaranteed externally)
"""
def __init__(self):
self.compute_api = compute.API()
self.volume_api = cinder.API()
super(VolumeAttachmentController, self).__init__()
@wsgi.expected_errors(404)
@validation.query_schema(volumes_schema.index_query_275, '2.75')
@validation.query_schema(volumes_schema.index_query, '2.0', '2.74')
def index(self, req, server_id):
"""Returns the list of volume attachments for a given instance."""
context = req.environ['nova.context']
instance = common.get_instance(self.compute_api, context, server_id)
context.can(va_policies.POLICY_ROOT % 'index',
target={'project_id': instance.project_id})
bdms = objects.BlockDeviceMappingList.get_by_instance_uuid(
context, instance.uuid)
limited_list = common.limited(bdms, req)
results = []
show_tag = api_version_request.is_supported(req, '2.70')
show_delete_on_termination = api_version_request.is_supported(
req, '2.79')
show_attachment_id_bdm_uuid = api_version_request.is_supported(
req, '2.89')
for bdm in limited_list:
if bdm.volume_id:
va = _translate_attachment_detail_view(
bdm,
show_tag=show_tag,
show_delete_on_termination=show_delete_on_termination,
show_attachment_id_bdm_uuid=show_attachment_id_bdm_uuid,
)
results.append(va)
return {'volumeAttachments': results}
@wsgi.expected_errors(404)
@validation.query_schema(volume_attachment_schema.show_query)
def show(self, req, server_id, id):
"""Return data about the given volume attachment."""
context = req.environ['nova.context']
instance = common.get_instance(self.compute_api, context, server_id)
context.can(va_policies.POLICY_ROOT % 'show',
target={'project_id': instance.project_id})
volume_id = id
try:
bdm = objects.BlockDeviceMapping.get_by_volume_and_instance(
context, volume_id, instance.uuid)
except exception.VolumeBDMNotFound:
msg = (_("Instance %(instance)s is not attached "
"to volume %(volume)s") %
{'instance': server_id, 'volume': volume_id})
raise exc.HTTPNotFound(explanation=msg)
show_tag = api_version_request.is_supported(req, '2.70')
show_delete_on_termination = api_version_request.is_supported(
req, '2.79')
show_attachment_id_bdm_uuid = api_version_request.is_supported(
req, '2.89')
return {
'volumeAttachment': _translate_attachment_detail_view(
bdm,
show_tag=show_tag,
show_delete_on_termination=show_delete_on_termination,
show_attachment_id_bdm_uuid=show_attachment_id_bdm_uuid,
)
}
# TODO(mriedem): This API should return a 202 instead of a 200 response.
@wsgi.expected_errors((400, 403, 404, 409))
@validation.schema(volumes_schema.create_volume_attachment, '2.0', '2.48')
@validation.schema(volumes_schema.create_volume_attachment_v249, '2.49',
'2.78')
@validation.schema(volumes_schema.create_volume_attachment_v279, '2.79')
def create(self, req, server_id, body):
"""Attach a volume to an instance."""
context = req.environ['nova.context']
instance = common.get_instance(self.compute_api, context, server_id)
context.can(va_policies.POLICY_ROOT % 'create',
target={'project_id': instance.project_id})
volume_id = body['volumeAttachment']['volumeId']
device = body['volumeAttachment'].get('device')
tag = body['volumeAttachment'].get('tag')
delete_on_termination = body['volumeAttachment'].get(
'delete_on_termination', False)
if instance.vm_state in (vm_states.SHELVED,
vm_states.SHELVED_OFFLOADED):
_check_request_version(req, '2.20', 'attach_volume',
server_id, instance.vm_state)
try:
supports_multiattach = common.supports_multiattach_volume(req)
device = self.compute_api.attach_volume(
context, instance, volume_id, device, tag=tag,
supports_multiattach=supports_multiattach,
delete_on_termination=delete_on_termination)
except exception.VolumeNotFound as e:
raise exc.HTTPNotFound(explanation=e.format_message())
except (exception.InstanceIsLocked,
exception.DevicePathInUse) as e:
raise exc.HTTPConflict(explanation=e.format_message())
except exception.InstanceInvalidState as state_error:
common.raise_http_conflict_for_instance_invalid_state(state_error,
'attach_volume', server_id)
except (exception.InvalidVolume,
exception.InvalidDevicePath,
exception.InvalidInput,
exception.VolumeTaggedAttachNotSupported,
exception.MultiattachNotSupportedOldMicroversion,
exception.MultiattachToShelvedNotSupported) as e:
raise exc.HTTPBadRequest(explanation=e.format_message())
except exception.TooManyDiskDevices as e:
raise exc.HTTPForbidden(explanation=e.format_message())
# The attach is async
# NOTE(mriedem): It would be nice to use
# _translate_attachment_summary_view here but that does not include
# the 'device' key if device is None or the empty string which would
# be a backward incompatible change.
attachment = {}
attachment['id'] = volume_id
attachment['serverId'] = server_id
attachment['volumeId'] = volume_id
attachment['device'] = device
if api_version_request.is_supported(req, '2.70'):
attachment['tag'] = tag
if api_version_request.is_supported(req, '2.79'):
attachment['delete_on_termination'] = delete_on_termination
return {'volumeAttachment': attachment}
def _update_volume_swap(self, req, instance, id, body):
context = req.environ['nova.context']
old_volume_id = id
try:
old_volume = self.volume_api.get(context, old_volume_id)
except exception.VolumeNotFound as e:
raise exc.HTTPNotFound(explanation=e.format_message())
if ('migration_status' not in old_volume or
old_volume['migration_status'] in (None, '')):
message = (f"volume {old_volume_id} is not migrating this api "
"should only be called by Cinder")
raise exc.HTTPConflict(explanation=message)
new_volume_id = body['volumeAttachment']['volumeId']
try:
new_volume = self.volume_api.get(context, new_volume_id)
except exception.VolumeNotFound as e:
# NOTE: This BadRequest is different from the above NotFound even
# though the same VolumeNotFound exception. This is intentional
# because new_volume_id is specified in a request body and if a
# nonexistent resource in the body (not URI) the code should be
# 400 Bad Request as API-WG guideline. On the other hand,
# old_volume_id is specified with URI. So it is valid to return
# NotFound response if that is not existent.
raise exc.HTTPBadRequest(explanation=e.format_message())
try:
self.compute_api.swap_volume(context, instance, old_volume,
new_volume)
except exception.VolumeBDMNotFound as e:
raise exc.HTTPNotFound(explanation=e.format_message())
except (exception.InvalidVolume,
exception.MultiattachSwapVolumeNotSupported) as e:
raise exc.HTTPBadRequest(explanation=e.format_message())
except exception.InstanceIsLocked as e:
raise exc.HTTPConflict(explanation=e.format_message())
except exception.InstanceInvalidState as state_error:
common.raise_http_conflict_for_instance_invalid_state(state_error,
'swap_volume', instance.uuid)
def _update_volume_regular(self, req, instance, id, body):
context = req.environ['nova.context']
att = body['volumeAttachment']
# NOTE(danms): We may be doing an update of regular parameters in
# the midst of a swap operation, so to find the original BDM, we need
# to use the old volume ID, which is the one in the path.
volume_id = id
try:
bdm = objects.BlockDeviceMapping.get_by_volume_and_instance(
context, volume_id, instance.uuid)
# NOTE(danms): The attachment id is just the (current) volume id
if 'id' in att and att['id'] != volume_id:
raise exc.HTTPBadRequest(explanation='The id property is '
'not mutable')
if 'serverId' in att and att['serverId'] != instance.uuid:
raise exc.HTTPBadRequest(explanation='The serverId property '
'is not mutable')
if 'device' in att and att['device'] != bdm.device_name:
raise exc.HTTPBadRequest(explanation='The device property is '
'not mutable')
if 'tag' in att and att['tag'] != bdm.tag:
raise exc.HTTPBadRequest(explanation='The tag property is '
'not mutable')
if 'delete_on_termination' in att:
bdm.delete_on_termination = strutils.bool_from_string(
att['delete_on_termination'], strict=True)
bdm.save()
except exception.VolumeBDMNotFound as e:
raise exc.HTTPNotFound(explanation=e.format_message())
@wsgi.response(202)
@wsgi.expected_errors((400, 404, 409))
@validation.schema(volumes_schema.update_volume_attachment, '2.0', '2.84')
@validation.schema(volumes_schema.update_volume_attachment_v285, '2.85')
def update(self, req, server_id, id, body):
context = req.environ['nova.context']
instance = common.get_instance(self.compute_api, context, server_id)
attachment = body['volumeAttachment']
volume_id = attachment['volumeId']
only_swap = not api_version_request.is_supported(req, '2.85')
# NOTE(brinzhang): If the 'volumeId' requested by the user is
# different from the 'id' in the url path, or only swap is allowed by
# the microversion, we should check the swap volume policy.
# otherwise, check the volume update policy.
# NOTE(gmann) We pass empty target to policy enforcement. This API
# is called by cinder which does not have correct project_id where
# server belongs to. By passing the empty target, we make sure that
# we do not check the requester project_id and allow users with
# allowed role to perform the swap volume.
if only_swap or id != volume_id:
context.can(va_policies.POLICY_ROOT % 'swap', target={})
else:
context.can(va_policies.POLICY_ROOT % 'update',
target={'project_id': instance.project_id})
if only_swap:
# NOTE(danms): Original behavior is always call swap on PUT
self._update_volume_swap(req, instance, id, body)
else:
# NOTE(danms): New behavior is update any supported attachment
# properties first, and then call swap if volumeId differs
self._update_volume_regular(req, instance, id, body)
if id != volume_id:
self._update_volume_swap(req, instance, id, body)
@wsgi.response(202)
@wsgi.expected_errors((400, 403, 404, 409))
def delete(self, req, server_id, id):
"""Detach a volume from an instance."""
context = req.environ['nova.context']
instance = common.get_instance(self.compute_api, context, server_id,
expected_attrs=['device_metadata'])
context.can(va_policies.POLICY_ROOT % 'delete',
target={'project_id': instance.project_id})
volume_id = id
if instance.vm_state in (vm_states.SHELVED,
vm_states.SHELVED_OFFLOADED):
_check_request_version(req, '2.20', 'detach_volume',
server_id, instance.vm_state)
try:
volume = self.volume_api.get(context, volume_id)
except exception.VolumeNotFound as e:
raise exc.HTTPNotFound(explanation=e.format_message())
try:
bdm = objects.BlockDeviceMapping.get_by_volume_and_instance(
context, volume_id, instance.uuid)
except exception.VolumeBDMNotFound:
msg = (_("Instance %(instance)s is not attached "
"to volume %(volume)s") %
{'instance': server_id, 'volume': volume_id})
raise exc.HTTPNotFound(explanation=msg)
if bdm.is_root:
msg = _("Cannot detach a root device volume")
raise exc.HTTPBadRequest(explanation=msg)
try:
self.compute_api.detach_volume(context, instance, volume)
except exception.InvalidVolume as e:
raise exc.HTTPBadRequest(explanation=e.format_message())
except exception.InvalidInput as e:
raise exc.HTTPBadRequest(explanation=e.format_message())
except (exception.InstanceIsLocked, exception.ServiceUnavailable) as e:
raise exc.HTTPConflict(explanation=e.format_message())
except exception.InstanceInvalidState as state_error:
common.raise_http_conflict_for_instance_invalid_state(state_error,
'detach_volume', server_id)
def _translate_snapshot_detail_view(context, vol):
"""Maps keys for snapshots details view."""
d = _translate_snapshot_summary_view(context, vol)
# NOTE(gagupta): No additional data / lookups at the moment
return d
def _translate_snapshot_summary_view(context, vol):
"""Maps keys for snapshots summary view."""
d = {}
d['id'] = vol['id']
d['volumeId'] = vol['volume_id']
d['status'] = vol['status']
# NOTE(gagupta): We map volume_size as the snapshot size
d['size'] = vol['volume_size']
d['createdAt'] = vol['created_at']
d['displayName'] = vol['display_name']
d['displayDescription'] = vol['display_description']
return d
class SnapshotController(wsgi.Controller):
"""The Snapshots API controller for the OpenStack API."""
def __init__(self):
self.volume_api = cinder.API()
super(SnapshotController, self).__init__()
@wsgi.api_version("2.1", MAX_PROXY_API_SUPPORT_VERSION)
@wsgi.expected_errors(404)
@validation.query_schema(volumes_schema.snapshot_show_query)
def show(self, req, id):
"""Return data about the given snapshot."""
context = req.environ['nova.context']
context.can(vol_policies.POLICY_NAME % 'snapshots:show',
target={'project_id': context.project_id})
try:
vol = self.volume_api.get_snapshot(context, id)
except exception.SnapshotNotFound as e:
raise exc.HTTPNotFound(explanation=e.format_message())
return {'snapshot': _translate_snapshot_detail_view(context, vol)}
@wsgi.api_version("2.1", MAX_PROXY_API_SUPPORT_VERSION)
@wsgi.response(202)
@wsgi.expected_errors(404)
def delete(self, req, id):
"""Delete a snapshot."""
context = req.environ['nova.context']
context.can(vol_policies.POLICY_NAME % 'snapshots:delete',
target={'project_id': context.project_id})
try:
self.volume_api.delete_snapshot(context, id)
except exception.SnapshotNotFound as e:
raise exc.HTTPNotFound(explanation=e.format_message())
@wsgi.api_version("2.1", MAX_PROXY_API_SUPPORT_VERSION)
@wsgi.expected_errors(())
@validation.query_schema(volumes_schema.index_query)
def index(self, req):
"""Returns a summary list of snapshots."""
context = req.environ['nova.context']
context.can(vol_policies.POLICY_NAME % 'snapshots:list',
target={'project_id': context.project_id})
return self._items(req, entity_maker=_translate_snapshot_summary_view)
@wsgi.api_version("2.1", MAX_PROXY_API_SUPPORT_VERSION)
@wsgi.expected_errors(())
@validation.query_schema(volumes_schema.detail_query)
def detail(self, req):
"""Returns a detailed list of snapshots."""
context = req.environ['nova.context']
context.can(vol_policies.POLICY_NAME % 'snapshots:detail',
target={'project_id': context.project_id})
return self._items(req, entity_maker=_translate_snapshot_detail_view)
def _items(self, req, entity_maker):
"""Returns a list of snapshots, transformed through entity_maker."""
context = req.environ['nova.context']
snapshots = self.volume_api.get_all_snapshots(context)
limited_list = common.limited(snapshots, req)
res = [entity_maker(context, snapshot) for snapshot in limited_list]
return {'snapshots': res}
@wsgi.api_version("2.1", MAX_PROXY_API_SUPPORT_VERSION)
@wsgi.expected_errors((400, 403))
@validation.schema(volumes_schema.snapshot_create)
def create(self, req, body):
"""Creates a new snapshot."""
context = req.environ['nova.context']
context.can(vol_policies.POLICY_NAME % 'snapshots:create',
target={'project_id': context.project_id})
snapshot = body['snapshot']
volume_id = snapshot['volume_id']
force = snapshot.get('force', False)
force = strutils.bool_from_string(force, strict=True)
if force:
create_func = self.volume_api.create_snapshot_force
else:
create_func = self.volume_api.create_snapshot
try:
new_snapshot = create_func(context, volume_id,
snapshot.get('display_name'),
snapshot.get('display_description'))
except exception.OverQuota as e:
raise exc.HTTPForbidden(explanation=e.format_message())
retval = _translate_snapshot_detail_view(context, new_snapshot)
return {'snapshot': retval}

View File

@@ -0,0 +1,75 @@
# Copyright 2012 Nebula, Inc.
# Copyright 2014 IBM Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from nova.tests.functional.api_sample_tests import api_sample_base
from nova.tests.unit.api.openstack import fakes
class SnapshotsSampleJsonTests(api_sample_base.ApiSampleTestBaseV21):
sample_dir = "os-snapshots"
create_subs = {
'snapshot_name': 'snap-001',
'description': 'Daily backup',
'volume_id': '521752a6-acf6-4b2d-bc7a-119f9148cd8c'
}
def setUp(self):
super().setUp()
self.stub_out(
"nova.volume.cinder.API.create_snapshot",
fakes.stub_snapshot_create)
self.stub_out(
"nova.volume.cinder.API.delete_snapshot",
fakes.stub_snapshot_delete)
self.stub_out(
"nova.volume.cinder.API.get_all_snapshots",
fakes.stub_snapshot_get_all)
self.stub_out(
"nova.volume.cinder.API.get_snapshot",
fakes.stub_snapshot_get)
def _create_snapshot(self):
response = self._do_post(
"os-snapshots", "snapshot-create-req", self.create_subs)
return response
def test_snapshots_create(self):
response = self._create_snapshot()
self._verify_response(
"snapshot-create-resp", self.create_subs, response, 200)
def test_snapshots_delete(self):
self._create_snapshot()
response = self._do_delete('os-snapshots/100')
self.assertEqual(202, response.status_code)
self.assertEqual('', response.text)
def test_snapshots_detail(self):
response = self._do_get('os-snapshots/detail')
self._verify_response('snapshots-detail-resp', {}, response, 200)
def test_snapshots_list(self):
response = self._do_get('os-snapshots')
self._verify_response('snapshots-list-resp', {}, response, 200)
def test_snapshots_show(self):
response = self._do_get('os-snapshots/100')
subs = {
'snapshot_name': 'Default name',
'description': 'Default description'
}
self._verify_response('snapshots-show-resp', subs, response, 200)

View File

@@ -0,0 +1,161 @@
# Copyright 2012 Nebula, Inc.
# Copyright 2014 IBM Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from nova.tests import fixtures
from nova.tests.functional.api_sample_tests import test_servers
class VolumeAttachmentsSample(test_servers.ServersSampleBase):
sample_dir = "os-volume_attachments"
# The 'os_compute_api:os-volumes-attachments:swap' policy is admin-only
ADMIN_API = True
OLD_VOLUME_ID = fixtures.CinderFixture.SWAP_OLD_VOL
NEW_VOLUME_ID = fixtures.CinderFixture.SWAP_NEW_VOL
def setUp(self):
super().setUp()
self.cinder = self.useFixture(fixtures.CinderFixture(self))
self.server_id = self._post_server()
def _get_vol_attachment_subs(self, subs):
"""Allows subclasses to override/supplement request/response subs"""
return subs
def test_attach_volume_to_server(self):
subs = {
'volume_id': self.OLD_VOLUME_ID,
'device': '/dev/sdb'
}
subs = self._get_vol_attachment_subs(subs)
response = self._do_post(
'servers/%s/os-volume_attachments' % self.server_id,
'attach-volume-to-server-req', subs)
self._verify_response(
'attach-volume-to-server-resp', subs, response, 200)
return subs
def test_list_volume_attachments(self):
subs = self.test_attach_volume_to_server()
# Attach another volume to the server so the response has multiple
# which is more interesting since it's a list of dicts.
body = {
'volumeAttachment': {
'volumeId': self.NEW_VOLUME_ID
}
}
self.api.post_server_volume(self.server_id, body)
response = self._do_get(
'servers/%s/os-volume_attachments' % self.server_id)
subs['volume_id2'] = self.NEW_VOLUME_ID
self._verify_response(
'list-volume-attachments-resp', subs, response, 200)
def test_volume_attachment_detail(self):
subs = self.test_attach_volume_to_server()
response = self._do_get(
'servers/%s/os-volume_attachments/%s' % (
self.server_id, subs['volume_id']))
self._verify_response(
'volume-attachment-detail-resp', subs, response, 200)
def test_volume_attachment_delete(self):
subs = self.test_attach_volume_to_server()
response = self._do_delete(
'servers/%s/os-volume_attachments/%s' % (
self.server_id, subs['volume_id']))
self.assertEqual(202, response.status_code)
self.assertEqual('', response.text)
def test_volume_attachment_update(self):
subs = self.test_attach_volume_to_server()
subs['new_volume_id'] = self.NEW_VOLUME_ID
response = self._do_put(
'servers/%s/os-volume_attachments/%s' % (
self.server_id, subs['volume_id']),
'update-volume-req', subs)
self.assertEqual(202, response.status_code)
self.assertEqual('', response.text)
class VolumeAttachmentsSampleV249(VolumeAttachmentsSample):
"""Microversion 2.49 adds the "tag" parameter to the request body"""
microversion = '2.49'
scenarios = [('v2_49', {'api_major_version': 'v2.1'})]
def setUp(self):
super().setUp()
# Stub out ComputeManager._delete_disk_metadata since the fake virt
# driver does not actually update the instance.device_metadata.devices
# list with the tagged bdm disk device metadata.
self.stub_out(
'nova.compute.manager.ComputeManager._delete_disk_metadata',
lambda *a, **kw: None)
def _get_vol_attachment_subs(self, subs):
return dict(subs, tag='foo')
class VolumeAttachmentsSampleV270(VolumeAttachmentsSampleV249):
"""Microversion 2.70 adds the "tag" parameter to the response body"""
microversion = '2.70'
scenarios = [('v2_70', {'api_major_version': 'v2.1'})]
class VolumeAttachmentsSampleV279(VolumeAttachmentsSampleV270):
"""Microversion 2.79 adds the "delete_on_termination" parameter to the
request and response body.
"""
microversion = '2.79'
scenarios = [('v2_79', {'api_major_version': 'v2.1'})]
class VolumeAttachmentsSampleV285(VolumeAttachmentsSampleV279):
"""Microversion 2.85 adds the ``PUT
/servers/{server_id}/os-volume_attachments/{volume_id}``
support for specifying ``delete_on_termination`` field in the request
body to re-config the attached volume whether to delete when the instance
is deleted.
"""
microversion = '2.85'
scenarios = [('v2_85', {'api_major_version': 'v2.1'})]
def test_volume_attachment_update(self):
subs = self.test_attach_volume_to_server()
attached_volume_id = subs['volume_id']
subs['server_id'] = self.server_id
response = self._do_put(
'servers/%s/os-volume_attachments/%s' % (
self.server_id, attached_volume_id),
'update-volume-attachment-delete-flag-req', subs)
self.assertEqual(202, response.status_code)
self.assertEqual('', response.text)
# Make sure the attached volume was changed
attachments = self.api.api_get(
'/servers/%s/os-volume_attachments' % self.server_id
).body['volumeAttachments']
self.assertEqual(1, len(attachments))
self.assertEqual(self.server_id, attachments[0]['serverId'])
self.assertTrue(attachments[0]['delete_on_termination'])
class VolumeAttachmentsSampleV289(VolumeAttachmentsSampleV285):
"""Microversion 2.89 adds the "attachment_id" parameter to the
response body of show and list.
"""
microversion = '2.89'
scenarios = [('v2_89', {'api_major_version': 'v2.1'})]

View File

@@ -15,67 +15,10 @@
import datetime
from nova.tests import fixtures
from nova.tests.functional.api_sample_tests import api_sample_base
from nova.tests.functional.api_sample_tests import test_servers
from nova.tests.unit.api.openstack import fakes
class SnapshotsSampleJsonTests(api_sample_base.ApiSampleTestBaseV21):
sample_dir = "os-volumes"
create_subs = {
'snapshot_name': 'snap-001',
'description': 'Daily backup',
'volume_id': '521752a6-acf6-4b2d-bc7a-119f9148cd8c'
}
def setUp(self):
super(SnapshotsSampleJsonTests, self).setUp()
self.stub_out("nova.volume.cinder.API.get_all_snapshots",
fakes.stub_snapshot_get_all)
self.stub_out("nova.volume.cinder.API.get_snapshot",
fakes.stub_snapshot_get)
def _create_snapshot(self):
self.stub_out("nova.volume.cinder.API.create_snapshot",
fakes.stub_snapshot_create)
response = self._do_post("os-snapshots",
"snapshot-create-req",
self.create_subs)
return response
def test_snapshots_create(self):
response = self._create_snapshot()
self._verify_response("snapshot-create-resp",
self.create_subs, response, 200)
def test_snapshots_delete(self):
self.stub_out("nova.volume.cinder.API.delete_snapshot",
fakes.stub_snapshot_delete)
self._create_snapshot()
response = self._do_delete('os-snapshots/100')
self.assertEqual(202, response.status_code)
self.assertEqual('', response.text)
def test_snapshots_detail(self):
response = self._do_get('os-snapshots/detail')
self._verify_response('snapshots-detail-resp', {}, response, 200)
def test_snapshots_list(self):
response = self._do_get('os-snapshots')
self._verify_response('snapshots-list-resp', {}, response, 200)
def test_snapshots_show(self):
response = self._do_get('os-snapshots/100')
subs = {
'snapshot_name': 'Default name',
'description': 'Default description'
}
self._verify_response('snapshots-show-resp', subs, response, 200)
def _get_volume_id():
return 'a26887c6-c47b-4654-abb5-dfadf7d3f803'
@@ -130,7 +73,7 @@ class VolumesSampleJsonTest(test_servers.ServersSampleBase):
sample_dir = "os-volumes"
def setUp(self):
super(VolumesSampleJsonTest, self).setUp()
super().setUp()
fakes.stub_out_networking(self)
self.stub_out("nova.volume.cinder.API.delete",
@@ -187,143 +130,3 @@ class VolumesSampleJsonTest(test_servers.ServersSampleBase):
response = self._do_delete('os-volumes/%s' % vol_id)
self.assertEqual(202, response.status_code)
self.assertEqual('', response.text)
class VolumeAttachmentsSample(test_servers.ServersSampleBase):
# The 'os_compute_api:os-volumes-attachments:swap' policy is admin-only
ADMIN_API = True
sample_dir = "os-volumes"
OLD_VOLUME_ID = fixtures.CinderFixture.SWAP_OLD_VOL
NEW_VOLUME_ID = fixtures.CinderFixture.SWAP_NEW_VOL
def setUp(self):
super(VolumeAttachmentsSample, self).setUp()
self.cinder = self.useFixture(fixtures.CinderFixture(self))
self.server_id = self._post_server()
def _get_vol_attachment_subs(self, subs):
"""Allows subclasses to override/supplement request/response subs"""
return subs
def test_attach_volume_to_server(self):
subs = {
'volume_id': self.OLD_VOLUME_ID,
'device': '/dev/sdb'
}
subs = self._get_vol_attachment_subs(subs)
response = self._do_post('servers/%s/os-volume_attachments'
% self.server_id,
'attach-volume-to-server-req', subs)
self._verify_response('attach-volume-to-server-resp', subs,
response, 200)
return subs
def test_list_volume_attachments(self):
subs = self.test_attach_volume_to_server()
# Attach another volume to the server so the response has multiple
# which is more interesting since it's a list of dicts.
body = {
'volumeAttachment': {
'volumeId': self.NEW_VOLUME_ID
}
}
self.api.post_server_volume(self.server_id, body)
response = self._do_get('servers/%s/os-volume_attachments'
% self.server_id)
subs['volume_id2'] = self.NEW_VOLUME_ID
self._verify_response('list-volume-attachments-resp', subs,
response, 200)
def test_volume_attachment_detail(self):
subs = self.test_attach_volume_to_server()
response = self._do_get('servers/%s/os-volume_attachments/%s'
% (self.server_id, subs['volume_id']))
self._verify_response('volume-attachment-detail-resp', subs,
response, 200)
def test_volume_attachment_delete(self):
subs = self.test_attach_volume_to_server()
response = self._do_delete('servers/%s/os-volume_attachments/%s'
% (self.server_id, subs['volume_id']))
self.assertEqual(202, response.status_code)
self.assertEqual('', response.text)
def test_volume_attachment_update(self):
subs = self.test_attach_volume_to_server()
subs['new_volume_id'] = self.NEW_VOLUME_ID
response = self._do_put('servers/%s/os-volume_attachments/%s'
% (self.server_id, subs['volume_id']),
'update-volume-req',
subs)
self.assertEqual(202, response.status_code)
self.assertEqual('', response.text)
class VolumeAttachmentsSampleV249(VolumeAttachmentsSample):
sample_dir = "os-volumes"
microversion = '2.49'
scenarios = [('v2_49', {'api_major_version': 'v2.1'})]
def setUp(self):
super(VolumeAttachmentsSampleV249, self).setUp()
# Stub out ComputeManager._delete_disk_metadata since the fake virt
# driver does not actually update the instance.device_metadata.devices
# list with the tagged bdm disk device metadata.
self.stub_out('nova.compute.manager.ComputeManager.'
'_delete_disk_metadata', lambda *a, **kw: None)
def _get_vol_attachment_subs(self, subs):
return dict(subs, tag='foo')
class VolumeAttachmentsSampleV270(VolumeAttachmentsSampleV249):
"""2.70 adds the "tag" parameter to the response body"""
microversion = '2.70'
scenarios = [('v2_70', {'api_major_version': 'v2.1'})]
class VolumeAttachmentsSampleV279(VolumeAttachmentsSampleV270):
"""Microversion 2.79 adds the "delete_on_termination" parameter to the
request and response body.
"""
microversion = '2.79'
scenarios = [('v2_79', {'api_major_version': 'v2.1'})]
class UpdateVolumeAttachmentsSampleV285(VolumeAttachmentsSampleV279):
"""Microversion 2.85 adds the ``PUT
/servers/{server_id}/os-volume_attachments/{volume_id}``
support for specifying ``delete_on_termination`` field in the request
body to re-config the attached volume whether to delete when the instance
is deleted.
"""
microversion = '2.85'
scenarios = [('v2_85', {'api_major_version': 'v2.1'})]
def test_volume_attachment_update(self):
subs = self.test_attach_volume_to_server()
attached_volume_id = subs['volume_id']
subs['server_id'] = self.server_id
response = self._do_put('servers/%s/os-volume_attachments/%s'
% (self.server_id, attached_volume_id),
'update-volume-attachment-delete-flag-req',
subs)
self.assertEqual(202, response.status_code)
self.assertEqual('', response.text)
# Make sure the attached volume was changed
attachments = self.api.api_get(
'/servers/%s/os-volume_attachments' % self.server_id).body[
'volumeAttachments']
self.assertEqual(1, len(attachments))
self.assertEqual(self.server_id, attachments[0]['serverId'])
self.assertTrue(attachments[0]['delete_on_termination'])
class VolumeAttachmentsSampleV289(UpdateVolumeAttachmentsSampleV285):
"""Microversion 2.89 adds the "attachment_id" parameter to the
response body of show and list.
"""
microversion = '2.89'
scenarios = [('v2_89', {'api_major_version': 'v2.1'})]

View File

@@ -67,7 +67,7 @@ class TestLibvirtROMultiattachMigrate(
client.OpenStackApiException, self.api.put_server_volume,
server_id, self.cinder.MULTIATTACH_RO_SWAP_OLD_VOL,
self.cinder.MULTIATTACH_RO_SWAP_NEW_VOL)
self.assertIn("this api should only be called by Cinder", str(ex))
self.assertIn("this API should only be called by Cinder", str(ex))
def test_ro_multiattach_migrate_volume(self):
server_id = self._create_server(networks='none')['id']

View File

@@ -64,4 +64,4 @@ class TestDirectSwapVolume(
client.OpenStackApiException, self.api.put_server_volume,
server_id, self.cinder.MULTIATTACH_RO_SWAP_OLD_VOL,
self.cinder.MULTIATTACH_RO_SWAP_NEW_VOL)
self.assertIn("this api should only be called by Cinder", str(ex))
self.assertIn("this API should only be called by Cinder", str(ex))

View File

@@ -0,0 +1,255 @@
# Copyright 2013 Josh Durgin
# Copyright 2013 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from unittest import mock
import urllib
import fixtures
from oslo_serialization import jsonutils
from oslo_utils.fixture import uuidsentinel as uuids
import webob
from nova.api.openstack.compute import assisted_volume_snapshots \
as assisted_snaps_v21
from nova.compute import api as compute_api
from nova.compute import task_states
from nova import exception
from nova import test
from nova.tests.unit.api.openstack import fakes
FAKE_UUID = 'aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa'
class AssistedSnapshotCreateTestCaseV21(test.NoDBTestCase):
assisted_snaps = assisted_snaps_v21
bad_request = exception.ValidationError
def setUp(self):
super(AssistedSnapshotCreateTestCaseV21, self).setUp()
self.controller = \
self.assisted_snaps.AssistedVolumeSnapshotsController()
self.url = ('/v2/%s/os-assisted-volume-snapshots' %
fakes.FAKE_PROJECT_ID)
@mock.patch.object(compute_api.API, 'volume_snapshot_create')
def test_assisted_create(self, mock_volume_snapshot_create):
mock_volume_snapshot_create.return_value = {
'snapshot': {
'id': uuids.snapshot_id,
'volumeId': uuids.volume_id,
},
}
req = fakes.HTTPRequest.blank(self.url)
expected_create_info = {'type': 'qcow2',
'new_file': 'new_file',
'snapshot_id': 'snapshot_id'}
body = {'snapshot': {'volume_id': uuids.volume_to_snapshot,
'create_info': expected_create_info}}
req.method = 'POST'
self.controller.create(req, body=body)
mock_volume_snapshot_create.assert_called_once_with(
req.environ['nova.context'], uuids.volume_to_snapshot,
expected_create_info)
def test_assisted_create_missing_create_info(self):
req = fakes.HTTPRequest.blank(self.url)
body = {'snapshot': {'volume_id': '1'}}
req.method = 'POST'
self.assertRaises(self.bad_request, self.controller.create,
req, body=body)
def test_assisted_create_with_unexpected_attr(self):
req = fakes.HTTPRequest.blank(self.url)
body = {
'snapshot': {
'volume_id': '1',
'create_info': {
'type': 'qcow2',
'new_file': 'new_file',
'snapshot_id': 'snapshot_id'
}
},
'unexpected': 0,
}
req.method = 'POST'
self.assertRaises(self.bad_request, self.controller.create,
req, body=body)
@mock.patch('nova.objects.BlockDeviceMapping.get_by_volume',
side_effect=exception.VolumeBDMIsMultiAttach(volume_id='1'))
def test_assisted_create_multiattach_fails(self, bdm_get_by_volume):
req = fakes.HTTPRequest.blank(self.url)
body = {'snapshot':
{'volume_id': '1',
'create_info': {'type': 'qcow2',
'new_file': 'new_file',
'snapshot_id': 'snapshot_id'}}}
req.method = 'POST'
self.assertRaises(
webob.exc.HTTPBadRequest, self.controller.create, req, body=body)
def _test_assisted_create_instance_conflict(self, api_error):
req = fakes.HTTPRequest.blank(self.url)
body = {'snapshot':
{'volume_id': '1',
'create_info': {'type': 'qcow2',
'new_file': 'new_file',
'snapshot_id': 'snapshot_id'}}}
req.method = 'POST'
with mock.patch.object(compute_api.API, 'volume_snapshot_create',
side_effect=api_error):
self.assertRaises(
webob.exc.HTTPBadRequest, self.controller.create,
req, body=body)
def test_assisted_create_instance_invalid_state(self):
api_error = exception.InstanceInvalidState(
instance_uuid=FAKE_UUID, attr='task_state',
state=task_states.SHELVING_OFFLOADING,
method='volume_snapshot_create')
self._test_assisted_create_instance_conflict(api_error)
def test_assisted_create_instance_not_ready(self):
api_error = exception.InstanceNotReady(instance_id=FAKE_UUID)
self._test_assisted_create_instance_conflict(api_error)
class AssistedSnapshotDeleteTestCaseV21(test.NoDBTestCase):
assisted_snaps = assisted_snaps_v21
microversion = '2.1'
def _check_status(self, expected_status, req, res, controller_method):
self.assertEqual(expected_status, controller_method.wsgi_codes(req))
def setUp(self):
super(AssistedSnapshotDeleteTestCaseV21, self).setUp()
self.controller = \
self.assisted_snaps.AssistedVolumeSnapshotsController()
self.mock_volume_snapshot_delete = self.useFixture(
fixtures.MockPatchObject(compute_api.API,
'volume_snapshot_delete')).mock
self.url = ('/v2/%s/os-assisted-volume-snapshots' %
fakes.FAKE_PROJECT_ID)
def test_assisted_delete(self):
params = {
'delete_info': jsonutils.dumps({'volume_id': '1'}),
}
req = fakes.HTTPRequest.blank(
self.url + '?%s' %
urllib.parse.urlencode(params),
version=self.microversion)
req.method = 'DELETE'
result = self.controller.delete(req, '5')
self._check_status(204, req, result, self.controller.delete)
def test_assisted_delete_missing_delete_info(self):
req = fakes.HTTPRequest.blank(self.url,
version=self.microversion)
req.method = 'DELETE'
self.assertRaises(webob.exc.HTTPBadRequest, self.controller.delete,
req, '5')
def _test_assisted_delete_instance_conflict(self, api_error):
self.mock_volume_snapshot_delete.side_effect = api_error
params = {
'delete_info': jsonutils.dumps({'volume_id': '1'}),
}
req = fakes.HTTPRequest.blank(
self.url + '?%s' %
urllib.parse.urlencode(params),
version=self.microversion)
req.method = 'DELETE'
self.assertRaises(
webob.exc.HTTPBadRequest, self.controller.delete, req, '5')
def test_assisted_delete_instance_invalid_state(self):
api_error = exception.InstanceInvalidState(
instance_uuid=FAKE_UUID, attr='task_state',
state=task_states.UNSHELVING,
method='volume_snapshot_delete')
self._test_assisted_delete_instance_conflict(api_error)
def test_assisted_delete_instance_not_ready(self):
api_error = exception.InstanceNotReady(instance_id=FAKE_UUID)
self._test_assisted_delete_instance_conflict(api_error)
def test_delete_additional_query_parameters(self):
params = {
'delete_info': jsonutils.dumps({'volume_id': '1'}),
'additional': 123
}
req = fakes.HTTPRequest.blank(
self.url + '?%s' %
urllib.parse.urlencode(params),
version=self.microversion)
req.method = 'DELETE'
self.controller.delete(req, '5')
def test_delete_duplicate_query_parameters_validation(self):
params = [
('delete_info', jsonutils.dumps({'volume_id': '1'})),
('delete_info', jsonutils.dumps({'volume_id': '2'}))
]
req = fakes.HTTPRequest.blank(
self.url + '?%s' %
urllib.parse.urlencode(params),
version=self.microversion)
req.method = 'DELETE'
self.controller.delete(req, '5')
def test_assisted_delete_missing_volume_id(self):
params = {
'delete_info': jsonutils.dumps({'something_else': '1'}),
}
req = fakes.HTTPRequest.blank(
self.url + '?%s' %
urllib.parse.urlencode(params),
version=self.microversion)
req.method = 'DELETE'
ex = self.assertRaises(webob.exc.HTTPBadRequest,
self.controller.delete, req, '5')
# This is the result of a KeyError but the only thing in the message
# is the missing key.
self.assertIn('volume_id', str(ex))
class AssistedSnapshotDeleteTestCaseV275(AssistedSnapshotDeleteTestCaseV21):
assisted_snaps = assisted_snaps_v21
microversion = '2.75'
def test_delete_additional_query_parameters_old_version(self):
params = {
'delete_info': jsonutils.dumps({'volume_id': '1'}),
'additional': 123
}
req = fakes.HTTPRequest.blank(
self.url + '?%s' %
urllib.parse.urlencode(params),
version='2.74')
self.controller.delete(req, 1)
def test_delete_additional_query_parameters(self):
req = fakes.HTTPRequest.blank(
self.url + '?unknown=1',
version=self.microversion)
self.assertRaises(exception.ValidationError,
self.controller.delete, req, 1)

View File

@@ -17,7 +17,7 @@ from unittest import mock
import webob
from nova.api.openstack.compute import volumes as volumes_v21
from nova.api.openstack.compute import snapshots
from nova import exception
from nova import test
from nova.tests.unit.api.openstack import fakes
@@ -27,11 +27,9 @@ FAKE_UUID = 'aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa'
class SnapshotApiTestV21(test.NoDBTestCase):
controller = volumes_v21.SnapshotController()
validation_error = exception.ValidationError
def setUp(self):
super(SnapshotApiTestV21, self).setUp()
super().setUp()
fakes.stub_out_networking(self)
self.stub_out("nova.volume.cinder.API.create_snapshot",
fakes.stub_snapshot_create)
@@ -44,6 +42,7 @@ class SnapshotApiTestV21(test.NoDBTestCase):
self.stub_out("nova.volume.cinder.API.get_all_snapshots",
fakes.stub_snapshot_get_all)
self.stub_out("nova.volume.cinder.API.get", fakes.stub_volume_get)
self.controller = snapshots.SnapshotController()
self.req = fakes.HTTPRequest.blank('')
def _test_snapshot_create(self, force):
@@ -70,9 +69,26 @@ class SnapshotApiTestV21(test.NoDBTestCase):
def test_snapshot_create_invalid_force_param(self):
body = {'snapshot': {'volume_id': '1',
'force': '**&&^^%%$$##@@'}}
self.assertRaises(self.validation_error,
self.assertRaises(exception.ValidationError,
self.controller.create, self.req, body=body)
def test_create_no_body(self):
self.assertRaises(
exception.ValidationError,
self.controller.create, self.req, body=None)
def test_create_missing_volume(self):
body = {'foo': {'a': 'b'}}
self.assertRaises(
exception.ValidationError,
self.controller.create, self.req, body=body)
def test_create_malformed_entity(self):
body = {'snapshot': 'string'}
self.assertRaises(
exception.ValidationError,
self.controller.create, self.req, body=body)
def test_snapshot_delete(self):
snapshot_id = '123'
delete = self.controller.delete
@@ -80,7 +96,7 @@ class SnapshotApiTestV21(test.NoDBTestCase):
# NOTE: on v2.1, http status code is set as wsgi_codes of API
# method instead of status_int in a response object.
if isinstance(self.controller, volumes_v21.SnapshotController):
if isinstance(self.controller, snapshots.SnapshotController):
status_int = delete.wsgi_codes(self.req)
else:
status_int = result.status_int
@@ -224,8 +240,8 @@ class SnapshotApiTestV21(test.NoDBTestCase):
class TestSnapshotAPIDeprecation(test.NoDBTestCase):
def setUp(self):
super(TestSnapshotAPIDeprecation, self).setUp()
self.controller = volumes_v21.SnapshotController()
super().setUp()
self.controller = snapshots.SnapshotController()
self.req = fakes.HTTPRequest.blank('', version='2.36')
def test_all_apis_return_not_found(self):

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,242 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from unittest import mock
from oslo_utils.fixture import uuidsentinel as uuids
from nova.api.openstack.compute import snapshots
from nova.policies import base as base_policy
from nova.policies import volumes as v_policies
from nova.tests.unit.api.openstack import fakes
from nova.tests.unit.policies import base
class SnapshotsPolicyTest(base.BasePolicyTest):
"""Test Snapshots APIs policies with all possible context.
This class defines the set of context with different roles
which are allowed and not allowed to pass the policy checks.
With those set of context, it will call the API operation and
verify the expected behaviour.
"""
def setUp(self):
super().setUp()
self.snapshot_ctlr = snapshots.SnapshotController()
self.req = fakes.HTTPRequest.blank('')
# Everyone will be able to perform crud operations
# on volume and volume snapshots.
# NOTE: Nova cannot verify the volume/snapshot owner during nova policy
# enforcement so will be passing context's project_id as target to
# policy and always pass. If requester is not admin or owner
# of volume/snapshot then cinder will be returning the appropriate
# error.
self.project_member_authorized_contexts = [
self.legacy_admin_context, self.system_admin_context,
self.project_admin_context, self.project_manager_context,
self.project_member_context, self.project_reader_context,
self.project_foo_context,
self.other_project_reader_context,
self.system_member_context, self.system_reader_context,
self.system_foo_context,
self.other_project_manager_context,
self.other_project_member_context
]
self.project_reader_authorized_contexts = [
self.legacy_admin_context, self.system_admin_context,
self.project_admin_context, self.project_manager_context,
self.project_member_context, self.project_reader_context,
self.project_foo_context,
self.other_project_reader_context,
self.system_member_context, self.system_reader_context,
self.system_foo_context,
self.other_project_manager_context,
self.other_project_member_context
]
@mock.patch('nova.volume.cinder.API.get_all_snapshots')
def test_list_snapshots_policy(self, mock_get):
rule_name = "os_compute_api:os-volumes:snapshots:list"
self.common_policy_auth(self.project_reader_authorized_contexts,
rule_name, self.snapshot_ctlr.index,
self.req)
@mock.patch('nova.volume.cinder.API.get_all_snapshots')
def test_list_detail_snapshots_policy(self, mock_get):
rule_name = "os_compute_api:os-volumes:snapshots:detail"
self.common_policy_auth(self.project_reader_authorized_contexts,
rule_name, self.snapshot_ctlr.detail,
self.req)
@mock.patch('nova.volume.cinder.API.get_snapshot')
def test_show_snapshot_policy(self, mock_get):
rule_name = "os_compute_api:os-volumes:snapshots:show"
self.common_policy_auth(self.project_reader_authorized_contexts,
rule_name, self.snapshot_ctlr.show,
self.req, uuids.fake_id)
@mock.patch('nova.volume.cinder.API.create_snapshot')
def test_create_snapshot_policy(self, mock_create):
rule_name = "os_compute_api:os-volumes:snapshots:create"
body = {"snapshot": {"volume_id": uuids.fake_id}}
self.common_policy_auth(self.project_member_authorized_contexts,
rule_name, self.snapshot_ctlr.create,
self.req, body=body)
@mock.patch('nova.volume.cinder.API.delete_snapshot')
def test_delete_snapshot_policy(self, mock_delete):
rule_name = "os_compute_api:os-volumes:snapshots:delete"
self.common_policy_auth(self.project_member_authorized_contexts,
rule_name, self.snapshot_ctlr.delete,
self.req, uuids.fake_id)
class SnapshotsNoLegacyNoScopePolicyTest(SnapshotsPolicyTest):
"""Test Snapshot APIs policies with no legacy deprecated rules
and no scope checks which means new defaults only.
"""
without_deprecated_rules = True
rules_without_deprecation = {
v_policies.POLICY_NAME % 'list':
base_policy.PROJECT_READER_OR_ADMIN,
v_policies.POLICY_NAME % 'detail':
base_policy.PROJECT_READER_OR_ADMIN,
v_policies.POLICY_NAME % 'show':
base_policy.PROJECT_READER_OR_ADMIN,
v_policies.POLICY_NAME % 'create':
base_policy.PROJECT_MEMBER_OR_ADMIN,
v_policies.POLICY_NAME % 'delete':
base_policy.PROJECT_MEMBER_OR_ADMIN,
v_policies.POLICY_NAME % 'snapshots:list':
base_policy.PROJECT_READER_OR_ADMIN,
v_policies.POLICY_NAME % 'snapshots:detail':
base_policy.PROJECT_READER_OR_ADMIN,
v_policies.POLICY_NAME % 'snapshots:delete':
base_policy.PROJECT_MEMBER_OR_ADMIN,
v_policies.POLICY_NAME % 'snapshots:create':
base_policy.PROJECT_MEMBER_OR_ADMIN,
v_policies.POLICY_NAME % 'snapshots:show':
base_policy.PROJECT_READER_OR_ADMIN,
}
def setUp(self):
super().setUp()
# With no legacy, project other roles like foo will not be able
# to operate on volume and snapshot.
self.project_member_authorized_contexts = [
self.legacy_admin_context, self.system_admin_context,
self.project_admin_context, self.project_manager_context,
self.project_member_context, self.system_member_context,
self.other_project_manager_context,
self.other_project_member_context
]
self.project_reader_authorized_contexts = [
self.legacy_admin_context, self.system_admin_context,
self.project_admin_context, self.project_manager_context,
self.project_member_context, self.project_reader_context,
self.other_project_reader_context,
self.system_member_context, self.system_reader_context,
self.other_project_manager_context,
self.other_project_member_context
]
class SnapshotsScopeTypePolicyTest(SnapshotsPolicyTest):
"""Test Snapshots APIs policies with system scope enabled.
This class set the nova.conf [oslo_policy] enforce_scope to True
so that we can switch on the scope checking on oslo policy side.
It defines the set of context with scoped token
which are allowed and not allowed to pass the policy checks.
With those set of context, it will run the API operation and
verify the expected behaviour.
"""
def setUp(self):
super().setUp()
self.flags(enforce_scope=True, group="oslo_policy")
# With scope enabled, system users will not be able to
# operate on volume and snapshot.
self.project_member_authorized_contexts = [
self.legacy_admin_context, self.project_admin_context,
self.project_manager_context,
self.project_member_context,
self.project_reader_context, self.project_foo_context,
self.other_project_reader_context,
self.other_project_manager_context,
self.other_project_member_context
]
self.project_reader_authorized_contexts = [
self.legacy_admin_context, self.project_admin_context,
self.project_manager_context,
self.project_member_context,
self.project_reader_context, self.project_foo_context,
self.other_project_reader_context,
self.other_project_manager_context,
self.other_project_member_context
]
class SnapshotsScopeTypeNoLegacyPolicyTest(SnapshotsScopeTypePolicyTest):
"""Test Snapshot APIs policies with system scope enabled,
and no legacy deprecated rules.
"""
without_deprecated_rules = True
rules_without_deprecation = {
v_policies.POLICY_NAME % 'list':
base_policy.PROJECT_READER_OR_ADMIN,
v_policies.POLICY_NAME % 'detail':
base_policy.PROJECT_READER_OR_ADMIN,
v_policies.POLICY_NAME % 'show':
base_policy.PROJECT_READER_OR_ADMIN,
v_policies.POLICY_NAME % 'create':
base_policy.PROJECT_MEMBER_OR_ADMIN,
v_policies.POLICY_NAME % 'delete':
base_policy.PROJECT_MEMBER_OR_ADMIN,
v_policies.POLICY_NAME % 'snapshots:list':
base_policy.PROJECT_READER_OR_ADMIN,
v_policies.POLICY_NAME % 'snapshots:detail':
base_policy.PROJECT_READER_OR_ADMIN,
v_policies.POLICY_NAME % 'snapshots:delete':
base_policy.PROJECT_MEMBER_OR_ADMIN,
v_policies.POLICY_NAME % 'snapshots:create':
base_policy.PROJECT_MEMBER_OR_ADMIN,
v_policies.POLICY_NAME % 'snapshots:show':
base_policy.PROJECT_READER_OR_ADMIN,
}
def setUp(self):
super().setUp()
self.flags(enforce_scope=True, group="oslo_policy")
# With no legacy and scope enabled, system users and project
# other roles like foo will not be able to operate on volume
# and snapshot.
self.project_member_authorized_contexts = [
self.legacy_admin_context, self.project_admin_context,
self.project_manager_context,
self.project_member_context,
self.other_project_manager_context,
self.other_project_member_context
]
self.project_reader_authorized_contexts = [
self.legacy_admin_context, self.project_admin_context,
self.project_manager_context,
self.project_member_context,
self.project_reader_context,
self.other_project_manager_context,
self.other_project_reader_context,
self.other_project_member_context
]

View File

@@ -0,0 +1,276 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from unittest import mock
import fixtures
from oslo_utils.fixture import uuidsentinel as uuids
from oslo_utils import timeutils
from nova.api.openstack.compute import volume_attachments
from nova.compute import vm_states
from nova import exception
from nova import objects
from nova.objects import block_device as block_device_obj
from nova.policies import volumes_attachments as va_policies
from nova.tests.unit.api.openstack import fakes
from nova.tests.unit import fake_block_device
from nova.tests.unit import fake_instance
from nova.tests.unit.policies import base
# This is the server ID.
FAKE_UUID = 'aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa'
# This is the old volume ID (to swap from).
FAKE_UUID_A = '00000000-aaaa-aaaa-aaaa-000000000000'
# This is the new volume ID (to swap to).
FAKE_UUID_B = 'bbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbbbb'
def fake_bdm_get_by_volume_and_instance(cls, ctxt, volume_id, instance_uuid):
if volume_id not in (FAKE_UUID_A, uuids.source_swap_vol):
raise exception.VolumeBDMNotFound(volume_id=volume_id)
db_bdm = fake_block_device.FakeDbBlockDeviceDict(
{'id': 1,
'instance_uuid': instance_uuid,
'device_name': '/dev/fake0',
'delete_on_termination': 'False',
'source_type': 'volume',
'destination_type': 'volume',
'snapshot_id': None,
'volume_id': volume_id,
'volume_size': 1})
return objects.BlockDeviceMapping._from_db_object(
ctxt, objects.BlockDeviceMapping(), db_bdm)
def fake_get_volume(self, context, id):
migration_status = None
if id == FAKE_UUID_A:
status = 'in-use'
attach_status = 'attached'
elif id == FAKE_UUID_B:
status = 'available'
attach_status = 'detached'
elif id == uuids.source_swap_vol:
status = 'in-use'
attach_status = 'attached'
migration_status = 'migrating'
else:
raise exception.VolumeNotFound(volume_id=id)
return {
'id': id, 'status': status, 'attach_status': attach_status,
'migration_status': migration_status
}
class VolumeAttachPolicyTest(base.BasePolicyTest):
"""Test os-volumes-attachments APIs policies with all possible context.
This class defines the set of context with different roles
which are allowed and not allowed to pass the policy checks.
With those set of context, it will call the API operation and
verify the expected behaviour.
"""
def setUp(self):
super().setUp()
self.controller = volume_attachments.VolumeAttachmentController()
self.req = fakes.HTTPRequest.blank('')
self.policy_root = va_policies.POLICY_ROOT
self.stub_out('nova.objects.BlockDeviceMapping'
'.get_by_volume_and_instance',
fake_bdm_get_by_volume_and_instance)
self.stub_out('nova.volume.cinder.API.get', fake_get_volume)
self.mock_get = self.useFixture(
fixtures.MockPatch('nova.api.openstack.common.get_instance')).mock
uuid = uuids.fake_id
self.instance = fake_instance.fake_instance_obj(
self.project_member_context,
id=1, uuid=uuid, project_id=self.project_id,
vm_state=vm_states.ACTIVE,
task_state=None, launched_at=timeutils.utcnow())
self.mock_get.return_value = self.instance
# With legacy rule and no scope checks, all admin, project members
# project reader or other project role(because legacy rule allow
# resource owner- having same project id and no role check) is
# able create/delete/update the volume attachment.
self.project_member_authorized_contexts = [
self.legacy_admin_context, self.system_admin_context,
self.project_admin_context, self.project_manager_context,
self.project_member_context, self.project_reader_context,
self.project_foo_context]
# With legacy rule and no scope checks, all admin, project members
# project reader or other project role(because legacy rule allow
# resource owner- having same project id and no role check) is
# able get the volume attachment.
self.project_reader_authorized_contexts = (
self.project_member_authorized_contexts)
# By default, legacy rule are enable and scope check is disabled.
# system admin, legacy admin, and project admin is able to update
# volume attachment with a different volumeId.
self.project_admin_authorized_contexts = [
self.legacy_admin_context, self.system_admin_context,
self.project_admin_context]
@mock.patch.object(objects.BlockDeviceMappingList, 'get_by_instance_uuid')
def test_index_volume_attach_policy(self, mock_get_instance):
rule_name = self.policy_root % "index"
self.common_policy_auth(self.project_reader_authorized_contexts,
rule_name, self.controller.index,
self.req, FAKE_UUID)
def test_show_volume_attach_policy(self):
rule_name = self.policy_root % "show"
self.common_policy_auth(self.project_reader_authorized_contexts,
rule_name, self.controller.show,
self.req, FAKE_UUID, FAKE_UUID_A)
@mock.patch('nova.compute.api.API.attach_volume')
def test_create_volume_attach_policy(self, mock_attach_volume):
rule_name = self.policy_root % "create"
body = {'volumeAttachment': {'volumeId': FAKE_UUID_B,
'device': '/dev/fake'}}
self.common_policy_auth(self.project_member_authorized_contexts,
rule_name, self.controller.create,
self.req, FAKE_UUID, body=body)
@mock.patch.object(block_device_obj.BlockDeviceMapping, 'save')
def test_update_volume_attach_policy(self, mock_bdm_save):
rule_name = self.policy_root % "update"
req = fakes.HTTPRequest.blank('', version='2.85')
body = {'volumeAttachment': {
'volumeId': FAKE_UUID_A,
'delete_on_termination': True}}
self.common_policy_auth(self.project_member_authorized_contexts,
rule_name, self.controller.update,
req, FAKE_UUID,
FAKE_UUID_A, body=body)
@mock.patch('nova.compute.api.API.detach_volume')
def test_delete_volume_attach_policy(self, mock_detach_volume):
rule_name = self.policy_root % "delete"
self.common_policy_auth(self.project_member_authorized_contexts,
rule_name, self.controller.delete,
self.req, FAKE_UUID, FAKE_UUID_A)
@mock.patch('nova.compute.api.API.swap_volume')
def test_swap_volume_attach_policy(self, mock_swap_volume):
rule_name = self.policy_root % "swap"
body = {'volumeAttachment': {'volumeId': FAKE_UUID_B}}
self.common_policy_auth(
self.project_admin_authorized_contexts,
rule_name, self.controller.update,
self.req, FAKE_UUID, uuids.source_swap_vol, body=body)
@mock.patch.object(block_device_obj.BlockDeviceMapping, 'save')
@mock.patch('nova.compute.api.API.swap_volume')
def test_swap_volume_attach_policy_failed(self,
mock_swap_volume,
mock_bdm_save):
"""Policy check fails for swap + update due to swap policy failure.
"""
rule_name = self.policy_root % "swap"
req = fakes.HTTPRequest.blank('', version='2.85')
req.environ['nova.context'].user_id = 'other-user'
self.policy.set_rules({rule_name: "user_id:%(user_id)s"})
body = {'volumeAttachment': {'volumeId': FAKE_UUID_B,
'delete_on_termination': True}}
exc = self.assertRaises(
exception.PolicyNotAuthorized, self.controller.update,
req, FAKE_UUID, FAKE_UUID_A, body=body)
self.assertEqual(
"Policy doesn't allow %s to be performed." % rule_name,
exc.format_message())
mock_swap_volume.assert_not_called()
mock_bdm_save.assert_not_called()
@mock.patch.object(block_device_obj.BlockDeviceMapping, 'save')
@mock.patch('nova.compute.api.API.swap_volume')
def test_pass_swap_and_update_volume_attach_policy(self,
mock_swap_volume,
mock_bdm_save):
rule_name = self.policy_root % "swap"
req = fakes.HTTPRequest.blank('', version='2.85')
body = {'volumeAttachment': {'volumeId': FAKE_UUID_B,
'delete_on_termination': True}}
self.common_policy_auth(
self.project_admin_authorized_contexts,
rule_name, self.controller.update,
req, FAKE_UUID, uuids.source_swap_vol, body=body)
mock_swap_volume.assert_called()
mock_bdm_save.assert_called()
class VolumeAttachNoLegacyNoScopePolicyTest(VolumeAttachPolicyTest):
"""Test volume attachment APIs policies with no legacy deprecated rules
and no scope checks which means new defaults only.
"""
without_deprecated_rules = True
def setUp(self):
super().setUp()
# With no legacy rule, only admin, member, or reader will be
# able to perform volume attachment operation on its own project.
self.project_member_authorized_contexts = (
self.project_member_or_admin_with_no_scope_no_legacy)
self.project_reader_authorized_contexts = (
self.project_reader_or_admin_with_no_scope_no_legacy)
class VolumeAttachScopeTypePolicyTest(VolumeAttachPolicyTest):
"""Test os-volume-attachments APIs policies with system scope enabled.
This class set the nova.conf [oslo_policy] enforce_scope to True
so that we can switch on the scope checking on oslo policy side.
It defines the set of context with scoped token
which are allowed and not allowed to pass the policy checks.
With those set of context, it will run the API operation and
verify the expected behaviour.
"""
def setUp(self):
super().setUp()
self.flags(enforce_scope=True, group="oslo_policy")
# Scope enable will not allow system admin to perform the
# volume attachments.
self.project_member_authorized_contexts = (
self.project_m_r_or_admin_with_scope_and_legacy)
self.project_reader_authorized_contexts = (
self.project_m_r_or_admin_with_scope_and_legacy)
self.project_admin_authorized_contexts = [
self.legacy_admin_context, self.project_admin_context]
class VolumeAttachScopeTypeNoLegacyPolicyTest(VolumeAttachScopeTypePolicyTest):
"""Test os-volume-attachments APIs policies with system scope enabled,
and no legacy deprecated rules.
"""
without_deprecated_rules = True
def setUp(self):
super().setUp()
self.flags(enforce_scope=True, group="oslo_policy")
# With scope enable and no legacy rule, it will not allow
# system users and project admin/member/reader will be able to
# perform volume attachment operation on its own project.
self.project_member_authorized_contexts = (
self.project_member_or_admin_with_scope_no_legacy)
self.project_reader_authorized_contexts = (
self.project_reader_or_admin_with_scope_no_legacy)

View File

@@ -12,271 +12,14 @@
from unittest import mock
import fixtures
from oslo_utils.fixture import uuidsentinel as uuids
from oslo_utils import timeutils
from nova.api.openstack.compute import volumes as volumes_v21
from nova.compute import vm_states
from nova import exception
from nova import objects
from nova.objects import block_device as block_device_obj
from nova.api.openstack.compute import volumes
from nova.policies import base as base_policy
from nova.policies import volumes as v_policies
from nova.policies import volumes_attachments as va_policies
from nova.tests.unit.api.openstack import fakes
from nova.tests.unit import fake_block_device
from nova.tests.unit import fake_instance
from nova.tests.unit.policies import base
# This is the server ID.
FAKE_UUID = 'aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa'
# This is the old volume ID (to swap from).
FAKE_UUID_A = '00000000-aaaa-aaaa-aaaa-000000000000'
# This is the new volume ID (to swap to).
FAKE_UUID_B = 'bbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbbbb'
def fake_bdm_get_by_volume_and_instance(cls, ctxt, volume_id, instance_uuid):
if volume_id not in (FAKE_UUID_A, uuids.source_swap_vol):
raise exception.VolumeBDMNotFound(volume_id=volume_id)
db_bdm = fake_block_device.FakeDbBlockDeviceDict(
{'id': 1,
'instance_uuid': instance_uuid,
'device_name': '/dev/fake0',
'delete_on_termination': 'False',
'source_type': 'volume',
'destination_type': 'volume',
'snapshot_id': None,
'volume_id': volume_id,
'volume_size': 1})
return objects.BlockDeviceMapping._from_db_object(
ctxt, objects.BlockDeviceMapping(), db_bdm)
def fake_get_volume(self, context, id):
migration_status = None
if id == FAKE_UUID_A:
status = 'in-use'
attach_status = 'attached'
elif id == FAKE_UUID_B:
status = 'available'
attach_status = 'detached'
elif id == uuids.source_swap_vol:
status = 'in-use'
attach_status = 'attached'
migration_status = 'migrating'
else:
raise exception.VolumeNotFound(volume_id=id)
return {
'id': id, 'status': status, 'attach_status': attach_status,
'migration_status': migration_status
}
class VolumeAttachPolicyTest(base.BasePolicyTest):
"""Test os-volumes-attachments APIs policies with all possible context.
This class defines the set of context with different roles
which are allowed and not allowed to pass the policy checks.
With those set of context, it will call the API operation and
verify the expected behaviour.
"""
def setUp(self):
super(VolumeAttachPolicyTest, self).setUp()
self.controller = volumes_v21.VolumeAttachmentController()
self.req = fakes.HTTPRequest.blank('')
self.policy_root = va_policies.POLICY_ROOT
self.stub_out('nova.objects.BlockDeviceMapping'
'.get_by_volume_and_instance',
fake_bdm_get_by_volume_and_instance)
self.stub_out('nova.volume.cinder.API.get', fake_get_volume)
self.mock_get = self.useFixture(
fixtures.MockPatch('nova.api.openstack.common.get_instance')).mock
uuid = uuids.fake_id
self.instance = fake_instance.fake_instance_obj(
self.project_member_context,
id=1, uuid=uuid, project_id=self.project_id,
vm_state=vm_states.ACTIVE,
task_state=None, launched_at=timeutils.utcnow())
self.mock_get.return_value = self.instance
# With legacy rule and no scope checks, all admin, project members
# project reader or other project role(because legacy rule allow
# resource owner- having same project id and no role check) is
# able create/delete/update the volume attachment.
self.project_member_authorized_contexts = [
self.legacy_admin_context, self.system_admin_context,
self.project_admin_context, self.project_manager_context,
self.project_member_context, self.project_reader_context,
self.project_foo_context]
# With legacy rule and no scope checks, all admin, project members
# project reader or other project role(because legacy rule allow
# resource owner- having same project id and no role check) is
# able get the volume attachment.
self.project_reader_authorized_contexts = (
self.project_member_authorized_contexts)
# By default, legacy rule are enable and scope check is disabled.
# system admin, legacy admin, and project admin is able to update
# volume attachment with a different volumeId.
self.project_admin_authorized_contexts = [
self.legacy_admin_context, self.system_admin_context,
self.project_admin_context]
@mock.patch.object(objects.BlockDeviceMappingList, 'get_by_instance_uuid')
def test_index_volume_attach_policy(self, mock_get_instance):
rule_name = self.policy_root % "index"
self.common_policy_auth(self.project_reader_authorized_contexts,
rule_name, self.controller.index,
self.req, FAKE_UUID)
def test_show_volume_attach_policy(self):
rule_name = self.policy_root % "show"
self.common_policy_auth(self.project_reader_authorized_contexts,
rule_name, self.controller.show,
self.req, FAKE_UUID, FAKE_UUID_A)
@mock.patch('nova.compute.api.API.attach_volume')
def test_create_volume_attach_policy(self, mock_attach_volume):
rule_name = self.policy_root % "create"
body = {'volumeAttachment': {'volumeId': FAKE_UUID_B,
'device': '/dev/fake'}}
self.common_policy_auth(self.project_member_authorized_contexts,
rule_name, self.controller.create,
self.req, FAKE_UUID, body=body)
@mock.patch.object(block_device_obj.BlockDeviceMapping, 'save')
def test_update_volume_attach_policy(self, mock_bdm_save):
rule_name = self.policy_root % "update"
req = fakes.HTTPRequest.blank('', version='2.85')
body = {'volumeAttachment': {
'volumeId': FAKE_UUID_A,
'delete_on_termination': True}}
self.common_policy_auth(self.project_member_authorized_contexts,
rule_name, self.controller.update,
req, FAKE_UUID,
FAKE_UUID_A, body=body)
@mock.patch('nova.compute.api.API.detach_volume')
def test_delete_volume_attach_policy(self, mock_detach_volume):
rule_name = self.policy_root % "delete"
self.common_policy_auth(self.project_member_authorized_contexts,
rule_name, self.controller.delete,
self.req, FAKE_UUID, FAKE_UUID_A)
@mock.patch('nova.compute.api.API.swap_volume')
def test_swap_volume_attach_policy(self, mock_swap_volume):
rule_name = self.policy_root % "swap"
body = {'volumeAttachment': {'volumeId': FAKE_UUID_B}}
self.common_policy_auth(
self.project_admin_authorized_contexts,
rule_name, self.controller.update,
self.req, FAKE_UUID, uuids.source_swap_vol, body=body)
@mock.patch.object(block_device_obj.BlockDeviceMapping, 'save')
@mock.patch('nova.compute.api.API.swap_volume')
def test_swap_volume_attach_policy_failed(self,
mock_swap_volume,
mock_bdm_save):
"""Policy check fails for swap + update due to swap policy failure.
"""
rule_name = self.policy_root % "swap"
req = fakes.HTTPRequest.blank('', version='2.85')
req.environ['nova.context'].user_id = 'other-user'
self.policy.set_rules({rule_name: "user_id:%(user_id)s"})
body = {'volumeAttachment': {'volumeId': FAKE_UUID_B,
'delete_on_termination': True}}
exc = self.assertRaises(
exception.PolicyNotAuthorized, self.controller.update,
req, FAKE_UUID, FAKE_UUID_A, body=body)
self.assertEqual(
"Policy doesn't allow %s to be performed." % rule_name,
exc.format_message())
mock_swap_volume.assert_not_called()
mock_bdm_save.assert_not_called()
@mock.patch.object(block_device_obj.BlockDeviceMapping, 'save')
@mock.patch('nova.compute.api.API.swap_volume')
def test_pass_swap_and_update_volume_attach_policy(self,
mock_swap_volume,
mock_bdm_save):
rule_name = self.policy_root % "swap"
req = fakes.HTTPRequest.blank('', version='2.85')
body = {'volumeAttachment': {'volumeId': FAKE_UUID_B,
'delete_on_termination': True}}
self.common_policy_auth(
self.project_admin_authorized_contexts,
rule_name, self.controller.update,
req, FAKE_UUID, uuids.source_swap_vol, body=body)
mock_swap_volume.assert_called()
mock_bdm_save.assert_called()
class VolumeAttachNoLegacyNoScopePolicyTest(VolumeAttachPolicyTest):
"""Test volume attachment APIs policies with no legacy deprecated rules
and no scope checks which means new defaults only.
"""
without_deprecated_rules = True
def setUp(self):
super(VolumeAttachNoLegacyNoScopePolicyTest, self).setUp()
# With no legacy rule, only admin, member, or reader will be
# able to perform volume attachment operation on its own project.
self.project_member_authorized_contexts = (
self.project_member_or_admin_with_no_scope_no_legacy)
self.project_reader_authorized_contexts = (
self.project_reader_or_admin_with_no_scope_no_legacy)
class VolumeAttachScopeTypePolicyTest(VolumeAttachPolicyTest):
"""Test os-volume-attachments APIs policies with system scope enabled.
This class set the nova.conf [oslo_policy] enforce_scope to True
so that we can switch on the scope checking on oslo policy side.
It defines the set of context with scoped token
which are allowed and not allowed to pass the policy checks.
With those set of context, it will run the API operation and
verify the expected behaviour.
"""
def setUp(self):
super(VolumeAttachScopeTypePolicyTest, self).setUp()
self.flags(enforce_scope=True, group="oslo_policy")
# Scope enable will not allow system admin to perform the
# volume attachments.
self.project_member_authorized_contexts = (
self.project_m_r_or_admin_with_scope_and_legacy)
self.project_reader_authorized_contexts = (
self.project_m_r_or_admin_with_scope_and_legacy)
self.project_admin_authorized_contexts = [
self.legacy_admin_context, self.project_admin_context]
class VolumeAttachScopeTypeNoLegacyPolicyTest(VolumeAttachScopeTypePolicyTest):
"""Test os-volume-attachments APIs policies with system scope enabled,
and no legacy deprecated rules.
"""
without_deprecated_rules = True
def setUp(self):
super(VolumeAttachScopeTypeNoLegacyPolicyTest, self).setUp()
self.flags(enforce_scope=True, group="oslo_policy")
# With scope enable and no legacy rule, it will not allow
# system users and project admin/member/reader will be able to
# perform volume attachment operation on its own project.
self.project_member_authorized_contexts = (
self.project_member_or_admin_with_scope_no_legacy)
self.project_reader_authorized_contexts = (
self.project_reader_or_admin_with_scope_no_legacy)
class VolumesPolicyTest(base.BasePolicyTest):
"""Test Volumes APIs policies with all possible context.
@@ -288,9 +31,8 @@ class VolumesPolicyTest(base.BasePolicyTest):
"""
def setUp(self):
super(VolumesPolicyTest, self).setUp()
self.controller = volumes_v21.VolumeController()
self.snapshot_ctlr = volumes_v21.SnapshotController()
super().setUp()
self.controller = volumes.VolumeController()
self.req = fakes.HTTPRequest.blank('')
self.controller._translate_volume_summary_view = mock.MagicMock()
# Everyone will be able to perform crud operations
@@ -364,42 +106,6 @@ class VolumesPolicyTest(base.BasePolicyTest):
rule_name, self.controller.delete,
self.req, uuids.fake_id)
@mock.patch('nova.volume.cinder.API.get_all_snapshots')
def test_list_snapshots_policy(self, mock_get):
rule_name = "os_compute_api:os-volumes:snapshots:list"
self.common_policy_auth(self.project_reader_authorized_contexts,
rule_name, self.snapshot_ctlr.index,
self.req)
@mock.patch('nova.volume.cinder.API.get_all_snapshots')
def test_list_detail_snapshots_policy(self, mock_get):
rule_name = "os_compute_api:os-volumes:snapshots:detail"
self.common_policy_auth(self.project_reader_authorized_contexts,
rule_name, self.snapshot_ctlr.detail,
self.req)
@mock.patch('nova.volume.cinder.API.get_snapshot')
def test_show_snapshot_policy(self, mock_get):
rule_name = "os_compute_api:os-volumes:snapshots:show"
self.common_policy_auth(self.project_reader_authorized_contexts,
rule_name, self.snapshot_ctlr.show,
self.req, uuids.fake_id)
@mock.patch('nova.volume.cinder.API.create_snapshot')
def test_create_snapshot_policy(self, mock_create):
rule_name = "os_compute_api:os-volumes:snapshots:create"
body = {"snapshot": {"volume_id": uuids.fake_id}}
self.common_policy_auth(self.project_member_authorized_contexts,
rule_name, self.snapshot_ctlr.create,
self.req, body=body)
@mock.patch('nova.volume.cinder.API.delete_snapshot')
def test_delete_snapshot_policy(self, mock_delete):
rule_name = "os_compute_api:os-volumes:snapshots:delete"
self.common_policy_auth(self.project_member_authorized_contexts,
rule_name, self.snapshot_ctlr.delete,
self.req, uuids.fake_id)
class VolumesNoLegacyNoScopePolicyTest(VolumesPolicyTest):
"""Test Volume APIs policies with no legacy deprecated rules
@@ -432,7 +138,7 @@ class VolumesNoLegacyNoScopePolicyTest(VolumesPolicyTest):
}
def setUp(self):
super(VolumesNoLegacyNoScopePolicyTest, self).setUp()
super().setUp()
# With no legacy, project other roles like foo will not be able
# to operate on volume and snapshot.
self.project_member_authorized_contexts = [
@@ -465,7 +171,7 @@ class VolumesScopeTypePolicyTest(VolumesPolicyTest):
"""
def setUp(self):
super(VolumesScopeTypePolicyTest, self).setUp()
super().setUp()
self.flags(enforce_scope=True, group="oslo_policy")
# With scope enabled, system users will not be able to
# operate on volume and snapshot.
@@ -519,7 +225,7 @@ class VolumesScopeTypeNoLegacyPolicyTest(VolumesScopeTypePolicyTest):
}
def setUp(self):
super(VolumesScopeTypeNoLegacyPolicyTest, self).setUp()
super().setUp()
self.flags(enforce_scope=True, group="oslo_policy")
# With no legacy and scope enabled, system users and project
# other roles like foo will not be able to operate on volume