diff --git a/taskflow/engines/action_engine/scheduler.py b/taskflow/engines/action_engine/scheduler.py index 266c9ebb..82bacbe4 100644 --- a/taskflow/engines/action_engine/scheduler.py +++ b/taskflow/engines/action_engine/scheduler.py @@ -21,31 +21,17 @@ from taskflow import task as task_atom from taskflow.types import failure -class Scheduler(object): - """Schedules atoms using actions to schedule.""" - +class _RetryScheduler(object): def __init__(self, runtime): self._runtime = runtime - self._analyzer = runtime.analyzer self._retry_action = runtime.retry_action - self._runtime = runtime self._storage = runtime.storage - self._task_action = runtime.task_action - def _schedule_node(self, node): - """Schedule a single node for execution.""" - # TODO(harlowja): we need to rework this so that we aren't doing type - # checking here, type checking usually means something isn't done right - # and usually will limit extensibility in the future. - if isinstance(node, task_atom.BaseTask): - return self._schedule_task(node) - elif isinstance(node, retry_atom.Retry): - return self._schedule_retry(node) - else: - raise TypeError("Unknown how to schedule atom '%s' (%s)" - % (node, type(node))) + @staticmethod + def handles(atom): + return isinstance(atom, retry_atom.Retry) - def _schedule_retry(self, retry): + def schedule(self, retry): """Schedules the given retry atom for *future* completion. Depending on the atoms stored intention this may schedule the retry @@ -64,7 +50,17 @@ class Scheduler(object): raise excp.ExecutionFailure("Unknown how to schedule retry with" " intention: %s" % intention) - def _schedule_task(self, task): + +class _TaskScheduler(object): + def __init__(self, runtime): + self._storage = runtime.storage + self._task_action = runtime.task_action + + @staticmethod + def handles(atom): + return isinstance(atom, task_atom.BaseTask) + + def schedule(self, task): """Schedules the given task atom for *future* completion. Depending on the atoms stored intention this may schedule the task @@ -79,6 +75,24 @@ class Scheduler(object): raise excp.ExecutionFailure("Unknown how to schedule task with" " intention: %s" % intention) + +class Scheduler(object): + """Schedules atoms using actions to schedule.""" + + def __init__(self, runtime): + self._schedulers = [ + _RetryScheduler(runtime), + _TaskScheduler(runtime), + ] + + def _schedule_node(self, node): + """Schedule a single node for execution.""" + for sched in self._schedulers: + if sched.handles(node): + return sched.schedule(node) + else: + raise TypeError("Unknown how to schedule '%s'" % node) + def schedule(self, nodes): """Schedules the provided nodes for *future* completion.