Merge "Allow users run the rabbitmq heartbeat inside a standard pthread."

This commit is contained in:
Zuul 2019-08-08 15:50:07 +00:00 committed by Gerrit Code Review
commit 1541b0c7f9
2 changed files with 57 additions and 1 deletions

View File

@ -33,6 +33,7 @@ import kombu.messaging
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import eventletutils
from oslo_utils import importutils
import six
import six.moves
from six.moves.urllib import parse
@ -46,6 +47,18 @@ from oslo_messaging._drivers import pool
from oslo_messaging import _utils
from oslo_messaging import exceptions
eventlet = importutils.try_import('eventlet')
if eventlet and eventletutils.is_monkey_patched("thread"):
# Here we initialize module with the native python threading module
# if it was already monkey patched by eventlet/greenlet.
stdlib_threading = eventlet.patcher.original('threading')
else:
# Manage the case where we run this driver in a non patched environment
# and where user even so configure the driver to run heartbeat through
# a python thread, if we don't do that when the heartbeat will start
# we will facing an issue by trying to override the threading module.
stdlib_threading = threading
# NOTE(sileht): don't exists in py2 socket module
TCP_USER_TIMEOUT = 18
@ -75,6 +88,15 @@ rabbit_opts = [
deprecated_name='kombu_ssl_ca_certs',
help='SSL certification authority file '
'(valid only if SSL enabled).'),
cfg.BoolOpt('heartbeat_in_pthread',
default=False,
help="EXPERIMENTAL: Run the health check heartbeat thread"
"through a native python thread. By default if this"
"option isn't provided the health check heartbeat will"
"inherit the execution model from the parent process. By"
"example if the parent process have monkey patched the"
"stdlib by using eventlet/greenlet then the heartbeat"
"will be run through a green thread."),
cfg.FloatOpt('kombu_reconnect_delay',
default=1.0,
deprecated_group='DEFAULT',
@ -442,6 +464,34 @@ class Connection(object):
driver_conf.kombu_missing_consumer_retry_timeout
self.kombu_failover_strategy = driver_conf.kombu_failover_strategy
self.kombu_compression = driver_conf.kombu_compression
self.heartbeat_in_pthread = driver_conf.heartbeat_in_pthread
if self.heartbeat_in_pthread:
# NOTE(hberaud): Experimental: threading module is in use to run
# the rabbitmq health check heartbeat. in some situation like
# with nova-api, nova need green threads to run the cells
# mechanismes in an async mode, so they used eventlet and
# greenlet to monkey patch the python stdlib and get green threads.
# The issue here is that nova-api run under the apache MPM prefork
# module and mod_wsgi. The apache prefork module doesn't support
# epoll and recent kernel features, and evenlet is built over epoll
# and libevent, so when we run the rabbitmq heartbeat we inherit
# from the execution model of the parent process (nova-api), and
# in this case we will run the heartbeat through a green thread.
# We want to allow users to choose between pthread and
# green threads if needed in some specific situations.
# This experimental feature allow user to use pthread in an env
# that doesn't support eventlet without forcing the parent process
# to stop to use eventlet if they need monkey patching for some
# specific reasons.
# If users want to use pthread we need to make sure that we
# will use the *native* threading module for
# initialize the heartbeat thread.
# Here we override globaly the previously imported
# threading module with the native python threading module
# if it was already monkey patched by eventlet/greenlet.
global threading
threading = stdlib_threading
if self.ssl:
self.ssl_version = driver_conf.ssl_version
@ -905,7 +955,7 @@ class Connection(object):
def _heartbeat_start(self):
if self._heartbeat_supported_and_enabled():
self._heartbeat_exit_event = eventletutils.Event()
self._heartbeat_exit_event = threading.Event()
self._heartbeat_thread = threading.Thread(
target=self._heartbeat_thread_job, name="Rabbit-heartbeat")
self._heartbeat_thread.daemon = True

View File

@ -89,6 +89,11 @@ class TestHeartbeat(test_utils.BaseTestCase):
info='A recoverable connection/channel error occurred, '
'trying to reconnect: %s')
def test_run_heartbeat_in_pthread(self):
self.config(heartbeat_in_pthread=True,
group="oslo_messaging_rabbit")
self._do_test_heartbeat_sent()
class TestRabbitQos(test_utils.BaseTestCase):
@ -999,6 +1004,7 @@ class RpcKombuHATestCase(test_utils.BaseTestCase):
class ConnectionLockTestCase(test_utils.BaseTestCase):
def _thread(self, lock, sleep, heartbeat=False):
def thread_task():
if heartbeat:
with lock.for_heartbeat():