revise DB to reverse engine GC process

this patch add an method to support recersing engine GC process.
We will try to delete the records in dependency table, and then break
node lock --> node scope lock --> cluster scope lock so that there
will not be requests to the cluster/node before we finishing GC.

Change-Id: I7cf5432c81bd44a6e80dea02829a70be04e9385e
This commit is contained in:
ruijie 2017-08-30 19:19:46 +08:00
parent 602b64ac01
commit 43b0ca9330
2 changed files with 82 additions and 0 deletions

View File

@ -1434,6 +1434,42 @@ def service_get_all():
return session.query(models.Service).all()
def _mark_engine_failed(session, action_id, timestamp, reason=None):
query = session.query(models.ActionDependency)
# process cluster actions
d_query = query.filter_by(dependent=action_id)
dependents = [d.depended for d in d_query.all()]
if dependents:
for d in dependents:
_mark_engine_failed(session, d, timestamp, reason)
else:
# process node actions
depended = query.filter_by(depended=action_id)
if depended.count() == 0:
return
depended.delete(synchronize_session=False)
# mark myself as failed
action = session.query(models.Action).filter_by(id=action_id).first()
values = {
'owner': None,
'status': consts.ACTION_FAILED,
'status_reason': (six.text_type(reason) if reason else
_('Action execution failed')),
'end_time': timestamp,
}
action.update(values)
action.save(session)
@oslo_db_api.wrap_db_retry(max_retries=3, retry_on_deadlock=True,
retry_interval=0.5, inc_retry_interval=True)
def dummy_gc(context, action_ids, timestamp, reason=None):
with session_for_write() as session:
for action in action_ids:
_mark_engine_failed(session, action, timestamp, reason)
def gc_by_engine(engine_id):
# Get all actions locked by an engine
with session_for_write() as session:

View File

@ -441,6 +441,52 @@ class DBAPIActionTest(base.SenlinTestCase):
return id_of
def test_engine_mark_failed_with_depended(self):
timestamp = time.time()
id_of = self._prepare_action_mark_failed_cancel()
db_api.dummy_gc(self.ctx, [id_of['A01']], timestamp, 'BOOM')
for aid in [id_of['A02'], id_of['A03'], id_of['A04']]:
action = db_api.action_get(self.ctx, aid)
self.assertEqual(consts.ACTION_FAILED, action.status)
self.assertEqual('BOOM', action.status_reason)
self.assertEqual(timestamp, action.end_time)
action = db_api.action_get(self.ctx, id_of['A01'])
self.assertEqual(consts.ACTION_FAILED, action.status)
self.assertEqual('BOOM', action.status_reason)
self.assertEqual(timestamp, action.end_time)
for aid in [id_of['A02'], id_of['A03'], id_of['A04']]:
result = db_api.dependency_get_dependents(self.ctx, aid)
self.assertEqual(0, len(result))
def test_engine_mark_failed_without_depended(self):
timestamp = time.time()
id_of = self._prepare_action_mark_failed_cancel()
db_api.dummy_gc(self.ctx, [id_of['A02']], timestamp, 'BOOM')
for aid in [id_of['A03'], id_of['A04']]:
action = db_api.action_get(self.ctx, aid)
self.assertEqual(consts.ACTION_INIT, action.status)
self.assertNotEqual('BOOM', action.status_reason)
self.assertNotEqual(timestamp, action.end_time)
action = db_api.action_get(self.ctx, id_of['A01'])
self.assertEqual(consts.ACTION_WAITING, action.status)
self.assertNotEqual('BOOM', action.status_reason)
self.assertNotEqual(timestamp, action.end_time)
action_d = db_api.action_get(self.ctx, id_of['A02'])
self.assertEqual(consts.ACTION_FAILED, action_d.status)
self.assertEqual('BOOM', action_d.status_reason)
self.assertEqual(timestamp, action_d.end_time)
for aid in [id_of['A03'], id_of['A04']]:
result = db_api.dependency_get_dependents(self.ctx, aid)
self.assertEqual(1, len(result))
result = db_api.dependency_get_dependents(self.ctx, id_of['A02'])
self.assertEqual(0, len(result))
def test_action_mark_failed(self):
timestamp = time.time()
id_of = self._prepare_action_mark_failed_cancel()