RBD: Implement v2.1 replication
This patch implements v2.1 replication in the RBD driver. A single ceph backend can support both replicated and non-replicated volumes using volume types. For replicated volumes, both clusters are expected to be configured with rbd-mirror with keys in place and image mirroring enabled on the pool. The RBD driver will enable replication per-volume if the volume type requests it. On failover, each replicated volume is promoted to primary on the secondary cluster and new connection requests will receive connection information for the volume on the secondary cluster. At the time of writing, failback is not supported by Cinder and requires admin intervention to reach a per-failover state. Non replicated volumes will be set to error status to reflect that they are not available, and the previous status will be stored in the replication_driver_data field. There are two configuration pieces required to make this work: 1. A volume type that enables replication: $ cinder type-create replicated $ cinder type-key replicated set volume_backend_name=ceph $ cinder type-key replicated set replication_enabled='<is> True' 2. A secondary backend defined in cinder.conf: [ceph] ... replication_device = backend_id:secondary, conf:/etc/ceph/secondary.conf, user:cinder The only required parameter is backend_id, as conf and user have defaults. Conf defaults to /etc/ceph/$backend_id.conf and user defaults to rbd_user or cinder if that one is None. We also have a new configuration option for the RBD driver called replication_connect_timeout that allows us to specify the timeout for the promotion/demotion of a single volume. We try to do a clean failover for cases where there is still connectivity with the primary cluster, so we'll try to do a demotion of the original images, and when one of the demotions fails we just assume that all of the other demotions will fail as well (the cluster is not accesible) so we'll do a forceful promotion for those images. DocImpact Implements: blueprint rbd-replication Co-Authored-By: Gorka Eguileor <geguileo@redhat.com> Change-Id: I58c38fe11014aaade6b42f4bdf9d32b73c82e18d
This commit is contained in:
parent
808862bf37
commit
f81d8a37de
@ -155,9 +155,10 @@ class RBDTestCase(test.TestCase):
|
||||
self.cfg.image_conversion_dir = None
|
||||
self.cfg.rbd_cluster_name = 'nondefault'
|
||||
self.cfg.rbd_pool = 'rbd'
|
||||
self.cfg.rbd_ceph_conf = None
|
||||
self.cfg.rbd_ceph_conf = '/etc/ceph/my_ceph.conf'
|
||||
self.cfg.rbd_secret_uuid = None
|
||||
self.cfg.rbd_user = None
|
||||
self.cfg.rbd_user = 'cinder'
|
||||
self.cfg.volume_backend_name = None
|
||||
self.cfg.volume_dd_blocksize = '1M'
|
||||
self.cfg.rbd_store_chunk_size = 4
|
||||
self.cfg.rados_connection_retries = 3
|
||||
@ -198,13 +199,185 @@ class RBDTestCase(test.TestCase):
|
||||
self.assertRaises(exception.InvalidConfigurationValue,
|
||||
self.driver.check_for_setup_error)
|
||||
|
||||
def test_parse_replication_config_empty(self):
|
||||
self.driver._parse_replication_configs([])
|
||||
self.assertEqual([], self.driver._replication_targets)
|
||||
|
||||
def test_parse_replication_config_missing(self):
|
||||
"""Parsing replication_device without required backend_id."""
|
||||
cfg = [{'conf': '/etc/ceph/secondary.conf'}]
|
||||
self.assertRaises(exception.InvalidConfigurationValue,
|
||||
self.driver._parse_replication_configs,
|
||||
cfg)
|
||||
|
||||
def test_parse_replication_config_defaults(self):
|
||||
"""Parsing replication_device with default conf and user."""
|
||||
cfg = [{'backend_id': 'secondary-backend'}]
|
||||
expected = [{'name': 'secondary-backend',
|
||||
'conf': '/etc/ceph/secondary-backend.conf',
|
||||
'user': 'cinder'}]
|
||||
self.driver._parse_replication_configs(cfg)
|
||||
self.assertEqual(expected, self.driver._replication_targets)
|
||||
|
||||
@ddt.data(1, 2)
|
||||
def test_parse_replication_config(self, num_targets):
|
||||
cfg = [{'backend_id': 'secondary-backend',
|
||||
'conf': 'foo',
|
||||
'user': 'bar'},
|
||||
{'backend_id': 'tertiary-backend'}]
|
||||
expected = [{'name': 'secondary-backend',
|
||||
'conf': 'foo',
|
||||
'user': 'bar'},
|
||||
{'name': 'tertiary-backend',
|
||||
'conf': '/etc/ceph/tertiary-backend.conf',
|
||||
'user': 'cinder'}]
|
||||
self.driver._parse_replication_configs(cfg[:num_targets])
|
||||
self.assertEqual(expected[:num_targets],
|
||||
self.driver._replication_targets)
|
||||
|
||||
def test_do_setup_replication_disabled(self):
|
||||
with mock.patch.object(self.driver.configuration, 'safe_get',
|
||||
return_value=None):
|
||||
self.driver.do_setup(self.context)
|
||||
self.assertFalse(self.driver._is_replication_enabled)
|
||||
self.assertEqual([], self.driver._replication_targets)
|
||||
self.assertEqual([], self.driver._target_names)
|
||||
self.assertEqual({'name': self.cfg.rbd_cluster_name,
|
||||
'conf': self.cfg.rbd_ceph_conf,
|
||||
'user': self.cfg.rbd_user},
|
||||
self.driver._active_config)
|
||||
|
||||
def test_do_setup_replication(self):
|
||||
cfg = [{'backend_id': 'secondary-backend',
|
||||
'conf': 'foo',
|
||||
'user': 'bar'}]
|
||||
expected = [{'name': 'secondary-backend',
|
||||
'conf': 'foo',
|
||||
'user': 'bar'}]
|
||||
|
||||
with mock.patch.object(self.driver.configuration, 'safe_get',
|
||||
return_value=cfg):
|
||||
self.driver.do_setup(self.context)
|
||||
self.assertTrue(self.driver._is_replication_enabled)
|
||||
self.assertEqual(expected, self.driver._replication_targets)
|
||||
self.assertEqual({'name': self.cfg.rbd_cluster_name,
|
||||
'conf': self.cfg.rbd_ceph_conf,
|
||||
'user': self.cfg.rbd_user},
|
||||
self.driver._active_config)
|
||||
|
||||
def test_do_setup_replication_failed_over(self):
|
||||
cfg = [{'backend_id': 'secondary-backend',
|
||||
'conf': 'foo',
|
||||
'user': 'bar'}]
|
||||
expected = [{'name': 'secondary-backend',
|
||||
'conf': 'foo',
|
||||
'user': 'bar'}]
|
||||
self.driver._active_backend_id = 'secondary-backend'
|
||||
|
||||
with mock.patch.object(self.driver.configuration, 'safe_get',
|
||||
return_value=cfg):
|
||||
self.driver.do_setup(self.context)
|
||||
self.assertTrue(self.driver._is_replication_enabled)
|
||||
self.assertEqual(expected, self.driver._replication_targets)
|
||||
self.assertEqual(expected[0], self.driver._active_config)
|
||||
|
||||
def test_do_setup_replication_failed_over_unknown(self):
|
||||
cfg = [{'backend_id': 'secondary-backend',
|
||||
'conf': 'foo',
|
||||
'user': 'bar'}]
|
||||
self.driver._active_backend_id = 'unknown-backend'
|
||||
|
||||
with mock.patch.object(self.driver.configuration, 'safe_get',
|
||||
return_value=cfg):
|
||||
self.assertRaises(exception.InvalidReplicationTarget,
|
||||
self.driver.do_setup,
|
||||
self.context)
|
||||
|
||||
@mock.patch.object(driver.RBDDriver, '_enable_replication',
|
||||
return_value=mock.sentinel.volume_update)
|
||||
def test_enable_replication_if_needed_replicated_volume(self, mock_enable):
|
||||
self.volume_a.volume_type = fake_volume.fake_volume_type_obj(
|
||||
self.context,
|
||||
id=fake.VOLUME_TYPE_ID,
|
||||
extra_specs={'replication_enabled': '<is> True'})
|
||||
res = self.driver._enable_replication_if_needed(self.volume_a)
|
||||
self.assertEqual(mock.sentinel.volume_update, res)
|
||||
mock_enable.assert_called_once_with(self.volume_a)
|
||||
|
||||
@ddt.data(False, True)
|
||||
@mock.patch.object(driver.RBDDriver, '_enable_replication')
|
||||
def test_enable_replication_if_needed_non_replicated(self, enabled,
|
||||
mock_enable):
|
||||
self.driver._is_replication_enabled = enabled
|
||||
res = self.driver._enable_replication_if_needed(self.volume_a)
|
||||
if enabled:
|
||||
expect = {'replication_status': fields.ReplicationStatus.DISABLED}
|
||||
else:
|
||||
expect = None
|
||||
self.assertEqual(expect, res)
|
||||
mock_enable.assert_not_called()
|
||||
|
||||
@ddt.data(True, False)
|
||||
@common_mocks
|
||||
def test_create_volume(self):
|
||||
def test_enable_replication(self, journaling_enabled):
|
||||
"""Test _enable_replication method.
|
||||
|
||||
We want to confirm that if the Ceph backend has globally enabled
|
||||
journaling we don't try to enable it again and we properly indicate
|
||||
with our return value that it was already enabled.
|
||||
"""
|
||||
journaling_feat = 1
|
||||
self.driver.rbd.RBD_FEATURE_JOURNALING = journaling_feat
|
||||
image = self.mock_proxy.return_value.__enter__.return_value
|
||||
if journaling_enabled:
|
||||
image.features.return_value = journaling_feat
|
||||
else:
|
||||
image.features.return_value = 0
|
||||
|
||||
enabled = str(journaling_enabled).lower()
|
||||
expected = {
|
||||
'replication_driver_data': '{"had_journaling":%s}' % enabled,
|
||||
'replication_status': 'enabled',
|
||||
}
|
||||
|
||||
res = self.driver._enable_replication(self.volume_a)
|
||||
self.assertEqual(expected, res)
|
||||
|
||||
if journaling_enabled:
|
||||
image.update_features.assert_not_called()
|
||||
else:
|
||||
image.update_features.assert_called_once_with(journaling_feat,
|
||||
True)
|
||||
image.mirror_image_enable.assert_called_once_with()
|
||||
|
||||
@ddt.data('true', 'false')
|
||||
@common_mocks
|
||||
def test_disable_replication(self, had_journaling):
|
||||
driver_data = '{"had_journaling": %s}' % had_journaling
|
||||
self.volume_a.replication_driver_data = driver_data
|
||||
image = self.mock_proxy.return_value.__enter__.return_value
|
||||
|
||||
res = self.driver._disable_replication(self.volume_a)
|
||||
expected = {'replication_status': fields.ReplicationStatus.DISABLED,
|
||||
'replication_driver_data': None}
|
||||
self.assertEqual(expected, res)
|
||||
image.mirror_image_disable.assert_called_once_with(False)
|
||||
|
||||
if had_journaling == 'true':
|
||||
image.update_features.assert_not_called()
|
||||
else:
|
||||
image.update_features.assert_called_once_with(
|
||||
self.driver.rbd.RBD_FEATURE_JOURNALING, False)
|
||||
|
||||
@common_mocks
|
||||
@mock.patch.object(driver.RBDDriver, '_enable_replication')
|
||||
def test_create_volume(self, mock_enable_repl):
|
||||
client = self.mock_client.return_value
|
||||
client.__enter__.return_value = client
|
||||
|
||||
self.driver.create_volume(self.volume_a)
|
||||
res = self.driver.create_volume(self.volume_a)
|
||||
|
||||
self.assertIsNone(res)
|
||||
chunk_size = self.cfg.rbd_store_chunk_size * units.Mi
|
||||
order = int(math.log(chunk_size, 2))
|
||||
args = [client.ioctx, str(self.volume_a.name),
|
||||
@ -215,6 +388,37 @@ class RBDTestCase(test.TestCase):
|
||||
*args, **kwargs)
|
||||
client.__enter__.assert_called_once_with()
|
||||
client.__exit__.assert_called_once_with(None, None, None)
|
||||
mock_enable_repl.assert_not_called()
|
||||
|
||||
@common_mocks
|
||||
@mock.patch.object(driver.RBDDriver, '_enable_replication')
|
||||
def test_create_volume_replicated(self, mock_enable_repl):
|
||||
self.volume_a.volume_type = fake_volume.fake_volume_type_obj(
|
||||
self.context,
|
||||
id=fake.VOLUME_TYPE_ID,
|
||||
extra_specs={'replication_enabled': '<is> True'})
|
||||
|
||||
client = self.mock_client.return_value
|
||||
client.__enter__.return_value = client
|
||||
|
||||
expected_update = {
|
||||
'replication_status': 'enabled',
|
||||
'replication_driver_data': '{"had_journaling": false}'
|
||||
}
|
||||
mock_enable_repl.return_value = expected_update
|
||||
|
||||
res = self.driver.create_volume(self.volume_a)
|
||||
self.assertEqual(expected_update, res)
|
||||
mock_enable_repl.assert_called_once_with(self.volume_a)
|
||||
|
||||
chunk_size = self.cfg.rbd_store_chunk_size * units.Mi
|
||||
order = int(math.log(chunk_size, 2))
|
||||
self.mock_rbd.RBD.return_value.create.assert_called_once_with(
|
||||
client.ioctx, self.volume_a.name, self.volume_a.size * units.Gi,
|
||||
order, old_format=False, features=client.features)
|
||||
|
||||
client.__enter__.assert_called_once_with()
|
||||
client.__exit__.assert_called_once_with(None, None, None)
|
||||
|
||||
@common_mocks
|
||||
def test_create_encrypted_volume(self):
|
||||
@ -607,7 +811,8 @@ class RBDTestCase(test.TestCase):
|
||||
volume.parent_info.assert_called_once_with()
|
||||
|
||||
@common_mocks
|
||||
def test_create_cloned_volume_same_size(self):
|
||||
@mock.patch.object(driver.RBDDriver, '_enable_replication')
|
||||
def test_create_cloned_volume_same_size(self, mock_enable_repl):
|
||||
self.cfg.rbd_max_clone_depth = 2
|
||||
|
||||
with mock.patch.object(self.driver, '_get_clone_depth') as \
|
||||
@ -616,8 +821,10 @@ class RBDTestCase(test.TestCase):
|
||||
with mock.patch.object(self.driver, '_resize') as mock_resize:
|
||||
mock_get_clone_depth.return_value = 1
|
||||
|
||||
self.driver.create_cloned_volume(self.volume_b, self.volume_a)
|
||||
res = self.driver.create_cloned_volume(self.volume_b,
|
||||
self.volume_a)
|
||||
|
||||
self.assertIsNone(res)
|
||||
(self.mock_rbd.Image.return_value.create_snap
|
||||
.assert_called_once_with('.'.join(
|
||||
(self.volume_b.name, 'clone_snap'))))
|
||||
@ -629,11 +836,47 @@ class RBDTestCase(test.TestCase):
|
||||
self.mock_rbd.Image.return_value.close \
|
||||
.assert_called_once_with()
|
||||
self.assertTrue(mock_get_clone_depth.called)
|
||||
self.assertEqual(
|
||||
0, mock_resize.call_count)
|
||||
mock_resize.assert_not_called()
|
||||
mock_enable_repl.assert_not_called()
|
||||
|
||||
@common_mocks
|
||||
def test_create_cloned_volume_different_size(self):
|
||||
@mock.patch.object(driver.RBDDriver, '_get_clone_depth', return_value=1)
|
||||
@mock.patch.object(driver.RBDDriver, '_resize')
|
||||
@mock.patch.object(driver.RBDDriver, '_enable_replication')
|
||||
def test_create_cloned_volume_replicated(self,
|
||||
mock_enable_repl,
|
||||
mock_resize,
|
||||
mock_get_clone_depth):
|
||||
self.cfg.rbd_max_clone_depth = 2
|
||||
self.volume_b.volume_type = fake_volume.fake_volume_type_obj(
|
||||
self.context,
|
||||
id=fake.VOLUME_TYPE_ID,
|
||||
extra_specs={'replication_enabled': '<is> True'})
|
||||
|
||||
expected_update = {
|
||||
'replication_status': 'enabled',
|
||||
'replication_driver_data': '{"had_journaling": false}'
|
||||
}
|
||||
mock_enable_repl.return_value = expected_update
|
||||
|
||||
res = self.driver.create_cloned_volume(self.volume_b, self.volume_a)
|
||||
self.assertEqual(expected_update, res)
|
||||
mock_enable_repl.assert_called_once_with(self.volume_b)
|
||||
|
||||
name = self.volume_b.name
|
||||
image = self.mock_rbd.Image.return_value
|
||||
|
||||
image.create_snap.assert_called_once_with(name + '.clone_snap')
|
||||
image.protect_snap.assert_called_once_with(name + '.clone_snap')
|
||||
self.assertEqual(1, self.mock_rbd.RBD.return_value.clone.call_count)
|
||||
self.mock_rbd.Image.return_value.close.assert_called_once_with()
|
||||
mock_get_clone_depth.assert_called_once_with(
|
||||
self.mock_client().__enter__(), self.volume_a.name)
|
||||
mock_resize.assert_not_called()
|
||||
|
||||
@common_mocks
|
||||
@mock.patch.object(driver.RBDDriver, '_enable_replication')
|
||||
def test_create_cloned_volume_different_size(self, mock_enable_repl):
|
||||
self.cfg.rbd_max_clone_depth = 2
|
||||
|
||||
with mock.patch.object(self.driver, '_get_clone_depth') as \
|
||||
@ -643,8 +886,10 @@ class RBDTestCase(test.TestCase):
|
||||
mock_get_clone_depth.return_value = 1
|
||||
|
||||
self.volume_b.size = 20
|
||||
self.driver.create_cloned_volume(self.volume_b, self.volume_a)
|
||||
res = self.driver.create_cloned_volume(self.volume_b,
|
||||
self.volume_a)
|
||||
|
||||
self.assertIsNone(res)
|
||||
(self.mock_rbd.Image.return_value.create_snap
|
||||
.assert_called_once_with('.'.join(
|
||||
(self.volume_b.name, 'clone_snap'))))
|
||||
@ -658,9 +903,11 @@ class RBDTestCase(test.TestCase):
|
||||
self.assertTrue(mock_get_clone_depth.called)
|
||||
self.assertEqual(
|
||||
1, mock_resize.call_count)
|
||||
mock_enable_repl.assert_not_called()
|
||||
|
||||
@common_mocks
|
||||
def test_create_cloned_volume_w_flatten(self):
|
||||
@mock.patch.object(driver.RBDDriver, '_enable_replication')
|
||||
def test_create_cloned_volume_w_flatten(self, mock_enable_repl):
|
||||
self.cfg.rbd_max_clone_depth = 1
|
||||
|
||||
with mock.patch.object(self.driver, '_get_clone_info') as \
|
||||
@ -673,8 +920,10 @@ class RBDTestCase(test.TestCase):
|
||||
# Try with no flatten required
|
||||
mock_get_clone_depth.return_value = 1
|
||||
|
||||
self.driver.create_cloned_volume(self.volume_b, self.volume_a)
|
||||
res = self.driver.create_cloned_volume(self.volume_b,
|
||||
self.volume_a)
|
||||
|
||||
self.assertIsNone(res)
|
||||
(self.mock_rbd.Image.return_value.create_snap
|
||||
.assert_called_once_with('.'.join(
|
||||
(self.volume_b.name, 'clone_snap'))))
|
||||
@ -694,9 +943,11 @@ class RBDTestCase(test.TestCase):
|
||||
self.assertEqual(
|
||||
2, self.mock_rbd.Image.return_value.close.call_count)
|
||||
self.assertTrue(mock_get_clone_depth.called)
|
||||
mock_enable_repl.assert_not_called()
|
||||
|
||||
@common_mocks
|
||||
def test_create_cloned_volume_w_clone_exception(self):
|
||||
@mock.patch.object(driver.RBDDriver, '_enable_replication')
|
||||
def test_create_cloned_volume_w_clone_exception(self, mock_enable_repl):
|
||||
self.cfg.rbd_max_clone_depth = 2
|
||||
self.mock_rbd.RBD.return_value.clone.side_effect = (
|
||||
self.mock_rbd.RBD.Error)
|
||||
@ -724,6 +975,7 @@ class RBDTestCase(test.TestCase):
|
||||
.assert_called_once_with('.'.join(
|
||||
(self.volume_b.name, 'clone_snap'))))
|
||||
self.mock_rbd.Image.return_value.close.assert_called_once_with()
|
||||
mock_enable_repl.assert_not_called()
|
||||
|
||||
@common_mocks
|
||||
def test_good_locations(self):
|
||||
@ -814,8 +1066,9 @@ class RBDTestCase(test.TestCase):
|
||||
self.cfg.image_conversion_dir = '/var/run/cinder/tmp'
|
||||
self._copy_image()
|
||||
|
||||
@ddt.data(True, False)
|
||||
@common_mocks
|
||||
def test_update_volume_stats(self):
|
||||
def test_update_volume_stats(self, replication_enabled):
|
||||
client = self.mock_client.return_value
|
||||
client.__enter__.return_value = client
|
||||
|
||||
@ -829,11 +1082,9 @@ class RBDTestCase(test.TestCase):
|
||||
'{"name":"volumes","id":3,"stats":{"kb_used":0,"bytes_used":0,'
|
||||
'"max_avail":28987613184,"objects":0}}]}\n', '')
|
||||
|
||||
self.mock_object(self.driver.configuration, 'safe_get',
|
||||
mock_driver_configuration)
|
||||
|
||||
expected = dict(
|
||||
volume_backend_name='RBD',
|
||||
replication_enabled=replication_enabled,
|
||||
vendor_name='Open Source',
|
||||
driver_version=self.driver.VERSION,
|
||||
storage_protocol='ceph',
|
||||
@ -845,6 +1096,18 @@ class RBDTestCase(test.TestCase):
|
||||
max_over_subscription_ratio=1.0,
|
||||
multiattach=False)
|
||||
|
||||
if replication_enabled:
|
||||
targets = [{'backend_id': 'secondary-backend'},
|
||||
{'backend_id': 'tertiary-backend'}]
|
||||
with mock.patch.object(self.driver.configuration, 'safe_get',
|
||||
return_value=targets):
|
||||
self.driver._do_setup_replication()
|
||||
expected['replication_targets'] = [t['backend_id']for t in targets]
|
||||
expected['replication_targets'].append('default')
|
||||
|
||||
self.mock_object(self.driver.configuration, 'safe_get',
|
||||
mock_driver_configuration)
|
||||
|
||||
actual = self.driver.get_volume_stats(True)
|
||||
client.cluster.mon_command.assert_called_once_with(
|
||||
'{"prefix":"df", "format":"json"}', '')
|
||||
@ -863,6 +1126,7 @@ class RBDTestCase(test.TestCase):
|
||||
mock_driver_configuration)
|
||||
|
||||
expected = dict(volume_backend_name='RBD',
|
||||
replication_enabled=False,
|
||||
vendor_name='Open Source',
|
||||
driver_version=self.driver.VERSION,
|
||||
storage_protocol='ceph',
|
||||
@ -904,8 +1168,8 @@ class RBDTestCase(test.TestCase):
|
||||
'hosts': hosts,
|
||||
'ports': ports,
|
||||
'cluster_name': self.cfg.rbd_cluster_name,
|
||||
'auth_enabled': False,
|
||||
'auth_username': None,
|
||||
'auth_enabled': True,
|
||||
'auth_username': self.cfg.rbd_user,
|
||||
'secret_type': 'ceph',
|
||||
'secret_uuid': None,
|
||||
'volume_id': self.volume_a.id
|
||||
@ -920,7 +1184,8 @@ class RBDTestCase(test.TestCase):
|
||||
{'rbd_chunk_size': 32, 'order': 25})
|
||||
@ddt.unpack
|
||||
@common_mocks
|
||||
def test_clone(self, rbd_chunk_size, order):
|
||||
@mock.patch.object(driver.RBDDriver, '_enable_replication')
|
||||
def test_clone(self, mock_enable_repl, rbd_chunk_size, order):
|
||||
self.cfg.rbd_store_chunk_size = rbd_chunk_size
|
||||
src_pool = u'images'
|
||||
src_image = u'image-name'
|
||||
@ -938,7 +1203,56 @@ class RBDTestCase(test.TestCase):
|
||||
# capture both rados client used to perform the clone
|
||||
client.__enter__.side_effect = mock__enter__(client)
|
||||
|
||||
self.driver._clone(self.volume_a, src_pool, src_image, src_snap)
|
||||
res = self.driver._clone(self.volume_a, src_pool, src_image, src_snap)
|
||||
|
||||
self.assertEqual({}, res)
|
||||
|
||||
args = [client_stack[0].ioctx, str(src_image), str(src_snap),
|
||||
client_stack[1].ioctx, str(self.volume_a.name)]
|
||||
kwargs = {'features': client.features,
|
||||
'order': order}
|
||||
self.mock_rbd.RBD.return_value.clone.assert_called_once_with(
|
||||
*args, **kwargs)
|
||||
self.assertEqual(2, client.__enter__.call_count)
|
||||
mock_enable_repl.assert_not_called()
|
||||
|
||||
@common_mocks
|
||||
@mock.patch.object(driver.RBDDriver, '_enable_replication')
|
||||
def test_clone_replicated(self, mock_enable_repl):
|
||||
rbd_chunk_size = 1
|
||||
order = 20
|
||||
self.volume_a.volume_type = fake_volume.fake_volume_type_obj(
|
||||
self.context,
|
||||
id=fake.VOLUME_TYPE_ID,
|
||||
extra_specs={'replication_enabled': '<is> True'})
|
||||
|
||||
expected_update = {
|
||||
'replication_status': 'enabled',
|
||||
'replication_driver_data': '{"had_journaling": false}'
|
||||
}
|
||||
mock_enable_repl.return_value = expected_update
|
||||
|
||||
self.cfg.rbd_store_chunk_size = rbd_chunk_size
|
||||
src_pool = u'images'
|
||||
src_image = u'image-name'
|
||||
src_snap = u'snapshot-name'
|
||||
|
||||
client_stack = []
|
||||
|
||||
def mock__enter__(inst):
|
||||
def _inner():
|
||||
client_stack.append(inst)
|
||||
return inst
|
||||
return _inner
|
||||
|
||||
client = self.mock_client.return_value
|
||||
# capture both rados client used to perform the clone
|
||||
client.__enter__.side_effect = mock__enter__(client)
|
||||
|
||||
res = self.driver._clone(self.volume_a, src_pool, src_image, src_snap)
|
||||
|
||||
self.assertEqual(expected_update, res)
|
||||
mock_enable_repl.assert_called_once_with(self.volume_a)
|
||||
|
||||
args = [client_stack[0].ioctx, str(src_image), str(src_snap),
|
||||
client_stack[1].ioctx, str(self.volume_a.name)]
|
||||
@ -948,6 +1262,45 @@ class RBDTestCase(test.TestCase):
|
||||
*args, **kwargs)
|
||||
self.assertEqual(2, client.__enter__.call_count)
|
||||
|
||||
@ddt.data({},
|
||||
{'replication_status': 'enabled',
|
||||
'replication_driver_data': '{"had_journaling": false}'})
|
||||
@common_mocks
|
||||
@mock.patch.object(driver.RBDDriver, '_is_cloneable', return_value=True)
|
||||
def test_clone_image_replication(self, return_value, mock_cloneable):
|
||||
mock_clone = self.mock_object(self.driver, '_clone',
|
||||
return_value=return_value)
|
||||
image_loc = ('rbd://fee/fi/fo/fum', None)
|
||||
image_meta = {'disk_format': 'raw', 'id': 'id.foo'}
|
||||
|
||||
res = self.driver.clone_image(self.context,
|
||||
self.volume_a,
|
||||
image_loc,
|
||||
image_meta,
|
||||
mock.Mock())
|
||||
|
||||
expected = return_value.copy()
|
||||
expected['provider_location'] = None
|
||||
self.assertEqual((expected, True), res)
|
||||
|
||||
mock_clone.assert_called_once_with(self.volume_a, 'fi', 'fo', 'fum')
|
||||
|
||||
@common_mocks
|
||||
@mock.patch.object(driver.RBDDriver, '_clone',
|
||||
return_value=mock.sentinel.volume_update)
|
||||
@mock.patch.object(driver.RBDDriver, '_resize', mock.Mock())
|
||||
def test_create_vol_from_snap_replication(self, mock_clone):
|
||||
self.cfg.rbd_flatten_volume_from_snapshot = False
|
||||
snapshot = mock.Mock()
|
||||
|
||||
res = self.driver.create_volume_from_snapshot(self.volume_a, snapshot)
|
||||
|
||||
self.assertEqual(mock.sentinel.volume_update, res)
|
||||
mock_clone.assert_called_once_with(self.volume_a,
|
||||
self.cfg.rbd_pool,
|
||||
snapshot.volume_name,
|
||||
snapshot.name)
|
||||
|
||||
@common_mocks
|
||||
def test_extend_volume(self):
|
||||
fake_size = '20'
|
||||
@ -956,38 +1309,99 @@ class RBDTestCase(test.TestCase):
|
||||
self.driver.extend_volume(self.volume_a, fake_size)
|
||||
mock_resize.assert_called_once_with(self.volume_a, size=size)
|
||||
|
||||
@ddt.data(False, True)
|
||||
@common_mocks
|
||||
def test_retype(self):
|
||||
def test_retype(self, enabled):
|
||||
"""Test retyping a non replicated volume.
|
||||
|
||||
We will test on a system that doesn't have replication enabled and on
|
||||
one that hast it enabled.
|
||||
"""
|
||||
self.driver._is_replication_enabled = enabled
|
||||
if enabled:
|
||||
expect = {'replication_status': fields.ReplicationStatus.DISABLED}
|
||||
else:
|
||||
expect = None
|
||||
context = {}
|
||||
diff = {'encryption': {},
|
||||
'extra_specs': {}}
|
||||
updates = {'name': 'testvolume',
|
||||
'host': 'currenthost',
|
||||
'id': fake.VOLUME_ID}
|
||||
fake_type = 'high-IOPS'
|
||||
fake_type = fake_volume.fake_volume_type_obj(context)
|
||||
volume = fake_volume.fake_volume_obj(context, **updates)
|
||||
volume.volume_type = None
|
||||
|
||||
# The hosts have been checked same before rbd.retype
|
||||
# is called.
|
||||
# RBD doesn't support multiple pools in a driver.
|
||||
host = {'host': 'currenthost'}
|
||||
self.assertTrue(self.driver.retype(context, volume,
|
||||
fake_type, diff, host))
|
||||
self.assertEqual((True, expect),
|
||||
self.driver.retype(context, volume,
|
||||
fake_type, diff, host))
|
||||
|
||||
# The encryptions have been checked as same before rbd.retype
|
||||
# is called.
|
||||
diff['encryption'] = {}
|
||||
self.assertTrue(self.driver.retype(context, volume,
|
||||
fake_type, diff, host))
|
||||
self.assertEqual((True, expect),
|
||||
self.driver.retype(context, volume,
|
||||
fake_type, diff, host))
|
||||
|
||||
# extra_specs changes are supported.
|
||||
diff['extra_specs'] = {'non-empty': 'non-empty'}
|
||||
self.assertTrue(self.driver.retype(context, volume,
|
||||
fake_type, diff, host))
|
||||
self.assertEqual((True, expect),
|
||||
self.driver.retype(context, volume,
|
||||
fake_type, diff, host))
|
||||
diff['extra_specs'] = {}
|
||||
|
||||
self.assertTrue(self.driver.retype(context, volume,
|
||||
fake_type, diff, host))
|
||||
self.assertEqual((True, expect),
|
||||
self.driver.retype(context, volume,
|
||||
fake_type, diff, host))
|
||||
|
||||
@ddt.data({'old_replicated': False, 'new_replicated': False},
|
||||
{'old_replicated': False, 'new_replicated': True},
|
||||
{'old_replicated': True, 'new_replicated': False},
|
||||
{'old_replicated': True, 'new_replicated': True})
|
||||
@ddt.unpack
|
||||
@common_mocks
|
||||
@mock.patch.object(driver.RBDDriver, '_disable_replication',
|
||||
return_value=mock.sentinel.disable_replication)
|
||||
@mock.patch.object(driver.RBDDriver, '_enable_replication',
|
||||
return_value=mock.sentinel.enable_replication)
|
||||
def test_retype_replicated(self, mock_disable, mock_enable, old_replicated,
|
||||
new_replicated):
|
||||
"""Test retyping a non replicated volume.
|
||||
|
||||
We will test on a system that doesn't have replication enabled and on
|
||||
one that hast it enabled.
|
||||
"""
|
||||
self.driver._is_replication_enabled = True
|
||||
replicated_type = fake_volume.fake_volume_type_obj(
|
||||
self.context,
|
||||
id=fake.VOLUME_TYPE_ID,
|
||||
extra_specs={'replication_enabled': '<is> True'})
|
||||
|
||||
self.volume_a.volume_type = replicated_type if old_replicated else None
|
||||
|
||||
if new_replicated:
|
||||
new_type = replicated_type
|
||||
if old_replicated:
|
||||
update = None
|
||||
else:
|
||||
update = mock.sentinel.enable_replication
|
||||
else:
|
||||
new_type = fake_volume.fake_volume_type_obj(
|
||||
self.context,
|
||||
id=fake.VOLUME_TYPE2_ID),
|
||||
if old_replicated:
|
||||
update = mock.sentinel.disable_replication
|
||||
else:
|
||||
update = {'replication_status':
|
||||
fields.ReplicationStatus.DISABLED}
|
||||
|
||||
res = self.driver.retype(self.context, self.volume_a, new_type, None,
|
||||
None)
|
||||
self.assertEqual((True, update), res)
|
||||
|
||||
@common_mocks
|
||||
def test_update_migrated_volume(self):
|
||||
@ -1074,6 +1488,129 @@ class RBDTestCase(test.TestCase):
|
||||
self.assertEqual(
|
||||
3, self.mock_rados.Rados.return_value.shutdown.call_count)
|
||||
|
||||
@common_mocks
|
||||
def test_failover_host_no_replication(self):
|
||||
self.driver._is_replication_enabled = False
|
||||
self.assertRaises(exception.UnableToFailOver,
|
||||
self.driver.failover_host,
|
||||
self.context, [self.volume_a])
|
||||
|
||||
@ddt.data(None, 'tertiary-backend')
|
||||
@common_mocks
|
||||
@mock.patch.object(driver.RBDDriver, '_get_failover_target_config')
|
||||
@mock.patch.object(driver.RBDDriver, '_failover_volume', autospec=True)
|
||||
def test_failover_host(self, secondary_id, mock_failover_vol,
|
||||
mock_get_cfg):
|
||||
mock_failover_vol.side_effect = lambda self, v, r, d, s: v
|
||||
self.mock_object(self.driver.configuration, 'safe_get',
|
||||
return_value=[{'backend_id': 'secondary-backend'},
|
||||
{'backend_id': 'tertiary-backend'}])
|
||||
self.driver._do_setup_replication()
|
||||
volumes = [self.volume_a, self.volume_b]
|
||||
remote = self.driver._replication_targets[1 if secondary_id else 0]
|
||||
mock_get_cfg.return_value = (remote['name'], remote)
|
||||
|
||||
res = self.driver.failover_host(self.context, volumes, secondary_id)
|
||||
|
||||
self.assertEqual((remote['name'], volumes), res)
|
||||
self.assertEqual(remote, self.driver._active_config)
|
||||
mock_failover_vol.assert_has_calls(
|
||||
[mock.call(mock.ANY, v, remote, False,
|
||||
fields.ReplicationStatus.FAILED_OVER)
|
||||
for v in volumes])
|
||||
mock_get_cfg.assert_called_once_with(secondary_id)
|
||||
|
||||
@mock.patch.object(driver.RBDDriver, '_failover_volume', autospec=True)
|
||||
def test_failover_host_failback(self, mock_failover_vol):
|
||||
mock_failover_vol.side_effect = lambda self, v, r, d, s: v
|
||||
self.driver._active_backend_id = 'secondary-backend'
|
||||
self.mock_object(self.driver.configuration, 'safe_get',
|
||||
return_value=[{'backend_id': 'secondary-backend'},
|
||||
{'backend_id': 'tertiary-backend'}])
|
||||
self.driver._do_setup_replication()
|
||||
|
||||
remote = self.driver._get_target_config('default')
|
||||
volumes = [self.volume_a, self.volume_b]
|
||||
res = self.driver.failover_host(self.context, volumes, 'default')
|
||||
|
||||
self.assertEqual(('default', volumes), res)
|
||||
self.assertEqual(remote, self.driver._active_config)
|
||||
mock_failover_vol.assert_has_calls(
|
||||
[mock.call(mock.ANY, v, remote, False,
|
||||
fields.ReplicationStatus.ENABLED)
|
||||
for v in volumes])
|
||||
|
||||
@mock.patch.object(driver.RBDDriver, '_failover_volume')
|
||||
def test_failover_host_no_more_replica_targets(self, mock_failover_vol):
|
||||
mock_failover_vol.side_effect = lambda w, x, y, z: w
|
||||
self.driver._active_backend_id = 'secondary-backend'
|
||||
self.mock_object(self.driver.configuration, 'safe_get',
|
||||
return_value=[{'backend_id': 'secondary-backend'}])
|
||||
self.driver._do_setup_replication()
|
||||
|
||||
volumes = [self.volume_a, self.volume_b]
|
||||
self.assertRaises(exception.InvalidReplicationTarget,
|
||||
self.driver.failover_host,
|
||||
self.context, volumes, None)
|
||||
|
||||
def test_failover_volume_non_replicated(self):
|
||||
self.volume_a.replication_status = fields.ReplicationStatus.DISABLED
|
||||
remote = {'name': 'name', 'user': 'user', 'conf': 'conf',
|
||||
'pool': 'pool'}
|
||||
expected = {
|
||||
'volume_id': self.volume_a.id,
|
||||
'updates': {
|
||||
'status': 'error',
|
||||
'previous_status': self.volume_a.status,
|
||||
'replication_status': fields.ReplicationStatus.NOT_CAPABLE,
|
||||
}
|
||||
}
|
||||
res = self.driver._failover_volume(
|
||||
self.volume_a, remote, False, fields.ReplicationStatus.FAILED_OVER)
|
||||
self.assertEqual(expected, res)
|
||||
|
||||
@ddt.data(True, False)
|
||||
@mock.patch.object(driver.RBDDriver, '_exec_on_volume',
|
||||
side_effect=Exception)
|
||||
def test_failover_volume_error(self, is_demoted, mock_exec):
|
||||
self.volume_a.replication_driver_data = '{"had_journaling": false}'
|
||||
self.volume_a.volume_type = fake_volume.fake_volume_type_obj(
|
||||
self.context,
|
||||
id=fake.VOLUME_TYPE_ID,
|
||||
extra_specs={'replication_enabled': '<is> True'})
|
||||
remote = {'name': 'name', 'user': 'user', 'conf': 'conf',
|
||||
'pool': 'pool'}
|
||||
repl_status = fields.ReplicationStatus.FAILOVER_ERROR
|
||||
expected = {'volume_id': self.volume_a.id,
|
||||
'updates': {'status': 'error',
|
||||
'previous_status': self.volume_a.status,
|
||||
'replication_status': repl_status}}
|
||||
res = self.driver._failover_volume(
|
||||
self.volume_a, remote, is_demoted,
|
||||
fields.ReplicationStatus.FAILED_OVER)
|
||||
self.assertEqual(expected, res)
|
||||
mock_exec.assert_called_once_with(self.volume_a.name, remote,
|
||||
'mirror_image_promote',
|
||||
not is_demoted)
|
||||
|
||||
@mock.patch.object(driver.RBDDriver, '_exec_on_volume')
|
||||
def test_failover_volume(self, mock_exec):
|
||||
self.volume_a.replication_driver_data = '{"had_journaling": false}'
|
||||
self.volume_a.volume_type = fake_volume.fake_volume_type_obj(
|
||||
self.context,
|
||||
id=fake.VOLUME_TYPE_ID,
|
||||
extra_specs={'replication_enabled': '<is> True'})
|
||||
remote = {'name': 'name', 'user': 'user', 'conf': 'conf',
|
||||
'pool': 'pool'}
|
||||
repl_status = fields.ReplicationStatus.FAILED_OVER
|
||||
expected = {'volume_id': self.volume_a.id,
|
||||
'updates': {'replication_status': repl_status}}
|
||||
res = self.driver._failover_volume(self.volume_a, remote, True,
|
||||
repl_status)
|
||||
self.assertEqual(expected, res)
|
||||
mock_exec.assert_called_once_with(self.volume_a.name, remote,
|
||||
'mirror_image_promote', False)
|
||||
|
||||
|
||||
class ManagedRBDTestCase(test_volume.DriverTestCase):
|
||||
driver_name = "cinder.volume.drivers.rbd.RBDDriver"
|
||||
@ -1221,6 +1758,7 @@ class ManagedRBDTestCase(test_volume.DriverTestCase):
|
||||
mock_clone:
|
||||
with mock.patch.object(self.volume.driver, '_resize') as \
|
||||
mock_resize:
|
||||
mock_clone.return_value = {}
|
||||
image_loc = ('rbd://fee/fi/fo/fum', None)
|
||||
|
||||
volume = {'name': 'vol1'}
|
||||
@ -1249,6 +1787,7 @@ class ManagedRBDTestCase(test_volume.DriverTestCase):
|
||||
mock.patch.object(self.volume.driver, '_resize') \
|
||||
as mock_resize:
|
||||
mock_is_cloneable.side_effect = cloneable_side_effect
|
||||
mock_clone.return_value = {}
|
||||
image_loc = ('rbd://bee/bi/bo/bum',
|
||||
[{'url': 'rbd://bee/bi/bo/bum'},
|
||||
{'url': 'rbd://fee/fi/fo/fum'}])
|
||||
|
@ -32,6 +32,7 @@ from cinder import exception
|
||||
from cinder.i18n import _, _LE, _LI, _LW
|
||||
from cinder.image import image_utils
|
||||
from cinder import interface
|
||||
from cinder.objects import fields
|
||||
from cinder import utils
|
||||
from cinder.volume import driver
|
||||
|
||||
@ -87,12 +88,19 @@ RBD_OPTS = [
|
||||
'failed.'),
|
||||
cfg.IntOpt('rados_connection_interval', default=5,
|
||||
help='Interval value (in seconds) between connection '
|
||||
'retries to ceph cluster.')
|
||||
'retries to ceph cluster.'),
|
||||
cfg.IntOpt('replication_connect_timeout', default=5,
|
||||
help='Timeout value (in seconds) used when connecting to '
|
||||
'ceph cluster to do a demotion/promotion of volumes. '
|
||||
'If value < 0, no timeout is set and default librados '
|
||||
'value is used.'),
|
||||
]
|
||||
|
||||
CONF = cfg.CONF
|
||||
CONF.register_opts(RBD_OPTS)
|
||||
|
||||
EXTRA_SPECS_REPL_ENABLED = "replication_enabled"
|
||||
|
||||
|
||||
class RBDVolumeProxy(object):
|
||||
"""Context manager for dealing with an existing rbd volume.
|
||||
@ -104,8 +112,8 @@ class RBDVolumeProxy(object):
|
||||
'client' and 'ioctx'.
|
||||
"""
|
||||
def __init__(self, driver, name, pool=None, snapshot=None,
|
||||
read_only=False):
|
||||
client, ioctx = driver._connect_to_rados(pool)
|
||||
read_only=False, remote=None, timeout=None):
|
||||
client, ioctx = driver._connect_to_rados(pool, remote, timeout)
|
||||
if snapshot is not None:
|
||||
snapshot = utils.convert_str(snapshot)
|
||||
|
||||
@ -166,7 +174,9 @@ class RBDDriver(driver.TransferVD, driver.ExtendVD,
|
||||
# ThirdPartySystems wiki page
|
||||
CI_WIKI_NAME = "Cinder_Jenkins"
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
SYSCONFDIR = '/etc/ceph/'
|
||||
|
||||
def __init__(self, active_backend_id=None, *args, **kwargs):
|
||||
super(RBDDriver, self).__init__(*args, **kwargs)
|
||||
self.configuration.append_config_values(RBD_OPTS)
|
||||
self._stats = {}
|
||||
@ -182,6 +192,66 @@ class RBDDriver(driver.TransferVD, driver.ExtendVD,
|
||||
if val is not None:
|
||||
setattr(self.configuration, attr, utils.convert_str(val))
|
||||
|
||||
self._backend_name = (self.configuration.volume_backend_name or
|
||||
self.__class__.__name__)
|
||||
self._active_backend_id = active_backend_id
|
||||
self._active_config = {}
|
||||
self._is_replication_enabled = False
|
||||
self._replication_targets = []
|
||||
self._target_names = []
|
||||
|
||||
def _get_target_config(self, target_id):
|
||||
"""Get a replication target from known replication targets."""
|
||||
for target in self._replication_targets:
|
||||
if target['name'] == target_id:
|
||||
return target
|
||||
if not target_id or target_id == 'default':
|
||||
return {
|
||||
'name': self.configuration.rbd_cluster_name,
|
||||
'conf': self.configuration.rbd_ceph_conf,
|
||||
'user': self.configuration.rbd_user
|
||||
}
|
||||
raise exception.InvalidReplicationTarget(
|
||||
reason=_('RBD: Unknown failover target host %s.') % target_id)
|
||||
|
||||
def do_setup(self, context):
|
||||
"""Performs initialization steps that could raise exceptions."""
|
||||
self._do_setup_replication()
|
||||
self._active_config = self._get_target_config(self._active_backend_id)
|
||||
|
||||
def _do_setup_replication(self):
|
||||
replication_devices = self.configuration.safe_get(
|
||||
'replication_device')
|
||||
if replication_devices:
|
||||
self._parse_replication_configs(replication_devices)
|
||||
self._is_replication_enabled = True
|
||||
self._target_names.append('default')
|
||||
|
||||
def _parse_replication_configs(self, replication_devices):
|
||||
for replication_device in replication_devices:
|
||||
if 'backend_id' not in replication_device:
|
||||
msg = _('Missing backend_id in replication_device '
|
||||
'configuration.')
|
||||
raise exception.InvalidConfigurationValue(msg)
|
||||
|
||||
name = replication_device['backend_id']
|
||||
conf = replication_device.get('conf',
|
||||
self.SYSCONFDIR + name + '.conf')
|
||||
user = replication_device.get(
|
||||
'user', self.configuration.rbd_user or 'cinder')
|
||||
# Pool has to be the same in all clusters
|
||||
replication_target = {'name': name,
|
||||
'conf': utils.convert_str(conf),
|
||||
'user': utils.convert_str(user)}
|
||||
LOG.info(_LI('Adding replication target: %s.'), name)
|
||||
self._replication_targets.append(replication_target)
|
||||
self._target_names.append(name)
|
||||
|
||||
def _get_config_tuple(self, remote=None):
|
||||
if not remote:
|
||||
remote = self._active_config
|
||||
return (remote.get('name'), remote.get('conf'), remote.get('user'))
|
||||
|
||||
def check_for_setup_error(self):
|
||||
"""Returns an error if prerequisites aren't met."""
|
||||
if rados is None:
|
||||
@ -204,32 +274,41 @@ class RBDDriver(driver.TransferVD, driver.ExtendVD,
|
||||
|
||||
def _ceph_args(self):
|
||||
args = []
|
||||
if self.configuration.rbd_user:
|
||||
args.extend(['--id', self.configuration.rbd_user])
|
||||
if self.configuration.rbd_ceph_conf:
|
||||
args.extend(['--conf', self.configuration.rbd_ceph_conf])
|
||||
if self.configuration.rbd_cluster_name:
|
||||
args.extend(['--cluster', self.configuration.rbd_cluster_name])
|
||||
|
||||
name, conf, user = self._get_config_tuple()
|
||||
|
||||
if user:
|
||||
args.extend(['--id', user])
|
||||
if name:
|
||||
args.extend(['--cluster', name])
|
||||
if conf:
|
||||
args.extend(['--conf', conf])
|
||||
|
||||
return args
|
||||
|
||||
@utils.retry(exception.VolumeBackendAPIException,
|
||||
CONF.rados_connection_interval,
|
||||
CONF.rados_connection_retries)
|
||||
def _connect_to_rados(self, pool=None):
|
||||
LOG.debug("opening connection to ceph cluster (timeout=%s).",
|
||||
self.configuration.rados_connect_timeout)
|
||||
def _connect_to_rados(self, pool=None, remote=None, timeout=None):
|
||||
|
||||
name, conf, user = self._get_config_tuple(remote)
|
||||
|
||||
client = self.rados.Rados(
|
||||
rados_id=self.configuration.rbd_user,
|
||||
clustername=self.configuration.rbd_cluster_name,
|
||||
conffile=self.configuration.rbd_ceph_conf)
|
||||
if pool is not None:
|
||||
pool = utils.convert_str(pool)
|
||||
else:
|
||||
pool = self.configuration.rbd_pool
|
||||
|
||||
try:
|
||||
if timeout is None:
|
||||
timeout = self.configuration.rados_connect_timeout
|
||||
|
||||
LOG.debug("connecting to %(name)s (timeout=%(timeout)s).",
|
||||
{'name': name, 'timeout': timeout})
|
||||
|
||||
client = self.rados.Rados(rados_id=user,
|
||||
clustername=name,
|
||||
conffile=conf)
|
||||
|
||||
try:
|
||||
if timeout >= 0:
|
||||
timeout = six.text_type(timeout)
|
||||
client.conf_set('rados_osd_op_timeout', timeout)
|
||||
@ -314,6 +393,10 @@ class RBDDriver(driver.TransferVD, driver.ExtendVD,
|
||||
backend_name = self.configuration.safe_get('volume_backend_name')
|
||||
stats['volume_backend_name'] = backend_name or 'RBD'
|
||||
|
||||
stats['replication_enabled'] = self._is_replication_enabled
|
||||
if self._is_replication_enabled:
|
||||
stats['replication_targets'] = self._target_names
|
||||
|
||||
try:
|
||||
with RADOSClient(self) as client:
|
||||
ret, outbuf, _outs = client.cluster.mon_command(
|
||||
@ -443,7 +526,16 @@ class RBDDriver(driver.TransferVD, driver.ExtendVD,
|
||||
except Exception:
|
||||
src_volume.unprotect_snap(clone_snap)
|
||||
src_volume.remove_snap(clone_snap)
|
||||
src_volume.close()
|
||||
raise
|
||||
|
||||
try:
|
||||
volume_update = self._enable_replication_if_needed(volume)
|
||||
except Exception:
|
||||
self.RBDProxy().remove(client.ioctx, dest_name)
|
||||
err_msg = (_('Failed to enable image replication'))
|
||||
raise exception.ReplicationError(reason=err_msg,
|
||||
volume_id=volume.id)
|
||||
finally:
|
||||
src_volume.close()
|
||||
|
||||
@ -455,6 +547,35 @@ class RBDDriver(driver.TransferVD, driver.ExtendVD,
|
||||
self._resize(volume)
|
||||
|
||||
LOG.debug("clone created successfully")
|
||||
return volume_update
|
||||
|
||||
def _enable_replication(self, volume):
|
||||
"""Enable replication for a volume.
|
||||
|
||||
Returns required volume update.
|
||||
"""
|
||||
vol_name = utils.convert_str(volume.name)
|
||||
with RBDVolumeProxy(self, vol_name) as image:
|
||||
had_journaling = image.features() & self.rbd.RBD_FEATURE_JOURNALING
|
||||
if not had_journaling:
|
||||
image.update_features(self.rbd.RBD_FEATURE_JOURNALING, True)
|
||||
image.mirror_image_enable()
|
||||
|
||||
driver_data = self._dumps({'had_journaling': bool(had_journaling)})
|
||||
return {'replication_status': fields.ReplicationStatus.ENABLED,
|
||||
'replication_driver_data': driver_data}
|
||||
|
||||
def _is_replicated_type(self, volume_type):
|
||||
# We do a safe attribute get because volume_type could be None
|
||||
specs = getattr(volume_type, 'extra_specs', {})
|
||||
return specs.get(EXTRA_SPECS_REPL_ENABLED) == "<is> True"
|
||||
|
||||
def _enable_replication_if_needed(self, volume):
|
||||
if self._is_replicated_type(volume.volume_type):
|
||||
return self._enable_replication(volume)
|
||||
if self._is_replication_enabled:
|
||||
return {'replication_status': fields.ReplicationStatus.DISABLED}
|
||||
return None
|
||||
|
||||
def create_volume(self, volume):
|
||||
"""Creates a logical volume."""
|
||||
@ -469,15 +590,25 @@ class RBDDriver(driver.TransferVD, driver.ExtendVD,
|
||||
|
||||
chunk_size = self.configuration.rbd_store_chunk_size * units.Mi
|
||||
order = int(math.log(chunk_size, 2))
|
||||
vol_name = utils.convert_str(volume.name)
|
||||
|
||||
with RADOSClient(self) as client:
|
||||
self.RBDProxy().create(client.ioctx,
|
||||
utils.convert_str(volume.name),
|
||||
vol_name,
|
||||
size,
|
||||
order,
|
||||
old_format=False,
|
||||
features=client.features)
|
||||
|
||||
try:
|
||||
volume_update = self._enable_replication_if_needed(volume)
|
||||
except Exception:
|
||||
self.RBDProxy().remove(client.ioctx, vol_name)
|
||||
err_msg = (_('Failed to enable image replication'))
|
||||
raise exception.ReplicationError(reason=err_msg,
|
||||
volume_id=volume.id)
|
||||
return volume_update
|
||||
|
||||
def _flatten(self, pool, volume_name):
|
||||
LOG.debug('flattening %(pool)s/%(img)s',
|
||||
dict(pool=pool, img=volume_name))
|
||||
@ -491,6 +622,7 @@ class RBDDriver(driver.TransferVD, driver.ExtendVD,
|
||||
|
||||
chunk_size = self.configuration.rbd_store_chunk_size * units.Mi
|
||||
order = int(math.log(chunk_size, 2))
|
||||
vol_name = utils.convert_str(volume.name)
|
||||
|
||||
with RADOSClient(self, src_pool) as src_client:
|
||||
with RADOSClient(self) as dest_client:
|
||||
@ -498,10 +630,19 @@ class RBDDriver(driver.TransferVD, driver.ExtendVD,
|
||||
utils.convert_str(src_image),
|
||||
utils.convert_str(src_snap),
|
||||
dest_client.ioctx,
|
||||
utils.convert_str(volume.name),
|
||||
vol_name,
|
||||
features=src_client.features,
|
||||
order=order)
|
||||
|
||||
try:
|
||||
volume_update = self._enable_replication_if_needed(volume)
|
||||
except Exception:
|
||||
self.RBDProxy().remove(dest_client.ioctx, vol_name)
|
||||
err_msg = (_('Failed to enable image replication'))
|
||||
raise exception.ReplicationError(reason=err_msg,
|
||||
volume_id=volume.id)
|
||||
return volume_update or {}
|
||||
|
||||
def _resize(self, volume, **kwargs):
|
||||
size = kwargs.get('size', None)
|
||||
if not size:
|
||||
@ -512,12 +653,13 @@ class RBDDriver(driver.TransferVD, driver.ExtendVD,
|
||||
|
||||
def create_volume_from_snapshot(self, volume, snapshot):
|
||||
"""Creates a volume from a snapshot."""
|
||||
self._clone(volume, self.configuration.rbd_pool,
|
||||
snapshot.volume_name, snapshot.name)
|
||||
volume_update = self._clone(volume, self.configuration.rbd_pool,
|
||||
snapshot.volume_name, snapshot.name)
|
||||
if self.configuration.rbd_flatten_volume_from_snapshot:
|
||||
self._flatten(self.configuration.rbd_pool, volume.name)
|
||||
if int(volume.size):
|
||||
self._resize(volume)
|
||||
return volume_update
|
||||
|
||||
def _delete_backup_snaps(self, rbd_image):
|
||||
backup_snaps = self._get_backup_snaps(rbd_image)
|
||||
@ -719,16 +861,162 @@ class RBDDriver(driver.TransferVD, driver.ExtendVD,
|
||||
LOG.info(_LI("Snapshot %s does not exist in backend."),
|
||||
snap_name)
|
||||
|
||||
def retype(self, context, volume, new_type, diff, host):
|
||||
"""Retypes a volume, allow Qos and extra_specs change."""
|
||||
def _disable_replication(self, volume):
|
||||
"""Disable replication on the given volume."""
|
||||
vol_name = utils.convert_str(volume.name)
|
||||
with RBDVolumeProxy(self, vol_name) as image:
|
||||
image.mirror_image_disable(False)
|
||||
driver_data = json.loads(volume.replication_driver_data)
|
||||
# If we didn't have journaling enabled when we enabled replication
|
||||
# we must remove journaling since it we added it for the
|
||||
# replication
|
||||
if not driver_data['had_journaling']:
|
||||
image.update_features(self.rbd.RBD_FEATURE_JOURNALING, False)
|
||||
return {'replication_status': fields.ReplicationStatus.DISABLED,
|
||||
'replication_driver_data': None}
|
||||
|
||||
# No need to check encryption, extra_specs and Qos here as:
|
||||
# encryptions have been checked as same.
|
||||
# extra_specs are not used in the driver.
|
||||
# Qos settings are not used in the driver.
|
||||
LOG.debug('RBD retype called for volume %s. No action '
|
||||
'required for RBD volumes.', volume.id)
|
||||
return True
|
||||
def retype(self, context, volume, new_type, diff, host):
|
||||
"""Retype from one volume type to another on the same backend."""
|
||||
old_vol_replicated = self._is_replicated_type(volume.volume_type)
|
||||
new_vol_replicated = self._is_replicated_type(new_type)
|
||||
|
||||
if old_vol_replicated and not new_vol_replicated:
|
||||
try:
|
||||
return True, self._disable_replication(volume)
|
||||
except Exception:
|
||||
err_msg = (_('Failed to disable image replication'))
|
||||
raise exception.ReplicationError(reason=err_msg,
|
||||
volume_id=volume.id)
|
||||
elif not old_vol_replicated and new_vol_replicated:
|
||||
try:
|
||||
return True, self._enable_replication(volume)
|
||||
except Exception:
|
||||
err_msg = (_('Failed to enable image replication'))
|
||||
raise exception.ReplicationError(reason=err_msg,
|
||||
volume_id=volume.id)
|
||||
|
||||
if not new_vol_replicated and self._is_replication_enabled:
|
||||
update = {'replication_status': fields.ReplicationStatus.DISABLED}
|
||||
else:
|
||||
update = None
|
||||
return True, update
|
||||
|
||||
def _dumps(self, obj):
|
||||
return json.dumps(obj, separators=(',', ':'))
|
||||
|
||||
def _exec_on_volume(self, volume_name, remote, operation, *args, **kwargs):
|
||||
@utils.retry(rbd.ImageBusy,
|
||||
CONF.rados_connection_interval,
|
||||
CONF.rados_connection_retries)
|
||||
def _do_exec():
|
||||
timeout = self.configuration.replication_connect_timeout
|
||||
with RBDVolumeProxy(self, volume_name, self.configuration.rbd_pool,
|
||||
remote=remote, timeout=timeout) as rbd_image:
|
||||
return getattr(rbd_image, operation)(*args, **kwargs)
|
||||
return _do_exec()
|
||||
|
||||
def _failover_volume(self, volume, remote, is_demoted, replication_status):
|
||||
"""Process failover for a volume.
|
||||
|
||||
There are 3 different cases that will return different update values
|
||||
for the volume:
|
||||
|
||||
- Volume has replication enabled and failover succeeded: Set
|
||||
replication status to failed-over.
|
||||
- Volume has replication enabled and failover fails: Set status to
|
||||
error, replication status to failover-error, and store previous
|
||||
status in previous_status field.
|
||||
- Volume replication is disabled: Set status to error, and store
|
||||
status in previous_status field.
|
||||
"""
|
||||
# Failover is allowed when volume has it enabled or it has already
|
||||
# failed over, because we may want to do a second failover.
|
||||
if self._is_replicated_type(volume.volume_type):
|
||||
vol_name = utils.convert_str(volume.name)
|
||||
try:
|
||||
self._exec_on_volume(vol_name, remote,
|
||||
'mirror_image_promote', not is_demoted)
|
||||
|
||||
return {'volume_id': volume.id,
|
||||
'updates': {'replication_status': replication_status}}
|
||||
except Exception as e:
|
||||
replication_status = fields.ReplicationStatus.FAILOVER_ERROR
|
||||
LOG.error(_LE('Failed to failover volume %(volume)s with '
|
||||
'error: %(error)s.'),
|
||||
{'volume': volume.name, 'error': e})
|
||||
else:
|
||||
replication_status = fields.ReplicationStatus.NOT_CAPABLE
|
||||
LOG.debug('Skipping failover for non replicated volume '
|
||||
'%(volume)s with status: %(status)s',
|
||||
{'volume': volume.name, 'status': volume.status})
|
||||
|
||||
# Failover did not happen
|
||||
error_result = {
|
||||
'volume_id': volume.id,
|
||||
'updates': {
|
||||
'status': 'error',
|
||||
'previous_status': volume.status,
|
||||
'replication_status': replication_status
|
||||
}
|
||||
}
|
||||
|
||||
return error_result
|
||||
|
||||
def _demote_volumes(self, volumes, until_failure=True):
|
||||
"""Try to demote volumes on the current primary cluster."""
|
||||
result = []
|
||||
try_demoting = True
|
||||
for volume in volumes:
|
||||
demoted = False
|
||||
if try_demoting and self._is_replicated_type(volume.volume_type):
|
||||
vol_name = utils.convert_str(volume.name)
|
||||
try:
|
||||
self._exec_on_volume(vol_name, self._active_config,
|
||||
'mirror_image_demote')
|
||||
demoted = True
|
||||
except Exception as e:
|
||||
LOG.debug('Failed to demote %(volume)s with error: '
|
||||
'%(error)s.',
|
||||
{'volume': volume.name, 'error': e})
|
||||
try_demoting = not until_failure
|
||||
result.append(demoted)
|
||||
return result
|
||||
|
||||
def _get_failover_target_config(self, secondary_id=None):
|
||||
if not secondary_id:
|
||||
# In auto mode exclude failback and active
|
||||
candidates = set(self._target_names).difference(
|
||||
('default', self._active_backend_id))
|
||||
if not candidates:
|
||||
raise exception.InvalidReplicationTarget(
|
||||
reason=_('RBD: No available failover target host.'))
|
||||
secondary_id = candidates.pop()
|
||||
return secondary_id, self._get_target_config(secondary_id)
|
||||
|
||||
def failover_host(self, context, volumes, secondary_id=None):
|
||||
"""Failover to replication target."""
|
||||
LOG.info(_LI('RBD driver failover started.'))
|
||||
if not self._is_replication_enabled:
|
||||
raise exception.UnableToFailOver(
|
||||
reason=_('RBD: Replication is not enabled.'))
|
||||
|
||||
if secondary_id == 'default':
|
||||
replication_status = fields.ReplicationStatus.ENABLED
|
||||
else:
|
||||
replication_status = fields.ReplicationStatus.FAILED_OVER
|
||||
|
||||
secondary_id, remote = self._get_failover_target_config(secondary_id)
|
||||
|
||||
# Try to demote the volumes first
|
||||
demotion_results = self._demote_volumes(volumes)
|
||||
# Do the failover taking into consideration if they have been demoted
|
||||
updates = [self._failover_volume(volume, remote, is_demoted,
|
||||
replication_status)
|
||||
for volume, is_demoted in zip(volumes, demotion_results)]
|
||||
self._active_backend_id = secondary_id
|
||||
self._active_config = remote
|
||||
LOG.info(_LI('RBD driver failover completed.'))
|
||||
return secondary_id, updates
|
||||
|
||||
def ensure_export(self, context, volume):
|
||||
"""Synchronously recreates an export for a logical volume."""
|
||||
@ -834,9 +1122,10 @@ class RBDDriver(driver.TransferVD, driver.ExtendVD,
|
||||
url_location, image_meta):
|
||||
_prefix, pool, image, snapshot = \
|
||||
self._parse_location(url_location)
|
||||
self._clone(volume, pool, image, snapshot)
|
||||
volume_update = self._clone(volume, pool, image, snapshot)
|
||||
volume_update['provider_location'] = None
|
||||
self._resize(volume)
|
||||
return {'provider_location': None}, True
|
||||
return volume_update, True
|
||||
return ({}, False)
|
||||
|
||||
def _image_conversion_dir(self):
|
||||
|
@ -0,0 +1,3 @@
|
||||
---
|
||||
features:
|
||||
- Added v2.1 replication support to RBD driver.
|
Loading…
Reference in New Issue
Block a user