Add worker's DB operations

This patch adds DB sqlalchemy operations for the new workers table that
will be used for the new cleanup mechanism implemented to support
Active-Active configurations.

They will be used for non Active-Active configurations as well.

Specs: https://review.openstack.org/236977

Implements: blueprint cinder-volume-active-active-support
Change-Id: Id1f09edaa4fb803d522dd1859f6f3ca8b18771ab
This commit is contained in:
Gorka Eguileor 2016-03-23 16:27:00 +01:00
parent 7294cf0352
commit ee451e548a
7 changed files with 446 additions and 43 deletions

View File

@ -1305,6 +1305,41 @@ def message_destroy(context, message_id):
###################
def worker_create(context, **values):
"""Create a worker entry from optional arguments."""
return IMPL.worker_create(context, **values)
def worker_get(context, **filters):
"""Get a worker or raise exception if it does not exist."""
return IMPL.worker_get(context, **filters)
def worker_get_all(context, until=None, db_filters=None, **filters):
"""Get all workers that match given criteria."""
return IMPL.worker_get_all(context, until=until, db_filters=db_filters,
**filters)
def worker_update(context, id, filters=None, orm_worker=None, **values):
"""Update a worker with given values."""
return IMPL.worker_update(context, id, filters=filters,
orm_worker=orm_worker, **values)
def worker_claim_for_cleanup(context, claimer_id, orm_worker):
"""Soft delete a worker, change the service_id and update the worker."""
return IMPL.worker_claim_for_cleanup(context, claimer_id, orm_worker)
def worker_destroy(context, **filters):
"""Delete a worker (no soft delete)."""
return IMPL.worker_destroy(context, **filters)
###################
def resource_exists(context, model, resource_id):
return IMPL.resource_exists(context, model, resource_id)

View File

@ -4989,6 +4989,107 @@ def image_volume_cache_get_all_for_host(context, host):
all()
###################
def _worker_query(context, session=None, until=None, db_filters=None,
**filters):
# Remove all filters based on the workers table that are set to None
filters = _clean_filters(filters)
if filters and not is_valid_model_filters(models.Worker, filters):
return None
query = model_query(context, models.Worker, session=session)
if until:
db_filters = list(db_filters) if db_filters else []
# Since we set updated_at at creation time we don't need to check
# created_at field.
db_filters.append(models.Worker.updated_at <= until)
if db_filters:
query = query.filter(and_(*db_filters))
if filters:
query = query.filter_by(**filters)
return query
def worker_create(context, **values):
"""Create a worker entry from optional arguments."""
worker = models.Worker(**values)
session = get_session()
try:
with session.begin():
worker.save(session)
except db_exc.DBDuplicateEntry:
raise exception.WorkerExists(type=values.get('resource_type'),
id=values.get('resource_id'))
return worker
def worker_get(context, **filters):
"""Get a worker or raise exception if it does not exist."""
query = _worker_query(context, **filters)
worker = query.first() if query else None
if not worker:
raise exception.WorkerNotFound(**filters)
return worker
def worker_get_all(context, **filters):
"""Get all workers that match given criteria."""
query = _worker_query(context, **filters)
return query.all() if query else []
def _orm_worker_update(worker, values):
if not worker:
return
for key, value in values.items():
setattr(worker, key, value)
def worker_update(context, id, filters=None, orm_worker=None, **values):
"""Update a worker with given values."""
filters = filters or {}
query = _worker_query(context, id=id, **filters)
result = query.update(values)
if not result:
raise exception.WorkerNotFound(id=id, **filters)
_orm_worker_update(orm_worker, values)
return result
def worker_claim_for_cleanup(context, claimer_id, orm_worker):
"""Claim a worker entry for cleanup."""
# We set updated_at value so we are sure we update the DB entry even if the
# service_id is the same in the DB, thus flagging the claim.
values = {'service_id': claimer_id,
'updated_at': timeutils.utcnow()}
# We only update the worker entry if it hasn't been claimed by other host
# or thread
query = _worker_query(context,
status=orm_worker.status,
service_id=orm_worker.service_id,
until=orm_worker.updated_at,
id=orm_worker.id)
result = query.update(values, synchronize_session=False)
if result:
_orm_worker_update(orm_worker, values)
return result
def worker_destroy(context, **filters):
"""Delete a worker (no soft delete)."""
query = _worker_query(context, **filters)
return query.delete()
###############################
@ -5012,7 +5113,10 @@ def get_model_for_versioned_object(versioned_object):
'CGSnapshot': models.Cgsnapshot,
}
model_name = versioned_object.obj_name()
if isinstance(versioned_object, six.string_types):
model_name = versioned_object
else:
model_name = versioned_object.obj_name()
return (VO_TO_MODEL_EXCEPTIONS.get(model_name) or
getattr(models, model_name))

