Implement metadata options for snapshots

This patch implements metadata for snapshots as well as
the ability to update/delete that metadata.

Implements blueprint: update-snap-metadata

Change-Id: Iec2b7a51cdc3dffad41f24807067cc6c8e7d5135
This commit is contained in:
John Griffith 2013-02-18 23:39:06 +00:00 committed by john-griffith
parent 8aae8bae07
commit 667031428b
16 changed files with 1584 additions and 16 deletions

View File

@ -24,6 +24,7 @@ WSGI middleware for OpenStack Volume API.
from cinder.api import extensions
import cinder.api.openstack
from cinder.api.v1 import limits
from cinder.api.v1 import snapshot_metadata
from cinder.api.v1 import snapshots
from cinder.api.v1 import types
from cinder.api.v1 import volume_metadata
@ -66,6 +67,15 @@ class APIRouter(cinder.api.openstack.APIRouter):
collection={'detail': 'GET'},
member={'action': 'POST'})
self.resources['snapshot_metadata'] = \
snapshot_metadata.create_resource()
snapshot_metadata_controller = self.resources['snapshot_metadata']
mapper.resource("snapshot_metadata", "metadata",
controller=snapshot_metadata_controller,
parent_resource=dict(member_name='snapshot',
collection_name='snapshots'))
self.resources['limits'] = limits.create_resource()
mapper.resource("limit", "limits",
controller=self.resources['limits'])

View File

@ -0,0 +1,164 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import webob
from cinder.api import common
from cinder.api.openstack import wsgi
from cinder import exception
from cinder import volume
from webob import exc
class Controller(object):
""" The volume metadata API controller for the OpenStack API """
def __init__(self):
self.volume_api = volume.API()
super(Controller, self).__init__()
def _get_metadata(self, context, snapshot_id):
try:
snapshot = self.volume_api.get_snapshot(context, snapshot_id)
meta = self.volume_api.get_snapshot_metadata(context, snapshot)
except exception.SnapshotNotFound:
msg = _('snapshot does not exist')
raise exc.HTTPNotFound(explanation=msg)
return meta
@wsgi.serializers(xml=common.MetadataTemplate)
def index(self, req, snapshot_id):
""" Returns the list of metadata for a given snapshot"""
context = req.environ['cinder.context']
return {'metadata': self._get_metadata(context, snapshot_id)}
@wsgi.serializers(xml=common.MetadataTemplate)
@wsgi.deserializers(xml=common.MetadataDeserializer)
def create(self, req, snapshot_id, body):
try:
metadata = body['metadata']
except (KeyError, TypeError):
msg = _("Malformed request body")
raise exc.HTTPBadRequest(explanation=msg)
context = req.environ['cinder.context']
new_metadata = self._update_snapshot_metadata(context,
snapshot_id,
metadata,
delete=False)
return {'metadata': new_metadata}
@wsgi.serializers(xml=common.MetaItemTemplate)
@wsgi.deserializers(xml=common.MetaItemDeserializer)
def update(self, req, snapshot_id, id, body):
try:
meta_item = body['meta']
except (TypeError, KeyError):
expl = _('Malformed request body')
raise exc.HTTPBadRequest(explanation=expl)
if id not in meta_item:
expl = _('Request body and URI mismatch')
raise exc.HTTPBadRequest(explanation=expl)
if len(meta_item) > 1:
expl = _('Request body contains too many items')
raise exc.HTTPBadRequest(explanation=expl)
context = req.environ['cinder.context']
self._update_snapshot_metadata(context,
snapshot_id,
meta_item,
delete=False)
return {'meta': meta_item}
@wsgi.serializers(xml=common.MetadataTemplate)
@wsgi.deserializers(xml=common.MetadataDeserializer)
def update_all(self, req, snapshot_id, body):
try:
metadata = body['metadata']
except (TypeError, KeyError):
expl = _('Malformed request body')
raise exc.HTTPBadRequest(explanation=expl)
context = req.environ['cinder.context']
new_metadata = self._update_snapshot_metadata(context,
snapshot_id,
metadata,
delete=True)
return {'metadata': new_metadata}
def _update_snapshot_metadata(self, context,
snapshot_id, metadata,
delete=False):
try:
snapshot = self.volume_api.get_snapshot(context, snapshot_id)
return self.volume_api.update_snapshot_metadata(context,
snapshot,
metadata,
delete)
except exception.SnapshotNotFound:
msg = _('snapshot does not exist')
raise exc.HTTPNotFound(explanation=msg)
except (ValueError, AttributeError):
msg = _("Malformed request body")
raise exc.HTTPBadRequest(explanation=msg)
except exception.InvalidVolumeMetadata as error:
raise exc.HTTPBadRequest(explanation=unicode(error))
except exception.InvalidVolumeMetadataSize as error:
raise exc.HTTPRequestEntityTooLarge(explanation=unicode(error))
@wsgi.serializers(xml=common.MetaItemTemplate)
def show(self, req, snapshot_id, id):
""" Return a single metadata item """
context = req.environ['cinder.context']
data = self._get_metadata(context, snapshot_id)
try:
return {'meta': {id: data[id]}}
except KeyError:
msg = _("Metadata item was not found")
raise exc.HTTPNotFound(explanation=msg)
def delete(self, req, snapshot_id, id):
""" Deletes an existing metadata """
context = req.environ['cinder.context']
metadata = self._get_metadata(context, snapshot_id)
if id not in metadata:
msg = _("Metadata item was not found")
raise exc.HTTPNotFound(explanation=msg)
try:
snapshot = self.volume_api.get_snapshot(context, snapshot_id)
self.volume_api.delete_snapshot_metadata(context, snapshot, id)
except exception.SnapshotNotFound:
msg = _('snapshot does not exist')
raise exc.HTTPNotFound(explanation=msg)
return webob.Response(status_int=200)
def create_resource():
return wsgi.Resource(Controller())

View File

@ -56,6 +56,15 @@ def _translate_snapshot_summary_view(context, snapshot):
d['status'] = snapshot['status']
d['size'] = snapshot['volume_size']
if snapshot.get('snapshot_metadata'):
metadata = snapshot.get('snapshot_metadata')
d['metadata'] = dict((item['key'], item['value']) for item in metadata)
# avoid circular ref when vol is a Volume instance
elif snapshot.get('metadata') and isinstance(snapshot.get('metadata'),
dict):
d['metadata'] = snapshot['metadata']
else:
d['metadata'] = {}
return d
@ -67,6 +76,7 @@ def make_snapshot(elem):
elem.set('display_name')
elem.set('display_description')
elem.set('volume_id')
elem.append(common.MetadataTemplate())
class SnapshotTemplate(xmlutil.TemplateBuilder):
@ -147,12 +157,15 @@ class SnapshotsController(wsgi.Controller):
@wsgi.serializers(xml=SnapshotTemplate)
def create(self, req, body):
"""Creates a new snapshot."""
kwargs = {}
context = req.environ['cinder.context']
if not self.is_valid_body(body, 'snapshot'):
raise exc.HTTPUnprocessableEntity()
snapshot = body['snapshot']
kwargs['metadata'] = snapshot.get('metadata', None)
volume_id = snapshot['volume_id']
volume = self.volume_api.get(context, volume_id)
force = snapshot.get('force', False)
@ -168,13 +181,15 @@ class SnapshotsController(wsgi.Controller):
context,
volume,
snapshot.get('display_name'),
snapshot.get('display_description'))
snapshot.get('display_description'),
**kwargs)
else:
new_snapshot = self.volume_api.create_snapshot(
context,
volume,
snapshot.get('display_name'),
snapshot.get('display_description'))
snapshot.get('display_description'),
**kwargs)
retval = _translate_snapshot_detail_view(context, new_snapshot)

