Make Replication support Active-Active

This patch adds new methods to our failover mechanism to allow failover
to work when a backend is clustered.

Adds REST API microversion 3.26, which introduces a new `failover` action
equivalent to `failover_host` but accepting a `cluster` field as well as
the `host` field.

The freeze and thaw actions are updated to act on the cluster and on all
services within the cluster.

Cluster listings now accept the new filtering fields `replication_status`,
`frozen`, and `active_backend_id`.

Summary listings return the `replication_status` field, and detailed listings
also return `frozen` and `active_backend_id`.
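
As an illustration only (not part of the patch), a minimal sketch of calling the
new action over the REST API; the endpoint, token, cluster name and backend id
below are placeholders:

    # Fail over every service in a cluster (mutually exclusive with 'host').
    import requests

    url = 'http://controller:8776/v3/<project_id>/os-services/failover'
    headers = {'X-Auth-Token': '<token>',
               'OpenStack-API-Version': 'volume 3.26'}
    body = {'cluster': 'mycluster@rbd', 'backend_id': 'secondary'}

    response = requests.put(url, json=body, headers=headers)
    print(response.status_code)  # 202 is expected when the failover is accepted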

Specs: https://review.openstack.org/401392

APIImpact: New service failover action and new fields in cluster listings.
Implements: blueprint cinder-volume-active-active-support
Change-Id: Id3291b28242d5814c259283fa629b48f22e70260
Author: Gorka Eguileor, 2016-10-03 21:27:53 +02:00
Commit: b4a13281ea (parent: 02389a1d2a)
28 changed files with 1140 additions and 224 deletions


@@ -372,8 +372,23 @@ class ViewBuilder(object):
         return urllib.parse.urlunsplit(url_parts).rstrip('/')


-def get_cluster_host(req, params, cluster_version):
-    if req.api_version_request.matches(cluster_version):
+def get_cluster_host(req, params, cluster_version=None):
+    """Get cluster and host from the parameters.
+
+    This method checks the presence of cluster and host parameters and returns
+    them depending on the cluster_version.
+
+    If cluster_version is False we will never return the cluster_name and we
+    will require the presence of the host parameter.
+
+    If cluster_version is None we will always check for the presence of the
+    cluster parameter, and if cluster_version is a string with a version we
+    will only check for the presence of the parameter if the version of the
+    request is not less than it. In both cases we will require one and only
+    one parameter, host or cluster.
+    """
+    if (cluster_version is not False and
+            req.api_version_request.matches(cluster_version)):
         cluster_name = params.get('cluster')
         msg = _('One and only one of cluster and host must be set.')
     else:
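
A minimal, test-style sketch of the behaviour described in the docstring above;
the fake request helper and the expected return values are assumptions based on
how the services controller uses this function:

    import mock

    from cinder.api import common

    def fake_req(matches):
        # Stand-in for a WSGI request whose api_version_request.matches()
        # returns the given boolean.
        api_version = mock.Mock(matches=mock.Mock(return_value=matches))
        return mock.Mock(api_version_request=api_version)

    # cluster_version=False: the cluster parameter is never honoured.
    assert common.get_cluster_host(
        fake_req(True), {'host': 'node1@lvm'}, False) == (None, 'node1@lvm')

    # With a version string the cluster parameter is honoured only when the
    # request microversion matches it (here: >= 3.26).
    assert common.get_cluster_host(
        fake_req(True), {'cluster': 'c1@lvm'}, '3.26') == ('c1@lvm', None)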


@@ -20,6 +20,7 @@ from oslo_log import versionutils
 from oslo_utils import timeutils
 import webob.exc

+from cinder.api import common
 from cinder.api import extensions
 from cinder.api.openstack import wsgi
 from cinder import exception
@@ -110,20 +111,23 @@ class ServiceController(wsgi.Controller):
         return True

-    def _freeze(self, context, host):
-        return self.volume_api.freeze_host(context, host)
+    def _freeze(self, context, req, body):
+        cluster_name, host = common.get_cluster_host(req, body, '3.26')
+        return self.volume_api.freeze_host(context, host, cluster_name)

-    def _thaw(self, context, host):
-        return self.volume_api.thaw_host(context, host)
+    def _thaw(self, context, req, body):
+        cluster_name, host = common.get_cluster_host(req, body, '3.26')
+        return self.volume_api.thaw_host(context, host, cluster_name)

-    def _failover(self, context, host, backend_id=None):
-        return self.volume_api.failover_host(context, host, backend_id)
-
-    def _get_host(self, body):
-        try:
-            return body['host']
-        except (TypeError, KeyError):
-            raise exception.MissingRequired(element='host')
+    def _failover(self, context, req, body, clustered):
+        # We set version to None to always get the cluster name from the body,
+        # to False when we don't want to get it, and '3.26' when we only want
+        # it if the requested version is 3.26 or higher.
+        version = '3.26' if clustered else False
+        cluster_name, host = common.get_cluster_host(req, body, version)
+        self.volume_api.failover(context, host, cluster_name,
+                                 body.get('backend_id'))
+        return webob.Response(status_int=202)

     def update(self, req, id, body):
         """Enable/Disable scheduling for a service.
@@ -148,20 +152,17 @@ class ServiceController(wsgi.Controller):
             disabled = True
             status = "disabled"
         elif id == "freeze":
-            return self._freeze(context, self._get_host(body))
+            return self._freeze(context, req, body)
         elif id == "thaw":
-            return self._thaw(context, self._get_host(body))
+            return self._thaw(context, req, body)
         elif id == "failover_host":
-            self._failover(
-                context,
-                self._get_host(body),
-                body.get('backend_id', None)
-            )
-            return webob.Response(status_int=202)
+            return self._failover(context, req, body, False)
+        elif req.api_version_request.matches('3.26') and id == 'failover':
+            return self._failover(context, req, body, True)
         else:
             raise exception.InvalidInput(reason=_("Unknown action"))

-        host = self._get_host(body)
+        host = common.get_cluster_host(req, body, False)[1]

         ret_val['disabled'] = disabled
         if id == "disable-log-reason" and ext_loaded:
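
For reference, a sketch of the request bodies the rewritten actions accept (all
values are placeholders); exactly one of 'host' or 'cluster' may be supplied,
and 'cluster' is only honoured from microversion 3.26 on:

    freeze_body = {'cluster': 'mycluster@rbd'}            # freeze, >= 3.26
    thaw_body = {'host': 'node1@rbd'}                     # thaw, any version
    failover_body = {'cluster': 'mycluster@rbd',          # new 'failover' action
                     'backend_id': 'secondary'}
    failover_host_body = {'host': 'node1@rbd',            # legacy 'failover_host'
                          'backend_id': 'secondary'}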


@@ -75,6 +75,8 @@ REST_API_VERSION_HISTORY = """
     * 3.23 - Allow passing force parameter to volume delete.
     * 3.24 - Add workers/cleanup endpoint.
     * 3.25 - Add ``volumes`` field to group list/detail and group show.
+    * 3.26 - Add failover action and cluster listings accept new filters and
+             return new data.
 """

 # The minimum and maximum versions of the API supported
@@ -82,7 +84,7 @@ REST_API_VERSION_HISTORY = """
 # minimum version of the API supported.
 # Explicitly using /v1 or /v2 enpoints will still work
 _MIN_API_VERSION = "3.0"
-_MAX_API_VERSION = "3.25"
+_MAX_API_VERSION = "3.26"
 _LEGACY_API_VERSION1 = "1.0"
 _LEGACY_API_VERSION2 = "2.0"


@@ -268,3 +268,15 @@ user documentation.
 3.25
 ----
   Add ``volumes`` field to group list/detail and group show.
+
+3.26
+----
+  - New ``failover`` action equivalent to ``failover_host``, but accepting
+    ``cluster`` parameter as well as the ``host`` parameter that
+    ``failover_host`` accepts.
+  - ``freeze`` and ``thaw`` actions accept ``cluster`` parameter.
+  - Cluster listing accepts ``replication_status``, ``frozen`` and
+    ``active_backend_id`` as filters, and returns additional fields for each
+    cluster: ``replication_status``, ``frozen``, ``active_backend_id``.
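
For illustration, a sketch of what one detailed cluster entry could look like at
microversion 3.26 (all values below are made up):

    {'name': 'mycluster@rbd',
     'binary': 'cinder-volume',
     'state': 'up',
     'status': 'enabled',
     'num_hosts': 2,
     'num_down_hosts': 0,
     'disabled_reason': None,
     'last_heartbeat': '2016-10-03T19:27:53.000000',
     'created_at': '2016-10-03T19:27:53.000000',
     'updated_at': '2016-10-03T19:27:53.000000',
     # New in 3.26:
     'replication_status': 'enabled',
     'frozen': False,
     'active_backend_id': None}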


@@ -22,11 +22,14 @@ from cinder import utils


 CLUSTER_MICRO_VERSION = '3.7'
+REPLICATION_DATA_MICRO_VERSION = '3.26'


 class ClusterController(wsgi.Controller):
     allowed_list_keys = {'name', 'binary', 'is_up', 'disabled', 'num_hosts',
-                         'num_down_hosts', 'binary'}
+                         'num_down_hosts', 'binary', 'replication_status',
+                         'frozen', 'active_backend_id'}
+    replication_fields = {'replication_status', 'frozen', 'active_backend_id'}

     policy_checker = wsgi.Controller.get_policy_checker('clusters')
@@ -38,7 +41,9 @@ class ClusterController(wsgi.Controller):
         # Let the wsgi middleware convert NotFound exceptions
         cluster = objects.Cluster.get_by_id(context, None, binary=binary,
                                             name=id, services_summary=True)
-        return clusters_view.ViewBuilder.detail(cluster)
+        replication_data = req.api_version_request.matches(
+            REPLICATION_DATA_MICRO_VERSION)
+        return clusters_view.ViewBuilder.detail(cluster, replication_data)

     @wsgi.Controller.api_version(CLUSTER_MICRO_VERSION)
     def index(self, req):
@@ -59,9 +64,12 @@ class ClusterController(wsgi.Controller):
     def _get_clusters(self, req, detail):
         # Let the wsgi middleware convert NotAuthorized exceptions
         context = self.policy_checker(req, 'get_all')
+        replication_data = req.api_version_request.matches(
+            REPLICATION_DATA_MICRO_VERSION)
         filters = dict(req.GET)
         allowed = self.allowed_list_keys
+        if not replication_data:
+            allowed = allowed.difference(self.replication_fields)

         # Check filters are valid
         if not allowed.issuperset(filters):
@@ -78,7 +86,8 @@ class ClusterController(wsgi.Controller):
         filters['services_summary'] = detail
         clusters = objects.ClusterList.get_all(context, **filters)
-        return clusters_view.ViewBuilder.list(clusters, detail)
+        return clusters_view.ViewBuilder.list(clusters, detail,
+                                              replication_data)

     @wsgi.Controller.api_version(CLUSTER_MICRO_VERSION)
     def update(self, req, id, body):
@@ -113,7 +122,9 @@ class ClusterController(wsgi.Controller):
             cluster.save()

         # We return summary data plus the disabled reason
-        ret_val = clusters_view.ViewBuilder.summary(cluster)
+        replication_data = req.api_version_request.matches(
+            REPLICATION_DATA_MICRO_VERSION)
+        ret_val = clusters_view.ViewBuilder.summary(cluster, replication_data)
         ret_val['cluster']['disabled_reason'] = disabled_reason

         return ret_val


@@ -28,7 +28,7 @@ class ViewBuilder(object):
         return ''

     @classmethod
-    def detail(cls, cluster, flat=False):
+    def detail(cls, cluster, replication_data=False, flat=False):
         """Detailed view of a cluster."""
         result = cls.summary(cluster, flat=True)
         result.update(
@@ -37,27 +37,36 @@ class ViewBuilder(object):
             last_heartbeat=cls._normalize(cluster.last_heartbeat),
             created_at=cls._normalize(cluster.created_at),
             updated_at=cls._normalize(cluster.updated_at),
-            disabled_reason=cluster.disabled_reason
+            disabled_reason=cluster.disabled_reason,
+            replication_status=cluster.replication_status,
+            frozen=cluster.frozen,
+            active_backend_id=cluster.active_backend_id,
         )

+        if not replication_data:
+            for field in ('replication_status', 'frozen', 'active_backend_id'):
+                del result[field]

         if flat:
             return result
         return {'cluster': result}

     @staticmethod
-    def summary(cluster, flat=False):
+    def summary(cluster, replication_data=False, flat=False):
         """Generic, non-detailed view of a cluster."""
         result = {
             'name': cluster.name,
             'binary': cluster.binary,
             'state': 'up' if cluster.is_up else 'down',
             'status': 'disabled' if cluster.disabled else 'enabled',
+            'replication_status': cluster.replication_status,
         }
+        if not replication_data:
+            del result['replication_status']
         if flat:
             return result
         return {'cluster': result}

     @classmethod
-    def list(cls, clusters, detail=False):
+    def list(cls, clusters, detail=False, replication_data=False):
         func = cls.detail if detail else cls.summary
-        return {'clusters': [func(n, flat=True) for n in clusters]}
+        return {'clusters': [func(n, replication_data, flat=True)
+                             for n in clusters]}


@ -0,0 +1,33 @@
# Copyright (c) 2016 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from sqlalchemy import Boolean, Column, MetaData, String, Table, text
def upgrade(migrate_engine):
"""Add replication info to clusters table."""
meta = MetaData()
meta.bind = migrate_engine
clusters = Table('clusters', meta, autoload=True)
replication_status = Column('replication_status', String(length=36),
default="not-capable")
active_backend_id = Column('active_backend_id', String(length=255))
frozen = Column('frozen', Boolean, nullable=False, default=False,
server_default=text('false'))
clusters.create_column(replication_status)
clusters.create_column(frozen)
clusters.create_column(active_backend_id)


@@ -122,6 +122,10 @@ class Cluster(BASE, CinderBase):
     disabled_reason = Column(String(255))
     race_preventer = Column(Integer, nullable=False, default=0)

+    replication_status = Column(String(36), default="not-capable")
+    active_backend_id = Column(String(255))
+    frozen = Column(Boolean, nullable=False, default=False)
+
     # Last heartbeat reported by any of the services of this cluster. This is
     # not deferred since we always want to load this field.
     last_heartbeat = column_property(


@@ -422,12 +422,13 @@ class ImageNotFound(NotFound):
 class ServiceNotFound(NotFound):

     def __init__(self, message=None, **kwargs):
-        if kwargs.get('host', None):
-            self.message = _("Service %(service_id)s could not be "
-                             "found on host %(host)s.")
-        else:
-            self.message = _("Service %(service_id)s could not be found.")
-        super(ServiceNotFound, self).__init__(None, **kwargs)
+        if not message:
+            if kwargs.get('host', None):
+                self.message = _("Service %(service_id)s could not be "
+                                 "found on host %(host)s.")
+            else:
+                self.message = _("Service %(service_id)s could not be found.")
+        super(ServiceNotFound, self).__init__(message, **kwargs)


 class ServiceTooOld(Invalid):


@@ -124,6 +124,7 @@ OBJ_VERSIONS.add('1.16', {'BackupDeviceInfo': '1.0'})
 OBJ_VERSIONS.add('1.17', {'VolumeAttachment': '1.1'})
 OBJ_VERSIONS.add('1.18', {'Snapshot': '1.3'})
 OBJ_VERSIONS.add('1.19', {'ConsistencyGroup': '1.4', 'CGSnapshot': '1.1'})
+OBJ_VERSIONS.add('1.20', {'Cluster': '1.1'})


 class CinderObjectRegistry(base.VersionedObjectRegistry):


@@ -20,6 +20,7 @@ from cinder import exception
 from cinder.i18n import _
 from cinder import objects
 from cinder.objects import base
+from cinder.objects import fields as c_fields
 from cinder import utils
@@ -37,7 +38,8 @@ class Cluster(base.CinderPersistentObject, base.CinderObject,
     - Any other cluster field will be used as a filter.
     """
     # Version 1.0: Initial version
-    VERSION = '1.0'
+    # Version 1.1: Add replication fields
+    VERSION = '1.1'
     OPTIONAL_FIELDS = ('num_hosts', 'num_down_hosts', 'services')

     # NOTE(geguileo): We don't want to expose race_preventer field at the OVO
@@ -54,8 +56,24 @@ class Cluster(base.CinderPersistentObject, base.CinderObject,
         'last_heartbeat': fields.DateTimeField(nullable=True, read_only=True),
         'services': fields.ObjectField('ServiceList', nullable=True,
                                        read_only=True),
+        # Replication properties
+        'replication_status': c_fields.ReplicationStatusField(nullable=True),
+        'frozen': fields.BooleanField(default=False),
+        'active_backend_id': fields.StringField(nullable=True),
     }

+    def obj_make_compatible(self, primitive, target_version):
+        """Make a cluster representation compatible with a target version."""
+        # Convert all related objects
+        super(Cluster, self).obj_make_compatible(primitive, target_version)
+
+        # Before v1.1 we didn't have replication fields so we have to remove
+        # them.
+        if target_version == '1.0':
+            for obj_field in ('replication_status', 'frozen',
+                              'active_backend_id'):
+                primitive.pop(obj_field, None)
+
     @classmethod
     def _get_expected_attrs(cls, context, *args, **kwargs):
         """Return expected attributes when getting a cluster.


@@ -44,6 +44,7 @@ from cinder import exception
 from cinder.i18n import _, _LE, _LI, _LW
 from cinder import objects
 from cinder.objects import base as objects_base
+from cinder.objects import fields
 from cinder import rpc
 from cinder.scheduler import rpcapi as scheduler_rpcapi
 from cinder import version
@@ -187,7 +188,7 @@ class Service(service.Service):
             if self.added_to_cluster:
                 # We pass copy service's disable status in the cluster if we
                 # have to create it.
-                self._ensure_cluster_exists(ctxt, service_ref.disabled)
+                self._ensure_cluster_exists(ctxt, service_ref)
                 service_ref.cluster_name = cluster
                 service_ref.save()
                 Service.service_id = service_ref.id
@@ -272,7 +273,9 @@ class Service(service.Service):
                           '%(version)s)'),
                       {'topic': self.topic, 'version': version_string,
                        'cluster': self.cluster})
-            target = messaging.Target(topic=self.topic, server=self.cluster)
+            target = messaging.Target(
+                topic='%s.%s' % (self.topic, self.cluster),
+                server=vol_utils.extract_host(self.cluster, 'host'))
             serializer = objects_base.CinderObjectSerializer(obj_version_cap)
             self.cluster_rpcserver = rpc.get_server(target, endpoints,
                                                     serializer)
@@ -316,17 +319,34 @@ class Service(service.Service):
                        'new_down_time': new_down_time})
             CONF.set_override('service_down_time', new_down_time)

-    def _ensure_cluster_exists(self, context, disabled=None):
+    def _ensure_cluster_exists(self, context, service):
         if self.cluster:
             try:
-                objects.Cluster.get_by_id(context, None, name=self.cluster,
-                                          binary=self.binary)
+                cluster = objects.Cluster.get_by_id(context, None,
+                                                    name=self.cluster,
+                                                    binary=self.binary)
+
+                # If the cluster already exists, then the service replication
+                # fields must match those of the cluster unless the service
+                # is in error status.
+                error_states = (fields.ReplicationStatus.ERROR,
+                                fields.ReplicationStatus.FAILOVER_ERROR)
+                if service.replication_status not in error_states:
+                    for attr in ('replication_status', 'active_backend_id',
+                                 'frozen'):
+                        if getattr(service, attr) != getattr(cluster, attr):
+                            setattr(service, attr, getattr(cluster, attr))
             except exception.ClusterNotFound:
-                cluster = objects.Cluster(context=context, name=self.cluster,
-                                          binary=self.binary)
-                # If disabled has been specified overwrite default value
-                if disabled is not None:
-                    cluster.disabled = disabled
+                # Since the cluster didn't exist, we copy replication fields
+                # from the service.
+                cluster = objects.Cluster(
+                    context=context,
+                    name=self.cluster,
+                    binary=self.binary,
+                    disabled=service.disabled,
+                    replication_status=service.replication_status,
+                    active_backend_id=service.active_backend_id,
+                    frozen=service.frozen)
                 try:
                     cluster.create()
@@ -355,7 +375,10 @@ class Service(service.Service):
         Service.service_id = service_ref.id
         # TODO(geguileo): In O unconditionally ensure that the cluster exists
         if not self.is_upgrading_to_n:
-            self._ensure_cluster_exists(context)
+            self._ensure_cluster_exists(context, service_ref)
+            # If we have updated the service_ref with replication data from
+            # the cluster it will be saved.
+            service_ref.save()

     def __getattr__(self, key):
         manager = self.__dict__.get('manager', None)
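
A minimal sketch (the cluster name is hypothetical) of the cluster RPC target
built in the hunk above: every service in a cluster listens on a shared
"<topic>.<cluster>" topic, and the server name is just the host portion of the
cluster name:

    import oslo_messaging as messaging

    from cinder.volume import utils as vol_utils

    cluster = 'mycluster@rbd'
    target = messaging.Target(topic='cinder-volume.%s' % cluster,
                              server=vol_utils.extract_host(cluster, 'host'))
    # -> topic='cinder-volume.mycluster@rbd', server='mycluster'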


@ -255,6 +255,36 @@ class ServicesTest(test.TestCase):
]} ]}
self.assertEqual(response, res_dict) self.assertEqual(response, res_dict)
def test_failover_old_version(self):
req = FakeRequest(version='3.18')
self.assertRaises(exception.InvalidInput, self.controller.update, req,
'failover', {'cluster': 'cluster1'})
def test_failover_no_values(self):
req = FakeRequest(version='3.26')
self.assertRaises(exception.InvalidInput, self.controller.update, req,
'failover', {'backend_id': 'replica1'})
@ddt.data({'host': 'hostname'}, {'cluster': 'mycluster'})
@mock.patch('cinder.volume.api.API.failover')
def test_failover(self, body, failover_mock):
req = FakeRequest(version='3.26')
body['backend_id'] = 'replica1'
res = self.controller.update(req, 'failover', body)
self.assertEqual(202, res.status_code)
failover_mock.assert_called_once_with(req.environ['cinder.context'],
body.get('host'),
body.get('cluster'), 'replica1')
@ddt.data({}, {'host': 'hostname', 'cluster': 'mycluster'})
@mock.patch('cinder.volume.api.API.failover')
def test_failover_invalid_input(self, body, failover_mock):
req = FakeRequest(version='3.26')
body['backend_id'] = 'replica1'
self.assertRaises(exception.InvalidInput,
self.controller.update, req, 'failover', body)
failover_mock.assert_not_called()
def test_services_list_with_cluster_name(self): def test_services_list_with_cluster_name(self):
req = FakeRequest(version='3.7') req = FakeRequest(version='3.7')
res_dict = self.controller.index(req) res_dict = self.controller.index(req)
@ -756,11 +786,12 @@ class ServicesTest(test.TestCase):
req = fakes.HTTPRequest.blank(url) req = fakes.HTTPRequest.blank(url)
body = {'host': mock.sentinel.host, body = {'host': mock.sentinel.host,
'backend_id': mock.sentinel.backend_id} 'backend_id': mock.sentinel.backend_id}
with mock.patch.object(self.controller.volume_api, 'failover_host') \ with mock.patch.object(self.controller.volume_api, 'failover') \
as failover_mock: as failover_mock:
res = self.controller.update(req, 'failover_host', body) res = self.controller.update(req, 'failover_host', body)
failover_mock.assert_called_once_with(req.environ['cinder.context'], failover_mock.assert_called_once_with(req.environ['cinder.context'],
mock.sentinel.host, mock.sentinel.host,
None,
mock.sentinel.backend_id) mock.sentinel.backend_id)
self.assertEqual(202, res.status_code) self.assertEqual(202, res.status_code)
@ -772,7 +803,7 @@ class ServicesTest(test.TestCase):
as freeze_mock: as freeze_mock:
res = self.controller.update(req, 'freeze', body) res = self.controller.update(req, 'freeze', body)
freeze_mock.assert_called_once_with(req.environ['cinder.context'], freeze_mock.assert_called_once_with(req.environ['cinder.context'],
mock.sentinel.host) mock.sentinel.host, None)
self.assertEqual(freeze_mock.return_value, res) self.assertEqual(freeze_mock.return_value, res)
def test_services_thaw(self): def test_services_thaw(self):
@ -783,12 +814,12 @@ class ServicesTest(test.TestCase):
as thaw_mock: as thaw_mock:
res = self.controller.update(req, 'thaw', body) res = self.controller.update(req, 'thaw', body)
thaw_mock.assert_called_once_with(req.environ['cinder.context'], thaw_mock.assert_called_once_with(req.environ['cinder.context'],
mock.sentinel.host) mock.sentinel.host, None)
self.assertEqual(thaw_mock.return_value, res) self.assertEqual(thaw_mock.return_value, res)
@ddt.data('freeze', 'thaw', 'failover_host') @ddt.data('freeze', 'thaw', 'failover_host')
def test_services_replication_calls_no_host(self, method): def test_services_replication_calls_no_host(self, method):
url = '/v2/%s/os-services/%s' % (fake.PROJECT_ID, method) url = '/v2/%s/os-services/%s' % (fake.PROJECT_ID, method)
req = fakes.HTTPRequest.blank(url) req = fakes.HTTPRequest.blank(url)
self.assertRaises(exception.MissingRequired, self.assertRaises(exception.InvalidInput,
self.controller.update, req, method, {}) self.controller.update, req, method, {})


@ -18,6 +18,7 @@ import datetime
import ddt import ddt
from iso8601 import iso8601 from iso8601 import iso8601
import mock import mock
from oslo_utils import versionutils
from cinder.api import extensions from cinder.api import extensions
from cinder.api.openstack import api_version_request as api_version from cinder.api.openstack import api_version_request as api_version
@ -31,11 +32,17 @@ from cinder.tests.unit import fake_cluster
CLUSTERS = [ CLUSTERS = [
fake_cluster.fake_db_cluster( fake_cluster.fake_db_cluster(
id=1, id=1,
replication_status='error',
frozen=False,
active_backend_id='replication1',
last_heartbeat=datetime.datetime(2016, 6, 1, 2, 46, 28), last_heartbeat=datetime.datetime(2016, 6, 1, 2, 46, 28),
updated_at=datetime.datetime(2016, 6, 1, 2, 46, 28), updated_at=datetime.datetime(2016, 6, 1, 2, 46, 28),
created_at=datetime.datetime(2016, 6, 1, 2, 46, 28)), created_at=datetime.datetime(2016, 6, 1, 2, 46, 28)),
fake_cluster.fake_db_cluster( fake_cluster.fake_db_cluster(
id=2, name='cluster2', num_hosts=2, num_down_hosts=1, disabled=True, id=2, name='cluster2', num_hosts=2, num_down_hosts=1, disabled=True,
replication_status='error',
frozen=True,
active_backend_id='replication2',
updated_at=datetime.datetime(2016, 6, 1, 1, 46, 28), updated_at=datetime.datetime(2016, 6, 1, 1, 46, 28),
created_at=datetime.datetime(2016, 6, 1, 1, 46, 28)) created_at=datetime.datetime(2016, 6, 1, 1, 46, 28))
] ]
@ -51,6 +58,9 @@ EXPECTED = [{'created_at': datetime.datetime(2016, 6, 1, 2, 46, 28),
'num_hosts': 0, 'num_hosts': 0,
'state': 'up', 'state': 'up',
'status': 'enabled', 'status': 'enabled',
'replication_status': 'error',
'frozen': False,
'active_backend_id': 'replication1',
'updated_at': datetime.datetime(2016, 6, 1, 2, 46, 28)}, 'updated_at': datetime.datetime(2016, 6, 1, 2, 46, 28)},
{'created_at': datetime.datetime(2016, 6, 1, 1, 46, 28), {'created_at': datetime.datetime(2016, 6, 1, 1, 46, 28),
'disabled_reason': None, 'disabled_reason': None,
@ -61,6 +71,9 @@ EXPECTED = [{'created_at': datetime.datetime(2016, 6, 1, 2, 46, 28),
'num_hosts': 2, 'num_hosts': 2,
'state': 'down', 'state': 'down',
'status': 'disabled', 'status': 'disabled',
'replication_status': 'error',
'frozen': True,
'active_backend_id': 'replication2',
'updated_at': datetime.datetime(2016, 6, 1, 1, 46, 28)}] 'updated_at': datetime.datetime(2016, 6, 1, 1, 46, 28)}]
@ -92,6 +105,21 @@ class ClustersTestCase(test.TestCase):
{'is_up': True, 'disabled': False, 'num_hosts': 2, {'is_up': True, 'disabled': False, 'num_hosts': 2,
'num_down_hosts': 1, 'binary': 'cinder-volume'}) 'num_down_hosts': 1, 'binary': 'cinder-volume'})
REPLICATION_FILTERS = ({'replication_status': 'error'}, {'frozen': True},
{'active_backend_id': 'replication'})
def _get_expected(self, version='3.8'):
if versionutils.convert_version_to_tuple(version) >= (3, 19):
return EXPECTED
expect = []
for cluster in EXPECTED:
cluster = cluster.copy()
for key in ('replication_status', 'frozen', 'active_backend_id'):
cluster.pop(key)
expect.append(cluster)
return expect
def setUp(self): def setUp(self):
super(ClustersTestCase, self).setUp() super(ClustersTestCase, self).setUp()
@ -101,8 +129,10 @@ class ClustersTestCase(test.TestCase):
self.controller = clusters.ClusterController(self.ext_mgr) self.controller = clusters.ClusterController(self.ext_mgr)
@mock.patch('cinder.db.cluster_get_all', return_value=CLUSTERS_ORM) @mock.patch('cinder.db.cluster_get_all', return_value=CLUSTERS_ORM)
def _test_list(self, get_all_mock, detailed, filters, expected=None): def _test_list(self, get_all_mock, detailed, filters=None, expected=None,
req = FakeRequest(**filters) version='3.8'):
filters = filters or {}
req = FakeRequest(version=version, **filters)
method = getattr(self.controller, 'detail' if detailed else 'index') method = getattr(self.controller, 'detail' if detailed else 'index')
clusters = method(req) clusters = method(req)
@ -119,7 +149,7 @@ class ClustersTestCase(test.TestCase):
@ddt.data(*LIST_FILTERS) @ddt.data(*LIST_FILTERS)
def test_index_detail(self, filters): def test_index_detail(self, filters):
"""Verify that we get all clusters with detailed data.""" """Verify that we get all clusters with detailed data."""
expected = {'clusters': EXPECTED} expected = {'clusters': self._get_expected()}
self._test_list(detailed=True, filters=filters, expected=expected) self._test_list(detailed=True, filters=filters, expected=expected)
@ddt.data(*LIST_FILTERS) @ddt.data(*LIST_FILTERS)
@ -135,6 +165,16 @@ class ClustersTestCase(test.TestCase):
'status': 'disabled'}]} 'status': 'disabled'}]}
self._test_list(detailed=False, filters=filters, expected=expected) self._test_list(detailed=False, filters=filters, expected=expected)
@ddt.data(*REPLICATION_FILTERS)
def test_index_detail_fail_old(self, filters):
self.assertRaises(exception.InvalidInput, self._test_list,
detailed=True, filters=filters)
@ddt.data(*REPLICATION_FILTERS)
def test_index_summary_fail_old(self, filters):
self.assertRaises(exception.InvalidInput, self._test_list,
detailed=False, filters=filters)
@ddt.data(True, False) @ddt.data(True, False)
def test_index_unauthorized(self, detailed): def test_index_unauthorized(self, detailed):
"""Verify that unauthorized user can't list clusters.""" """Verify that unauthorized user can't list clusters."""
@ -147,13 +187,35 @@ class ClustersTestCase(test.TestCase):
"""Verify the wrong version so that user can't list clusters.""" """Verify the wrong version so that user can't list clusters."""
self.assertRaises(exception.VersionNotFoundForAPIMethod, self.assertRaises(exception.VersionNotFoundForAPIMethod,
self._test_list, detailed=detailed, self._test_list, detailed=detailed,
filters={'version': '3.5'}) version='3.6')
@ddt.data(*REPLICATION_FILTERS)
def test_index_detail_replication_new_fields(self, filters):
version = '3.26'
expected = {'clusters': self._get_expected(version)}
self._test_list(detailed=True, filters=filters, expected=expected,
version=version)
@ddt.data(*REPLICATION_FILTERS)
def test_index_summary_replication_new_fields(self, filters):
expected = {'clusters': [{'name': 'cluster_name',
'binary': 'cinder-volume',
'state': 'up',
'replication_status': 'error',
'status': 'enabled'},
{'name': 'cluster2',
'binary': 'cinder-volume',
'state': 'down',
'replication_status': 'error',
'status': 'disabled'}]}
self._test_list(detailed=False, filters=filters, expected=expected,
version='3.26')
@mock.patch('cinder.db.sqlalchemy.api.cluster_get', @mock.patch('cinder.db.sqlalchemy.api.cluster_get',
return_value=CLUSTERS_ORM[0]) return_value=CLUSTERS_ORM[0])
def test_show(self, get_mock): def test_show(self, get_mock):
req = FakeRequest() req = FakeRequest()
expected = {'cluster': EXPECTED[0]} expected = {'cluster': self._get_expected()[0]}
cluster = self.controller.show(req, mock.sentinel.name, cluster = self.controller.show(req, mock.sentinel.name,
mock.sentinel.binary) mock.sentinel.binary)
self.assertEqual(expected, cluster) self.assertEqual(expected, cluster)


@ -13,6 +13,7 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import ddt
import mock import mock
from oslo_utils import timeutils from oslo_utils import timeutils
@ -40,6 +41,7 @@ def _get_filters_sentinel():
'num_down_hosts': mock.sentinel.num_down_hosts} 'num_down_hosts': mock.sentinel.num_down_hosts}
@ddt.ddt
class TestCluster(test_objects.BaseObjectsTestCase): class TestCluster(test_objects.BaseObjectsTestCase):
"""Test Cluster Versioned Object methods.""" """Test Cluster Versioned Object methods."""
cluster = fake_cluster.fake_cluster_orm() cluster = fake_cluster.fake_cluster_orm()
@ -111,6 +113,19 @@ class TestCluster(test_objects.BaseObjectsTestCase):
last_heartbeat=expired_time) last_heartbeat=expired_time)
self.assertFalse(cluster.is_up) self.assertFalse(cluster.is_up)
@ddt.data('1.0', '1.1')
def tests_obj_make_compatible(self, version):
new_fields = {'replication_status': 'error', 'frozen': True,
'active_backend_id': 'replication'}
cluster = objects.Cluster(self.context, **new_fields)
primitive = cluster.obj_to_primitive(version)
converted_cluster = objects.Cluster.obj_from_primitive(primitive)
for key, value in new_fields.items():
if version == '1.0':
self.assertFalse(converted_cluster.obj_attr_is_set(key))
else:
self.assertEqual(value, getattr(converted_cluster, key))
class TestClusterList(test_objects.BaseObjectsTestCase): class TestClusterList(test_objects.BaseObjectsTestCase):
"""Test ClusterList Versioned Object methods.""" """Test ClusterList Versioned Object methods."""


@ -28,7 +28,7 @@ object_data = {
'BackupImport': '1.4-c50f7a68bb4c400dd53dd219685b3992', 'BackupImport': '1.4-c50f7a68bb4c400dd53dd219685b3992',
'BackupList': '1.0-15ecf022a68ddbb8c2a6739cfc9f8f5e', 'BackupList': '1.0-15ecf022a68ddbb8c2a6739cfc9f8f5e',
'CleanupRequest': '1.0-e7c688b893e1d5537ccf65cc3eb10a28', 'CleanupRequest': '1.0-e7c688b893e1d5537ccf65cc3eb10a28',
'Cluster': '1.0-6f06e867c073e9d31722c53b0a9329b8', 'Cluster': '1.1-cdb1572b250837933d950cc6662313b8',
'ClusterList': '1.0-15ecf022a68ddbb8c2a6739cfc9f8f5e', 'ClusterList': '1.0-15ecf022a68ddbb8c2a6739cfc9f8f5e',
'CGSnapshot': '1.1-3212ac2b4c2811b7134fb9ba2c49ff74', 'CGSnapshot': '1.1-3212ac2b4c2811b7134fb9ba2c49ff74',
'CGSnapshotList': '1.0-15ecf022a68ddbb8c2a6739cfc9f8f5e', 'CGSnapshotList': '1.0-15ecf022a68ddbb8c2a6739cfc9f8f5e',


@ -1076,6 +1076,16 @@ class MigrationsMixin(test_migrations.WalkVersionsMixin):
self.VARCHAR_TYPE) self.VARCHAR_TYPE)
self.assertTrue(messages.c.request_id.nullable) self.assertTrue(messages.c.request_id.nullable)
def _check_088(self, engine, data):
"""Test adding replication data to cluster table."""
clusters = db_utils.get_table(engine, 'clusters')
self.assertIsInstance(clusters.c.replication_status.type,
self.VARCHAR_TYPE)
self.assertIsInstance(clusters.c.active_backend_id.type,
self.VARCHAR_TYPE)
self.assertIsInstance(clusters.c.frozen.type,
self.BOOL_TYPE)
def test_walk_versions(self): def test_walk_versions(self):
self.walk_versions(False, False) self.walk_versions(False, False)


@ -30,6 +30,7 @@ from cinder import db
from cinder import exception from cinder import exception
from cinder import manager from cinder import manager
from cinder import objects from cinder import objects
from cinder.objects import fields
from cinder import rpc from cinder import rpc
from cinder import service from cinder import service
from cinder import test from cinder import test
@ -437,8 +438,9 @@ class ServiceTestCase(test.TestCase):
if cluster and added_to_cluster: if cluster and added_to_cluster:
self.assertIsNotNone(app.cluster_rpcserver) self.assertIsNotNone(app.cluster_rpcserver)
expected_target_calls.append(mock.call(topic=self.topic, expected_target_calls.append(mock.call(
server=cluster)) topic=self.topic + '.' + cluster,
server=cluster.split('@')[0]))
expected_rpc_calls.extend(expected_rpc_calls[:]) expected_rpc_calls.extend(expected_rpc_calls[:])
# Check that we create message targets for host and cluster # Check that we create message targets for host and cluster
@ -465,12 +467,111 @@ class ServiceTestCase(test.TestCase):
get_min_obj_mock): get_min_obj_mock):
"""Test that with cluster we create the rpc service.""" """Test that with cluster we create the rpc service."""
get_min_obj_mock.return_value = obj_version get_min_obj_mock.return_value = obj_version
cluster = 'cluster' cluster = 'cluster@backend#pool'
self.host = 'host@backend#pool'
app = service.Service.create(host=self.host, binary='cinder-volume', app = service.Service.create(host=self.host, binary='cinder-volume',
cluster=cluster, topic=self.topic) cluster=cluster, topic=self.topic)
self._check_rpc_servers_and_init_host(app, obj_version != '1.3', self._check_rpc_servers_and_init_host(app, obj_version != '1.3',
cluster) cluster)
@mock.patch('cinder.service.Service.is_svc_upgrading_to_n',
mock.Mock(return_value=False))
@mock.patch('cinder.objects.Cluster.get_by_id')
def test_ensure_cluster_exists_no_cluster(self, get_mock):
app = service.Service.create(host=self.host,
binary=self.binary,
topic=self.topic)
svc = objects.Service.get_by_id(self.ctxt, app.service_id)
app._ensure_cluster_exists(self.ctxt, svc)
get_mock.assert_not_called()
self.assertEqual({}, svc.cinder_obj_get_changes())
@mock.patch('cinder.service.Service.is_svc_upgrading_to_n',
mock.Mock(return_value=False))
@mock.patch('cinder.objects.Cluster.get_by_id')
def test_ensure_cluster_exists_cluster_exists_non_replicated(self,
get_mock):
cluster = objects.Cluster(
name='cluster_name', active_backend_id=None, frozen=False,
replication_status=fields.ReplicationStatus.NOT_CAPABLE)
get_mock.return_value = cluster
app = service.Service.create(host=self.host,
binary=self.binary,
topic=self.topic)
svc = objects.Service.get_by_id(self.ctxt, app.service_id)
app.cluster = cluster.name
app._ensure_cluster_exists(self.ctxt, svc)
get_mock.assert_called_once_with(self.ctxt, None, name=cluster.name,
binary=app.binary)
self.assertEqual({}, svc.cinder_obj_get_changes())
@mock.patch('cinder.service.Service.is_svc_upgrading_to_n',
mock.Mock(return_value=False))
@mock.patch('cinder.objects.Cluster.get_by_id')
def test_ensure_cluster_exists_cluster_change(self, get_mock):
"""We copy replication fields from the cluster to the service."""
changes = dict(replication_status=fields.ReplicationStatus.FAILED_OVER,
active_backend_id='secondary',
frozen=True)
cluster = objects.Cluster(name='cluster_name', **changes)
get_mock.return_value = cluster
app = service.Service.create(host=self.host,
binary=self.binary,
topic=self.topic)
svc = objects.Service.get_by_id(self.ctxt, app.service_id)
app.cluster = cluster.name
app._ensure_cluster_exists(self.ctxt, svc)
get_mock.assert_called_once_with(self.ctxt, None, name=cluster.name,
binary=app.binary)
self.assertEqual(changes, svc.cinder_obj_get_changes())
@mock.patch('cinder.service.Service.is_svc_upgrading_to_n',
mock.Mock(return_value=False))
@mock.patch('cinder.objects.Cluster.get_by_id')
def test_ensure_cluster_exists_cluster_no_change(self, get_mock):
"""Don't copy replication fields from cluster if replication error."""
changes = dict(replication_status=fields.ReplicationStatus.FAILED_OVER,
active_backend_id='secondary',
frozen=True)
cluster = objects.Cluster(name='cluster_name', **changes)
get_mock.return_value = cluster
app = service.Service.create(host=self.host,
binary=self.binary,
topic=self.topic)
svc = objects.Service.get_by_id(self.ctxt, app.service_id)
svc.replication_status = fields.ReplicationStatus.ERROR
svc.obj_reset_changes()
app.cluster = cluster.name
app._ensure_cluster_exists(self.ctxt, svc)
get_mock.assert_called_once_with(self.ctxt, None, name=cluster.name,
binary=app.binary)
self.assertEqual({}, svc.cinder_obj_get_changes())
@mock.patch('cinder.service.Service.is_svc_upgrading_to_n',
mock.Mock(return_value=False))
def test_ensure_cluster_exists_cluster_create_replicated_and_non(self):
"""We use service replication fields to create the cluster."""
changes = dict(replication_status=fields.ReplicationStatus.FAILED_OVER,
active_backend_id='secondary',
frozen=True)
app = service.Service.create(host=self.host,
binary=self.binary,
topic=self.topic)
svc = objects.Service.get_by_id(self.ctxt, app.service_id)
for key, value in changes.items():
setattr(svc, key, value)
app.cluster = 'cluster_name'
app._ensure_cluster_exists(self.ctxt, svc)
cluster = objects.Cluster.get_by_id(self.ctxt, None, name=app.cluster)
for key, value in changes.items():
self.assertEqual(value, getattr(cluster, key))
class TestWSGIService(test.TestCase): class TestWSGIService(test.TestCase):


@ -40,6 +40,7 @@ from taskflow.engines.action_engine import engine
from cinder.api import common from cinder.api import common
from cinder.brick.local_dev import lvm as brick_lvm from cinder.brick.local_dev import lvm as brick_lvm
from cinder.common import constants
from cinder import context from cinder import context
from cinder import coordination from cinder import coordination
from cinder import db from cinder import db
@ -5711,107 +5712,356 @@ class VolumeMigrationTestCase(base.BaseVolumeTestCase):
self.assertRaises(exception.VolumeNotFound, volume.refresh) self.assertRaises(exception.VolumeNotFound, volume.refresh)
@ddt.ddt
class ReplicationTestCase(base.BaseVolumeTestCase): class ReplicationTestCase(base.BaseVolumeTestCase):
@mock.patch.object(volume_rpcapi.VolumeAPI, 'failover_host') @mock.patch('cinder.objects.Service.is_up', True)
@mock.patch.object(volume_rpcapi.VolumeAPI, 'failover')
@mock.patch.object(cinder.db, 'conditional_update') @mock.patch.object(cinder.db, 'conditional_update')
@mock.patch.object(cinder.db, 'service_get') @mock.patch.object(objects.ServiceList, 'get_all')
def test_failover_host(self, mock_db_args, mock_db_update, def test_failover(self, mock_get_all, mock_db_update, mock_failover):
mock_failover): """Test replication failover."""
"""Test replication failover_host."""
mock_db_args.return_value = fake_service.fake_service_obj( service = fake_service.fake_service_obj(self.context,
self.context, binary='cinder-volume')
binary='cinder-volume') mock_get_all.return_value = [service]
mock_db_update.return_value = {'replication_status': 'enabled'} mock_db_update.return_value = {'replication_status': 'enabled'}
volume_api = cinder.volume.api.API() volume_api = cinder.volume.api.API()
volume_api.failover_host(self.context, host=CONF.host) volume_api.failover(self.context, host=CONF.host, cluster_name=None)
mock_failover.assert_called_once_with(self.context, CONF.host, None) mock_failover.assert_called_once_with(self.context, service, None)
@mock.patch.object(volume_rpcapi.VolumeAPI, 'failover_host') @mock.patch.object(volume_rpcapi.VolumeAPI, 'failover')
@mock.patch.object(cinder.db, 'conditional_update') @mock.patch.object(cinder.db, 'conditional_update')
@mock.patch.object(cinder.db, 'service_get') @mock.patch.object(cinder.db, 'service_get_all')
def test_failover_host_unexpected_status(self, mock_db_args, def test_failover_unexpected_status(self, mock_db_get_all, mock_db_update,
mock_db_update, mock_failover):
mock_failover): """Test replication failover unxepected status."""
"""Test replication failover_host unxepected status."""
mock_db_args.return_value = fake_service.fake_service_obj( mock_db_get_all.return_value = [fake_service.fake_service_obj(
self.context, self.context,
binary='cinder-volume') binary='cinder-volume')]
mock_db_update.return_value = None mock_db_update.return_value = None
volume_api = cinder.volume.api.API() volume_api = cinder.volume.api.API()
self.assertRaises(exception.InvalidInput, self.assertRaises(exception.InvalidInput,
volume_api.failover_host, volume_api.failover,
self.context, self.context,
host=CONF.host) host=CONF.host,
cluster_name=None)
@mock.patch.object(volume_rpcapi.VolumeAPI, 'freeze_host') @mock.patch.object(volume_rpcapi.VolumeAPI, 'freeze_host')
@mock.patch.object(cinder.db, 'conditional_update') @mock.patch.object(cinder.db, 'conditional_update', return_value=1)
@mock.patch.object(cinder.db, 'service_get') @mock.patch.object(cinder.objects.ServiceList, 'get_all')
def test_freeze_host(self, mock_db_args, mock_db_update, def test_freeze_host(self, mock_get_all, mock_db_update,
mock_freeze): mock_freeze):
"""Test replication freeze_host.""" """Test replication freeze_host."""
mock_db_args.return_value = fake_service.fake_service_obj( service = fake_service.fake_service_obj(self.context,
self.context, binary='cinder-volume')
binary='cinder-volume') mock_get_all.return_value = [service]
mock_db_update.return_value = {'frozen': False} mock_freeze.return_value = True
volume_api = cinder.volume.api.API() volume_api = cinder.volume.api.API()
volume_api.freeze_host(self.context, host=CONF.host) volume_api.freeze_host(self.context, host=CONF.host, cluster_name=None)
mock_freeze.assert_called_once_with(self.context, CONF.host) mock_freeze.assert_called_once_with(self.context, service)
@mock.patch.object(volume_rpcapi.VolumeAPI, 'freeze_host') @mock.patch.object(volume_rpcapi.VolumeAPI, 'freeze_host')
@mock.patch.object(cinder.db, 'conditional_update') @mock.patch.object(cinder.db, 'conditional_update')
@mock.patch.object(cinder.db, 'service_get') @mock.patch.object(cinder.db, 'service_get_all')
def test_freeze_host_unexpected_status(self, mock_db_args, def test_freeze_host_unexpected_status(self, mock_get_all,
mock_db_update, mock_db_update,
mock_freeze): mock_freeze):
"""Test replication freeze_host unexpected status.""" """Test replication freeze_host unexpected status."""
mock_db_args.return_value = fake_service.fake_service_obj( mock_get_all.return_value = [fake_service.fake_service_obj(
self.context, self.context,
binary='cinder-volume') binary='cinder-volume')]
mock_db_update.return_value = None mock_db_update.return_value = None
volume_api = cinder.volume.api.API() volume_api = cinder.volume.api.API()
self.assertRaises(exception.InvalidInput, self.assertRaises(exception.InvalidInput,
volume_api.freeze_host, volume_api.freeze_host,
self.context, self.context,
host=CONF.host) host=CONF.host,
cluster_name=None)
@mock.patch.object(volume_rpcapi.VolumeAPI, 'thaw_host') @mock.patch.object(volume_rpcapi.VolumeAPI, 'thaw_host')
@mock.patch.object(cinder.db, 'conditional_update') @mock.patch.object(cinder.db, 'conditional_update', return_value=1)
@mock.patch.object(cinder.db, 'service_get') @mock.patch.object(cinder.objects.ServiceList, 'get_all')
def test_thaw_host(self, mock_db_args, mock_db_update, def test_thaw_host(self, mock_get_all, mock_db_update,
mock_thaw): mock_thaw):
"""Test replication thaw_host.""" """Test replication thaw_host."""
mock_db_args.return_value = fake_service.fake_service_obj( service = fake_service.fake_service_obj(self.context,
self.context, binary='cinder-volume')
binary='cinder-volume') mock_get_all.return_value = [service]
mock_db_update.return_value = {'frozen': True}
mock_thaw.return_value = True mock_thaw.return_value = True
volume_api = cinder.volume.api.API() volume_api = cinder.volume.api.API()
volume_api.thaw_host(self.context, host=CONF.host) volume_api.thaw_host(self.context, host=CONF.host, cluster_name=None)
mock_thaw.assert_called_once_with(self.context, CONF.host) mock_thaw.assert_called_once_with(self.context, service)
@mock.patch.object(volume_rpcapi.VolumeAPI, 'thaw_host') @mock.patch.object(volume_rpcapi.VolumeAPI, 'thaw_host')
@mock.patch.object(cinder.db, 'conditional_update') @mock.patch.object(cinder.db, 'conditional_update')
@mock.patch.object(cinder.db, 'service_get') @mock.patch.object(cinder.db, 'service_get_all')
def test_thaw_host_unexpected_status(self, mock_db_args, def test_thaw_host_unexpected_status(self, mock_get_all,
mock_db_update, mock_db_update,
mock_thaw): mock_thaw):
"""Test replication thaw_host unexpected status.""" """Test replication thaw_host unexpected status."""
mock_db_args.return_value = fake_service.fake_service_obj( mock_get_all.return_value = [fake_service.fake_service_obj(
self.context, self.context,
binary='cinder-volume') binary='cinder-volume')]
mock_db_update.return_value = None mock_db_update.return_value = None
volume_api = cinder.volume.api.API() volume_api = cinder.volume.api.API()
self.assertRaises(exception.InvalidInput, self.assertRaises(exception.InvalidInput,
volume_api.thaw_host, volume_api.thaw_host,
self.context, self.context,
host=CONF.host) host=CONF.host, cluster_name=None)
@mock.patch('cinder.volume.driver.BaseVD.failover_completed')
def test_failover_completed(self, completed_mock):
rep_field = fields.ReplicationStatus
svc = objects.Service(self.context, host=self.volume.host,
binary=constants.VOLUME_BINARY,
replication_status=rep_field.ENABLED)
svc.create()
self.volume.failover_completed(
self.context,
{'active_backend_id': 'secondary',
'replication_status': rep_field.FAILED_OVER})
service = objects.Service.get_by_id(self.context, svc.id)
self.assertEqual('secondary', service.active_backend_id)
self.assertEqual('failed-over', service.replication_status)
completed_mock.assert_called_once_with(self.context, 'secondary')
@mock.patch('cinder.volume.driver.BaseVD.failover_completed', wraps=True)
def test_failover_completed_driver_failure(self, completed_mock):
rep_field = fields.ReplicationStatus
svc = objects.Service(self.context, host=self.volume.host,
binary=constants.VOLUME_BINARY,
replication_status=rep_field.ENABLED)
svc.create()
self.volume.failover_completed(
self.context,
{'active_backend_id': 'secondary',
'replication_status': rep_field.FAILED_OVER})
service = objects.Service.get_by_id(self.context, svc.id)
self.assertEqual('secondary', service.active_backend_id)
self.assertEqual(rep_field.ERROR, service.replication_status)
self.assertTrue(service.disabled)
self.assertIsNotNone(service.disabled_reason)
completed_mock.assert_called_once_with(self.context, 'secondary')
@mock.patch('cinder.volume.rpcapi.VolumeAPI.failover_completed')
def test_finish_failover_non_clustered(self, completed_mock):
svc = mock.Mock(is_clustered=None)
self.volume.finish_failover(self.context, svc, mock.sentinel.updates)
svc.update.assert_called_once_with(mock.sentinel.updates)
svc.save.assert_called_once_with()
completed_mock.assert_not_called()
@mock.patch('cinder.volume.rpcapi.VolumeAPI.failover_completed')
def test_finish_failover_clustered(self, completed_mock):
svc = mock.Mock(cluster_name='cluster_name')
updates = {'status': 'error'}
self.volume.finish_failover(self.context, svc, updates)
completed_mock.assert_called_once_with(self.context, svc, updates)
svc.cluster.status = 'error'
svc.cluster.save.assert_called_once()
@ddt.data(None, 'cluster_name')
@mock.patch('cinder.volume.manager.VolumeManager.finish_failover')
@mock.patch('cinder.volume.manager.VolumeManager._get_my_volumes')
def test_failover_manager(self, cluster, get_vols_mock, finish_mock):
"""Test manager's failover method for clustered and not clustered."""
rep_field = fields.ReplicationStatus
svc = objects.Service(self.context, host=self.volume.host,
binary=constants.VOLUME_BINARY,
cluster_name=cluster,
replication_status=rep_field.ENABLED)
svc.create()
vol = objects.Volume(self.context, host=self.volume.host)
vol.create()
get_vols_mock.return_value = [vol]
with mock.patch.object(self.volume, 'driver') as driver:
called, not_called = driver.failover_host, driver.failover
if cluster:
called, not_called = not_called, called
called.return_value = ('secondary', [{'volume_id': vol.id,
'updates': {'status': 'error'}}])
self.volume.failover(self.context,
secondary_backend_id='secondary')
not_called.assert_not_called()
called.assert_called_once_with(self.context, [vol],
secondary_id='secondary')
expected_update = {'replication_status': rep_field.FAILED_OVER,
'active_backend_id': 'secondary',
'disabled': True,
'disabled_reason': 'failed-over'}
finish_mock.assert_called_once_with(self.context, svc, expected_update)
volume = objects.Volume.get_by_id(self.context, vol.id)
self.assertEqual('error', volume.status)
@ddt.data(('host1', None), (None, 'mycluster'))
@ddt.unpack
def test_failover_api_fail_multiple_results(self, host, cluster):
"""Fail if we try to failover multiple backends in the same request."""
rep_field = fields.ReplicationStatus
clusters = [
objects.Cluster(self.context,
name='mycluster@backend1',
replication_status=rep_field.ENABLED,
binary=constants.VOLUME_BINARY),
objects.Cluster(self.context,
name='mycluster@backend2',
replication_status=rep_field.ENABLED,
binary=constants.VOLUME_BINARY)
]
clusters[0].create()
clusters[1].create()
services = [
objects.Service(self.context, host='host1@backend1',
cluster_name=clusters[0].name,
replication_status=rep_field.ENABLED,
binary=constants.VOLUME_BINARY),
objects.Service(self.context, host='host1@backend2',
cluster_name=clusters[1].name,
replication_status=rep_field.ENABLED,
binary=constants.VOLUME_BINARY),
]
services[0].create()
services[1].create()
self.assertRaises(exception.Invalid,
self.volume_api.failover, self.context, host,
cluster)
def test_failover_api_not_found(self):
self.assertRaises(exception.ServiceNotFound, self.volume_api.failover,
self.context, 'host1', None)
@mock.patch('cinder.volume.rpcapi.VolumeAPI.failover')
def test_failover_api_success_multiple_results(self, failover_mock):
"""Succeed to failover multiple services for the same backend."""
rep_field = fields.ReplicationStatus
cluster_name = 'mycluster@backend1'
cluster = objects.Cluster(self.context,
name=cluster_name,
replication_status=rep_field.ENABLED,
binary=constants.VOLUME_BINARY)
cluster.create()
services = [
objects.Service(self.context, host='host1@backend1',
cluster_name=cluster_name,
replication_status=rep_field.ENABLED,
binary=constants.VOLUME_BINARY),
objects.Service(self.context, host='host2@backend1',
cluster_name=cluster_name,
replication_status=rep_field.ENABLED,
binary=constants.VOLUME_BINARY),
]
services[0].create()
services[1].create()
self.volume_api.failover(self.context, None, cluster_name,
mock.sentinel.secondary_id)
for service in services + [cluster]:
self.assertEqual(rep_field.ENABLED, service.replication_status)
service.refresh()
self.assertEqual(rep_field.FAILING_OVER,
service.replication_status)
failover_mock.assert_called_once_with(self.context, mock.ANY,
mock.sentinel.secondary_id)
self.assertEqual(services[0].id, failover_mock.call_args[0][1].id)
@mock.patch('cinder.volume.rpcapi.VolumeAPI.failover')
def test_failover_api_success_multiple_results_not_updated(self,
failover_mock):
"""Succeed to failover even if a service is not updated."""
rep_field = fields.ReplicationStatus
cluster_name = 'mycluster@backend1'
cluster = objects.Cluster(self.context,
name=cluster_name,
replication_status=rep_field.ENABLED,
binary=constants.VOLUME_BINARY)
cluster.create()
services = [
objects.Service(self.context, host='host1@backend1',
cluster_name=cluster_name,
replication_status=rep_field.ENABLED,
binary=constants.VOLUME_BINARY),
objects.Service(self.context, host='host2@backend1',
cluster_name=cluster_name,
replication_status=rep_field.ERROR,
binary=constants.VOLUME_BINARY),
]
services[0].create()
services[1].create()
self.volume_api.failover(self.context, None, cluster_name,
mock.sentinel.secondary_id)
for service in services[:1] + [cluster]:
service.refresh()
self.assertEqual(rep_field.FAILING_OVER,
service.replication_status)
services[1].refresh()
self.assertEqual(rep_field.ERROR, services[1].replication_status)
failover_mock.assert_called_once_with(self.context, mock.ANY,
mock.sentinel.secondary_id)
self.assertEqual(services[0].id, failover_mock.call_args[0][1].id)
@mock.patch('cinder.volume.rpcapi.VolumeAPI.failover')
def test_failover_api_fail_multiple_results_not_updated(self,
failover_mock):
"""Fail if none of the services could be updated."""
rep_field = fields.ReplicationStatus
cluster_name = 'mycluster@backend1'
cluster = objects.Cluster(self.context,
name=cluster_name,
replication_status=rep_field.ENABLED,
binary=constants.VOLUME_BINARY)
cluster.create()
down_time = timeutils.datetime.datetime(1970, 1, 1)
services = [
# This service is down
objects.Service(self.context, host='host1@backend1',
cluster_name=cluster_name,
replication_status=rep_field.ENABLED,
created_at=down_time,
updated_at=down_time,
modified_at=down_time,
binary=constants.VOLUME_BINARY),
# This service is not with the right replication status
objects.Service(self.context, host='host2@backend1',
cluster_name=cluster_name,
replication_status=rep_field.ERROR,
binary=constants.VOLUME_BINARY),
]
services[0].create()
services[1].create()
self.assertRaises(exception.InvalidInput,
self.volume_api.failover, self.context, None,
cluster_name, mock.sentinel.secondary_id)
for service in services:
svc = objects.Service.get_by_id(self.context, service.id)
self.assertEqual(service.replication_status,
svc.replication_status)
cluster.refresh()
self.assertEqual(rep_field.ENABLED, cluster.replication_status)
failover_mock.assert_not_called()
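
Editorial note: at the REST layer these failover paths are exposed through the services admin action added in microversion 3.26 (see the commit message). A hedged client-side sketch follows; the endpoint path, port, header, and body keys ('cluster', 'host', 'backend_id') are illustrative assumptions, not copied from this patch.

import requests

# Fail over a whole replicated cluster (3.26+); for a single non-clustered
# backend, pass 'host' instead of 'cluster'. project_id and token are
# placeholders.
requests.put(
    'http://cinder-api:8776/v3/%s/os-services/failover' % project_id,
    headers={'X-Auth-Token': token,
             'OpenStack-API-Version': 'volume 3.26'},
    json={'cluster': 'mycluster@backend1', 'backend_id': 'secondary'})
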
class CopyVolumeToImageTestCase(base.BaseVolumeTestCase): class CopyVolumeToImageTestCase(base.BaseVolumeTestCase):


@ -32,6 +32,7 @@ from cinder.objects import fields
from cinder import test from cinder import test
from cinder.tests.unit.backup import fake_backup from cinder.tests.unit.backup import fake_backup
from cinder.tests.unit import fake_constants as fake from cinder.tests.unit import fake_constants as fake
from cinder.tests.unit import fake_service
from cinder.tests.unit import fake_snapshot from cinder.tests.unit import fake_snapshot
from cinder.tests.unit import fake_volume from cinder.tests.unit import fake_volume
from cinder.tests.unit import utils as tests_utils from cinder.tests.unit import utils as tests_utils
@ -135,6 +136,7 @@ class VolumeRpcAPITestCase(test.TestCase):
self.assertIn('id', self.fake_volume) self.assertIn('id', self.fake_volume)
def _get_expected_msg(self, kwargs): def _get_expected_msg(self, kwargs):
update = kwargs.pop('_expected_msg', {})
expected_msg = copy.deepcopy(kwargs) expected_msg = copy.deepcopy(kwargs)
if 'volume' in expected_msg: if 'volume' in expected_msg:
volume = expected_msg.pop('volume') volume = expected_msg.pop('volume')
@ -172,9 +174,11 @@ class VolumeRpcAPITestCase(test.TestCase):
if 'new_volume' in expected_msg: if 'new_volume' in expected_msg:
volume = expected_msg['new_volume'] volume = expected_msg['new_volume']
expected_msg['new_volume_id'] = volume['id'] expected_msg['new_volume_id'] = volume['id']
expected_msg.update(update)
return expected_msg return expected_msg
def _test_volume_api(self, method, rpc_method, **kwargs): def _test_volume_api(self, method, rpc_method, _expected_method=None,
**kwargs):
ctxt = context.RequestContext('fake_user', 'fake_project') ctxt = context.RequestContext('fake_user', 'fake_project')
if 'rpcapi_class' in kwargs: if 'rpcapi_class' in kwargs:
@ -207,6 +211,8 @@ class VolumeRpcAPITestCase(test.TestCase):
host = 'fake_host' host = 'fake_host'
elif 'cgsnapshot' in kwargs: elif 'cgsnapshot' in kwargs:
host = kwargs['cgsnapshot'].consistencygroup.service_topic_queue host = kwargs['cgsnapshot'].consistencygroup.service_topic_queue
elif 'service' in kwargs:
host = kwargs['service'].service_topic_queue
target['server'] = utils.extract_host(host, 'host') target['server'] = utils.extract_host(host, 'host')
target['topic'] = '%s.%s' % (constants.VOLUME_TOPIC, target['topic'] = '%s.%s' % (constants.VOLUME_TOPIC,
@ -233,7 +239,7 @@ class VolumeRpcAPITestCase(test.TestCase):
retval = getattr(rpcapi, method)(ctxt, **kwargs) retval = getattr(rpcapi, method)(ctxt, **kwargs)
self.assertEqual(expected_retval, retval) self.assertEqual(expected_retval, retval)
expected_args = [ctxt, method] expected_args = [ctxt, _expected_method or method]
for arg, expected_arg in zip(self.fake_args, expected_args): for arg, expected_arg in zip(self.fake_args, expected_args):
self.assertEqual(expected_arg, arg) self.assertEqual(expected_arg, arg)
@ -623,18 +629,42 @@ class VolumeRpcAPITestCase(test.TestCase):
version='3.0') version='3.0')
def test_freeze_host(self): def test_freeze_host(self):
service = fake_service.fake_service_obj(self.context,
host='fake_host',
binary='cinder-volume')
self._test_volume_api('freeze_host', rpc_method='call', self._test_volume_api('freeze_host', rpc_method='call',
host='fake_host', version='3.0') service=service, version='3.0')
def test_thaw_host(self): def test_thaw_host(self):
self._test_volume_api('thaw_host', rpc_method='call', host='fake_host', service = fake_service.fake_service_obj(self.context,
host='fake_host',
binary='cinder-volume')
self._test_volume_api('thaw_host', rpc_method='call', service=service,
version='3.0') version='3.0')
def test_failover_host(self): @ddt.data('3.0', '3.8')
self._test_volume_api('failover_host', rpc_method='cast', @mock.patch('oslo_messaging.RPCClient.can_send_version')
host='fake_host', def test_failover(self, version, can_send_version):
can_send_version.side_effect = lambda x: x == version
service = objects.Service(self.context, host='fake_host',
cluster_name=None)
_expected_method = 'failover' if version == '3.8' else 'failover_host'
self._test_volume_api('failover', rpc_method='cast',
service=service,
secondary_backend_id='fake_backend', secondary_backend_id='fake_backend',
version='3.0') version=version,
_expected_method=_expected_method)
@mock.patch('cinder.volume.rpcapi.VolumeAPI._get_cctxt')
def test_failover_completed(self, cctxt_mock):
service = objects.Service(self.context, host='fake_host',
cluster_name='cluster_name')
rpcapi = volume_rpcapi.VolumeAPI()
rpcapi.failover_completed(self.context, service, mock.sentinel.updates)
cctxt_mock.assert_called_once_with(service.cluster_name, '3.8',
fanout=True)
        cctxt_mock.return_value.cast.assert_called_once_with(
            self.context, 'failover_completed', updates=mock.sentinel.updates)
def test_create_consistencygroup_from_src_cgsnapshot(self): def test_create_consistencygroup_from_src_cgsnapshot(self):
self._test_volume_api('create_consistencygroup_from_src', self._test_volume_api('create_consistencygroup_from_src',


@ -76,6 +76,7 @@ class TestNexentaEdgeISCSIDriver(test.TestCase):
self.cfg.nexenta_iscsi_service = NEDGE_SERVICE self.cfg.nexenta_iscsi_service = NEDGE_SERVICE
self.cfg.nexenta_blocksize = NEDGE_BLOCKSIZE self.cfg.nexenta_blocksize = NEDGE_BLOCKSIZE
self.cfg.nexenta_chunksize = NEDGE_CHUNKSIZE self.cfg.nexenta_chunksize = NEDGE_CHUNKSIZE
self.cfg.replication_device = []
mock_exec = mock.Mock() mock_exec = mock.Mock()
mock_exec.return_value = ('', '') mock_exec.return_value = ('', '')


@ -78,6 +78,7 @@ class TestNexentaEdgeNBDDriver(test.TestCase):
self.cfg.nexenta_blocksize = 512 self.cfg.nexenta_blocksize = 512
self.cfg.nexenta_chunksize = 4096 self.cfg.nexenta_chunksize = 4096
self.cfg.reserved_percentage = 0 self.cfg.reserved_percentage = 0
self.cfg.replication_device = []
self.ctx = context.get_admin_context() self.ctx = context.get_admin_context()
self.drv = nbd.NexentaEdgeNBDDriver(configuration=self.cfg) self.drv = nbd.NexentaEdgeNBDDriver(configuration=self.cfg)


@ -0,0 +1,120 @@
# Copyright (c) 2016 Red Hat Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import ddt
import mock
from cinder import exception
from cinder import test
from cinder.volume import driver
from cinder.volume import manager
def my_safe_get(self, value):
if value == 'replication_device':
return ['replication']
return None
@ddt.ddt
class DriverTestCase(test.TestCase):
@staticmethod
    def _get_driver(replicated, version):
class NonReplicatedDriver(driver.VolumeDriver):
pass
class V21Driver(driver.VolumeDriver):
def failover_host(*args, **kwargs):
pass
class AADriver(V21Driver):
def failover_completed(*args, **kwargs):
pass
        if not replicated:
return NonReplicatedDriver
if version == 'v2.1':
return V21Driver
return AADriver
@ddt.data('v2.1', 'a/a', 'newfeature')
def test_supports_replication_feature_none(self, rep_version):
my_driver = self._get_driver(False, None)
self.assertFalse(my_driver.supports_replication_feature(rep_version))
@ddt.data('v2.1', 'a/a', 'newfeature')
def test_supports_replication_feature_only_21(self, rep_version):
version = 'v2.1'
my_driver = self._get_driver(True, version)
self.assertEqual(rep_version == version,
my_driver.supports_replication_feature(rep_version))
@ddt.data('v2.1', 'a/a', 'newfeature')
def test_supports_replication_feature_aa(self, rep_version):
my_driver = self._get_driver(True, 'a/a')
self.assertEqual(rep_version in ('v2.1', 'a/a'),
my_driver.supports_replication_feature(rep_version))
def test_init_non_replicated(self):
config = manager.config.Configuration(manager.volume_manager_opts,
config_group='volume')
# No exception raised
self._get_driver(False, None)(configuration=config)
@ddt.data('v2.1', 'a/a')
@mock.patch('cinder.volume.configuration.Configuration.safe_get',
my_safe_get)
def test_init_replicated_non_clustered(self, version):
def append_config_values(self, volume_opts):
pass
config = manager.config.Configuration(manager.volume_manager_opts,
config_group='volume')
# No exception raised
self._get_driver(True, version)(configuration=config)
@mock.patch('cinder.volume.configuration.Configuration.safe_get',
my_safe_get)
def test_init_replicated_clustered_not_supported(self):
config = manager.config.Configuration(manager.volume_manager_opts,
config_group='volume')
# Raises exception because we are trying to run a replicated service
# in clustered mode but the driver doesn't support it.
self.assertRaises(exception.Invalid, self._get_driver(True, 'v2.1'),
configuration=config, cluster_name='mycluster')
@mock.patch('cinder.volume.configuration.Configuration.safe_get',
my_safe_get)
def test_init_replicated_clustered_supported(self):
config = manager.config.Configuration(manager.volume_manager_opts,
config_group='volume')
# No exception raised
self._get_driver(True, 'a/a')(configuration=config,
cluster_name='mycluster')
def test_failover(self):
"""Test default failover behavior of calling failover_host."""
my_driver = self._get_driver(True, 'a/a')()
with mock.patch.object(my_driver, 'failover_host') as failover_mock:
res = my_driver.failover(mock.sentinel.context,
mock.sentinel.volumes,
secondary_id=mock.sentinel.secondary_id)
self.assertEqual(failover_mock.return_value, res)
failover_mock.assert_called_once_with(mock.sentinel.context,
mock.sentinel.volumes,
mock.sentinel.secondary_id)


@ -31,7 +31,7 @@ class ReplicationTestCase(base.BaseVolumeTestCase):
self.host = 'host@backend#pool' self.host = 'host@backend#pool'
self.manager = manager.VolumeManager(host=self.host) self.manager = manager.VolumeManager(host=self.host)
@mock.patch('cinder.objects.VolumeList.get_all_by_host') @mock.patch('cinder.objects.VolumeList.get_all')
@mock.patch('cinder.volume.driver.BaseVD.failover_host', @mock.patch('cinder.volume.driver.BaseVD.failover_host',
side_effect=exception.InvalidReplicationTarget('')) side_effect=exception.InvalidReplicationTarget(''))
@ddt.data(('backend2', 'default', fields.ReplicationStatus.FAILED_OVER), @ddt.data(('backend2', 'default', fields.ReplicationStatus.FAILED_OVER),
@ -55,7 +55,8 @@ class ReplicationTestCase(base.BaseVolumeTestCase):
replication_status=fields.ReplicationStatus.FAILING_OVER) replication_status=fields.ReplicationStatus.FAILING_OVER)
self.manager.failover_host(self.context, new_backend) self.manager.failover_host(self.context, new_backend)
mock_getall.assert_called_once_with(self.context, self.host) mock_getall.assert_called_once_with(self.context,
filters={'host': self.host})
mock_failover.assert_called_once_with(self.context, mock_failover.assert_called_once_with(self.context,
mock_getall.return_value, mock_getall.return_value,
secondary_id=new_backend) secondary_id=new_backend)


@ -26,6 +26,7 @@ from oslo_log import log as logging
from oslo_utils import excutils from oslo_utils import excutils
from oslo_utils import strutils from oslo_utils import strutils
from oslo_utils import timeutils from oslo_utils import timeutils
from oslo_utils import versionutils
import six import six
from cinder.api import common from cinder.api import common
@ -1704,70 +1705,137 @@ class API(base.Base):
offset, sort_keys, offset, sort_keys,
sort_dirs) sort_dirs)
def _get_cluster_and_services_for_replication(self, ctxt, host,
cluster_name):
services = objects.ServiceList.get_all(
ctxt, filters={'host': host, 'cluster_name': cluster_name,
'binary': constants.VOLUME_BINARY})
if not services:
msg = _('No service found with ') + (
'host=%(host)s' if host else 'cluster=%(cluster_name)s')
raise exception.ServiceNotFound(msg, host=host,
cluster_name=cluster_name)
cluster = services[0].cluster
        # Check that the host or cluster we received resolves to a single host
        # or to hosts that all belong to the same cluster.
if cluster_name:
check_attribute = 'cluster_name'
expected = cluster.name
else:
check_attribute = 'host'
expected = services[0].host
if any(getattr(s, check_attribute) != expected for s in services):
msg = _('Services from different clusters found.')
raise exception.InvalidParameterValue(msg)
        # If we received the host parameter but the host belongs to a cluster,
        # we have to change all the services in the cluster, not just one host.
if host and cluster:
services = cluster.services
return cluster, services
def _replication_db_change(self, ctxt, field, expected_value, new_value,
host, cluster_name, check_up=False):
def _error_msg(service):
expected = utils.build_or_str(six.text_type(expected_value))
up_msg = 'and must be up ' if check_up else ''
msg = (_('%(field)s in %(service)s must be %(expected)s '
'%(up_msg)sto failover.')
% {'field': field, 'service': service,
'expected': expected, 'up_msg': up_msg})
LOG.error(msg)
return msg
cluster, services = self._get_cluster_and_services_for_replication(
ctxt, host, cluster_name)
expect = {field: expected_value}
change = {field: new_value}
if cluster:
old_value = getattr(cluster, field)
if ((check_up and not cluster.is_up)
or not cluster.conditional_update(change, expect)):
msg = _error_msg(cluster.name)
raise exception.InvalidInput(reason=msg)
changed = []
not_changed = []
for service in services:
if ((not check_up or service.is_up)
and service.conditional_update(change, expect)):
changed.append(service)
else:
not_changed.append(service)
# If there were some services that couldn't be changed we should at
# least log the error.
if not_changed:
msg = _error_msg([s.host for s in not_changed])
# If we couldn't change any of the services
if not changed:
# Undo the cluster change
if cluster:
setattr(cluster, field, old_value)
cluster.save()
raise exception.InvalidInput(
reason=_('No service could be changed: %s') % msg)
LOG.warning(_LW('Some services could not be changed: %s'), msg)
return cluster, services
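
Editorial note: the helper above relies on Cinder's conditional_update(), which pushes the expected values into the WHERE clause of the UPDATE so each state change is an atomic compare-and-swap in the database. A minimal sketch of the per-service pattern it applies, assuming `service` is an already-loaded objects.Service and using the failover values its callers pass in:

from cinder import exception
from cinder.objects import fields

expect = {'replication_status': [fields.ReplicationStatus.ENABLED,
                                 fields.ReplicationStatus.FAILED_OVER]}
change = {'replication_status': fields.ReplicationStatus.FAILING_OVER}
# conditional_update() returns a falsy value when the row no longer matches
# the expected state, i.e. another request won the race.
if not service.conditional_update(change, expect):
    raise exception.InvalidInput(reason='replication_status changed under us')
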
# FIXME(jdg): Move these Cheesecake methods (freeze, thaw and failover) # FIXME(jdg): Move these Cheesecake methods (freeze, thaw and failover)
# to a services API because that's what they are # to a services API because that's what they are
def failover_host(self, def failover(self, ctxt, host, cluster_name, secondary_id=None):
ctxt,
host,
secondary_id=None):
check_policy(ctxt, 'failover_host') check_policy(ctxt, 'failover_host')
ctxt = ctxt if ctxt.is_admin else ctxt.elevated() ctxt = ctxt if ctxt.is_admin else ctxt.elevated()
svc_host = volume_utils.extract_host(host, 'backend')
service = objects.Service.get_by_args( # TODO(geguileo): In P - Remove this version check
ctxt, svc_host, constants.VOLUME_BINARY) rpc_version = self.volume_rpcapi.determine_rpc_version_cap()
expected = {'replication_status': [fields.ReplicationStatus.ENABLED, rpc_version = versionutils.convert_version_to_tuple(rpc_version)
fields.ReplicationStatus.FAILED_OVER]} if cluster_name and rpc_version < (3, 5):
result = service.conditional_update( msg = _('replication operations with cluster field')
{'replication_status': fields.ReplicationStatus.FAILING_OVER}, raise exception.UnavailableDuringUpgrade(action=msg)
expected)
if not result:
expected_status = utils.build_or_str(
expected['replication_status'])
msg = (_('Host replication_status must be %s to failover.')
% expected_status)
LOG.error(msg)
raise exception.InvalidInput(reason=msg)
self.volume_rpcapi.failover_host(ctxt, host, secondary_id)
def freeze_host(self, ctxt, host): rep_fields = fields.ReplicationStatus
expected_values = [rep_fields.ENABLED, rep_fields.FAILED_OVER]
new_value = rep_fields.FAILING_OVER
cluster, services = self._replication_db_change(
ctxt, 'replication_status', expected_values, new_value, host,
cluster_name, check_up=True)
self.volume_rpcapi.failover(ctxt, services[0], secondary_id)
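
Editorial note: a minimal usage sketch of the new entry point above (volume_api and ctxt assumed in scope, names illustrative); one and only one of host/cluster_name is expected, mirroring the REST-layer check:

# Cluster-wide failover, new with this patch.
volume_api.failover(ctxt, host=None, cluster_name='mycluster@backend1',
                    secondary_id='secondary_backend_id')
# Single-backend failover keeps the pre-existing semantics.
volume_api.failover(ctxt, host='host1@backend1', cluster_name=None)
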
def freeze_host(self, ctxt, host, cluster_name):
check_policy(ctxt, 'freeze_host') check_policy(ctxt, 'freeze_host')
ctxt = ctxt if ctxt.is_admin else ctxt.elevated() ctxt = ctxt if ctxt.is_admin else ctxt.elevated()
svc_host = volume_utils.extract_host(host, 'backend')
service = objects.Service.get_by_args( expected = False
ctxt, svc_host, constants.VOLUME_BINARY) new_value = True
expected = {'frozen': False} cluster, services = self._replication_db_change(
result = service.conditional_update( ctxt, 'frozen', expected, new_value, host, cluster_name,
{'frozen': True}, expected) check_up=False)
if not result:
msg = _('Host is already Frozen.')
LOG.error(msg)
raise exception.InvalidInput(reason=msg)
# Should we set service status to disabled to keep # Should we set service status to disabled to keep
# scheduler calls from being sent? Just use existing # scheduler calls from being sent? Just use existing
# `cinder service-disable reason=freeze` # `cinder service-disable reason=freeze`
self.volume_rpcapi.freeze_host(ctxt, host) self.volume_rpcapi.freeze_host(ctxt, services[0])
def thaw_host(self, ctxt, host):
def thaw_host(self, ctxt, host, cluster_name):
check_policy(ctxt, 'thaw_host') check_policy(ctxt, 'thaw_host')
ctxt = ctxt if ctxt.is_admin else ctxt.elevated() ctxt = ctxt if ctxt.is_admin else ctxt.elevated()
svc_host = volume_utils.extract_host(host, 'backend')
service = objects.Service.get_by_args( expected = True
ctxt, svc_host, constants.VOLUME_BINARY) new_value = False
expected = {'frozen': True} cluster, services = self._replication_db_change(
result = service.conditional_update( ctxt, 'frozen', expected, new_value, host, cluster_name,
{'frozen': False}, expected) check_up=False)
if not result:
msg = _('Host is NOT Frozen.') if not self.volume_rpcapi.thaw_host(ctxt, services[0]):
LOG.error(msg)
raise exception.InvalidInput(reason=msg)
if not self.volume_rpcapi.thaw_host(ctxt, host):
return "Backend reported error during thaw_host operation." return "Backend reported error during thaw_host operation."
def check_volume_filters(self, filters, strict=False): def check_volume_filters(self, filters, strict=False):


@ -335,6 +335,10 @@ class BaseVD(object):
# the unsupported driver started. # the unsupported driver started.
SUPPORTED = True SUPPORTED = True
# Methods checked to detect a driver implements a replication feature
REPLICATION_FEATURE_CHECKERS = {'v2.1': 'failover_host',
'a/a': 'failover_completed'}
def __init__(self, execute=utils.execute, *args, **kwargs): def __init__(self, execute=utils.execute, *args, **kwargs):
# NOTE(vish): db is set by Manager # NOTE(vish): db is set by Manager
self.db = kwargs.get('db') self.db = kwargs.get('db')
@ -347,6 +351,16 @@ class BaseVD(object):
self.configuration.append_config_values(iser_opts) self.configuration.append_config_values(iser_opts)
utils.setup_tracing(self.configuration.safe_get('trace_flags')) utils.setup_tracing(self.configuration.safe_get('trace_flags'))
        # NOTE(geguileo): Don't allow the service to start if we are enabling
        # replication on a clustered service with a backend that doesn't
        # support the required mechanism for Active-Active.
replication_devices = self.configuration.safe_get(
'replication_device')
if (self.cluster_name and replication_devices and
not self.supports_replication_feature('a/a')):
raise exception.Invalid(_("Driver doesn't support clustered "
"replication."))
self.driver_utils = driver_utils.VolumeDriverUtils( self.driver_utils = driver_utils.VolumeDriverUtils(
self._driver_data_namespace(), self.db) self._driver_data_namespace(), self.db)
@ -1701,6 +1715,38 @@ class BaseVD(object):
# 'replication_extended_status': 'whatever',...}},] # 'replication_extended_status': 'whatever',...}},]
raise NotImplementedError() raise NotImplementedError()
def failover(self, context, volumes, secondary_id=None):
"""Like failover but for a host that is clustered.
Most of the time this will be the exact same behavior as failover_host,
so if it's not overwritten, it is assumed to be the case.
"""
return self.failover_host(context, volumes, secondary_id)
def failover_completed(self, context, active_backend_id=None):
"""This method is called after failover for clustered backends."""
raise NotImplementedError()
@classmethod
def _is_base_method(cls, method_name):
method = getattr(cls, method_name)
return method.__module__ == getattr(BaseVD, method_name).__module__
@classmethod
def supports_replication_feature(cls, feature):
"""Check if driver class supports replication features.
Feature is a string that must be one of:
- v2.1
- a/a
"""
if feature not in cls.REPLICATION_FEATURE_CHECKERS:
return False
        # Check if the method is implemented/overridden by the driver
method_name = cls.REPLICATION_FEATURE_CHECKERS[feature]
return not cls._is_base_method(method_name)
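
Editorial note: a driver opts into these feature levels simply by overriding the checker methods listed in REPLICATION_FEATURE_CHECKERS; a hedged sketch (MyAADriver is hypothetical and for illustration only):

class MyAADriver(BaseVD):  # hypothetical; real drivers implement much more
    def failover_host(self, context, volumes, secondary_id=None):
        pass  # overriding this enables the 'v2.1' feature

    def failover_completed(self, context, active_backend_id=None):
        pass  # overriding this additionally enables 'a/a'

MyAADriver.supports_replication_feature('v2.1')        # True
MyAADriver.supports_replication_feature('a/a')         # True
MyAADriver.supports_replication_feature('newfeature')  # False

Without the failover_completed override, the __init__ guard above would raise exception.Invalid when such a driver is started clustered with replication_device configured.
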
def get_replication_updates(self, context): def get_replication_updates(self, context):
"""Old replication update method, deprecate.""" """Old replication update method, deprecate."""
raise NotImplementedError() raise NotImplementedError()


@ -4151,9 +4151,8 @@ class VolumeManager(manager.CleanableManager,
volume.update(model_update_default) volume.update(model_update_default)
volume.save() volume.save()
# Replication V2.1 methods # Replication V2.1 and a/a method
def failover_host(self, context, def failover(self, context, secondary_backend_id=None):
secondary_backend_id=None):
"""Failover a backend to a secondary replication target. """Failover a backend to a secondary replication target.
Instructs a replication capable/configured backend to failover Instructs a replication capable/configured backend to failover
@ -4167,30 +4166,33 @@ class VolumeManager(manager.CleanableManager,
:param context: security context :param context: security context
:param secondary_backend_id: Specifies backend_id to fail over to :param secondary_backend_id: Specifies backend_id to fail over to
""" """
updates = {}
repl_status = fields.ReplicationStatus
svc_host = vol_utils.extract_host(self.host, 'backend') svc_host = vol_utils.extract_host(self.host, 'backend')
service = objects.Service.get_by_args(context, svc_host,
constants.VOLUME_BINARY)
volumes = self._get_my_volumes(context)
service = objects.Service.get_by_args( exception_encountered = True
context,
svc_host,
constants.VOLUME_BINARY)
volumes = objects.VolumeList.get_all_by_host(context, self.host)
exception_encountered = False
try: try:
            # For non-clustered backends we can call the v2.1 failover_host,
            # but for clustered ones we call the a/a failover method. We know
            # the a/a method exists because the BaseVD class wouldn't have
            # started if it didn't.
failover = getattr(self.driver,
'failover' if service.is_clustered
else 'failover_host')
# expected form of volume_update_list: # expected form of volume_update_list:
# [{volume_id: <cinder-volid>, updates: {'provider_id': xxxx....}}, # [{volume_id: <cinder-volid>, updates: {'provider_id': xxxx....}},
# {volume_id: <cinder-volid>, updates: {'provider_id': xxxx....}}] # {volume_id: <cinder-volid>, updates: {'provider_id': xxxx....}}]
(active_backend_id, volume_update_list) = ( active_backend_id, volume_update_list = failover(
self.driver.failover_host( context,
context, volumes,
volumes, secondary_id=secondary_backend_id)
secondary_id=secondary_backend_id)) exception_encountered = False
except exception.UnableToFailOver: except exception.UnableToFailOver:
LOG.exception(_LE("Failed to perform replication failover")) LOG.exception(_LE("Failed to perform replication failover"))
service.replication_status = ( updates['replication_status'] = repl_status.FAILOVER_ERROR
fields.ReplicationStatus.FAILOVER_ERROR)
service.save()
exception_encountered = True
except exception.InvalidReplicationTarget: except exception.InvalidReplicationTarget:
LOG.exception(_LE("Invalid replication target specified " LOG.exception(_LE("Invalid replication target specified "
"for failover")) "for failover"))
@ -4199,12 +4201,9 @@ class VolumeManager(manager.CleanableManager,
# secondary to another secondary. In both cases active_backend_id # secondary to another secondary. In both cases active_backend_id
# will be set. # will be set.
if service.active_backend_id: if service.active_backend_id:
service.replication_status = ( updates['replication_status'] = repl_status.FAILED_OVER
fields.ReplicationStatus.FAILED_OVER)
else: else:
service.replication_status = fields.ReplicationStatus.ENABLED updates['replication_status'] = repl_status.ENABLED
service.save()
exception_encountered = True
except exception.VolumeDriverException: except exception.VolumeDriverException:
# NOTE(jdg): Drivers need to be aware if they fail during # NOTE(jdg): Drivers need to be aware if they fail during
# a failover sequence, we're expecting them to cleanup # a failover sequence, we're expecting them to cleanup
@ -4212,36 +4211,29 @@ class VolumeManager(manager.CleanableManager,
# backend is still set as primary as per driver memory # backend is still set as primary as per driver memory
LOG.error(_LE("Driver reported error during " LOG.error(_LE("Driver reported error during "
"replication failover.")) "replication failover."))
service.replication_status = ( updates.update(disabled=True,
fields.ReplicationStatus.FAILOVER_ERROR) replication_status=repl_status.FAILOVER_ERROR)
service.disabled = True
service.save()
exception_encountered = True
if exception_encountered: if exception_encountered:
LOG.error( LOG.error(
_LE("Error encountered during failover on host: " _LE("Error encountered during failover on host: "
"%(host)s invalid target ID %(backend_id)s"), "%(host)s invalid target ID %(backend_id)s"),
{'host': self.host, 'backend_id': {'host': self.host, 'backend_id':
secondary_backend_id}) secondary_backend_id})
self.finish_failover(context, service, updates)
return return
if secondary_backend_id == "default": if secondary_backend_id == "default":
service.replication_status = fields.ReplicationStatus.ENABLED updates['replication_status'] = repl_status.ENABLED
service.active_backend_id = "" updates['active_backend_id'] = ''
if service.frozen: updates['disabled'] = service.frozen
service.disabled = True updates['disabled_reason'] = 'frozen' if service.frozen else ''
service.disabled_reason = "frozen"
else:
service.disabled = False
service.disabled_reason = ""
service.save()
else: else:
service.replication_status = fields.ReplicationStatus.FAILED_OVER updates['replication_status'] = repl_status.FAILED_OVER
service.active_backend_id = active_backend_id updates['active_backend_id'] = active_backend_id
service.disabled = True updates['disabled'] = True
service.disabled_reason = "failed-over" updates['disabled_reason'] = 'failed-over'
service.save()
self.finish_failover(context, service, updates)
for update in volume_update_list: for update in volume_update_list:
# Response must include an id key: {volume_id: <cinder-uuid>} # Response must include an id key: {volume_id: <cinder-uuid>}
@ -4259,6 +4251,53 @@ class VolumeManager(manager.CleanableManager,
LOG.info(_LI("Failed over to replication target successfully.")) LOG.info(_LI("Failed over to replication target successfully."))
# TODO(geguileo): In P - remove this
failover_host = failover
def finish_failover(self, context, service, updates):
"""Completion of the failover locally or via RPC."""
# If the service is clustered, broadcast the service changes to all
# volume services, including this one.
if service.is_clustered:
# We have to update the cluster with the same data, and we do it
# before broadcasting the failover_completed RPC call to prevent
            # races with services that may be starting.
for key, value in updates.items():
setattr(service.cluster, key, value)
service.cluster.save()
rpcapi = volume_rpcapi.VolumeAPI()
rpcapi.failover_completed(context, service, updates)
else:
service.update(updates)
service.save()
def failover_completed(self, context, updates):
"""Finalize failover of this backend.
        When a service is clustered and replicated, the failover has two
        stages: one that fails over the volumes and another that finalizes
        the failover of the services themselves.
        This method takes care of the second stage and is called, once the
        volumes have been processed, from the service that performed the
        failover of the volumes.
"""
svc_host = vol_utils.extract_host(self.host, 'backend')
service = objects.Service.get_by_args(context, svc_host,
constants.VOLUME_BINARY)
service.update(updates)
try:
self.driver.failover_completed(context, service.active_backend_id)
except Exception:
msg = _('Driver reported error during replication failover '
'completion.')
LOG.exception(msg)
service.disabled = True
service.disabled_reason = msg
service.replication_status = (
fields.ReplicationStatus.ERROR)
service.save()
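
Editorial note: taken together, the methods above split a clustered failover into two stages; a rough outline of the call flow introduced by this patch (method names as in the diff):

# 1. volume/api.py failover() flips the cluster and its services to
#    FAILING_OVER and casts the rpcapi failover() at one service of the cluster.
# 2. That service's VolumeManager.failover() runs driver.failover() over the
#    volumes and builds the service `updates` dict.
# 3. finish_failover() saves the updates on the Cluster row, then broadcasts
#    rpcapi failover_completed(updates) with fanout to the cluster topic.
# 4. Every cinder-volume service in the cluster, including the caller, runs
#    failover_completed() above to apply `updates` to its own Service row and
#    call driver.failover_completed(active_backend_id).
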
def freeze_host(self, context): def freeze_host(self, context):
"""Freeze management plane on this backend. """Freeze management plane on this backend.


@ -122,9 +122,10 @@ class VolumeAPI(rpc.RPCAPI):
of @backend suffixes in server names. of @backend suffixes in server names.
3.7 - Adds do_cleanup method to do volume cleanups from other nodes 3.7 - Adds do_cleanup method to do volume cleanups from other nodes
that we were doing in init_host. that we were doing in init_host.
3.8 - Make failover_host cluster aware and add failover_completed.
""" """
RPC_API_VERSION = '3.7' RPC_API_VERSION = '3.8'
RPC_DEFAULT_VERSION = '3.0' RPC_DEFAULT_VERSION = '3.0'
TOPIC = constants.VOLUME_TOPIC TOPIC = constants.VOLUME_TOPIC
BINARY = 'cinder-volume' BINARY = 'cinder-volume'
@ -310,21 +311,31 @@ class VolumeAPI(rpc.RPCAPI):
new_volume=new_volume, new_volume=new_volume,
volume_status=original_volume_status) volume_status=original_volume_status)
def freeze_host(self, ctxt, host): def freeze_host(self, ctxt, service):
"""Set backend host to frozen.""" """Set backend host to frozen."""
cctxt = self._get_cctxt(host) cctxt = self._get_cctxt(service.service_topic_queue)
return cctxt.call(ctxt, 'freeze_host') return cctxt.call(ctxt, 'freeze_host')
def thaw_host(self, ctxt, host): def thaw_host(self, ctxt, service):
"""Clear the frozen setting on a backend host.""" """Clear the frozen setting on a backend host."""
cctxt = self._get_cctxt(host) cctxt = self._get_cctxt(service.service_topic_queue)
return cctxt.call(ctxt, 'thaw_host') return cctxt.call(ctxt, 'thaw_host')
def failover_host(self, ctxt, host, secondary_backend_id=None): def failover(self, ctxt, service, secondary_backend_id=None):
"""Failover host to the specified backend_id (secondary).""" """Failover host to the specified backend_id (secondary). """
cctxt = self._get_cctxt(host) version = '3.8'
cctxt.cast(ctxt, 'failover_host', method = 'failover'
secondary_backend_id=secondary_backend_id) if not self.client.can_send_version(version):
version = '3.0'
method = 'failover_host'
cctxt = self._get_cctxt(service.service_topic_queue, version)
cctxt.cast(ctxt, method, secondary_backend_id=secondary_backend_id)
def failover_completed(self, ctxt, service, updates):
"""Complete failover on all services of the cluster."""
cctxt = self._get_cctxt(service.service_topic_queue, '3.8',
fanout=True)
cctxt.cast(ctxt, 'failover_completed', updates=updates)
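
Editorial note: the reason freeze_host, thaw_host, failover and failover_completed now take a Service object instead of a bare host string is that service_topic_queue lets the RPC target follow the service's topology (it resolves to cluster_name when set, otherwise to host). A hedged sketch of the resulting addressing, with illustrative names:

# Non-clustered service: messages are sent to the per-backend queue.
svc = objects.Service(ctxt, host='host1@backend1', cluster_name=None)
svc.service_topic_queue        # -> 'host1@backend1'

# Clustered service: messages go to the queue shared by all cluster members,
# which is what makes the fanout failover_completed broadcast possible.
svc = objects.Service(ctxt, host='host1@backend1',
                      cluster_name='mycluster@backend1')
svc.service_topic_queue        # -> 'mycluster@backend1'
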
def manage_existing_snapshot(self, ctxt, snapshot, ref, backend): def manage_existing_snapshot(self, ctxt, snapshot, ref, backend):
cctxt = self._get_cctxt(backend) cctxt = self._get_cctxt(backend)