Merge "Notify on the individual engine steps"
This commit is contained in:
@@ -57,6 +57,16 @@ class BlockingConductor(base.Conductor):
|
||||
upon the jobboard capabilities to automatically abandon these jobs.
|
||||
"""
|
||||
|
||||
START_FINISH_EVENTS_EMITTED = tuple([
|
||||
'compilation', 'preparation',
|
||||
'validation', 'running',
|
||||
])
|
||||
"""Events will be emitted for the start and finish of each engine
|
||||
activity defined above, the actual event name that can be registered
|
||||
to subscribe to will be ``${event}_start`` and ``${event}_end`` where
|
||||
the ``${event}`` in this pseudo-variable will be one of these events.
|
||||
"""
|
||||
|
||||
def __init__(self, name, jobboard,
|
||||
persistence=None, engine=None,
|
||||
engine_options=None, wait_timeout=None):
|
||||
@@ -108,10 +118,24 @@ class BlockingConductor(base.Conductor):
|
||||
with ExitStack() as stack:
|
||||
for listener in listeners:
|
||||
stack.enter_context(listener)
|
||||
LOG.debug("Dispatching engine %s for job: %s", engine, job)
|
||||
LOG.debug("Dispatching engine for job '%s'", job)
|
||||
consume = True
|
||||
try:
|
||||
engine.run()
|
||||
for stage_func, event_name in [(engine.compile, 'compilation'),
|
||||
(engine.prepare, 'preparation'),
|
||||
(engine.validate, 'validation'),
|
||||
(engine.run, 'running')]:
|
||||
self._notifier.notify("%s_start" % event_name, {
|
||||
'job': job,
|
||||
'engine': engine,
|
||||
'conductor': self,
|
||||
})
|
||||
stage_func()
|
||||
self._notifier.notify("%s_end" % event_name, {
|
||||
'job': job,
|
||||
'engine': engine,
|
||||
'conductor': self,
|
||||
})
|
||||
except excp.WrappedFailure as e:
|
||||
if all((f.check(*NO_CONSUME_EXCEPTIONS) for f in e)):
|
||||
consume = False
|
||||
|
||||
@@ -20,6 +20,7 @@ import six
|
||||
|
||||
from taskflow import engines
|
||||
from taskflow import exceptions as excp
|
||||
from taskflow.types import notifier
|
||||
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
@@ -45,6 +46,18 @@ class Conductor(object):
|
||||
self._engine_options = engine_options.copy()
|
||||
self._persistence = persistence
|
||||
self._lock = threading.RLock()
|
||||
self._notifier = notifier.Notifier()
|
||||
|
||||
@property
|
||||
def notifier(self):
|
||||
"""The conductor actions (or other state changes) notifier.
|
||||
|
||||
NOTE(harlowja): different conductor implementations may emit
|
||||
different events + event details at different times, so refer to your
|
||||
conductor documentation to know exactly what can and what can not be
|
||||
subscribed to.
|
||||
"""
|
||||
return self._notifier
|
||||
|
||||
def _flow_detail_from_job(self, job):
|
||||
"""Extracts a flow detail from a job (via some manner).
|
||||
|
||||
@@ -34,6 +34,7 @@ from taskflow.patterns import linear_flow as lf
|
||||
from taskflow.persistence import backends as persistence_backends
|
||||
from taskflow.persistence import logbook
|
||||
from taskflow import task
|
||||
from taskflow.types import timing
|
||||
|
||||
from oslo_utils import uuidutils
|
||||
|
||||
@@ -61,10 +62,11 @@ HOW_MANY_BOTTLES = 99
|
||||
|
||||
|
||||
class TakeABottleDown(task.Task):
|
||||
def execute(self):
|
||||
def execute(self, bottles_left):
|
||||
sys.stdout.write('Take one down, ')
|
||||
sys.stdout.flush()
|
||||
time.sleep(TAKE_DOWN_DELAY)
|
||||
return bottles_left - 1
|
||||
|
||||
|
||||
class PassItAround(task.Task):
|
||||
@@ -82,16 +84,49 @@ class Conclusion(task.Task):
|
||||
|
||||
def make_bottles(count):
|
||||
s = lf.Flow("bottle-song")
|
||||
for bottle in reversed(list(range(1, count + 1))):
|
||||
take_bottle = TakeABottleDown("take-bottle-%s" % bottle)
|
||||
|
||||
take_bottle = TakeABottleDown("take-bottle-%s" % count,
|
||||
inject={'bottles_left': count},
|
||||
provides='bottles_left')
|
||||
pass_it = PassItAround("pass-%s-around" % count)
|
||||
next_bottles = Conclusion("next-bottles-%s" % (count - 1))
|
||||
s.add(take_bottle, pass_it, next_bottles)
|
||||
|
||||
for bottle in reversed(list(range(1, count))):
|
||||
take_bottle = TakeABottleDown("take-bottle-%s" % bottle,
|
||||
provides='bottles_left')
|
||||
pass_it = PassItAround("pass-%s-around" % bottle)
|
||||
next_bottles = Conclusion("next-bottles-%s" % (bottle - 1),
|
||||
inject={"bottles_left": bottle - 1})
|
||||
next_bottles = Conclusion("next-bottles-%s" % (bottle - 1))
|
||||
s.add(take_bottle, pass_it, next_bottles)
|
||||
|
||||
return s
|
||||
|
||||
|
||||
def run_conductor():
|
||||
event_watches = {}
|
||||
|
||||
# This will be triggered by the conductor doing various activities
|
||||
# with engines, and is quite nice to be able to see the various timing
|
||||
# segments (which is useful for debugging, or watching, or figuring out
|
||||
# where to optimize).
|
||||
def on_conductor_event(event, details):
|
||||
print("Event '%s' has been received..." % event)
|
||||
print("Details = %s" % details)
|
||||
if event.endswith("_start"):
|
||||
w = timing.StopWatch()
|
||||
w.start()
|
||||
base_event = event[0:-len("_start")]
|
||||
event_watches[base_event] = w
|
||||
if event.endswith("_end"):
|
||||
base_event = event[0:-len("_end")]
|
||||
try:
|
||||
w = event_watches.pop(base_event)
|
||||
w.stop()
|
||||
print("It took %0.3f seconds for event '%s' to finish"
|
||||
% (w.elapsed(), base_event))
|
||||
except KeyError:
|
||||
pass
|
||||
|
||||
print("Starting conductor with pid: %s" % ME)
|
||||
my_name = "conductor-%s" % ME
|
||||
persist_backend = persistence_backends.fetch(PERSISTENCE_URI)
|
||||
@@ -104,6 +139,7 @@ def run_conductor():
|
||||
with contextlib.closing(job_backend):
|
||||
cond = conductor_backends.fetch('blocking', my_name, job_backend,
|
||||
persistence=persist_backend)
|
||||
cond.notifier.register(cond.notifier.ANY, on_conductor_event)
|
||||
# Run forever, and kill -9 or ctrl-c me...
|
||||
try:
|
||||
cond.run()
|
||||
|
||||
Reference in New Issue
Block a user