Raise an error if we attempt to group duplicate topic-partition payloads
- previously this would simply drop one of the payloads
This commit is contained in:
@@ -423,6 +423,8 @@ class KafkaClient(object):
|
|||||||
|
|
||||||
Arguments:
|
Arguments:
|
||||||
payloads (list of ProduceRequest): produce requests to send to kafka
|
payloads (list of ProduceRequest): produce requests to send to kafka
|
||||||
|
ProduceRequest payloads must not contain duplicates for any
|
||||||
|
topic-partition.
|
||||||
acks (int, optional): how many acks the servers should receive from replica
|
acks (int, optional): how many acks the servers should receive from replica
|
||||||
brokers before responding to the request. If it is 0, the server
|
brokers before responding to the request. If it is 0, the server
|
||||||
will not send any response. If it is 1, the server will wait
|
will not send any response. If it is 1, the server will wait
|
||||||
|
@@ -82,6 +82,9 @@ def relative_unpack(fmt, data, cur):
|
|||||||
def group_by_topic_and_partition(tuples):
|
def group_by_topic_and_partition(tuples):
|
||||||
out = collections.defaultdict(dict)
|
out = collections.defaultdict(dict)
|
||||||
for t in tuples:
|
for t in tuples:
|
||||||
|
assert t.topic not in out or t.partition not in out[t.topic], \
|
||||||
|
'Duplicate {0}s for {1} {2}'.format(t.__class__.__name__,
|
||||||
|
t.topic, t.partition)
|
||||||
out[t.topic][t.partition] = t
|
out[t.topic][t.partition] = t
|
||||||
return out
|
return out
|
||||||
|
|
||||||
|
@@ -107,7 +107,6 @@ class UtilTest(unittest.TestCase):
|
|||||||
t = kafka.common.TopicAndPartition
|
t = kafka.common.TopicAndPartition
|
||||||
|
|
||||||
l = [
|
l = [
|
||||||
t("a", 1),
|
|
||||||
t("a", 1),
|
t("a", 1),
|
||||||
t("a", 2),
|
t("a", 2),
|
||||||
t("a", 3),
|
t("a", 3),
|
||||||
@@ -124,3 +123,8 @@ class UtilTest(unittest.TestCase):
|
|||||||
3: t("b", 3),
|
3: t("b", 3),
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
|
# should not be able to group duplicate topic-partitions
|
||||||
|
t1 = t("a", 1)
|
||||||
|
with self.assertRaises(AssertionError):
|
||||||
|
kafka.util.group_by_topic_and_partition([t1, t1])
|
||||||
|
Reference in New Issue
Block a user