Add Share Migration feature

Share Migration allows a share to be migrated from
one host#pool to another host#pool through the
"manila migrate <share> <host#pool>" command. It first
calls the driver to perform it in an optimized way if
possible. If the driver returns that it did not migrate,
it performs a generic migration.

A new field has been added to "shares" table: task_state,
which tracks migration status.

For driver migration, the method migrate_share in driver
base class should be overridden.

For generic migration, drivers may use new config options
to achieve the necessary configuration:
- migration_mounting_backend_ip: If backend has additional
exports IP for admin network, specify it here.
- migration_data_copy_node_ip: IP of entity performing
migration between backends, such as manila node or
data copy service node. This may not apply for
DHSS = true drivers.
- migration_protocol_mount_command: specify mount command
with protocol and additional parameters. Advisable to restrict
protocols per backend. Defaults to "mount -t <share_proto>".

If additional customization is needed, drivers may override
certain methods:
- _mount_share: return the mount command.
- _umount_share: return the umount command.
- _get_access_rule_for_data_copy: return an access rule with
the IP address which will allow the manila node or data copy node
to mount the share after added permission through
allow-access API command.

Change-Id: I8dde892cb7c0180b2b56d8c7d680dfe2320c2ec7
Implements: blueprint share-migration
This commit is contained in:
Rodrigo Barbieri 2015-05-13 11:19:48 -03:00
parent fa8faad187
commit 0524ab8fc2
31 changed files with 2962 additions and 42 deletions

View File

@ -25,6 +25,7 @@
"share:get_share_metadata": "rule:default",
"share:delete_share_metadata": "rule:default",
"share:update_share_metadata": "rule:default",
"share:migrate": "rule:admin_api",
"share_instance:index": "rule:admin_api",
"share_instance:show": "rule:admin_api",

View File

@ -25,6 +25,7 @@ from manila import db
from manila import exception
from manila import share
LOG = log.getLogger(__name__)
@ -114,6 +115,9 @@ class ShareAdminController(AdminController):
def _delete(self, *args, **kwargs):
return self.share_api.delete(*args, **kwargs)
def _migrate(self, *args, **kwargs):
return self.share_api.migrate_share(*args, **kwargs)
class ShareInstancesAdminController(AdminController):
"""AdminController for Share instances."""

View File

@ -50,6 +50,7 @@ REST_API_VERSION_HISTORY = """
* 1.3 - Snapshots become optional feature.
* 1.4 - Share instances admin API
* 1.5 - Consistency Group support
* 1.6 - Share Migration admin API
"""
@ -57,7 +58,7 @@ REST_API_VERSION_HISTORY = """
# The default api version request is defined to be the
# the minimum version of the API supported.
_MIN_API_VERSION = "1.0"
_MAX_API_VERSION = "1.5"
_MAX_API_VERSION = "1.6"
DEFAULT_API_VERSION = _MIN_API_VERSION

View File

@ -50,3 +50,7 @@ user documentation.
Consistency groups support. /consistency-groups and /cgsnapshots are
implemented. AdminActions 'os-force_delete and' 'os-reset_status' have been
updated for both new resources.
1.6
---
Share Migration admin API.

View File

@ -18,6 +18,7 @@
import ast
from oslo_log import log
from oslo_utils import strutils
from oslo_utils import uuidutils
import six
import webob
@ -88,6 +89,31 @@ class ShareController(wsgi.Controller):
return webob.Response(status_int=202)
@wsgi.Controller.api_version("1.6", None, True)
@wsgi.action("os-migrate_share")
def migrate_share(self, req, id, body):
"""Migrate a share to the specified host."""
context = req.environ['manila.context']
try:
share = self.share_api.get(context, id)
except exception.NotFound:
msg = _("Share %s not found.") % id
raise exc.HTTPNotFound(explanation=msg)
params = body['os-migrate_share']
try:
host = params['host']
except KeyError:
raise exc.HTTPBadRequest(explanation=_("Must specify 'host'"))
force_host_copy = params.get('force_host_copy', False)
try:
force_host_copy = strutils.bool_from_string(force_host_copy,
strict=True)
except ValueError:
raise exc.HTTPBadRequest(
explanation=_("Bad value for 'force_host_copy'"))
self.share_api.migrate_share(context, share, host, force_host_copy)
return webob.Response(status_int=202)
def index(self, req):
"""Returns a summary list of shares."""
return self._get_shares(req, is_detail=False)

View File

@ -62,6 +62,7 @@ class ViewBuilder(common.ViewBuilder):
'availability_zone': share.get('availability_zone'),
'created_at': share.get('created_at'),
'status': share.get('status'),
'task_state': share.get('task_state'),
'name': share.get('display_name'),
'description': share.get('display_description'),
'project_id': share.get('project_id'),

View File

@ -34,6 +34,17 @@ STATUS_SHRINKING_ERROR = 'shrinking_error'
STATUS_SHRINKING_POSSIBLE_DATA_LOSS_ERROR = (
'shrinking_possible_data_loss_error'
)
STATUS_TASK_STATE_MIGRATION_STARTING = 'migration_starting'
STATUS_TASK_STATE_MIGRATION_ERROR = 'migration_error'
STATUS_TASK_STATE_MIGRATION_SUCCESS = 'migration_success'
STATUS_TASK_STATE_MIGRATION_COMPLETING = 'migration_completing'
STATUS_TASK_STATE_MIGRATION_MIGRATING = 'migrating'
BUSY_TASK_STATES = (
STATUS_TASK_STATE_MIGRATION_COMPLETING,
STATUS_TASK_STATE_MIGRATION_MIGRATING,
STATUS_TASK_STATE_MIGRATION_STARTING,
)
TRANSITIONAL_STATUSES = (
STATUS_CREATING, STATUS_DELETING,
@ -84,6 +95,14 @@ ACCESS_LEVELS = (
ACCESS_LEVEL_RO,
)
TASK_STATE_STATUSES = (
STATUS_TASK_STATE_MIGRATION_STARTING,
STATUS_TASK_STATE_MIGRATION_ERROR,
STATUS_TASK_STATE_MIGRATION_SUCCESS,
STATUS_TASK_STATE_MIGRATION_COMPLETING,
STATUS_TASK_STATE_MIGRATION_MIGRATING,
)
class ExtraSpecs(object):

View File

@ -0,0 +1,37 @@
# Copyright 2015 Hitachi Data Systems.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Add shares.task_state
Revision ID: 323840a08dc4
Revises: 3651e16d7c43
Create Date: 2015-04-30 07:58:45.175790
"""
# revision identifiers, used by Alembic.
revision = '323840a08dc4'
down_revision = '3651e16d7c43'
from alembic import op
import sqlalchemy as sa
def upgrade():
op.add_column('shares',
sa.Column('task_state', sa.String(255)))
def downgrade():
op.drop_column('shares', 'task_state')

View File

@ -351,7 +351,7 @@ def service_get_by_host_and_topic(context, host, topic):
filter_by(topic=topic).\
first()
if not result:
raise exception.ServiceNotFound(service_id=None)
raise exception.ServiceNotFound(service_id=host)
return result

View File

@ -198,8 +198,8 @@ class Share(BASE, ManilaBase):
@property
def export_location(self):
if len(self.export_locations) > 0:
return self.export_locations[0]
if len(self.instances) > 0:
return self.instance.export_location
@property
def export_locations(self):
@ -207,8 +207,9 @@ class Share(BASE, ManilaBase):
# replication functionality will be implemented.
all_export_locations = []
for instance in self.instances:
for export_location in instance.export_locations:
all_export_locations.append(export_location['path'])
if instance['status'] == constants.STATUS_AVAILABLE:
for export_location in instance.export_locations:
all_export_locations.append(export_location['path'])
return all_export_locations
@ -233,8 +234,18 @@ class Share(BASE, ManilaBase):
@property
def instance(self):
# NOTE(ganso): We prefer instances with AVAILABLE status,
# and we also prefer to show any other status than CREATING
result = None
if len(self.instances) > 0:
return self.instances[0]
for instance in self.instances:
if instance.status == constants.STATUS_AVAILABLE:
return instance
elif instance.status == constants.STATUS_CREATING:
result = instance
if result is None:
result = self.instances[0]
return result
id = Column(String(36), primary_key=True)
deleted = Column(String(36), default='False')
@ -255,7 +266,7 @@ class Share(BASE, ManilaBase):
nullable=True)
source_cgsnapshot_member_id = Column(String(36), nullable=True)
task_state = Column(String(255))
instances = orm.relationship(
"ShareInstance",
lazy='immediate',

View File

@ -228,6 +228,10 @@ class InvalidShareServer(Invalid):
message = _("Share server %(share_server_id)s is not valid.")
class ShareMigrationFailed(ManilaException):
message = _("Share migration failed: %(reason)s")
class ShareServerNotCreated(ManilaException):
message = _("Share server %(share_server_id)s failed on creation.")

View File

@ -109,3 +109,8 @@ class Scheduler(object):
def get_pools(self, context, filters):
"""Must override schedule method for scheduler to work."""
raise NotImplementedError(_("Must implement get_pools"))
def host_passes_filters(self, context, host, request_spec,
filter_properties):
"""Must override schedule method for migration to work."""
raise NotImplementedError(_("Must implement host_passes_filters"))

View File

@ -109,11 +109,9 @@ class FilterScheduler(driver.Scheduler):
snapshot_id=snapshot_id
)
def _schedule_share(self, context, request_spec, filter_properties=None):
"""Returns a list of hosts that meet the required specs.
def _format_filter_properties(self, context, filter_properties,
request_spec):
The list is ordered by their fitness.
"""
elevated = context.elevated()
share_properties = request_spec['share_properties']
@ -172,6 +170,18 @@ class FilterScheduler(driver.Scheduler):
self.populate_filter_properties_share(request_spec, filter_properties)
return filter_properties, share_properties
def _schedule_share(self, context, request_spec, filter_properties=None):
"""Returns a list of hosts that meet the required specs.
The list is ordered by their fitness.
"""
elevated = context.elevated()
filter_properties, share_properties = self._format_filter_properties(
context, filter_properties, request_spec)
# Find our local list of acceptable hosts by filtering and
# weighing our options. we virtually consume resources on
# it so subsequent selections can adjust accordingly.
@ -384,3 +394,23 @@ class FilterScheduler(driver.Scheduler):
if not weighed_hosts:
return None
return weighed_hosts[0].obj.host
def host_passes_filters(self, context, host, request_spec,
filter_properties):
elevated = context.elevated()
filter_properties, share_properties = self._format_filter_properties(
context, filter_properties, request_spec)
hosts = self.host_manager.get_all_host_states_share(elevated)
hosts = self.host_manager.get_filtered_hosts(hosts, filter_properties)
hosts = self.host_manager.get_weighed_hosts(hosts, filter_properties)
for tgt_host in hosts:
if tgt_host.obj.host == host:
return tgt_host.obj
msg = (_('Cannot place share %(id)s on %(host)s')
% {'id': request_spec['share_id'], 'host': host})
raise exception.NoValidHost(reason=msg)

View File

@ -23,6 +23,7 @@ from oslo_config import cfg
from oslo_log import log
from oslo_utils import excutils
from oslo_utils import importutils
import six
from manila.common import constants
from manila import context
@ -47,7 +48,7 @@ CONF.register_opt(scheduler_driver_opt)
class SchedulerManager(manager.Manager):
"""Chooses a host to create shares."""
RPC_API_VERSION = '1.3'
RPC_API_VERSION = '1.4'
def __init__(self, scheduler_driver=None, service_name=None,
*args, **kwargs):
@ -83,35 +84,69 @@ class SchedulerManager(manager.Manager):
self.driver.schedule_create_share(context, request_spec,
filter_properties)
except exception.NoValidHost as ex:
self._set_share_error_state_and_notify('create_share',
context, ex, request_spec)
self._set_share_state_and_notify('create_share',
{'status':
constants.STATUS_ERROR},
context, ex, request_spec)
except Exception as ex:
with excutils.save_and_reraise_exception():
self._set_share_error_state_and_notify('create_share',
context, ex,
request_spec)
self._set_share_state_and_notify('create_share',
{'status':
constants.STATUS_ERROR},
context, ex, request_spec)
def get_pools(self, context, filters=None):
"""Get active pools from the scheduler's cache."""
return self.driver.get_pools(context, filters)
def _set_share_error_state_and_notify(self, method, context, ex,
request_spec):
LOG.error(_LE("Failed to schedule_%(method)s: %(ex)s"),
{"method": method, "ex": ex})
def migrate_share_to_host(self, context, share_id, host,
force_host_copy, request_spec,
filter_properties=None):
"""Ensure that the host exists and can accept the share."""
def _migrate_share_set_error(self, context, ex, request_spec):
self._set_share_state_and_notify(
'migrate_share_to_host',
{'task_state': constants.STATUS_TASK_STATE_MIGRATION_ERROR},
context, ex, request_spec)
try:
tgt_host = self.driver.host_passes_filters(context, host,
request_spec,
filter_properties)
except exception.NoValidHost as ex:
_migrate_share_set_error(self, context, ex, request_spec)
except Exception as ex:
with excutils.save_and_reraise_exception():
_migrate_share_set_error(self, context, ex, request_spec)
else:
share_ref = db.share_get(context, share_id)
try:
share_rpcapi.ShareAPI().migrate_share(context,
share_ref, tgt_host,
force_host_copy)
except Exception as ex:
with excutils.save_and_reraise_exception():
_migrate_share_set_error(self, context, ex, request_spec)
def _set_share_state_and_notify(self, method, state, context, ex,
request_spec):
LOG.error(_LE("Failed to schedule %(method)s: %(ex)s"),
{"method": method, "ex": six.text_type(ex)})
share_state = {'status': 'error'}
properties = request_spec.get('share_properties', {})
share_id = request_spec.get('share_id', None)
if share_id:
db.share_update(context, share_id, share_state)
db.share_update(context, share_id, state)
payload = dict(request_spec=request_spec,
share_properties=properties,
share_id=share_id,
state=share_state,
state=state,
method=method,
reason=ex)

View File

@ -26,7 +26,7 @@ CONF = cfg.CONF
class SchedulerAPI(object):
'''Client side of the scheduler rpc API.
"""Client side of the scheduler rpc API.
API version history:
@ -35,15 +35,16 @@ class SchedulerAPI(object):
1.2 - Introduce Share Instances:
Replace create_share() - > create_share_instance()
1.3 - Add create_consistency_group method
'''
1.4 - Add migrate_share_to_host method
"""
RPC_API_VERSION = '1.3'
RPC_API_VERSION = '1.4'
def __init__(self):
super(SchedulerAPI, self).__init__()
target = messaging.Target(topic=CONF.scheduler_topic,
version=self.RPC_API_VERSION)
self.client = rpc.get_client(target, version_cap='1.3')
self.client = rpc.get_client(target, version_cap='1.4')
def create_share_instance(self, ctxt, request_spec=None,
filter_properties=None):
@ -84,3 +85,16 @@ class SchedulerAPI(object):
request_spec=request_spec_p,
filter_properties=filter_properties,
)
def migrate_share_to_host(self, ctxt, share_id, host,
force_host_copy=False, request_spec=None,
filter_properties=None):
cctxt = self.client.prepare(version='1.4')
request_spec_p = jsonutils.to_primitive(request_spec)
return cctxt.cast(ctxt, 'migrate_share_to_host',
share_id=share_id,
host=host,
force_host_copy=force_host_copy,
request_spec=request_spec_p,
filter_properties=filter_properties)

View File

@ -39,6 +39,9 @@ from manila import policy
from manila import quota
from manila.scheduler import rpcapi as scheduler_rpcapi
from manila.share import rpcapi as share_rpcapi
from manila.share import share_types
from manila.share import utils as share_utils
from manila import utils
share_api_opts = [
cfg.BoolOpt('use_scheduler_creating_share_from_snapshot',
@ -319,6 +322,8 @@ class API(base.Base):
self.scheduler_rpcapi.create_share_instance(
context, request_spec=request_spec, filter_properties={})
return share_instance
def manage(self, context, share_data, driver_options):
policy.check_policy(context, 'share', 'manage')
@ -360,6 +365,13 @@ class API(base.Base):
def unmanage(self, context, share):
policy.check_policy(context, 'share', 'unmanage')
# Make sure share is not part of a migration
if share['task_state'] in constants.BUSY_TASK_STATES:
msg = _("Share %s is busy as part of an active "
"task.") % share['id']
LOG.error(msg)
raise exception.InvalidShare(reason=msg)
update_data = {'status': constants.STATUS_UNMANAGING,
'terminated_at': timeutils.utcnow()}
share_ref = self.db.share_update(context, share['id'], update_data)
@ -381,7 +393,8 @@ class API(base.Base):
share_id = share['id']
statuses = (constants.STATUS_AVAILABLE, constants.STATUS_ERROR)
statuses = (constants.STATUS_AVAILABLE, constants.STATUS_ERROR,
constants.STATUS_INACTIVE)
if not (force or share['status'] in statuses):
msg = _("Share status must be one of %(statuses)s") % {
"statuses": statuses}
@ -399,6 +412,15 @@ class API(base.Base):
cgsnapshot_members_count)
raise exception.InvalidShare(reason=msg)
# Make sure share is not part of a migration
if share['task_state'] not in (
None, constants.STATUS_TASK_STATE_MIGRATION_ERROR,
constants.STATUS_TASK_STATE_MIGRATION_SUCCESS):
msg = _("Share %s is busy as part of an active "
"task.") % share['id']
LOG.error(msg)
raise exception.InvalidShare(reason=msg)
try:
reservations = QUOTAS.reserve(context,
project_id=project_id,
@ -423,7 +445,8 @@ class API(base.Base):
def delete_instance(self, context, share_instance, force=False):
policy.check_policy(context, 'share', 'delete')
statuses = (constants.STATUS_AVAILABLE, constants.STATUS_ERROR)
statuses = (constants.STATUS_AVAILABLE, constants.STATUS_ERROR,
constants.STATUS_INACTIVE)
if not (force or share_instance['status'] in statuses):
msg = _("Share instance status must be one of %(statuses)s") % {
"statuses": statuses}
@ -526,6 +549,78 @@ class API(base.Base):
self.share_rpcapi.create_snapshot(context, share, snapshot)
return snapshot
@policy.wrap_check_policy('share')
def migrate_share(self, context, share, host, force_host_copy):
"""Migrates share to a new host."""
policy.check_policy(context, 'share', 'migrate')
share_instance = share.instance
# We only handle "available" share for now
if share_instance['status'] != constants.STATUS_AVAILABLE:
msg = _('Share instance %(instance_id)s status must be available, '
'but current status is: %(instance_status)s.') % {
'instance_id': share_instance['id'],
'instance_status': share_instance['status']}
LOG.error(msg)
raise exception.InvalidShare(reason=msg)
# Make sure share is not part of a migration
if share['task_state'] in constants.BUSY_TASK_STATES:
msg = _("Share %s is busy as part of an active "
"task.") % share['id']
LOG.error(msg)
raise exception.InvalidShare(reason=msg)
# Make sure the destination host is different than the current one
if host == share_instance['host']:
msg = _('Destination host %(dest_host)s must be different '
'than the current host %(src_host)s.') % {
'dest_host': host,
'src_host': share_instance['host']}
LOG.error(msg)
raise exception.InvalidHost(reason=msg)
# We only handle shares without snapshots for now
snaps = self.db.share_snapshot_get_all_for_share(context, share['id'])
if snaps:
msg = _("Share %s must not have snapshots.") % share['id']
LOG.error(msg)
raise exception.InvalidShare(reason=msg)
# Make sure the host is in the list of available hosts
utils.validate_service_host(context, share_utils.extract_host(host))
# NOTE(ganso): there is the possibility of an error between here and
# manager code, which will cause the share to be stuck in
# MIGRATION_STARTING status. According to Liberty Midcycle discussion,
# this kind of scenario should not be cleaned up, the administrator
# should be issued to clear this status before a new migration request
# is made
self.update(
context, share,
{'task_state': constants.STATUS_TASK_STATE_MIGRATION_STARTING})
share_type = {}
share_type_id = share['share_type_id']
if share_type_id:
share_type = share_types.get_share_type(context, share_type_id)
request_spec = {'share_properties': share,
'share_instance_properties': share_instance.to_dict(),
'share_type': share_type,
'share_id': share['id']}
try:
self.scheduler_rpcapi.migrate_share_to_host(context, share['id'],
host, force_host_copy,
request_spec)
except Exception:
self.update(
context, share,
{'task_state': constants.STATUS_TASK_STATE_MIGRATION_ERROR})
raise
@policy.wrap_check_policy('share')
def delete_snapshot(self, context, snapshot, force=False):
statuses = (constants.STATUS_AVAILABLE, constants.STATUS_ERROR)
@ -697,7 +792,6 @@ class API(base.Base):
for share_instance in share.instances:
self.allow_access_to_instance(ctx, share_instance, access)
return access
def allow_access_to_instance(self, context, share_instance, access):
@ -726,7 +820,13 @@ class API(base.Base):
self.db.share_access_delete(ctx, access["id"])
elif access['state'] == constants.STATUS_ACTIVE:
for share_instance in share.instances:
self.deny_access_to_instance(ctx, share_instance, access)
try:
self.deny_access_to_instance(ctx, share_instance, access)
except exception.NotFound:
LOG.warn(_LW("Access rule %(access_id)s not found "
"for instance %(instance_id)s.") % {
'access_id': access['id'],
'instance_id': share_instance['id']})
else:
msg = _("Access policy should be %(active)s or in %(error)s "
"state") % {"active": constants.STATUS_ACTIVE,

View File

@ -18,14 +18,17 @@ Drivers for shares.
"""
import re
import time
from oslo_config import cfg
from oslo_log import log
import six
from manila import exception
from manila.i18n import _LE
from manila.i18n import _, _LE
from manila import network
from manila.share import utils as share_utils
from manila import utils
LOG = log.getLogger(__name__)
@ -72,6 +75,47 @@ share_opts = [
'total physical capacity. A ratio of 1.0 means '
'provisioned capacity cannot exceed the total physical '
'capacity. A ratio lower than 1.0 is invalid.'),
cfg.StrOpt(
'migration_tmp_location',
default='/tmp/',
help="Temporary path to create and mount shares during migration."),
cfg.ListOpt(
'migration_ignore_files',
default=['lost+found'],
help="List of files and folders to be ignored when migrating shares. "
"Items should be names (not including any path)."),
cfg.IntOpt(
'migration_wait_access_rules_timeout',
default=90,
help="Time to wait for access rules to be allowed/denied on backends "
"when migrating shares using generic approach (seconds)."),
cfg.IntOpt(
'migration_create_delete_share_timeout',
default=300,
help='Timeout for creating and deleting share instances '
'when performing share migration (seconds).'),
cfg.StrOpt(
'migration_mounting_backend_ip',
default=None,
help="Backend IP in admin network to use for mounting "
"shares during migration."),
cfg.StrOpt(
'migration_data_copy_node_ip',
default=None,
help="The IP of the node responsible for copying data during "
"migration, such as the data copy service node, reachable by "
"the backend."),
cfg.StrOpt(
'migration_protocol_mount_command',
default=None,
help="The command for mounting shares for this backend. Must specify"
"the executable and all necessary parameters for the protocol "
"supported. It is advisable to separate protocols per backend."),
cfg.BoolOpt(
'migration_readonly_support',
default=True,
help="Specify whether read only access mode is supported in this"
"backend."),
]
ssh_opts = [
@ -238,6 +282,260 @@ class ShareDriver(object):
{'actual': self.driver_handles_share_servers,
'allowed': driver_handles_share_servers})
def migrate_share(self, context, share_ref, host,
dest_driver_migration_info):
"""Is called to perform driver migration.
Driver should implement this method if willing to perform migration
in an optimized way, useful for when driver understands destination
backend.
:param context: The 'context.RequestContext' object for the request.
:param share_ref: Reference to the share being migrated.
:param host: Destination host and its capabilities.
:param dest_driver_migration_info: Migration information provided by
destination host.
:returns: Boolean value indicating if driver migration succeeded.
:returns: Dictionary containing a model update.
"""
return None, None
def get_driver_migration_info(self, context, share_instance, share_server):
"""Is called to provide necessary driver migration logic."""
return None
def get_migration_info(self, context, share_instance, share_server):
"""Is called to provide necessary generic migration logic."""
mount_cmd = self._get_mount_command(context, share_instance,
share_server)
umount_cmd = self._get_unmount_command(context, share_instance,
share_server)
access = self._get_access_rule_for_data_copy(
context, share_instance, share_server)
return {'mount': mount_cmd,
'umount': umount_cmd,
'access': access}
def _get_mount_command(self, context, share_instance, share_server):
"""Is called to delegate mounting share logic."""
mount_cmd = self._get_mount_command_protocol(share_instance,
share_server)
mount_ip = self._get_mount_ip(share_instance, share_server)
mount_cmd.append(mount_ip)
mount_path = self.configuration.safe_get(
'migration_tmp_location') + share_instance['id']
mount_cmd.append(mount_path)
return mount_cmd
def _get_mount_command_protocol(self, share_instance, share_server):
mount_cmd = self.configuration.safe_get(
'migration_protocol_mount_command')
if mount_cmd:
return mount_cmd.split()
else:
return ['mount', '-t', share_instance['share_proto'].lower()]
def _get_mount_ip(self, share_instance, share_server):
# Note(ganso): DHSS = true drivers may need to override this method
# and use information saved in share_server structure.
mount_ip = self.configuration.safe_get('migration_mounting_backend_ip')
old_ip = share_instance['export_locations'][0]['path']
if mount_ip:
# NOTE(ganso): Does not currently work with hostnames and ipv6.
p = re.compile("\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}")
new_ip = p.sub(mount_ip, old_ip)
return new_ip
else:
return old_ip
def _get_unmount_command(self, context, share_instance, share_server):
return ['umount',
self.configuration.safe_get('migration_tmp_location')
+ share_instance['id']]
def _get_access_rule_for_data_copy(
self, context, share_instance, share_server):
"""Is called to obtain access rule so data copy node can mount."""
# Note(ganso): The current method implementation is intended to work
# with Data Copy Service approach. If Manila Node is used for copying,
# then DHSS = true drivers may need to override this method.
service_ip = self.configuration.safe_get('migration_data_copy_node_ip')
return {'access_type': 'ip',
'access_level': 'rw',
'access_to': service_ip}
def copy_share_data(self, context, helper, share, share_instance,
share_server, new_share_instance, new_share_server,
migration_info_src, migration_info_dest):
# NOTE(ganso): This method is here because it is debatable if it can
# be overridden by a driver or not. Personally I think it should not,
# else it would be possible to lose compatibility with generic
# migration between backends, but allows the driver to use it on its
# own implementation if it wants to.
migrated = False
mount_path = self.configuration.safe_get('migration_tmp_location')
src_access = migration_info_src['access']
dest_access = migration_info_dest['access']
if None in (src_access['access_to'], dest_access['access_to']):
msg = _("Access rules not appropriate for mounting share instances"
" for migration of share %(share_id)s,"
" source share access: %(src_ip)s, destination share"
" access: %(dest_ip)s. Aborting.") % {
'src_ip': src_access['access_to'],
'dest_ip': dest_access['access_to'],
'share_id': share['id']}
raise exception.ShareMigrationFailed(reason=msg)
# NOTE(ganso): Removing any previously conflicting access rules, which
# would cause the following access_allow to fail for one instance.
helper.deny_migration_access(None, src_access, False)
helper.deny_migration_access(None, dest_access, False)
# NOTE(ganso): I would rather allow access to instances separately,
# but I require an access_id since it is a new access rule and
# destination manager must receive an access_id. I can either move
# this code to manager code so I can create the rule in DB manually,
# or ignore duplicate access rule errors for some specific scenarios.
try:
src_access_ref = helper.allow_migration_access(src_access)
except Exception as e:
LOG.error(_LE("Share migration failed attempting to allow "
"access of %(access_to)s to share "
"instance %(instance_id)s.") % {
'access_to': src_access['access_to'],
'instance_id': share_instance['id']})
msg = six.text_type(e)
LOG.exception(msg)
raise exception.ShareMigrationFailed(reason=msg)
try:
dest_access_ref = helper.allow_migration_access(dest_access)
except Exception as e:
LOG.error(_LE("Share migration failed attempting to allow "
"access of %(access_to)s to share "
"instance %(instance_id)s.") % {
'access_to': dest_access['access_to'],
'instance_id': new_share_instance['id']})
msg = six.text_type(e)
LOG.exception(msg)
helper.cleanup_migration_access(src_access_ref, src_access)
raise exception.ShareMigrationFailed(reason=msg)
# NOTE(ganso): From here we have the possibility of not cleaning
# anything when facing an error. At this moment, we have the
# destination instance in "inactive" state, while we are performing
# operations on the source instance. I think it is best to not clean
# the instance, leave it in "inactive" state, but try to clean
# temporary access rules, mounts, folders, etc, since no additional
# harm is done.
def _mount_for_migration(migration_info):
try:
utils.execute(*migration_info['mount'], run_as_root=True)
except Exception:
LOG.error(_LE("Failed to mount temporary folder for "
"migration of share instance "
"%(share_instance_id)s "
"to %(new_share_instance_id)s") % {
'share_instance_id': share_instance['id'],
'new_share_instance_id': new_share_instance['id']})
helper.cleanup_migration_access(
src_access_ref, src_access)
helper.cleanup_migration_access(
dest_access_ref, dest_access)
raise
utils.execute('mkdir', '-p',
''.join((mount_path, share_instance['id'])))
utils.execute('mkdir', '-p',
''.join((mount_path, new_share_instance['id'])))
# NOTE(ganso): mkdir command sometimes returns faster than it
# actually runs, so we better sleep for 1 second.
time.sleep(1)
try:
_mount_for_migration(migration_info_src)
except Exception as e:
LOG.error(_LE("Share migration failed attempting to mount "
"share instance %s.") % share_instance['id'])
msg = six.text_type(e)
LOG.exception(msg)
helper.cleanup_temp_folder(share_instance, mount_path)
helper.cleanup_temp_folder(new_share_instance, mount_path)
raise exception.ShareMigrationFailed(reason=msg)
try:
_mount_for_migration(migration_info_dest)
except Exception as e:
LOG.error(_LE("Share migration failed attempting to mount "
"share instance %s.") % new_share_instance['id'])
msg = six.text_type(e)
LOG.exception(msg)
helper.cleanup_unmount_temp_folder(share_instance,
migration_info_src)
helper.cleanup_temp_folder(share_instance, mount_path)
helper.cleanup_temp_folder(new_share_instance, mount_path)
raise exception.ShareMigrationFailed(reason=msg)
try:
ignore_list = self.configuration.safe_get('migration_ignore_files')
copy = share_utils.Copy(mount_path + share_instance['id'],
mount_path + new_share_instance['id'],
ignore_list)
copy.run()
if copy.get_progress()['total_progress'] == 100:
migrated = True
except Exception as e:
LOG.exception(six.text_type(e))
LOG.error(_LE("Failed to copy files for "
"migration of share instance %(share_instance_id)s "
"to %(new_share_instance_id)s") % {
'share_instance_id': share_instance['id'],
'new_share_instance_id': new_share_instance['id']})
# NOTE(ganso): For some reason I frequently get AMQP errors after
# copying finishes, which seems like is the service taking too long to
# copy while not replying heartbeat messages, so AMQP closes the
# socket. There is no impact, it just shows a big trace and AMQP
# reconnects after, although I would like to prevent this situation
# without the use of additional threads. Suggestions welcome.
utils.execute(*migration_info_src['umount'], run_as_root=True)
utils.execute(*migration_info_dest['umount'], run_as_root=True)
utils.execute('rmdir', ''.join((mount_path, share_instance['id'])),
check_exit_code=False)
utils.execute('rmdir', ''.join((mount_path, new_share_instance['id'])),
check_exit_code=False)
helper.deny_migration_access(src_access_ref, src_access)
helper.deny_migration_access(dest_access_ref, dest_access)
if not migrated:
msg = ("Copying from share instance %(instance_id)s "
"to %(new_instance_id)s did not succeed." % {
'instance_id': share_instance['id'],
'new_instance_id': new_share_instance['id']})
raise exception.ShareMigrationFailed(reason=msg)
LOG.debug("Copying completed in migration for share %s." % share['id'])
def create_share(self, context, share, share_server=None):
"""Is called to create share."""
raise NotImplementedError()

