From 31fb1923bb32fb74ddd97058e86e9fe47cc30678 Mon Sep 17 00:00:00 2001
From: Bulat Gaifullin
Date: Fri, 2 Sep 2016 13:43:46 +0300
Subject: [PATCH] Fixed deadlocks and race conditions in task execution

Also improved deadlock detection in lock chains: instead of hard-coded
chains, use a generic approach - collect knowledge about observed chains
and check each new chain against that knowledge base.

Change-Id: Ie1549c14d95a372a341e1e6e616226f8ee226a03
Partial-Bug: 1618852
---
 nailgun/nailgun/db/deadlock_detector.py            |  70 +++++--
 .../alembic_migrations/versions/fuel_9_1.py        |  16 ++
 nailgun/nailgun/objects/task.py                    |   7 +-
 nailgun/nailgun/rpc/receiver.py                    | 186 +++++++++---------
 nailgun/nailgun/task/manager.py                    |  77 ++++----
 .../test/unit/test_deadlock_detector.py            |  13 +-
 6 files changed, 216 insertions(+), 153 deletions(-)

diff --git a/nailgun/nailgun/db/deadlock_detector.py b/nailgun/nailgun/db/deadlock_detector.py
index a668fd9f51..6b9372581f 100644
--- a/nailgun/nailgun/db/deadlock_detector.py
+++ b/nailgun/nailgun/db/deadlock_detector.py
@@ -17,22 +17,58 @@ import threading
 import traceback
 
+import networkx
+
 from nailgun.logger import logger
 from nailgun.settings import settings
 
-ALLOWED_LOCKS_CHAINS = [
-    ('attributes', 'clusters'),
-    ('attributes', 'clusters', 'ip_addr_ranges'),
-    ('attributes', 'ip_addr_ranges'),
-    ('attributes', 'ip_addrs'),
-    ('attributes', 'ip_addrs', 'network_groups'),
-    ('attributes', 'ip_addr_ranges', 'node_nic_interfaces'),
-    ('clusters', 'nodes'),
-    ('tasks', 'clusters'),
-    ('tasks', 'clusters', 'nodes'),
-    ('tasks', 'nodes'),
-    ('nodes', 'node_nic_interfaces'),
-]
+
+class LockChainsRegistry(object):
+    def __init__(self):
+        self._chains = networkx.DiGraph()
+        self._cache = {}
+        self._lock = threading.Lock()
+
+    def register(self, transition):
+        """Register a new lock chain.
+
+        :param transition: a tuple with the names of the tables in the chain
+        """
+        # the number of possible combinations is limited, so eventually
+        # all of them end up in the cache and this method stops being
+        # called on the hot path
+        tmp = self._chains.copy()
+        tmp.add_path(transition)
+        # if the graph contains a cycle after the transition is added,
+        # the transition conflicts with a previously registered lock order
+        if networkx.is_directed_acyclic_graph(tmp):
+            result = self._cache[transition] = True
+            self._chains = tmp
+        else:
+            result = self._cache[transition] = False
+        return result
+
+    def is_allowed(self, transition):
+        """Check that the transition does not introduce a potential deadlock."""
+        try:
+            return self._cache[transition]
+        except KeyError:
+            pass
+
+        with self._lock:
+            try:
+                # try again with exclusive access
+                return self._cache[transition]
+            except KeyError:
+                return self.register(transition)
+
+
+_lock_chains_registry = LockChainsRegistry()
+
+# well-known chains
+_lock_chains_registry.register(('attributes', 'clusters'))
+_lock_chains_registry.register(('clusters', 'tasks'))
+_lock_chains_registry.register(('clusters', 'nodes'))
 
 
 class Lock(object):
@@ -131,8 +167,8 @@ class ObjectsLockingOrderViolation(DeadlockDetectorError):
 class LockTransitionNotAllowedError(DeadlockDetectorError):
     def __init__(self):
         msg = "Possible deadlock found while attempting " \
-              "to lock table: '{0}'. Lock transition is not allowed: {1}. " \
-              "Traceback info: {2}".format(
+              "to lock table: '{0}'. Lock transition is not allowed: {1}.\n" \
+              "Traceback info: {2}\n".format(
                   context.locks[-1].table,
                   ', '.join(lock.table for lock in context.locks),
                   self._get_locks_trace()
@@ -182,9 +218,9 @@ def register_lock(table):
     if len(context.locks) == 1:
         return lock
 
-    # Checking lock transition is allowed
+    # Check that the lock transition is consistent with known chains
    transition = tuple(l.table for l in context.locks)
-    if transition not in ALLOWED_LOCKS_CHAINS:
+    if not _lock_chains_registry.is_allowed(transition):
         Lock.propagate_exception(LockTransitionNotAllowedError())
 
     return lock
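For context on the hunk above: the registry models every accepted lock chain as a
path in a directed graph, and a new chain is allowed only if adding it keeps the
graph acyclic. The first order in which two tables are observed therefore becomes
the only legal order. A minimal illustrative sketch of that check, using the same
networkx 1.x add_path API the patch uses (nothing below is part of the patch):

# Illustrative sketch of the DAG-based check in LockChainsRegistry.
import networkx

chains = networkx.DiGraph()
chains.add_path(('clusters', 'tasks'))   # first observed order wins

trial = chains.copy()
trial.add_path(('tasks', 'clusters'))    # candidate chain in the reverse order

# The reversed chain closes the cycle clusters -> tasks -> clusters, i.e. two
# sessions could each hold one table while waiting for the other.
print(networkx.is_directed_acyclic_graph(chains))  # True  -> registered
print(networkx.is_directed_acyclic_graph(trial))   # False -> rejected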
diff --git a/nailgun/nailgun/db/migration/alembic_migrations/versions/fuel_9_1.py b/nailgun/nailgun/db/migration/alembic_migrations/versions/fuel_9_1.py
index a35202a5cd..9c5ce6396a 100644
--- a/nailgun/nailgun/db/migration/alembic_migrations/versions/fuel_9_1.py
+++ b/nailgun/nailgun/db/migration/alembic_migrations/versions/fuel_9_1.py
@@ -60,6 +60,7 @@ def upgrade():
     upgrade_deployment_history_summary()
     upgrade_node_deployment_info()
     upgrade_add_task_start_end_time()
+    fix_deployment_history_constraint()
 
 
 def downgrade():
@@ -470,3 +471,18 @@ def upgrade_add_task_start_end_time():
 def downgrade_add_task_start_end_time():
     op.drop_column('tasks', 'time_start')
     op.drop_column('tasks', 'time_end')
+
+
+def fix_deployment_history_constraint():
+    # recreate deployment_history_task_id_fkey with ondelete='CASCADE',
+    # so that deleting a task also removes its history records
+    op.drop_constraint(
+        'deployment_history_task_id_fkey',
+        'deployment_history',
+        type_='foreignkey'
+    )
+
+    op.create_foreign_key(
+        "deployment_history_task_id_fkey",
+        "deployment_history", "tasks",
+        ["task_id"], ["id"], ondelete="CASCADE"
+    )
diff --git a/nailgun/nailgun/objects/task.py b/nailgun/nailgun/objects/task.py
index 096f338bd9..b5002a0995 100644
--- a/nailgun/nailgun/objects/task.py
+++ b/nailgun/nailgun/objects/task.py
@@ -42,8 +42,11 @@ class Task(NailgunObject):
 
     @classmethod
     def get_by_uid(cls, uid, fail_if_not_found=False, lock_for_update=False):
-        return cls.get_by_uid_excluding_deleted(uid, fail_if_not_found=False,
-                                                lock_for_update=False)
+        return cls.get_by_uid_excluding_deleted(
+            uid,
+            fail_if_not_found=fail_if_not_found,
+            lock_for_update=lock_for_update
+        )
 
     @classmethod
     def create_subtask(cls, instance, name):
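The objects/task.py hunk above fixes a real bug rather than style: get_by_uid
accepted fail_if_not_found and lock_for_update but forwarded hard-coded False
values, so a caller asking for a row lock silently ran without one. A generic,
runnable illustration of the pattern (fetch_impl is a stand-in, not Nailgun code):

# Stand-in for get_by_uid_excluding_deleted: just shows what it receives.
def fetch_impl(uid, fail_if_not_found=False, lock_for_update=False):
    print(uid, fail_if_not_found, lock_for_update)

def get_broken(uid, fail_if_not_found=False, lock_for_update=False):
    # BUG: hard-coded values shadow the caller's arguments
    return fetch_impl(uid, fail_if_not_found=False, lock_for_update=False)

def get_fixed(uid, fail_if_not_found=False, lock_for_update=False):
    # FIX: forward what the caller actually passed
    return fetch_impl(uid, fail_if_not_found=fail_if_not_found,
                      lock_for_update=lock_for_update)

get_broken('42', lock_for_update=True)  # 42 False False -- lock request lost
get_fixed('42', lock_for_update=True)   # 42 False True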
diff --git a/nailgun/nailgun/rpc/receiver.py b/nailgun/nailgun/rpc/receiver.py
index 4485b83e71..02c5d07139 100644
--- a/nailgun/nailgun/rpc/receiver.py
+++ b/nailgun/nailgun/rpc/receiver.py
@@ -26,7 +26,6 @@ from oslo_serialization import jsonutils
 from sqlalchemy import or_
 
 from nailgun import consts
-from nailgun.errors import errors as nailgun_errors
 from nailgun import notifier
 from nailgun import objects
 from nailgun.settings import settings
@@ -50,6 +49,37 @@ logger = logging.getLogger('receiverd')
 
 class NailgunReceiver(object):
 
+    @classmethod
+    def acquire_lock(cls, transaction_uuid):
+        """Get the transaction and acquire exclusive access to it.
+
+        :param transaction_uuid: the unique identifier of the transaction
+        :return: the transaction object, or None if there is no task with such uid
+        """
+        # use the Transaction object, which also finds tasks
+        # that were removed via the UI
+        transaction = objects.Transaction.get_by_uuid(transaction_uuid)
+        if not transaction:
+            logger.error("Task '%s' was removed.", transaction_uuid)
+            return
+
+        # the lock order is as follows: cluster first, then task
+        if transaction.cluster:
+            objects.Cluster.get_by_uid(
+                transaction.cluster_id,
+                fail_if_not_found=True, lock_for_update=True
+            )
+
+        # read the transaction again to ensure
+        # that it was not removed in another session
+        transaction = objects.Transaction.get_by_uuid(
+            transaction_uuid, lock_for_update=True)
+        if not transaction:
+            logger.error(
+                "Race condition detected, task '%s' was removed.",
+                transaction_uuid
+            )
+        return transaction
+
     @classmethod
     def remove_nodes_resp(cls, **kwargs):
         logger.info(
 
         progress = 100
 
         # locking task
-        task = objects.Task.get_by_uuid(
-            task_uuid,
-            fail_if_not_found=True,
-            lock_for_update=True
-        )
-
-        # locking cluster
-        if task.cluster_id is not None:
-            objects.Cluster.get_by_uid(
-                task.cluster_id,
-                fail_if_not_found=True,
-                lock_for_update=True
-            )
+        task = cls.acquire_lock(task_uuid)
+        if not task:
+            return False
 
         # locking nodes
         all_nodes = itertools.chain(nodes, error_nodes, inaccessible_nodes)
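acquire_lock gives every RPC handler one shared preamble (task =
cls.acquire_lock(task_uuid); if not task: return) and codifies a single
locking recipe: read the task without a lock, lock the parent cluster first
(matching the registered ('clusters', 'tasks') order), then re-read the task
under a row lock in case another session deleted it in between. A condensed
sketch of that double-checked pattern; session, Task and Cluster below are
assumed SQLAlchemy names for illustration, not the actual Nailgun API:

# Condensed sketch of the lock-ordering pattern implemented by acquire_lock.
def locked_task(session, task_uuid):
    task = session.query(Task).filter_by(uuid=task_uuid).first()
    if task is None:
        return None  # already removed, nothing to lock

    if task.cluster_id is not None:
        # Lock the parent first: every writer takes (clusters, tasks) in
        # this order, so two sessions can never wait on each other.
        session.query(Cluster).filter_by(
            id=task.cluster_id).with_lockmode('update').one()

    # Re-read under SELECT ... FOR UPDATE: the task may have been deleted
    # between the unlocked read and acquiring the cluster lock.
    return session.query(Task).filter_by(
        uuid=task_uuid).with_lockmode('update').first()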
@@ -226,7 +246,9 @@
         )
         status = kwargs.get('status')
         task_uuid = kwargs['task_uuid']
-        task = objects.Task.get_by_uuid(task_uuid)
+        task = cls.acquire_lock(task_uuid)
+        if not task:
+            return
 
         if status == consts.TASK_STATUSES.ready:
             logger.info("IBP images from deleted cluster have been removed")
 
             "RPC method transaction_resp received: %s",
             jsonutils.dumps(kwargs)
         )
-        transaction = objects.Task.get_by_uuid(
-            kwargs.pop('task_uuid', None),
-            fail_if_not_found=True,
-            lock_for_update=True,
-        )
+        # TODO(bgaifullin) move lock to transaction manager
+        transaction = cls.acquire_lock(kwargs.pop('task_uuid', None))
+        if not transaction:
+            return
 
         manager = transactions.TransactionsManager(transaction.cluster.id)
         manager.process(transaction, kwargs)
@@ -261,18 +282,9 @@
         status = kwargs.get('status')
         progress = kwargs.get('progress')
 
-        task = objects.Task.get_by_uuid(
-            task_uuid,
-            fail_if_not_found=True,
-            lock_for_update=True
-        )
-
-        # lock cluster
-        objects.Cluster.get_by_uid(
-            task.cluster_id,
-            fail_if_not_found=True,
-            lock_for_update=True
-        )
+        task = cls.acquire_lock(task_uuid)
+        if not task:
+            return
 
         if not status:
             status = task.status
@@ -377,11 +389,9 @@
         progress = kwargs.get('progress')
         nodes = kwargs.get('nodes', [])
 
-        task = objects.Task.get_by_uuid(
-            task_uuid,
-            fail_if_not_found=True,
-            lock_for_update=True
-        )
+        task = cls.acquire_lock(task_uuid)
+        if not task:
+            return
 
         # we should remove master node from the nodes since it requires
         # special handling and won't work with old code
@@ -432,11 +442,9 @@
         status = kwargs.get('status')
         progress = kwargs.get('progress')
 
-        task = objects.Task.get_by_uuid(
-            task_uuid,
-            fail_if_not_found=True,
-            lock_for_update=True
-        )
+        task = cls.acquire_lock(task_uuid)
+        if not task:
+            return
 
         q_nodes = objects.NodeCollection.filter_by_id_list(
             None, task.cache['nodes'])
@@ -667,10 +675,9 @@
         status = kwargs.get('status')
         progress = kwargs.get('progress')
 
-        task = objects.Task.get_by_uuid(
-            task_uuid,
-            fail_if_not_found=True,
-        )
+        task = cls.acquire_lock(task_uuid)
+        if not task:
+            return
 
         stopping_task_names = [
             consts.TASK_NAMES.deploy,
 
             'id'
         ).all()
 
-        # Locking cluster
-        objects.Cluster.get_by_uid(
-            task.cluster_id,
-            fail_if_not_found=True,
-            lock_for_update=True
-        )
-
         if not stop_tasks:
             logger.warning("stop_deployment_resp: deployment tasks \
 not found for environment '%s'!", task.cluster_id)
@@ -821,18 +821,9 @@
         status = kwargs.get('status')
         progress = kwargs.get('progress')
 
-        task = objects.Task.get_by_uuid(
-            task_uuid,
-            fail_if_not_found=True,
-            lock_for_update=True
-        )
-
-        # Locking cluster
-        objects.Cluster.get_by_uid(
-            task.cluster_id,
-            fail_if_not_found=True,
-            lock_for_update=True
-        )
+        task = cls.acquire_lock(task_uuid)
+        if not task:
+            return
 
         if status == consts.TASK_STATUSES.ready:
             # restoring pending changes
@@ -922,8 +913,9 @@
         status = kwargs.get('status')
         progress = kwargs.get('progress')
 
-        # We simply check that each node received all vlans for cluster
-        task = objects.Task.get_by_uuid(task_uuid, fail_if_not_found=True)
+        task = cls.acquire_lock(task_uuid)
+        if not task:
+            return
 
         result = []
         # We expect that 'nodes' contains all nodes which we test.
@@ -1049,7 +1041,10 @@
             jsonutils.dumps(kwargs))
         )
         task_uuid = kwargs.get('task_uuid')
-        task = objects.task.Task.get_by_uuid(uuid=task_uuid)
+        task = cls.acquire_lock(task_uuid)
+        if not task:
+            return
+
         if kwargs.get('status'):
             task.status = kwargs['status']
         task.progress = kwargs.get('progress', 0)
@@ -1116,6 +1111,10 @@
         status = kwargs.get('status')
         progress = kwargs.get('progress')
 
+        task = cls.acquire_lock(task_uuid)
+        if not task:
+            return
+
         nodes_uids = [node['uid'] for node in nodes]
         nodes_db = db().query(Node).filter(Node.id.in_(nodes_uids)).all()
         nodes_map = dict((str(node.id), node) for node in nodes_db)
@@ -1142,7 +1141,6 @@
         error_msg = '\n'.join(messages) if messages else error_msg
         logger.debug('Check dhcp message %s', error_msg)
 
-        task = objects.Task.get_by_uuid(task_uuid, fail_if_not_found=True)
         objects.Task.update_verify_networks(task, status, progress,
                                             error_msg, result)
@@ -1157,7 +1155,9 @@
         status = kwargs.get('status')
         progress = kwargs.get('progress')
 
-        task = objects.Task.get_by_uuid(task_uuid, fail_if_not_found=True)
+        task = cls.acquire_lock(task_uuid)
+        if not task:
+            return
 
         release_info = task.cache['args']['release_info']
         release_id = release_info['release_id']
@@ -1200,7 +1200,9 @@
         error = kwargs.get('error')
         msg = kwargs.get('msg')
 
-        task = objects.Task.get_by_uuid(task_uuid, fail_if_not_found=True)
+        task = cls.acquire_lock(task_uuid)
+        if not task:
+            return
 
         if status == 'error':
             notifier.notify('error', error)
@@ -1229,8 +1231,9 @@
         error = kwargs.get('error')
         message = kwargs.get('msg')
 
-        task = objects.Task.get_by_uuid(
-            task_uuid, fail_if_not_found=True, lock_for_update=True)
+        task = cls.acquire_lock(task_uuid)
+        if not task:
+            return
 
         if status not in (consts.TASK_STATUSES.ready,
                           consts.TASK_STATUSES.error):
@@ -1281,8 +1284,9 @@
             status = consts.TASK_STATUSES.ready
             progress = 100
 
-        task = objects.Task.get_by_uuid(
-            task_uuid, fail_if_not_found=True)
+        task = cls.acquire_lock(task_uuid)
+        if not task:
+            return
 
         failed_response_nodes = {
             n['uid']: n for n in response if n['status'] != 0
         }
@@ -1364,22 +1368,19 @@
             jsonutils.dumps(kwargs))
 
         task_uuid = kwargs.get('task_uuid')
+        task = cls.acquire_lock(task_uuid)
+        if not task:
+            return
 
-        try:
-            task = objects.Task.get_by_uuid(task_uuid, fail_if_not_found=True,
-                                            lock_for_update=True)
-            if task.status == consts.TASK_STATUSES.pending:
-                objects.Task.update(
-                    task, {'status': consts.TASK_STATUSES.running})
-                logger.debug("Task '%s' is acknowledged as running",
-                             task_uuid)
-            else:
-                logger.debug("Task '%s' in status '%s' can not "
-                             "be acknowledged as running", task_uuid,
-                             task.status)
-        except nailgun_errors.ObjectNotFound:
-            logger.warning("Task '%s' acknowledgement as running failed "
-                           "due to task doesn't exist in DB", task_uuid)
+        if task.status == consts.TASK_STATUSES.pending:
+            objects.Task.update(
+                task, {'status': consts.TASK_STATUSES.running})
+            logger.debug("Task '%s' is acknowledged as running",
+                         task_uuid)
+        else:
+            logger.debug("Task '%s' in status '%s' can not "
+                         "be acknowledged as running", task_uuid,
+                         task.status)
 
     @classmethod
     def update_dnsmasq_resp(cls, **kwargs):
@@ -1391,8 +1392,9 @@
         error = kwargs.get('error', '')
         message = kwargs.get('msg', '')
 
-        task = objects.Task.get_by_uuid(
-            task_uuid, fail_if_not_found=True, lock_for_update=True)
+        task = cls.acquire_lock(task_uuid)
+        if not task:
+            return
 
         data = {'status': status, 'progress': 100, 'message': message}
         if status == consts.TASK_STATUSES.error:
diff --git a/nailgun/nailgun/task/manager.py b/nailgun/nailgun/task/manager.py
index e9eb1ecf8e..be68580f4f 100644
--- a/nailgun/nailgun/task/manager.py
+++ b/nailgun/nailgun/task/manager.py
@@ -847,28 +847,43 @@ class StopDeploymentTaskManager(TaskManager):
         return task
 
 
-class ResetEnvironmentTaskManager(TaskManager):
+class ClearTaskHistory(TaskManager):
+    def clear_tasks_history(self, force=False):
+        try:
+            self.check_running_task(delete_obsolete=False)
+        except errors.TaskAlreadyRunning:
+            if not force:
+                raise
+
+            logger.error(
+                u"Force stop running tasks for cluster %s", self.cluster.name
+            )
+            running_tasks = objects.TaskCollection.all_in_progress(
+                self.cluster.id
+            )
+            for task in running_tasks:
+                # Force set task to finished state and update action log
+                TaskHelper.set_ready_if_not_finished(task)
+
+        # clear tasks history
+        cluster_tasks = objects.TaskCollection.get_cluster_tasks(
+            self.cluster.id
+        )
+        cluster_tasks.delete(synchronize_session='fetch')
+
+
+class ResetEnvironmentTaskManager(ClearTaskHistory):
 
     def execute(self, force=False, **kwargs):
         try:
-            self.check_running_task(
-                delete_obsolete=objects.Task.hard_delete
-            )
+            self.clear_tasks_history(force=force)
         except errors.TaskAlreadyRunning:
-            if force:
-                logger.error(
-                    u"Reset cluster '{0}' "
-                    u"while deployment is still running."
-                    .format(self.cluster.name)
-                )
-            else:
-                raise errors.DeploymentAlreadyStarted(
-                    "Can't reset environment '{0}' when "
-                    "running deployment task exists.".format(
-                        self.cluster.id
-                    )
-                )
+            raise errors.DeploymentAlreadyStarted(
+                "Can't reset environment '{0}' when "
+                "running deployment task exists.".format(
+                    self.cluster.id
+                )
+            )
 
         # FIXME(aroma): remove updating of 'deployed_before'
         # when stop action is reworked. 'deployed_before'
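ClearTaskHistory centralizes what reset and deletion previously each
reimplemented: without force, a still-running task aborts the operation; with
force, the stragglers are marked finished and the cluster's task history is
dropped in bulk, which is also why the deployment_history foreign key above
needs ondelete='CASCADE'. A rough standalone rendering of that control flow
(plain dicts stand in for Nailgun's Task objects and helpers):

class TaskAlreadyRunning(Exception):
    pass

def clear_tasks_history(cluster, force=False):
    running = [t for t in cluster['tasks'] if t['status'] == 'running']
    if running and not force:
        # without force, a running task aborts the whole operation
        raise TaskAlreadyRunning(cluster['name'])
    for task in running:
        # force mode: settle stragglers so history can be dropped safely
        task['status'] = 'ready'
    # bulk delete; with ondelete='CASCADE' the dependent deployment_history
    # rows go away together with their tasks
    cluster['tasks'] = []

cluster = {'name': 'env1', 'tasks': [{'status': 'running'}]}
clear_tasks_history(cluster, force=True)
print(cluster)  # {'name': 'env1', 'tasks': []}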
@@ -1111,33 +1126,17 @@ class VerifyNetworksTaskManager(TaskManager):
         return task
 
 
-class ClusterDeletionManager(TaskManager):
-
+class ClusterDeletionManager(ClearTaskHistory):
     def execute(self, force=False, **kwargs):
         try:
-            self.check_running_task(
-                delete_obsolete=objects.Task.hard_delete
-            )
+            self.clear_tasks_history(force=force)
         except errors.TaskAlreadyRunning:
-            if force:
-                logger.warning(
-                    u"Deletion cluster '{0}' "
-                    u"while deployment is still running."
-                    .format(self.cluster.name)
-                )
-                running_tasks = objects.TaskCollection.all_in_progress(
-                    self.cluster.id
-                )
-                for task in running_tasks:
-                    # Force set task to finished state and update action log
-                    TaskHelper.set_ready_if_not_finished(task)
-            else:
-                raise errors.DeploymentAlreadyStarted(
-                    u"Can't delete environment '{0}' when "
-                    u"running deployment task exists.".format(
-                        self.cluster.id
-                    )
-                )
+            raise errors.DeploymentAlreadyStarted(
+                u"Can't delete environment '{0}' when "
+                u"running deployment task exists.".format(
+                    self.cluster.id
+                )
+            )
 
         # locking nodes
         nodes = objects.NodeCollection.filter_by(
             None,
diff --git a/nailgun/nailgun/test/unit/test_deadlock_detector.py b/nailgun/nailgun/test/unit/test_deadlock_detector.py
index 90ba07da16..6771a111ea 100644
--- a/nailgun/nailgun/test/unit/test_deadlock_detector.py
+++ b/nailgun/nailgun/test/unit/test_deadlock_detector.py
@@ -91,16 +91,23 @@ class TestDeadlockDetector(BaseTestCase):
         db().rollback()
         self.assertEquals(0, len(dd.context.locks))
 
-    def test_unknown_locks_chain_failed(self):
+    def test_different_order_in_chains_detected(self):
         db().query(models.Release).with_lockmode('update').all()
+        db().query(models.Node).with_lockmode('update').all()
+        db().rollback()
+
+        db().query(models.Node).with_lockmode('update').all()
         self.assertRaises(
             dd.LockTransitionNotAllowedError,
-            db().query(models.Node).with_lockmode, 'update'
+            db().query(models.Release).with_lockmode, 'update'
         )
         db().rollback()
 
-        db().query(models.Task).with_lockmode('update').all()
         db().query(models.Cluster).with_lockmode('update').all()
+        db().query(models.Task).with_lockmode('update').all()
+        db().query(models.Node).with_lockmode('update').all()
+        db().rollback()
+
+        db().query(models.Node).with_lockmode('update').all()
         self.assertRaises(
             dd.LockTransitionNotAllowedError,