Merge "When creating daemon threads use the bundled threading_utils"

This commit is contained in:
Jenkins
2014-11-28 04:26:16 +00:00
committed by Gerrit Code Review
11 changed files with 27 additions and 52 deletions

View File

@@ -35,6 +35,7 @@ from zake import fake_client
from taskflow import exceptions as excp
from taskflow.jobs import backends
from taskflow.utils import threading_utils
# In this example we show how a jobboard can be used to post work for other
# entities to work on. This example creates a set of jobs using one producer
@@ -152,14 +153,12 @@ def main():
with contextlib.closing(fake_client.FakeClient()) as c:
created = []
for i in compat_range(0, PRODUCERS):
p = threading.Thread(target=producer, args=(i + 1, c))
p.daemon = True
p = threading_utils.daemon_thread(producer, i + 1, c)
created.append(p)
p.start()
consumed = collections.deque()
for i in compat_range(0, WORKERS):
w = threading.Thread(target=worker, args=(i + 1, c, consumed))
w.daemon = True
w = threading_utils.daemon_thread(worker, i + 1, c, consumed)
created.append(w)
w.start()
while created:

View File

@@ -18,7 +18,6 @@ import logging
import math
import os
import sys
import threading
top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
os.pardir,
@@ -31,6 +30,7 @@ from taskflow import engines
from taskflow.engines.worker_based import worker
from taskflow.patterns import unordered_flow as uf
from taskflow import task
from taskflow.utils import threading_utils
# INTRO: This example walks through a workflow that will in parallel compute
# a mandelbrot result set (using X 'remote' workers) and then combine their
@@ -229,8 +229,7 @@ def create_fractal():
worker_conf['topic'] = 'calculator_%s' % (i + 1)
worker_topics.append(worker_conf['topic'])
w = worker.Worker(**worker_conf)
runner = threading.Thread(target=w.run)
runner.daemon = True
runner = threading_utils.daemon_thread(w.run)
runner.start()
w.wait()
workers.append((runner, w.stop))

View File

@@ -19,7 +19,6 @@ import logging
import os
import sys
import tempfile
import threading
top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
os.pardir,
@@ -30,6 +29,7 @@ from taskflow import engines
from taskflow.engines.worker_based import worker
from taskflow.patterns import linear_flow as lf
from taskflow.tests import utils
from taskflow.utils import threading_utils
import example_utils # noqa
@@ -123,8 +123,7 @@ if __name__ == "__main__":
worker_conf['topic'] = 'worker-%s' % (i + 1)
worker_topics.append(worker_conf['topic'])
w = worker.Worker(**worker_conf)
runner = threading.Thread(target=w.run)
runner.daemon = True
runner = threading_utils.daemon_thread(w.run)
runner.start()
w.wait()
workers.append((runner, w.stop))

View File

