diff --git a/taskflow/examples/jobboard_produce_consume_colors.py b/taskflow/examples/jobboard_produce_consume_colors.py
index aa80828f2..80c2acbab 100644
--- a/taskflow/examples/jobboard_produce_consume_colors.py
+++ b/taskflow/examples/jobboard_produce_consume_colors.py
@@ -35,6 +35,7 @@ from zake import fake_client
 
 from taskflow import exceptions as excp
 from taskflow.jobs import backends
+from taskflow.utils import threading_utils
 
 # In this example we show how a jobboard can be used to post work for other
 # entities to work on. This example creates a set of jobs using one producer
@@ -152,14 +153,12 @@ def main():
     with contextlib.closing(fake_client.FakeClient()) as c:
         created = []
         for i in compat_range(0, PRODUCERS):
-            p = threading.Thread(target=producer, args=(i + 1, c))
-            p.daemon = True
+            p = threading_utils.daemon_thread(producer, i + 1, c)
             created.append(p)
             p.start()
         consumed = collections.deque()
         for i in compat_range(0, WORKERS):
-            w = threading.Thread(target=worker, args=(i + 1, c, consumed))
-            w.daemon = True
+            w = threading_utils.daemon_thread(worker, i + 1, c, consumed)
             created.append(w)
             w.start()
         while created:
diff --git a/taskflow/examples/wbe_mandelbrot.py b/taskflow/examples/wbe_mandelbrot.py
index cf46c240b..f21465e34 100644
--- a/taskflow/examples/wbe_mandelbrot.py
+++ b/taskflow/examples/wbe_mandelbrot.py
@@ -18,7 +18,6 @@ import logging
 import math
 import os
 import sys
-import threading
 
 top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                        os.pardir,
@@ -31,6 +30,7 @@ from taskflow import engines
 from taskflow.engines.worker_based import worker
 from taskflow.patterns import unordered_flow as uf
 from taskflow import task
+from taskflow.utils import threading_utils
 
 # INTRO: This example walks through a workflow that will in parallel compute
 # a mandelbrot result set (using X 'remote' workers) and then combine their
@@ -229,8 +229,7 @@
         worker_conf['topic'] = 'calculator_%s' % (i + 1)
         worker_topics.append(worker_conf['topic'])
         w = worker.Worker(**worker_conf)
-        runner = threading.Thread(target=w.run)
-        runner.daemon = True
+        runner = threading_utils.daemon_thread(w.run)
         runner.start()
         w.wait()
         workers.append((runner, w.stop))
diff --git a/taskflow/examples/wbe_simple_linear.py b/taskflow/examples/wbe_simple_linear.py
index a15b48fae..bcaa86121 100644
--- a/taskflow/examples/wbe_simple_linear.py
+++ b/taskflow/examples/wbe_simple_linear.py
@@ -19,7 +19,6 @@ import logging
 import os
 import sys
 import tempfile
-import threading
 
 top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                        os.pardir,
@@ -30,6 +29,7 @@ from taskflow import engines
 from taskflow.engines.worker_based import worker
 from taskflow.patterns import linear_flow as lf
 from taskflow.tests import utils
+from taskflow.utils import threading_utils
 
 import example_utils  # noqa
 
@@ -123,8 +123,7 @@ if __name__ == "__main__":
         worker_conf['topic'] = 'worker-%s' % (i + 1)
         worker_topics.append(worker_conf['topic'])
         w = worker.Worker(**worker_conf)
-        runner = threading.Thread(target=w.run)
-        runner.daemon = True
+        runner = threading_utils.daemon_thread(w.run)
         runner.start()
         w.wait()
         workers.append((runner, w.stop))
diff --git a/taskflow/tests/unit/conductor/test_conductor.py b/taskflow/tests/unit/conductor/test_conductor.py
index cf19fa840..0fb677ea6 100644
--- a/taskflow/tests/unit/conductor/test_conductor.py
+++ b/taskflow/tests/unit/conductor/test_conductor.py
@@ -16,7 +16,6 @@
 
 import collections
 import contextlib
-import threading
 
 from zake import fake_client
 
@@ -51,12 +50,6 @@ def test_factory(blowup):
     return f
 
 
-def make_thread(conductor):
-    t = threading.Thread(target=conductor.run)
-    t.daemon = True
-    return t
-
-
 class SingleThreadedConductorTest(test_utils.EngineTestBase, test.TestCase):
     ComponentBundle = collections.namedtuple('ComponentBundle',
                                              ['board', 'client',
@@ -85,7 +78,7 @@ class SingleThreadedConductorTest(test_utils.EngineTestBase, test.TestCase):
         components = self.make_components()
         components.conductor.connect()
         with close_many(components.conductor, components.client):
-            t = make_thread(components.conductor)
+            t = threading_utils.daemon_thread(components.conductor.run)
             t.start()
             self.assertTrue(
                 components.conductor.stop(test_utils.WAIT_TIMEOUT))
@@ -102,7 +95,7 @@ class SingleThreadedConductorTest(test_utils.EngineTestBase, test.TestCase):
         components.board.notifier.register(jobboard.REMOVAL, on_consume)
 
         with close_many(components.conductor, components.client):
-            t = make_thread(components.conductor)
+            t = threading_utils.daemon_thread(components.conductor.run)
             t.start()
             lb, fd = pu.temporary_flow_detail(components.persistence)
             engines.save_factory_details(fd, test_factory,
@@ -131,7 +124,7 @@ class SingleThreadedConductorTest(test_utils.EngineTestBase, test.TestCase):
         components.board.notifier.register(jobboard.REMOVAL, on_consume)
 
         with close_many(components.conductor, components.client):
-            t = make_thread(components.conductor)
+            t = threading_utils.daemon_thread(components.conductor.run)
             t.start()
             lb, fd = pu.temporary_flow_detail(components.persistence)
             engines.save_factory_details(fd, test_factory,
diff --git a/taskflow/tests/unit/jobs/base.py b/taskflow/tests/unit/jobs/base.py
index e0a20a044..24da7d3ae 100644
--- a/taskflow/tests/unit/jobs/base.py
+++ b/taskflow/tests/unit/jobs/base.py
@@ -15,7 +15,6 @@
 #    under the License.
 
 import contextlib
-import threading
 import time
 
 from kazoo.recipe import watchers
@@ -143,11 +142,9 @@ class BoardTestMixin(object):
             jobs.extend(it)
 
         with connect_close(self.board):
-            t1 = threading.Thread(target=poster)
-            t1.daemon = True
+            t1 = threading_utils.daemon_thread(poster)
             t1.start()
-            t2 = threading.Thread(target=waiter)
-            t2.daemon = True
+            t2 = threading_utils.daemon_thread(waiter)
             t2.start()
             for t in (t1, t2):
                 t.join()
diff --git a/taskflow/tests/unit/test_engines.py b/taskflow/tests/unit/test_engines.py
index 43153f36f..1ff813539 100644
--- a/taskflow/tests/unit/test_engines.py
+++ b/taskflow/tests/unit/test_engines.py
@@ -644,7 +644,7 @@ class WorkerBasedEngineTest(EngineTaskTest,
             'topics': tuple([worker_conf['topic']]),
         })
         self.worker = wkr.Worker(**worker_conf)
-        self.worker_thread = tu.daemon_thread(target=self.worker.run)
+        self.worker_thread = tu.daemon_thread(self.worker.run)
         self.worker_thread.start()
         # Make sure the worker is started before we can continue...
diff --git a/taskflow/tests/unit/test_utils.py b/taskflow/tests/unit/test_utils.py
index 384178107..66f08c09a 100644
--- a/taskflow/tests/unit/test_utils.py
+++ b/taskflow/tests/unit/test_utils.py
@@ -17,7 +17,6 @@
 import collections
 import inspect
 import random
-import threading
 import time
 
 import six
@@ -29,6 +28,7 @@ from taskflow.types import failure
 from taskflow.utils import lock_utils
 from taskflow.utils import misc
 from taskflow.utils import reflection
+from taskflow.utils import threading_utils
 
 
 def mere_function(a, b):
@@ -373,8 +373,7 @@ class CachedPropertyTest(test.TestCase):
         threads = []
         try:
             for _i in range(0, 20):
-                t = threading.Thread(target=lambda: a.b)
-                t.daemon = True
+                t = threading_utils.daemon_thread(lambda: a.b)
                 threads.append(t)
             for t in threads:
                 t.start()
diff --git a/taskflow/tests/unit/test_utils_lock_utils.py b/taskflow/tests/unit/test_utils_lock_utils.py
index 37bc17114..06bef1ee6 100644
--- a/taskflow/tests/unit/test_utils_lock_utils.py
+++ b/taskflow/tests/unit/test_utils_lock_utils.py
@@ -195,8 +195,7 @@ class MultilockTest(test.TestCase):
 
         threads = []
         for _i in range(0, 20):
-            t = threading.Thread(target=run)
-            t.daemon = True
+            t = threading_utils.daemon_thread(run)
             threads.append(t)
             t.start()
         while threads:
@@ -234,9 +233,8 @@ class MultilockTest(test.TestCase):
                 target = run_fail
             else:
                 target = run
-            t = threading.Thread(target=target)
+            t = threading_utils.daemon_thread(target)
             threads.append(t)
-            t.daemon = True
             t.start()
         while threads:
             t = threads.pop()
@@ -287,9 +285,8 @@ class MultilockTest(test.TestCase):
 
         threads = []
         for i in range(0, 20):
-            t = threading.Thread(target=run)
+            t = threading_utils.daemon_thread(run)
             threads.append(t)
-            t.daemon = True
             t.start()
         while threads:
             t = threads.pop()
@@ -369,11 +366,11 @@ class ReadWriteLockTest(test.TestCase):
             with lock.write_lock():
                 activated.append(lock.owner)
 
-        reader = threading.Thread(target=double_reader)
+        reader = threading_utils.daemon_thread(double_reader)
         reader.start()
         self.assertTrue(active.wait(test_utils.WAIT_TIMEOUT))
 
-        writer = threading.Thread(target=happy_writer)
+        writer = threading_utils.daemon_thread(happy_writer)
         writer.start()
 
         reader.join()
diff --git a/taskflow/tests/unit/worker_based/test_message_pump.py b/taskflow/tests/unit/worker_based/test_message_pump.py
index 1fc946ed7..8b48b435d 100644
--- a/taskflow/tests/unit/worker_based/test_message_pump.py
+++ b/taskflow/tests/unit/worker_based/test_message_pump.py
@@ -14,8 +14,6 @@
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
-import threading
-
 from taskflow.engines.worker_based import protocol as pr
 from taskflow.engines.worker_based import proxy
 from taskflow.openstack.common import uuidutils
@@ -43,8 +41,7 @@ class TestMessagePump(test.TestCase):
                             'polling_interval': POLLING_INTERVAL,
                         })
 
-        t = threading.Thread(target=p.start)
-        t.daemon = True
+        t = threading_utils.daemon_thread(p.start)
         t.start()
         p.wait()
         p.publish(pr.Notify(), TEST_TOPIC)
@@ -69,8 +66,7 @@ class TestMessagePump(test.TestCase):
                             'polling_interval': POLLING_INTERVAL,
                         })
 
-        t = threading.Thread(target=p.start)
-        t.daemon = True
+        t = threading_utils.daemon_thread(p.start)
         t.start()
         p.wait()
         resp = pr.Response(pr.RUNNING)
@@ -109,8 +105,7 @@ class TestMessagePump(test.TestCase):
                             'polling_interval': POLLING_INTERVAL,
                         })
 
-        t = threading.Thread(target=p.start)
-        t.daemon = True
+        t = threading_utils.daemon_thread(p.start)
         t.start()
         p.wait()
 
diff --git a/taskflow/tests/unit/worker_based/test_pipeline.py b/taskflow/tests/unit/worker_based/test_pipeline.py
index b86cedd07..ed3e26626 100644
--- a/taskflow/tests/unit/worker_based/test_pipeline.py
+++ b/taskflow/tests/unit/worker_based/test_pipeline.py
@@ -14,8 +14,6 @@
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
-import threading
-
 from concurrent import futures
 
 from taskflow.engines.worker_based import endpoint
@@ -25,6 +23,7 @@
 from taskflow.openstack.common import uuidutils
 from taskflow import test
 from taskflow.tests import utils as test_utils
 from taskflow.types import failure
+from taskflow.utils import threading_utils
 
 TEST_EXCHANGE, TEST_TOPIC = ('test-exchange', 'test-topic')
@@ -44,8 +43,7 @@ class TestPipeline(test.TestCase):
             transport_options={
                 'polling_interval': POLLING_INTERVAL,
             })
-        server_thread = threading.Thread(target=server.start)
-        server_thread.daemon = True
+        server_thread = threading_utils.daemon_thread(server.start)
         return (server, server_thread)
 
     def _fetch_executor(self):
diff --git a/taskflow/tests/unit/worker_based/test_proxy.py b/taskflow/tests/unit/worker_based/test_proxy.py
index de5f3abc9..4217a726e 100644
--- a/taskflow/tests/unit/worker_based/test_proxy.py
+++ b/taskflow/tests/unit/worker_based/test_proxy.py
@@ -15,12 +15,12 @@
 #    under the License.
 
 import socket
-import threading
 
 from six.moves import mock
 
 from taskflow.engines.worker_based import proxy
 from taskflow import test
+from taskflow.utils import threading_utils
 
 
 class TestProxy(test.MockTestCase):
@@ -210,8 +210,7 @@ class TestProxy(test.MockTestCase):
         self.assertFalse(pr.is_running)
 
         # start proxy in separate thread
-        t = threading.Thread(target=pr.start)
-        t.daemon = True
+        t = threading_utils.daemon_thread(pr.start)
         t.start()
 
         # make sure proxy is started
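
Note: every call site above replaces the threading.Thread(...) / t.daemon = True boilerplate with a single threading_utils.daemon_thread(...) call. For reference, that helper is assumed to behave roughly like the minimal sketch below; the actual implementation in taskflow/utils/threading_utils.py may differ in naming and details.

    import threading


    def daemon_thread(target, *args, **kwargs):
        """Return an unstarted, daemonized thread wired to run the given target."""
        # Extra positional/keyword arguments are forwarded to the target,
        # matching calls such as daemon_thread(producer, i + 1, c) above.
        thread = threading.Thread(target=target, args=args, kwargs=kwargs)
        # Daemon threads do not block interpreter shutdown, which is why the
        # examples and tests no longer need explicit teardown on exit.
        thread.daemon = True
        return thread

With this shape, callers still invoke start() (and join() or stop()) themselves; only the create-and-daemonize steps are folded into the helper.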