From 9acf079b8c84f7d8e9151d58162a8deb8fd77fad Mon Sep 17 00:00:00 2001 From: Gorka Eguileor Date: Thu, 21 Jul 2016 13:36:58 +0200 Subject: [PATCH] Support A/A on Scheduler operations This patch allows scheduler to work with clustered hosts to support A/A operations. Reporting capabilities of clustered hosts will be grouped by the cluster_name instead of the host, and non clustered hosts will still be stored by host. To avoid replacing a newer capability report with an older version we timestamp capabilities on the volumes (it's backward compatible) and only replace currently stored values in scheduler when they are newer. Following actions now support A/A operation: - manage_existing - manage_existing_snapshot - get_pools - create_volume - retype - migrate_volume_to_host - create_consistencygroup - create_group - update_service_capabilities - extend_volume And Affinity and Driver filters have been updated. The new functionality to notify service capabilities has not been changed to Active/Active and will be done in another patch. APIImpact: Added microversion 3.16 Specs: https://review.openstack.org/327283 Implements: blueprint cinder-volume-active-active-support Change-Id: I611e75500f3d5281188c5aae287c62e5810e6b72 --- api-ref/source/v3/parameters.yaml | 14 ++ .../volume-manage-request-cluster.json | 19 ++ api-ref/source/v3/volume-manage.inc | 10 +- cinder/api/common.py | 14 ++ cinder/api/contrib/admin_actions.py | 11 +- cinder/api/contrib/volume_manage.py | 15 +- cinder/api/openstack/api_version_request.py | 3 +- .../openstack/rest_api_version_history.rst | 9 + cinder/db/sqlalchemy/api.py | 34 ++-- cinder/manager.py | 3 +- cinder/objects/base.py | 4 + cinder/scheduler/driver.py | 22 ++- cinder/scheduler/filter_scheduler.py | 88 ++++----- cinder/scheduler/filters/affinity_filter.py | 20 ++- cinder/scheduler/filters/capacity_filter.py | 62 ++++--- cinder/scheduler/filters/driver_filter.py | 7 +- .../filters/ignore_attempted_hosts_filter.py | 2 +- .../filters/instance_locality_filter.py | 4 +- cinder/scheduler/host_manager.py | 95 ++++++---- cinder/scheduler/manager.py | 56 ++++-- cinder/scheduler/rpcapi.py | 50 ++++-- .../unit/api/contrib/test_admin_actions.py | 70 +++++++- .../unit/api/contrib/test_snapshot_manage.py | 15 +- .../unit/api/contrib/test_volume_manage.py | 53 +++++- .../tests/unit/api/v3/test_snapshot_manage.py | 2 +- cinder/tests/unit/scheduler/fakes.py | 15 +- .../tests/unit/scheduler/test_base_filter.py | 2 +- .../unit/scheduler/test_capacity_weigher.py | 9 +- .../unit/scheduler/test_chance_weigher.py | 2 +- .../unit/scheduler/test_filter_scheduler.py | 2 +- .../tests/unit/scheduler/test_host_manager.py | 170 +++++++++++++----- cinder/tests/unit/scheduler/test_rpcapi.py | 25 ++- cinder/tests/unit/scheduler/test_scheduler.py | 33 +++- cinder/tests/unit/test_db_api.py | 33 ++++ cinder/tests/unit/test_volume.py | 19 +- cinder/tests/unit/test_volume_rpcapi.py | 57 +++--- cinder/volume/api.py | 110 ++++++++---- cinder/volume/driver.py | 1 + cinder/volume/flows/api/create_volume.py | 24 +-- cinder/volume/flows/api/manage_existing.py | 4 +- cinder/volume/manager.py | 51 ++++-- cinder/volume/rpcapi.py | 58 +++--- cinder/volume/utils.py | 3 + 43 files changed, 911 insertions(+), 389 deletions(-) create mode 100644 api-ref/source/v3/samples/volume-manage-request-cluster.json diff --git a/api-ref/source/v3/parameters.yaml b/api-ref/source/v3/parameters.yaml index a15041a719c..74e1ac41f66 100644 --- a/api-ref/source/v3/parameters.yaml +++ b/api-ref/source/v3/parameters.yaml @@ -349,6 
+349,13 @@ cgsnapshot_id: in: body required: false type: string +cluster_mutex: + description: | + The OpenStack Block Storage cluster where the resource resides. Optional + only if host field is provided. + in: body + required: false + type: string connector: description: | The ``connector`` object. @@ -638,6 +645,13 @@ host: in: body required: true type: string +host_mutex: + description: | + The OpenStack Block Storage host where the existing resource resides. + Optional only if cluster field is provided. + in: body + required: false + type: string host_name: description: | The name of the attaching host. diff --git a/api-ref/source/v3/samples/volume-manage-request-cluster.json b/api-ref/source/v3/samples/volume-manage-request-cluster.json new file mode 100644 index 00000000000..5467c98d7bd --- /dev/null +++ b/api-ref/source/v3/samples/volume-manage-request-cluster.json @@ -0,0 +1,19 @@ +{ + "volume": { + "host": null, + "cluster": "cluster@backend", + "ref": { + "source-name": "existingLV", + "source-id": "1234" + }, + "name": "New Volume", + "availability_zone": "az2", + "description": "Volume imported from existingLV", + "volume_type": null, + "bootable": true, + "metadata": { + "key1": "value1", + "key2": "value2" + } + } +} diff --git a/api-ref/source/v3/volume-manage.inc b/api-ref/source/v3/volume-manage.inc index 9bd5cb7c622..d9d33b2bb86 100644 --- a/api-ref/source/v3/volume-manage.inc +++ b/api-ref/source/v3/volume-manage.inc @@ -24,6 +24,10 @@ or source-name element, if possible. The API chooses the size of the volume by rounding up the size of the existing storage volume to the next gibibyte (GiB). +Prior to microversion 3.16 host field was required, with the possibility of +defining the cluster it is no longer required, but we must have either a host +or a cluster field but we cannot have them both with values. + Error response codes:202, @@ -38,7 +42,8 @@ Request - volume_type: volume_type - name: name - volume: volume - - host: host + - host: host_mutex + - cluster: cluster_mutex - ref: ref - metadata: metadata - tenant_id: tenant_id @@ -48,3 +53,6 @@ Request Example .. literalinclude:: ./samples/volume-manage-request.json :language: javascript + +.. 
literalinclude:: ./samples/volume-manage-request-cluster.json + :language: javascript diff --git a/cinder/api/common.py b/cinder/api/common.py index c39770cf996..09e2e8ed2f3 100644 --- a/cinder/api/common.py +++ b/cinder/api/common.py @@ -370,3 +370,17 @@ class ViewBuilder(object): url_parts[2] = prefix_parts[2] + url_parts[2] return urllib.parse.urlunsplit(url_parts).rstrip('/') + + +def get_cluster_host(req, params, cluster_version): + if req.api_version_request.matches(cluster_version): + cluster_name = params.get('cluster') + msg = _('One and only one of cluster and host must be set.') + else: + cluster_name = None + msg = _('Host field is missing.') + + host = params.get('host') + if bool(cluster_name) == bool(host): + raise exception.InvalidInput(reason=msg) + return cluster_name, host diff --git a/cinder/api/contrib/admin_actions.py b/cinder/api/contrib/admin_actions.py index b673e4881e8..4893e0bd7c5 100644 --- a/cinder/api/contrib/admin_actions.py +++ b/cinder/api/contrib/admin_actions.py @@ -17,6 +17,7 @@ import oslo_messaging as messaging import webob from webob import exc +from cinder.api import common from cinder.api import extensions from cinder.api.openstack import wsgi from cinder import backup @@ -241,14 +242,12 @@ class VolumeAdminController(AdminController): # Not found exception will be handled at the wsgi level volume = self._get(context, id) params = body['os-migrate_volume'] - try: - host = params['host'] - except KeyError: - raise exc.HTTPBadRequest(explanation=_("Must specify 'host'.")) + + cluster_name, host = common.get_cluster_host(req, params, '3.16') force_host_copy = utils.get_bool_param('force_host_copy', params) lock_volume = utils.get_bool_param('lock_volume', params) - self.volume_api.migrate_volume(context, volume, host, force_host_copy, - lock_volume) + self.volume_api.migrate_volume(context, volume, host, cluster_name, + force_host_copy, lock_volume) return webob.Response(status_int=202) @wsgi.action('os-migrate_volume_completion') diff --git a/cinder/api/contrib/volume_manage.py b/cinder/api/contrib/volume_manage.py index ba7cc1cdf31..a6a735b9ec0 100644 --- a/cinder/api/contrib/volume_manage.py +++ b/cinder/api/contrib/volume_manage.py @@ -13,8 +13,8 @@ # under the License. from oslo_log import log as logging -from webob import exc +from cinder.api import common from cinder.api.contrib import resource_common_manage from cinder.api import extensions from cinder.api.openstack import wsgi @@ -64,6 +64,7 @@ class VolumeManageController(wsgi.Controller): 'volume': { 'host': , + 'cluster': , 'ref': , } } @@ -106,13 +107,10 @@ class VolumeManageController(wsgi.Controller): # Check that the required keys are present, return an error if they # are not. 
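        # Illustrative sketch, not part of the patch: behaviour of the new
        # common.get_cluster_host() helper used below (parameter values here
        # are made up for the example):
        #
        #   at microversion >= 3.16:
        #     {'host': 'host@lvm'}                  -> (None, 'host@lvm')
        #     {'cluster': 'cluster@lvm'}            -> ('cluster@lvm', None)
        #     {'host': 'h@lvm', 'cluster': 'c@lvm'} -> raises exception.InvalidInput
        #     {}                                    -> raises exception.InvalidInput
        #
        #   below microversion 3.16:
        #     the 'cluster' key is ignored, so a request without 'host' raises
        #     exception.InvalidInput ("Host field is missing.").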
- required_keys = set(['ref', 'host']) - missing_keys = list(required_keys - set(volume.keys())) + if 'ref' not in volume: + raise exception.MissingRequired(element='ref') - if missing_keys: - msg = _("The following elements are required: %s") % \ - ', '.join(missing_keys) - raise exc.HTTPBadRequest(explanation=msg) + cluster_name, host = common.get_cluster_host(req, volume, '3.16') LOG.debug('Manage volume request body: %s', body) @@ -139,7 +137,8 @@ class VolumeManageController(wsgi.Controller): try: new_volume = self.volume_api.manage_existing(context, - volume['host'], + host, + cluster_name, volume['ref'], **kwargs) except exception.ServiceNotFound: diff --git a/cinder/api/openstack/api_version_request.py b/cinder/api/openstack/api_version_request.py index 70ceb4ba10f..6f118231882 100644 --- a/cinder/api/openstack/api_version_request.py +++ b/cinder/api/openstack/api_version_request.py @@ -64,6 +64,7 @@ REST_API_VERSION_HISTORY = """ * 3.14 - Add group snapshot and create group from src APIs. * 3.15 - Inject the response's `Etag` header to avoid the lost update problem with volume metadata. + * 3.16 - Migrate volume now supports cluster """ # The minimum and maximum versions of the API supported @@ -71,7 +72,7 @@ REST_API_VERSION_HISTORY = """ # minimum version of the API supported. # Explicitly using /v1 or /v2 enpoints will still work _MIN_API_VERSION = "3.0" -_MAX_API_VERSION = "3.15" +_MAX_API_VERSION = "3.16" _LEGACY_API_VERSION1 = "1.0" _LEGACY_API_VERSION2 = "2.0" diff --git a/cinder/api/openstack/rest_api_version_history.rst b/cinder/api/openstack/rest_api_version_history.rst index 1f5062885a9..ab0309901f4 100644 --- a/cinder/api/openstack/rest_api_version_history.rst +++ b/cinder/api/openstack/rest_api_version_history.rst @@ -191,3 +191,12 @@ user documentation. ------------------------ Added injecting the response's `Etag` header to avoid the lost update problem with volume metadata. + +3.16 +---- + os-migrate_volume now accepts ``cluster`` parameter when we want to migrate a + volume to a cluster. If we pass the ``host`` parameter for a volume that is + in a cluster, the request will be sent to the cluster as if we had requested + that specific cluster. Only ``host`` or ``cluster`` can be provided. + + Creating a managed volume also supports the cluster parameter. diff --git a/cinder/db/sqlalchemy/api.py b/cinder/db/sqlalchemy/api.py index 60a1f778d97..0abaa25c080 100644 --- a/cinder/db/sqlalchemy/api.py +++ b/cinder/db/sqlalchemy/api.py @@ -424,7 +424,7 @@ def _filter_host(field, value, match_level=None): def _service_query(context, session=None, read_deleted='no', host=None, cluster_name=None, is_up=None, backend_match_level=None, - **filters): + disabled=None, **filters): filters = _clean_filters(filters) if filters and not is_valid_model_filters(models.Service, filters): return None @@ -442,6 +442,22 @@ def _service_query(context, session=None, read_deleted='no', host=None, query = query.filter(_filter_host(models.Service.cluster_name, cluster_name, backend_match_level)) + # Now that we have clusters, a service is disabled if the service doesn't + # belong to a cluster or if it belongs to a cluster and the cluster itself + # is disabled. 
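    # Illustrative truth table for the filter built below (not part of the
    # patch); S is the service row and C its cluster row, when it has one:
    #
    #   S.cluster_name is None and S.disabled      -> counts as disabled
    #   S.cluster_name is None and not S.disabled  -> counts as enabled
    #   S.cluster_name is set  and C.disabled      -> counts as disabled
    #   S.cluster_name is set  and not C.disabled  -> counts as enabled
    #                                                  (S.disabled is not consulted)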
+ if disabled is not None: + disabled_filter = or_( + and_(models.Service.cluster_name.is_(None), + models.Service.disabled), + and_(models.Service.cluster_name.isnot(None), + sql.exists().where(and_( + models.Cluster.name == models.Service.cluster_name, + models.Cluster.binary == models.Service.binary, + ~models.Cluster.deleted, + models.Cluster.disabled)))) + if not disabled: + disabled_filter = ~disabled_filter + query = query.filter(disabled_filter) if filters: query = query.filter_by(**filters) @@ -5074,16 +5090,14 @@ def consistencygroup_create(context, values, cg_snap_id=None, cg_id=None): if conditions: # We don't want duplicated field values - values.pop('volume_type_id', None) - values.pop('availability_zone', None) - values.pop('host', None) + names = ['volume_type_id', 'availability_zone', 'host', + 'cluster_name'] + for name in names: + values.pop(name, None) - sel = session.query(cg_model.volume_type_id, - cg_model.availability_zone, - cg_model.host, - *(bindparam(k, v) for k, v in values.items()) - ).filter(*conditions) - names = ['volume_type_id', 'availability_zone', 'host'] + fields = [getattr(cg_model, name) for name in names] + fields.extend(bindparam(k, v) for k, v in values.items()) + sel = session.query(*fields).filter(*conditions) names.extend(values.keys()) insert_stmt = cg_model.__table__.insert().from_select(names, sel) result = session.execute(insert_stmt) diff --git a/cinder/manager.py b/cinder/manager.py index 0cd1e2a701b..7beb9f873eb 100644 --- a/cinder/manager.py +++ b/cinder/manager.py @@ -177,7 +177,8 @@ class SchedulerDependentManager(Manager): context, self.service_name, self.host, - self.last_capabilities) + self.last_capabilities, + self.cluster) try: self.scheduler_rpcapi.notify_service_capabilities( context, diff --git a/cinder/objects/base.py b/cinder/objects/base.py index e8784a7bc1a..60e223c9d4b 100644 --- a/cinder/objects/base.py +++ b/cinder/objects/base.py @@ -457,6 +457,10 @@ class ClusteredObject(object): def service_topic_queue(self): return self.cluster_name or self.host + @property + def is_clustered(self): + return bool(self.cluster_name) + class CinderObjectSerializer(base.VersionedObjectSerializer): OBJ_BASE_CLASS = CinderObject diff --git a/cinder/scheduler/driver.py b/cinder/scheduler/driver.py index 2fc3a8da587..57d759e7185 100644 --- a/cinder/scheduler/driver.py +++ b/cinder/scheduler/driver.py @@ -41,13 +41,14 @@ CONF = cfg.CONF CONF.register_opts(scheduler_driver_opts) -def volume_update_db(context, volume_id, host): - """Set the host and set the scheduled_at field of a volume. +def volume_update_db(context, volume_id, host, cluster_name): + """Set the host, cluster_name, and set the scheduled_at field of a volume. :returns: A Volume with the updated fields set properly. """ volume = objects.Volume.get_by_id(context, volume_id) volume.host = host + volume.cluster_name = cluster_name volume.scheduled_at = timeutils.utcnow() volume.save() @@ -56,22 +57,24 @@ def volume_update_db(context, volume_id, host): return volume -def group_update_db(context, group, host): +def group_update_db(context, group, host, cluster_name): """Set the host and the scheduled_at field of a consistencygroup. :returns: A Consistencygroup with the updated fields set properly. 
""" - group.update({'host': host, 'updated_at': timeutils.utcnow()}) + group.update({'host': host, 'updated_at': timeutils.utcnow(), + 'cluster_name': cluster_name}) group.save() return group -def generic_group_update_db(context, group, host): +def generic_group_update_db(context, group, host, cluster_name): """Set the host and the scheduled_at field of a group. :returns: A Group with the updated fields set properly. """ - group.update({'host': host, 'updated_at': timeutils.utcnow()}) + group.update({'host': host, 'updated_at': timeutils.utcnow(), + 'cluster_name': cluster_name}) group.save() return group @@ -97,11 +100,14 @@ class Scheduler(object): return self.host_manager.has_all_capabilities() - def update_service_capabilities(self, service_name, host, capabilities): + def update_service_capabilities(self, service_name, host, capabilities, + cluster_name, timestamp): """Process a capability update from a service node.""" self.host_manager.update_service_capabilities(service_name, host, - capabilities) + capabilities, + cluster_name, + timestamp) def notify_service_capabilities(self, service_name, host, capabilities): """Notify capability update from a service node.""" diff --git a/cinder/scheduler/filter_scheduler.py b/cinder/scheduler/filter_scheduler.py index 7c7e8af1830..5ead5919a8f 100644 --- a/cinder/scheduler/filter_scheduler.py +++ b/cinder/scheduler/filter_scheduler.py @@ -74,12 +74,11 @@ class FilterScheduler(driver.Scheduler): if not weighed_host: raise exception.NoValidHost(reason=_("No weighed hosts available")) - host = weighed_host.obj.host + backend = weighed_host.obj + updated_group = driver.group_update_db(context, group, backend.host, + backend.cluster_name) - updated_group = driver.group_update_db(context, group, host) - - self.volume_rpcapi.create_consistencygroup(context, - updated_group, host) + self.volume_rpcapi.create_consistencygroup(context, updated_group) def schedule_create_group(self, context, group, group_spec, @@ -96,12 +95,13 @@ class FilterScheduler(driver.Scheduler): if not weighed_host: raise exception.NoValidHost(reason=_("No weighed hosts available")) - host = weighed_host.obj.host + backend = weighed_host.obj - updated_group = driver.generic_group_update_db(context, group, host) + updated_group = driver.generic_group_update_db(context, group, + backend.host, + backend.cluster_name) - self.volume_rpcapi.create_group(context, - updated_group, host) + self.volume_rpcapi.create_group(context, updated_group) def schedule_create_volume(self, context, request_spec, filter_properties): weighed_host = self._schedule(context, request_spec, @@ -110,18 +110,20 @@ class FilterScheduler(driver.Scheduler): if not weighed_host: raise exception.NoValidHost(reason=_("No weighed hosts available")) - host = weighed_host.obj.host + backend = weighed_host.obj volume_id = request_spec['volume_id'] - updated_volume = driver.volume_update_db(context, volume_id, host) + updated_volume = driver.volume_update_db(context, volume_id, + backend.host, + backend.cluster_name) self._post_select_populate_filter_properties(filter_properties, - weighed_host.obj) + backend) # context is not serializable filter_properties.pop('context', None) - self.volume_rpcapi.create_volume(context, updated_volume, host, - request_spec, filter_properties, + self.volume_rpcapi.create_volume(context, updated_volume, request_spec, + filter_properties, allow_reschedule=True) def host_passes_filters(self, context, host, request_spec, @@ -131,7 +133,7 @@ class FilterScheduler(driver.Scheduler): 
filter_properties) for weighed_host in weighed_hosts: host_state = weighed_host.obj - if host_state.host == host: + if host_state.backend_id == host: return host_state volume_id = request_spec.get('volume_id', '??volume_id missing??') @@ -144,26 +146,27 @@ class FilterScheduler(driver.Scheduler): migration_policy='never'): """Find a host that can accept the volume with its new type.""" filter_properties = filter_properties or {} - current_host = request_spec['volume_properties']['host'] + backend = (request_spec['volume_properties'].get('cluster_name') + or request_spec['volume_properties']['host']) - # The volume already exists on this host, and so we shouldn't check if - # it can accept the volume again in the CapacityFilter. - filter_properties['vol_exists_on'] = current_host + # The volume already exists on this backend, and so we shouldn't check + # if it can accept the volume again in the CapacityFilter. + filter_properties['vol_exists_on'] = backend - weighed_hosts = self._get_weighted_candidates(context, request_spec, - filter_properties) - if not weighed_hosts: + weighed_backends = self._get_weighted_candidates(context, request_spec, + filter_properties) + if not weighed_backends: raise exception.NoValidHost(reason=_('No valid hosts for volume ' '%(id)s with type %(type)s') % {'id': request_spec['volume_id'], 'type': request_spec['volume_type']}) - for weighed_host in weighed_hosts: - host_state = weighed_host.obj - if host_state.host == current_host: - return host_state + for weighed_backend in weighed_backends: + backend_state = weighed_backend.obj + if backend_state.backend_id == backend: + return backend_state - if utils.extract_host(current_host, 'pool') is None: + if utils.extract_host(backend, 'pool') is None: # legacy volumes created before pool is introduced has no pool # info in host. But host_state.host always include pool level # info. In this case if above exact match didn't work out, we @@ -172,11 +175,12 @@ class FilterScheduler(driver.Scheduler): # cause migration between pools on same host, which we consider # it is different from migration between hosts thus allow that # to happen even migration policy is 'never'. - for weighed_host in weighed_hosts: - host_state = weighed_host.obj - backend = utils.extract_host(host_state.host, 'backend') - if backend == current_host: - return host_state + for weighed_backend in weighed_backends: + backend_state = weighed_backend.obj + new_backend = utils.extract_host(backend_state.backend_id, + 'backend') + if new_backend == backend: + return backend_state if migration_policy == 'never': raise exception.NoValidHost(reason=_('Current host not valid for ' @@ -186,7 +190,7 @@ class FilterScheduler(driver.Scheduler): {'id': request_spec['volume_id'], 'type': request_spec['volume_type']}) - top_host = self._choose_top_host(weighed_hosts, request_spec) + top_host = self._choose_top_host(weighed_backends, request_spec) return top_host.obj def get_pools(self, context, filters): @@ -201,7 +205,7 @@ class FilterScheduler(driver.Scheduler): been selected by the scheduling process. """ # Add a retry entry for the selected volume backend: - self._add_retry_host(filter_properties, host_state.host) + self._add_retry_host(filter_properties, host_state.backend_id) def _add_retry_host(self, filter_properties, host): """Add a retry entry for the selected volume backend. @@ -418,8 +422,8 @@ class FilterScheduler(driver.Scheduler): for host2 in temp_weighed_hosts: # Should schedule creation of CG on backend level, # not pool level. 
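                # For reference (illustrative, not from the patch): with the
                # default 'backend' level, utils.extract_host('host@lvm#pool1')
                # returns 'host@lvm', so two candidates that only differ in
                # pool are treated as the same backend by the comparison below.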
- if (utils.extract_host(host1.obj.host) == - utils.extract_host(host2.obj.host)): + if (utils.extract_host(host1.obj.backend_id) == + utils.extract_host(host2.obj.backend_id)): new_weighed_hosts.append(host1) weighed_hosts = new_weighed_hosts if not weighed_hosts: @@ -530,8 +534,8 @@ class FilterScheduler(driver.Scheduler): for host2 in host_list2: # Should schedule creation of group on backend level, # not pool level. - if (utils.extract_host(host1.obj.host) == - utils.extract_host(host2.obj.host)): + if (utils.extract_host(host1.obj.backend_id) == + utils.extract_host(host2.obj.backend_id)): new_hosts.append(host1) if not new_hosts: return [] @@ -615,7 +619,7 @@ class FilterScheduler(driver.Scheduler): # Get host name including host@backend#pool info from # weighed_hosts. for host in weighed_hosts[::-1]: - backend = utils.extract_host(host.obj.host) + backend = utils.extract_host(host.obj.backend_id) if backend != group_backend: weighed_hosts.remove(host) if not weighed_hosts: @@ -651,7 +655,7 @@ class FilterScheduler(driver.Scheduler): def _choose_top_host(self, weighed_hosts, request_spec): top_host = weighed_hosts[0] host_state = top_host.obj - LOG.debug("Choosing %s", host_state.host) + LOG.debug("Choosing %s", host_state.backend_id) volume_properties = request_spec['volume_properties'] host_state.consume_from_volume(volume_properties) return top_host @@ -659,11 +663,11 @@ class FilterScheduler(driver.Scheduler): def _choose_top_host_group(self, weighed_hosts, request_spec_list): top_host = weighed_hosts[0] host_state = top_host.obj - LOG.debug("Choosing %s", host_state.host) + LOG.debug("Choosing %s", host_state.backend_id) return top_host def _choose_top_host_generic_group(self, weighed_hosts): top_host = weighed_hosts[0] host_state = top_host.obj - LOG.debug("Choosing %s", host_state.host) + LOG.debug("Choosing %s", host_state.backend_id) return top_host diff --git a/cinder/scheduler/filters/affinity_filter.py b/cinder/scheduler/filters/affinity_filter.py index 7d2910a2df7..560167b88fd 100644 --- a/cinder/scheduler/filters/affinity_filter.py +++ b/cinder/scheduler/filters/affinity_filter.py @@ -24,6 +24,14 @@ class AffinityFilter(filters.BaseHostFilter): def __init__(self): self.volume_api = volume.API() + def _get_volumes(self, context, affinity_uuids, backend_state): + filters = {'id': affinity_uuids, 'deleted': False} + if backend_state.cluster_name: + filters['cluster_name'] = backend_state.cluster_name + else: + filters['host'] = backend_state.host + return self.volume_api.get_all(context, filters=filters) + class DifferentBackendFilter(AffinityFilter): """Schedule volume on a different back-end from a set of volumes.""" @@ -53,11 +61,8 @@ class DifferentBackendFilter(AffinityFilter): return False if affinity_uuids: - return not self.volume_api.get_all( - context, filters={'host': host_state.host, - 'id': affinity_uuids, - 'deleted': False}) - + return not self._get_volumes(context, affinity_uuids, + host_state) # With no different_host key return True @@ -90,10 +95,7 @@ class SameBackendFilter(AffinityFilter): return False if affinity_uuids: - return self.volume_api.get_all( - context, filters={'host': host_state.host, - 'id': affinity_uuids, - 'deleted': False}) + return self._get_volumes(context, affinity_uuids, host_state) # With no same_host key return True diff --git a/cinder/scheduler/filters/capacity_filter.py b/cinder/scheduler/filters/capacity_filter.py index 52e0211ef49..1959d36fbb5 100644 --- a/cinder/scheduler/filters/capacity_filter.py +++ 
b/cinder/scheduler/filters/capacity_filter.py @@ -36,25 +36,29 @@ class CapacityFilter(filters.BaseHostFilter): # If the volume already exists on this host, don't fail it for # insufficient capacity (e.g., if we are retyping) - if host_state.host == filter_properties.get('vol_exists_on'): + if host_state.backend_id == filter_properties.get('vol_exists_on'): return True spec = filter_properties.get('request_spec') if spec: volid = spec.get('volume_id') + grouping = 'cluster' if host_state.cluster_name else 'host' if filter_properties.get('new_size'): # If new_size is passed, we are allocating space to extend a volume requested_size = (int(filter_properties.get('new_size')) - int(filter_properties.get('size'))) - LOG.debug('Checking if host %(host)s can extend the volume %(id)s' - 'in %(size)s GB', {'host': host_state.host, 'id': volid, - 'size': requested_size}) + LOG.debug('Checking if %(grouping)s %(grouping_name)s can extend ' + 'the volume %(id)s in %(size)s GB', + {'grouping': grouping, + 'grouping_name': host_state.backend_id, 'id': volid, + 'size': requested_size}) else: requested_size = filter_properties.get('size') - LOG.debug('Checking if host %(host)s can create a %(size)s GB ' - 'volume (%(id)s)', - {'host': host_state.host, 'id': volid, + LOG.debug('Checking if %(grouping)s %(grouping_name)s can create ' + 'a %(size)s GB volume (%(id)s)', + {'grouping': grouping, + 'grouping_name': host_state.backend_id, 'id': volid, 'size': requested_size}) if host_state.free_capacity_gb is None: @@ -85,18 +89,16 @@ class CapacityFilter(filters.BaseHostFilter): total = float(total_space) if total <= 0: LOG.warning(_LW("Insufficient free space for volume creation. " - "Total capacity is %(total).2f on host %(host)s."), + "Total capacity is %(total).2f on %(grouping)s " + "%(grouping_name)s."), {"total": total, - "host": host_state.host}) + "grouping": grouping, + "grouping_name": host_state.backend_id}) return False # Calculate how much free space is left after taking into account # the reserved space. free = free_space - math.floor(total * reserved) - msg_args = {"host": host_state.host, - "requested": requested_size, - "available": free} - # NOTE(xyang): If 'provisioning:type' is 'thick' in extra_specs, # we will not use max_over_subscription_ratio and # provisioned_capacity_gb to determine whether a volume can be @@ -117,15 +119,18 @@ class CapacityFilter(filters.BaseHostFilter): provisioned_ratio = ((host_state.provisioned_capacity_gb + requested_size) / total) if provisioned_ratio > host_state.max_over_subscription_ratio: + msg_args = { + "provisioned_ratio": provisioned_ratio, + "oversub_ratio": host_state.max_over_subscription_ratio, + "grouping": grouping, + "grouping_name": host_state.backend_id, + } LOG.warning(_LW( "Insufficient free space for thin provisioning. 
" "The ratio of provisioned capacity over total capacity " "%(provisioned_ratio).2f has exceeded the maximum over " - "subscription ratio %(oversub_ratio).2f on host " - "%(host)s."), - {"provisioned_ratio": provisioned_ratio, - "oversub_ratio": host_state.max_over_subscription_ratio, - "host": host_state.host}) + "subscription ratio %(oversub_ratio).2f on %(grouping)s " + "%(grouping_name)s."), msg_args) return False else: # Thin provisioning is enabled and projected over-subscription @@ -138,23 +143,30 @@ class CapacityFilter(filters.BaseHostFilter): free * host_state.max_over_subscription_ratio) return adjusted_free_virtual >= requested_size elif thin and host_state.thin_provisioning_support: - LOG.warning(_LW("Filtering out host %(host)s with an invalid " - "maximum over subscription ratio of " - "%(oversub_ratio).2f. The ratio should be a " + LOG.warning(_LW("Filtering out %(grouping)s %(grouping_name)s " + "with an invalid maximum over subscription ratio " + "of %(oversub_ratio).2f. The ratio should be a " "minimum of 1.0."), {"oversub_ratio": host_state.max_over_subscription_ratio, - "host": host_state.host}) + "grouping": grouping, + "grouping_name": host_state.backend_id}) return False + msg_args = {"grouping_name": host_state.backend_id, + "grouping": grouping, + "requested": requested_size, + "available": free} + if free < requested_size: LOG.warning(_LW("Insufficient free space for volume creation " - "on host %(host)s (requested / avail): " - "%(requested)s/%(available)s"), msg_args) + "on %(grouping)s %(grouping_name)s (requested / " + "avail): %(requested)s/%(available)s"), + msg_args) return False LOG.debug("Space information for volume creation " - "on host %(host)s (requested / avail): " + "on %(grouping)s %(grouping_name)s (requested / avail): " "%(requested)s/%(available)s", msg_args) return True diff --git a/cinder/scheduler/filters/driver_filter.py b/cinder/scheduler/filters/driver_filter.py index b57532413e5..329b3c7ed75 100644 --- a/cinder/scheduler/filters/driver_filter.py +++ b/cinder/scheduler/filters/driver_filter.py @@ -35,10 +35,11 @@ class DriverFilter(filters.BaseHostFilter): """Determines whether a host has a passing filter_function or not.""" stats = self._generate_stats(host_state, filter_properties) - LOG.debug("Checking host '%s'", stats['host_stats']['host']) + LOG.debug("Checking backend '%s'", stats['host_stats']['backend_id']) result = self._check_filter_function(stats) LOG.debug("Result: %s", result) - LOG.debug("Done checking host '%s'", stats['host_stats']['host']) + LOG.debug("Done checking backend '%s'", + stats['host_stats']['backend_id']) return result @@ -89,6 +90,8 @@ class DriverFilter(filters.BaseHostFilter): host_stats = { 'host': host_state.host, + 'cluster_name': host_state.cluster_name, + 'backend_id': host_state.backend_id, 'volume_backend_name': host_state.volume_backend_name, 'vendor_name': host_state.vendor_name, 'driver_version': host_state.driver_version, diff --git a/cinder/scheduler/filters/ignore_attempted_hosts_filter.py b/cinder/scheduler/filters/ignore_attempted_hosts_filter.py index beffe16a4f8..f37241652f7 100644 --- a/cinder/scheduler/filters/ignore_attempted_hosts_filter.py +++ b/cinder/scheduler/filters/ignore_attempted_hosts_filter.py @@ -45,7 +45,7 @@ class IgnoreAttemptedHostsFilter(filters.BaseHostFilter): return True hosts = attempted.get('hosts', []) - host = host_state.host + host = host_state.backend_id passes = host not in hosts pass_msg = "passes" if passes else "fails" diff --git 
a/cinder/scheduler/filters/instance_locality_filter.py b/cinder/scheduler/filters/instance_locality_filter.py index 7ab7c513365..e2d562f34e9 100644 --- a/cinder/scheduler/filters/instance_locality_filter.py +++ b/cinder/scheduler/filters/instance_locality_filter.py @@ -69,9 +69,9 @@ class InstanceLocalityFilter(filters.BaseHostFilter): return self._nova_ext_srv_attr - def host_passes(self, host_state, filter_properties): + def host_passes(self, backend_state, filter_properties): context = filter_properties['context'] - host = volume_utils.extract_host(host_state.host, 'host') + host = volume_utils.extract_host(backend_state.backend_id, 'host') scheduler_hints = filter_properties.get('scheduler_hints') or {} instance_uuid = scheduler_hints.get(HINT_KEYWORD, None) diff --git a/cinder/scheduler/host_manager.py b/cinder/scheduler/host_manager.py index 6c3203af97d..345c8c836f1 100644 --- a/cinder/scheduler/host_manager.py +++ b/cinder/scheduler/host_manager.py @@ -86,10 +86,11 @@ class ReadOnlyDict(collections.Mapping): class HostState(object): """Mutable and immutable information tracked for a volume backend.""" - def __init__(self, host, capabilities=None, service=None): + def __init__(self, host, cluster_name, capabilities=None, service=None): self.capabilities = None self.service = None self.host = host + self.cluster_name = cluster_name self.update_capabilities(capabilities, service) self.volume_backend_name = None @@ -122,6 +123,10 @@ class HostState(object): self.updated = None + @property + def backend_id(self): + return self.cluster_name or self.host + def update_capabilities(self, capabilities=None, service=None): # Read-only capability dicts @@ -210,7 +215,8 @@ class HostState(object): cur_pool = self.pools.get(pool_name, None) if not cur_pool: # Add new pool - cur_pool = PoolState(self.host, pool_cap, pool_name) + cur_pool = PoolState(self.host, self.cluster_name, + pool_cap, pool_name) self.pools[pool_name] = cur_pool cur_pool.update_from_volume_capability(pool_cap, service) @@ -227,7 +233,8 @@ class HostState(object): if len(self.pools) == 0: # No pool was there - single_pool = PoolState(self.host, capability, pool_name) + single_pool = PoolState(self.host, self.cluster_name, + capability, pool_name) self._append_backend_info(capability) self.pools[pool_name] = single_pool else: @@ -235,7 +242,8 @@ class HostState(object): try: single_pool = self.pools[pool_name] except KeyError: - single_pool = PoolState(self.host, capability, pool_name) + single_pool = PoolState(self.host, self.cluster_name, + capability, pool_name) self._append_backend_info(capability) self.pools[pool_name] = single_pool @@ -293,14 +301,18 @@ class HostState(object): # FIXME(zhiteng) backend level free_capacity_gb isn't as # meaningful as it used to be before pool is introduced, we'd # come up with better representation of HostState. 
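        # Illustrative examples of the grouping used below (not from the
        # patch), given backend_id == cluster_name or host:
        #   non-clustered: host='host1@lvm', cluster_name=None
        #       -> "host 'host1@lvm': free_capacity_gb: ..., pools: ..."
        #   clustered:     host='host1@lvm', cluster_name='cluster1@lvm'
        #       -> "cluster 'cluster1@lvm': free_capacity_gb: ..., pools: ..."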
- return ("host '%s': free_capacity_gb: %s, pools: %s" % - (self.host, self.free_capacity_gb, self.pools)) + grouping = 'cluster' if self.cluster_name else 'host' + grouping_name = self.backend_id + + return ("%s '%s': free_capacity_gb: %s, pools: %s" % + (grouping, grouping_name, self.free_capacity_gb, self.pools)) class PoolState(HostState): - def __init__(self, host, capabilities, pool_name): + def __init__(self, host, cluster_name, capabilities, pool_name): new_host = vol_utils.append_host(host, pool_name) - super(PoolState, self).__init__(new_host, capabilities) + new_cluster = vol_utils.append_host(cluster_name, pool_name) + super(PoolState, self).__init__(new_host, new_cluster, capabilities) self.pool_name = pool_name # No pools in pool self.pools = None @@ -443,7 +455,8 @@ class HostManager(object): hosts, weight_properties) - def update_service_capabilities(self, service_name, host, capabilities): + def update_service_capabilities(self, service_name, host, capabilities, + cluster_name, timestamp): """Update the per-service capabilities based on this notification.""" if service_name != 'volume': LOG.debug('Ignoring %(service_name)s service update ' @@ -451,9 +464,12 @@ class HostManager(object): {'service_name': service_name, 'host': host}) return + # TODO(geguileo): In P - Remove the next line since we receive the + # timestamp + timestamp = timestamp or timeutils.utcnow() # Copy the capabilities, so we don't modify the original dict capab_copy = dict(capabilities) - capab_copy["timestamp"] = timeutils.utcnow() # Reported time + capab_copy["timestamp"] = timestamp # Set the default capabilities in case None is set. capab_old = self.service_states.get(host, {"timestamp": 0}) @@ -474,15 +490,19 @@ class HostManager(object): self.service_states[host] = capab_copy - LOG.debug("Received %(service_name)s service update from " - "%(host)s: %(cap)s", + cluster_msg = (('Cluster: %s - Host: ' % cluster_name) if cluster_name + else '') + LOG.debug("Received %(service_name)s service update from %(cluster)s" + "%(host)s: %(cap)s%(cluster)s", {'service_name': service_name, 'host': host, - 'cap': capabilities}) + 'cap': capabilities, + 'cluster': cluster_msg}) self._no_capabilities_hosts.discard(host) def notify_service_capabilities(self, service_name, host, capabilities): """Notify the ceilometer with updated volume stats""" + # TODO(geguileo): Make this work with Active/Active if service_name != 'volume': return @@ -519,6 +539,7 @@ class HostManager(object): volume_services = objects.ServiceList.get_all_by_topic(context, topic, disabled=False) + active_backends = set() active_hosts = set() no_capabilities_hosts = set() for service in volume_services.objects: @@ -526,32 +547,46 @@ class HostManager(object): if not service.is_up: LOG.warning(_LW("volume service is down. 
(host: %s)"), host) continue + capabilities = self.service_states.get(host, None) if capabilities is None: no_capabilities_hosts.add(host) continue - host_state = self.host_state_map.get(host) - if not host_state: - host_state = self.host_state_cls(host, - capabilities=capabilities, - service= - dict(service)) - self.host_state_map[host] = host_state - # update capabilities and attributes in host_state - host_state.update_from_volume_capability(capabilities, - service= - dict(service)) + # Since the service could have been added or remove from a cluster + backend_key = service.service_topic_queue + backend_state = self.host_state_map.get(backend_key, None) + if not backend_state: + backend_state = self.host_state_cls( + host, + service.cluster_name, + capabilities=capabilities, + service=dict(service)) + self.host_state_map[backend_key] = backend_state + + # We may be receiving capability reports out of order from + # different volume services in a cluster, so we drop older updates + # and only update for newer capability reports. + if (backend_state.capabilities['timestamp'] <= + capabilities['timestamp']): + # update capabilities and attributes in backend_state + backend_state.update_from_volume_capability( + capabilities, service=dict(service)) + active_backends.add(backend_key) active_hosts.add(host) self._no_capabilities_hosts = no_capabilities_hosts - # remove non-active hosts from host_state_map - nonactive_hosts = set(self.host_state_map.keys()) - active_hosts - for host in nonactive_hosts: - LOG.info(_LI("Removing non-active host: %(host)s from " - "scheduler cache."), {'host': host}) - del self.host_state_map[host] + # remove non-active keys from host_state_map + inactive_backend_keys = set(self.host_state_map) - active_backends + for backend_key in inactive_backend_keys: + # NOTE(geguileo): We don't want to log the removal of a host from + # the map when we are removing it because it has been added to a + # cluster. + if backend_key not in active_hosts: + LOG.info(_LI("Removing non-active backend: %(backend)s from " + "scheduler cache."), {'backend': backend_key}) + del self.host_state_map[backend_key] def get_all_host_states(self, context): """Returns a dict of all the hosts the HostManager knows about. 
diff --git a/cinder/scheduler/manager.py b/cinder/scheduler/manager.py index c2ab5dc1008..9b69582d098 100644 --- a/cinder/scheduler/manager.py +++ b/cinder/scheduler/manager.py @@ -19,12 +19,15 @@ Scheduler Service """ +from datetime import datetime + import eventlet from oslo_config import cfg from oslo_log import log as logging import oslo_messaging as messaging from oslo_utils import excutils from oslo_utils import importutils +from oslo_utils import timeutils import six from cinder import context @@ -33,6 +36,7 @@ from cinder import exception from cinder import flow_utils from cinder.i18n import _, _LE from cinder import manager +from cinder import objects from cinder import quota from cinder import rpc from cinder.scheduler.flows import create_volume @@ -53,7 +57,7 @@ QUOTAS = quota.QUOTAS LOG = logging.getLogger(__name__) -class SchedulerManager(manager.Manager): +class SchedulerManager(manager.CleanableManager, manager.Manager): """Chooses a host to create volumes.""" RPC_API_VERSION = scheduler_rpcapi.SchedulerAPI.RPC_API_VERSION @@ -80,13 +84,22 @@ class SchedulerManager(manager.Manager): self.driver.reset() def update_service_capabilities(self, context, service_name=None, - host=None, capabilities=None, **kwargs): + host=None, capabilities=None, + cluster_name=None, timestamp=None, + **kwargs): """Process a capability update from a service node.""" if capabilities is None: capabilities = {} + # If we received the timestamp we have to deserialize it + elif timestamp: + timestamp = datetime.strptime(timestamp, + timeutils.PERFECT_TIME_FORMAT) + self.driver.update_service_capabilities(service_name, host, - capabilities) + capabilities, + cluster_name, + timestamp) def notify_service_capabilities(self, context, service_name, host, capabilities): @@ -150,9 +163,9 @@ class SchedulerManager(manager.Manager): group.status = 'error' group.save() + @objects.Volume.set_workers def create_volume(self, context, volume, snapshot_id=None, image_id=None, request_spec=None, filter_properties=None): - self._wait_for_scheduler() try: @@ -171,13 +184,21 @@ class SchedulerManager(manager.Manager): with flow_utils.DynamicLogListener(flow_engine, logger=LOG): flow_engine.run() + def _do_cleanup(self, ctxt, vo_resource): + # We can only receive cleanup requests for volumes, but we check anyway + # We need to cleanup the volume status for cases where the scheduler + # died while scheduling the volume creation. 
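        # (Illustrative aside, not part of the patch: _do_cleanup is the hook
        # that the CleanableManager mixin added to this class is expected to
        # invoke for each resource left with a pending cleanup after a service
        # failure; only volumes stuck in 'creating' are reset to 'error' here.)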
+ if (isinstance(vo_resource, objects.Volume) and + vo_resource.status == 'creating'): + vo_resource.status = 'error' + vo_resource.save() + def request_service_capabilities(self, context): volume_rpcapi.VolumeAPI().publish_service_capabilities(context) - def migrate_volume_to_host(self, context, volume, host, force_host_copy, - request_spec, filter_properties=None): + def migrate_volume(self, context, volume, backend, force_copy, + request_spec, filter_properties): """Ensure that the host exists and can accept the volume.""" - self._wait_for_scheduler() def _migrate_volume_set_error(self, context, ex, request_spec): @@ -193,9 +214,9 @@ class SchedulerManager(manager.Manager): context, ex, request_spec) try: - tgt_host = self.driver.host_passes_filters(context, host, - request_spec, - filter_properties) + tgt_backend = self.driver.host_passes_filters(context, backend, + request_spec, + filter_properties) except exception.NoValidHost as ex: _migrate_volume_set_error(self, context, ex, request_spec) except Exception as ex: @@ -203,8 +224,14 @@ class SchedulerManager(manager.Manager): _migrate_volume_set_error(self, context, ex, request_spec) else: volume_rpcapi.VolumeAPI().migrate_volume(context, volume, - tgt_host, - force_host_copy) + tgt_backend, + force_copy) + + # FIXME(geguileo): Remove this in v4.0 of RPC API. + def migrate_volume_to_host(self, context, volume, host, force_host_copy, + request_spec, filter_properties=None): + return self.migrate_volume(context, volume, host, force_host_copy, + request_spec, filter_properties) def retype(self, context, volume, request_spec, filter_properties=None): """Schedule the modification of a volume's type. @@ -272,7 +299,7 @@ class SchedulerManager(manager.Manager): try: self.driver.host_passes_filters(context, - volume.host, + volume.service_topic_queue, request_spec, filter_properties) except exception.NoValidHost as ex: @@ -306,7 +333,8 @@ class SchedulerManager(manager.Manager): filter_properties['new_size'] = new_size try: - self.driver.host_passes_filters(context, volume.host, + self.driver.host_passes_filters(context, + volume.service_topic_queue, request_spec, filter_properties) volume_rpcapi.VolumeAPI().extend_volume(context, volume, new_size, reservations) diff --git a/cinder/scheduler/rpcapi.py b/cinder/scheduler/rpcapi.py index 40b8ee0fa1e..a5124a80b86 100644 --- a/cinder/scheduler/rpcapi.py +++ b/cinder/scheduler/rpcapi.py @@ -17,6 +17,7 @@ Client side of the scheduler manager RPC API. """ from oslo_serialization import jsonutils +from oslo_utils import timeutils from cinder.common import constants from cinder import exception @@ -62,9 +63,12 @@ class SchedulerAPI(rpc.RPCAPI): 3.0 - Remove 2.x compatibility 3.1 - Adds notify_service_capabilities() 3.2 - Adds extend_volume() + 3.3 - Add cluster support to migrate_volume, and to + update_service_capabilities and send the timestamp from the + capabilities. 
""" - RPC_API_VERSION = '3.2' + RPC_API_VERSION = '3.3' RPC_DEFAULT_VERSION = '3.0' TOPIC = constants.SCHEDULER_TOPIC BINARY = 'cinder-scheduler' @@ -106,15 +110,24 @@ class SchedulerAPI(rpc.RPCAPI): 'filter_properties': filter_properties, 'volume': volume} return cctxt.cast(ctxt, 'create_volume', **msg_args) - def migrate_volume_to_host(self, ctxt, volume, host, force_host_copy=False, - request_spec=None, filter_properties=None): - cctxt = self._get_cctxt() + def migrate_volume(self, ctxt, volume, backend, force_copy=False, + request_spec=None, filter_properties=None): request_spec_p = jsonutils.to_primitive(request_spec) - msg_args = {'host': host, 'force_host_copy': force_host_copy, - 'request_spec': request_spec_p, + msg_args = {'request_spec': request_spec_p, 'filter_properties': filter_properties, 'volume': volume} + version = '3.3' + if self.client.can_send_version(version): + msg_args['backend'] = backend + msg_args['force_copy'] = force_copy + method = 'migrate_volume' + else: + version = '3.0' + msg_args['host'] = backend + msg_args['force_host_copy'] = force_copy + method = 'migrate_volume_to_host' - return cctxt.cast(ctxt, 'migrate_volume_to_host', **msg_args) + cctxt = self._get_cctxt(version=version) + return cctxt.cast(ctxt, method, **msg_args) def retype(self, ctxt, volume, request_spec=None, filter_properties=None): cctxt = self._get_cctxt() @@ -157,14 +170,27 @@ class SchedulerAPI(rpc.RPCAPI): return cctxt.call(ctxt, 'get_pools', filters=filters) def update_service_capabilities(self, ctxt, service_name, host, - capabilities): - cctxt = self._get_cctxt(fanout=True) - cctxt.cast(ctxt, 'update_service_capabilities', - service_name=service_name, host=host, - capabilities=capabilities) + capabilities, cluster_name, + timestamp=None): + msg_args = dict(service_name=service_name, host=host, + capabilities=capabilities) + + version = '3.3' + # If server accepts timestamping the capabilities and the cluster name + if self.client.can_send_version(version): + # Serialize the timestamp + timestamp = timestamp or timeutils.utcnow() + msg_args.update(cluster_name=cluster_name, + timestamp=jsonutils.to_primitive(timestamp)) + else: + version = '3.0' + + cctxt = self._get_cctxt(fanout=True, version=version) + cctxt.cast(ctxt, 'update_service_capabilities', **msg_args) def notify_service_capabilities(self, ctxt, service_name, host, capabilities): + # TODO(geguileo): Make this work with Active/Active cctxt = self._get_cctxt(version='3.1') if not cctxt.can_send_version('3.1'): msg = _('notify_service_capabilities requires cinder-scheduler ' diff --git a/cinder/tests/unit/api/contrib/test_admin_actions.py b/cinder/tests/unit/api/contrib/test_admin_actions.py index 311fb09d1d0..47e678806da 100644 --- a/cinder/tests/unit/api/contrib/test_admin_actions.py +++ b/cinder/tests/unit/api/contrib/test_admin_actions.py @@ -10,6 +10,7 @@ # License for the specific language governing permissions and limitations # under the License. 
+import ddt import fixtures import mock from oslo_concurrency import lockutils @@ -21,6 +22,7 @@ import webob from webob import exc from cinder.api.contrib import admin_actions +from cinder.api.openstack import api_version_request as api_version from cinder.backup import api as backup_api from cinder.backup import rpcapi as backup_rpcapi from cinder.common import constants @@ -70,6 +72,7 @@ class BaseAdminTest(test.TestCase): return volume +@ddt.ddt class AdminActionsTest(BaseAdminTest): def setUp(self): super(AdminActionsTest, self).setUp() @@ -101,6 +104,7 @@ class AdminActionsTest(BaseAdminTest): self.patch('cinder.objects.Service.get_minimum_rpc_version', side_effect=_get_minimum_rpc_version_mock) + self.controller = admin_actions.VolumeAdminController() def tearDown(self): self.svc.stop() @@ -138,7 +142,7 @@ class AdminActionsTest(BaseAdminTest): updated_status) def test_valid_updates(self): - vac = admin_actions.VolumeAdminController() + vac = self.controller vac.validate_update({'status': 'creating'}) vac.validate_update({'status': 'available'}) @@ -503,10 +507,74 @@ class AdminActionsTest(BaseAdminTest): {'host': 'test2', 'topic': constants.VOLUME_TOPIC, 'created_at': timeutils.utcnow()}) + db.service_create(self.ctx, + {'host': 'clustered_host', + 'topic': constants.VOLUME_TOPIC, + 'binary': constants.VOLUME_BINARY, + 'cluster_name': 'cluster', + 'created_at': timeutils.utcnow()}) + db.cluster_create(self.ctx, + {'name': 'cluster', + 'binary': constants.VOLUME_BINARY}) # current status is available volume = self._create_volume(self.ctx) return volume + def _migrate_volume_3_exec(self, ctx, volume, host, expected_status, + force_host_copy=False, version=None, + cluster=None): + # build request to migrate to host + # req = fakes.HTTPRequest.blank('/v3/%s/volumes/%s/action' % ( + # fake.PROJECT_ID, volume['id'])) + req = webob.Request.blank('/v3/%s/volumes/%s/action' % ( + fake.PROJECT_ID, volume['id'])) + req.method = 'POST' + req.headers['content-type'] = 'application/json' + body = {'os-migrate_volume': {'host': host, + 'force_host_copy': force_host_copy}} + version = version or '3.0' + req.headers = {'OpenStack-API-Version': 'volume %s' % version} + req.api_version_request = api_version.APIVersionRequest(version) + if version == '3.16': + body['os-migrate_volume']['cluster'] = cluster + req.body = jsonutils.dump_as_bytes(body) + req.environ['cinder.context'] = ctx + resp = self.controller._migrate_volume(req, volume.id, body) + + # verify status + self.assertEqual(expected_status, resp.status_int) + volume = db.volume_get(self.ctx, volume['id']) + return volume + + @ddt.data('3.0', '3.15', '3.16') + def test_migrate_volume_success_3(self, version): + expected_status = 202 + host = 'test2' + volume = self._migrate_volume_prep() + volume = self._migrate_volume_3_exec(self.ctx, volume, host, + expected_status, version=version) + self.assertEqual('starting', volume['migration_status']) + + def test_migrate_volume_success_cluster(self): + expected_status = 202 + # We cannot provide host and cluster, so send host to None + host = None + cluster = 'cluster' + volume = self._migrate_volume_prep() + volume = self._migrate_volume_3_exec(self.ctx, volume, host, + expected_status, version='3.16', + cluster=cluster) + self.assertEqual('starting', volume['migration_status']) + + def test_migrate_volume_fail_host_and_cluster(self): + # We cannot send host and cluster in the request + host = 'test2' + cluster = 'cluster' + volume = self._migrate_volume_prep() + 
self.assertRaises(exception.InvalidInput, + self._migrate_volume_3_exec, self.ctx, volume, host, + None, version='3.16', cluster=cluster) + def _migrate_volume_exec(self, ctx, volume, host, expected_status, force_host_copy=False): # build request to migrate to host diff --git a/cinder/tests/unit/api/contrib/test_snapshot_manage.py b/cinder/tests/unit/api/contrib/test_snapshot_manage.py index fb6980977fd..65daed472a8 100644 --- a/cinder/tests/unit/api/contrib/test_snapshot_manage.py +++ b/cinder/tests/unit/api/contrib/test_snapshot_manage.py @@ -45,7 +45,7 @@ def volume_get(self, context, volume_id, viewable_admin_meta=False): if volume_id == fake.VOLUME_ID: return objects.Volume(context, id=fake.VOLUME_ID, _name_id=fake.VOLUME2_ID, - host='fake_host') + host='fake_host', cluster_name=None) raise exception.VolumeNotFound(volume_id=volume_id) @@ -109,7 +109,7 @@ class SnapshotManageTest(test.TestCase): @mock.patch('cinder.volume.rpcapi.VolumeAPI.manage_existing_snapshot') @mock.patch('cinder.volume.api.API.create_snapshot_in_db') - @mock.patch('cinder.db.service_get') + @mock.patch('cinder.db.sqlalchemy.api.service_get') def test_manage_snapshot_ok(self, mock_db, mock_create_snapshot, mock_rpcapi): """Test successful manage snapshot execution. @@ -128,7 +128,8 @@ class SnapshotManageTest(test.TestCase): # Check the db.service_get was called with correct arguments. mock_db.assert_called_once_with( - mock.ANY, host='fake_host', binary='cinder-volume') + mock.ANY, None, host='fake_host', binary='cinder-volume', + cluster_name=None) # Check the create_snapshot_in_db was called with correct arguments. self.assertEqual(1, mock_create_snapshot.call_count) @@ -149,7 +150,7 @@ class SnapshotManageTest(test.TestCase): new_callable=mock.PropertyMock) @mock.patch('cinder.volume.rpcapi.VolumeAPI.manage_existing_snapshot') @mock.patch('cinder.volume.api.API.create_snapshot_in_db') - @mock.patch('cinder.db.service_get') + @mock.patch('cinder.db.sqlalchemy.api.service_get') def test_manage_snapshot_disabled(self, mock_db, mock_create_snapshot, mock_rpcapi, mock_is_up): """Test manage snapshot failure due to disabled service.""" @@ -168,7 +169,7 @@ class SnapshotManageTest(test.TestCase): new_callable=mock.PropertyMock) @mock.patch('cinder.volume.rpcapi.VolumeAPI.manage_existing_snapshot') @mock.patch('cinder.volume.api.API.create_snapshot_in_db') - @mock.patch('cinder.db.service_get') + @mock.patch('cinder.db.sqlalchemy.api.service_get') def test_manage_snapshot_is_down(self, mock_db, mock_create_snapshot, mock_rpcapi, mock_is_up): """Test manage snapshot failure due to down service.""" @@ -280,7 +281,7 @@ class SnapshotManageTest(test.TestCase): sort_dirs=['asc'], sort_keys=['reference']) @mock.patch('cinder.objects.service.Service.is_up', return_value=True) - @mock.patch('cinder.db.service_get') + @mock.patch('cinder.db.sqlalchemy.api.service_get') def test_get_manageable_snapshots_disabled(self, mock_db, mock_is_up): mock_db.return_value = fake_service.fake_service_obj(self._admin_ctxt, disabled=True) @@ -292,7 +293,7 @@ class SnapshotManageTest(test.TestCase): @mock.patch('cinder.objects.service.Service.is_up', return_value=False, new_callable=mock.PropertyMock) - @mock.patch('cinder.db.service_get') + @mock.patch('cinder.db.sqlalchemy.api.service_get') def test_get_manageable_snapshots_is_down(self, mock_db, mock_is_up): mock_db.return_value = fake_service.fake_service_obj(self._admin_ctxt) res = self._get_resp_get('host_ok', False, True) diff --git a/cinder/tests/unit/api/contrib/test_volume_manage.py 
b/cinder/tests/unit/api/contrib/test_volume_manage.py index f7d96dfabc8..a7862ba6ca1 100644 --- a/cinder/tests/unit/api/contrib/test_volume_manage.py +++ b/cinder/tests/unit/api/contrib/test_volume_manage.py @@ -23,6 +23,7 @@ except ImportError: from urllib.parse import urlencode import webob +from cinder.api.contrib import volume_manage from cinder.api.openstack import api_version_request as api_version from cinder import context from cinder import exception @@ -51,7 +52,7 @@ def app_v3(): return mapper -def service_get(context, host, binary): +def service_get(context, id, host=None, binary=None, *args, **kwargs): """Replacement for Service.service_get_by_host_and_topic. We mock the Service.service_get_by_host_and_topic method to return @@ -146,7 +147,7 @@ def api_get_manageable_volumes(*args, **kwargs): @ddt.ddt -@mock.patch('cinder.db.service_get', service_get) +@mock.patch('cinder.db.sqlalchemy.api.service_get', service_get) @mock.patch('cinder.volume.volume_types.get_volume_type_by_name', vt_get_volume_type_by_name) @mock.patch('cinder.volume.volume_types.get_volume_type', @@ -173,6 +174,7 @@ class VolumeManageTest(test.TestCase): self._non_admin_ctxt = context.RequestContext(fake.USER_ID, fake.PROJECT_ID, is_admin=False) + self.controller = volume_manage.VolumeManageController() def _get_resp_post(self, body): """Helper to execute a POST os-volume-manage API call.""" @@ -196,10 +198,11 @@ class VolumeManageTest(test.TestCase): res = req.get_response(app_v3()) return res + @ddt.data(False, True) @mock.patch('cinder.volume.api.API.manage_existing', wraps=api_manage) @mock.patch( 'cinder.api.openstack.wsgi.Controller.validate_name_and_description') - def test_manage_volume_ok(self, mock_validate, mock_api_manage): + def test_manage_volume_ok(self, cluster, mock_validate, mock_api_manage): """Test successful manage volume execution. 
Tests for correct operation when valid arguments are passed in the @@ -209,6 +212,9 @@ class VolumeManageTest(test.TestCase): """ body = {'volume': {'host': 'host_ok', 'ref': 'fake_ref'}} + # This will be ignored + if cluster: + body['volume']['cluster'] = 'cluster' res = self._get_resp_post(body) self.assertEqual(202, res.status_int) @@ -216,9 +222,48 @@ class VolumeManageTest(test.TestCase): self.assertEqual(1, mock_api_manage.call_count) args = mock_api_manage.call_args[0] self.assertEqual(body['volume']['host'], args[1]) - self.assertEqual(body['volume']['ref'], args[2]) + self.assertIsNone(args[2]) # Cluster argument + self.assertEqual(body['volume']['ref'], args[3]) self.assertTrue(mock_validate.called) + def _get_resp_create(self, body, version='3.0'): + url = '/v3/%s/os-volume-manage' % fake.PROJECT_ID + req = webob.Request.blank(url, base_url='http://localhost.com' + url) + req.method = 'POST' + req.headers['Content-Type'] = 'application/json' + req.environ['cinder.context'] = self._admin_ctxt + req.body = jsonutils.dump_as_bytes(body) + req.headers = {'OpenStack-API-Version': 'volume %s' % version} + req.api_version_request = api_version.APIVersionRequest(version) + res = self.controller.create(req, body) + return res + + @mock.patch('cinder.volume.api.API.manage_existing', wraps=api_manage) + @mock.patch( + 'cinder.api.openstack.wsgi.Controller.validate_name_and_description') + def test_manage_volume_ok_cluster(self, mock_validate, mock_api_manage): + body = {'volume': {'cluster': 'cluster', + 'ref': 'fake_ref'}} + res = self._get_resp_create(body, '3.16') + self.assertEqual(['volume'], list(res.keys())) + + # Check that the manage API was called with the correct arguments. + self.assertEqual(1, mock_api_manage.call_count) + args = mock_api_manage.call_args[0] + self.assertIsNone(args[1]) + self.assertEqual(body['volume']['cluster'], args[2]) + self.assertEqual(body['volume']['ref'], args[3]) + self.assertTrue(mock_validate.called) + + @mock.patch( + 'cinder.api.openstack.wsgi.Controller.validate_name_and_description') + def test_manage_volume_fail_host_cluster(self, mock_validate): + body = {'volume': {'host': 'host_ok', + 'cluster': 'cluster', + 'ref': 'fake_ref'}} + self.assertRaises(exception.InvalidInput, + self._get_resp_create, body, '3.16') + def test_manage_volume_missing_host(self): """Test correct failure when host is not specified.""" body = {'volume': {'ref': 'fake_ref'}} diff --git a/cinder/tests/unit/api/v3/test_snapshot_manage.py b/cinder/tests/unit/api/v3/test_snapshot_manage.py index 58d2ee0af91..56ce469a60c 100644 --- a/cinder/tests/unit/api/v3/test_snapshot_manage.py +++ b/cinder/tests/unit/api/v3/test_snapshot_manage.py @@ -60,7 +60,7 @@ class SnapshotManageTest(test.TestCase): @mock.patch('cinder.volume.rpcapi.VolumeAPI.manage_existing_snapshot') @mock.patch('cinder.volume.api.API.create_snapshot_in_db') - @mock.patch('cinder.objects.service.Service.get_by_args') + @mock.patch('cinder.objects.service.Service.get_by_id') def test_manage_snapshot_route(self, mock_service_get, mock_create_snapshot, mock_rpcapi): """Test call to manage snapshot. 
diff --git a/cinder/tests/unit/scheduler/fakes.py b/cinder/tests/unit/scheduler/fakes.py index 071eb64e679..6e5539c9e43 100644 --- a/cinder/tests/unit/scheduler/fakes.py +++ b/cinder/tests/unit/scheduler/fakes.py @@ -23,6 +23,9 @@ from cinder.scheduler import filter_scheduler from cinder.scheduler import host_manager +UTC_NOW = timeutils.utcnow() + + class FakeFilterScheduler(filter_scheduler.FilterScheduler): def __init__(self, *args, **kwargs): super(FakeFilterScheduler, self).__init__(*args, **kwargs) @@ -43,7 +46,7 @@ class FakeHostManager(host_manager.HostManager): 'thick_provisioning_support': True, 'reserved_percentage': 10, 'volume_backend_name': 'lvm1', - 'timestamp': None}, + 'timestamp': UTC_NOW}, 'host2': {'total_capacity_gb': 2048, 'free_capacity_gb': 300, 'allocated_capacity_gb': 1748, @@ -53,7 +56,7 @@ class FakeHostManager(host_manager.HostManager): 'thick_provisioning_support': False, 'reserved_percentage': 10, 'volume_backend_name': 'lvm2', - 'timestamp': None}, + 'timestamp': UTC_NOW}, 'host3': {'total_capacity_gb': 512, 'free_capacity_gb': 256, 'allocated_capacity_gb': 256, @@ -63,7 +66,7 @@ class FakeHostManager(host_manager.HostManager): 'thick_provisioning_support': True, 'reserved_percentage': 0, 'volume_backend_name': 'lvm3', - 'timestamp': None}, + 'timestamp': UTC_NOW}, 'host4': {'total_capacity_gb': 2048, 'free_capacity_gb': 200, 'allocated_capacity_gb': 1848, @@ -73,7 +76,7 @@ class FakeHostManager(host_manager.HostManager): 'thick_provisioning_support': False, 'reserved_percentage': 5, 'volume_backend_name': 'lvm4', - 'timestamp': None, + 'timestamp': UTC_NOW, 'consistencygroup_support': True}, 'host5': {'total_capacity_gb': 'infinite', 'free_capacity_gb': 'unknown', @@ -83,13 +86,13 @@ class FakeHostManager(host_manager.HostManager): 'thin_provisioning_support': True, 'thick_provisioning_support': False, 'reserved_percentage': 5, - 'timestamp': None}, + 'timestamp': UTC_NOW}, } class FakeHostState(host_manager.HostState): def __init__(self, host, attribute_dict): - super(FakeHostState, self).__init__(host) + super(FakeHostState, self).__init__(host, None) for (key, val) in attribute_dict.items(): setattr(self, key, val) diff --git a/cinder/tests/unit/scheduler/test_base_filter.py b/cinder/tests/unit/scheduler/test_base_filter.py index 05377c3194d..a0ecdfaf83e 100644 --- a/cinder/tests/unit/scheduler/test_base_filter.py +++ b/cinder/tests/unit/scheduler/test_base_filter.py @@ -178,7 +178,7 @@ class TestBaseFilterHandler(test.TestCase): def test_get_filtered_objects_info_and_debug_log_none_returned(self): all_filters = [FilterA, FilterA, FilterB] - fake_hosts = [host_manager.HostState('fake_host%s' % x) + fake_hosts = [host_manager.HostState('fake_host%s' % x, None) for x in range(1, 4)] filt_props = {"request_spec": {'volume_id': fake.VOLUME_ID, diff --git a/cinder/tests/unit/scheduler/test_capacity_weigher.py b/cinder/tests/unit/scheduler/test_capacity_weigher.py index b15823d4661..77fb2ba2ea7 100644 --- a/cinder/tests/unit/scheduler/test_capacity_weigher.py +++ b/cinder/tests/unit/scheduler/test_capacity_weigher.py @@ -15,6 +15,7 @@ """ Tests For Capacity Weigher. 
""" +from datetime import datetime import ddt import mock @@ -248,7 +249,7 @@ class CapacityWeigherTestCase(test.TestCase): 'thin_provisioning_support': True, 'thick_provisioning_support': False, 'reserved_percentage': 5, - 'timestamp': None} + 'timestamp': datetime.utcnow()} hostinfo_list = self._get_all_hosts() # host1: thin_provisioning_support = False @@ -290,7 +291,7 @@ class CapacityWeigherTestCase(test.TestCase): 'thin_provisioning_support': True, 'thick_provisioning_support': False, 'reserved_percentage': 5, - 'timestamp': None} + 'timestamp': datetime.utcnow()} hostinfo_list = self._get_all_hosts() # host1: thin_provisioning_support = False @@ -332,7 +333,7 @@ class CapacityWeigherTestCase(test.TestCase): 'thin_provisioning_support': True, 'thick_provisioning_support': False, 'reserved_percentage': 5, - 'timestamp': None} + 'timestamp': datetime.utcnow()} hostinfo_list = self._get_all_hosts() # host1: thin_provisioning_support = False @@ -374,7 +375,7 @@ class CapacityWeigherTestCase(test.TestCase): 'thin_provisioning_support': True, 'thick_provisioning_support': False, 'reserved_percentage': 5, - 'timestamp': None} + 'timestamp': datetime.utcnow()} hostinfo_list = self._get_all_hosts() # host1: thin_provisioning_support = False diff --git a/cinder/tests/unit/scheduler/test_chance_weigher.py b/cinder/tests/unit/scheduler/test_chance_weigher.py index 5732aed7a42..651707af17f 100644 --- a/cinder/tests/unit/scheduler/test_chance_weigher.py +++ b/cinder/tests/unit/scheduler/test_chance_weigher.py @@ -58,7 +58,7 @@ class ChanceWeigherTestCase(test.TestCase): # ensure we don't lose any hosts when weighing with # the ChanceWeigher hm = host_manager.HostManager() - fake_hosts = [host_manager.HostState('fake_host%s' % x) + fake_hosts = [host_manager.HostState('fake_host%s' % x, None) for x in range(1, 5)] weighed_hosts = hm.get_weighed_hosts(fake_hosts, {}, 'ChanceWeigher') self.assertEqual(4, len(weighed_hosts)) diff --git a/cinder/tests/unit/scheduler/test_filter_scheduler.py b/cinder/tests/unit/scheduler/test_filter_scheduler.py index c9ed2f1ce27..f45dad7c7c7 100644 --- a/cinder/tests/unit/scheduler/test_filter_scheduler.py +++ b/cinder/tests/unit/scheduler/test_filter_scheduler.py @@ -414,7 +414,7 @@ class FilterSchedulerTestCase(test_scheduler.SchedulerTestCase): filter_properties = {'retry': retry} sched = fakes.FakeFilterScheduler() - host_state = host_manager.HostState('host') + host_state = host_manager.HostState('host', None) host_state.total_capacity_gb = 1024 sched._post_select_populate_filter_properties(filter_properties, host_state) diff --git a/cinder/tests/unit/scheduler/test_host_manager.py b/cinder/tests/unit/scheduler/test_host_manager.py index ad5490bbc69..1dc509a29a8 100644 --- a/cinder/tests/unit/scheduler/test_host_manager.py +++ b/cinder/tests/unit/scheduler/test_host_manager.py @@ -19,14 +19,18 @@ Tests For HostManager from datetime import datetime import mock +from oslo_serialization import jsonutils from oslo_utils import timeutils from cinder.common import constants +from cinder import context +from cinder import db from cinder import exception from cinder import objects from cinder.scheduler import filters from cinder.scheduler import host_manager from cinder import test +from cinder.tests.unit import fake_constants as fake from cinder.tests.unit.objects import test_service @@ -46,7 +50,7 @@ class HostManagerTestCase(test.TestCase): def setUp(self): super(HostManagerTestCase, self).setUp() self.host_manager = host_manager.HostManager() - self.fake_hosts = 
[host_manager.HostState('fake_host%s' % x) + self.fake_hosts = [host_manager.HostState('fake_host%s' % x, None) for x in range(1, 5)] # For a second scheduler service. self.host_manager_1 = host_manager.HostManager() @@ -93,27 +97,29 @@ class HostManagerTestCase(test.TestCase): _mock_get_updated_pools): service_states = self.host_manager.service_states self.assertDictMatch({}, service_states) - _mock_utcnow.side_effect = [31337, 31338, 31339] + _mock_utcnow.side_effect = [31338, 31339] _mock_get_updated_pools.return_value = [] - host1_volume_capabs = dict(free_capacity_gb=4321, timestamp=1) - host2_volume_capabs = dict(free_capacity_gb=5432, timestamp=1) - host3_volume_capabs = dict(free_capacity_gb=6543, timestamp=1) + timestamp = jsonutils.to_primitive(datetime.utcnow()) + host1_volume_capabs = dict(free_capacity_gb=4321, timestamp=timestamp) + host2_volume_capabs = dict(free_capacity_gb=5432) + host3_volume_capabs = dict(free_capacity_gb=6543) service_name = 'volume' self.host_manager.update_service_capabilities(service_name, 'host1', - host1_volume_capabs) + host1_volume_capabs, + None, timestamp) self.host_manager.update_service_capabilities(service_name, 'host2', - host2_volume_capabs) + host2_volume_capabs, + None, None) self.host_manager.update_service_capabilities(service_name, 'host3', - host3_volume_capabs) + host3_volume_capabs, + None, None) # Make sure dictionary isn't re-assigned self.assertEqual(service_states, self.host_manager.service_states) - # Make sure original dictionary wasn't copied - self.assertEqual(1, host1_volume_capabs['timestamp']) - host1_volume_capabs['timestamp'] = 31337 + host1_volume_capabs['timestamp'] = timestamp host2_volume_capabs['timestamp'] = 31338 host3_volume_capabs['timestamp'] = 31339 @@ -150,7 +156,7 @@ class HostManagerTestCase(test.TestCase): # S0: update_service_capabilities() self.host_manager.update_service_capabilities(service_name, 'host1', - capab1) + capab1, None, None) self.assertDictMatch(dict(dict(timestamp=31337), **capab1), self.host_manager.service_states['host1']) @@ -168,7 +174,7 @@ class HostManagerTestCase(test.TestCase): # S1: update_service_capabilities() self.host_manager_1.update_service_capabilities(service_name, 'host1', - capab1) + capab1, None, None) self.assertDictMatch(dict(dict(timestamp=31339), **capab1), self.host_manager_1.service_states['host1']) @@ -208,7 +214,7 @@ class HostManagerTestCase(test.TestCase): # S0: update_service_capabilities() self.host_manager.update_service_capabilities(service_name, 'host1', - capab1) + capab1, None, None) self.assertDictMatch(dict(dict(timestamp=31340), **capab1), self.host_manager.service_states['host1']) @@ -219,7 +225,7 @@ class HostManagerTestCase(test.TestCase): # S1: update_service_capabilities() self.host_manager_1.update_service_capabilities(service_name, 'host1', - capab1) + capab1, None, None) self.assertDictMatch(dict(dict(timestamp=31341), **capab1), self.host_manager_1.service_states['host1']) @@ -292,7 +298,7 @@ class HostManagerTestCase(test.TestCase): # S0: update_service_capabilities() self.host_manager.update_service_capabilities(service_name, 'host1', - capab1) + capab1, None, None) self.assertDictMatch( dict(dict(timestamp=31340), **capab1), @@ -303,7 +309,7 @@ class HostManagerTestCase(test.TestCase): # S1: update_service_capabilities() self.host_manager_1.update_service_capabilities(service_name, 'host1', - capab1) + capab1, None, None) self.assertDictMatch(dict(dict(timestamp=31345), **capab1), self.host_manager_1.service_states['host1']) @@ -355,7 
+361,7 @@ class HostManagerTestCase(test.TestCase): # S0: update_service_capabilities() self.host_manager.update_service_capabilities(service_name, 'host1', - capab2) + capab2, None, None) self.assertDictMatch( dict(dict(timestamp=31340), **capab1), self.host_manager.service_states_last_update['host1']) @@ -378,7 +384,7 @@ class HostManagerTestCase(test.TestCase): # S1: update_service_capabilities() self.host_manager_1.update_service_capabilities(service_name, 'host1', - capab2) + capab2, None, None) self.assertDictMatch(dict(dict(timestamp=31348), **capab2), self.host_manager_1.service_states['host1']) @@ -452,7 +458,7 @@ class HostManagerTestCase(test.TestCase): # S0: update_service_capabilities() self.host_manager.update_service_capabilities(service_name, 'host1', - capab2) + capab2, None, None) self.assertDictMatch( dict(dict(timestamp=31349), **capab2), self.host_manager.service_states_last_update['host1']) @@ -462,7 +468,7 @@ class HostManagerTestCase(test.TestCase): # S1: update_service_capabilities() self.host_manager_1.update_service_capabilities(service_name, 'host1', - capab2) + capab2, None, None) self.assertDictMatch( dict(dict(timestamp=31348), **capab2), @@ -490,19 +496,23 @@ class HostManagerTestCase(test.TestCase): self.host_manager = host_manager.HostManager() self.assertFalse(self.host_manager.has_all_capabilities()) - host1_volume_capabs = dict(free_capacity_gb=4321, timestamp=1) - host2_volume_capabs = dict(free_capacity_gb=5432, timestamp=1) - host3_volume_capabs = dict(free_capacity_gb=6543, timestamp=1) + timestamp = jsonutils.to_primitive(datetime.utcnow()) + host1_volume_capabs = dict(free_capacity_gb=4321) + host2_volume_capabs = dict(free_capacity_gb=5432) + host3_volume_capabs = dict(free_capacity_gb=6543) service_name = 'volume' self.host_manager.update_service_capabilities(service_name, 'host1', - host1_volume_capabs) + host1_volume_capabs, + None, timestamp) self.assertFalse(self.host_manager.has_all_capabilities()) self.host_manager.update_service_capabilities(service_name, 'host2', - host2_volume_capabs) + host2_volume_capabs, + None, timestamp) self.assertFalse(self.host_manager.has_all_capabilities()) self.host_manager.update_service_capabilities(service_name, 'host3', - host3_volume_capabs) + host3_volume_capabs, + None, timestamp) self.assertTrue(self.host_manager.has_all_capabilities()) @mock.patch('cinder.db.service_get_all') @@ -532,7 +542,7 @@ class HostManagerTestCase(test.TestCase): mocked_service_states = { 'host1': dict(volume_backend_name='AAA', total_capacity_gb=512, free_capacity_gb=200, - timestamp=None, reserved_percentage=0), + timestamp=dates[1], reserved_percentage=0), } _mock_service_get_all.return_value = services @@ -547,24 +557,93 @@ class HostManagerTestCase(test.TestCase): mocked_service_states): self.host_manager.update_service_capabilities(service_name, 'host1', - host_volume_capabs) + host_volume_capabs, + None, None) res = self.host_manager.get_pools(context) self.assertEqual(1, len(res)) self.assertEqual(dates[1], res[0]['capabilities']['timestamp']) self.host_manager.update_service_capabilities(service_name, 'host1', - host_volume_capabs) + host_volume_capabs, + None, None) res = self.host_manager.get_pools(context) self.assertEqual(1, len(res)) self.assertEqual(dates[2], res[0]['capabilities']['timestamp']) + @mock.patch('cinder.objects.Service.is_up', True) + def test_get_all_host_states_cluster(self): + """Test get_all_host_states when we have clustered services. 
+ + Confirm that clustered services are grouped and that only the latest + of the capability reports is relevant. + """ + ctxt = context.RequestContext(fake.USER_ID, fake.PROJECT_ID, True) + + cluster_name = 'cluster' + db.cluster_create(ctxt, {'name': cluster_name, + 'binary': constants.VOLUME_BINARY}) + + services = ( + db.service_create(ctxt, + {'host': 'clustered_host_1', + 'topic': constants.VOLUME_TOPIC, + 'binary': constants.VOLUME_BINARY, + 'cluster_name': cluster_name, + 'created_at': timeutils.utcnow()}), + # Even if this service is disabled, since it belongs to an enabled + # cluster, it's not really disabled. + db.service_create(ctxt, + {'host': 'clustered_host_2', + 'topic': constants.VOLUME_TOPIC, + 'binary': constants.VOLUME_BINARY, + 'disabled': True, + 'cluster_name': cluster_name, + 'created_at': timeutils.utcnow()}), + db.service_create(ctxt, + {'host': 'clustered_host_3', + 'topic': constants.VOLUME_TOPIC, + 'binary': constants.VOLUME_BINARY, + 'cluster_name': cluster_name, + 'created_at': timeutils.utcnow()}), + db.service_create(ctxt, + {'host': 'non_clustered_host', + 'topic': constants.VOLUME_TOPIC, + 'binary': constants.VOLUME_BINARY, + 'created_at': timeutils.utcnow()}), + # This service has no capabilities + db.service_create(ctxt, + {'host': 'no_capabilities_host', + 'topic': constants.VOLUME_TOPIC, + 'binary': constants.VOLUME_BINARY, + 'created_at': timeutils.utcnow()}), + ) + + capabilities = ((1, {'free_capacity_gb': 1000}), + # This is the capacity that will be selected for the + # cluster because it is the one with the latest timestamp. + (3, {'free_capacity_gb': 2000}), + (2, {'free_capacity_gb': 3000}), + (1, {'free_capacity_gb': 4000})) + + for i in range(len(capabilities)): + self.host_manager.update_service_capabilities( + 'volume', services[i].host, capabilities[i][1], + services[i].cluster_name, capabilities[i][0]) + + res = self.host_manager.get_all_host_states(ctxt) + result = {(s.cluster_name or s.host, s.free_capacity_gb) for s in res} + expected = {(cluster_name + '#_pool0', 2000), + ('non_clustered_host#_pool0', 4000)} + self.assertSetEqual(expected, result) + @mock.patch('cinder.db.service_get_all') @mock.patch('cinder.objects.service.Service.is_up', new_callable=mock.PropertyMock) def test_get_all_host_states(self, _mock_service_is_up, _mock_service_get_all): context = 'fake_context' + timestamp = datetime.utcnow() topic = constants.VOLUME_TOPIC services = [ @@ -596,15 +675,15 @@ class HostManagerTestCase(test.TestCase): service_states = { 'host1': dict(volume_backend_name='AAA', total_capacity_gb=512, free_capacity_gb=200, - timestamp=None, reserved_percentage=0, + timestamp=timestamp, reserved_percentage=0, provisioned_capacity_gb=312), 'host2': dict(volume_backend_name='BBB', total_capacity_gb=256, free_capacity_gb=100, - timestamp=None, reserved_percentage=0, + timestamp=timestamp, reserved_percentage=0, provisioned_capacity_gb=156), 'host3': dict(volume_backend_name='CCC', total_capacity_gb=10000, free_capacity_gb=700, - timestamp=None, reserved_percentage=0, + timestamp=timestamp, reserved_percentage=0, provisioned_capacity_gb=9300), } # First test: service.is_up is always True, host5 is disabled, @@ -665,6 +744,7 @@ class HostManagerTestCase(test.TestCase): def test_get_pools(self, _mock_service_is_up, _mock_service_get_all): context = 'fake_context' + timestamp = datetime.utcnow() services = [ dict(id=1, host='host1', topic='volume', disabled=False, @@ -678,15 +758,15 @@ class HostManagerTestCase(test.TestCase): mocked_service_states = 
{ 'host1': dict(volume_backend_name='AAA', total_capacity_gb=512, free_capacity_gb=200, - timestamp=None, reserved_percentage=0, + timestamp=timestamp, reserved_percentage=0, provisioned_capacity_gb=312), 'host2@back1': dict(volume_backend_name='BBB', total_capacity_gb=256, free_capacity_gb=100, - timestamp=None, reserved_percentage=0, + timestamp=timestamp, reserved_percentage=0, provisioned_capacity_gb=156), 'host2@back2': dict(volume_backend_name='CCC', total_capacity_gb=10000, free_capacity_gb=700, - timestamp=None, reserved_percentage=0, + timestamp=timestamp, reserved_percentage=0, provisioned_capacity_gb=9300), } @@ -706,7 +786,7 @@ class HostManagerTestCase(test.TestCase): { 'name': 'host1#AAA', 'capabilities': { - 'timestamp': None, + 'timestamp': timestamp, 'volume_backend_name': 'AAA', 'free_capacity_gb': 200, 'driver_version': None, @@ -719,7 +799,7 @@ class HostManagerTestCase(test.TestCase): { 'name': 'host2@back1#BBB', 'capabilities': { - 'timestamp': None, + 'timestamp': timestamp, 'volume_backend_name': 'BBB', 'free_capacity_gb': 100, 'driver_version': None, @@ -732,7 +812,7 @@ class HostManagerTestCase(test.TestCase): { 'name': 'host2@back2#CCC', 'capabilities': { - 'timestamp': None, + 'timestamp': timestamp, 'volume_backend_name': 'CCC', 'free_capacity_gb': 700, 'driver_version': None, @@ -887,7 +967,7 @@ class HostStateTestCase(test.TestCase): """Test case for HostState class.""" def test_update_from_volume_capability_nopool(self): - fake_host = host_manager.HostState('host1') + fake_host = host_manager.HostState('host1', None) self.assertIsNone(fake_host.free_capacity_gb) volume_capability = {'total_capacity_gb': 1024, @@ -922,7 +1002,7 @@ class HostStateTestCase(test.TestCase): self.assertRaises(KeyError, lambda: fake_host.pools['pool0']) def test_update_from_volume_capability_with_pools(self): - fake_host = host_manager.HostState('host1') + fake_host = host_manager.HostState('host1', None) self.assertIsNone(fake_host.free_capacity_gb) capability = { 'volume_backend_name': 'Local iSCSI', @@ -1014,7 +1094,7 @@ class HostStateTestCase(test.TestCase): fake_host.pools['3rd pool'].provisioned_capacity_gb) def test_update_from_volume_infinite_capability(self): - fake_host = host_manager.HostState('host1') + fake_host = host_manager.HostState('host1', None) self.assertIsNone(fake_host.free_capacity_gb) volume_capability = {'total_capacity_gb': 'infinite', @@ -1035,7 +1115,7 @@ class HostStateTestCase(test.TestCase): fake_host.pools['_pool0'].free_capacity_gb) def test_update_from_volume_unknown_capability(self): - fake_host = host_manager.HostState('host1') + fake_host = host_manager.HostState('host1', None) self.assertIsNone(fake_host.free_capacity_gb) volume_capability = {'total_capacity_gb': 'infinite', @@ -1056,7 +1136,7 @@ class HostStateTestCase(test.TestCase): fake_host.pools['_pool0'].free_capacity_gb) def test_update_from_empty_volume_capability(self): - fake_host = host_manager.HostState('host1') + fake_host = host_manager.HostState('host1', None) vol_cap = {'timestamp': None} @@ -1076,7 +1156,7 @@ class PoolStateTestCase(test.TestCase): """Test case for HostState class.""" def test_update_from_volume_capability(self): - fake_pool = host_manager.PoolState('host1', None, 'pool0') + fake_pool = host_manager.PoolState('host1', None, None, 'pool0') self.assertIsNone(fake_pool.free_capacity_gb) volume_capability = {'total_capacity_gb': 1024, diff --git a/cinder/tests/unit/scheduler/test_rpcapi.py b/cinder/tests/unit/scheduler/test_rpcapi.py index 
d3f68f89ba4..3ffbe2b7b2a 100644 --- a/cinder/tests/unit/scheduler/test_rpcapi.py +++ b/cinder/tests/unit/scheduler/test_rpcapi.py @@ -17,6 +17,7 @@ Unit Tests for cinder.scheduler.rpcapi """ +import ddt import mock from cinder import context @@ -27,6 +28,7 @@ from cinder.tests.unit import fake_constants from cinder.tests.unit import fake_volume +@ddt.ddt class SchedulerRpcAPITestCase(test.TestCase): def setUp(self): @@ -75,14 +77,20 @@ class SchedulerRpcAPITestCase(test.TestCase): for kwarg, value in self.fake_kwargs.items(): self.assertEqual(expected_msg[kwarg], value) - def test_update_service_capabilities(self): + @ddt.data('3.0', '3.3') + @mock.patch('oslo_messaging.RPCClient.can_send_version') + def test_update_service_capabilities(self, version, can_send_version): + can_send_version.side_effect = lambda x: x == version self._test_scheduler_api('update_service_capabilities', rpc_method='cast', service_name='fake_name', host='fake_host', - capabilities='fake_capabilities', + cluster_name='cluster_name', + capabilities={}, fanout=True, - version='3.0') + version=version, + timestamp='123') + can_send_version.assert_called_once_with('3.3') def test_create_volume(self): volume = fake_volume.fake_volume_obj(self.context) @@ -135,17 +143,18 @@ class SchedulerRpcAPITestCase(test.TestCase): version='3.0') create_worker_mock.assert_called_once() - def test_migrate_volume_to_host(self): + @mock.patch('oslo_messaging.RPCClient.can_send_version') + def test_migrate_volume(self, can_send_version): volume = fake_volume.fake_volume_obj(self.context) create_worker_mock = self.mock_object(volume, 'create_worker') - self._test_scheduler_api('migrate_volume_to_host', + self._test_scheduler_api('migrate_volume', rpc_method='cast', - host='host', - force_host_copy=True, + backend='host', + force_copy=True, request_spec='fake_request_spec', filter_properties='filter_properties', volume=volume, - version='3.0') + version='3.3') create_worker_mock.assert_not_called() def test_retype(self): diff --git a/cinder/tests/unit/scheduler/test_scheduler.py b/cinder/tests/unit/scheduler/test_scheduler.py index b5902dfab91..9e4b6f8a1e1 100644 --- a/cinder/tests/unit/scheduler/test_scheduler.py +++ b/cinder/tests/unit/scheduler/test_scheduler.py @@ -103,7 +103,7 @@ class SchedulerManagerTestCase(test.TestCase): self.manager.update_service_capabilities(self.context, service_name=service, host=host) - _mock_update_cap.assert_called_once_with(service, host, {}) + _mock_update_cap.assert_called_once_with(service, host, {}, None, None) @mock.patch('cinder.scheduler.driver.Scheduler.' 
'update_service_capabilities') @@ -117,7 +117,8 @@ class SchedulerManagerTestCase(test.TestCase): service_name=service, host=host, capabilities=capabilities) - _mock_update_cap.assert_called_once_with(service, host, capabilities) + _mock_update_cap.assert_called_once_with(service, host, capabilities, + None, None) @mock.patch('cinder.scheduler.driver.Scheduler.schedule_create_volume') @mock.patch('cinder.message.api.API.create') @@ -164,6 +165,19 @@ class SchedulerManagerTestCase(test.TestCase): request_spec_obj, {}) self.assertFalse(_mock_sleep.called) + @mock.patch('cinder.scheduler.driver.Scheduler.schedule_create_volume') + @mock.patch('eventlet.sleep') + def test_create_volume_set_worker(self, _mock_sleep, _mock_sched_create): + """Make sure that the worker is created when creating a volume.""" + volume = tests_utils.create_volume(self.context, status='creating') + + request_spec = {'volume_id': volume.id} + + self.manager.create_volume(self.context, volume, + request_spec=request_spec, + filter_properties={}) + volume.set_worker.assert_called_once_with() + @mock.patch('cinder.scheduler.driver.Scheduler.schedule_create_volume') @mock.patch('cinder.scheduler.driver.Scheduler.is_ready') @mock.patch('eventlet.sleep') @@ -326,6 +340,13 @@ class SchedulerManagerTestCase(test.TestCase): self.manager.driver = original_driver + def test_do_cleanup(self): + vol = tests_utils.create_volume(self.context, status='creating') + self.manager._do_cleanup(self.context, vol) + + vol.refresh() + self.assertEqual('error', vol.status) + class SchedulerTestCase(test.TestCase): """Test case for base scheduler driver class.""" @@ -346,9 +367,9 @@ class SchedulerTestCase(test.TestCase): host = 'fake_host' capabilities = {'fake_capability': 'fake_value'} self.driver.update_service_capabilities(service_name, host, - capabilities) + capabilities, None) _mock_update_cap.assert_called_once_with(service_name, host, - capabilities) + capabilities, None) @mock.patch('cinder.scheduler.host_manager.HostManager.' 
'has_all_capabilities', return_value=False) @@ -387,8 +408,10 @@ class SchedulerDriverModuleTestCase(test.TestCase): volume = fake_volume.fake_volume_obj(self.context) _mock_volume_get.return_value = volume - driver.volume_update_db(self.context, volume.id, 'fake_host') + driver.volume_update_db(self.context, volume.id, 'fake_host', + 'fake_cluster') scheduled_at = volume.scheduled_at.replace(tzinfo=None) _mock_vol_update.assert_called_once_with( self.context, volume.id, {'host': 'fake_host', + 'cluster_name': 'fake_cluster', 'scheduled_at': scheduled_at}) diff --git a/cinder/tests/unit/test_db_api.py b/cinder/tests/unit/test_db_api.py index fd5b2ad7f07..1598fa12f11 100644 --- a/cinder/tests/unit/test_db_api.py +++ b/cinder/tests/unit/test_db_api.py @@ -167,6 +167,39 @@ class DBAPIServiceTestCase(BaseTest): real_service1 = db.service_get(self.ctxt, host='host1', topic='topic1') self._assertEqualObjects(service1, real_service1) + def test_service_get_all_disabled_by_cluster(self): + values = [ + # Enabled services + {'host': 'host1', 'binary': 'b1', 'disabled': False}, + {'host': 'host2', 'binary': 'b1', 'disabled': False, + 'cluster_name': 'enabled_cluster'}, + {'host': 'host3', 'binary': 'b1', 'disabled': True, + 'cluster_name': 'enabled_cluster'}, + + # Disabled services + {'host': 'host4', 'binary': 'b1', 'disabled': True}, + {'host': 'host5', 'binary': 'b1', 'disabled': False, + 'cluster_name': 'disabled_cluster'}, + {'host': 'host6', 'binary': 'b1', 'disabled': True, + 'cluster_name': 'disabled_cluster'}, + ] + + db.cluster_create(self.ctxt, {'name': 'enabled_cluster', + 'binary': 'b1', + 'disabled': False}), + db.cluster_create(self.ctxt, {'name': 'disabled_cluster', + 'binary': 'b1', + 'disabled': True}), + services = [self._create_service(vals) for vals in values] + + enabled = db.service_get_all(self.ctxt, disabled=False) + disabled = db.service_get_all(self.ctxt, disabled=True) + + self.assertSetEqual({s.host for s in services[:3]}, + {s.host for s in enabled}) + self.assertSetEqual({s.host for s in services[3:]}, + {s.host for s in disabled}) + def test_service_get_all(self): expired = (datetime.datetime.utcnow() - datetime.timedelta(seconds=CONF.service_down_time + 1)) diff --git a/cinder/tests/unit/test_volume.py b/cinder/tests/unit/test_volume.py index 6f949e1bfb8..5016fa271ae 100644 --- a/cinder/tests/unit/test_volume.py +++ b/cinder/tests/unit/test_volume.py @@ -344,20 +344,19 @@ class VolumeTestCase(base.BaseVolumeTestCase): def test_init_host_added_to_cluster(self, cg_include_mock, vol_include_mock, vol_get_all_mock, snap_get_all_mock): - self.mock_object(self.volume, 'cluster', mock.sentinel.cluster) + cluster = str(mock.sentinel.cluster) + self.mock_object(self.volume, 'cluster', cluster) self.volume.init_host(added_to_cluster=True, service_id=self.service_id) - vol_include_mock.assert_called_once_with(mock.ANY, - mock.sentinel.cluster, + vol_include_mock.assert_called_once_with(mock.ANY, cluster, host=self.volume.host) - cg_include_mock.assert_called_once_with(mock.ANY, - mock.sentinel.cluster, + cg_include_mock.assert_called_once_with(mock.ANY, cluster, host=self.volume.host) vol_get_all_mock.assert_called_once_with( - mock.ANY, filters={'cluster_name': mock.sentinel.cluster}) + mock.ANY, filters={'cluster_name': cluster}) snap_get_all_mock.assert_called_once_with( - mock.ANY, search_opts={'cluster_name': mock.sentinel.cluster}) + mock.ANY, search_opts={'cluster_name': cluster}) @mock.patch('cinder.objects.service.Service.get_minimum_rpc_version') 
@mock.patch('cinder.objects.service.Service.get_minimum_obj_version') @@ -4785,7 +4784,7 @@ class VolumeMigrationTestCase(base.BaseVolumeTestCase): self.assertEqual('newhost', volume.host) self.assertEqual('success', volume.migration_status) - def _fake_create_volume(self, ctxt, volume, host, req_spec, filters, + def _fake_create_volume(self, ctxt, volume, req_spec, filters, allow_reschedule=True): return db.volume_update(ctxt, volume['id'], {'status': self.expected_status}) @@ -4880,7 +4879,7 @@ class VolumeMigrationTestCase(base.BaseVolumeTestCase): nova_api, create_volume, save): def fake_create_volume(*args, **kwargs): - context, volume, host, request_spec, filter_properties = args + context, volume, request_spec, filter_properties = args fake_db = mock.Mock() task = create_volume_manager.ExtractVolumeSpecTask(fake_db) specs = task.execute(context, volume, {}) @@ -4916,7 +4915,7 @@ class VolumeMigrationTestCase(base.BaseVolumeTestCase): migrate_volume_completion, nova_api, create_volume, save): def fake_create_volume(*args, **kwargs): - context, volume, host, request_spec, filter_properties = args + context, volume, request_spec, filter_properties = args fake_db = mock.Mock() task = create_volume_manager.ExtractVolumeSpecTask(fake_db) specs = task.execute(context, volume, {}) diff --git a/cinder/tests/unit/test_volume_rpcapi.py b/cinder/tests/unit/test_volume_rpcapi.py index 7cb308b9a4c..01fc9ef332b 100644 --- a/cinder/tests/unit/test_volume_rpcapi.py +++ b/cinder/tests/unit/test_volume_rpcapi.py @@ -26,6 +26,7 @@ from cinder.common import constants from cinder import context from cinder import db from cinder import objects +from cinder.objects import base as ovo_base from cinder.objects import fields from cinder import test from cinder.tests.unit.backup import fake_backup @@ -158,8 +159,12 @@ class VolumeRpcAPITestCase(test.TestCase): if 'dest_host' in expected_msg: dest_host = expected_msg.pop('dest_host') dest_host_dict = {'host': dest_host.host, + 'cluster_name': dest_host.cluster_name, 'capabilities': dest_host.capabilities} expected_msg['host'] = dest_host_dict + if 'force_copy' in expected_msg: + expected_msg['force_host_copy'] = expected_msg.pop('force_copy') + if 'new_volume' in expected_msg: volume = expected_msg['new_volume'] expected_msg['new_volume_id'] = volume['id'] @@ -229,26 +234,11 @@ class VolumeRpcAPITestCase(test.TestCase): self.assertEqual(expected_arg, arg) for kwarg, value in self.fake_kwargs.items(): - if isinstance(value, objects.Snapshot): - expected_snapshot = expected_msg[kwarg].obj_to_primitive() - snapshot = value.obj_to_primitive() - self.assertEqual(expected_snapshot, snapshot) - elif isinstance(value, objects.ConsistencyGroup): - expected_cg = expected_msg[kwarg].obj_to_primitive() - cg = value.obj_to_primitive() - self.assertEqual(expected_cg, cg) - elif isinstance(value, objects.CGSnapshot): - expected_cgsnapshot = expected_msg[kwarg].obj_to_primitive() - cgsnapshot = value.obj_to_primitive() - self.assertEqual(expected_cgsnapshot, cgsnapshot) - elif isinstance(value, objects.Volume): - expected_volume = expected_msg[kwarg].obj_to_primitive() - volume = value.obj_to_primitive() - self.assertDictEqual(expected_volume, volume) - elif isinstance(value, objects.Backup): - expected_backup = expected_msg[kwarg].obj_to_primitive() - backup = value.obj_to_primitive() - self.assertEqual(expected_backup, backup) + if isinstance(value, ovo_base.CinderObject): + expected = expected_msg[kwarg].obj_to_primitive() + primitive = value.obj_to_primitive() + 
self.assertEqual(expected, primitive) + else: self.assertEqual(expected_msg[kwarg], value) @@ -328,8 +318,7 @@ class VolumeRpcAPITestCase(test.TestCase): def test_create_consistencygroup(self): self._test_volume_api('create_consistencygroup', rpc_method='cast', - group=self.fake_cg, host='fake_host1', - version='3.0') + group=self.fake_cg, version='3.0') def test_delete_consistencygroup(self): self._test_volume_api('delete_consistencygroup', rpc_method='cast', @@ -358,7 +347,6 @@ class VolumeRpcAPITestCase(test.TestCase): self._test_volume_api('create_volume', rpc_method='cast', volume=self.fake_volume_obj, - host='fake_host1', request_spec='fake_request_spec', filter_properties='fake_properties', allow_reschedule=True, @@ -540,7 +528,13 @@ class VolumeRpcAPITestCase(test.TestCase): '-8ffd-0800200c9a66', version='3.0') - def test_extend_volume(self): + def _change_cluster_name(self, resource, cluster_name): + resource.cluster_name = cluster_name + resource.obj_reset_changes() + + @ddt.data(None, 'mycluster') + def test_extend_volume(self, cluster_name): + self._change_cluster_name(self.fake_volume_obj, cluster_name) self._test_volume_api('extend_volume', rpc_method='cast', volume=self.fake_volume_obj, @@ -548,10 +542,12 @@ class VolumeRpcAPITestCase(test.TestCase): reservations=self.fake_reservations, version='3.0') - def test_migrate_volume(self): + @mock.patch('oslo_messaging.RPCClient.can_send_version', return_value=True) + def test_migrate_volume(self, can_send_version): class FakeHost(object): def __init__(self): self.host = 'host' + self.cluster_name = 'cluster_name' self.capabilities = {} dest_host = FakeHost() self._test_volume_api('migrate_volume', @@ -559,7 +555,7 @@ class VolumeRpcAPITestCase(test.TestCase): volume=self.fake_volume_obj, dest_host=dest_host, force_host_copy=True, - version='3.0') + version='3.5') def test_migrate_volume_completion(self): self._test_volume_api('migrate_volume_completion', @@ -569,10 +565,12 @@ class VolumeRpcAPITestCase(test.TestCase): error=False, version='3.0') - def test_retype(self): + @mock.patch('oslo_messaging.RPCClient.can_send_version', return_value=True) + def test_retype(self, can_send_version): class FakeHost(object): def __init__(self): self.host = 'host' + self.cluster_name = 'cluster_name' self.capabilities = {} dest_host = FakeHost() self._test_volume_api('retype', @@ -583,7 +581,7 @@ class VolumeRpcAPITestCase(test.TestCase): migration_policy='never', reservations=self.fake_reservations, old_reservations=self.fake_reservations, - version='3.0') + version='3.5') def test_manage_existing(self): self._test_volume_api('manage_existing', @@ -685,8 +683,7 @@ class VolumeRpcAPITestCase(test.TestCase): def test_create_group(self): self._test_group_api('create_group', rpc_method='cast', - group=self.fake_group, host='fake_host1', - version='3.0') + group=self.fake_group, version='3.0') def test_delete_group(self): self._test_group_api('delete_group', rpc_method='cast', diff --git a/cinder/volume/api.py b/cinder/volume/api.py index 82e88d02f3a..0c85334e6cc 100644 --- a/cinder/volume/api.py +++ b/cinder/volume/api.py @@ -1344,32 +1344,47 @@ class API(base.Base): resource=volume) @wrap_check_policy - def migrate_volume(self, context, volume, host, force_host_copy, + def migrate_volume(self, context, volume, host, cluster_name, force_copy, lock_volume): - """Migrate the volume to the specified host.""" - # Make sure the host is in the list of available hosts + """Migrate the volume to the specified host or cluster.""" elevated = 
context.elevated() - topic = constants.VOLUME_TOPIC - services = objects.ServiceList.get_all_by_topic( - elevated, topic, disabled=False) - found = False - svc_host = volume_utils.extract_host(host, 'backend') - for service in services: - if service.is_up and service.host == svc_host: - found = True - break - if not found: - msg = _('No available service named %s') % host + + # If we received a request to migrate to a host, + # look for the service - it must be up and enabled + svc_host = host and volume_utils.extract_host(host, 'backend') + svc_cluster = cluster_name and volume_utils.extract_host(cluster_name, + 'backend') + # NOTE(geguileo): Only svc_host or svc_cluster is set, so when we get + # a service from the DB we are getting either one specific service from + # a host or any service from a cluster that is up, which means that the + # cluster itself is also up. + try: + svc = objects.Service.get_by_id(elevated, None, is_up=True, + topic=constants.VOLUME_TOPIC, + host=svc_host, disabled=False, + cluster_name=svc_cluster, + backend_match_level='pool') + except exception.ServiceNotFound: + msg = _('No available service named %s') % (cluster_name or host) LOG.error(msg) raise exception.InvalidHost(reason=msg) + # Even if we were requested to do a migration to a host, if the host is + # in a cluster we will do a cluster migration. + cluster_name = svc.cluster_name # Build required conditions for conditional update expected = {'status': ('available', 'in-use'), 'migration_status': self.AVAILABLE_MIGRATION_STATUS, 'replication_status': (None, 'disabled'), 'consistencygroup_id': (None, ''), - 'group_id': (None, ''), - 'host': db.Not(host)} + 'group_id': (None, '')} + + # We want to make sure that the migration is to another host or + # another cluster. + if cluster_name: + expected['cluster_name'] = db.Not(cluster_name) + else: + expected['host'] = db.Not(host) filters = [~db.volume_has_snapshots_filter()] @@ -1392,8 +1407,8 @@ class API(base.Base): if not result: msg = _('Volume %s status must be available or in-use, must not ' 'be migrating, have snapshots, be replicated, be part of ' - 'a group and destination host must be different than the ' - 'current host') % {'vol_id': volume.id} + 'a group and destination host/cluster must be different ' + 'than the current one') % {'vol_id': volume.id} LOG.error(msg) raise exception.InvalidVolume(reason=msg) @@ -1406,11 +1421,11 @@ class API(base.Base): request_spec = {'volume_properties': volume, 'volume_type': volume_type, 'volume_id': volume.id} - self.scheduler_rpcapi.migrate_volume_to_host(context, - volume, - host, - force_host_copy, - request_spec) + self.scheduler_rpcapi.migrate_volume(context, + volume, + cluster_name or host, + force_copy, + request_spec) LOG.info(_LI("Migrate volume request issued successfully."), resource=volume) @@ -1556,19 +1571,31 @@ class API(base.Base): - def _get_service_by_host(self, context, host, resource='volume'): + def _get_service_by_host_cluster(self, context, host, cluster_name, + resource='volume'): elevated = context.elevated() + + svc_cluster = cluster_name and volume_utils.extract_host(cluster_name, + 'backend') + svc_host = host and volume_utils.extract_host(host, 'backend') + + # NOTE(geguileo): Only svc_host or svc_cluster is set, so when we get + # a service from the DB we are getting either one specific service from + # a host or any service that is up from a cluster, which means that the + # cluster itself is also up. 
try: - svc_host = volume_utils.extract_host(host, 'backend') - service = objects.Service.get_by_args( - elevated, svc_host, 'cinder-volume') + service = objects.Service.get_by_id(elevated, None, host=svc_host, + binary='cinder-volume', + cluster_name=svc_cluster) except exception.ServiceNotFound: with excutils.save_and_reraise_exception(): LOG.error(_LE('Unable to find service: %(service)s for ' - 'given host: %(host)s.'), - {'service': constants.VOLUME_BINARY, 'host': host}) + 'given host: %(host)s and cluster %(cluster)s.'), + {'service': constants.VOLUME_BINARY, 'host': host, + 'cluster': cluster_name}) - if service.disabled: + if service.disabled and (not service.cluster_name or + service.cluster.disabled): LOG.error(_LE('Unable to manage existing %s on a disabled ' 'service.'), resource) raise exception.ServiceUnavailable() @@ -1580,15 +1607,16 @@ class API(base.Base): return service - def manage_existing(self, context, host, ref, name=None, description=None, - volume_type=None, metadata=None, + def manage_existing(self, context, host, cluster_name, ref, name=None, + description=None, volume_type=None, metadata=None, availability_zone=None, bootable=False): if volume_type and 'extra_specs' not in volume_type: extra_specs = volume_types.get_volume_type_extra_specs( volume_type['id']) volume_type['extra_specs'] = extra_specs - service = self._get_service_by_host(context, host) + service = self._get_service_by_host_cluster(context, host, + cluster_name) if availability_zone is None: availability_zone = service.availability_zone @@ -1597,7 +1625,8 @@ class API(base.Base): 'context': context, 'name': name, 'description': description, - 'host': host, + 'host': service.host, + 'cluster_name': service.cluster_name, 'ref': ref, 'volume_type': volume_type, 'metadata': metadata, @@ -1626,7 +1655,7 @@ class API(base.Base): def get_manageable_volumes(self, context, host, marker=None, limit=None, offset=None, sort_keys=None, sort_dirs=None): - self._get_service_by_host(context, host) + self._get_service_by_host_cluster(context, host, None) return self.volume_rpcapi.get_manageable_volumes(context, host, marker, limit, offset, sort_keys, @@ -1635,18 +1664,21 @@ class API(base.Base): def manage_existing_snapshot(self, context, ref, volume, name=None, description=None, metadata=None): - service = self._get_service_by_host(context, volume.host, 'snapshot') + service = self._get_service_by_host_cluster(context, volume.host, + volume.cluster_name, + 'snapshot') + snapshot_object = self.create_snapshot_in_db(context, volume, name, description, True, metadata, None, commit_quota=False) - self.volume_rpcapi.manage_existing_snapshot(context, snapshot_object, - ref, service.host) + self.volume_rpcapi.manage_existing_snapshot( + context, snapshot_object, ref, service.service_topic_queue) return snapshot_object def get_manageable_snapshots(self, context, host, marker=None, limit=None, offset=None, sort_keys=None, sort_dirs=None): - self._get_service_by_host(context, host, resource='snapshot') + self._get_service_by_host_cluster(context, host, None, 'snapshot') return self.volume_rpcapi.get_manageable_snapshots(context, host, marker, limit, offset, sort_keys, diff --git a/cinder/volume/driver.py b/cinder/volume/driver.py index a7379e4be31..b154d6a4cf7 100644 --- a/cinder/volume/driver.py +++ b/cinder/volume/driver.py @@ -339,6 +339,7 @@ class BaseVD(object): # NOTE(vish): db is set by Manager self.db = kwargs.get('db') self.host = kwargs.get('host') + self.cluster_name = kwargs.get('cluster_name') 
self.configuration = kwargs.get('configuration', None) if self.configuration: diff --git a/cinder/volume/flows/api/create_volume.py b/cinder/volume/flows/api/create_volume.py index c4f14eac3d0..f21255d84d6 100644 --- a/cinder/volume/flows/api/create_volume.py +++ b/cinder/volume/flows/api/create_volume.py @@ -703,13 +703,13 @@ class VolumeCastTask(flow_utils.CinderTask): self.db = db def _cast_create_volume(self, context, request_spec, filter_properties): - source_volid = request_spec['source_volid'] - source_replicaid = request_spec['source_replicaid'] + source_volume_ref = None + source_volid = (request_spec['source_volid'] or + request_spec['source_replicaid']) volume = request_spec['volume'] snapshot_id = request_spec['snapshot_id'] image_id = request_spec['image_id'] cgroup_id = request_spec['consistencygroup_id'] - host = None cgsnapshot_id = request_spec['cgsnapshot_id'] group_id = request_spec['group_id'] if cgroup_id: @@ -734,19 +734,11 @@ class VolumeCastTask(flow_utils.CinderTask): # snapshot resides instead of passing it through the scheduler, so # snapshot can be copied to the new volume. snapshot = objects.Snapshot.get_by_id(context, snapshot_id) - source_volume_ref = objects.Volume.get_by_id(context, - snapshot.volume_id) - host = source_volume_ref.host + source_volume_ref = snapshot.volume elif source_volid: - source_volume_ref = objects.Volume.get_by_id(context, - source_volid) - host = source_volume_ref.host - elif source_replicaid: - source_volume_ref = objects.Volume.get_by_id(context, - source_replicaid) - host = source_volume_ref.host + source_volume_ref = objects.Volume.get_by_id(context, source_volid) - if not host: + if not source_volume_ref: # Cast to the scheduler and let it handle whatever is needed # to select the target host for this volume. self.scheduler_rpcapi.create_volume( @@ -759,14 +751,14 @@ class VolumeCastTask(flow_utils.CinderTask): else: # Bypass the scheduler and send the request directly to the volume # manager. 
- volume.host = host + volume.host = source_volume_ref.host + volume.cluster_name = source_volume_ref.cluster_name volume.scheduled_at = timeutils.utcnow() volume.save() if not cgsnapshot_id: self.volume_rpcapi.create_volume( context, volume, - volume.host, request_spec, filter_properties, allow_reschedule=False) diff --git a/cinder/volume/flows/api/manage_existing.py b/cinder/volume/flows/api/manage_existing.py index a683d17261c..ca23c006e09 100644 --- a/cinder/volume/flows/api/manage_existing.py +++ b/cinder/volume/flows/api/manage_existing.py @@ -37,7 +37,8 @@ class EntryCreateTask(flow_utils.CinderTask): def __init__(self, db): requires = ['availability_zone', 'description', 'metadata', - 'name', 'host', 'bootable', 'volume_type', 'ref'] + 'name', 'host', 'cluster_name', 'bootable', 'volume_type', + 'ref'] super(EntryCreateTask, self).__init__(addons=[ACTION], requires=requires) self.db = db @@ -62,6 +63,7 @@ class EntryCreateTask(flow_utils.CinderTask): 'display_description': kwargs.pop('description'), 'display_name': kwargs.pop('name'), 'host': kwargs.pop('host'), + 'cluster_name': kwargs.pop('cluster_name'), 'availability_zone': kwargs.pop('availability_zone'), 'volume_type_id': volume_type_id, 'metadata': kwargs.pop('metadata') or {}, diff --git a/cinder/volume/manager.py b/cinder/volume/manager.py index f70c03f73b9..45e5e05c215 100644 --- a/cinder/volume/manager.py +++ b/cinder/volume/manager.py @@ -222,6 +222,7 @@ class VolumeManager(manager.CleanableManager, configuration=self.configuration, db=self.db, host=self.host, + cluster_name=self.cluster, is_vol_db_empty=vol_db_empty, active_backend_id=curr_active_backend_id) @@ -548,6 +549,12 @@ class VolumeManager(manager.CleanableManager, """ return self.driver.initialized + def _set_resource_host(self, resource): + """Set the host field on the DB to our own when we are clustered.""" + if resource.is_clustered and resource.host != self.host: + resource.host = self.host + resource.save() + @objects.Volume.set_workers def create_volume(self, context, volume, request_spec=None, filter_properties=None, allow_reschedule=True): @@ -555,6 +562,9 @@ class VolumeManager(manager.CleanableManager, # Log about unsupported drivers utils.log_unsupported_driver_warning(self.driver) + # Make sure the host in the DB matches our own when clustered + self._set_resource_host(volume) + context_elevated = context.elevated() if filter_properties is None: filter_properties = {} @@ -1683,12 +1693,13 @@ class VolumeManager(manager.CleanableManager, remote=src_remote, attach_encryptor=attach_encryptor) - def _migrate_volume_generic(self, ctxt, volume, host, new_type_id): + def _migrate_volume_generic(self, ctxt, volume, backend, new_type_id): rpcapi = volume_rpcapi.VolumeAPI() # Create new volume on remote host tmp_skip = {'snapshot_id', 'source_volid'} - skip = self._VOLUME_CLONE_SKIP_PROPERTIES | tmp_skip | {'host'} + skip = self._VOLUME_CLONE_SKIP_PROPERTIES | tmp_skip | {'host', + 'cluster_name'} new_vol_values = {k: volume[k] for k in set(volume.keys()) - skip} if new_type_id: new_vol_values['volume_type_id'] = new_type_id @@ -1700,15 +1711,16 @@ class VolumeManager(manager.CleanableManager, new_volume = objects.Volume( context=ctxt, - host=host['host'], + host=backend['host'], + cluster_name=backend.get('cluster_name'), status='creating', attach_status=fields.VolumeAttachStatus.DETACHED, migration_status='target:%s' % volume['id'], **new_vol_values ) new_volume.create() - rpcapi.create_volume(ctxt, new_volume, host['host'], - None, None, 
allow_reschedule=False) + rpcapi.create_volume(ctxt, new_volume, None, None, + allow_reschedule=False) # Wait for new_volume to become ready starttime = time.time() @@ -1720,13 +1732,13 @@ tries += 1 now = time.time() if new_volume.status == 'error': - msg = _("failed to create new_volume on destination host") + msg = _("failed to create new_volume on destination") self._clean_temporary_volume(ctxt, volume, new_volume, clean_db_only=True) raise exception.VolumeMigrationFailed(reason=msg) elif now > deadline: - msg = _("timeout creating new_volume on destination host") + msg = _("timeout creating new_volume on destination") self._clean_temporary_volume(ctxt, volume, new_volume, clean_db_only=True) @@ -1931,6 +1943,7 @@ class VolumeManager(manager.CleanableManager, host) if moved: updates = {'host': host['host'], + 'cluster_name': host.get('cluster_name'), 'migration_status': 'success', 'previous_status': volume.status} if status_update: @@ -1948,8 +1961,7 @@ volume.save() if not moved: try: - self._migrate_volume_generic(ctxt, volume, host, - new_type_id) + self._migrate_volume_generic(ctxt, volume, host, new_type_id) except Exception: with excutils.save_and_reraise_exception(): updates = {'migration_status': 'error'} @@ -2238,17 +2250,22 @@ class VolumeManager(manager.CleanableManager, # Call driver to try and change the type retype_model_update = None - # NOTE(jdg): Check to see if the destination host is the same - # as the current. If it's not don't call the driver.retype - # method, otherwise drivers that implement retype may report - # success, but it's invalid in the case of a migrate. + # NOTE(jdg): Check to see if the destination host or cluster (depending + # on whether the volume is in a clustered backend or not) is the same as + # the current. If it's not don't call the driver.retype method, + # otherwise drivers that implement retype may report success, but it's + # invalid in the case of a migrate. # We assume that those that support pools do this internally # so we strip off the pools designation + if (not retyped and not diff.get('encryption') and - vol_utils.hosts_are_equivalent(self.driver.host, - host['host'])): + ((not host.get('cluster_name') and + vol_utils.hosts_are_equivalent(self.driver.host, + host['host'])) or + (vol_utils.hosts_are_equivalent(self.driver.cluster_name, + host.get('cluster_name'))))): try: new_type = volume_types.get_volume_type(context, new_type_id) ret = self.driver.retype(context, @@ -2311,6 +2328,7 @@ class VolumeManager(manager.CleanableManager, else: model_update = {'volume_type_id': new_type_id, 'host': host['host'], + 'cluster_name': host.get('cluster_name'), 'status': status_update['status']} if retype_model_update: model_update.update(retype_model_update) @@ -2407,6 +2425,9 @@ class VolumeManager(manager.CleanableManager, def _create_group(self, context, group, is_generic_group=True): context = context.elevated() + # Make sure the host in the DB matches our own when clustered + self._set_resource_host(group) + status = fields.GroupStatus.AVAILABLE model_update = None diff --git a/cinder/volume/rpcapi.py b/cinder/volume/rpcapi.py index a1498fa228d..368660db368 100644 --- a/cinder/volume/rpcapi.py +++ b/cinder/volume/rpcapi.py @@ -115,9 +115,10 @@ get_backup_device(). 3.3 - Adds support for sending objects over RPC in attach_volume(). 3.4 - Adds support for sending objects over RPC in detach_volume(). 
+ 3.5 - Adds support for cluster in retype and migrate_volume """ - RPC_API_VERSION = '3.4' + RPC_API_VERSION = '3.5' RPC_DEFAULT_VERSION = '3.0' TOPIC = constants.VOLUME_TOPIC BINARY = 'cinder-volume' @@ -125,10 +126,10 @@ class VolumeAPI(rpc.RPCAPI): def _get_cctxt(self, host=None, version=None, **kwargs): if host is not None: kwargs['server'] = utils.get_volume_rpc_host(host) - return super(VolumeAPI, self)._get_cctxt(version, **kwargs) + return super(VolumeAPI, self)._get_cctxt(version=version, **kwargs) - def create_consistencygroup(self, ctxt, group, host): - cctxt = self._get_cctxt(host) + def create_consistencygroup(self, ctxt, group): + cctxt = self._get_cctxt(group.service_topic_queue) cctxt.cast(ctxt, 'create_consistencygroup', group=group) def delete_consistencygroup(self, ctxt, group): @@ -145,7 +146,7 @@ class VolumeAPI(rpc.RPCAPI): def create_consistencygroup_from_src(self, ctxt, group, cgsnapshot=None, source_cg=None): - cctxt = self._get_cctxt(group.host) + cctxt = self._get_cctxt(group.service_topic_queue) cctxt.cast(ctxt, 'create_consistencygroup_from_src', group=group, cgsnapshot=cgsnapshot, @@ -159,9 +160,9 @@ class VolumeAPI(rpc.RPCAPI): cctxt = self._get_cctxt(cgsnapshot.service_topic_queue) cctxt.cast(ctxt, 'delete_cgsnapshot', cgsnapshot=cgsnapshot) - def create_volume(self, ctxt, volume, host, request_spec, - filter_properties, allow_reschedule=True): - cctxt = self._get_cctxt(host) + def create_volume(self, ctxt, volume, request_spec, filter_properties, + allow_reschedule=True): + cctxt = self._get_cctxt(volume.service_topic_queue) cctxt.cast(ctxt, 'create_volume', request_spec=request_spec, filter_properties=filter_properties, @@ -239,35 +240,48 @@ class VolumeAPI(rpc.RPCAPI): new_user=new_user, new_project=new_project) def extend_volume(self, ctxt, volume, new_size, reservations): - cctxt = self._get_cctxt(volume.host) + cctxt = self._get_cctxt(volume.service_topic_queue) cctxt.cast(ctxt, 'extend_volume', volume=volume, new_size=new_size, reservations=reservations) def migrate_volume(self, ctxt, volume, dest_host, force_host_copy): - host_p = {'host': dest_host.host, - 'capabilities': dest_host.capabilities} - cctxt = self._get_cctxt(volume.host) - cctxt.cast(ctxt, 'migrate_volume', volume=volume, host=host_p, + backend_p = {'host': dest_host.host, + 'cluster_name': dest_host.cluster_name, + 'capabilities': dest_host.capabilities} + + version = '3.5' + if not self.client.can_send_version(version): + version = '3.0' + del backend_p['cluster_name'] + + cctxt = self._get_cctxt(volume.service_topic_queue, version) + cctxt.cast(ctxt, 'migrate_volume', volume=volume, host=backend_p, force_host_copy=force_host_copy) def migrate_volume_completion(self, ctxt, volume, new_volume, error): - cctxt = self._get_cctxt(volume.host) + cctxt = self._get_cctxt(volume.service_topic_queue) return cctxt.call(ctxt, 'migrate_volume_completion', volume=volume, new_volume=new_volume, error=error,) def retype(self, ctxt, volume, new_type_id, dest_host, migration_policy='never', reservations=None, old_reservations=None): - host_p = {'host': dest_host.host, - 'capabilities': dest_host.capabilities} - cctxt = self._get_cctxt(volume.host) + backend_p = {'host': dest_host.host, + 'cluster_name': dest_host.cluster_name, + 'capabilities': dest_host.capabilities} + version = '3.5' + if not self.client.can_send_version(version): + version = '3.0' + del backend_p['cluster_name'] + + cctxt = self._get_cctxt(volume.service_topic_queue, version) cctxt.cast(ctxt, 'retype', volume=volume, 
new_type_id=new_type_id, - host=host_p, migration_policy=migration_policy, + host=backend_p, migration_policy=migration_policy, reservations=reservations, old_reservations=old_reservations) def manage_existing(self, ctxt, volume, ref): - cctxt = self._get_cctxt(volume.host) + cctxt = self._get_cctxt(volume.service_topic_queue) cctxt.cast(ctxt, 'manage_existing', ref=ref, volume=volume) def update_migrated_volume(self, ctxt, volume, new_volume, @@ -334,8 +348,8 @@ class VolumeAPI(rpc.RPCAPI): limit=limit, offset=offset, sort_keys=sort_keys, sort_dirs=sort_dirs) - def create_group(self, ctxt, group, host): - cctxt = self._get_cctxt(host) + def create_group(self, ctxt, group): + cctxt = self._get_cctxt(group.service_topic_queue) cctxt.cast(ctxt, 'create_group', group=group) def delete_group(self, ctxt, group): @@ -349,7 +363,7 @@ class VolumeAPI(rpc.RPCAPI): def create_group_from_src(self, ctxt, group, group_snapshot=None, source_group=None): - cctxt = self._get_cctxt(group.host) + cctxt = self._get_cctxt(group.service_topic_queue) cctxt.cast(ctxt, 'create_group_from_src', group=group, group_snapshot=group_snapshot, source_group=source_group) diff --git a/cinder/volume/utils.py b/cinder/volume/utils.py index 4f43a75f1a2..d87502079a8 100644 --- a/cinder/volume/utils.py +++ b/cinder/volume/utils.py @@ -752,6 +752,9 @@ def matching_backend_name(src_volume_type, volume_type): def hosts_are_equivalent(host_1, host_2): + # In case host_1 or host_2 are None + if not (host_1 and host_2): + return host_1 == host_2 return extract_host(host_1) == extract_host(host_2)