Add support in Cinder for volume replication - driver approach

This is take #2 for managing replicaiton in Cinder.

This patch provides the foundation in Cinder to make volume
replication available to the cloud admin. It makes Cinder aware
of volume replicas, and allows the cloud admin to define storage
policies (volume types) that will enable replication.

In this version Cinder delegates most the work on replication
to the driver itself.

This includes:
1. Driver exposes replication capabilities via volume type convention.
2. Extend volume table to include columns to support replicaion.
3. Create replicas in the driver, making it transparant to Cinder.
4. Volume manager code to handle API, updates to create_volume to
   support creating test replicas.
5. Driver methods to expose per replication functions

Cinder-specs available at https://review.openstack.org/#/c/98308/

Volume replication use-case: Simplified disaster recovery
The OpenStack cloud is deployed across two metro distance data centers.
Storage backends are available in both data ceneters. The backends
are managed by either a single Cinder host or two, depending on the
storage backend requirements.
Storage admin configures the Cinder volume driver to support
replication.
Cloud admin creates a volume type "replicated" with extra-specs:
   capabilities:replication="<is> True"
Every volume created in type "replicated" has a copy on both
backends.
In case of data center failure in first data center, the cloud admin
promotes the replica, and redeploy the VMs - they will now run on
a host in the secondary data center using the storage on the
secondary data center.

Implements: blueprint volume-replication
DocImpact

Change-Id: I964852f08b500400a27bff99e5200386e00643c9
This commit is contained in:
Ronen Kat 2014-07-26 17:06:52 +03:00
parent 74a38b5e7e
commit 1c8f49bfe9
29 changed files with 1317 additions and 35 deletions

View File

@ -0,0 +1,139 @@
# Copyright 2014 IBM Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import webob
from webob import exc
from cinder.api import extensions
from cinder.api.openstack import wsgi
from cinder.api import xmlutil
from cinder import exception
from cinder.i18n import _
from cinder.openstack.common import log as logging
from cinder import replication as replicationAPI
from cinder import volume
LOG = logging.getLogger(__name__)
authorize = extensions.soft_extension_authorizer('volume',
'volume_replication')
class VolumeReplicationController(wsgi.Controller):
"""The Volume Replication API controller for the Openstack API."""
def __init__(self, *args, **kwargs):
super(VolumeReplicationController, self).__init__(*args, **kwargs)
self.volume_api = volume.API()
self.replication_api = replicationAPI.API()
def _add_replication_attributes(self, req, context, resp_volume):
db_volume = req.cached_resource_by_id(resp_volume['id'])
key = "%s:extended_status" % Volume_replication.alias
resp_volume[key] = db_volume['replication_extended_status']
key = "%s:driver_data" % Volume_replication.alias
resp_volume[key] = db_volume['replication_driver_data']
@wsgi.extends
def show(self, req, resp_obj, id):
context = req.environ['cinder.context']
if authorize(context):
resp_obj.attach(xml=VolumeReplicationAttributeTemplate())
self._add_replication_attributes(req, context,
resp_obj.obj['volume'])
@wsgi.extends
def detail(self, req, resp_obj):
context = req.environ['cinder.context']
if authorize(context):
resp_obj.attach(xml=VolumeReplicationListAttributeTemplate())
for vol in list(resp_obj.obj['volumes']):
self._add_replication_attributes(req, context, vol)
@wsgi.response(202)
@wsgi.action('os-promote-replica')
def promote(self, req, id, body):
context = req.environ['cinder.context']
try:
vol = self.volume_api.get(context, id)
LOG.info(_('Attempting to promote secondary replica to primary'
' for volume %s.'),
str(id),
context=context)
self.replication_api.promote(context, vol)
except exception.NotFound:
msg = _("Volume could not be found")
raise exc.HTTPNotFound(explanation=msg)
except exception.ReplicationError as error:
raise exc.HTTPBadRequest(explanation=unicode(error))
return webob.Response(status_int=202)
@wsgi.response(202)
@wsgi.action('os-reenable-replica')
def reenable(self, req, id, body):
context = req.environ['cinder.context']
try:
vol = self.volume_api.get(context, id)
LOG.info(_('Attempting to sync secondary replica with primary'
' for volume %s.'),
str(id),
context=context)
self.replication_api.reenable(context, vol)
except exception.NotFound:
msg = _("Volume could not be found")
raise exc.HTTPNotFound(explanation=msg)
except exception.ReplicationError as error:
raise exc.HTTPBadRequest(explanation=unicode(error))
return webob.Response(status_int=202)
class Volume_replication(extensions.ExtensionDescriptor):
"""Volume replication management support."""
name = "VolumeReplication"
alias = "os-volume-replication"
namespace = "http://docs.openstack.org/volume/ext/volume_replication/" + \
"api/v1"
updated = "2014-08-01T00:00:00+00:00"
def get_controller_extensions(self):
controller = VolumeReplicationController()
extension = extensions.ControllerExtension(self, 'volumes', controller)
return [extension]
def make_volume(elem):
elem.set('{%s}extended_status' % Volume_replication.namespace,
'%s:extended_status' % Volume_replication.alias)
elem.set('{%s}driver_data' % Volume_replication.namespace,
'%s:driver_data' % Volume_replication.alias)
class VolumeReplicationAttributeTemplate(xmlutil.TemplateBuilder):
def construct(self):
root = xmlutil.TemplateElement('volume', selector='volume')
make_volume(root)
alias = Volume_replication.alias
namespace = Volume_replication.namespace
return xmlutil.SlaveTemplate(root, 1, nsmap={alias: namespace})
class VolumeReplicationListAttributeTemplate(xmlutil.TemplateBuilder):
def construct(self):
root = xmlutil.TemplateElement('volumes')
elem = xmlutil.SubTemplateElement(root, 'volume', selector='volumes')
make_volume(elem)
alias = Volume_replication.alias
namespace = Volume_replication.namespace
return xmlutil.SlaveTemplate(root, 1, nsmap={alias: namespace})

View File

@ -68,7 +68,8 @@ class ViewBuilder(common.ViewBuilder):
'links': self._get_links(request, volume['id']),
'user_id': volume.get('user_id'),
'bootable': str(volume.get('bootable')).lower(),
'encrypted': self._is_volume_encrypted(volume)
'encrypted': self._is_volume_encrypted(volume),
'replication_status': volume.get('replication_status')
}
}

View File

@ -326,11 +326,30 @@ class VolumeController(wsgi.Controller):
else:
kwargs['source_volume'] = None
source_replica = volume.get('source_replica')
if source_replica is not None:
try:
src_vol = self.volume_api.get_volume(context,
source_replica)
if src_vol['replication_status'] == 'disabled':
explanation = _('source volume id:%s is not'
' replicated') % source_volid
raise exc.HTTPNotFound(explanation=explanation)
kwargs['source_replica'] = src_vol
except exception.NotFound:
explanation = (_('replica source volume id:%s not found') %
source_replica)
raise exc.HTTPNotFound(explanation=explanation)
else:
kwargs['source_replica'] = None
size = volume.get('size', None)
if size is None and kwargs['snapshot'] is not None:
size = kwargs['snapshot']['volume_size']
elif size is None and kwargs['source_volume'] is not None:
size = kwargs['source_volume']['size']
elif size is None and kwargs['source_replica'] is not None:
size = kwargs['source_replica']['size']
LOG.info(_("Create volume of %s GB"), size, context=context)

View File

@ -194,6 +194,9 @@ global_opts = [
help='Whether snapshots count against GigaByte quota'),
cfg.StrOpt('transfer_api_class',
default='cinder.transfer.api.API',
help='The full class name of the volume transfer API class'), ]
help='The full class name of the volume transfer API class'),
cfg.StrOpt('replication_api_class',
default='cinder.replication.api.API',
help='The full class name of the volume replication API class'), ]
CONF.register_opts(global_opts)

View File

