Starting integration tests
This commit is contained in:
26
README.md
26
README.md
@@ -12,14 +12,34 @@ Copyright 2012, David Arthur under Apache License, v2.0. See `LICENSE`
|
||||
|
||||
This project is very much alpha. The API is in flux and not all the features are fully implemented.
|
||||
|
||||
# Usage
|
||||
# Tests
|
||||
|
||||
## Run the tests
|
||||
## Run the unit tests
|
||||
|
||||
```shell
|
||||
python -m unittest -v test
|
||||
python -m test.unit
|
||||
```
|
||||
|
||||
## Run the integration tests
|
||||
|
||||
First, checkout the Kafka source
|
||||
|
||||
```shell
|
||||
git submodule init
|
||||
git submodule update
|
||||
cd kafka-src
|
||||
./sbt update
|
||||
./sbt package
|
||||
```
|
||||
|
||||
Then from the root directory, run the integration tests
|
||||
|
||||
```shell
|
||||
python -m test.integration
|
||||
```
|
||||
|
||||
# Usage
|
||||
|
||||
## Send a message to a topic
|
||||
|
||||
You need to specify the topic and partition
|
||||
|
||||
100
test/integration.py
Normal file
100
test/integration.py
Normal file
@@ -0,0 +1,100 @@
|
||||
import os
|
||||
import select
|
||||
import shlex
|
||||
import shutil
|
||||
import socket
|
||||
import subprocess
|
||||
import sys
|
||||
import tempfile
|
||||
from threading import Thread, Event
|
||||
import time
|
||||
import unittest
|
||||
|
||||
from kafka import KafkaClient, ProduceRequest, FetchRequest
|
||||
|
||||
def get_open_port():
|
||||
sock = socket.socket()
|
||||
sock.bind(('',0))
|
||||
port = sock.getsockname()[1]
|
||||
sock.close()
|
||||
return port
|
||||
|
||||
class KafkaFixture(Thread):
|
||||
def __init__(self, port):
|
||||
Thread.__init__(self)
|
||||
self.port = port
|
||||
self.capture = ""
|
||||
self.shouldDie = Event()
|
||||
self.tmpDir = tempfile.mkdtemp()
|
||||
|
||||
def run(self):
|
||||
# Create the log directory
|
||||
logDir = os.path.join(self.tmpDir, 'logs')
|
||||
os.mkdir(logDir)
|
||||
|
||||
# Create the config file
|
||||
configFile = os.path.join(self.tmpDir, 'server.properties')
|
||||
f = open('test/resources/server.properties', 'r')
|
||||
props = f.read()
|
||||
f = open(configFile, 'w')
|
||||
f.write(props % {'kafka.port': self.port, 'kafka.tmp.dir': logDir, 'kafka.partitions': 2})
|
||||
f.close()
|
||||
|
||||
# Start Kafka
|
||||
args = shlex.split("./kafka-src/bin/kafka-server-start.sh %s" % configFile)
|
||||
proc = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env={"JMX_PORT":"%d" % get_open_port()})
|
||||
|
||||
killed = False
|
||||
while True:
|
||||
(rlist, wlist, xlist) = select.select([proc.stdout], [], [], 1)
|
||||
if proc.stdout in rlist:
|
||||
read = proc.stdout.readline()
|
||||
sys.stdout.write(read)
|
||||
self.capture += read
|
||||
|
||||
if self.shouldDie.is_set():
|
||||
proc.terminate()
|
||||
killed = True
|
||||
|
||||
if proc.poll() is not None:
|
||||
shutil.rmtree(self.tmpDir)
|
||||
if killed:
|
||||
break
|
||||
else:
|
||||
raise RuntimeError("Kafka died. Aborting.")
|
||||
|
||||
def wait_for(self, target, timeout=10):
|
||||
t1 = time.time()
|
||||
while True:
|
||||
t2 = time.time()
|
||||
if t2-t1 >= timeout:
|
||||
return False
|
||||
if target in self.capture:
|
||||
return True
|
||||
time.sleep(1)
|
||||
|
||||
|
||||
class IntegrationTest(unittest.TestCase):
|
||||
def setUp(self):
|
||||
port = get_open_port()
|
||||
self.server = KafkaFixture(port)
|
||||
self.server.start()
|
||||
self.server.wait_for("Kafka server started")
|
||||
self.kafka = KafkaClient("localhost", port)
|
||||
|
||||
def test_produce(self):
|
||||
req = ProduceRequest("my-topic", 0, [KafkaClient.create_message("testing")])
|
||||
self.kafka.send_message_set(req)
|
||||
self.assertTrue(self.server.wait_for("Created log for 'my-topic'-0"))
|
||||
|
||||
req = ProduceRequest("my-topic", 1, [KafkaClient.create_message("testing")])
|
||||
self.kafka.send_message_set(req)
|
||||
self.assertTrue(self.server.wait_for("Created log for 'my-topic'-1"))
|
||||
|
||||
def tearDown(self):
|
||||
self.kafka.close()
|
||||
self.server.shouldDie.set()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
30
test/resources/log4j.properties
Normal file
30
test/resources/log4j.properties
Normal file
@@ -0,0 +1,30 @@
|
||||
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
# contributor license agreements. See the NOTICE file distributed with
|
||||
# this work for additional information regarding copyright ownership.
|
||||
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
# (the "License"); you may not use this file except in compliance with
|
||||
# the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
log4j.rootLogger=TRACE, stdout
|
||||
|
||||
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
|
||||
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
|
||||
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
|
||||
|
||||
#log4j.appender.fileAppender=org.apache.log4j.FileAppender
|
||||
#log4j.appender.fileAppender.File=kafka-request.log
|
||||
#log4j.appender.fileAppender.layout=org.apache.log4j.PatternLayout
|
||||
#log4j.appender.fileAppender.layout.ConversionPattern= %-4r [%t] %-5p %c %x - %m%n
|
||||
|
||||
|
||||
# Turn on all our debugging info
|
||||
#log4j.logger.kafka=INFO
|
||||
#log4j.logger.org.I0Itec.zkclient.ZkClient=DEBUG
|
||||
|
||||
116
test/resources/server.properties
Normal file
116
test/resources/server.properties
Normal file
@@ -0,0 +1,116 @@
|
||||
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
# contributor license agreements. See the NOTICE file distributed with
|
||||
# this work for additional information regarding copyright ownership.
|
||||
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
# (the "License"); you may not use this file except in compliance with
|
||||
# the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
# see kafka.server.KafkaConfig for additional details and defaults
|
||||
|
||||
############################# Server Basics #############################
|
||||
|
||||
# The id of the broker. This must be set to a unique integer for each broker.
|
||||
brokerid=0
|
||||
|
||||
# Hostname the broker will advertise to consumers. If not set, kafka will use the value returned
|
||||
# from InetAddress.getLocalHost(). If there are multiple interfaces getLocalHost
|
||||
# may not be what you want.
|
||||
#hostname=
|
||||
|
||||
|
||||
############################# Socket Server Settings #############################
|
||||
|
||||
# The port the socket server listens on
|
||||
port=%(kafka.port)d
|
||||
|
||||
# The number of processor threads the socket server uses for receiving and answering requests.
|
||||
# Defaults to the number of cores on the machine
|
||||
num.threads=8
|
||||
|
||||
# The send buffer (SO_SNDBUF) used by the socket server
|
||||
socket.send.buffer=1048576
|
||||
|
||||
# The receive buffer (SO_RCVBUF) used by the socket server
|
||||
socket.receive.buffer=1048576
|
||||
|
||||
# The maximum size of a request that the socket server will accept (protection against OOM)
|
||||
max.socket.request.bytes=104857600
|
||||
|
||||
|
||||
############################# Log Basics #############################
|
||||
|
||||
# The directory under which to store log files
|
||||
log.dir=%(kafka.tmp.dir)s
|
||||
|
||||
# The number of logical partitions per topic per server. More partitions allow greater parallelism
|
||||
# for consumption, but also mean more files.
|
||||
num.partitions=%(kafka.partitions)d
|
||||
|
||||
# Overrides for for the default given by num.partitions on a per-topic basis
|
||||
#topic.partition.count.map=topic1:3, topic2:4
|
||||
|
||||
############################# Log Flush Policy #############################
|
||||
|
||||
# The following configurations control the flush of data to disk. This is the most
|
||||
# important performance knob in kafka.
|
||||
# There are a few important trade-offs here:
|
||||
# 1. Durability: Unflushed data is at greater risk of loss in the event of a crash.
|
||||
# 2. Latency: Data is not made available to consumers until it is flushed (which adds latency).
|
||||
# 3. Throughput: The flush is generally the most expensive operation.
|
||||
# The settings below allow one to configure the flush policy to flush data after a period of time or
|
||||
# every N messages (or both). This can be done globally and overridden on a per-topic basis.
|
||||
|
||||
# The number of messages to accept before forcing a flush of data to disk
|
||||
log.flush.interval=10000
|
||||
|
||||
# The maximum amount of time a message can sit in a log before we force a flush
|
||||
log.default.flush.interval.ms=1000
|
||||
|
||||
# Per-topic overrides for log.default.flush.interval.ms
|
||||
#topic.flush.intervals.ms=topic1:1000, topic2:3000
|
||||
|
||||
# The interval (in ms) at which logs are checked to see if they need to be flushed to disk.
|
||||
log.default.flush.scheduler.interval.ms=1000
|
||||
|
||||
############################# Log Retention Policy #############################
|
||||
|
||||
# The following configurations control the disposal of log segments. The policy can
|
||||
# be set to delete segments after a period of time, or after a given size has accumulated.
|
||||
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
|
||||
# from the end of the log.
|
||||
|
||||
# The minimum age of a log file to be eligible for deletion
|
||||
log.retention.hours=168
|
||||
|
||||
# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
|
||||
# segments don't drop below log.retention.size.
|
||||
#log.retention.size=1073741824
|
||||
|
||||
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
|
||||
log.file.size=536870912
|
||||
|
||||
# The interval at which log segments are checked to see if they can be deleted according
|
||||
# to the retention policies
|
||||
log.cleanup.interval.mins=1
|
||||
|
||||
############################# Zookeeper #############################
|
||||
|
||||
# Enable connecting to zookeeper
|
||||
enable.zookeeper=false
|
||||
|
||||
# Zk connection string (see zk docs for details).
|
||||
# This is a comma separated host:port pairs, each corresponding to a zk
|
||||
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
|
||||
# You can also append an optional chroot string to the urls to specify the
|
||||
# root directory for all kafka znodes.
|
||||
zk.connect=localhost:2181
|
||||
|
||||
# Timeout in ms for connecting to zookeeper
|
||||
zk.connectiontimeout.ms=1000000
|
||||
Reference in New Issue
Block a user