Update Pure replication to cheesecake

Migration to cheesecake replication. This basically
modifies any of the existing replication code to work on the whole
backend instead of the specified volume.

It also completes the transition to allow swapping the underlying array
being managed by cinder upon failover. In the v2 implementation it was
conditional on some operations where things *could* be on the other
array. Now it is all driver operations.

This also switches the target_device_id to backend_id in the config for
replication devices.

Change-Id: I7f2580f4d764266b62a755cb4529c290e639984c
Closes-Bug: #1542100
This commit is contained in:
Patrick East 2016-02-05 15:52:31 -08:00
parent bb8b6ff100
commit a9e32691c2
2 changed files with 504 additions and 725 deletions

View File

@ -283,29 +283,35 @@ REPLICATED_PGSNAPS = [
"progress": 1.0,
"data_transferred": 318
}]
REPLICATED_VOLUME_OBJS = [
fake_volume.fake_volume_obj(None, id='repl-1'),
fake_volume.fake_volume_obj(None, id='repl-2'),
fake_volume.fake_volume_obj(None, id='repl-3'),
]
REPLICATED_VOLUME_SNAPS = [
{
"source": "array1:replicated_volume1",
"source": "array1:volume-repl-1-cinder",
"serial": "BBA481C01639104E0001D5F7",
"created": "2014-12-04T22:59:38Z",
"name": "array1:cinder-repl-pg.2.replicated_volume1",
"name": "array1:cinder-repl-pg.2.volume-repl-1-cinder",
"size": 1048576
},
{
"source": "array1:replicated_volume2",
"source": "array1:volume-repl-2-cinder",
"serial": "BBA481C01639104E0001D5F8",
"created": "2014-12-04T22:59:38Z",
"name": "array1:cinder-repl-pg.2.replicated_volume2",
"name": "array1:cinder-repl-pg.2.volume-repl-2-cinder",
"size": 1048576
},
{
"source": "array1:replicated_volume3",
"source": "array1:volume-repl-3-cinder",
"serial": "BBA481C01639104E0001D5F9",
"created": "2014-12-04T22:59:38Z",
"name": "array1:PureSRAEndToEndPGroup1.2.replicated_volume3",
"name": "array1:cinder-repl-pg.2.volume-repl-3-cinder",
"size": 1048576
}
]
NON_REPLICATED_VOL_TYPE = {"is_public": True,
"extra_specs": {},
"name": "volume_type_1",
@ -401,7 +407,7 @@ class PureBaseVolumeDriverTestCase(PureBaseSharedDriverTestCase):
self.mock_config.pure_replica_retention_long_term_default = (
REPLICATION_RETENTION_LONG_TERM_PER_DAY)
self.mock_config.safe_get.return_value = [
{"target_device_id": self.driver._array.array_id,
{"backend_id": self.driver._array.array_id,
"managed_backend_name": None,
"san_ip": "1.2.3.4",
"api_token": "abc123"}]
@ -418,7 +424,7 @@ class PureBaseVolumeDriverTestCase(PureBaseSharedDriverTestCase):
# Test single array configured
self.mock_config.safe_get.return_value = [
{"target_device_id": self.driver._array.id,
{"backend_id": self.driver._array.id,
"managed_backend_name": None,
"san_ip": "1.2.3.4",
"api_token": "abc123"}]
@ -428,7 +434,7 @@ class PureBaseVolumeDriverTestCase(PureBaseSharedDriverTestCase):
self.assertEqual(self.array, self.driver._replication_target_arrays[0])
only_target_array = self.driver._replication_target_arrays[0]
self.assertEqual(self.driver._array.id,
only_target_array._target_device_id)
only_target_array._backend_id)
@mock.patch(BASE_DRIVER_OBJ + '._generate_replication_retention')
@mock.patch(BASE_DRIVER_OBJ + '._setup_replicated_pgroups')
@ -443,11 +449,11 @@ class PureBaseVolumeDriverTestCase(PureBaseSharedDriverTestCase):
# Test multiple arrays configured
self.mock_config.safe_get.return_value = [
{"target_device_id": GET_ARRAY_PRIMARY["id"],
{"backend_id": GET_ARRAY_PRIMARY["id"],
"managed_backend_name": None,
"san_ip": "1.2.3.4",
"api_token": "abc123"},
{"target_device_id": GET_ARRAY_SECONDARY["id"],
{"backend_id": GET_ARRAY_SECONDARY["id"],
"managed_backend_name": None,
"san_ip": "1.2.3.5",
"api_token": "abc124"}]
@ -458,12 +464,12 @@ class PureBaseVolumeDriverTestCase(PureBaseSharedDriverTestCase):
self.assertEqual(self.array, self.driver._replication_target_arrays[0])
first_target_array = self.driver._replication_target_arrays[0]
self.assertEqual(GET_ARRAY_PRIMARY["id"],
first_target_array._target_device_id)
first_target_array._backend_id)
self.assertEqual(
self.array2, self.driver._replication_target_arrays[1])
second_target_array = self.driver._replication_target_arrays[1]
self.assertEqual(GET_ARRAY_SECONDARY["id"],
second_target_array._target_device_id)
second_target_array._backend_id)
@mock.patch(BASE_DRIVER_OBJ + '._generate_replication_retention')
@mock.patch(BASE_DRIVER_OBJ + '._setup_replicated_pgroups')
@ -1143,8 +1149,7 @@ class PureBaseVolumeDriverTestCase(PureBaseSharedDriverTestCase):
expected_snapshot_update = [{
'id': mock_snap.id,
'status': 'available',
'provider_location': self.array.array_id
'status': 'available'
}]
self.assertEqual(expected_snapshot_update, snapshots)
@ -1526,8 +1531,8 @@ class PureBaseVolumeDriverTestCase(PureBaseSharedDriverTestCase):
def test_retype_non_repl_to_non_repl(self, mock_is_replicated_type):
self._test_retype_repl(mock_is_replicated_type, False, False)
@mock.patch(BASE_DRIVER_OBJ + '.replication_enable')
@mock.patch(BASE_DRIVER_OBJ + '.replication_disable')
@mock.patch(BASE_DRIVER_OBJ + '._enable_replication')
@mock.patch(BASE_DRIVER_OBJ + '._disable_replication')
@mock.patch(BASE_DRIVER_OBJ + '._is_volume_replicated_type', autospec=True)
def test_retype_non_repl_to_repl(self,
mock_is_replicated_type,
@ -1538,10 +1543,10 @@ class PureBaseVolumeDriverTestCase(PureBaseSharedDriverTestCase):
False,
True)
self.assertFalse(mock_replication_disable.called)
mock_replication_enable.assert_called_with(context, volume)
mock_replication_enable.assert_called_with(volume)
@mock.patch(BASE_DRIVER_OBJ + '.replication_enable')
@mock.patch(BASE_DRIVER_OBJ + '.replication_disable')
@mock.patch(BASE_DRIVER_OBJ + '._enable_replication')
@mock.patch(BASE_DRIVER_OBJ + '._disable_replication')
@mock.patch(BASE_DRIVER_OBJ + '._is_volume_replicated_type', autospec=True)
def test_retype_repl_to_non_repl(self,
mock_is_replicated_type,
@ -1551,50 +1556,7 @@ class PureBaseVolumeDriverTestCase(PureBaseSharedDriverTestCase):
True,
False)
self.assertFalse(mock_replication_enable.called)
mock_replication_disable.assert_called_with(context, volume)
def test_get_arrays_single_array_found(self):
volume = deepcopy(VOLUME)
# Test case where only single array, it's the one we're looking for
volume["provider_location"] = GET_ARRAY_PRIMARY["id"]
rslt_primary, rslt_secondary = self.driver._get_arrays(volume)
self.assertEqual(self.array, rslt_primary)
self.assertEqual(0, len(rslt_secondary))
def test_get_arrays_single_array_not_found(self):
volume = deepcopy(VOLUME)
# Test case where only single array, it's not the one we're looking for
volume["provider_location"] = "won't find me"
rslt_primary, rslt_secondary = self.driver._get_arrays(volume)
self.assertEqual(self.array, rslt_primary)
self.assertEqual(0, len(rslt_secondary))
def test_get_arrays_no_primary_configured(self):
volume = deepcopy(VOLUME)
# Test case where no primary is configured,
del volume["provider_location"]
rslt_primary, rslt_secondary = self.driver._get_arrays(volume)
self.assertEqual(self.array, rslt_primary)
self.assertEqual(0, len(rslt_secondary))
def test_get_arrays_after_failover(self):
volume = deepcopy(VOLUME)
# Test case where 2 arrays, volume is failed over to secondary
volume["provider_location"] = GET_ARRAY_SECONDARY["id"]
self.array.array_name = GET_ARRAY_PRIMARY["array_name"]
self.driver._replication_target_arrays = [self.array2]
rslt_primary, rslt_secondary = self.driver._get_arrays(volume)
self.assertEqual(self.array2, rslt_primary)
self.assertListEqual([self.array], rslt_secondary)
def test_get_arrays_primary_configured(self):
volume = deepcopy(VOLUME)
# Test case where 2 arrays, provider_location is primary
volume["provider_location"] = GET_ARRAY_PRIMARY["id"]
self.driver._replication_target_arrays = [self.array2]
rslt_primary, rslt_secondary = self.driver._get_arrays(volume)
self.assertEqual(self.array, rslt_primary)
self.assertListEqual([self.array2], rslt_secondary)
mock_replication_disable.assert_called_with(volume)
@mock.patch('cinder.volume.volume_types.get_volume_type')
def test_is_vol_replicated_no_extra_specs(self, mock_get_vol_type):
@ -1683,412 +1645,245 @@ class PureBaseVolumeDriverTestCase(PureBaseSharedDriverTestCase):
self.driver._replication_target_arrays = [mock.Mock()]
self.driver._replication_target_arrays[0].array_name = (
GET_ARRAY_SECONDARY["array_name"])
model_update = self.driver.create_volume(VOLUME)
self.assertEqual(
"enabled", model_update["replication_status"],
"create_volume should return valid replication_status")
self.assertEqual(GET_ARRAY_PRIMARY["id"],
model_update["provider_location"])
self.driver.create_volume(VOLUME)
self.array.create_volume.assert_called_with(
VOLUME["name"] + "-cinder", 2 * units.Gi)
self.array.set_pgroup.assert_called_with(
REPLICATION_PROTECTION_GROUP,
addvollist=[VOLUME["name"] + "-cinder"])
self.assert_error_propagates([self.array.create_volume],
self.driver.create_volume, VOLUME)
@mock.patch(BASE_DRIVER_OBJ + "._get_flasharray")
def test_get_latest_replicated_vol_snap(self, mock_get_flasharray):
mock_get_flasharray.return_value = self.array
replicated_vol_snaps_return_0 = deepcopy(REPLICATED_VOLUME_SNAPS)
for snap in replicated_vol_snaps_return_0:
snap["source"] = "not_the_volume_we_want"
replicated_vol_snaps_return_1 = deepcopy(REPLICATED_VOLUME_SNAPS)
self.array.get_pgroup.return_value = REPLICATED_PGSNAPS
self.array.get_volume = mock.Mock()
self.array.get_volume.side_effect = [
replicated_vol_snaps_return_0,
replicated_vol_snaps_return_1]
source_array_name = "array1"
replicated_pg_name = "cinder-repl-pg"
replicated_vol_name = "replicated_volume2"
result = self.driver._get_latest_replicated_vol_snap(
self.array,
source_array_name,
replicated_pg_name,
replicated_vol_name)
expected = {
"source": "array1:replicated_volume2",
"serial": "BBA481C01639104E0001D5F8",
"created": "2014-12-04T22:59:38Z",
"name": "array1:cinder-repl-pg.2.replicated_volume2",
"size": 1048576
}
self.assertEqual(expected, result)
# Test when no replicated PG snapshots available
self.array.get_pgroup.return_value = []
result = self.driver._get_latest_replicated_vol_snap(
self.array,
source_array_name,
replicated_pg_name,
replicated_vol_name)
self.assertIsNone(result)
# Test when no replicated PG snapshot contains the volume we want
self.array.get_pgroup.return_value = REPLICATED_PGSNAPS
self.array.get_volume.side_effect = None
self.array.get_volume.return_value = REPLICATED_VOLUME_SNAPS
result = self.driver._get_latest_replicated_vol_snap(
self.array,
source_array_name,
replicated_pg_name,
"missing_volume")
self.assertIsNone(result)
@mock.patch('cinder.volume.volume_types.get_volume_type')
def test_create_cloned_volume_failed_over(self, mock_get_volume_type):
# Tests cloning a volume that is failed over to secondary
mock_get_volume_type.return_value = REPLICATED_VOL_TYPE
vol_name = VOLUME["name"] + "-cinder"
src_name = SRC_VOL["name"] + "-cinder"
volume = deepcopy(VOLUME)
del volume["provider_location"]
src_vol = deepcopy(SRC_VOL)
src_vol["provider_location"] = GET_ARRAY_SECONDARY["id"]
src_vol["replication_status"] = "enabled"
self.driver._array.array_name = GET_ARRAY_PRIMARY["array_name"]
self.driver._array.array_id = GET_ARRAY_PRIMARY["id"]
self.driver._replication_target_arrays = [self.array2]
rslt = self.driver.create_cloned_volume(volume, src_vol)
self.assertFalse(self.array.copy_volume.called)
self.array2.copy_volume.assert_called_with(src_name, vol_name)
self.assertEqual("enabled", rslt["replication_status"])
self.assertEqual(GET_ARRAY_SECONDARY["id"], rslt["provider_location"])
self.assertFalse(self.array.extend_volume.called)
self.assertFalse(self.array2.extend_volume.called)
self.assert_error_propagates(
[self.array2.copy_volume],
self.driver.create_cloned_volume, volume, src_vol)
self.assertTrue(self.array2.set_pgroup.called)
self.assertFalse(self.array.set_pgroup.called)
@mock.patch(BASE_DRIVER_OBJ + "._add_and_replicate_if_needed")
@mock.patch(BASE_DRIVER_OBJ + "._get_arrays")
@mock.patch('cinder.volume.volume_types.get_volume_type')
def test_replication_enable(self, mock_get_volume_type,
mock_get_arrays,
mock_add_vol_to_pg):
mock_get_volume_type.return_value = REPLICATED_VOL_TYPE
mock_get_arrays.return_value = (self.array, self.array2)
volume = deepcopy(VOLUME)
# Test volume is added to replication PG
rslt = self.driver.replication_enable(None, volume)
mock_add_vol_to_pg.assert_called_with(self.array, volume)
# Verify model_update shows replication is re-enabled
self.assertEqual("enabled", rslt["replication_status"])
@mock.patch('cinder.volume.volume_types.get_volume_type')
@mock.patch(BASE_DRIVER_OBJ + "._get_latest_replicated_vol_snap",
autospec=True)
def test_failover_src_volume_has_no_repl_snapshot(
self,
mock_get_latest_replicated_vol_snap,
mock_get_volume_type):
# Test case when target volume doesn't have a replicated snapshot. this
# can happen if replication is still propagating the first snapshot
mock_get_volume_type.return_value = REPLICATED_VOL_TYPE
src_vol = deepcopy(VOLUME)
target_array = "dummy_target_device_id"
src_vol["replication_status"] = "enabled"
src_vol["volume_type_id"] = REPLICATED_VOL_TYPE["id"]
self.driver._replication_target_array = mock.Mock()
mock_get_latest_replicated_vol_snap.return_value = None
context = mock.MagicMock()
def test_find_failover_target_no_repl_targets(self):
self.driver._replication_target_arrays = []
self.assertRaises(exception.PureDriverException,
self.driver.replication_failover,
context,
src_vol,
target_array)
self.driver._find_failover_target,
None)
@mock.patch(BASE_DRIVER_OBJ + "._get_latest_replicated_vol_snap",
autospec=True)
@mock.patch('cinder.volume.volume_types.get_volume_type')
def test_failover(self, mock_get_volume_type,
mock_get_latest_replicated_vol_snap):
# Test case when replication is set up correctly, expect call
# to copy_volume
src_vol = deepcopy(VOLUME)
src_vol["replication_status"] = "enabled"
src_vol["volume_type_id"] = REPLICATED_VOL_TYPE["id"]
self._setup_mocks_for_replication()
mock_get_volume_type.return_value = REPLICATED_VOL_TYPE
self.array2._target_device_id = self.array2.array_id
self.driver._replication_target_arrays = [self.array2]
# Assume array_name is also the san_ip
self.array2._target = GET_ARRAY_SECONDARY["array_name"]
self.driver._array._target = GET_ARRAY_PRIMARY["array_name"]
mock_get_latest_replicated_vol_snap.return_value =\
REPLICATED_VOLUME_SNAPS[0]
target_array = self.driver._replication_target_arrays[0]
target_array.copy_volume = mock.Mock()
context = mock.MagicMock()
self.driver.replication_failover(context, src_vol,
self.array2._target_device_id)
target_array.copy_volume.assert_called_with(
REPLICATED_VOLUME_SNAPS[0]["name"],
src_vol["name"] + "-cinder",
overwrite=True
)
@mock.patch(BASE_DRIVER_OBJ + '._get_latest_replicated_pg_snap')
def test_find_failover_target_secondary_specified(self, mock_get_snap):
mock_backend_1 = mock.Mock()
mock_backend_2 = mock.Mock()
secondary_id = 'foo'
mock_backend_2._backend_id = secondary_id
self.driver._replication_target_arrays = [mock_backend_1,
mock_backend_2]
mock_get_snap.return_value = REPLICATED_PGSNAPS[0]
array, pg_snap = self.driver._find_failover_target(secondary_id)
self.assertEqual(mock_backend_2, array)
self.assertEqual(REPLICATED_PGSNAPS[0], pg_snap)
def test_find_failover_target_secondary_specified_not_found(self):
mock_backend = mock.Mock()
mock_backend._backend_id = 'not_foo'
self.driver._replication_target_arrays = [mock_backend]
self.assertRaises(exception.InvalidReplicationTarget,
self.driver._find_failover_target,
'foo')
@mock.patch(BASE_DRIVER_OBJ + '._get_latest_replicated_pg_snap')
def test_find_failover_target_secondary_specified_no_pgsnap(self,
mock_get_snap):
mock_backend = mock.Mock()
secondary_id = 'foo'
mock_backend._backend_id = secondary_id
self.driver._replication_target_arrays = [mock_backend]
mock_get_snap.return_value = None
@mock.patch('cinder.volume.volume_types.get_volume_type')
def test_failover_no_replicated_array_configured(self,
mock_get_volume_type):
# verify exception if no replication target is configured
mock_get_volume_type.return_value = REPLICATED_VOL_TYPE
self.driver._replication_target_arrays = None
context = mock.Mock()
src_vol = VOLUME
target_array = "dummy_target_device_id"
self.assertRaises(exception.PureDriverException,
self.driver.replication_failover,
context,
src_vol,
target_array)
self.driver._find_failover_target,
secondary_id)
@mock.patch(BASE_DRIVER_OBJ + "._get_latest_replicated_vol_snap",
autospec=True)
@mock.patch('cinder.volume.volume_types.get_volume_type')
def test_failover_invalid_replicated_array_specified(
self,
mock_get_volume_type,
mock_get_latest_replicated_vol_snap):
src_vol = deepcopy(VOLUME)
src_vol["replication_status"] = "enabled"
src_vol["volume_type_id"] = REPLICATED_VOL_TYPE["id"]
mock_get_volume_type.return_value = REPLICATED_VOL_TYPE
self.array2._target_device_id = self.array2.array_id
self.driver._replication_target_arrays = [self.array2]
# Assume array_name is also the san_ip
self.array2._target = GET_ARRAY_SECONDARY["array_name"]
self.driver._array._target = GET_ARRAY_PRIMARY["array_name"]
# we should not attempt the operation to find snapshot on secondary
assert not mock_get_latest_replicated_vol_snap.called
@mock.patch(BASE_DRIVER_OBJ + '._get_latest_replicated_pg_snap')
def test_find_failover_target_no_secondary_specified(self,
mock_get_snap):
mock_backend_1 = mock.Mock()
mock_backend_2 = mock.Mock()
self.driver._replication_target_arrays = [mock_backend_1,
mock_backend_2]
mock_get_snap.return_value = REPLICATED_PGSNAPS[0]
@mock.patch('cinder.volume.volume_types.get_volume_type')
@mock.patch(BASE_DRIVER_OBJ + '._get_current_array')
def test_replication_disable(self, mock_get_array, mock_get_volume_type):
mock_get_volume_type.return_value = REPLICATED_VOL_TYPE
mock_get_array.return_value = self.array
context = mock.MagicMock()
volume = VOLUME
array, pg_snap = self.driver._find_failover_target(None)
self.assertEqual(mock_backend_1, array)
self.assertEqual(REPLICATED_PGSNAPS[0], pg_snap)
model_update = self.driver.replication_disable(context, volume)
@mock.patch(BASE_DRIVER_OBJ + '._get_latest_replicated_pg_snap')
def test_find_failover_target_no_secondary_specified_missing_pgsnap(
self, mock_get_snap):
mock_backend_1 = mock.Mock()
mock_backend_2 = mock.Mock()
self.driver._replication_target_arrays = [mock_backend_1,
mock_backend_2]
mock_get_snap.side_effect = [None, REPLICATED_PGSNAPS[0]]
self.assertDictMatch({'replication_status': 'disabled'}, model_update)
self.array.set_pgroup.assert_called_with(
self.driver._replication_pg_name,
remvollist=['volume-' + volume['id'] + '-cinder']
)
array, pg_snap = self.driver._find_failover_target(None)
self.assertEqual(mock_backend_2, array)
self.assertEqual(REPLICATED_PGSNAPS[0], pg_snap)
@mock.patch('cinder.volume.volume_types.get_volume_type')
@mock.patch(BASE_DRIVER_OBJ + '._get_current_array')
def test_replication_disable_error_propagates(self,
mock_get_array,
mock_get_volume_type):
mock_get_volume_type.return_value = REPLICATED_VOL_TYPE
mock_get_array.return_value = self.array
context = mock.MagicMock()
volume = VOLUME
@mock.patch(BASE_DRIVER_OBJ + '._get_latest_replicated_pg_snap')
def test_find_failover_target_no_secondary_specified_no_pgsnap(
self, mock_get_snap):
mock_backend = mock.Mock()
self.driver._replication_target_arrays = [mock_backend]
mock_get_snap.return_value = None
self.assertRaises(exception.PureDriverException,
self.driver._find_failover_target,
None)
@mock.patch(BASE_DRIVER_OBJ + '._get_latest_replicated_pg_snap')
def test_find_failover_target_error_propagates_secondary_specified(
self, mock_get_snap):
mock_backend = mock.Mock()
mock_backend._backend_id = 'foo'
self.driver._replication_target_arrays = [mock_backend]
self.assert_error_propagates(
[mock_get_array, self.array.set_pgroup],
self.driver.replication_disable,
context, volume
[mock_get_snap],
self.driver._find_failover_target,
'foo'
)
@mock.patch(BASE_DRIVER_OBJ + '._get_latest_replicated_pg_snap')
def test_find_failover_target_error_propagates_no_secondary(
self, mock_get_snap):
self.driver._replication_target_arrays = [mock.Mock()]
self.assert_error_propagates(
[mock_get_snap],
self.driver._find_failover_target,
None
)
@mock.patch('cinder.volume.volume_types.get_volume_type')
@mock.patch(BASE_DRIVER_OBJ + '._get_current_array')
def test_replication_disable_already_disabled(self,
mock_get_array,
mock_get_volume_type):
mock_get_volume_type.return_value = REPLICATED_VOL_TYPE
mock_get_array.return_value = self.array
context = mock.MagicMock()
volume = VOLUME
self.array.set_pgroup.side_effect = FakePureStorageHTTPError(
code=400, text='could not be found')
model_update = self.driver.replication_disable(context, volume)
self.assertDictMatch({'replication_status': 'disabled'}, model_update)
self.array.set_pgroup.assert_called_with(
self.driver._replication_pg_name,
remvollist=['volume-' + volume["id"] + '-cinder']
)
@mock.patch('cinder.volume.volume_types.get_volume_type')
def test_list_replication_targets(self, mock_get_volume_type):
mock_get_volume_type.return_value = REPLICATED_VOL_TYPE
mock_backend_1 = mock.MagicMock()
mock_backend_1.array_name = "cinder-backend-1"
mock_backend_1._target_device_id = "cinder-backend-1-id"
mock_backend_2 = mock.MagicMock()
mock_backend_2.array_name = "cinder-backend-2"
mock_backend_2._target_device_id = "cinder-backend-2-id"
self.driver._replication_target_arrays = [
mock_backend_1,
mock_backend_2
]
context = mock.MagicMock()
vref = VOLUME
self.array.get_pgroup.return_value = {
"hgroups": None,
"hosts": None,
"name": "pg2",
"source": "pure01",
"targets": [
{
"name": mock_backend_1.array_name,
"allowed": True
},
{
"name": mock_backend_2.array_name,
"allowed": False
},
],
"time_remaining": 86395,
"volumes": [
"volume-" + vref['id'] + "-cinder",
"volume-123456-cinder"
]
}
data = self.driver.list_replication_targets(context, vref)
expected_data = {
'volume_id': vref['id'],
'targets': [
{
'target_device_id': mock_backend_1._target_device_id
},
{
'target_device_id': mock_backend_2._target_device_id
}
]
}
self.assertDictMatch(expected_data, data)
@mock.patch('cinder.volume.volume_types.get_volume_type')
def test_list_replication_targets_error_propagates(self,
mock_get_volume_type):
mock_get_volume_type.return_value = REPLICATED_VOL_TYPE
self.array.get_pgroup = mock.MagicMock()
self.assert_error_propagates([self.array.get_pgroup],
self.driver.list_replication_targets,
mock.Mock(),
VOLUME)
@mock.patch('cinder.volume.volume_types.get_volume_type')
def test_list_replication_targets_no_targets(self, mock_get_volume_type):
mock_get_volume_type.return_value = REPLICATED_VOL_TYPE
mock_backend_1 = mock.MagicMock()
mock_backend_1.array_name = "cinder-backend-1"
mock_backend_1._target_device_id = "cinder-backend-1-id"
mock_backend_2 = mock.MagicMock()
mock_backend_2.array_name = "cinder-backend-2"
mock_backend_2._target_device_id = "cinder-backend-2-id"
self.driver._replication_target_arrays = [
mock_backend_1,
mock_backend_2
]
context = mock.MagicMock()
vref = VOLUME
self.array.get_pgroup.return_value = {
"hgroups": None,
"hosts": None,
"name": "pg2",
"source": "pure01",
"targets": None,
"time_remaining": 86395,
"volumes": [
"volume-" + vref['id'] + "-cinder",
"volume-123456-cinder"
]
}
data = self.driver.list_replication_targets(context, vref)
expected_data = {
'volume_id': vref['id'],
'targets': []
}
self.assertDictMatch(expected_data, data)
@mock.patch('cinder.volume.volume_types.get_volume_type')
def test_list_replication_targets_no_volumes(self, mock_get_volume_type):
mock_get_volume_type.return_value = REPLICATED_VOL_TYPE
mock_backend_1 = mock.MagicMock()
mock_backend_1.array_name = "cinder-backend-1"
mock_backend_1._target_device_id = "cinder-backend-1-id"
mock_backend_2 = mock.MagicMock()
mock_backend_2.array_name = "cinder-backend-2"
mock_backend_2._target_device_id = "cinder-backend-2-id"
self.driver._replication_target_arrays = [
mock_backend_1,
mock_backend_2
]
context = mock.MagicMock()
vref = VOLUME
self.array.get_pgroup.return_value = {
"hgroups": None,
"hosts": None,
"name": "pg2",
"source": "pure01",
"targets": None,
"time_remaining": 86395,
"volumes": None
}
data = self.driver.list_replication_targets(context, vref)
expected_data = {
'volume_id': vref['id'],
'targets': []
}
self.assertDictMatch(expected_data, data)
@mock.patch('cinder.volume.volume_types.get_volume_type')
def test_list_replication_targets_no_secondary_configured(
def test_enable_replication_if_needed_success(
self, mock_get_volume_type):
mock_get_volume_type.return_value = REPLICATED_VOL_TYPE
self.driver._replication_target_arrays = []
self.driver._enable_replication_if_needed(self.array, VOLUME)
self.array.set_pgroup.assert_called_with(
self.driver._replication_pg_name,
addvollist=[VOLUME_PURITY_NAME]
)
@mock.patch('cinder.volume.volume_types.get_volume_type')
def test_enable_replication_if_needed_not_repl_type(
self, mock_get_volume_type):
mock_get_volume_type.return_value = NON_REPLICATED_VOL_TYPE
self.driver._enable_replication_if_needed(self.array, VOLUME)
self.assertFalse(self.array.set_pgroup.called)
@mock.patch('cinder.volume.volume_types.get_volume_type')
def test_enable_replication_if_needed_already_repl(
self, mock_get_volume_type):
mock_get_volume_type.return_value = REPLICATED_VOL_TYPE
self.array.set_pgroup.side_effect = FakePureStorageHTTPError(
code=400, text='already belongs to')
self.driver._enable_replication_if_needed(self.array, VOLUME)
self.array.set_pgroup.assert_called_with(
self.driver._replication_pg_name,
addvollist=[VOLUME_PURITY_NAME]
)
@mock.patch('cinder.volume.volume_types.get_volume_type')
def test_enable_replication_if_needed_error_propagates(
self, mock_get_volume_type):
mock_get_volume_type.return_value = REPLICATED_VOL_TYPE
self.driver._enable_replication_if_needed(self.array, VOLUME)
self.assert_error_propagates(
[self.array.set_pgroup],
self.driver._enable_replication,
self.array, VOLUME
)
@mock.patch(BASE_DRIVER_OBJ + '._get_flasharray')
@mock.patch(BASE_DRIVER_OBJ + '._find_failover_target')
def test_failover(self, mock_find_failover_target, mock_get_array):
secondary_device_id = 'foo'
self.array2._backend_id = secondary_device_id
self.driver._replication_target_arrays = [self.array2]
array2_v1_3 = mock.Mock()
array2_v1_3._backend_id = secondary_device_id
array2_v1_3.array_name = GET_ARRAY_SECONDARY['array_name']
array2_v1_3.array_id = GET_ARRAY_SECONDARY['id']
array2_v1_3.version = '1.3'
mock_get_array.return_value = array2_v1_3
target_array = self.array2
target_array.copy_volume = mock.Mock()
mock_find_failover_target.return_value = (
target_array,
REPLICATED_PGSNAPS[1]
)
array2_v1_3.get_volume.return_value = REPLICATED_VOLUME_SNAPS
context = mock.MagicMock()
vref = VOLUME
self.array.get_pgroup.return_value = {
"hgroups": None,
"hosts": None,
"name": "pg2",
"source": "pure01",
"targets": [
{
"name": "dummy1",
"allowed": True
},
{
"name": "dummy2",
"allowed": False
},
],
"time_remaining": 86395,
"volumes": None
}
new_active_id, volume_updates = self.driver.failover_host(
context,
REPLICATED_VOLUME_OBJS,
None
)
data = self.driver.list_replication_targets(context, vref)
self.assertEqual(secondary_device_id, new_active_id)
self.assertEqual([], volume_updates)
expected_data = {
'volume_id': vref['id'],
'targets': []
}
self.assertDictMatch(expected_data, data)
calls = []
for snap in REPLICATED_VOLUME_SNAPS:
vol_name = snap['name'].split('.')[-1]
calls.append(mock.call(
snap['name'],
vol_name,
overwrite=True
))
target_array.copy_volume.assert_has_calls(calls, any_order=True)
@mock.patch(BASE_DRIVER_OBJ + '._get_flasharray')
@mock.patch(BASE_DRIVER_OBJ + '._find_failover_target')
def test_failover_error_propagates(self, mock_find_failover_target,
mock_get_array):
mock_find_failover_target.return_value = (
self.array2,
REPLICATED_PGSNAPS[1]
)
array2_v1_3 = mock.Mock()
array2_v1_3.array_name = GET_ARRAY_SECONDARY['array_name']
array2_v1_3.array_id = GET_ARRAY_SECONDARY['id']
array2_v1_3.version = '1.3'
mock_get_array.return_value = array2_v1_3
array2_v1_3.get_volume.return_value = REPLICATED_VOLUME_SNAPS
self.assert_error_propagates(
[mock_find_failover_target,
mock_get_array,
array2_v1_3.get_volume,
self.array2.copy_volume],
self.driver.failover_host,
mock.Mock(), REPLICATED_VOLUME_OBJS, None
)
def test_disable_replication_success(self):
self.driver._disable_replication(VOLUME)
self.array.set_pgroup.assert_called_with(
self.driver._replication_pg_name,
remvollist=[VOLUME_PURITY_NAME]
)
def test_disable_replication_error_propagates(self):
self.assert_error_propagates(
[self.array.set_pgroup],
self.driver._disable_replication,
VOLUME
)
def test_disable_replication_already_disabled(self):
self.array.set_pgroup.side_effect = FakePureStorageHTTPError(
code=400, text='could not be found')
self.driver._disable_replication(VOLUME)
self.array.set_pgroup.assert_called_with(
self.driver._replication_pg_name,
remvollist=[VOLUME_PURITY_NAME]
)
class PureISCSIDriverTestCase(PureDriverTestCase):
@ -2539,7 +2334,8 @@ class PureVolumeUpdateStatsTestCase(PureBaseSharedDriverTestCase):
'queue_depth': PERF_INFO['queue_depth'],
'replication_enabled': False,
'replication_type': ['async'],
'replication_count': 0
'replication_count': 0,
'replication_targets': [],
}
real_result = self.driver.get_volume_stats(refresh=True)

