Implement replication support in huawei driver

Implement the following interfaces:
  - create_replica
  - delete_replica
  - promote_replica
  - update_replica_state

DocImpact

Change-Id: I1c52c9f67daa8440050a03e7e9ead5ca32ded458
Implements: bp huawei-driver-support-replication
This commit is contained in:
zhaohua 2016-06-20 17:51:47 +08:00 committed by Yingzhe Zeng
parent 3b73777227
commit 88898b553d
14 changed files with 1439 additions and 66 deletions

View File

@ -99,3 +99,24 @@ class HuaweiBase(object):
@abc.abstractmethod
def teardown_server(self, server_details, security_services=None):
"""Teardown share server."""
@abc.abstractmethod
def create_replica(self, context, replica_list, new_replica,
access_rules, replica_snapshots, share_server=None):
"""Replicate the active replica to a new replica on this backend."""
@abc.abstractmethod
def update_replica_state(self, context, replica_list, replica,
access_rules, replica_snapshots,
share_server=None):
"""Update the replica_state of a replica."""
@abc.abstractmethod
def promote_replica(self, context, replica_list, replica, access_rules,
share_server=None):
"""Promote a replica to 'active' replica state."""
@abc.abstractmethod
def delete_replica(self, context, replica_list, replica_snapshots,
replica, share_server=None):
"""Delete a replica."""

View File

@ -46,6 +46,7 @@ ERROR_CONNECT_TO_SERVER = -403
ERROR_UNAUTHORIZED_TO_SERVER = -401
ERROR_LOGICAL_PORT_EXIST = 1073813505
ERROR_USER_OR_GROUP_NOT_EXIST = 1077939723
ERROR_REPLICATION_PAIR_NOT_EXIST = 1077937923
PORT_TYPE_ETH = '1'
PORT_TYPE_BOND = '7'
@ -100,3 +101,40 @@ OPTS_ASSOCIATE = {
}
VALID_SECTOR_SIZES = ('4', '8', '16', '32', '64')
LOCAL_RES_TYPES = (FILE_SYSTEM_TYPE,) = ('40',)
REPLICA_MODELS = (REPLICA_SYNC_MODEL,
REPLICA_ASYNC_MODEL) = ('1', '2')
REPLICA_SPEED_MODELS = (REPLICA_SPEED_LOW,
REPLICA_SPEED_MEDIUM,
REPLICA_SPEED_HIGH,
REPLICA_SPEED_HIGHEST) = ('1', '2', '3', '4')
REPLICA_HEALTH_STATUSES = (REPLICA_HEALTH_STATUS_NORMAL,
REPLICA_HEALTH_STATUS_FAULT,
REPLICA_HEALTH_STATUS_INVALID) = ('1', '2', '14')
REPLICA_DATA_STATUSES = (
REPLICA_DATA_STATUS_SYNCHRONIZED,
REPLICA_DATA_STATUS_COMPLETE,
REPLICA_DATA_STATUS_INCOMPLETE) = ('1', '2', '5')
REPLICA_DATA_STATUS_IN_SYNC = (
REPLICA_DATA_STATUS_SYNCHRONIZED,
REPLICA_DATA_STATUS_COMPLETE)
REPLICA_RUNNING_STATUSES = (
REPLICA_RUNNING_STATUS_NORMAL,
REPLICA_RUNNING_STATUS_SYNCING,
REPLICA_RUNNING_STATUS_SPLITTED,
REPLICA_RUNNING_STATUS_TO_RECOVER,
REPLICA_RUNNING_STATUS_INTERRUPTED,
REPLICA_RUNNING_STATUS_INVALID) = (
'1', '23', '26', '33', '34', '35')
REPLICA_SECONDARY_ACCESS_RIGHTS = (
REPLICA_SECONDARY_ACCESS_DENIED,
REPLICA_SECONDARY_RO,
REPLICA_SECONDARY_RW) = ('1', '2', '3')

View File

@ -58,21 +58,23 @@ class HuaweiNasDriver(driver.ShareDriver):
Add create share from snapshot.
1.3 - Add manage snapshot.
Support reporting disk type of pool.
Add replication support.
"""
def __init__(self, *args, **kwargs):
"""Do initialization."""
LOG.debug("Enter into init function.")
LOG.debug("Enter into init function of Huawei Driver.")
super(HuaweiNasDriver, self).__init__((True, False), *args, **kwargs)
self.configuration = kwargs.get('configuration', None)
if self.configuration:
self.configuration.append_config_values(huawei_opts)
backend_driver = self.get_backend_driver()
self.plugin = importutils.import_object(backend_driver,
self.configuration)
else:
raise exception.InvalidShare(
reason=_("Huawei configuration missing."))
if not self.configuration:
raise exception.InvalidInput(reason=_(
"Huawei driver configuration missing."))
self.configuration.append_config_values(huawei_opts)
kwargs.pop('configuration')
self.plugin = importutils.import_object(self.get_backend_driver(),
self.configuration,
**kwargs)
def check_for_setup_error(self):
"""Returns an error if prerequisites aren't met."""
@ -202,7 +204,17 @@ class HuaweiNasDriver(driver.ShareDriver):
storage_protocol='NFS_CIFS',
qos=True,
total_capacity_gb=0.0,
free_capacity_gb=0.0)
free_capacity_gb=0.0,
snapshot_support=self.plugin.snapshot_support,
)
# huawei array doesn't support snapshot replication, so driver can't
# create replicated snapshot, this's not fit the requirement of
# replication feature.
# to avoid this problem, we specify huawei driver can't support
# snapshot and replication both, as a workaround.
if not data['snapshot_support'] and self.plugin.replication_support:
data['replication_type'] = 'dr'
self.plugin.update_share_stats(data)
super(HuaweiNasDriver, self)._update_share_stats(data)
@ -214,3 +226,42 @@ class HuaweiNasDriver(driver.ShareDriver):
def _teardown_server(self, server_details, security_services=None):
"""Teardown share server."""
return self.plugin.teardown_server(server_details, security_services)
def create_replica(self, context, replica_list, new_replica,
access_rules, replica_snapshots, share_server=None):
"""Replicate the active replica to a new replica on this backend."""
return self.plugin.create_replica(context,
replica_list,
new_replica,
access_rules,
replica_snapshots,
share_server)
def update_replica_state(self, context, replica_list, replica,
access_rules, replica_snapshots,
share_server=None):
"""Update the replica_state of a replica."""
return self.plugin.update_replica_state(context,
replica_list,
replica,
access_rules,
replica_snapshots,
share_server)
def promote_replica(self, context, replica_list, replica, access_rules,
share_server=None):
"""Promote a replica to 'active' replica state.."""
return self.plugin.promote_replica(context,
replica_list,
replica,
access_rules,
share_server)
def delete_replica(self, context, replica_list, replica_snapshots,
replica, share_server=None):
"""Delete a replica."""
self.plugin.delete_replica(context,
replica_list,
replica_snapshots,
replica,
share_server)

View File

