Support A/A on Scheduler operations

This patch allows scheduler to work with clustered hosts to support A/A
operations.

Reporting capabilities of clustered hosts will be grouped by the
cluster_name instead of the host, and non clustered hosts will still be
stored by host.

To avoid replacing a newer capability report with an older version we
timestamp capabilities on the volumes (it's backward compatible) and
only replace currently stored values in scheduler when they are newer.

Following actions now support A/A operation:

- manage_existing
- manage_existing_snapshot
- get_pools
- create_volume
- retype
- migrate_volume_to_host
- create_consistencygroup
- create_group
- update_service_capabilities
- extend_volume

And Affinity and Driver filters have been updated.

The new functionality to notify service capabilities has not been
changed to Active/Active and will be done in another patch.

APIImpact: Added microversion 3.16
Specs: https://review.openstack.org/327283
Implements: blueprint cinder-volume-active-active-support
Change-Id: I611e75500f3d5281188c5aae287c62e5810e6b72
This commit is contained in:
Gorka Eguileor 2016-07-21 13:36:58 +02:00
parent 344f230990
commit 9acf079b8c
43 changed files with 911 additions and 389 deletions

View File

@ -349,6 +349,13 @@ cgsnapshot_id:
in: body
required: false
type: string
cluster_mutex:
description: |
The OpenStack Block Storage cluster where the resource resides. Optional
only if host field is provided.
in: body
required: false
type: string
connector:
description: |
The ``connector`` object.
@ -638,6 +645,13 @@ host:
in: body
required: true
type: string
host_mutex:
description: |
The OpenStack Block Storage host where the existing resource resides.
Optional only if cluster field is provided.
in: body
required: false
type: string
host_name:
description: |
The name of the attaching host.

View File

@ -0,0 +1,19 @@
{
"volume": {
"host": null,
"cluster": "cluster@backend",
"ref": {
"source-name": "existingLV",
"source-id": "1234"
},
"name": "New Volume",
"availability_zone": "az2",
"description": "Volume imported from existingLV",
"volume_type": null,
"bootable": true,
"metadata": {
"key1": "value1",
"key2": "value2"
}
}
}

View File

@ -24,6 +24,10 @@ or source-name element, if possible.
The API chooses the size of the volume by rounding up the size of
the existing storage volume to the next gibibyte (GiB).
Prior to microversion 3.16 host field was required, with the possibility of
defining the cluster it is no longer required, but we must have either a host
or a cluster field but we cannot have them both with values.
Error response codes:202,
@ -38,7 +42,8 @@ Request
- volume_type: volume_type
- name: name
- volume: volume
- host: host
- host: host_mutex
- cluster: cluster_mutex
- ref: ref
- metadata: metadata
- tenant_id: tenant_id
@ -48,3 +53,6 @@ Request Example
.. literalinclude:: ./samples/volume-manage-request.json
:language: javascript
.. literalinclude:: ./samples/volume-manage-request-cluster.json
:language: javascript

View File

@ -370,3 +370,17 @@ class ViewBuilder(object):
url_parts[2] = prefix_parts[2] + url_parts[2]
return urllib.parse.urlunsplit(url_parts).rstrip('/')
def get_cluster_host(req, params, cluster_version):
if req.api_version_request.matches(cluster_version):
cluster_name = params.get('cluster')
msg = _('One and only one of cluster and host must be set.')
else:
cluster_name = None
msg = _('Host field is missing.')
host = params.get('host')
if bool(cluster_name) == bool(host):
raise exception.InvalidInput(reason=msg)
return cluster_name, host

View File

@ -17,6 +17,7 @@ import oslo_messaging as messaging
import webob
from webob import exc
from cinder.api import common
from cinder.api import extensions
from cinder.api.openstack import wsgi
from cinder import backup
@ -241,14 +242,12 @@ class VolumeAdminController(AdminController):
# Not found exception will be handled at the wsgi level
volume = self._get(context, id)
params = body['os-migrate_volume']
try:
host = params['host']
except KeyError:
raise exc.HTTPBadRequest(explanation=_("Must specify 'host'."))
cluster_name, host = common.get_cluster_host(req, params, '3.16')
force_host_copy = utils.get_bool_param('force_host_copy', params)
lock_volume = utils.get_bool_param('lock_volume', params)
self.volume_api.migrate_volume(context, volume, host, force_host_copy,
lock_volume)
self.volume_api.migrate_volume(context, volume, host, cluster_name,
force_host_copy, lock_volume)
return webob.Response(status_int=202)
@wsgi.action('os-migrate_volume_completion')

View File

@ -13,8 +13,8 @@
# under the License.
from oslo_log import log as logging
from webob import exc
from cinder.api import common
from cinder.api.contrib import resource_common_manage
from cinder.api import extensions
from cinder.api.openstack import wsgi
@ -64,6 +64,7 @@ class VolumeManageController(wsgi.Controller):
'volume':
{
'host': <Cinder host on which the existing storage resides>,
'cluster': <Cinder cluster on which the storage resides>,
'ref': <Driver-specific reference to existing storage object>,
}
}
@ -106,13 +107,10 @@ class VolumeManageController(wsgi.Controller):
# Check that the required keys are present, return an error if they
# are not.
required_keys = set(['ref', 'host'])
missing_keys = list(required_keys - set(volume.keys()))
if 'ref' not in volume:
raise exception.MissingRequired(element='ref')
if missing_keys:
msg = _("The following elements are required: %s") % \
', '.join(missing_keys)
raise exc.HTTPBadRequest(explanation=msg)
cluster_name, host = common.get_cluster_host(req, volume, '3.16')
LOG.debug('Manage volume request body: %s', body)
@ -139,7 +137,8 @@ class VolumeManageController(wsgi.Controller):
try:
new_volume = self.volume_api.manage_existing(context,
volume['host'],
host,
cluster_name,
volume['ref'],
**kwargs)
except exception.ServiceNotFound:

View File

@ -64,6 +64,7 @@ REST_API_VERSION_HISTORY = """
* 3.14 - Add group snapshot and create group from src APIs.
* 3.15 - Inject the response's `Etag` header to avoid the lost update
problem with volume metadata.
* 3.16 - Migrate volume now supports cluster
"""
# The minimum and maximum versions of the API supported
@ -71,7 +72,7 @@ REST_API_VERSION_HISTORY = """
# minimum version of the API supported.
# Explicitly using /v1 or /v2 enpoints will still work
_MIN_API_VERSION = "3.0"
_MAX_API_VERSION = "3.15"
_MAX_API_VERSION = "3.16"
_LEGACY_API_VERSION1 = "1.0"
_LEGACY_API_VERSION2 = "2.0"

View File

@ -191,3 +191,12 @@ user documentation.
------------------------
Added injecting the response's `Etag` header to avoid the lost update
problem with volume metadata.
3.16
----
os-migrate_volume now accepts ``cluster`` parameter when we want to migrate a
volume to a cluster. If we pass the ``host`` parameter for a volume that is
in a cluster, the request will be sent to the cluster as if we had requested
that specific cluster. Only ``host`` or ``cluster`` can be provided.
Creating a managed volume also supports the cluster parameter.

View File

@ -424,7 +424,7 @@ def _filter_host(field, value, match_level=None):
def _service_query(context, session=None, read_deleted='no', host=None,
cluster_name=None, is_up=None, backend_match_level=None,
**filters):
disabled=None, **filters):
filters = _clean_filters(filters)
if filters and not is_valid_model_filters(models.Service, filters):
return None
@ -442,6 +442,22 @@ def _service_query(context, session=None, read_deleted='no', host=None,
query = query.filter(_filter_host(models.Service.cluster_name,
cluster_name, backend_match_level))
# Now that we have clusters, a service is disabled if the service doesn't
# belong to a cluster or if it belongs to a cluster and the cluster itself
# is disabled.
if disabled is not None:
disabled_filter = or_(
and_(models.Service.cluster_name.is_(None),
models.Service.disabled),
and_(models.Service.cluster_name.isnot(None),
sql.exists().where(and_(
models.Cluster.name == models.Service.cluster_name,
models.Cluster.binary == models.Service.binary,
~models.Cluster.deleted,
models.Cluster.disabled))))
if not disabled:
disabled_filter = ~disabled_filter
query = query.filter(disabled_filter)
if filters:
query = query.filter_by(**filters)
@ -5074,16 +5090,14 @@ def consistencygroup_create(context, values, cg_snap_id=None, cg_id=None):
if conditions:
# We don't want duplicated field values
values.pop('volume_type_id', None)
values.pop('availability_zone', None)
values.pop('host', None)
names = ['volume_type_id', 'availability_zone', 'host',
'cluster_name']
for name in names:
values.pop(name, None)
sel = session.query(cg_model.volume_type_id,
cg_model.availability_zone,
cg_model.host,
*(bindparam(k, v) for k, v in values.items())
).filter(*conditions)
names = ['volume_type_id', 'availability_zone', 'host']
fields = [getattr(cg_model, name) for name in names]
fields.extend(bindparam(k, v) for k, v in values.items())
sel = session.query(*fields).filter(*conditions)
names.extend(values.keys())
insert_stmt = cg_model.__table__.insert().from_select(names, sel)
result = session.execute(insert_stmt)

View File

@ -177,7 +177,8 @@ class SchedulerDependentManager(Manager):
context,
self.service_name,
self.host,
self.last_capabilities)
self.last_capabilities,
self.cluster)
try:
self.scheduler_rpcapi.notify_service_capabilities(
context,

View File

@ -457,6 +457,10 @@ class ClusteredObject(object):
def service_topic_queue(self):
return self.cluster_name or self.host
@property
def is_clustered(self):
return bool(self.cluster_name)
class CinderObjectSerializer(base.VersionedObjectSerializer):
OBJ_BASE_CLASS = CinderObject

View File

@ -41,13 +41,14 @@ CONF = cfg.CONF
CONF.register_opts(scheduler_driver_opts)
def volume_update_db(context, volume_id, host):
"""Set the host and set the scheduled_at field of a volume.
def volume_update_db(context, volume_id, host, cluster_name):
"""Set the host, cluster_name, and set the scheduled_at field of a volume.
:returns: A Volume with the updated fields set properly.
"""
volume = objects.Volume.get_by_id(context, volume_id)
volume.host = host
volume.cluster_name = cluster_name
volume.scheduled_at = timeutils.utcnow()
volume.save()
@ -56,22 +57,24 @@ def volume_update_db(context, volume_id, host):
return volume
def group_update_db(context, group, host):
def group_update_db(context, group, host, cluster_name):
"""Set the host and the scheduled_at field of a consistencygroup.
:returns: A Consistencygroup with the updated fields set properly.
"""
group.update({'host': host, 'updated_at': timeutils.utcnow()})
group.update({'host': host, 'updated_at': timeutils.utcnow(),
'cluster_name': cluster_name})
group.save()
return group
def generic_group_update_db(context, group, host):
def generic_group_update_db(context, group, host, cluster_name):
"""Set the host and the scheduled_at field of a group.
:returns: A Group with the updated fields set properly.
"""
group.update({'host': host, 'updated_at': timeutils.utcnow()})
group.update({'host': host, 'updated_at': timeutils.utcnow(),
'cluster_name': cluster_name})
group.save()
return group
@ -97,11 +100,14 @@ class Scheduler(object):
return self.host_manager.has_all_capabilities()
def update_service_capabilities(self, service_name, host, capabilities):
def update_service_capabilities(self, service_name, host, capabilities,
cluster_name, timestamp):
"""Process a capability update from a service node."""
self.host_manager.update_service_capabilities(service_name,
host,
capabilities)
capabilities,
cluster_name,
timestamp)
def notify_service_capabilities(self, service_name, host, capabilities):
"""Notify capability update from a service node."""

View File

