Add scheduler RPC API v3.0

This patch creates scheduler RPC API version 3.0, while retaining
compatibility in rpcapi and manager for 2.x, allowing for continuous
deployment scenarios.

This should be merged just before the Newton release.

Change-Id: I8eb9c1ae93d84c63a061294fc570be1d9eed69ba
This commit is contained in:
Michał Dulko 2016-09-15 10:06:45 +02:00 committed by Michal Dulko
parent 27d78f39a0
commit 8a4aecb155
5 changed files with 185 additions and 63 deletions

View File

@ -198,6 +198,13 @@ class RPCAPI(object):
self.client = get_client(target, version_cap=rpc_version_cap,
serializer=serializer)
def _compat_ver(self, current, *legacy):
versions = (current,) + legacy
for version in versions[:-1]:
if self.client.can_send_version(version):
return version
return versions[-1]
@classmethod
def determine_rpc_version_cap(cls):
global LAST_RPC_VERSIONS

View File

@ -57,13 +57,9 @@ LOG = logging.getLogger(__name__)
class SchedulerManager(manager.Manager):
"""Chooses a host to create volumes."""
# FIXME(caosf): Remove unused argument 'topic' from functions
# create_consistencygroup(), create_volume(), migrate_volume_to_host(),
# retype() and manage_existing() in v3.0 of RPC API.
RPC_API_VERSION = scheduler_rpcapi.SchedulerAPI.RPC_API_VERSION
target = messaging.Target(version=RPC_API_VERSION)
target = messaging.Target(version='2.3')
def __init__(self, scheduler_driver=None, service_name=None,
*args, **kwargs):
@ -71,6 +67,7 @@ class SchedulerManager(manager.Manager):
scheduler_driver = CONF.scheduler_driver
self.driver = importutils.import_object(scheduler_driver)
super(SchedulerManager, self).__init__(*args, **kwargs)
self.additional_endpoints.append(_SchedulerV3Proxy(self))
self._startup_delay = True
def init_host_with_rpc(self):
@ -356,3 +353,84 @@ class SchedulerManager(manager.Manager):
rpc.get_notifier("scheduler").error(context,
'scheduler.' + method,
payload)
# TODO(dulek): This goes away immediately in Ocata and is just present in
# Newton so that we can receive v2.x and v3.0 messages.
class _SchedulerV3Proxy(object):
target = messaging.Target(version='3.0')
def __init__(self, manager):
self.manager = manager
def update_service_capabilities(self, context, service_name=None,
host=None, capabilities=None, **kwargs):
return self.manager.update_service_capabilities(
context, service_name=service_name, host=host,
capabilities=capabilities, **kwargs)
def create_consistencygroup(self, context, group, request_spec_list=None,
filter_properties_list=None):
# NOTE(dulek): Second argument here is `topic` which is unused. We're
# getting rid of it in 3.0, hence it's missing from method signature.
return self.manager.create_consistencygroup(
context, None, group, request_spec_list=request_spec_list,
filter_properties_list=filter_properties_list)
def create_group(self, context, group, group_spec=None,
group_filter_properties=None, request_spec_list=None,
filter_properties_list=None):
# NOTE(dulek): Second argument here is `topic` which is unused. We're
# getting rid of it in 3.0, hence it's missing from method signature.
return self.manager.create_group(
context, None, group, group_spec=group_spec,
group_filter_properties=group_filter_properties,
request_spec_list=request_spec_list,
filter_properties_list=filter_properties_list)
def create_volume(self, context, volume, snapshot_id=None, image_id=None,
request_spec=None, filter_properties=None):
# NOTE(dulek): Second argument here is `topic`, which is unused. We're
# getting rid of it in 3.0, hence it's missing from method signature.
# We're also replacing volume_id with volume object (switched from
# optional keyword argument to positional argument).
return self.manager.create_volume(
context, None, volume.id, snapshot_id=snapshot_id,
image_id=image_id, request_spec=request_spec,
filter_properties=filter_properties, volume=volume)
def request_service_capabilities(self, context):
return self.manager.request_service_capabilities(context)
def migrate_volume_to_host(self, context, volume, host,
force_host_copy, request_spec,
filter_properties=None):
# NOTE(dulek): Second argument here is `topic` which is unused. We're
# getting rid of it in 3.0, hence it's missing from method signature.
# We're also replacing volume_id with volume object (switched from
# optional keyword argument to positional argument).
return self.manager.migrate_volume_to_host(
context, None, volume.id, host, force_host_copy, request_spec,
filter_propterties=filter_properties, volume=volume)
def retype(self, context, volume, request_spec, filter_properties=None):
# NOTE(dulek): Second argument here is `topic` which is unused. We're
# getting rid of it in 3.0, hence it's missing from method signature.
# We're also replacing volume_id with volume object (switched from
# optional keyword argument to positional argument).
return self.manager.retype(
context, None, volume.id, request_spec,
filter_properties=filter_properties, volume=volume)
def manage_existing(self, context, volume, request_spec,
filter_properties=None):
# NOTE(dulek): Second argument here is `topic` which is unused. We're
# getting rid of it in 3.0, hence it's missing from method signature.
# We're also replacing volume_id with volume object (switched from
# optional keyword argument to positional argument).
return self.manager.manage_existing(
context, None, volume.id, request_spec,
filter_properties=filter_properties, volume=volume)
def get_pools(self, context, filters=None):
return self.manager.get_pools(context, filters=filters)

