If some of the tests stop brokers then error out, the teardown method will try to close the same brokers and fail. This change allows it to continue.
339 lines
11 KiB
Python
339 lines
11 KiB
Python
import glob
|
|
import os
|
|
import re
|
|
import select
|
|
import shutil
|
|
import socket
|
|
import subprocess
|
|
import sys
|
|
import tempfile
|
|
import threading
|
|
import time
|
|
import uuid
|
|
|
|
from urlparse import urlparse
|
|
|
|
|
|
PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
|
|
KAFKA_ROOT = os.path.join(PROJECT_ROOT, "kafka-src")
|
|
IVY_ROOT = os.path.expanduser("~/.ivy2/cache")
|
|
SCALA_VERSION = '2.8.0'
|
|
|
|
if "PROJECT_ROOT" in os.environ:
|
|
PROJECT_ROOT = os.environ["PROJECT_ROOT"]
|
|
if "KAFKA_ROOT" in os.environ:
|
|
KAFKA_ROOT = os.environ["KAFKA_ROOT"]
|
|
if "IVY_ROOT" in os.environ:
|
|
IVY_ROOT = os.environ["IVY_ROOT"]
|
|
if "SCALA_VERSION" in os.environ:
|
|
SCALA_VERSION = os.environ["SCALA_VERSION"]
|
|
|
|
|
|
def test_resource(file):
|
|
return os.path.join(PROJECT_ROOT, "test", "resources", file)
|
|
|
|
|
|
def test_classpath():
|
|
# ./kafka-src/bin/kafka-run-class.sh is the authority.
|
|
jars = ["."]
|
|
# assume all dependencies have been packaged into one jar with sbt-assembly's task "assembly-package-dependency"
|
|
jars.extend(glob.glob(KAFKA_ROOT + "/core/target/scala-%s/*.jar" % SCALA_VERSION))
|
|
|
|
jars = filter(os.path.exists, map(os.path.abspath, jars))
|
|
return ":".join(jars)
|
|
|
|
|
|
def kafka_run_class_args(*args):
|
|
# ./kafka-src/bin/kafka-run-class.sh is the authority.
|
|
result = ["java", "-Xmx512M", "-server"]
|
|
result.append("-Dlog4j.configuration=file:%s" % test_resource("log4j.properties"))
|
|
result.append("-Dcom.sun.management.jmxremote")
|
|
result.append("-Dcom.sun.management.jmxremote.authenticate=false")
|
|
result.append("-Dcom.sun.management.jmxremote.ssl=false")
|
|
result.append("-cp")
|
|
result.append(test_classpath())
|
|
result.extend(args)
|
|
return result
|
|
|
|
|
|
def get_open_port():
|
|
sock = socket.socket()
|
|
sock.bind(("", 0))
|
|
port = sock.getsockname()[1]
|
|
sock.close()
|
|
return port
|
|
|
|
|
|
def render_template(source_file, target_file, binding):
|
|
with open(source_file, "r") as handle:
|
|
template = handle.read()
|
|
with open(target_file, "w") as handle:
|
|
handle.write(template.format(**binding))
|
|
|
|
|
|
class ExternalService(object):
|
|
def __init__(self, host, port):
|
|
print("Using already running service at %s:%d" % (host, port))
|
|
self.host = host
|
|
self.port = port
|
|
|
|
def open(self):
|
|
pass
|
|
|
|
def close(self):
|
|
pass
|
|
|
|
|
|
class SpawnedService(threading.Thread):
|
|
def __init__(self, args=[]):
|
|
threading.Thread.__init__(self)
|
|
|
|
self.args = args
|
|
self.captured_stdout = ""
|
|
self.captured_stderr = ""
|
|
self.stdout_file = None
|
|
self.stderr_file = None
|
|
self.capture_stdout = True
|
|
self.capture_stderr = True
|
|
self.show_stdout = True
|
|
self.show_stderr = True
|
|
|
|
self.should_die = threading.Event()
|
|
|
|
def configure_stdout(self, file=None, capture=True, show=False):
|
|
self.stdout_file = file
|
|
self.capture_stdout = capture
|
|
self.show_stdout = show
|
|
|
|
def configure_stderr(self, file=None, capture=False, show=True):
|
|
self.stderr_file = file
|
|
self.capture_stderr = capture
|
|
self.show_stderr = show
|
|
|
|
def run(self):
|
|
stdout_handle = None
|
|
stderr_handle = None
|
|
try:
|
|
if self.stdout_file:
|
|
stdout_handle = open(self.stdout_file, "w")
|
|
if self.stderr_file:
|
|
stderr_handle = open(self.stderr_file, "w")
|
|
self.run_with_handles(stdout_handle, stderr_handle)
|
|
finally:
|
|
if stdout_handle:
|
|
stdout_handle.close()
|
|
if stderr_handle:
|
|
stderr_handle.close()
|
|
|
|
def run_with_handles(self, stdout_handle, stderr_handle):
|
|
child = subprocess.Popen(
|
|
self.args,
|
|
bufsize=1,
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE)
|
|
alive = True
|
|
|
|
while True:
|
|
(rds, wds, xds) = select.select([child.stdout, child.stderr], [], [], 1)
|
|
|
|
if child.stdout in rds:
|
|
line = child.stdout.readline()
|
|
if stdout_handle:
|
|
stdout_handle.write(line)
|
|
stdout_handle.flush()
|
|
if self.capture_stdout:
|
|
self.captured_stdout += line
|
|
if self.show_stdout:
|
|
sys.stdout.write(line)
|
|
sys.stdout.flush()
|
|
|
|
if child.stderr in rds:
|
|
line = child.stderr.readline()
|
|
if stderr_handle:
|
|
stderr_handle.write(line)
|
|
stderr_handle.flush()
|
|
if self.capture_stderr:
|
|
self.captured_stderr += line
|
|
if self.show_stderr:
|
|
sys.stderr.write(line)
|
|
sys.stderr.flush()
|
|
|
|
if self.should_die.is_set():
|
|
child.terminate()
|
|
alive = False
|
|
|
|
if child.poll() is not None:
|
|
if not alive:
|
|
break
|
|
else:
|
|
raise RuntimeError("Subprocess has died. Aborting.")
|
|
|
|
def wait_for(self, pattern, timeout=10):
|
|
t1 = time.time()
|
|
while True:
|
|
t2 = time.time()
|
|
if t2 - t1 >= timeout:
|
|
raise RuntimeError("Waiting for %r timed out" % pattern)
|
|
if re.search(pattern, self.captured_stdout) is not None:
|
|
return
|
|
if re.search(pattern, self.captured_stderr) is not None:
|
|
return
|
|
time.sleep(0.1)
|
|
|
|
def start(self):
|
|
threading.Thread.start(self)
|
|
|
|
def stop(self):
|
|
self.should_die.set()
|
|
self.join()
|
|
|
|
|
|
class ZookeeperFixture(object):
|
|
@staticmethod
|
|
def instance():
|
|
if "ZOOKEEPER_URI" in os.environ:
|
|
parse = urlparse(os.environ["ZOOKEEPER_URI"])
|
|
(host, port) = (parse.hostname, parse.port)
|
|
fixture = ExternalService(host, port)
|
|
else:
|
|
(host, port) = ("127.0.0.1", get_open_port())
|
|
fixture = ZookeeperFixture(host, port)
|
|
fixture.open()
|
|
return fixture
|
|
|
|
def __init__(self, host, port):
|
|
self.host = host
|
|
self.port = port
|
|
|
|
self.tmp_dir = None
|
|
self.child = None
|
|
|
|
def open(self):
|
|
self.tmp_dir = tempfile.mkdtemp()
|
|
print("*** Running local Zookeeper instance...")
|
|
print(" host = %s" % self.host)
|
|
print(" port = %s" % self.port)
|
|
print(" tmp_dir = %s" % self.tmp_dir)
|
|
|
|
# Generate configs
|
|
template = test_resource("zookeeper.properties")
|
|
properties = os.path.join(self.tmp_dir, "zookeeper.properties")
|
|
render_template(template, properties, vars(self))
|
|
|
|
# Configure Zookeeper child process
|
|
self.child = SpawnedService(kafka_run_class_args(
|
|
"org.apache.zookeeper.server.quorum.QuorumPeerMain",
|
|
properties
|
|
))
|
|
self.child.configure_stdout(os.path.join(self.tmp_dir, "stdout.txt"))
|
|
self.child.configure_stderr(os.path.join(self.tmp_dir, "stderr.txt"))
|
|
|
|
# Party!
|
|
print("*** Starting Zookeeper...")
|
|
self.child.start()
|
|
self.child.wait_for(r"Snapshotting")
|
|
print("*** Done!")
|
|
|
|
def close(self):
|
|
print("*** Stopping Zookeeper...")
|
|
self.child.stop()
|
|
self.child = None
|
|
print("*** Done!")
|
|
shutil.rmtree(self.tmp_dir)
|
|
|
|
|
|
class KafkaFixture(object):
|
|
@staticmethod
|
|
def instance(broker_id, zk_host, zk_port, zk_chroot=None, replicas=1, partitions=2):
|
|
if zk_chroot is None:
|
|
zk_chroot = "kafka-python_" + str(uuid.uuid4()).replace("-", "_")
|
|
if "KAFKA_URI" in os.environ:
|
|
parse = urlparse(os.environ["KAFKA_URI"])
|
|
(host, port) = (parse.hostname, parse.port)
|
|
fixture = ExternalService(host, port)
|
|
else:
|
|
(host, port) = ("127.0.0.1", get_open_port())
|
|
fixture = KafkaFixture(host, port, broker_id, zk_host, zk_port, zk_chroot, replicas, partitions)
|
|
fixture.open()
|
|
return fixture
|
|
|
|
def __init__(self, host, port, broker_id, zk_host, zk_port, zk_chroot, replicas=1, partitions=2):
|
|
self.host = host
|
|
self.port = port
|
|
|
|
self.broker_id = broker_id
|
|
|
|
self.zk_host = zk_host
|
|
self.zk_port = zk_port
|
|
self.zk_chroot = zk_chroot
|
|
|
|
self.replicas = replicas
|
|
self.partitions = partitions
|
|
|
|
self.tmp_dir = None
|
|
self.child = None
|
|
self.running = False
|
|
|
|
def open(self):
|
|
if self.running:
|
|
print("*** Kafka instance already running")
|
|
return
|
|
|
|
self.tmp_dir = tempfile.mkdtemp()
|
|
print("*** Running local Kafka instance")
|
|
print(" host = %s" % self.host)
|
|
print(" port = %s" % self.port)
|
|
print(" broker_id = %s" % self.broker_id)
|
|
print(" zk_host = %s" % self.zk_host)
|
|
print(" zk_port = %s" % self.zk_port)
|
|
print(" zk_chroot = %s" % self.zk_chroot)
|
|
print(" replicas = %s" % self.replicas)
|
|
print(" partitions = %s" % self.partitions)
|
|
print(" tmp_dir = %s" % self.tmp_dir)
|
|
|
|
# Create directories
|
|
os.mkdir(os.path.join(self.tmp_dir, "logs"))
|
|
os.mkdir(os.path.join(self.tmp_dir, "data"))
|
|
|
|
# Generate configs
|
|
template = test_resource("kafka.properties")
|
|
properties = os.path.join(self.tmp_dir, "kafka.properties")
|
|
render_template(template, properties, vars(self))
|
|
|
|
# Configure Kafka child process
|
|
self.child = SpawnedService(kafka_run_class_args(
|
|
"kafka.Kafka", properties
|
|
))
|
|
self.child.configure_stdout(os.path.join(self.tmp_dir, "stdout.txt"))
|
|
self.child.configure_stderr(os.path.join(self.tmp_dir, "stderr.txt"))
|
|
|
|
# Party!
|
|
print("*** Creating Zookeeper chroot node...")
|
|
proc = subprocess.Popen(kafka_run_class_args(
|
|
"org.apache.zookeeper.ZooKeeperMain",
|
|
"-server", "%s:%d" % (self.zk_host, self.zk_port),
|
|
"create", "/%s" % self.zk_chroot, "kafka-python"
|
|
))
|
|
if proc.wait() != 0:
|
|
print("*** Failed to create Zookeeper chroot node")
|
|
raise RuntimeError("Failed to create Zookeeper chroot node")
|
|
print("*** Done!")
|
|
|
|
print("*** Starting Kafka...")
|
|
self.child.start()
|
|
self.child.wait_for(r"\[Kafka Server %d\], Started" % self.broker_id)
|
|
print("*** Done!")
|
|
self.running = True
|
|
|
|
def close(self):
|
|
if not self.running:
|
|
print("*** Kafka instance already stopped")
|
|
return
|
|
|
|
print("*** Stopping Kafka...")
|
|
self.child.stop()
|
|
self.child = None
|
|
print("*** Done!")
|
|
shutil.rmtree(self.tmp_dir)
|
|
self.running = False
|