From b9e134d7e955b9180482d2f7c8844501c750adf6 Mon Sep 17 00:00:00 2001 From: Mehdi Abaakouk Date: Wed, 21 Jan 2015 09:13:10 +0100 Subject: [PATCH] rabbit: heartbeat implementation AMQP offers a heartbeat feature to ensure that the application layer promptly finds out about disrupted connections (and also completely unresponsive peers). If the client requests heartbeats on connection, rabbit server will regularly send messages to each connections with the expectation of a response. To acheive this, each driver connection object spawn a thread that send/retrieve heartbeat packets exchanged between the server and the client. To protect the concurrency access to the kombu connection between the driver and this thread use a lock that always prioritize the heartbeat thread. So when the heartbeat thread wakes up it will acquire the lock quickly, to ensure we have no heartbeat starvation when the driver sends a lot of messages. Also when we are polling the broker, the lock can be held for a long time by the 'consume' method, so this one does the heartbeat stuffs itself. DocImpact: 2 new configuration options for Rabbit driver Co-Authored-By: Oleksii Zamiatin Co-Authored-By: Ilya Pekelny Related-Bug: #1371723 Closes-Bug: #856764 Change-Id: I1d3a635f3853bc13ffc14034468f1ac6262c11a3 --- oslo_messaging/_drivers/amqp.py | 32 ++- oslo_messaging/_drivers/amqpdriver.py | 15 +- oslo_messaging/_drivers/impl_qpid.py | 2 +- oslo_messaging/_drivers/impl_rabbit.py | 264 ++++++++++++++++-- .../tests/drivers/test_impl_qpid.py | 7 +- .../tests/drivers/test_impl_rabbit.py | 122 +++++++- tests/drivers/test_impl_qpid.py | 7 +- tests/drivers/test_impl_rabbit.py | 14 +- 8 files changed, 424 insertions(+), 39 deletions(-) diff --git a/oslo_messaging/_drivers/amqp.py b/oslo_messaging/_drivers/amqp.py index ebae514e3..0d3dc5194 100644 --- a/oslo_messaging/_drivers/amqp.py +++ b/oslo_messaging/_drivers/amqp.py @@ -55,6 +55,26 @@ amqp_opts = [ UNIQUE_ID = '_unique_id' LOG = logging.getLogger(__name__) +# NOTE(sileht): Even if rabbit/qpid have only one Connection class, +# this connection can be used for two purposes: +# * wait and receive amqp messages (only do read stuffs on the socket) +# * send messages to the broker (only do write stuffs on the socket) +# The code inside a connection class is not concurrency safe. +# Using one Connection class instance for doing both, will result +# of eventlet complaining of multiple greenthreads that read/write the +# same fd concurrently... because 'send' and 'listen' run in different +# greenthread. +# So, a connection cannot be shared between thread/greenthread and +# this two variables permit to define the purpose of the connection +# to allow drivers to add special handling if needed (like heatbeat). +# amqp drivers create 3 kind of connections: +# * driver.listen*(): each call create a new 'PURPOSE_LISTEN' connection +# * driver.send*(): a pool of 'PURPOSE_SEND' connections is used +# * driver internally have another 'PURPOSE_LISTEN' connection dedicated +# to wait replies of rpc call +PURPOSE_LISTEN = 'listen' +PURPOSE_SEND = 'send' + class ConnectionPool(pool.Pool): """Class that implements a Pool of Connections.""" @@ -66,9 +86,11 @@ class ConnectionPool(pool.Pool): self.reply_proxy = None # TODO(comstud): Timeout connections not used in a while - def create(self): + def create(self, purpose=None): + if purpose is None: + purpose = PURPOSE_SEND LOG.debug('Pool creating new connection') - return self.connection_cls(self.conf, self.url) + return self.connection_cls(self.conf, self.url, purpose) def empty(self): for item in self.iter_free(): @@ -87,16 +109,18 @@ class ConnectionContext(rpc_common.Connection): If possible the function makes sure to return a connection to the pool. """ - def __init__(self, connection_pool, pooled=True): + def __init__(self, connection_pool, purpose): """Create a new connection, or get one from the pool.""" self.connection = None self.connection_pool = connection_pool + pooled = purpose == PURPOSE_SEND if pooled: self.connection = connection_pool.get() else: # a non-pooled connection is requested, so create a new connection - self.connection = connection_pool.create() + self.connection = connection_pool.create(purpose) self.pooled = pooled + self.connection.pooled = pooled def __enter__(self): """When with ConnectionContext() is used, return self.""" diff --git a/oslo_messaging/_drivers/amqpdriver.py b/oslo_messaging/_drivers/amqpdriver.py index 9e967c4da..bbc4b7828 100644 --- a/oslo_messaging/_drivers/amqpdriver.py +++ b/oslo_messaging/_drivers/amqpdriver.py @@ -69,7 +69,8 @@ class AMQPIncomingMessage(base.IncomingMessage): # NOTE(Alexei_987) not sending reply, if msg_id is empty # because reply should not be expected by caller side return - with self.listener.driver._get_connection() as conn: + with self.listener.driver._get_connection( + rpc_amqp.PURPOSE_SEND) as conn: self._send_reply(conn, reply, failure, log_failure=log_failure) self._send_reply(conn, ending=True) @@ -268,9 +269,9 @@ class AMQPDriverBase(base.BaseDriver): def _get_exchange(self, target): return target.exchange or self._default_exchange - def _get_connection(self, pooled=True): + def _get_connection(self, purpose=rpc_amqp.PURPOSE_SEND): return rpc_amqp.ConnectionContext(self._connection_pool, - pooled=pooled) + purpose=purpose) def _get_reply_q(self): with self._reply_q_lock: @@ -279,7 +280,7 @@ class AMQPDriverBase(base.BaseDriver): reply_q = 'reply_' + uuid.uuid4().hex - conn = self._get_connection(pooled=False) + conn = self._get_connection(rpc_amqp.PURPOSE_LISTEN) self._waiter = ReplyWaiter(reply_q, conn, self._allowed_remote_exmods) @@ -320,7 +321,7 @@ class AMQPDriverBase(base.BaseDriver): self._waiter.listen(msg_id) try: - with self._get_connection() as conn: + with self._get_connection(rpc_amqp.PURPOSE_SEND) as conn: if notify: conn.notify_send(self._get_exchange(target), target.topic, msg, retry=retry) @@ -353,7 +354,7 @@ class AMQPDriverBase(base.BaseDriver): envelope=(version == 2.0), notify=True, retry=retry) def listen(self, target): - conn = self._get_connection(pooled=False) + conn = self._get_connection(rpc_amqp.PURPOSE_LISTEN) listener = AMQPListener(self, conn) @@ -369,7 +370,7 @@ class AMQPDriverBase(base.BaseDriver): return listener def listen_for_notifications(self, targets_and_priorities, pool): - conn = self._get_connection(pooled=False) + conn = self._get_connection(rpc_amqp.PURPOSE_LISTEN) listener = AMQPListener(self, conn) for target, priority in targets_and_priorities: diff --git a/oslo_messaging/_drivers/impl_qpid.py b/oslo_messaging/_drivers/impl_qpid.py index 464023054..3a2b29692 100644 --- a/oslo_messaging/_drivers/impl_qpid.py +++ b/oslo_messaging/_drivers/impl_qpid.py @@ -462,7 +462,7 @@ class Connection(object): pools = {} - def __init__(self, conf, url): + def __init__(self, conf, url, purpose): if not qpid_messaging: raise ImportError("Failed to import qpid.messaging") diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py index b06d290aa..d10ac2247 100644 --- a/oslo_messaging/_drivers/impl_rabbit.py +++ b/oslo_messaging/_drivers/impl_rabbit.py @@ -12,19 +12,20 @@ # License for the specific language governing permissions and limitations # under the License. +import contextlib import functools import itertools import logging import os import socket import ssl +import threading import time import uuid import kombu import kombu.connection import kombu.entity -import kombu.exceptions import kombu.messaging from oslo_config import cfg from oslo_utils import netutils @@ -35,6 +36,7 @@ from oslo_messaging._drivers import amqp as rpc_amqp from oslo_messaging._drivers import amqpdriver from oslo_messaging._drivers import common as rpc_common from oslo_messaging._i18n import _ +from oslo_messaging._i18n import _LE from oslo_messaging._i18n import _LI from oslo_messaging._i18n import _LW from oslo_messaging import exceptions @@ -120,6 +122,15 @@ rabbit_opts = [ help='Use HA queues in RabbitMQ (x-ha-policy: all). ' 'If you change this option, you must wipe the ' 'RabbitMQ database.'), + cfg.IntOpt('heartbeat_timeout_threshold', + default=60, + help="Number of seconds after which the Rabbit broker is " + "considered down if heartbeat's keep-alive fails " + "(0 disable the heartbeat)."), + cfg.IntOpt('heartbeat_rate', + default=2, + help='How often times during the heartbeat_timeout_threshold ' + 'we check the heartbeat.'), # NOTE(sileht): deprecated option since oslo_messaging 1.5.0, cfg.BoolOpt('fake_rabbit', @@ -460,12 +471,119 @@ class NotifyPublisher(TopicPublisher): queue.declare() +class DummyConnectionLock(object): + def acquire(self): + pass + + def release(self): + pass + + def heartbeat_acquire(self): + pass + + def __enter__(self): + self.acquire() + + def __exit__(self, type, value, traceback): + self.release() + + +class ConnectionLock(DummyConnectionLock): + """Lock object to protect access the the kombu connection + + This is a lock object to protect access the the kombu connection + object between the heartbeat thread and the driver thread. + + They are two way to acquire this lock: + * lock.acquire() + * lock.heartbeat_acquire() + + In both case lock.release(), release the lock. + + The goal is that the heartbeat thread always have the priority + for acquiring the lock. This ensures we have no heartbeat + starvation when the driver sends a lot of messages. + + So when lock.heartbeat_acquire() is called next time the lock + is released(), the caller unconditionnaly acquires + the lock, even someone else have asked for the lock before it. + """ + + def __init__(self): + self._workers_waiting = 0 + self._heartbeat_waiting = False + self._lock_acquired = None + self._monitor = threading.Lock() + self._workers_locks = threading.Condition(self._monitor) + self._heartbeat_lock = threading.Condition(self._monitor) + self._get_thread_id = self._fetch_current_thread_functor() + + def acquire(self): + with self._monitor: + while self._lock_acquired: + self._workers_waiting += 1 + self._workers_locks.wait() + self._workers_waiting -= 1 + self._lock_acquired = self._get_thread_id() + + def heartbeat_acquire(self): + # NOTE(sileht): must be called only one time + with self._monitor: + while self._lock_acquired is not None: + self._heartbeat_waiting = True + self._heartbeat_lock.wait() + self._heartbeat_waiting = False + self._lock_acquired = self._get_thread_id() + + def release(self): + with self._monitor: + if self._lock_acquired is None: + raise RuntimeError("We can't release a not acquired lock") + thread_id = self._get_thread_id() + if self._lock_acquired != thread_id: + raise RuntimeError("We can't release lock acquired by another " + "thread/greenthread; %s vs %s" % + (self._lock_acquired, thread_id)) + self._lock_acquired = None + if self._heartbeat_waiting: + self._heartbeat_lock.notify() + elif self._workers_waiting > 0: + self._workers_locks.notify() + + @contextlib.contextmanager + def for_heartbeat(self): + self.heartbeat_acquire() + try: + yield + finally: + self.release() + + @staticmethod + def _fetch_current_thread_functor(): + # Until https://github.com/eventlet/eventlet/issues/172 is resolved + # or addressed we have to use complicated workaround to get a object + # that will not be recycled; the usage of threading.current_thread() + # doesn't appear to currently be monkey patched and therefore isn't + # reliable to use (and breaks badly when used as all threads share + # the same current_thread() object)... + try: + import eventlet + from eventlet import patcher + green_threaded = patcher.is_monkey_patched('thread') + except ImportError: + green_threaded = False + if green_threaded: + return lambda: eventlet.getcurrent() + else: + return lambda: threading.current_thread() + + class Connection(object): """Connection object.""" pools = {} - def __init__(self, conf, url): + def __init__(self, conf, url, purpose): self.consumers = [] self.consumer_num = itertools.count(1) self.conf = conf @@ -527,18 +645,47 @@ class Connection(object): self.do_consume = True self._consume_loop_stopped = False - self.channel = None + + # NOTE(sileht): if purpose is PURPOSE_LISTEN + # we don't need the lock because we don't + # have a heartbeat thread + if purpose == rpc_amqp.PURPOSE_SEND: + self._connection_lock = ConnectionLock() + else: + self._connection_lock = DummyConnectionLock() + self.connection = kombu.connection.Connection( self._url, ssl=self._fetch_ssl_params(), login_method=self._login_method, - failover_strategy="shuffle") + failover_strategy="shuffle", + heartbeat=self.driver_conf.heartbeat_timeout_threshold) LOG.info(_LI('Connecting to AMQP server on %(hostname)s:%(port)d'), self.connection.info()) + + # NOTE(sileht): kombu recommend to run heartbeat_check every + # seconds, but we use a lock around the kombu connection + # so, to not lock to much this lock to most of the time do nothing + # expected waiting the events drain, we start heartbeat_check and + # retreive the server heartbeat packet only two times more than + # the minimum required for the heartbeat works + # (heatbeat_timeout/heartbeat_rate/2.0, default kombu + # heartbeat_rate is 2) + self._heartbeat_wait_timeout = ( + float(self.driver_conf.heartbeat_timeout_threshold) / + float(self.driver_conf.heartbeat_rate) / 2.0) + self._heartbeat_support_log_emitted = False + # NOTE(sileht): just ensure the connection is setuped at startup - self.ensure(error_callback=None, - method=lambda: True) + self.ensure_connection() + + # NOTE(sileht): if purpose is PURPOSE_LISTEN + # the consume code does the heartbeat stuff + # we don't need a thread + if purpose == rpc_amqp.PURPOSE_SEND: + self._heartbeat_start() + LOG.info(_LI('Connected to AMQP server on %(hostname)s:%(port)d'), self.connection.info()) @@ -602,6 +749,10 @@ class Connection(object): return ssl_params or True return False + def ensure_connection(self): + self.ensure(error_callback=None, + method=lambda: True) + def ensure(self, error_callback, method, retry=None, timeout_is_error=True): """Will retry up to retry number of times. @@ -609,6 +760,8 @@ class Connection(object): retry = -1 means to retry forever retry = 0 means no retry retry = N means N retries + + NOTE(sileht): Must be called within the connection lock """ current_pid = os.getpid() @@ -676,6 +829,7 @@ class Connection(object): recoverable_errors = (self.connection.recoverable_channel_errors + self.connection.recoverable_connection_errors) + try: autoretry_method = self.connection.autoretry( execute_method, channel=self.channel, @@ -703,12 +857,17 @@ class Connection(object): raise exceptions.MessageDeliveryFailure(msg) def _set_current_channel(self, new_channel): + """Change the channel to use. + + NOTE(sileht): Must be called within the connection lock + """ if self.channel is not None and new_channel != self.channel: self.connection.maybe_close_channel(self.channel) self.channel = new_channel def close(self): """Close/release this connection.""" + self._heartbeat_stop() if self.connection: self._set_current_channel(None) self.connection.release() @@ -716,10 +875,74 @@ class Connection(object): def reset(self): """Reset a connection so it can be used again.""" - self._set_current_channel(self.connection.channel()) + with self._connection_lock: + self._set_current_channel(self.connection.channel()) self.consumers = [] self.consumer_num = itertools.count(1) + def _heartbeat_supported_and_enabled(self): + if self.driver_conf.heartbeat_timeout_threshold <= 0: + return False + + if self.connection.supports_heartbeats: + return True + elif not self._heartbeat_support_log_emitted: + LOG.warn(_LW("Heartbeat support requested but it is not supported " + "by the kombu driver or the broker")) + self._heartbeat_support_log_emitted = True + return False + + def _heartbeat_start(self): + if self._heartbeat_supported_and_enabled(): + self._heartbeat_exit_event = threading.Event() + self._heartbeat_thread = threading.Thread( + target=self._heartbeat_thread_job) + self._heartbeat_thread.daemon = True + self._heartbeat_thread.start() + else: + self._heartbeat_thread = None + + def _heartbeat_stop(self): + if self._heartbeat_thread is not None: + self._heartbeat_exit_event.set() + self._heartbeat_thread.join() + self._heartbeat_thread = None + + def _heartbeat_thread_job(self): + """Thread that maintains inactive connections + """ + while not self._heartbeat_exit_event.is_set(): + with self._connection_lock.for_heartbeat(): + + recoverable_errors = ( + self.connection.recoverable_channel_errors + + self.connection.recoverable_connection_errors) + + try: + try: + self.connection.heartbeat_check( + rate=self.driver_conf.heartbeat_rate) + # NOTE(sileht): We need to drain event to receive + # heartbeat from the broker but don't hold the + # connection too much times. In amqpdriver a connection + # is used exclusivly for read or for write, so we have + # to do this for connection used for write drain_events + # already do that for other connection + try: + self.connection.drain_events(timeout=0.001) + except socket.timeout: + pass + except recoverable_errors as exc: + LOG.info(_LI("A recoverable connection/channel error " + "occurs, try to reconnect: %s"), exc) + except Exception: + LOG.exception(_LE("Unexpected error during heartbeart " + "thread processing, retrying...")) + + self._heartbeat_exit_event.wait( + timeout=self._heartbeat_wait_timeout) + self._heartbeat_exit_event.clear() + def declare_consumer(self, consumer_cls, topic, callback): """Create a Consumer using the class that was passed in and add it to our list of consumers @@ -736,10 +959,14 @@ class Connection(object): self.consumers.append(consumer) return consumer - return self.ensure(_connect_error, _declare_consumer) + with self._connection_lock: + return self.ensure(_connect_error, _declare_consumer) def iterconsume(self, limit=None, timeout=None): - """Return an iterator that will consume from all queues/consumers.""" + """Return an iterator that will consume from all queues/consumers. + + NOTE(sileht): Must be called within the connection lock + """ timer = rpc_common.DecayingTimer(duration=timeout) timer.start() @@ -770,6 +997,9 @@ class Connection(object): self._consume_loop_stopped = False raise StopIteration + if self._heartbeat_supported_and_enabled(): + self.connection.heartbeat_check( + rate=self.driver_conf.heartbeat_rate) try: return self.connection.drain_events(timeout=poll_timeout) except socket.timeout as exc: @@ -795,7 +1025,8 @@ class Connection(object): **kwargs) publisher.send(msg, timeout) - self.ensure(_error_callback, _publish, retry=retry) + with self._connection_lock: + self.ensure(_error_callback, _publish, retry=retry) def declare_direct_consumer(self, topic, callback): """Create a 'direct' queue. @@ -861,12 +1092,13 @@ class Connection(object): def consume(self, limit=None, timeout=None): """Consume from all queues/consumers.""" - it = self.iterconsume(limit=limit, timeout=timeout) - while True: - try: - six.next(it) - except StopIteration: - return + with self._connection_lock: + it = self.iterconsume(limit=limit, timeout=timeout) + while True: + try: + six.next(it) + except StopIteration: + return def stop_consuming(self): self._consume_loop_stopped = True diff --git a/oslo_messaging/tests/drivers/test_impl_qpid.py b/oslo_messaging/tests/drivers/test_impl_qpid.py index 2d7dd6a07..e39f72a8a 100644 --- a/oslo_messaging/tests/drivers/test_impl_qpid.py +++ b/oslo_messaging/tests/drivers/test_impl_qpid.py @@ -27,6 +27,7 @@ import testscenarios import testtools import oslo_messaging +from oslo_messaging._drivers import amqp from oslo_messaging._drivers import impl_qpid as qpid_driver from oslo_messaging.tests import utils as test_utils @@ -564,7 +565,8 @@ class TestQpidReconnectOrder(test_utils.BaseTestCase): with mock.patch('qpid.messaging.Connection') as conn_mock: # starting from the first broker in the list url = oslo_messaging.TransportURL.parse(self.conf, None) - connection = qpid_driver.Connection(self.conf, url) + connection = qpid_driver.Connection(self.conf, url, + amqp.PURPOSE_SEND) # reconnect will advance to the next broker, one broker per # attempt, and then wrap to the start of the list once the end is @@ -806,7 +808,8 @@ class QPidHATestCase(test_utils.BaseTestCase): # starting from the first broker in the list url = oslo_messaging.TransportURL.parse(self.conf, None) - self.connection = qpid_driver.Connection(self.conf, url) + self.connection = qpid_driver.Connection(self.conf, url, + amqp.PURPOSE_SEND) self.addCleanup(self.connection.close) self.info.update({'attempt': 0, diff --git a/oslo_messaging/tests/drivers/test_impl_rabbit.py b/oslo_messaging/tests/drivers/test_impl_rabbit.py index df0f3b3df..b4e691de2 100644 --- a/oslo_messaging/tests/drivers/test_impl_rabbit.py +++ b/oslo_messaging/tests/drivers/test_impl_rabbit.py @@ -21,6 +21,7 @@ import uuid import fixtures import kombu +import kombu.transport.memory import mock from oslo_config import cfg from oslo_serialization import jsonutils @@ -28,6 +29,7 @@ from oslotest import mockpatch import testscenarios import oslo_messaging +from oslo_messaging._drivers import amqp from oslo_messaging._drivers import amqpdriver from oslo_messaging._drivers import common as driver_common from oslo_messaging._drivers import impl_rabbit as rabbit_driver @@ -54,6 +56,52 @@ class TestDeprecatedRabbitDriverLoad(test_utils.BaseTestCase): self.assertEqual('memory:////', url) +class TestHeartbeat(test_utils.BaseTestCase): + + @mock.patch('oslo_messaging._drivers.impl_rabbit.LOG') + @mock.patch('kombu.connection.Connection.heartbeat_check') + @mock.patch('oslo_messaging._drivers.impl_rabbit.Connection.' + '_heartbeat_supported_and_enabled', return_value=True) + def _do_test_heartbeat_sent(self, fake_heartbeat_support, fake_heartbeat, + fake_logger, heartbeat_side_effect=None, + info=None): + + event = threading.Event() + + def heartbeat_check(rate=2): + event.set() + if heartbeat_side_effect: + raise heartbeat_side_effect + + fake_heartbeat.side_effect = heartbeat_check + + transport = oslo_messaging.get_transport(self.conf, + 'kombu+memory:////') + self.addCleanup(transport.cleanup) + conn = transport._driver._get_connection() + event.wait() + conn._heartbeat_stop() + + # check heartbeat have been called + self.assertLess(0, fake_heartbeat.call_count) + + if not heartbeat_side_effect: + self.assertEqual(2, fake_logger.info.call_count) + else: + self.assertEqual(3, fake_logger.info.call_count) + self.assertIn(mock.call(info, mock.ANY), + fake_logger.info.mock_calls) + + def test_test_heartbeat_sent_default(self): + self._do_test_heartbeat_sent() + + def test_test_heartbeat_sent_connection_fail(self): + self._do_test_heartbeat_sent( + heartbeat_side_effect=kombu.exceptions.ConnectionError, + info='A recoverable connection/channel error occurs, ' + 'try to reconnect: %s') + + class TestRabbitDriverLoad(test_utils.BaseTestCase): scenarios = [ @@ -68,6 +116,8 @@ class TestRabbitDriverLoad(test_utils.BaseTestCase): @mock.patch('oslo_messaging._drivers.impl_rabbit.Connection.ensure') @mock.patch('oslo_messaging._drivers.impl_rabbit.Connection.reset') def test_driver_load(self, fake_ensure, fake_reset): + self.config(heartbeat_timeout_threshold=0, + group='oslo_messaging_rabbit') self.messaging_conf.transport_driver = self.transport_driver transport = oslo_messaging.get_transport(self.conf) self.addCleanup(transport.cleanup) @@ -107,8 +157,8 @@ class TestRabbitDriverLoadSSL(test_utils.BaseTestCase): transport._driver._get_connection() connection_klass.assert_called_once_with( - 'memory:///', ssl=self.expected, - login_method='AMQPLAIN', failover_strategy="shuffle") + 'memory:///', ssl=self.expected, login_method='AMQPLAIN', + heartbeat=60, failover_strategy="shuffle") class TestRabbitIterconsume(test_utils.BaseTestCase): @@ -118,7 +168,7 @@ class TestRabbitIterconsume(test_utils.BaseTestCase): 'kombu+memory:////') self.addCleanup(transport.cleanup) deadline = time.time() + 3 - with transport._driver._get_connection() as conn: + with transport._driver._get_connection(amqp.PURPOSE_LISTEN) as conn: conn.iterconsume(timeout=3) # kombu memory transport doesn't really raise error # so just simulate a real driver behavior @@ -170,10 +220,12 @@ class TestRabbitTransportURL(test_utils.BaseTestCase): def setUp(self): super(TestRabbitTransportURL, self).setUp() self.messaging_conf.transport_driver = 'rabbit' + self.config(heartbeat_timeout_threshold=0, + group='oslo_messaging_rabbit') @mock.patch('oslo_messaging._drivers.impl_rabbit.Connection.ensure') @mock.patch('oslo_messaging._drivers.impl_rabbit.Connection.reset') - def test_transport_url(self, fake_ensure_connection, fake_reset): + def test_transport_url(self, fake_reset, fake_ensure): transport = oslo_messaging.get_transport(self.conf, self.url) self.addCleanup(transport.cleanup) driver = transport._driver @@ -223,6 +275,8 @@ class TestSendReceive(test_utils.BaseTestCase): cls._timeout) def test_send_receive(self): + self.config(heartbeat_timeout_threshold=0, + group="oslo_messaging_rabbit") transport = oslo_messaging.get_transport(self.conf, 'kombu+memory:////') self.addCleanup(transport.cleanup) @@ -708,6 +762,7 @@ class RpcKombuHATestCase(test_utils.BaseTestCase): rabbit_retry_interval=0.01, rabbit_retry_backoff=0.01, kombu_reconnect_delay=0, + heartbeat_timeout_threshold=0, group="oslo_messaging_rabbit") self.kombu_connect = mock.Mock() @@ -719,7 +774,8 @@ class RpcKombuHATestCase(test_utils.BaseTestCase): # starting from the first broker in the list url = oslo_messaging.TransportURL.parse(self.conf, None) - self.connection = rabbit_driver.Connection(self.conf, url) + self.connection = rabbit_driver.Connection(self.conf, url, + amqp.PURPOSE_SEND) self.addCleanup(self.connection.close) def test_ensure_four_retry(self): @@ -745,3 +801,59 @@ class RpcKombuHATestCase(test_utils.BaseTestCase): retry=0) self.assertEqual(1, self.kombu_connect.call_count) self.assertEqual(2, mock_callback.call_count) + + +class ConnectionLockTestCase(test_utils.BaseTestCase): + def _thread(self, lock, sleep, heartbeat=False): + def thread_task(): + if heartbeat: + with lock.for_heartbeat(): + time.sleep(sleep) + else: + with lock: + time.sleep(sleep) + + t = threading.Thread(target=thread_task) + t.daemon = True + t.start() + start = time.time() + + def get_elapsed_time(): + t.join() + return time.time() - start + + return get_elapsed_time + + def test_workers_only(self): + l = rabbit_driver.ConnectionLock() + t1 = self._thread(l, 1) + t2 = self._thread(l, 1) + self.assertAlmostEqual(1, t1(), places=1) + self.assertAlmostEqual(2, t2(), places=1) + + def test_worker_and_heartbeat(self): + l = rabbit_driver.ConnectionLock() + t1 = self._thread(l, 1) + t2 = self._thread(l, 1, heartbeat=True) + self.assertAlmostEqual(1, t1(), places=1) + self.assertAlmostEqual(2, t2(), places=1) + + def test_workers_and_heartbeat(self): + l = rabbit_driver.ConnectionLock() + t1 = self._thread(l, 1) + t2 = self._thread(l, 1) + t3 = self._thread(l, 1) + t4 = self._thread(l, 1, heartbeat=True) + t5 = self._thread(l, 1) + self.assertAlmostEqual(1, t1(), places=1) + self.assertAlmostEqual(2, t4(), places=1) + self.assertAlmostEqual(3, t2(), places=1) + self.assertAlmostEqual(4, t3(), places=1) + self.assertAlmostEqual(5, t5(), places=1) + + def test_heartbeat(self): + l = rabbit_driver.ConnectionLock() + t1 = self._thread(l, 1, heartbeat=True) + t2 = self._thread(l, 1) + self.assertAlmostEqual(1, t1(), places=1) + self.assertAlmostEqual(2, t2(), places=1) diff --git a/tests/drivers/test_impl_qpid.py b/tests/drivers/test_impl_qpid.py index d8cd1e7bc..2c2c0a50c 100644 --- a/tests/drivers/test_impl_qpid.py +++ b/tests/drivers/test_impl_qpid.py @@ -27,6 +27,7 @@ import testscenarios import testtools from oslo import messaging +from oslo_messaging._drivers import amqp from oslo_messaging._drivers import impl_qpid as qpid_driver from oslo_messaging.tests import utils as test_utils @@ -564,7 +565,8 @@ class TestQpidReconnectOrder(test_utils.BaseTestCase): with mock.patch('qpid.messaging.Connection') as conn_mock: # starting from the first broker in the list url = messaging.TransportURL.parse(self.conf, None) - connection = qpid_driver.Connection(self.conf, url) + connection = qpid_driver.Connection(self.conf, url, + amqp.PURPOSE_SEND) # reconnect will advance to the next broker, one broker per # attempt, and then wrap to the start of the list once the end is @@ -806,7 +808,8 @@ class QPidHATestCase(test_utils.BaseTestCase): # starting from the first broker in the list url = messaging.TransportURL.parse(self.conf, None) - self.connection = qpid_driver.Connection(self.conf, url) + self.connection = qpid_driver.Connection(self.conf, url, + amqp.PURPOSE_SEND) self.addCleanup(self.connection.close) self.info.update({'attempt': 0, diff --git a/tests/drivers/test_impl_rabbit.py b/tests/drivers/test_impl_rabbit.py index 8e9b29e43..783afd855 100644 --- a/tests/drivers/test_impl_rabbit.py +++ b/tests/drivers/test_impl_rabbit.py @@ -27,6 +27,7 @@ import testscenarios from oslo.config import cfg from oslo import messaging from oslo.serialization import jsonutils +from oslo_messaging._drivers import amqp from oslo_messaging._drivers import amqpdriver from oslo_messaging._drivers import common as driver_common from oslo_messaging._drivers import impl_rabbit as rabbit_driver @@ -44,6 +45,8 @@ class TestDeprecatedRabbitDriverLoad(test_utils.BaseTestCase): self.config(fake_rabbit=True, group="oslo_messaging_rabbit") def test_driver_load(self): + self.config(heartbeat_timeout_threshold=0, + group='oslo_messaging_rabbit') transport = messaging.get_transport(self.conf) self.addCleanup(transport.cleanup) driver = transport._driver @@ -67,6 +70,8 @@ class TestRabbitDriverLoad(test_utils.BaseTestCase): @mock.patch('oslo_messaging._drivers.impl_rabbit.Connection.ensure') @mock.patch('oslo_messaging._drivers.impl_rabbit.Connection.reset') def test_driver_load(self, fake_ensure, fake_reset): + self.config(heartbeat_timeout_threshold=0, + group='oslo_messaging_rabbit') self.messaging_conf.transport_driver = self.transport_driver transport = messaging.get_transport(self.conf) self.addCleanup(transport.cleanup) @@ -83,7 +88,7 @@ class TestRabbitIterconsume(test_utils.BaseTestCase): transport = messaging.get_transport(self.conf, 'kombu+memory:////') self.addCleanup(transport.cleanup) deadline = time.time() + 3 - with transport._driver._get_connection() as conn: + with transport._driver._get_connection(amqp.PURPOSE_LISTEN) as conn: conn.iterconsume(timeout=3) # kombu memory transport doesn't really raise error # so just simulate a real driver behavior @@ -140,6 +145,8 @@ class TestRabbitTransportURL(test_utils.BaseTestCase): def setUp(self): super(TestRabbitTransportURL, self).setUp() + self.config(heartbeat_timeout_threshold=0, + group='oslo_messaging_rabbit') self.messaging_conf.transport_driver = 'rabbit' @mock.patch('oslo_messaging._drivers.impl_rabbit.Connection.ensure') @@ -200,6 +207,8 @@ class TestSendReceive(test_utils.BaseTestCase): cls._timeout) def test_send_receive(self): + self.config(heartbeat_timeout_threshold=0, + group='oslo_messaging_rabbit') transport = messaging.get_transport(self.conf, 'kombu+memory:////') self.addCleanup(transport.cleanup) @@ -710,7 +719,8 @@ class RpcKombuHATestCase(test_utils.BaseTestCase): # starting from the first broker in the list url = messaging.TransportURL.parse(self.conf, None) - self.connection = rabbit_driver.Connection(self.conf, url) + self.connection = rabbit_driver.Connection(self.conf, url, + amqp.PURPOSE_SEND) self.addCleanup(self.connection.close) def test_ensure_four_retry(self):