@@ -16,7 +16,6 @@
import collections
import contextlib
import threading
from zake import fake_client
@@ -51,12 +50,6 @@ def test_factory(blowup):
return f
def make_thread(conductor):
t = threading.Thread(target=conductor.run)
t.daemon = True
return t
class SingleThreadedConductorTest(test_utils.EngineTestBase, test.TestCase):
ComponentBundle = collections.namedtuple('ComponentBundle',
['board', 'client',
@@ -85,7 +78,7 @@ class SingleThreadedConductorTest(test_utils.EngineTestBase, test.TestCase):
components = self.make_components()
components.conductor.connect()
with close_many(components.conductor, components.client):
t = make_thread(components.conductor)
t = threading_utils.daemon_thread(components.conductor.run)
t.start()
self.assertTrue(
components.conductor.stop(test_utils.WAIT_TIMEOUT))
@@ -102,7 +95,7 @@ class SingleThreadedConductorTest(test_utils.EngineTestBase, test.TestCase):
components.board.notifier.register(jobboard.REMOVAL, on_consume)
with close_many(components.conductor, components.client):
t = make_thread(components.conductor)
t = threading_utils.daemon_thread(components.conductor.run)
t.start()
lb, fd = pu.temporary_flow_detail(components.persistence)
engines.save_factory_details(fd, test_factory,
@@ -131,7 +124,7 @@ class SingleThreadedConductorTest(test_utils.EngineTestBase, test.TestCase):
components.board.notifier.register(jobboard.REMOVAL, on_consume)
with close_many(components.conductor, components.client):
t = make_thread(components.conductor)
t = threading_utils.daemon_thread(components.conductor.run)
t.start()
lb, fd = pu.temporary_flow_detail(components.persistence)
engines.save_factory_details(fd, test_factory,

View File

@@ -15,7 +15,6 @@
# under the License.
import contextlib
import threading
import time
from kazoo.recipe import watchers
@@ -143,11 +142,9 @@ class BoardTestMixin(object):
jobs.extend(it)
with connect_close(self.board):
t1 = threading.Thread(target=poster)
t1.daemon = True
t1 = threading_utils.daemon_thread(poster)
t1.start()
t2 = threading.Thread(target=waiter)
t2.daemon = True
t2 = threading_utils.daemon_thread(waiter)
t2.start()
for t in (t1, t2):
t.join()

View File

@@ -644,7 +644,7 @@ class WorkerBasedEngineTest(EngineTaskTest,
'topics': tuple([worker_conf['topic']]),
})
self.worker = wkr.Worker(**worker_conf)
self.worker_thread = tu.daemon_thread(target=self.worker.run)
self.worker_thread = tu.daemon_thread(self.worker.run)
self.worker_thread.start()
# Make sure the worker is started before we can continue...

View File

@@ -17,7 +17,6 @@
import collections
import inspect
import random
import threading
import time
import six
@@ -29,6 +28,7 @@ from taskflow.types import failure
from taskflow.utils import lock_utils
from taskflow.utils import misc
from taskflow.utils import reflection
from taskflow.utils import threading_utils
def mere_function(a, b):
@@ -373,8 +373,7 @@ class CachedPropertyTest(test.TestCase):
threads = []
try:
for _i in range(0, 20):
t = threading.Thread(target=lambda: a.b)
t.daemon = True
t = threading_utils.daemon_thread(lambda: a.b)
threads.append(t)
for t in threads:
t.start()

View File

@@ -195,8 +195,7 @@ class MultilockTest(test.TestCase):
threads = []
for _i in range(0, 20):
t = threading.Thread(target=run)
t.daemon = True
t = threading_utils.daemon_thread(run)
threads.append(t)
t.start()
while threads:
@@ -234,9 +233,8 @@ class MultilockTest(test.TestCase):
target = run_fail
else:
target = run
t = threading.Thread(target=target)
t = threading_utils.daemon_thread(target)
threads.append(t)
t.daemon = True
t.start()
while threads:
t = threads.pop()
@@ -287,9 +285,8 @@ class MultilockTest(test.TestCase):
threads = []
for i in range(0, 20):
t = threading.Thread(target=run)
t = threading_utils.daemon_thread(run)
threads.append(t)
t.daemon = True
t.start()
while threads:
t = threads.pop()
@@ -369,11 +366,11 @@ class ReadWriteLockTest(test.TestCase):
with lock.write_lock():
activated.append(lock.owner)
reader = threading.Thread(target=double_reader)
reader = threading_utils.daemon_thread(double_reader)
reader.start()
self.assertTrue(active.wait(test_utils.WAIT_TIMEOUT))
writer = threading.Thread(target=happy_writer)
writer = threading_utils.daemon_thread(happy_writer)
writer.start()
reader.join()

View File

@@ -14,8 +14,6 @@
# License for the specific language governing permissions and limitations
# under the License.
import threading
from taskflow.engines.worker_based import protocol as pr
from taskflow.engines.worker_based import proxy
from taskflow.openstack.common import uuidutils
@@ -43,8 +41,7 @@ class TestMessagePump(test.TestCase):
'polling_interval': POLLING_INTERVAL,
})
t = threading.Thread(target=p.start)
t.daemon = True
t = threading_utils.daemon_thread(p.start)
t.start()
p.wait()
p.publish(pr.Notify(), TEST_TOPIC)
@@ -69,8 +66,7 @@ class TestMessagePump(test.TestCase):
'polling_interval': POLLING_INTERVAL,
})
t = threading.Thread(target=p.start)
t.daemon = True
t = threading_utils.daemon_thread(p.start)
t.start()
p.wait()
resp = pr.Response(pr.RUNNING)
@@ -109,8 +105,7 @@ class TestMessagePump(test.TestCase):
'polling_interval': POLLING_INTERVAL,
})
t = threading.Thread(target=p.start)
t.daemon = True
t = threading_utils.daemon_thread(p.start)
t.start()
p.wait()

View File

@@ -14,8 +14,6 @@
# License for the specific language governing permissions and limitations
# under the License.
import threading
from concurrent import futures
from taskflow.engines.worker_based import endpoint
@@ -25,6 +23,7 @@ from taskflow.openstack.common import uuidutils
from taskflow import test
from taskflow.tests import utils as test_utils
from taskflow.types import failure
from taskflow.utils import threading_utils
TEST_EXCHANGE, TEST_TOPIC = ('test-exchange', 'test-topic')
@@ -44,8 +43,7 @@ class TestPipeline(test.TestCase):
transport_options={
'polling_interval': POLLING_INTERVAL,
})
server_thread = threading.Thread(target=server.start)
server_thread.daemon = True
server_thread = threading_utils.daemon_thread(server.start)
return (server, server_thread)
def _fetch_executor(self):

View File

@@ -15,12 +15,12 @@
# under the License.
import socket
import threading
from six.moves import mock
from taskflow.engines.worker_based import proxy
from taskflow import test
from taskflow.utils import threading_utils
class TestProxy(test.MockTestCase):
@@ -210,8 +210,7 @@ class TestProxy(test.MockTestCase):
self.assertFalse(pr.is_running)
# start proxy in separate thread
t = threading.Thread(target=pr.start)
t.daemon = True
t = threading_utils.daemon_thread(pr.start)
t.start()
# make sure proxy is started