Merge "DB changes for resource locking"

This commit is contained in:
Jenkins 2015-03-12 22:35:13 +00:00 committed by Gerrit Code Review
commit ecdc6e5543
6 changed files with 168 additions and 0 deletions

View File

@ -84,6 +84,12 @@ def resource_get_all(context):
return IMPL.resource_get_all(context) return IMPL.resource_get_all(context)
def resource_update(context, resource_id, values, atomic_key,
expected_engine_id=None):
return IMPL.resource_update(context, resource_id, values, atomic_key,
expected_engine_id)
def resource_create(context, values): def resource_create(context, values):
return IMPL.resource_create(context, values) return IMPL.resource_create(context, values)

View File

@ -154,6 +154,18 @@ def resource_get_all(context):
return results return results
def resource_update(context, resource_id, values, atomic_key,
expected_engine_id=None):
session = _session(context)
with session.begin():
values['atomic_key'] = atomic_key + 1
rows_updated = session.query(models.Resource).filter_by(
id=resource_id, engine_id=expected_engine_id,
atomic_key=atomic_key).update(values)
return bool(rows_updated)
def resource_data_get_all(resource, data=None): def resource_data_get_all(resource, data=None):
""" """
Looks up resource_data by resource.id. If data is encrypted, Looks up resource_data by resource.id. If data is encrypted,

View File

@ -0,0 +1,30 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import sqlalchemy
def upgrade(migrate_engine):
meta = sqlalchemy.MetaData(bind=migrate_engine)
resource = sqlalchemy.Table('resource', meta, autoload=True)
engine_id = sqlalchemy.Column('engine_id', sqlalchemy.String(length=36))
atomic_key = sqlalchemy.Column('atomic_key', sqlalchemy.Integer)
engine_id.create(resource)
atomic_key.create(resource)
def downgrade(migrate_engine):
meta = sqlalchemy.MetaData(bind=migrate_engine)
resource = sqlalchemy.Table('resource', meta, autoload=True)
resource.c.engine_id.drop()
resource.c.atomic_key.drop()

View File

@ -287,6 +287,8 @@ class Resource(BASE, HeatBase, StateAware):
# created/modified. (bug #1193269) # created/modified. (bug #1193269)
updated_at = sqlalchemy.Column(sqlalchemy.DateTime) updated_at = sqlalchemy.Column(sqlalchemy.DateTime)
properties_data = sqlalchemy.Column('properties_data', types.Json) properties_data = sqlalchemy.Column('properties_data', types.Json)
engine_id = sqlalchemy.Column(sqlalchemy.String(36))
atomic_key = sqlalchemy.Column(sqlalchemy.Integer)
class WatchRule(BASE, HeatBase): class WatchRule(BASE, HeatBase):

View File

@ -563,6 +563,10 @@ class HeatMigrationsCheckers(test_migrations.WalkVersionsMixin,
self.assertTrue(rd_matches_old_data(rd.key, rd.value, self.assertTrue(rd_matches_old_data(rd.key, rd.value,
r.uuid)) r.uuid))
def _check_058(self, engine, data):
self.assertColumnExists(engine, 'resource', 'engine_id')
self.assertColumnExists(engine, 'resource', 'atomic_key')
class TestHeatMigrationsMySQL(HeatMigrationsCheckers, class TestHeatMigrationsMySQL(HeatMigrationsCheckers,
test_base.MySQLOpportunisticTestCase): test_base.MySQLOpportunisticTestCase):

View File

@ -2071,3 +2071,117 @@ class DBAPIServiceTest(common.HeatTestCase):
db_api.service_delete(self.ctx, service.id, False) db_api.service_delete(self.ctx, service.id, False)
self.assertRaises(exception.ServiceNotFound, db_api.service_get, self.assertRaises(exception.ServiceNotFound, db_api.service_get,
self.ctx, service.id) self.ctx, service.id)
class DBAPIResourceUpdateTest(common.HeatTestCase):
def setUp(self):
super(DBAPIResourceUpdateTest, self).setUp()
self.ctx = utils.dummy_context()
template = create_raw_template(self.ctx)
user_creds = create_user_creds(self.ctx)
stack = create_stack(self.ctx, template, user_creds)
self.resource = create_resource(self.ctx, stack,
atomic_key=0)
def test_unlocked_resource_update(self):
values = {'engine_id': 'engine-1',
'action': 'CREATE',
'status': 'IN_PROGRESS'}
db_res = db_api.resource_get(self.ctx, self.resource.id)
ret = db_api.resource_update(self.ctx, self.resource.id,
values, db_res.atomic_key, None)
self.assertTrue(ret)
db_res = db_api.resource_get(self.ctx, self.resource.id)
self.assertEqual('engine-1', db_res.engine_id)
self.assertEqual('CREATE', db_res.action)
self.assertEqual('IN_PROGRESS', db_res.status)
self.assertEqual(1, db_res.atomic_key)
def test_locked_resource_update_by_same_engine(self):
values = {'engine_id': 'engine-1',
'action': 'CREATE',
'status': 'IN_PROGRESS'}
db_res = db_api.resource_get(self.ctx, self.resource.id)
ret = db_api.resource_update(self.ctx, self.resource.id,
values, db_res.atomic_key, None)
self.assertTrue(ret)
db_res = db_api.resource_get(self.ctx, self.resource.id)
self.assertEqual('engine-1', db_res.engine_id)
self.assertEqual(1, db_res.atomic_key)
values = {'engine_id': 'engine-1',
'action': 'CREATE',
'status': 'FAILED'}
ret = db_api.resource_update(self.ctx, self.resource.id,
values, db_res.atomic_key, 'engine-1')
self.assertTrue(ret)
db_res = db_api.resource_get(self.ctx, self.resource.id)
self.assertEqual('engine-1', db_res.engine_id)
self.assertEqual('CREATE', db_res.action)
self.assertEqual('FAILED', db_res.status)
self.assertEqual(2, db_res.atomic_key)
def test_locked_resource_update_by_other_engine(self):
values = {'engine_id': 'engine-1',
'action': 'CREATE',
'status': 'IN_PROGRESS'}
db_res = db_api.resource_get(self.ctx, self.resource.id)
ret = db_api.resource_update(self.ctx, self.resource.id,
values, db_res.atomic_key, None)
self.assertTrue(ret)
db_res = db_api.resource_get(self.ctx, self.resource.id)
self.assertEqual('engine-1', db_res.engine_id)
self.assertEqual(1, db_res.atomic_key)
values = {'engine_id': 'engine-2',
'action': 'CREATE',
'status': 'FAILED'}
ret = db_api.resource_update(self.ctx, self.resource.id,
values, db_res.atomic_key, 'engine-2')
self.assertFalse(ret)
def test_release_resource_lock(self):
values = {'engine_id': 'engine-1',
'action': 'CREATE',
'status': 'IN_PROGRESS'}
db_res = db_api.resource_get(self.ctx, self.resource.id)
ret = db_api.resource_update(self.ctx, self.resource.id,
values, db_res.atomic_key, None)
self.assertTrue(ret)
db_res = db_api.resource_get(self.ctx, self.resource.id)
self.assertEqual('engine-1', db_res.engine_id)
self.assertEqual(1, db_res.atomic_key)
# Set engine id as None to release the lock
values = {'engine_id': None,
'action': 'CREATE',
'status': 'COMPLETE'}
ret = db_api.resource_update(self.ctx, self.resource.id,
values, db_res.atomic_key, 'engine-1')
self.assertTrue(ret)
db_res = db_api.resource_get(self.ctx, self.resource.id)
self.assertIsNone(db_res.engine_id)
self.assertEqual('CREATE', db_res.action)
self.assertEqual('COMPLETE', db_res.status)
self.assertEqual(2, db_res.atomic_key)
def test_steal_resource_lock(self):
values = {'engine_id': 'engine-1',
'action': 'CREATE',
'status': 'IN_PROGRESS'}
db_res = db_api.resource_get(self.ctx, self.resource.id)
ret = db_api.resource_update(self.ctx, self.resource.id,
values, db_res.atomic_key, None)
self.assertTrue(ret)
db_res = db_api.resource_get(self.ctx, self.resource.id)
self.assertEqual('engine-1', db_res.engine_id)
self.assertEqual(1, db_res.atomic_key)
# Set engine id as engine-2 and pass expected engine id as old engine
# i.e engine-1 in db api steal the lock
values = {'engine_id': 'engine-2',
'action': 'DELETE',
'status': 'IN_PROGRESS'}
ret = db_api.resource_update(self.ctx, self.resource.id,
values, db_res.atomic_key, 'engine-1')
self.assertTrue(ret)
db_res = db_api.resource_get(self.ctx, self.resource.id)
self.assertEqual('engine-2', db_res.engine_id)
self.assertEqual('DELETE', db_res.action)
self.assertEqual(2, db_res.atomic_key)