Stub out aborting jobs in ansible launcher
The scheduler tests need to be able to abort jobs. Even though our Ansible launcher is not capable of aborting jobs yet, implement the method so that we can override it in the RecordingLaunchServer and exercise the scheduler. Instead of using name+number to specify which build to abort, use the UUID of the build. Remove the build number entirely, since it isn't required. In some places it was used to determine whether the job had started. In those cases, use the build URL instead. Also correct the launch server used in tests so that it does not send two initial WORK_DATA packets. Change-Id: I75525a2b48eb2761d599c039ba3084f09609dfbe
This commit is contained in:
parent
f3156c9154
commit
173029714a
|
@ -483,8 +483,8 @@ class BuildHistory(object):
|
|||
self.__dict__.update(kw)
|
||||
|
||||
def __repr__(self):
|
||||
return ("<Completed build, result: %s name: %s #%s changes: %s>" %
|
||||
(self.result, self.name, self.number, self.changes))
|
||||
return ("<Completed build, result: %s name: %s uuid: %s changes: %s>" %
|
||||
(self.result, self.name, self.uuid, self.changes))
|
||||
|
||||
|
||||
class FakeURLOpener(object):
|
||||
|
@ -541,12 +541,12 @@ class FakeStatsd(threading.Thread):
|
|||
class FakeBuild(object):
|
||||
log = logging.getLogger("zuul.test")
|
||||
|
||||
def __init__(self, launch_server, job, number, node):
|
||||
def __init__(self, launch_server, job, node):
|
||||
self.daemon = True
|
||||
self.launch_server = launch_server
|
||||
self.job = job
|
||||
self.jobdir = None
|
||||
self.number = number
|
||||
self.uuid = job.unique
|
||||
self.node = node
|
||||
self.parameters = json.loads(job.arguments)
|
||||
self.unique = self.parameters['ZUUL_UUID']
|
||||
|
@ -591,26 +591,8 @@ class FakeBuild(object):
|
|||
self.wait_condition.release()
|
||||
|
||||
def run(self):
|
||||
data = {
|
||||
'url': 'https://server/job/%s/%s/' % (self.name, self.number),
|
||||
'name': self.name,
|
||||
'number': self.number,
|
||||
'manager': self.launch_server.worker.worker_id,
|
||||
'worker_name': 'My Worker',
|
||||
'worker_hostname': 'localhost',
|
||||
'worker_ips': ['127.0.0.1', '192.168.1.1'],
|
||||
'worker_fqdn': 'zuul.example.org',
|
||||
'worker_program': 'FakeBuilder',
|
||||
'worker_version': 'v1.1',
|
||||
'worker_extra': {'something': 'else'}
|
||||
}
|
||||
|
||||
self.log.debug('Running build %s' % self.unique)
|
||||
|
||||
self.job.sendWorkData(json.dumps(data))
|
||||
self.log.debug('Sent WorkData packet with %s' % json.dumps(data))
|
||||
self.job.sendWorkStatus(0, 100)
|
||||
|
||||
if self.launch_server.hold_jobs_in_build:
|
||||
self.log.debug('Holding build %s' % self.unique)
|
||||
self._wait()
|
||||
|
@ -680,8 +662,6 @@ class RecordingLaunchServer(zuul.launcher.server.LaunchServer):
|
|||
self.lock = threading.Lock()
|
||||
self.running_builds = []
|
||||
self.build_history = []
|
||||
self._build_counter_lock = threading.Lock()
|
||||
self.build_counter = 0
|
||||
self.fail_tests = {}
|
||||
self.job_builds = {}
|
||||
|
||||
|
@ -720,16 +700,23 @@ class RecordingLaunchServer(zuul.launcher.server.LaunchServer):
|
|||
self.log.debug("Done releasing builds %s (%s)" %
|
||||
(regex, len(self.running_builds)))
|
||||
|
||||
def launch(self, job):
|
||||
with self._build_counter_lock:
|
||||
self.build_counter += 1
|
||||
build_counter = self.build_counter
|
||||
def launchJob(self, job):
|
||||
node = None
|
||||
build = FakeBuild(self, job, build_counter, node)
|
||||
build = FakeBuild(self, job, node)
|
||||
job.build = build
|
||||
self.running_builds.append(build)
|
||||
self.job_builds[job.unique] = build
|
||||
super(RecordingLaunchServer, self).launch(job)
|
||||
super(RecordingLaunchServer, self).launchJob(job)
|
||||
|
||||
def stopJob(self, job):
|
||||
self.log.debug("handle stop")
|
||||
parameters = json.loads(job.arguments)
|
||||
uuid = parameters['uuid']
|
||||
for build in self.running_builds:
|
||||
if build.unique == uuid:
|
||||
build.aborted = True
|
||||
build.release()
|
||||
super(RecordingLaunchServer, self).stopJob(job)
|
||||
|
||||
def runAnsible(self, jobdir, job):
|
||||
build = self.job_builds[job.unique]
|
||||
|
@ -742,9 +729,9 @@ class RecordingLaunchServer(zuul.launcher.server.LaunchServer):
|
|||
|
||||
self.lock.acquire()
|
||||
self.build_history.append(
|
||||
BuildHistory(name=build.name, number=build.number,
|
||||
result=result, changes=build.changes, node=build.node,
|
||||
uuid=build.unique, parameters=build.parameters,
|
||||
BuildHistory(name=build.name, result=result, changes=build.changes,
|
||||
node=build.node, uuid=build.unique,
|
||||
parameters=build.parameters,
|
||||
pipeline=build.parameters['ZUUL_PIPELINE'])
|
||||
)
|
||||
self.running_builds.remove(build)
|
||||
|
@ -1358,7 +1345,7 @@ class ZuulTestCase(BaseTestCase):
|
|||
return False
|
||||
if server_job.waiting:
|
||||
continue
|
||||
if build.number is None:
|
||||
if build.url is None:
|
||||
self.log.debug("%s has not reported start" % build)
|
||||
return False
|
||||
worker_build = self.launch_server.job_builds.get(server_job.unique)
|
||||
|
|
|
@ -390,6 +390,7 @@ class LaunchClient(object):
|
|||
gearman_job = gear.Job('launcher:launch', json.dumps(params),
|
||||
unique=uuid)
|
||||
build.__gearman_job = gearman_job
|
||||
build.__gearman_manager = None
|
||||
self.builds[uuid] = build
|
||||
|
||||
if pipeline.precedence == zuul.model.PRECEDENCE_NORMAL:
|
||||
|
@ -428,7 +429,8 @@ class LaunchClient(object):
|
|||
self.log.debug("Build %s has no associated gearman job" % build)
|
||||
return
|
||||
|
||||
if build.number is not None:
|
||||
# TODOv3(jeblair): make a nicer way of recording build start.
|
||||
if build.url is not None:
|
||||
self.log.debug("Build %s has already started" % build)
|
||||
self.cancelRunningBuild(build)
|
||||
self.log.debug("Canceled running build %s" % build)
|
||||
|
@ -444,7 +446,7 @@ class LaunchClient(object):
|
|||
time.sleep(1)
|
||||
|
||||
self.log.debug("Still unable to find build %s to cancel" % build)
|
||||
if build.number:
|
||||
if build.url:
|
||||
self.log.debug("Build %s has just started" % build)
|
||||
self.log.debug("Canceled running build %s" % build)
|
||||
self.cancelRunningBuild(build)
|
||||
|
@ -473,7 +475,7 @@ class LaunchClient(object):
|
|||
# internal dict after it's added to the report queue.
|
||||
del self.builds[job.unique]
|
||||
else:
|
||||
if not job.name.startswith("stop:"):
|
||||
if not job.name.startswith("launcher:stop:"):
|
||||
self.log.error("Unable to find build %s" % job.unique)
|
||||
|
||||
def onWorkStatus(self, job):
|
||||
|
@ -481,14 +483,14 @@ class LaunchClient(object):
|
|||
self.log.debug("Build %s update %s" % (job, data))
|
||||
build = self.builds.get(job.unique)
|
||||
if build:
|
||||
started = (build.url is not None)
|
||||
# Allow URL to be updated
|
||||
build.url = data.get('url') or build.url
|
||||
build.url = data.get('url', build.url)
|
||||
# Update information about worker
|
||||
build.worker.updateFromData(data)
|
||||
|
||||
if build.number is None:
|
||||
if not started:
|
||||
self.log.info("Build %s started" % job)
|
||||
build.number = data.get('number')
|
||||
build.__gearman_manager = data.get('manager')
|
||||
self.sched.onBuildStarted(build)
|
||||
else:
|
||||
|
@ -518,10 +520,12 @@ class LaunchClient(object):
|
|||
return False
|
||||
|
||||
def cancelRunningBuild(self, build):
|
||||
if not build.__gearman_manager:
|
||||
self.log.error("Build %s has no manager while canceling" %
|
||||
(build,))
|
||||
stop_uuid = str(uuid4().hex)
|
||||
data = dict(name=build.job.name,
|
||||
number=build.number)
|
||||
stop_job = gear.Job("stop:%s" % build.__gearman_manager,
|
||||
data = dict(uuid=build.__gearman_job.unique)
|
||||
stop_job = gear.Job("launcher:stop:%s" % build.__gearman_manager,
|
||||
json.dumps(data), unique=stop_uuid)
|
||||
self.meta_jobs[stop_uuid] = stop_job
|
||||
self.log.debug("Submitting stop job: %s", stop_job)
|
||||
|
|
|
@ -17,6 +17,7 @@ import json
|
|||
import logging
|
||||
import os
|
||||
import shutil
|
||||
import socket
|
||||
import subprocess
|
||||
import tempfile
|
||||
import threading
|
||||
|
@ -113,6 +114,9 @@ class LaunchServer(object):
|
|||
|
||||
def __init__(self, config, connections={}):
|
||||
self.config = config
|
||||
# TODOv3(mordred): make the launcher name more unique --
|
||||
# perhaps hostname+pid.
|
||||
self.hostname = socket.gethostname()
|
||||
self.zuul_url = config.get('merger', 'zuul_url')
|
||||
|
||||
if self.config.has_option('merger', 'git_dir'):
|
||||
|
@ -161,7 +165,7 @@ class LaunchServer(object):
|
|||
|
||||
def register(self):
|
||||
self.worker.registerFunction("launcher:launch")
|
||||
# TODOv3: abort
|
||||
self.worker.registerFunction("launcher:stop:%s" % self.hostname)
|
||||
self.worker.registerFunction("merger:merge")
|
||||
self.worker.registerFunction("merger:cat")
|
||||
|
||||
|
@ -204,7 +208,10 @@ class LaunchServer(object):
|
|||
try:
|
||||
if job.name == 'launcher:launch':
|
||||
self.log.debug("Got launch job: %s" % job.unique)
|
||||
self.launch(job)
|
||||
self.launchJob(job)
|
||||
elif job.name.startswith('launcher:stop'):
|
||||
self.log.debug("Got stop job: %s" % job.unique)
|
||||
self.stopJob(job)
|
||||
elif job.name == 'merger:cat':
|
||||
self.log.debug("Got cat job: %s" % job.unique)
|
||||
self.cat(job)
|
||||
|
@ -220,7 +227,7 @@ class LaunchServer(object):
|
|||
except Exception:
|
||||
self.log.exception("Exception while getting job")
|
||||
|
||||
def launch(self, job):
|
||||
def launchJob(self, job):
|
||||
thread = threading.Thread(target=self._launch, args=(job,))
|
||||
thread.start()
|
||||
|
||||
|
@ -243,18 +250,35 @@ class LaunchServer(object):
|
|||
|
||||
# TODOv3: Ansible the ansible thing here.
|
||||
self.prepareAnsibleFiles(jobdir, args)
|
||||
result = self.runAnsible(jobdir, job)
|
||||
|
||||
data = {
|
||||
'manager': self.hostname,
|
||||
'url': 'https://server/job',
|
||||
'number': 1
|
||||
}
|
||||
|
||||
# TODOv3:
|
||||
# 'name': self.name,
|
||||
# 'manager': self.launch_server.hostname,
|
||||
# 'worker_name': 'My Worker',
|
||||
# 'worker_hostname': 'localhost',
|
||||
# 'worker_ips': ['127.0.0.1', '192.168.1.1'],
|
||||
# 'worker_fqdn': 'zuul.example.org',
|
||||
# 'worker_program': 'FakeBuilder',
|
||||
# 'worker_version': 'v1.1',
|
||||
# 'worker_extra': {'something': 'else'}
|
||||
|
||||
job.sendWorkData(json.dumps(data))
|
||||
job.sendWorkStatus(0, 100)
|
||||
|
||||
result = self.runAnsible(jobdir, job)
|
||||
|
||||
result = dict(result=result)
|
||||
job.sendWorkComplete(json.dumps(result))
|
||||
|
||||
def stopJob(self, job):
|
||||
# TODOv3: implement.
|
||||
job.sendWorkComplete()
|
||||
|
||||
def getHostList(self, args):
|
||||
# TODOv3: This should get the appropriate nodes from nodepool,
|
||||
# or in the unit tests, be overriden to return localhost.
|
||||
|
|
|
@ -487,7 +487,6 @@ class Build(object):
|
|||
self.job = job
|
||||
self.uuid = uuid
|
||||
self.url = None
|
||||
self.number = None
|
||||
self.result = None
|
||||
self.build_set = None
|
||||
self.launch_time = time.time()
|
||||
|
@ -975,7 +974,6 @@ class QueueItem(object):
|
|||
'pipeline': build.pipeline.name if build else None,
|
||||
'canceled': build.canceled if build else None,
|
||||
'retry': build.retry if build else None,
|
||||
'number': build.number if build else None,
|
||||
'node_labels': build.node_labels if build else [],
|
||||
'node_name': build.node_name if build else None,
|
||||
'worker': worker,
|
||||
|
|
Loading…
Reference in New Issue