From 1e8fabd0cbeb88d267b3068d3f92ceb8333a31dc Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Mon, 1 Dec 2014 19:22:34 -0800 Subject: [PATCH] Split the scheduler into sub-schedulers Instead of having a larger scheduler class that contains logic for both the retry routine and the task routine split this into two classes and have the scheduler class use those sub-schedulers for internal scheduling. Change-Id: I6309a5fd172d5b20a01a2ba8b3e4cf8512d085fb --- taskflow/engines/action_engine/scheduler.py | 54 +++++++++++++-------- 1 file changed, 34 insertions(+), 20 deletions(-) diff --git a/taskflow/engines/action_engine/scheduler.py b/taskflow/engines/action_engine/scheduler.py index 266c9ebb..82bacbe4 100644 --- a/taskflow/engines/action_engine/scheduler.py +++ b/taskflow/engines/action_engine/scheduler.py @@ -21,31 +21,17 @@ from taskflow import task as task_atom from taskflow.types import failure -class Scheduler(object): - """Schedules atoms using actions to schedule.""" - +class _RetryScheduler(object): def __init__(self, runtime): self._runtime = runtime - self._analyzer = runtime.analyzer self._retry_action = runtime.retry_action - self._runtime = runtime self._storage = runtime.storage - self._task_action = runtime.task_action - def _schedule_node(self, node): - """Schedule a single node for execution.""" - # TODO(harlowja): we need to rework this so that we aren't doing type - # checking here, type checking usually means something isn't done right - # and usually will limit extensibility in the future. - if isinstance(node, task_atom.BaseTask): - return self._schedule_task(node) - elif isinstance(node, retry_atom.Retry): - return self._schedule_retry(node) - else: - raise TypeError("Unknown how to schedule atom '%s' (%s)" - % (node, type(node))) + @staticmethod + def handles(atom): + return isinstance(atom, retry_atom.Retry) - def _schedule_retry(self, retry): + def schedule(self, retry): """Schedules the given retry atom for *future* completion. Depending on the atoms stored intention this may schedule the retry @@ -64,7 +50,17 @@ class Scheduler(object): raise excp.ExecutionFailure("Unknown how to schedule retry with" " intention: %s" % intention) - def _schedule_task(self, task): + +class _TaskScheduler(object): + def __init__(self, runtime): + self._storage = runtime.storage + self._task_action = runtime.task_action + + @staticmethod + def handles(atom): + return isinstance(atom, task_atom.BaseTask) + + def schedule(self, task): """Schedules the given task atom for *future* completion. Depending on the atoms stored intention this may schedule the task @@ -79,6 +75,24 @@ class Scheduler(object): raise excp.ExecutionFailure("Unknown how to schedule task with" " intention: %s" % intention) + +class Scheduler(object): + """Schedules atoms using actions to schedule.""" + + def __init__(self, runtime): + self._schedulers = [ + _RetryScheduler(runtime), + _TaskScheduler(runtime), + ] + + def _schedule_node(self, node): + """Schedule a single node for execution.""" + for sched in self._schedulers: + if sched.handles(node): + return sched.schedule(node) + else: + raise TypeError("Unknown how to schedule '%s'" % node) + def schedule(self, nodes): """Schedules the provided nodes for *future* completion.