Merge pull request #314 from dpkp/keyed_producer_failover

Handle keyed producer failover
Dana Powers committed 2015-02-19 23:25:19 -08:00
6 changed files with 231 additions and 145 deletions

View File

@@ -12,14 +12,13 @@ class Partitioner(object):
         """
         self.partitions = partitions
 
-    def partition(self, key, partitions):
+    def partition(self, key, partitions=None):
         """
         Takes a string key and num_partitions as argument and returns
         a partition to be used for the message
 
         Arguments:
-            partitions: The list of partitions is passed in every call. This
-                        may look like an overhead, but it will be useful
-                        (in future) when we handle cases like rebalancing
+            key: the key to use for partitioning
+            partitions: (optional) a list of partitions.
         """
         raise NotImplementedError('partition function has to be implemented')

View File

@@ -5,7 +5,9 @@ class HashedPartitioner(Partitioner):
     Implements a partitioner which selects the target partition based on
     the hash of the key
     """
-    def partition(self, key, partitions):
+    def partition(self, key, partitions=None):
+        if not partitions:
+            partitions = self.partitions
         size = len(partitions)
         idx = hash(key) % size

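The practical effect of the new `partitions=None` default: a partitioner seeded with the topic's partition ids at construction can now be called with just a key, while an explicit partition list remains a supported override. A minimal sketch of the new call pattern — the classes are inlined here so the snippet runs standalone; the real ones ship with kafka-python:

class Partitioner(object):
    # Inlined stand-in for the library's base class
    def __init__(self, partitions):
        self.partitions = partitions

class HashedPartitioner(Partitioner):
    def partition(self, key, partitions=None):
        # New behavior: fall back to the list cached at construction
        if not partitions:
            partitions = self.partitions
        return partitions[hash(key) % len(partitions)]

p = HashedPartitioner([0, 1, 2, 3])
print(p.partition('user-42'))           # key-only call uses self.partitions
print(p.partition('user-42', [0, 1]))   # explicit list still honored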
View File

@@ -15,9 +15,9 @@ class RoundRobinPartitioner(Partitioner):
         self.partitions = partitions
         self.iterpart = cycle(partitions)
 
-    def partition(self, key, partitions):
+    def partition(self, key, partitions=None):
         # Refresh the partition list if necessary
-        if self.partitions != partitions:
+        if partitions and self.partitions != partitions:
             self._set_partitions(partitions)
         return next(self.iterpart)

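The guard matters for round-robin because `_set_partitions` rebuilds the cycle: a key-only call now simply advances the existing cycle, and only an explicit, different partition list triggers a refresh. A runnable sketch with the class inlined (stand-in for the library's RoundRobinPartitioner):

from itertools import cycle

class RoundRobinPartitioner(object):
    def __init__(self, partitions):
        self._set_partitions(partitions)

    def _set_partitions(self, partitions):
        self.partitions = partitions
        self.iterpart = cycle(partitions)

    def partition(self, key, partitions=None):
        # Only an explicit, changed partition list resets the cycle now
        if partitions and self.partitions != partitions:
            self._set_partitions(partitions)
        return next(self.iterpart)

rr = RoundRobinPartitioner([0, 1])
assert rr.partition('k1') == 0   # cycle advances: 0
assert rr.partition('k2') == 1   # then 1
assert rr.partition('k3') == 0   # key-only calls never reset the cycle
assert rr.partition('k4', [0, 1, 2]) == 0  # changed list -> cycle rebuilt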
View File

@@ -54,7 +54,7 @@ class KeyedProducer(Producer):
             self.partitioners[topic] = self.partitioner_class(self.client.get_partition_ids_for_topic(topic))
 
         partitioner = self.partitioners[topic]
-        return partitioner.partition(key, self.client.get_partition_ids_for_topic(topic))
+        return partitioner.partition(key)
 
     def send_messages(self,topic,key,*msg):
         partition = self._next_partition(topic, key)

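This is the core of the failover fix: `_next_partition` previously re-fetched partition ids from the client on every send, so a metadata refresh mid-failover could reset a round-robin partitioner's cycle or momentarily disagree with the cached partitioner. The producer now builds one partitioner per topic and calls it with the key alone. A hedged usage sketch (broker address and topic are illustrative, not from the PR):

from kafka import KafkaClient
from kafka.producer import KeyedProducer

client = KafkaClient('localhost:9092')   # address is illustrative
producer = KeyedProducer(client)

# The first send for a topic constructs and caches its partitioner from
# the partition ids at that moment; subsequent sends call
# partitioner.partition(key) without re-reading metadata, so broker
# failover can no longer churn the key-to-partition mapping.
producer.send_messages(b'my-topic', b'user-1', b'payload')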
View File

@@ -7,6 +7,7 @@ from . import unittest
 from kafka import KafkaClient, SimpleConsumer
 from kafka.common import TopicAndPartition, FailedPayloadsError, ConnectionError
 from kafka.producer.base import Producer
+from kafka.producer import KeyedProducer
 
 from test.fixtures import ZookeeperFixture, KafkaFixture
 from test.testutil import (
@@ -17,8 +18,7 @@ from test.testutil import (
 class TestFailover(KafkaIntegrationTestCase):
     create_client = False
 
-    @classmethod
-    def setUpClass(cls):  # noqa
+    def setUp(self):
         if not os.environ.get('KAFKA_VERSION'):
             return
@@ -27,33 +27,41 @@ class TestFailover(KafkaIntegrationTestCase):
         partitions = 2
 
         # mini zookeeper, 2 kafka brokers
-        cls.zk = ZookeeperFixture.instance()
-        kk_args = [cls.zk.host, cls.zk.port, zk_chroot, replicas, partitions]
-        cls.brokers = [KafkaFixture.instance(i, *kk_args) for i in range(replicas)]
+        self.zk = ZookeeperFixture.instance()
+        kk_args = [self.zk.host, self.zk.port, zk_chroot, replicas, partitions]
+        self.brokers = [KafkaFixture.instance(i, *kk_args) for i in range(replicas)]
 
-        hosts = ['%s:%d' % (b.host, b.port) for b in cls.brokers]
-        cls.client = KafkaClient(hosts)
+        hosts = ['%s:%d' % (b.host, b.port) for b in self.brokers]
+        self.client = KafkaClient(hosts)
+
+        super(TestFailover, self).setUp()
 
-    @classmethod
-    def tearDownClass(cls):
+    def tearDown(self):
+        super(TestFailover, self).tearDown()
+
         if not os.environ.get('KAFKA_VERSION'):
             return
 
-        cls.client.close()
-        for broker in cls.brokers:
+        self.client.close()
+        for broker in self.brokers:
             broker.close()
-        cls.zk.close()
+        self.zk.close()
 
     @kafka_versions("all")
     def test_switch_leader(self):
         topic = self.topic
         partition = 0
 
-        # Test the base class Producer -- send_messages to a specific partition
+        # Testing the base Producer class here so that we can easily send
+        # messages to a specific partition, kill the leader for that partition
+        # and check that after another broker takes leadership the producer
+        # is able to resume sending messages
+
+        # require that the server commit messages to all in-sync replicas
+        # so that failover doesn't lose any messages on server-side
+        # and we can assert that server-side message count equals client-side
         producer = Producer(self.client, async=False,
                             req_acks=Producer.ACK_AFTER_CLUSTER_COMMIT)
 
-        # Send 10 random messages
+        # Send 100 random messages to a specific partition
        self._send_random_messages(producer, topic, partition, 100)
 
         # kill leader for partition
@@ -80,7 +88,7 @@ class TestFailover(KafkaIntegrationTestCase):
         self._send_random_messages(producer, topic, partition, 100)
 
         # count number of messages
-        # Should be equal to 10 before + 1 recovery + 10 after
+        # Should be equal to 100 before + 1 recovery + 100 after
         self.assert_message_count(topic, 201, partitions=(partition,))
@@ -116,6 +124,45 @@ class TestFailover(KafkaIntegrationTestCase):
         # Should be equal to 10 before + 1 recovery + 10 after
         self.assert_message_count(topic, 21, partitions=(partition,))
 
+    @kafka_versions("all")
+    def test_switch_leader_keyed_producer(self):
+        topic = self.topic
+
+        producer = KeyedProducer(self.client, async=False)
+
+        # Send 10 random messages
+        for _ in range(10):
+            key = random_string(3)
+            msg = random_string(10)
+            producer.send_messages(topic, key, msg)
+
+        # kill leader for partition 0
+        self._kill_leader(topic, 0)
+
+        recovered = False
+        started = time.time()
+        timeout = 60
+        while not recovered and (time.time() - started) < timeout:
+            try:
+                key = random_string(3)
+                msg = random_string(10)
+                producer.send_messages(topic, key, msg)
+                if producer.partitioners[topic].partition(key) == 0:
+                    recovered = True
+            except (FailedPayloadsError, ConnectionError):
+                logging.debug("caught exception sending message -- will retry")
+                continue
+
+        # Verify we successfully sent the message
+        self.assertTrue(recovered)
+
+        # send some more messages just to make sure no more exceptions
+        for _ in range(10):
+            key = random_string(3)
+            msg = random_string(10)
+            producer.send_messages(topic, key, msg)
+
     def _send_random_messages(self, producer, topic, partition, n):
         for j in range(n):
             logging.debug('_send_random_message to %s:%d -- try %d', topic, partition, j)

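Together with `ACK_AFTER_CLUSTER_COMMIT` on the producer side, the client-side recovery pattern the new test exercises is simply: catch the transient errors and retry until the cluster elects a new leader. Distilled into a helper — the function name and retry cadence are ours, not part of the PR:

import logging
import time

from kafka.common import FailedPayloadsError, ConnectionError

def send_with_failover_retry(producer, topic, key, msg, timeout=60):
    """Retry a keyed send until a new leader accepts it (hypothetical helper)."""
    started = time.time()
    while time.time() - started < timeout:
        try:
            return producer.send_messages(topic, key, msg)
        except (FailedPayloadsError, ConnectionError):
            # Transient during leader election -- retry after a short pause
            logging.debug('send failed during failover -- retrying')
            time.sleep(0.5)
    raise RuntimeError('producer did not recover within %ds' % timeout)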
View File

@@ -14,12 +14,12 @@ from kafka.common import (
     FetchRequest, ProduceRequest,
     UnknownTopicOrPartitionError, LeaderNotAvailableError
 )
+from kafka.producer.base import Producer
 
 from test.fixtures import ZookeeperFixture, KafkaFixture
 from test.testutil import KafkaIntegrationTestCase, kafka_versions
 
 
 class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
-    topic = b'produce_topic'
 
     @classmethod
     def setUpClass(cls):  # noqa
@@ -140,25 +140,26 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
     @kafka_versions("all")
     def test_simple_producer(self):
-        start_offset0 = self.current_offset(self.topic, 0)
-        start_offset1 = self.current_offset(self.topic, 1)
+        partitions = self.client.get_partition_ids_for_topic(self.topic)
+        start_offsets = [self.current_offset(self.topic, p) for p in partitions]
+
         producer = SimpleProducer(self.client, random_start=False)
 
         # Goes to first partition, randomly.
         resp = producer.send_messages(self.topic, self.msg("one"), self.msg("two"))
-        self.assert_produce_response(resp, start_offset0)
+        self.assert_produce_response(resp, start_offsets[0])
 
         # Goes to the next partition, randomly.
         resp = producer.send_messages(self.topic, self.msg("three"))
-        self.assert_produce_response(resp, start_offset1)
+        self.assert_produce_response(resp, start_offsets[1])
 
-        self.assert_fetch_offset(0, start_offset0, [ self.msg("one"), self.msg("two") ])
-        self.assert_fetch_offset(1, start_offset1, [ self.msg("three") ])
+        self.assert_fetch_offset(partitions[0], start_offsets[0], [ self.msg("one"), self.msg("two") ])
+        self.assert_fetch_offset(partitions[1], start_offsets[1], [ self.msg("three") ])
 
         # Goes back to the first partition because there's only two partitions
         resp = producer.send_messages(self.topic, self.msg("four"), self.msg("five"))
-        self.assert_produce_response(resp, start_offset0+2)
-        self.assert_fetch_offset(0, start_offset0, [ self.msg("one"), self.msg("two"), self.msg("four"), self.msg("five") ])
+        self.assert_produce_response(resp, start_offsets[0]+2)
+        self.assert_fetch_offset(partitions[0], start_offsets[0], [ self.msg("one"), self.msg("two"), self.msg("four"), self.msg("five") ])
 
         producer.stop()
@@ -194,110 +195,38 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
         self.assertEqual(resp3[0].partition, 0)
 
     @kafka_versions("all")
-    def test_round_robin_partitioner(self):
-        start_offset0 = self.current_offset(self.topic, 0)
-        start_offset1 = self.current_offset(self.topic, 1)
-
-        producer = KeyedProducer(self.client, partitioner=RoundRobinPartitioner)
-        resp1 = producer.send(self.topic, self.key("key1"), self.msg("one"))
-        resp2 = producer.send(self.topic, self.key("key2"), self.msg("two"))
-        resp3 = producer.send(self.topic, self.key("key3"), self.msg("three"))
-        resp4 = producer.send(self.topic, self.key("key4"), self.msg("four"))
-
-        self.assert_produce_response(resp1, start_offset0+0)
-        self.assert_produce_response(resp2, start_offset1+0)
-        self.assert_produce_response(resp3, start_offset0+1)
-        self.assert_produce_response(resp4, start_offset1+1)
-
-        self.assert_fetch_offset(0, start_offset0, [ self.msg("one"), self.msg("three") ])
-        self.assert_fetch_offset(1, start_offset1, [ self.msg("two"), self.msg("four") ])
-
-        producer.stop()
-
-    @kafka_versions("all")
-    def test_hashed_partitioner(self):
-        start_offset0 = self.current_offset(self.topic, 0)
-        start_offset1 = self.current_offset(self.topic, 1)
-
-        producer = KeyedProducer(self.client, partitioner=HashedPartitioner)
-        resp1 = producer.send(self.topic, self.key("1"), self.msg("one"))
-        resp2 = producer.send(self.topic, self.key("2"), self.msg("two"))
-        resp3 = producer.send(self.topic, self.key("3"), self.msg("three"))
-        resp4 = producer.send(self.topic, self.key("3"), self.msg("four"))
-        resp5 = producer.send(self.topic, self.key("4"), self.msg("five"))
-
-        offsets = {0: start_offset0, 1: start_offset1}
-        messages = {0: [], 1: []}
-
-        keys = [self.key(k) for k in ["1", "2", "3", "3", "4"]]
-        resps = [resp1, resp2, resp3, resp4, resp5]
-        msgs = [self.msg(m) for m in ["one", "two", "three", "four", "five"]]
-
-        for key, resp, msg in zip(keys, resps, msgs):
-            k = hash(key) % 2
-            offset = offsets[k]
-            self.assert_produce_response(resp, offset)
-            offsets[k] += 1
-            messages[k].append(msg)
-
-        self.assert_fetch_offset(0, start_offset0, messages[0])
-        self.assert_fetch_offset(1, start_offset1, messages[1])
-
-        producer.stop()
-
-    @kafka_versions("all")
-    def test_acks_none(self):
-        start_offset0 = self.current_offset(self.topic, 0)
-
-        producer = SimpleProducer(self.client, req_acks=SimpleProducer.ACK_NOT_REQUIRED,
-                                  random_start=False)
+    def test_async_simple_producer(self):
+        partition = self.client.get_partition_ids_for_topic(self.topic)[0]
+        start_offset = self.current_offset(self.topic, partition)
+
+        producer = SimpleProducer(self.client, async=True, random_start=False)
         resp = producer.send_messages(self.topic, self.msg("one"))
         self.assertEqual(len(resp), 0)
 
-        self.assert_fetch_offset(0, start_offset0, [ self.msg("one") ])
-        producer.stop()
-
-    @kafka_versions("all")
-    def test_acks_local_write(self):
-        start_offset0 = self.current_offset(self.topic, 0)
-
-        producer = SimpleProducer(self.client, req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE,
-                                  random_start=False)
-        resp = producer.send_messages(self.topic, self.msg("one"))
-
-        self.assert_produce_response(resp, start_offset0)
-        self.assert_fetch_offset(0, start_offset0, [ self.msg("one") ])
-
-        producer.stop()
-
-    @kafka_versions("all")
-    def test_acks_cluster_commit(self):
-        start_offset0 = self.current_offset(self.topic, 0)
-
-        producer = SimpleProducer(
-            self.client,
-            req_acks=SimpleProducer.ACK_AFTER_CLUSTER_COMMIT,
-            random_start=False)
-        resp = producer.send_messages(self.topic, self.msg("one"))
-        self.assert_produce_response(resp, start_offset0)
-        self.assert_fetch_offset(0, start_offset0, [ self.msg("one") ])
+        # wait for the server to report a new highwatermark
+        while self.current_offset(self.topic, partition) == start_offset:
+            time.sleep(0.1)
+
+        self.assert_fetch_offset(partition, start_offset, [ self.msg("one") ])
 
         producer.stop()
 
     @kafka_versions("all")
     def test_batched_simple_producer__triggers_by_message(self):
-        start_offset0 = self.current_offset(self.topic, 0)
-        start_offset1 = self.current_offset(self.topic, 1)
+        partitions = self.client.get_partition_ids_for_topic(self.topic)
+        start_offsets = [self.current_offset(self.topic, p) for p in partitions]
 
+        # Configure batch producer
+        batch_messages = 5
+        batch_interval = 5
         producer = SimpleProducer(
             self.client,
             batch_send=True,
-            batch_send_every_n=5,
-            batch_send_every_t=20,
+            batch_send_every_n=batch_messages,
+            batch_send_every_t=batch_interval,
             random_start=False)
 
-        # Send 5 messages and do a fetch
+        # Send 4 messages -- should not trigger a batch
         resp = producer.send_messages(self.topic,
             self.msg("one"),
             self.msg("two"),
@@ -309,9 +238,10 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
         self.assertEqual(len(resp), 0)
 
         # It hasn't sent yet
-        self.assert_fetch_offset(0, start_offset0, [])
-        self.assert_fetch_offset(1, start_offset1, [])
+        self.assert_fetch_offset(partitions[0], start_offsets[0], [])
+        self.assert_fetch_offset(partitions[1], start_offsets[1], [])
 
+        # send 3 more messages -- should trigger batch on first 5
         resp = producer.send_messages(self.topic,
             self.msg("five"),
             self.msg("six"),
@@ -321,30 +251,32 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
         # Batch mode is async. No ack
         self.assertEqual(len(resp), 0)
 
-        self.assert_fetch_offset(0, start_offset0, [
+        # send messages groups all *msgs in a single call to the same partition
+        # so we should see all messages from the first call in one partition
+        self.assert_fetch_offset(partitions[0], start_offsets[0], [
             self.msg("one"),
             self.msg("two"),
             self.msg("three"),
             self.msg("four"),
         ])
 
-        self.assert_fetch_offset(1, start_offset1, [
+        # Because we are batching every 5 messages, we should only see one
+        self.assert_fetch_offset(partitions[1], start_offsets[1], [
             self.msg("five"),
+            # self.msg("six"),
+            # self.msg("seven"),
         ])
 
         producer.stop()
 
     @kafka_versions("all")
     def test_batched_simple_producer__triggers_by_time(self):
-        start_offset0 = self.current_offset(self.topic, 0)
-        start_offset1 = self.current_offset(self.topic, 1)
+        partitions = self.client.get_partition_ids_for_topic(self.topic)
+        start_offsets = [self.current_offset(self.topic, p) for p in partitions]
 
+        batch_interval = 5
         producer = SimpleProducer(self.client,
             batch_send=True,
             batch_send_every_n=100,
-            batch_send_every_t=5,
+            batch_send_every_t=batch_interval,
             random_start=False)
 
         # Send 5 messages and do a fetch
@@ -359,8 +291,8 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
         self.assertEqual(len(resp), 0)
 
         # It hasn't sent yet
-        self.assert_fetch_offset(0, start_offset0, [])
-        self.assert_fetch_offset(1, start_offset1, [])
+        self.assert_fetch_offset(partitions[0], start_offsets[0], [])
+        self.assert_fetch_offset(partitions[1], start_offsets[1], [])
 
         resp = producer.send_messages(self.topic,
             self.msg("five"),
@@ -372,16 +304,16 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
         self.assertEqual(len(resp), 0)
 
         # Wait the timeout out
-        time.sleep(5)
+        time.sleep(batch_interval)
 
-        self.assert_fetch_offset(0, start_offset0, [
+        self.assert_fetch_offset(partitions[0], start_offsets[0], [
             self.msg("one"),
             self.msg("two"),
             self.msg("three"),
             self.msg("four"),
         ])
 
-        self.assert_fetch_offset(1, start_offset1, [
+        self.assert_fetch_offset(partitions[1], start_offsets[1], [
             self.msg("five"),
             self.msg("six"),
             self.msg("seven"),
@@ -389,40 +321,146 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
         producer.stop()
 
+    ############################
+    #   KeyedProducer Tests    #
+    ############################
+
     @kafka_versions("all")
-    def test_async_simple_producer(self):
-        start_offset0 = self.current_offset(self.topic, 0)
-
-        producer = SimpleProducer(self.client, async=True, random_start=False)
-        resp = producer.send_messages(self.topic, self.msg("one"))
-        self.assertEqual(len(resp), 0)
-
-        self.assert_fetch_offset(0, start_offset0, [ self.msg("one") ])
+    def test_round_robin_partitioner(self):
+        partitions = self.client.get_partition_ids_for_topic(self.topic)
+        start_offsets = [self.current_offset(self.topic, p) for p in partitions]
+
+        producer = KeyedProducer(self.client, partitioner=RoundRobinPartitioner)
+        resp1 = producer.send(self.topic, self.key("key1"), self.msg("one"))
+        resp2 = producer.send(self.topic, self.key("key2"), self.msg("two"))
+        resp3 = producer.send(self.topic, self.key("key3"), self.msg("three"))
+        resp4 = producer.send(self.topic, self.key("key4"), self.msg("four"))
+
+        self.assert_produce_response(resp1, start_offsets[0]+0)
+        self.assert_produce_response(resp2, start_offsets[1]+0)
+        self.assert_produce_response(resp3, start_offsets[0]+1)
+        self.assert_produce_response(resp4, start_offsets[1]+1)
+
+        self.assert_fetch_offset(partitions[0], start_offsets[0], [ self.msg("one"), self.msg("three") ])
+        self.assert_fetch_offset(partitions[1], start_offsets[1], [ self.msg("two"), self.msg("four") ])
+
+        producer.stop()
+
+    @kafka_versions("all")
+    def test_hashed_partitioner(self):
+        partitions = self.client.get_partition_ids_for_topic(self.topic)
+        start_offsets = [self.current_offset(self.topic, p) for p in partitions]
+
+        producer = KeyedProducer(self.client, partitioner=HashedPartitioner)
+        resp1 = producer.send(self.topic, self.key("1"), self.msg("one"))
+        resp2 = producer.send(self.topic, self.key("2"), self.msg("two"))
+        resp3 = producer.send(self.topic, self.key("3"), self.msg("three"))
+        resp4 = producer.send(self.topic, self.key("3"), self.msg("four"))
+        resp5 = producer.send(self.topic, self.key("4"), self.msg("five"))
+
+        offsets = {partitions[0]: start_offsets[0], partitions[1]: start_offsets[1]}
+        messages = {partitions[0]: [], partitions[1]: []}
+
+        keys = [self.key(k) for k in ["1", "2", "3", "3", "4"]]
+        resps = [resp1, resp2, resp3, resp4, resp5]
+        msgs = [self.msg(m) for m in ["one", "two", "three", "four", "five"]]
+
+        for key, resp, msg in zip(keys, resps, msgs):
+            k = hash(key) % 2
+            partition = partitions[k]
+            offset = offsets[partition]
+            self.assert_produce_response(resp, offset)
+            offsets[partition] += 1
+            messages[partition].append(msg)
+
+        self.assert_fetch_offset(partitions[0], start_offsets[0], messages[partitions[0]])
+        self.assert_fetch_offset(partitions[1], start_offsets[1], messages[partitions[1]])
 
         producer.stop()
 
     @kafka_versions("all")
     def test_async_keyed_producer(self):
-        start_offset0 = self.current_offset(self.topic, 0)
+        partition = self.client.get_partition_ids_for_topic(self.topic)[0]
+        start_offset = self.current_offset(self.topic, partition)
 
         producer = KeyedProducer(self.client, partitioner = RoundRobinPartitioner, async=True)
 
         resp = producer.send(self.topic, self.key("key1"), self.msg("one"))
         self.assertEqual(len(resp), 0)
 
-        self.assert_fetch_offset(0, start_offset0, [ self.msg("one") ])
+        # wait for the server to report a new highwatermark
+        while self.current_offset(self.topic, partition) == start_offset:
+            time.sleep(0.1)
+
+        self.assert_fetch_offset(partition, start_offset, [ self.msg("one") ])
 
         producer.stop()
 
-    def assert_produce_request(self, messages, initial_offset, message_ct):
-        produce = ProduceRequest(self.topic, 0, messages=messages)
+    ############################
+    #    Producer ACK Tests    #
+    ############################
+
+    @kafka_versions("all")
+    def test_acks_none(self):
+        partition = self.client.get_partition_ids_for_topic(self.topic)[0]
+        start_offset = self.current_offset(self.topic, partition)
+
+        producer = Producer(
+            self.client,
+            req_acks=Producer.ACK_NOT_REQUIRED,
+        )
+        resp = producer.send_messages(self.topic, partition, self.msg("one"))
+
+        # No response from produce request with no acks required
+        self.assertEqual(len(resp), 0)
+
+        # But the message should still have been delivered
+        self.assert_fetch_offset(partition, start_offset, [ self.msg("one") ])
+        producer.stop()
+
+    @kafka_versions("all")
+    def test_acks_local_write(self):
+        partition = self.client.get_partition_ids_for_topic(self.topic)[0]
+        start_offset = self.current_offset(self.topic, partition)
+
+        producer = Producer(
+            self.client,
+            req_acks=Producer.ACK_AFTER_LOCAL_WRITE,
+        )
+        resp = producer.send_messages(self.topic, partition, self.msg("one"))
+
+        self.assert_produce_response(resp, start_offset)
+        self.assert_fetch_offset(partition, start_offset, [ self.msg("one") ])
+
+        producer.stop()
+
+    @kafka_versions("all")
+    def test_acks_cluster_commit(self):
+        partition = self.client.get_partition_ids_for_topic(self.topic)[0]
+        start_offset = self.current_offset(self.topic, partition)
+
+        producer = Producer(
+            self.client,
+            req_acks=Producer.ACK_AFTER_CLUSTER_COMMIT,
+        )
+        resp = producer.send_messages(self.topic, partition, self.msg("one"))
+        self.assert_produce_response(resp, start_offset)
+        self.assert_fetch_offset(partition, start_offset, [ self.msg("one") ])
+
+        producer.stop()
+
+    def assert_produce_request(self, messages, initial_offset, message_ct,
+                               partition=0):
+        produce = ProduceRequest(self.topic, partition, messages=messages)
 
         # There should only be one response message from the server.
         # This will throw an exception if there's more than one.
         resp = self.client.send_produce_request([ produce ])
         self.assert_produce_response(resp, initial_offset)
 
-        self.assertEqual(self.current_offset(self.topic, 0), initial_offset + message_ct)
+        self.assertEqual(self.current_offset(self.topic, partition), initial_offset + message_ct)
 
     def assert_produce_response(self, resp, initial_offset):
         self.assertEqual(len(resp), 1)
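One idiom worth noting from these test changes: async producers return no acks, so the rewritten tests poll `current_offset` until the broker reports an advanced highwatermark before asserting on fetches. Extracted as a sketch (the helper name and the callable-based interface are ours, not part of the PR):

import time

def wait_for_offset_advance(get_offset, start_offset, poll_interval=0.1):
    # Async sends give no acknowledgement; poll the broker until its
    # reported offset moves past where the partition started.
    while get_offset() == start_offset:
        time.sleep(poll_interval)

# Usage inside a test, mirroring the loops above:
#   wait_for_offset_advance(lambda: self.current_offset(topic, partition),
#                           start_offset)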