From 092a01f2cf435364b7ecb69ade9d0d816232015c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Dulko?= Date: Tue, 15 Dec 2015 14:49:49 +0100 Subject: [PATCH] Update manage_existing to use volume object The following patch updates manage_existing APIs to use volume versioned objects. Changes were made to be backward compatible with older RPC clients. Changes in the drivers are left to each driver maintainer to update. Note that this patch DOES NOT try to use object dot notation everywhere to keep the size small. This cleanup will be done in the future. Change-Id: Ie806336ad95834b091c5942d7dfa13345006c1c8 Partial-Implements: blueprint cinder-objects --- cinder/scheduler/manager.py | 15 +++++++---- cinder/scheduler/rpcapi.py | 26 ++++++++++++------- cinder/tests/unit/scheduler/test_rpcapi.py | 11 ++++++-- cinder/tests/unit/test_volume_rpcapi.py | 16 +++++++++--- .../unit/volume/flows/fake_volume_api.py | 2 +- cinder/volume/api.py | 2 +- cinder/volume/flows/api/manage_existing.py | 17 +++++------- .../volume/flows/manager/manage_existing.py | 17 ++++++------ cinder/volume/manager.py | 14 +++++++--- cinder/volume/rpcapi.py | 14 +++++++--- 10 files changed, 85 insertions(+), 49 deletions(-) diff --git a/cinder/scheduler/manager.py b/cinder/scheduler/manager.py index 368a99ce658..ab6487af0a5 100644 --- a/cinder/scheduler/manager.py +++ b/cinder/scheduler/manager.py @@ -56,7 +56,7 @@ LOG = logging.getLogger(__name__) class SchedulerManager(manager.Manager): """Chooses a host to create volumes.""" - RPC_API_VERSION = '2.0' + RPC_API_VERSION = '2.1' target = messaging.Target(version=RPC_API_VERSION) @@ -255,20 +255,25 @@ class SchedulerManager(manager.Manager): old_reservations) def manage_existing(self, context, topic, volume_id, - request_spec, filter_properties=None): + request_spec, filter_properties=None, volume=None): """Ensure that the host exists and can accept the volume.""" self._wait_for_scheduler() + # FIXME(mdulko): Remove this in v3.0 of RPC API. + if volume is None: + # For older clients, mimic the old behavior and look up the + # volume by its volume_id. + volume = objects.Volume.get_by_id(context, volume_id) + def _manage_existing_set_error(self, context, ex, request_spec): volume_state = {'volume_state': {'status': 'error'}} self._set_volume_state_and_notify('manage_existing', volume_state, context, ex, request_spec) - volume_ref = db.volume_get(context, volume_id) try: self.driver.host_passes_filters(context, - volume_ref['host'], + volume.host, request_spec, filter_properties) except exception.NoValidHost as ex: @@ -277,7 +282,7 @@ class SchedulerManager(manager.Manager): with excutils.save_and_reraise_exception(): _manage_existing_set_error(self, context, ex, request_spec) else: - volume_rpcapi.VolumeAPI().manage_existing(context, volume_ref, + volume_rpcapi.VolumeAPI().manage_existing(context, volume, request_spec.get('ref')) def get_pools(self, context, filters=None): diff --git a/cinder/scheduler/rpcapi.py b/cinder/scheduler/rpcapi.py index 748d31e3816..84365ecdd87 100644 --- a/cinder/scheduler/rpcapi.py +++ b/cinder/scheduler/rpcapi.py @@ -52,16 +52,17 @@ class SchedulerAPI(rpc.RPCAPI): set to 1.11. 2.0 - Remove 1.x compatibility + 2.1 - Adds support for sending objects over RPC in manage_existing() """ - RPC_API_VERSION = '2.0' + RPC_API_VERSION = '2.1' TOPIC = CONF.scheduler_topic BINARY = 'cinder-scheduler' def create_consistencygroup(self, ctxt, topic, group, request_spec_list=None, filter_properties_list=None): - version = '2.0' + version = '2.1' cctxt = self.client.prepare(version=version) request_spec_p_list = [] for request_spec in request_spec_list: @@ -113,15 +114,20 @@ class SchedulerAPI(rpc.RPCAPI): return cctxt.cast(ctxt, 'retype', **msg_args) def manage_existing(self, ctxt, topic, volume_id, - request_spec=None, filter_properties=None): - version = '2.0' - cctxt = self.client.prepare(version=version) + request_spec=None, filter_properties=None, + volume=None): request_spec_p = jsonutils.to_primitive(request_spec) - return cctxt.cast(ctxt, 'manage_existing', - topic=topic, - volume_id=volume_id, - request_spec=request_spec_p, - filter_properties=filter_properties) + msg_args = { + 'topic': topic, 'volume_id': volume_id, + 'request_spec': request_spec_p, + 'filter_properties': filter_properties, 'volume': volume, + } + version = '2.1' + if not self.client.can_send_version('2.1'): + version = '2.0' + msg_args.pop('volume') + cctxt = self.client.prepare(version=version) + return cctxt.cast(ctxt, 'manage_existing', **msg_args) def get_pools(self, ctxt, filters=None): version = '2.0' diff --git a/cinder/tests/unit/scheduler/test_rpcapi.py b/cinder/tests/unit/scheduler/test_rpcapi.py index aa4cfa6bbf7..9a0159728ad 100644 --- a/cinder/tests/unit/scheduler/test_rpcapi.py +++ b/cinder/tests/unit/scheduler/test_rpcapi.py @@ -19,6 +19,7 @@ Unit Tests for cinder.scheduler.rpcapi import copy +import ddt import mock from cinder import context @@ -26,6 +27,7 @@ from cinder.scheduler import rpcapi as scheduler_rpcapi from cinder import test +@ddt.ddt class SchedulerRpcAPITestCase(test.TestCase): def setUp(self): @@ -118,14 +120,19 @@ class SchedulerRpcAPITestCase(test.TestCase): volume='volume', version='2.0') - def test_manage_existing(self): + @ddt.data('2.0', '2.1') + @mock.patch('oslo_messaging.RPCClient.can_send_version') + def test_manage_existing(self, version, can_send_version): + can_send_version.side_effect = lambda x: x == version self._test_scheduler_api('manage_existing', rpc_method='cast', topic='topic', volume_id='volume_id', request_spec='fake_request_spec', filter_properties='filter_properties', - version='2.0') + volume='volume', + version=version) + can_send_version.assert_called_with('2.1') def test_get_pools(self): self._test_scheduler_api('get_pools', diff --git a/cinder/tests/unit/test_volume_rpcapi.py b/cinder/tests/unit/test_volume_rpcapi.py index 49f8ebda8c7..4b3d9d85885 100644 --- a/cinder/tests/unit/test_volume_rpcapi.py +++ b/cinder/tests/unit/test_volume_rpcapi.py @@ -16,7 +16,9 @@ Unit Tests for cinder.volume.rpcapi """ import copy +import mock +import ddt from oslo_config import cfg from oslo_serialization import jsonutils @@ -37,6 +39,7 @@ from cinder.volume import utils CONF = cfg.CONF +@ddt.ddt class VolumeRpcAPITestCase(test.TestCase): def setUp(self): @@ -408,14 +411,19 @@ class VolumeRpcAPITestCase(test.TestCase): old_reservations=self.fake_reservations, version='2.0') - def test_manage_existing(self): + @ddt.data('2.0', '2.2') + @mock.patch('oslo_messaging.RPCClient.can_send_version') + def test_manage_existing(self, version, can_send_version): + can_send_version.side_effect = lambda x: x == version self._test_volume_api('manage_existing', rpc_method='cast', - volume=self.fake_volume, + volume=self.fake_volume_obj, ref={'lv_name': 'foo'}, - version='2.0') + version=version) + can_send_version.assert_called_once_with('2.2') - def test_manage_existing_snapshot(self): + @mock.patch('oslo_messaging.RPCClient.can_send_version', return_value=True) + def test_manage_existing_snapshot(self, mock_can_send_version): volume_update = {'host': 'fake_host'} snpshot = { 'id': fake.SNAPSHOT_ID, diff --git a/cinder/tests/unit/volume/flows/fake_volume_api.py b/cinder/tests/unit/volume/flows/fake_volume_api.py index b4c97349770..a8155c97074 100644 --- a/cinder/tests/unit/volume/flows/fake_volume_api.py +++ b/cinder/tests/unit/volume/flows/fake_volume_api.py @@ -43,7 +43,7 @@ class FakeSchedulerRpcAPI(object): self.test_inst.assertEqual(self.expected_spec, request_spec) def manage_existing(self, context, volume_topic, volume_id, - request_spec=None): + request_spec=None, volume=None): self.test_inst.assertEqual(self.expected_spec, request_spec) diff --git a/cinder/volume/api.py b/cinder/volume/api.py index 3642cca85ee..e8f5a50cfcb 100644 --- a/cinder/volume/api.py +++ b/cinder/volume/api.py @@ -1536,7 +1536,7 @@ class API(base.Base): service = self._get_service_by_host(context, host) if availability_zone is None: - availability_zone = service.get('availability_zone') + availability_zone = service.availability_zone manage_what = { 'context': context, diff --git a/cinder/volume/flows/api/manage_existing.py b/cinder/volume/flows/api/manage_existing.py index a0410129192..fcbe0e25bc5 100644 --- a/cinder/volume/flows/api/manage_existing.py +++ b/cinder/volume/flows/api/manage_existing.py @@ -20,6 +20,7 @@ from taskflow.types import failure as ft from cinder import exception from cinder import flow_utils from cinder.i18n import _LE +from cinder import objects from cinder.volume.flows import common LOG = logging.getLogger(__name__) @@ -68,18 +69,11 @@ class EntryCreateTask(flow_utils.CinderTask): 'bootable': kwargs.pop('bootable'), } - volume = self.db.volume_create(context, volume_properties) + volume = objects.Volume(context, volume_properties) + volume.create() return { 'volume_properties': volume_properties, - # NOTE(harlowja): it appears like further usage of this volume - # result actually depend on it being a sqlalchemy object and not - # just a plain dictionary so that's why we are storing this here. - # - # In the future where this task results can be serialized and - # restored automatically for continued running we will need to - # resolve the serialization & recreation of this object since raw - # sqlalchemy objects can't be serialized. 'volume': volume, } @@ -117,8 +111,9 @@ class ManageCastTask(flow_utils.CinderTask): # Call the scheduler to ensure that the host exists and that it can # accept the volume self.scheduler_rpcapi.manage_existing(context, CONF.volume_topic, - volume['id'], - request_spec=request_spec) + volume.id, + request_spec=request_spec, + volume=volume) def revert(self, context, result, flow_failures, **kwargs): # Restore the source volume status and set the volume to error status. diff --git a/cinder/volume/flows/manager/manage_existing.py b/cinder/volume/flows/manager/manage_existing.py index 9d2ac1886dc..03732a06a93 100644 --- a/cinder/volume/flows/manager/manage_existing.py +++ b/cinder/volume/flows/manager/manage_existing.py @@ -40,7 +40,7 @@ class PrepareForQuotaReservationTask(flow_utils.CinderTask): self.driver = driver def execute(self, context, volume_ref, manage_existing_ref): - volume_id = volume_ref['id'] + volume_id = volume_ref.id if not self.driver.initialized: driver_name = self.driver.__class__.__name__ LOG.error(_LE("Unable to manage existing volume. " @@ -55,11 +55,11 @@ class PrepareForQuotaReservationTask(flow_utils.CinderTask): manage_existing_ref) return {'size': size, - 'volume_type_id': volume_ref['volume_type_id'], + 'volume_type_id': volume_ref.volume_type_id, 'volume_properties': volume_ref, - 'volume_spec': {'status': volume_ref['status'], - 'volume_name': volume_ref['name'], - 'volume_id': volume_ref['id']}} + 'volume_spec': {'status': volume_ref.status, + 'volume_name': volume_ref.name, + 'volume_id': volume_ref.id}} class ManageExistingTask(flow_utils.CinderTask): @@ -91,7 +91,7 @@ class ManageExistingTask(flow_utils.CinderTask): return {'volume': volume_ref} -def get_flow(context, db, driver, host, volume_id, ref): +def get_flow(context, db, driver, host, volume, ref): """Constructs and returns the manager entrypoint flow.""" flow_name = ACTION.replace(":", "_") + "_manager" @@ -102,13 +102,12 @@ def get_flow(context, db, driver, host, volume_id, ref): # determined. create_what = { 'context': context, - 'volume_id': volume_id, + 'volume_ref': volume, 'manage_existing_ref': ref, 'optional_args': {'is_quota_committed': False} } - volume_flow.add(create_mgr.ExtractVolumeRefTask(db, host), - create_mgr.NotifyVolumeActionTask(db, + volume_flow.add(create_mgr.NotifyVolumeActionTask(db, "manage_existing.start"), PrepareForQuotaReservationTask(db, driver), create_api.QuotaReserveTask(), diff --git a/cinder/volume/manager.py b/cinder/volume/manager.py index 0daf7f21c0b..18c61a30893 100644 --- a/cinder/volume/manager.py +++ b/cinder/volume/manager.py @@ -218,7 +218,7 @@ def locked_snapshot_operation(f): class VolumeManager(manager.SchedulerDependentManager): """Manages attachable block storage devices.""" - RPC_API_VERSION = '2.1' + RPC_API_VERSION = '2.2' target = messaging.Target(version=RPC_API_VERSION) @@ -2284,7 +2284,13 @@ class VolumeManager(manager.SchedulerDependentManager): LOG.info(_LI("Retype volume completed successfully."), resource=volume) - def manage_existing(self, ctxt, volume_id, ref=None): + def manage_existing(self, ctxt, volume_id, ref=None, volume=None): + # FIXME(dulek): Remove this in v3.0 of RPC API. + if volume is None: + # For older clients, mimic the old behavior and look up the volume + # by its volume_id. + volume = objects.Volume.get_by_id(context, volume_id) + try: flow_engine = manage_existing.get_flow( ctxt, @@ -2292,7 +2298,9 @@ class VolumeManager(manager.SchedulerDependentManager): self.driver, self.host, volume_id, - ref) + ref, + volume, + ) except Exception: msg = _("Failed to create manage_existing flow.") LOG.exception(msg, resource={'type': 'volume', 'id': volume_id}) diff --git a/cinder/volume/rpcapi.py b/cinder/volume/rpcapi.py index 1a6f9777080..1c0a6a82e80 100644 --- a/cinder/volume/rpcapi.py +++ b/cinder/volume/rpcapi.py @@ -100,9 +100,10 @@ class VolumeAPI(rpc.RPCAPI): 2.0 - Remove 1.x compatibility 2.1 - Add get_manageable_volumes() and get_manageable_snapshots(). + 2.2 - Adds support for sending objects over RPC in manage_existing(). """ - RPC_API_VERSION = '2.1' + RPC_API_VERSION = '2.2' TOPIC = CONF.volume_topic BINARY = 'cinder-volume' @@ -243,8 +244,15 @@ class VolumeAPI(rpc.RPCAPI): old_reservations=old_reservations) def manage_existing(self, ctxt, volume, ref): - cctxt = self._get_cctxt(volume['host'], '2.0') - cctxt.cast(ctxt, 'manage_existing', volume_id=volume['id'], ref=ref) + msg_args = { + 'volume_id': volume.id, 'ref': ref, 'volume': volume, + } + version = '2.2' + if not self.client.can_send_version('2.2'): + version = '2.0' + msg_args.pop('volume') + cctxt = self._get_cctxt(volume.host, version) + cctxt.cast(ctxt, 'manage_existing', **msg_args) def promote_replica(self, ctxt, volume): cctxt = self._get_cctxt(volume['host'], '2.0')