Merge "Split the scheduler into sub-schedulers"
This commit is contained in:
@@ -21,31 +21,17 @@ from taskflow import task as task_atom
|
||||
from taskflow.types import failure
|
||||
|
||||
|
||||
class Scheduler(object):
|
||||
"""Schedules atoms using actions to schedule."""
|
||||
|
||||
class _RetryScheduler(object):
|
||||
def __init__(self, runtime):
|
||||
self._runtime = runtime
|
||||
self._analyzer = runtime.analyzer
|
||||
self._retry_action = runtime.retry_action
|
||||
self._runtime = runtime
|
||||
self._storage = runtime.storage
|
||||
self._task_action = runtime.task_action
|
||||
|
||||
def _schedule_node(self, node):
|
||||
"""Schedule a single node for execution."""
|
||||
# TODO(harlowja): we need to rework this so that we aren't doing type
|
||||
# checking here, type checking usually means something isn't done right
|
||||
# and usually will limit extensibility in the future.
|
||||
if isinstance(node, task_atom.BaseTask):
|
||||
return self._schedule_task(node)
|
||||
elif isinstance(node, retry_atom.Retry):
|
||||
return self._schedule_retry(node)
|
||||
else:
|
||||
raise TypeError("Unknown how to schedule atom '%s' (%s)"
|
||||
% (node, type(node)))
|
||||
@staticmethod
|
||||
def handles(atom):
|
||||
return isinstance(atom, retry_atom.Retry)
|
||||
|
||||
def _schedule_retry(self, retry):
|
||||
def schedule(self, retry):
|
||||
"""Schedules the given retry atom for *future* completion.
|
||||
|
||||
Depending on the atoms stored intention this may schedule the retry
|
||||
@@ -64,7 +50,17 @@ class Scheduler(object):
|
||||
raise excp.ExecutionFailure("Unknown how to schedule retry with"
|
||||
" intention: %s" % intention)
|
||||
|
||||
def _schedule_task(self, task):
|
||||
|
||||
class _TaskScheduler(object):
|
||||
def __init__(self, runtime):
|
||||
self._storage = runtime.storage
|
||||
self._task_action = runtime.task_action
|
||||
|
||||
@staticmethod
|
||||
def handles(atom):
|
||||
return isinstance(atom, task_atom.BaseTask)
|
||||
|
||||
def schedule(self, task):
|
||||
"""Schedules the given task atom for *future* completion.
|
||||
|
||||
Depending on the atoms stored intention this may schedule the task
|
||||
@@ -79,6 +75,24 @@ class Scheduler(object):
|
||||
raise excp.ExecutionFailure("Unknown how to schedule task with"
|
||||
" intention: %s" % intention)
|
||||
|
||||
|
||||
class Scheduler(object):
|
||||
"""Schedules atoms using actions to schedule."""
|
||||
|
||||
def __init__(self, runtime):
|
||||
self._schedulers = [
|
||||
_RetryScheduler(runtime),
|
||||
_TaskScheduler(runtime),
|
||||
]
|
||||
|
||||
def _schedule_node(self, node):
|
||||
"""Schedule a single node for execution."""
|
||||
for sched in self._schedulers:
|
||||
if sched.handles(node):
|
||||
return sched.schedule(node)
|
||||
else:
|
||||
raise TypeError("Unknown how to schedule '%s'" % node)
|
||||
|
||||
def schedule(self, nodes):
|
||||
"""Schedules the provided nodes for *future* completion.
|
||||
|
||||
|
||||
Reference in New Issue
Block a user