Implement DB migration for volume transfer BP

This patch implements the backend for volume transfers as specified in the
Volume transfer blueprint.

Change-Id: Id18e57942c3908e2539f3b0845263d0fbdd198f5
implement bp: volume-transfer
This commit is contained in:
Ollie Leahy 2013-05-15 10:05:49 +00:00
parent a003e4a41f
commit 7d1534c8f3
14 changed files with 1709 additions and 1 deletions

View File

@ -0,0 +1,244 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import webob
from webob import exc
from xml.dom import minidom
from cinder.api import common
from cinder.api import extensions
from cinder.api.openstack import wsgi
from cinder.api.views import transfers as transfer_view
from cinder.api import xmlutil
from cinder import exception
from cinder.openstack.common import log as logging
from cinder import transfer as transferAPI
LOG = logging.getLogger(__name__)
def make_transfer(elem):
elem.set('id')
elem.set('volume_id')
elem.set('created_at')
elem.set('name')
elem.set('auth_key')
class TransferTemplate(xmlutil.TemplateBuilder):
def construct(self):
root = xmlutil.TemplateElement('transfer', selector='transfer')
make_transfer(root)
alias = Volume_transfer.alias
namespace = Volume_transfer.namespace
return xmlutil.MasterTemplate(root, 1, nsmap={alias: namespace})
class TransfersTemplate(xmlutil.TemplateBuilder):
def construct(self):
root = xmlutil.TemplateElement('transfers')
elem = xmlutil.SubTemplateElement(root, 'transfer',
selector='transfers')
make_transfer(elem)
alias = Volume_transfer.alias
namespace = Volume_transfer.namespace
return xmlutil.MasterTemplate(root, 1, nsmap={alias: namespace})
class CreateDeserializer(wsgi.MetadataXMLDeserializer):
def default(self, string):
dom = minidom.parseString(string)
transfer = self._extract_transfer(dom)
return {'body': {'transfer': transfer}}
def _extract_transfer(self, node):
transfer = {}
transfer_node = self.find_first_child_named(node, 'transfer')
attributes = ['volume_id', 'display_name']
for attr in attributes:
if transfer_node.getAttribute(attr):
transfer[attr] = transfer_node.getAttribute(attr)
return transfer
class AcceptDeserializer(wsgi.MetadataXMLDeserializer):
def default(self, string):
dom = minidom.parseString(string)
transfer = self._extract_transfer(dom)
return {'body': {'accept': transfer}}
def _extract_transfer(self, node):
transfer = {}
transfer_node = self.find_first_child_named(node, 'accept')
attributes = ['auth_key']
for attr in attributes:
if transfer_node.getAttribute(attr):
transfer[attr] = transfer_node.getAttribute(attr)
return transfer
class VolumeTransferController(wsgi.Controller):
""" The Volume Transfer API controller for the Openstack API."""
_view_builder_class = transfer_view.ViewBuilder
def __init__(self):
self.transfer_api = transferAPI.API()
super(VolumeTransferController, self).__init__()
@wsgi.serializers(xml=TransferTemplate)
def show(self, req, id):
"""Return data about active transfers."""
context = req.environ['cinder.context']
try:
transfer = self.transfer_api.get(context, transfer_id=id)
except exception.TransferNotFound as error:
raise exc.HTTPNotFound(explanation=unicode(error))
return self._view_builder.detail(req, transfer)
@wsgi.serializers(xml=TransfersTemplate)
def index(self, req):
"""Returns a summary list of transfers"""
return self._get_transfers(req, is_detail=False)
@wsgi.serializers(xml=TransfersTemplate)
def detail(self, req):
"""Returns a detailed list of tranfers."""
return self._get_transfers(req, is_detail=True)
def _get_transfers(self, req, is_detail):
"""Returns a list of transfers, transformed through view builder."""
context = req.environ['cinder.context']
LOG.debug(_('Listing volume transfers'))
transfers = self.transfer_api.get_all(context)
limited_list = common.limited(transfers, req)
if is_detail:
transfers = self._view_builder.detail_list(req, limited_list)
else:
transfers = self._view_builder.summary_list(req, limited_list)
return transfers
@wsgi.response(202)
@wsgi.serializers(xml=TransferTemplate)
@wsgi.deserializers(xml=CreateDeserializer)
def create(self, req, body):
"""Create a new volume transfer."""
LOG.debug(_('Creating new volume transfer %s'), body)
if not self.is_valid_body(body, 'transfer'):
raise exc.HTTPBadRequest()
context = req.environ['cinder.context']
try:
transfer = body['transfer']
volume_id = transfer['volume_id']
except KeyError:
msg = _("Incorrect request body format")
raise exc.HTTPBadRequest(explanation=msg)
name = transfer.get('name', None)
LOG.audit(_("Creating transfer of volume %(volume_id)s"), locals(),
context=context)
try:
new_transfer = self.transfer_api.create(context, volume_id, name)
except exception.InvalidVolume as error:
raise exc.HTTPBadRequest(explanation=unicode(error))
except exception.VolumeNotFound as error:
raise exc.HTTPNotFound(explanation=unicode(error))
transfer = self._view_builder.create(req,
dict(new_transfer.iteritems()))
return transfer
@wsgi.response(202)
@wsgi.serializers(xml=TransferTemplate)
@wsgi.deserializers(xml=AcceptDeserializer)
def accept(self, req, id, body):
"""Accept a new volume transfer."""
transfer_id = id
LOG.debug(_('Accepting volume transfer %s'), transfer_id)
if not self.is_valid_body(body, 'accept'):
raise exc.HTTPBadRequest()
context = req.environ['cinder.context']
try:
accept = body['accept']
auth_key = accept['auth_key']
except KeyError:
msg = _("Incorrect request body format")
raise exc.HTTPBadRequest(explanation=msg)
LOG.audit(_("Accepting transfer %(transfer_id)s"), locals(),
context=context)
try:
accepted_transfer = self.transfer_api.accept(context, transfer_id,
auth_key)
except exception.VolumeSizeExceedsAvailableQuota as error:
raise exc.HTTPRequestEntityTooLarge(
explanation=error.message, headers={'Retry-After': 0})
except exception.InvalidVolume as error:
raise exc.HTTPBadRequest(explanation=unicode(error))
transfer = self._view_builder.summary(req,
dict(accepted_transfer.iteritems()))
return transfer
def delete(self, req, id):
"""Delete a transfer."""
context = req.environ['cinder.context']
LOG.audit(_("Delete transfer with id: %s"), id, context=context)
try:
self.transfer_api.delete(context, transfer_id=id)
except exception.TransferNotFound as error:
raise exc.HTTPNotFound(explanation=unicode(error))
return webob.Response(status_int=202)
class Volume_transfer(extensions.ExtensionDescriptor):
"""Volume transfer management support"""
name = "VolumeTransfer"
alias = "os-volume-transfer"
namespace = "http://docs.openstack.org/volume/ext/volume-transfer/" + \
"api/v1.1"
updated = "2013-05-29T00:00:00+00:00"
def get_resources(self):
resources = []
res = extensions.ResourceExtension(Volume_transfer.alias,
VolumeTransferController(),
collection_actions={'detail':
'GET'},
member_actions={'accept': 'POST'})
resources.append(res)
return resources

