From 8fc762bc5e75931f18188605891bf4180db3eb9c Mon Sep 17 00:00:00 2001 From: "James E. Blair" Date: Wed, 1 Jun 2016 10:59:14 -0700 Subject: [PATCH] Ansible launcher: Use threads for workers The SyncManager from the multiprocessing module seems to be exiting under high load, and not leaving any clues as to why. We can probably handle the scale we anticipate using threads for the moment at least, so switch to that. Change-Id: If235cf802bb50874ecbe8bc234f67bc66a36ee22 --- zuul/launcher/ansiblelaunchserver.py | 33 ++++++++++------------------ 1 file changed, 11 insertions(+), 22 deletions(-) diff --git a/zuul/launcher/ansiblelaunchserver.py b/zuul/launcher/ansiblelaunchserver.py index 792d1b9880..5ecc954a48 100644 --- a/zuul/launcher/ansiblelaunchserver.py +++ b/zuul/launcher/ansiblelaunchserver.py @@ -14,7 +14,6 @@ import json import logging -import multiprocessing import os import re import shutil @@ -23,8 +22,8 @@ import socket import subprocess import tempfile import threading -import time import traceback +import Queue import uuid import gear @@ -79,14 +78,10 @@ class LaunchServer(object): self.keep_jobdir = keep_jobdir self.hostname = socket.gethostname() self.node_workers = {} - # This has the side effect of creating the logger; our logging - # config will handle the rest. - multiprocessing.get_logger() - self.mpmanager = multiprocessing.Manager() - self.jobs = self.mpmanager.dict() - self.builds = self.mpmanager.dict() - self.zmq_send_queue = multiprocessing.JoinableQueue() - self.termination_queue = multiprocessing.JoinableQueue() + self.jobs = {} + self.builds = {} + self.zmq_send_queue = Queue.Queue() + self.termination_queue = Queue.Queue() self.sites = {} self.static_nodes = {} if config.has_option('launcher', 'accept-nodes'): @@ -188,11 +183,6 @@ class LaunchServer(object): self.gearman_thread.daemon = True self.gearman_thread.start() - # FIXME: Without this, sometimes the subprocess module does - # not actually launch any subprocesses. I have no - # explanation. -corvus - time.sleep(1) - # Start static workers for node in self.static_nodes.values(): self.log.debug("Creating static node with arguments: %s" % (node,)) @@ -320,8 +310,8 @@ class LaunchServer(object): self.termination_queue, self.keep_jobdir) self.node_workers[worker.name] = worker - worker.process = multiprocessing.Process(target=worker.run) - worker.process.start() + worker.thread = threading.Thread(target=worker.run) + worker.thread.start() def stopJob(self, job): try: @@ -358,7 +348,7 @@ class LaunchServer(object): continue worker = self.node_workers[item] self.log.debug("Joining %s" % (item,)) - worker.process.join() + worker.thread.join() self.log.debug("Joined %s" % (item,)) del self.node_workers[item] except Exception: @@ -384,10 +374,10 @@ class NodeWorker(object): if not isinstance(labels, list): labels = [labels] self.labels = labels - self.process = None + self.thread = None self.registered_functions = set() self._running = True - self.queue = multiprocessing.JoinableQueue() + self.queue = Queue.Queue() self.manager_name = manager_name self.zmq_send_queue = zmq_send_queue self.termination_queue = termination_queue @@ -408,12 +398,11 @@ class NodeWorker(object): def isAlive(self): # Meant to be called from the manager - if self.process and self.process.is_alive(): + if self.thread and self.thread.is_alive(): return True return False def run(self): - signal.signal(signal.SIGINT, signal.SIG_IGN) self.log.debug("Node worker %s starting" % (self.name,)) server = self.config.get('gearman', 'server') if self.config.has_option('gearman', 'port'):