Create a new child thread on each open fixture try
This commit is contained in:
@@ -122,11 +122,11 @@ class ZookeeperFixture(Fixture):
|
|||||||
# Configure Zookeeper child process
|
# Configure Zookeeper child process
|
||||||
args = self.kafka_run_class_args("org.apache.zookeeper.server.quorum.QuorumPeerMain", properties)
|
args = self.kafka_run_class_args("org.apache.zookeeper.server.quorum.QuorumPeerMain", properties)
|
||||||
env = self.kafka_run_class_env()
|
env = self.kafka_run_class_env()
|
||||||
self.child = SpawnedService(args, env)
|
|
||||||
|
|
||||||
# Party!
|
# Party!
|
||||||
self.out("Starting...")
|
self.out("Starting...")
|
||||||
while True:
|
while True:
|
||||||
|
self.child = SpawnedService(args, env)
|
||||||
self.child.start()
|
self.child.start()
|
||||||
if self.child.wait_for(r"binding to port", timeout=5):
|
if self.child.wait_for(r"binding to port", timeout=5):
|
||||||
break
|
break
|
||||||
@@ -202,11 +202,6 @@ class KafkaFixture(Fixture):
|
|||||||
properties = os.path.join(self.tmp_dir, "kafka.properties")
|
properties = os.path.join(self.tmp_dir, "kafka.properties")
|
||||||
self.render_template(template, properties, vars(self))
|
self.render_template(template, properties, vars(self))
|
||||||
|
|
||||||
# Configure Kafka child process
|
|
||||||
args = self.kafka_run_class_args("kafka.Kafka", properties)
|
|
||||||
env = self.kafka_run_class_env()
|
|
||||||
self.child = SpawnedService(args, env)
|
|
||||||
|
|
||||||
# Party!
|
# Party!
|
||||||
self.out("Creating Zookeeper chroot node...")
|
self.out("Creating Zookeeper chroot node...")
|
||||||
args = self.kafka_run_class_args("org.apache.zookeeper.ZooKeeperMain",
|
args = self.kafka_run_class_args("org.apache.zookeeper.ZooKeeperMain",
|
||||||
@@ -225,7 +220,13 @@ class KafkaFixture(Fixture):
|
|||||||
self.out("Done!")
|
self.out("Done!")
|
||||||
|
|
||||||
self.out("Starting...")
|
self.out("Starting...")
|
||||||
|
|
||||||
|
# Configure Kafka child process
|
||||||
|
args = self.kafka_run_class_args("kafka.Kafka", properties)
|
||||||
|
env = self.kafka_run_class_env()
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
|
self.child = SpawnedService(args, env)
|
||||||
self.child.start()
|
self.child.start()
|
||||||
if self.child.wait_for(r"\[Kafka Server %d\], Started" % self.broker_id, timeout=5):
|
if self.child.wait_for(r"\[Kafka Server %d\], Started" % self.broker_id, timeout=5):
|
||||||
break
|
break
|
||||||
|
Reference in New Issue
Block a user