From 40d19c7696f1e0b7d75eacbd271974ee9155c019 Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Mon, 8 Jun 2015 19:16:16 -0700 Subject: [PATCH] Handle conductor ctrl-c more appropriately When a conductor program is interrupted via ctrl-c or equivalent it is much nicer log that that has happened and to reraise that exception. This also slightly tweaks the 99 bottles song to make it even better, by having more pieces/tasks, which makes it possible to kill the program during each task and see how the resumption works when a flow is composed of segments. Change-Id: I5d242eba9a043ef96646ba74ea5928daa0691ed0 --- taskflow/conductors/backends/impl_blocking.py | 4 ++ taskflow/examples/99_bottles.py | 54 ++++++++++++------- 2 files changed, 39 insertions(+), 19 deletions(-) diff --git a/taskflow/conductors/backends/impl_blocking.py b/taskflow/conductors/backends/impl_blocking.py index fb8a3c3a..945f591b 100644 --- a/taskflow/conductors/backends/impl_blocking.py +++ b/taskflow/conductors/backends/impl_blocking.py @@ -18,6 +18,7 @@ except ImportError: from contextlib2 import ExitStack # noqa from debtcollector import removals +from oslo_utils import excutils import six from taskflow.conductors import base @@ -151,6 +152,9 @@ class BlockingConductor(base.Conductor): consume = False try: f = self._dispatch_job(job) + except KeyboardInterrupt: + with excutils.save_and_reraise_exception(): + LOG.warn("Job dispatching interrupted: %s", job) except Exception: LOG.warn("Job dispatching failed: %s", job, exc_info=True) diff --git a/taskflow/examples/99_bottles.py b/taskflow/examples/99_bottles.py index 9959255b..90894e9c 100644 --- a/taskflow/examples/99_bottles.py +++ b/taskflow/examples/99_bottles.py @@ -54,33 +54,47 @@ JB_CONF = { 'board': 'zookeeper', 'path': '/taskflow/99-bottles-demo', } -DB_URI = r"sqlite:////tmp/bottles.db" -PART_DELAY = 1.0 +PERSISTENCE_URI = r"sqlite:////tmp/bottles.db" +TAKE_DOWN_DELAY = 1.0 +PASS_AROUND_DELAY = 3.0 HOW_MANY_BOTTLES = 99 -class TakeABottleDownPassItAround(task.Task): - def execute(self, bottles_left): +class TakeABottleDown(task.Task): + def execute(self): sys.stdout.write('Take one down, ') - time.sleep(PART_DELAY) + sys.stdout.flush() + time.sleep(TAKE_DOWN_DELAY) + + +class PassItAround(task.Task): + def execute(self): sys.stdout.write('pass it around, ') - time.sleep(PART_DELAY) + sys.stdout.flush() + time.sleep(PASS_AROUND_DELAY) + + +class Conclusion(task.Task): + def execute(self, bottles_left): sys.stdout.write('%s bottles of beer on the wall...\n' % bottles_left) + sys.stdout.flush() def make_bottles(count): s = lf.Flow("bottle-song") for bottle in reversed(list(range(1, count + 1))): - t = TakeABottleDownPassItAround("take-bottle-%s" % bottle, - inject={"bottles_left": bottle - 1}) - s.add(t) + take_bottle = TakeABottleDown("take-bottle-%s" % bottle) + pass_it = PassItAround("pass-%s-around" % bottle) + next_bottles = Conclusion("next-bottles-%s" % (bottle - 1), + inject={"bottles_left": bottle - 1}) + s.add(take_bottle, pass_it, next_bottles) return s def run_conductor(): print("Starting conductor with pid: %s" % ME) my_name = "conductor-%s" % ME - persist_backend = persistence_backends.fetch(DB_URI) + persist_backend = persistence_backends.fetch(PERSISTENCE_URI) with contextlib.closing(persist_backend): with contextlib.closing(persist_backend.get_connection()) as conn: conn.upgrade() @@ -90,17 +104,18 @@ def run_conductor(): with contextlib.closing(job_backend): cond = conductor_backends.fetch('blocking', my_name, job_backend, persistence=persist_backend) - # Run forever, and kill -9 me... - # - # TODO(harlowja): it would be nicer if we could handle - # ctrl-c better... - cond.run() + # Run forever, and kill -9 or ctrl-c me... + try: + cond.run() + finally: + cond.stop() + cond.wait() def run_poster(): print("Starting poster with pid: %s" % ME) my_name = "poster-%s" % ME - persist_backend = persistence_backends.fetch(DB_URI) + persist_backend = persistence_backends.fetch(PERSISTENCE_URI) with contextlib.closing(persist_backend): with contextlib.closing(persist_backend.get_connection()) as conn: conn.upgrade() @@ -128,11 +143,12 @@ def run_poster(): def main(): if len(sys.argv) == 1: sys.stderr.write("%s p|c\n" % os.path.basename(sys.argv[0])) - return - if sys.argv[1] == 'p': + elif sys.argv[1] == 'p': run_poster() - if sys.argv[1] == 'c': + elif sys.argv[1] == 'c': run_conductor() + else: + sys.stderr.write("%s p|c\n" % os.path.basename(sys.argv[0])) if __name__ == '__main__':