Refactor load sensors into drivers

The executor governor can be made much simpler and generic when
abstracting the actual data gathering into sensors.

Change-Id: Ieb41eef1991c04c58bfb41c39ea0d1010ef3e330
This commit is contained in:
Tobias Henkel 2018-03-02 16:01:39 +00:00 committed by Tobias Henkel
parent a0888ad0ad
commit 3a1e0e6195
No known key found for this signature in database
GPG Key ID: 03750DEC158E5FA2
6 changed files with 228 additions and 54 deletions

View File

@ -23,6 +23,7 @@ import zuul.executor.server
import zuul.model
from tests.base import ZuulTestCase, simple_layout, iterate_timeout
from zuul.executor.sensors.startingbuilds import StartingBuildsSensor
class TestExecutorRepos(ZuulTestCase):
@ -487,6 +488,13 @@ class TestGovernor(ZuulTestCase):
return build
def test_slow_start(self):
def _set_starting_builds(min, max):
for sensor in self.executor_server.sensors:
if isinstance(sensor, StartingBuildsSensor):
sensor.min_starting_builds = min
sensor.max_starting_builds = max
# Note: This test relies on the fact that manageLoad is only
# run at specific points. Several times in the test we check
# that manageLoad has disabled or enabled job acceptance based
@ -501,8 +509,7 @@ class TestGovernor(ZuulTestCase):
# seconds).
self.executor_server.governor_stop_event.set()
self.executor_server.hold_jobs_in_build = True
self.executor_server.max_starting_builds = 1
self.executor_server.min_starting_builds = 1
_set_starting_builds(1, 1)
self.executor_server.manageLoad()
self.assertTrue(self.executor_server.accepting_work)
A = self.fake_gerrit.addFakeChange('common-config', 'master', 'A')
@ -514,7 +521,7 @@ class TestGovernor(ZuulTestCase):
self.assertFalse(self.executor_server.accepting_work)
self.assertEqual(len(self.executor_server.job_workers), 1)
# Allow enough starting builds for the test to complete.
self.executor_server.max_starting_builds = 3
_set_starting_builds(1, 3)
# We must wait for build1 to enter a waiting state otherwise
# the subsequent release() is a noop and the build is never
# released. We don't use waitUntilSettled as that requires

View File

@ -0,0 +1,43 @@
# Copyright 2018 BMW Car IT GmbH
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
class SensorInterface(object, metaclass=abc.ABCMeta):
"""The sensor interface used by the load governor
A sensor which provides load indicators for managing the load
on an executor.
"""
@abc.abstractmethod
def isOk(self):
"""Report if current load is ok for accepting new jobs.
:returns: Bool, str: True if we can accept new jobs, otherwise False
and a string for the log
:rtype: Bool, str
"""
pass
@abc.abstractmethod
def reportStats(self, statsd, base_key):
"""Report statistics to statsd
:param statsd: the statsd object to use for reporting
:param base_key: the base key to use for reporting
"""
pass

View File

@ -0,0 +1,42 @@
# Copyright 2018 BMW Car IT GmbH
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import multiprocessing
import os
from zuul.executor.sensors import SensorInterface
from zuul.lib.config import get_default
class CPUSensor(SensorInterface):
log = logging.getLogger("zuul.executor.sensor.cpu")
def __init__(self, config=None):
load_multiplier = float(get_default(config, 'executor',
'load_multiplier', '2.5'))
self.max_load_avg = multiprocessing.cpu_count() * load_multiplier
def isOk(self):
load_avg = os.getloadavg()[0]
if load_avg > self.max_load_avg:
return False, "high system load {} > {}".format(
load_avg, self.max_load_avg)
return True, "{} <= {}".format(load_avg, self.max_load_avg)
def reportStats(self, statsd, base_key):
load_avg = os.getloadavg()[0]
statsd.gauge(base_key + '.load_average', int(load_avg * 100))

View File

@ -0,0 +1,47 @@
# Copyright 2018 BMW Car IT GmbH
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import psutil
from zuul.executor.sensors import SensorInterface
from zuul.lib.config import get_default
def get_avail_mem_pct():
avail_mem_pct = 100.0 - psutil.virtual_memory().percent
return avail_mem_pct
class RAMSensor(SensorInterface):
log = logging.getLogger("zuul.executor.sensor.ram")
def __init__(self, config=None):
self.min_avail_mem = float(get_default(config, 'executor',
'min_avail_mem', '5.0'))
def isOk(self):
avail_mem_pct = get_avail_mem_pct()
if avail_mem_pct < self.min_avail_mem:
return False, "low memory {:3.1f}% < {}".format(
avail_mem_pct, self.min_avail_mem)
return True, "{:3.1f}% <= {}".format(avail_mem_pct, self.min_avail_mem)
def reportStats(self, statsd, base_key):
avail_mem_pct = get_avail_mem_pct()
statsd.gauge(base_key + '.pct_used_ram',
int((100.0 - avail_mem_pct) * 100))

View File

