Attempt to add ssl support to kafka fixtures

This commit is contained in:
Dana Powers
2016-04-04 19:54:22 -07:00
parent 01f03656cc
commit 097198ccea
5 changed files with 44 additions and 10 deletions

1
.gitignore vendored
View File

@@ -6,6 +6,7 @@ dist
MANIFEST
env
servers/*/kafka-bin*
servers/*/resources/ssl*
.coverage*
.noseids
docs/_build

View File

@@ -21,11 +21,20 @@ broker.id={broker_id}
############################# Socket Server Settings #############################
listeners={transport}://{host}:{port}
security.inter.broker.protocol={transport}
ssl.keystore.location={ssl_dir}/server.keystore.jks
ssl.keystore.password=foobar
ssl.key.password=foobar
ssl.truststore.location={ssl_dir}/server.truststore.jks
ssl.truststore.password=foobar
# The port the socket server listens on
port={port}
#port=9092
# Hostname the broker will bind to. If not set, the server will bind to all interfaces
host.name={host}
#host.name=localhost
# Hostname the broker will advertise to producers and consumers. If not set, it uses the
# value for "host.name" if configured. Otherwise, it will use the value returned from

View File

@@ -21,11 +21,20 @@ broker.id={broker_id}
############################# Socket Server Settings #############################
listeners={transport}://{host}:{port}
security.inter.broker.protocol={transport}
ssl.keystore.location={ssl_dir}/server.keystore.jks
ssl.keystore.password=foobar
ssl.key.password=foobar
ssl.truststore.location={ssl_dir}/server.truststore.jks
ssl.truststore.password=foobar
# The port the socket server listens on
port={port}
#port=9092
# Hostname the broker will bind to. If not set, the server will bind to all interfaces
host.name={host}
#host.name=localhost
# Hostname the broker will advertise to producers and consumers. If not set, it uses the
# value for "host.name" if configured. Otherwise, it will use the value returned from

View File

@@ -21,11 +21,20 @@ broker.id={broker_id}
############################# Socket Server Settings #############################
listeners={transport}://{host}:{port}
security.inter.broker.protocol={transport}
ssl.keystore.location={ssl_dir}/server.keystore.jks
ssl.keystore.password=foobar
ssl.key.password=foobar
ssl.truststore.location={ssl_dir}/server.truststore.jks
ssl.truststore.password=foobar
# The port the socket server listens on
port={port}
#port=9092
# Hostname the broker will bind to. If not set, the server will bind to all interfaces
host.name={host}
#host.name=localhost
# Hostname the broker will advertise to producers and consumers. If not set, it uses the
# value for "host.name" if configured. Otherwise, it will use the value returned from

View File

@@ -182,8 +182,8 @@ class ZookeeperFixture(Fixture):
class KafkaFixture(Fixture):
@classmethod
def instance(cls, broker_id, zk_host, zk_port,
zk_chroot=None, port=None, replicas=1, partitions=2):
def instance(cls, broker_id, zk_host, zk_port, zk_chroot=None, port=None,
transport='PLAINTEXT', replicas=1, partitions=2):
if zk_chroot is None:
zk_chroot = "kafka-python_" + str(uuid.uuid4()).replace("-", "_")
if "KAFKA_URI" in os.environ:
@@ -194,16 +194,21 @@ class KafkaFixture(Fixture):
if port is None:
port = get_open_port()
host = "127.0.0.1"
fixture = KafkaFixture(host, port, broker_id, zk_host, zk_port, zk_chroot,
fixture = KafkaFixture(host, port, broker_id,
zk_host, zk_port, zk_chroot,
transport=transport,
replicas=replicas, partitions=partitions)
fixture.open()
return fixture
def __init__(self, host, port, broker_id, zk_host, zk_port, zk_chroot, replicas=1, partitions=2):
def __init__(self, host, port, broker_id, zk_host, zk_port, zk_chroot,
replicas=1, partitions=2, transport='PLAINTEXT'):
self.host = host
self.port = port
self.broker_id = broker_id
self.transport = transport.upper()
self.ssl_dir = self.test_resource('ssl')
self.zk_host = zk_host
self.zk_port = zk_port
@@ -233,6 +238,7 @@ class KafkaFixture(Fixture):
self.out("Running local instance...")
log.info(" host = %s", self.host)
log.info(" port = %s", self.port)
log.info(" transport = %s", self.transport)
log.info(" broker_id = %s", self.broker_id)
log.info(" zk_host = %s", self.zk_host)
log.info(" zk_port = %s", self.zk_port)