Enhance the start/stop concurrency race-condition fix

We want to ensure start() has really finished before the caller
invokes stop(), so the driver does not end up in a weird state.

Instead of raising RuntimeError or just printing a warning, we
now use a lock to protect the server's state changes.

stop() now simply waits for the server to finish starting before
stopping it.

This also removes the requirement to start/stop the server from
the same thread, which existed only to keep driver state
consistent when the thread or eventlet executor is used.

Change-Id: Ia7346f6cc0bbe8631a2e3318c7031b34ab1ec510
Mehdi Abaakouk
2015-09-16 14:07:52 +02:00
parent c5b7ab6f1c
commit 5a98303eee
4 changed files with 46 additions and 79 deletions


@@ -271,22 +271,10 @@ class Consumer(object):
         message.ack()
 
 
-class DummyConnectionLock(object):
-    def acquire(self):
-        pass
-
-    def release(self):
-        pass
-
+class DummyConnectionLock(_utils.DummyLock):
     def heartbeat_acquire(self):
         pass
 
-    def __enter__(self):
-        self.acquire()
-
-    def __exit__(self, type, value, traceback):
-        self.release()
-
 
 class ConnectionLock(DummyConnectionLock):
     """Lock object to protect access the the kombu connection


@@ -114,3 +114,17 @@ def fetch_current_thread_functor():
         return lambda: eventlet.getcurrent()
     else:
         return lambda: threading.current_thread()
+
+
+class DummyLock(object):
+    def acquire(self):
+        pass
+
+    def release(self):
+        pass
+
+    def __enter__(self):
+        self.acquire()
+
+    def __exit__(self, type, value, traceback):
+        self.release()
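DummyLock provides no-op acquire()/release() plus the context-manager
protocol, so call sites can use a with-statement uniformly whether they
hold a real lock or this stub. A quick illustration (assuming the
DummyLock class added above):

    lock = DummyLock()
    with lock:  # __enter__ calls acquire(), a no-op
        print("no real synchronization happens here")
    # __exit__ calls release(), also a no-op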


@@ -24,6 +24,7 @@ __all__ = [
 ]
 
 import logging
+import threading
 
 from oslo_service import service
 from stevedore import driver
@@ -92,7 +93,14 @@ class MessageHandlingServer(service.ServiceBase):
         self.dispatcher = dispatcher
         self.executor = executor
 
-        self._get_thread_id = _utils.fetch_current_thread_functor()
+        # NOTE(sileht): we use a lock to protect the state change of the
+        # server; we don't want stop to be called until the transport
+        # driver is fully started. The blocking executor is excluded
+        # because its start() does not return.
+        if self.executor != "blocking":
+            self._state_lock = threading.Lock()
+        else:
+            self._state_lock = _utils.DummyLock()
 
         try:
             mgr = driver.DriverManager('oslo.messaging.executors',
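The blocking executor is special because its start() runs the poll loop
in the caller's thread and only returns on shutdown, so holding a real
lock across start() would deadlock a stop() issued from another thread.
A hedged, self-contained illustration of that rationale (toy stand-ins,
not the executor code):

    import threading

    class DummyLock(object):
        def acquire(self):
            pass
        def release(self):
            pass
        def __enter__(self):
            self.acquire()
        def __exit__(self, exc_type, exc_val, exc_tb):
            self.release()

    state_lock = DummyLock()  # a real Lock here would deadlock below
    running = True

    def stop():
        global running
        with state_lock:  # no-op: need not wait for start() to return
            running = False

    def start_blocking():
        with state_lock:  # held for the whole (blocking) run loop
            while running:
                pass  # stand-in for processing one message

    threading.Timer(0.1, stop).start()
    start_blocking()  # returns once stop() flips the flag
    print("stopped cleanly")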
@@ -103,7 +111,6 @@ class MessageHandlingServer(service.ServiceBase):
         self._executor_cls = mgr.driver
         self._executor = None
         self._running = False
-        self._thread_id = None
 
         super(MessageHandlingServer, self).__init__()
@@ -121,8 +128,6 @@ class MessageHandlingServer(service.ServiceBase):
         choose to dispatch messages in a new thread, coroutine or simply the
         current thread.
         """
-        self._check_same_thread_id()
-
         if self._executor is not None:
             return
         try:
@@ -130,20 +135,11 @@ class MessageHandlingServer(service.ServiceBase):
         except driver_base.TransportDriverError as ex:
             raise ServerListenError(self.target, ex)
 
-        self._running = True
-        self._executor = self._executor_cls(self.conf, listener,
-                                            self.dispatcher)
-        self._executor.start()
-
-    def _check_same_thread_id(self):
-        if self._thread_id is None:
-            self._thread_id = self._get_thread_id()
-        elif self._thread_id != self._get_thread_id():
-            # NOTE(dims): Need to change this to raise RuntimeError after
-            # verifying/fixing other openstack projects (like Neutron)
-            # work ok with this change
-            LOG.warn(_LW("start/stop/wait must be called in the "
-                         "same thread"))
+        with self._state_lock:
+            self._running = True
+            self._executor = self._executor_cls(self.conf, listener,
+                                                self.dispatcher)
+            self._executor.start()
 
     def stop(self):
         """Stop handling incoming messages.
@@ -153,11 +149,10 @@ class MessageHandlingServer(service.ServiceBase):
         some messages, and underlying driver resources associated to this
         server are still in use. See 'wait' for more details.
         """
-        self._check_same_thread_id()
-
-        if self._executor is not None:
-            self._running = False
-            self._executor.stop()
+        with self._state_lock:
+            if self._executor is not None:
+                self._running = False
+                self._executor.stop()
 
     def wait(self):
         """Wait for message processing to complete.
@@ -169,25 +164,21 @@ class MessageHandlingServer(service.ServiceBase):
         Once it's finished, the underlying driver resources associated to this
         server are released (like closing useless network connections).
         """
-        self._check_same_thread_id()
-
-        if self._running:
-            # NOTE(dims): Need to change this to raise RuntimeError after
-            # verifying/fixing other openstack projects (like Neutron)
-            # work ok with this change
-            LOG.warn(_LW("wait() should be called after stop() as it "
-                         "waits for existing messages to finish "
-                         "processing"))
-
-        if self._executor is not None:
-            self._executor.wait()
-            # Close listener connection after processing all messages
-            self._executor.listener.cleanup()
-
-        self._executor = None
-        # NOTE(sileht): executor/listener have been properly stopped
-        # allow to restart it into another thread
-        self._thread_id = None
+        with self._state_lock:
+            if self._running:
+                # NOTE(dims): Need to change this to raise RuntimeError after
+                # verifying/fixing other openstack projects (like Neutron)
+                # work ok with this change
+                LOG.warn(_LW("wait() should be called after stop() as it "
+                             "waits for existing messages to finish "
+                             "processing"))
+
+            if self._executor is not None:
+                self._executor.wait()
+                # Close listener connection after processing all messages
+                self._executor.listener.cleanup()
+
+            self._executor = None
 
     def reset(self):
         """Reset service.


@@ -21,7 +21,6 @@ import testscenarios
 import mock
 import oslo_messaging
 from oslo_messaging.tests import utils as test_utils
-from six.moves import mock
 
 
 load_tests = testscenarios.load_tests_apply_scenarios
@@ -151,31 +150,6 @@ class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin):
                                     'stop() as it waits for existing '
                                     'messages to finish processing')
 
-    @mock.patch('oslo_messaging._executors.impl_pooledexecutor.'
-                'PooledExecutor.wait')
-    def test_server_invalid_stop_from_other_thread(self, mock_wait):
-        transport = oslo_messaging.get_transport(self.conf, url='fake:')
-        target = oslo_messaging.Target(topic='foo', server='bar')
-        endpoints = [object()]
-        serializer = object()
-        server = oslo_messaging.get_rpc_server(transport, target, endpoints,
-                                               serializer=serializer,
-                                               executor='eventlet')
-        t = test_utils.ServerThreadHelper(server)
-        t.start()
-        self.addCleanup(t.join)
-        self.addCleanup(t.stop)
-
-        with mock.patch('logging.Logger.warn') as warn:
-            server.stop()
-            warn.assert_called_with('start/stop/wait must be called '
-                                    'in the same thread')
-
-        with mock.patch('logging.Logger.warn') as warn:
-            server.wait()
-            warn.assert_called_with('start/stop/wait must be called '
-                                    'in the same thread')
-
     def test_no_target_server(self):
         transport = oslo_messaging.get_transport(self.conf, url='fake:')