From 3453371f608fbb4f88bc87357caa8686b47a8333 Mon Sep 17 00:00:00 2001
From: Kirill Izotov
Date: Mon, 14 Jul 2014 17:59:56 +0700
Subject: [PATCH] Calculate context for tasks with dependencies

Also:
- fixed the order the tree is traversed in
- fixed the `evaluate_recursively` variable leak
- added a couple more tests for uncovered cases
- fixed a premature db session flush

I had to split convey_task_result's transaction into two to give some
room for concurrency. As far as I can tell, it should not affect the
process, since all the crucial data is reread in the second
transaction. Anyway, this is a temporary measure; when we switch to
MySQL, we should review it once again.

Closes-bug: #1339614
Change-Id: I9246931749f13df157d474cf75755462f7336bc7
---
 mistral/engine/__init__.py                    | 83 ++++++++++++-------
 mistral/engine/data_flow.py                   | 19 ++++-
 mistral/engine/workflow.py                    | 17 +---
 mistral/expressions.py                        |  3 +
 .../resources/control_flow/require_flow.yaml  |  2 +-
 .../task_with_diamond_dependencies.yaml       | 62 ++++++++++++++
 mistral/tests/resources/test_order.yaml       | 29 +++++++
 mistral/tests/unit/engine/test_data_flow.py   | 42 ++++++++++
 .../unit/engine/test_data_flow_module.py      |  2 +-
 mistral/tests/unit/engine/test_workflow.py    | 41 +++++++--
 10 files changed, 245 insertions(+), 55 deletions(-)
 create mode 100644 mistral/tests/resources/data_flow/task_with_diamond_dependencies.yaml
 create mode 100644 mistral/tests/resources/test_order.yaml
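The split described above reduces to the following shape (a minimal sketch
of the idea, not the actual engine code; start_tx/commit_tx/end_tx are the
db_api calls already used throughout this patch):

    # Sketch: one transaction per phase instead of a single long one.
    db_api.start_tx()
    try:
        # Phase 1: persist the task state/result so the flush happens
        # now and the sqlite lock is released early.
        pass  # ... update task, create next task records ...
        db_api.commit_tx()
    finally:
        db_api.end_tx()

    db_api.start_tx()
    try:
        # Phase 2: re-read the execution and its tasks, recompute the
        # execution state and prepare the next batch of tasks.
        pass  # ... determine execution state, prepare_tasks(...) ...
        db_api.commit_tx()
    finally:
        db_api.end_tx()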
diff --git a/mistral/engine/__init__.py b/mistral/engine/__init__.py
index 942971d51..6d22dbbc6 100644
--- a/mistral/engine/__init__.py
+++ b/mistral/engine/__init__.py
@@ -40,7 +40,7 @@ from mistral.workbook import tasks as wb_task
 
 LOG = logging.getLogger(__name__)
 
-WORKFLOW_TRACE = logging.getLogger(cfg.CONF.workflow_trace_log_name)
+WF_TRACE = logging.getLogger(cfg.CONF.workflow_trace_log_name)
 
 
 def get_transport(transport=None):
@@ -85,8 +85,8 @@ class Engine(object):
 
         context = copy.copy(context) if context else {}
 
-        WORKFLOW_TRACE.info("New execution started - [workbook_name = '%s', "
-                            "task_name = '%s']" % (workbook_name, task_name))
+        WF_TRACE.info("New execution started - [workbook_name = '%s', "
+                      "task_name = '%s']" % (workbook_name, task_name))
 
         db_api.start_tx()
 
@@ -117,11 +117,12 @@ class Engine(object):
             # Update task with new context and params.
             executables = data_flow.prepare_tasks(tasks_to_start,
                                                   context,
-                                                  workbook)
+                                                  workbook,
+                                                  tasks)
 
             db_api.commit_tx()
         except Exception as e:
-            msg = "Failed to create necessary DB objects: %s" % e
+            msg = "Failed to start workflow execution: %s" % e
             LOG.exception(msg)
             raise exc.EngineException(msg)
         finally:
@@ -172,18 +173,17 @@ class Engine(object):
         result = kwargs.get('result')
 
         db_api.start_tx()
-
         try:
             # TODO(rakhmerov): validate state transition
             task = db_api.task_get(task_id)
             workbook = self._get_workbook(task['workbook_name'])
 
-            wf_trace_msg = "Task '%s' [%s -> %s" % \
-                (task['name'], task['state'], state)
-
-            wf_trace_msg += ']' if state == states.ERROR \
-                else ", result = %s]" % result
-            WORKFLOW_TRACE.info(wf_trace_msg)
+            if state == states.ERROR:
+                WF_TRACE.info("Task '%s' [%s -> %s]" %
+                              (task['name'], task['state'], state))
+            else:
+                WF_TRACE.info("Task '%s' [%s -> %s, result = %s]" %
+                              (task['name'], task['state'], state, result))
 
             action_name = wb_task.TaskSpec(task['task_spec'])\
                 .get_full_action_name()
@@ -206,20 +206,43 @@ class Engine(object):
             task, context = self._update_task(workbook, task, state,
                                               task_output)
 
-            execution = db_api.execution_get(task['execution_id'])
-
             self._create_next_tasks(task, workbook)
 
+            # At this point, sqlalchemy tries to flush the changes in task
+            # to the db and, in some cases, hits an sqlite database lock
+            # established by another thread of convey_task_result executed
+            # at the same time (for example, as a result of two std.echo
+            # tasks started one after another within the same
+            # self._run_task call). By separating the transaction into two,
+            # we create a window of opportunity for task changes to be
+            # flushed. The possible ramifications are unclear at the moment
+            # and should be a subject of further review.
+
+            # TODO(rakhmerov): review the possibility to use a single
+            # transaction after switching to a db with better support of
+            # concurrency.
+            db_api.commit_tx()
+        except Exception as e:
+            msg = "Failed to save task result: %s" % e
+            LOG.exception(msg)
+            raise exc.EngineException(msg)
+        finally:
+            db_api.end_tx()
+
+        db_api.start_tx()
+        try:
+            execution = db_api.execution_get(task['execution_id'])
+
             # Determine what tasks need to be started.
-            tasks = db_api.tasks_get(execution_id=task['execution_id'])
+            tasks = db_api.tasks_get(execution_id=execution['id'])
 
             new_exec_state = self._determine_execution_state(execution, tasks)
 
             if execution['state'] != new_exec_state:
-                wf_trace_msg = \
-                    "Execution '%s' [%s -> %s]" % \
+                WF_TRACE.info(
+                    "Execution '%s' [%s -> %s]" %
                     (execution['id'], execution['state'], new_exec_state)
-                WORKFLOW_TRACE.info(wf_trace_msg)
+                )
 
                 execution = db_api.execution_update(execution['id'], {
                     "state": new_exec_state
@@ -239,11 +262,12 @@ class Engine(object):
             # Update task with new context and params.
             executables = data_flow.prepare_tasks(tasks_to_start,
                                                   context,
-                                                  workbook)
+                                                  workbook,
+                                                  tasks)
 
             db_api.commit_tx()
         except Exception as e:
-            msg = "Failed to create necessary DB objects: %s" % e
+            msg = "Failed to queue next batch of tasks: %s" % e
             LOG.exception(msg)
             raise exc.EngineException(msg)
         finally:
@@ -325,7 +349,7 @@ class Engine(object):
 
     @classmethod
     def _create_tasks(cls, task_list, workbook, workbook_name, execution_id):
-        tasks = []
+        tasks = {}
 
         for task in task_list:
             state, task_runtime_context = retry.get_task_runtime(task)
@@ -333,7 +357,8 @@ class Engine(object):
 
             db_task = db_api.task_create(execution_id, {
                 "name": task.name,
-                "requires": task.get_requires(),
+                "requires": [tasks[name]['id'] for name
+                             in task.get_requires()],
                 "task_spec": task.to_dict(),
                 "action_spec": {} if not action_spec
                 else action_spec.to_dict(),
@@ -343,9 +368,9 @@ class Engine(object):
                 "workbook_name": workbook_name
             })
 
-            tasks.append(db_task)
+            tasks[db_task['name']] = db_task
 
-        return tasks
+        return tasks.values()
 
     @classmethod
     def _get_workbook(cls, workbook_name):
@@ -409,14 +434,16 @@ class Engine(object):
             execution_id = task['execution_id']
             execution = db_api.execution_get(execution_id)
 
+            tasks = db_api.tasks_get(execution_id=execution_id)
+
             # Change state from DELAYED to RUNNING.
-            WORKFLOW_TRACE.info("Task '%s' [%s -> %s]"
-                                % (task['name'],
-                                   task['state'], states.RUNNING))
+            WF_TRACE.info("Task '%s' [%s -> %s]" %
+                          (task['name'], task['state'], states.RUNNING))
 
             executables = data_flow.prepare_tasks([task],
                                                   outbound_context,
-                                                  workbook)
+                                                  workbook,
+                                                  tasks)
 
             db_api.commit_tx()
         finally:
             db_api.end_tx()
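Note that the _create_tasks change above stores created tasks in a dict
keyed by name and resolves 'requires' through it, which only works if every
dependency is created before its dependents; the traversal-order fix in
workflow.py (further down) is what guarantees that. A toy illustration with
made-up ids:

    # Toy model of the new id resolution in _create_tasks.
    created = {}  # task name -> created task record

    for spec in ({'name': 'a', 'requires': []},
                 {'name': 'b', 'requires': ['a']}):
        db_task = {
            'id': 'id-' + spec['name'],  # made-up id
            'name': spec['name'],
            # A KeyError here would mean a dependency was not created yet.
            'requires': [created[name]['id'] for name in spec['requires']],
        }
        created[db_task['name']] = db_task

    assert created['b']['requires'] == ['id-a']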
diff --git a/mistral/engine/data_flow.py b/mistral/engine/data_flow.py
index 90828f402..2b0d61fad 100644
--- a/mistral/engine/data_flow.py
+++ b/mistral/engine/data_flow.py
@@ -60,12 +60,23 @@ def evaluate_task_parameters(task, context):
     return expr.evaluate_recursively(params, context)
 
 
-def prepare_tasks(tasks, context, workbook):
+def build_required_context(task, tasks):
+    context = {}
+
+    for req_task in tasks:
+        if req_task['id'] in task.get('requires', []):
+            _merge_dicts(context, get_outbound_context(req_task))
+
+    return context
+
+
+def prepare_tasks(tasks_to_start, context, workbook, tasks):
     results = []
 
-    for task in tasks:
-        # TODO(rakhmerov): Inbound context should be a merge of
-        # outbound contexts of task dependencies, if any.
+    for task in tasks_to_start:
+
+        context = _merge_dicts(context, build_required_context(task, tasks))
+
         action_params = evaluate_task_parameters(task, context)
 
         db_api.task_update(task['id'],
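Reduced to plain dicts, build_required_context does the following;
_merge_dicts and get_outbound_context are replaced here by a shallow
dict.update and a precomputed 'outbound' field purely for the sake of the
demo:

    def build_required_context_demo(task, tasks):
        # Merge the outbound contexts of every task this one requires.
        context = {}
        for req_task in tasks:
            if req_task['id'] in task.get('requires', []):
                context.update(req_task['outbound'])
        return context

    tasks = [
        {'id': 1, 'requires': [], 'outbound': {'full_name': 'John Doe'}},
        {'id': 2, 'requires': [1], 'outbound': {'greeting': 'Dear John Doe'}},
        {'id': 3, 'requires': [1, 2], 'outbound': {}},
    ]

    assert build_required_context_demo(tasks[2], tasks) == {
        'full_name': 'John Doe',
        'greeting': 'Dear John Doe',
    }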
diff --git a/mistral/engine/workflow.py b/mistral/engine/workflow.py
index 51c0292b4..f69513fd5 100644
--- a/mistral/engine/workflow.py
+++ b/mistral/engine/workflow.py
@@ -32,10 +32,9 @@ def find_workflow_tasks(workbook, task_name):
 
     _update_dependencies(wb_tasks, full_graph)
 
-    graph = _get_subgraph(full_graph, task_name)
-    tasks = []
-    for node in graph:
-        tasks.append(wb_tasks[node])
+    # Find the list of the tasks in the order they are supposed to run.
+    tasks = [wb_tasks[node] for node
+             in traversal.dfs_postorder_nodes(full_graph.reverse(), task_name)]
 
     return tasks
 
@@ -50,7 +49,7 @@ def find_resolved_tasks(tasks):
             allows += [t['name']]
     allow_set = set(allows)
     for t in tasks:
-        deps = t.get('requires', [])
+        deps = t['task_spec'].get('requires', {}).keys()
         if len(set(deps) - allow_set) == 0:
             # all required tasks, if any, are SUCCESS
             if t['state'] == states.IDLE:
@@ -124,14 +123,6 @@ def is_error(tasks):
     return all(task['state'] == states.ERROR for task in tasks)
 
 
-def _get_subgraph(full_graph, task_name):
-    nodes_set = traversal.dfs_predecessors(full_graph.reverse(),
-                                           task_name).keys()
-    nodes_set.append(task_name)
-
-    return full_graph.subgraph(nodes_set)
-
-
 def _get_dependency_tasks(tasks, task):
     if len(tasks[task].requires) < 1:
         return []
diff --git a/mistral/expressions.py b/mistral/expressions.py
index 7bfb3fad3..5a712f3fc 100644
--- a/mistral/expressions.py
+++ b/mistral/expressions.py
@@ -15,6 +15,7 @@
 # limitations under the License.
 
 import abc
+import copy
 import re
 
 import six
 import yaql
@@ -128,6 +129,8 @@ def _evaluate_item(item, context):
 
 
 def evaluate_recursively(data, context):
+    data = copy.copy(data)
+
     if not context:
         return data
diff --git a/mistral/tests/resources/control_flow/require_flow.yaml b/mistral/tests/resources/control_flow/require_flow.yaml
index 5bdf7a27e..b71e2df63 100644
--- a/mistral/tests/resources/control_flow/require_flow.yaml
+++ b/mistral/tests/resources/control_flow/require_flow.yaml
@@ -23,4 +23,4 @@ Workflow:
       action: MyActions.concat
       parameters:
         left: Greetings
-        right: {$.string}
\ No newline at end of file
+        right: {$.string}
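Why reversed-graph DFS postorder in find_workflow_tasks yields a valid
execution order: postorder emits a node only after all of its descendants,
and in the reversed graph a task's descendants are exactly its transitive
dependencies. A small demo on the graph from the new test_order.yaml
fixture (added below); the edge direction, dependency to dependent, is
assumed to match what _update_dependencies builds:

    import networkx as nx
    from networkx.algorithms import traversal

    # An edge X -> Y means "Y requires X".
    g = nx.DiGraph()
    g.add_edge('atask', 'task')
    g.add_edge('pretask', 'task')
    g.add_edge('btask', 'pretask')
    g.add_edge('ztask', 'pretask')
    g.add_edge('ztask', 'btask')

    order = list(traversal.dfs_postorder_nodes(g.reverse(), 'task'))

    # Every task appears after all of its dependencies and the target is
    # last, e.g. ['atask', 'ztask', 'btask', 'pretask', 'task']. ctask is
    # not connected to 'task', so it is never visited, which is why the
    # new test expects 5 tasks rather than 6.
    assert order[-1] == 'task'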
diff --git a/mistral/tests/resources/data_flow/task_with_diamond_dependencies.yaml b/mistral/tests/resources/data_flow/task_with_diamond_dependencies.yaml
new file mode 100644
index 000000000..8f15a5367
--- /dev/null
+++ b/mistral/tests/resources/data_flow/task_with_diamond_dependencies.yaml
@@ -0,0 +1,62 @@
+Namespaces:
+  MyService:
+    # These ad-hoc actions, based on std.echo, take parameters only for
+    # test purposes. In practice, it's more convenient to just use
+    # std.echo and specify the 'output' parameter.
+    actions:
+      concat:
+        class: std.echo
+        base-parameters:
+          output: '{$.left} {$.right}'
+        parameters:
+          - left
+          - right
+        output:
+          string: $
+
+Workflow:
+  # context = {
+  #     'person': {
+  #         'first_name': 'John',
+  #         'last_name': 'Doe',
+  #         'address': {
+  #             'street': '124352 Broadway Street',
+  #             'city': 'Gloomington',
+  #             'country': 'USA'
+  #         }
+  #     }
+  # }
+
+  tasks:
+    build_full_name:
+      action: MyService.concat
+      parameters:
+        left: $.person.first_name
+        right: $.person.last_name
+      publish:
+        full_name: $.string
+
+    build_greeting:
+      requires: [build_full_name]
+      action: MyService.concat
+      parameters:
+        left: Dear
+        right: $.full_name
+      publish:
+        greeting: $.string
+
+    build_address:
+      requires: [build_full_name]
+      action: MyService.concat
+      parameters:
+        left: To
+        right: $.full_name
+      publish:
+        address: $.string
+
+    send_greeting:
+      requires: [build_address, build_greeting]
+      action: MyService.concat
+      parameters:
+        left: '{$.address}.'
+        right: '{$.greeting},..'
diff --git a/mistral/tests/resources/test_order.yaml b/mistral/tests/resources/test_order.yaml
new file mode 100644
index 000000000..86e19cb95
--- /dev/null
+++ b/mistral/tests/resources/test_order.yaml
@@ -0,0 +1,29 @@
+Workflow:
+  tasks:
+    task:
+      requires: [atask, pretask]
+      action: std.echo
+      parameters:
+        output: some
+    btask:
+      requires: [ztask]
+      action: std.echo
+      parameters:
+        output: some
+    ztask:
+      action: std.echo
+      parameters:
+        output: some
+    atask:
+      action: std.echo
+      parameters:
+        output: some
+    ctask:
+      action: std.echo
+      parameters:
+        output: some
+    pretask:
+      requires: [btask, ztask]
+      action: std.echo
+      parameters:
+        output: some
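For reference, hand-tracing the diamond fixture gives the exact strings
asserted by the new test further down (concat joins its left and right
parameters with a single space):

    # Hand trace of task_with_diamond_dependencies.yaml.
    full_name = ' '.join(['John', 'Doe'])                  # build_full_name
    greeting = ' '.join(['Dear', full_name])               # build_greeting
    address = ' '.join(['To', full_name])                  # build_address
    result = ' '.join([address + '.', greeting + ',..'])   # send_greeting

    assert result == 'To John Doe. Dear John Doe,..'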
diff --git a/mistral/tests/unit/engine/test_data_flow.py b/mistral/tests/unit/engine/test_data_flow.py
index 2045ebc0d..308a1750a 100644
--- a/mistral/tests/unit/engine/test_data_flow.py
+++ b/mistral/tests/unit/engine/test_data_flow.py
@@ -61,6 +61,13 @@ def create_workbook(definition_path):
     })
 
 
+def context_contains_required(task):
+    requires = task.get('task_spec').get('requires')
+    subcontexts = task.get('in_context').get('task', {})
+
+    return set(requires.keys()).issubset(set(subcontexts.keys()))
+
+
 @mock.patch.object(
     engine.EngineClient, 'start_workflow_execution',
     mock.MagicMock(side_effect=base.EngineTestCase.mock_start_workflow))
@@ -250,6 +257,41 @@ class DataFlowTest(base.EngineTestCase):
             },
             send_greeting_task['output'])
 
+    def test_task_with_diamond_dependencies(self):
+        CTX = copy.copy(CONTEXT)
+
+        wb = create_workbook('data_flow/task_with_diamond_dependencies.yaml')
+
+        execution = self.engine.start_workflow_execution(wb['name'],
+                                                         'send_greeting',
+                                                         CTX)
+
+        # We have to reread the execution to get its latest version.
+        execution = db_api.execution_get(execution['id'])
+
+        self.assertEqual(states.SUCCESS, execution['state'])
+        self.assertDictEqual(CTX, execution['context'])
+
+        tasks = db_api.tasks_get(workbook_name=wb['name'],
+                                 execution_id=execution['id'])
+
+        self.assertEqual(4, len(tasks))
+
+        results = {
+            'build_full_name': ('full_name', 'John Doe'),
+            'build_address': ('address', 'To John Doe'),
+            'build_greeting': ('greeting', 'Dear John Doe'),
+            'send_greeting': ('task',
+                              {'send_greeting':
+                               {'string': 'To John Doe. Dear John Doe,..'}})
+        }
+
+        for task in tasks:
+            self.assertTrue(context_contains_required(task),
+                            "Task context is incomplete: %s" % task['name'])
+            key, value = results[task['name']]
+            self.assertEqual(value, task['output'][key])
+
     def test_two_subsequent_tasks(self):
         CTX = copy.copy(CONTEXT)
diff --git a/mistral/tests/unit/engine/test_data_flow_module.py b/mistral/tests/unit/engine/test_data_flow_module.py
index 4c73e8b98..b1900e8e8 100644
--- a/mistral/tests/unit/engine/test_data_flow_module.py
+++ b/mistral/tests/unit/engine/test_data_flow_module.py
@@ -96,7 +96,7 @@ class DataFlowModuleTest(base.DbTestCase):
             db_api.task_create(EXEC_ID, TASK2.copy())
         ]
 
-        executables = data_flow.prepare_tasks(tasks, CONTEXT, wb)
+        executables = data_flow.prepare_tasks(tasks, CONTEXT, wb, tasks)
 
         self.assertEqual(2, len(executables))
diff --git a/mistral/tests/unit/engine/test_workflow.py b/mistral/tests/unit/engine/test_workflow.py
index cbe4a31a3..1f7c643c3 100644
--- a/mistral/tests/unit/engine/test_workflow.py
+++ b/mistral/tests/unit/engine/test_workflow.py
@@ -21,19 +21,27 @@ from mistral.tests import base
 
 TASKS = [
     {
-        'requires': {},
         'name': 'backup-vms',
-        'state': states.IDLE
+        'state': states.IDLE,
+        'task_spec': {
+            'requires': {}
+        }
     },
     {
-        'requires': {},
         'name': 'create-vms',
-        'state': states.SUCCESS
+        'state': states.SUCCESS,
+        'task_spec': {
+            'requires': {}
+        }
    },
    {
-        'requires': ['create-vms'],
         'name': 'attach-volume',
-        'state': states.IDLE
+        'state': states.IDLE,
+        'task_spec': {
+            'requires': {
+                'create-vms': ''
+            }
+        }
     }
 ]
 
@@ -41,16 +49,33 @@ class WorkflowTest(base.DbTestCase):
     def setUp(self):
         super(WorkflowTest, self).setUp()
 
-        self.parser = parser.get_workbook(base.get_resource("test_rest.yaml"))
-
     def test_find_workflow_tasks(self):
-        tasks = workflow.find_workflow_tasks(self.parser, "attach-volumes")
+        tasks = workflow.find_workflow_tasks(
+            parser.get_workbook(base.get_resource("test_rest.yaml")),
+            "attach-volumes"
+        )
 
         self.assertEqual(2, len(tasks))
 
         self._assert_single_item(tasks, name='create-vms')
         self._assert_single_item(tasks, name='attach-volumes')
 
+    def test_find_workflow_tasks_order(self):
+        tasks = workflow.find_workflow_tasks(
+            parser.get_workbook(base.get_resource("test_order.yaml")),
+            'task'
+        )
+
+        self.assertEqual(5, len(tasks))
+
+        completed = set()
+
+        for task in tasks:
+            self.assertTrue(set(task.requires.keys()).issubset(completed),
+                            "Task %s's dependencies are not completed yet"
+                            % task.name)
+            completed.add(task.name)
+
     def test_tasks_to_start(self):
         tasks_to_start = workflow.find_resolved_tasks(TASKS)
         self.assertEqual(len(tasks_to_start), 2)
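The one-line copy.copy in expressions.py is the 'variable leak' fix from
the commit message: without it, evaluate_recursively writes evaluated
values back into the caller's dict. A minimal reproduction with the YAQL
evaluation faked out:

    import copy

    def evaluate(data, context, leak=False):
        # Stand-in for evaluate_recursively; only shows the copy effect.
        if not leak:
            data = copy.copy(data)  # the fix: work on a shallow copy
        for key, value in data.items():
            if value == '$.name':
                data[key] = context['name']
        return data

    params = {'left': '$.name'}
    evaluate(params, {'name': 'John'}, leak=True)
    assert params == {'left': 'John'}    # old behaviour: caller's dict mutated

    params = {'left': '$.name'}
    evaluate(params, {'name': 'John'})
    assert params == {'left': '$.name'}  # fixed: caller's dict intact

copy.copy is shallow, so nested containers are still shared; that appears
to be acceptable here because each recursive call makes its own copy of
the level it is about to modify.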