Mutex repo updates and merge ops in executor
The executor maintains a set of all of the repos known to Zuul. This is primarily for the purpose of having a local cache from which to clone the repos which are used in jobs. To that end, before each job starts, it submits a request to a queue to update all the repos in the job. This makes sure they are up to date before being cloned for the job. Since we have that set of repos handy, we can also use them to perform merge operations. In other words, the executor can also act as a merger. This can be useful under heavy load, or in the case of a very simple Zuul installation which has no other merger. However, merge and update operations must not run at the same time, as simultaneous access to the git repo may cause errors. To that end, set up a mutex around merge jobs and update tasks. Since the primary purpose of the repos is to perform update tasks, create a special gearman worker for the merge tasks so that we can de-prioritize them in the executor. In this worker, we delay the response to a NOOP packet until the update queue is empty. That means that if the gearman server notifies us that a merge job is ready, we won't grab it unless our merger is otherwise idle (we can still race here and get an update task between NOOP and GRAB_JOB, but there's little we can do about that, and the worst case is that we briefly delay a merge job). Since the executor jobs are now in a different worker, their NOOP/GRAB_JOB cycle is unimpeded, even if the merge worker is waiting. Change-Id: Icf3663b1a2ce5309e496b1106d5adee6579e37c7
This commit is contained in:
@@ -2301,7 +2301,8 @@ class ZuulTestCase(BaseTestCase):
|
||||
# It hasn't been reported yet.
|
||||
return False
|
||||
# Make sure that none of the worker connections are in GRAB_WAIT
|
||||
for connection in self.executor_server.worker.active_connections:
|
||||
worker = self.executor_server.executor_worker
|
||||
for connection in worker.active_connections:
|
||||
if connection.state == 'GRAB_WAIT':
|
||||
return False
|
||||
return True
|
||||
|
||||
@@ -311,6 +311,20 @@ def _copy_ansible_files(python_module, target_dir):
|
||||
shutil.copy(os.path.join(library_path, fn), target_dir)
|
||||
|
||||
|
||||
class ExecutorMergeWorker(gear.TextWorker):
    """Gearman text worker used by the executor for merge jobs.

    The worker keeps a reference to the owning executor server so that,
    before answering a NOOP packet, it can consult that server's update
    queue and take its merger lock.
    """

    def __init__(self, executor_server, *args, **kw):
        """Store the executor server and initialise the base worker.

        :param executor_server: object exposing ``update_queue`` and
            ``merger_lock``; both are read in :meth:`handleNoop`.

        All remaining positional and keyword arguments are forwarded
        unchanged to ``gear.TextWorker``.
        """
        self.zuul_executor_server = executor_server
        super(ExecutorMergeWorker, self).__init__(*args, **kw)

    def handleNoop(self, packet):
        """Answer a NOOP packet only once no repo updates are queued.

        :param packet: the NOOP packet, passed through to the base class.
        """
        server = self.zuul_executor_server

        # Hold off on responding for as long as update tasks are pending,
        # re-checking the queue size once per second.
        while server.update_queue.qsize():
            time.sleep(1)

        # The base-class handling runs with the server's merger lock held.
        with server.merger_lock:
            super(ExecutorMergeWorker, self).handleNoop(packet)
