Add support for kafka 0.8.1

Mark Roberts
2014-04-23 02:04:04 -07:00
parent 6628c109b7
commit 7e5c847aa9
8 changed files with 116 additions and 4 deletions

.gitmodules

@@ -1,3 +1,6 @@
[submodule "servers/0.8.0/kafka-src"]
path = servers/0.8.0/kafka-src
url = https://github.com/apache/kafka.git
[submodule "servers/0.8.1/kafka-src"]
path = servers/0.8.1/kafka-src
url = https://github.com/apache/kafka.git

@@ -2,3 +2,4 @@
git submodule update --init
(cd servers/0.8.0/kafka-src && ./sbt update package assembly-package-dependency)
(cd servers/0.8.1/kafka-src && ./gradlew jarAll)

@@ -0,0 +1,59 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
############################# Server Basics #############################
broker.id={broker_id}
############################# Socket Server Settings #############################
port={port}
host.name={host}
num.network.threads=2
num.io.threads=2
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600
############################# Log Basics #############################
log.dirs={tmp_dir}/data
num.partitions={partitions}
default.replication.factor={replicas}
############################# Log Flush Policy #############################
log.flush.interval.messages=10000
log.flush.interval.ms=1000
############################# Log Retention Policy #############################
log.retention.hours=168
log.segment.bytes=536870912
log.retention.check.interval.ms=60000
log.cleanup.interval.mins=1
log.cleaner.enable=false
############################# Zookeeper #############################
# Zookeeper connection string (see zookeeper docs for details).
# This is a comma-separated list of host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect={zk_host}:{zk_port}/{zk_chroot}
zookeeper.connection.timeout.ms=1000000
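
The {broker_id}, {port}, {tmp_dir} and similar fields in this template are Python format-style placeholders, presumably filled in by the test fixtures before a broker is launched. A minimal sketch of that rendering step, assuming a hypothetical render_config helper and an illustrative template path (neither name appears in this diff):

def render_config(template_path, target_path, **params):
    # Hypothetical helper: read a .properties template and substitute its
    # {placeholder} fields via str.format. Names here are illustrative,
    # not the project's actual fixture API.
    with open(template_path) as f:
        template = f.read()
    with open(target_path, "w") as f:
        f.write(template.format(**params))

# Example: render a concrete broker config for a local test instance.
render_config(
    "servers/0.8.1/resources/kafka.properties",   # assumed template location
    "/tmp/kafka-test/kafka.properties",
    broker_id=0, host="127.0.0.1", port=9092,
    tmp_dir="/tmp/kafka-test", partitions=2, replicas=1,
    zk_host="127.0.0.1", zk_port=2181, zk_chroot="kafka-python",
)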

@@ -0,0 +1,24 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
log4j.logger.kafka=DEBUG, stdout
log4j.logger.org.I0Itec.zkclient.ZkClient=INFO, stdout
log4j.logger.org.apache.zookeeper=INFO, stdout

@@ -0,0 +1,19 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
dataDir={tmp_dir}
clientPortAddress={host}
clientPort={port}
maxClientCnxns=0
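
Both the broker and ZooKeeper templates above only matter once a test harness renders them and spawns the processes. A rough usage sketch, assuming fixture classes named ZookeeperFixture and KafkaFixture with instance()/close() helpers; those names are inferred from test/fixtures.py and are not confirmed by this diff:

# Rough sketch of integration-test setup/teardown. Class and method names
# (ZookeeperFixture, KafkaFixture, instance, close) are assumptions, not
# guaranteed by this commit.
from test.fixtures import ZookeeperFixture, KafkaFixture

zk = ZookeeperFixture.instance()                    # renders zookeeper.properties, starts ZooKeeper
kafka = KafkaFixture.instance(0, zk.host, zk.port)  # renders kafka.properties, starts broker 0
try:
    pass  # run tests against kafka.host:kafka.port here
finally:
    kafka.close()
    zk.close()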

@@ -26,9 +26,13 @@ class Fixture(object):
        # ./kafka-src/bin/kafka-run-class.sh is the authority.
        jars = ["."]

        # assume all dependencies have been packaged into one jar with sbt-assembly's task "assembly-package-dependency"
        # 0.8.0 build path, should contain the core jar and a deps jar
        jars.extend(glob.glob(cls.kafka_root + "/core/target/scala-%s/*.jar" % cls.scala_version))

        # 0.8.1 build path, should contain the core jar and several dep jars
        jars.extend(glob.glob(cls.kafka_root + "/core/build/libs/*.jar"))
        jars.extend(glob.glob(cls.kafka_root + "/core/build/dependant-libs-%s/*.jar" % cls.scala_version))

        jars = filter(os.path.exists, map(os.path.abspath, jars))
        return ":".join(jars)
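
Pulled out of the diff, the classpath logic amounts to "glob both build layouts and keep whatever exists". A standalone sketch, with kafka_root and scala_version hard-coded as stand-ins for the fixture's class attributes:

import glob
import os

def build_classpath(kafka_root="servers/0.8.1/kafka-src", scala_version="2.8.0"):
    # Stand-in for Fixture.classpath(); the default arguments are illustrative.
    jars = ["."]
    # 0.8.0 sbt layout: core jar plus one assembled dependency jar
    jars.extend(glob.glob(kafka_root + "/core/target/scala-%s/*.jar" % scala_version))
    # 0.8.1 gradle layout: core jar plus individual dependency jars
    jars.extend(glob.glob(kafka_root + "/core/build/libs/*.jar"))
    jars.extend(glob.glob(kafka_root + "/core/build/dependant-libs-%s/*.jar" % scala_version))
    # Keep only paths that actually exist, so one code path serves both layouts
    return ":".join(j for j in map(os.path.abspath, jars) if os.path.exists(j))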

@@ -45,7 +45,7 @@ class SpawnedService(threading.Thread):
        self.capture_stdout = capture
        self.show_stdout = show

-    def configure_stderr(self, file=None, capture=False, show=True):
+    def configure_stderr(self, file=None, capture=False, show=False):
        self.stderr_file = file
        self.capture_stderr = capture
        self.show_stderr = show
@@ -114,9 +114,10 @@ class SpawnedService(threading.Thread):
            t2 = time.time()
            if t2 - t1 >= timeout:
                raise RuntimeError("Waiting for %r timed out" % pattern)
-            if re.search(pattern, self.captured_stdout) is not None:
+            if re.search(pattern, self.captured_stdout, re.IGNORECASE) is not None:
                return
-            if re.search(pattern, self.captured_stderr) is not None:
+            if re.search(pattern, self.captured_stderr, re.IGNORECASE) is not None:
                return
            time.sleep(0.1)
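
The re.IGNORECASE flag makes these readiness checks tolerant of case differences, presumably because the startup log lines the fixtures wait for are not capitalized identically in 0.8.0 and 0.8.1. A simplified stand-in for the matching loop (not the full SpawnedService thread; read_output is a hypothetical callable returning the captured text):

import re
import time

def wait_for(pattern, read_output, timeout=10):
    # Poll captured output until `pattern` appears, matching case-insensitively.
    t1 = time.time()
    while True:
        if time.time() - t1 >= timeout:
            raise RuntimeError("Waiting for %r timed out" % pattern)
        if re.search(pattern, read_output(), re.IGNORECASE) is not None:
            return
        time.sleep(0.1)

# Both "[Kafka Server 0], Started" and "[Kafka Server 0], started"
# now satisfy wait_for(r"started", ...).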