View File

@ -0,0 +1,89 @@
# Copyright (C) 2012 Hewlett-Packard Development Company, L.P.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from cinder.api import common
from cinder.openstack.common import log as logging
LOG = logging.getLogger(__name__)
class ViewBuilder(common.ViewBuilder):
"""Model transfer API responses as a python dictionary."""
_collection_name = "transfers"
def __init__(self):
"""Initialize view builder."""
super(ViewBuilder, self).__init__()
def summary_list(self, request, transfers):
"""Show a list of transfers without many details."""
return self._list_view(self.summary, request, transfers)
def detail_list(self, request, transfers):
"""Detailed view of a list of transfers ."""
return self._list_view(self.detail, request, transfers)
def summary(self, request, transfer):
"""Generic, non-detailed view of a transfer."""
return {
'transfer': {
'id': transfer['id'],
'volume_id': transfer.get('volume_id'),
'name': transfer['display_name'],
'links': self._get_links(request,
transfer['id']),
},
}
def detail(self, request, transfer):
"""Detailed view of a single transfer."""
return {
'transfer': {
'id': transfer.get('id'),
'created_at': transfer.get('created_at'),
'name': transfer.get('display_name'),
'volume_id': transfer.get('volume_id'),
'links': self._get_links(request, transfer['id'])
}
}
def create(self, request, transfer):
"""Detailed view of a single transfer when created."""
return {
'transfer': {
'id': transfer.get('id'),
'created_at': transfer.get('created_at'),
'name': transfer.get('display_name'),
'volume_id': transfer.get('volume_id'),
'auth_key': transfer.get('auth_key'),
'links': self._get_links(request, transfer['id'])
}
}
def _list_view(self, func, request, transfers):
"""Provide a view for a list of transfers."""
transfers_list = [func(request, transfer)['transfer'] for transfer in
transfers]
transfers_links = self._get_collection_links(request,
transfers,
self._collection_name)
transfers_dict = dict(transfers=transfers_list)
if transfers_links:
transfers_dict['transfers_links'] = transfers_links
return transfers_dict

View File

@ -757,3 +757,36 @@ def backup_update(context, backup_id, values):
def backup_destroy(context, backup_id):
"""Destroy the backup or raise if it does not exist."""
return IMPL.backup_destroy(context, backup_id)
###################
def transfer_get(context, transfer_id):
"""Get a volume transfer record or raise if it does not exist."""
return IMPL.transfer_get(context, transfer_id)
def transfer_get_all(context):
"""Get all volume transfer records."""
return IMPL.transfer_get_all(context)
def transfer_get_all_by_project(context, project_id):
"""Get all volume transfer records for specified project."""
return IMPL.transfer_get_all_by_project(context, project_id)
def transfer_create(context, values):
"""Create an entry in the transfers table."""
return IMPL.transfer_create(context, values)
def transfer_destroy(context, transfer_id):
"""Destroy a record in the volume transfer table."""
return IMPL.transfer_destroy(context, transfer_id)
def transfer_accept(context, transfer_id, user_id, project_id):
"""Accept a volume transfer."""
return IMPL.transfer_accept(context, transfer_id, user_id, project_id)

View File

@ -2020,3 +2020,135 @@ def backup_destroy(context, backup_id):
'deleted': True,
'deleted_at': timeutils.utcnow(),
'updated_at': literal_column('updated_at')})
###############################
@require_context
def transfer_get(context, transfer_id, session=None):
query = model_query(context, models.Transfer,
session=session).\
filter_by(id=transfer_id)
if not is_admin_context(context):
volume = models.Volume
query = query.options(joinedload('volume')).\
filter(volume.project_id == context.project_id)
result = query.first()
if not result:
raise exception.TransferNotFound(transfer_id=transfer_id)
return result
def _translate_transfers(transfers):
results = []
for transfer in transfers:
r = {}
r['id'] = transfer['id']
r['volume_id'] = transfer['volume_id']
r['display_name'] = transfer['display_name']
r['created_at'] = transfer['created_at']
r['deleted'] = transfer['deleted']
results.append(r)
return results
@require_admin_context
def transfer_get_all(context):
results = model_query(context, models.Transfer).all()
return _translate_transfers(results)
@require_context
def transfer_get_all_by_project(context, project_id):
authorize_project_context(context, project_id)
volume = models.Volume
query = model_query(context, models.Transfer).\
options(joinedload('volume')).\
filter(volume.project_id == project_id)
results = query.all()
return _translate_transfers(results)
@require_context
def transfer_create(context, values):
transfer = models.Transfer()
if not values.get('id'):
values['id'] = str(uuid.uuid4())
session = get_session()
with session.begin():
volume_ref = volume_get(context,
values['volume_id'],
session=session)
if volume_ref['status'] != 'available':
msg = _('Volume must be available')
LOG.error(msg)
raise exception.InvalidVolume(reason=msg)
volume_ref['status'] = 'awaiting-transfer'
transfer.update(values)
transfer.save(session=session)
volume_ref.update(volume_ref)
volume_ref.save(session=session)
return transfer
@require_context
def transfer_destroy(context, transfer_id):
session = get_session()
with session.begin():
transfer_ref = transfer_get(context,
transfer_id,
session=session)
volume_ref = volume_get(context,
transfer_ref['volume_id'],
session=session)
# If the volume state is not 'awaiting-transfer' don't change it, but
# we can still mark the transfer record as deleted.
if volume_ref['status'] != 'awaiting-transfer':
msg = _('Volume in unexpected state %s, '
'expected awaiting-transfer') % volume_ref['status']
LOG.error(msg)
else:
volume_ref['status'] = 'available'
volume_ref.update(volume_ref)
volume_ref.save(session=session)
session.query(models.Transfer).\
filter_by(id=transfer_id).\
update({'deleted': True,
'deleted_at': timeutils.utcnow(),
'updated_at': literal_column('updated_at')})
@require_context
def transfer_accept(context, transfer_id, user_id, project_id):
session = get_session()
with session.begin():
transfer_ref = transfer_get(context, transfer_id, session)
volume_id = transfer_ref['volume_id']
volume_ref = volume_get(context, volume_id, session=session)
if volume_ref['status'] != 'awaiting-transfer':
volume_status = volume_ref['status']
msg = _('Transfer %(transfer_id)s: Volume id %(volume_id)s in '
'unexpected state %(status)s, expected '
'awaiting-transfer') % {'transfer_id': transfer_id,
'volume_id': volume_ref['id'],
'status': volume_ref['status']}
LOG.error(msg)
raise exception.InvalidVolume(reason=msg)
volume_ref['status'] = 'available'
volume_ref['user_id'] = user_id
volume_ref['project_id'] = project_id
volume_ref['updated_at'] = literal_column('updated_at')
volume_ref.update(volume_ref)
volume_ref.save(session=session)
session.query(models.Transfer).\
filter_by(id=transfer_ref['id']).\
update({'deleted': True,
'deleted_at': timeutils.utcnow(),
'updated_at': literal_column('updated_at')})

View File

