diff --git a/taskflow/conductors/backends/impl_blocking.py b/taskflow/conductors/backends/impl_blocking.py
index 945f591b..a87eacdf 100644
--- a/taskflow/conductors/backends/impl_blocking.py
+++ b/taskflow/conductors/backends/impl_blocking.py
@@ -56,6 +56,16 @@ class BlockingConductor(base.Conductor):
     upon the jobboard capabilities to automatically abandon these jobs.
     """
 
+    START_FINISH_EVENTS_EMITTED = tuple([
+        'compilation', 'preparation',
+        'validation', 'running',
+    ])
+    """Events will be emitted for the start and finish of each engine
+    activity defined above; the actual event names that can be subscribed
+    to are ``${event}_start`` and ``${event}_end``, where ``${event}`` is
+    one of the events listed above.
+    """
+
     def __init__(self, name, jobboard,
                  persistence=None, engine=None,
                  engine_options=None, wait_timeout=None):
@@ -105,10 +115,24 @@ class BlockingConductor(base.Conductor):
         with ExitStack() as stack:
             for listener in listeners:
                 stack.enter_context(listener)
-            LOG.debug("Dispatching engine %s for job: %s", engine, job)
+            LOG.debug("Dispatching engine for job '%s'", job)
             consume = True
             try:
-                engine.run()
+                for stage_func, event_name in [(engine.compile, 'compilation'),
+                                               (engine.prepare, 'preparation'),
+                                               (engine.validate, 'validation'),
+                                               (engine.run, 'running')]:
+                    self._notifier.notify("%s_start" % event_name, {
+                        'job': job,
+                        'engine': engine,
+                        'conductor': self,
+                    })
+                    stage_func()
+                    self._notifier.notify("%s_end" % event_name, {
+                        'job': job,
+                        'engine': engine,
+                        'conductor': self,
+                    })
             except excp.WrappedFailure as e:
                 if all((f.check(*NO_CONSUME_EXCEPTIONS) for f in e)):
                     consume = False
diff --git a/taskflow/conductors/base.py b/taskflow/conductors/base.py
index 6e46fff8..69424232 100644
--- a/taskflow/conductors/base.py
+++ b/taskflow/conductors/base.py
@@ -20,6 +20,7 @@ import six
 
 from taskflow import engines
 from taskflow import exceptions as excp
+from taskflow.types import notifier
 
 
 @six.add_metaclass(abc.ABCMeta)
@@ -45,6 +46,18 @@ class Conductor(object):
         self._engine_options = engine_options.copy()
         self._persistence = persistence
         self._lock = threading.RLock()
+        self._notifier = notifier.Notifier()
+
+    @property
+    def notifier(self):
+        """The conductor actions (or other state changes) notifier.
+
+        NOTE(harlowja): different conductor implementations may emit
+        different events + event details at different times, so refer to your
+        conductor documentation to know exactly what can and can not be
+        subscribed to.
+        """
+        return self._notifier
 
     def _flow_detail_from_job(self, job):
         """Extracts a flow detail from a job (via some manner).
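Below is a minimal usage sketch (not part of the patch) of the notifier interface added above; the event names follow the ``${event}_start``/``${event}_end`` convention documented on ``START_FINISH_EVENTS_EMITTED``, while ``board`` and ``persist_backend`` are hypothetical placeholders for a jobboard and persistence backend created elsewhere. The full wiring is shown in the 99_bottles example change below.

    from taskflow.conductors import backends as conductor_backends

    def on_conductor_event(event, details):
        # 'event' is one of the START_FINISH_EVENTS_EMITTED names suffixed
        # with '_start' or '_end'; 'details' carries the job, engine and
        # conductor involved (per the notify() calls added above).
        print("%s -> job=%s" % (event, details['job']))

    # 'board' and 'persist_backend' are assumed to already exist (placeholders
    # for a real jobboard and persistence backend).
    cond = conductor_backends.fetch('blocking', 'my-conductor', board,
                                    persistence=persist_backend)
    cond.notifier.register(cond.notifier.ANY, on_conductor_event)
    cond.run()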
diff --git a/taskflow/examples/99_bottles.py b/taskflow/examples/99_bottles.py
index 90894e9c..cbcf54ce 100644
--- a/taskflow/examples/99_bottles.py
+++ b/taskflow/examples/99_bottles.py
@@ -34,6 +34,7 @@
 from taskflow.patterns import linear_flow as lf
 from taskflow.persistence import backends as persistence_backends
 from taskflow.persistence import logbook
 from taskflow import task
+from taskflow.types import timing
 
 from oslo_utils import uuidutils
@@ -61,10 +62,11 @@ HOW_MANY_BOTTLES = 99
 
 
 class TakeABottleDown(task.Task):
-    def execute(self):
+    def execute(self, bottles_left):
         sys.stdout.write('Take one down, ')
         sys.stdout.flush()
         time.sleep(TAKE_DOWN_DELAY)
+        return bottles_left - 1
 
 
 class PassItAround(task.Task):
@@ -82,16 +84,49 @@ class Conclusion(task.Task):
 
 def make_bottles(count):
     s = lf.Flow("bottle-song")
-    for bottle in reversed(list(range(1, count + 1))):
-        take_bottle = TakeABottleDown("take-bottle-%s" % bottle)
+
+    take_bottle = TakeABottleDown("take-bottle-%s" % count,
+                                  inject={'bottles_left': count},
+                                  provides='bottles_left')
+    pass_it = PassItAround("pass-%s-around" % count)
+    next_bottles = Conclusion("next-bottles-%s" % (count - 1))
+    s.add(take_bottle, pass_it, next_bottles)
+
+    for bottle in reversed(list(range(1, count))):
+        take_bottle = TakeABottleDown("take-bottle-%s" % bottle,
+                                      provides='bottles_left')
         pass_it = PassItAround("pass-%s-around" % bottle)
-        next_bottles = Conclusion("next-bottles-%s" % (bottle - 1),
-                                  inject={"bottles_left": bottle - 1})
+        next_bottles = Conclusion("next-bottles-%s" % (bottle - 1))
         s.add(take_bottle, pass_it, next_bottles)
+
     return s
 
 
 def run_conductor():
+    event_watches = {}
+
+    # This will be triggered by the conductor doing various activities
+    # with engines; it is quite nice to be able to see the timing of the
+    # various segments (which is useful for debugging, watching, or
+    # figuring out where to optimize).
+    def on_conductor_event(event, details):
+        print("Event '%s' has been received..." % event)
+        print("Details = %s" % details)
+        if event.endswith("_start"):
+            w = timing.StopWatch()
+            w.start()
+            base_event = event[0:-len("_start")]
+            event_watches[base_event] = w
+        if event.endswith("_end"):
+            base_event = event[0:-len("_end")]
+            try:
+                w = event_watches.pop(base_event)
+                w.stop()
+                print("It took %0.3f seconds for event '%s' to finish"
+                      % (w.elapsed(), base_event))
+            except KeyError:
+                pass
+
     print("Starting conductor with pid: %s" % ME)
     my_name = "conductor-%s" % ME
     persist_backend = persistence_backends.fetch(PERSISTENCE_URI)
@@ -104,6 +139,7 @@ def run_conductor():
     with contextlib.closing(job_backend):
         cond = conductor_backends.fetch('blocking', my_name, job_backend,
                                         persistence=persist_backend)
+        cond.notifier.register(cond.notifier.ANY, on_conductor_event)
         # Run forever, and kill -9 or ctrl-c me...
         try:
             cond.run()