Add a process limit governor to the executor

The Zuul executor has a number of governors that check system limits and
unregister the executor from accepting new work when it is close to one
of those limits. One such limit is the number of processes allowed by
the runtime system. Add a governor that unregisters the executor when it
is near the process limit.

To do this we rely on cgroups' pids.max and pids.current values. These
manifest in two ways. The first is when running within a container; in
that case the values are present in the root cgroup directory. When not
running in a container these values are not present. We could then fall
back to ulimits and process listings, but systemd creates a user.slice
cgroup for each user and tracks pids.max and pids.current there (and on
my system these match my ulimits), so we use that instead. Systemd is
fairly ubiquitous and this avoids maintaining two different
implementations.
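
As an illustration (condensed from the sensor implementation in this
change), these are the files consulted; a value of "max" means there is
no limit:

    import os

    uid = os.getuid()
    candidates = [
        # Present when the executor runs inside a container.
        ('/sys/fs/cgroup/pids.max',
         '/sys/fs/cgroup/pids.current'),
        # Present when running under systemd on the host.
        (f'/sys/fs/cgroup/user.slice/user-{uid}.slice/pids.max',
         f'/sys/fs/cgroup/user.slice/user-{uid}.slice/pids.current'),
    ]
    for max_path, cur_path in candidates:
        if os.path.exists(max_path):
            with open(max_path) as f:
                pids_max = f.read().strip()  # integer, or 'max' for no limit
            with open(cur_path) as f:
                pids_current = f.read().strip()
            print(max_path, pids_max, pids_current)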

This is important because running Zuul jobs requires a number of
processes. When we are near the process limit, starting a new job will
fork Ansible, and Ansible forks ssh, which in turn relies on control
persistence processes and an ssh-agent. Rather than failing the job due
to an inability to fork, we should stop accepting new jobs until we have
backed away from the limit.
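
Condensed, the decision the new sensor makes looks roughly like this
(the helper name here is illustrative; the real logic is in the process
sensor added below):

    def ok_to_accept(limit, usage):
        # Keep at least 5% of the limit free, and never fewer than
        # 10 processes, before accepting more work.
        min_headroom = max(limit * 0.05, 10)
        return (limit - usage) > min_headroom

    ok_to_accept(16384, 8192)   # True: plenty of headroom
    ok_to_accept(16384, 15600)  # False: within the 5% safety factor
    ok_to_accept(150, 141)      # False: fewer than 10 processes left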

Change-Id: Ie24e386680087d7640fac13ceb34b1eb934d5146
Clark Boylan
2025-03-22 10:39:27 -07:00
parent 220b655ae6
commit 4901d4c013
12 changed files with 322 additions and 0 deletions

View File

@ -0,0 +1,10 @@
---
features:
  - |
    The Zuul Executor has a new governor sensor that detects when the
    Executor is nearing the limit on new processes. Running Zuul jobs
    on the Executor requires a number of processes including but not
    limited to: Bubblewrap, Ansible, SSH, and SSH Agent. This new
    governor sensor helps avoid jobs failing due to an inability to
    fork one of these many processes. Instead, the Executor will stop
    accepting new work until it backs away from the process limit.

tests/fixtures/cgroup/pids.100 vendored Normal file

@ -0,0 +1 @@
100

tests/fixtures/cgroup/pids.141 vendored Normal file

@ -0,0 +1 @@
141

tests/fixtures/cgroup/pids.150 vendored Normal file

@ -0,0 +1 @@
150

tests/fixtures/cgroup/pids.15600 vendored Normal file

@ -0,0 +1 @@
15600

tests/fixtures/cgroup/pids.16k vendored Normal file

@ -0,0 +1 @@
16384

tests/fixtures/cgroup/pids.8k vendored Normal file

@ -0,0 +1 @@
8192

tests/fixtures/cgroup/pids.foo vendored Normal file

@ -0,0 +1 @@
foo

tests/fixtures/cgroup/pids.max vendored Normal file

@ -0,0 +1 @@
max

View File

@ -34,6 +34,7 @@ from tests.base import (
)
from zuul.executor.sensors.startingbuilds import StartingBuildsSensor
from zuul.executor.sensors.process import ProcessSensor
from zuul.executor.sensors.ram import RAMSensor
from zuul.executor.server import squash_variables
from zuul.model import NodeSet, Group
@ -849,6 +850,189 @@ class TestGovernor(ZuulTestCase):
        self.executor_server.manageLoad()
        self.assertFalse(self.executor_server.accepting_work)

    @mock.patch('os.getloadavg')
    @mock.patch('psutil.virtual_memory')
    @okay_tracebacks("invalid literal for int() with base 10: 'foo'")
    def test_process_governor(self, vm_mock, loadavg_mock):
        # Set up load average and memory sensors to accept work.
        class Dummy(object):
            pass
        ram = Dummy()
        ram.percent = 20.0  # 20% used
        ram.total = 8 * 1024 * 1024 * 1024  # 8GiB
        vm_mock.return_value = ram
        loadavg_mock.return_value = (0.0, 0.0, 0.0)

        # Test no limit
        process_sensor = [x for x in self.executor_server.sensors
                          if isinstance(x, ProcessSensor)][0]
        process_sensor._root_cgroup_max_file = os.path.join(
            FIXTURE_DIR, 'cgroup', 'pids.max')
        process_sensor._root_cgroup_cur_file = os.path.join(
            FIXTURE_DIR, 'cgroup', 'pids.16k')
        process_sensor._user_cgroup_max_file = os.path.join(
            FIXTURE_DIR, 'cgroup', 'pids.max')
        process_sensor._user_cgroup_cur_file = os.path.join(
            FIXTURE_DIR, 'cgroup', 'pids.16k')
        self.executor_server.manageLoad()
        self.assertTrue(self.executor_server.accepting_work)

        # Test typical runtime values
        process_sensor._root_cgroup_max_file = os.path.join(
            FIXTURE_DIR, 'cgroup', 'pids.16k')
        process_sensor._root_cgroup_cur_file = os.path.join(
            FIXTURE_DIR, 'cgroup', 'pids.8k')
        process_sensor._user_cgroup_max_file = os.path.join(
            FIXTURE_DIR, 'cgroup', 'pids.16k')
        process_sensor._user_cgroup_cur_file = os.path.join(
            FIXTURE_DIR, 'cgroup', 'pids.8k')
        self.executor_server.manageLoad()
        self.assertTrue(self.executor_server.accepting_work)

        # Test within the 5% safety factor: stop accepting work
        process_sensor._root_cgroup_max_file = os.path.join(
            FIXTURE_DIR, 'cgroup', 'pids.16k')
        process_sensor._root_cgroup_cur_file = os.path.join(
            FIXTURE_DIR, 'cgroup', 'pids.15600')
        process_sensor._user_cgroup_max_file = os.path.join(
            FIXTURE_DIR, 'cgroup', 'pids.16k')
        process_sensor._user_cgroup_cur_file = os.path.join(
            FIXTURE_DIR, 'cgroup', 'pids.15600')
        self.executor_server.manageLoad()
        self.assertFalse(self.executor_server.accepting_work)

        # Test above the minimum headroom of 10 processes: keep running
        process_sensor._root_cgroup_max_file = os.path.join(
            FIXTURE_DIR, 'cgroup', 'pids.150')
        process_sensor._root_cgroup_cur_file = os.path.join(
            FIXTURE_DIR, 'cgroup', 'pids.100')
        process_sensor._user_cgroup_max_file = os.path.join(
            FIXTURE_DIR, 'cgroup', 'pids.150')
        process_sensor._user_cgroup_cur_file = os.path.join(
            FIXTURE_DIR, 'cgroup', 'pids.100')
        self.executor_server.manageLoad()
        self.assertTrue(self.executor_server.accepting_work)

        # Test within the minimum headroom of 10 processes: stop
        process_sensor._root_cgroup_max_file = os.path.join(
            FIXTURE_DIR, 'cgroup', 'pids.150')
        process_sensor._root_cgroup_cur_file = os.path.join(
            FIXTURE_DIR, 'cgroup', 'pids.141')
        process_sensor._user_cgroup_max_file = os.path.join(
            FIXTURE_DIR, 'cgroup', 'pids.150')
        process_sensor._user_cgroup_cur_file = os.path.join(
            FIXTURE_DIR, 'cgroup', 'pids.141')
        self.executor_server.manageLoad()
        self.assertFalse(self.executor_server.accepting_work)

        # Test at limit
        process_sensor._root_cgroup_max_file = os.path.join(
            FIXTURE_DIR, 'cgroup', 'pids.16k')
        process_sensor._root_cgroup_cur_file = os.path.join(
            FIXTURE_DIR, 'cgroup', 'pids.16k')
        process_sensor._user_cgroup_max_file = os.path.join(
            FIXTURE_DIR, 'cgroup', 'pids.16k')
        process_sensor._user_cgroup_cur_file = os.path.join(
            FIXTURE_DIR, 'cgroup', 'pids.16k')
        self.executor_server.manageLoad()
        self.assertFalse(self.executor_server.accepting_work)

        # Test no limit with only root cgroup files
        process_sensor._root_cgroup_max_file = os.path.join(
            FIXTURE_DIR, 'cgroup', 'pids.max')
        process_sensor._root_cgroup_cur_file = os.path.join(
            FIXTURE_DIR, 'cgroup', 'pids.16k')
        process_sensor._user_cgroup_max_file = os.path.join(
            FIXTURE_DIR, 'cgroup', 'doesnotexist')
        process_sensor._user_cgroup_cur_file = os.path.join(
            FIXTURE_DIR, 'cgroup', 'doesnotexist')
        self.executor_server.manageLoad()
        self.assertTrue(self.executor_server.accepting_work)

        # Test typical runtime values with only root cgroup files
        process_sensor._root_cgroup_max_file = os.path.join(
            FIXTURE_DIR, 'cgroup', 'pids.16k')
        process_sensor._root_cgroup_cur_file = os.path.join(
            FIXTURE_DIR, 'cgroup', 'pids.8k')
        process_sensor._user_cgroup_max_file = os.path.join(
            FIXTURE_DIR, 'cgroup', 'doesnotexist')
        process_sensor._user_cgroup_cur_file = os.path.join(
            FIXTURE_DIR, 'cgroup', 'doesnotexist')
        self.executor_server.manageLoad()
        self.assertTrue(self.executor_server.accepting_work)

        # Test at limit with only root cgroup files
        process_sensor._root_cgroup_max_file = os.path.join(
            FIXTURE_DIR, 'cgroup', 'pids.16k')
        process_sensor._root_cgroup_cur_file = os.path.join(
            FIXTURE_DIR, 'cgroup', 'pids.16k')
        process_sensor._user_cgroup_max_file = os.path.join(
            FIXTURE_DIR, 'cgroup', 'doesnotexist')
        process_sensor._user_cgroup_cur_file = os.path.join(
            FIXTURE_DIR, 'cgroup', 'doesnotexist')
        self.executor_server.manageLoad()
        self.assertFalse(self.executor_server.accepting_work)

        # Test no limit with only user cgroup files
        process_sensor._root_cgroup_max_file = os.path.join(
            FIXTURE_DIR, 'cgroup', 'doesnotexist')
        process_sensor._root_cgroup_cur_file = os.path.join(
            FIXTURE_DIR, 'cgroup', 'doesnotexist')
        process_sensor._user_cgroup_max_file = os.path.join(
            FIXTURE_DIR, 'cgroup', 'pids.max')
        process_sensor._user_cgroup_cur_file = os.path.join(
            FIXTURE_DIR, 'cgroup', 'pids.16k')
        self.executor_server.manageLoad()
        self.assertTrue(self.executor_server.accepting_work)

        # Test typical runtime values with only user cgroup files
        process_sensor._root_cgroup_max_file = os.path.join(
            FIXTURE_DIR, 'cgroup', 'doesnotexist')
        process_sensor._root_cgroup_cur_file = os.path.join(
            FIXTURE_DIR, 'cgroup', 'doesnotexist')
        process_sensor._user_cgroup_max_file = os.path.join(
            FIXTURE_DIR, 'cgroup', 'pids.16k')
        process_sensor._user_cgroup_cur_file = os.path.join(
            FIXTURE_DIR, 'cgroup', 'pids.8k')
        self.executor_server.manageLoad()
        self.assertTrue(self.executor_server.accepting_work)

        # Test at limit with only user cgroup files
        process_sensor._root_cgroup_max_file = os.path.join(
            FIXTURE_DIR, 'cgroup', 'doesnotexist')
        process_sensor._root_cgroup_cur_file = os.path.join(
            FIXTURE_DIR, 'cgroup', 'doesnotexist')
        process_sensor._user_cgroup_max_file = os.path.join(
            FIXTURE_DIR, 'cgroup', 'pids.16k')
        process_sensor._user_cgroup_cur_file = os.path.join(
            FIXTURE_DIR, 'cgroup', 'pids.16k')
        self.executor_server.manageLoad()
        self.assertFalse(self.executor_server.accepting_work)

        # Test no cgroup files found
        process_sensor._root_cgroup_max_file = os.path.join(
            FIXTURE_DIR, 'cgroup', 'doesnotexist')
        process_sensor._root_cgroup_cur_file = os.path.join(
            FIXTURE_DIR, 'cgroup', 'doesnotexist')
        process_sensor._user_cgroup_max_file = os.path.join(
            FIXTURE_DIR, 'cgroup', 'doesnotexist')
        process_sensor._user_cgroup_cur_file = os.path.join(
            FIXTURE_DIR, 'cgroup', 'doesnotexist')
        self.executor_server.manageLoad()
        self.assertTrue(self.executor_server.accepting_work)

        # Test inability to parse cgroup files
        process_sensor._root_cgroup_max_file = os.path.join(
            FIXTURE_DIR, 'cgroup', 'pids.foo')
        process_sensor._root_cgroup_cur_file = os.path.join(
            FIXTURE_DIR, 'cgroup', 'pids.foo')
        process_sensor._user_cgroup_max_file = os.path.join(
            FIXTURE_DIR, 'cgroup', 'pids.foo')
        process_sensor._user_cgroup_cur_file = os.path.join(
            FIXTURE_DIR, 'cgroup', 'pids.foo')
        self.executor_server.manageLoad()
        self.assertTrue(self.executor_server.accepting_work)

    @mock.patch('os.getloadavg')
    @mock.patch('os.statvfs')
    def test_hdd_governor(self, statvfs_mock, loadavg_mock):

View File

@ -0,0 +1,118 @@
# Copyright 2025 Acme Gating, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import logging
import os
import os.path

from zuul.executor.sensors import SensorInterface


class ProcessSensor(SensorInterface):
    log = logging.getLogger("zuul.executor.sensor.process")

    def __init__(self, statsd, base_key, config=None):
        super().__init__(statsd, base_key)
        # The executor and ansible require a number of processes to function
        # minimally: the executor itself, ansible, ssh control persistence,
        # ssh and so on. Set a minimum of room for 10 processes before we
        # stop.
        self._min_headroom = 10
        self._safety_factor = 0.05
        self._uid = os.getuid()
        self._pid_max = self._get_pid_max()
        self._root_cgroup_max_file = '/sys/fs/cgroup/pids.max'
        self._root_cgroup_cur_file = '/sys/fs/cgroup/pids.current'
        # This appears to be systemd specific behavior with cgroups that
        # reflects the ulimit values. This way we don't need to have a
        # separate system for ulimit checking.
        self._user_cgroup_max_file = f'/sys/fs/cgroup/user.slice' \
            f'/user-{self._uid}.slice/pids.max'
        self._user_cgroup_cur_file = f'/sys/fs/cgroup/user.slice' \
            f'/user-{self._uid}.slice/pids.current'

    def _get_pid_max(self):
        # Default for x86_64
        default = 2 ** 22
        path = '/proc/sys/kernel/pid_max'
        if os.path.exists(path):
            with open(path) as f:
                s = f.read().strip()
                try:
                    i = int(s)
                except ValueError:
                    self.log.exception('Unable to determine pid_max')
                    i = default
                return i
        else:
            return default

    def isOk(self):
        # Processes running in the root cgroup won't have these values
        # but containers do.
        # If no max is found assume pid_max. If no current usage is found
        # assume 1 for the current process.
        root_max = self._get_root_cgroup_max() or self._pid_max
        root_current = self._get_root_cgroup_current() or 1
        # Processes running under systemd will have these values.
        user_max = self._get_user_slice_max() or self._pid_max
        user_current = self._get_user_slice_current() or 1

        limit = min(root_max, user_max)
        usage = max(root_current, user_current)
        min_headroom = limit * self._safety_factor
        if min_headroom < self._min_headroom:
            min_headroom = self._min_headroom
        # This shouldn't ever be negative but I'm not sure if you can reduce
        # cgroup limits below the current usage at runtime.
        headroom = max(limit - usage, 0)

        if self.statsd:
            self.statsd.gauge(self.base_key + '.max_process',
                              limit)
            self.statsd.gauge(self.base_key + '.cur_process',
                              usage)

        if min_headroom >= headroom:
            return False, f'high process utilization: {usage} max: {limit}'

        return True, f'process utilization: {usage} max: {limit}'

    def _get_root_cgroup_max(self):
        return self._get_cgroup_value(self._root_cgroup_max_file)

    def _get_root_cgroup_current(self):
        return self._get_cgroup_value(self._root_cgroup_cur_file)

    def _get_user_slice_max(self):
        return self._get_cgroup_value(self._user_cgroup_max_file)

    def _get_user_slice_current(self):
        return self._get_cgroup_value(self._user_cgroup_cur_file)

    def _get_cgroup_value(self, path):
        if os.path.exists(path):
            with open(path) as f:
                s = f.read().strip()
                if s == 'max':
                    return self._pid_max
                else:
                    try:
                        i = int(s)
                    except ValueError:
                        self.log.exception('Unable to convert cgroup '
                                           'process value')
                        i = None
                    return i
        else:
            return None

View File

@ -61,6 +61,7 @@ import zuul.ansible.logconfig
from zuul.executor.sensors.cpu import CPUSensor
from zuul.executor.sensors.hdd import HDDSensor
from zuul.executor.sensors.pause import PauseSensor
from zuul.executor.sensors.process import ProcessSensor
from zuul.executor.sensors.startingbuilds import StartingBuildsSensor
from zuul.executor.sensors.ram import RAMSensor
from zuul.executor.common import zuul_params_from_job
@ -3813,6 +3814,7 @@ class ExecutorServer(BaseMergeServer):
            cpu_sensor,
            HDDSensor(self.statsd, base_key, config),
            self.pause_sensor,
            ProcessSensor(self.statsd, base_key, config),
            RAMSensor(self.statsd, base_key, config),
            StartingBuildsSensor(self.statsd, base_key,
                                 self, cpu_sensor.max_load_avg, config),