@ -0,0 +1,71 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from sqlalchemy import Boolean, Column, DateTime
from sqlalchemy import MetaData, String, Table, ForeignKey
from cinder.openstack.common import log as logging
LOG = logging.getLogger(__name__)
def upgrade(migrate_engine):
meta = MetaData()
meta.bind = migrate_engine
volumes = Table('volumes', meta, autoload=True)
# New table
transfers = Table(
'transfers', meta,
Column('created_at', DateTime(timezone=False)),
Column('updated_at', DateTime(timezone=False)),
Column('deleted_at', DateTime(timezone=False)),
Column('deleted', Boolean),
Column('id', String(36), primary_key=True, nullable=False),
Column('volume_id', String(length=36), ForeignKey('volumes.id'),
nullable=False),
Column('display_name', String(length=255,
convert_unicode=True,
unicode_error=None,
_warn_on_bytestring=False)),
Column('salt', String(length=255,
convert_unicode=True,
unicode_error=None,
_warn_on_bytestring=False)),
Column('crypt_hash', String(length=255,
convert_unicode=True,
unicode_error=None,
_warn_on_bytestring=False)),
Column('expires_at', DateTime(timezone=False)),
mysql_engine='InnoDB'
)
try:
transfers.create()
except Exception:
LOG.error(_("Table |%s| not created!"), repr(transfers))
raise
def downgrade(migrate_engine):
meta = MetaData()
meta.bind = migrate_engine
transfers = Table('transfers',
meta,
autoload=True)
try:
transfers.drop()
except Exception:
LOG.error(_("transfers table not dropped"))

View File

