Merge "kafka: Don't hide unpack/unserialize exception"
This commit is contained in:
commit
6809dbe044
@ -38,6 +38,11 @@ from oslo_messaging._i18n import _LE
|
||||
from oslo_messaging._i18n import _LW
|
||||
from oslo_serialization import jsonutils
|
||||
|
||||
import logging as l
|
||||
l.basicConfig(level=l.INFO)
|
||||
l.getLogger("kafka").setLevel(l.WARN)
|
||||
l.getLogger("stevedore").setLevel(l.WARN)
|
||||
|
||||
if eventletutils.is_monkey_patched('select'):
|
||||
# monkeypatch the vendored SelectSelector._select like eventlet does
|
||||
# https://github.com/eventlet/eventlet/blob/master/eventlet/green/selectors.py#L32
|
||||
@ -55,18 +60,11 @@ LOG = logging.getLogger(__name__)
|
||||
def unpack_message(msg):
|
||||
context = {}
|
||||
message = None
|
||||
try:
|
||||
if msg:
|
||||
msg = json.loads(msg)
|
||||
message = driver_common.deserialize_msg(msg)
|
||||
if 'context' in message:
|
||||
context = message['context']
|
||||
del message['context']
|
||||
except ValueError as e:
|
||||
LOG.info("Invalid format of consumed message: %s" % e)
|
||||
except Exception:
|
||||
LOG.warning(_LW("Exception during message unpacking"))
|
||||
return message, context
|
||||
msg = json.loads(msg)
|
||||
message = driver_common.deserialize_msg(msg)
|
||||
context = message['_context']
|
||||
del message['_context']
|
||||
return context, message
|
||||
|
||||
|
||||
def pack_message(ctxt, msg):
|
||||
@ -76,7 +74,7 @@ def pack_message(ctxt, msg):
|
||||
context_d = ctxt
|
||||
else:
|
||||
context_d = ctxt.to_dict()
|
||||
msg['context'] = context_d
|
||||
msg['_context'] = context_d
|
||||
|
||||
msg = driver_common.serialize_msg(msg)
|
||||
|
||||
@ -181,7 +179,15 @@ class Connection(object):
|
||||
|
||||
@with_reconnect()
|
||||
def _poll_messages(self, timeout):
|
||||
return self.consumer.poll(timeout * 1000.0)
|
||||
messages = self.consumer.poll(timeout * 1000.0)
|
||||
messages = [record.value
|
||||
for records in messages.values() if records
|
||||
for record in records]
|
||||
if not messages:
|
||||
# NOTE(sileht): really ? you return payload but no messages...
|
||||
# simulate timeout to consume message again
|
||||
raise kafka.errors.ConsumerTimeout()
|
||||
return messages
|
||||
|
||||
def consume(self, timeout=None):
|
||||
"""Receive up to 'max_fetch_messages' messages.
|
||||
@ -275,26 +281,17 @@ class KafkaListener(base.PollStyleListener):
|
||||
|
||||
@base.batch_poll_helper
|
||||
def poll(self, timeout=None):
|
||||
# TODO(sileht): use batch capability of kafka
|
||||
while not self._stopped.is_set():
|
||||
if self.incoming_queue:
|
||||
return self.incoming_queue.pop(0)
|
||||
try:
|
||||
messages = self.conn.consume(timeout=timeout)
|
||||
if messages:
|
||||
self._put_messages_to_queue(messages)
|
||||
messages = self.conn.consume(timeout=timeout) or []
|
||||
for message in messages:
|
||||
msg = OsloKafkaMessage(*unpack_message(message))
|
||||
self.incoming_queue.append(msg)
|
||||
except driver_common.Timeout:
|
||||
return None
|
||||
|
||||
def _put_messages_to_queue(self, messages):
|
||||
for topic, records in messages.items():
|
||||
if records:
|
||||
for record in records:
|
||||
message, context = unpack_message(record.value)
|
||||
if message:
|
||||
self.incoming_queue.append(
|
||||
OsloKafkaMessage(ctxt=context, message=message))
|
||||
|
||||
def stop(self):
|
||||
self._stopped.set()
|
||||
self.conn.stop_consuming()
|
||||
|
Loading…
x
Reference in New Issue
Block a user