Adding notification points for job completion

Adding notifications for job completion, both consumed and abandoned, so that a
listener can take some action based on job completion.

Change-Id: I826285d4bfccd2406df7b59e53a9b724702ed094
This commit is contained in:
Min Pae
2015-11-04 09:27:25 -08:00
parent ae9c701f90
commit 6918b8fab0
2 changed files with 56 additions and 0 deletions

View File

@@ -106,6 +106,20 @@ class ExecutorConductor(base.Conductor):
activity defined above, the actual event name that can be registered
to subscribe to will be ``${event}_start`` and ``${event}_end`` where
the ``${event}`` in this pseudo-variable will be one of these events.
.. deprecated:: 1.23.0
Use :py:attr:`~EVENTS_EMITTED`
"""
EVENTS_EMITTED = tuple([
'compilation_start', 'compilation_end',
'preparation_start', 'preparation_end',
'validation_start', 'validation_end',
'running_start', 'running_end',
'job_consumed', 'job_abandoned',
])
"""Events will be emitted for each of the events above. The event is
emitted to listeners registered with the conductor.
"""
def __init__(self, name, jobboard,
@@ -217,8 +231,18 @@ class ExecutorConductor(base.Conductor):
try:
if consume:
self._jobboard.consume(job, self._name)
self._notifier.notify("job_consumed", {
'job': job,
'conductor': self,
'persistence': self._persistence,
})
else:
self._jobboard.abandon(job, self._name)
self._notifier.notify("job_abandoned", {
'job': job,
'conductor': self,
'persistence': self._persistence,
})
except (excp.JobFailure, excp.NotFound):
if consume:
self._log.warn("Failed job consumption: %s", job,

View File

@@ -113,11 +113,25 @@ class ManyConductorTest(testscenarios.TestWithScenarios,
components = self.make_components()
components.conductor.connect()
consumed_event = threading.Event()
job_consumed_event = threading.Event()
job_abandoned_event = threading.Event()
def on_consume(state, details):
consumed_event.set()
def on_job_consumed(event, details):
if event == 'job_consumed':
job_consumed_event.set()
def on_job_abandoned(event, details):
if event == 'job_abandoned':
job_abandoned_event.set()
components.board.notifier.register(base.REMOVAL, on_consume)
components.conductor.notifier.register("job_consumed",
on_job_consumed)
components.conductor.notifier.register("job_abandoned",
on_job_abandoned)
with close_many(components.conductor, components.client):
t = threading_utils.daemon_thread(components.conductor.run)
t.start()
@@ -128,6 +142,8 @@ class ManyConductorTest(testscenarios.TestWithScenarios,
components.board.post('poke', lb,
details={'flow_uuid': fd.uuid})
self.assertTrue(consumed_event.wait(test_utils.WAIT_TIMEOUT))
self.assertTrue(job_consumed_event.wait(test_utils.WAIT_TIMEOUT))
self.assertFalse(job_abandoned_event.wait(1))
components.conductor.stop()
self.assertTrue(components.conductor.wait(test_utils.WAIT_TIMEOUT))
self.assertFalse(components.conductor.dispatching)
@@ -171,11 +187,25 @@ class ManyConductorTest(testscenarios.TestWithScenarios,
components = self.make_components()
components.conductor.connect()
consumed_event = threading.Event()
job_consumed_event = threading.Event()
job_abandoned_event = threading.Event()
def on_consume(state, details):
consumed_event.set()
def on_job_consumed(event, details):
if event == 'job_consumed':
job_consumed_event.set()
def on_job_abandoned(event, details):
if event == 'job_abandoned':
job_abandoned_event.set()
components.board.notifier.register(base.REMOVAL, on_consume)
components.conductor.notifier.register("job_consumed",
on_job_consumed)
components.conductor.notifier.register("job_abandoned",
on_job_abandoned)
with close_many(components.conductor, components.client):
t = threading_utils.daemon_thread(components.conductor.run)
t.start()
@@ -186,6 +216,8 @@ class ManyConductorTest(testscenarios.TestWithScenarios,
components.board.post('poke', lb,
details={'flow_uuid': fd.uuid})
self.assertTrue(consumed_event.wait(test_utils.WAIT_TIMEOUT))
self.assertTrue(job_consumed_event.wait(test_utils.WAIT_TIMEOUT))
self.assertFalse(job_abandoned_event.wait(1))
components.conductor.stop()
self.assertTrue(components.conductor.wait(test_utils.WAIT_TIMEOUT))
self.assertFalse(components.conductor.dispatching)