@ -387,6 +387,22 @@ class Backup(BASE, CinderBase):
object_count = Column(Integer)
class Transfer(BASE, CinderBase):
"""Represents a volume transfer request."""
__tablename__ = 'transfers'
id = Column(String(36), primary_key=True)
volume_id = Column(String(36), ForeignKey('volumes.id'))
display_name = Column(String(255))
salt = Column(String(255))
crypt_hash = Column(String(255))
expires_at = Column(DateTime)
volume = relationship(Volume, backref="transfer",
foreign_keys=volume_id,
primaryjoin='and_('
'Transfer.volume_id == Volume.id,'
'Transfer.deleted == False)')
def register_models():
"""Register Models and create metadata.
@ -404,6 +420,7 @@ def register_models():
Volume,
VolumeMetadata,
SnapshotMetadata,
Transfer,
VolumeTypeExtraSpecs,
VolumeTypes,
VolumeGlanceMetadata,

View File

@ -182,6 +182,10 @@ class InvalidParameterValue(Invalid):
message = _("%(err)s")
class InvalidAuthKey(Invalid):
message = _("Invalid auth key") + ": %(reason)s"
class ServiceUnavailable(Invalid):
message = _("Service is unavailable at this time.")
@ -554,3 +558,7 @@ class InvalidBackup(Invalid):
class SwiftConnectionFailed(CinderException):
message = _("Connection to swift failed") + ": %(reason)s"
class TransferNotFound(NotFound):
message = _("Transfer %(transfer_id)s could not be found.")

View File

@ -217,6 +217,9 @@ global_opts = [
'with its options'),
cfg.BoolOpt('no_snapshot_gb_quota',
default=False,
help='Whether snapshots count against GigaByte quota'), ]
help='Whether snapshots count against GigaByte quota'),
cfg.StrOpt('transfer_api_class',
default='cinder.transfer.api.API',
help='The full class name of the volume transfer API class'), ]
FLAGS.register_opts(global_opts)

View File

@ -0,0 +1,572 @@
# Copyright (C) 2012 Hewlett-Packard Development Company, L.P.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Tests for volume transfer code.
"""
import json
from xml.dom import minidom
import webob
from cinder import context
from cinder import db
from cinder import exception
from cinder.openstack.common import log as logging
from cinder import test
from cinder.tests.api import fakes
from cinder.transfer import API
import cinder.volume
LOG = logging.getLogger(__name__)
volume_transfer_api = API()
class VolumeTransferAPITestCase(test.TestCase):
"""Test Case for transfers API."""
def setUp(self):
super(VolumeTransferAPITestCase, self).setUp()
def tearDown(self):
super(VolumeTransferAPITestCase, self).tearDown()
@staticmethod
def _create_transfer(volume_id=1,
display_name='test_transfer'):
"""Create a transfer object."""
return volume_transfer_api.create(context.get_admin_context(),
volume_id,
display_name)
@staticmethod
def _create_volume(display_name='test_volume',
display_description='this is a test volume',
status='available',
size=1):
"""Create a volume object."""
vol = {}
vol['size'] = size
vol['user_id'] = 'fake'
vol['project_id'] = 'fake'
vol['status'] = status
vol['display_name'] = display_name
vol['display_description'] = display_description
vol['attach_status'] = status
return db.volume_create(context.get_admin_context(), vol)['id']
def test_show_transfer(self):
volume_id = self._create_volume(size=5)
transfer = self._create_transfer(volume_id)
LOG.debug('Created transfer with id %s' % transfer)
req = webob.Request.blank('/v2/fake/os-volume-transfer/%s' %
transfer['id'])
req.method = 'GET'
req.headers['Content-Type'] = 'application/json'
res = req.get_response(fakes.wsgi_app())
res_dict = json.loads(res.body)
self.assertEqual(res.status_int, 200)
self.assertEqual(res_dict['transfer']['name'], 'test_transfer')
self.assertEqual(res_dict['transfer']['id'], transfer['id'])
self.assertEqual(res_dict['transfer']['volume_id'], volume_id)
db.transfer_destroy(context.get_admin_context(), transfer['id'])
db.volume_destroy(context.get_admin_context(), volume_id)
def test_show_transfer_xml_content_type(self):
volume_id = self._create_volume(size=5)
transfer = self._create_transfer(volume_id)
req = webob.Request.blank('/v2/fake/os-volume-transfer/%s' %
transfer['id'])
req.method = 'GET'
req.headers['Content-Type'] = 'application/xml'
req.headers['Accept'] = 'application/xml'
res = req.get_response(fakes.wsgi_app())
self.assertEqual(res.status_int, 200)
dom = minidom.parseString(res.body)
transfer_xml = dom.getElementsByTagName('transfer')
name = transfer_xml.item(0).getAttribute('name')
self.assertEquals(name.strip(), "test_transfer")
db.transfer_destroy(context.get_admin_context(), transfer['id'])
db.volume_destroy(context.get_admin_context(), volume_id)
def test_show_transfer_with_transfer_NotFound(self):
req = webob.Request.blank('/v2/fake/os-volume-transfer/1234')
req.method = 'GET'
req.headers['Content-Type'] = 'application/json'
res = req.get_response(fakes.wsgi_app())
res_dict = json.loads(res.body)
self.assertEqual(res.status_int, 404)
self.assertEqual(res_dict['itemNotFound']['code'], 404)
self.assertEqual(res_dict['itemNotFound']['message'],
'Transfer 1234 could not be found.')
def test_list_transfers_json(self):
volume_id_1 = self._create_volume(size=5)
volume_id_2 = self._create_volume(size=5)
transfer1 = self._create_transfer(volume_id_1)
transfer2 = self._create_transfer(volume_id_2)
req = webob.Request.blank('/v2/fake/os-volume-transfer')
req.method = 'GET'
req.headers['Content-Type'] = 'application/json'
res = req.get_response(fakes.wsgi_app())
res_dict = json.loads(res.body)
self.assertEqual(res.status_int, 200)
self.assertEqual(len(res_dict['transfers'][0]), 4)
self.assertEqual(res_dict['transfers'][0]['id'], transfer1['id'])
self.assertEqual(res_dict['transfers'][0]['name'], 'test_transfer')
self.assertEqual(len(res_dict['transfers'][1]), 4)
self.assertEqual(res_dict['transfers'][1]['name'], 'test_transfer')
db.transfer_destroy(context.get_admin_context(), transfer2['id'])
db.transfer_destroy(context.get_admin_context(), transfer1['id'])
db.volume_destroy(context.get_admin_context(), volume_id_1)
db.volume_destroy(context.get_admin_context(), volume_id_2)
def test_list_transfers_xml(self):
volume_id_1 = self._create_volume(size=5)
volume_id_2 = self._create_volume(size=5)
transfer1 = self._create_transfer(volume_id_1)
transfer2 = self._create_transfer(volume_id_2)
req = webob.Request.blank('/v2/fake/os-volume-transfer')
req.method = 'GET'
req.headers['Content-Type'] = 'application/xml'
req.headers['Accept'] = 'application/xml'
res = req.get_response(fakes.wsgi_app())
self.assertEqual(res.status_int, 200)
dom = minidom.parseString(res.body)
transfer_list = dom.getElementsByTagName('transfer')
self.assertEqual(transfer_list.item(0).attributes.length, 3)
self.assertEqual(transfer_list.item(0).getAttribute('id'),
transfer1['id'])
self.assertEqual(transfer_list.item(1).attributes.length, 3)
self.assertEqual(transfer_list.item(1).getAttribute('id'),
transfer2['id'])
db.transfer_destroy(context.get_admin_context(), transfer2['id'])
db.transfer_destroy(context.get_admin_context(), transfer1['id'])
db.volume_destroy(context.get_admin_context(), volume_id_2)
db.volume_destroy(context.get_admin_context(), volume_id_1)
def test_list_transfers_detail_json(self):
volume_id_1 = self._create_volume(size=5)
volume_id_2 = self._create_volume(size=5)
transfer1 = self._create_transfer(volume_id_1)
transfer2 = self._create_transfer(volume_id_2)
req = webob.Request.blank('/v2/fake/os-volume-transfer/detail')
req.method = 'GET'
req.headers['Content-Type'] = 'application/json'
req.headers['Accept'] = 'application/json'
res = req.get_response(fakes.wsgi_app())
res_dict = json.loads(res.body)
self.assertEqual(res.status_int, 200)
self.assertEqual(len(res_dict['transfers'][0]), 5)
self.assertEqual(res_dict['transfers'][0]['name'],
'test_transfer')
self.assertEqual(res_dict['transfers'][0]['id'], transfer1['id'])
self.assertEqual(res_dict['transfers'][0]['volume_id'], volume_id_1)
self.assertEqual(len(res_dict['transfers'][1]), 5)
self.assertEqual(res_dict['transfers'][1]['name'],
'test_transfer')
self.assertEqual(res_dict['transfers'][1]['id'], transfer2['id'])
self.assertEqual(res_dict['transfers'][1]['volume_id'], volume_id_2)
db.transfer_destroy(context.get_admin_context(), transfer2['id'])
db.transfer_destroy(context.get_admin_context(), transfer1['id'])
db.volume_destroy(context.get_admin_context(), volume_id_2)
db.volume_destroy(context.get_admin_context(), volume_id_1)
def test_list_transfers_detail_xml(self):
volume_id_1 = self._create_volume(size=5)
volume_id_2 = self._create_volume(size=5)
transfer1 = self._create_transfer(volume_id_1)
transfer2 = self._create_transfer(volume_id_2)
req = webob.Request.blank('/v2/fake/os-volume-transfer/detail')
req.method = 'GET'
req.headers['Content-Type'] = 'application/xml'
req.headers['Accept'] = 'application/xml'
res = req.get_response(fakes.wsgi_app())
self.assertEqual(res.status_int, 200)
dom = minidom.parseString(res.body)
transfer_detail = dom.getElementsByTagName('transfer')
self.assertEqual(transfer_detail.item(0).attributes.length, 4)
self.assertEqual(
transfer_detail.item(0).getAttribute('name'), 'test_transfer')
self.assertEqual(
transfer_detail.item(0).getAttribute('id'), transfer1['id'])
self.assertEqual(transfer_detail.item(0).getAttribute('volume_id'),
volume_id_1)
self.assertEqual(transfer_detail.item(1).attributes.length, 4)
self.assertEqual(
transfer_detail.item(1).getAttribute('name'), 'test_transfer')
self.assertEqual(
transfer_detail.item(1).getAttribute('id'), transfer2['id'])
self.assertEqual(transfer_detail.item(1).getAttribute('volume_id'),
volume_id_2)
db.transfer_destroy(context.get_admin_context(), transfer2['id'])
db.transfer_destroy(context.get_admin_context(), transfer1['id'])
db.volume_destroy(context.get_admin_context(), volume_id_2)
db.volume_destroy(context.get_admin_context(), volume_id_1)
def test_create_transfer_json(self):
volume_id = self._create_volume(status='available', size=5)
body = {"transfer": {"display_name": "transfer1",
"volume_id": volume_id}}
req = webob.Request.blank('/v2/fake/os-volume-transfer')
req.method = 'POST'
req.headers['Content-Type'] = 'application/json'
req.body = json.dumps(body)
res = req.get_response(fakes.wsgi_app())
res_dict = json.loads(res.body)
LOG.info(res_dict)
self.assertEqual(res.status_int, 202)
self.assertTrue('id' in res_dict['transfer'])
self.assertTrue('auth_key' in res_dict['transfer'])
self.assertTrue('created_at' in res_dict['transfer'])
self.assertTrue('name' in res_dict['transfer'])
self.assertTrue('volume_id' in res_dict['transfer'])
db.volume_destroy(context.get_admin_context(), volume_id)
def test_create_transfer_xml(self):
volume_size = 2
volume_id = self._create_volume(status='available', size=volume_size)
req = webob.Request.blank('/v2/fake/os-volume-transfer')
req.body = ('<transfer display_name="transfer-001" '
'volume_id="%s"/>' % volume_id)
req.method = 'POST'
req.headers['Content-Type'] = 'application/xml'
req.headers['Accept'] = 'application/xml'
res = req.get_response(fakes.wsgi_app())
self.assertEqual(res.status_int, 202)
dom = minidom.parseString(res.body)
transfer = dom.getElementsByTagName('transfer')
self.assertTrue(transfer.item(0).hasAttribute('id'))
self.assertTrue(transfer.item(0).hasAttribute('auth_key'))
self.assertTrue(transfer.item(0).hasAttribute('created_at'))
self.assertTrue(transfer.item(0).hasAttribute('name'))
self.assertTrue(transfer.item(0).hasAttribute('volume_id'))
db.volume_destroy(context.get_admin_context(), volume_id)
def test_create_transfer_with_no_body(self):
req = webob.Request.blank('/v2/fake/os-volume-transfer')
req.body = json.dumps(None)
req.method = 'POST'
req.headers['Content-Type'] = 'application/json'
req.headers['Accept'] = 'application/json'
res = req.get_response(fakes.wsgi_app())
res_dict = json.loads(res.body)
self.assertEqual(res.status_int, 400)
self.assertEqual(res_dict['badRequest']['code'], 400)
self.assertEqual(res_dict['badRequest']['message'],
'The server could not comply with the request since'
' it is either malformed or otherwise incorrect.')
def test_create_transfer_with_body_KeyError(self):
body = {"transfer": {"display_name": "transfer1"}}
req = webob.Request.blank('/v2/fake/os-volume-transfer')
req.method = 'POST'
req.headers['Content-Type'] = 'application/json'
req.body = json.dumps(body)
res = req.get_response(fakes.wsgi_app())
res_dict = json.loads(res.body)
self.assertEqual(res.status_int, 400)
self.assertEqual(res_dict['badRequest']['code'], 400)
self.assertEqual(res_dict['badRequest']['message'],
'Incorrect request body format')
def test_create_transfer_with_VolumeNotFound(self):
body = {"transfer": {"display_name": "transfer1",
"volume_id": 1234}}
req = webob.Request.blank('/v2/fake/os-volume-transfer')
req.method = 'POST'
req.headers['Content-Type'] = 'application/json'
req.body = json.dumps(body)
res = req.get_response(fakes.wsgi_app())
res_dict = json.loads(res.body)
self.assertEqual(res.status_int, 404)
self.assertEqual(res_dict['itemNotFound']['code'], 404)
self.assertEqual(res_dict['itemNotFound']['message'],
'Volume 1234 could not be found.')
def test_create_transfer_with_InvalidVolume(self):
volume_id = self._create_volume(status='attached')
body = {"transfer": {"display_name": "transfer1",
"volume_id": volume_id}}
req = webob.Request.blank('/v2/fake/os-volume-transfer')
req.method = 'POST'
req.headers['Content-Type'] = 'application/json'
req.body = json.dumps(body)
res = req.get_response(fakes.wsgi_app())
res_dict = json.loads(res.body)
self.assertEqual(res.status_int, 400)
self.assertEqual(res_dict['badRequest']['code'], 400)
self.assertEqual(res_dict['badRequest']['message'],
'Invalid volume: status must be available')
db.volume_destroy(context.get_admin_context(), volume_id)
def test_delete_transfer_awaiting_transfer(self):
volume_id = self._create_volume()
transfer = self._create_transfer(volume_id)
req = webob.Request.blank('/v2/fake/os-volume-transfer/%s' %
transfer['id'])
req.method = 'DELETE'
req.headers['Content-Type'] = 'application/json'
res = req.get_response(fakes.wsgi_app())
self.assertEqual(res.status_int, 202)
# verify transfer has been deleted
req = webob.Request.blank('/v2/fake/os-volume-transfer/%s' %
transfer['id'])
req.method = 'GET'
req.headers['Content-Type'] = 'application/json'
res = req.get_response(fakes.wsgi_app())
res_dict = json.loads(res.body)
self.assertEqual(res.status_int, 404)
self.assertEqual(res_dict['itemNotFound']['code'], 404)
self.assertEqual(res_dict['itemNotFound']['message'],
'Transfer %s could not be found.' % transfer['id'])
self.assertEqual(db.volume_get(context.get_admin_context(),
volume_id)['status'], 'available')
db.volume_destroy(context.get_admin_context(), volume_id)
def test_delete_transfer_with_transfer_NotFound(self):
req = webob.Request.blank('/v2/fake/os-volume-transfer/9999')
req.method = 'DELETE'
req.headers['Content-Type'] = 'application/json'
res = req.get_response(fakes.wsgi_app())
res_dict = json.loads(res.body)
self.assertEqual(res.status_int, 404)
self.assertEqual(res_dict['itemNotFound']['code'], 404)
self.assertEqual(res_dict['itemNotFound']['message'],
'Transfer 9999 could not be found.')
def test_accept_transfer_volume_id_specified_json(self):
volume_id = self._create_volume()
transfer = self._create_transfer(volume_id)
body = {"accept": {"id": transfer['id'],
"auth_key": transfer['auth_key']}}
req = webob.Request.blank('/v2/fake/os-volume-transfer/%s/accept' %
transfer['id'])
req.method = 'POST'
req.headers['Content-Type'] = 'application/json'
req.body = json.dumps(body)
res = req.get_response(fakes.wsgi_app())
res_dict = json.loads(res.body)
self.assertEqual(res.status_int, 202)
self.assertEqual(res_dict['transfer']['id'], transfer['id'])
self.assertEqual(res_dict['transfer']['volume_id'], volume_id)
def test_accept_transfer_volume_id_specified_xml(self):
volume_id = self._create_volume(size=5)
transfer = self._create_transfer(volume_id)
req = webob.Request.blank('/v2/fake/os-volume-transfer/%s/accept' %
transfer['id'])
req.body = '<accept auth_key="%s"/>' % transfer['auth_key']
req.method = 'POST'
req.headers['Content-Type'] = 'application/xml'
req.headers['Accept'] = 'application/xml'
res = req.get_response(fakes.wsgi_app())
self.assertEqual(res.status_int, 202)
dom = minidom.parseString(res.body)
accept = dom.getElementsByTagName('transfer')
self.assertEqual(accept.item(0).getAttribute('id'),
transfer['id'])
self.assertEqual(accept.item(0).getAttribute('volume_id'), volume_id)
db.volume_destroy(context.get_admin_context(), volume_id)
def test_accept_transfer_with_no_body(self):
volume_id = self._create_volume(size=5)
transfer = self._create_transfer(volume_id)
req = webob.Request.blank('/v2/fake/os-volume-transfer/%s/accept' %
transfer['id'])
req.body = json.dumps(None)
req.method = 'POST'
req.headers['Content-Type'] = 'application/json'
req.headers['Accept'] = 'application/json'
res = req.get_response(fakes.wsgi_app())
res_dict = json.loads(res.body)
self.assertEqual(res.status_int, 400)
self.assertEqual(res_dict['badRequest']['code'], 400)
self.assertEqual(res_dict['badRequest']['message'],
'The server could not comply with the request since'
' it is either malformed or otherwise incorrect.')
db.volume_destroy(context.get_admin_context(), volume_id)
def test_accept_transfer_with_body_KeyError(self):
volume_id = self._create_volume(size=5)
transfer = self._create_transfer(volume_id)
req = webob.Request.blank('/v2/fake/os-volume-transfer/%s/accept' %
transfer['id'])
body = {"": {}}
req.method = 'POST'
req.headers['Content-Type'] = 'application/json'
req.headers['Accept'] = 'application/json'
req.body = json.dumps(body)
res = req.get_response(fakes.wsgi_app())
res_dict = json.loads(res.body)
self.assertEqual(res.status_int, 400)
self.assertEqual(res_dict['badRequest']['code'], 400)
self.assertEqual(res_dict['badRequest']['message'],
'The server could not comply with the request since'
' it is either malformed or otherwise incorrect.')
def test_accept_transfer_invalid_id_auth_key(self):
volume_id = self._create_volume()
transfer = self._create_transfer(volume_id)
body = {"accept": {"id": transfer['id'],
"auth_key": 1}}
req = webob.Request.blank('/v2/fake/os-volume-transfer/%s/accept' %
transfer['id'])
req.method = 'POST'
req.headers['Content-Type'] = 'application/json'
req.body = json.dumps(body)
res = req.get_response(fakes.wsgi_app())
res_dict = json.loads(res.body)
self.assertEqual(res.status_int, 400)
self.assertEqual(res_dict['badRequest']['code'], 400)
self.assertEqual(res_dict['badRequest']['message'],
'Invalid auth key: Attempt to transfer %s with '
'invalid auth key.' % transfer['id'])
db.transfer_destroy(context.get_admin_context(), transfer['id'])
db.volume_destroy(context.get_admin_context(), volume_id)
def test_accept_transfer_with_invalid_transfer(self):
volume_id = self._create_volume()
transfer = self._create_transfer(volume_id)
body = {"accept": {"id": transfer['id'],
"auth_key": 1}}
req = webob.Request.blank('/v2/fake/os-volume-transfer/1/accept')
req.method = 'POST'
req.headers['Content-Type'] = 'application/json'
req.body = json.dumps(body)
res = req.get_response(fakes.wsgi_app())
res_dict = json.loads(res.body)
self.assertEqual(res.status_int, 404)
self.assertEqual(res_dict['itemNotFound']['code'], 404)
self.assertEqual(res_dict['itemNotFound']['message'],
'TransferNotFound: Transfer 1 could not be found.')
db.transfer_destroy(context.get_admin_context(), transfer['id'])
db.volume_destroy(context.get_admin_context(), volume_id)
def test_accept_transfer_with_VolumeSizeExceedsAvailableQuota(self):
def fake_transfer_api_accept_throwing_VolumeSizeExceedsAvailableQuota(
cls, context, transfer, volume_id):
raise exception.VolumeSizeExceedsAvailableQuota()
self.stubs.Set(
cinder.transfer.API,
'accept',
fake_transfer_api_accept_throwing_VolumeSizeExceedsAvailableQuota)
volume_id = self._create_volume()
transfer = self._create_transfer(volume_id)
body = {"accept": {"id": transfer['id'],
"auth_key": transfer['auth_key']}}
req = webob.Request.blank('/v2/fake/os-volume-transfer/%s/accept' %
transfer['id'])
req.method = 'POST'
req.headers['Content-Type'] = 'application/json'
req.body = json.dumps(body)
res = req.get_response(fakes.wsgi_app())
res_dict = json.loads(res.body)
self.assertEqual(res.status_int, 413)
self.assertEqual(res_dict['overLimit']['code'], 413)
self.assertEqual(res_dict['overLimit']['message'],
'Requested volume or snapshot exceeds allowed '
'Gigabytes quota')
def test_accept_transfer_with_VolumeLimitExceeded(self):
def fake_transfer_api_accept_throwing_VolumeLimitExceeded(cls,
context,
transfer,
volume_id):
raise exception.VolumeLimitExceeded(allowed=1)
self.stubs.Set(cinder.transfer.API, 'accept',
fake_transfer_api_accept_throwing_VolumeLimitExceeded)
volume_id = self._create_volume()
transfer = self._create_transfer(volume_id)
body = {"accept": {"id": transfer['id'],
"auth_key": transfer['auth_key']}}
req = webob.Request.blank('/v2/fake/os-volume-transfer/%s/accept' %
transfer['id'])
req.method = 'POST'
req.headers['Content-Type'] = 'application/json'
req.body = json.dumps(body)
res = req.get_response(fakes.wsgi_app())
res_dict = json.loads(res.body)
self.assertEqual(res.status_int, 413)
self.assertEqual(res_dict['overLimit']['code'], 413)
self.assertEqual(res_dict['overLimit']['message'],
'VolumeLimitExceeded: Maximum number of volumes '
'allowed (1) exceeded')

