Browse Source

Monitor job root and kill over limit jobs

If a job were to be pointed at an abnormally large git repository (or
a maliciously large one), a clone would fill the disk. Or anything else
that might happen that writes data onto the executor disk.

We run a single thread periodically running du on the root of all jobs
on this executor. This is called the DiskAccountant.

We set a config item per executor of the limit per job. This won't
actually save a server from a full disk if many thousands of concurrent
changes are submitted, but this will prevent any accidental filling of
the disk, and make malicious disk filling much harder.

We also ignore hard links from the merge root, which will exempt bits
cloned from the merge root from disk accounting.

Change-Id: I415e5930cc3ebe2c7e1a84316e78578d6b9ecf30
Story: 2000879
Task: 3504
changes/02/485902/10
Clint Byrum 4 years ago
parent
commit
dc8a090c4c
  1. 7
      doc/source/admin/components.rst
  2. 6
      tests/fixtures/config/disk-accountant/git/common-config/playbooks/dd-big-empty-file.yaml
  3. 22
      tests/fixtures/config/disk-accountant/git/common-config/zuul.yaml
  4. 1
      tests/fixtures/config/disk-accountant/git/org_project/README
  5. 8
      tests/fixtures/config/disk-accountant/main.yaml
  6. 28
      tests/fixtures/zuul-disk-accounting.conf
  7. 89
      tests/unit/test_disk_accountant.py
  8. 12
      tests/unit/test_v3.py
  9. 116
      zuul/executor/server.py

7
doc/source/admin/components.rst

@ -354,6 +354,13 @@ executor
variables=/etc/zuul/variables.yaml
**disk_limit_per_job**
This integer is the maximum number of megabytes that any one job is
allowed to consume on disk while it is running. If a job's scratch
space has more than this much space consumed, it will be aborted::
disk_limit_per_job=100
merger
""""""

6
tests/fixtures/config/disk-accountant/git/common-config/playbooks/dd-big-empty-file.yaml

@ -0,0 +1,6 @@
- hosts: localhost
tasks:
- command: dd if=/dev/zero of=toobig bs=1M count=2
- wait_for:
delay: 10
path: /

22
tests/fixtures/config/disk-accountant/git/common-config/zuul.yaml

@ -0,0 +1,22 @@
- pipeline:
name: check
manager: independent
allow-secrets: true
trigger:
gerrit:
- event: patchset-created
success:
gerrit:
verified: 1
failure:
gerrit:
verified: -1
- job:
name: dd-big-empty-file
- project:
name: org/project
check:
jobs:
- dd-big-empty-file

1
tests/fixtures/config/disk-accountant/git/org_project/README

@ -0,0 +1 @@
test

8
tests/fixtures/config/disk-accountant/main.yaml

@ -0,0 +1,8 @@
- tenant:
name: tenant-one
source:
gerrit:
config-projects:
- common-config
untrusted-projects:
- org/project

28
tests/fixtures/zuul-disk-accounting.conf

@ -0,0 +1,28 @@
[gearman]
server=127.0.0.1
[scheduler]
tenant_config=main.yaml
[merger]
git_dir=/tmp/zuul-test/merger-git
git_user_email=zuul@example.com
git_user_name=zuul
zuul_url=http://zuul.example.com/p
[executor]
git_dir=/tmp/zuul-test/executor-git
disk_limit_per_job=1
[connection gerrit]
driver=gerrit
server=review.example.com
user=jenkins
sshkey=fake_id_rsa_path
[connection smtp]
driver=smtp
server=localhost
port=25
default_from=zuul@example.com
default_to=you@example.com

89
tests/unit/test_disk_accountant.py

