2nd patch to reverse GC process
We will try to delete the dependents first and then mark the actions' status as Failed since the owner of the actions went down. After that, we will try to release node lock and cluster lock(cluster scope or node scope) One problem is that if the dead engine hold a cluster action with a number of sub-actions belong to mult engines, we will mark them all as 'FAILED'. The starting point for this design is to mark all sub-actions which status are 'READY' to 'FAILED' so that the scheduler will not try to acquire them and process them. Change-Id: I53a5e1d19f4adb04580627cb6306c43efe610553
This commit is contained in:
parent
52811c7e9e
commit
08e77c7c73
|
@ -1443,12 +1443,11 @@ def _mark_engine_failed(session, action_id, timestamp, reason=None):
|
|||
for d in dependents:
|
||||
_mark_engine_failed(session, d, timestamp, reason)
|
||||
else:
|
||||
# process node actions
|
||||
depended = query.filter_by(depended=action_id)
|
||||
if depended.count() == 0:
|
||||
return
|
||||
depended.delete(synchronize_session=False)
|
||||
|
||||
# TODO(anyone): this will mark all depended actions' status to 'FAILED'
|
||||
# even the action belong to other engines and the action is running
|
||||
# mark myself as failed
|
||||
action = session.query(models.Action).filter_by(id=action_id).first()
|
||||
values = {
|
||||
|
@ -1464,10 +1463,23 @@ def _mark_engine_failed(session, action_id, timestamp, reason=None):
|
|||
|
||||
@oslo_db_api.wrap_db_retry(max_retries=3, retry_on_deadlock=True,
|
||||
retry_interval=0.5, inc_retry_interval=True)
|
||||
def dummy_gc(context, action_ids, timestamp, reason=None):
|
||||
def dummy_gc(engine_id):
|
||||
with session_for_write() as session:
|
||||
for action in action_ids:
|
||||
_mark_engine_failed(session, action, timestamp, reason)
|
||||
q_actions = session.query(models.Action).filter_by(owner=engine_id)
|
||||
timestamp = time.time()
|
||||
for action in q_actions.all():
|
||||
_mark_engine_failed(session, action.id, timestamp,
|
||||
reason='Engine failure')
|
||||
# Release all node locks
|
||||
query = session.query(models.NodeLock).\
|
||||
filter_by(action_id=action.id)
|
||||
query.delete(synchronize_session=False)
|
||||
|
||||
# Release all cluster locks
|
||||
for clock in session.query(models.ClusterLock).all():
|
||||
res = _release_cluster_lock(session, clock, action.id, -1)
|
||||
if not res:
|
||||
_release_cluster_lock(session, clock, action.id, 1)
|
||||
|
||||
|
||||
def gc_by_engine(engine_id):
|
||||
|
|
|
@ -444,7 +444,9 @@ class DBAPIActionTest(base.SenlinTestCase):
|
|||
def test_engine_mark_failed_with_depended(self):
|
||||
timestamp = time.time()
|
||||
id_of = self._prepare_action_mark_failed_cancel()
|
||||
db_api.dummy_gc(self.ctx, [id_of['A01']], timestamp, 'BOOM')
|
||||
with db_api.session_for_write() as session:
|
||||
db_api._mark_engine_failed(session, id_of['A01'],
|
||||
timestamp, 'BOOM')
|
||||
for aid in [id_of['A02'], id_of['A03'], id_of['A04']]:
|
||||
action = db_api.action_get(self.ctx, aid)
|
||||
self.assertEqual(consts.ACTION_FAILED, action.status)
|
||||
|
@ -463,7 +465,9 @@ class DBAPIActionTest(base.SenlinTestCase):
|
|||
def test_engine_mark_failed_without_depended(self):
|
||||
timestamp = time.time()
|
||||
id_of = self._prepare_action_mark_failed_cancel()
|
||||
db_api.dummy_gc(self.ctx, [id_of['A02']], timestamp, 'BOOM')
|
||||
with db_api.session_for_write() as session:
|
||||
db_api._mark_engine_failed(session, id_of['A02'],
|
||||
timestamp, 'BOOM')
|
||||
|
||||
for aid in [id_of['A03'], id_of['A04']]:
|
||||
action = db_api.action_get(self.ctx, aid)
|
||||
|
|
|
@ -10,6 +10,7 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from oslo_utils import uuidutils
|
||||
from senlin.db.sqlalchemy import api as db_api
|
||||
from senlin.tests.unit.common import base
|
||||
from senlin.tests.unit.common import utils
|
||||
|
@ -327,3 +328,186 @@ class GCByEngineTest(base.SenlinTestCase):
|
|||
new_action = db_api.action_get(self.ctx, action.id)
|
||||
self.assertEqual('FAILED', new_action.status)
|
||||
self.assertEqual("Engine failure", new_action.status_reason)
|
||||
|
||||
|
||||
class DummyGCByEngineTest(base.SenlinTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(DummyGCByEngineTest, self).setUp()
|
||||
self.ctx = utils.dummy_context()
|
||||
self.profile = shared.create_profile(self.ctx)
|
||||
self.cluster = shared.create_cluster(self.ctx, self.profile)
|
||||
self.node = shared.create_node(self.ctx, self.cluster, self.profile)
|
||||
|
||||
def test_delete_cluster_lock(self):
|
||||
# Test the case that a single cluster-scope clock can be released
|
||||
#
|
||||
# (dead-engine) --> Action --> ClusterLock
|
||||
# |action|owner| |cluster|action|scope|
|
||||
# | A1 | E1 | |C1 |[A1] |-1 |
|
||||
|
||||
# preparation
|
||||
engine_id = UUID1
|
||||
action = shared.create_action(self.ctx, target=self.cluster.id,
|
||||
status='RUNNING', owner=engine_id,
|
||||
project=self.ctx.project)
|
||||
db_api.cluster_lock_acquire(self.cluster.id, action.id, -1)
|
||||
|
||||
# do it
|
||||
db_api.dummy_gc(engine_id)
|
||||
|
||||
# assertion
|
||||
observed = db_api.cluster_lock_acquire(self.cluster.id, UUID2, -1)
|
||||
self.assertIn(UUID2, observed)
|
||||
self.assertNotIn(action.id, observed)
|
||||
|
||||
new_action = db_api.action_get(self.ctx, action.id)
|
||||
self.assertEqual('FAILED', new_action.status)
|
||||
self.assertEqual("Engine failure", new_action.status_reason)
|
||||
|
||||
def test_delete_cluster_lock_and_node_lock_1(self):
|
||||
# Test the case that an action is about node that also locked a
|
||||
# cluster and the cluster lock can be released
|
||||
#
|
||||
# (dead-engine) --> Action --> NodeLock
|
||||
# |action|owner| |node |action|
|
||||
# | A1 | E1 | |N1 |A1 |
|
||||
# --> ClusterLock
|
||||
# |cluster|action|scope|
|
||||
# |C1 |[A1] |1 |
|
||||
# preparation
|
||||
engine_id = UUID1
|
||||
action = shared.create_action(self.ctx, target=self.node.id,
|
||||
status='RUNNING', owner=engine_id,
|
||||
project=self.ctx.project)
|
||||
db_api.cluster_lock_acquire(self.cluster.id, action.id, 1)
|
||||
db_api.node_lock_acquire(self.cluster.id, action.id)
|
||||
|
||||
# do it
|
||||
db_api.dummy_gc(engine_id)
|
||||
|
||||
# assertion
|
||||
# even a read lock is okay now
|
||||
observed = db_api.cluster_lock_acquire(self.node.id, UUID2, 1)
|
||||
self.assertIn(UUID2, observed)
|
||||
self.assertNotIn(action.id, observed)
|
||||
|
||||
# node can be locked again
|
||||
observed = db_api.node_lock_acquire(self.node.id, UUID2)
|
||||
self.assertEqual(UUID2, observed)
|
||||
|
||||
new_action = db_api.action_get(self.ctx, action.id)
|
||||
self.assertEqual('FAILED', new_action.status)
|
||||
self.assertEqual("Engine failure", new_action.status_reason)
|
||||
|
||||
def test_delete_cluster_lock_and_node_lock_2(self):
|
||||
# Test the case that an action is about node that also locked a
|
||||
# cluster and the cluster lock will remain locked
|
||||
#
|
||||
# (dead-engine) --> Action --> NodeLock
|
||||
# |action|owner| |node |action|
|
||||
# | A1 | E1 | |N1 |A1 |
|
||||
# --> ClusterLock
|
||||
# |cluster|action |scope|
|
||||
# |C1 |[A1, A2]|2 |
|
||||
# preparation
|
||||
engine_id = UUID1
|
||||
action = shared.create_action(self.ctx, target=self.node.id,
|
||||
status='RUNNING', owner=engine_id,
|
||||
project=self.ctx.project)
|
||||
db_api.cluster_lock_acquire(self.cluster.id, action.id, 1)
|
||||
db_api.cluster_lock_acquire(self.cluster.id, UUID2, 1)
|
||||
db_api.node_lock_acquire(self.node.id, action.id)
|
||||
|
||||
# do it
|
||||
db_api.dummy_gc(engine_id)
|
||||
|
||||
# assertion
|
||||
# a read lock is okay now and cluster lock state not broken
|
||||
observed = db_api.cluster_lock_acquire(self.cluster.id, UUID3, 1)
|
||||
self.assertIn(UUID2, observed)
|
||||
self.assertIn(UUID3, observed)
|
||||
self.assertNotIn(action.id, observed)
|
||||
|
||||
# node can be locked again
|
||||
observed = db_api.node_lock_acquire(self.node.id, UUID2)
|
||||
self.assertEqual(UUID2, observed)
|
||||
|
||||
new_action = db_api.action_get(self.ctx, action.id)
|
||||
self.assertEqual('FAILED', new_action.status)
|
||||
self.assertEqual("Engine failure", new_action.status_reason)
|
||||
|
||||
def test_mult_engine_keep_node_scope_lock(self):
|
||||
engine1 = UUID1
|
||||
engine2 = UUID2
|
||||
|
||||
node2 = shared.create_node(self.ctx, self.cluster, self.profile)
|
||||
|
||||
c_action = shared.create_action(self.ctx, target=self.cluster.id,
|
||||
status='WAITING', owner=engine1,
|
||||
project=self.ctx.project)
|
||||
|
||||
n_action_1 = shared.create_action(self.ctx, target=self.node.id,
|
||||
status='RUNNING', owner=engine1,
|
||||
project=self.ctx.project)
|
||||
|
||||
n_action_2 = shared.create_action(self.ctx, target=node2.id,
|
||||
status='RUNNING', owner=engine2,
|
||||
project=self.ctx.project)
|
||||
|
||||
db_api.dependency_add(self.ctx, [n_action_1.id, n_action_2.id],
|
||||
c_action.id)
|
||||
|
||||
db_api.cluster_lock_acquire(self.cluster.id, c_action.id, -1)
|
||||
db_api.cluster_lock_acquire(self.cluster.id, n_action_1.id, 1)
|
||||
db_api.cluster_lock_acquire(self.cluster.id, n_action_2.id, 1)
|
||||
db_api.node_lock_acquire(self.node.id, n_action_1.id)
|
||||
db_api.node_lock_acquire(node2.id, n_action_2.id)
|
||||
|
||||
# do it
|
||||
db_api.dummy_gc(engine1)
|
||||
|
||||
# try to acquire cluster scope lock
|
||||
observed = db_api.cluster_lock_acquire(self.cluster.id, UUID3, -1)
|
||||
self.assertIn(UUID3, observed)
|
||||
self.assertEqual(1, len(observed))
|
||||
|
||||
# try to acquire node scope lock
|
||||
UUID4 = uuidutils.generate_uuid()
|
||||
observed = db_api.cluster_lock_acquire(self.node.id, UUID4, 1)
|
||||
self.assertIn(UUID4, observed)
|
||||
self.assertEqual(1, len(observed))
|
||||
|
||||
# node scope lock will be also released
|
||||
UUID5 = uuidutils.generate_uuid()
|
||||
observed = db_api.cluster_lock_acquire(node2.id, UUID5, 1)
|
||||
self.assertIn(UUID5, observed)
|
||||
self.assertEqual(1, len(observed))
|
||||
|
||||
# try to acquire node lock
|
||||
UUID6 = uuidutils.generate_uuid()
|
||||
observed = db_api.node_lock_acquire(self.node.id, UUID6)
|
||||
self.assertEqual(UUID6, observed)
|
||||
|
||||
# node locks for actions owned by other engines are still there
|
||||
UUID7 = uuidutils.generate_uuid()
|
||||
observed = db_api.node_lock_acquire(node2.id, UUID7)
|
||||
self.assertNotEqual(UUID7, observed)
|
||||
self.assertEqual(n_action_2.id, observed)
|
||||
|
||||
# check dependency
|
||||
dependents = db_api.dependency_get_depended(self.ctx, c_action.id)
|
||||
self.assertEqual(0, len(dependents))
|
||||
|
||||
# check action status
|
||||
new_c_action = db_api.action_get(self.ctx, c_action.id)
|
||||
self.assertEqual('FAILED', new_c_action.status)
|
||||
self.assertIsNone(new_c_action.owner)
|
||||
|
||||
new_n_action_1 = db_api.action_get(self.ctx, n_action_1.id)
|
||||
self.assertEqual('FAILED', new_n_action_1.status)
|
||||
self.assertIsNone(new_n_action_1.owner)
|
||||
|
||||
new_n_action_2 = db_api.action_get(self.ctx, n_action_2.id)
|
||||
self.assertEqual('FAILED', new_n_action_2.status)
|
||||
self.assertIsNone(new_n_action_2.owner)
|
||||
|
|
Loading…
Reference in New Issue