View File

@ -0,0 +1,140 @@
# Copyright (C) 2013 Hewlett-Packard Development Company, L.P.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Tests for transfers table."""
from cinder import context
from cinder import db
from cinder import exception
from cinder.openstack.common import log as logging
from cinder import test
LOG = logging.getLogger(__name__)
class TransfersTableTestCase(test.TestCase):
"""Test case for transfers model."""
def setUp(self):
super(TransfersTableTestCase, self).setUp()
self.ctxt = context.RequestContext(user_id='user_id',
project_id='project_id')
def tearDown(self):
super(TransfersTableTestCase, self).tearDown()
def _create_volume(self,
display_name='test_volume',
display_description='this is a test volume',
status='available',
size=1):
"""Create a volume object."""
vol = {}
vol['size'] = size
vol['user_id'] = self.ctxt.user_id
vol['project_id'] = self.ctxt.project_id
vol['status'] = status
vol['display_name'] = display_name
vol['display_description'] = display_description
vol['attach_status'] = 'detached'
return db.volume_create(self.ctxt, vol)['id']
def _create_transfer(self, volume_id=None):
"""Create a transfer object."""
transfer = {'display_name': 'display_name',
'salt': 'salt',
'crypt_hash': 'crypt_hash'}
if volume_id is not None:
transfer['volume_id'] = volume_id
return db.transfer_create(self.ctxt, transfer)['id']
def test_transfer_create(self):
# If the volume_id is Null a KeyError exception will be raised.
self.assertRaises(KeyError,
self._create_transfer)
volume_id = self._create_volume(size=1)
self._create_transfer(volume_id)
def test_transfer_get(self):
volume_id1 = self._create_volume(size=1)
xfer_id1 = self._create_transfer(volume_id1)
xfer = db.transfer_get(self.ctxt, xfer_id1)
self.assertEquals(xfer.volume_id, volume_id1, "Unexpected volume_id")
nctxt = context.RequestContext(user_id='new_user_id',
project_id='new_project_id')
self.assertRaises(exception.TransferNotFound,
db.transfer_get, nctxt, xfer_id1)
xfer = db.transfer_get(nctxt.elevated(), xfer_id1)
self.assertEquals(xfer.volume_id, volume_id1, "Unexpected volume_id")
def test_transfer_get_all(self):
volume_id1 = self._create_volume(size=1)
volume_id2 = self._create_volume(size=1)
self._create_transfer(volume_id1)
self._create_transfer(volume_id2)
self.assertRaises(exception.NotAuthorized,
db.transfer_get_all,
self.ctxt)
xfer = db.transfer_get_all(context.get_admin_context())
self.assertEquals(len(xfer), 2,
"Unexpected number of transfer records")
xfer = db.transfer_get_all_by_project(self.ctxt, self.ctxt.project_id)
self.assertEquals(len(xfer), 2,
"Unexpected number of transfer records")
nctxt = context.RequestContext(user_id='new_user_id',
project_id='new_project_id')
self.assertRaises(exception.NotAuthorized,
db.transfer_get_all_by_project,
nctxt, self.ctxt.project_id)
xfer = db.transfer_get_all_by_project(nctxt.elevated(),
self.ctxt.project_id)
self.assertEquals(len(xfer), 2,
"Unexpected number of transfer records")
def test_transfer_destroy(self):
volume_id = self._create_volume(size=1)
volume_id2 = self._create_volume(size=1)
xfer_id1 = self._create_transfer(volume_id)
xfer_id2 = self._create_transfer(volume_id2)
xfer = db.transfer_get_all(context.get_admin_context())
self.assertEquals(len(xfer), 2,
"Unexpected number of transfer records")
self.assertFalse(xfer[0]['deleted'], "Deleted flag is set")
db.transfer_destroy(self.ctxt, xfer_id1)
xfer = db.transfer_get_all(context.get_admin_context())
self.assertEquals(len(xfer), 1,
"Unexpected number of transfer records")
self.assertEquals(xfer[0]['id'], xfer_id2,
"Unexpected value for Transfer id")
nctxt = context.RequestContext(user_id='new_user_id',
project_id='new_project_id')
self.assertRaises(exception.TransferNotFound,
db.transfer_destroy, nctxt, xfer_id2)
db.transfer_destroy(nctxt.elevated(), xfer_id2)
xfer = db.transfer_get_all(context.get_admin_context())
self.assertEquals(len(xfer), 0,
"Unexpected number of transfer records")

