This commit is contained in:
@@ -20,7 +20,8 @@ log = logging.getLogger(__name__)
|
|||||||
|
|
||||||
|
|
||||||
ConsumerRecord = collections.namedtuple("ConsumerRecord",
|
ConsumerRecord = collections.namedtuple("ConsumerRecord",
|
||||||
["topic", "partition", "offset", "timestamp", "timestamp_type", "key", "value"])
|
["topic", "partition", "offset", "timestamp", "timestamp_type",
|
||||||
|
"key", "value", "checksum", "serialized_key_size", "serialized_value_size"])
|
||||||
|
|
||||||
|
|
||||||
class NoOffsetForPartitionError(Errors.KafkaError):
|
class NoOffsetForPartitionError(Errors.KafkaError):
|
||||||
@@ -410,13 +411,17 @@ class Fetcher(six.Iterator):
|
|||||||
key, value = self._deserialize(inner_msg)
|
key, value = self._deserialize(inner_msg)
|
||||||
yield ConsumerRecord(tp.topic, tp.partition, inner_offset,
|
yield ConsumerRecord(tp.topic, tp.partition, inner_offset,
|
||||||
inner_timestamp, msg.timestamp_type,
|
inner_timestamp, msg.timestamp_type,
|
||||||
key, value)
|
key, value, inner_msg.crc,
|
||||||
|
len(inner_msg.key) if inner_msg.key is not None else -1,
|
||||||
|
len(inner_msg.value) if inner_msg.value is not None else -1)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
key, value = self._deserialize(msg)
|
key, value = self._deserialize(msg)
|
||||||
yield ConsumerRecord(tp.topic, tp.partition, offset,
|
yield ConsumerRecord(tp.topic, tp.partition, offset,
|
||||||
msg.timestamp, msg.timestamp_type,
|
msg.timestamp, msg.timestamp_type,
|
||||||
key, value)
|
key, value, msg.crc,
|
||||||
|
len(msg.key) if msg.key is not None else -1,
|
||||||
|
len(msg.value) if msg.value is not None else -1)
|
||||||
|
|
||||||
# If unpacking raises StopIteration, it is erroneously
|
# If unpacking raises StopIteration, it is erroneously
|
||||||
# caught by the generator. We want all exceptions to be raised
|
# caught by the generator. We want all exceptions to be raised
|
||||||
|
@@ -59,7 +59,7 @@ class MessageSetBuffer(object):
|
|||||||
self._final_size = None
|
self._final_size = None
|
||||||
|
|
||||||
def append(self, offset, message):
|
def append(self, offset, message):
|
||||||
"""Apend a Message to the MessageSet.
|
"""Append a Message to the MessageSet.
|
||||||
|
|
||||||
Arguments:
|
Arguments:
|
||||||
offset (int): offset of the message
|
offset (int): offset of the message
|
||||||
|
@@ -29,22 +29,29 @@ class FutureProduceResult(Future):
|
|||||||
|
|
||||||
|
|
||||||
class FutureRecordMetadata(Future):
|
class FutureRecordMetadata(Future):
|
||||||
def __init__(self, produce_future, relative_offset, timestamp_ms):
|
def __init__(self, produce_future, relative_offset, timestamp_ms, checksum, serialized_key_size, serialized_value_size):
|
||||||
super(FutureRecordMetadata, self).__init__()
|
super(FutureRecordMetadata, self).__init__()
|
||||||
self._produce_future = produce_future
|
self._produce_future = produce_future
|
||||||
self.relative_offset = relative_offset
|
# packing args as a tuple is a minor speed optimization
|
||||||
self.timestamp_ms = timestamp_ms
|
self.args = (relative_offset, timestamp_ms, checksum, serialized_key_size, serialized_value_size)
|
||||||
produce_future.add_callback(self._produce_success)
|
produce_future.add_callback(self._produce_success)
|
||||||
produce_future.add_errback(self.failure)
|
produce_future.add_errback(self.failure)
|
||||||
|
|
||||||
def _produce_success(self, offset_and_timestamp):
|
def _produce_success(self, offset_and_timestamp):
|
||||||
offset, timestamp_ms = offset_and_timestamp
|
offset, produce_timestamp_ms = offset_and_timestamp
|
||||||
if timestamp_ms is None:
|
|
||||||
timestamp_ms = self.timestamp_ms
|
# Unpacking from args tuple is minor speed optimization
|
||||||
if offset != -1 and self.relative_offset is not None:
|
(relative_offset, timestamp_ms, checksum,
|
||||||
offset += self.relative_offset
|
serialized_key_size, serialized_value_size) = self.args
|
||||||
|
|
||||||
|
if produce_timestamp_ms is not None:
|
||||||
|
timestamp_ms = produce_timestamp_ms
|
||||||
|
if offset != -1 and relative_offset is not None:
|
||||||
|
offset += relative_offset
|
||||||
tp = self._produce_future.topic_partition
|
tp = self._produce_future.topic_partition
|
||||||
metadata = RecordMetadata(tp[0], tp[1], tp, offset, timestamp_ms)
|
metadata = RecordMetadata(tp[0], tp[1], tp, offset, timestamp_ms,
|
||||||
|
checksum, serialized_key_size,
|
||||||
|
serialized_value_size)
|
||||||
self.success(metadata)
|
self.success(metadata)
|
||||||
|
|
||||||
def get(self, timeout=None):
|
def get(self, timeout=None):
|
||||||
@@ -57,4 +64,6 @@ class FutureRecordMetadata(Future):
|
|||||||
return self.value
|
return self.value
|
||||||
|
|
||||||
|
|
||||||
RecordMetadata = collections.namedtuple('RecordMetadata', 'topic partition topic_partition offset timestamp')
|
RecordMetadata = collections.namedtuple(
|
||||||
|
'RecordMetadata', ['topic', 'partition', 'topic_partition', 'offset', 'timestamp',
|
||||||
|
'checksum', 'serialized_key_size', 'serialized_value_size'])
|
||||||
|
@@ -457,6 +457,7 @@ class KafkaProducer(object):
|
|||||||
assert value is not None or self.config['api_version'] >= (0, 8, 1), (
|
assert value is not None or self.config['api_version'] >= (0, 8, 1), (
|
||||||
'Null messages require kafka >= 0.8.1')
|
'Null messages require kafka >= 0.8.1')
|
||||||
assert not (value is None and key is None), 'Need at least one: key or value'
|
assert not (value is None and key is None), 'Need at least one: key or value'
|
||||||
|
key_bytes = value_bytes = None
|
||||||
try:
|
try:
|
||||||
# first make sure the metadata for the topic is
|
# first make sure the metadata for the topic is
|
||||||
# available
|
# available
|
||||||
@@ -497,10 +498,11 @@ class KafkaProducer(object):
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
log.debug("Exception occurred during message send: %s", e)
|
log.debug("Exception occurred during message send: %s", e)
|
||||||
return FutureRecordMetadata(
|
return FutureRecordMetadata(
|
||||||
FutureProduceResult(
|
FutureProduceResult(TopicPartition(topic, partition)),
|
||||||
TopicPartition(topic, partition)),
|
-1, None, None,
|
||||||
-1, None
|
len(key_bytes) if key_bytes is not None else -1,
|
||||||
).failure(e)
|
len(value_bytes) if value_bytes is not None else -1
|
||||||
|
).failure(e)
|
||||||
|
|
||||||
def flush(self, timeout=None):
|
def flush(self, timeout=None):
|
||||||
"""
|
"""
|
||||||
|
@@ -57,10 +57,13 @@ class RecordBatch(object):
|
|||||||
|
|
||||||
msg = Message(value, key=key, magic=self.message_version)
|
msg = Message(value, key=key, magic=self.message_version)
|
||||||
record_size = self.records.append(self.record_count, msg)
|
record_size = self.records.append(self.record_count, msg)
|
||||||
|
checksum = msg.crc # crc is recalculated during records.append()
|
||||||
self.max_record_size = max(self.max_record_size, record_size)
|
self.max_record_size = max(self.max_record_size, record_size)
|
||||||
self.last_append = time.time()
|
self.last_append = time.time()
|
||||||
future = FutureRecordMetadata(self.produce_future, self.record_count,
|
future = FutureRecordMetadata(self.produce_future, self.record_count,
|
||||||
timestamp_ms)
|
timestamp_ms, checksum,
|
||||||
|
len(key) if key is not None else -1,
|
||||||
|
len(value) if value is not None else -1)
|
||||||
self.record_count += 1
|
self.record_count += 1
|
||||||
return future
|
return future
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user