Split out kafka version environments, default tox no longer runs any integration tests, make skipped integration also skip setupClass, implement rudimentary offset support in consumer.py
This commit is contained in:
@@ -10,6 +10,7 @@ from Queue import Empty, Queue
|
||||
from kafka.common import (
|
||||
ErrorMapping, FetchRequest,
|
||||
OffsetRequest, OffsetCommitRequest,
|
||||
OffsetFetchRequest,
|
||||
ConsumerFetchSizeTooSmall, ConsumerNoMoreData
|
||||
)
|
||||
|
||||
@@ -105,17 +106,16 @@ class Consumer(object):
|
||||
"partition=%d failed with errorcode=%s" % (
|
||||
resp.topic, resp.partition, resp.error))
|
||||
|
||||
# Uncomment for 0.8.1
|
||||
#
|
||||
#for partition in partitions:
|
||||
# req = OffsetFetchRequest(topic, partition)
|
||||
# (offset,) = self.client.send_offset_fetch_request(group, [req],
|
||||
# callback=get_or_init_offset_callback,
|
||||
# fail_on_error=False)
|
||||
# self.offsets[partition] = offset
|
||||
|
||||
for partition in partitions:
|
||||
self.offsets[partition] = 0
|
||||
if auto_commit:
|
||||
for partition in partitions:
|
||||
req = OffsetFetchRequest(topic, partition)
|
||||
(offset,) = self.client.send_offset_fetch_request(group, [req],
|
||||
callback=get_or_init_offset_callback,
|
||||
fail_on_error=False)
|
||||
self.offsets[partition] = offset
|
||||
else:
|
||||
for partition in partitions:
|
||||
self.offsets[partition] = 0
|
||||
|
||||
def commit(self, partitions=None):
|
||||
"""
|
||||
|
@@ -1,25 +1,32 @@
|
||||
import unittest
|
||||
import time
|
||||
import socket
|
||||
import os
|
||||
import random
|
||||
import socket
|
||||
import time
|
||||
import unittest
|
||||
|
||||
import kafka
|
||||
from kafka.common import *
|
||||
from fixtures import ZookeeperFixture, KafkaFixture
|
||||
from testutil import *
|
||||
|
||||
@unittest.skipIf(skip_integration(), 'Skipping Integration')
|
||||
class TestKafkaClientIntegration(KafkaIntegrationTestCase):
|
||||
@classmethod
|
||||
def setUpClass(cls): # noqa
|
||||
if not os.environ.get('KAFKA_VERSION'):
|
||||
return
|
||||
|
||||
cls.zk = ZookeeperFixture.instance()
|
||||
cls.server = KafkaFixture.instance(0, cls.zk.host, cls.zk.port)
|
||||
|
||||
@classmethod
|
||||
def tearDownClass(cls): # noqa
|
||||
if not os.environ.get('KAFKA_VERSION'):
|
||||
return
|
||||
|
||||
cls.server.close()
|
||||
cls.zk.close()
|
||||
|
||||
@kafka_versions("all")
|
||||
def test_timeout(self):
|
||||
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
server_port = get_open_port()
|
||||
@@ -30,6 +37,7 @@ class TestKafkaClientIntegration(KafkaIntegrationTestCase):
|
||||
conn = kafka.conn.KafkaConnection("localhost", server_port, 1.0)
|
||||
self.assertGreaterEqual(t.interval, 1.0)
|
||||
|
||||
@kafka_versions("all")
|
||||
def test_consume_none(self):
|
||||
fetch = FetchRequest(self.topic, 0, 0, 1024)
|
||||
|
||||
|
@@ -14,7 +14,7 @@ from kafka.common import (
|
||||
LeaderUnavailableError, PartitionUnavailableError
|
||||
)
|
||||
from kafka.codec import (
|
||||
has_gzip, has_snappy, gzip_encode, gzip_decode,
|
||||
has_snappy, gzip_encode, gzip_decode,
|
||||
snappy_encode, snappy_decode
|
||||
)
|
||||
from kafka.protocol import (
|
||||
@@ -23,7 +23,6 @@ from kafka.protocol import (
|
||||
from testutil import *
|
||||
|
||||
class TestCodec(unittest.TestCase):
|
||||
@unittest.skipUnless(has_gzip(), "Gzip not available")
|
||||
def test_gzip(self):
|
||||
for i in xrange(1000):
|
||||
s1 = random_string(100)
|
||||
|
@@ -1,3 +1,4 @@
|
||||
import os
|
||||
import unittest
|
||||
from datetime import datetime
|
||||
|
||||
@@ -7,10 +8,12 @@ from kafka.consumer import MAX_FETCH_BUFFER_SIZE_BYTES
|
||||
from fixtures import ZookeeperFixture, KafkaFixture
|
||||
from testutil import *
|
||||
|
||||
@unittest.skipIf(skip_integration(), 'Skipping Integration')
|
||||
class TestConsumerIntegration(KafkaIntegrationTestCase):
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
if not os.environ.get('KAFKA_VERSION'):
|
||||
return
|
||||
|
||||
cls.zk = ZookeeperFixture.instance()
|
||||
cls.server1 = KafkaFixture.instance(0, cls.zk.host, cls.zk.port)
|
||||
cls.server2 = KafkaFixture.instance(1, cls.zk.host, cls.zk.port)
|
||||
@@ -19,6 +22,9 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
|
||||
|
||||
@classmethod
|
||||
def tearDownClass(cls): # noqa
|
||||
if not os.environ.get('KAFKA_VERSION'):
|
||||
return
|
||||
|
||||
cls.server1.close()
|
||||
cls.server2.close()
|
||||
cls.zk.close()
|
||||
@@ -38,6 +44,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
|
||||
# Make sure there are no duplicates
|
||||
self.assertEquals(len(set(messages)), num_messages)
|
||||
|
||||
@kafka_versions("all")
|
||||
def test_simple_consumer(self):
|
||||
self.send_messages(0, range(0, 100))
|
||||
self.send_messages(1, range(100, 200))
|
||||
@@ -51,6 +58,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
|
||||
|
||||
consumer.stop()
|
||||
|
||||
@kafka_versions("all")
|
||||
def test_simple_consumer__seek(self):
|
||||
self.send_messages(0, range(0, 100))
|
||||
self.send_messages(1, range(100, 200))
|
||||
@@ -69,6 +77,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
|
||||
|
||||
consumer.stop()
|
||||
|
||||
@kafka_versions("all")
|
||||
def test_simple_consumer_blocking(self):
|
||||
consumer = SimpleConsumer(self.client, "group1",
|
||||
self.topic,
|
||||
@@ -96,6 +105,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
|
||||
|
||||
consumer.stop()
|
||||
|
||||
@kafka_versions("all")
|
||||
def test_simple_consumer_pending(self):
|
||||
# Produce 10 messages to partitions 0 and 1
|
||||
self.send_messages(0, range(0, 10))
|
||||
@@ -110,6 +120,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
|
||||
|
||||
consumer.stop()
|
||||
|
||||
@kafka_versions("all")
|
||||
def test_multi_process_consumer(self):
|
||||
# Produce 100 messages to partitions 0 and 1
|
||||
self.send_messages(0, range(0, 100))
|
||||
@@ -121,6 +132,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
|
||||
|
||||
consumer.stop()
|
||||
|
||||
@kafka_versions("all")
|
||||
def test_multi_process_consumer_blocking(self):
|
||||
consumer = MultiProcessConsumer(self.client, "grp1", self.topic, auto_commit=False)
|
||||
|
||||
@@ -148,6 +160,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
|
||||
|
||||
consumer.stop()
|
||||
|
||||
@kafka_versions("all")
|
||||
def test_multi_proc_pending(self):
|
||||
self.send_messages(0, range(0, 10))
|
||||
self.send_messages(1, range(10, 20))
|
||||
@@ -160,6 +173,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
|
||||
|
||||
consumer.stop()
|
||||
|
||||
@kafka_versions("all")
|
||||
def test_large_messages(self):
|
||||
# Produce 10 "normal" size messages
|
||||
small_messages = self.send_messages(0, [ str(x) for x in range(10) ])
|
||||
@@ -177,6 +191,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
|
||||
|
||||
consumer.stop()
|
||||
|
||||
@kafka_versions("all")
|
||||
def test_huge_messages(self):
|
||||
huge_message, = self.send_messages(0, [
|
||||
create_message(random_string(MAX_FETCH_BUFFER_SIZE_BYTES + 10)),
|
||||
@@ -213,23 +228,25 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
|
||||
msgs2 = self.send_messages(1, range(100, 200))
|
||||
|
||||
# Start a consumer
|
||||
consumer = SimpleConsumer(self.client, "group1",
|
||||
consumer1 = SimpleConsumer(self.client, "group1",
|
||||
self.topic, auto_commit=True,
|
||||
auto_commit_every_t=600,
|
||||
auto_commit_every_n=20,
|
||||
iter_timeout=0)
|
||||
|
||||
# Grab the first 195 messages
|
||||
output_msgs1 = [ consumer.get_message().message.value for _ in xrange(195) ]
|
||||
output_msgs1 = [ consumer1.get_message().message.value for _ in xrange(195) ]
|
||||
self.assert_message_count(output_msgs1, 195)
|
||||
consumer.stop()
|
||||
|
||||
# The offset should be at 180
|
||||
consumer = SimpleConsumer(self.client, "group1",
|
||||
consumer2 = SimpleConsumer(self.client, "group1",
|
||||
self.topic, auto_commit=True,
|
||||
auto_commit_every_t=600,
|
||||
auto_commit_every_n=20,
|
||||
iter_timeout=0)
|
||||
|
||||
# 180-200
|
||||
self.assert_message_count([ message for message in consumer ], 20)
|
||||
self.assert_message_count([ message for message in consumer2 ], 20)
|
||||
|
||||
consumer.stop()
|
||||
consumer1.stop()
|
||||
consumer2.stop()
|
||||
|
@@ -1,17 +1,20 @@
|
||||
import unittest
|
||||
import os
|
||||
import time
|
||||
import unittest
|
||||
|
||||
from kafka import * # noqa
|
||||
from kafka.common import * # noqa
|
||||
from fixtures import ZookeeperFixture, KafkaFixture
|
||||
from testutil import *
|
||||
|
||||
@unittest.skipIf(skip_integration(), 'Skipping Integration')
|
||||
class TestFailover(KafkaIntegrationTestCase):
|
||||
create_client = False
|
||||
|
||||
@classmethod
|
||||
def setUpClass(cls): # noqa
|
||||
if not os.environ.get('KAFKA_VERSION'):
|
||||
return
|
||||
|
||||
zk_chroot = random_string(10)
|
||||
replicas = 2
|
||||
partitions = 2
|
||||
@@ -26,11 +29,15 @@ class TestFailover(KafkaIntegrationTestCase):
|
||||
|
||||
@classmethod
|
||||
def tearDownClass(cls):
|
||||
if not os.environ.get('KAFKA_VERSION'):
|
||||
return
|
||||
|
||||
cls.client.close()
|
||||
for broker in cls.brokers:
|
||||
broker.close()
|
||||
cls.zk.close()
|
||||
|
||||
@kafka_versions("all")
|
||||
def test_switch_leader(self):
|
||||
key, topic, partition = random_string(5), self.topic, 0
|
||||
producer = SimpleProducer(self.client)
|
||||
@@ -62,6 +69,7 @@ class TestFailover(KafkaIntegrationTestCase):
|
||||
|
||||
producer.stop()
|
||||
|
||||
@kafka_versions("all")
|
||||
def test_switch_leader_async(self):
|
||||
key, topic, partition = random_string(5), self.topic, 0
|
||||
producer = SimpleProducer(self.client, async=True)
|
||||
|
@@ -1,6 +1,7 @@
|
||||
import uuid
|
||||
import os
|
||||
import time
|
||||
import unittest
|
||||
import uuid
|
||||
|
||||
from kafka import * # noqa
|
||||
from kafka.common import * # noqa
|
||||
@@ -13,14 +14,21 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
|
||||
|
||||
@classmethod
|
||||
def setUpClass(cls): # noqa
|
||||
if not os.environ.get('KAFKA_VERSION'):
|
||||
return
|
||||
|
||||
cls.zk = ZookeeperFixture.instance()
|
||||
cls.server = KafkaFixture.instance(0, cls.zk.host, cls.zk.port)
|
||||
|
||||
@classmethod
|
||||
def tearDownClass(cls): # noqa
|
||||
if not os.environ.get('KAFKA_VERSION'):
|
||||
return
|
||||
|
||||
cls.server.close()
|
||||
cls.zk.close()
|
||||
|
||||
@kafka_versions("all")
|
||||
def test_produce_many_simple(self):
|
||||
start_offset = self.current_offset(self.topic, 0)
|
||||
|
||||
@@ -36,6 +44,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
|
||||
100,
|
||||
)
|
||||
|
||||
@kafka_versions("all")
|
||||
def test_produce_10k_simple(self):
|
||||
start_offset = self.current_offset(self.topic, 0)
|
||||
|
||||
@@ -45,6 +54,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
|
||||
10000,
|
||||
)
|
||||
|
||||
@kafka_versions("all")
|
||||
def test_produce_many_gzip(self):
|
||||
start_offset = self.current_offset(self.topic, 0)
|
||||
|
||||
@@ -57,8 +67,9 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
|
||||
200,
|
||||
)
|
||||
|
||||
@unittest.skip("All snappy integration tests fail with nosnappyjava")
|
||||
@kafka_versions("all")
|
||||
def test_produce_many_snappy(self):
|
||||
self.skipTest("All snappy integration tests fail with nosnappyjava")
|
||||
start_offset = self.current_offset(self.topic, 0)
|
||||
|
||||
self.assert_produce_request([
|
||||
@@ -69,6 +80,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
|
||||
200,
|
||||
)
|
||||
|
||||
@kafka_versions("all")
|
||||
def test_produce_mixed(self):
|
||||
start_offset = self.current_offset(self.topic, 0)
|
||||
|
||||
@@ -85,6 +97,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
|
||||
|
||||
self.assert_produce_request(messages, start_offset, msg_count)
|
||||
|
||||
@kafka_versions("all")
|
||||
def test_produce_100k_gzipped(self):
|
||||
start_offset = self.current_offset(self.topic, 0)
|
||||
|
||||
@@ -106,6 +119,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
|
||||
# SimpleProducer Tests #
|
||||
############################
|
||||
|
||||
@kafka_versions("all")
|
||||
def test_simple_producer(self):
|
||||
start_offset0 = self.current_offset(self.topic, 0)
|
||||
start_offset1 = self.current_offset(self.topic, 1)
|
||||
@@ -130,6 +144,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
|
||||
|
||||
producer.stop()
|
||||
|
||||
@kafka_versions("all")
|
||||
def test_round_robin_partitioner(self):
|
||||
msg1, msg2, msg3, msg4 = [ str(uuid.uuid4()) for _ in range(4) ]
|
||||
|
||||
@@ -152,6 +167,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
|
||||
|
||||
producer.stop()
|
||||
|
||||
@kafka_versions("all")
|
||||
def test_hashed_partitioner(self):
|
||||
start_offset0 = self.current_offset(self.topic, 0)
|
||||
start_offset1 = self.current_offset(self.topic, 1)
|
||||
@@ -174,6 +190,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
|
||||
|
||||
producer.stop()
|
||||
|
||||
@kafka_versions("all")
|
||||
def test_acks_none(self):
|
||||
start_offset0 = self.current_offset(self.topic, 0)
|
||||
start_offset1 = self.current_offset(self.topic, 1)
|
||||
@@ -185,6 +202,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
|
||||
self.assert_fetch_offset(0, start_offset0, [ self.msg("one") ])
|
||||
producer.stop()
|
||||
|
||||
@kafka_versions("all")
|
||||
def test_acks_local_write(self):
|
||||
start_offset0 = self.current_offset(self.topic, 0)
|
||||
start_offset1 = self.current_offset(self.topic, 1)
|
||||
@@ -197,6 +215,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
|
||||
|
||||
producer.stop()
|
||||
|
||||
@kafka_versions("all")
|
||||
def test_acks_cluster_commit(self):
|
||||
start_offset0 = self.current_offset(self.topic, 0)
|
||||
start_offset1 = self.current_offset(self.topic, 1)
|
||||
@@ -211,6 +230,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
|
||||
|
||||
producer.stop()
|
||||
|
||||
@kafka_versions("all")
|
||||
def test_batched_simple_producer__triggers_by_message(self):
|
||||
start_offset0 = self.current_offset(self.topic, 0)
|
||||
start_offset1 = self.current_offset(self.topic, 1)
|
||||
@@ -259,6 +279,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
|
||||
|
||||
producer.stop()
|
||||
|
||||
@kafka_versions("all")
|
||||
def test_batched_simple_producer__triggers_by_time(self):
|
||||
start_offset0 = self.current_offset(self.topic, 0)
|
||||
start_offset1 = self.current_offset(self.topic, 1)
|
||||
@@ -310,6 +331,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
|
||||
|
||||
producer.stop()
|
||||
|
||||
@kafka_versions("all")
|
||||
def test_async_simple_producer(self):
|
||||
start_offset0 = self.current_offset(self.topic, 0)
|
||||
start_offset1 = self.current_offset(self.topic, 1)
|
||||
@@ -322,6 +344,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
|
||||
|
||||
producer.stop()
|
||||
|
||||
@kafka_versions("all")
|
||||
def test_async_keyed_producer(self):
|
||||
start_offset0 = self.current_offset(self.topic, 0)
|
||||
start_offset1 = self.current_offset(self.topic, 1)
|
||||
|
@@ -12,7 +12,7 @@ from kafka.common import (
|
||||
LeaderUnavailableError, PartitionUnavailableError
|
||||
)
|
||||
from kafka.codec import (
|
||||
has_gzip, has_snappy, gzip_encode, gzip_decode,
|
||||
has_snappy, gzip_encode, gzip_decode,
|
||||
snappy_encode, snappy_decode
|
||||
)
|
||||
from kafka.protocol import (
|
||||
@@ -29,7 +29,6 @@ class TestProtocol(unittest.TestCase):
|
||||
self.assertEqual(msg.key, key)
|
||||
self.assertEqual(msg.value, payload)
|
||||
|
||||
@unittest.skipUnless(has_gzip(), "gzip not available")
|
||||
def test_create_gzip(self):
|
||||
payloads = ["v1", "v2"]
|
||||
msg = create_gzip_message(payloads)
|
||||
@@ -197,7 +196,6 @@ class TestProtocol(unittest.TestCase):
|
||||
self.assertEqual(returned_offset2, 1)
|
||||
self.assertEqual(decoded_message2, create_message("v2", "k2"))
|
||||
|
||||
@unittest.skipUnless(has_gzip(), "Gzip not available")
|
||||
def test_decode_message_gzip(self):
|
||||
gzip_encoded = ('\xc0\x11\xb2\xf0\x00\x01\xff\xff\xff\xff\x00\x00\x000'
|
||||
'\x1f\x8b\x08\x00\xa1\xc1\xc5R\x02\xffc`\x80\x03\x01'
|
||||
|
@@ -13,7 +13,6 @@ from kafka import KafkaClient
|
||||
|
||||
__all__ = [
|
||||
'random_string',
|
||||
'skip_integration',
|
||||
'ensure_topic_creation',
|
||||
'get_open_port',
|
||||
'kafka_versions',
|
||||
@@ -25,15 +24,17 @@ def random_string(l):
|
||||
s = "".join(random.choice(string.letters) for i in xrange(l))
|
||||
return s
|
||||
|
||||
def skip_integration():
|
||||
return os.environ.get('SKIP_INTEGRATION')
|
||||
|
||||
def kafka_versions(*versions):
|
||||
def kafka_versions(func):
|
||||
@functools.wraps(func)
|
||||
def wrapper(self):
|
||||
if os.environ.get('KAFKA_VERSION', None) not in versions:
|
||||
kafka_version = os.environ.get('KAFKA_VERSION')
|
||||
|
||||
if not kafka_version:
|
||||
self.skipTest("no kafka version specified")
|
||||
elif 'all' not in versions and kafka_version not in versions:
|
||||
self.skipTest("unsupported kafka version")
|
||||
|
||||
return func(self)
|
||||
return wrapper
|
||||
return kafka_versions
|
||||
@@ -61,6 +62,9 @@ class KafkaIntegrationTestCase(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(KafkaIntegrationTestCase, self).setUp()
|
||||
if not os.environ.get('KAFKA_VERSION'):
|
||||
return
|
||||
|
||||
if not self.topic:
|
||||
self.topic = "%s-%s" % (self.id()[self.id().rindex(".") + 1:], random_string(10))
|
||||
|
||||
@@ -73,6 +77,9 @@ class KafkaIntegrationTestCase(unittest.TestCase):
|
||||
|
||||
def tearDown(self):
|
||||
super(KafkaIntegrationTestCase, self).tearDown()
|
||||
if not os.environ.get('KAFKA_VERSION'):
|
||||
return
|
||||
|
||||
if self.create_client:
|
||||
self.client.close()
|
||||
|
||||
|
Reference in New Issue
Block a user