Optionally allow zoned executors to process unzoned jobs

When running a deployment using the zone feature it can be troublesome
to run nodeless jobs. The reason for this is that zoned executors
don't run jobs that have no zone at all. This can be solved by either
running an additional set of unzoned executors or with this change
allow some executors also to process unzoned jobs. As running
additional unzoned executors just for nodeless jobs can increase
operations overhead this optional setting can be very useful for those
setups.

Change-Id: I9c025cdbe17a55f47f0cbd97b8b593727fc97e2d
This commit is contained in:
Tobias Henkel
2019-07-31 17:07:39 +02:00
committed by James E. Blair
parent 7599b6bdc0
commit 171dfa36a6
4 changed files with 82 additions and 21 deletions

View File

@@ -829,6 +829,14 @@ The following sections of ``zuul.conf`` are used by the executor:
node-attributes:
executor-zone: vpn
.. attr:: allow_unzoned
:default: False
If :attr:`executor.zone` is set it by default only processes jobs with
nodes of that specific zone even if the nodes have no zone at all.
Enabling ``allow_unzoned`` lets the executor also take jobs with nodes
without zone.
.. attr:: merge_jobs
:default: True
@@ -837,7 +845,6 @@ The following sections of ``zuul.conf`` are used by the executor:
perform merge operations for any events. The executor will still perform
the merge operations required for the build they are executing.
.. attr:: merger
.. attr:: git_user_email

View File

@@ -0,0 +1,6 @@
---
features:
- |
Executors configured with :attr:`executor.zone` now can also be configured
to also process jobs without a zone by enabling
:attr:`executor.allow_unzoned`.

View File

@@ -159,6 +159,46 @@ class TestSchedulerZone(ZuulTestCase):
'label1')
class TestSchedulerZoneFallback(ZuulTestCase):
tenant_config_file = 'config/single-tenant/main.yaml'
def setup_config(self, config_file: str):
config = super().setup_config(config_file)
config.set('executor', 'zone', 'test-provider.vpn')
config.set('executor', 'allow_unzoned', 'true')
return config
def test_jobs_executed(self):
"Test that jobs are executed and a change is merged per zone"
self.gearman_server.hold_jobs_in_queue = True
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
A.addApproval('Code-Review', 2)
self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
self.waitUntilSettled()
queue = self.gearman_server.getQueue()
self.assertEqual(len(self.builds), 0)
self.assertEqual(len(queue), 1)
self.assertEqual(b'executor:execute', queue[0].name)
self.gearman_server.hold_jobs_in_queue = False
self.gearman_server.release()
self.waitUntilSettled()
self.assertEqual(self.getJobFromHistory('project-merge').result,
'SUCCESS')
self.assertEqual(self.getJobFromHistory('project-test1').result,
'SUCCESS')
self.assertEqual(self.getJobFromHistory('project-test2').result,
'SUCCESS')
self.assertEqual(A.data['status'], 'MERGED')
self.assertEqual(A.reported, 2)
self.assertEqual(self.getJobFromHistory('project-test1').node,
'label1')
self.assertEqual(self.getJobFromHistory('project-test2').node,
'label1')
class TestAuthorizeViaRPC(ZuulTestCase):
tenant_config_file = 'config/authorization/single-tenant/main.yaml'

View File

@@ -2597,6 +2597,8 @@ class ExecutorServer(BaseMergeServer):
self.setup_timeout = int(get_default(self.config, 'executor',
'ansible_setup_timeout', 60))
self.zone = get_default(self.config, 'executor', 'zone')
self.allow_unzoned = get_default(self.config, 'executor',
'allow_unzoned', False)
self.ansible_callbacks = {}
for section_name in self.config.sections():
@@ -2695,22 +2697,14 @@ class ExecutorServer(BaseMergeServer):
self.process_merge_jobs = get_default(self.config, 'executor',
'merge_jobs', True)
function_name = 'executor:execute'
if self.zone:
function_name += ':%s' % self.zone
# This function only exists so we can count how many executors
# are online.
online_name = 'executor:online'
if self.zone:
online_name += ':%s' % self.zone
self.executor_jobs = {
"executor:resume:%s" % self.hostname: self.resumeJob,
"executor:stop:%s" % self.hostname: self.stopJob,
function_name: self.executeJob,
online_name: self.noop,
}
for function_name in self._getExecuteFunctionNames():
self.executor_jobs[function_name] = self.executeJob
for function_name in self._getOnlineFunctionNames():
self.executor_jobs[function_name] = self.noop
self.executor_gearworker = ZuulGearWorker(
'Zuul Executor Server',
@@ -2724,6 +2718,24 @@ class ExecutorServer(BaseMergeServer):
# Used to offload expensive operations to different processes
self.process_worker = None
def _getFunctionSuffixes(self):
suffixes = []
if self.zone:
suffixes.append(':' + self.zone)
if self.allow_unzoned:
suffixes.append('')
else:
suffixes.append('')
return suffixes
def _getExecuteFunctionNames(self):
base_name = 'executor:execute'
return [base_name + suffix for suffix in self._getFunctionSuffixes()]
def _getOnlineFunctionNames(self):
base_name = 'executor:online'
return [base_name + suffix for suffix in self._getFunctionSuffixes()]
def _repoLock(self, connection_name, project_name):
return self.repo_locks.getRepoLock(connection_name, project_name)
@@ -2775,10 +2787,8 @@ class ExecutorServer(BaseMergeServer):
def register_work(self):
if self._running:
self.accepting_work = True
function_name = 'executor:execute'
if self.zone:
function_name += ':%s' % self.zone
self.executor_gearworker.gearman.registerFunction(function_name)
for function in self._getExecuteFunctionNames():
self.executor_gearworker.gearman.registerFunction(function)
# TODO(jeblair): Update geard to send a noop after
# registering for a job which is in the queue, then remove
# this API violation.
@@ -2786,10 +2796,8 @@ class ExecutorServer(BaseMergeServer):
def unregister_work(self):
self.accepting_work = False
function_name = 'executor:execute'
if self.zone:
function_name += ':%s' % self.zone
self.executor_gearworker.gearman.unRegisterFunction(function_name)
for function in self._getExecuteFunctionNames():
self.executor_gearworker.gearman.unRegisterFunction(function)
def stop(self):
self.log.debug("Stopping")