Ansible launcher: support static workers

Change-Id: I7775124f89be94dc184586151e371ab1910ca0f1
This commit is contained in:
James E. Blair 2016-05-26 07:17:43 -07:00
parent 978601149f
commit fc47fec61d
1 changed files with 59 additions and 12 deletions

View File

@ -23,6 +23,7 @@ import socket
import subprocess
import tempfile
import threading
import time
import traceback
import uuid
@ -36,6 +37,12 @@ import zuul.ansible.library
import zuul.ansible.plugins.callback_plugins
def boolify(x):
if isinstance(x, str):
return bool(int(x))
return bool(x)
class JobDir(object):
def __init__(self, keep=False):
self.keep = keep
@ -63,7 +70,8 @@ class JobDir(object):
class LaunchServer(object):
log = logging.getLogger("zuul.LaunchServer")
section_re = re.compile('site "(.*?)"')
site_section_re = re.compile('site "(.*?)"')
node_section_re = re.compile('node "(.*?)"')
def __init__(self, config, keep_jobdir=False):
self.config = config
@ -76,17 +84,45 @@ class LaunchServer(object):
self.zmq_send_queue = multiprocessing.JoinableQueue()
self.termination_queue = multiprocessing.JoinableQueue()
self.sites = {}
self.static_nodes = {}
if config.has_option('launcher', 'accept-nodes'):
self.accept_nodes = config.get('launcher', 'accept-nodes')
else:
self.accept_nodes = True
for section in config.sections():
m = self.section_re.match(section)
m = self.site_section_re.match(section)
if m:
sitename = m.group(1)
d = {}
d['host'] = config.get(section, 'host')
d['user'] = config.get(section, 'user')
d['pass'] = config.get(section, 'pass', '')
d['root'] = config.get(section, 'root', '/')
if config.has_option(section, 'pass'):
d['pass'] = config.get(section, 'pass')
else:
d['pass'] = ''
if config.has_option(section, 'root'):
d['root'] = config.get(section, 'root')
else:
d['root'] = '/'
self.sites[sitename] = d
continue
m = self.node_section_re.match(section)
if m:
nodename = m.group(1)
d = {}
d['name'] = nodename
d['host'] = config.get(section, 'host')
if config.has_option(section, 'description'):
d['description'] = config.get(section, 'description')
else:
d['description'] = ''
if config.has_option(section, 'labels'):
d['labels'] = config.get(section, 'labels').split(',')
else:
d['labels'] = []
self.static_nodes[nodename] = d
continue
def start(self):
self._gearman_running = True
@ -132,6 +168,16 @@ class LaunchServer(object):
self.gearman_thread.daemon = True
self.gearman_thread.start()
# FIXME: Without this, sometimes the subprocess module does
# not actually launch any subprocesses. I have no
# explanation. -corvus
time.sleep(1)
# Start static workers
for node in self.static_nodes.values():
self.log.debug("Creating static node with arguments: %s" % (node,))
self._launchWorker(node)
def loadJobs(self):
self.log.debug("Loading jobs")
builder = JJB()
@ -147,7 +193,8 @@ class LaunchServer(object):
del self.jobs[name]
def register(self):
self.worker.registerFunction("node-assign:zuul")
if self.accept_nodes:
self.worker.registerFunction("node-assign:zuul")
self.worker.registerFunction("stop:%s" % self.hostname)
def reconfigure(self, config):
@ -220,6 +267,12 @@ class LaunchServer(object):
def assignNode(self, job):
args = json.loads(job.arguments)
self.log.debug("Assigned node with arguments: %s" % (args,))
self._launchWorker(args)
data = dict(manager=self.hostname)
job.sendWorkData(json.dumps(data))
job.sendWorkComplete()
def _launchWorker(self, args):
worker = NodeWorker(self.config, self.jobs, self.builds,
self.sites, args['name'], args['host'],
args['description'], args['labels'],
@ -230,10 +283,6 @@ class LaunchServer(object):
worker.process = multiprocessing.Process(target=worker.run)
worker.process.start()
data = dict(manager=self.hostname)
job.sendWorkData(json.dumps(data))
job.sendWorkComplete()
def stopJob(self, job):
try:
args = json.loads(job.arguments)
@ -458,9 +507,7 @@ class NodeWorker(object):
# Make sure we can parse what we need from the job first
args = json.loads(job.arguments)
# This may be configurable later, or we may choose to honor
# OFFLINE_NODE_WHEN_COMPLETE
offline = True
offline = boolify(args.get('OFFLINE_NODE_WHEN_COMPLETE', False))
job_name = job.name.split(':')[1]
# Initialize the result so we have something regardless of