KAFKA-3025: Message v1 -- add timestamp and use relative offset in compressed messagesets

Dana Powers
2016-05-22 00:31:16 -07:00
parent 7f4a9361ea
commit 795cb9b29f
7 changed files with 132 additions and 50 deletions

kafka/consumer/fetcher.py

@@ -19,7 +19,7 @@ log = logging.getLogger(__name__)
ConsumerRecord = collections.namedtuple("ConsumerRecord",
["topic", "partition", "offset", "key", "value"])
["topic", "partition", "offset", "timestamp", "timestamp_type", "key", "value"])
class NoOffsetForPartitionError(Errors.KafkaError):
@@ -351,17 +351,33 @@ class Fetcher(six.Iterator):
position)
return dict(drained)
def _unpack_message_set(self, tp, messages):
def _unpack_message_set(self, tp, messages, relative_offset=0):
try:
for offset, size, msg in messages:
if self.config['check_crcs'] and not msg.validate_crc():
raise Errors.InvalidMessageError(msg)
elif msg.is_compressed():
for record in self._unpack_message_set(tp, msg.decompress()):
mset = msg.decompress()
# new format uses relative offsets for compressed messages
if msg.magic > 0:
last_offset, _, _ = mset[-1]
relative = offset - last_offset
else:
relative = 0
for record in self._unpack_message_set(tp, mset, relative):
yield record
else:
# Message v1 adds timestamp
if msg.magic > 0:
timestamp = msg.timestamp
timestamp_type = msg.timestamp_type
else:
timestamp = timestamp_type = None
key, value = self._deserialize(msg)
yield ConsumerRecord(tp.topic, tp.partition, offset, key, value)
yield ConsumerRecord(tp.topic, tp.partition,
offset + relative_offset,
timestamp, timestamp_type,
key, value)
# If unpacking raises StopIteration, it is erroneously
# caught by the generator. We want all exceptions to be raised
# back to the user. See Issue 545
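Note: a minimal standalone sketch of the offset arithmetic above (hypothetical helper, not the Fetcher API). With message v1, the broker assigns the wrapper of a compressed set the absolute offset of the last inner message, while the inner messages carry relative offsets 0..n-1:

def absolute_offsets(wrapper_offset, inner_offsets):
    """Shift the inner relative offsets so the last one lands on the
    wrapper's absolute offset -- the same math as `relative` above."""
    delta = wrapper_offset - inner_offsets[-1]
    return [offset + delta for offset in inner_offsets]

# A wrapper stored at offset 102 holding three inner messages (0, 1, 2)
# resolves to absolute offsets [100, 101, 102].
assert absolute_offsets(102, [0, 1, 2]) == [100, 101, 102]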

kafka/producer/buffer.py

@@ -29,7 +29,7 @@ class MessageSetBuffer(object):
'snappy': (has_snappy, snappy_encode, Message.CODEC_SNAPPY),
'lz4': (has_lz4, lz4_encode, Message.CODEC_LZ4),
}
def __init__(self, buf, batch_size, compression_type=None):
def __init__(self, buf, batch_size, compression_type=None, message_version=0):
if compression_type is not None:
assert compression_type in self._COMPRESSORS, 'Unrecognized compression type'
checker, encoder, attributes = self._COMPRESSORS[compression_type]
@@ -40,6 +40,7 @@ class MessageSetBuffer(object):
self._compressor = None
self._compression_attributes = None
self._message_version = message_version
self._buffer = buf
# Init MessageSetSize to 0 -- update on close
self._buffer.seek(0)
@@ -85,7 +86,8 @@ class MessageSetBuffer(object):
# TODO: avoid copies with bytearray / memoryview
self._buffer.seek(4)
msg = Message(self._compressor(self._buffer.read()),
attributes=self._compression_attributes)
attributes=self._compression_attributes,
magic=self._message_version)
encoded = msg.encode()
self._buffer.seek(4)
self._buffer.write(Int64.encode(0)) # offset 0 for wrapper msg
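Note: a rough sketch of the wrapper message that close() now produces for a compressed v1 batch (hypothetical free function using gzip; the real code compresses the existing buffer in place with the configured codec):

import gzip
import struct

from kafka.protocol.message import Message

def wrap_compressed(message_set_bytes, message_version=1):
    # Compress the serialized inner message set and embed it as the value
    # of a single wrapper Message carrying the codec bits in attributes and
    # the same magic as the inner messages. Producers write offset 0 here;
    # the broker rewrites it on append.
    wrapper = Message(gzip.compress(message_set_bytes),
                      attributes=Message.CODEC_GZIP,
                      magic=message_version)
    encoded = wrapper.encode()
    return struct.pack('>q', 0) + struct.pack('>i', len(encoded)) + encoded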

kafka/producer/future.py

@@ -29,16 +29,21 @@ class FutureProduceResult(Future):
class FutureRecordMetadata(Future):
def __init__(self, produce_future, relative_offset):
def __init__(self, produce_future, relative_offset, timestamp_ms):
super(FutureRecordMetadata, self).__init__()
self._produce_future = produce_future
self.relative_offset = relative_offset
self.timestamp_ms = timestamp_ms
produce_future.add_callback(self._produce_success)
produce_future.add_errback(self.failure)
def _produce_success(self, base_offset):
def _produce_success(self, offset_and_timestamp):
base_offset, timestamp_ms = offset_and_timestamp
if timestamp_ms is None:
timestamp_ms = self.timestamp_ms
self.success(RecordMetadata(self._produce_future.topic_partition,
base_offset, self.relative_offset))
base_offset, timestamp_ms,
self.relative_offset))
def get(self, timeout=None):
if not self.is_done and not self._produce_future.await(timeout):
@@ -51,12 +56,13 @@ class FutureRecordMetadata(Future):
class RecordMetadata(collections.namedtuple(
'RecordMetadata', 'topic partition topic_partition offset')):
def __new__(cls, tp, base_offset, relative_offset=None):
'RecordMetadata', 'topic partition topic_partition offset timestamp')):
def __new__(cls, tp, base_offset, timestamp, relative_offset=None):
offset = base_offset
if relative_offset is not None and base_offset != -1:
offset += relative_offset
return super(RecordMetadata, cls).__new__(cls, tp.topic, tp.partition, tp, offset)
return super(RecordMetadata, cls).__new__(cls, tp.topic, tp.partition,
tp, offset, timestamp)
def __str__(self):
return 'RecordMetadata(topic=%s, partition=%s, offset=%s)' % (
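Note: the produce future now resolves to an (offset, timestamp) pair; when the broker reports no timestamp (v0 responses, or topics using CreateTime), the client falls back to the timestamp stamped at send time. A tiny sketch of that fallback (hypothetical helper mirroring _produce_success):

def resolve_timestamp(broker_timestamp_ms, client_timestamp_ms):
    # Prefer the broker's LogAppendTime; otherwise keep the CreateTime
    # the producer recorded when the message was appended to the batch.
    if broker_timestamp_ms is not None:
        return broker_timestamp_ms
    return client_timestamp_ms

assert resolve_timestamp(None, 1463900000000) == 1463900000000
assert resolve_timestamp(1463900000123, 1463900000000) == 1463900000123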

kafka/producer/kafka.py

@@ -347,7 +347,7 @@ class KafkaProducer(object):
max_wait = self.config['max_block_ms'] / 1000.0
return self._wait_on_metadata(topic, max_wait)
def send(self, topic, value=None, key=None, partition=None):
def send(self, topic, value=None, key=None, partition=None, timestamp_ms=None):
"""Publish a message to a topic.
Arguments:
@@ -368,6 +368,8 @@ class KafkaProducer(object):
partition (but if key is None, partition is chosen randomly).
Must be type bytes, or be serializable to bytes via configured
key_serializer.
timestamp_ms (int, optional): epoch milliseconds (from Jan 1 1970 UTC)
to use as the message timestamp. Defaults to current time.
Returns:
FutureRecordMetadata: resolves to RecordMetadata
@@ -396,8 +398,11 @@ class KafkaProducer(object):
self._ensure_valid_record_size(message_size)
tp = TopicPartition(topic, partition)
if timestamp_ms is None:
timestamp_ms = int(time.time() * 1000)
log.debug("Sending (key=%s value=%s) to %s", key, value, tp)
result = self._accumulator.append(tp, key_bytes, value_bytes,
result = self._accumulator.append(tp, timestamp_ms,
key_bytes, value_bytes,
self.config['max_block_ms'])
future, batch_is_full, new_batch_created = result
if batch_is_full or new_batch_created:
@@ -416,8 +421,10 @@ class KafkaProducer(object):
except Exception as e:
log.debug("Exception occurred during message send: %s", e)
return FutureRecordMetadata(
FutureProduceResult(TopicPartition(topic, partition)),
-1).failure(e)
FutureProduceResult(
TopicPartition(topic, partition)),
-1, None
).failure(e)
def flush(self, timeout=None):
"""

kafka/producer/record_accumulator.py

@@ -36,7 +36,7 @@ class AtomicInteger(object):
class RecordBatch(object):
def __init__(self, tp, records):
def __init__(self, tp, records, message_version=0):
self.record_count = 0
#self.max_record_size = 0 # for metrics only
now = time.time()
@@ -46,22 +46,25 @@ class RecordBatch(object):
self.last_attempt = now
self.last_append = now
self.records = records
self.message_version = message_version
self.topic_partition = tp
self.produce_future = FutureProduceResult(tp)
self._retry = False
def try_append(self, key, value):
def try_append(self, timestamp_ms, key, value):
if not self.records.has_room_for(key, value):
return None
self.records.append(self.record_count, Message(value, key=key))
msg = Message(value, key=key, magic=self.message_version)
self.records.append(self.record_count, msg)
# self.max_record_size = max(self.max_record_size, Record.record_size(key, value)) # for metrics only
self.last_append = time.time()
future = FutureRecordMetadata(self.produce_future, self.record_count)
future = FutureRecordMetadata(self.produce_future, self.record_count,
timestamp_ms)
self.record_count += 1
return future
def done(self, base_offset=None, exception=None):
def done(self, base_offset=None, timestamp_ms=None, exception=None):
log.debug("Produced messages to topic-partition %s with base offset"
" %s and error %s.", self.topic_partition, base_offset,
exception) # trace
@@ -69,7 +72,7 @@ class RecordBatch(object):
log.warning('Batch is already closed -- ignoring batch.done()')
return
elif exception is None:
self.produce_future.success(base_offset)
self.produce_future.success((base_offset, timestamp_ms))
else:
self.produce_future.failure(exception)
@@ -78,7 +81,7 @@ class RecordBatch(object):
if ((self.records.is_full() and request_timeout_ms < since_append_ms)
or (request_timeout_ms < (since_append_ms + linger_ms))):
self.records.close()
self.done(-1, Errors.KafkaTimeoutError(
self.done(-1, None, Errors.KafkaTimeoutError(
"Batch containing %s record(s) expired due to timeout while"
" requesting metadata from brokers for %s", self.record_count,
self.topic_partition))
@@ -137,6 +140,7 @@ class RecordAccumulator(object):
'compression_type': None,
'linger_ms': 0,
'retry_backoff_ms': 100,
'message_version': 0,
}
def __init__(self, **configs):
@@ -155,7 +159,7 @@ class RecordAccumulator(object):
self.config['batch_size'])
self._incomplete = IncompleteRecordBatches()
def append(self, tp, key, value, max_time_to_block_ms):
def append(self, tp, timestamp_ms, key, value, max_time_to_block_ms):
"""Add a record to the accumulator, return the append result.
The append result will contain the future metadata, and flag for
@@ -164,6 +168,7 @@ class RecordAccumulator(object):
Arguments:
tp (TopicPartition): The topic/partition to which this record is
being sent
timestamp_ms (int): The timestamp of the record (epoch ms)
key (bytes): The key for the record
value (bytes): The value for the record
max_time_to_block_ms (int): The maximum time in milliseconds to
@@ -188,7 +193,7 @@ class RecordAccumulator(object):
dq = self._batches[tp]
if dq:
last = dq[-1]
future = last.try_append(key, value)
future = last.try_append(timestamp_ms, key, value)
if future is not None:
batch_is_full = len(dq) > 1 or last.records.is_full()
return future, batch_is_full, False
@@ -211,7 +216,7 @@ class RecordAccumulator(object):
if dq:
last = dq[-1]
future = last.try_append(key, value)
future = last.try_append(timestamp_ms, key, value)
if future is not None:
# Somebody else found us a batch, return the one we
# waited for! Hopefully this doesn't happen often...
@@ -220,9 +225,10 @@ class RecordAccumulator(object):
return future, batch_is_full, False
records = MessageSetBuffer(buf, self.config['batch_size'],
self.config['compression_type'])
batch = RecordBatch(tp, records)
future = batch.try_append(key, value)
self.config['compression_type'],
self.config['message_version'])
batch = RecordBatch(tp, records, self.config['message_version'])
future = batch.try_append(timestamp_ms, key, value)
if not future:
raise Exception()
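Note: a condensed sketch of the append flow above (hypothetical free function; the real method also handles locking, buffer allocation, and blocking on memory):

def append(batches, tp, timestamp_ms, key, value, new_batch_factory):
    # Try the newest batch for the partition first; if it has no room
    # (or none exists), create a fresh RecordBatch via the factory,
    # which wires in the configured message_version.
    dq = batches.setdefault(tp, [])
    if dq:
        future = dq[-1].try_append(timestamp_ms, key, value)
        if future is not None:
            return future, len(dq) > 1 or dq[-1].records.is_full(), False
    batch = new_batch_factory(tp)
    future = batch.try_append(timestamp_ms, key, value)
    dq.append(batch)
    return future, len(dq) > 1 or batch.records.is_full(), True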

kafka/producer/sender.py

@@ -163,7 +163,7 @@ class Sender(threading.Thread):
def _failed_produce(self, batches, node_id, error):
log.debug("Error sending produce request to node %d: %s", node_id, error) # trace
for batch in batches:
self._complete_batch(batch, error, -1)
self._complete_batch(batch, error, -1, None)
def _handle_produce_response(self, batches, response):
"""Handle a produce response."""
@@ -183,15 +183,16 @@ class Sender(threading.Thread):
else:
# this is the acks = 0 case, just complete all requests
for batch in batches:
self._complete_batch(batch, None, -1)
self._complete_batch(batch, None, -1, None)
def _complete_batch(self, batch, error, base_offset):
def _complete_batch(self, batch, error, base_offset, timestamp_ms=None):
"""Complete or retry the given batch of records.
Arguments:
batch (RecordBatch): The record batch
error (Exception): The error (or None if none)
base_offset (int): The base offset assigned to the records if successful
timestamp_ms (int, optional): The timestamp returned by the broker for this batch
"""
# Standardize no-error to None
if error is Errors.NoError:
@@ -210,7 +211,7 @@ class Sender(threading.Thread):
error = error(batch.topic_partition.topic)
# tell the user the result of their request
batch.done(base_offset, error)
batch.done(base_offset, timestamp_ms, error)
self._accumulator.deallocate(batch)
if getattr(error, 'invalid_metadata', False):
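Note: the response-parsing step that feeds timestamp_ms into _complete_batch is elided above; on the wire, v2 produce responses report -1 for the partition timestamp when the topic uses CreateTime. A hedged sketch of that normalization (an assumption, not shown in this diff):

def broker_timestamp(response_timestamp_ms):
    # Map the protocol's -1 ("no LogAppendTime") to None so the
    # CreateTime fallback in FutureRecordMetadata takes over.
    if response_timestamp_ms == -1:
        return None
    return response_timestamp_ms

assert broker_timestamp(-1) is None
assert broker_timestamp(1463900000123) == 1463900000123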

kafka/protocol/message.py

@@ -1,4 +1,5 @@
import io
import time
from ..codec import (has_gzip, has_snappy, has_lz4,
gzip_decode, snappy_decode, lz4_decode)
@@ -11,22 +12,39 @@ from ..util import crc32
class Message(Struct):
SCHEMA = Schema(
('crc', Int32),
('magic', Int8),
('attributes', Int8),
('key', Bytes),
('value', Bytes)
)
CODEC_MASK = 0x03
SCHEMAS = [
Schema(
('crc', Int32),
('magic', Int8),
('attributes', Int8),
('key', Bytes),
('value', Bytes)),
Schema(
('crc', Int32),
('magic', Int8),
('attributes', Int8),
('timestamp', Int64),
('key', Bytes),
('value', Bytes)),
]
SCHEMA = SCHEMAS[1]
CODEC_MASK = 0x07
CODEC_GZIP = 0x01
CODEC_SNAPPY = 0x02
CODEC_LZ4 = 0x03
HEADER_SIZE = 14 # crc(4), magic(1), attributes(1), key+value size(4*2)
TIMESTAMP_TYPE_MASK = 0x08
HEADER_SIZE = 22 # crc(4), magic(1), attributes(1), timestamp(8), key+value size(4*2)
def __init__(self, value, key=None, magic=0, attributes=0, crc=0):
def __init__(self, value, key=None, magic=0, attributes=0, crc=0,
timestamp=None):
assert value is None or isinstance(value, bytes), 'value must be bytes'
assert key is None or isinstance(key, bytes), 'key must be bytes'
assert magic > 0 or timestamp is None, 'timestamp not supported in v0'
# Default timestamp to now for v1 messages
if magic > 0 and timestamp is None:
timestamp = int(time.time() * 1000)
self.timestamp = timestamp
self.crc = crc
self.magic = magic
self.attributes = attributes
@@ -34,22 +52,48 @@ class Message(Struct):
self.value = value
self.encode = self._encode_self
@property
def timestamp_type(self):
"""0 for CreateTime; 1 for LogAppendTime; None if unsupported.
The value is determined by the broker; produced messages should always set this to 0.
Requires Kafka >= 0.10 / message version >= 1
"""
if self.magic == 0:
return None
return 1 if self.attributes & self.TIMESTAMP_TYPE_MASK else 0
def _encode_self(self, recalc_crc=True):
message = Message.SCHEMA.encode(
(self.crc, self.magic, self.attributes, self.key, self.value)
)
version = self.magic
if version == 1:
fields = (self.crc, self.magic, self.attributes, self.timestamp, self.key, self.value)
elif version == 0:
fields = (self.crc, self.magic, self.attributes, self.key, self.value)
else:
raise ValueError('Unrecognized message version: %s' % version)
message = Message.SCHEMAS[version].encode(fields)
if not recalc_crc:
return message
self.crc = crc32(message[4:])
return self.SCHEMA.fields[0].encode(self.crc) + message[4:]
crc_field = self.SCHEMAS[version].fields[0]
return crc_field.encode(self.crc) + message[4:]
@classmethod
def decode(cls, data):
if isinstance(data, bytes):
data = io.BytesIO(data)
fields = [field.decode(data) for field in cls.SCHEMA.fields]
return cls(fields[4], key=fields[3],
magic=fields[1], attributes=fields[2], crc=fields[0])
# Partial decode required to determine message version
base_fields = cls.SCHEMAS[0].fields[0:3]
crc, magic, attributes = [field.decode(data) for field in base_fields]
remaining = cls.SCHEMAS[magic].fields[3:]
fields = [field.decode(data) for field in remaining]
if magic == 1:
timestamp = fields[0]
else:
timestamp = None
return cls(fields[-1], key=fields[-2],
magic=magic, attributes=attributes, crc=crc,
timestamp=timestamp)
def validate_crc(self):
raw_msg = self._encode_self(recalc_crc=False)
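Note: a round-trip sketch for the new v1 codec (uses this repo's Message class; the literal timestamp is arbitrary):

import io
from kafka.protocol.message import Message

msg = Message(b'value', key=b'key', magic=1, timestamp=1463900000000)
encoded = msg.encode()
decoded = Message.decode(io.BytesIO(encoded))
assert decoded.timestamp == 1463900000000
assert decoded.timestamp_type == 0  # CreateTime; attributes bit 3 clear
assert decoded.validate_crc()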