View File

@ -52,38 +52,44 @@ class SchedulerAPI(rpc.RPCAPI):
2.1 - Adds support for sending objects over RPC in manage_existing()
2.2 - Sends request_spec as object in create_volume()
2.3 - Add create_group method
... Newton supports messaging 2.3. Any changes to existing methods in
2.x after this point should be done so that they can handle version cap
set to 2.3.
3.0 - Remove 2.x compatibility
"""
RPC_API_VERSION = '2.3'
RPC_API_VERSION = '3.0'
TOPIC = constants.SCHEDULER_TOPIC
BINARY = 'cinder-scheduler'
# FIXME(caosf): Remove unused argument 'topic' from functions
# create_consistencygroup(), create_volume(), migrate_volume_to_host(),
# retype() and manage_existing() in v3.0 of RPC API.
def create_consistencygroup(self, ctxt, topic, group,
request_spec_list=None,
filter_properties_list=None):
version = '2.1'
version = self._compat_ver('3.0', '2.0')
cctxt = self.client.prepare(version=version)
request_spec_p_list = []
for request_spec in request_spec_list:
request_spec_p = jsonutils.to_primitive(request_spec)
request_spec_p_list.append(request_spec_p)
return cctxt.cast(ctxt, 'create_consistencygroup',
topic=topic,
group=group,
request_spec_list=request_spec_p_list,
filter_properties_list=filter_properties_list)
msg_args = {
'group': group, 'request_spec_list': request_spec_p_list,
'filter_properties_list': filter_properties_list,
}
if version == '2.0':
msg_args['topic'] = topic
return cctxt.cast(ctxt, 'create_consistencygroup', **msg_args)
def create_group(self, ctxt, topic, group,
group_spec=None,
request_spec_list=None,
group_filter_properties=None,
filter_properties_list=None):
version = '2.3'
version = self._compat_ver('3.0', '2.3')
cctxt = self.client.prepare(version=version)
request_spec_p_list = []
for request_spec in request_spec_list:
@ -91,26 +97,31 @@ class SchedulerAPI(rpc.RPCAPI):
request_spec_p_list.append(request_spec_p)
group_spec_p = jsonutils.to_primitive(group_spec)
return cctxt.cast(ctxt, 'create_group',
topic=topic,
group=group,
group_spec=group_spec_p,
request_spec_list=request_spec_p_list,
group_filter_properties=group_filter_properties,
filter_properties_list=filter_properties_list)
msg_args = {
'group': group, 'group_spec': group_spec_p,
'request_spec_list': request_spec_p_list,
'group_filter_properties': group_filter_properties,
'filter_properties_list': filter_properties_list,
}
if version == '2.3':
msg_args['topic'] = topic
return cctxt.cast(ctxt, 'create_group', **msg_args)
def create_volume(self, ctxt, topic, volume_id, snapshot_id=None,
image_id=None, request_spec=None,
filter_properties=None, volume=None):
request_spec_p = jsonutils.to_primitive(request_spec)
msg_args = {'topic': topic, 'volume_id': volume_id,
'snapshot_id': snapshot_id, 'image_id': image_id,
msg_args = {'snapshot_id': snapshot_id, 'image_id': image_id,
'request_spec': request_spec_p,
'filter_properties': filter_properties, 'volume': volume}
version = '2.2'
if not self.client.can_send_version('2.2'):
version = self._compat_ver('3.0', '2.2', '2.0')
if version in ('2.2', '2.0'):
msg_args['volume_id'] = volume.id
msg_args['topic'] = topic
if version == '2.0':
# Send request_spec as dict
version = '2.0'
msg_args['request_spec'] = jsonutils.to_primitive(request_spec)
cctxt = self.client.prepare(version=version)
@ -120,23 +131,29 @@ class SchedulerAPI(rpc.RPCAPI):
force_host_copy=False, request_spec=None,
filter_properties=None, volume=None):
request_spec_p = jsonutils.to_primitive(request_spec)
msg_args = {'topic': topic, 'volume_id': volume_id,
'host': host, 'force_host_copy': force_host_copy,
msg_args = {'host': host, 'force_host_copy': force_host_copy,
'request_spec': request_spec_p,
'filter_properties': filter_properties, 'volume': volume}
version = '2.0'
version = self._compat_ver('3.0', '2.0')
if version == '2.0':
msg_args['volume_id'] = volume.id
msg_args['topic'] = topic
cctxt = self.client.prepare(version=version)
return cctxt.cast(ctxt, 'migrate_volume_to_host', **msg_args)
def retype(self, ctxt, topic, volume_id,
request_spec=None, filter_properties=None, volume=None):
def retype(self, ctxt, topic, volume_id, request_spec=None,
filter_properties=None, volume=None):
request_spec_p = jsonutils.to_primitive(request_spec)
msg_args = {'topic': topic, 'volume_id': volume_id,
'request_spec': request_spec_p,
msg_args = {'request_spec': request_spec_p,
'filter_properties': filter_properties, 'volume': volume}
version = '2.0'
version = self._compat_ver('3.0', '2.0')
if version == '2.0':
msg_args['volume_id'] = volume.id
msg_args['topic'] = topic
cctxt = self.client.prepare(version=version)
return cctxt.cast(ctxt, 'retype', **msg_args)
@ -146,19 +163,20 @@ class SchedulerAPI(rpc.RPCAPI):
volume=None):
request_spec_p = jsonutils.to_primitive(request_spec)
msg_args = {
'topic': topic, 'volume_id': volume_id,
'request_spec': request_spec_p,
'filter_properties': filter_properties, 'volume': volume,
}
version = '2.1'
if not self.client.can_send_version('2.1'):
version = '2.0'
version = self._compat_ver('3.0', '2.1', '2.0')
if version in ('2.1', '2.0'):
msg_args['volume_id'] = volume.id
msg_args['topic'] = topic
if version == '2.0':
msg_args.pop('volume')
cctxt = self.client.prepare(version=version)
return cctxt.cast(ctxt, 'manage_existing', **msg_args)
def get_pools(self, ctxt, filters=None):
version = '2.0'
version = self._compat_ver('3.0', '2.0')
cctxt = self.client.prepare(version=version)
return cctxt.call(ctxt, 'get_pools',
filters=filters)
@ -166,7 +184,7 @@ class SchedulerAPI(rpc.RPCAPI):
def update_service_capabilities(self, ctxt,
service_name, host,
capabilities):
version = '2.0'
version = self._compat_ver('3.0', '2.0')
cctxt = self.client.prepare(fanout=True, version=version)
cctxt.cast(ctxt, 'update_service_capabilities',
service_name=service_name, host=host,

View File

@ -25,6 +25,8 @@ import mock
from cinder import context
from cinder.scheduler import rpcapi as scheduler_rpcapi
from cinder import test
from cinder.tests.unit import fake_constants
from cinder.tests.unit import fake_volume
@ddt.ddt
@ -32,13 +34,17 @@ class SchedulerRpcAPITestCase(test.TestCase):
def setUp(self):
super(SchedulerRpcAPITestCase, self).setUp()
self.patch('oslo_messaging.RPCClient.can_send_version',
return_value=True)
self.context = context.RequestContext('fake_user', 'fake_project')
self.volume_id = fake_constants.VOLUME_ID
def tearDown(self):
super(SchedulerRpcAPITestCase, self).tearDown()
def _test_scheduler_api(self, method, rpc_method,
fanout=False, **kwargs):
ctxt = context.RequestContext('fake_user', 'fake_project')
ctxt = self.context
rpcapi = scheduler_rpcapi.SchedulerAPI()
expected_retval = 'foo' if rpc_method == 'call' else None
@ -84,21 +90,22 @@ class SchedulerRpcAPITestCase(test.TestCase):
host='fake_host',
capabilities='fake_capabilities',
fanout=True,
version='2.0')
version='3.0')
@mock.patch('oslo_messaging.RPCClient.can_send_version', return_value=True)
def test_create_volume(self, can_send_version):
self._test_scheduler_api('create_volume',
rpc_method='cast',
topic='topic',
volume_id='volume_id',
volume_id=self.volume_id,
snapshot_id='snapshot_id',
image_id='image_id',
request_spec='fake_request_spec',
filter_properties='filter_properties',
volume='volume',
version='2.2')
can_send_version.assert_has_calls([mock.call('2.2')])
volume=fake_volume.fake_volume_obj(
self.context),
version='3.0')
can_send_version.assert_has_calls([mock.call('3.0')])
@mock.patch('oslo_messaging.RPCClient.can_send_version',
return_value=False)
@ -106,36 +113,39 @@ class SchedulerRpcAPITestCase(test.TestCase):
self._test_scheduler_api('create_volume',
rpc_method='cast',
topic='topic',
volume_id='volume_id',
volume_id=self.volume_id,
snapshot_id='snapshot_id',
image_id='image_id',
request_spec='fake_request_spec',
filter_properties='filter_properties',
volume='volume',
volume=fake_volume.fake_volume_obj(
self.context),
version='2.0')
can_send_version.assert_has_calls([mock.call('2.2')])
can_send_version.assert_has_calls([mock.call('3.0'), mock.call('2.2')])
def test_migrate_volume_to_host(self):
self._test_scheduler_api('migrate_volume_to_host',
rpc_method='cast',
topic='topic',
volume_id='volume_id',
volume_id=self.volume_id,
host='host',
force_host_copy=True,
request_spec='fake_request_spec',
filter_properties='filter_properties',
volume='volume',
version='2.0')
volume=fake_volume.fake_volume_obj(
self.context),
version='3.0')
def test_retype(self):
self._test_scheduler_api('retype',
rpc_method='cast',
topic='topic',
volume_id='volume_id',
volume_id=self.volume_id,
request_spec='fake_request_spec',
filter_properties='filter_properties',
volume='volume',
version='2.0')
volume=fake_volume.fake_volume_obj(
self.context),
version='3.0')
@ddt.data('2.0', '2.1')
@mock.patch('oslo_messaging.RPCClient.can_send_version')
@ -144,18 +154,19 @@ class SchedulerRpcAPITestCase(test.TestCase):
self._test_scheduler_api('manage_existing',
rpc_method='cast',
topic='topic',
volume_id='volume_id',
volume_id=self.volume_id,
request_spec='fake_request_spec',
filter_properties='filter_properties',
volume='volume',
volume=fake_volume.fake_volume_obj(
self.context),
version=version)
can_send_version.assert_called_with('2.1')
can_send_version.assert_has_calls([mock.call('3.0'), mock.call('2.1')])
def test_get_pools(self):
self._test_scheduler_api('get_pools',
rpc_method='call',
filters=None,
version='2.0')
version='3.0')
def test_create_group(self):
self._test_scheduler_api('create_group',
@ -168,4 +179,4 @@ class SchedulerRpcAPITestCase(test.TestCase):
'fake_group_filter_properties',
filter_properties_list=
['fake_filter_properties_list'],
version='2.3')
version='3.0')

View File

@ -0,0 +1,8 @@
---
upgrade:
- Deployments doing continuous live upgrades from master branch should not
upgrade into Ocata before doing an upgrade which includes all the Newton's
RPC API version bump commits (scheduler, volume). If you're upgrading
deployment in a release-to-release manner, then you can safely ignore this
note.