From 7ef2eee2c3e4eb87f32540bbf405b720430fb1fa Mon Sep 17 00:00:00 2001 From: Stan Lagun Date: Mon, 16 May 2016 16:04:26 -0700 Subject: [PATCH] RabbitMQ heartbeats for consumer threads Sometimes network connection between agent and RabbitMQ becomes corrupted without agent being aware. Usually it happen due to some networking reconfiguration. For example changes to iptables. Because consumer connection only reads from the socket it cannot detect broken connection and will wait for the message forever. This commit enables periodic heartbeats that agent sends to the server. If RabbitMQ doesn't receive heartbeats for some time it disconnects the client. On the other hand because heartbeat is a write operation it lets the client to detect broken connection as well. Change-Id: Id326f8d3b6dce04d83472837e11342efa8eb32fe Closes-Bug: #1583007 Partially-Implements: blueprint rabbitmq-heartbeats --- muranoagent/app.py | 5 -- muranoagent/cmd/run.py | 4 ++ muranoagent/common/messaging/mqclient.py | 54 ++++++++++++++++---- muranoagent/common/messaging/subscription.py | 11 ++-- 4 files changed, 55 insertions(+), 19 deletions(-) diff --git a/muranoagent/app.py b/muranoagent/app.py index 0dd67747..b12499cf 100644 --- a/muranoagent/app.py +++ b/muranoagent/app.py @@ -125,7 +125,6 @@ class MuranoAgent(service.Service): def _wait_plan(self): delay = 5 - reconnect = False while True: try: with self._create_rmq_client() as mq: @@ -140,10 +139,6 @@ class MuranoAgent(service.Service): if msg is not None: msg.ack() yield - reconnect = True - elif reconnect: - reconnect = False - break except KeyboardInterrupt: break except Exception: diff --git a/muranoagent/cmd/run.py b/muranoagent/cmd/run.py index f4775aad..a321a42e 100644 --- a/muranoagent/cmd/run.py +++ b/muranoagent/cmd/run.py @@ -28,6 +28,10 @@ if os.path.exists(os.path.join(possible_topdir, '__init__.py')): sys.path.insert(0, possible_topdir) +import eventlet +eventlet.monkey_patch() + + from oslo_config import cfg from oslo_log import log as logging from oslo_service import service diff --git a/muranoagent/common/messaging/mqclient.py b/muranoagent/common/messaging/mqclient.py index 3f253dc8..ed51a089 100644 --- a/muranoagent/common/messaging/mqclient.py +++ b/muranoagent/common/messaging/mqclient.py @@ -13,11 +13,13 @@ # See the License for the specific language governing permissions and # limitations under the License. -import anyjson +import random import ssl as ssl_module -from eventlet import patcher -kombu = patcher.import_patched('kombu') +import anyjson +import eventlet +import kombu + from subscription import Subscription @@ -38,6 +40,12 @@ class MqClient(object): 'cert_reqs': cert_reqs } + # Time interval after which RabbitMQ will disconnect client if no + # heartbeats were received. Usually client sends 2 heartbeats during + # this interval. Using random to make it less lucky that many agents + # ping RabbitMQ simultaneously + heartbeat_rate = 20 + 20 * random.random() + self._connection = kombu.Connection( 'amqp://{0}:{1}@{2}:{3}/{4}'.format( login, @@ -45,29 +53,52 @@ class MqClient(object): host, port, virtual_host - ), ssl=ssl_params + ), ssl=ssl_params, heartbeat=heartbeat_rate ) self._channel = None self._connected = False + self._exception = None def __enter__(self): self.connect() return self def __exit__(self, exc_type, exc_val, exc_tb): - self.close() + if exc_type: + self._connected = False + else: + self.close() return False def connect(self): self._connection.connect() self._channel = self._connection.channel() - self._connected = True + if not self._connected: + self._connected = True + eventlet.spawn(self._heartbeater) def close(self): - self._connection.close() - self._connected = False + if self._connected: + self._connection.close() + self._connected = False + + def _check_exception(self): + ex = self._exception + if ex: + self._exception = None + raise ex + + def _heartbeater(self): + while self._connected: + eventlet.sleep(1) + try: + self._connection.heartbeat_check() + except Exception as ex: + self._exception = ex + self._connected = False def declare(self, queue, exchange='', enable_ha=False, ttl=0): + self._check_exception() if not self._connected: raise RuntimeError('Not connected to RabbitMQ') @@ -84,12 +115,13 @@ class MqClient(object): queue_arguments['x-expires'] = ttl exchange = kombu.Exchange(exchange, type='direct', durable=True) - queue = kombu.Queue(queue, exchange, queue, durable=True, + queue = kombu.Queue(queue, exchange, queue, durable=False, queue_arguments=queue_arguments) bound_queue = queue(self._connection) bound_queue.declare() def send(self, message, key, exchange=''): + self._check_exception() if not self._connected: raise RuntimeError('Not connected to RabbitMQ') @@ -102,7 +134,9 @@ class MqClient(object): ) def open(self, queue, prefetch_count=1): + self._check_exception() if not self._connected: raise RuntimeError('Not connected to RabbitMQ') - return Subscription(self._connection, queue, prefetch_count) + return Subscription(self._connection, queue, prefetch_count, + self._check_exception) diff --git a/muranoagent/common/messaging/subscription.py b/muranoagent/common/messaging/subscription.py index c9e4d0bc..be7c9d16 100644 --- a/muranoagent/common/messaging/subscription.py +++ b/muranoagent/common/messaging/subscription.py @@ -17,19 +17,19 @@ import collections import socket import time -from eventlet import patcher -kombu = patcher.import_patched('kombu') +import kombu from muranoagent.common.messaging import message class Subscription(object): - def __init__(self, connection, queue, prefetch_count=1): + def __init__(self, connection, queue, prefetch_count, exception_func): self._buffer = collections.deque() self._connection = connection self._queue = kombu.Queue(name=queue, exchange=None) self._consumer = kombu.Consumer(self._connection, auto_declare=False) self._consumer.register_callback(self._receive) self._consumer.qos(prefetch_count=prefetch_count) + self._check_exception = exception_func def __enter__(self): self._consumer.add_queue(self._queue) @@ -37,17 +37,19 @@ class Subscription(object): return self def __exit__(self, exc_type, exc_val, exc_tb): - if self._consumer is not None: + if self._consumer is not None and not exc_type: self._consumer.cancel() return False def get_message(self, timeout=None): + self._check_exception() msg_handle = self._get(timeout=timeout) if msg_handle is None: return None return message.Message(self._connection, msg_handle) def _get(self, timeout=None): + self._check_exception() elapsed = 0.0 remaining = timeout while True: @@ -62,4 +64,5 @@ class Subscription(object): remaining = timeout and timeout - elapsed or None def _receive(self, message_data, message): + self._check_exception() self._buffer.append(message)