Fixes and improvements after testing on RabbitMQ cluster:

1) adds tcp_user_timeout parameter - timeout for unacknowledged TCP packets
2) adds host_connection_reconnect_delay parameter - delay before
reconnecting to a host after an error occurred while connecting to it.
It allows other hosts to be used while some host is disconnected
3) adds rpc_listener_ack and rpc_listener_prefetch_count properties -
enable consumer acknowledgements and set the maximum number of
unacknowledged messages
4) fixes time units (oslo.messaging uses seconds but RabbitMQ uses
milliseconds)

Change-Id: Ifd549a1eebeef27a3d36ceb6d3e8b1c76ea00b65
This commit is contained in:
Dmitriy Ukhlov 2015-11-02 16:05:59 +02:00
parent 9cae182b49
commit 968d3e6741

View File

@ -24,6 +24,7 @@ import pika_pool
import retrying
import six
import socket
import sys
import threading
import time
@ -46,30 +47,36 @@ pika_opts = [
help='Maximum number of channels to allow'),
cfg.IntOpt('frame_max', default=None,
help='The maximum byte size for an AMQP frame'),
cfg.IntOpt('heartbeat_interval', default=None,
help='How often to send heartbeats'),
cfg.IntOpt('heartbeat_interval', default=1,
help="How often to send heartbeats for consumer's connections"),
cfg.BoolOpt('ssl', default=None,
help='Enable SSL'),
cfg.DictOpt('ssl_options', default=None,
help='Arguments passed to ssl.wrap_socket'),
cfg.FloatOpt('socket_timeout', default=None,
help='Use for high latency networks'),
cfg.FloatOpt('socket_timeout', default=0.25,
help="Set socket timeout in seconds for connection's socket"),
cfg.FloatOpt('tcp_user_timeout', default=0.25,
help="Set TCP_USER_TIMEOUT in seconds for connection's "
"socket"),
cfg.FloatOpt('host_connection_reconnect_delay', default=5,
help="Set delay for reconnection to some host which has "
"connection error")
]
pika_pool_opts = [
cfg.IntOpt('pool_max_size', default=10,
help="Maximum number of connections to keep queued."),
cfg.IntOpt('pool_max_overflow', default=10,
cfg.IntOpt('pool_max_overflow', default=0,
help="Maximum number of connections to create above "
"`pool_max_size`."),
cfg.IntOpt('pool_timeout', default=30,
help="Default number of seconds to wait for a connections to "
"available"),
cfg.IntOpt('pool_recycle', default=None,
cfg.IntOpt('pool_recycle', default=600,
help="Lifetime of a connection (since creation) in seconds "
"or None for no recycling. Expired connections are "
"closed on acquire."),
cfg.IntOpt('pool_stale', default=None,
cfg.IntOpt('pool_stale', default=60,
help="Threshold at which inactive (since release) connections "
"are considered stale in seconds or None for no "
"staleness. Stale connections are closed on acquire.")
@ -87,7 +94,7 @@ notification_opts = [
"sending notification, -1 means infinite retry."
),
cfg.FloatOpt(
'notification_retry_delay', default=0.1,
'notification_retry_delay', default=0.25,
help="Reconnecting retry delay in case of connectivity problem during "
"sending notification message"
)
@ -101,23 +108,45 @@ rpc_opts = [
help="Exchange name for for sending RPC messages"),
cfg.StrOpt('rpc_reply_exchange', default="${control_exchange}_rpc_reply",
help="Exchange name for for receiving RPC replies"),
cfg.BoolOpt('rpc_listener_ack', default=True,
help="Disable to increase performance. If disabled - some "
"messages may be lost in case of connectivity problem. "
"If enabled - may cause not needed message redelivery "
"and rpc request could be processed more then one time"),
cfg.BoolOpt('rpc_reply_listener_ack', default=True,
help="Disable to increase performance. If disabled - some "
"replies may be lost in case of connectivity problem."),
cfg.IntOpt(
'rpc_reply_retry_attempts', default=3,
'rpc_listener_prefetch_count', default=10,
help="Max number of not acknowledged message which RabbitMQ can send "
"to rpc listener. Works only if rpc_listener_ack == True"
),
cfg.IntOpt(
'rpc_reply_listener_prefetch_count', default=10,
help="Max number of not acknowledged message which RabbitMQ can send "
"to rpc reply listener. Works only if rpc_reply_listener_ack == "
"True"
),
cfg.IntOpt(
'rpc_reply_retry_attempts', default=-1,
help="Reconnecting retry count in case of connectivity problem during "
"sending reply. -1 means infinite retry."
"sending reply. -1 means infinite retry during rpc_timeout"
),
cfg.FloatOpt(
'rpc_reply_retry_delay', default=0.1,
'rpc_reply_retry_delay', default=0.25,
help="Reconnecting retry delay in case of connectivity problem during "
"sending reply."
),
cfg.IntOpt(
'default_rpc_retry_attempts', default=0,
'default_rpc_retry_attempts', default=-1,
help="Reconnecting retry count in case of connectivity problem during "
"sending RPC message, -1 means infinite retry."
"sending RPC message, -1 means infinite retry. If actual "
"retry attempts in not 0 the rpc request could be processed more "
"then one time"
),
cfg.FloatOpt(
'rpc_retry_delay', default=0.1,
'rpc_retry_delay', default=0.25,
help="Reconnecting retry delay in case of connectivity problem during "
"sending RPC message"
)
@ -147,10 +176,18 @@ class ConnectionException(exceptions.MessagingException):
pass
class HostConnectionNotAllowedException(ConnectionException):
    """Connecting to a host is refused because a recent attempt failed.

    Raised when the last attempt to connect to the host was unsuccessful
    and the configured reconnect delay has not elapsed yet.
    """
class EstablishConnectionException(ConnectionException):
    """No connection could be established to any configured RabbitMQ host."""
class TimeoutConnectionException(ConnectionException):
    """A socket timeout was exceeded while working with a connection."""
class PooledConnectionWithConfirmations(pika_pool.Connection):
@property
def channel(self):
@ -161,9 +198,16 @@ class PooledConnectionWithConfirmations(pika_pool.Connection):
class PikaEngine(object):
HOST_CONNECTION_LAST_TRY_TIME = "last_try_time"
HOST_CONNECTION_LAST_SUCCESS_TRY_TIME = "last_success_try_time"
TCP_USER_TIMEOUT = 18
def __init__(self, conf, url, default_exchange=None):
self.conf = conf
# processing rpc options
self.default_rpc_exchange = (
conf.oslo_messaging_pika.default_rpc_exchange if
conf.oslo_messaging_pika.default_rpc_exchange else
@ -175,14 +219,18 @@ class PikaEngine(object):
default_exchange
)
self.default_notification_exchange = (
conf.oslo_messaging_pika.default_notification_exchange if
conf.oslo_messaging_pika.default_notification_exchange else
default_exchange
self.rpc_listener_ack = conf.oslo_messaging_pika.rpc_listener_ack
self.rpc_reply_listener_ack = (
conf.oslo_messaging_pika.rpc_reply_listener_ack
)
self.notification_persistence = (
conf.oslo_messaging_pika.notification_persistence
self.rpc_listener_prefetch_count = (
conf.oslo_messaging_pika.rpc_listener_prefetch_count
)
# The reply listener has its own prefetch option
# (rpc_reply_listener_prefetch_count); do not reuse the one that belongs
# to the regular RPC listener.
self.rpc_reply_listener_prefetch_count = (
    conf.oslo_messaging_pika.rpc_reply_listener_prefetch_count
)
self.rpc_reply_retry_attempts = (
@ -198,6 +246,17 @@ class PikaEngine(object):
raise ValueError("rpc_reply_retry_delay should be non-negative "
"integer")
# processing notification options
self.default_notification_exchange = (
conf.oslo_messaging_pika.default_notification_exchange if
conf.oslo_messaging_pika.default_notification_exchange else
default_exchange
)
self.notification_persistence = (
conf.oslo_messaging_pika.notification_persistence
)
self.default_rpc_retry_attempts = (
conf.oslo_messaging_pika.default_rpc_retry_attempts
)
@ -235,32 +294,41 @@ class PikaEngine(object):
self._reply_consumer_lock = threading.Lock()
self._puller_thread = None
self._tcp_user_timeout = self.conf.oslo_messaging_pika.tcp_user_timeout
self._host_connection_reconnect_delay = (
self.conf.oslo_messaging_pika.host_connection_reconnect_delay
)
# initializing connection parameters for configured RabbitMQ hosts
self._pika_next_connection_num = 0
common_pika_params = {
'virtual_host': url.virtual_host,
'channel_max': self.conf.oslo_messaging_pika.channel_max,
'frame_max': self.conf.oslo_messaging_pika.frame_max,
'heartbeat_interval':
self.conf.oslo_messaging_pika.heartbeat_interval,
'ssl': self.conf.oslo_messaging_pika.ssl,
'ssl_options': self.conf.oslo_messaging_pika.ssl_options,
'socket_timeout': self.conf.oslo_messaging_pika.socket_timeout,
}
self._pika_params_list = []
self._create_connection_lock = threading.Lock()
self._connection_lock = threading.Lock()
self._connection_host_param_list = []
self._connection_host_status_list = []
self._next_connection_host_num = 0
for transport_host in url.hosts:
pika_params = pika.ConnectionParameters(
pika_params = common_pika_params.copy()
pika_params.update(
host=transport_host.hostname,
port=transport_host.port,
credentials=pika_credentials.PlainCredentials(
transport_host.username, transport_host.password
),
**common_pika_params
)
self._pika_params_list.append(pika_params)
self._connection_host_param_list.append(pika_params)
self._connection_host_status_list.append({
self.HOST_CONNECTION_LAST_TRY_TIME: 0,
self.HOST_CONNECTION_LAST_SUCCESS_TRY_TIME: 0
})
# initializing 2 connection pools: 1st for connections without
# confirmations, 2nd - with confirmations
@ -286,41 +354,125 @@ class PikaEngine(object):
PooledConnectionWithConfirmations
)
def create_connection(self):
def _next_connection_num(self):
    """Return the index of the host to try next and advance the cursor.

    The cursor wraps around the list of configured host parameters, so
    successive calls walk over the hosts in round-robin order. The update
    is performed under ``self._connection_lock``.

    :return: index into ``self._connection_host_param_list``
    """
    with self._connection_lock:
        selected = self._next_connection_host_num
        # Move the shared cursor forward, wrapping at the end of the list.
        self._next_connection_host_num = selected + 1
        self._next_connection_host_num %= len(
            self._connection_host_param_list
        )
    return selected
def create_connection(self, for_listening=False):
"""Create and return connection to any available host.
:return: created connection
:raise: ConnectionException if all hosts are not reachable
"""
host_num = len(self._pika_params_list)
connection_attempts = host_num
host_count = len(self._connection_host_param_list)
connection_attempts = host_count
pika_next_connection_num = self._next_connection_num()
while connection_attempts > 0:
with self._create_connection_lock:
try:
return self.create_host_connection(
self._pika_next_connection_num
pika_next_connection_num, for_listening
)
except pika_pool.Connection.connectivity_errors as e:
LOG.warn(str(e))
except HostConnectionNotAllowedException as e:
LOG.warn(str(e))
connection_attempts -= 1
continue
finally:
self._pika_next_connection_num += 1
self._pika_next_connection_num %= host_num
pika_next_connection_num += 1
pika_next_connection_num %= host_count
raise EstablishConnectionException(
"Can not establish connection to any configured RabbitMQ host: " +
str(self._pika_params_list)
str(self._connection_host_param_list)
)
def create_host_connection(self, host_index):
def _set_tcp_user_timeout(self, s):
    """Apply the configured TCP_USER_TIMEOUT to socket ``s``.

    Nothing is done when ``self._tcp_user_timeout`` is falsy. The
    configured value is multiplied by 1000 and truncated to an integer
    before being passed to ``setsockopt``. A ``socket.error`` raised by
    ``setsockopt`` is logged as a warning and otherwise ignored.

    :param s: socket object to configure
    """
    timeout_seconds = self._tcp_user_timeout
    if timeout_seconds:
        timeout_millis = int(timeout_seconds * 1000)
        try:
            s.setsockopt(
                socket.IPPROTO_TCP, self.TCP_USER_TIMEOUT, timeout_millis
            )
        except socket.error:
            LOG.warn(
                "Whoops, this kernel doesn't seem to support TCP_USER_TIMEOUT."
            )
def create_host_connection(self, host_index, for_listening=False):
"""Create new connection to host #host_index
:return: New connection
"""
return pika_adapters.BlockingConnection(
self._pika_params_list[host_index]
with self._connection_lock:
cur_time = time.time()
last_success_time = self._connection_host_status_list[host_index][
self.HOST_CONNECTION_LAST_SUCCESS_TRY_TIME
]
last_time = self._connection_host_status_list[host_index][
self.HOST_CONNECTION_LAST_TRY_TIME
]
if (last_time != last_success_time and
cur_time - last_time <
self._host_connection_reconnect_delay):
raise HostConnectionNotAllowedException(
"Connection to host #{} is not allowed now because of "
"previous failure".format(host_index)
)
try:
base_host_params = self._connection_host_param_list[host_index]
connection = pika_adapters.BlockingConnection(
pika.ConnectionParameters(
heartbeat_interval=(
self.conf.oslo_messaging_pika.heartbeat_interval
if for_listening else None
),
**base_host_params
)
)
self._set_tcp_user_timeout(connection._impl.socket)
self._connection_host_status_list[host_index][
self.HOST_CONNECTION_LAST_SUCCESS_TRY_TIME
] = cur_time
return connection
finally:
self._connection_host_status_list[host_index][
self.HOST_CONNECTION_LAST_TRY_TIME
] = cur_time
@staticmethod
def declare_queue_binding_by_channel(channel, exchange, queue, routing_key,
                                     exchange_type, queue_expiration,
                                     queue_auto_delete, durable):
    """Declare an exchange and a queue on ``channel`` and bind them.

    :param channel: channel object providing ``exchange_declare``,
        ``queue_declare`` and ``queue_bind``
    :param exchange: name of the exchange to declare
    :param queue: name of the queue to declare
    :param routing_key: routing key used for the binding
    :param exchange_type: type of the exchange to declare
    :param queue_expiration: queue expiration; a positive value is
        multiplied by 1000 and sent as the ``x-expires`` queue argument.
        ``None``, ``False``, zero or a negative value mean that no
        ``x-expires`` argument is set
    :param queue_auto_delete: ``auto_delete`` flag for the queue
    :param durable: ``durable`` flag for both the exchange and the queue
    """
    channel.exchange_declare(
        exchange, exchange_type, auto_delete=True, durable=durable
    )
    arguments = {}
    # Callers may pass None for "no expiration"; comparing None with an
    # integer raises TypeError on Python 3, so test truthiness first.
    if queue_expiration and queue_expiration > 0:
        arguments['x-expires'] = queue_expiration * 1000
    channel.queue_declare(
        queue, auto_delete=queue_auto_delete, durable=durable,
        arguments=arguments
    )
    channel.queue_bind(queue, exchange, routing_key)
def declare_queue_binding(self, exchange, queue, routing_key,
exchange_type, queue_expiration,
queue_auto_delete, durable,
@ -331,20 +483,10 @@ class PikaEngine(object):
)
try:
with self.connection_pool.acquire(timeout=timeout) as conn:
conn.channel.exchange_declare(
exchange, exchange_type, auto_delete=True, durable=durable
self.declare_queue_binding_by_channel(
conn.channel, exchange, queue, routing_key, exchange_type,
queue_expiration, queue_auto_delete, durable
)
arguments = {}
if queue_expiration > 0:
arguments['x-expires'] = queue_expiration * 1000
conn.channel.queue_declare(
queue, auto_delete=queue_auto_delete, durable=durable,
arguments=arguments
)
conn.channel.queue_bind(queue, exchange, routing_key)
except pika_pool.Timeout as e:
raise exceptions.MessagingTimeout(
"Timeout for current operation was expired. {}.".format(str(e))
@ -369,11 +511,10 @@ class PikaEngine(object):
raise exceptions.MessagingTimeout(
"Timeout for current operation was expired."
)
try:
with pool.acquire(timeout=timeout) as conn:
if timeout is not None:
properties.expiration = str(int(timeout))
properties.expiration = str(int(timeout * 1000))
conn.channel.publish(
exchange=exchange,
routing_key=routing_key,
@ -410,6 +551,7 @@ class PikaEngine(object):
body, properties, exchange, routing_key, str(e)
)
)
raise ConnectionException(
"Connectivity problem detected during sending the message: "
"[body:{}, properties: {}] to target: [exchange:{}, "
@ -417,6 +559,10 @@ class PikaEngine(object):
body, properties, exchange, routing_key, str(e)
)
)
except socket.timeout:
raise TimeoutConnectionException(
"Socket timeout exceeded."
)
def publish(self, exchange, routing_key, body, properties, confirm,
mandatory, expiration_time, retrier):
@ -454,6 +600,8 @@ class PikaEngine(object):
pika_engine=self,
exchange=self.rpc_reply_exchange,
queue=self._reply_queue,
no_ack=not self.rpc_reply_listener_ack,
prefetch_count=self.rpc_reply_listener_prefetch_count
)
self._reply_listener.start(timeout=timeout)
@ -473,6 +621,7 @@ class PikaEngine(object):
while self._reply_consumer_thread_run_flag:
try:
message = self._reply_listener.poll(timeout=1)
message.acknowledge()
if message is None:
continue
i = 0
@ -528,9 +677,9 @@ class PikaIncomingMessage(object):
self.content_encoding = getattr(properties, "content_encoding",
"utf-8")
self.expiration = (
self.expiration_time = (
None if properties.expiration is None else
int(properties.expiration)
time.time() + int(properties.expiration) / 1000
)
if self.content_type != "application/json":
@ -584,7 +733,7 @@ class PikaIncomingMessage(object):
else self._pika_engine.rpc_reply_retry_attempts
),
retry_on_exception=on_exception,
wait_fixed=self._pika_engine.rpc_reply_retry_delay,
wait_fixed=self._pika_engine.rpc_reply_retry_delay * 1000,
)
try:
@ -601,7 +750,7 @@ class PikaIncomingMessage(object):
),
confirm=True,
mandatory=False,
expiration_time=time.time() + self.expiration,
expiration_time=self.expiration_time,
retrier=retrier
)
LOG.debug(
@ -618,18 +767,12 @@ class PikaIncomingMessage(object):
def acknowledge(self):
if not self._no_ack:
try:
self._channel.basic_ack(delivery_tag=self.delivery_tag)
except Exception:
LOG.exception("Unable to acknowledge the message")
def requeue(self):
if not self._no_ack:
try:
return self._channel.basic_nack(delivery_tag=self.delivery_tag,
requeue=True)
except Exception:
LOG.exception("Unable to requeue the message")
class PikaOutgoingMessage(object):
@ -669,7 +812,7 @@ class PikaOutgoingMessage(object):
)
expiration_time = (
None if timeout is None else timeout + time.time()
None if timeout is None else (timeout + time.time())
)
if wait_for_reply:
@ -722,7 +865,9 @@ class PikaListener(object):
self._message_queue = collections.deque()
def _reconnect(self):
self._connection = self._pika_engine.create_connection()
self._connection = self._pika_engine.create_connection(
for_listening=True
)
self._channel = self._connection.channel()
self._channel.basic_qos(prefetch_count=self._prefetch_count)
@ -763,12 +908,14 @@ class PikaListener(object):
with self._lock:
if not self._started:
return None
try:
if self._channel is None:
self._reconnect()
try:
self._connection.process_data_events()
except pika_pool.Connection.connectivity_errors:
except Exception:
self._cleanup()
raise
if timeout and time.time() - start > timeout:
return None
@ -800,7 +947,7 @@ class PikaListener(object):
class RpcServicePikaListener(PikaListener):
def __init__(self, pika_engine, target, no_ack=True, prefetch_count=1):
def __init__(self, pika_engine, target, no_ack, prefetch_count):
self._target = target
super(RpcServicePikaListener, self).__init__(
@ -820,17 +967,20 @@ class RpcServicePikaListener(PikaListener):
self._pika_engine.conf.oslo_messaging_pika.rpc_queue_expiration
)
self._pika_engine.declare_queue_binding(
self._pika_engine.declare_queue_binding_by_channel(
channel=self._channel,
exchange=exchange, queue=queue, routing_key=queue,
exchange_type='direct', queue_expiration=queue_expiration,
queue_auto_delete=False, durable=False
)
self._pika_engine.declare_queue_binding(
self._pika_engine.declare_queue_binding_by_channel(
channel=self._channel,
exchange=exchange, queue=server_queue, routing_key=server_queue,
exchange_type='direct', queue_expiration=queue_expiration,
queue_auto_delete=False, durable=False
)
self._pika_engine.declare_queue_binding(
self._pika_engine.declare_queue_binding_by_channel(
channel=self._channel,
exchange=fanout_exchange, queue=server_queue, routing_key="",
exchange_type='fanout', queue_expiration=queue_expiration,
queue_auto_delete=False, durable=False
@ -849,8 +999,7 @@ class RpcServicePikaListener(PikaListener):
class RpcReplyPikaListener(PikaListener):
def __init__(self, pika_engine, exchange, queue, no_ack=True,
prefetch_count=1):
def __init__(self, pika_engine, exchange, queue, no_ack, prefetch_count):
self._exchange = exchange
self._queue = queue
@ -863,7 +1012,8 @@ class RpcReplyPikaListener(PikaListener):
self._pika_engine.conf.oslo_messaging_pika.rpc_queue_expiration
)
self._pika_engine.declare_queue_binding(
self._pika_engine.declare_queue_binding_by_channel(
channel=self._channel,
exchange=self._exchange, queue=self._queue,
routing_key=self._queue, exchange_type='direct',
queue_expiration=queue_expiration, queue_auto_delete=False,
@ -881,8 +1031,8 @@ class RpcReplyPikaListener(PikaListener):
retrier = retrying.retry(
stop_max_attempt_number=self._pika_engine.rpc_reply_retry_attempts,
stop_max_delay=timeout,
wait_fixed=self._pika_engine.rpc_reply_retry_delay,
stop_max_delay=timeout * 1000,
wait_fixed=self._pika_engine.rpc_reply_retry_delay * 1000,
retry_on_exception=on_exception,
)
@ -912,7 +1062,8 @@ class NotificationPikaListener(PikaListener):
for target, priority in self._targets_and_priorities:
routing_key = '%s.%s' % (target.topic, priority)
queue = self._queue_name or routing_key
self._pika_engine.declare_queue_binding(
self._pika_engine.declare_queue_binding_by_channel(
channel=self._channel,
exchange=(
target.exchange or
self._pika_engine.default_notification_exchange
@ -991,7 +1142,7 @@ class PikaDriver(object):
retrying.retry(
stop_max_attempt_number=(None if retry == -1 else retry),
retry_on_exception=on_exception,
wait_fixed=self._pika_engine.rpc_retry_delay,
wait_fixed=self._pika_engine.rpc_retry_delay * 1000,
)
)
@ -1045,7 +1196,7 @@ class PikaDriver(object):
queue=target.topic,
routing_key=target.topic,
exchange_type='direct',
queue_expiration=False,
queue_expiration=None,
queue_auto_delete=False,
durable=self._pika_engine.notification_persistence,
)
@ -1054,6 +1205,7 @@ class PikaDriver(object):
return True
elif isinstance(ex,
(ConnectionException, MessageRejectedException)):
LOG.warn(str(ex))
return True
else:
return False
@ -1061,7 +1213,7 @@ class PikaDriver(object):
retrier = retrying.retry(
stop_max_attempt_number=(None if retry == -1 else retry),
retry_on_exception=on_exception,
wait_fixed=self._pika_engine.notification_retry_delay,
wait_fixed=self._pika_engine.notification_retry_delay * 1000,
)
msg = PikaOutgoingMessage(self._pika_engine, message, ctxt)
@ -1080,7 +1232,11 @@ class PikaDriver(object):
)
def listen(self, target):
listener = RpcServicePikaListener(self._pika_engine, target)
listener = RpcServicePikaListener(
self._pika_engine, target,
no_ack=not self._pika_engine.rpc_listener_ack,
prefetch_count=self._pika_engine.rpc_listener_prefetch_count
)
listener.start()
return listener