diff --git a/mistral/engine/default_engine.py b/mistral/engine/default_engine.py index 9f6a0277d..d745da7c9 100644 --- a/mistral/engine/default_engine.py +++ b/mistral/engine/default_engine.py @@ -273,9 +273,11 @@ class DefaultEngine(base.Engine, coordination.Service): # When resuming a workflow we need to ignore all 'pause' # commands because workflow controller takes tasks that # completed within the period when the workflow was pause. - cmds = filter( - lambda c: not isinstance(c, commands.PauseWorkflow), - cmds + cmds = list( + filter( + lambda c: not isinstance(c, commands.PauseWorkflow), + cmds + ) ) # Since there's no explicit task causing the operation diff --git a/mistral/tests/functional/base.py b/mistral/tests/functional/base.py index dec5abb49..efcafa18d 100644 --- a/mistral/tests/functional/base.py +++ b/mistral/tests/functional/base.py @@ -45,7 +45,7 @@ def find_items(items, **props): return True - filtered = filter(lambda item: _matches(item, **props), items) + filtered = list(filter(lambda item: _matches(item, **props), items)) if len(filtered) == 1: return filtered[0] diff --git a/mistral/workbook/v2/workflows.py b/mistral/workbook/v2/workflows.py index 88d4cde56..b141c6781 100644 --- a/mistral/workbook/v2/workflows.py +++ b/mistral/workbook/v2/workflows.py @@ -280,7 +280,7 @@ class DirectWorkflowSpec(WorkflowSpec): @staticmethod def _remove_task_from_clause(on_clause, t_name): - return filter(lambda tup: tup[0] != t_name, on_clause) + return list(filter(lambda tup: tup[0] != t_name, on_clause)) class ReverseWorkflowSpec(WorkflowSpec): diff --git a/mistral/workflow/direct_workflow.py b/mistral/workflow/direct_workflow.py index f1219bd1e..954273940 100644 --- a/mistral/workflow/direct_workflow.py +++ b/mistral/workflow/direct_workflow.py @@ -43,11 +43,13 @@ class DirectWorkflowController(base.WorkflowController): __workflow_type__ = "direct" def _get_upstream_task_executions(self, task_spec): - return filter( - lambda t_e: self._is_upstream_task_execution(task_spec, t_e), - wf_utils.find_task_executions_by_specs( - self.wf_ex, - self.wf_spec.find_inbound_task_specs(task_spec) + return list( + filter( + lambda t_e: self._is_upstream_task_execution(task_spec, t_e), + wf_utils.find_task_executions_by_specs( + self.wf_ex, + self.wf_spec.find_inbound_task_specs(task_spec) + ) ) ) @@ -154,9 +156,11 @@ class DirectWorkflowController(base.WorkflowController): return True def _find_end_tasks(self): - return filter( - lambda t_ex: not self._has_outbound_tasks(t_ex), - wf_utils.find_successful_task_executions(self.wf_ex) + return list( + filter( + lambda t_ex: not self._has_outbound_tasks(t_ex), + wf_utils.find_successful_task_executions(self.wf_ex) + ) ) def _has_outbound_tasks(self, task_ex): @@ -218,7 +222,9 @@ class DirectWorkflowController(base.WorkflowController): ] def _remove_started_joins(self, cmds): - return filter(lambda cmd: not self._is_started_join(cmd), cmds) + return list( + filter(lambda cmd: not self._is_started_join(cmd), cmds) + ) def _is_started_join(self, cmd): if not (isinstance(cmd, commands.RunTask) and @@ -278,7 +284,9 @@ class DirectWorkflowController(base.WorkflowController): if not in_t_ex or not states.is_completed(in_t_ex.state): return False - return filter( - lambda t_name: join_task_spec.get_name() == t_name, - self._find_next_task_names(in_t_ex) + return list( + filter( + lambda t_name: join_task_spec.get_name() == t_name, + self._find_next_task_names(in_t_ex) + ) ) diff --git a/mistral/workflow/reverse_workflow.py b/mistral/workflow/reverse_workflow.py index f422dca58..7fa45a071 100644 --- a/mistral/workflow/reverse_workflow.py +++ b/mistral/workflow/reverse_workflow.py @@ -77,9 +77,11 @@ class ReverseWorkflowController(base.WorkflowController): or [] ] - return filter( - lambda t_e: t_e.state == states.SUCCESS, - wf_utils.find_task_executions_by_specs(self.wf_ex, t_specs) + return list( + filter( + lambda t_e: t_e.state == states.SUCCESS, + wf_utils.find_task_executions_by_specs(self.wf_ex, t_specs) + ) ) def evaluate_workflow_final_context(self): diff --git a/mistral/workflow/with_items.py b/mistral/workflow/with_items.py index 0cbdb6b24..0e07021e3 100644 --- a/mistral/workflow/with_items.py +++ b/mistral/workflow/with_items.py @@ -45,9 +45,9 @@ def is_completed(task_ex): def get_index(task_ex): return len( - filter( + list(filter( lambda x: x.accepted or states.RUNNING, task_ex.executions - ) + )) ) @@ -81,10 +81,10 @@ def _get_indices_if_rerun(unaccepted_executions): def _get_unaccepted_act_exs(task_ex): # Choose only if not accepted but completed. - return filter( + return list(filter( lambda x: not x.accepted and states.is_completed(x.state), task_ex.executions - ) + )) def get_indices_for_loop(task_ex): @@ -164,9 +164,9 @@ def validate_input(with_items_input): def has_more_iterations(task_ex): # See action executions which have been already # accepted or are still running. - action_exs = filter( + action_exs = list(filter( lambda x: x.accepted or x.state == states.RUNNING, task_ex.executions - ) + )) return get_count(task_ex) > len(action_exs)