View File

@ -26,6 +26,7 @@ from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import excutils
from oslo_utils import units
import six
from cinder import context
from cinder import exception
@ -143,6 +144,8 @@ class PureBaseVolumeDriver(san.SanDriver):
self._replication_retention_long_term = None
self._replication_retention_long_term_per_day = None
self._is_replication_enabled = False
self._active_backend_id = kwargs.get('active_backend_id', None)
self._failed_over_primary_array = None
def parse_replication_configs(self):
self._replication_interval = (
@ -158,13 +161,15 @@ class PureBaseVolumeDriver(san.SanDriver):
retention_policy = self._generate_replication_retention()
replication_devices = self.configuration.safe_get(
'replication_device')
primary_array = self._get_current_array()
if replication_devices:
for replication_device in replication_devices:
target_device_id = replication_device["target_device_id"]
backend_id = replication_device["backend_id"]
san_ip = replication_device["san_ip"]
api_token = replication_device["api_token"]
target_array = self._get_flasharray(san_ip, api_token)
target_array._target_device_id = target_device_id
target_array._backend_id = backend_id
LOG.debug("Adding san_ip %(san_ip)s to replication_targets.",
{"san_ip": san_ip})
api_version = target_array.get_rest_version()
@ -179,14 +184,14 @@ class PureBaseVolumeDriver(san.SanDriver):
target_array_info = target_array.get()
target_array.array_name = target_array_info["array_name"]
target_array.array_id = target_array_info["id"]
LOG.debug("secondary array name: %s", self._array.array_name)
LOG.debug("secondary array id: %s", self._array.array_id)
self._setup_replicated_pgroups(target_array, [self._array],
LOG.debug("secondary array name: %s", target_array.array_name)
LOG.debug("secondary array id: %s", target_array.array_id)
self._setup_replicated_pgroups(target_array, [primary_array],
self._replication_pg_name,
self._replication_interval,
retention_policy)
self._replication_target_arrays.append(target_array)
self._setup_replicated_pgroups(self._array,
self._setup_replicated_pgroups(primary_array,
self._replication_target_arrays,
self._replication_pg_name,
self._replication_interval,
@ -206,13 +211,25 @@ class PureBaseVolumeDriver(san.SanDriver):
self._array = self._get_flasharray(
self.configuration.san_ip,
api_token=self.configuration.pure_api_token)
self._array._target_device_id = self.configuration.config_group
LOG.debug("Primary array target_device_id: %s",
self._array._backend_id = self._backend_name
LOG.debug("Primary array backend_id: %s",
self.configuration.config_group)
LOG.debug("Primary array name: %s", self._array.array_name)
LOG.debug("Primary array id: %s", self._array.array_id)
self.do_setup_replication()
# If we have failed over at some point we need to adjust our current
# array based on the one that we have failed over to
if (self._active_backend_id is not None and
self._active_backend_id != self._array._backend_id):
for array in self._replication_target_arrays:
if array._backend_id == self._active_backend_id:
self._failed_over_primary_array = self._array
self._array = array
break
def do_setup_replication(self):
replication_devices = self.configuration.safe_get(
'replication_device')
@ -230,7 +247,8 @@ class PureBaseVolumeDriver(san.SanDriver):
"""Creates a volume."""
vol_name = self._get_vol_name(volume)
vol_size = volume["size"] * units.Gi
self._array.create_volume(vol_name, vol_size)
current_array = self._get_current_array()
current_array.create_volume(vol_name, vol_size)
if volume['consistencygroup_id']:
self._add_volume_to_consistency_group(
@ -238,11 +256,7 @@ class PureBaseVolumeDriver(san.SanDriver):
vol_name
)
model_update = {'provider_location': self._array.array_id}
if self._add_and_replicate_if_needed(self._array, volume):
model_update['replication_status'] = 'enabled'
return model_update
self._enable_replication_if_needed(current_array, volume)
@log_debug_trace
def create_volume_from_snapshot(self, volume, snapshot):
@ -258,10 +272,7 @@ class PureBaseVolumeDriver(san.SanDriver):
'%(id)s.') % {'id': snapshot['id']}
raise exception.PureDriverException(reason=msg)
# Check which backend the snapshot is on. In case of failover and
# snapshot on failed over volume the snapshot may be on the
# secondary array.
current_array = self._get_current_array(snapshot)
current_array = self._get_current_array()
current_array.copy_volume(snap_name, vol_name)
self._extend_if_needed(current_array,
@ -269,43 +280,30 @@ class PureBaseVolumeDriver(san.SanDriver):
snapshot["volume_size"],
volume["size"])
# TODO(dwilson): figure out if we need to mirror consisgroup on
# target array if failover has occurred.
if volume['consistencygroup_id']:
if current_array.array_id == self._array.array_id:
self._add_volume_to_consistency_group(
volume['consistencygroup_id'],
vol_name)
else:
LOG.warning(_LW("Volume %s is failed over - skipping addition"
" to Consistency Group."), volume["id"])
self._add_volume_to_consistency_group(
volume['consistencygroup_id'],
vol_name)
model_update = {"provider_location": current_array.array_id}
if self._add_and_replicate_if_needed(current_array, volume):
model_update['replication_status'] = 'enabled'
self._enable_replication_if_needed(current_array, volume)
return model_update
def _add_and_replicate_if_needed(self, array, volume):
"""Add volume to protection group and create a snapshot."""
def _enable_replication_if_needed(self, array, volume):
if self._is_volume_replicated_type(volume):
try:
array.set_pgroup(self._replication_pg_name,
addvollist=[self._get_vol_name(volume)])
except purestorage.PureHTTPError as err:
with excutils.save_and_reraise_exception() as ctxt:
if (err.code == 400 and
ERR_MSG_ALREADY_BELONGS in err.text):
# Happens if the volume already added to PG.
ctxt.reraise = False
LOG.warning(_LW("Adding Volume to Protection Group "
"failed with message: %s"), err.text)
array.create_pgroup_snapshot(self._replication_pg_name,
replicate_now=True,
apply_retention=True)
return True
else:
return False
self._enable_replication(array, volume)
def _enable_replication(self, array, volume):
"""Add volume to replicated protection group."""
try:
array.set_pgroup(self._replication_pg_name,
addvollist=[self._get_vol_name(volume)])
except purestorage.PureHTTPError as err:
with excutils.save_and_reraise_exception() as ctxt:
if (err.code == 400 and
ERR_MSG_ALREADY_BELONGS in err.text):
# Happens if the volume already added to PG.
ctxt.reraise = False
LOG.warning(_LW("Adding Volume to Protection Group "
"failed with message: %s"), err.text)
@log_debug_trace
def create_cloned_volume(self, volume, src_vref):
@ -315,29 +313,19 @@ class PureBaseVolumeDriver(san.SanDriver):
# Check which backend the source volume is on. In case of failover
# the source volume may be on the secondary array.
current_array = self._get_current_array(src_vref)
current_array = self._get_current_array()
current_array.copy_volume(src_name, vol_name)
self._extend_if_needed(current_array,
vol_name,
src_vref["size"],
volume["size"])
# TODO(dwilson): figure out if we need to mirror consisgroup on
# target array if failover has occurred.
if volume['consistencygroup_id']:
if current_array.array_id == self._array.array_id:
self._add_volume_to_consistency_group(
volume['consistencygroup_id'],
vol_name)
else:
LOG.warning(_LW("Volume %s is failed over - skipping addition"
" to Consistency Group."), volume["id"])
self._add_volume_to_consistency_group(
volume['consistencygroup_id'],
vol_name)
model_update = {"provider_location": current_array.array_id}
if self._add_and_replicate_if_needed(current_array, volume):
model_update['replication_status'] = 'enabled'
return model_update
self._enable_replication_if_needed(current_array, volume)
def _extend_if_needed(self, array, vol_name, src_size, vol_size):
"""Extend the volume from size src_size to size vol_size."""
@ -349,7 +337,7 @@ class PureBaseVolumeDriver(san.SanDriver):
def delete_volume(self, volume):
"""Disconnect all hosts and delete the volume"""
vol_name = self._get_vol_name(volume)
current_array = self._get_current_array(volume)
current_array = self._get_current_array()
try:
connected_hosts = current_array.list_volume_private_connections(
vol_name)
@ -373,17 +361,16 @@ class PureBaseVolumeDriver(san.SanDriver):
"""Creates a snapshot."""
# Get current array in case we have failed over via replication.
current_array = self._get_current_array(snapshot)
current_array = self._get_current_array()
vol_name, snap_suff = self._get_snap_name(snapshot).split(".")
current_array.create_snapshot(vol_name, suffix=snap_suff)
return {'provider_location': current_array.array_id}
@log_debug_trace
def delete_snapshot(self, snapshot):
"""Deletes a snapshot."""
# Get current array in case we have failed over via replication.
current_array = self._get_current_array(snapshot)
current_array = self._get_current_array()
snap_name = self._get_snap_name(snapshot)
try:
@ -437,7 +424,7 @@ class PureBaseVolumeDriver(san.SanDriver):
def terminate_connection(self, volume, connector, **kwargs):
"""Terminate connection."""
# Get current array in case we have failed over via replication.
current_array = self._get_current_array(volume)
current_array = self._get_current_array()
self._disconnect(current_array, volume, connector, **kwargs)
@log_debug_trace
@ -484,13 +471,14 @@ class PureBaseVolumeDriver(san.SanDriver):
def _update_stats(self):
"""Set self._stats with relevant information."""
current_array = self._get_current_array()
# Collect info from the array
space_info = self._array.get(space=True)
perf_info = self._array.get(action='monitor')[0] # Always first index
hosts = self._array.list_hosts()
snaps = self._array.list_volumes(snap=True, pending=True)
pgroups = self._array.list_pgroups(pending=True)
space_info = current_array.get(space=True)
perf_info = current_array.get(action='monitor')[0] # Always index 0
hosts = current_array.list_hosts()
snaps = current_array.list_volumes(snap=True, pending=True)
pgroups = current_array.list_pgroups(pending=True)
# Perform some translations and calculations
total_capacity = float(space_info["capacity"]) / units.Gi
@ -548,14 +536,17 @@ class PureBaseVolumeDriver(san.SanDriver):
data['usec_per_write_op'] = perf_info['usec_per_write_op']
data['queue_depth'] = perf_info['queue_depth']
# Replication
data["replication_enabled"] = self._is_replication_enabled
data["replication_type"] = ["async"]
data["replication_count"] = len(self._replication_target_arrays)
data["replication_targets"] = [array._backend_id for array
in self._replication_target_arrays]
self._stats = data
def _get_provisioned_space(self):
"""Sum up provisioned size of all volumes on array"""
volumes = self._array.list_volumes(pending=True)
volumes = self._get_current_array().list_volumes(pending=True)
return sum(item["size"] for item in volumes), len(volumes)
def _get_thin_provisioning(self, provisioned_space, used_space):
@ -583,7 +574,7 @@ class PureBaseVolumeDriver(san.SanDriver):
"""Extend volume to new_size."""
# Get current array in case we have failed over via replication.
current_array = self._get_current_array(volume)
current_array = self._get_current_array()
vol_name = self._get_vol_name(volume)
new_size = new_size * units.Gi
@ -591,13 +582,15 @@ class PureBaseVolumeDriver(san.SanDriver):
def _add_volume_to_consistency_group(self, consistencygroup_id, vol_name):
pgroup_name = self._get_pgroup_name_from_id(consistencygroup_id)
self._array.set_pgroup(pgroup_name, addvollist=[vol_name])
current_array = self._get_current_array()
current_array.set_pgroup(pgroup_name, addvollist=[vol_name])
@log_debug_trace
def create_consistencygroup(self, context, group):
"""Creates a consistencygroup."""
self._array.create_pgroup(self._get_pgroup_name_from_id(group.id))
current_array = self._get_current_array()
current_array.create_pgroup(self._get_pgroup_name_from_id(group.id))
model_update = {'status': fields.ConsistencyGroupStatus.AVAILABLE}
return model_update
@ -626,7 +619,8 @@ class PureBaseVolumeDriver(san.SanDriver):
'while cloning Consistency Group %(source_group)s.',
{'snap_name': tmp_pgsnap_name,
'source_group': source_group.id})
self._array.create_pgroup_snapshot(pgroup_name, suffix=tmp_suffix)
current_array = self._get_current_array()
current_array.create_pgroup_snapshot(pgroup_name, suffix=tmp_suffix)
try:
for source_vol, cloned_vol in zip(source_vols, volumes):
source_snap_name = self._get_pgroup_vol_snap_name(
@ -635,7 +629,7 @@ class PureBaseVolumeDriver(san.SanDriver):
self._get_vol_name(source_vol)
)
cloned_vol_name = self._get_vol_name(cloned_vol)
self._array.copy_volume(source_snap_name, cloned_vol_name)
current_array.copy_volume(source_snap_name, cloned_vol_name)
self._add_volume_to_consistency_group(
group.id,
cloned_vol_name
@ -656,8 +650,7 @@ class PureBaseVolumeDriver(san.SanDriver):
return_volumes = []
for volume in volumes:
return_volume = {'id': volume.id, 'status': 'available',
'provider_location': self._array.array_id}
return_volume = {'id': volume.id, 'status': 'available'}
return_volumes.append(return_volume)
model_update = {'status': 'available'}
return model_update, return_volumes
@ -668,9 +661,10 @@ class PureBaseVolumeDriver(san.SanDriver):
try:
pgroup_name = self._get_pgroup_name_from_id(group.id)
self._array.destroy_pgroup(pgroup_name)
current_array = self._get_current_array()
current_array.destroy_pgroup(pgroup_name)
if self.configuration.pure_eradicate_on_delete:
self._array.eradicate_pgroup(pgroup_name)
current_array.eradicate_pgroup(pgroup_name)
except purestorage.PureHTTPError as err:
with excutils.save_and_reraise_exception() as ctxt:
if (err.code == 400 and
@ -709,8 +703,9 @@ class PureBaseVolumeDriver(san.SanDriver):
else:
remvollist = []
self._array.set_pgroup(pgroup_name, addvollist=addvollist,
remvollist=remvollist)
current_array = self._get_current_array()
current_array.set_pgroup(pgroup_name, addvollist=addvollist,
remvollist=remvollist)
return None, None, None
@ -721,14 +716,14 @@ class PureBaseVolumeDriver(san.SanDriver):
cg_id = cgsnapshot.consistencygroup_id
pgroup_name = self._get_pgroup_name_from_id(cg_id)
pgsnap_suffix = self._get_pgroup_snap_suffix(cgsnapshot)
self._array.create_pgroup_snapshot(pgroup_name, suffix=pgsnap_suffix)
current_array = self._get_current_array()
current_array.create_pgroup_snapshot(pgroup_name, suffix=pgsnap_suffix)
snapshot_updates = []
for snapshot in snapshots:
snapshot_updates.append({
'id': snapshot.id,
'status': 'available',
'provider_location': self._array.array_id
'status': 'available'
})
model_update = {'status': 'available'}
@ -736,12 +731,13 @@ class PureBaseVolumeDriver(san.SanDriver):
return model_update, snapshot_updates
def _delete_pgsnapshot(self, pgsnap_name):
current_array = self._get_current_array()
try:
# FlashArray.destroy_pgroup is also used for deleting
# pgroup snapshots. The underlying REST API is identical.
self._array.destroy_pgroup(pgsnap_name)
current_array.destroy_pgroup(pgsnap_name)
if self.configuration.pure_eradicate_on_delete:
self._array.eradicate_pgroup(pgsnap_name)
current_array.eradicate_pgroup(pgsnap_name)
except purestorage.PureHTTPError as err:
with excutils.save_and_reraise_exception() as ctxt:
if (err.code == 400 and
@ -792,8 +788,9 @@ class PureBaseVolumeDriver(san.SanDriver):
else:
ref_vol_name = existing_ref['name']
current_array = self._get_current_array()
try:
volume_info = self._array.get_volume(ref_vol_name, snap=is_snap)
volume_info = current_array.get_volume(ref_vol_name, snap=is_snap)
if volume_info:
if is_snap:
for snap in volume_info:
@ -823,9 +820,9 @@ class PureBaseVolumeDriver(san.SanDriver):
self._validate_manage_existing_ref(existing_ref)
ref_vol_name = existing_ref['name']
current_array = self._get_current_array()
connected_hosts = \
self._array.list_volume_private_connections(ref_vol_name)
current_array.list_volume_private_connections(ref_vol_name)
if len(connected_hosts) > 0:
raise exception.ManageExistingInvalidReference(
existing_ref=existing_ref,
@ -858,8 +855,9 @@ class PureBaseVolumeDriver(san.SanDriver):
This will not raise an exception if the object does not exist
"""
current_array = self._get_current_array()
try:
self._array.rename_volume(old_name, new_name)
current_array.rename_volume(old_name, new_name)
except purestorage.PureHTTPError as err:
with excutils.save_and_reraise_exception() as ctxt:
if (err.code == 400 and
@ -886,7 +884,8 @@ class PureBaseVolumeDriver(san.SanDriver):
self._rename_volume_object(vol_name, unmanaged_vol_name)
def _verify_manage_snap_api_requirements(self):
api_version = self._array.get_rest_version()
current_array = self._get_current_array()
api_version = current_array.get_rest_version()
if api_version not in MANAGE_SNAP_REQUIRED_API_VERSIONS:
msg = _('Unable to do manage snapshot operations with Purity REST '
'API version %(api_version)s, requires '
@ -1058,46 +1057,25 @@ class PureBaseVolumeDriver(san.SanDriver):
if previous_vol_replicated and not new_vol_replicated:
# Remove from protection group.
self.replication_disable(context, volume)
self._disable_replication(volume)
elif not previous_vol_replicated and new_vol_replicated:
# Add to protection group.
self.replication_enable(context, volume)
self._enable_replication(volume)
return True, None
# Replication v2
@log_debug_trace
def replication_enable(self, context, volume):
"""Enable replication on the given volume."""
# Get current array in case we have failed over.
current_array = self._get_current_array(volume)
LOG.debug("Enabling replication for volume %(id)s residing on "
"array %(target_device_id)s." %
{"id": volume["id"],
"target_device_id": current_array._target_device_id})
model_update = {"provider_location": current_array.array_id}
if self._add_and_replicate_if_needed(current_array, volume):
model_update['replication_status'] = 'enabled'
return model_update
@log_debug_trace
def replication_disable(self, context, volume):
def _disable_replication(self, volume):
"""Disable replication on the given volume."""
# Get current array in case we have failed over via replication.
current_array = self._get_current_array(volume)
current_array = self._get_current_array()
LOG.debug("Disabling replication for volume %(id)s residing on "
"array %(target_device_id)s." %
"array %(backend_id)s." %
{"id": volume["id"],
"target_device_id": current_array._target_device_id})
"backend_id": current_array._backend_id})
try:
current_array.set_pgroup(self._replication_pg_name,
remvollist=(
[self._get_vol_name(volume)]))
remvollist=([self._get_vol_name(volume)]))
except purestorage.PureHTTPError as err:
with excutils.save_and_reraise_exception() as ctxt:
if (err.code == 400 and
@ -1108,128 +1086,99 @@ class PureBaseVolumeDriver(san.SanDriver):
else:
LOG.error(_LE("Disable replication on volume failed with "
"message: %s"), err.text)
return {'replication_status': 'disabled'}
@log_debug_trace
def replication_failover(self, context, volume, secondary):
"""Failover volume to the secondary array
def failover_host(self, context, volumes, secondary_id=None):
"""Failover backend to a secondary array
This action will not affect the original volume in any
way and it will stay as is. If a subsequent replication_enable
and failover is performed we will simply overwrite the original
volume.
This action will not affect the original volumes in any
way and it will stay as is. If a subsequent failover is performed we
will simply overwrite the original (now unmanaged) volumes.
"""
vol_name = self._get_vol_name(volume)
# Get the latest replicated snapshot for src_name volume.
# Find "source": "<source_array>:<vol_name>" in snapshot attributes.
secondary_array = None
current_array, failover_candidate_arrays = self._get_arrays(volume)
LOG.debug("Failover replication for volume %(id)s residing on "
"array %(primary)s to %(secondary)s." %
{"id": volume["id"],
"primary": current_array._target_device_id,
"secondary": secondary})
if not failover_candidate_arrays:
raise exception.PureDriverException(
reason=_("Unable to failover volume %(volume)s, no "
"secondary targets configured.") %
{'volume': vol_name})
if secondary_id == 'default':
# We are going back to the 'original' driver config, just put
# our current array back to the primary.
if self._failed_over_primary_array:
self._set_current_array(self._failed_over_primary_array)
return secondary_id, []
else:
msg = _('Unable to failback to "default", this can only be '
'done after a failover has completed.')
raise exception.InvalidReplicationTarget(message=msg)
if secondary:
for array in failover_candidate_arrays:
if array._target_device_id == secondary:
secondary_array = array
if not secondary_array:
raise exception.PureDriverException(
reason=_("Unable to determine secondary_array from supplied "
"secondary: %(secondary)s.") %
{"secondary": secondary}
current_array = self._get_current_array()
LOG.debug("Failover replication for array %(primary)s to "
"%(secondary)s." % {
"primary": current_array._backend_id,
"secondary": secondary_id
})
if secondary_id == current_array._backend_id:
raise exception.InvalidReplicationTarget(
reason=_("Secondary id can not be the same as primary array, "
"backend_id = %(secondary)s.") %
{"secondary": secondary_id}
)
secondary_array, pg_snap = self._find_failover_target(secondary_id)
LOG.debug("Starting failover from %(primary)s to %(secondary)s",
{"primary": current_array.array_name,
"secondary": secondary_array.array_name})
vol_source_name_to_find = "%s:%s" % (current_array.array_name,
vol_name)
volume_snap = self._get_latest_replicated_vol_snap(
secondary_array,
current_array.array_name,
self._replication_pg_name,
vol_name)
if not volume_snap:
raise exception.PureDriverException(
reason=_("Unable to find volume snapshot for %s.")
% vol_source_name_to_find)
# Create volume from snapshot.
secondary_array.copy_volume(volume_snap["name"],
vol_name,
overwrite=True)
# New volume inherits replicated type, but is not actively replicating.
model_update = {"provider_location": secondary_array.array_id,
"replication_status": "failed-over"}
return model_update
# NOTE(patrickeast): This currently requires a call with REST API 1.3.
# If we need to, create a temporary FlashArray for this operation.
api_version = secondary_array.get_rest_version()
LOG.debug("Current REST API for array id %(id)s is %(api_version)s",
{"id": secondary_array.array_id, "api_version": api_version})
if api_version != '1.3':
target_array = self._get_flasharray(
secondary_array._target,
api_token=secondary_array._api_token,
rest_version='1.3'
)
else:
target_array = secondary_array
@log_debug_trace
def list_replication_targets(self, context, vref):
"""Return all connected arrays that are active."""
data = {'volume_id': vref['id']}
status = {}
current_array, failover_candidate_arrays = self._get_arrays(vref)
LOG.debug("List replication targets for volume %(id)s residing on "
"array %(primary)s." %
{"id": vref["id"],
"primary": current_array._target_device_id})
pgroup = current_array.get_pgroup(self._replication_pg_name)
volume_name = self._get_vol_name(vref)
volumes_in_pgroup = pgroup["volumes"]
is_vol_in_pgroup = (volumes_in_pgroup and
volume_name in pgroup["volumes"])
# Purity returns None instead of empty list if no targets
target_arrays = pgroup.get("targets") or []
for target_array in target_arrays:
if is_vol_in_pgroup:
status[target_array["name"]] = target_array["allowed"]
volume_snaps = target_array.get_volume(pg_snap['name'],
snap=True,
pgroup=True)
# We only care about volumes that are in the list we are given.
vol_names = set()
for vol in volumes:
vol_names.add(self._get_vol_name(vol))
for snap in volume_snaps:
vol_name = snap['name'].split('.')[-1]
if vol_name in vol_names:
vol_names.remove(vol_name)
LOG.debug('Creating volume %(vol)s from replicated snapshot '
'%(snap)s', {'vol': vol_name, 'snap': snap['name']})
secondary_array.copy_volume(snap['name'],
vol_name,
overwrite=True)
else:
status[target_array["name"]] = False
LOG.debug('Ignoring unmanaged volume %(vol)s from replicated '
'snapshot %(snap)s.', {'vol': vol_name,
'snap': snap['name']})
# The only volumes remaining in the vol_names set have been left behind
# on the array and should be considered as being in an error state.
model_updates = []
for vol in volumes:
if self._get_vol_name(vol) in vol_names:
model_updates.append({
'volume_id': vol['id'],
'updates': {
'status': 'error',
}
})
remote_targets = []
for flash_array in (failover_candidate_arrays or []):
if flash_array.array_name in status:
remote_targets.append(
{'target_device_id': flash_array._target_device_id})
data["targets"] = remote_targets
return data
def _get_current_array(self, volume):
current_array, _ = self._get_arrays(volume)
return current_array
def _get_arrays(self, volume):
"""Returns the current and secondary arrays for a volume or snapshot
:param volume: volume or snapshot object
:return: the current_array, list of secondary_arrays for the volume
"""
current_array_id = volume.get("provider_location", None)
# Default to configured current array, including case when
# provider_location is misconfigured.
primary_array = self._array
secondary_arrays = []
if self._replication_target_arrays:
secondary_arrays = self._replication_target_arrays
if current_array_id and not current_array_id == self._array.array_id:
for flash_array in self._replication_target_arrays:
if flash_array.array_id == current_array_id:
primary_array = flash_array
secondary_arrays = [self._array]
break
return primary_array, secondary_arrays
# After failover we want our current array to be swapped for the
# secondary array we just failed over to.
self._failed_over_primary_array = self._get_current_array()
self._set_current_array(secondary_array)
return secondary_array._backend_id, model_updates
def _does_pgroup_exist(self, array, pgroup_name):
"""Return True/False"""
@ -1356,23 +1305,10 @@ class PureBaseVolumeDriver(san.SanDriver):
return replication_retention
@log_debug_trace
def _get_latest_replicated_vol_snap(self,
array,
source_array_name,
pgroup_name,
vol_name):
# NOTE(patrickeast): This currently requires a call with REST API 1.3
# if we need to create a temporary FlashArray for this operation.
api_version = array.get_rest_version()
LOG.debug("Current REST API for array id %(id)s is %(api_version)s",
{"id": array.array_id, "api_version": api_version})
if api_version != '1.3':
target_array = self._get_flasharray(array._target,
api_token=array._api_token,
rest_version='1.3')
else:
target_array = array
def _get_latest_replicated_pg_snap(self,
target_array,
source_array_name,
pgroup_name):
# Get all protection group snapshots.
snap_name = "%s:%s" % (source_array_name, pgroup_name)
LOG.debug("Looking for snap %(snap)s on array id %(array_id)s",
@ -1391,29 +1327,12 @@ class PureBaseVolumeDriver(san.SanDriver):
pg_snaps_filtered.sort(key=lambda x: x["created"], reverse=True)
LOG.debug("Sorted list of snapshots %(pg_snaps_filtered)s",
{"pg_snaps_filtered": pg_snaps_filtered})
volume_snap = None
vol_snap_source_to_find = "%s:%s" % (source_array_name, vol_name)
LOG.debug("Searching for snapshot of volume %(vol)s on array "
"%(array)s.",
{"vol": vol_snap_source_to_find, "array": array.array_name})
for pg_snap in pg_snaps_filtered:
# Get volume snapshots inside the replicated PG snapshot.
volume_snaps = target_array.get_volume(pg_snap["name"],
snap=True,
pgroup=True)
for snap in volume_snaps:
LOG.debug("Examining snapshot %(snap)s.", {"snap": snap})
if snap["source"] == vol_snap_source_to_find:
volume_snap = snap
break
if volume_snap: # Found the volume snapshot we needed.
LOG.debug("Found snapshot for volume %(vol)s in "
"snap %(snap)s.",
{"snap": pg_snap["name"],
"vol": vol_snap_source_to_find})
break
return volume_snap
pg_snap = pg_snaps_filtered[0] if pg_snaps_filtered else None
LOG.debug("Selecting snapshot %(pg_snap)s for failover.",
{"pg_snap": pg_snap})
return pg_snap
@log_debug_trace
def _create_protection_group_if_not_exist(self, source_array, pgname):
@ -1449,6 +1368,69 @@ class PureBaseVolumeDriver(san.SanDriver):
replication_flag = (replication_capability == "<is> True")
return replication_flag
def _find_failover_target(self, secondary):
if not self._replication_target_arrays:
raise exception.PureDriverException(
reason=_("Unable to find failover target, no "
"secondary targets configured."))
secondary_array = None
pg_snap = None
if secondary:
for array in self._replication_target_arrays:
if array._backend_id == secondary:
secondary_array = array
break
if not secondary_array:
raise exception.InvalidReplicationTarget(
reason=_("Unable to determine secondary_array from"
" supplied secondary: %(secondary)s.") %
{"secondary": secondary}
)
pg_snap = self._get_latest_replicated_pg_snap(
secondary_array,
self._get_current_array().array_name,
self._replication_pg_name
)
else:
LOG.debug('No secondary array id specified, checking all targets.')
for array in self._replication_target_arrays:
try:
secondary_array = array
pg_snap = self._get_latest_replicated_pg_snap(
secondary_array,
self._get_current_array().array_name,
self._replication_pg_name
)
if pg_snap:
break
except Exception:
LOG.exception(_LE('Error finding replicated pg snapshot '
'on %(secondary)s.'),
{'secondary': array._backend_id})
if not secondary_array:
raise exception.PureDriverException(
reason=_("Unable to find viable secondary array from"
"configured targets: %(targets)s.") %
{"targets": six.text_type(self._replication_target_arrays)}
)
if not pg_snap:
raise exception.PureDriverException(
reason=_("Unable to find viable pg snapshot to use for"
"failover on selected secondary array: %(id)s.") %
{"id": secondary_array._backend_id}
)
return secondary_array, pg_snap
def _get_current_array(self):
return self._array
def _set_current_array(self, array):
self._array = array
class PureISCSIDriver(PureBaseVolumeDriver, san.SanISCSIDriver):
@ -1520,7 +1502,8 @@ class PureISCSIDriver(PureBaseVolumeDriver, san.SanISCSIDriver):
def _get_target_iscsi_ports(self):
"""Return list of iSCSI-enabled port descriptions."""
ports = self._array.list_ports()
current_array = self._get_current_array()
ports = current_array.list_ports()
iscsi_ports = [port for port in ports if port["iqn"]]
if not iscsi_ports:
raise exception.PureDriverException(
@ -1559,7 +1542,7 @@ class PureISCSIDriver(PureBaseVolumeDriver, san.SanISCSIDriver):
(chap_username, chap_password, initiator_update) = \
self._get_chap_credentials(connector['host'], initiator_data)
current_array = self._get_current_array(volume)
current_array = self._get_current_array()
vol_name = self._get_vol_name(volume)
host = self._get_host(current_array, connector)
@ -1639,7 +1622,7 @@ class PureFCDriver(PureBaseVolumeDriver, driver.FibreChannelDriver):
@log_debug_trace
def initialize_connection(self, volume, connector, initiator_data=None):
"""Allow connection to connector and return connection info."""
current_array = self._get_current_array(volume)
current_array = self._get_current_array()
connection = self._connect(volume, connector)
target_wwns = self._get_array_wwns(current_array)
init_targ_map = self._build_initiator_target_map(target_wwns,
@ -1662,7 +1645,7 @@ class PureFCDriver(PureBaseVolumeDriver, driver.FibreChannelDriver):
"""Connect the host and volume; return dict describing connection."""
wwns = connector["wwpns"]
current_array = self._get_current_array(volume)
current_array = self._get_current_array()
vol_name = self._get_vol_name(volume)
host = self._get_host(current_array, connector)
@ -1706,7 +1689,7 @@ class PureFCDriver(PureBaseVolumeDriver, driver.FibreChannelDriver):
@log_debug_trace
def terminate_connection(self, volume, connector, **kwargs):
"""Terminate connection."""
current_array = self._get_current_array(volume)
current_array = self._get_current_array()
no_more_connections = self._disconnect(current_array, volume,
connector, **kwargs)