View File

@ -0,0 +1,164 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import webob
from cinder.api import common
from cinder.api.openstack import wsgi
from cinder import exception
from cinder import volume
from webob import exc
class Controller(object):
""" The volume metadata API controller for the OpenStack API """
def __init__(self):
self.volume_api = volume.API()
super(Controller, self).__init__()
def _get_metadata(self, context, snapshot_id):
try:
snapshot = self.volume_api.get_snapshot(context, snapshot_id)
meta = self.volume_api.get_snapshot_metadata(context, snapshot)
except exception.SnapshotNotFound:
msg = _('snapshot does not exist')
raise exc.HTTPNotFound(explanation=msg)
return meta
@wsgi.serializers(xml=common.MetadataTemplate)
def index(self, req, snapshot_id):
""" Returns the list of metadata for a given snapshot"""
context = req.environ['cinder.context']
return {'metadata': self._get_metadata(context, snapshot_id)}
@wsgi.serializers(xml=common.MetadataTemplate)
@wsgi.deserializers(xml=common.MetadataDeserializer)
def create(self, req, snapshot_id, body):
try:
metadata = body['metadata']
except (KeyError, TypeError):
msg = _("Malformed request body")
raise exc.HTTPBadRequest(explanation=msg)
context = req.environ['cinder.context']
new_metadata = self._update_snapshot_metadata(context,
snapshot_id,
metadata,
delete=False)
return {'metadata': new_metadata}
@wsgi.serializers(xml=common.MetaItemTemplate)
@wsgi.deserializers(xml=common.MetaItemDeserializer)
def update(self, req, snapshot_id, id, body):
try:
meta_item = body['meta']
except (TypeError, KeyError):
expl = _('Malformed request body')
raise exc.HTTPBadRequest(explanation=expl)
if id not in meta_item:
expl = _('Request body and URI mismatch')
raise exc.HTTPBadRequest(explanation=expl)
if len(meta_item) > 1:
expl = _('Request body contains too many items')
raise exc.HTTPBadRequest(explanation=expl)
context = req.environ['cinder.context']
self._update_snapshot_metadata(context,
snapshot_id,
meta_item,
delete=False)
return {'meta': meta_item}
@wsgi.serializers(xml=common.MetadataTemplate)
@wsgi.deserializers(xml=common.MetadataDeserializer)
def update_all(self, req, snapshot_id, body):
try:
metadata = body['metadata']
except (TypeError, KeyError):
expl = _('Malformed request body')
raise exc.HTTPBadRequest(explanation=expl)
context = req.environ['cinder.context']
new_metadata = self._update_snapshot_metadata(context,
snapshot_id,
metadata,
delete=True)
return {'metadata': new_metadata}
def _update_snapshot_metadata(self, context,
snapshot_id, metadata,
delete=False):
try:
snapshot = self.volume_api.get_snapshot(context, snapshot_id)
return self.volume_api.update_snapshot_metadata(context,
snapshot,
metadata,
delete)
except exception.SnapshotNotFound:
msg = _('snapshot does not exist')
raise exc.HTTPNotFound(explanation=msg)
except (ValueError, AttributeError):
msg = _("Malformed request body")
raise exc.HTTPBadRequest(explanation=msg)
except exception.InvalidVolumeMetadata as error:
raise exc.HTTPBadRequest(explanation=unicode(error))
except exception.InvalidVolumeMetadataSize as error:
raise exc.HTTPRequestEntityTooLarge(explanation=unicode(error))
@wsgi.serializers(xml=common.MetaItemTemplate)
def show(self, req, snapshot_id, id):
""" Return a single metadata item """
context = req.environ['cinder.context']
data = self._get_metadata(context, snapshot_id)
try:
return {'meta': {id: data[id]}}
except KeyError:
msg = _("Metadata item was not found")
raise exc.HTTPNotFound(explanation=msg)
def delete(self, req, snapshot_id, id):
""" Deletes an existing metadata """
context = req.environ['cinder.context']
metadata = self._get_metadata(context, snapshot_id)
if id not in metadata:
msg = _("Metadata item was not found")
raise exc.HTTPNotFound(explanation=msg)
try:
snapshot = self.volume_api.get_snapshot(context, snapshot_id)
self.volume_api.delete_snapshot_metadata(context, snapshot, id)
except exception.SnapshotNotFound:
msg = _('snapshot does not exist')
raise exc.HTTPNotFound(explanation=msg)
return webob.Response(status_int=200)
def create_resource():
return wsgi.Resource(Controller())

View File

@ -56,6 +56,15 @@ def _translate_snapshot_summary_view(context, snapshot):
d['status'] = snapshot['status']
d['size'] = snapshot['volume_size']
if snapshot.get('snapshot_metadata'):
metadata = snapshot.get('snapshot_metadata')
d['metadata'] = dict((item['key'], item['value']) for item in metadata)
# avoid circular ref when vol is a Volume instance
elif snapshot.get('metadata') and isinstance(snapshot.get('metadata'),
dict):
d['metadata'] = snapshot['metadata']
else:
d['metadata'] = {}
return d
@ -67,6 +76,7 @@ def make_snapshot(elem):
elem.set('name')
elem.set('description')
elem.set('volume_id')
elem.append(common.MetadataTemplate())
class SnapshotTemplate(xmlutil.TemplateBuilder):
@ -153,12 +163,15 @@ class SnapshotsController(wsgi.Controller):
@wsgi.serializers(xml=SnapshotTemplate)
def create(self, req, body):
"""Creates a new snapshot."""
kwargs = {}
context = req.environ['cinder.context']
if not self.is_valid_body(body, 'snapshot'):
raise exc.HTTPUnprocessableEntity()
snapshot = body['snapshot']
kwargs['metadata'] = snapshot.get('metadata', None)
volume_id = snapshot['volume_id']
volume = self.volume_api.get(context, volume_id)
force = snapshot.get('force', False)
@ -179,13 +192,15 @@ class SnapshotsController(wsgi.Controller):
context,
volume,
snapshot.get('display_name'),
snapshot.get('description'))
snapshot.get('description'),
**kwargs)
else:
new_snapshot = self.volume_api.create_snapshot(
context,
volume,
snapshot.get('display_name'),
snapshot.get('description'))
snapshot.get('description'),
**kwargs)
retval = _translate_snapshot_detail_view(context, new_snapshot)

View File

@ -320,6 +320,24 @@ def snapshot_data_get_for_project(context, project_id, session=None):
####################
def snapshot_metadata_get(context, snapshot_id):
"""Get all metadata for a snapshot."""
return IMPL.snapshot_metadata_get(context, snapshot_id)
def snapshot_metadata_delete(context, snapshot_id, key):
"""Delete the given metadata item."""
IMPL.snapshot_metadata_delete(context, snapshot_id, key)
def snapshot_metadata_update(context, snapshot_id, metadata, delete):
"""Update metadata if it exists, otherwise create it."""
IMPL.snapshot_metadata_update(context, snapshot_id, metadata, delete)
####################
def volume_metadata_get(context, volume_id):
"""Get all metadata for a volume."""
return IMPL.volume_metadata_get(context, volume_id)

