Adds comment, updates pika-pool version
Change-Id: I3d9702bfdde89279258dbf0af027fa2a4a044edd
This commit is contained in:
@@ -42,7 +42,20 @@ _VERSION = "1.0"
|
||||
|
||||
|
||||
class RemoteExceptionMixin(object):
|
||||
"""Used for constructing dynamic exception type during deserialization of
|
||||
remote exception. It defines unified '__init__' method signature and
|
||||
exception message format
|
||||
"""
|
||||
def __init__(self, module, clazz, message, trace):
|
||||
"""Store serialized data
|
||||
:param module: String, module name for importing original exception
|
||||
class of serialized remote exception
|
||||
:param clazz: String, original class name of serialized remote
|
||||
exception
|
||||
:param message: String, original message of serialized remote
|
||||
exception
|
||||
:param trace: String, original trace of serialized remote exception
|
||||
"""
|
||||
self.module = module
|
||||
self.clazz = clazz
|
||||
self.message = message
|
||||
@@ -55,8 +68,23 @@ class RemoteExceptionMixin(object):
|
||||
|
||||
|
||||
class PikaIncomingMessage(object):
|
||||
"""Driver friendly adapter for received message. Extract message
|
||||
information from RabbitMQ message and provide access to it
|
||||
"""
|
||||
|
||||
def __init__(self, pika_engine, channel, method, properties, body, no_ack):
|
||||
"""Parse RabbitMQ message
|
||||
|
||||
:param pika_engine: PikaEngine, shared object with configuration and
|
||||
shared driver functionality
|
||||
:param channel: Channel, RabbitMQ channel which was used for
|
||||
this message delivery
|
||||
:param method: Method, RabbitMQ message method
|
||||
:param properties: Properties, RabbitMQ message properties
|
||||
:param body: Bytes, RabbitMQ message body
|
||||
:param no_ack: Boolean, defines should this message be acked by
|
||||
consumer or not
|
||||
"""
|
||||
headers = getattr(properties, "headers", {})
|
||||
version = headers.get(_VERSION_HEADER, None)
|
||||
if not utils.version_is_compatible(version, _VERSION):
|
||||
@@ -105,17 +133,43 @@ class PikaIncomingMessage(object):
|
||||
self.ctxt = context_dict
|
||||
|
||||
def acknowledge(self):
|
||||
"""Ack the message. Should be called by message processing logic when
|
||||
it considered as consumed (means that we don't need redelivery of this
|
||||
message anymore)
|
||||
"""
|
||||
if not self._no_ack:
|
||||
self._channel.basic_ack(delivery_tag=self.delivery_tag)
|
||||
|
||||
def requeue(self):
|
||||
"""Rollback the message. Should be called by message processing logic
|
||||
when it can not process the message right now and should be redelivered
|
||||
later if it is possible
|
||||
"""
|
||||
if not self._no_ack:
|
||||
return self._channel.basic_nack(delivery_tag=self.delivery_tag,
|
||||
requeue=True)
|
||||
|
||||
|
||||
class RpcPikaIncomingMessage(PikaIncomingMessage):
|
||||
"""PikaIncomingMessage implementation for RPC messages. It expects
|
||||
extra RPC related fields in message body (msg_id and reply_q). Also 'reply'
|
||||
method added to allow consumer to send RPC reply back to the RPC client
|
||||
"""
|
||||
|
||||
def __init__(self, pika_engine, channel, method, properties, body, no_ack):
|
||||
"""Defines default values of msg_id and reply_q fields and just call
|
||||
super.__init__ method
|
||||
|
||||
:param pika_engine: PikaEngine, shared object with configuration and
|
||||
shared driver functionality
|
||||
:param channel: Channel, RabbitMQ channel which was used for
|
||||
this message delivery
|
||||
:param method: Method, RabbitMQ message method
|
||||
:param properties: Properties, RabbitMQ message properties
|
||||
:param body: Bytes, RabbitMQ message body
|
||||
:param no_ack: Boolean, defines should this message be acked by
|
||||
consumer or not
|
||||
"""
|
||||
self.msg_id = None
|
||||
self.reply_q = None
|
||||
|
||||
@@ -124,6 +178,13 @@ class RpcPikaIncomingMessage(PikaIncomingMessage):
|
||||
)
|
||||
|
||||
def reply(self, reply=None, failure=None, log_failure=True):
|
||||
"""Send back reply to the RPC client
|
||||
:param reply - Dictionary, reply. In case of exception should be None
|
||||
:param failure - Exception, exception, raised during processing RPC
|
||||
request. Should be None if RPC request was successfully processed
|
||||
:param log_failure, Boolean, not used in this implementation.
|
||||
It present here to be compatible with driver API
|
||||
"""
|
||||
if not (self.msg_id and self.reply_q):
|
||||
return
|
||||
|
||||
@@ -203,7 +264,24 @@ class RpcPikaIncomingMessage(PikaIncomingMessage):
|
||||
|
||||
|
||||
class RpcReplyPikaIncomingMessage(PikaIncomingMessage):
|
||||
"""PikaIncomingMessage implementation for RPC reply messages. It expects
|
||||
extra RPC reply related fields in message body (result and failure).
|
||||
"""
|
||||
def __init__(self, pika_engine, channel, method, properties, body, no_ack):
|
||||
"""Defines default values of result and failure fields, call
|
||||
super.__init__ method and then construct Exception object if failure is
|
||||
not None
|
||||
|
||||
:param pika_engine: PikaEngine, shared object with configuration and
|
||||
shared driver functionality
|
||||
:param channel: Channel, RabbitMQ channel which was used for
|
||||
this message delivery
|
||||
:param method: Method, RabbitMQ message method
|
||||
:param properties: Properties, RabbitMQ message properties
|
||||
:param body: Bytes, RabbitMQ message body
|
||||
:param no_ack: Boolean, defines should this message be acked by
|
||||
consumer or not
|
||||
"""
|
||||
self.result = None
|
||||
self.failure = None
|
||||
|
||||
@@ -246,8 +324,23 @@ class RpcReplyPikaIncomingMessage(PikaIncomingMessage):
|
||||
|
||||
|
||||
class PikaOutgoingMessage(object):
|
||||
"""Driver friendly adapter for sending message. Construct RabbitMQ message
|
||||
and send it
|
||||
"""
|
||||
|
||||
def __init__(self, pika_engine, message, context,
|
||||
content_type="application/json", content_encoding="utf-8"):
|
||||
"""Parse RabbitMQ message
|
||||
|
||||
:param pika_engine: PikaEngine, shared object with configuration and
|
||||
shared driver functionality
|
||||
:param message: Dictionary, user's message fields
|
||||
:param context: Dictionary, request context's fields
|
||||
:param content_type: String, content-type header, defines serialization
|
||||
mechanism
|
||||
:param content_encoding: String, defines encoding for text data
|
||||
"""
|
||||
|
||||
self._pika_engine = pika_engine
|
||||
|
||||
self.content_type = content_type
|
||||
@@ -267,6 +360,17 @@ class PikaOutgoingMessage(object):
|
||||
self.unique_id = uuid.uuid4().hex
|
||||
|
||||
def _prepare_message_to_send(self):
|
||||
"""Combine user's message fields an system fields (_unique_id,
|
||||
context's data etc)
|
||||
|
||||
:param pika_engine: PikaEngine, shared object with configuration and
|
||||
shared driver functionality
|
||||
:param message: Dictionary, user's message fields
|
||||
:param context: Dictionary, request context's fields
|
||||
:param content_type: String, content-type header, defines serialization
|
||||
mechanism
|
||||
:param content_encoding: String, defines encoding for text data
|
||||
"""
|
||||
msg = self.message.copy()
|
||||
|
||||
msg['_unique_id'] = self.unique_id
|
||||
@@ -279,6 +383,20 @@ class PikaOutgoingMessage(object):
|
||||
@staticmethod
|
||||
def _publish(pool, exchange, routing_key, body, properties, mandatory,
|
||||
expiration_time):
|
||||
"""Execute pika publish method using connection from connection pool
|
||||
Also this message catches all pika related exceptions and raise
|
||||
oslo.messaging specific exceptions
|
||||
|
||||
:param pool: Pool, pika connection pool for connection choosing
|
||||
:param exchange: String, RabbitMQ exchange name for message sending
|
||||
:param routing_key: String, RabbitMQ routing key for message routing
|
||||
:param body: Bytes, RabbitMQ message payload
|
||||
:param properties: Properties, RabbitMQ message properties
|
||||
:param mandatory: Boolean, RabbitMQ publish mandatory flag (raise
|
||||
exception if it is not possible to deliver message to any queue)
|
||||
:param expiration_time: Float, expiration time in seconds
|
||||
(like time.time())
|
||||
"""
|
||||
timeout = (None if expiration_time is None else
|
||||
expiration_time - time.time())
|
||||
if timeout is not None and timeout < 0:
|
||||
@@ -341,6 +459,21 @@ class PikaOutgoingMessage(object):
|
||||
def _do_send(self, exchange, routing_key, msg_dict, confirm=True,
|
||||
mandatory=True, persistent=False, expiration_time=None,
|
||||
retrier=None):
|
||||
"""Send prepared message with configured retrying
|
||||
|
||||
:param exchange: String, RabbitMQ exchange name for message sending
|
||||
:param routing_key: String, RabbitMQ routing key for message routing
|
||||
:param msg_dict: Dictionary, message payload
|
||||
:param confirm: Boolean, enable publisher confirmation if True
|
||||
:param mandatory: Boolean, RabbitMQ publish mandatory flag (raise
|
||||
exception if it is not possible to deliver message to any queue)
|
||||
:param persistent: Boolean, send persistent message if True, works only
|
||||
for routing into durable queues
|
||||
:param expiration_time: Float, expiration time in seconds
|
||||
(like time.time())
|
||||
:param retrier: retrying.Retrier, configured retrier object for sending
|
||||
message, if None no retrying is performed
|
||||
"""
|
||||
properties = pika_spec.BasicProperties(
|
||||
content_encoding=self.content_encoding,
|
||||
content_type=self.content_type,
|
||||
@@ -368,6 +501,20 @@ class PikaOutgoingMessage(object):
|
||||
|
||||
def send(self, exchange, routing_key='', confirm=True, mandatory=True,
|
||||
persistent=False, expiration_time=None, retrier=None):
|
||||
"""Send message with configured retrying
|
||||
|
||||
:param exchange: String, RabbitMQ exchange name for message sending
|
||||
:param routing_key: String, RabbitMQ routing key for message routing
|
||||
:param confirm: Boolean, enable publisher confirmation if True
|
||||
:param mandatory: Boolean, RabbitMQ publish mandatory flag (raise
|
||||
exception if it is not possible to deliver message to any queue)
|
||||
:param persistent: Boolean, send persistent message if True, works only
|
||||
for routing into durable queues
|
||||
:param expiration_time: Float, expiration time in seconds
|
||||
(like time.time())
|
||||
:param retrier: retrying.Retrier, configured retrier object for sending
|
||||
message, if None no retrying is performed
|
||||
"""
|
||||
msg_dict = self._prepare_message_to_send()
|
||||
|
||||
return self._do_send(exchange, routing_key, msg_dict, confirm,
|
||||
@@ -375,6 +522,9 @@ class PikaOutgoingMessage(object):
|
||||
|
||||
|
||||
class RpcPikaOutgoingMessage(PikaOutgoingMessage):
|
||||
"""PikaOutgoingMessage implementation for RPC messages. It adds
|
||||
possibility to wait and receive RPC reply
|
||||
"""
|
||||
def __init__(self, pika_engine, message, context,
|
||||
content_type="application/json", content_encoding="utf-8"):
|
||||
super(RpcPikaOutgoingMessage, self).__init__(
|
||||
@@ -385,6 +535,16 @@ class RpcPikaOutgoingMessage(PikaOutgoingMessage):
|
||||
|
||||
def send(self, target, reply_listener=None, expiration_time=None,
|
||||
retrier=None):
|
||||
"""Send RPC message with configured retrying
|
||||
|
||||
:param target: Target, oslo.messaging target which defines RPC service
|
||||
:param reply_listener: RpcReplyPikaListener, listener for waiting
|
||||
reply. If None - return immediately without reply waiting
|
||||
:param expiration_time: Float, expiration time in seconds
|
||||
(like time.time())
|
||||
:param retrier: retrying.Retrier, configured retrier object for sending
|
||||
message, if None no retrying is performed
|
||||
"""
|
||||
|
||||
exchange = self._pika_engine.get_rpc_exchange_name(
|
||||
target.exchange, target.topic, target.fanout, retrier is None
|
||||
|
||||
@@ -34,7 +34,7 @@ PyYAML>=3.1.0
|
||||
amqp>=1.4.0
|
||||
kombu>=3.0.7
|
||||
pika>=0.10.0
|
||||
pika-pool>=0.1.2
|
||||
pika-pool>=0.1.3
|
||||
|
||||
# middleware
|
||||
oslo.middleware>=2.8.0 # Apache-2.0
|
||||
|
||||
Reference in New Issue
Block a user