@ -0,0 +1,89 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import tempfile
import time
from tests.base import BaseTestCase
from zuul.executor.server import DiskAccountant
class FakeExecutor(object):
def __init__(self):
self.stopped_jobs = set()
self.used = {}
def stopJobByJobDir(self, jobdir):
self.stopped_jobs.add(jobdir)
def usage(self, dirname, used):
self.used[dirname] = used
class TestDiskAccountant(BaseTestCase):
def test_disk_accountant(self):
jobs_dir = tempfile.mkdtemp()
cache_dir = tempfile.mkdtemp()
executor_server = FakeExecutor()
da = DiskAccountant(jobs_dir, 1, executor_server.stopJobByJobDir,
cache_dir)
da.start()
jobdir = os.path.join(jobs_dir, '012345')
os.mkdir(jobdir)
testfile = os.path.join(jobdir, 'tfile')
with open(testfile, 'w') as tf:
tf.write(2 * 1024 * 1024 * '.')
# da should catch over-limit dir within 5 seconds
for i in range(0, 50):
if jobdir in executor_server.stopped_jobs:
break
time.sleep(0.1)
self.assertEqual(set([jobdir]), executor_server.stopped_jobs)
da.stop()
self.assertFalse(da.thread.is_alive())
def test_cache_hard_links(self):
root_dir = tempfile.mkdtemp()
jobs_dir = os.path.join(root_dir, 'jobs')
os.mkdir(jobs_dir)
cache_dir = os.path.join(root_dir, 'cache')
os.mkdir(cache_dir)
executor_server = FakeExecutor()
da = DiskAccountant(jobs_dir, 1, executor_server.stopJobByJobDir,
cache_dir, executor_server.usage)
da.start()
jobdir = os.path.join(jobs_dir, '012345')
os.mkdir(jobdir)
repo_dir = os.path.join(cache_dir, 'a.repo')
os.mkdir(repo_dir)
source_file = os.path.join(repo_dir, 'big_file')
with open(source_file, 'w') as tf:
tf.write(2 * 1024 * 1024 * '.')
dest_link = os.path.join(jobdir, 'big_file')
os.link(source_file, dest_link)
# da should _not_ count this file. Wait for 5s to get noticed
for i in range(0, 50):
if jobdir in executor_server.used:
break
time.sleep(0.1)
self.assertEqual(set(), executor_server.stopped_jobs)
self.assertIn(jobdir, executor_server.used)
self.assertEqual(1, executor_server.used[jobdir])
da.stop()

12
tests/unit/test_v3.py

@ -924,3 +924,15 @@ class TestDataReturn(AnsibleZuulTestCase):
self.assertIn('- data-return-relative '
'http://example.com/test/log/url/docs/index.html',
A.messages[-1])
class TestDiskAccounting(AnsibleZuulTestCase):
config_file = 'zuul-disk-accounting.conf'
tenant_config_file = 'config/disk-accountant/main.yaml'
def test_disk_accountant_kills_job(self):
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
self.assertHistory([
dict(name='dd-big-empty-file', result='ABORTED', changes='1,1')])

116
zuul/executor/server.py