@ -74,12 +74,11 @@ class FilterScheduler(driver.Scheduler):
if not weighed_host:
raise exception.NoValidHost(reason=_("No weighed hosts available"))
host = weighed_host.obj.host
backend = weighed_host.obj
updated_group = driver.group_update_db(context, group, backend.host,
backend.cluster_name)
updated_group = driver.group_update_db(context, group, host)
self.volume_rpcapi.create_consistencygroup(context,
updated_group, host)
self.volume_rpcapi.create_consistencygroup(context, updated_group)
def schedule_create_group(self, context, group,
group_spec,
@ -96,12 +95,13 @@ class FilterScheduler(driver.Scheduler):
if not weighed_host:
raise exception.NoValidHost(reason=_("No weighed hosts available"))
host = weighed_host.obj.host
backend = weighed_host.obj
updated_group = driver.generic_group_update_db(context, group, host)
updated_group = driver.generic_group_update_db(context, group,
backend.host,
backend.cluster_name)
self.volume_rpcapi.create_group(context,
updated_group, host)
self.volume_rpcapi.create_group(context, updated_group)
def schedule_create_volume(self, context, request_spec, filter_properties):
weighed_host = self._schedule(context, request_spec,
@ -110,18 +110,20 @@ class FilterScheduler(driver.Scheduler):
if not weighed_host:
raise exception.NoValidHost(reason=_("No weighed hosts available"))
host = weighed_host.obj.host
backend = weighed_host.obj
volume_id = request_spec['volume_id']
updated_volume = driver.volume_update_db(context, volume_id, host)
updated_volume = driver.volume_update_db(context, volume_id,
backend.host,
backend.cluster_name)
self._post_select_populate_filter_properties(filter_properties,
weighed_host.obj)
backend)
# context is not serializable
filter_properties.pop('context', None)
self.volume_rpcapi.create_volume(context, updated_volume, host,
request_spec, filter_properties,
self.volume_rpcapi.create_volume(context, updated_volume, request_spec,
filter_properties,
allow_reschedule=True)
def host_passes_filters(self, context, host, request_spec,
@ -131,7 +133,7 @@ class FilterScheduler(driver.Scheduler):
filter_properties)
for weighed_host in weighed_hosts:
host_state = weighed_host.obj
if host_state.host == host:
if host_state.backend_id == host:
return host_state
volume_id = request_spec.get('volume_id', '??volume_id missing??')
@ -144,26 +146,27 @@ class FilterScheduler(driver.Scheduler):
migration_policy='never'):
"""Find a host that can accept the volume with its new type."""
filter_properties = filter_properties or {}
current_host = request_spec['volume_properties']['host']
backend = (request_spec['volume_properties'].get('cluster_name')
or request_spec['volume_properties']['host'])
# The volume already exists on this host, and so we shouldn't check if
# it can accept the volume again in the CapacityFilter.
filter_properties['vol_exists_on'] = current_host
# The volume already exists on this backend, and so we shouldn't check
# if it can accept the volume again in the CapacityFilter.
filter_properties['vol_exists_on'] = backend
weighed_hosts = self._get_weighted_candidates(context, request_spec,
filter_properties)
if not weighed_hosts:
weighed_backends = self._get_weighted_candidates(context, request_spec,
filter_properties)
if not weighed_backends:
raise exception.NoValidHost(reason=_('No valid hosts for volume '
'%(id)s with type %(type)s') %
{'id': request_spec['volume_id'],
'type': request_spec['volume_type']})
for weighed_host in weighed_hosts:
host_state = weighed_host.obj
if host_state.host == current_host:
return host_state
for weighed_backend in weighed_backends:
backend_state = weighed_backend.obj
if backend_state.backend_id == backend:
return backend_state
if utils.extract_host(current_host, 'pool') is None:
if utils.extract_host(backend, 'pool') is None:
# legacy volumes created before pool is introduced has no pool
# info in host. But host_state.host always include pool level
# info. In this case if above exact match didn't work out, we
@ -172,11 +175,12 @@ class FilterScheduler(driver.Scheduler):
# cause migration between pools on same host, which we consider
# it is different from migration between hosts thus allow that
# to happen even migration policy is 'never'.
for weighed_host in weighed_hosts:
host_state = weighed_host.obj
backend = utils.extract_host(host_state.host, 'backend')
if backend == current_host:
return host_state
for weighed_backend in weighed_backends:
backend_state = weighed_backend.obj
new_backend = utils.extract_host(backend_state.backend_id,
'backend')
if new_backend == backend:
return backend_state
if migration_policy == 'never':
raise exception.NoValidHost(reason=_('Current host not valid for '
@ -186,7 +190,7 @@ class FilterScheduler(driver.Scheduler):
{'id': request_spec['volume_id'],
'type': request_spec['volume_type']})
top_host = self._choose_top_host(weighed_hosts, request_spec)
top_host = self._choose_top_host(weighed_backends, request_spec)
return top_host.obj
def get_pools(self, context, filters):
@ -201,7 +205,7 @@ class FilterScheduler(driver.Scheduler):
been selected by the scheduling process.
"""
# Add a retry entry for the selected volume backend:
self._add_retry_host(filter_properties, host_state.host)
self._add_retry_host(filter_properties, host_state.backend_id)
def _add_retry_host(self, filter_properties, host):
"""Add a retry entry for the selected volume backend.
@ -418,8 +422,8 @@ class FilterScheduler(driver.Scheduler):
for host2 in temp_weighed_hosts:
# Should schedule creation of CG on backend level,
# not pool level.
if (utils.extract_host(host1.obj.host) ==
utils.extract_host(host2.obj.host)):
if (utils.extract_host(host1.obj.backend_id) ==
utils.extract_host(host2.obj.backend_id)):
new_weighed_hosts.append(host1)
weighed_hosts = new_weighed_hosts
if not weighed_hosts:
@ -530,8 +534,8 @@ class FilterScheduler(driver.Scheduler):
for host2 in host_list2:
# Should schedule creation of group on backend level,
# not pool level.
if (utils.extract_host(host1.obj.host) ==
utils.extract_host(host2.obj.host)):
if (utils.extract_host(host1.obj.backend_id) ==
utils.extract_host(host2.obj.backend_id)):
new_hosts.append(host1)
if not new_hosts:
return []
@ -615,7 +619,7 @@ class FilterScheduler(driver.Scheduler):
# Get host name including host@backend#pool info from
# weighed_hosts.
for host in weighed_hosts[::-1]:
backend = utils.extract_host(host.obj.host)
backend = utils.extract_host(host.obj.backend_id)
if backend != group_backend:
weighed_hosts.remove(host)
if not weighed_hosts:
@ -651,7 +655,7 @@ class FilterScheduler(driver.Scheduler):
def _choose_top_host(self, weighed_hosts, request_spec):
top_host = weighed_hosts[0]
host_state = top_host.obj
LOG.debug("Choosing %s", host_state.host)
LOG.debug("Choosing %s", host_state.backend_id)
volume_properties = request_spec['volume_properties']
host_state.consume_from_volume(volume_properties)
return top_host
@ -659,11 +663,11 @@ class FilterScheduler(driver.Scheduler):
def _choose_top_host_group(self, weighed_hosts, request_spec_list):
top_host = weighed_hosts[0]
host_state = top_host.obj
LOG.debug("Choosing %s", host_state.host)
LOG.debug("Choosing %s", host_state.backend_id)
return top_host
def _choose_top_host_generic_group(self, weighed_hosts):
top_host = weighed_hosts[0]
host_state = top_host.obj
LOG.debug("Choosing %s", host_state.host)
LOG.debug("Choosing %s", host_state.backend_id)
return top_host

View File

@ -24,6 +24,14 @@ class AffinityFilter(filters.BaseHostFilter):
def __init__(self):
self.volume_api = volume.API()
def _get_volumes(self, context, affinity_uuids, backend_state):
filters = {'id': affinity_uuids, 'deleted': False}
if backend_state.cluster_name:
filters['cluster_name'] = backend_state.cluster_name
else:
filters['host'] = backend_state.host
return self.volume_api.get_all(context, filters=filters)
class DifferentBackendFilter(AffinityFilter):
"""Schedule volume on a different back-end from a set of volumes."""
@ -53,11 +61,8 @@ class DifferentBackendFilter(AffinityFilter):
return False
if affinity_uuids:
return not self.volume_api.get_all(
context, filters={'host': host_state.host,
'id': affinity_uuids,
'deleted': False})
return not self._get_volumes(context, affinity_uuids,
host_state)
# With no different_host key
return True
@ -90,10 +95,7 @@ class SameBackendFilter(AffinityFilter):
return False
if affinity_uuids:
return self.volume_api.get_all(
context, filters={'host': host_state.host,
'id': affinity_uuids,
'deleted': False})
return self._get_volumes(context, affinity_uuids, host_state)
# With no same_host key
return True

View File

@ -36,25 +36,29 @@ class CapacityFilter(filters.BaseHostFilter):
# If the volume already exists on this host, don't fail it for
# insufficient capacity (e.g., if we are retyping)
if host_state.host == filter_properties.get('vol_exists_on'):
if host_state.backend_id == filter_properties.get('vol_exists_on'):
return True
spec = filter_properties.get('request_spec')
if spec:
volid = spec.get('volume_id')
grouping = 'cluster' if host_state.cluster_name else 'host'
if filter_properties.get('new_size'):
# If new_size is passed, we are allocating space to extend a volume
requested_size = (int(filter_properties.get('new_size')) -
int(filter_properties.get('size')))
LOG.debug('Checking if host %(host)s can extend the volume %(id)s'
'in %(size)s GB', {'host': host_state.host, 'id': volid,
'size': requested_size})
LOG.debug('Checking if %(grouping)s %(grouping_name)s can extend '
'the volume %(id)s in %(size)s GB',
{'grouping': grouping,
'grouping_name': host_state.backend_id, 'id': volid,
'size': requested_size})
else:
requested_size = filter_properties.get('size')
LOG.debug('Checking if host %(host)s can create a %(size)s GB '
'volume (%(id)s)',
{'host': host_state.host, 'id': volid,
LOG.debug('Checking if %(grouping)s %(grouping_name)s can create '
'a %(size)s GB volume (%(id)s)',
{'grouping': grouping,
'grouping_name': host_state.backend_id, 'id': volid,
'size': requested_size})
if host_state.free_capacity_gb is None:
@ -85,18 +89,16 @@ class CapacityFilter(filters.BaseHostFilter):
total = float(total_space)
if total <= 0:
LOG.warning(_LW("Insufficient free space for volume creation. "
"Total capacity is %(total).2f on host %(host)s."),
"Total capacity is %(total).2f on %(grouping)s "
"%(grouping_name)s."),
{"total": total,
"host": host_state.host})
"grouping": grouping,
"grouping_name": host_state.backend_id})
return False
# Calculate how much free space is left after taking into account
# the reserved space.
free = free_space - math.floor(total * reserved)
msg_args = {"host": host_state.host,
"requested": requested_size,
"available": free}
# NOTE(xyang): If 'provisioning:type' is 'thick' in extra_specs,
# we will not use max_over_subscription_ratio and
# provisioned_capacity_gb to determine whether a volume can be
@ -117,15 +119,18 @@ class CapacityFilter(filters.BaseHostFilter):
provisioned_ratio = ((host_state.provisioned_capacity_gb +
requested_size) / total)
if provisioned_ratio > host_state.max_over_subscription_ratio:
msg_args = {
"provisioned_ratio": provisioned_ratio,
"oversub_ratio": host_state.max_over_subscription_ratio,
"grouping": grouping,
"grouping_name": host_state.backend_id,
}
LOG.warning(_LW(
"Insufficient free space for thin provisioning. "
"The ratio of provisioned capacity over total capacity "
"%(provisioned_ratio).2f has exceeded the maximum over "
"subscription ratio %(oversub_ratio).2f on host "
"%(host)s."),
{"provisioned_ratio": provisioned_ratio,
"oversub_ratio": host_state.max_over_subscription_ratio,
"host": host_state.host})
"subscription ratio %(oversub_ratio).2f on %(grouping)s "
"%(grouping_name)s."), msg_args)
return False
else:
# Thin provisioning is enabled and projected over-subscription
@ -138,23 +143,30 @@ class CapacityFilter(filters.BaseHostFilter):
free * host_state.max_over_subscription_ratio)
return adjusted_free_virtual >= requested_size
elif thin and host_state.thin_provisioning_support:
LOG.warning(_LW("Filtering out host %(host)s with an invalid "
"maximum over subscription ratio of "
"%(oversub_ratio).2f. The ratio should be a "
LOG.warning(_LW("Filtering out %(grouping)s %(grouping_name)s "
"with an invalid maximum over subscription ratio "
"of %(oversub_ratio).2f. The ratio should be a "
"minimum of 1.0."),
{"oversub_ratio":
host_state.max_over_subscription_ratio,
"host": host_state.host})
"grouping": grouping,
"grouping_name": host_state.backend_id})
return False
msg_args = {"grouping_name": host_state.backend_id,
"grouping": grouping,
"requested": requested_size,
"available": free}
if free < requested_size:
LOG.warning(_LW("Insufficient free space for volume creation "
"on host %(host)s (requested / avail): "
"%(requested)s/%(available)s"), msg_args)
"on %(grouping)s %(grouping_name)s (requested / "
"avail): %(requested)s/%(available)s"),
msg_args)
return False
LOG.debug("Space information for volume creation "
"on host %(host)s (requested / avail): "
"on %(grouping)s %(grouping_name)s (requested / avail): "
"%(requested)s/%(available)s", msg_args)
return True

View File

