From d2ec5787253349e89302bd569c4d36d21fb989c3 Mon Sep 17 00:00:00 2001
From: Gorka Eguileor
Date: Tue, 22 Mar 2016 16:43:37 +0100
Subject: [PATCH] Make c-vol use workers table for cleanup

To be able to support multiple hosts working with the same resources, we
have added the workers table to keep track of which host is working with
each specific resource.

This patch makes the c-vol service work with this new table by adding
entries for cleanable operations and removing them once those operations
have completed.

Service cleanup on initialization has also been changed to use this new
table, so each host will clean up only resources from operations it left
unfinished and will leave alone any operations that are being processed
by other hosts.

Specs: https://review.openstack.org/236977

Implements: blueprint cinder-volume-active-active-support
Change-Id: I4e5440b8450558add372214fd1a0373ab4ad2434
---
 cinder/api/contrib/admin_actions.py                |  10 +
 cinder/db/api.py                                   |  15 +-
 cinder/db/sqlalchemy/api.py                        |  83 +++++-
 .../versions/085_modify_workers_updated_at.py      |  55 ++++
 cinder/manager.py                                  | 108 +++++++-
 cinder/objects/base.py                             |   1 +
 cinder/objects/cleanable.py                        |   7 +-
 cinder/objects/snapshot.py                         |  18 +-
 cinder/objects/volume.py                           |  26 +-
 cinder/scheduler/rpcapi.py                         |   1 +
 cinder/service.py                                  |   6 +-
 cinder/test.py                                     |   7 +
 cinder/tests/unit/consistencygroup/test_cg.py      |   4 +-
 cinder/tests/unit/objects/__init__.py              |   4 +-
 cinder/tests/unit/objects/test_cleanable.py        |   1 +
 cinder/tests/unit/objects/test_objects.py          |   4 +-
 cinder/tests/unit/scheduler/test_rpcapi.py         |  41 ++-
 cinder/tests/unit/test_cleanable_manager.py        | 227 +++++++++++++++
 cinder/tests/unit/test_db_api.py                   |  34 +++
 cinder/tests/unit/test_db_worker_api.py            |  52 +++-
 cinder/tests/unit/test_service.py                  |   3 +-
 cinder/tests/unit/test_volume.py                   | 259 +++---------------
 cinder/tests/unit/test_volume_cleanup.py           | 177 ++++++++++++
 cinder/tests/unit/volume/__init__.py               | 150 ++++++++++
 .../tests/unit/volume/test_manage_volume.py        |   9 +-
 cinder/volume/manager.py                           | 106 +++----
 cinder/volume/rpcapi.py                            |   2 +
 27 files changed, 1087 insertions(+), 323 deletions(-)
 create mode 100644 cinder/db/sqlalchemy/migrate_repo/versions/085_modify_workers_updated_at.py
 create mode 100644 cinder/tests/unit/test_cleanable_manager.py
 create mode 100644 cinder/tests/unit/test_volume_cleanup.py

diff --git a/cinder/api/contrib/admin_actions.py b/cinder/api/contrib/admin_actions.py
index ed5fe998d98..2fb3b36e5ba 100644
--- a/cinder/api/contrib/admin_actions.py
+++ b/cinder/api/contrib/admin_actions.py
@@ -80,6 +80,15 @@ class AdminController(wsgi.Controller):
         action = '%s_admin_actions:%s' % (self.resource_name, action_name)
         extensions.extension_authorizer('volume', action)(context)
 
+    def _remove_worker(self, context, id):
+        # Remove the cleanup worker from the DB when we change a resource
+        # status, since the status change renders the entry useless.
+ res = db.worker_destroy(context, resource_type=self.collection.title(), + resource_id=id) + if res: + LOG.debug('Worker entry for %s with id %s has been deleted.', + self.collection, id) + @wsgi.action('os-reset_status') def _reset_status(self, req, id, body): """Reset status on the resource.""" @@ -106,6 +115,7 @@ class AdminController(wsgi.Controller): # Not found exception will be handled at the wsgi level self._update(context, id, update) + self._remove_worker(context, id) if update.get('attach_status') == 'detached': _clean_volume_attachment(context, id) diff --git a/cinder/db/api.py b/cinder/db/api.py index 30dd92bca0a..a674b9c11ed 100644 --- a/cinder/db/api.py +++ b/cinder/db/api.py @@ -251,8 +251,8 @@ def volume_get(context, volume_id): return IMPL.volume_get(context, volume_id) -def volume_get_all(context, marker, limit, sort_keys=None, sort_dirs=None, - filters=None, offset=None): +def volume_get_all(context, marker=None, limit=None, sort_keys=None, + sort_dirs=None, filters=None, offset=None): """Get all volumes.""" return IMPL.volume_get_all(context, marker, limit, sort_keys=sort_keys, sort_dirs=sort_dirs, filters=filters, @@ -1584,6 +1584,17 @@ def message_destroy(context, message_id): ################### +def workers_init(): + """Check if DB supports subsecond resolution and set global flag. + + MySQL 5.5 doesn't support subsecond resolution in datetime fields, so we + have to take it into account when working with the worker's table. + + Once we drop support for MySQL 5.5 we can remove this method. + """ + return IMPL.workers_init() + + def worker_create(context, **values): """Create a worker entry from optional arguments.""" return IMPL.worker_create(context, **values) diff --git a/cinder/db/sqlalchemy/api.py b/cinder/db/sqlalchemy/api.py index 02709d26abb..58306358144 100644 --- a/cinder/db/sqlalchemy/api.py +++ b/cinder/db/sqlalchemy/api.py @@ -1751,8 +1751,8 @@ def volume_get(context, volume_id): @require_admin_context -def volume_get_all(context, marker, limit, sort_keys=None, sort_dirs=None, - filters=None, offset=None): +def volume_get_all(context, marker=None, limit=None, sort_keys=None, + sort_dirs=None, filters=None, offset=None): """Retrieves all volumes. If no sort parameters are specified then the returned volumes are sorted @@ -1998,6 +1998,15 @@ def _process_volume_filters(query, filters): LOG.debug("'migration_status' column could not be found.") return None + host = filters.pop('host', None) + if host: + query = query.filter(_filter_host(models.Volume.host, host)) + + cluster_name = filters.pop('cluster_name', None) + if cluster_name: + query = query.filter(_filter_host(models.Volume.cluster_name, + cluster_name)) + # Apply exact match filters for everything else, ensure that the # filter value exists on the model for key in filters.keys(): @@ -2608,7 +2617,8 @@ def snapshot_get_all(context, filters=None, marker=None, limit=None, order. :param context: context to query under - :param filters: dictionary of filters; will do exact matching on values + :param filters: dictionary of filters; will do exact matching on values. + Special keys host and cluster_name refer to the volume. 
:param marker: the last item of the previous page, used to determine the next page of results to return :param limit: maximum number of items to return @@ -2618,7 +2628,9 @@ def snapshot_get_all(context, filters=None, marker=None, limit=None, paired with corresponding item in sort_keys :returns: list of matching snapshots """ - if filters and not is_valid_model_filters(models.Snapshot, filters): + if filters and not is_valid_model_filters(models.Snapshot, filters, + exclude_list=('host', + 'cluster_name')): return [] session = get_session() @@ -2642,8 +2654,19 @@ def _snaps_get_query(context, session=None, project_only=False): def _process_snaps_filters(query, filters): if filters: # Ensure that filters' keys exist on the model - if not is_valid_model_filters(models.Snapshot, filters): + if not is_valid_model_filters(models.Snapshot, filters, + exclude_list=('host', 'cluster_name')): return None + filters = filters.copy() + host = filters.pop('host', None) + cluster = filters.pop('cluster_name', None) + if host or cluster: + query = query.join(models.Snapshot.volume) + vol_field = models.Volume + if host: + query = query.filter(_filter_host(vol_field.host, host)) + if cluster: + query = query.filter(_filter_host(vol_field.cluster_name, cluster)) query = query.filter_by(**filters) return query @@ -5481,13 +5504,15 @@ def cgsnapshot_get(context, cgsnapshot_id): return _cgsnapshot_get(context, cgsnapshot_id) -def is_valid_model_filters(model, filters): +def is_valid_model_filters(model, filters, exclude_list=None): """Return True if filter values exist on the model :param model: a Cinder model :param filters: dictionary of filters """ for key in filters.keys(): + if exclude_list and key in exclude_list: + continue try: getattr(model, key) except AttributeError: @@ -6055,7 +6080,7 @@ def image_volume_cache_get_all_for_host(context, host): def _worker_query(context, session=None, until=None, db_filters=None, - **filters): + ignore_sentinel=True, **filters): # Remove all filters based on the workers table that are set to None filters = _clean_filters(filters) @@ -6064,6 +6089,11 @@ def _worker_query(context, session=None, until=None, db_filters=None, query = model_query(context, models.Worker, session=session) + # TODO(geguileo): Once we remove support for MySQL 5.5 we can remove this + if ignore_sentinel: + # We don't want to retrieve the workers sentinel + query = query.filter(models.Worker.resource_type != 'SENTINEL') + if until: db_filters = list(db_filters) if db_filters else [] # Since we set updated_at at creation time we don't need to check @@ -6079,8 +6109,41 @@ def _worker_query(context, session=None, until=None, db_filters=None, return query +DB_SUPPORTS_SUBSECOND_RESOLUTION = True + + +def workers_init(): + """Check if DB supports subsecond resolution and set global flag. + + MySQL 5.5 doesn't support subsecond resolution in datetime fields, so we + have to take it into account when working with the worker's table. + + To do this we'll have 1 row in the DB, created by the migration script, + where we have tried to set the microseconds and we'll check it. + + Once we drop support for MySQL 5.5 we can remove this method. 
+    """
+    global DB_SUPPORTS_SUBSECOND_RESOLUTION
+    session = get_session()
+    query = session.query(models.Worker).filter_by(resource_type='SENTINEL')
+    worker = query.first()
+    DB_SUPPORTS_SUBSECOND_RESOLUTION = bool(worker.updated_at.microsecond)
+
+
+def _worker_set_updated_at_field(values):
+    # TODO(geguileo): Once we drop support for MySQL 5.5 we can simplify this
+    # method.
+    updated_at = values.get('updated_at', timeutils.utcnow())
+    if isinstance(updated_at, six.string_types):
+        return
+    if not DB_SUPPORTS_SUBSECOND_RESOLUTION:
+        updated_at = updated_at.replace(microsecond=0)
+    values['updated_at'] = updated_at
+
+
 def worker_create(context, **values):
     """Create a worker entry from optional arguments."""
+    _worker_set_updated_at_field(values)
     worker = models.Worker(**values)
     session = get_session()
     try:
@@ -6118,6 +6181,11 @@ def worker_update(context, id, filters=None, orm_worker=None, **values):
     """Update a worker with given values."""
     filters = filters or {}
     query = _worker_query(context, id=id, **filters)
+
+    # If we want to update the orm_worker and the updated_at field was not
+    # set, we set it here instead of letting SQLAlchemy do it, so that we can
+    # also update it in the orm_worker instance.
+    _worker_set_updated_at_field(values)
     result = query.update(values)
     if not result:
         raise exception.WorkerNotFound(id=id, **filters)
@@ -6131,6 +6199,7 @@ def worker_claim_for_cleanup(context, claimer_id, orm_worker):
     # service_id is the same in the DB, thus flagging the claim.
     values = {'service_id': claimer_id,
               'updated_at': timeutils.utcnow()}
+    _worker_set_updated_at_field(values)
 
     # We only update the worker entry if it hasn't been claimed by other host
     # or thread
diff --git a/cinder/db/sqlalchemy/migrate_repo/versions/085_modify_workers_updated_at.py b/cinder/db/sqlalchemy/migrate_repo/versions/085_modify_workers_updated_at.py
new file mode 100644
index 00000000000..0b02b986d0e
--- /dev/null
+++ b/cinder/db/sqlalchemy/migrate_repo/versions/085_modify_workers_updated_at.py
@@ -0,0 +1,55 @@
+# Copyright (c) 2016 Red Hat, Inc.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from oslo_utils import timeutils
+from sqlalchemy.dialects import mysql
+from sqlalchemy import MetaData, Table
+
+
+def upgrade(migrate_engine):
+    """Add microsecond precision to the updated_at field in MySQL databases.
+
+    PostgreSQL, SQLite, and MSSQL have sub-second precision by default, but
+    MySQL defaults to second precision in DateTime fields, which creates
+    problems for the resource cleanup mechanism.
+    """
+    meta = MetaData()
+    meta.bind = migrate_engine
+    workers = Table('workers', meta, autoload=True)
+
+    # This is only necessary for MySQL, and since the table is not yet in use
+    # this will only be a schema update.
+    if migrate_engine.name.startswith('mysql'):
+        try:
+            workers.c.updated_at.alter(mysql.DATETIME(fsp=6))
+        except Exception:
+            # MySQL 5.5 and earlier don't support sub-second resolution, so
+            # we may have cleanup races in Active-Active configurations;
+            # that's why upgrading is recommended in that case.
+            # The code in Cinder is still capable of working with 5.5, so
+            # 5.5 itself is not a problem.
+            pass
+
+    # TODO(geguileo): Once we remove support for MySQL 5.5 we have to create
+    # an upgrade migration to remove this row.
+    # Set the workers table sub-second support sentinel.
+    wi = workers.insert()
+    now = timeutils.utcnow().replace(microsecond=123)
+    wi.execute({'created_at': now,
+                'updated_at': now,
+                'deleted': False,
+                'resource_type': 'SENTINEL',
+                'resource_id': 'SUB-SECOND',
+                'status': 'OK'})
diff --git a/cinder/manager.py b/cinder/manager.py
index cc198b22701..3ecf30609c5 100644
--- a/cinder/manager.py
+++ b/cinder/manager.py
@@ -56,9 +56,14 @@ from oslo_config import cfg
 from oslo_log import log as logging
 import oslo_messaging as messaging
 from oslo_service import periodic_task
+from oslo_utils import timeutils
 
+from cinder import context
+from cinder import db
 from cinder.db import base
-from cinder.i18n import _LI
+from cinder import exception
+from cinder.i18n import _LE, _LI, _LW
+from cinder import objects
 from cinder import rpc
 from cinder.scheduler import rpcapi as scheduler_rpcapi
 
@@ -92,13 +97,14 @@ class Manager(base.Base, PeriodicTasks):
         """Tasks to be run at a periodic interval."""
         return self.run_periodic_tasks(context, raise_on_error=raise_on_error)
 
-    def init_host(self, added_to_cluster=None):
+    def init_host(self, service_id=None, added_to_cluster=None):
         """Handle initialization if this is a standalone service.
 
         A hook point for services to execute tasks before the services are
         made available (i.e. showing up on RPC and starting to accept RPC
         calls) to other components. Child classes should override this
         method.
 
+        :param service_id: ID of the service where the manager is running.
         :param added_to_cluster: True when a host's cluster configuration has
                                  changed from not being defined or being '' to
                                  any other value and the DB service record
@@ -175,3 +181,101 @@ class SchedulerDependentManager(Manager):
     def reset(self):
         super(SchedulerDependentManager, self).reset()
         self.scheduler_rpcapi = scheduler_rpcapi.SchedulerAPI()
+
+
+class CleanableManager(object):
+    def do_cleanup(self, context, cleanup_request):
+        LOG.info(_LI('Initiating service %s cleanup'),
+                 cleanup_request.service_id)
+
+        # If the 'until' field in the cleanup request is not set, we default
+        # to the current time.
+        until = cleanup_request.until or timeutils.utcnow()
+        keep_entry = False
+
+        to_clean = db.worker_get_all(
+            context,
+            resource_type=cleanup_request.resource_type,
+            resource_id=cleanup_request.resource_id,
+            service_id=cleanup_request.service_id,
+            until=until)
+
+        for clean in to_clean:
+            original_service_id = clean.service_id
+            original_time = clean.updated_at
+            # Try to do a soft delete to mark the entry as being cleaned up
+            # by us (setting the service id to our service id).
+            res = db.worker_claim_for_cleanup(context,
+                                              claimer_id=self.service_id,
+                                              orm_worker=clean)
+
+            # The claim may fail if the entry is being cleaned by another
+            # service, has been removed (finished cleaning) by another
+            # service, or the user started a new cleanable operation.
+            # In any of these cases we don't have to do the cleanup or remove
+            # the worker entry.
+ if not res: + continue + + # Try to get versioned object for resource we have to cleanup + try: + vo_cls = getattr(objects, clean.resource_type) + vo = vo_cls.get_by_id(context, clean.resource_id) + # Set the worker DB entry in the VO and mark it as being a + # clean operation + clean.cleaning = True + vo.worker = clean + except exception.NotFound: + LOG.debug('Skipping cleanup for non existent %(type)s %(id)s.', + {'type': clean.resource_type, + 'id': clean.resource_id}) + else: + # Resource status should match + if vo.status != clean.status: + LOG.debug('Skipping cleanup for mismatching work on ' + '%(type)s %(id)s: %(exp_sts)s <> %(found_sts)s.', + {'type': clean.resource_type, + 'id': clean.resource_id, + 'exp_sts': clean.status, + 'found_sts': vo.status}) + else: + LOG.info(_LI('Cleaning %(type)s with id %(id)s and status ' + '%(status)s'), + {'type': clean.resource_type, + 'id': clean.resource_id, + 'status': clean.status}, + resource=vo) + try: + # Some cleanup jobs are performed asynchronously, so + # we don't delete the worker entry, they'll take care + # of it + keep_entry = self._do_cleanup(context, vo) + except Exception: + LOG.exception(_LE('Could not perform cleanup.')) + # Return the worker DB entry to the original service + db.worker_update(context, clean.id, + service_id=original_service_id, + updated_at=original_time) + continue + + # The resource either didn't exist or was properly cleaned, either + # way we can remove the entry from the worker table if the cleanup + # method doesn't want to keep the entry (for example for delayed + # deletion). + if not keep_entry and not db.worker_destroy(context, id=clean.id): + LOG.warning(_LW('Could not remove worker entry %s.'), clean.id) + + LOG.info(_LI('Service %s cleanup completed.'), + cleanup_request.service_id) + + def _do_cleanup(self, ctxt, vo_resource): + return False + + def init_host(self, service_id, **kwargs): + ctxt = context.get_admin_context() + self.service_id = service_id + # TODO(geguileo): Once we don't support MySQL 5.5 anymore we can remove + # call to workers_init. + db.workers_init() + cleanup_request = objects.CleanupRequest(service_id=service_id) + self.do_cleanup(ctxt, cleanup_request) diff --git a/cinder/objects/base.py b/cinder/objects/base.py index ff4ecd7fc6a..329191edbbf 100644 --- a/cinder/objects/base.py +++ b/cinder/objects/base.py @@ -119,6 +119,7 @@ OBJ_VERSIONS.add('1.11', {'GroupSnapshot': '1.0', 'GroupSnapshotList': '1.0', OBJ_VERSIONS.add('1.12', {'VolumeType': '1.3'}) OBJ_VERSIONS.add('1.13', {'CleanupRequest': '1.0'}) OBJ_VERSIONS.add('1.14', {'VolumeAttachmentList': '1.1'}) +OBJ_VERSIONS.add('1.15', {'Volume': '1.6', 'Snapshot': '1.2'}) class CinderObjectRegistry(base.VersionedObjectRegistry): diff --git a/cinder/objects/cleanable.py b/cinder/objects/cleanable.py index 59007910ea3..420355e2a1e 100644 --- a/cinder/objects/cleanable.py +++ b/cinder/objects/cleanable.py @@ -13,9 +13,9 @@ # License for the specific language governing permissions and limitations # under the License. -from functools import wraps import inspect +import decorator from oslo_utils import versionutils from cinder import db @@ -168,8 +168,7 @@ class CinderCleanableObject(base.CinderPersistentObject): to be added. 
""" def _decorator(f): - @wraps(f) - def wrapper(*args, **kwargs): + def wrapper(f, *args, **kwargs): if decorator_args: call_args = inspect.getcallargs(f, *args, **kwargs) candidates = [call_args[obj] for obj in decorator_args] @@ -201,7 +200,7 @@ class CinderCleanableObject(base.CinderPersistentObject): except Exception: pass return result - return wrapper + return decorator.decorate(f, wrapper) # If we don't have optional decorator arguments the argument in # decorator_args is the function we have to decorate diff --git a/cinder/objects/snapshot.py b/cinder/objects/snapshot.py index f74e2475994..4584e8e1bc2 100644 --- a/cinder/objects/snapshot.py +++ b/cinder/objects/snapshot.py @@ -21,6 +21,7 @@ from cinder import exception from cinder.i18n import _ from cinder import objects from cinder.objects import base +from cinder.objects import cleanable from cinder.objects import fields as c_fields @@ -28,11 +29,12 @@ CONF = cfg.CONF @base.CinderObjectRegistry.register -class Snapshot(base.CinderPersistentObject, base.CinderObject, +class Snapshot(cleanable.CinderCleanableObject, base.CinderObject, base.CinderObjectDictCompat): # Version 1.0: Initial version # Version 1.1: Changed 'status' field to use SnapshotStatusField - VERSION = '1.1' + # Version 1.2: This object is now cleanable (adds rows to workers table) + VERSION = '1.2' # NOTE(thangp): OPTIONAL_FIELDS are fields that would be lazy-loaded. They # are typically the relationship in the sqlalchemy object. @@ -248,6 +250,13 @@ class Snapshot(base.CinderPersistentObject, base.CinderObject, return db.snapshot_data_get_for_project(context, project_id, volume_type_id) + @staticmethod + def _is_cleanable(status, obj_version): + # Before 1.2 we didn't have workers table, so cleanup wasn't supported. + if obj_version and obj_version < 1.2: + return False + return status == 'creating' + @base.CinderObjectRegistry.register class SnapshotList(base.ObjectListBase, base.CinderObject): @@ -260,6 +269,11 @@ class SnapshotList(base.ObjectListBase, base.CinderObject): @classmethod def get_all(cls, context, search_opts, marker=None, limit=None, sort_keys=None, sort_dirs=None, offset=None): + """Get all snapshot given some search_opts (filters). + + Special search options accepted are host and cluster_name, that refer + to the volume's fields. 
+ """ snapshots = db.snapshot_get_all(context, search_opts, marker, limit, sort_keys, sort_dirs, offset) expected_attrs = Snapshot._get_expected_attrs(context) diff --git a/cinder/objects/volume.py b/cinder/objects/volume.py index f1aa619e076..98863213d73 100644 --- a/cinder/objects/volume.py +++ b/cinder/objects/volume.py @@ -21,6 +21,7 @@ from cinder import exception from cinder.i18n import _ from cinder import objects from cinder.objects import base +from cinder.objects import cleanable CONF = cfg.CONF @@ -48,7 +49,7 @@ class MetadataObject(dict): @base.CinderObjectRegistry.register -class Volume(base.CinderPersistentObject, base.CinderObject, +class Volume(cleanable.CinderCleanableObject, base.CinderObject, base.CinderObjectDictCompat, base.CinderComparableObject, base.ClusteredObject): # Version 1.0: Initial version @@ -58,7 +59,8 @@ class Volume(base.CinderPersistentObject, base.CinderObject, # Version 1.3: Added finish_volume_migration() # Version 1.4: Added cluster fields # Version 1.5: Added group - VERSION = '1.5' + # Version 1.6: This object is now cleanable (adds rows to workers table) + VERSION = '1.6' OPTIONAL_FIELDS = ('metadata', 'admin_metadata', 'glance_metadata', 'volume_type', 'volume_attachment', 'consistencygroup', @@ -364,6 +366,14 @@ class Volume(base.CinderPersistentObject, base.CinderObject, self.admin_metadata = db.volume_admin_metadata_update( self._context, self.id, metadata, True) + # When we are creating a volume and we change from 'creating' + # status to 'downloading' status we have to change the worker entry + # in the DB to reflect this change, otherwise the cleanup will + # not be performed as it will be mistaken for a volume that has + # been somehow changed (reset status, forced operation...) + if updates.get('status') == 'downloading': + self.set_worker() + db.volume_update(self._context, self.id, updates) self.obj_reset_changes() @@ -486,6 +496,14 @@ class Volume(base.CinderPersistentObject, base.CinderObject, dest_volume.save() return dest_volume + @staticmethod + def _is_cleanable(status, obj_version): + # Before 1.6 we didn't have workers table, so cleanup wasn't supported. + # cleaning. + if obj_version and obj_version < 1.6: + return False + return status in ('creating', 'deleting', 'uploading', 'downloading') + @base.CinderObjectRegistry.register class VolumeList(base.ObjectListBase, base.CinderObject): @@ -523,8 +541,8 @@ class VolumeList(base.ObjectListBase, base.CinderObject): return expected_attrs @classmethod - def get_all(cls, context, marker, limit, sort_keys=None, sort_dirs=None, - filters=None, offset=None): + def get_all(cls, context, marker=None, limit=None, sort_keys=None, + sort_dirs=None, filters=None, offset=None): volumes = db.volume_get_all(context, marker, limit, sort_keys=sort_keys, sort_dirs=sort_dirs, filters=filters, offset=offset) diff --git a/cinder/scheduler/rpcapi.py b/cinder/scheduler/rpcapi.py index 6dd00cba55f..461a8273556 100644 --- a/cinder/scheduler/rpcapi.py +++ b/cinder/scheduler/rpcapi.py @@ -95,6 +95,7 @@ class SchedulerAPI(rpc.RPCAPI): def create_volume(self, ctxt, volume, snapshot_id=None, image_id=None, request_spec=None, filter_properties=None): + volume.create_worker() cctxt = self._get_cctxt() msg_args = {'snapshot_id': snapshot_id, 'image_id': image_id, 'request_spec': request_spec, diff --git a/cinder/service.py b/cinder/service.py index d08bd930dce..06a94d15882 100644 --- a/cinder/service.py +++ b/cinder/service.py @@ -121,7 +121,6 @@ class Service(service.Service): on topic. 
It also periodically runs tasks on the manager and reports it state to the database services table. """ - # Make service_id a class attribute so it can be used for clean up service_id = None @@ -144,6 +143,8 @@ class Service(service.Service): if CONF.profiler.enabled: manager_class = profiler.trace_cls("rpc")(manager_class) + self.service = None + # NOTE(geguileo): We need to create the Service DB entry before we # create the manager, otherwise capped versions for serializer and rpc # client would use existing DB entries not including us, which could @@ -234,7 +235,8 @@ class Service(service.Service): if self.coordination: coordination.COORDINATOR.start() - self.manager.init_host(added_to_cluster=self.added_to_cluster) + self.manager.init_host(added_to_cluster=self.added_to_cluster, + service_id=Service.service_id) LOG.debug("Creating RPC server for service %s", self.topic) diff --git a/cinder/test.py b/cinder/test.py index 254f5331a22..221834a5a1a 100644 --- a/cinder/test.py +++ b/cinder/test.py @@ -89,6 +89,7 @@ class TestCase(testtools.TestCase): """Test case base class for all unit tests.""" POLICY_PATH = 'cinder/tests/unit/policy.json' + MOCK_WORKER = True def _get_joined_notifier(self, *args, **kwargs): # We create a new fake notifier but we join the notifications with @@ -110,6 +111,12 @@ class TestCase(testtools.TestCase): side_effect=self._get_joined_notifier) p.start() + if self.MOCK_WORKER: + # Mock worker creation for all tests that don't care about it + clean_path = 'cinder.objects.cleanable.CinderCleanableObject.%s' + for method in ('create_worker', 'set_worker', 'unset_worker'): + self.patch(clean_path % method, return_value=None) + # Unit tests do not need to use lazy gettext i18n.enable_lazy(False) diff --git a/cinder/tests/unit/consistencygroup/test_cg.py b/cinder/tests/unit/consistencygroup/test_cg.py index 33795b66aca..c8a420ffc26 100644 --- a/cinder/tests/unit/consistencygroup/test_cg.py +++ b/cinder/tests/unit/consistencygroup/test_cg.py @@ -23,8 +23,8 @@ from cinder import quota from cinder.tests.unit import conf_fixture from cinder.tests.unit import fake_constants as fake from cinder.tests.unit import fake_snapshot -from cinder.tests.unit import test_volume from cinder.tests.unit import utils as tests_utils +from cinder.tests.unit import volume as base import cinder.volume from cinder.volume import driver from cinder.volume import utils as volutils @@ -33,7 +33,7 @@ CGQUOTAS = quota.CGQUOTAS CONF = cfg.CONF -class ConsistencyGroupTestCase(test_volume.BaseVolumeTestCase): +class ConsistencyGroupTestCase(base.BaseVolumeTestCase): def test_delete_volume_in_consistency_group(self): """Test deleting a volume that's tied to a consistency group fails.""" consistencygroup_id = fake.CONSISTENCY_GROUP_ID diff --git a/cinder/tests/unit/objects/__init__.py b/cinder/tests/unit/objects/__init__.py index 3e1886d30a8..08d8ffd6ae8 100644 --- a/cinder/tests/unit/objects/__init__.py +++ b/cinder/tests/unit/objects/__init__.py @@ -21,8 +21,8 @@ from cinder import test class BaseObjectsTestCase(test.TestCase): - def setUp(self): - super(BaseObjectsTestCase, self).setUp() + def setUp(self, *args, **kwargs): + super(BaseObjectsTestCase, self).setUp(*args, **kwargs) self.user_id = 'fake-user' self.project_id = 'fake-project' self.context = context.RequestContext(self.user_id, self.project_id, diff --git a/cinder/tests/unit/objects/test_cleanable.py b/cinder/tests/unit/objects/test_cleanable.py index a15b6d9252f..aa49270e547 100644 --- a/cinder/tests/unit/objects/test_cleanable.py +++ 
b/cinder/tests/unit/objects/test_cleanable.py @@ -40,6 +40,7 @@ class Backup(cleanable.CinderCleanableObject): class TestCleanable(test_objects.BaseObjectsTestCase): + MOCK_WORKER = False def setUp(self): super(TestCleanable, self).setUp() diff --git a/cinder/tests/unit/objects/test_objects.py b/cinder/tests/unit/objects/test_objects.py index cfa886e5f81..a5bff8210a3 100644 --- a/cinder/tests/unit/objects/test_objects.py +++ b/cinder/tests/unit/objects/test_objects.py @@ -38,9 +38,9 @@ object_data = { 'RequestSpec': '1.1-b0bd1a28d191d75648901fa853e8a733', 'Service': '1.4-c7d011989d1718ca0496ccf640b42712', 'ServiceList': '1.1-15ecf022a68ddbb8c2a6739cfc9f8f5e', - 'Snapshot': '1.1-d6a9d58f627bb2a5cf804b0dd7a12bc7', + 'Snapshot': '1.2-d6a9d58f627bb2a5cf804b0dd7a12bc7', 'SnapshotList': '1.0-15ecf022a68ddbb8c2a6739cfc9f8f5e', - 'Volume': '1.5-19919d8086d6a38ab9d3ab88139e70e0', + 'Volume': '1.6-19919d8086d6a38ab9d3ab88139e70e0', 'VolumeList': '1.1-15ecf022a68ddbb8c2a6739cfc9f8f5e', 'VolumeAttachment': '1.0-b30dacf62b2030dd83d8a1603f1064ff', 'VolumeAttachmentList': '1.1-15ecf022a68ddbb8c2a6739cfc9f8f5e', diff --git a/cinder/tests/unit/scheduler/test_rpcapi.py b/cinder/tests/unit/scheduler/test_rpcapi.py index 464e2006058..2b2df0384d2 100644 --- a/cinder/tests/unit/scheduler/test_rpcapi.py +++ b/cinder/tests/unit/scheduler/test_rpcapi.py @@ -17,8 +17,6 @@ Unit Tests for cinder.scheduler.rpcapi """ -import copy - import mock from cinder import context @@ -46,7 +44,7 @@ class SchedulerRpcAPITestCase(test.TestCase): "version": kwargs.pop('version', rpcapi.RPC_API_VERSION) } - expected_msg = copy.deepcopy(kwargs) + expected_msg = kwargs.copy() self.fake_args = None self.fake_kwargs = None @@ -86,44 +84,65 @@ class SchedulerRpcAPITestCase(test.TestCase): version='3.0') def test_create_volume(self): + volume = fake_volume.fake_volume_obj(self.context) + create_worker_mock = self.mock_object(volume, 'create_worker') self._test_scheduler_api('create_volume', rpc_method='cast', snapshot_id='snapshot_id', image_id='image_id', request_spec='fake_request_spec', filter_properties='filter_properties', - volume=fake_volume.fake_volume_obj( - self.context), + volume=volume, version='3.0') + create_worker_mock.assert_called_once() + + def test_create_volume_serialization(self): + volume = fake_volume.fake_volume_obj(self.context) + create_worker_mock = self.mock_object(volume, 'create_worker') + self._test_scheduler_api('create_volume', + rpc_method='cast', + snapshot_id='snapshot_id', + image_id='image_id', + request_spec={'volume_type': {}}, + filter_properties='filter_properties', + volume=volume, + version='3.0') + create_worker_mock.assert_called_once() def test_migrate_volume_to_host(self): + volume = fake_volume.fake_volume_obj(self.context) + create_worker_mock = self.mock_object(volume, 'create_worker') self._test_scheduler_api('migrate_volume_to_host', rpc_method='cast', host='host', force_host_copy=True, request_spec='fake_request_spec', filter_properties='filter_properties', - volume=fake_volume.fake_volume_obj( - self.context), + volume=volume, version='3.0') + create_worker_mock.assert_not_called() def test_retype(self): + volume = fake_volume.fake_volume_obj(self.context) + create_worker_mock = self.mock_object(volume, 'create_worker') self._test_scheduler_api('retype', rpc_method='cast', request_spec='fake_request_spec', filter_properties='filter_properties', - volume=fake_volume.fake_volume_obj( - self.context), + volume=volume, version='3.0') + create_worker_mock.assert_not_called() def 
test_manage_existing(self): + volume = fake_volume.fake_volume_obj(self.context) + create_worker_mock = self.mock_object(volume, 'create_worker') self._test_scheduler_api('manage_existing', rpc_method='cast', request_spec='fake_request_spec', filter_properties='filter_properties', - volume=fake_volume.fake_volume_obj( - self.context), + volume=volume, version='3.0') + create_worker_mock.assert_not_called() def test_get_pools(self): self._test_scheduler_api('get_pools', diff --git a/cinder/tests/unit/test_cleanable_manager.py b/cinder/tests/unit/test_cleanable_manager.py new file mode 100644 index 00000000000..e682501d56d --- /dev/null +++ b/cinder/tests/unit/test_cleanable_manager.py @@ -0,0 +1,227 @@ +# Copyright (c) 2016 Red Hat, Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import mock + +from oslo_utils import timeutils + +from cinder import context +from cinder import db +from cinder import manager +from cinder import objects +from cinder import test +from cinder.tests.unit import fake_constants +from cinder.tests.unit import utils + + +class FakeManager(manager.CleanableManager): + def __init__(self, service_id=None, keep_after_clean=False): + if service_id: + self.service_id = service_id + self.keep_after_clean = keep_after_clean + + def _do_cleanup(self, ctxt, vo_resource): + vo_resource.status += '_cleaned' + vo_resource.save() + return self.keep_after_clean + + +class TestCleanableManager(test.TestCase): + def setUp(self): + super(TestCleanableManager, self).setUp() + self.user_id = fake_constants.USER_ID + self.project_id = fake_constants.PROJECT_ID + self.context = context.RequestContext(self.user_id, self.project_id, + is_admin=True) + self.service = db.service_create(self.context, {}) + + @mock.patch('cinder.db.workers_init', autospec=True) + @mock.patch('cinder.manager.CleanableManager.do_cleanup', autospec=True) + def test_init_host_with_service(self, mock_cleanup, mock_workers_init): + mngr = FakeManager() + self.assertFalse(hasattr(mngr, 'service_id')) + mngr.init_host(service_id=self.service.id) + + self.assertEqual(self.service.id, mngr.service_id) + mock_cleanup.assert_called_once_with(mngr, mock.ANY, mock.ANY) + clean_req = mock_cleanup.call_args[0][2] + self.assertIsInstance(clean_req, objects.CleanupRequest) + self.assertEqual(self.service.id, clean_req.service_id) + mock_workers_init.assert_called_once_with() + + def test_do_cleanup(self): + """Basic successful cleanup.""" + vol = utils.create_volume(self.context, status='creating') + db.worker_create(self.context, status='creating', + resource_type='Volume', resource_id=vol.id, + service_id=self.service.id) + + clean_req = objects.CleanupRequest(service_id=self.service.id) + mngr = FakeManager(self.service.id) + mngr.do_cleanup(self.context, clean_req) + + self.assertListEqual([], db.worker_get_all(self.context)) + vol.refresh() + self.assertEqual('creating_cleaned', vol.status) + + def test_do_cleanup_not_cleaning_already_claimed(self): + """Basic cleanup that doesn't touch already 
cleaning works.""" + vol = utils.create_volume(self.context, status='creating') + worker1 = db.worker_create(self.context, status='creating', + resource_type='Volume', resource_id=vol.id, + service_id=self.service.id) + worker1 = db.worker_get(self.context, id=worker1.id) + vol2 = utils.create_volume(self.context, status='deleting') + worker2 = db.worker_create(self.context, status='deleting', + resource_type='Volume', resource_id=vol2.id, + service_id=self.service.id + 1) + worker2 = db.worker_get(self.context, id=worker2.id) + + # Simulate that the change to vol2 worker happened between + # worker_get_all and trying to claim a work for cleanup + worker2.service_id = self.service.id + + clean_req = objects.CleanupRequest(service_id=self.service.id) + mngr = FakeManager(self.service.id) + with mock.patch('cinder.db.worker_get_all') as get_all_mock: + get_all_mock.return_value = [worker1, worker2] + mngr.do_cleanup(self.context, clean_req) + + workers = db.worker_get_all(self.context) + self.assertEqual(1, len(workers)) + self.assertEqual(worker2.id, workers[0].id) + + vol.refresh() + self.assertEqual('creating_cleaned', vol.status) + vol2.refresh() + self.assertEqual('deleting', vol2.status) + + def test_do_cleanup_not_cleaning_already_claimed_by_us(self): + """Basic cleanup that doesn't touch other thread's claimed works.""" + original_time = timeutils.utcnow() + other_thread_claimed_time = timeutils.utcnow() + vol = utils.create_volume(self.context, status='creating') + worker1 = db.worker_create(self.context, status='creating', + resource_type='Volume', resource_id=vol.id, + service_id=self.service.id, + updated_at=original_time) + worker1 = db.worker_get(self.context, id=worker1.id) + vol2 = utils.create_volume(self.context, status='deleting') + worker2 = db.worker_create(self.context, status='deleting', + resource_type='Volume', resource_id=vol2.id, + service_id=self.service.id, + updated_at=other_thread_claimed_time) + worker2 = db.worker_get(self.context, id=worker2.id) + + # Simulate that the change to vol2 worker happened between + # worker_get_all and trying to claim a work for cleanup + worker2.updated_at = original_time + + clean_req = objects.CleanupRequest(service_id=self.service.id) + mngr = FakeManager(self.service.id) + with mock.patch('cinder.db.worker_get_all') as get_all_mock: + get_all_mock.return_value = [worker1, worker2] + mngr.do_cleanup(self.context, clean_req) + + workers = db.worker_get_all(self.context) + self.assertEqual(1, len(workers)) + self.assertEqual(worker2.id, workers[0].id) + + vol.refresh() + self.assertEqual('creating_cleaned', vol.status) + vol2.refresh() + self.assertEqual('deleting', vol2.status) + + def test_do_cleanup_resource_deleted(self): + """Cleanup on a resource that's been already deleted.""" + vol = utils.create_volume(self.context, status='creating') + db.worker_create(self.context, status='creating', + resource_type='Volume', resource_id=vol.id, + service_id=self.service.id) + vol.destroy() + + clean_req = objects.CleanupRequest(service_id=self.service.id) + mngr = FakeManager(self.service.id) + mngr.do_cleanup(self.context, clean_req) + + workers = db.worker_get_all(self.context) + self.assertListEqual([], workers) + + def test_do_cleanup_resource_on_another_service(self): + """Cleanup on a resource that's been claimed by other service.""" + vol = utils.create_volume(self.context, status='deleting') + db.worker_create(self.context, status='deleting', + resource_type='Volume', resource_id=vol.id, + service_id=self.service.id + 1) + + 
clean_req = objects.CleanupRequest(service_id=self.service.id) + mngr = FakeManager(self.service.id) + mngr.do_cleanup(self.context, clean_req) + + workers = db.worker_get_all(self.context) + self.assertEqual(1, len(workers)) + + vol.refresh() + self.assertEqual('deleting', vol.status) + + def test_do_cleanup_resource_changed_status(self): + """Cleanup on a resource that's changed status.""" + vol = utils.create_volume(self.context, status='available') + db.worker_create(self.context, status='creating', + resource_type='Volume', resource_id=vol.id, + service_id=self.service.id) + + clean_req = objects.CleanupRequest(service_id=self.service.id) + mngr = FakeManager(self.service.id) + mngr.do_cleanup(self.context, clean_req) + + workers = db.worker_get_all(self.context) + self.assertListEqual([], workers) + + vol.refresh() + self.assertEqual('available', vol.status) + + def test_do_cleanup_keep_worker(self): + """Cleanup on a resource that will remove worker when cleaning up.""" + vol = utils.create_volume(self.context, status='deleting') + db.worker_create(self.context, status='deleting', + resource_type='Volume', resource_id=vol.id, + service_id=self.service.id) + + clean_req = objects.CleanupRequest(service_id=self.service.id) + mngr = FakeManager(self.service.id, keep_after_clean=True) + mngr.do_cleanup(self.context, clean_req) + + workers = db.worker_get_all(self.context) + self.assertEqual(1, len(workers)) + vol.refresh() + self.assertEqual('deleting_cleaned', vol.status) + + @mock.patch.object(FakeManager, '_do_cleanup', side_effect=Exception) + def test_do_cleanup_revive_on_cleanup_fail(self, mock_clean): + """Cleanup will revive a worker if cleanup fails.""" + vol = utils.create_volume(self.context, status='creating') + db.worker_create(self.context, status='creating', + resource_type='Volume', resource_id=vol.id, + service_id=self.service.id) + + clean_req = objects.CleanupRequest(service_id=self.service.id) + mngr = FakeManager(self.service.id) + mngr.do_cleanup(self.context, clean_req) + + workers = db.worker_get_all(self.context) + self.assertEqual(1, len(workers)) + vol.refresh() + self.assertEqual('creating', vol.status) diff --git a/cinder/tests/unit/test_db_api.py b/cinder/tests/unit/test_db_api.py index 592fbcb8784..a931215f8d1 100644 --- a/cinder/tests/unit/test_db_api.py +++ b/cinder/tests/unit/test_db_api.py @@ -407,6 +407,21 @@ class DBAPIVolumeTestCase(BaseTest): self._assertEqualListsOfObjects(volumes, db.volume_get_all( self.ctxt, None, None, ['host'], None)) + @ddt.data('cluster_name', 'host') + def test_volume_get_all_filter_host_and_cluster(self, field): + volumes = [] + for i in range(2): + for value in ('host%d@backend#pool', 'host%d@backend', 'host%d'): + kwargs = {field: value % i} + volumes.append(utils.create_volume(self.ctxt, **kwargs)) + + for i in range(3): + filters = {field: getattr(volumes[i], field)} + result = db.volume_get_all(self.ctxt, filters=filters) + self.assertEqual(i + 1, len(result)) + self.assertSetEqual({v.id for v in volumes[:i + 1]}, + {v.id for v in result}) + def test_volume_get_all_marker_passed(self): volumes = [ db.volume_create(self.ctxt, {'id': 1}), @@ -1277,6 +1292,7 @@ class DBAPIVolumeTestCase(BaseTest): db_vols[i].cluster_name) +@ddt.ddt class DBAPISnapshotTestCase(BaseTest): """Tests for cinder.db.api.snapshot_*.""" @@ -1367,6 +1383,24 @@ class DBAPISnapshotTestCase(BaseTest): filters), ignored_keys=['metadata', 'volume']) + @ddt.data('cluster_name', 'host') + def test_snapshot_get_all_filter_host_and_cluster(self, field): + 
volumes = [] + snapshots = [] + for i in range(2): + for value in ('host%d@backend#pool', 'host%d@backend', 'host%d'): + kwargs = {field: value % i} + vol = utils.create_volume(self.ctxt, **kwargs) + volumes.append(vol) + snapshots.append(utils.create_snapshot(self.ctxt, vol.id)) + + for i in range(3): + filters = {field: getattr(volumes[i], field)} + result = db.snapshot_get_all(self.ctxt, filters=filters) + self.assertEqual(i + 1, len(result)) + self.assertSetEqual({s.id for s in snapshots[:i + 1]}, + {s.id for s in result}) + def test_snapshot_get_by_host(self): db.volume_create(self.ctxt, {'id': 1, 'host': 'host1'}) db.volume_create(self.ctxt, {'id': 2, 'host': 'host2'}) diff --git a/cinder/tests/unit/test_db_worker_api.py b/cinder/tests/unit/test_db_worker_api.py index be15640e6c9..7fedacff02d 100644 --- a/cinder/tests/unit/test_db_worker_api.py +++ b/cinder/tests/unit/test_db_worker_api.py @@ -15,9 +15,11 @@ """Unit tests for cinder.db.api.Worker""" +from datetime import datetime import time import uuid +import mock from oslo_db import exception as db_exception import six @@ -40,12 +42,41 @@ class DBAPIWorkerTestCase(test.TestCase, test.ModelsObjectComparatorMixin): super(DBAPIWorkerTestCase, self).setUp() self.ctxt = context.get_admin_context() + def tearDown(self): + db.sqlalchemy.api.DB_SUPPORTS_SUBSECOND_RESOLUTION = True + super(DBAPIWorkerTestCase, self).tearDown() + + def test_workers_init(self): + # SQLite supports subsecond resolution so result is True + db.sqlalchemy.api.DB_SUPPORTS_SUBSECOND_RESOLUTION = None + db.workers_init() + self.assertTrue(db.sqlalchemy.api.DB_SUPPORTS_SUBSECOND_RESOLUTION) + + def test_workers_init_not_supported(self): + # Fake a Db that doesn't support sub-second resolution in datetimes + db.worker_update( + self.ctxt, None, + {'resource_type': 'SENTINEL', 'ignore_sentinel': False}, + updated_at=datetime.utcnow().replace(microsecond=0)) + db.workers_init() + self.assertFalse(db.sqlalchemy.api.DB_SUPPORTS_SUBSECOND_RESOLUTION) + def test_worker_create_and_get(self): """Test basic creation of a worker record.""" worker = db.worker_create(self.ctxt, **self.worker_fields) db_worker = db.worker_get(self.ctxt, id=worker.id) self._assertEqualObjects(worker, db_worker) + @mock.patch('oslo_utils.timeutils.utcnow', + return_value=datetime.utcnow().replace(microsecond=123)) + def test_worker_create_no_subsecond(self, mock_utcnow): + """Test basic creation of a worker record.""" + db.sqlalchemy.api.DB_SUPPORTS_SUBSECOND_RESOLUTION = False + worker = db.worker_create(self.ctxt, **self.worker_fields) + db_worker = db.worker_get(self.ctxt, id=worker.id) + self._assertEqualObjects(worker, db_worker) + self.assertEqual(0, db_worker.updated_at.microsecond) + def test_worker_create_unique_constrains(self): """Test when we use an already existing resource type and id.""" db.worker_create(self.ctxt, **self.worker_fields) @@ -131,6 +162,21 @@ class DBAPIWorkerTestCase(test.TestCase, test.ModelsObjectComparatorMixin): db_worker = db.worker_get(self.ctxt, id=worker.id) self._assertEqualObjects(worker, db_worker, ['updated_at']) + def test_worker_update_no_subsecond(self): + """Test basic worker update.""" + db.sqlalchemy.api.DB_SUPPORTS_SUBSECOND_RESOLUTION = False + worker = self._create_workers(1)[0] + worker = db.worker_get(self.ctxt, id=worker.id) + now = datetime.utcnow().replace(microsecond=123) + with mock.patch('oslo_utils.timeutils.utcnow', return_value=now): + res = db.worker_update(self.ctxt, worker.id, service_id=1) + self.assertEqual(1, res) + 
worker.service_id = 1 + + db_worker = db.worker_get(self.ctxt, id=worker.id) + self._assertEqualObjects(worker, db_worker, ['updated_at']) + self.assertEqual(0, db_worker.updated_at.microsecond) + def test_worker_update_update_orm(self): """Test worker update updating the worker orm object.""" worker = self._create_workers(1)[0] @@ -139,7 +185,9 @@ class DBAPIWorkerTestCase(test.TestCase, test.ModelsObjectComparatorMixin): self.assertEqual(1, res) db_worker = db.worker_get(self.ctxt, id=worker.id) - self._assertEqualObjects(worker, db_worker, ['updated_at']) + # If we are updating the ORM object we don't ignore the update_at field + # because it will get updated in the ORM instance. + self._assertEqualObjects(worker, db_worker) def test_worker_destroy(self): """Test that worker destroy really deletes the DB entry.""" @@ -152,7 +200,7 @@ class DBAPIWorkerTestCase(test.TestCase, test.ModelsObjectComparatorMixin): def test_worker_destroy_non_existent(self): """Test that worker destroy returns 0 when entry doesn't exist.""" - res = db.worker_destroy(self.ctxt, id=1) + res = db.worker_destroy(self.ctxt, id=100) self.assertEqual(0, res) def test_worker_claim(self): diff --git a/cinder/tests/unit/test_service.py b/cinder/tests/unit/test_service.py index 01b7ddd2c81..a1f1114e574 100644 --- a/cinder/tests/unit/test_service.py +++ b/cinder/tests/unit/test_service.py @@ -427,7 +427,8 @@ class ServiceTestCase(test.TestCase): # Since we have created the service entry we call init_host with # added_to_cluster=True init_host_mock.assert_called_once_with( - added_to_cluster=added_to_cluster) + added_to_cluster=added_to_cluster, + service_id=self.service_ref['id']) expected_target_calls = [mock.call(topic=self.topic, server=self.host)] expected_rpc_calls = [mock.call(target_mock.return_value, mock.ANY, diff --git a/cinder/tests/unit/test_volume.py b/cinder/tests/unit/test_volume.py index bd531054646..f150e4a067b 100644 --- a/cinder/tests/unit/test_volume.py +++ b/cinder/tests/unit/test_volume.py @@ -36,7 +36,6 @@ from oslo_utils import importutils from oslo_utils import timeutils from oslo_utils import units import six -from stevedore import extension from taskflow.engines.action_engine import engine from cinder.api import common @@ -65,6 +64,7 @@ from cinder.tests.unit import fake_volume from cinder.tests.unit.image import fake as fake_image from cinder.tests.unit.keymgr import fake as fake_keymgr from cinder.tests.unit import utils as tests_utils +from cinder.tests.unit import volume as base from cinder import utils import cinder.volume from cinder.volume import api as volume_api @@ -124,120 +124,7 @@ class FakeImageService(object): 'status': 'active'} -class BaseVolumeTestCase(test.TestCase): - """Test Case for volumes.""" - - FAKE_UUID = fake.IMAGE_ID - - def setUp(self): - super(BaseVolumeTestCase, self).setUp() - self.extension_manager = extension.ExtensionManager( - "BaseVolumeTestCase") - vol_tmpdir = tempfile.mkdtemp() - self.flags(volumes_dir=vol_tmpdir) - self.addCleanup(self._cleanup) - self.volume = importutils.import_object(CONF.volume_manager) - self.volume.message_api = mock.Mock() - self.configuration = mock.Mock(conf.Configuration) - self.context = context.get_admin_context() - self.context.user_id = fake.USER_ID - # NOTE(mriedem): The id is hard-coded here for tracking race fail - # assertions with the notification code, it's part of an - # elastic-recheck query so don't remove it or change it. 
- self.project_id = '7f265bd4-3a85-465e-a899-5dc4854a86d3' - self.context.project_id = self.project_id - self.volume_params = { - 'status': 'creating', - 'host': CONF.host, - 'size': 1} - self.mock_object(brick_lvm.LVM, - 'get_all_volume_groups', - self.fake_get_all_volume_groups) - fake_image.mock_image_service(self) - self.mock_object(brick_lvm.LVM, '_vg_exists', lambda x: True) - self.mock_object(os.path, 'exists', lambda x: True) - self.volume.driver.set_initialized() - self.volume.stats = {'allocated_capacity_gb': 0, - 'pools': {}} - # keep ordered record of what we execute - self.called = [] - self.volume_api = cinder.volume.api.API() - - def _cleanup(self): - try: - shutil.rmtree(CONF.volumes_dir) - except OSError: - pass - - def fake_get_all_volume_groups(obj, vg_name=None, no_suffix=True): - return [{'name': 'cinder-volumes', - 'size': '5.00', - 'available': '2.50', - 'lv_count': '2', - 'uuid': 'vR1JU3-FAKE-C4A9-PQFh-Mctm-9FwA-Xwzc1m'}] - - @mock.patch('cinder.image.image_utils.TemporaryImages.fetch') - @mock.patch('cinder.volume.flows.manager.create_volume.' - 'CreateVolumeFromSpecTask._clone_image_volume') - def _create_volume_from_image(self, mock_clone_image_volume, - mock_fetch_img, - fakeout_copy_image_to_volume=False, - fakeout_clone_image=False, - clone_image_volume=False): - """Test function of create_volume_from_image. - - Test cases call this function to create a volume from image, caller - can choose whether to fake out copy_image_to_volume and clone_image, - after calling this, test cases should check status of the volume. - """ - def fake_local_path(volume): - return dst_path - - def fake_copy_image_to_volume(context, volume, - image_service, image_id): - pass - - def fake_fetch_to_raw(ctx, image_service, image_id, path, blocksize, - size=None, throttle=None): - pass - - def fake_clone_image(ctx, volume_ref, - image_location, image_meta, - image_service): - return {'provider_location': None}, True - - dst_fd, dst_path = tempfile.mkstemp() - os.close(dst_fd) - self.mock_object(self.volume.driver, 'local_path', fake_local_path) - if fakeout_clone_image: - self.mock_object(self.volume.driver, 'clone_image', - fake_clone_image) - self.mock_object(image_utils, 'fetch_to_raw', fake_fetch_to_raw) - if fakeout_copy_image_to_volume: - self.mock_object(self.volume.driver, 'copy_image_to_volume', - fake_copy_image_to_volume) - mock_clone_image_volume.return_value = ({}, clone_image_volume) - mock_fetch_img.return_value = mock.MagicMock( - spec=tests_utils.get_file_spec()) - - image_id = 'c905cedb-7281-47e4-8a62-f26bc5fc4c77' - volume = tests_utils.create_volume(self.context, **self.volume_params) - # creating volume testdata - try: - request_spec = { - 'volume_properties': self.volume_params, - 'image_id': image_id, - } - self.volume.create_volume(self.context, volume, request_spec) - finally: - # cleanup - os.unlink(dst_path) - volume = objects.Volume.get_by_id(self.context, volume.id) - - return volume - - -class AvailabilityZoneTestCase(BaseVolumeTestCase): +class AvailabilityZoneTestCase(base.BaseVolumeTestCase): def setUp(self): super(AvailabilityZoneTestCase, self).setUp() self.get_all = self.patch( @@ -317,59 +204,16 @@ class AvailabilityZoneTestCase(BaseVolumeTestCase): @ddt.ddt -class VolumeTestCase(BaseVolumeTestCase): +class VolumeTestCase(base.BaseVolumeTestCase): def setUp(self): super(VolumeTestCase, self).setUp() - self._clear_patch = mock.patch('cinder.volume.utils.clear_volume', - autospec=True) - self._clear_patch.start() + 
self.patch('cinder.volume.utils.clear_volume', autospec=True) self.expected_status = 'available' + self.service_id = 1 - def tearDown(self): - super(VolumeTestCase, self).tearDown() - self._clear_patch.stop() - - def test_init_host_clears_downloads(self): - """Test that init_host will unwedge a volume stuck in downloading.""" - volume = tests_utils.create_volume(self.context, status='downloading', - size=0, host=CONF.host) - self.volume.init_host() - volume.refresh() - self.assertEqual("error", volume.status) - self.volume.delete_volume(self.context, volume) - - def test_init_host_clears_uploads_available_volume(self): - """init_host will clean an available volume stuck in uploading.""" - volume = tests_utils.create_volume(self.context, status='uploading', - size=0, host=CONF.host) - self.volume.init_host() - volume = objects.Volume.get_by_id(context.get_admin_context(), - volume.id) - self.assertEqual("available", volume.status) - - def test_init_host_clears_uploads_in_use_volume(self): - """init_host will clean an in-use volume stuck in uploading.""" - volume = tests_utils.create_volume(self.context, status='uploading', - size=0, host=CONF.host) - fake_uuid = fakes.get_fake_uuid() - tests_utils.attach_volume(self.context, volume.id, fake_uuid, - 'fake_host', '/dev/vda') - self.volume.init_host() - volume = objects.Volume.get_by_id(context.get_admin_context(), - volume.id) - self.assertEqual("in-use", volume.status) - - def test_init_host_resumes_deletes(self): - """init_host will resume deleting volume in deleting status.""" - volume = tests_utils.create_volume(self.context, status='deleting', - size=0, host=CONF.host) - volume_id = volume['id'] - self.volume.init_host() - self.assertRaises(exception.VolumeNotFound, db.volume_get, - context.get_admin_context(), volume_id) - - def test_init_host_count_allocated_capacity(self): + @mock.patch('cinder.manager.CleanableManager.init_host') + def test_init_host_count_allocated_capacity(self, init_host_mock): vol0 = tests_utils.create_volume( self.context, size=100, host=CONF.host) vol1 = tests_utils.create_volume( @@ -384,7 +228,9 @@ class VolumeTestCase(BaseVolumeTestCase): vol4 = tests_utils.create_volume( self.context, size=1024, host=volutils.append_host(CONF.host, 'pool2')) - self.volume.init_host() + self.volume.init_host(service_id=self.service_id) + init_host_mock.assert_called_once_with( + service_id=self.service_id, added_to_cluster=None) stats = self.volume.stats self.assertEqual(2020, stats['allocated_capacity_gb']) self.assertEqual( @@ -427,7 +273,7 @@ class VolumeTestCase(BaseVolumeTestCase): {'id': snap1.id, 'provider_id': '7 8 yyyy'}] mock_update.return_value = (volumes, snapshots) # initialize - self.volume.init_host() + self.volume.init_host(service_id=self.service_id) # Grab volume and snapshot objects vol0_obj = objects.Volume.get_by_id(context.get_admin_context(), vol0.id) @@ -459,7 +305,7 @@ class VolumeTestCase(BaseVolumeTestCase): snap1 = tests_utils.create_snapshot(self.context, vol1.id) mock_update.return_value = ([], []) # initialize - self.volume.init_host() + self.volume.init_host(service_id=self.service_id) # Grab volume and snapshot objects vol0_obj = objects.Volume.get_by_id(context.get_admin_context(), vol0.id) @@ -481,16 +327,22 @@ class VolumeTestCase(BaseVolumeTestCase): @mock.patch('cinder.volume.manager.VolumeManager.' 
'_include_resources_in_cluster') def test_init_host_cluster_not_changed(self, include_in_cluster_mock): - self.volume.init_host(False) + self.volume.init_host(added_to_cluster=False, + service_id=self.service_id) include_in_cluster_mock.assert_not_called() + @mock.patch('cinder.objects.snapshot.SnapshotList.get_all', + return_value=[]) + @mock.patch('cinder.objects.volume.VolumeList.get_all', return_value=[]) @mock.patch('cinder.objects.volume.VolumeList.include_in_cluster') @mock.patch('cinder.objects.consistencygroup.ConsistencyGroupList.' 'include_in_cluster') - def test_init_host_added_to_cluster(self, vol_include_mock, - cg_include_mock): + def test_init_host_added_to_cluster(self, cg_include_mock, + vol_include_mock, vol_get_all_mock, + snap_get_all_mock): self.mock_object(self.volume, 'cluster', mock.sentinel.cluster) - self.volume.init_host(True) + self.volume.init_host(added_to_cluster=True, + service_id=self.service_id) vol_include_mock.assert_called_once_with(mock.ANY, mock.sentinel.cluster, @@ -498,6 +350,10 @@ class VolumeTestCase(BaseVolumeTestCase): cg_include_mock.assert_called_once_with(mock.ANY, mock.sentinel.cluster, host=self.volume.host) + vol_get_all_mock.assert_called_once_with( + mock.ANY, filters={'cluster_name': mock.sentinel.cluster}) + snap_get_all_mock.assert_called_once_with( + mock.ANY, search_opts={'cluster_name': mock.sentinel.cluster}) @mock.patch('cinder.objects.service.Service.get_minimum_rpc_version') @mock.patch('cinder.objects.service.Service.get_minimum_obj_version') @@ -552,45 +408,6 @@ class VolumeTestCase(BaseVolumeTestCase): self.volume.driver._initialized = False self.assertFalse(self.volume.is_working()) - def test_create_volume_fails_with_creating_and_downloading_status(self): - """Test init_host in case of volume. - - While the status of volume is 'creating' or 'downloading', - volume process down. - After process restarting this 'creating' status is changed to 'error'. - """ - for status in ['creating', 'downloading']: - volume = tests_utils.create_volume(self.context, status=status, - size=0, host=CONF.host) - - self.volume.init_host() - volume.refresh() - self.assertEqual('error', volume.status) - self.volume.delete_volume(self.context, volume) - - def test_create_snapshot_fails_with_creating_status(self): - """Test init_host in case of snapshot. - - While the status of snapshot is 'creating', volume process - down. After process restarting this 'creating' status is - changed to 'error'. 
- """ - volume = tests_utils.create_volume(self.context, - **self.volume_params) - snapshot = tests_utils.create_snapshot( - self.context, - volume['id'], - status=fields.SnapshotStatus.CREATING) - snap_id = snapshot['id'] - self.volume.init_host() - - snapshot_obj = objects.Snapshot.get_by_id(self.context, snap_id) - - self.assertEqual(fields.SnapshotStatus.ERROR, snapshot_obj.status) - - self.volume.delete_snapshot(self.context, snapshot_obj) - self.volume.delete_volume(self.context, volume) - @mock.patch('cinder.tests.unit.fake_notifier.FakeNotifier._notify') @mock.patch.object(QUOTAS, 'reserve') @mock.patch.object(QUOTAS, 'commit') @@ -1297,7 +1114,7 @@ class VolumeTestCase(BaseVolumeTestCase): def test_delete_volume_not_found(self, mock_get_volume): """Test delete volume moves on if the volume does not exist.""" volume_id = '12345678-1234-5678-1234-567812345678' - volume = objects.Volume(self.context, id=volume_id) + volume = objects.Volume(self.context, status='available', id=volume_id) self.volume.delete_volume(self.context, volume) self.assertTrue(mock_get_volume.called) @@ -4685,16 +4502,6 @@ class VolumeTestCase(BaseVolumeTestCase): self.context, snap) - def test_init_host_clears_deleting_snapshots(self): - """Test that init_host will delete a snapshot stuck in deleting.""" - volume = tests_utils.create_volume(self.context, status='deleting', - size=1, host=CONF.host) - snapshot = tests_utils.create_snapshot(self.context, - volume.id, status='deleting') - self.volume.init_host() - self.assertRaises(exception.VolumeNotFound, volume.refresh) - self.assertRaises(exception.SnapshotNotFound, snapshot.refresh) - @mock.patch('cinder.volume.drivers.lvm.LVMVolumeDriver.' 'manage_existing') @mock.patch('cinder.volume.drivers.lvm.LVMVolumeDriver.' @@ -4815,7 +4622,7 @@ class VolumeTestCase(BaseVolumeTestCase): @ddt.ddt -class VolumeMigrationTestCase(BaseVolumeTestCase): +class VolumeMigrationTestCase(base.BaseVolumeTestCase): def setUp(self): super(VolumeMigrationTestCase, self).setUp() @@ -5631,7 +5438,7 @@ class VolumeMigrationTestCase(BaseVolumeTestCase): self.context, volume.id) -class ReplicationTestCase(BaseVolumeTestCase): +class ReplicationTestCase(base.BaseVolumeTestCase): @mock.patch.object(volume_rpcapi.VolumeAPI, 'failover_host') @mock.patch.object(cinder.db, 'conditional_update') @@ -5734,7 +5541,7 @@ class ReplicationTestCase(BaseVolumeTestCase): host=CONF.host) -class CopyVolumeToImageTestCase(BaseVolumeTestCase): +class CopyVolumeToImageTestCase(base.BaseVolumeTestCase): def fake_local_path(self, volume): return self.dst_path @@ -6008,7 +5815,7 @@ class CopyVolumeToImageTestCase(BaseVolumeTestCase): self.assertTrue(mock_delete.called) -class GetActiveByWindowTestCase(BaseVolumeTestCase): +class GetActiveByWindowTestCase(base.BaseVolumeTestCase): def setUp(self): super(GetActiveByWindowTestCase, self).setUp() self.ctx = context.get_admin_context(read_deleted="yes") @@ -6761,7 +6568,7 @@ class VolumePolicyTestCase(test.TestCase): target) -class ImageVolumeCacheTestCase(BaseVolumeTestCase): +class ImageVolumeCacheTestCase(base.BaseVolumeTestCase): def setUp(self): super(ImageVolumeCacheTestCase, self).setUp() @@ -6833,7 +6640,7 @@ class ImageVolumeCacheTestCase(BaseVolumeTestCase): @ddt.ddt -class DiscardFlagTestCase(BaseVolumeTestCase): +class DiscardFlagTestCase(base.BaseVolumeTestCase): def setUp(self): super(DiscardFlagTestCase, self).setUp() diff --git a/cinder/tests/unit/test_volume_cleanup.py b/cinder/tests/unit/test_volume_cleanup.py new file mode 100644 index 
00000000000..6e67e7b7424 --- /dev/null +++ b/cinder/tests/unit/test_volume_cleanup.py @@ -0,0 +1,177 @@ +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +import mock +from oslo_config import cfg + +from cinder import context +from cinder import db +from cinder import exception +from cinder import objects +from cinder.objects import fields +from cinder import service +from cinder.tests.unit.api import fakes +from cinder.tests.unit import utils as tests_utils +from cinder.tests.unit import volume as base + + +CONF = cfg.CONF + + +class VolumeCleanupTestCase(base.BaseVolumeTestCase): + MOCK_WORKER = False + + def setUp(self): + super(VolumeCleanupTestCase, self).setUp() + self.service_id = 1 + self.mock_object(service.Service, 'service_id', self.service_id) + self.patch('cinder.volume.utils.clear_volume', autospec=True) + + def _assert_workers_are_removed(self): + workers = db.worker_get_all(self.context, read_deleted='yes') + self.assertListEqual([], workers) + + def test_init_host_clears_uploads_available_volume(self): + """init_host will clean an available volume stuck in uploading.""" + volume = tests_utils.create_volume(self.context, status='uploading', + size=0, host=CONF.host) + + db.worker_create(self.context, resource_type='Volume', + resource_id=volume.id, status=volume.status, + service_id=self.service_id) + + self.volume.init_host(service_id=service.Service.service_id) + volume.refresh() + self.assertEqual("available", volume.status) + self._assert_workers_are_removed() + + @mock.patch('cinder.manager.CleanableManager.init_host') + def test_init_host_clears_uploads_in_use_volume(self, init_host_mock): + """init_host will clean an in-use volume stuck in uploading.""" + volume = tests_utils.create_volume(self.context, status='uploading', + size=0, host=CONF.host) + + db.worker_create(self.context, resource_type='Volume', + resource_id=volume.id, status=volume.status, + service_id=self.service_id) + + fake_uuid = fakes.get_fake_uuid() + tests_utils.attach_volume(self.context, volume.id, fake_uuid, + 'fake_host', '/dev/vda') + self.volume.init_host(service_id=mock.sentinel.service_id) + init_host_mock.assert_called_once_with( + service_id=mock.sentinel.service_id, added_to_cluster=None) + volume.refresh() + self.assertEqual("in-use", volume.status) + self._assert_workers_are_removed() + + def test_init_host_clears_downloads(self): + """Test that init_host will unwedge a volume stuck in downloading.""" + volume = tests_utils.create_volume(self.context, status='downloading', + size=0, host=CONF.host) + db.worker_create(self.context, resource_type='Volume', + resource_id=volume.id, status=volume.status, + service_id=self.service_id) + mock_clear = self.mock_object(self.volume.driver, 'clear_download') + + self.volume.init_host(service_id=service.Service.service_id) + self.assertEqual(1, mock_clear.call_count) + 
self.assertEqual(volume.id, mock_clear.call_args[0][1].id) + volume.refresh() + self.assertEqual("error", volume['status']) + + self.volume.delete_volume(self.context, volume=volume) + self._assert_workers_are_removed() + + def test_init_host_resumes_deletes(self): + """init_host will resume deleting volume in deleting status.""" + volume = tests_utils.create_volume(self.context, status='deleting', + size=0, host=CONF.host) + + db.worker_create(self.context, resource_type='Volume', + resource_id=volume.id, status=volume.status, + service_id=self.service_id) + + self.volume.init_host(service_id=service.Service.service_id) + + self.assertRaises(exception.VolumeNotFound, db.volume_get, + context.get_admin_context(), volume.id) + self._assert_workers_are_removed() + + def test_create_volume_fails_with_creating_and_downloading_status(self): + """Test init_host_with_service in case of volume. + + While the status of volume is 'creating' or 'downloading', + volume process down. + After process restarting this 'creating' status is changed to 'error'. + """ + for status in ('creating', 'downloading'): + volume = tests_utils.create_volume(self.context, status=status, + size=0, host=CONF.host) + + db.worker_create(self.context, resource_type='Volume', + resource_id=volume.id, status=volume.status, + service_id=self.service_id) + + self.volume.init_host(service_id=service.Service.service_id) + volume.refresh() + + self.assertEqual('error', volume['status']) + self.volume.delete_volume(self.context, volume) + self._assert_workers_are_removed() + + def test_create_snapshot_fails_with_creating_status(self): + """Test init_host_with_service in case of snapshot. + + While the status of snapshot is 'creating', volume process + down. After process restarting this 'creating' status is + changed to 'error'. 
+ """ + volume = tests_utils.create_volume(self.context, + **self.volume_params) + snapshot = tests_utils.create_snapshot( + self.context, + volume.id, + status=fields.SnapshotStatus.CREATING) + db.worker_create(self.context, resource_type='Snapshot', + resource_id=snapshot.id, status=snapshot.status, + service_id=self.service_id) + + self.volume.init_host(service_id=service.Service.service_id) + + snapshot_obj = objects.Snapshot.get_by_id(self.context, snapshot.id) + + self.assertEqual(fields.SnapshotStatus.ERROR, snapshot_obj.status) + self.assertEqual(service.Service.service_id, + self.volume.service_id) + self._assert_workers_are_removed() + + self.volume.delete_snapshot(self.context, snapshot_obj) + self.volume.delete_volume(self.context, volume) + + def test_init_host_clears_deleting_snapshots(self): + """Test that init_host will delete a snapshot stuck in deleting.""" + volume = tests_utils.create_volume(self.context, status='deleting', + size=1, host=CONF.host) + snapshot = tests_utils.create_snapshot(self.context, + volume.id, status='deleting') + + db.worker_create(self.context, resource_type='Volume', + resource_id=volume.id, status=volume.status, + service_id=self.service_id) + + self.volume.init_host(service_id=self.service_id) + self.assertRaises(exception.VolumeNotFound, volume.refresh) + self.assertRaises(exception.SnapshotNotFound, snapshot.refresh) diff --git a/cinder/tests/unit/volume/__init__.py b/cinder/tests/unit/volume/__init__.py index e69de29bb2d..1fa62ef5ad4 100644 --- a/cinder/tests/unit/volume/__init__.py +++ b/cinder/tests/unit/volume/__init__.py @@ -0,0 +1,150 @@ +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
+import os +import shutil +import tempfile + +import mock +from oslo_config import cfg +from oslo_utils import importutils +from stevedore import extension + +from cinder.brick.local_dev import lvm as brick_lvm +from cinder import context +from cinder.image import image_utils +from cinder import objects +from cinder import test +from cinder.tests.unit import fake_constants as fake +from cinder.tests.unit.image import fake as fake_image +from cinder.tests.unit import utils as tests_utils +from cinder.volume import api as volume_api +from cinder.volume import configuration as conf + + +CONF = cfg.CONF + + +class BaseVolumeTestCase(test.TestCase): + """Test Case for volumes.""" + + FAKE_UUID = fake.IMAGE_ID + + def setUp(self, *args, **kwargs): + super(BaseVolumeTestCase, self).setUp(*args, **kwargs) + self.extension_manager = extension.ExtensionManager( + "BaseVolumeTestCase") + vol_tmpdir = tempfile.mkdtemp() + self.flags(volumes_dir=vol_tmpdir) + self.addCleanup(self._cleanup) + self.volume = importutils.import_object(CONF.volume_manager) + self.volume.message_api = mock.Mock() + self.configuration = mock.Mock(conf.Configuration) + self.context = context.get_admin_context() + self.context.user_id = fake.USER_ID + # NOTE(mriedem): The id is hard-coded here for tracking race fail + # assertions with the notification code, it's part of an + # elastic-recheck query so don't remove it or change it. + self.project_id = '7f265bd4-3a85-465e-a899-5dc4854a86d3' + self.context.project_id = self.project_id + self.volume_params = { + 'status': 'creating', + 'host': CONF.host, + 'size': 1} + self.mock_object(brick_lvm.LVM, + 'get_all_volume_groups', + self.fake_get_all_volume_groups) + fake_image.mock_image_service(self) + self.mock_object(brick_lvm.LVM, '_vg_exists', lambda x: True) + self.mock_object(os.path, 'exists', lambda x: True) + self.volume.driver.set_initialized() + self.volume.stats = {'allocated_capacity_gb': 0, + 'pools': {}} + # keep ordered record of what we execute + self.called = [] + self.volume_api = volume_api.API() + + def _cleanup(self): + try: + shutil.rmtree(CONF.volumes_dir) + except OSError: + pass + + def fake_get_all_volume_groups(obj, vg_name=None, no_suffix=True): + return [{'name': 'cinder-volumes', + 'size': '5.00', + 'available': '2.50', + 'lv_count': '2', + 'uuid': 'vR1JU3-FAKE-C4A9-PQFh-Mctm-9FwA-Xwzc1m'}] + + @mock.patch('cinder.image.image_utils.TemporaryImages.fetch') + @mock.patch('cinder.volume.flows.manager.create_volume.' + 'CreateVolumeFromSpecTask._clone_image_volume') + def _create_volume_from_image(self, mock_clone_image_volume, + mock_fetch_img, + fakeout_copy_image_to_volume=False, + fakeout_clone_image=False, + clone_image_volume=False): + """Test function of create_volume_from_image. + + Test cases call this function to create a volume from image, caller + can choose whether to fake out copy_image_to_volume and clone_image, + after calling this, test cases should check status of the volume. 
+ """ + def fake_local_path(volume): + return dst_path + + def fake_copy_image_to_volume(context, volume, + image_service, image_id): + pass + + def fake_fetch_to_raw(ctx, image_service, image_id, path, blocksize, + size=None, throttle=None): + pass + + def fake_clone_image(ctx, volume_ref, + image_location, image_meta, + image_service): + return {'provider_location': None}, True + + dst_fd, dst_path = tempfile.mkstemp() + os.close(dst_fd) + self.mock_object(self.volume.driver, 'local_path', fake_local_path) + if fakeout_clone_image: + self.mock_object(self.volume.driver, 'clone_image', + fake_clone_image) + self.mock_object(image_utils, 'fetch_to_raw', fake_fetch_to_raw) + if fakeout_copy_image_to_volume: + self.mock_object(self.volume.driver, 'copy_image_to_volume', + fake_copy_image_to_volume) + mock_clone_image_volume.return_value = ({}, clone_image_volume) + mock_fetch_img.return_value = mock.MagicMock( + spec=tests_utils.get_file_spec()) + + image_id = 'c905cedb-7281-47e4-8a62-f26bc5fc4c77' + volume = tests_utils.create_volume(self.context, **self.volume_params) + # creating volume testdata + try: + request_spec = { + 'volume_properties': self.volume_params, + 'image_id': image_id, + } + self.volume.create_volume(self.context, volume, request_spec) + finally: + # cleanup + os.unlink(dst_path) + volume = objects.Volume.get_by_id(self.context, volume.id) + + return volume diff --git a/cinder/tests/unit/volume/test_manage_volume.py b/cinder/tests/unit/volume/test_manage_volume.py index 0118ead684e..984bb8c6c7d 100644 --- a/cinder/tests/unit/volume/test_manage_volume.py +++ b/cinder/tests/unit/volume/test_manage_volume.py @@ -14,13 +14,12 @@ import mock -from cinder.tests.unit import fake_constants as fake -from cinder.tests.unit import fake_volume -from cinder.tests.unit import test_volume - from cinder import context from cinder import exception from cinder import objects +from cinder.tests.unit import fake_constants as fake +from cinder.tests.unit import fake_volume +from cinder.tests.unit import volume as base from cinder.volume.flows.manager import manage_existing from cinder.volume import manager from cinder.volume import utils @@ -29,7 +28,7 @@ FAKE_HOST_POOL = 'volPool' FAKE_HOST = 'hostname@backend' -class ManageVolumeTestCase(test_volume.BaseVolumeTestCase): +class ManageVolumeTestCase(base.BaseVolumeTestCase): def setUp(self): super(ManageVolumeTestCase, self).setUp() diff --git a/cinder/volume/manager.py b/cinder/volume/manager.py index ef6f560945f..80bb31c6f0d 100644 --- a/cinder/volume/manager.py +++ b/cinder/volume/manager.py @@ -152,7 +152,8 @@ MAPPING = { } -class VolumeManager(manager.SchedulerDependentManager): +class VolumeManager(manager.CleanableManager, + manager.SchedulerDependentManager): """Manages attachable block storage devices.""" RPC_API_VERSION = volume_rpcapi.VolumeAPI.RPC_API_VERSION @@ -192,7 +193,7 @@ class VolumeManager(manager.SchedulerDependentManager): LOG.debug("Cinder Volume DB check: vol_db_empty=%s", vol_db_empty) # We pass the current setting for service.active_backend_id to - # the driver on init, incase there was a restart or something + # the driver on init, in case there was a restart or something curr_active_backend_id = None svc_host = vol_utils.extract_host(self.host, 'backend') try: @@ -317,9 +318,9 @@ class VolumeManager(manager.SchedulerDependentManager): def _sync_provider_info(self, ctxt, volumes, snapshots): # NOTE(jdg): For now this just updates provider_id, we can add more - # add more items to the update if they're relevant but 
we need - # to be safe in what we allow and add a list of allowed keys - # things that make sense are provider_*, replication_status etc + # items to the update if they're relevant but we need to be safe in + # what we allow and add a list of allowed keys. Things that make sense + # are provider_*, replication_status etc updates, snapshot_updates = self.driver.update_provider_info( volumes, snapshots) @@ -370,7 +371,7 @@ class VolumeManager(manager.SchedulerDependentManager): {'num_vols': num_vols, 'num_cgs': num_cgs, 'host': self.host, 'cluster': self.cluster}) - def init_host(self, added_to_cluster=None): + def init_host(self, added_to_cluster=None, **kwargs): """Perform any required initialization.""" ctxt = context.get_admin_context() if not self.driver.supported: @@ -407,14 +408,19 @@ class VolumeManager(manager.SchedulerDependentManager): # Initialize backend capabilities list self.driver.init_capabilities() - volumes = objects.VolumeList.get_all_by_host(ctxt, self.host) - snapshots = objects.SnapshotList.get_by_host(ctxt, self.host) + if self.cluster: + filters = {'cluster_name': self.cluster} + else: + filters = {'host': self.host} + volumes = objects.VolumeList.get_all(ctxt, filters=filters) + snapshots = objects.SnapshotList.get_all(ctxt, search_opts=filters) self._sync_provider_info(ctxt, volumes, snapshots) # FIXME volume count for exporting is wrong + self.stats['pools'] = {} + self.stats.update({'allocated_capacity_gb': 0}) + try: - self.stats['pools'] = {} - self.stats.update({'allocated_capacity_gb': 0}) for volume in volumes: # available volume should also be counted into allocated if volume['status'] in ['in-use', 'available']: @@ -428,32 +434,10 @@ class VolumeManager(manager.SchedulerDependentManager): LOG.exception(_LE("Failed to re-export volume, " "setting to ERROR."), resource=volume) - volume.status = 'error' - volume.save() - elif volume['status'] in ('downloading', 'creating'): - LOG.warning(_LW("Detected volume stuck " - "in %(curr_status)s " - "status, setting to ERROR."), - {'curr_status': volume['status']}, - resource=volume) + volume.conditional_update({'status': 'error'}, + {'status': 'in-use'}) + # All other cleanups are processed by parent class CleanableManager - if volume['status'] == 'downloading': - self.driver.clear_download(ctxt, volume) - volume.status = 'error' - volume.save() - elif volume.status == 'uploading': - # Set volume status to available or in-use. - self.db.volume_update_status_based_on_attachment( - ctxt, volume.id) - else: - pass - snapshots = objects.SnapshotList.get_by_host( - ctxt, self.host, {'status': fields.SnapshotStatus.CREATING}) - for snapshot in snapshots: - LOG.warning(_LW("Detected snapshot stuck in creating " - "status, setting to ERROR."), resource=snapshot) - snapshot.status = fields.SnapshotStatus.ERROR - snapshot.save() except Exception: LOG.exception(_LE("Error during re-export on driver init."), resource=volume) @@ -466,26 +450,16 @@ class VolumeManager(manager.SchedulerDependentManager): # that an entry exists in the service table self.driver.set_initialized() - for volume in volumes: - if volume['status'] == 'deleting': - if CONF.volume_service_inithost_offload: - # Offload all the pending volume delete operations to the - # threadpool to prevent the main volume service thread - # from being blocked. 
- self._add_to_threadpool(self.delete_volume, ctxt, volume, - cascade=True) - else: - # By default, delete volumes sequentially - self.delete_volume(ctxt, volume, cascade=True) - LOG.info(_LI("Resume volume delete completed successfully."), - resource=volume) - # collect and publish service capabilities self.publish_service_capabilities(ctxt) LOG.info(_LI("Driver initialization completed successfully."), resource={'type': 'driver', 'id': self.driver.__class__.__name__}) + # Make sure to call CleanableManager to do the cleanup + super(VolumeManager, self).init_host(added_to_cluster=added_to_cluster, + **kwargs) + def init_host_with_rpc(self): LOG.info(_LI("Initializing RPC dependent components of volume " "driver %(driver_name)s (%(version)s)"), @@ -527,6 +501,37 @@ class VolumeManager(manager.SchedulerDependentManager): resource={'type': 'driver', 'id': self.driver.__class__.__name__}) + def _do_cleanup(self, ctxt, vo_resource): + if isinstance(vo_resource, objects.Volume): + if vo_resource.status == 'downloading': + self.driver.clear_download(ctxt, vo_resource) + + elif vo_resource.status == 'uploading': + # Set volume status to available or in-use. + self.db.volume_update_status_based_on_attachment( + ctxt, vo_resource.id) + + elif vo_resource.status == 'deleting': + if CONF.volume_service_inithost_offload: + # Offload all the pending volume delete operations to the + # threadpool to prevent the main volume service thread + # from being blocked. + self._add_to_threadpool(self.delete_volume, ctxt, + vo_resource, cascade=True) + else: + # By default, delete volumes sequentially + self.delete_volume(ctxt, vo_resource, cascade=True) + # We signal that we take care of cleaning the worker ourselves + # (with set_workers decorator in delete_volume method) so + # do_cleanup method doesn't need to remove it. + return True + + # For Volume creating and downloading and for Snapshot downloading + # statuses we have to set status to error + if vo_resource.status in ('creating', 'downloading'): + vo_resource.status = 'error' + vo_resource.save() + def is_working(self): """Return if Manager is ready to accept requests. @@ -536,6 +541,7 @@ class VolumeManager(manager.SchedulerDependentManager): """ return self.driver.initialized + @objects.Volume.set_workers def create_volume(self, context, volume, request_spec=None, filter_properties=None, allow_reschedule=True): """Creates the volume.""" @@ -627,6 +633,7 @@ class VolumeManager(manager.SchedulerDependentManager): return volume.id @coordination.synchronized('{volume.id}-{f_name}') + @objects.Volume.set_workers def delete_volume(self, context, volume, unmanage_only=False, cascade=False): """Deletes and unexports volume. 
@@ -786,6 +793,7 @@ class VolumeManager(manager.SchedulerDependentManager): volume_ref.status = status volume_ref.save() + @objects.Snapshot.set_workers def create_snapshot(self, context, snapshot): """Creates and exports the snapshot.""" context = context.elevated() diff --git a/cinder/volume/rpcapi.py b/cinder/volume/rpcapi.py index 7c55fd03894..01b7e7dc309 100644 --- a/cinder/volume/rpcapi.py +++ b/cinder/volume/rpcapi.py @@ -164,6 +164,7 @@ class VolumeAPI(rpc.RPCAPI): volume=volume) def delete_volume(self, ctxt, volume, unmanage_only=False, cascade=False): + volume.create_worker() cctxt = self._get_cctxt(volume.host) msg_args = { 'volume': volume, 'unmanage_only': unmanage_only, @@ -173,6 +174,7 @@ class VolumeAPI(rpc.RPCAPI): cctxt.cast(ctxt, 'delete_volume', **msg_args) def create_snapshot(self, ctxt, volume, snapshot): + snapshot.create_worker() cctxt = self._get_cctxt(volume['host']) cctxt.cast(ctxt, 'create_snapshot', snapshot=snapshot)
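
Note (not part of the patch): the hunks above spread the worker lifecycle across rpcapi.py, manager.py and the cleanable objects, so the flow can be hard to follow from the diff alone. The toy sketch below illustrates that lifecycle under stated assumptions: WorkerRegistry, set_worker, fake_delete_volume and cleanup_on_restart are invented names for illustration only; the real code uses the workers DB table (db.worker_create/worker_destroy), the create_worker()/set_workers machinery on cleanable objects, and CleanableManager.init_host calling _do_cleanup.

import functools


class WorkerRegistry(object):
    """Toy stand-in for the workers table: resource_id -> status/service."""

    def __init__(self):
        self.entries = {}

    def create(self, resource_id, status, service_id):
        # Mirrors worker_create/create_worker: record who works on what.
        self.entries[resource_id] = {'status': status,
                                     'service_id': service_id}

    def destroy(self, resource_id):
        # Mirrors worker_destroy: drop the entry once the operation is done.
        self.entries.pop(resource_id, None)

    def stale_for(self, service_id):
        # Entries left behind by this service when it died mid-operation.
        return [rid for rid, entry in self.entries.items()
                if entry['service_id'] == service_id]


REGISTRY = WorkerRegistry()


def set_worker(fn):
    """Toy analogue of the set_workers decorator: drop the entry on success."""
    @functools.wraps(fn)
    def wrapper(service_id, resource_id, status):
        result = fn(service_id, resource_id, status)
        # Operation finished, so the tracking entry is no longer needed.
        REGISTRY.destroy(resource_id)
        return result
    return wrapper


@set_worker
def fake_delete_volume(service_id, resource_id, status):
    # Stand-in for VolumeManager.delete_volume doing the actual work.
    print('service %s deleted volume %s (was %s)'
          % (service_id, resource_id, status))


def cleanup_on_restart(service_id):
    """Toy analogue of CleanableManager.init_host resuming leftover work."""
    for resource_id in REGISTRY.stale_for(service_id):
        status = REGISTRY.entries[resource_id]['status']
        if status == 'deleting':
            # Interrupted deletes are resumed rather than flagged as error.
            fake_delete_volume(service_id, resource_id, status)
        else:
            # Anything else stuck mid-operation is simply set to error.
            print('volume %s set to error' % resource_id)
            REGISTRY.destroy(resource_id)


if __name__ == '__main__':
    # The API side records the worker before casting the RPC, like
    # delete_volume() in rpcapi.py calling volume.create_worker().
    REGISTRY.create('vol-1', 'deleting', service_id=1)
    # Simulate the c-vol service dying and coming back: cleanup only picks
    # up entries owned by this service and leaves other hosts' work alone.
    cleanup_on_restart(service_id=1)
    assert REGISTRY.entries == {}

The per-service ownership in stale_for() is the point of the change: because each host only cleans up worker entries it created, several c-vol services can share the same resources without stepping on each other's in-flight operations on restart.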