Update manage_existing to use volume object

The following patch updates manage_existing APIs to use volume versioned
objects. Changes were made to be backward compatible with older RPC
clients. Updating the drivers is left to each driver's
maintainer.

Note that this patch DOES NOT try to use object dot notation everywhere
to keep the size small. This cleanup will be done in the future.

Change-Id: Ie806336ad95834b091c5942d7dfa13345006c1c8
Partial-Implements: blueprint cinder-objects
This commit is contained in:
Michał Dulko 2015-12-15 14:49:49 +01:00
parent 6db7a8a43d
commit 092a01f2cf
10 changed files with 85 additions and 49 deletions

View File

@@ -56,7 +56,7 @@ LOG = logging.getLogger(__name__)
class SchedulerManager(manager.Manager):
"""Chooses a host to create volumes."""
RPC_API_VERSION = '2.0'
RPC_API_VERSION = '2.1'
target = messaging.Target(version=RPC_API_VERSION)
@@ -255,20 +255,25 @@ class SchedulerManager(manager.Manager):
old_reservations)
def manage_existing(self, context, topic, volume_id,
request_spec, filter_properties=None):
request_spec, filter_properties=None, volume=None):
"""Ensure that the host exists and can accept the volume."""
self._wait_for_scheduler()
# FIXME(mdulko): Remove this in v3.0 of RPC API.
if volume is None:
# For older clients, mimic the old behavior and look up the
# volume by its volume_id.
volume = objects.Volume.get_by_id(context, volume_id)
def _manage_existing_set_error(self, context, ex, request_spec):
volume_state = {'volume_state': {'status': 'error'}}
self._set_volume_state_and_notify('manage_existing', volume_state,
context, ex, request_spec)
volume_ref = db.volume_get(context, volume_id)
try:
self.driver.host_passes_filters(context,
volume_ref['host'],
volume.host,
request_spec,
filter_properties)
except exception.NoValidHost as ex:
@@ -277,7 +282,7 @@ class SchedulerManager(manager.Manager):
with excutils.save_and_reraise_exception():
_manage_existing_set_error(self, context, ex, request_spec)
else:
volume_rpcapi.VolumeAPI().manage_existing(context, volume_ref,
volume_rpcapi.VolumeAPI().manage_existing(context, volume,
request_spec.get('ref'))
def get_pools(self, context, filters=None):

View File

@@ -52,16 +52,17 @@ class SchedulerAPI(rpc.RPCAPI):
set to 1.11.
2.0 - Remove 1.x compatibility
2.1 - Adds support for sending objects over RPC in manage_existing()
"""
RPC_API_VERSION = '2.0'
RPC_API_VERSION = '2.1'
TOPIC = CONF.scheduler_topic
BINARY = 'cinder-scheduler'
def create_consistencygroup(self, ctxt, topic, group,
request_spec_list=None,
filter_properties_list=None):
version = '2.0'
version = '2.1'
cctxt = self.client.prepare(version=version)
request_spec_p_list = []
for request_spec in request_spec_list:
@@ -113,15 +114,20 @@ class SchedulerAPI(rpc.RPCAPI):
return cctxt.cast(ctxt, 'retype', **msg_args)
def manage_existing(self, ctxt, topic, volume_id,
request_spec=None, filter_properties=None):
version = '2.0'
cctxt = self.client.prepare(version=version)
request_spec=None, filter_properties=None,
volume=None):
request_spec_p = jsonutils.to_primitive(request_spec)
return cctxt.cast(ctxt, 'manage_existing',
topic=topic,
volume_id=volume_id,
request_spec=request_spec_p,
filter_properties=filter_properties)
msg_args = {
'topic': topic, 'volume_id': volume_id,
'request_spec': request_spec_p,
'filter_properties': filter_properties, 'volume': volume,
}
version = '2.1'
if not self.client.can_send_version('2.1'):
version = '2.0'
msg_args.pop('volume')
cctxt = self.client.prepare(version=version)
return cctxt.cast(ctxt, 'manage_existing', **msg_args)
def get_pools(self, ctxt, filters=None):
version = '2.0'

View File

@@ -19,6 +19,7 @@ Unit Tests for cinder.scheduler.rpcapi
import copy
import ddt
import mock
from cinder import context
@@ -26,6 +27,7 @@ from cinder.scheduler import rpcapi as scheduler_rpcapi
from cinder import test
@ddt.ddt
class SchedulerRpcAPITestCase(test.TestCase):
def setUp(self):
@@ -118,14 +120,19 @@ class SchedulerRpcAPITestCase(test.TestCase):
volume='volume',
version='2.0')
def test_manage_existing(self):
@ddt.data('2.0', '2.1')
@mock.patch('oslo_messaging.RPCClient.can_send_version')
def test_manage_existing(self, version, can_send_version):
can_send_version.side_effect = lambda x: x == version
self._test_scheduler_api('manage_existing',
rpc_method='cast',
topic='topic',
volume_id='volume_id',
request_spec='fake_request_spec',
filter_properties='filter_properties',
version='2.0')
volume='volume',
version=version)
can_send_version.assert_called_with('2.1')
def test_get_pools(self):
self._test_scheduler_api('get_pools',

View File

@@ -16,7 +16,9 @@
Unit Tests for cinder.volume.rpcapi
"""
import copy
import mock
import ddt
from oslo_config import cfg
from oslo_serialization import jsonutils
@@ -37,6 +39,7 @@ from cinder.volume import utils
CONF = cfg.CONF
@ddt.ddt
class VolumeRpcAPITestCase(test.TestCase):
def setUp(self):
@@ -408,14 +411,19 @@ class VolumeRpcAPITestCase(test.TestCase):
old_reservations=self.fake_reservations,
version='2.0')
def test_manage_existing(self):
@ddt.data('2.0', '2.2')
@mock.patch('oslo_messaging.RPCClient.can_send_version')
def test_manage_existing(self, version, can_send_version):
can_send_version.side_effect = lambda x: x == version
self._test_volume_api('manage_existing',
rpc_method='cast',
volume=self.fake_volume,
volume=self.fake_volume_obj,
ref={'lv_name': 'foo'},
version='2.0')
version=version)
can_send_version.assert_called_once_with('2.2')
def test_manage_existing_snapshot(self):
@mock.patch('oslo_messaging.RPCClient.can_send_version', return_value=True)
def test_manage_existing_snapshot(self, mock_can_send_version):
volume_update = {'host': 'fake_host'}
snpshot = {
'id': fake.SNAPSHOT_ID,

View File

@@ -43,7 +43,7 @@ class FakeSchedulerRpcAPI(object):
self.test_inst.assertEqual(self.expected_spec, request_spec)
def manage_existing(self, context, volume_topic, volume_id,
request_spec=None):
request_spec=None, volume=None):
self.test_inst.assertEqual(self.expected_spec, request_spec)

