Fix concurrency issues by using READ_COMMITTED

The state of a workflow execution was not updated even when all task
executions were completed if some tasks finished at the same time as
other tasks.
Because we were using our connections with transaction isolation
level = REPEATABLE_READ - Each process was using a snapshot of the DB
created at the first read statement in that transaction.
When a task finished and evaluated the state of all the other tasks
it did not see the up-to-date state of those tasks - and so, because
not all tasks were completed - the task did not change the workflow
execution state.

Similar behavior happened with multiple action executions under the same
task. On completion, each action execution checked the status of the
other action executions and did not see the up-to-date state of those
action executions - causing the task execution to stay in the RUNNING
state.

Change-Id: I12f66134d92b8ed39df9d6128d7de5ee49aa8623
Closes-Bug: #1518012
Closes-Bug: #1513456
This commit is contained in:
Moshe Elisha 2015-12-05 15:31:24 +00:00
parent 209319e507
commit 976667c0a2
13 changed files with 202 additions and 97 deletions

View File

@ -20,6 +20,7 @@ from oslo_config import cfg
from oslo_db import options
from oslo_db.sqlalchemy import session as db_session
from oslo_log import log as logging
import sqlalchemy as sa
from mistral.db.sqlalchemy import sqlite_lock
from mistral import exceptions as exc
@ -34,6 +35,7 @@ options.set_defaults(cfg.CONF, connection="sqlite:///mistral.sqlite")
_DB_SESSION_THREAD_LOCAL_NAME = "db_sql_alchemy_session"
_facade = None
_sqlalchemy_create_engine_orig = sa.create_engine
def _get_facade():
@ -50,7 +52,25 @@ def _get_facade():
return _facade
# Monkey-patching sqlalchemy to set the isolation_level
# as this configuration is not exposed by oslo_db.
def _sqlalchemy_create_engine_wrapper(*args, **kwargs):
# sqlite (used for unit testing and not allowed for production)
# does not support READ_COMMITTED.
# Checking the drivername using the args and not the get_driver_name()
# method because that method requires a session.
if args[0].drivername != 'sqlite':
kwargs["isolation_level"] = "READ_COMMITTED"
return _sqlalchemy_create_engine_orig(*args, **kwargs)
def get_engine():
# If the patch was not applied yet.
if sa.create_engine != _sqlalchemy_create_engine_wrapper:
# Replace the original create_engine with our wrapper.
sa.create_engine = _sqlalchemy_create_engine_wrapper
return _get_facade().get_engine()

View File

@ -63,7 +63,7 @@ def transaction():
def acquire_lock(model, id):
IMPL.acquire_lock(model, id)
return IMPL.acquire_lock(model, id)
# Workbooks.

View File

@ -91,15 +91,24 @@ def transaction():
@b.session_aware()
def acquire_lock(model, id, session=None):
if b.get_driver_name() != 'sqlite':
query = _secure_query(model).filter("id = '%s'" % id)
# Expire all so all objects queried after lock is acquired
# will be up-to-date from the DB and not from cache.
session.expire_all()
if b.get_driver_name() != 'sqlite':
entity = _get_one_entity(model, id)
entity.update({'updated_at': timeutils.utcnow()})
query.update(
{'updated_at': timeutils.utcnow()},
synchronize_session='fetch',
)
else:
sqlite_lock.acquire_lock(id, session)
entity = _get_one_entity(model, id)
return entity
def _get_one_entity(model, id):
# Get entity by ID and expect exactly one object.
return _secure_query(model).filter(model.id == id).one()
def _secure_query(model, *columns):

View File

