From 24bab6b7f6dc89d579807f85f1f3a1cdc54759e2 Mon Sep 17 00:00:00 2001 From: Gorka Eguileor Date: Mon, 19 Dec 2016 17:50:39 +0100 Subject: [PATCH] Prevent claiming and updating races on worker Current code for claiming and updating workers relies on the updated_at field to determine when a DB record has changed. This is usually enough for any DB with sub-second resolution, since a race condition is very unlikely in that case. But not all DBs support sub-second resolution, and in those cases the likelihood of a race condition increases considerably since we are working with a 1-second granularity. This patch completely removes the possibility of having race conditions by using a dedicated integer field that is incremented on each DB update. It is compatible with both types of DBMSs and will also work with rolling upgrades. The reason we are not using the version counting provided by SQLAlchemy [1] is that we require an ORM instance to use the feature, and in some of our usages we don't have an instance to work with, and adding an additional read query is unnecessarily inefficient. Additionally, we will no longer see spurious errors in unit test test_do_cleanup_not_cleaning_already_claimed_by_us. 
[1] http://docs.sqlalchemy.org/en/latest/orm/versioning.html Implements: blueprint cinder-volume-active-active-support Change-Id: Ief9333a2389d98f5d0a11d8da94d160de8ecce0e --- cinder/db/sqlalchemy/api.py | 4 +++ .../090_add_race_preventer_to_workers.py | 27 +++++++++++++++++++ cinder/db/sqlalchemy/models.py | 3 +++ cinder/objects/cleanable.py | 16 ++++++----- cinder/tests/unit/objects/test_cleanable.py | 4 +++ cinder/tests/unit/test_db_worker_api.py | 16 ++++++++--- cinder/tests/unit/test_migrations.py | 6 +++++ 7 files changed, 65 insertions(+), 11 deletions(-) create mode 100644 cinder/db/sqlalchemy/migrate_repo/versions/090_add_race_preventer_to_workers.py diff --git a/cinder/db/sqlalchemy/api.py b/cinder/db/sqlalchemy/api.py index 5e4c9f771..04200ceaf 100644 --- a/cinder/db/sqlalchemy/api.py +++ b/cinder/db/sqlalchemy/api.py @@ -6600,6 +6600,8 @@ def worker_update(context, id, filters=None, orm_worker=None, **values): # we set it here instead of letting SQLAlchemy do it to be able to update # the orm_worker. _worker_set_updated_at_field(values) + reference = orm_worker or models.Worker + values['race_preventer'] = reference.race_preventer + 1 result = query.update(values) if not result: raise exception.WorkerNotFound(id=id, **filters) @@ -6612,6 +6614,7 @@ def worker_claim_for_cleanup(context, claimer_id, orm_worker): # We set updated_at value so we are sure we update the DB entry even if the # service_id is the same in the DB, thus flagging the claim. 
values = {'service_id': claimer_id, + 'race_preventer': orm_worker.race_preventer + 1, 'updated_at': timeutils.utcnow()} _worker_set_updated_at_field(values) @@ -6620,6 +6623,7 @@ def worker_claim_for_cleanup(context, claimer_id, orm_worker): query = _worker_query(context, status=orm_worker.status, service_id=orm_worker.service_id, + race_preventer=orm_worker.race_preventer, until=orm_worker.updated_at, id=orm_worker.id) diff --git a/cinder/db/sqlalchemy/migrate_repo/versions/090_add_race_preventer_to_workers.py b/cinder/db/sqlalchemy/migrate_repo/versions/090_add_race_preventer_to_workers.py new file mode 100644 index 000000000..715e67821 --- /dev/null +++ b/cinder/db/sqlalchemy/migrate_repo/versions/090_add_race_preventer_to_workers.py @@ -0,0 +1,27 @@ +# Copyright (c) 2016 Red Hat, Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
+ +from sqlalchemy import Column, Integer, MetaData, Table, text + + +def upgrade(migrate_engine): + """Add race preventer field to workers table.""" + meta = MetaData() + meta.bind = migrate_engine + + workers = Table('workers', meta, autoload=True) + race_preventer = Column('race_preventer', Integer, nullable=False, + default=0, server_default=text('0')) + race_preventer.create(workers, populate_default=True) diff --git a/cinder/db/sqlalchemy/models.py b/cinder/db/sqlalchemy/models.py index 9fa208b2c..50e8425a6 100644 --- a/cinder/db/sqlalchemy/models.py +++ b/cinder/db/sqlalchemy/models.py @@ -862,6 +862,9 @@ class Worker(BASE, CinderBase): # Service that is currently processing the operation service_id = Column(Integer, nullable=True) + # To prevent claiming and updating races + race_preventer = Column(Integer, nullable=False, default=0) + # This is a flag we don't need to store in the DB as it is only used when # we are doing the cleanup to let decorators know cleaning = False diff --git a/cinder/objects/cleanable.py b/cinder/objects/cleanable.py index 184e14f33..e3e684aa0 100644 --- a/cinder/objects/cleanable.py +++ b/cinder/objects/cleanable.py @@ -153,13 +153,15 @@ class CinderCleanableObject(base.CinderPersistentObject): # to update DB. 
if (worker.service_id != service_id or worker.status != self.status): try: - db.worker_update(self._context, worker.id, - filters={'service_id': worker.service_id, - 'status': worker.status, - 'updated_at': worker.updated_at}, - service_id=service_id, - status=self.status, - orm_worker=worker) + db.worker_update( + self._context, worker.id, + filters={'service_id': worker.service_id, + 'status': worker.status, + 'race_preventer': worker.race_preventer, + 'updated_at': worker.updated_at}, + service_id=service_id, + status=self.status, + orm_worker=worker) except exception.WorkerNotFound: self.worker = None raise exception.CleanableInUse(type=self.__class__.__name__, diff --git a/cinder/tests/unit/objects/test_cleanable.py b/cinder/tests/unit/objects/test_cleanable.py index cb7f6fffb..7830f3902 100644 --- a/cinder/tests/unit/objects/test_cleanable.py +++ b/cinder/tests/unit/objects/test_cleanable.py @@ -181,6 +181,7 @@ class TestCleanable(test_objects.BaseObjectsTestCase): self.context, worker.id, filters={'service_id': worker.service_id, 'status': worker.status, + 'race_preventer': worker.race_preventer, 'updated_at': worker.updated_at}, service_id=mock.sentinel.service_id, status=mock.sentinel.status, @@ -231,6 +232,7 @@ class TestCleanable(test_objects.BaseObjectsTestCase): self.context, worker.id, filters={'service_id': mock.sentinel.service_id2, 'status': mock.sentinel.status, + 'race_preventer': worker.race_preventer, 'updated_at': mock.sentinel.updated_at}, service_id=service_id, status=mock.sentinel.status, orm_worker=worker) @@ -324,6 +326,7 @@ class TestCleanable(test_objects.BaseObjectsTestCase): self.context, worker.id, filters={'service_id': worker.service_id, 'status': worker.status, + 'race_preventer': worker.race_preventer, 'updated_at': worker.updated_at}, service_id=mock.sentinel.service_id, status='cleanable', @@ -368,6 +371,7 @@ class TestCleanable(test_objects.BaseObjectsTestCase): self.context, worker.id, filters={'service_id': 
worker.service_id, 'status': worker.status, + 'race_preventer': worker.race_preventer, 'updated_at': worker.updated_at}, service_id=mock.sentinel.service_id, status='cleanable', diff --git a/cinder/tests/unit/test_db_worker_api.py b/cinder/tests/unit/test_db_worker_api.py index 7fedacff0..44721c449 100644 --- a/cinder/tests/unit/test_db_worker_api.py +++ b/cinder/tests/unit/test_db_worker_api.py @@ -160,7 +160,9 @@ class DBAPIWorkerTestCase(test.TestCase, test.ModelsObjectComparatorMixin): worker.service_id = 1 db_worker = db.worker_get(self.ctxt, id=worker.id) - self._assertEqualObjects(worker, db_worker, ['updated_at']) + self._assertEqualObjects(worker, db_worker, + ['updated_at', 'race_preventer']) + self.assertEqual(worker.race_preventer + 1, db_worker.race_preventer) def test_worker_update_no_subsecond(self): """Test basic worker update.""" @@ -174,8 +176,10 @@ class DBAPIWorkerTestCase(test.TestCase, test.ModelsObjectComparatorMixin): worker.service_id = 1 db_worker = db.worker_get(self.ctxt, id=worker.id) - self._assertEqualObjects(worker, db_worker, ['updated_at']) + self._assertEqualObjects(worker, db_worker, + ['updated_at', 'race_preventer']) self.assertEqual(0, db_worker.updated_at.microsecond) + self.assertEqual(worker.race_preventer + 1, db_worker.race_preventer) def test_worker_update_update_orm(self): """Test worker update updating the worker orm object.""" @@ -267,8 +271,10 @@ class DBAPIWorkerTestCase(test.TestCase, test.ModelsObjectComparatorMixin): db_worker = db.worker_get(self.ctxt, id=worker.id) self._assertEqualObjects(claimed_worker, db_worker) - self._assertEqualObjects(worker, db_worker, ['updated_at']) + self._assertEqualObjects(worker, db_worker, + ['updated_at', 'race_preventer']) self.assertNotEqual(worker.updated_at, db_worker.updated_at) + self.assertEqual(worker.race_preventer + 1, db_worker.race_preventer) def test_worker_claim_fails_this_service_claimed(self): """Test claim fails when worker was already claimed by this 
service.""" @@ -293,5 +299,7 @@ class DBAPIWorkerTestCase(test.TestCase, test.ModelsObjectComparatorMixin): self.assertEqual(0, res) db_worker = db.worker_get(self.ctxt, id=worker.id) self._assertEqualObjects(claimed_worker, db_worker) - self._assertEqualObjects(worker, db_worker, ['updated_at']) + self._assertEqualObjects(worker, db_worker, + ['updated_at', 'race_preventer']) self.assertNotEqual(worker.updated_at, db_worker.updated_at) + self.assertEqual(worker.race_preventer + 1, db_worker.race_preventer) diff --git a/cinder/tests/unit/test_migrations.py b/cinder/tests/unit/test_migrations.py index 8f19230ff..0411a51f6 100644 --- a/cinder/tests/unit/test_migrations.py +++ b/cinder/tests/unit/test_migrations.py @@ -1092,6 +1092,12 @@ class MigrationsMixin(test_migrations.WalkVersionsMixin): self.assertIsInstance(image_cache.c.cluster_name.type, self.VARCHAR_TYPE) + def _check_090(self, engine, data): + """Test adding race_preventer to workers table.""" + workers = db_utils.get_table(engine, 'workers') + self.assertIsInstance(workers.c.race_preventer.type, + self.INTEGER_TYPE) + def test_walk_versions(self): self.walk_versions(False, False)