Add a nice run() method to the job class that will run a flow.

Add a run method that does the association of the flow with the
job and ensure that said association does not happen more than
once (aka the listeners are not duplicated).
This commit is contained in:
Joshua Harlow
2013-05-21 18:17:34 -07:00
parent 406d72a23e
commit a3c6040384

View File

@@ -24,6 +24,10 @@ from taskflow import states
from taskflow import utils
def task_and_state(task, state):
return "%s:%s" % (task.name, state)
class Claimer(object):
"""A base class for objects that can attempt to claim a given
job, so that said job can be worked on."""
@@ -82,54 +86,50 @@ class Job(object):
self._state = new_state
# TODO(harlowja): add logbook info?
def associate(self, wf, task_state_name_functor=None):
def _workflow_listener(self, context, flow, old_state):
"""Ensure that when we receive an event from said workflow that we
make sure a logbook entry exists for that flow."""
if flow.name in self.logbook:
return
self.logbook.add_flow(flow.name)
def _task_listener(self, context, state, flow, task, result=None):
"""Store the result of the task under the given flow in the log
book so that it can be retrieved later."""
metadata = None
flow_details = self.logbook[flow.name]
if state == states.SUCCESS:
metadata = {
'result': result,
}
task_state = task_and_state(task, state)
if task_state not in flow_details:
task_details = flow_details.add_task(task_state)
task_details.metadata = metadata
def _task_result_fetcher(self, context, flow, task):
flow_details = self.logbook[flow.name]
# See if it completed before so that we can use its results instead
# of having to recompute them.
task_state = task_and_state(task, states.SUCCESS)
if task_state in flow_details:
# TODO(harlowja): should we be a little more cautious about
# duplicate task results? Maybe we shouldn't allow them to
# have the same name in the first place?
task_details = flow_details[task_state][0]
if task_details.metadata and 'result' in task_details.metadata:
return (True, task_details.metadata['result'])
return (False, None)
def associate(self, flow):
"""Attachs the needed resumption and state change tracking listeners
to the given workflow so that the workflow can be resumed/tracked
using the jobs components."""
# TODO(harlowja): should this be in the job class or the workflow class
# or neither, still not quite sure...
def generate_task_name(task, state):
return "%s:%s" % (task.name, state)
if not task_state_name_functor:
task_state_name_functor = generate_task_name
def wf_state_change_listener(_context, wf, _old_state):
if wf.name in self.logbook:
return
self.logbook.add_flow(wf.name)
def task_result_fetcher(_context, wf, task):
wf_details = self.logbook[wf.name]
# See if it completed before so that we can use its results instead
# of having to recompute them.
td_name = task_state_name_functor(task, states.SUCCESS)
if td_name in wf_details:
# TODO(harlowja): should we be a little more cautious about
# duplicate task results? Maybe we shouldn't allow them to
# have the same name in the first place?
task_details = wf_details[td_name][0]
if task_details.metadata and 'result' in task_details.metadata:
return (True, task_details.metadata['result'])
return (False, None)
def task_state_change_listener(_context, state, wf, task, result=None):
metadata = None
wf_details = self.logbook[wf.name]
if state == states.SUCCESS:
metadata = {
'result': result,
}
td_name = task_state_name_functor(task, state)
if td_name not in wf_details:
td_details = wf_details.add_task(td_name)
td_details.metadata = metadata
wf.task_listeners.append(task_state_change_listener)
wf.listeners.append(wf_state_change_listener)
wf.result_fetcher = task_result_fetcher
if self._task_listener not in flow.task_listeners:
flow.task_listeners.append(self._task_listener)
if self._workflow_listener not in flow.listeners:
flow.listeners.append(self._workflow_listener)
flow.result_fetcher = self._task_result_fetcher
@property
def logbook(self):
@@ -152,6 +152,10 @@ class Job(object):
self._claimer.claim(self, owner)
self._change_state(states.CLAIMED)
def run(self, flow, *args, **kwargs):
self.associate(flow)
return flow.run(self.context, *args, **kwargs)
def unclaim(self):
"""Atomically transitions this job from claimed to unclaimed."""
if self.state == states.UNCLAIMED: