diff --git a/zuul/launcher/ansiblelaunchserver.py b/zuul/launcher/ansiblelaunchserver.py index 62d7fc02f3..4e652d26f8 100644 --- a/zuul/launcher/ansiblelaunchserver.py +++ b/zuul/launcher/ansiblelaunchserver.py @@ -65,6 +65,7 @@ class LaunchServer(object): self.node_workers = {} self.mpmanager = multiprocessing.Manager() self.jobs = self.mpmanager.dict() + self.builds = self.mpmanager.dict() self.zmq_send_queue = multiprocessing.JoinableQueue() self.termination_queue = multiprocessing.JoinableQueue() self.sites = {} @@ -138,6 +139,7 @@ class LaunchServer(object): def register(self): self.worker.registerFunction("node-assign:zuul") + self.worker.registerFunction("stop:%s" % self.hostname) def reconfigure(self, config): self.log.debug("Reconfiguring") @@ -189,8 +191,11 @@ class LaunchServer(object): job = self.worker.getJob() try: if job.name.startswith('node-assign:'): - self.log.debug("Got assign-node job: %s" % job.unique) + self.log.debug("Got node-assign job: %s" % job.unique) self.assignNode(job) + elif job.name.startswith('stop:'): + self.log.debug("Got stop job: %s" % job.unique) + self.stopJob(job) else: self.log.error("Unable to handle job %s" % job.name) job.sendWorkFail() @@ -205,8 +210,8 @@ class LaunchServer(object): def assignNode(self, job): args = json.loads(job.arguments) self.log.debug("Assigned node with arguments: %s" % (args,)) - worker = NodeWorker(self.config, self.jobs, self.sites, - args['name'], args['host'], + worker = NodeWorker(self.config, self.jobs, self.builds, + self.sites, args['name'], args['host'], args['description'], args['labels'], self.hostname, self.zmq_send_queue, self.termination_queue) @@ -219,6 +224,31 @@ class LaunchServer(object): job.sendWorkData(json.dumps(data)) job.sendWorkComplete() + def stopJob(self, job): + try: + args = json.loads(job.arguments) + self.log.debug("Stop job with arguments: %s" % (args,)) + unique = args['number'] + build_worker_name = self.builds.get(unique) + if not build_worker_name: + self.log.debug("Unable to find build for job %s" % (unique,)) + return + node = self.node_workers.get(build_worker_name) + if not node: + self.log.debug("Unable to find worker for job %s" % (unique,)) + return + try: + if node.isAlive(): + node.queue.put(dict(action='abort')) + else: + self.log.debug("Node %s is not alive while aborting job" % + (node.name,)) + except Exception: + self.log.exception("Exception sending abort command " + "to worker:") + finally: + job.sendWorkComplete() + def runReaper(self): # We don't actually care if all the events are processed while self._reaper_running: @@ -238,11 +268,13 @@ class LaunchServer(object): class NodeWorker(object): log = logging.getLogger("zuul.NodeWorker") - def __init__(self, config, jobs, sites, name, host, description, labels, - manager_name, zmq_send_queue, termination_queue): + def __init__(self, config, jobs, builds, sites, name, host, + description, labels, manager_name, zmq_send_queue, + termination_queue): self.log.debug("Creating node worker %s" % (name,)) self.config = config self.jobs = jobs + self.builds = builds self.sites = sites self.name = name self.host = host @@ -332,7 +364,11 @@ class NodeWorker(object): self._job_complete_event.wait() self.worker.shutdown() elif item['action'] == 'reconfigure': + self.log.debug("Received reconfigure request") self.register() + elif item['action'] == 'abort': + self.log.debug("Received abort request") + self.abortRunningJob() finally: self.queue.task_done() @@ -450,6 +486,11 @@ class NodeWorker(object): except Exception: self.log.exception("Exception while sending job completion event") + try: + del self.builds[job.unique] + except Exception: + self.log.exception("Exception while clearing build record") + self._job_complete_event.set() if offline and self._running: self.stop() @@ -496,6 +537,7 @@ class NodeWorker(object): self._job_complete_event.clear() self.log.debug("Job %s: beginning" % (job.unique,)) + self.builds[job.unique] = self.name with JobDir() as jobdir: self.log.debug("Job %s: job root at %s" % (job.unique, jobdir.root)) @@ -505,8 +547,9 @@ class NodeWorker(object): self._job_start_time = time.time() data = { - 'url': 'https://server/job', - 'number': 1 + 'manager': self.manager_name, + 'number': job.unique, + # 'url': '', } job.sendWorkData(json.dumps(data)) job.sendWorkStatus(0, 100)