RabbitMQ heartbeats for consumer threads
Sometimes network connection between agent and RabbitMQ becomes corrupted without agent being aware. Usually it happen due to some networking reconfiguration. For example changes to iptables. Because consumer connection only reads from the socket it cannot detect broken connection and will wait for the message forever. This commit enables periodic heartbeats that agent sends to the server. If RabbitMQ doesn't receive heartbeats for some time it disconnects the client. On the other hand because heartbeat is a write operation it lets the client to detect broken connection as well. Change-Id: Id326f8d3b6dce04d83472837e11342efa8eb32fe Closes-Bug: #1583007 Partially-Implements: blueprint rabbitmq-heartbeats
This commit is contained in:
parent
119e00e085
commit
3f7c4fc672
@ -125,7 +125,6 @@ class MuranoAgent(service.Service):
|
||||
|
||||
def _wait_plan(self):
|
||||
delay = 5
|
||||
reconnect = False
|
||||
while True:
|
||||
try:
|
||||
with self._create_rmq_client() as mq:
|
||||
@ -140,10 +139,6 @@ class MuranoAgent(service.Service):
|
||||
if msg is not None:
|
||||
msg.ack()
|
||||
yield
|
||||
reconnect = True
|
||||
elif reconnect:
|
||||
reconnect = False
|
||||
break
|
||||
except KeyboardInterrupt:
|
||||
break
|
||||
except Exception:
|
||||
|
@ -28,6 +28,10 @@ if os.path.exists(os.path.join(possible_topdir,
|
||||
'__init__.py')):
|
||||
sys.path.insert(0, possible_topdir)
|
||||
|
||||
import eventlet
|
||||
eventlet.monkey_patch()
|
||||
|
||||
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
from oslo_service import service
|
||||
|
@ -13,11 +13,13 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import anyjson
|
||||
import random
|
||||
import ssl as ssl_module
|
||||
|
||||
from eventlet import patcher
|
||||
kombu = patcher.import_patched('kombu')
|
||||
import anyjson
|
||||
import eventlet
|
||||
import kombu
|
||||
|
||||
from subscription import Subscription
|
||||
|
||||
|
||||
@ -38,6 +40,12 @@ class MqClient(object):
|
||||
'cert_reqs': cert_reqs
|
||||
}
|
||||
|
||||
# Time interval after which RabbitMQ will disconnect client if no
|
||||
# heartbeats were received. Usually client sends 2 heartbeats during
|
||||
# this interval. Using random to make it less lucky that many agents
|
||||
# ping RabbitMQ simultaneously
|
||||
heartbeat_rate = 20 + 20 * random.random()
|
||||
|
||||
self._connection = kombu.Connection(
|
||||
'amqp://{0}:{1}@{2}:{3}/{4}'.format(
|
||||
login,
|
||||
@ -45,29 +53,52 @@ class MqClient(object):
|
||||
host,
|
||||
port,
|
||||
virtual_host
|
||||
), ssl=ssl_params
|
||||
), ssl=ssl_params, heartbeat=heartbeat_rate
|
||||
)
|
||||
self._channel = None
|
||||
self._connected = False
|
||||
self._exception = None
|
||||
|
||||
def __enter__(self):
|
||||
self.connect()
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||
self.close()
|
||||
if exc_type:
|
||||
self._connected = False
|
||||
else:
|
||||
self.close()
|
||||
return False
|
||||
|
||||
def connect(self):
|
||||
self._connection.connect()
|
||||
self._channel = self._connection.channel()
|
||||
self._connected = True
|
||||
if not self._connected:
|
||||
self._connected = True
|
||||
eventlet.spawn(self._heartbeater)
|
||||
|
||||
def close(self):
|
||||
self._connection.close()
|
||||
self._connected = False
|
||||
if self._connected:
|
||||
self._connection.close()
|
||||
self._connected = False
|
||||
|
||||
def _check_exception(self):
|
||||
ex = self._exception
|
||||
if ex:
|
||||
self._exception = None
|
||||
raise ex
|
||||
|
||||
def _heartbeater(self):
|
||||
while self._connected:
|
||||
eventlet.sleep(1)
|
||||
try:
|
||||
self._connection.heartbeat_check()
|
||||
except Exception as ex:
|
||||
self._exception = ex
|
||||
self._connected = False
|
||||
|
||||
def declare(self, queue, exchange='', enable_ha=False, ttl=0):
|
||||
self._check_exception()
|
||||
if not self._connected:
|
||||
raise RuntimeError('Not connected to RabbitMQ')
|
||||
|
||||
@ -84,12 +115,13 @@ class MqClient(object):
|
||||
queue_arguments['x-expires'] = ttl
|
||||
|
||||
exchange = kombu.Exchange(exchange, type='direct', durable=True)
|
||||
queue = kombu.Queue(queue, exchange, queue, durable=True,
|
||||
queue = kombu.Queue(queue, exchange, queue, durable=False,
|
||||
queue_arguments=queue_arguments)
|
||||
bound_queue = queue(self._connection)
|
||||
bound_queue.declare()
|
||||
|
||||
def send(self, message, key, exchange=''):
|
||||
self._check_exception()
|
||||
if not self._connected:
|
||||
raise RuntimeError('Not connected to RabbitMQ')
|
||||
|
||||
@ -102,7 +134,9 @@ class MqClient(object):
|
||||
)
|
||||
|
||||
def open(self, queue, prefetch_count=1):
|
||||
self._check_exception()
|
||||
if not self._connected:
|
||||
raise RuntimeError('Not connected to RabbitMQ')
|
||||
|
||||
return Subscription(self._connection, queue, prefetch_count)
|
||||
return Subscription(self._connection, queue, prefetch_count,
|
||||
self._check_exception)
|
||||
|
@ -17,19 +17,19 @@ import collections
|
||||
import socket
|
||||
import time
|
||||
|
||||
from eventlet import patcher
|
||||
kombu = patcher.import_patched('kombu')
|
||||
import kombu
|
||||
from muranoagent.common.messaging import message
|
||||
|
||||
|
||||
class Subscription(object):
|
||||
def __init__(self, connection, queue, prefetch_count=1):
|
||||
def __init__(self, connection, queue, prefetch_count, exception_func):
|
||||
self._buffer = collections.deque()
|
||||
self._connection = connection
|
||||
self._queue = kombu.Queue(name=queue, exchange=None)
|
||||
self._consumer = kombu.Consumer(self._connection, auto_declare=False)
|
||||
self._consumer.register_callback(self._receive)
|
||||
self._consumer.qos(prefetch_count=prefetch_count)
|
||||
self._check_exception = exception_func
|
||||
|
||||
def __enter__(self):
|
||||
self._consumer.add_queue(self._queue)
|
||||
@ -37,17 +37,19 @@ class Subscription(object):
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||
if self._consumer is not None:
|
||||
if self._consumer is not None and not exc_type:
|
||||
self._consumer.cancel()
|
||||
return False
|
||||
|
||||
def get_message(self, timeout=None):
|
||||
self._check_exception()
|
||||
msg_handle = self._get(timeout=timeout)
|
||||
if msg_handle is None:
|
||||
return None
|
||||
return message.Message(self._connection, msg_handle)
|
||||
|
||||
def _get(self, timeout=None):
|
||||
self._check_exception()
|
||||
elapsed = 0.0
|
||||
remaining = timeout
|
||||
while True:
|
||||
@ -62,4 +64,5 @@ class Subscription(object):
|
||||
remaining = timeout and timeout - elapsed or None
|
||||
|
||||
def _receive(self, message_data, message):
|
||||
self._check_exception()
|
||||
self._buffer.append(message)
|
||||
|
Loading…
Reference in New Issue
Block a user