diff --git a/taskflow/conductors/single_threaded.py b/taskflow/conductors/single_threaded.py index 84038ef1a..173d2cec8 100644 --- a/taskflow/conductors/single_threaded.py +++ b/taskflow/conductors/single_threaded.py @@ -13,7 +13,6 @@ # under the License. import logging -import threading import six @@ -23,6 +22,7 @@ from taskflow.listeners import logging as logging_listener from taskflow.types import timing as tt from taskflow.utils import async_utils from taskflow.utils import lock_utils +from taskflow.utils import threading_utils LOG = logging.getLogger(__name__) WAIT_TIMEOUT = 0.5 @@ -64,7 +64,7 @@ class SingleThreadedConductor(base.Conductor): self._wait_timeout = wait_timeout else: raise ValueError("Invalid timeout literal: %s" % (wait_timeout)) - self._dead = threading.Event() + self._dead = threading_utils.Event() @lock_utils.locked def stop(self, timeout=None): @@ -81,8 +81,7 @@ class SingleThreadedConductor(base.Conductor): be honored in the future) and False will be returned indicating this. 
""" self._wait_timeout.interrupt() - self._dead.wait(timeout) - return self._dead.is_set() + return self._dead.wait(timeout) @property def dispatching(self): diff --git a/taskflow/engines/worker_based/proxy.py b/taskflow/engines/worker_based/proxy.py index c51dd164e..e0f9ce7d6 100644 --- a/taskflow/engines/worker_based/proxy.py +++ b/taskflow/engines/worker_based/proxy.py @@ -16,13 +16,13 @@ import logging import socket -import threading import kombu import six from taskflow.engines.worker_based import dispatcher from taskflow.utils import misc +from taskflow.utils import threading_utils LOG = logging.getLogger(__name__) @@ -39,7 +39,7 @@ class Proxy(object): self._topic = topic self._exchange_name = exchange_name self._on_wait = on_wait - self._running = threading.Event() + self._running = threading_utils.Event() self._dispatcher = dispatcher.TypeDispatcher(type_handlers) self._dispatcher.add_requeue_filter( # NOTE(skudriashev): Process all incoming messages only if proxy is diff --git a/taskflow/tests/unit/conductor/test_conductor.py b/taskflow/tests/unit/conductor/test_conductor.py index 216fa3878..8f9c2d105 100644 --- a/taskflow/tests/unit/conductor/test_conductor.py +++ b/taskflow/tests/unit/conductor/test_conductor.py @@ -30,6 +30,7 @@ from taskflow import test from taskflow.tests import utils as test_utils from taskflow.utils import misc from taskflow.utils import persistence_utils as pu +from taskflow.utils import threading_utils @contextlib.contextmanager @@ -85,14 +86,15 @@ class SingleThreadedConductorTest(test_utils.EngineTestBase, test.TestCase): with close_many(components.conductor, components.client): t = make_thread(components.conductor) t.start() - self.assertTrue(components.conductor.stop(0.5)) + self.assertTrue( + components.conductor.stop(test_utils.WAIT_TIMEOUT)) self.assertFalse(components.conductor.dispatching) t.join() def test_run(self): components = self.make_components() components.conductor.connect() - consumed_event = threading.Event() 
+ consumed_event = threading_utils.Event() def on_consume(state, details): consumed_event.set() @@ -107,9 +109,8 @@ class SingleThreadedConductorTest(test_utils.EngineTestBase, test.TestCase): backend=components.persistence) components.board.post('poke', lb, details={'flow_uuid': fd.uuid}) - consumed_event.wait(1.0) - self.assertTrue(consumed_event.is_set()) - self.assertTrue(components.conductor.stop(1.0)) + self.assertTrue(consumed_event.wait(test_utils.WAIT_TIMEOUT)) + self.assertTrue(components.conductor.stop(test_utils.WAIT_TIMEOUT)) self.assertFalse(components.conductor.dispatching) persistence = components.persistence @@ -122,8 +123,7 @@ class SingleThreadedConductorTest(test_utils.EngineTestBase, test.TestCase): def test_fail_run(self): components = self.make_components() components.conductor.connect() - - consumed_event = threading.Event() + consumed_event = threading_utils.Event() def on_consume(state, details): consumed_event.set() @@ -138,9 +138,8 @@ class SingleThreadedConductorTest(test_utils.EngineTestBase, test.TestCase): backend=components.persistence) components.board.post('poke', lb, details={'flow_uuid': fd.uuid}) - consumed_event.wait(1.0) - self.assertTrue(consumed_event.is_set()) - self.assertTrue(components.conductor.stop(1.0)) + self.assertTrue(consumed_event.wait(test_utils.WAIT_TIMEOUT)) + self.assertTrue(components.conductor.stop(test_utils.WAIT_TIMEOUT)) self.assertFalse(components.conductor.dispatching) persistence = components.persistence diff --git a/taskflow/tests/unit/jobs/base.py b/taskflow/tests/unit/jobs/base.py index f8a2687ea..e0a20a044 100644 --- a/taskflow/tests/unit/jobs/base.py +++ b/taskflow/tests/unit/jobs/base.py @@ -25,8 +25,10 @@ from taskflow.openstack.common import uuidutils from taskflow.persistence.backends import impl_dir from taskflow import states from taskflow.test import mock +from taskflow.tests import utils as test_utils from taskflow.utils import misc from taskflow.utils import persistence_utils as p_utils 
+from taskflow.utils import threading_utils FLUSH_PATH_TPL = '/taskflow/flush-test/%s' @@ -52,8 +54,8 @@ def flush(client, path=None): # before this context manager exits. if not path: path = FLUSH_PATH_TPL % uuidutils.generate_uuid() - created = threading.Event() - deleted = threading.Event() + created = threading_utils.Event() + deleted = threading_utils.Event() def on_created(data, stat): if stat is not None: @@ -67,13 +69,19 @@ def flush(client, path=None): watchers.DataWatch(client, path, func=on_created) client.create(path, makepath=True) - created.wait() + if not created.wait(test_utils.WAIT_TIMEOUT): + raise RuntimeError("Could not receive creation of %s in" + " the alloted timeout of %s seconds" + % (path, test_utils.WAIT_TIMEOUT)) try: yield finally: watchers.DataWatch(client, path, func=on_deleted) client.delete(path, recursive=True) - deleted.wait() + if not deleted.wait(test_utils.WAIT_TIMEOUT): + raise RuntimeError("Could not receive deletion of %s in" + " the alloted timeout of %s seconds" + % (path, test_utils.WAIT_TIMEOUT)) class BoardTestMixin(object): @@ -119,11 +127,13 @@ class BoardTestMixin(object): self.assertRaises(excp.NotFound, self.board.wait, timeout=0.1) def test_wait_arrival(self): - ev = threading.Event() + ev = threading_utils.Event() jobs = [] def poster(wait_post=0.2): - ev.wait() # wait until the waiter is active + if not ev.wait(test_utils.WAIT_TIMEOUT): + raise RuntimeError("Waiter did not appear ready" + " in %s seconds" % test_utils.WAIT_TIMEOUT) time.sleep(wait_post) self.board.post('test', p_utils.temporary_log_book()) diff --git a/taskflow/tests/unit/test_utils_lock_utils.py b/taskflow/tests/unit/test_utils_lock_utils.py index 066b17a22..c7fe09f11 100644 --- a/taskflow/tests/unit/test_utils_lock_utils.py +++ b/taskflow/tests/unit/test_utils_lock_utils.py @@ -22,7 +22,9 @@ from concurrent import futures import mock from taskflow import test +from taskflow.tests import utils as test_utils from taskflow.utils import lock_utils 
+from taskflow.utils import threading_utils # NOTE(harlowja): Sleep a little so time.time() can not be the same (which will # cause false positives when our overlap detection code runs). If there are @@ -353,7 +355,7 @@ class ReadWriteLockTest(test.TestCase): def test_double_reader_writer(self): lock = lock_utils.ReaderWriterLock() activated = collections.deque() - active = threading.Event() + active = threading_utils.Event() def double_reader(): with lock.read_lock(): @@ -369,7 +371,7 @@ class ReadWriteLockTest(test.TestCase): reader = threading.Thread(target=double_reader) reader.start() - active.wait() + self.assertTrue(active.wait(test_utils.WAIT_TIMEOUT)) writer = threading.Thread(target=happy_writer) writer.start() diff --git a/taskflow/tests/unit/worker_based/test_executor.py b/taskflow/tests/unit/worker_based/test_executor.py index 59681bb2a..3e494e88d 100644 --- a/taskflow/tests/unit/worker_based/test_executor.py +++ b/taskflow/tests/unit/worker_based/test_executor.py @@ -14,7 +14,6 @@ # License for the specific language governing permissions and limitations # under the License. 
-import threading import time from concurrent import futures @@ -24,15 +23,16 @@ from taskflow.engines.worker_based import executor from taskflow.engines.worker_based import protocol as pr from taskflow import test from taskflow.test import mock -from taskflow.tests import utils +from taskflow.tests import utils as test_utils from taskflow.utils import misc +from taskflow.utils import threading_utils class TestWorkerTaskExecutor(test.MockTestCase): def setUp(self): super(TestWorkerTaskExecutor, self).setUp() - self.task = utils.DummyTask() + self.task = test_utils.DummyTask() self.task_uuid = 'task-uuid' self.task_args = {'a': 'a'} self.task_result = 'task-result' @@ -42,7 +42,7 @@ class TestWorkerTaskExecutor(test.MockTestCase): self.executor_uuid = 'executor-uuid' self.executor_exchange = 'executor-exchange' self.executor_topic = 'test-topic1' - self.proxy_started_event = threading.Event() + self.proxy_started_event = threading_utils.Event() # patch classes self.proxy_mock, self.proxy_inst_mock = self.patchClass( @@ -121,7 +121,7 @@ class TestWorkerTaskExecutor(test.MockTestCase): self.assertEqual(len(ex._requests_cache), 0) expected_calls = [ mock.call.transition_and_log_error(pr.FAILURE, logger=mock.ANY), - mock.call.set_result(result=utils.FailureMatcher(failure)) + mock.call.set_result(result=test_utils.FailureMatcher(failure)) ] self.assertEqual(expected_calls, self.request_inst_mock.mock_calls) @@ -303,7 +303,7 @@ class TestWorkerTaskExecutor(test.MockTestCase): ex.start() # make sure proxy thread started - self.proxy_started_event.wait() + self.assertTrue(self.proxy_started_event.wait(test_utils.WAIT_TIMEOUT)) # stop executor ex.stop() @@ -319,7 +319,7 @@ class TestWorkerTaskExecutor(test.MockTestCase): ex.start() # make sure proxy thread started - self.proxy_started_event.wait() + self.assertTrue(self.proxy_started_event.wait(test_utils.WAIT_TIMEOUT)) # start executor again ex.start() @@ -362,14 +362,14 @@ class TestWorkerTaskExecutor(test.MockTestCase): 
ex.start() # make sure thread started - self.proxy_started_event.wait() + self.assertTrue(self.proxy_started_event.wait(test_utils.WAIT_TIMEOUT)) # restart executor ex.stop() ex.start() # make sure thread started - self.proxy_started_event.wait() + self.assertTrue(self.proxy_started_event.wait(test_utils.WAIT_TIMEOUT)) # stop executor ex.stop() diff --git a/taskflow/tests/unit/worker_based/test_message_pump.py b/taskflow/tests/unit/worker_based/test_message_pump.py index 008ad72c7..1fc946ed7 100644 --- a/taskflow/tests/unit/worker_based/test_message_pump.py +++ b/taskflow/tests/unit/worker_based/test_message_pump.py @@ -23,15 +23,15 @@ from taskflow import test from taskflow.test import mock from taskflow.tests import utils as test_utils from taskflow.types import latch +from taskflow.utils import threading_utils TEST_EXCHANGE, TEST_TOPIC = ('test-exchange', 'test-topic') -BARRIER_WAIT_TIMEOUT = 1.0 POLLING_INTERVAL = 0.01 class TestMessagePump(test.TestCase): def test_notify(self): - barrier = threading.Event() + barrier = threading_utils.Event() on_notify = mock.MagicMock() on_notify.side_effect = lambda *args, **kwargs: barrier.set() @@ -49,8 +49,7 @@ class TestMessagePump(test.TestCase): p.wait() p.publish(pr.Notify(), TEST_TOPIC) - barrier.wait(BARRIER_WAIT_TIMEOUT) - self.assertTrue(barrier.is_set()) + self.assertTrue(barrier.wait(test_utils.WAIT_TIMEOUT)) p.stop() t.join() @@ -58,7 +57,7 @@ class TestMessagePump(test.TestCase): on_notify.assert_called_with({}, mock.ANY) def test_response(self): - barrier = threading.Event() + barrier = threading_utils.Event() on_response = mock.MagicMock() on_response.side_effect = lambda *args, **kwargs: barrier.set() @@ -77,7 +76,7 @@ class TestMessagePump(test.TestCase): resp = pr.Response(pr.RUNNING) p.publish(resp, TEST_TOPIC) - barrier.wait(BARRIER_WAIT_TIMEOUT) + self.assertTrue(barrier.wait(test_utils.WAIT_TIMEOUT)) self.assertTrue(barrier.is_set()) p.stop() t.join() @@ -126,7 +125,7 @@ class 
TestMessagePump(test.TestCase): uuidutils.generate_uuid(), pr.EXECUTE, [], None, None), TEST_TOPIC) - barrier.wait(BARRIER_WAIT_TIMEOUT) + self.assertTrue(barrier.wait(test_utils.WAIT_TIMEOUT)) self.assertEqual(0, barrier.needed) p.stop() t.join() diff --git a/taskflow/tests/utils.py b/taskflow/tests/utils.py index 1662b2869..d01f91a30 100644 --- a/taskflow/tests/utils.py +++ b/taskflow/tests/utils.py @@ -16,7 +16,6 @@ import contextlib import string -import threading import six @@ -26,15 +25,18 @@ from taskflow import retry from taskflow import task from taskflow.utils import kazoo_utils from taskflow.utils import misc +from taskflow.utils import threading_utils ARGS_KEY = '__args__' KWARGS_KEY = '__kwargs__' ORDER_KEY = '__order__' - ZK_TEST_CONFIG = { 'timeout': 1.0, 'hosts': ["localhost:2181"], } +# If latches/events take longer than this to become empty/set, something is +# usually wrong and should be debugged instead of deadlocking... +WAIT_TIMEOUT = 300 @contextlib.contextmanager @@ -342,16 +344,14 @@ class WaitForOneFromTask(SaveOrderTask): self.wait_states = [wait_states] else: self.wait_states = wait_states - self.event = threading.Event() + self.event = threading_utils.Event() def execute(self): - # NOTE(imelnikov): if test was not complete within - # 5 minutes, something is terribly wrong - self.event.wait(300) - if not self.event.is_set(): - raise RuntimeError('Timeout occurred while waiting ' + if not self.event.wait(WAIT_TIMEOUT): + raise RuntimeError('%s second timeout occurred while waiting ' 'for %s to change state to %s' - % (self.wait_for, self.wait_states)) + % (WAIT_TIMEOUT, self.wait_for, + self.wait_states)) return super(WaitForOneFromTask, self).execute() def callback(self, state, details): diff --git a/taskflow/types/timing.py b/taskflow/types/timing.py index 6e2c46c80..decada4ad 100644 --- a/taskflow/types/timing.py +++ b/taskflow/types/timing.py @@ -14,10 +14,10 @@ # License for the specific language governing permissions and limitations 
# under the License. -import threading - from oslo.utils import timeutils +from taskflow.utils import threading_utils + class Timeout(object): """An object which represents a timeout. @@ -29,7 +29,7 @@ class Timeout(object): if timeout < 0: raise ValueError("Timeout must be >= 0 and not %s" % (timeout)) self._timeout = timeout - self._event = threading.Event() + self._event = threading_utils.Event() def interrupt(self): self._event.set() diff --git a/taskflow/utils/threading_utils.py b/taskflow/utils/threading_utils.py index 2af170239..b3749bca8 100644 --- a/taskflow/utils/threading_utils.py +++ b/taskflow/utils/threading_utils.py @@ -15,11 +15,31 @@ # under the License. import multiprocessing +import sys import threading from six.moves import _thread +if sys.version_info[0:2] == (2, 6): + # Event.wait() did not return whether the event was/wasn't set in 2.6; + # since we actually care whether it was or wasn't set, add that feature + # by taking the code from 2.7 that added this functionality... + # + # TODO(harlowja): remove when we can drop 2.6 support. + class Event(threading._Event): + def wait(self, timeout=None): + self.__cond.acquire() + try: + if not self.__flag: + self.__cond.wait(timeout) + return self.__flag + finally: + self.__cond.release() +else: + Event = threading.Event + + def get_ident(): """Return the 'thread identifier' of the current thread.""" return _thread.get_ident()