@ -35,10 +35,11 @@ class DriverFilter(filters.BaseHostFilter):
"""Determines whether a host has a passing filter_function or not."""
stats = self._generate_stats(host_state, filter_properties)
LOG.debug("Checking host '%s'", stats['host_stats']['host'])
LOG.debug("Checking backend '%s'", stats['host_stats']['backend_id'])
result = self._check_filter_function(stats)
LOG.debug("Result: %s", result)
LOG.debug("Done checking host '%s'", stats['host_stats']['host'])
LOG.debug("Done checking backend '%s'",
stats['host_stats']['backend_id'])
return result
@ -89,6 +90,8 @@ class DriverFilter(filters.BaseHostFilter):
host_stats = {
'host': host_state.host,
'cluster_name': host_state.cluster_name,
'backend_id': host_state.backend_id,
'volume_backend_name': host_state.volume_backend_name,
'vendor_name': host_state.vendor_name,
'driver_version': host_state.driver_version,

View File

@ -45,7 +45,7 @@ class IgnoreAttemptedHostsFilter(filters.BaseHostFilter):
return True
hosts = attempted.get('hosts', [])
host = host_state.host
host = host_state.backend_id
passes = host not in hosts
pass_msg = "passes" if passes else "fails"

View File

@ -69,9 +69,9 @@ class InstanceLocalityFilter(filters.BaseHostFilter):
return self._nova_ext_srv_attr
def host_passes(self, host_state, filter_properties):
def host_passes(self, backend_state, filter_properties):
context = filter_properties['context']
host = volume_utils.extract_host(host_state.host, 'host')
host = volume_utils.extract_host(backend_state.backend_id, 'host')
scheduler_hints = filter_properties.get('scheduler_hints') or {}
instance_uuid = scheduler_hints.get(HINT_KEYWORD, None)

View File

@ -86,10 +86,11 @@ class ReadOnlyDict(collections.Mapping):
class HostState(object):
"""Mutable and immutable information tracked for a volume backend."""
def __init__(self, host, capabilities=None, service=None):
def __init__(self, host, cluster_name, capabilities=None, service=None):
self.capabilities = None
self.service = None
self.host = host
self.cluster_name = cluster_name
self.update_capabilities(capabilities, service)
self.volume_backend_name = None
@ -122,6 +123,10 @@ class HostState(object):
self.updated = None
@property
def backend_id(self):
return self.cluster_name or self.host
def update_capabilities(self, capabilities=None, service=None):
# Read-only capability dicts
@ -210,7 +215,8 @@ class HostState(object):
cur_pool = self.pools.get(pool_name, None)
if not cur_pool:
# Add new pool
cur_pool = PoolState(self.host, pool_cap, pool_name)
cur_pool = PoolState(self.host, self.cluster_name,
pool_cap, pool_name)
self.pools[pool_name] = cur_pool
cur_pool.update_from_volume_capability(pool_cap, service)
@ -227,7 +233,8 @@ class HostState(object):
if len(self.pools) == 0:
# No pool was there
single_pool = PoolState(self.host, capability, pool_name)
single_pool = PoolState(self.host, self.cluster_name,
capability, pool_name)
self._append_backend_info(capability)
self.pools[pool_name] = single_pool
else:
@ -235,7 +242,8 @@ class HostState(object):
try:
single_pool = self.pools[pool_name]
except KeyError:
single_pool = PoolState(self.host, capability, pool_name)
single_pool = PoolState(self.host, self.cluster_name,
capability, pool_name)
self._append_backend_info(capability)
self.pools[pool_name] = single_pool
@ -293,14 +301,18 @@ class HostState(object):
# FIXME(zhiteng) backend level free_capacity_gb isn't as
# meaningful as it used to be before pool is introduced, we'd
# come up with better representation of HostState.
return ("host '%s': free_capacity_gb: %s, pools: %s" %
(self.host, self.free_capacity_gb, self.pools))
grouping = 'cluster' if self.cluster_name else 'host'
grouping_name = self.backend_id
return ("%s '%s': free_capacity_gb: %s, pools: %s" %
(grouping, grouping_name, self.free_capacity_gb, self.pools))
class PoolState(HostState):
def __init__(self, host, capabilities, pool_name):
def __init__(self, host, cluster_name, capabilities, pool_name):
new_host = vol_utils.append_host(host, pool_name)
super(PoolState, self).__init__(new_host, capabilities)
new_cluster = vol_utils.append_host(cluster_name, pool_name)
super(PoolState, self).__init__(new_host, new_cluster, capabilities)
self.pool_name = pool_name
# No pools in pool
self.pools = None
@ -443,7 +455,8 @@ class HostManager(object):
hosts,
weight_properties)
def update_service_capabilities(self, service_name, host, capabilities):
def update_service_capabilities(self, service_name, host, capabilities,
cluster_name, timestamp):
"""Update the per-service capabilities based on this notification."""
if service_name != 'volume':
LOG.debug('Ignoring %(service_name)s service update '
@ -451,9 +464,12 @@ class HostManager(object):
{'service_name': service_name, 'host': host})
return
# TODO(geguileo): In P - Remove the next line since we receive the
# timestamp
timestamp = timestamp or timeutils.utcnow()
# Copy the capabilities, so we don't modify the original dict
capab_copy = dict(capabilities)
capab_copy["timestamp"] = timeutils.utcnow() # Reported time
capab_copy["timestamp"] = timestamp
# Set the default capabilities in case None is set.
capab_old = self.service_states.get(host, {"timestamp": 0})
@ -474,15 +490,19 @@ class HostManager(object):
self.service_states[host] = capab_copy
LOG.debug("Received %(service_name)s service update from "
"%(host)s: %(cap)s",
cluster_msg = (('Cluster: %s - Host: ' % cluster_name) if cluster_name
else '')
LOG.debug("Received %(service_name)s service update from %(cluster)s"
"%(host)s: %(cap)s%(cluster)s",
{'service_name': service_name, 'host': host,
'cap': capabilities})
'cap': capabilities,
'cluster': cluster_msg})
self._no_capabilities_hosts.discard(host)
def notify_service_capabilities(self, service_name, host, capabilities):
"""Notify the ceilometer with updated volume stats"""
# TODO(geguileo): Make this work with Active/Active
if service_name != 'volume':
return
@ -519,6 +539,7 @@ class HostManager(object):
volume_services = objects.ServiceList.get_all_by_topic(context,
topic,
disabled=False)
active_backends = set()
active_hosts = set()
no_capabilities_hosts = set()
for service in volume_services.objects:
@ -526,32 +547,46 @@ class HostManager(object):
if not service.is_up:
LOG.warning(_LW("volume service is down. (host: %s)"), host)
continue
capabilities = self.service_states.get(host, None)
if capabilities is None:
no_capabilities_hosts.add(host)
continue
host_state = self.host_state_map.get(host)
if not host_state:
host_state = self.host_state_cls(host,
capabilities=capabilities,
service=
dict(service))
self.host_state_map[host] = host_state
# update capabilities and attributes in host_state
host_state.update_from_volume_capability(capabilities,
service=
dict(service))
# Since the service could have been added or remove from a cluster
backend_key = service.service_topic_queue
backend_state = self.host_state_map.get(backend_key, None)
if not backend_state:
backend_state = self.host_state_cls(
host,
service.cluster_name,
capabilities=capabilities,
service=dict(service))
self.host_state_map[backend_key] = backend_state
# We may be receiving capability reports out of order from
# different volume services in a cluster, so we drop older updates
# and only update for newer capability reports.
if (backend_state.capabilities['timestamp'] <=
capabilities['timestamp']):
# update capabilities and attributes in backend_state
backend_state.update_from_volume_capability(
capabilities, service=dict(service))
active_backends.add(backend_key)
active_hosts.add(host)
self._no_capabilities_hosts = no_capabilities_hosts
# remove non-active hosts from host_state_map
nonactive_hosts = set(self.host_state_map.keys()) - active_hosts
for host in nonactive_hosts:
LOG.info(_LI("Removing non-active host: %(host)s from "
"scheduler cache."), {'host': host})
del self.host_state_map[host]
# remove non-active keys from host_state_map
inactive_backend_keys = set(self.host_state_map) - active_backends
for backend_key in inactive_backend_keys:
# NOTE(geguileo): We don't want to log the removal of a host from
# the map when we are removing it because it has been added to a
# cluster.
if backend_key not in active_hosts:
LOG.info(_LI("Removing non-active backend: %(backend)s from "
"scheduler cache."), {'backend': backend_key})
del self.host_state_map[backend_key]
def get_all_host_states(self, context):
"""Returns a dict of all the hosts the HostManager knows about.

View File

@ -19,12 +19,15 @@
Scheduler Service
"""
from datetime import datetime
import eventlet
from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging as messaging
from oslo_utils import excutils
from oslo_utils import importutils
from oslo_utils import timeutils
import six
from cinder import context
@ -33,6 +36,7 @@ from cinder import exception
from cinder import flow_utils
from cinder.i18n import _, _LE
from cinder import manager
from cinder import objects
from cinder import quota
from cinder import rpc
from cinder.scheduler.flows import create_volume
@ -53,7 +57,7 @@ QUOTAS = quota.QUOTAS
LOG = logging.getLogger(__name__)
class SchedulerManager(manager.Manager):
class SchedulerManager(manager.CleanableManager, manager.Manager):
"""Chooses a host to create volumes."""
RPC_API_VERSION = scheduler_rpcapi.SchedulerAPI.RPC_API_VERSION
@ -80,13 +84,22 @@ class SchedulerManager(manager.Manager):
self.driver.reset()
def update_service_capabilities(self, context, service_name=None,
host=None, capabilities=None, **kwargs):
host=None, capabilities=None,
cluster_name=None, timestamp=None,
**kwargs):
"""Process a capability update from a service node."""
if capabilities is None:
capabilities = {}
# If we received the timestamp we have to deserialize it
elif timestamp:
timestamp = datetime.strptime(timestamp,
timeutils.PERFECT_TIME_FORMAT)
self.driver.update_service_capabilities(service_name,
host,
capabilities)
capabilities,
cluster_name,
timestamp)
def notify_service_capabilities(self, context, service_name,
host, capabilities):
@ -150,9 +163,9 @@ class SchedulerManager(manager.Manager):
group.status = 'error'
group.save()
@objects.Volume.set_workers
def create_volume(self, context, volume, snapshot_id=None, image_id=None,
request_spec=None, filter_properties=None):
self._wait_for_scheduler()
try:
@ -171,13 +184,21 @@ class SchedulerManager(manager.Manager):
with flow_utils.DynamicLogListener(flow_engine, logger=LOG):
flow_engine.run()
def _do_cleanup(self, ctxt, vo_resource):
# We can only receive cleanup requests for volumes, but we check anyway
# We need to cleanup the volume status for cases where the scheduler
# died while scheduling the volume creation.
if (isinstance(vo_resource, objects.Volume) and
vo_resource.status == 'creating'):
vo_resource.status = 'error'
vo_resource.save()
def request_service_capabilities(self, context):
volume_rpcapi.VolumeAPI().publish_service_capabilities(context)
def migrate_volume_to_host(self, context, volume, host, force_host_copy,
request_spec, filter_properties=None):
def migrate_volume(self, context, volume, backend, force_copy,
request_spec, filter_properties):
"""Ensure that the host exists and can accept the volume."""
self._wait_for_scheduler()
def _migrate_volume_set_error(self, context, ex, request_spec):
@ -193,9 +214,9 @@ class SchedulerManager(manager.Manager):
context, ex, request_spec)
try:
tgt_host = self.driver.host_passes_filters(context, host,
request_spec,
filter_properties)
tgt_backend = self.driver.host_passes_filters(context, backend,
request_spec,
filter_properties)
except exception.NoValidHost as ex:
_migrate_volume_set_error(self, context, ex, request_spec)
except Exception as ex:
@ -203,8 +224,14 @@ class SchedulerManager(manager.Manager):
_migrate_volume_set_error(self, context, ex, request_spec)
else:
volume_rpcapi.VolumeAPI().migrate_volume(context, volume,
tgt_host,
force_host_copy)
tgt_backend,
force_copy)
# FIXME(geguileo): Remove this in v4.0 of RPC API.
def migrate_volume_to_host(self, context, volume, host, force_host_copy,
request_spec, filter_properties=None):
return self.migrate_volume(context, volume, host, force_host_copy,
request_spec, filter_properties)
def retype(self, context, volume, request_spec, filter_properties=None):
"""Schedule the modification of a volume's type.
@ -272,7 +299,7 @@ class SchedulerManager(manager.Manager):
try:
self.driver.host_passes_filters(context,
volume.host,
volume.service_topic_queue,
request_spec,
filter_properties)
except exception.NoValidHost as ex:
@ -306,7 +333,8 @@ class SchedulerManager(manager.Manager):
filter_properties['new_size'] = new_size
try:
self.driver.host_passes_filters(context, volume.host,
self.driver.host_passes_filters(context,
volume.service_topic_queue,
request_spec, filter_properties)
volume_rpcapi.VolumeAPI().extend_volume(context, volume, new_size,
reservations)

View File

@ -17,6 +17,7 @@ Client side of the scheduler manager RPC API.
"""
from oslo_serialization import jsonutils
from oslo_utils import timeutils
from cinder.common import constants
from cinder import exception
@ -62,9 +63,12 @@ class SchedulerAPI(rpc.RPCAPI):
3.0 - Remove 2.x compatibility
3.1 - Adds notify_service_capabilities()
3.2 - Adds extend_volume()
3.3 - Add cluster support to migrate_volume, and to
update_service_capabilities and send the timestamp from the
capabilities.
"""
RPC_API_VERSION = '3.2'
RPC_API_VERSION = '3.3'
RPC_DEFAULT_VERSION = '3.0'
TOPIC = constants.SCHEDULER_TOPIC
BINARY = 'cinder-scheduler'
@ -106,15 +110,24 @@ class SchedulerAPI(rpc.RPCAPI):
'filter_properties': filter_properties, 'volume': volume}
return cctxt.cast(ctxt, 'create_volume', **msg_args)
def migrate_volume_to_host(self, ctxt, volume, host, force_host_copy=False,
request_spec=None, filter_properties=None):
cctxt = self._get_cctxt()
def migrate_volume(self, ctxt, volume, backend, force_copy=False,
request_spec=None, filter_properties=None):
request_spec_p = jsonutils.to_primitive(request_spec)
msg_args = {'host': host, 'force_host_copy': force_host_copy,
'request_spec': request_spec_p,
msg_args = {'request_spec': request_spec_p,
'filter_properties': filter_properties, 'volume': volume}
version = '3.3'
if self.client.can_send_version(version):
msg_args['backend'] = backend
msg_args['force_copy'] = force_copy
method = 'migrate_volume'
else:
version = '3.0'
msg_args['host'] = backend
msg_args['force_host_copy'] = force_copy
method = 'migrate_volume_to_host'
return cctxt.cast(ctxt, 'migrate_volume_to_host', **msg_args)
cctxt = self._get_cctxt(version=version)
return cctxt.cast(ctxt, method, **msg_args)
def retype(self, ctxt, volume, request_spec=None, filter_properties=None):
cctxt = self._get_cctxt()
@ -157,14 +170,27 @@ class SchedulerAPI(rpc.RPCAPI):
return cctxt.call(ctxt, 'get_pools', filters=filters)
def update_service_capabilities(self, ctxt, service_name, host,
capabilities):
cctxt = self._get_cctxt(fanout=True)
cctxt.cast(ctxt, 'update_service_capabilities',
service_name=service_name, host=host,
capabilities=capabilities)
capabilities, cluster_name,
timestamp=None):
msg_args = dict(service_name=service_name, host=host,
capabilities=capabilities)
version = '3.3'
# If server accepts timestamping the capabilities and the cluster name
if self.client.can_send_version(version):
# Serialize the timestamp
timestamp = timestamp or timeutils.utcnow()
msg_args.update(cluster_name=cluster_name,
timestamp=jsonutils.to_primitive(timestamp))
else:
version = '3.0'
cctxt = self._get_cctxt(fanout=True, version=version)
cctxt.cast(ctxt, 'update_service_capabilities', **msg_args)
def notify_service_capabilities(self, ctxt, service_name,
host, capabilities):
# TODO(geguileo): Make this work with Active/Active
cctxt = self._get_cctxt(version='3.1')
if not cctxt.can_send_version('3.1'):
msg = _('notify_service_capabilities requires cinder-scheduler '

View File

@ -10,6 +10,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import ddt
import fixtures
import mock
from oslo_concurrency import lockutils
@ -21,6 +22,7 @@ import webob
from webob import exc
from cinder.api.contrib import admin_actions
from cinder.api.openstack import api_version_request as api_version
from cinder.backup import api as backup_api
from cinder.backup import rpcapi as backup_rpcapi
from cinder.common import constants
@ -70,6 +72,7 @@ class BaseAdminTest(test.TestCase):
return volume
@ddt.ddt
class AdminActionsTest(BaseAdminTest):
def setUp(self):
super(AdminActionsTest, self).setUp()
@ -101,6 +104,7 @@ class AdminActionsTest(BaseAdminTest):
self.patch('cinder.objects.Service.get_minimum_rpc_version',
side_effect=_get_minimum_rpc_version_mock)
self.controller = admin_actions.VolumeAdminController()
def tearDown(self):
self.svc.stop()
@ -138,7 +142,7 @@ class AdminActionsTest(BaseAdminTest):
updated_status)
def test_valid_updates(self):
vac = admin_actions.VolumeAdminController()
vac = self.controller
vac.validate_update({'status': 'creating'})
vac.validate_update({'status': 'available'})
@ -503,10 +507,74 @@ class AdminActionsTest(BaseAdminTest):
{'host': 'test2',
'topic': constants.VOLUME_TOPIC,
'created_at': timeutils.utcnow()})
db.service_create(self.ctx,
{'host': 'clustered_host',
'topic': constants.VOLUME_TOPIC,
'binary': constants.VOLUME_BINARY,
'cluster_name': 'cluster',
'created_at': timeutils.utcnow()})
db.cluster_create(self.ctx,
{'name': 'cluster',
'binary': constants.VOLUME_BINARY})
# current status is available
volume = self._create_volume(self.ctx)
return volume
def _migrate_volume_3_exec(self, ctx, volume, host, expected_status,
force_host_copy=False, version=None,
cluster=None):
# build request to migrate to host
# req = fakes.HTTPRequest.blank('/v3/%s/volumes/%s/action' % (
# fake.PROJECT_ID, volume['id']))
req = webob.Request.blank('/v3/%s/volumes/%s/action' % (
fake.PROJECT_ID, volume['id']))
req.method = 'POST'
req.headers['content-type'] = 'application/json'
body = {'os-migrate_volume': {'host': host,
'force_host_copy': force_host_copy}}
version = version or '3.0'
req.headers = {'OpenStack-API-Version': 'volume %s' % version}
req.api_version_request = api_version.APIVersionRequest(version)
if version == '3.16':
body['os-migrate_volume']['cluster'] = cluster
req.body = jsonutils.dump_as_bytes(body)
req.environ['cinder.context'] = ctx
resp = self.controller._migrate_volume(req, volume.id, body)
# verify status
self.assertEqual(expected_status, resp.status_int)
volume = db.volume_get(self.ctx, volume['id'])
return volume
@ddt.data('3.0', '3.15', '3.16')
def test_migrate_volume_success_3(self, version):
expected_status = 202
host = 'test2'
volume = self._migrate_volume_prep()
volume = self._migrate_volume_3_exec(self.ctx, volume, host,
expected_status, version=version)
self.assertEqual('starting', volume['migration_status'])
def test_migrate_volume_success_cluster(self):
expected_status = 202
# We cannot provide host and cluster, so send host to None
host = None
cluster = 'cluster'
volume = self._migrate_volume_prep()
volume = self._migrate_volume_3_exec(self.ctx, volume, host,
expected_status, version='3.16',
cluster=cluster)
self.assertEqual('starting', volume['migration_status'])
def test_migrate_volume_fail_host_and_cluster(self):
# We cannot send host and cluster in the request
host = 'test2'
cluster = 'cluster'
volume = self._migrate_volume_prep()
self.assertRaises(exception.InvalidInput,
self._migrate_volume_3_exec, self.ctx, volume, host,
None, version='3.16', cluster=cluster)
def _migrate_volume_exec(self, ctx, volume, host, expected_status,
force_host_copy=False):
# build request to migrate to host

View File

@ -45,7 +45,7 @@ def volume_get(self, context, volume_id, viewable_admin_meta=False):
if volume_id == fake.VOLUME_ID:
return objects.Volume(context, id=fake.VOLUME_ID,
_name_id=fake.VOLUME2_ID,
host='fake_host')
host='fake_host', cluster_name=None)
raise exception.VolumeNotFound(volume_id=volume_id)
@ -109,7 +109,7 @@ class SnapshotManageTest(test.TestCase):
@mock.patch('cinder.volume.rpcapi.VolumeAPI.manage_existing_snapshot')
@mock.patch('cinder.volume.api.API.create_snapshot_in_db')
@mock.patch('cinder.db.service_get')
@mock.patch('cinder.db.sqlalchemy.api.service_get')
def test_manage_snapshot_ok(self, mock_db,
mock_create_snapshot, mock_rpcapi):
"""Test successful manage snapshot execution.
@ -128,7 +128,8 @@ class SnapshotManageTest(test.TestCase):
# Check the db.service_get was called with correct arguments.
mock_db.assert_called_once_with(
mock.ANY, host='fake_host', binary='cinder-volume')
mock.ANY, None, host='fake_host', binary='cinder-volume',
cluster_name=None)
# Check the create_snapshot_in_db was called with correct arguments.
self.assertEqual(1, mock_create_snapshot.call_count)
@ -149,7 +150,7 @@ class SnapshotManageTest(test.TestCase):
new_callable=mock.PropertyMock)
@mock.patch('cinder.volume.rpcapi.VolumeAPI.manage_existing_snapshot')
@mock.patch('cinder.volume.api.API.create_snapshot_in_db')
@mock.patch('cinder.db.service_get')
@mock.patch('cinder.db.sqlalchemy.api.service_get')
def test_manage_snapshot_disabled(self, mock_db, mock_create_snapshot,
mock_rpcapi, mock_is_up):
"""Test manage snapshot failure due to disabled service."""
@ -168,7 +169,7 @@ class SnapshotManageTest(test.TestCase):
new_callable=mock.PropertyMock)
@mock.patch('cinder.volume.rpcapi.VolumeAPI.manage_existing_snapshot')
@mock.patch('cinder.volume.api.API.create_snapshot_in_db')
@mock.patch('cinder.db.service_get')
@mock.patch('cinder.db.sqlalchemy.api.service_get')
def test_manage_snapshot_is_down(self, mock_db, mock_create_snapshot,
mock_rpcapi, mock_is_up):
"""Test manage snapshot failure due to down service."""
@ -280,7 +281,7 @@ class SnapshotManageTest(test.TestCase):
sort_dirs=['asc'], sort_keys=['reference'])
@mock.patch('cinder.objects.service.Service.is_up', return_value=True)
@mock.patch('cinder.db.service_get')
@mock.patch('cinder.db.sqlalchemy.api.service_get')
def test_get_manageable_snapshots_disabled(self, mock_db, mock_is_up):
mock_db.return_value = fake_service.fake_service_obj(self._admin_ctxt,
disabled=True)
@ -292,7 +293,7 @@ class SnapshotManageTest(test.TestCase):
@mock.patch('cinder.objects.service.Service.is_up', return_value=False,
new_callable=mock.PropertyMock)
@mock.patch('cinder.db.service_get')
@mock.patch('cinder.db.sqlalchemy.api.service_get')
def test_get_manageable_snapshots_is_down(self, mock_db, mock_is_up):
mock_db.return_value = fake_service.fake_service_obj(self._admin_ctxt)
res = self._get_resp_get('host_ok', False, True)

