From 090e9d8cd5104349a4e361b5f5d9e5eadc4397cd Mon Sep 17 00:00:00 2001 From: Ricardo Carrillo Cruz Date: Fri, 2 Mar 2018 12:55:35 +0100 Subject: [PATCH] Add support for zones in executors Create a new config setting to allow zuul executors to be grouped into zones. By default, this setting is disabled (set to None), to keep backwards compat. Story: 2001125 Task: 4817 Change-Id: I345ee2d0c004afa68858eb195189b56de3d41e97 Signed-off-by: Paul Belanger --- doc/source/admin/components.rst | 19 +++++++++ .../executor-zones-54318b8ea2f7e195.yaml | 7 ++++ tests/base.py | 4 +- tests/unit/test_scheduler.py | 42 +++++++++++++++++++ zuul/executor/client.py | 27 +++++++++++- zuul/executor/server.py | 19 +++++++-- zuul/lib/gear_utils.py | 42 +++++++++++++++++++ zuul/rpclistener.py | 23 ---------- zuul/scheduler.py | 5 ++- 9 files changed, 156 insertions(+), 32 deletions(-) create mode 100644 releasenotes/notes/executor-zones-54318b8ea2f7e195.yaml create mode 100644 zuul/lib/gear_utils.py diff --git a/doc/source/admin/components.rst b/doc/source/admin/components.rst index bed1f84fc2..617ef35dd5 100644 --- a/doc/source/admin/components.rst +++ b/doc/source/admin/components.rst @@ -611,6 +611,25 @@ The following sections of ``zuul.conf`` are used by the executor: where it cannot determine its hostname correctly this can be overridden here. + .. attr:: zone + :default: None + + Name of the nodepool executor-zone to exclusively execute all jobs that + have nodes of the specified provider. As an example, it is possible for + nodepool nodes to exist in a cloud with out public accessable IP + IP address. By adding an executor to a zone nodepool nodes could be + configured to use private ip addresses. + + To enable this in nodepool, you'll use the node-attributes setting in a + provider pool. For example: + + .. code-block:: yaml + + pools: + - name: main + node-attributes: + executor-zone: vpn + .. attr:: merger .. attr:: git_user_email diff --git a/releasenotes/notes/executor-zones-54318b8ea2f7e195.yaml b/releasenotes/notes/executor-zones-54318b8ea2f7e195.yaml new file mode 100644 index 0000000000..42db8cccb8 --- /dev/null +++ b/releasenotes/notes/executor-zones-54318b8ea2f7e195.yaml @@ -0,0 +1,7 @@ +--- +features: + - | + One or more zuul executors can now be added to a :attr:`executor.zone`. + This is helpful if a cloud does not have any public IP addresses for + nodepool nodes. Now you'll be able to have a zuul executor on the same + private network as your nodepool nodes. diff --git a/tests/base.py b/tests/base.py index 4aa39004a9..9fbd1a2fae 100644 --- a/tests/base.py +++ b/tests/base.py @@ -1679,7 +1679,7 @@ class FakeGearmanServer(gear.Server): self.log.debug("releasing queued job %s (%s)" % (regex, qlen)) for job in self.getQueue(): match = False - if job.name == b'executor:execute': + if job.name.startswith(b'executor:execute'): parameters = json.loads(job.arguments.decode('utf8')) if not regex or re.match(regex, parameters.get('job')): match = True @@ -1748,6 +1748,7 @@ class FakeNodepool(object): self.thread.start() self.fail_requests = set() self.remote_ansible = False + self.attributes = None def stop(self): self._running = False @@ -1820,6 +1821,7 @@ class FakeNodepool(object): provider='test-provider', region='test-region', az='test-az', + attributes=self.attributes, interface_ip=remote_ip, public_ipv4=remote_ip, private_ipv4=None, diff --git a/tests/unit/test_scheduler.py b/tests/unit/test_scheduler.py index c4bd99d3fe..dc34b0a944 100644 --- a/tests/unit/test_scheduler.py +++ b/tests/unit/test_scheduler.py @@ -65,6 +65,48 @@ class TestSchedulerSSL(SSLZuulTestCase): 'label1') +class TestSchedulerZone(ZuulTestCase): + tenant_config_file = 'config/single-tenant/main.yaml' + + def setUp(self): + super(TestSchedulerZone, self).setUp() + self.fake_nodepool.attributes = {'executor-zone': 'test-provider.vpn'} + + def setup_config(self): + super(TestSchedulerZone, self).setup_config() + self.config.set('executor', 'zone', 'test-provider.vpn') + + def test_jobs_executed(self): + "Test that jobs are executed and a change is merged per zone" + self.gearman_server.hold_jobs_in_queue = True + A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A') + A.addApproval('Code-Review', 2) + self.fake_gerrit.addEvent(A.addApproval('Approved', 1)) + self.waitUntilSettled() + + queue = self.gearman_server.getQueue() + self.assertEqual(len(self.builds), 0) + self.assertEqual(len(queue), 1) + self.assertEqual(b'executor:execute:test-provider.vpn', queue[0].name) + + self.gearman_server.hold_jobs_in_queue = False + self.gearman_server.release() + self.waitUntilSettled() + + self.assertEqual(self.getJobFromHistory('project-merge').result, + 'SUCCESS') + self.assertEqual(self.getJobFromHistory('project-test1').result, + 'SUCCESS') + self.assertEqual(self.getJobFromHistory('project-test2').result, + 'SUCCESS') + self.assertEqual(A.data['status'], 'MERGED') + self.assertEqual(A.reported, 2) + self.assertEqual(self.getJobFromHistory('project-test1').node, + 'label1') + self.assertEqual(self.getJobFromHistory('project-test2').node, + 'label1') + + class TestScheduler(ZuulTestCase): tenant_config_file = 'config/single-tenant/main.yaml' diff --git a/zuul/executor/client.py b/zuul/executor/client.py index a1d251d93d..db1ef48f37 100644 --- a/zuul/executor/client.py +++ b/zuul/executor/client.py @@ -22,6 +22,7 @@ from uuid import uuid4 import zuul.model from zuul.lib.config import get_default +from zuul.lib.gear_utils import getGearmanFunctions from zuul.lib.jsonutil import json_dumps from zuul.model import Build @@ -306,8 +307,30 @@ class ExecutorClient(object): self.sched.onBuildCompleted(build, 'SUCCESS', {}, []) return build - gearman_job = gear.TextJob('executor:execute', json_dumps(params), - unique=uuid) + functions = getGearmanFunctions(self.gearman) + function_name = 'executor:execute' + # Because all nodes belong to the same provider, region and + # availability zone we can get executor_zone from only the first + # node. + executor_zone = None + if nodes and nodes[0].get('attributes'): + executor_zone = nodes[0]['attributes'].get('executor-zone') + + if executor_zone: + _fname = '%s:%s' % ( + function_name, + executor_zone) + if _fname in functions.keys(): + function_name = _fname + else: + self.log.warning( + "Job requested '%s' zuul-executor zone, but no " + "zuul-executors found for this zone; ignoring zone " + "request" % executor_zone) + + gearman_job = gear.TextJob( + function_name, json_dumps(params), unique=uuid) + build.__gearman_job = gearman_job build.__gearman_worker = None self.builds[uuid] = build diff --git a/zuul/executor/server.py b/zuul/executor/server.py index 8e243d6696..a6e4d36cec 100644 --- a/zuul/executor/server.py +++ b/zuul/executor/server.py @@ -2033,6 +2033,7 @@ class ExecutorServer(object): 'default_username', 'zuul') self.disk_limit_per_job = int(get_default(self.config, 'executor', 'disk_limit_per_job', 250)) + self.zone = get_default(self.config, 'executor', 'zone') self.merge_email = get_default(self.config, 'merger', 'git_user_email') self.merge_name = get_default(self.config, 'merger', 'git_user_name') self.merge_speed_limit = get_default( @@ -2177,7 +2178,10 @@ class ExecutorServer(object): def register_work(self): if self._running: self.accepting_work = True - self.executor_worker.registerFunction("executor:execute") + function_name = 'executor:execute' + if self.zone: + function_name += ':%s' % self.zone + self.executor_worker.registerFunction(function_name) # TODO(jeblair): Update geard to send a noop after # registering for a job which is in the queue, then remove # this API violation. @@ -2185,7 +2189,10 @@ class ExecutorServer(object): def unregister_work(self): self.accepting_work = False - self.executor_worker.unRegisterFunction("executor:execute") + function_name = 'executor:execute' + if self.zone: + function_name += ':%s' % self.zone + self.executor_worker.unRegisterFunction(function_name) def stop(self): self.log.debug("Stopping") @@ -2362,8 +2369,12 @@ class ExecutorServer(object): if not self._running: job.sendWorkFail() return - if job.name == 'executor:execute': - self.log.debug("Got execute job: %s" % job.unique) + function_name = 'executor:execute' + if self.zone: + function_name += ':%s' % self.zone + if job.name == (function_name): + self.log.debug("Got %s job: %s" % + (function_name, job.unique)) self.executeJob(job) elif job.name.startswith('executor:resume'): self.log.debug("Got resume job: %s" % job.unique) diff --git a/zuul/lib/gear_utils.py b/zuul/lib/gear_utils.py new file mode 100644 index 0000000000..02df72daf7 --- /dev/null +++ b/zuul/lib/gear_utils.py @@ -0,0 +1,42 @@ +# Copyright 2018 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import gear +import logging + +log = logging.getLogger("zuul.gear_utils") + + +def getGearmanFunctions(gearman): + functions = {} + for connection in gearman.active_connections: + try: + req = gear.StatusAdminRequest() + connection.sendAdminRequest(req, timeout=300) + except Exception: + log.exception("Exception while listing functions") + gearman._lostConnection(connection) + continue + for line in req.response.decode('utf8').split('\n'): + parts = [x.strip() for x in line.split('\t')] + if len(parts) < 4: + continue + # parts[0] - function name + # parts[1] - total jobs queued (including building) + # parts[2] - jobs building + # parts[3] - workers registered + data = functions.setdefault(parts[0], [0, 0, 0]) + for i in range(3): + data[i] += int(parts[i + 1]) + return functions diff --git a/zuul/rpclistener.py b/zuul/rpclistener.py index 5380e93112..384b9c7203 100644 --- a/zuul/rpclistener.py +++ b/zuul/rpclistener.py @@ -77,29 +77,6 @@ class RPCListener(object): self.worker.registerFunction("zuul:key_get") self.worker.registerFunction("zuul:config_errors_list") - def getFunctions(self): - functions = {} - for connection in self.worker.active_connections: - try: - req = gear.StatusAdminRequest() - connection.sendAdminRequest(req, timeout=300) - except Exception: - self.log.exception("Exception while listing functions") - self.worker._lostConnection(connection) - continue - for line in req.response.decode('utf8').split('\n'): - parts = [x.strip() for x in line.split('\t')] - if len(parts) < 4: - continue - # parts[0] - function name - # parts[1] - total jobs queued (including building) - # parts[2] - jobs building - # parts[3] - workers registered - data = functions.setdefault(parts[0], [0, 0, 0]) - for i in range(3): - data[i] += int(parts[i + 1]) - return functions - def stop(self): self.log.debug("Stopping") self._running = False diff --git a/zuul/scheduler.py b/zuul/scheduler.py index cba94b9547..3f501fde88 100644 --- a/zuul/scheduler.py +++ b/zuul/scheduler.py @@ -33,6 +33,7 @@ from zuul import version as zuul_version from zuul import rpclistener from zuul.lib import commandsocket from zuul.lib.config import get_default +from zuul.lib.gear_utils import getGearmanFunctions from zuul.lib.statsd import get_statsd import zuul.lib.queue @@ -373,7 +374,7 @@ class Scheduler(threading.Thread): def _runStats(self): if not self.statsd: return - functions = self.rpc.getFunctions() + functions = getGearmanFunctions(self.rpc.worker) executors_accepting = 0 executors_online = 0 execute_queue = 0 @@ -382,7 +383,7 @@ class Scheduler(threading.Thread): merge_queue = 0 merge_running = 0 for (name, (queued, running, registered)) in functions.items(): - if name == 'executor:execute': + if name.startswith('executor:execute'): executors_accepting = registered execute_queue = queued - running execute_running = running