@ -156,11 +156,7 @@ class DefaultEngine(base.Engine, coordination.Service):
# policy worked.
wf_ex_id = task_ex.workflow_execution_id
# Must be before loading the object itself (see method doc).
self._lock_workflow_execution(wf_ex_id)
wf_ex = task_ex.workflow_execution
wf_ex = self._lock_workflow_execution(wf_ex_id)
wf_trace.info(
task_ex,
@ -234,13 +230,9 @@ class DefaultEngine(base.Engine, coordination.Service):
).get_clone()
wf_ex_id = action_ex.task_execution.workflow_execution_id
wf_ex = self._lock_workflow_execution(wf_ex_id)
# Must be before loading the object itself (see method doc).
self._lock_workflow_execution(wf_ex_id)
wf_ex = action_ex.task_execution.workflow_execution
task_ex = task_handler.on_action_complete(action_ex, result)
task_handler.on_action_complete(action_ex, result)
# If workflow is on pause or completed then there's no
# need to continue workflow.
@ -254,7 +246,9 @@ class DefaultEngine(base.Engine, coordination.Service):
with db_api.transaction():
action_ex = db_api.get_action_execution(action_ex_id)
task_ex = action_ex.task_execution
wf_ex = action_ex.task_execution.workflow_execution
wf_ex = self._lock_workflow_execution(
task_ex.workflow_execution_id
)
self._on_task_state_change(task_ex, wf_ex)
return action_ex.get_clone()
@ -266,17 +260,16 @@ class DefaultEngine(base.Engine, coordination.Service):
action_ex_id, e, traceback.format_exc()
)
self._fail_workflow(wf_ex_id, e)
# If an exception was thrown after we got the wf_ex_id
if wf_ex_id:
self._fail_workflow(wf_ex_id, e)
raise e
@u.log_exec(LOG)
def pause_workflow(self, execution_id):
with db_api.transaction():
# Must be before loading the object itself (see method doc).
self._lock_workflow_execution(execution_id)
wf_ex = db_api.get_workflow_execution(execution_id)
wf_ex = self._lock_workflow_execution(execution_id)
wf_handler.set_execution_state(wf_ex, states.PAUSED)
@ -323,16 +316,13 @@ class DefaultEngine(base.Engine, coordination.Service):
def rerun_workflow(self, wf_ex_id, task_ex_id, reset=True):
try:
with db_api.transaction():
# Must be before loading the object itself (see method doc).
self._lock_workflow_execution(wf_ex_id)
wf_ex = self._lock_workflow_execution(wf_ex_id)
task_ex = db_api.get_task_execution(task_ex_id)
if task_ex.workflow_execution.id != wf_ex_id:
raise ValueError('Workflow execution ID does not match.')
wf_ex = task_ex.workflow_execution
if wf_ex.state == states.PAUSED:
return wf_ex.get_clone()
@ -349,10 +339,7 @@ class DefaultEngine(base.Engine, coordination.Service):
def resume_workflow(self, wf_ex_id):
try:
with db_api.transaction():
# Must be before loading the object itself (see method doc).
self._lock_workflow_execution(wf_ex_id)
wf_ex = db_api.get_workflow_execution(wf_ex_id)
wf_ex = self._lock_workflow_execution(wf_ex_id)
if wf_ex.state != states.PAUSED:
return wf_ex.get_clone()
@ -369,10 +356,7 @@ class DefaultEngine(base.Engine, coordination.Service):
@u.log_exec(LOG)
def stop_workflow(self, execution_id, state, message=None):
with db_api.transaction():
# Must be before loading the object itself (see method doc).
self._lock_workflow_execution(execution_id)
wf_ex = db_api.get_execution(execution_id)
wf_ex = self._lock_workflow_execution(execution_id)
return self._stop_workflow(wf_ex, state, message)
@ -506,9 +490,7 @@ class DefaultEngine(base.Engine, coordination.Service):
@staticmethod
def _lock_workflow_execution(wf_exec_id):
# NOTE: Workflow execution object must be locked before
# loading the object itself into the session (either with
# 'get_XXX' or 'load_XXX' methods). Otherwise, there can be
# multiple parallel transactions that see the same state
# and hence the rest of the method logic would not be atomic.
db_api.acquire_lock(db_models.WorkflowExecution, wf_exec_id)
# Locks a workflow execution using the db_api.acquire_lock function.
# The method expires all session objects and returns the up-to-date
# workflow execution from the DB.
return db_api.acquire_lock(db_models.WorkflowExecution, wf_exec_id)

View File

@ -557,6 +557,32 @@ class ExecutionTestsV2(base.TestCase):
self.reverse_wf['name'],
params={"task_name": "nonexist"})
@test.attr(type='sanity')
def test_action_ex_concurrency(self):
resp, wf = self.client.create_workflow("wf_action_ex_concurrency.yaml")
self.assertEqual(201, resp.status)
wf_name = wf['workflows'][0]['name']
resp, execution = self.client.create_execution(wf_name)
self.assertEqual(201, resp.status)
self.assertEqual('RUNNING', execution['state'])
self.client.wait_execution_success(execution)
@test.attr(type='sanity')
def test_task_ex_concurrency(self):
resp, wf = self.client.create_workflow("wf_task_ex_concurrency.yaml")
self.assertEqual(201, resp.status)
wf_name = wf['workflows'][0]['name']
resp, execution = self.client.create_execution(wf_name)
self.assertEqual(201, resp.status)
self.assertEqual('RUNNING', execution['state'])
self.client.wait_execution(execution, target_state='ERROR')
class CronTriggerTestsV2(base.TestCase):