View File

@ -23,6 +23,7 @@ except ImportError:
from urllib.parse import urlencode
import webob
from cinder.api.contrib import volume_manage
from cinder.api.openstack import api_version_request as api_version
from cinder import context
from cinder import exception
@ -51,7 +52,7 @@ def app_v3():
return mapper
def service_get(context, host, binary):
def service_get(context, id, host=None, binary=None, *args, **kwargs):
"""Replacement for Service.service_get_by_host_and_topic.
We mock the Service.service_get_by_host_and_topic method to return
@ -146,7 +147,7 @@ def api_get_manageable_volumes(*args, **kwargs):
@ddt.ddt
@mock.patch('cinder.db.service_get', service_get)
@mock.patch('cinder.db.sqlalchemy.api.service_get', service_get)
@mock.patch('cinder.volume.volume_types.get_volume_type_by_name',
vt_get_volume_type_by_name)
@mock.patch('cinder.volume.volume_types.get_volume_type',
@ -173,6 +174,7 @@ class VolumeManageTest(test.TestCase):
self._non_admin_ctxt = context.RequestContext(fake.USER_ID,
fake.PROJECT_ID,
is_admin=False)
self.controller = volume_manage.VolumeManageController()
def _get_resp_post(self, body):
"""Helper to execute a POST os-volume-manage API call."""
@ -196,10 +198,11 @@ class VolumeManageTest(test.TestCase):
res = req.get_response(app_v3())
return res
@ddt.data(False, True)
@mock.patch('cinder.volume.api.API.manage_existing', wraps=api_manage)
@mock.patch(
'cinder.api.openstack.wsgi.Controller.validate_name_and_description')
def test_manage_volume_ok(self, mock_validate, mock_api_manage):
def test_manage_volume_ok(self, cluster, mock_validate, mock_api_manage):
"""Test successful manage volume execution.
Tests for correct operation when valid arguments are passed in the
@ -209,6 +212,9 @@ class VolumeManageTest(test.TestCase):
"""
body = {'volume': {'host': 'host_ok',
'ref': 'fake_ref'}}
# This will be ignored
if cluster:
body['volume']['cluster'] = 'cluster'
res = self._get_resp_post(body)
self.assertEqual(202, res.status_int)
@ -216,9 +222,48 @@ class VolumeManageTest(test.TestCase):
self.assertEqual(1, mock_api_manage.call_count)
args = mock_api_manage.call_args[0]
self.assertEqual(body['volume']['host'], args[1])
self.assertEqual(body['volume']['ref'], args[2])
self.assertIsNone(args[2]) # Cluster argument
self.assertEqual(body['volume']['ref'], args[3])
self.assertTrue(mock_validate.called)
def _get_resp_create(self, body, version='3.0'):
url = '/v3/%s/os-volume-manage' % fake.PROJECT_ID
req = webob.Request.blank(url, base_url='http://localhost.com' + url)
req.method = 'POST'
req.headers['Content-Type'] = 'application/json'
req.environ['cinder.context'] = self._admin_ctxt
req.body = jsonutils.dump_as_bytes(body)
req.headers = {'OpenStack-API-Version': 'volume %s' % version}
req.api_version_request = api_version.APIVersionRequest(version)
res = self.controller.create(req, body)
return res
@mock.patch('cinder.volume.api.API.manage_existing', wraps=api_manage)
@mock.patch(
'cinder.api.openstack.wsgi.Controller.validate_name_and_description')
def test_manage_volume_ok_cluster(self, mock_validate, mock_api_manage):
body = {'volume': {'cluster': 'cluster',
'ref': 'fake_ref'}}
res = self._get_resp_create(body, '3.16')
self.assertEqual(['volume'], list(res.keys()))
# Check that the manage API was called with the correct arguments.
self.assertEqual(1, mock_api_manage.call_count)
args = mock_api_manage.call_args[0]
self.assertIsNone(args[1])
self.assertEqual(body['volume']['cluster'], args[2])
self.assertEqual(body['volume']['ref'], args[3])
self.assertTrue(mock_validate.called)
@mock.patch(
'cinder.api.openstack.wsgi.Controller.validate_name_and_description')
def test_manage_volume_fail_host_cluster(self, mock_validate):
body = {'volume': {'host': 'host_ok',
'cluster': 'cluster',
'ref': 'fake_ref'}}
self.assertRaises(exception.InvalidInput,
self._get_resp_create, body, '3.16')
def test_manage_volume_missing_host(self):
"""Test correct failure when host is not specified."""
body = {'volume': {'ref': 'fake_ref'}}

View File

@ -60,7 +60,7 @@ class SnapshotManageTest(test.TestCase):
@mock.patch('cinder.volume.rpcapi.VolumeAPI.manage_existing_snapshot')
@mock.patch('cinder.volume.api.API.create_snapshot_in_db')
@mock.patch('cinder.objects.service.Service.get_by_args')
@mock.patch('cinder.objects.service.Service.get_by_id')
def test_manage_snapshot_route(self, mock_service_get,
mock_create_snapshot, mock_rpcapi):
"""Test call to manage snapshot.

View File

@ -23,6 +23,9 @@ from cinder.scheduler import filter_scheduler
from cinder.scheduler import host_manager
UTC_NOW = timeutils.utcnow()
class FakeFilterScheduler(filter_scheduler.FilterScheduler):
def __init__(self, *args, **kwargs):
super(FakeFilterScheduler, self).__init__(*args, **kwargs)
@ -43,7 +46,7 @@ class FakeHostManager(host_manager.HostManager):
'thick_provisioning_support': True,
'reserved_percentage': 10,
'volume_backend_name': 'lvm1',
'timestamp': None},
'timestamp': UTC_NOW},
'host2': {'total_capacity_gb': 2048,
'free_capacity_gb': 300,
'allocated_capacity_gb': 1748,
@ -53,7 +56,7 @@ class FakeHostManager(host_manager.HostManager):
'thick_provisioning_support': False,
'reserved_percentage': 10,
'volume_backend_name': 'lvm2',
'timestamp': None},
'timestamp': UTC_NOW},
'host3': {'total_capacity_gb': 512,
'free_capacity_gb': 256,
'allocated_capacity_gb': 256,
@ -63,7 +66,7 @@ class FakeHostManager(host_manager.HostManager):
'thick_provisioning_support': True,
'reserved_percentage': 0,
'volume_backend_name': 'lvm3',
'timestamp': None},
'timestamp': UTC_NOW},
'host4': {'total_capacity_gb': 2048,
'free_capacity_gb': 200,
'allocated_capacity_gb': 1848,
@ -73,7 +76,7 @@ class FakeHostManager(host_manager.HostManager):
'thick_provisioning_support': False,
'reserved_percentage': 5,
'volume_backend_name': 'lvm4',
'timestamp': None,
'timestamp': UTC_NOW,
'consistencygroup_support': True},
'host5': {'total_capacity_gb': 'infinite',
'free_capacity_gb': 'unknown',
@ -83,13 +86,13 @@ class FakeHostManager(host_manager.HostManager):
'thin_provisioning_support': True,
'thick_provisioning_support': False,
'reserved_percentage': 5,
'timestamp': None},
'timestamp': UTC_NOW},
}
class FakeHostState(host_manager.HostState):
def __init__(self, host, attribute_dict):
super(FakeHostState, self).__init__(host)
super(FakeHostState, self).__init__(host, None)
for (key, val) in attribute_dict.items():
setattr(self, key, val)

