Update fixtures to eliminate extraneous logging on non-errors, split out mostly unrelated service.py, fix test in client_integration to use get_open_port, fix unintended import cascade in test_producer_integration

This commit is contained in:
Mark Roberts
2014-04-22 23:14:23 -07:00
parent d35b8fd81f
commit b6262e4c0b
5 changed files with 238 additions and 219 deletions

View File

@@ -1,204 +1,69 @@
import logging
import glob import glob
import os import os
import re
import select
import shutil import shutil
import socket
import subprocess import subprocess
import sys
import tempfile import tempfile
import threading
import time
import uuid import uuid
from urlparse import urlparse from urlparse import urlparse
from .service import ExternalService, SpawnedService
from .testutil import get_open_port
class Fixture(object):
project_root = os.environ.get('PROJECT_ROOT', os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
scala_version = os.environ.get("SCALA_VERSION", '2.8.0')
kafka_root = os.environ.get("KAFKA_ROOT", os.path.join(project_root, "kafka-src"))
ivy_root = os.environ.get('IVY_ROOT', os.path.expanduser("~/.ivy2/cache"))
@classmethod
def test_resource(cls, filename):
return os.path.join(cls.project_root, "test", "resources", filename)
@classmethod
def test_classpath(cls):
# ./kafka-src/bin/kafka-run-class.sh is the authority.
jars = ["."]
# assume all dependencies have been packaged into one jar with sbt-assembly's task "assembly-package-dependency"
jars.extend(glob.glob(cls.kafka_root + "/core/target/scala-%s/*.jar" % cls.scala_version))
jars = filter(os.path.exists, map(os.path.abspath, jars))
return ":".join(jars)
@classmethod
def kafka_run_class_args(cls, *args):
# ./kafka-src/bin/kafka-run-class.sh is the authority.
result = ["java", "-Xmx512M", "-server"]
result.append("-Dlog4j.configuration=file:%s" % cls.test_resource("log4j.properties"))
result.append("-Dcom.sun.management.jmxremote")
result.append("-Dcom.sun.management.jmxremote.authenticate=false")
result.append("-Dcom.sun.management.jmxremote.ssl=false")
result.append("-cp")
result.append(cls.test_classpath())
result.extend(args)
return result
@classmethod
def render_template(cls, source_file, target_file, binding):
with open(source_file, "r") as handle:
template = handle.read()
with open(target_file, "w") as handle:
handle.write(template.format(**binding))
PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), "..")) class ZookeeperFixture(Fixture):
KAFKA_ROOT = os.path.join(PROJECT_ROOT, "kafka-src") @classmethod
IVY_ROOT = os.path.expanduser("~/.ivy2/cache") def instance(cls):
SCALA_VERSION = '2.8.0'
if "PROJECT_ROOT" in os.environ:
PROJECT_ROOT = os.environ["PROJECT_ROOT"]
if "KAFKA_ROOT" in os.environ:
KAFKA_ROOT = os.environ["KAFKA_ROOT"]
if "IVY_ROOT" in os.environ:
IVY_ROOT = os.environ["IVY_ROOT"]
if "SCALA_VERSION" in os.environ:
SCALA_VERSION = os.environ["SCALA_VERSION"]
def test_resource(file):
return os.path.join(PROJECT_ROOT, "test", "resources", file)
def test_classpath():
# ./kafka-src/bin/kafka-run-class.sh is the authority.
jars = ["."]
# assume all dependencies have been packaged into one jar with sbt-assembly's task "assembly-package-dependency"
jars.extend(glob.glob(KAFKA_ROOT + "/core/target/scala-%s/*.jar" % SCALA_VERSION))
jars = filter(os.path.exists, map(os.path.abspath, jars))
return ":".join(jars)
def kafka_run_class_args(*args):
# ./kafka-src/bin/kafka-run-class.sh is the authority.
result = ["java", "-Xmx512M", "-server"]
result.append("-Dlog4j.configuration=file:%s" % test_resource("log4j.properties"))
result.append("-Dcom.sun.management.jmxremote")
result.append("-Dcom.sun.management.jmxremote.authenticate=false")
result.append("-Dcom.sun.management.jmxremote.ssl=false")
result.append("-cp")
result.append(test_classpath())
result.extend(args)
return result
def get_open_port():
sock = socket.socket()
sock.bind(("", 0))
port = sock.getsockname()[1]
sock.close()
return port
def render_template(source_file, target_file, binding):
with open(source_file, "r") as handle:
template = handle.read()
with open(target_file, "w") as handle:
handle.write(template.format(**binding))
class ExternalService(object):
def __init__(self, host, port):
print("Using already running service at %s:%d" % (host, port))
self.host = host
self.port = port
def open(self):
pass
def close(self):
pass
class SpawnedService(threading.Thread):
def __init__(self, args=[]):
threading.Thread.__init__(self)
self.args = args
self.captured_stdout = ""
self.captured_stderr = ""
self.stdout_file = None
self.stderr_file = None
self.capture_stdout = True
self.capture_stderr = True
self.show_stdout = True
self.show_stderr = True
self.should_die = threading.Event()
def configure_stdout(self, file=None, capture=True, show=False):
self.stdout_file = file
self.capture_stdout = capture
self.show_stdout = show
def configure_stderr(self, file=None, capture=False, show=True):
self.stderr_file = file
self.capture_stderr = capture
self.show_stderr = show
def run(self):
stdout_handle = None
stderr_handle = None
try:
if self.stdout_file:
stdout_handle = open(self.stdout_file, "w")
if self.stderr_file:
stderr_handle = open(self.stderr_file, "w")
self.run_with_handles(stdout_handle, stderr_handle)
finally:
if stdout_handle:
stdout_handle.close()
if stderr_handle:
stderr_handle.close()
def run_with_handles(self, stdout_handle, stderr_handle):
child = subprocess.Popen(
self.args,
bufsize=1,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
alive = True
while True:
(rds, wds, xds) = select.select([child.stdout, child.stderr], [], [], 1)
if child.stdout in rds:
line = child.stdout.readline()
if stdout_handle:
stdout_handle.write(line)
stdout_handle.flush()
if self.capture_stdout:
self.captured_stdout += line
if self.show_stdout:
sys.stdout.write(line)
sys.stdout.flush()
if child.stderr in rds:
line = child.stderr.readline()
if stderr_handle:
stderr_handle.write(line)
stderr_handle.flush()
if self.capture_stderr:
self.captured_stderr += line
if self.show_stderr:
sys.stderr.write(line)
sys.stderr.flush()
if self.should_die.is_set():
child.terminate()
alive = False
if child.poll() is not None:
if not alive:
break
else:
raise RuntimeError("Subprocess has died. Aborting.")
def wait_for(self, pattern, timeout=10):
t1 = time.time()
while True:
t2 = time.time()
if t2 - t1 >= timeout:
raise RuntimeError("Waiting for %r timed out" % pattern)
if re.search(pattern, self.captured_stdout) is not None:
return
if re.search(pattern, self.captured_stderr) is not None:
return
time.sleep(0.1)
def start(self):
threading.Thread.start(self)
def stop(self):
self.should_die.set()
self.join()
class ZookeeperFixture(object):
@staticmethod
def instance():
if "ZOOKEEPER_URI" in os.environ: if "ZOOKEEPER_URI" in os.environ:
parse = urlparse(os.environ["ZOOKEEPER_URI"]) parse = urlparse(os.environ["ZOOKEEPER_URI"])
(host, port) = (parse.hostname, parse.port) (host, port) = (parse.hostname, parse.port)
fixture = ExternalService(host, port) fixture = ExternalService(host, port)
else: else:
(host, port) = ("127.0.0.1", get_open_port()) (host, port) = ("127.0.0.1", get_open_port())
fixture = ZookeeperFixture(host, port) fixture = cls(host, port)
fixture.open()
fixture.open()
return fixture return fixture
def __init__(self, host, port): def __init__(self, host, port):
@@ -209,22 +74,22 @@ class ZookeeperFixture(object):
self.child = None self.child = None
def out(self, message): def out(self, message):
print("*** Zookeeper [%s:%d]: %s" % (self.host, self.port, message)) logging.info("*** Zookeeper [%s:%d]: %s", self.host, self.port, message)
def open(self): def open(self):
self.tmp_dir = tempfile.mkdtemp() self.tmp_dir = tempfile.mkdtemp()
self.out("Running local instance...") self.out("Running local instance...")
print(" host = %s" % self.host) logging.info(" host = %s", self.host)
print(" port = %s" % self.port) logging.info(" port = %s", self.port)
print(" tmp_dir = %s" % self.tmp_dir) logging.info(" tmp_dir = %s", self.tmp_dir)
# Generate configs # Generate configs
template = test_resource("zookeeper.properties") template = self.test_resource("zookeeper.properties")
properties = os.path.join(self.tmp_dir, "zookeeper.properties") properties = os.path.join(self.tmp_dir, "zookeeper.properties")
render_template(template, properties, vars(self)) self.render_template(template, properties, vars(self))
# Configure Zookeeper child process # Configure Zookeeper child process
self.child = SpawnedService(kafka_run_class_args( self.child = SpawnedService(self.kafka_run_class_args(
"org.apache.zookeeper.server.quorum.QuorumPeerMain", "org.apache.zookeeper.server.quorum.QuorumPeerMain",
properties properties
)) ))
@@ -245,9 +110,9 @@ class ZookeeperFixture(object):
shutil.rmtree(self.tmp_dir) shutil.rmtree(self.tmp_dir)
class KafkaFixture(object): class KafkaFixture(Fixture):
@staticmethod @classmethod
def instance(broker_id, zk_host, zk_port, zk_chroot=None, replicas=1, partitions=2): def instance(cls, broker_id, zk_host, zk_port, zk_chroot=None, replicas=1, partitions=2):
if zk_chroot is None: if zk_chroot is None:
zk_chroot = "kafka-python_" + str(uuid.uuid4()).replace("-", "_") zk_chroot = "kafka-python_" + str(uuid.uuid4()).replace("-", "_")
if "KAFKA_URI" in os.environ: if "KAFKA_URI" in os.environ:
@@ -278,7 +143,7 @@ class KafkaFixture(object):
self.running = False self.running = False
def out(self, message): def out(self, message):
print("*** Kafka [%s:%d]: %s" % (self.host, self.port, message)) logging.info("*** Kafka [%s:%d]: %s", self.host, self.port, message)
def open(self): def open(self):
if self.running: if self.running:
@@ -287,27 +152,27 @@ class KafkaFixture(object):
self.tmp_dir = tempfile.mkdtemp() self.tmp_dir = tempfile.mkdtemp()
self.out("Running local instance...") self.out("Running local instance...")
print(" host = %s" % self.host) logging.info(" host = %s", self.host)
print(" port = %s" % self.port) logging.info(" port = %s", self.port)
print(" broker_id = %s" % self.broker_id) logging.info(" broker_id = %s", self.broker_id)
print(" zk_host = %s" % self.zk_host) logging.info(" zk_host = %s", self.zk_host)
print(" zk_port = %s" % self.zk_port) logging.info(" zk_port = %s", self.zk_port)
print(" zk_chroot = %s" % self.zk_chroot) logging.info(" zk_chroot = %s", self.zk_chroot)
print(" replicas = %s" % self.replicas) logging.info(" replicas = %s", self.replicas)
print(" partitions = %s" % self.partitions) logging.info(" partitions = %s", self.partitions)
print(" tmp_dir = %s" % self.tmp_dir) logging.info(" tmp_dir = %s", self.tmp_dir)
# Create directories # Create directories
os.mkdir(os.path.join(self.tmp_dir, "logs")) os.mkdir(os.path.join(self.tmp_dir, "logs"))
os.mkdir(os.path.join(self.tmp_dir, "data")) os.mkdir(os.path.join(self.tmp_dir, "data"))
# Generate configs # Generate configs
template = test_resource("kafka.properties") template = self.test_resource("kafka.properties")
properties = os.path.join(self.tmp_dir, "kafka.properties") properties = os.path.join(self.tmp_dir, "kafka.properties")
render_template(template, properties, vars(self)) self.render_template(template, properties, vars(self))
# Configure Kafka child process # Configure Kafka child process
self.child = SpawnedService(kafka_run_class_args( self.child = SpawnedService(self.kafka_run_class_args(
"kafka.Kafka", properties "kafka.Kafka", properties
)) ))
self.child.configure_stdout(os.path.join(self.tmp_dir, "stdout.txt")) self.child.configure_stdout(os.path.join(self.tmp_dir, "stdout.txt"))
@@ -315,13 +180,18 @@ class KafkaFixture(object):
# Party! # Party!
self.out("Creating Zookeeper chroot node...") self.out("Creating Zookeeper chroot node...")
proc = subprocess.Popen(kafka_run_class_args( proc = subprocess.Popen(self.kafka_run_class_args(
"org.apache.zookeeper.ZooKeeperMain", "org.apache.zookeeper.ZooKeeperMain",
"-server", "%s:%d" % (self.zk_host, self.zk_port), "-server", "%s:%d" % (self.zk_host, self.zk_port),
"create", "/%s" % self.zk_chroot, "kafka-python" "create", "/%s" % self.zk_chroot, "kafka-python"
)) ),
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
if proc.wait() != 0: if proc.wait() != 0:
self.out("Failed to create Zookeeper chroot node") self.out("Failed to create Zookeeper chroot node")
self.out(proc.stdout)
self.out(proc.stderr)
raise RuntimeError("Failed to create Zookeeper chroot node") raise RuntimeError("Failed to create Zookeeper chroot node")
self.out("Done!") self.out("Done!")

129
test/service.py Normal file
View File

@@ -0,0 +1,129 @@
import re
import select
import subprocess
import sys
import threading
import time
__all__ = [
'ExternalService',
'SpawnedService',
]
class ExternalService(object):
def __init__(self, host, port):
print("Using already running service at %s:%d" % (host, port))
self.host = host
self.port = port
def open(self):
pass
def close(self):
pass
class SpawnedService(threading.Thread):
def __init__(self, args=[]):
threading.Thread.__init__(self)
self.args = args
self.captured_stdout = ""
self.captured_stderr = ""
self.stdout_file = None
self.stderr_file = None
self.capture_stdout = True
self.capture_stderr = True
self.show_stdout = True
self.show_stderr = True
self.should_die = threading.Event()
def configure_stdout(self, file=None, capture=True, show=False):
self.stdout_file = file
self.capture_stdout = capture
self.show_stdout = show
def configure_stderr(self, file=None, capture=False, show=True):
self.stderr_file = file
self.capture_stderr = capture
self.show_stderr = show
def run(self):
stdout_handle = None
stderr_handle = None
try:
if self.stdout_file:
stdout_handle = open(self.stdout_file, "w")
if self.stderr_file:
stderr_handle = open(self.stderr_file, "w")
self.run_with_handles(stdout_handle, stderr_handle)
finally:
if stdout_handle:
stdout_handle.close()
if stderr_handle:
stderr_handle.close()
def run_with_handles(self, stdout_handle, stderr_handle):
child = subprocess.Popen(
self.args,
bufsize=1,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
alive = True
while True:
(rds, wds, xds) = select.select([child.stdout, child.stderr], [], [], 1)
if child.stdout in rds:
line = child.stdout.readline()
if stdout_handle:
stdout_handle.write(line)
stdout_handle.flush()
if self.capture_stdout:
self.captured_stdout += line
if self.show_stdout:
sys.stdout.write(line)
sys.stdout.flush()
if child.stderr in rds:
line = child.stderr.readline()
if stderr_handle:
stderr_handle.write(line)
stderr_handle.flush()
if self.capture_stderr:
self.captured_stderr += line
if self.show_stderr:
sys.stderr.write(line)
sys.stderr.flush()
if self.should_die.is_set():
child.terminate()
alive = False
if child.poll() is not None:
if not alive:
break
else:
raise RuntimeError("Subprocess has died. Aborting.")
def wait_for(self, pattern, timeout=10):
t1 = time.time()
while True:
t2 = time.time()
if t2 - t1 >= timeout:
raise RuntimeError("Waiting for %r timed out" % pattern)
if re.search(pattern, self.captured_stdout) is not None:
return
if re.search(pattern, self.captured_stderr) is not None:
return
time.sleep(0.1)
def start(self):
threading.Thread.start(self)
def stop(self):
self.should_die.set()
self.join()

View File

@@ -22,11 +22,12 @@ class TestKafkaClientIntegration(KafkaIntegrationTestCase):
def test_timeout(self): def test_timeout(self):
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.bind(('localhost', 14567)) server_port = get_open_port()
server_socket.bind(('localhost', server_port))
with Timer() as t: with Timer() as t:
with self.assertRaises((socket.timeout, socket.error)): with self.assertRaises((socket.timeout, socket.error)):
conn = kafka.conn.KafkaConnection("localhost", 14567, 1.0) conn = kafka.conn.KafkaConnection("localhost", server_port, 1.0)
self.assertGreaterEqual(t.interval, 1.0) self.assertGreaterEqual(t.interval, 1.0)
def test_consume_none(self): def test_consume_none(self):

View File

@@ -1,3 +1,4 @@
import uuid
import time import time
import unittest import unittest

View File

@@ -1,13 +1,24 @@
import uuid import logging
import time
import unittest
import os import os
import random import random
import socket
import string import string
import logging import time
import unittest
import uuid
from kafka.common import OffsetRequest from kafka.common import OffsetRequest
from kafka import KafkaClient from kafka import KafkaClient
__all__ = [
'random_string',
'skip_integration',
'ensure_topic_creation',
'get_open_port',
'KafkaIntegrationTestCase',
'Timer',
]
def random_string(l): def random_string(l):
s = "".join(random.choice(string.letters) for i in xrange(l)) s = "".join(random.choice(string.letters) for i in xrange(l))
return s return s
@@ -25,6 +36,13 @@ def ensure_topic_creation(client, topic_name, timeout = 30):
client.load_metadata_for_topics(topic_name) client.load_metadata_for_topics(topic_name)
time.sleep(1) time.sleep(1)
def get_open_port():
sock = socket.socket()
sock.bind(("", 0))
port = sock.getsockname()[1]
sock.close()
return port
class KafkaIntegrationTestCase(unittest.TestCase): class KafkaIntegrationTestCase(unittest.TestCase):
create_client = True create_client = True
topic = None topic = None