From c543dc20669aa4de645251d0418fd0e6e42b9557 Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Thu, 9 Oct 2014 14:28:59 -0700 Subject: [PATCH] When creating daemon threads use the bundled threading_utils Instead of creating daemon threads using the threads module directly use our small utility file to create the daemon thread on our behalf and set the appropriate attributes to ensure it's a daemon thread. This change replaces the existing locations where we were doing this manually and uses the threading_utils helper function uniformly instead. Change-Id: I535cee8a63407f753cf812df53c4f5bc83e0c9ae --- .../examples/jobboard_produce_consume_colors.py | 7 +++---- taskflow/examples/wbe_mandelbrot.py | 5 ++--- taskflow/examples/wbe_simple_linear.py | 5 ++--- taskflow/tests/unit/conductor/test_conductor.py | 13 +++---------- taskflow/tests/unit/jobs/base.py | 7 ++----- taskflow/tests/unit/test_engines.py | 2 +- taskflow/tests/unit/test_utils.py | 5 ++--- taskflow/tests/unit/test_utils_lock_utils.py | 13 +++++-------- .../tests/unit/worker_based/test_message_pump.py | 11 +++-------- taskflow/tests/unit/worker_based/test_pipeline.py | 6 ++---- taskflow/tests/unit/worker_based/test_proxy.py | 5 ++--- 11 files changed, 27 insertions(+), 52 deletions(-) diff --git a/taskflow/examples/jobboard_produce_consume_colors.py b/taskflow/examples/jobboard_produce_consume_colors.py index aa80828f..80c2acba 100644 --- a/taskflow/examples/jobboard_produce_consume_colors.py +++ b/taskflow/examples/jobboard_produce_consume_colors.py @@ -35,6 +35,7 @@ from zake import fake_client from taskflow import exceptions as excp from taskflow.jobs import backends +from taskflow.utils import threading_utils # In this example we show how a jobboard can be used to post work for other # entities to work on. This example creates a set of jobs using one producer @@ -152,14 +153,12 @@ def main(): with contextlib.closing(fake_client.FakeClient()) as c: created = [] for i in compat_range(0, PRODUCERS): - p = threading.Thread(target=producer, args=(i + 1, c)) - p.daemon = True + p = threading_utils.daemon_thread(producer, i + 1, c) created.append(p) p.start() consumed = collections.deque() for i in compat_range(0, WORKERS): - w = threading.Thread(target=worker, args=(i + 1, c, consumed)) - w.daemon = True + w = threading_utils.daemon_thread(worker, i + 1, c, consumed) created.append(w) w.start() while created: diff --git a/taskflow/examples/wbe_mandelbrot.py b/taskflow/examples/wbe_mandelbrot.py index cf46c240..f21465e3 100644 --- a/taskflow/examples/wbe_mandelbrot.py +++ b/taskflow/examples/wbe_mandelbrot.py @@ -18,7 +18,6 @@ import logging import math import os import sys -import threading top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir, @@ -31,6 +30,7 @@ from taskflow import engines from taskflow.engines.worker_based import worker from taskflow.patterns import unordered_flow as uf from taskflow import task +from taskflow.utils import threading_utils # INTRO: This example walks through a workflow that will in parallel compute # a mandelbrot result set (using X 'remote' workers) and then combine their @@ -229,8 +229,7 @@ def create_fractal(): worker_conf['topic'] = 'calculator_%s' % (i + 1) worker_topics.append(worker_conf['topic']) w = worker.Worker(**worker_conf) - runner = threading.Thread(target=w.run) - runner.daemon = True + runner = threading_utils.daemon_thread(w.run) runner.start() w.wait() workers.append((runner, w.stop)) diff --git a/taskflow/examples/wbe_simple_linear.py b/taskflow/examples/wbe_simple_linear.py index a15b48fa..bcaa8612 100644 --- a/taskflow/examples/wbe_simple_linear.py +++ b/taskflow/examples/wbe_simple_linear.py @@ -19,7 +19,6 @@ import logging import os import sys import tempfile -import threading top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir, @@ -30,6 +29,7 @@ from taskflow import engines from taskflow.engines.worker_based import worker from taskflow.patterns import linear_flow as lf from taskflow.tests import utils +from taskflow.utils import threading_utils import example_utils # noqa @@ -123,8 +123,7 @@ if __name__ == "__main__": worker_conf['topic'] = 'worker-%s' % (i + 1) worker_topics.append(worker_conf['topic']) w = worker.Worker(**worker_conf) - runner = threading.Thread(target=w.run) - runner.daemon = True + runner = threading_utils.daemon_thread(w.run) runner.start() w.wait() workers.append((runner, w.stop)) diff --git a/taskflow/tests/unit/conductor/test_conductor.py b/taskflow/tests/unit/conductor/test_conductor.py index cf19fa84..0fb677ea 100644 --- a/taskflow/tests/unit/conductor/test_conductor.py +++ b/taskflow/tests/unit/conductor/test_conductor.py @@ -16,7 +16,6 @@ import collections import contextlib -import threading from zake import fake_client @@ -51,12 +50,6 @@ def test_factory(blowup): return f -def make_thread(conductor): - t = threading.Thread(target=conductor.run) - t.daemon = True - return t - - class SingleThreadedConductorTest(test_utils.EngineTestBase, test.TestCase): ComponentBundle = collections.namedtuple('ComponentBundle', ['board', 'client', @@ -85,7 +78,7 @@ class SingleThreadedConductorTest(test_utils.EngineTestBase, test.TestCase): components = self.make_components() components.conductor.connect() with close_many(components.conductor, components.client): - t = make_thread(components.conductor) + t = threading_utils.daemon_thread(components.conductor.run) t.start() self.assertTrue( components.conductor.stop(test_utils.WAIT_TIMEOUT)) @@ -102,7 +95,7 @@ class SingleThreadedConductorTest(test_utils.EngineTestBase, test.TestCase): components.board.notifier.register(jobboard.REMOVAL, on_consume) with close_many(components.conductor, components.client): - t = make_thread(components.conductor) + t = threading_utils.daemon_thread(components.conductor.run) t.start() lb, fd = pu.temporary_flow_detail(components.persistence) engines.save_factory_details(fd, test_factory, @@ -131,7 +124,7 @@ class SingleThreadedConductorTest(test_utils.EngineTestBase, test.TestCase): components.board.notifier.register(jobboard.REMOVAL, on_consume) with close_many(components.conductor, components.client): - t = make_thread(components.conductor) + t = threading_utils.daemon_thread(components.conductor.run) t.start() lb, fd = pu.temporary_flow_detail(components.persistence) engines.save_factory_details(fd, test_factory, diff --git a/taskflow/tests/unit/jobs/base.py b/taskflow/tests/unit/jobs/base.py index e0a20a04..24da7d3a 100644 --- a/taskflow/tests/unit/jobs/base.py +++ b/taskflow/tests/unit/jobs/base.py @@ -15,7 +15,6 @@ # under the License. import contextlib -import threading import time from kazoo.recipe import watchers @@ -143,11 +142,9 @@ class BoardTestMixin(object): jobs.extend(it) with connect_close(self.board): - t1 = threading.Thread(target=poster) - t1.daemon = True + t1 = threading_utils.daemon_thread(poster) t1.start() - t2 = threading.Thread(target=waiter) - t2.daemon = True + t2 = threading_utils.daemon_thread(waiter) t2.start() for t in (t1, t2): t.join() diff --git a/taskflow/tests/unit/test_engines.py b/taskflow/tests/unit/test_engines.py index 43153f36..1ff81353 100644 --- a/taskflow/tests/unit/test_engines.py +++ b/taskflow/tests/unit/test_engines.py @@ -644,7 +644,7 @@ class WorkerBasedEngineTest(EngineTaskTest, 'topics': tuple([worker_conf['topic']]), }) self.worker = wkr.Worker(**worker_conf) - self.worker_thread = tu.daemon_thread(target=self.worker.run) + self.worker_thread = tu.daemon_thread(self.worker.run) self.worker_thread.start() # Make sure the worker is started before we can continue... diff --git a/taskflow/tests/unit/test_utils.py b/taskflow/tests/unit/test_utils.py index 38417810..66f08c09 100644 --- a/taskflow/tests/unit/test_utils.py +++ b/taskflow/tests/unit/test_utils.py @@ -17,7 +17,6 @@ import collections import inspect import random -import threading import time import six @@ -29,6 +28,7 @@ from taskflow.types import failure from taskflow.utils import lock_utils from taskflow.utils import misc from taskflow.utils import reflection +from taskflow.utils import threading_utils def mere_function(a, b): @@ -373,8 +373,7 @@ class CachedPropertyTest(test.TestCase): threads = [] try: for _i in range(0, 20): - t = threading.Thread(target=lambda: a.b) - t.daemon = True + t = threading_utils.daemon_thread(lambda: a.b) threads.append(t) for t in threads: t.start() diff --git a/taskflow/tests/unit/test_utils_lock_utils.py b/taskflow/tests/unit/test_utils_lock_utils.py index 37bc1711..06bef1ee 100644 --- a/taskflow/tests/unit/test_utils_lock_utils.py +++ b/taskflow/tests/unit/test_utils_lock_utils.py @@ -195,8 +195,7 @@ class MultilockTest(test.TestCase): threads = [] for _i in range(0, 20): - t = threading.Thread(target=run) - t.daemon = True + t = threading_utils.daemon_thread(run) threads.append(t) t.start() while threads: @@ -234,9 +233,8 @@ class MultilockTest(test.TestCase): target = run_fail else: target = run - t = threading.Thread(target=target) + t = threading_utils.daemon_thread(target) threads.append(t) - t.daemon = True t.start() while threads: t = threads.pop() @@ -287,9 +285,8 @@ class MultilockTest(test.TestCase): threads = [] for i in range(0, 20): - t = threading.Thread(target=run) + t = threading_utils.daemon_thread(run) threads.append(t) - t.daemon = True t.start() while threads: t = threads.pop() @@ -369,11 +366,11 @@ class ReadWriteLockTest(test.TestCase): with lock.write_lock(): activated.append(lock.owner) - reader = threading.Thread(target=double_reader) + reader = threading_utils.daemon_thread(double_reader) reader.start() self.assertTrue(active.wait(test_utils.WAIT_TIMEOUT)) - writer = threading.Thread(target=happy_writer) + writer = threading_utils.daemon_thread(happy_writer) writer.start() reader.join() diff --git a/taskflow/tests/unit/worker_based/test_message_pump.py b/taskflow/tests/unit/worker_based/test_message_pump.py index 1fc946ed..8b48b435 100644 --- a/taskflow/tests/unit/worker_based/test_message_pump.py +++ b/taskflow/tests/unit/worker_based/test_message_pump.py @@ -14,8 +14,6 @@ # License for the specific language governing permissions and limitations # under the License. -import threading - from taskflow.engines.worker_based import protocol as pr from taskflow.engines.worker_based import proxy from taskflow.openstack.common import uuidutils @@ -43,8 +41,7 @@ class TestMessagePump(test.TestCase): 'polling_interval': POLLING_INTERVAL, }) - t = threading.Thread(target=p.start) - t.daemon = True + t = threading_utils.daemon_thread(p.start) t.start() p.wait() p.publish(pr.Notify(), TEST_TOPIC) @@ -69,8 +66,7 @@ class TestMessagePump(test.TestCase): 'polling_interval': POLLING_INTERVAL, }) - t = threading.Thread(target=p.start) - t.daemon = True + t = threading_utils.daemon_thread(p.start) t.start() p.wait() resp = pr.Response(pr.RUNNING) @@ -109,8 +105,7 @@ class TestMessagePump(test.TestCase): 'polling_interval': POLLING_INTERVAL, }) - t = threading.Thread(target=p.start) - t.daemon = True + t = threading_utils.daemon_thread(p.start) t.start() p.wait() diff --git a/taskflow/tests/unit/worker_based/test_pipeline.py b/taskflow/tests/unit/worker_based/test_pipeline.py index b86cedd0..ed3e2662 100644 --- a/taskflow/tests/unit/worker_based/test_pipeline.py +++ b/taskflow/tests/unit/worker_based/test_pipeline.py @@ -14,8 +14,6 @@ # License for the specific language governing permissions and limitations # under the License. -import threading - from concurrent import futures from taskflow.engines.worker_based import endpoint @@ -25,6 +23,7 @@ from taskflow.openstack.common import uuidutils from taskflow import test from taskflow.tests import utils as test_utils from taskflow.types import failure +from taskflow.utils import threading_utils TEST_EXCHANGE, TEST_TOPIC = ('test-exchange', 'test-topic') @@ -44,8 +43,7 @@ class TestPipeline(test.TestCase): transport_options={ 'polling_interval': POLLING_INTERVAL, }) - server_thread = threading.Thread(target=server.start) - server_thread.daemon = True + server_thread = threading_utils.daemon_thread(server.start) return (server, server_thread) def _fetch_executor(self): diff --git a/taskflow/tests/unit/worker_based/test_proxy.py b/taskflow/tests/unit/worker_based/test_proxy.py index de5f3abc..4217a726 100644 --- a/taskflow/tests/unit/worker_based/test_proxy.py +++ b/taskflow/tests/unit/worker_based/test_proxy.py @@ -15,12 +15,12 @@ # under the License. import socket -import threading from six.moves import mock from taskflow.engines.worker_based import proxy from taskflow import test +from taskflow.utils import threading_utils class TestProxy(test.MockTestCase): @@ -210,8 +210,7 @@ class TestProxy(test.MockTestCase): self.assertFalse(pr.is_running) # start proxy in separate thread - t = threading.Thread(target=pr.start) - t.daemon = True + t = threading_utils.daemon_thread(pr.start) t.start() # make sure proxy is started