View File

@ -178,7 +178,7 @@ class TestBaseFilterHandler(test.TestCase):
def test_get_filtered_objects_info_and_debug_log_none_returned(self):
all_filters = [FilterA, FilterA, FilterB]
fake_hosts = [host_manager.HostState('fake_host%s' % x)
fake_hosts = [host_manager.HostState('fake_host%s' % x, None)
for x in range(1, 4)]
filt_props = {"request_spec": {'volume_id': fake.VOLUME_ID,

View File

@ -15,6 +15,7 @@
"""
Tests For Capacity Weigher.
"""
from datetime import datetime
import ddt
import mock
@ -248,7 +249,7 @@ class CapacityWeigherTestCase(test.TestCase):
'thin_provisioning_support': True,
'thick_provisioning_support': False,
'reserved_percentage': 5,
'timestamp': None}
'timestamp': datetime.utcnow()}
hostinfo_list = self._get_all_hosts()
# host1: thin_provisioning_support = False
@ -290,7 +291,7 @@ class CapacityWeigherTestCase(test.TestCase):
'thin_provisioning_support': True,
'thick_provisioning_support': False,
'reserved_percentage': 5,
'timestamp': None}
'timestamp': datetime.utcnow()}
hostinfo_list = self._get_all_hosts()
# host1: thin_provisioning_support = False
@ -332,7 +333,7 @@ class CapacityWeigherTestCase(test.TestCase):
'thin_provisioning_support': True,
'thick_provisioning_support': False,
'reserved_percentage': 5,
'timestamp': None}
'timestamp': datetime.utcnow()}
hostinfo_list = self._get_all_hosts()
# host1: thin_provisioning_support = False
@ -374,7 +375,7 @@ class CapacityWeigherTestCase(test.TestCase):
'thin_provisioning_support': True,
'thick_provisioning_support': False,
'reserved_percentage': 5,
'timestamp': None}
'timestamp': datetime.utcnow()}
hostinfo_list = self._get_all_hosts()
# host1: thin_provisioning_support = False

View File

