Cherry-pick mrtheb/kafka-python 8b9c7e51

Sync tests and fixtures with kafka 0.8.0-beta1 tag

Conflicts:

	README.md
	kafka-src
This commit is contained in:
mrtheb
2013-10-01 15:07:47 -04:00
committed by David Arthur
parent 1d4bc3f631
commit c304e3d4b0
5 changed files with 33 additions and 29 deletions

View File

@@ -18,7 +18,7 @@ Copyright 2013, David Arthur under Apache License, v2.0. See `LICENSE`
# Status # Status
I'm following the version numbers of Kafka, plus one number to indicate the I'm following the version numbers of Kafka, plus one number to indicate the
version of this project. The current version is 0.8.1-1. This version is under version of this project. The current version is 0.8.0-1. This version is under
development, APIs are subject to change. development, APIs are subject to change.
# Usage # Usage
@@ -196,6 +196,7 @@ git submodule update
cd kafka-src cd kafka-src
./sbt update ./sbt update
./sbt package ./sbt package
./sbt assembly-package-dependency
``` ```
And then run the tests. This will actually start up real local Zookeeper And then run the tests. This will actually start up real local Zookeeper

View File

@@ -17,6 +17,7 @@ from urlparse import urlparse
PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), "..")) PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
KAFKA_ROOT = os.path.join(PROJECT_ROOT, "kafka-src") KAFKA_ROOT = os.path.join(PROJECT_ROOT, "kafka-src")
IVY_ROOT = os.path.expanduser("~/.ivy2/cache") IVY_ROOT = os.path.expanduser("~/.ivy2/cache")
SCALA_VERSION = '2.8.0'
if "PROJECT_ROOT" in os.environ: if "PROJECT_ROOT" in os.environ:
PROJECT_ROOT = os.environ["PROJECT_ROOT"] PROJECT_ROOT = os.environ["PROJECT_ROOT"]
@@ -24,6 +25,8 @@ if "KAFKA_ROOT" in os.environ:
KAFKA_ROOT = os.environ["KAFKA_ROOT"] KAFKA_ROOT = os.environ["KAFKA_ROOT"]
if "IVY_ROOT" in os.environ: if "IVY_ROOT" in os.environ:
IVY_ROOT = os.environ["IVY_ROOT"] IVY_ROOT = os.environ["IVY_ROOT"]
if "SCALA_VERSION" in os.environ:
SCALA_VERSION = os.environ["SCALA_VERSION"]
def test_resource(file): def test_resource(file):
@@ -33,16 +36,8 @@ def test_resource(file):
def test_classpath(): def test_classpath():
# ./kafka-src/bin/kafka-run-class.sh is the authority. # ./kafka-src/bin/kafka-run-class.sh is the authority.
jars = ["."] jars = ["."]
jars.append(IVY_ROOT + "/org.xerial.snappy/snappy-java/bundles/snappy-java-1.0.4.1.jar") # assume all dependencies have been packaged into one jar with sbt-assembly's task "assembly-package-dependency"
jars.append(IVY_ROOT + "/org.scala-lang/scala-library/jars/scala-library-2.8.0.jar") jars.extend(glob.glob(KAFKA_ROOT + "/core/target/scala-%s/*.jar" % SCALA_VERSION))
jars.append(IVY_ROOT + "/org.scala-lang/scala-compiler/jars/scala-compiler-2.8.0.jar")
jars.append(IVY_ROOT + "/log4j/log4j/jars/log4j-1.2.15.jar")
jars.append(IVY_ROOT + "/org.slf4j/slf4j-api/jars/slf4j-api-1.6.4.jar")
jars.append(IVY_ROOT + "/org.apache.zookeeper/zookeeper/jars/zookeeper-3.3.4.jar")
jars.append(IVY_ROOT + "/net.sf.jopt-simple/jopt-simple/jars/jopt-simple-3.2.jar")
jars.extend(glob.glob(KAFKA_ROOT + "/core/target/scala-2.8.0/*.jar"))
jars.extend(glob.glob(KAFKA_ROOT + "/core/lib/*.jar"))
jars.extend(glob.glob(KAFKA_ROOT + "/perf/target/scala-2.8.0/kafka*.jar"))
jars = filter(os.path.exists, map(os.path.abspath, jars)) jars = filter(os.path.exists, map(os.path.abspath, jars))
return ":".join(jars) return ":".join(jars)
@@ -314,7 +309,7 @@ class KafkaFixture(object):
print("*** Starting Kafka...") print("*** Starting Kafka...")
self.child.start() self.child.start()
self.child.wait_for(r"\[Kafka Server \d+\], started") self.child.wait_for(r"\[Kafka Server %d\], Started" % self.broker_id)
print("*** Done!") print("*** Done!")
def close(self): def close(self):

