DB changes for resource locking

This patch adds engine_id and atomic_key column to the resource
table for locking the resource and making all operations atomic.
Also added the db api to update the resource table.

The engine which updates the db first, gets the lock on the
resource. Only updates with this engine are possible. Updates with
other engine will not go through as they will fail to find the
matching record in the db.

Change-Id: Id8c3df7a439347652d3feca7481cd72762bf30ea
Implements: blueprint convergence-resource-locking
This commit is contained in:
ishant 2015-02-19 22:07:32 -08:00
parent 63c942ba11
commit c0314eded3
6 changed files with 168 additions and 0 deletions

View File

@ -84,6 +84,12 @@ def resource_get_all(context):
return IMPL.resource_get_all(context)
def resource_update(context, resource_id, values, atomic_key,
expected_engine_id=None):
return IMPL.resource_update(context, resource_id, values, atomic_key,
expected_engine_id)
def resource_create(context, values):
return IMPL.resource_create(context, values)

View File

@ -154,6 +154,18 @@ def resource_get_all(context):
return results
def resource_update(context, resource_id, values, atomic_key,
expected_engine_id=None):
session = _session(context)
with session.begin():
values['atomic_key'] = atomic_key + 1
rows_updated = session.query(models.Resource).filter_by(
id=resource_id, engine_id=expected_engine_id,
atomic_key=atomic_key).update(values)
return bool(rows_updated)
def resource_data_get_all(resource, data=None):
"""
Looks up resource_data by resource.id. If data is encrypted,

View File

@ -0,0 +1,30 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import sqlalchemy
def upgrade(migrate_engine):
meta = sqlalchemy.MetaData(bind=migrate_engine)
resource = sqlalchemy.Table('resource', meta, autoload=True)
engine_id = sqlalchemy.Column('engine_id', sqlalchemy.String(length=36))
atomic_key = sqlalchemy.Column('atomic_key', sqlalchemy.Integer)
engine_id.create(resource)
atomic_key.create(resource)
def downgrade(migrate_engine):
meta = sqlalchemy.MetaData(bind=migrate_engine)
resource = sqlalchemy.Table('resource', meta, autoload=True)
resource.c.engine_id.drop()
resource.c.atomic_key.drop()

View File

@ -287,6 +287,8 @@ class Resource(BASE, HeatBase, StateAware):
# created/modified. (bug #1193269)
updated_at = sqlalchemy.Column(sqlalchemy.DateTime)
properties_data = sqlalchemy.Column('properties_data', types.Json)
engine_id = sqlalchemy.Column(sqlalchemy.String(36))
atomic_key = sqlalchemy.Column(sqlalchemy.Integer)
class WatchRule(BASE, HeatBase):

View File

@ -505,6 +505,10 @@ class HeatMigrationsCheckers(test_migrations.WalkVersionsMixin,
self.assertTrue(rd_matches_old_data(rd.key, rd.value,
r.uuid))
def _check_058(self, engine, data):
self.assertColumnExists(engine, 'resource', 'engine_id')
self.assertColumnExists(engine, 'resource', 'atomic_key')
class TestHeatMigrationsMySQL(HeatMigrationsCheckers,
test_base.MySQLOpportunisticTestCase):

View File

@ -2071,3 +2071,117 @@ class DBAPIServiceTest(common.HeatTestCase):
db_api.service_delete(self.ctx, service.id, False)
self.assertRaises(exception.ServiceNotFound, db_api.service_get,
self.ctx, service.id)
class DBAPIResourceUpdateTest(common.HeatTestCase):
def setUp(self):
super(DBAPIResourceUpdateTest, self).setUp()
self.ctx = utils.dummy_context()
template = create_raw_template(self.ctx)
user_creds = create_user_creds(self.ctx)
stack = create_stack(self.ctx, template, user_creds)
self.resource = create_resource(self.ctx, stack,
atomic_key=0)
def test_unlocked_resource_update(self):
values = {'engine_id': 'engine-1',
'action': 'CREATE',
'status': 'IN_PROGRESS'}
db_res = db_api.resource_get(self.ctx, self.resource.id)
ret = db_api.resource_update(self.ctx, self.resource.id,
values, db_res.atomic_key, None)
self.assertTrue(ret)
db_res = db_api.resource_get(self.ctx, self.resource.id)
self.assertEqual('engine-1', db_res.engine_id)
self.assertEqual('CREATE', db_res.action)
self.assertEqual('IN_PROGRESS', db_res.status)
self.assertEqual(1, db_res.atomic_key)
def test_locked_resource_update_by_same_engine(self):
values = {'engine_id': 'engine-1',
'action': 'CREATE',
'status': 'IN_PROGRESS'}
db_res = db_api.resource_get(self.ctx, self.resource.id)
ret = db_api.resource_update(self.ctx, self.resource.id,
values, db_res.atomic_key, None)
self.assertTrue(ret)
db_res = db_api.resource_get(self.ctx, self.resource.id)
self.assertEqual('engine-1', db_res.engine_id)
self.assertEqual(1, db_res.atomic_key)
values = {'engine_id': 'engine-1',
'action': 'CREATE',
'status': 'FAILED'}
ret = db_api.resource_update(self.ctx, self.resource.id,
values, db_res.atomic_key, 'engine-1')
self.assertTrue(ret)
db_res = db_api.resource_get(self.ctx, self.resource.id)
self.assertEqual('engine-1', db_res.engine_id)
self.assertEqual('CREATE', db_res.action)
self.assertEqual('FAILED', db_res.status)
self.assertEqual(2, db_res.atomic_key)
def test_locked_resource_update_by_other_engine(self):
values = {'engine_id': 'engine-1',
'action': 'CREATE',
'status': 'IN_PROGRESS'}
db_res = db_api.resource_get(self.ctx, self.resource.id)
ret = db_api.resource_update(self.ctx, self.resource.id,
values, db_res.atomic_key, None)
self.assertTrue(ret)
db_res = db_api.resource_get(self.ctx, self.resource.id)
self.assertEqual('engine-1', db_res.engine_id)
self.assertEqual(1, db_res.atomic_key)
values = {'engine_id': 'engine-2',
'action': 'CREATE',
'status': 'FAILED'}
ret = db_api.resource_update(self.ctx, self.resource.id,
values, db_res.atomic_key, 'engine-2')
self.assertFalse(ret)
def test_release_resource_lock(self):
values = {'engine_id': 'engine-1',
'action': 'CREATE',
'status': 'IN_PROGRESS'}
db_res = db_api.resource_get(self.ctx, self.resource.id)
ret = db_api.resource_update(self.ctx, self.resource.id,
values, db_res.atomic_key, None)
self.assertTrue(ret)
db_res = db_api.resource_get(self.ctx, self.resource.id)
self.assertEqual('engine-1', db_res.engine_id)
self.assertEqual(1, db_res.atomic_key)
# Set engine id as None to release the lock
values = {'engine_id': None,
'action': 'CREATE',
'status': 'COMPLETE'}
ret = db_api.resource_update(self.ctx, self.resource.id,
values, db_res.atomic_key, 'engine-1')
self.assertTrue(ret)
db_res = db_api.resource_get(self.ctx, self.resource.id)
self.assertIsNone(db_res.engine_id)
self.assertEqual('CREATE', db_res.action)
self.assertEqual('COMPLETE', db_res.status)
self.assertEqual(2, db_res.atomic_key)
def test_steal_resource_lock(self):
values = {'engine_id': 'engine-1',
'action': 'CREATE',
'status': 'IN_PROGRESS'}
db_res = db_api.resource_get(self.ctx, self.resource.id)
ret = db_api.resource_update(self.ctx, self.resource.id,
values, db_res.atomic_key, None)
self.assertTrue(ret)
db_res = db_api.resource_get(self.ctx, self.resource.id)
self.assertEqual('engine-1', db_res.engine_id)
self.assertEqual(1, db_res.atomic_key)
# Set engine id as engine-2 and pass expected engine id as old engine
# i.e engine-1 in db api steal the lock
values = {'engine_id': 'engine-2',
'action': 'DELETE',
'status': 'IN_PROGRESS'}
ret = db_api.resource_update(self.ctx, self.resource.id,
values, db_res.atomic_key, 'engine-1')
self.assertTrue(ret)
db_res = db_api.resource_get(self.ctx, self.resource.id)
self.assertEqual('engine-2', db_res.engine_id)
self.assertEqual('DELETE', db_res.action)
self.assertEqual(2, db_res.atomic_key)