View File

@ -759,6 +759,7 @@ def register_models():
ConsistencyGroup,
Cgsnapshot,
Cluster,
Worker,
)
engine = create_engine(CONF.database.connection, echo=False)
for model in models:

View File

@ -402,6 +402,20 @@ class ServiceTooOld(Invalid):
message = _("Service is too old to fulfil this request.")
class WorkerNotFound(NotFound):
message = _("Worker with %s could not be found.")
def __init__(self, message=None, **kwargs):
keys_list = ('{0}=%({0})s'.format(key) for key in kwargs)
placeholder = ', '.join(keys_list)
self.message = self.message % placeholder
super(WorkerNotFound, self).__init__(message, **kwargs)
class WorkerExists(Duplicate):
message = _("Worker for %(type)s %(id)s already exists.")
class ClusterNotFound(NotFound):
message = _('Cluster %(id)s could not be found.')

View File

@ -37,6 +37,7 @@ from oslo_messaging import conffixture as messaging_conffixture
from oslo_utils import strutils
from oslo_utils import timeutils
from oslotest import moxstubout
import six
import testtools
from cinder.common import config # noqa Need to register global_opts
@ -395,3 +396,43 @@ class TestCase(testtools.TestCase):
self.assertEqual(call[0], posargs[0])
self.assertEqual(call[1], posargs[2])
class ModelsObjectComparatorMixin(object):
def _dict_from_object(self, obj, ignored_keys):
if ignored_keys is None:
ignored_keys = []
if isinstance(obj, dict):
items = obj.items()
else:
items = obj.iteritems()
return {k: v for k, v in items
if k not in ignored_keys}
def _assertEqualObjects(self, obj1, obj2, ignored_keys=None):
obj1 = self._dict_from_object(obj1, ignored_keys)
obj2 = self._dict_from_object(obj2, ignored_keys)
self.assertEqual(
len(obj1), len(obj2),
"Keys mismatch: %s" % six.text_type(
set(obj1.keys()) ^ set(obj2.keys())))
for key, value in obj1.items():
self.assertEqual(value, obj2[key])
def _assertEqualListsOfObjects(self, objs1, objs2, ignored_keys=None,
msg=None):
obj_to_dict = lambda o: self._dict_from_object(o, ignored_keys)
sort_key = lambda d: [d[k] for k in sorted(d)]
conv_and_sort = lambda obj: sorted(map(obj_to_dict, obj), key=sort_key)
self.assertListEqual(conv_and_sort(objs1), conv_and_sort(objs2),
msg=msg)
def _assertEqualListsOfPrimitivesAsSets(self, primitives1, primitives2):
self.assertEqual(len(primitives1), len(primitives2))
for primitive in primitives1:
self.assertIn(primitive, primitives2)
for primitive in primitives2:
self.assertIn(primitive, primitives1)

View File

@ -22,7 +22,6 @@ import mock
from oslo_config import cfg
from oslo_utils import timeutils
from oslo_utils import uuidutils
import six
from cinder.api import common
from cinder import context
@ -72,47 +71,7 @@ def _quota_reserve(context, project_id):
)
class ModelsObjectComparatorMixin(object):
def _dict_from_object(self, obj, ignored_keys):
if ignored_keys is None:
ignored_keys = []
if isinstance(obj, dict):
items = obj.items()
else:
items = obj.iteritems()
return {k: v for k, v in items
if k not in ignored_keys}
def _assertEqualObjects(self, obj1, obj2, ignored_keys=None):
obj1 = self._dict_from_object(obj1, ignored_keys)
obj2 = self._dict_from_object(obj2, ignored_keys)
self.assertEqual(
len(obj1), len(obj2),
"Keys mismatch: %s" % six.text_type(
set(obj1.keys()) ^ set(obj2.keys())))
for key, value in obj1.items():
self.assertEqual(value, obj2[key])
def _assertEqualListsOfObjects(self, objs1, objs2, ignored_keys=None,
msg=None):
obj_to_dict = lambda o: self._dict_from_object(o, ignored_keys)
sort_key = lambda d: [d[k] for k in sorted(d)]
conv_and_sort = lambda obj: sorted(map(obj_to_dict, obj), key=sort_key)
self.assertListEqual(conv_and_sort(objs1), conv_and_sort(objs2),
msg=msg)
def _assertEqualListsOfPrimitivesAsSets(self, primitives1, primitives2):
self.assertEqual(len(primitives1), len(primitives2))
for primitive in primitives1:
self.assertIn(primitive, primitives2)
for primitive in primitives2:
self.assertIn(primitive, primitives1)
class BaseTest(test.TestCase, ModelsObjectComparatorMixin):
class BaseTest(test.TestCase, test.ModelsObjectComparatorMixin):
def setUp(self):
super(BaseTest, self).setUp()
self.ctxt = context.get_admin_context()