@ -0,0 +1,51 @@
# Copyright 2014 IBM Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from sqlalchemy import Column
from sqlalchemy import MetaData, String, Table
from cinder.openstack.common import log as logging
LOG = logging.getLogger(__name__)
def upgrade(migrate_engine):
"""Add replication columns to volumes."""
meta = MetaData()
meta.bind = migrate_engine
volumes = Table('volumes', meta, autoload=True)
replication_status = Column('replication_status', String(255))
replication_extended_status = Column('replication_extended_status',
String(255))
replication_driver_data = Column('replication_driver_data', String(255))
volumes.create_column(replication_status)
volumes.create_column(replication_extended_status)
volumes.create_column(replication_driver_data)
volumes.update().values(replication_status='disabled',
replication_extended_status=None,
replication_driver_data=None).execute()
def downgrade(migrate_engine):
meta = MetaData()
meta.bind = migrate_engine
volumes = Table('volumes', meta, autoload=True)
replication_status = volumes.columns.replication_status
replication_extended_status = volumes.columns.replication_extended_status
replication_driver_data = volumes.columns.replication_driver_data
volumes.drop_column(replication_status)
volumes.drop_column(replication_extended_status)
volumes.drop_column(replication_driver_data)

View File

@ -119,6 +119,10 @@ class Volume(BASE, CinderBase):
deleted = Column(Boolean, default=False)
bootable = Column(Boolean, default=False)
replication_status = Column(String(255))
replication_extended_status = Column(String(255))
replication_driver_data = Column(String(255))
class VolumeMetadata(BASE, CinderBase):
"""Represents a metadata key/value pair for a volume."""

View File

@ -586,6 +586,16 @@ class ManageExistingInvalidReference(CinderException):
"reference %(existing_ref)s: %(reason)s")
class ReplicationError(CinderException):
message = _("Volume %(volume_id)s replication "
"error: %(reason)s")
class ReplicationNotFound(NotFound):
message = _("Volume replication for %(volume_id)s "
"could not be found.")
class ManageExistingVolumeTypeMismatch(CinderException):
message = _("Manage existing volume failed due to volume type mismatch: "
"%(reason)s")

View File

@ -0,0 +1,24 @@
# Copyright 2014 IBM Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo.config import cfg
import cinder.openstack.common.importutils
CONF = cfg.CONF
cls = CONF.replication_api_class
API = cinder.openstack.common.importutils.import_class(cls)

114
cinder/replication/api.py Normal file
View File

@ -0,0 +1,114 @@
# Copyright 2014 IBM Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Handles all requests relating to volume replication.
"""
import functools
from oslo.config import cfg
from cinder.db import base
from cinder import exception
from cinder.i18n import _
from cinder.openstack.common import log as logging
from cinder import policy
from cinder import volume as cinder_volume
from cinder.volume import rpcapi as volume_rpcapi
from cinder.volume import utils as volume_utils
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
PROMOTE_PROCEED_STATUS = ('active', 'active-stopped')
REENABLE_PROCEED_STATUS = ('inactive', 'active-stopped', 'error')
def wrap_check_policy(func):
"""Check policy corresponding to the wrapped methods prior to execution.
This decorator requires the first 3 args of the wrapped function
to be (self, context, relationship_id)
"""
@functools.wraps(func)
def wrapped(self, context, target_obj, *args, **kwargs):
check_policy(context, func.__name__, target_obj)
return func(self, context, target_obj, *args, **kwargs)
return wrapped
def check_policy(context, action, target_obj=None):
target = {
'project_id': context.project_id,
'user_id': context.user_id,
}
target.update(target_obj or {})
_action = 'volume_extension:replication:%s' % action
policy.enforce(context, _action, target)
class API(base.Base):
"""API for interacting with volume replication relationships."""
def __init__(self, db_driver=None):
super(API, self).__init__(db_driver)
self.volume_rpcapi = volume_rpcapi.VolumeAPI()
self.volume_api = cinder_volume.API()
@wrap_check_policy
def promote(self, context, vol):
if vol['replication_status'] == 'disabled':
msg = _("Replication is not enabled for volume")
raise exception.ReplicationError(
reason=msg,
volume_id=vol['id'])
if vol['replication_status'] not in PROMOTE_PROCEED_STATUS:
msg = _("Replication status for volume must be active or "
"active-stopped, but current status "
"is: %s") % vol['replication_status']
raise exception.ReplicationError(
reason=msg,
volume_id=vol['id'])
if vol['status'] != 'available':
msg = _("Volume status for volume must be available, but current "
"status is: %s") % vol['status']
raise exception.ReplicationError(
reason=msg,
volume_id=vol['id'])
volume_utils.notify_about_replication_usage(context,
vol,
'promote')
self.volume_rpcapi.promote_replica(context, vol)
@wrap_check_policy
def reenable(self, context, vol):
if vol['replication_status'] == 'disabled':
msg = _("Replication is not enabled")
raise exception.ReplicationError(
reason=msg,
volume_id=vol['id'])
if vol['replication_status'] not in REENABLE_PROCEED_STATUS:
msg = _("Replication status for volume must be inactive,"
" active-stopped, or error, but current status "
"is: %s") % vol['replication_status']
raise exception.ReplicationError(
reason=msg,
volume_id=vol['id'])
volume_utils.notify_about_replication_usage(context,
vol,
'sync')
self.volume_rpcapi.reenable_replication(context, vol)

View File

@ -589,6 +589,20 @@ class AdminActionsTest(test.TestCase):
volume = self._migrate_volume_exec(ctx, volume, host, expected_status)
self.assertEqual(volume['migration_status'], 'starting')
def test_migrate_volume_fail_replication(self):
expected_status = 400
host = 'test2'
ctx = context.RequestContext('admin', 'fake', True)
volume = self._migrate_volume_prep()
# current status is available
volume = db.volume_create(ctx,
{'status': 'available',
'host': 'test',
'provider_location': '',
'attach_status': '',
'replication_status': 'active'})
volume = self._migrate_volume_exec(ctx, volume, host, expected_status)
def test_migrate_volume_as_non_admin(self):
expected_status = 403
host = 'test2'

View File

@ -0,0 +1,246 @@
# Copyright 2014 IBM Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Tests for volume replication API code.
"""
import json
import mock
from oslo.config import cfg
import webob
from cinder import context
from cinder import test
from cinder.tests.api import fakes
from cinder.tests import utils as tests_utils
CONF = cfg.CONF
def app():
# no auth, just let environ['cinder.context'] pass through
api = fakes.router.APIRouter()
mapper = fakes.urlmap.URLMap()
mapper['/v2'] = api
return mapper
class VolumeReplicationAPITestCase(test.TestCase):
"""Test Cases for replication API."""
def setUp(self):
super(VolumeReplicationAPITestCase, self).setUp()
self.ctxt = context.RequestContext('admin', 'fake', True)
self.volume_params = {
'host': CONF.host,
'size': 1}
def _get_resp(self, operation, volume_id, xml=False):
"""Helper for a replication action req for the specified volume_id."""
req = webob.Request.blank('/v2/fake/volumes/%s/action' % volume_id)
req.method = 'POST'
if xml:
body = '<os-%s-replica/>' % operation
req.headers['Content-Type'] = 'application/xml'
req.headers['Accept'] = 'application/xml'
req.body = body
else:
body = {'os-%s-replica' % operation: ''}
req.headers['Content-Type'] = 'application/json'
req.body = json.dumps(body)
req.environ['cinder.context'] = context.RequestContext('admin',
'fake',
True)
res = req.get_response(app())
return req, res
def test_promote_bad_id(self):
(req, res) = self._get_resp('promote', 'fake')
msg = ("request: %s\nresult: %s" % (req, res))
self.assertEqual(res.status_int, 404, msg)
def test_promote_bad_id_xml(self):
(req, res) = self._get_resp('promote', 'fake', xml=True)
msg = ("request: %s\nresult: %s" % (req, res))
self.assertEqual(res.status_int, 404, msg)
def test_promote_volume_not_replicated(self):
volume = tests_utils.create_volume(
self.ctxt,
**self.volume_params)
(req, res) = self._get_resp('promote', volume['id'])
msg = ("request: %s\nresult: %s" % (req, res))
self.assertEqual(res.status_int, 400, msg)
def test_promote_volume_not_replicated_xml(self):
volume = tests_utils.create_volume(
self.ctxt,
**self.volume_params)
(req, res) = self._get_resp('promote', volume['id'], xml=True)
msg = ("request: %s\nresult: %s" % (req, res))
self.assertEqual(res.status_int, 400, msg)
@mock.patch('cinder.volume.rpcapi.VolumeAPI.promote_replica')
def test_promote_replication_volume_status(self,
_rpcapi_promote):
for status in ['error', 'in-use']:
volume = tests_utils.create_volume(self.ctxt,
status = status,
replication_status = 'active',
**self.volume_params)
(req, res) = self._get_resp('promote', volume['id'])
msg = ("request: %s\nresult: %s" % (req, res))
self.assertEqual(res.status_int, 400, msg)
for status in ['available']:
volume = tests_utils.create_volume(self.ctxt,
status = status,
replication_status = 'active',
**self.volume_params)
(req, res) = self._get_resp('promote', volume['id'])
msg = ("request: %s\nresult: %s" % (req, res))
self.assertEqual(res.status_int, 202, msg)
@mock.patch('cinder.volume.rpcapi.VolumeAPI.promote_replica')
def test_promote_replication_volume_status_xml(self,
_rpcapi_promote):
for status in ['error', 'in-use']:
volume = tests_utils.create_volume(self.ctxt,
status = status,
replication_status = 'active',
**self.volume_params)
(req, res) = self._get_resp('promote', volume['id'], xml=True)
msg = ("request: %s\nresult: %s" % (req, res))
self.assertEqual(res.status_int, 400, msg)
for status in ['available']:
volume = tests_utils.create_volume(self.ctxt,
status = status,
replication_status = 'active',
**self.volume_params)
(req, res) = self._get_resp('promote', volume['id'], xml=True)
msg = ("request: %s\nresult: %s" % (req, res))
self.assertEqual(res.status_int, 202, msg)
@mock.patch('cinder.volume.rpcapi.VolumeAPI.promote_replica')
def test_promote_replication_replication_status(self,
_rpcapi_promote):
for status in ['error', 'copying', 'inactive']:
volume = tests_utils.create_volume(self.ctxt,
status = 'available',
replication_status = status,
**self.volume_params)
(req, res) = self._get_resp('promote', volume['id'])
msg = ("request: %s\nresult: %s" % (req, res))
self.assertEqual(res.status_int, 400, msg)
for status in ['active', 'active-stopped']:
volume = tests_utils.create_volume(self.ctxt,
status = 'available',
replication_status = status,
**self.volume_params)
(req, res) = self._get_resp('promote', volume['id'])
msg = ("request: %s\nresult: %s" % (req, res))
self.assertEqual(res.status_int, 202, msg)
@mock.patch('cinder.volume.rpcapi.VolumeAPI.promote_replica')
def test_promote_replication_replication_status_xml(self,
_rpcapi_promote):
for status in ['error', 'copying', 'inactive']:
volume = tests_utils.create_volume(self.ctxt,
status = 'available',
replication_status = status,
**self.volume_params)
(req, res) = self._get_resp('promote', volume['id'], xml=True)
msg = ("request: %s\nresult: %s" % (req, res))
self.assertEqual(res.status_int, 400, msg)
for status in ['active', 'active-stopped']:
volume = tests_utils.create_volume(self.ctxt,
status = 'available',
replication_status = status,
**self.volume_params)
(req, res) = self._get_resp('promote', volume['id'], xml=True)
msg = ("request: %s\nresult: %s" % (req, res))
self.assertEqual(res.status_int, 202, msg)
def test_reenable_bad_id(self):
(req, res) = self._get_resp('reenable', 'fake')
msg = ("request: %s\nresult: %s" % (req, res))
self.assertEqual(res.status_int, 404, msg)
def test_reenable_bad_id_xml(self):
(req, res) = self._get_resp('reenable', 'fake', xml=True)
msg = ("request: %s\nresult: %s" % (req, res))
self.assertEqual(res.status_int, 404, msg)
def test_reenable_volume_not_replicated(self):
volume = tests_utils.create_volume(
self.ctxt,
**self.volume_params)
(req, res) = self._get_resp('reenable', volume['id'])
msg = ("request: %s\nresult: %s" % (req, res))
self.assertEqual(res.status_int, 400, msg)
def test_reenable_volume_not_replicated_xml(self):
volume = tests_utils.create_volume(
self.ctxt,
**self.volume_params)
(req, res) = self._get_resp('reenable', volume['id'], xml=True)
msg = ("request: %s\nresult: %s" % (req, res))
self.assertEqual(res.status_int, 400, msg)
@mock.patch('cinder.volume.rpcapi.VolumeAPI.reenable_replication')
def test_reenable_replication_replication_status(self,
_rpcapi_promote):
for status in ['active', 'copying']:
volume = tests_utils.create_volume(self.ctxt,
status = 'available',
replication_status = status,
**self.volume_params)
(req, res) = self._get_resp('reenable', volume['id'])
msg = ("request: %s\nresult: %s" % (req, res))
self.assertEqual(res.status_int, 400, msg)
for status in ['inactive', 'active-stopped', 'error']:
volume = tests_utils.create_volume(self.ctxt,
status = 'available',
replication_status = status,
**self.volume_params)
(req, res) = self._get_resp('reenable', volume['id'])
msg = ("request: %s\nresult: %s" % (req, res))
self.assertEqual(res.status_int, 202, msg)
@mock.patch('cinder.volume.rpcapi.VolumeAPI.reenable_replication')
def test_reenable_replication_replication_status_xml(self,
_rpcapi_promote):
for status in ['active', 'copying']:
volume = tests_utils.create_volume(self.ctxt,
status = 'available',
replication_status = status,
**self.volume_params)
(req, res) = self._get_resp('reenable', volume['id'], xml=True)
msg = ("request: %s\nresult: %s" % (req, res))
self.assertEqual(res.status_int, 400, msg)
for status in ['inactive', 'active-stopped', 'error']:
volume = tests_utils.create_volume(self.ctxt,
status = 'available',
replication_status = status,
**self.volume_params)
(req, res) = self._get_resp('reenable', volume['id'], xml=True)
msg = ("request: %s\nresult: %s" % (req, res))
self.assertEqual(res.status_int, 202, msg)

