diff --git a/taskflow/conductors/backends/impl_executor.py b/taskflow/conductors/backends/impl_executor.py index c47488da..d61f3e0d 100644 --- a/taskflow/conductors/backends/impl_executor.py +++ b/taskflow/conductors/backends/impl_executor.py @@ -106,6 +106,20 @@ class ExecutorConductor(base.Conductor): activity defined above, the actual event name that can be registered to subscribe to will be ``${event}_start`` and ``${event}_end`` where the ``${event}`` in this pseudo-variable will be one of these events. + + .. deprecated:: 1.23.0 + Use :py:attr:`~EVENTS_EMITTED` + """ + + EVENTS_EMITTED = tuple([ + 'compilation_start', 'compilation_end', + 'preparation_start', 'preparation_end', + 'validation_start', 'validation_end', + 'running_start', 'running_end', + 'job_consumed', 'job_abandoned', + ]) + """Events will be emitted for each of the events above. The event is + emitted to listeners registered with the conductor. """ def __init__(self, name, jobboard, @@ -217,8 +231,18 @@ class ExecutorConductor(base.Conductor): try: if consume: self._jobboard.consume(job, self._name) + self._notifier.notify("job_consumed", { + 'job': job, + 'conductor': self, + 'persistence': self._persistence, + }) else: self._jobboard.abandon(job, self._name) + self._notifier.notify("job_abandoned", { + 'job': job, + 'conductor': self, + 'persistence': self._persistence, + }) except (excp.JobFailure, excp.NotFound): if consume: self._log.warn("Failed job consumption: %s", job, diff --git a/taskflow/tests/unit/test_conductors.py b/taskflow/tests/unit/test_conductors.py index d7f84d50..9fa46f98 100644 --- a/taskflow/tests/unit/test_conductors.py +++ b/taskflow/tests/unit/test_conductors.py @@ -113,11 +113,25 @@ class ManyConductorTest(testscenarios.TestWithScenarios, components = self.make_components() components.conductor.connect() consumed_event = threading.Event() + job_consumed_event = threading.Event() + job_abandoned_event = threading.Event() def on_consume(state, details): consumed_event.set() + def on_job_consumed(event, details): + if event == 'job_consumed': + job_consumed_event.set() + + def on_job_abandoned(event, details): + if event == 'job_abandoned': + job_abandoned_event.set() + components.board.notifier.register(base.REMOVAL, on_consume) + components.conductor.notifier.register("job_consumed", + on_job_consumed) + components.conductor.notifier.register("job_abandoned", + on_job_abandoned) with close_many(components.conductor, components.client): t = threading_utils.daemon_thread(components.conductor.run) t.start() @@ -128,6 +142,8 @@ class ManyConductorTest(testscenarios.TestWithScenarios, components.board.post('poke', lb, details={'flow_uuid': fd.uuid}) self.assertTrue(consumed_event.wait(test_utils.WAIT_TIMEOUT)) + self.assertTrue(job_consumed_event.wait(test_utils.WAIT_TIMEOUT)) + self.assertFalse(job_abandoned_event.wait(1)) components.conductor.stop() self.assertTrue(components.conductor.wait(test_utils.WAIT_TIMEOUT)) self.assertFalse(components.conductor.dispatching) @@ -171,11 +187,25 @@ class ManyConductorTest(testscenarios.TestWithScenarios, components = self.make_components() components.conductor.connect() consumed_event = threading.Event() + job_consumed_event = threading.Event() + job_abandoned_event = threading.Event() def on_consume(state, details): consumed_event.set() + def on_job_consumed(event, details): + if event == 'job_consumed': + job_consumed_event.set() + + def on_job_abandoned(event, details): + if event == 'job_abandoned': + job_abandoned_event.set() + components.board.notifier.register(base.REMOVAL, on_consume) + components.conductor.notifier.register("job_consumed", + on_job_consumed) + components.conductor.notifier.register("job_abandoned", + on_job_abandoned) with close_many(components.conductor, components.client): t = threading_utils.daemon_thread(components.conductor.run) t.start() @@ -186,6 +216,8 @@ class ManyConductorTest(testscenarios.TestWithScenarios, components.board.post('poke', lb, details={'flow_uuid': fd.uuid}) self.assertTrue(consumed_event.wait(test_utils.WAIT_TIMEOUT)) + self.assertTrue(job_consumed_event.wait(test_utils.WAIT_TIMEOUT)) + self.assertFalse(job_abandoned_event.wait(1)) components.conductor.stop() self.assertTrue(components.conductor.wait(test_utils.WAIT_TIMEOUT)) self.assertFalse(components.conductor.dispatching)