Huawei: Implement v2 replication (managed)

This patch implements the managed side of v2 replication in the
HuaweiDriver. Both Synchronous mode and Asynchronous mode are
supported with Huawei arrays. The volume type need to associate with
the extra spec with 'replication_enabled' equaling to '<is> True', and
'replication_type' equaling to '<in> sync' or '<in> async'. If
'replication_type' is not provided, it will be defaulted to
Asynchronous mode.

What are supported with replication:
1. create volume
2. create volume from snapshot
3. clone a volume
4. create volume from image
5. volume retype
6. volume migration

So far we support a single remote device, and 'cinder.conf' should
configure a replication local backend and a replication remote backend
as follows:

[replica]
volume_driver =
    cinder.volume.drivers.huawei.huawei_driver.HuaweiISCSIDriver
cinder_huawei_conf_file = /etc/cinder/cinder_huawei_conf.xml
volume_backend_name = replica
replication_device = target_device_id:huawei-replica-1,
    managed_backend_name:host@replica-remote#pool,
    san_address:san_url_1;san_url_2,
    san_user:admin,san_password:passwd

[replica-remote]
volume_driver =
    cinder.volume.drivers.huawei.huawei_driver.HuaweiISCSIDriver
cinder_huawei_conf_file = /etc/cinder/cinder_huawei_conf_remote.xml
volume_backend_name = replica-remote
replication_device = target_device_id:huawei-replica-2,
    managed_backend_name:host@replica#pool,
    san_address:san_url_1;san_url_2,
    san_user:admin,san_password:passwd

Change-Id: Iaa3834547f80e7687f7e1946cb10d35f9ff0c136
Implements: blueprint support-replication-for-huawei-volume-driver
This commit is contained in:
chenzongliang 2015-12-12 17:11:55 +08:00 committed by neochin
parent d5ccbf9c28
commit a51c2b8b07
7 changed files with 1560 additions and 37 deletions

View File

