Robustify locking in MessageHandlingServer
This change formalises locking in MessageHandlingServer, which closes several bugs: * It adds locking for internal state when using the blocking executor, which closes a number of races. * It does not hold a lock while executing server functions, which removes a potential cause of deadlock if the server does its own locking. * It fixes a regression introduced in change gI3cfbe1bf02d451e379b1dcc23dacb0139c03be76. If multiple threads called wait() simultaneously, only 1 of them would wait and the others would return immediately, despite message handling not having completed. With this change only 1 will call the underlying wait, but all will wait on its completion. Additionally, it introduces some new functionality: * It allows the user to make calls in any order and it will ensure, with locking, that these will be reordered appropriately. * The caller can pass a `timeout` argument to any server method, which will cause it to raise an exception if it waits too long. * The caller can pass a `log_after` argument to any server method, which will cause it to raise a log message if it waits too long. It can also be used to disable logging when waiting is intentional. We remove DummyCondition as it no longer has any users. This change was originally committed as change I9d516b208446963dcd80b75e2d5a2cecb1187efa, but was reverted as it caused a hang in a Nova test. This was caused by the locking behaviour for handling restarting a previously stopped server. The original patch caused the state to 'wrap' immediately after the user called wait(). This caused a hang in tests which redundantly called stop() and wait() multiple times. This new patch only wraps when the user calls start() again. Callers who do not restart a server will therefore not be affected by the wrapping behaviour. Callers who do restart a server will be no worse than before. We add a deprecation warning on restart, as this operation is inherently racy with this api and there is a simple, safe alternative. This new version has been successfully tested against the unit and functional tests of nova, cinder, glance, and ceilometer. Change-Id: Ic79f87e7b069c1f62d6121486fd6cafd732fdde7
This commit is contained in:
@@ -13,6 +13,8 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import eventlet
|
||||
import time
|
||||
import threading
|
||||
|
||||
from oslo_config import cfg
|
||||
@@ -20,6 +22,7 @@ import testscenarios
|
||||
|
||||
import mock
|
||||
import oslo_messaging
|
||||
from oslo_messaging import server as server_module
|
||||
from oslo_messaging.tests import utils as test_utils
|
||||
|
||||
load_tests = testscenarios.load_tests_apply_scenarios
|
||||
@@ -528,3 +531,302 @@ class TestMultipleServers(test_utils.BaseTestCase, ServerSetupMixin):
|
||||
|
||||
|
||||
TestMultipleServers.generate_scenarios()
|
||||
|
||||
class TestServerLocking(test_utils.BaseTestCase):
|
||||
def setUp(self):
|
||||
super(TestServerLocking, self).setUp(conf=cfg.ConfigOpts())
|
||||
|
||||
def _logmethod(name):
|
||||
def method(self):
|
||||
with self._lock:
|
||||
self._calls.append(name)
|
||||
return method
|
||||
|
||||
executors = []
|
||||
class FakeExecutor(object):
|
||||
def __init__(self, *args, **kwargs):
|
||||
self._lock = threading.Lock()
|
||||
self._calls = []
|
||||
self.listener = mock.MagicMock()
|
||||
executors.append(self)
|
||||
|
||||
start = _logmethod('start')
|
||||
stop = _logmethod('stop')
|
||||
wait = _logmethod('wait')
|
||||
execute = _logmethod('execute')
|
||||
self.executors = executors
|
||||
|
||||
self.server = oslo_messaging.MessageHandlingServer(mock.Mock(),
|
||||
mock.Mock())
|
||||
self.server._executor_cls = FakeExecutor
|
||||
|
||||
def test_start_stop_wait(self):
|
||||
# Test a simple execution of start, stop, wait in order
|
||||
|
||||
thread = eventlet.spawn(self.server.start)
|
||||
self.server.stop()
|
||||
self.server.wait()
|
||||
|
||||
self.assertEqual(len(self.executors), 1)
|
||||
executor = self.executors[0]
|
||||
self.assertEqual(executor._calls,
|
||||
['start', 'execute', 'stop', 'wait'])
|
||||
self.assertTrue(executor.listener.cleanup.called)
|
||||
|
||||
def test_reversed_order(self):
|
||||
# Test that if we call wait, stop, start, these will be correctly
|
||||
# reordered
|
||||
|
||||
wait = eventlet.spawn(self.server.wait)
|
||||
# This is non-deterministic, but there's not a great deal we can do
|
||||
# about that
|
||||
eventlet.sleep(0)
|
||||
|
||||
stop = eventlet.spawn(self.server.stop)
|
||||
eventlet.sleep(0)
|
||||
|
||||
start = eventlet.spawn(self.server.start)
|
||||
|
||||
self.server.wait()
|
||||
|
||||
self.assertEqual(len(self.executors), 1)
|
||||
executor = self.executors[0]
|
||||
self.assertEqual(executor._calls,
|
||||
['start', 'execute', 'stop', 'wait'])
|
||||
|
||||
def test_wait_for_running_task(self):
|
||||
# Test that if 2 threads call a method simultaneously, both will wait,
|
||||
# but only 1 will call the underlying executor method.
|
||||
|
||||
start_event = threading.Event()
|
||||
finish_event = threading.Event()
|
||||
|
||||
running_event = threading.Event()
|
||||
done_event = threading.Event()
|
||||
|
||||
runner = [None]
|
||||
class SteppingFakeExecutor(self.server._executor_cls):
|
||||
def start(self):
|
||||
# Tell the test which thread won the race
|
||||
runner[0] = eventlet.getcurrent()
|
||||
running_event.set()
|
||||
|
||||
start_event.wait()
|
||||
super(SteppingFakeExecutor, self).start()
|
||||
done_event.set()
|
||||
|
||||
finish_event.wait()
|
||||
self.server._executor_cls = SteppingFakeExecutor
|
||||
|
||||
start1 = eventlet.spawn(self.server.start)
|
||||
start2 = eventlet.spawn(self.server.start)
|
||||
|
||||
# Wait until one of the threads starts running
|
||||
running_event.wait()
|
||||
runner = runner[0]
|
||||
waiter = start2 if runner == start1 else start2
|
||||
|
||||
waiter_finished = threading.Event()
|
||||
waiter.link(lambda _: waiter_finished.set())
|
||||
|
||||
# At this point, runner is running start(), and waiter() is waiting for
|
||||
# it to complete. runner has not yet logged anything.
|
||||
self.assertEqual(1, len(self.executors))
|
||||
executor = self.executors[0]
|
||||
|
||||
self.assertEqual(executor._calls, [])
|
||||
self.assertFalse(waiter_finished.is_set())
|
||||
|
||||
# Let the runner log the call
|
||||
start_event.set()
|
||||
done_event.wait()
|
||||
|
||||
# We haven't signalled completion yet, so execute shouldn't have run
|
||||
self.assertEqual(executor._calls, ['start'])
|
||||
self.assertFalse(waiter_finished.is_set())
|
||||
|
||||
# Let the runner complete
|
||||
finish_event.set()
|
||||
waiter.wait()
|
||||
runner.wait()
|
||||
|
||||
# Check that both threads have finished, start was only called once,
|
||||
# and execute ran
|
||||
self.assertTrue(waiter_finished.is_set())
|
||||
self.assertEqual(executor._calls, ['start', 'execute'])
|
||||
|
||||
def test_start_stop_wait_stop_wait(self):
|
||||
# Test that we behave correctly when calling stop/wait more than once.
|
||||
# Subsequent calls should be noops.
|
||||
|
||||
self.server.start()
|
||||
self.server.stop()
|
||||
self.server.wait()
|
||||
self.server.stop()
|
||||
self.server.wait()
|
||||
|
||||
self.assertEqual(len(self.executors), 1)
|
||||
executor = self.executors[0]
|
||||
self.assertEqual(executor._calls,
|
||||
['start', 'execute', 'stop', 'wait'])
|
||||
self.assertTrue(executor.listener.cleanup.called)
|
||||
|
||||
def test_state_wrapping(self):
|
||||
# Test that we behave correctly if a thread waits, and the server state
|
||||
# has wrapped when it it next scheduled
|
||||
|
||||
# Ensure that if 2 threads wait for the completion of 'start', the
|
||||
# first will wait until complete_event is signalled, but the second
|
||||
# will continue
|
||||
complete_event = threading.Event()
|
||||
complete_waiting_callback = threading.Event()
|
||||
|
||||
start_state = self.server._states['start']
|
||||
old_wait_for_completion = start_state.wait_for_completion
|
||||
waited = [False]
|
||||
def new_wait_for_completion(*args, **kwargs):
|
||||
if not waited[0]:
|
||||
waited[0] = True
|
||||
complete_waiting_callback.set()
|
||||
complete_event.wait()
|
||||
old_wait_for_completion(*args, **kwargs)
|
||||
start_state.wait_for_completion = new_wait_for_completion
|
||||
|
||||
# thread1 will wait for start to complete until we signal it
|
||||
thread1 = eventlet.spawn(self.server.stop)
|
||||
thread1_finished = threading.Event()
|
||||
thread1.link(lambda _: thread1_finished.set())
|
||||
|
||||
self.server.start()
|
||||
complete_waiting_callback.wait()
|
||||
|
||||
# The server should have started, but stop should not have been called
|
||||
self.assertEqual(1, len(self.executors))
|
||||
self.assertEqual(self.executors[0]._calls, ['start', 'execute'])
|
||||
self.assertFalse(thread1_finished.is_set())
|
||||
|
||||
self.server.stop()
|
||||
self.server.wait()
|
||||
|
||||
# We should have gone through all the states, and thread1 should still
|
||||
# be waiting
|
||||
self.assertEqual(1, len(self.executors))
|
||||
self.assertEqual(self.executors[0]._calls, ['start', 'execute',
|
||||
'stop', 'wait'])
|
||||
self.assertFalse(thread1_finished.is_set())
|
||||
|
||||
# Start again
|
||||
self.server.start()
|
||||
|
||||
# We should now record 2 executors
|
||||
self.assertEqual(2, len(self.executors))
|
||||
self.assertEqual(self.executors[0]._calls, ['start', 'execute',
|
||||
'stop', 'wait'])
|
||||
self.assertEqual(self.executors[1]._calls, ['start', 'execute'])
|
||||
self.assertFalse(thread1_finished.is_set())
|
||||
|
||||
# Allow thread1 to complete
|
||||
complete_event.set()
|
||||
thread1_finished.wait()
|
||||
|
||||
# thread1 should now have finished, and stop should not have been
|
||||
# called again on either the first or second executor
|
||||
self.assertEqual(2, len(self.executors))
|
||||
self.assertEqual(self.executors[0]._calls, ['start', 'execute',
|
||||
'stop', 'wait'])
|
||||
self.assertEqual(self.executors[1]._calls, ['start', 'execute'])
|
||||
self.assertTrue(thread1_finished.is_set())
|
||||
|
||||
@mock.patch.object(server_module, 'DEFAULT_LOG_AFTER', 1)
|
||||
@mock.patch.object(server_module, 'LOG')
|
||||
def test_logging(self, mock_log):
|
||||
# Test that we generate a log message if we wait longer than
|
||||
# DEFAULT_LOG_AFTER
|
||||
|
||||
log_event = threading.Event()
|
||||
mock_log.warn.side_effect = lambda _: log_event.set()
|
||||
|
||||
# Call stop without calling start. We should log a wait after 1 second
|
||||
thread = eventlet.spawn(self.server.stop)
|
||||
log_event.wait()
|
||||
|
||||
# Redundant given that we already waited, but it's nice to assert
|
||||
self.assertTrue(mock_log.warn.called)
|
||||
thread.kill()
|
||||
|
||||
@mock.patch.object(server_module, 'LOG')
|
||||
def test_logging_explicit_wait(self, mock_log):
|
||||
# Test that we generate a log message if we wait longer than
|
||||
# the number of seconds passed to log_after
|
||||
|
||||
log_event = threading.Event()
|
||||
mock_log.warn.side_effect = lambda _: log_event.set()
|
||||
|
||||
# Call stop without calling start. We should log a wait after 1 second
|
||||
thread = eventlet.spawn(self.server.stop, log_after=1)
|
||||
log_event.wait()
|
||||
|
||||
# Redundant given that we already waited, but it's nice to assert
|
||||
self.assertTrue(mock_log.warn.called)
|
||||
thread.kill()
|
||||
|
||||
@mock.patch.object(server_module, 'LOG')
|
||||
def test_logging_with_timeout(self, mock_log):
|
||||
# Test that we log a message after log_after seconds if we've also
|
||||
# specified an absolute timeout
|
||||
|
||||
log_event = threading.Event()
|
||||
mock_log.warn.side_effect = lambda _: log_event.set()
|
||||
|
||||
# Call stop without calling start. We should log a wait after 1 second
|
||||
thread = eventlet.spawn(self.server.stop, log_after=1, timeout=2)
|
||||
log_event.wait()
|
||||
|
||||
# Redundant given that we already waited, but it's nice to assert
|
||||
self.assertTrue(mock_log.warn.called)
|
||||
thread.kill()
|
||||
|
||||
def test_timeout_wait(self):
|
||||
# Test that we will eventually timeout when passing the timeout option
|
||||
# if a preceding condition is not satisfied.
|
||||
|
||||
self.assertRaises(server_module.TaskTimeout,
|
||||
self.server.stop, timeout=1)
|
||||
|
||||
def test_timeout_running(self):
|
||||
# Test that we will eventually timeout if we're waiting for another
|
||||
# thread to complete this task
|
||||
|
||||
# Start the server, which will also instantiate an executor
|
||||
self.server.start()
|
||||
|
||||
stop_called = threading.Event()
|
||||
|
||||
# Patch the executor's stop method to be very slow
|
||||
def slow_stop():
|
||||
stop_called.set()
|
||||
eventlet.sleep(10)
|
||||
self.executors[0].stop = slow_stop
|
||||
|
||||
# Call stop in a new thread
|
||||
thread = eventlet.spawn(self.server.stop)
|
||||
|
||||
# Wait until the thread is in the slow stop method
|
||||
stop_called.wait()
|
||||
|
||||
# Call stop again in the main thread with a timeout
|
||||
self.assertRaises(server_module.TaskTimeout,
|
||||
self.server.stop, timeout=1)
|
||||
thread.kill()
|
||||
|
||||
@mock.patch.object(server_module, 'LOG')
|
||||
def test_log_after_zero(self, mock_log):
|
||||
# Test that we do not log a message after DEFAULT_LOG_AFTER if the
|
||||
# caller gave log_after=1
|
||||
|
||||
# Call stop without calling start.
|
||||
self.assertRaises(server_module.TaskTimeout,
|
||||
self.server.stop, log_after=0, timeout=2)
|
||||
|
||||
# We timed out. Ensure we didn't log anything.
|
||||
self.assertFalse(mock_log.warn.called)
|
||||
|
||||
Reference in New Issue
Block a user