Use oslo.messaging topics for multibackend
We've built our multibackend support by abusing oslo.messaging's Target.server and appending '@backend-name' prefix to the hostname. This made implementation easier, as we're simply treating multibackends as totally separated services. While this worked in RabbitMQ, zmq is communicating explicitly using hostnames, so appending anything to hostname in Target.server breaks communication. This commit modifies the messaging layer of cinder-volume to use Target.topic to distinguish backends. This is done by: * Making cinder-volume listen on new RPC server, with Target.server set to raw hostname, and topic is set to 'cinder-volume.host@backend'. 'cinder-volume' prefix is added to keep compatibility with Newton's services (we're relying on how RabbitMQ transport is implemented in oslo.messaging). * Note that old RPC server listening on 'cinder-volume' topic is left there, as we need it to recieve fanout messages from scheduler. * When sending a message to cinder-volume, we're sending it using Target.topic to route it to correct host and backend. For backward compatibility it's controlled by conditional based on RPC version pin. Closes-Bug: 1630975 Related-Bug: 1440631 Implements: cinder-zeromq-support Change-Id: I22efbeb97e11838139e2b33226d1c10094d27c1d
This commit is contained in:
parent
b3e96a58f0
commit
df647d0ccd
@ -38,6 +38,7 @@ profiler_opts = importutils.try_import('osprofiler.opts')
|
||||
|
||||
|
||||
from cinder.backup import rpcapi as backup_rpcapi
|
||||
from cinder.common import constants
|
||||
from cinder import context
|
||||
from cinder import coordination
|
||||
from cinder import exception
|
||||
@ -48,6 +49,7 @@ from cinder import rpc
|
||||
from cinder.scheduler import rpcapi as scheduler_rpcapi
|
||||
from cinder import version
|
||||
from cinder.volume import rpcapi as volume_rpcapi
|
||||
from cinder.volume import utils as vol_utils
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
@ -213,6 +215,7 @@ class Service(service.Service):
|
||||
|
||||
setup_profiler(binary, host)
|
||||
self.rpcserver = None
|
||||
self.backend_rpcserver = None
|
||||
self.cluster_rpcserver = None
|
||||
|
||||
# TODO(geguileo): Remove method in O since it will no longer be used.
|
||||
@ -241,16 +244,30 @@ class Service(service.Service):
|
||||
LOG.debug("Creating RPC server for service %s", self.topic)
|
||||
|
||||
ctxt = context.get_admin_context()
|
||||
target = messaging.Target(topic=self.topic, server=self.host)
|
||||
endpoints = [self.manager]
|
||||
endpoints.extend(self.manager.additional_endpoints)
|
||||
obj_version_cap = objects.Service.get_minimum_obj_version(ctxt)
|
||||
LOG.debug("Pinning object versions for RPC server serializer to %s",
|
||||
obj_version_cap)
|
||||
serializer = objects_base.CinderObjectSerializer(obj_version_cap)
|
||||
|
||||
target = messaging.Target(topic=self.topic, server=self.host)
|
||||
self.rpcserver = rpc.get_server(target, endpoints, serializer)
|
||||
self.rpcserver.start()
|
||||
|
||||
# NOTE(dulek): Kids, don't do that at home. We're relying here on
|
||||
# oslo.messaging implementation details to keep backward compatibility
|
||||
# with pre-Ocata services. This will not matter once we drop
|
||||
# compatibility with them.
|
||||
if self.topic == constants.VOLUME_TOPIC:
|
||||
target = messaging.Target(
|
||||
topic='%(topic)s.%(host)s' % {'topic': self.topic,
|
||||
'host': self.host},
|
||||
server=vol_utils.extract_host(self.host, 'host'))
|
||||
self.backend_rpcserver = rpc.get_server(target, endpoints,
|
||||
serializer)
|
||||
self.backend_rpcserver.start()
|
||||
|
||||
# TODO(geguileo): In O - Remove the is_svc_upgrading_to_n part
|
||||
if self.cluster and not self.is_svc_upgrading_to_n(self.binary):
|
||||
LOG.info(_LI('Starting %(topic)s cluster %(cluster)s (version '
|
||||
@ -393,6 +410,8 @@ class Service(service.Service):
|
||||
# errors, go ahead and ignore them.. as we're shutting down anyway
|
||||
try:
|
||||
self.rpcserver.stop()
|
||||
if self.backend_rpcserver:
|
||||
self.backend_rpcserver.stop()
|
||||
if self.cluster_rpcserver:
|
||||
self.cluster_rpcserver.stop()
|
||||
except Exception:
|
||||
@ -422,6 +441,8 @@ class Service(service.Service):
|
||||
pass
|
||||
if self.rpcserver:
|
||||
self.rpcserver.wait()
|
||||
if self.backend_rpcserver:
|
||||
self.backend_rpcserver.wait()
|
||||
if self.cluster_rpcserver:
|
||||
self.cluster_rpcserver.wait()
|
||||
super(Service, self).wait()
|
||||
|
@ -117,6 +117,10 @@ class VolumeRpcAPITestCase(test.TestCase):
|
||||
|
||||
self.addCleanup(self._cleanup)
|
||||
|
||||
self.can_send_version_mock = self.patch(
|
||||
'oslo_messaging.RPCClient.can_send_version',
|
||||
return_value=True)
|
||||
|
||||
def _cleanup(self):
|
||||
self.fake_snapshot.destroy()
|
||||
self.fake_volume_obj.destroy()
|
||||
@ -203,8 +207,9 @@ class VolumeRpcAPITestCase(test.TestCase):
|
||||
elif 'cgsnapshot' in kwargs:
|
||||
host = kwargs['cgsnapshot'].consistencygroup.service_topic_queue
|
||||
|
||||
target['server'] = utils.extract_host(host)
|
||||
target['topic'] = '%s.%s' % (constants.VOLUME_TOPIC, host)
|
||||
target['server'] = utils.extract_host(host, 'host')
|
||||
target['topic'] = '%s.%s' % (constants.VOLUME_TOPIC,
|
||||
utils.extract_host(host))
|
||||
|
||||
self.fake_args = None
|
||||
self.fake_kwargs = None
|
||||
@ -276,8 +281,9 @@ class VolumeRpcAPITestCase(test.TestCase):
|
||||
elif 'group_snapshot' in kwargs:
|
||||
host = kwargs['group_snapshot'].service_topic_queue
|
||||
|
||||
target['server'] = utils.extract_host(host)
|
||||
target['topic'] = '%s.%s' % (constants.VOLUME_TOPIC, host)
|
||||
target['server'] = utils.extract_host(host, 'host')
|
||||
target['topic'] = '%s.%s' % (constants.VOLUME_TOPIC,
|
||||
utils.extract_host(host))
|
||||
|
||||
self.fake_args = None
|
||||
self.fake_kwargs = None
|
||||
@ -416,9 +422,8 @@ class VolumeRpcAPITestCase(test.TestCase):
|
||||
version='3.0')
|
||||
|
||||
@ddt.data('3.0', '3.3')
|
||||
@mock.patch('oslo_messaging.RPCClient.can_send_version')
|
||||
def test_attach_volume_to_instance(self, version, can_send_version):
|
||||
can_send_version.return_value = (version == '3.3')
|
||||
def test_attach_volume_to_instance(self, version):
|
||||
self.can_send_version_mock.return_value = (version == '3.3')
|
||||
self._test_volume_api('attach_volume',
|
||||
rpc_method='call',
|
||||
volume=self.fake_volume_obj,
|
||||
@ -429,9 +434,8 @@ class VolumeRpcAPITestCase(test.TestCase):
|
||||
version=version)
|
||||
|
||||
@ddt.data('3.0', '3.3')
|
||||
@mock.patch('oslo_messaging.RPCClient.can_send_version')
|
||||
def test_attach_volume_to_host(self, version, can_send_version):
|
||||
can_send_version.return_value = (version == '3.3')
|
||||
def test_attach_volume_to_host(self, version):
|
||||
self.can_send_version_mock.return_value = (version == '3.3')
|
||||
self._test_volume_api('attach_volume',
|
||||
rpc_method='call',
|
||||
volume=self.fake_volume_obj,
|
||||
@ -448,9 +452,8 @@ class VolumeRpcAPITestCase(test.TestCase):
|
||||
self.fake_src_cg.obj_reset_changes(['my_cluster'])
|
||||
|
||||
@ddt.data('3.0', '3.3')
|
||||
@mock.patch('oslo_messaging.RPCClient.can_send_version')
|
||||
def test_attach_volume_to_cluster(self, version, can_send_version):
|
||||
can_send_version.return_value = (version == '3.3')
|
||||
def test_attach_volume_to_cluster(self, version):
|
||||
self.can_send_version_mock.return_value = (version == '3.3')
|
||||
self._set_cluster()
|
||||
self._test_volume_api('attach_volume',
|
||||
rpc_method='call',
|
||||
@ -462,9 +465,8 @@ class VolumeRpcAPITestCase(test.TestCase):
|
||||
version=version)
|
||||
|
||||
@ddt.data('3.0', '3.4')
|
||||
@mock.patch('oslo_messaging.RPCClient.can_send_version')
|
||||
def test_detach_volume(self, version, can_send_version):
|
||||
can_send_version.return_value = (version == '3.4')
|
||||
def test_detach_volume(self, version):
|
||||
self.can_send_version_mock.return_value = (version == '3.4')
|
||||
self._test_volume_api('detach_volume',
|
||||
rpc_method='call',
|
||||
volume=self.fake_volume_obj,
|
||||
@ -472,9 +474,8 @@ class VolumeRpcAPITestCase(test.TestCase):
|
||||
version=version)
|
||||
|
||||
@ddt.data('3.0', '3.4')
|
||||
@mock.patch('oslo_messaging.RPCClient.can_send_version')
|
||||
def test_detach_volume_cluster(self, version, can_send_version):
|
||||
can_send_version.return_value = (version == '3.4')
|
||||
def test_detach_volume_cluster(self, version):
|
||||
self.can_send_version_mock.return_value = (version == '3.4')
|
||||
self._set_cluster()
|
||||
self._test_volume_api('detach_volume',
|
||||
rpc_method='call',
|
||||
@ -500,8 +501,7 @@ class VolumeRpcAPITestCase(test.TestCase):
|
||||
connector='fake_connector',
|
||||
version='3.0')
|
||||
|
||||
@mock.patch('oslo_messaging.RPCClient.can_send_version', return_value=True)
|
||||
def test_initialize_connection_cluster(self, mock_can_send_version):
|
||||
def test_initialize_connection_cluster(self):
|
||||
self._set_cluster()
|
||||
self._test_volume_api('initialize_connection',
|
||||
rpc_method='call',
|
||||
@ -676,12 +676,10 @@ class VolumeRpcAPITestCase(test.TestCase):
|
||||
version='3.2')
|
||||
|
||||
@ddt.data(None, 'mycluster')
|
||||
@mock.patch('oslo_messaging.RPCClient.can_send_version',
|
||||
return_value=False)
|
||||
@mock.patch('cinder.objects.backup.BackupDeviceInfo.from_primitive',
|
||||
return_value={})
|
||||
def test_get_backup_device_old(self, cluster_name, mock_from_primitive,
|
||||
mock_can_send_version):
|
||||
def test_get_backup_device_old(self, cluster_name, mock_from_primitive):
|
||||
self.can_send_version_mock.return_value = False
|
||||
self._change_cluster_name(self.fake_volume_obj, cluster_name)
|
||||
self._test_volume_api('get_backup_device',
|
||||
rpc_method='call',
|
||||
|
@ -800,13 +800,6 @@ class VolumeUtilsTestCase(test.TestCase):
|
||||
volume_utils.extract_host,
|
||||
None)
|
||||
|
||||
def test_get_volume_rpc_host(self):
|
||||
host = 'Host@backend'
|
||||
# default level is 'backend'
|
||||
# check if host with backend is returned
|
||||
self.assertEqual(volume_utils.extract_host(host),
|
||||
volume_utils.get_volume_rpc_host(host))
|
||||
|
||||
def test_append_host(self):
|
||||
host = 'Host'
|
||||
pool = 'Pool'
|
||||
|
@ -116,16 +116,30 @@ class VolumeAPI(rpc.RPCAPI):
|
||||
3.3 - Adds support for sending objects over RPC in attach_volume().
|
||||
3.4 - Adds support for sending objects over RPC in detach_volume().
|
||||
3.5 - Adds support for cluster in retype and migrate_volume
|
||||
3.6 - Switch to use oslo.messaging topics to indicate backends instead
|
||||
of @backend suffixes in server names.
|
||||
"""
|
||||
|
||||
RPC_API_VERSION = '3.5'
|
||||
RPC_API_VERSION = '3.6'
|
||||
RPC_DEFAULT_VERSION = '3.0'
|
||||
TOPIC = constants.VOLUME_TOPIC
|
||||
BINARY = 'cinder-volume'
|
||||
|
||||
def _get_cctxt(self, host=None, version=None, **kwargs):
|
||||
if host is not None:
|
||||
kwargs['server'] = utils.get_volume_rpc_host(host)
|
||||
if host:
|
||||
server = utils.extract_host(host)
|
||||
|
||||
# TODO(dulek): If we're pinned before 3.6, we should send stuff the
|
||||
# old way - addressing server=host@backend, topic=cinder-volume.
|
||||
# Otherwise we're addressing server=host,
|
||||
# topic=cinder-volume.host@backend. This conditional can go away
|
||||
# when we stop supporting 3.x.
|
||||
if self.client.can_send_version('3.6'):
|
||||
kwargs['topic'] = '%(topic)s.%(host)s' % {'topic': self.TOPIC,
|
||||
'host': server}
|
||||
server = utils.extract_host(server, 'host')
|
||||
kwargs['server'] = server
|
||||
|
||||
return super(VolumeAPI, self)._get_cctxt(version=version, **kwargs)
|
||||
|
||||
def create_consistencygroup(self, ctxt, group):
|
||||
|
@ -725,14 +725,6 @@ def extract_host(host, level='backend', default_pool_name=False):
|
||||
return None
|
||||
|
||||
|
||||
def get_volume_rpc_host(host):
|
||||
if CONF.rpc_backend and CONF.rpc_backend == "zmq":
|
||||
# ZeroMQ RPC driver requires only the hostname.
|
||||
# So, return just that.
|
||||
return extract_host(host, 'host')
|
||||
return extract_host(host)
|
||||
|
||||
|
||||
def append_host(host, pool):
|
||||
"""Encode pool into host info."""
|
||||
if not host or not pool:
|
||||
|
@ -0,0 +1,3 @@
|
||||
---
|
||||
features:
|
||||
- Added support for ZMQ messaging layer in multibackend configuration.
|
Loading…
x
Reference in New Issue
Block a user