diff --git a/taskflow/conductors/base.py b/taskflow/conductors/base.py index 48344c53..33c4441d 100644 --- a/taskflow/conductors/base.py +++ b/taskflow/conductors/base.py @@ -88,10 +88,28 @@ class Conductor(object): store = dict(job.details["store"]) else: store = {} - return engines.load_from_detail(flow_detail, store=store, - engine=self._engine, - backend=self._persistence, - **self._engine_options) + engine = engines.load_from_detail(flow_detail, store=store, + engine=self._engine, + backend=self._persistence, + **self._engine_options) + for listener in self._listeners_from_job(job, engine): + listener.register() + return engine + + def _listeners_from_job(self, job, engine): + """Returns a list of listeners to be attached to an engine. + + This method should be overridden in order to attach listeners to + engines. It will be called once for each job, and the list returned + listeners will be added to the engine for this job. + + :param job: A job instance that is about to be run in an engine. + :param engine: The engine that listeners will be attached to. + :returns: a list of (unregistered) listener instances. + """ + # TODO(dkrause): Create a standard way to pass listeners or + # listener factories over the jobboard + return [] @lock_utils.locked def connect(self):