DB API for cleaning mess left by dead engine
Change-Id: If7d37939fbc9c484266ed264f0af7ba791619676
parent a79fa034b0
commit 626d7ce5c9
@@ -17,6 +17,7 @@ Implementation of SQLAlchemy backend.
 import six
 import sys
 import threading
+import time
 
 from oslo_config import cfg
 from oslo_db import api as oslo_db_api
@@ -416,10 +417,27 @@ def cluster_lock_acquire(cluster_id, action_id, scope):
                                       action_ids=[six.text_type(action_id)],
                                       semaphore=scope)
             session.add(lock)
 
         return lock.action_ids
 
 
+def _release_cluster_lock(session, lock, action_id, scope):
+    success = False
+    if (scope == -1 and lock.semaphore < 0) or lock.semaphore == 1:
+        if six.text_type(action_id) in lock.action_ids:
+            session.delete(lock)
+            success = True
+    elif six.text_type(action_id) in lock.action_ids:
+        if lock.semaphore == 1:
+            session.delete(lock)
+        else:
+            lock.action_ids.remove(six.text_type(action_id))
+            lock.semaphore -= 1
+            lock.save(session)
+        success = True
+    return success
+
+
 def cluster_lock_release(cluster_id, action_id, scope):
     '''Release lock on a cluster.
 
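The new helper encodes lock scope in the semaphore column: a negative value marks a single exclusive (cluster-scope) holder, a positive value counts shared (node-scope) holders, and deleting the row releases the lock entirely. A minimal, session-free sketch of those semantics — the dict-based lock and the release() function below are illustrative stand-ins, not part of this change:

# Illustrative stand-in for the semaphore semantics above; the real
# helper operates on a SQLAlchemy ClusterLock row inside a session.
def release(lock, action_id, scope):
    if (scope == -1 and lock['semaphore'] < 0) or lock['semaphore'] == 1:
        if action_id in lock['action_ids']:
            lock['deleted'] = True        # mirrors session.delete(lock)
            return True
        return False
    if action_id in lock['action_ids']:
        if lock['semaphore'] == 1:
            lock['deleted'] = True
        else:
            lock['action_ids'].remove(action_id)
            lock['semaphore'] -= 1        # mirrors lock.save(session)
        return True
    return False

# Shared lock held by two node-scope actions:
lock = {'action_ids': ['A1', 'A2'], 'semaphore': 2, 'deleted': False}
assert release(lock, 'A1', 1) and lock['semaphore'] == 1
assert release(lock, 'A2', 1) and lock['deleted']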
@@ -434,21 +452,7 @@ def cluster_lock_release(cluster_id, action_id, scope):
         if lock is None:
             return False
 
-        success = False
-        if scope == -1 or lock.semaphore == 1:
-            if six.text_type(action_id) in lock.action_ids:
-                session.delete(lock)
-                success = True
-        elif action_id in lock.action_ids:
-            if lock.semaphore == 1:
-                session.delete(lock)
-            else:
-                lock.action_ids.remove(six.text_type(action_id))
-                lock.semaphore -= 1
-                lock.save(session)
-            success = True
-
-        return success
+        return _release_cluster_lock(session, lock, action_id, scope)
 
 
 def cluster_lock_steal(cluster_id, action_id):
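With this refactoring, cluster_lock_release only looks up the lock row and delegates to _release_cluster_lock, so the engine-cleanup path added below reuses exactly the same release logic. A hedged sketch of the caller-side pattern — cluster_id, action_id, and do_work() are placeholders, while the acquire/release signatures match the API in this diff:

# Hypothetical caller; assumes senlin.db.api is imported as db_api.
# scope=-1 requests an exclusive cluster-scope lock.
holders = db_api.cluster_lock_acquire(cluster_id, action_id, -1)
if action_id in holders:
    try:
        do_work()                  # placeholder for the real action body
    finally:
        db_api.cluster_lock_release(cluster_id, action_id, -1)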
@@ -1326,6 +1330,26 @@ def service_get_all(context):
     return model_query(context, models.Service).all()
 
 
+def gc_by_engine(context, engine_id):
+    # Get all actions locked by an engine
+    with session_for_write() as session:
+        q_actions = session.query(models.Action).filter_by(owner=engine_id)
+        timestamp = time.time()
+        for a in q_actions.all():
+            # Release all node locks
+            query = session.query(models.NodeLock).filter_by(action_id=a.id)
+            query.delete(synchronize_session=False)
+
+            # Release all cluster locks
+            for cl in session.query(models.ClusterLock).all():
+                res = _release_cluster_lock(session, cl, a.id, -1)
+                if not res:
+                    _release_cluster_lock(session, cl, a.id, 1)
+
+            # mark action failed and release lock
+            _mark_failed(session, a.id, timestamp, reason="Engine failure")
+
+
 # HealthRegistry
 def registry_claim(context, engine_id):
     with session_for_write() as session:
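gc_by_engine is intended to be called when a surviving engine detects that a peer has died. One plausible trigger is a periodic sweep over the service table; this sketch assumes service rows expose an updated_at timestamp — the actual health-manager wiring is outside this commit:

import datetime

from oslo_utils import timeutils

from senlin.db import api as db_api


def gc_dead_engines(context, grace_seconds=60):
    # Hypothetical sweep: any engine whose service record has not been
    # refreshed within the grace period is treated as dead, and its
    # locks and orphaned actions are cleaned up via gc_by_engine().
    for svc in db_api.service_get_all(context):
        age = timeutils.utcnow() - svc.updated_at   # assumes updated_at
        if age > datetime.timedelta(seconds=grace_seconds):
            db_api.gc_by_engine(context, svc.id)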
@@ -219,3 +219,111 @@ class DBAPILockTest(base.SenlinTestCase):
 
         observed = db_api.node_lock_release(self.node.id, UUID2)
         self.assertTrue(observed)
+
+
+class GCByEngineTest(base.SenlinTestCase):
+
+    def setUp(self):
+        super(GCByEngineTest, self).setUp()
+        self.ctx = utils.dummy_context()
+        self.profile = shared.create_profile(self.ctx)
+        self.cluster = shared.create_cluster(self.ctx, self.profile)
+        self.node = shared.create_node(self.ctx, self.cluster, self.profile)
+
+    def test_delete_cluster_lock(self):
+        # Test the case that a single cluster-scope lock can be released
+        #
+        # (dead-engine) --> Action --> ClusterLock
+        #                   |action|owner|     |cluster|action|scope|
+        #                   | A1   | E1  |     |C1     |[A1]  |-1   |
+
+        # preparation
+        engine_id = UUID1
+        action = shared.create_action(self.ctx, target=self.cluster.id,
+                                      status='RUNNING', owner=engine_id,
+                                      project=self.ctx.project)
+        db_api.cluster_lock_acquire(self.cluster.id, action.id, -1)
+
+        # do it
+        db_api.gc_by_engine(self.ctx, engine_id)
+
+        # assertion
+        observed = db_api.cluster_lock_acquire(self.cluster.id, UUID2, -1)
+        self.assertIn(UUID2, observed)
+        self.assertNotIn(action.id, observed)
+
+        new_action = db_api.action_get(self.ctx, action.id)
+        self.assertEqual('FAILED', new_action.status)
+        self.assertEqual("Engine failure", new_action.status_reason)
+
+    def test_delete_cluster_lock_and_node_lock_1(self):
+        # Test the case that an action on a node also locked its
+        # cluster and the cluster lock can be released
+        #
+        # (dead-engine) --> Action --> NodeLock
+        #                   |action|owner|     |node |action|
+        #                   | A1   | E1  |     |N1   |A1    |
+        #                          --> ClusterLock
+        #                              |cluster|action|scope|
+        #                              |C1     |[A1]  |1    |
+        # preparation
+        engine_id = UUID1
+        action = shared.create_action(self.ctx, target=self.node.id,
+                                      status='RUNNING', owner=engine_id,
+                                      project=self.ctx.project)
+        db_api.cluster_lock_acquire(self.cluster.id, action.id, 1)
+        db_api.node_lock_acquire(self.node.id, action.id)
+
+        # do it
+        db_api.gc_by_engine(self.ctx, engine_id)
+
+        # assertion
+        # even a read lock is okay now
+        observed = db_api.cluster_lock_acquire(self.cluster.id, UUID2, 1)
+        self.assertIn(UUID2, observed)
+        self.assertNotIn(action.id, observed)
+
+        # node can be locked again
+        observed = db_api.node_lock_acquire(self.node.id, UUID2)
+        self.assertEqual(UUID2, observed)
+
+        new_action = db_api.action_get(self.ctx, action.id)
+        self.assertEqual('FAILED', new_action.status)
+        self.assertEqual("Engine failure", new_action.status_reason)
+
+    def test_delete_cluster_lock_and_node_lock_2(self):
+        # Test the case that an action on a node also locked its
+        # cluster and the cluster lock will remain locked
+        #
+        # (dead-engine) --> Action --> NodeLock
+        #                   |action|owner|     |node |action|
+        #                   | A1   | E1  |     |N1   |A1    |
+        #                          --> ClusterLock
+        #                              |cluster|action  |scope|
+        #                              |C1     |[A1, A2]|2    |
+        # preparation
+        engine_id = UUID1
+        action = shared.create_action(self.ctx, target=self.node.id,
+                                      status='RUNNING', owner=engine_id,
+                                      project=self.ctx.project)
+        db_api.cluster_lock_acquire(self.cluster.id, action.id, 1)
+        db_api.cluster_lock_acquire(self.cluster.id, UUID2, 1)
+        db_api.node_lock_acquire(self.node.id, action.id)
+
+        # do it
+        db_api.gc_by_engine(self.ctx, engine_id)
+
+        # assertion
+        # a read lock is okay now and cluster lock state not broken
+        observed = db_api.cluster_lock_acquire(self.cluster.id, UUID3, 1)
+        self.assertIn(UUID2, observed)
+        self.assertIn(UUID3, observed)
+        self.assertNotIn(action.id, observed)
+
+        # node can be locked again
+        observed = db_api.node_lock_acquire(self.node.id, UUID2)
+        self.assertEqual(UUID2, observed)
+
+        new_action = db_api.action_get(self.ctx, action.id)
+        self.assertEqual('FAILED', new_action.status)
+        self.assertEqual("Engine failure", new_action.status_reason)