Fixed TestKafkaProducerIntegration
This commit is contained in:
@@ -199,10 +199,10 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
|
||||
start_offset1 = self.current_offset(self.topic, 1)
|
||||
|
||||
producer = KeyedProducer(self.client, partitioner=RoundRobinPartitioner)
|
||||
resp1 = producer.send(self.topic, "key1", self.msg("one"))
|
||||
resp2 = producer.send(self.topic, "key2", self.msg("two"))
|
||||
resp3 = producer.send(self.topic, "key3", self.msg("three"))
|
||||
resp4 = producer.send(self.topic, "key4", self.msg("four"))
|
||||
resp1 = producer.send(self.topic, self.key("key1"), self.msg("one"))
|
||||
resp2 = producer.send(self.topic, self.key("key2"), self.msg("two"))
|
||||
resp3 = producer.send(self.topic, self.key("key3"), self.msg("three"))
|
||||
resp4 = producer.send(self.topic, self.key("key4"), self.msg("four"))
|
||||
|
||||
self.assert_produce_response(resp1, start_offset0+0)
|
||||
self.assert_produce_response(resp2, start_offset1+0)
|
||||
@@ -220,20 +220,28 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
|
||||
start_offset1 = self.current_offset(self.topic, 1)
|
||||
|
||||
producer = KeyedProducer(self.client, partitioner=HashedPartitioner)
|
||||
resp1 = producer.send(self.topic, 1, self.msg("one"))
|
||||
resp2 = producer.send(self.topic, 2, self.msg("two"))
|
||||
resp3 = producer.send(self.topic, 3, self.msg("three"))
|
||||
resp4 = producer.send(self.topic, 3, self.msg("four"))
|
||||
resp5 = producer.send(self.topic, 4, self.msg("five"))
|
||||
resp1 = producer.send(self.topic, self.key("1"), self.msg("one"))
|
||||
resp2 = producer.send(self.topic, self.key("2"), self.msg("two"))
|
||||
resp3 = producer.send(self.topic, self.key("3"), self.msg("three"))
|
||||
resp4 = producer.send(self.topic, self.key("3"), self.msg("four"))
|
||||
resp5 = producer.send(self.topic, self.key("4"), self.msg("five"))
|
||||
|
||||
self.assert_produce_response(resp1, start_offset1+0)
|
||||
self.assert_produce_response(resp2, start_offset0+0)
|
||||
self.assert_produce_response(resp3, start_offset1+1)
|
||||
self.assert_produce_response(resp4, start_offset1+2)
|
||||
self.assert_produce_response(resp5, start_offset0+1)
|
||||
offsets = {0: start_offset0, 1: start_offset1}
|
||||
messages = {0: [], 1: []}
|
||||
|
||||
self.assert_fetch_offset(0, start_offset0, [ self.msg("two"), self.msg("five") ])
|
||||
self.assert_fetch_offset(1, start_offset1, [ self.msg("one"), self.msg("three"), self.msg("four") ])
|
||||
keys = [self.key(k) for k in ["1", "2", "3", "3", "4"]]
|
||||
resps = [resp1, resp2, resp3, resp4, resp5]
|
||||
msgs = [self.msg(m) for m in ["one", "two", "three", "four", "five"]]
|
||||
|
||||
for key, resp, msg in zip(keys, resps, msgs):
|
||||
k = hash(key) % 2
|
||||
offset = offsets[k]
|
||||
self.assert_produce_response(resp, offset)
|
||||
offsets[k] += 1
|
||||
messages[k].append(msg)
|
||||
|
||||
self.assert_fetch_offset(0, start_offset0, messages[0])
|
||||
self.assert_fetch_offset(1, start_offset1, messages[1])
|
||||
|
||||
producer.stop()
|
||||
|
||||
@@ -393,7 +401,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
|
||||
|
||||
producer = KeyedProducer(self.client, partitioner = RoundRobinPartitioner, async=True)
|
||||
|
||||
resp = producer.send(self.topic, "key1", self.msg("one"))
|
||||
resp = producer.send(self.topic, self.key("key1"), self.msg("one"))
|
||||
self.assertEquals(len(resp), 0)
|
||||
|
||||
self.assert_fetch_offset(0, start_offset0, [ self.msg("one") ])
|
||||
|
||||
@@ -89,6 +89,10 @@ class KafkaIntegrationTestCase(unittest.TestCase):
|
||||
|
||||
return self._messages[s].encode('utf-8')
|
||||
|
||||
def key(self, k):
|
||||
return k.encode('utf-8')
|
||||
|
||||
|
||||
class Timer(object):
|
||||
def __enter__(self):
|
||||
self.start = time.time()
|
||||
|
||||
Reference in New Issue
Block a user