View File

@ -89,14 +89,19 @@ class MistralClientBase(rest_client.RestClient):
return resp, json.loads(body)
def wait_execution_success(self, ex_body, timeout=180, url='executions'):
return self.wait_execution(ex_body, timeout=timeout, url=url)
def wait_execution(self, ex_body, timeout=180, url='executions',
target_state='SUCCESS'):
start_time = time.time()
expected_states = ['SUCCESS', 'RUNNING']
expected_states = [target_state, 'RUNNING']
while ex_body['state'] != 'SUCCESS':
while ex_body['state'] != target_state:
if time.time() - start_time > timeout:
msg = ("Execution exceeds timeout {0} to change state "
"to SUCCESS. Execution: {1}".format(timeout, ex_body))
msg = ("Execution exceeds timeout {0} "
"to change state to {1}. "
"Execution: {2}".format(timeout, target_state, ex_body))
raise exceptions.TimeoutException(msg)
_, ex_body = self.get_object(url, ex_body['id'])

View File

@ -0,0 +1,8 @@
---
version: '2.0'
test_action_ex_concurrency:
tasks:
test_with_items:
with-items: index in <% range(2) %>
action: std.echo output='<% $.index %>'

View File

@ -0,0 +1,11 @@
---
version: '2.0'
test_task_ex_concurrency:
tasks:
task1:
action: std.async_noop
timeout: 2
task2:
action: std.async_noop
timeout: 2

View File

@ -92,16 +92,11 @@ class SQLiteLocksTest(test_base.DbTestCase):
self._random_sleep()
with db_api.transaction():
# Here we lock the object before it gets loaded into the
# session and prevent reading the same object state by
# multiple transactions. Hence the rest of the transaction
# body works atomically (in a serialized manner) and the
# result (object name) must be equal to a number of
# transactions.
db_api.acquire_lock(db_models.WorkflowExecution, wf_ex.id)
# Lock workflow execution and get the most up-to-date object.
wf_ex = db_api.acquire_lock(db_models.WorkflowExecution, wf_ex.id)
# Refresh the object.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
db_api.get_workflow_execution(wf_ex.id)
wf_ex.name = str(int(wf_ex.name) + 1)
@ -127,42 +122,3 @@ class SQLiteLocksTest(test_base.DbTestCase):
print("Correct locking test gave object name: %s" % wf_ex.name)
self.assertEqual(str(number), wf_ex.name)
def _run_invalid_locking(self, wf_ex):
self._random_sleep()
with db_api.transaction():
# Load object into the session (transaction).
wf_ex = db_api.get_workflow_execution(wf_ex.id)
# It's too late to lock the object here because it's already
# been loaded into the session so there should be multiple
# threads that read the same object state so they write the
# same value into DB. As a result we won't get a result
# (object name) equal to a number of transactions.
db_api.acquire_lock(db_models.WorkflowExecution, wf_ex.id)
wf_ex.name = str(int(wf_ex.name) + 1)
return wf_ex.name
def test_invalid_locking(self):
wf_ex = db_api.create_workflow_execution(WF_EXEC)
threads = []
number = 500
for i in range(1, number):
threads.append(
eventlet.spawn(self._run_invalid_locking, wf_ex)
)
[t.wait() for t in threads]
[t.kill() for t in threads]
wf_ex = db_api.get_workflow_execution(wf_ex.id)
print("Invalid locking test gave object name: %s" % wf_ex.name)
self.assertNotEqual(str(number), wf_ex.name)

View File

@ -166,3 +166,6 @@ class EngineTestCase(base.DbTestCase):
def is_task_delayed(self, task_ex_id):
return self.is_task_in_state(task_ex_id, states.RUNNING_DELAYED)
def is_task_processed(self, task_ex_id):
return db_api.get_task_execution(task_ex_id).processed

View File

