Refactor and update integration tests
This commit is contained in:
committed by
David Arthur
parent
6704050e50
commit
e073b3310d
@@ -11,6 +11,13 @@ except ImportError:
|
||||
log.warn("Snappy codec not available")
|
||||
hasSnappy = False
|
||||
|
||||
def has_gzip():
|
||||
return True
|
||||
|
||||
|
||||
def has_snappy():
|
||||
return hasSnappy
|
||||
|
||||
|
||||
def gzip_encode(payload):
|
||||
buf = StringIO()
|
||||
|
||||
@@ -83,6 +83,11 @@ class SimpleConsumer(object):
|
||||
for partition in self.client.topic_partitions[topic]:
|
||||
self.offsets[partition] = 0
|
||||
|
||||
def stop(self):
|
||||
if self.commit_timer is not None:
|
||||
self.commit_timer.stop()
|
||||
self.commit()
|
||||
|
||||
def seek(self, offset, whence):
|
||||
"""
|
||||
Alter the current offset in the consumer, similar to fseek
|
||||
|
||||
@@ -1,147 +1,337 @@
|
||||
import glob
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
import select
|
||||
import shlex
|
||||
import shutil
|
||||
import socket
|
||||
import subprocess
|
||||
import sys
|
||||
import tempfile
|
||||
from threading import Thread, Event
|
||||
import threading
|
||||
import time
|
||||
import unittest
|
||||
import uuid
|
||||
|
||||
from urlparse import urlparse
|
||||
|
||||
from kafka import *
|
||||
from kafka.common import *
|
||||
from kafka import * # noqa
|
||||
from kafka.common import * # noqa
|
||||
from kafka.codec import has_gzip, has_snappy
|
||||
|
||||
|
||||
def kafka_log4j():
|
||||
return os.path.abspath("./test/resources/log4j.properties")
|
||||
|
||||
|
||||
def kafka_classpath():
|
||||
# ./kafka-src/bin/kafka-run-class.sh is the authority.
|
||||
ivy = os.path.expanduser("~/.ivy2/cache")
|
||||
base = os.path.abspath("./kafka-src/")
|
||||
|
||||
jars = ["."]
|
||||
jars.append(ivy + "/org.xerial.snappy/snappy-java/bundles/snappy-java-1.0.4.1.jar")
|
||||
jars.append(ivy + "/org.scala-lang/scala-library/jars/scala-library-2.8.0.jar")
|
||||
jars.append(ivy + "/org.scala-lang/scala-compiler/jars/scala-compiler-2.8.0.jar")
|
||||
jars.append(ivy + "/log4j/log4j/jars/log4j-1.2.15.jar")
|
||||
jars.append(ivy + "/org.slf4j/slf4j-api/jars/slf4j-api-1.6.4.jar")
|
||||
jars.append(ivy + "/org.apache.zookeeper/zookeeper/jars/zookeeper-3.3.4.jar")
|
||||
jars.append(ivy + "/net.sf.jopt-simple/jopt-simple/jars/jopt-simple-3.2.jar")
|
||||
jars.extend(glob.glob(base + "/core/target/scala-2.8.0/*.jar"))
|
||||
jars.extend(glob.glob(base + "/core/lib/*.jar"))
|
||||
jars.extend(glob.glob(base + "/perf/target/scala-2.8.0/kafka*.jar"))
|
||||
|
||||
jars = filter(os.path.exists, map(os.path.abspath, jars))
|
||||
return ":".join(jars)
|
||||
|
||||
|
||||
def kafka_run_class_args(*args):
|
||||
# ./kafka-src/bin/kafka-run-class.sh is the authority.
|
||||
result = ["java", "-Xmx512M", "-server"]
|
||||
result.append("-Dlog4j.configuration=file:%s" % kafka_log4j())
|
||||
result.append("-Dcom.sun.management.jmxremote")
|
||||
result.append("-Dcom.sun.management.jmxremote.authenticate=false")
|
||||
result.append("-Dcom.sun.management.jmxremote.ssl=false")
|
||||
result.append("-cp")
|
||||
result.append(kafka_classpath())
|
||||
result.extend(args)
|
||||
return result
|
||||
|
||||
|
||||
def get_open_port():
|
||||
sock = socket.socket()
|
||||
sock.bind(('',0))
|
||||
sock.bind(("", 0))
|
||||
port = sock.getsockname()[1]
|
||||
sock.close()
|
||||
return port
|
||||
|
||||
def build_kafka_classpath():
|
||||
baseDir = "./kafka-src"
|
||||
jars = []
|
||||
jars += glob.glob(os.path.join(baseDir, "project/boot/scala-2.8.0/lib/*.jar"))
|
||||
jars += glob.glob(os.path.join(baseDir, "core/target/scala_2.8.0/*.jar"))
|
||||
jars += glob.glob(os.path.join(baseDir, "core/lib/*.jar"))
|
||||
jars += glob.glob(os.path.join(baseDir, "core/lib_managed/scala_2.8.0/compile/*.jar"))
|
||||
jars += glob.glob(os.path.join(baseDir, "core/target/scala-2.8.0/kafka_2.8.0-*.jar"))
|
||||
jars += glob.glob(os.path.join(baseDir, "/Users/mumrah/.ivy2/cache/org.slf4j/slf4j-api/jars/slf4j-api-1.6.4.jar"))
|
||||
cp = ":".join(["."] + [os.path.abspath(jar) for jar in jars])
|
||||
cp += ":" + os.path.abspath(os.path.join(baseDir, "conf/log4j.properties"))
|
||||
return cp
|
||||
|
||||
class KafkaFixture(Thread):
|
||||
def __init__(self, host, port, broker_id, zk_chroot=None):
|
||||
Thread.__init__(self)
|
||||
self.broker_id = broker_id
|
||||
self.zk_chroot = zk_chroot
|
||||
self.port = port
|
||||
self.capture = ""
|
||||
self.shouldDie = Event()
|
||||
self.tmpDir = tempfile.mkdtemp()
|
||||
print("tmp dir: %s" % self.tmpDir)
|
||||
|
||||
def run(self):
|
||||
# Create the log directory
|
||||
logDir = os.path.join(self.tmpDir, 'logs')
|
||||
os.mkdir(logDir)
|
||||
stdout = open(os.path.join(logDir, 'stdout'), 'w')
|
||||
|
||||
# Create the config file
|
||||
if self.zk_chroot is None:
|
||||
self.zk_chroot= "kafka-python_%s" % self.tmpDir.replace("/", "_")
|
||||
logConfig = "test/resources/log4j.properties"
|
||||
configFile = os.path.join(self.tmpDir, 'server.properties')
|
||||
f = open('test/resources/server.properties', 'r')
|
||||
props = f.read()
|
||||
f = open(configFile, 'w')
|
||||
f.write(props % {'broker.id': self.broker_id,
|
||||
'kafka.port': self.port,
|
||||
'kafka.tmp.dir': logDir,
|
||||
'kafka.partitions': 2,
|
||||
'zk.chroot': self.zk_chroot})
|
||||
f.close()
|
||||
|
||||
cp = build_kafka_classpath()
|
||||
|
||||
# Create the Zookeeper chroot
|
||||
args = shlex.split("java -cp %s org.apache.zookeeper.ZooKeeperMain create /%s kafka-python" % (cp, self.zk_chroot))
|
||||
proc = subprocess.Popen(args)
|
||||
ret = proc.wait()
|
||||
if ret != 0:
|
||||
sys.exit(1)
|
||||
def render_template(source_file, target_file, binding):
|
||||
with open(source_file, "r") as handle:
|
||||
template = handle.read()
|
||||
with open(target_file, "w") as handle:
|
||||
handle.write(template.format(**binding))
|
||||
|
||||
|
||||
# Start Kafka
|
||||
args = shlex.split("java -Xmx256M -server -Dlog4j.configuration=%s -cp %s kafka.Kafka %s" % (logConfig, cp, configFile))
|
||||
proc = subprocess.Popen(args, bufsize=1, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env={"JMX_PORT":"%d" % get_open_port()})
|
||||
|
||||
killed = False
|
||||
while True:
|
||||
(rlist, wlist, xlist) = select.select([proc.stdout], [], [], 1)
|
||||
if proc.stdout in rlist:
|
||||
read = proc.stdout.readline()
|
||||
stdout.write(read)
|
||||
stdout.flush()
|
||||
self.capture += read
|
||||
|
||||
if self.shouldDie.is_set():
|
||||
proc.terminate()
|
||||
killed = True
|
||||
|
||||
if proc.poll() is not None:
|
||||
#shutil.rmtree(self.tmpDir)
|
||||
if killed:
|
||||
break
|
||||
else:
|
||||
raise RuntimeError("Kafka died. Aborting.")
|
||||
|
||||
def wait_for(self, target, timeout=10):
|
||||
t1 = time.time()
|
||||
while True:
|
||||
t2 = time.time()
|
||||
if t2-t1 >= timeout:
|
||||
return False
|
||||
if target in self.capture:
|
||||
return True
|
||||
time.sleep(0.100)
|
||||
|
||||
def close(self):
|
||||
self.shouldDie.set()
|
||||
|
||||
class ExternalKafkaFixture(object):
|
||||
class ExternalServiceFixture(object):
|
||||
def __init__(self, host, port):
|
||||
print("Using already running Kafka at %s:%d" % (host, port))
|
||||
print("Using already running service at %s:%d" % (host, port))
|
||||
|
||||
def open(self):
|
||||
pass
|
||||
|
||||
def close(self):
|
||||
pass
|
||||
|
||||
|
||||
class SubprocessFixture(threading.Thread):
|
||||
def __init__(self, args=[]):
|
||||
threading.Thread.__init__(self)
|
||||
|
||||
self.args = args
|
||||
self.captured_stdout = ""
|
||||
self.captured_stderr = ""
|
||||
self.stdout_file = None
|
||||
self.stderr_file = None
|
||||
self.capture_stdout = True
|
||||
self.capture_stderr = True
|
||||
self.show_stdout = True
|
||||
self.show_stderr = True
|
||||
|
||||
self.should_die = threading.Event()
|
||||
|
||||
def configure_stdout(self, file=None, capture=True, show=False):
|
||||
self.stdout_file = file
|
||||
self.capture_stdout = capture
|
||||
self.show_stdout = show
|
||||
|
||||
def configure_stderr(self, file=None, capture=False, show=True):
|
||||
self.stderr_file = file
|
||||
self.capture_stderr = capture
|
||||
self.show_stderr = show
|
||||
|
||||
def run(self):
|
||||
stdout_handle = None
|
||||
stderr_handle = None
|
||||
try:
|
||||
if self.stdout_file:
|
||||
stdout_handle = open(self.stdout_file, "w")
|
||||
if self.stderr_file:
|
||||
stderr_handle = open(self.stderr_file, "w")
|
||||
self.run_with_handles(stdout_handle, stderr_handle)
|
||||
finally:
|
||||
if stdout_handle:
|
||||
stdout_handle.close()
|
||||
if stderr_handle:
|
||||
stderr_handle.close()
|
||||
|
||||
def run_with_handles(self, stdout_handle, stderr_handle):
|
||||
child = subprocess.Popen(
|
||||
self.args,
|
||||
bufsize=1,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE)
|
||||
alive = True
|
||||
|
||||
while True:
|
||||
(rds, wds, xds) = select.select([child.stdout, child.stderr], [], [], 1)
|
||||
|
||||
if child.stdout in rds:
|
||||
line = child.stdout.readline()
|
||||
if stdout_handle:
|
||||
stdout_handle.write(line)
|
||||
stdout_handle.flush()
|
||||
if self.capture_stdout:
|
||||
self.captured_stdout += line
|
||||
if self.show_stdout:
|
||||
sys.stdout.write(line)
|
||||
sys.stdout.flush()
|
||||
|
||||
if child.stderr in rds:
|
||||
line = child.stderr.readline()
|
||||
if stderr_handle:
|
||||
stderr_handle.write(line)
|
||||
stderr_handle.flush()
|
||||
if self.capture_stderr:
|
||||
self.captured_stderr += line
|
||||
if self.show_stderr:
|
||||
sys.stderr.write(line)
|
||||
sys.stderr.flush()
|
||||
|
||||
if self.should_die.is_set():
|
||||
child.terminate()
|
||||
alive = False
|
||||
|
||||
if child.poll() is not None:
|
||||
if not alive:
|
||||
break
|
||||
else:
|
||||
raise RuntimeError("Subprocess has died. Aborting.")
|
||||
|
||||
def wait_for(self, pattern, timeout=10):
|
||||
t1 = time.time()
|
||||
while True:
|
||||
t2 = time.time()
|
||||
if t2 - t1 >= timeout:
|
||||
raise RuntimeError("Waiting for %r timed out" % pattern)
|
||||
if re.search(pattern, self.captured_stdout) is not None:
|
||||
return
|
||||
if re.search(pattern, self.captured_stderr) is not None:
|
||||
return
|
||||
time.sleep(0.1)
|
||||
|
||||
def start(self):
|
||||
threading.Thread.start(self)
|
||||
|
||||
def stop(self):
|
||||
self.should_die.set()
|
||||
self.join()
|
||||
|
||||
|
||||
class ZookeeperFixture(object):
|
||||
@staticmethod
|
||||
def instance():
|
||||
if "ZOOKEEPER_URI" in os.environ:
|
||||
parse = urlparse(os.environ["ZOOKEEPER_URI"])
|
||||
(host, port) = (parse.hostname, parse.port)
|
||||
fixture = ExternalServiceFixture(host, port)
|
||||
else:
|
||||
(host, port) = ("127.0.0.1", get_open_port())
|
||||
fixture = ZookeeperFixture(host, port)
|
||||
fixture.open()
|
||||
return fixture
|
||||
|
||||
def __init__(self, host, port):
|
||||
self.host = host
|
||||
self.port = port
|
||||
|
||||
self.tmp_dir = None
|
||||
self.child = None
|
||||
|
||||
def open(self):
|
||||
self.tmp_dir = tempfile.mkdtemp()
|
||||
print("*** Running local Zookeeper instance...")
|
||||
print(" host = %s" % self.host)
|
||||
print(" port = %s" % self.port)
|
||||
print(" tmp_dir = %s" % self.tmp_dir)
|
||||
|
||||
# Generate configs
|
||||
properties = os.path.join(self.tmp_dir, "zookeeper.properties")
|
||||
render_template("./test/resources/zookeeper.properties", properties, vars(self))
|
||||
|
||||
# Configure Zookeeper child process
|
||||
self.child = SubprocessFixture(kafka_run_class_args(
|
||||
"org.apache.zookeeper.server.quorum.QuorumPeerMain",
|
||||
properties
|
||||
))
|
||||
self.child.configure_stdout(os.path.join(self.tmp_dir, "stdout.txt"))
|
||||
self.child.configure_stderr(os.path.join(self.tmp_dir, "stderr.txt"))
|
||||
|
||||
# Party!
|
||||
print("*** Starting Zookeeper...")
|
||||
self.child.start()
|
||||
self.child.wait_for(r"Snapshotting")
|
||||
print("*** Done!")
|
||||
|
||||
def close(self):
|
||||
print("*** Stopping Zookeeper...")
|
||||
self.child.stop()
|
||||
self.child = None
|
||||
print("*** Done!")
|
||||
shutil.rmtree(self.tmp_dir)
|
||||
|
||||
|
||||
class KafkaFixture(object):
|
||||
@staticmethod
|
||||
def instance(broker_id, zk_host, zk_port, zk_chroot=None):
|
||||
if zk_chroot is None:
|
||||
zk_chroot = "kafka-python_" + str(uuid.uuid4()).replace("-", "_")
|
||||
if "KAFKA_URI" in os.environ:
|
||||
parse = urlparse(os.environ["KAFKA_URI"])
|
||||
(host, port) = (parse.hostname, parse.port)
|
||||
fixture = ExternalServiceFixture(host, port)
|
||||
else:
|
||||
(host, port) = ("localhost", get_open_port())
|
||||
fixture = KafkaFixture(host, port, broker_id, zk_host, zk_port, zk_chroot)
|
||||
fixture.open()
|
||||
return fixture
|
||||
|
||||
def __init__(self, host, port, broker_id, zk_host, zk_port, zk_chroot):
|
||||
self.host = host
|
||||
self.port = port
|
||||
|
||||
self.broker_id = broker_id
|
||||
|
||||
self.zk_host = zk_host
|
||||
self.zk_port = zk_port
|
||||
self.zk_chroot = zk_chroot
|
||||
|
||||
self.tmp_dir = None
|
||||
self.child = None
|
||||
|
||||
def open(self):
|
||||
self.tmp_dir = tempfile.mkdtemp()
|
||||
print("*** Running local Kafka instance")
|
||||
print(" host = %s" % self.host)
|
||||
print(" port = %s" % self.port)
|
||||
print(" broker_id = %s" % self.broker_id)
|
||||
print(" zk_host = %s" % self.zk_host)
|
||||
print(" zk_port = %s" % self.zk_port)
|
||||
print(" zk_chroot = %s" % self.zk_chroot)
|
||||
print(" tmp_dir = %s" % self.tmp_dir)
|
||||
|
||||
# Create directories
|
||||
os.mkdir(os.path.join(self.tmp_dir, "logs"))
|
||||
os.mkdir(os.path.join(self.tmp_dir, "data"))
|
||||
|
||||
# Generate configs
|
||||
properties = os.path.join(self.tmp_dir, "kafka.properties")
|
||||
render_template("./test/resources/kafka.properties", properties, vars(self))
|
||||
|
||||
# Configure Kafka child process
|
||||
self.child = SubprocessFixture(kafka_run_class_args(
|
||||
"kafka.Kafka", properties
|
||||
))
|
||||
self.child.configure_stdout(os.path.join(self.tmp_dir, "stdout.txt"))
|
||||
self.child.configure_stderr(os.path.join(self.tmp_dir, "stderr.txt"))
|
||||
|
||||
# Party!
|
||||
print("*** Creating Zookeeper chroot node...")
|
||||
proc = subprocess.Popen(kafka_run_class_args(
|
||||
"org.apache.zookeeper.ZooKeeperMain",
|
||||
"-server", "%s:%d" % (self.zk_host, self.zk_port),
|
||||
"create", "/%s" % self.zk_chroot, "kafka-python"
|
||||
))
|
||||
if proc.wait() != 0:
|
||||
print("*** Failed to create Zookeeper chroot node")
|
||||
raise RuntimeError("Failed to create Zookeeper chroot node")
|
||||
print("*** Done!")
|
||||
|
||||
print("*** Starting Kafka...")
|
||||
self.child.start()
|
||||
self.child.wait_for(r"\[Kafka Server \d+\], started")
|
||||
print("*** Done!")
|
||||
|
||||
def close(self):
|
||||
print("*** Stopping Kafka...")
|
||||
self.child.stop()
|
||||
self.child = None
|
||||
print("*** Done!")
|
||||
shutil.rmtree(self.tmp_dir)
|
||||
|
||||
|
||||
class TestKafkaClient(unittest.TestCase):
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
if os.environ.has_key('KAFKA_URI'):
|
||||
parse = urlparse(os.environ['KAFKA_URI'])
|
||||
(host, port) = (parse.hostname, parse.port)
|
||||
cls.server = ExternalKafkaFixture(host, port)
|
||||
cls.client = KafkaClient(host, port)
|
||||
else:
|
||||
port = get_open_port()
|
||||
cls.server = KafkaFixture("localhost", port, 0)
|
||||
cls.server.start()
|
||||
cls.server.wait_for("Kafka server started")
|
||||
cls.client = KafkaClient("localhost", port)
|
||||
def setUpClass(cls): # noqa
|
||||
cls.zk = ZookeeperFixture.instance()
|
||||
cls.server = KafkaFixture.instance(0, cls.zk.host, cls.zk.port)
|
||||
cls.client = KafkaClient(cls.server.host, cls.server.port)
|
||||
|
||||
@classmethod
|
||||
def tearDownClass(cls):
|
||||
def tearDownClass(cls): # noqa
|
||||
cls.client.close()
|
||||
cls.server.close()
|
||||
cls.zk.close()
|
||||
|
||||
#####################
|
||||
# Produce Tests #
|
||||
@@ -150,7 +340,7 @@ class TestKafkaClient(unittest.TestCase):
|
||||
def test_produce_many_simple(self):
|
||||
produce = ProduceRequest("test_produce_many_simple", 0, messages=[
|
||||
create_message("Test message %d" % i) for i in range(100)
|
||||
])
|
||||
])
|
||||
|
||||
for resp in self.client.send_produce_request([produce]):
|
||||
self.assertEquals(resp.error, 0)
|
||||
@@ -176,7 +366,7 @@ class TestKafkaClient(unittest.TestCase):
|
||||
def test_produce_10k_simple(self):
|
||||
produce = ProduceRequest("test_produce_10k_simple", 0, messages=[
|
||||
create_message("Test message %d" % i) for i in range(10000)
|
||||
])
|
||||
])
|
||||
|
||||
for resp in self.client.send_produce_request([produce]):
|
||||
self.assertEquals(resp.error, 0)
|
||||
@@ -186,6 +376,8 @@ class TestKafkaClient(unittest.TestCase):
|
||||
self.assertEquals(offset.offsets[0], 10000)
|
||||
|
||||
def test_produce_many_gzip(self):
|
||||
if not has_gzip():
|
||||
return
|
||||
message1 = create_gzip_message(["Gzipped 1 %d" % i for i in range(100)])
|
||||
message2 = create_gzip_message(["Gzipped 2 %d" % i for i in range(100)])
|
||||
|
||||
@@ -199,6 +391,8 @@ class TestKafkaClient(unittest.TestCase):
|
||||
self.assertEquals(offset.offsets[0], 200)
|
||||
|
||||
def test_produce_many_snappy(self):
|
||||
if not has_snappy():
|
||||
return
|
||||
message1 = create_snappy_message(["Snappy 1 %d" % i for i in range(100)])
|
||||
message2 = create_snappy_message(["Snappy 2 %d" % i for i in range(100)])
|
||||
|
||||
@@ -212,6 +406,8 @@ class TestKafkaClient(unittest.TestCase):
|
||||
self.assertEquals(offset.offsets[0], 200)
|
||||
|
||||
def test_produce_mixed(self):
|
||||
if not has_gzip() or not has_snappy():
|
||||
return
|
||||
message1 = create_message("Just a plain message")
|
||||
message2 = create_gzip_message(["Gzipped %d" % i for i in range(100)])
|
||||
message3 = create_snappy_message(["Snappy %d" % i for i in range(100)])
|
||||
@@ -225,7 +421,6 @@ class TestKafkaClient(unittest.TestCase):
|
||||
(offset, ) = self.client.send_offset_request([OffsetRequest("test_produce_mixed", 0, -1, 1)])
|
||||
self.assertEquals(offset.offsets[0], 201)
|
||||
|
||||
|
||||
def test_produce_100k_gzipped(self):
|
||||
req1 = ProduceRequest("test_produce_100k_gzipped", 0, messages=[
|
||||
create_gzip_message(["Gzipped batch 1, message %d" % i for i in range(50000)])
|
||||
@@ -260,7 +455,7 @@ class TestKafkaClient(unittest.TestCase):
|
||||
self.assertEquals(fetch_resp.error, 0)
|
||||
self.assertEquals(fetch_resp.topic, "test_consume_none")
|
||||
self.assertEquals(fetch_resp.partition, 0)
|
||||
|
||||
|
||||
messages = list(fetch_resp.messages)
|
||||
self.assertEquals(len(messages), 0)
|
||||
|
||||
@@ -301,7 +496,7 @@ class TestKafkaClient(unittest.TestCase):
|
||||
fetch1 = FetchRequest("test_produce_consume_many", 0, 0, 1024)
|
||||
|
||||
(fetch_resp1,) = self.client.send_fetch_request([fetch1])
|
||||
|
||||
|
||||
self.assertEquals(fetch_resp1.error, 0)
|
||||
self.assertEquals(fetch_resp1.highwaterMark, 100)
|
||||
messages = list(fetch_resp1.messages)
|
||||
@@ -365,7 +560,7 @@ class TestKafkaClient(unittest.TestCase):
|
||||
(resp,) = self.client.send_offset_fetch_request("group", [req])
|
||||
self.assertEquals(resp.error, 0)
|
||||
self.assertEquals(resp.offset, 42)
|
||||
self.assertEquals(resp.metadata, "") # Metadata isn't stored for now
|
||||
self.assertEquals(resp.metadata, "") # Metadata isn't stored for now
|
||||
|
||||
# Producer Tests
|
||||
|
||||
@@ -389,30 +584,21 @@ class TestKafkaClient(unittest.TestCase):
|
||||
self.assertEquals(len(messages), 1)
|
||||
self.assertEquals(messages[0].message.value, "three")
|
||||
|
||||
class TestConsumer(unittest.TestCase):
|
||||
|
||||
class TestKafkaConsumer(unittest.TestCase):
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
# Broker 0
|
||||
port = get_open_port()
|
||||
cls.server1 = KafkaFixture("localhost", port, 0)
|
||||
cls.server1.start()
|
||||
cls.server1.wait_for("Kafka server started")
|
||||
|
||||
# Broker 1
|
||||
zk = cls.server1.zk_chroot
|
||||
port = get_open_port()
|
||||
cls.server2 = KafkaFixture("localhost", port, 1, zk)
|
||||
cls.server2.start()
|
||||
cls.server2.wait_for("Kafka server started")
|
||||
|
||||
# Client bootstraps from broker 1
|
||||
cls.client = KafkaClient("localhost", port)
|
||||
def setUpClass(cls): # noqa
|
||||
cls.zk = ZookeeperFixture.instance()
|
||||
cls.server1 = KafkaFixture.instance(0, cls.zk.host, cls.zk.port)
|
||||
cls.server2 = KafkaFixture.instance(1, cls.zk.host, cls.zk.port)
|
||||
cls.client = KafkaClient(cls.server2.host, cls.server2.port)
|
||||
|
||||
@classmethod
|
||||
def tearDownClass(cls):
|
||||
def tearDownClass(cls): # noqa
|
||||
cls.client.close()
|
||||
cls.server1.close()
|
||||
cls.server2.close()
|
||||
cls.zk.close()
|
||||
|
||||
def test_consumer(self):
|
||||
# Produce 100 messages to partition 0
|
||||
@@ -440,7 +626,8 @@ class TestConsumer(unittest.TestCase):
|
||||
all_messages.append(message)
|
||||
|
||||
self.assertEquals(len(all_messages), 200)
|
||||
self.assertEquals(len(all_messages), len(set(all_messages))) # make sure there are no dupes
|
||||
# Make sure there are no duplicates
|
||||
self.assertEquals(len(all_messages), len(set(all_messages)))
|
||||
|
||||
consumer.seek(-10, 2)
|
||||
all_messages = []
|
||||
@@ -456,6 +643,8 @@ class TestConsumer(unittest.TestCase):
|
||||
|
||||
self.assertEquals(len(all_messages), 13)
|
||||
|
||||
consumer.stop()
|
||||
|
||||
def test_pending(self):
|
||||
# Produce 10 messages to partition 0 and 1
|
||||
|
||||
@@ -477,7 +666,9 @@ class TestConsumer(unittest.TestCase):
|
||||
self.assertEquals(consumer.pending(), 20)
|
||||
self.assertEquals(consumer.pending(partitions=[0]), 10)
|
||||
self.assertEquals(consumer.pending(partitions=[1]), 10)
|
||||
consumer.stop()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
logging.basicConfig(level=logging.DEBUG)
|
||||
unittest.main()
|
||||
unittest.main()
|
||||
|
||||
58
test/resources/kafka.properties
Normal file
58
test/resources/kafka.properties
Normal file
@@ -0,0 +1,58 @@
|
||||
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
# contributor license agreements. See the NOTICE file distributed with
|
||||
# this work for additional information regarding copyright ownership.
|
||||
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
# (the "License"); you may not use this file except in compliance with
|
||||
# the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
############################# Server Basics #############################
|
||||
|
||||
broker.id={broker_id}
|
||||
|
||||
############################# Socket Server Settings #############################
|
||||
|
||||
port={port}
|
||||
host.name={host}
|
||||
|
||||
num.network.threads=2
|
||||
num.io.threads=2
|
||||
|
||||
socket.send.buffer.bytes=1048576
|
||||
socket.receive.buffer.bytes=1048576
|
||||
socket.request.max.bytes=104857600
|
||||
|
||||
############################# Log Basics #############################
|
||||
|
||||
log.dir={tmp_dir}/data
|
||||
num.partitions=2
|
||||
|
||||
############################# Log Flush Policy #############################
|
||||
|
||||
log.flush.interval.messages=10000
|
||||
log.flush.interval.ms=1000
|
||||
|
||||
############################# Log Retention Policy #############################
|
||||
|
||||
log.retention.hours=168
|
||||
log.segment.bytes=536870912
|
||||
log.cleanup.interval.mins=1
|
||||
|
||||
############################# Zookeeper #############################
|
||||
|
||||
zk.connect={zk_host}:{zk_port}/{zk_chroot}
|
||||
zk.connection.timeout.ms=1000000
|
||||
|
||||
kafka.metrics.polling.interval.secs=5
|
||||
kafka.metrics.reporters=kafka.metrics.KafkaCSVMetricsReporter
|
||||
kafka.csv.metrics.dir={tmp_dir}
|
||||
kafka.csv.metrics.reporter.enabled=false
|
||||
|
||||
log.cleanup.policy=delete
|
||||
@@ -12,19 +12,13 @@
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
log4j.rootLogger=TRACE, stdout
|
||||
|
||||
log4j.rootLogger=INFO, stdout
|
||||
|
||||
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
|
||||
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
|
||||
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
|
||||
|
||||
#log4j.appender.fileAppender=org.apache.log4j.FileAppender
|
||||
#log4j.appender.fileAppender.File=kafka-request.log
|
||||
#log4j.appender.fileAppender.layout=org.apache.log4j.PatternLayout
|
||||
#log4j.appender.fileAppender.layout.ConversionPattern= %-4r [%t] %-5p %c %x - %m%n
|
||||
|
||||
|
||||
# Turn on all our debugging info
|
||||
#log4j.logger.kafka=INFO
|
||||
#log4j.logger.org.I0Itec.zkclient.ZkClient=DEBUG
|
||||
|
||||
log4j.logger.kafka=DEBUG, stdout
|
||||
log4j.logger.org.I0Itec.zkclient.ZkClient=INFO, stdout
|
||||
log4j.logger.org.apache.zookeeper=INFO, stdout
|
||||
|
||||
@@ -1,117 +0,0 @@
|
||||
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
# contributor license agreements. See the NOTICE file distributed with
|
||||
# this work for additional information regarding copyright ownership.
|
||||
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
# (the "License"); you may not use this file except in compliance with
|
||||
# the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
# see kafka.server.KafkaConfig for additional details and defaults
|
||||
|
||||
############################# Server Basics #############################
|
||||
|
||||
# The id of the broker. This must be set to a unique integer for each broker.
|
||||
broker.id=%(broker.id)d
|
||||
|
||||
############################# Socket Server Settings #############################
|
||||
|
||||
# The port the socket server listens on
|
||||
port=%(kafka.port)d
|
||||
|
||||
# Hostname the broker will bind to and advertise to producers and consumers.
|
||||
# If not set, the server will bind to all interfaces and advertise the value returned from
|
||||
# from java.net.InetAddress.getCanonicalHostName().
|
||||
#host.name=localhost
|
||||
|
||||
# The number of threads handling network requests
|
||||
num.network.threads=2
|
||||
|
||||
# The number of threads doing disk I/O
|
||||
num.io.threads=2
|
||||
|
||||
# The send buffer (SO_SNDBUF) used by the socket server
|
||||
socket.send.buffer.bytes=1048576
|
||||
|
||||
# The receive buffer (SO_RCVBUF) used by the socket server
|
||||
socket.receive.buffer.bytes=1048576
|
||||
|
||||
# The maximum size of a request that the socket server will accept (protection against OOM)
|
||||
socket.request.max.bytes=104857600
|
||||
|
||||
|
||||
############################# Log Basics #############################
|
||||
|
||||
# The directory under which to store log files
|
||||
log.dir=%(kafka.tmp.dir)s
|
||||
|
||||
# The number of logical partitions per topic per server. More partitions allow greater parallelism
|
||||
# for consumption, but also mean more files.
|
||||
num.partitions=%(kafka.partitions)d
|
||||
|
||||
############################# Log Flush Policy #############################
|
||||
|
||||
# The following configurations control the flush of data to disk. This is the most
|
||||
# important performance knob in kafka.
|
||||
# There are a few important trade-offs here:
|
||||
# 1. Durability: Unflushed data is at greater risk of loss in the event of a crash.
|
||||
# 2. Latency: Data is not made available to consumers until it is flushed (which adds latency).
|
||||
# 3. Throughput: The flush is generally the most expensive operation.
|
||||
# The settings below allow one to configure the flush policy to flush data after a period of time or
|
||||
# every N messages (or both). This can be done globally and overridden on a per-topic basis.
|
||||
|
||||
# The number of messages to accept before forcing a flush of data to disk
|
||||
log.flush.interval.messages=10000
|
||||
|
||||
# The maximum amount of time a message can sit in a log before we force a flush
|
||||
log.flush.interval.ms=1000
|
||||
|
||||
# Per-topic overrides for log.flush.interval.ms
|
||||
#log.flush.intervals.ms.per.topic=topic1:1000, topic2:3000
|
||||
|
||||
############################# Log Retention Policy #############################
|
||||
|
||||
# The following configurations control the disposal of log segments. The policy can
|
||||
# be set to delete segments after a period of time, or after a given size has accumulated.
|
||||
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
|
||||
# from the end of the log.
|
||||
|
||||
# The minimum age of a log file to be eligible for deletion
|
||||
log.retention.hours=168
|
||||
|
||||
# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
|
||||
# segments don't drop below log.retention.bytes.
|
||||
#log.retention.bytes=1073741824
|
||||
|
||||
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
|
||||
log.segment.bytes=536870912
|
||||
|
||||
# The interval at which log segments are checked to see if they can be deleted according
|
||||
# to the retention policies
|
||||
log.cleanup.interval.mins=1
|
||||
|
||||
############################# Zookeeper #############################
|
||||
|
||||
# Zk connection string (see zk docs for details).
|
||||
# This is a comma separated host:port pairs, each corresponding to a zk
|
||||
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
|
||||
# You can also append an optional chroot string to the urls to specify the
|
||||
# root directory for all kafka znodes.
|
||||
zk.connect=localhost:2181/%(zk.chroot)s
|
||||
|
||||
# Timeout in ms for connecting to zookeeper
|
||||
zk.connection.timeout.ms=1000000
|
||||
|
||||
# metrics reporter properties
|
||||
kafka.metrics.polling.interval.secs=5
|
||||
kafka.metrics.reporters=kafka.metrics.KafkaCSVMetricsReporter
|
||||
kafka.csv.metrics.dir=/tmp/kafka_metrics
|
||||
# Disable csv reporting by default.
|
||||
kafka.csv.metrics.reporter.enabled=false
|
||||
|
||||
log.cleanup.policy=delete
|
||||
19
test/resources/zookeeper.properties
Normal file
19
test/resources/zookeeper.properties
Normal file
@@ -0,0 +1,19 @@
|
||||
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
# contributor license agreements. See the NOTICE file distributed with
|
||||
# this work for additional information regarding copyright ownership.
|
||||
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
# (the "License"); you may not use this file except in compliance with
|
||||
# the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
dataDir={tmp_dir}
|
||||
clientPortAddress={host}
|
||||
clientPort={port}
|
||||
maxClientCnxns=0
|
||||
Reference in New Issue
Block a user