@ -19,7 +19,9 @@ import string
import tempfile
import time
from oslo_config import cfg
from oslo_log import log
import oslo_messaging as messaging
from oslo_serialization import jsonutils
from oslo_utils import excutils
from oslo_utils import strutils
@ -33,32 +35,61 @@ from manila.i18n import _
from manila.i18n import _LE
from manila.i18n import _LI
from manila.i18n import _LW
from manila import rpc
from manila.share.drivers.huawei import base as driver
from manila.share.drivers.huawei import constants
from manila.share.drivers.huawei import huawei_utils
from manila.share.drivers.huawei.v3 import helper
from manila.share.drivers.huawei.v3 import replication
from manila.share.drivers.huawei.v3 import rpcapi as v3_rpcapi
from manila.share.drivers.huawei.v3 import smartx
from manila.share import share_types
from manila.share import utils as share_utils
from manila import utils
CONF = cfg.CONF
LOG = log.getLogger(__name__)
class V3StorageConnection(driver.HuaweiBase):
"""Helper class for Huawei OceanStor V3 storage system."""
def __init__(self, configuration):
def __init__(self, configuration, **kwargs):
super(V3StorageConnection, self).__init__(configuration)
self.helper = helper.RestHelper(self.configuration)
self.replica_mgr = replication.ReplicaPairManager(self.helper)
self.rpc_client = v3_rpcapi.HuaweiV3API()
self.private_storage = kwargs.get('private_storage')
self.qos_support = False
self.snapshot_support = False
self.replication_support = False
def _setup_rpc_server(self, endpoints):
host = "%s@%s" % (CONF.host, self.configuration.config_group)
target = messaging.Target(topic=self.rpc_client.topic, server=host)
self.rpc_server = rpc.get_server(target, endpoints)
self.rpc_server.start()
def connect(self):
"""Try to connect to V3 server."""
if self.configuration:
self.helper = helper.RestHelper(self.configuration)
else:
raise exception.InvalidInput(_("Huawei configuration missing."))
self.helper.login()
self._setup_rpc_server([self.replica_mgr])
self._setup_conf()
def _setup_conf(self):
root = self.helper._read_xml()
snapshot_support = root.findtext('Storage/SnapshotSupport')
if snapshot_support:
self.snapshot_support = strutils.bool_from_string(
snapshot_support, strict=True)
replication_support = root.findtext('Storage/ReplicationSupport')
if replication_support:
self.replication_support = strutils.bool_from_string(
replication_support, strict=True)
def create_share(self, share, share_server=None):
"""Create a share."""
@ -107,14 +138,14 @@ class V3StorageConnection(driver.HuaweiBase):
if qos_id:
self.remove_qos_fs(fs_id, qos_id)
self.helper._delete_fs(fs_id)
message = (_('Failed to create share %(name)s.'
message = (_('Failed to create share %(name)s. '
'Reason: %(err)s.')
% {'name': share_name,
'err': err})
raise exception.InvalidShare(reason=message)
try:
self.helper._create_share(share_name, fs_id, share_proto)
self.helper.create_share(share_name, fs_id, share_proto)
except Exception as err:
if fs_id is not None:
qos_id = self.helper.get_qosid_by_fsid(fs_id)
@ -255,7 +286,7 @@ class V3StorageConnection(driver.HuaweiBase):
LOG.debug("Delete a snapshot.")
snap_name = snapshot['id']
sharefsid = self.helper._get_fsid_by_name(snapshot['share_name'])
sharefsid = self.helper.get_fsid_by_name(snapshot['share_name'])
if sharefsid is None:
LOG.warning(_LW('Delete snapshot share id %s fs has been '
@ -283,6 +314,7 @@ class V3StorageConnection(driver.HuaweiBase):
pool_name = pool_name.strip().strip('\n')
capacity = self._get_capacity(pool_name, all_pool_info)
disk_type = self._get_disk_type(pool_name, all_pool_info)
if capacity:
pool = dict(
pool_name=pool_name,
@ -303,6 +335,7 @@ class V3StorageConnection(driver.HuaweiBase):
huawei_smartpartition=[True, False],
huawei_sectorsize=[True, False],
)
if disk_type:
pool['huawei_disk_type'] = disk_type
@ -330,7 +363,7 @@ class V3StorageConnection(driver.HuaweiBase):
if not share:
LOG.warning(_LW('The share was not found. Share name:%s'),
share_name)
fsid = self.helper._get_fsid_by_name(share_name)
fsid = self.helper.get_fsid_by_name(share_name)
if fsid:
self.helper._delete_fs(fsid)
return
@ -355,7 +388,7 @@ class V3StorageConnection(driver.HuaweiBase):
def create_share_from_snapshot(self, share, snapshot,
share_server=None):
"""Create a share from snapshot."""
share_fs_id = self.helper._get_fsid_by_name(snapshot['share_name'])
share_fs_id = self.helper.get_fsid_by_name(snapshot['share_name'])
if not share_fs_id:
err_msg = (_("The source filesystem of snapshot %s "
"does not exist.")
@ -1229,6 +1262,12 @@ class V3StorageConnection(driver.HuaweiBase):
LOG.error(err_msg)
raise exception.InvalidInput(reason=err_msg)
if self.snapshot_support and self.replication_support:
err_msg = _('Config file invalid. SnapshotSupport and '
'ReplicationSupport can not both be set to True.')
LOG.error(err_msg)
raise exception.BadConfigurationException(reason=err_msg)
def check_service(self):
running_status = self.helper._get_cifs_service_status()
if running_status != constants.STATUS_SERVICE_RUNNING:
@ -1668,3 +1707,147 @@ class V3StorageConnection(driver.HuaweiBase):
ip = self._get_share_ip(share_server)
location = self._get_location_path(share_name, share_proto, ip)
return [location]
def create_replica(self, context, replica_list, new_replica,
access_rules, replica_snapshots, share_server=None):
"""Create a new share, and create a remote replication pair."""
active_replica = share_utils.get_active_replica(replica_list)
if (self.private_storage.get(active_replica['share_id'],
'replica_pair_id')):
# for huawei array, only one replication can be created for
# each active replica, so if a replica pair id is recorded for
# this share, it means active replica already has a replication,
# can not create anymore.
msg = _('Cannot create more than one replica for share %s.')
LOG.error(msg, active_replica['share_id'])
raise exception.ReplicationException(
reason=msg % active_replica['share_id'])
# Create a new share
new_share_name = new_replica['name']
location = self.create_share(new_replica, share_server)
# create a replication pair.
# replication pair only can be created by master node,
# so here is a remote call to trigger master node to
# start the creating progress.
try:
replica_pair_id = self.rpc_client.create_replica_pair(
context,
active_replica['host'],
local_share_info=active_replica,
remote_device_wwn=self.helper.get_array_wwn(),
remote_fs_id=self.helper.get_fsid_by_name(new_share_name)
)
except Exception:
LOG.exception(_LE('Failed to create a replication pair '
'with host %s.'),
active_replica['host'])
raise
self.private_storage.update(new_replica['share_id'],
{'replica_pair_id': replica_pair_id})
# Get the state of the new created replica
replica_state = self.replica_mgr.get_replica_state(replica_pair_id)
replica_ref = {
'export_locations': [location],
'replica_state': replica_state,
'access_rules_status': common_constants.STATUS_ACTIVE,
}
return replica_ref
def update_replica_state(self, context, replica_list, replica,
access_rules, replica_snapshots,
share_server=None):
replica_pair_id = self.private_storage.get(replica['share_id'],
'replica_pair_id')
if replica_pair_id is None:
msg = _LE("No replication pair ID recorded for share %s.")
LOG.error(msg, replica['share_id'])
return common_constants.STATUS_ERROR
self.replica_mgr.update_replication_pair_state(replica_pair_id)
return self.replica_mgr.get_replica_state(replica_pair_id)
def promote_replica(self, context, replica_list, replica, access_rules,
share_server=None):
replica_pair_id = self.private_storage.get(replica['share_id'],
'replica_pair_id')
if replica_pair_id is None:
msg = _("No replication pair ID recorded for share %s.")
LOG.error(msg, replica['share_id'])
raise exception.ReplicationException(
reason=msg % replica['share_id'])
try:
self.replica_mgr.switch_over(replica_pair_id)
except Exception:
LOG.exception(_LE('Failed to promote replica %s.'),
replica['id'])
raise
updated_new_active_access = True
cleared_old_active_access = True
try:
self.update_access(replica, access_rules, [], [], share_server)
except Exception:
LOG.warning(_LW('Failed to set access rules to '
'new active replica %s.'),
replica['id'])
updated_new_active_access = False
old_active_replica = share_utils.get_active_replica(replica_list)
try:
self.clear_access(old_active_replica, share_server)
except Exception:
LOG.warning(_LW("Failed to clear access rules from "
"old active replica %s."),
old_active_replica['id'])
cleared_old_active_access = False
new_active_update = {
'id': replica['id'],
'replica_state': common_constants.REPLICA_STATE_ACTIVE,
}
new_active_update['access_rules_status'] = (
common_constants.STATUS_ACTIVE if updated_new_active_access
else common_constants.STATUS_OUT_OF_SYNC)
# get replica state for new secondary after switch over
replica_state = self.replica_mgr.get_replica_state(replica_pair_id)
old_active_update = {
'id': old_active_replica['id'],
'replica_state': replica_state,
}
old_active_update['access_rules_status'] = (
common_constants.STATUS_OUT_OF_SYNC if cleared_old_active_access
else common_constants.STATUS_ACTIVE)
return [new_active_update, old_active_update]
def delete_replica(self, context, replica_list, replica_snapshots,
replica, share_server=None):
replica_pair_id = self.private_storage.get(replica['share_id'],
'replica_pair_id')
if replica_pair_id is None:
msg = _LW("No replication pair ID recorded for share %(share)s. "
"Continue to delete replica %(replica)s.")
LOG.warning(msg, {'share': replica['share_id'],
'replica': replica['id']})
else:
self.replica_mgr.delete_replication_pair(replica_pair_id)
self.private_storage.delete(replica['share_id'])
try:
self.delete_share(replica, share_server)
except Exception:
LOG.exception(_LE('Failed to delete replica %s.'),
replica['id'])
raise

View File

@ -218,7 +218,7 @@ class RestHelper(object):
LOG.error(_LE('Bad response from change file: %s.') % err)
raise err
def _create_share(self, share_name, fs_id, share_proto):
def create_share(self, share_name, fs_id, share_proto):
"""Create a share."""
share_url_type = self._get_share_url_type(share_proto)
share_path = self._get_share_path(share_name)
@ -698,14 +698,14 @@ class RestHelper(object):
return share_url_type
def _get_fsid_by_name(self, share_name):
def get_fsid_by_name(self, share_name):
url = "/FILESYSTEM?range=[0-8191]"
result = self.call(url, None, "GET")
self._assert_rest_result(result, 'Get filesystem by name error!')
sharename = share_name.replace("-", "_")
share_name = share_name.replace("-", "_")
for item in result.get('data', []):
if sharename == item['NAME']:
if share_name == item['NAME']:
return item['ID']
def _get_fs_info_by_id(self, fsid):
@ -1336,8 +1336,111 @@ class RestHelper(object):
return False, None
def find_array_version(self):
def _get_array_info(self):
url = "/system/"
result = self.call(url, None)
self._assert_rest_result(result, _('Find array version error.'))
return result['data']['PRODUCTVERSION']
result = self.call(url, None, "GET")
msg = _('Get array info error.')
self._assert_rest_result(result, msg)
self._assert_data_in_result(result, msg)
return result.get('data')
def find_array_version(self):
info = self._get_array_info()
return info.get('PRODUCTVERSION')
def get_array_wwn(self):
info = self._get_array_info()
return info.get('wwn')
def _get_all_remote_devices(self):
url = "/remote_device"
result = self.call(url, None, "GET")
self._assert_rest_result(result, _('Get all remote devices error.'))
return result.get('data', [])
def get_remote_device_by_wwn(self, wwn):
devices = self._get_all_remote_devices()
for device in devices:
if device.get('WWN') == wwn:
return device
return {}
def create_replication_pair(self, pair_params):
url = "/REPLICATIONPAIR"
data = jsonutils.dumps(pair_params)
result = self.call(url, data, "POST")
msg = _('Failed to create replication pair for '
'(LOCALRESID: %(lres)s, REMOTEDEVICEID: %(rdev)s, '
'REMOTERESID: %(rres)s).') % {
'lres': pair_params['LOCALRESID'],
'rdev': pair_params['REMOTEDEVICEID'],
'rres': pair_params['REMOTERESID']}
self._assert_rest_result(result, msg)
self._assert_data_in_result(result, msg)
return result['data']
def split_replication_pair(self, pair_id):
url = '/REPLICATIONPAIR/split'
data = jsonutils.dumps({"ID": pair_id, "TYPE": "263"})
result = self.call(url, data, "PUT")
msg = _('Failed to split replication pair %s.') % pair_id
self._assert_rest_result(result, msg)
def switch_replication_pair(self, pair_id):
url = '/REPLICATIONPAIR/switch'
data = jsonutils.dumps({"ID": pair_id, "TYPE": "263"})
result = self.call(url, data, "PUT")
msg = _('Failed to switch replication pair %s.') % pair_id
self._assert_rest_result(result, msg)
def delete_replication_pair(self, pair_id):
url = "/REPLICATIONPAIR/" + pair_id
data = None
result = self.call(url, data, "DELETE")
if (result['error']['code'] ==
constants.ERROR_REPLICATION_PAIR_NOT_EXIST):
LOG.warning(_LW('Replication pair %s was not found.'),
pair_id)
return
msg = _('Failed to delete replication pair %s.') % pair_id
self._assert_rest_result(result, msg)
def sync_replication_pair(self, pair_id):
url = "/REPLICATIONPAIR/sync"
data = jsonutils.dumps({"ID": pair_id, "TYPE": "263"})
result = self.call(url, data, "PUT")
msg = _('Failed to sync replication pair %s.') % pair_id
self._assert_rest_result(result, msg)
def cancel_pair_secondary_write_lock(self, pair_id):
url = "/REPLICATIONPAIR/CANCEL_SECODARY_WRITE_LOCK"
data = jsonutils.dumps({"ID": pair_id, "TYPE": "263"})
result = self.call(url, data, "PUT")
msg = _('Failed to cancel replication pair %s '
'secondary write lock.') % pair_id
self._assert_rest_result(result, msg)
def set_pair_secondary_write_lock(self, pair_id):
url = "/REPLICATIONPAIR/SET_SECODARY_WRITE_LOCK"
data = jsonutils.dumps({"ID": pair_id, "TYPE": "263"})
result = self.call(url, data, "PUT")
msg = _('Failed to set replication pair %s '
'secondary write lock.') % pair_id
self._assert_rest_result(result, msg)
def get_replication_pair_by_id(self, pair_id):
url = "/REPLICATIONPAIR/" + pair_id
result = self.call(url, None, "GET")
msg = _('Failed to get replication pair %s.') % pair_id
self._assert_rest_result(result, msg)
self._assert_data_in_result(result, msg)
return result.get('data')

View File

@ -0,0 +1,250 @@
# Copyright (c) 2016 Huawei Technologies Co., Ltd.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import log
from oslo_utils import strutils
from manila.common import constants as common_constants
from manila import exception
from manila.i18n import _
from manila.i18n import _LE
from manila.i18n import _LW
from manila.share.drivers.huawei import constants
LOG = log.getLogger(__name__)
class ReplicaPairManager(object):
def __init__(self, helper):
self.helper = helper
def create(self, local_share_info, remote_device_wwn, remote_fs_id):
local_share_name = local_share_info.get('name')
try:
local_fs_id = self.helper.get_fsid_by_name(local_share_name)
if not local_fs_id:
msg = _("Local fs was not found by name %s.")
LOG.error(msg, local_share_name)
raise exception.ReplicationException(
reason=msg % local_share_name)
remote_device = self.helper.get_remote_device_by_wwn(
remote_device_wwn)
pair_params = {
"LOCALRESID": local_fs_id,
"LOCALRESTYPE": constants.FILE_SYSTEM_TYPE,
"REMOTEDEVICEID": remote_device.get('ID'),
"REMOTEDEVICENAME": remote_device.get('NAME'),
"REMOTERESID": remote_fs_id,
"REPLICATIONMODEL": constants.REPLICA_ASYNC_MODEL,
"RECOVERYPOLICY": '2',
"SYNCHRONIZETYPE": '1',
"SPEED": constants.REPLICA_SPEED_MEDIUM,
}
pair_info = self.helper.create_replication_pair(pair_params)
except Exception:
msg = _LE("Failed to create replication pair for share %s.")
LOG.exception(msg, local_share_name)
raise
self._sync_replication_pair(pair_info['ID'])
return pair_info['ID']
def _get_replication_pair_info(self, replica_pair_id):
try:
pair_info = self.helper.get_replication_pair_by_id(
replica_pair_id)
except Exception:
LOG.exception(_LE('Failed to get replication pair info for '
'%s.'), replica_pair_id)
raise
return pair_info
def _check_replication_health(self, pair_info):
if (pair_info['HEALTHSTATUS'] !=
constants.REPLICA_HEALTH_STATUS_NORMAL):
return common_constants.STATUS_ERROR
def _check_replication_running_status(self, pair_info):
if (pair_info['RUNNINGSTATUS'] in (
constants.REPLICA_RUNNING_STATUS_SPLITTED,
constants.REPLICA_RUNNING_STATUS_TO_RECOVER)):
return common_constants.REPLICA_STATE_OUT_OF_SYNC
if (pair_info['RUNNINGSTATUS'] in (
constants.REPLICA_RUNNING_STATUS_INTERRUPTED,
constants.REPLICA_RUNNING_STATUS_INVALID)):
return common_constants.STATUS_ERROR
def _check_replication_secondary_data_status(self, pair_info):
if (pair_info['SECRESDATASTATUS'] in
constants.REPLICA_DATA_STATUS_IN_SYNC):
return common_constants.REPLICA_STATE_IN_SYNC
else:
return common_constants.REPLICA_STATE_OUT_OF_SYNC
def _check_replica_state(self, pair_info):
result = self._check_replication_health(pair_info)
if result is not None:
return result
result = self._check_replication_running_status(pair_info)
if result is not None:
return result
return self._check_replication_secondary_data_status(pair_info)
def get_replica_state(self, replica_pair_id):
try:
pair_info = self._get_replication_pair_info(replica_pair_id)
except Exception:
# if cannot communicate to backend, return error
LOG.error(_LE('Cannot get replica state, return %s'),
common_constants.STATUS_ERROR)
return common_constants.STATUS_ERROR
return self._check_replica_state(pair_info)
def _sync_replication_pair(self, pair_id):
try:
self.helper.sync_replication_pair(pair_id)
except Exception as err:
LOG.warning(_LW('Failed to sync replication pair %(id)s. '
'Reason: %(err)s'),
{'id': pair_id, 'err': err})
def update_replication_pair_state(self, replica_pair_id):
pair_info = self._get_replication_pair_info(replica_pair_id)
health = self._check_replication_health(pair_info)
if health is not None:
LOG.warning(_LW("Cannot update the replication %s "
"because it's not in normal status."),
replica_pair_id)
return
if strutils.bool_from_string(pair_info['ISPRIMARY']):
# current replica is primary, not consistent with manila.
# the reason for this circumstance is the last switch over
# didn't succeed completely. continue the switch over progress..
try:
self.helper.switch_replication_pair(replica_pair_id)
except Exception:
msg = _LE('Replication pair %s primary/secondary '
'relationship is not right, try to switch over '
'again but still failed.')
LOG.exception(msg, replica_pair_id)
return
# refresh the replication pair info
pair_info = self._get_replication_pair_info(replica_pair_id)
if pair_info['SECRESACCESS'] == constants.REPLICA_SECONDARY_RW:
try:
self.helper.set_pair_secondary_write_lock(replica_pair_id)
except Exception:
msg = _LE('Replication pair %s secondary access is R/W, '
'try to set write lock but still failed.')
LOG.exception(msg, replica_pair_id)
return
if pair_info['RUNNINGSTATUS'] in (
constants.REPLICA_RUNNING_STATUS_NORMAL,
constants.REPLICA_RUNNING_STATUS_SPLITTED,
constants.REPLICA_RUNNING_STATUS_TO_RECOVER):
self._sync_replication_pair(replica_pair_id)
def switch_over(self, replica_pair_id):
pair_info = self._get_replication_pair_info(replica_pair_id)
if strutils.bool_from_string(pair_info['ISPRIMARY']):
LOG.warning(_LW('The replica to promote is already primary, '
'no need to switch over.'))
return
replica_state = self._check_replica_state(pair_info)
if replica_state != common_constants.REPLICA_STATE_IN_SYNC:
# replica is not in SYNC state, can't be promoted
msg = _('Data of replica %s is not synchronized, '
'can not promote.')
raise exception.ReplicationException(
reason=msg % replica_pair_id)
try:
self.helper.split_replication_pair(replica_pair_id)
except Exception:
# split failed
# means replication pair is in an abnormal status,
# ignore this exception, continue to cancel secondary write lock,
# let secondary share accessible for disaster recovery.
LOG.exception(_LE('Failed to split replication pair %s while '
'switching over.'), replica_pair_id)
try:
self.helper.cancel_pair_secondary_write_lock(replica_pair_id)
except Exception:
LOG.exception(_LE('Failed to cancel replication pair %s '
'secondary write lock.'), replica_pair_id)
raise
try:
self.helper.switch_replication_pair(replica_pair_id)
self.helper.set_pair_secondary_write_lock(replica_pair_id)
self.helper.sync_replication_pair(replica_pair_id)
except Exception:
LOG.exception(_LE('Failed to completely switch over '
'replication pair %s.'), replica_pair_id)
# for all the rest steps,
# because secondary share is accessible now,
# the upper business may access the secondary share,
# return success to tell replica is primary.
return
def delete_replication_pair(self, replica_pair_id):
try:
self.helper.split_replication_pair(replica_pair_id)
except Exception:
# Ignore this exception because replication pair may at some
# abnormal status that supports deleting.
LOG.warning(_LW('Failed to split replication pair %s '
'before deleting it. Ignore this exception, '
'and try to delete anyway.'),
replica_pair_id)
try:
self.helper.delete_replication_pair(replica_pair_id)
except Exception:
LOG.exception(_LE('Failed to delete replication pair %s.'),
replica_pair_id)
raise
def create_replica_pair(self, ctx,
local_share_info,
remote_device_wwn,
remote_fs_id):
"""Create replication pair for RPC call.
This is for remote call, because replica pair can only be created
by master node.
"""
return self.create(local_share_info,
remote_device_wwn,
remote_fs_id)

View File

@ -0,0 +1,46 @@
# Copyright (c) 2016 Huawei Technologies Co., Ltd.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import oslo_messaging as messaging
from manila import rpc
from manila.share import utils
class HuaweiV3API(object):
"""Client side of the huawei V3 rpc API.
API version history:
1.0 - Initial version.
"""
BASE_RPC_API_VERSION = '1.0'
def __init__(self):
self.topic = 'huawei_v3'
target = messaging.Target(topic=self.topic,
version=self.BASE_RPC_API_VERSION)
self.client = rpc.get_client(target, version_cap='1.0')
def create_replica_pair(self, context, host, local_share_info,
remote_device_wwn, remote_fs_id):
new_host = utils.extract_host(host)
call_context = self.client.prepare(server=new_host, version='1.0')
return call_context.call(
context, 'create_replica_pair',
local_share_info=local_share_info,
remote_device_wwn=remote_device_wwn,
remote_fs_id=remote_fs_id)

View File

@ -2882,6 +2882,7 @@ class ShareManager(manager.SchedulerDependentManager):
# TODO(gouthamr): remove method when the db layer returns primitives
share_replica_ref = {
'id': share_replica.get('id'),
'name': share_replica.get('name'),
'share_id': share_replica.get('share_id'),
'host': share_replica.get('host'),
'status': share_replica.get('status'),

View File

@ -16,6 +16,9 @@
"""Share-related Utilities and helpers."""
from manila.common import constants
DEFAULT_POOL_NAME = '_pool0'
@ -76,3 +79,10 @@ def append_host(host, pool):
new_host = "#".join([host, pool])
return new_host
def get_active_replica(replica_list):
"""Returns the first 'active' replica in the list of replicas provided."""
for replica in replica_list:
if replica['replica_state'] == constants.REPLICA_STATE_ACTIVE:
return replica

View File

@ -18,6 +18,7 @@
import os
import shutil
import six
import tempfile
import time
import xml.dom.minidom
@ -26,15 +27,19 @@ import ddt
import mock
from oslo_serialization import jsonutils
from manila.common import constants as common_constants
from manila import context
from manila.data import utils as data_utils
from manila import db
from manila import exception
from manila import rpc
from manila.share import configuration as conf
from manila.share.drivers.huawei import constants
from manila.share.drivers.huawei import huawei_nas
from manila.share.drivers.huawei.v3 import connection
from manila.share.drivers.huawei.v3 import helper
from manila.share.drivers.huawei.v3 import replication
from manila.share.drivers.huawei.v3 import rpcapi
from manila.share.drivers.huawei.v3 import smartx
from manila import test
from manila import utils
@ -324,6 +329,7 @@ class FakeHuaweiNasHelper(helper.RestHelper):
self.cache_exist = True
self.partition_exist = True
self.alloc_type = None
self.custom_results = {}
def _change_file_mode(self, filepath):
pass
@ -332,6 +338,14 @@ class FakeHuaweiNasHelper(helper.RestHelper):
url = url.replace('http://100.115.10.69:8082/deviceManager/rest', '')
url = url.replace('/210235G7J20000000000/', '')
if self.custom_results and self.custom_results.get(url):
result = self.custom_results[url]
if isinstance(result, six.string_types):
return jsonutils.loads(result)
if isinstance(result, dict) and result.get(method):
return jsonutils.loads(result[method])
if self.test_normal:
if self.test_multi_url_flag == 1:
data = '{"error":{"code":-403}}'
@ -383,7 +397,14 @@ class FakeHuaweiNasHelper(helper.RestHelper):
if url == "/system/":
data = """{"error":{"code":0},
"data":{"PRODUCTVERSION": "V300R003C10"}}"""
"data":{"PRODUCTVERSION": "V300R003C10",
"wwn": "fake_wwn"}}"""
if url == "/remote_device":
data = """{"error":{"code":0},
"data":[{"ID": "0",
"NAME": "fake_name",
"WWN": "fake_wwn"}]}"""
if url == "/ioclass" or url == "/ioclass/11":
data = QoS_response(method)
@ -568,7 +589,9 @@ class FakeHuaweiNasHelper(helper.RestHelper):
if url == "/FILESYSTEM?range=[0-8191]":
data = """{"error":{"code":0},
"data":[{"ID":"4",
"NAME":"share_fake_uuid"}]}"""
"NAME":"share_fake_uuid"},
{"ID":"8",
"NAME":"share_fake_new_uuid"}]}"""
if url == "/filesystem/4":
data, self.extend_share_flag, self.shrink_share_flag = (
@ -707,6 +730,33 @@ class FakeHuaweiNasHelper(helper.RestHelper):
else:
data = """{"error":{"code":0}}"""
if url == "/REPLICATIONPAIR":
data = """{"error":{"code":0},"data":{
"ID":"fake_pair_id"}}"""
if url == "/REPLICATIONPAIR/sync":
data = """{"error":{"code":0}}"""
if url == "/REPLICATIONPAIR/switch":
data = """{"error":{"code":0}}"""
if url == "/REPLICATIONPAIR/split":
data = """{"error":{"code":0}}"""
if url == "/REPLICATIONPAIR/CANCEL_SECODARY_WRITE_LOCK":
data = """{"error":{"code":0}}"""
if url == "/REPLICATIONPAIR/SET_SECODARY_WRITE_LOCK":
data = """{"error":{"code":0}}"""
if url == "/REPLICATIONPAIR/fake_pair_id":
data = """{"error":{"code":0},"data":{
"ID": "fake_pair_id",
"HEALTHSTATUS": "1",
"SECRESDATASTATUS": "1",
"ISPRIMARY": "false",
"SECRESACCESS": "1",
"RUNNINGSTATUS": "1"}}"""
else:
data = '{"error":{"code":31755596}}'
@ -714,21 +764,63 @@ class FakeHuaweiNasHelper(helper.RestHelper):
return res_json
class FakeRpcClient(rpcapi.HuaweiV3API):
def __init__(self, helper):
super(self.__class__, self).__init__()
self.replica_mgr = replication.ReplicaPairManager(helper)
class fake_call_context(object):
def __init__(self, replica_mgr):
self.replica_mgr = replica_mgr
def call(self, context, func_name, **kwargs):
if func_name == 'create_replica_pair':
return self.replica_mgr.create_replica_pair(
context, **kwargs)
def create_replica_pair(self, context, host, local_share_info,
remote_device_wwn, remote_fs_id):
self.client.prepare = mock.Mock(
return_value=self.fake_call_context(self.replica_mgr))
return super(self.__class__, self).create_replica_pair(
context, host, local_share_info,
remote_device_wwn, remote_fs_id)
class FakeRpcServer(object):
def start(self):
pass
class FakePrivateStorage(object):
def __init__(self):
self.map = {}
def get(self, entity_id, key=None, default=None):
if self.map.get(entity_id):
return self.map[entity_id].get(key, default)
return default
def update(self, entity_id, details, delete_existing=False):
self.map[entity_id] = details
def delete(self, entity_id, key=None):
self.map.pop(entity_id)
class FakeHuaweiNasDriver(huawei_nas.HuaweiNasDriver):
"""Fake HuaweiNasDriver."""
def __init__(self, *args, **kwargs):
huawei_nas.HuaweiNasDriver.__init__(self, *args, **kwargs)
self.plugin = FakeV3StorageConnection(self.configuration)
self.plugin = connection.V3StorageConnection(self.configuration)
class FakeV3StorageConnection(connection.V3StorageConnection):
"""Fake V3StorageConnection."""
def __init__(self, configuration):
connection.V3StorageConnection.__init__(self, configuration)
self.configuration = configuration
self.helper = FakeHuaweiNasHelper(self.configuration)
self.plugin.helper = FakeHuaweiNasHelper(self.configuration)
self.plugin.replica_mgr = replication.ReplicaPairManager(
self.plugin.helper)
self.plugin.rpc_client = FakeRpcClient(self.plugin.helper)
self.plugin.private_storage = FakePrivateStorage()
@ddt.ddt
@ -747,6 +839,7 @@ class HuaweiShareDriverTestCase(test.TestCase):
self.configuration.network_config_group = 'fake_network_config_group'
self.configuration.admin_network_config_group = (
'fake_admin_network_config_group')
self.configuration.config_group = 'fake_share_backend_name'
self.configuration.share_backend_name = 'fake_share_backend_name'
self.configuration.huawei_share_backend = 'V3'
self.configuration.max_over_subscription_ratio = 1
@ -1203,6 +1296,27 @@ class HuaweiShareDriverTestCase(test.TestCase):
}
}
self.active_replica = {
'id': 'fake_active_replica_id',
'share_id': 'fake_share_id',
'name': 'share_fake_uuid',
'host': 'hostname1@backend_name1#OpenStack_Pool',
'size': 5,
'share_proto': 'NFS',
'replica_state': common_constants.REPLICA_STATE_ACTIVE,
}
self.new_replica = {
'id': 'fake_new_replica_id',
'share_id': 'fake_share_id',
'name': 'share_fake_new_uuid',
'host': 'hostname2@backend_name2#OpenStack_Pool',
'size': 5,
'share_proto': 'NFS',
'replica_state': common_constants.REPLICA_STATE_OUT_OF_SYNC,
'share_type_id': 'fake_id',
}
def _get_share_by_proto(self, share_proto):
if share_proto == "NFS":
share = self.share_nfs
@ -1217,6 +1331,14 @@ class HuaweiShareDriverTestCase(test.TestCase):
'share_type_get',
mock.Mock(return_value=share_type))
def test_no_configuration(self):
self.mock_object(huawei_nas.HuaweiNasDriver,
'driver_handles_share_servers',
True)
self.assertRaises(exception.InvalidInput,
huawei_nas.HuaweiNasDriver)
def test_conf_product_fail(self):
self.recreate_fake_conf_file(product_flag=False)
self.driver.plugin.configuration.manila_huawei_conf_file = (
@ -1261,6 +1383,15 @@ class HuaweiShareDriverTestCase(test.TestCase):
self.assertRaises(exception.InvalidInput,
self.driver.plugin.check_conf_file)
def test_conf_snapshot_replication_conflict(self):
self.recreate_fake_conf_file(snapshot_support=True,
replication_support=True)
self.driver.plugin.configuration.manila_huawei_conf_file = (
self.fake_conf_file)
self.driver.plugin._setup_conf()
self.assertRaises(exception.BadConfigurationException,
self.driver.plugin.check_conf_file)
def test_get_backend_driver_fail(self):
test_fake_conf_file = None
self.driver.plugin.configuration.manila_huawei_conf_file = (
@ -1322,9 +1453,15 @@ class HuaweiShareDriverTestCase(test.TestCase):
self.assertRaises(exception.InvalidInput,
self.driver.plugin.helper._read_xml)
def test_connect_success(self):
FakeRpcServer.start = mock.Mock()
rpc.get_server = mock.Mock(return_value=FakeRpcServer())
self.driver.plugin.connect()
FakeRpcServer.start.assert_called_once()
def test_connect_fail(self):
self.driver.plugin.configuration = None
self.assertRaises(exception.InvalidInput,
self.driver.plugin.helper.test_multi_url_flag = 1
self.assertRaises(exception.InvalidShare,
self.driver.plugin.connect)
def test_login_success(self):
@ -2085,14 +2222,14 @@ class HuaweiShareDriverTestCase(test.TestCase):
def test_create_share_from_snapshot_nonefs(self):
self.driver.plugin.helper.login()
self.mock_object(self.driver.plugin.helper,
'_get_fsid_by_name',
'get_fsid_by_name',
mock.Mock(return_value={}))
self.assertRaises(exception.StorageResourceNotFound,
self.driver.create_share_from_snapshot,
self._context, self.share_nfs,
self.nfs_snapshot, self.share_server)
self.assertTrue(self.driver.plugin.helper.
_get_fsid_by_name.called)
get_fsid_by_name.called)
def test_create_share_from_notexistingsnapshot_fail(self):
self.driver.plugin.helper.login()
@ -2272,32 +2409,47 @@ class HuaweiShareDriverTestCase(test.TestCase):
self.assertEqual("100.115.10.68:/share_fake_uuid", location)
def test_get_share_stats_refresh_pool_not_exist(self):
self.driver.plugin.helper.login()
self.recreate_fake_conf_file(pool_node_flag=False)
self.driver.plugin.configuration.manila_huawei_conf_file = (
self.fake_conf_file)
self.assertRaises(exception.InvalidInput,
self.driver._update_share_stats)
def test_get_share_stats_refresh(self):
self.driver.plugin.helper.login()
@ddt.data({"snapshot_support": True,
"replication_support": False},
{"snapshot_support": False,
"replication_support": True})
@ddt.unpack
def test_get_share_stats_refresh(self, snapshot_support,
replication_support):
self.recreate_fake_conf_file(snapshot_support=snapshot_support,
replication_support=replication_support)
self.driver.plugin.configuration.manila_huawei_conf_file = (
self.fake_conf_file)
self.driver.plugin._setup_conf()
self.driver._update_share_stats()
expected = {}
expected["share_backend_name"] = "fake_share_backend_name"
expected["driver_handles_share_servers"] = False
expected["vendor_name"] = 'Huawei'
expected["driver_version"] = '1.3'
expected["storage_protocol"] = 'NFS_CIFS'
expected['reserved_percentage'] = 0
expected['total_capacity_gb'] = 0.0
expected['free_capacity_gb'] = 0.0
expected['qos'] = True
expected["snapshot_support"] = True
expected['replication_domain'] = None
expected['filter_function'] = None
expected['goodness_function'] = None
expected["pools"] = []
expected = {
"share_backend_name": "fake_share_backend_name",
"driver_handles_share_servers": False,
"vendor_name": "Huawei",
"driver_version": "1.3",
"storage_protocol": "NFS_CIFS",
"reserved_percentage": 0,
"total_capacity_gb": 0.0,
"free_capacity_gb": 0.0,
"qos": True,
"snapshot_support": snapshot_support,
"replication_domain": None,
"filter_function": None,
"goodness_function": None,
"pools": [],
}
if replication_support:
expected['replication_type'] = 'dr'
pool = dict(
pool_name='OpenStack_Pool',
total_capacity_gb=2.0,
@ -2313,7 +2465,7 @@ class HuaweiShareDriverTestCase(test.TestCase):
huawei_smartcache=[True, False],
huawei_smartpartition=[True, False],
huawei_sectorsize=[True, False],
huawei_disk_type='ssd'
huawei_disk_type='ssd',
)
expected["pools"].append(pool)
self.assertEqual(expected, self.driver._stats)
@ -3742,6 +3894,13 @@ class HuaweiShareDriverTestCase(test.TestCase):
share,
self.share_server)
def _add_conf_file_element(self, doc, parent_element, name, value=None):
new_element = doc.createElement(name)
if value:
new_text = doc.createTextNode(value)
new_element.appendChild(new_text)
parent_element.appendChild(new_element)
def create_fake_conf_file(self, fake_conf_file,
product_flag=True, username_flag=True,
pool_node_flag=True, timeout_flag=True,
@ -3749,7 +3908,9 @@ class HuaweiShareDriverTestCase(test.TestCase):
alloctype_value='Thick',
sectorsize_value='4',
multi_url=False,
logical_port='100.115.10.68'):
logical_port='100.115.10.68',
snapshot_support=True,
replication_support=False):
doc = xml.dom.minidom.Document()
config = doc.createElement('Config')
doc.appendChild(config)
@ -3802,6 +3963,14 @@ class HuaweiShareDriverTestCase(test.TestCase):
url.appendChild(url_text)
storage.appendChild(url)
if snapshot_support:
self._add_conf_file_element(
doc, storage, 'SnapshotSupport', 'True')
if replication_support:
self._add_conf_file_element(
doc, storage, 'ReplicationSupport', 'True')
lun = doc.createElement('Filesystem')
config.appendChild(lun)
@ -3878,7 +4047,9 @@ class HuaweiShareDriverTestCase(test.TestCase):
alloctype_value='Thick',
sectorsize_value='4',
multi_url=False,
logical_port='100.115.10.68'):
logical_port='100.115.10.68',
snapshot_support=True,
replication_support=False):
self.tmp_dir = tempfile.mkdtemp()
self.fake_conf_file = self.tmp_dir + '/manila_huawei_conf.xml'
self.addCleanup(shutil.rmtree, self.tmp_dir)
@ -3886,5 +4057,468 @@ class HuaweiShareDriverTestCase(test.TestCase):
username_flag, pool_node_flag,
timeout_flag, wait_interval_flag,
alloctype_value, sectorsize_value,
multi_url, logical_port)
multi_url, logical_port,
snapshot_support, replication_support)
self.addCleanup(os.remove, self.fake_conf_file)
@ddt.data(common_constants.STATUS_ERROR,
common_constants.REPLICA_STATE_IN_SYNC,
common_constants.REPLICA_STATE_OUT_OF_SYNC)
def test_create_replica_success(self, replica_state):
share_type = self.fake_type_not_extra['test_with_extra']
self.mock_object(db, 'share_type_get',
mock.Mock(return_value=share_type))
if replica_state == common_constants.STATUS_ERROR:
self.driver.plugin.helper.custom_results[
'/REPLICATIONPAIR/fake_pair_id'] = {
"GET": """{"error":{"code":0},
"data":{"HEALTHSTATUS": "2"}}"""}
elif replica_state == common_constants.REPLICA_STATE_OUT_OF_SYNC:
self.driver.plugin.helper.custom_results[
'/REPLICATIONPAIR/fake_pair_id'] = {
"GET": """{"error":{"code":0},
"data":{"HEALTHSTATUS": "1",
"RUNNINGSTATUS": "1",
"SECRESDATASTATUS": "5"}}"""}
result = self.driver.create_replica(
self._context,
[self.active_replica, self.new_replica],
self.new_replica,
[], [], None)
expected = {
'export_locations': ['100.115.10.68:/share_fake_new_uuid'],
'replica_state': replica_state,
'access_rules_status': common_constants.STATUS_ACTIVE,
}
self.assertEqual(expected, result)
self.assertEqual('fake_pair_id',
self.driver.plugin.private_storage.get(
'fake_share_id', 'replica_pair_id'))
@ddt.data({'url': '/FILESYSTEM?range=[0-8191]',
'url_result': '{"error":{"code":0}}',
'expected_exception': exception.ReplicationException},
{'url': '/NFSHARE',
'url_result': '{"error":{"code":-403}}',
'expected_exception': exception.InvalidShare},
{'url': '/REPLICATIONPAIR',
'url_result': '{"error":{"code":-403}}',
'expected_exception': exception.InvalidShare},)
@ddt.unpack
def test_create_replica_fail(self, url, url_result, expected_exception):
share_type = self.fake_type_not_extra['test_with_extra']
self.mock_object(db, 'share_type_get',
mock.Mock(return_value=share_type))
self.driver.plugin.helper.custom_results[url] = url_result
self.assertRaises(expected_exception,
self.driver.create_replica,
self._context,
[self.active_replica, self.new_replica],
self.new_replica,
[], [], None)
self.assertIsNone(self.driver.plugin.private_storage.get(
'fake_share_id', 'replica_pair_id'))
def test_create_replica_with_get_state_fail(self):
share_type = self.fake_type_not_extra['test_with_extra']
self.mock_object(db, 'share_type_get',
mock.Mock(return_value=share_type))
self.driver.plugin.helper.custom_results[
'/REPLICATIONPAIR/fake_pair_id'] = {
"GET": """{"error":{"code":-403}}"""}
result = self.driver.create_replica(
self._context,
[self.active_replica, self.new_replica],
self.new_replica,
[], [], None)
expected = {
'export_locations': ['100.115.10.68:/share_fake_new_uuid'],
'replica_state': common_constants.STATUS_ERROR,
'access_rules_status': common_constants.STATUS_ACTIVE,
}
self.assertEqual(expected, result)
self.assertEqual('fake_pair_id',
self.driver.plugin.private_storage.get(
'fake_share_id', 'replica_pair_id'))
def test_create_replica_with_already_exists(self):
self.driver.plugin.private_storage.update(
'fake_share_id',
{'replica_pair_id': 'fake_pair_id'})
self.assertRaises(exception.ReplicationException,
self.driver.create_replica,
self._context,
[self.active_replica, self.new_replica],
self.new_replica,
[], [], None)
@ddt.data({'pair_info': """{"HEALTHSTATUS": "2",
"SECRESDATASTATUS": "2",
"ISPRIMARY": "false",
"SECRESACCESS": "1",
"RUNNINGSTATUS": "1"}""",
'assert_method': 'get_replication_pair_by_id'},
{'pair_info': """{"HEALTHSTATUS": "1",
"SECRESDATASTATUS": "2",
"ISPRIMARY": "true",
"SECRESACCESS": "1",
"RUNNINGSTATUS": "1"}""",
'assert_method': 'switch_replication_pair'},
{'pair_info': """{"HEALTHSTATUS": "1",
"SECRESDATASTATUS": "2",
"ISPRIMARY": "false",
"SECRESACCESS": "3",
"RUNNINGSTATUS": "1"}""",
'assert_method': 'set_pair_secondary_write_lock'},
{'pair_info': """{"HEALTHSTATUS": "1",
"SECRESDATASTATUS": "2",
"ISPRIMARY": "false",
"SECRESACCESS": "1",
"RUNNINGSTATUS": "33"}""",
'assert_method': 'sync_replication_pair'},)
@ddt.unpack
def test_update_replica_state_success(self, pair_info, assert_method):
self.driver.plugin.private_storage.update(
'fake_share_id',
{'replica_pair_id': 'fake_pair_id'})
helper_method = getattr(self.driver.plugin.helper, assert_method)
mocker = self.mock_object(self.driver.plugin.helper,
assert_method,
mock.Mock(wraps=helper_method))
self.driver.plugin.helper.custom_results[
'/REPLICATIONPAIR/fake_pair_id'] = {
"GET": """{"error":{"code":0},
"data":%s}""" % pair_info}
self.driver.update_replica_state(
self._context,
[self.active_replica, self.new_replica],
self.new_replica,
[], [], None)
mocker.assert_called_with('fake_pair_id')
@ddt.data({'pair_info': """{"HEALTHSTATUS": "1",
"SECRESDATASTATUS": "2",
"ISPRIMARY": "true",
"SECRESACCESS": "1",
"RUNNINGSTATUS": "1"}""",
'assert_method': 'switch_replication_pair',
'error_url': '/REPLICATIONPAIR/switch'},
{'pair_info': """{"HEALTHSTATUS": "1",
"SECRESDATASTATUS": "2",
"ISPRIMARY": "false",
"SECRESACCESS": "3",
"RUNNINGSTATUS": "1"}""",
'assert_method': 'set_pair_secondary_write_lock',
'error_url': '/REPLICATIONPAIR/SET_SECODARY_WRITE_LOCK'},
{'pair_info': """{"HEALTHSTATUS": "1",
"SECRESDATASTATUS": "2",
"ISPRIMARY": "false",
"SECRESACCESS": "1",
"RUNNINGSTATUS": "26"}""",
'assert_method': 'sync_replication_pair',
'error_url': '/REPLICATIONPAIR/sync'},)
@ddt.unpack
def test_update_replica_state_with_exception_ignore(
self, pair_info, assert_method, error_url):
self.driver.plugin.private_storage.update(
'fake_share_id',
{'replica_pair_id': 'fake_pair_id'})
helper_method = getattr(self.driver.plugin.helper, assert_method)
mocker = self.mock_object(self.driver.plugin.helper,
assert_method,
mock.Mock(wraps=helper_method))
self.driver.plugin.helper.custom_results[
error_url] = """{"error":{"code":-403}}"""
self.driver.plugin.helper.custom_results[
'/REPLICATIONPAIR/fake_pair_id'] = {
"GET": """{"error":{"code":0},
"data":%s}""" % pair_info}
self.driver.update_replica_state(
self._context,
[self.active_replica, self.new_replica],
self.new_replica,
[], [], None)
mocker.assert_called_once_with('fake_pair_id')
def test_update_replica_state_with_replication_abnormal(self):
self.driver.plugin.private_storage.update(
'fake_share_id',
{'replica_pair_id': 'fake_pair_id'})
self.driver.plugin.helper.custom_results[
'/REPLICATIONPAIR/fake_pair_id'] = {
"GET": """{"error":{"code":0},
"data":{"HEALTHSTATUS": "2"}}"""}
result = self.driver.update_replica_state(
self._context,
[self.active_replica, self.new_replica],
self.new_replica,
[], [], None)
self.assertEqual(common_constants.STATUS_ERROR, result)
def test_update_replica_state_with_no_pair_id(self):
result = self.driver.update_replica_state(
self._context,
[self.active_replica, self.new_replica],
self.new_replica,
[], [], None)
self.assertEqual(common_constants.STATUS_ERROR, result)
@ddt.data('true', 'false')
def test_promote_replica_success(self, is_primary):
self.driver.plugin.private_storage.update(
'fake_share_id',
{'replica_pair_id': 'fake_pair_id'})
self.driver.plugin.helper.custom_results[
'/REPLICATIONPAIR/fake_pair_id'] = {
"GET": """{"error": {"code": 0},
"data": {"HEALTHSTATUS": "1",
"RUNNINGSTATUS": "1",
"SECRESDATASTATUS": "2",
"ISPRIMARY": "%s"}}""" % is_primary}
result = self.driver.promote_replica(
self._context,
[self.active_replica, self.new_replica],
self.new_replica,
[], None)
expected = [
{'id': self.new_replica['id'],
'replica_state': common_constants.REPLICA_STATE_ACTIVE,
'access_rules_status': common_constants.STATUS_ACTIVE},
{'id': self.active_replica['id'],
'replica_state': common_constants.REPLICA_STATE_IN_SYNC,
'access_rules_status': common_constants.STATUS_OUT_OF_SYNC},
]
self.assertEqual(expected, result)
@ddt.data({'mock_method': 'update_access',
'new_access_status': common_constants.STATUS_OUT_OF_SYNC,
'old_access_status': common_constants.STATUS_OUT_OF_SYNC},
{'mock_method': 'clear_access',
'new_access_status': common_constants.STATUS_OUT_OF_SYNC,
'old_access_status': common_constants.STATUS_ACTIVE},)
@ddt.unpack
def test_promote_replica_with_access_update_error(
self, mock_method, new_access_status, old_access_status):
self.driver.plugin.private_storage.update(
'fake_share_id',
{'replica_pair_id': 'fake_pair_id'})
self.driver.plugin.helper.custom_results[
'/REPLICATIONPAIR/fake_pair_id'] = {
"GET": """{"error": {"code": 0},
"data": {"HEALTHSTATUS": "1",
"RUNNINGSTATUS": "1",
"SECRESDATASTATUS": "2",
"ISPRIMARY": "false"}}"""}
mocker = self.mock_object(self.driver.plugin,
mock_method,
mock.Mock(side_effect=Exception('err')))
result = self.driver.promote_replica(
self._context,
[self.active_replica, self.new_replica],
self.new_replica,
[], None)
expected = [
{'id': self.new_replica['id'],
'replica_state': common_constants.REPLICA_STATE_ACTIVE,
'access_rules_status': new_access_status},
{'id': self.active_replica['id'],
'replica_state': common_constants.REPLICA_STATE_IN_SYNC,
'access_rules_status': old_access_status},
]
self.assertEqual(expected, result)
mocker.assert_called()
@ddt.data({'error_url': '/REPLICATIONPAIR/split',
'assert_method': 'split_replication_pair'},
{'error_url': '/REPLICATIONPAIR/switch',
'assert_method': 'switch_replication_pair'},
{'error_url': '/REPLICATIONPAIR/SET_SECODARY_WRITE_LOCK',
'assert_method': 'set_pair_secondary_write_lock'},
{'error_url': '/REPLICATIONPAIR/sync',
'assert_method': 'sync_replication_pair'},)
@ddt.unpack
def test_promote_replica_with_error_ignore(self, error_url, assert_method):
self.driver.plugin.private_storage.update(
'fake_share_id',
{'replica_pair_id': 'fake_pair_id'})
helper_method = getattr(self.driver.plugin.helper, assert_method)
mocker = self.mock_object(self.driver.plugin.helper,
assert_method,
mock.Mock(wraps=helper_method))
self.driver.plugin.helper.custom_results[
error_url] = '{"error":{"code":-403}}'
fake_pair_infos = [{'ISPRIMARY': 'False',
'HEALTHSTATUS': '1',
'RUNNINGSTATUS': '1',
'SECRESDATASTATUS': '1'},
{'HEALTHSTATUS': '2'}]
self.mock_object(self.driver.plugin.replica_mgr,
'_get_replication_pair_info',
mock.Mock(side_effect=fake_pair_infos))
result = self.driver.promote_replica(
self._context,
[self.active_replica, self.new_replica],
self.new_replica,
[], None)
expected = [
{'id': self.new_replica['id'],
'replica_state': common_constants.REPLICA_STATE_ACTIVE,
'access_rules_status': common_constants.STATUS_ACTIVE},
{'id': self.active_replica['id'],
'replica_state': common_constants.STATUS_ERROR,
'access_rules_status': common_constants.STATUS_OUT_OF_SYNC},
]
self.assertEqual(expected, result)
mocker.assert_called_once_with('fake_pair_id')
@ddt.data({'error_url': '/REPLICATIONPAIR/fake_pair_id',
'url_result': """{"error":{"code":0},
"data":{"HEALTHSTATUS": "1",
"ISPRIMARY": "false",
"RUNNINGSTATUS": "1",
"SECRESDATASTATUS": "5"}}""",
'expected_exception': exception.ReplicationException},
{'error_url': '/REPLICATIONPAIR/CANCEL_SECODARY_WRITE_LOCK',
'url_result': """{"error":{"code":-403}}""",
'expected_exception': exception.InvalidShare},)
@ddt.unpack
def test_promote_replica_fail(self, error_url, url_result,
expected_exception):
self.driver.plugin.private_storage.update(
'fake_share_id',
{'replica_pair_id': 'fake_pair_id'})
self.driver.plugin.helper.custom_results[error_url] = url_result
self.assertRaises(expected_exception,
self.driver.promote_replica,
self._context,
[self.active_replica, self.new_replica],
self.new_replica,
[], None)
def test_promote_replica_with_no_pair_id(self):
self.assertRaises(exception.ReplicationException,
self.driver.promote_replica,
self._context,
[self.active_replica, self.new_replica],
self.new_replica,
[], None)
@ddt.data({'url': '/REPLICATIONPAIR/split',
'url_result': '{"error":{"code":-403}}'},
{'url': '/REPLICATIONPAIR/fake_pair_id',
'url_result': '{"error":{"code":1077937923}}'},
{'url': '/REPLICATIONPAIR/fake_pair_id',
'url_result': '{"error":{"code":0}}'},)
@ddt.unpack
def test_delete_replica_success(self, url, url_result):
self.driver.plugin.private_storage.update(
'fake_share_id',
{'replica_pair_id': 'fake_pair_id'})
self.driver.plugin.helper.custom_results['/filesystem/8'] = {
"DELETE": '{"error":{"code":0}}'}
self.driver.plugin.helper.custom_results[url] = url_result
self.driver.delete_replica(self._context,
[self.active_replica, self.new_replica],
[], self.new_replica, None)
self.assertIsNone(self.driver.plugin.private_storage.get(
'fake_share_id', 'replica_pair_id'))
@ddt.data({'url': '/REPLICATIONPAIR/fake_pair_id',
'expected': 'fake_pair_id'},
{'url': '/filesystem/8',
'expected': None},)
@ddt.unpack
def test_delete_replica_fail(self, url, expected):
self.driver.plugin.private_storage.update(
'fake_share_id',
{'replica_pair_id': 'fake_pair_id'})
self.driver.plugin.helper.custom_results[url] = {
"DELETE": '{"error":{"code":-403}}'}
self.assertRaises(exception.InvalidShare,
self.driver.delete_replica,
self._context,
[self.active_replica, self.new_replica],
[], self.new_replica, None)
self.assertEqual(expected,
self.driver.plugin.private_storage.get(
'fake_share_id', 'replica_pair_id'))
def test_delete_replica_with_no_pair_id(self):
self.driver.plugin.helper.custom_results['/filesystem/8'] = {
"DELETE": '{"error":{"code":0}}'}
self.driver.delete_replica(self._context,
[self.active_replica, self.new_replica],
[], self.new_replica, None)
@ddt.data({'pair_info': """{"HEALTHSTATUS": "2"}""",
'expected_state': common_constants.STATUS_ERROR},
{'pair_info': """{"HEALTHSTATUS": "1",
"RUNNINGSTATUS": "26"}""",
'expected_state': common_constants.REPLICA_STATE_OUT_OF_SYNC},
{'pair_info': """{"HEALTHSTATUS": "1",
"RUNNINGSTATUS": "33"}""",
'expected_state': common_constants.REPLICA_STATE_OUT_OF_SYNC},
{'pair_info': """{"HEALTHSTATUS": "1",
"RUNNINGSTATUS": "34"}""",
'expected_state': common_constants.STATUS_ERROR},
{'pair_info': """{"HEALTHSTATUS": "1",
"RUNNINGSTATUS": "35"}""",
'expected_state': common_constants.STATUS_ERROR},
{'pair_info': """{"HEALTHSTATUS": "1",
"SECRESDATASTATUS": "1",
"RUNNINGSTATUS": "1"}""",
'expected_state': common_constants.REPLICA_STATE_IN_SYNC},
{'pair_info': """{"HEALTHSTATUS": "1",
"SECRESDATASTATUS": "2",
"RUNNINGSTATUS": "1"}""",
'expected_state': common_constants.REPLICA_STATE_IN_SYNC},
{'pair_info': """{"HEALTHSTATUS": "1",
"SECRESDATASTATUS": "5",
"RUNNINGSTATUS": "1"}""",
'expected_state': common_constants.REPLICA_STATE_OUT_OF_SYNC})
@ddt.unpack
def test_get_replica_state(self, pair_info, expected_state):
self.driver.plugin.helper.custom_results[
'/REPLICATIONPAIR/fake_pair_id'] = {
"GET": """{"error":{"code":0},
"data":%s}""" % pair_info}
result_state = self.driver.plugin.replica_mgr.get_replica_state(
'fake_pair_id')
self.assertEqual(expected_state, result_state)