@ -0,0 +1,53 @@
# Copyright 2018 BMW Car IT GmbH
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import multiprocessing
from zuul.executor.sensors import SensorInterface
class StartingBuildsSensor(SensorInterface):
log = logging.getLogger("zuul.executor.sensor.startingbuilds")
def __init__(self, executor, max_load_avg):
self.executor = executor
self.max_starting_builds = max_load_avg * 2
self.min_starting_builds = max(int(multiprocessing.cpu_count() / 2), 1)
def _getStartingBuilds(self):
starting_builds = 0
for worker in self.executor.job_workers.values():
if not worker.started:
starting_builds += 1
return starting_builds
def _getRunningBuilds(self):
return len(self.executor.job_workers)
def isOk(self):
starting_builds = self._getStartingBuilds()
max_starting_builds = max(
self.max_starting_builds - self._getRunningBuilds(),
self.min_starting_builds)
if starting_builds >= max_starting_builds:
return False, "too many starting builds {} >= {}".format(
starting_builds, max_starting_builds)
return True, "{} <= {}".format(starting_builds, max_starting_builds)
def reportStats(self, statsd, base_key):
statsd.gauge(base_key + '.running_builds', self._getRunningBuilds())
statsd.gauge(base_key + '.starting_builds', self._getStartingBuilds())

View File

@ -16,9 +16,7 @@ import collections
import datetime
import json
import logging
import multiprocessing
import os
import psutil
import shutil
import signal
import shlex
@ -40,6 +38,9 @@ import gear
import zuul.merger.merger
import zuul.ansible.logconfig
from zuul.executor.sensors.cpu import CPUSensor
from zuul.executor.sensors.startingbuilds import StartingBuildsSensor
from zuul.executor.sensors.ram import RAMSensor
from zuul.lib import commandsocket
BUFFER_LINES_FOR_SYNTAX = 200
@ -1818,13 +1819,6 @@ class ExecutorServer(object):
# If the execution driver ever becomes configurable again,
# this is where it would happen.
execution_wrapper_name = 'bubblewrap'
load_multiplier = float(get_default(self.config, 'executor',
'load_multiplier', '2.5'))
self.max_load_avg = multiprocessing.cpu_count() * load_multiplier
self.max_starting_builds = self.max_load_avg * 2
self.min_starting_builds = max(int(multiprocessing.cpu_count() / 2), 1)
self.min_avail_mem = float(get_default(self.config, 'executor',
'min_avail_mem', '5.0'))
self.accepting_work = False
self.execution_wrapper = connections.drivers[execution_wrapper_name]
@ -1874,6 +1868,13 @@ class ExecutorServer(object):
self.stopJobDiskFull,
self.merge_root)
cpu_sensor = CPUSensor(config)
self.sensors = [
cpu_sensor,
RAMSensor(config),
StartingBuildsSensor(self, cpu_sensor.max_load_avg)
]
def _getMerger(self, root, cache_root, logger=None):
return zuul.merger.merger.Merger(
root, self.connections, self.merge_email, self.merge_name,
@ -2160,52 +2161,33 @@ class ExecutorServer(object):
return self._manageLoad()
def _manageLoad(self):
load_avg = os.getloadavg()[0]
avail_mem_pct = 100.0 - psutil.virtual_memory().percent
starting_builds = 0
for worker in self.job_workers.values():
if not worker.started:
starting_builds += 1
max_starting_builds = max(
self.max_starting_builds - len(self.job_workers),
self.min_starting_builds)
if self.accepting_work:
# Don't unregister if we don't have any active jobs.
if load_avg > self.max_load_avg:
self.log.info(
"Unregistering due to high system load {} > {}".format(
load_avg, self.max_load_avg))
self.unregister_work()
elif avail_mem_pct < self.min_avail_mem:
self.log.info(
"Unregistering due to low memory {:3.1f}% < {}".format(
avail_mem_pct, self.min_avail_mem))
self.unregister_work()
elif starting_builds >= max_starting_builds:
self.log.info(
"Unregistering due to too many starting builds {} >= {}"
.format(starting_builds, max_starting_builds))
self.unregister_work()
elif (load_avg <= self.max_load_avg and
avail_mem_pct >= self.min_avail_mem and
starting_builds < max_starting_builds):
self.log.info(
"Re-registering as job is within limits "
"{} <= {} {:3.1f}% <= {} {} < {}".format(
load_avg, self.max_load_avg,
avail_mem_pct, self.min_avail_mem,
starting_builds, max_starting_builds))
self.register_work()
for sensor in self.sensors:
ok, message = sensor.isOk()
if not ok:
self.log.info(
"Unregistering due to {}".format(message))
self.unregister_work()
break
else:
reregister = True
limits = []
for sensor in self.sensors:
ok, message = sensor.isOk()
limits.append(message)
if not ok:
reregister = False
break
if reregister:
self.log.info("Re-registering as job is within its limits "
"{}".format(", ".join(limits)))
self.register_work()
if self.statsd:
base_key = 'zuul.executor.{hostname}'
self.statsd.gauge(base_key + '.load_average',
int(load_avg * 100))
self.statsd.gauge(base_key + '.pct_used_ram',
int((100.0 - avail_mem_pct) * 100))
self.statsd.gauge(base_key + '.running_builds',
len(self.job_workers))
self.statsd.gauge(base_key + '.starting_builds',
starting_builds)
for sensor in self.sensors:
sensor.reportStats(self.statsd, base_key)
def finishJob(self, unique):
del(self.job_workers[unique])