diff --git a/taskflow/examples/99_bottles.py b/taskflow/examples/99_bottles.py index 0211df5b..983bc201 100644 --- a/taskflow/examples/99_bottles.py +++ b/taskflow/examples/99_bottles.py @@ -15,6 +15,7 @@ # under the License. import contextlib +import functools import logging import os import sys @@ -104,17 +105,16 @@ def make_bottles(count): return s -def run_conductor(): +def run_conductor(only_run_once=False): # This continuously consumers until its stopped via ctrl-c or other # kill signal... - event_watches = {} # This will be triggered by the conductor doing various activities # with engines, and is quite nice to be able to see the various timing # segments (which is useful for debugging, or watching, or figuring out # where to optimize). - def on_conductor_event(event, details): + def on_conductor_event(cond, event, details): print("Event '%s' has been received..." % event) print("Details = %s" % details) if event.endswith("_start"): @@ -131,6 +131,8 @@ def run_conductor(): % (w.elapsed(), base_event)) except KeyError: pass + if event == 'running_end' and only_run_once: + cond.stop() print("Starting conductor with pid: %s" % ME) my_name = "conductor-%s" % ME @@ -144,6 +146,7 @@ def run_conductor(): with contextlib.closing(job_backend): cond = conductor_backends.fetch('blocking', my_name, job_backend, persistence=persist_backend) + on_conductor_event = functools.partial(on_conductor_event, cond) cond.notifier.register(cond.notifier.ANY, on_conductor_event) # Run forever, and kill -9 or ctrl-c me... try: @@ -184,9 +187,23 @@ def run_poster(): print("Goodbye...") +def main_local(): + # Run locally typically this is activating during unit testing when all + # the examples are made sure to still function correctly... + global TAKE_DOWN_DELAY + global PASS_AROUND_DELAY + global JB_CONF + # Make everything go much faster (so that this finishes quickly). + PASS_AROUND_DELAY = 0.01 + TAKE_DOWN_DELAY = 0.01 + JB_CONF['path'] = JB_CONF['path'] + "-" + uuidutils.generate_uuid() + run_poster() + run_conductor(only_run_once=True) + + def main(): if len(sys.argv) == 1: - sys.stderr.write("%s p|c\n" % os.path.basename(sys.argv[0])) + main_local() elif sys.argv[1] in ('p', 'c'): if sys.argv[-1] == "v": logging.basicConfig(level=5)