executor: add merge_jobs options to disable gearman merge jobs

This change adds a zuul.conf option to disable the global merge
jobs from running on an executor node.

Change-Id: Icd6374a6c97a404662b39de9df54f4b7c5ab36aa
This commit is contained in:
Tristan Cacqueray 2019-08-16 15:26:03 +00:00
parent d68f63c641
commit c96f0af4f5
3 changed files with 38 additions and 18 deletions

View File

@ -736,6 +736,15 @@ The following sections of ``zuul.conf`` are used by the executor:
node-attributes:
executor-zone: vpn
.. attr:: merge_jobs
:default: True
To disable global merge job, set it to false. This is useful for zoned
executors that are running on slow network where you don't want them to
perform merge operations for any events. The executor will still perform
the merge operations required for the build they are executing.
.. attr:: merger
.. attr:: git_user_email

View File

@ -0,0 +1,4 @@
---
features:
- |
A new config option enable executor to not performed global merge job.

View File

@ -2416,20 +2416,23 @@ class ExecutorServer(object):
self.ansible_manager.install()
self.ansible_manager.copyAnsibleFiles()
self.merger_jobs = {
'merger:merge': self.merge,
'merger:cat': self.cat,
'merger:refstate': self.refstate,
'merger:fileschanges': self.fileschanges,
}
self.merger_gearworker = ZuulGearWorker(
'Zuul Executor Merger',
'zuul.ExecutorServer',
'merger',
self.config,
self.merger_jobs,
worker_class=ExecutorMergeWorker,
worker_args=[self])
if get_default(self.config, 'executor', 'merge_jobs', True):
self.merger_jobs = {
'merger:merge': self.merge,
'merger:cat': self.cat,
'merger:refstate': self.refstate,
'merger:fileschanges': self.fileschanges,
}
self.merger_gearworker = ZuulGearWorker(
'Zuul Executor Merger',
'zuul.ExecutorServer',
'merger',
self.config,
self.merger_jobs,
worker_class=ExecutorMergeWorker,
worker_args=[self])
else:
self.merger_gearworker = None
function_name = 'executor:execute'
if self.zone:
@ -2460,7 +2463,8 @@ class ExecutorServer(object):
self._running = True
self._command_running = True
self.merger_gearworker.start()
if self.merger_gearworker is not None:
self.merger_gearworker.start()
self.executor_gearworker.start()
self.log.debug("Starting command processor")
@ -2512,7 +2516,8 @@ class ExecutorServer(object):
self.governor_stop_event.set()
self.governor_thread.join()
# Stop accepting new jobs
self.merger_gearworker.gearman.setFunctions([])
if self.merger_gearworker is not None:
self.merger_gearworker.gearman.setFunctions([])
self.executor_gearworker.gearman.setFunctions([])
# Tell the executor worker to abort any jobs it just accepted,
# and grab the list of currently running job workers.
@ -2542,7 +2547,8 @@ class ExecutorServer(object):
# All job results should have been sent by now, shutdown the
# gearman workers.
self.merger_gearworker.stop()
if self.merger_gearworker is not None:
self.merger_gearworker.stop()
self.executor_gearworker.stop()
if self.statsd:
@ -2559,7 +2565,8 @@ class ExecutorServer(object):
self.governor_thread.join()
for update_thread in self.update_threads:
update_thread.join()
self.merger_gearworker.join()
if self.merger_gearworker is not None:
self.merger_gearworker.join()
self.executor_gearworker.join()
def pause(self):