@ -31,6 +31,7 @@ from cinder.volume.drivers.huawei import fc_zone_helper
from cinder.volume.drivers.huawei import huawei_conf
from cinder.volume.drivers.huawei import huawei_driver
from cinder.volume.drivers.huawei import hypermetro
from cinder.volume.drivers.huawei import replication
from cinder.volume.drivers.huawei import rest_client
from cinder.volume.drivers.huawei import smartx
@ -98,6 +99,31 @@ hyper_volume = {'name': 'volume-21ec7341-9256-497b-97d9-ef48edcf0635',
'value': '11'}],
}
sync_replica_specs = {'replication_enabled': '<is> True',
'replication_type': '<in> sync'}
async_replica_specs = {'replication_enabled': '<is> True',
'replication_type': '<in> async'}
TEST_PAIR_ID = "3400a30d844d0004"
replication_volume = {
'name': 'volume-21ec7341-9256-497b-97d9-ef48edcf0635',
'size': 2,
'volume_name': 'vol1',
'id': '21ec7341-9256-497b-97d9-ef48edcf0635',
'volume_id': '21ec7341-9256-497b-97d9-ef48edcf0635',
'provider_auth': None,
'project_id': 'project',
'display_name': 'vol1',
'display_description': 'test volume',
'volume_type_id': None,
'host': 'ubuntu@huawei#OpenStack_Pool',
'provider_location': '11',
'metadata': {'lun_wwn': '6643e8c1004c5f6723e9f454003'},
'replication_status': 'disabled',
'replication_driver_data':
'{"pair_id": "%s", "rmt_lun_id": "1"}' % TEST_PAIR_ID,
}
test_snap = {'name': 'volume-21ec7341-9256-497b-97d9-ef48edcf0635',
'size': 1,
'volume_name': 'vol1',
@ -157,6 +183,22 @@ test_new_type = {
'description': None,
}
test_new_replication_type = {
'name': u'new_type',
'qos_specs_id': None,
'deleted': False,
'created_at': None,
'updated_at': None,
'extra_specs': {
'replication_enabled': '<is> True',
'replication_type': '<in> sync',
},
'is_public': True,
'deleted_at': None,
'id': u'530a56e1-a1a4-49f3-ab6c-779a6e5d999f',
'description': None,
}
hypermetro_devices = """
{
"remote_device": {
@ -279,7 +321,13 @@ FAKE_LUN_INFO_RESPONSE = """
},
"data": {
"ID": "1",
"NAME": "5mFHcBv4RkCcD+JyrWc0SA"
"NAME": "5mFHcBv4RkCcD+JyrWc0SA",
"WWN": "6643e8c1004c5f6723e9f454003",
"DESCRIPTION": "21ec7341-9256-497b-97d9-ef48edcf0635",
"HEALTHSTATUS": "1",
"RUNNINGSTATUS": "27",
"ALLOCTYPE": "1",
"CAPACITY": "2097152"
}
}
"""
@ -294,7 +342,7 @@ FAKE_LUN_GET_SUCCESS_RESPONSE = """
"IOCLASSID": "11",
"NAME": "5mFHcBv4RkCcD+JyrWc0SA",
"DESCRIPTION": "21ec7341-9256-497b-97d9-ef48edcf0635",
"RUNNINGSTATUS": "2",
"RUNNINGSTATUS": "10",
"HEALTHSTATUS": "1",
"RUNNINGSTATUS": "27",
"LUNLIST": "",
@ -309,7 +357,9 @@ FAKE_LUN_GET_SUCCESS_RESPONSE = """
"WRITECACHEPOLICY": "5",
"OWNINGCONTROLLER": "0B",
"SMARTCACHEPARTITIONID": "",
"CACHEPARTITIONID": ""
"CACHEPARTITIONID": "",
"WWN": "6643e8c1004c5f6723e9f454003",
"PARENTNAME": "OpenStack_Pool"
}
}
"""
@ -938,7 +988,8 @@ FAKE_SYSTEM_VERSION_RESPONSE = """
"code": 0
},
"data":{
"PRODUCTVERSION": "V100R001C10"
"PRODUCTVERSION": "V100R001C10",
"wwn": "21003400a30d844d"
}
}
"""
@ -1603,6 +1654,148 @@ MAP_COMMAND_TO_FAKE_RESPONSE['/fc_port/associate?TYPE=213&ASSOCIATEOBJTYPE='
FAKE_PORTS_IN_PG_RESPONSE)
# Replication response
FAKE_GET_REMOTEDEV_RESPONSE = """
{
"data":[{
"ARRAYTYPE":"1",
"HEALTHSTATUS":"1",
"ID":"0",
"NAME":"Huawei.Storage",
"RUNNINGSTATUS":"1",
"WWN":"21003400a30d844d"
}],
"error":{
"code":0,
"description":"0"
}
}
"""
MAP_COMMAND_TO_FAKE_RESPONSE['/remote_device/GET'] = (
FAKE_GET_REMOTEDEV_RESPONSE)
FAKE_CREATE_PAIR_RESPONSE = """
{
"data":{
"ID":"%s"
},
"error":{
"code":0,
"description":"0"
}
}
""" % TEST_PAIR_ID
MAP_COMMAND_TO_FAKE_RESPONSE['/REPLICATIONPAIR/POST'] = (
FAKE_CREATE_PAIR_RESPONSE)
FAKE_DELETE_PAIR_RESPONSE = """
{
"data":{},
"error":{
"code":0,
"description":"0"
}
}
"""
MAP_COMMAND_TO_FAKE_RESPONSE['/REPLICATIONPAIR/%s/DELETE' % TEST_PAIR_ID] = (
FAKE_DELETE_PAIR_RESPONSE)
FAKE_SET_PAIR_ACCESS_RESPONSE = """
{
"data":{},
"error":{
"code":0,
"description":"0"
}
}
"""
MAP_COMMAND_TO_FAKE_RESPONSE['/REPLICATIONPAIR/%s/PUT' % TEST_PAIR_ID] = (
FAKE_SET_PAIR_ACCESS_RESPONSE)
FAKE_GET_PAIR_NORMAL_RESPONSE = """
{
"data":{
"REPLICATIONMODEL": "1",
"RUNNINGSTATUS": "1",
"SECRESACCESS": "2",
"HEALTHSTATUS": "1",
"ISPRIMARY": "true"
},
"error":{
"code":0,
"description":"0"
}
}
"""
FAKE_GET_PAIR_SPLIT_RESPONSE = """
{
"data":{
"REPLICATIONMODEL": "1",
"RUNNINGSTATUS": "26",
"SECRESACCESS": "2",
"ISPRIMARY": "true"
},
"error":{
"code":0,
"description":"0"
}
}
"""
FAKE_GET_PAIR_SYNC_RESPONSE = """
{
"data":{
"REPLICATIONMODEL": "1",
"RUNNINGSTATUS": "23",
"SECRESACCESS": "2"
},
"error":{
"code":0,
"description":"0"
}
}
"""
MAP_COMMAND_TO_FAKE_RESPONSE['/REPLICATIONPAIR/%s/GET' % TEST_PAIR_ID] = (
FAKE_GET_PAIR_NORMAL_RESPONSE)
FAKE_SYNC_PAIR_RESPONSE = """
{
"data":{},
"error":{
"code":0,
"description":"0"
}
}
"""
MAP_COMMAND_TO_FAKE_RESPONSE['/REPLICATIONPAIR/sync/PUT'] = (
FAKE_SYNC_PAIR_RESPONSE)
FAKE_SPLIT_PAIR_RESPONSE = """
{
"data":{},
"error":{
"code":0,
"description":"0"
}
}
"""
MAP_COMMAND_TO_FAKE_RESPONSE['/REPLICATIONPAIR/split/PUT'] = (
FAKE_SPLIT_PAIR_RESPONSE)
FAKE_SWITCH_PAIR_RESPONSE = """
{
"data":{},
"error":{
"code":0,
"description":"0"
}
}
"""
MAP_COMMAND_TO_FAKE_RESPONSE['/REPLICATIONPAIR/switch/PUT'] = (
FAKE_SWITCH_PAIR_RESPONSE)
def Fake_sleep(time):
pass
@ -1612,7 +1805,14 @@ class FakeHuaweiConf(object):
self.conf = conf
self.protocol = protocol
def safe_get(self, key):
try:
return getattr(self.conf, key)
except Exception:
return
def update_config_value(self):
setattr(self.conf, 'volume_backend_name', 'huawei_storage')
setattr(self.conf, 'san_address',
['http://100.115.10.69:8082/deviceManager/rest/'])
setattr(self.conf, 'san_user', 'admin')
@ -1646,6 +1846,16 @@ class FakeHuaweiConf(object):
'TargetPortGroup': 'portgroup-test', }
setattr(self.conf, 'iscsi_info', [iscsi_info])
targets = [{'target_device_id': 'huawei-replica-1',
'managed_backend_name': 'ubuntu@huawei2#OpenStack_Pool',
'san_address':
'https://100.97.10.69:8088/deviceManager/rest/',
'san_user': 'admin',
'san_password': 'Admin@storage1'}]
setattr(self.conf, 'replication_device', targets)
setattr(self.conf, 'safe_get', self.safe_get)
class FakeClient(rest_client.RestClient):
@ -1717,6 +1927,11 @@ class FakeDB(object):
return volumes
class FakeReplicaPairManager(replication.ReplicaPairManager):
def _init_rmt_client(self):
self.rmt_client = FakeClient(self.conf)
class FakeISCSIStorage(huawei_driver.HuaweiISCSIDriver):
"""Fake Huawei Storage, Rewrite some methods of HuaweiISCSIDriver."""
@ -1734,6 +1949,7 @@ class FakeISCSIStorage(huawei_driver.HuaweiISCSIDriver):
self.rmt_client,
self.configuration,
self.db)
self.replica = FakeReplicaPairManager(self.client, self.configuration)
class FakeFCStorage(huawei_driver.HuaweiFCDriver):
@ -1754,6 +1970,7 @@ class FakeFCStorage(huawei_driver.HuaweiFCDriver):
self.rmt_client,
self.configuration,
self.db)
self.replica = FakeReplicaPairManager(self.client, self.configuration)
@ddt.ddt
@ -1836,9 +2053,20 @@ class HuaweiISCSIDriverTestCase(test.TestCase):
self.assertTrue(delete_flag)
def test_create_volume_from_snapsuccess(self):
lun_info = self.driver.create_volume_from_snapshot(test_volume,
test_volume)
self.assertEqual('1', lun_info['provider_location'])
self.mock_object(
huawei_driver.HuaweiBaseDriver,
'_get_volume_type',
mock.Mock(return_value={'extra_specs': sync_replica_specs}))
self.mock_object(replication.ReplicaCommonDriver, 'sync')
model_update = self.driver.create_volume_from_snapshot(test_volume,
test_volume)
self.assertEqual('1', model_update['provider_location'])
driver_data = {'pair_id': TEST_PAIR_ID,
'rmt_lun_id': '1'}
driver_data = replication.to_string(driver_data)
self.assertEqual(driver_data, model_update['replication_driver_data'])
self.assertEqual('enabled', model_update['replication_status'])
def test_initialize_connection_success(self):
iscsi_properties = self.driver.initialize_connection(test_volume,
@ -1850,7 +2078,7 @@ class HuaweiISCSIDriverTestCase(test.TestCase):
def test_get_volume_status(self):
data = self.driver.get_volume_stats()
self.assertEqual('2.0.3', data['driver_version'])
self.assertEqual('2.0.5', data['driver_version'])
def test_extend_volume(self):
@ -2501,6 +2729,311 @@ class HuaweiISCSIDriverTestCase(test.TestCase):
self.driver.unmanage_snapshot(test_snapshot)
self.assertEqual(1, mock_rename.call_count)
def test_init_rmt_client(self):
self.mock_object(rest_client, 'RestClient',
mock.Mock(return_value=None))
replica = replication.ReplicaPairManager(self.driver.client,
self.configuration)
self.assertEqual(replica.rmt_pool, 'OpenStack_Pool')
self.assertEqual(replica.target_dev_id, 'huawei-replica-1')
@ddt.data(sync_replica_specs, async_replica_specs)
def test_create_replication_success(self, mock_type):
self.mock_object(replication.ReplicaCommonDriver, 'sync')
self.mock_object(
huawei_driver.HuaweiBaseDriver,
'_get_volume_type',
mock.Mock(return_value={'extra_specs': mock_type}))
model_update = self.driver.create_volume(replication_volume)
driver_data = {'pair_id': TEST_PAIR_ID,
'rmt_lun_id': '1'}
driver_data = replication.to_string(driver_data)
self.assertEqual(driver_data, model_update['replication_driver_data'])
self.assertEqual('enabled', model_update['replication_status'])
@ddt.data(
[
rest_client.RestClient,
'get_array_info',
mock.Mock(
side_effect=exception.VolumeBackendAPIException(data='err'))
],
[
rest_client.RestClient,
'get_remote_devices',
mock.Mock(
side_effect=exception.VolumeBackendAPIException(data='err'))
],
[
rest_client.RestClient,
'get_remote_devices',
mock.Mock(return_value={})
],
[
replication.ReplicaPairManager,
'wait_volume_online',
mock.Mock(side_effect=[
None,
exception.VolumeBackendAPIException(data='err')])
],
[
rest_client.RestClient,
'create_pair',
mock.Mock(
side_effect=exception.VolumeBackendAPIException(data='err'))
],
[
replication.ReplicaCommonDriver,
'sync',
mock.Mock(
side_effect=exception.VolumeBackendAPIException(data='err'))
],
)
@ddt.unpack
def test_create_replication_fail(self, mock_module, mock_func, mock_value):
self.mock_object(
huawei_driver.HuaweiBaseDriver,
'_get_volume_type',
mock.Mock(return_value={'extra_specs': sync_replica_specs}))
self.mock_object(replication.ReplicaPairManager, '_delete_pair')
self.mock_object(mock_module, mock_func, mock_value)
self.assertRaises(
exception.VolumeBackendAPIException,
self.driver.create_volume, replication_volume)
def test_delete_replication_success(self):
self.mock_object(replication.ReplicaCommonDriver, 'split')
self.mock_object(
huawei_driver.HuaweiBaseDriver,
'_get_volume_type',
mock.Mock(return_value={'extra_specs': sync_replica_specs}))
self.driver.delete_volume(replication_volume)
self.mock_object(rest_client.RestClient, 'check_lun_exist',
mock.Mock(return_value=False))
self.driver.delete_volume(replication_volume)
def test_wait_volume_online(self):
replica = FakeReplicaPairManager(self.driver.client,
self.configuration)
lun_info = {'ID': '11'}
replica.wait_volume_online(self.driver.client, lun_info)
offline_status = {'RUNNINGSTATUS': '28'}
replica.wait_volume_online(self.driver.client, lun_info)
with mock.patch.object(rest_client.RestClient, 'get_lun_info',
offline_status):
self.assertRaises(exception.VolumeBackendAPIException,
replica.wait_volume_online,
self.driver.client,
lun_info)
def test_wait_second_access(self):
pair_id = '1'
access_ro = constants.REPLICA_SECOND_RO
access_rw = constants.REPLICA_SECOND_RW
op = replication.PairOp(self.driver.client)
common_driver = replication.ReplicaCommonDriver(self.configuration, op)
self.mock_object(replication.PairOp, 'get_replica_info',
mock.Mock(return_value={'SECRESACCESS': access_ro}))
common_driver.wait_second_access(pair_id, access_ro)
self.assertRaises(exception.VolumeBackendAPIException,
common_driver.wait_second_access, pair_id, access_rw)
def test_wait_replica_ready(self):
normal_status = {
'RUNNINGSTATUS': constants.REPLICA_RUNNING_STATUS_NORMAL,
'HEALTHSTATUS': constants.REPLICA_HEALTH_STATUS_NORMAL
}
split_status = {
'RUNNINGSTATUS': constants.REPLICA_RUNNING_STATUS_SPLIT,
'HEALTHSTATUS': constants.REPLICA_HEALTH_STATUS_NORMAL
}
sync_status = {
'RUNNINGSTATUS': constants.REPLICA_RUNNING_STATUS_SYNC,
'HEALTHSTATUS': constants.REPLICA_HEALTH_STATUS_NORMAL
}
pair_id = '1'
op = replication.PairOp(self.driver.client)
common_driver = replication.ReplicaCommonDriver(self.configuration, op)
with mock.patch.object(replication.PairOp, 'get_replica_info',
mock.Mock(return_value=normal_status)):
common_driver.wait_replica_ready(pair_id)
with mock.patch.object(
replication.PairOp,
'get_replica_info',
mock.Mock(side_effect=[sync_status, normal_status])):
common_driver.wait_replica_ready(pair_id)
with mock.patch.object(replication.PairOp, 'get_replica_info',
mock.Mock(return_value=split_status)):
self.assertRaises(exception.VolumeBackendAPIException,
common_driver.wait_replica_ready, pair_id)
def test_replication_enable_success(self):
self.mock_object(replication.ReplicaCommonDriver, 'unprotect_second')
self.mock_object(replication.ReplicaCommonDriver, 'split')
self.mock_object(replication.PairOp, 'is_primary',
mock.Mock(side_effect=[False, True]))
self.driver.replication_enable(None, replication_volume)
@ddt.data(
[
replication.AbsReplicaOp,
'is_running_status',
mock.Mock(return_value=False)
],
[
replication,
'get_replication_driver_data',
mock.Mock(return_value={})
],
[
replication.PairOp,
'get_replica_info',
mock.Mock(return_value={})
],
)
@ddt.unpack
def test_replication_enable_fail(self, mock_module, mock_func, mock_value):
self.mock_object(mock_module, mock_func, mock_value)
self.assertRaises(
exception.VolumeBackendAPIException,
self.driver.replication_enable, None, replication_volume)
def test_replication_disable_fail(self):
self.assertRaises(
exception.VolumeBackendAPIException,
self.driver.replication_disable, None, replication_volume)
def test_replication_disable_success(self):
self.mock_object(replication.ReplicaCommonDriver, 'split')
self.driver.replication_disable(None, replication_volume)
self.mock_object(replication, 'get_replication_driver_data',
mock.Mock(return_value={}))
self.driver.replication_disable(None, replication_volume)
def test_replication_failover_success(self):
self.mock_object(replication.ReplicaCommonDriver, 'split')
self.mock_object(replication.PairOp, 'is_primary',
mock.Mock(return_value=False))
model_update = self.driver.replication_failover(
None, replication_volume, None)
self.assertEqual('ubuntu@huawei2#OpenStack_Pool', model_update['host'])
self.assertEqual('1', model_update['provider_location'])
driver_data = {'pair_id': TEST_PAIR_ID,
'rmt_lun_id': '11'}
driver_data = replication.to_string(driver_data)
self.assertEqual(driver_data, model_update['replication_driver_data'])
@ddt.data(
[
replication.PairOp,
'is_primary',
mock.Mock(return_value=True)
],
[
replication.PairOp,
'is_primary',
mock.Mock(return_value=False)
],
[
replication,
'get_replication_driver_data',
mock.Mock(return_value={})
],
[
replication,
'get_replication_driver_data',
mock.Mock(return_value={'pair_id': '1'})
],
)
@ddt.unpack
def test_replication_failover_fail(self,
mock_module, mock_func, mock_value):
self.mock_object(
replication.ReplicaCommonDriver,
'wait_second_access',
mock.Mock(
side_effect=exception.VolumeBackendAPIException(data="error")))
self.mock_object(mock_module, mock_func, mock_value)
self.assertRaises(
exception.VolumeBackendAPIException,
self.driver.replication_failover,
None,
replication_volume, None)
def test_list_replication_targets(self):
info = self.driver.list_replication_targets(None, replication_volume)
targets = [{'target_device_id': 'huawei-replica-1'}]
self.assertEqual(targets, info['targets'])
self.mock_object(replication, 'get_replication_driver_data',
mock.Mock(return_value={}))
info = self.driver.list_replication_targets(None, replication_volume)
self.assertEqual(targets, info['targets'])
@ddt.data(constants.REPLICA_SECOND_RW, constants.REPLICA_SECOND_RO)
def test_replication_protect_second(self, mock_access):
replica_id = TEST_PAIR_ID
op = replication.PairOp(self.driver.client)
common_driver = replication.ReplicaCommonDriver(self.configuration, op)
self.mock_object(replication.ReplicaCommonDriver, 'wait_second_access')
self.mock_object(
replication.PairOp,
'get_replica_info',
mock.Mock(return_value={'SECRESACCESS': mock_access}))
common_driver.protect_second(replica_id)
common_driver.unprotect_second(replica_id)
def test_replication_sync(self):
replica_id = TEST_PAIR_ID
op = replication.PairOp(self.driver.client)
common_driver = replication.ReplicaCommonDriver(self.configuration, op)
async_normal_status = {
'REPLICATIONMODEL': constants.REPLICA_ASYNC_MODEL,
'RUNNINGSTATUS': constants.REPLICA_RUNNING_STATUS_NORMAL,
'HEALTHSTATUS': constants.REPLICA_HEALTH_STATUS_NORMAL
}
self.mock_object(replication.ReplicaCommonDriver, 'protect_second')
self.mock_object(replication.PairOp, 'get_replica_info',
mock.Mock(return_value=async_normal_status))
common_driver.sync(replica_id, True)
common_driver.sync(replica_id, False)
def test_replication_split(self):
replica_id = TEST_PAIR_ID
op = replication.PairOp(self.driver.client)
common_driver = replication.ReplicaCommonDriver(self.configuration, op)
self.mock_object(replication.ReplicaCommonDriver, 'wait_expect_state')
self.mock_object(replication.PairOp, 'split', mock.Mock(
side_effect=exception.VolumeBackendAPIException(data='err')))
common_driver.split(replica_id)
def test_replication_base_op(self):
replica_id = '1'
op = replication.AbsReplicaOp(None)
op.create()
op.delete(replica_id)
op.protect_second(replica_id)
op.unprotect_second(replica_id)
op.sync(replica_id)
op.split(replica_id)
op.switch(replica_id)
op.is_primary({})
op.get_replica_info(replica_id)
op._is_status(None, {'key': 'volue'}, None)
class FCSanLookupService(object):
@ -2565,9 +3098,33 @@ class HuaweiFCDriverTestCase(test.TestCase):
self.assertTrue(self.driver.client.terminateFlag)
def test_get_volume_status(self):
remote_device_info = {"ARRAYTYPE": "1",
"HEALTHSTATUS": "1",
"RUNNINGSTATUS": "10"}
self.mock_object(
replication.ReplicaPairManager,
'get_remote_device_by_wwn',
mock.Mock(return_value=remote_device_info))
data = self.driver.get_volume_stats()
self.assertEqual('2.0.4', data['driver_version'])
self.assertEqual('2.0.5', data['driver_version'])
self.assertTrue(data['pools'][0]['replication_enabled'])
self.assertListEqual(['sync', 'async'],
data['pools'][0]['replication_type'])
self.mock_object(
replication.ReplicaPairManager,
'get_remote_device_by_wwn',
mock.Mock(return_value={}))
data = self.driver.get_volume_stats()
self.assertNotIn('replication_enabled', data['pools'][0])
self.mock_object(
replication.ReplicaPairManager,
'try_get_remote_wwn',
mock.Mock(return_value={}))
data = self.driver.get_volume_stats()
self.assertEqual('2.0.5', data['driver_version'])
self.assertNotIn('replication_enabled', data['pools'][0])
def test_extend_volume(self):
self.driver.extend_volume(test_volume, 3)
@ -2755,6 +3312,17 @@ class HuaweiFCDriverTestCase(test.TestCase):
test_new_type, None, test_host)
self.assertTrue(retype)
@mock.patch.object(rest_client.RestClient, 'add_lun_to_partition')
@mock.patch.object(
huawei_driver.HuaweiBaseDriver,
'_get_volume_type',
return_value={'extra_specs': sync_replica_specs})
def test_retype_replication_volume_success(self, mock_get_type,
mock_add_lun_to_partition):
retype = self.driver.retype(None, test_volume,
test_new_replication_type, None, test_host)
self.assertTrue(retype)
def test_retype_volume_cache_fail(self):
self.driver.client.cache_not_exist = True

