Merge "Support pausing merge jobs"

This commit is contained in:
Zuul 2020-03-06 16:48:27 +00:00 committed by Gerrit Code Review
commit 8c68fa5e1b
6 changed files with 45 additions and 8 deletions

View File

@ -479,8 +479,13 @@ The following section of ``zuul.conf`` is used by the merger:
Operation
~~~~~~~~~
To start the merger, run ``zuul-merger``. To stop it, kill the
PID which was saved in the pidfile specified in the configuration.
To start the merger, run ``zuul-merger``.
In order to stop the merger and under normal circumstances it is
best to pause and wait for all currently running tasks to finish
before stopping it. To do so run ``zuul-merger pause``.
To stop the merger immediately, run ``zuul-merger stop``.
.. _executor:

View File

@ -0,0 +1,5 @@
---
features:
- |
The zuul-executor now also pauses all merger related tasks when it's paused.
Further the zuul-merger can also be paused by running ``zuul-merger pause``.

View File

@ -7399,6 +7399,9 @@ class TestSemaphore(ZuulTestCase):
# Pause the executor so it doesn't take any jobs.
self.executor_server.pause()
# Start merger as the paused executor won't take merge jobs.
self._startMerger()
# Pause nodepool so we can wait on the node requests and fulfill them
# in a controlled manner.
self.fake_nodepool.paused = True

View File

@ -2652,7 +2652,7 @@ class ExecutorServer(object):
}
self.merger_gearworker = ZuulGearWorker(
'Zuul Executor Merger',
'zuul.ExecutorServer',
'zuul.ExecutorServer.MergeWorker',
'merger',
self.config,
self.merger_jobs,
@ -2673,7 +2673,7 @@ class ExecutorServer(object):
self.executor_gearworker = ZuulGearWorker(
'Zuul Executor Server',
'zuul.ExecutorServer',
'zuul.ExecutorServer.ExecuteWorker',
'executor',
self.config,
self.executor_jobs,
@ -2812,10 +2812,16 @@ class ExecutorServer(object):
self.executor_gearworker.join()
def pause(self):
self.log.debug('Pausing')
self.pause_sensor.pause = True
if self.merger_gearworker is not None:
self.merger_gearworker.unregister()
def unpause(self):
self.log.debug('Resuming')
self.pause_sensor.pause = False
if self.merger_gearworker is not None:
self.merger_gearworker.register()
def graceful(self):
# TODOv3: implement

View File

@ -53,11 +53,18 @@ class ZuulGearWorker:
tcp_keepintvl=30, tcp_keepcnt=5)
self.log.debug('Waiting for server')
self.gearman.waitForServer()
self.register()
self.thread.start()
self.log.debug('Registering')
def register(self):
self.log.debug('Registering jobs')
for job in self.jobs:
self.gearman.registerFunction(job)
self.thread.start()
def unregister(self):
self.log.debug('Unregistering jobs')
for job in self.jobs:
self.gearman.unRegisterFunction(job)
def stop(self):
self._running = False

View File

@ -22,7 +22,7 @@ from zuul.lib.gearworker import ZuulGearWorker
from zuul.merger import merger
COMMANDS = ['stop']
COMMANDS = ['stop', 'pause', 'unpause']
class MergeServer(object):
@ -44,7 +44,10 @@ class MergeServer(object):
merge_root, connections, merge_email, merge_name, speed_limit,
speed_time, git_timeout=git_timeout)
self.command_map = dict(
stop=self.stop)
stop=self.stop,
pause=self.pause,
unpause=self.unpause,
)
command_socket = get_default(
self.config, 'merger', 'command_socket',
'/var/lib/zuul/merger.socket')
@ -85,6 +88,14 @@ class MergeServer(object):
def join(self):
self.gearworker.join()
def pause(self):
self.log.debug('Pausing')
self.gearworker.unregister()
def unpause(self):
self.log.debug('Resuming')
self.gearworker.register()
def runCommand(self):
while self._command_running:
try: