diff --git a/cinder/api/common.py b/cinder/api/common.py index 09e2e8ed2f3..307ebdda4df 100644 --- a/cinder/api/common.py +++ b/cinder/api/common.py @@ -372,8 +372,23 @@ class ViewBuilder(object): return urllib.parse.urlunsplit(url_parts).rstrip('/') -def get_cluster_host(req, params, cluster_version): - if req.api_version_request.matches(cluster_version): +def get_cluster_host(req, params, cluster_version=None): + """Get cluster and host from the parameters. + + This method checks the presence of cluster and host parameters and returns + them depending on the cluster_version. + + If cluster_version is False we will never return the cluster_name and we + will require the presence of the host parameter. + + If cluster_version is None we will always check for the presence of the + cluster parameter, and if cluster_version is a string with a version we + will only check for the presence of the parameter if the version of the + request is not less than it. In both cases we will require one and only + one parameter, host or cluster. + """ + if (cluster_version is not False and + req.api_version_request.matches(cluster_version)): cluster_name = params.get('cluster') msg = _('One and only one of cluster and host must be set.') else: diff --git a/cinder/api/contrib/services.py b/cinder/api/contrib/services.py index c556a930c2e..fbe13d822ee 100644 --- a/cinder/api/contrib/services.py +++ b/cinder/api/contrib/services.py @@ -20,6 +20,7 @@ from oslo_log import versionutils from oslo_utils import timeutils import webob.exc +from cinder.api import common from cinder.api import extensions from cinder.api.openstack import wsgi from cinder import exception @@ -110,20 +111,23 @@ class ServiceController(wsgi.Controller): return True - def _freeze(self, context, host): - return self.volume_api.freeze_host(context, host) + def _freeze(self, context, req, body): + cluster_name, host = common.get_cluster_host(req, body, '3.26') + return self.volume_api.freeze_host(context, host, cluster_name) - def _thaw(self, context, host): - return self.volume_api.thaw_host(context, host) + def _thaw(self, context, req, body): + cluster_name, host = common.get_cluster_host(req, body, '3.26') + return self.volume_api.thaw_host(context, host, cluster_name) - def _failover(self, context, host, backend_id=None): - return self.volume_api.failover_host(context, host, backend_id) - - def _get_host(self, body): - try: - return body['host'] - except (TypeError, KeyError): - raise exception.MissingRequired(element='host') + def _failover(self, context, req, body, clustered): + # We set version to None to always get the cluster name from the body, + # to False when we don't want to get it, and '3.26' when we only want + # it if the requested version is 3.26 or higher. + version = '3.26' if clustered else False + cluster_name, host = common.get_cluster_host(req, body, version) + self.volume_api.failover(context, host, cluster_name, + body.get('backend_id')) + return webob.Response(status_int=202) def update(self, req, id, body): """Enable/Disable scheduling for a service. @@ -148,20 +152,17 @@ class ServiceController(wsgi.Controller): disabled = True status = "disabled" elif id == "freeze": - return self._freeze(context, self._get_host(body)) + return self._freeze(context, req, body) elif id == "thaw": - return self._thaw(context, self._get_host(body)) + return self._thaw(context, req, body) elif id == "failover_host": - self._failover( - context, - self._get_host(body), - body.get('backend_id', None) - ) - return webob.Response(status_int=202) + return self._failover(context, req, body, False) + elif req.api_version_request.matches('3.26') and id == 'failover': + return self._failover(context, req, body, True) else: raise exception.InvalidInput(reason=_("Unknown action")) - host = self._get_host(body) + host = common.get_cluster_host(req, body, False)[1] ret_val['disabled'] = disabled if id == "disable-log-reason" and ext_loaded: diff --git a/cinder/api/openstack/api_version_request.py b/cinder/api/openstack/api_version_request.py index 667a7d3dc48..164a6625a41 100644 --- a/cinder/api/openstack/api_version_request.py +++ b/cinder/api/openstack/api_version_request.py @@ -75,6 +75,8 @@ REST_API_VERSION_HISTORY = """ * 3.23 - Allow passing force parameter to volume delete. * 3.24 - Add workers/cleanup endpoint. * 3.25 - Add ``volumes`` field to group list/detail and group show. + * 3.26 - Add failover action and cluster listings accept new filters and + return new data. """ # The minimum and maximum versions of the API supported @@ -82,7 +84,7 @@ REST_API_VERSION_HISTORY = """ # minimum version of the API supported. # Explicitly using /v1 or /v2 enpoints will still work _MIN_API_VERSION = "3.0" -_MAX_API_VERSION = "3.25" +_MAX_API_VERSION = "3.26" _LEGACY_API_VERSION1 = "1.0" _LEGACY_API_VERSION2 = "2.0" diff --git a/cinder/api/openstack/rest_api_version_history.rst b/cinder/api/openstack/rest_api_version_history.rst index f0380aab9d1..b343cdb0fb1 100644 --- a/cinder/api/openstack/rest_api_version_history.rst +++ b/cinder/api/openstack/rest_api_version_history.rst @@ -268,3 +268,15 @@ user documentation. 3.25 ---- Add ``volumes`` field to group list/detail and group show. + +3.26 +---- + - New ``failover`` action equivalent to ``failover_host``, but accepting + ``cluster`` parameter as well as the ``host`` cluster that + ``failover_host`` accepts. + + - ``freeze`` and ``thaw`` actions accept ``cluster`` parameter. + + - Cluster listing accepts ``replication_status``, ``frozen`` and + ``active_backend_id`` as filters, and returns additional fields for each + cluster: ``replication_status``, ``frozen``, ``active_backend_id``. diff --git a/cinder/api/v3/clusters.py b/cinder/api/v3/clusters.py index 0048cf25a0c..120f1e6a106 100644 --- a/cinder/api/v3/clusters.py +++ b/cinder/api/v3/clusters.py @@ -22,11 +22,14 @@ from cinder import utils CLUSTER_MICRO_VERSION = '3.7' +REPLICATION_DATA_MICRO_VERSION = '3.26' class ClusterController(wsgi.Controller): allowed_list_keys = {'name', 'binary', 'is_up', 'disabled', 'num_hosts', - 'num_down_hosts', 'binary'} + 'num_down_hosts', 'binary', 'replication_status', + 'frozen', 'active_backend_id'} + replication_fields = {'replication_status', 'frozen', 'active_backend_id'} policy_checker = wsgi.Controller.get_policy_checker('clusters') @@ -38,7 +41,9 @@ class ClusterController(wsgi.Controller): # Let the wsgi middleware convert NotFound exceptions cluster = objects.Cluster.get_by_id(context, None, binary=binary, name=id, services_summary=True) - return clusters_view.ViewBuilder.detail(cluster) + replication_data = req.api_version_request.matches( + REPLICATION_DATA_MICRO_VERSION) + return clusters_view.ViewBuilder.detail(cluster, replication_data) @wsgi.Controller.api_version(CLUSTER_MICRO_VERSION) def index(self, req): @@ -59,9 +64,12 @@ class ClusterController(wsgi.Controller): def _get_clusters(self, req, detail): # Let the wsgi middleware convert NotAuthorized exceptions context = self.policy_checker(req, 'get_all') - + replication_data = req.api_version_request.matches( + REPLICATION_DATA_MICRO_VERSION) filters = dict(req.GET) allowed = self.allowed_list_keys + if not replication_data: + allowed = allowed.difference(self.replication_fields) # Check filters are valid if not allowed.issuperset(filters): @@ -78,7 +86,8 @@ class ClusterController(wsgi.Controller): filters['services_summary'] = detail clusters = objects.ClusterList.get_all(context, **filters) - return clusters_view.ViewBuilder.list(clusters, detail) + return clusters_view.ViewBuilder.list(clusters, detail, + replication_data) @wsgi.Controller.api_version(CLUSTER_MICRO_VERSION) def update(self, req, id, body): @@ -113,7 +122,9 @@ class ClusterController(wsgi.Controller): cluster.save() # We return summary data plus the disabled reason - ret_val = clusters_view.ViewBuilder.summary(cluster) + replication_data = req.api_version_request.matches( + REPLICATION_DATA_MICRO_VERSION) + ret_val = clusters_view.ViewBuilder.summary(cluster, replication_data) ret_val['cluster']['disabled_reason'] = disabled_reason return ret_val diff --git a/cinder/api/v3/views/clusters.py b/cinder/api/v3/views/clusters.py index 2391cc176ed..3cabf61cf4a 100644 --- a/cinder/api/v3/views/clusters.py +++ b/cinder/api/v3/views/clusters.py @@ -28,7 +28,7 @@ class ViewBuilder(object): return '' @classmethod - def detail(cls, cluster, flat=False): + def detail(cls, cluster, replication_data=False, flat=False): """Detailed view of a cluster.""" result = cls.summary(cluster, flat=True) result.update( @@ -37,27 +37,36 @@ class ViewBuilder(object): last_heartbeat=cls._normalize(cluster.last_heartbeat), created_at=cls._normalize(cluster.created_at), updated_at=cls._normalize(cluster.updated_at), - disabled_reason=cluster.disabled_reason + disabled_reason=cluster.disabled_reason, + replication_status=cluster.replication_status, + frozen=cluster.frozen, + active_backend_id=cluster.active_backend_id, ) - + if not replication_data: + for field in ('replication_status', 'frozen', 'active_backend_id'): + del result[field] if flat: return result return {'cluster': result} @staticmethod - def summary(cluster, flat=False): + def summary(cluster, replication_data=False, flat=False): """Generic, non-detailed view of a cluster.""" result = { 'name': cluster.name, 'binary': cluster.binary, 'state': 'up' if cluster.is_up else 'down', 'status': 'disabled' if cluster.disabled else 'enabled', + 'replication_status': cluster.replication_status, } + if not replication_data: + del result['replication_status'] if flat: return result return {'cluster': result} @classmethod - def list(cls, clusters, detail=False): + def list(cls, clusters, detail=False, replication_data=False): func = cls.detail if detail else cls.summary - return {'clusters': [func(n, flat=True) for n in clusters]} + return {'clusters': [func(n, replication_data, flat=True) + for n in clusters]} diff --git a/cinder/db/sqlalchemy/migrate_repo/versions/088_add_replication_info_to_cluster.py b/cinder/db/sqlalchemy/migrate_repo/versions/088_add_replication_info_to_cluster.py new file mode 100644 index 00000000000..0e8ea25aa20 --- /dev/null +++ b/cinder/db/sqlalchemy/migrate_repo/versions/088_add_replication_info_to_cluster.py @@ -0,0 +1,33 @@ +# Copyright (c) 2016 Red Hat, Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from sqlalchemy import Boolean, Column, MetaData, String, Table, text + + +def upgrade(migrate_engine): + """Add replication info to clusters table.""" + meta = MetaData() + meta.bind = migrate_engine + + clusters = Table('clusters', meta, autoload=True) + replication_status = Column('replication_status', String(length=36), + default="not-capable") + active_backend_id = Column('active_backend_id', String(length=255)) + frozen = Column('frozen', Boolean, nullable=False, default=False, + server_default=text('false')) + + clusters.create_column(replication_status) + clusters.create_column(frozen) + clusters.create_column(active_backend_id) diff --git a/cinder/db/sqlalchemy/models.py b/cinder/db/sqlalchemy/models.py index cbed9e183b1..a9556e109c7 100644 --- a/cinder/db/sqlalchemy/models.py +++ b/cinder/db/sqlalchemy/models.py @@ -122,6 +122,10 @@ class Cluster(BASE, CinderBase): disabled_reason = Column(String(255)) race_preventer = Column(Integer, nullable=False, default=0) + replication_status = Column(String(36), default="not-capable") + active_backend_id = Column(String(255)) + frozen = Column(Boolean, nullable=False, default=False) + # Last heartbeat reported by any of the services of this cluster. This is # not deferred since we always want to load this field. last_heartbeat = column_property( diff --git a/cinder/exception.py b/cinder/exception.py index 59656b96bcf..7a16a0980a3 100644 --- a/cinder/exception.py +++ b/cinder/exception.py @@ -422,12 +422,13 @@ class ImageNotFound(NotFound): class ServiceNotFound(NotFound): def __init__(self, message=None, **kwargs): - if kwargs.get('host', None): - self.message = _("Service %(service_id)s could not be " - "found on host %(host)s.") - else: - self.message = _("Service %(service_id)s could not be found.") - super(ServiceNotFound, self).__init__(None, **kwargs) + if not message: + if kwargs.get('host', None): + self.message = _("Service %(service_id)s could not be " + "found on host %(host)s.") + else: + self.message = _("Service %(service_id)s could not be found.") + super(ServiceNotFound, self).__init__(message, **kwargs) class ServiceTooOld(Invalid): diff --git a/cinder/objects/base.py b/cinder/objects/base.py index 478b4434a95..1c22469cc49 100644 --- a/cinder/objects/base.py +++ b/cinder/objects/base.py @@ -124,6 +124,7 @@ OBJ_VERSIONS.add('1.16', {'BackupDeviceInfo': '1.0'}) OBJ_VERSIONS.add('1.17', {'VolumeAttachment': '1.1'}) OBJ_VERSIONS.add('1.18', {'Snapshot': '1.3'}) OBJ_VERSIONS.add('1.19', {'ConsistencyGroup': '1.4', 'CGSnapshot': '1.1'}) +OBJ_VERSIONS.add('1.20', {'Cluster': '1.1'}) class CinderObjectRegistry(base.VersionedObjectRegistry): diff --git a/cinder/objects/cluster.py b/cinder/objects/cluster.py index 6b5842a982f..c72598efd9d 100644 --- a/cinder/objects/cluster.py +++ b/cinder/objects/cluster.py @@ -20,6 +20,7 @@ from cinder import exception from cinder.i18n import _ from cinder import objects from cinder.objects import base +from cinder.objects import fields as c_fields from cinder import utils @@ -37,7 +38,8 @@ class Cluster(base.CinderPersistentObject, base.CinderObject, - Any other cluster field will be used as a filter. """ # Version 1.0: Initial version - VERSION = '1.0' + # Version 1.1: Add replication fields + VERSION = '1.1' OPTIONAL_FIELDS = ('num_hosts', 'num_down_hosts', 'services') # NOTE(geguileo): We don't want to expose race_preventer field at the OVO @@ -54,8 +56,24 @@ class Cluster(base.CinderPersistentObject, base.CinderObject, 'last_heartbeat': fields.DateTimeField(nullable=True, read_only=True), 'services': fields.ObjectField('ServiceList', nullable=True, read_only=True), + # Replication properties + 'replication_status': c_fields.ReplicationStatusField(nullable=True), + 'frozen': fields.BooleanField(default=False), + 'active_backend_id': fields.StringField(nullable=True), } + def obj_make_compatible(self, primitive, target_version): + """Make a cluster representation compatible with a target version.""" + # Convert all related objects + super(Cluster, self).obj_make_compatible(primitive, target_version) + + # Before v1.1 we didn't have relication fields so we have to remove + # them. + if target_version == '1.0': + for obj_field in ('replication_status', 'frozen', + 'active_backend_id'): + primitive.pop(obj_field, None) + @classmethod def _get_expected_attrs(cls, context, *args, **kwargs): """Return expected attributes when getting a cluster. diff --git a/cinder/service.py b/cinder/service.py index e2bc28d1c73..39f061a9227 100644 --- a/cinder/service.py +++ b/cinder/service.py @@ -44,6 +44,7 @@ from cinder import exception from cinder.i18n import _, _LE, _LI, _LW from cinder import objects from cinder.objects import base as objects_base +from cinder.objects import fields from cinder import rpc from cinder.scheduler import rpcapi as scheduler_rpcapi from cinder import version @@ -187,7 +188,7 @@ class Service(service.Service): if self.added_to_cluster: # We pass copy service's disable status in the cluster if we # have to create it. - self._ensure_cluster_exists(ctxt, service_ref.disabled) + self._ensure_cluster_exists(ctxt, service_ref) service_ref.cluster_name = cluster service_ref.save() Service.service_id = service_ref.id @@ -272,7 +273,9 @@ class Service(service.Service): '%(version)s)'), {'topic': self.topic, 'version': version_string, 'cluster': self.cluster}) - target = messaging.Target(topic=self.topic, server=self.cluster) + target = messaging.Target( + topic='%s.%s' % (self.topic, self.cluster), + server=vol_utils.extract_host(self.cluster, 'host')) serializer = objects_base.CinderObjectSerializer(obj_version_cap) self.cluster_rpcserver = rpc.get_server(target, endpoints, serializer) @@ -316,17 +319,34 @@ class Service(service.Service): 'new_down_time': new_down_time}) CONF.set_override('service_down_time', new_down_time) - def _ensure_cluster_exists(self, context, disabled=None): + def _ensure_cluster_exists(self, context, service): if self.cluster: try: - objects.Cluster.get_by_id(context, None, name=self.cluster, - binary=self.binary) + cluster = objects.Cluster.get_by_id(context, None, + name=self.cluster, + binary=self.binary) + # If the cluster already exists, then the service replication + # fields must match those of the cluster unless the service + # is in error status. + error_states = (fields.ReplicationStatus.ERROR, + fields.ReplicationStatus.FAILOVER_ERROR) + if service.replication_status not in error_states: + for attr in ('replication_status', 'active_backend_id', + 'frozen'): + if getattr(service, attr) != getattr(cluster, attr): + setattr(service, attr, getattr(cluster, attr)) + except exception.ClusterNotFound: - cluster = objects.Cluster(context=context, name=self.cluster, - binary=self.binary) - # If disabled has been specified overwrite default value - if disabled is not None: - cluster.disabled = disabled + # Since the cluster didn't exist, we copy replication fields + # from the service. + cluster = objects.Cluster( + context=context, + name=self.cluster, + binary=self.binary, + disabled=service.disabled, + replication_status=service.replication_status, + active_backend_id=service.active_backend_id, + frozen=service.frozen) try: cluster.create() @@ -355,7 +375,10 @@ class Service(service.Service): Service.service_id = service_ref.id # TODO(geguileo): In O unconditionally ensure that the cluster exists if not self.is_upgrading_to_n: - self._ensure_cluster_exists(context) + self._ensure_cluster_exists(context, service_ref) + # If we have updated the service_ref with replication data from + # the cluster it will be saved. + service_ref.save() def __getattr__(self, key): manager = self.__dict__.get('manager', None) diff --git a/cinder/tests/unit/api/contrib/test_services.py b/cinder/tests/unit/api/contrib/test_services.py index 64de68f099f..d5187c277f4 100644 --- a/cinder/tests/unit/api/contrib/test_services.py +++ b/cinder/tests/unit/api/contrib/test_services.py @@ -255,6 +255,36 @@ class ServicesTest(test.TestCase): ]} self.assertEqual(response, res_dict) + def test_failover_old_version(self): + req = FakeRequest(version='3.18') + self.assertRaises(exception.InvalidInput, self.controller.update, req, + 'failover', {'cluster': 'cluster1'}) + + def test_failover_no_values(self): + req = FakeRequest(version='3.26') + self.assertRaises(exception.InvalidInput, self.controller.update, req, + 'failover', {'backend_id': 'replica1'}) + + @ddt.data({'host': 'hostname'}, {'cluster': 'mycluster'}) + @mock.patch('cinder.volume.api.API.failover') + def test_failover(self, body, failover_mock): + req = FakeRequest(version='3.26') + body['backend_id'] = 'replica1' + res = self.controller.update(req, 'failover', body) + self.assertEqual(202, res.status_code) + failover_mock.assert_called_once_with(req.environ['cinder.context'], + body.get('host'), + body.get('cluster'), 'replica1') + + @ddt.data({}, {'host': 'hostname', 'cluster': 'mycluster'}) + @mock.patch('cinder.volume.api.API.failover') + def test_failover_invalid_input(self, body, failover_mock): + req = FakeRequest(version='3.26') + body['backend_id'] = 'replica1' + self.assertRaises(exception.InvalidInput, + self.controller.update, req, 'failover', body) + failover_mock.assert_not_called() + def test_services_list_with_cluster_name(self): req = FakeRequest(version='3.7') res_dict = self.controller.index(req) @@ -756,11 +786,12 @@ class ServicesTest(test.TestCase): req = fakes.HTTPRequest.blank(url) body = {'host': mock.sentinel.host, 'backend_id': mock.sentinel.backend_id} - with mock.patch.object(self.controller.volume_api, 'failover_host') \ + with mock.patch.object(self.controller.volume_api, 'failover') \ as failover_mock: res = self.controller.update(req, 'failover_host', body) failover_mock.assert_called_once_with(req.environ['cinder.context'], mock.sentinel.host, + None, mock.sentinel.backend_id) self.assertEqual(202, res.status_code) @@ -772,7 +803,7 @@ class ServicesTest(test.TestCase): as freeze_mock: res = self.controller.update(req, 'freeze', body) freeze_mock.assert_called_once_with(req.environ['cinder.context'], - mock.sentinel.host) + mock.sentinel.host, None) self.assertEqual(freeze_mock.return_value, res) def test_services_thaw(self): @@ -783,12 +814,12 @@ class ServicesTest(test.TestCase): as thaw_mock: res = self.controller.update(req, 'thaw', body) thaw_mock.assert_called_once_with(req.environ['cinder.context'], - mock.sentinel.host) + mock.sentinel.host, None) self.assertEqual(thaw_mock.return_value, res) @ddt.data('freeze', 'thaw', 'failover_host') def test_services_replication_calls_no_host(self, method): url = '/v2/%s/os-services/%s' % (fake.PROJECT_ID, method) req = fakes.HTTPRequest.blank(url) - self.assertRaises(exception.MissingRequired, + self.assertRaises(exception.InvalidInput, self.controller.update, req, method, {}) diff --git a/cinder/tests/unit/api/v3/test_cluster.py b/cinder/tests/unit/api/v3/test_cluster.py index 27e4e30bba5..e6f280d790f 100644 --- a/cinder/tests/unit/api/v3/test_cluster.py +++ b/cinder/tests/unit/api/v3/test_cluster.py @@ -18,6 +18,7 @@ import datetime import ddt from iso8601 import iso8601 import mock +from oslo_utils import versionutils from cinder.api import extensions from cinder.api.openstack import api_version_request as api_version @@ -31,11 +32,17 @@ from cinder.tests.unit import fake_cluster CLUSTERS = [ fake_cluster.fake_db_cluster( id=1, + replication_status='error', + frozen=False, + active_backend_id='replication1', last_heartbeat=datetime.datetime(2016, 6, 1, 2, 46, 28), updated_at=datetime.datetime(2016, 6, 1, 2, 46, 28), created_at=datetime.datetime(2016, 6, 1, 2, 46, 28)), fake_cluster.fake_db_cluster( id=2, name='cluster2', num_hosts=2, num_down_hosts=1, disabled=True, + replication_status='error', + frozen=True, + active_backend_id='replication2', updated_at=datetime.datetime(2016, 6, 1, 1, 46, 28), created_at=datetime.datetime(2016, 6, 1, 1, 46, 28)) ] @@ -51,6 +58,9 @@ EXPECTED = [{'created_at': datetime.datetime(2016, 6, 1, 2, 46, 28), 'num_hosts': 0, 'state': 'up', 'status': 'enabled', + 'replication_status': 'error', + 'frozen': False, + 'active_backend_id': 'replication1', 'updated_at': datetime.datetime(2016, 6, 1, 2, 46, 28)}, {'created_at': datetime.datetime(2016, 6, 1, 1, 46, 28), 'disabled_reason': None, @@ -61,6 +71,9 @@ EXPECTED = [{'created_at': datetime.datetime(2016, 6, 1, 2, 46, 28), 'num_hosts': 2, 'state': 'down', 'status': 'disabled', + 'replication_status': 'error', + 'frozen': True, + 'active_backend_id': 'replication2', 'updated_at': datetime.datetime(2016, 6, 1, 1, 46, 28)}] @@ -92,6 +105,21 @@ class ClustersTestCase(test.TestCase): {'is_up': True, 'disabled': False, 'num_hosts': 2, 'num_down_hosts': 1, 'binary': 'cinder-volume'}) + REPLICATION_FILTERS = ({'replication_status': 'error'}, {'frozen': True}, + {'active_backend_id': 'replication'}) + + def _get_expected(self, version='3.8'): + if versionutils.convert_version_to_tuple(version) >= (3, 19): + return EXPECTED + + expect = [] + for cluster in EXPECTED: + cluster = cluster.copy() + for key in ('replication_status', 'frozen', 'active_backend_id'): + cluster.pop(key) + expect.append(cluster) + return expect + def setUp(self): super(ClustersTestCase, self).setUp() @@ -101,8 +129,10 @@ class ClustersTestCase(test.TestCase): self.controller = clusters.ClusterController(self.ext_mgr) @mock.patch('cinder.db.cluster_get_all', return_value=CLUSTERS_ORM) - def _test_list(self, get_all_mock, detailed, filters, expected=None): - req = FakeRequest(**filters) + def _test_list(self, get_all_mock, detailed, filters=None, expected=None, + version='3.8'): + filters = filters or {} + req = FakeRequest(version=version, **filters) method = getattr(self.controller, 'detail' if detailed else 'index') clusters = method(req) @@ -119,7 +149,7 @@ class ClustersTestCase(test.TestCase): @ddt.data(*LIST_FILTERS) def test_index_detail(self, filters): """Verify that we get all clusters with detailed data.""" - expected = {'clusters': EXPECTED} + expected = {'clusters': self._get_expected()} self._test_list(detailed=True, filters=filters, expected=expected) @ddt.data(*LIST_FILTERS) @@ -135,6 +165,16 @@ class ClustersTestCase(test.TestCase): 'status': 'disabled'}]} self._test_list(detailed=False, filters=filters, expected=expected) + @ddt.data(*REPLICATION_FILTERS) + def test_index_detail_fail_old(self, filters): + self.assertRaises(exception.InvalidInput, self._test_list, + detailed=True, filters=filters) + + @ddt.data(*REPLICATION_FILTERS) + def test_index_summary_fail_old(self, filters): + self.assertRaises(exception.InvalidInput, self._test_list, + detailed=False, filters=filters) + @ddt.data(True, False) def test_index_unauthorized(self, detailed): """Verify that unauthorized user can't list clusters.""" @@ -147,13 +187,35 @@ class ClustersTestCase(test.TestCase): """Verify the wrong version so that user can't list clusters.""" self.assertRaises(exception.VersionNotFoundForAPIMethod, self._test_list, detailed=detailed, - filters={'version': '3.5'}) + version='3.6') + + @ddt.data(*REPLICATION_FILTERS) + def test_index_detail_replication_new_fields(self, filters): + version = '3.26' + expected = {'clusters': self._get_expected(version)} + self._test_list(detailed=True, filters=filters, expected=expected, + version=version) + + @ddt.data(*REPLICATION_FILTERS) + def test_index_summary_replication_new_fields(self, filters): + expected = {'clusters': [{'name': 'cluster_name', + 'binary': 'cinder-volume', + 'state': 'up', + 'replication_status': 'error', + 'status': 'enabled'}, + {'name': 'cluster2', + 'binary': 'cinder-volume', + 'state': 'down', + 'replication_status': 'error', + 'status': 'disabled'}]} + self._test_list(detailed=False, filters=filters, expected=expected, + version='3.26') @mock.patch('cinder.db.sqlalchemy.api.cluster_get', return_value=CLUSTERS_ORM[0]) def test_show(self, get_mock): req = FakeRequest() - expected = {'cluster': EXPECTED[0]} + expected = {'cluster': self._get_expected()[0]} cluster = self.controller.show(req, mock.sentinel.name, mock.sentinel.binary) self.assertEqual(expected, cluster) diff --git a/cinder/tests/unit/objects/test_cluster.py b/cinder/tests/unit/objects/test_cluster.py index 6a63fca48b2..50d045e8103 100644 --- a/cinder/tests/unit/objects/test_cluster.py +++ b/cinder/tests/unit/objects/test_cluster.py @@ -13,6 +13,7 @@ # License for the specific language governing permissions and limitations # under the License. +import ddt import mock from oslo_utils import timeutils @@ -40,6 +41,7 @@ def _get_filters_sentinel(): 'num_down_hosts': mock.sentinel.num_down_hosts} +@ddt.ddt class TestCluster(test_objects.BaseObjectsTestCase): """Test Cluster Versioned Object methods.""" cluster = fake_cluster.fake_cluster_orm() @@ -111,6 +113,19 @@ class TestCluster(test_objects.BaseObjectsTestCase): last_heartbeat=expired_time) self.assertFalse(cluster.is_up) + @ddt.data('1.0', '1.1') + def tests_obj_make_compatible(self, version): + new_fields = {'replication_status': 'error', 'frozen': True, + 'active_backend_id': 'replication'} + cluster = objects.Cluster(self.context, **new_fields) + primitive = cluster.obj_to_primitive(version) + converted_cluster = objects.Cluster.obj_from_primitive(primitive) + for key, value in new_fields.items(): + if version == '1.0': + self.assertFalse(converted_cluster.obj_attr_is_set(key)) + else: + self.assertEqual(value, getattr(converted_cluster, key)) + class TestClusterList(test_objects.BaseObjectsTestCase): """Test ClusterList Versioned Object methods.""" diff --git a/cinder/tests/unit/objects/test_objects.py b/cinder/tests/unit/objects/test_objects.py index 5bec70ae460..4fdacf03721 100644 --- a/cinder/tests/unit/objects/test_objects.py +++ b/cinder/tests/unit/objects/test_objects.py @@ -28,7 +28,7 @@ object_data = { 'BackupImport': '1.4-c50f7a68bb4c400dd53dd219685b3992', 'BackupList': '1.0-15ecf022a68ddbb8c2a6739cfc9f8f5e', 'CleanupRequest': '1.0-e7c688b893e1d5537ccf65cc3eb10a28', - 'Cluster': '1.0-6f06e867c073e9d31722c53b0a9329b8', + 'Cluster': '1.1-cdb1572b250837933d950cc6662313b8', 'ClusterList': '1.0-15ecf022a68ddbb8c2a6739cfc9f8f5e', 'CGSnapshot': '1.1-3212ac2b4c2811b7134fb9ba2c49ff74', 'CGSnapshotList': '1.0-15ecf022a68ddbb8c2a6739cfc9f8f5e', diff --git a/cinder/tests/unit/test_migrations.py b/cinder/tests/unit/test_migrations.py index 0ff23872327..87984d59bf2 100644 --- a/cinder/tests/unit/test_migrations.py +++ b/cinder/tests/unit/test_migrations.py @@ -1076,6 +1076,16 @@ class MigrationsMixin(test_migrations.WalkVersionsMixin): self.VARCHAR_TYPE) self.assertTrue(messages.c.request_id.nullable) + def _check_088(self, engine, data): + """Test adding replication data to cluster table.""" + clusters = db_utils.get_table(engine, 'clusters') + self.assertIsInstance(clusters.c.replication_status.type, + self.VARCHAR_TYPE) + self.assertIsInstance(clusters.c.active_backend_id.type, + self.VARCHAR_TYPE) + self.assertIsInstance(clusters.c.frozen.type, + self.BOOL_TYPE) + def test_walk_versions(self): self.walk_versions(False, False) diff --git a/cinder/tests/unit/test_service.py b/cinder/tests/unit/test_service.py index c5d6cc2e56f..170992eff86 100644 --- a/cinder/tests/unit/test_service.py +++ b/cinder/tests/unit/test_service.py @@ -30,6 +30,7 @@ from cinder import db from cinder import exception from cinder import manager from cinder import objects +from cinder.objects import fields from cinder import rpc from cinder import service from cinder import test @@ -437,8 +438,9 @@ class ServiceTestCase(test.TestCase): if cluster and added_to_cluster: self.assertIsNotNone(app.cluster_rpcserver) - expected_target_calls.append(mock.call(topic=self.topic, - server=cluster)) + expected_target_calls.append(mock.call( + topic=self.topic + '.' + cluster, + server=cluster.split('@')[0])) expected_rpc_calls.extend(expected_rpc_calls[:]) # Check that we create message targets for host and cluster @@ -465,12 +467,111 @@ class ServiceTestCase(test.TestCase): get_min_obj_mock): """Test that with cluster we create the rpc service.""" get_min_obj_mock.return_value = obj_version - cluster = 'cluster' + cluster = 'cluster@backend#pool' + self.host = 'host@backend#pool' app = service.Service.create(host=self.host, binary='cinder-volume', cluster=cluster, topic=self.topic) self._check_rpc_servers_and_init_host(app, obj_version != '1.3', cluster) + @mock.patch('cinder.service.Service.is_svc_upgrading_to_n', + mock.Mock(return_value=False)) + @mock.patch('cinder.objects.Cluster.get_by_id') + def test_ensure_cluster_exists_no_cluster(self, get_mock): + app = service.Service.create(host=self.host, + binary=self.binary, + topic=self.topic) + svc = objects.Service.get_by_id(self.ctxt, app.service_id) + app._ensure_cluster_exists(self.ctxt, svc) + get_mock.assert_not_called() + self.assertEqual({}, svc.cinder_obj_get_changes()) + + @mock.patch('cinder.service.Service.is_svc_upgrading_to_n', + mock.Mock(return_value=False)) + @mock.patch('cinder.objects.Cluster.get_by_id') + def test_ensure_cluster_exists_cluster_exists_non_relicated(self, + get_mock): + cluster = objects.Cluster( + name='cluster_name', active_backend_id=None, frozen=False, + replication_status=fields.ReplicationStatus.NOT_CAPABLE) + get_mock.return_value = cluster + + app = service.Service.create(host=self.host, + binary=self.binary, + topic=self.topic) + svc = objects.Service.get_by_id(self.ctxt, app.service_id) + app.cluster = cluster.name + app._ensure_cluster_exists(self.ctxt, svc) + get_mock.assert_called_once_with(self.ctxt, None, name=cluster.name, + binary=app.binary) + self.assertEqual({}, svc.cinder_obj_get_changes()) + + @mock.patch('cinder.service.Service.is_svc_upgrading_to_n', + mock.Mock(return_value=False)) + @mock.patch('cinder.objects.Cluster.get_by_id') + def test_ensure_cluster_exists_cluster_change(self, get_mock): + """We copy replication fields from the cluster to the service.""" + changes = dict(replication_status=fields.ReplicationStatus.FAILED_OVER, + active_backend_id='secondary', + frozen=True) + cluster = objects.Cluster(name='cluster_name', **changes) + get_mock.return_value = cluster + + app = service.Service.create(host=self.host, + binary=self.binary, + topic=self.topic) + svc = objects.Service.get_by_id(self.ctxt, app.service_id) + app.cluster = cluster.name + app._ensure_cluster_exists(self.ctxt, svc) + get_mock.assert_called_once_with(self.ctxt, None, name=cluster.name, + binary=app.binary) + self.assertEqual(changes, svc.cinder_obj_get_changes()) + + @mock.patch('cinder.service.Service.is_svc_upgrading_to_n', + mock.Mock(return_value=False)) + @mock.patch('cinder.objects.Cluster.get_by_id') + def test_ensure_cluster_exists_cluster_no_change(self, get_mock): + """Don't copy replication fields from cluster if replication error.""" + changes = dict(replication_status=fields.ReplicationStatus.FAILED_OVER, + active_backend_id='secondary', + frozen=True) + cluster = objects.Cluster(name='cluster_name', **changes) + get_mock.return_value = cluster + + app = service.Service.create(host=self.host, + binary=self.binary, + topic=self.topic) + svc = objects.Service.get_by_id(self.ctxt, app.service_id) + svc.replication_status = fields.ReplicationStatus.ERROR + svc.obj_reset_changes() + app.cluster = cluster.name + app._ensure_cluster_exists(self.ctxt, svc) + get_mock.assert_called_once_with(self.ctxt, None, name=cluster.name, + binary=app.binary) + self.assertEqual({}, svc.cinder_obj_get_changes()) + + @mock.patch('cinder.service.Service.is_svc_upgrading_to_n', + mock.Mock(return_value=False)) + def test_ensure_cluster_exists_cluster_create_replicated_and_non(self): + """We use service replication fields to create the cluster.""" + changes = dict(replication_status=fields.ReplicationStatus.FAILED_OVER, + active_backend_id='secondary', + frozen=True) + + app = service.Service.create(host=self.host, + binary=self.binary, + topic=self.topic) + svc = objects.Service.get_by_id(self.ctxt, app.service_id) + for key, value in changes.items(): + setattr(svc, key, value) + + app.cluster = 'cluster_name' + app._ensure_cluster_exists(self.ctxt, svc) + + cluster = objects.Cluster.get_by_id(self.ctxt, None, name=app.cluster) + for key, value in changes.items(): + self.assertEqual(value, getattr(cluster, key)) + class TestWSGIService(test.TestCase): diff --git a/cinder/tests/unit/test_volume.py b/cinder/tests/unit/test_volume.py index 58d54b43c0f..28951fabb90 100644 --- a/cinder/tests/unit/test_volume.py +++ b/cinder/tests/unit/test_volume.py @@ -40,6 +40,7 @@ from taskflow.engines.action_engine import engine from cinder.api import common from cinder.brick.local_dev import lvm as brick_lvm +from cinder.common import constants from cinder import context from cinder import coordination from cinder import db @@ -5711,107 +5712,356 @@ class VolumeMigrationTestCase(base.BaseVolumeTestCase): self.assertRaises(exception.VolumeNotFound, volume.refresh) +@ddt.ddt class ReplicationTestCase(base.BaseVolumeTestCase): - @mock.patch.object(volume_rpcapi.VolumeAPI, 'failover_host') + @mock.patch('cinder.objects.Service.is_up', True) + @mock.patch.object(volume_rpcapi.VolumeAPI, 'failover') @mock.patch.object(cinder.db, 'conditional_update') - @mock.patch.object(cinder.db, 'service_get') - def test_failover_host(self, mock_db_args, mock_db_update, - mock_failover): - """Test replication failover_host.""" + @mock.patch.object(objects.ServiceList, 'get_all') + def test_failover(self, mock_get_all, mock_db_update, mock_failover): + """Test replication failover.""" - mock_db_args.return_value = fake_service.fake_service_obj( - self.context, - binary='cinder-volume') + service = fake_service.fake_service_obj(self.context, + binary='cinder-volume') + mock_get_all.return_value = [service] mock_db_update.return_value = {'replication_status': 'enabled'} volume_api = cinder.volume.api.API() - volume_api.failover_host(self.context, host=CONF.host) - mock_failover.assert_called_once_with(self.context, CONF.host, None) + volume_api.failover(self.context, host=CONF.host, cluster_name=None) + mock_failover.assert_called_once_with(self.context, service, None) - @mock.patch.object(volume_rpcapi.VolumeAPI, 'failover_host') + @mock.patch.object(volume_rpcapi.VolumeAPI, 'failover') @mock.patch.object(cinder.db, 'conditional_update') - @mock.patch.object(cinder.db, 'service_get') - def test_failover_host_unexpected_status(self, mock_db_args, - mock_db_update, - mock_failover): - """Test replication failover_host unxepected status.""" + @mock.patch.object(cinder.db, 'service_get_all') + def test_failover_unexpected_status(self, mock_db_get_all, mock_db_update, + mock_failover): + """Test replication failover unxepected status.""" - mock_db_args.return_value = fake_service.fake_service_obj( + mock_db_get_all.return_value = [fake_service.fake_service_obj( self.context, - binary='cinder-volume') + binary='cinder-volume')] mock_db_update.return_value = None volume_api = cinder.volume.api.API() self.assertRaises(exception.InvalidInput, - volume_api.failover_host, + volume_api.failover, self.context, - host=CONF.host) + host=CONF.host, + cluster_name=None) @mock.patch.object(volume_rpcapi.VolumeAPI, 'freeze_host') - @mock.patch.object(cinder.db, 'conditional_update') - @mock.patch.object(cinder.db, 'service_get') - def test_freeze_host(self, mock_db_args, mock_db_update, + @mock.patch.object(cinder.db, 'conditional_update', return_value=1) + @mock.patch.object(cinder.objects.ServiceList, 'get_all') + def test_freeze_host(self, mock_get_all, mock_db_update, mock_freeze): """Test replication freeze_host.""" - mock_db_args.return_value = fake_service.fake_service_obj( - self.context, - binary='cinder-volume') - mock_db_update.return_value = {'frozen': False} + service = fake_service.fake_service_obj(self.context, + binary='cinder-volume') + mock_get_all.return_value = [service] + mock_freeze.return_value = True volume_api = cinder.volume.api.API() - volume_api.freeze_host(self.context, host=CONF.host) - mock_freeze.assert_called_once_with(self.context, CONF.host) + volume_api.freeze_host(self.context, host=CONF.host, cluster_name=None) + mock_freeze.assert_called_once_with(self.context, service) @mock.patch.object(volume_rpcapi.VolumeAPI, 'freeze_host') @mock.patch.object(cinder.db, 'conditional_update') - @mock.patch.object(cinder.db, 'service_get') - def test_freeze_host_unexpected_status(self, mock_db_args, + @mock.patch.object(cinder.db, 'service_get_all') + def test_freeze_host_unexpected_status(self, mock_get_all, mock_db_update, mock_freeze): """Test replication freeze_host unexpected status.""" - mock_db_args.return_value = fake_service.fake_service_obj( + mock_get_all.return_value = [fake_service.fake_service_obj( self.context, - binary='cinder-volume') + binary='cinder-volume')] mock_db_update.return_value = None volume_api = cinder.volume.api.API() self.assertRaises(exception.InvalidInput, volume_api.freeze_host, self.context, - host=CONF.host) + host=CONF.host, + cluster_name=None) @mock.patch.object(volume_rpcapi.VolumeAPI, 'thaw_host') - @mock.patch.object(cinder.db, 'conditional_update') - @mock.patch.object(cinder.db, 'service_get') - def test_thaw_host(self, mock_db_args, mock_db_update, + @mock.patch.object(cinder.db, 'conditional_update', return_value=1) + @mock.patch.object(cinder.objects.ServiceList, 'get_all') + def test_thaw_host(self, mock_get_all, mock_db_update, mock_thaw): """Test replication thaw_host.""" - mock_db_args.return_value = fake_service.fake_service_obj( - self.context, - binary='cinder-volume') - mock_db_update.return_value = {'frozen': True} + service = fake_service.fake_service_obj(self.context, + binary='cinder-volume') + mock_get_all.return_value = [service] mock_thaw.return_value = True volume_api = cinder.volume.api.API() - volume_api.thaw_host(self.context, host=CONF.host) - mock_thaw.assert_called_once_with(self.context, CONF.host) + volume_api.thaw_host(self.context, host=CONF.host, cluster_name=None) + mock_thaw.assert_called_once_with(self.context, service) @mock.patch.object(volume_rpcapi.VolumeAPI, 'thaw_host') @mock.patch.object(cinder.db, 'conditional_update') - @mock.patch.object(cinder.db, 'service_get') - def test_thaw_host_unexpected_status(self, mock_db_args, + @mock.patch.object(cinder.db, 'service_get_all') + def test_thaw_host_unexpected_status(self, mock_get_all, mock_db_update, mock_thaw): """Test replication thaw_host unexpected status.""" - mock_db_args.return_value = fake_service.fake_service_obj( + mock_get_all.return_value = [fake_service.fake_service_obj( self.context, - binary='cinder-volume') + binary='cinder-volume')] mock_db_update.return_value = None volume_api = cinder.volume.api.API() self.assertRaises(exception.InvalidInput, volume_api.thaw_host, self.context, - host=CONF.host) + host=CONF.host, cluster_name=None) + + @mock.patch('cinder.volume.driver.BaseVD.failover_completed') + def test_failover_completed(self, completed_mock): + rep_field = fields.ReplicationStatus + svc = objects.Service(self.context, host=self.volume.host, + binary=constants.VOLUME_BINARY, + replication_status=rep_field.ENABLED) + svc.create() + self.volume.failover_completed( + self.context, + {'active_backend_id': 'secondary', + 'replication_status': rep_field.FAILED_OVER}) + service = objects.Service.get_by_id(self.context, svc.id) + self.assertEqual('secondary', service.active_backend_id) + self.assertEqual('failed-over', service.replication_status) + completed_mock.assert_called_once_with(self.context, 'secondary') + + @mock.patch('cinder.volume.driver.BaseVD.failover_completed', wraps=True) + def test_failover_completed_driver_failure(self, completed_mock): + rep_field = fields.ReplicationStatus + svc = objects.Service(self.context, host=self.volume.host, + binary=constants.VOLUME_BINARY, + replication_status=rep_field.ENABLED) + svc.create() + self.volume.failover_completed( + self.context, + {'active_backend_id': 'secondary', + 'replication_status': rep_field.FAILED_OVER}) + service = objects.Service.get_by_id(self.context, svc.id) + self.assertEqual('secondary', service.active_backend_id) + self.assertEqual(rep_field.ERROR, service.replication_status) + self.assertTrue(service.disabled) + self.assertIsNotNone(service.disabled_reason) + completed_mock.assert_called_once_with(self.context, 'secondary') + + @mock.patch('cinder.volume.rpcapi.VolumeAPI.failover_completed') + def test_finish_failover_non_clustered(self, completed_mock): + svc = mock.Mock(is_clustered=None) + self.volume.finish_failover(self.context, svc, mock.sentinel.updates) + svc.update.assert_called_once_with(mock.sentinel.updates) + svc.save.assert_called_once_with() + completed_mock.assert_not_called() + + @mock.patch('cinder.volume.rpcapi.VolumeAPI.failover_completed') + def test_finish_failover_clustered(self, completed_mock): + svc = mock.Mock(cluster_name='cluster_name') + updates = {'status': 'error'} + self.volume.finish_failover(self.context, svc, updates) + completed_mock.assert_called_once_with(self.context, svc, updates) + svc.cluster.status = 'error' + svc.cluster.save.assert_called_once() + + @ddt.data(None, 'cluster_name') + @mock.patch('cinder.volume.manager.VolumeManager.finish_failover') + @mock.patch('cinder.volume.manager.VolumeManager._get_my_volumes') + def test_failover_manager(self, cluster, get_vols_mock, finish_mock): + """Test manager's failover method for clustered and not clustered.""" + rep_field = fields.ReplicationStatus + svc = objects.Service(self.context, host=self.volume.host, + binary=constants.VOLUME_BINARY, + cluster_name=cluster, + replication_status=rep_field.ENABLED) + svc.create() + + vol = objects.Volume(self.context, host=self.volume.host) + vol.create() + + get_vols_mock.return_value = [vol] + + with mock.patch.object(self.volume, 'driver') as driver: + called, not_called = driver.failover_host, driver.failover + if cluster: + called, not_called = not_called, called + + called.return_value = ('secondary', [{'volume_id': vol.id, + 'updates': {'status': 'error'}}]) + + self.volume.failover(self.context, + secondary_backend_id='secondary') + + not_called.assert_not_called() + called.assert_called_once_with(self.context, [vol], + secondary_id='secondary') + + expected_update = {'replication_status': rep_field.FAILED_OVER, + 'active_backend_id': 'secondary', + 'disabled': True, + 'disabled_reason': 'failed-over'} + finish_mock.assert_called_once_with(self.context, svc, expected_update) + + volume = objects.Volume.get_by_id(self.context, vol.id) + self.assertEqual('error', volume.status) + + @ddt.data(('host1', None), (None, 'mycluster')) + @ddt.unpack + def test_failover_api_fail_multiple_results(self, host, cluster): + """Fail if we try to failover multiple backends in the same request.""" + rep_field = fields.ReplicationStatus + clusters = [ + objects.Cluster(self.context, + name='mycluster@backend1', + replication_status=rep_field.ENABLED, + binary=constants.VOLUME_BINARY), + objects.Cluster(self.context, + name='mycluster@backend2', + replication_status=rep_field.ENABLED, + binary=constants.VOLUME_BINARY) + ] + clusters[0].create() + clusters[1].create() + services = [ + objects.Service(self.context, host='host1@backend1', + cluster_name=clusters[0].name, + replication_status=rep_field.ENABLED, + binary=constants.VOLUME_BINARY), + objects.Service(self.context, host='host1@backend2', + cluster_name=clusters[1].name, + replication_status=rep_field.ENABLED, + binary=constants.VOLUME_BINARY), + ] + services[0].create() + services[1].create() + self.assertRaises(exception.Invalid, + self.volume_api.failover, self.context, host, + cluster) + + def test_failover_api_not_found(self): + self.assertRaises(exception.ServiceNotFound, self.volume_api.failover, + self.context, 'host1', None) + + @mock.patch('cinder.volume.rpcapi.VolumeAPI.failover') + def test_failover_api_success_multiple_results(self, failover_mock): + """Succeed to failover multiple services for the same backend.""" + rep_field = fields.ReplicationStatus + cluster_name = 'mycluster@backend1' + cluster = objects.Cluster(self.context, + name=cluster_name, + replication_status=rep_field.ENABLED, + binary=constants.VOLUME_BINARY) + cluster.create() + services = [ + objects.Service(self.context, host='host1@backend1', + cluster_name=cluster_name, + replication_status=rep_field.ENABLED, + binary=constants.VOLUME_BINARY), + objects.Service(self.context, host='host2@backend1', + cluster_name=cluster_name, + replication_status=rep_field.ENABLED, + binary=constants.VOLUME_BINARY), + ] + services[0].create() + services[1].create() + + self.volume_api.failover(self.context, None, cluster_name, + mock.sentinel.secondary_id) + + for service in services + [cluster]: + self.assertEqual(rep_field.ENABLED, service.replication_status) + service.refresh() + self.assertEqual(rep_field.FAILING_OVER, + service.replication_status) + + failover_mock.assert_called_once_with(self.context, mock.ANY, + mock.sentinel.secondary_id) + self.assertEqual(services[0].id, failover_mock.call_args[0][1].id) + + @mock.patch('cinder.volume.rpcapi.VolumeAPI.failover') + def test_failover_api_success_multiple_results_not_updated(self, + failover_mock): + """Succeed to failover even if a service is not updated.""" + rep_field = fields.ReplicationStatus + cluster_name = 'mycluster@backend1' + cluster = objects.Cluster(self.context, + name=cluster_name, + replication_status=rep_field.ENABLED, + binary=constants.VOLUME_BINARY) + cluster.create() + services = [ + objects.Service(self.context, host='host1@backend1', + cluster_name=cluster_name, + replication_status=rep_field.ENABLED, + binary=constants.VOLUME_BINARY), + objects.Service(self.context, host='host2@backend1', + cluster_name=cluster_name, + replication_status=rep_field.ERROR, + binary=constants.VOLUME_BINARY), + ] + services[0].create() + services[1].create() + + self.volume_api.failover(self.context, None, cluster_name, + mock.sentinel.secondary_id) + + for service in services[:1] + [cluster]: + service.refresh() + self.assertEqual(rep_field.FAILING_OVER, + service.replication_status) + + services[1].refresh() + self.assertEqual(rep_field.ERROR, services[1].replication_status) + + failover_mock.assert_called_once_with(self.context, mock.ANY, + mock.sentinel.secondary_id) + self.assertEqual(services[0].id, failover_mock.call_args[0][1].id) + + @mock.patch('cinder.volume.rpcapi.VolumeAPI.failover') + def test_failover_api_fail_multiple_results_not_updated(self, + failover_mock): + """Fail if none of the services could be updated.""" + rep_field = fields.ReplicationStatus + cluster_name = 'mycluster@backend1' + cluster = objects.Cluster(self.context, + name=cluster_name, + replication_status=rep_field.ENABLED, + binary=constants.VOLUME_BINARY) + cluster.create() + down_time = timeutils.datetime.datetime(1970, 1, 1) + services = [ + # This service is down + objects.Service(self.context, host='host1@backend1', + cluster_name=cluster_name, + replication_status=rep_field.ENABLED, + created_at=down_time, + updated_at=down_time, + modified_at=down_time, + binary=constants.VOLUME_BINARY), + # This service is not with the right replication status + objects.Service(self.context, host='host2@backend1', + cluster_name=cluster_name, + replication_status=rep_field.ERROR, + binary=constants.VOLUME_BINARY), + ] + services[0].create() + services[1].create() + + self.assertRaises(exception.InvalidInput, + self.volume_api.failover, self.context, None, + cluster_name, mock.sentinel.secondary_id) + + for service in services: + svc = objects.Service.get_by_id(self.context, service.id) + self.assertEqual(service.replication_status, + svc.replication_status) + + cluster.refresh() + self.assertEqual(rep_field.ENABLED, cluster.replication_status) + + failover_mock.assert_not_called() class CopyVolumeToImageTestCase(base.BaseVolumeTestCase): diff --git a/cinder/tests/unit/test_volume_rpcapi.py b/cinder/tests/unit/test_volume_rpcapi.py index a48cdea7c78..048abaf0155 100644 --- a/cinder/tests/unit/test_volume_rpcapi.py +++ b/cinder/tests/unit/test_volume_rpcapi.py @@ -32,6 +32,7 @@ from cinder.objects import fields from cinder import test from cinder.tests.unit.backup import fake_backup from cinder.tests.unit import fake_constants as fake +from cinder.tests.unit import fake_service from cinder.tests.unit import fake_snapshot from cinder.tests.unit import fake_volume from cinder.tests.unit import utils as tests_utils @@ -135,6 +136,7 @@ class VolumeRpcAPITestCase(test.TestCase): self.assertIn('id', self.fake_volume) def _get_expected_msg(self, kwargs): + update = kwargs.pop('_expected_msg', {}) expected_msg = copy.deepcopy(kwargs) if 'volume' in expected_msg: volume = expected_msg.pop('volume') @@ -172,9 +174,11 @@ class VolumeRpcAPITestCase(test.TestCase): if 'new_volume' in expected_msg: volume = expected_msg['new_volume'] expected_msg['new_volume_id'] = volume['id'] + expected_msg.update(update) return expected_msg - def _test_volume_api(self, method, rpc_method, **kwargs): + def _test_volume_api(self, method, rpc_method, _expected_method=None, + **kwargs): ctxt = context.RequestContext('fake_user', 'fake_project') if 'rpcapi_class' in kwargs: @@ -207,6 +211,8 @@ class VolumeRpcAPITestCase(test.TestCase): host = 'fake_host' elif 'cgsnapshot' in kwargs: host = kwargs['cgsnapshot'].consistencygroup.service_topic_queue + elif 'service' in kwargs: + host = kwargs['service'].service_topic_queue target['server'] = utils.extract_host(host, 'host') target['topic'] = '%s.%s' % (constants.VOLUME_TOPIC, @@ -233,7 +239,7 @@ class VolumeRpcAPITestCase(test.TestCase): retval = getattr(rpcapi, method)(ctxt, **kwargs) self.assertEqual(expected_retval, retval) - expected_args = [ctxt, method] + expected_args = [ctxt, _expected_method or method] for arg, expected_arg in zip(self.fake_args, expected_args): self.assertEqual(expected_arg, arg) @@ -623,18 +629,42 @@ class VolumeRpcAPITestCase(test.TestCase): version='3.0') def test_freeze_host(self): + service = fake_service.fake_service_obj(self.context, + host='fake_host', + binary='cinder-volume') self._test_volume_api('freeze_host', rpc_method='call', - host='fake_host', version='3.0') + service=service, version='3.0') def test_thaw_host(self): - self._test_volume_api('thaw_host', rpc_method='call', host='fake_host', + service = fake_service.fake_service_obj(self.context, + host='fake_host', + binary='cinder-volume') + self._test_volume_api('thaw_host', rpc_method='call', service=service, version='3.0') - def test_failover_host(self): - self._test_volume_api('failover_host', rpc_method='cast', - host='fake_host', + @ddt.data('3.0', '3.8') + @mock.patch('oslo_messaging.RPCClient.can_send_version') + def test_failover(self, version, can_send_version): + can_send_version.side_effect = lambda x: x == version + service = objects.Service(self.context, host='fake_host', + cluster_name=None) + _expected_method = 'failover' if version == '3.8' else 'failover_host' + self._test_volume_api('failover', rpc_method='cast', + service=service, secondary_backend_id='fake_backend', - version='3.0') + version=version, + _expected_method=_expected_method) + + @mock.patch('cinder.volume.rpcapi.VolumeAPI._get_cctxt') + def test_failover_completed(self, cctxt_mock): + service = objects.Service(self.context, host='fake_host', + cluster_name='cluster_name') + rpcapi = volume_rpcapi.VolumeAPI() + rpcapi.failover_completed(self.context, service, mock.sentinel.updates) + cctxt_mock.assert_called_once_with(service.cluster_name, '3.8', + fanout=True) + cctxt_mock.return_value.cast(self.context, 'failover_completed', + updates=mock.sentinel.updates) def test_create_consistencygroup_from_src_cgsnapshot(self): self._test_volume_api('create_consistencygroup_from_src', diff --git a/cinder/tests/unit/volume/drivers/nexenta/test_nexenta_edge.py b/cinder/tests/unit/volume/drivers/nexenta/test_nexenta_edge.py index 129624e9e76..dcf8ddfd77c 100644 --- a/cinder/tests/unit/volume/drivers/nexenta/test_nexenta_edge.py +++ b/cinder/tests/unit/volume/drivers/nexenta/test_nexenta_edge.py @@ -76,6 +76,7 @@ class TestNexentaEdgeISCSIDriver(test.TestCase): self.cfg.nexenta_iscsi_service = NEDGE_SERVICE self.cfg.nexenta_blocksize = NEDGE_BLOCKSIZE self.cfg.nexenta_chunksize = NEDGE_CHUNKSIZE + self.cfg.replication_device = [] mock_exec = mock.Mock() mock_exec.return_value = ('', '') diff --git a/cinder/tests/unit/volume/drivers/nexenta/test_nexenta_edge_nbd.py b/cinder/tests/unit/volume/drivers/nexenta/test_nexenta_edge_nbd.py index ae76f6cab6e..b4a442de70a 100644 --- a/cinder/tests/unit/volume/drivers/nexenta/test_nexenta_edge_nbd.py +++ b/cinder/tests/unit/volume/drivers/nexenta/test_nexenta_edge_nbd.py @@ -78,6 +78,7 @@ class TestNexentaEdgeNBDDriver(test.TestCase): self.cfg.nexenta_blocksize = 512 self.cfg.nexenta_chunksize = 4096 self.cfg.reserved_percentage = 0 + self.cfg.replication_device = [] self.ctx = context.get_admin_context() self.drv = nbd.NexentaEdgeNBDDriver(configuration=self.cfg) diff --git a/cinder/tests/unit/volume/test_driver.py b/cinder/tests/unit/volume/test_driver.py new file mode 100644 index 00000000000..1602bc642d8 --- /dev/null +++ b/cinder/tests/unit/volume/test_driver.py @@ -0,0 +1,120 @@ +# Copyright (c) 2016 Red Hat Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import ddt +import mock + +from cinder import exception +from cinder import test +from cinder.volume import driver +from cinder.volume import manager + + +def my_safe_get(self, value): + if value == 'replication_device': + return ['replication'] + return None + + +@ddt.ddt +class DriverTestCase(test.TestCase): + + @staticmethod + def _get_driver(relicated, version): + class NonReplicatedDriver(driver.VolumeDriver): + pass + + class V21Driver(driver.VolumeDriver): + def failover_host(*args, **kwargs): + pass + + class AADriver(V21Driver): + def failover_completed(*args, **kwargs): + pass + + if not relicated: + return NonReplicatedDriver + + if version == 'v2.1': + return V21Driver + + return AADriver + + @ddt.data('v2.1', 'a/a', 'newfeature') + def test_supports_replication_feature_none(self, rep_version): + my_driver = self._get_driver(False, None) + self.assertFalse(my_driver.supports_replication_feature(rep_version)) + + @ddt.data('v2.1', 'a/a', 'newfeature') + def test_supports_replication_feature_only_21(self, rep_version): + version = 'v2.1' + my_driver = self._get_driver(True, version) + self.assertEqual(rep_version == version, + my_driver.supports_replication_feature(rep_version)) + + @ddt.data('v2.1', 'a/a', 'newfeature') + def test_supports_replication_feature_aa(self, rep_version): + my_driver = self._get_driver(True, 'a/a') + self.assertEqual(rep_version in ('v2.1', 'a/a'), + my_driver.supports_replication_feature(rep_version)) + + def test_init_non_replicated(self): + config = manager.config.Configuration(manager.volume_manager_opts, + config_group='volume') + # No exception raised + self._get_driver(False, None)(configuration=config) + + @ddt.data('v2.1', 'a/a') + @mock.patch('cinder.volume.configuration.Configuration.safe_get', + my_safe_get) + def test_init_replicated_non_clustered(self, version): + def append_config_values(self, volume_opts): + pass + + config = manager.config.Configuration(manager.volume_manager_opts, + config_group='volume') + # No exception raised + self._get_driver(True, version)(configuration=config) + + @mock.patch('cinder.volume.configuration.Configuration.safe_get', + my_safe_get) + def test_init_replicated_clustered_not_supported(self): + config = manager.config.Configuration(manager.volume_manager_opts, + config_group='volume') + # Raises exception because we are trying to run a replicated service + # in clustered mode but the driver doesn't support it. + self.assertRaises(exception.Invalid, self._get_driver(True, 'v2.1'), + configuration=config, cluster_name='mycluster') + + @mock.patch('cinder.volume.configuration.Configuration.safe_get', + my_safe_get) + def test_init_replicated_clustered_supported(self): + config = manager.config.Configuration(manager.volume_manager_opts, + config_group='volume') + # No exception raised + self._get_driver(True, 'a/a')(configuration=config, + cluster_name='mycluster') + + def test_failover(self): + """Test default failover behavior of calling failover_host.""" + my_driver = self._get_driver(True, 'a/a')() + with mock.patch.object(my_driver, 'failover_host') as failover_mock: + res = my_driver.failover(mock.sentinel.context, + mock.sentinel.volumes, + secondary_id=mock.sentinel.secondary_id) + self.assertEqual(failover_mock.return_value, res) + failover_mock.assert_called_once_with(mock.sentinel.context, + mock.sentinel.volumes, + mock.sentinel.secondary_id) diff --git a/cinder/tests/unit/volume/test_replication_manager.py b/cinder/tests/unit/volume/test_replication_manager.py index 3cc2a48e863..3d7486aebc4 100644 --- a/cinder/tests/unit/volume/test_replication_manager.py +++ b/cinder/tests/unit/volume/test_replication_manager.py @@ -31,7 +31,7 @@ class ReplicationTestCase(base.BaseVolumeTestCase): self.host = 'host@backend#pool' self.manager = manager.VolumeManager(host=self.host) - @mock.patch('cinder.objects.VolumeList.get_all_by_host') + @mock.patch('cinder.objects.VolumeList.get_all') @mock.patch('cinder.volume.driver.BaseVD.failover_host', side_effect=exception.InvalidReplicationTarget('')) @ddt.data(('backend2', 'default', fields.ReplicationStatus.FAILED_OVER), @@ -55,7 +55,8 @@ class ReplicationTestCase(base.BaseVolumeTestCase): replication_status=fields.ReplicationStatus.FAILING_OVER) self.manager.failover_host(self.context, new_backend) - mock_getall.assert_called_once_with(self.context, self.host) + mock_getall.assert_called_once_with(self.context, + filters={'host': self.host}) mock_failover.assert_called_once_with(self.context, mock_getall.return_value, secondary_id=new_backend) diff --git a/cinder/volume/api.py b/cinder/volume/api.py index 2c485434b9d..ab13c4bb378 100644 --- a/cinder/volume/api.py +++ b/cinder/volume/api.py @@ -26,6 +26,7 @@ from oslo_log import log as logging from oslo_utils import excutils from oslo_utils import strutils from oslo_utils import timeutils +from oslo_utils import versionutils import six from cinder.api import common @@ -1704,70 +1705,137 @@ class API(base.Base): offset, sort_keys, sort_dirs) + def _get_cluster_and_services_for_replication(self, ctxt, host, + cluster_name): + services = objects.ServiceList.get_all( + ctxt, filters={'host': host, 'cluster_name': cluster_name, + 'binary': constants.VOLUME_BINARY}) + + if not services: + msg = _('No service found with ') + ( + 'host=%(host)s' if host else 'cluster=%(cluster_name)s') + raise exception.ServiceNotFound(msg, host=host, + cluster_name=cluster_name) + + cluster = services[0].cluster + # Check that the host or cluster we received only results in 1 host or + # hosts from the same cluster. + if cluster_name: + check_attribute = 'cluster_name' + expected = cluster.name + else: + check_attribute = 'host' + expected = services[0].host + if any(getattr(s, check_attribute) != expected for s in services): + msg = _('Services from different clusters found.') + raise exception.InvalidParameterValue(msg) + + # If we received host parameter but host belongs to a cluster we have + # to change all the services in the cluster, not just one host + if host and cluster: + services = cluster.services + + return cluster, services + + def _replication_db_change(self, ctxt, field, expected_value, new_value, + host, cluster_name, check_up=False): + def _error_msg(service): + expected = utils.build_or_str(six.text_type(expected_value)) + up_msg = 'and must be up ' if check_up else '' + msg = (_('%(field)s in %(service)s must be %(expected)s ' + '%(up_msg)sto failover.') + % {'field': field, 'service': service, + 'expected': expected, 'up_msg': up_msg}) + LOG.error(msg) + return msg + + cluster, services = self._get_cluster_and_services_for_replication( + ctxt, host, cluster_name) + + expect = {field: expected_value} + change = {field: new_value} + + if cluster: + old_value = getattr(cluster, field) + if ((check_up and not cluster.is_up) + or not cluster.conditional_update(change, expect)): + msg = _error_msg(cluster.name) + raise exception.InvalidInput(reason=msg) + + changed = [] + not_changed = [] + for service in services: + if ((not check_up or service.is_up) + and service.conditional_update(change, expect)): + changed.append(service) + else: + not_changed.append(service) + + # If there were some services that couldn't be changed we should at + # least log the error. + if not_changed: + msg = _error_msg([s.host for s in not_changed]) + # If we couldn't change any of the services + if not changed: + # Undo the cluster change + if cluster: + setattr(cluster, field, old_value) + cluster.save() + raise exception.InvalidInput( + reason=_('No service could be changed: %s') % msg) + LOG.warning(_LW('Some services could not be changed: %s'), msg) + + return cluster, services + # FIXME(jdg): Move these Cheesecake methods (freeze, thaw and failover) # to a services API because that's what they are - def failover_host(self, - ctxt, - host, - secondary_id=None): - + def failover(self, ctxt, host, cluster_name, secondary_id=None): check_policy(ctxt, 'failover_host') ctxt = ctxt if ctxt.is_admin else ctxt.elevated() - svc_host = volume_utils.extract_host(host, 'backend') - service = objects.Service.get_by_args( - ctxt, svc_host, constants.VOLUME_BINARY) - expected = {'replication_status': [fields.ReplicationStatus.ENABLED, - fields.ReplicationStatus.FAILED_OVER]} - result = service.conditional_update( - {'replication_status': fields.ReplicationStatus.FAILING_OVER}, - expected) - if not result: - expected_status = utils.build_or_str( - expected['replication_status']) - msg = (_('Host replication_status must be %s to failover.') - % expected_status) - LOG.error(msg) - raise exception.InvalidInput(reason=msg) - self.volume_rpcapi.failover_host(ctxt, host, secondary_id) + # TODO(geguileo): In P - Remove this version check + rpc_version = self.volume_rpcapi.determine_rpc_version_cap() + rpc_version = versionutils.convert_version_to_tuple(rpc_version) + if cluster_name and rpc_version < (3, 5): + msg = _('replication operations with cluster field') + raise exception.UnavailableDuringUpgrade(action=msg) - def freeze_host(self, ctxt, host): + rep_fields = fields.ReplicationStatus + expected_values = [rep_fields.ENABLED, rep_fields.FAILED_OVER] + new_value = rep_fields.FAILING_OVER + cluster, services = self._replication_db_change( + ctxt, 'replication_status', expected_values, new_value, host, + cluster_name, check_up=True) + + self.volume_rpcapi.failover(ctxt, services[0], secondary_id) + + def freeze_host(self, ctxt, host, cluster_name): check_policy(ctxt, 'freeze_host') ctxt = ctxt if ctxt.is_admin else ctxt.elevated() - svc_host = volume_utils.extract_host(host, 'backend') - service = objects.Service.get_by_args( - ctxt, svc_host, constants.VOLUME_BINARY) - expected = {'frozen': False} - result = service.conditional_update( - {'frozen': True}, expected) - if not result: - msg = _('Host is already Frozen.') - LOG.error(msg) - raise exception.InvalidInput(reason=msg) + expected = False + new_value = True + cluster, services = self._replication_db_change( + ctxt, 'frozen', expected, new_value, host, cluster_name, + check_up=False) # Should we set service status to disabled to keep # scheduler calls from being sent? Just use existing # `cinder service-disable reason=freeze` - self.volume_rpcapi.freeze_host(ctxt, host) - - def thaw_host(self, ctxt, host): + self.volume_rpcapi.freeze_host(ctxt, services[0]) + def thaw_host(self, ctxt, host, cluster_name): check_policy(ctxt, 'thaw_host') ctxt = ctxt if ctxt.is_admin else ctxt.elevated() - svc_host = volume_utils.extract_host(host, 'backend') - service = objects.Service.get_by_args( - ctxt, svc_host, constants.VOLUME_BINARY) - expected = {'frozen': True} - result = service.conditional_update( - {'frozen': False}, expected) - if not result: - msg = _('Host is NOT Frozen.') - LOG.error(msg) - raise exception.InvalidInput(reason=msg) - if not self.volume_rpcapi.thaw_host(ctxt, host): + expected = True + new_value = False + cluster, services = self._replication_db_change( + ctxt, 'frozen', expected, new_value, host, cluster_name, + check_up=False) + + if not self.volume_rpcapi.thaw_host(ctxt, services[0]): return "Backend reported error during thaw_host operation." def check_volume_filters(self, filters, strict=False): diff --git a/cinder/volume/driver.py b/cinder/volume/driver.py index 8f722df56ad..c177dd080c1 100644 --- a/cinder/volume/driver.py +++ b/cinder/volume/driver.py @@ -335,6 +335,10 @@ class BaseVD(object): # the unsupported driver started. SUPPORTED = True + # Methods checked to detect a driver implements a replication feature + REPLICATION_FEATURE_CHECKERS = {'v2.1': 'failover_host', + 'a/a': 'failover_completed'} + def __init__(self, execute=utils.execute, *args, **kwargs): # NOTE(vish): db is set by Manager self.db = kwargs.get('db') @@ -347,6 +351,16 @@ class BaseVD(object): self.configuration.append_config_values(iser_opts) utils.setup_tracing(self.configuration.safe_get('trace_flags')) + # NOTE(geguileo): Don't allow to start if we are enabling + # replication on a cluster service with a backend that doesn't + # support the required mechanism for Active-Active. + replication_devices = self.configuration.safe_get( + 'replication_device') + if (self.cluster_name and replication_devices and + not self.supports_replication_feature('a/a')): + raise exception.Invalid(_("Driver doesn't support clustered " + "replication.")) + self.driver_utils = driver_utils.VolumeDriverUtils( self._driver_data_namespace(), self.db) @@ -1701,6 +1715,38 @@ class BaseVD(object): # 'replication_extended_status': 'whatever',...}},] raise NotImplementedError() + def failover(self, context, volumes, secondary_id=None): + """Like failover but for a host that is clustered. + + Most of the time this will be the exact same behavior as failover_host, + so if it's not overwritten, it is assumed to be the case. + """ + return self.failover_host(context, volumes, secondary_id) + + def failover_completed(self, context, active_backend_id=None): + """This method is called after failover for clustered backends.""" + raise NotImplementedError() + + @classmethod + def _is_base_method(cls, method_name): + method = getattr(cls, method_name) + return method.__module__ == getattr(BaseVD, method_name).__module__ + + @classmethod + def supports_replication_feature(cls, feature): + """Check if driver class supports replication features. + + Feature is a string that must be one of: + - v2.1 + - a/a + """ + if feature not in cls.REPLICATION_FEATURE_CHECKERS: + return False + + # Check if method is being implemented/overwritten by the driver + method_name = cls.REPLICATION_FEATURE_CHECKERS[feature] + return not cls._is_base_method(method_name) + def get_replication_updates(self, context): """Old replication update method, deprecate.""" raise NotImplementedError() diff --git a/cinder/volume/manager.py b/cinder/volume/manager.py index 8e60862804b..d2e05929e63 100644 --- a/cinder/volume/manager.py +++ b/cinder/volume/manager.py @@ -4151,9 +4151,8 @@ class VolumeManager(manager.CleanableManager, volume.update(model_update_default) volume.save() - # Replication V2.1 methods - def failover_host(self, context, - secondary_backend_id=None): + # Replication V2.1 and a/a method + def failover(self, context, secondary_backend_id=None): """Failover a backend to a secondary replication target. Instructs a replication capable/configured backend to failover @@ -4167,30 +4166,33 @@ class VolumeManager(manager.CleanableManager, :param context: security context :param secondary_backend_id: Specifies backend_id to fail over to """ + updates = {} + repl_status = fields.ReplicationStatus + svc_host = vol_utils.extract_host(self.host, 'backend') + service = objects.Service.get_by_args(context, svc_host, + constants.VOLUME_BINARY) + volumes = self._get_my_volumes(context) - service = objects.Service.get_by_args( - context, - svc_host, - constants.VOLUME_BINARY) - volumes = objects.VolumeList.get_all_by_host(context, self.host) - - exception_encountered = False + exception_encountered = True try: + # For non clustered we can call v2.1 failover_host, but for + # clustered we call a/a failover method. We know a/a method + # exists because BaseVD class wouldn't have started if it didn't. + failover = getattr(self.driver, + 'failover' if service.is_clustered + else 'failover_host') # expected form of volume_update_list: # [{volume_id: , updates: {'provider_id': xxxx....}}, # {volume_id: , updates: {'provider_id': xxxx....}}] - (active_backend_id, volume_update_list) = ( - self.driver.failover_host( - context, - volumes, - secondary_id=secondary_backend_id)) + active_backend_id, volume_update_list = failover( + context, + volumes, + secondary_id=secondary_backend_id) + exception_encountered = False except exception.UnableToFailOver: LOG.exception(_LE("Failed to perform replication failover")) - service.replication_status = ( - fields.ReplicationStatus.FAILOVER_ERROR) - service.save() - exception_encountered = True + updates['replication_status'] = repl_status.FAILOVER_ERROR except exception.InvalidReplicationTarget: LOG.exception(_LE("Invalid replication target specified " "for failover")) @@ -4199,12 +4201,9 @@ class VolumeManager(manager.CleanableManager, # secondary to another secondary. In both cases active_backend_id # will be set. if service.active_backend_id: - service.replication_status = ( - fields.ReplicationStatus.FAILED_OVER) + updates['replication_status'] = repl_status.FAILED_OVER else: - service.replication_status = fields.ReplicationStatus.ENABLED - service.save() - exception_encountered = True + updates['replication_status'] = repl_status.ENABLED except exception.VolumeDriverException: # NOTE(jdg): Drivers need to be aware if they fail during # a failover sequence, we're expecting them to cleanup @@ -4212,36 +4211,29 @@ class VolumeManager(manager.CleanableManager, # backend is still set as primary as per driver memory LOG.error(_LE("Driver reported error during " "replication failover.")) - service.replication_status = ( - fields.ReplicationStatus.FAILOVER_ERROR) - service.disabled = True - service.save() - exception_encountered = True + updates.update(disabled=True, + replication_status=repl_status.FAILOVER_ERROR) if exception_encountered: LOG.error( _LE("Error encountered during failover on host: " "%(host)s invalid target ID %(backend_id)s"), {'host': self.host, 'backend_id': secondary_backend_id}) + self.finish_failover(context, service, updates) return if secondary_backend_id == "default": - service.replication_status = fields.ReplicationStatus.ENABLED - service.active_backend_id = "" - if service.frozen: - service.disabled = True - service.disabled_reason = "frozen" - else: - service.disabled = False - service.disabled_reason = "" - service.save() - + updates['replication_status'] = repl_status.ENABLED + updates['active_backend_id'] = '' + updates['disabled'] = service.frozen + updates['disabled_reason'] = 'frozen' if service.frozen else '' else: - service.replication_status = fields.ReplicationStatus.FAILED_OVER - service.active_backend_id = active_backend_id - service.disabled = True - service.disabled_reason = "failed-over" - service.save() + updates['replication_status'] = repl_status.FAILED_OVER + updates['active_backend_id'] = active_backend_id + updates['disabled'] = True + updates['disabled_reason'] = 'failed-over' + + self.finish_failover(context, service, updates) for update in volume_update_list: # Response must include an id key: {volume_id: } @@ -4259,6 +4251,53 @@ class VolumeManager(manager.CleanableManager, LOG.info(_LI("Failed over to replication target successfully.")) + # TODO(geguileo): In P - remove this + failover_host = failover + + def finish_failover(self, context, service, updates): + """Completion of the failover locally or via RPC.""" + # If the service is clustered, broadcast the service changes to all + # volume services, including this one. + if service.is_clustered: + # We have to update the cluster with the same data, and we do it + # before broadcasting the failover_completed RPC call to prevent + # races with services that may be starting.. + for key, value in updates.items(): + setattr(service.cluster, key, value) + service.cluster.save() + rpcapi = volume_rpcapi.VolumeAPI() + rpcapi.failover_completed(context, service, updates) + else: + service.update(updates) + service.save() + + def failover_completed(self, context, updates): + """Finalize failover of this backend. + + When a service is clustered and replicated the failover has 2 stages, + one that does the failover of the volumes and another that finalizes + the failover of the services themselves. + + This method takes care of the last part and is called from the service + doing the failover of the volumes after finished processing the + volumes. + """ + svc_host = vol_utils.extract_host(self.host, 'backend') + service = objects.Service.get_by_args(context, svc_host, + constants.VOLUME_BINARY) + service.update(updates) + try: + self.driver.failover_completed(context, service.active_backend_id) + except Exception: + msg = _('Driver reported error during replication failover ' + 'completion.') + LOG.exception(msg) + service.disabled = True + service.disabled_reason = msg + service.replication_status = ( + fields.ReplicationStatus.ERROR) + service.save() + def freeze_host(self, context): """Freeze management plane on this backend. diff --git a/cinder/volume/rpcapi.py b/cinder/volume/rpcapi.py index 3bb0dd7a692..f7c8135ff1e 100644 --- a/cinder/volume/rpcapi.py +++ b/cinder/volume/rpcapi.py @@ -122,9 +122,10 @@ class VolumeAPI(rpc.RPCAPI): of @backend suffixes in server names. 3.7 - Adds do_cleanup method to do volume cleanups from other nodes that we were doing in init_host. + 3.8 - Make failover_host cluster aware and add failover_completed. """ - RPC_API_VERSION = '3.7' + RPC_API_VERSION = '3.8' RPC_DEFAULT_VERSION = '3.0' TOPIC = constants.VOLUME_TOPIC BINARY = 'cinder-volume' @@ -310,21 +311,31 @@ class VolumeAPI(rpc.RPCAPI): new_volume=new_volume, volume_status=original_volume_status) - def freeze_host(self, ctxt, host): + def freeze_host(self, ctxt, service): """Set backend host to frozen.""" - cctxt = self._get_cctxt(host) + cctxt = self._get_cctxt(service.service_topic_queue) return cctxt.call(ctxt, 'freeze_host') - def thaw_host(self, ctxt, host): + def thaw_host(self, ctxt, service): """Clear the frozen setting on a backend host.""" - cctxt = self._get_cctxt(host) + cctxt = self._get_cctxt(service.service_topic_queue) return cctxt.call(ctxt, 'thaw_host') - def failover_host(self, ctxt, host, secondary_backend_id=None): - """Failover host to the specified backend_id (secondary).""" - cctxt = self._get_cctxt(host) - cctxt.cast(ctxt, 'failover_host', - secondary_backend_id=secondary_backend_id) + def failover(self, ctxt, service, secondary_backend_id=None): + """Failover host to the specified backend_id (secondary). """ + version = '3.8' + method = 'failover' + if not self.client.can_send_version(version): + version = '3.0' + method = 'failover_host' + cctxt = self._get_cctxt(service.service_topic_queue, version) + cctxt.cast(ctxt, method, secondary_backend_id=secondary_backend_id) + + def failover_completed(self, ctxt, service, updates): + """Complete failover on all services of the cluster.""" + cctxt = self._get_cctxt(service.service_topic_queue, '3.8', + fanout=True) + cctxt.cast(ctxt, 'failover_completed', updates=updates) def manage_existing_snapshot(self, ctxt, snapshot, ref, backend): cctxt = self._get_cctxt(backend)