RabbitMQ heartbeats for consumer threads

Sometimes the network connection between the agent and RabbitMQ
becomes corrupted without the agent being aware of it. Usually
this happens due to some networking reconfiguration, for example
changes to iptables. Because the consumer connection only reads from
the socket, it cannot detect a broken connection and will wait
for a message forever.

This commit enables periodic heartbeats that the agent sends to the
server. If RabbitMQ doesn't receive heartbeats for some time, it
disconnects the client. On the other hand, because a heartbeat
is a write operation, it lets the client detect a broken connection
as well.

Change-Id: Id326f8d3b6dce04d83472837e11342efa8eb32fe
Closes-Bug: #1583007
Partially-Implements: blueprint rabbitmq-heartbeats
This commit is contained in:
Stan Lagun 2016-05-16 16:04:26 -07:00
parent 2e4c3c944e
commit 7ef2eee2c3
4 changed files with 55 additions and 19 deletions

View File

@ -125,7 +125,6 @@ class MuranoAgent(service.Service):
def _wait_plan(self):
delay = 5
reconnect = False
while True:
try:
with self._create_rmq_client() as mq:
@ -140,10 +139,6 @@ class MuranoAgent(service.Service):
if msg is not None:
msg.ack()
yield
reconnect = True
elif reconnect:
reconnect = False
break
except KeyboardInterrupt:
break
except Exception:

View File

@ -28,6 +28,10 @@ if os.path.exists(os.path.join(possible_topdir,
'__init__.py')):
sys.path.insert(0, possible_topdir)
import eventlet
eventlet.monkey_patch()
from oslo_config import cfg
from oslo_log import log as logging
from oslo_service import service

View File

@ -13,11 +13,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import anyjson
import random
import ssl as ssl_module
from eventlet import patcher
kombu = patcher.import_patched('kombu')
import anyjson
import eventlet
import kombu
from subscription import Subscription
@ -38,6 +40,12 @@ class MqClient(object):
'cert_reqs': cert_reqs
}
# Time interval after which RabbitMQ will disconnect the client if no
# heartbeats were received. Usually the client sends 2 heartbeats during
# this interval. Using random to make it less likely that many agents
# ping RabbitMQ simultaneously
heartbeat_rate = 20 + 20 * random.random()
self._connection = kombu.Connection(
'amqp://{0}:{1}@{2}:{3}/{4}'.format(
login,
@ -45,29 +53,52 @@ class MqClient(object):
host,
port,
virtual_host
), ssl=ssl_params
), ssl=ssl_params, heartbeat=heartbeat_rate
)
self._channel = None
self._connected = False
self._exception = None
def __enter__(self):
self.connect()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
if exc_type:
self._connected = False
else:
self.close()
return False
def connect(self):
self._connection.connect()
self._channel = self._connection.channel()
self._connected = True
if not self._connected:
self._connected = True
eventlet.spawn(self._heartbeater)
def close(self):
self._connection.close()
self._connected = False
if self._connected:
self._connection.close()
self._connected = False
def _check_exception(self):
ex = self._exception
if ex:
self._exception = None
raise ex
def _heartbeater(self):
while self._connected:
eventlet.sleep(1)
try:
self._connection.heartbeat_check()
except Exception as ex:
self._exception = ex
self._connected = False
def declare(self, queue, exchange='', enable_ha=False, ttl=0):
self._check_exception()
if not self._connected:
raise RuntimeError('Not connected to RabbitMQ')
@ -84,12 +115,13 @@ class MqClient(object):
queue_arguments['x-expires'] = ttl
exchange = kombu.Exchange(exchange, type='direct', durable=True)
queue = kombu.Queue(queue, exchange, queue, durable=True,
queue = kombu.Queue(queue, exchange, queue, durable=False,
queue_arguments=queue_arguments)
bound_queue = queue(self._connection)
bound_queue.declare()
def send(self, message, key, exchange=''):
self._check_exception()
if not self._connected:
raise RuntimeError('Not connected to RabbitMQ')
@ -102,7 +134,9 @@ class MqClient(object):
)
def open(self, queue, prefetch_count=1):
self._check_exception()
if not self._connected:
raise RuntimeError('Not connected to RabbitMQ')
return Subscription(self._connection, queue, prefetch_count)
return Subscription(self._connection, queue, prefetch_count,
self._check_exception)

View File

@ -17,19 +17,19 @@ import collections
import socket
import time
from eventlet import patcher
kombu = patcher.import_patched('kombu')
import kombu
from muranoagent.common.messaging import message
class Subscription(object):
def __init__(self, connection, queue, prefetch_count=1):
def __init__(self, connection, queue, prefetch_count, exception_func):
self._buffer = collections.deque()
self._connection = connection
self._queue = kombu.Queue(name=queue, exchange=None)
self._consumer = kombu.Consumer(self._connection, auto_declare=False)
self._consumer.register_callback(self._receive)
self._consumer.qos(prefetch_count=prefetch_count)
self._check_exception = exception_func
def __enter__(self):
self._consumer.add_queue(self._queue)
@ -37,17 +37,19 @@ class Subscription(object):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
if self._consumer is not None:
if self._consumer is not None and not exc_type:
self._consumer.cancel()
return False
def get_message(self, timeout=None):
self._check_exception()
msg_handle = self._get(timeout=timeout)
if msg_handle is None:
return None
return message.Message(self._connection, msg_handle)
def _get(self, timeout=None):
self._check_exception()
elapsed = 0.0
remaining = timeout
while True:
@ -62,4 +64,5 @@ class Subscription(object):
remaining = timeout and timeout - elapsed or None
def _receive(self, message_data, message):
self._check_exception()
self._buffer.append(message)