Support job pause

There are use cases where it would be very useful to have resources
that persist for the lifetime of the buildset. E.g.:

* Spinning up a docker registry for child jobs that automatically gets
  cleaned up after a test run.

* Efficiently caching intermediate build results that are needed in
  child jobs without relying on and overloading a central system for
  this.

To provide this functionality, a job can now pause itself after the
run phase and let its children run. After all children have finished,
the job is resumed. A job triggers this with the zuul_return module,
which can also forward arbitrary data to the children as Ansible
variables:

 - name: Pause job
   zuul_return:
     data:
       zuul:
         pause: true
       registry_ip: "{{ hostvars[groups.all[0]].ansible_host }}"
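
A child job then receives the forwarded data as ordinary Ansible
variables. As a purely illustrative sketch (the task, port and image
name below are made up; only registry_ip comes from the example
above), a child playbook could use it like this:

 - name: Pull from the registry provided by the paused parent
   command: "docker pull {{ registry_ip }}:5000/myimage"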

Change-Id: I1a078bf22b9d3f9a33bd1517e73234749932e1bf
Tobias Henkel 2018-07-23 16:32:11 +02:00
parent 4cb9932a4f
commit b2451becaf
22 changed files with 420 additions and 6 deletions


@ -708,6 +708,26 @@ Not all reporters currently support line comments (or all of the
features of line comments); in these cases, reporters will simply
ignore this data.
Pausing the job
~~~~~~~~~~~~~~~
A job can be paused after the run phase. In this case the child jobs can start
while the parent job stays paused until all child jobs have finished. This can
be useful, for example, to start a docker registry in a parent job that is then
used by the child jobs. To indicate that the job should be paused, use
*zuul_return* to set the **zuul.pause** value. At the same time you can still
supply arbitrary data to the child jobs. For example:
.. code-block:: yaml

   tasks:
     - zuul_return:
         data:
           zuul:
             pause: true
           registry_ip_address: "{{ hostvars[groups.all[0]].ansible_host }}"
.. _build_status:
Build Status
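
The following sketch (not part of this change; the docker commands and
image name are assumptions) shows how the documented docker registry use
case could look in practice. The run playbook of the parent job starts
the registry, forwards its address and pauses; the post-run playbook
only executes after all children have finished and the job has resumed,
so it can clean the registry up again:

# run playbook of the parent job (illustrative)
- hosts: all
  tasks:
    - name: Start a registry for the child jobs
      command: docker run -d -p 5000:5000 --name registry registry:2
    - name: Pause and pass the registry address to the children
      zuul_return:
        data:
          zuul:
            pause: true
          registry_ip: "{{ hostvars[groups.all[0]].ansible_host }}"

# post-run playbook of the parent job (illustrative), runs after resume
- hosts: all
  tasks:
    - name: Remove the registry again
      command: docker rm -f registry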


@ -0,0 +1,6 @@
---
features:
- |
A job can now pause itself using :ref:`return_values` and let its child jobs
run until they have finished. This can be used to provide a service, such as
a docker registry, to the child jobs.
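
As a sketch with hypothetical job names, wiring this up only requires the
dependent jobs to declare a dependency on the pausing job in the project
pipeline configuration; Zuul then starts them as soon as the parent pauses:

- project:
    check:
      jobs:
        - build-registry        # pauses itself after its run phase
        - test-a:
            dependencies:
              - build-registry  # runs while build-registry is paused
        - test-b:
            dependencies:
              - build-registry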


@ -0,0 +1,17 @@
- pipeline:
name: check
manager: independent
post-review: true
trigger:
gerrit:
- event: patchset-created
success:
gerrit:
Verified: 1
failure:
gerrit:
Verified: -1
- job:
name: base
parent: null


@ -0,0 +1 @@
test


@ -0,0 +1,5 @@
- hosts: all
tasks:
- file:
path: "{{zuul.executor.log_root}}/../../../compile1-pause.txt"
state: absent


@ -0,0 +1,15 @@
- hosts: all
tasks:
# Copy a file into the test root to indicate that this job is alive. The
# file will be removed in the post-run phase so a child job can check
# whether the parent is alive and paused.
- copy:
content: "Built project"
dest: "{{zuul.executor.log_root}}/../../../compile1-pause.txt"
- name: Pause and let child run
zuul_return:
data:
zuul:
pause: true
compile1: test


@ -0,0 +1,5 @@
- hosts: all
tasks:
- file:
path: "{{zuul.executor.log_root}}/../../../compile2-pause.txt"
state: absent


@ -0,0 +1,15 @@
- hosts: all
tasks:
# Copy a file into the test root to indicate that this job is alive. The
# file will be removed in the post-run phase so a child job can check
# whether the parent is alive and paused.
- copy:
content: "Built project"
dest: "{{zuul.executor.log_root}}/../../../compile2-pause.txt"
- name: Pause and let child run
zuul_return:
data:
zuul:
pause: true
compile2: test


@ -0,0 +1,23 @@
- hosts: all
tasks:
# We expect the pause indicator of compile1 to exist
- stat:
path: "{{zuul.executor.log_root}}/../../../compile1-pause.txt"
register: build_artifact
- assert:
that:
- build_artifact.stat.exists
# We expect the variables of compile1 to exist
- assert:
that:
- compile1 is defined
- compile1 == 'test'
# This job has no child jobs, so just pause to check that
# we don't break when there are no child jobs.
- name: Pause
zuul_return:
data:
zuul:
pause: true


@ -0,0 +1,23 @@
- hosts: all
tasks:
# We expect the pause indicator of compile1 and compile2 to exist
- stat:
path: "{{zuul.executor.log_root}}/../../../compile1-pause.txt"
register: build_artifact
- assert:
that:
- build_artifact.stat.exists
- stat:
path: "{{zuul.executor.log_root}}/../../../compile2-pause.txt"
register: build_artifact
- assert:
that:
- build_artifact.stat.exists
# We expect the variables of compile1 and compile2 to exist
- assert:
that:
- compile1 is defined
- compile1 == 'test'
- compile2 is defined
- compile2 == 'test'


@ -0,0 +1,5 @@
- hosts: all
tasks:
- name: Fail
fail:
msg: This shall fail


@ -0,0 +1,5 @@
- hosts: all
tasks:
- name: Good
debug:
msg: This shall not fail


@ -0,0 +1,49 @@
- job:
name: compile1
run: playbooks/compile1.yaml
post-run: playbooks/compile1-post.yaml
- job:
name: compile2
run: playbooks/compile2.yaml
post-run: playbooks/compile2-post.yaml
- job:
name: test1-after-compile1
run: playbooks/test-compile1.yaml
- job:
name: test2-after-compile1
run: playbooks/test-compile1.yaml
- job:
name: test-after-compile2
run: playbooks/test-compile2.yaml
- job:
name: test-good
run: playbooks/test-good.yaml
- job:
name: test-fail
run: playbooks/test-fail.yaml
- project:
check:
jobs:
- compile1
- compile2:
dependencies:
- compile1
- test1-after-compile1:
dependencies:
- compile1
- test2-after-compile1:
dependencies:
- compile1
- test-after-compile2:
dependencies:
- compile2
- test-good
- test-fail


@ -0,0 +1,7 @@
- tenant:
name: tenant-one
source:
gerrit:
config-projects:
- common-config
- org/project


@ -4131,3 +4131,42 @@ class TestNoLog(AnsibleZuulTestCase):
self.assertNotIn('my-very-secret-password-2', json_log)
self.assertNotIn('my-very-secret-password-1', text_log)
self.assertNotIn('my-very-secret-password-2', text_log)
class TestJobPause(AnsibleZuulTestCase):
tenant_config_file = 'config/job-pause/main.yaml'
def _get_file(self, build, path):
p = os.path.join(build.jobdir.root, path)
with open(p) as f:
return f.read()
def test_job_pause(self):
self.wait_timeout = 120
# Output extra ansible info so we might see errors.
self.executor_server.verbose = True
self.executor_server.keep_jobdir = True
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
self.assertHistory([
dict(name='test-fail', result='FAILURE', changes='1,1'),
dict(name='test-good', result='SUCCESS', changes='1,1'),
dict(name='test1-after-compile1', result='SUCCESS', changes='1,1'),
dict(name='test2-after-compile1', result='SUCCESS', changes='1,1'),
dict(name='test-after-compile2', result='SUCCESS', changes='1,1'),
dict(name='compile2', result='SUCCESS', changes='1,1'),
dict(name='compile1', result='SUCCESS', changes='1,1'),
], ordered=False)
# The order of some of these tests is not deterministic so check that
# the last two are compile2, compile1 in this order.
history_compile1 = self.history[-1]
history_compile2 = self.history[-2]
self.assertEqual('compile1', history_compile1.name)
self.assertEqual('compile2', history_compile2.name)


@ -138,7 +138,13 @@ import LineTImage from '../images/line-t.png';
job_status: function (job) {
let result = job.result ? job.result.toLowerCase() : null
if (result === null) {
result = job.url ? 'in progress' : 'queued'
if (job.url === null) {
result = 'queued'
} else if (job.paused !== null && job.paused) {
result = 'paused'
} else {
result = 'in progress'
}
}
if (result === 'in progress') {


@ -417,6 +417,12 @@ class ExecutorClient(object):
# Update information about worker
build.worker.updateFromData(data)
if 'paused' in data and build.paused != data['paused']:
build.paused = data['paused']
if build.paused:
result_data = data.get('data', {})
self.sched.onBuildPaused(build, result_data)
if not started:
self.log.info("Build %s started" % job)
build.__gearman_worker = data.get('worker_name')
@ -465,6 +471,19 @@ class ExecutorClient(object):
timeout=300)
return True
def resumeBuild(self, build):
if not build.__gearman_worker:
self.log.error("Build %s has no manager while resuming" %
(build,))
resume_uuid = str(uuid4().hex)
data = dict(uuid=build.__gearman_job.unique)
stop_job = gear.TextJob("executor:resume:%s" % build.__gearman_worker,
json_dumps(data), unique=resume_uuid)
self.meta_jobs[resume_uuid] = stop_job
self.log.debug("Submitting resume job: %s", stop_job)
self.gearman.submitJob(stop_job, precedence=gear.PRECEDENCE_HIGH,
timeout=300)
def lookForLostBuilds(self):
self.log.debug("Looking for lost builds")
# Construct a list from the values iterator to protect from it changing


@ -36,6 +36,13 @@ class StartingBuildsSensor(SensorInterface):
def _getRunningBuilds(self):
return len(self.executor.job_workers)
def _getPausedBuilds(self):
paused_builds = 0
for worker in self.executor.job_workers.values():
if worker.paused:
paused_builds += 1
return paused_builds
def isOk(self):
starting_builds = self._getStartingBuilds()
max_starting_builds = max(
@ -49,5 +56,6 @@ class StartingBuildsSensor(SensorInterface):
return True, "{} <= {}".format(starting_builds, max_starting_builds)
def reportStats(self, statsd, base_key):
statsd.gauge(base_key + '.paused_builds', self._getPausedBuilds())
statsd.gauge(base_key + '.running_builds', self._getRunningBuilds())
statsd.gauge(base_key + '.starting_builds', self._getStartingBuilds())


@ -627,8 +627,10 @@ class AnsibleJob(object):
self.proc_lock = threading.Lock()
self.running = False
self.started = False # Whether playbooks have started running
self.paused = False
self.aborted = False
self.aborted_reason = None
self._resume_event = threading.Event()
self.thread = None
self.project_info = {}
self.private_key_file = get_default(self.executor_server.config,
@ -654,8 +656,47 @@ class AnsibleJob(object):
def stop(self, reason=None):
self.aborted = True
self.aborted_reason = reason
# if paused we need to resume the job so it can be stopped
self.resume()
self.abortRunningProc()
def pause(self):
args = json.loads(self.job.arguments)
self.log.info(
"Pausing job %s for ref %s (change %s)" % (
args['zuul']['job'],
args['zuul']['ref'],
args['zuul']['change_url']))
with open(self.jobdir.job_output_file, 'a') as job_output:
job_output.write(
"{now} |\n"
"{now} | Job paused\n".format(now=datetime.datetime.now()))
self.paused = True
data = {'paused': self.paused, 'data': self.getResultData()}
self.job.sendWorkData(json.dumps(data))
self._resume_event.wait()
def resume(self):
if not self.paused:
return
args = json.loads(self.job.arguments)
self.log.info(
"Resuming job %s for ref %s (change %s)" % (
args['zuul']['job'],
args['zuul']['ref'],
args['zuul']['change_url']))
with open(self.jobdir.job_output_file, 'a') as job_output:
job_output.write(
"{now} | Job resumed\n"
"{now} |\n".format(now=datetime.datetime.now()))
self.paused = False
self._resume_event.set()
def wait(self):
if self.thread:
self.thread.join()
@ -986,6 +1027,12 @@ class AnsibleJob(object):
# run it again.
return None
# check if we need to pause here
result_data = self.getResultData()
pause = result_data.get('zuul', {}).get('pause')
if pause:
self.pause()
post_timeout = args['post_timeout']
for index, playbook in enumerate(self.jobdir.post_playbooks):
# Post timeout operates a little differently to the main job
@ -1984,6 +2031,8 @@ class ExecutorServer(object):
def register(self):
self.register_work()
self.executor_worker.registerFunction("executor:resume:%s" %
self.hostname)
self.executor_worker.registerFunction("executor:stop:%s" %
self.hostname)
self.merger_worker.registerFunction("merger:merge")
@ -2182,6 +2231,9 @@ class ExecutorServer(object):
if job.name == 'executor:execute':
self.log.debug("Got execute job: %s" % job.unique)
self.executeJob(job)
elif job.name.startswith('executor:resume'):
self.log.debug("Got resume job: %s" % job.unique)
self.resumeJob(job)
elif job.name.startswith('executor:stop'):
self.log.debug("Got stop job: %s" % job.unique)
self.stopJob(job)
@ -2249,6 +2301,15 @@ class ExecutorServer(object):
unique = os.path.basename(jobdir)
self.stopJobByUnique(unique, reason=AnsibleJob.RESULT_DISK_FULL)
def resumeJob(self, job):
try:
args = json.loads(job.arguments)
self.log.debug("Resume job with arguments: %s" % (args,))
unique = args['uuid']
self.resumeJobByUnique(unique)
finally:
job.sendWorkComplete()
def stopJob(self, job):
try:
args = json.loads(job.arguments)
@ -2258,6 +2319,17 @@ class ExecutorServer(object):
finally:
job.sendWorkComplete()
def resumeJobByUnique(self, unique):
job_worker = self.job_workers.get(unique)
if not job_worker:
self.log.debug("Unable to find worker for job %s" % (unique,))
return
try:
job_worker.resume()
except Exception:
self.log.exception("Exception sending resume command "
"to worker:")
def stopJobByUnique(self, unique, reason=None):
job_worker = self.job_workers.get(unique)
if not job_worker:


@ -718,6 +718,46 @@ class PipelineManager(object):
self.log.debug("Build %s started" % build)
return True
def onBuildPaused(self, build):
item = build.build_set.item
self.log.debug("Build %s of %s paused", build, item.change)
child_jobs = item.job_graph.getDependentJobsRecursively(build.job.name)
if not child_jobs:
# if there are no child jobs we need to directly resume the build
self.log.debug("Resuming build %s of %s with no child jobs",
build, item.change)
self.sched.executor.resumeBuild(build)
item.setResult(build)
return True
def _resumeParents(self, build):
"""
Resumes all parent jobs where all children are finished.
"""
item = build.build_set.item
builds = item.current_build_set.builds
jobgraph = item.job_graph
parent_builds = [builds.get(x.name) for x in
jobgraph.getParentJobsRecursively(
build.job.name, soft=True)]
for parent_build in parent_builds:
if parent_build and parent_build.paused:
# check if all child jobs are finished
child_builds = [builds.get(x.name) for x in
jobgraph.getDependentJobsRecursively(
parent_build.job.name)]
all_completed = True
for child_build in child_builds:
if not child_build or not child_build.result:
all_completed = False
break
if all_completed:
self.sched.executor.resumeBuild(parent_build)
parent_build.paused = False
def onBuildCompleted(self, build):
item = build.build_set.item
@ -731,6 +771,7 @@ class PipelineManager(object):
if build.retry:
build.build_set.removeJobNodeSet(build.job.name)
self._resumeParents(build)
return True
def onMergeCompleted(self, event):


@ -1491,9 +1491,9 @@ class JobGraph(object):
all_dependent_jobs |= new_dependent_jobs
return [self.jobs[name] for name in all_dependent_jobs]
def getParentJobsRecursively(self, dependent_job):
def getParentJobsRecursively(self, dependent_job, soft=False):
return [self.jobs[name] for name in
self._getParentJobNamesRecursively(dependent_job)]
self._getParentJobNamesRecursively(dependent_job, soft)]
def _getParentJobNamesRecursively(self, dependent_job, soft=False):
all_parent_jobs = set()
@ -1534,6 +1534,7 @@ class Build(object):
self.end_time = None
self.estimated_time = None
self.canceled = False
self.paused = False
self.retry = False
self.parameters = {}
self.worker = Worker()
@ -1998,7 +1999,7 @@ class QueueItem(object):
for job in self.job_graph.getJobs():
build = self.current_build_set.getBuild(job.name)
if build:
if build.result == 'SUCCESS':
if build.result == 'SUCCESS' or build.paused:
successful_job_names.add(job.name)
else:
jobs_not_started.add(job)
@ -2056,7 +2057,7 @@ class QueueItem(object):
jobs_not_requested = set()
for job in self.job_graph.getJobs():
build = build_set.getBuild(job.name)
if build and build.result == 'SUCCESS':
if build and (build.result == 'SUCCESS' or build.paused):
successful_job_names.add(job.name)
else:
nodeset = build_set.getJobNodeSet(job.name)
@ -2108,7 +2109,7 @@ class QueueItem(object):
skipped += self.job_graph.getDependentJobsRecursively(
skip)
elif build.result != 'SUCCESS':
elif build.result != 'SUCCESS' and not build.paused:
skipped += self.job_graph.getDependentJobsRecursively(
build.job.name)
@ -2306,6 +2307,7 @@ class QueueItem(object):
'estimated_time': build.estimated_time if build else None,
'pipeline': build.pipeline.name if build else None,
'canceled': build.canceled if build else None,
'paused': build.paused if build else None,
'retry': build.retry if build else None,
'node_labels': build.node_labels if build else [],
'node_name': build.node_name if build else None,


@ -174,6 +174,16 @@ class BuildStartedEvent(ResultEvent):
self.build = build
class BuildPausedEvent(ResultEvent):
"""A build has been paused.
:arg Build build: The build which has been paused.
"""
def __init__(self, build):
self.build = build
class BuildCompletedEvent(ResultEvent):
"""A build has completed
@ -396,6 +406,12 @@ class Scheduler(threading.Thread):
self.result_event_queue.put(event)
self.wake_event.set()
def onBuildPaused(self, build, result_data):
build.result_data = result_data
event = BuildPausedEvent(build)
self.result_event_queue.put(event)
self.wake_event.set()
def onBuildCompleted(self, build, result, result_data):
build.end_time = time.time()
build.result_data = result_data
@ -1058,6 +1074,8 @@ class Scheduler(threading.Thread):
try:
if isinstance(event, BuildStartedEvent):
self._doBuildStartedEvent(event)
elif isinstance(event, BuildPausedEvent):
self._doBuildPausedEvent(event)
elif isinstance(event, BuildCompletedEvent):
self._doBuildCompletedEvent(event)
elif isinstance(event, MergeCompletedEvent):
@ -1087,6 +1105,19 @@ class Scheduler(threading.Thread):
self.log.exception("Exception estimating build time:")
pipeline.manager.onBuildStarted(event.build)
def _doBuildPausedEvent(self, event):
build = event.build
if build.build_set is not build.build_set.item.current_build_set:
self.log.warning("Build %s is not in the current build set" %
(build,))
return
pipeline = build.build_set.item.pipeline
if not pipeline:
self.log.warning("Build %s is not associated with a pipeline" %
(build,))
return
pipeline.manager.onBuildPaused(event.build)
def _getAutoholdRequestKey(self, build):
change = build.build_set.item.change