diff --git a/zuul/launcher/ansiblelaunchserver.py b/zuul/launcher/ansiblelaunchserver.py index 792d1b9880..5ecc954a48 100644 --- a/zuul/launcher/ansiblelaunchserver.py +++ b/zuul/launcher/ansiblelaunchserver.py @@ -14,7 +14,6 @@ import json import logging -import multiprocessing import os import re import shutil @@ -23,8 +22,8 @@ import socket import subprocess import tempfile import threading -import time import traceback +import Queue import uuid import gear @@ -79,14 +78,10 @@ class LaunchServer(object): self.keep_jobdir = keep_jobdir self.hostname = socket.gethostname() self.node_workers = {} - # This has the side effect of creating the logger; our logging - # config will handle the rest. - multiprocessing.get_logger() - self.mpmanager = multiprocessing.Manager() - self.jobs = self.mpmanager.dict() - self.builds = self.mpmanager.dict() - self.zmq_send_queue = multiprocessing.JoinableQueue() - self.termination_queue = multiprocessing.JoinableQueue() + self.jobs = {} + self.builds = {} + self.zmq_send_queue = Queue.Queue() + self.termination_queue = Queue.Queue() self.sites = {} self.static_nodes = {} if config.has_option('launcher', 'accept-nodes'): @@ -188,11 +183,6 @@ class LaunchServer(object): self.gearman_thread.daemon = True self.gearman_thread.start() - # FIXME: Without this, sometimes the subprocess module does - # not actually launch any subprocesses. I have no - # explanation. -corvus - time.sleep(1) - # Start static workers for node in self.static_nodes.values(): self.log.debug("Creating static node with arguments: %s" % (node,)) @@ -320,8 +310,8 @@ class LaunchServer(object): self.termination_queue, self.keep_jobdir) self.node_workers[worker.name] = worker - worker.process = multiprocessing.Process(target=worker.run) - worker.process.start() + worker.thread = threading.Thread(target=worker.run) + worker.thread.start() def stopJob(self, job): try: @@ -358,7 +348,7 @@ class LaunchServer(object): continue worker = self.node_workers[item] self.log.debug("Joining %s" % (item,)) - worker.process.join() + worker.thread.join() self.log.debug("Joined %s" % (item,)) del self.node_workers[item] except Exception: @@ -384,10 +374,10 @@ class NodeWorker(object): if not isinstance(labels, list): labels = [labels] self.labels = labels - self.process = None + self.thread = None self.registered_functions = set() self._running = True - self.queue = multiprocessing.JoinableQueue() + self.queue = Queue.Queue() self.manager_name = manager_name self.zmq_send_queue = zmq_send_queue self.termination_queue = termination_queue @@ -408,12 +398,11 @@ class NodeWorker(object): def isAlive(self): # Meant to be called from the manager - if self.process and self.process.is_alive(): + if self.thread and self.thread.is_alive(): return True return False def run(self): - signal.signal(signal.SIGINT, signal.SIG_IGN) self.log.debug("Node worker %s starting" % (self.name,)) server = self.config.get('gearman', 'server') if self.config.has_option('gearman', 'port'):