diff --git a/oslo_messaging/_drivers/pika_driver/pika_message.py b/oslo_messaging/_drivers/pika_driver/pika_message.py index 2fe4acae..05f6fcd1 100644 --- a/oslo_messaging/_drivers/pika_driver/pika_message.py +++ b/oslo_messaging/_drivers/pika_driver/pika_message.py @@ -42,7 +42,20 @@ _VERSION = "1.0" class RemoteExceptionMixin(object): + """Used for constructing dynamic exception type during deserialization of + remote exception. It defines unified '__init__' method signature and + exception message format + """ def __init__(self, module, clazz, message, trace): + """Store serialized data + :param module: String, module name for importing original exception + class of serialized remote exception + :param clazz: String, original class name of serialized remote + exception + :param message: String, original message of serialized remote + exception + :param trace: String, original trace of serialized remote exception + """ self.module = module self.clazz = clazz self.message = message @@ -55,8 +68,23 @@ class RemoteExceptionMixin(object): class PikaIncomingMessage(object): + """Driver friendly adapter for received message. Extract message + information from RabbitMQ message and provide access to it + """ def __init__(self, pika_engine, channel, method, properties, body, no_ack): + """Parse RabbitMQ message + + :param pika_engine: PikaEngine, shared object with configuration and + shared driver functionality + :param channel: Channel, RabbitMQ channel which was used for + this message delivery + :param method: Method, RabbitMQ message method + :param properties: Properties, RabbitMQ message properties + :param body: Bytes, RabbitMQ message body + :param no_ack: Boolean, defines should this message be acked by + consumer or not + """ headers = getattr(properties, "headers", {}) version = headers.get(_VERSION_HEADER, None) if not utils.version_is_compatible(version, _VERSION): @@ -105,17 +133,43 @@ class PikaIncomingMessage(object): self.ctxt = context_dict def acknowledge(self): + """Ack the message. Should be called by message processing logic when + it considered as consumed (means that we don't need redelivery of this + message anymore) + """ if not self._no_ack: self._channel.basic_ack(delivery_tag=self.delivery_tag) def requeue(self): + """Rollback the message. Should be called by message processing logic + when it can not process the message right now and should be redelivered + later if it is possible + """ if not self._no_ack: return self._channel.basic_nack(delivery_tag=self.delivery_tag, requeue=True) class RpcPikaIncomingMessage(PikaIncomingMessage): + """PikaIncomingMessage implementation for RPC messages. It expects + extra RPC related fields in message body (msg_id and reply_q). Also 'reply' + method added to allow consumer to send RPC reply back to the RPC client + """ + def __init__(self, pika_engine, channel, method, properties, body, no_ack): + """Defines default values of msg_id and reply_q fields and just call + super.__init__ method + + :param pika_engine: PikaEngine, shared object with configuration and + shared driver functionality + :param channel: Channel, RabbitMQ channel which was used for + this message delivery + :param method: Method, RabbitMQ message method + :param properties: Properties, RabbitMQ message properties + :param body: Bytes, RabbitMQ message body + :param no_ack: Boolean, defines should this message be acked by + consumer or not + """ self.msg_id = None self.reply_q = None @@ -124,6 +178,13 @@ class RpcPikaIncomingMessage(PikaIncomingMessage): ) def reply(self, reply=None, failure=None, log_failure=True): + """Send back reply to the RPC client + :param reply - Dictionary, reply. In case of exception should be None + :param failure - Exception, exception, raised during processing RPC + request. Should be None if RPC request was successfully processed + :param log_failure, Boolean, not used in this implementation. + It present here to be compatible with driver API + """ if not (self.msg_id and self.reply_q): return @@ -203,7 +264,24 @@ class RpcPikaIncomingMessage(PikaIncomingMessage): class RpcReplyPikaIncomingMessage(PikaIncomingMessage): + """PikaIncomingMessage implementation for RPC reply messages. It expects + extra RPC reply related fields in message body (result and failure). + """ def __init__(self, pika_engine, channel, method, properties, body, no_ack): + """Defines default values of result and failure fields, call + super.__init__ method and then construct Exception object if failure is + not None + + :param pika_engine: PikaEngine, shared object with configuration and + shared driver functionality + :param channel: Channel, RabbitMQ channel which was used for + this message delivery + :param method: Method, RabbitMQ message method + :param properties: Properties, RabbitMQ message properties + :param body: Bytes, RabbitMQ message body + :param no_ack: Boolean, defines should this message be acked by + consumer or not + """ self.result = None self.failure = None @@ -246,8 +324,23 @@ class RpcReplyPikaIncomingMessage(PikaIncomingMessage): class PikaOutgoingMessage(object): + """Driver friendly adapter for sending message. Construct RabbitMQ message + and send it + """ + def __init__(self, pika_engine, message, context, content_type="application/json", content_encoding="utf-8"): + """Parse RabbitMQ message + + :param pika_engine: PikaEngine, shared object with configuration and + shared driver functionality + :param message: Dictionary, user's message fields + :param context: Dictionary, request context's fields + :param content_type: String, content-type header, defines serialization + mechanism + :param content_encoding: String, defines encoding for text data + """ + self._pika_engine = pika_engine self.content_type = content_type @@ -267,6 +360,17 @@ class PikaOutgoingMessage(object): self.unique_id = uuid.uuid4().hex def _prepare_message_to_send(self): + """Combine user's message fields an system fields (_unique_id, + context's data etc) + + :param pika_engine: PikaEngine, shared object with configuration and + shared driver functionality + :param message: Dictionary, user's message fields + :param context: Dictionary, request context's fields + :param content_type: String, content-type header, defines serialization + mechanism + :param content_encoding: String, defines encoding for text data + """ msg = self.message.copy() msg['_unique_id'] = self.unique_id @@ -279,6 +383,20 @@ class PikaOutgoingMessage(object): @staticmethod def _publish(pool, exchange, routing_key, body, properties, mandatory, expiration_time): + """Execute pika publish method using connection from connection pool + Also this message catches all pika related exceptions and raise + oslo.messaging specific exceptions + + :param pool: Pool, pika connection pool for connection choosing + :param exchange: String, RabbitMQ exchange name for message sending + :param routing_key: String, RabbitMQ routing key for message routing + :param body: Bytes, RabbitMQ message payload + :param properties: Properties, RabbitMQ message properties + :param mandatory: Boolean, RabbitMQ publish mandatory flag (raise + exception if it is not possible to deliver message to any queue) + :param expiration_time: Float, expiration time in seconds + (like time.time()) + """ timeout = (None if expiration_time is None else expiration_time - time.time()) if timeout is not None and timeout < 0: @@ -341,6 +459,21 @@ class PikaOutgoingMessage(object): def _do_send(self, exchange, routing_key, msg_dict, confirm=True, mandatory=True, persistent=False, expiration_time=None, retrier=None): + """Send prepared message with configured retrying + + :param exchange: String, RabbitMQ exchange name for message sending + :param routing_key: String, RabbitMQ routing key for message routing + :param msg_dict: Dictionary, message payload + :param confirm: Boolean, enable publisher confirmation if True + :param mandatory: Boolean, RabbitMQ publish mandatory flag (raise + exception if it is not possible to deliver message to any queue) + :param persistent: Boolean, send persistent message if True, works only + for routing into durable queues + :param expiration_time: Float, expiration time in seconds + (like time.time()) + :param retrier: retrying.Retrier, configured retrier object for sending + message, if None no retrying is performed + """ properties = pika_spec.BasicProperties( content_encoding=self.content_encoding, content_type=self.content_type, @@ -368,6 +501,20 @@ class PikaOutgoingMessage(object): def send(self, exchange, routing_key='', confirm=True, mandatory=True, persistent=False, expiration_time=None, retrier=None): + """Send message with configured retrying + + :param exchange: String, RabbitMQ exchange name for message sending + :param routing_key: String, RabbitMQ routing key for message routing + :param confirm: Boolean, enable publisher confirmation if True + :param mandatory: Boolean, RabbitMQ publish mandatory flag (raise + exception if it is not possible to deliver message to any queue) + :param persistent: Boolean, send persistent message if True, works only + for routing into durable queues + :param expiration_time: Float, expiration time in seconds + (like time.time()) + :param retrier: retrying.Retrier, configured retrier object for sending + message, if None no retrying is performed + """ msg_dict = self._prepare_message_to_send() return self._do_send(exchange, routing_key, msg_dict, confirm, @@ -375,6 +522,9 @@ class PikaOutgoingMessage(object): class RpcPikaOutgoingMessage(PikaOutgoingMessage): + """PikaOutgoingMessage implementation for RPC messages. It adds + possibility to wait and receive RPC reply + """ def __init__(self, pika_engine, message, context, content_type="application/json", content_encoding="utf-8"): super(RpcPikaOutgoingMessage, self).__init__( @@ -385,6 +535,16 @@ class RpcPikaOutgoingMessage(PikaOutgoingMessage): def send(self, target, reply_listener=None, expiration_time=None, retrier=None): + """Send RPC message with configured retrying + + :param target: Target, oslo.messaging target which defines RPC service + :param reply_listener: RpcReplyPikaListener, listener for waiting + reply. If None - return immediately without reply waiting + :param expiration_time: Float, expiration time in seconds + (like time.time()) + :param retrier: retrying.Retrier, configured retrier object for sending + message, if None no retrying is performed + """ exchange = self._pika_engine.get_rpc_exchange_name( target.exchange, target.topic, target.fanout, retrier is None diff --git a/requirements.txt b/requirements.txt index 681d55ec..fa33c243 100644 --- a/requirements.txt +++ b/requirements.txt @@ -34,7 +34,7 @@ PyYAML>=3.1.0 amqp>=1.4.0 kombu>=3.0.7 pika>=0.10.0 -pika-pool>=0.1.2 +pika-pool>=0.1.3 # middleware oslo.middleware>=2.8.0 # Apache-2.0