View File

@@ -1536,7 +1536,7 @@ class API(base.Base):
service = self._get_service_by_host(context, host)
if availability_zone is None:
availability_zone = service.get('availability_zone')
availability_zone = service.availability_zone
manage_what = {
'context': context,

View File

@@ -20,6 +20,7 @@ from taskflow.types import failure as ft
from cinder import exception
from cinder import flow_utils
from cinder.i18n import _LE
from cinder import objects
from cinder.volume.flows import common
LOG = logging.getLogger(__name__)
@@ -68,18 +69,11 @@ class EntryCreateTask(flow_utils.CinderTask):
'bootable': kwargs.pop('bootable'),
}
volume = self.db.volume_create(context, volume_properties)
volume = objects.Volume(context, volume_properties)
volume.create()
return {
'volume_properties': volume_properties,
# NOTE(harlowja): it appears like further usage of this volume
# result actually depend on it being a sqlalchemy object and not
# just a plain dictionary so that's why we are storing this here.
#
# In the future where this task results can be serialized and
# restored automatically for continued running we will need to
# resolve the serialization & recreation of this object since raw
# sqlalchemy objects can't be serialized.
'volume': volume,
}
@@ -117,8 +111,9 @@ class ManageCastTask(flow_utils.CinderTask):
# Call the scheduler to ensure that the host exists and that it can
# accept the volume
self.scheduler_rpcapi.manage_existing(context, CONF.volume_topic,
volume['id'],
request_spec=request_spec)
volume.id,
request_spec=request_spec,
volume=volume)
def revert(self, context, result, flow_failures, **kwargs):
# Restore the source volume status and set the volume to error status.

View File

