Browse Source

Merge "Executor: Don't start too many jobs at once"

changes/23/540623/1
Zuul 3 years ago
committed by Gerrit Code Review
parent
commit
0a64a92e9e
6 changed files with 165 additions and 9 deletions
  1. +8
    -1
      doc/source/admin/monitoring.rst
  2. +2
    -0
      tests/fixtures/config/governor/git/common-config/playbooks/base.yaml
  3. +34
    -0
      tests/fixtures/config/governor/git/common-config/zuul.yaml
  4. +6
    -0
      tests/fixtures/config/governor/main.yaml
  5. +79
    -0
      tests/unit/test_executor.py
  6. +36
    -8
      zuul/executor/server.py

+ 8
- 1
doc/source/admin/monitoring.rst View File

@ -136,10 +136,17 @@ These metrics are emitted by the Zuul :ref:`scheduler`:
Incremented each time the executor starts a build.
.. stat:: starting_builds
:type: gauge
The number of builds starting on this executor. These are
builds which have not yet begun their first pre-playbook.
.. stat:: running_builds
:type: gauge
The number of builds currently running on this executor.
The number of builds currently running on this executor. This
includes starting builds.
.. stat:: load_average
:type: gauge


+ 2
- 0
tests/fixtures/config/governor/git/common-config/playbooks/base.yaml View File

@ -0,0 +1,2 @@
- hosts: all
tasks: []

+ 34
- 0
tests/fixtures/config/governor/git/common-config/zuul.yaml View File

@ -0,0 +1,34 @@
- pipeline:
name: check
manager: independent
trigger:
gerrit:
- event: patchset-created
success:
gerrit:
Verified: 1
failure:
gerrit:
Verified: -1
- job:
name: base
parent: null
run: playbooks/base.yaml
- job:
name: test1
- job:
name: test2
- job:
name: test3
- project:
name: common-config
check:
jobs:
- test1
- test2
- test3

+ 6
- 0
tests/fixtures/config/governor/main.yaml View File

@ -0,0 +1,6 @@
- tenant:
name: tenant-one
source:
gerrit:
config-projects:
- common-config

+ 79
- 0
tests/unit/test_executor.py View File

@ -15,6 +15,11 @@
# License for the specific language governing permissions and limitations
# under the License.
try:
from unittest import mock
except ImportError:
import mock
import logging
import time
@ -436,3 +441,77 @@ class TestExecutorHostname(ZuulTestCase):
def test_executor_hostname(self):
self.assertEqual('test-executor-hostname.example.com',
self.executor_server.hostname)
class TestGovernor(ZuulTestCase):
tenant_config_file = 'config/governor/main.yaml'
@mock.patch('os.getloadavg')
@mock.patch('psutil.virtual_memory')
def test_load_governor(self, vm_mock, loadavg_mock):
class Dummy(object):
pass
ram = Dummy()
ram.percent = 20.0 # 20% used
vm_mock.return_value = ram
loadavg_mock.return_value = (0.0, 0.0, 0.0)
self.executor_server.manageLoad()
self.assertTrue(self.executor_server.accepting_work)
ram.percent = 99.0 # 99% used
loadavg_mock.return_value = (100.0, 100.0, 100.0)
self.executor_server.manageLoad()
self.assertFalse(self.executor_server.accepting_work)
def waitForExecutorBuild(self, jobname):
timeout = time.time() + 30
build = None
while (time.time() < timeout and not build):
for b in self.builds:
if b.name == jobname:
build = b
break
time.sleep(0.1)
build_id = build.uuid
while (time.time() < timeout and
build_id not in self.executor_server.job_workers):
time.sleep(0.1)
worker = self.executor_server.job_workers[build_id]
while (time.time() < timeout and
not worker.started):
time.sleep(0.1)
return build
def waitForWorkerCompletion(self, build):
timeout = time.time() + 30
while (time.time() < timeout and
build.uuid in self.executor_server.job_workers):
time.sleep(0.1)
def test_slow_start(self):
self.executor_server.hold_jobs_in_build = True
self.executor_server.max_starting_builds = 1
self.executor_server.manageLoad()
self.assertTrue(self.executor_server.accepting_work)
A = self.fake_gerrit.addFakeChange('common-config', 'master', 'A')
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
build1 = self.waitForExecutorBuild('test1')
# With one job (test1) being started, we should no longer
# be accepting new work
self.assertFalse(self.executor_server.accepting_work)
self.assertEqual(len(self.executor_server.job_workers), 1)
# Allow enough starting builds for the test to complete.
self.executor_server.max_starting_builds = 3
build1.release()
self.waitForWorkerCompletion(build1)
self.executor_server.manageLoad()
self.waitForExecutorBuild('test2')
self.waitForExecutorBuild('test3')
self.assertFalse(self.executor_server.accepting_work)
self.executor_server.hold_jobs_in_build = False
self.executor_server.release()
self.waitUntilSettled()
self.executor_server.manageLoad()
self.assertTrue(self.executor_server.accepting_work)

