Add support for zones in executors

Create a new config setting that allows zuul executors to be grouped into
zones. By default, this setting is disabled (set to None) to keep
backwards compatibility.

Story: 2001125
Task: 4817

Change-Id: I345ee2d0c004afa68858eb195189b56de3d41e97
Signed-off-by: Paul Belanger <pabelanger@redhat.com>
Authored by Ricardo Carrillo Cruz on 2018-03-02 12:55:35 +01:00; committed by Paul Belanger
parent 71f60674b9
commit 090e9d8cd5
9 changed files with 156 additions and 32 deletions


@@ -611,6 +611,25 @@ The following sections of ``zuul.conf`` are used by the executor:
where it cannot determine its hostname correctly this can be overridden
here.
.. attr:: zone
   :default: None

   Name of the nodepool executor-zone to exclusively execute all jobs
   that have nodes with a matching ``executor-zone`` attribute. As an
   example, it is possible for nodepool nodes to exist in a cloud
   without a publicly accessible IP address. By adding an executor to a
   zone, such nodepool nodes can be configured to use their private IP
   addresses.

   To enable this in nodepool, set the ``node-attributes`` setting in a
   provider pool. For example:

   .. code-block:: yaml

      pools:
        - name: main
          node-attributes:
            executor-zone: vpn
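
   An executor is assigned to a zone via its ``zuul.conf``. As a minimal
   sketch, assuming the ``vpn`` zone from the example above (the value
   must match the ``executor-zone`` node attribute exactly):

   .. code-block:: ini

      [executor]
      zone = vpn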
.. attr:: merger
.. attr:: git_user_email


@@ -0,0 +1,7 @@
---
features:
  - |
    One or more zuul executors can now be assigned to an
    :attr:`executor.zone`. This is helpful when a cloud does not provide
    public IP addresses for nodepool nodes: a zuul executor can now be
    placed on the same private network as those nodes.


@@ -1679,7 +1679,7 @@ class FakeGearmanServer(gear.Server):
self.log.debug("releasing queued job %s (%s)" % (regex, qlen))
for job in self.getQueue():
match = False
if job.name == b'executor:execute':
if job.name.startswith(b'executor:execute'):
parameters = json.loads(job.arguments.decode('utf8'))
if not regex or re.match(regex, parameters.get('job')):
match = True
@@ -1748,6 +1748,7 @@ class FakeNodepool(object):
self.thread.start()
self.fail_requests = set()
self.remote_ansible = False
self.attributes = None
def stop(self):
self._running = False
@@ -1820,6 +1821,7 @@ class FakeNodepool(object):
provider='test-provider',
region='test-region',
az='test-az',
attributes=self.attributes,
interface_ip=remote_ip,
public_ipv4=remote_ip,
private_ipv4=None,


@@ -65,6 +65,48 @@ class TestSchedulerSSL(SSLZuulTestCase):
'label1')
class TestSchedulerZone(ZuulTestCase):
tenant_config_file = 'config/single-tenant/main.yaml'
def setUp(self):
super(TestSchedulerZone, self).setUp()
self.fake_nodepool.attributes = {'executor-zone': 'test-provider.vpn'}
def setup_config(self):
super(TestSchedulerZone, self).setup_config()
self.config.set('executor', 'zone', 'test-provider.vpn')
def test_jobs_executed(self):
"Test that jobs are executed and a change is merged per zone"
self.gearman_server.hold_jobs_in_queue = True
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
A.addApproval('Code-Review', 2)
self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
self.waitUntilSettled()
queue = self.gearman_server.getQueue()
self.assertEqual(len(self.builds), 0)
self.assertEqual(len(queue), 1)
self.assertEqual(b'executor:execute:test-provider.vpn', queue[0].name)
self.gearman_server.hold_jobs_in_queue = False
self.gearman_server.release()
self.waitUntilSettled()
self.assertEqual(self.getJobFromHistory('project-merge').result,
'SUCCESS')
self.assertEqual(self.getJobFromHistory('project-test1').result,
'SUCCESS')
self.assertEqual(self.getJobFromHistory('project-test2').result,
'SUCCESS')
self.assertEqual(A.data['status'], 'MERGED')
self.assertEqual(A.reported, 2)
self.assertEqual(self.getJobFromHistory('project-test1').node,
'label1')
self.assertEqual(self.getJobFromHistory('project-test2').node,
'label1')
class TestScheduler(ZuulTestCase):
tenant_config_file = 'config/single-tenant/main.yaml'


@@ -22,6 +22,7 @@ from uuid import uuid4
import zuul.model
from zuul.lib.config import get_default
from zuul.lib.gear_utils import getGearmanFunctions
from zuul.lib.jsonutil import json_dumps
from zuul.model import Build
@@ -306,8 +307,30 @@ class ExecutorClient(object):
self.sched.onBuildCompleted(build, 'SUCCESS', {}, [])
return build
gearman_job = gear.TextJob('executor:execute', json_dumps(params),
unique=uuid)
functions = getGearmanFunctions(self.gearman)
function_name = 'executor:execute'
# Because all nodes belong to the same provider, region and
# availability zone we can get executor_zone from only the first
# node.
executor_zone = None
if nodes and nodes[0].get('attributes'):
executor_zone = nodes[0]['attributes'].get('executor-zone')
if executor_zone:
_fname = '%s:%s' % (
function_name,
executor_zone)
if _fname in functions.keys():
function_name = _fname
else:
self.log.warning(
"Job requested '%s' zuul-executor zone, but no "
"zuul-executors found for this zone; ignoring zone "
"request" % executor_zone)
gearman_job = gear.TextJob(
function_name, json_dumps(params), unique=uuid)
build.__gearman_job = gearman_job
build.__gearman_worker = None
self.builds[uuid] = build
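
A note on the routing convention introduced above: the client derives the
gearman function name from the first node's ``executor-zone`` attribute and
only uses the zone-suffixed name if some executor has actually registered it;
otherwise it falls back to the plain ``executor:execute`` function. A minimal
standalone sketch of that lookup (the helper name and sample data below are
illustrative, not part of the change):

# Hypothetical helper mirroring the selection logic above.
def pick_function(nodes, registered, base='executor:execute'):
    zone = None
    if nodes and nodes[0].get('attributes'):
        zone = nodes[0]['attributes'].get('executor-zone')
    zoned = '%s:%s' % (base, zone) if zone else None
    # Fall back to the un-zoned function if no executor serves the zone.
    return zoned if zoned and zoned in registered else base

# 'registered' stands in for the output of getGearmanFunctions(self.gearman).
print(pick_function([{'attributes': {'executor-zone': 'vpn'}}],
                    {'executor:execute:vpn': [0, 0, 1]}))
# -> executor:execute:vpn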


@@ -2033,6 +2033,7 @@ class ExecutorServer(object):
'default_username', 'zuul')
self.disk_limit_per_job = int(get_default(self.config, 'executor',
'disk_limit_per_job', 250))
self.zone = get_default(self.config, 'executor', 'zone')
self.merge_email = get_default(self.config, 'merger', 'git_user_email')
self.merge_name = get_default(self.config, 'merger', 'git_user_name')
self.merge_speed_limit = get_default(
@@ -2177,7 +2178,10 @@ class ExecutorServer(object):
def register_work(self):
if self._running:
self.accepting_work = True
self.executor_worker.registerFunction("executor:execute")
function_name = 'executor:execute'
if self.zone:
function_name += ':%s' % self.zone
self.executor_worker.registerFunction(function_name)
# TODO(jeblair): Update geard to send a noop after
# registering for a job which is in the queue, then remove
# this API violation.
@@ -2185,7 +2189,10 @@ class ExecutorServer(object):
def unregister_work(self):
self.accepting_work = False
self.executor_worker.unRegisterFunction("executor:execute")
function_name = 'executor:execute'
if self.zone:
function_name += ':%s' % self.zone
self.executor_worker.unRegisterFunction(function_name)
def stop(self):
self.log.debug("Stopping")
@@ -2362,8 +2369,12 @@ class ExecutorServer(object):
if not self._running:
job.sendWorkFail()
return
if job.name == 'executor:execute':
self.log.debug("Got execute job: %s" % job.unique)
function_name = 'executor:execute'
if self.zone:
function_name += ':%s' % self.zone
if job.name == (function_name):
self.log.debug("Got %s job: %s" %
(function_name, job.unique))
self.executeJob(job)
elif job.name.startswith('executor:resume'):
self.log.debug("Got resume job: %s" % job.unique)

zuul/lib/gear_utils.py (new file, 42 lines)

@@ -0,0 +1,42 @@
# Copyright 2018 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import gear
import logging
log = logging.getLogger("zuul.gear_utils")
def getGearmanFunctions(gearman):
functions = {}
for connection in gearman.active_connections:
try:
req = gear.StatusAdminRequest()
connection.sendAdminRequest(req, timeout=300)
except Exception:
log.exception("Exception while listing functions")
gearman._lostConnection(connection)
continue
for line in req.response.decode('utf8').split('\n'):
parts = [x.strip() for x in line.split('\t')]
if len(parts) < 4:
continue
# parts[0] - function name
# parts[1] - total jobs queued (including building)
# parts[2] - jobs building
# parts[3] - workers registered
data = functions.setdefault(parts[0], [0, 0, 0])
for i in range(3):
data[i] += int(parts[i + 1])
return functions
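
For reference, getGearmanFunctions() returns a mapping of function name to
[jobs queued, jobs running, workers registered], summed across all active
connections. A standalone sketch of the same parsing applied to a made-up
gearman ``status`` response, to show the shape of the result:

# The response text below is fabricated for illustration; the real data
# comes from gear.StatusAdminRequest as in the helper above.
sample_status = (
    "executor:execute\t3\t1\t2\n"
    "executor:execute:vpn\t0\t0\t1\n"
    ".\n"
)
functions = {}
for line in sample_status.split('\n'):
    parts = [x.strip() for x in line.split('\t')]
    if len(parts) < 4:
        continue
    data = functions.setdefault(parts[0], [0, 0, 0])
    for i in range(3):
        data[i] += int(parts[i + 1])
print(functions)
# {'executor:execute': [3, 1, 2], 'executor:execute:vpn': [0, 0, 1]}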


@@ -77,29 +77,6 @@ class RPCListener(object):
self.worker.registerFunction("zuul:key_get")
self.worker.registerFunction("zuul:config_errors_list")
def getFunctions(self):
functions = {}
for connection in self.worker.active_connections:
try:
req = gear.StatusAdminRequest()
connection.sendAdminRequest(req, timeout=300)
except Exception:
self.log.exception("Exception while listing functions")
self.worker._lostConnection(connection)
continue
for line in req.response.decode('utf8').split('\n'):
parts = [x.strip() for x in line.split('\t')]
if len(parts) < 4:
continue
# parts[0] - function name
# parts[1] - total jobs queued (including building)
# parts[2] - jobs building
# parts[3] - workers registered
data = functions.setdefault(parts[0], [0, 0, 0])
for i in range(3):
data[i] += int(parts[i + 1])
return functions
def stop(self):
self.log.debug("Stopping")
self._running = False


@@ -33,6 +33,7 @@ from zuul import version as zuul_version
from zuul import rpclistener
from zuul.lib import commandsocket
from zuul.lib.config import get_default
from zuul.lib.gear_utils import getGearmanFunctions
from zuul.lib.statsd import get_statsd
import zuul.lib.queue
@@ -373,7 +374,7 @@ class Scheduler(threading.Thread):
def _runStats(self):
if not self.statsd:
return
functions = self.rpc.getFunctions()
functions = getGearmanFunctions(self.rpc.worker)
executors_accepting = 0
executors_online = 0
execute_queue = 0
@@ -382,7 +383,7 @@ class Scheduler(threading.Thread):
merge_queue = 0
merge_running = 0
for (name, (queued, running, registered)) in functions.items():
if name == 'executor:execute':
if name.startswith('executor:execute'):
executors_accepting = registered
execute_queue = queued - running
execute_running = running