View File

@ -1186,6 +1186,8 @@ def volume_metadata_update(context, volume_id, metadata, delete):
@require_context
def snapshot_create(context, values):
values['snapshot_metadata'] = _metadata_refs(values.get('metadata'),
models.SnapshotMetadata)
snapshot_ref = models.Snapshot()
if not values.get('id'):
values['id'] = str(uuid.uuid4())
@ -1194,7 +1196,8 @@ def snapshot_create(context, values):
session = get_session()
with session.begin():
snapshot_ref.save(session=session)
return snapshot_ref
return snapshot_get(context, values['id'], session=session)
@require_admin_context
@ -1265,6 +1268,85 @@ def snapshot_update(context, snapshot_id, values):
snapshot_ref.update(values)
snapshot_ref.save(session=session)
####################
def _snapshot_metadata_get_query(context, snapshot_id, session=None):
return model_query(context, models.SnapshotMetadata,
session=session, read_deleted="no").\
filter_by(snapshot_id=snapshot_id)
@require_context
@require_snapshot_exists
def snapshot_metadata_get(context, snapshot_id):
rows = _snapshot_metadata_get_query(context, snapshot_id).all()
result = {}
for row in rows:
result[row['key']] = row['value']
return result
@require_context
@require_snapshot_exists
def snapshot_metadata_delete(context, snapshot_id, key):
_snapshot_metadata_get_query(context, snapshot_id).\
filter_by(key=key).\
update({'deleted': True,
'deleted_at': timeutils.utcnow(),
'updated_at': literal_column('updated_at')})
@require_context
@require_snapshot_exists
def snapshot_metadata_get_item(context, snapshot_id, key, session=None):
result = _snapshot_metadata_get_query(context,
snapshot_id,
session=session).\
filter_by(key=key).\
first()
if not result:
raise exception.SnapshotMetadataNotFound(metadata_key=key,
snapshot_id=snapshot_id)
return result
@require_context
@require_snapshot_exists
def snapshot_metadata_update(context, snapshot_id, metadata, delete):
session = get_session()
# Set existing metadata to deleted if delete argument is True
if delete:
original_metadata = snapshot_metadata_get(context, snapshot_id)
for meta_key, meta_value in original_metadata.iteritems():
if meta_key not in metadata:
meta_ref = snapshot_metadata_get_item(context, snapshot_id,
meta_key, session)
meta_ref.update({'deleted': True})
meta_ref.save(session=session)
meta_ref = None
# Now update all existing items with new values, or create new meta objects
for meta_key, meta_value in metadata.items():
# update the value whether it exists or not
item = {"value": meta_value}
try:
meta_ref = snapshot_metadata_get_item(context, snapshot_id,
meta_key, session)
except exception.SnapshotMetadataNotFound as e:
meta_ref = models.SnapshotMetadata()
item.update({"key": meta_key, "snapshot_id": snapshot_id})
meta_ref.update(item)
meta_ref.save(session=session)
return metadata
###################

View File

@ -0,0 +1,60 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from sqlalchemy import Boolean, Column, DateTime
from sqlalchemy import Integer, MetaData, String, Table, ForeignKey
from cinder.openstack.common import log as logging
LOG = logging.getLogger(__name__)
def upgrade(migrate_engine):
meta = MetaData()
meta.bind = migrate_engine
snapshots = Table('snapshots', meta, autoload=True)
# New table
snapshot_metadata = Table(
'snapshot_metadata', meta,
Column('created_at', DateTime),
Column('updated_at', DateTime),
Column('deleted_at', DateTime),
Column('deleted', Boolean),
Column('id', Integer, primary_key=True, nullable=False),
Column('snapshot_id', String(length=36), ForeignKey('snapshots.id'),
nullable=False),
Column('key', String(length=255)),
Column('value', String(length=255)),
mysql_engine='InnoDB'
)
try:
snapshot_metadata.create()
except Exception:
LOG.error(_("Table |%s| not created!"), repr(snapshot_metadata))
raise
def downgrade(migrate_engine):
meta = MetaData()
meta.bind = migrate_engine
snapshot_metadata = Table('snapshot_metadata',
meta,
autoload=True)
try:
snapshot_metadata.drop()
except Exception:
LOG.error(_("snapshot_metadata table not dropped"))

View File

@ -325,6 +325,22 @@ class Snapshot(BASE, CinderBase):
'Snapshot.deleted == False)')
class SnapshotMetadata(BASE, CinderBase):
"""Represents a metadata key/value pair for a snapshot."""
__tablename__ = 'snapshot_metadata'
id = Column(Integer, primary_key=True)
key = Column(String(255))
value = Column(String(255))
snapshot_id = Column(String(36),
ForeignKey('snapshots.id'),
nullable=False)
snapshot = relationship(Snapshot, backref="snapshot_metadata",
foreign_keys=snapshot_id,
primaryjoin='and_('
'SnapshotMetadata.snapshot_id == Snapshot.id,'
'SnapshotMetadata.deleted == False)')
class IscsiTarget(BASE, CinderBase):
"""Represents an iscsi target for a given host."""
__tablename__ = 'iscsi_targets'
@ -427,6 +443,7 @@ def register_models():
SMVolume,
Volume,
VolumeMetadata,
SnapshotMetadata,
VolumeTypeExtraSpecs,
VolumeTypes,
VolumeGlanceMetadata,

View File

@ -259,6 +259,19 @@ class InvalidVolumeMetadataSize(Invalid):
message = _("Invalid metadata size") + ": %(reason)s"
class SnapshotMetadataNotFound(NotFound):
message = _("Snapshot %(snapshot_id)s has no metadata with "
"key %(metadata_key)s.")
class InvalidSnapshotMetadata(Invalid):
message = _("Invalid metadata") + ": %(reason)s"
class InvalidSnapshotMetadataSize(Invalid):
message = _("Invalid metadata size") + ": %(reason)s"
class VolumeTypeNotFound(NotFound):
message = _("Volume type %(volume_type_id)s could not be found.")

View File