View File

@ -50,7 +50,10 @@ def stub_volume(id, **kwargs):
{'key': 'readonly', 'value': 'False'}],
'bootable': False,
'launched_at': datetime.datetime(1, 1, 1, 1, 1, 1),
'volume_type': {'name': 'vol_type_name'}}
'volume_type': {'name': 'vol_type_name'},
'replication_status': 'disabled',
'replication_extended_status': None,
'replication_driver_data': None}
volume.update(kwargs)
if kwargs.get('volume_glance_metadata', None):

View File

@ -104,6 +104,7 @@ class VolumeApiTest(test.TestCase):
'rel': 'bookmark'}],
'metadata': {},
'name': 'Volume Test Name',
'replication_status': 'disabled',
'size': 100,
'snapshot_id': None,
'source_volid': None,
@ -207,6 +208,7 @@ class VolumeApiTest(test.TestCase):
'rel': 'bookmark'}],
'metadata': {},
'name': 'Volume Test Name',
'replication_status': 'disabled',
'size': '1',
'snapshot_id': None,
'source_volid': None,
@ -286,6 +288,7 @@ class VolumeApiTest(test.TestCase):
'availability_zone': 'fakeaz',
'bootable': 'false',
'name': 'Updated Test Name',
'replication_status': 'disabled',
'attachments': [
{
'id': '1',
@ -338,6 +341,7 @@ class VolumeApiTest(test.TestCase):
'availability_zone': 'fakeaz',
'bootable': 'false',
'name': 'Updated Test Name',
'replication_status': 'disabled',
'attachments': [
{
'id': '1',
@ -393,6 +397,7 @@ class VolumeApiTest(test.TestCase):
'availability_zone': 'fakeaz',
'bootable': 'false',
'name': 'New Name',
'replication_status': 'disabled',
'attachments': [
{
'id': '1',
@ -443,6 +448,7 @@ class VolumeApiTest(test.TestCase):
'availability_zone': 'fakeaz',
'bootable': 'false',
'name': 'displayname',
'replication_status': 'disabled',
'attachments': [{
'id': '1',
'volume_id': '1',
@ -504,6 +510,7 @@ class VolumeApiTest(test.TestCase):
'availability_zone': 'fakeaz',
'bootable': 'false',
'name': 'Updated Test Name',
'replication_status': 'disabled',
'attachments': [{
'id': '1',
'volume_id': '1',
@ -607,6 +614,7 @@ class VolumeApiTest(test.TestCase):
'availability_zone': 'fakeaz',
'bootable': 'false',
'name': 'displayname',
'replication_status': 'disabled',
'attachments': [
{
'device': '/',
@ -667,6 +675,7 @@ class VolumeApiTest(test.TestCase):
'availability_zone': 'fakeaz',
'bootable': 'false',
'name': 'displayname',
'replication_status': 'disabled',
'attachments': [
{
'device': '/',
@ -1066,6 +1075,7 @@ class VolumeApiTest(test.TestCase):
'availability_zone': 'fakeaz',
'bootable': 'false',
'name': 'displayname',
'replication_status': 'disabled',
'attachments': [
{
'device': '/',
@ -1115,6 +1125,7 @@ class VolumeApiTest(test.TestCase):
'availability_zone': 'fakeaz',
'bootable': 'false',
'name': 'displayname',
'replication_status': 'disabled',
'attachments': [],
'user_id': 'fakeuser',
'volume_type': 'vol_type_name',
@ -1172,6 +1183,7 @@ class VolumeApiTest(test.TestCase):
'availability_zone': 'fakeaz',
'bootable': 'false',
'name': 'displayname',
'replication_status': 'disabled',
'attachments': [
{
'device': '/',

View File

@ -75,6 +75,9 @@
"backup:get_all": [],
"backup:restore": [],
"backup:backup-import": [["rule:admin_api"]],
"backup:backup-export": [["rule:admin_api"]]
"backup:backup-export": [["rule:admin_api"]],
"volume_extension:replication:promote": [["rule:admin_api"]],
"volume_extension:replication:reenable": [["rule:admin_api"]]
}

View File

@ -42,12 +42,15 @@ class fake_volume_api(object):
request_spec, filter_properties,
allow_reschedule=True,
snapshot_id=None, image_id=None,
source_volid=None):
source_volid=None,
source_replicaid=None):
self.test_inst.assertEqual(self.expected_spec, request_spec)
self.test_inst.assertEqual(request_spec['source_volid'], source_volid)
self.test_inst.assertEqual(request_spec['snapshot_id'], snapshot_id)
self.test_inst.assertEqual(request_spec['image_id'], image_id)
self.test_inst.assertEqual(request_spec['source_replicaid'],
source_replicaid)
class fake_db(object):
@ -84,7 +87,8 @@ class CreateVolumeFlowTestCase(test.TestCase):
spec = {'volume_id': None,
'source_volid': None,
'snapshot_id': None,
'image_id': None}
'image_id': None,
'source_replicaid': None}
task = create_volume.VolumeCastTask(
fake_scheduler_rpc_api(spec, self),
@ -96,7 +100,8 @@ class CreateVolumeFlowTestCase(test.TestCase):
spec = {'volume_id': 1,
'source_volid': 2,
'snapshot_id': 3,
'image_id': 4}
'image_id': 4,
'source_replicaid': 5}
task = create_volume.VolumeCastTask(
fake_scheduler_rpc_api(spec, self),

View File

@ -1093,3 +1093,36 @@ class TestMigrations(test.TestCase):
autoload=True)
index_names = [idx.name for idx in reservations.indexes]
self.assertNotIn('reservations_deleted_expire_idx', index_names)
def test_migration_024(self):
"""Test adding replication columns to volume table."""
for (key, engine) in self.engines.items():
migration_api.version_control(engine,
TestMigrations.REPOSITORY,
migration.db_initial_version())
migration_api.upgrade(engine, TestMigrations.REPOSITORY, 23)
metadata = sqlalchemy.schema.MetaData()
metadata.bind = engine
migration_api.upgrade(engine, TestMigrations.REPOSITORY, 24)
volumes = sqlalchemy.Table('volumes',
metadata,
autoload=True)
self.assertIsInstance(volumes.c.replication_status.type,
sqlalchemy.types.VARCHAR)
self.assertIsInstance(volumes.c.replication_extended_status.type,
sqlalchemy.types.VARCHAR)
self.assertIsInstance(volumes.c.replication_driver_data.type,
sqlalchemy.types.VARCHAR)
migration_api.downgrade(engine, TestMigrations.REPOSITORY, 23)
metadata = sqlalchemy.schema.MetaData()
metadata.bind = engine
volumes = sqlalchemy.Table('volumes',
metadata,
autoload=True)
self.assertNotIn('replication_status', volumes.c)
self.assertNotIn('replication_extended_status', volumes.c)
self.assertNotIn('replication_driver_data', volumes.c)

View File

@ -0,0 +1,111 @@
# Copyright 2014 IBM Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Tests for Volume replication code.
"""
import mock
from oslo.config import cfg
from cinder import context
from cinder import db
from cinder import exception
from cinder.openstack.common import importutils
from cinder import test
from cinder.tests import utils as test_utils
CONF = cfg.CONF
class VolumeReplicationTestCase(test.TestCase):
def setUp(self):
super(VolumeReplicationTestCase, self).setUp()
self.ctxt = context.RequestContext('user', 'fake', False)
self.adm_ctxt = context.RequestContext('admin', 'fake', True)
self.manager = importutils.import_object(CONF.volume_manager)
self.manager.host = 'test_host'
self.manager.stats = {'allocated_capacity_gb': 0}
self.driver_patcher = mock.patch.object(self.manager, 'driver')
self.driver = self.driver_patcher.start()
@mock.patch('cinder.utils.require_driver_initialized')
def test_promote_replica_uninit_driver(self, _init):
"""Test promote replication when driver is not initialized."""
_init.side_effect = exception.DriverNotInitialized
vol = test_utils.create_volume(self.ctxt,
status='available',
replication_status='active')
self.driver.promote_replica.return_value = None
self.assertRaises(exception.DriverNotInitialized,
self.manager.promote_replica,
self.adm_ctxt,
vol['id'])
def test_promote_replica(self):
"""Test promote replication."""
vol = test_utils.create_volume(self.ctxt,
status='available',
replication_status='active')
self.driver.promote_replica.return_value = \
{'replication_status': 'inactive'}
self.manager.promote_replica(self.adm_ctxt, vol['id'])
vol_after = db.volume_get(self.ctxt, vol['id'])
self.assertEqual(vol_after['replication_status'], 'inactive')
def test_promote_replica_fail(self):
"""Test promote replication when promote fails."""
vol = test_utils.create_volume(self.ctxt,
status='available',
replication_status='active')
self.driver.promote_replica.side_effect = exception.CinderException
self.assertRaises(exception.CinderException,
self.manager.promote_replica,
self.adm_ctxt,
vol['id'])
def test_reenable_replication(self):
"""Test reenable replication."""
vol = test_utils.create_volume(self.ctxt,
status='available',
replication_status='error')
self.driver.reenable_replication.return_value = \
{'replication_status': 'copying'}
self.manager.reenable_replication(self.adm_ctxt, vol['id'])
vol_after = db.volume_get(self.ctxt, vol['id'])
self.assertEqual(vol_after['replication_status'], 'copying')
@mock.patch('cinder.utils.require_driver_initialized')
def test_reenable_replication_uninit_driver(self, _init):
"""Test reenable replication when driver is not initialized."""
_init.side_effect = exception.DriverNotInitialized
vol = test_utils.create_volume(self.ctxt,
status='available',
replication_status='error')
self.assertRaises(exception.DriverNotInitialized,
self.manager.reenable_replication,
self.adm_ctxt,
vol['id'])
def test_reenable_replication_fail(self):
"""Test promote replication when driver is not initialized."""
vol = test_utils.create_volume(self.ctxt,
status='available',
replication_status='error')
self.driver.reenable_replication.side_effect = \
exception.CinderException
self.assertRaises(exception.CinderException,
self.manager.reenable_replication,
self.adm_ctxt,
vol['id'])

View File

@ -357,6 +357,9 @@ class VolumeTestCase(BaseVolumeTestCase):
'user_id': 'fake',
'launched_at': 'DONTCARE',
'size': 1,
'replication_status': 'disabled',
'replication_extended_status': None,
'replication_driver_data': None,
}
self.assertDictMatch(msg['payload'], expected)
msg = fake_notifier.NOTIFICATIONS[1]
@ -2445,6 +2448,28 @@ class VolumeTestCase(BaseVolumeTestCase):
self.assertRaises(exception.CinderException,
self.volume.create_volume, ctxt, volume_src['id'])
@mock.patch(
'cinder.volume.driver.VolumeDriver.create_replica_test_volume')
def test_create_volume_from_sourcereplica(self, _create_replica_test):
"""Test volume can be created from a volume replica."""
_create_replica_test.return_value = None
volume_src = tests_utils.create_volume(self.context,
**self.volume_params)
self.volume.create_volume(self.context, volume_src['id'])
volume_dst = tests_utils.create_volume(self.context,
source_replicaid=
volume_src['id'],
**self.volume_params)
self.volume.create_volume(self.context, volume_dst['id'],
source_replicaid=volume_src['id'])
self.assertEqual('available',
db.volume_get(context.get_admin_context(),
volume_dst['id']).status)
self.assertTrue(_create_replica_test.called)
self.volume.delete_volume(self.context, volume_dst['id'])
self.volume.delete_volume(self.context, volume_src['id'])
def test_create_volume_from_sourcevol(self):
"""Test volume can be created from a source volume."""
def fake_create_cloned_volume(volume, src_vref):
@ -2706,7 +2731,8 @@ class VolumeTestCase(BaseVolumeTestCase):
self.assertEqual(volume['status'], 'available')
def _retype_volume_exec(self, driver, snap=False, policy='on-demand',
migrate_exc=False, exc=None, diff_equal=False):
migrate_exc=False, exc=None, diff_equal=False,
replica=False):
elevated = context.get_admin_context()
project_id = self.context.project_id
@ -2716,9 +2742,14 @@ class VolumeTestCase(BaseVolumeTestCase):
vol_type = db.volume_type_get_by_name(elevated, 'new')
db.quota_create(elevated, project_id, 'volumes_new', 10)
if replica:
rep_status = 'active'
else:
rep_status = 'disabled'
volume = tests_utils.create_volume(self.context, size=1,
host=CONF.host, status='retyping',
volume_type_id=old_vol_type['id'])
volume_type_id=old_vol_type['id'],
replication_status=rep_status)
if snap:
self._create_snapshot(volume['id'], size=volume['size'])
if driver or diff_equal:
@ -2789,6 +2820,11 @@ class VolumeTestCase(BaseVolumeTestCase):
self._retype_volume_exec(False, policy='never',
exc=exception.VolumeMigrationFailed)
def test_retype_volume_migration_with_replica(self):
self._retype_volume_exec(False,
replica=True,
exc=exception.InvalidVolume)
def test_retype_volume_migration_with_snaps(self):
self._retype_volume_exec(False, snap=True, exc=exception.InvalidVolume)

View File

@ -146,6 +146,7 @@ class VolumeRpcAPITestCase(test.TestCase):
snapshot_id='fake_snapshot_id',
image_id='fake_image_id',
source_volid='fake_src_id',
source_replicaid='fake_replica_id',
version='1.4')
def test_create_volume_serialization(self):
@ -160,6 +161,7 @@ class VolumeRpcAPITestCase(test.TestCase):
snapshot_id='fake_snapshot_id',
image_id='fake_image_id',
source_volid='fake_src_id',
source_replicaid='fake_replica_id',
version='1.4')
def test_delete_volume(self):
@ -288,3 +290,15 @@ class VolumeRpcAPITestCase(test.TestCase):
volume=self.fake_volume,
ref={'lv_name': 'foo'},
version='1.15')
def test_promote_replica(self):
self._test_volume_api('promote_replica',
rpc_method='cast',
volume=self.fake_volume,
version='1.17')
def test_reenable_replica(self):
self._test_volume_api('reenable_replication',
rpc_method='cast',
volume=self.fake_volume,
version='1.17')

View File

@ -31,6 +31,9 @@ def create_volume(ctxt,
size=1,
availability_zone='fake_az',
volume_type_id=None,
replication_status='disabled',
replication_extended_status=None,
replication_driver_data=None,
**kwargs):
"""Create a volume object in the DB."""
vol = {}
@ -48,6 +51,10 @@ def create_volume(ctxt,
vol['volume_type_id'] = volume_type_id
for key in kwargs:
vol[key] = kwargs[key]
vol['replication_status'] = replication_status
vol['replication_extended_status'] = replication_extended_status
vol['replication_driver_data'] = replication_driver_data
return db.volume_create(ctxt, vol)

View File

@ -152,7 +152,8 @@ class API(base.Base):
def create(self, context, size, name, description, snapshot=None,
image_id=None, volume_type=None, metadata=None,
availability_zone=None, source_volume=None,
scheduler_hints=None, backup_source_volume=None):
scheduler_hints=None, backup_source_volume=None,
source_replica=None):
if source_volume and volume_type:
if volume_type['id'] != source_volume['volume_type_id']:
@ -161,6 +162,12 @@ class API(base.Base):
"You should omit the argument.")
raise exception.InvalidInput(reason=msg)
# When cloning replica (for testing), volume type must be omitted
if source_replica and volume_type:
msg = _("No volume_type should be provided when creating test "
"replica, type must be omitted.")
raise exception.InvalidInput(reason=msg)
if snapshot and volume_type:
if volume_type['id'] != snapshot['volume_type_id']:
msg = _("Invalid volume_type provided (requested type "
@ -190,6 +197,7 @@ class API(base.Base):
'scheduler_hints': scheduler_hints,
'key_manager': self.key_manager,
'backup_source_volume': backup_source_volume,
'source_replica': source_replica,
'optional_args': {'is_quota_committed': False}
}
try:
@ -475,6 +483,11 @@ class API(base.Base):
msg = _("Snapshot cannot be created while volume is migrating")
raise exception.InvalidVolume(reason=msg)
if volume['status'].startswith('replica_'):
# Can't snapshot secondary replica
msg = _("Snapshot of secondary replica is not allowed.")
raise exception.InvalidVolume(reason=msg)
if ((not force) and (volume['status'] != "available")):
msg = _("must be available")
raise exception.InvalidVolume(reason=msg)
@ -839,6 +852,13 @@ class API(base.Base):
LOG.error(msg)
raise exception.InvalidVolume(reason=msg)
# We only handle non-replicated volumes for now
rep_status = volume['replication_status']
if rep_status is not None and rep_status != 'disabled':
msg = _("Volume must not be replicated.")
LOG.error(msg)
raise exception.InvalidVolume(reason=msg)
# Make sure the host is in the list of available hosts
elevated = context.elevated()
topic = CONF.volume_topic

View File

@ -278,19 +278,57 @@ class VolumeDriver(object):
def create_volume(self, volume):
"""Creates a volume. Can optionally return a Dictionary of
changes to the volume object to be persisted.
If volume_type extra specs includes
'capabilities:replication <is> True' the driver
needs to create a volume replica (secondary), and setup replication
between the newly created volume and the secondary volume.
Returned dictionary should include:
volume['replication_status'] = 'copying'
volume['replication_extended_status'] = driver specific value
volume['driver_data'] = driver specific value
"""
raise NotImplementedError()
def create_volume_from_snapshot(self, volume, snapshot):
"""Creates a volume from a snapshot."""
"""Creates a volume from a snapshot.
If volume_type extra specs includes 'replication: <is> True'
the driver needs to create a volume replica (secondary),
and setup replication between the newly created volume and
the secondary volume.
"""
raise NotImplementedError()
def create_cloned_volume(self, volume, src_vref):
"""Creates a clone of the specified volume."""
"""Creates a clone of the specified volume.
If volume_type extra specs includes 'replication: <is> True' the
driver needs to create a volume replica (secondary)
and setup replication between the newly created volume
and the secondary volume.
"""
raise NotImplementedError()
def create_replica_test_volume(self, volume, src_vref):
"""Creates a test replica clone of the specified replicated volume.
Create a clone of the replicated (secondary) volume.
"""
raise NotImplementedError()
def delete_volume(self, volume):
"""Deletes a volume."""
"""Deletes a volume.
If volume_type extra specs includes 'replication: <is> True'
then the driver needs to delete the volume replica too.
"""
raise NotImplementedError()
def create_snapshot(self, snapshot):
@ -307,6 +345,10 @@ class VolumeDriver(object):
def get_volume_stats(self, refresh=False):
"""Return the current state of the volume service. If 'refresh' is
True, run the update first.
For replication the following state should be reported:
replication_support = True (None or false disables replication)
"""
return None
@ -547,7 +589,24 @@ class VolumeDriver(object):
def retype(self, context, volume, new_type, diff, host):
"""Convert the volume to be of the new type.
Returns a boolean indicating whether the retype occurred.
Returns either:
A boolean indicating whether the retype occurred, or
A tuple (retyped, model_update) where retyped is a boolean
indicating if the retype occurred, and the model_update includes
changes for the volume db.
if diff['extra_specs'] includes 'replication' then:
if ('True', _ ) then replication should be disabled:
Volume replica should be deleted
volume['replication_status'] should be changed to 'disabled'
volume['replication_extended_status'] = None
volume['replication_driver_data'] = None
if (_, 'True') then replication should be enabled:
Volume replica (secondary) should be created, and replication
should be setup between the volume and the newly created
replica
volume['replication_status'] = 'copying'
volume['replication_extended_status'] = driver specific value
volume['replication_driver_data'] = driver specific value
:param ctxt: Context
:param volume: A dictionary describing the volume to migrate
@ -557,7 +616,7 @@ class VolumeDriver(object):
host['host'] is its name, and host['capabilities'] is a
dictionary of its reported capabilities.
"""
return False
return False, None
def accept_transfer(self, context, volume, new_user, new_project):
"""Accept the transfer of a volume for a new user/project."""
@ -635,6 +694,82 @@ class VolumeDriver(object):
def validate_connector_has_setting(connector, setting):
pass
def reenable_replication(self, context, volume):
"""Re-enable replication between the replica and primary volume.
This is used to re-enable/fix the replication between primary
and secondary. One use is as part of the fail-back process, when
you re-synchorize your old primary with the promoted volume
(the old replica).
Returns model_update for the volume to reflect the actions of the
driver.
The driver is expected to update the following entries:
'replication_status'
'replication_extended_status'
'replication_driver_data'
Possible 'replication_status' values (in model_update) are:
'error' - replication in error state
'copying' - replication copying data to secondary (inconsistent)
'active' - replication copying data to secondary (consistent)
'active-stopped' - replication data copy on hold (consistent)
'inactive' - replication data copy on hold (inconsistent)
Values in 'replication_extended_status' and 'replication_driver_data'
are managed by the driver.
:param context: Context
:param volume: A dictionary describing the volume
"""
msg = _("sync_replica not implemented.")
raise NotImplementedError(msg)
def get_replication_status(self, context, volume):
"""Query the actual volume replication status from the driver.
Returns model_update for the volume.
The driver is expected to update the following entries:
'replication_status'
'replication_extended_status'
'replication_driver_data'
Possible 'replication_status' values (in model_update) are:
'error' - replication in error state
'copying' - replication copying data to secondary (inconsistent)
'active' - replication copying data to secondary (consistent)
'active-stopped' - replication data copy on hold (consistent)
'inactive' - replication data copy on hold (inconsistent)
Values in 'replication_extended_status' and 'replication_driver_data'
are managed by the driver.
:param context: Context
:param volume: A dictionary describing the volume
"""
return None
def promote_replica(self, context, volume):
"""Promote the replica to be the primary volume.
Following this command, replication between the volumes at
the storage level should be stopped, the replica should be
available to be attached, and the replication status should
be in status 'inactive'.
Returns model_update for the volume.
The driver is expected to update the following entries:
'replication_status'
'replication_extended_status'
'replication_driver_data'
Possible 'replication_status' values (in model_update) are:
'error' - replication in error state
'inactive' - replication data copy on hold (inconsistent)
Values in 'replication_extended_status' and 'replication_driver_data'
are managed by the driver.
:param context: Context
:param volume: A dictionary describing the volume
"""
msg = _("promote_replica not implemented.")
raise NotImplementedError(msg)
# ####### Interface methods for DataPath (Connector) ########
def ensure_export(self, context, volume):
"""Synchronously recreates an export for a volume."""

View File

@ -40,6 +40,7 @@ QUOTAS = quota.QUOTAS
# from, 'error' being the common example.
SNAPSHOT_PROCEED_STATUS = ('available',)
SRC_VOL_PROCEED_STATUS = ('available', 'in-use',)
REPLICA_PROCEED_STATUS = ('active', 'active-stopped')
class ExtractVolumeRequestTask(flow_utils.CinderTask):
@ -58,7 +59,7 @@ class ExtractVolumeRequestTask(flow_utils.CinderTask):
# reconstructed elsewhere and continued).
default_provides = set(['availability_zone', 'size', 'snapshot_id',
'source_volid', 'volume_type', 'volume_type_id',
'encryption_key_id'])
'encryption_key_id', 'source_replicaid'])
def __init__(self, image_service, availability_zones, **kwargs):
super(ExtractVolumeRequestTask, self).__init__(addons=[ACTION],
@ -111,6 +112,38 @@ class ExtractVolumeRequestTask(flow_utils.CinderTask):
source_volid = source_volume['id']
return source_volid
@staticmethod
def _extract_source_replica(source_replica):
"""Extracts the volume id from the provided replica (if provided).
This function validates the input replica_volume dict and checks that
the status of that replica_volume is valid for creating a volume from.
"""
source_replicaid = None
if source_replica is not None:
if source_replica['status'] not in SRC_VOL_PROCEED_STATUS:
msg = _("Unable to create a volume from an originating source"
" volume when its status is not one of %s"
" values")
msg = msg % (", ".join(SRC_VOL_PROCEED_STATUS))
# TODO(harlowja): what happens if the status changes after this
# initial volume status check occurs??? Seems like someone
# could delete the volume after this check passes but before
# the volume is officially created?
raise exception.InvalidVolume(reason=msg)
replication_status = source_replica['replication_status']
if replication_status not in REPLICA_PROCEED_STATUS:
msg = _("Unable to create a volume from a replica"
" when replication status is not one of %s"
" values")
msg = msg % (", ".join(REPLICA_PROCEED_STATUS))
# TODO(ronenkat): what happens if the replication status
# changes after this initial volume status check occurs???
raise exception.InvalidVolume(reason=msg)
source_replicaid = source_replica['id']
return source_replicaid
@staticmethod
def _extract_size(size, source_volume, snapshot):
"""Extracts and validates the volume size.
@ -330,7 +363,7 @@ class ExtractVolumeRequestTask(flow_utils.CinderTask):
def execute(self, context, size, snapshot, image_id, source_volume,
availability_zone, volume_type, metadata,
key_manager, backup_source_volume):
key_manager, backup_source_volume, source_replica):
utils.check_exclusive_options(snapshot=snapshot,
imageRef=image_id,
@ -341,6 +374,7 @@ class ExtractVolumeRequestTask(flow_utils.CinderTask):
# volume will remain available after we do this initial verification??
snapshot_id = self._extract_snapshot(snapshot)
source_volid = self._extract_source_volume(source_volume)
source_replicaid = self._extract_source_replica(source_replica)
size = self._extract_size(size, source_volume, snapshot)
self._check_image_metadata(context, image_id, size)
@ -354,8 +388,15 @@ class ExtractVolumeRequestTask(flow_utils.CinderTask):
# should copy encryption metadata from the encrypted volume type to the
# volume upon creation and propagate that information to each snapshot.
# This strategy avoid any dependency upon the encrypted volume type.
def_vol_type = volume_types.get_default_volume_type()
if not volume_type and not source_volume and not snapshot:
volume_type = volume_types.get_default_volume_type()
volume_type = def_vol_type
# When creating a clone of a replica (replication test), we can't
# use the volume type of the replica, therefore, we use the default.
# NOTE(ronenkat): this assumes the default type is not replicated.
if source_replicaid:
volume_type = def_vol_type
volume_type_id = self._get_volume_type_id(volume_type,
source_volume, snapshot,
@ -387,6 +428,7 @@ class ExtractVolumeRequestTask(flow_utils.CinderTask):
'volume_type_id': volume_type_id,
'encryption_key_id': encryption_key_id,
'qos_specs': specs,
'source_replicaid': source_replicaid,
}
@ -401,7 +443,8 @@ class EntryCreateTask(flow_utils.CinderTask):
def __init__(self, db):
requires = ['availability_zone', 'description', 'metadata',
'name', 'reservations', 'size', 'snapshot_id',
'source_volid', 'volume_type_id', 'encryption_key_id']
'source_volid', 'volume_type_id', 'encryption_key_id',
'source_replicaid']
super(EntryCreateTask, self).__init__(addons=[ACTION],
requires=requires)
self.db = db
@ -427,6 +470,7 @@ class EntryCreateTask(flow_utils.CinderTask):
# Rename these to the internal name.
'display_description': kwargs.pop('description'),
'display_name': kwargs.pop('name'),
'replication_status': 'disabled',
}
# Merge in the other required arguments which should provide the rest
@ -612,7 +656,7 @@ class VolumeCastTask(flow_utils.CinderTask):
def __init__(self, scheduler_rpcapi, volume_rpcapi, db):
requires = ['image_id', 'scheduler_hints', 'snapshot_id',
'source_volid', 'volume_id', 'volume_type',
'volume_properties']
'volume_properties', 'source_replicaid']
super(VolumeCastTask, self).__init__(addons=[ACTION],
requires=requires)
self.volume_rpcapi = volume_rpcapi
@ -621,6 +665,7 @@ class VolumeCastTask(flow_utils.CinderTask):
def _cast_create_volume(self, context, request_spec, filter_properties):
source_volid = request_spec['source_volid']
source_replicaid = request_spec['source_replicaid']
volume_id = request_spec['volume_id']
snapshot_id = request_spec['snapshot_id']
image_id = request_spec['image_id']
@ -639,6 +684,9 @@ class VolumeCastTask(flow_utils.CinderTask):
elif source_volid:
source_volume_ref = self.db.volume_get(context, source_volid)
host = source_volume_ref['host']
elif source_replicaid:
source_volume_ref = self.db.volume_get(context, source_replicaid)
host = source_volume_ref['host']
if not host:
# Cast to the scheduler and let it handle whatever is needed
@ -666,7 +714,8 @@ class VolumeCastTask(flow_utils.CinderTask):
allow_reschedule=False,
snapshot_id=snapshot_id,
image_id=image_id,
source_volid=source_volid)
source_volid=source_volid,
source_replicaid=source_replicaid)
def execute(self, context, **kwargs):
scheduler_hints = kwargs.pop('scheduler_hints', None)

View File

@ -208,7 +208,8 @@ class ExtractVolumeSpecTask(flow_utils.CinderTask):
default_provides = 'volume_spec'
def __init__(self, db):
requires = ['image_id', 'snapshot_id', 'source_volid']
requires = ['image_id', 'snapshot_id', 'source_volid',
'source_replicaid']
super(ExtractVolumeSpecTask, self).__init__(addons=[ACTION],
requires=requires)
self.db = db
@ -254,6 +255,18 @@ class ExtractVolumeSpecTask(flow_utils.CinderTask):
'source_volstatus': source_volume_ref['status'],
'type': 'source_vol',
})
elif kwargs.get('source_replicaid'):
# We are making a clone based on the replica.
#
# NOTE(harlowja): This will likely fail if the replica
# disappeared by the time this call occurred.
source_volid = kwargs['source_replicaid']
source_volume_ref = self.db.volume_get(context, source_volid)
specs.update({
'source_replicaid': source_volid,
'source_replicastatus': source_volume_ref['status'],
'type': 'source_replica',
})
elif kwargs.get('image_id'):
# We are making an image based volume instead of a raw volume.
image_href = kwargs['image_id']
@ -363,6 +376,17 @@ class CreateVolumeFromSpecTask(flow_utils.CinderTask):
context,
source_volid,
volume_id)
elif kwargs.get('source_replicaid'):
src_type = 'source replica'
src_id = kwargs['source_replicaid']
source_replicaid = src_id
LOG.debug(log_template % {'src_type': src_type,
'src_id': src_id,
'vol_id': volume_id})
self.db.volume_glance_metadata_copy_from_volume_to_volume(
context,
source_replicaid,
volume_id)
elif kwargs.get('image_id'):
src_type = 'image'
src_id = kwargs['image_id']
@ -432,6 +456,27 @@ class CreateVolumeFromSpecTask(flow_utils.CinderTask):
source_volid=source_volid)
return model_update
def _create_from_source_replica(self, context, volume_ref,
source_replicaid, **kwargs):
# NOTE(harlowja): if the source volume has disappeared this will be our
# detection of that since this database call should fail.
#
# NOTE(harlowja): likely this is not the best place for this to happen
# and we should have proper locks on the source volume while actions
# that use the source volume are underway.
srcvol_ref = self.db.volume_get(context, source_replicaid)
model_update = self.driver.create_replica_test_volume(volume_ref,
srcvol_ref)
# NOTE(harlowja): Subtasks would be useful here since after this
# point the volume has already been created and further failures
# will not destroy the volume (although they could in the future).
if srcvol_ref.bootable:
self._handle_bootable_volume_glance_meta(
context,
volume_ref['id'],
source_replicaid=source_replicaid)
return model_update
def _copy_image_to_volume(self, context, volume_ref,
image_id, image_location, image_service):
"""Downloads Glance image to the specified volume."""
@ -588,6 +633,9 @@ class CreateVolumeFromSpecTask(flow_utils.CinderTask):
elif create_type == 'source_vol':
model_update = self._create_from_source_volume(
context, volume_ref=volume_ref, **volume_spec)
elif create_type == 'source_replica':
model_update = self._create_from_source_replica(
context, volume_ref=volume_ref, **volume_spec)
elif create_type == 'image':
model_update = self._create_from_image(context,
volume_ref=volume_ref,
@ -661,7 +709,7 @@ class CreateVolumeOnFinishTask(NotifyVolumeActionTask):
def get_flow(context, db, driver, scheduler_rpcapi, host, volume_id,
allow_reschedule, reschedule_context, request_spec,
filter_properties, snapshot_id=None, image_id=None,
source_volid=None):
source_volid=None, source_replicaid=None):
"""Constructs and returns the manager entrypoint flow.
This flow will do the following:
@ -691,6 +739,7 @@ def get_flow(context, db, driver, scheduler_rpcapi, host, volume_id,
'snapshot_id': snapshot_id,
'source_volid': source_volid,
'volume_id': volume_id,
'source_replicaid': source_replicaid,
}
volume_flow.add(ExtractVolumeRefTask(db, host))

View File

@ -151,7 +151,7 @@ def locked_snapshot_operation(f):
class VolumeManager(manager.SchedulerDependentManager):
"""Manages attachable block storage devices."""
RPC_API_VERSION = '1.16'
RPC_API_VERSION = '1.17'
target = messaging.Target(version=RPC_API_VERSION)
@ -273,7 +273,8 @@ class VolumeManager(manager.SchedulerDependentManager):
def create_volume(self, context, volume_id, request_spec=None,
filter_properties=None, allow_reschedule=True,
snapshot_id=None, image_id=None, source_volid=None):
snapshot_id=None, image_id=None, source_volid=None,
source_replicaid=None):
"""Creates the volume."""
context_saved = context.deepcopy()
@ -294,6 +295,7 @@ class VolumeManager(manager.SchedulerDependentManager):
snapshot_id=snapshot_id,
image_id=image_id,
source_volid=source_volid,
source_replicaid=source_replicaid,
allow_reschedule=allow_reschedule,
reschedule_context=context_saved,
request_spec=request_spec,
@ -301,7 +303,7 @@ class VolumeManager(manager.SchedulerDependentManager):
except Exception:
LOG.exception(_("Failed to create manager volume flow"))
raise exception.CinderException(
_("Failed to create manager volume flow"))
_("Failed to create manager volume flow."))
if snapshot_id is not None:
# Make sure the snapshot is not deleted until we are done with it.
@ -309,6 +311,9 @@ class VolumeManager(manager.SchedulerDependentManager):
elif source_volid is not None:
# Make sure the volume is not deleted until we are done with it.
locked_action = "%s-%s" % (source_volid, 'delete_volume')
elif source_replicaid is not None:
# Make sure the volume is not deleted until we are done with it.
locked_action = "%s-%s" % (source_replicaid, 'delete_volume')
else:
locked_action = None
@ -1263,11 +1268,22 @@ class VolumeManager(manager.SchedulerDependentManager):
retyped = True
# Call driver to try and change the type
retype_model_update = None
if not retyped:
try:
new_type = volume_types.get_volume_type(context, new_type_id)
retyped = self.driver.retype(context, volume_ref, new_type,
diff, host)
ret = self.driver.retype(context,
volume_ref,
new_type,
diff,
host)
# Check if the driver retype provided a model update or
# just a retype indication
if type(ret) == tuple:
retyped, retype_model_update = ret
else:
retyped = ret
if retyped:
LOG.info(_("Volume %s: retyped successfully"), volume_id)
except Exception as ex:
@ -1294,6 +1310,16 @@ class VolumeManager(manager.SchedulerDependentManager):
msg = _("Volume must not have snapshots.")
LOG.error(msg)
raise exception.InvalidVolume(reason=msg)
# Don't allow volume with replicas to be migrated
rep_status = volume_ref['replication_status']
if rep_status is not None and rep_status != 'disabled':
_retype_error(context, volume_id, old_reservations,
new_reservations, status_update)
msg = _("Volume must not be replicated.")
LOG.error(msg)
raise exception.InvalidVolume(reason=msg)
self.db.volume_update(context, volume_ref['id'],
{'migration_status': 'starting'})
@ -1305,10 +1331,12 @@ class VolumeManager(manager.SchedulerDependentManager):
_retype_error(context, volume_id, old_reservations,
new_reservations, status_update)
else:
self.db.volume_update(context, volume_id,
{'volume_type_id': new_type_id,
'host': host['host'],
'status': status_update['status']})
model_update = {'volume_type_id': new_type_id,
'host': host['host'],
'status': status_update['status']}
if retype_model_update:
model_update.update(retype_model_update)
self.db.volume_update(context, volume_id, model_update)
if old_reservations:
QUOTAS.commit(context, old_reservations, project_id=project_id)
@ -1317,7 +1345,7 @@ class VolumeManager(manager.SchedulerDependentManager):
self.publish_service_capabilities(context)
def manage_existing(self, ctxt, volume_id, ref=None):
LOG.debug('manage_existing: managing %s' % ref)
LOG.debug('manage_existing: managing %s.' % ref)
try:
flow_engine = manage_existing.get_flow(
ctxt,
@ -1339,3 +1367,96 @@ class VolumeManager(manager.SchedulerDependentManager):
# Update volume stats
self.stats['allocated_capacity_gb'] += volume_ref['size']
return volume_ref['id']
def promote_replica(self, ctxt, volume_id):
"""Promote volume replica secondary to be the primary volume."""
try:
utils.require_driver_initialized(self.driver)
except exception.DriverNotInitialized:
with excutils.save_and_reraise_exception():
LOG.exception(_("Failed to promote replica for volume %(id)s.")
% {'id': volume_id})
volume = self.db.volume_get(ctxt, volume_id)
model_update = None
try:
LOG.debug("Volume %s: promote replica.", volume_id)
model_update = self.driver.promote_replica(ctxt, volume)
except exception.CinderException:
err_msg = (_('Error promoting secondary volume to primary'))
raise exception.ReplicationError(reason=err_msg,
volume_id=volume_id)
try:
if model_update:
volume = self.db.volume_update(ctxt,
volume_id,
model_update)
except exception.CinderException:
err_msg = (_("Failed updating model"
" with driver provided model %(model)s") %
{'model': model_update})
raise exception.ReplicationError(reason=err_msg,
volume_id=volume_id)
def reenable_replication(self, ctxt, volume_id):
"""Re-enable replication of secondary volume with primary volumes."""
try:
utils.require_driver_initialized(self.driver)
except exception.DriverNotInitialized:
with excutils.save_and_reraise_exception():
LOG.exception(_("Failed to sync replica for volume %(id)s.")
% {'id': volume_id})
volume = self.db.volume_get(ctxt, volume_id)
model_update = None
try:
LOG.debug("Volume %s: sync replica.", volume_id)
model_update = self.driver.reenable_replication(ctxt, volume)
except exception.CinderException:
err_msg = (_('Error synchronizing secondary volume to primary'))
raise exception.ReplicationError(reason=err_msg,
volume_id=volume_id)
try:
if model_update:
volume = self.db.volume_update(ctxt,
volume_id,
model_update)
except exception.CinderException:
err_msg = (_("Failed updating model"
" with driver provided model %(model)s") %
{'model': model_update})
raise exception.ReplicationError(reason=err_msg,
volume_id=volume_id)
@periodic_task.periodic_task
def _update_replication_relationship_status(self, ctxt):
LOG.info(_('Updating volume replication status.'))
if not self.driver.initialized:
if self.driver.configuration.config_group is None:
config_group = ''
else:
config_group = ('(config name %s)' %
self.driver.configuration.config_group)
LOG.warning(_('Unable to update volume replication status, '
'%(driver_name)s -%(driver_version)s '
'%(config_group)s driver is uninitialized.') %
{'driver_name': self.driver.__class__.__name__,
'driver_version': self.driver.get_version(),
'config_group': config_group})
else:
volumes = self.db.volume_get_all_by_host(ctxt, self.host)
for vol in volumes:
model_update = None
try:
model_update = self.driver.get_replication_status(
ctxt, vol)
if model_update:
self.db.volume_update(ctxt,
vol['id'],
model_update)
except Exception:
LOG.exception(_("Error checking replication status for "
"volume %s") % vol['id'])

View File

@ -51,6 +51,8 @@ class VolumeAPI(object):
1.14 - Adds reservation parameter to extend_volume().
1.15 - Adds manage_existing and unmanage_only flag to delete_volume.
1.16 - Removes create_export.
1.17 - Add replica option to create_volume, promote_replica and
sync_replica.
'''
BASE_RPC_API_VERSION = '1.0'
@ -59,12 +61,13 @@ class VolumeAPI(object):
super(VolumeAPI, self).__init__()
target = messaging.Target(topic=CONF.volume_topic,
version=self.BASE_RPC_API_VERSION)
self.client = rpc.get_client(target, '1.15')
self.client = rpc.get_client(target, '1.17')
def create_volume(self, ctxt, volume, host,
request_spec, filter_properties,
allow_reschedule=True,
snapshot_id=None, image_id=None,
source_replicaid=None,
source_volid=None):
cctxt = self.client.prepare(server=host, version='1.4')
@ -76,6 +79,7 @@ class VolumeAPI(object):
allow_reschedule=allow_reschedule,
snapshot_id=snapshot_id,
image_id=image_id,
source_replicaid=source_replicaid,
source_volid=source_volid),
def delete_volume(self, ctxt, volume, unmanage_only=False):
@ -165,3 +169,11 @@ class VolumeAPI(object):
def manage_existing(self, ctxt, volume, ref):
cctxt = self.client.prepare(server=volume['host'], version='1.15')
cctxt.cast(ctxt, 'manage_existing', volume_id=volume['id'], ref=ref)
def promote_replica(self, ctxt, volume):
cctxt = self.client.prepare(server=volume['host'], version='1.17')
cctxt.cast(ctxt, 'promote_replica', volume_id=volume['id'])
def reenable_replication(self, ctxt, volume):
cctxt = self.client.prepare(server=volume['host'], version='1.17')
cctxt.cast(ctxt, 'reenable_replication', volume_id=volume['id'])

View File

@ -54,7 +54,13 @@ def _usage_from_volume(context, volume_ref, **kw):
created_at=null_safe_str(volume_ref['created_at']),
status=volume_ref['status'],
snapshot_id=volume_ref['snapshot_id'],
size=volume_ref['size'])
size=volume_ref['size'],
replication_status=volume_ref['replication_status'],
replication_extended_status=
volume_ref['replication_extended_status'],
replication_driver_data=
volume_ref['replication_driver_data'],
)
usage_info.update(kw)
return usage_info
@ -107,6 +113,40 @@ def notify_about_snapshot_usage(context, snapshot, event_suffix,
usage_info)
def notify_about_replication_usage(context, volume, suffix,
extra_usage_info=None, host=None):
if not host:
host = CONF.host
if not extra_usage_info:
extra_usage_info = {}
usage_info = _usage_from_volume(context,
volume,
**extra_usage_info)
rpc.get_notifier('replication', host).info(context,
'replication.%s' % suffix,
usage_info)
def notify_about_replication_error(context, volume, suffix,
extra_error_info=None, host=None):
if not host:
host = CONF.host
if not extra_error_info:
extra_error_info = {}
usage_info = _usage_from_volume(context,
volume,
**extra_error_info)
rpc.get_notifier('replication', host).error(context,
'replication.%s' % suffix,
usage_info)
def setup_blkio_cgroup(srcpath, dstpath, bps_limit, execute=utils.execute):
if not bps_limit:
LOG.debug('Not using bps rate limiting on volume copy')

View File

@ -609,6 +609,10 @@
# value)
#transfer_api_class=cinder.transfer.api.API
# The full class name of the volume replication API class
# (string value)
#replication_api_class=cinder.replication.api.API
#
# Options defined in cinder.compute

View File

@ -52,6 +52,9 @@
"volume:delete_transfer": [],
"volume:get_all_transfers": [],
"volume_extension:replication:promote": ["rule:admin_api"],
"volume_extension:replication:reenable": ["rule:admin_api"],
"backup:create" : [],
"backup:delete": [],
"backup:get": [],