Calculate context for tasks with dependencies

Also:
 - fixed the order tree is traversed in
 - fixed `evaluate_recursively` variable leak
 - added couple more tests for uncovered cases
 - fixed premature db session flush

I had to split convey_task_result's transaction into two to give some
room for concurrency. As far as I can tell, it should not affect the
process since all the crucial data will be reread once again in the
second transaction. Anyway, this is a temporary measure and when we
switch to MySQL, we should review it once again.

Closes-bug: #1339614
Change-Id: I9246931749f13df157d474cf75755462f7336bc7
This commit is contained in:
Kirill Izotov 2014-07-14 17:59:56 +07:00
parent 5566129d64
commit 3453371f60
10 changed files with 245 additions and 55 deletions

View File

@ -40,7 +40,7 @@ from mistral.workbook import tasks as wb_task
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
WORKFLOW_TRACE = logging.getLogger(cfg.CONF.workflow_trace_log_name) WF_TRACE = logging.getLogger(cfg.CONF.workflow_trace_log_name)
def get_transport(transport=None): def get_transport(transport=None):
@ -85,8 +85,8 @@ class Engine(object):
context = copy.copy(context) if context else {} context = copy.copy(context) if context else {}
WORKFLOW_TRACE.info("New execution started - [workbook_name = '%s', " WF_TRACE.info("New execution started - [workbook_name = '%s', "
"task_name = '%s']" % (workbook_name, task_name)) "task_name = '%s']" % (workbook_name, task_name))
db_api.start_tx() db_api.start_tx()
@ -117,11 +117,12 @@ class Engine(object):
# Update task with new context and params. # Update task with new context and params.
executables = data_flow.prepare_tasks(tasks_to_start, executables = data_flow.prepare_tasks(tasks_to_start,
context, context,
workbook) workbook,
tasks)
db_api.commit_tx() db_api.commit_tx()
except Exception as e: except Exception as e:
msg = "Failed to create necessary DB objects: %s" % e msg = "Failed to start workflow execution: %s" % e
LOG.exception(msg) LOG.exception(msg)
raise exc.EngineException(msg) raise exc.EngineException(msg)
finally: finally:
@ -172,18 +173,17 @@ class Engine(object):
result = kwargs.get('result') result = kwargs.get('result')
db_api.start_tx() db_api.start_tx()
try: try:
# TODO(rakhmerov): validate state transition # TODO(rakhmerov): validate state transition
task = db_api.task_get(task_id) task = db_api.task_get(task_id)
workbook = self._get_workbook(task['workbook_name']) workbook = self._get_workbook(task['workbook_name'])
wf_trace_msg = "Task '%s' [%s -> %s" % \ if state == states.ERROR:
(task['name'], task['state'], state) WF_TRACE.info("Task '%s' [%s -> %s]" %
(task['name'], task['state'], state))
wf_trace_msg += ']' if state == states.ERROR \ else:
else ", result = %s]" % result WF_TRACE.info("Task '%s' [%s -> %s, result = %s]" %
WORKFLOW_TRACE.info(wf_trace_msg) (task['name'], task['state'], state, result))
action_name = wb_task.TaskSpec(task['task_spec'])\ action_name = wb_task.TaskSpec(task['task_spec'])\
.get_full_action_name() .get_full_action_name()
@ -206,20 +206,43 @@ class Engine(object):
task, context = self._update_task(workbook, task, state, task, context = self._update_task(workbook, task, state,
task_output) task_output)
execution = db_api.execution_get(task['execution_id'])
self._create_next_tasks(task, workbook) self._create_next_tasks(task, workbook)
# At that point, sqlalchemy tries to flush the changes in task
# to the db and, in some cases, hits sqlite database lock
# established by another thread of convey_task_results executed
# at the same time (for example, as a result of two std.echo
# tasks started one after another within the same self._run_task
# call). By separating the transaction into two, we creating a
# window of opportunity for task changes to be flushed. The
# possible ramifications are unclear at the moment and should be
# a subject of further review.
# TODO(rakhmerov): review the possibility to use a single
# transaction after switching to the db with better support of
# concurrency.
db_api.commit_tx()
except Exception as e:
msg = "Failed to save task result: %s" % e
LOG.exception(msg)
raise exc.EngineException(msg)
finally:
db_api.end_tx()
db_api.start_tx()
try:
execution = db_api.execution_get(task['execution_id'])
# Determine what tasks need to be started. # Determine what tasks need to be started.
tasks = db_api.tasks_get(execution_id=task['execution_id']) tasks = db_api.tasks_get(execution_id=execution['id'])
new_exec_state = self._determine_execution_state(execution, tasks) new_exec_state = self._determine_execution_state(execution, tasks)
if execution['state'] != new_exec_state: if execution['state'] != new_exec_state:
wf_trace_msg = \ WF_TRACE.info(
"Execution '%s' [%s -> %s]" % \ "Execution '%s' [%s -> %s]" %
(execution['id'], execution['state'], new_exec_state) (execution['id'], execution['state'], new_exec_state)
WORKFLOW_TRACE.info(wf_trace_msg) )
execution = db_api.execution_update(execution['id'], { execution = db_api.execution_update(execution['id'], {
"state": new_exec_state "state": new_exec_state
@ -239,11 +262,12 @@ class Engine(object):
# Update task with new context and params. # Update task with new context and params.
executables = data_flow.prepare_tasks(tasks_to_start, executables = data_flow.prepare_tasks(tasks_to_start,
context, context,
workbook) workbook,
tasks)
db_api.commit_tx() db_api.commit_tx()
except Exception as e: except Exception as e:
msg = "Failed to create necessary DB objects: %s" % e msg = "Failed to queue next batch of tasks: %s" % e
LOG.exception(msg) LOG.exception(msg)
raise exc.EngineException(msg) raise exc.EngineException(msg)
finally: finally:
@ -325,7 +349,7 @@ class Engine(object):
@classmethod @classmethod
def _create_tasks(cls, task_list, workbook, workbook_name, execution_id): def _create_tasks(cls, task_list, workbook, workbook_name, execution_id):
tasks = [] tasks = {}
for task in task_list: for task in task_list:
state, task_runtime_context = retry.get_task_runtime(task) state, task_runtime_context = retry.get_task_runtime(task)
@ -333,7 +357,8 @@ class Engine(object):
db_task = db_api.task_create(execution_id, { db_task = db_api.task_create(execution_id, {
"name": task.name, "name": task.name,
"requires": task.get_requires(), "requires": [tasks[name]['id'] for name
in task.get_requires()],
"task_spec": task.to_dict(), "task_spec": task.to_dict(),
"action_spec": {} if not action_spec "action_spec": {} if not action_spec
else action_spec.to_dict(), else action_spec.to_dict(),
@ -343,9 +368,9 @@ class Engine(object):
"workbook_name": workbook_name "workbook_name": workbook_name
}) })
tasks.append(db_task) tasks[db_task['name']] = db_task
return tasks return tasks.values()
@classmethod @classmethod
def _get_workbook(cls, workbook_name): def _get_workbook(cls, workbook_name):
@ -409,14 +434,16 @@ class Engine(object):
execution_id = task['execution_id'] execution_id = task['execution_id']
execution = db_api.execution_get(execution_id) execution = db_api.execution_get(execution_id)
tasks = db_api.tasks_get(execution_id=execution_id)
# Change state from DELAYED to RUNNING. # Change state from DELAYED to RUNNING.
WORKFLOW_TRACE.info("Task '%s' [%s -> %s]" WF_TRACE.info("Task '%s' [%s -> %s]" %
% (task['name'], (task['name'], task['state'], states.RUNNING))
task['state'], states.RUNNING))
executables = data_flow.prepare_tasks([task], executables = data_flow.prepare_tasks([task],
outbound_context, outbound_context,
workbook) workbook,
tasks)
db_api.commit_tx() db_api.commit_tx()
finally: finally:
db_api.end_tx() db_api.end_tx()

View File

@ -60,12 +60,23 @@ def evaluate_task_parameters(task, context):
return expr.evaluate_recursively(params, context) return expr.evaluate_recursively(params, context)
def prepare_tasks(tasks, context, workbook): def build_required_context(task, tasks):
context = {}
for req_task in tasks:
if req_task['id'] in task.get('requires', []):
_merge_dicts(context, get_outbound_context(req_task))
return context
def prepare_tasks(tasks_to_start, context, workbook, tasks):
results = [] results = []
for task in tasks: for task in tasks_to_start:
# TODO(rakhmerov): Inbound context should be a merge of
# outbound contexts of task dependencies, if any. context = _merge_dicts(context, build_required_context(task, tasks))
action_params = evaluate_task_parameters(task, context) action_params = evaluate_task_parameters(task, context)
db_api.task_update(task['id'], db_api.task_update(task['id'],

View File

@ -32,10 +32,9 @@ def find_workflow_tasks(workbook, task_name):
_update_dependencies(wb_tasks, full_graph) _update_dependencies(wb_tasks, full_graph)
graph = _get_subgraph(full_graph, task_name) # Find the list of the tasks in the order they supposed to be executed
tasks = [] tasks = [wb_tasks[node] for node
for node in graph: in traversal.dfs_postorder_nodes(full_graph.reverse(), task_name)]
tasks.append(wb_tasks[node])
return tasks return tasks
@ -50,7 +49,7 @@ def find_resolved_tasks(tasks):
allows += [t['name']] allows += [t['name']]
allow_set = set(allows) allow_set = set(allows)
for t in tasks: for t in tasks:
deps = t.get('requires', []) deps = t['task_spec'].get('requires', {}).keys()
if len(set(deps) - allow_set) == 0: if len(set(deps) - allow_set) == 0:
# all required tasks, if any, are SUCCESS # all required tasks, if any, are SUCCESS
if t['state'] == states.IDLE: if t['state'] == states.IDLE:
@ -124,14 +123,6 @@ def is_error(tasks):
return all(task['state'] == states.ERROR for task in tasks) return all(task['state'] == states.ERROR for task in tasks)
def _get_subgraph(full_graph, task_name):
nodes_set = traversal.dfs_predecessors(full_graph.reverse(),
task_name).keys()
nodes_set.append(task_name)
return full_graph.subgraph(nodes_set)
def _get_dependency_tasks(tasks, task): def _get_dependency_tasks(tasks, task):
if len(tasks[task].requires) < 1: if len(tasks[task].requires) < 1:
return [] return []

View File

@ -15,6 +15,7 @@
# limitations under the License. # limitations under the License.
import abc import abc
import copy
import re import re
import six import six
import yaql import yaql
@ -128,6 +129,8 @@ def _evaluate_item(item, context):
def evaluate_recursively(data, context): def evaluate_recursively(data, context):
data = copy.copy(data)
if not context: if not context:
return data return data

View File

@ -23,4 +23,4 @@ Workflow:
action: MyActions.concat action: MyActions.concat
parameters: parameters:
left: Greetings left: Greetings
right: {$.string} right: {$.string}

View File

@ -0,0 +1,62 @@
Namespaces:
MyService:
# These ad-hoc actions based on std.echo have parameters only for test
# purposes. In practice, it's more convenient just to use std.echo and
# specify parameter 'output'.
actions:
concat:
class: std.echo
base-parameters:
output: '{$.left} {$.right}'
parameters:
- left
- right
output:
string: $
Workflow:
# context = {
# 'person': {
# 'first_name': 'John',
# 'last_name': 'Doe',
# 'address': {
# 'street': '124352 Broadway Street',
# 'city': 'Gloomington',
# 'country': 'USA'
# }
# }
# }
tasks:
build_full_name:
action: MyService.concat
parameters:
left: $.person.first_name
right: $.person.last_name
publish:
full_name: $.string
build_greeting:
requires: [build_full_name]
action: MyService.concat
parameters:
left: Dear
right: $.full_name
publish:
greeting: $.string
build_address:
requires: [build_full_name]
action: MyService.concat
parameters:
left: To
right: $.full_name
publish:
address: $.string
send_greeting:
requires: [build_address, build_greeting]
action: MyService.concat
parameters:
left: '{$.address}.'
right: '{$.greeting},..'

View File

@ -0,0 +1,29 @@
Workflow:
tasks:
task:
requires: [atask, pretask]
action: std.echo
parameters:
output: some
btask:
requires: [ztask]
action: std.echo
parameters:
output: some
ztask:
action: std.echo
parameters:
output: some
atask:
action: std.echo
parameters:
output: some
ctask:
action: std.echo
parameters:
output: some
pretask:
requires: [btask, ztask]
action: std.echo
parameters:
output: some

View File

@ -61,6 +61,13 @@ def create_workbook(definition_path):
}) })
def context_contains_required(task):
requires = task.get('task_spec').get('requires')
subcontexts = task.get('in_context').get('task', {})
return set(requires.keys()).issubset(set(subcontexts.keys()))
@mock.patch.object( @mock.patch.object(
engine.EngineClient, 'start_workflow_execution', engine.EngineClient, 'start_workflow_execution',
mock.MagicMock(side_effect=base.EngineTestCase.mock_start_workflow)) mock.MagicMock(side_effect=base.EngineTestCase.mock_start_workflow))
@ -250,6 +257,41 @@ class DataFlowTest(base.EngineTestCase):
}, },
send_greeting_task['output']) send_greeting_task['output'])
def test_task_with_diamond_dependencies(self):
CTX = copy.copy(CONTEXT)
wb = create_workbook('data_flow/task_with_diamond_dependencies.yaml')
execution = self.engine.start_workflow_execution(wb['name'],
'send_greeting',
CTX)
# We have to reread execution to get its latest version.
execution = db_api.execution_get(execution['id'])
self.assertEqual(states.SUCCESS, execution['state'])
self.assertDictEqual(CTX, execution['context'])
tasks = db_api.tasks_get(workbook_name=wb['name'],
execution_id=execution['id'])
self.assertEqual(4, len(tasks))
results = {
'build_full_name': ('full_name', 'John Doe'),
'build_address': ('address', 'To John Doe'),
'build_greeting': ('greeting', 'Dear John Doe'),
'send_greeting': ('task',
{'send_greeting':
{'string': 'To John Doe. Dear John Doe,..'}})
}
for task in tasks:
self.assertTrue(context_contains_required(task),
"Task context is incomplete: %s" % task['name'])
key, value = results[task['name']]
self.assertEqual(value, task['output'][key])
def test_two_subsequent_tasks(self): def test_two_subsequent_tasks(self):
CTX = copy.copy(CONTEXT) CTX = copy.copy(CONTEXT)

