Replication v2.1 (Cheesecake)

This focuses the replication work on a specific use case,
and eliminates some of the ambiguity in earlier versions.

Additionally, this implementation addresses the needs of
devices that do replication based on the whole backend device
or on Pools.
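
As an illustration, a backend might list its replication targets in
cinder.conf like this (the backend section name, device ID, and extra
keys are hypothetical; only target_device_id is required, per the
replication_device option help text further down in this diff):

  [lvmdriver-1]
  volume_driver = cinder.volume.drivers.lvm.LVMVolumeDriver
  replication_device = target_device_id:site-b-array,san_ip:10.10.10.2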

Use case:
  DR scenario, where a storage device is rendered inoperable.
  This implementation preserves user data for those volumes
  whose type is replication-enabled.

  The goal is NOT to make failures completely transparent,
  but instead to preserve data access while an Admin works
  to rebuild/recover the cloud.

It's very important to note that we're no longer interested in
dealing with replication in Cinder at a Volume level.  The concept
of having "some" volumes fail over while "others" are left behind
proved not only overly complex and difficult to implement, but we
also never identified a concrete use case for a failover in which
some volumes stay accessible on a primary while others are moved
and accessed via a secondary.

In this model, failover is host/backend based.  So when you fail
over, you're failing over an entire backend.  We heavily leverage
existing resources, specifically services and capabilities.
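
For a sense of the admin workflow, here is a minimal sketch (the
endpoint, tenant, token, host, and backend ID are hypothetical) of
driving the new host-level actions through the os-services extension;
the request bodies mirror what ServiceController.update() parses in
this change ('host' is required, 'backend_id' is optional):

  import json
  import requests

  ENDPOINT = 'http://cinder.example.com:8776/v2/<tenant_id>'
  HEADERS = {'X-Auth-Token': '<admin-token>',
             'Content-Type': 'application/json'}

  def service_action(action, host, backend_id=None):
      # 'freeze', 'thaw', and 'failover_host' are PUT actions on
      # the os-services resource; only failover_host takes backend_id.
      body = {'host': host}
      if backend_id:
          body['backend_id'] = backend_id
      return requests.put('%s/os-services/%s' % (ENDPOINT, action),
                          headers=HEADERS, data=json.dumps(body))

  # Fail over an entire backend, freeze it while recovering, then thaw:
  service_action('failover_host', 'myhost@lvmdriver-1', 'site-b-array')
  service_action('freeze', 'myhost@lvmdriver-1')
  service_action('thaw', 'myhost@lvmdriver-1')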

Implements: blueprint replication-update

Change-Id: If862bcd18515098639f94a8294a8e44e1358c52a
John Griffith 2016-02-03 16:11:58 +00:00
parent 0f5e61bf6e
commit 106c14a84b
27 changed files with 573 additions and 1732 deletions


@ -259,85 +259,6 @@ class VolumeAdminController(AdminController):
new_volume, error)
return {'save_volume_id': ret}
@wsgi.action('os-enable_replication')
def _enable_replication(self, req, id, body):
"""Enable/Re-enable replication on replciation capable volume.
Admin only method, used primarily for cases like disable/re-enable
replication process on a replicated volume for maintenance or testing
"""
context = req.environ['cinder.context']
self.authorize(context, 'enable_replication')
try:
volume = self._get(context, id)
except exception.VolumeNotFound as e:
raise exc.HTTPNotFound(explanation=e.msg)
self.volume_api.enable_replication(context, volume)
return webob.Response(status_int=202)
@wsgi.action('os-disable_replication')
def _disable_replication(self, req, id, body):
"""Disable replication on replciation capable volume.
Admin only method, used to instruct a backend to
disable replication process to a replicated volume.
"""
context = req.environ['cinder.context']
self.authorize(context, 'disable_replication')
try:
volume = self._get(context, id)
except exception.VolumeNotFound as e:
raise exc.HTTPNotFound(explanation=e.msg)
self.volume_api.disable_replication(context, volume)
return webob.Response(status_int=202)
@wsgi.action('os-failover_replication')
def _failover_replication(self, req, id, body):
"""Failover a replicating volume to it's secondary
Admin only method, used to force a fail-over to
a replication target. Optional secondary param to
indicate what device to promote in case of multiple
replication targets.
"""
context = req.environ['cinder.context']
self.authorize(context, 'failover_replication')
try:
volume = self._get(context, id)
except exception.VolumeNotFound as e:
raise exc.HTTPNotFound(explanation=e.msg)
secondary = body['os-failover_replication'].get('secondary', None)
self.volume_api.failover_replication(context, volume, secondary)
return webob.Response(status_int=202)
@wsgi.action('os-list_replication_targets')
def _list_replication_targets(self, req, id, body):
"""Show replication targets for the specified host.
Admin only method, used to display configured
replication target devices for the specified volume.
"""
# TODO(jdg): We'll want an equivalent type of command
# to query a backend host (show configuration for a
# specified backend), but priority here is for
# a volume as it's likely to be more useful.
context = req.environ['cinder.context']
self.authorize(context, 'list_replication_targets')
try:
volume = self._get(context, id)
except exception.VolumeNotFound as e:
raise exc.HTTPNotFound(explanation=e.msg)
# Expected response is a dict with unknown
# keys. Should be of the form:
# {'volume_id': xx, 'replication_targets':[{k: v, k1: v1...}]}
return self.volume_api.list_replication_targets(context, volume)
class SnapshotAdminController(AdminController):
"""AdminController for Snapshots."""


@ -34,6 +34,9 @@ class CapabilitiesController(wsgi.Controller):
_view_builder_class = capabilities_view.ViewBuilder
def __init__(self):
# FIXME(jdg): Is it kosher that this just
# skips the volume.api and goes straight to RPC
# from here?
self.volume_api = rpcapi.VolumeAPI()
super(CapabilitiesController, self).__init__()


@ -27,6 +27,7 @@ from cinder import exception
from cinder.i18n import _
from cinder import objects
from cinder import utils
from cinder import volume
CONF = cfg.CONF
@ -46,6 +47,9 @@ class ServicesIndexTemplate(xmlutil.TemplateBuilder):
elem.set('state')
elem.set('update_at')
elem.set('disabled_reason')
elem.set('replication_status')
elem.set('active_backend_id')
elem.set('frozen')
return xmlutil.MasterTemplate(root, 1)
@ -63,6 +67,9 @@ class ServicesUpdateTemplate(xmlutil.TemplateBuilder):
root.set('binary')
root.set('status')
root.set('disabled_reason')
root.set('replication_status')
root.set('active_backend_id')
root.set('frozen')
return xmlutil.MasterTemplate(root, 1)
@ -71,6 +78,7 @@ class ServiceController(wsgi.Controller):
def __init__(self, ext_mgr=None):
self.ext_mgr = ext_mgr
super(ServiceController, self).__init__()
self.volume_api = volume.API()
@wsgi.serializers(xml=ServicesIndexTemplate)
def index(self, req):
@ -119,6 +127,10 @@ class ServiceController(wsgi.Controller):
'updated_at': updated_at}
if detailed:
ret_fields['disabled_reason'] = svc.disabled_reason
if svc.binary == "cinder-volume":
ret_fields['replication_status'] = svc.replication_status
ret_fields['active_backend_id'] = svc.active_backend_id
ret_fields['frozen'] = svc.frozen
svcs.append(ret_fields)
return {'services': svcs}
@ -133,9 +145,24 @@ class ServiceController(wsgi.Controller):
return True
def _freeze(self, context, host):
return self.volume_api.freeze_host(context, host)
def _thaw(self, context, host):
return self.volume_api.thaw_host(context, host)
def _failover(self, context, host, backend_id=None):
return self.volume_api.failover_host(context, host, backend_id)
@wsgi.serializers(xml=ServicesUpdateTemplate)
def update(self, req, id, body):
"""Enable/Disable scheduling for a service."""
"""Enable/Disable scheduling for a service.
Includes Freeze/Thaw, which send calls down to the drivers
and allow the volume.manager for the specified host to
disable the service, rather than accessing the service
directly in this API layer.
"""
context = req.environ['cinder.context']
authorize(context, action='update')
@ -150,6 +177,15 @@ class ServiceController(wsgi.Controller):
(id == "disable-log-reason" and ext_loaded)):
disabled = True
status = "disabled"
elif id == "freeze":
return self._freeze(context, body['host'])
elif id == "thaw":
return self._thaw(context, body['host'])
elif id == "failover_host":
return self._failover(context,
body['host'],
body.get('backend_id',
None))
else:
raise webob.exc.HTTPNotFound(explanation=_("Unknown action"))


@ -1,137 +0,0 @@
# Copyright 2014 IBM Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import log as logging
import six
import webob
from webob import exc
from cinder.api import extensions
from cinder.api.openstack import wsgi
from cinder.api import xmlutil
from cinder import exception
from cinder.i18n import _LI
from cinder import replication as replicationAPI
from cinder import volume
LOG = logging.getLogger(__name__)
authorize = extensions.soft_extension_authorizer('volume',
'volume_replication')
class VolumeReplicationController(wsgi.Controller):
"""The Volume Replication API controller for the OpenStack API."""
def __init__(self, *args, **kwargs):
super(VolumeReplicationController, self).__init__(*args, **kwargs)
self.volume_api = volume.API()
self.replication_api = replicationAPI.API()
def _add_replication_attributes(self, req, resp_volume):
db_volume = req.cached_resource_by_id(resp_volume['id'])
key = "%s:extended_status" % Volume_replication.alias
resp_volume[key] = db_volume['replication_extended_status']
key = "%s:driver_data" % Volume_replication.alias
resp_volume[key] = db_volume['replication_driver_data']
@wsgi.extends
def show(self, req, resp_obj, id):
context = req.environ['cinder.context']
if authorize(context):
resp_obj.attach(xml=VolumeReplicationAttributeTemplate())
self._add_replication_attributes(req, resp_obj.obj['volume'])
@wsgi.extends
def detail(self, req, resp_obj):
context = req.environ['cinder.context']
if authorize(context):
resp_obj.attach(xml=VolumeReplicationListAttributeTemplate())
for vol in list(resp_obj.obj['volumes']):
self._add_replication_attributes(req, vol)
@wsgi.response(202)
@wsgi.action('os-promote-replica')
def promote(self, req, id, body):
context = req.environ['cinder.context']
try:
vol = self.volume_api.get(context, id)
LOG.info(_LI('Attempting to promote secondary replica to primary'
' for volume %s.'),
id,
context=context)
self.replication_api.promote(context, vol)
except exception.VolumeNotFound as error:
raise exc.HTTPNotFound(explanation=error.msg)
except exception.ReplicationError as error:
raise exc.HTTPBadRequest(explanation=six.text_type(error))
return webob.Response(status_int=202)
@wsgi.response(202)
@wsgi.action('os-reenable-replica')
def reenable(self, req, id, body):
context = req.environ['cinder.context']
try:
vol = self.volume_api.get(context, id)
LOG.info(_LI('Attempting to sync secondary replica with primary'
' for volume %s.'),
id,
context=context)
self.replication_api.reenable(context, vol)
except exception.VolumeNotFound as error:
raise exc.HTTPNotFound(explanation=error.msg)
except exception.ReplicationError as error:
raise exc.HTTPBadRequest(explanation=six.text_type(error))
return webob.Response(status_int=202)
class Volume_replication(extensions.ExtensionDescriptor):
"""Volume replication management support."""
name = "VolumeReplication"
alias = "os-volume-replication"
namespace = "http://docs.openstack.org/volume/ext/volume_replication/" + \
"api/v1"
updated = "2014-08-01T00:00:00+00:00"
def get_controller_extensions(self):
controller = VolumeReplicationController()
extension = extensions.ControllerExtension(self, 'volumes', controller)
return [extension]
def make_volume(elem):
elem.set('{%s}extended_status' % Volume_replication.namespace,
'%s:extended_status' % Volume_replication.alias)
elem.set('{%s}driver_data' % Volume_replication.namespace,
'%s:driver_data' % Volume_replication.alias)
class VolumeReplicationAttributeTemplate(xmlutil.TemplateBuilder):
def construct(self):
root = xmlutil.TemplateElement('volume', selector='volume')
make_volume(root)
alias = Volume_replication.alias
namespace = Volume_replication.namespace
return xmlutil.SlaveTemplate(root, 1, nsmap={alias: namespace})
class VolumeReplicationListAttributeTemplate(xmlutil.TemplateBuilder):
def construct(self):
root = xmlutil.TemplateElement('volumes')
elem = xmlutil.SubTemplateElement(root, 'volume', selector='volumes')
make_volume(elem)
alias = Volume_replication.alias
namespace = Volume_replication.namespace
return xmlutil.SlaveTemplate(root, 1, nsmap={alias: namespace})


@ -37,5 +37,6 @@ class ViewBuilder(common.ViewBuilder):
'display_name': capabilities.get('display_name'),
'description': capabilities.get('description'),
'visibility': capabilities.get('visibility'),
'replication_targets': capabilities.get('replication_targets', []),
'properties': capabilities.get('properties'),
}


@ -0,0 +1,30 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from sqlalchemy import Column
from sqlalchemy import Boolean, MetaData, String, Table
def upgrade(migrate_engine):
"""Add replication info to services table."""
meta = MetaData()
meta.bind = migrate_engine
services = Table('services', meta, autoload=True)
replication_status = Column('replication_status', String(length=36),
default="not-capable")
active_backend_id = Column('active_backend_id', String(length=255))
frozen = Column('frozen', Boolean, default=False)
services.create_column(replication_status)
services.create_column(frozen)
services.create_column(active_backend_id)


@ -15,6 +15,7 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
SQLAlchemy models for cinder data.
"""
@ -77,6 +78,12 @@ class Service(BASE, CinderBase):
object_current_version = Column(String(36))
object_available_version = Column(String(36))
# replication_status can be: enabled, disabled, not-capable, error,
# failed-over or not-configured
replication_status = Column(String(255), default="not-capable")
active_backend_id = Column(String(255))
frozen = Column(Boolean, nullable=False, default=False)
class ConsistencyGroup(BASE, CinderBase):
"""Represents a consistencygroup."""


@ -701,6 +701,15 @@ class ManageExistingAlreadyManaged(CinderException):
"Volume %(volume_ref)s already managed.")
class InvalidReplicationTarget(Invalid):
message = _("Invalid Replication Target: %(reason)s")
class UnableToFailOver(CinderException):
message = _("Unable to failover to replication target:"
"%(reason)s).")
class ReplicationError(CinderException):
message = _("Volume %(volume_id)s replication "
"error: %(reason)s")


@ -99,6 +99,7 @@ OBJ_VERSIONS.add('1.0', {'Backup': '1.3', 'BackupImport': '1.3',
'Volume': '1.3', 'VolumeTypeList': '1.1'})
OBJ_VERSIONS.add('1.1', {'Service': '1.2', 'ServiceList': '1.1'})
OBJ_VERSIONS.add('1.2', {'Backup': '1.4', 'BackupImport': '1.4'})
OBJ_VERSIONS.add('1.3', {'Service': '1.3'})
class CinderObjectRegistry(base.VersionedObjectRegistry):


@ -63,3 +63,24 @@ class ConsistencyGroupStatus(Enum):
class ConsistencyGroupStatusField(BaseEnumField):
AUTO_TYPE = ConsistencyGroupStatus()
class ReplicationStatus(Enum):
ERROR = 'error'
ENABLED = 'enabled'
DISABLED = 'disabled'
NOT_CAPABLE = 'not-capable'
FAILING_OVER = 'failing-over'
FAILOVER_ERROR = 'failover-error'
FAILED_OVER = 'failed-over'
ALL = (ERROR, ENABLED, DISABLED, NOT_CAPABLE, FAILOVER_ERROR, FAILING_OVER,
FAILED_OVER)
def __init__(self):
super(ReplicationStatus, self).__init__(
valid_values=ReplicationStatus.ALL)
class ReplicationStatusField(BaseEnumField):
AUTO_TYPE = ReplicationStatus()


@ -21,6 +21,7 @@ from cinder import exception
from cinder.i18n import _
from cinder import objects
from cinder.objects import base
from cinder.objects import fields as c_fields
from cinder import utils
LOG = logging.getLogger(__name__)
@ -33,7 +34,8 @@ class Service(base.CinderPersistentObject, base.CinderObject,
# Version 1.0: Initial version
# Version 1.1: Add rpc_current_version and object_current_version fields
# Version 1.2: Add get_minimum_rpc_version() and get_minimum_obj_version()
VERSION = '1.2'
# Version 1.3: Add replication fields
VERSION = '1.3'
fields = {
'id': fields.IntegerField(),
@ -49,6 +51,11 @@ class Service(base.CinderPersistentObject, base.CinderObject,
'modified_at': fields.DateTimeField(nullable=True),
'rpc_current_version': fields.StringField(nullable=True),
'object_current_version': fields.StringField(nullable=True),
# Replication properties
'replication_status': c_fields.ReplicationStatusField(nullable=True),
'frozen': fields.BooleanField(default=False),
'active_backend_id': fields.StringField(nullable=True),
}
def obj_make_compatible(self, primitive, target_version):


@ -31,6 +31,7 @@ def rpcapi_get_capabilities(self, context, host, discover):
display_name='Capabilities of Cinder LVM driver',
description='These are volume type options provided by '
'Cinder LVM driver, blah, blah.',
replication_targets=[],
visibility='public',
properties = dict(
compression = dict(
@ -79,6 +80,7 @@ class CapabilitiesAPITest(test.TestCase):
'description': 'These are volume type options provided by '
'Cinder LVM driver, blah, blah.',
'visibility': 'public',
'replication_targets': [],
'properties': {
'compression': {
'title': 'Compression',


@ -249,6 +249,9 @@ class ServicesTest(test.TestCase):
2012, 10, 29, 13, 42, 2),
'disabled_reason': 'test1'},
{'binary': 'cinder-volume',
'replication_status': None,
'active_backend_id': None,
'frozen': False,
'host': 'host1', 'zone': 'cinder',
'status': 'disabled', 'state': 'up',
'updated_at': datetime.datetime(
@ -262,6 +265,9 @@ class ServicesTest(test.TestCase):
2012, 9, 19, 6, 55, 34),
'disabled_reason': ''},
{'binary': 'cinder-volume',
'replication_status': None,
'active_backend_id': None,
'frozen': False,
'host': 'host2',
'zone': 'cinder',
'status': 'disabled', 'state': 'down',
@ -269,6 +275,9 @@ class ServicesTest(test.TestCase):
2012, 9, 18, 8, 3, 38),
'disabled_reason': 'test4'},
{'binary': 'cinder-volume',
'replication_status': None,
'active_backend_id': None,
'frozen': False,
'host': 'host2',
'zone': 'cinder',
'status': 'disabled', 'state': 'down',
@ -276,6 +285,9 @@ class ServicesTest(test.TestCase):
2012, 10, 29, 13, 42, 5),
'disabled_reason': 'test5'},
{'binary': 'cinder-volume',
'replication_status': None,
'active_backend_id': None,
'frozen': False,
'host': 'host2',
'zone': 'cinder',
'status': 'enabled', 'state': 'down',
@ -325,6 +337,9 @@ class ServicesTest(test.TestCase):
29, 13, 42, 2),
'disabled_reason': 'test1'},
{'binary': 'cinder-volume',
'frozen': False,
'replication_status': None,
'active_backend_id': None,
'host': 'host1',
'zone': 'cinder',
'status': 'disabled', 'state': 'up',
@ -376,6 +391,9 @@ class ServicesTest(test.TestCase):
response = {'services': [
{'binary': 'cinder-volume',
'replication_status': None,
'active_backend_id': None,
'frozen': False,
'host': 'host1',
'zone': 'cinder',
'status': 'disabled',
@ -384,6 +402,9 @@ class ServicesTest(test.TestCase):
13, 42, 5),
'disabled_reason': 'test2'},
{'binary': 'cinder-volume',
'replication_status': None,
'active_backend_id': None,
'frozen': False,
'host': 'host2',
'zone': 'cinder',
'status': 'disabled',
@ -392,6 +413,9 @@ class ServicesTest(test.TestCase):
8, 3, 38),
'disabled_reason': 'test4'},
{'binary': 'cinder-volume',
'replication_status': None,
'active_backend_id': None,
'frozen': False,
'host': 'host2',
'zone': 'cinder',
'status': 'disabled',
@ -400,6 +424,9 @@ class ServicesTest(test.TestCase):
13, 42, 5),
'disabled_reason': 'test5'},
{'binary': 'cinder-volume',
'replication_status': None,
'active_backend_id': None,
'frozen': False,
'host': 'host2',
'zone': 'cinder',
'status': 'enabled',
@ -452,34 +479,46 @@ class ServicesTest(test.TestCase):
response = {'services': [
{'binary': 'cinder-volume',
'replication_status': None,
'active_backend_id': None,
'host': 'host1',
'zone': 'cinder',
'status': 'disabled',
'state': 'up',
'frozen': False,
'updated_at': datetime.datetime(2012, 10, 29,
13, 42, 5),
'disabled_reason': 'test2'},
{'binary': 'cinder-volume',
'replication_status': None,
'active_backend_id': None,
'host': 'host2',
'zone': 'cinder',
'status': 'disabled',
'state': 'down',
'frozen': False,
'updated_at': datetime.datetime(2012, 9, 18,
8, 3, 38),
'disabled_reason': 'test4'},
{'binary': 'cinder-volume',
'replication_status': None,
'active_backend_id': None,
'host': 'host2',
'zone': 'cinder',
'status': 'disabled',
'state': 'down',
'frozen': False,
'updated_at': datetime.datetime(2012, 10, 29,
13, 42, 5),
'disabled_reason': 'test5'},
{'binary': 'cinder-volume',
'replication_status': None,
'active_backend_id': None,
'host': 'host2',
'zone': 'cinder',
'status': 'enabled',
'state': 'down',
'frozen': False,
'updated_at': datetime.datetime(2012, 9, 18,
8, 3, 38),
'disabled_reason': ''}]}
@ -507,13 +546,16 @@ class ServicesTest(test.TestCase):
response = {'services': [
{'binary': 'cinder-volume',
'replication_status': None,
'active_backend_id': None,
'host': 'host1',
'zone': 'cinder',
'status': 'disabled',
'state': 'up',
'updated_at': datetime.datetime(2012, 10, 29,
13, 42, 5),
'disabled_reason': 'test2'}]}
'disabled_reason': 'test2',
'frozen': False}]}
self.assertEqual(response, res_dict)
def test_services_list_with_host_binary(self):
@ -538,6 +580,9 @@ class ServicesTest(test.TestCase):
response = {'services': [
{'binary': 'cinder-volume',
'replication_status': None,
'active_backend_id': None,
'frozen': False,
'host': 'host1',
'zone': 'cinder',
'status': 'disabled',


@ -1,248 +0,0 @@
# Copyright 2014 IBM Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Tests for volume replication API code.
"""
import mock
from oslo_config import cfg
from oslo_serialization import jsonutils
import six
import webob
from cinder import context
from cinder import test
from cinder.tests.unit.api import fakes
from cinder.tests.unit import utils as tests_utils
CONF = cfg.CONF
def app():
# no auth, just let environ['cinder.context'] pass through
api = fakes.router.APIRouter()
mapper = fakes.urlmap.URLMap()
mapper['/v2'] = api
return mapper
class VolumeReplicationAPITestCase(test.TestCase):
"""Test Cases for replication API."""
def setUp(self):
super(VolumeReplicationAPITestCase, self).setUp()
self.ctxt = context.RequestContext('admin', 'fake', True)
self.volume_params = {
'host': CONF.host,
'size': 1}
def _get_resp(self, operation, volume_id, xml=False):
"""Helper for a replication action req for the specified volume_id."""
req = webob.Request.blank('/v2/fake/volumes/%s/action' % volume_id)
req.method = 'POST'
if xml:
body = '<os-%s-replica/>' % operation
req.headers['Content-Type'] = 'application/xml'
req.headers['Accept'] = 'application/xml'
if isinstance(body, six.text_type):
body = body.encode('utf-8')
req.body = body
else:
body = {'os-%s-replica' % operation: ''}
req.headers['Content-Type'] = 'application/json'
req.body = jsonutils.dump_as_bytes(body)
req.environ['cinder.context'] = context.RequestContext('admin',
'fake',
True)
res = req.get_response(app())
return req, res
def test_promote_bad_id(self):
(req, res) = self._get_resp('promote', 'fake')
msg = ("request: %r\nresult: %r" % (req, res))
self.assertEqual(404, res.status_int, msg)
def test_promote_bad_id_xml(self):
(req, res) = self._get_resp('promote', 'fake', xml=True)
msg = ("request: %r\nresult: %r" % (req, res))
self.assertEqual(404, res.status_int, msg)
def test_promote_volume_not_replicated(self):
volume = tests_utils.create_volume(
self.ctxt,
**self.volume_params)
(req, res) = self._get_resp('promote', volume['id'])
msg = ("request: %r\nresult: %r" % (req, res))
self.assertEqual(400, res.status_int, msg)
def test_promote_volume_not_replicated_xml(self):
volume = tests_utils.create_volume(
self.ctxt,
**self.volume_params)
(req, res) = self._get_resp('promote', volume['id'], xml=True)
msg = ("request: %r\nresult: %r" % (req, res))
self.assertEqual(400, res.status_int, msg)
@mock.patch('cinder.volume.rpcapi.VolumeAPI.promote_replica')
def test_promote_replication_volume_status(self,
_rpcapi_promote):
for status in ['error', 'in-use']:
volume = tests_utils.create_volume(self.ctxt,
status = status,
replication_status = 'active',
**self.volume_params)
(req, res) = self._get_resp('promote', volume['id'])
msg = ("request: %r\nresult: %r" % (req, res))
self.assertEqual(400, res.status_int, msg)
for status in ['available']:
volume = tests_utils.create_volume(self.ctxt,
status = status,
replication_status = 'active',
**self.volume_params)
(req, res) = self._get_resp('promote', volume['id'])
msg = ("request: %r\nresult: %r" % (req, res))
self.assertEqual(202, res.status_int, msg)
@mock.patch('cinder.volume.rpcapi.VolumeAPI.promote_replica')
def test_promote_replication_volume_status_xml(self,
_rpcapi_promote):
for status in ['error', 'in-use']:
volume = tests_utils.create_volume(self.ctxt,
status = status,
replication_status = 'active',
**self.volume_params)
(req, res) = self._get_resp('promote', volume['id'], xml=True)
msg = ("request: %r\nresult: %r" % (req, res))
self.assertEqual(400, res.status_int, msg)
for status in ['available']:
volume = tests_utils.create_volume(self.ctxt,
status = status,
replication_status = 'active',
**self.volume_params)
(req, res) = self._get_resp('promote', volume['id'], xml=True)
msg = ("request: %r\nresult: %r" % (req, res))
self.assertEqual(202, res.status_int, msg)
@mock.patch('cinder.volume.rpcapi.VolumeAPI.promote_replica')
def test_promote_replication_replication_status(self,
_rpcapi_promote):
for status in ['error', 'copying', 'inactive']:
volume = tests_utils.create_volume(self.ctxt,
status = 'available',
replication_status = status,
**self.volume_params)
(req, res) = self._get_resp('promote', volume['id'])
msg = ("request: %r\nresult: %r" % (req, res))
self.assertEqual(400, res.status_int, msg)
for status in ['active', 'active-stopped']:
volume = tests_utils.create_volume(self.ctxt,
status = 'available',
replication_status = status,
**self.volume_params)
(req, res) = self._get_resp('promote', volume['id'])
msg = ("request: %r\nresult: %r" % (req, res))
self.assertEqual(202, res.status_int, msg)
@mock.patch('cinder.volume.rpcapi.VolumeAPI.promote_replica')
def test_promote_replication_replication_status_xml(self,
_rpcapi_promote):
for status in ['error', 'copying', 'inactive']:
volume = tests_utils.create_volume(self.ctxt,
status = 'available',
replication_status = status,
**self.volume_params)
(req, res) = self._get_resp('promote', volume['id'], xml=True)
msg = ("request: %r\nresult: %r" % (req, res))
self.assertEqual(400, res.status_int, msg)
for status in ['active', 'active-stopped']:
volume = tests_utils.create_volume(self.ctxt,
status = 'available',
replication_status = status,
**self.volume_params)
(req, res) = self._get_resp('promote', volume['id'], xml=True)
msg = ("request: %r\nresult: %r" % (req, res))
self.assertEqual(202, res.status_int, msg)
def test_reenable_bad_id(self):
(req, res) = self._get_resp('reenable', 'fake')
msg = ("request: %r\nresult: %r" % (req, res))
self.assertEqual(404, res.status_int, msg)
def test_reenable_bad_id_xml(self):
(req, res) = self._get_resp('reenable', 'fake', xml=True)
msg = ("request: %r\nresult: %r" % (req, res))
self.assertEqual(404, res.status_int, msg)
def test_reenable_volume_not_replicated(self):
volume = tests_utils.create_volume(
self.ctxt,
**self.volume_params)
(req, res) = self._get_resp('reenable', volume['id'])
msg = ("request: %r\nresult: %r" % (req, res))
self.assertEqual(400, res.status_int, msg)
def test_reenable_volume_not_replicated_xml(self):
volume = tests_utils.create_volume(
self.ctxt,
**self.volume_params)
(req, res) = self._get_resp('reenable', volume['id'], xml=True)
msg = ("request: %r\nresult: %r" % (req, res))
self.assertEqual(400, res.status_int, msg)
@mock.patch('cinder.volume.rpcapi.VolumeAPI.reenable_replication')
def test_reenable_replication_replication_status(self,
_rpcapi_promote):
for status in ['active', 'copying']:
volume = tests_utils.create_volume(self.ctxt,
status = 'available',
replication_status = status,
**self.volume_params)
(req, res) = self._get_resp('reenable', volume['id'])
msg = ("request: %r\nresult: %r" % (req, res))
self.assertEqual(400, res.status_int, msg)
for status in ['inactive', 'active-stopped', 'error']:
volume = tests_utils.create_volume(self.ctxt,
status = 'available',
replication_status = status,
**self.volume_params)
(req, res) = self._get_resp('reenable', volume['id'])
msg = ("request: %r\nresult: %r" % (req, res))
self.assertEqual(202, res.status_int, msg)
@mock.patch('cinder.volume.rpcapi.VolumeAPI.reenable_replication')
def test_reenable_replication_replication_status_xml(self,
_rpcapi_promote):
for status in ['active', 'copying']:
volume = tests_utils.create_volume(self.ctxt,
status = 'available',
replication_status = status,
**self.volume_params)
(req, res) = self._get_resp('reenable', volume['id'], xml=True)
msg = ("request: %r\nresult: %r" % (req, res))
self.assertEqual(400, res.status_int, msg)
for status in ['inactive', 'active-stopped', 'error']:
volume = tests_utils.create_volume(self.ctxt,
status = 'available',
replication_status = status,
**self.volume_params)
(req, res) = self._get_resp('reenable', volume['id'], xml=True)
msg = ("request: %r\nresult: %r" % (req, res))
self.assertEqual(202, res.status_int, msg)


@ -28,7 +28,7 @@ object_data = {
'CGSnapshotList': '1.0-e8c3f4078cd0ee23487b34d173eec776',
'ConsistencyGroup': '1.2-ed7f90a6871991a19af716ade7337fc9',
'ConsistencyGroupList': '1.1-73916823b697dfa0c7f02508d87e0f28',
'Service': '1.2-4d3dd6c9906da364739fbf3f90c80505',
'Service': '1.3-e8f82835bd43722d8d84c55072466eba',
'ServiceList': '1.1-cb758b200f0a3a90efabfc5aa2ffb627',
'Snapshot': '1.0-a6c33eefeadefb324d79f72f66c54e9a',
'SnapshotList': '1.0-71661e7180ef6cc51501704a9bea4bf1',


@ -730,6 +730,15 @@ class MigrationsMixin(test_migrations.WalkVersionsMixin):
self.assertIsInstance(backups.c.restore_volume_id.type,
self.VARCHAR_TYPE)
def _check_065(self, engine, data):
services = db_utils.get_table(engine, 'services')
self.assertIsInstance(services.c.replication_status.type,
self.VARCHAR_TYPE)
self.assertIsInstance(services.c.frozen.type,
self.BOOL_TYPE)
self.assertIsInstance(services.c.active_backend_id.type,
self.VARCHAR_TYPE)
def test_walk_versions(self):
self.walk_versions(False, False)


@ -1,295 +0,0 @@
# Copyright 2014 IBM Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Tests for Volume replication code.
"""
import mock
from oslo_config import cfg
from oslo_utils import importutils
from cinder import context
from cinder import db
from cinder import exception
from cinder import test
from cinder.tests.unit import utils as test_utils
from cinder.volume import driver
CONF = cfg.CONF
class VolumeReplicationTestCaseBase(test.TestCase):
def setUp(self):
super(VolumeReplicationTestCaseBase, self).setUp()
self.ctxt = context.RequestContext('user', 'fake', False)
self.adm_ctxt = context.RequestContext('admin', 'fake', True)
self.manager = importutils.import_object(CONF.volume_manager)
self.manager.host = 'test_host'
self.manager.stats = {'allocated_capacity_gb': 0}
self.driver_patcher = mock.patch.object(self.manager, 'driver',
spec=driver.VolumeDriver)
self.driver = self.driver_patcher.start()
class VolumeReplicationTestCase(VolumeReplicationTestCaseBase):
@mock.patch('cinder.utils.require_driver_initialized')
def test_promote_replica_uninit_driver(self, _init):
"""Test promote replication when driver is not initialized."""
_init.side_effect = exception.DriverNotInitialized
vol = test_utils.create_volume(self.ctxt,
status='available',
replication_status='active')
self.driver.promote_replica.return_value = None
self.assertRaises(exception.DriverNotInitialized,
self.manager.promote_replica,
self.adm_ctxt,
vol['id'])
def test_promote_replica(self):
"""Test promote replication."""
vol = test_utils.create_volume(self.ctxt,
status='available',
replication_status='active')
self.driver.promote_replica.return_value = \
{'replication_status': 'inactive'}
self.manager.promote_replica(self.adm_ctxt, vol['id'])
vol_after = db.volume_get(self.ctxt, vol['id'])
self.assertEqual('inactive', vol_after['replication_status'])
def test_promote_replica_fail(self):
"""Test promote replication when promote fails."""
vol = test_utils.create_volume(self.ctxt,
status='available',
replication_status='active')
self.driver.promote_replica.side_effect = exception.CinderException
self.assertRaises(exception.CinderException,
self.manager.promote_replica,
self.adm_ctxt,
vol['id'])
def test_reenable_replication(self):
"""Test reenable replication."""
vol = test_utils.create_volume(self.ctxt,
status='available',
replication_status='error')
self.driver.reenable_replication.return_value = \
{'replication_status': 'copying'}
self.manager.reenable_replication(self.adm_ctxt, vol['id'])
vol_after = db.volume_get(self.ctxt, vol['id'])
self.assertEqual('copying', vol_after['replication_status'])
@mock.patch('cinder.utils.require_driver_initialized')
def test_reenable_replication_uninit_driver(self, _init):
"""Test reenable replication when driver is not initialized."""
_init.side_effect = exception.DriverNotInitialized
vol = test_utils.create_volume(self.ctxt,
status='available',
replication_status='error')
self.assertRaises(exception.DriverNotInitialized,
self.manager.reenable_replication,
self.adm_ctxt,
vol['id'])
def test_reenable_replication_fail(self):
"""Test promote replication when driver is not initialized."""
vol = test_utils.create_volume(self.ctxt,
status='available',
replication_status='error')
self.driver.reenable_replication.side_effect = \
exception.CinderException
self.assertRaises(exception.CinderException,
self.manager.reenable_replication,
self.adm_ctxt,
vol['id'])
class VolumeManagerReplicationV2Tests(VolumeReplicationTestCaseBase):
mock_driver = None
mock_db = None
vol = None
def setUp(self):
super(VolumeManagerReplicationV2Tests, self).setUp()
self.mock_db = mock.Mock()
self.mock_driver = mock.Mock()
self.vol = test_utils.create_volume(self.ctxt,
status='available',
replication_status='enabling')
self.manager.driver = self.mock_driver
self.mock_driver.replication_enable.return_value = \
{'replication_status': 'enabled'}
self.mock_driver.replication_disable.return_value = \
{'replication_status': 'disabled'}
self.manager.db = self.mock_db
self.mock_db.volume_get.return_value = self.vol
self.mock_db.volume_update.return_value = self.vol
# enable_replication tests
@mock.patch('cinder.utils.require_driver_initialized')
def test_enable_replication_uninitialized_driver(self,
mock_require_driver_init):
mock_require_driver_init.side_effect = exception.DriverNotInitialized
self.assertRaises(exception.DriverNotInitialized,
self.manager.enable_replication,
self.ctxt,
self.vol)
def test_enable_replication_error_state(self):
self.vol['replication_status'] = 'error'
self.assertRaises(exception.InvalidVolume,
self.manager.enable_replication,
self.ctxt,
self.vol)
def test_enable_replication_driver_raises_cinder_exception(self):
self.mock_driver.replication_enable.side_effect = \
exception.CinderException
self.assertRaises(exception.VolumeBackendAPIException,
self.manager.enable_replication,
self.ctxt,
self.vol)
self.mock_db.volume_update.assert_called_with(
self.ctxt,
self.vol['id'],
{'replication_status': 'error'})
def test_enable_replication_driver_raises_exception(self):
self.mock_driver.replication_enable.side_effect = Exception
self.assertRaises(exception.VolumeBackendAPIException,
self.manager.enable_replication,
self.ctxt,
self.vol)
self.mock_db.volume_update.assert_called_with(
self.ctxt,
self.vol['id'],
{'replication_status': 'error'})
def test_enable_replication_success(self):
self.manager.enable_replication(self.ctxt, self.vol)
# volume_update is called multiple times
self.mock_db.volume_update.side_effect = [self.vol, self.vol]
self.mock_db.volume_update.assert_called_with(
self.ctxt,
self.vol['id'],
{'replication_status': 'enabled'})
# disable_replication tests
@mock.patch('cinder.utils.require_driver_initialized')
def test_disable_replication_uninitialized_driver(self,
mock_req_driver_init):
mock_req_driver_init.side_effect = exception.DriverNotInitialized
self.assertRaises(exception.DriverNotInitialized,
self.manager.disable_replication,
self.ctxt,
self.vol)
def test_disable_replication_error_state(self):
self.vol['replication_status'] = 'error'
self.assertRaises(exception.InvalidVolume,
self.manager.disable_replication,
self.ctxt,
self.vol)
def test_disable_replication_driver_raises_cinder_exception(self):
self.vol['replication_status'] = 'disabling'
self.mock_driver.replication_disable.side_effect = \
exception.CinderException
self.assertRaises(exception.VolumeBackendAPIException,
self.manager.disable_replication,
self.ctxt,
self.vol)
self.mock_db.volume_update.assert_called_with(
self.ctxt,
self.vol['id'],
{'replication_status': 'error'})
def test_disable_replication_driver_raises_exception(self):
self.vol['replication_status'] = 'disabling'
self.mock_driver.replication_disable.side_effect = Exception
self.assertRaises(exception.VolumeBackendAPIException,
self.manager.disable_replication,
self.ctxt,
self.vol)
self.mock_db.volume_update.assert_called_with(
self.ctxt,
self.vol['id'],
{'replication_status': 'error'})
def test_disable_replication_success(self):
self.vol['replication_status'] = 'disabling'
self.manager.disable_replication(self.ctxt, self.vol)
# volume_update is called multiple times
self.mock_db.volume_update.side_effect = [self.vol, self.vol]
self.mock_db.volume_update.assert_called_with(
self.ctxt,
self.vol['id'],
{'replication_status': 'disabled'})
# failover_replication tests
@mock.patch('cinder.utils.require_driver_initialized')
def test_failover_replication_uninitialized_driver(self,
mock_driver_init):
self.vol['replication_status'] = 'enabling_secondary'
# validate that driver is called even if uninitialized
mock_driver_init.side_effect = exception.DriverNotInitialized
self.manager.failover_replication(self.ctxt, self.vol)
# volume_update is called multiple times
self.mock_db.volume_update.side_effect = [self.vol, self.vol]
self.mock_db.volume_update.assert_called_with(
self.ctxt,
self.vol['id'],
{'replication_status': 'failed-over'})
def test_failover_replication_error_state(self):
self.vol['replication_status'] = 'error'
self.assertRaises(exception.InvalidVolume,
self.manager.failover_replication,
self.ctxt,
self.vol)
def test_failover_replication_driver_raises_cinder_exception(self):
self.vol['replication_status'] = 'enabling_secondary'
self.mock_driver.replication_failover.side_effect = \
exception.CinderException
self.assertRaises(exception.VolumeBackendAPIException,
self.manager.failover_replication,
self.ctxt,
self.vol)
self.mock_db.volume_update.assert_called_with(
self.ctxt,
self.vol['id'],
{'replication_status': 'error'})
def test_failover_replication_driver_raises_exception(self):
self.vol['replication_status'] = 'enabling_secondary'
self.mock_driver.replication_failover.side_effect = Exception
self.assertRaises(exception.VolumeBackendAPIException,
self.manager.failover_replication,
self.ctxt,
self.vol)
self.mock_db.volume_update.assert_called_with(
self.ctxt,
self.vol['id'],
{'replication_status': 'error'})
def test_failover_replication_success(self):
self.vol['replication_status'] = 'enabling_secondary'
self.manager.failover_replication(self.ctxt, self.vol)
# volume_update is called multiple times
self.mock_db.volume_update.side_effect = [self.vol, self.vol]
self.mock_db.volume_update.assert_called_with(
self.ctxt,
self.vol['id'],
{'replication_status': 'failed-over'})


@ -356,24 +356,6 @@ class VolumeTestCase(BaseVolumeTestCase):
self.volume.delete_volume(self.context, vol3['id'])
self.volume.delete_volume(self.context, vol4['id'])
@mock.patch.object(vol_manager.VolumeManager, 'add_periodic_task')
def test_init_host_repl_enabled_periodic_task(self, mock_add_p_task):
manager = vol_manager.VolumeManager()
with mock.patch.object(manager.driver,
'get_volume_stats') as m_get_stats:
m_get_stats.return_value = {'replication': True}
manager.init_host()
mock_add_p_task.assert_called_once_with(mock.ANY)
@mock.patch.object(vol_manager.VolumeManager, 'add_periodic_task')
def test_init_host_repl_disabled_periodic_task(self, mock_add_p_task):
manager = vol_manager.VolumeManager()
with mock.patch.object(manager.driver,
'get_volume_stats') as m_get_stats:
m_get_stats.return_value = {'replication': False}
manager.init_host()
self.assertEqual(0, mock_add_p_task.call_count)
@mock.patch('cinder.rpc.LAST_RPC_VERSIONS', {'cinder-scheduler': '1.3'})
@mock.patch('cinder.rpc.LAST_OBJ_VERSIONS', {'cinder-scheduler': '1.5'})
def test_reset(self):
@ -1027,49 +1009,6 @@ class VolumeTestCase(BaseVolumeTestCase):
self.assertRaises(exception.CinderException,
vol_manager.VolumeManager)
@mock.patch.object(db, 'volume_get_all_by_host')
def test_update_replication_rel_status(self, m_get_by_host):
m_get_by_host.return_value = [mock.sentinel.vol]
ctxt = context.get_admin_context()
manager = vol_manager.VolumeManager()
with mock.patch.object(manager.driver,
'get_replication_status') as m_get_rep_status:
m_get_rep_status.return_value = None
manager._update_replication_relationship_status(ctxt)
m_get_rep_status.assert_called_once_with(ctxt, mock.sentinel.vol)
exp_filters = {
'replication_status':
['active', 'copying', 'error', 'active-stopped', 'inactive']}
m_get_by_host.assert_called_once_with(ctxt, manager.host,
filters=exp_filters)
@mock.patch.object(db, 'volume_get_all_by_host',
mock.Mock(return_value=[{'id': 'foo'}]))
@mock.patch.object(db, 'volume_update')
def test_update_replication_rel_status_update_vol(self, mock_update):
"""Volume is updated with replication update data."""
ctxt = context.get_admin_context()
manager = vol_manager.VolumeManager()
with mock.patch.object(manager.driver,
'get_replication_status') as m_get_rep_status:
m_get_rep_status.return_value = mock.sentinel.model_update
manager._update_replication_relationship_status(ctxt)
mock_update.assert_called_once_with(ctxt, 'foo',
mock.sentinel.model_update)
@mock.patch.object(db, 'volume_get_all_by_host',
mock.Mock(return_value=[{'id': 'foo'}]))
def test_update_replication_rel_status_with_repl_support_exc(self):
"""Exception handled when raised getting replication status."""
ctxt = context.get_admin_context()
manager = vol_manager.VolumeManager()
manager.driver._initialized = True
manager.driver._stats['replication'] = True
with mock.patch.object(manager.driver,
'get_replication_status') as m_get_rep_status:
m_get_rep_status.side_effect = Exception()
manager._update_replication_relationship_status(ctxt)
def test_delete_busy_volume(self):
"""Test volume survives deletion if driver reports it as busy."""
volume = tests_utils.create_volume(self.context, **self.volume_params)
@ -6534,118 +6473,6 @@ class GenericVolumeDriverTestCase(DriverTestCase):
backup_obj = objects.Backup.get_by_id(self.context, backup.id)
self.assertEqual(temp_vol.id, backup_obj.temp_volume_id)
def test_enable_replication_invalid_state(self):
volume_api = cinder.volume.api.API()
ctxt = context.get_admin_context()
volume = tests_utils.create_volume(ctxt,
size=1,
host=CONF.host,
replication_status='enabled')
self.assertRaises(exception.InvalidVolume,
volume_api.enable_replication,
ctxt, volume)
def test_enable_replication_invalid_type(self):
volume_api = cinder.volume.api.API()
ctxt = context.get_admin_context()
volume = tests_utils.create_volume(self.context,
size=1,
host=CONF.host,
replication_status='disabled')
volume['volume_type_id'] = 'dab02f01-b50f-4ed6-8d42-2b5b9680996e'
fake_specs = {}
with mock.patch.object(volume_types,
'get_volume_type_extra_specs',
return_value = fake_specs):
self.assertRaises(exception.InvalidVolume,
volume_api.enable_replication,
ctxt,
volume)
def test_enable_replication(self):
volume_api = cinder.volume.api.API()
ctxt = context.get_admin_context()
volume = tests_utils.create_volume(self.context,
size=1,
host=CONF.host,
replication_status='disabled')
volume['volume_type_id'] = 'dab02f01-b50f-4ed6-8d42-2b5b9680996e'
fake_specs = {'replication_enabled': '<is> True'}
with mock.patch.object(volume_rpcapi.VolumeAPI,
'enable_replication') as mock_enable_rep,\
mock.patch.object(volume_types,
'get_volume_type_extra_specs',
return_value = fake_specs):
volume_api.enable_replication(ctxt, volume)
self.assertTrue(mock_enable_rep.called)
def test_enable_replication_driver_initialized(self):
volume = tests_utils.create_volume(self.context,
size=1,
host=CONF.host,
replication_status='enabling')
# set initialized to False
self.volume.driver._initialized = False
# start test
self.assertRaises(exception.DriverNotInitialized,
self.volume.enable_replication,
self.context,
volume)
def test_disable_replication_invalid_state(self):
volume_api = cinder.volume.api.API()
ctxt = context.get_admin_context()
volume = tests_utils.create_volume(ctxt,
size=1,
host=CONF.host,
replication_status='invalid-state')
self.assertRaises(exception.InvalidVolume,
volume_api.disable_replication,
ctxt, volume)
def test_disable_replication(self):
volume_api = cinder.volume.api.API()
ctxt = context.get_admin_context()
volume = tests_utils.create_volume(self.context,
size=1,
host=CONF.host,
replication_status='disabled')
volume['volume_type_id'] = 'dab02f01-b50f-4ed6-8d42-2b5b9680996e'
fake_specs = {'replication_enabled': '<is> True'}
with mock.patch.object(volume_rpcapi.VolumeAPI,
'disable_replication') as mock_disable_rep,\
mock.patch.object(volume_types,
'get_volume_type_extra_specs',
return_value = fake_specs):
volume_api.disable_replication(ctxt, volume)
self.assertTrue(mock_disable_rep.called)
volume['replication_status'] = 'enabled'
volume_api.disable_replication(ctxt, volume)
self.assertTrue(mock_disable_rep.called)
def test_disable_replication_driver_initialized(self):
volume = tests_utils.create_volume(self.context,
size=1,
host=CONF.host,
replication_status='disabling')
# set initialized to False
self.volume.driver._initialized = False
# start test
self.assertRaises(exception.DriverNotInitialized,
self.volume.disable_replication,
self.context,
volume)
@mock.patch.object(utils, 'brick_get_connector_properties')
@mock.patch.object(cinder.volume.driver.VolumeDriver, '_attach_volume')
@mock.patch.object(cinder.volume.driver.VolumeDriver, '_detach_volume')


@ -75,88 +75,6 @@ class NotifyUsageTestCase(test.TestCase):
'volume.test_suffix',
mock_usage.return_value)
@mock.patch('cinder.volume.utils._usage_from_volume')
@mock.patch('cinder.volume.utils.CONF')
@mock.patch('cinder.volume.utils.rpc')
def test_notify_about_replication_usage(self, mock_rpc,
mock_conf, mock_usage):
mock_conf.host = 'host1'
output = volume_utils.notify_about_replication_usage(
mock.sentinel.context,
mock.sentinel.volume,
'test_suffix')
self.assertIsNone(output)
mock_usage.assert_called_once_with(mock.sentinel.context,
mock.sentinel.volume)
mock_rpc.get_notifier.assert_called_once_with('replication', 'host1')
mock_rpc.get_notifier.return_value.info.assert_called_once_with(
mock.sentinel.context,
'replication.test_suffix',
mock_usage.return_value)
@mock.patch('cinder.volume.utils._usage_from_volume')
@mock.patch('cinder.volume.utils.CONF')
@mock.patch('cinder.volume.utils.rpc')
def test_notify_about_replication_usage_with_kwargs(self, mock_rpc,
mock_conf, mock_usage):
mock_conf.host = 'host1'
output = volume_utils.notify_about_replication_usage(
mock.sentinel.context,
mock.sentinel.volume,
'test_suffix',
extra_usage_info={'a': 'b', 'c': 'd'},
host='host2')
self.assertIsNone(output)
mock_usage.assert_called_once_with(mock.sentinel.context,
mock.sentinel.volume,
a='b', c='d')
mock_rpc.get_notifier.assert_called_once_with('replication', 'host2')
mock_rpc.get_notifier.return_value.info.assert_called_once_with(
mock.sentinel.context,
'replication.test_suffix',
mock_usage.return_value)
@mock.patch('cinder.volume.utils._usage_from_volume')
@mock.patch('cinder.volume.utils.CONF')
@mock.patch('cinder.volume.utils.rpc')
def test_notify_about_replication_error(self, mock_rpc,
mock_conf, mock_usage):
mock_conf.host = 'host1'
output = volume_utils.notify_about_replication_error(
mock.sentinel.context,
mock.sentinel.volume,
'test_suffix')
self.assertIsNone(output)
mock_usage.assert_called_once_with(mock.sentinel.context,
mock.sentinel.volume)
mock_rpc.get_notifier.assert_called_once_with('replication', 'host1')
mock_rpc.get_notifier.return_value.error.assert_called_once_with(
mock.sentinel.context,
'replication.test_suffix',
mock_usage.return_value)
@mock.patch('cinder.volume.utils._usage_from_volume')
@mock.patch('cinder.volume.utils.CONF')
@mock.patch('cinder.volume.utils.rpc')
def test_notify_about_replication_error_with_kwargs(self, mock_rpc,
mock_conf, mock_usage):
mock_conf.host = 'host1'
output = volume_utils.notify_about_replication_error(
mock.sentinel.context,
mock.sentinel.volume,
'test_suffix',
extra_error_info={'a': 'b', 'c': 'd'},
host='host2')
self.assertIsNone(output)
mock_usage.assert_called_once_with(mock.sentinel.context,
mock.sentinel.volume,
a='b', c='d')
mock_rpc.get_notifier.assert_called_once_with('replication', 'host2')
mock_rpc.get_notifier.return_value.error.assert_called_once_with(
mock.sentinel.context,
'replication.test_suffix',
mock_usage.return_value)
@mock.patch('cinder.volume.utils._usage_from_snapshot')
@mock.patch('cinder.volume.utils.CONF')
@mock.patch('cinder.volume.utils.rpc')


@ -283,9 +283,9 @@ def last_completed_audit_period(unit=None):
def list_of_dicts_to_dict(seq, key):
"""Convert list of dicts to an indexted dict.
"""Convert list of dicts to an indexed dict.
Takes a list of dicts, and converts it a nested dict
Takes a list of dicts, and converts it to a nested dict
indexed by <key>
:param seq: list of dicts


@ -40,6 +40,7 @@ from cinder.image import glance
from cinder import keymgr
from cinder import objects
from cinder.objects import base as objects_base
from cinder.objects import fields
import cinder.policy
from cinder import quota
from cinder import quota_utils
@ -117,29 +118,6 @@ def check_policy(context, action, target_obj=None):
cinder.policy.enforce(context, _action, target)
def valid_replication_volume(func):
"""Check that the volume is capable of replication.
This decorator requires the first 3 args of the wrapped function
to be (self, context, volume)
"""
@functools.wraps(func)
def wrapped(self, context, volume, *args, **kwargs):
rep_capable = False
if volume.get('volume_type_id', None):
extra_specs = volume_types.get_volume_type_extra_specs(
volume.get('volume_type_id'))
rep_capable = extra_specs.get('replication_enabled',
False) == "<is> True"
if not rep_capable:
msg = _("Volume is not a replication enabled volume, "
"replication operations can only be performed "
"on volumes that are of type replication_enabled.")
raise exception.InvalidVolume(reason=msg)
return func(self, context, volume, *args, **kwargs)
return wrapped
class API(base.Base):
"""API for interacting with the volume manager."""
@ -1579,123 +1557,72 @@ class API(base.Base):
ref, host)
return snapshot_object
# Replication V2 methods ##
# NOTE(jdg): It might be kinda silly to propagate the named
# args with defaults all the way down through rpc into manager
# but for now the consistency is useful, and there may be
# some usefulness in the future (direct calls in manager?)
# NOTE(jdg): Relying solely on the volume-type quota mechanism
# need to consider looking at how we handle configured backends
# WRT quotas, do they count against normal quotas or not? For
# now they're a special resource, so no.
@wrap_check_policy
@valid_replication_volume
def enable_replication(self, ctxt, volume):
# NOTE(jdg): details like sync vs async
# and replica count are to be set via the
# volume-type and config files.
# Get a fresh ref from db and check status
volume = self.db.volume_get(ctxt, volume['id'])
# NOTE(jdg): Set a valid status as a var to minimize errors via typos
# also, use a list, we may want to add to it some day
# TODO(jdg): Move these up to a global list for each call and ban the
# free form typing of states and state checks going forward
# NOTE(jdg): There may be a need for some backends to allow this
# call to driver regardless of replication_status, most likely
# this indicates an issue with the driver, but might be useful
# cases to consider modifying this for in the future.
valid_rep_status = ['disabled', 'failed-over', 'error']
rep_status = volume.get('replication_status', valid_rep_status[0])
if rep_status not in valid_rep_status:
msg = (_("Invalid status to enable replication. "
"valid states are: %(valid_states)s, "
"current replication-state is: %(curr_state)s.") %
{'valid_states': valid_rep_status,
'curr_state': rep_status})
raise exception.InvalidVolume(reason=msg)
vref = self.db.volume_update(ctxt,
volume['id'],
{'replication_status': 'enabling'})
self.volume_rpcapi.enable_replication(ctxt, vref)
@wrap_check_policy
@valid_replication_volume
def disable_replication(self, ctxt, volume):
valid_disable_status = ['disabled', 'enabled']
# NOTE(jdg): Just use disabled here (item 1 in the list) this
# way if someone says disable_rep on a volume that's not being
# replicated we just say "ok, done"
rep_status = volume.get('replication_status', valid_disable_status[0])
if rep_status not in valid_disable_status:
msg = (_("Invalid status to disable replication. "
"valid states are: %(valid_states)s, "
"current replication-state is: %(curr_state)s.") %
{'valid_states': valid_disable_status,
'curr_state': rep_status})
raise exception.InvalidVolume(reason=msg)
vref = self.db.volume_update(ctxt,
volume['id'],
{'replication_status': 'disabling'})
self.volume_rpcapi.disable_replication(ctxt, vref)
    @wrap_check_policy
    @valid_replication_volume
    def failover_replication(self,
                             ctxt,
                             volume,
                             secondary=None):
        # FIXME(jdg): What is the secondary argument?
        # for managed secondaries that's easy; it's a host
        # for others, it's tricky; will propose a format for
        # secondaries that includes an ID/Name that can be
        # used as a handle
        valid_failover_status = ['enabled']
        rep_status = volume.get('replication_status', 'na')
        if rep_status not in valid_failover_status:
            msg = (_("Invalid status to failover replication. "
                     "valid states are: %(valid_states)s, "
                     "current replication-state is: %(curr_state)s.") %
                   {'valid_states': valid_failover_status,
                    'curr_state': rep_status})
            raise exception.InvalidVolume(reason=msg)

        vref = self.db.volume_update(
            ctxt,
            volume['id'],
            {'replication_status': 'enabling_secondary'})
        self.volume_rpcapi.failover_replication(ctxt,
                                                vref,
                                                secondary)

    @wrap_check_policy
    @valid_replication_volume
    def list_replication_targets(self, ctxt, volume):
        # NOTE(jdg): This collects info for the specified volume
        # it is NOT an error if the volume is not being replicated
        # also, would be worth having something at a backend/host
        # level to show an admin how a backend is configured.
        return self.volume_rpcapi.list_replication_targets(ctxt, volume)

    # FIXME(jdg): Move these Cheesecake methods (freeze, thaw and failover)
    # to a services API because that's what they are
    def failover_host(self,
                      ctxt,
                      host,
                      secondary_id=None):
        ctxt = context.get_admin_context()
        svc_host = volume_utils.extract_host(host, 'backend')
        service = objects.Service.get_by_host_and_topic(
            ctxt, svc_host, CONF.volume_topic)
        expected = {'replication_status': fields.ReplicationStatus.ENABLED}
        result = service.conditional_update(
            {'replication_status': fields.ReplicationStatus.FAILING_OVER},
            expected)
        if not result:
            expected_status = utils.build_or_str(
                expected['replication_status'])
            msg = (_('Host replication_status must be %s to failover.')
                   % expected_status)
            LOG.error(msg)
            raise exception.InvalidInput(reason=msg)
        active_backend_id = self.volume_rpcapi.failover_host(
            ctxt, host,
            secondary_id)
        return active_backend_id

    def freeze_host(self, ctxt, host):
        ctxt = context.get_admin_context()
        svc_host = volume_utils.extract_host(host, 'backend')
        # NOTE(jdg): get_by_host_and_topic filters out disabled
        service = objects.Service.get_by_args(
            ctxt, svc_host, CONF.volume_topic)
        expected = {'frozen': False}
        result = service.conditional_update(
            {'frozen': True}, expected)
        if not result:
            msg = _LE('Host is already Frozen.')
            LOG.error(msg)
            raise exception.InvalidInput(reason=msg)
        # Should we set service status to disabled to keep
        # scheduler calls from being sent? Just use existing
        # `cinder service-disable reason=freeze`
        self.volume_rpcapi.freeze_host(ctxt, host)

    def thaw_host(self, ctxt, host):
        ctxt = context.get_admin_context()
        svc_host = volume_utils.extract_host(host, 'backend')
        # NOTE(jdg): get_by_host_and_topic filters out disabled
        service = objects.Service.get_by_args(
            ctxt, svc_host, CONF.volume_topic)
        expected = {'frozen': True}
        result = service.conditional_update(
            {'frozen': False}, expected)
        if not result:
            msg = _LE('Host is NOT Frozen.')
            LOG.error(msg)
            raise exception.InvalidInput(reason=msg)
        if not self.volume_rpcapi.thaw_host(ctxt, host):
            return "Backend reported error during thaw_host operation."
    def check_volume_filters(self, filters):
        '''Sets the user filter value to accepted format'''

View File

@ -220,7 +220,6 @@ volume_opts = [
"replication target devices. Each entry takes the "
"standard dict config form: replication_device = "
"target_device_id:<required>,"
"managed_backend_name:<host@backend_name>,"
"key1:value1,key2:value2..."),
cfg.BoolOpt('image_upload_use_cinder_backend',
default=False,
@ -1629,6 +1628,75 @@ class BaseVD(object):
msg = _("Unmanage volume not implemented.")
raise NotImplementedError(msg)
def freeze_backend(self, context):
"""Notify the backend that it's frozen.
We use set to prohibit the creation of any new resources
on the backend, or any modifications to existing items on
a backend. We set/enforce this by not allowing scheduling
of new volumes to the specified backend, and checking at the
api for modifications to resources and failing.
In most cases the driver may not need to do anything, but
this provides a handle if they need it.
:param context: security context
:response: True|False
"""
return True
def thaw_backend(self, context):
"""Notify the backend that it's unfrozen/thawed.
Returns the backend to a normal state after a freeze
operation.
In most cases the driver may not need to do anything, but
this provides a handle if they need it.
:param context: security context
:response: True|False
"""
return True
    def failover_host(self, context, volumes, secondary_id=None):
        """Failover a backend to a secondary replication target.

        Instructs a replication capable/configured backend to failover
        to one of its secondary replication targets. host=None is
        an acceptable input, and leaves it to the driver to failover
        to the only configured target, or to choose a target on its
        own. All of the host's volumes will be passed on to the driver
        in order for it to determine the replicated volumes on the host,
        if needed.

        Response is a tuple, including the new target backend_id
        AND a list of dictionaries with volume_id and updates.

        Key things to consider (attaching failed-over volumes):
          provider_location
          provider_auth
          provider_id
          replication_status

        :param context: security context
        :param volumes: list of volume objects, in case the driver needs
                        to take action on them in some way
        :param secondary_id: Specifies rep target backend to fail over to
        :returns: ID of the backend that was failed-over to
                  and model update for volumes
        """
        # Example volume_updates data structure:
        # [{'volume_id': <cinder-uuid>,
        #   'updates': {'provider_id': 8,
        #               'replication_status': 'failed-over',
        #               'replication_extended_status': 'whatever',...}},]
        raise NotImplementedError()

    def get_replication_updates(self, context):
        """Old replication update method, deprecated."""
        raise NotImplementedError()
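
# NOTE: Purely illustrative sketch of the failover_host() contract above;
# not part of the interface. '_replication_targets' and '_failover_volume'
# are hypothetical, vendor-specific members.
class _ExampleFailoverMixin(object):
    def failover_host(self, context, volumes, secondary_id=None):
        # Pick the requested target, or default to the first configured one
        target = secondary_id or self._replication_targets[0]
        volume_updates = []
        for volume in volumes:
            if volume.get('replication_status') == 'enabled':
                updates = {'replication_status': 'failed-over',
                           'provider_id': self._failover_volume(volume,
                                                                target)}
            else:
                # Non-replicated volumes are left behind and become
                # unavailable once the primary is gone
                updates = {'status': 'error'}
            volume_updates.append({'volume_id': volume['id'],
                                   'updates': updates})
        # Tuple response: the new active backend_id AND the update list
        return target, volume_updates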
@six.add_metaclass(abc.ABCMeta)
class LocalVD(object):
@ -1811,171 +1879,6 @@ class ManageableVD(object):
pass
@six.add_metaclass(abc.ABCMeta)
class ReplicaV2VD(object):
    """Cinder replication functionality.

    The Cinder replication functionality is set up primarily through
    the use of volume-types in conjunction with the filter scheduler.
    This requires:

    1. The driver reports "replication = True" in its capabilities
    2. The cinder.conf file includes the valid_replication_devices section

    The driver configuration is expected to take one of the following two
    forms, see the devref replication docs for details.

    Note we provide cinder.volume.utils.convert_config_string_to_dict
    to parse this out into a usable proper dictionary; a short sketch
    follows below.
    """
    @abc.abstractmethod
    def replication_enable(self, context, volume):
        """Enable replication on a replication capable volume.

        If the volume was created on a replication_enabled host this method
        is used to re-enable replication for the volume.

        Primarily we only want this for testing/admin purposes. The idea
        being that the bulk of the replication details are handled by the
        type definition and the driver; however disable/enable(re-enable) is
        provided for admins to test or do maintenance which is a
        requirement by some cloud-providers.

        NOTE: This is intended as an ADMIN only call and is not
        intended to be used by end-user to enable replication. We're
        leaving that to volume-type info, this is for things like
        maintenance or testing.

        :param context: security context
        :param volume: volume object returned by DB
        :response: {replication_driver_data: vendor-data} DB update

        The replication_driver_data response is vendor unique,
        data returned/used by the driver. It is expected that
        the response from the driver is in the appropriate db update
        format, in the form of a dict, where the vendor data is
        stored under the key 'replication_driver_data'
        """
        # TODO(jdg): Put a check in at API layer to verify the host is
        # replication capable before even issuing this call (can just
        # check against the volume-type for said volume as well)
        raise NotImplementedError()

    @abc.abstractmethod
    def replication_disable(self, context, volume):
        """Disable replication on the specified volume.

        If the specified volume is currently replication enabled,
        this method can be used to disable the replication process
        on the backend.

        Note that we still send this call to a driver whose volume
        may report replication-disabled already. We do this as a
        safety mechanism to allow a driver to cleanup any mismatch
        in state between Cinder and itself.

        This is intended as an ADMIN only call to allow for
        maintenance and testing. If a driver receives this call
        and the process fails for some reason the driver should
        return a status update to "replication_status=disable_failed"

        :param context: security context
        :param volume: volume object returned by DB
        :response: {replication_driver_data: vendor-data} DB update

        The replication_driver_data response is vendor unique,
        data returned/used by the driver. It is expected that
        the response from the driver is in the appropriate db update
        format, in the form of a dict, where the vendor data is
        stored under the key 'replication_driver_data'
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def replication_failover(self, context, volume, secondary):
        """Force failover to a secondary replication target.

        Forces the failover action of a replicated volume to one of its
        secondary/target devices. By default the choice of target devices
        is left up to the driver. In particular we expect one way
        replication here, but are providing a mechanism for 'n' way
        if supported/configured.

        Currently we leave it up to the driver to figure out how/what
        to do here. Rather than doing things like ID swaps, we instead
        just let the driver figure out how/where to route things.

        In cases where we might want to drop a volume-service node and
        the replication target is a configured cinder backend, we'll
        just update the host column for the volume.

        Very important point here is that in the case of a successful
        failover, we want to update the replication_status of the
        volume to "failed-over". This way there's an indication that
        things worked as expected, and that it's evident that the volume
        may no longer be replicating to another backend (primary burst
        into flames). This status will be set by the manager.

        :param context: security context
        :param volume: volume object returned by DB
        :param secondary: Specifies rep target to fail over to
        :response: dict of updates

        So the response would take the form:
            {host: <properly formatted host string for db update>,
             model_update: {standard_model_update_KVs},
             replication_driver_data: xxxxxxx}

        It is expected that the format of these responses is in a consumable
        format to be used in a db.update call directly.

        Additionally we utilize exception catching to report back to the
        manager when things went wrong and to inform the caller on how
        to proceed.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def list_replication_targets(self, context, vref):
        """Provide a means to obtain replication targets for a volume.

        This method is used to query a backend to get the current
        replication config info for the specified volume.

        In the case of a volume that isn't being replicated,
        the driver should return an empty list.

        Example response for replicating to a managed backend::

            {'volume_id': volume['id'],
             'targets': [{'type': 'managed',
                          'backend_name': 'backend_name'}...]}

        Example response for replicating to an unmanaged backend::

            {'volume_id': volume['id'],
             'targets': [{'type': 'unmanaged',
                          'vendor-key-1': 'value-1'}...]}

        NOTE: It's the responsibility of the driver to mask out any
        passwords or sensitive information. Also the format of the
        response allows mixed (managed/unmanaged) targets, even though
        the first iteration does not support configuring the driver in
        such a manner.
        """
        raise NotImplementedError()
@six.add_metaclass(abc.ABCMeta)
class ManageableSnapshotsVD(object):
    # NOTE: Can't use abstractmethod before all drivers implement it
@ -2026,6 +1929,7 @@ class ManageableSnapshotsVD(object):
        pass

# TODO(jdg): Remove this after the V2.1 code merges
@six.add_metaclass(abc.ABCMeta)
class ReplicaVD(object):
    @abc.abstractmethod

View File

@ -50,7 +50,7 @@ CONF.register_opts(common_opts)
class DellCommonDriver(driver.ConsistencyGroupVD, driver.ManageableVD,
                       driver.ExtendVD, driver.ReplicaV2VD,
                       driver.ExtendVD,
                       driver.SnapshotVD, driver.BaseVD):

    def __init__(self, *args, **kwargs):

View File

@ -1405,6 +1405,7 @@ class SolidFireDriver(san.SanISCSIDriver):
data["driver_version"] = self.VERSION
data["storage_protocol"] = 'iSCSI'
data['consistencygroup_support'] = True
data['replication_enabled'] = True
data['total_capacity_gb'] = (
float(results['maxProvisionedSpace'] / units.Gi))

View File

@ -204,7 +204,7 @@ def locked_snapshot_operation(f):
class VolumeManager(manager.SchedulerDependentManager):
    """Manages attachable block storage devices."""

    RPC_API_VERSION = '1.38'
    RPC_API_VERSION = '1.39'

    target = messaging.Target(version=RPC_API_VERSION)
@ -240,12 +240,29 @@ class VolumeManager(manager.SchedulerDependentManager):
                context.get_admin_context())
            LOG.debug("Cinder Volume DB check: vol_db_empty=%s", vol_db_empty)

        # We pass the current setting for service.active_backend_id to
        # the driver on init, in case there was a restart or something
        curr_active_backend_id = None
        svc_host = vol_utils.extract_host(self.host, 'backend')
        try:
            service = objects.Service.get_by_host_and_topic(
                context.get_admin_context(), svc_host,
                CONF.volume_topic)
        except exception.ServiceNotFound:
            # NOTE(jdg): This is to solve problems with unit tests
            LOG.warning(_LW("Service not found for updating "
                            "active_backend_id, assuming default "
                            "for driver init."))
        else:
            curr_active_backend_id = service.active_backend_id

        self.driver = importutils.import_object(
            volume_driver,
            configuration=self.configuration,
            db=self.db,
            host=self.host,
            is_vol_db_empty=vol_db_empty)
            is_vol_db_empty=vol_db_empty,
            active_backend_id=curr_active_backend_id)

        if CONF.profiler.profiler_enabled and profiler is not None:
            self.driver = profiler.trace_cls("driver")(self.driver)
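
        # NOTE: Illustrative only -- a driver consuming the new
        # active_backend_id kwarg might initialize like this (class and
        # attribute names hypothetical):
        #
        #     class MyReplicatedDriver(driver.BaseVD):
        #         def __init__(self, *args, **kwargs):
        #             super(MyReplicatedDriver, self).__init__(
        #                 *args, **kwargs)
        #             # Come back up pointed at the secondary if we were
        #             # failed-over before the restart
        #             self._active_backend_id = kwargs.get(
        #                 'active_backend_id')
        #             self.failed_over = (
        #                 self._active_backend_id is not None)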
@ -380,7 +397,6 @@ class VolumeManager(manager.SchedulerDependentManager):
    def init_host(self):
        """Perform any required initialization."""
        ctxt = context.get_admin_context()
        LOG.info(_LI("Starting volume driver %(driver_name)s (%(version)s)"),
@ -474,11 +490,6 @@ class VolumeManager(manager.SchedulerDependentManager):
        # collect and publish service capabilities
        self.publish_service_capabilities(ctxt)

        # conditionally run replication status task
        # FIXME(jdg): This should go away or be handled differently
        # if/when we're ready for V2 replication
        stats = self.driver.get_volume_stats(refresh=True)
        if stats and stats.get('replication', False):
@ -488,6 +499,27 @@ class VolumeManager(manager.SchedulerDependentManager):
            self.add_periodic_task(run_replication_task)

        svc_host = vol_utils.extract_host(self.host, 'backend')
        try:
            # NOTE(jdg): may be some things to think about here in failover
            # scenarios
            service = objects.Service.get_by_host_and_topic(
                context.get_admin_context(), svc_host,
                CONF.volume_topic)
        except exception.ServiceNotFound:
            # FIXME(jdg): no idea what we'd do if we hit this case
            LOG.warning(_LW("Service not found for updating "
                            "replication_status."))
        else:
            if service.replication_status == (
                    fields.ReplicationStatus.FAILED_OVER):
                pass
            elif stats and stats.get('replication_enabled', False):
                service.replication_status = fields.ReplicationStatus.ENABLED
            else:
                service.replication_status = fields.ReplicationStatus.DISABLED
            service.save()

        LOG.info(_LI("Driver initialization completed successfully."),
                 resource={'type': 'driver',
                           'id': self.driver.__class__.__name__})
@ -504,7 +536,6 @@ class VolumeManager(manager.SchedulerDependentManager):
    def create_volume(self, context, volume_id, request_spec=None,
                      filter_properties=None, allow_reschedule=True,
                      volume=None):
        """Creates the volume."""
        # FIXME(thangp): Remove this in v2.0 of RPC API.
        if volume is None:
@ -3174,277 +3205,158 @@ class VolumeManager(manager.SchedulerDependentManager):
volume.update(model_update_default)
volume.save()
    # Replication V2 methods
    def enable_replication(self, context, volume):
        """Enable replication on a replication capable volume.

        If the volume was created on a replication_enabled host this method
        is used to enable replication for the volume. Primarily used for
        testing and maintenance.

        :param context: security context
        :param volume: volume object returned by DB
        """
        try:
            # If the driver isn't initialized, we can't talk to the backend
            # so the driver can't enable replication
            utils.require_driver_initialized(self.driver)
        except exception.DriverNotInitialized:
            with excutils.save_and_reraise_exception():
                LOG.error(_LE("Can't enable replication because the "
                              "driver isn't initialized"))

        # NOTE(jdg): We're going to do a fresh get from the DB and verify
        # that we are in an expected state ('enabling')
        volume = self.db.volume_get(context, volume['id'])
        if volume['replication_status'] != 'enabling':
            msg = (_("Unable to enable replication due to invalid "
                     "replication status: %(status)s.") %
                   {'status': volume['replication_status']})
            LOG.error(msg, resource=volume)
            raise exception.InvalidVolume(reason=msg)

        try:
            rep_driver_data = self.driver.replication_enable(context,
                                                             volume)
        except exception.CinderException:
            err_msg = (_("Enable replication for volume failed."))
            LOG.exception(err_msg, resource=volume)
            self.db.volume_update(context,
                                  volume['id'],
                                  {'replication_status': 'error'})
            raise exception.VolumeBackendAPIException(data=err_msg)
        except Exception:
            msg = _('enable_replication caused exception in driver.')
            LOG.exception(msg, resource=volume)
            self.db.volume_update(context,
                                  volume['id'],
                                  {'replication_status': 'error'})
            raise exception.VolumeBackendAPIException(data=msg)

        try:
            if rep_driver_data:
                volume = self.db.volume_update(context,
                                               volume['id'],
                                               rep_driver_data)
        except exception.CinderException as ex:
            LOG.exception(_LE("Driver replication data update failed."),
                          resource=volume)
            raise exception.VolumeBackendAPIException(reason=ex)
        self.db.volume_update(context, volume['id'],
                              {'replication_status': 'enabled'})

    def disable_replication(self, context, volume):
        """Disable replication on the specified volume.

        If the specified volume is currently replication enabled,
        this method can be used to disable the replication process
        on the backend. This method assumes that we checked
        replication status in the API layer to ensure we should
        send this call to the driver.

        :param context: security context
        :param volume: volume object returned by DB
        """
        try:
            # If the driver isn't initialized, we can't talk to the backend
            # so the driver can't disable replication
            utils.require_driver_initialized(self.driver)
        except exception.DriverNotInitialized:
            with excutils.save_and_reraise_exception():
                LOG.error(_LE("Can't disable replication because the "
                              "driver isn't initialized"))

        volume = self.db.volume_get(context, volume['id'])
        if volume['replication_status'] != 'disabling':
            msg = (_("Unable to disable replication due to invalid "
                     "replication status: %(status)s.") %
                   {'status': volume['replication_status']})
            LOG.error(msg, resource=volume)
            raise exception.InvalidVolume(reason=msg)

        try:
            rep_driver_data = self.driver.replication_disable(context,
                                                              volume)
        except exception.CinderException:
            err_msg = (_("Disable replication for volume failed."))
            LOG.exception(err_msg, resource=volume)
            self.db.volume_update(context,
                                  volume['id'],
                                  {'replication_status': 'error'})
            raise exception.VolumeBackendAPIException(data=err_msg)
        except Exception:
            msg = _('disable_replication caused exception in driver.')
            LOG.exception(msg, resource=volume)
            self.db.volume_update(context,
                                  volume['id'],
                                  {'replication_status': 'error'})
            raise exception.VolumeBackendAPIException(msg)

        try:
            if rep_driver_data:
                volume = self.db.volume_update(context,
                                               volume['id'],
                                               rep_driver_data)
        except exception.CinderException as ex:
            LOG.exception(_LE("Driver replication data update failed."),
                          resource=volume)
            self.db.volume_update(context,
                                  volume['id'],
                                  {'replication_status': 'error'})
            raise exception.VolumeBackendAPIException(reason=ex)
        self.db.volume_update(context,
                              volume['id'],
                              {'replication_status': 'disabled'})

    def failover_replication(self, context, volume, secondary=None):
        """Force failover to a secondary replication target.

        Forces the failover action of a replicated volume to one of its
        secondary/target devices. By default the choice of target devices
        is left up to the driver. In particular we expect one way
        replication here, but are providing a mechanism for 'n' way
        if supported/configured.

        Currently we leave it up to the driver to figure out how/what
        to do here. Rather than doing things like ID swaps, we instead
        just let the driver figure out how/where to route things.

        In cases where we might want to drop a volume-service node and
        the replication target is a configured cinder backend, we'll
        just update the host column for the volume.

        :param context: security context
        :param volume: volume object returned by DB
        :param secondary: Specifies rep target to fail over to
        """
        # NOTE(hemna) We intentionally don't enforce the driver being
        # initialized here, because the primary might actually be down,
        # but we still want to give the driver a chance of doing some work
        # against the target. It's entirely up to the driver to deal with
        # not being able to talk to the primary array that it's configured
        # to manage.
        if volume['replication_status'] != 'enabling_secondary':
            msg = (_("Unable to failover replication due to invalid "
                     "replication status: %(status)s.") %
                   {'status': volume['replication_status']})
            LOG.error(msg, resource=volume)
            raise exception.InvalidVolume(reason=msg)

        try:
            volume = self.db.volume_get(context, volume['id'])
            model_update = self.driver.replication_failover(context,
                                                            volume,
                                                            secondary)
            # model_update is a dict containing a report of relevant
            # items based on the backend and how it operates or what it
            # needs. For example:
            # {'host': 'secondary-configured-cinder-backend',
            #  'provider_location': 'foo',
            #  'replication_driver_data': 'driver-specific-stuff-for-db'}
            # Where 'host' is a valid cinder host string like
            # 'foo@bar#baz'
        except exception.CinderException:
            # FIXME(jdg): We need to create a few different exceptions here
            # and handle each differently:
            # 1. I couldn't failover, but the original setup is ok so proceed
            #    as if this were never called
            # 2. I ran into a problem and I have no idea what state things
            #    are in, so set volume to error
            # 3. I ran into a problem and a human needs to come fix me up
            err_msg = (_("Replication failover for volume failed."))
            LOG.exception(err_msg, resource=volume)
            self.db.volume_update(context,
                                  volume['id'],
                                  {'replication_status': 'error'})
            raise exception.VolumeBackendAPIException(data=err_msg)
        except Exception:
            msg = _('replication_failover caused exception in driver.')
            LOG.exception(msg, resource=volume)
            self.db.volume_update(context,
                                  volume['id'],
                                  {'replication_status': 'error'})
            raise exception.VolumeBackendAPIException(msg)

        if model_update:
            try:
                volume = self.db.volume_update(
                    context,
                    volume['id'],
                    model_update)
            except exception.CinderException as ex:
                LOG.exception(_LE("Driver replication data update failed."),
                              resource=volume)
                self.db.volume_update(context,
                                      volume['id'],
                                      {'replication_status': 'error'})
                raise exception.VolumeBackendAPIException(reason=ex)

        # NOTE(jdg): We're setting replication status to failed-over
        # which indicates the volume is ok, things went as expected but
        # we're likely not replicating any longer because... well we
        # did a fail-over. In the case of admin bringing primary
        # back online he/she can use enable_replication to get this
        # state set back to enabled.

        # Also, in the case of multiple targets, the driver can update
        # status in the rep-status checks if it still has valid replication
        # targets that the volume is being replicated to.
        self.db.volume_update(context,
                              volume['id'],
                              {'replication_status': 'failed-over'})

    def list_replication_targets(self, context, volume):
        """Provide a means to obtain replication targets for a volume.

        This method is used to query a backend to get the current
        replication config info for the specified volume.

        In the case of a volume that isn't being replicated,
        the driver should return an empty list.

        There is one required field for configuration of
        replication, (target_device_id). As such we
        use that for the response in list_replication_targets.

        Internal methods can be added to extract additional
        details if needed, but the only detail that should be
        exposed to an end user is the identifier. In the case
        of an Admin, (s)he can always cross check the cinder.conf
        file that they have configured for additional info.

        Example response:
            {'volume_id': volume['id'],
             'targets': [<target-device-id>,...]}
        """
        try:
            volume = self.db.volume_get(context, volume['id'])
            replication_targets = self.driver.list_replication_targets(
                context, volume)
        except exception.CinderException:
            err_msg = (_("Get replication targets failed."))
            LOG.exception(err_msg)
            raise exception.VolumeBackendAPIException(data=err_msg)
        return replication_targets

    # Replication V2.1 methods
    def failover_host(self, context,
                      secondary_backend_id=None):
        """Failover a backend to a secondary replication target.

        Instructs a replication capable/configured backend to failover
        to one of its secondary replication targets. host=None is
        an acceptable input, and leaves it to the driver to failover
        to the only configured target, or to choose a target on its
        own. All of the host's volumes will be passed on to the driver
        in order for it to determine the replicated volumes on the host,
        if needed.

        :param context: security context
        :param secondary_backend_id: Specifies backend_id to fail over to
        :returns: ID of the backend that was failed-over to
        """
        svc_host = vol_utils.extract_host(self.host, 'backend')
        # NOTE(jdg): get_by_host_and_topic filters out disabled
        service = objects.Service.get_by_args(
            context, svc_host,
            CONF.volume_topic)
        volumes = objects.VolumeList.get_all_by_host(context, self.host)

        exception_encountered = False
        try:
            # expected form of volume_update_list:
            # [{volume_id: <cinder-volid>, updates: {'provider_id': xxxx....}},
            #  {volume_id: <cinder-volid>, updates: {'provider_id': xxxx....}}]
            (active_backend_id, volume_update_list) = (
                self.driver.failover_host(
                    context,
                    volumes,
                    secondary_backend_id))
        except exception.UnableToFailOver:
            LOG.exception(_LE("Failed to perform replication failover"))
            service.replication_status = (
                fields.ReplicationStatus.FAILOVER_ERROR)
            exception_encountered = True
        except exception.InvalidReplicationTarget:
            LOG.exception(_LE("Invalid replication target specified "
                              "for failover"))
            service.replication_status = fields.ReplicationStatus.ENABLED
            exception_encountered = True
        except exception.VolumeDriverException:
            # NOTE(jdg): Drivers need to be aware if they fail during
            # a failover sequence, we're expecting them to cleanup
            # and make sure the driver state is such that the original
            # backend is still set as primary as per driver memory
            LOG.error(_LE("Driver reported error during "
                          "replication failover."))
            service.status = 'error'
            service.save()
            exception_encountered = True
        if exception_encountered:
            LOG.error(
                _LE("Error encountered during failover on host: "
                    "%(host)s invalid target ID %(backend_id)s"),
                {'host': self.host, 'backend_id':
                 secondary_backend_id})
            return None

        service.replication_status = fields.ReplicationStatus.FAILED_OVER
        service.active_backend_id = active_backend_id
        service.disabled = True
        service.disabled_reason = "failed-over"
        service.save()

        for update in volume_update_list:
            # Response must include an id key: {volume_id: <cinder-uuid>}
            if not update.get('volume_id'):
                raise exception.UnableToFailOver(
                    reason=_("Update list, doesn't include volume_id"))
            # Key things to consider (attaching failed-over volumes):
            #   provider_location
            #   provider_auth
            #   provider_id
            #   replication_status
            vobj = objects.Volume.get_by_id(context, update['volume_id'])
            vobj.update(update.get('updates', {}))
            vobj.save()

        LOG.info(_LI("Failed over to replication target successfully."))
        return active_backend_id

    def freeze_host(self, context):
        """Freeze management plane on this backend.

        Basically puts the control/management plane into a
        Read Only state. We should handle this in the scheduler,
        however this is provided to let the driver know in case it
        needs/wants to do something specific on the backend.

        :param context: security context
        """
        # TODO(jdg): Return from driver? or catch?
        # Update status column in service entry
        try:
            self.driver.freeze_backend(context)
        except exception.VolumeDriverException:
            # NOTE(jdg): In the case of freeze, we don't really
            # need the backend's consent or anything, we'll just
            # disable the service, so we can just log this and
            # go about our business
            LOG.warning(_LW('Error encountered on Cinder backend during '
                            'freeze operation, service is frozen, however '
                            'notification to driver has failed.'))
        svc_host = vol_utils.extract_host(self.host, 'backend')
        # NOTE(jdg): get_by_host_and_topic filters out disabled
        service = objects.Service.get_by_args(
            context, svc_host,
            CONF.volume_topic)
        service.disabled = True
        service.disabled_reason = "frozen"
        service.save()
        LOG.info(_LI("Set backend status to frozen successfully."))
        return True

    def thaw_host(self, context):
        """UnFreeze management plane on this backend.

        Basically puts the control/management plane back into
        a normal state. We should handle this in the scheduler,
        however this is provided to let the driver know in case it
        needs/wants to do something specific on the backend.

        :param context: security context
        """
        # TODO(jdg): Return from driver? or catch?
        # Update status column in service entry
        try:
            self.driver.thaw_backend(context)
        except exception.VolumeDriverException:
            # NOTE(jdg): Thaw actually matters, if this call
            # to the backend fails, we're stuck and can't re-enable
            LOG.error(_LE('Error encountered on Cinder backend during '
                          'thaw operation, service will remain frozen.'))
            return False
        svc_host = vol_utils.extract_host(self.host, 'backend')
        # NOTE(jdg): get_by_host_and_topic filters out disabled
        service = objects.Service.get_by_args(
            context, svc_host,
            CONF.volume_topic)
        service.disabled = False
        service.disabled_reason = ""
        service.save()
        LOG.info(_LI("Thawed backend successfully."))
        return True

    def manage_existing_snapshot(self, ctxt, snapshot, ref=None):
        LOG.debug('manage_existing_snapshot: managing %s.', ref)

View File

@ -91,9 +91,10 @@ class VolumeAPI(rpc.RPCAPI):
        checks in the API.
        1.38 - Scaling backup service, add get_backup_device() and
               secure_file_operations_enabled()
        1.39 - Update replication methods to reflect new backend rep strategy
    """

    RPC_API_VERSION = '1.38'
    RPC_API_VERSION = '1.39'

    TOPIC = CONF.volume_topic
    BINARY = 'cinder-volume'
@ -303,27 +304,26 @@ class VolumeAPI(rpc.RPCAPI):
                          new_volume=new_volume,
                          volume_status=original_volume_status)

    def enable_replication(self, ctxt, volume):
        cctxt = self._get_cctxt(volume['host'], '1.27')
        cctxt.cast(ctxt, 'enable_replication', volume=volume)

    def disable_replication(self, ctxt, volume):
        cctxt = self._get_cctxt(volume['host'], '1.27')
        cctxt.cast(ctxt, 'disable_replication',
                   volume=volume)

    def failover_replication(self,
                             ctxt,
                             volume,
                             secondary=None):
        cctxt = self._get_cctxt(volume['host'], '1.27')
        cctxt.cast(ctxt, 'failover_replication',
                   volume=volume,
                   secondary=secondary)

    def list_replication_targets(self, ctxt, volume):
        cctxt = self._get_cctxt(volume['host'], '1.27')
        return cctxt.call(ctxt, 'list_replication_targets', volume=volume)

    def freeze_host(self, ctxt, host):
        """Set backend host to frozen."""
        cctxt = self._get_cctxt(host, '1.39')
        return cctxt.call(ctxt, 'freeze_host')

    def thaw_host(self, ctxt, host):
        """Clear the frozen setting on a backend host."""
        cctxt = self._get_cctxt(host, '1.39')
        return cctxt.call(ctxt, 'thaw_host')

    def failover_host(self, ctxt, host,
                      secondary_backend_id=None):
        """Failover host to the specified backend_id (secondary)."""
        cctxt = self._get_cctxt(host, '1.39')
        return cctxt.call(ctxt, 'failover_host',
                          secondary_backend_id=secondary_backend_id)

    def list_replication_targets(self, ctxt, host):
        cctxt = self._get_cctxt(host, '1.39')
        return cctxt.call(ctxt, 'list_replication_targets')

    def manage_existing_snapshot(self, ctxt, snapshot, ref, host):
        cctxt = self._get_cctxt(host, '1.28')

View File

@ -12,13 +12,38 @@ of all the different backend devices.
Most of the configuration is done via the cinder.conf file
under the driver section and through the use of volume types.
NOTE:
This implementation is intended to solve a specific use case.
It's critical that you read the Use Cases section of the spec
here:
https://specs.openstack.org/openstack/cinder-specs/specs/mitaka/cheesecake.html
Config file examples
--------------------
The cinder.conf file is used to specify replication target
devices for a specific driver. Replication targets may
be specified as external (unmanaged) or internally
Cinder managed backend devices.

The cinder.conf file is used to specify replication config info
for a specific driver. There is no concept of managed vs unmanaged;
ALL replication configurations are expected to work by using the same
driver. In other words, rather than trying to perform any magic
by changing host entries in the DB for a Volume etc, all replication
targets are considered "unmanaged", BUT if a failover is issued, it's
the driver's responsibility to access replication volumes on the
replicated backend device.

This results in no changes for the end-user. For example, he/she can
still issue an attach call to a replicated volume that has been failed
over, and the driver will still receive the call, BUT the driver will
need to figure out if it needs to redirect the call to a different
backend than the default or not.

Information regarding whether the backend is in a failed over state
should be stored in the driver, and in the case of a restart, the service
entry in the DB will have the replication status info and pass it
in during init to allow the driver to be set in the correct state.

In the case of a failover event, if a volume was NOT of type
replicated, that volume will now be UNAVAILABLE and any calls
to access that volume should return a VolumeNotFound exception.
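
For example, an attach-path call in a failed-over driver might be guarded
along these lines (purely illustrative; ``failed_over`` and the two connect
helpers are hypothetical driver members)::

    from cinder import exception

    def initialize_connection(self, volume, connector):
        if self.failed_over:
            if volume['replication_status'] == 'failed-over':
                # Replicated volume; redirect to the secondary device
                return self._connect_on_secondary(volume, connector)
            # The volume only ever lived on the (dead) primary
            raise exception.VolumeNotFound(volume_id=volume['id'])
        return self._connect_on_primary(volume, connector)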
**replication_device**
@ -28,37 +53,13 @@ like to configure.
*NOTE:*
There are two standardized keys in the config
entry, all others are vendor-unique:

* target_device_id:<vendor-identifier-for-rep-target>
* managed_backend_name:<cinder-backend-host-entry>

target_device_id is REQUIRED in all configurations

There is one standardized and REQUIRED key in the config
entry, all others are vendor-unique:

* backend_id:<vendor-identifier-for-rep-target>
An example config entry for a managed replication device
would look like this::

    .....
    [driver-biz]
    volume_driver=xxxx
    volume_backend_name=biz

    [driver-foo]
    volume_driver=xxxx
    volume_backend_name=foo
    replication_device = device_target_id:vendor-id-info,managed_backend_name:biz,unique_key:val....

The use of multiopt will result in self.configuration.get('replication_device')
returning a list of properly formed python dictionaries that can
be easily consumed::

    [{device_target_id: blahblah, managed_backend_name: biz, unique_key: val1}]
An example driver config for a device with multiple replication targets
is shown below::
    .....
    [driver-biz]
@ -72,76 +73,16 @@ In the case of multiple replication target devices::
    [driver-foo]
    volume_driver=xxxx
    volume_backend_name=foo
    managed_replication_target=True
    replication_device = device_target_id:vendor-id-info,managed_backend_name:biz,unique_key:val....
    replication_device = device_target_id:vendor-id-info,managed_backend_name:baz,unique_key:val....
In this example the result is self.configuration.get('replication_device')
returning a list of properly formed python dictionaries::

    [{device_target_id: blahblah, managed_backend_name: biz, unique_key: val1},
     {device_target_id: moreblah, managed_backend_name: baz, unique_key: val1}]
In the case of unmanaged replication target devices::

    .....
    [driver-biz]
    volume_driver=xxxx
    volume_backend_name=biz

    [driver-baz]
    volume_driver=xxxx
    volume_backend_name=baz

    [driver-foo]
    volume_driver=xxxx
    volume_backend_name=foo
    replication_device = device_target_id:vendor-id-info,managed_backend_name:None,unique_key:val....
    replication_device = device_target_id:vendor-id-info,managed_backend_name:None,unique_key:val....

The managed_backend_name entry may also be omitted altogether in the case of
unmanaged targets. In the V2.1 form the entries simply carry the backend_id::

    replication_device = backend_id:vendor-id-1,unique_key:val....
    replication_device = backend_id:vendor-id-2,unique_key:val....

In this example the result is self.configuration.get('replication_device') with the list::

    [{device_target_id: blahblah, managed_backend_name: None, unique_key: val1},
     {device_target_id: moreblah, managed_backend_name: None, unique_key: val1}]

    [{backend_id: vendor-id-1, unique_key: val1},
     {backend_id: vendor-id-2, unique_key: val1}]
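
A driver would then typically consume those entries along these lines
(a sketch; the validation helper is hypothetical)::

    targets = self.configuration.safe_get('replication_device') or []
    for target in targets:
        # each entry is a dict, e.g. {'backend_id': 'vendor-id-1',
        #                             'unique_key': 'val1'}
        self._validate_replication_target(target)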
Special note about Managed target device
----------------------------------------
Remember that in the case where another Cinder backend is
used, it's likely you'll still need some special data
to instruct the primary driver how to communicate with the
secondary. In this case we use the same structure and entries
but we set the key **managed_backend_name** to a valid
Cinder backend name.

**WARNING**

The building of the host string for a driver is not always
very straightforward. The enabled_backends names which
correspond to the driver-section are what actually get used
to form the host string for the volume service.

Also, take care that your driver knows how to parse out the
host correctly; although the secondary backend may be managed,
it may not be on the same host, it may have a pool specification
etc. In the example above we can assume the same host, in other
cases we would need to use the form::

    <host>@<driver-section-name>

and for some vendors we may require pool specification::

    <host>@<driver-section-name>#<pool-name>

Regardless, it's best that you actually check the services entry
and verify that you've set this correctly. To avoid problems, your
vendor documentation should recommend that customers configure
backends first, then verify the settings from the cinder services list.
Volume Types / Extra Specs
---------------------------
In order for a user to specify they'd like a replicated volume, there needs to be
@ -156,70 +97,69 @@ backend that supports replication, the extra-specs entry would be::
    {replication: enabled}

If you needed to provide a specific backend device (multiple backends supporting replication)::

    {replication: enabled, volume_backend_name: foo}

Additionally you could provide additional details using scoped keys::

    {replication: enabled, volume_backend_name: foo,
     replication: replication_type: async}

Again, it's up to the driver to parse the volume type info on create and set things up
It's up to the driver to parse the volume type info on create and set things up
as requested. While the scoping key can be anything, it's strongly recommended that all
backends utilize the same key (replication) for consistency and to make things easier for
the Cloud Administrator.

Additionally it's expected that if a backend is configured with 3 replication
targets and a volume of type replication=enabled is created against that
backend, it will replicate to ALL THREE of the configured targets.
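
On create, a driver might check the type info along these lines (a sketch;
only the simple un-scoped key is handled here)::

    from cinder.volume import volume_types

    def _is_replicated(volume):
        type_id = volume.get('volume_type_id')
        if not type_id:
            return False
        specs = volume_types.get_volume_type_extra_specs(type_id)
        return specs.get('replication') == 'enabled'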
Capabilities reporting
----------------------
The following entries are expected to be added to the stats/capabilities update for
replication configured devices::

    stats["replication_enabled"] = True|False
    stats["replication_type"] = ['async', 'sync'...]
    stats["replication_count"] = len(self.cluster_pairs)
    stats["replication_targets"] = [<backend-id_1>, <backend-id_2>...]

NOTICE: we report configured replication targets via the volume stats_update.
This information is added to the get_capabilities admin call.
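
A replication-configured driver's stats update might therefore include
something like the following (illustrative values; ``cluster_pairs`` is
vendor-specific)::

    stats['replication_enabled'] = True
    stats['replication_type'] = ['async']
    stats['replication_count'] = len(self.cluster_pairs)
    stats['replication_targets'] = [t['backend_id']
                                    for t in self.cluster_pairs]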
Required methods
-----------------
The number of API methods associated with replication is intentionally very
limited, and they are Admin only methods.

They include::

    replication_enable(self, context, volume)
    replication_disable(self, context, volume)
    replication_failover(self, context, volume)
    list_replication_targets(self, context)

**replication_enable**

Used to notify the driver that we would like to enable replication on a replication capable volume.
NOTE this is NOT used as the initial create replication command, that's handled by the volume-type at
create time. This is provided as a method for an Admin that may have needed to disable replication
on a volume for maintenance or whatever reason to signify that they'd like to "resume" replication on
the given volume.

**replication_disable**

Used to notify the driver that we would like to disable replication on a replication capable volume.
This again would be used by a Cloud Administrator for things like maintenance etc.

**replication_failover**

Used to instruct the backend to fail over to the secondary/target device on a replication capable volume.
This may be used for triggering a fail-over manually or for testing purposes.

Note that ideally drivers will know how to update the volume reference properly so that Cinder is now
pointing to the secondary. Also, while it's not required at this time, ideally the command would
act as a toggle, allowing to switch back and forth between primary and secondary and back to primary.

**list_replication_targets**

Used by the admin to query a volume for a list of configured replication targets.

The expected response is simply the single required field in replication-device
configuration. It's possible that in the future we may want to add a show
command that provides all the various details about a target replication
device. This would be of the form::

    show_replication_target <target_device_id>

Example response::

    {'volume_id': volume['id'],
     'targets': [<target-device-id>,...]}

In V2.1 the number of API methods associated with replication is
intentionally very limited; they are still Admin only methods.

They include::

    replication_failover(self, context, volumes)

Additionally we have freeze/thaw methods that will act on the scheduler
but may or may not require something from the driver::

    freeze_backend(self, context)
    thaw_backend(self, context)

**replication_failover**

Used to instruct the backend to fail over to the secondary/target device.
If no secondary is specified (via the backend_id argument) it's up to the
driver to choose which device to failover to. In the case of only a single
replication target this argument should be ignored.

Keep in mind the use case is that the backend has died a horrible death and is
no longer valid. Any volumes that were on the primary and NOT of replication
type should now be unavailable.

NOTE: We do not expect things like create requests to go to the driver and
magically create volumes on the replication target. The concept is that the
backend is lost, and we're just providing a DR mechanism to preserve user data
for volumes that were specified as such via type settings.

**freeze_backend**

Puts a backend host/service into a R/O state for the control plane. For
example if a failover is issued, it is likely desirable that while data access
to existing volumes is maintained, it would not be wise to continue
doing things like creates, deletes, extends etc.

**thaw_backend**

Clear frozen control plane on a backend.