@ -0,0 +1,458 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import uuid
import webob
from cinder.api import extensions
from cinder.api.v1 import snapshot_metadata
from cinder.api.v1 import snapshots
import cinder.db
from cinder import exception
from cinder.openstack.common import cfg
from cinder.openstack.common import jsonutils
from cinder import test
from cinder.tests.api import fakes
CONF = cfg.CONF
def return_create_snapshot_metadata_max(context,
snapshot_id,
metadata,
delete):
return stub_max_snapshot_metadata()
def return_create_snapshot_metadata(context, snapshot_id, metadata, delete):
return stub_snapshot_metadata()
def return_snapshot_metadata(context, snapshot_id):
if not isinstance(snapshot_id, str) or not len(snapshot_id) == 36:
msg = 'id %s must be a uuid in return snapshot metadata' % snapshot_id
raise Exception(msg)
return stub_snapshot_metadata()
def return_empty_snapshot_metadata(context, snapshot_id):
return {}
def delete_snapshot_metadata(context, snapshot_id, key):
pass
def stub_snapshot_metadata():
metadata = {
"key1": "value1",
"key2": "value2",
"key3": "value3",
}
return metadata
def stub_max_snapshot_metadata():
metadata = {"metadata": {}}
for num in range(CONF.quota_metadata_items):
metadata['metadata']['key%i' % num] = "blah"
return metadata
def return_snapshot(context, snapshot_id):
return {'id': '0cc3346e-9fef-4445-abe6-5d2b2690ec64',
'name': 'fake',
'status': 'available',
'metadata': {}}
def return_volume(context, volume_id):
return {'id': 'fake-vol-id',
'size': 100,
'name': 'fake',
'host': 'fake-host',
'status': 'available',
'metadata': {}}
def return_snapshot_nonexistent(context, snapshot_id):
raise exception.SnapshotNotFound('bogus test message')
def fake_update_snapshot_metadata(self, context, snapshot, diff):
pass
class SnapshotMetaDataTest(test.TestCase):
def setUp(self):
super(SnapshotMetaDataTest, self).setUp()
self.volume_api = cinder.volume.api.API()
fakes.stub_out_key_pair_funcs(self.stubs)
self.stubs.Set(cinder.db, 'volume_get', return_volume)
self.stubs.Set(cinder.db, 'snapshot_get', return_snapshot)
self.stubs.Set(cinder.db, 'snapshot_metadata_get',
return_snapshot_metadata)
self.stubs.Set(self.volume_api, 'update_snapshot_metadata',
fake_update_snapshot_metadata)
self.ext_mgr = extensions.ExtensionManager()
self.ext_mgr.extensions = {}
self.snapshot_controller = snapshots.SnapshotsController(self.ext_mgr)
self.controller = snapshot_metadata.Controller()
self.id = str(uuid.uuid4())
self.url = '/v1/fake/snapshots/%s/metadata' % self.id
snap = {"volume_size": 100,
"volume_id": "fake-vol-id",
"display_name": "Volume Test Name",
"display_description": "Volume Test Desc",
"availability_zone": "zone1:host1",
"host": "fake-host",
"metadata": {}}
body = {"snapshot": snap}
req = fakes.HTTPRequest.blank('/v1/snapshots')
self.snapshot_controller.create(req, body)
def test_index(self):
req = fakes.HTTPRequest.blank(self.url)
res_dict = self.controller.index(req, self.id)
expected = {
'metadata': {
'key1': 'value1',
'key2': 'value2',
'key3': 'value3',
},
}
self.assertEqual(expected, res_dict)
def test_index_nonexistent_snapshot(self):
self.stubs.Set(cinder.db, 'snapshot_metadata_get',
return_snapshot_nonexistent)
req = fakes.HTTPRequest.blank(self.url)
self.assertRaises(webob.exc.HTTPNotFound,
self.controller.index, req, self.url)
def test_index_no_data(self):
self.stubs.Set(cinder.db, 'snapshot_metadata_get',
return_empty_snapshot_metadata)
req = fakes.HTTPRequest.blank(self.url)
res_dict = self.controller.index(req, self.id)
expected = {'metadata': {}}
self.assertEqual(expected, res_dict)
def test_show(self):
req = fakes.HTTPRequest.blank(self.url + '/key2')
res_dict = self.controller.show(req, self.id, 'key2')
expected = {'meta': {'key2': 'value2'}}
self.assertEqual(expected, res_dict)
def test_show_nonexistent_snapshot(self):
self.stubs.Set(cinder.db, 'snapshot_metadata_get',
return_snapshot_nonexistent)
req = fakes.HTTPRequest.blank(self.url + '/key2')
self.assertRaises(webob.exc.HTTPNotFound,
self.controller.show, req, self.id, 'key2')
def test_show_meta_not_found(self):
self.stubs.Set(cinder.db, 'snapshot_metadata_get',
return_empty_snapshot_metadata)
req = fakes.HTTPRequest.blank(self.url + '/key6')
self.assertRaises(webob.exc.HTTPNotFound,
self.controller.show, req, self.id, 'key6')
def test_delete(self):
self.stubs.Set(cinder.db, 'snapshot_metadata_get',
return_snapshot_metadata)
self.stubs.Set(cinder.db, 'snapshot_metadata_delete',
delete_snapshot_metadata)
req = fakes.HTTPRequest.blank(self.url + '/key2')
req.method = 'DELETE'
res = self.controller.delete(req, self.id, 'key2')
self.assertEqual(200, res.status_int)
def test_delete_nonexistent_snapshot(self):
self.stubs.Set(cinder.db, 'snapshot_get',
return_snapshot_nonexistent)
req = fakes.HTTPRequest.blank(self.url + '/key1')
req.method = 'DELETE'
self.assertRaises(webob.exc.HTTPNotFound,
self.controller.delete, req, self.id, 'key1')
def test_delete_meta_not_found(self):
self.stubs.Set(cinder.db, 'snapshot_metadata_get',
return_empty_snapshot_metadata)
req = fakes.HTTPRequest.blank(self.url + '/key6')
req.method = 'DELETE'
self.assertRaises(webob.exc.HTTPNotFound,
self.controller.delete, req, self.id, 'key6')
def test_create(self):
self.stubs.Set(cinder.db, 'snapshot_metadata_get',
return_empty_snapshot_metadata)
self.stubs.Set(cinder.db, 'snapshot_metadata_update',
return_create_snapshot_metadata)
req = fakes.HTTPRequest.blank('/v1/snapshot_metadata')
req.method = 'POST'
req.content_type = "application/json"
body = {"metadata": {"key9": "value9"}}
req.body = jsonutils.dumps(body)
res_dict = self.controller.create(req, self.id, body)
self.assertEqual(body, res_dict)
def test_create_empty_body(self):
self.stubs.Set(cinder.db, 'snapshot_metadata_update',
return_create_snapshot_metadata)
req = fakes.HTTPRequest.blank(self.url)
req.method = 'POST'
req.headers["content-type"] = "application/json"
self.assertRaises(webob.exc.HTTPBadRequest,
self.controller.create, req, self.id, None)
def test_create_item_empty_key(self):
self.stubs.Set(cinder.db, 'snapshot_metadata_update',
return_create_snapshot_metadata)
req = fakes.HTTPRequest.blank(self.url + '/key1')
req.method = 'PUT'
body = {"meta": {"": "value1"}}
req.body = jsonutils.dumps(body)
req.headers["content-type"] = "application/json"
self.assertRaises(webob.exc.HTTPBadRequest,
self.controller.create, req, self.id, body)
def test_create_item_key_too_long(self):
self.stubs.Set(cinder.db, 'snapshot_metadata_update',
return_create_snapshot_metadata)
req = fakes.HTTPRequest.blank(self.url + '/key1')
req.method = 'PUT'
body = {"meta": {("a" * 260): "value1"}}
req.body = jsonutils.dumps(body)
req.headers["content-type"] = "application/json"
self.assertRaises(webob.exc.HTTPBadRequest,
self.controller.create,
req, self.id, body)
def test_create_nonexistent_snapshot(self):
self.stubs.Set(cinder.db, 'snapshot_get',
return_snapshot_nonexistent)
self.stubs.Set(cinder.db, 'snapshot_metadata_get',
return_snapshot_metadata)
self.stubs.Set(cinder.db, 'snapshot_metadata_update',
return_create_snapshot_metadata)
req = fakes.HTTPRequest.blank('/v1/snapshot_metadata')
req.method = 'POST'
req.content_type = "application/json"
body = {"metadata": {"key9": "value9"}}
req.body = jsonutils.dumps(body)
self.assertRaises(webob.exc.HTTPNotFound,
self.controller.create, req, self.id, body)
def test_update_all(self):
self.stubs.Set(cinder.db, 'snapshot_metadata_update',
return_create_snapshot_metadata)
req = fakes.HTTPRequest.blank(self.url)
req.method = 'PUT'
req.content_type = "application/json"
expected = {
'metadata': {
'key10': 'value10',
'key99': 'value99',
},
}
req.body = jsonutils.dumps(expected)
res_dict = self.controller.update_all(req, self.id, expected)
self.assertEqual(expected, res_dict)
def test_update_all_empty_container(self):
self.stubs.Set(cinder.db, 'snapshot_metadata_update',
return_create_snapshot_metadata)
req = fakes.HTTPRequest.blank(self.url)
req.method = 'PUT'
req.content_type = "application/json"
expected = {'metadata': {}}
req.body = jsonutils.dumps(expected)
res_dict = self.controller.update_all(req, self.id, expected)
self.assertEqual(expected, res_dict)
def test_update_all_malformed_container(self):
self.stubs.Set(cinder.db, 'snapshot_metadata_update',
return_create_snapshot_metadata)
req = fakes.HTTPRequest.blank(self.url)
req.method = 'PUT'
req.content_type = "application/json"
expected = {'meta': {}}
req.body = jsonutils.dumps(expected)
self.assertRaises(webob.exc.HTTPBadRequest,
self.controller.update_all, req, self.id, expected)
def test_update_all_malformed_data(self):
self.stubs.Set(cinder.db, 'snapshot_metadata_update',
return_create_snapshot_metadata)
req = fakes.HTTPRequest.blank(self.url)
req.method = 'PUT'
req.content_type = "application/json"
expected = {'metadata': ['asdf']}
req.body = jsonutils.dumps(expected)
self.assertRaises(webob.exc.HTTPBadRequest,
self.controller.update_all, req, self.id, expected)
def test_update_all_nonexistent_snapshot(self):
self.stubs.Set(cinder.db, 'snapshot_get', return_snapshot_nonexistent)
req = fakes.HTTPRequest.blank(self.url)
req.method = 'PUT'
req.content_type = "application/json"
body = {'metadata': {'key10': 'value10'}}
req.body = jsonutils.dumps(body)
self.assertRaises(webob.exc.HTTPNotFound,
self.controller.update_all, req, '100', body)
def test_update_item(self):
self.stubs.Set(cinder.db, 'snapshot_metadata_update',
return_create_snapshot_metadata)
req = fakes.HTTPRequest.blank(self.url + '/key1')
req.method = 'PUT'
body = {"meta": {"key1": "value1"}}
req.body = jsonutils.dumps(body)
req.headers["content-type"] = "application/json"
res_dict = self.controller.update(req, self.id, 'key1', body)
expected = {'meta': {'key1': 'value1'}}
self.assertEqual(expected, res_dict)
def test_update_item_nonexistent_snapshot(self):
self.stubs.Set(cinder.db, 'snapshot_get',
return_snapshot_nonexistent)
req = fakes.HTTPRequest.blank(
'/v1.1/fake/snapshots/asdf/metadata/key1')
req.method = 'PUT'
body = {"meta": {"key1": "value1"}}
req.body = jsonutils.dumps(body)
req.headers["content-type"] = "application/json"
self.assertRaises(webob.exc.HTTPNotFound,
self.controller.update, req, self.id, 'key1', body)
def test_update_item_empty_body(self):
self.stubs.Set(cinder.db, 'snapshot_metadata_update',
return_create_snapshot_metadata)
req = fakes.HTTPRequest.blank(self.url + '/key1')
req.method = 'PUT'
req.headers["content-type"] = "application/json"
self.assertRaises(webob.exc.HTTPBadRequest,
self.controller.update, req, self.id, 'key1', None)
def test_update_item_empty_key(self):
self.stubs.Set(cinder.db, 'snapshot_metadata_update',
return_create_snapshot_metadata)
req = fakes.HTTPRequest.blank(self.url + '/key1')
req.method = 'PUT'
body = {"meta": {"": "value1"}}
req.body = jsonutils.dumps(body)
req.headers["content-type"] = "application/json"
self.assertRaises(webob.exc.HTTPBadRequest,
self.controller.update, req, self.id, '', body)
def test_update_item_key_too_long(self):
self.stubs.Set(cinder.db, 'snapshot_metadata_update',
return_create_snapshot_metadata)
req = fakes.HTTPRequest.blank(self.url + '/key1')
req.method = 'PUT'
body = {"meta": {("a" * 260): "value1"}}
req.body = jsonutils.dumps(body)
req.headers["content-type"] = "application/json"
self.assertRaises(webob.exc.HTTPRequestEntityTooLarge,
self.controller.update,
req, self.id, ("a" * 260), body)
def test_update_item_value_too_long(self):
self.stubs.Set(cinder.db, 'snapshot_metadata_update',
return_create_snapshot_metadata)
req = fakes.HTTPRequest.blank(self.url + '/key1')
req.method = 'PUT'
body = {"meta": {"key1": ("a" * 260)}}
req.body = jsonutils.dumps(body)
req.headers["content-type"] = "application/json"
self.assertRaises(webob.exc.HTTPRequestEntityTooLarge,
self.controller.update,
req, self.id, "key1", body)
def test_update_item_too_many_keys(self):
self.stubs.Set(cinder.db, 'snapshot_metadata_update',
return_create_snapshot_metadata)
req = fakes.HTTPRequest.blank(self.url + '/key1')
req.method = 'PUT'
body = {"meta": {"key1": "value1", "key2": "value2"}}
req.body = jsonutils.dumps(body)
req.headers["content-type"] = "application/json"
self.assertRaises(webob.exc.HTTPBadRequest,
self.controller.update, req, self.id, 'key1', body)
def test_update_item_body_uri_mismatch(self):
self.stubs.Set(cinder.db, 'snapshot_metadata_update',
return_create_snapshot_metadata)
req = fakes.HTTPRequest.blank(self.url + '/bad')
req.method = 'PUT'
body = {"meta": {"key1": "value1"}}
req.body = jsonutils.dumps(body)
req.headers["content-type"] = "application/json"
self.assertRaises(webob.exc.HTTPBadRequest,
self.controller.update, req, self.id, 'bad', body)
def test_invalid_metadata_items_on_create(self):
self.stubs.Set(cinder.db, 'snapshot_metadata_update',
return_create_snapshot_metadata)
req = fakes.HTTPRequest.blank(self.url)
req.method = 'POST'
req.headers["content-type"] = "application/json"
#test for long key
data = {"metadata": {"a" * 260: "value1"}}
req.body = jsonutils.dumps(data)
self.assertRaises(webob.exc.HTTPRequestEntityTooLarge,
self.controller.create, req, self.id, data)
#test for long value
data = {"metadata": {"key": "v" * 260}}
req.body = jsonutils.dumps(data)
self.assertRaises(webob.exc.HTTPRequestEntityTooLarge,
self.controller.create, req, self.id, data)
#test for empty key.
data = {"metadata": {"": "value1"}}
req.body = jsonutils.dumps(data)
self.assertRaises(webob.exc.HTTPBadRequest,
self.controller.create, req, self.id, data)

