Merge "filter() is wrapped around list()."
This commit is contained in:
commit
df2baa4fdc
@ -273,9 +273,11 @@ class DefaultEngine(base.Engine, coordination.Service):
|
||||
# When resuming a workflow we need to ignore all 'pause'
|
||||
# commands because workflow controller takes tasks that
|
||||
# completed within the period when the workflow was pause.
|
||||
cmds = filter(
|
||||
lambda c: not isinstance(c, commands.PauseWorkflow),
|
||||
cmds
|
||||
cmds = list(
|
||||
filter(
|
||||
lambda c: not isinstance(c, commands.PauseWorkflow),
|
||||
cmds
|
||||
)
|
||||
)
|
||||
|
||||
# Since there's no explicit task causing the operation
|
||||
|
@ -45,7 +45,7 @@ def find_items(items, **props):
|
||||
|
||||
return True
|
||||
|
||||
filtered = filter(lambda item: _matches(item, **props), items)
|
||||
filtered = list(filter(lambda item: _matches(item, **props), items))
|
||||
|
||||
if len(filtered) == 1:
|
||||
return filtered[0]
|
||||
|
@ -280,7 +280,7 @@ class DirectWorkflowSpec(WorkflowSpec):
|
||||
|
||||
@staticmethod
|
||||
def _remove_task_from_clause(on_clause, t_name):
|
||||
return filter(lambda tup: tup[0] != t_name, on_clause)
|
||||
return list(filter(lambda tup: tup[0] != t_name, on_clause))
|
||||
|
||||
|
||||
class ReverseWorkflowSpec(WorkflowSpec):
|
||||
|
@ -43,11 +43,13 @@ class DirectWorkflowController(base.WorkflowController):
|
||||
__workflow_type__ = "direct"
|
||||
|
||||
def _get_upstream_task_executions(self, task_spec):
|
||||
return filter(
|
||||
lambda t_e: self._is_upstream_task_execution(task_spec, t_e),
|
||||
wf_utils.find_task_executions_by_specs(
|
||||
self.wf_ex,
|
||||
self.wf_spec.find_inbound_task_specs(task_spec)
|
||||
return list(
|
||||
filter(
|
||||
lambda t_e: self._is_upstream_task_execution(task_spec, t_e),
|
||||
wf_utils.find_task_executions_by_specs(
|
||||
self.wf_ex,
|
||||
self.wf_spec.find_inbound_task_specs(task_spec)
|
||||
)
|
||||
)
|
||||
)
|
||||
|
||||
@ -154,9 +156,11 @@ class DirectWorkflowController(base.WorkflowController):
|
||||
return True
|
||||
|
||||
def _find_end_tasks(self):
|
||||
return filter(
|
||||
lambda t_ex: not self._has_outbound_tasks(t_ex),
|
||||
wf_utils.find_successful_task_executions(self.wf_ex)
|
||||
return list(
|
||||
filter(
|
||||
lambda t_ex: not self._has_outbound_tasks(t_ex),
|
||||
wf_utils.find_successful_task_executions(self.wf_ex)
|
||||
)
|
||||
)
|
||||
|
||||
def _has_outbound_tasks(self, task_ex):
|
||||
@ -218,7 +222,9 @@ class DirectWorkflowController(base.WorkflowController):
|
||||
]
|
||||
|
||||
def _remove_started_joins(self, cmds):
|
||||
return filter(lambda cmd: not self._is_started_join(cmd), cmds)
|
||||
return list(
|
||||
filter(lambda cmd: not self._is_started_join(cmd), cmds)
|
||||
)
|
||||
|
||||
def _is_started_join(self, cmd):
|
||||
if not (isinstance(cmd, commands.RunTask) and
|
||||
@ -278,7 +284,9 @@ class DirectWorkflowController(base.WorkflowController):
|
||||
if not in_t_ex or not states.is_completed(in_t_ex.state):
|
||||
return False
|
||||
|
||||
return filter(
|
||||
lambda t_name: join_task_spec.get_name() == t_name,
|
||||
self._find_next_task_names(in_t_ex)
|
||||
return list(
|
||||
filter(
|
||||
lambda t_name: join_task_spec.get_name() == t_name,
|
||||
self._find_next_task_names(in_t_ex)
|
||||
)
|
||||
)
|
||||
|
@ -77,9 +77,11 @@ class ReverseWorkflowController(base.WorkflowController):
|
||||
or []
|
||||
]
|
||||
|
||||
return filter(
|
||||
lambda t_e: t_e.state == states.SUCCESS,
|
||||
wf_utils.find_task_executions_by_specs(self.wf_ex, t_specs)
|
||||
return list(
|
||||
filter(
|
||||
lambda t_e: t_e.state == states.SUCCESS,
|
||||
wf_utils.find_task_executions_by_specs(self.wf_ex, t_specs)
|
||||
)
|
||||
)
|
||||
|
||||
def evaluate_workflow_final_context(self):
|
||||
|
@ -45,9 +45,9 @@ def is_completed(task_ex):
|
||||
|
||||
def get_index(task_ex):
|
||||
return len(
|
||||
filter(
|
||||
list(filter(
|
||||
lambda x: x.accepted or states.RUNNING, task_ex.executions
|
||||
)
|
||||
))
|
||||
)
|
||||
|
||||
|
||||
@ -81,10 +81,10 @@ def _get_indices_if_rerun(unaccepted_executions):
|
||||
|
||||
def _get_unaccepted_act_exs(task_ex):
|
||||
# Choose only if not accepted but completed.
|
||||
return filter(
|
||||
return list(filter(
|
||||
lambda x: not x.accepted and states.is_completed(x.state),
|
||||
task_ex.executions
|
||||
)
|
||||
))
|
||||
|
||||
|
||||
def get_indices_for_loop(task_ex):
|
||||
@ -164,9 +164,9 @@ def validate_input(with_items_input):
|
||||
def has_more_iterations(task_ex):
|
||||
# See action executions which have been already
|
||||
# accepted or are still running.
|
||||
action_exs = filter(
|
||||
action_exs = list(filter(
|
||||
lambda x: x.accepted or x.state == states.RUNNING,
|
||||
task_ex.executions
|
||||
)
|
||||
))
|
||||
|
||||
return get_count(task_ex) > len(action_exs)
|
||||
|
Loading…
Reference in New Issue
Block a user