@ -55,6 +55,88 @@ class RoleNotFoundError(ExecutorError):
pass
class DiskAccountant(object):
''' A single thread to periodically run du and monitor a base directory
Whenever the accountant notices a dir over limit, it will call the
given func with an argument of the job directory. That function
should be used to remediate the problem, generally by killing the
job producing the disk bloat). The function will be called every
time the problem is noticed, so it should be handled synchronously
to avoid stacking up calls.
'''
log = logging.getLogger("zuul.ExecutorDiskAccountant")
def __init__(self, jobs_base, limit, func, cache_dir, usage_func=None):
'''
:param str jobs_base: absolute path name of dir to be monitored
:param int limit: maximum number of MB allowed to be in use in any one
subdir
:param callable func: Function to call with overlimit dirs
:param str cache_dir: absolute path name of dir to be passed as the
first argument to du. This will ensure du does
not count any hardlinks to files in this
directory against a single job.
:param callable usage_func: Optional function to call with usage
for every dir _NOT_ over limit
'''
# Don't cross the streams
if cache_dir == jobs_base:
raise Exception("Cache dir and jobs dir cannot be the same")
self.thread = threading.Thread(target=self._run,
name='executor-diskaccountant')
self.thread.daemon = True
self._running = False
self.jobs_base = jobs_base
self.limit = limit
self.func = func
self.cache_dir = cache_dir
self.usage_func = usage_func
self.stop_event = threading.Event()
def _run(self):
while self._running:
# Walk job base
before = time.time()
du = subprocess.Popen(
['du', '-m', '--max-depth=1', self.cache_dir, self.jobs_base],
stdout=subprocess.PIPE, stderr=subprocess.DEVNULL)
for line in du.stdout:
(size, dirname) = line.rstrip().split()
dirname = dirname.decode('utf8')
if dirname == self.jobs_base or dirname == self.cache_dir:
continue
if os.path.dirname(dirname) == self.cache_dir:
continue
size = int(size)
if size > self.limit:
self.log.info(
"{job} is using {size}MB (limit={limit})"
.format(size=size, job=dirname, limit=self.limit))
self.func(dirname)
elif self.usage_func:
self.log.debug(
"{job} is using {size}MB (limit={limit})"
.format(size=size, job=dirname, limit=self.limit))
self.usage_func(dirname, size)
du.wait()
after = time.time()
# Sleep half as long as that took, or 1s, whichever is longer
delay_time = max((after - before) / 2, 1.0)
self.stop_event.wait(delay_time)
def start(self):
self._running = True
self.thread.start()
def stop(self):
self._running = False
self.stop_event.set()
# We join here to avoid whitelisting the thread -- if it takes more
# than 5s to stop in tests, there's a problem.
self.thread.join(timeout=5)
class Watchdog(object):
def __init__(self, timeout, function, args):
self.timeout = timeout
@ -443,6 +525,8 @@ class ExecutorServer(object):
'/var/lib/zuul/executor-git')
self.default_username = get_default(self.config, 'executor',
'default_username', 'zuul')
self.disk_limit_per_job = int(get_default(self.config, 'executor',
'disk_limit_per_job', 250))
self.merge_email = get_default(self.config, 'merger', 'git_user_email')
self.merge_name = get_default(self.config, 'merger', 'git_user_name')
execution_wrapper_name = get_default(self.config, 'executor',
@ -486,6 +570,10 @@ class ExecutorServer(object):
pass
self.job_workers = {}
self.disk_accountant = DiskAccountant(self.jobdir_root,
self.disk_limit_per_job,
self.stopJobByJobdir,
self.merge_root)
def _getMerger(self, root, logger=None):
if root != self.merge_root:
@ -530,6 +618,7 @@ class ExecutorServer(object):
self.executor_thread = threading.Thread(target=self.run_executor)
self.executor_thread.daemon = True
self.executor_thread.start()
self.disk_accountant.start()
def register(self):
self.executor_worker.registerFunction("executor:execute")
@ -540,6 +629,7 @@ class ExecutorServer(object):
def stop(self):
self.log.debug("Stopping")
self.disk_accountant.stop()
self._running = False
self._command_running = False
self.command_socket.stop()
@ -675,23 +765,30 @@ class ExecutorServer(object):
def finishJob(self, unique):
del(self.job_workers[unique])
def stopJobByJobdir(self, jobdir):
unique = os.path.basename(jobdir)
self.stopJobByUnique(unique)
def stopJob(self, job):
try:
args = json.loads(job.arguments)
self.log.debug("Stop job with arguments: %s" % (args,))
unique = args['uuid']
job_worker = self.job_workers.get(unique)
if not job_worker:
self.log.debug("Unable to find worker for job %s" % (unique,))
return
try:
job_worker.stop()
except Exception:
self.log.exception("Exception sending stop command "
"to worker:")
self.stopJobByUnique(unique)
finally:
job.sendWorkComplete()
def stopJobByUnique(self, unique):
job_worker = self.job_workers.get(unique)
if not job_worker:
self.log.debug("Unable to find worker for job %s" % (unique,))
return
try:
job_worker.stop()
except Exception:
self.log.exception("Exception sending stop command "
"to worker:")
def cat(self, job):
args = json.loads(job.arguments)
task = self.update(args['connection'], args['project'])
@ -1429,6 +1526,7 @@ class AnsibleJob(object):
if timeout:
watchdog.stop()
self.log.debug("Stopped watchdog")
self.log.debug("Stopped disk job killer")
with self.proc_lock:
self.proc = None

Loading…
Cancel
Save