From df37ad2ce773b6c9a621a9e8be5a1c7688afec4a Mon Sep 17 00:00:00 2001
From: "James E. Blair"
Date: Thu, 1 Feb 2018 13:59:48 -0800
Subject: [PATCH] Executor: Don't start too many jobs at once

The metrics that we use to govern load on the executors are all
trailing indicators.  The executors are capable of accepting a large
number of jobs in a batch, and only after those jobs begin to run do
the load indicators increase.

To avoid the thundering herd problem, reduce the rate at which we
accept jobs past a certain point.  That point is a number of jobs
equal to twice the target load average.  In practice that seems to be
a fairly conservative but reasonable number of jobs for the executor
to run, so to facilitate a quick start, allow the executor to start
up to that number all at once.

Once the number of running jobs exceeds that point, subsequent jobs
are accepted only one at a time, after each one completes its startup
phase (cloning repos, establishing Ansible connections), which is to
say, at the point where the job begins running its first
pre-playbook.  We also wait until the next regular interval of the
governor to accept the next job.  That interval is currently 30
seconds, but to make the system a bit more responsive, this change
lowers it to 10 seconds.

To summarize: once a bunch[1] of jobs are running, then after each
new job we wait until that job has started running playbooks, plus up
to an additional 10 seconds, before accepting another job.

This is implemented by adding a 'starting builds' metric to the
governor so that we register or unregister the execute function based
on whether too many jobs are in the startup phase.  We add a forced
call to the governor routine after each job starts so that we can
unregister if necessary before picking up the next job, and wrap that
routine in a lock since it is now called from multiple threads and
its logic may not be entirely thread-safe.

Also, add tests for all three inputs to manageLoad.

[1] 2*target load average

Change-Id: I066bc539e70eb475ca2b871fb90644264d8d5bf4
---
 doc/source/admin/monitoring.rst                    |  9 ++-
 .../git/common-config/playbooks/base.yaml          |  2 +
 .../governor/git/common-config/zuul.yaml           | 34 ++++++++
 tests/fixtures/config/governor/main.yaml           |  6 ++
 tests/unit/test_executor.py                        | 79 +++++++++++++++++++
 zuul/executor/server.py                            | 44 +++++++++--
 6 files changed, 165 insertions(+), 9 deletions(-)
 create mode 100644 tests/fixtures/config/governor/git/common-config/playbooks/base.yaml
 create mode 100644 tests/fixtures/config/governor/git/common-config/zuul.yaml
 create mode 100644 tests/fixtures/config/governor/main.yaml

diff --git a/doc/source/admin/monitoring.rst b/doc/source/admin/monitoring.rst
index 0fdb3b2233..d43fd035c8 100644
--- a/doc/source/admin/monitoring.rst
+++ b/doc/source/admin/monitoring.rst
@@ -136,10 +136,17 @@ These metrics are emitted by the Zuul :ref:`scheduler`:
 
       Incremented each time the executor starts a build.
 
+   .. stat:: starting_builds
+      :type: gauge
+
+      The number of builds starting on this executor.  These are
+      builds which have not yet begun their first pre-playbook.
+
    .. stat:: running_builds
       :type: gauge
 
-      The number of builds currently running on this executor.
+      The number of builds currently running on this executor.  This
+      includes starting builds.
 
    .. stat:: load_average
      :type: gauge
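
A quick sketch of the governor arithmetic described in the commit
message, assuming an 8-core host and the default load_multiplier of
2.5 (the host size is an assumption for illustration; the real code
takes it from multiprocessing.cpu_count()):

# Slow-start thresholds as introduced by this patch.
cpu_count = 8                               # assumed host size for the example
load_multiplier = 2.5                       # executor.load_multiplier default
max_load_avg = cpu_count * load_multiplier  # 20.0
max_starting_builds = max_load_avg * 2      # 40.0

def effective_starting_cap(running_builds):
    # Mirrors _manageLoad() below: the allowance shrinks as builds run,
    # but never drops below one, so a new job can always eventually start.
    return max(max_starting_builds - running_builds, 1)

print(effective_starting_cap(0))   # 40.0: a quick start may launch many builds
print(effective_starting_cap(45))  # 1: past the threshold, one job at a time
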
diff --git a/tests/fixtures/config/governor/git/common-config/playbooks/base.yaml b/tests/fixtures/config/governor/git/common-config/playbooks/base.yaml
new file mode 100644
index 0000000000..f679dceaef
--- /dev/null
+++ b/tests/fixtures/config/governor/git/common-config/playbooks/base.yaml
@@ -0,0 +1,2 @@
+- hosts: all
+  tasks: []
diff --git a/tests/fixtures/config/governor/git/common-config/zuul.yaml b/tests/fixtures/config/governor/git/common-config/zuul.yaml
new file mode 100644
index 0000000000..093da16b13
--- /dev/null
+++ b/tests/fixtures/config/governor/git/common-config/zuul.yaml
@@ -0,0 +1,34 @@
+- pipeline:
+    name: check
+    manager: independent
+    trigger:
+      gerrit:
+        - event: patchset-created
+    success:
+      gerrit:
+        Verified: 1
+    failure:
+      gerrit:
+        Verified: -1
+
+- job:
+    name: base
+    parent: null
+    run: playbooks/base.yaml
+
+- job:
+    name: test1
+
+- job:
+    name: test2
+
+- job:
+    name: test3
+
+- project:
+    name: common-config
+    check:
+      jobs:
+        - test1
+        - test2
+        - test3
diff --git a/tests/fixtures/config/governor/main.yaml b/tests/fixtures/config/governor/main.yaml
new file mode 100644
index 0000000000..9d01f542f9
--- /dev/null
+++ b/tests/fixtures/config/governor/main.yaml
@@ -0,0 +1,6 @@
+- tenant:
+    name: tenant-one
+    source:
+      gerrit:
+        config-projects:
+          - common-config
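
The three identical jobs in the fixture above give the test below
several concurrent builds for the governor to throttle.  How the new
'starting builds' input is counted, as a standalone sketch (FakeWorker
is invented here to stand in for AnsibleJob, whose started flag flips
once the first pre-playbook begins):

class FakeWorker(object):
    def __init__(self, started):
        self.started = started

# Mirrors the loop this patch adds to _manageLoad().
job_workers = {
    'build-a': FakeWorker(started=False),  # still cloning repos / connecting
    'build-b': FakeWorker(started=True),   # already running playbooks
}
starting_builds = 0
for worker in job_workers.values():
    if not worker.started:
        starting_builds += 1
assert starting_builds == 1
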
diff --git a/tests/unit/test_executor.py b/tests/unit/test_executor.py
index 474859d781..8cb98ee8bb 100755
--- a/tests/unit/test_executor.py
+++ b/tests/unit/test_executor.py
@@ -15,6 +15,11 @@
 # License for the specific language governing permissions and limitations
 # under the License.
 
+try:
+    from unittest import mock
+except ImportError:
+    import mock
+
 import logging
 import time
 
@@ -436,3 +441,77 @@ class TestExecutorHostname(ZuulTestCase):
     def test_executor_hostname(self):
         self.assertEqual('test-executor-hostname.example.com',
                          self.executor_server.hostname)
+
+
+class TestGovernor(ZuulTestCase):
+    tenant_config_file = 'config/governor/main.yaml'
+
+    @mock.patch('os.getloadavg')
+    @mock.patch('psutil.virtual_memory')
+    def test_load_governor(self, vm_mock, loadavg_mock):
+        class Dummy(object):
+            pass
+        ram = Dummy()
+        ram.percent = 20.0  # 20% used
+        vm_mock.return_value = ram
+        loadavg_mock.return_value = (0.0, 0.0, 0.0)
+        self.executor_server.manageLoad()
+        self.assertTrue(self.executor_server.accepting_work)
+        ram.percent = 99.0  # 99% used
+        loadavg_mock.return_value = (100.0, 100.0, 100.0)
+        self.executor_server.manageLoad()
+        self.assertFalse(self.executor_server.accepting_work)
+
+    def waitForExecutorBuild(self, jobname):
+        timeout = time.time() + 30
+        build = None
+        while (time.time() < timeout and not build):
+            for b in self.builds:
+                if b.name == jobname:
+                    build = b
+                    break
+            time.sleep(0.1)
+        build_id = build.uuid
+        while (time.time() < timeout and
+               build_id not in self.executor_server.job_workers):
+            time.sleep(0.1)
+        worker = self.executor_server.job_workers[build_id]
+        while (time.time() < timeout and
+               not worker.started):
+            time.sleep(0.1)
+        return build
+
+    def waitForWorkerCompletion(self, build):
+        timeout = time.time() + 30
+        while (time.time() < timeout and
+               build.uuid in self.executor_server.job_workers):
+            time.sleep(0.1)
+
+    def test_slow_start(self):
+        self.executor_server.hold_jobs_in_build = True
+        self.executor_server.max_starting_builds = 1
+        self.executor_server.manageLoad()
+        self.assertTrue(self.executor_server.accepting_work)
+        A = self.fake_gerrit.addFakeChange('common-config', 'master', 'A')
+        self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
+
+        build1 = self.waitForExecutorBuild('test1')
+        # With one job (test1) being started, we should no longer
+        # be accepting new work
+        self.assertFalse(self.executor_server.accepting_work)
+        self.assertEqual(len(self.executor_server.job_workers), 1)
+        # Allow enough starting builds for the test to complete.
+        self.executor_server.max_starting_builds = 3
+        build1.release()
+        self.waitForWorkerCompletion(build1)
+        self.executor_server.manageLoad()
+
+        self.waitForExecutorBuild('test2')
+        self.waitForExecutorBuild('test3')
+        self.assertFalse(self.executor_server.accepting_work)
+
+        self.executor_server.hold_jobs_in_build = False
+        self.executor_server.release()
+        self.waitUntilSettled()
+        self.executor_server.manageLoad()
+        self.assertTrue(self.executor_server.accepting_work)
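
The helper methods above hand-roll the same poll-until-true loop three
times.  A shared helper along these lines could replace them
(hypothetical refactoring, not part of the patch):

import time

def wait_for(predicate, timeout=30, interval=0.1):
    """Poll predicate() until it is truthy or the timeout elapses."""
    deadline = time.time() + timeout
    result = predicate()
    while not result and time.time() < deadline:
        time.sleep(interval)
        result = predicate()
    return result

# Usage, e.g. for waitForWorkerCompletion(build):
#   wait_for(lambda: build.uuid not in self.executor_server.job_workers)
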
diff --git a/zuul/executor/server.py b/zuul/executor/server.py
index d2982d22e6..9573a9cfe1 100644
--- a/zuul/executor/server.py
+++ b/zuul/executor/server.py
@@ -574,6 +574,7 @@ class AnsibleJob(object):
         self.proc = None
         self.proc_lock = threading.Lock()
         self.running = False
+        self.started = False  # Whether playbooks have started running
         self.aborted = False
         self.aborted_reason = None
         self.thread = None
@@ -850,6 +851,7 @@
 
         pre_failed = False
         success = False
+        self.started = True
         for index, playbook in enumerate(self.jobdir.pre_playbooks):
             # TODOv3(pabelanger): Implement pre-run timeout setting.
             pre_status, pre_code = self.runAnsiblePlaybook(
@@ -1601,6 +1603,7 @@
                                socket.gethostname())
         self.log_streaming_port = log_streaming_port
         self.merger_lock = threading.Lock()
+        self.governor_lock = threading.Lock()
         self.run_lock = threading.Lock()
         self.verbose = False
         self.command_map = dict(
@@ -1632,6 +1635,7 @@
         load_multiplier = float(get_default(self.config, 'executor',
                                             'load_multiplier', '2.5'))
         self.max_load_avg = multiprocessing.cpu_count() * load_multiplier
+        self.max_starting_builds = self.max_load_avg * 2
         self.min_avail_mem = float(get_default(self.config, 'executor',
                                                'min_avail_mem', '5.0'))
         self.accepting_work = False
@@ -1751,6 +1755,10 @@
         if self._running:
             self.accepting_work = True
             self.executor_worker.registerFunction("executor:execute")
+            # TODO(jeblair): Update geard to send a noop after
+            # registering for a job which is in the queue, then remove
+            # this API violation.
+            self.executor_worker._sendGrabJobUniq()
 
     def unregister_work(self):
         self.accepting_work = False
@@ -1943,9 +1951,10 @@
             self.statsd.incr(base_key + '.builds')
         self.job_workers[job.unique] = self._job_class(self, job)
         self.job_workers[job.unique].run()
+        self.manageLoad()
 
     def run_governor(self):
-        while not self.governor_stop_event.wait(30):
+        while not self.governor_stop_event.wait(10):
             try:
                 self.manageLoad()
             except Exception:
@@ -1953,12 +1962,23 @@
 
     def manageLoad(self):
         ''' Apply some heuristics to decide whether or not we should
-        be askign for more jobs '''
+        be asking for more jobs '''
+        with self.governor_lock:
+            return self._manageLoad()
+
+    def _manageLoad(self):
         load_avg = os.getloadavg()[0]
         avail_mem_pct = 100.0 - psutil.virtual_memory().percent
+        starting_builds = 0
+        for worker in self.job_workers.values():
+            if not worker.started:
+                starting_builds += 1
+        max_starting_builds = max(
+            self.max_starting_builds - len(self.job_workers),
+            1)
         if self.accepting_work:
             # Don't unregister if we don't have any active jobs.
-            if load_avg > self.max_load_avg and self.job_workers:
+            if load_avg > self.max_load_avg:
                 self.log.info(
                     "Unregistering due to high system load {} > {}".format(
                         load_avg, self.max_load_avg))
@@ -1968,14 +1988,20 @@
                     "Unregistering due to low memory {:3.1f}% < {}".format(
                         avail_mem_pct, self.min_avail_mem))
                 self.unregister_work()
+            elif starting_builds >= max_starting_builds:
+                self.log.info(
+                    "Unregistering due to too many starting builds {} >= {}"
+                    .format(starting_builds, max_starting_builds))
+                self.unregister_work()
         elif (load_avg <= self.max_load_avg and
-              avail_mem_pct >= self.min_avail_mem):
+              avail_mem_pct >= self.min_avail_mem and
+              starting_builds < max_starting_builds):
             self.log.info(
                 "Re-registering as job is within limits "
-                "{} <= {} {:3.1f}% <= {}".format(load_avg,
-                                                 self.max_load_avg,
-                                                 avail_mem_pct,
-                                                 self.min_avail_mem))
+                "{} <= {} {:3.1f}% <= {} {} < {}".format(
+                    load_avg, self.max_load_avg,
+                    avail_mem_pct, self.min_avail_mem,
+                    starting_builds, max_starting_builds))
             self.register_work()
         if self.statsd:
             base_key = 'zuul.executor.%s' % self.hostname
@@ -1985,6 +2011,8 @@
                               int(avail_mem_pct * 100))
             self.statsd.gauge(base_key + '.running_builds',
                               len(self.job_workers))
+            self.statsd.gauge(base_key + '.starting_builds',
+                              starting_builds)
 
     def finishJob(self, unique):
         del(self.job_workers[unique])
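
Distilling the re-registration condition added to _manageLoad() into a
pure function makes the three governor inputs explicit (a review
sketch, not code from the patch; the unregister branches apply the
same limits one at a time with the opposite comparisons):

def within_limits(load_avg, max_load_avg,
                  avail_mem_pct, min_avail_mem,
                  starting_builds, max_starting_builds):
    # All three inputs must be healthy before the executor re-registers
    # the executor:execute function and resumes accepting work.
    return (load_avg <= max_load_avg and
            avail_mem_pct >= min_avail_mem and
            starting_builds < max_starting_builds)

# With max_starting_builds forced to 1, as test_slow_start does:
assert within_limits(0.0, 20.0, 80.0, 5.0, 0, 1)
assert not within_limits(0.0, 20.0, 80.0, 5.0, 1, 1)  # a build is still starting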