diff --git a/taskflow/job.py b/taskflow/job.py index cb1b6aee..467b0db5 100644 --- a/taskflow/job.py +++ b/taskflow/job.py @@ -143,16 +143,38 @@ class Job(object): return (True, task_details.metadata['result']) return (False, None) - def associate(self, flow): + def associate(self, flow, parents=True): """Attachs the needed resumption and state change tracking listeners to the given workflow so that the workflow can be resumed/tracked using the jobs components.""" + if self._task_listener not in flow.task_listeners: flow.task_listeners.append(self._task_listener) if self._workflow_listener not in flow.listeners: flow.listeners.append(self._workflow_listener) flow.result_fetcher = self._task_result_fetcher + # Associate the parents as well (if desired) + if parents and flow.parents: + for p in flow.parents: + self.associate(p, parents) + + def disassociate(self, flow, parents=True): + """Detaches the needed resumption and state change tracking listeners + from the given workflow.""" + + if self._task_listener in flow.task_listeners: + flow.task_listeners.remove(self._task_listener) + if self._workflow_listener in flow.listeners: + flow.listeners.remove(self._workflow_listener) + if flow.result_fetcher is self._task_result_fetcher: + flow.result_fetcher = None + + # Disassociate from the flows parents (if desired) + if parents and flow.parents: + for p in flow.parents: + self.disassociate(p, parents) + @property def logbook(self): """Fetches (or creates) a logbook entry for this job."""