View File

@ -42,6 +42,8 @@ from manila import manager
from manila import quota
import manila.share.configuration
from manila.share import drivers_private_data
from manila.share import migration
from manila.share import rpcapi as share_rpcapi
from manila.share import utils as share_utils
from manila import utils
@ -129,7 +131,7 @@ def add_hooks(f):
class ShareManager(manager.SchedulerDependentManager):
"""Manages NAS storages."""
RPC_API_VERSION = '1.5'
RPC_API_VERSION = '1.6'
def __init__(self, share_driver=None, service_name=None, *args, **kwargs):
"""Load the driver from args, or from flags."""
@ -501,6 +503,166 @@ class ShareManager(manager.SchedulerDependentManager):
else:
return None
def get_migration_info(self, ctxt, share_instance_id, share_server):
share_instance = self.db.share_instance_get(
ctxt, share_instance_id, with_share_data=True)
return self.driver.get_migration_info(ctxt, share_instance,
share_server)
def get_driver_migration_info(self, ctxt, share_instance_id, share_server):
share_instance = self.db.share_instance_get(
ctxt, share_instance_id, with_share_data=True)
return self.driver.get_driver_migration_info(ctxt, share_instance,
share_server)
def migrate_share(self, ctxt, share_id, host, force_host_copy=False):
"""Migrates a share from current host to another host."""
LOG.debug("Entered migrate_share method for share %s." % share_id)
# NOTE(ganso): Cinder checks if driver is initialized before doing
# anything. This might not be needed, as this code may not be reached
# if driver service is not running. If for any reason service is
# running but driver is not, the following code should fail at specific
# points, which would be effectively the same as throwing an
# exception here.
rpcapi = share_rpcapi.ShareAPI()
share_ref = self.db.share_get(ctxt, share_id)
share_instance = self._get_share_instance(ctxt, share_ref)
moved = False
msg = None
self.db.share_update(
ctxt, share_ref['id'],
{'task_state': constants.STATUS_TASK_STATE_MIGRATION_MIGRATING})
if not force_host_copy:
try:
share_server = self._get_share_server(ctxt.elevated(),
share_instance)
dest_driver_migration_info = rpcapi.get_driver_migration_info(
ctxt, share_instance, share_server)
LOG.debug("Calling driver migration for share %s." % share_id)
moved, model_update = self.driver.migrate_share(
ctxt, share_instance, host, dest_driver_migration_info)
# NOTE(ganso): Here we are allowing the driver to perform
# changes even if it has not performed migration. While this
# scenario may not be valid, I am not sure if it should be
# forcefully prevented.
if model_update:
self.db.share_instance_update(ctxt, share_instance['id'],
model_update)
except exception.ManilaException as e:
msg = six.text_type(e)
LOG.exception(msg)
if not moved:
try:
LOG.debug("Starting generic migration "
"for share %s." % share_id)
moved = self._migrate_share_generic(ctxt, share_ref, host)
except Exception as e:
msg = six.text_type(e)
LOG.exception(msg)
LOG.error(_LE("Generic migration failed for"
" share %s.") % share_id)
if moved:
self.db.share_update(
ctxt, share_id,
{'task_state': constants.STATUS_TASK_STATE_MIGRATION_SUCCESS})
LOG.info(_LI("Share Migration for share %s"
" completed successfully.") % share_id)
else:
self.db.share_update(
ctxt, share_id,
{'task_state': constants.STATUS_TASK_STATE_MIGRATION_ERROR})
raise exception.ShareMigrationFailed(reason=msg)
def _migrate_share_generic(self, context, share, host):
rpcapi = share_rpcapi.ShareAPI()
share_instance = self._get_share_instance(context, share)
access_rule_timeout = self.driver.configuration.safe_get(
'migration_wait_access_rules_timeout')
create_delete_timeout = self.driver.configuration.safe_get(
'migration_create_delete_share_timeout')
helper = migration.ShareMigrationHelper(
context, self.db, create_delete_timeout,
access_rule_timeout, share)
# NOTE(ganso): We are going to save all access rules prior to removal.
# Since we may have several instances of the same share, it may be
# a good idea to limit or remove all instances/replicas' access
# so they remain unchanged as well during migration.
readonly_support = self.driver.configuration.safe_get(
'migration_readonly_support')
saved_rules = helper.change_to_read_only(readonly_support)
try:
new_share_instance = helper.create_instance_and_wait(
context, share, share_instance, host)
self.db.share_instance_update(
context, new_share_instance['id'],
{'status': constants.STATUS_INACTIVE}
)
LOG.debug("Time to start copying in migration"
" for share %s." % share['id'])
share_server = self._get_share_server(context.elevated(),
share_instance)
new_share_server = self._get_share_server(context.elevated(),
new_share_instance)
src_migration_info = self.driver.get_migration_info(
context, share_instance, share_server)
dest_migration_info = rpcapi.get_migration_info(
context, new_share_instance, new_share_server)
self.driver.copy_share_data(context, helper, share, share_instance,
share_server, new_share_instance,
new_share_server, src_migration_info,
dest_migration_info)
except Exception as e:
LOG.exception(six.text_type(e))
LOG.error(_LE("Share migration failed, reverting access rules for "
"share %s.") % share['id'])
helper.revert_access_rules(readonly_support, saved_rules)
raise
self.db.share_update(
context, share['id'],
{'task_state': constants.STATUS_TASK_STATE_MIGRATION_COMPLETING})
helper.revert_access_rules(readonly_support, saved_rules)
self.db.share_instance_update(context, new_share_instance['id'],
{'status': constants.STATUS_AVAILABLE})
helper.delete_instance_and_wait(context, share_instance)
return True
def _get_share_instance(self, context, share):
if isinstance(share, six.string_types):
id = share

348
manila/share/migration.py Normal file
View File

@ -0,0 +1,348 @@
# Copyright (c) 2015 Hitachi Data Systems.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Helper class for Share Migration."""
import time
from oslo_log import log
import six
from manila.common import constants
from manila import exception
from manila.i18n import _
from manila.i18n import _LE
from manila.i18n import _LW
from manila.share import api as share_api
from manila import utils
LOG = log.getLogger(__name__)
class ShareMigrationHelper(object):
def __init__(self, context, db, create_delete_timeout, access_rule_timeout,
share):
self.db = db
self.share = share
self.context = context
self.api = share_api.API()
self.migration_create_delete_share_timeout = create_delete_timeout
self.migration_wait_access_rules_timeout = access_rule_timeout
def delete_instance_and_wait(self, context, share_instance):
self.api.delete_instance(context, share_instance, True)
# Wait for deletion.
starttime = time.time()
deadline = starttime + self.migration_create_delete_share_timeout
tries = -1
instance = "Something not None"
while instance:
try:
instance = self.db.share_instance_get(context,
share_instance['id'])
tries += 1
now = time.time()
if now > deadline:
msg = _("Timeout trying to delete instance "
"%s") % share_instance['id']
raise exception.ShareMigrationFailed(reason=msg)
except exception.NotFound:
instance = None
else:
time.sleep(tries ** 2)
def create_instance_and_wait(self, context, share, share_instance, host):
api = share_api.API()
new_share_instance = api.create_instance(
context, share, share_instance['share_network_id'], host['host'])
# Wait for new_share_instance to become ready
starttime = time.time()
deadline = starttime + self.migration_create_delete_share_timeout
new_share_instance = self.db.share_instance_get(
context, new_share_instance['id'], with_share_data=True)
tries = 0
while new_share_instance['status'] != constants.STATUS_AVAILABLE:
tries += 1
now = time.time()
if new_share_instance['status'] == constants.STATUS_ERROR:
msg = _("Failed to create new share instance"
" (from %(share_id)s) on "
"destination host %(host_name)s") % {
'share_id': share['id'], 'host_name': host['host']}
raise exception.ShareMigrationFailed(reason=msg)
elif now > deadline:
msg = _("Timeout creating new share instance "
"(from %(share_id)s) on "
"destination host %(host_name)s") % {
'share_id': share['id'], 'host_name': host['host']}
raise exception.ShareMigrationFailed(reason=msg)
else:
time.sleep(tries ** 2)
new_share_instance = self.db.share_instance_get(
context, new_share_instance['id'], with_share_data=True)
return new_share_instance
def deny_rules_and_wait(self, context, share, saved_rules):
api = share_api.API()
# Deny each one.
for instance in share.instances:
for access in saved_rules:
api.deny_access_to_instance(context, instance, access)
# Wait for all rules to be cleared.
starttime = time.time()
deadline = starttime + self.migration_wait_access_rules_timeout
tries = 0
rules = self.db.share_access_get_all_for_share(context, share['id'])
while len(rules) > 0:
tries += 1
now = time.time()
if now > deadline:
msg = _("Timeout removing access rules from share "
"%(share_id)s. Timeout was %(timeout)s seconds.") % {
'share_id': share['id'],
'timeout': self.migration_wait_access_rules_timeout}
raise exception.ShareMigrationFailed(reason=msg)
else:
time.sleep(tries ** 2)
rules = self.db.share_access_get_all_for_share(
context, share['id'])
def add_rules_and_wait(self, context, share, saved_rules,
access_level=None):
for access in saved_rules:
if access_level:
level = access_level
else:
level = access['access_level']
self.api.allow_access(context, share, access['access_type'],
access['access_to'], level)
starttime = time.time()
deadline = starttime + self.migration_wait_access_rules_timeout
rules_added = False
tries = 0
rules = self.db.share_access_get_all_for_share(context, share['id'])
while not rules_added:
rules_added = True
tries += 1
now = time.time()
for access in rules:
if access['state'] != constants.STATUS_ACTIVE:
rules_added = False
if rules_added:
break
if now > deadline:
msg = _("Timeout adding access rules for share "
"%(share_id)s. Timeout was %(timeout)s seconds.") % {
'share_id': share['id'],
'timeout': self.migration_wait_access_rules_timeout}
raise exception.ShareMigrationFailed(reason=msg)
else:
time.sleep(tries ** 2)
rules = self.db.share_access_get_all_for_share(
context, share['id'])
def wait_for_allow_access(self, access_ref):
starttime = time.time()
deadline = starttime + self.migration_wait_access_rules_timeout
tries = 0
rule = self.api.access_get(self.context, access_ref['id'])
while rule['state'] != constants.STATUS_ACTIVE:
tries += 1
now = time.time()
if rule['state'] == constants.STATUS_ERROR:
msg = _("Failed to allow access"
" on share %s") % self.share['id']
raise exception.ShareMigrationFailed(reason=msg)
elif now > deadline:
msg = _("Timeout trying to allow access"
" on share %(share_id)s. Timeout "
"was %(timeout)s seconds.") % {
'share_id': self.share['id'],
'timeout': self.migration_wait_access_rules_timeout}
raise exception.ShareMigrationFailed(reason=msg)
else:
time.sleep(tries ** 2)
rule = self.api.access_get(self.context, access_ref['id'])
return rule
def wait_for_deny_access(self, access_ref):
starttime = time.time()
deadline = starttime + self.migration_wait_access_rules_timeout
tries = -1
rule = "Something not None"
while rule:
try:
rule = self.api.access_get(self.context, access_ref['id'])
tries += 1
now = time.time()
if now > deadline:
msg = _("Timeout trying to deny access"
" on share %(share_id)s. Timeout "
"was %(timeout)s seconds.") % {
'share_id': self.share['id'],
'timeout': self.migration_wait_access_rules_timeout}
raise exception.ShareMigrationFailed(reason=msg)
except exception.NotFound:
rule = None
else:
time.sleep(tries ** 2)
def allow_migration_access(self, access):
allowed = False
access_ref = None
try:
access_ref = self.api.allow_access(
self.context, self.share,
access['access_type'], access['access_to'])
allowed = True
except exception.ShareAccessExists:
LOG.warning(_LW("Access rule already allowed. "
"Access %(access_to)s - Share "
"%(share_id)s") % {
'access_to': access['access_to'],
'share_id': self.share['id']})
access_list = self.api.access_get_all(self.context, self.share)
for access_item in access_list:
if access_item['access_to'] == access['access_to']:
access_ref = access_item
if access_ref and allowed:
access_ref = self.wait_for_allow_access(access_ref)
return access_ref
def deny_migration_access(self, access_ref, access, throw_not_found=True):
denied = False
if access_ref:
try:
# Update status
access_ref = self.api.access_get(
self.context, access_ref['id'])
except exception.NotFound:
access_ref = None
LOG.warning(_LW("Access rule not found. "
"Access %(access_to)s - Share "
"%(share_id)s") % {
'access_to': access['access_to'],
'share_id': self.share['id']})
else:
access_list = self.api.access_get_all(self.context, self.share)
for access_item in access_list:
if access_item['access_to'] == access['access_to']:
access_ref = access_item
break
if access_ref:
try:
self.api.deny_access(self.context, self.share, access_ref)
denied = True
except (exception.InvalidShareAccess, exception.NotFound) as e:
LOG.exception(six.text_type(e))
LOG.warning(_LW("Access rule not found. "
"Access %(access_to)s - Share "
"%(share_id)s") % {
'access_to': access['access_to'],
'share_id': self.share['id']})
if throw_not_found:
raise
if denied:
self.wait_for_deny_access(access_ref)
# NOTE(ganso): Cleanup methods do not throw exception, since the
# exceptions that should be thrown are the ones that call the cleanup
def cleanup_migration_access(self, access_ref, access):
try:
self.deny_migration_access(access_ref, access)
except Exception as mae:
LOG.exception(six.text_type(mae))
LOG.error(_LE("Could not cleanup access rule of share "
"%s") % self.share['id'])
def cleanup_temp_folder(self, instance, mount_path):
try:
utils.execute('rmdir', mount_path + instance['id'],
check_exit_code=False)
except Exception as tfe:
LOG.exception(six.text_type(tfe))
LOG.error(_LE("Could not cleanup instance %(instance_id)s "
"temporary folders for migration of "
"share %(share_id)s") % {
'instance_id': instance['id'],
'share_id': self.share['id']})
def cleanup_unmount_temp_folder(self, instance, migration_info):
try:
utils.execute(*migration_info['umount'], run_as_root=True)
except Exception as utfe:
LOG.exception(six.text_type(utfe))
LOG.error(_LE("Could not unmount folder of instance"
" %(instance_id)s for migration of "
"share %(share_id)s") % {
'instance_id': instance['id'],
'share_id': self.share['id']})
def change_to_read_only(self, readonly_support):
# NOTE(ganso): If the share does not allow readonly mode we
# should remove all access rules and prevent any access
saved_rules = self.db.share_access_get_all_for_share(
self.context, self.share['id'])
self.deny_rules_and_wait(self.context, self.share, saved_rules)
if readonly_support:
LOG.debug("Changing all of share %s access rules "
"to read-only." % self.share['id'])
self.add_rules_and_wait(self.context, self.share,
saved_rules, 'ro')
return saved_rules
def revert_access_rules(self, readonly_support, saved_rules):
if readonly_support:
readonly_rules = self.db.share_access_get_all_for_share(
self.context, self.share['id'])
LOG.debug("Removing all of share %s read-only "
"access rules." % self.share['id'])
self.deny_rules_and_wait(self.context, self.share, readonly_rules)
self.add_rules_and_wait(self.context, self.share, saved_rules)