View File

@ -75,3 +75,31 @@ QOS_KEYS = ['MAXIOPS', 'MINIOPS', 'MINBANDWidth',
'MAXBANDWidth', 'LATENCY', 'IOTYPE']
MAX_LUN_NUM_IN_QOS = 64
HYPERMETRO_CLASS = "cinder.volume.drivers.huawei.hypermetro.HuaweiHyperMetro"
DEFAULT_REPLICA_WAIT_INTERVAL = 1
DEFAULT_REPLICA_WAIT_TIMEOUT = 10
REPLICA_SYNC_MODEL = '1'
REPLICA_ASYNC_MODEL = '2'
REPLICA_SPEED = '2'
REPLICA_PERIOD = '3600'
REPLICA_SECOND_RO = '2'
REPLICA_SECOND_RW = '3'
REPLICA_RUNNING_STATUS_KEY = 'RUNNINGSTATUS'
REPLICA_RUNNING_STATUS_INITIAL_SYNC = '21'
REPLICA_RUNNING_STATUS_SYNC = '23'
REPLICA_RUNNING_STATUS_SYNCED = '24'
REPLICA_RUNNING_STATUS_NORMAL = '1'
REPLICA_RUNNING_STATUS_SPLIT = '26'
REPLICA_RUNNING_STATUS_INVALID = '35'
REPLICA_HEALTH_STATUS_KEY = 'HEALTHSTATUS'
REPLICA_HEALTH_STATUS_NORMAL = '1'
REPLICA_LOCAL_DATA_STATUS_KEY = 'PRIRESDATASTATUS'
REPLICA_REMOTE_DATA_STATUS_KEY = 'SECRESDATASTATUS'
REPLICA_DATA_SYNC_KEY = 'ISDATASYNC'
REPLICA_DATA_STATUS_SYNCED = '1'
REPLICA_DATA_STATUS_COMPLETE = '2'
REPLICA_DATA_STATUS_INCOMPLETE = '3'

