Use and verify event and latch wait() return using timeouts
Instead of blocking up the whole test suite when a latch or event was not decremented to its desired value (or not set for an event) we should use a reasonably high value that we use when waiting for those actions to occur and verify that when those wait() functions return that we have reached the desired state and if not either raise an exception or stop further testing. Fixes bug 1363739 Change-Id: I8b40282ac2db9cabd48b0b65c8a2a49610d77c4f
This commit is contained in:
parent
4370fd19a5
commit
7ca631356e
@ -13,7 +13,6 @@
|
||||
# under the License.
|
||||
|
||||
import logging
|
||||
import threading
|
||||
|
||||
import six
|
||||
|
||||
@ -23,6 +22,7 @@ from taskflow.listeners import logging as logging_listener
|
||||
from taskflow.types import timing as tt
|
||||
from taskflow.utils import async_utils
|
||||
from taskflow.utils import lock_utils
|
||||
from taskflow.utils import threading_utils
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
WAIT_TIMEOUT = 0.5
|
||||
@ -64,7 +64,7 @@ class SingleThreadedConductor(base.Conductor):
|
||||
self._wait_timeout = wait_timeout
|
||||
else:
|
||||
raise ValueError("Invalid timeout literal: %s" % (wait_timeout))
|
||||
self._dead = threading.Event()
|
||||
self._dead = threading_utils.Event()
|
||||
|
||||
@lock_utils.locked
|
||||
def stop(self, timeout=None):
|
||||
@ -81,8 +81,7 @@ class SingleThreadedConductor(base.Conductor):
|
||||
be honored in the future) and False will be returned indicating this.
|
||||
"""
|
||||
self._wait_timeout.interrupt()
|
||||
self._dead.wait(timeout)
|
||||
return self._dead.is_set()
|
||||
return self._dead.wait(timeout)
|
||||
|
||||
@property
|
||||
def dispatching(self):
|
||||
|
@ -16,13 +16,13 @@
|
||||
|
||||
import logging
|
||||
import socket
|
||||
import threading
|
||||
|
||||
import kombu
|
||||
import six
|
||||
|
||||
from taskflow.engines.worker_based import dispatcher
|
||||
from taskflow.utils import misc
|
||||
from taskflow.utils import threading_utils
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
@ -39,7 +39,7 @@ class Proxy(object):
|
||||
self._topic = topic
|
||||
self._exchange_name = exchange_name
|
||||
self._on_wait = on_wait
|
||||
self._running = threading.Event()
|
||||
self._running = threading_utils.Event()
|
||||
self._dispatcher = dispatcher.TypeDispatcher(type_handlers)
|
||||
self._dispatcher.add_requeue_filter(
|
||||
# NOTE(skudriashev): Process all incoming messages only if proxy is
|
||||
|
@ -30,6 +30,7 @@ from taskflow import test
|
||||
from taskflow.tests import utils as test_utils
|
||||
from taskflow.utils import misc
|
||||
from taskflow.utils import persistence_utils as pu
|
||||
from taskflow.utils import threading_utils
|
||||
|
||||
|
||||
@contextlib.contextmanager
|
||||
@ -88,14 +89,15 @@ class SingleThreadedConductorTest(test_utils.EngineTestBase, test.TestCase):
|
||||
with close_many(components.conductor, components.client):
|
||||
t = make_thread(components.conductor)
|
||||
t.start()
|
||||
self.assertTrue(components.conductor.stop(0.5))
|
||||
self.assertTrue(
|
||||
components.conductor.stop(test_utils.WAIT_TIMEOUT))
|
||||
self.assertFalse(components.conductor.dispatching)
|
||||
t.join()
|
||||
|
||||
def test_run(self):
|
||||
components = self.make_components()
|
||||
components.conductor.connect()
|
||||
consumed_event = threading.Event()
|
||||
consumed_event = threading_utils.Event()
|
||||
|
||||
def on_consume(state, details):
|
||||
consumed_event.set()
|
||||
@ -110,9 +112,8 @@ class SingleThreadedConductorTest(test_utils.EngineTestBase, test.TestCase):
|
||||
backend=components.persistence)
|
||||
components.board.post('poke', lb,
|
||||
details={'flow_uuid': fd.uuid})
|
||||
consumed_event.wait(1.0)
|
||||
self.assertTrue(consumed_event.is_set())
|
||||
self.assertTrue(components.conductor.stop(1.0))
|
||||
self.assertTrue(consumed_event.wait(test_utils.WAIT_TIMEOUT))
|
||||
self.assertTrue(components.conductor.stop(test_utils.WAIT_TIMEOUT))
|
||||
self.assertFalse(components.conductor.dispatching)
|
||||
|
||||
persistence = components.persistence
|
||||
@ -125,8 +126,7 @@ class SingleThreadedConductorTest(test_utils.EngineTestBase, test.TestCase):
|
||||
def test_fail_run(self):
|
||||
components = self.make_components()
|
||||
components.conductor.connect()
|
||||
|
||||
consumed_event = threading.Event()
|
||||
consumed_event = threading_utils.Event()
|
||||
|
||||
def on_consume(state, details):
|
||||
consumed_event.set()
|
||||
@ -141,9 +141,8 @@ class SingleThreadedConductorTest(test_utils.EngineTestBase, test.TestCase):
|
||||
backend=components.persistence)
|
||||
components.board.post('poke', lb,
|
||||
details={'flow_uuid': fd.uuid})
|
||||
consumed_event.wait(1.0)
|
||||
self.assertTrue(consumed_event.is_set())
|
||||
self.assertTrue(components.conductor.stop(1.0))
|
||||
self.assertTrue(consumed_event.wait(test_utils.WAIT_TIMEOUT))
|
||||
self.assertTrue(components.conductor.stop(test_utils.WAIT_TIMEOUT))
|
||||
self.assertFalse(components.conductor.dispatching)
|
||||
|
||||
persistence = components.persistence
|
||||
|
@ -25,8 +25,10 @@ from taskflow.openstack.common import uuidutils
|
||||
from taskflow.persistence.backends import impl_dir
|
||||
from taskflow import states
|
||||
from taskflow.test import mock
|
||||
from taskflow.tests import utils as test_utils
|
||||
from taskflow.utils import misc
|
||||
from taskflow.utils import persistence_utils as p_utils
|
||||
from taskflow.utils import threading_utils
|
||||
|
||||
FLUSH_PATH_TPL = '/taskflow/flush-test/%s'
|
||||
|
||||
@ -52,8 +54,8 @@ def flush(client, path=None):
|
||||
# before this context manager exits.
|
||||
if not path:
|
||||
path = FLUSH_PATH_TPL % uuidutils.generate_uuid()
|
||||
created = threading.Event()
|
||||
deleted = threading.Event()
|
||||
created = threading_utils.Event()
|
||||
deleted = threading_utils.Event()
|
||||
|
||||
def on_created(data, stat):
|
||||
if stat is not None:
|
||||
@ -67,13 +69,19 @@ def flush(client, path=None):
|
||||
|
||||
watchers.DataWatch(client, path, func=on_created)
|
||||
client.create(path, makepath=True)
|
||||
created.wait()
|
||||
if not created.wait(test_utils.WAIT_TIMEOUT):
|
||||
raise RuntimeError("Could not receive creation of %s in"
|
||||
" the alloted timeout of %s seconds"
|
||||
% (path, test_utils.WAIT_TIMEOUT))
|
||||
try:
|
||||
yield
|
||||
finally:
|
||||
watchers.DataWatch(client, path, func=on_deleted)
|
||||
client.delete(path, recursive=True)
|
||||
deleted.wait()
|
||||
if not deleted.wait(test_utils.WAIT_TIMEOUT):
|
||||
raise RuntimeError("Could not receive deletion of %s in"
|
||||
" the alloted timeout of %s seconds"
|
||||
% (path, test_utils.WAIT_TIMEOUT))
|
||||
|
||||
|
||||
class BoardTestMixin(object):
|
||||
@ -119,11 +127,13 @@ class BoardTestMixin(object):
|
||||
self.assertRaises(excp.NotFound, self.board.wait, timeout=0.1)
|
||||
|
||||
def test_wait_arrival(self):
|
||||
ev = threading.Event()
|
||||
ev = threading_utils.Event()
|
||||
jobs = []
|
||||
|
||||
def poster(wait_post=0.2):
|
||||
ev.wait() # wait until the waiter is active
|
||||
if not ev.wait(test_utils.WAIT_TIMEOUT):
|
||||
raise RuntimeError("Waiter did not appear ready"
|
||||
" in %s seconds" % test_utils.WAIT_TIMEOUT)
|
||||
time.sleep(wait_post)
|
||||
self.board.post('test', p_utils.temporary_log_book())
|
||||
|
||||
|
@ -22,7 +22,9 @@ from concurrent import futures
|
||||
import mock
|
||||
|
||||
from taskflow import test
|
||||
from taskflow.tests import utils as test_utils
|
||||
from taskflow.utils import lock_utils
|
||||
from taskflow.utils import threading_utils
|
||||
|
||||
# NOTE(harlowja): Sleep a little so time.time() can not be the same (which will
|
||||
# cause false positives when our overlap detection code runs). If there are
|
||||
@ -353,7 +355,7 @@ class ReadWriteLockTest(test.TestCase):
|
||||
def test_double_reader_writer(self):
|
||||
lock = lock_utils.ReaderWriterLock()
|
||||
activated = collections.deque()
|
||||
active = threading.Event()
|
||||
active = threading_utils.Event()
|
||||
|
||||
def double_reader():
|
||||
with lock.read_lock():
|
||||
@ -369,7 +371,7 @@ class ReadWriteLockTest(test.TestCase):
|
||||
|
||||
reader = threading.Thread(target=double_reader)
|
||||
reader.start()
|
||||
active.wait()
|
||||
self.assertTrue(active.wait(test_utils.WAIT_TIMEOUT))
|
||||
|
||||
writer = threading.Thread(target=happy_writer)
|
||||
writer.start()
|
||||
|
@ -14,7 +14,6 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import threading
|
||||
import time
|
||||
|
||||
from concurrent import futures
|
||||
@ -24,15 +23,16 @@ from taskflow.engines.worker_based import executor
|
||||
from taskflow.engines.worker_based import protocol as pr
|
||||
from taskflow import test
|
||||
from taskflow.test import mock
|
||||
from taskflow.tests import utils
|
||||
from taskflow.tests import utils as test_utils
|
||||
from taskflow.utils import misc
|
||||
from taskflow.utils import threading_utils
|
||||
|
||||
|
||||
class TestWorkerTaskExecutor(test.MockTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(TestWorkerTaskExecutor, self).setUp()
|
||||
self.task = utils.DummyTask()
|
||||
self.task = test_utils.DummyTask()
|
||||
self.task_uuid = 'task-uuid'
|
||||
self.task_args = {'a': 'a'}
|
||||
self.task_result = 'task-result'
|
||||
@ -42,7 +42,7 @@ class TestWorkerTaskExecutor(test.MockTestCase):
|
||||
self.executor_uuid = 'executor-uuid'
|
||||
self.executor_exchange = 'executor-exchange'
|
||||
self.executor_topic = 'test-topic1'
|
||||
self.proxy_started_event = threading.Event()
|
||||
self.proxy_started_event = threading_utils.Event()
|
||||
|
||||
# patch classes
|
||||
self.proxy_mock, self.proxy_inst_mock = self.patchClass(
|
||||
@ -121,7 +121,7 @@ class TestWorkerTaskExecutor(test.MockTestCase):
|
||||
self.assertEqual(len(ex._requests_cache), 0)
|
||||
expected_calls = [
|
||||
mock.call.transition_and_log_error(pr.FAILURE, logger=mock.ANY),
|
||||
mock.call.set_result(result=utils.FailureMatcher(failure))
|
||||
mock.call.set_result(result=test_utils.FailureMatcher(failure))
|
||||
]
|
||||
self.assertEqual(expected_calls, self.request_inst_mock.mock_calls)
|
||||
|
||||
@ -303,7 +303,7 @@ class TestWorkerTaskExecutor(test.MockTestCase):
|
||||
ex.start()
|
||||
|
||||
# make sure proxy thread started
|
||||
self.proxy_started_event.wait()
|
||||
self.assertTrue(self.proxy_started_event.wait(test_utils.WAIT_TIMEOUT))
|
||||
|
||||
# stop executor
|
||||
ex.stop()
|
||||
@ -319,7 +319,7 @@ class TestWorkerTaskExecutor(test.MockTestCase):
|
||||
ex.start()
|
||||
|
||||
# make sure proxy thread started
|
||||
self.proxy_started_event.wait()
|
||||
self.assertTrue(self.proxy_started_event.wait(test_utils.WAIT_TIMEOUT))
|
||||
|
||||
# start executor again
|
||||
ex.start()
|
||||
@ -362,14 +362,14 @@ class TestWorkerTaskExecutor(test.MockTestCase):
|
||||
ex.start()
|
||||
|
||||
# make sure thread started
|
||||
self.proxy_started_event.wait()
|
||||
self.assertTrue(self.proxy_started_event.wait(test_utils.WAIT_TIMEOUT))
|
||||
|
||||
# restart executor
|
||||
ex.stop()
|
||||
ex.start()
|
||||
|
||||
# make sure thread started
|
||||
self.proxy_started_event.wait()
|
||||
self.assertTrue(self.proxy_started_event.wait(test_utils.WAIT_TIMEOUT))
|
||||
|
||||
# stop executor
|
||||
ex.stop()
|
||||
|
@ -23,15 +23,15 @@ from taskflow import test
|
||||
from taskflow.test import mock
|
||||
from taskflow.tests import utils as test_utils
|
||||
from taskflow.types import latch
|
||||
from taskflow.utils import threading_utils
|
||||
|
||||
TEST_EXCHANGE, TEST_TOPIC = ('test-exchange', 'test-topic')
|
||||
BARRIER_WAIT_TIMEOUT = 1.0
|
||||
POLLING_INTERVAL = 0.01
|
||||
|
||||
|
||||
class TestMessagePump(test.TestCase):
|
||||
def test_notify(self):
|
||||
barrier = threading.Event()
|
||||
barrier = threading_utils.Event()
|
||||
|
||||
on_notify = mock.MagicMock()
|
||||
on_notify.side_effect = lambda *args, **kwargs: barrier.set()
|
||||
@ -49,8 +49,7 @@ class TestMessagePump(test.TestCase):
|
||||
p.wait()
|
||||
p.publish(pr.Notify(), TEST_TOPIC)
|
||||
|
||||
barrier.wait(BARRIER_WAIT_TIMEOUT)
|
||||
self.assertTrue(barrier.is_set())
|
||||
self.assertTrue(barrier.wait(test_utils.WAIT_TIMEOUT))
|
||||
p.stop()
|
||||
t.join()
|
||||
|
||||
@ -58,7 +57,7 @@ class TestMessagePump(test.TestCase):
|
||||
on_notify.assert_called_with({}, mock.ANY)
|
||||
|
||||
def test_response(self):
|
||||
barrier = threading.Event()
|
||||
barrier = threading_utils.Event()
|
||||
|
||||
on_response = mock.MagicMock()
|
||||
on_response.side_effect = lambda *args, **kwargs: barrier.set()
|
||||
@ -77,7 +76,7 @@ class TestMessagePump(test.TestCase):
|
||||
resp = pr.Response(pr.RUNNING)
|
||||
p.publish(resp, TEST_TOPIC)
|
||||
|
||||
barrier.wait(BARRIER_WAIT_TIMEOUT)
|
||||
self.assertTrue(barrier.wait(test_utils.WAIT_TIMEOUT))
|
||||
self.assertTrue(barrier.is_set())
|
||||
p.stop()
|
||||
t.join()
|
||||
@ -126,7 +125,7 @@ class TestMessagePump(test.TestCase):
|
||||
uuidutils.generate_uuid(),
|
||||
pr.EXECUTE, [], None, None), TEST_TOPIC)
|
||||
|
||||
barrier.wait(BARRIER_WAIT_TIMEOUT)
|
||||
self.assertTrue(barrier.wait(test_utils.WAIT_TIMEOUT))
|
||||
self.assertEqual(0, barrier.needed)
|
||||
p.stop()
|
||||
t.join()
|
||||
|
@ -16,7 +16,6 @@
|
||||
|
||||
import contextlib
|
||||
import string
|
||||
import threading
|
||||
|
||||
import six
|
||||
|
||||
@ -26,15 +25,18 @@ from taskflow import retry
|
||||
from taskflow import task
|
||||
from taskflow.utils import kazoo_utils
|
||||
from taskflow.utils import misc
|
||||
from taskflow.utils import threading_utils
|
||||
|
||||
ARGS_KEY = '__args__'
|
||||
KWARGS_KEY = '__kwargs__'
|
||||
ORDER_KEY = '__order__'
|
||||
|
||||
ZK_TEST_CONFIG = {
|
||||
'timeout': 1.0,
|
||||
'hosts': ["localhost:2181"],
|
||||
}
|
||||
# If latches/events take longer than this to become empty/set, something is
|
||||
# usually wrong and should be debugged instead of deadlocking...
|
||||
WAIT_TIMEOUT = 300
|
||||
|
||||
|
||||
@contextlib.contextmanager
|
||||
@ -342,16 +344,14 @@ class WaitForOneFromTask(SaveOrderTask):
|
||||
self.wait_states = [wait_states]
|
||||
else:
|
||||
self.wait_states = wait_states
|
||||
self.event = threading.Event()
|
||||
self.event = threading_utils.Event()
|
||||
|
||||
def execute(self):
|
||||
# NOTE(imelnikov): if test was not complete within
|
||||
# 5 minutes, something is terribly wrong
|
||||
self.event.wait(300)
|
||||
if not self.event.is_set():
|
||||
raise RuntimeError('Timeout occurred while waiting '
|
||||
if not self.event.wait(WAIT_TIMEOUT):
|
||||
raise RuntimeError('%s second timeout occurred while waiting '
|
||||
'for %s to change state to %s'
|
||||
% (self.wait_for, self.wait_states))
|
||||
% (WAIT_TIMEOUT, self.wait_for,
|
||||
self.wait_states))
|
||||
return super(WaitForOneFromTask, self).execute()
|
||||
|
||||
def callback(self, state, details):
|
||||
|
@ -14,10 +14,10 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import threading
|
||||
|
||||
from oslo.utils import timeutils
|
||||
|
||||
from taskflow.utils import threading_utils
|
||||
|
||||
|
||||
class Timeout(object):
|
||||
"""An object which represents a timeout.
|
||||
@ -29,7 +29,7 @@ class Timeout(object):
|
||||
if timeout < 0:
|
||||
raise ValueError("Timeout must be >= 0 and not %s" % (timeout))
|
||||
self._timeout = timeout
|
||||
self._event = threading.Event()
|
||||
self._event = threading_utils.Event()
|
||||
|
||||
def interrupt(self):
|
||||
self._event.set()
|
||||
|
@ -15,11 +15,31 @@
|
||||
# under the License.
|
||||
|
||||
import multiprocessing
|
||||
import sys
|
||||
import threading
|
||||
|
||||
from six.moves import _thread
|
||||
|
||||
|
||||
if sys.version_info[0:2] == (2, 6):
|
||||
# This didn't return that was/wasn't set in 2.6, since we actually care
|
||||
# whether it did or didn't add that feature by taking the code from 2.7
|
||||
# that added this functionality...
|
||||
#
|
||||
# TODO(harlowja): remove when we can drop 2.6 support.
|
||||
class Event(threading._Event):
|
||||
def wait(self, timeout=None):
|
||||
self.__cond.acquire()
|
||||
try:
|
||||
if not self.__flag:
|
||||
self.__cond.wait(timeout)
|
||||
return self.__flag
|
||||
finally:
|
||||
self.__cond.release()
|
||||
else:
|
||||
Event = threading.Event
|
||||
|
||||
|
||||
def get_ident():
|
||||
"""Return the 'thread identifier' of the current thread."""
|
||||
return _thread.get_ident()
|
||||
|
Loading…
Reference in New Issue
Block a user