From 976667c0a22f604559e19135f30dea803e3080f6 Mon Sep 17 00:00:00 2001 From: Moshe Elisha Date: Sat, 5 Dec 2015 15:31:24 +0000 Subject: [PATCH] Fix concurrency issues by using READ_COMMITTED The state of a workflow execution was not updated even when all task executions were completed if some tasks finished at the same time as other tasks. Because we were using our connections with transaction isolation level = REPEATABLE_READ - Each process was using a snapshot of the DB created at the first read statement in that transaction. When a task finished and evaluated the state of all the other tasks it did not see the up-to-date state of those tasks - and so, because not all tasks were completed - the task did not change the workflow execution state. Similar behavior happened with multiple action executions under same task. On completion, each action execution checked the status of the other action executions and did not see the up-to-date state of these action execution - causing task execution to stay in RUNNING state. Change-Id: I12f66134d92b8ed39df9d6128d7de5ee49aa8623 Closes-Bug: #1518012 Closes-Bug: #1513456 --- mistral/db/sqlalchemy/base.py | 20 ++++++ mistral/db/v2/api.py | 2 +- mistral/db/v2/sqlalchemy/api.py | 21 ++++-- mistral/engine/default_engine.py | 52 +++++---------- .../api/v2/test_mistral_basic_v2.py | 26 ++++++++ mistral/tests/functional/base.py | 13 ++-- .../resources/wf_action_ex_concurrency.yaml | 8 +++ .../resources/wf_task_ex_concurrency.yaml | 11 ++++ mistral/tests/unit/db/v2/test_locking.py | 50 +------------- mistral/tests/unit/engine/base.py | 3 + .../tests/unit/engine/test_workflow_resume.py | 9 ++- .../unit/services/test_trigger_service.py | 18 ++++- ...el-to-read-committed-7080833ad284b901.yaml | 66 +++++++++++++++++++ 13 files changed, 202 insertions(+), 97 deletions(-) create mode 100644 mistral/tests/resources/wf_action_ex_concurrency.yaml create mode 100644 mistral/tests/resources/wf_task_ex_concurrency.yaml create mode 100644 releasenotes/notes/changing-isolation-level-to-read-committed-7080833ad284b901.yaml diff --git a/mistral/db/sqlalchemy/base.py b/mistral/db/sqlalchemy/base.py index 93d64a17..b8fac2f0 100644 --- a/mistral/db/sqlalchemy/base.py +++ b/mistral/db/sqlalchemy/base.py @@ -20,6 +20,7 @@ from oslo_config import cfg from oslo_db import options from oslo_db.sqlalchemy import session as db_session from oslo_log import log as logging +import sqlalchemy as sa from mistral.db.sqlalchemy import sqlite_lock from mistral import exceptions as exc @@ -34,6 +35,7 @@ options.set_defaults(cfg.CONF, connection="sqlite:///mistral.sqlite") _DB_SESSION_THREAD_LOCAL_NAME = "db_sql_alchemy_session" _facade = None +_sqlalchemy_create_engine_orig = sa.create_engine def _get_facade(): @@ -50,7 +52,25 @@ def _get_facade(): return _facade +# Monkey-patching sqlalchemy to set the isolation_level +# as this configuration is not exposed by oslo_db. +def _sqlalchemy_create_engine_wrapper(*args, **kwargs): + # sqlite (used for unit testing and not allowed for production) + # does not support READ_COMMITTED. + # Checking the drivername using the args and not the get_driver_name() + # method because that method requires a session. + if args[0].drivername != 'sqlite': + kwargs["isolation_level"] = "READ_COMMITTED" + + return _sqlalchemy_create_engine_orig(*args, **kwargs) + + def get_engine(): + # If the patch was not applied yet. + if sa.create_engine != _sqlalchemy_create_engine_wrapper: + # Replace the original create_engine with our wrapper. + sa.create_engine = _sqlalchemy_create_engine_wrapper + return _get_facade().get_engine() diff --git a/mistral/db/v2/api.py b/mistral/db/v2/api.py index a9b12e18..35605531 100644 --- a/mistral/db/v2/api.py +++ b/mistral/db/v2/api.py @@ -63,7 +63,7 @@ def transaction(): def acquire_lock(model, id): - IMPL.acquire_lock(model, id) + return IMPL.acquire_lock(model, id) # Workbooks. diff --git a/mistral/db/v2/sqlalchemy/api.py b/mistral/db/v2/sqlalchemy/api.py index 617b0e12..d61ee569 100644 --- a/mistral/db/v2/sqlalchemy/api.py +++ b/mistral/db/v2/sqlalchemy/api.py @@ -91,15 +91,24 @@ def transaction(): @b.session_aware() def acquire_lock(model, id, session=None): - if b.get_driver_name() != 'sqlite': - query = _secure_query(model).filter("id = '%s'" % id) + # Expire all so all objects queried after lock is acquired + # will be up-to-date from the DB and not from cache. + session.expire_all() + + if b.get_driver_name() != 'sqlite': + entity = _get_one_entity(model, id) + entity.update({'updated_at': timeutils.utcnow()}) - query.update( - {'updated_at': timeutils.utcnow()}, - synchronize_session='fetch', - ) else: sqlite_lock.acquire_lock(id, session) + entity = _get_one_entity(model, id) + + return entity + + +def _get_one_entity(model, id): + # Get entity by ID and expect exactly one object. + return _secure_query(model).filter(model.id == id).one() def _secure_query(model, *columns): diff --git a/mistral/engine/default_engine.py b/mistral/engine/default_engine.py index ae478f64..c22f515c 100644 --- a/mistral/engine/default_engine.py +++ b/mistral/engine/default_engine.py @@ -156,11 +156,7 @@ class DefaultEngine(base.Engine, coordination.Service): # policy worked. wf_ex_id = task_ex.workflow_execution_id - - # Must be before loading the object itself (see method doc). - self._lock_workflow_execution(wf_ex_id) - - wf_ex = task_ex.workflow_execution + wf_ex = self._lock_workflow_execution(wf_ex_id) wf_trace.info( task_ex, @@ -234,13 +230,9 @@ class DefaultEngine(base.Engine, coordination.Service): ).get_clone() wf_ex_id = action_ex.task_execution.workflow_execution_id + wf_ex = self._lock_workflow_execution(wf_ex_id) - # Must be before loading the object itself (see method doc). - self._lock_workflow_execution(wf_ex_id) - - wf_ex = action_ex.task_execution.workflow_execution - - task_ex = task_handler.on_action_complete(action_ex, result) + task_handler.on_action_complete(action_ex, result) # If workflow is on pause or completed then there's no # need to continue workflow. @@ -254,7 +246,9 @@ class DefaultEngine(base.Engine, coordination.Service): with db_api.transaction(): action_ex = db_api.get_action_execution(action_ex_id) task_ex = action_ex.task_execution - wf_ex = action_ex.task_execution.workflow_execution + wf_ex = self._lock_workflow_execution( + task_ex.workflow_execution_id + ) self._on_task_state_change(task_ex, wf_ex) return action_ex.get_clone() @@ -266,17 +260,16 @@ class DefaultEngine(base.Engine, coordination.Service): action_ex_id, e, traceback.format_exc() ) - self._fail_workflow(wf_ex_id, e) + # If an exception was thrown after we got the wf_ex_id + if wf_ex_id: + self._fail_workflow(wf_ex_id, e) raise e @u.log_exec(LOG) def pause_workflow(self, execution_id): with db_api.transaction(): - # Must be before loading the object itself (see method doc). - self._lock_workflow_execution(execution_id) - - wf_ex = db_api.get_workflow_execution(execution_id) + wf_ex = self._lock_workflow_execution(execution_id) wf_handler.set_execution_state(wf_ex, states.PAUSED) @@ -323,16 +316,13 @@ class DefaultEngine(base.Engine, coordination.Service): def rerun_workflow(self, wf_ex_id, task_ex_id, reset=True): try: with db_api.transaction(): - # Must be before loading the object itself (see method doc). - self._lock_workflow_execution(wf_ex_id) + wf_ex = self._lock_workflow_execution(wf_ex_id) task_ex = db_api.get_task_execution(task_ex_id) if task_ex.workflow_execution.id != wf_ex_id: raise ValueError('Workflow execution ID does not match.') - wf_ex = task_ex.workflow_execution - if wf_ex.state == states.PAUSED: return wf_ex.get_clone() @@ -349,10 +339,7 @@ class DefaultEngine(base.Engine, coordination.Service): def resume_workflow(self, wf_ex_id): try: with db_api.transaction(): - # Must be before loading the object itself (see method doc). - self._lock_workflow_execution(wf_ex_id) - - wf_ex = db_api.get_workflow_execution(wf_ex_id) + wf_ex = self._lock_workflow_execution(wf_ex_id) if wf_ex.state != states.PAUSED: return wf_ex.get_clone() @@ -369,10 +356,7 @@ class DefaultEngine(base.Engine, coordination.Service): @u.log_exec(LOG) def stop_workflow(self, execution_id, state, message=None): with db_api.transaction(): - # Must be before loading the object itself (see method doc). - self._lock_workflow_execution(execution_id) - - wf_ex = db_api.get_execution(execution_id) + wf_ex = self._lock_workflow_execution(execution_id) return self._stop_workflow(wf_ex, state, message) @@ -506,9 +490,7 @@ class DefaultEngine(base.Engine, coordination.Service): @staticmethod def _lock_workflow_execution(wf_exec_id): - # NOTE: Workflow execution object must be locked before - # loading the object itself into the session (either with - # 'get_XXX' or 'load_XXX' methods). Otherwise, there can be - # multiple parallel transactions that see the same state - # and hence the rest of the method logic would not be atomic. - db_api.acquire_lock(db_models.WorkflowExecution, wf_exec_id) + # Locks a workflow execution using the db_api.acquire_lock function. + # The method expires all session objects and returns the up-to-date + # workflow execution from the DB. + return db_api.acquire_lock(db_models.WorkflowExecution, wf_exec_id) diff --git a/mistral/tests/functional/api/v2/test_mistral_basic_v2.py b/mistral/tests/functional/api/v2/test_mistral_basic_v2.py index 574c261c..b415350d 100644 --- a/mistral/tests/functional/api/v2/test_mistral_basic_v2.py +++ b/mistral/tests/functional/api/v2/test_mistral_basic_v2.py @@ -557,6 +557,32 @@ class ExecutionTestsV2(base.TestCase): self.reverse_wf['name'], params={"task_name": "nonexist"}) + @test.attr(type='sanity') + def test_action_ex_concurrency(self): + resp, wf = self.client.create_workflow("wf_action_ex_concurrency.yaml") + self.assertEqual(201, resp.status) + + wf_name = wf['workflows'][0]['name'] + resp, execution = self.client.create_execution(wf_name) + + self.assertEqual(201, resp.status) + self.assertEqual('RUNNING', execution['state']) + + self.client.wait_execution_success(execution) + + @test.attr(type='sanity') + def test_task_ex_concurrency(self): + resp, wf = self.client.create_workflow("wf_task_ex_concurrency.yaml") + self.assertEqual(201, resp.status) + + wf_name = wf['workflows'][0]['name'] + resp, execution = self.client.create_execution(wf_name) + + self.assertEqual(201, resp.status) + self.assertEqual('RUNNING', execution['state']) + + self.client.wait_execution(execution, target_state='ERROR') + class CronTriggerTestsV2(base.TestCase): diff --git a/mistral/tests/functional/base.py b/mistral/tests/functional/base.py index da4c4efc..0fcc672b 100644 --- a/mistral/tests/functional/base.py +++ b/mistral/tests/functional/base.py @@ -89,14 +89,19 @@ class MistralClientBase(rest_client.RestClient): return resp, json.loads(body) def wait_execution_success(self, ex_body, timeout=180, url='executions'): + return self.wait_execution(ex_body, timeout=timeout, url=url) + + def wait_execution(self, ex_body, timeout=180, url='executions', + target_state='SUCCESS'): start_time = time.time() - expected_states = ['SUCCESS', 'RUNNING'] + expected_states = [target_state, 'RUNNING'] - while ex_body['state'] != 'SUCCESS': + while ex_body['state'] != target_state: if time.time() - start_time > timeout: - msg = ("Execution exceeds timeout {0} to change state " - "to SUCCESS. Execution: {1}".format(timeout, ex_body)) + msg = ("Execution exceeds timeout {0} " + "to change state to {1}. " + "Execution: {2}".format(timeout, target_state, ex_body)) raise exceptions.TimeoutException(msg) _, ex_body = self.get_object(url, ex_body['id']) diff --git a/mistral/tests/resources/wf_action_ex_concurrency.yaml b/mistral/tests/resources/wf_action_ex_concurrency.yaml new file mode 100644 index 00000000..0dee5505 --- /dev/null +++ b/mistral/tests/resources/wf_action_ex_concurrency.yaml @@ -0,0 +1,8 @@ +--- +version: '2.0' + +test_action_ex_concurrency: + tasks: + test_with_items: + with-items: index in <% range(2) %> + action: std.echo output='<% $.index %>' \ No newline at end of file diff --git a/mistral/tests/resources/wf_task_ex_concurrency.yaml b/mistral/tests/resources/wf_task_ex_concurrency.yaml new file mode 100644 index 00000000..45894088 --- /dev/null +++ b/mistral/tests/resources/wf_task_ex_concurrency.yaml @@ -0,0 +1,11 @@ +--- +version: '2.0' + +test_task_ex_concurrency: + tasks: + task1: + action: std.async_noop + timeout: 2 + task2: + action: std.async_noop + timeout: 2 \ No newline at end of file diff --git a/mistral/tests/unit/db/v2/test_locking.py b/mistral/tests/unit/db/v2/test_locking.py index 6b0ddc2d..2c523967 100644 --- a/mistral/tests/unit/db/v2/test_locking.py +++ b/mistral/tests/unit/db/v2/test_locking.py @@ -92,16 +92,11 @@ class SQLiteLocksTest(test_base.DbTestCase): self._random_sleep() with db_api.transaction(): - # Here we lock the object before it gets loaded into the - # session and prevent reading the same object state by - # multiple transactions. Hence the rest of the transaction - # body works atomically (in a serialized manner) and the - # result (object name) must be equal to a number of - # transactions. - db_api.acquire_lock(db_models.WorkflowExecution, wf_ex.id) + # Lock workflow execution and get the most up-to-date object. + wf_ex = db_api.acquire_lock(db_models.WorkflowExecution, wf_ex.id) # Refresh the object. - wf_ex = db_api.get_workflow_execution(wf_ex.id) + db_api.get_workflow_execution(wf_ex.id) wf_ex.name = str(int(wf_ex.name) + 1) @@ -127,42 +122,3 @@ class SQLiteLocksTest(test_base.DbTestCase): print("Correct locking test gave object name: %s" % wf_ex.name) self.assertEqual(str(number), wf_ex.name) - - def _run_invalid_locking(self, wf_ex): - self._random_sleep() - - with db_api.transaction(): - # Load object into the session (transaction). - wf_ex = db_api.get_workflow_execution(wf_ex.id) - - # It's too late to lock the object here because it's already - # been loaded into the session so there should be multiple - # threads that read the same object state so they write the - # same value into DB. As a result we won't get a result - # (object name) equal to a number of transactions. - db_api.acquire_lock(db_models.WorkflowExecution, wf_ex.id) - - wf_ex.name = str(int(wf_ex.name) + 1) - - return wf_ex.name - - def test_invalid_locking(self): - wf_ex = db_api.create_workflow_execution(WF_EXEC) - - threads = [] - - number = 500 - - for i in range(1, number): - threads.append( - eventlet.spawn(self._run_invalid_locking, wf_ex) - ) - - [t.wait() for t in threads] - [t.kill() for t in threads] - - wf_ex = db_api.get_workflow_execution(wf_ex.id) - - print("Invalid locking test gave object name: %s" % wf_ex.name) - - self.assertNotEqual(str(number), wf_ex.name) diff --git a/mistral/tests/unit/engine/base.py b/mistral/tests/unit/engine/base.py index 3b849e03..89c55366 100644 --- a/mistral/tests/unit/engine/base.py +++ b/mistral/tests/unit/engine/base.py @@ -166,3 +166,6 @@ class EngineTestCase(base.DbTestCase): def is_task_delayed(self, task_ex_id): return self.is_task_in_state(task_ex_id, states.RUNNING_DELAYED) + + def is_task_processed(self, task_ex_id): + return db_api.get_task_execution(task_ex_id).processed diff --git a/mistral/tests/unit/engine/test_workflow_resume.py b/mistral/tests/unit/engine/test_workflow_resume.py index cec38b4e..b69a1a97 100644 --- a/mistral/tests/unit/engine/test_workflow_resume.py +++ b/mistral/tests/unit/engine/test_workflow_resume.py @@ -303,6 +303,11 @@ class WorkflowResumeTest(base.EngineTestCase): self.assertEqual(states.RUNNING, wf_ex.state) + # Wait for task3 to be processed. + task3_ex = self._assert_single_item(task_execs, name='task3') + self._await(lambda: self.is_task_success(task3_ex.id)) + self._await(lambda: self.is_task_processed(task3_ex.id)) + # Finish task2. task2_action_ex = db_api.get_action_executions( task_execution_id=task2_ex.id @@ -314,7 +319,7 @@ class WorkflowResumeTest(base.EngineTestCase): wf_ex = db_api.get_workflow_execution(wf_ex.id) - self.assertEqual(states.SUCCESS, wf_ex.state) + self.assertEqual(states.SUCCESS, wf_ex.state, wf_ex.state_info) self.assertEqual(4, len(wf_ex.task_executions)) @mock.patch.object(de.DefaultEngine, '_fail_workflow') @@ -335,7 +340,7 @@ class WorkflowResumeTest(base.EngineTestCase): with mock.patch.object( db_api, - 'get_workflow_execution', + 'acquire_lock', side_effect=err): self.assertRaises( diff --git a/mistral/tests/unit/services/test_trigger_service.py b/mistral/tests/unit/services/test_trigger_service.py index 95da472c..46ea6747 100644 --- a/mistral/tests/unit/services/test_trigger_service.py +++ b/mistral/tests/unit/services/test_trigger_service.py @@ -281,7 +281,21 @@ class TriggerServiceV2Test(base.DbTestCase): datetime.datetime(2010, 8, 25) ) - eventlet.sleep(10) + # Wait until there are 'trigger_count' executions. + self._await( + lambda: self._wait_for_single_execution_with_multiple_processes( + trigger_count, + start_wf_mock + ) + ) - self.assertEqual(True, start_wf_mock.called) + # Wait some more and make sure there are no more than 'trigger_count' + # executions. + eventlet.sleep(5) self.assertEqual(trigger_count, start_wf_mock.call_count) + + def _wait_for_single_execution_with_multiple_processes(self, trigger_count, + start_wf_mock): + eventlet.sleep(1) + + return trigger_count == start_wf_mock.call_count diff --git a/releasenotes/notes/changing-isolation-level-to-read-committed-7080833ad284b901.yaml b/releasenotes/notes/changing-isolation-level-to-read-committed-7080833ad284b901.yaml new file mode 100644 index 00000000..f8359f03 --- /dev/null +++ b/releasenotes/notes/changing-isolation-level-to-read-committed-7080833ad284b901.yaml @@ -0,0 +1,66 @@ +--- +fixes: + - | + [`bug 1518012 `_] + [`bug 1513456 `_] + + Fix concurrency issues by using READ_COMMITTED + + This release note describes bugs: + * #1513456 - task stuck in RUNNING state when all action executions are finished regarding the problem and the fix. + * #1518012- WF execution stays in RUNNING although task and action executions are in SUCCESS. + + This fix does not require any action from Mistral users and + does not have any implications other than the bug fix. + + The state of a workflow execution was not updated even when all task + executions were completed if some tasks finished at the same time as + other tasks. + + Because we were using our connections with transaction isolation + level = REPEATABLE_READ - Each process was using a snapshot of the DB + created at the first read statement in that transaction. + When a task finished and evaluated the state of all the other tasks + it did not see the up-to-date state of those tasks - and so, because + not all tasks were completed - the task did not change the workflow + execution state. + + Similar behavior happened with multiple action executions under same + task. On completion, each action execution checked the status of the + other action executions and did not see the up-to-date state of these + action execution - causing task execution to stay in RUNNING state. + + The solution is to change DB transaction isolation level from + REPEATABLE_READ to READ_COMMITTED so process A can see changes committed + in other transactions even if process A is in the middle of a transaction. + + A short explaination regarding the different isolation levels: + + - | + + REPEATABLE_READ - while in transaction, the first read operation to the + DB creates a snapshot of the entire DB so you are guarantee that all the + data in the DB will remain the same until the end of the transaction. + + REPEATABLE_READ example: + * ConnectionA selects from tableA in a transaction. + * ConnectionB deletes all rows from tableB in a transaction. + * ConnectionB commits. + * ConnectionA loops over the rows of tableA and fetches from tableB using the tableA_tableB_FK - ConnectionA will get rows from tableB. + + - | + + READ_COMMITTED - while in a transaction, every query to the DB will get + the committed data. + + READ_COMMITTED example: + * ConnectionA starts a transaction. + * ConnectionB starts a transaction. + * ConnectionA insert row to tableA and commits. + * ConnectionB insert row to tableA. + * ConnectionB selects tableA and gets two rows. + * ConnectionB commits / rollback. + + Two good articles about isolation levels are: + * `Differences between READ-COMMITTED and REPEATABLE-READ transaction isolation levels `_. + * `MySQL performance implications of InnoDB isolation modes `_.