Prevent claiming and updating races on worker
Current code for claiming and updating workers relays on the updated_at field to determine when a DB record has changed. This is usually enough for any DB with sub-second resolution since the likeliness of us having a race condition is very unlikely. But not all DBs support sub-second resolution, and in those cases the likeliness of a race condition increases considerable since we are working with a 1 second granularity. This patch completely removes the possibility of having race conditions using a specific integer field that will be increased on each DB update. It is compatible with both types of DBMs and will also work with rolling upgrades. The reason why we are not using the version counting provided by SQLAlchemy [1] is because we require an ORM instance to use the feature, and in some of our usages we don't have an instance to work with, and adding an additional read query is unnecessarily inefficient. Additionally we will no longer see spurious errors in unit test test_do_cleanup_not_cleaning_already_claimed_by_us. [1] http://docs.sqlalchemy.org/en/latest/orm/versioning.html Implements: blueprint cinder-volume-active-active-support Change-Id: Ief9333a2389d98f5d0a11d8da94d160de8ecce0e
This commit is contained in:
parent
2195885e77
commit
24bab6b7f6
@ -6600,6 +6600,8 @@ def worker_update(context, id, filters=None, orm_worker=None, **values):
|
||||
# we set it here instead of letting SQLAlchemy do it to be able to update
|
||||
# the orm_worker.
|
||||
_worker_set_updated_at_field(values)
|
||||
reference = orm_worker or models.Worker
|
||||
values['race_preventer'] = reference.race_preventer + 1
|
||||
result = query.update(values)
|
||||
if not result:
|
||||
raise exception.WorkerNotFound(id=id, **filters)
|
||||
@ -6612,6 +6614,7 @@ def worker_claim_for_cleanup(context, claimer_id, orm_worker):
|
||||
# We set updated_at value so we are sure we update the DB entry even if the
|
||||
# service_id is the same in the DB, thus flagging the claim.
|
||||
values = {'service_id': claimer_id,
|
||||
'race_preventer': orm_worker.race_preventer + 1,
|
||||
'updated_at': timeutils.utcnow()}
|
||||
_worker_set_updated_at_field(values)
|
||||
|
||||
@ -6620,6 +6623,7 @@ def worker_claim_for_cleanup(context, claimer_id, orm_worker):
|
||||
query = _worker_query(context,
|
||||
status=orm_worker.status,
|
||||
service_id=orm_worker.service_id,
|
||||
race_preventer=orm_worker.race_preventer,
|
||||
until=orm_worker.updated_at,
|
||||
id=orm_worker.id)
|
||||
|
||||
|
@ -0,0 +1,27 @@
|
||||
# Copyright (c) 2016 Red Hat, Inc.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from sqlalchemy import Column, Integer, MetaData, Table, text
|
||||
|
||||
|
||||
def upgrade(migrate_engine):
|
||||
"""Add race preventer field to workers table."""
|
||||
meta = MetaData()
|
||||
meta.bind = migrate_engine
|
||||
|
||||
workers = Table('workers', meta, autoload=True)
|
||||
race_preventer = Column('race_preventer', Integer, nullable=False,
|
||||
default=0, server_default=text('0'))
|
||||
race_preventer.create(workers, populate_default=True)
|
@ -862,6 +862,9 @@ class Worker(BASE, CinderBase):
|
||||
# Service that is currently processing the operation
|
||||
service_id = Column(Integer, nullable=True)
|
||||
|
||||
# To prevent claiming and updating races
|
||||
race_preventer = Column(Integer, nullable=False, default=0)
|
||||
|
||||
# This is a flag we don't need to store in the DB as it is only used when
|
||||
# we are doing the cleanup to let decorators know
|
||||
cleaning = False
|
||||
|
@ -153,13 +153,15 @@ class CinderCleanableObject(base.CinderPersistentObject):
|
||||
# to update DB.
|
||||
if (worker.service_id != service_id or worker.status != self.status):
|
||||
try:
|
||||
db.worker_update(self._context, worker.id,
|
||||
filters={'service_id': worker.service_id,
|
||||
'status': worker.status,
|
||||
'updated_at': worker.updated_at},
|
||||
service_id=service_id,
|
||||
status=self.status,
|
||||
orm_worker=worker)
|
||||
db.worker_update(
|
||||
self._context, worker.id,
|
||||
filters={'service_id': worker.service_id,
|
||||
'status': worker.status,
|
||||
'race_preventer': worker.race_preventer,
|
||||
'updated_at': worker.updated_at},
|
||||
service_id=service_id,
|
||||
status=self.status,
|
||||
orm_worker=worker)
|
||||
except exception.WorkerNotFound:
|
||||
self.worker = None
|
||||
raise exception.CleanableInUse(type=self.__class__.__name__,
|
||||
|
@ -181,6 +181,7 @@ class TestCleanable(test_objects.BaseObjectsTestCase):
|
||||
self.context, worker.id,
|
||||
filters={'service_id': worker.service_id,
|
||||
'status': worker.status,
|
||||
'race_preventer': worker.race_preventer,
|
||||
'updated_at': worker.updated_at},
|
||||
service_id=mock.sentinel.service_id,
|
||||
status=mock.sentinel.status,
|
||||
@ -231,6 +232,7 @@ class TestCleanable(test_objects.BaseObjectsTestCase):
|
||||
self.context, worker.id,
|
||||
filters={'service_id': mock.sentinel.service_id2,
|
||||
'status': mock.sentinel.status,
|
||||
'race_preventer': worker.race_preventer,
|
||||
'updated_at': mock.sentinel.updated_at},
|
||||
service_id=service_id, status=mock.sentinel.status,
|
||||
orm_worker=worker)
|
||||
@ -324,6 +326,7 @@ class TestCleanable(test_objects.BaseObjectsTestCase):
|
||||
self.context, worker.id,
|
||||
filters={'service_id': worker.service_id,
|
||||
'status': worker.status,
|
||||
'race_preventer': worker.race_preventer,
|
||||
'updated_at': worker.updated_at},
|
||||
service_id=mock.sentinel.service_id,
|
||||
status='cleanable',
|
||||
@ -368,6 +371,7 @@ class TestCleanable(test_objects.BaseObjectsTestCase):
|
||||
self.context, worker.id,
|
||||
filters={'service_id': worker.service_id,
|
||||
'status': worker.status,
|
||||
'race_preventer': worker.race_preventer,
|
||||
'updated_at': worker.updated_at},
|
||||
service_id=mock.sentinel.service_id,
|
||||
status='cleanable',
|
||||
|
@ -160,7 +160,9 @@ class DBAPIWorkerTestCase(test.TestCase, test.ModelsObjectComparatorMixin):
|
||||
worker.service_id = 1
|
||||
|
||||
db_worker = db.worker_get(self.ctxt, id=worker.id)
|
||||
self._assertEqualObjects(worker, db_worker, ['updated_at'])
|
||||
self._assertEqualObjects(worker, db_worker,
|
||||
['updated_at', 'race_preventer'])
|
||||
self.assertEqual(worker.race_preventer + 1, db_worker.race_preventer)
|
||||
|
||||
def test_worker_update_no_subsecond(self):
|
||||
"""Test basic worker update."""
|
||||
@ -174,8 +176,10 @@ class DBAPIWorkerTestCase(test.TestCase, test.ModelsObjectComparatorMixin):
|
||||
worker.service_id = 1
|
||||
|
||||
db_worker = db.worker_get(self.ctxt, id=worker.id)
|
||||
self._assertEqualObjects(worker, db_worker, ['updated_at'])
|
||||
self._assertEqualObjects(worker, db_worker,
|
||||
['updated_at', 'race_preventer'])
|
||||
self.assertEqual(0, db_worker.updated_at.microsecond)
|
||||
self.assertEqual(worker.race_preventer + 1, db_worker.race_preventer)
|
||||
|
||||
def test_worker_update_update_orm(self):
|
||||
"""Test worker update updating the worker orm object."""
|
||||
@ -267,8 +271,10 @@ class DBAPIWorkerTestCase(test.TestCase, test.ModelsObjectComparatorMixin):
|
||||
db_worker = db.worker_get(self.ctxt, id=worker.id)
|
||||
|
||||
self._assertEqualObjects(claimed_worker, db_worker)
|
||||
self._assertEqualObjects(worker, db_worker, ['updated_at'])
|
||||
self._assertEqualObjects(worker, db_worker,
|
||||
['updated_at', 'race_preventer'])
|
||||
self.assertNotEqual(worker.updated_at, db_worker.updated_at)
|
||||
self.assertEqual(worker.race_preventer + 1, db_worker.race_preventer)
|
||||
|
||||
def test_worker_claim_fails_this_service_claimed(self):
|
||||
"""Test claim fails when worker was already claimed by this service."""
|
||||
@ -293,5 +299,7 @@ class DBAPIWorkerTestCase(test.TestCase, test.ModelsObjectComparatorMixin):
|
||||
self.assertEqual(0, res)
|
||||
db_worker = db.worker_get(self.ctxt, id=worker.id)
|
||||
self._assertEqualObjects(claimed_worker, db_worker)
|
||||
self._assertEqualObjects(worker, db_worker, ['updated_at'])
|
||||
self._assertEqualObjects(worker, db_worker,
|
||||
['updated_at', 'race_preventer'])
|
||||
self.assertNotEqual(worker.updated_at, db_worker.updated_at)
|
||||
self.assertEqual(worker.race_preventer + 1, db_worker.race_preventer)
|
||||
|
@ -1092,6 +1092,12 @@ class MigrationsMixin(test_migrations.WalkVersionsMixin):
|
||||
self.assertIsInstance(image_cache.c.cluster_name.type,
|
||||
self.VARCHAR_TYPE)
|
||||
|
||||
def _check_090(self, engine, data):
|
||||
"""Test adding race_preventer to workers table."""
|
||||
workers = db_utils.get_table(engine, 'workers')
|
||||
self.assertIsInstance(workers.c.race_preventer.type,
|
||||
self.INTEGER_TYPE)
|
||||
|
||||
def test_walk_versions(self):
|
||||
self.walk_versions(False, False)
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user