Refactoring a bit, cleanup for 0.8

Marking some features as not compatible with 0.8 (they will be added in 0.8.1)
David Arthur
2013-04-01 14:56:59 -04:00
parent b6d98c07b4
commit 0678a452ca
8 changed files with 222 additions and 190 deletions

View File

@@ -2,27 +2,21 @@ import logging
 from kafka.client import KafkaClient, FetchRequest, ProduceRequest
 from kafka.consumer import SimpleConsumer
+from kafka.producer import SimpleProducer

-def produce_example(kafka):
-    message = kafka.create_message("testing")
-    request = ProduceRequest("my-topic", -1, [message])
-    kafka.send_message_set(request)
+def produce_example(client):
+    producer = SimpleProducer(client, "my-topic")
+    producer.send_message("test")

-def consume_example(kafka):
-    request = FetchRequest("my-topic", 0, 0, 1024)
-    (messages, nextRequest) = kafka.get_message_set(request)
-    for message in messages:
-        print("Got Message: %s" % (message,))
-    print(nextRequest)
-
-def produce_gz_example(kafka):
-    message = kafka.create_gzip_message("this message was gzipped", "along with this one")
-    request = ProduceRequest("my-topic", 0, [message])
-    kafka.send_message_set(request)
+def consume_example(client):
+    consumer = SimpleConsumer(client, "test-group", "my-topic")
+    for message in consumer:
+        print(message)

 def main():
     client = KafkaClient("localhost", 9092)
-    consumer = SimpleConsumer(client, "test-group", "my-topic")
+    produce_example(client)
+    consume_example(client)

 if __name__ == "__main__":
     logging.basicConfig(level=logging.DEBUG)
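
Pieced together, the example now drives only the high-level wrappers instead of hand-built Produce/Fetch requests. A minimal sketch of the rewritten example as one runnable script (the main() call and client.close() are assumed, since the hunk above is truncated before the end of the file):

    import logging

    from kafka.client import KafkaClient
    from kafka.consumer import SimpleConsumer
    from kafka.producer import SimpleProducer

    def produce_example(client):
        # SimpleProducer picks the partition; no ProduceRequest needed here
        producer = SimpleProducer(client, "my-topic")
        producer.send_message("test")

    def consume_example(client):
        # SimpleConsumer tracks offsets and yields messages directly
        consumer = SimpleConsumer(client, "test-group", "my-topic")
        for message in consumer:
            print(message)

    def main():
        client = KafkaClient("localhost", 9092)
        produce_example(client)
        consume_example(client)
        client.close()

    if __name__ == "__main__":
        logging.basicConfig(level=logging.DEBUG)
        main()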

View File

@@ -28,30 +28,35 @@ class KafkaClient(object):
         self.brokers = {}                          # broker_id -> BrokerMetadata
         self.topics_to_brokers = {}                # topic_id -> broker_id
         self.topic_partitions = defaultdict(list)  # topic_id -> [0, 1, 2, ...]
-        self.load_metadata_for_topics()
+        self._load_metadata_for_topics()

-    def close(self):
-        for conn in self.conns.values():
-            conn.close()
+    ##################
+    #   Private API  #
+    ##################

-    def get_conn_for_broker(self, broker):
+    def _get_conn_for_broker(self, broker):
         "Get or create a connection to a broker"
         if (broker.host, broker.port) not in self.conns:
             self.conns[(broker.host, broker.port)] = KafkaConnection(broker.host, broker.port, self.bufsize)
         return self.conns[(broker.host, broker.port)]

-    def next_id(self):
-        "Generate a new correlation id"
-        return KafkaClient.ID_GEN.next()
+    def _get_leader_for_partition(self, topic, partition):
+        key = TopicAndPartition(topic, partition)
+        if key not in self.topics_to_brokers:
+            self._load_metadata_for_topics(topic)
+        if key not in self.topics_to_brokers:
+            raise Exception("Partition does not exist: %s" % str(key))
+        return self.topics_to_brokers[key]

-    def load_metadata_for_topics(self, *topics):
+    def _load_metadata_for_topics(self, *topics):
         """
         Discover brokers and metadata for a set of topics. This method will
         recurse in the event of a retry.
         """
-        requestId = self.next_id()
+        requestId = self._next_id()
         request = KafkaProtocol.encode_metadata_request(KafkaClient.CLIENT_ID, requestId, topics)
-        response = self.try_send_request(requestId, request)
+        response = self._send_broker_unaware_request(requestId, request)
         if response is None:
             raise Exception("All servers failed to process request")
         (brokers, topics) = KafkaProtocol.decode_metadata_response(response)
@@ -64,18 +69,79 @@ class KafkaClient(object):
                 if meta.leader == -1:
                     log.info("Partition is unassigned, delay for 1s and retry")
                     time.sleep(1)
-                    self.load_metadata_for_topics(topic)
+                    self._load_metadata_for_topics(topic)
                 else:
                     self.topics_to_brokers[TopicAndPartition(topic, partition)] = brokers[meta.leader]
                     self.topic_partitions[topic].append(partition)

-    def get_leader_for_partition(self, topic, partition):
-        key = TopicAndPartition(topic, partition)
-        if key not in self.topics_to_brokers:
-            self.load_metadata_for_topics(topic)
-        if key not in self.topics_to_brokers:
-            raise Exception("Partition does not exist: %s" % str(key))
-        return self.topics_to_brokers[key]
+    def _next_id(self):
+        "Generate a new correlation id"
+        return KafkaClient.ID_GEN.next()
+
+    def _send_broker_unaware_request(self, requestId, request):
+        """
+        Attempt to send a broker-agnostic request to one of the available brokers.
+        Keep trying until you succeed.
+        """
+        for conn in self.conns.values():
+            try:
+                conn.send(requestId, request)
+                response = conn.recv(requestId)
+                return response
+            except Exception, e:
+                log.warning("Could not send request [%r] to server %s, trying next server: %s" % (request, conn, e))
+                continue
+        return None
+
+    def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
+        """
+        Group a list of request payloads by topic+partition and send them to the
+        leader broker for that partition using the supplied encode/decode functions
+
+        Params
+        ======
+        payloads: list of object-like entities with a topic and partition attribute
+        encode_fn: a method to encode the list of payloads to a request body, must accept
+            client_id, correlation_id, and payloads as keyword arguments
+        decode_fn: a method to decode a response body into response objects. The response
+            objects must be object-like and have topic and partition attributes
+
+        Return
+        ======
+        List of response objects in the same order as the supplied payloads
+        """
+        # Group the requests by topic+partition
+        original_keys = []
+        payloads_by_broker = defaultdict(list)
+        for payload in payloads:
+            payloads_by_broker[self._get_leader_for_partition(payload.topic, payload.partition)].append(payload)
+            original_keys.append((payload.topic, payload.partition))
+
+        # Accumulate the responses in a dictionary
+        acc = {}
+
+        # For each broker, send the list of request payloads
+        for broker, payloads in payloads_by_broker.items():
+            conn = self._get_conn_for_broker(broker)
+            requestId = self._next_id()
+            request = encoder_fn(client_id=KafkaClient.CLIENT_ID, correlation_id=requestId, payloads=payloads)
+
+            # Send the request, recv the response
+            conn.send(requestId, request)
+            response = conn.recv(requestId)
+            for response in decoder_fn(response):
+                acc[(response.topic, response.partition)] = response
+
+        # Order the accumulated responses by the original key order
+        return (acc[k] for k in original_keys)
+
+    #################
+    #   Public API  #
+    #################
+
+    def close(self):
+        for conn in self.conns.values():
+            conn.close()

     def send_produce_request(self, payloads=[], fail_on_error=True, callback=None):
         """
@@ -95,37 +161,21 @@ class KafkaClient(object):
         ======
         list of ProduceResponse or callback(ProduceResponse), in the order of input payloads
         """
-        # Group the produce requests by which broker they go to
-        original_keys = []
-        payloads_by_broker = defaultdict(list)
-        for payload in payloads:
-            payloads_by_broker[self.get_leader_for_partition(payload.topic, payload.partition)] += payloads
-            original_keys.append((payload.topic, payload.partition))
-
-        # Accumulate the responses in a dictionary
-        acc = {}
-
-        # For each broker, send the list of request payloads
-        for broker, payloads in payloads_by_broker.items():
-            conn = self.get_conn_for_broker(broker)
-            requestId = self.next_id()
-            request = KafkaProtocol.encode_produce_request(KafkaClient.CLIENT_ID, requestId, payloads)
-
-            # Send the request
-            conn.send(requestId, request)
-            response = conn.recv(requestId)
-            for produce_response in KafkaProtocol.decode_produce_response(response):
-                # Check for errors
-                if fail_on_error == True and produce_response.error != ErrorMapping.NO_ERROR:
-                    raise Exception("ProduceRequest for %s failed with errorcode=%d" %
-                                    (TopicAndPartition(produce_response.topic, produce_response.partition), produce_response.error))
-                # Run the callback
-                if callback is not None:
-                    acc[(produce_response.topic, produce_response.partition)] = callback(produce_response)
-                else:
-                    acc[(produce_response.topic, produce_response.partition)] = produce_response
-
-        # Order the accumulated responses by the original key order
-        return (acc[k] for k in original_keys)
+        resps = self._send_broker_aware_request(payloads,
+                                                KafkaProtocol.encode_produce_request,
+                                                KafkaProtocol.decode_produce_response)
+        out = []
+        for resp in resps:
+            # Check for errors
+            if fail_on_error == True and resp.error != ErrorMapping.NO_ERROR:
+                raise Exception("ProduceRequest for %s failed with errorcode=%d" %
+                                (TopicAndPartition(resp.topic, resp.partition), resp.error))
+            # Run the callback
+            if callback is not None:
+                out.append(callback(resp))
+            else:
+                out.append(resp)
+        return out

     def send_fetch_request(self, payloads=[], fail_on_error=True, callback=None):
         """
@@ -134,108 +184,63 @@ class KafkaClient(object):
         Payloads are grouped by topic and partition so they can be pipelined to the same
         brokers.
         """
-        # Group the produce requests by which broker they go to
-        original_keys = []
-        payloads_by_broker = defaultdict(list)
-        for payload in payloads:
-            payloads_by_broker[self.get_leader_for_partition(payload.topic, payload.partition)].append(payload)
-            original_keys.append((payload.topic, payload.partition))
-
-        # Accumulate the responses in a dictionary, keyed by topic+partition
-        acc = {}
-
-        # For each broker, send the list of request payloads
-        for broker, payloads in payloads_by_broker.items():
-            conn = self.get_conn_for_broker(broker)
-            requestId = self.next_id()
-            request = KafkaProtocol.encode_fetch_request(KafkaClient.CLIENT_ID, requestId, payloads)
-
-            # Send the request
-            conn.send(requestId, request)
-            response = conn.recv(requestId)
-            for fetch_response in KafkaProtocol.decode_fetch_response_iter(response):
-                # Check for errors
-                if fail_on_error == True and fetch_response.error != ErrorMapping.NO_ERROR:
-                    raise Exception("FetchRequest %s failed with errorcode=%d" %
-                                    (TopicAndPartition(fetch_response.topic, fetch_response.partition), fetch_response.error))
-                # Run the callback
-                if callback is not None:
-                    acc[(fetch_response.topic, fetch_response.partition)] = callback(fetch_response)
-                else:
-                    acc[(fetch_response.topic, fetch_response.partition)] = fetch_response
-
-        # Order the accumulated responses by the original key order
-        return (acc[k] for k in original_keys)
-
-    def try_send_request(self, requestId, request):
-        """
-        Attempt to send a broker-agnostic request to one of the available brokers.
-        Keep trying until you succeed.
-        """
-        for conn in self.conns.values():
-            try:
-                conn.send(requestId, request)
-                response = conn.recv(requestId)
-                return response
-            except Exception, e:
-                log.warning("Could not send request [%r] to server %s, trying next server: %s" % (request, conn, e))
-                continue
-        return None
+        resps = self._send_broker_aware_request(payloads,
+                                                KafkaProtocol.encode_fetch_request,
+                                                KafkaProtocol.decode_fetch_response)
+        out = []
+        for resp in resps:
+            # Check for errors
+            if fail_on_error == True and resp.error != ErrorMapping.NO_ERROR:
+                raise Exception("FetchRequest for %s failed with errorcode=%d" %
+                                (TopicAndPartition(resp.topic, resp.partition), resp.error))
+            # Run the callback
+            if callback is not None:
+                out.append(callback(resp))
+            else:
+                out.append(resp)
+        return out

     def send_offset_request(self, payloads=[], fail_on_error=True, callback=None):
-        requestId = self.next_id()
-        request = KafkaProtocol.encode_offset_request(KafkaClient.CLIENT_ID, requestId, payloads)
-        response = self.try_send_request(requestId, request)
-        if response is None:
-            if fail_on_error is True:
-                raise Exception("All servers failed to process request")
-            else:
-                return None
+        resps = self._send_broker_aware_request(payloads,
+                                                KafkaProtocol.encode_offset_request,
+                                                KafkaProtocol.decode_offset_response)
         out = []
-        for offset_response in KafkaProtocol.decode_offset_response(response):
-            if fail_on_error == True and offset_response.error != ErrorMapping.NO_ERROR:
-                raise Exception("OffsetRequest failed with errorcode=%s", offset_response.error)
+        for resp in resps:
+            if fail_on_error == True and resp.error != ErrorMapping.NO_ERROR:
+                raise Exception("OffsetRequest failed with errorcode=%s", resp.error)
             if callback is not None:
-                out.append(callback(offset_response))
+                out.append(callback(resp))
             else:
-                out.append(offset_response)
+                out.append(resp)
         return out

     def send_offset_commit_request(self, group, payloads=[], fail_on_error=True, callback=None):
-        requestId = self.next_id()
-        request = KafkaProtocol.encode_offset_commit_request(KafkaClient.CLIENT_ID, requestId, group, payloads)
-        response = self.try_send_request(requestId, request)
-        if response is None:
-            if fail_on_error is True:
-                raise Exception("All servers failed to process request")
-            else:
-                return None
+        raise NotImplementedError("Broker-managed offsets not supported in 0.8")
+        resps = self._send_broker_aware_request(payloads,
+                                                partial(KafkaProtocol.encode_offset_commit_request, group=group),
+                                                KafkaProtocol.decode_offset_commit_response)
         out = []
-        for offset_commit_response in KafkaProtocol.decode_offset_commit_response(response):
-            log.debug(offset_commit_response)
-            if fail_on_error == True and offset_commit_response.error != ErrorMapping.NO_ERROR:
-                raise Exception("OffsetCommitRequest failed with errorcode=%s", offset_commit_response.error)
+        for resp in resps:
+            if fail_on_error == True and resp.error != ErrorMapping.NO_ERROR:
+                raise Exception("OffsetCommitRequest failed with errorcode=%s", resp.error)
             if callback is not None:
-                out.append(callback(offset_commit_response))
+                out.append(callback(resp))
             else:
-                out.append(offset_commit_response)
+                out.append(resp)
         return out

     def send_offset_fetch_request(self, group, payloads=[], fail_on_error=True, callback=None):
-        requestId = self.next_id()
-        request = KafkaProtocol.encode_offset_fetch_request(KafkaClient.CLIENT_ID, requestId, group, payloads)
-        response = self.try_send_request(requestId, request)
-        if response is None:
-            if fail_on_error is True:
-                raise Exception("All servers failed to process request")
-            else:
-                return None
+        raise NotImplementedError("Broker-managed offsets not supported in 0.8")
+        resps = self._send_broker_aware_request(payloads,
+                                                partial(KafkaProtocol.encode_offset_commit_fetch, group=group),
+                                                KafkaProtocol.decode_offset_fetch_response)
         out = []
-        for offset_fetch_response in KafkaProtocol.decode_offset_fetch_response(response):
-            if fail_on_error == True and offset_fetch_response.error != ErrorMapping.NO_ERROR:
-                raise Exception("OffsetFetchRequest for topic=%s, partition=%d failed with errorcode=%s" % (
-                    offset_fetch_response.topic, offset_fetch_response.partition, offset_fetch_response.error))
+        for resp in resps:
+            if fail_on_error == True and resp.error != ErrorMapping.NO_ERROR:
+                raise Exception("OffsetCommitRequest failed with errorcode=%s", resp.error)
             if callback is not None:
-                out.append(callback(offset_fetch_response))
+                out.append(callback(resp))
             else:
-                out.append(offset_fetch_response)
+                out.append(resp)
         return out
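
All four send_* methods now funnel through _send_broker_aware_request, differing only in the encoder/decoder pair they pass (curried with functools.partial when a consumer group is involved). A short sketch of what that buys a caller, assuming a broker on localhost and that the create_message helper used by SimpleProducer lives in kafka.protocol; note the offset commit/fetch paths raise NotImplementedError until 0.8.1:

    from kafka.client import KafkaClient, ProduceRequest, FetchRequest
    from kafka.protocol import create_message

    client = KafkaClient("localhost", 9092)

    # Payloads may span topics and partitions; the client groups them by leader
    # broker and returns the responses in the same order as the input payloads.
    reqs = [
        ProduceRequest("my-topic", 0, [create_message("hello")]),
        ProduceRequest("my-topic", 1, [create_message("world")]),
    ]
    for resp in client.send_produce_request(reqs):
        assert resp.error == 0

    # Fetch up to 1024 bytes from partition 0, starting at offset 0
    (fetch_resp,) = client.send_fetch_request([FetchRequest("my-topic", 0, 0, 1024)])
    for message in fetch_resp.messages:
        print(message)

    client.close()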

View File

@@ -1,3 +1,4 @@
+from itertools import izip_longest, repeat
 import logging
 from threading import Lock
@@ -30,7 +31,7 @@ class SimpleConsumer(object):
         self.client = client
         self.topic = topic
         self.group = group
-        self.client.load_metadata_for_topics(topic)
+        self.client._load_metadata_for_topics(topic)
         self.offsets = {}

         # Set up the auto-commit timer
@@ -54,12 +55,16 @@ class SimpleConsumer(object):
                 raise Exception("OffsetFetchRequest for topic=%s, partition=%d failed with errorcode=%s" % (
                     resp.topic, resp.partition, resp.error))

+        # Uncomment for 0.8.1
+        #
+        #for partition in self.client.topic_partitions[topic]:
+        #    req = OffsetFetchRequest(topic, partition)
+        #    (offset,) = self.client.send_offset_fetch_request(group, [req],
+        #                  callback=get_or_init_offset_callback, fail_on_error=False)
+        #    self.offsets[partition] = offset
+
         for partition in self.client.topic_partitions[topic]:
-            req = OffsetFetchRequest(topic, partition)
-            (offset,) = self.client.send_offset_fetch_request(group, [req],
-                          callback=get_or_init_offset_callback, fail_on_error=False)
-            self.offsets[partition] = offset
-        print self.offsets
+            self.offsets[partition] = 0

     def seek(self, offset, whence):
         """
@@ -71,25 +76,30 @@ class SimpleConsumer(object):
         1 is relative to the current offset
         2 is relative to the latest known offset (tail)
         """
-        if whence == 1:
-            # relative to current position
+        if whence == 1:  # relative to current position
             for partition, _offset in self.offsets.items():
                 self.offset[partition] = _offset + offset
-        elif whence in (0, 2):
-            # relative to beginning or end
+        elif whence in (0, 2):  # relative to beginning or end
+            # divide the request offset by number of partitions, distribute the remained evenly
+            (delta, rem) = divmod(offset, len(self.offsets))
+            deltas = {}
+            for partition, r in izip_longest(self.offsets.keys(), repeat(1, rem), fillvalue=0):
+                deltas[partition] = delta + r
+
             reqs = []
-            for partition in offsets.keys():
+            for partition in self.offsets.keys():
                 if whence == 0:
                     reqs.append(OffsetRequest(self.topic, partition, -2, 1))
                 elif whence == 2:
                     reqs.append(OffsetRequest(self.topic, partition, -1, 1))
                 else:
                     pass
-            resps = self.client.send_offset_request([req])
+
+            resps = self.client.send_offset_request(reqs)
             for resp in resps:
-                self.offsets[resp.partition] = resp.offsets[0] + offset
+                self.offsets[resp.partition] = resp.offsets[0] + deltas[resp.partition]
         else:
-            raise
+            raise ValueError("Unexpected value for `whence`, %d" % whence)
     def commit(self, partitions=[]):
         """
@@ -98,6 +108,8 @@ class SimpleConsumer(object):
         partitions: list of partitions to commit, default is to commit all of them
         """
+        raise NotImplementedError("Broker-managed offsets not supported in 0.8")
+
         # short circuit if nothing happened
         if self.count_since_commit == 0:
             return
@@ -121,15 +133,31 @@ class SimpleConsumer(object):
         self.count_since_commit = 0

     def __iter__(self):
+        """
+        Create an iterate per partition. Iterate through them calling next() until they are
+        all exhausted.
+        """
         iters = {}
         for partition, offset in self.offsets.items():
             iters[partition] = self.__iter_partition__(partition, offset)

+        if len(iters) == 0:
+            return
+
         while True:
-            for it in iters.values():
-                yield it.next()
+            if len(iters) == 0:
+                break
+
+            for partition, it in iters.items():
+                try:
+                    yield it.next()
+                except StopIteration:
+                    log.debug("Done iterating over partition %s" % partition)
+                    del iters[partition]
+                    continue  # skip auto-commit since we didn't yield anything
+
+                # auto commit logic
                 self.count_since_commit += 1
-                # deal with auto commits
                 if self.auto_commit is True:
                     if self.auto_commit_every_n is not None and self.count_since_commit > self.auto_commit_every_n:
                         if self.commit_timer is not None:
@@ -140,19 +168,22 @@ class SimpleConsumer(object):
                         self.commit()

     def __iter_partition__(self, partition, offset):
+        """
+        Iterate over the messages in a partition. Create a FetchRequest to get back
+        a batch of messages, yield them one at a time. After a batch is exhausted,
+        start a new batch unless we've reached the end of ths partition.
+        """
         while True:
-            req = FetchRequest(self.topic, partition, offset, 1024)
+            req = FetchRequest(self.topic, partition, offset, 1024)  # TODO configure fetch size
             (resp,) = self.client.send_fetch_request([req])

             assert resp.topic == self.topic
             assert resp.partition == partition

             next_offset = None
             for message in resp.messages:
                 next_offset = message.offset
-                print partition, message, message.offset
                 yield message
                 # update the internal state _after_ we yield the message
                 self.offsets[partition] = message.offset
-            print partition, next_offset
             if next_offset is None:
                 break
             else:
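
The interesting part of the new seek() is how a relative offset is spread across partitions: divmod() yields a common per-partition delta plus a remainder, and izip_longest()/repeat() hand the remainder out one unit at a time. A standalone sketch of just that arithmetic (three hypothetical partitions, not the consumer class itself):

    from itertools import izip_longest, repeat

    offsets = {0: 0, 1: 0, 2: 0}   # per-partition offsets, as kept in self.offsets
    offset = -10                   # e.g. consumer.seek(-10, 2): ten messages back from the tail

    (delta, rem) = divmod(offset, len(offsets))   # divmod(-10, 3) == (-4, 2)
    deltas = {}
    for partition, r in izip_longest(offsets.keys(), repeat(1, rem), fillvalue=0):
        deltas[partition] = delta + r

    # e.g. deltas == {0: -3, 1: -3, 2: -4} (which partitions absorb the remainder
    # depends on dict ordering); the per-partition deltas always sum back to the
    # requested offset, which is why the seek(-10, 2)/seek(-13, 2) assertions in
    # the test changes below come out to 10 and 13 messages.
    assert sum(deltas.values()) == offset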

View File

@@ -13,10 +13,11 @@ class SimpleProducer(object):
     def __init__(self, client, topic):
         self.client = client
         self.topic = topic
-        self.client.load_metadata_for_topics(topic)
+        self.client._load_metadata_for_topics(topic)
         self.next_partition = cycle(self.client.topic_partitions[topic])

     def send_message(self, msg):
         req = ProduceRequest(self.topic, self.next_partition.next(),
                              messages=[create_message(msg)])
-        resp = self.client.send_produce_request([req]).next()
+        resp = self.client.send_produce_request([req])[0]
+        assert resp.error == 0
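
SimpleProducer now defers partition discovery to the client metadata and round-robins sends with itertools.cycle. A short usage sketch, assuming a broker on localhost and an existing "my-topic":

    from kafka.client import KafkaClient
    from kafka.producer import SimpleProducer

    client = KafkaClient("localhost", 9092)
    producer = SimpleProducer(client, "my-topic")

    # Successive sends rotate through the topic's partitions via cycle()
    for i in range(4):
        producer.send_message("message %d" % i)

    client.close()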

View File

@@ -217,7 +217,7 @@ class KafkaProtocol(object):
         return struct.pack('>i%ds' % len(message), len(message), message)

     @classmethod
-    def decode_fetch_response_iter(cls, data):
+    def decode_fetch_response(cls, data):
         """
         Decode bytes to a FetchResponse

View File

@@ -8,6 +8,8 @@ from .client import KafkaClient, FetchRequest, ProduceRequest

 log = logging.getLogger("kafka")

+raise NotImplementedError("Still need to refactor this class")
+
 class KafkaConsumerProcess(Process):
     def __init__(self, client, topic, partition, out_queue, barrier, consumer_fetch_size=1024, consumer_sleep=200):
         self.client = copy(client)

View File

@@ -243,7 +243,7 @@ class TestKafkaClient(unittest.TestCase):
     def test_consume_none(self):
         fetch = FetchRequest("test_consume_none", 0, 0, 1024)

-        fetch_resp = self.client.send_fetch_request([fetch]).next()
+        fetch_resp = self.client.send_fetch_request([fetch])[0]
         self.assertEquals(fetch_resp.error, 0)
         self.assertEquals(fetch_resp.topic, "test_consume_none")
         self.assertEquals(fetch_resp.partition, 0)
@@ -263,7 +263,7 @@ class TestKafkaClient(unittest.TestCase):
         fetch = FetchRequest("test_produce_consume", 0, 0, 1024)

-        fetch_resp = self.client.send_fetch_request([fetch]).next()
+        fetch_resp = self.client.send_fetch_request([fetch])[0]
         self.assertEquals(fetch_resp.error, 0)

         messages = list(fetch_resp.messages)
@@ -343,6 +343,7 @@ class TestKafkaClient(unittest.TestCase):
     # Offset Tests #
     ####################

+    @unittest.skip("No supported until 0.8.1")
     def test_commit_fetch_offsets(self):
         req = OffsetCommitRequest("test_commit_fetch_offsets", 0, 42, "metadata")
         (resp,) = self.client.send_offset_commit_request("group", [req])
@@ -428,23 +429,21 @@ class TestConsumer(unittest.TestCase):
         self.assertEquals(len(all_messages), 200)
         self.assertEquals(len(all_messages), len(set(all_messages)))  # make sure there are no dupes

-        # Produce more messages
-        produce3 = ProduceRequest("test_consumer", 1, messages=[
-            create_message("Test message 3 %d" % i) for i in range(10)
-        ])
-
-        for resp in self.client.send_produce_request([produce3]):
-            self.assertEquals(resp.error, 0)
-            self.assertEquals(resp.offset, 100)
-
-        # Start a new consumer, make sure we only get the newly produced messages
-        consumer = SimpleConsumer(self.client, "group1", "test_consumer")
+        consumer.seek(-10, 2)

         all_messages = []
         for message in consumer:
             all_messages.append(message)

         self.assertEquals(len(all_messages), 10)

+        consumer.seek(-13, 2)
+        all_messages = []
+        for message in consumer:
+            all_messages.append(message)
+
+        self.assertEquals(len(all_messages), 13)
+
 if __name__ == "__main__":
     logging.basicConfig(level=logging.DEBUG)
     unittest.main()