Merge pull request #487 from dpkp/kafka_version_tests

Refactor kafka_versions to support arbitrary operators (> >= < <= ! =)
Dana Powers
2015-12-10 14:55:15 -08:00
5 changed files with 47 additions and 56 deletions
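The refactored decorator accepts version constraints with an optional comparison-operator prefix ( > >= < <= ! = ); a bare version string means equality, and multiple constraints must all pass. Tests previously tagged @kafka_versions("all") simply drop the decorator, since setUp now skips integration tests whenever KAFKA_VERSION is unset. A rough sketch of the usage change, based on the diffs below (the '!0.8.2.0' exclusion is a hypothetical illustration, not a constraint used in this PR):

    # Before: every matching broker version had to be listed explicitly.
    @kafka_versions("0.8.1", "0.8.1.1", "0.8.2.1")
    def test_commit_fetch_offsets(self):
        ...

    # After: one operator expression covers current and future releases,
    # and constraints can be combined to exclude a specific release.
    @kafka_versions('>=0.8.1', '!0.8.2.0')
    def test_commit_fetch_offsets(self):
        ...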

test/test_client_integration.py

@@ -27,7 +27,6 @@ class TestKafkaClientIntegration(KafkaIntegrationTestCase):
         cls.server.close()
         cls.zk.close()

-    @kafka_versions("all")
     def test_consume_none(self):
         fetch = FetchRequest(self.bytes_topic, 0, 0, 1024)
@@ -39,7 +38,6 @@ class TestKafkaClientIntegration(KafkaIntegrationTestCase):
         messages = list(fetch_resp.messages)
         self.assertEqual(len(messages), 0)

-    @kafka_versions("all")
     def test_ensure_topic_exists(self):
         # assume that self.topic was created by setUp
@@ -50,7 +48,6 @@ class TestKafkaClientIntegration(KafkaIntegrationTestCase):
         with self.assertRaises(KafkaTimeoutError):
             self.client.ensure_topic_exists(b"this_topic_doesnt_exist", timeout=0)

-    @kafka_versions('all')
     def test_send_produce_request_maintains_request_response_order(self):
         self.client.ensure_topic_exists(b'foo')
@@ -83,7 +80,7 @@ class TestKafkaClientIntegration(KafkaIntegrationTestCase):
     # Offset Tests #
     ####################

-    @kafka_versions("0.8.1", "0.8.1.1", "0.8.2.1")
+    @kafka_versions('>=0.8.1')
     def test_commit_fetch_offsets(self):
         req = OffsetCommitRequest(self.bytes_topic, 0, 42, b"metadata")
         (resp,) = self.client.send_offset_commit_request(b"group", [req])

test/test_consumer_integration.py

@@ -78,7 +78,6 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
             **configs)
         return consumer

-    @kafka_versions("all")
     def test_simple_consumer(self):
         self.send_messages(0, range(0, 100))
         self.send_messages(1, range(100, 200))
@@ -90,7 +89,6 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
         consumer.stop()

-    @kafka_versions('all')
     def test_simple_consumer_smallest_offset_reset(self):
         self.send_messages(0, range(0, 100))
         self.send_messages(1, range(100, 200))
@@ -102,7 +100,6 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
         # messages from beginning.
         self.assert_message_count([message for message in consumer], 200)

-    @kafka_versions('all')
     def test_simple_consumer_largest_offset_reset(self):
         self.send_messages(0, range(0, 100))
         self.send_messages(1, range(100, 200))
@@ -120,7 +117,6 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
         # Since the offset is set to largest we should read all the new messages.
         self.assert_message_count([message for message in consumer], 200)

-    @kafka_versions('all')
     def test_simple_consumer_no_reset(self):
         self.send_messages(0, range(0, 100))
         self.send_messages(1, range(100, 200))
@@ -132,7 +128,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
         with self.assertRaises(OffsetOutOfRangeError):
             consumer.get_message()

-    @kafka_versions("0.8.1", "0.8.1.1", "0.8.2.1")
+    @kafka_versions('>=0.8.1')
     def test_simple_consumer_load_initial_offsets(self):
         self.send_messages(0, range(0, 100))
         self.send_messages(1, range(100, 200))
@@ -149,7 +145,6 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
         consumer = self.consumer(auto_commit=False)
         self.assertEqual(consumer.offsets, {0: 51, 1: 101})

-    @kafka_versions("all")
     def test_simple_consumer__seek(self):
         self.send_messages(0, range(0, 100))
         self.send_messages(1, range(100, 200))
@@ -180,7 +175,6 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
         consumer.stop()

-    @kafka_versions("all")
     def test_simple_consumer_blocking(self):
         consumer = self.consumer()
@@ -214,7 +208,6 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
         consumer.stop()

-    @kafka_versions("all")
     def test_simple_consumer_pending(self):
         # make sure that we start with no pending messages
         consumer = self.consumer()
@@ -242,7 +235,6 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
         self.assertEquals(set([0, 1]), set([pending_part1, pending_part2]))
         consumer.stop()

-    @kafka_versions("all")
     def test_multi_process_consumer(self):
         # Produce 100 messages to partitions 0 and 1
         self.send_messages(0, range(0, 100))
@@ -254,7 +246,6 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
         consumer.stop()

-    @kafka_versions("all")
     def test_multi_process_consumer_blocking(self):
         consumer = self.consumer(consumer = MultiProcessConsumer)
@@ -292,7 +283,6 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
         consumer.stop()

-    @kafka_versions("all")
     def test_multi_proc_pending(self):
         self.send_messages(0, range(0, 10))
         self.send_messages(1, range(10, 20))
@@ -308,7 +298,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
         consumer.stop()

-    @kafka_versions("0.8.1", "0.8.1.1", "0.8.2.1")
+    @kafka_versions('>=0.8.1')
     def test_multi_process_consumer_load_initial_offsets(self):
         self.send_messages(0, range(0, 10))
         self.send_messages(1, range(10, 20))
@@ -326,7 +316,6 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
             auto_commit=False)
         self.assertEqual(consumer.offsets, {0: 5, 1: 15})

-    @kafka_versions("all")
     def test_large_messages(self):
         # Produce 10 "normal" size messages
         small_messages = self.send_messages(0, [ str(x) for x in range(10) ])
@@ -343,7 +332,6 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
         consumer.stop()

-    @kafka_versions("all")
     def test_huge_messages(self):
         huge_message, = self.send_messages(0, [
             create_message(random_string(MAX_FETCH_BUFFER_SIZE_BYTES + 10)),
@@ -374,7 +362,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
         big_consumer.stop()

-    @kafka_versions("0.8.1", "0.8.1.1", "0.8.2.1")
+    @kafka_versions('>=0.8.1')
     def test_offset_behavior__resuming_behavior(self):
         self.send_messages(0, range(0, 100))
         self.send_messages(1, range(100, 200))
@@ -401,7 +389,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
         consumer1.stop()
         consumer2.stop()

-    @kafka_versions("0.8.1", "0.8.1.1", "0.8.2.1")
+    @kafka_versions('>=0.8.1')
     def test_multi_process_offset_behavior__resuming_behavior(self):
         self.send_messages(0, range(0, 100))
         self.send_messages(1, range(100, 200))
@@ -437,7 +425,6 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
         consumer2.stop()

     # TODO: Make this a unit test -- should not require integration
-    @kafka_versions("all")
     def test_fetch_buffer_size(self):
         # Test parameters (see issue 135 / PR 136)
@@ -455,7 +442,6 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
         messages = [ message for message in consumer ]
         self.assertEqual(len(messages), 2)

-    @kafka_versions("all")
     def test_kafka_consumer(self):
         self.send_messages(0, range(0, 100))
         self.send_messages(1, range(100, 200))
@@ -476,7 +462,6 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
         self.assertEqual(len(messages[0]), 100)
         self.assertEqual(len(messages[1]), 100)

-    @kafka_versions("all")
     def test_kafka_consumer__blocking(self):
         TIMEOUT_MS = 500
         consumer = self.kafka_consumer(auto_offset_reset='smallest',
@@ -509,7 +494,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
         self.assertEqual(len(messages), 5)
         self.assertGreaterEqual(t.interval, TIMEOUT_MS / 1000.0 )

-    @kafka_versions("0.8.1", "0.8.1.1", "0.8.2.1")
+    @kafka_versions('>=0.8.1')
     def test_kafka_consumer__offset_commit_resume(self):
         GROUP_ID = random_string(10).encode('utf-8')

test/test_failover_integration.py

@@ -8,9 +8,7 @@ from kafka.producer.base import Producer
 from kafka.util import kafka_bytestring

 from test.fixtures import ZookeeperFixture, KafkaFixture
-from test.testutil import (
-    KafkaIntegrationTestCase, kafka_versions, random_string
-)
+from test.testutil import KafkaIntegrationTestCase, random_string

 log = logging.getLogger(__name__)
@@ -21,7 +19,7 @@ class TestFailover(KafkaIntegrationTestCase):
     def setUp(self):
         if not os.environ.get('KAFKA_VERSION'):
-            return
+            self.skipTest('integration test requires KAFKA_VERSION')

         zk_chroot = random_string(10)
         replicas = 3
@@ -46,7 +44,6 @@ class TestFailover(KafkaIntegrationTestCase):
             broker.close()
         self.zk.close()

-    @kafka_versions("all")
     def test_switch_leader(self):
         topic = self.topic
         partition = 0
@@ -94,7 +91,6 @@ class TestFailover(KafkaIntegrationTestCase):
         self.assert_message_count(topic, 201, partitions=(partition,),
                                   at_least=True)

-    @kafka_versions("all")
     def test_switch_leader_async(self):
         topic = self.topic
         partition = 0
@@ -142,7 +138,6 @@ class TestFailover(KafkaIntegrationTestCase):
         self.assert_message_count(topic, 21, partitions=(partition + 1,),
                                   at_least=True)

-    @kafka_versions("all")
     def test_switch_leader_keyed_producer(self):
         topic = self.topic
@@ -180,7 +175,6 @@ class TestFailover(KafkaIntegrationTestCase):
             msg = random_string(10).encode('utf-8')
             producer.send_messages(topic, key, msg)

-    @kafka_versions("all")
     def test_switch_leader_simple_consumer(self):
         producer = Producer(self.client, async=False)
         consumer = SimpleConsumer(self.client, None, self.topic, partitions=None, auto_commit=False, iter_timeout=10)

test/test_producer_integration.py

@@ -38,7 +38,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
         cls.server.close()
         cls.zk.close()

-    @kafka_versions("all")
     def test_produce_many_simple(self):
         start_offset = self.current_offset(self.topic, 0)
@@ -56,7 +55,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
             100,
         )

-    @kafka_versions("all")
     def test_produce_10k_simple(self):
         start_offset = self.current_offset(self.topic, 0)
@@ -67,7 +65,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
             10000,
         )

-    @kafka_versions("all")
     def test_produce_many_gzip(self):
         start_offset = self.current_offset(self.topic, 0)
@@ -82,7 +79,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
             200,
         )

-    @kafka_versions("all")
     def test_produce_many_snappy(self):
         self.skipTest("All snappy integration tests fail with nosnappyjava")
         start_offset = self.current_offset(self.topic, 0)
@@ -95,7 +91,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
             200,
         )

-    @kafka_versions("all")
     def test_produce_mixed(self):
         start_offset = self.current_offset(self.topic, 0)
@@ -113,7 +108,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
         self.assert_produce_request(messages, start_offset, msg_count)

-    @kafka_versions("all")
     def test_produce_100k_gzipped(self):
         start_offset = self.current_offset(self.topic, 0)
@@ -139,7 +133,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
     # SimpleProducer Tests #
     ############################

-    @kafka_versions("all")
     def test_simple_producer(self):
         partitions = self.client.get_partition_ids_for_topic(self.topic)
         start_offsets = [self.current_offset(self.topic, p) for p in partitions]
@@ -164,7 +157,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
         producer.stop()

-    @kafka_versions("all")
     def test_produce__new_topic_fails_with_reasonable_error(self):
         new_topic = 'new_topic_{guid}'.format(guid = str(uuid.uuid4())).encode('utf-8')
         producer = SimpleProducer(self.client, random_start=False)
@@ -174,7 +166,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
                 LeaderNotAvailableError)):
             producer.send_messages(new_topic, self.msg("one"))

-    @kafka_versions("all")
     def test_producer_random_order(self):
         producer = SimpleProducer(self.client, random_start=True)
         resp1 = producer.send_messages(self.topic, self.msg("one"), self.msg("two"))
@@ -184,7 +175,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
         self.assertEqual(resp1[0].partition, resp3[0].partition)
         self.assertNotEqual(resp1[0].partition, resp2[0].partition)

-    @kafka_versions("all")
     def test_producer_ordered_start(self):
         producer = SimpleProducer(self.client, random_start=False)
         resp1 = producer.send_messages(self.topic, self.msg("one"), self.msg("two"))
@@ -195,7 +185,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
         self.assertEqual(resp2[0].partition, 1)
         self.assertEqual(resp3[0].partition, 0)

-    @kafka_versions("all")
     def test_async_simple_producer(self):
         partition = self.client.get_partition_ids_for_topic(self.topic)[0]
         start_offset = self.current_offset(self.topic, partition)
@@ -210,7 +199,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
         self.assert_fetch_offset(partition, start_offset, [ self.msg("one") ])

-    @kafka_versions("all")
     def test_batched_simple_producer__triggers_by_message(self):
         partitions = self.client.get_partition_ids_for_topic(self.topic)
         start_offsets = [self.current_offset(self.topic, p) for p in partitions]
@@ -278,7 +266,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
         producer.stop()

-    @kafka_versions("all")
     def test_batched_simple_producer__triggers_by_time(self):
         partitions = self.client.get_partition_ids_for_topic(self.topic)
         start_offsets = [self.current_offset(self.topic, p) for p in partitions]
@@ -339,7 +326,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
     # KeyedProducer Tests #
     ############################

-    @kafka_versions("0.8.1", "0.8.1.1", "0.8.2.0")
+    @kafka_versions('>=0.8.1')
     def test_keyedproducer_null_payload(self):
         partitions = self.client.get_partition_ids_for_topic(self.topic)
         start_offsets = [self.current_offset(self.topic, p) for p in partitions]
@@ -361,7 +348,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
         producer.stop()

-    @kafka_versions("all")
     def test_round_robin_partitioner(self):
         partitions = self.client.get_partition_ids_for_topic(self.topic)
         start_offsets = [self.current_offset(self.topic, p) for p in partitions]
@@ -382,7 +368,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
         producer.stop()

-    @kafka_versions("all")
     def test_hashed_partitioner(self):
         partitions = self.client.get_partition_ids_for_topic(self.topic)
         start_offsets = [self.current_offset(self.topic, p) for p in partitions]
@@ -414,7 +399,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
         producer.stop()

-    @kafka_versions("all")
     def test_async_keyed_producer(self):
         partition = self.client.get_partition_ids_for_topic(self.topic)[0]
         start_offset = self.current_offset(self.topic, partition)
@@ -436,7 +420,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
     # Producer ACK Tests #
     ############################

-    @kafka_versions("all")
     def test_acks_none(self):
         partition = self.client.get_partition_ids_for_topic(self.topic)[0]
         start_offset = self.current_offset(self.topic, partition)
@@ -454,7 +437,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
         self.assert_fetch_offset(partition, start_offset, [ self.msg("one") ])
         producer.stop()

-    @kafka_versions("all")
     def test_acks_local_write(self):
         partition = self.client.get_partition_ids_for_topic(self.topic)[0]
         start_offset = self.current_offset(self.topic, partition)
@@ -470,7 +452,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
         producer.stop()

-    @kafka_versions("all")
     def test_acks_cluster_commit(self):
         partition = self.client.get_partition_ids_for_topic(self.topic)[0]
         start_offset = self.current_offset(self.topic, partition)

test/testutil.py

@@ -1,5 +1,6 @@
 import functools
 import logging
+import operator
 import os
 import random
 import socket
@@ -26,15 +27,48 @@ def random_string(l):
     return "".join(random.choice(string.ascii_letters) for i in xrange(l))

 def kafka_versions(*versions):
+
+    def version_str_to_list(s):
+        return list(map(int, s.split('.'))) # e.g., [0, 8, 1, 1]
+
+    def construct_lambda(s):
+        if s[0].isdigit():
+            op_str = '='
+            v_str = s
+        elif s[1].isdigit():
+            op_str = s[0] # ! < > =
+            v_str = s[1:]
+        elif s[2].isdigit():
+            op_str = s[0:2] # >= <=
+            v_str = s[2:]
+        else:
+            raise ValueError('Unrecognized kafka version / operator: %s' % s)
+
+        op_map = {
+            '=': operator.eq,
+            '!': operator.ne,
+            '>': operator.gt,
+            '<': operator.lt,
+            '>=': operator.ge,
+            '<=': operator.le
+        }
+        op = op_map[op_str]
+        version = version_str_to_list(v_str)
+        return lambda a: op(version_str_to_list(a), version)
+
+    validators = map(construct_lambda, versions)
+
     def kafka_versions(func):
         @functools.wraps(func)
         def wrapper(self):
             kafka_version = os.environ.get('KAFKA_VERSION')

             if not kafka_version:
-                self.skipTest("no kafka version specified")
-            elif 'all' not in versions and kafka_version not in versions:
-                self.skipTest("unsupported kafka version")
+                self.skipTest("no kafka version set in KAFKA_VERSION env var")
+
+            for f in validators:
+                if not f(kafka_version):
+                    self.skipTest("unsupported kafka version")

             return func(self)
         return wrapper
@@ -57,7 +91,7 @@ class KafkaIntegrationTestCase(unittest.TestCase):
     def setUp(self):
         super(KafkaIntegrationTestCase, self).setUp()
         if not os.environ.get('KAFKA_VERSION'):
-            return
+            self.skipTest('Integration test requires KAFKA_VERSION')

         if not self.topic:
             topic = "%s-%s" % (self.id()[self.id().rindex(".") + 1:], random_string(10))