View File

@ -628,3 +628,47 @@ class TestMigrations(test.TestCase):
self.assertFalse(engine.dialect.has_table(engine.connect(),
"snapshot_metadata"))
def test_migration_010(self):
"""Test adding transfers table works correctly."""
for (key, engine) in self.engines.items():
migration_api.version_control(engine,
TestMigrations.REPOSITORY,
migration.INIT_VERSION)
migration_api.upgrade(engine, TestMigrations.REPOSITORY, 9)
metadata = sqlalchemy.schema.MetaData()
metadata.bind = engine
migration_api.upgrade(engine, TestMigrations.REPOSITORY, 10)
self.assertTrue(engine.dialect.has_table(engine.connect(),
"transfers"))
transfers = sqlalchemy.Table('transfers',
metadata,
autoload=True)
self.assertTrue(isinstance(transfers.c.created_at.type,
sqlalchemy.types.DATETIME))
self.assertTrue(isinstance(transfers.c.updated_at.type,
sqlalchemy.types.DATETIME))
self.assertTrue(isinstance(transfers.c.deleted_at.type,
sqlalchemy.types.DATETIME))
self.assertTrue(isinstance(transfers.c.deleted.type,
sqlalchemy.types.BOOLEAN))
self.assertTrue(isinstance(transfers.c.id.type,
sqlalchemy.types.VARCHAR))
self.assertTrue(isinstance(transfers.c.volume_id.type,
sqlalchemy.types.VARCHAR))
self.assertTrue(isinstance(transfers.c.display_name.type,
sqlalchemy.types.VARCHAR))
self.assertTrue(isinstance(transfers.c.salt.type,
sqlalchemy.types.VARCHAR))
self.assertTrue(isinstance(transfers.c.crypt_hash.type,
sqlalchemy.types.VARCHAR))
self.assertTrue(isinstance(transfers.c.expires_at.type,
sqlalchemy.types.DATETIME))
migration_api.downgrade(engine, TestMigrations.REPOSITORY, 9)
self.assertFalse(engine.dialect.has_table(engine.connect(),
"transfers"))

