Executor: Don't start too many jobs at once

The metrics that we use to govern load on the executors are all
trailing indicators.  The executors are capable of accepting a
large number of jobs in a batch and then, only after they begin
to run, will the load indicators increase.  To avoid the thundering
herd problem, reduce the rate at which we accept jobs past a certain
point.

That point is twice the target load average, expressed as a number of
jobs.  In practice that seems to be a conservative but reasonable
number of jobs for the executor to run, so to facilitate a quick
start, allow the executor to start up to that many jobs all at once.
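
As a worked example (the 8-core figure below is only an illustration;
2.5 is the executor's default load_multiplier, as seen in the diff
below):

    # Illustrative numbers only; cpu_count varies per machine.
    cpu_count = 8
    load_multiplier = 2.5                       # default 'load_multiplier'
    max_load_avg = cpu_count * load_multiplier  # 20.0 -- target load average
    max_starting_builds = max_load_avg * 2      # 40.0 -- jobs allowed to start at once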

Once the number of running jobs exceeds that number, subsequent jobs
are accepted only one at a time, after the previous one completes its
startup phase (cloning repos, establishing Ansible connections),
which is to say, at the point where it begins running its first
pre-playbook.

We also wait until the governor's next regular interval before
accepting the next job.  That interval is currently 30 seconds, but
to make the system a bit more responsive, this change lowers it to
10 seconds.

To summarize: once a bunch[1] of jobs are running, then after each
new job we wait until that job has started running playbooks, plus up
to an additional 10 seconds, before accepting another.

This is implemented by adding a 'starting jobs' metric to the governor
so that we register or de-register the execute function based on
whether too many jobs are in the startup phase.  We add a forced
call to the governor routine after each job starts so that we can
unregister if necessary before picking up the next job, and wrap that
routine in a lock since it is now called from multiple threads and
its logic may not be entirely thread-safe.
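
As a rough, self-contained sketch of that heuristic (illustrative
names and signature; the actual logic is in ExecutorServer._manageLoad
in the diff below):

    import threading

    governor_lock = threading.Lock()

    def should_accept_work(load_avg, avail_mem_pct, job_workers,
                           max_load_avg, min_avail_mem, max_starting_builds):
        """Return True if the executor should keep accepting jobs."""
        # Jobs still in their startup phase (no playbook running yet).
        starting = sum(1 for w in job_workers if not w.started)
        # The more jobs are already running, the fewer may be starting,
        # but always allow at least one.
        allowed_starting = max(max_starting_builds - len(job_workers), 1)
        return (load_avg <= max_load_avg and
                avail_mem_pct >= min_avail_mem and
                starting < allowed_starting)

    def manage_load(*args):
        # Called from the periodic governor thread and immediately after
        # each job starts, hence the lock.
        with governor_lock:
            return should_accept_work(*args)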

Also, add tests for all three inputs to manageLoad.

[1] 2*target load average

Change-Id: I066bc539e70eb475ca2b871fb90644264d8d5bf4
Author:  James E. Blair, 2018-02-01 13:59:48 -08:00
Commit:  df37ad2ce7
Parent:  42382a7026
6 changed files with 165 additions and 9 deletions

@@ -136,10 +136,17 @@ These metrics are emitted by the Zuul :ref:`scheduler`:

       Incremented each time the executor starts a build.

+   .. stat:: starting_builds
+      :type: gauge
+
+      The number of builds starting on this executor.  These are
+      builds which have not yet begun their first pre-playbook.
+
    .. stat:: running_builds
       :type: gauge

-      The number of builds currently running on this executor.
+      The number of builds currently running on this executor.  This
+      includes starting builds.

    .. stat:: load_average
       :type: gauge
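
For reference, a minimal sketch of how the executor derives and
reports these two gauges (it mirrors the statsd calls in the server
diff at the end of this change; the statsd client and base_key are
assumed to already be set up):

    # Sketch: both gauges come from the executor's job_workers table.
    def emit_build_gauges(statsd, base_key, job_workers):
        running_builds = len(job_workers)
        starting_builds = sum(
            1 for w in job_workers.values() if not w.started)
        statsd.gauge(base_key + '.running_builds', running_builds)
        statsd.gauge(base_key + '.starting_builds', starting_builds)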

@@ -0,0 +1,2 @@
+- hosts: all
+  tasks: []

@@ -0,0 +1,34 @@
+- pipeline:
+    name: check
+    manager: independent
+    trigger:
+      gerrit:
+        - event: patchset-created
+    success:
+      gerrit:
+        Verified: 1
+    failure:
+      gerrit:
+        Verified: -1
+
+- job:
+    name: base
+    parent: null
+    run: playbooks/base.yaml
+
+- job:
+    name: test1
+
+- job:
+    name: test2
+
+- job:
+    name: test3
+
+- project:
+    name: common-config
+    check:
+      jobs:
+        - test1
+        - test2
+        - test3

@@ -0,0 +1,6 @@
+- tenant:
+    name: tenant-one
+    source:
+      gerrit:
+        config-projects:
+          - common-config

@@ -15,6 +15,11 @@
 # License for the specific language governing permissions and limitations
 # under the License.

+try:
+    from unittest import mock
+except ImportError:
+    import mock
+
 import logging
 import time

@@ -436,3 +441,77 @@ class TestExecutorHostname(ZuulTestCase):
     def test_executor_hostname(self):
         self.assertEqual('test-executor-hostname.example.com',
                          self.executor_server.hostname)
+
+
+class TestGovernor(ZuulTestCase):
+    tenant_config_file = 'config/governor/main.yaml'
+
+    @mock.patch('os.getloadavg')
+    @mock.patch('psutil.virtual_memory')
+    def test_load_governor(self, vm_mock, loadavg_mock):
+        class Dummy(object):
+            pass
+        ram = Dummy()
+        ram.percent = 20.0  # 20% used
+        vm_mock.return_value = ram
+        loadavg_mock.return_value = (0.0, 0.0, 0.0)
+        self.executor_server.manageLoad()
+        self.assertTrue(self.executor_server.accepting_work)
+        ram.percent = 99.0  # 99% used
+        loadavg_mock.return_value = (100.0, 100.0, 100.0)
+        self.executor_server.manageLoad()
+        self.assertFalse(self.executor_server.accepting_work)
+
+    def waitForExecutorBuild(self, jobname):
+        timeout = time.time() + 30
+        build = None
+        while (time.time() < timeout and not build):
+            for b in self.builds:
+                if b.name == jobname:
+                    build = b
+                    break
+            time.sleep(0.1)
+        build_id = build.uuid
+        while (time.time() < timeout and
+               build_id not in self.executor_server.job_workers):
+            time.sleep(0.1)
+        worker = self.executor_server.job_workers[build_id]
+        while (time.time() < timeout and
+               not worker.started):
+            time.sleep(0.1)
+        return build
+
+    def waitForWorkerCompletion(self, build):
+        timeout = time.time() + 30
+        while (time.time() < timeout and
+               build.uuid in self.executor_server.job_workers):
+            time.sleep(0.1)
+
+    def test_slow_start(self):
+        self.executor_server.hold_jobs_in_build = True
+        self.executor_server.max_starting_builds = 1
+        self.executor_server.manageLoad()
+        self.assertTrue(self.executor_server.accepting_work)
+        A = self.fake_gerrit.addFakeChange('common-config', 'master', 'A')
+        self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
+        build1 = self.waitForExecutorBuild('test1')
+        # With one job (test1) being started, we should no longer
+        # be accepting new work
+        self.assertFalse(self.executor_server.accepting_work)
+        self.assertEqual(len(self.executor_server.job_workers), 1)
+
+        # Allow enough starting builds for the test to complete.
+        self.executor_server.max_starting_builds = 3
+        build1.release()
+        self.waitForWorkerCompletion(build1)
+        self.executor_server.manageLoad()
+
+        self.waitForExecutorBuild('test2')
+        self.waitForExecutorBuild('test3')
+        self.assertFalse(self.executor_server.accepting_work)
+
+        self.executor_server.hold_jobs_in_build = False
+        self.executor_server.release()
+        self.waitUntilSettled()
+        self.executor_server.manageLoad()
+        self.assertTrue(self.executor_server.accepting_work)

@@ -574,6 +574,7 @@ class AnsibleJob(object):
         self.proc = None
         self.proc_lock = threading.Lock()
         self.running = False
+        self.started = False  # Whether playbooks have started running
         self.aborted = False
         self.aborted_reason = None
         self.thread = None
@@ -850,6 +851,7 @@
         pre_failed = False
         success = False
+        self.started = True
         for index, playbook in enumerate(self.jobdir.pre_playbooks):
             # TODOv3(pabelanger): Implement pre-run timeout setting.
             pre_status, pre_code = self.runAnsiblePlaybook(
@@ -1601,6 +1603,7 @@ class ExecutorServer(object):
                              socket.gethostname())
         self.log_streaming_port = log_streaming_port
         self.merger_lock = threading.Lock()
+        self.governor_lock = threading.Lock()
         self.run_lock = threading.Lock()
         self.verbose = False
         self.command_map = dict(
@@ -1632,6 +1635,7 @@
         load_multiplier = float(get_default(self.config, 'executor',
                                             'load_multiplier', '2.5'))
         self.max_load_avg = multiprocessing.cpu_count() * load_multiplier
+        self.max_starting_builds = self.max_load_avg * 2
         self.min_avail_mem = float(get_default(self.config, 'executor',
                                                'min_avail_mem', '5.0'))
         self.accepting_work = False
@@ -1751,6 +1755,10 @@
         if self._running:
             self.accepting_work = True
             self.executor_worker.registerFunction("executor:execute")
+            # TODO(jeblair): Update geard to send a noop after
+            # registering for a job which is in the queue, then remove
+            # this API violation.
+            self.executor_worker._sendGrabJobUniq()

     def unregister_work(self):
         self.accepting_work = False
@@ -1943,9 +1951,10 @@
             self.statsd.incr(base_key + '.builds')
         self.job_workers[job.unique] = self._job_class(self, job)
         self.job_workers[job.unique].run()
+        self.manageLoad()

     def run_governor(self):
-        while not self.governor_stop_event.wait(30):
+        while not self.governor_stop_event.wait(10):
             try:
                 self.manageLoad()
             except Exception:
@@ -1953,12 +1962,23 @@

     def manageLoad(self):
         ''' Apply some heuristics to decide whether or not we should
-        be askign for more jobs '''
+        be asking for more jobs '''
+        with self.governor_lock:
+            return self._manageLoad()
+
+    def _manageLoad(self):
         load_avg = os.getloadavg()[0]
         avail_mem_pct = 100.0 - psutil.virtual_memory().percent
+        starting_builds = 0
+        for worker in self.job_workers.values():
+            if not worker.started:
+                starting_builds += 1
+        max_starting_builds = max(
+            self.max_starting_builds - len(self.job_workers),
+            1)
         if self.accepting_work:
             # Don't unregister if we don't have any active jobs.
-            if load_avg > self.max_load_avg and self.job_workers:
+            if load_avg > self.max_load_avg:
                 self.log.info(
                     "Unregistering due to high system load {} > {}".format(
                         load_avg, self.max_load_avg))
@@ -1968,14 +1988,20 @@
                     "Unregistering due to low memory {:3.1f}% < {}".format(
                         avail_mem_pct, self.min_avail_mem))
                 self.unregister_work()
+            elif starting_builds >= max_starting_builds:
+                self.log.info(
+                    "Unregistering due to too many starting builds {} >= {}"
+                    .format(starting_builds, max_starting_builds))
+                self.unregister_work()
         elif (load_avg <= self.max_load_avg and
-              avail_mem_pct >= self.min_avail_mem):
+              avail_mem_pct >= self.min_avail_mem and
+              starting_builds < max_starting_builds):
             self.log.info(
                 "Re-registering as job is within limits "
-                "{} <= {} {:3.1f}% <= {}".format(load_avg,
-                                                 self.max_load_avg,
-                                                 avail_mem_pct,
-                                                 self.min_avail_mem))
+                "{} <= {} {:3.1f}% <= {} {} < {}".format(
+                    load_avg, self.max_load_avg,
+                    avail_mem_pct, self.min_avail_mem,
+                    starting_builds, max_starting_builds))
             self.register_work()
         if self.statsd:
             base_key = 'zuul.executor.%s' % self.hostname
@@ -1985,6 +2011,8 @@
                               int(avail_mem_pct * 100))
             self.statsd.gauge(base_key + '.running_builds',
                               len(self.job_workers))
+            self.statsd.gauge(base_key + '.starting_builds',
+                              starting_builds)

     def finishJob(self, unique):
         del(self.job_workers[unique])