diff --git a/mistral/api/controllers/v2/task.py b/mistral/api/controllers/v2/task.py index 8fdf5b98..527a9290 100644 --- a/mistral/api/controllers/v2/task.py +++ b/mistral/api/controllers/v2/task.py @@ -49,7 +49,6 @@ class Task(resource.Resource): result = wtypes.text input = wtypes.text - output = wtypes.text created_at = wtypes.text updated_at = wtypes.text @@ -62,7 +61,7 @@ class Task(resource.Resource): if hasattr(e, key): # Nonetype check for dictionary must be explicit. if val is not None and ( - key == 'input' or key == 'output'): + key == 'input' or key == 'result'): val = json.dumps(val) setattr(e, key, val) @@ -124,13 +123,13 @@ class TasksController(rest.RestController): raise exc.InvalidResultException(str(e)) if task.state == states.ERROR: - raw_result = wf_utils.TaskResult(error=result) + task_result = wf_utils.TaskResult(error=result) else: - raw_result = wf_utils.TaskResult(data=result) + task_result = wf_utils.TaskResult(data=result) engine = rpc.get_engine_client() - values = engine.on_task_result(id, raw_result) + values = engine.on_task_result(id, task_result) return Task.from_dict(values) diff --git a/mistral/db/v2/sqlalchemy/api.py b/mistral/db/v2/sqlalchemy/api.py index 3d08ff18..0943825d 100644 --- a/mistral/db/v2/sqlalchemy/api.py +++ b/mistral/db/v2/sqlalchemy/api.py @@ -473,9 +473,11 @@ def create_delayed_call(values, session=None): @b.session_aware() def delete_delayed_call(delayed_call_id, session=None): delayed_call = _get_delayed_call(delayed_call_id) + if not delayed_call: - raise exc.NotFoundException("DelayedCall not found [delayed_call_id=" - "%s]" % delayed_call_id) + raise exc.NotFoundException( + "DelayedCall not found [delayed_call_id=%s]" % delayed_call_id + ) session.delete(delayed_call) @@ -483,6 +485,7 @@ def delete_delayed_call(delayed_call_id, session=None): @b.session_aware() def get_delayed_calls_to_start(time, session=None): query = b.model_query(models.DelayedCall) + query = query.filter(models.DelayedCall.execution_time < time) query = query.order_by(models.DelayedCall.execution_time) diff --git a/mistral/db/v2/sqlalchemy/models.py b/mistral/db/v2/sqlalchemy/models.py index acd97b40..a6042425 100644 --- a/mistral/db/v2/sqlalchemy/models.py +++ b/mistral/db/v2/sqlalchemy/models.py @@ -94,7 +94,8 @@ class Task(mb.MistralSecureModelBase): in_context = sa.Column(st.JsonDictType()) input = sa.Column(st.JsonDictType()) # TODO(rakhmerov): Do we just need to use invocation instead of output? - output = sa.Column(st.JsonDictType()) + result = sa.Column(st.JsonDictType()) + published = sa.Column(st.JsonDictType()) # Runtime context like iteration_no of a repeater. # Effectively internal engine properties which will be used to determine @@ -112,16 +113,6 @@ class Task(mb.MistralSecureModelBase): lazy='joined' ) - def to_dict(self): - d = super(Task, self).to_dict() - - d['result'] = json.dumps( - d['output'].get('task', {}).get(d['name'], {}) - if d['output'] else {} - ) - - return d - class ActionInvocation(mb.MistralSecureModelBase): """Contains task action invocation information.""" @@ -133,7 +124,7 @@ class ActionInvocation(mb.MistralSecureModelBase): state = sa.Column(sa.String(20)) # Note: Corresponds to MySQL 'LONGTEXT' type which is of unlimited size. - output = sa.orm.deferred(sa.Column(st.LongText())) + result = sa.orm.deferred(sa.Column(st.LongText())) # Relations. task_id = sa.Column(sa.String(36), sa.ForeignKey('tasks_v2.id')) diff --git a/mistral/engine1/base.py b/mistral/engine1/base.py index 7ba661c5..f1f6c59c 100644 --- a/mistral/engine1/base.py +++ b/mistral/engine1/base.py @@ -35,14 +35,14 @@ class Engine(object): raise NotImplementedError @abc.abstractmethod - def on_task_result(self, task_id, raw_result): - """Accepts workflow task raw result and continues the workflow. + def on_task_result(self, task_id, result): + """Accepts workflow task result and continues the workflow. - Task result here is a raw task result which comes from a corresponding - action/workflow associated which the task is associated with. + Task result here is a result which comes from a action/workflow + associated which the task. :param task_id: Task id. - :param raw_result: Raw task result that comes from action/workflow - (before publisher). Instance of mistral.workflow.base.TaskResult + :param result: Action/workflow result. Instance of + mistral.workflow.base.TaskResult :return: """ raise NotImplementedError @@ -118,12 +118,12 @@ class TaskPolicy(object): # No-op by default. pass - def after_task_complete(self, task_db, task_spec, raw_result): + def after_task_complete(self, task_db, task_spec, result): """Called right after task completes. :param task_db: Completed task DB model. :param task_spec: Completed task specification. - :param raw_result: TaskResult instance passed to on_task_result. + :param result: TaskResult instance passed to on_task_result. It is needed for analysis of result and scheduling task again. """ # No-op by default. diff --git a/mistral/engine1/commands.py b/mistral/engine1/commands.py index 25372b08..b2531de4 100644 --- a/mistral/engine1/commands.py +++ b/mistral/engine1/commands.py @@ -234,8 +234,10 @@ class RunTask(EngineCommand): for a_input in action_input_collection: evaluated_input = expr.evaluate_recursively( self.task_spec.get_input(), - utils.merge_dicts(copy.copy(a_input), - copy.copy(self.task_db.in_context))) + utils.merge_dicts( + copy.copy(a_input), + copy.copy(self.task_db.in_context)) + ) if action_context: evaluated_input['action_context'] = action_context @@ -244,9 +246,11 @@ class RunTask(EngineCommand): self.task_db.id, action_db.action_class, action_db.attributes or {}, - utils.merge_dicts(evaluated_input, - action_defaults, - overwrite=False), + utils.merge_dicts( + evaluated_input, + action_defaults, + overwrite=False + ), target ) @@ -255,9 +259,11 @@ class RunTask(EngineCommand): self.task_db.id, action_db.action_class, action_db.attributes or {}, - utils.merge_dicts(action_input, - action_defaults, - overwrite=False), + utils.merge_dicts( + action_input, + action_defaults, + overwrite=False + ), target ) diff --git a/mistral/engine1/default_engine.py b/mistral/engine1/default_engine.py index 17cd5fdc..2dfc6e0b 100644 --- a/mistral/engine1/default_engine.py +++ b/mistral/engine1/default_engine.py @@ -43,8 +43,9 @@ class DefaultEngine(base.Engine): @u.log_exec(LOG) def start_workflow(self, workflow_name, workflow_input, **params): + exec_id = None + try: - execution_id = None params = self._canonize_workflow_params(params) with db_api.transaction(): @@ -60,7 +61,7 @@ class DefaultEngine(base.Engine): workflow_input, params ) - execution_id = exec_db.id + exec_id = exec_db.id u.wf_trace.info( exec_db, @@ -80,33 +81,34 @@ class DefaultEngine(base.Engine): self._run_remote_commands(cmds, exec_db, wf_handler) except Exception as e: - LOG.error("Failed to start workflow '%s' id=%s: %s\n%s", - workflow_name, execution_id, e, traceback.format_exc()) - self._fail_workflow(execution_id, e) + LOG.error( + "Failed to start workflow '%s' id=%s: %s\n%s", + workflow_name, exec_id, e, traceback.format_exc() + ) + self._fail_workflow(exec_id, e) raise e return exec_db @u.log_exec(LOG) - def on_task_result(self, task_id, raw_result): - try: - task_name = "Unknown" - execution_id = None + def on_task_result(self, task_id, result): + task_name = "Unknown" + exec_id = None + try: with db_api.transaction(): task_db = db_api.get_task(task_id) task_name = task_db.name exec_db = db_api.get_execution(task_db.execution_id) - execution_id = exec_db.id + exec_id = exec_db.id - raw_result = utils.transform_result( - exec_db, task_db, raw_result) + result = utils.transform_result(exec_db, task_db, result) wf_handler = wfh_factory.create_workflow_handler(exec_db) self._after_task_complete( task_db, spec_parser.get_task_spec(task_db.spec), - raw_result, + result, wf_handler.wf_spec ) @@ -114,7 +116,7 @@ class DefaultEngine(base.Engine): return task_db # Calculate commands to process next. - cmds = wf_handler.on_task_result(task_db, raw_result) + cmds = wf_handler.on_task_result(task_db, result) self._run_local_commands( cmds, @@ -127,21 +129,25 @@ class DefaultEngine(base.Engine): self._check_subworkflow_completion(exec_db) except Exception as e: - LOG.error("Failed to handle results for task '%s' id=%s: %s\n%s", - task_name, task_id, e, traceback.format_exc()) + LOG.error( + "Failed to handle results for task '%s' id=%s: %s\n%s", + task_name, task_id, e, traceback.format_exc() + ) # TODO(dzimine): try to find out which command caused failure. - self._fail_workflow(execution_id, e) + self._fail_workflow(exec_id, e) raise e return task_db @u.log_exec(LOG) def run_task(self, task_id): - try: - execution_id = None + task_name = "Unknown" + exec_id = None + try: with db_api.transaction(): task_db = db_api.get_task(task_id) + task_name = task_db.name u.wf_trace.info( task_db, @@ -157,7 +163,7 @@ class DefaultEngine(base.Engine): task_spec = spec_parser.get_task_spec(task_db.spec) exec_db = task_db.execution - execution_id = exec_db.id + exec_id = exec_db.id wf_handler = wfh_factory.create_workflow_handler(exec_db) @@ -168,9 +174,11 @@ class DefaultEngine(base.Engine): cmd.run_remote(exec_db, wf_handler) except Exception as e: - LOG.error("Failed to run task '%s': %s\n%s", - task_db.name, e, traceback.format_exc()) - self._fail_workflow(execution_id, e, task_id) + LOG.error( + "Failed to run task '%s': %s\n%s", + task_name, e, traceback.format_exc() + ) + self._fail_workflow(exec_id, e, task_id) raise e @u.log_exec(LOG) @@ -213,21 +221,22 @@ class DefaultEngine(base.Engine): err_msg = str(err) exec_db = db_api.load_execution(execution_id) + if exec_db is None: LOG.error("Cant fail workflow execution id='%s': not found.", execution_id) return wf_handler = wfh_factory.create_workflow_handler(exec_db) + wf_handler.fail_workflow(err_msg) if task_id: - task_db = db_api.get_task(task_id) # Note(dzimine): Don't call self.engine_client: # 1) to avoid computing and triggering next tasks # 2) to avoid a loop in case of error in transport wf_handler.on_task_result( - task_db, + db_api.get_task(task_id), wf_utils.TaskResult(error=err_msg) ) @@ -239,7 +248,6 @@ class DefaultEngine(base.Engine): @staticmethod def _canonize_workflow_params(params): - # Resolve environment parameter. env = params.get('env', {}) @@ -296,9 +304,9 @@ class DefaultEngine(base.Engine): return exec_db @staticmethod - def _after_task_complete(task_db, task_spec, raw_result, wf_spec): + def _after_task_complete(task_db, task_spec, result, wf_spec): for p in policies.build_policies(task_spec.get_policies(), wf_spec): - p.after_task_complete(task_db, task_spec, raw_result) + p.after_task_complete(task_db, task_spec, result) def _check_subworkflow_completion(self, exec_db): if not exec_db.parent_task_id: diff --git a/mistral/engine1/policies.py b/mistral/engine1/policies.py index 2923784b..3e416f41 100644 --- a/mistral/engine1/policies.py +++ b/mistral/engine1/policies.py @@ -189,7 +189,7 @@ class WaitAfterPolicy(base.TaskPolicy): def __init__(self, delay): self.delay = delay - def after_task_complete(self, task_db, task_spec, raw_result): + def after_task_complete(self, task_db, task_spec, result): context_key = 'wait_after_policy' runtime_context = _ensure_context_has_key( @@ -224,7 +224,7 @@ class WaitAfterPolicy(base.TaskPolicy): task_db.state = states.DELAYED serializers = { - 'raw_result': 'mistral.workflow.utils.TaskResultSerializer' + 'result': 'mistral.workflow.utils.TaskResultSerializer' } scheduler.schedule_call( @@ -233,7 +233,7 @@ class WaitAfterPolicy(base.TaskPolicy): self.delay, serializers, task_id=task_db.id, - raw_result=raw_result + result=result ) @@ -243,7 +243,7 @@ class RetryPolicy(base.TaskPolicy): self.delay = delay self.break_on = break_on - def after_task_complete(self, task_db, task_spec, raw_result): + def after_task_complete(self, task_db, task_spec, result): """Possible Cases: 1. state = SUCCESS @@ -262,7 +262,7 @@ class RetryPolicy(base.TaskPolicy): task_db.runtime_context = runtime_context - state = states.ERROR if raw_result.is_error() else states.SUCCESS + state = states.ERROR if result.is_error() else states.SUCCESS if state != states.ERROR: return @@ -273,7 +273,7 @@ class RetryPolicy(base.TaskPolicy): % (task_db.name, task_db.state) ) - outbound_context = task_db.output + outbound_context = task_db.result policy_context = runtime_context[context_key] diff --git a/mistral/engine1/rpc.py b/mistral/engine1/rpc.py index e04e0058..8f505bce 100644 --- a/mistral/engine1/rpc.py +++ b/mistral/engine1/rpc.py @@ -184,11 +184,11 @@ class EngineClient(base.Engine): params=params ) - def on_task_result(self, task_id, raw_result): + def on_task_result(self, task_id, result): """Conveys task result to Mistral Engine. This method should be used by clients of Mistral Engine to update - state of a task once task action has been performed. One of the + state of a task once task action has executed. One of the clients of this method is Mistral REST API server that receives task result from the outside action handlers. @@ -203,8 +203,8 @@ class EngineClient(base.Engine): auth_ctx.ctx(), 'on_task_result', task_id=task_id, - result_data=raw_result.data, - result_error=raw_result.error + result_data=result.data, + result_error=result.error ) def run_task(self, task_id): diff --git a/mistral/engine1/utils.py b/mistral/engine1/utils.py index 725433b5..67e9606a 100644 --- a/mistral/engine1/utils.py +++ b/mistral/engine1/utils.py @@ -105,9 +105,19 @@ def resolve_workflow(parent_wf_name, parent_wf_spec_name, wf_spec_name): return wf_db -def transform_result(exec_db, task_db, raw_result): - if raw_result.is_error(): - return raw_result +def transform_result(exec_db, task_db, result): + """Transforms task result accounting for ad-hoc actions. + + In case if the given result is an action result and action is + an ad-hoc action the method transforms the result according to + ad-hoc action configuration. + + :param exec_db: Execution DB model. + :param task_db: Task DB model. + :param result: Result of task action/workflow. + """ + if result.is_error(): + return result action_spec_name = spec_parser.get_task_spec( task_db.spec).get_action_name() @@ -120,14 +130,14 @@ def transform_result(exec_db, task_db, raw_result): exec_db.wf_name, wf_spec_name, action_spec_name, - raw_result + result ) - return raw_result + return result def transform_action_result(wf_name, wf_spec_name, action_spec_name, - raw_result): + result): action_db = resolve_action( wf_name, wf_spec_name, @@ -135,14 +145,14 @@ def transform_action_result(wf_name, wf_spec_name, action_spec_name, ) if not action_db.spec: - return raw_result + return result transformer = spec_parser.get_action_spec(action_db.spec).get_output() if transformer is None: - return raw_result + return result return wf_utils.TaskResult( - data=expr.evaluate_recursively(transformer, raw_result.data), - error=raw_result.error + data=expr.evaluate_recursively(transformer, result.data), + error=result.error ) diff --git a/mistral/tests/unit/api/v2/test_tasks.py b/mistral/tests/unit/api/v2/test_tasks.py index 1a9fadce..a8164928 100644 --- a/mistral/tests/unit/api/v2/test_tasks.py +++ b/mistral/tests/unit/api/v2/test_tasks.py @@ -39,7 +39,7 @@ TASK_DB = models.Task( tags=['a', 'b'], in_context={}, input={}, - output={}, + result={}, runtime_context={}, execution_id='123', created_at=datetime.datetime(1970, 1, 1), @@ -53,7 +53,6 @@ TASK = { 'state': 'RUNNING', 'result': '{}', 'input': '{}', - 'output': '{}', 'execution_id': '123', 'created_at': '1970-01-01 00:00:00', 'updated_at': '1970-01-01 00:00:00' diff --git a/mistral/tests/unit/engine1/test_dataflow.py b/mistral/tests/unit/engine1/test_dataflow.py index 0b1f41c4..5c1f16ae 100644 --- a/mistral/tests/unit/engine1/test_dataflow.py +++ b/mistral/tests/unit/engine1/test_dataflow.py @@ -185,11 +185,11 @@ class DataFlowEngineTest(engine_test_base.EngineTestCase): task3 = self._assert_single_item(tasks, name='task3') self.assertEqual(states.SUCCESS, task3.state) - self.assertDictEqual({'hi': 'Hi'}, task1.output) - self.assertDictEqual({'to': 'Morpheus'}, task2.output) + self.assertDictEqual({'hi': 'Hi'}, task1.result) + self.assertDictEqual({'to': 'Morpheus'}, task2.result) self.assertDictEqual( {'result': 'Hi, Morpheus! Sincerely, your Neo.'}, - task3.output + task3.result ) def test_parallel_tasks(self): @@ -215,8 +215,8 @@ class DataFlowEngineTest(engine_test_base.EngineTestCase): self.assertEqual(states.SUCCESS, task1.state) self.assertEqual(states.SUCCESS, task2.state) - self.assertDictEqual({'var1': 1}, task1.output) - self.assertDictEqual({'var2': 2}, task2.output) + self.assertDictEqual({'var1': 1}, task1.result) + self.assertDictEqual({'var2': 2}, task2.result) self.assertEqual(1, exec_db.output['var1']) self.assertEqual(2, exec_db.output['var2']) @@ -252,11 +252,11 @@ class DataFlowEngineTest(engine_test_base.EngineTestCase): self.assertEqual(states.SUCCESS, task2.state) self.assertEqual(states.SUCCESS, task21.state) - self.assertDictEqual({'var1': 1}, task1.output) - self.assertDictEqual({'var12': 12}, task12.output) - self.assertDictEqual({'var14': 14}, task14.output) - self.assertDictEqual({'var2': 2}, task2.output) - self.assertDictEqual({'var21': 21}, task21.output) + self.assertDictEqual({'var1': 1}, task1.result) + self.assertDictEqual({'var12': 12}, task12.result) + self.assertDictEqual({'var14': 14}, task14.result) + self.assertDictEqual({'var2': 2}, task2.result) + self.assertDictEqual({'var21': 21}, task21.result) self.assertEqual(1, exec_db.output['var1']) self.assertEqual(12, exec_db.output['var12']) @@ -290,17 +290,17 @@ class DataFlowEngineTest(engine_test_base.EngineTestCase): task4 = self._assert_single_item(tasks, name='task4') self.assertEqual(states.SUCCESS, task4.state) - self.assertDictEqual({'greeting': 'Hi'}, task1.output) - self.assertDictEqual({'greeting': 'Yo'}, task2.output) - self.assertDictEqual({'to': 'Morpheus'}, task3.output) + self.assertDictEqual({'greeting': 'Hi'}, task1.result) + self.assertDictEqual({'greeting': 'Yo'}, task2.result) + self.assertDictEqual({'to': 'Morpheus'}, task3.result) self.assertDictEqual( {'result': 'Yo, Morpheus! Sincerely, your Neo.'}, - task4.output + task4.result ) class DataFlowTest(test_base.BaseTest): - def test_evaluate_task_output_simple(self): + def test_evaluate_task_result_simple(self): """Test simplest green-path scenario: action status is SUCCESS, action output is string published variables are static (no expression), @@ -310,16 +310,21 @@ class DataFlowTest(test_base.BaseTest): """ publish_dict = {'foo': 'bar'} action_output = 'string data' + task_db = models.Task(name='task1') + task_spec = mock.MagicMock() task_spec.get_publish = mock.MagicMock(return_value=publish_dict) - raw_result = utils.TaskResult(data=action_output, error=None) - res = data_flow.evaluate_task_output(task_db, task_spec, raw_result) + res = data_flow.evaluate_task_result( + task_db, + task_spec, + utils.TaskResult(data=action_output, error=None) + ) self.assertEqual(res['foo'], 'bar') - def test_evaluate_task_output(self): + def test_evaluate_task_result(self): """Test green-path scenario with evaluations action status is SUCCESS, action output is dict published variables with expression, @@ -346,9 +351,11 @@ class DataFlowTest(test_base.BaseTest): task_spec = mock.MagicMock() task_spec.get_publish = mock.MagicMock(return_value=publish) - raw_result = utils.TaskResult(data=action_output, error=None) - - res = data_flow.evaluate_task_output(task_db, task_spec, raw_result) + res = data_flow.evaluate_task_result( + task_db, + task_spec, + utils.TaskResult(data=action_output, error=None) + ) self.assertEqual(3, len(res)) @@ -361,7 +368,7 @@ class DataFlowTest(test_base.BaseTest): # Resolved from action output. self.assertEqual(res['a'], 'adata') - def test_evaluate_task_output_with_error(self): + def test_evaluate_task_result_with_error(self): """Test handling ERROR in action action status is ERROR, action output is error string published variables should not evaluate, @@ -376,9 +383,11 @@ class DataFlowTest(test_base.BaseTest): task_spec = mock.MagicMock() task_spec.get_publish = mock.MagicMock(return_value=publish) - raw_result = utils.TaskResult(data=None, error=action_output) - - res = data_flow.evaluate_task_output(task_db, task_spec, raw_result) + res = data_flow.evaluate_task_result( + task_db, + task_spec, + utils.TaskResult(data=None, error=action_output) + ) self.assertDictEqual( res, diff --git a/mistral/tests/unit/engine1/test_default_engine.py b/mistral/tests/unit/engine1/test_default_engine.py index d77ffc99..f3f2fd14 100644 --- a/mistral/tests/unit/engine1/test_default_engine.py +++ b/mistral/tests/unit/engine1/test_default_engine.py @@ -250,7 +250,7 @@ class DefaultEngineTest(base.DbTestCase): self._assert_dict_contains_subset(wf_input, task1_db.in_context) self.assertIn('__execution', task_db.in_context) self.assertDictEqual({'output': 'Hey'}, task1_db.input) - self.assertDictEqual({'result': 'Hey'}, task1_db.output) + self.assertDictEqual({'result': 'Hey'}, task1_db.result) exec_db = db_api.get_execution(exec_db.id) @@ -279,12 +279,12 @@ class DefaultEngineTest(base.DbTestCase): self.assertEqual(states.SUCCESS, task2_db.state) in_context = copy.deepcopy(wf_input) - in_context.update(task1_db.output) + in_context.update(task1_db.result) self._assert_dict_contains_subset(in_context, task2_db.in_context) self.assertIn('__execution', task_db.in_context) self.assertDictEqual({'output': 'Hi'}, task2_db.input) - self.assertDictEqual({}, task2_db.output) + self.assertDictEqual({}, task2_db.result) self.assertEqual(2, len(exec_db.tasks)) diff --git a/mistral/tests/unit/engine1/test_direct_workflow.py b/mistral/tests/unit/engine1/test_direct_workflow.py index f7ffbdce..0d9b5764 100644 --- a/mistral/tests/unit/engine1/test_direct_workflow.py +++ b/mistral/tests/unit/engine1/test_direct_workflow.py @@ -110,15 +110,15 @@ class DirectWorkflowEngineTest(base.EngineTestCase): self.assertIn( "Failed to initialize action", - task_db2.output['task'][task_db2.name] + task_db2.result['task'][task_db2.name] ) self.assertIn( "unexpected keyword argument", - task_db2.output['task'][task_db2.name] + task_db2.result['task'][task_db2.name] ) self.assertTrue(exec_db.state, states.ERROR) - self.assertIn(task_db2.output['error'], exec_db.state_info) + self.assertIn(task_db2.result['error'], exec_db.state_info) def test_wrong_first_task_input(self): WORKFLOW_WRONG_FIRST_TASK_INPUT = """ @@ -136,15 +136,15 @@ class DirectWorkflowEngineTest(base.EngineTestCase): self.assertIn( "Failed to initialize action", - task_db.output['task'][task_db.name] + task_db.result['task'][task_db.name] ) self.assertIn( "unexpected keyword argument", - task_db.output['task'][task_db.name] + task_db.result['task'][task_db.name] ) self.assertTrue(exec_db.state, states.ERROR) - self.assertIn(task_db.output['error'], exec_db.state_info) + self.assertIn(task_db.result['error'], exec_db.state_info) def test_wrong_action(self): WORKFLOW_WRONG_ACTION = """ diff --git a/mistral/tests/unit/engine1/test_javascript_action.py b/mistral/tests/unit/engine1/test_javascript_action.py index 63c8ff69..2b6a0ccf 100644 --- a/mistral/tests/unit/engine1/test_javascript_action.py +++ b/mistral/tests/unit/engine1/test_javascript_action.py @@ -104,4 +104,4 @@ class JavaScriptEngineTest(base.EngineTestCase): self.assertEqual(states.SUCCESS, task_db.state) self.assertDictEqual({}, task_db.runtime_context) - self.assertEqual(500, task_db.output['result']) + self.assertEqual(500, task_db.result['result']) diff --git a/mistral/tests/unit/engine1/test_join.py b/mistral/tests/unit/engine1/test_join.py index 8e14ec82..a8af4b0c 100644 --- a/mistral/tests/unit/engine1/test_join.py +++ b/mistral/tests/unit/engine1/test_join.py @@ -357,7 +357,7 @@ class JoinEngineTest(base.EngineTestCase): { 'result4': '1,2', }, - task4.output + task4.result ) self.assertDictEqual({'result': '1,2'}, exec_db.output) @@ -389,7 +389,7 @@ class JoinEngineTest(base.EngineTestCase): self.assertEqual(states.SUCCESS, task4.state) self.assertEqual(states.SUCCESS, task5.state) - result5 = task5.output['result5'] + result5 = task5.result['result5'] self.assertIsNotNone(result5) self.assertEqual(2, result5.count('None')) @@ -419,7 +419,7 @@ class JoinEngineTest(base.EngineTestCase): self.assertEqual(states.SUCCESS, task3.state) self.assertEqual(states.SUCCESS, task4.state) - result4 = task4.output['result4'] + result4 = task4.result['result4'] self.assertIsNotNone(result4) self.assertEqual(2, result4.count('None')) diff --git a/mistral/tests/unit/engine1/test_race_condition.py b/mistral/tests/unit/engine1/test_race_condition.py index 09838cc9..76467954 100644 --- a/mistral/tests/unit/engine1/test_race_condition.py +++ b/mistral/tests/unit/engine1/test_race_condition.py @@ -203,5 +203,5 @@ class LongActionTest(base.EngineTestCase): { 'result1': 1, }, - task1.output + task1.result ) diff --git a/mistral/tests/unit/engine1/test_with_items.py b/mistral/tests/unit/engine1/test_with_items.py index aca92f33..26d2973c 100644 --- a/mistral/tests/unit/engine1/test_with_items.py +++ b/mistral/tests/unit/engine1/test_with_items.py @@ -161,7 +161,7 @@ class WithItemsEngineTest(base.EngineTestCase): # Since we know that we can receive results in random order, # check is not depend on order of items. - result = task1.output['result'] + result = task1.result['result'] self.assertTrue(isinstance(result, list)) @@ -189,7 +189,7 @@ class WithItemsEngineTest(base.EngineTestCase): tasks = exec_db.tasks task1 = self._assert_single_item(tasks, name='task1') - result = task1.output['result'] + result = task1.result['result'] self.assertTrue(isinstance(result, list)) @@ -220,7 +220,8 @@ class WithItemsEngineTest(base.EngineTestCase): # Since we know that we can receive results in random order, # check is not depend on order of items. - result = task1.output['result'] + result = task1.result['result'] + self.assertTrue(isinstance(result, list)) self.assertIn('a 1', result) @@ -253,7 +254,7 @@ class WithItemsEngineTest(base.EngineTestCase): exec_db = db_api.get_execution(exec_db.id) task_db = db_api.get_task(task_db.id) - result = task_db.output['result'] + result = task_db.result['result'] self.assertTrue(isinstance(result, list)) diff --git a/mistral/tests/unit/workflow/test_with_items.py b/mistral/tests/unit/workflow/test_with_items.py index e8232fb4..c06145ac 100644 --- a/mistral/tests/unit/workflow/test_with_items.py +++ b/mistral/tests/unit/workflow/test_with_items.py @@ -38,7 +38,7 @@ TASK_SPEC = tasks.TaskSpec(TASK_DICT) TASK_DB = models.Task( name='task1', - output=None, + result=None, ) @@ -48,15 +48,21 @@ class WithItemsCalculationsTest(base.BaseTest): task_dict['publish'] = {'result': '{$.task1}'} task_spec = tasks.TaskSpec(task_dict) - raw_result = utils.TaskResult(data='output!') - output = with_items.get_output(TASK_DB, task_spec, raw_result) + output = with_items.get_result( + TASK_DB, + task_spec, + utils.TaskResult(data='output!') + ) self.assertDictEqual({'result': ['output!']}, output) def test_calculate_output_without_key(self): - raw_result = utils.TaskResult(data='output!') - output = with_items.get_output(TASK_DB, TASK_SPEC, raw_result) + output = with_items.get_result( + TASK_DB, + TASK_SPEC, + utils.TaskResult(data='output!') + ) # TODO(rakhmerov): Fix during result/output refactoring. self.assertDictEqual({}, output) diff --git a/mistral/workflow/base.py b/mistral/workflow/base.py index fa48c6a4..d14b21d5 100644 --- a/mistral/workflow/base.py +++ b/mistral/workflow/base.py @@ -59,14 +59,15 @@ class WorkflowHandler(object): """ raise NotImplementedError - def on_task_result(self, task_db, raw_result): + def on_task_result(self, task_db, result): """Handles event of arriving a task result. Given task result performs analysis of the workflow execution and - identifies tasks that can be scheduled for execution. + identifies commands (including tasks) that can be scheduled for + execution. :param task_db: Task that the result corresponds to. - :param raw_result: Raw task result that comes from action/workflow - (before publisher). Instance of mistral.workflow.utils.TaskResult + :param result: Task action/workflow result. + Instance of mistral.workflow.utils.TaskResult :return List of engine commands that needs to be performed. """ @@ -78,24 +79,22 @@ class WorkflowHandler(object): task_spec = self.wf_spec.get_tasks()[task_db.name] - task_db.output = self._determine_task_output( - task_spec, - task_db, - raw_result - ) + task_db.state = self._determine_task_state(task_db, task_spec, result) - task_db.state = self._determine_task_state( - task_db, + # TODO(rakhmerov): This needs to be fixed (the method should work + # differently). + task_db.result = self._determine_task_result( task_spec, - raw_result + task_db, + result ) wf_trace_msg += "%s" % task_db.state if task_db.state == states.ERROR: - wf_trace_msg += ", error = %s]" % utils.cut(raw_result.error) + wf_trace_msg += ", error = %s]" % utils.cut(result.error) else: - wf_trace_msg += ", result = %s]" % utils.cut(raw_result.data) + wf_trace_msg += ", result = %s]" % utils.cut(result.data) wf_trace.info(task_db, wf_trace_msg) @@ -108,7 +107,8 @@ class WorkflowHandler(object): not self._is_error_handled(task_db)): if not self.is_paused_or_completed(): # TODO(dzimine): pass task_db.result when Model refactored. - msg = str(task_db.output.get('error', "Unknown")) + msg = str(task_db.result.get('error', "Unknown")) + self._set_execution_state( states.ERROR, "Failure caused by error in task '%s': %s" @@ -132,23 +132,19 @@ class WorkflowHandler(object): return cmds @staticmethod - def _determine_task_output(task_spec, task_db, raw_result): - with_items_spec = task_spec.get_with_items() - - if with_items_spec: - return with_items.get_output( - task_db, task_spec, raw_result) + def _determine_task_result(task_spec, task_db, result): + # TODO(rakhmerov): Think how 'with-items' can be better encapsulated. + if task_spec.get_with_items(): + return with_items.get_result(task_db, task_spec, result) else: - return data_flow.evaluate_task_output( - task_db, task_spec, raw_result) + return data_flow.evaluate_task_result(task_db, task_spec, result) @staticmethod - def _determine_task_state(task_db, task_spec, raw_result): - state = states.ERROR if raw_result.is_error() else states.SUCCESS + def _determine_task_state(task_db, task_spec, result): + state = states.ERROR if result.is_error() else states.SUCCESS - with_items_spec = task_spec.get_with_items() - - if with_items_spec: + # TODO(rakhmerov): Think how 'with-items' can be better encapsulated. + if task_spec.get_with_items(): # Change the index. with_items.do_step(task_db) diff --git a/mistral/workflow/data_flow.py b/mistral/workflow/data_flow.py index 22e09ed4..b5fc3907 100644 --- a/mistral/workflow/data_flow.py +++ b/mistral/workflow/data_flow.py @@ -93,21 +93,23 @@ def _evaluate_upstream_context(upstream_db_tasks): # TODO(rakhmerov): This method should utilize task invocations and calculate # effective task output. -def evaluate_task_output(task_db, task_spec, raw_result): - """Evaluates task output given a raw task result from action/workflow. +# TODO(rakhmerov): Now this method doesn't make a lot of sense because we +# treat action/workflow as a task result so we need to calculate only +# what could be called "effective task result" +def evaluate_task_result(task_db, task_spec, result): + """Evaluates task result given a result from action/workflow. :param task_db: DB task :param task_spec: Task specification - :param raw_result: Raw task result that comes from action/workflow - (before publisher). Instance of mistral.workflow.base.TaskResult - :return: Complete task output that goes to Data Flow context for SUCCESS - or raw error for ERROR + :param result: Task action/workflow result. Instance of + mistral.workflow.base.TaskResult + :return: Complete task result. """ - if raw_result.is_error(): + if result.is_error(): return { - 'error': raw_result.error, - 'task': {task_db.name: raw_result.error} + 'error': result.error, + 'task': {task_db.name: result.error} } # Expression context is task inbound context + action/workflow result @@ -120,11 +122,24 @@ def evaluate_task_output(task_db, task_spec, raw_result): task_db.name ) - expr_ctx[task_db.name] = copy.deepcopy(raw_result.data) or {} + expr_ctx[task_db.name] = copy.deepcopy(result.data) or {} return expr.evaluate_recursively(task_spec.get_publish(), expr_ctx) +def evaluate_effective_task_result(task_db, task_spec): + """Evaluates effective (final) task result. + + Based on existing task invocations this method calculates + task final result that's supposed to be accessibly by users. + :param task_db: DB task. + :param task_spec: Task specification. + :return: Effective (final) task result. + """ + # TODO(rakhmerov): Implement + pass + + def evaluate_task_outbound_context(task_db): """Evaluates task outbound Data Flow context. @@ -137,12 +152,12 @@ def evaluate_task_outbound_context(task_db): in_context = (copy.deepcopy(dict(task_db.in_context)) if task_db.in_context is not None else {}) - out_ctx = utils.merge_dicts(in_context, task_db.output) + out_ctx = utils.merge_dicts(in_context, task_db.result) # Add task output under key 'task.taskName'. out_ctx = utils.merge_dicts( out_ctx, - {task_db.name: copy.deepcopy(task_db.output) or None} + {task_db.name: copy.deepcopy(task_db.result) or None} ) return out_ctx @@ -156,7 +171,7 @@ def evaluate_workflow_output(wf_spec, context): """ output_dict = wf_spec.get_output() - # Evaluate 'publish' clause using raw result as a context. + # Evaluate workflow 'publish' clause using the final workflow context. output = expr.evaluate_recursively(output_dict, context) # TODO(rakhmerov): Many don't like that we return the whole context diff --git a/mistral/workflow/with_items.py b/mistral/workflow/with_items.py index dc9c4bb6..85d3f0c2 100644 --- a/mistral/workflow/with_items.py +++ b/mistral/workflow/with_items.py @@ -18,11 +18,13 @@ from mistral import exceptions as exc from mistral import expressions as expr -# TODO(rakhmerov): Partially duplicates data_flow.evaluate_task_output -def get_output(task_db, task_spec, raw_result): - """Returns output from task markered as with-items +# TODO(rakhmerov): Partially duplicates data_flow.evaluate_task_result +# TODO(rakhmerov): Method now looks confusing because it's called 'get_result' +# and has 'result' parameter, but it's temporary, needs to be refactored. +def get_result(task_db, task_spec, result): + """Returns result from task markered as with-items - Examples of output: + Examples of result: 1. Without publish clause: { "task": { @@ -30,64 +32,64 @@ def get_output(task_db, task_spec, raw_result): } } Note: In this case we don't create any specific - output to prevent generating large data in DB. + result to prevent generating large data in DB. Note: None here used for calculating number of finished iterations. - 2. With publish clause and specific output key: + 2. With publish clause and specific result key: { "result": [ - "output1", - "output2" + "result1", + "result2" ], "task": { "task1": { "result": [ - "output1", - "output2" + "result1", + "result2" ] } } } """ - e_data = raw_result.error + e_data = result.error expr_ctx = copy.deepcopy(task_db.in_context) or {} - expr_ctx[task_db.name] = copy.deepcopy(raw_result.data) or {} + expr_ctx[task_db.name] = copy.deepcopy(result.data) or {} - # Calc output for with-items (only list form is used). - output = expr.evaluate_recursively(task_spec.get_publish(), expr_ctx) + # Calc result for with-items (only list form is used). + result = expr.evaluate_recursively(task_spec.get_publish(), expr_ctx) - if not task_db.output: - task_db.output = {} + if not task_db.result: + task_db.result = {} - task_output = copy.copy(task_db.output) + task_result = copy.copy(task_db.result) - out_key = _get_output_key(task_spec) + res_key = _get_result_key(task_spec) - if out_key: - if out_key in task_output: - task_output[out_key].append( - output.get(out_key) or e_data + if res_key: + if res_key in task_result: + task_result[res_key].append( + result.get(res_key) or e_data ) else: - task_output[out_key] = [output.get(out_key) or e_data] + task_result[res_key] = [result.get(res_key) or e_data] - # Add same result to task output under key 'task'. - # TODO(rakhmerov): Fix this during output/result refactoring. - # task_output[t_name] = + # Add same result to task result under key 'task'. + # TODO(rakhmerov): Fix this during task result refactoring. + # task_result[t_name] = # { - # out_key: task_output[out_key] + # res_key: task_result[res_key] # } # else: - # if 'task' not in task_output: - # task_output.update({'task': {t_name: [None or e_data]}}) + # if 'task' not in task_result: + # task_result.update({'task': {t_name: [None or e_data]}}) # else: - # task_output['task'][t_name].append(None or e_data) + # task_result['task'][t_name].append(None or e_data) - return task_output + return task_result def _get_context(task_db): @@ -202,7 +204,7 @@ def validate_input(with_items_input): ) -def _get_output_key(task_spec): +def _get_result_key(task_spec): return (task_spec.get_publish().keys()[0] if task_spec.get_publish() else None)