@ -303,6 +303,11 @@ class WorkflowResumeTest(base.EngineTestCase):
self.assertEqual(states.RUNNING, wf_ex.state)
# Wait for task3 to be processed.
task3_ex = self._assert_single_item(task_execs, name='task3')
self._await(lambda: self.is_task_success(task3_ex.id))
self._await(lambda: self.is_task_processed(task3_ex.id))
# Finish task2.
task2_action_ex = db_api.get_action_executions(
task_execution_id=task2_ex.id
@ -314,7 +319,7 @@ class WorkflowResumeTest(base.EngineTestCase):
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.assertEqual(states.SUCCESS, wf_ex.state)
self.assertEqual(states.SUCCESS, wf_ex.state, wf_ex.state_info)
self.assertEqual(4, len(wf_ex.task_executions))
@mock.patch.object(de.DefaultEngine, '_fail_workflow')
@ -335,7 +340,7 @@ class WorkflowResumeTest(base.EngineTestCase):
with mock.patch.object(
db_api,
'get_workflow_execution',
'acquire_lock',
side_effect=err):
self.assertRaises(

View File

@ -281,7 +281,21 @@ class TriggerServiceV2Test(base.DbTestCase):
datetime.datetime(2010, 8, 25)
)
eventlet.sleep(10)
# Wait until there are 'trigger_count' executions.
self._await(
lambda: self._wait_for_single_execution_with_multiple_processes(
trigger_count,
start_wf_mock
)
)
self.assertEqual(True, start_wf_mock.called)
# Wait some more and make sure there are no more than 'trigger_count'
# executions.
eventlet.sleep(5)
self.assertEqual(trigger_count, start_wf_mock.call_count)
def _wait_for_single_execution_with_multiple_processes(self, trigger_count,
start_wf_mock):
eventlet.sleep(1)
return trigger_count == start_wf_mock.call_count

View File

@ -0,0 +1,66 @@
---
fixes:
- |
    [`bug 1518012 <https://bugs.launchpad.net/mistral/+bug/1518012>`_]
    [`bug 1513456 <https://bugs.launchpad.net/mistral/+bug/1513456>`_]
Fix concurrency issues by using READ_COMMITTED
    This release note describes the following bugs:
    * #1513456 - task stuck in RUNNING state when all action executions are finished regarding the problem and the fix.
    * #1518012 - WF execution stays in RUNNING although task and action executions are in SUCCESS.
This fix does not require any action from Mistral users and
does not have any implications other than the bug fix.
The state of a workflow execution was not updated even when all task
executions were completed if some tasks finished at the same time as
other tasks.
Because we were using our connections with transaction isolation
level = REPEATABLE_READ - Each process was using a snapshot of the DB
created at the first read statement in that transaction.
When a task finished and evaluated the state of all the other tasks
it did not see the up-to-date state of those tasks - and so, because
not all tasks were completed - the task did not change the workflow
execution state.
    Similar behavior happened with multiple action executions under the same
    task. On completion, each action execution checked the status of the
    other action executions and did not see the up-to-date state of those
    action executions - causing the task execution to stay in the RUNNING
    state.
The solution is to change DB transaction isolation level from
REPEATABLE_READ to READ_COMMITTED so process A can see changes committed
in other transactions even if process A is in the middle of a transaction.
    A short explanation regarding the different isolation levels:
- |
    REPEATABLE_READ - while in a transaction, the first read operation to the
    DB creates a snapshot of the entire DB, so you are guaranteed that all the
    data in the DB will remain the same until the end of the transaction.
REPEATABLE_READ example:
* ConnectionA selects from tableA in a transaction.
* ConnectionB deletes all rows from tableB in a transaction.
* ConnectionB commits.
    * ConnectionA loops over the rows of tableA and fetches from tableB using the tableA_tableB_FK - ConnectionA will still get rows from tableB, because it reads from its snapshot and does not see ConnectionB's committed delete.
- |
READ_COMMITTED - while in a transaction, every query to the DB will get
the committed data.
READ_COMMITTED example:
    * ConnectionA starts a transaction.
    * ConnectionB starts a transaction.
    * ConnectionA inserts a row into tableA and commits.
    * ConnectionB inserts a row into tableA.
    * ConnectionB selects from tableA and gets two rows.
    * ConnectionB commits / rolls back.
    Two good articles about isolation levels are:
    * `Differences between READ-COMMITTED and REPEATABLE-READ transaction isolation levels <https://www.percona.com/blog/2012/08/28/differences-between-read-committed-and-repeatable-read-transaction-isolation-levels/>`_.
    * `MySQL performance implications of InnoDB isolation modes <https://www.percona.com/blog/2015/01/14/mysql-performance-implications-of-innodb-isolation-modes/>`_.