From 7ca631356efd943bf8e246a6a907653a70a35771 Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Tue, 2 Sep 2014 12:36:52 -0700 Subject: [PATCH] Use and verify event and latch wait() return using timeouts Instead of blocking up the whole test suite when a latch or event was not decremented to its desired value (or not set for an event) we should use a reasonably high value that we use when waiting for those actions to occur and verify that when those wait() functions return that we have reached the desired state and if not either raise an exception or stop further testing. Fixes bug 1363739 Change-Id: I8b40282ac2db9cabd48b0b65c8a2a49610d77c4f --- taskflow/conductors/single_threaded.py | 7 +++--- taskflow/engines/worker_based/proxy.py | 4 ++-- .../tests/unit/conductor/test_conductor.py | 19 ++++++++-------- taskflow/tests/unit/jobs/base.py | 22 ++++++++++++++----- taskflow/tests/unit/test_utils_lock_utils.py | 6 +++-- .../tests/unit/worker_based/test_executor.py | 18 +++++++-------- .../unit/worker_based/test_message_pump.py | 13 +++++------ taskflow/tests/utils.py | 18 +++++++-------- taskflow/types/timing.py | 6 ++--- taskflow/utils/threading_utils.py | 20 +++++++++++++++++ 10 files changed, 81 insertions(+), 52 deletions(-) diff --git a/taskflow/conductors/single_threaded.py b/taskflow/conductors/single_threaded.py index 23994e792..eef64836a 100644 --- a/taskflow/conductors/single_threaded.py +++ b/taskflow/conductors/single_threaded.py @@ -13,7 +13,6 @@ # under the License. import logging -import threading import six @@ -23,6 +22,7 @@ from taskflow.listeners import logging as logging_listener from taskflow.types import timing as tt from taskflow.utils import async_utils from taskflow.utils import lock_utils +from taskflow.utils import threading_utils LOG = logging.getLogger(__name__) WAIT_TIMEOUT = 0.5 @@ -64,7 +64,7 @@ class SingleThreadedConductor(base.Conductor): self._wait_timeout = wait_timeout else: raise ValueError("Invalid timeout literal: %s" % (wait_timeout)) - self._dead = threading.Event() + self._dead = threading_utils.Event() @lock_utils.locked def stop(self, timeout=None): @@ -81,8 +81,7 @@ class SingleThreadedConductor(base.Conductor): be honored in the future) and False will be returned indicating this. """ self._wait_timeout.interrupt() - self._dead.wait(timeout) - return self._dead.is_set() + return self._dead.wait(timeout) @property def dispatching(self): diff --git a/taskflow/engines/worker_based/proxy.py b/taskflow/engines/worker_based/proxy.py index c51dd164e..e0f9ce7d6 100644 --- a/taskflow/engines/worker_based/proxy.py +++ b/taskflow/engines/worker_based/proxy.py @@ -16,13 +16,13 @@ import logging import socket -import threading import kombu import six from taskflow.engines.worker_based import dispatcher from taskflow.utils import misc +from taskflow.utils import threading_utils LOG = logging.getLogger(__name__) @@ -39,7 +39,7 @@ class Proxy(object): self._topic = topic self._exchange_name = exchange_name self._on_wait = on_wait - self._running = threading.Event() + self._running = threading_utils.Event() self._dispatcher = dispatcher.TypeDispatcher(type_handlers) self._dispatcher.add_requeue_filter( # NOTE(skudriashev): Process all incoming messages only if proxy is diff --git a/taskflow/tests/unit/conductor/test_conductor.py b/taskflow/tests/unit/conductor/test_conductor.py index b43ba035f..2d254a306 100644 --- a/taskflow/tests/unit/conductor/test_conductor.py +++ b/taskflow/tests/unit/conductor/test_conductor.py @@ -30,6 +30,7 @@ from taskflow import test from taskflow.tests import utils as test_utils from taskflow.utils import misc from taskflow.utils import persistence_utils as pu +from taskflow.utils import threading_utils @contextlib.contextmanager @@ -88,14 +89,15 @@ class SingleThreadedConductorTest(test_utils.EngineTestBase, test.TestCase): with close_many(components.conductor, components.client): t = make_thread(components.conductor) t.start() - self.assertTrue(components.conductor.stop(0.5)) + self.assertTrue( + components.conductor.stop(test_utils.WAIT_TIMEOUT)) self.assertFalse(components.conductor.dispatching) t.join() def test_run(self): components = self.make_components() components.conductor.connect() - consumed_event = threading.Event() + consumed_event = threading_utils.Event() def on_consume(state, details): consumed_event.set() @@ -110,9 +112,8 @@ class SingleThreadedConductorTest(test_utils.EngineTestBase, test.TestCase): backend=components.persistence) components.board.post('poke', lb, details={'flow_uuid': fd.uuid}) - consumed_event.wait(1.0) - self.assertTrue(consumed_event.is_set()) - self.assertTrue(components.conductor.stop(1.0)) + self.assertTrue(consumed_event.wait(test_utils.WAIT_TIMEOUT)) + self.assertTrue(components.conductor.stop(test_utils.WAIT_TIMEOUT)) self.assertFalse(components.conductor.dispatching) persistence = components.persistence @@ -125,8 +126,7 @@ class SingleThreadedConductorTest(test_utils.EngineTestBase, test.TestCase): def test_fail_run(self): components = self.make_components() components.conductor.connect() - - consumed_event = threading.Event() + consumed_event = threading_utils.Event() def on_consume(state, details): consumed_event.set() @@ -141,9 +141,8 @@ class SingleThreadedConductorTest(test_utils.EngineTestBase, test.TestCase): backend=components.persistence) components.board.post('poke', lb, details={'flow_uuid': fd.uuid}) - consumed_event.wait(1.0) - self.assertTrue(consumed_event.is_set()) - self.assertTrue(components.conductor.stop(1.0)) + self.assertTrue(consumed_event.wait(test_utils.WAIT_TIMEOUT)) + self.assertTrue(components.conductor.stop(test_utils.WAIT_TIMEOUT)) self.assertFalse(components.conductor.dispatching) persistence = components.persistence diff --git a/taskflow/tests/unit/jobs/base.py b/taskflow/tests/unit/jobs/base.py index f8a2687ea..e0a20a044 100644 --- a/taskflow/tests/unit/jobs/base.py +++ b/taskflow/tests/unit/jobs/base.py @@ -25,8 +25,10 @@ from taskflow.openstack.common import uuidutils from taskflow.persistence.backends import impl_dir from taskflow import states from taskflow.test import mock +from taskflow.tests import utils as test_utils from taskflow.utils import misc from taskflow.utils import persistence_utils as p_utils +from taskflow.utils import threading_utils FLUSH_PATH_TPL = '/taskflow/flush-test/%s' @@ -52,8 +54,8 @@ def flush(client, path=None): # before this context manager exits. if not path: path = FLUSH_PATH_TPL % uuidutils.generate_uuid() - created = threading.Event() - deleted = threading.Event() + created = threading_utils.Event() + deleted = threading_utils.Event() def on_created(data, stat): if stat is not None: @@ -67,13 +69,19 @@ def flush(client, path=None): watchers.DataWatch(client, path, func=on_created) client.create(path, makepath=True) - created.wait() + if not created.wait(test_utils.WAIT_TIMEOUT): + raise RuntimeError("Could not receive creation of %s in" + " the alloted timeout of %s seconds" + % (path, test_utils.WAIT_TIMEOUT)) try: yield finally: watchers.DataWatch(client, path, func=on_deleted) client.delete(path, recursive=True) - deleted.wait() + if not deleted.wait(test_utils.WAIT_TIMEOUT): + raise RuntimeError("Could not receive deletion of %s in" + " the alloted timeout of %s seconds" + % (path, test_utils.WAIT_TIMEOUT)) class BoardTestMixin(object): @@ -119,11 +127,13 @@ class BoardTestMixin(object): self.assertRaises(excp.NotFound, self.board.wait, timeout=0.1) def test_wait_arrival(self): - ev = threading.Event() + ev = threading_utils.Event() jobs = [] def poster(wait_post=0.2): - ev.wait() # wait until the waiter is active + if not ev.wait(test_utils.WAIT_TIMEOUT): + raise RuntimeError("Waiter did not appear ready" + " in %s seconds" % test_utils.WAIT_TIMEOUT) time.sleep(wait_post) self.board.post('test', p_utils.temporary_log_book()) diff --git a/taskflow/tests/unit/test_utils_lock_utils.py b/taskflow/tests/unit/test_utils_lock_utils.py index 066b17a22..c7fe09f11 100644 --- a/taskflow/tests/unit/test_utils_lock_utils.py +++ b/taskflow/tests/unit/test_utils_lock_utils.py @@ -22,7 +22,9 @@ from concurrent import futures import mock from taskflow import test +from taskflow.tests import utils as test_utils from taskflow.utils import lock_utils +from taskflow.utils import threading_utils # NOTE(harlowja): Sleep a little so time.time() can not be the same (which will # cause false positives when our overlap detection code runs). If there are @@ -353,7 +355,7 @@ class ReadWriteLockTest(test.TestCase): def test_double_reader_writer(self): lock = lock_utils.ReaderWriterLock() activated = collections.deque() - active = threading.Event() + active = threading_utils.Event() def double_reader(): with lock.read_lock(): @@ -369,7 +371,7 @@ class ReadWriteLockTest(test.TestCase): reader = threading.Thread(target=double_reader) reader.start() - active.wait() + self.assertTrue(active.wait(test_utils.WAIT_TIMEOUT)) writer = threading.Thread(target=happy_writer) writer.start() diff --git a/taskflow/tests/unit/worker_based/test_executor.py b/taskflow/tests/unit/worker_based/test_executor.py index 59681bb2a..3e494e88d 100644 --- a/taskflow/tests/unit/worker_based/test_executor.py +++ b/taskflow/tests/unit/worker_based/test_executor.py @@ -14,7 +14,6 @@ # License for the specific language governing permissions and limitations # under the License. -import threading import time from concurrent import futures @@ -24,15 +23,16 @@ from taskflow.engines.worker_based import executor from taskflow.engines.worker_based import protocol as pr from taskflow import test from taskflow.test import mock -from taskflow.tests import utils +from taskflow.tests import utils as test_utils from taskflow.utils import misc +from taskflow.utils import threading_utils class TestWorkerTaskExecutor(test.MockTestCase): def setUp(self): super(TestWorkerTaskExecutor, self).setUp() - self.task = utils.DummyTask() + self.task = test_utils.DummyTask() self.task_uuid = 'task-uuid' self.task_args = {'a': 'a'} self.task_result = 'task-result' @@ -42,7 +42,7 @@ class TestWorkerTaskExecutor(test.MockTestCase): self.executor_uuid = 'executor-uuid' self.executor_exchange = 'executor-exchange' self.executor_topic = 'test-topic1' - self.proxy_started_event = threading.Event() + self.proxy_started_event = threading_utils.Event() # patch classes self.proxy_mock, self.proxy_inst_mock = self.patchClass( @@ -121,7 +121,7 @@ class TestWorkerTaskExecutor(test.MockTestCase): self.assertEqual(len(ex._requests_cache), 0) expected_calls = [ mock.call.transition_and_log_error(pr.FAILURE, logger=mock.ANY), - mock.call.set_result(result=utils.FailureMatcher(failure)) + mock.call.set_result(result=test_utils.FailureMatcher(failure)) ] self.assertEqual(expected_calls, self.request_inst_mock.mock_calls) @@ -303,7 +303,7 @@ class TestWorkerTaskExecutor(test.MockTestCase): ex.start() # make sure proxy thread started - self.proxy_started_event.wait() + self.assertTrue(self.proxy_started_event.wait(test_utils.WAIT_TIMEOUT)) # stop executor ex.stop() @@ -319,7 +319,7 @@ class TestWorkerTaskExecutor(test.MockTestCase): ex.start() # make sure proxy thread started - self.proxy_started_event.wait() + self.assertTrue(self.proxy_started_event.wait(test_utils.WAIT_TIMEOUT)) # start executor again ex.start() @@ -362,14 +362,14 @@ class TestWorkerTaskExecutor(test.MockTestCase): ex.start() # make sure thread started - self.proxy_started_event.wait() + self.assertTrue(self.proxy_started_event.wait(test_utils.WAIT_TIMEOUT)) # restart executor ex.stop() ex.start() # make sure thread started - self.proxy_started_event.wait() + self.assertTrue(self.proxy_started_event.wait(test_utils.WAIT_TIMEOUT)) # stop executor ex.stop() diff --git a/taskflow/tests/unit/worker_based/test_message_pump.py b/taskflow/tests/unit/worker_based/test_message_pump.py index 008ad72c7..1fc946ed7 100644 --- a/taskflow/tests/unit/worker_based/test_message_pump.py +++ b/taskflow/tests/unit/worker_based/test_message_pump.py @@ -23,15 +23,15 @@ from taskflow import test from taskflow.test import mock from taskflow.tests import utils as test_utils from taskflow.types import latch +from taskflow.utils import threading_utils TEST_EXCHANGE, TEST_TOPIC = ('test-exchange', 'test-topic') -BARRIER_WAIT_TIMEOUT = 1.0 POLLING_INTERVAL = 0.01 class TestMessagePump(test.TestCase): def test_notify(self): - barrier = threading.Event() + barrier = threading_utils.Event() on_notify = mock.MagicMock() on_notify.side_effect = lambda *args, **kwargs: barrier.set() @@ -49,8 +49,7 @@ class TestMessagePump(test.TestCase): p.wait() p.publish(pr.Notify(), TEST_TOPIC) - barrier.wait(BARRIER_WAIT_TIMEOUT) - self.assertTrue(barrier.is_set()) + self.assertTrue(barrier.wait(test_utils.WAIT_TIMEOUT)) p.stop() t.join() @@ -58,7 +57,7 @@ class TestMessagePump(test.TestCase): on_notify.assert_called_with({}, mock.ANY) def test_response(self): - barrier = threading.Event() + barrier = threading_utils.Event() on_response = mock.MagicMock() on_response.side_effect = lambda *args, **kwargs: barrier.set() @@ -77,7 +76,7 @@ class TestMessagePump(test.TestCase): resp = pr.Response(pr.RUNNING) p.publish(resp, TEST_TOPIC) - barrier.wait(BARRIER_WAIT_TIMEOUT) + self.assertTrue(barrier.wait(test_utils.WAIT_TIMEOUT)) self.assertTrue(barrier.is_set()) p.stop() t.join() @@ -126,7 +125,7 @@ class TestMessagePump(test.TestCase): uuidutils.generate_uuid(), pr.EXECUTE, [], None, None), TEST_TOPIC) - barrier.wait(BARRIER_WAIT_TIMEOUT) + self.assertTrue(barrier.wait(test_utils.WAIT_TIMEOUT)) self.assertEqual(0, barrier.needed) p.stop() t.join() diff --git a/taskflow/tests/utils.py b/taskflow/tests/utils.py index 1662b2869..d01f91a30 100644 --- a/taskflow/tests/utils.py +++ b/taskflow/tests/utils.py @@ -16,7 +16,6 @@ import contextlib import string -import threading import six @@ -26,15 +25,18 @@ from taskflow import retry from taskflow import task from taskflow.utils import kazoo_utils from taskflow.utils import misc +from taskflow.utils import threading_utils ARGS_KEY = '__args__' KWARGS_KEY = '__kwargs__' ORDER_KEY = '__order__' - ZK_TEST_CONFIG = { 'timeout': 1.0, 'hosts': ["localhost:2181"], } +# If latches/events take longer than this to become empty/set, something is +# usually wrong and should be debugged instead of deadlocking... +WAIT_TIMEOUT = 300 @contextlib.contextmanager @@ -342,16 +344,14 @@ class WaitForOneFromTask(SaveOrderTask): self.wait_states = [wait_states] else: self.wait_states = wait_states - self.event = threading.Event() + self.event = threading_utils.Event() def execute(self): - # NOTE(imelnikov): if test was not complete within - # 5 minutes, something is terribly wrong - self.event.wait(300) - if not self.event.is_set(): - raise RuntimeError('Timeout occurred while waiting ' + if not self.event.wait(WAIT_TIMEOUT): + raise RuntimeError('%s second timeout occurred while waiting ' 'for %s to change state to %s' - % (self.wait_for, self.wait_states)) + % (WAIT_TIMEOUT, self.wait_for, + self.wait_states)) return super(WaitForOneFromTask, self).execute() def callback(self, state, details): diff --git a/taskflow/types/timing.py b/taskflow/types/timing.py index 6e2c46c80..decada4ad 100644 --- a/taskflow/types/timing.py +++ b/taskflow/types/timing.py @@ -14,10 +14,10 @@ # License for the specific language governing permissions and limitations # under the License. -import threading - from oslo.utils import timeutils +from taskflow.utils import threading_utils + class Timeout(object): """An object which represents a timeout. @@ -29,7 +29,7 @@ class Timeout(object): if timeout < 0: raise ValueError("Timeout must be >= 0 and not %s" % (timeout)) self._timeout = timeout - self._event = threading.Event() + self._event = threading_utils.Event() def interrupt(self): self._event.set() diff --git a/taskflow/utils/threading_utils.py b/taskflow/utils/threading_utils.py index 2af170239..b3749bca8 100644 --- a/taskflow/utils/threading_utils.py +++ b/taskflow/utils/threading_utils.py @@ -15,11 +15,31 @@ # under the License. import multiprocessing +import sys import threading from six.moves import _thread +if sys.version_info[0:2] == (2, 6): + # This didn't return that was/wasn't set in 2.6, since we actually care + # whether it did or didn't add that feature by taking the code from 2.7 + # that added this functionality... + # + # TODO(harlowja): remove when we can drop 2.6 support. + class Event(threading._Event): + def wait(self, timeout=None): + self.__cond.acquire() + try: + if not self.__flag: + self.__cond.wait(timeout) + return self.__flag + finally: + self.__cond.release() +else: + Event = threading.Event + + def get_ident(): """Return the 'thread identifier' of the current thread.""" return _thread.get_ident()