diff --git a/doc/source/admin/monitoring.rst b/doc/source/admin/monitoring.rst
index 0fdb3b2233..d43fd035c8 100644
--- a/doc/source/admin/monitoring.rst
+++ b/doc/source/admin/monitoring.rst
@@ -136,10 +136,17 @@ These metrics are emitted by the Zuul :ref:`scheduler`:
 
       Incremented each time the executor starts a build.
 
+   .. stat:: starting_builds
+      :type: gauge
+
+      The number of builds starting on this executor. These are
+      builds which have not yet begun their first pre-playbook.
+
    .. stat:: running_builds
       :type: gauge
 
-      The number of builds currently running on this executor.
+      The number of builds currently running on this executor. This
+      includes starting builds.
 
    .. stat:: load_average
       :type: gauge
diff --git a/tests/fixtures/config/governor/git/common-config/playbooks/base.yaml b/tests/fixtures/config/governor/git/common-config/playbooks/base.yaml
new file mode 100644
index 0000000000..f679dceaef
--- /dev/null
+++ b/tests/fixtures/config/governor/git/common-config/playbooks/base.yaml
@@ -0,0 +1,2 @@
+- hosts: all
+  tasks: []
diff --git a/tests/fixtures/config/governor/git/common-config/zuul.yaml b/tests/fixtures/config/governor/git/common-config/zuul.yaml
new file mode 100644
index 0000000000..093da16b13
--- /dev/null
+++ b/tests/fixtures/config/governor/git/common-config/zuul.yaml
@@ -0,0 +1,34 @@
+- pipeline:
+    name: check
+    manager: independent
+    trigger:
+      gerrit:
+        - event: patchset-created
+    success:
+      gerrit:
+        Verified: 1
+    failure:
+      gerrit:
+        Verified: -1
+
+- job:
+    name: base
+    parent: null
+    run: playbooks/base.yaml
+
+- job:
+    name: test1
+
+- job:
+    name: test2
+
+- job:
+    name: test3
+
+- project:
+    name: common-config
+    check:
+      jobs:
+        - test1
+        - test2
+        - test3
diff --git a/tests/fixtures/config/governor/main.yaml b/tests/fixtures/config/governor/main.yaml
new file mode 100644
index 0000000000..9d01f542f9
--- /dev/null
+++ b/tests/fixtures/config/governor/main.yaml
@@ -0,0 +1,6 @@
+- tenant:
+    name: tenant-one
+    source:
+      gerrit:
+        config-projects:
+          - common-config
diff --git a/tests/unit/test_executor.py b/tests/unit/test_executor.py
index 474859d781..8cb98ee8bb 100755
--- a/tests/unit/test_executor.py
+++ b/tests/unit/test_executor.py
@@ -15,6 +15,11 @@
 # License for the specific language governing permissions and limitations
 # under the License.
 
+try:
+    from unittest import mock
+except ImportError:
+    import mock
+
 import logging
 import time
 
@@ -436,3 +441,77 @@ class TestExecutorHostname(ZuulTestCase):
     def test_executor_hostname(self):
         self.assertEqual('test-executor-hostname.example.com',
                          self.executor_server.hostname)
+
+
+class TestGovernor(ZuulTestCase):
+    tenant_config_file = 'config/governor/main.yaml'
+
+    @mock.patch('os.getloadavg')
+    @mock.patch('psutil.virtual_memory')
+    def test_load_governor(self, vm_mock, loadavg_mock):
+        class Dummy(object):
+            pass
+        ram = Dummy()
+        ram.percent = 20.0  # 20% used
+        vm_mock.return_value = ram
+        loadavg_mock.return_value = (0.0, 0.0, 0.0)
+        self.executor_server.manageLoad()
+        self.assertTrue(self.executor_server.accepting_work)
+        ram.percent = 99.0  # 99% used
+        loadavg_mock.return_value = (100.0, 100.0, 100.0)
+        self.executor_server.manageLoad()
+        self.assertFalse(self.executor_server.accepting_work)
+
+    def waitForExecutorBuild(self, jobname):
+        timeout = time.time() + 30
+        build = None
+        while (time.time() < timeout and not build):
+            for b in self.builds:
+                if b.name == jobname:
+                    build = b
+                    break
+            time.sleep(0.1)
+        build_id = build.uuid
+        while (time.time() < timeout and
+               build_id not in self.executor_server.job_workers):
+            time.sleep(0.1)
+        worker = self.executor_server.job_workers[build_id]
+        while (time.time() < timeout and
+               not worker.started):
+            time.sleep(0.1)
+        return build
+
+    def waitForWorkerCompletion(self, build):
+        timeout = time.time() + 30
+        while (time.time() < timeout and
+               build.uuid in self.executor_server.job_workers):
+            time.sleep(0.1)
+
+    def test_slow_start(self):
+        self.executor_server.hold_jobs_in_build = True
+        self.executor_server.max_starting_builds = 1
+        self.executor_server.manageLoad()
+        self.assertTrue(self.executor_server.accepting_work)
+        A = self.fake_gerrit.addFakeChange('common-config', 'master', 'A')
+        self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
+
+        build1 = self.waitForExecutorBuild('test1')
+        # With one job (test1) being started, we should no longer
+        # be accepting new work
+        self.assertFalse(self.executor_server.accepting_work)
+        self.assertEqual(len(self.executor_server.job_workers), 1)
+        # Allow enough starting builds for the test to complete.
+        self.executor_server.max_starting_builds = 3
+        build1.release()
+        self.waitForWorkerCompletion(build1)
+        self.executor_server.manageLoad()
+
+        self.waitForExecutorBuild('test2')
+        self.waitForExecutorBuild('test3')
+        self.assertFalse(self.executor_server.accepting_work)
+
+        self.executor_server.hold_jobs_in_build = False
+        self.executor_server.release()
+        self.waitUntilSettled()
+        self.executor_server.manageLoad()
+        self.assertTrue(self.executor_server.accepting_work)
diff --git a/zuul/executor/server.py b/zuul/executor/server.py
index d2982d22e6..9573a9cfe1 100644
--- a/zuul/executor/server.py
+++ b/zuul/executor/server.py
@@ -574,6 +574,7 @@ class AnsibleJob(object):
         self.proc = None
         self.proc_lock = threading.Lock()
         self.running = False
+        self.started = False  # Whether playbooks have started running
         self.aborted = False
         self.aborted_reason = None
         self.thread = None
@@ -850,6 +851,7 @@ class AnsibleJob(object):
 
         pre_failed = False
         success = False
+        self.started = True
        for index, playbook in enumerate(self.jobdir.pre_playbooks):
             # TODOv3(pabelanger): Implement pre-run timeout setting.
             pre_status, pre_code = self.runAnsiblePlaybook(
@@ -1601,6 +1603,7 @@
                                         socket.gethostname())
         self.log_streaming_port = log_streaming_port
         self.merger_lock = threading.Lock()
+        self.governor_lock = threading.Lock()
         self.run_lock = threading.Lock()
         self.verbose = False
         self.command_map = dict(
@@ -1632,6 +1635,7 @@
         load_multiplier = float(get_default(self.config, 'executor',
                                             'load_multiplier', '2.5'))
         self.max_load_avg = multiprocessing.cpu_count() * load_multiplier
+        self.max_starting_builds = self.max_load_avg * 2
         self.min_avail_mem = float(get_default(self.config, 'executor',
                                                'min_avail_mem', '5.0'))
         self.accepting_work = False
@@ -1751,6 +1755,10 @@
         if self._running:
             self.accepting_work = True
             self.executor_worker.registerFunction("executor:execute")
+            # TODO(jeblair): Update geard to send a noop after
+            # registering for a job which is in the queue, then remove
+            # this API violation.
+            self.executor_worker._sendGrabJobUniq()
 
     def unregister_work(self):
         self.accepting_work = False
@@ -1943,9 +1951,10 @@
             self.statsd.incr(base_key + '.builds')
         self.job_workers[job.unique] = self._job_class(self, job)
         self.job_workers[job.unique].run()
+        self.manageLoad()
 
     def run_governor(self):
-        while not self.governor_stop_event.wait(30):
+        while not self.governor_stop_event.wait(10):
             try:
                 self.manageLoad()
             except Exception:
@@ -1953,12 +1962,23 @@
 
     def manageLoad(self):
         ''' Apply some heuristics to decide whether or not we should
-        be askign for more jobs '''
+        be asking for more jobs '''
+        with self.governor_lock:
+            return self._manageLoad()
+
+    def _manageLoad(self):
         load_avg = os.getloadavg()[0]
         avail_mem_pct = 100.0 - psutil.virtual_memory().percent
+        starting_builds = 0
+        for worker in self.job_workers.values():
+            if not worker.started:
+                starting_builds += 1
+        max_starting_builds = max(
+            self.max_starting_builds - len(self.job_workers),
+            1)
         if self.accepting_work:
             # Don't unregister if we don't have any active jobs.
-            if load_avg > self.max_load_avg and self.job_workers:
+            if load_avg > self.max_load_avg:
                 self.log.info(
                     "Unregistering due to high system load {} > {}".format(
                         load_avg, self.max_load_avg))
@@ -1968,14 +1988,20 @@
                     "Unregistering due to low memory {:3.1f}% < {}".format(
                         avail_mem_pct, self.min_avail_mem))
                 self.unregister_work()
+            elif starting_builds >= max_starting_builds:
+                self.log.info(
+                    "Unregistering due to too many starting builds {} >= {}"
+                    .format(starting_builds, max_starting_builds))
+                self.unregister_work()
         elif (load_avg <= self.max_load_avg and
-              avail_mem_pct >= self.min_avail_mem):
+              avail_mem_pct >= self.min_avail_mem and
+              starting_builds < max_starting_builds):
             self.log.info(
                 "Re-registering as job is within limits "
-                "{} <= {} {:3.1f}% <= {}".format(load_avg,
-                                                 self.max_load_avg,
-                                                 avail_mem_pct,
-                                                 self.min_avail_mem))
+                "{} <= {} {:3.1f}% <= {} {} < {}".format(
+                    load_avg, self.max_load_avg,
+                    avail_mem_pct, self.min_avail_mem,
+                    starting_builds, max_starting_builds))
             self.register_work()
         if self.statsd:
             base_key = 'zuul.executor.%s' % self.hostname
@@ -1985,6 +2011,8 @@
                               int(avail_mem_pct * 100))
             self.statsd.gauge(base_key + '.running_builds',
                               len(self.job_workers))
+            self.statsd.gauge(base_key + '.starting_builds',
+                              starting_builds)
 
     def finishJob(self, unique):
         del(self.job_workers[unique])
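The slow-start heuristic added to _manageLoad() above can be read in isolation as a small standalone sketch; the function name and example values below are illustrative only and are not part of the patch or of the Zuul API.

# Simplified sketch of the executor governor's decision, mirroring
# _manageLoad(): the cap on starting builds shrinks as more builds run,
# but never drops below one.
def should_accept_work(load_avg, max_load_avg,
                       avail_mem_pct, min_avail_mem,
                       running_builds, starting_builds, max_starting_builds):
    effective_cap = max(max_starting_builds - running_builds, 1)
    return (load_avg <= max_load_avg and
            avail_mem_pct >= min_avail_mem and
            starting_builds < effective_cap)

# With max_starting_builds = 1 (as in test_slow_start), a single build that
# has not yet run its first pre-playbook is enough to stop accepting work.
print(should_accept_work(0.5, 10.0, 80.0, 5.0,
                         running_builds=1, starting_builds=1,
                         max_starting_builds=1))  # False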