Merge "Use oslo.messaging topics for multibackend"

This commit is contained in:
Jenkins 2016-12-23 14:44:05 +00:00 committed by Gerrit Code Review
commit 942b9a4f90
6 changed files with 65 additions and 44 deletions

View File

@ -38,6 +38,7 @@ profiler_opts = importutils.try_import('osprofiler.opts')
from cinder.backup import rpcapi as backup_rpcapi
from cinder.common import constants
from cinder import context
from cinder import coordination
from cinder import exception
@ -48,6 +49,7 @@ from cinder import rpc
from cinder.scheduler import rpcapi as scheduler_rpcapi
from cinder import version
from cinder.volume import rpcapi as volume_rpcapi
from cinder.volume import utils as vol_utils
LOG = logging.getLogger(__name__)
@ -213,6 +215,7 @@ class Service(service.Service):
setup_profiler(binary, host)
self.rpcserver = None
self.backend_rpcserver = None
self.cluster_rpcserver = None
# TODO(geguileo): Remove method in O since it will no longer be used.
@ -241,16 +244,30 @@ class Service(service.Service):
LOG.debug("Creating RPC server for service %s", self.topic)
ctxt = context.get_admin_context()
target = messaging.Target(topic=self.topic, server=self.host)
endpoints = [self.manager]
endpoints.extend(self.manager.additional_endpoints)
obj_version_cap = objects.Service.get_minimum_obj_version(ctxt)
LOG.debug("Pinning object versions for RPC server serializer to %s",
obj_version_cap)
serializer = objects_base.CinderObjectSerializer(obj_version_cap)
target = messaging.Target(topic=self.topic, server=self.host)
self.rpcserver = rpc.get_server(target, endpoints, serializer)
self.rpcserver.start()
# NOTE(dulek): Kids, don't do that at home. We're relying here on
# oslo.messaging implementation details to keep backward compatibility
# with pre-Ocata services. This will not matter once we drop
# compatibility with them.
if self.topic == constants.VOLUME_TOPIC:
target = messaging.Target(
topic='%(topic)s.%(host)s' % {'topic': self.topic,
'host': self.host},
server=vol_utils.extract_host(self.host, 'host'))
self.backend_rpcserver = rpc.get_server(target, endpoints,
serializer)
self.backend_rpcserver.start()
# TODO(geguileo): In O - Remove the is_svc_upgrading_to_n part
if self.cluster and not self.is_svc_upgrading_to_n(self.binary):
LOG.info(_LI('Starting %(topic)s cluster %(cluster)s (version '
@ -393,6 +410,8 @@ class Service(service.Service):
# errors, go ahead and ignore them.. as we're shutting down anyway
try:
self.rpcserver.stop()
if self.backend_rpcserver:
self.backend_rpcserver.stop()
if self.cluster_rpcserver:
self.cluster_rpcserver.stop()
except Exception:
@ -422,6 +441,8 @@ class Service(service.Service):
pass
if self.rpcserver:
self.rpcserver.wait()
if self.backend_rpcserver:
self.backend_rpcserver.wait()
if self.cluster_rpcserver:
self.cluster_rpcserver.wait()
super(Service, self).wait()

View File

@ -117,6 +117,10 @@ class VolumeRpcAPITestCase(test.TestCase):
self.addCleanup(self._cleanup)
self.can_send_version_mock = self.patch(
'oslo_messaging.RPCClient.can_send_version',
return_value=True)
def _cleanup(self):
self.fake_snapshot.destroy()
self.fake_volume_obj.destroy()
@ -203,8 +207,9 @@ class VolumeRpcAPITestCase(test.TestCase):
elif 'cgsnapshot' in kwargs:
host = kwargs['cgsnapshot'].consistencygroup.service_topic_queue
target['server'] = utils.extract_host(host)
target['topic'] = '%s.%s' % (constants.VOLUME_TOPIC, host)
target['server'] = utils.extract_host(host, 'host')
target['topic'] = '%s.%s' % (constants.VOLUME_TOPIC,
utils.extract_host(host))
self.fake_args = None
self.fake_kwargs = None
@ -276,8 +281,9 @@ class VolumeRpcAPITestCase(test.TestCase):
elif 'group_snapshot' in kwargs:
host = kwargs['group_snapshot'].service_topic_queue
target['server'] = utils.extract_host(host)
target['topic'] = '%s.%s' % (constants.VOLUME_TOPIC, host)
target['server'] = utils.extract_host(host, 'host')
target['topic'] = '%s.%s' % (constants.VOLUME_TOPIC,
utils.extract_host(host))
self.fake_args = None
self.fake_kwargs = None
@ -416,9 +422,8 @@ class VolumeRpcAPITestCase(test.TestCase):
version='3.0')
@ddt.data('3.0', '3.3')
@mock.patch('oslo_messaging.RPCClient.can_send_version')
def test_attach_volume_to_instance(self, version, can_send_version):
can_send_version.return_value = (version == '3.3')
def test_attach_volume_to_instance(self, version):
self.can_send_version_mock.return_value = (version == '3.3')
self._test_volume_api('attach_volume',
rpc_method='call',
volume=self.fake_volume_obj,
@ -429,9 +434,8 @@ class VolumeRpcAPITestCase(test.TestCase):
version=version)
@ddt.data('3.0', '3.3')
@mock.patch('oslo_messaging.RPCClient.can_send_version')
def test_attach_volume_to_host(self, version, can_send_version):
can_send_version.return_value = (version == '3.3')
def test_attach_volume_to_host(self, version):
self.can_send_version_mock.return_value = (version == '3.3')
self._test_volume_api('attach_volume',
rpc_method='call',
volume=self.fake_volume_obj,
@ -448,9 +452,8 @@ class VolumeRpcAPITestCase(test.TestCase):
self.fake_src_cg.obj_reset_changes(['my_cluster'])
@ddt.data('3.0', '3.3')
@mock.patch('oslo_messaging.RPCClient.can_send_version')
def test_attach_volume_to_cluster(self, version, can_send_version):
can_send_version.return_value = (version == '3.3')
def test_attach_volume_to_cluster(self, version):
self.can_send_version_mock.return_value = (version == '3.3')
self._set_cluster()
self._test_volume_api('attach_volume',
rpc_method='call',
@ -462,9 +465,8 @@ class VolumeRpcAPITestCase(test.TestCase):
version=version)
@ddt.data('3.0', '3.4')
@mock.patch('oslo_messaging.RPCClient.can_send_version')
def test_detach_volume(self, version, can_send_version):
can_send_version.return_value = (version == '3.4')
def test_detach_volume(self, version):
self.can_send_version_mock.return_value = (version == '3.4')
self._test_volume_api('detach_volume',
rpc_method='call',
volume=self.fake_volume_obj,
@ -472,9 +474,8 @@ class VolumeRpcAPITestCase(test.TestCase):
version=version)
@ddt.data('3.0', '3.4')
@mock.patch('oslo_messaging.RPCClient.can_send_version')
def test_detach_volume_cluster(self, version, can_send_version):
can_send_version.return_value = (version == '3.4')
def test_detach_volume_cluster(self, version):
self.can_send_version_mock.return_value = (version == '3.4')
self._set_cluster()
self._test_volume_api('detach_volume',
rpc_method='call',
@ -500,8 +501,7 @@ class VolumeRpcAPITestCase(test.TestCase):
connector='fake_connector',
version='3.0')
@mock.patch('oslo_messaging.RPCClient.can_send_version', return_value=True)
def test_initialize_connection_cluster(self, mock_can_send_version):
def test_initialize_connection_cluster(self):
self._set_cluster()
self._test_volume_api('initialize_connection',
rpc_method='call',
@ -676,12 +676,10 @@ class VolumeRpcAPITestCase(test.TestCase):
version='3.2')
@ddt.data(None, 'mycluster')
@mock.patch('oslo_messaging.RPCClient.can_send_version',
return_value=False)
@mock.patch('cinder.objects.backup.BackupDeviceInfo.from_primitive',
return_value={})
def test_get_backup_device_old(self, cluster_name, mock_from_primitive,
mock_can_send_version):
def test_get_backup_device_old(self, cluster_name, mock_from_primitive):
self.can_send_version_mock.return_value = False
self._change_cluster_name(self.fake_volume_obj, cluster_name)
self._test_volume_api('get_backup_device',
rpc_method='call',

View File

@ -800,13 +800,6 @@ class VolumeUtilsTestCase(test.TestCase):
volume_utils.extract_host,
None)
def test_get_volume_rpc_host(self):
host = 'Host@backend'
# default level is 'backend'
# check if host with backend is returned
self.assertEqual(volume_utils.extract_host(host),
volume_utils.get_volume_rpc_host(host))
def test_append_host(self):
host = 'Host'
pool = 'Pool'

View File

@ -116,16 +116,30 @@ class VolumeAPI(rpc.RPCAPI):
3.3 - Adds support for sending objects over RPC in attach_volume().
3.4 - Adds support for sending objects over RPC in detach_volume().
3.5 - Adds support for cluster in retype and migrate_volume
3.6 - Switch to use oslo.messaging topics to indicate backends instead
of @backend suffixes in server names.
"""
RPC_API_VERSION = '3.5'
RPC_API_VERSION = '3.6'
RPC_DEFAULT_VERSION = '3.0'
TOPIC = constants.VOLUME_TOPIC
BINARY = 'cinder-volume'
def _get_cctxt(self, host=None, version=None, **kwargs):
if host is not None:
kwargs['server'] = utils.get_volume_rpc_host(host)
if host:
server = utils.extract_host(host)
# TODO(dulek): If we're pinned before 3.6, we should send stuff the
# old way - addressing server=host@backend, topic=cinder-volume.
# Otherwise we're addressing server=host,
# topic=cinder-volume.host@backend. This conditional can go away
# when we stop supporting 3.x.
if self.client.can_send_version('3.6'):
kwargs['topic'] = '%(topic)s.%(host)s' % {'topic': self.TOPIC,
'host': server}
server = utils.extract_host(server, 'host')
kwargs['server'] = server
return super(VolumeAPI, self)._get_cctxt(version=version, **kwargs)
def create_consistencygroup(self, ctxt, group):

View File

@ -725,14 +725,6 @@ def extract_host(host, level='backend', default_pool_name=False):
return None
def get_volume_rpc_host(host):
if CONF.rpc_backend and CONF.rpc_backend == "zmq":
# ZeroMQ RPC driver requires only the hostname.
# So, return just that.
return extract_host(host, 'host')
return extract_host(host)
def append_host(host, pool):
"""Encode pool into host info."""
if not host or not pool:

View File

@ -0,0 +1,3 @@
---
features:
- Added support for ZMQ messaging layer in multibackend configuration.