View File

@ -151,7 +151,6 @@ class HuaweiConf(object):
else:
msg = (_("Invalid lun type %s is configured.") % lun_type)
LOG.exception(msg)
raise exception.InvalidInput(reason=msg)
setattr(self.conf, 'lun_type', lun_type)

View File

@ -33,6 +33,7 @@ from cinder.volume.drivers.huawei import fc_zone_helper
from cinder.volume.drivers.huawei import huawei_conf
from cinder.volume.drivers.huawei import huawei_utils
from cinder.volume.drivers.huawei import hypermetro
from cinder.volume.drivers.huawei import replication
from cinder.volume.drivers.huawei import rest_client
from cinder.volume.drivers.huawei import smartx
from cinder.volume import utils as volume_utils
@ -90,6 +91,10 @@ class HuaweiBaseDriver(driver.VolumeDriver):
metro_san_password)
self.rmt_client.login()
# init replication manager
self.replica = replication.ReplicaPairManager(self.client,
self.configuration)
def check_for_setup_error(self):
pass
@ -98,7 +103,9 @@ class HuaweiBaseDriver(driver.VolumeDriver):
self.huawei_conf.update_config_value()
if self.metro_flag:
self.rmt_client.get_all_pools()
return self.client.update_volume_stats()
stats = self.client.update_volume_stats()
stats = self.replica.update_replica_capability(stats)
return stats
def _get_volume_type(self, volume):
volume_type = None
@ -127,6 +134,8 @@ class HuaweiBaseDriver(driver.VolumeDriver):
'thin_provisioning_support': False,
'thick_provisioning_support': False,
'hypermetro': False,
'replication_enabled': False,
'replication_type': 'async',
}
opts_value = {
@ -146,6 +155,7 @@ class HuaweiBaseDriver(driver.VolumeDriver):
opts_associate,
specs)
opts = smartx.SmartX().get_smartx_specs_opts(opts)
opts = replication.get_replication_opts(opts)
LOG.debug('volume opts %(opts)s.', {'opts': opts})
return opts
@ -172,12 +182,15 @@ class HuaweiBaseDriver(driver.VolumeDriver):
if ((not scope or scope == 'capabilities')
and key in opts_capability):
words = value.split()
if not (words and len(words) == 2 and words[0] == '<is>'):
LOG.error(_LE("Extra specs must be specified as "
"capabilities:%s='<is> True' or "
"'<is> true'."), key)
else:
if words and len(words) == 2 and words[0] in ('<is>', '<in>'):
opts[key] = words[1].lower()
elif key == 'replication_type':
LOG.error(_LE("Extra specs must be specified as "
"replication_type='<in> sync' or "
"'<in> async'."))
else:
LOG.error(_LE("Extra specs must be specified as "
"capabilities:%s='<is> True'."), key)
if ((scope in opts_capability)
and (key in opts_value)
@ -193,7 +206,7 @@ class HuaweiBaseDriver(driver.VolumeDriver):
'TYPE': '11',
'NAME': huawei_utils.encode_name(volume['id']),
'PARENTTYPE': '216',
'PARENTID': self.client.get_pool_id(volume, pool_name),
'PARENTID': self.client.get_pool_id(pool_name),
'DESCRIPTION': volume['name'],
'ALLOCTYPE': opts.get('LUNType', self.configuration.lun_type),
'CAPACITY': huawei_utils.get_volume_size(volume),
@ -220,10 +233,11 @@ class HuaweiBaseDriver(driver.VolumeDriver):
model_update['metadata'] = metadata
return lun_info, model_update
def create_volume(self, volume):
"""Create a volume."""
volume_type = self._get_volume_type(volume)
opts = self._get_volume_params(volume_type)
def _create_base_type_volume(self, opts, volume, volume_type):
"""Create volume and add some base type.
Base type is the services won't conflict with the other service.
"""
lun_params = self._get_lun_params(volume, opts)
lun_info, model_update = self._create_volume(volume, lun_params)
lun_id = lun_info['ID']
@ -244,7 +258,17 @@ class HuaweiBaseDriver(driver.VolumeDriver):
msg = _('Create volume error. Because %s.') % six.text_type(err)
raise exception.VolumeBackendAPIException(data=msg)
if (opts.get('hypermetro') and opts.get('hypermetro') == 'true'):
return lun_params, lun_info, model_update
def _add_extend_type_to_volume(self, opts, lun_params, lun_info,
model_update):
"""Add the extend type.
Extend type is the services may conflict with LUNCopy.
So add it after the those services.
"""
lun_id = lun_info['ID']
if opts.get('hypermetro') == 'true':
metro = hypermetro.HuaweiHyperMetro(self.client,
self.rmt_client,
self.configuration,
@ -257,6 +281,35 @@ class HuaweiBaseDriver(driver.VolumeDriver):
self._delete_lun_with_check(lun_id)
raise
if opts.get('replication_enabled') == 'true':
replica_model = opts.get('replication_type')
try:
replica_info = self.replica.create_replica(lun_info,
replica_model)
model_update.update(replica_info)
except Exception as err:
LOG.exception(_LE('Create replication volume error.'))
self._delete_lun_with_check(lun_id)
raise
return model_update
def create_volume(self, volume):
"""Create a volume."""
volume_type = self._get_volume_type(volume)
opts = self._get_volume_params(volume_type)
if (opts.get('hypermetro') == 'true'
and opts.get('replication_enabled') == 'true'):
err_msg = _("Hypermetro and Replication can not be "
"used in the same volume_type.")
LOG.error(err_msg)
raise exception.VolumeBackendAPIException(data=err_msg)
lun_params, lun_info, model_update = (
self._create_base_type_volume(opts, volume, volume_type))
model_update = self._add_extend_type_to_volume(opts, lun_params,
lun_info, model_update)
return model_update
def _delete_volume(self, volume):
@ -281,7 +334,7 @@ class HuaweiBaseDriver(driver.VolumeDriver):
lun_id = volume.get('provider_location')
if not lun_id or not self.client.check_lun_exist(lun_id):
LOG.warning(_LW("Can't find lun %s on the array."), lun_id)
return False
return
qos_id = self.client.get_qosid_by_lunid(lun_id)
if qos_id:
@ -301,6 +354,16 @@ class HuaweiBaseDriver(driver.VolumeDriver):
self._delete_volume(volume)
raise
# Delete a replication volume
replica_data = volume.get('replication_driver_data')
if replica_data:
try:
self.replica.delete_replica(volume)
except exception.VolumeBackendAPIException as err:
with excutils.save_and_reraise_exception():
LOG.exception(_LE("Delete replication error."))
self._delete_volume(volume)
self._delete_volume(volume)
def _delete_lun_with_check(self, lun_id):
@ -414,6 +477,15 @@ class HuaweiBaseDriver(driver.VolumeDriver):
def migrate_volume(self, ctxt, volume, host, new_type=None):
"""Migrate a volume within the same array."""
# NOTE(jlc): Replication volume can't migrate. But retype
# can remove replication relationship first then do migrate.
# So don't add this judgement into _check_migration_valid().
volume_type = self._get_volume_type(volume)
opts = self._get_volume_params(volume_type)
if opts.get('replication_enabled') == 'true':
return (False, None)
return self._migrate_volume(volume, host, new_type)
def _check_migration_valid(self, host, volume):
@ -516,6 +588,15 @@ class HuaweiBaseDriver(driver.VolumeDriver):
We use LUNcopy to copy a new volume from snapshot.
The time needed increases as volume size does.
"""
volume_type = self._get_volume_type(volume)
opts = self._get_volume_params(volume_type)
if (opts.get('hypermetro') == 'true'
and opts.get('replication_enabled') == 'true'):
err_msg = _("Hypermetro and Replication can not be "
"used in the same volume_type.")
LOG.error(err_msg)
raise exception.VolumeBackendAPIException(data=err_msg)
snapshotname = huawei_utils.encode_name(snapshot['id'])
snapshot_id = snapshot.get('provider_location')
if snapshot_id is None:
@ -528,7 +609,9 @@ class HuaweiBaseDriver(driver.VolumeDriver):
LOG.error(err_msg)
raise exception.VolumeBackendAPIException(data=err_msg)
model_update = self.create_volume(volume)
lun_params, lun_info, model_update = (
self._create_base_type_volume(opts, volume, volume_type))
tgt_lun_id = model_update['provider_location']
luncopy_name = huawei_utils.encode_name(volume['id'])
LOG.info(_LI(
@ -555,6 +638,10 @@ class HuaweiBaseDriver(driver.VolumeDriver):
self._copy_volume(volume, luncopy_name,
snapshot_id, tgt_lun_id)
# NOTE(jlc): Actually, we just only support replication here right
# now, not hypermetro.
model_update = self._add_extend_type_to_volume(opts, lun_params,
lun_info, model_update)
return model_update
def create_cloned_volume(self, volume, src_vref):
@ -598,6 +685,14 @@ class HuaweiBaseDriver(driver.VolumeDriver):
LOG.error(msg)
raise exception.VolumeBackendAPIException(data=msg)
volume_type = self._get_volume_type(volume)
opts = self._get_volume_params(volume_type)
if opts.get('replication_enabled') == 'true':
msg = (_("Can't extend replication volume, volume: %(id)s") %
{"id": volume['id']})
LOG.error(msg)
raise exception.VolumeBackendAPIException(data=msg)
old_size = huawei_utils.get_volume_size(volume)
new_size = int(new_size) * units.Gi / 512
volume_name = huawei_utils.encode_name(volume['id'])
@ -670,25 +765,51 @@ class HuaweiBaseDriver(driver.VolumeDriver):
migration, change_opts, lun_id = self.determine_changes_when_retype(
volume, new_type, host)
model_update = {}
replica_enabled_change = change_opts.get('replication_enabled')
replica_type_change = change_opts.get('replication_type')
if replica_enabled_change and replica_enabled_change[0] == 'true':
try:
self.replica.delete_replica(volume)
model_update.update({'replication_status': 'disabled',
'replication_driver_data': None})
except exception.VolumeBackendAPIException:
LOG.exception(_LE('Retype volume error. '
'Delete replication failed.'))
return False
try:
if migration:
LOG.debug("Begin to migrate LUN(id: %(lun_id)s) with "
"change %(change_opts)s.",
{"lun_id": lun_id, "change_opts": change_opts})
if self._migrate_volume(volume, host, new_type):
return True
else:
if not self._migrate_volume(volume, host, new_type):
LOG.warning(_LW("Storage-assisted migration failed during "
"retype."))
return False
else:
# Modify lun to change policy
self.modify_lun(lun_id, change_opts)
return True
except exception.VolumeBackendAPIException:
LOG.exception(_LE('Retype volume error.'))
return False
if replica_enabled_change and replica_enabled_change[1] == 'true':
try:
# If replica_enabled_change is not None, the
# replica_type_change won't be None. See function
# determine_changes_when_retype.
lun_info = self.client.get_lun_info(lun_id)
replica_info = self.replica.create_replica(
lun_info, replica_type_change[1])
model_update.update(replica_info)
except exception.VolumeBackendAPIException:
LOG.exception(_LE('Retype volume error. '
'Create replication failed.'))
return False
return (True, model_update)
def modify_lun(self, lun_id, change_opts):
if change_opts.get('partitionid'):
old, new = change_opts['partitionid']
@ -858,6 +979,19 @@ class HuaweiBaseDriver(driver.VolumeDriver):
migration = True
change_opts['LUNType'] = (old_opts['LUNType'], new_opts['LUNType'])
volume_type = self._get_volume_type(volume)
volume_opts = self._get_volume_params(volume_type)
if (volume_opts['replication_enabled'] == 'true'
or new_opts['replication_enabled'] == 'true'):
# If replication_enabled changes,
# then replication_type in change_opts will be set.
change_opts['replication_enabled'] = (
volume_opts['replication_enabled'],
new_opts['replication_enabled'])
change_opts['replication_type'] = (volume_opts['replication_type'],
new_opts['replication_type'])
change_opts = self._check_needed_changes(lun_id, old_opts, new_opts,
change_opts, new_type)
@ -1060,6 +1194,7 @@ class HuaweiBaseDriver(driver.VolumeDriver):
# Check other stuffs to determine whether this LUN can be imported.
self._check_lun_valid_for_manage(lun_info, external_ref)
type_id = volume.get('volume_type_id')
new_opts = None
if type_id:
# Handle volume type if specified.
old_opts = self.get_lun_specs(lun_id)
@ -1087,7 +1222,21 @@ class HuaweiBaseDriver(driver.VolumeDriver):
self.client.rename_lun(lun_id, new_name, # pylint: disable=E1121
description)
return {'provider_location': lun_id}
model_update = {}
model_update.update({'provider_location': lun_id})
if new_opts and new_opts.get('replication_enabled'):
LOG.debug("Manage volume need to create replication.")
try:
lun_info = self.client.get_lun_info(lun_id)
replica_info = self.replica.create_replica(
lun_info, new_opts.get('replication_type'))
model_update.update(replica_info)
except exception.VolumeBackendAPIException:
with excutils.save_and_reraise_exception():
LOG.exception(_LE("Manage exist volume failed."))
return model_update
def _get_lun_info_by_ref(self, external_ref):
LOG.debug("Get external_ref: %s", external_ref)
@ -1236,6 +1385,27 @@ class HuaweiBaseDriver(driver.VolumeDriver):
{'snapshot_id': snapshot['id'],
'snapshot_name': snapshot_name})
def replication_enable(self, context, volume):
"""Enable replication and do switch role when needed."""
self.replica.enable_replica(volume)
def replication_disable(self, context, volume):
"""Disable replication."""
self.replica.disable_replica(volume)
def replication_failover(self, context, volume, secondary):
"""Disable replication and unprotect remote LUN."""
return self.replica.failover_replica(volume)
def list_replication_targets(self, context, vref):
"""Obtain volume repliction targets."""
return self.replica.list_replica_targets(vref)
def get_replication_updates(self, context):
# NOTE(jlc): The manager does not do aynthing with these updates.
# When that is changed, here must be modified as well.
return []
class HuaweiISCSIDriver(HuaweiBaseDriver, driver.ISCSIDriver):
"""ISCSI driver for Huawei storage arrays.
@ -1254,9 +1424,10 @@ class HuaweiISCSIDriver(HuaweiBaseDriver, driver.ISCSIDriver):
2.0.1 - Manage/unmanage volume support
2.0.2 - Refactor HuaweiISCSIDriver
2.0.3 - Manage/unmanage snapshot support
2.0.5 - Replication V2 support
"""
VERSION = "2.0.3"
VERSION = "2.0.5"
def __init__(self, *args, **kwargs):
super(HuaweiISCSIDriver, self).__init__(*args, **kwargs)
@ -1449,9 +1620,10 @@ class HuaweiFCDriver(HuaweiBaseDriver, driver.FibreChannelDriver):
2.0.2 - Refactor HuaweiFCDriver
2.0.3 - Manage/unmanage snapshot support
2.0.4 - Balanced FC port selection
2.0.5 - Replication V2 support
"""
VERSION = "2.0.4"
VERSION = "2.0.5"
def __init__(self, *args, **kwargs):
super(HuaweiFCDriver, self).__init__(*args, **kwargs)
@ -1515,7 +1687,7 @@ class HuaweiFCDriver(HuaweiBaseDriver, driver.FibreChannelDriver):
if not wwns_in_host and not iqns_in_host:
self.client.remove_host(host_id)
msg = (_('Can not add FC initiator to host.'))
msg = _('Can not add FC initiator to host.')
LOG.error(msg)
raise exception.VolumeBackendAPIException(data=msg)

View File

@ -0,0 +1,675 @@
# Copyright (c) 2016 Huawei Technologies Co., Ltd.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import json
import re
from oslo_log import log as logging
from oslo_utils import excutils
from cinder import exception
from cinder.i18n import _, _LW, _LE
from cinder.volume.drivers.huawei import constants
from cinder.volume.drivers.huawei import huawei_utils
from cinder.volume.drivers.huawei import rest_client
from cinder.volume import utils as volume_utils
LOG = logging.getLogger(__name__)
class AbsReplicaOp(object):
def __init__(self, client):
self.client = client
def create(self, **kwargs):
pass
def delete(self, replica_id):
pass
def protect_second(self, replica_id):
pass
def unprotect_second(self, replica_id):
pass
def sync(self, replica_id):
pass
def split(self, replica_id):
pass
def switch(self, replica_id):
pass
def is_primary(self, replica_info):
flag = replica_info.get('ISPRIMARY')
if flag and flag.lower() == 'true':
return True
return False
def get_replica_info(self, replica_id):
return {}
def _is_status(self, status_key, status, replica_info):
if type(status) in (list, tuple):
return replica_info.get(status_key, '') in status
if type(status) is str:
return replica_info.get(status_key, '') == status
return False
def is_running_status(self, status, replica_info):
return self._is_status(constants.REPLICA_RUNNING_STATUS_KEY,
status, replica_info)
def is_health_status(self, status, replica_info):
return self._is_status(constants.REPLICA_HEALTH_STATUS_KEY,
status, replica_info)
class PairOp(AbsReplicaOp):
def create(self, local_lun_id, rmt_lun_id, rmt_dev_id,
rmt_dev_name, replica_model,
speed=constants.REPLICA_SPEED,
period=constants.REPLICA_PERIOD,
**kwargs):
super(PairOp, self).create(**kwargs)
params = {
"LOCALRESID": local_lun_id,
"LOCALRESTYPE": '11',
"REMOTEDEVICEID": rmt_dev_id,
"REMOTEDEVICENAME": rmt_dev_name,
"REMOTERESID": rmt_lun_id,
"REPLICATIONMODEL": replica_model,
# recovery policy. 1: auto, 2: manual
"RECOVERYPOLICY": '2',
"SPEED": speed,
}
if replica_model == constants.REPLICA_ASYNC_MODEL:
# Synchronize type values:
# 1, manual
# 2, timed wait when synchronization begins
# 3, timed wait when synchronization ends
params['SYNCHRONIZETYPE'] = '2'
params['TIMINGVAL'] = period
try:
pair_info = self.client.create_pair(params)
except Exception as err:
msg = _('Create replication pair failed. Error: %s.') % err
LOG.error(msg)
raise exception.VolumeBackendAPIException(data=msg)
return pair_info
def split(self, pair_id):
self.client.split_pair(pair_id)
def delete(self, pair_id, force=False):
self.client.delete_pair(pair_id, force)
def protect_second(self, pair_id):
self.client.set_pair_second_access(pair_id,
constants.REPLICA_SECOND_RO)
def unprotect_second(self, pair_id):
self.client.set_pair_second_access(pair_id,
constants.REPLICA_SECOND_RW)
def sync(self, pair_id):
self.client.sync_pair(pair_id)
def switch(self, pair_id):
self.client.switch_pair(pair_id)
def get_replica_info(self, pair_id):
return self.client.get_pair_by_id(pair_id)
class CGOp(AbsReplicaOp):
pass
class ReplicaCommonDriver(object):
def __init__(self, conf, replica_op):
self.conf = conf
self.op = replica_op
def protect_second(self, replica_id):
info = self.op.get_replica_info(replica_id)
if info.get('SECRESACCESS') == constants.REPLICA_SECOND_RO:
return
self.op.protect_second(replica_id)
self.wait_second_access(replica_id, constants.REPLICA_SECOND_RO)
def unprotect_second(self, replica_id):
info = self.op.get_replica_info(replica_id)
if info.get('SECRESACCESS') == constants.REPLICA_SECOND_RW:
return
self.op.unprotect_second(replica_id)
self.wait_second_access(replica_id, constants.REPLICA_SECOND_RW)
def sync(self, replica_id, wait_complete=False):
self.protect_second(replica_id)
expect_status = (constants.REPLICA_RUNNING_STATUS_NORMAL,
constants.REPLICA_RUNNING_STATUS_SYNC,
constants.REPLICA_RUNNING_STATUS_INITIAL_SYNC)
info = self.op.get_replica_info(replica_id)
# When running status is synchronizing or normal,
# it's not necessary to do synchronize again.
if (info.get('REPLICATIONMODEL') == constants.REPLICA_SYNC_MODEL
and self.op.is_running_status(expect_status, info)):
return
self.op.sync(replica_id)
self.wait_expect_state(replica_id, expect_status)
if wait_complete:
self.wait_replica_ready(replica_id)
def split(self, replica_id):
running_status = (constants.REPLICA_RUNNING_STATUS_SPLIT,
constants.REPLICA_RUNNING_STATUS_INVALID)
info = self.op.get_replica_info(replica_id)
if self.op.is_running_status(running_status, info):
return
try:
self.op.split(replica_id)
except Exception as err:
LOG.warning(_LW('Split replication exception: %s.'), err)
try:
self.wait_expect_state(replica_id, running_status)
except Exception as err:
msg = _('Split replication failed.')
LOG.error(msg)
raise exception.VolumeBackendAPIException(data=msg)
def enable(self, replica_id, wait_sync_complete=False):
info = self.op.get_replica_info(replica_id)
if not self.op.is_primary(info):
self.switch(replica_id)
self.sync(replica_id)
return None
def disable(self, replica_id):
self.split(replica_id)
return None
def switch(self, replica_id):
self.split(replica_id)
self.unprotect_second(replica_id)
self.op.switch(replica_id)
# Wait to be primary
def _wait_switch_to_primary():
info = self.op.get_replica_info(replica_id)
if self.op.is_primary(info):
return True
return False
interval = constants.DEFAULT_REPLICA_WAIT_INTERVAL
timeout = constants.DEFAULT_REPLICA_WAIT_TIMEOUT
huawei_utils.wait_for_condition(_wait_switch_to_primary,
interval,
timeout)
def failover(self, replica_id):
"""Failover replication.
Purpose:
1. Split replication.
2. Set secondary access read & write.
"""
info = self.op.get_replica_info(replica_id)
if self.op.is_primary(info):
msg = _('We should not do switch over on primary array.')
LOG.error(msg)
raise exception.VolumeBackendAPIException(data=msg)
sync_status_set = (constants.REPLICA_RUNNING_STATUS_SYNC,
constants.REPLICA_RUNNING_STATUS_INITIAL_SYNC)
if self.op.is_running_status(sync_status_set, info):
self.wait_replica_ready(replica_id)
self.split(replica_id)
self.op.unprotect_second(replica_id)
def wait_replica_ready(self, replica_id, interval=None, timeout=None):
LOG.debug('Wait synchronize complete.')
running_status_normal = (constants.REPLICA_RUNNING_STATUS_NORMAL,
constants.REPLICA_RUNNING_STATUS_SYNCED)
running_status_sync = (constants.REPLICA_RUNNING_STATUS_SYNC,
constants.REPLICA_RUNNING_STATUS_INITIAL_SYNC)
health_status_normal = constants.REPLICA_HEALTH_STATUS_NORMAL
def _replica_ready():
info = self.op.get_replica_info(replica_id)
if (self.op.is_running_status(running_status_normal, info)
and self.op.is_health_status(health_status_normal, info)):
return True
if not self.op.is_running_status(running_status_sync, info):
msg = (_('Wait synchronize failed. Running status: %s.') %
info.get(constants.REPLICA_RUNNING_STATUS_KEY))
LOG.error(msg)
raise exception.VolumeBackendAPIException(data=msg)
return False
if not interval:
interval = constants.DEFAULT_WAIT_INTERVAL
if not timeout:
timeout = constants.DEFAULT_WAIT_TIMEOUT
huawei_utils.wait_for_condition(_replica_ready,
interval,
timeout)
def wait_second_access(self, replica_id, access_level):
def _check_access():
info = self.op.get_replica_info(replica_id)
if info.get('SECRESACCESS') == access_level:
return True
return False
interval = constants.DEFAULT_REPLICA_WAIT_INTERVAL
timeout = constants.DEFAULT_REPLICA_WAIT_TIMEOUT
huawei_utils.wait_for_condition(_check_access,
interval,
timeout)
def wait_expect_state(self, replica_id,
running_status, health_status=None,
interval=None, timeout=None):
def _check_state():
info = self.op.get_replica_info(replica_id)
if self.op.is_running_status(running_status, info):
if (not health_status
or self.op.is_health_status(health_status, info)):
return True
return False
if not interval:
interval = constants.DEFAULT_REPLICA_WAIT_INTERVAL
if not timeout:
timeout = constants.DEFAULT_REPLICA_WAIT_TIMEOUT
huawei_utils.wait_for_condition(_check_state, interval, timeout)
def get_replication_driver_data(volume):
if volume.get('replication_driver_data'):
return json.loads(volume['replication_driver_data'])
return {}
def to_string(dict_data):
if dict_data:
return json.dumps(dict_data)
return ''
class ReplicaPairManager(object):
def __init__(self, local_client, conf):
self.local_client = local_client
self.conf = conf
self.replica_device = self.conf.safe_get('replication_device')
if not self.replica_device:
return
# managed_backed_name format: host_name@backend_name#pool_name
self.rmt_backend = self.replica_device[0]['managed_backend_name']
self.rmt_pool = volume_utils.extract_host(self.rmt_backend,
level='pool')
self.target_dev_id = self.replica_device[0]['target_device_id']
self._init_rmt_client()
self.local_op = PairOp(self.local_client)
self.local_driver = ReplicaCommonDriver(self.conf, self.local_op)
self.rmt_op = PairOp(self.rmt_client)
self.rmt_driver = ReplicaCommonDriver(self.conf, self.rmt_op)
self.try_login_remote_array()
def try_login_remote_array(self):
try:
self.rmt_client.login()
except Exception as err:
LOG.warning(_LW('Remote array login failed. Error: %s.'), err)
def try_get_remote_wwn(self):
try:
info = self.rmt_client.get_array_info()
return info.get('wwn')
except Exception as err:
LOG.warning(_LW('Get remote array wwn failed. Error: %s.'), err)
return None
def get_remote_device_by_wwn(self, wwn):
devices = {}
try:
devices = self.local_client.get_remote_devices()
except Exception as err:
LOG.warning(_LW('Get remote devices failed. Error: %s.'), err)
for device in devices:
if device.get('WWN') == wwn:
return device
return {}
def check_remote_available(self):
if not self.replica_device:
return False
# We get device wwn in every check time.
# If remote array changed, we can run normally.
wwn = self.try_get_remote_wwn()
if not wwn:
return False
device = self.get_remote_device_by_wwn(wwn)
# Check remote device is available to use.
# If array type is replication, 'ARRAYTYPE' == '1'.
# If health status is normal, 'HEALTHSTATUS' == '1'.
if (device and device.get('ARRAYTYPE') == '1'
and device.get('HEALTHSTATUS') == '1'
and device.get('RUNNINGSTATUS') == constants.STATUS_RUNNING):
return True
return False
def update_replica_capability(self, stats):
is_rmt_dev_available = self.check_remote_available()
if not is_rmt_dev_available:
if self.replica_device:
LOG.warning(_LW('Remote device is unavailable. '
'Remote backend: %s.'),
self.rmt_backend)
return stats
for pool in stats['pools']:
pool['replication_enabled'] = True
pool['replication_type'] = ['sync', 'async']
return stats
def _init_rmt_client(self):
# Multiple addresses support.
rmt_addrs = self.replica_device[0]['san_address'].split(';')
rmt_addrs = list(set([x.strip() for x in rmt_addrs if x.strip()]))
rmt_user = self.replica_device[0]['san_user']
rmt_password = self.replica_device[0]['san_password']
self.rmt_client = rest_client.RestClient(self.conf,
rmt_addrs,
rmt_user,
rmt_password)
def get_rmt_dev_info(self):
wwn = self.try_get_remote_wwn()
if not wwn:
return None, None
device = self.get_remote_device_by_wwn(wwn)
if not device:
return None, None
return device.get('ID'), device.get('NAME')
def build_rmt_lun_params(self, local_lun_info):
params = {
'TYPE': '11',
'NAME': local_lun_info['NAME'],
'PARENTTYPE': '216',
'PARENTID': self.rmt_client.get_pool_id(self.rmt_pool),
'DESCRIPTION': local_lun_info['DESCRIPTION'],
'ALLOCTYPE': local_lun_info['ALLOCTYPE'],
'CAPACITY': local_lun_info['CAPACITY'],
'WRITEPOLICY': self.conf.lun_write_type,
'MIRRORPOLICY': self.conf.lun_mirror_switch,
'PREFETCHPOLICY': self.conf.lun_prefetch_type,
'PREFETCHVALUE': self.conf.lun_prefetch_value,
'DATATRANSFERPOLICY': self.conf.lun_policy,
'READCACHEPOLICY': self.conf.lun_read_cache_policy,
'WRITECACHEPOLICY': self.conf.lun_write_cache_policy,
}
LOG.debug('Remote lun params: %s.', params)
return params
def wait_volume_online(self, client, lun_info,
interval=None, timeout=None):
online_status = constants.STATUS_VOLUME_READY
if lun_info.get('RUNNINGSTATUS') == online_status:
return
lun_id = lun_info['ID']
def _wait_online():
info = client.get_lun_info(lun_id)
return info.get('RUNNINGSTATUS') == online_status
if not interval:
interval = constants.DEFAULT_REPLICA_WAIT_INTERVAL
if not timeout:
timeout = constants.DEFAULT_REPLICA_WAIT_TIMEOUT
huawei_utils.wait_for_condition(_wait_online,
interval,
timeout)
def create_rmt_lun(self, local_lun_info):
# Create on rmt array. If failed, raise exception.
lun_params = self.build_rmt_lun_params(local_lun_info)
lun_info = self.rmt_client.create_lun(lun_params)
try:
self.wait_volume_online(self.rmt_client, lun_info)
except exception.VolumeBackendAPIException:
with excutils.save_and_reraise_exception():
self.rmt_client.delete_lun(lun_info['ID'])
return lun_info
def create_replica(self, local_lun_info, replica_model):
"""Create remote LUN and replication pair.
Purpose:
1. create remote lun
2. create replication pair
3. enable replication pair
"""
LOG.debug(('Create replication, local lun info: %(info)s, '
'replication model: %(model)s.'),
{'info': local_lun_info, 'model': replica_model})
local_lun_id = local_lun_info['ID']
self.wait_volume_online(self.local_client, local_lun_info)
# step1, create remote lun
rmt_lun_info = self.create_rmt_lun(local_lun_info)
rmt_lun_id = rmt_lun_info['ID']
# step2, get remote device info
rmt_dev_id, rmt_dev_name = self.get_rmt_dev_info()
if not rmt_lun_id or not rmt_dev_name:
self._delete_rmt_lun(rmt_lun_id)
msg = _('Get remote device info failed.')
LOG.error(msg)
raise exception.VolumeBackendAPIException(data=msg)
# step3, create replication pair
try:
pair_info = self.local_op.create(local_lun_id,
rmt_lun_id, rmt_dev_id,
rmt_dev_name, replica_model)
pair_id = pair_info['ID']
except Exception as err:
with excutils.save_and_reraise_exception():
LOG.error(_LE('Create pair failed. Error: %s.'), err)
self._delete_rmt_lun(rmt_lun_id)
# step4, start sync manually. If replication type is sync,
# then wait for sync complete.
wait_complete = (replica_model == constants.REPLICA_SYNC_MODEL)
try:
self.local_driver.sync(pair_id, wait_complete)
except Exception as err:
with excutils.save_and_reraise_exception():
LOG.error(_LE('Start synchronization failed. Error: %s.'), err)
self._delete_pair(pair_id)
self._delete_rmt_lun(rmt_lun_id)
model_update = {}
driver_data = {'pair_id': pair_id,
'rmt_lun_id': rmt_lun_id}
model_update['replication_driver_data'] = to_string(driver_data)
model_update['replication_status'] = 'enabled'
LOG.debug('Create replication, return info: %s.', model_update)
return model_update
def _delete_pair(self, pair_id):
if (not pair_id
or not self.local_client.check_pair_exist(pair_id)):
return
self.local_driver.split(pair_id)
self.local_op.delete(pair_id)
def _delete_rmt_lun(self, lun_id):
if lun_id and self.rmt_client.check_lun_exist(lun_id):
self.rmt_client.delete_lun(lun_id)
def delete_replica(self, volume):
"""Delete replication pair and remote lun.
Purpose:
1. delete replication pair
2. delete remote_lun
"""
LOG.debug('Delete replication, volume: %s.', volume['id'])
info = get_replication_driver_data(volume)
pair_id = info.get('pair_id')
if pair_id:
self._delete_pair(pair_id)
# Delete remote_lun
rmt_lun_id = info.get('rmt_lun_id')
if rmt_lun_id:
self._delete_rmt_lun(rmt_lun_id)
def enable_replica(self, volume):
"""Enable replication.
Purpose:
1. If local backend's array is secondary, switch to primary
2. Synchronize data
"""
LOG.debug('Enable replication, volume: %s.', volume['id'])
info = get_replication_driver_data(volume)
pair_id = info.get('pair_id')
if not pair_id:
msg = _('No pair id in volume replication_driver_data.')
LOG.error(msg)
raise exception.VolumeBackendAPIException(data=msg)
info = self.local_op.get_replica_info(pair_id)
if not info:
msg = _('Pair does not exist on array. Pair id: %s.') % pair_id
LOG.error(msg)
raise exception.VolumeBackendAPIException(data=msg)
wait_sync_complete = False
if info.get('REPLICATIONMODEL') == constants.REPLICA_SYNC_MODEL:
wait_sync_complete = True
return self.local_driver.enable(pair_id, wait_sync_complete)
def disable_replica(self, volume):
"""We consider that all abnormal states is disabled."""
LOG.debug('Disable replication, volume: %s.', volume['id'])
info = get_replication_driver_data(volume)
pair_id = info.get('pair_id')
if not pair_id:
LOG.warning(_LW('No pair id in volume replication_driver_data.'))
return None
return self.local_driver.disable(pair_id)
def failover_replica(self, volume):
"""Just make the secondary available."""
LOG.debug('Failover replication, volume: %s.', volume['id'])
info = get_replication_driver_data(volume)
pair_id = info.get('pair_id')
if not pair_id:
msg = _('No pair id in volume replication_driver_data.')
LOG.error(msg)
raise exception.VolumeBackendAPIException(data=msg)
rmt_lun_id = info.get('rmt_lun_id')
if not rmt_lun_id:
msg = _('No remote LUN id in volume replication_driver_data.')
LOG.error(msg)
raise exception.VolumeBackendAPIException(data=msg)
# Remote array must be available. So we can get the real pool info.
lun_info = self.rmt_client.get_lun_info(rmt_lun_id)
lun_wwn = lun_info.get('WWN')
lun_pool = lun_info.get('PARENTNAME')
new_backend = re.sub(r'(?<=#).*$', lun_pool, self.rmt_backend)
self.rmt_driver.failover(pair_id)
metadata = huawei_utils.get_volume_metadata(volume)
metadata.update({'lun_wwn': lun_wwn})
new_driver_data = {'pair_id': pair_id,
'rmt_lun_id': volume['provider_location']}
new_driver_data = to_string(new_driver_data)
return {'host': new_backend,
'provider_location': rmt_lun_id,
'replication_driver_data': new_driver_data,
'metadata': metadata}
def list_replica_targets(self, volume):
info = get_replication_driver_data(volume)
if not info:
LOG.warning(_LW('Replication driver data does not exist. '
'Volume: %s'), volume['id'])
targets = [{'target_device_id': self.target_dev_id}]
return {'volume_id': volume['id'],
'targets': targets}
def get_replication_opts(opts):
if opts.get('replication_type') == 'sync':
opts['replication_type'] = constants.REPLICA_SYNC_MODEL
else:
opts['replication_type'] = constants.REPLICA_ASYNC_MODEL
return opts

View File

@ -231,7 +231,7 @@ class RestClient(object):
return info
def get_pool_id(self, volume, pool_name):
def get_pool_id(self, pool_name):
pools = self.get_all_pools()
pool_info = self.get_pool_info(pool_name, pools)
if not pool_info:
@ -1562,11 +1562,15 @@ class RestClient(object):
self._assert_rest_result(result, _('Add lun to cache error.'))
def find_array_version(self):
def get_array_info(self):
url = "/system/"
result = self.call(url, None, "GET")
self._assert_rest_result(result, _('Find array version error.'))
return result['data']['PRODUCTVERSION']
self._assert_rest_result(result, _('Get array info error.'))
return result.get('data', None)
def find_array_version(self):
info = self.get_array_info()
return info.get('PRODUCTVERSION', None)
def remove_host(self, host_id):
url = "/host/%s" % host_id
@ -2008,3 +2012,78 @@ class RestClient(object):
for item in result.get('data', []):
wwns.append(item['WWN'])
return wwns
def get_remote_devices(self):
url = "/remote_device"
result = self.call(url, None, "GET")
self._assert_rest_result(result, _('Get remote devices error.'))
return result.get('data', [])
def create_pair(self, pair_params):
url = "/REPLICATIONPAIR"
result = self.call(url, pair_params, "POST")
msg = _('Create replication error.')
self._assert_rest_result(result, msg)
self._assert_data_in_result(result, msg)
return result['data']
def get_pair_by_id(self, pair_id):
url = "/REPLICATIONPAIR/" + pair_id
result = self.call(url, None, "GET")
msg = _('Get pair failed.')
self._assert_rest_result(result, msg)
return result.get('data', {})
def switch_pair(self, pair_id):
url = '/REPLICATIONPAIR/switch'
data = {"ID": pair_id,
"TYPE": "263"}
result = self.call(url, data, "PUT")
msg = _('Switch over pair error.')
self._assert_rest_result(result, msg)
def split_pair(self, pair_id):
url = '/REPLICATIONPAIR/split'
data = {"ID": pair_id,
"TYPE": "263"}
result = self.call(url, data, "PUT")
msg = _('Split pair error.')
self._assert_rest_result(result, msg)
def delete_pair(self, pair_id, force=False):
url = "/REPLICATIONPAIR/" + pair_id
data = None
if force:
data = {"ISLOCALDELETE": force}
result = self.call(url, data, "DELETE")
msg = _('delete_replication error.')
self._assert_rest_result(result, msg)
def sync_pair(self, pair_id):
url = "/REPLICATIONPAIR/sync"
data = {"ID": pair_id,
"TYPE": "263"}
result = self.call(url, data, "PUT")
msg = _('Sync pair error.')
self._assert_rest_result(result, msg)
def check_pair_exist(self, pair_id):
url = "/REPLICATIONPAIR/" + pair_id
result = self.call(url, None, "GET")
return result['error']['code'] == 0
def set_pair_second_access(self, pair_id, access):
url = "/REPLICATIONPAIR/" + pair_id
data = {"ID": pair_id,
"SECRESACCESS": access}
result = self.call(url, data, "PUT")
msg = _('Set pair secondary access error.')
self._assert_rest_result(result, msg)

View File

@ -0,0 +1,2 @@
features:
- Added Replication V2 support for Huawei drivers.