Fixes conflicts after merging master
Change-Id: I0d75c19e3002a3aad2dd35bbaea203fa9ba0c0ea
This commit is contained in:
parent
cc0f8cc8a9
commit
3976a2ff81
@ -97,17 +97,27 @@ class RpcReplyPikaListener(object):
|
||||
"""
|
||||
while self._reply_poller:
|
||||
try:
|
||||
message = self._reply_poller.poll()
|
||||
if message is None:
|
||||
try:
|
||||
messages = self._reply_poller.poll()
|
||||
except pika_drv_exc.EstablishConnectionException:
|
||||
LOG.exception("Problem during establishing connection for "
|
||||
"reply polling")
|
||||
time.sleep(
|
||||
self._pika_engine.host_connection_reconnect_delay
|
||||
)
|
||||
continue
|
||||
message.acknowledge()
|
||||
future = self._reply_waiting_futures.pop(message.msg_id, None)
|
||||
if future is not None:
|
||||
future.set_result(message)
|
||||
except pika_drv_exc.EstablishConnectionException:
|
||||
LOG.exception("Problem during establishing connection for "
|
||||
"reply polling")
|
||||
time.sleep(self._pika_engine.host_connection_reconnect_delay)
|
||||
|
||||
for message in messages:
|
||||
try:
|
||||
message.acknowledge()
|
||||
future = self._reply_waiting_futures.pop(
|
||||
message.msg_id, None
|
||||
)
|
||||
if future is not None:
|
||||
future.set_result(message)
|
||||
except Exception:
|
||||
LOG.exception("Unexpected exception during processing"
|
||||
"reply message")
|
||||
except BaseException:
|
||||
LOG.exception("Unexpected exception during reply polling")
|
||||
|
||||
|
@ -12,7 +12,6 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import collections
|
||||
import threading
|
||||
import time
|
||||
|
||||
@ -31,27 +30,30 @@ class PikaPoller(object):
|
||||
connectivity related problem detected
|
||||
"""
|
||||
|
||||
def __init__(self, pika_engine, prefetch_count):
|
||||
def __init__(self, pika_engine, prefetch_count,
|
||||
incoming_message_class=pika_drv_msg.PikaIncomingMessage):
|
||||
"""Initialize required fields
|
||||
|
||||
:param pika_engine: PikaEngine, shared object with configuration and
|
||||
shared driver functionality
|
||||
:param prefetch_count: Integer, maximum count of unacknowledged
|
||||
messages which RabbitMQ broker sends to this consumer
|
||||
:param incoming_message_class: PikaIncomingMessage, wrapper for
|
||||
consumed RabbitMQ message
|
||||
"""
|
||||
self._pika_engine = pika_engine
|
||||
self._prefetch_count = prefetch_count
|
||||
self._incoming_message_class = incoming_message_class
|
||||
|
||||
self._connection = None
|
||||
self._channel = None
|
||||
self._lock = threading.Lock()
|
||||
|
||||
self._prefetch_count = prefetch_count
|
||||
|
||||
self._started = False
|
||||
|
||||
self._queues_to_consume = None
|
||||
|
||||
self._message_queue = collections.deque()
|
||||
self._message_queue = []
|
||||
|
||||
def _reconnect(self):
|
||||
"""Performs reconnection to the broker. It is unsafe method for
|
||||
@ -106,7 +108,10 @@ class PikaPoller(object):
|
||||
no_ack=True mode
|
||||
"""
|
||||
self._message_queue.append(
|
||||
(self._channel, method, properties, body, True)
|
||||
self._incoming_message_class(
|
||||
self._pika_engine, self._channel, method, properties, body,
|
||||
True
|
||||
)
|
||||
)
|
||||
|
||||
def _on_message_with_ack_callback(self, unused, method, properties, body):
|
||||
@ -114,7 +119,10 @@ class PikaPoller(object):
|
||||
no_ack=False mode
|
||||
"""
|
||||
self._message_queue.append(
|
||||
(self._channel, method, properties, body, False)
|
||||
self._incoming_message_class(
|
||||
self._pika_engine, self._channel, method, properties, body,
|
||||
False
|
||||
)
|
||||
)
|
||||
|
||||
def _cleanup(self):
|
||||
@ -137,17 +145,19 @@ class PikaPoller(object):
|
||||
LOG.exception("Unexpected error during closing connection")
|
||||
self._connection = None
|
||||
|
||||
def poll(self, timeout=None):
|
||||
def poll(self, timeout=None, prefetch_size=1):
|
||||
"""Main method of this class - consumes message from RabbitMQ
|
||||
|
||||
:param: timeout: float, seconds, timeout for waiting new incoming
|
||||
message, None means wait forever
|
||||
:return: tuple, RabbitMQ message related data
|
||||
(channel, method, properties, body, no_ack)
|
||||
:param: prefetch_size: Integer, count of messages which we are want to
|
||||
poll. It blocks until prefetch_size messages are consumed or until
|
||||
timeout gets expired
|
||||
:return: list of PikaIncomingMessage, RabbitMQ messages
|
||||
"""
|
||||
expiration_time = time.time() + timeout if timeout else None
|
||||
|
||||
while not self._message_queue:
|
||||
while len(self._message_queue) < prefetch_size:
|
||||
with self._lock:
|
||||
if not self._started:
|
||||
return None
|
||||
@ -162,15 +172,17 @@ class PikaPoller(object):
|
||||
self._connection.process_data_events(
|
||||
time_limit=0.25
|
||||
)
|
||||
except Exception:
|
||||
except Exception as e:
|
||||
LOG.warn("Exception during consuming message. " + str(e))
|
||||
self._cleanup()
|
||||
raise
|
||||
if timeout is not None:
|
||||
timeout = expiration_time - time.time()
|
||||
if timeout <= 0:
|
||||
return None
|
||||
break
|
||||
|
||||
return self._message_queue.popleft()
|
||||
result = self._message_queue[:prefetch_size]
|
||||
self._message_queue = self._message_queue[prefetch_size:]
|
||||
return result
|
||||
|
||||
def start(self):
|
||||
"""Starts poller. Should be called before polling to allow message
|
||||
@ -226,7 +238,9 @@ class RpcServicePikaPoller(PikaPoller):
|
||||
self._target = target
|
||||
|
||||
super(RpcServicePikaPoller, self).__init__(
|
||||
pika_engine, prefetch_count=prefetch_count)
|
||||
pika_engine, prefetch_count=prefetch_count,
|
||||
incoming_message_class=pika_drv_msg.RpcPikaIncomingMessage
|
||||
)
|
||||
|
||||
def _declare_queue_binding(self):
|
||||
"""Overrides base method and perform declaration of RabbitMQ exchanges
|
||||
@ -274,21 +288,6 @@ class RpcServicePikaPoller(PikaPoller):
|
||||
)
|
||||
return queues_to_consume
|
||||
|
||||
def poll(self, timeout=None):
|
||||
"""Overrides base method and wrap RabbitMQ message into
|
||||
RpcPikaIncomingMessage
|
||||
|
||||
:param: timeout: float, seconds, timeout for waiting new incoming
|
||||
message, None means wait forever
|
||||
:return: RpcPikaIncomingMessage, consumed RPC message
|
||||
"""
|
||||
msg = super(RpcServicePikaPoller, self).poll(timeout)
|
||||
if msg is None:
|
||||
return None
|
||||
return pika_drv_msg.RpcPikaIncomingMessage(
|
||||
self._pika_engine, *msg
|
||||
)
|
||||
|
||||
|
||||
class RpcReplyPikaPoller(PikaPoller):
|
||||
"""PikaPoller implementation for polling RPC reply messages. Overrides
|
||||
@ -309,7 +308,8 @@ class RpcReplyPikaPoller(PikaPoller):
|
||||
self._queue = queue
|
||||
|
||||
super(RpcReplyPikaPoller, self).__init__(
|
||||
pika_engine, prefetch_count
|
||||
pika_engine=pika_engine, prefetch_count=prefetch_count,
|
||||
incoming_message_class=pika_drv_msg.RpcReplyPikaIncomingMessage
|
||||
)
|
||||
|
||||
def _declare_queue_binding(self):
|
||||
@ -355,21 +355,6 @@ class RpcReplyPikaPoller(PikaPoller):
|
||||
|
||||
retrier(self.reconnect)()
|
||||
|
||||
def poll(self, timeout=None):
|
||||
"""Overrides base method and wrap RabbitMQ message into
|
||||
RpcReplyPikaIncomingMessage
|
||||
|
||||
:param: timeout: float, seconds, timeout for waiting new incoming
|
||||
message, None means wait forever
|
||||
:return: RpcReplyPikaIncomingMessage, consumed RPC reply message
|
||||
"""
|
||||
msg = super(RpcReplyPikaPoller, self).poll(timeout)
|
||||
if msg is None:
|
||||
return None
|
||||
return pika_drv_msg.RpcReplyPikaIncomingMessage(
|
||||
self._pika_engine, *msg
|
||||
)
|
||||
|
||||
|
||||
class NotificationPikaPoller(PikaPoller):
|
||||
"""PikaPoller implementation for polling Notification messages. Overrides
|
||||
@ -421,18 +406,3 @@ class NotificationPikaPoller(PikaPoller):
|
||||
queues_to_consume[queue] = False
|
||||
|
||||
return queues_to_consume
|
||||
|
||||
def poll(self, timeout=None):
|
||||
"""Overrides base method and wrap RabbitMQ message into
|
||||
PikaIncomingMessage
|
||||
|
||||
:param: timeout: float, seconds, timeout for waiting new incoming
|
||||
message, None means wait forever
|
||||
:return: PikaIncomingMessage, consumed Notification message
|
||||
"""
|
||||
msg = super(NotificationPikaPoller, self).poll(timeout)
|
||||
if msg is None:
|
||||
return None
|
||||
return pika_drv_msg.PikaIncomingMessage(
|
||||
self._pika_engine, *msg
|
||||
)
|
||||
|
Loading…
Reference in New Issue
Block a user