Browse Source

Merge "executor: add merge_jobs options to disable gearman merge jobs"

changes/54/685354/11
Zuul 2 weeks ago
parent
commit
a6c945d883
3 changed files with 38 additions and 18 deletions
  1. +9
    -0
      doc/source/admin/components.rst
  2. +4
    -0
      releasenotes/notes/executor-merger-f853f2f717ea4640.yaml
  3. +25
    -18
      zuul/executor/server.py

+ 9
- 0
doc/source/admin/components.rst View File

@@ -775,6 +775,15 @@ The following sections of ``zuul.conf`` are used by the executor:
node-attributes:
executor-zone: vpn

.. attr:: merge_jobs
:default: True

To disable global merge job, set it to false. This is useful for zoned
executors that are running on slow network where you don't want them to
perform merge operations for any events. The executor will still perform
the merge operations required for the build they are executing.


.. attr:: merger

.. attr:: git_user_email

+ 4
- 0
releasenotes/notes/executor-merger-f853f2f717ea4640.yaml View File

@@ -0,0 +1,4 @@
---
features:
- |
A new config option enable executor to not performed global merge job.

+ 25
- 18
zuul/executor/server.py View File

@@ -2430,20 +2430,23 @@ class ExecutorServer(object):
self.ansible_manager.install()
self.ansible_manager.copyAnsibleFiles()

self.merger_jobs = {
'merger:merge': self.merge,
'merger:cat': self.cat,
'merger:refstate': self.refstate,
'merger:fileschanges': self.fileschanges,
}
self.merger_gearworker = ZuulGearWorker(
'Zuul Executor Merger',
'zuul.ExecutorServer',
'merger',
self.config,
self.merger_jobs,
worker_class=ExecutorMergeWorker,
worker_args=[self])
if get_default(self.config, 'executor', 'merge_jobs', True):
self.merger_jobs = {
'merger:merge': self.merge,
'merger:cat': self.cat,
'merger:refstate': self.refstate,
'merger:fileschanges': self.fileschanges,
}
self.merger_gearworker = ZuulGearWorker(
'Zuul Executor Merger',
'zuul.ExecutorServer',
'merger',
self.config,
self.merger_jobs,
worker_class=ExecutorMergeWorker,
worker_args=[self])
else:
self.merger_gearworker = None

function_name = 'executor:execute'
if self.zone:
@@ -2474,7 +2477,8 @@ class ExecutorServer(object):
self._running = True
self._command_running = True

self.merger_gearworker.start()
if self.merger_gearworker is not None:
self.merger_gearworker.start()
self.executor_gearworker.start()

self.log.debug("Starting command processor")
@@ -2526,7 +2530,8 @@ class ExecutorServer(object):
self.governor_stop_event.set()
self.governor_thread.join()
# Stop accepting new jobs
self.merger_gearworker.gearman.setFunctions([])
if self.merger_gearworker is not None:
self.merger_gearworker.gearman.setFunctions([])
self.executor_gearworker.gearman.setFunctions([])
# Tell the executor worker to abort any jobs it just accepted,
# and grab the list of currently running job workers.
@@ -2556,7 +2561,8 @@ class ExecutorServer(object):

# All job results should have been sent by now, shutdown the
# gearman workers.
self.merger_gearworker.stop()
if self.merger_gearworker is not None:
self.merger_gearworker.stop()
self.executor_gearworker.stop()

if self.statsd:
@@ -2573,7 +2579,8 @@ class ExecutorServer(object):
self.governor_thread.join()
for update_thread in self.update_threads:
update_thread.join()
self.merger_gearworker.join()
if self.merger_gearworker is not None:
self.merger_gearworker.join()
self.executor_gearworker.join()

def pause(self):

Loading…
Cancel
Save