Provide the executor 'wait' function a timeout and use it

To make sure that waiting can be limited by some value, and
will not block forever add and use a timeout keyword argument
to the wait method so that waiting can be limited to some time
duration.

Change-Id: I2529e7bdc2ad449ab9de935c6bc4c6ea144e4f87
This commit is contained in:
Joshua Harlow 2015-08-28 11:42:36 -07:00
parent e9a1492b72
commit 123a0371c4
4 changed files with 86 additions and 31 deletions

View File

@ -30,12 +30,18 @@ class ExecutorBase(object):
@abc.abstractmethod
def start(self):
"Start polling for incoming messages."
"""Start polling for incoming messages."""
@abc.abstractmethod
def stop(self):
"Stop polling for messages."
"""Stop polling for messages."""
@abc.abstractmethod
def wait(self):
"Wait until the executor has stopped polling."
def wait(self, timeout=None):
"""Wait until the executor has stopped polling.
If a timeout is provided, and it is not ``None`` then this method will
wait up to that amount of time for its components to finish, if not
all components finish in the alloted time, then false will be returned
otherwise true will be returned.
"""

View File

@ -26,13 +26,17 @@ class FakeBlockingThread(object):
self._target()
@staticmethod
def join():
def join(timeout=None):
pass
@staticmethod
def stop():
pass
@staticmethod
def is_alive():
return False
class BlockingExecutor(impl_pooledexecutor.PooledExecutor):
"""A message executor which blocks the current thread.

View File

@ -20,6 +20,7 @@ import threading
from futurist import waiters
from oslo_config import cfg
from oslo_utils import excutils
from oslo_utils import timeutils
from oslo_messaging._executors import base
@ -116,16 +117,32 @@ class PooledExecutor(base.ExecutorBase):
self._tombstone.set()
self.listener.stop()
def wait(self):
# TODO(harlowja): this method really needs a timeout.
if self._poller is not None:
self._tombstone.wait()
self._poller.join()
self._poller = None
if self._executor is not None:
with self._mutator:
incomplete_fs = list(self._incomplete)
self._incomplete.clear()
if incomplete_fs:
self._wait_for_all(incomplete_fs)
self._executor = None
def wait(self, timeout=None):
with timeutils.StopWatch(duration=timeout) as w:
poller = self._poller
if poller is not None:
self._tombstone.wait(w.leftover(return_none=True))
if not self._tombstone.is_set():
return False
poller.join(w.leftover(return_none=True))
if poller.is_alive():
return False
self._poller = None
executor = self._executor
if executor is not None:
with self._mutator:
incomplete_fs = list(self._incomplete)
if incomplete_fs:
(done, not_done) = self._wait_for_all(
incomplete_fs,
timeout=w.leftover(return_none=True))
with self._mutator:
for fut in done:
try:
self._incomplete.remove(fut)
except ValueError:
pass
if not_done:
return False
self._executor = None
return True

View File

@ -14,6 +14,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import time
import threading
# eventlet 0.16 with monkey patching does not work yet on Python 3,
@ -71,9 +72,9 @@ class TestExecutor(test_utils.BaseTestCase):
thread = threading.Thread(target=target, args=(executor,))
thread.daemon = True
thread.start()
thread.join(timeout=30)
return thread
def test_executor_dispatch(self):
def _create_dispatcher(self):
if impl_aioeventlet is not None:
aioeventlet_class = impl_aioeventlet.AsyncioEventletExecutor
else:
@ -110,11 +111,13 @@ class TestExecutor(test_utils.BaseTestCase):
endpoint = mock.MagicMock(return_value=simple_coroutine('result'))
event = eventlet.event.Event()
else:
def run_executor(executor):
executor.start()
executor.wait()
endpoint = mock.MagicMock(return_value='result')
event = None
class Dispatcher(object):
def __init__(self, endpoint):
@ -139,27 +142,52 @@ class TestExecutor(test_utils.BaseTestCase):
self.callback,
executor_callback)
listener = mock.Mock(spec=['poll', 'stop'])
dispatcher = Dispatcher(endpoint)
executor = self.executor(self.conf, listener, dispatcher)
return Dispatcher(endpoint), endpoint, event, run_executor
def test_slow_wait(self):
dispatcher, endpoint, event, run_executor = self._create_dispatcher()
listener = mock.Mock(spec=['poll', 'stop'])
executor = self.executor(self.conf, listener, dispatcher)
incoming_message = mock.MagicMock(ctxt={}, message={'payload': 'data'})
def fake_poll(timeout=None):
if is_aioeventlet:
if listener.poll.call_count == 1:
return incoming_message
event.wait()
time.sleep(0.1)
if listener.poll.call_count == 10:
if event is not None:
event.wait()
executor.stop()
else:
if listener.poll.call_count == 1:
return incoming_message
executor.stop()
return incoming_message
listener.poll.side_effect = fake_poll
thread = self._run_in_thread(run_executor, executor)
self.assertFalse(executor.wait(timeout=0.1))
thread.join()
self.assertTrue(executor.wait())
self._run_in_thread(run_executor, executor)
def test_dead_wait(self):
dispatcher, _endpoint, _event, _run_executor = self._create_dispatcher()
listener = mock.Mock(spec=['poll', 'stop'])
executor = self.executor(self.conf, listener, dispatcher)
executor.stop()
self.assertTrue(executor.wait())
def test_executor_dispatch(self):
dispatcher, endpoint, event, run_executor = self._create_dispatcher()
listener = mock.Mock(spec=['poll', 'stop'])
executor = self.executor(self.conf, listener, dispatcher)
incoming_message = mock.MagicMock(ctxt={}, message={'payload': 'data'})
def fake_poll(timeout=None):
if listener.poll.call_count == 1:
return incoming_message
if event is not None:
event.wait()
executor.stop()
listener.poll.side_effect = fake_poll
thread = self._run_in_thread(run_executor, executor)
thread.join()
endpoint.assert_called_once_with({}, {'payload': 'data'})
self.assertEqual(dispatcher.result, 'result')