Merge "[rabbit] Ignore stream offset header when stream queue is not used" into stable/2024.2
This commit is contained in:
commit
402cc29333
@ -622,12 +622,12 @@ class Consumer(object):
|
|||||||
|
|
||||||
Messages that are processed and ack'ed.
|
Messages that are processed and ack'ed.
|
||||||
"""
|
"""
|
||||||
|
if self.rabbit_stream_fanout:
|
||||||
offset = message.headers.get("x-stream-offset")
|
offset = message.headers.get("x-stream-offset")
|
||||||
if offset is not None:
|
if offset is not None:
|
||||||
LOG.debug("Stream for %s current offset: %s", self.queue_name,
|
LOG.debug("Stream for %s current offset: %s",
|
||||||
offset)
|
self.queue_name, offset)
|
||||||
self.next_stream_offset = offset + 1
|
self.next_stream_offset = offset + 1
|
||||||
|
|
||||||
m2p = getattr(self.queue.channel, 'message_to_python', None)
|
m2p = getattr(self.queue.channel, 'message_to_python', None)
|
||||||
if m2p:
|
if m2p:
|
||||||
@ -1096,7 +1096,7 @@ class Connection(object):
|
|||||||
info = {'err_str': exc, 'sleep_time': interval}
|
info = {'err_str': exc, 'sleep_time': interval}
|
||||||
info.update(self._get_connection_info(conn_error=True))
|
info.update(self._get_connection_info(conn_error=True))
|
||||||
|
|
||||||
if 'Basic.cancel' in str(exc):
|
if self.rabbit_stream_fanout and 'Basic.cancel' in str(exc):
|
||||||
# This branch allows for consumer offset reset
|
# This branch allows for consumer offset reset
|
||||||
# in the unlikely case consumers are cancelled. This may
|
# in the unlikely case consumers are cancelled. This may
|
||||||
# happen, for example, when we delete the stream queue.
|
# happen, for example, when we delete the stream queue.
|
||||||
|
Loading…
Reference in New Issue
Block a user