Add send_list_offset_request for searching offset by timestamp (#1001)
This commit is contained in:
@@ -686,6 +686,16 @@ class SimpleClient(object):
|
||||
return [resp if not callback else callback(resp) for resp in resps
|
||||
if not fail_on_error or not self._raise_on_response_error(resp)]
|
||||
|
||||
def send_list_offset_request(self, payloads=[], fail_on_error=True,
|
||||
callback=None):
|
||||
resps = self._send_broker_aware_request(
|
||||
payloads,
|
||||
KafkaProtocol.encode_list_offset_request,
|
||||
KafkaProtocol.decode_list_offset_response)
|
||||
|
||||
return [resp if not callback else callback(resp) for resp in resps
|
||||
if not fail_on_error or not self._raise_on_response_error(resp)]
|
||||
|
||||
def send_offset_commit_request(self, group, payloads=[],
|
||||
fail_on_error=True, callback=None):
|
||||
encoder = functools.partial(KafkaProtocol.encode_offset_commit_request,
|
||||
|
||||
@@ -248,6 +248,35 @@ class KafkaProtocol(object):
|
||||
for partition, error, offsets in partitions
|
||||
]
|
||||
|
||||
@classmethod
|
||||
def encode_list_offset_request(cls, payloads=()):
|
||||
return kafka.protocol.offset.OffsetRequest[1](
|
||||
replica_id=-1,
|
||||
topics=[(
|
||||
topic,
|
||||
[(
|
||||
partition,
|
||||
payload.time)
|
||||
for partition, payload in six.iteritems(topic_payloads)])
|
||||
for topic, topic_payloads in six.iteritems(group_by_topic_and_partition(payloads))])
|
||||
|
||||
@classmethod
|
||||
def decode_list_offset_response(cls, response):
|
||||
"""
|
||||
Decode OffsetResponse_v2 into ListOffsetResponsePayloads
|
||||
|
||||
Arguments:
|
||||
response: OffsetResponse_v2
|
||||
|
||||
Returns: list of ListOffsetResponsePayloads
|
||||
"""
|
||||
return [
|
||||
kafka.structs.ListOffsetResponsePayload(topic, partition, error, timestamp, offset)
|
||||
for topic, partitions in response.topics
|
||||
for partition, error, timestamp, offset in partitions
|
||||
]
|
||||
|
||||
|
||||
@classmethod
|
||||
def encode_metadata_request(cls, topics=(), payloads=None):
|
||||
"""
|
||||
|
||||
@@ -37,9 +37,15 @@ FetchResponsePayload = namedtuple("FetchResponsePayload",
|
||||
OffsetRequestPayload = namedtuple("OffsetRequestPayload",
|
||||
["topic", "partition", "time", "max_offsets"])
|
||||
|
||||
ListOffsetRequestPayload = namedtuple("ListOffsetRequestPayload",
|
||||
["topic", "partition", "time"])
|
||||
|
||||
OffsetResponsePayload = namedtuple("OffsetResponsePayload",
|
||||
["topic", "partition", "error", "offsets"])
|
||||
|
||||
ListOffsetResponsePayload = namedtuple("ListOffsetResponsePayload",
|
||||
["topic", "partition", "error", "timestamp", "offset"])
|
||||
|
||||
# https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI
|
||||
OffsetCommitRequestPayload = namedtuple("OffsetCommitRequestPayload",
|
||||
["topic", "partition", "offset", "metadata"])
|
||||
|
||||
Reference in New Issue
Block a user