Files
deb-python-kafka/test/fixtures.py
2016-04-25 16:26:44 -07:00

338 lines
12 KiB
Python

import atexit
import logging
import os
import os.path
import shutil
import subprocess
import tempfile
import time
import uuid
from six.moves import urllib
from six.moves.urllib.parse import urlparse # pylint: disable=E0611,F0401
from test.service import ExternalService, SpawnedService
from test.testutil import get_open_port
# Module-level logger used by the fixture classes defined below.
log = logging.getLogger(__name__)
class Fixture(object):
    """Shared configuration and helpers for the service fixtures.

    Every class attribute can be overridden through the environment variable
    named in its ``os.environ.get`` call; otherwise a default derived from the
    location of this file is used.
    """
    kafka_version = os.environ.get('KAFKA_VERSION', '0.8.0')
    scala_version = os.environ.get("SCALA_VERSION", '2.8.0')
    project_root = os.environ.get('PROJECT_ROOT', os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
    kafka_root = os.environ.get("KAFKA_ROOT", os.path.join(project_root, 'servers', kafka_version, "kafka-bin"))
    ivy_root = os.environ.get('IVY_ROOT', os.path.expanduser("~/.ivy2/cache"))

    @classmethod
    def download_official_distribution(cls,
                                       kafka_version=None,
                                       scala_version=None,
                                       output_dir=None):
        """Download a Kafka release tarball unless it is already on disk.

        Args:
            kafka_version: Kafka release to fetch; defaults to
                ``cls.kafka_version``.
            scala_version: Scala build of that release; defaults to
                ``cls.scala_version``.
            output_dir: Directory to store the tarball in; defaults to
                ``<project_root>/servers/dist``. Created if missing.

        Returns:
            Path of the tarball on disk (always named ``*.tgz``).

        Raises:
            urllib.error.HTTPError: if the ``.tar.gz`` fallback URL also
                fails.
        """
        if not kafka_version:
            kafka_version = cls.kafka_version
        if not scala_version:
            scala_version = cls.scala_version
        if not output_dir:
            output_dir = os.path.join(cls.project_root, 'servers', 'dist')

        distfile = 'kafka_%s-%s' % (scala_version, kafka_version,)
        url_base = 'https://archive.apache.org/dist/kafka/%s/' % (kafka_version,)
        output_file = os.path.join(output_dir, distfile + '.tgz')

        if os.path.isfile(output_file):
            log.info("Found file already on disk: %s", output_file)
            return output_file

        # New tarballs are .tgz, older ones are sometimes .tar.gz
        try:
            url = url_base + distfile + '.tgz'
            log.info("Attempting to download %s", url)
            response = urllib.request.urlopen(url)
        except urllib.error.HTTPError:
            log.exception("HTTP Error")
            url = url_base + distfile + '.tar.gz'
            log.info("Attempting to download %s", url)
            response = urllib.request.urlopen(url)

        log.info("Saving distribution file to %s", output_file)
        # Read the whole body before touching the filesystem so a failed
        # download never leaves an empty file that a later call would treat
        # as "already on disk"; always release the connection.
        try:
            payload = response.read()
        finally:
            response.close()

        if not os.path.isdir(output_dir):
            os.makedirs(output_dir)

        # The payload is raw bytes: binary mode is required on Python 3 and
        # avoids newline translation on platforms that perform it.
        with open(output_file, 'wb') as output_file_fd:
            output_file_fd.write(payload)

        return output_file

    @classmethod
    def test_resource(cls, filename):
        """Return the path of ``filename`` in this Kafka version's resources directory."""
        return os.path.join(cls.project_root, "servers", cls.kafka_version, "resources", filename)

    @classmethod
    def kafka_run_class_args(cls, *args):
        """Return an argv list: the ``kafka-run-class.sh`` path followed by ``args``."""
        result = [os.path.join(cls.kafka_root, 'bin', 'kafka-run-class.sh')]
        result.extend(args)
        return result

    def kafka_run_class_env(self):
        """Return a copy of ``os.environ`` with ``KAFKA_LOG4J_OPTS`` set.

        The option points log4j at the ``log4j.properties`` test resource.
        """
        env = os.environ.copy()
        env['KAFKA_LOG4J_OPTS'] = "-Dlog4j.configuration=file:%s" % self.test_resource("log4j.properties")
        return env

    @classmethod
    def render_template(cls, source_file, target_file, binding):
        """Render ``source_file`` with ``str.format`` into ``target_file``.

        Args:
            source_file: Path of the template; must not be empty.
            target_file: Path to write the rendered text to.
            binding: Mapping of names substituted into the template.

        The file and its containing directory are fsync'ed before returning.
        """
        log.info('Rendering %s from template %s', target_file, source_file)
        with open(source_file, "r") as handle:
            template = handle.read()
            assert len(template) > 0, 'Empty template %s' % source_file
        with open(target_file, "w") as handle:
            handle.write(template.format(**binding))
            handle.flush()
            os.fsync(handle.fileno())

        # fsync directory for durability
        # https://blog.gocept.com/2013/07/15/reliable-file-updates-with-python/
        # A bare filename has an empty dirname; fall back to the current dir.
        target_dir = os.path.dirname(target_file) or os.curdir
        dirfd = os.open(target_dir, os.O_DIRECTORY)
        try:
            os.fsync(dirfd)
        finally:
            # Never leak the descriptor, even if fsync fails.
            os.close(dirfd)
class ZookeeperFixture(Fixture):
    """Fixture that runs a local Zookeeper process in a temporary directory."""

    @classmethod
    def instance(cls):
        """Return an opened Zookeeper fixture.

        If ``ZOOKEEPER_URI`` is set in the environment, its host and port are
        wrapped in an ``ExternalService``; otherwise an instance of this class
        is created on 127.0.0.1 with a port from ``get_open_port()``.
        ``open()`` is called on the result before it is returned.
        """
        if "ZOOKEEPER_URI" in os.environ:
            parse = urlparse(os.environ["ZOOKEEPER_URI"])
            (host, port) = (parse.hostname, parse.port)
            fixture = ExternalService(host, port)
        else:
            (host, port) = ("127.0.0.1", get_open_port())
            fixture = cls(host, port)

        fixture.open()
        return fixture

    def __init__(self, host, port):
        """Record the address; no process or directory exists until open()."""
        self.host = host
        self.port = port

        self.tmp_dir = None
        self.child = None

    def kafka_run_class_env(self):
        """Extend the base environment with ``LOG_DIR`` under ``tmp_dir``."""
        env = super(ZookeeperFixture, self).kafka_run_class_env()
        env['LOG_DIR'] = os.path.join(self.tmp_dir, 'logs')
        return env

    def out(self, message):
        """Log ``message`` at INFO level, prefixed with this fixture's address."""
        log.info("*** Zookeeper [%s:%d]: %s", self.host, self.port, message)

    def open(self):
        """Render the config and start Zookeeper, retrying for up to 30 seconds.

        Raises:
            Exception: if no attempt reports "binding to port" before the
                overall deadline.
        """
        self.tmp_dir = tempfile.mkdtemp()
        self.out("Running local instance...")
        log.info(" host = %s", self.host)
        log.info(" port = %s", self.port)
        log.info(" tmp_dir = %s", self.tmp_dir)

        # Generate configs
        template = self.test_resource("zookeeper.properties")
        properties = os.path.join(self.tmp_dir, "zookeeper.properties")
        self.render_template(template, properties, vars(self))

        # Configure Zookeeper child process
        args = self.kafka_run_class_args("org.apache.zookeeper.server.quorum.QuorumPeerMain", properties)
        env = self.kafka_run_class_env()

        # Party!
        timeout = 5
        max_timeout = 30
        backoff = 1
        end_at = time.time() + max_timeout
        tries = 1
        while time.time() < end_at:
            self.out('Attempting to start (try #%d)' % tries)
            try:
                os.stat(properties)
            except OSError:
                # Only a failed stat means the config is missing; do not
                # swallow KeyboardInterrupt/SystemExit like a bare except.
                log.warning('Config %s not found -- re-rendering', properties)
                self.render_template(template, properties, vars(self))
            self.child = SpawnedService(args, env)
            self.child.start()
            # Never wait past the overall deadline.
            timeout = min(timeout, max(end_at - time.time(), 0))
            if self.child.wait_for(r"binding to port", timeout=timeout):
                break
            self.child.stop()
            # Give the next attempt twice as long, after a short pause.
            timeout *= 2
            time.sleep(backoff)
            tries += 1
        else:
            # Loop condition expired without a successful start.
            raise Exception('Failed to start Zookeeper before max_timeout')
        self.out("Done!")
        atexit.register(self.close)

    def close(self):
        """Stop the child process and delete ``tmp_dir``; no-op without a child."""
        if self.child is None:
            return
        self.out("Stopping...")
        self.child.stop()
        self.child = None
        self.out("Done!")
        shutil.rmtree(self.tmp_dir)

    def __del__(self):
        self.close()
class KafkaFixture(Fixture):
    """Fixture that runs a local Kafka broker in a temporary directory."""

    @classmethod
    def instance(cls, broker_id, zk_host, zk_port, zk_chroot=None,
                 host=None, port=None,
                 transport='PLAINTEXT', replicas=1, partitions=2):
        """Return an opened Kafka fixture.

        If ``KAFKA_URI`` is set in the environment, its host and port are
        wrapped in an ``ExternalService``; otherwise a ``KafkaFixture`` is
        created, defaulting ``host`` to ``[::1]`` and ``port`` to a value from
        ``get_open_port()``. A random ``zk_chroot`` is generated when none is
        given. ``open()`` is called on the result before it is returned.
        """
        if zk_chroot is None:
            zk_chroot = "kafka-python_" + str(uuid.uuid4()).replace("-", "_")
        if "KAFKA_URI" in os.environ:
            parse = urlparse(os.environ["KAFKA_URI"])
            (host, port) = (parse.hostname, parse.port)
            fixture = ExternalService(host, port)
        else:
            if port is None:
                port = get_open_port()
            # force IPv6 here because of a confusing point:
            #
            #  - if the string "localhost" is passed, Kafka will *only* bind to the IPv4 address of localhost
            #    (127.0.0.1); however, kafka-python will attempt to connect on ::1 and fail
            #
            #  - if the address literal 127.0.0.1 is passed, the metadata request during bootstrap will return
            #    the name "localhost" and we'll go back to the first case. This is odd!
            #
            # Ideally, Kafka would bind to all loopback addresses when we tell it to listen on "localhost" the
            # way it makes an IPv6 socket bound to both 0.0.0.0/0 and ::/0 when we tell it to bind to "" (that is
            # to say, when we make a listener of PLAINTEXT://:port).
            #
            # Note that even though we specify the bind host in bracket notation, Kafka responds to the bootstrap
            # metadata request without square brackets later.
            if host is None:
                host = "[::1]"
            fixture = KafkaFixture(host, port, broker_id,
                                   zk_host, zk_port, zk_chroot,
                                   transport=transport,
                                   replicas=replicas, partitions=partitions)

        fixture.open()
        return fixture

    def __init__(self, host, port, broker_id, zk_host, zk_port, zk_chroot,
                 replicas=1, partitions=2, transport='PLAINTEXT'):
        """Record broker settings; no process or directory exists until open().

        ``transport`` is upper-cased. All instance attributes are later passed
        to the properties template through ``vars(self)``.
        """
        self.host = host
        self.port = port

        self.broker_id = broker_id
        self.transport = transport.upper()
        self.ssl_dir = self.test_resource('ssl')

        self.zk_host = zk_host
        self.zk_port = zk_port
        self.zk_chroot = zk_chroot

        self.replicas = replicas
        self.partitions = partitions

        self.tmp_dir = None
        self.child = None
        self.running = False

    def kafka_run_class_env(self):
        """Extend the base environment with ``LOG_DIR`` under ``tmp_dir``."""
        env = super(KafkaFixture, self).kafka_run_class_env()
        env['LOG_DIR'] = os.path.join(self.tmp_dir, 'logs')
        return env

    def out(self, message):
        """Log ``message`` at INFO level, prefixed with this fixture's address."""
        log.info("*** Kafka [%s:%d]: %s", self.host, self.port, message)

    def open(self):
        """Create the Zookeeper chroot node and start the broker.

        Does nothing if the instance is already running. Start-up is retried
        for up to 30 seconds.

        Raises:
            RuntimeError: if the chroot-creating command exits non-zero.
            Exception: if no attempt reports the broker as started before the
                overall deadline.
        """
        if self.running:
            self.out("Instance already running")
            return

        self.tmp_dir = tempfile.mkdtemp()
        self.out("Running local instance...")
        log.info(" host = %s", self.host)
        log.info(" port = %s", self.port)
        log.info(" transport = %s", self.transport)
        log.info(" broker_id = %s", self.broker_id)
        log.info(" zk_host = %s", self.zk_host)
        log.info(" zk_port = %s", self.zk_port)
        log.info(" zk_chroot = %s", self.zk_chroot)
        log.info(" replicas = %s", self.replicas)
        log.info(" partitions = %s", self.partitions)
        log.info(" tmp_dir = %s", self.tmp_dir)

        # Create directories
        os.mkdir(os.path.join(self.tmp_dir, "logs"))
        os.mkdir(os.path.join(self.tmp_dir, "data"))

        # Generate configs
        template = self.test_resource("kafka.properties")
        properties = os.path.join(self.tmp_dir, "kafka.properties")
        self.render_template(template, properties, vars(self))

        # Party!
        self.out("Creating Zookeeper chroot node...")
        args = self.kafka_run_class_args("org.apache.zookeeper.ZooKeeperMain",
                                         "-server", "%s:%d" % (self.zk_host, self.zk_port),
                                         "create",
                                         "/%s" % self.zk_chroot,
                                         "kafka-python")
        env = self.kafka_run_class_env()
        proc = subprocess.Popen(args, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

        # communicate() drains both pipes while waiting; wait() alone can
        # deadlock once the child fills a pipe buffer, and would leave the
        # pipe file objects open afterwards.
        stdout, stderr = proc.communicate()
        if proc.returncode != 0:
            self.out("Failed to create Zookeeper chroot node")
            self.out(stdout)
            self.out(stderr)
            raise RuntimeError("Failed to create Zookeeper chroot node")
        self.out("Done!")

        # Configure Kafka child process
        args = self.kafka_run_class_args("kafka.Kafka", properties)
        env = self.kafka_run_class_env()

        timeout = 5
        max_timeout = 30
        backoff = 1
        end_at = time.time() + max_timeout
        tries = 1
        while time.time() < end_at:
            self.out('Attempting to start (try #%d)' % tries)
            try:
                os.stat(properties)
            except OSError:
                # Only a failed stat means the config is missing; do not
                # swallow KeyboardInterrupt/SystemExit like a bare except.
                log.warning('Config %s not found -- re-rendering', properties)
                self.render_template(template, properties, vars(self))
            self.child = SpawnedService(args, env)
            self.child.start()
            # Never wait past the overall deadline.
            timeout = min(timeout, max(end_at - time.time(), 0))
            if self.child.wait_for(r"\[Kafka Server %d\], Started" %
                                   self.broker_id, timeout=timeout):
                break
            self.child.stop()
            # Give the next attempt twice as long, after a short pause.
            timeout *= 2
            time.sleep(backoff)
            tries += 1
        else:
            # Loop condition expired without a successful start.
            raise Exception('Failed to start KafkaInstance before max_timeout')
        self.out("Done!")
        self.running = True
        atexit.register(self.close)

    def __del__(self):
        self.close()

    def close(self):
        """Stop the broker and delete ``tmp_dir``; no-op when not running."""
        if not self.running:
            self.out("Instance already stopped")
            return

        self.out("Stopping...")
        self.child.stop()
        self.child = None
        self.out("Done!")
        shutil.rmtree(self.tmp_dir)
        self.running = False