@@ -40,7 +40,7 @@ class PrepareForQuotaReservationTask(flow_utils.CinderTask):
self.driver = driver
def execute(self, context, volume_ref, manage_existing_ref):
volume_id = volume_ref['id']
volume_id = volume_ref.id
if not self.driver.initialized:
driver_name = self.driver.__class__.__name__
LOG.error(_LE("Unable to manage existing volume. "
@@ -55,11 +55,11 @@ class PrepareForQuotaReservationTask(flow_utils.CinderTask):
manage_existing_ref)
return {'size': size,
'volume_type_id': volume_ref['volume_type_id'],
'volume_type_id': volume_ref.volume_type_id,
'volume_properties': volume_ref,
'volume_spec': {'status': volume_ref['status'],
'volume_name': volume_ref['name'],
'volume_id': volume_ref['id']}}
'volume_spec': {'status': volume_ref.status,
'volume_name': volume_ref.name,
'volume_id': volume_ref.id}}
class ManageExistingTask(flow_utils.CinderTask):
@@ -91,7 +91,7 @@ class ManageExistingTask(flow_utils.CinderTask):
return {'volume': volume_ref}
def get_flow(context, db, driver, host, volume_id, ref):
def get_flow(context, db, driver, host, volume, ref):
"""Constructs and returns the manager entrypoint flow."""
flow_name = ACTION.replace(":", "_") + "_manager"
@@ -102,13 +102,12 @@ def get_flow(context, db, driver, host, volume_id, ref):
# determined.
create_what = {
'context': context,
'volume_id': volume_id,
'volume_ref': volume,
'manage_existing_ref': ref,
'optional_args': {'is_quota_committed': False}
}
volume_flow.add(create_mgr.ExtractVolumeRefTask(db, host),
create_mgr.NotifyVolumeActionTask(db,
volume_flow.add(create_mgr.NotifyVolumeActionTask(db,
"manage_existing.start"),
PrepareForQuotaReservationTask(db, driver),
create_api.QuotaReserveTask(),

View File

@@ -218,7 +218,7 @@ def locked_snapshot_operation(f):
class VolumeManager(manager.SchedulerDependentManager):
"""Manages attachable block storage devices."""
RPC_API_VERSION = '2.1'
RPC_API_VERSION = '2.2'
target = messaging.Target(version=RPC_API_VERSION)
@@ -2284,7 +2284,13 @@ class VolumeManager(manager.SchedulerDependentManager):
LOG.info(_LI("Retype volume completed successfully."),
resource=volume)
def manage_existing(self, ctxt, volume_id, ref=None):
def manage_existing(self, ctxt, volume_id, ref=None, volume=None):
# FIXME(dulek): Remove this in v3.0 of RPC API.
if volume is None:
# For older clients, mimic the old behavior and look up the volume
# by its volume_id.
volume = objects.Volume.get_by_id(ctxt, volume_id)
try:
flow_engine = manage_existing.get_flow(
ctxt,
@@ -2292,7 +2298,9 @@ class VolumeManager(manager.SchedulerDependentManager):
self.driver,
self.host,
volume_id,
ref)
volume,
ref,
)
except Exception:
msg = _("Failed to create manage_existing flow.")
LOG.exception(msg, resource={'type': 'volume', 'id': volume_id})

View File

@@ -100,9 +100,10 @@ class VolumeAPI(rpc.RPCAPI):
2.0 - Remove 1.x compatibility
2.1 - Add get_manageable_volumes() and get_manageable_snapshots().
2.2 - Adds support for sending objects over RPC in manage_existing().
"""
RPC_API_VERSION = '2.1'
RPC_API_VERSION = '2.2'
TOPIC = CONF.volume_topic
BINARY = 'cinder-volume'
@@ -243,8 +244,15 @@ class VolumeAPI(rpc.RPCAPI):
old_reservations=old_reservations)
def manage_existing(self, ctxt, volume, ref):
cctxt = self._get_cctxt(volume['host'], '2.0')
cctxt.cast(ctxt, 'manage_existing', volume_id=volume['id'], ref=ref)
msg_args = {
'volume_id': volume.id, 'ref': ref, 'volume': volume,
}
version = '2.2'
if not self.client.can_send_version('2.2'):
version = '2.0'
msg_args.pop('volume')
cctxt = self._get_cctxt(volume.host, version)
cctxt.cast(ctxt, 'manage_existing', **msg_args)
def promote_replica(self, ctxt, volume):
cctxt = self._get_cctxt(volume['host'], '2.0')