diff --git a/cinder/scheduler/manager.py b/cinder/scheduler/manager.py index 368a99ce658..ab6487af0a5 100644 --- a/cinder/scheduler/manager.py +++ b/cinder/scheduler/manager.py @@ -56,7 +56,7 @@ LOG = logging.getLogger(__name__) class SchedulerManager(manager.Manager): """Chooses a host to create volumes.""" - RPC_API_VERSION = '2.0' + RPC_API_VERSION = '2.1' target = messaging.Target(version=RPC_API_VERSION) @@ -255,20 +255,25 @@ class SchedulerManager(manager.Manager): old_reservations) def manage_existing(self, context, topic, volume_id, - request_spec, filter_properties=None): + request_spec, filter_properties=None, volume=None): """Ensure that the host exists and can accept the volume.""" self._wait_for_scheduler() + # FIXME(mdulko): Remove this in v3.0 of RPC API. + if volume is None: + # For older clients, mimic the old behavior and look up the + # volume by its volume_id. + volume = objects.Volume.get_by_id(context, volume_id) + def _manage_existing_set_error(self, context, ex, request_spec): volume_state = {'volume_state': {'status': 'error'}} self._set_volume_state_and_notify('manage_existing', volume_state, context, ex, request_spec) - volume_ref = db.volume_get(context, volume_id) try: self.driver.host_passes_filters(context, - volume_ref['host'], + volume.host, request_spec, filter_properties) except exception.NoValidHost as ex: @@ -277,7 +282,7 @@ class SchedulerManager(manager.Manager): with excutils.save_and_reraise_exception(): _manage_existing_set_error(self, context, ex, request_spec) else: - volume_rpcapi.VolumeAPI().manage_existing(context, volume_ref, + volume_rpcapi.VolumeAPI().manage_existing(context, volume, request_spec.get('ref')) def get_pools(self, context, filters=None): diff --git a/cinder/scheduler/rpcapi.py b/cinder/scheduler/rpcapi.py index 748d31e3816..84365ecdd87 100644 --- a/cinder/scheduler/rpcapi.py +++ b/cinder/scheduler/rpcapi.py @@ -52,16 +52,17 @@ class SchedulerAPI(rpc.RPCAPI): set to 1.11. 2.0 - Remove 1.x compatibility + 2.1 - Adds support for sending objects over RPC in manage_existing() """ - RPC_API_VERSION = '2.0' + RPC_API_VERSION = '2.1' TOPIC = CONF.scheduler_topic BINARY = 'cinder-scheduler' def create_consistencygroup(self, ctxt, topic, group, request_spec_list=None, filter_properties_list=None): - version = '2.0' + version = '2.1' cctxt = self.client.prepare(version=version) request_spec_p_list = [] for request_spec in request_spec_list: @@ -113,15 +114,20 @@ class SchedulerAPI(rpc.RPCAPI): return cctxt.cast(ctxt, 'retype', **msg_args) def manage_existing(self, ctxt, topic, volume_id, - request_spec=None, filter_properties=None): - version = '2.0' - cctxt = self.client.prepare(version=version) + request_spec=None, filter_properties=None, + volume=None): request_spec_p = jsonutils.to_primitive(request_spec) - return cctxt.cast(ctxt, 'manage_existing', - topic=topic, - volume_id=volume_id, - request_spec=request_spec_p, - filter_properties=filter_properties) + msg_args = { + 'topic': topic, 'volume_id': volume_id, + 'request_spec': request_spec_p, + 'filter_properties': filter_properties, 'volume': volume, + } + version = '2.1' + if not self.client.can_send_version('2.1'): + version = '2.0' + msg_args.pop('volume') + cctxt = self.client.prepare(version=version) + return cctxt.cast(ctxt, 'manage_existing', **msg_args) def get_pools(self, ctxt, filters=None): version = '2.0' diff --git a/cinder/tests/unit/scheduler/test_rpcapi.py b/cinder/tests/unit/scheduler/test_rpcapi.py index aa4cfa6bbf7..9a0159728ad 100644 --- a/cinder/tests/unit/scheduler/test_rpcapi.py +++ b/cinder/tests/unit/scheduler/test_rpcapi.py @@ -19,6 +19,7 @@ Unit Tests for cinder.scheduler.rpcapi import copy +import ddt import mock from cinder import context @@ -26,6 +27,7 @@ from cinder.scheduler import rpcapi as scheduler_rpcapi from cinder import test +@ddt.ddt class SchedulerRpcAPITestCase(test.TestCase): def setUp(self): @@ -118,14 +120,19 @@ class SchedulerRpcAPITestCase(test.TestCase): volume='volume', version='2.0') - def test_manage_existing(self): + @ddt.data('2.0', '2.1') + @mock.patch('oslo_messaging.RPCClient.can_send_version') + def test_manage_existing(self, version, can_send_version): + can_send_version.side_effect = lambda x: x == version self._test_scheduler_api('manage_existing', rpc_method='cast', topic='topic', volume_id='volume_id', request_spec='fake_request_spec', filter_properties='filter_properties', - version='2.0') + volume='volume', + version=version) + can_send_version.assert_called_with('2.1') def test_get_pools(self): self._test_scheduler_api('get_pools', diff --git a/cinder/tests/unit/test_volume_rpcapi.py b/cinder/tests/unit/test_volume_rpcapi.py index 49f8ebda8c7..4b3d9d85885 100644 --- a/cinder/tests/unit/test_volume_rpcapi.py +++ b/cinder/tests/unit/test_volume_rpcapi.py @@ -16,7 +16,9 @@ Unit Tests for cinder.volume.rpcapi """ import copy +import mock +import ddt from oslo_config import cfg from oslo_serialization import jsonutils @@ -37,6 +39,7 @@ from cinder.volume import utils CONF = cfg.CONF +@ddt.ddt class VolumeRpcAPITestCase(test.TestCase): def setUp(self): @@ -408,14 +411,19 @@ class VolumeRpcAPITestCase(test.TestCase): old_reservations=self.fake_reservations, version='2.0') - def test_manage_existing(self): + @ddt.data('2.0', '2.2') + @mock.patch('oslo_messaging.RPCClient.can_send_version') + def test_manage_existing(self, version, can_send_version): + can_send_version.side_effect = lambda x: x == version self._test_volume_api('manage_existing', rpc_method='cast', - volume=self.fake_volume, + volume=self.fake_volume_obj, ref={'lv_name': 'foo'}, - version='2.0') + version=version) + can_send_version.assert_called_once_with('2.2') - def test_manage_existing_snapshot(self): + @mock.patch('oslo_messaging.RPCClient.can_send_version', return_value=True) + def test_manage_existing_snapshot(self, mock_can_send_version): volume_update = {'host': 'fake_host'} snpshot = { 'id': fake.SNAPSHOT_ID, diff --git a/cinder/tests/unit/volume/flows/fake_volume_api.py b/cinder/tests/unit/volume/flows/fake_volume_api.py index b4c97349770..a8155c97074 100644 --- a/cinder/tests/unit/volume/flows/fake_volume_api.py +++ b/cinder/tests/unit/volume/flows/fake_volume_api.py @@ -43,7 +43,7 @@ class FakeSchedulerRpcAPI(object): self.test_inst.assertEqual(self.expected_spec, request_spec) def manage_existing(self, context, volume_topic, volume_id, - request_spec=None): + request_spec=None, volume=None): self.test_inst.assertEqual(self.expected_spec, request_spec) diff --git a/cinder/volume/api.py b/cinder/volume/api.py index 3642cca85ee..e8f5a50cfcb 100644 --- a/cinder/volume/api.py +++ b/cinder/volume/api.py @@ -1536,7 +1536,7 @@ class API(base.Base): service = self._get_service_by_host(context, host) if availability_zone is None: - availability_zone = service.get('availability_zone') + availability_zone = service.availability_zone manage_what = { 'context': context, diff --git a/cinder/volume/flows/api/manage_existing.py b/cinder/volume/flows/api/manage_existing.py index a0410129192..fcbe0e25bc5 100644 --- a/cinder/volume/flows/api/manage_existing.py +++ b/cinder/volume/flows/api/manage_existing.py @@ -20,6 +20,7 @@ from taskflow.types import failure as ft from cinder import exception from cinder import flow_utils from cinder.i18n import _LE +from cinder import objects from cinder.volume.flows import common LOG = logging.getLogger(__name__) @@ -68,18 +69,11 @@ class EntryCreateTask(flow_utils.CinderTask): 'bootable': kwargs.pop('bootable'), } - volume = self.db.volume_create(context, volume_properties) + volume = objects.Volume(context, volume_properties) + volume.create() return { 'volume_properties': volume_properties, - # NOTE(harlowja): it appears like further usage of this volume - # result actually depend on it being a sqlalchemy object and not - # just a plain dictionary so that's why we are storing this here. - # - # In the future where this task results can be serialized and - # restored automatically for continued running we will need to - # resolve the serialization & recreation of this object since raw - # sqlalchemy objects can't be serialized. 'volume': volume, } @@ -117,8 +111,9 @@ class ManageCastTask(flow_utils.CinderTask): # Call the scheduler to ensure that the host exists and that it can # accept the volume self.scheduler_rpcapi.manage_existing(context, CONF.volume_topic, - volume['id'], - request_spec=request_spec) + volume.id, + request_spec=request_spec, + volume=volume) def revert(self, context, result, flow_failures, **kwargs): # Restore the source volume status and set the volume to error status. diff --git a/cinder/volume/flows/manager/manage_existing.py b/cinder/volume/flows/manager/manage_existing.py index 9d2ac1886dc..03732a06a93 100644 --- a/cinder/volume/flows/manager/manage_existing.py +++ b/cinder/volume/flows/manager/manage_existing.py @@ -40,7 +40,7 @@ class PrepareForQuotaReservationTask(flow_utils.CinderTask): self.driver = driver def execute(self, context, volume_ref, manage_existing_ref): - volume_id = volume_ref['id'] + volume_id = volume_ref.id if not self.driver.initialized: driver_name = self.driver.__class__.__name__ LOG.error(_LE("Unable to manage existing volume. " @@ -55,11 +55,11 @@ class PrepareForQuotaReservationTask(flow_utils.CinderTask): manage_existing_ref) return {'size': size, - 'volume_type_id': volume_ref['volume_type_id'], + 'volume_type_id': volume_ref.volume_type_id, 'volume_properties': volume_ref, - 'volume_spec': {'status': volume_ref['status'], - 'volume_name': volume_ref['name'], - 'volume_id': volume_ref['id']}} + 'volume_spec': {'status': volume_ref.status, + 'volume_name': volume_ref.name, + 'volume_id': volume_ref.id}} class ManageExistingTask(flow_utils.CinderTask): @@ -91,7 +91,7 @@ class ManageExistingTask(flow_utils.CinderTask): return {'volume': volume_ref} -def get_flow(context, db, driver, host, volume_id, ref): +def get_flow(context, db, driver, host, volume, ref): """Constructs and returns the manager entrypoint flow.""" flow_name = ACTION.replace(":", "_") + "_manager" @@ -102,13 +102,12 @@ def get_flow(context, db, driver, host, volume_id, ref): # determined. create_what = { 'context': context, - 'volume_id': volume_id, + 'volume_ref': volume, 'manage_existing_ref': ref, 'optional_args': {'is_quota_committed': False} } - volume_flow.add(create_mgr.ExtractVolumeRefTask(db, host), - create_mgr.NotifyVolumeActionTask(db, + volume_flow.add(create_mgr.NotifyVolumeActionTask(db, "manage_existing.start"), PrepareForQuotaReservationTask(db, driver), create_api.QuotaReserveTask(), diff --git a/cinder/volume/manager.py b/cinder/volume/manager.py index 0daf7f21c0b..18c61a30893 100644 --- a/cinder/volume/manager.py +++ b/cinder/volume/manager.py @@ -218,7 +218,7 @@ def locked_snapshot_operation(f): class VolumeManager(manager.SchedulerDependentManager): """Manages attachable block storage devices.""" - RPC_API_VERSION = '2.1' + RPC_API_VERSION = '2.2' target = messaging.Target(version=RPC_API_VERSION) @@ -2284,7 +2284,13 @@ class VolumeManager(manager.SchedulerDependentManager): LOG.info(_LI("Retype volume completed successfully."), resource=volume) - def manage_existing(self, ctxt, volume_id, ref=None): + def manage_existing(self, ctxt, volume_id, ref=None, volume=None): + # FIXME(dulek): Remove this in v3.0 of RPC API. + if volume is None: + # For older clients, mimic the old behavior and look up the volume + # by its volume_id. + volume = objects.Volume.get_by_id(context, volume_id) + try: flow_engine = manage_existing.get_flow( ctxt, @@ -2292,7 +2298,9 @@ class VolumeManager(manager.SchedulerDependentManager): self.driver, self.host, volume_id, - ref) + ref, + volume, + ) except Exception: msg = _("Failed to create manage_existing flow.") LOG.exception(msg, resource={'type': 'volume', 'id': volume_id}) diff --git a/cinder/volume/rpcapi.py b/cinder/volume/rpcapi.py index 1a6f9777080..1c0a6a82e80 100644 --- a/cinder/volume/rpcapi.py +++ b/cinder/volume/rpcapi.py @@ -100,9 +100,10 @@ class VolumeAPI(rpc.RPCAPI): 2.0 - Remove 1.x compatibility 2.1 - Add get_manageable_volumes() and get_manageable_snapshots(). + 2.2 - Adds support for sending objects over RPC in manage_existing(). """ - RPC_API_VERSION = '2.1' + RPC_API_VERSION = '2.2' TOPIC = CONF.volume_topic BINARY = 'cinder-volume' @@ -243,8 +244,15 @@ class VolumeAPI(rpc.RPCAPI): old_reservations=old_reservations) def manage_existing(self, ctxt, volume, ref): - cctxt = self._get_cctxt(volume['host'], '2.0') - cctxt.cast(ctxt, 'manage_existing', volume_id=volume['id'], ref=ref) + msg_args = { + 'volume_id': volume.id, 'ref': ref, 'volume': volume, + } + version = '2.2' + if not self.client.can_send_version('2.2'): + version = '2.0' + msg_args.pop('volume') + cctxt = self._get_cctxt(volume.host, version) + cctxt.cast(ctxt, 'manage_existing', **msg_args) def promote_replica(self, ctxt, volume): cctxt = self._get_cctxt(volume['host'], '2.0')