From c0314eded3268a05f85ae2d876acca1f86513144 Mon Sep 17 00:00:00 2001 From: ishant Date: Thu, 19 Feb 2015 22:07:32 -0800 Subject: [PATCH] DB changes for resource locking This patch adds engine_id and atomic_key column to the resource table for locking the resource and making all operations atomic. Also added the db api to update the resource table. The engine which updates the db first, gets the lock on the resource. Only updates with this engine are possible. Updates with other engine will not go through as they will fail to find the matching record in the db. Change-Id: Id8c3df7a439347652d3feca7481cd72762bf30ea Implements: blueprint convergence-resource-locking --- heat/db/api.py | 6 + heat/db/sqlalchemy/api.py | 12 ++ .../versions/058_resource_engine_id.py | 30 +++++ heat/db/sqlalchemy/models.py | 2 + heat/tests/db/test_migrations.py | 4 + heat/tests/test_sqlalchemy_api.py | 114 ++++++++++++++++++ 6 files changed, 168 insertions(+) create mode 100644 heat/db/sqlalchemy/migrate_repo/versions/058_resource_engine_id.py diff --git a/heat/db/api.py b/heat/db/api.py index 60207dcd5..b5f6a04b3 100644 --- a/heat/db/api.py +++ b/heat/db/api.py @@ -84,6 +84,12 @@ def resource_get_all(context): return IMPL.resource_get_all(context) +def resource_update(context, resource_id, values, atomic_key, + expected_engine_id=None): + return IMPL.resource_update(context, resource_id, values, atomic_key, + expected_engine_id) + + def resource_create(context, values): return IMPL.resource_create(context, values) diff --git a/heat/db/sqlalchemy/api.py b/heat/db/sqlalchemy/api.py index 658825b27..b1043c50c 100644 --- a/heat/db/sqlalchemy/api.py +++ b/heat/db/sqlalchemy/api.py @@ -154,6 +154,18 @@ def resource_get_all(context): return results +def resource_update(context, resource_id, values, atomic_key, + expected_engine_id=None): + session = _session(context) + with session.begin(): + values['atomic_key'] = atomic_key + 1 + rows_updated = session.query(models.Resource).filter_by( + id=resource_id, engine_id=expected_engine_id, + atomic_key=atomic_key).update(values) + + return bool(rows_updated) + + def resource_data_get_all(resource, data=None): """ Looks up resource_data by resource.id. If data is encrypted, diff --git a/heat/db/sqlalchemy/migrate_repo/versions/058_resource_engine_id.py b/heat/db/sqlalchemy/migrate_repo/versions/058_resource_engine_id.py new file mode 100644 index 000000000..bc8a74434 --- /dev/null +++ b/heat/db/sqlalchemy/migrate_repo/versions/058_resource_engine_id.py @@ -0,0 +1,30 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import sqlalchemy + + +def upgrade(migrate_engine): + meta = sqlalchemy.MetaData(bind=migrate_engine) + resource = sqlalchemy.Table('resource', meta, autoload=True) + engine_id = sqlalchemy.Column('engine_id', sqlalchemy.String(length=36)) + atomic_key = sqlalchemy.Column('atomic_key', sqlalchemy.Integer) + engine_id.create(resource) + atomic_key.create(resource) + + +def downgrade(migrate_engine): + meta = sqlalchemy.MetaData(bind=migrate_engine) + resource = sqlalchemy.Table('resource', meta, autoload=True) + resource.c.engine_id.drop() + resource.c.atomic_key.drop() diff --git a/heat/db/sqlalchemy/models.py b/heat/db/sqlalchemy/models.py index ef626e6c2..ffaf52e61 100644 --- a/heat/db/sqlalchemy/models.py +++ b/heat/db/sqlalchemy/models.py @@ -287,6 +287,8 @@ class Resource(BASE, HeatBase, StateAware): # created/modified. (bug #1193269) updated_at = sqlalchemy.Column(sqlalchemy.DateTime) properties_data = sqlalchemy.Column('properties_data', types.Json) + engine_id = sqlalchemy.Column(sqlalchemy.String(36)) + atomic_key = sqlalchemy.Column(sqlalchemy.Integer) class WatchRule(BASE, HeatBase): diff --git a/heat/tests/db/test_migrations.py b/heat/tests/db/test_migrations.py index b2df27a2f..6d47b9ff7 100644 --- a/heat/tests/db/test_migrations.py +++ b/heat/tests/db/test_migrations.py @@ -505,6 +505,10 @@ class HeatMigrationsCheckers(test_migrations.WalkVersionsMixin, self.assertTrue(rd_matches_old_data(rd.key, rd.value, r.uuid)) + def _check_058(self, engine, data): + self.assertColumnExists(engine, 'resource', 'engine_id') + self.assertColumnExists(engine, 'resource', 'atomic_key') + class TestHeatMigrationsMySQL(HeatMigrationsCheckers, test_base.MySQLOpportunisticTestCase): diff --git a/heat/tests/test_sqlalchemy_api.py b/heat/tests/test_sqlalchemy_api.py index b2fc23f42..b625b5b30 100644 --- a/heat/tests/test_sqlalchemy_api.py +++ b/heat/tests/test_sqlalchemy_api.py @@ -2071,3 +2071,117 @@ class DBAPIServiceTest(common.HeatTestCase): db_api.service_delete(self.ctx, service.id, False) self.assertRaises(exception.ServiceNotFound, db_api.service_get, self.ctx, service.id) + + +class DBAPIResourceUpdateTest(common.HeatTestCase): + def setUp(self): + super(DBAPIResourceUpdateTest, self).setUp() + self.ctx = utils.dummy_context() + template = create_raw_template(self.ctx) + user_creds = create_user_creds(self.ctx) + stack = create_stack(self.ctx, template, user_creds) + self.resource = create_resource(self.ctx, stack, + atomic_key=0) + + def test_unlocked_resource_update(self): + values = {'engine_id': 'engine-1', + 'action': 'CREATE', + 'status': 'IN_PROGRESS'} + db_res = db_api.resource_get(self.ctx, self.resource.id) + ret = db_api.resource_update(self.ctx, self.resource.id, + values, db_res.atomic_key, None) + self.assertTrue(ret) + db_res = db_api.resource_get(self.ctx, self.resource.id) + self.assertEqual('engine-1', db_res.engine_id) + self.assertEqual('CREATE', db_res.action) + self.assertEqual('IN_PROGRESS', db_res.status) + self.assertEqual(1, db_res.atomic_key) + + def test_locked_resource_update_by_same_engine(self): + values = {'engine_id': 'engine-1', + 'action': 'CREATE', + 'status': 'IN_PROGRESS'} + db_res = db_api.resource_get(self.ctx, self.resource.id) + ret = db_api.resource_update(self.ctx, self.resource.id, + values, db_res.atomic_key, None) + self.assertTrue(ret) + db_res = db_api.resource_get(self.ctx, self.resource.id) + self.assertEqual('engine-1', db_res.engine_id) + self.assertEqual(1, db_res.atomic_key) + values = {'engine_id': 'engine-1', + 'action': 'CREATE', + 'status': 'FAILED'} + ret = db_api.resource_update(self.ctx, self.resource.id, + values, db_res.atomic_key, 'engine-1') + self.assertTrue(ret) + db_res = db_api.resource_get(self.ctx, self.resource.id) + self.assertEqual('engine-1', db_res.engine_id) + self.assertEqual('CREATE', db_res.action) + self.assertEqual('FAILED', db_res.status) + self.assertEqual(2, db_res.atomic_key) + + def test_locked_resource_update_by_other_engine(self): + values = {'engine_id': 'engine-1', + 'action': 'CREATE', + 'status': 'IN_PROGRESS'} + db_res = db_api.resource_get(self.ctx, self.resource.id) + ret = db_api.resource_update(self.ctx, self.resource.id, + values, db_res.atomic_key, None) + self.assertTrue(ret) + db_res = db_api.resource_get(self.ctx, self.resource.id) + self.assertEqual('engine-1', db_res.engine_id) + self.assertEqual(1, db_res.atomic_key) + values = {'engine_id': 'engine-2', + 'action': 'CREATE', + 'status': 'FAILED'} + ret = db_api.resource_update(self.ctx, self.resource.id, + values, db_res.atomic_key, 'engine-2') + self.assertFalse(ret) + + def test_release_resource_lock(self): + values = {'engine_id': 'engine-1', + 'action': 'CREATE', + 'status': 'IN_PROGRESS'} + db_res = db_api.resource_get(self.ctx, self.resource.id) + ret = db_api.resource_update(self.ctx, self.resource.id, + values, db_res.atomic_key, None) + self.assertTrue(ret) + db_res = db_api.resource_get(self.ctx, self.resource.id) + self.assertEqual('engine-1', db_res.engine_id) + self.assertEqual(1, db_res.atomic_key) + # Set engine id as None to release the lock + values = {'engine_id': None, + 'action': 'CREATE', + 'status': 'COMPLETE'} + ret = db_api.resource_update(self.ctx, self.resource.id, + values, db_res.atomic_key, 'engine-1') + self.assertTrue(ret) + db_res = db_api.resource_get(self.ctx, self.resource.id) + self.assertIsNone(db_res.engine_id) + self.assertEqual('CREATE', db_res.action) + self.assertEqual('COMPLETE', db_res.status) + self.assertEqual(2, db_res.atomic_key) + + def test_steal_resource_lock(self): + values = {'engine_id': 'engine-1', + 'action': 'CREATE', + 'status': 'IN_PROGRESS'} + db_res = db_api.resource_get(self.ctx, self.resource.id) + ret = db_api.resource_update(self.ctx, self.resource.id, + values, db_res.atomic_key, None) + self.assertTrue(ret) + db_res = db_api.resource_get(self.ctx, self.resource.id) + self.assertEqual('engine-1', db_res.engine_id) + self.assertEqual(1, db_res.atomic_key) + # Set engine id as engine-2 and pass expected engine id as old engine + # i.e engine-1 in db api steal the lock + values = {'engine_id': 'engine-2', + 'action': 'DELETE', + 'status': 'IN_PROGRESS'} + ret = db_api.resource_update(self.ctx, self.resource.id, + values, db_res.atomic_key, 'engine-1') + self.assertTrue(ret) + db_res = db_api.resource_get(self.ctx, self.resource.id) + self.assertEqual('engine-2', db_res.engine_id) + self.assertEqual('DELETE', db_res.action) + self.assertEqual(2, db_res.atomic_key)