Move job running into own class
Help keep the state of each job the launcher is managing in its own class. This will make stopping, pre/post playbooks and handling failures easier. Change-Id: I8fe77025ca443adcc5c8ca61f3a6b3abde0ba690
This commit is contained in:
parent
d130f718b0
commit
50c2178243
|
@ -728,7 +728,8 @@ class RecordingLaunchServer(zuul.launcher.server.LaunchServer):
|
|||
args = json.loads(job.arguments)
|
||||
args['zuul']['_test'] = dict(test_root=self._test_root)
|
||||
job.arguments = json.dumps(args)
|
||||
super(RecordingLaunchServer, self).launchJob(job)
|
||||
self.job_workers[job.unique] = RecordingAnsibleJob(self, job)
|
||||
self.job_workers[job.unique].run()
|
||||
|
||||
def stopJob(self, job):
|
||||
self.log.debug("handle stop")
|
||||
|
@ -740,25 +741,27 @@ class RecordingLaunchServer(zuul.launcher.server.LaunchServer):
|
|||
build.release()
|
||||
super(RecordingLaunchServer, self).stopJob(job)
|
||||
|
||||
def runAnsible(self, jobdir, job):
|
||||
build = self.job_builds[job.unique]
|
||||
|
||||
class RecordingAnsibleJob(zuul.launcher.server.AnsibleJob):
|
||||
def runAnsible(self, jobdir):
|
||||
build = self.launcher_server.job_builds[self.job.unique]
|
||||
build.jobdir = jobdir
|
||||
|
||||
if self._run_ansible:
|
||||
result = super(RecordingLaunchServer, self).runAnsible(jobdir, job)
|
||||
if self.launcher_server._run_ansible:
|
||||
result = super(RecordingAnsibleJob, self).runAnsible(jobdir)
|
||||
else:
|
||||
result = build.run()
|
||||
|
||||
self.lock.acquire()
|
||||
self.build_history.append(
|
||||
self.launcher_server.lock.acquire()
|
||||
self.launcher_server.build_history.append(
|
||||
BuildHistory(name=build.name, result=result, changes=build.changes,
|
||||
node=build.node, uuid=build.unique,
|
||||
parameters=build.parameters,
|
||||
pipeline=build.parameters['ZUUL_PIPELINE'])
|
||||
)
|
||||
self.running_builds.remove(build)
|
||||
del self.job_builds[job.unique]
|
||||
self.lock.release()
|
||||
self.launcher_server.running_builds.remove(build)
|
||||
del self.launcher_server.job_builds[self.job.unique]
|
||||
self.launcher_server.lock.release()
|
||||
if build.run_error:
|
||||
result = None
|
||||
return result
|
||||
|
|
|
@ -182,10 +182,6 @@ class LaunchServer(object):
|
|||
self.merge_name = self.config.get('merger', 'git_user_name')
|
||||
else:
|
||||
self.merge_name = None
|
||||
if self.config.has_option('launcher', 'private_key_file'):
|
||||
self.private_key_file = config.get('launcher', 'private_key_file')
|
||||
else:
|
||||
self.private_key_file = '~/.ssh/id_rsa'
|
||||
|
||||
self.connections = connections
|
||||
self.merger = self._getMerger(self.merge_root)
|
||||
|
@ -208,6 +204,8 @@ class LaunchServer(object):
|
|||
for fn in os.listdir(library_path):
|
||||
shutil.copy(os.path.join(library_path, fn), self.library_dir)
|
||||
|
||||
self.job_workers = {}
|
||||
|
||||
def _getMerger(self, root):
|
||||
return zuul.merger.merger.Merger(root, self.connections,
|
||||
self.merge_email, self.merge_name)
|
||||
|
@ -337,24 +335,88 @@ class LaunchServer(object):
|
|||
self.log.exception("Exception while getting job")
|
||||
|
||||
def launchJob(self, job):
|
||||
thread = threading.Thread(target=self._launch, args=(job,))
|
||||
thread.start()
|
||||
self.job_workers[job.unique] = AnsibleJob(self, job)
|
||||
self.job_workers[job.unique].run()
|
||||
|
||||
def _launch(self, job):
|
||||
self.log.debug("Job %s: beginning" % (job.unique,))
|
||||
def finishJob(self, unique):
|
||||
del(self.job_workers[unique])
|
||||
|
||||
def stopJob(self, job):
|
||||
# TODOv3: implement.
|
||||
job.sendWorkComplete()
|
||||
|
||||
def cat(self, job):
|
||||
args = json.loads(job.arguments)
|
||||
task = self.update(args['project'], args['url'])
|
||||
task.wait()
|
||||
files = self.merger.getFiles(args['project'], args['url'],
|
||||
args['branch'], args['files'])
|
||||
result = dict(updated=True,
|
||||
files=files,
|
||||
zuul_url=self.zuul_url)
|
||||
job.sendWorkComplete(json.dumps(result))
|
||||
|
||||
def merge(self, job):
|
||||
args = json.loads(job.arguments)
|
||||
ret = self.merger.mergeChanges(args['items'], args.get('files'))
|
||||
result = dict(merged=(ret is not None),
|
||||
zuul_url=self.zuul_url)
|
||||
if args.get('files'):
|
||||
result['commit'], result['files'] = ret
|
||||
else:
|
||||
result['commit'] = ret
|
||||
job.sendWorkComplete(json.dumps(result))
|
||||
|
||||
|
||||
class AnsibleJob(object):
|
||||
log = logging.getLogger("zuul.AnsibleJob")
|
||||
|
||||
def __init__(self, launcher_server, job):
|
||||
self.launcher_server = launcher_server
|
||||
self.job = job
|
||||
self.main_proc = None
|
||||
self.running = False
|
||||
|
||||
if self.launcher_server.config.has_option(
|
||||
'launcher', 'private_key_file'):
|
||||
self.private_key_file = self.launcher_server.config.get(
|
||||
'launcher', 'private_key_file')
|
||||
else:
|
||||
self.private_key_file = '~/.ssh/id_rsa'
|
||||
|
||||
def run(self):
|
||||
self.running = True
|
||||
self.thread = threading.Thread(target=self.launch)
|
||||
self.thread.start()
|
||||
|
||||
def stop(self):
|
||||
self.abortRunningProc(self.main_proc)
|
||||
self.thread.join()
|
||||
|
||||
def launch(self):
|
||||
try:
|
||||
self._launch()
|
||||
finally:
|
||||
self.running = False
|
||||
self.launcher_server.finishJob(self.job.unique)
|
||||
|
||||
def _launch(self):
|
||||
self.log.debug("Job %s: beginning" % (self.job.unique,))
|
||||
with JobDir() as jobdir:
|
||||
self.log.debug("Job %s: job root at %s" %
|
||||
(job.unique, jobdir.root))
|
||||
args = json.loads(job.arguments)
|
||||
(self.job.unique, jobdir.root))
|
||||
args = json.loads(self.job.arguments)
|
||||
tasks = []
|
||||
for project in args['projects']:
|
||||
self.log.debug("Job %s: updating project %s" %
|
||||
(job.unique, project['name']))
|
||||
tasks.append(self.update(project['name'], project['url']))
|
||||
(self.job.unique, project['name']))
|
||||
tasks.append(self.launcher_server.update(
|
||||
project['name'], project['url']))
|
||||
for task in tasks:
|
||||
task.wait()
|
||||
self.log.debug("Job %s: git updates complete" % (job.unique,))
|
||||
merger = self._getMerger(jobdir.git_root)
|
||||
|
||||
self.log.debug("Job %s: git updates complete" % (self.job.unique,))
|
||||
merger = self.launcher_server._getMerger(jobdir.git_root)
|
||||
merge_items = [i for i in args['items'] if i.get('refspec')]
|
||||
if merge_items:
|
||||
commit = merger.mergeChanges(merge_items) # noqa
|
||||
|
@ -368,7 +430,7 @@ class LaunchServer(object):
|
|||
self.prepareAnsibleFiles(jobdir, args)
|
||||
|
||||
data = {
|
||||
'manager': self.hostname,
|
||||
'manager': self.launcher_server.hostname,
|
||||
'url': 'https://server/job/{}/0/'.format(args['job']),
|
||||
'worker_name': 'My Worker',
|
||||
}
|
||||
|
@ -384,19 +446,15 @@ class LaunchServer(object):
|
|||
# 'worker_version': 'v1.1',
|
||||
# 'worker_extra': {'something': 'else'}
|
||||
|
||||
job.sendWorkData(json.dumps(data))
|
||||
job.sendWorkStatus(0, 100)
|
||||
self.job.sendWorkData(json.dumps(data))
|
||||
self.job.sendWorkStatus(0, 100)
|
||||
|
||||
result = self.runAnsible(jobdir, job)
|
||||
result = self.runAnsible(jobdir)
|
||||
if result is None:
|
||||
job.sendWorkFail()
|
||||
return
|
||||
result = dict(result=result)
|
||||
job.sendWorkComplete(json.dumps(result))
|
||||
|
||||
def stopJob(self, job):
|
||||
# TODOv3: implement.
|
||||
job.sendWorkComplete()
|
||||
self.job.sendWorkFail()
|
||||
else:
|
||||
result = dict(result=result)
|
||||
self.job.sendWorkComplete(json.dumps(result))
|
||||
|
||||
def getHostList(self, args):
|
||||
# TODOv3: the localhost addition is temporary so we have
|
||||
|
@ -419,7 +477,8 @@ class LaunchServer(object):
|
|||
# Check out the playbook repo if needed and return the path to
|
||||
# the playbook that should be run.
|
||||
playbook = args['playbook']
|
||||
source = self.connections.getSource(playbook['connection'])
|
||||
source = self.launcher_server.connections.getSource(
|
||||
playbook['connection'])
|
||||
project = source.getProject(playbook['project'])
|
||||
# TODO(jeblair): construct the url in the merger itself
|
||||
url = source.getGitUrl(project)
|
||||
|
@ -439,7 +498,7 @@ class LaunchServer(object):
|
|||
# the stack of changes we are testing, so check out the branch
|
||||
# tip into a dedicated space.
|
||||
|
||||
merger = self._getMerger(jobdir.playbook_root)
|
||||
merger = self.launcher_server._getMerger(jobdir.playbook_root)
|
||||
merger.checkoutBranch(project.name, url, playbook['branch'])
|
||||
|
||||
path = os.path.join(jobdir.playbook_root,
|
||||
|
@ -468,7 +527,8 @@ class LaunchServer(object):
|
|||
config.write('retry_files_enabled = False\n')
|
||||
config.write('log_path = %s\n' % jobdir.ansible_log)
|
||||
config.write('gathering = explicit\n')
|
||||
config.write('library = %s\n' % self.library_dir)
|
||||
config.write('library = %s\n'
|
||||
% self.launcher_server.library_dir)
|
||||
# bump the timeout because busy nodes may take more than
|
||||
# 10s to respond
|
||||
config.write('timeout = 30\n')
|
||||
|
@ -504,8 +564,7 @@ class LaunchServer(object):
|
|||
"ansible process:")
|
||||
return aborted
|
||||
|
||||
def runAnsible(self, jobdir, job):
|
||||
# Job is included here for the benefit of the test framework.
|
||||
def runAnsible(self, jobdir):
|
||||
env_copy = os.environ.copy()
|
||||
env_copy['LOGNAME'] = 'zuul'
|
||||
|
||||
|
@ -518,7 +577,7 @@ class LaunchServer(object):
|
|||
'-e@%s' % jobdir.vars, verbose]
|
||||
self.log.debug("Ansible command: %s" % (cmd,))
|
||||
# TODOv3: verbose
|
||||
proc = subprocess.Popen(
|
||||
self.main_proc = subprocess.Popen(
|
||||
cmd,
|
||||
cwd=jobdir.ansible_root,
|
||||
stdout=subprocess.PIPE,
|
||||
|
@ -532,14 +591,13 @@ class LaunchServer(object):
|
|||
timeout = 60
|
||||
watchdog = Watchdog(timeout + ANSIBLE_WATCHDOG_GRACE,
|
||||
self._ansibleTimeout,
|
||||
(proc,
|
||||
"Ansible timeout exceeded"))
|
||||
(self.main_proc, "Ansible timeout exceeded"))
|
||||
watchdog.start()
|
||||
try:
|
||||
for line in iter(proc.stdout.readline, b''):
|
||||
for line in iter(self.main_proc.stdout.readline, b''):
|
||||
line = line[:1024].rstrip()
|
||||
self.log.debug("Ansible output: %s" % (line,))
|
||||
ret = proc.wait()
|
||||
ret = self.main_proc.wait()
|
||||
finally:
|
||||
watchdog.stop()
|
||||
self.log.debug("Ansible exit code: %s" % (ret,))
|
||||
|
@ -557,25 +615,3 @@ class LaunchServer(object):
|
|||
if ret == 0:
|
||||
return 'SUCCESS'
|
||||
return 'FAILURE'
|
||||
|
||||
def cat(self, job):
|
||||
args = json.loads(job.arguments)
|
||||
task = self.update(args['project'], args['url'])
|
||||
task.wait()
|
||||
files = self.merger.getFiles(args['project'], args['url'],
|
||||
args['branch'], args['files'])
|
||||
result = dict(updated=True,
|
||||
files=files,
|
||||
zuul_url=self.zuul_url)
|
||||
job.sendWorkComplete(json.dumps(result))
|
||||
|
||||
def merge(self, job):
|
||||
args = json.loads(job.arguments)
|
||||
ret = self.merger.mergeChanges(args['items'], args.get('files'))
|
||||
result = dict(merged=(ret is not None),
|
||||
zuul_url=self.zuul_url)
|
||||
if args.get('files'):
|
||||
result['commit'], result['files'] = ret
|
||||
else:
|
||||
result['commit'] = ret
|
||||
job.sendWorkComplete(json.dumps(result))
|
||||
|
|
Loading…
Reference in New Issue