@ -58,7 +58,7 @@ class ChanceWeigherTestCase(test.TestCase):
# ensure we don't lose any hosts when weighing with
# the ChanceWeigher
hm = host_manager.HostManager()
fake_hosts = [host_manager.HostState('fake_host%s' % x)
fake_hosts = [host_manager.HostState('fake_host%s' % x, None)
for x in range(1, 5)]
weighed_hosts = hm.get_weighed_hosts(fake_hosts, {}, 'ChanceWeigher')
self.assertEqual(4, len(weighed_hosts))

View File

@ -414,7 +414,7 @@ class FilterSchedulerTestCase(test_scheduler.SchedulerTestCase):
filter_properties = {'retry': retry}
sched = fakes.FakeFilterScheduler()
host_state = host_manager.HostState('host')
host_state = host_manager.HostState('host', None)
host_state.total_capacity_gb = 1024
sched._post_select_populate_filter_properties(filter_properties,
host_state)

View File

@ -19,14 +19,18 @@ Tests For HostManager
from datetime import datetime
import mock
from oslo_serialization import jsonutils
from oslo_utils import timeutils
from cinder.common import constants
from cinder import context
from cinder import db
from cinder import exception
from cinder import objects
from cinder.scheduler import filters
from cinder.scheduler import host_manager
from cinder import test
from cinder.tests.unit import fake_constants as fake
from cinder.tests.unit.objects import test_service
@ -46,7 +50,7 @@ class HostManagerTestCase(test.TestCase):
def setUp(self):
super(HostManagerTestCase, self).setUp()
self.host_manager = host_manager.HostManager()
self.fake_hosts = [host_manager.HostState('fake_host%s' % x)
self.fake_hosts = [host_manager.HostState('fake_host%s' % x, None)
for x in range(1, 5)]
# For a second scheduler service.
self.host_manager_1 = host_manager.HostManager()
@ -93,27 +97,29 @@ class HostManagerTestCase(test.TestCase):
_mock_get_updated_pools):
service_states = self.host_manager.service_states
self.assertDictMatch({}, service_states)
_mock_utcnow.side_effect = [31337, 31338, 31339]
_mock_utcnow.side_effect = [31338, 31339]
_mock_get_updated_pools.return_value = []
host1_volume_capabs = dict(free_capacity_gb=4321, timestamp=1)
host2_volume_capabs = dict(free_capacity_gb=5432, timestamp=1)
host3_volume_capabs = dict(free_capacity_gb=6543, timestamp=1)
timestamp = jsonutils.to_primitive(datetime.utcnow())
host1_volume_capabs = dict(free_capacity_gb=4321, timestamp=timestamp)
host2_volume_capabs = dict(free_capacity_gb=5432)
host3_volume_capabs = dict(free_capacity_gb=6543)
service_name = 'volume'
self.host_manager.update_service_capabilities(service_name, 'host1',
host1_volume_capabs)
host1_volume_capabs,
None, timestamp)
self.host_manager.update_service_capabilities(service_name, 'host2',
host2_volume_capabs)
host2_volume_capabs,
None, None)
self.host_manager.update_service_capabilities(service_name, 'host3',
host3_volume_capabs)
host3_volume_capabs,
None, None)
# Make sure dictionary isn't re-assigned
self.assertEqual(service_states, self.host_manager.service_states)
# Make sure original dictionary wasn't copied
self.assertEqual(1, host1_volume_capabs['timestamp'])
host1_volume_capabs['timestamp'] = 31337
host1_volume_capabs['timestamp'] = timestamp
host2_volume_capabs['timestamp'] = 31338
host3_volume_capabs['timestamp'] = 31339
@ -150,7 +156,7 @@ class HostManagerTestCase(test.TestCase):
# S0: update_service_capabilities()
self.host_manager.update_service_capabilities(service_name, 'host1',
capab1)
capab1, None, None)
self.assertDictMatch(dict(dict(timestamp=31337), **capab1),
self.host_manager.service_states['host1'])
@ -168,7 +174,7 @@ class HostManagerTestCase(test.TestCase):
# S1: update_service_capabilities()
self.host_manager_1.update_service_capabilities(service_name, 'host1',
capab1)
capab1, None, None)
self.assertDictMatch(dict(dict(timestamp=31339), **capab1),
self.host_manager_1.service_states['host1'])
@ -208,7 +214,7 @@ class HostManagerTestCase(test.TestCase):
# S0: update_service_capabilities()
self.host_manager.update_service_capabilities(service_name, 'host1',
capab1)
capab1, None, None)
self.assertDictMatch(dict(dict(timestamp=31340), **capab1),
self.host_manager.service_states['host1'])
@ -219,7 +225,7 @@ class HostManagerTestCase(test.TestCase):
# S1: update_service_capabilities()
self.host_manager_1.update_service_capabilities(service_name, 'host1',
capab1)
capab1, None, None)
self.assertDictMatch(dict(dict(timestamp=31341), **capab1),
self.host_manager_1.service_states['host1'])
@ -292,7 +298,7 @@ class HostManagerTestCase(test.TestCase):
# S0: update_service_capabilities()
self.host_manager.update_service_capabilities(service_name, 'host1',
capab1)
capab1, None, None)
self.assertDictMatch(
dict(dict(timestamp=31340), **capab1),
@ -303,7 +309,7 @@ class HostManagerTestCase(test.TestCase):
# S1: update_service_capabilities()
self.host_manager_1.update_service_capabilities(service_name, 'host1',
capab1)
capab1, None, None)
self.assertDictMatch(dict(dict(timestamp=31345), **capab1),
self.host_manager_1.service_states['host1'])
@ -355,7 +361,7 @@ class HostManagerTestCase(test.TestCase):
# S0: update_service_capabilities()
self.host_manager.update_service_capabilities(service_name, 'host1',
capab2)
capab2, None, None)
self.assertDictMatch(
dict(dict(timestamp=31340), **capab1),
self.host_manager.service_states_last_update['host1'])
@ -378,7 +384,7 @@ class HostManagerTestCase(test.TestCase):
# S1: update_service_capabilities()
self.host_manager_1.update_service_capabilities(service_name, 'host1',
capab2)
capab2, None, None)
self.assertDictMatch(dict(dict(timestamp=31348), **capab2),
self.host_manager_1.service_states['host1'])
@ -452,7 +458,7 @@ class HostManagerTestCase(test.TestCase):
# S0: update_service_capabilities()
self.host_manager.update_service_capabilities(service_name, 'host1',
capab2)
capab2, None, None)
self.assertDictMatch(
dict(dict(timestamp=31349), **capab2),
self.host_manager.service_states_last_update['host1'])
@ -462,7 +468,7 @@ class HostManagerTestCase(test.TestCase):
# S1: update_service_capabilities()
self.host_manager_1.update_service_capabilities(service_name, 'host1',
capab2)
capab2, None, None)
self.assertDictMatch(
dict(dict(timestamp=31348), **capab2),
@ -490,19 +496,23 @@ class HostManagerTestCase(test.TestCase):
self.host_manager = host_manager.HostManager()
self.assertFalse(self.host_manager.has_all_capabilities())
host1_volume_capabs = dict(free_capacity_gb=4321, timestamp=1)
host2_volume_capabs = dict(free_capacity_gb=5432, timestamp=1)
host3_volume_capabs = dict(free_capacity_gb=6543, timestamp=1)
timestamp = jsonutils.to_primitive(datetime.utcnow())
host1_volume_capabs = dict(free_capacity_gb=4321)
host2_volume_capabs = dict(free_capacity_gb=5432)
host3_volume_capabs = dict(free_capacity_gb=6543)
service_name = 'volume'
self.host_manager.update_service_capabilities(service_name, 'host1',
host1_volume_capabs)
host1_volume_capabs,
None, timestamp)
self.assertFalse(self.host_manager.has_all_capabilities())
self.host_manager.update_service_capabilities(service_name, 'host2',
host2_volume_capabs)
host2_volume_capabs,
None, timestamp)
self.assertFalse(self.host_manager.has_all_capabilities())
self.host_manager.update_service_capabilities(service_name, 'host3',
host3_volume_capabs)
host3_volume_capabs,
None, timestamp)
self.assertTrue(self.host_manager.has_all_capabilities())
@mock.patch('cinder.db.service_get_all')
@ -532,7 +542,7 @@ class HostManagerTestCase(test.TestCase):
mocked_service_states = {
'host1': dict(volume_backend_name='AAA',
total_capacity_gb=512, free_capacity_gb=200,
timestamp=None, reserved_percentage=0),
timestamp=dates[1], reserved_percentage=0),
}
_mock_service_get_all.return_value = services
@ -547,24 +557,93 @@ class HostManagerTestCase(test.TestCase):
mocked_service_states):
self.host_manager.update_service_capabilities(service_name,
'host1',
host_volume_capabs)
host_volume_capabs,
None, None)
res = self.host_manager.get_pools(context)
self.assertEqual(1, len(res))
self.assertEqual(dates[1], res[0]['capabilities']['timestamp'])
self.host_manager.update_service_capabilities(service_name,
'host1',
host_volume_capabs)
host_volume_capabs,
None, None)
res = self.host_manager.get_pools(context)
self.assertEqual(1, len(res))
self.assertEqual(dates[2], res[0]['capabilities']['timestamp'])
@mock.patch('cinder.objects.Service.is_up', True)
def test_get_all_host_states_cluster(self):
"""Test get_all_host_states when we have clustered services.
Confirm that clustered services are grouped and that only the latest
of the capability reports is relevant.
"""
ctxt = context.RequestContext(fake.USER_ID, fake.PROJECT_ID, True)
cluster_name = 'cluster'
db.cluster_create(ctxt, {'name': cluster_name,
'binary': constants.VOLUME_BINARY})
services = (
db.service_create(ctxt,
{'host': 'clustered_host_1',
'topic': constants.VOLUME_TOPIC,
'binary': constants.VOLUME_BINARY,
'cluster_name': cluster_name,
'created_at': timeutils.utcnow()}),
# Even if this service is disabled, since it belongs to an enabled
# cluster, it's not really disabled.
db.service_create(ctxt,
{'host': 'clustered_host_2',
'topic': constants.VOLUME_TOPIC,
'binary': constants.VOLUME_BINARY,
'disabled': True,
'cluster_name': cluster_name,
'created_at': timeutils.utcnow()}),
db.service_create(ctxt,
{'host': 'clustered_host_3',
'topic': constants.VOLUME_TOPIC,
'binary': constants.VOLUME_BINARY,
'cluster_name': cluster_name,
'created_at': timeutils.utcnow()}),
db.service_create(ctxt,
{'host': 'non_clustered_host',
'topic': constants.VOLUME_TOPIC,
'binary': constants.VOLUME_BINARY,
'created_at': timeutils.utcnow()}),
# This service has no capabilities
db.service_create(ctxt,
{'host': 'no_capabilities_host',
'topic': constants.VOLUME_TOPIC,
'binary': constants.VOLUME_BINARY,
'created_at': timeutils.utcnow()}),
)
capabilities = ((1, {'free_capacity_gb': 1000}),
# This is the capacity that will be selected for the
# cluster because is the one with the latest timestamp.
(3, {'free_capacity_gb': 2000}),
(2, {'free_capacity_gb': 3000}),
(1, {'free_capacity_gb': 4000}))
for i in range(len(capabilities)):
self.host_manager.update_service_capabilities(
'volume', services[i].host, capabilities[i][1],
services[i].cluster_name, capabilities[i][0])
res = self.host_manager.get_all_host_states(ctxt)
result = {(s.cluster_name or s.host, s.free_capacity_gb) for s in res}
expected = {(cluster_name + '#_pool0', 2000),
('non_clustered_host#_pool0', 4000)}
self.assertSetEqual(expected, result)
@mock.patch('cinder.db.service_get_all')
@mock.patch('cinder.objects.service.Service.is_up',
new_callable=mock.PropertyMock)
def test_get_all_host_states(self, _mock_service_is_up,
_mock_service_get_all):
context = 'fake_context'
timestamp = datetime.utcnow()
topic = constants.VOLUME_TOPIC
services = [
@ -596,15 +675,15 @@ class HostManagerTestCase(test.TestCase):
service_states = {
'host1': dict(volume_backend_name='AAA',
total_capacity_gb=512, free_capacity_gb=200,
timestamp=None, reserved_percentage=0,
timestamp=timestamp, reserved_percentage=0,
provisioned_capacity_gb=312),
'host2': dict(volume_backend_name='BBB',
total_capacity_gb=256, free_capacity_gb=100,
timestamp=None, reserved_percentage=0,
timestamp=timestamp, reserved_percentage=0,
provisioned_capacity_gb=156),
'host3': dict(volume_backend_name='CCC',
total_capacity_gb=10000, free_capacity_gb=700,
timestamp=None, reserved_percentage=0,
timestamp=timestamp, reserved_percentage=0,
provisioned_capacity_gb=9300),
}
# First test: service.is_up is always True, host5 is disabled,
@ -665,6 +744,7 @@ class HostManagerTestCase(test.TestCase):
def test_get_pools(self, _mock_service_is_up,
_mock_service_get_all):
context = 'fake_context'
timestamp = datetime.utcnow()
services = [
dict(id=1, host='host1', topic='volume', disabled=False,
@ -678,15 +758,15 @@ class HostManagerTestCase(test.TestCase):
mocked_service_states = {
'host1': dict(volume_backend_name='AAA',
total_capacity_gb=512, free_capacity_gb=200,
timestamp=None, reserved_percentage=0,
timestamp=timestamp, reserved_percentage=0,
provisioned_capacity_gb=312),
'host2@back1': dict(volume_backend_name='BBB',
total_capacity_gb=256, free_capacity_gb=100,
timestamp=None, reserved_percentage=0,
timestamp=timestamp, reserved_percentage=0,
provisioned_capacity_gb=156),
'host2@back2': dict(volume_backend_name='CCC',
total_capacity_gb=10000, free_capacity_gb=700,
timestamp=None, reserved_percentage=0,
timestamp=timestamp, reserved_percentage=0,
provisioned_capacity_gb=9300),
}
@ -706,7 +786,7 @@ class HostManagerTestCase(test.TestCase):
{
'name': 'host1#AAA',
'capabilities': {
'timestamp': None,
'timestamp': timestamp,
'volume_backend_name': 'AAA',
'free_capacity_gb': 200,
'driver_version': None,
@ -719,7 +799,7 @@ class HostManagerTestCase(test.TestCase):
{
'name': 'host2@back1#BBB',
'capabilities': {
'timestamp': None,
'timestamp': timestamp,
'volume_backend_name': 'BBB',
'free_capacity_gb': 100,
'driver_version': None,
@ -732,7 +812,7 @@ class HostManagerTestCase(test.TestCase):
{
'name': 'host2@back2#CCC',
'capabilities': {
'timestamp': None,
'timestamp': timestamp,
'volume_backend_name': 'CCC',
'free_capacity_gb': 700,
'driver_version': None,
@ -887,7 +967,7 @@ class HostStateTestCase(test.TestCase):
"""Test case for HostState class."""
def test_update_from_volume_capability_nopool(self):
fake_host = host_manager.HostState('host1')
fake_host = host_manager.HostState('host1', None)
self.assertIsNone(fake_host.free_capacity_gb)
volume_capability = {'total_capacity_gb': 1024,
@ -922,7 +1002,7 @@ class HostStateTestCase(test.TestCase):
self.assertRaises(KeyError, lambda: fake_host.pools['pool0'])
def test_update_from_volume_capability_with_pools(self):
fake_host = host_manager.HostState('host1')
fake_host = host_manager.HostState('host1', None)
self.assertIsNone(fake_host.free_capacity_gb)
capability = {
'volume_backend_name': 'Local iSCSI',
@ -1014,7 +1094,7 @@ class HostStateTestCase(test.TestCase):
fake_host.pools['3rd pool'].provisioned_capacity_gb)
def test_update_from_volume_infinite_capability(self):
fake_host = host_manager.HostState('host1')
fake_host = host_manager.HostState('host1', None)
self.assertIsNone(fake_host.free_capacity_gb)
volume_capability = {'total_capacity_gb': 'infinite',
@ -1035,7 +1115,7 @@ class HostStateTestCase(test.TestCase):
fake_host.pools['_pool0'].free_capacity_gb)
def test_update_from_volume_unknown_capability(self):
fake_host = host_manager.HostState('host1')
fake_host = host_manager.HostState('host1', None)
self.assertIsNone(fake_host.free_capacity_gb)
volume_capability = {'total_capacity_gb': 'infinite',
@ -1056,7 +1136,7 @@ class HostStateTestCase(test.TestCase):
fake_host.pools['_pool0'].free_capacity_gb)
def test_update_from_empty_volume_capability(self):
fake_host = host_manager.HostState('host1')
fake_host = host_manager.HostState('host1', None)
vol_cap = {'timestamp': None}
@ -1076,7 +1156,7 @@ class PoolStateTestCase(test.TestCase):
"""Test case for HostState class."""
def test_update_from_volume_capability(self):
fake_pool = host_manager.PoolState('host1', None, 'pool0')
fake_pool = host_manager.PoolState('host1', None, None, 'pool0')
self.assertIsNone(fake_pool.free_capacity_gb)
volume_capability = {'total_capacity_gb': 1024,

View File

@ -17,6 +17,7 @@
Unit Tests for cinder.scheduler.rpcapi
"""
import ddt
import mock
from cinder import context
@ -27,6 +28,7 @@ from cinder.tests.unit import fake_constants
from cinder.tests.unit import fake_volume
@ddt.ddt
class SchedulerRpcAPITestCase(test.TestCase):
def setUp(self):
@ -75,14 +77,20 @@ class SchedulerRpcAPITestCase(test.TestCase):
for kwarg, value in self.fake_kwargs.items():
self.assertEqual(expected_msg[kwarg], value)
def test_update_service_capabilities(self):
@ddt.data('3.0', '3.3')
@mock.patch('oslo_messaging.RPCClient.can_send_version')
def test_update_service_capabilities(self, version, can_send_version):
can_send_version.side_effect = lambda x: x == version
self._test_scheduler_api('update_service_capabilities',
rpc_method='cast',
service_name='fake_name',
host='fake_host',
capabilities='fake_capabilities',
cluster_name='cluster_name',
capabilities={},
fanout=True,
version='3.0')
version=version,
timestamp='123')
can_send_version.assert_called_once_with('3.3')
def test_create_volume(self):
volume = fake_volume.fake_volume_obj(self.context)
@ -135,17 +143,18 @@ class SchedulerRpcAPITestCase(test.TestCase):
version='3.0')
create_worker_mock.assert_called_once()
def test_migrate_volume_to_host(self):
@mock.patch('oslo_messaging.RPCClient.can_send_version')
def test_migrate_volume(self, can_send_version):
volume = fake_volume.fake_volume_obj(self.context)
create_worker_mock = self.mock_object(volume, 'create_worker')
self._test_scheduler_api('migrate_volume_to_host',
self._test_scheduler_api('migrate_volume',
rpc_method='cast',
host='host',
force_host_copy=True,
backend='host',
force_copy=True,
request_spec='fake_request_spec',
filter_properties='filter_properties',
volume=volume,
version='3.0')
version='3.3')
create_worker_mock.assert_not_called()
def test_retype(self):

View File

@ -103,7 +103,7 @@ class SchedulerManagerTestCase(test.TestCase):
self.manager.update_service_capabilities(self.context,
service_name=service,
host=host)
_mock_update_cap.assert_called_once_with(service, host, {})
_mock_update_cap.assert_called_once_with(service, host, {}, None, None)
@mock.patch('cinder.scheduler.driver.Scheduler.'
'update_service_capabilities')
@ -117,7 +117,8 @@ class SchedulerManagerTestCase(test.TestCase):
service_name=service,
host=host,
capabilities=capabilities)
_mock_update_cap.assert_called_once_with(service, host, capabilities)
_mock_update_cap.assert_called_once_with(service, host, capabilities,
None, None)
@mock.patch('cinder.scheduler.driver.Scheduler.schedule_create_volume')
@mock.patch('cinder.message.api.API.create')
@ -164,6 +165,19 @@ class SchedulerManagerTestCase(test.TestCase):
request_spec_obj, {})
self.assertFalse(_mock_sleep.called)
@mock.patch('cinder.scheduler.driver.Scheduler.schedule_create_volume')
@mock.patch('eventlet.sleep')
def test_create_volume_set_worker(self, _mock_sleep, _mock_sched_create):
"""Make sure that the worker is created when creating a volume."""
volume = tests_utils.create_volume(self.context, status='creating')
request_spec = {'volume_id': volume.id}
self.manager.create_volume(self.context, volume,
request_spec=request_spec,
filter_properties={})
volume.set_worker.assert_called_once_with()
@mock.patch('cinder.scheduler.driver.Scheduler.schedule_create_volume')
@mock.patch('cinder.scheduler.driver.Scheduler.is_ready')
@mock.patch('eventlet.sleep')
@ -326,6 +340,13 @@ class SchedulerManagerTestCase(test.TestCase):
self.manager.driver = original_driver
def test_do_cleanup(self):
vol = tests_utils.create_volume(self.context, status='creating')
self.manager._do_cleanup(self.context, vol)
vol.refresh()
self.assertEqual('error', vol.status)
class SchedulerTestCase(test.TestCase):
"""Test case for base scheduler driver class."""
@ -346,9 +367,9 @@ class SchedulerTestCase(test.TestCase):
host = 'fake_host'
capabilities = {'fake_capability': 'fake_value'}
self.driver.update_service_capabilities(service_name, host,
capabilities)
capabilities, None)
_mock_update_cap.assert_called_once_with(service_name, host,
capabilities)
capabilities, None)
@mock.patch('cinder.scheduler.host_manager.HostManager.'
'has_all_capabilities', return_value=False)
@ -387,8 +408,10 @@ class SchedulerDriverModuleTestCase(test.TestCase):
volume = fake_volume.fake_volume_obj(self.context)
_mock_volume_get.return_value = volume
driver.volume_update_db(self.context, volume.id, 'fake_host')
driver.volume_update_db(self.context, volume.id, 'fake_host',
'fake_cluster')
scheduled_at = volume.scheduled_at.replace(tzinfo=None)
_mock_vol_update.assert_called_once_with(
self.context, volume.id, {'host': 'fake_host',
'cluster_name': 'fake_cluster',
'scheduled_at': scheduled_at})

View File

@ -167,6 +167,39 @@ class DBAPIServiceTestCase(BaseTest):
real_service1 = db.service_get(self.ctxt, host='host1', topic='topic1')
self._assertEqualObjects(service1, real_service1)
def test_service_get_all_disabled_by_cluster(self):
values = [
# Enabled services
{'host': 'host1', 'binary': 'b1', 'disabled': False},
{'host': 'host2', 'binary': 'b1', 'disabled': False,
'cluster_name': 'enabled_cluster'},
{'host': 'host3', 'binary': 'b1', 'disabled': True,
'cluster_name': 'enabled_cluster'},
# Disabled services
{'host': 'host4', 'binary': 'b1', 'disabled': True},
{'host': 'host5', 'binary': 'b1', 'disabled': False,
'cluster_name': 'disabled_cluster'},
{'host': 'host6', 'binary': 'b1', 'disabled': True,
'cluster_name': 'disabled_cluster'},
]
db.cluster_create(self.ctxt, {'name': 'enabled_cluster',
'binary': 'b1',
'disabled': False}),
db.cluster_create(self.ctxt, {'name': 'disabled_cluster',
'binary': 'b1',
'disabled': True}),
services = [self._create_service(vals) for vals in values]
enabled = db.service_get_all(self.ctxt, disabled=False)
disabled = db.service_get_all(self.ctxt, disabled=True)
self.assertSetEqual({s.host for s in services[:3]},
{s.host for s in enabled})
self.assertSetEqual({s.host for s in services[3:]},
{s.host for s in disabled})
def test_service_get_all(self):
expired = (datetime.datetime.utcnow()
- datetime.timedelta(seconds=CONF.service_down_time + 1))

View File

@ -344,20 +344,19 @@ class VolumeTestCase(base.BaseVolumeTestCase):
def test_init_host_added_to_cluster(self, cg_include_mock,
vol_include_mock, vol_get_all_mock,
snap_get_all_mock):
self.mock_object(self.volume, 'cluster', mock.sentinel.cluster)
cluster = str(mock.sentinel.cluster)
self.mock_object(self.volume, 'cluster', cluster)
self.volume.init_host(added_to_cluster=True,
service_id=self.service_id)
vol_include_mock.assert_called_once_with(mock.ANY,
mock.sentinel.cluster,
vol_include_mock.assert_called_once_with(mock.ANY, cluster,
host=self.volume.host)
cg_include_mock.assert_called_once_with(mock.ANY,
mock.sentinel.cluster,
cg_include_mock.assert_called_once_with(mock.ANY, cluster,
host=self.volume.host)
vol_get_all_mock.assert_called_once_with(
mock.ANY, filters={'cluster_name': mock.sentinel.cluster})
mock.ANY, filters={'cluster_name': cluster})
snap_get_all_mock.assert_called_once_with(
mock.ANY, search_opts={'cluster_name': mock.sentinel.cluster})
mock.ANY, search_opts={'cluster_name': cluster})
@mock.patch('cinder.objects.service.Service.get_minimum_rpc_version')
@mock.patch('cinder.objects.service.Service.get_minimum_obj_version')
@ -4785,7 +4784,7 @@ class VolumeMigrationTestCase(base.BaseVolumeTestCase):
self.assertEqual('newhost', volume.host)
self.assertEqual('success', volume.migration_status)
def _fake_create_volume(self, ctxt, volume, host, req_spec, filters,
def _fake_create_volume(self, ctxt, volume, req_spec, filters,
allow_reschedule=True):
return db.volume_update(ctxt, volume['id'],
{'status': self.expected_status})
@ -4880,7 +4879,7 @@ class VolumeMigrationTestCase(base.BaseVolumeTestCase):
nova_api, create_volume,
save):
def fake_create_volume(*args, **kwargs):
context, volume, host, request_spec, filter_properties = args
context, volume, request_spec, filter_properties = args
fake_db = mock.Mock()
task = create_volume_manager.ExtractVolumeSpecTask(fake_db)
specs = task.execute(context, volume, {})
@ -4916,7 +4915,7 @@ class VolumeMigrationTestCase(base.BaseVolumeTestCase):
migrate_volume_completion,
nova_api, create_volume, save):
def fake_create_volume(*args, **kwargs):
context, volume, host, request_spec, filter_properties = args
context, volume, request_spec, filter_properties = args
fake_db = mock.Mock()
task = create_volume_manager.ExtractVolumeSpecTask(fake_db)
specs = task.execute(context, volume, {})

View File

@ -26,6 +26,7 @@ from cinder.common import constants
from cinder import context
from cinder import db
from cinder import objects
from cinder.objects import base as ovo_base
from cinder.objects import fields
from cinder import test
from cinder.tests.unit.backup import fake_backup
@ -158,8 +159,12 @@ class VolumeRpcAPITestCase(test.TestCase):
if 'dest_host' in expected_msg:
dest_host = expected_msg.pop('dest_host')
dest_host_dict = {'host': dest_host.host,
'cluster_name': dest_host.cluster_name,
'capabilities': dest_host.capabilities}
expected_msg['host'] = dest_host_dict
if 'force_copy' in expected_msg:
expected_msg['force_host_copy'] = expected_msg.pop('force_copy')
if 'new_volume' in expected_msg:
volume = expected_msg['new_volume']
expected_msg['new_volume_id'] = volume['id']
@ -229,26 +234,11 @@ class VolumeRpcAPITestCase(test.TestCase):
self.assertEqual(expected_arg, arg)
for kwarg, value in self.fake_kwargs.items():
if isinstance(value, objects.Snapshot):
expected_snapshot = expected_msg[kwarg].obj_to_primitive()
snapshot = value.obj_to_primitive()
self.assertEqual(expected_snapshot, snapshot)
elif isinstance(value, objects.ConsistencyGroup):
expected_cg = expected_msg[kwarg].obj_to_primitive()
cg = value.obj_to_primitive()
self.assertEqual(expected_cg, cg)
elif isinstance(value, objects.CGSnapshot):
expected_cgsnapshot = expected_msg[kwarg].obj_to_primitive()
cgsnapshot = value.obj_to_primitive()
self.assertEqual(expected_cgsnapshot, cgsnapshot)
elif isinstance(value, objects.Volume):
expected_volume = expected_msg[kwarg].obj_to_primitive()
volume = value.obj_to_primitive()
self.assertDictEqual(expected_volume, volume)
elif isinstance(value, objects.Backup):
expected_backup = expected_msg[kwarg].obj_to_primitive()
backup = value.obj_to_primitive()
self.assertEqual(expected_backup, backup)
if isinstance(value, ovo_base.CinderObject):
expected = expected_msg[kwarg].obj_to_primitive()
primitive = value.obj_to_primitive()
self.assertEqual(expected, primitive)
else:
self.assertEqual(expected_msg[kwarg], value)
@ -328,8 +318,7 @@ class VolumeRpcAPITestCase(test.TestCase):
def test_create_consistencygroup(self):
self._test_volume_api('create_consistencygroup', rpc_method='cast',
group=self.fake_cg, host='fake_host1',
version='3.0')
group=self.fake_cg, version='3.0')
def test_delete_consistencygroup(self):
self._test_volume_api('delete_consistencygroup', rpc_method='cast',
@ -358,7 +347,6 @@ class VolumeRpcAPITestCase(test.TestCase):
self._test_volume_api('create_volume',
rpc_method='cast',
volume=self.fake_volume_obj,
host='fake_host1',
request_spec='fake_request_spec',
filter_properties='fake_properties',
allow_reschedule=True,
@ -540,7 +528,13 @@ class VolumeRpcAPITestCase(test.TestCase):
'-8ffd-0800200c9a66',
version='3.0')
def test_extend_volume(self):
def _change_cluster_name(self, resource, cluster_name):
resource.cluster_name = cluster_name
resource.obj_reset_changes()
@ddt.data(None, 'mycluster')
def test_extend_volume(self, cluster_name):
self._change_cluster_name(self.fake_volume_obj, cluster_name)
self._test_volume_api('extend_volume',
rpc_method='cast',
volume=self.fake_volume_obj,
@ -548,10 +542,12 @@ class VolumeRpcAPITestCase(test.TestCase):
reservations=self.fake_reservations,
version='3.0')
def test_migrate_volume(self):
@mock.patch('oslo_messaging.RPCClient.can_send_version', return_value=True)
def test_migrate_volume(self, can_send_version):
class FakeHost(object):
def __init__(self):
self.host = 'host'
self.cluster_name = 'cluster_name'
self.capabilities = {}
dest_host = FakeHost()
self._test_volume_api('migrate_volume',
@ -559,7 +555,7 @@ class VolumeRpcAPITestCase(test.TestCase):
volume=self.fake_volume_obj,
dest_host=dest_host,
force_host_copy=True,
version='3.0')
version='3.5')
def test_migrate_volume_completion(self):
self._test_volume_api('migrate_volume_completion',
@ -569,10 +565,12 @@ class VolumeRpcAPITestCase(test.TestCase):
error=False,
version='3.0')
def test_retype(self):
@mock.patch('oslo_messaging.RPCClient.can_send_version', return_value=True)
def test_retype(self, can_send_version):
class FakeHost(object):
def __init__(self):
self.host = 'host'
self.cluster_name = 'cluster_name'
self.capabilities = {}
dest_host = FakeHost()
self._test_volume_api('retype',
@ -583,7 +581,7 @@ class VolumeRpcAPITestCase(test.TestCase):
migration_policy='never',
reservations=self.fake_reservations,
old_reservations=self.fake_reservations,
version='3.0')
version='3.5')
def test_manage_existing(self):
self._test_volume_api('manage_existing',
@ -685,8 +683,7 @@ class VolumeRpcAPITestCase(test.TestCase):
def test_create_group(self):
self._test_group_api('create_group', rpc_method='cast',
group=self.fake_group, host='fake_host1',
version='3.0')
group=self.fake_group, version='3.0')
def test_delete_group(self):
self._test_group_api('delete_group', rpc_method='cast',

View File

@ -1344,32 +1344,47 @@ class API(base.Base):
resource=volume)
@wrap_check_policy
def migrate_volume(self, context, volume, host, force_host_copy,
def migrate_volume(self, context, volume, host, cluster_name, force_copy,
lock_volume):
"""Migrate the volume to the specified host."""
# Make sure the host is in the list of available hosts
"""Migrate the volume to the specified host or cluster."""
elevated = context.elevated()
topic = constants.VOLUME_TOPIC
services = objects.ServiceList.get_all_by_topic(
elevated, topic, disabled=False)
found = False
svc_host = volume_utils.extract_host(host, 'backend')
for service in services:
if service.is_up and service.host == svc_host:
found = True
break
if not found:
msg = _('No available service named %s') % host
# If we received a request to migrate to a host
# Look for the service - must be up and enabled
svc_host = host and volume_utils.extract_host(host, 'backend')
svc_cluster = cluster_name and volume_utils.extract_host(cluster_name,
'backend')
# NOTE(geguileo): Only svc_host or svc_cluster is set, so when we get
# a service from the DB we are getting either one specific service from
# a host or any service from a cluster that is up, which means that the
# cluster itself is also up.
try:
svc = objects.Service.get_by_id(elevated, None, is_up=True,
topic=constants.VOLUME_TOPIC,
host=svc_host, disabled=False,
cluster_name=svc_cluster,
backend_match_level='pool')
except exception.ServiceNotFound:
msg = _('No available service named %s') % cluster_name or host
LOG.error(msg)
raise exception.InvalidHost(reason=msg)
# Even if we were requested to do a migration to a host, if the host is
# in a cluster we will do a cluster migration.
cluster_name = svc.cluster_name
# Build required conditions for conditional update
expected = {'status': ('available', 'in-use'),
'migration_status': self.AVAILABLE_MIGRATION_STATUS,
'replication_status': (None, 'disabled'),
'consistencygroup_id': (None, ''),
'group_id': (None, ''),
'host': db.Not(host)}
'group_id': (None, '')}
# We want to make sure that the migration is to another host or
# another cluster.
if cluster_name:
expected['cluster_name'] = db.Not(cluster_name)
else:
expected['host'] = db.Not(host)
filters = [~db.volume_has_snapshots_filter()]
@ -1392,8 +1407,8 @@ class API(base.Base):
if not result:
msg = _('Volume %s status must be available or in-use, must not '
'be migrating, have snapshots, be replicated, be part of '
'a group and destination host must be different than the '
'current host') % {'vol_id': volume.id}
'a group and destination host/cluster must be different '
'than the current one') % {'vol_id': volume.id}
LOG.error(msg)
raise exception.InvalidVolume(reason=msg)
@ -1406,11 +1421,11 @@ class API(base.Base):
request_spec = {'volume_properties': volume,
'volume_type': volume_type,
'volume_id': volume.id}
self.scheduler_rpcapi.migrate_volume_to_host(context,
volume,
host,
force_host_copy,
request_spec)
self.scheduler_rpcapi.migrate_volume(context,
volume,
cluster_name or host,
force_copy,
request_spec)
LOG.info(_LI("Migrate volume request issued successfully."),
resource=volume)
@ -1556,19 +1571,31 @@ class API(base.Base):
LOG.info(_LI("Retype volume request issued successfully."),
resource=volume)
def _get_service_by_host(self, context, host, resource='volume'):
def _get_service_by_host_cluster(self, context, host, cluster_name,
resource='volume'):
elevated = context.elevated()
svc_cluster = cluster_name and volume_utils.extract_host(cluster_name,
'backend')
svc_host = host and volume_utils.extract_host(host, 'backend')
# NOTE(geguileo): Only svc_host or svc_cluster is set, so when we get
# a service from the DB we are getting either one specific service from
# a host or any service that is up from a cluster, which means that the
# cluster itself is also up.
try:
svc_host = volume_utils.extract_host(host, 'backend')
service = objects.Service.get_by_args(
elevated, svc_host, 'cinder-volume')
service = objects.Service.get_by_id(elevated, None, host=svc_host,
binary='cinder-volume',
cluster_name=svc_cluster)
except exception.ServiceNotFound:
with excutils.save_and_reraise_exception():
LOG.error(_LE('Unable to find service: %(service)s for '
'given host: %(host)s.'),
{'service': constants.VOLUME_BINARY, 'host': host})
'given host: %(host)s and cluster %(cluster)s.'),
{'service': constants.VOLUME_BINARY, 'host': host,
'cluster': cluster_name})
if service.disabled:
if service.disabled and (not service.cluster_name or
service.cluster.disabled):
LOG.error(_LE('Unable to manage existing %s on a disabled '
'service.'), resource)
raise exception.ServiceUnavailable()
@ -1580,15 +1607,16 @@ class API(base.Base):
return service
def manage_existing(self, context, host, ref, name=None, description=None,
volume_type=None, metadata=None,
def manage_existing(self, context, host, cluster_name, ref, name=None,
description=None, volume_type=None, metadata=None,
availability_zone=None, bootable=False):
if volume_type and 'extra_specs' not in volume_type:
extra_specs = volume_types.get_volume_type_extra_specs(
volume_type['id'])
volume_type['extra_specs'] = extra_specs
service = self._get_service_by_host(context, host)
service = self._get_service_by_host_cluster(context, host,
cluster_name)
if availability_zone is None:
availability_zone = service.availability_zone
@ -1597,7 +1625,8 @@ class API(base.Base):
'context': context,
'name': name,
'description': description,
'host': host,
'host': service.host,
'cluster_name': service.cluster_name,
'ref': ref,
'volume_type': volume_type,
'metadata': metadata,
@ -1626,7 +1655,7 @@ class API(base.Base):
def get_manageable_volumes(self, context, host, marker=None, limit=None,
offset=None, sort_keys=None, sort_dirs=None):
self._get_service_by_host(context, host)
self._get_service_by_host_cluster(context, host, None)
return self.volume_rpcapi.get_manageable_volumes(context, host,
marker, limit,
offset, sort_keys,
@ -1635,18 +1664,21 @@ class API(base.Base):
def manage_existing_snapshot(self, context, ref, volume,
name=None, description=None,
metadata=None):
service = self._get_service_by_host(context, volume.host, 'snapshot')
service = self._get_service_by_host_cluster(context, volume.host,
volume.cluster_name,
'snapshot')
snapshot_object = self.create_snapshot_in_db(context, volume, name,
description, True,
metadata, None,
commit_quota=False)
self.volume_rpcapi.manage_existing_snapshot(context, snapshot_object,
ref, service.host)
self.volume_rpcapi.manage_existing_snapshot(
context, snapshot_object, ref, service.service_topic_queue)
return snapshot_object
def get_manageable_snapshots(self, context, host, marker=None, limit=None,
offset=None, sort_keys=None, sort_dirs=None):
self._get_service_by_host(context, host, resource='snapshot')
self._get_service_by_host_cluster(context, host, None, 'snapshot')
return self.volume_rpcapi.get_manageable_snapshots(context, host,
marker, limit,
offset, sort_keys,

View File

@ -339,6 +339,7 @@ class BaseVD(object):
# NOTE(vish): db is set by Manager
self.db = kwargs.get('db')
self.host = kwargs.get('host')
self.cluster_name = kwargs.get('cluster_name')
self.configuration = kwargs.get('configuration', None)
if self.configuration:

View File

@ -703,13 +703,13 @@ class VolumeCastTask(flow_utils.CinderTask):
self.db = db
def _cast_create_volume(self, context, request_spec, filter_properties):
source_volid = request_spec['source_volid']
source_replicaid = request_spec['source_replicaid']
source_volume_ref = None
source_volid = (request_spec['source_volid'] or
request_spec['source_replicaid'])
volume = request_spec['volume']
snapshot_id = request_spec['snapshot_id']
image_id = request_spec['image_id']
cgroup_id = request_spec['consistencygroup_id']
host = None
cgsnapshot_id = request_spec['cgsnapshot_id']
group_id = request_spec['group_id']
if cgroup_id:
@ -734,19 +734,11 @@ class VolumeCastTask(flow_utils.CinderTask):
# snapshot resides instead of passing it through the scheduler, so
# snapshot can be copied to the new volume.
snapshot = objects.Snapshot.get_by_id(context, snapshot_id)
source_volume_ref = objects.Volume.get_by_id(context,
snapshot.volume_id)
host = source_volume_ref.host
source_volume_ref = snapshot.volume
elif source_volid:
source_volume_ref = objects.Volume.get_by_id(context,
source_volid)
host = source_volume_ref.host
elif source_replicaid:
source_volume_ref = objects.Volume.get_by_id(context,
source_replicaid)
host = source_volume_ref.host
source_volume_ref = objects.Volume.get_by_id(context, source_volid)
if not host:
if not source_volume_ref:
# Cast to the scheduler and let it handle whatever is needed
# to select the target host for this volume.
self.scheduler_rpcapi.create_volume(
@ -759,14 +751,14 @@ class VolumeCastTask(flow_utils.CinderTask):
else:
# Bypass the scheduler and send the request directly to the volume
# manager.
volume.host = host
volume.host = source_volume_ref.host
volume.cluster_name = source_volume_ref.cluster_name
volume.scheduled_at = timeutils.utcnow()
volume.save()
if not cgsnapshot_id:
self.volume_rpcapi.create_volume(
context,
volume,
volume.host,
request_spec,
filter_properties,
allow_reschedule=False)

View File

@ -37,7 +37,8 @@ class EntryCreateTask(flow_utils.CinderTask):
def __init__(self, db):
requires = ['availability_zone', 'description', 'metadata',
'name', 'host', 'bootable', 'volume_type', 'ref']
'name', 'host', 'cluster_name', 'bootable', 'volume_type',
'ref']
super(EntryCreateTask, self).__init__(addons=[ACTION],
requires=requires)
self.db = db
@ -62,6 +63,7 @@ class EntryCreateTask(flow_utils.CinderTask):
'display_description': kwargs.pop('description'),
'display_name': kwargs.pop('name'),
'host': kwargs.pop('host'),
'cluster_name': kwargs.pop('cluster_name'),
'availability_zone': kwargs.pop('availability_zone'),
'volume_type_id': volume_type_id,
'metadata': kwargs.pop('metadata') or {},

View File

@ -222,6 +222,7 @@ class VolumeManager(manager.CleanableManager,
configuration=self.configuration,
db=self.db,
host=self.host,
cluster_name=self.cluster,
is_vol_db_empty=vol_db_empty,
active_backend_id=curr_active_backend_id)
@ -548,6 +549,12 @@ class VolumeManager(manager.CleanableManager,
"""
return self.driver.initialized
def _set_resource_host(self, resource):
"""Set the host field on the DB to our own when we are clustered."""
if resource.is_clustered and resource.host != self.host:
resource.host = self.host
resource.save()
@objects.Volume.set_workers
def create_volume(self, context, volume, request_spec=None,
filter_properties=None, allow_reschedule=True):
@ -555,6 +562,9 @@ class VolumeManager(manager.CleanableManager,
# Log about unsupported drivers
utils.log_unsupported_driver_warning(self.driver)
# Make sure the host in the DB matches our own when clustered
self._set_resource_host(volume)
context_elevated = context.elevated()
if filter_properties is None:
filter_properties = {}
@ -1683,12 +1693,13 @@ class VolumeManager(manager.CleanableManager,
remote=src_remote,
attach_encryptor=attach_encryptor)
def _migrate_volume_generic(self, ctxt, volume, host, new_type_id):
def _migrate_volume_generic(self, ctxt, volume, backend, new_type_id):
rpcapi = volume_rpcapi.VolumeAPI()
# Create new volume on remote host
tmp_skip = {'snapshot_id', 'source_volid'}
skip = self._VOLUME_CLONE_SKIP_PROPERTIES | tmp_skip | {'host'}
skip = self._VOLUME_CLONE_SKIP_PROPERTIES | tmp_skip | {'host',
'cluster_name'}
new_vol_values = {k: volume[k] for k in set(volume.keys()) - skip}
if new_type_id:
new_vol_values['volume_type_id'] = new_type_id
@ -1700,15 +1711,16 @@ class VolumeManager(manager.CleanableManager,
new_volume = objects.Volume(
context=ctxt,
host=host['host'],
host=backend['host'],
cluster_name=backend.get('cluster_name'),
status='creating',
attach_status=fields.VolumeAttachStatus.DETACHED,
migration_status='target:%s' % volume['id'],
**new_vol_values
)
new_volume.create()
rpcapi.create_volume(ctxt, new_volume, host['host'],
None, None, allow_reschedule=False)
rpcapi.create_volume(ctxt, new_volume, None, None,
allow_reschedule=False)
# Wait for new_volume to become ready
starttime = time.time()
@ -1720,13 +1732,13 @@ class VolumeManager(manager.CleanableManager,
tries += 1
now = time.time()
if new_volume.status == 'error':
msg = _("failed to create new_volume on destination host")
msg = _("failed to create new_volume on destination")
self._clean_temporary_volume(ctxt, volume,
new_volume,
clean_db_only=True)
raise exception.VolumeMigrationFailed(reason=msg)
elif now > deadline:
msg = _("timeout creating new_volume on destination host")
msg = _("timeout creating new_volume on destination")
self._clean_temporary_volume(ctxt, volume,
new_volume,
clean_db_only=True)
@ -1931,6 +1943,7 @@ class VolumeManager(manager.CleanableManager,
host)
if moved:
updates = {'host': host['host'],
'cluster_name': host.get('cluster_name'),
'migration_status': 'success',
'previous_status': volume.status}
if status_update:
@ -1948,8 +1961,7 @@ class VolumeManager(manager.CleanableManager,
volume.save()
if not moved:
try:
self._migrate_volume_generic(ctxt, volume, host,
new_type_id)
self._migrate_volume_generic(ctxt, volume, host, new_type_id)
except Exception:
with excutils.save_and_reraise_exception():
updates = {'migration_status': 'error'}
@ -2238,17 +2250,22 @@ class VolumeManager(manager.CleanableManager,
# Call driver to try and change the type
retype_model_update = None
# NOTE(jdg): Check to see if the destination host is the same
# as the current. If it's not don't call the driver.retype
# method, otherwise drivers that implement retype may report
# success, but it's invalid in the case of a migrate.
# NOTE(jdg): Check to see if the destination host or cluster (depending
# if it's the volume is in a clustered backend or not) is the same as
# the current. If it's not don't call the driver.retype method,
# otherwise drivers that implement retype may report success, but it's
# invalid in the case of a migrate.
# We assume that those that support pools do this internally
# so we strip off the pools designation
if (not retyped and
not diff.get('encryption') and
vol_utils.hosts_are_equivalent(self.driver.host,
host['host'])):
((not host.get('cluster_name') and
vol_utils.hosts_are_equivalent(self.driver.host,
host['host'])) or
(vol_utils.hosts_are_equivalent(self.driver.cluster_name,
host.get('cluster_name'))))):
try:
new_type = volume_types.get_volume_type(context, new_type_id)
ret = self.driver.retype(context,
@ -2311,6 +2328,7 @@ class VolumeManager(manager.CleanableManager,
else:
model_update = {'volume_type_id': new_type_id,
'host': host['host'],
'cluster_name': host.get('cluster_name'),
'status': status_update['status']}
if retype_model_update:
model_update.update(retype_model_update)
@ -2407,6 +2425,9 @@ class VolumeManager(manager.CleanableManager,
def _create_group(self, context, group, is_generic_group=True):
context = context.elevated()
# Make sure the host in the DB matches our own when clustered
self._set_resource_host(group)
status = fields.GroupStatus.AVAILABLE
model_update = None

View File

@ -115,9 +115,10 @@ class VolumeAPI(rpc.RPCAPI):
get_backup_device().
3.3 - Adds support for sending objects over RPC in attach_volume().
3.4 - Adds support for sending objects over RPC in detach_volume().
3.5 - Adds support for cluster in retype and migrate_volume
"""
RPC_API_VERSION = '3.4'
RPC_API_VERSION = '3.5'
RPC_DEFAULT_VERSION = '3.0'
TOPIC = constants.VOLUME_TOPIC
BINARY = 'cinder-volume'
@ -125,10 +126,10 @@ class VolumeAPI(rpc.RPCAPI):
def _get_cctxt(self, host=None, version=None, **kwargs):
if host is not None:
kwargs['server'] = utils.get_volume_rpc_host(host)
return super(VolumeAPI, self)._get_cctxt(version, **kwargs)
return super(VolumeAPI, self)._get_cctxt(version=version, **kwargs)
def create_consistencygroup(self, ctxt, group, host):
cctxt = self._get_cctxt(host)
def create_consistencygroup(self, ctxt, group):
cctxt = self._get_cctxt(group.service_topic_queue)
cctxt.cast(ctxt, 'create_consistencygroup', group=group)
def delete_consistencygroup(self, ctxt, group):
@ -145,7 +146,7 @@ class VolumeAPI(rpc.RPCAPI):
def create_consistencygroup_from_src(self, ctxt, group, cgsnapshot=None,
source_cg=None):
cctxt = self._get_cctxt(group.host)
cctxt = self._get_cctxt(group.service_topic_queue)
cctxt.cast(ctxt, 'create_consistencygroup_from_src',
group=group,
cgsnapshot=cgsnapshot,
@ -159,9 +160,9 @@ class VolumeAPI(rpc.RPCAPI):
cctxt = self._get_cctxt(cgsnapshot.service_topic_queue)
cctxt.cast(ctxt, 'delete_cgsnapshot', cgsnapshot=cgsnapshot)
def create_volume(self, ctxt, volume, host, request_spec,
filter_properties, allow_reschedule=True):
cctxt = self._get_cctxt(host)
def create_volume(self, ctxt, volume, request_spec, filter_properties,
allow_reschedule=True):
cctxt = self._get_cctxt(volume.service_topic_queue)
cctxt.cast(ctxt, 'create_volume',
request_spec=request_spec,
filter_properties=filter_properties,
@ -239,35 +240,48 @@ class VolumeAPI(rpc.RPCAPI):
new_user=new_user, new_project=new_project)
def extend_volume(self, ctxt, volume, new_size, reservations):
cctxt = self._get_cctxt(volume.host)
cctxt = self._get_cctxt(volume.service_topic_queue)
cctxt.cast(ctxt, 'extend_volume', volume=volume, new_size=new_size,
reservations=reservations)
def migrate_volume(self, ctxt, volume, dest_host, force_host_copy):
host_p = {'host': dest_host.host,
'capabilities': dest_host.capabilities}
cctxt = self._get_cctxt(volume.host)
cctxt.cast(ctxt, 'migrate_volume', volume=volume, host=host_p,
backend_p = {'host': dest_host.host,
'cluster_name': dest_host.cluster_name,
'capabilities': dest_host.capabilities}
version = '3.5'
if not self.client.can_send_version(version):
version = '3.0'
del backend_p['cluster_name']
cctxt = self._get_cctxt(volume.service_topic_queue, version)
cctxt.cast(ctxt, 'migrate_volume', volume=volume, host=backend_p,
force_host_copy=force_host_copy)
def migrate_volume_completion(self, ctxt, volume, new_volume, error):
cctxt = self._get_cctxt(volume.host)
cctxt = self._get_cctxt(volume.service_topic_queue)
return cctxt.call(ctxt, 'migrate_volume_completion', volume=volume,
new_volume=new_volume, error=error,)
def retype(self, ctxt, volume, new_type_id, dest_host,
migration_policy='never', reservations=None,
old_reservations=None):
host_p = {'host': dest_host.host,
'capabilities': dest_host.capabilities}
cctxt = self._get_cctxt(volume.host)
backend_p = {'host': dest_host.host,
'cluster_name': dest_host.cluster_name,
'capabilities': dest_host.capabilities}
version = '3.5'
if not self.client.can_send_version(version):
version = '3.0'
del backend_p['cluster_name']
cctxt = self._get_cctxt(volume.service_topic_queue, version)
cctxt.cast(ctxt, 'retype', volume=volume, new_type_id=new_type_id,
host=host_p, migration_policy=migration_policy,
host=backend_p, migration_policy=migration_policy,
reservations=reservations,
old_reservations=old_reservations)
def manage_existing(self, ctxt, volume, ref):
cctxt = self._get_cctxt(volume.host)
cctxt = self._get_cctxt(volume.service_topic_queue)
cctxt.cast(ctxt, 'manage_existing', ref=ref, volume=volume)
def update_migrated_volume(self, ctxt, volume, new_volume,
@ -334,8 +348,8 @@ class VolumeAPI(rpc.RPCAPI):
limit=limit, offset=offset, sort_keys=sort_keys,
sort_dirs=sort_dirs)
def create_group(self, ctxt, group, host):
cctxt = self._get_cctxt(host)
def create_group(self, ctxt, group):
cctxt = self._get_cctxt(group.service_topic_queue)
cctxt.cast(ctxt, 'create_group', group=group)
def delete_group(self, ctxt, group):
@ -349,7 +363,7 @@ class VolumeAPI(rpc.RPCAPI):
def create_group_from_src(self, ctxt, group, group_snapshot=None,
source_group=None):
cctxt = self._get_cctxt(group.host)
cctxt = self._get_cctxt(group.service_topic_queue)
cctxt.cast(ctxt, 'create_group_from_src', group=group,
group_snapshot=group_snapshot, source_group=source_group)

View File

@ -752,6 +752,9 @@ def matching_backend_name(src_volume_type, volume_type):
def hosts_are_equivalent(host_1, host_2):
# In case host_1 or host_2 are None
if not (host_1 and host_2):
return host_1 == host_2
return extract_host(host_1) == extract_host(host_2)