filter() is wrapped around list().
In py2 filter returns list, but in py3 it returns iterator. Change-Id: I71db21027bab11c8de715d39e71d0f6317a3f9e0 Partially-Implements: blueprint mistral-py3
This commit is contained in:
parent
ae0d5e9efe
commit
13ec6e8b2c
@ -273,10 +273,12 @@ class DefaultEngine(base.Engine, coordination.Service):
|
||||
# When resuming a workflow we need to ignore all 'pause'
|
||||
# commands because workflow controller takes tasks that
|
||||
# completed within the period when the workflow was pause.
|
||||
cmds = filter(
|
||||
cmds = list(
|
||||
filter(
|
||||
lambda c: not isinstance(c, commands.PauseWorkflow),
|
||||
cmds
|
||||
)
|
||||
)
|
||||
|
||||
# Since there's no explicit task causing the operation
|
||||
# we need to mark all not processed tasks as processed
|
||||
|
@ -45,7 +45,7 @@ def find_items(items, **props):
|
||||
|
||||
return True
|
||||
|
||||
filtered = filter(lambda item: _matches(item, **props), items)
|
||||
filtered = list(filter(lambda item: _matches(item, **props), items))
|
||||
|
||||
if len(filtered) == 1:
|
||||
return filtered[0]
|
||||
|
@ -280,7 +280,7 @@ class DirectWorkflowSpec(WorkflowSpec):
|
||||
|
||||
@staticmethod
|
||||
def _remove_task_from_clause(on_clause, t_name):
|
||||
return filter(lambda tup: tup[0] != t_name, on_clause)
|
||||
return list(filter(lambda tup: tup[0] != t_name, on_clause))
|
||||
|
||||
|
||||
class ReverseWorkflowSpec(WorkflowSpec):
|
||||
|
@ -43,13 +43,15 @@ class DirectWorkflowController(base.WorkflowController):
|
||||
__workflow_type__ = "direct"
|
||||
|
||||
def _get_upstream_task_executions(self, task_spec):
|
||||
return filter(
|
||||
return list(
|
||||
filter(
|
||||
lambda t_e: self._is_upstream_task_execution(task_spec, t_e),
|
||||
wf_utils.find_task_executions_by_specs(
|
||||
self.wf_ex,
|
||||
self.wf_spec.find_inbound_task_specs(task_spec)
|
||||
)
|
||||
)
|
||||
)
|
||||
|
||||
def _is_upstream_task_execution(self, t_spec, t_ex_candidate):
|
||||
if not states.is_completed(t_ex_candidate.state):
|
||||
@ -154,10 +156,12 @@ class DirectWorkflowController(base.WorkflowController):
|
||||
return True
|
||||
|
||||
def _find_end_tasks(self):
|
||||
return filter(
|
||||
return list(
|
||||
filter(
|
||||
lambda t_ex: not self._has_outbound_tasks(t_ex),
|
||||
wf_utils.find_successful_task_executions(self.wf_ex)
|
||||
)
|
||||
)
|
||||
|
||||
def _has_outbound_tasks(self, task_ex):
|
||||
# In order to determine if there are outbound tasks we just need
|
||||
@ -218,7 +222,9 @@ class DirectWorkflowController(base.WorkflowController):
|
||||
]
|
||||
|
||||
def _remove_started_joins(self, cmds):
|
||||
return filter(lambda cmd: not self._is_started_join(cmd), cmds)
|
||||
return list(
|
||||
filter(lambda cmd: not self._is_started_join(cmd), cmds)
|
||||
)
|
||||
|
||||
def _is_started_join(self, cmd):
|
||||
if not (isinstance(cmd, commands.RunTask) and
|
||||
@ -278,7 +284,9 @@ class DirectWorkflowController(base.WorkflowController):
|
||||
if not in_t_ex or not states.is_completed(in_t_ex.state):
|
||||
return False
|
||||
|
||||
return filter(
|
||||
return list(
|
||||
filter(
|
||||
lambda t_name: join_task_spec.get_name() == t_name,
|
||||
self._find_next_task_names(in_t_ex)
|
||||
)
|
||||
)
|
||||
|
@ -77,10 +77,12 @@ class ReverseWorkflowController(base.WorkflowController):
|
||||
or []
|
||||
]
|
||||
|
||||
return filter(
|
||||
return list(
|
||||
filter(
|
||||
lambda t_e: t_e.state == states.SUCCESS,
|
||||
wf_utils.find_task_executions_by_specs(self.wf_ex, t_specs)
|
||||
)
|
||||
)
|
||||
|
||||
def evaluate_workflow_final_context(self):
|
||||
task_execs = wf_utils.find_task_executions_by_spec(
|
||||
|
@ -45,9 +45,9 @@ def is_completed(task_ex):
|
||||
|
||||
def get_index(task_ex):
|
||||
return len(
|
||||
filter(
|
||||
list(filter(
|
||||
lambda x: x.accepted or states.RUNNING, task_ex.executions
|
||||
)
|
||||
))
|
||||
)
|
||||
|
||||
|
||||
@ -81,10 +81,10 @@ def _get_indices_if_rerun(unaccepted_executions):
|
||||
|
||||
def _get_unaccepted_act_exs(task_ex):
|
||||
# Choose only if not accepted but completed.
|
||||
return filter(
|
||||
return list(filter(
|
||||
lambda x: not x.accepted and states.is_completed(x.state),
|
||||
task_ex.executions
|
||||
)
|
||||
))
|
||||
|
||||
|
||||
def get_indices_for_loop(task_ex):
|
||||
@ -164,9 +164,9 @@ def validate_input(with_items_input):
|
||||
def has_more_iterations(task_ex):
|
||||
# See action executions which have been already
|
||||
# accepted or are still running.
|
||||
action_exs = filter(
|
||||
action_exs = list(filter(
|
||||
lambda x: x.accepted or x.state == states.RUNNING,
|
||||
task_ex.executions
|
||||
)
|
||||
))
|
||||
|
||||
return get_count(task_ex) > len(action_exs)
|
||||
|
Loading…
Reference in New Issue
Block a user