Files
deb-python-kafka/test/fixtures.py
mrtheb c304e3d4b0 Cherry-pick mrtheb/kafka-python 8b9c7e51
Sync tests and fixtures with kafka 0.8.0-beta1 tag

Conflicts:

	README.md
	kafka-src
2013-10-03 11:29:40 -04:00

321 lines
10 KiB
Python

import glob
import os
import re
import select
import shutil
import socket
import subprocess
import sys
import tempfile
import threading
import time
import uuid
from urlparse import urlparse
PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
KAFKA_ROOT = os.path.join(PROJECT_ROOT, "kafka-src")
IVY_ROOT = os.path.expanduser("~/.ivy2/cache")
SCALA_VERSION = '2.8.0'
if "PROJECT_ROOT" in os.environ:
PROJECT_ROOT = os.environ["PROJECT_ROOT"]
if "KAFKA_ROOT" in os.environ:
KAFKA_ROOT = os.environ["KAFKA_ROOT"]
if "IVY_ROOT" in os.environ:
IVY_ROOT = os.environ["IVY_ROOT"]
if "SCALA_VERSION" in os.environ:
SCALA_VERSION = os.environ["SCALA_VERSION"]
def test_resource(file):
return os.path.join(PROJECT_ROOT, "test", "resources", file)
def test_classpath():
# ./kafka-src/bin/kafka-run-class.sh is the authority.
jars = ["."]
# assume all dependencies have been packaged into one jar with sbt-assembly's task "assembly-package-dependency"
jars.extend(glob.glob(KAFKA_ROOT + "/core/target/scala-%s/*.jar" % SCALA_VERSION))
jars = filter(os.path.exists, map(os.path.abspath, jars))
return ":".join(jars)
def kafka_run_class_args(*args):
# ./kafka-src/bin/kafka-run-class.sh is the authority.
result = ["java", "-Xmx512M", "-server"]
result.append("-Dlog4j.configuration=file:%s" % test_resource("log4j.properties"))
result.append("-Dcom.sun.management.jmxremote")
result.append("-Dcom.sun.management.jmxremote.authenticate=false")
result.append("-Dcom.sun.management.jmxremote.ssl=false")
result.append("-cp")
result.append(test_classpath())
result.extend(args)
return result
def get_open_port():
sock = socket.socket()
sock.bind(("", 0))
port = sock.getsockname()[1]
sock.close()
return port
def render_template(source_file, target_file, binding):
with open(source_file, "r") as handle:
template = handle.read()
with open(target_file, "w") as handle:
handle.write(template.format(**binding))
class ExternalService(object):
def __init__(self, host, port):
print("Using already running service at %s:%d" % (host, port))
def open(self):
pass
def close(self):
pass
class SpawnedService(threading.Thread):
def __init__(self, args=[]):
threading.Thread.__init__(self)
self.args = args
self.captured_stdout = ""
self.captured_stderr = ""
self.stdout_file = None
self.stderr_file = None
self.capture_stdout = True
self.capture_stderr = True
self.show_stdout = True
self.show_stderr = True
self.should_die = threading.Event()
def configure_stdout(self, file=None, capture=True, show=False):
self.stdout_file = file
self.capture_stdout = capture
self.show_stdout = show
def configure_stderr(self, file=None, capture=False, show=True):
self.stderr_file = file
self.capture_stderr = capture
self.show_stderr = show
def run(self):
stdout_handle = None
stderr_handle = None
try:
if self.stdout_file:
stdout_handle = open(self.stdout_file, "w")
if self.stderr_file:
stderr_handle = open(self.stderr_file, "w")
self.run_with_handles(stdout_handle, stderr_handle)
finally:
if stdout_handle:
stdout_handle.close()
if stderr_handle:
stderr_handle.close()
def run_with_handles(self, stdout_handle, stderr_handle):
child = subprocess.Popen(
self.args,
bufsize=1,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
alive = True
while True:
(rds, wds, xds) = select.select([child.stdout, child.stderr], [], [], 1)
if child.stdout in rds:
line = child.stdout.readline()
if stdout_handle:
stdout_handle.write(line)
stdout_handle.flush()
if self.capture_stdout:
self.captured_stdout += line
if self.show_stdout:
sys.stdout.write(line)
sys.stdout.flush()
if child.stderr in rds:
line = child.stderr.readline()
if stderr_handle:
stderr_handle.write(line)
stderr_handle.flush()
if self.capture_stderr:
self.captured_stderr += line
if self.show_stderr:
sys.stderr.write(line)
sys.stderr.flush()
if self.should_die.is_set():
child.terminate()
alive = False
if child.poll() is not None:
if not alive:
break
else:
raise RuntimeError("Subprocess has died. Aborting.")
def wait_for(self, pattern, timeout=10):
t1 = time.time()
while True:
t2 = time.time()
if t2 - t1 >= timeout:
raise RuntimeError("Waiting for %r timed out" % pattern)
if re.search(pattern, self.captured_stdout) is not None:
return
if re.search(pattern, self.captured_stderr) is not None:
return
time.sleep(0.1)
def start(self):
threading.Thread.start(self)
def stop(self):
self.should_die.set()
self.join()
class ZookeeperFixture(object):
@staticmethod
def instance():
if "ZOOKEEPER_URI" in os.environ:
parse = urlparse(os.environ["ZOOKEEPER_URI"])
(host, port) = (parse.hostname, parse.port)
fixture = ExternalService(host, port)
else:
(host, port) = ("127.0.0.1", get_open_port())
fixture = ZookeeperFixture(host, port)
fixture.open()
return fixture
def __init__(self, host, port):
self.host = host
self.port = port
self.tmp_dir = None
self.child = None
def open(self):
self.tmp_dir = tempfile.mkdtemp()
print("*** Running local Zookeeper instance...")
print(" host = %s" % self.host)
print(" port = %s" % self.port)
print(" tmp_dir = %s" % self.tmp_dir)
# Generate configs
template = test_resource("zookeeper.properties")
properties = os.path.join(self.tmp_dir, "zookeeper.properties")
render_template(template, properties, vars(self))
# Configure Zookeeper child process
self.child = SpawnedService(kafka_run_class_args(
"org.apache.zookeeper.server.quorum.QuorumPeerMain",
properties
))
self.child.configure_stdout(os.path.join(self.tmp_dir, "stdout.txt"))
self.child.configure_stderr(os.path.join(self.tmp_dir, "stderr.txt"))
# Party!
print("*** Starting Zookeeper...")
self.child.start()
self.child.wait_for(r"Snapshotting")
print("*** Done!")
def close(self):
print("*** Stopping Zookeeper...")
self.child.stop()
self.child = None
print("*** Done!")
shutil.rmtree(self.tmp_dir)
class KafkaFixture(object):
@staticmethod
def instance(broker_id, zk_host, zk_port, zk_chroot=None):
if zk_chroot is None:
zk_chroot = "kafka-python_" + str(uuid.uuid4()).replace("-", "_")
if "KAFKA_URI" in os.environ:
parse = urlparse(os.environ["KAFKA_URI"])
(host, port) = (parse.hostname, parse.port)
fixture = ExternalService(host, port)
else:
(host, port) = ("127.0.0.1", get_open_port())
fixture = KafkaFixture(host, port, broker_id, zk_host, zk_port, zk_chroot)
fixture.open()
return fixture
def __init__(self, host, port, broker_id, zk_host, zk_port, zk_chroot):
self.host = host
self.port = port
self.broker_id = broker_id
self.zk_host = zk_host
self.zk_port = zk_port
self.zk_chroot = zk_chroot
self.tmp_dir = None
self.child = None
def open(self):
self.tmp_dir = tempfile.mkdtemp()
print("*** Running local Kafka instance")
print(" host = %s" % self.host)
print(" port = %s" % self.port)
print(" broker_id = %s" % self.broker_id)
print(" zk_host = %s" % self.zk_host)
print(" zk_port = %s" % self.zk_port)
print(" zk_chroot = %s" % self.zk_chroot)
print(" tmp_dir = %s" % self.tmp_dir)
# Create directories
os.mkdir(os.path.join(self.tmp_dir, "logs"))
os.mkdir(os.path.join(self.tmp_dir, "data"))
# Generate configs
template = test_resource("kafka.properties")
properties = os.path.join(self.tmp_dir, "kafka.properties")
render_template(template, properties, vars(self))
# Configure Kafka child process
self.child = SpawnedService(kafka_run_class_args(
"kafka.Kafka", properties
))
self.child.configure_stdout(os.path.join(self.tmp_dir, "stdout.txt"))
self.child.configure_stderr(os.path.join(self.tmp_dir, "stderr.txt"))
# Party!
print("*** Creating Zookeeper chroot node...")
proc = subprocess.Popen(kafka_run_class_args(
"org.apache.zookeeper.ZooKeeperMain",
"-server", "%s:%d" % (self.zk_host, self.zk_port),
"create", "/%s" % self.zk_chroot, "kafka-python"
))
if proc.wait() != 0:
print("*** Failed to create Zookeeper chroot node")
raise RuntimeError("Failed to create Zookeeper chroot node")
print("*** Done!")
print("*** Starting Kafka...")
self.child.start()
self.child.wait_for(r"\[Kafka Server %d\], Started" % self.broker_id)
print("*** Done!")
def close(self):
print("*** Stopping Kafka...")
self.child.stop()
self.child = None
print("*** Done!")
shutil.rmtree(self.tmp_dir)