Volume RPC API Versioning

Add versioning to Volume Rpc API version.  This is initial version
1.0, which is compatible with previous non-versioned RPC API.

Note: this patch slightly change the db.volume_update() behavior,
which now returns updated volume info.

Change-Id: I78036b6ed97c5bc369d8c85307ecaaad8e31ff90
This commit is contained in:
Zhiteng Huang
2012-11-10 01:32:22 +08:00
parent 4206f0db05
commit 2940ce438f
9 changed files with 324 additions and 194 deletions

View File

@@ -1046,6 +1046,7 @@ def volume_update(context, volume_id, values):
volume_ref = volume_get(context, volume_id, session=session)
volume_ref.update(values)
volume_ref.save(session=session)
return volume_ref
####################

View File

@@ -67,7 +67,6 @@ class ChanceScheduler(driver.Scheduler):
snapshot_id = request_spec['snapshot_id']
image_id = request_spec['image_id']
driver.cast_to_host(context, topic, host, 'create_volume',
volume_id=volume_id,
snapshot_id=snapshot_id,
image_id=image_id)
updated_volume = driver.volume_update_db(context, volume_id, host)
self.volume_rpcapi.create_volume(context, updated_volume, host,
snapshot_id, image_id)

View File

@@ -25,14 +25,11 @@ from cinder import db
from cinder import flags
from cinder.openstack.common import cfg
from cinder.openstack.common import importutils
from cinder.openstack.common import log as logging
from cinder.openstack.common import rpc
from cinder.openstack.common import timeutils
from cinder import utils
from cinder.volume import rpcapi as volume_rpcapi
LOG = logging.getLogger(__name__)
scheduler_driver_opts = [
cfg.StrOpt('scheduler_host_manager',
default='cinder.scheduler.host_manager.HostManager',
@@ -43,36 +40,14 @@ FLAGS = flags.FLAGS
FLAGS.register_opts(scheduler_driver_opts)
def cast_to_volume_host(context, host, method, update_db=True, **kwargs):
"""Cast request to a volume host queue"""
def volume_update_db(context, volume_id, host):
'''Set the host and set the scheduled_at field of a volume.
if update_db:
volume_id = kwargs.get('volume_id', None)
if volume_id is not None:
:returns: A Volume with the updated fields set properly.
'''
now = timeutils.utcnow()
db.volume_update(context, volume_id,
{'host': host, 'scheduled_at': now})
rpc.cast(context,
rpc.queue_get_for(context, FLAGS.volume_topic, host),
{"method": method, "args": kwargs})
LOG.debug(_("Casted '%(method)s' to host '%(host)s'") % locals())
def cast_to_host(context, topic, host, method, update_db=True, **kwargs):
"""Generic cast to host"""
topic_mapping = {
"volume": cast_to_volume_host}
func = topic_mapping.get(topic)
if func:
func(context, host, method, update_db=update_db, **kwargs)
else:
rpc.cast(context,
rpc.queue_get_for(context, topic, host),
{"method": method, "args": kwargs})
LOG.debug(_("Casted '%(method)s' to %(topic)s '%(host)s'")
% locals())
values = {'host': host, 'scheduled_at': now}
return db.volume_update(context, volume_id, values)
class Scheduler(object):
@@ -81,6 +56,7 @@ class Scheduler(object):
def __init__(self):
self.host_manager = importutils.import_object(
FLAGS.scheduler_host_manager)
self.volume_rpcapi = volume_rpcapi.VolumeAPI()
def get_host_list(self):
"""Get a list of hosts from the HostManager."""

View File

@@ -62,9 +62,11 @@ class SimpleScheduler(chance.ChanceScheduler):
service = db.service_get_by_args(elevated, host, topic)
if not utils.service_is_up(service):
raise exception.WillNotSchedule(host=host)
driver.cast_to_volume_host(context, host, 'create_volume',
volume_id=volume_id, snapshot_id=snapshot_id,
image_id=image_id)
updated_volume = driver.volume_update_db(context, volume_id, host)
self.volume_rpcapi.create_volume(context, updated_volume,
host,
snapshot_id,
image_id)
return None
results = db.service_get_all_volume_sorted(elevated)
@@ -77,9 +79,12 @@ class SimpleScheduler(chance.ChanceScheduler):
msg = _("Not enough allocatable volume gigabytes remaining")
raise exception.NoValidHost(reason=msg)
if utils.service_is_up(service) and not service['disabled']:
driver.cast_to_volume_host(context, service['host'],
'create_volume', volume_id=volume_id,
snapshot_id=snapshot_id, image_id=image_id)
updated_volume = driver.volume_update_db(context, volume_id,
service['host'])
self.volume_rpcapi.create_volume(context, updated_volume,
service['host'],
snapshot_id,
image_id)
return None
msg = _("Is the appropriate service running?")
raise exception.NoValidHost(reason=msg)

View File

@@ -24,13 +24,13 @@ from cinder import context
from cinder import db
from cinder import exception
from cinder import flags
from cinder.openstack.common import rpc
from cinder.openstack.common import timeutils
from cinder.scheduler import driver
from cinder.scheduler import manager
from cinder import test
from cinder import utils
FLAGS = flags.FLAGS
@@ -179,97 +179,13 @@ class SchedulerDriverModuleTestCase(test.TestCase):
super(SchedulerDriverModuleTestCase, self).setUp()
self.context = context.RequestContext('fake_user', 'fake_project')
def test_cast_to_volume_host_update_db_with_volume_id(self):
host = 'fake_host1'
method = 'fake_method'
fake_kwargs = {'volume_id': 31337,
'extra_arg': 'meow'}
queue = 'fake_queue'
def test_volume_host_update_db(self):
self.mox.StubOutWithMock(timeutils, 'utcnow')
self.mox.StubOutWithMock(db, 'volume_update')
self.mox.StubOutWithMock(rpc, 'queue_get_for')
self.mox.StubOutWithMock(rpc, 'cast')
timeutils.utcnow().AndReturn('fake-now')
db.volume_update(self.context, 31337,
{'host': host, 'scheduled_at': 'fake-now'})
rpc.queue_get_for(self.context,
FLAGS.volume_topic, host).AndReturn(queue)
rpc.cast(self.context, queue,
{'method': method,
'args': fake_kwargs})
{'host': 'fake_host', 'scheduled_at': 'fake-now'})
self.mox.ReplayAll()
driver.cast_to_volume_host(self.context, host, method,
update_db=True, **fake_kwargs)
def test_cast_to_volume_host_update_db_without_volume_id(self):
host = 'fake_host1'
method = 'fake_method'
fake_kwargs = {'extra_arg': 'meow'}
queue = 'fake_queue'
self.mox.StubOutWithMock(rpc, 'queue_get_for')
self.mox.StubOutWithMock(rpc, 'cast')
rpc.queue_get_for(self.context,
FLAGS.volume_topic, host).AndReturn(queue)
rpc.cast(self.context, queue,
{'method': method,
'args': fake_kwargs})
self.mox.ReplayAll()
driver.cast_to_volume_host(self.context, host, method,
update_db=True, **fake_kwargs)
def test_cast_to_volume_host_no_update_db(self):
host = 'fake_host1'
method = 'fake_method'
fake_kwargs = {'extra_arg': 'meow'}
queue = 'fake_queue'
self.mox.StubOutWithMock(rpc, 'queue_get_for')
self.mox.StubOutWithMock(rpc, 'cast')
rpc.queue_get_for(self.context,
FLAGS.volume_topic, host).AndReturn(queue)
rpc.cast(self.context, queue,
{'method': method,
'args': fake_kwargs})
self.mox.ReplayAll()
driver.cast_to_volume_host(self.context, host, method,
update_db=False, **fake_kwargs)
def test_cast_to_host_volume_topic(self):
host = 'fake_host1'
method = 'fake_method'
fake_kwargs = {'extra_arg': 'meow'}
self.mox.StubOutWithMock(driver, 'cast_to_volume_host')
driver.cast_to_volume_host(self.context, host, method,
update_db=False, **fake_kwargs)
self.mox.ReplayAll()
driver.cast_to_host(self.context, 'volume', host, method,
update_db=False, **fake_kwargs)
def test_cast_to_host_unknown_topic(self):
host = 'fake_host1'
method = 'fake_method'
fake_kwargs = {'extra_arg': 'meow'}
topic = 'unknown'
queue = 'fake_queue'
self.mox.StubOutWithMock(rpc, 'queue_get_for')
self.mox.StubOutWithMock(rpc, 'cast')
rpc.queue_get_for(self.context, topic, host).AndReturn(queue)
rpc.cast(self.context, queue,
{'method': method,
'args': fake_kwargs})
self.mox.ReplayAll()
driver.cast_to_host(self.context, topic, host, method,
update_db=False, **fake_kwargs)
driver.volume_update_db(self.context, 31337, 'fake_host')

View File

@@ -0,0 +1,164 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2012, Intel, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Unit Tests for cinder.volume.rpcapi
"""
from cinder import context
from cinder import db
from cinder import flags
from cinder.openstack.common import jsonutils
from cinder.openstack.common import rpc
from cinder import test
from cinder.volume import rpcapi as volume_rpcapi
FLAGS = flags.FLAGS
class VolumeRpcAPITestCase(test.TestCase):
def setUp(self):
self.context = context.get_admin_context()
vol = {}
vol['host'] = 'fake_host'
vol['availability_zone'] = FLAGS.storage_availability_zone
vol['status'] = "available"
vol['attach_status'] = "detached"
volume = db.volume_create(self.context, vol)
snpshot = {
'volume_id': 'fake_id',
'status': "creating",
'progress': '0%',
'volume_size': 0,
'display_name': 'fake_name',
'display_description': 'fake_description'}
snapshot = db.snapshot_create(self.context, snpshot)
self.fake_volume = jsonutils.to_primitive(volume)
self.fake_snapshot = jsonutils.to_primitive(snapshot)
super(VolumeRpcAPITestCase, self).setUp()
def test_serialized_volume_has_id(self):
self.assertTrue('id' in self.fake_volume)
def _test_volume_api(self, method, rpc_method, **kwargs):
ctxt = context.RequestContext('fake_user', 'fake_project')
if 'rpcapi_class' in kwargs:
rpcapi_class = kwargs['rpcapi_class']
del kwargs['rpcapi_class']
else:
rpcapi_class = volume_rpcapi.VolumeAPI
rpcapi = rpcapi_class()
expected_retval = 'foo' if method == 'call' else None
expected_version = kwargs.pop('version', rpcapi.BASE_RPC_API_VERSION)
expected_msg = rpcapi.make_msg(method, **kwargs)
if 'volume' in expected_msg['args']:
volume = expected_msg['args']['volume']
del expected_msg['args']['volume']
expected_msg['args']['volume_id'] = volume['id']
if 'snapshot' in expected_msg['args']:
snapshot = expected_msg['args']['snapshot']
del expected_msg['args']['snapshot']
expected_msg['args']['snapshot_id'] = snapshot['id']
if 'host' in expected_msg['args']:
del expected_msg['args']['host']
expected_msg['version'] = expected_version
if 'host' in kwargs:
host = kwargs['host']
else:
host = kwargs['volume']['host']
expected_topic = '%s.%s' % (FLAGS.volume_topic, host)
self.fake_args = None
self.fake_kwargs = None
def _fake_rpc_method(*args, **kwargs):
self.fake_args = args
self.fake_kwargs = kwargs
if expected_retval:
return expected_retval
self.stubs.Set(rpc, rpc_method, _fake_rpc_method)
retval = getattr(rpcapi, method)(ctxt, **kwargs)
self.assertEqual(retval, expected_retval)
expected_args = [ctxt, expected_topic, expected_msg]
for arg, expected_arg in zip(self.fake_args, expected_args):
self.assertEqual(arg, expected_arg)
def test_create_volume(self):
self._test_volume_api('create_volume',
rpc_method='cast',
volume=self.fake_volume,
host='fake_host1',
snapshot_id='fake_snapshot_id',
image_id='fake_image_id')
def test_delete_volume(self):
self._test_volume_api('delete_volume',
rpc_method='cast',
volume=self.fake_volume)
def test_create_snapshot(self):
self._test_volume_api('create_snapshot',
rpc_method='cast',
volume=self.fake_volume,
snapshot=self.fake_snapshot)
def test_delete_snapshot(self):
self._test_volume_api('delete_snapshot',
rpc_method='cast',
snapshot=self.fake_snapshot,
host='fake_host')
def test_attach_volume(self):
self._test_volume_api('attach_volume',
rpc_method='call',
volume=self.fake_volume,
instance_uuid='fake_uuid',
mountpoint='fake_mountpoint')
def test_detach_volume(self):
self._test_volume_api('detach_volume',
rpc_method='call',
volume=self.fake_volume)
def test_copy_volume_to_image(self):
self._test_volume_api('copy_volume_to_image',
rpc_method='cast',
volume=self.fake_volume,
image_id='fake_image_id')
def test_initialize_connection(self):
self._test_volume_api('initialize_connection',
rpc_method='call',
volume=self.fake_volume,
connector='fake_connector')
def test_terminate_connection(self):
self._test_volume_api('terminate_connection',
rpc_method='call',
volume=self.fake_volume,
connector='fake_connector',
force=False)

View File

@@ -34,7 +34,7 @@ from cinder.openstack.common import timeutils
import cinder.policy
from cinder import quota
from cinder.scheduler import rpcapi as scheduler_rpcapi
from cinder.volume import volume_types
from cinder.volume import rpcapi as volume_rpcapi
from cinder.volume import volume_types
volume_host_opt = cfg.BoolOpt('snapshot_same_host',
@@ -81,6 +81,7 @@ class API(base.Base):
self.image_service = (image_service or
glance.get_default_image_service())
self.scheduler_rpcapi = scheduler_rpcapi.SchedulerAPI()
self.volume_rpcapi = volume_rpcapi.VolumeAPI()
super(API, self).__init__(db_driver)
def create(self, context, size, name, description, snapshot=None,
@@ -207,16 +208,12 @@ class API(base.Base):
snapshot_ref = self.db.snapshot_get(context, snapshot_id)
src_volume_ref = self.db.volume_get(context,
snapshot_ref['volume_id'])
topic = rpc.queue_get_for(context,
FLAGS.volume_topic,
src_volume_ref['host'])
# bypass scheduler and send request directly to volume
rpc.cast(context,
topic,
{"method": "create_volume",
"args": {"volume_id": volume_id,
"snapshot_id": snapshot_id,
"image_id": image_id}})
self.volume_rpcapi.create_volume(context,
src_volume_ref,
src_volume_ref['host'],
snapshot_id,
image_id)
else:
self.scheduler_rpcapi.create_volume(context,
FLAGS.volume_topic,
@@ -256,11 +253,8 @@ class API(base.Base):
now = timeutils.utcnow()
self.db.volume_update(context, volume_id, {'status': 'deleting',
'terminated_at': now})
host = volume['host']
rpc.cast(context,
rpc.queue_get_for(context, FLAGS.volume_topic, host),
{"method": "delete_volume",
"args": {"volume_id": volume_id}})
self.volume_rpcapi.delete_volume(context, volume)
@wrap_check_policy
def update(self, context, volume, fields):
@@ -388,40 +382,28 @@ class API(base.Base):
@wrap_check_policy
def attach(self, context, volume, instance_uuid, mountpoint):
host = volume['host']
queue = rpc.queue_get_for(context, FLAGS.volume_topic, host)
return rpc.call(context, queue,
{"method": "attach_volume",
"args": {"volume_id": volume['id'],
"instance_uuid": instance_uuid,
"mountpoint": mountpoint}})
return self.volume_rpcapi.attach_volume(context,
volume,
instance_uuid,
mountpoint)
@wrap_check_policy
def detach(self, context, volume):
host = volume['host']
queue = rpc.queue_get_for(context, FLAGS.volume_topic, host)
return rpc.call(context, queue,
{"method": "detach_volume",
"args": {"volume_id": volume['id']}})
return self.volume_rpcapi.detach_volume(context, volume)
@wrap_check_policy
def initialize_connection(self, context, volume, connector):
host = volume['host']
queue = rpc.queue_get_for(context, FLAGS.volume_topic, host)
return rpc.call(context, queue,
{"method": "initialize_connection",
"args": {"volume_id": volume['id'],
"connector": connector}})
return self.volume_rpcapi.initialize_connection(context,
volume,
connector)
@wrap_check_policy
def terminate_connection(self, context, volume, connector, force=False):
self.unreserve_volume(context, volume)
host = volume['host']
queue = rpc.queue_get_for(context, FLAGS.volume_topic, host)
return rpc.call(context, queue,
{"method": "terminate_connection",
"args": {"volume_id": volume['id'],
"connector": connector, 'force': force}})
return self.volume_rpcapi.terminate_connection(context,
volume,
connector,
force)
def _create_snapshot(self, context, volume, name, description,
force=False):
@@ -442,12 +424,8 @@ class API(base.Base):
'display_description': description}
snapshot = self.db.snapshot_create(context, options)
host = volume['host']
rpc.cast(context,
rpc.queue_get_for(context, FLAGS.volume_topic, host),
{"method": "create_snapshot",
"args": {"volume_id": volume['id'],
"snapshot_id": snapshot['id']}})
self.volume_rpcapi.create_snapshot(context, volume, snapshot)
return snapshot
def create_snapshot(self, context, volume, name, description):
@@ -466,11 +444,7 @@ class API(base.Base):
self.db.snapshot_update(context, snapshot['id'],
{'status': 'deleting'})
volume = self.db.volume_get(context, snapshot['volume_id'])
host = volume['host']
rpc.cast(context,
rpc.queue_get_for(context, FLAGS.volume_topic, host),
{"method": "delete_snapshot",
"args": {"snapshot_id": snapshot['id']}})
self.volume_rpcapi.delete_snapshot(context, snapshot, volume['host'])
@wrap_check_policy
def update_snapshot(self, context, snapshot, fields):
@@ -529,13 +503,8 @@ class API(base.Base):
recv_metadata = self.image_service.create(context, metadata)
self.update(context, volume, {'status': 'uploading'})
rpc.cast(context,
rpc.queue_get_for(context,
FLAGS.volume_topic,
volume['host']),
{"method": "copy_volume_to_image",
"args": {"volume_id": volume['id'],
"image_id": recv_metadata['id']}})
self.volume_rpcapi.copy_volume_to_image(context, volume,
recv_metadata['id'])
response = {"id": volume['id'],
"updated_at": volume['updated_at'],

View File

@@ -77,6 +77,9 @@ MAPPING = {
class VolumeManager(manager.SchedulerDependentManager):
"""Manages attachable block storage devices."""
RPC_API_VERSION = '1.0'
def __init__(self, volume_driver=None, *args, **kwargs):
"""Load the driver from the one specified in args, or from flags."""
if not volume_driver:

97
cinder/volume/rpcapi.py Normal file
View File

@@ -0,0 +1,97 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2012, Intel, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Client side of the volume RPC API.
"""
from cinder import exception
from cinder import flags
from cinder.openstack.common import rpc
import cinder.openstack.common.rpc.proxy
FLAGS = flags.FLAGS
class VolumeAPI(cinder.openstack.common.rpc.proxy.RpcProxy):
'''Client side of the volume rpc API.
API version history:
1.0 - Initial version.
'''
BASE_RPC_API_VERSION = '1.0'
def __init__(self):
super(VolumeAPI, self).__init__(topic=FLAGS.volume_topic,
default_version=self.BASE_RPC_API_VERSION)
def create_volume(self, ctxt, volume, host,
snapshot_id=None, image_id=None):
self.cast(ctxt, self.make_msg('create_volume',
volume_id=volume['id'],
snapshot_id=snapshot_id,
image_id=image_id),
topic=rpc.queue_get_for(ctxt, self.topic, host))
def delete_volume(self, ctxt, volume):
self.cast(ctxt, self.make_msg('delete_volume',
volume_id=volume['id']),
topic=rpc.queue_get_for(ctxt, self.topic, volume['host']))
def create_snapshot(self, ctxt, volume, snapshot):
self.cast(ctxt, self.make_msg('create_snapshot',
volume_id=volume['id'],
snapshot_id=snapshot['id']),
topic=rpc.queue_get_for(ctxt, self.topic, volume['host']))
def delete_snapshot(self, ctxt, snapshot, host):
self.cast(ctxt, self.make_msg('delete_snapshot',
snapshot_id=snapshot['id']),
topic=rpc.queue_get_for(ctxt, self.topic, host))
def attach_volume(self, ctxt, volume, instance_uuid, mountpoint):
return self.call(ctxt, self.make_msg('attach_volume',
volume_id=volume['id'],
instance_uuid=instance_uuid,
mountpoint=mountpoint),
topic=rpc.queue_get_for(ctxt, self.topic, volume['host']))
def detach_volume(self, ctxt, volume):
return self.call(ctxt, self.make_msg('detach_volume',
volume_id=volume['id']),
topic=rpc.queue_get_for(ctxt, self.topic, volume['host']))
def copy_volume_to_image(self, ctxt, volume, image_id):
self.cast(ctxt, self.make_msg('copy_volume_to_image',
volume_id=volume['id'],
image_id=image_id),
topic=rpc.queue_get_for(ctxt, self.topic, volume['host']))
def initialize_connection(self, ctxt, volume, connector):
return self.call(ctxt, self.make_msg('initialize_connection',
volume_id=volume['id'],
connector=connector),
topic=rpc.queue_get_for(ctxt, self.topic, volume['host']))
def terminate_connection(self, ctxt, volume, connector, force=False):
return self.call(ctxt, self.make_msg('terminate_connection',
volume_id=volume['id'],
connector=connector,
force=force),
topic=rpc.queue_get_for(ctxt, self.topic, volume['host']))