View File

@ -46,11 +46,14 @@ def _get_default_snapshot_param():
'display_description': 'Default description', }
def stub_snapshot_create(self, context, volume_id, name, description):
def stub_snapshot_create(self, context,
volume_id, name,
description, metadata):
snapshot = _get_default_snapshot_param()
snapshot['volume_id'] = volume_id
snapshot['display_name'] = name
snapshot['display_description'] = description
snapshot['metadata'] = metadata
return snapshot
@ -145,6 +148,7 @@ class SnapshotApiTest(test.TestCase):
'created_at': None,
'display_name': 'Updated Test Name',
'display_description': 'Default description',
'metadata': {},
}}
self.assertEquals(expected, res_dict)

View File

@ -0,0 +1,458 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import uuid
import webob
from cinder.api import extensions
from cinder.api.v2 import snapshot_metadata
from cinder.api.v2 import snapshots
import cinder.db
from cinder import exception
from cinder.openstack.common import cfg
from cinder.openstack.common import jsonutils
from cinder import test
from cinder.tests.api import fakes
CONF = cfg.CONF
def return_create_snapshot_metadata_max(context,
snapshot_id,
metadata,
delete):
return stub_max_snapshot_metadata()
def return_create_snapshot_metadata(context, snapshot_id, metadata, delete):
return stub_snapshot_metadata()
def return_snapshot_metadata(context, snapshot_id):
if not isinstance(snapshot_id, str) or not len(snapshot_id) == 36:
msg = 'id %s must be a uuid in return snapshot metadata' % snapshot_id
raise Exception(msg)
return stub_snapshot_metadata()
def return_empty_snapshot_metadata(context, snapshot_id):
return {}
def delete_snapshot_metadata(context, snapshot_id, key):
pass
def stub_snapshot_metadata():
metadata = {
"key1": "value1",
"key2": "value2",
"key3": "value3",
}
return metadata
def stub_max_snapshot_metadata():
metadata = {"metadata": {}}
for num in range(CONF.quota_metadata_items):
metadata['metadata']['key%i' % num] = "blah"
return metadata
def return_snapshot(context, snapshot_id):
return {'id': '0cc3346e-9fef-4445-abe6-5d2b2690ec64',
'name': 'fake',
'status': 'available',
'metadata': {}}
def return_volume(context, volume_id):
return {'id': 'fake-vol-id',
'size': 100,
'name': 'fake',
'host': 'fake-host',
'status': 'available',
'metadata': {}}
def return_snapshot_nonexistent(context, snapshot_id):
raise exception.SnapshotNotFound('bogus test message')
def fake_update_snapshot_metadata(self, context, snapshot, diff):
pass
class SnapshotMetaDataTest(test.TestCase):
def setUp(self):
super(SnapshotMetaDataTest, self).setUp()
self.volume_api = cinder.volume.api.API()
fakes.stub_out_key_pair_funcs(self.stubs)
self.stubs.Set(cinder.db, 'volume_get', return_volume)
self.stubs.Set(cinder.db, 'snapshot_get', return_snapshot)
self.stubs.Set(cinder.db, 'snapshot_metadata_get',
return_snapshot_metadata)
self.stubs.Set(self.volume_api, 'update_snapshot_metadata',
fake_update_snapshot_metadata)
self.ext_mgr = extensions.ExtensionManager()
self.ext_mgr.extensions = {}
self.snapshot_controller = snapshots.SnapshotsController(self.ext_mgr)
self.controller = snapshot_metadata.Controller()
self.id = str(uuid.uuid4())
self.url = '/v2/fake/snapshots/%s/metadata' % self.id
snap = {"volume_size": 100,
"volume_id": "fake-vol-id",
"display_name": "Volume Test Name",
"display_description": "Volume Test Desc",
"availability_zone": "zone1:host1",
"host": "fake-host",
"metadata": {}}
body = {"snapshot": snap}
req = fakes.HTTPRequest.blank('/v2/snapshots')
self.snapshot_controller.create(req, body)
def test_index(self):
req = fakes.HTTPRequest.blank(self.url)
res_dict = self.controller.index(req, self.id)
expected = {
'metadata': {
'key1': 'value1',
'key2': 'value2',
'key3': 'value3',
},
}
self.assertEqual(expected, res_dict)
def test_index_nonexistent_snapshot(self):
self.stubs.Set(cinder.db, 'snapshot_metadata_get',
return_snapshot_nonexistent)
req = fakes.HTTPRequest.blank(self.url)
self.assertRaises(webob.exc.HTTPNotFound,
self.controller.index, req, self.url)
def test_index_no_data(self):
self.stubs.Set(cinder.db, 'snapshot_metadata_get',
return_empty_snapshot_metadata)
req = fakes.HTTPRequest.blank(self.url)
res_dict = self.controller.index(req, self.id)
expected = {'metadata': {}}
self.assertEqual(expected, res_dict)
def test_show(self):
req = fakes.HTTPRequest.blank(self.url + '/key2')
res_dict = self.controller.show(req, self.id, 'key2')
expected = {'meta': {'key2': 'value2'}}
self.assertEqual(expected, res_dict)
def test_show_nonexistent_snapshot(self):
self.stubs.Set(cinder.db, 'snapshot_metadata_get',
return_snapshot_nonexistent)
req = fakes.HTTPRequest.blank(self.url + '/key2')
self.assertRaises(webob.exc.HTTPNotFound,
self.controller.show, req, self.id, 'key2')
def test_show_meta_not_found(self):
self.stubs.Set(cinder.db, 'snapshot_metadata_get',
return_empty_snapshot_metadata)
req = fakes.HTTPRequest.blank(self.url + '/key6')
self.assertRaises(webob.exc.HTTPNotFound,
self.controller.show, req, self.id, 'key6')
def test_delete(self):
self.stubs.Set(cinder.db, 'snapshot_metadata_get',
return_snapshot_metadata)
self.stubs.Set(cinder.db, 'snapshot_metadata_delete',
delete_snapshot_metadata)
req = fakes.HTTPRequest.blank(self.url + '/key2')
req.method = 'DELETE'
res = self.controller.delete(req, self.id, 'key2')
self.assertEqual(200, res.status_int)
def test_delete_nonexistent_snapshot(self):
self.stubs.Set(cinder.db, 'snapshot_get',
return_snapshot_nonexistent)
req = fakes.HTTPRequest.blank(self.url + '/key1')
req.method = 'DELETE'
self.assertRaises(webob.exc.HTTPNotFound,
self.controller.delete, req, self.id, 'key1')
def test_delete_meta_not_found(self):
self.stubs.Set(cinder.db, 'snapshot_metadata_get',
return_empty_snapshot_metadata)
req = fakes.HTTPRequest.blank(self.url + '/key6')
req.method = 'DELETE'
self.assertRaises(webob.exc.HTTPNotFound,
self.controller.delete, req, self.id, 'key6')
def test_create(self):
self.stubs.Set(cinder.db, 'snapshot_metadata_get',
return_empty_snapshot_metadata)
self.stubs.Set(cinder.db, 'snapshot_metadata_update',
return_create_snapshot_metadata)
req = fakes.HTTPRequest.blank('/v2/snapshot_metadata')
req.method = 'POST'
req.content_type = "application/json"
body = {"metadata": {"key9": "value9"}}
req.body = jsonutils.dumps(body)
res_dict = self.controller.create(req, self.id, body)
self.assertEqual(body, res_dict)
def test_create_empty_body(self):
self.stubs.Set(cinder.db, 'snapshot_metadata_update',
return_create_snapshot_metadata)
req = fakes.HTTPRequest.blank(self.url)
req.method = 'POST'
req.headers["content-type"] = "application/json"
self.assertRaises(webob.exc.HTTPBadRequest,
self.controller.create, req, self.id, None)
def test_create_item_empty_key(self):
self.stubs.Set(cinder.db, 'snapshot_metadata_update',
return_create_snapshot_metadata)
req = fakes.HTTPRequest.blank(self.url + '/key1')
req.method = 'PUT'
body = {"meta": {"": "value1"}}
req.body = jsonutils.dumps(body)
req.headers["content-type"] = "application/json"
self.assertRaises(webob.exc.HTTPBadRequest,
self.controller.create, req, self.id, body)
def test_create_item_key_too_long(self):
self.stubs.Set(cinder.db, 'snapshot_metadata_update',
return_create_snapshot_metadata)
req = fakes.HTTPRequest.blank(self.url + '/key1')
req.method = 'PUT'
body = {"meta": {("a" * 260): "value1"}}
req.body = jsonutils.dumps(body)
req.headers["content-type"] = "application/json"
self.assertRaises(webob.exc.HTTPBadRequest,
self.controller.create,
req, self.id, body)
def test_create_nonexistent_snapshot(self):
self.stubs.Set(cinder.db, 'snapshot_get',
return_snapshot_nonexistent)
self.stubs.Set(cinder.db, 'snapshot_metadata_get',
return_snapshot_metadata)
self.stubs.Set(cinder.db, 'snapshot_metadata_update',
return_create_snapshot_metadata)
req = fakes.HTTPRequest.blank('/v2/snapshot_metadata')
req.method = 'POST'
req.content_type = "application/json"
body = {"metadata": {"key9": "value9"}}
req.body = jsonutils.dumps(body)
self.assertRaises(webob.exc.HTTPNotFound,
self.controller.create, req, self.id, body)
def test_update_all(self):
self.stubs.Set(cinder.db, 'snapshot_metadata_update',
return_create_snapshot_metadata)
req = fakes.HTTPRequest.blank(self.url)
req.method = 'PUT'
req.content_type = "application/json"
expected = {
'metadata': {
'key10': 'value10',
'key99': 'value99',
},
}
req.body = jsonutils.dumps(expected)
res_dict = self.controller.update_all(req, self.id, expected)
self.assertEqual(expected, res_dict)
def test_update_all_empty_container(self):
self.stubs.Set(cinder.db, 'snapshot_metadata_update',
return_create_snapshot_metadata)
req = fakes.HTTPRequest.blank(self.url)
req.method = 'PUT'
req.content_type = "application/json"
expected = {'metadata': {}}
req.body = jsonutils.dumps(expected)
res_dict = self.controller.update_all(req, self.id, expected)
self.assertEqual(expected, res_dict)
def test_update_all_malformed_container(self):
self.stubs.Set(cinder.db, 'snapshot_metadata_update',
return_create_snapshot_metadata)
req = fakes.HTTPRequest.blank(self.url)
req.method = 'PUT'
req.content_type = "application/json"
expected = {'meta': {}}
req.body = jsonutils.dumps(expected)
self.assertRaises(webob.exc.HTTPBadRequest,
self.controller.update_all, req, self.id, expected)
def test_update_all_malformed_data(self):
self.stubs.Set(cinder.db, 'snapshot_metadata_update',
return_create_snapshot_metadata)
req = fakes.HTTPRequest.blank(self.url)
req.method = 'PUT'
req.content_type = "application/json"
expected = {'metadata': ['asdf']}
req.body = jsonutils.dumps(expected)
self.assertRaises(webob.exc.HTTPBadRequest,
self.controller.update_all, req, self.id, expected)
def test_update_all_nonexistent_snapshot(self):
self.stubs.Set(cinder.db, 'snapshot_get', return_snapshot_nonexistent)
req = fakes.HTTPRequest.blank(self.url)
req.method = 'PUT'
req.content_type = "application/json"
body = {'metadata': {'key10': 'value10'}}
req.body = jsonutils.dumps(body)
self.assertRaises(webob.exc.HTTPNotFound,
self.controller.update_all, req, '100', body)
def test_update_item(self):
self.stubs.Set(cinder.db, 'snapshot_metadata_update',
return_create_snapshot_metadata)
req = fakes.HTTPRequest.blank(self.url + '/key1')
req.method = 'PUT'
body = {"meta": {"key1": "value1"}}
req.body = jsonutils.dumps(body)
req.headers["content-type"] = "application/json"
res_dict = self.controller.update(req, self.id, 'key1', body)
expected = {'meta': {'key1': 'value1'}}
self.assertEqual(expected, res_dict)
def test_update_item_nonexistent_snapshot(self):
self.stubs.Set(cinder.db, 'snapshot_get',
return_snapshot_nonexistent)
req = fakes.HTTPRequest.blank(
'/v2/fake/snapshots/asdf/metadata/key1')
req.method = 'PUT'
body = {"meta": {"key1": "value1"}}
req.body = jsonutils.dumps(body)
req.headers["content-type"] = "application/json"
self.assertRaises(webob.exc.HTTPNotFound,
self.controller.update, req, self.id, 'key1', body)
def test_update_item_empty_body(self):
self.stubs.Set(cinder.db, 'snapshot_metadata_update',
return_create_snapshot_metadata)
req = fakes.HTTPRequest.blank(self.url + '/key1')
req.method = 'PUT'
req.headers["content-type"] = "application/json"
self.assertRaises(webob.exc.HTTPBadRequest,
self.controller.update, req, self.id, 'key1', None)
def test_update_item_empty_key(self):
self.stubs.Set(cinder.db, 'snapshot_metadata_update',
return_create_snapshot_metadata)
req = fakes.HTTPRequest.blank(self.url + '/key1')
req.method = 'PUT'
body = {"meta": {"": "value1"}}
req.body = jsonutils.dumps(body)
req.headers["content-type"] = "application/json"
self.assertRaises(webob.exc.HTTPBadRequest,
self.controller.update, req, self.id, '', body)
def test_update_item_key_too_long(self):
self.stubs.Set(cinder.db, 'snapshot_metadata_update',
return_create_snapshot_metadata)
req = fakes.HTTPRequest.blank(self.url + '/key1')
req.method = 'PUT'
body = {"meta": {("a" * 260): "value1"}}
req.body = jsonutils.dumps(body)
req.headers["content-type"] = "application/json"
self.assertRaises(webob.exc.HTTPRequestEntityTooLarge,
self.controller.update,
req, self.id, ("a" * 260), body)
def test_update_item_value_too_long(self):
self.stubs.Set(cinder.db, 'snapshot_metadata_update',
return_create_snapshot_metadata)
req = fakes.HTTPRequest.blank(self.url + '/key1')
req.method = 'PUT'
body = {"meta": {"key1": ("a" * 260)}}
req.body = jsonutils.dumps(body)
req.headers["content-type"] = "application/json"
self.assertRaises(webob.exc.HTTPRequestEntityTooLarge,
self.controller.update,
req, self.id, "key1", body)
def test_update_item_too_many_keys(self):
self.stubs.Set(cinder.db, 'snapshot_metadata_update',
return_create_snapshot_metadata)
req = fakes.HTTPRequest.blank(self.url + '/key1')
req.method = 'PUT'
body = {"meta": {"key1": "value1", "key2": "value2"}}
req.body = jsonutils.dumps(body)
req.headers["content-type"] = "application/json"
self.assertRaises(webob.exc.HTTPBadRequest,
self.controller.update, req, self.id, 'key1', body)
def test_update_item_body_uri_mismatch(self):
self.stubs.Set(cinder.db, 'snapshot_metadata_update',
return_create_snapshot_metadata)
req = fakes.HTTPRequest.blank(self.url + '/bad')
req.method = 'PUT'
body = {"meta": {"key1": "value1"}}
req.body = jsonutils.dumps(body)
req.headers["content-type"] = "application/json"
self.assertRaises(webob.exc.HTTPBadRequest,
self.controller.update, req, self.id, 'bad', body)
def test_invalid_metadata_items_on_create(self):
self.stubs.Set(cinder.db, 'snapshot_metadata_update',
return_create_snapshot_metadata)
req = fakes.HTTPRequest.blank(self.url)
req.method = 'POST'
req.headers["content-type"] = "application/json"
#test for long key
data = {"metadata": {"a" * 260: "value1"}}
req.body = jsonutils.dumps(data)
self.assertRaises(webob.exc.HTTPRequestEntityTooLarge,
self.controller.create, req, self.id, data)
#test for long value
data = {"metadata": {"key": "v" * 260}}
req.body = jsonutils.dumps(data)
self.assertRaises(webob.exc.HTTPRequestEntityTooLarge,
self.controller.create, req, self.id, data)
#test for empty key.
data = {"metadata": {"": "value1"}}
req.body = jsonutils.dumps(data)
self.assertRaises(webob.exc.HTTPBadRequest,
self.controller.create, req, self.id, data)

