diff --git a/heat/db/api.py b/heat/db/api.py index 60207dcd56..b5f6a04b3b 100644 --- a/heat/db/api.py +++ b/heat/db/api.py @@ -84,6 +84,12 @@ def resource_get_all(context): return IMPL.resource_get_all(context) +def resource_update(context, resource_id, values, atomic_key, + expected_engine_id=None): + return IMPL.resource_update(context, resource_id, values, atomic_key, + expected_engine_id) + + def resource_create(context, values): return IMPL.resource_create(context, values) diff --git a/heat/db/sqlalchemy/api.py b/heat/db/sqlalchemy/api.py index 658825b27b..b1043c50ce 100644 --- a/heat/db/sqlalchemy/api.py +++ b/heat/db/sqlalchemy/api.py @@ -154,6 +154,18 @@ def resource_get_all(context): return results +def resource_update(context, resource_id, values, atomic_key, + expected_engine_id=None): + session = _session(context) + with session.begin(): + values['atomic_key'] = atomic_key + 1 + rows_updated = session.query(models.Resource).filter_by( + id=resource_id, engine_id=expected_engine_id, + atomic_key=atomic_key).update(values) + + return bool(rows_updated) + + def resource_data_get_all(resource, data=None): """ Looks up resource_data by resource.id. If data is encrypted, diff --git a/heat/db/sqlalchemy/migrate_repo/versions/058_resource_engine_id.py b/heat/db/sqlalchemy/migrate_repo/versions/058_resource_engine_id.py new file mode 100644 index 0000000000..bc8a74434f --- /dev/null +++ b/heat/db/sqlalchemy/migrate_repo/versions/058_resource_engine_id.py @@ -0,0 +1,30 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import sqlalchemy + + +def upgrade(migrate_engine): + meta = sqlalchemy.MetaData(bind=migrate_engine) + resource = sqlalchemy.Table('resource', meta, autoload=True) + engine_id = sqlalchemy.Column('engine_id', sqlalchemy.String(length=36)) + atomic_key = sqlalchemy.Column('atomic_key', sqlalchemy.Integer) + engine_id.create(resource) + atomic_key.create(resource) + + +def downgrade(migrate_engine): + meta = sqlalchemy.MetaData(bind=migrate_engine) + resource = sqlalchemy.Table('resource', meta, autoload=True) + resource.c.engine_id.drop() + resource.c.atomic_key.drop() diff --git a/heat/db/sqlalchemy/models.py b/heat/db/sqlalchemy/models.py index ef626e6c2a..ffaf52e616 100644 --- a/heat/db/sqlalchemy/models.py +++ b/heat/db/sqlalchemy/models.py @@ -287,6 +287,8 @@ class Resource(BASE, HeatBase, StateAware): # created/modified. (bug #1193269) updated_at = sqlalchemy.Column(sqlalchemy.DateTime) properties_data = sqlalchemy.Column('properties_data', types.Json) + engine_id = sqlalchemy.Column(sqlalchemy.String(36)) + atomic_key = sqlalchemy.Column(sqlalchemy.Integer) class WatchRule(BASE, HeatBase): diff --git a/heat/tests/db/test_migrations.py b/heat/tests/db/test_migrations.py index b2df27a2f4..6d47b9ff79 100644 --- a/heat/tests/db/test_migrations.py +++ b/heat/tests/db/test_migrations.py @@ -505,6 +505,10 @@ class HeatMigrationsCheckers(test_migrations.WalkVersionsMixin, self.assertTrue(rd_matches_old_data(rd.key, rd.value, r.uuid)) + def _check_058(self, engine, data): + self.assertColumnExists(engine, 'resource', 'engine_id') + self.assertColumnExists(engine, 'resource', 'atomic_key') + class TestHeatMigrationsMySQL(HeatMigrationsCheckers, test_base.MySQLOpportunisticTestCase): diff --git a/heat/tests/test_sqlalchemy_api.py b/heat/tests/test_sqlalchemy_api.py index b2fc23f42e..b625b5b307 100644 --- a/heat/tests/test_sqlalchemy_api.py +++ b/heat/tests/test_sqlalchemy_api.py @@ -2071,3 +2071,117 @@ class DBAPIServiceTest(common.HeatTestCase): db_api.service_delete(self.ctx, service.id, False) self.assertRaises(exception.ServiceNotFound, db_api.service_get, self.ctx, service.id) + + +class DBAPIResourceUpdateTest(common.HeatTestCase): + def setUp(self): + super(DBAPIResourceUpdateTest, self).setUp() + self.ctx = utils.dummy_context() + template = create_raw_template(self.ctx) + user_creds = create_user_creds(self.ctx) + stack = create_stack(self.ctx, template, user_creds) + self.resource = create_resource(self.ctx, stack, + atomic_key=0) + + def test_unlocked_resource_update(self): + values = {'engine_id': 'engine-1', + 'action': 'CREATE', + 'status': 'IN_PROGRESS'} + db_res = db_api.resource_get(self.ctx, self.resource.id) + ret = db_api.resource_update(self.ctx, self.resource.id, + values, db_res.atomic_key, None) + self.assertTrue(ret) + db_res = db_api.resource_get(self.ctx, self.resource.id) + self.assertEqual('engine-1', db_res.engine_id) + self.assertEqual('CREATE', db_res.action) + self.assertEqual('IN_PROGRESS', db_res.status) + self.assertEqual(1, db_res.atomic_key) + + def test_locked_resource_update_by_same_engine(self): + values = {'engine_id': 'engine-1', + 'action': 'CREATE', + 'status': 'IN_PROGRESS'} + db_res = db_api.resource_get(self.ctx, self.resource.id) + ret = db_api.resource_update(self.ctx, self.resource.id, + values, db_res.atomic_key, None) + self.assertTrue(ret) + db_res = db_api.resource_get(self.ctx, self.resource.id) + self.assertEqual('engine-1', db_res.engine_id) + self.assertEqual(1, db_res.atomic_key) + values = {'engine_id': 'engine-1', + 'action': 'CREATE', + 'status': 'FAILED'} + ret = db_api.resource_update(self.ctx, self.resource.id, + values, db_res.atomic_key, 'engine-1') + self.assertTrue(ret) + db_res = db_api.resource_get(self.ctx, self.resource.id) + self.assertEqual('engine-1', db_res.engine_id) + self.assertEqual('CREATE', db_res.action) + self.assertEqual('FAILED', db_res.status) + self.assertEqual(2, db_res.atomic_key) + + def test_locked_resource_update_by_other_engine(self): + values = {'engine_id': 'engine-1', + 'action': 'CREATE', + 'status': 'IN_PROGRESS'} + db_res = db_api.resource_get(self.ctx, self.resource.id) + ret = db_api.resource_update(self.ctx, self.resource.id, + values, db_res.atomic_key, None) + self.assertTrue(ret) + db_res = db_api.resource_get(self.ctx, self.resource.id) + self.assertEqual('engine-1', db_res.engine_id) + self.assertEqual(1, db_res.atomic_key) + values = {'engine_id': 'engine-2', + 'action': 'CREATE', + 'status': 'FAILED'} + ret = db_api.resource_update(self.ctx, self.resource.id, + values, db_res.atomic_key, 'engine-2') + self.assertFalse(ret) + + def test_release_resource_lock(self): + values = {'engine_id': 'engine-1', + 'action': 'CREATE', + 'status': 'IN_PROGRESS'} + db_res = db_api.resource_get(self.ctx, self.resource.id) + ret = db_api.resource_update(self.ctx, self.resource.id, + values, db_res.atomic_key, None) + self.assertTrue(ret) + db_res = db_api.resource_get(self.ctx, self.resource.id) + self.assertEqual('engine-1', db_res.engine_id) + self.assertEqual(1, db_res.atomic_key) + # Set engine id as None to release the lock + values = {'engine_id': None, + 'action': 'CREATE', + 'status': 'COMPLETE'} + ret = db_api.resource_update(self.ctx, self.resource.id, + values, db_res.atomic_key, 'engine-1') + self.assertTrue(ret) + db_res = db_api.resource_get(self.ctx, self.resource.id) + self.assertIsNone(db_res.engine_id) + self.assertEqual('CREATE', db_res.action) + self.assertEqual('COMPLETE', db_res.status) + self.assertEqual(2, db_res.atomic_key) + + def test_steal_resource_lock(self): + values = {'engine_id': 'engine-1', + 'action': 'CREATE', + 'status': 'IN_PROGRESS'} + db_res = db_api.resource_get(self.ctx, self.resource.id) + ret = db_api.resource_update(self.ctx, self.resource.id, + values, db_res.atomic_key, None) + self.assertTrue(ret) + db_res = db_api.resource_get(self.ctx, self.resource.id) + self.assertEqual('engine-1', db_res.engine_id) + self.assertEqual(1, db_res.atomic_key) + # Set engine id as engine-2 and pass expected engine id as old engine + # i.e engine-1 in db api steal the lock + values = {'engine_id': 'engine-2', + 'action': 'DELETE', + 'status': 'IN_PROGRESS'} + ret = db_api.resource_update(self.ctx, self.resource.id, + values, db_res.atomic_key, 'engine-1') + self.assertTrue(ret) + db_res = db_api.resource_get(self.ctx, self.resource.id) + self.assertEqual('engine-2', db_res.engine_id) + self.assertEqual('DELETE', db_res.action) + self.assertEqual(2, db_res.atomic_key)