Add cluster job distribution

This patch adds a new mechanism to distribute jobs among cinder-volume
(c-vol) nodes by introducing the "cluster" configuration option.

DocImpact: New cluster configuration option
Specs: https://review.openstack.org/327283
Implements: blueprint cinder-volume-active-active-support
Change-Id: Id0a2e4c6a7b5f17e4ee3484ae1b53c03d472b586
Gorka Eguileor 2016-05-23 14:24:08 +02:00
parent 625cab15b0
commit a5b4e34f94
11 changed files with 433 additions and 66 deletions
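
For reference, the new option is set in cinder.conf on every c-vol host that should belong to the same group; the per-backend suffix is appended automatically by cinder/cmd/volume.py (see below). A hypothetical operator configuration, with made-up cluster and backend names, could look like this:

    [DEFAULT]
    # Hosts that share this backend configuration and should work as one
    # Active-Active group use the same cluster name (A-A not yet supported).
    cluster = ha-cluster-1
    enabled_backends = lvm-1

    [lvm-1]
    volume_driver = cinder.volume.drivers.lvm.LVMVolumeDriver
    volume_group = cinder-volumes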

View File

@ -205,7 +205,7 @@ class BackupManager(manager.SchedulerDependentManager):
backup.fail_reason = err
backup.save()
def init_host(self):
def init_host(self, **kwargs):
"""Run initialization needed for a standalone service."""
ctxt = context.get_admin_context()

View File

@ -50,11 +50,21 @@ from cinder import utils
from cinder import version
CONF = cfg.CONF
deprecated_host_opt = cfg.DeprecatedOpt('host')
host_opt = cfg.StrOpt('backend_host', help='Backend override of host value.',
deprecated_opts=[deprecated_host_opt])
cfg.CONF.register_cli_opt(host_opt)
CONF = cfg.CONF
CONF.register_cli_opt(host_opt)
# TODO(geguileo): Once we complete the work on A-A update the option's help.
cluster_opt = cfg.StrOpt('cluster',
default=None,
help='Name of this cluster. Used to group volume '
'hosts that share the same backend '
'configurations to work in HA Active-Active '
'mode. Active-Active is not yet supported.')
CONF.register_opt(cluster_opt)
def main():
@ -75,11 +85,16 @@ def main():
CONF.register_opt(host_opt, group=backend)
backend_host = getattr(CONF, backend).backend_host
host = "%s@%s" % (backend_host or CONF.host, backend)
# We also want to set cluster to None on empty strings, and we
# ignore leading and trailing spaces.
cluster = CONF.cluster and CONF.cluster.strip()
cluster = (cluster or None) and '%s@%s' % (cluster, backend)
try:
server = service.Service.create(host=host,
service_name=backend,
binary='cinder-volume',
coordination=True)
coordination=True,
cluster=cluster)
except Exception:
msg = _('Volume service %s failed to start.') % host
LOG.exception(msg)
@ -96,7 +111,8 @@ def main():
'Support for DEFAULT section to configure drivers '
'will be removed in the next release.'))
server = service.Service.create(binary='cinder-volume',
coordination=True)
coordination=True,
cluster=CONF.cluster)
launcher.launch_service(server)
service_started = True
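
A quick standalone illustration of the normalization above (function and values are illustrative, not part of the patch): a blank or whitespace-only cluster option collapses to None, otherwise the backend suffix is appended the same way it is for host.

    def effective_cluster(cluster_opt, backend):
        """Mimic the cluster normalization done in cinder/cmd/volume.py."""
        # Treat None and whitespace-only strings the same way.
        cluster = cluster_opt and cluster_opt.strip()
        # Only append the backend when a real cluster name was given.
        return (cluster or None) and '%s@%s' % (cluster, backend)

    assert effective_cluster(None, 'lvm-1') is None
    assert effective_cluster('   ', 'lvm-1') is None
    assert effective_cluster(' ha-cluster-1 ', 'lvm-1') == 'ha-cluster-1@lvm-1'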

View File

@ -80,10 +80,11 @@ class Manager(base.Base, PeriodicTasks):
target = messaging.Target(version=RPC_API_VERSION)
def __init__(self, host=None, db_driver=None):
def __init__(self, host=None, db_driver=None, cluster=None):
if not host:
host = CONF.host
self.host = host
self.cluster = cluster
self.additional_endpoints = []
super(Manager, self).__init__(db_driver)
@ -91,13 +92,17 @@ class Manager(base.Base, PeriodicTasks):
"""Tasks to be run at a periodic interval."""
return self.run_periodic_tasks(context, raise_on_error=raise_on_error)
def init_host(self):
def init_host(self, added_to_cluster=None):
"""Handle initialization if this is a standalone service.
A hook point for services to execute tasks before the services are made
available (i.e. showing up on RPC and starting to accept RPC calls) to
other components. Child classes should override this method.
:param added_to_cluster: True when a host's cluster configuration has
changed from not being defined or being '' to
any other value and the DB service record
reflects this new value.
"""
pass
@ -140,12 +145,14 @@ class SchedulerDependentManager(Manager):
"""
def __init__(self, host=None, db_driver=None, service_name='undefined'):
def __init__(self, host=None, db_driver=None, service_name='undefined',
cluster=None):
self.last_capabilities = None
self.service_name = service_name
self.scheduler_rpcapi = scheduler_rpcapi.SchedulerAPI()
self._tp = greenpool.GreenPool()
super(SchedulerDependentManager, self).__init__(host, db_driver)
super(SchedulerDependentManager, self).__init__(host, db_driver,
cluster=cluster)
def update_service_capabilities(self, capabilities):
"""Remember these capabilities to send on next periodic update."""

View File

@ -207,6 +207,7 @@ def list_opts():
('DEFAULT',
itertools.chain(
cinder_backup_driver.service_opts,
[cinder_cmd_volume.cluster_opt],
cinder_api_common.api_common_opts,
cinder_backup_drivers_ceph.service_opts,
cinder_volume_drivers_smbfs.volume_opts,

View File

@ -191,49 +191,51 @@ class RPCAPI(object):
def __init__(self):
target = messaging.Target(topic=self.TOPIC,
version=self.RPC_API_VERSION)
obj_version_cap = self._determine_obj_version_cap()
obj_version_cap = self.determine_obj_version_cap()
serializer = base.CinderObjectSerializer(obj_version_cap)
rpc_version_cap = self._determine_rpc_version_cap()
rpc_version_cap = self.determine_rpc_version_cap()
self.client = get_client(target, version_cap=rpc_version_cap,
serializer=serializer)
def _determine_rpc_version_cap(self):
@classmethod
def determine_rpc_version_cap(cls):
global LAST_RPC_VERSIONS
if self.BINARY in LAST_RPC_VERSIONS:
return LAST_RPC_VERSIONS[self.BINARY]
if cls.BINARY in LAST_RPC_VERSIONS:
return LAST_RPC_VERSIONS[cls.BINARY]
version_cap = objects.Service.get_minimum_rpc_version(
cinder.context.get_admin_context(), self.BINARY)
cinder.context.get_admin_context(), cls.BINARY)
if version_cap == 'liberty':
# NOTE(dulek): This means that one of the services is Liberty,
# we should cap to its RPC version.
version_cap = LIBERTY_RPC_VERSIONS[self.BINARY]
version_cap = LIBERTY_RPC_VERSIONS[cls.BINARY]
elif not version_cap:
# If there is no service we assume they will come up later and will
# have the same version as we do.
version_cap = self.RPC_API_VERSION
version_cap = cls.RPC_API_VERSION
LOG.info(_LI('Automatically selected %(binary)s RPC version '
'%(version)s as minimum service version.'),
{'binary': self.BINARY, 'version': version_cap})
LAST_RPC_VERSIONS[self.BINARY] = version_cap
{'binary': cls.BINARY, 'version': version_cap})
LAST_RPC_VERSIONS[cls.BINARY] = version_cap
return version_cap
def _determine_obj_version_cap(self):
@classmethod
def determine_obj_version_cap(cls):
global LAST_OBJ_VERSIONS
if self.BINARY in LAST_OBJ_VERSIONS:
return LAST_OBJ_VERSIONS[self.BINARY]
if cls.BINARY in LAST_OBJ_VERSIONS:
return LAST_OBJ_VERSIONS[cls.BINARY]
version_cap = objects.Service.get_minimum_obj_version(
cinder.context.get_admin_context(), self.BINARY)
cinder.context.get_admin_context(), cls.BINARY)
# If there is no service we assume they will come up later and will
# have the same version as we do.
if not version_cap:
version_cap = base.OBJ_VERSIONS.get_current()
LOG.info(_LI('Automatically selected %(binary)s objects version '
'%(version)s as minimum service version.'),
{'binary': self.BINARY, 'version': version_cap})
LAST_OBJ_VERSIONS[self.BINARY] = version_cap
{'binary': cls.BINARY, 'version': version_cap})
LAST_OBJ_VERSIONS[cls.BINARY] = version_cap
return version_cap
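
The only behavioural change in this file is visibility: the version-cap helpers become public classmethods so that code with no RPC client yet (the new Service.is_svc_upgrading_to_n below) can ask for the pinned object version. A self-contained toy sketch of the pattern, with the DB lookup replaced by a hard-coded stand-in value:

    class ToyVolumeAPI(object):
        BINARY = 'cinder-volume'
        RPC_API_VERSION = '2.0'
        # Cache shared by all callers, mirroring LAST_OBJ_VERSIONS above.
        _LAST_OBJ_VERSIONS = {}

        @classmethod
        def determine_obj_version_cap(cls):
            # No instance (and therefore no messaging client) is needed here.
            if cls.BINARY not in cls._LAST_OBJ_VERSIONS:
                # Stand-in for objects.Service.get_minimum_obj_version().
                cls._LAST_OBJ_VERSIONS[cls.BINARY] = '1.8'
            return cls._LAST_OBJ_VERSIONS[cls.BINARY]


    def is_upgrading_to_n(rpcapi_cls):
        # Object version 1.3 means the deployment is still pinned to Mitaka.
        return rpcapi_cls.determine_obj_version_cap() == '1.3'


    print(is_upgrading_to_n(ToyVolumeAPI))  # False with the stand-in '1.8'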

View File

@ -36,6 +36,8 @@ profiler = importutils.try_import('osprofiler.profiler')
osprofiler_web = importutils.try_import('osprofiler.web')
profiler_opts = importutils.try_import('osprofiler.opts')
from cinder.backup import rpcapi as backup_rpcapi
from cinder import context
from cinder import coordination
from cinder import exception
@ -43,7 +45,9 @@ from cinder.i18n import _, _LE, _LI, _LW
from cinder import objects
from cinder.objects import base as objects_base
from cinder import rpc
from cinder.scheduler import rpcapi as scheduler_rpcapi
from cinder import version
from cinder.volume import rpcapi as volume_rpcapi
LOG = logging.getLogger(__name__)
@ -116,12 +120,14 @@ class Service(service.Service):
def __init__(self, host, binary, topic, manager, report_interval=None,
periodic_interval=None, periodic_fuzzy_delay=None,
service_name=None, coordination=False, *args, **kwargs):
service_name=None, coordination=False, cluster=None, *args,
**kwargs):
super(Service, self).__init__()
if not rpc.initialized():
rpc.init(CONF)
self.cluster = cluster
self.host = host
self.binary = binary
self.topic = topic
@ -133,21 +139,61 @@ class Service(service.Service):
# NOTE(geguileo): We need to create the Service DB entry before we
# create the manager, otherwise capped versions for serializer and rpc
# client would used existing DB entries not including us, which could
# client would use existing DB entries not including us, which could
# result in us using None (if it's the first time the service is run)
# or an old version (if this is a normal upgrade of a single service).
ctxt = context.get_admin_context()
self.is_upgrading_to_n = self.is_svc_upgrading_to_n(binary)
try:
service_ref = objects.Service.get_by_args(ctxt, host, binary)
service_ref.rpc_current_version = manager_class.RPC_API_VERSION
obj_version = objects_base.OBJ_VERSIONS.get_current()
service_ref.object_current_version = obj_version
# TODO(geguileo): In O we can remove the service upgrading part on
# the next equation, because by then all our services will be
# properly setting the cluster during volume migrations since
# they'll have the new Volume ORM model. But until then we can
# only set the cluster in the DB and pass added_to_cluster to
# init_host when we have completed the rolling upgrade from M to N.
# added_to_cluster attribute marks when we consider that we have
# just added a host to a cluster so we can include resources into
# that cluster. We consider that we have added the host when we
# didn't have data in the cluster DB field and our current
# configuration has a cluster value. We don't want to do anything
# automatic if the cluster is changed; in those cases we'll want
# to use the cinder manage command and do it manually.
self.added_to_cluster = (not service_ref.cluster_name and cluster
and not self.is_upgrading_to_n)
# TODO(geguileo): In O - Remove self.is_upgrading_to_n part
if (service_ref.cluster_name != cluster and
not self.is_upgrading_to_n):
LOG.info(_LI('This service has been moved from cluster '
'%(cluster_svc)s to %(cluster_cfg)s. Resources '
'will %(opt_no)sbe moved to the new cluster'),
{'cluster_svc': service_ref.cluster_name,
'cluster_cfg': cluster,
'opt_no': '' if self.added_to_cluster else 'NO '})
if self.added_to_cluster:
# We copy the service's disabled status to the cluster if we
# have to create it.
self._ensure_cluster_exists(ctxt, service_ref.disabled)
service_ref.cluster_name = cluster
service_ref.save()
self.service_id = service_ref.id
except exception.NotFound:
# We don't want to include cluster information on the service or
# create the cluster entry if we are upgrading.
self._create_service_ref(ctxt, manager_class.RPC_API_VERSION)
# TODO(geguileo): In O set added_to_cluster to True
# We don't want to include resources in the cluster during the
# start while we are still doing the rolling upgrade.
self.added_to_cluster = not self.is_upgrading_to_n
self.manager = manager_class(host=self.host,
cluster=self.cluster,
service_name=service_name,
*args, **kwargs)
self.report_interval = report_interval
@ -159,6 +205,18 @@ class Service(service.Service):
setup_profiler(binary, host)
self.rpcserver = None
self.cluster_rpcserver = None
# TODO(geguileo): Remove method in O since it will no longer be used.
@staticmethod
def is_svc_upgrading_to_n(binary):
"""Given an RPC API class determine if the service is upgrading."""
rpcapis = {'cinder-scheduler': scheduler_rpcapi.SchedulerAPI,
'cinder-volume': volume_rpcapi.VolumeAPI,
'cinder-backup': backup_rpcapi.BackupAPI}
rpc_api = rpcapis[binary]
# If we are pinned to 1.3, then we are upgrading from M to N
return rpc_api.determine_obj_version_cap() == '1.3'
def start(self):
version_string = version.version_string()
@ -169,7 +227,7 @@ class Service(service.Service):
if self.coordination:
coordination.COORDINATOR.start()
self.manager.init_host()
self.manager.init_host(added_to_cluster=self.added_to_cluster)
LOG.debug("Creating RPC server for service %s", self.topic)
@ -180,6 +238,18 @@ class Service(service.Service):
self.rpcserver = rpc.get_server(target, endpoints, serializer)
self.rpcserver.start()
# TODO(geguileo): In O - Remove the is_svc_upgrading_to_n part
if self.cluster and not self.is_svc_upgrading_to_n(self.binary):
LOG.info(_LI('Starting %(topic)s cluster %(cluster)s (version '
'%(version)s)'),
{'topic': self.topic, 'version': version_string,
'cluster': self.cluster})
target = messaging.Target(topic=self.topic, server=self.cluster)
serializer = objects_base.CinderObjectSerializer()
self.cluster_rpcserver = rpc.get_server(target, endpoints,
serializer)
self.cluster_rpcserver.start()
self.manager.init_host_with_rpc()
if self.report_interval:
@ -218,6 +288,25 @@ class Service(service.Service):
'new_down_time': new_down_time})
CONF.set_override('service_down_time', new_down_time)
def _ensure_cluster_exists(self, context, disabled=None):
if self.cluster:
try:
objects.Cluster.get_by_id(context, None, name=self.cluster,
binary=self.binary)
except exception.ClusterNotFound:
cluster = objects.Cluster(context=context, name=self.cluster,
binary=self.binary)
# If disabled has been specified overwrite default value
if disabled is not None:
cluster.disabled = disabled
try:
cluster.create()
# Race condition occurred and another service created the
# cluster, so we can continue as it already exists.
except exception.ClusterExists:
pass
def _create_service_ref(self, context, rpc_version=None):
zone = CONF.storage_availability_zone
kwargs = {
@ -229,9 +318,16 @@ class Service(service.Service):
'rpc_current_version': rpc_version or self.manager.RPC_API_VERSION,
'object_current_version': objects_base.OBJ_VERSIONS.get_current(),
}
# TODO(geguileo): In O unconditionally set cluster_name like above
# If we are upgrading we have to ignore the cluster value
if not self.is_upgrading_to_n:
kwargs['cluster_name'] = self.cluster
service_ref = objects.Service(context=context, **kwargs)
service_ref.create()
self.service_id = service_ref.id
# TODO(geguileo): In O unconditionally ensure that the cluster exists
if not self.is_upgrading_to_n:
self._ensure_cluster_exists(context)
def __getattr__(self, key):
manager = self.__dict__.get('manager', None)
@ -241,7 +337,7 @@ class Service(service.Service):
def create(cls, host=None, binary=None, topic=None, manager=None,
report_interval=None, periodic_interval=None,
periodic_fuzzy_delay=None, service_name=None,
coordination=False):
coordination=False, cluster=None):
"""Instantiates class and passes back application object.
:param host: defaults to CONF.host
@ -251,6 +347,7 @@ class Service(service.Service):
:param report_interval: defaults to CONF.report_interval
:param periodic_interval: defaults to CONF.periodic_interval
:param periodic_fuzzy_delay: defaults to CONF.periodic_fuzzy_delay
:param cluster: Defaults to None, as only some services will have it
"""
if not host:
@ -273,7 +370,8 @@ class Service(service.Service):
periodic_interval=periodic_interval,
periodic_fuzzy_delay=periodic_fuzzy_delay,
service_name=service_name,
coordination=coordination)
coordination=coordination,
cluster=cluster)
return service_obj
@ -282,6 +380,8 @@ class Service(service.Service):
# errors, go ahead and ignore them.. as we're shutting down anyway
try:
self.rpcserver.stop()
if self.cluster_rpcserver:
self.cluster_rpcserver.stop()
except Exception:
pass
@ -309,6 +409,8 @@ class Service(service.Service):
pass
if self.rpcserver:
self.rpcserver.wait()
if self.cluster_rpcserver:
self.cluster_rpcserver.wait()
super(Service, self).wait()
def periodic_tasks(self, raise_on_error=False):
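
Condensing the comments in this file: a host counts as "just added" to a cluster only when its DB record had no cluster_name, the configuration now provides one, and no M-to-N rolling upgrade is in flight; only then does the service also listen on the cluster-scoped RPC topic. A hedged restatement as standalone functions (assumes oslo.messaging is installed; names and values are illustrative):

    import oslo_messaging as messaging


    def joined_cluster(db_cluster_name, cfg_cluster, upgrading_to_n):
        """True only when the host gains a cluster for the first time.

        A change from one cluster to another is deliberately not handled
        automatically; that case is left to the cinder manage command.
        """
        return bool(not db_cluster_name and cfg_cluster and not upgrading_to_n)


    def rpc_targets(topic, host, cluster, upgrading_to_n):
        """Targets the service listens on: always the host, plus the cluster."""
        targets = [messaging.Target(topic=topic, server=host)]
        if cluster and not upgrading_to_n:
            targets.append(messaging.Target(topic=topic, server=cluster))
        return targets


    # Rolling upgrade from M still in progress: nothing is clustered yet.
    assert not joined_cluster(None, 'ha-cluster-1@lvm-1', upgrading_to_n=True)
    # Upgrade finished and the option is set: resources get pulled in once.
    assert joined_cluster(None, 'ha-cluster-1@lvm-1', upgrading_to_n=False)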

View File

@ -346,7 +346,8 @@ class TestCinderVolumeCmd(test.TestCase):
monkey_patch.assert_called_once_with()
get_launcher.assert_called_once_with()
service_create.assert_called_once_with(binary='cinder-volume',
coordination=True)
coordination=True,
cluster=None)
launcher.launch_service.assert_called_once_with(server)
launcher.wait.assert_called_once_with()
@ -369,9 +370,11 @@ class TestCinderVolumeCmd(test.TestCase):
monkey_patch.assert_called_once_with()
get_launcher.assert_called_once_with()
c1 = mock.call(binary='cinder-volume', host='host@backend1',
service_name='backend1', coordination=True)
service_name='backend1', coordination=True,
cluster=None)
c2 = mock.call(binary='cinder-volume', host='host@backend2',
service_name='backend2', coordination=True)
service_name='backend2', coordination=True,
cluster=None)
service_create.assert_has_calls([c1, c2])
self.assertEqual(len(backends), launcher.launch_service.call_count)
launcher.wait.assert_called_once_with()

View File

@ -19,6 +19,7 @@
Unit Tests for remote procedure calls using queue
"""
import ddt
import mock
from oslo_concurrency import processutils
from oslo_config import cfg
@ -51,9 +52,10 @@ CONF.register_opts(test_service_opts)
class FakeManager(manager.Manager):
"""Fake manager for tests."""
def __init__(self, host=None,
db_driver=None, service_name=None):
db_driver=None, service_name=None, cluster=None):
super(FakeManager, self).__init__(host=host,
db_driver=db_driver)
db_driver=db_driver,
cluster=cluster)
def test_method(self):
return 'manager'
@ -67,7 +69,9 @@ class ExtendedService(service.Service):
class ServiceManagerTestCase(test.TestCase):
"""Test cases for Services."""
def test_message_gets_to_manager(self):
@mock.patch('cinder.service.Service.is_svc_upgrading_to_n',
return_value=False)
def test_message_gets_to_manager(self, is_upgrading_mock):
serv = service.Service('test',
'test',
'test',
@ -75,7 +79,9 @@ class ServiceManagerTestCase(test.TestCase):
serv.start()
self.assertEqual('manager', serv.test_method())
def test_override_manager_method(self):
@mock.patch('cinder.service.Service.is_svc_upgrading_to_n',
return_value=False)
def test_override_manager_method(self, is_upgrading_mock):
serv = ExtendedService('test',
'test',
'test',
@ -83,9 +89,11 @@ class ServiceManagerTestCase(test.TestCase):
serv.start()
self.assertEqual('service', serv.test_method())
@mock.patch('cinder.service.Service.is_svc_upgrading_to_n',
return_value=False)
@mock.patch('cinder.rpc.LAST_OBJ_VERSIONS', {'test': '1.5'})
@mock.patch('cinder.rpc.LAST_RPC_VERSIONS', {'test': '1.3'})
def test_reset(self):
def test_reset(self, is_upgrading_mock):
serv = service.Service('test',
'test',
'test',
@ -97,29 +105,45 @@ class ServiceManagerTestCase(test.TestCase):
class ServiceFlagsTestCase(test.TestCase):
def test_service_enabled_on_create_based_on_flag(self):
@mock.patch('cinder.service.Service.is_svc_upgrading_to_n',
return_value=False)
def test_service_enabled_on_create_based_on_flag(self,
is_upgrading_mock=False):
ctxt = context.get_admin_context()
self.flags(enable_new_services=True)
host = 'foo'
binary = 'cinder-fake'
app = service.Service.create(host=host, binary=binary)
app.start()
app.stop()
ref = db.service_get(context.get_admin_context(), app.service_id)
db.service_destroy(context.get_admin_context(), app.service_id)
self.assertFalse(ref['disabled'])
cluster = 'cluster'
app = service.Service.create(host=host, binary=binary, cluster=cluster)
ref = db.service_get(ctxt, app.service_id)
db.service_destroy(ctxt, app.service_id)
self.assertFalse(ref.disabled)
def test_service_disabled_on_create_based_on_flag(self):
# Check that the cluster is also enabled
db_cluster = objects.ClusterList.get_all(ctxt)[0]
self.assertFalse(db_cluster.disabled)
db.cluster_destroy(ctxt, db_cluster.id)
@mock.patch('cinder.service.Service.is_svc_upgrading_to_n',
return_value=False)
def test_service_disabled_on_create_based_on_flag(self, is_upgrading_mock):
ctxt = context.get_admin_context()
self.flags(enable_new_services=False)
host = 'foo'
binary = 'cinder-fake'
app = service.Service.create(host=host, binary=binary)
app.start()
app.stop()
ref = db.service_get(context.get_admin_context(), app.service_id)
db.service_destroy(context.get_admin_context(), app.service_id)
self.assertTrue(ref['disabled'])
cluster = 'cluster'
app = service.Service.create(host=host, binary=binary, cluster=cluster)
ref = db.service_get(ctxt, app.service_id)
db.service_destroy(ctxt, app.service_id)
self.assertTrue(ref.disabled)
# Check that the cluster is also disabled
db_cluster = objects.ClusterList.get_all(ctxt)[0]
self.assertTrue(db_cluster.disabled)
db.cluster_destroy(ctxt, db_cluster.id)
@ddt.ddt
class ServiceTestCase(test.TestCase):
"""Test cases for Services."""
@ -134,24 +158,113 @@ class ServiceTestCase(test.TestCase):
'report_count': 0,
'availability_zone': 'nova',
'id': 1}
self.ctxt = context.get_admin_context()
def _check_app(self, app, cluster=None, cluster_exists=None,
is_upgrading=False, svc_id=None, added_to_cluster=None):
"""Check that Service instance and DB service and cluster are ok."""
self.assertIsNotNone(app)
# Check that we have the service ID
self.assertTrue(hasattr(app, 'service_id'))
if svc_id:
self.assertEqual(svc_id, app.service_id)
# Check that cluster has been properly set
self.assertEqual(cluster, app.cluster)
# Check that the entry has been really created in the DB
svc = objects.Service.get_by_id(self.ctxt, app.service_id)
cluster_name = cluster if cluster_exists is not False else None
# Check that cluster name matches
self.assertEqual(cluster_name, svc.cluster_name)
clusters = objects.ClusterList.get_all(self.ctxt)
if added_to_cluster is None:
added_to_cluster = not is_upgrading
if cluster_name:
# Make sure we have created the cluster in the DB
self.assertEqual(1, len(clusters))
cluster = clusters[0]
self.assertEqual(cluster_name, cluster.name)
self.assertEqual(self.binary, cluster.binary)
else:
# Make sure we haven't created any cluster in the DB
self.assertListEqual([], clusters.objects)
self.assertEqual(added_to_cluster, app.added_to_cluster)
@ddt.data(False, True)
@mock.patch('cinder.service.Service.is_svc_upgrading_to_n')
def test_create(self, is_upgrading, is_upgrading_mock):
"""Test non clustered service creation."""
is_upgrading_mock.return_value = is_upgrading
def test_create(self):
# NOTE(vish): Create was moved out of mock replay to make sure that
# the looping calls are created in StartService.
app = service.Service.create(host=self.host,
binary=self.binary,
topic=self.topic)
self._check_app(app, is_upgrading=is_upgrading)
self.assertIsNotNone(app)
@mock.patch('cinder.service.Service.is_svc_upgrading_to_n',
return_value=False)
def test_create_with_cluster_not_upgrading(self, is_upgrading_mock):
"""Test DB cluster creation when service is created."""
cluster_name = 'cluster'
app = service.Service.create(host=self.host, binary=self.binary,
cluster=cluster_name, topic=self.topic)
self._check_app(app, cluster_name)
# Check that we have the service ID
self.assertTrue(hasattr(app, 'service_id'))
# Check that the entry has been really created in the DB
objects.Service.get_by_id(context.get_admin_context(), app.service_id)
@mock.patch('cinder.service.Service.is_svc_upgrading_to_n',
return_value=True)
def test_create_with_cluster_upgrading(self, is_upgrading_mock):
"""Test that we don't create the cluster while we are upgrading."""
cluster_name = 'cluster'
app = service.Service.create(host=self.host, binary=self.binary,
cluster=cluster_name, topic=self.topic)
self._check_app(app, cluster_name, cluster_exists=False,
is_upgrading=True)
@mock.patch('cinder.service.Service.is_svc_upgrading_to_n',
return_value=False)
def test_create_svc_exists_upgrade_cluster(self, is_upgrading_mock):
"""Test that we update cluster_name field when cfg has changed."""
# Create the service in the DB
db_svc = db.service_create(context.get_admin_context(),
{'host': self.host, 'binary': self.binary,
'topic': self.topic,
'cluster_name': None})
cluster_name = 'cluster'
app = service.Service.create(host=self.host, binary=self.binary,
cluster=cluster_name, topic=self.topic)
self._check_app(app, cluster_name, svc_id=db_svc.id)
@mock.patch('cinder.service.Service.is_svc_upgrading_to_n',
return_value=True)
def test_create_svc_exists_not_upgrade_cluster(self, is_upgrading_mock):
"""Test we don't update cluster_name on cfg change when upgrading."""
# Create the service in the DB
db_svc = db.service_create(context.get_admin_context(),
{'host': self.host, 'binary': self.binary,
'topic': self.topic,
'cluster_name': None})
cluster_name = 'cluster'
app = service.Service.create(host=self.host, binary=self.binary,
cluster=cluster_name, topic=self.topic)
self._check_app(app, cluster_name, cluster_exists=False,
is_upgrading=True, svc_id=db_svc.id)
@mock.patch('cinder.service.Service.is_svc_upgrading_to_n',
return_value=False)
@mock.patch.object(objects.service.Service, 'get_by_args')
@mock.patch.object(objects.service.Service, 'get_by_id')
def test_report_state_newly_disconnected(self, get_by_id, get_by_args):
def test_report_state_newly_disconnected(self, get_by_id, get_by_args,
is_upgrading_mock):
get_by_args.side_effect = exception.NotFound()
get_by_id.side_effect = db_exc.DBConnectionError()
with mock.patch.object(objects.service, 'db') as mock_db:
@ -168,9 +281,12 @@ class ServiceTestCase(test.TestCase):
self.assertTrue(serv.model_disconnected)
self.assertFalse(mock_db.service_update.called)
@mock.patch('cinder.service.Service.is_svc_upgrading_to_n',
return_value=False)
@mock.patch.object(objects.service.Service, 'get_by_args')
@mock.patch.object(objects.service.Service, 'get_by_id')
def test_report_state_disconnected_DBError(self, get_by_id, get_by_args):
def test_report_state_disconnected_DBError(self, get_by_id, get_by_args,
is_upgrading_mock):
get_by_args.side_effect = exception.NotFound()
get_by_id.side_effect = db_exc.DBError()
with mock.patch.object(objects.service, 'db') as mock_db:
@ -187,9 +303,12 @@ class ServiceTestCase(test.TestCase):
self.assertTrue(serv.model_disconnected)
self.assertFalse(mock_db.service_update.called)
@mock.patch('cinder.service.Service.is_svc_upgrading_to_n',
return_value=False)
@mock.patch('cinder.db.sqlalchemy.api.service_update')
@mock.patch('cinder.db.sqlalchemy.api.service_get')
def test_report_state_newly_connected(self, get_by_id, service_update):
def test_report_state_newly_connected(self, get_by_id, service_update,
is_upgrading_mock):
get_by_id.return_value = self.service_ref
serv = service.Service(
@ -205,7 +324,9 @@ class ServiceTestCase(test.TestCase):
self.assertFalse(serv.model_disconnected)
self.assertTrue(service_update.called)
def test_report_state_manager_not_working(self):
@mock.patch('cinder.service.Service.is_svc_upgrading_to_n',
return_value=False)
def test_report_state_manager_not_working(self, is_upgrading_mock):
with mock.patch('cinder.db') as mock_db:
mock_db.service_get.return_value = self.service_ref
@ -222,7 +343,9 @@ class ServiceTestCase(test.TestCase):
serv.manager.is_working.assert_called_once_with()
self.assertFalse(mock_db.service_update.called)
def test_service_with_long_report_interval(self):
@mock.patch('cinder.service.Service.is_svc_upgrading_to_n',
return_value=False)
def test_service_with_long_report_interval(self, is_upgrading_mock):
self.override_config('service_down_time', 10)
self.override_config('report_interval', 10)
service.Service.create(
@ -230,9 +353,12 @@ class ServiceTestCase(test.TestCase):
manager="cinder.tests.unit.test_service.FakeManager")
self.assertEqual(25, CONF.service_down_time)
@mock.patch('cinder.service.Service.is_svc_upgrading_to_n',
return_value=False)
@mock.patch.object(rpc, 'get_server')
@mock.patch('cinder.db')
def test_service_stop_waits_for_rpcserver(self, mock_db, mock_rpc):
def test_service_stop_waits_for_rpcserver(self, mock_db, mock_rpc,
is_upgrading_mock):
serv = service.Service(
self.host,
self.binary,
@ -246,6 +372,8 @@ class ServiceTestCase(test.TestCase):
serv.rpcserver.stop.assert_called_once_with()
serv.rpcserver.wait.assert_called_once_with()
@mock.patch('cinder.service.Service.is_svc_upgrading_to_n',
return_value=False)
@mock.patch('cinder.service.Service.report_state')
@mock.patch('cinder.service.Service.periodic_tasks')
@mock.patch.object(service.loopingcall, 'FixedIntervalLoopingCall')
@ -253,7 +381,7 @@ class ServiceTestCase(test.TestCase):
@mock.patch('cinder.db')
def test_service_stop_waits_for_timers(self, mock_db, mock_rpc,
mock_loopcall, mock_periodic,
mock_report):
mock_report, is_upgrading_mock):
"""Test that we wait for loopcalls only if stop succeeds."""
serv = service.Service(
self.host,
@ -287,6 +415,61 @@ class ServiceTestCase(test.TestCase):
self.assertEqual(1, serv.timers[1].stop.call_count)
self.assertEqual(1, serv.timers[1].wait.call_count)
@mock.patch('cinder.manager.Manager.init_host')
@mock.patch.object(service.loopingcall, 'FixedIntervalLoopingCall')
@mock.patch('oslo_messaging.Target')
@mock.patch.object(rpc, 'get_server')
def _check_rpc_servers_and_init_host(self, app, added_to_cluster, cluster,
rpc_mock, target_mock, loop_mock,
init_host_mock):
app.start()
# init_host must be called with the added_to_cluster value determined
# when the service entry was created.
init_host_mock.assert_called_once_with(
added_to_cluster=added_to_cluster)
expected_target_calls = [mock.call(topic=self.topic, server=self.host)]
expected_rpc_calls = [mock.call(target_mock.return_value, mock.ANY,
mock.ANY),
mock.call().start()]
if cluster and added_to_cluster:
self.assertIsNotNone(app.cluster_rpcserver)
expected_target_calls.append(mock.call(topic=self.topic,
server=cluster))
expected_rpc_calls.extend(expected_rpc_calls[:])
# Check that we create message targets for host and cluster
target_mock.assert_has_calls(expected_target_calls)
# Check we get and start rpc services for host and cluster
rpc_mock.assert_has_calls(expected_rpc_calls)
self.assertIsNotNone(app.rpcserver)
app.stop()
@mock.patch('cinder.objects.Service.get_minimum_obj_version',
return_value='1.6')
def test_start_rpc_and_init_host_no_cluster(self, get_min_obj_mock):
"""Test that without cluster we don't create rpc service."""
app = service.Service.create(host=self.host, binary='cinder-volume',
cluster=None, topic=self.topic)
self._check_rpc_servers_and_init_host(app, True, None)
@ddt.data('1.3', '1.8')
@mock.patch('cinder.objects.Service.get_minimum_obj_version')
def test_start_rpc_and_init_host_cluster(self, obj_version,
get_min_obj_mock):
"""Test that with cluster we create the rpc service."""
get_min_obj_mock.return_value = obj_version
cluster = 'cluster'
app = service.Service.create(host=self.host, binary='cinder-volume',
cluster=cluster, topic=self.topic)
self._check_rpc_servers_and_init_host(app, obj_version != '1.3',
cluster)
class TestWSGIService(test.TestCase):

View File

@ -474,6 +474,27 @@ class VolumeTestCase(BaseVolumeTestCase):
self.volume.delete_volume(self.context, vol0.id)
self.volume.delete_volume(self.context, vol1.id)
@mock.patch('cinder.volume.manager.VolumeManager.'
'_include_resources_in_cluster')
def test_init_host_cluster_not_changed(self, include_in_cluster_mock):
self.volume.init_host(False)
include_in_cluster_mock.assert_not_called()
@mock.patch('cinder.objects.volume.VolumeList.include_in_cluster')
@mock.patch('cinder.objects.consistencygroup.ConsistencyGroupList.'
'include_in_cluster')
def test_init_host_added_to_cluster(self, vol_include_mock,
cg_include_mock):
self.mock_object(self.volume, 'cluster', mock.sentinel.cluster)
self.volume.init_host(True)
vol_include_mock.assert_called_once_with(mock.ANY,
mock.sentinel.cluster,
host=self.volume.host)
cg_include_mock.assert_called_once_with(mock.ANY,
mock.sentinel.cluster,
host=self.volume.host)
@mock.patch('cinder.objects.service.Service.get_minimum_rpc_version')
@mock.patch('cinder.objects.service.Service.get_minimum_obj_version')
@mock.patch('cinder.rpc.LAST_RPC_VERSIONS', {'cinder-scheduler': '1.3'})

View File

@ -355,10 +355,30 @@ class VolumeManager(manager.SchedulerDependentManager):
update['id'],
{'provider_id': update['provider_id']})
def init_host(self):
def _include_resources_in_cluster(self, ctxt):
LOG.info(_LI('Including all resources from host %(host)s in cluster '
'%(cluster)s.'),
{'host': self.host, 'cluster': self.cluster})
num_vols = objects.VolumeList.include_in_cluster(
ctxt, self.cluster, host=self.host)
num_cgs = objects.ConsistencyGroupList.include_in_cluster(
ctxt, self.cluster, host=self.host)
LOG.info(_LI('%(num_vols)s volumes and %(num_cgs)s consistency groups '
'from host %(host)s have been included in cluster '
'%(cluster)s.'),
{'num_vols': num_vols, 'num_cgs': num_cgs,
'host': self.host, 'cluster': self.cluster})
def init_host(self, added_to_cluster=None):
"""Perform any required initialization."""
ctxt = context.get_admin_context()
# If we have just added this host to a cluster we have to include all
# our resources in that cluster.
if added_to_cluster:
self._include_resources_in_cluster(ctxt)
LOG.info(_LI("Starting volume driver %(driver_name)s (%(version)s)"),
{'driver_name': self.driver.__class__.__name__,
'version': self.driver.get_version()})
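
The include_in_cluster calls above effectively stamp the configured cluster on every volume and consistency group the host currently owns and report how many records were touched. A rough sketch of that semantic over plain dictionaries (illustrative only; the real calls are DB updates on the corresponding tables):

    def include_in_cluster(resources, cluster, host):
        """Set cluster_name on every resource that belongs to ``host``.

        Returns how many resources were updated, which is what the manager
        logs for volumes and consistency groups.
        """
        updated = 0
        for resource in resources:
            if resource['host'] == host and not resource.get('cluster_name'):
                resource['cluster_name'] = cluster
                updated += 1
        return updated


    volumes = [{'id': 'vol-1', 'host': 'node1@lvm-1'},
               {'id': 'vol-2', 'host': 'node2@lvm-1'}]
    assert include_in_cluster(volumes, 'ha-cluster-1@lvm-1', 'node1@lvm-1') == 1
    assert volumes[0]['cluster_name'] == 'ha-cluster-1@lvm-1'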

View File

@ -0,0 +1,12 @@
---
prelude: >
Everything in Cinder's release notes related to the High Availability
Active-Active effort (prefixed with "HA A-A:") is work in progress and
should not be used in production until it has been completed and the
appropriate release note has been issued stating its readiness for
production.
features:
- "HA A-A: Add cluster configuration option to allow grouping hosts that
share the same backend configurations and should work in Active-Active
fashion."