Ansible launcher: Use threads for workers

The SyncManager from the multiprocessing module seems to be exiting
under high load, and not leaving any clues as to why.  We can
probably handle the scale we anticipate using threads for the
moment at least, so switch to that.

Change-Id: If235cf802bb50874ecbe8bc234f67bc66a36ee22
This commit is contained in:
James E. Blair 2016-06-01 10:59:14 -07:00
parent ab46145594
commit 8fc762bc5e
1 changed files with 11 additions and 22 deletions

View File

@ -14,7 +14,6 @@
import json import json
import logging import logging
import multiprocessing
import os import os
import re import re
import shutil import shutil
@ -23,8 +22,8 @@ import socket
import subprocess import subprocess
import tempfile import tempfile
import threading import threading
import time
import traceback import traceback
import Queue
import uuid import uuid
import gear import gear
@ -79,14 +78,10 @@ class LaunchServer(object):
self.keep_jobdir = keep_jobdir self.keep_jobdir = keep_jobdir
self.hostname = socket.gethostname() self.hostname = socket.gethostname()
self.node_workers = {} self.node_workers = {}
# This has the side effect of creating the logger; our logging self.jobs = {}
# config will handle the rest. self.builds = {}
multiprocessing.get_logger() self.zmq_send_queue = Queue.Queue()
self.mpmanager = multiprocessing.Manager() self.termination_queue = Queue.Queue()
self.jobs = self.mpmanager.dict()
self.builds = self.mpmanager.dict()
self.zmq_send_queue = multiprocessing.JoinableQueue()
self.termination_queue = multiprocessing.JoinableQueue()
self.sites = {} self.sites = {}
self.static_nodes = {} self.static_nodes = {}
if config.has_option('launcher', 'accept-nodes'): if config.has_option('launcher', 'accept-nodes'):
@ -188,11 +183,6 @@ class LaunchServer(object):
self.gearman_thread.daemon = True self.gearman_thread.daemon = True
self.gearman_thread.start() self.gearman_thread.start()
# FIXME: Without this, sometimes the subprocess module does
# not actually launch any subprocesses. I have no
# explanation. -corvus
time.sleep(1)
# Start static workers # Start static workers
for node in self.static_nodes.values(): for node in self.static_nodes.values():
self.log.debug("Creating static node with arguments: %s" % (node,)) self.log.debug("Creating static node with arguments: %s" % (node,))
@ -320,8 +310,8 @@ class LaunchServer(object):
self.termination_queue, self.keep_jobdir) self.termination_queue, self.keep_jobdir)
self.node_workers[worker.name] = worker self.node_workers[worker.name] = worker
worker.process = multiprocessing.Process(target=worker.run) worker.thread = threading.Thread(target=worker.run)
worker.process.start() worker.thread.start()
def stopJob(self, job): def stopJob(self, job):
try: try:
@ -358,7 +348,7 @@ class LaunchServer(object):
continue continue
worker = self.node_workers[item] worker = self.node_workers[item]
self.log.debug("Joining %s" % (item,)) self.log.debug("Joining %s" % (item,))
worker.process.join() worker.thread.join()
self.log.debug("Joined %s" % (item,)) self.log.debug("Joined %s" % (item,))
del self.node_workers[item] del self.node_workers[item]
except Exception: except Exception:
@ -384,10 +374,10 @@ class NodeWorker(object):
if not isinstance(labels, list): if not isinstance(labels, list):
labels = [labels] labels = [labels]
self.labels = labels self.labels = labels
self.process = None self.thread = None
self.registered_functions = set() self.registered_functions = set()
self._running = True self._running = True
self.queue = multiprocessing.JoinableQueue() self.queue = Queue.Queue()
self.manager_name = manager_name self.manager_name = manager_name
self.zmq_send_queue = zmq_send_queue self.zmq_send_queue = zmq_send_queue
self.termination_queue = termination_queue self.termination_queue = termination_queue
@ -408,12 +398,11 @@ class NodeWorker(object):
def isAlive(self): def isAlive(self):
# Meant to be called from the manager # Meant to be called from the manager
if self.process and self.process.is_alive(): if self.thread and self.thread.is_alive():
return True return True
return False return False
def run(self): def run(self):
signal.signal(signal.SIGINT, signal.SIG_IGN)
self.log.debug("Node worker %s starting" % (self.name,)) self.log.debug("Node worker %s starting" % (self.name,))
server = self.config.get('gearman', 'server') server = self.config.get('gearman', 'server')
if self.config.has_option('gearman', 'port'): if self.config.has_option('gearman', 'port'):