Merge remote-tracking branch 'origin/feature/pika' into merge-branch
Change-Id: I570b3dbb3ecaa13d5a6b240a092de82ee0b39c50
This commit is contained in:
commit
2982b6a7ed
@ -2,3 +2,4 @@
|
||||
host=review.openstack.org
|
||||
port=29418
|
||||
project=openstack/oslo.messaging.git
|
||||
branch=feature/pika
|
||||
|
276
oslo_messaging/_drivers/impl_pika.py
Normal file
276
oslo_messaging/_drivers/impl_pika.py
Normal file
@ -0,0 +1,276 @@
|
||||
# Copyright 2011 OpenStack Foundation
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import time
|
||||
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
from oslo_messaging import exceptions
|
||||
import pika_pool
|
||||
import retrying
|
||||
|
||||
from oslo_messaging._drivers.pika_driver import pika_engine as pika_drv_engine
|
||||
from oslo_messaging._drivers.pika_driver import pika_exceptions as pika_drv_exc
|
||||
from oslo_messaging._drivers.pika_driver import pika_listener as pika_drv_lstnr
|
||||
from oslo_messaging._drivers.pika_driver import pika_message as pika_drv_msg
|
||||
from oslo_messaging._drivers.pika_driver import pika_poller as pika_drv_poller
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
pika_opts = [
|
||||
cfg.IntOpt('channel_max', default=None,
|
||||
help='Maximum number of channels to allow'),
|
||||
cfg.IntOpt('frame_max', default=None,
|
||||
help='The maximum byte size for an AMQP frame'),
|
||||
cfg.IntOpt('heartbeat_interval', default=1,
|
||||
help="How often to send heartbeats for consumer's connections"),
|
||||
cfg.BoolOpt('ssl', default=None,
|
||||
help='Enable SSL'),
|
||||
cfg.DictOpt('ssl_options', default=None,
|
||||
help='Arguments passed to ssl.wrap_socket'),
|
||||
cfg.FloatOpt('socket_timeout', default=0.25,
|
||||
help="Set socket timeout in seconds for connection's socket"),
|
||||
cfg.FloatOpt('tcp_user_timeout', default=0.25,
|
||||
help="Set TCP_USER_TIMEOUT in seconds for connection's "
|
||||
"socket"),
|
||||
cfg.FloatOpt('host_connection_reconnect_delay', default=0.25,
|
||||
help="Set delay for reconnection to some host which has "
|
||||
"connection error")
|
||||
]
|
||||
|
||||
pika_pool_opts = [
|
||||
cfg.IntOpt('pool_max_size', default=10,
|
||||
help="Maximum number of connections to keep queued."),
|
||||
cfg.IntOpt('pool_max_overflow', default=0,
|
||||
help="Maximum number of connections to create above "
|
||||
"`pool_max_size`."),
|
||||
cfg.IntOpt('pool_timeout', default=30,
|
||||
help="Default number of seconds to wait for a connections to "
|
||||
"available"),
|
||||
cfg.IntOpt('pool_recycle', default=600,
|
||||
help="Lifetime of a connection (since creation) in seconds "
|
||||
"or None for no recycling. Expired connections are "
|
||||
"closed on acquire."),
|
||||
cfg.IntOpt('pool_stale', default=60,
|
||||
help="Threshold at which inactive (since release) connections "
|
||||
"are considered stale in seconds or None for no "
|
||||
"staleness. Stale connections are closed on acquire.")
|
||||
]
|
||||
|
||||
notification_opts = [
|
||||
cfg.BoolOpt('notification_persistence', default=False,
|
||||
help="Persist notification messages."),
|
||||
cfg.StrOpt('default_notification_exchange',
|
||||
default="${control_exchange}_notification",
|
||||
help="Exchange name for for sending notifications"),
|
||||
cfg.IntOpt(
|
||||
'default_notification_retry_attempts', default=-1,
|
||||
help="Reconnecting retry count in case of connectivity problem during "
|
||||
"sending notification, -1 means infinite retry."
|
||||
),
|
||||
cfg.FloatOpt(
|
||||
'notification_retry_delay', default=0.25,
|
||||
help="Reconnecting retry delay in case of connectivity problem during "
|
||||
"sending notification message"
|
||||
)
|
||||
]
|
||||
|
||||
rpc_opts = [
|
||||
cfg.IntOpt('rpc_queue_expiration', default=60,
|
||||
help="Time to live for rpc queues without consumers in "
|
||||
"seconds."),
|
||||
cfg.StrOpt('default_rpc_exchange', default="${control_exchange}_rpc",
|
||||
help="Exchange name for for sending RPC messages"),
|
||||
cfg.StrOpt('rpc_reply_exchange', default="${control_exchange}_rpc_reply",
|
||||
help="Exchange name for for receiving RPC replies"),
|
||||
cfg.IntOpt(
|
||||
'rpc_listener_prefetch_count', default=10,
|
||||
help="Max number of not acknowledged message which RabbitMQ can send "
|
||||
"to rpc listener. Works only if rpc_listener_ack == True"
|
||||
),
|
||||
cfg.IntOpt(
|
||||
'rpc_reply_listener_prefetch_count', default=10,
|
||||
help="Max number of not acknowledged message which RabbitMQ can send "
|
||||
"to rpc reply listener. Works only if rpc_reply_listener_ack == "
|
||||
"True"
|
||||
),
|
||||
cfg.IntOpt(
|
||||
'rpc_reply_retry_attempts', default=-1,
|
||||
help="Reconnecting retry count in case of connectivity problem during "
|
||||
"sending reply. -1 means infinite retry during rpc_timeout"
|
||||
),
|
||||
cfg.FloatOpt(
|
||||
'rpc_reply_retry_delay', default=0.25,
|
||||
help="Reconnecting retry delay in case of connectivity problem during "
|
||||
"sending reply."
|
||||
),
|
||||
cfg.IntOpt(
|
||||
'default_rpc_retry_attempts', default=-1,
|
||||
help="Reconnecting retry count in case of connectivity problem during "
|
||||
"sending RPC message, -1 means infinite retry. If actual "
|
||||
"retry attempts in not 0 the rpc request could be processed more "
|
||||
"then one time"
|
||||
),
|
||||
cfg.FloatOpt(
|
||||
'rpc_retry_delay', default=0.25,
|
||||
help="Reconnecting retry delay in case of connectivity problem during "
|
||||
"sending RPC message"
|
||||
)
|
||||
]
|
||||
|
||||
|
||||
class PikaDriver(object):
|
||||
def __init__(self, conf, url, default_exchange=None,
|
||||
allowed_remote_exmods=None):
|
||||
opt_group = cfg.OptGroup(name='oslo_messaging_pika',
|
||||
title='Pika driver options')
|
||||
conf.register_group(opt_group)
|
||||
conf.register_opts(pika_opts, group=opt_group)
|
||||
conf.register_opts(pika_pool_opts, group=opt_group)
|
||||
conf.register_opts(rpc_opts, group=opt_group)
|
||||
conf.register_opts(notification_opts, group=opt_group)
|
||||
|
||||
self.conf = conf
|
||||
|
||||
self._pika_engine = pika_drv_engine.PikaEngine(
|
||||
conf, url, default_exchange, allowed_remote_exmods
|
||||
)
|
||||
self._reply_listener = pika_drv_lstnr.RpcReplyPikaListener(
|
||||
self._pika_engine
|
||||
)
|
||||
|
||||
def require_features(self, requeue=False):
|
||||
pass
|
||||
|
||||
def send(self, target, ctxt, message, wait_for_reply=None, timeout=None,
|
||||
retry=None):
|
||||
expiration_time = None if timeout is None else time.time() + timeout
|
||||
|
||||
if retry is None:
|
||||
retry = self._pika_engine.default_rpc_retry_attempts
|
||||
|
||||
def on_exception(ex):
|
||||
if isinstance(ex, (pika_drv_exc.ConnectionException,
|
||||
exceptions.MessageDeliveryFailure)):
|
||||
LOG.warn(str(ex))
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
retrier = (
|
||||
None if retry == 0 else
|
||||
retrying.retry(
|
||||
stop_max_attempt_number=(None if retry == -1 else retry),
|
||||
retry_on_exception=on_exception,
|
||||
wait_fixed=self._pika_engine.rpc_retry_delay * 1000,
|
||||
)
|
||||
)
|
||||
|
||||
msg = pika_drv_msg.RpcPikaOutgoingMessage(self._pika_engine, message,
|
||||
ctxt)
|
||||
reply = msg.send(
|
||||
target,
|
||||
reply_listener=self._reply_listener if wait_for_reply else None,
|
||||
expiration_time=expiration_time,
|
||||
retrier=retrier
|
||||
)
|
||||
|
||||
if reply is not None:
|
||||
if reply.failure is not None:
|
||||
raise reply.failure
|
||||
|
||||
return reply.result
|
||||
|
||||
def _declare_notification_queue_binding(self, target, timeout=None):
|
||||
if timeout is not None and timeout < 0:
|
||||
raise exceptions.MessagingTimeout(
|
||||
"Timeout for current operation was expired."
|
||||
)
|
||||
try:
|
||||
with (self._pika_engine.connection_without_confirmation_pool
|
||||
.acquire)(timeout=timeout) as conn:
|
||||
self._pika_engine.declare_queue_binding_by_channel(
|
||||
conn.channel,
|
||||
exchange=(
|
||||
target.exchange or
|
||||
self._pika_engine.default_notification_exchange
|
||||
),
|
||||
queue=target.topic,
|
||||
routing_key=target.topic,
|
||||
exchange_type='direct',
|
||||
queue_expiration=None,
|
||||
durable=self._pika_engine.notification_persistence,
|
||||
)
|
||||
except pika_pool.Timeout as e:
|
||||
raise exceptions.MessagingTimeout(
|
||||
"Timeout for current operation was expired. {}.".format(str(e))
|
||||
)
|
||||
|
||||
def send_notification(self, target, ctxt, message, version, retry=None):
|
||||
if retry is None:
|
||||
retry = self._pika_engine.default_notification_retry_attempts
|
||||
|
||||
def on_exception(ex):
|
||||
if isinstance(ex, (pika_drv_exc.ExchangeNotFoundException,
|
||||
pika_drv_exc.RoutingException)):
|
||||
LOG.warn(str(ex))
|
||||
try:
|
||||
self._declare_notification_queue_binding(target)
|
||||
except pika_drv_exc.ConnectionException as e:
|
||||
LOG.warn(str(e))
|
||||
return True
|
||||
elif isinstance(ex, (pika_drv_exc.ConnectionException,
|
||||
pika_drv_exc.MessageRejectedException)):
|
||||
LOG.warn(str(ex))
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
retrier = retrying.retry(
|
||||
stop_max_attempt_number=(None if retry == -1 else retry),
|
||||
retry_on_exception=on_exception,
|
||||
wait_fixed=self._pika_engine.notification_retry_delay * 1000,
|
||||
)
|
||||
|
||||
msg = pika_drv_msg.PikaOutgoingMessage(self._pika_engine, message,
|
||||
ctxt)
|
||||
return msg.send(
|
||||
exchange=(
|
||||
target.exchange or
|
||||
self._pika_engine.default_notification_exchange
|
||||
),
|
||||
routing_key=target.topic,
|
||||
confirm=True,
|
||||
mandatory=True,
|
||||
persistent=self._pika_engine.notification_persistence,
|
||||
retrier=retrier
|
||||
)
|
||||
|
||||
def listen(self, target):
|
||||
listener = pika_drv_poller.RpcServicePikaPoller(
|
||||
self._pika_engine, target,
|
||||
prefetch_count=self._pika_engine.rpc_listener_prefetch_count
|
||||
)
|
||||
listener.start()
|
||||
return listener
|
||||
|
||||
def listen_for_notifications(self, targets_and_priorities, pool):
|
||||
listener = pika_drv_poller.NotificationPikaPoller(
|
||||
self._pika_engine, targets_and_priorities, pool
|
||||
)
|
||||
listener.start()
|
||||
return listener
|
||||
|
||||
def cleanup(self):
|
||||
self._reply_listener.cleanup()
|
0
oslo_messaging/_drivers/pika_driver/__init__.py
Normal file
0
oslo_messaging/_drivers/pika_driver/__init__.py
Normal file
434
oslo_messaging/_drivers/pika_driver/pika_engine.py
Normal file
434
oslo_messaging/_drivers/pika_driver/pika_engine.py
Normal file
@ -0,0 +1,434 @@
|
||||
# Copyright 2015 Mirantis, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import random
|
||||
import socket
|
||||
import sys
|
||||
import threading
|
||||
import time
|
||||
|
||||
from oslo_log import log as logging
|
||||
import pika
|
||||
from pika.adapters import select_connection
|
||||
from pika import credentials as pika_credentials
|
||||
import pika_pool
|
||||
import six
|
||||
|
||||
from oslo_messaging._drivers.pika_driver import pika_exceptions as pika_drv_exc
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
_EXCEPTIONS_MODULE = 'exceptions' if six.PY2 else 'builtins'
|
||||
|
||||
|
||||
def _is_eventlet_monkey_patched(module):
|
||||
"""Determines safely is eventlet patching for module enabled or not
|
||||
|
||||
:param module: String, module name
|
||||
:return Bool, True if module is pathed, False otherwise
|
||||
"""
|
||||
|
||||
if 'eventlet.patcher' not in sys.modules:
|
||||
return False
|
||||
import eventlet.patcher
|
||||
return eventlet.patcher.is_monkey_patched(module)
|
||||
|
||||
|
||||
def _create_select_poller_connection_impl(
|
||||
parameters, on_open_callback, on_open_error_callback,
|
||||
on_close_callback, stop_ioloop_on_close):
|
||||
"""Used for disabling autochoise of poller ('select', 'poll', 'epool', etc)
|
||||
inside default 'SelectConnection.__init__(...)' logic. It is necessary to
|
||||
force 'select' poller usage if eventlet is monkeypatched because eventlet
|
||||
patches only 'select' system call
|
||||
|
||||
Method signature is copied form 'SelectConnection.__init__(...)', because
|
||||
it is used as replacement of 'SelectConnection' class to create instances
|
||||
"""
|
||||
return select_connection.SelectConnection(
|
||||
parameters=parameters,
|
||||
on_open_callback=on_open_callback,
|
||||
on_open_error_callback=on_open_error_callback,
|
||||
on_close_callback=on_close_callback,
|
||||
stop_ioloop_on_close=stop_ioloop_on_close,
|
||||
custom_ioloop=select_connection.SelectPoller()
|
||||
)
|
||||
|
||||
|
||||
class _PooledConnectionWithConfirmations(pika_pool.Connection):
|
||||
"""Derived from 'pika_pool.Connection' and extends its logic - adds
|
||||
'confirm_delivery' call after channel creation to enable delivery
|
||||
confirmation for channel
|
||||
"""
|
||||
@property
|
||||
def channel(self):
|
||||
if self.fairy.channel is None:
|
||||
self.fairy.channel = self.fairy.cxn.channel()
|
||||
self.fairy.channel.confirm_delivery()
|
||||
return self.fairy.channel
|
||||
|
||||
|
||||
class PikaEngine(object):
|
||||
"""Used for shared functionality between other pika driver modules, like
|
||||
connection factory, connection pools, processing and holding configuration,
|
||||
etc.
|
||||
"""
|
||||
|
||||
# constants for creating connection statistics
|
||||
HOST_CONNECTION_LAST_TRY_TIME = "last_try_time"
|
||||
HOST_CONNECTION_LAST_SUCCESS_TRY_TIME = "last_success_try_time"
|
||||
|
||||
# constant for setting tcp_user_timeout socket option
|
||||
# (it should be defined in 'select' module of standard library in future)
|
||||
TCP_USER_TIMEOUT = 18
|
||||
|
||||
def __init__(self, conf, url, default_exchange=None,
|
||||
allowed_remote_exmods=None):
|
||||
self.conf = conf
|
||||
|
||||
self._force_select_poller_use = _is_eventlet_monkey_patched('select')
|
||||
|
||||
# processing rpc options
|
||||
self.default_rpc_exchange = (
|
||||
conf.oslo_messaging_pika.default_rpc_exchange if
|
||||
conf.oslo_messaging_pika.default_rpc_exchange else
|
||||
default_exchange
|
||||
)
|
||||
self.rpc_reply_exchange = (
|
||||
conf.oslo_messaging_pika.rpc_reply_exchange if
|
||||
conf.oslo_messaging_pika.rpc_reply_exchange else
|
||||
default_exchange
|
||||
)
|
||||
|
||||
self.allowed_remote_exmods = [_EXCEPTIONS_MODULE]
|
||||
if allowed_remote_exmods:
|
||||
self.allowed_remote_exmods.extend(allowed_remote_exmods)
|
||||
|
||||
self.rpc_listener_prefetch_count = (
|
||||
conf.oslo_messaging_pika.rpc_listener_prefetch_count
|
||||
)
|
||||
|
||||
self.rpc_reply_listener_prefetch_count = (
|
||||
conf.oslo_messaging_pika.rpc_listener_prefetch_count
|
||||
)
|
||||
|
||||
self.rpc_reply_retry_attempts = (
|
||||
conf.oslo_messaging_pika.rpc_reply_retry_attempts
|
||||
)
|
||||
if self.rpc_reply_retry_attempts is None:
|
||||
raise ValueError("rpc_reply_retry_attempts should be integer")
|
||||
self.rpc_reply_retry_delay = (
|
||||
conf.oslo_messaging_pika.rpc_reply_retry_delay
|
||||
)
|
||||
if (self.rpc_reply_retry_delay is None or
|
||||
self.rpc_reply_retry_delay < 0):
|
||||
raise ValueError("rpc_reply_retry_delay should be non-negative "
|
||||
"integer")
|
||||
|
||||
self.rpc_queue_expiration = (
|
||||
self.conf.oslo_messaging_pika.rpc_queue_expiration
|
||||
)
|
||||
|
||||
# processing notification options
|
||||
self.default_notification_exchange = (
|
||||
conf.oslo_messaging_pika.default_notification_exchange if
|
||||
conf.oslo_messaging_pika.default_notification_exchange else
|
||||
default_exchange
|
||||
)
|
||||
|
||||
self.notification_persistence = (
|
||||
conf.oslo_messaging_pika.notification_persistence
|
||||
)
|
||||
|
||||
self.default_rpc_retry_attempts = (
|
||||
conf.oslo_messaging_pika.default_rpc_retry_attempts
|
||||
)
|
||||
if self.default_rpc_retry_attempts is None:
|
||||
raise ValueError("default_rpc_retry_attempts should be an integer")
|
||||
self.rpc_retry_delay = (
|
||||
conf.oslo_messaging_pika.rpc_retry_delay
|
||||
)
|
||||
if (self.rpc_retry_delay is None or
|
||||
self.rpc_retry_delay < 0):
|
||||
raise ValueError("rpc_retry_delay should be non-negative integer")
|
||||
|
||||
self.default_notification_retry_attempts = (
|
||||
conf.oslo_messaging_pika.default_notification_retry_attempts
|
||||
)
|
||||
if self.default_notification_retry_attempts is None:
|
||||
raise ValueError("default_notification_retry_attempts should be "
|
||||
"an integer")
|
||||
self.notification_retry_delay = (
|
||||
conf.oslo_messaging_pika.notification_retry_delay
|
||||
)
|
||||
if (self.notification_retry_delay is None or
|
||||
self.notification_retry_delay < 0):
|
||||
raise ValueError("notification_retry_delay should be non-negative "
|
||||
"integer")
|
||||
|
||||
self._tcp_user_timeout = self.conf.oslo_messaging_pika.tcp_user_timeout
|
||||
self.host_connection_reconnect_delay = (
|
||||
self.conf.oslo_messaging_pika.host_connection_reconnect_delay
|
||||
)
|
||||
self._heartbeat_interval = (
|
||||
self.conf.oslo_messaging_pika.heartbeat_interval
|
||||
)
|
||||
|
||||
# initializing connection parameters for configured RabbitMQ hosts
|
||||
common_pika_params = {
|
||||
'virtual_host': url.virtual_host,
|
||||
'channel_max': self.conf.oslo_messaging_pika.channel_max,
|
||||
'frame_max': self.conf.oslo_messaging_pika.frame_max,
|
||||
'ssl': self.conf.oslo_messaging_pika.ssl,
|
||||
'ssl_options': self.conf.oslo_messaging_pika.ssl_options,
|
||||
'socket_timeout': self.conf.oslo_messaging_pika.socket_timeout,
|
||||
}
|
||||
|
||||
self._connection_lock = threading.Lock()
|
||||
|
||||
self._connection_host_param_list = []
|
||||
self._connection_host_status_list = []
|
||||
|
||||
if not url.hosts:
|
||||
raise ValueError("You should provide at least one RabbitMQ host")
|
||||
|
||||
for transport_host in url.hosts:
|
||||
pika_params = common_pika_params.copy()
|
||||
pika_params.update(
|
||||
host=transport_host.hostname,
|
||||
port=transport_host.port,
|
||||
credentials=pika_credentials.PlainCredentials(
|
||||
transport_host.username, transport_host.password
|
||||
),
|
||||
)
|
||||
self._connection_host_param_list.append(pika_params)
|
||||
self._connection_host_status_list.append({
|
||||
self.HOST_CONNECTION_LAST_TRY_TIME: 0,
|
||||
self.HOST_CONNECTION_LAST_SUCCESS_TRY_TIME: 0
|
||||
})
|
||||
|
||||
self._next_connection_host_num = random.randint(
|
||||
0, len(self._connection_host_param_list) - 1
|
||||
)
|
||||
|
||||
# initializing 2 connection pools: 1st for connections without
|
||||
# confirmations, 2nd - with confirmations
|
||||
self.connection_without_confirmation_pool = pika_pool.QueuedPool(
|
||||
create=self.create_connection,
|
||||
max_size=self.conf.oslo_messaging_pika.pool_max_size,
|
||||
max_overflow=self.conf.oslo_messaging_pika.pool_max_overflow,
|
||||
timeout=self.conf.oslo_messaging_pika.pool_timeout,
|
||||
recycle=self.conf.oslo_messaging_pika.pool_recycle,
|
||||
stale=self.conf.oslo_messaging_pika.pool_stale,
|
||||
)
|
||||
|
||||
self.connection_with_confirmation_pool = pika_pool.QueuedPool(
|
||||
create=self.create_connection,
|
||||
max_size=self.conf.oslo_messaging_pika.pool_max_size,
|
||||
max_overflow=self.conf.oslo_messaging_pika.pool_max_overflow,
|
||||
timeout=self.conf.oslo_messaging_pika.pool_timeout,
|
||||
recycle=self.conf.oslo_messaging_pika.pool_recycle,
|
||||
stale=self.conf.oslo_messaging_pika.pool_stale,
|
||||
)
|
||||
|
||||
self.connection_with_confirmation_pool.Connection = (
|
||||
_PooledConnectionWithConfirmations
|
||||
)
|
||||
|
||||
def _next_connection_num(self):
|
||||
"""Used for creating connections to different RabbitMQ nodes in
|
||||
round robin order
|
||||
|
||||
:return: next host number to create connection to
|
||||
"""
|
||||
with self._connection_lock:
|
||||
cur_num = self._next_connection_host_num
|
||||
self._next_connection_host_num += 1
|
||||
self._next_connection_host_num %= len(
|
||||
self._connection_host_param_list
|
||||
)
|
||||
return cur_num
|
||||
|
||||
def create_connection(self, for_listening=False):
|
||||
"""Create and return connection to any available host.
|
||||
|
||||
:return: created connection
|
||||
:raise: ConnectionException if all hosts are not reachable
|
||||
"""
|
||||
host_count = len(self._connection_host_param_list)
|
||||
connection_attempts = host_count
|
||||
|
||||
pika_next_connection_num = self._next_connection_num()
|
||||
|
||||
while connection_attempts > 0:
|
||||
try:
|
||||
return self.create_host_connection(
|
||||
pika_next_connection_num, for_listening
|
||||
)
|
||||
except pika_pool.Connection.connectivity_errors as e:
|
||||
LOG.warn(str(e))
|
||||
except pika_drv_exc.HostConnectionNotAllowedException as e:
|
||||
LOG.warn(str(e))
|
||||
|
||||
connection_attempts -= 1
|
||||
pika_next_connection_num += 1
|
||||
pika_next_connection_num %= host_count
|
||||
|
||||
raise pika_drv_exc.EstablishConnectionException(
|
||||
"Can not establish connection to any configured RabbitMQ host: " +
|
||||
str(self._connection_host_param_list)
|
||||
)
|
||||
|
||||
def _set_tcp_user_timeout(self, s):
|
||||
if not self._tcp_user_timeout:
|
||||
return
|
||||
try:
|
||||
s.setsockopt(
|
||||
socket.IPPROTO_TCP, self.TCP_USER_TIMEOUT,
|
||||
int(self._tcp_user_timeout * 1000)
|
||||
)
|
||||
except socket.error:
|
||||
LOG.warn(
|
||||
"Whoops, this kernel doesn't seem to support TCP_USER_TIMEOUT."
|
||||
)
|
||||
|
||||
def create_host_connection(self, host_index, for_listening=False):
|
||||
"""Create new connection to host #host_index
|
||||
:param host_index: Integer, number of host for connection establishing
|
||||
:param for_listening: Boolean, creates connection for listening
|
||||
(enable heartbeats) if True
|
||||
:return: New connection
|
||||
"""
|
||||
|
||||
with self._connection_lock:
|
||||
cur_time = time.time()
|
||||
|
||||
last_success_time = self._connection_host_status_list[host_index][
|
||||
self.HOST_CONNECTION_LAST_SUCCESS_TRY_TIME
|
||||
]
|
||||
last_time = self._connection_host_status_list[host_index][
|
||||
self.HOST_CONNECTION_LAST_TRY_TIME
|
||||
]
|
||||
|
||||
# raise HostConnectionNotAllowedException if we tried to establish
|
||||
# connection in last 'host_connection_reconnect_delay' and got
|
||||
# failure
|
||||
if (last_time != last_success_time and
|
||||
cur_time - last_time <
|
||||
self.host_connection_reconnect_delay):
|
||||
raise pika_drv_exc.HostConnectionNotAllowedException(
|
||||
"Connection to host #{} is not allowed now because of "
|
||||
"previous failure".format(host_index)
|
||||
)
|
||||
|
||||
try:
|
||||
base_host_params = self._connection_host_param_list[host_index]
|
||||
|
||||
connection = pika.BlockingConnection(
|
||||
parameters=pika.ConnectionParameters(
|
||||
heartbeat_interval=(
|
||||
self._heartbeat_interval
|
||||
if for_listening else None
|
||||
),
|
||||
**base_host_params
|
||||
),
|
||||
_impl_class=(_create_select_poller_connection_impl
|
||||
if self._force_select_poller_use else None)
|
||||
)
|
||||
|
||||
self._set_tcp_user_timeout(connection._impl.socket)
|
||||
|
||||
self._connection_host_status_list[host_index][
|
||||
self.HOST_CONNECTION_LAST_SUCCESS_TRY_TIME
|
||||
] = cur_time
|
||||
|
||||
return connection
|
||||
finally:
|
||||
self._connection_host_status_list[host_index][
|
||||
self.HOST_CONNECTION_LAST_TRY_TIME
|
||||
] = cur_time
|
||||
|
||||
@staticmethod
|
||||
def declare_queue_binding_by_channel(channel, exchange, queue, routing_key,
|
||||
exchange_type, queue_expiration,
|
||||
durable):
|
||||
"""Declare exchange, queue and bind them using already created
|
||||
channel, if they don't exist
|
||||
|
||||
:param channel: Channel for communication with RabbitMQ
|
||||
:param exchange: String, RabbitMQ exchange name
|
||||
:param queue: Sting, RabbitMQ queue name
|
||||
:param routing_key: Sting, RabbitMQ routing key for queue binding
|
||||
:param exchange_type: String ('direct', 'topic' or 'fanout')
|
||||
exchange type for exchange to be declared
|
||||
:param queue_expiration: Integer, time in seconds which queue will
|
||||
remain existing in RabbitMQ when there no consumers connected
|
||||
:param durable: Boolean, creates durable exchange and queue if true
|
||||
"""
|
||||
try:
|
||||
channel.exchange_declare(
|
||||
exchange, exchange_type, auto_delete=True, durable=durable
|
||||
)
|
||||
arguments = {}
|
||||
|
||||
if queue_expiration > 0:
|
||||
arguments['x-expires'] = queue_expiration * 1000
|
||||
|
||||
channel.queue_declare(queue, durable=durable, arguments=arguments)
|
||||
|
||||
channel.queue_bind(queue, exchange, routing_key)
|
||||
except pika_pool.Connection.connectivity_errors as e:
|
||||
raise pika_drv_exc.ConnectionException(
|
||||
"Connectivity problem detected during declaring queue "
|
||||
"binding: exchange:{}, queue: {}, routing_key: {}, "
|
||||
"exchange_type: {}, queue_expiration: {}, "
|
||||
"durable: {}. {}".format(
|
||||
exchange, queue, routing_key, exchange_type,
|
||||
queue_expiration, durable, str(e)
|
||||
)
|
||||
)
|
||||
|
||||
def get_rpc_exchange_name(self, exchange, topic, fanout, no_ack):
|
||||
"""Returns RabbitMQ exchange name for given rpc request
|
||||
|
||||
:param exchange: String, oslo.messaging target's exchange
|
||||
:param topic: String, oslo.messaging target's topic
|
||||
:param fanout: Boolean, oslo.messaging target's fanout mode
|
||||
:param no_ack: Boolean, use message delivery with acknowledges or not
|
||||
|
||||
:return: String, RabbitMQ exchange name
|
||||
"""
|
||||
exchange = (exchange or self.default_rpc_exchange)
|
||||
|
||||
if fanout:
|
||||
exchange = '{}_fanout_{}_{}'.format(
|
||||
exchange, "no_ack" if no_ack else "with_ack", topic
|
||||
)
|
||||
return exchange
|
||||
|
||||
@staticmethod
|
||||
def get_rpc_queue_name(topic, server, no_ack):
|
||||
"""Returns RabbitMQ queue name for given rpc request
|
||||
|
||||
:param topic: String, oslo.messaging target's topic
|
||||
:param server: String, oslo.messaging target's server
|
||||
:param no_ack: Boolean, use message delivery with acknowledges or not
|
||||
|
||||
:return: String, RabbitMQ exchange name
|
||||
"""
|
||||
queue_parts = ["no_ack" if no_ack else "with_ack", topic]
|
||||
if server is not None:
|
||||
queue_parts.append(server)
|
||||
queue = '.'.join(queue_parts)
|
||||
return queue
|
68
oslo_messaging/_drivers/pika_driver/pika_exceptions.py
Normal file
68
oslo_messaging/_drivers/pika_driver/pika_exceptions.py
Normal file
@ -0,0 +1,68 @@
|
||||
# Copyright 2015 Mirantis, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from oslo_messaging import exceptions
|
||||
|
||||
|
||||
class ExchangeNotFoundException(exceptions.MessageDeliveryFailure):
|
||||
"""Is raised if specified exchange is not found in RabbitMQ."""
|
||||
pass
|
||||
|
||||
|
||||
class MessageRejectedException(exceptions.MessageDeliveryFailure):
|
||||
"""Is raised if message which you are trying to send was nacked by RabbitMQ
|
||||
it may happen if RabbitMQ is not able to process message
|
||||
"""
|
||||
pass
|
||||
|
||||
|
||||
class RoutingException(exceptions.MessageDeliveryFailure):
|
||||
"""Is raised if message can not be delivered to any queue. Usually it means
|
||||
that any queue is not binded to given exchange with given routing key.
|
||||
Raised if 'mandatory' flag specified only
|
||||
"""
|
||||
pass
|
||||
|
||||
|
||||
class ConnectionException(exceptions.MessagingException):
|
||||
"""Is raised if some operation can not be performed due to connectivity
|
||||
problem
|
||||
"""
|
||||
pass
|
||||
|
||||
|
||||
class TimeoutConnectionException(ConnectionException):
|
||||
"""Is raised if socket timeout was expired during network interaction"""
|
||||
pass
|
||||
|
||||
|
||||
class EstablishConnectionException(ConnectionException):
|
||||
"""Is raised if we have some problem during establishing connection
|
||||
procedure
|
||||
"""
|
||||
pass
|
||||
|
||||
|
||||
class HostConnectionNotAllowedException(EstablishConnectionException):
|
||||
"""Is raised in case of try to establish connection to temporary
|
||||
not allowed host (because of reconnection policy for example)
|
||||
"""
|
||||
pass
|
||||
|
||||
|
||||
class UnsupportedDriverVersion(exceptions.MessagingException):
|
||||
"""Is raised when message is received but was sent by different,
|
||||
not supported driver version
|
||||
"""
|
||||
pass
|
155
oslo_messaging/_drivers/pika_driver/pika_listener.py
Normal file
155
oslo_messaging/_drivers/pika_driver/pika_listener.py
Normal file
@ -0,0 +1,155 @@
|
||||
# Copyright 2015 Mirantis, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import threading
|
||||
import time
|
||||
import uuid
|
||||
|
||||
from concurrent import futures
|
||||
from oslo_log import log as logging
|
||||
|
||||
from oslo_messaging._drivers.pika_driver import pika_exceptions as pika_drv_exc
|
||||
from oslo_messaging._drivers.pika_driver import pika_poller as pika_drv_poller
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class RpcReplyPikaListener(object):
|
||||
"""Provide functionality for listening RPC replies. Create and handle
|
||||
reply poller and coroutine for performing polling job
|
||||
"""
|
||||
|
||||
def __init__(self, pika_engine):
|
||||
self._pika_engine = pika_engine
|
||||
|
||||
# preparing poller for listening replies
|
||||
self._reply_queue = None
|
||||
|
||||
self._reply_poller = None
|
||||
self._reply_waiting_futures = {}
|
||||
|
||||
self._reply_consumer_initialized = False
|
||||
self._reply_consumer_initialization_lock = threading.Lock()
|
||||
self._poller_thread = None
|
||||
|
||||
def get_reply_qname(self, expiration_time=None):
|
||||
"""As result return reply queue name, shared for whole process,
|
||||
but before this check is RPC listener initialized or not and perform
|
||||
initialization if needed
|
||||
|
||||
:param expiration_time: Float, expiration time in seconds
|
||||
(like time.time()),
|
||||
:return: String, queue name which hould be used for reply sending
|
||||
"""
|
||||
if self._reply_consumer_initialized:
|
||||
return self._reply_queue
|
||||
|
||||
with self._reply_consumer_initialization_lock:
|
||||
if self._reply_consumer_initialized:
|
||||
return self._reply_queue
|
||||
|
||||
# generate reply queue name if needed
|
||||
if self._reply_queue is None:
|
||||
self._reply_queue = "reply.{}.{}.{}".format(
|
||||
self._pika_engine.conf.project,
|
||||
self._pika_engine.conf.prog, uuid.uuid4().hex
|
||||
)
|
||||
|
||||
# initialize reply poller if needed
|
||||
if self._reply_poller is None:
|
||||
self._reply_poller = pika_drv_poller.RpcReplyPikaPoller(
|
||||
pika_engine=self._pika_engine,
|
||||
exchange=self._pika_engine.rpc_reply_exchange,
|
||||
queue=self._reply_queue,
|
||||
prefetch_count=(
|
||||
self._pika_engine.rpc_reply_listener_prefetch_count
|
||||
)
|
||||
)
|
||||
|
||||
self._reply_poller.start(timeout=expiration_time - time.time())
|
||||
|
||||
# start reply poller job thread if needed
|
||||
if self._poller_thread is None:
|
||||
self._poller_thread = threading.Thread(target=self._poller)
|
||||
self._poller_thread.daemon = True
|
||||
|
||||
if not self._poller_thread.is_alive():
|
||||
self._poller_thread.start()
|
||||
|
||||
self._reply_consumer_initialized = True
|
||||
|
||||
return self._reply_queue
|
||||
|
||||
def _poller(self):
|
||||
"""Reply polling job. Poll replies in infinite loop and notify
|
||||
registered features
|
||||
"""
|
||||
while self._reply_poller:
|
||||
try:
|
||||
try:
|
||||
messages = self._reply_poller.poll()
|
||||
except pika_drv_exc.EstablishConnectionException:
|
||||
LOG.exception("Problem during establishing connection for "
|
||||
"reply polling")
|
||||
time.sleep(
|
||||
self._pika_engine.host_connection_reconnect_delay
|
||||
)
|
||||
continue
|
||||
|
||||
for message in messages:
|
||||
try:
|
||||
message.acknowledge()
|
||||
future = self._reply_waiting_futures.pop(
|
||||
message.msg_id, None
|
||||
)
|
||||
if future is not None:
|
||||
future.set_result(message)
|
||||
except Exception:
|
||||
LOG.exception("Unexpected exception during processing"
|
||||
"reply message")
|
||||
except BaseException:
|
||||
LOG.exception("Unexpected exception during reply polling")
|
||||
|
||||
def register_reply_waiter(self, msg_id):
|
||||
"""Register reply waiter. Should be called before message sending to
|
||||
the server
|
||||
:param msg_id: String, message_id of expected reply
|
||||
:return future: Future, container for expected reply to be returned
|
||||
over
|
||||
"""
|
||||
future = futures.Future()
|
||||
self._reply_waiting_futures[msg_id] = future
|
||||
return future
|
||||
|
||||
def unregister_reply_waiter(self, msg_id):
|
||||
"""Unregister reply waiter. Should be called if client has not got
|
||||
reply and doesn't want to continue waiting (if timeout_expired for
|
||||
example)
|
||||
:param msg_id:
|
||||
"""
|
||||
self._reply_waiting_futures.pop(msg_id, None)
|
||||
|
||||
def cleanup(self):
|
||||
"""Stop replies consuming and cleanup resources"""
|
||||
if self._reply_poller:
|
||||
self._reply_poller.stop()
|
||||
self._reply_poller.cleanup()
|
||||
self._reply_poller = None
|
||||
|
||||
if self._poller_thread:
|
||||
if self._poller_thread.is_alive():
|
||||
self._poller_thread.join()
|
||||
self._poller_thread = None
|
||||
|
||||
self._reply_queue = None
|
621
oslo_messaging/_drivers/pika_driver/pika_message.py
Normal file
621
oslo_messaging/_drivers/pika_driver/pika_message.py
Normal file
@ -0,0 +1,621 @@
|
||||
# Copyright 2015 Mirantis, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
|
||||
import socket
|
||||
import time
|
||||
import traceback
|
||||
import uuid
|
||||
|
||||
from concurrent import futures
|
||||
from oslo_log import log as logging
|
||||
from oslo_serialization import jsonutils
|
||||
from oslo_utils import importutils
|
||||
from pika import exceptions as pika_exceptions
|
||||
from pika import spec as pika_spec
|
||||
import pika_pool
|
||||
import retrying
|
||||
import six
|
||||
|
||||
|
||||
import oslo_messaging
|
||||
from oslo_messaging._drivers.pika_driver import pika_exceptions as pika_drv_exc
|
||||
from oslo_messaging import _utils as utils
|
||||
from oslo_messaging import exceptions
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
_VERSION_HEADER = "version"
|
||||
_VERSION = "1.0"
|
||||
|
||||
|
||||
class RemoteExceptionMixin(object):
|
||||
"""Used for constructing dynamic exception type during deserialization of
|
||||
remote exception. It defines unified '__init__' method signature and
|
||||
exception message format
|
||||
"""
|
||||
def __init__(self, module, clazz, message, trace):
|
||||
"""Store serialized data
|
||||
:param module: String, module name for importing original exception
|
||||
class of serialized remote exception
|
||||
:param clazz: String, original class name of serialized remote
|
||||
exception
|
||||
:param message: String, original message of serialized remote
|
||||
exception
|
||||
:param trace: String, original trace of serialized remote exception
|
||||
"""
|
||||
self.module = module
|
||||
self.clazz = clazz
|
||||
self.message = message
|
||||
self.trace = trace
|
||||
|
||||
self._str_msgs = message + "\n" + "\n".join(trace)
|
||||
|
||||
def __str__(self):
|
||||
return self._str_msgs
|
||||
|
||||
|
||||
class PikaIncomingMessage(object):
|
||||
"""Driver friendly adapter for received message. Extract message
|
||||
information from RabbitMQ message and provide access to it
|
||||
"""
|
||||
|
||||
def __init__(self, pika_engine, channel, method, properties, body):
|
||||
"""Parse RabbitMQ message
|
||||
|
||||
:param pika_engine: PikaEngine, shared object with configuration and
|
||||
shared driver functionality
|
||||
:param channel: Channel, RabbitMQ channel which was used for
|
||||
this message delivery, used for sending ack back.
|
||||
If None - ack is not required
|
||||
:param method: Method, RabbitMQ message method
|
||||
:param properties: Properties, RabbitMQ message properties
|
||||
:param body: Bytes, RabbitMQ message body
|
||||
"""
|
||||
headers = getattr(properties, "headers", {})
|
||||
version = headers.get(_VERSION_HEADER, None)
|
||||
if not utils.version_is_compatible(version, _VERSION):
|
||||
raise pika_drv_exc.UnsupportedDriverVersion(
|
||||
"Message's version: {} is not compatible with driver version: "
|
||||
"{}".format(version, _VERSION))
|
||||
|
||||
self._pika_engine = pika_engine
|
||||
self._channel = channel
|
||||
self._delivery_tag = method.delivery_tag
|
||||
|
||||
self._version = version
|
||||
|
||||
self._content_type = properties.content_type
|
||||
self._content_encoding = properties.content_encoding
|
||||
self.unique_id = properties.message_id
|
||||
|
||||
self.expiration_time = (
|
||||
None if properties.expiration is None else
|
||||
time.time() + float(properties.expiration) / 1000
|
||||
)
|
||||
|
||||
if self._content_type != "application/json":
|
||||
raise NotImplementedError(
|
||||
"Content-type['{}'] is not valid, "
|
||||
"'application/json' only is supported.".format(
|
||||
self._content_type
|
||||
)
|
||||
)
|
||||
|
||||
message_dict = jsonutils.loads(body, encoding=self._content_encoding)
|
||||
|
||||
context_dict = {}
|
||||
|
||||
for key in list(message_dict.keys()):
|
||||
key = six.text_type(key)
|
||||
if key.startswith('_$_'):
|
||||
value = message_dict.pop(key)
|
||||
context_dict[key[3:]] = value
|
||||
self.message = message_dict
|
||||
self.ctxt = context_dict
|
||||
|
||||
def need_ack(self):
|
||||
return self._channel is not None
|
||||
|
||||
def acknowledge(self):
|
||||
"""Ack the message. Should be called by message processing logic when
|
||||
it considered as consumed (means that we don't need redelivery of this
|
||||
message anymore)
|
||||
"""
|
||||
if self.need_ack():
|
||||
self._channel.basic_ack(delivery_tag=self._delivery_tag)
|
||||
|
||||
def requeue(self):
|
||||
"""Rollback the message. Should be called by message processing logic
|
||||
when it can not process the message right now and should be redelivered
|
||||
later if it is possible
|
||||
"""
|
||||
if self.need_ack():
|
||||
return self._channel.basic_nack(delivery_tag=self._delivery_tag,
|
||||
requeue=True)
|
||||
|
||||
|
||||
class RpcPikaIncomingMessage(PikaIncomingMessage):
|
||||
"""PikaIncomingMessage implementation for RPC messages. It expects
|
||||
extra RPC related fields in message body (msg_id and reply_q). Also 'reply'
|
||||
method added to allow consumer to send RPC reply back to the RPC client
|
||||
"""
|
||||
|
||||
def __init__(self, pika_engine, channel, method, properties, body):
|
||||
"""Defines default values of msg_id and reply_q fields and just call
|
||||
super.__init__ method
|
||||
|
||||
:param pika_engine: PikaEngine, shared object with configuration and
|
||||
shared driver functionality
|
||||
:param channel: Channel, RabbitMQ channel which was used for
|
||||
this message delivery, used for sending ack back.
|
||||
If None - ack is not required
|
||||
:param method: Method, RabbitMQ message method
|
||||
:param properties: Properties, RabbitMQ message properties
|
||||
:param body: Bytes, RabbitMQ message body
|
||||
"""
|
||||
super(RpcPikaIncomingMessage, self).__init__(
|
||||
pika_engine, channel, method, properties, body
|
||||
)
|
||||
self.reply_q = properties.reply_to
|
||||
self.msg_id = properties.correlation_id
|
||||
|
||||
def reply(self, reply=None, failure=None, log_failure=True):
|
||||
"""Send back reply to the RPC client
|
||||
:param reply: Dictionary, reply. In case of exception should be None
|
||||
:param failure: Tuple, should be a sys.exc_info() tuple.
|
||||
Should be None if RPC request was successfully processed.
|
||||
:param log_failure: Boolean, not used in this implementation.
|
||||
It present here to be compatible with driver API
|
||||
|
||||
:return RpcReplyPikaIncomingMessage, message with reply
|
||||
"""
|
||||
|
||||
if self.reply_q is None:
|
||||
return
|
||||
|
||||
reply_outgoing_message = RpcReplyPikaOutgoingMessage(
|
||||
self._pika_engine, self.msg_id, reply=reply, failure_info=failure,
|
||||
content_type=self._content_type,
|
||||
content_encoding=self._content_encoding
|
||||
)
|
||||
|
||||
def on_exception(ex):
|
||||
if isinstance(ex, pika_drv_exc.ConnectionException):
|
||||
LOG.warn(str(ex))
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
retrier = retrying.retry(
|
||||
stop_max_attempt_number=(
|
||||
None if self._pika_engine.rpc_reply_retry_attempts == -1
|
||||
else self._pika_engine.rpc_reply_retry_attempts
|
||||
),
|
||||
retry_on_exception=on_exception,
|
||||
wait_fixed=self._pika_engine.rpc_reply_retry_delay * 1000,
|
||||
) if self._pika_engine.rpc_reply_retry_attempts else None
|
||||
|
||||
try:
|
||||
reply_outgoing_message.send(
|
||||
reply_q=self.reply_q,
|
||||
expiration_time=self.expiration_time,
|
||||
retrier=retrier
|
||||
)
|
||||
LOG.debug(
|
||||
"Message [id:'{}'] replied to '{}'.".format(
|
||||
self.msg_id, self.reply_q
|
||||
)
|
||||
)
|
||||
except Exception:
|
||||
LOG.exception(
|
||||
"Message [id:'{}'] wasn't replied to : {}".format(
|
||||
self.msg_id, self.reply_q
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
class RpcReplyPikaIncomingMessage(PikaIncomingMessage):
|
||||
"""PikaIncomingMessage implementation for RPC reply messages. It expects
|
||||
extra RPC reply related fields in message body (result and failure).
|
||||
"""
|
||||
def __init__(self, pika_engine, channel, method, properties, body):
|
||||
"""Defines default values of result and failure fields, call
|
||||
super.__init__ method and then construct Exception object if failure is
|
||||
not None
|
||||
|
||||
:param pika_engine: PikaEngine, shared object with configuration and
|
||||
shared driver functionality
|
||||
:param channel: Channel, RabbitMQ channel which was used for
|
||||
this message delivery, used for sending ack back.
|
||||
If None - ack is not required
|
||||
:param method: Method, RabbitMQ message method
|
||||
:param properties: Properties, RabbitMQ message properties
|
||||
:param body: Bytes, RabbitMQ message body
|
||||
"""
|
||||
super(RpcReplyPikaIncomingMessage, self).__init__(
|
||||
pika_engine, channel, method, properties, body
|
||||
)
|
||||
|
||||
self.msg_id = properties.correlation_id
|
||||
|
||||
self.result = self.message.get("s", None)
|
||||
self.failure = self.message.get("e", None)
|
||||
|
||||
if self.failure is not None:
|
||||
trace = self.failure.get('t', [])
|
||||
message = self.failure.get('s', "")
|
||||
class_name = self.failure.get('c')
|
||||
module_name = self.failure.get('m')
|
||||
|
||||
res_exc = None
|
||||
|
||||
if module_name in pika_engine.allowed_remote_exmods:
|
||||
try:
|
||||
module = importutils.import_module(module_name)
|
||||
klass = getattr(module, class_name)
|
||||
|
||||
ex_type = type(
|
||||
klass.__name__,
|
||||
(RemoteExceptionMixin, klass),
|
||||
{}
|
||||
)
|
||||
|
||||
res_exc = ex_type(module_name, class_name, message, trace)
|
||||
except ImportError as e:
|
||||
LOG.warn(
|
||||
"Can not deserialize remote exception [module:{}, "
|
||||
"class:{}]. {}".format(module_name, class_name, str(e))
|
||||
)
|
||||
|
||||
# if we have not processed failure yet, use RemoteError class
|
||||
if res_exc is None:
|
||||
res_exc = oslo_messaging.RemoteError(
|
||||
class_name, message, trace
|
||||
)
|
||||
self.failure = res_exc
|
||||
|
||||
|
||||
class PikaOutgoingMessage(object):
|
||||
"""Driver friendly adapter for sending message. Construct RabbitMQ message
|
||||
and send it
|
||||
"""
|
||||
|
||||
def __init__(self, pika_engine, message, context,
|
||||
content_type="application/json", content_encoding="utf-8"):
|
||||
"""Parse RabbitMQ message
|
||||
|
||||
:param pika_engine: PikaEngine, shared object with configuration and
|
||||
shared driver functionality
|
||||
:param message: Dictionary, user's message fields
|
||||
:param context: Dictionary, request context's fields
|
||||
:param content_type: String, content-type header, defines serialization
|
||||
mechanism
|
||||
:param content_encoding: String, defines encoding for text data
|
||||
"""
|
||||
|
||||
self._pika_engine = pika_engine
|
||||
|
||||
self._content_type = content_type
|
||||
self._content_encoding = content_encoding
|
||||
|
||||
if self._content_type != "application/json":
|
||||
raise NotImplementedError(
|
||||
"Content-type['{}'] is not valid, "
|
||||
"'application/json' only is supported.".format(
|
||||
self._content_type
|
||||
)
|
||||
)
|
||||
|
||||
self.message = message
|
||||
self.context = context
|
||||
|
||||
self.unique_id = uuid.uuid4().hex
|
||||
|
||||
def _prepare_message_to_send(self):
|
||||
"""Combine user's message fields an system fields (_unique_id,
|
||||
context's data etc)
|
||||
"""
|
||||
msg = self.message.copy()
|
||||
|
||||
if self.context:
|
||||
for key, value in six.iteritems(self.context):
|
||||
key = six.text_type(key)
|
||||
msg['_$_' + key] = value
|
||||
|
||||
props = pika_spec.BasicProperties(
|
||||
content_encoding=self._content_encoding,
|
||||
content_type=self._content_type,
|
||||
headers={_VERSION_HEADER: _VERSION},
|
||||
message_id=self.unique_id,
|
||||
)
|
||||
return msg, props
|
||||
|
||||
@staticmethod
|
||||
def _publish(pool, exchange, routing_key, body, properties, mandatory,
|
||||
expiration_time):
|
||||
"""Execute pika publish method using connection from connection pool
|
||||
Also this message catches all pika related exceptions and raise
|
||||
oslo.messaging specific exceptions
|
||||
|
||||
:param pool: Pool, pika connection pool for connection choosing
|
||||
:param exchange: String, RabbitMQ exchange name for message sending
|
||||
:param routing_key: String, RabbitMQ routing key for message routing
|
||||
:param body: Bytes, RabbitMQ message payload
|
||||
:param properties: Properties, RabbitMQ message properties
|
||||
:param mandatory: Boolean, RabbitMQ publish mandatory flag (raise
|
||||
exception if it is not possible to deliver message to any queue)
|
||||
:param expiration_time: Float, expiration time in seconds
|
||||
(like time.time())
|
||||
"""
|
||||
timeout = (None if expiration_time is None else
|
||||
expiration_time - time.time())
|
||||
if timeout is not None and timeout < 0:
|
||||
raise exceptions.MessagingTimeout(
|
||||
"Timeout for current operation was expired."
|
||||
)
|
||||
try:
|
||||
with pool.acquire(timeout=timeout) as conn:
|
||||
if timeout is not None:
|
||||
properties.expiration = str(int(timeout * 1000))
|
||||
conn.channel.publish(
|
||||
exchange=exchange,
|
||||
routing_key=routing_key,
|
||||
body=body,
|
||||
properties=properties,
|
||||
mandatory=mandatory
|
||||
)
|
||||
except pika_exceptions.NackError as e:
|
||||
raise pika_drv_exc.MessageRejectedException(
|
||||
"Can not send message: [body: {}], properties: {}] to "
|
||||
"target [exchange: {}, routing_key: {}]. {}".format(
|
||||
body, properties, exchange, routing_key, str(e)
|
||||
)
|
||||
)
|
||||
except pika_exceptions.UnroutableError as e:
|
||||
raise pika_drv_exc.RoutingException(
|
||||
"Can not deliver message:[body:{}, properties: {}] to any"
|
||||
"queue using target: [exchange:{}, "
|
||||
"routing_key:{}]. {}".format(
|
||||
body, properties, exchange, routing_key, str(e)
|
||||
)
|
||||
)
|
||||
except pika_pool.Timeout as e:
|
||||
raise exceptions.MessagingTimeout(
|
||||
"Timeout for current operation was expired. {}".format(str(e))
|
||||
)
|
||||
except pika_pool.Connection.connectivity_errors as e:
|
||||
if (isinstance(e, pika_exceptions.ChannelClosed)
|
||||
and e.args and e.args[0] == 404):
|
||||
raise pika_drv_exc.ExchangeNotFoundException(
|
||||
"Attempt to send message to not existing exchange "
|
||||
"detected, message: [body:{}, properties: {}], target: "
|
||||
"[exchange:{}, routing_key:{}]. {}".format(
|
||||
body, properties, exchange, routing_key, str(e)
|
||||
)
|
||||
)
|
||||
|
||||
raise pika_drv_exc.ConnectionException(
|
||||
"Connectivity problem detected during sending the message: "
|
||||
"[body:{}, properties: {}] to target: [exchange:{}, "
|
||||
"routing_key:{}]. {}".format(
|
||||
body, properties, exchange, routing_key, str(e)
|
||||
)
|
||||
)
|
||||
except socket.timeout:
|
||||
raise pika_drv_exc.TimeoutConnectionException(
|
||||
"Socket timeout exceeded."
|
||||
)
|
||||
|
||||
def _do_send(self, exchange, routing_key, msg_dict, msg_props,
|
||||
confirm=True, mandatory=True, persistent=False,
|
||||
expiration_time=None, retrier=None):
|
||||
"""Send prepared message with configured retrying
|
||||
|
||||
:param exchange: String, RabbitMQ exchange name for message sending
|
||||
:param routing_key: String, RabbitMQ routing key for message routing
|
||||
:param msg_dict: Dictionary, message payload
|
||||
:param msg_props: Properties, message properties
|
||||
:param confirm: Boolean, enable publisher confirmation if True
|
||||
:param mandatory: Boolean, RabbitMQ publish mandatory flag (raise
|
||||
exception if it is not possible to deliver message to any queue)
|
||||
:param persistent: Boolean, send persistent message if True, works only
|
||||
for routing into durable queues
|
||||
:param expiration_time: Float, expiration time in seconds
|
||||
(like time.time())
|
||||
:param retrier: retrying.Retrier, configured retrier object for sending
|
||||
message, if None no retrying is performed
|
||||
"""
|
||||
msg_props.delivery_mode = 2 if persistent else 1
|
||||
|
||||
pool = (self._pika_engine.connection_with_confirmation_pool
|
||||
if confirm else
|
||||
self._pika_engine.connection_without_confirmation_pool)
|
||||
|
||||
body = jsonutils.dump_as_bytes(msg_dict,
|
||||
encoding=self._content_encoding)
|
||||
|
||||
LOG.debug(
|
||||
"Sending message:[body:{}; properties: {}] to target: "
|
||||
"[exchange:{}; routing_key:{}]".format(
|
||||
body, msg_props, exchange, routing_key
|
||||
)
|
||||
)
|
||||
|
||||
publish = (self._publish if retrier is None else
|
||||
retrier(self._publish))
|
||||
|
||||
return publish(pool, exchange, routing_key, body, msg_props,
|
||||
mandatory, expiration_time)
|
||||
|
||||
def send(self, exchange, routing_key='', confirm=True, mandatory=True,
|
||||
persistent=False, expiration_time=None, retrier=None):
|
||||
"""Send message with configured retrying
|
||||
|
||||
:param exchange: String, RabbitMQ exchange name for message sending
|
||||
:param routing_key: String, RabbitMQ routing key for message routing
|
||||
:param confirm: Boolean, enable publisher confirmation if True
|
||||
:param mandatory: Boolean, RabbitMQ publish mandatory flag (raise
|
||||
exception if it is not possible to deliver message to any queue)
|
||||
:param persistent: Boolean, send persistent message if True, works only
|
||||
for routing into durable queues
|
||||
:param expiration_time: Float, expiration time in seconds
|
||||
(like time.time())
|
||||
:param retrier: retrying.Retrier, configured retrier object for sending
|
||||
message, if None no retrying is performed
|
||||
"""
|
||||
msg_dict, msg_props = self._prepare_message_to_send()
|
||||
|
||||
return self._do_send(exchange, routing_key, msg_dict, msg_props,
|
||||
confirm, mandatory, persistent, expiration_time,
|
||||
retrier)
|
||||
|
||||
|
||||
class RpcPikaOutgoingMessage(PikaOutgoingMessage):
|
||||
"""PikaOutgoingMessage implementation for RPC messages. It adds
|
||||
possibility to wait and receive RPC reply
|
||||
"""
|
||||
def __init__(self, pika_engine, message, context,
|
||||
content_type="application/json", content_encoding="utf-8"):
|
||||
super(RpcPikaOutgoingMessage, self).__init__(
|
||||
pika_engine, message, context, content_type, content_encoding
|
||||
)
|
||||
self.msg_id = None
|
||||
self.reply_q = None
|
||||
|
||||
def send(self, target, reply_listener=None, expiration_time=None,
|
||||
retrier=None):
|
||||
"""Send RPC message with configured retrying
|
||||
|
||||
:param target: Target, oslo.messaging target which defines RPC service
|
||||
:param reply_listener: RpcReplyPikaListener, listener for waiting
|
||||
reply. If None - return immediately without reply waiting
|
||||
:param expiration_time: Float, expiration time in seconds
|
||||
(like time.time())
|
||||
:param retrier: retrying.Retrier, configured retrier object for sending
|
||||
message, if None no retrying is performed
|
||||
"""
|
||||
|
||||
exchange = self._pika_engine.get_rpc_exchange_name(
|
||||
target.exchange, target.topic, target.fanout, retrier is None
|
||||
)
|
||||
|
||||
queue = "" if target.fanout else self._pika_engine.get_rpc_queue_name(
|
||||
target.topic, target.server, retrier is None
|
||||
)
|
||||
|
||||
msg_dict, msg_props = self._prepare_message_to_send()
|
||||
|
||||
if reply_listener:
|
||||
self.msg_id = uuid.uuid4().hex
|
||||
msg_props.correlation_id = self.msg_id
|
||||
LOG.debug('MSG_ID is %s', self.msg_id)
|
||||
|
||||
self.reply_q = reply_listener.get_reply_qname(
|
||||
expiration_time - time.time()
|
||||
)
|
||||
msg_props.reply_to = self.reply_q
|
||||
|
||||
future = reply_listener.register_reply_waiter(msg_id=self.msg_id)
|
||||
|
||||
self._do_send(
|
||||
exchange=exchange, routing_key=queue, msg_dict=msg_dict,
|
||||
msg_props=msg_props, confirm=True, mandatory=True,
|
||||
persistent=False, expiration_time=expiration_time,
|
||||
retrier=retrier
|
||||
)
|
||||
|
||||
try:
|
||||
return future.result(expiration_time - time.time())
|
||||
except BaseException as e:
|
||||
reply_listener.unregister_reply_waiter(self.msg_id)
|
||||
if isinstance(e, futures.TimeoutError):
|
||||
e = exceptions.MessagingTimeout()
|
||||
raise e
|
||||
else:
|
||||
self._do_send(
|
||||
exchange=exchange, routing_key=queue, msg_dict=msg_dict,
|
||||
msg_props=msg_props, confirm=True, mandatory=True,
|
||||
persistent=False, expiration_time=expiration_time,
|
||||
retrier=retrier
|
||||
)
|
||||
|
||||
|
||||
class RpcReplyPikaOutgoingMessage(PikaOutgoingMessage):
|
||||
"""PikaOutgoingMessage implementation for RPC reply messages. It sets
|
||||
correlation_id AMQP property to link this reply with response
|
||||
"""
|
||||
def __init__(self, pika_engine, msg_id, reply=None, failure_info=None,
|
||||
content_type="application/json", content_encoding="utf-8"):
|
||||
"""Initialize with reply information for sending
|
||||
|
||||
:param pika_engine: PikaEngine, shared object with configuration and
|
||||
shared driver functionality
|
||||
:param msg_id: String, msg_id of RPC request, which waits for reply
|
||||
:param reply: Dictionary, reply. In case of exception should be None
|
||||
:param failure_info: Tuple, should be a sys.exc_info() tuple.
|
||||
Should be None if RPC request was successfully processed.
|
||||
:param content_type: String, content-type header, defines serialization
|
||||
mechanism
|
||||
:param content_encoding: String, defines encoding for text data
|
||||
"""
|
||||
self.msg_id = msg_id
|
||||
|
||||
if failure_info is not None:
|
||||
ex_class = failure_info[0]
|
||||
ex = failure_info[1]
|
||||
tb = traceback.format_exception(*failure_info)
|
||||
if issubclass(ex_class, RemoteExceptionMixin):
|
||||
failure_data = {
|
||||
'c': ex.clazz,
|
||||
'm': ex.module,
|
||||
's': ex.message,
|
||||
't': tb
|
||||
}
|
||||
else:
|
||||
failure_data = {
|
||||
'c': six.text_type(ex_class.__name__),
|
||||
'm': six.text_type(ex_class.__module__),
|
||||
's': six.text_type(ex),
|
||||
't': tb
|
||||
}
|
||||
|
||||
msg = {'e': failure_data}
|
||||
else:
|
||||
msg = {'s': reply}
|
||||
|
||||
super(RpcReplyPikaOutgoingMessage, self).__init__(
|
||||
pika_engine, msg, None, content_type, content_encoding
|
||||
)
|
||||
|
||||
def send(self, reply_q, expiration_time=None, retrier=None):
|
||||
"""Send RPC message with configured retrying
|
||||
|
||||
:param reply_q: String, queue name for sending reply
|
||||
:param expiration_time: Float, expiration time in seconds
|
||||
(like time.time())
|
||||
:param retrier: retrying.Retrier, configured retrier object for sending
|
||||
message, if None no retrying is performed
|
||||
"""
|
||||
|
||||
msg_dict, msg_props = self._prepare_message_to_send()
|
||||
msg_props.correlation_id = self.msg_id
|
||||
|
||||
self._do_send(
|
||||
exchange=self._pika_engine.rpc_reply_exchange, routing_key=reply_q,
|
||||
msg_dict=msg_dict, msg_props=msg_props, confirm=True,
|
||||
mandatory=True, persistent=False, expiration_time=expiration_time,
|
||||
retrier=retrier
|
||||
)
|
402
oslo_messaging/_drivers/pika_driver/pika_poller.py
Normal file
402
oslo_messaging/_drivers/pika_driver/pika_poller.py
Normal file
@ -0,0 +1,402 @@
|
||||
# Copyright 2015 Mirantis, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import threading
|
||||
import time
|
||||
|
||||
from oslo_log import log as logging
|
||||
import pika_pool
|
||||
import retrying
|
||||
import six
|
||||
|
||||
from oslo_messaging._drivers.pika_driver import pika_message as pika_drv_msg
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class PikaPoller(object):
|
||||
"""Provides user friendly functionality for RabbitMQ message consuming,
|
||||
handles low level connectivity problems and restore connection if some
|
||||
connectivity related problem detected
|
||||
"""
|
||||
|
||||
def __init__(self, pika_engine, prefetch_count, incoming_message_class):
|
||||
"""Initialize required fields
|
||||
|
||||
:param pika_engine: PikaEngine, shared object with configuration and
|
||||
shared driver functionality
|
||||
:param prefetch_count: Integer, maximum count of unacknowledged
|
||||
messages which RabbitMQ broker sends to this consumer
|
||||
:param incoming_message_class: PikaIncomingMessage, wrapper for
|
||||
consumed RabbitMQ message
|
||||
"""
|
||||
self._pika_engine = pika_engine
|
||||
self._prefetch_count = prefetch_count
|
||||
self._incoming_message_class = incoming_message_class
|
||||
|
||||
self._connection = None
|
||||
self._channel = None
|
||||
self._lock = threading.Lock()
|
||||
|
||||
self._started = False
|
||||
|
||||
self._queues_to_consume = None
|
||||
|
||||
self._message_queue = []
|
||||
|
||||
def _reconnect(self):
|
||||
"""Performs reconnection to the broker. It is unsafe method for
|
||||
internal use only
|
||||
"""
|
||||
self._connection = self._pika_engine.create_connection(
|
||||
for_listening=True
|
||||
)
|
||||
self._channel = self._connection.channel()
|
||||
self._channel.basic_qos(prefetch_count=self._prefetch_count)
|
||||
|
||||
if self._queues_to_consume is None:
|
||||
self._queues_to_consume = self._declare_queue_binding()
|
||||
|
||||
for queue, no_ack in six.iteritems(self._queues_to_consume):
|
||||
self._start_consuming(queue, no_ack)
|
||||
|
||||
def _declare_queue_binding(self):
|
||||
"""Is called by recovering connection logic if target RabbitMQ
|
||||
exchange and (or) queue do not exist. Should be overridden in child
|
||||
classes
|
||||
|
||||
:return Dictionary, declared_queue_name -> no_ack_mode
|
||||
"""
|
||||
raise NotImplementedError(
|
||||
"It is base class. Please declare exchanges and queues here"
|
||||
)
|
||||
|
||||
def _start_consuming(self, queue, no_ack):
|
||||
"""Is called by recovering connection logic for starting consumption
|
||||
of the RabbitMQ queue
|
||||
|
||||
:param queue: String, RabbitMQ queue name for consuming
|
||||
:param no_ack: Boolean, Choose consuming acknowledgement mode. If True,
|
||||
acknowledges are not needed. RabbitMQ considers message consumed
|
||||
after sending it to consumer immediately
|
||||
"""
|
||||
on_message_no_ack_callback = (
|
||||
self._on_message_no_ack_callback if no_ack
|
||||
else self._on_message_with_ack_callback
|
||||
)
|
||||
|
||||
try:
|
||||
self._channel.basic_consume(on_message_no_ack_callback, queue,
|
||||
no_ack=no_ack)
|
||||
except Exception:
|
||||
self._queues_to_consume = None
|
||||
raise
|
||||
|
||||
def _on_message_no_ack_callback(self, unused, method, properties, body):
|
||||
"""Is called by Pika when message was received from queue listened with
|
||||
no_ack=True mode
|
||||
"""
|
||||
self._message_queue.append(
|
||||
self._incoming_message_class(
|
||||
self._pika_engine, None, method, properties, body
|
||||
)
|
||||
)
|
||||
|
||||
def _on_message_with_ack_callback(self, unused, method, properties, body):
|
||||
"""Is called by Pika when message was received from queue listened with
|
||||
no_ack=False mode
|
||||
"""
|
||||
self._message_queue.append(
|
||||
self._incoming_message_class(
|
||||
self._pika_engine, self._channel, method, properties, body
|
||||
)
|
||||
)
|
||||
|
||||
def _cleanup(self):
|
||||
"""Cleanup allocated resources (channel, connection, etc). It is unsafe
|
||||
method for internal use only
|
||||
"""
|
||||
if self._channel:
|
||||
try:
|
||||
self._channel.close()
|
||||
except Exception as ex:
|
||||
if not pika_pool.Connection.is_connection_invalidated(ex):
|
||||
LOG.exception("Unexpected error during closing channel")
|
||||
self._channel = None
|
||||
|
||||
if self._connection:
|
||||
try:
|
||||
self._connection.close()
|
||||
except Exception as ex:
|
||||
if not pika_pool.Connection.is_connection_invalidated(ex):
|
||||
LOG.exception("Unexpected error during closing connection")
|
||||
self._connection = None
|
||||
|
||||
for i in xrange(len(self._message_queue) - 1, -1, -1):
|
||||
message = self._message_queue[i]
|
||||
if message.need_ack():
|
||||
del self._message_queue[i]
|
||||
|
||||
def poll(self, timeout=None, prefetch_size=1):
|
||||
"""Main method of this class - consumes message from RabbitMQ
|
||||
|
||||
:param: timeout: float, seconds, timeout for waiting new incoming
|
||||
message, None means wait forever
|
||||
:param: prefetch_size: Integer, count of messages which we are want to
|
||||
poll. It blocks until prefetch_size messages are consumed or until
|
||||
timeout gets expired
|
||||
:return: list of PikaIncomingMessage, RabbitMQ messages
|
||||
"""
|
||||
expiration_time = time.time() + timeout if timeout else None
|
||||
|
||||
while True:
|
||||
with self._lock:
|
||||
if timeout is not None:
|
||||
timeout = expiration_time - time.time()
|
||||
if (len(self._message_queue) < prefetch_size and
|
||||
self._started and ((timeout is None) or timeout > 0)):
|
||||
try:
|
||||
if self._channel is None:
|
||||
self._reconnect()
|
||||
# we need some time_limit here, not too small to avoid
|
||||
# a lot of not needed iterations but not too large to
|
||||
# release lock time to time and give a chance to
|
||||
# perform another method waiting this lock
|
||||
self._connection.process_data_events(
|
||||
time_limit=0.25
|
||||
)
|
||||
except pika_pool.Connection.connectivity_errors:
|
||||
self._cleanup()
|
||||
raise
|
||||
else:
|
||||
result = self._message_queue[:prefetch_size]
|
||||
del self._message_queue[:prefetch_size]
|
||||
return result
|
||||
|
||||
def start(self):
|
||||
"""Starts poller. Should be called before polling to allow message
|
||||
consuming
|
||||
"""
|
||||
self._started = True
|
||||
|
||||
def stop(self):
|
||||
"""Stops poller. Should be called when polling is not needed anymore to
|
||||
stop new message consuming. After that it is necessary to poll already
|
||||
prefetched messages
|
||||
"""
|
||||
with self._lock:
|
||||
if not self._started:
|
||||
return
|
||||
|
||||
self._started = False
|
||||
|
||||
def reconnect(self):
|
||||
"""Safe version of _reconnect. Performs reconnection to the broker."""
|
||||
with self._lock:
|
||||
self._cleanup()
|
||||
try:
|
||||
self._reconnect()
|
||||
except Exception:
|
||||
self._cleanup()
|
||||
raise
|
||||
|
||||
def cleanup(self):
|
||||
"""Safe version of _cleanup. Cleans up allocated resources (channel,
|
||||
connection, etc).
|
||||
"""
|
||||
with self._lock:
|
||||
self._cleanup()
|
||||
|
||||
|
||||
class RpcServicePikaPoller(PikaPoller):
|
||||
"""PikaPoller implementation for polling RPC messages. Overrides base
|
||||
functionality according to RPC specific
|
||||
"""
|
||||
def __init__(self, pika_engine, target, prefetch_count):
|
||||
"""Adds target parameter for declaring RPC specific exchanges and
|
||||
queues
|
||||
|
||||
:param pika_engine: PikaEngine, shared object with configuration and
|
||||
shared driver functionality
|
||||
:param target: Target, oslo.messaging Target object which defines RPC
|
||||
endpoint
|
||||
:param prefetch_count: Integer, maximum count of unacknowledged
|
||||
messages which RabbitMQ broker sends to this consumer
|
||||
"""
|
||||
self._target = target
|
||||
|
||||
super(RpcServicePikaPoller, self).__init__(
|
||||
pika_engine, prefetch_count=prefetch_count,
|
||||
incoming_message_class=pika_drv_msg.RpcPikaIncomingMessage
|
||||
)
|
||||
|
||||
def _declare_queue_binding(self):
|
||||
"""Overrides base method and perform declaration of RabbitMQ exchanges
|
||||
and queues which correspond to oslo.messaging RPC target
|
||||
|
||||
:return Dictionary, declared_queue_name -> no_ack_mode
|
||||
"""
|
||||
queue_expiration = self._pika_engine.rpc_queue_expiration
|
||||
|
||||
queues_to_consume = {}
|
||||
|
||||
for no_ack in [True, False]:
|
||||
exchange = self._pika_engine.get_rpc_exchange_name(
|
||||
self._target.exchange, self._target.topic, False, no_ack
|
||||
)
|
||||
fanout_exchange = self._pika_engine.get_rpc_exchange_name(
|
||||
self._target.exchange, self._target.topic, True, no_ack
|
||||
)
|
||||
queue = self._pika_engine.get_rpc_queue_name(
|
||||
self._target.topic, None, no_ack
|
||||
)
|
||||
server_queue = self._pika_engine.get_rpc_queue_name(
|
||||
self._target.topic, self._target.server, no_ack
|
||||
)
|
||||
|
||||
queues_to_consume[queue] = no_ack
|
||||
queues_to_consume[server_queue] = no_ack
|
||||
|
||||
self._pika_engine.declare_queue_binding_by_channel(
|
||||
channel=self._channel, exchange=exchange, queue=queue,
|
||||
routing_key=queue, exchange_type='direct', durable=False,
|
||||
queue_expiration=queue_expiration
|
||||
)
|
||||
self._pika_engine.declare_queue_binding_by_channel(
|
||||
channel=self._channel, exchange=exchange, queue=server_queue,
|
||||
routing_key=server_queue, exchange_type='direct',
|
||||
queue_expiration=queue_expiration, durable=False
|
||||
)
|
||||
self._pika_engine.declare_queue_binding_by_channel(
|
||||
channel=self._channel, exchange=fanout_exchange, durable=False,
|
||||
queue=server_queue, routing_key="", exchange_type='fanout',
|
||||
queue_expiration=queue_expiration
|
||||
)
|
||||
return queues_to_consume
|
||||
|
||||
|
||||
class RpcReplyPikaPoller(PikaPoller):
|
||||
"""PikaPoller implementation for polling RPC reply messages. Overrides
|
||||
base functionality according to RPC reply specific
|
||||
"""
|
||||
def __init__(self, pika_engine, exchange, queue, prefetch_count):
|
||||
"""Adds exchange and queue parameter for declaring exchange and queue
|
||||
used for RPC reply delivery
|
||||
|
||||
:param pika_engine: PikaEngine, shared object with configuration and
|
||||
shared driver functionality
|
||||
:param exchange: String, exchange name used for RPC reply delivery
|
||||
:param queue: String, queue name used for RPC reply delivery
|
||||
:param prefetch_count: Integer, maximum count of unacknowledged
|
||||
messages which RabbitMQ broker sends to this consumer
|
||||
"""
|
||||
self._exchange = exchange
|
||||
self._queue = queue
|
||||
|
||||
super(RpcReplyPikaPoller, self).__init__(
|
||||
pika_engine=pika_engine, prefetch_count=prefetch_count,
|
||||
incoming_message_class=pika_drv_msg.RpcReplyPikaIncomingMessage
|
||||
)
|
||||
|
||||
def _declare_queue_binding(self):
|
||||
"""Overrides base method and perform declaration of RabbitMQ exchange
|
||||
and queue used for RPC reply delivery
|
||||
|
||||
:return Dictionary, declared_queue_name -> no_ack_mode
|
||||
"""
|
||||
self._pika_engine.declare_queue_binding_by_channel(
|
||||
channel=self._channel,
|
||||
exchange=self._exchange, queue=self._queue,
|
||||
routing_key=self._queue, exchange_type='direct',
|
||||
queue_expiration=self._pika_engine.rpc_queue_expiration,
|
||||
durable=False
|
||||
)
|
||||
|
||||
return {self._queue: False}
|
||||
|
||||
def start(self, timeout=None):
|
||||
"""Overrides default behaviour of start method. Base start method
|
||||
does not create connection to RabbitMQ during start method (uses
|
||||
lazy connecting during first poll method call). This class should be
|
||||
connected after start call to ensure that exchange and queue for reply
|
||||
delivery are created before RPC request sending
|
||||
"""
|
||||
super(RpcReplyPikaPoller, self).start()
|
||||
|
||||
def on_exception(ex):
|
||||
LOG.warn(str(ex))
|
||||
|
||||
return True
|
||||
|
||||
retrier = retrying.retry(
|
||||
stop_max_attempt_number=self._pika_engine.rpc_reply_retry_attempts,
|
||||
stop_max_delay=None if timeout is None else timeout * 1000,
|
||||
wait_fixed=self._pika_engine.rpc_reply_retry_delay * 1000,
|
||||
retry_on_exception=on_exception,
|
||||
)
|
||||
|
||||
retrier(self.reconnect)()
|
||||
|
||||
|
||||
class NotificationPikaPoller(PikaPoller):
|
||||
"""PikaPoller implementation for polling Notification messages. Overrides
|
||||
base functionality according to Notification specific
|
||||
"""
|
||||
def __init__(self, pika_engine, targets_and_priorities,
|
||||
queue_name=None, prefetch_count=100):
|
||||
"""Adds targets_and_priorities and queue_name parameter
|
||||
for declaring exchanges and queues used for notification delivery
|
||||
|
||||
:param pika_engine: PikaEngine, shared object with configuration and
|
||||
shared driver functionality
|
||||
:param targets_and_priorities: list of (target, priority), defines
|
||||
default queue names for corresponding notification types
|
||||
:param queue: String, alternative queue name used for this poller
|
||||
instead of default queue name
|
||||
:param prefetch_count: Integer, maximum count of unacknowledged
|
||||
messages which RabbitMQ broker sends to this consumer
|
||||
"""
|
||||
self._targets_and_priorities = targets_and_priorities
|
||||
self._queue_name = queue_name
|
||||
|
||||
super(NotificationPikaPoller, self).__init__(
|
||||
pika_engine, prefetch_count=prefetch_count,
|
||||
incoming_message_class=pika_drv_msg.PikaIncomingMessage
|
||||
)
|
||||
|
||||
def _declare_queue_binding(self):
|
||||
"""Overrides base method and perform declaration of RabbitMQ exchanges
|
||||
and queues used for notification delivery
|
||||
|
||||
:return Dictionary, declared_queue_name -> no_ack_mode
|
||||
"""
|
||||
queues_to_consume = {}
|
||||
for target, priority in self._targets_and_priorities:
|
||||
routing_key = '%s.%s' % (target.topic, priority)
|
||||
queue = self._queue_name or routing_key
|
||||
self._pika_engine.declare_queue_binding_by_channel(
|
||||
channel=self._channel,
|
||||
exchange=(
|
||||
target.exchange or
|
||||
self._pika_engine.default_notification_exchange
|
||||
),
|
||||
queue = queue,
|
||||
routing_key=routing_key,
|
||||
exchange_type='direct',
|
||||
queue_expiration=None,
|
||||
durable=self._pika_engine.notification_persistence,
|
||||
)
|
||||
queues_to_consume[queue] = False
|
||||
|
||||
return queues_to_consume
|
0
oslo_messaging/tests/drivers/pika/__init__.py
Normal file
0
oslo_messaging/tests/drivers/pika/__init__.py
Normal file
622
oslo_messaging/tests/drivers/pika/test_message.py
Normal file
622
oslo_messaging/tests/drivers/pika/test_message.py
Normal file
@ -0,0 +1,622 @@
|
||||
# Copyright 2015 Mirantis, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
import functools
|
||||
import time
|
||||
import unittest
|
||||
|
||||
from concurrent import futures
|
||||
from mock import mock, patch
|
||||
from oslo_serialization import jsonutils
|
||||
import pika
|
||||
from pika import spec
|
||||
|
||||
import oslo_messaging
|
||||
from oslo_messaging._drivers.pika_driver import pika_engine
|
||||
from oslo_messaging._drivers.pika_driver import pika_message as pika_drv_msg
|
||||
|
||||
|
||||
class PikaIncomingMessageTestCase(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self._pika_engine = mock.Mock()
|
||||
self._channel = mock.Mock()
|
||||
|
||||
self._delivery_tag = 12345
|
||||
|
||||
self._method = pika.spec.Basic.Deliver(delivery_tag=self._delivery_tag)
|
||||
self._properties = pika.BasicProperties(
|
||||
content_type="application/json",
|
||||
headers={"version": "1.0"},
|
||||
)
|
||||
self._body = (
|
||||
b'{"_$_key_context":"context_value",'
|
||||
b'"payload_key": "payload_value"}'
|
||||
)
|
||||
|
||||
def test_message_body_parsing(self):
|
||||
message = pika_drv_msg.PikaIncomingMessage(
|
||||
self._pika_engine, self._channel, self._method, self._properties,
|
||||
self._body
|
||||
)
|
||||
|
||||
self.assertEqual(message.ctxt.get("key_context", None),
|
||||
"context_value")
|
||||
self.assertEqual(message.message.get("payload_key", None),
|
||||
"payload_value")
|
||||
|
||||
def test_message_acknowledge(self):
|
||||
message = pika_drv_msg.PikaIncomingMessage(
|
||||
self._pika_engine, self._channel, self._method, self._properties,
|
||||
self._body
|
||||
)
|
||||
|
||||
message.acknowledge()
|
||||
|
||||
self.assertEqual(1, self._channel.basic_ack.call_count)
|
||||
self.assertEqual({"delivery_tag": self._delivery_tag},
|
||||
self._channel.basic_ack.call_args[1])
|
||||
|
||||
def test_message_acknowledge_no_ack(self):
|
||||
message = pika_drv_msg.PikaIncomingMessage(
|
||||
self._pika_engine, None, self._method, self._properties,
|
||||
self._body
|
||||
)
|
||||
|
||||
message.acknowledge()
|
||||
|
||||
self.assertEqual(0, self._channel.basic_ack.call_count)
|
||||
|
||||
def test_message_requeue(self):
|
||||
message = pika_drv_msg.PikaIncomingMessage(
|
||||
self._pika_engine, self._channel, self._method, self._properties,
|
||||
self._body
|
||||
)
|
||||
|
||||
message.requeue()
|
||||
|
||||
self.assertEqual(1, self._channel.basic_nack.call_count)
|
||||
self.assertEqual({"delivery_tag": self._delivery_tag, 'requeue': True},
|
||||
self._channel.basic_nack.call_args[1])
|
||||
|
||||
def test_message_requeue_no_ack(self):
|
||||
message = pika_drv_msg.PikaIncomingMessage(
|
||||
self._pika_engine, None, self._method, self._properties,
|
||||
self._body
|
||||
)
|
||||
|
||||
message.requeue()
|
||||
|
||||
self.assertEqual(0, self._channel.basic_nack.call_count)
|
||||
|
||||
|
||||
class RpcPikaIncomingMessageTestCase(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self._pika_engine = mock.Mock()
|
||||
self._pika_engine.rpc_reply_retry_attempts = 3
|
||||
self._pika_engine.rpc_reply_retry_delay = 0.25
|
||||
|
||||
self._channel = mock.Mock()
|
||||
|
||||
self._delivery_tag = 12345
|
||||
|
||||
self._method = pika.spec.Basic.Deliver(delivery_tag=self._delivery_tag)
|
||||
self._body = (
|
||||
b'{"_$_key_context":"context_value",'
|
||||
b'"payload_key":"payload_value"}'
|
||||
)
|
||||
self._properties = pika.BasicProperties(
|
||||
content_type="application/json",
|
||||
content_encoding="utf-8",
|
||||
headers={"version": "1.0"},
|
||||
)
|
||||
|
||||
def test_call_message_body_parsing(self):
|
||||
self._properties.correlation_id = 123456789
|
||||
self._properties.reply_to = "reply_queue"
|
||||
|
||||
message = pika_drv_msg.RpcPikaIncomingMessage(
|
||||
self._pika_engine, self._channel, self._method, self._properties,
|
||||
self._body
|
||||
)
|
||||
|
||||
self.assertEqual(message.ctxt.get("key_context", None),
|
||||
"context_value")
|
||||
self.assertEqual(message.msg_id, 123456789)
|
||||
self.assertEqual(message.reply_q, "reply_queue")
|
||||
|
||||
self.assertEqual(message.message.get("payload_key", None),
|
||||
"payload_value")
|
||||
|
||||
def test_cast_message_body_parsing(self):
|
||||
message = pika_drv_msg.RpcPikaIncomingMessage(
|
||||
self._pika_engine, self._channel, self._method, self._properties,
|
||||
self._body
|
||||
)
|
||||
|
||||
self.assertEqual(message.ctxt.get("key_context", None),
|
||||
"context_value")
|
||||
self.assertEqual(message.msg_id, None)
|
||||
self.assertEqual(message.reply_q, None)
|
||||
|
||||
self.assertEqual(message.message.get("payload_key", None),
|
||||
"payload_value")
|
||||
|
||||
@patch(("oslo_messaging._drivers.pika_driver.pika_message."
|
||||
"PikaOutgoingMessage.send"))
|
||||
def test_reply_for_cast_message(self, send_reply_mock):
|
||||
message = pika_drv_msg.RpcPikaIncomingMessage(
|
||||
self._pika_engine, self._channel, self._method, self._properties,
|
||||
self._body
|
||||
)
|
||||
|
||||
self.assertEqual(message.ctxt.get("key_context", None),
|
||||
"context_value")
|
||||
self.assertEqual(message.msg_id, None)
|
||||
self.assertEqual(message.reply_q, None)
|
||||
|
||||
self.assertEqual(message.message.get("payload_key", None),
|
||||
"payload_value")
|
||||
|
||||
message.reply(reply=object())
|
||||
|
||||
self.assertEqual(send_reply_mock.call_count, 0)
|
||||
|
||||
@patch("oslo_messaging._drivers.pika_driver.pika_message."
|
||||
"RpcReplyPikaOutgoingMessage")
|
||||
@patch("retrying.retry")
|
||||
def test_positive_reply_for_call_message(self,
|
||||
retry_mock,
|
||||
outgoing_message_mock):
|
||||
self._properties.correlation_id = 123456789
|
||||
self._properties.reply_to = "reply_queue"
|
||||
|
||||
message = pika_drv_msg.RpcPikaIncomingMessage(
|
||||
self._pika_engine, self._channel, self._method, self._properties,
|
||||
self._body
|
||||
)
|
||||
|
||||
self.assertEqual(message.ctxt.get("key_context", None),
|
||||
"context_value")
|
||||
self.assertEqual(message.msg_id, 123456789)
|
||||
self.assertEqual(message.reply_q, "reply_queue")
|
||||
|
||||
self.assertEqual(message.message.get("payload_key", None),
|
||||
"payload_value")
|
||||
reply = "all_fine"
|
||||
message.reply(reply=reply)
|
||||
|
||||
outgoing_message_mock.assert_called_once_with(
|
||||
self._pika_engine, 123456789, failure_info=None, reply='all_fine',
|
||||
content_encoding='utf-8', content_type='application/json'
|
||||
)
|
||||
outgoing_message_mock().send.assert_called_once_with(
|
||||
expiration_time=None, reply_q='reply_queue', retrier=mock.ANY
|
||||
)
|
||||
retry_mock.assert_called_once_with(
|
||||
retry_on_exception=mock.ANY, stop_max_attempt_number=3,
|
||||
wait_fixed=250.0
|
||||
)
|
||||
|
||||
@patch("oslo_messaging._drivers.pika_driver.pika_message."
|
||||
"RpcReplyPikaOutgoingMessage")
|
||||
@patch("retrying.retry")
|
||||
def test_negative_reply_for_call_message(self,
|
||||
retry_mock,
|
||||
outgoing_message_mock):
|
||||
self._properties.correlation_id = 123456789
|
||||
self._properties.reply_to = "reply_queue"
|
||||
|
||||
message = pika_drv_msg.RpcPikaIncomingMessage(
|
||||
self._pika_engine, self._channel, self._method, self._properties,
|
||||
self._body
|
||||
)
|
||||
|
||||
self.assertEqual(message.ctxt.get("key_context", None),
|
||||
"context_value")
|
||||
self.assertEqual(message.msg_id, 123456789)
|
||||
self.assertEqual(message.reply_q, "reply_queue")
|
||||
|
||||
self.assertEqual(message.message.get("payload_key", None),
|
||||
"payload_value")
|
||||
|
||||
failure_info = object()
|
||||
message.reply(failure=failure_info)
|
||||
|
||||
outgoing_message_mock.assert_called_once_with(
|
||||
self._pika_engine, 123456789,
|
||||
failure_info=failure_info,
|
||||
reply=None,
|
||||
content_encoding='utf-8',
|
||||
content_type='application/json'
|
||||
)
|
||||
outgoing_message_mock().send.assert_called_once_with(
|
||||
expiration_time=None, reply_q='reply_queue', retrier=mock.ANY
|
||||
)
|
||||
retry_mock.assert_called_once_with(
|
||||
retry_on_exception=mock.ANY, stop_max_attempt_number=3,
|
||||
wait_fixed=250.0
|
||||
)
|
||||
|
||||
|
||||
class RpcReplyPikaIncomingMessageTestCase(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self._pika_engine = mock.Mock()
|
||||
self._pika_engine.allowed_remote_exmods = [
|
||||
pika_engine._EXCEPTIONS_MODULE, "oslo_messaging.exceptions"
|
||||
]
|
||||
|
||||
self._channel = mock.Mock()
|
||||
|
||||
self._delivery_tag = 12345
|
||||
|
||||
self._method = pika.spec.Basic.Deliver(delivery_tag=self._delivery_tag)
|
||||
|
||||
self._properties = pika.BasicProperties(
|
||||
content_type="application/json",
|
||||
content_encoding="utf-8",
|
||||
headers={"version": "1.0"},
|
||||
correlation_id=123456789
|
||||
)
|
||||
|
||||
def test_positive_reply_message_body_parsing(self):
|
||||
|
||||
body = b'{"s": "all fine"}'
|
||||
|
||||
message = pika_drv_msg.RpcReplyPikaIncomingMessage(
|
||||
self._pika_engine, self._channel, self._method, self._properties,
|
||||
body
|
||||
)
|
||||
|
||||
self.assertEqual(message.msg_id, 123456789)
|
||||
self.assertIsNone(message.failure)
|
||||
self.assertEquals(message.result, "all fine")
|
||||
|
||||
def test_negative_reply_message_body_parsing(self):
|
||||
|
||||
body = (b'{'
|
||||
b' "e": {'
|
||||
b' "s": "Error message",'
|
||||
b' "t": ["TRACE HERE"],'
|
||||
b' "c": "MessagingException",'
|
||||
b' "m": "oslo_messaging.exceptions"'
|
||||
b' }'
|
||||
b'}')
|
||||
|
||||
message = pika_drv_msg.RpcReplyPikaIncomingMessage(
|
||||
self._pika_engine, self._channel, self._method, self._properties,
|
||||
body
|
||||
)
|
||||
|
||||
self.assertEqual(message.msg_id, 123456789)
|
||||
self.assertIsNone(message.result)
|
||||
self.assertEquals(
|
||||
str(message.failure),
|
||||
'Error message\n'
|
||||
'TRACE HERE'
|
||||
)
|
||||
self.assertIsInstance(message.failure,
|
||||
oslo_messaging.MessagingException)
|
||||
|
||||
|
||||
class PikaOutgoingMessageTestCase(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self._pika_engine = mock.MagicMock()
|
||||
self._exchange = "it is exchange"
|
||||
self._routing_key = "it is routing key"
|
||||
self._expiration = 1
|
||||
self._expiration_time = time.time() + self._expiration
|
||||
self._mandatory = object()
|
||||
|
||||
self._message = {"msg_type": 1, "msg_str": "hello"}
|
||||
self._context = {"request_id": 555, "token": "it is a token"}
|
||||
|
||||
@patch("oslo_serialization.jsonutils.dumps",
|
||||
new=functools.partial(jsonutils.dumps, sort_keys=True))
|
||||
def test_send_with_confirmation(self):
|
||||
message = pika_drv_msg.PikaOutgoingMessage(
|
||||
self._pika_engine, self._message, self._context
|
||||
)
|
||||
|
||||
message.send(
|
||||
exchange=self._exchange,
|
||||
routing_key=self._routing_key,
|
||||
confirm=True,
|
||||
mandatory=self._mandatory,
|
||||
persistent=True,
|
||||
expiration_time=self._expiration_time,
|
||||
retrier=None
|
||||
)
|
||||
|
||||
self._pika_engine.connection_with_confirmation_pool.acquire(
|
||||
).__enter__().channel.publish.assert_called_once_with(
|
||||
body=mock.ANY,
|
||||
exchange=self._exchange, mandatory=self._mandatory,
|
||||
properties=mock.ANY,
|
||||
routing_key=self._routing_key
|
||||
)
|
||||
|
||||
body = self._pika_engine.connection_with_confirmation_pool.acquire(
|
||||
).__enter__().channel.publish.call_args[1]["body"]
|
||||
|
||||
self.assertEqual(
|
||||
b'{"_$_request_id": 555, "_$_token": "it is a token", '
|
||||
b'"msg_str": "hello", "msg_type": 1}',
|
||||
body
|
||||
)
|
||||
|
||||
props = self._pika_engine.connection_with_confirmation_pool.acquire(
|
||||
).__enter__().channel.publish.call_args[1]["properties"]
|
||||
|
||||
self.assertEqual(props.content_encoding, 'utf-8')
|
||||
self.assertEqual(props.content_type, 'application/json')
|
||||
self.assertEqual(props.delivery_mode, 2)
|
||||
self.assertTrue(self._expiration * 1000 - float(props.expiration) <
|
||||
100)
|
||||
self.assertEqual(props.headers, {'version': '1.0'})
|
||||
self.assertTrue(props.message_id)
|
||||
|
||||
@patch("oslo_serialization.jsonutils.dumps",
|
||||
new=functools.partial(jsonutils.dumps, sort_keys=True))
|
||||
def test_send_without_confirmation(self):
|
||||
message = pika_drv_msg.PikaOutgoingMessage(
|
||||
self._pika_engine, self._message, self._context
|
||||
)
|
||||
|
||||
message.send(
|
||||
exchange=self._exchange,
|
||||
routing_key=self._routing_key,
|
||||
confirm=False,
|
||||
mandatory=self._mandatory,
|
||||
persistent=False,
|
||||
expiration_time=self._expiration_time,
|
||||
retrier=None
|
||||
)
|
||||
|
||||
self._pika_engine.connection_without_confirmation_pool.acquire(
|
||||
).__enter__().channel.publish.assert_called_once_with(
|
||||
body=mock.ANY,
|
||||
exchange=self._exchange, mandatory=self._mandatory,
|
||||
properties=mock.ANY,
|
||||
routing_key=self._routing_key
|
||||
)
|
||||
|
||||
body = self._pika_engine.connection_without_confirmation_pool.acquire(
|
||||
).__enter__().channel.publish.call_args[1]["body"]
|
||||
|
||||
self.assertEqual(
|
||||
b'{"_$_request_id": 555, "_$_token": "it is a token", '
|
||||
b'"msg_str": "hello", "msg_type": 1}',
|
||||
body
|
||||
)
|
||||
|
||||
props = self._pika_engine.connection_without_confirmation_pool.acquire(
|
||||
).__enter__().channel.publish.call_args[1]["properties"]
|
||||
|
||||
self.assertEqual(props.content_encoding, 'utf-8')
|
||||
self.assertEqual(props.content_type, 'application/json')
|
||||
self.assertEqual(props.delivery_mode, 1)
|
||||
self.assertTrue(self._expiration * 1000 - float(props.expiration)
|
||||
< 100)
|
||||
self.assertEqual(props.headers, {'version': '1.0'})
|
||||
self.assertTrue(props.message_id)
|
||||
|
||||
|
||||
class RpcPikaOutgoingMessageTestCase(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self._exchange = "it is exchange"
|
||||
self._routing_key = "it is routing key"
|
||||
|
||||
self._pika_engine = mock.MagicMock()
|
||||
self._pika_engine.get_rpc_exchange_name.return_value = self._exchange
|
||||
self._pika_engine.get_rpc_queue_name.return_value = self._routing_key
|
||||
|
||||
self._message = {"msg_type": 1, "msg_str": "hello"}
|
||||
self._context = {"request_id": 555, "token": "it is a token"}
|
||||
|
||||
@patch("oslo_serialization.jsonutils.dumps",
|
||||
new=functools.partial(jsonutils.dumps, sort_keys=True))
|
||||
def test_send_cast_message(self):
|
||||
message = pika_drv_msg.RpcPikaOutgoingMessage(
|
||||
self._pika_engine, self._message, self._context
|
||||
)
|
||||
|
||||
expiration = 1
|
||||
expiration_time = time.time() + expiration
|
||||
|
||||
message.send(
|
||||
target=oslo_messaging.Target(exchange=self._exchange,
|
||||
topic=self._routing_key),
|
||||
reply_listener=None,
|
||||
expiration_time=expiration_time,
|
||||
retrier=None
|
||||
)
|
||||
|
||||
self._pika_engine.connection_with_confirmation_pool.acquire(
|
||||
).__enter__().channel.publish.assert_called_once_with(
|
||||
body=mock.ANY,
|
||||
exchange=self._exchange, mandatory=True,
|
||||
properties=mock.ANY,
|
||||
routing_key=self._routing_key
|
||||
)
|
||||
|
||||
body = self._pika_engine.connection_with_confirmation_pool.acquire(
|
||||
).__enter__().channel.publish.call_args[1]["body"]
|
||||
|
||||
self.assertEqual(
|
||||
b'{"_$_request_id": 555, "_$_token": "it is a token", '
|
||||
b'"msg_str": "hello", "msg_type": 1}',
|
||||
body
|
||||
)
|
||||
|
||||
props = self._pika_engine.connection_with_confirmation_pool.acquire(
|
||||
).__enter__().channel.publish.call_args[1]["properties"]
|
||||
|
||||
self.assertEqual(props.content_encoding, 'utf-8')
|
||||
self.assertEqual(props.content_type, 'application/json')
|
||||
self.assertEqual(props.delivery_mode, 1)
|
||||
self.assertTrue(expiration * 1000 - float(props.expiration) < 100)
|
||||
self.assertEqual(props.headers, {'version': '1.0'})
|
||||
self.assertIsNone(props.correlation_id)
|
||||
self.assertIsNone(props.reply_to)
|
||||
self.assertTrue(props.message_id)
|
||||
|
||||
@patch("oslo_serialization.jsonutils.dumps",
|
||||
new=functools.partial(jsonutils.dumps, sort_keys=True))
|
||||
def test_send_call_message(self):
|
||||
message = pika_drv_msg.RpcPikaOutgoingMessage(
|
||||
self._pika_engine, self._message, self._context
|
||||
)
|
||||
|
||||
expiration = 1
|
||||
expiration_time = time.time() + expiration
|
||||
|
||||
result = "it is a result"
|
||||
reply_queue_name = "reply_queue_name"
|
||||
|
||||
future = futures.Future()
|
||||
future.set_result(result)
|
||||
reply_listener = mock.Mock()
|
||||
reply_listener.register_reply_waiter.return_value = future
|
||||
reply_listener.get_reply_qname.return_value = reply_queue_name
|
||||
|
||||
res = message.send(
|
||||
target=oslo_messaging.Target(exchange=self._exchange,
|
||||
topic=self._routing_key),
|
||||
reply_listener=reply_listener,
|
||||
expiration_time=expiration_time,
|
||||
retrier=None
|
||||
)
|
||||
|
||||
self.assertEqual(result, res)
|
||||
|
||||
self._pika_engine.connection_with_confirmation_pool.acquire(
|
||||
).__enter__().channel.publish.assert_called_once_with(
|
||||
body=mock.ANY,
|
||||
exchange=self._exchange, mandatory=True,
|
||||
properties=mock.ANY,
|
||||
routing_key=self._routing_key
|
||||
)
|
||||
|
||||
body = self._pika_engine.connection_with_confirmation_pool.acquire(
|
||||
).__enter__().channel.publish.call_args[1]["body"]
|
||||
|
||||
self.assertEqual(
|
||||
b'{"_$_request_id": 555, "_$_token": "it is a token", '
|
||||
b'"msg_str": "hello", "msg_type": 1}',
|
||||
body
|
||||
)
|
||||
|
||||
props = self._pika_engine.connection_with_confirmation_pool.acquire(
|
||||
).__enter__().channel.publish.call_args[1]["properties"]
|
||||
|
||||
self.assertEqual(props.content_encoding, 'utf-8')
|
||||
self.assertEqual(props.content_type, 'application/json')
|
||||
self.assertEqual(props.delivery_mode, 1)
|
||||
self.assertTrue(expiration * 1000 - float(props.expiration) < 100)
|
||||
self.assertEqual(props.headers, {'version': '1.0'})
|
||||
self.assertEqual(props.correlation_id, message.msg_id)
|
||||
self.assertEquals(props.reply_to, reply_queue_name)
|
||||
self.assertTrue(props.message_id)
|
||||
|
||||
|
||||
class RpcReplyPikaOutgoingMessageTestCase(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self._reply_q = "reply_queue_name"
|
||||
|
||||
self._expiration = 1
|
||||
self._expiration_time = time.time() + self._expiration
|
||||
|
||||
self._pika_engine = mock.MagicMock()
|
||||
|
||||
self._rpc_reply_exchange = "rpc_reply_exchange"
|
||||
self._pika_engine.rpc_reply_exchange = self._rpc_reply_exchange
|
||||
|
||||
self._msg_id = 12345567
|
||||
|
||||
@patch("oslo_serialization.jsonutils.dumps",
|
||||
new=functools.partial(jsonutils.dumps, sort_keys=True))
|
||||
def test_success_message_send(self):
|
||||
message = pika_drv_msg.RpcReplyPikaOutgoingMessage(
|
||||
self._pika_engine, self._msg_id, reply="all_fine"
|
||||
)
|
||||
|
||||
message.send(self._reply_q, expiration_time=self._expiration_time,
|
||||
retrier=None)
|
||||
|
||||
self._pika_engine.connection_with_confirmation_pool.acquire(
|
||||
).__enter__().channel.publish.assert_called_once_with(
|
||||
body=b'{"s": "all_fine"}',
|
||||
exchange=self._rpc_reply_exchange, mandatory=True,
|
||||
properties=mock.ANY,
|
||||
routing_key=self._reply_q
|
||||
)
|
||||
|
||||
props = self._pika_engine.connection_with_confirmation_pool.acquire(
|
||||
).__enter__().channel.publish.call_args[1]["properties"]
|
||||
|
||||
self.assertEqual(props.content_encoding, 'utf-8')
|
||||
self.assertEqual(props.content_type, 'application/json')
|
||||
self.assertEqual(props.delivery_mode, 1)
|
||||
self.assertTrue(self._expiration * 1000 - float(props.expiration) <
|
||||
100)
|
||||
self.assertEqual(props.headers, {'version': '1.0'})
|
||||
self.assertEqual(props.correlation_id, message.msg_id)
|
||||
self.assertIsNone(props.reply_to)
|
||||
self.assertTrue(props.message_id)
|
||||
|
||||
@patch("traceback.format_exception", new=lambda x,y,z:z)
|
||||
@patch("oslo_serialization.jsonutils.dumps",
|
||||
new=functools.partial(jsonutils.dumps, sort_keys=True))
|
||||
def test_failure_message_send(self):
|
||||
failure_info = (oslo_messaging.MessagingException,
|
||||
oslo_messaging.MessagingException("Error message"),
|
||||
['It is a trace'])
|
||||
|
||||
|
||||
message = pika_drv_msg.RpcReplyPikaOutgoingMessage(
|
||||
self._pika_engine, self._msg_id, failure_info=failure_info
|
||||
)
|
||||
|
||||
message.send(self._reply_q, expiration_time=self._expiration_time,
|
||||
retrier=None)
|
||||
|
||||
self._pika_engine.connection_with_confirmation_pool.acquire(
|
||||
).__enter__().channel.publish.assert_called_once_with(
|
||||
body=mock.ANY,
|
||||
exchange=self._rpc_reply_exchange,
|
||||
mandatory=True,
|
||||
properties=mock.ANY,
|
||||
routing_key=self._reply_q
|
||||
)
|
||||
|
||||
body = self._pika_engine.connection_with_confirmation_pool.acquire(
|
||||
).__enter__().channel.publish.call_args[1]["body"]
|
||||
self.assertEqual(
|
||||
b'{"e": {"c": "MessagingException", '
|
||||
b'"m": "oslo_messaging.exceptions", "s": "Error message", '
|
||||
b'"t": ["It is a trace"]}}',
|
||||
body
|
||||
)
|
||||
|
||||
props = self._pika_engine.connection_with_confirmation_pool.acquire(
|
||||
).__enter__().channel.publish.call_args[1]["properties"]
|
||||
|
||||
self.assertEqual(props.content_encoding, 'utf-8')
|
||||
self.assertEqual(props.content_type, 'application/json')
|
||||
self.assertEqual(props.delivery_mode, 1)
|
||||
self.assertTrue(self._expiration * 1000 - float(props.expiration) <
|
||||
100)
|
||||
self.assertEqual(props.headers, {'version': '1.0'})
|
||||
self.assertEqual(props.correlation_id, message.msg_id)
|
||||
self.assertIsNone(props.reply_to)
|
||||
self.assertTrue(props.message_id)
|
536
oslo_messaging/tests/drivers/pika/test_poller.py
Normal file
536
oslo_messaging/tests/drivers/pika/test_poller.py
Normal file
@ -0,0 +1,536 @@
|
||||
# Copyright 2015 Mirantis, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import time
|
||||
import unittest
|
||||
|
||||
import mock
|
||||
|
||||
from oslo_messaging._drivers.pika_driver import pika_poller
|
||||
|
||||
|
||||
class PikaPollerTestCase(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self._pika_engine = mock.Mock()
|
||||
self._poller_connection_mock = mock.Mock()
|
||||
self._poller_channel_mock = mock.Mock()
|
||||
self._poller_connection_mock.channel.return_value = (
|
||||
self._poller_channel_mock
|
||||
)
|
||||
self._pika_engine.create_connection.return_value = (
|
||||
self._poller_connection_mock
|
||||
)
|
||||
self._prefetch_count = 123
|
||||
|
||||
@mock.patch("oslo_messaging._drivers.pika_driver.pika_poller.PikaPoller."
|
||||
"_declare_queue_binding")
|
||||
def test_poll(self, declare_queue_binding_mock):
|
||||
incoming_message_class_mock = mock.Mock()
|
||||
poller = pika_poller.PikaPoller(
|
||||
self._pika_engine, self._prefetch_count,
|
||||
incoming_message_class=incoming_message_class_mock
|
||||
)
|
||||
unused = object()
|
||||
method = object()
|
||||
properties = object()
|
||||
body = object()
|
||||
|
||||
self._poller_connection_mock.process_data_events.side_effect = (
|
||||
lambda time_limit: poller._on_message_with_ack_callback(
|
||||
unused, method, properties, body
|
||||
)
|
||||
)
|
||||
|
||||
poller.start()
|
||||
res = poller.poll()
|
||||
|
||||
self.assertEqual(len(res), 1)
|
||||
|
||||
self.assertEqual(res[0], incoming_message_class_mock.return_value)
|
||||
incoming_message_class_mock.assert_called_once_with(
|
||||
self._pika_engine, self._poller_channel_mock, method, properties,
|
||||
body
|
||||
)
|
||||
|
||||
self.assertTrue(self._pika_engine.create_connection.called)
|
||||
self.assertTrue(self._poller_connection_mock.channel.called)
|
||||
|
||||
self.assertTrue(declare_queue_binding_mock.called)
|
||||
|
||||
@mock.patch("oslo_messaging._drivers.pika_driver.pika_poller.PikaPoller."
|
||||
"_declare_queue_binding")
|
||||
def test_poll_after_stop(self, declare_queue_binding_mock):
|
||||
incoming_message_class_mock = mock.Mock()
|
||||
poller = pika_poller.PikaPoller(
|
||||
self._pika_engine, self._prefetch_count,
|
||||
incoming_message_class=incoming_message_class_mock
|
||||
)
|
||||
|
||||
n = 10
|
||||
params = []
|
||||
|
||||
for i in range(n):
|
||||
params.append((object(), object(), object(), object()))
|
||||
|
||||
index = [0]
|
||||
|
||||
def f(time_limit):
|
||||
for i in range(10):
|
||||
poller._on_message_no_ack_callback(
|
||||
*params[index[0]]
|
||||
)
|
||||
index[0] += 1
|
||||
|
||||
self._poller_connection_mock.process_data_events.side_effect = f
|
||||
|
||||
poller.start()
|
||||
res = poller.poll(prefetch_size=1)
|
||||
self.assertEqual(len(res), 1)
|
||||
self.assertEqual(res[0], incoming_message_class_mock.return_value)
|
||||
self.assertEqual(
|
||||
incoming_message_class_mock.call_args_list[0][0],
|
||||
(self._pika_engine, None) + params[0][1:]
|
||||
)
|
||||
|
||||
poller.stop()
|
||||
|
||||
res2 = poller.poll(prefetch_size=n)
|
||||
|
||||
self.assertEqual(len(res2), n-1)
|
||||
self.assertEqual(incoming_message_class_mock.call_count, n)
|
||||
|
||||
self.assertEqual(
|
||||
self._poller_connection_mock.process_data_events.call_count, 1)
|
||||
|
||||
for i in range(n-1):
|
||||
self.assertEqual(res2[i], incoming_message_class_mock.return_value)
|
||||
self.assertEqual(
|
||||
incoming_message_class_mock.call_args_list[i+1][0],
|
||||
(self._pika_engine, None) + params[i+1][1:]
|
||||
)
|
||||
|
||||
self.assertTrue(self._pika_engine.create_connection.called)
|
||||
self.assertTrue(self._poller_connection_mock.channel.called)
|
||||
|
||||
self.assertTrue(declare_queue_binding_mock.called)
|
||||
|
||||
@mock.patch("oslo_messaging._drivers.pika_driver.pika_poller.PikaPoller."
|
||||
"_declare_queue_binding")
|
||||
def test_poll_batch(self, declare_queue_binding_mock):
|
||||
incoming_message_class_mock = mock.Mock()
|
||||
poller = pika_poller.PikaPoller(
|
||||
self._pika_engine, self._prefetch_count,
|
||||
incoming_message_class=incoming_message_class_mock
|
||||
)
|
||||
|
||||
n = 10
|
||||
params = []
|
||||
|
||||
for i in range(n):
|
||||
params.append((object(), object(), object(), object()))
|
||||
|
||||
index = [0]
|
||||
|
||||
def f(time_limit):
|
||||
poller._on_message_with_ack_callback(
|
||||
*params[index[0]]
|
||||
)
|
||||
index[0] += 1
|
||||
|
||||
self._poller_connection_mock.process_data_events.side_effect = f
|
||||
|
||||
poller.start()
|
||||
res = poller.poll(prefetch_size=n)
|
||||
|
||||
self.assertEqual(len(res), n)
|
||||
self.assertEqual(incoming_message_class_mock.call_count, n)
|
||||
|
||||
for i in range(n):
|
||||
self.assertEqual(res[i], incoming_message_class_mock.return_value)
|
||||
self.assertEqual(
|
||||
incoming_message_class_mock.call_args_list[i][0],
|
||||
(self._pika_engine, self._poller_channel_mock) + params[i][1:]
|
||||
)
|
||||
|
||||
self.assertTrue(self._pika_engine.create_connection.called)
|
||||
self.assertTrue(self._poller_connection_mock.channel.called)
|
||||
|
||||
self.assertTrue(declare_queue_binding_mock.called)
|
||||
|
||||
@mock.patch("oslo_messaging._drivers.pika_driver.pika_poller.PikaPoller."
|
||||
"_declare_queue_binding")
|
||||
def test_poll_batch_with_timeout(self, declare_queue_binding_mock):
|
||||
incoming_message_class_mock = mock.Mock()
|
||||
poller = pika_poller.PikaPoller(
|
||||
self._pika_engine, self._prefetch_count,
|
||||
incoming_message_class=incoming_message_class_mock
|
||||
)
|
||||
|
||||
n = 10
|
||||
timeout = 1
|
||||
sleep_time = 0.2
|
||||
params = []
|
||||
|
||||
success_count = 5
|
||||
|
||||
for i in range(n):
|
||||
params.append((object(), object(), object(), object()))
|
||||
|
||||
index = [0]
|
||||
|
||||
def f(time_limit):
|
||||
time.sleep(sleep_time)
|
||||
poller._on_message_with_ack_callback(
|
||||
*params[index[0]]
|
||||
)
|
||||
index[0] += 1
|
||||
|
||||
self._poller_connection_mock.process_data_events.side_effect = f
|
||||
|
||||
poller.start()
|
||||
res = poller.poll(prefetch_size=n, timeout=timeout)
|
||||
|
||||
self.assertEqual(len(res), success_count)
|
||||
self.assertEqual(incoming_message_class_mock.call_count, success_count)
|
||||
|
||||
for i in range(success_count):
|
||||
self.assertEqual(res[i], incoming_message_class_mock.return_value)
|
||||
self.assertEqual(
|
||||
incoming_message_class_mock.call_args_list[i][0],
|
||||
(self._pika_engine, self._poller_channel_mock) + params[i][1:]
|
||||
)
|
||||
|
||||
self.assertTrue(self._pika_engine.create_connection.called)
|
||||
self.assertTrue(self._poller_connection_mock.channel.called)
|
||||
|
||||
self.assertTrue(declare_queue_binding_mock.called)
|
||||
|
||||
|
||||
class RpcServicePikaPollerTestCase(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self._pika_engine = mock.Mock()
|
||||
self._poller_connection_mock = mock.Mock()
|
||||
self._poller_channel_mock = mock.Mock()
|
||||
self._poller_connection_mock.channel.return_value = (
|
||||
self._poller_channel_mock
|
||||
)
|
||||
self._pika_engine.create_connection.return_value = (
|
||||
self._poller_connection_mock
|
||||
)
|
||||
|
||||
self._pika_engine.get_rpc_queue_name.side_effect = (
|
||||
lambda topic, server, no_ack: "_".join(
|
||||
[topic, str(server), str(no_ack)]
|
||||
)
|
||||
)
|
||||
|
||||
self._pika_engine.get_rpc_exchange_name.side_effect = (
|
||||
lambda exchange, topic, fanout, no_ack: "_".join(
|
||||
[exchange, topic, str(fanout), str(no_ack)]
|
||||
)
|
||||
)
|
||||
|
||||
self._prefetch_count = 123
|
||||
self._target = mock.Mock(exchange="exchange", topic="topic",
|
||||
server="server")
|
||||
self._pika_engine.rpc_queue_expiration = 12345
|
||||
|
||||
@mock.patch("oslo_messaging._drivers.pika_driver.pika_message."
|
||||
"RpcPikaIncomingMessage")
|
||||
def test_declare_rpc_queue_bindings(self, rpc_pika_incoming_message_mock):
|
||||
poller = pika_poller.RpcServicePikaPoller(
|
||||
self._pika_engine, self._target, self._prefetch_count,
|
||||
)
|
||||
self._poller_connection_mock.process_data_events.side_effect = (
|
||||
lambda time_limit: poller._on_message_with_ack_callback(
|
||||
None, None, None, None
|
||||
)
|
||||
)
|
||||
|
||||
poller.start()
|
||||
res = poller.poll()
|
||||
|
||||
self.assertEqual(len(res), 1)
|
||||
|
||||
self.assertEqual(res[0], rpc_pika_incoming_message_mock.return_value)
|
||||
|
||||
self.assertTrue(self._pika_engine.create_connection.called)
|
||||
self.assertTrue(self._poller_connection_mock.channel.called)
|
||||
|
||||
declare_queue_binding_by_channel_mock = (
|
||||
self._pika_engine.declare_queue_binding_by_channel
|
||||
)
|
||||
|
||||
self.assertEqual(
|
||||
declare_queue_binding_by_channel_mock.call_count, 6
|
||||
)
|
||||
|
||||
declare_queue_binding_by_channel_mock.assert_has_calls((
|
||||
mock.call(
|
||||
channel=self._poller_channel_mock, durable=False,
|
||||
exchange="exchange_topic_False_True",
|
||||
exchange_type='direct',
|
||||
queue="topic_None_True",
|
||||
queue_expiration=12345,
|
||||
routing_key="topic_None_True"
|
||||
),
|
||||
mock.call(
|
||||
channel=self._poller_channel_mock, durable=False,
|
||||
exchange="exchange_topic_False_True",
|
||||
exchange_type='direct',
|
||||
queue="topic_server_True",
|
||||
queue_expiration=12345,
|
||||
routing_key="topic_server_True"
|
||||
),
|
||||
mock.call(
|
||||
channel=self._poller_channel_mock, durable=False,
|
||||
exchange="exchange_topic_True_True",
|
||||
exchange_type='fanout',
|
||||
queue="topic_server_True",
|
||||
queue_expiration=12345,
|
||||
routing_key=''
|
||||
),
|
||||
mock.call(
|
||||
channel=self._poller_channel_mock, durable=False,
|
||||
exchange="exchange_topic_False_False",
|
||||
exchange_type='direct',
|
||||
queue="topic_None_False",
|
||||
queue_expiration=12345,
|
||||
routing_key="topic_None_False"
|
||||
),
|
||||
mock.call(
|
||||
channel=self._poller_channel_mock, durable=False,
|
||||
exchange="exchange_topic_False_False",
|
||||
exchange_type='direct',
|
||||
queue="topic_server_False",
|
||||
queue_expiration=12345,
|
||||
routing_key="topic_server_False"
|
||||
),
|
||||
mock.call(
|
||||
channel=self._poller_channel_mock, durable=False,
|
||||
exchange="exchange_topic_True_False",
|
||||
exchange_type='fanout',
|
||||
queue="topic_server_False",
|
||||
queue_expiration=12345,
|
||||
routing_key=''
|
||||
),
|
||||
))
|
||||
|
||||
|
||||
class RpcReplyServicePikaPollerTestCase(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self._pika_engine = mock.Mock()
|
||||
self._poller_connection_mock = mock.Mock()
|
||||
self._poller_channel_mock = mock.Mock()
|
||||
self._poller_connection_mock.channel.return_value = (
|
||||
self._poller_channel_mock
|
||||
)
|
||||
self._pika_engine.create_connection.return_value = (
|
||||
self._poller_connection_mock
|
||||
)
|
||||
|
||||
self._prefetch_count = 123
|
||||
self._exchange = "rpc_reply_exchange"
|
||||
self._queue = "rpc_reply_queue"
|
||||
|
||||
self._pika_engine.rpc_reply_retry_delay = 12132543456
|
||||
|
||||
self._pika_engine.rpc_queue_expiration = 12345
|
||||
self._pika_engine.rpc_reply_retry_attempts = 3
|
||||
|
||||
def test_start(self):
|
||||
poller = pika_poller.RpcReplyPikaPoller(
|
||||
self._pika_engine, self._exchange, self._queue,
|
||||
self._prefetch_count,
|
||||
)
|
||||
|
||||
poller.start()
|
||||
|
||||
self.assertTrue(self._pika_engine.create_connection.called)
|
||||
self.assertTrue(self._poller_connection_mock.channel.called)
|
||||
|
||||
def test_declare_rpc_reply_queue_binding(self):
|
||||
poller = pika_poller.RpcReplyPikaPoller(
|
||||
self._pika_engine, self._exchange, self._queue,
|
||||
self._prefetch_count,
|
||||
)
|
||||
|
||||
poller.start()
|
||||
|
||||
declare_queue_binding_by_channel_mock = (
|
||||
self._pika_engine.declare_queue_binding_by_channel
|
||||
)
|
||||
|
||||
self.assertEqual(
|
||||
declare_queue_binding_by_channel_mock.call_count, 1
|
||||
)
|
||||
|
||||
declare_queue_binding_by_channel_mock.assert_called_once_with(
|
||||
channel=self._poller_channel_mock, durable=False,
|
||||
exchange='rpc_reply_exchange', exchange_type='direct',
|
||||
queue='rpc_reply_queue', queue_expiration=12345,
|
||||
routing_key='rpc_reply_queue'
|
||||
)
|
||||
|
||||
|
||||
class NotificationPikaPollerTestCase(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self._pika_engine = mock.Mock()
|
||||
self._poller_connection_mock = mock.Mock()
|
||||
self._poller_channel_mock = mock.Mock()
|
||||
self._poller_connection_mock.channel.return_value = (
|
||||
self._poller_channel_mock
|
||||
)
|
||||
self._pika_engine.create_connection.return_value = (
|
||||
self._poller_connection_mock
|
||||
)
|
||||
|
||||
self._prefetch_count = 123
|
||||
self._target_and_priorities = (
|
||||
(
|
||||
mock.Mock(exchange="exchange1", topic="topic1",
|
||||
server="server1"), 1
|
||||
),
|
||||
(
|
||||
mock.Mock(exchange="exchange1", topic="topic1"), 2
|
||||
),
|
||||
(
|
||||
mock.Mock(exchange="exchange2", topic="topic2",), 1
|
||||
),
|
||||
)
|
||||
self._pika_engine.notification_persistence = object()
|
||||
|
||||
@mock.patch("oslo_messaging._drivers.pika_driver.pika_message."
|
||||
"PikaIncomingMessage")
|
||||
def test_declare_notification_queue_bindings_default_queue(
|
||||
self, pika_incoming_message_mock):
|
||||
poller = pika_poller.NotificationPikaPoller(
|
||||
self._pika_engine, self._target_and_priorities, None,
|
||||
self._prefetch_count,
|
||||
)
|
||||
self._poller_connection_mock.process_data_events.side_effect = (
|
||||
lambda time_limit: poller._on_message_with_ack_callback(
|
||||
None, None, None, None
|
||||
)
|
||||
)
|
||||
|
||||
poller.start()
|
||||
res = poller.poll()
|
||||
|
||||
self.assertEqual(len(res), 1)
|
||||
|
||||
self.assertEqual(res[0], pika_incoming_message_mock.return_value)
|
||||
|
||||
self.assertTrue(self._pika_engine.create_connection.called)
|
||||
self.assertTrue(self._poller_connection_mock.channel.called)
|
||||
|
||||
declare_queue_binding_by_channel_mock = (
|
||||
self._pika_engine.declare_queue_binding_by_channel
|
||||
)
|
||||
|
||||
self.assertEqual(
|
||||
declare_queue_binding_by_channel_mock.call_count, 3
|
||||
)
|
||||
|
||||
declare_queue_binding_by_channel_mock.assert_has_calls((
|
||||
mock.call(
|
||||
channel=self._poller_channel_mock,
|
||||
durable=self._pika_engine.notification_persistence,
|
||||
exchange="exchange1",
|
||||
exchange_type='direct',
|
||||
queue="topic1.1",
|
||||
queue_expiration=None,
|
||||
routing_key="topic1.1"
|
||||
),
|
||||
mock.call(
|
||||
channel=self._poller_channel_mock,
|
||||
durable=self._pika_engine.notification_persistence,
|
||||
exchange="exchange1",
|
||||
exchange_type='direct',
|
||||
queue="topic1.2",
|
||||
queue_expiration=None,
|
||||
routing_key="topic1.2"
|
||||
),
|
||||
mock.call(
|
||||
channel=self._poller_channel_mock,
|
||||
durable=self._pika_engine.notification_persistence,
|
||||
exchange="exchange2",
|
||||
exchange_type='direct',
|
||||
queue="topic2.1",
|
||||
queue_expiration=None,
|
||||
routing_key="topic2.1"
|
||||
)
|
||||
))
|
||||
|
||||
@mock.patch("oslo_messaging._drivers.pika_driver.pika_message."
|
||||
"PikaIncomingMessage")
|
||||
def test_declare_notification_queue_bindings_custom_queue(
|
||||
self, pika_incoming_message_mock):
|
||||
poller = pika_poller.NotificationPikaPoller(
|
||||
self._pika_engine, self._target_and_priorities,
|
||||
"custom_queue_name", self._prefetch_count
|
||||
)
|
||||
self._poller_connection_mock.process_data_events.side_effect = (
|
||||
lambda time_limit: poller._on_message_with_ack_callback(
|
||||
None, None, None, None
|
||||
)
|
||||
)
|
||||
|
||||
poller.start()
|
||||
res = poller.poll()
|
||||
|
||||
self.assertEqual(len(res), 1)
|
||||
|
||||
self.assertEqual(res[0], pika_incoming_message_mock.return_value)
|
||||
|
||||
self.assertTrue(self._pika_engine.create_connection.called)
|
||||
self.assertTrue(self._poller_connection_mock.channel.called)
|
||||
|
||||
declare_queue_binding_by_channel_mock = (
|
||||
self._pika_engine.declare_queue_binding_by_channel
|
||||
)
|
||||
|
||||
self.assertEqual(
|
||||
declare_queue_binding_by_channel_mock.call_count, 3
|
||||
)
|
||||
|
||||
declare_queue_binding_by_channel_mock.assert_has_calls((
|
||||
mock.call(
|
||||
channel=self._poller_channel_mock,
|
||||
durable=self._pika_engine.notification_persistence,
|
||||
exchange="exchange1",
|
||||
exchange_type='direct',
|
||||
queue="custom_queue_name",
|
||||
queue_expiration=None,
|
||||
routing_key="topic1.1"
|
||||
),
|
||||
mock.call(
|
||||
channel=self._poller_channel_mock,
|
||||
durable=self._pika_engine.notification_persistence,
|
||||
exchange="exchange1",
|
||||
exchange_type='direct',
|
||||
queue="custom_queue_name",
|
||||
queue_expiration=None,
|
||||
routing_key="topic1.2"
|
||||
),
|
||||
mock.call(
|
||||
channel=self._poller_channel_mock,
|
||||
durable=self._pika_engine.notification_persistence,
|
||||
exchange="exchange2",
|
||||
exchange_type='direct',
|
||||
queue="custom_queue_name",
|
||||
queue_expiration=None,
|
||||
routing_key="topic2.1"
|
||||
)
|
||||
))
|
@ -35,6 +35,8 @@ PyYAML>=3.1.0
|
||||
# we set the amqp version to ensure heartbeat works
|
||||
amqp>=1.4.0
|
||||
kombu>=3.0.7
|
||||
pika>=0.10.0
|
||||
pika-pool>=0.1.3
|
||||
|
||||
# middleware
|
||||
oslo.middleware>=3.0.0 # Apache-2.0
|
||||
|
32
setup-test-env-pika.sh
Executable file
32
setup-test-env-pika.sh
Executable file
@ -0,0 +1,32 @@
|
||||
#!/bin/bash
|
||||
set -e
|
||||
|
||||
. tools/functions.sh
|
||||
|
||||
DATADIR=$(mktemp -d /tmp/OSLOMSG-RABBIT.XXXXX)
|
||||
trap "clean_exit $DATADIR" EXIT
|
||||
|
||||
export RABBITMQ_NODE_IP_ADDRESS=127.0.0.1
|
||||
export RABBITMQ_NODE_PORT=65123
|
||||
export RABBITMQ_NODENAME=oslomsg-test@localhost
|
||||
export RABBITMQ_LOG_BASE=$DATADIR
|
||||
export RABBITMQ_MNESIA_BASE=$DATADIR
|
||||
export RABBITMQ_PID_FILE=$DATADIR/pid
|
||||
export HOME=$DATADIR
|
||||
|
||||
# NOTE(sileht): We directly use the rabbitmq scripts
|
||||
# to avoid distribution check, like running as root/rabbitmq
|
||||
# enforcing.
|
||||
export PATH=/usr/lib/rabbitmq/bin/:$PATH
|
||||
|
||||
|
||||
mkfifo ${DATADIR}/out
|
||||
rabbitmq-server &> ${DATADIR}/out &
|
||||
wait_for_line "Starting broker... completed" "ERROR:" ${DATADIR}/out
|
||||
|
||||
rabbitmqctl add_user oslomsg oslosecret
|
||||
rabbitmqctl set_permissions "oslomsg" ".*" ".*" ".*"
|
||||
|
||||
|
||||
export TRANSPORT_URL=pika://oslomsg:oslosecret@127.0.0.1:65123//
|
||||
$*
|
@ -37,6 +37,7 @@ oslo.messaging.drivers =
|
||||
|
||||
# This is just for internal testing
|
||||
fake = oslo_messaging._drivers.impl_fake:FakeDriver
|
||||
pika = oslo_messaging._drivers.impl_pika:PikaDriver
|
||||
|
||||
oslo.messaging.executors =
|
||||
aioeventlet = oslo_messaging._executors.impl_aioeventlet:AsyncioEventletExecutor
|
||||
|
3
tox.ini
3
tox.ini
@ -27,6 +27,9 @@ commands = python setup.py build_sphinx
|
||||
[testenv:py27-func-rabbit]
|
||||
commands = {toxinidir}/setup-test-env-rabbit.sh python setup.py testr --slowest --testr-args='oslo_messaging.tests.functional'
|
||||
|
||||
[testenv:py27-func-pika]
|
||||
commands = {toxinidir}/setup-test-env-pika.sh python setup.py testr --slowest --testr-args='oslo_messaging.tests.functional'
|
||||
|
||||
[testenv:py27-func-amqp1]
|
||||
setenv = TRANSPORT_URL=amqp://stackqpid:secretqpid@127.0.0.1:65123//
|
||||
# NOTE(flaper87): This gate job run on fedora21 for now.
|
||||
|
Loading…
Reference in New Issue
Block a user