Ansible launcher: support zuul stop commands

Change-Id: I02f43162fb9c5d691f0d24841c196f5cb5e2e43e
This commit is contained in:
James E. Blair 2016-04-21 11:26:02 -07:00 committed by James E. Blair
parent 08d7d4b49b
commit 19233fbff5
1 changed file with 50 additions and 7 deletions

View File

@ -65,6 +65,7 @@ class LaunchServer(object):
self.node_workers = {}
self.mpmanager = multiprocessing.Manager()
self.jobs = self.mpmanager.dict()
self.builds = self.mpmanager.dict()
self.zmq_send_queue = multiprocessing.JoinableQueue()
self.termination_queue = multiprocessing.JoinableQueue()
self.sites = {}
@ -138,6 +139,7 @@ class LaunchServer(object):
def register(self):
self.worker.registerFunction("node-assign:zuul")
self.worker.registerFunction("stop:%s" % self.hostname)
def reconfigure(self, config):
self.log.debug("Reconfiguring")
@ -189,8 +191,11 @@ class LaunchServer(object):
job = self.worker.getJob()
try:
if job.name.startswith('node-assign:'):
self.log.debug("Got assign-node job: %s" % job.unique)
self.log.debug("Got node-assign job: %s" % job.unique)
self.assignNode(job)
elif job.name.startswith('stop:'):
self.log.debug("Got stop job: %s" % job.unique)
self.stopJob(job)
else:
self.log.error("Unable to handle job %s" % job.name)
job.sendWorkFail()
@ -205,8 +210,8 @@ class LaunchServer(object):
def assignNode(self, job):
args = json.loads(job.arguments)
self.log.debug("Assigned node with arguments: %s" % (args,))
worker = NodeWorker(self.config, self.jobs, self.sites,
args['name'], args['host'],
worker = NodeWorker(self.config, self.jobs, self.builds,
self.sites, args['name'], args['host'],
args['description'], args['labels'],
self.hostname, self.zmq_send_queue,
self.termination_queue)
@ -219,6 +224,31 @@ class LaunchServer(object):
job.sendWorkData(json.dumps(data))
job.sendWorkComplete()
def stopJob(self, job):
    """Handle a ``stop:`` request by asking the owning worker to abort.

    The request payload (``job.arguments``) is decoded as JSON and its
    ``number`` entry is used as the build identifier.  That identifier
    is looked up in ``self.builds`` to get a worker name, which is in
    turn looked up in ``self.node_workers``.  When the worker is found
    and reports itself alive, an ``abort`` action is placed on its
    queue.

    An unknown build, an unknown worker, or a worker that is not alive
    is only logged at debug level.  Errors raised while talking to the
    worker are logged and swallowed.  Errors raised while decoding the
    payload (invalid JSON, missing ``number``) are not handled here and
    propagate to the caller.

    In every case, including early exits and propagated errors, the
    request is acknowledged with ``job.sendWorkComplete()``.

    :arg job: the stop request; must provide ``arguments`` (a JSON
        string) and ``sendWorkComplete()``.
    """
    try:
        params = json.loads(job.arguments)
        self.log.debug("Stop job with arguments: %s" % (params,))
        build_id = params['number']

        # Step 1: build identifier -> name of the worker running it.
        worker_name = self.builds.get(build_id)
        if not worker_name:
            self.log.debug("Unable to find build for job %s" % (build_id,))
            return

        # Step 2: worker name -> worker object.
        worker = self.node_workers.get(worker_name)
        if not worker:
            self.log.debug("Unable to find worker for job %s" % (build_id,))
            return

        # Step 3: deliver the abort.  A failure here must not prevent
        # the acknowledgement below, so it is logged and swallowed.
        try:
            if not worker.isAlive():
                self.log.debug("Node %s is not alive while aborting job" %
                               (worker.name,))
            else:
                worker.queue.put({'action': 'abort'})
        except Exception:
            self.log.exception("Exception sending abort command to worker:")
    finally:
        # Runs on every path out of the method, early returns included.
        job.sendWorkComplete()
def runReaper(self):
# We don't actually care if all the events are processed
while self._reaper_running:
@ -238,11 +268,13 @@ class LaunchServer(object):
class NodeWorker(object):
log = logging.getLogger("zuul.NodeWorker")
def __init__(self, config, jobs, sites, name, host, description, labels,
manager_name, zmq_send_queue, termination_queue):
def __init__(self, config, jobs, builds, sites, name, host,
description, labels, manager_name, zmq_send_queue,
termination_queue):
self.log.debug("Creating node worker %s" % (name,))
self.config = config
self.jobs = jobs
self.builds = builds
self.sites = sites
self.name = name
self.host = host
@ -332,7 +364,11 @@ class NodeWorker(object):
self._job_complete_event.wait()
self.worker.shutdown()
elif item['action'] == 'reconfigure':
self.log.debug("Received reconfigure request")
self.register()
elif item['action'] == 'abort':
self.log.debug("Received abort request")
self.abortRunningJob()
finally:
self.queue.task_done()
@ -450,6 +486,11 @@ class NodeWorker(object):
except Exception:
self.log.exception("Exception while sending job completion event")
try:
del self.builds[job.unique]
except Exception:
self.log.exception("Exception while clearing build record")
self._job_complete_event.set()
if offline and self._running:
self.stop()
@ -496,6 +537,7 @@ class NodeWorker(object):
self._job_complete_event.clear()
self.log.debug("Job %s: beginning" % (job.unique,))
self.builds[job.unique] = self.name
with JobDir() as jobdir:
self.log.debug("Job %s: job root at %s" %
(job.unique, jobdir.root))
@ -505,8 +547,9 @@ class NodeWorker(object):
self._job_start_time = time.time()
data = {
'url': 'https://server/job',
'number': 1
'manager': self.manager_name,
'number': job.unique,
# 'url': '',
}
job.sendWorkData(json.dumps(data))
job.sendWorkStatus(0, 100)