View File

@ -16,6 +16,7 @@
"""Tests For miscellaneous util methods used with share."""
from manila.common import constants
from manila.share import utils as share_utils
from manila import test
@ -140,3 +141,21 @@ class ShareUtilsTestCase(test.TestCase):
expected = None
self.assertEqual(expected,
share_utils.append_host(host, pool))
def test_get_active_replica_success(self):
replica_list = [{'id': '123456',
'replica_state': constants.REPLICA_STATE_IN_SYNC},
{'id': '654321',
'replica_state': constants.REPLICA_STATE_ACTIVE},
]
replica = share_utils.get_active_replica(replica_list)
self.assertEqual('654321', replica['id'])
def test_get_active_replica_not_exist(self):
replica_list = [{'id': '123456',
'replica_state': constants.REPLICA_STATE_IN_SYNC},
{'id': '654321',
'replica_state': constants.REPLICA_STATE_OUT_OF_SYNC},
]
replica = share_utils.get_active_replica(replica_list)
self.assertIsNone(replica)

View File

@ -168,6 +168,11 @@ ShareGroup = [
help="Defines whether to run replication tests or not. "
"Enable this feature if the driver is configured "
"for replication."),
cfg.BoolOpt("run_multiple_share_replicas_tests",
default=True,
help="Defines whether to run multiple replicas creation test "
"or not. Enable this if the driver can create more than "
"one replica for a share."),
cfg.BoolOpt("run_migration_tests",
default=False,
help="Enable or disable migration tests."),

View File

@ -175,6 +175,8 @@ class ReplicationTest(base.BaseSharesMixedTest):
self.delete_share_replica(share_replica["id"])
@test.attr(type=[base.TAG_POSITIVE, base.TAG_BACKEND])
@testtools.skipUnless(CONF.share.run_multiple_share_replicas_tests,
'Multiple share replicas tests are disabled.')
def test_add_multiple_share_replicas(self):
rep_domain, pools = self.get_pools_for_replication_domain()
if len(pools) < 3:

View File

@ -0,0 +1,10 @@
---
features:
- Huawei driver now supports replication. It reports a replication type
'dr'(Disaster Recovery), so "replication_type=dr" can be used in the
share type extra specs to schedule shares to the Huawei driver when
configured for replication.
- The huawei driver now supports turning off snapshot support.
issues:
- When snapshot support is turned on in the Huawei driver, replication
cannot be used.