|
||||
|
||||
|
||||
class ExecutorServer(object):
|
||||
log = logging.getLogger("zuul.ExecutorServer")
|
||||
|
||||
@@ -323,6 +337,7 @@ class ExecutorServer(object):
|
||||
# perhaps hostname+pid.
|
||||
self.hostname = socket.gethostname()
|
||||
self.zuul_url = config.get('merger', 'zuul_url')
|
||||
self.merger_lock = threading.Lock()
|
||||
self.command_map = dict(
|
||||
stop=self.stop,
|
||||
pause=self.pause,
|
||||
@@ -406,10 +421,13 @@ class ExecutorServer(object):
|
||||
port = self.config.get('gearman', 'port')
|
||||
else:
|
||||
port = 4730
|
||||
self.worker = gear.TextWorker('Zuul Executor Server')
|
||||
self.worker.addServer(server, port)
|
||||
self.merger_worker = ExecutorMergeWorker(self, 'Zuul Executor Merger')
|
||||
self.merger_worker.addServer(server, port)
|
||||
self.executor_worker = gear.TextWorker('Zuul Executor Server')
|
||||
self.executor_worker.addServer(server, port)
|
||||
self.log.debug("Waiting for server")
|
||||
self.worker.waitForServer()
|
||||
self.merger_worker.waitForServer()
|
||||
self.executor_worker.waitForServer()
|
||||
self.log.debug("Registering")
|
||||
self.register()
|
||||
|
||||
@@ -423,15 +441,19 @@ class ExecutorServer(object):
|
||||
self.update_thread = threading.Thread(target=self._updateLoop)
|
||||
self.update_thread.daemon = True
|
||||
self.update_thread.start()
|
||||
self.thread = threading.Thread(target=self.run)
|
||||
self.thread.daemon = True
|
||||
self.thread.start()
|
||||
self.merger_thread = threading.Thread(target=self.run_merger)
|
||||
self.merger_thread.daemon = True
|
||||
self.merger_thread.start()
|
||||
self.executor_thread = threading.Thread(target=self.run_executor)
|
||||
self.executor_thread.daemon = True
|
||||
self.executor_thread.start()
|
||||
|
||||
def register(self):
|
||||
self.worker.registerFunction("executor:execute")
|
||||
self.worker.registerFunction("executor:stop:%s" % self.hostname)
|
||||
self.worker.registerFunction("merger:merge")
|
||||
self.worker.registerFunction("merger:cat")
|
||||
self.executor_worker.registerFunction("executor:execute")
|
||||
self.executor_worker.registerFunction("executor:stop:%s" %
|
||||
self.hostname)
|
||||
self.merger_worker.registerFunction("merger:merge")
|
||||
self.merger_worker.registerFunction("merger:cat")
|
||||
|
||||
def stop(self):
|
||||
self.log.debug("Stopping")
|
||||
@@ -446,7 +468,8 @@ class ExecutorServer(object):
|
||||
except Exception:
|
||||
self.log.exception("Exception sending stop command "
|
||||
"to worker:")
|
||||
self.worker.shutdown()
|
||||
self.merger_worker.shutdown()
|
||||
self.executor_worker.shutdown()
|
||||
self.log.debug("Stopped")
|
||||
|
||||
def pause(self):
|
||||
@@ -471,7 +494,8 @@ class ExecutorServer(object):
|
||||
|
||||
def join(self):
|
||||
self.update_thread.join()
|
||||
self.thread.join()
|
||||
self.merger_thread.join()
|
||||
self.executor_thread.join()
|
||||
|
||||
def runCommand(self):
|
||||
while self._command_running:
|
||||
@@ -495,11 +519,12 @@ class ExecutorServer(object):
|
||||
if task is None:
|
||||
# We are asked to stop
|
||||
return
|
||||
self.log.info("Updating repo %s/%s" % (
|
||||
task.connection_name, task.project_name))
|
||||
self.merger.updateRepo(task.connection_name, task.project_name)
|
||||
self.log.debug("Finished updating repo %s/%s" %
|
||||
(task.connection_name, task.project_name))
|
||||
with self.merger_lock:
|
||||
self.log.info("Updating repo %s/%s" % (
|
||||
task.connection_name, task.project_name))
|
||||
self.merger.updateRepo(task.connection_name, task.project_name)
|
||||
self.log.debug("Finished updating repo %s/%s" %
|
||||
(task.connection_name, task.project_name))
|
||||
task.setComplete()
|
||||
|
||||
def update(self, connection_name, project_name):
|
||||
@@ -508,11 +533,35 @@ class ExecutorServer(object):
|
||||
task = self.update_queue.put(task)
|
||||
return task
|
||||
|
||||
def run(self):
|
||||
def run_merger(self):
    """Fetch and dispatch merger jobs for as long as the server runs.

    Each iteration takes one job from ``self.merger_worker`` and routes
    it by name: ``merger:cat`` goes to ``self.cat`` and ``merger:merge``
    goes to ``self.merge``. Any other job name is logged and failed.
    """
    # NOTE(review): block nesting was reconstructed from a flattened diff
    # view in which indentation was lost -- confirm against the real file.
    self.log.debug("Starting merger listener")
    while self._running:
        try:
            job = self.merger_worker.getJob()
            try:
                if job.name == 'merger:cat':
                    self.log.debug("Got cat job: %s" % job.unique)
                    self.cat(job)
                elif job.name == 'merger:merge':
                    self.log.debug("Got merge job: %s" % job.unique)
                    self.merge(job)
                else:
                    # Not a function this worker knows how to handle.
                    self.log.error("Unable to handle job %s" % job.name)
                    job.sendWorkFail()
            except Exception:
                # A failure while handling the job is logged and sent
                # back on the job as a UTF-8 encoded traceback.
                self.log.exception("Exception while running job")
                job.sendWorkException(
                    traceback.format_exc().encode('utf8'))
        except gear.InterruptedError:
            # Ignored; the loop condition is evaluated again.
            pass
        except Exception:
            self.log.exception("Exception while getting job")
|
||||
|
||||
def run_executor(self):
|
||||
self.log.debug("Starting executor listener")
|
||||
while self._running:
|
||||
try:
|
||||
job = self.worker.getJob()
|
||||
job = self.executor_worker.getJob()
|
||||
try:
|
||||
if job.name == 'executor:execute':
|
||||
self.log.debug("Got execute job: %s" % job.unique)
|
||||
@@ -520,12 +569,6 @@ class ExecutorServer(object):
|
||||
elif job.name.startswith('executor:stop'):
|
||||
self.log.debug("Got stop job: %s" % job.unique)
|
||||
self.stopJob(job)
|
||||
elif job.name == 'merger:cat':
|
||||
self.log.debug("Got cat job: %s" % job.unique)
|
||||
self.cat(job)
|
||||
elif job.name == 'merger:merge':
|
||||
self.log.debug("Got merge job: %s" % job.unique)
|
||||
self.merge(job)
|
||||
else:
|
||||
self.log.error("Unable to handle job %s" % job.name)
|
||||
job.sendWorkFail()
|
||||
@@ -566,8 +609,9 @@ class ExecutorServer(object):
|
||||
args = json.loads(job.arguments)
|
||||
task = self.update(args['connection'], args['project'])
|
||||
task.wait()
|
||||
files = self.merger.getFiles(args['connection'], args['project'],
|
||||
args['branch'], args['files'])
|
||||
with self.merger_lock:
|
||||
files = self.merger.getFiles(args['connection'], args['project'],
|
||||
args['branch'], args['files'])
|
||||
result = dict(updated=True,
|
||||
files=files,
|
||||
zuul_url=self.zuul_url)
|
||||
@@ -575,8 +619,9 @@ class ExecutorServer(object):
|
||||
|
||||
def merge(self, job):
|
||||
args = json.loads(job.arguments)
|
||||
ret = self.merger.mergeChanges(args['items'], args.get('files'),
|
||||
args.get('repo_state'))
|
||||
with self.merger_lock:
|
||||
ret = self.merger.mergeChanges(args['items'], args.get('files'),
|
||||
args.get('repo_state'))
|
||||
result = dict(merged=(ret is not None),
|
||||
zuul_url=self.zuul_url)
|
||||
if ret is None:
|
||||
|
||||
Reference in New Issue
Block a user