View File

@ -0,0 +1,249 @@
# Copyright (c) 2016 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Unit tests for cinder.db.api.Worker"""
import time
import uuid
from oslo_db import exception as db_exception
import six
from cinder import context
from cinder import db
from cinder import exception
from cinder import test
from cinder.tests.unit import fake_constants as fake
class DBAPIWorkerTestCase(test.TestCase, test.ModelsObjectComparatorMixin):
worker_fields = {'resource_type': 'Volume',
'resource_id': fake.VOLUME_ID,
'status': 'creating'}
def _uuid(self):
return six.text_type(uuid.uuid4())
def setUp(self):
super(DBAPIWorkerTestCase, self).setUp()
self.ctxt = context.get_admin_context()
def test_worker_create_and_get(self):
"""Test basic creation of a worker record."""
worker = db.worker_create(self.ctxt, **self.worker_fields)
db_worker = db.worker_get(self.ctxt, id=worker.id)
self._assertEqualObjects(worker, db_worker)
def test_worker_create_unique_constrains(self):
"""Test when we use an already existing resource type and id."""
db.worker_create(self.ctxt, **self.worker_fields)
self.assertRaises(exception.WorkerExists, db.worker_create,
self.ctxt,
resource_type=self.worker_fields['resource_type'],
resource_id=self.worker_fields['resource_id'],
status='not_' + self.worker_fields['status'])
def test_worker_create_missing_required_field(self):
"""Try creating a worker with a missing required field."""
for field in self.worker_fields:
params = self.worker_fields.copy()
del params[field]
self.assertRaises(db_exception.DBError, db.worker_create,
self.ctxt, **params)
def test_worker_create_invalid_field(self):
"""Try creating a worker with a non existent db field."""
self.assertRaises(TypeError, db.worker_create, self.ctxt,
myfield='123', **self.worker_fields)
def test_worker_get_non_existent(self):
"""Check basic non existent worker record get method."""
db.worker_create(self.ctxt, **self.worker_fields)
self.assertRaises(exception.WorkerNotFound, db.worker_get,
self.ctxt, service_id='1', **self.worker_fields)
def _create_workers(self, num, read_back=False, **fields):
workers = []
base_params = self.worker_fields.copy()
base_params.update(fields)
for i in range(num):
params = base_params.copy()
params['resource_id'] = self._uuid()
workers.append(db.worker_create(self.ctxt, **params))
if read_back:
for i in range(len(workers)):
workers[i] = db.worker_get(self.ctxt, id=workers[i].id)
return workers
def test_worker_get_all(self):
"""Test basic get_all method."""
self._create_workers(1)
service = db.service_create(self.ctxt, {})
workers = self._create_workers(3, service_id=service.id)
db_workers = db.worker_get_all(self.ctxt, service_id=service.id)
self._assertEqualListsOfObjects(workers, db_workers)
def test_worker_get_all_until(self):
"""Test get_all until a specific time."""
workers = self._create_workers(3, read_back=True)
timestamp = workers[-1].updated_at
time.sleep(0.1)
self._create_workers(3)
db_workers = db.worker_get_all(self.ctxt, until=timestamp)
self._assertEqualListsOfObjects(workers, db_workers)
def test_worker_get_all_returns_empty(self):
"""Test that get_all returns an empty list when there's no results."""
self._create_workers(3, deleted=True)
db_workers = db.worker_get_all(self.ctxt)
self.assertListEqual([], db_workers)
def test_worker_update_not_exists(self):
"""Test worker update when the worker doesn't exist."""
self.assertRaises(exception.WorkerNotFound, db.worker_update,
self.ctxt, 1)
def test_worker_update(self):
"""Test basic worker update."""
worker = self._create_workers(1)[0]
worker = db.worker_get(self.ctxt, id=worker.id)
res = db.worker_update(self.ctxt, worker.id, service_id=1)
self.assertEqual(1, res)
worker.service_id = 1
db_worker = db.worker_get(self.ctxt, id=worker.id)
self._assertEqualObjects(worker, db_worker, ['updated_at'])
def test_worker_update_update_orm(self):
"""Test worker update updating the worker orm object."""
worker = self._create_workers(1)[0]
res = db.worker_update(self.ctxt, worker.id, orm_worker=worker,
service_id=1)
self.assertEqual(1, res)
db_worker = db.worker_get(self.ctxt, id=worker.id)
self._assertEqualObjects(worker, db_worker, ['updated_at'])
def test_worker_destroy(self):
"""Test that worker destroy really deletes the DB entry."""
worker = self._create_workers(1)[0]
res = db.worker_destroy(self.ctxt, id=worker.id)
self.assertEqual(1, res)
db_workers = db.worker_get_all(self.ctxt, read_deleted='yes')
self.assertListEqual([], db_workers)
def test_worker_destroy_non_existent(self):
"""Test that worker destroy returns 0 when entry doesn't exist."""
res = db.worker_destroy(self.ctxt, id=1)
self.assertEqual(0, res)
def test_worker_claim(self):
"""Test worker claim of normal DB entry."""
service_id = 1
worker = db.worker_create(self.ctxt, resource_type='Volume',
resource_id=fake.VOLUME_ID,
status='deleting')
res = db.worker_claim_for_cleanup(self.ctxt, service_id, worker)
self.assertEqual(1, res)
db_worker = db.worker_get(self.ctxt, id=worker.id)
self._assertEqualObjects(worker, db_worker, ['updated_at'])
self.assertEqual(service_id, db_worker.service_id)
self.assertEqual(worker.service_id, db_worker.service_id)
def test_worker_claim_fails_status_change(self):
"""Test that claim fails if the work entry has changed its status."""
worker = db.worker_create(self.ctxt, resource_type='Volume',
resource_id=fake.VOLUME_ID,
status='deleting')
worker.status = 'creating'
res = db.worker_claim_for_cleanup(self.ctxt, 1, worker)
self.assertEqual(0, res)
db_worker = db.worker_get(self.ctxt, id=worker.id)
self._assertEqualObjects(worker, db_worker, ['status'])
self.assertIsNone(db_worker.service_id)
def test_worker_claim_fails_service_change(self):
"""Test that claim fails on worker service change."""
failed_service = 1
working_service = 2
this_service = 3
worker = db.worker_create(self.ctxt, resource_type='Volume',
resource_id=fake.VOLUME_ID,
status='deleting',
service_id=working_service)
worker.service_id = failed_service
res = db.worker_claim_for_cleanup(self.ctxt, this_service, worker)
self.assertEqual(0, res)
db_worker = db.worker_get(self.ctxt, id=worker.id)
self.assertEqual(working_service, db_worker.service_id)
def test_worker_claim_same_service(self):
"""Test worker claim of a DB entry that has our service_id."""
service_id = 1
worker = db.worker_create(self.ctxt, resource_type='Volume',
resource_id=fake.VOLUME_ID,
status='deleting', service_id=service_id)
# Read from DB to get updated_at field
worker = db.worker_get(self.ctxt, id=worker.id)
claimed_worker = db.worker_get(self.ctxt, id=worker.id)
res = db.worker_claim_for_cleanup(self.ctxt,
service_id,
claimed_worker)
self.assertEqual(1, res)
db_worker = db.worker_get(self.ctxt, id=worker.id)
self._assertEqualObjects(claimed_worker, db_worker)
self._assertEqualObjects(worker, db_worker, ['updated_at'])
self.assertNotEqual(worker.updated_at, db_worker.updated_at)
def test_worker_claim_fails_this_service_claimed(self):
"""Test claim fails when worker was already claimed by this service."""
service_id = 1
worker = db.worker_create(self.ctxt, resource_type='Volume',
resource_id=fake.VOLUME_ID,
status='creating',
service_id=service_id)
# Read it back to have the updated_at value
worker = db.worker_get(self.ctxt, id=worker.id)
claimed_worker = db.worker_get(self.ctxt, id=worker.id)
time.sleep(0.1)
# Simulate that this service starts processing this entry
res = db.worker_claim_for_cleanup(self.ctxt,
service_id,
claimed_worker)
self.assertEqual(1, res)
res = db.worker_claim_for_cleanup(self.ctxt, service_id, worker)
self.assertEqual(0, res)
db_worker = db.worker_get(self.ctxt, id=worker.id)
self._assertEqualObjects(claimed_worker, db_worker)
self._assertEqualObjects(worker, db_worker, ['updated_at'])
self.assertNotEqual(worker.updated_at, db_worker.updated_at)