View File

@ -48,11 +48,14 @@ def _get_default_snapshot_param():
}
def stub_snapshot_create(self, context, volume_id, name, description):
def stub_snapshot_create(self, context,
volume_id, name,
description, metadata):
snapshot = _get_default_snapshot_param()
snapshot['volume_id'] = volume_id
snapshot['display_name'] = name
snapshot['display_description'] = description
snapshot['metadata'] = metadata
return snapshot
@ -160,6 +163,7 @@ class SnapshotApiTest(test.TestCase):
'created_at': None,
'name': 'Updated Test Name',
'description': 'Default description',
'metadata': {},
}
}
self.assertEquals(expected, res_dict)

View File

@ -585,3 +585,45 @@ class TestMigrations(test.TestCase):
self.assertFalse(engine.dialect.has_table(engine.connect(),
"backups"))
def test_migration_009(self):
"""Test adding snapshot_metadata table works correctly."""
for (key, engine) in self.engines.items():
migration_api.version_control(engine,
TestMigrations.REPOSITORY,
migration.INIT_VERSION)
migration_api.upgrade(engine, TestMigrations.REPOSITORY, 8)
metadata = sqlalchemy.schema.MetaData()
metadata.bind = engine
migration_api.upgrade(engine, TestMigrations.REPOSITORY, 9)
self.assertTrue(engine.dialect.has_table(engine.connect(),
"snapshot_metadata"))
snapshot_metadata = sqlalchemy.Table('snapshot_metadata',
metadata,
autoload=True)
self.assertTrue(isinstance(snapshot_metadata.c.created_at.type,
sqlalchemy.types.DATETIME))
self.assertTrue(isinstance(snapshot_metadata.c.updated_at.type,
sqlalchemy.types.DATETIME))
self.assertTrue(isinstance(snapshot_metadata.c.deleted_at.type,
sqlalchemy.types.DATETIME))
self.assertTrue(isinstance(snapshot_metadata.c.deleted.type,
sqlalchemy.types.BOOLEAN))
self.assertTrue(isinstance(snapshot_metadata.c.deleted.type,
sqlalchemy.types.BOOLEAN))
self.assertTrue(isinstance(snapshot_metadata.c.id.type,
sqlalchemy.types.INTEGER))
self.assertTrue(isinstance(snapshot_metadata.c.snapshot_id.type,
sqlalchemy.types.VARCHAR))
self.assertTrue(isinstance(snapshot_metadata.c.key.type,
sqlalchemy.types.VARCHAR))
self.assertTrue(isinstance(snapshot_metadata.c.value.type,
sqlalchemy.types.VARCHAR))
migration_api.downgrade(engine, TestMigrations.REPOSITORY, 8)
self.assertFalse(engine.dialect.has_table(engine.connect(),
"snapshot_metadata"))

