Support setting kafka instance port explicitly in fixture
This commit is contained in:
@@ -151,7 +151,8 @@ class ZookeeperFixture(Fixture):
|
||||
|
||||
class KafkaFixture(Fixture):
|
||||
@classmethod
|
||||
def instance(cls, broker_id, zk_host, zk_port, zk_chroot=None, replicas=1, partitions=2):
|
||||
def instance(cls, broker_id, zk_host, zk_port,
|
||||
zk_chroot=None, port=None, replicas=1, partitions=2):
|
||||
if zk_chroot is None:
|
||||
zk_chroot = "kafka-python_" + str(uuid.uuid4()).replace("-", "_")
|
||||
if "KAFKA_URI" in os.environ:
|
||||
@@ -159,8 +160,11 @@ class KafkaFixture(Fixture):
|
||||
(host, port) = (parse.hostname, parse.port)
|
||||
fixture = ExternalService(host, port)
|
||||
else:
|
||||
(host, port) = ("127.0.0.1", get_open_port())
|
||||
fixture = KafkaFixture(host, port, broker_id, zk_host, zk_port, zk_chroot, replicas, partitions)
|
||||
if port is None:
|
||||
port = get_open_port()
|
||||
host = "127.0.0.1"
|
||||
fixture = KafkaFixture(host, port, broker_id, zk_host, zk_port, zk_chroot,
|
||||
replicas=replicas, partitions=partitions)
|
||||
fixture.open()
|
||||
return fixture
|
||||
|
||||
|
||||
@@ -27,8 +27,10 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
|
||||
|
||||
cls.zk = ZookeeperFixture.instance()
|
||||
chroot = random_string(10)
|
||||
cls.server1 = KafkaFixture.instance(0, cls.zk.host, cls.zk.port, chroot)
|
||||
cls.server2 = KafkaFixture.instance(1, cls.zk.host, cls.zk.port, chroot)
|
||||
cls.server1 = KafkaFixture.instance(0, cls.zk.host, cls.zk.port,
|
||||
zk_chroot=chroot)
|
||||
cls.server2 = KafkaFixture.instance(1, cls.zk.host, cls.zk.port,
|
||||
zk_chroot=chroot)
|
||||
|
||||
cls.server = cls.server1 # Bootstrapping server
|
||||
|
||||
|
||||
@@ -28,8 +28,11 @@ class TestFailover(KafkaIntegrationTestCase):
|
||||
|
||||
# mini zookeeper, 3 kafka brokers
|
||||
self.zk = ZookeeperFixture.instance()
|
||||
kk_args = [self.zk.host, self.zk.port, zk_chroot, replicas, partitions]
|
||||
self.brokers = [KafkaFixture.instance(i, *kk_args) for i in range(replicas)]
|
||||
kk_args = [self.zk.host, self.zk.port]
|
||||
kk_kwargs = {'zk_chroot': zk_chroot, 'replicas': replicas,
|
||||
'partitions': partitions}
|
||||
self.brokers = [KafkaFixture.instance(i, *kk_args, **kk_kwargs)
|
||||
for i in range(replicas)]
|
||||
|
||||
hosts = ['%s:%d' % (b.host, b.port) for b in self.brokers]
|
||||
self.client = SimpleClient(hosts, timeout=2)
|
||||
|
||||
Reference in New Issue
Block a user