View File

@ -96,7 +96,7 @@ class DataFlowModuleTest(base.DbTestCase):
db_api.task_create(EXEC_ID, TASK2.copy()) db_api.task_create(EXEC_ID, TASK2.copy())
] ]
executables = data_flow.prepare_tasks(tasks, CONTEXT, wb) executables = data_flow.prepare_tasks(tasks, CONTEXT, wb, tasks)
self.assertEqual(2, len(executables)) self.assertEqual(2, len(executables))

View File

@ -21,19 +21,27 @@ from mistral.tests import base
TASKS = [ TASKS = [
{ {
'requires': {},
'name': 'backup-vms', 'name': 'backup-vms',
'state': states.IDLE 'state': states.IDLE,
'task_spec': {
'requires': {}
}
}, },
{ {
'requires': {},
'name': 'create-vms', 'name': 'create-vms',
'state': states.SUCCESS 'state': states.SUCCESS,
'task_spec': {
'requires': {}
}
}, },
{ {
'requires': ['create-vms'],
'name': 'attach-volume', 'name': 'attach-volume',
'state': states.IDLE 'state': states.IDLE,
'task_spec': {
'requires': {
'create-vms': ''
}
}
} }
] ]
@ -41,16 +49,33 @@ TASKS = [
class WorkflowTest(base.DbTestCase): class WorkflowTest(base.DbTestCase):
def setUp(self): def setUp(self):
super(WorkflowTest, self).setUp() super(WorkflowTest, self).setUp()
self.parser = parser.get_workbook(base.get_resource("test_rest.yaml"))
def test_find_workflow_tasks(self): def test_find_workflow_tasks(self):
tasks = workflow.find_workflow_tasks(self.parser, "attach-volumes") tasks = workflow.find_workflow_tasks(
parser.get_workbook(base.get_resource("test_rest.yaml")),
"attach-volumes"
)
self.assertEqual(2, len(tasks)) self.assertEqual(2, len(tasks))
self._assert_single_item(tasks, name='create-vms') self._assert_single_item(tasks, name='create-vms')
self._assert_single_item(tasks, name='attach-volumes') self._assert_single_item(tasks, name='attach-volumes')
def test_find_workflow_tasks_order(self):
tasks = workflow.find_workflow_tasks(
parser.get_workbook(base.get_resource("test_order.yaml")),
'task'
)
self.assertEqual(5, len(tasks))
completed = set()
for i, task in enumerate(tasks):
self.assertTrue(set(task.requires.keys()).issubset(completed),
"Task %s isn't completed yet" % task.name)
completed.add(task.name)
def test_tasks_to_start(self): def test_tasks_to_start(self):
tasks_to_start = workflow.find_resolved_tasks(TASKS) tasks_to_start = workflow.find_resolved_tasks(TASKS)
self.assertEqual(len(tasks_to_start), 2) self.assertEqual(len(tasks_to_start), 2)