diff --git a/oslo_messaging/_drivers/amqp1_driver/controller.py b/oslo_messaging/_drivers/amqp1_driver/controller.py index bba7228c9..aaab4bdf4 100644 --- a/oslo_messaging/_drivers/amqp1_driver/controller.py +++ b/oslo_messaging/_drivers/amqp1_driver/controller.py @@ -36,7 +36,6 @@ import threading import time import uuid -from oslo_utils import eventletutils import proton import pyngus @@ -79,7 +78,7 @@ class SubscribeTask(Task): self._subscriber_id = listener.id self._in_queue = listener.incoming self._service = SERVICE_NOTIFY if notifications else SERVICE_RPC - self._wakeup = eventletutils.Event() + self._wakeup = eventloop.Event() def wait(self): self._wakeup.wait() @@ -106,7 +105,7 @@ class SendTask(Task): self.service = SERVICE_NOTIFY if notification else SERVICE_RPC self.timer = None self._retry = None if retry is None or retry < 0 else retry - self._wakeup = eventletutils.Event() + self._wakeup = eventloop.Event() self._error = None self._sender = None diff --git a/oslo_messaging/_drivers/amqp1_driver/eventloop.py b/oslo_messaging/_drivers/amqp1_driver/eventloop.py index 7cfd2ea4e..851988561 100644 --- a/oslo_messaging/_drivers/amqp1_driver/eventloop.py +++ b/oslo_messaging/_drivers/amqp1_driver/eventloop.py @@ -35,6 +35,9 @@ import threading import time import uuid +from oslo_messaging import _utils +from oslo_utils import eventletutils + LOG = logging.getLogger(__name__) @@ -268,12 +271,55 @@ class Requests(object): r() -class Thread(threading.Thread): +_threading_module = None +_event_module = None + + +def initialize_threading(pthread): + global _threading_module + global _event_module + if pthread: + # We need the original, un-monkey-patched Thread class so a real + # pthread is created. Also requires the original, un-monkey-patched + # Event class for synchronization with the controller across threads. + _threading_module = _utils.stdlib_threading + _event_module = _threading_module + else: + # Use whatever threading environment is present. This may or may not + # be under eventlet. The eventletutils.Event class will provide the + # correct underlying primitive in either case. + _threading_module = threading + _event_module = eventletutils + + +def Thread(*args, **kwargs): + """Dynamically-generate the Thread class with varying parent class depending + on the configured threading module. + """ + klass = type("Thread", (_threading_module.Thread,), + _Thread.__dict__.copy()) + return klass(*args, **kwargs) + + +def Event(): + """Wrapper around either threading.Event or eventletutils.Event. This is + used by SubscribeTask and SendTask in the controller. The synchronization + primitive needs to vary depending on which threading environment is being + used for the eventloop. + """ + return _event_module.Event() + + +class _Thread(object): """Manages socket I/O and executes callables queued up by external threads. """ def __init__(self, container_name, node, command, pid): - super(Thread, self).__init__() + # This calls __init__ in whichever threading module was chosen + # based on the eventloop_in_pthread option. _Thread objects + # are not created directly; instead used to dynamically generate the + # Thread type from the module-level Thread() function above + super(type(self), self).__init__() # callables from other threads: self._requests = Requests() diff --git a/oslo_messaging/_drivers/amqp1_driver/opts.py b/oslo_messaging/_drivers/amqp1_driver/opts.py index 91655242b..a9ef435a8 100644 --- a/oslo_messaging/_drivers/amqp1_driver/opts.py +++ b/oslo_messaging/_drivers/amqp1_driver/opts.py @@ -90,6 +90,16 @@ amqp1_opts = [ default='', help='SASL realm to use if no realm present in username'), + cfg.BoolOpt('eventloop_in_pthread', + default=False, + help=("Run the eventloop thread in a native python thread. " + "If this option is equal to False then the eventloop " + "thread will inherit the execution model from the " + "parent process. For example if the parent process has " + "monkey patched the stdlib by using eventlet/greenlet " + "then the eventloop will be run through a green " + "thread.")), + # Network connection failure retry options cfg.IntOpt('connection_retry_interval', diff --git a/oslo_messaging/_drivers/impl_amqp1.py b/oslo_messaging/_drivers/impl_amqp1.py index d464742cd..bd310a77f 100644 --- a/oslo_messaging/_drivers/impl_amqp1.py +++ b/oslo_messaging/_drivers/impl_amqp1.py @@ -33,6 +33,7 @@ from oslo_utils import importutils from oslo_utils import timeutils from oslo_messaging._drivers.amqp1_driver.eventloop import compute_timeout +from oslo_messaging._drivers.amqp1_driver.eventloop import initialize_threading from oslo_messaging._drivers.amqp1_driver import opts from oslo_messaging._drivers import base from oslo_messaging._drivers import common @@ -262,6 +263,8 @@ class ProtonDriver(base.BaseDriver): LOG.warning("Ignoring unrecognized pre_settle value(s): %s", " ".join(bad_opts)) + initialize_threading(opt_name.eventloop_in_pthread) + def _ensure_connect_called(func): """Causes a new controller to be created when the messaging service is first used by the current process. It is safe to push tasks to it diff --git a/oslo_messaging/tests/drivers/test_amqp_driver.py b/oslo_messaging/tests/drivers/test_amqp_driver.py index f58a40d79..b19230f3a 100644 --- a/oslo_messaging/tests/drivers/test_amqp_driver.py +++ b/oslo_messaging/tests/drivers/test_amqp_driver.py @@ -235,6 +235,10 @@ class TestAmqpSend(_AmqpBrokerTestCaseAuto): driver.cleanup() + def test_send_no_reply_eventloop_in_pthread(self): + self.config(eventloop_in_pthread=True, group="oslo_messaging_amqp") + self.test_send_no_reply() + def test_send_exchange_with_reply(self): driver = amqp_driver.ProtonDriver(self.conf, self._broker_url) target1 = oslo_messaging.Target(topic="test-topic", exchange="e1")