View File

@ -39,9 +39,12 @@ class ShareAPI(object):
create_share() -> create_share_instance()
delete_share() -> delete_share_instance()
Add share_instance argument to allow_access() & deny_access()
1.5 - Add create_consistency_group, delete_consistency_group
create_cgsnapshot, and delete_cgsnapshot methods
1.6 - Introduce Share migration:
migrate_share()
get_migration_info()
get_driver_migration_info()
"""
BASE_RPC_API_VERSION = '1.0'
@ -50,7 +53,7 @@ class ShareAPI(object):
super(ShareAPI, self).__init__()
target = messaging.Target(topic=CONF.share_topic,
version=self.BASE_RPC_API_VERSION)
self.client = rpc.get_client(target, version_cap='1.5')
self.client = rpc.get_client(target, version_cap='1.6')
def create_share_instance(self, ctxt, share_instance, host,
request_spec, filter_properties,
@ -86,6 +89,28 @@ class ShareAPI(object):
cctxt.cast(ctxt, 'delete_share_instance',
share_instance_id=share_instance['id'])
def migrate_share(self, ctxt, share, dest_host, force_host_copy):
new_host = utils.extract_host(share['host'])
cctxt = self.client.prepare(server=new_host, version='1.6')
host_p = {'host': dest_host.host,
'capabilities': dest_host.capabilities}
cctxt.cast(ctxt, 'migrate_share', share_id=share['id'],
host=host_p, force_host_copy=force_host_copy)
def get_migration_info(self, ctxt, share_instance, share_server):
new_host = utils.extract_host(share_instance['host'])
cctxt = self.client.prepare(server=new_host, version='1.6')
return cctxt.call(ctxt, 'get_migration_info',
share_instance_id=share_instance['id'],
share_server=share_server)
def get_driver_migration_info(self, ctxt, share_instance, share_server):
new_host = utils.extract_host(share_instance['host'])
cctxt = self.client.prepare(server=new_host, version='1.6')
return cctxt.call(ctxt, 'get_driver_migration_info',
share_instance_id=share_instance['id'],
share_server=share_server)
def delete_share_server(self, ctxt, share_server):
host = utils.extract_host(share_server['host'])
cctxt = self.client.prepare(server=host, version='1.0')

View File

@ -14,9 +14,16 @@
# License for the specific language governing permissions and limitations
# under the License.
"""Share-related Utilities and helpers."""
import os
import shutil
from oslo_log import log
import six
LOG = log.getLogger(__name__)
DEFAULT_POOL_NAME = '_pool0'
@ -73,3 +80,97 @@ def append_host(host, pool):
new_host = "#".join([host, pool])
return new_host
class Copy(object):
def __init__(self, src, dest, ignore_list):
self.src = src
self.dest = dest
self.totalSize = 0
self.currentSize = 0
self.files = []
self.dirs = []
self.currentCopy = None
self.ignoreList = ignore_list
def get_progress(self):
if self.currentCopy is not None:
try:
(mode, ino, dev, nlink, uid, gid, size, atime, mtime,
ctime) = os.stat(self.currentCopy['file_path'])
except OSError:
size = 0
total_progress = 0
if self.totalSize > 0:
total_progress = self.currentSize * 100 / self.totalSize
current_file_progress = 0
if self.currentCopy['size'] > 0:
current_file_progress = size * 100 / self.currentCopy['size']
current_file_path = six.text_type(self.currentCopy['file_path'])
progress = {
'total_progress': total_progress,
'current_file_path': current_file_path,
'current_file_progress': current_file_progress
}
return progress
else:
return {'total_progress': 100}
def run(self):
self.explore(self.src)
self.copy(self.src, self.dest)
LOG.info((six.text_type(self.get_progress())))
def copy(self, src, dest):
# Create dirs with max permissions so files can be copied
for dir_item in self.dirs:
new_dir = dir_item['name'].replace(src, dest)
os.mkdir(new_dir)
for file_item in self.files:
file_path = file_item['name'].replace(src, dest)
self.currentCopy = {'file_path': file_path,
'size': file_item['attr']}
LOG.info(six.text_type(self.get_progress()))
shutil.copy2(file_item['name'],
file_item['name'].replace(src, dest))
self.currentSize += file_item['attr']
# Set permissions to dirs
for dir_item in self.dirs:
new_dir = dir_item['name'].replace(src, dest)
shutil.copystat(dir_item['name'], new_dir)
def explore(self, path):
for dirpath, dirnames, filenames in os.walk(path):
for dirname in dirnames:
if dirname not in self.ignoreList:
dir_item = os.path.join(dirpath, dirname)
(mode, ino, dev, nlink, uid, gid, size, atime, mtime,
ctime) = os.stat(dir_item)
self.dirs.append({'name': dir_item,
'attr': mode})
for filename in filenames:
if filename not in self.ignoreList:
file_item = os.path.join(dirpath, filename)
(mode, ino, dev, nlink, uid, gid, size, atime, mtime,
ctime) = os.stat(file_item)
self.files.append({'name': file_item,
'attr': size})
self.totalSize += size

View File

@ -21,6 +21,7 @@ from oslo_config import cfg
import webob
from manila.api import common
from manila.api.openstack import api_version_request as api_version
from manila.api.v1 import shares
from manila.common import constants
from manila import context
@ -31,11 +32,20 @@ from manila.share import share_types
from manila import test
from manila.tests.api.contrib import stubs
from manila.tests.api import fakes
from manila.tests import db_utils
from manila import utils
CONF = cfg.CONF
def app():
# no auth, just let environ['manila.context'] pass through
api = fakes.router.APIRouter()
mapper = fakes.urlmap.URLMap()
mapper['/v1'] = api
return mapper
class ShareApiTest(test.TestCase):
"""Share Api Test."""
def setUp(self):
@ -91,6 +101,7 @@ class ShareApiTest(test.TestCase):
'snapshot_id': '2',
'share_network_id': None,
'status': 'fakestatus',
'task_state': None,
'share_type': '1',
'volume_type': '1',
'is_public': False,
@ -196,6 +207,62 @@ class ShareApiTest(test.TestCase):
self.assertEqual(create_mock.call_args[1]['share_network_id'],
"fakenetid")
def test_migrate_share(self):
share = db_utils.create_share()
req = fakes.HTTPRequest.blank('/shares/%s/action' % share['id'],
use_admin_context=True)
req.method = 'POST'
req.headers['content-type'] = 'application/json'
req.api_version_request = api_version.APIVersionRequest('1.6')
req.api_version_request.experimental = True
body = {'os-migrate_share': {'host': 'fake_host'}}
self.mock_object(share_api.API, 'migrate_share')
self.controller.migrate_share(req, share['id'], body)
def test_migrate_share_no_share_id(self):
req = fakes.HTTPRequest.blank('/shares/%s/action' % 'fake_id',
use_admin_context=True)
req.method = 'POST'
req.headers['content-type'] = 'application/json'
req.api_version_request = api_version.APIVersionRequest('1.6')
req.api_version_request.experimental = True
body = {'os-migrate_share': {'host': 'fake_host'}}
self.mock_object(share_api.API, 'migrate_share')
self.mock_object(share_api.API, 'get',
mock.Mock(side_effect=[exception.NotFound]))
self.assertRaises(webob.exc.HTTPNotFound,
self.controller.migrate_share,
req, 'fake_id', body)
def test_migrate_share_no_host(self):
share = db_utils.create_share()
req = fakes.HTTPRequest.blank('/shares/%s/action' % share['id'],
use_admin_context=True)
req.method = 'POST'
req.headers['content-type'] = 'application/json'
req.api_version_request = api_version.APIVersionRequest('1.6')
req.api_version_request.experimental = True
body = {'os-migrate_share': {}}
self.mock_object(share_api.API, 'migrate_share')
self.assertRaises(webob.exc.HTTPBadRequest,
self.controller.migrate_share,
req, share['id'], body)
def test_migrate_share_no_host_invalid_force_host_copy(self):
share = db_utils.create_share()
req = fakes.HTTPRequest.blank('/shares/%s/action' % share['id'],
use_admin_context=True)
req.method = 'POST'
req.headers['content-type'] = 'application/json'
req.api_version_request = api_version.APIVersionRequest('1.6')
req.api_version_request.experimental = True
body = {'os-migrate_share': {'host': 'fake_host',
'force_host_copy': 'fake'}}
self.mock_object(share_api.API, 'migrate_share')
self.assertRaises(webob.exc.HTTPBadRequest,
self.controller.migrate_share,
req, share['id'], body)
def test_share_create_from_snapshot_without_share_net_no_parent(self):
shr = {
"size": 100,
@ -625,6 +692,7 @@ class ShareApiTest(test.TestCase):
'shares': [
{
'status': 'fakestatus',
'task_state': None,
'description': 'displaydesc',
'export_location': 'fake_location',
'export_locations': ['fake_location', 'fake_location2'],
@ -670,6 +738,7 @@ class ShareApiTest(test.TestCase):
'shares': [
{
'status': 'fakestatus',
'task_state': None,
'description': 'displaydesc',
'export_location': 'fake_location',
'export_locations': ['fake_location', 'fake_location2'],

View File

@ -347,3 +347,44 @@ class FilterSchedulerTestCase(test_scheduler.SchedulerTestCase):
request_spec)
self.assertEqual(2, len(hosts))
def _host_passes_filters_setup(self, mock_obj):
sched = fakes.FakeFilterScheduler()
sched.host_manager = fakes.FakeHostManager()
fake_context = context.RequestContext('user', 'project',
is_admin=True)
fakes.mock_host_manager_db_calls(mock_obj)
return (sched, fake_context)
@mock.patch('manila.db.service_get_all_by_topic')
def test_host_passes_filters_happy_day(self, _mock_service_get_topic):
sched, ctx = self._host_passes_filters_setup(
_mock_service_get_topic)
request_spec = {'share_id': 1,
'share_type': {'name': 'fake_type'},
'share_instance_properties': {},
'share_properties': {'project_id': 1,
'size': 1}}
ret_host = sched.host_passes_filters(ctx, 'host1#_pool0',
request_spec, {})
self.assertEqual('host1#_pool0', ret_host.host)
self.assertTrue(_mock_service_get_topic.called)
@mock.patch('manila.db.service_get_all_by_topic')
def test_host_passes_filters_no_capacity(self, _mock_service_get_topic):
sched, ctx = self._host_passes_filters_setup(
_mock_service_get_topic)
request_spec = {'share_id': 1,
'share_type': {'name': 'fake_type'},
'share_instance_properties': {},
'share_properties': {'project_id': 1,
'size': 1024}}
self.assertRaises(exception.NoValidHost,
sched.host_passes_filters,
ctx, 'host3#_pool0', request_spec, {})
self.assertTrue(_mock_service_get_topic.called)

View File

@ -100,3 +100,13 @@ class SchedulerRpcAPITestCase(test.TestCase):
request_spec='fake_request_spec',
filter_properties='filter_properties',
version='1.3')
def test_migrate_share_to_host(self):
self._test_scheduler_api('migrate_share_to_host',
rpc_method='cast',
share_id='share_id',
host='host',
force_host_copy=True,
request_spec='fake_request_spec',
filter_properties='filter_properties',
version='1.4')

View File

@ -152,6 +152,31 @@ class SchedulerManagerTestCase(test.TestCase):
request_spec=request_spec,
filter_properties={})
def test_migrate_share_to_host(self):
share = db_utils.create_share()
host = 'fake@backend#pool'
self.mock_object(db, 'share_get', mock.Mock(return_value=share))
self.mock_object(share_rpcapi.ShareAPI, 'migrate_share')
self.mock_object(driver.Scheduler, 'host_passes_filters',
mock.Mock(return_value=host))
self.manager.migrate_share_to_host(self.context, share['id'], host,
False, {}, None)
def test_migrate_share_to_host_no_valid_host(self):
share = db_utils.create_share()
host = 'fake@backend#pool'
self.mock_object(
driver.Scheduler, 'host_passes_filters',
mock.Mock(side_effect=[exception.NoValidHost('fake')]))
self.manager.migrate_share_to_host(self.context, share['id'], host,
False, {}, None)
class SchedulerTestCase(test.TestCase):
"""Test case for base scheduler driver class."""

View File

@ -812,6 +812,7 @@ class ShareAPITestCase(test.TestCase):
'status': constants.STATUS_AVAILABLE,
'user_id': self.context.user_id,
'project_id': self.context.project_id,
'task_state': None
}
self.mock_object(db_api, 'share_update', mock.Mock())
@ -822,6 +823,20 @@ class ShareAPITestCase(test.TestCase):
db_api.share_update.assert_called_once_with(
mock.ANY, share_data['id'], mock.ANY)
def test_unmanage_task_state_busy(self):
share_data = {
'id': 'fakeid',
'host': 'fake',
'size': '1',
'status': constants.STATUS_AVAILABLE,
'user_id': self.context.user_id,
'project_id': self.context.project_id,
'task_state': constants.STATUS_TASK_STATE_MIGRATION_MIGRATING
}
self.assertRaises(exception.InvalidShare, self.api.unmanage,
self.context, share_data)
@mock.patch.object(quota.QUOTAS, 'reserve',
mock.Mock(return_value='reservation'))
@mock.patch.object(quota.QUOTAS, 'commit', mock.Mock())
@ -1060,6 +1075,15 @@ class ShareAPITestCase(test.TestCase):
share
)
def test_delete_share_part_of_migration(self):
share = db_utils.create_share(
status=constants.STATUS_AVAILABLE,
task_state=constants.STATUS_TASK_STATE_MIGRATION_MIGRATING)
self.assertRaises(exception.InvalidShare,
self.api.delete,
self.context, share)
def test_delete_share_quota_error(self):
share = self._setup_delete_mocks(constants.STATUS_AVAILABLE)
self.mock_object(quota.QUOTAS, 'reserve',
@ -1321,6 +1345,16 @@ class ShareAPITestCase(test.TestCase):
self.share_rpcapi.deny_access.assert_called_once_with(
self.context, utils.IsAMatcher(models.ShareInstance), access)
def test_deny_access_not_found(self):
share = db_utils.create_share(status=constants.STATUS_AVAILABLE)
access = db_utils.create_access(state=constants.STATUS_ACTIVE,
share_id=share['id'])
self.mock_object(db_api, 'share_instance_access_get',
mock.Mock(side_effect=[exception.NotFound('fake')]))
self.api.deny_access(self.context, share, access)
share_api.policy.check_policy.assert_called_with(
self.context, 'share', 'deny_access')
def test_deny_access_not_active_not_error(self):
share = db_utils.create_share(status=constants.STATUS_AVAILABLE)
access = db_utils.create_access(share_id=share['id'])
@ -1356,6 +1390,7 @@ class ShareAPITestCase(test.TestCase):
def test_access_get_all(self):
share = db_utils.create_share(id='fakeid')
expected = {
'fakeacc0id': {
'id': 'fakeacc0id',
@ -1381,7 +1416,6 @@ class ShareAPITestCase(test.TestCase):
self.mock_object(db_api, 'share_access_get_all_for_share',
mock.Mock(return_value=rules))
actual = self.api.access_get_all(self.context, share)
for access in actual:
expected_access = expected[access['id']]
@ -1497,6 +1531,106 @@ class ShareAPITestCase(test.TestCase):
self.context, share, new_size
)
def test_migrate_share(self):
host = 'fake2@backend#pool'
share = db_utils.create_share(
status=constants.STATUS_AVAILABLE,
host='fake@backend#pool', share_type_id='fake_type_id')
request_spec = {'share_properties': share,
'share_instance_properties': share.instance.to_dict(),
'share_type': 'fake_type',
'share_id': share['id']}
self.mock_object(self.scheduler_rpcapi, 'migrate_share_to_host')
self.mock_object(share_types, 'get_share_type',
mock.Mock(return_value='fake_type'))
self.mock_object(utils, 'validate_service_host')
self.api.migrate_share(self.context, share, host, True)
self.scheduler_rpcapi.migrate_share_to_host.assert_called_once_with(
self.context, share['id'], host, True, request_spec)
def test_migrate_share_status_unavailable(self):
host = 'fake2@backend#pool'
share = db_utils.create_share(
status=constants.STATUS_ERROR)
mock_log = self.mock_object(share_api, 'LOG')
self.assertRaises(exception.InvalidShare, self.api.migrate_share,
self.context, share, host, True)
self.assertTrue(mock_log.error.called)
def test_migrate_share_task_state_invalid(self):
host = 'fake2@backend#pool'
share = db_utils.create_share(
status=constants.STATUS_AVAILABLE,
task_state=constants.STATUS_TASK_STATE_MIGRATION_MIGRATING)
mock_log = self.mock_object(share_api, 'LOG')
self.assertRaises(exception.InvalidShare, self.api.migrate_share,
self.context, share, host, True)
self.assertTrue(mock_log.error.called)
def test_migrate_share_with_snapshots(self):
host = 'fake2@backend#pool'
share = db_utils.create_share(
host='fake@backend#pool', status=constants.STATUS_AVAILABLE)
self.mock_object(db_api, 'share_snapshot_get_all_for_share',
mock.Mock(return_value=True))
mock_log = self.mock_object(share_api, 'LOG')
self.assertRaises(exception.InvalidShare, self.api.migrate_share,
self.context, share, host, True)
self.assertTrue(mock_log.error.called)
def test_migrate_share_invalid_host(self):
host = 'fake@backend#pool'
share = db_utils.create_share(
host='fake2@backend', status=constants.STATUS_AVAILABLE)
self.mock_object(db_api, 'share_snapshot_get_all_for_share',
mock.Mock(return_value=False))
self.assertRaises(exception.ServiceNotFound,
self.api.migrate_share,
self.context, share, host, True)
def test_migrate_share_same_host(self):
host = 'fake@backend#pool'
share = db_utils.create_share(
host='fake@backend#pool', status=constants.STATUS_AVAILABLE)
mock_log = self.mock_object(share_api, 'LOG')
self.assertRaises(exception.InvalidHost,
self.api.migrate_share,
self.context, share, host, True)
self.assertTrue(mock_log.error.called)
def test_migrate_share_exception(self):
host = 'fake2@backend#pool'
share = db_utils.create_share(
host='fake@backend#pool', status=constants.STATUS_AVAILABLE)
self.mock_object(utils, 'validate_service_host')
self.mock_object(db_api, 'share_snapshot_get_all_for_share',
mock.Mock(return_value=False))
self.mock_object(db_api, 'share_update', mock.Mock(return_value=True))
self.mock_object(self.scheduler_rpcapi, 'migrate_share_to_host',
mock.Mock(side_effect=exception.ShareMigrationFailed(
reason='fake')))
self.assertRaises(exception.ShareMigrationFailed,
self.api.migrate_share,
self.context, share, host, True)
db_api.share_update.assert_any_call(
mock.ANY, share['id'], mock.ANY)
class OtherTenantsShareActionsTestCase(test.TestCase):
def setUp(self):

View File

@ -20,11 +20,16 @@ import time
import ddt
import mock
from manila.common import constants
from manila import exception
from manila import network
from manila.share import configuration
from manila.share import driver
from manila.share import migration
from manila.share import rpcapi
from manila.share import utils as share_utils
from manila import test
from manila.tests import db_utils
from manila.tests import utils as test_utils
from manila import utils
@ -314,3 +319,335 @@ class ShareDriverTestCase(test.TestCase):
"fake_context", share_instances)
self.assertEqual(share_instances, result)
def test_migrate_share(self):
driver.CONF.set_default('driver_handles_share_servers', False)
share_driver = driver.ShareDriver(False)
self.assertEqual((None, None),
share_driver.migrate_share(None, None, None, None))
def test_get_driver_migration_info_default(self):
driver.CONF.set_default('driver_handles_share_servers', False)
share_driver = driver.ShareDriver(False)
self.assertIsNone(
share_driver.get_driver_migration_info(None, None, None), None)
def test_get_migration_info_default(self):
expected = {'mount': ['mount', '-t', 'fake_proto', '/fake/fake_id',
'/tmp/fake_id'],
'umount': ['umount', '/tmp/fake_id'],
'access': {'access_type': 'ip',
'access_level': 'rw',
'access_to': None}}
fake_share = {'id': 'fake_id',
'share_proto': 'fake_proto',
'export_locations': [{'path': '/fake/fake_id'}]}
driver.CONF.set_default('driver_handles_share_servers', False)
share_driver = driver.ShareDriver(False)
share_driver.configuration = configuration.Configuration(None)
migration_info = share_driver.get_migration_info(None,
fake_share,
"fake_server")
self.assertEqual(expected, migration_info)
def test_get_migration_info_parameters(self):
expected = {'mount': ['fake_mount', '/200.200.200.200/fake_id',
'/tmp/fake_id'],
'umount': ['umount', '/tmp/fake_id'],
'access': {'access_type': 'ip',
'access_level': 'rw',
'access_to': '100.100.100.100'}}
fake_share = {'id': 'fake_id',
'export_locations': [{'path': '/5.5.5.5/fake_id'}]}
driver.CONF.set_default('driver_handles_share_servers', False)
driver.CONF.set_default('migration_protocol_mount_command',
'fake_mount')
driver.CONF.set_default('migration_mounting_backend_ip',
'200.200.200.200')
driver.CONF.set_default('migration_data_copy_node_ip',
'100.100.100.100')
share_driver = driver.ShareDriver(False)
share_driver.configuration = configuration.Configuration(None)
migration_info = share_driver.get_migration_info(None,
fake_share,
"fake_server")
self.assertEqual(expected, migration_info)
def _setup_mocks_copy_share_data(self):
get_migration_info_value = {'mount': 'fake',
'umount': 'fake',
'access':
{'access_type': 'fake',
'access_to': 'fake'}}
self.mock_object(rpcapi.ShareAPI, 'get_migration_info',
mock.Mock(return_value=get_migration_info_value))
self.mock_object(driver.ShareDriver, 'get_migration_info',
mock.Mock(return_value=get_migration_info_value))
self.mock_object(share_utils.Copy, 'run')
self.mock_object(time, 'sleep')
driver.CONF.set_default('driver_handles_share_servers', False)
share_driver = driver.ShareDriver(
False, configuration=configuration.Configuration(None))
return share_driver
def test_copy_share_data(self):
fake_share = db_utils.create_share(
id='fakeid', status=constants.STATUS_AVAILABLE, host='fake_host')
fake_share_instance = {'id': 'fake_id', 'host': 'fake_host'}
share_driver = self._setup_mocks_copy_share_data()
remote = {'access': {'access_to': '192.168.0.1'},
'mount': 'fake_mount',
'umount': 'fake_umount'}
local = {'access': {'access_to': '192.168.1.1'},
'mount': 'fake_mount',
'umount': 'fake_umount'}
helper = migration.ShareMigrationHelper(None, None, None, None, None)
driver.CONF.set_default('migration_tmp_location', '/fake/path')
driver.CONF.set_default('migration_ignore_files', None)
self.mock_object(migration.ShareMigrationHelper,
'deny_migration_access')
self.mock_object(migration.ShareMigrationHelper,
'allow_migration_access',
mock.Mock(return_value='fake_access_ref'))
self.mock_object(utils, 'execute')
self.mock_object(share_utils.Copy, 'get_progress', mock.Mock(
return_value={'total_progress': 100}))
share_driver.copy_share_data('ctx', helper, fake_share,
fake_share_instance, None,
fake_share_instance, None,
local, remote)
args = ((None, local['access'], False),
(None, remote['access'], False),
('fake_access_ref', local['access']),
('fake_access_ref', remote['access']))
migration.ShareMigrationHelper.deny_migration_access.assert_has_calls(
[mock.call(*a) for a in args])
def test_copy_share_data_failed(self):
fake_share = db_utils.create_share(
id='fakeid', status=constants.STATUS_AVAILABLE, host='fake_host')
fake_share_instance = {'id': 'fake_id', 'host': 'fake_host'}
share_driver = self._setup_mocks_copy_share_data()
remote = {'access': {'access_to': '192.168.0.1'},
'mount': 'fake_mount',
'umount': 'fake_umount'}
local = {'access': {'access_to': '192.168.1.1'},
'mount': 'fake_mount',
'umount': 'fake_umount'}
helper = migration.ShareMigrationHelper(None, None, None, None, None)
driver.CONF.set_default('migration_tmp_location', '/fake/path')
driver.CONF.set_default('migration_ignore_files', None)
self.mock_object(migration.ShareMigrationHelper,
'deny_migration_access')
self.mock_object(migration.ShareMigrationHelper,
'allow_migration_access',
mock.Mock(return_value='fake_access_ref'))
self.mock_object(utils, 'execute')
self.mock_object(share_utils.Copy, 'get_progress', mock.Mock(
return_value=None))
self.assertRaises(exception.ShareMigrationFailed,
share_driver.copy_share_data, 'ctx', helper,
fake_share, fake_share_instance, None,
fake_share_instance, None, local, remote)
args = ((None, local['access'], False),
(None, remote['access'], False))
migration.ShareMigrationHelper.deny_migration_access.assert_has_calls(
[mock.call(*a) for a in args])
def test_copy_share_data_local_access_exception(self):
fake_share = db_utils.create_share(
id='fakeid', status=constants.STATUS_AVAILABLE, host='fake_host')
fake_share_instance = {'id': 'fake_id', 'host': 'fake_host'}
share_driver = self._setup_mocks_copy_share_data()
remote = {'access': {'access_to': '192.168.0.1'},
'mount': 'fake_mount',
'umount': 'fake_umount'}
local = {'access': {'access_to': '192.168.1.1'},
'mount': 'fake_mount',
'umount': 'fake_umount'}
helper = migration.ShareMigrationHelper(None, None, None, None, None)
driver.CONF.set_default('migration_tmp_location', '/fake/path')
driver.CONF.set_default('migration_ignore_files', None)
self.mock_object(migration.ShareMigrationHelper,
'deny_migration_access')
self.mock_object(
migration.ShareMigrationHelper,
'allow_migration_access',
mock.Mock(side_effect=[
exception.ShareMigrationFailed(reason='fake')]))
self.assertRaises(exception.ShareMigrationFailed,
share_driver.copy_share_data, 'ctx', helper,
fake_share, fake_share_instance, None,
fake_share_instance, None, local, remote)
args = ((None, local['access'], False),
(None, remote['access'], False))
migration.ShareMigrationHelper.deny_migration_access.assert_has_calls(
[mock.call(*a) for a in args])
def test_copy_share_data_remote_access_exception(self):
fake_share = db_utils.create_share(
id='fakeid', status=constants.STATUS_AVAILABLE, host='fake_host')
fake_share_instance = {'id': 'fake_id', 'host': 'fake_host'}
share_driver = self._setup_mocks_copy_share_data()
remote = {'access': {'access_to': '192.168.0.1'},
'mount': 'fake_mount',
'umount': 'fake_umount'}
local = {'access': {'access_to': '192.168.1.1'},
'mount': 'fake_mount',
'umount': 'fake_umount'}
helper = migration.ShareMigrationHelper(None, None, None, None, None)
driver.CONF.set_default('migration_tmp_location', '/fake/path')
driver.CONF.set_default('migration_ignore_files', None)
self.mock_object(migration.ShareMigrationHelper,
'deny_migration_access')
self.mock_object(
migration.ShareMigrationHelper,
'allow_migration_access',
mock.Mock(side_effect=[None,
exception.ShareMigrationFailed(
reason='fake')]))
self.mock_object(migration.ShareMigrationHelper,
'cleanup_migration_access')
self.assertRaises(exception.ShareMigrationFailed,
share_driver.copy_share_data, 'ctx', helper,
fake_share, fake_share_instance, None,
fake_share_instance, None, local, remote)
args = ((None, local['access'], False),
(None, remote['access'], False))
migration.ShareMigrationHelper.deny_migration_access.assert_has_calls(
[mock.call(*a) for a in args])
def test_copy_share_data_mount_for_migration_exception(self):
fake_share = db_utils.create_share(
id='fakeid', status=constants.STATUS_AVAILABLE, host='fake_host')
fake_share_instance = {'id': 'fake_id', 'host': 'fake_host'}
share_driver = self._setup_mocks_copy_share_data()
remote = {'access': {'access_to': '192.168.0.1'},
'mount': 'fake_mount',
'umount': 'fake_umount'}
local = {'access': {'access_to': '192.168.1.1'},
'mount': 'fake_mount',
'umount': 'fake_umount'}
helper = migration.ShareMigrationHelper(None, None, None, None, None)
msg = ('Failed to mount temporary folder for migration of share '
'instance fake_id to fake_id')
driver.CONF.set_default('migration_tmp_location', '/fake/path')
self.mock_object(migration.ShareMigrationHelper,
'deny_migration_access')
self.mock_object(migration.ShareMigrationHelper,
'allow_migration_access',
mock.Mock(return_value='fake_access_ref'))
self.mock_object(migration.ShareMigrationHelper,
'cleanup_migration_access')
self.mock_object(migration.ShareMigrationHelper,
'cleanup_temp_folder')
self.mock_object(utils, 'execute', mock.Mock(
side_effect=[None, None, exception.ShareMigrationFailed(msg)]))
self.assertRaises(exception.ShareMigrationFailed,
share_driver.copy_share_data,
'ctx', helper, fake_share,
fake_share_instance, None,
fake_share_instance, None,
local, remote)
args = ((None, local['access'], False),
(None, remote['access'], False))
migration.ShareMigrationHelper.deny_migration_access.assert_has_calls(
[mock.call(*a) for a in args])
def test_copy_share_data_mount_for_migration_exception2(self):
fake_share = db_utils.create_share(
id='fakeid', status=constants.STATUS_AVAILABLE, host='fake_host')
fake_share_instance = {'id': 'fake_id', 'host': 'fake_host'}
share_driver = self._setup_mocks_copy_share_data()
remote = {'access': {'access_to': '192.168.0.1'},
'mount': 'fake_mount',
'umount': 'fake_umount'}
local = {'access': {'access_to': '192.168.1.1'},
'mount': 'fake_mount',
'umount': 'fake_umount'}
helper = migration.ShareMigrationHelper(None, None, None, None, None)
msg = ('Failed to mount temporary folder for migration of share '
'instance fake_id to fake_id')
driver.CONF.set_default('migration_tmp_location', '/fake/path')
self.mock_object(migration.ShareMigrationHelper,
'deny_migration_access')
self.mock_object(migration.ShareMigrationHelper,
'allow_migration_access',
mock.Mock(return_value='fake_access_ref'))
self.mock_object(migration.ShareMigrationHelper,
'cleanup_migration_access')
self.mock_object(migration.ShareMigrationHelper,
'cleanup_temp_folder')
self.mock_object(migration.ShareMigrationHelper,
'cleanup_unmount_temp_folder')
self.mock_object(utils, 'execute', mock.Mock(
side_effect=[None, None, None,
exception.ShareMigrationFailed(msg)]))
self.assertRaises(exception.ShareMigrationFailed,
share_driver.copy_share_data,
'ctx', helper, fake_share,
fake_share_instance, None,
fake_share_instance, None,
local, remote)
args = ((None, local['access'], False),
(None, remote['access'], False))
migration.ShareMigrationHelper.deny_migration_access.assert_has_calls(
[mock.call(*a) for a in args])
def test_copy_share_data_access_rule_invalid(self):
fake_share = db_utils.create_share(
id='fakeid', status=constants.STATUS_AVAILABLE, host='fake_host')
share_driver = self._setup_mocks_copy_share_data()
remote = {'access': {'access_to': None},
'mount': 'fake_mount',
'umount': 'fake_umount'}
local = {'access': {'access_to': '192.168.1.1'},
'mount': 'fake_mount',
'umount': 'fake_umount'}
driver.CONF.set_default('migration_tmp_location', '/fake/path')
self.assertRaises(exception.ShareMigrationFailed,
share_driver.copy_share_data, 'ctx', None,
fake_share, None, None, None, None, local, remote)

View File

@ -31,6 +31,8 @@ from manila import exception
from manila import quota
from manila.share import drivers_private_data
from manila.share import manager
from manila.share import migration
from manila.share import rpcapi
from manila import test
from manila.tests import db_utils
from manila.tests import utils as test_utils
@ -2291,6 +2293,409 @@ class ShareManagerTestCase(test.TestCase):
assert_called_once_with(mock.ANY, fake_snap['id'],
{'status': constants.STATUS_ERROR})
def test_get_migration_info(self):
share_instance = 'fake-share-instance'
share_instance_id = 'fake-id'
share_server = 'fake-share-server'
manager = self.share_manager
self.mock_object(manager.db, 'share_instance_get',
mock.Mock(return_value=share_instance))
self.mock_object(manager.driver, 'get_migration_info')
manager.get_migration_info(self.context,
share_instance_id, share_server)
manager.db.share_instance_get.assert_called_once_with(
self.context, share_instance_id, with_share_data=True
)
manager.driver.get_migration_info.assert_called_once_with(
self.context, share_instance, share_server
)
def test_get_driver_migration_info(self):
share_instance = 'fake-share-instance'
share_instance_id = 'fake-id'
share_server = 'fake-share-server'
manager = self.share_manager
self.mock_object(manager.db, 'share_instance_get',
mock.Mock(return_value=share_instance))
self.mock_object(manager.driver, 'get_driver_migration_info')
manager.get_driver_migration_info(self.context, share_instance_id,
share_server)
manager.db.share_instance_get.assert_called_once_with(
self.context, share_instance_id, with_share_data=True
)
manager.driver.get_driver_migration_info.assert_called_once_with(
self.context, share_instance, share_server
)
def test_migrate_share_not_moved_by_driver(self):
share = db_utils.create_share()
share_id = share['id']
host = 'fake-host'
status_migrating = {
'task_state': constants.STATUS_TASK_STATE_MIGRATION_MIGRATING
}
status_success = {
'task_state': constants.STATUS_TASK_STATE_MIGRATION_SUCCESS
}
share_server = 'fake-share-server'
migration_info = 'fake-info'
manager = self.share_manager
self.mock_object(manager, 'driver')
self.mock_object(manager.db, 'share_update')
self.mock_object(manager, '_get_share_server',
mock.Mock(return_value=share_server))
self.mock_object(rpcapi.ShareAPI, 'get_driver_migration_info',
mock.Mock(return_value=migration_info))
self.mock_object(manager.driver,
'migrate_share',
mock.Mock(return_value=[False, None]))
self.mock_object(manager, '_migrate_share_generic',
mock.Mock(return_value=True))
manager.migrate_share(self.context, share_id, host)
manager.db.share_update.assert_any_call(
self.context, share_id, status_migrating
)
manager.driver.migrate_share.assert_called_once_with(
self.context, utils.IsAMatcher(models.ShareInstance),
host, migration_info
)
manager._migrate_share_generic.assert_called_once_with(
self.context, utils.IsAMatcher(models.Share), host
)
manager.db.share_update.assert_any_call(
self.context, share_id, status_success
)
def test_migrate_share_driver_migration(self):
share = db_utils.create_share()
share_id = share['id']
host = 'fake-host'
status_migrating = {
'task_state': constants.STATUS_TASK_STATE_MIGRATION_MIGRATING
}
status_success = {
'task_state': constants.STATUS_TASK_STATE_MIGRATION_SUCCESS
}
share_server = 'fake-share-server'
migration_info = 'fake-info'
manager = self.share_manager
self.mock_object(manager, 'driver')
self.mock_object(manager.db, 'share_update')
self.mock_object(manager, '_get_share_server',
mock.Mock(return_value=share_server))
self.mock_object(rpcapi.ShareAPI, 'get_driver_migration_info',
mock.Mock(return_value=migration_info))
self.mock_object(manager.driver,
'migrate_share',
mock.Mock(return_value=[True, None]))
self.mock_object(manager.db, 'share_instance_update')
manager.migrate_share(self.context, share_id, host)
manager.db.share_update.assert_any_call(
self.context, share_id, status_migrating
)
manager.driver.migrate_share.assert_called_once_with(
self.context, utils.IsAMatcher(models.ShareInstance),
host, migration_info
)
manager.db.share_update.assert_any_call(
self.context, share_id, status_success
)
def test_migrate_share_driver_migration_instance_update(self):
share = db_utils.create_share()
share_id = share['id']
host = 'fake-host'
status_migrating = {
'task_state': constants.STATUS_TASK_STATE_MIGRATION_MIGRATING
}
status_success = {
'task_state': constants.STATUS_TASK_STATE_MIGRATION_SUCCESS
}
share_server = 'fake-share-server'
migration_info = 'fake-info'
manager = self.share_manager
self.mock_object(manager, 'driver')
self.mock_object(manager.db, 'share_update')
self.mock_object(manager, '_get_share_server',
mock.Mock(return_value=share_server))
self.mock_object(rpcapi.ShareAPI, 'get_driver_migration_info',
mock.Mock(return_value=migration_info))
self.mock_object(manager.driver,
'migrate_share',
mock.Mock(return_value=[True, mock.ANY]))
self.mock_object(manager.db, 'share_instance_update')
manager.migrate_share(self.context, share_id, host)
manager.db.share_update.assert_any_call(
self.context, share_id, status_migrating
)
manager.driver.migrate_share.assert_called_once_with(
self.context, utils.IsAMatcher(models.ShareInstance),
host, migration_info
)
manager.db.share_instance_update.assert_called_once_with(
self.context, mock.ANY, mock.ANY
)
manager.db.share_update.assert_any_call(
self.context, share_id, status_success
)
def test_migrate_share_exception_driver(self):
share = db_utils.create_share()
share_id = share['id']
host = 'fake-host'
status_migrating = {
'task_state': constants.STATUS_TASK_STATE_MIGRATION_MIGRATING
}
status_error = {
'task_state': constants.STATUS_TASK_STATE_MIGRATION_ERROR
}
share_server = 'fake-share-server'
migration_info = 'fake-info'
manager = self.share_manager
self.mock_object(manager, 'driver')
self.mock_object(manager.db, 'share_update')
self.mock_object(manager, '_get_share_server',
mock.Mock(return_value=share_server))
self.mock_object(rpcapi.ShareAPI, 'get_driver_migration_info',
mock.Mock(return_value=migration_info))
self.mock_object(manager.driver,
'migrate_share',
mock.Mock(side_effect=exception.ManilaException))
self.mock_object(manager, '_migrate_share_generic',
mock.Mock(return_value=False))
self.assertRaises(exception.ShareMigrationFailed,
manager.migrate_share,
self.context, share_id, host)
manager.db.share_update.assert_any_call(
self.context, share_id, status_migrating
)
manager.driver.migrate_share.assert_called_once_with(
self.context, utils.IsAMatcher(models.ShareInstance),
host, migration_info
)
manager._migrate_share_generic.assert_called_once_with(
self.context, utils.IsAMatcher(models.Share), host
)
manager.db.share_update.assert_any_call(
self.context, share_id, status_error
)
def test_migrate_share_exception_generic(self):
share = db_utils.create_share()
share_id = share['id']
host = 'fake-host'
status_migrating = {
'task_state': constants.STATUS_TASK_STATE_MIGRATION_MIGRATING
}
status_error = {
'task_state': constants.STATUS_TASK_STATE_MIGRATION_ERROR
}
share_server = 'fake-share-server'
migration_info = 'fake-info'
manager = self.share_manager
self.mock_object(manager, 'driver')
self.mock_object(manager.db, 'share_update')
self.mock_object(manager, '_get_share_server',
mock.Mock(return_value=share_server))
self.mock_object(rpcapi.ShareAPI, 'get_driver_migration_info',
mock.Mock(return_value=migration_info))
self.mock_object(manager.driver,
'migrate_share',
mock.Mock(return_value=[False, None]))
self.mock_object(manager,
'_migrate_share_generic',
mock.Mock(side_effect=Exception))
self.assertRaises(exception.ShareMigrationFailed,
manager.migrate_share,
self.context, share_id, host, migration_info)
manager.db.share_update.assert_any_call(
self.context, share_id, status_migrating
)
manager.db.share_update.assert_any_call(
self.context, share_id, status_error
)
def test_migrate_share_force_host_copy(self):
share = db_utils.create_share()
share_id = share['id']
host = 'fake-host'
status_migrating = {
'task_state': constants.STATUS_TASK_STATE_MIGRATION_MIGRATING
}
status_success = {
'task_state': constants.STATUS_TASK_STATE_MIGRATION_SUCCESS
}
manager = self.share_manager
self.mock_object(manager, 'driver')
self.mock_object(manager.db, 'share_update')
self.mock_object(manager, '_migrate_share_generic',
mock.Mock(return_value=True))
manager.migrate_share(self.context, share_id, host, True)
manager.db.share_update.assert_any_call(
self.context, share_id, status_migrating
)
manager._migrate_share_generic.assert_called_once_with(
self.context, utils.IsAMatcher(models.Share), host
)
manager.db.share_update.assert_any_call(
self.context, share_id, status_success
)
def test_migrate_share_generic(self):
share = db_utils.create_share()
share_id = share['id']
host = {'host': 'fake-host'}
status_completing = {
'task_state': constants.STATUS_TASK_STATE_MIGRATION_COMPLETING
}
status_inactive = {'status': constants.STATUS_INACTIVE}
status_available = {'status': constants.STATUS_AVAILABLE}
share_server = 'fake-server'
new_share_server = 'new-fake-server'
src_migration_info = 'fake-src-migration-info'
dest_migration_info = 'fake-dest-migration-info'
manager = self.share_manager
manager.create_share_instance(self.context, share.instance['id'])
share_instance = manager._get_share_instance(self.context, share)
new_share_instance = {'id': 'fake-id',
'status': constants.STATUS_CREATING}
self.mock_object(manager, '_get_share_instance',
mock.Mock(return_value=share_instance))
self.mock_object(migration.ShareMigrationHelper,
'change_to_read_only')
self.mock_object(migration.ShareMigrationHelper,
'create_instance_and_wait',
mock.Mock(return_value=new_share_instance))
self.mock_object(manager.db, 'share_instance_update')
self.mock_object(
manager,
'_get_share_server',
mock.Mock(side_effect=[share_server, new_share_server])
)
self.mock_object(manager.driver, 'get_migration_info',
mock.Mock(return_value=src_migration_info))
self.mock_object(rpcapi.ShareAPI, 'get_migration_info',
mock.Mock(return_value=dest_migration_info))
self.mock_object(manager.driver, 'copy_share_data')
self.mock_object(manager.db, 'share_update')
self.mock_object(migration.ShareMigrationHelper,
'revert_access_rules')
self.mock_object(migration.ShareMigrationHelper,
'delete_instance_and_wait')
manager._migrate_share_generic(self.context, share, host)
manager._get_share_instance.assert_called_once_with(
self.context, share
)
manager.db.share_instance_update.assert_any_call(
self.context, new_share_instance['id'], status_inactive
)
manager._get_share_server.assert_any_call(
mock.ANY, share_instance
)
manager._get_share_server.assert_any_call(
mock.ANY, new_share_instance
)
manager.driver.get_migration_info.assert_called_once_with(
self.context, share_instance, share_server
)
manager.driver.copy_share_data.assert_called_once_with(
self.context, mock.ANY, share, share_instance,
share_server, new_share_instance, new_share_server,
src_migration_info, dest_migration_info
)
manager.db.share_update.assert_called_once_with(
self.context, share_id, status_completing
)
manager.db.share_instance_update.assert_any_call(
self.context, new_share_instance['id'], status_available
)
def test_migrate_share_generic_exception(self):
share = db_utils.create_share()
host = {'host': 'fake-host'}
manager = self.share_manager
manager.create_share_instance(self.context, share.instance['id'])
share_instance = manager._get_share_instance(self.context, share)
self.mock_object(manager, '_get_share_instance',
mock.Mock(return_value=share_instance))
self.mock_object(migration.ShareMigrationHelper,
'change_to_read_only')
self.mock_object(migration.ShareMigrationHelper,
'create_instance_and_wait',
mock.Mock(side_effect=exception.ShareMigrationFailed(
reason='fake')))
self.mock_object(migration.ShareMigrationHelper,
'revert_access_rules')
self.assertRaises(exception.ShareMigrationFailed,
manager._migrate_share_generic,
self.context, share, host)
@ddt.ddt
class HookWrapperTestCase(test.TestCase):

View File

@ -0,0 +1,513 @@
# Copyright 2015 Hitachi Data Systems inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import ddt
import mock
import time
from manila.common import constants
from manila import context
from manila import db
from manila import exception
from manila.share import api as share_api
from manila.share import driver
from manila.share import migration
from manila import test
from manila.tests import db_utils
from manila import utils
@ddt.ddt
class ShareMigrationHelperTestCase(test.TestCase):
"""Tests ShareMigrationHelper."""
def setUp(self):
super(ShareMigrationHelperTestCase, self).setUp()
self.share = db_utils.create_share()
self.context = context.get_admin_context()
self.helper = migration.ShareMigrationHelper(
self.context, db,
driver.CONF.migration_create_delete_share_timeout,
driver.CONF.migration_wait_access_rules_timeout, self.share)
def test_deny_rules_and_wait(self):
saved_rules = [db_utils.create_access(share_id=self.share['id'],
state=constants.STATUS_ACTIVE)]
self.mock_object(share_api.API, 'deny_access_to_instance')
self.mock_object(db, 'share_access_get_all_for_share',
mock.Mock(side_effect=[saved_rules, []]))
self.mock_object(time, 'sleep')
self.helper.deny_rules_and_wait(
self.context, self.share, saved_rules)
db.share_access_get_all_for_share.assert_any_call(
self.context, self.share['id'])
def test_deny_rules_and_wait_timeout(self):
saved_rules = [db_utils.create_access(share_id=self.share['id'],
state=constants.STATUS_ACTIVE)]
self.mock_object(share_api.API, 'deny_access_to_instance')
self.mock_object(db, 'share_access_get_all_for_share',
mock.Mock(return_value=saved_rules))
self.mock_object(time, 'sleep')
now = time.time()
timeout = now + 100
self.mock_object(time, 'time',
mock.Mock(side_effect=[now, timeout]))
self.assertRaises(exception.ShareMigrationFailed,
self.helper.deny_rules_and_wait,
self.context, self.share, saved_rules)
db.share_access_get_all_for_share.assert_called_once_with(
self.context, self.share['id'])
def test_add_rules_and_wait(self):
rules_active = [db_utils.create_access(share_id=self.share['id'],
state=constants.STATUS_ACTIVE)]
rules_new = [db_utils.create_access(share_id=self.share['id'],
state=constants.STATUS_NEW)]
self.mock_object(share_api.API, 'allow_access')
self.mock_object(db, 'share_access_get_all_for_share',
mock.Mock(side_effect=[rules_new,
rules_active]))
self.mock_object(time, 'sleep')
self.helper.add_rules_and_wait(self.context, self.share,
rules_active)
db.share_access_get_all_for_share.assert_any_call(
self.context, self.share['id'])
def test_add_rules_and_wait_access_level(self):
rules_active = [db_utils.create_access(share_id=self.share['id'],
state=constants.STATUS_ACTIVE)]
self.mock_object(share_api.API, 'allow_access')
self.mock_object(db, 'share_access_get_all_for_share',
mock.Mock(return_value=rules_active))
self.mock_object(time, 'sleep')
self.helper.add_rules_and_wait(self.context, self.share,
rules_active, 'access_level')
db.share_access_get_all_for_share.assert_any_call(
self.context, self.share['id'])
def test_add_rules_and_wait_timeout(self):
rules_new = [db_utils.create_access(share_id=self.share['id'],
state=constants.STATUS_NEW)]
self.mock_object(share_api.API, 'allow_access')
self.mock_object(db, 'share_access_get_all_for_share',
mock.Mock(return_value=rules_new))
self.mock_object(time, 'sleep')
now = time.time()
timeout = now + 100
self.mock_object(time, 'time',
mock.Mock(side_effect=[now, timeout]))
self.assertRaises(exception.ShareMigrationFailed,
self.helper.add_rules_and_wait, self.context,
self.share, rules_new)
db.share_access_get_all_for_share.assert_called_once_with(
self.context, self.share['id'])
def test_delete_instance_and_wait(self):
self.mock_object(share_api.API, 'delete_instance')
self.mock_object(db, 'share_instance_get',
mock.Mock(side_effect=[self.share.instance, None]))
self.mock_object(time, 'sleep')
self.helper.delete_instance_and_wait(self.context,
self.share.instance)
db.share_instance_get.assert_any_call(
self.context, self.share.instance['id'])
def test_delete_instance_and_wait_timeout(self):
self.mock_object(share_api.API, 'delete_instance')
self.mock_object(db, 'share_instance_get',
mock.Mock(side_effect=[self.share.instance, None]))
self.mock_object(time, 'sleep')
now = time.time()
timeout = now + 310
self.mock_object(time, 'time',
mock.Mock(side_effect=[now, timeout]))
self.assertRaises(exception.ShareMigrationFailed,
self.helper.delete_instance_and_wait,
self.context, self.share.instance)
db.share_instance_get.assert_called_once_with(
self.context, self.share.instance['id'])
def test_delete_instance_and_wait_not_found(self):
self.mock_object(share_api.API, 'delete_instance')
self.mock_object(db, 'share_instance_get',
mock.Mock(side_effect=exception.NotFound))
self.mock_object(time, 'sleep')
self.helper.delete_instance_and_wait(self.context,
self.share.instance)
db.share_instance_get.assert_called_once_with(
self.context, self.share.instance['id'])
def test_create_instance_and_wait(self):
host = {'host': 'fake-host'}
share_instance_creating = db_utils.create_share_instance(
share_id=self.share['id'], status=constants.STATUS_CREATING,
share_network_id='fake_network_id')
share_instance_available = db_utils.create_share_instance(
share_id=self.share['id'], status=constants.STATUS_AVAILABLE,
share_network_id='fake_network_id')
self.mock_object(share_api.API, 'create_instance',
mock.Mock(return_value=share_instance_creating))
self.mock_object(db, 'share_instance_get',
mock.Mock(side_effect=[share_instance_creating,
share_instance_available]))
self.mock_object(time, 'sleep')
self.helper.create_instance_and_wait(
self.context, self.share, share_instance_creating, host)
db.share_instance_get.assert_any_call(
self.context, share_instance_creating['id'], with_share_data=True)
def test_create_instance_and_wait_status_error(self):
host = {'host': 'fake-host'}
share_instance_error = db_utils.create_share_instance(
share_id=self.share['id'], status=constants.STATUS_ERROR,
share_network_id='fake_network_id')
self.mock_object(share_api.API, 'create_instance',
mock.Mock(return_value=share_instance_error))
self.mock_object(db, 'share_instance_get',
mock.Mock(return_value=share_instance_error))
self.mock_object(time, 'sleep')
self.assertRaises(exception.ShareMigrationFailed,
self.helper.create_instance_and_wait,
self.context, self.share, share_instance_error, host)
db.share_instance_get.assert_called_once_with(
self.context, share_instance_error['id'], with_share_data=True)
def test_create_instance_and_wait_timeout(self):
host = {'host': 'fake-host'}
share_instance_creating = db_utils.create_share_instance(
share_id=self.share['id'], status=constants.STATUS_CREATING,
share_network_id='fake_network_id')
self.mock_object(share_api.API, 'create_instance',
mock.Mock(return_value=share_instance_creating))
self.mock_object(db, 'share_instance_get',
mock.Mock(return_value=share_instance_creating))
self.mock_object(time, 'sleep')
now = time.time()
timeout = now + 310
self.mock_object(time, 'time',
mock.Mock(side_effect=[now, timeout]))
self.assertRaises(exception.ShareMigrationFailed,
self.helper.create_instance_and_wait,
self.context, self.share, share_instance_creating,
host)
db.share_instance_get.assert_called_once_with(
self.context, share_instance_creating['id'], with_share_data=True)
def test_wait_for_allow_access(self):
access_active = db_utils.create_access(state=constants.STATUS_ACTIVE,
share_id=self.share['id'])
access_new = db_utils.create_access(state=constants.STATUS_NEW,
share_id=self.share['id'])
self.mock_object(time, 'sleep')
self.mock_object(self.helper.api, 'access_get',
mock.Mock(side_effect=[access_new, access_active]))
result = self.helper.wait_for_allow_access(access_new)
self.assertEqual(access_active, result)
def test_wait_for_allow_access_timeout(self):
access_new = db_utils.create_access(state=constants.STATUS_NEW,
share_id=self.share['id'])
self.mock_object(self.helper.api, 'access_get',
mock.Mock(return_value=access_new))
now = time.time()
timeout = now + 100
self.mock_object(time, 'time',
mock.Mock(side_effect=[now, timeout]))
self.assertRaises(exception.ShareMigrationFailed,
self.helper.wait_for_allow_access, access_new)
def test_wait_for_allow_access_error(self):
access_new = db_utils.create_access(state=constants.STATUS_NEW,
share_id=self.share['id'])
access_error = db_utils.create_access(state=constants.STATUS_ERROR,
share_id=self.share['id'])
self.mock_object(self.helper.api, 'access_get',
mock.Mock(return_value=access_error))
self.assertRaises(exception.ShareMigrationFailed,
self.helper.wait_for_allow_access, access_new)
def test_wait_for_deny_access(self):
access_active = db_utils.create_access(state=constants.STATUS_ACTIVE,
share_id=self.share['id'])
self.mock_object(self.helper.api, 'access_get',
mock.Mock(side_effect=[[access_active],
exception.NotFound]))
self.helper.wait_for_deny_access(access_active)
def test_wait_for_deny_access_timeout(self):
access_active = db_utils.create_access(state=constants.STATUS_ACTIVE,
share_id=self.share['id'])
self.mock_object(self.helper.api, 'access_get',
mock.Mock(side_effect=[[access_active],
[access_active]]))
now = time.time()
timeout = now + 100
self.mock_object(time, 'time',
mock.Mock(side_effect=[now, timeout]))
self.assertRaises(exception.ShareMigrationFailed,
self.helper.wait_for_deny_access, access_active)
def test_allow_migration_access(self):
access = {'access_to': 'fake_ip',
'access_type': 'fake_type'}
access_active = db_utils.create_access(state=constants.STATUS_ACTIVE,
share_id=self.share['id'])
self.mock_object(self.helper, 'wait_for_allow_access',
mock.Mock(return_value=access_active))
self.mock_object(self.helper.api, 'allow_access',
mock.Mock(return_value=access_active))
result = self.helper.allow_migration_access(access)
self.assertEqual(access_active, result)
self.helper.wait_for_allow_access.assert_called_once_with(
access_active)
def test_allow_migration_access_exists(self):
access = {'access_to': 'fake_ip',
'access_type': 'fake_type'}
access_active = db_utils.create_access(state=constants.STATUS_ACTIVE,
share_id=self.share['id'],
access_to='fake_ip')
self.mock_object(
self.helper.api, 'allow_access',
mock.Mock(side_effect=[exception.ShareAccessExists('fake')]))
self.mock_object(self.helper.api, 'access_get_all',
mock.Mock(return_value=[access_active]))
result = self.helper.allow_migration_access(access)
self.assertEqual(access_active, result)
def test_deny_migration_access(self):
access = {'access_to': 'fake_ip',
'access_type': 'fake_type'}
access_active = db_utils.create_access(state=constants.STATUS_ACTIVE,
share_id=self.share['id'],
access_to='fake_ip')
self.mock_object(self.helper.api, 'access_get',
mock.Mock(return_value=access_active))
self.mock_object(self.helper.api, 'deny_access')
self.mock_object(self.helper, 'wait_for_deny_access')
self.helper.deny_migration_access(access_active, access)
self.helper.wait_for_deny_access.assert_called_once_with(access_active)
def test_deny_migration_access_not_found(self):
access = {'access_to': 'fake_ip',
'access_type': 'fake_type'}
access_active = db_utils.create_access(state=constants.STATUS_ACTIVE,
share_id=self.share['id'],
access_to='fake_ip')
self.mock_object(self.helper.api, 'access_get',
mock.Mock(side_effect=exception.NotFound('fake')))
self.helper.deny_migration_access(access_active, access)
def test_deny_migration_access_none(self):
access = {'access_to': 'fake_ip',
'access_type': 'fake_type'}
access_active = db_utils.create_access(state=constants.STATUS_ACTIVE,
share_id=self.share['id'],
access_to='fake_ip')
self.mock_object(self.helper.api, 'access_get_all',
mock.Mock(return_value=[access_active]))
self.mock_object(self.helper.api, 'deny_access')
self.mock_object(self.helper, 'wait_for_deny_access')
self.helper.deny_migration_access(None, access)
self.helper.wait_for_deny_access.assert_called_once_with(access_active)
def test_deny_migration_access_exception(self):
access = {'access_to': 'fake_ip',
'access_type': 'fake_type'}
access_active = db_utils.create_access(state=constants.STATUS_ACTIVE,
share_id=self.share['id'],
access_to='fake_ip')
self.mock_object(self.helper.api, 'access_get',
mock.Mock(return_value=access_active))
self.mock_object(self.helper.api, 'deny_access',
mock.Mock(side_effect=[exception.NotFound('fake')]))
self.assertRaises(exception.NotFound,
self.helper.deny_migration_access, access_active,
access)
def test_cleanup_migration_access_exception(self):
self.mock_object(self.helper, 'deny_migration_access',
mock.Mock(side_effect=Exception('fake')))
self.helper.cleanup_migration_access(None, None)
def test_cleanup_temp_folder_exception(self):
self.mock_object(utils, 'execute',
mock.Mock(side_effect=Exception('fake')))
self.helper.cleanup_temp_folder(self.share.instance, None)
def test_cleanup_unmount_temp_folder_exception(self):
self.mock_object(utils, 'execute',
mock.Mock(side_effect=Exception('fake')))
self.helper.cleanup_unmount_temp_folder(self.share.instance, None)
def test_change_to_read_only(self):
access_active = db_utils.create_access(state=constants.STATUS_ACTIVE,
share_id=self.share['id'],
access_to='fake_ip')
self.mock_object(db, 'share_access_get_all_for_share',
mock.Mock(return_value=access_active))
self.mock_object(self.helper, 'deny_rules_and_wait')
self.mock_object(self.helper, 'add_rules_and_wait')
result = self.helper.change_to_read_only(True)
self.assertEqual(access_active, result)
db.share_access_get_all_for_share.assert_called_once_with(
self.context, self.share['id'])
self.helper.deny_rules_and_wait.assert_called_once_with(
self.context, self.share, access_active)
self.helper.add_rules_and_wait.assert_called_once_with(
self.context, self.share, access_active, 'ro')
def test_revert_access_rules(self):
access_active = db_utils.create_access(state=constants.STATUS_ACTIVE,
share_id=self.share['id'],
access_to='fake_ip')
self.mock_object(db, 'share_access_get_all_for_share',
mock.Mock(return_value=access_active))
self.mock_object(self.helper, 'deny_rules_and_wait')
self.mock_object(self.helper, 'add_rules_and_wait')
self.helper.revert_access_rules(True, access_active)
db.share_access_get_all_for_share.assert_called_once_with(
self.context, self.share['id'])
self.helper.deny_rules_and_wait.assert_called_once_with(
self.context, self.share, access_active)
self.helper.add_rules_and_wait.assert_called_once_with(
self.context, self.share, access_active)

View File

@ -45,12 +45,14 @@ class ShareRpcAPITestCase(test.TestCase):
share_server = db_utils.create_share_server()
cg = {'id': 'fake_cg_id', 'host': 'fake_host'}
cgsnapshot = {'id': 'fake_cg_id'}
host = {'host': 'fake_host', 'capabilities': 1}
self.fake_share = jsonutils.to_primitive(share)
self.fake_access = jsonutils.to_primitive(access)
self.fake_snapshot = jsonutils.to_primitive(snapshot)
self.fake_share_server = jsonutils.to_primitive(share_server)
self.fake_cg = jsonutils.to_primitive(cg)
self.fake_cgsnapshot = jsonutils.to_primitive(cgsnapshot)
self.fake_host = jsonutils.to_primitive(host)
self.ctxt = context.RequestContext('fake_user', 'fake_project')
self.rpcapi = share_rpcapi.ShareAPI()
@ -64,7 +66,7 @@ class ShareRpcAPITestCase(test.TestCase):
"version": kwargs.pop('version', self.rpcapi.BASE_RPC_API_VERSION)
}
expected_msg = copy.deepcopy(kwargs)
if 'share' in expected_msg:
if 'share' in expected_msg and method != 'get_migration_info':
share = expected_msg['share']
del expected_msg['share']
expected_msg['share_id'] = share['id']
@ -89,15 +91,18 @@ class ShareRpcAPITestCase(test.TestCase):
snapshot = expected_msg['snapshot']
del expected_msg['snapshot']
expected_msg['snapshot_id'] = snapshot['id']
if 'dest_host' in expected_msg:
del expected_msg['dest_host']
expected_msg['host'] = self.fake_host
if 'host' in kwargs:
host = kwargs['host']
elif 'share_server' in kwargs:
host = kwargs['share_server']['host']
elif 'cg' in kwargs:
host = kwargs['cg']['host']
elif 'share_instance' in kwargs:
host = kwargs['share_instance']['host']
elif 'share_server' in kwargs:
host = kwargs['share_server']['host']
else:
host = kwargs['share']['host']
target['server'] = host
@ -218,3 +223,30 @@ class ShareRpcAPITestCase(test.TestCase):
rpc_method='cast',
cgsnapshot=self.fake_cgsnapshot,
host='fake_host1')
def test_migrate_share(self):
fake_dest_host = self.Desthost()
self._test_share_api('migrate_share',
rpc_method='cast',
version='1.6',
share=self.fake_share,
dest_host=fake_dest_host,
force_host_copy='1')
def test_get_migration_info(self):
self._test_share_api('get_migration_info',
rpc_method='call',
version='1.6',
share_instance=self.fake_share,
share_server=self.fake_share_server)
def test_get_driver_migration_info(self):
self._test_share_api('get_driver_migration_info',
rpc_method='call',
version='1.6',
share_instance=self.fake_share,
share_server=self.fake_share_server)
class Desthost(object):
host = 'fake_host'
capabilities = 1

View File

@ -16,6 +16,11 @@
"""Tests For miscellaneous util methods used with share."""
import os
import shutil
import mock
from manila.share import utils as share_utils
from manila import test
@ -128,3 +133,126 @@ class ShareUtilsTestCase(test.TestCase):
expected = None
self.assertEqual(expected,
share_utils.append_host(host, pool))
class CopyClassTestCase(test.TestCase):
def setUp(self):
super(CopyClassTestCase, self).setUp()
src = '/path/fake/src'
dest = '/path/fake/dst'
ignore_list = ['item']
self._copy = share_utils.Copy(src, dest, ignore_list)
self._copy.totalSize = 10000
self._copy.currentSize = 100
self._copy.files = [{'name': '/fileA', 'attr': 100},
{'name': '/fileB', 'attr': 150},
{'name': '/fileC', 'attr': 200}]
self._copy.dirs = [{'name': '/fakeA', 'attr': 777},
{'name': '/fakeB', 'attr': 666},
{'name': '/fakeC', 'attr': 767}]
self._copy.currentCopy = {'file_path': '/fake/path', 'size': 100}
self.stat_result = [777, 'ino', 'dev', 'nlink', 'uid',
'gid', 100, 'at', 'mt', 'ct']
self.mock_log = self.mock_object(share_utils, 'LOG')
def test_get_progress(self):
expected = {'total_progress': 1,
'current_file_path': '/fake/path',
'current_file_progress': 100}
self.mock_object(os, 'stat', mock.Mock(return_value=self.stat_result))
out = self._copy.get_progress()
self.assertEqual(expected, out)
os.stat.assert_called_once_with('/fake/path')
def test_get_progress_current_copy_none(self):
self._copy.currentCopy = None
expected = {'total_progress': 100}
out = self._copy.get_progress()
self.assertEqual(expected, out)
def test_get_progress_os_exception(self):
expected = {'total_progress': 1,
'current_file_path': '/fake/path',
'current_file_progress': 0}
self.mock_object(os, 'stat', mock.Mock(side_effect=OSError))
out = self._copy.get_progress()
os.stat.assert_called_once_with('/fake/path')
self.assertEqual(expected, out)
def test_run(self):
dirpath = '/dirpath1'
dirnames = [('dir1', 'dir2'), ('dir3', 'dir4')]
filenames = [('file1.txt', 'file2.exe'), ('file3.txt', 'file4.exe')]
os_walk_return = [(dirpath, dirnames[0], filenames[0]),
(dirpath, dirnames[1], filenames[1])]
self.mock_object(shutil, 'copy2', mock.Mock())
self.mock_object(shutil, 'copystat', mock.Mock())
self.mock_object(os, 'stat', mock.Mock(return_value=self.stat_result))
self.mock_object(os, 'walk', mock.Mock(return_value=os_walk_return))
self.mock_object(os, 'mkdir', mock.Mock())
self._copy.run()
self.assertTrue(self.mock_log.info.called)
os.walk.assert_called_once_with('/path/fake/src')
# os.stats called in explore and get_progress functions
self.assertEqual(16, os.stat.call_count)
def test_copy(self):
src = '/path/fake/src'
dest = '/path/fake/dst'
self.mock_object(os, 'stat', mock.Mock(return_value=self.stat_result))
self.mock_object(os, 'mkdir', mock.Mock())
self.mock_object(shutil, 'copy2', mock.Mock())
self.mock_object(shutil, 'copystat', mock.Mock())
self._copy.copy(src, dest)
self.assertTrue(self.mock_log.info.called)
# shutil.copystat should be called 3 times.
# Once for each entry in self._copy.dirs
self.assertEqual(3, shutil.copystat.call_count)
# os.stat should be called 3 times.
# Once for each entry in self._copy.files
self.assertEqual(3, os.stat.call_count)
self.assertEqual(3, os.mkdir.call_count)
args = ('/fileA', '/fileB', '/fileC')
os.stat.assert_has_calls([mock.call(a) for a in args])
args = ('/fakeA', '/fakeB', '/fakeC')
os.mkdir.assert_has_calls([mock.call(a) for a in args])
def test_explore(self):
path = '/dirpath1'
dirpath = '/dirpath1'
dirnames = [('dir1', 'dir2'), ('dir3', 'dir4')]
filenames = [('file1.txt', 'file2.exe'), ('file3.txt', 'file4.exe')]
os_walk_return = [(dirpath, dirnames[0], filenames[0]),
(dirpath, dirnames[1], filenames[1])]
self.mock_object(os, 'stat', mock.Mock(return_value=self.stat_result))
self.mock_object(os, 'walk', mock.Mock(return_value=os_walk_return))
self._copy.explore(path)
os.walk.assert_called_once_with('/dirpath1')
# Function os.stat should be called 8 times.
# 4 times for dirname in dirnames, and 4 times for
# filename in filenames
self.assertEqual(8, os.stat.call_count)
args = ('/dirpath1/dir1', '/dirpath1/dir2', '/dirpath1/file1.txt',
'/dirpath1/file2.exe', '/dirpath1/dir3', '/dirpath1/dir4',
'/dirpath1/file3.txt', '/dirpath1/file4.exe')
os.stat.assert_has_calls([mock.call(a) for a in args])