View File

@@ -4,9 +4,9 @@
# The ASF licenses this file to You under the Apache License, Version 2.0 # The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with # (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at # the License. You may obtain a copy of the License at
# #
# http://www.apache.org/licenses/LICENSE-2.0 # http://www.apache.org/licenses/LICENSE-2.0
# #
# Unless required by applicable law or agreed to in writing, software # Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, # distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -47,8 +47,8 @@ log.cleanup.interval.mins=1
############################# Zookeeper ############################# ############################# Zookeeper #############################
zk.connect={zk_host}:{zk_port}/{zk_chroot} zookeeper.connect={zk_host}:{zk_port}/{zk_chroot}
zk.connection.timeout.ms=1000000 zookeeper.connection.timeout.ms=1000000
kafka.metrics.polling.interval.secs=5 kafka.metrics.polling.interval.secs=5
kafka.metrics.reporters=kafka.metrics.KafkaCSVMetricsReporter kafka.metrics.reporters=kafka.metrics.KafkaCSVMetricsReporter

View File

@@ -242,6 +242,7 @@ class TestKafkaClient(unittest.TestCase):
# Offset Tests # # Offset Tests #
#################### ####################
@unittest.skip('commmit offset not supported in this version')
def test_commit_fetch_offsets(self): def test_commit_fetch_offsets(self):
req = OffsetCommitRequest("test_commit_fetch_offsets", 0, 42, "metadata") req = OffsetCommitRequest("test_commit_fetch_offsets", 0, 42, "metadata")
(resp,) = self.client.send_offset_commit_request("group", [req]) (resp,) = self.client.send_offset_commit_request("group", [req])
@@ -401,8 +402,9 @@ class TestKafkaClient(unittest.TestCase):
producer.stop() producer.stop()
def test_acks_cluster_commit(self): def test_acks_cluster_commit(self):
producer = SimpleProducer(self.client, "test_acks_cluster_commit", producer = SimpleProducer(
req_acks=SimpleProducer.ACK_AFTER_CLUSTER_COMMIT) self.client, "test_acks_cluster_commit",
req_acks=SimpleProducer.ACK_AFTER_CLUSTER_COMMIT)
resp = producer.send_messages("one") resp = producer.send_messages("one")
self.assertEquals(len(resp), 1) self.assertEquals(len(resp), 1)
@@ -548,11 +550,11 @@ class TestKafkaClient(unittest.TestCase):
class TestConsumer(unittest.TestCase): class TestConsumer(unittest.TestCase):
@classmethod @classmethod
def setUpClass(cls): # noqa def setUpClass(cls):
cls.zk = ZookeeperFixture.instance() cls.zk = ZookeeperFixture.instance()
cls.server1 = KafkaFixture.instance(0, cls.zk.host, cls.zk.port) cls.server1 = KafkaFixture.instance(0, cls.zk.host, cls.zk.port)
cls.server2 = KafkaFixture.instance(1, cls.zk.host, cls.zk.port) cls.server2 = KafkaFixture.instance(1, cls.zk.host, cls.zk.port)
cls.client = KafkaClient(cls.server2.host, cls.server2.port) cls.client = KafkaClient(cls.server2.host, cls.server2.port, bufsize=8192)
@classmethod @classmethod
def tearDownClass(cls): # noqa def tearDownClass(cls): # noqa
@@ -581,7 +583,7 @@ class TestConsumer(unittest.TestCase):
self.assertEquals(resp.offset, 0) self.assertEquals(resp.offset, 0)
# Start a consumer # Start a consumer
consumer = SimpleConsumer(self.client, "group1", "test_simple_consumer") consumer = SimpleConsumer(self.client, "group1", "test_simple_consumer", auto_commit=False)
all_messages = [] all_messages = []
for message in consumer: for message in consumer:
all_messages.append(message) all_messages.append(message)
@@ -604,6 +606,11 @@ class TestConsumer(unittest.TestCase):
self.assertEquals(len(all_messages), 13) self.assertEquals(len(all_messages), 13)
consumer.stop()
def test_simple_consumer_blocking(self):
consumer = SimpleConsumer(self.client, "group1", "test_simple_consumer_blocking", auto_commit=False)
# Blocking API # Blocking API
start = datetime.now() start = datetime.now()
messages = consumer.get_messages(block=True, timeout=5) messages = consumer.get_messages(block=True, timeout=5)
@@ -612,13 +619,13 @@ class TestConsumer(unittest.TestCase):
self.assertEqual(len(messages), 0) self.assertEqual(len(messages), 0)
# Send 10 messages # Send 10 messages
produce = ProduceRequest("test_simple_consumer", 0, messages=[ produce = ProduceRequest("test_simple_consumer_blocking", 0, messages=[
create_message("Test message 0 %d" % i) for i in range(10) create_message("Test message 0 %d" % i) for i in range(10)
]) ])
for resp in self.client.send_produce_request([produce]): for resp in self.client.send_produce_request([produce]):
self.assertEquals(resp.error, 0) self.assertEquals(resp.error, 0)
self.assertEquals(resp.offset, 100) self.assertEquals(resp.offset, 0)
# Fetch 5 messages # Fetch 5 messages
messages = consumer.get_messages(count=5, block=True, timeout=5) messages = consumer.get_messages(count=5, block=True, timeout=5)
@@ -650,7 +657,7 @@ class TestConsumer(unittest.TestCase):
self.assertEquals(resp.error, 0) self.assertEquals(resp.error, 0)
self.assertEquals(resp.offset, 0) self.assertEquals(resp.offset, 0)
consumer = SimpleConsumer(self.client, "group1", "test_simple_pending") consumer = SimpleConsumer(self.client, "group1", "test_simple_pending", auto_commit=False)
self.assertEquals(consumer.pending(), 20) self.assertEquals(consumer.pending(), 20)
self.assertEquals(consumer.pending(partitions=[0]), 10) self.assertEquals(consumer.pending(partitions=[0]), 10)
self.assertEquals(consumer.pending(partitions=[1]), 10) self.assertEquals(consumer.pending(partitions=[1]), 10)
@@ -676,7 +683,7 @@ class TestConsumer(unittest.TestCase):
self.assertEquals(resp.offset, 0) self.assertEquals(resp.offset, 0)
# Start a consumer # Start a consumer
consumer = MultiProcessConsumer(self.client, "grp1", "test_mpconsumer") consumer = MultiProcessConsumer(self.client, "grp1", "test_mpconsumer", auto_commit=False)
all_messages = [] all_messages = []
for message in consumer: for message in consumer:
all_messages.append(message) all_messages.append(message)
@@ -732,7 +739,7 @@ class TestConsumer(unittest.TestCase):
self.assertEquals(resp.error, 0) self.assertEquals(resp.error, 0)
self.assertEquals(resp.offset, 0) self.assertEquals(resp.offset, 0)
consumer = MultiProcessConsumer(self.client, "group1", "test_mppending") consumer = MultiProcessConsumer(self.client, "group1", "test_mppending", auto_commit=False)
self.assertEquals(consumer.pending(), 20) self.assertEquals(consumer.pending(), 20)
self.assertEquals(consumer.pending(partitions=[0]), 10) self.assertEquals(consumer.pending(partitions=[0]), 10)
self.assertEquals(consumer.pending(partitions=[1]), 10) self.assertEquals(consumer.pending(partitions=[1]), 10)
@@ -749,7 +756,7 @@ class TestConsumer(unittest.TestCase):
self.assertEquals(resp.offset, 0) self.assertEquals(resp.offset, 0)
# Produce 10 messages that are too large (bigger than default fetch size) # Produce 10 messages that are too large (bigger than default fetch size)
messages2=[create_message(random_string(5000)) for i in range(10)] messages2 = [create_message(random_string(5000)) for i in range(10)]
produce2 = ProduceRequest("test_large_messages", 0, messages2) produce2 = ProduceRequest("test_large_messages", 0, messages2)
for resp in self.client.send_produce_request([produce2]): for resp in self.client.send_produce_request([produce2]):
@@ -757,12 +764,13 @@ class TestConsumer(unittest.TestCase):
self.assertEquals(resp.offset, 10) self.assertEquals(resp.offset, 10)
# Consumer should still get all of them # Consumer should still get all of them
consumer = SimpleConsumer(self.client, "group1", "test_large_messages") consumer = SimpleConsumer(self.client, "group1", "test_large_messages", auto_commit=False)
all_messages = messages1 + messages2 all_messages = messages1 + messages2
for i, message in enumerate(consumer): for i, message in enumerate(consumer):
self.assertEquals(all_messages[i], message.message) self.assertEquals(all_messages[i], message.message)
self.assertEquals(i, 19) self.assertEquals(i, 19)
def random_string(l): def random_string(l):
s = "".join(random.choice(string.letters) for i in xrange(l)) s = "".join(random.choice(string.letters) for i in xrange(l))
return s return s