Merge "RabbitMQ heartbeats for consumer threads"

This commit is contained in:
Jenkins 2016-05-26 14:14:37 +00:00 committed by Gerrit Code Review
commit 0dc412399f
4 changed files with 55 additions and 19 deletions

View File

@ -125,7 +125,6 @@ class MuranoAgent(service.Service):
def _wait_plan(self):
delay = 5
reconnect = False
while True:
try:
with self._create_rmq_client() as mq:
@ -140,10 +139,6 @@ class MuranoAgent(service.Service):
if msg is not None:
msg.ack()
yield
reconnect = True
elif reconnect:
reconnect = False
break
except KeyboardInterrupt:
break
except Exception:

View File

@ -28,6 +28,10 @@ if os.path.exists(os.path.join(possible_topdir,
'__init__.py')):
sys.path.insert(0, possible_topdir)
import eventlet
eventlet.monkey_patch()
from oslo_config import cfg
from oslo_log import log as logging
from oslo_service import service

View File

@ -13,11 +13,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import anyjson
import random
import ssl as ssl_module
from eventlet import patcher
kombu = patcher.import_patched('kombu')
import anyjson
import eventlet
import kombu
from subscription import Subscription
@ -38,6 +40,12 @@ class MqClient(object):
'cert_reqs': cert_reqs
}
# Time interval after which RabbitMQ will disconnect client if no
# heartbeats were received. Usually client sends 2 heartbeats during
# this interval. Using random to make it less lucky that many agents
# ping RabbitMQ simultaneously
heartbeat_rate = 20 + 20 * random.random()
self._connection = kombu.Connection(
'amqp://{0}:{1}@{2}:{3}/{4}'.format(
login,
@ -45,29 +53,52 @@ class MqClient(object):
host,
port,
virtual_host
), ssl=ssl_params
), ssl=ssl_params, heartbeat=heartbeat_rate
)
self._channel = None
self._connected = False
self._exception = None
def __enter__(self):
self.connect()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
if exc_type:
self._connected = False
else:
self.close()
return False
def connect(self):
self._connection.connect()
self._channel = self._connection.channel()
self._connected = True
if not self._connected:
self._connected = True
eventlet.spawn(self._heartbeater)
def close(self):
self._connection.close()
self._connected = False
if self._connected:
self._connection.close()
self._connected = False
def _check_exception(self):
ex = self._exception
if ex:
self._exception = None
raise ex
def _heartbeater(self):
while self._connected:
eventlet.sleep(1)
try:
self._connection.heartbeat_check()
except Exception as ex:
self._exception = ex
self._connected = False
def declare(self, queue, exchange='', enable_ha=False, ttl=0):
self._check_exception()
if not self._connected:
raise RuntimeError('Not connected to RabbitMQ')
@ -84,12 +115,13 @@ class MqClient(object):
queue_arguments['x-expires'] = ttl
exchange = kombu.Exchange(exchange, type='direct', durable=True)
queue = kombu.Queue(queue, exchange, queue, durable=True,
queue = kombu.Queue(queue, exchange, queue, durable=False,
queue_arguments=queue_arguments)
bound_queue = queue(self._connection)
bound_queue.declare()
def send(self, message, key, exchange=''):
self._check_exception()
if not self._connected:
raise RuntimeError('Not connected to RabbitMQ')
@ -102,7 +134,9 @@ class MqClient(object):
)
def open(self, queue, prefetch_count=1):
self._check_exception()
if not self._connected:
raise RuntimeError('Not connected to RabbitMQ')
return Subscription(self._connection, queue, prefetch_count)
return Subscription(self._connection, queue, prefetch_count,
self._check_exception)

View File

@ -17,19 +17,19 @@ import collections
import socket
import time
from eventlet import patcher
kombu = patcher.import_patched('kombu')
import kombu
from muranoagent.common.messaging import message
class Subscription(object):
def __init__(self, connection, queue, prefetch_count=1):
def __init__(self, connection, queue, prefetch_count, exception_func):
self._buffer = collections.deque()
self._connection = connection
self._queue = kombu.Queue(name=queue, exchange=None)
self._consumer = kombu.Consumer(self._connection, auto_declare=False)
self._consumer.register_callback(self._receive)
self._consumer.qos(prefetch_count=prefetch_count)
self._check_exception = exception_func
def __enter__(self):
self._consumer.add_queue(self._queue)
@ -37,17 +37,19 @@ class Subscription(object):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
if self._consumer is not None:
if self._consumer is not None and not exc_type:
self._consumer.cancel()
return False
def get_message(self, timeout=None):
self._check_exception()
msg_handle = self._get(timeout=timeout)
if msg_handle is None:
return None
return message.Message(self._connection, msg_handle)
def _get(self, timeout=None):
self._check_exception()
elapsed = 0.0
remaining = timeout
while True:
@ -62,4 +64,5 @@ class Subscription(object):
remaining = timeout and timeout - elapsed or None
def _receive(self, message_data, message):
self._check_exception()
self._buffer.append(message)