From 63c67302488bea87e2e2870dc83e73aacddeaa34 Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Mon, 8 Jun 2015 22:16:47 -0700 Subject: [PATCH] Notify on the individual engine steps When a conductor is running it is quite useful to be able to how long each engine step takes. To enable this information being output, add a notifier to the base conductor and use it in the blocking conductor to emit events around engine activities. This makes it possible to track the timing (or other information that can be gathered from these events) in a non-intrusive manner. In the `99_bottles.py` demo we also now use this to be able to easily see what the conductor is actively doing (without having to enable the more verbose DEBUG level logging). Change-Id: Ifd8ff38f82fc8135fe5fec4c8e41f0e06f4fdee3 --- taskflow/conductors/backends/impl_blocking.py | 28 ++++++++++- taskflow/conductors/base.py | 13 ++++++ taskflow/examples/99_bottles.py | 46 +++++++++++++++++-- 3 files changed, 80 insertions(+), 7 deletions(-) diff --git a/taskflow/conductors/backends/impl_blocking.py b/taskflow/conductors/backends/impl_blocking.py index 945f591b..a87eacdf 100644 --- a/taskflow/conductors/backends/impl_blocking.py +++ b/taskflow/conductors/backends/impl_blocking.py @@ -56,6 +56,16 @@ class BlockingConductor(base.Conductor): upon the jobboard capabilities to automatically abandon these jobs. """ + START_FINISH_EVENTS_EMITTED = tuple([ + 'compilation', 'preparation', + 'validation', 'running', + ]) + """Events will be emitted for the start and finish of each engine + activity defined above, the actual event name that can be registered + to subscribe to will be ``${event}_start`` and ``${event}_end`` where + the ``${event}`` in this pseudo-variable will be one of these events. + """ + def __init__(self, name, jobboard, persistence=None, engine=None, engine_options=None, wait_timeout=None): @@ -105,10 +115,24 @@ class BlockingConductor(base.Conductor): with ExitStack() as stack: for listener in listeners: stack.enter_context(listener) - LOG.debug("Dispatching engine %s for job: %s", engine, job) + LOG.debug("Dispatching engine for job '%s'", job) consume = True try: - engine.run() + for stage_func, event_name in [(engine.compile, 'compilation'), + (engine.prepare, 'preparation'), + (engine.validate, 'validation'), + (engine.run, 'running')]: + self._notifier.notify("%s_start" % event_name, { + 'job': job, + 'engine': engine, + 'conductor': self, + }) + stage_func() + self._notifier.notify("%s_end" % event_name, { + 'job': job, + 'engine': engine, + 'conductor': self, + }) except excp.WrappedFailure as e: if all((f.check(*NO_CONSUME_EXCEPTIONS) for f in e)): consume = False diff --git a/taskflow/conductors/base.py b/taskflow/conductors/base.py index 6e46fff8..69424232 100644 --- a/taskflow/conductors/base.py +++ b/taskflow/conductors/base.py @@ -20,6 +20,7 @@ import six from taskflow import engines from taskflow import exceptions as excp +from taskflow.types import notifier @six.add_metaclass(abc.ABCMeta) @@ -45,6 +46,18 @@ class Conductor(object): self._engine_options = engine_options.copy() self._persistence = persistence self._lock = threading.RLock() + self._notifier = notifier.Notifier() + + @property + def notifier(self): + """The conductor actions (or other state changes) notifier. + + NOTE(harlowja): different conductor implementations may emit + different events + event details at different times, so refer to your + conductor documentation to know exactly what can and what can not be + subscribed to. + """ + return self._notifier def _flow_detail_from_job(self, job): """Extracts a flow detail from a job (via some manner). diff --git a/taskflow/examples/99_bottles.py b/taskflow/examples/99_bottles.py index 90894e9c..cbcf54ce 100644 --- a/taskflow/examples/99_bottles.py +++ b/taskflow/examples/99_bottles.py @@ -34,6 +34,7 @@ from taskflow.patterns import linear_flow as lf from taskflow.persistence import backends as persistence_backends from taskflow.persistence import logbook from taskflow import task +from taskflow.types import timing from oslo_utils import uuidutils @@ -61,10 +62,11 @@ HOW_MANY_BOTTLES = 99 class TakeABottleDown(task.Task): - def execute(self): + def execute(self, bottles_left): sys.stdout.write('Take one down, ') sys.stdout.flush() time.sleep(TAKE_DOWN_DELAY) + return bottles_left - 1 class PassItAround(task.Task): @@ -82,16 +84,49 @@ class Conclusion(task.Task): def make_bottles(count): s = lf.Flow("bottle-song") - for bottle in reversed(list(range(1, count + 1))): - take_bottle = TakeABottleDown("take-bottle-%s" % bottle) + + take_bottle = TakeABottleDown("take-bottle-%s" % count, + inject={'bottles_left': count}, + provides='bottles_left') + pass_it = PassItAround("pass-%s-around" % count) + next_bottles = Conclusion("next-bottles-%s" % (count - 1)) + s.add(take_bottle, pass_it, next_bottles) + + for bottle in reversed(list(range(1, count))): + take_bottle = TakeABottleDown("take-bottle-%s" % bottle, + provides='bottles_left') pass_it = PassItAround("pass-%s-around" % bottle) - next_bottles = Conclusion("next-bottles-%s" % (bottle - 1), - inject={"bottles_left": bottle - 1}) + next_bottles = Conclusion("next-bottles-%s" % (bottle - 1)) s.add(take_bottle, pass_it, next_bottles) + return s def run_conductor(): + event_watches = {} + + # This will be triggered by the conductor doing various activities + # with engines, and is quite nice to be able to see the various timing + # segments (which is useful for debugging, or watching, or figuring out + # where to optimize). + def on_conductor_event(event, details): + print("Event '%s' has been received..." % event) + print("Details = %s" % details) + if event.endswith("_start"): + w = timing.StopWatch() + w.start() + base_event = event[0:-len("_start")] + event_watches[base_event] = w + if event.endswith("_end"): + base_event = event[0:-len("_end")] + try: + w = event_watches.pop(base_event) + w.stop() + print("It took %0.3f seconds for event '%s' to finish" + % (w.elapsed(), base_event)) + except KeyError: + pass + print("Starting conductor with pid: %s" % ME) my_name = "conductor-%s" % ME persist_backend = persistence_backends.fetch(PERSISTENCE_URI) @@ -104,6 +139,7 @@ def run_conductor(): with contextlib.closing(job_backend): cond = conductor_backends.fetch('blocking', my_name, job_backend, persistence=persist_backend) + cond.notifier.register(cond.notifier.ANY, on_conductor_event) # Run forever, and kill -9 or ctrl-c me... try: cond.run()