View File

@ -0,0 +1,133 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2013 OpenStack LLC.
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Unit Tests for volume transfers."""
import datetime
from cinder import context
from cinder import db
from cinder import exception
from cinder import flags
from cinder.openstack.common import log as logging
from cinder import test
from cinder.transfer import api as transfer_api
from cinder.volume import api as cinder_api
LOG = logging.getLogger(__name__)
class VolumeTransferTestCase(test.TestCase):
"""Test cases for volume type code."""
def setUp(self):
super(VolumeTransferTestCase, self).setUp()
self.ctxt = context.RequestContext(user_id='user_id',
project_id='project_id')
def _create_volume(self, volume_id, status='available',
user_id=None, project_id=None):
if user_id is None:
user_id = self.ctxt.user_id
if project_id is None:
project_id = self.ctxt.project_id
vol = {'id': volume_id,
'updated_at': datetime.datetime(1, 1, 1, 1, 1, 1),
'user_id': user_id,
'project_id': project_id,
'display_name': 'Display Name',
'display_description': 'Display Description',
'size': 1,
'status': status}
volume = db.volume_create(self.ctxt, vol)
return volume
def test_transfer_volume_create_delete(self):
tx_api = transfer_api.API()
volume = self._create_volume('1')
response = tx_api.create(self.ctxt, '1', 'Description')
volume = db.volume_get(self.ctxt, '1')
self.assertEquals('awaiting-transfer', volume['status'],
'Unexpected state')
tx_api.delete(self.ctxt, response['id'])
volume = db.volume_get(self.ctxt, '1')
self.assertEquals('available', volume['status'],
'Unexpected state')
def test_transfer_invalid_volume(self):
tx_api = transfer_api.API()
volume = self._create_volume('1', status='in-use')
self.assertRaises(exception.InvalidVolume,
tx_api.create,
self.ctxt, '1', 'Description')
volume = db.volume_get(self.ctxt, '1')
self.assertEquals('in-use', volume['status'],
'Unexpected state')
def test_transfer_accept(self):
tx_api = transfer_api.API()
volume = self._create_volume('1')
transfer = tx_api.create(self.ctxt, '1', 'Description')
volume = db.volume_get(self.ctxt, '1')
self.assertEquals('awaiting-transfer', volume['status'],
'Unexpected state')
self.assertRaises(exception.TransferNotFound,
tx_api.accept,
self.ctxt, '2', transfer['auth_key'])
self.assertRaises(exception.InvalidAuthKey,
tx_api.accept,
self.ctxt, transfer['id'], 'wrong')
db.volume_update(self.ctxt, '1', {'status': 'wrong'})
self.assertRaises(exception.InvalidVolume,
tx_api.accept,
self.ctxt, transfer['id'], transfer['auth_key'])
db.volume_update(self.ctxt, '1', {'status': 'awaiting-transfer'})
self.ctxt.user_id = 'new_user_id'
self.ctxt.project_id = 'new_project_id'
response = tx_api.accept(self.ctxt,
transfer['id'],
transfer['auth_key'])
volume = db.volume_get(self.ctxt, '1')
self.assertEquals(volume['project_id'], 'new_project_id',
'Unexpected project id')
self.assertEquals(volume['user_id'], 'new_user_id',
'Unexpected user id')
self.assertEquals(volume['id'], response['volume_id'],
'Unexpected volume id in response.')
self.assertEquals(transfer['id'], response['id'],
'Unexpected transfer id in response.')
def test_transfer_get(self):
tx_api = transfer_api.API()
volume = self._create_volume('1')
transfer = tx_api.create(self.ctxt, volume['id'], 'Description')
t = tx_api.get(self.ctxt, transfer['id'])
self.assertEquals(t['id'], transfer['id'], 'Unexpected transfer id')
ts = tx_api.get_all(self.ctxt)
self.assertEquals(len(ts), 1, 'Unexpected number of transfers.')
nctxt = context.RequestContext(user_id='new_user_id',
project_id='new_project_id')
self.assertRaises(exception.TransferNotFound,
tx_api.get,
nctxt,
transfer['id'])
ts = tx_api.get_all(nctxt)
self.assertEquals(len(ts), 0, 'Unexpected transfers listed.')

View File

@ -0,0 +1,23 @@
# Copyright (C) 2013 Hewlett-Packard Development Company, L.P.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# Importing full names to not pollute the namespace and cause possible
# collisions with use of 'from cinder.transfer import <foo>' elsewhere.
import cinder.flags
import cinder.openstack.common.importutils
API = cinder.openstack.common.importutils.import_class(
cinder.flags.FLAGS.transfer_api_class)

199
cinder/transfer/api.py Normal file
View File

@ -0,0 +1,199 @@
# Copyright (C) 2013 Hewlett-Packard Development Company, L.P.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Handles all requests relating to transferring ownership of volumes.
"""
import datetime
import hashlib
import hmac
import random
from oslo.config import cfg
from cinder.db import base
from cinder import exception
from cinder import flags
from cinder.openstack.common import log as logging
from cinder import quota
from cinder.volume import api as volume_api
volume_transfer_opts = [
cfg.IntOpt('volume_transfer_salt_length', default=8,
help='The number of characters in the salt.'),
cfg.IntOpt('volume_transfer_key_length', default=16,
help='The number of characters in the '
'autogenerated auth key.'), ]
FLAGS = flags.FLAGS
FLAGS.register_opts(volume_transfer_opts)
LOG = logging.getLogger(__name__)
QUOTAS = quota.QUOTAS
class API(base.Base):
"""API for interacting volume transfers."""
def __init__(self, db_driver=None):
self.volume_api = volume_api.API()
super(API, self).__init__(db_driver)
def get(self, context, transfer_id):
rv = self.db.transfer_get(context, transfer_id)
return dict(rv.iteritems())
def delete(self, context, transfer_id):
"""
Make the RPC call to delete a volume transfer.
"""
transfer = self.db.transfer_get(context, transfer_id)
volume_ref = self.db.volume_get(context, transfer.volume_id)
if volume_ref['status'] != 'awaiting-transfer':
msg = _("Volume in unexpected state")
LOG.error(msg)
self.db.transfer_destroy(context, transfer_id)
def get_all(self, context, filters={}):
if context.is_admin and 'all_tenants' in filters:
transfers = self.db.transfer_get_all(context)
else:
transfers = self.db.transfer_get_all_by_project(context,
context.project_id)
return transfers
def _get_random_string(self, length):
"""Get a random hex string of the specified length."""
rndstr = ""
random.seed(datetime.datetime.now().microsecond)
while len(rndstr) < length:
rndstr += hashlib.sha224(str(random.random())).hexdigest()
return rndstr[0:length]
def _get_crypt_hash(self, salt, auth_key):
"""Generate a random hash based on the salt and the auth key."""
return hmac.new(str(salt),
str(auth_key),
hashlib.sha1).hexdigest()
def create(self, context, volume_id, display_name):
"""Creates an entry in the transfers table."""
LOG.info("Generating transfer record for volume %s" % volume_id)
volume_ref = self.db.volume_get(context, volume_id)
if volume_ref['status'] != "available":
raise exception.InvalidVolume(reason=_("status must be available"))
# The salt is just a short random string.
salt = self._get_random_string(FLAGS.volume_transfer_salt_length)
auth_key = self._get_random_string(FLAGS.volume_transfer_key_length)
crypt_hash = self._get_crypt_hash(salt, auth_key)
# TODO(ollie): Transfer expiry needs to be implemented.
transfer_rec = {'volume_id': volume_id,
'display_name': display_name,
'salt': salt,
'crypt_hash': crypt_hash,
'expires_at': None}
try:
transfer = self.db.transfer_create(context, transfer_rec)
except Exception:
LOG.error(_("Failed to create transfer record for %s") % volume_id)
raise
return {'id': transfer['id'],
'volume_id': transfer['volume_id'],
'display_name': transfer['display_name'],
'auth_key': auth_key,
'created_at': transfer['created_at']}
def accept(self, context, transfer_id, auth_key):
"""Accept a volume that has been offered for transfer."""
# We must use an elevated context to see the volume that is still
# owned by the donor.
transfer = self.db.transfer_get(context.elevated(), transfer_id)
crypt_hash = self._get_crypt_hash(transfer['salt'], auth_key)
if crypt_hash != transfer['crypt_hash']:
msg = (_("Attempt to transfer %s with invalid auth key.") %
transfer_id)
LOG.error(msg)
raise exception.InvalidAuthKey(reason=msg)
volume_id = transfer['volume_id']
vol_ref = self.db.volume_get(context.elevated(), volume_id)
try:
reservations = QUOTAS.reserve(context, volumes=1,
gigabytes=vol_ref['size'])
except exception.OverQuota as e:
overs = e.kwargs['overs']
usages = e.kwargs['usages']
quotas = e.kwargs['quotas']
def _consumed(name):
return (usages[name]['reserved'] + usages[name]['in_use'])
if 'gigabytes' in overs:
msg = _("Quota exceeded for %(s_pid)s, tried to create "
"%(s_size)sG volume (%(d_consumed)dG of %(d_quota)dG "
"already consumed)")
LOG.warn(msg % {'s_pid': context.project_id,
's_size': vol_ref['size'],
'd_consumed': _consumed('gigabytes'),
'd_quota': quotas['gigabytes']})
raise exception.VolumeSizeExceedsAvailableQuota()
elif 'volumes' in overs:
msg = _("Quota exceeded for %(s_pid)s, tried to create "
"volume (%(d_consumed)d volumes "
"already consumed)")
LOG.warn(msg % {'s_pid': context.project_id,
'd_consumed': _consumed('volumes')})
raise exception.VolumeLimitExceeded(allowed=quotas['volumes'])
try:
donor_id = vol_ref['project_id']
donor_reservations = QUOTAS.reserve(context,
project_id=donor_id,
volumes=-1,
gigabytes=-vol_ref['size'])
except Exception:
donor_reservations = None
LOG.exception(_("Failed to update quota donating volume"
"transfer id %s") % transfer_id)
try:
# Transfer ownership of the volume now, must use an elevated
# context.
self.db.transfer_accept(context.elevated(),
transfer_id,
context.user_id,
context.project_id)
QUOTAS.commit(context, reservations)
if donor_reservations:
QUOTAS.commit(context, donor_reservations, project_id=donor_id)
LOG.info(_("Volume %s has been transferred.") % volume_id)
except Exception as exc:
QUOTAS.rollback(context, reservations)
if donor_reservations:
QUOTAS.rollback(context, donor_reservations,
project_id=donor_id)
raise exc
vol_ref = self.db.volume_get(context, volume_id)
return {'id': transfer_id,
'display_name': transfer['display_name'],
'volume_id': vol_ref['id']}