237 lines
		
	
	
		
			8.4 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			237 lines
		
	
	
		
			8.4 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import logging
 | |
| import os
 | |
| import os.path
 | |
| import shutil
 | |
| import subprocess
 | |
| import tempfile
 | |
| from six.moves import urllib
 | |
| import uuid
 | |
| 
 | |
| from six.moves.urllib.parse import urlparse  # pylint: disable-msg=E0611
 | |
| from test.service import ExternalService, SpawnedService
 | |
| from test.testutil import get_open_port
 | |
| 
 | |
| class Fixture(object):
 | |
|     kafka_version = os.environ.get('KAFKA_VERSION', '0.8.0')
 | |
|     scala_version = os.environ.get("SCALA_VERSION", '2.8.0')
 | |
|     project_root = os.environ.get('PROJECT_ROOT', os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
 | |
|     kafka_root = os.environ.get("KAFKA_ROOT", os.path.join(project_root, 'servers', kafka_version, "kafka-bin"))
 | |
|     ivy_root = os.environ.get('IVY_ROOT', os.path.expanduser("~/.ivy2/cache"))
 | |
| 
 | |
|     @classmethod
 | |
|     def download_official_distribution(cls,
 | |
|                                        kafka_version=None,
 | |
|                                        scala_version=None,
 | |
|                                        output_dir=None):
 | |
|         if not kafka_version:
 | |
|             kafka_version = cls.kafka_version
 | |
|         if not scala_version:
 | |
|             scala_version = cls.scala_version
 | |
|         if not output_dir:
 | |
|             output_dir = os.path.join(cls.project_root, 'servers', 'dist')
 | |
| 
 | |
|         distfile = 'kafka_%s-%s' % (scala_version, kafka_version,)
 | |
|         url_base = 'https://archive.apache.org/dist/kafka/%s/' % (kafka_version,)
 | |
|         output_file = os.path.join(output_dir, distfile + '.tgz')
 | |
| 
 | |
|         if os.path.isfile(output_file):
 | |
|             logging.info("Found file already on disk: %s", output_file)
 | |
|             return output_file
 | |
| 
 | |
|         # New tarballs are .tgz, older ones are sometimes .tar.gz
 | |
|         try:
 | |
|             url = url_base + distfile + '.tgz'
 | |
|             logging.info("Attempting to download %s", url)
 | |
|             response = urllib.request.urlopen(url)
 | |
|         except urllib.error.HTTPError:
 | |
|             logging.exception("HTTP Error")
 | |
|             url = url_base + distfile + '.tar.gz'
 | |
|             logging.info("Attempting to download %s", url)
 | |
|             response = urllib.request.urlopen(url)
 | |
| 
 | |
|         logging.info("Saving distribution file to %s", output_file)
 | |
|         with open(output_file, 'w') as output_file_fd:
 | |
|             output_file_fd.write(response.read())
 | |
| 
 | |
|         return output_file
 | |
| 
 | |
|     @classmethod
 | |
|     def test_resource(cls, filename):
 | |
|         return os.path.join(cls.project_root, "servers", cls.kafka_version, "resources", filename)
 | |
| 
 | |
|     @classmethod
 | |
|     def kafka_run_class_args(cls, *args):
 | |
|         result = [os.path.join(cls.kafka_root, 'bin', 'kafka-run-class.sh')]
 | |
|         result.extend(args)
 | |
|         return result
 | |
| 
 | |
|     @classmethod
 | |
|     def kafka_run_class_env(cls):
 | |
|         env = os.environ.copy()
 | |
|         env['KAFKA_LOG4J_OPTS'] = "-Dlog4j.configuration=file:%s" % cls.test_resource("log4j.properties")
 | |
|         return env
 | |
| 
 | |
|     @classmethod
 | |
|     def render_template(cls, source_file, target_file, binding):
 | |
|         with open(source_file, "r") as handle:
 | |
|             template = handle.read()
 | |
|         with open(target_file, "w") as handle:
 | |
|             handle.write(template.format(**binding))
 | |
| 
 | |
| 
 | |
| class ZookeeperFixture(Fixture):
 | |
|     @classmethod
 | |
|     def instance(cls):
 | |
|         if "ZOOKEEPER_URI" in os.environ:
 | |
|             parse = urlparse(os.environ["ZOOKEEPER_URI"])
 | |
|             (host, port) = (parse.hostname, parse.port)
 | |
|             fixture = ExternalService(host, port)
 | |
|         else:
 | |
|             (host, port) = ("127.0.0.1", get_open_port())
 | |
|             fixture = cls(host, port)
 | |
| 
 | |
|         fixture.open()
 | |
|         return fixture
 | |
| 
 | |
|     def __init__(self, host, port):
 | |
|         self.host = host
 | |
|         self.port = port
 | |
| 
 | |
|         self.tmp_dir = None
 | |
|         self.child = None
 | |
| 
 | |
|     def out(self, message):
 | |
|         logging.info("*** Zookeeper [%s:%d]: %s", self.host, self.port, message)
 | |
| 
 | |
|     def open(self):
 | |
|         self.tmp_dir = tempfile.mkdtemp()
 | |
|         self.out("Running local instance...")
 | |
|         logging.info("  host    = %s", self.host)
 | |
|         logging.info("  port    = %s", self.port)
 | |
|         logging.info("  tmp_dir = %s", self.tmp_dir)
 | |
| 
 | |
|         # Generate configs
 | |
|         template = self.test_resource("zookeeper.properties")
 | |
|         properties = os.path.join(self.tmp_dir, "zookeeper.properties")
 | |
|         self.render_template(template, properties, vars(self))
 | |
| 
 | |
|         # Configure Zookeeper child process
 | |
|         args = self.kafka_run_class_args("org.apache.zookeeper.server.quorum.QuorumPeerMain", properties)
 | |
|         env = self.kafka_run_class_env()
 | |
|         self.child = SpawnedService(args, env)
 | |
| 
 | |
|         # Party!
 | |
|         self.out("Starting...")
 | |
|         self.child.start()
 | |
|         self.child.wait_for(r"binding to port")
 | |
|         self.out("Done!")
 | |
| 
 | |
|     def close(self):
 | |
|         self.out("Stopping...")
 | |
|         self.child.stop()
 | |
|         self.child = None
 | |
|         self.out("Done!")
 | |
|         shutil.rmtree(self.tmp_dir)
 | |
| 
 | |
| 
 | |
| class KafkaFixture(Fixture):
 | |
|     @classmethod
 | |
|     def instance(cls, broker_id, zk_host, zk_port, zk_chroot=None, replicas=1, partitions=2):
 | |
|         if zk_chroot is None:
 | |
|             zk_chroot = "kafka-python_" + str(uuid.uuid4()).replace("-", "_")
 | |
|         if "KAFKA_URI" in os.environ:
 | |
|             parse = urlparse(os.environ["KAFKA_URI"])
 | |
|             (host, port) = (parse.hostname, parse.port)
 | |
|             fixture = ExternalService(host, port)
 | |
|         else:
 | |
|             (host, port) = ("127.0.0.1", get_open_port())
 | |
|             fixture = KafkaFixture(host, port, broker_id, zk_host, zk_port, zk_chroot, replicas, partitions)
 | |
|             fixture.open()
 | |
|         return fixture
 | |
| 
 | |
|     def __init__(self, host, port, broker_id, zk_host, zk_port, zk_chroot, replicas=1, partitions=2):
 | |
|         self.host = host
 | |
|         self.port = port
 | |
| 
 | |
|         self.broker_id = broker_id
 | |
| 
 | |
|         self.zk_host = zk_host
 | |
|         self.zk_port = zk_port
 | |
|         self.zk_chroot = zk_chroot
 | |
| 
 | |
|         self.replicas = replicas
 | |
|         self.partitions = partitions
 | |
| 
 | |
|         self.tmp_dir = None
 | |
|         self.child = None
 | |
|         self.running = False
 | |
| 
 | |
|     def out(self, message):
 | |
|         logging.info("*** Kafka [%s:%d]: %s", self.host, self.port, message)
 | |
| 
 | |
|     def open(self):
 | |
|         if self.running:
 | |
|             self.out("Instance already running")
 | |
|             return
 | |
| 
 | |
|         self.tmp_dir = tempfile.mkdtemp()
 | |
|         self.out("Running local instance...")
 | |
|         logging.info("  host       = %s", self.host)
 | |
|         logging.info("  port       = %s", self.port)
 | |
|         logging.info("  broker_id  = %s", self.broker_id)
 | |
|         logging.info("  zk_host    = %s", self.zk_host)
 | |
|         logging.info("  zk_port    = %s", self.zk_port)
 | |
|         logging.info("  zk_chroot  = %s", self.zk_chroot)
 | |
|         logging.info("  replicas   = %s", self.replicas)
 | |
|         logging.info("  partitions = %s", self.partitions)
 | |
|         logging.info("  tmp_dir    = %s", self.tmp_dir)
 | |
| 
 | |
|         # Create directories
 | |
|         os.mkdir(os.path.join(self.tmp_dir, "logs"))
 | |
|         os.mkdir(os.path.join(self.tmp_dir, "data"))
 | |
| 
 | |
|         # Generate configs
 | |
|         template = self.test_resource("kafka.properties")
 | |
|         properties = os.path.join(self.tmp_dir, "kafka.properties")
 | |
|         self.render_template(template, properties, vars(self))
 | |
| 
 | |
|         # Configure Kafka child process
 | |
|         args = self.kafka_run_class_args("kafka.Kafka", properties)
 | |
|         env = self.kafka_run_class_env()
 | |
|         self.child = SpawnedService(args, env)
 | |
| 
 | |
|         # Party!
 | |
|         self.out("Creating Zookeeper chroot node...")
 | |
|         args = self.kafka_run_class_args("org.apache.zookeeper.ZooKeeperMain",
 | |
|                                          "-server", "%s:%d" % (self.zk_host, self.zk_port),
 | |
|                                          "create",
 | |
|                                          "/%s" % self.zk_chroot,
 | |
|                                          "kafka-python")
 | |
|         env = self.kafka_run_class_env()
 | |
|         proc = subprocess.Popen(args, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
 | |
| 
 | |
|         if proc.wait() != 0:
 | |
|             self.out("Failed to create Zookeeper chroot node")
 | |
|             self.out(proc.stdout.read())
 | |
|             self.out(proc.stderr.read())
 | |
|             raise RuntimeError("Failed to create Zookeeper chroot node")
 | |
|         self.out("Done!")
 | |
| 
 | |
|         self.out("Starting...")
 | |
|         self.child.start()
 | |
|         self.child.wait_for(r"\[Kafka Server %d\], Started" % self.broker_id)
 | |
|         self.out("Done!")
 | |
|         self.running = True
 | |
| 
 | |
|     def close(self):
 | |
|         if not self.running:
 | |
|             self.out("Instance already stopped")
 | |
|             return
 | |
| 
 | |
|         self.out("Stopping...")
 | |
|         self.child.stop()
 | |
|         self.child = None
 | |
|         self.out("Done!")
 | |
|         shutil.rmtree(self.tmp_dir)
 | |
|         self.running = False
 | 