+ 36
- 8
zuul/executor/server.py View File

@ -574,6 +574,7 @@ class AnsibleJob(object):
self.proc = None
self.proc_lock = threading.Lock()
self.running = False
self.started = False # Whether playbooks have started running
self.aborted = False
self.aborted_reason = None
self.thread = None
@ -850,6 +851,7 @@ class AnsibleJob(object):
pre_failed = False
success = False
self.started = True
for index, playbook in enumerate(self.jobdir.pre_playbooks):
# TODOv3(pabelanger): Implement pre-run timeout setting.
pre_status, pre_code = self.runAnsiblePlaybook(
@ -1601,6 +1603,7 @@ class ExecutorServer(object):
socket.gethostname())
self.log_streaming_port = log_streaming_port
self.merger_lock = threading.Lock()
self.governor_lock = threading.Lock()
self.run_lock = threading.Lock()
self.verbose = False
self.command_map = dict(
@ -1632,6 +1635,7 @@ class ExecutorServer(object):
load_multiplier = float(get_default(self.config, 'executor',
'load_multiplier', '2.5'))
self.max_load_avg = multiprocessing.cpu_count() * load_multiplier
self.max_starting_builds = self.max_load_avg * 2
self.min_avail_mem = float(get_default(self.config, 'executor',
'min_avail_mem', '5.0'))
self.accepting_work = False
@ -1751,6 +1755,10 @@ class ExecutorServer(object):
if self._running:
self.accepting_work = True
self.executor_worker.registerFunction("executor:execute")
# TODO(jeblair): Update geard to send a noop after
# registering for a job which is in the queue, then remove
# this API violation.
self.executor_worker._sendGrabJobUniq()
def unregister_work(self):
self.accepting_work = False
@ -1943,9 +1951,10 @@ class ExecutorServer(object):
self.statsd.incr(base_key + '.builds')
self.job_workers[job.unique] = self._job_class(self, job)
self.job_workers[job.unique].run()
self.manageLoad()
def run_governor(self):
while not self.governor_stop_event.wait(30):
while not self.governor_stop_event.wait(10):
try:
self.manageLoad()
except Exception:
@ -1953,12 +1962,23 @@ class ExecutorServer(object):
def manageLoad(self):
''' Apply some heuristics to decide whether or not we should
be askign for more jobs '''
be asking for more jobs '''
with self.governor_lock:
return self._manageLoad()
def _manageLoad(self):
load_avg = os.getloadavg()[0]
avail_mem_pct = 100.0 - psutil.virtual_memory().percent
starting_builds = 0
for worker in self.job_workers.values():
if not worker.started:
starting_builds += 1
max_starting_builds = max(
self.max_starting_builds - len(self.job_workers),
1)
if self.accepting_work:
# Don't unregister if we don't have any active jobs.
if load_avg > self.max_load_avg and self.job_workers:
if load_avg > self.max_load_avg:
self.log.info(
"Unregistering due to high system load {} > {}".format(
load_avg, self.max_load_avg))
@ -1968,14 +1988,20 @@ class ExecutorServer(object):
"Unregistering due to low memory {:3.1f}% < {}".format(
avail_mem_pct, self.min_avail_mem))
self.unregister_work()
elif starting_builds >= max_starting_builds:
self.log.info(
"Unregistering due to too many starting builds {} >= {}"
.format(starting_builds, max_starting_builds))
self.unregister_work()
elif (load_avg <= self.max_load_avg and
avail_mem_pct >= self.min_avail_mem):
avail_mem_pct >= self.min_avail_mem and
starting_builds < max_starting_builds):
self.log.info(
"Re-registering as job is within limits "
"{} <= {} {:3.1f}% <= {}".format(load_avg,
self.max_load_avg,
avail_mem_pct,
self.min_avail_mem))
"{} <= {} {:3.1f}% <= {} {} < {}".format(
load_avg, self.max_load_avg,
avail_mem_pct, self.min_avail_mem,
starting_builds, max_starting_builds))
self.register_work()
if self.statsd:
base_key = 'zuul.executor.%s' % self.hostname
@ -1985,6 +2011,8 @@ class ExecutorServer(object):
int(avail_mem_pct * 100))
self.statsd.gauge(base_key + '.running_builds',
len(self.job_workers))
self.statsd.gauge(base_key + '.starting_builds',
starting_builds)
def finishJob(self, unique):
del(self.job_workers[unique])


Loading…
Cancel
Save