Add cleanable base object and cleanup request VO
This patch adds CinderCleanableObject class that is a Versioned Object base class and CleanupRequest Versioned Object that will be used to pass cleanup requests to c-vol and c-bak nodes but will not have a DB representation. This will be used for non Active-Active configurations as well. Specs: https://review.openstack.org/236977 Implements: blueprint cinder-volume-active-active-support Change-Id: Ia84b2f55a782c5e881bab03a8469b884f265910c
This commit is contained in:
parent
852a66d8d0
commit
95170e54b2
@ -444,6 +444,11 @@ class WorkerExists(Duplicate):
|
||||
message = _("Worker for %(type)s %(id)s already exists.")
|
||||
|
||||
|
||||
class CleanableInUse(Invalid):
|
||||
message = _('%(type)s with id %(id)s is already being cleaned up or '
|
||||
'another host has taken over it.')
|
||||
|
||||
|
||||
class ClusterNotFound(NotFound):
|
||||
message = _('Cluster %(id)s could not be found.')
|
||||
|
||||
|
@ -25,6 +25,8 @@ def register_all():
|
||||
# function in order for it to be registered by services that may
|
||||
# need to receive it via RPC.
|
||||
__import__('cinder.objects.backup')
|
||||
# NOTE(geguileo): Don't include cleanable to prevent circular imports
|
||||
__import__('cinder.objects.cleanup_request')
|
||||
__import__('cinder.objects.cgsnapshot')
|
||||
__import__('cinder.objects.cluster')
|
||||
__import__('cinder.objects.consistencygroup')
|
||||
|
@ -117,6 +117,7 @@ OBJ_VERSIONS.add('1.10', {'Group': '1.0', 'GroupList': '1.0', 'Volume': '1.5',
|
||||
OBJ_VERSIONS.add('1.11', {'GroupSnapshot': '1.0', 'GroupSnapshotList': '1.0',
|
||||
'Group': '1.1'})
|
||||
OBJ_VERSIONS.add('1.12', {'VolumeType': '1.3'})
|
||||
OBJ_VERSIONS.add('1.13', {'CleanupRequest': '1.0'})
|
||||
|
||||
|
||||
class CinderObjectRegistry(base.VersionedObjectRegistry):
|
||||
|
218
cinder/objects/cleanable.py
Normal file
218
cinder/objects/cleanable.py
Normal file
@ -0,0 +1,218 @@
|
||||
# Copyright (c) 2016 Red Hat, Inc.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from functools import wraps
|
||||
import inspect
|
||||
|
||||
from oslo_utils import versionutils
|
||||
|
||||
from cinder import db
|
||||
from cinder import exception
|
||||
from cinder.objects import base
|
||||
from cinder import service
|
||||
from cinder.volume import rpcapi as vol_rpcapi
|
||||
|
||||
|
||||
class CinderCleanableObject(base.CinderPersistentObject):
|
||||
"""Base class for cleanable OVO resources."""
|
||||
worker = None
|
||||
|
||||
@classmethod
|
||||
def get_rpc_api(cls):
|
||||
# By default assume all resources are handled by c-vol services
|
||||
return vol_rpcapi.VolumeAPI
|
||||
|
||||
@classmethod
|
||||
def get_pinned_version(cls):
|
||||
# We pin the version by the last service that gets updated, which is
|
||||
# c-vol or c-bak
|
||||
min_obj_vers_str = cls.get_rpc_api().determine_obj_version_cap()
|
||||
|
||||
# Get current pinned down version for this object
|
||||
version = base.OBJ_VERSIONS[min_obj_vers_str][cls.__name__]
|
||||
return versionutils.convert_version_to_int(version)
|
||||
|
||||
@staticmethod
|
||||
def _is_cleanable(status, obj_version):
|
||||
"""Check if a specific status for a specific OBJ version is cleanable.
|
||||
|
||||
Each CinderCleanableObject class should implement this method and
|
||||
return True for cleanable status for versions equal or higher to the
|
||||
ones where the functionality was added.
|
||||
|
||||
:returns: Whether to create a workers DB entry or not
|
||||
:param obj_version: Min object version running in the cloud or None if
|
||||
current version.
|
||||
:type obj_version: float
|
||||
"""
|
||||
return False
|
||||
|
||||
def is_cleanable(self, pinned=False):
|
||||
"""Check if cleanable VO status is cleanable.
|
||||
|
||||
:param pinned: If we should check against pinned version or current
|
||||
version.
|
||||
:type pinned_version: bool
|
||||
:returns: Whether this needs a workers DB entry or not
|
||||
"""
|
||||
if pinned:
|
||||
obj_version = self.get_pinned_version()
|
||||
else:
|
||||
obj_version = None
|
||||
return self._is_cleanable(self.status, obj_version)
|
||||
|
||||
def create_worker(self, pinned=True):
|
||||
"""Create a worker entry at the API."""
|
||||
# This method is mostly called from the rpc layer, therefore it checks
|
||||
# if it's cleanable given current pinned version.
|
||||
if not self.is_cleanable(pinned):
|
||||
return False
|
||||
|
||||
resource_type = self.__class__.__name__
|
||||
|
||||
entry_in_db = False
|
||||
|
||||
# This will only loop on very rare race conditions
|
||||
while not entry_in_db:
|
||||
try:
|
||||
# On the common case there won't be an entry in the DB, that's
|
||||
# why we try to create first.
|
||||
db.worker_create(self._context, status=self.status,
|
||||
resource_type=resource_type,
|
||||
resource_id=self.id)
|
||||
entry_in_db = True
|
||||
except exception.WorkerExists:
|
||||
try:
|
||||
db.worker_update(self._context, None,
|
||||
filters={'resource_type': resource_type,
|
||||
'resource_id': self.id},
|
||||
service_id=None,
|
||||
status=self.status)
|
||||
entry_in_db = True
|
||||
except exception.WorkerNotFound:
|
||||
pass
|
||||
return entry_in_db
|
||||
|
||||
def set_worker(self):
|
||||
worker = self.worker
|
||||
|
||||
service_id = service.Service.service_id
|
||||
resource_type = self.__class__.__name__
|
||||
|
||||
if worker:
|
||||
if worker.cleaning:
|
||||
return
|
||||
else:
|
||||
try:
|
||||
worker = db.worker_get(self._context,
|
||||
resource_type=resource_type,
|
||||
resource_id=self.id)
|
||||
except exception.WorkerNotFound:
|
||||
# If the call didn't come from an RPC call we still have to
|
||||
# create the entry in the DB.
|
||||
try:
|
||||
self.worker = db.worker_create(self._context,
|
||||
status=self.status,
|
||||
resource_type=resource_type,
|
||||
resource_id=self.id,
|
||||
service_id=service_id)
|
||||
return
|
||||
except exception.WorkerExists:
|
||||
# If 2 cleanable operations are competing for this resource
|
||||
# and the other one created the entry first that one won
|
||||
raise exception.CleanableInUse(type=resource_type,
|
||||
id=self.id)
|
||||
|
||||
# If we have to claim this work or if the status has changed we have
|
||||
# to update DB.
|
||||
if (worker.service_id != service_id or worker.status != self.status):
|
||||
try:
|
||||
db.worker_update(self._context, worker.id,
|
||||
filters={'service_id': worker.service_id,
|
||||
'status': worker.status,
|
||||
'updated_at': worker.updated_at},
|
||||
service_id=service_id,
|
||||
status=self.status,
|
||||
orm_worker=worker)
|
||||
except exception.WorkerNotFound:
|
||||
self.worker = None
|
||||
raise exception.CleanableInUse(type=self.__class__.__name__,
|
||||
id=self.id)
|
||||
self.worker = worker
|
||||
|
||||
def unset_worker(self):
|
||||
if self.worker:
|
||||
db.worker_destroy(self._context, id=self.worker.id,
|
||||
status=self.worker.status,
|
||||
service_id=self.worker.service_id)
|
||||
self.worker = None
|
||||
|
||||
@staticmethod
|
||||
def set_workers(*decorator_args):
|
||||
"""Decorator that adds worker DB rows for cleanable versioned objects.
|
||||
|
||||
By default will take care of all cleanable objects, but we can limit
|
||||
which objects we want by passing the name of the arguments we want
|
||||
to be added.
|
||||
"""
|
||||
def _decorator(f):
|
||||
@wraps(f)
|
||||
def wrapper(*args, **kwargs):
|
||||
if decorator_args:
|
||||
call_args = inspect.getcallargs(f, *args, **kwargs)
|
||||
candidates = [call_args[obj] for obj in decorator_args]
|
||||
else:
|
||||
candidates = list(args)
|
||||
candidates.extend(kwargs.values())
|
||||
cleanables = [cand for cand in candidates
|
||||
if (isinstance(cand, CinderCleanableObject)
|
||||
and cand.is_cleanable(pinned=False))]
|
||||
try:
|
||||
# Create the entries in the workers table
|
||||
for cleanable in cleanables:
|
||||
cleanable.set_worker()
|
||||
|
||||
# Call the function
|
||||
result = f(*args, **kwargs)
|
||||
finally:
|
||||
# Remove entries from the workers table
|
||||
for cleanable in cleanables:
|
||||
# NOTE(geguileo): We check that the status has changed
|
||||
# to avoid removing the worker entry when we finished
|
||||
# the operation due to an unexpected exception and also
|
||||
# when this process stops because the main process has
|
||||
# stopped.
|
||||
if (cleanable.worker and
|
||||
cleanable.status != cleanable.worker.status):
|
||||
try:
|
||||
cleanable.unset_worker()
|
||||
except Exception:
|
||||
pass
|
||||
return result
|
||||
return wrapper
|
||||
|
||||
# If we don't have optional decorator arguments the argument in
|
||||
# decorator_args is the function we have to decorate
|
||||
if len(decorator_args) == 1 and callable(decorator_args[0]):
|
||||
function = decorator_args[0]
|
||||
decorator_args = None
|
||||
return _decorator(function)
|
||||
return _decorator
|
||||
|
||||
def refresh(self):
|
||||
# We want to keep the worker entry on refresh
|
||||
worker = self.worker
|
||||
super(CinderCleanableObject, self).refresh()
|
||||
self.worker = worker
|
49
cinder/objects/cleanup_request.py
Normal file
49
cinder/objects/cleanup_request.py
Normal file
@ -0,0 +1,49 @@
|
||||
# Copyright (c) 2016 Red Hat, Inc.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from oslo_versionedobjects import fields
|
||||
|
||||
from cinder.objects import base
|
||||
|
||||
|
||||
@base.CinderObjectRegistry.register
|
||||
class CleanupRequest(base.CinderObject, base.ClusteredObject):
|
||||
"""Versioned Object to send cleanup requests."""
|
||||
# Version 1.0: Initial version
|
||||
VERSION = '1.0'
|
||||
|
||||
fields = {
|
||||
'service_id': fields.IntegerField(nullable=True),
|
||||
'cluster_name': fields.StringField(nullable=True),
|
||||
'host': fields.StringField(nullable=True),
|
||||
'binary': fields.StringField(nullable=True),
|
||||
'is_up': fields.BooleanField(default=False, nullable=True),
|
||||
'disabled': fields.BooleanField(nullable=True),
|
||||
'resource_id': fields.UUIDField(nullable=True),
|
||||
'resource_type': fields.StringField(nullable=True),
|
||||
'until': fields.DateTimeField(nullable=True),
|
||||
}
|
||||
|
||||
def __init__(self, context=None, **kwargs):
|
||||
super(CleanupRequest, self).__init__(**kwargs)
|
||||
|
||||
# Set non initialized fields with default or None values
|
||||
for field_name in self.fields:
|
||||
if not self.obj_attr_is_set(field_name):
|
||||
field = self.fields[field_name]
|
||||
if field.default != fields.UnspecifiedDefault:
|
||||
setattr(self, field_name, field.default)
|
||||
elif field.nullable:
|
||||
setattr(self, field_name, None)
|
@ -122,6 +122,9 @@ class Service(service.Service):
|
||||
it state to the database services table.
|
||||
"""
|
||||
|
||||
# Make service_id a class attribute so it can be used for clean up
|
||||
service_id = None
|
||||
|
||||
def __init__(self, host, binary, topic, manager, report_interval=None,
|
||||
periodic_interval=None, periodic_fuzzy_delay=None,
|
||||
service_name=None, coordination=False, cluster=None, *args,
|
||||
@ -186,7 +189,7 @@ class Service(service.Service):
|
||||
self._ensure_cluster_exists(ctxt, service_ref.disabled)
|
||||
service_ref.cluster_name = cluster
|
||||
service_ref.save()
|
||||
self.service_id = service_ref.id
|
||||
Service.service_id = service_ref.id
|
||||
except exception.NotFound:
|
||||
# We don't want to include cluster information on the service or
|
||||
# create the cluster entry if we are upgrading.
|
||||
@ -332,7 +335,7 @@ class Service(service.Service):
|
||||
kwargs['cluster_name'] = self.cluster
|
||||
service_ref = objects.Service(context=context, **kwargs)
|
||||
service_ref.create()
|
||||
self.service_id = service_ref.id
|
||||
Service.service_id = service_ref.id
|
||||
# TODO(geguileo): In O unconditionally ensure that the cluster exists
|
||||
if not self.is_upgrading_to_n:
|
||||
self._ensure_cluster_exists(context)
|
||||
@ -442,12 +445,14 @@ class Service(service.Service):
|
||||
zone = CONF.storage_availability_zone
|
||||
try:
|
||||
try:
|
||||
service_ref = objects.Service.get_by_id(ctxt, self.service_id)
|
||||
service_ref = objects.Service.get_by_id(ctxt,
|
||||
Service.service_id)
|
||||
except exception.NotFound:
|
||||
LOG.debug('The service database object disappeared, '
|
||||
'recreating it.')
|
||||
self._create_service_ref(ctxt)
|
||||
service_ref = objects.Service.get_by_id(ctxt, self.service_id)
|
||||
service_ref = objects.Service.get_by_id(ctxt,
|
||||
Service.service_id)
|
||||
|
||||
service_ref.report_count += 1
|
||||
if zone != service_ref.availability_zone:
|
||||
|
286
cinder/tests/unit/objects/test_cleanable.py
Normal file
286
cinder/tests/unit/objects/test_cleanable.py
Normal file
@ -0,0 +1,286 @@
|
||||
# Copyright (c) 2016 Red Hat, Inc.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import mock
|
||||
|
||||
from cinder import context
|
||||
from cinder import exception
|
||||
from cinder.objects import cleanable
|
||||
from cinder import rpc
|
||||
from cinder import service
|
||||
from cinder.tests.unit import objects as test_objects
|
||||
from cinder.volume import rpcapi
|
||||
|
||||
|
||||
# NOTE(geguileo): We use Backup because we have version changes from 1.0 to 1.3
|
||||
|
||||
class Backup(cleanable.CinderCleanableObject):
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(Backup, self).__init__(*args)
|
||||
for attr, value in kwargs.items():
|
||||
setattr(self, attr, value)
|
||||
|
||||
@staticmethod
|
||||
def _is_cleanable(status, obj_version):
|
||||
if obj_version and obj_version <= 1003:
|
||||
return False
|
||||
return status == 'cleanable'
|
||||
|
||||
|
||||
class TestCleanable(test_objects.BaseObjectsTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(TestCleanable, self).setUp()
|
||||
self.context = context.RequestContext(self.user_id, self.project_id,
|
||||
is_admin=True)
|
||||
|
||||
def test_get_rpc_api(self):
|
||||
"""Test get_rpc_api."""
|
||||
vol_rpcapi = cleanable.CinderCleanableObject.get_rpc_api()
|
||||
self.assertEqual(rpcapi.VolumeAPI, vol_rpcapi)
|
||||
|
||||
def test_get_pinned_version(self):
|
||||
"""Test that we get the pinned version for this specific object."""
|
||||
rpc.LAST_OBJ_VERSIONS[Backup.get_rpc_api().BINARY] = '1.0'
|
||||
version = Backup.get_pinned_version()
|
||||
self.assertEqual(1003, version)
|
||||
|
||||
def test_is_cleanable_pinned_pinned_too_old(self):
|
||||
"""Test is_cleanable with pinned version with uncleanable version."""
|
||||
rpc.LAST_OBJ_VERSIONS[Backup.get_rpc_api().BINARY] = '1.0'
|
||||
backup = Backup(status='cleanable')
|
||||
self.assertFalse(backup.is_cleanable(pinned=True))
|
||||
|
||||
def test_is_cleanable_pinned_result_true(self):
|
||||
"""Test with pinned version with cleanable version and status."""
|
||||
rpc.LAST_OBJ_VERSIONS[Backup.get_rpc_api().BINARY] = '1.3'
|
||||
backup = Backup(status='cleanable')
|
||||
self.assertTrue(backup.is_cleanable(pinned=True))
|
||||
|
||||
def test_is_cleanable_pinned_result_false(self):
|
||||
"""Test with pinned version with cleanable version but not status."""
|
||||
rpc.LAST_OBJ_VERSIONS[Backup.get_rpc_api().BINARY] = '1.3'
|
||||
backup = Backup(status='not_cleanable')
|
||||
self.assertFalse(backup.is_cleanable(pinned=True))
|
||||
|
||||
def test_is_cleanable_unpinned_result_false(self):
|
||||
"""Test unpinned version with old version and non cleanable status."""
|
||||
rpc.LAST_OBJ_VERSIONS[Backup.get_rpc_api().BINARY] = '1.0'
|
||||
backup = Backup(status='not_cleanable')
|
||||
self.assertFalse(backup.is_cleanable(pinned=False))
|
||||
|
||||
def test_is_cleanable_unpinned_result_true(self):
|
||||
"""Test unpinned version with old version and cleanable status."""
|
||||
rpc.LAST_OBJ_VERSIONS[Backup.get_rpc_api().BINARY] = '1.0'
|
||||
backup = Backup(status='cleanable')
|
||||
self.assertTrue(backup.is_cleanable(pinned=False))
|
||||
|
||||
@mock.patch('cinder.db.worker_create', autospec=True)
|
||||
def test_create_worker(self, mock_create):
|
||||
"""Test worker creation as if it were from an rpc call."""
|
||||
rpc.LAST_OBJ_VERSIONS[Backup.get_rpc_api().BINARY] = '1.3'
|
||||
mock_create.return_value = mock.sentinel.worker
|
||||
backup = Backup(_context=self.context, status='cleanable',
|
||||
id=mock.sentinel.id)
|
||||
res = backup.create_worker()
|
||||
self.assertTrue(res)
|
||||
mock_create.assert_called_once_with(self.context,
|
||||
status='cleanable',
|
||||
resource_type='Backup',
|
||||
resource_id=mock.sentinel.id)
|
||||
|
||||
@mock.patch('cinder.db.worker_create', autospec=True)
|
||||
def test_create_worker_pinned_too_old(self, mock_create):
|
||||
"""Test worker creation when we are pinnned with an old version."""
|
||||
rpc.LAST_OBJ_VERSIONS[Backup.get_rpc_api().BINARY] = '1.0'
|
||||
mock_create.return_value = mock.sentinel.worker
|
||||
backup = Backup(_context=self.context, status='cleanable',
|
||||
id=mock.sentinel.id)
|
||||
res = backup.create_worker()
|
||||
self.assertFalse(res)
|
||||
self.assertFalse(mock_create.called)
|
||||
|
||||
@mock.patch('cinder.db.worker_create', autospec=True)
|
||||
def test_create_worker_non_cleanable(self, mock_create):
|
||||
"""Test worker creation when status is non cleanable."""
|
||||
rpc.LAST_OBJ_VERSIONS[Backup.get_rpc_api().BINARY] = '1.3'
|
||||
mock_create.return_value = mock.sentinel.worker
|
||||
backup = Backup(_context=self.context, status='non_cleanable',
|
||||
id=mock.sentinel.id)
|
||||
res = backup.create_worker()
|
||||
self.assertFalse(res)
|
||||
self.assertFalse(mock_create.called)
|
||||
|
||||
@mock.patch('cinder.db.worker_update', autospec=True)
|
||||
@mock.patch('cinder.db.worker_create', autospec=True)
|
||||
def test_create_worker_already_exists(self, mock_create, mock_update):
|
||||
"""Test worker creation when a worker for the resource exists."""
|
||||
rpc.LAST_OBJ_VERSIONS[Backup.get_rpc_api().BINARY] = '1.3'
|
||||
mock_create.side_effect = exception.WorkerExists(type='type', id='id')
|
||||
|
||||
backup = Backup(_context=self.context, status='cleanable',
|
||||
id=mock.sentinel.id)
|
||||
res = backup.create_worker()
|
||||
self.assertTrue(res)
|
||||
self.assertTrue(mock_create.called)
|
||||
mock_update.assert_called_once_with(
|
||||
self.context, None,
|
||||
filters={'resource_type': 'Backup',
|
||||
'resource_id': mock.sentinel.id},
|
||||
service_id=None, status='cleanable')
|
||||
|
||||
@mock.patch('cinder.db.worker_update', autospec=True)
|
||||
@mock.patch('cinder.db.worker_create', autospec=True)
|
||||
def test_create_worker_cleaning(self, mock_create, mock_update):
|
||||
"""Test worker creation on race condition.
|
||||
|
||||
Test that we still create an entry if there is a rare race condition
|
||||
that the entry gets removed from the DB between our failure to create
|
||||
it and our try to update the entry.
|
||||
"""
|
||||
rpc.LAST_OBJ_VERSIONS[Backup.get_rpc_api().BINARY] = '1.3'
|
||||
mock_create.side_effect = [
|
||||
exception.WorkerExists(type='type', id='id'), mock.sentinel.worker]
|
||||
mock_update.side_effect = exception.WorkerNotFound
|
||||
|
||||
backup = Backup(_context=self.context, status='cleanable',
|
||||
id=mock.sentinel.id)
|
||||
self.assertTrue(backup.create_worker())
|
||||
self.assertEqual(2, mock_create.call_count)
|
||||
self.assertTrue(mock_update.called)
|
||||
|
||||
@mock.patch('cinder.db.worker_update', autospec=True)
|
||||
@mock.patch('cinder.db.worker_get', autospec=True)
|
||||
def test_set_worker(self, mock_get, mock_update):
|
||||
"""Test set worker for a normal job received from an rpc call."""
|
||||
service.Service.service_id = mock.sentinel.service_id
|
||||
mock_get.return_value.cleaning = False
|
||||
backup = Backup(_context=self.context, status=mock.sentinel.status,
|
||||
id=mock.sentinel.id)
|
||||
|
||||
backup.set_worker()
|
||||
mock_get.assert_called_once_with(self.context, resource_type='Backup',
|
||||
resource_id=mock.sentinel.id)
|
||||
worker = mock_get.return_value
|
||||
mock_update.assert_called_once_with(
|
||||
self.context, worker.id,
|
||||
filters={'service_id': worker.service_id,
|
||||
'status': worker.status,
|
||||
'updated_at': worker.updated_at},
|
||||
service_id=mock.sentinel.service_id,
|
||||
status=mock.sentinel.status,
|
||||
orm_worker=worker)
|
||||
self.assertEqual(worker, backup.worker)
|
||||
|
||||
@mock.patch('cinder.db.worker_create', autospec=True)
|
||||
@mock.patch('cinder.db.worker_get', autospec=True)
|
||||
def test_set_worker_direct(self, mock_get, mock_create):
|
||||
"""Test set worker for direct call (non rpc call)."""
|
||||
mock_get.side_effect = exception.WorkerNotFound
|
||||
service_id = mock.sentinel.service_id
|
||||
service.Service.service_id = service_id
|
||||
mock_create.return_value = mock.Mock(service_id=service_id,
|
||||
status=mock.sentinel.status,
|
||||
deleted=False, cleaning=False)
|
||||
|
||||
backup = Backup(_context=self.context, status=mock.sentinel.status,
|
||||
id=mock.sentinel.id)
|
||||
|
||||
backup.set_worker()
|
||||
mock_get.assert_called_once_with(self.context, resource_type='Backup',
|
||||
resource_id=mock.sentinel.id)
|
||||
mock_create.assert_called_once_with(self.context,
|
||||
status=mock.sentinel.status,
|
||||
resource_type='Backup',
|
||||
resource_id=mock.sentinel.id,
|
||||
service_id=service_id)
|
||||
self.assertEqual(mock_create.return_value, backup.worker)
|
||||
|
||||
@mock.patch('cinder.db.worker_update', autospec=True)
|
||||
@mock.patch('cinder.db.worker_get', autospec=True)
|
||||
def test_set_worker_claim_from_another_host(self, mock_get, mock_update):
|
||||
"""Test set worker when the job was started on another failed host."""
|
||||
service_id = mock.sentinel.service_id
|
||||
service.Service.service_id = service_id
|
||||
worker = mock.Mock(service_id=mock.sentinel.service_id2,
|
||||
status=mock.sentinel.status, cleaning=False,
|
||||
updated_at=mock.sentinel.updated_at)
|
||||
mock_get.return_value = worker
|
||||
|
||||
backup = Backup(_context=self.context, status=mock.sentinel.status,
|
||||
id=mock.sentinel.id)
|
||||
|
||||
backup.set_worker()
|
||||
|
||||
mock_update.assert_called_once_with(
|
||||
self.context, worker.id,
|
||||
filters={'service_id': mock.sentinel.service_id2,
|
||||
'status': mock.sentinel.status,
|
||||
'updated_at': mock.sentinel.updated_at},
|
||||
service_id=service_id, status=mock.sentinel.status,
|
||||
orm_worker=worker)
|
||||
self.assertEqual(worker, backup.worker)
|
||||
|
||||
@mock.patch('cinder.db.worker_create', autospec=True)
|
||||
@mock.patch('cinder.db.worker_get', autospec=True)
|
||||
def test_set_worker_race_condition_fail(self, mock_get, mock_create):
|
||||
"""Test we cannot claim a work if we lose race condition."""
|
||||
service.Service.service_id = mock.sentinel.service_id
|
||||
mock_get.side_effect = exception.WorkerNotFound
|
||||
mock_create.side_effect = exception.WorkerExists(type='type', id='id')
|
||||
|
||||
backup = Backup(_context=self.context, status=mock.sentinel.status,
|
||||
id=mock.sentinel.id)
|
||||
|
||||
self.assertRaises(exception.CleanableInUse, backup.set_worker)
|
||||
self.assertTrue(mock_get.called)
|
||||
self.assertTrue(mock_create.called)
|
||||
|
||||
@mock.patch('cinder.db.worker_update', autospec=True)
|
||||
@mock.patch('cinder.db.worker_get', autospec=True)
|
||||
def test_set_worker_claim_fail_after_get(self, mock_get, mock_update):
|
||||
"""Test we don't have race condition if worker changes after get."""
|
||||
service.Service.service_id = mock.sentinel.service_id
|
||||
worker = mock.Mock(service_id=mock.sentinel.service_id2,
|
||||
status=mock.sentinel.status, deleted=False,
|
||||
cleaning=False)
|
||||
mock_get.return_value = worker
|
||||
mock_update.side_effect = exception.WorkerNotFound
|
||||
|
||||
backup = Backup(_context=self.context, status=mock.sentinel.status,
|
||||
id=mock.sentinel.id)
|
||||
|
||||
self.assertRaises(exception.CleanableInUse, backup.set_worker)
|
||||
self.assertTrue(mock_get.called)
|
||||
self.assertTrue(mock_update.called)
|
||||
|
||||
@mock.patch('cinder.db.worker_destroy')
|
||||
def test_unset_worker(self, destroy_mock):
|
||||
backup = Backup(_context=self.context, status=mock.sentinel.status,
|
||||
id=mock.sentinel.id)
|
||||
worker = mock.Mock()
|
||||
backup.worker = worker
|
||||
backup.unset_worker()
|
||||
destroy_mock.assert_called_once_with(self.context, id=worker.id,
|
||||
status=worker.status,
|
||||
service_id=worker.service_id)
|
||||
self.assertIsNone(backup.worker)
|
||||
|
||||
@mock.patch('cinder.db.worker_destroy')
|
||||
def test_unset_worker_not_set(self, destroy_mock):
|
||||
backup = Backup(_context=self.context, status=mock.sentinel.status,
|
||||
id=mock.sentinel.id)
|
||||
backup.unset_worker()
|
||||
self.assertFalse(destroy_mock.called)
|
71
cinder/tests/unit/objects/test_cleanup_request.py
Normal file
71
cinder/tests/unit/objects/test_cleanup_request.py
Normal file
@ -0,0 +1,71 @@
|
||||
# Copyright (c) 2016 Red Hat, Inc.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import mock
|
||||
|
||||
from oslo_utils import timeutils
|
||||
|
||||
from cinder import objects
|
||||
from cinder.tests.unit import fake_constants as fake
|
||||
from cinder.tests.unit import objects as test_objects
|
||||
|
||||
|
||||
class TestCleanupRequest(test_objects.BaseObjectsTestCase):
|
||||
|
||||
all_fields = ('service_id', 'cluster_name', 'host', 'binary', 'service_id',
|
||||
'is_up', 'disabled', 'resource_id', 'resource_type', 'until')
|
||||
|
||||
default = {'is_up': False}
|
||||
|
||||
def setUp(self):
|
||||
super(TestCleanupRequest, self).setUp()
|
||||
|
||||
self.fields = dict(service_id=1, cluster_name='cluster_name',
|
||||
host='host_name', binary='binary_name', is_up=False,
|
||||
resource_id=fake.VOLUME_ID, resource_type='Volume',
|
||||
until=timeutils.utcnow(with_timezone=True),
|
||||
disabled=True)
|
||||
|
||||
def _req_as_dict(self, req):
|
||||
return {field: getattr(req, field) for field in self.all_fields}
|
||||
|
||||
def _req_default(self, field):
|
||||
return self.default.get(field, None)
|
||||
|
||||
def test_init_all_set(self):
|
||||
"""Test __init__ when setting all field values."""
|
||||
req = objects.CleanupRequest(mock.sentinel.context, **self.fields)
|
||||
self.assertDictEqual(self.fields, self._req_as_dict(req))
|
||||
|
||||
def test_init_default(self):
|
||||
"""Test __init__ when one field is missing."""
|
||||
for field in self.fields:
|
||||
fields = self.fields.copy()
|
||||
del fields[field]
|
||||
req = objects.CleanupRequest(mock.sentinel.context, **fields)
|
||||
fields[field] = self._req_default(field)
|
||||
self.assertDictEqual(fields, self._req_as_dict(req))
|
||||
|
||||
def test_init_defaults(self):
|
||||
"""Test __init__ when only one field is set."""
|
||||
all_defaults = {field: self._req_default(field)
|
||||
for field in self.all_fields}
|
||||
|
||||
for field in self.fields:
|
||||
fields = {field: self.fields[field]}
|
||||
req = objects.CleanupRequest(mock.sentinel.context, **fields)
|
||||
expected = all_defaults.copy()
|
||||
expected.update(fields)
|
||||
self.assertDictEqual(expected, self._req_as_dict(req))
|
@ -26,6 +26,7 @@ object_data = {
|
||||
'Backup': '1.4-c50f7a68bb4c400dd53dd219685b3992',
|
||||
'BackupImport': '1.4-c50f7a68bb4c400dd53dd219685b3992',
|
||||
'BackupList': '1.0-15ecf022a68ddbb8c2a6739cfc9f8f5e',
|
||||
'CleanupRequest': '1.0-e7c688b893e1d5537ccf65cc3eb10a28',
|
||||
'Cluster': '1.0-6f06e867c073e9d31722c53b0a9329b8',
|
||||
'ClusterList': '1.0-15ecf022a68ddbb8c2a6739cfc9f8f5e',
|
||||
'CGSnapshot': '1.0-3212ac2b4c2811b7134fb9ba2c49ff74',
|
||||
|
Loading…
Reference in New Issue
Block a user