WIP amqp1: add option eventloop_in_pthread
Change-Id: I6df90254c75174e899fb5d3d74fa86fc04e59823
This commit is contained in:
parent
ca939fc0e4
commit
864b85e807
|
@ -36,7 +36,6 @@ import threading
|
|||
import time
|
||||
import uuid
|
||||
|
||||
from oslo_utils import eventletutils
|
||||
import proton
|
||||
import pyngus
|
||||
|
||||
|
@ -79,7 +78,7 @@ class SubscribeTask(Task):
|
|||
self._subscriber_id = listener.id
|
||||
self._in_queue = listener.incoming
|
||||
self._service = SERVICE_NOTIFY if notifications else SERVICE_RPC
|
||||
self._wakeup = eventletutils.Event()
|
||||
self._wakeup = eventloop.Event()
|
||||
|
||||
def wait(self):
|
||||
self._wakeup.wait()
|
||||
|
@ -106,7 +105,7 @@ class SendTask(Task):
|
|||
self.service = SERVICE_NOTIFY if notification else SERVICE_RPC
|
||||
self.timer = None
|
||||
self._retry = None if retry is None or retry < 0 else retry
|
||||
self._wakeup = eventletutils.Event()
|
||||
self._wakeup = eventloop.Event()
|
||||
self._error = None
|
||||
self._sender = None
|
||||
|
||||
|
|
|
@ -35,6 +35,9 @@ import threading
|
|||
import time
|
||||
import uuid
|
||||
|
||||
from oslo_messaging import _utils
|
||||
from oslo_utils import eventletutils
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
|
@ -268,12 +271,55 @@ class Requests(object):
|
|||
r()
|
||||
|
||||
|
||||
class Thread(threading.Thread):
|
||||
_threading_module = None
|
||||
_event_module = None
|
||||
|
||||
|
||||
def initialize_threading(pthread):
|
||||
global _threading_module
|
||||
global _event_module
|
||||
if pthread:
|
||||
# We need the original, un-monkey-patched Thread class so a real
|
||||
# pthread is created. Also requires the original, un-monkey-patched
|
||||
# Event class for synchronization with the controller across threads.
|
||||
_threading_module = _utils.stdlib_threading
|
||||
_event_module = _threading_module
|
||||
else:
|
||||
# Use whatever threading environment is present. This may or may not
|
||||
# be under eventlet. The eventletutils.Event class will provide the
|
||||
# correct underlying primitive in either case.
|
||||
_threading_module = threading
|
||||
_event_module = eventletutils
|
||||
|
||||
|
||||
def Thread(*args, **kwargs):
|
||||
"""Dynamically-generate the Thread class with varying parent class depending
|
||||
on the configured threading module.
|
||||
"""
|
||||
klass = type("Thread", (_threading_module.Thread,),
|
||||
_Thread.__dict__.copy())
|
||||
return klass(*args, **kwargs)
|
||||
|
||||
|
||||
def Event():
|
||||
"""Wrapper around either threading.Event or eventletutils.Event. This is
|
||||
used by SubscribeTask and SendTask in the controller. The synchronization
|
||||
primitive needs to vary depending on which threading environment is being
|
||||
used for the eventloop.
|
||||
"""
|
||||
return _event_module.Event()
|
||||
|
||||
|
||||
class _Thread(object):
|
||||
"""Manages socket I/O and executes callables queued up by external
|
||||
threads.
|
||||
"""
|
||||
def __init__(self, container_name, node, command, pid):
|
||||
super(Thread, self).__init__()
|
||||
# This calls __init__ in whichever threading module was chosen
|
||||
# based on the eventloop_in_pthread option. _Thread objects
|
||||
# are not created directly; instead used to dynamically generate the
|
||||
# Thread type from the module-level Thread() function above
|
||||
super(type(self), self).__init__()
|
||||
|
||||
# callables from other threads:
|
||||
self._requests = Requests()
|
||||
|
|
|
@ -90,6 +90,16 @@ amqp1_opts = [
|
|||
default='',
|
||||
help='SASL realm to use if no realm present in username'),
|
||||
|
||||
cfg.BoolOpt('eventloop_in_pthread',
|
||||
default=False,
|
||||
help=("Run the eventloop thread in a native python thread. "
|
||||
"If this option is equal to False then the eventloop "
|
||||
"thread will inherit the execution model from the "
|
||||
"parent process. For example if the parent process has "
|
||||
"monkey patched the stdlib by using eventlet/greenlet "
|
||||
"then the eventloop will be run through a green "
|
||||
"thread.")),
|
||||
|
||||
# Network connection failure retry options
|
||||
|
||||
cfg.IntOpt('connection_retry_interval',
|
||||
|
|
|
@ -33,6 +33,7 @@ from oslo_utils import importutils
|
|||
from oslo_utils import timeutils
|
||||
|
||||
from oslo_messaging._drivers.amqp1_driver.eventloop import compute_timeout
|
||||
from oslo_messaging._drivers.amqp1_driver.eventloop import initialize_threading
|
||||
from oslo_messaging._drivers.amqp1_driver import opts
|
||||
from oslo_messaging._drivers import base
|
||||
from oslo_messaging._drivers import common
|
||||
|
@ -262,6 +263,8 @@ class ProtonDriver(base.BaseDriver):
|
|||
LOG.warning("Ignoring unrecognized pre_settle value(s): %s",
|
||||
" ".join(bad_opts))
|
||||
|
||||
initialize_threading(opt_name.eventloop_in_pthread)
|
||||
|
||||
def _ensure_connect_called(func):
|
||||
"""Causes a new controller to be created when the messaging service is
|
||||
first used by the current process. It is safe to push tasks to it
|
||||
|
|
|
@ -235,6 +235,10 @@ class TestAmqpSend(_AmqpBrokerTestCaseAuto):
|
|||
|
||||
driver.cleanup()
|
||||
|
||||
def test_send_no_reply_eventloop_in_pthread(self):
|
||||
self.config(eventloop_in_pthread=True, group="oslo_messaging_amqp")
|
||||
self.test_send_no_reply()
|
||||
|
||||
def test_send_exchange_with_reply(self):
|
||||
driver = amqp_driver.ProtonDriver(self.conf, self._broker_url)
|
||||
target1 = oslo_messaging.Target(topic="test-topic", exchange="e1")
|
||||
|
|
Loading…
Reference in New Issue