View File

@ -25,13 +25,11 @@ import functools
from oslo.config import cfg
from cinder.db import base
from cinder.db.sqlalchemy import models
from cinder import exception
from cinder import flags
from cinder.image import glance
from cinder.openstack.common import excutils
from cinder.openstack.common import log as logging
from cinder.openstack.common import rpc
from cinder.openstack.common import timeutils
import cinder.policy
from cinder import quota
@ -484,14 +482,16 @@ class API(base.Base):
connector,
force)
def _create_snapshot(self, context, volume, name, description,
force=False):
def _create_snapshot(self, context,
volume, name, description,
force=False, metadata=None):
check_policy(context, 'create_snapshot', volume)
if ((not force) and (volume['status'] != "available")):
msg = _("must be available")
raise exception.InvalidVolume(reason=msg)
self._check_metadata_properties(context, metadata)
options = {'volume_id': volume['id'],
'user_id': context.user_id,
'project_id': context.project_id,
@ -499,20 +499,25 @@ class API(base.Base):
'progress': '0%',
'volume_size': volume['size'],
'display_name': name,
'display_description': description}
'display_description': description,
'metadata': metadata}
snapshot = self.db.snapshot_create(context, options)
self.volume_rpcapi.create_snapshot(context, volume, snapshot)
return snapshot
def create_snapshot(self, context, volume, name, description):
def create_snapshot(self, context,
volume, name,
description, metadata=None):
return self._create_snapshot(context, volume, name, description,
False)
False, metadata)
def create_snapshot_force(self, context, volume, name, description):
def create_snapshot_force(self, context,
volume, name,
description, metadata=None):
return self._create_snapshot(context, volume, name, description,
True)
True, metadata)
@wrap_check_policy
def delete_snapshot(self, context, snapshot, force=False):
@ -589,6 +594,45 @@ class API(base.Base):
return i['value']
return None
def get_snapshot_metadata(self, context, snapshot):
"""Get all metadata associated with a snapshot."""
rv = self.db.snapshot_metadata_get(context, snapshot['id'])
return dict(rv.iteritems())
def delete_snapshot_metadata(self, context, snapshot, key):
"""Delete the given metadata item from a snapshot."""
self.db.snapshot_metadata_delete(context, snapshot['id'], key)
def update_snapshot_metadata(self, context,
snapshot, metadata,
delete=False):
"""Updates or creates snapshot metadata.
If delete is True, metadata items that are not specified in the
`metadata` argument will be deleted.
"""
orig_meta = self.get_snapshot_metadata(context, snapshot)
if delete:
_metadata = metadata
else:
_metadata = orig_meta.copy()
_metadata.update(metadata)
self._check_metadata_properties(context, _metadata)
self.db.snapshot_metadata_update(context,
snapshot['id'],
_metadata,
True)
# TODO(jdg): Implement an RPC call for drivers that may use this info
return _metadata
def get_snapshot_metadata_value(self, snapshot, key):
pass
@wrap_check_policy
def get_volume_image_metadata(self, context, volume):
db_data = self.db.volume_glance_metadata_get(context, volume['id'])