Merge "Calculate context for tasks with dependencies"
This commit is contained in:
commit
a9ec0c1c4d
@ -40,7 +40,7 @@ from mistral.workbook import tasks as wb_task
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
WORKFLOW_TRACE = logging.getLogger(cfg.CONF.workflow_trace_log_name)
|
||||
WF_TRACE = logging.getLogger(cfg.CONF.workflow_trace_log_name)
|
||||
|
||||
|
||||
def get_transport(transport=None):
|
||||
@ -85,8 +85,8 @@ class Engine(object):
|
||||
|
||||
context = copy.copy(context) if context else {}
|
||||
|
||||
WORKFLOW_TRACE.info("New execution started - [workbook_name = '%s', "
|
||||
"task_name = '%s']" % (workbook_name, task_name))
|
||||
WF_TRACE.info("New execution started - [workbook_name = '%s', "
|
||||
"task_name = '%s']" % (workbook_name, task_name))
|
||||
|
||||
db_api.start_tx()
|
||||
|
||||
@ -117,11 +117,12 @@ class Engine(object):
|
||||
# Update task with new context and params.
|
||||
executables = data_flow.prepare_tasks(tasks_to_start,
|
||||
context,
|
||||
workbook)
|
||||
workbook,
|
||||
tasks)
|
||||
|
||||
db_api.commit_tx()
|
||||
except Exception as e:
|
||||
msg = "Failed to create necessary DB objects: %s" % e
|
||||
msg = "Failed to start workflow execution: %s" % e
|
||||
LOG.exception(msg)
|
||||
raise exc.EngineException(msg)
|
||||
finally:
|
||||
@ -172,18 +173,17 @@ class Engine(object):
|
||||
result = kwargs.get('result')
|
||||
|
||||
db_api.start_tx()
|
||||
|
||||
try:
|
||||
# TODO(rakhmerov): validate state transition
|
||||
task = db_api.task_get(task_id)
|
||||
workbook = self._get_workbook(task['workbook_name'])
|
||||
|
||||
wf_trace_msg = "Task '%s' [%s -> %s" % \
|
||||
(task['name'], task['state'], state)
|
||||
|
||||
wf_trace_msg += ']' if state == states.ERROR \
|
||||
else ", result = %s]" % result
|
||||
WORKFLOW_TRACE.info(wf_trace_msg)
|
||||
if state == states.ERROR:
|
||||
WF_TRACE.info("Task '%s' [%s -> %s]" %
|
||||
(task['name'], task['state'], state))
|
||||
else:
|
||||
WF_TRACE.info("Task '%s' [%s -> %s, result = %s]" %
|
||||
(task['name'], task['state'], state, result))
|
||||
|
||||
action_name = wb_task.TaskSpec(task['task_spec'])\
|
||||
.get_full_action_name()
|
||||
@ -206,20 +206,43 @@ class Engine(object):
|
||||
task, context = self._update_task(workbook, task, state,
|
||||
task_output)
|
||||
|
||||
execution = db_api.execution_get(task['execution_id'])
|
||||
|
||||
self._create_next_tasks(task, workbook)
|
||||
|
||||
# At that point, sqlalchemy tries to flush the changes in task
|
||||
# to the db and, in some cases, hits sqlite database lock
|
||||
# established by another thread of convey_task_results executed
|
||||
# at the same time (for example, as a result of two std.echo
|
||||
# tasks started one after another within the same self._run_task
|
||||
# call). By separating the transaction into two, we creating a
|
||||
# window of opportunity for task changes to be flushed. The
|
||||
# possible ramifications are unclear at the moment and should be
|
||||
# a subject of further review.
|
||||
|
||||
# TODO(rakhmerov): review the possibility to use a single
|
||||
# transaction after switching to the db with better support of
|
||||
# concurrency.
|
||||
db_api.commit_tx()
|
||||
except Exception as e:
|
||||
msg = "Failed to save task result: %s" % e
|
||||
LOG.exception(msg)
|
||||
raise exc.EngineException(msg)
|
||||
finally:
|
||||
db_api.end_tx()
|
||||
|
||||
db_api.start_tx()
|
||||
try:
|
||||
execution = db_api.execution_get(task['execution_id'])
|
||||
|
||||
# Determine what tasks need to be started.
|
||||
tasks = db_api.tasks_get(execution_id=task['execution_id'])
|
||||
tasks = db_api.tasks_get(execution_id=execution['id'])
|
||||
|
||||
new_exec_state = self._determine_execution_state(execution, tasks)
|
||||
|
||||
if execution['state'] != new_exec_state:
|
||||
wf_trace_msg = \
|
||||
"Execution '%s' [%s -> %s]" % \
|
||||
WF_TRACE.info(
|
||||
"Execution '%s' [%s -> %s]" %
|
||||
(execution['id'], execution['state'], new_exec_state)
|
||||
WORKFLOW_TRACE.info(wf_trace_msg)
|
||||
)
|
||||
|
||||
execution = db_api.execution_update(execution['id'], {
|
||||
"state": new_exec_state
|
||||
@ -239,11 +262,12 @@ class Engine(object):
|
||||
# Update task with new context and params.
|
||||
executables = data_flow.prepare_tasks(tasks_to_start,
|
||||
context,
|
||||
workbook)
|
||||
workbook,
|
||||
tasks)
|
||||
|
||||
db_api.commit_tx()
|
||||
except Exception as e:
|
||||
msg = "Failed to create necessary DB objects: %s" % e
|
||||
msg = "Failed to queue next batch of tasks: %s" % e
|
||||
LOG.exception(msg)
|
||||
raise exc.EngineException(msg)
|
||||
finally:
|
||||
@ -325,7 +349,7 @@ class Engine(object):
|
||||
|
||||
@classmethod
|
||||
def _create_tasks(cls, task_list, workbook, workbook_name, execution_id):
|
||||
tasks = []
|
||||
tasks = {}
|
||||
|
||||
for task in task_list:
|
||||
state, task_runtime_context = retry.get_task_runtime(task)
|
||||
@ -333,7 +357,8 @@ class Engine(object):
|
||||
|
||||
db_task = db_api.task_create(execution_id, {
|
||||
"name": task.name,
|
||||
"requires": task.get_requires(),
|
||||
"requires": [tasks[name]['id'] for name
|
||||
in task.get_requires()],
|
||||
"task_spec": task.to_dict(),
|
||||
"action_spec": {} if not action_spec
|
||||
else action_spec.to_dict(),
|
||||
@ -343,9 +368,9 @@ class Engine(object):
|
||||
"workbook_name": workbook_name
|
||||
})
|
||||
|
||||
tasks.append(db_task)
|
||||
tasks[db_task['name']] = db_task
|
||||
|
||||
return tasks
|
||||
return tasks.values()
|
||||
|
||||
@classmethod
|
||||
def _get_workbook(cls, workbook_name):
|
||||
@ -409,14 +434,16 @@ class Engine(object):
|
||||
execution_id = task['execution_id']
|
||||
execution = db_api.execution_get(execution_id)
|
||||
|
||||
tasks = db_api.tasks_get(execution_id=execution_id)
|
||||
|
||||
# Change state from DELAYED to RUNNING.
|
||||
|
||||
WORKFLOW_TRACE.info("Task '%s' [%s -> %s]"
|
||||
% (task['name'],
|
||||
task['state'], states.RUNNING))
|
||||
WF_TRACE.info("Task '%s' [%s -> %s]" %
|
||||
(task['name'], task['state'], states.RUNNING))
|
||||
executables = data_flow.prepare_tasks([task],
|
||||
outbound_context,
|
||||
workbook)
|
||||
workbook,
|
||||
tasks)
|
||||
db_api.commit_tx()
|
||||
finally:
|
||||
db_api.end_tx()
|
||||
|
@ -60,12 +60,23 @@ def evaluate_task_parameters(task, context):
|
||||
return expr.evaluate_recursively(params, context)
|
||||
|
||||
|
||||
def prepare_tasks(tasks, context, workbook):
|
||||
def build_required_context(task, tasks):
|
||||
context = {}
|
||||
|
||||
for req_task in tasks:
|
||||
if req_task['id'] in task.get('requires', []):
|
||||
_merge_dicts(context, get_outbound_context(req_task))
|
||||
|
||||
return context
|
||||
|
||||
|
||||
def prepare_tasks(tasks_to_start, context, workbook, tasks):
|
||||
results = []
|
||||
|
||||
for task in tasks:
|
||||
# TODO(rakhmerov): Inbound context should be a merge of
|
||||
# outbound contexts of task dependencies, if any.
|
||||
for task in tasks_to_start:
|
||||
|
||||
context = _merge_dicts(context, build_required_context(task, tasks))
|
||||
|
||||
action_params = evaluate_task_parameters(task, context)
|
||||
|
||||
db_api.task_update(task['id'],
|
||||
|
@ -32,10 +32,9 @@ def find_workflow_tasks(workbook, task_name):
|
||||
|
||||
_update_dependencies(wb_tasks, full_graph)
|
||||
|
||||
graph = _get_subgraph(full_graph, task_name)
|
||||
tasks = []
|
||||
for node in graph:
|
||||
tasks.append(wb_tasks[node])
|
||||
# Find the list of the tasks in the order they supposed to be executed
|
||||
tasks = [wb_tasks[node] for node
|
||||
in traversal.dfs_postorder_nodes(full_graph.reverse(), task_name)]
|
||||
|
||||
return tasks
|
||||
|
||||
@ -50,7 +49,7 @@ def find_resolved_tasks(tasks):
|
||||
allows += [t['name']]
|
||||
allow_set = set(allows)
|
||||
for t in tasks:
|
||||
deps = t.get('requires', [])
|
||||
deps = t['task_spec'].get('requires', {}).keys()
|
||||
if len(set(deps) - allow_set) == 0:
|
||||
# all required tasks, if any, are SUCCESS
|
||||
if t['state'] == states.IDLE:
|
||||
@ -124,14 +123,6 @@ def is_error(tasks):
|
||||
return all(task['state'] == states.ERROR for task in tasks)
|
||||
|
||||
|
||||
def _get_subgraph(full_graph, task_name):
|
||||
nodes_set = traversal.dfs_predecessors(full_graph.reverse(),
|
||||
task_name).keys()
|
||||
nodes_set.append(task_name)
|
||||
|
||||
return full_graph.subgraph(nodes_set)
|
||||
|
||||
|
||||
def _get_dependency_tasks(tasks, task):
|
||||
if len(tasks[task].requires) < 1:
|
||||
return []
|
||||
|
@ -15,6 +15,7 @@
|
||||
# limitations under the License.
|
||||
|
||||
import abc
|
||||
import copy
|
||||
import re
|
||||
import six
|
||||
import yaql
|
||||
@ -128,6 +129,8 @@ def _evaluate_item(item, context):
|
||||
|
||||
|
||||
def evaluate_recursively(data, context):
|
||||
data = copy.copy(data)
|
||||
|
||||
if not context:
|
||||
return data
|
||||
|
||||
|
@ -23,4 +23,4 @@ Workflow:
|
||||
action: MyActions.concat
|
||||
parameters:
|
||||
left: Greetings
|
||||
right: {$.string}
|
||||
right: {$.string}
|
||||
|
@ -0,0 +1,62 @@
|
||||
Namespaces:
|
||||
MyService:
|
||||
# These ad-hoc actions based on std.echo have parameters only for test
|
||||
# purposes. In practice, it's more convenient just to use std.echo and
|
||||
# specify parameter 'output'.
|
||||
actions:
|
||||
concat:
|
||||
class: std.echo
|
||||
base-parameters:
|
||||
output: '{$.left} {$.right}'
|
||||
parameters:
|
||||
- left
|
||||
- right
|
||||
output:
|
||||
string: $
|
||||
|
||||
Workflow:
|
||||
# context = {
|
||||
# 'person': {
|
||||
# 'first_name': 'John',
|
||||
# 'last_name': 'Doe',
|
||||
# 'address': {
|
||||
# 'street': '124352 Broadway Street',
|
||||
# 'city': 'Gloomington',
|
||||
# 'country': 'USA'
|
||||
# }
|
||||
# }
|
||||
# }
|
||||
|
||||
tasks:
|
||||
build_full_name:
|
||||
action: MyService.concat
|
||||
parameters:
|
||||
left: $.person.first_name
|
||||
right: $.person.last_name
|
||||
publish:
|
||||
full_name: $.string
|
||||
|
||||
build_greeting:
|
||||
requires: [build_full_name]
|
||||
action: MyService.concat
|
||||
parameters:
|
||||
left: Dear
|
||||
right: $.full_name
|
||||
publish:
|
||||
greeting: $.string
|
||||
|
||||
build_address:
|
||||
requires: [build_full_name]
|
||||
action: MyService.concat
|
||||
parameters:
|
||||
left: To
|
||||
right: $.full_name
|
||||
publish:
|
||||
address: $.string
|
||||
|
||||
send_greeting:
|
||||
requires: [build_address, build_greeting]
|
||||
action: MyService.concat
|
||||
parameters:
|
||||
left: '{$.address}.'
|
||||
right: '{$.greeting},..'
|
29
mistral/tests/resources/test_order.yaml
Normal file
29
mistral/tests/resources/test_order.yaml
Normal file
@ -0,0 +1,29 @@
|
||||
Workflow:
|
||||
tasks:
|
||||
task:
|
||||
requires: [atask, pretask]
|
||||
action: std.echo
|
||||
parameters:
|
||||
output: some
|
||||
btask:
|
||||
requires: [ztask]
|
||||
action: std.echo
|
||||
parameters:
|
||||
output: some
|
||||
ztask:
|
||||
action: std.echo
|
||||
parameters:
|
||||
output: some
|
||||
atask:
|
||||
action: std.echo
|
||||
parameters:
|
||||
output: some
|
||||
ctask:
|
||||
action: std.echo
|
||||
parameters:
|
||||
output: some
|
||||
pretask:
|
||||
requires: [btask, ztask]
|
||||
action: std.echo
|
||||
parameters:
|
||||
output: some
|
@ -61,6 +61,13 @@ def create_workbook(definition_path):
|
||||
})
|
||||
|
||||
|
||||
def context_contains_required(task):
|
||||
requires = task.get('task_spec').get('requires')
|
||||
subcontexts = task.get('in_context').get('task', {})
|
||||
|
||||
return set(requires.keys()).issubset(set(subcontexts.keys()))
|
||||
|
||||
|
||||
@mock.patch.object(
|
||||
engine.EngineClient, 'start_workflow_execution',
|
||||
mock.MagicMock(side_effect=base.EngineTestCase.mock_start_workflow))
|
||||
@ -250,6 +257,41 @@ class DataFlowTest(base.EngineTestCase):
|
||||
},
|
||||
send_greeting_task['output'])
|
||||
|
||||
def test_task_with_diamond_dependencies(self):
|
||||
CTX = copy.copy(CONTEXT)
|
||||
|
||||
wb = create_workbook('data_flow/task_with_diamond_dependencies.yaml')
|
||||
|
||||
execution = self.engine.start_workflow_execution(wb['name'],
|
||||
'send_greeting',
|
||||
CTX)
|
||||
|
||||
# We have to reread execution to get its latest version.
|
||||
execution = db_api.execution_get(execution['id'])
|
||||
|
||||
self.assertEqual(states.SUCCESS, execution['state'])
|
||||
self.assertDictEqual(CTX, execution['context'])
|
||||
|
||||
tasks = db_api.tasks_get(workbook_name=wb['name'],
|
||||
execution_id=execution['id'])
|
||||
|
||||
self.assertEqual(4, len(tasks))
|
||||
|
||||
results = {
|
||||
'build_full_name': ('full_name', 'John Doe'),
|
||||
'build_address': ('address', 'To John Doe'),
|
||||
'build_greeting': ('greeting', 'Dear John Doe'),
|
||||
'send_greeting': ('task',
|
||||
{'send_greeting':
|
||||
{'string': 'To John Doe. Dear John Doe,..'}})
|
||||
}
|
||||
|
||||
for task in tasks:
|
||||
self.assertTrue(context_contains_required(task),
|
||||
"Task context is incomplete: %s" % task['name'])
|
||||
key, value = results[task['name']]
|
||||
self.assertEqual(value, task['output'][key])
|
||||
|
||||
def test_two_subsequent_tasks(self):
|
||||
CTX = copy.copy(CONTEXT)
|
||||
|
||||
|
@ -96,7 +96,7 @@ class DataFlowModuleTest(base.DbTestCase):
|
||||
db_api.task_create(EXEC_ID, TASK2.copy())
|
||||
]
|
||||
|
||||
executables = data_flow.prepare_tasks(tasks, CONTEXT, wb)
|
||||
executables = data_flow.prepare_tasks(tasks, CONTEXT, wb, tasks)
|
||||
|
||||
self.assertEqual(2, len(executables))
|
||||
|
||||
|
@ -21,19 +21,27 @@ from mistral.tests import base
|
||||
|
||||
TASKS = [
|
||||
{
|
||||
'requires': {},
|
||||
'name': 'backup-vms',
|
||||
'state': states.IDLE
|
||||
'state': states.IDLE,
|
||||
'task_spec': {
|
||||
'requires': {}
|
||||
}
|
||||
},
|
||||
{
|
||||
'requires': {},
|
||||
'name': 'create-vms',
|
||||
'state': states.SUCCESS
|
||||
'state': states.SUCCESS,
|
||||
'task_spec': {
|
||||
'requires': {}
|
||||
}
|
||||
},
|
||||
{
|
||||
'requires': ['create-vms'],
|
||||
'name': 'attach-volume',
|
||||
'state': states.IDLE
|
||||
'state': states.IDLE,
|
||||
'task_spec': {
|
||||
'requires': {
|
||||
'create-vms': ''
|
||||
}
|
||||
}
|
||||
}
|
||||
]
|
||||
|
||||
@ -41,16 +49,33 @@ TASKS = [
|
||||
class WorkflowTest(base.DbTestCase):
|
||||
def setUp(self):
|
||||
super(WorkflowTest, self).setUp()
|
||||
self.parser = parser.get_workbook(base.get_resource("test_rest.yaml"))
|
||||
|
||||
def test_find_workflow_tasks(self):
|
||||
tasks = workflow.find_workflow_tasks(self.parser, "attach-volumes")
|
||||
tasks = workflow.find_workflow_tasks(
|
||||
parser.get_workbook(base.get_resource("test_rest.yaml")),
|
||||
"attach-volumes"
|
||||
)
|
||||
|
||||
self.assertEqual(2, len(tasks))
|
||||
|
||||
self._assert_single_item(tasks, name='create-vms')
|
||||
self._assert_single_item(tasks, name='attach-volumes')
|
||||
|
||||
def test_find_workflow_tasks_order(self):
|
||||
tasks = workflow.find_workflow_tasks(
|
||||
parser.get_workbook(base.get_resource("test_order.yaml")),
|
||||
'task'
|
||||
)
|
||||
|
||||
self.assertEqual(5, len(tasks))
|
||||
|
||||
completed = set()
|
||||
|
||||
for i, task in enumerate(tasks):
|
||||
self.assertTrue(set(task.requires.keys()).issubset(completed),
|
||||
"Task %s isn't completed yet" % task.name)
|
||||
completed.add(task.name)
|
||||
|
||||
def test_tasks_to_start(self):
|
||||
tasks_to_start = workflow.find_resolved_tasks(TASKS)
|
||||
self.assertEqual(len(tasks_to_start), 2)
|
||||
|
Loading…
Reference in New Issue
Block a user