diff --git a/doc/source/admin/components.rst b/doc/source/admin/components.rst
index c11b5cb0ad..08671c9a16 100644
--- a/doc/source/admin/components.rst
+++ b/doc/source/admin/components.rst
@@ -354,6 +354,13 @@ executor
 
       variables=/etc/zuul/variables.yaml
 
+**disk_limit_per_job**
+  This integer is the maximum number of megabytes that any one job is
+  allowed to consume on disk while it is running.  If a job's scratch
+  space exceeds this limit, the job will be aborted::
+
+      disk_limit_per_job=100
+
 merger
 """"""
 
diff --git a/tests/fixtures/config/disk-accountant/git/common-config/playbooks/dd-big-empty-file.yaml b/tests/fixtures/config/disk-accountant/git/common-config/playbooks/dd-big-empty-file.yaml
new file mode 100644
index 0000000000..95ab87057f
--- /dev/null
+++ b/tests/fixtures/config/disk-accountant/git/common-config/playbooks/dd-big-empty-file.yaml
@@ -0,0 +1,6 @@
+- hosts: localhost
+  tasks:
+    - command: dd if=/dev/zero of=toobig bs=1M count=2
+    - wait_for:
+        delay: 10
+        path: /
diff --git a/tests/fixtures/config/disk-accountant/git/common-config/zuul.yaml b/tests/fixtures/config/disk-accountant/git/common-config/zuul.yaml
new file mode 100644
index 0000000000..83a5158a31
--- /dev/null
+++ b/tests/fixtures/config/disk-accountant/git/common-config/zuul.yaml
@@ -0,0 +1,22 @@
+- pipeline:
+    name: check
+    manager: independent
+    allow-secrets: true
+    trigger:
+      gerrit:
+        - event: patchset-created
+    success:
+      gerrit:
+        verified: 1
+    failure:
+      gerrit:
+        verified: -1
+
+- job:
+    name: dd-big-empty-file
+
+- project:
+    name: org/project
+    check:
+      jobs:
+        - dd-big-empty-file
diff --git a/tests/fixtures/config/disk-accountant/git/org_project/README b/tests/fixtures/config/disk-accountant/git/org_project/README
new file mode 100644
index 0000000000..9daeafb986
--- /dev/null
+++ b/tests/fixtures/config/disk-accountant/git/org_project/README
@@ -0,0 +1 @@
+test
diff --git a/tests/fixtures/config/disk-accountant/main.yaml b/tests/fixtures/config/disk-accountant/main.yaml
new file mode 100644
index 0000000000..208e274b13
--- /dev/null
+++ b/tests/fixtures/config/disk-accountant/main.yaml
@@ -0,0 +1,8 @@
+- tenant:
+    name: tenant-one
+    source:
+      gerrit:
+        config-projects:
+          - common-config
+        untrusted-projects:
+          - org/project
diff --git a/tests/fixtures/zuul-disk-accounting.conf b/tests/fixtures/zuul-disk-accounting.conf
new file mode 100644
index 0000000000..b0ae48e52c
--- /dev/null
+++ b/tests/fixtures/zuul-disk-accounting.conf
@@ -0,0 +1,28 @@
+[gearman]
+server=127.0.0.1
+
+[scheduler]
+tenant_config=main.yaml
+
+[merger]
+git_dir=/tmp/zuul-test/merger-git
+git_user_email=zuul@example.com
+git_user_name=zuul
+zuul_url=http://zuul.example.com/p
+
+[executor]
+git_dir=/tmp/zuul-test/executor-git
+disk_limit_per_job=1
+
+[connection gerrit]
+driver=gerrit
+server=review.example.com
+user=jenkins
+sshkey=fake_id_rsa_path
+
+[connection smtp]
+driver=smtp
+server=localhost
+port=25
+default_from=zuul@example.com
+default_to=you@example.com
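A note on how these fixtures fit together: the executor config above caps each job at 1MB of scratch space, while the dd task in the test playbook writes 2MB, so the job is guaranteed to trip the limit. For readers unfamiliar with du's output, here is a standalone sketch (illustrative only, not part of the change; the function name and paths are invented, and GNU du is assumed) of the kind of invocation and parsing the accountant below performs:

    import subprocess

    def job_dir_usage(jobs_base):
        # Report each first-level directory under jobs_base and its
        # disk usage in MB, as printed by GNU du (size<TAB>path).
        du = subprocess.Popen(
            ['du', '-m', '--max-depth=1', jobs_base],
            stdout=subprocess.PIPE, stderr=subprocess.DEVNULL)
        usage = {}
        for line in du.stdout:
            size, dirname = line.rstrip().split()
            dirname = dirname.decode('utf8')
            if dirname != jobs_base:  # skip du's total for the base dir
                usage[dirname] = int(size)
        du.wait()
        return usage

    # e.g. {'/tmp/jobs/012345': 2} -- over a 1MB limit, so aborted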
diff --git a/tests/unit/test_disk_accountant.py b/tests/unit/test_disk_accountant.py
new file mode 100644
index 0000000000..22c8f343e3
--- /dev/null
+++ b/tests/unit/test_disk_accountant.py
@@ -0,0 +1,89 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.  You may obtain
+# a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import os
+import tempfile
+import time
+
+from tests.base import BaseTestCase
+
+from zuul.executor.server import DiskAccountant
+
+
+class FakeExecutor(object):
+    def __init__(self):
+        self.stopped_jobs = set()
+        self.used = {}
+
+    def stopJobByJobDir(self, jobdir):
+        self.stopped_jobs.add(jobdir)
+
+    def usage(self, dirname, used):
+        self.used[dirname] = used
+
+
+class TestDiskAccountant(BaseTestCase):
+    def test_disk_accountant(self):
+        jobs_dir = tempfile.mkdtemp()
+        cache_dir = tempfile.mkdtemp()
+        executor_server = FakeExecutor()
+        da = DiskAccountant(jobs_dir, 1, executor_server.stopJobByJobDir,
+                            cache_dir)
+        da.start()
+
+        jobdir = os.path.join(jobs_dir, '012345')
+        os.mkdir(jobdir)
+        testfile = os.path.join(jobdir, 'tfile')
+        with open(testfile, 'w') as tf:
+            tf.write(2 * 1024 * 1024 * '.')
+
+        # da should catch the over-limit dir within 5 seconds
+        for i in range(0, 50):
+            if jobdir in executor_server.stopped_jobs:
+                break
+            time.sleep(0.1)
+        self.assertEqual(set([jobdir]), executor_server.stopped_jobs)
+        da.stop()
+        self.assertFalse(da.thread.is_alive())
+
+    def test_cache_hard_links(self):
+        root_dir = tempfile.mkdtemp()
+        jobs_dir = os.path.join(root_dir, 'jobs')
+        os.mkdir(jobs_dir)
+        cache_dir = os.path.join(root_dir, 'cache')
+        os.mkdir(cache_dir)
+
+        executor_server = FakeExecutor()
+        da = DiskAccountant(jobs_dir, 1, executor_server.stopJobByJobDir,
+                            cache_dir, executor_server.usage)
+        da.start()
+
+        jobdir = os.path.join(jobs_dir, '012345')
+        os.mkdir(jobdir)
+
+        repo_dir = os.path.join(cache_dir, 'a.repo')
+        os.mkdir(repo_dir)
+        source_file = os.path.join(repo_dir, 'big_file')
+        with open(source_file, 'w') as tf:
+            tf.write(2 * 1024 * 1024 * '.')
+        dest_link = os.path.join(jobdir, 'big_file')
+        os.link(source_file, dest_link)
+
+        # da should _not_ count this file; wait up to 5s to be noticed
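The hard-link test above leans on a GNU du property worth spelling out: within a single invocation, du charges each inode only to the first directory in which it is encountered, so scanning the cache directory before the jobs directory keeps cached files from counting against any job. A small standalone sketch of that behaviour (illustrative only, not part of the change; requires GNU du and a filesystem supporting hard links):

    import os
    import subprocess
    import tempfile

    root = tempfile.mkdtemp()
    cache = os.path.join(root, 'cache')
    jobs = os.path.join(root, 'jobs')
    jobdir = os.path.join(jobs, '012345')
    for d in (cache, jobs, jobdir):
        os.mkdir(d)

    source = os.path.join(cache, 'big_file')
    with open(source, 'w') as f:
        f.write(2 * 1024 * 1024 * '.')
    os.link(source, os.path.join(jobdir, 'big_file'))

    # Because cache is listed first, the 2MB inode is charged to it;
    # the job dir reports only its own metadata (rounded up to 1MB),
    # which is exactly what test_cache_hard_links asserts.
    print(subprocess.check_output(
        ['du', '-m', '--max-depth=1', cache, jobs]).decode('utf8'))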
+        for i in range(0, 50):
+            if jobdir in executor_server.used:
+                break
+            time.sleep(0.1)
+        self.assertEqual(set(), executor_server.stopped_jobs)
+        self.assertIn(jobdir, executor_server.used)
+        self.assertEqual(1, executor_server.used[jobdir])
+        da.stop()
diff --git a/tests/unit/test_v3.py b/tests/unit/test_v3.py
index 7dcb4ae32c..3d8d37c15d 100644
--- a/tests/unit/test_v3.py
+++ b/tests/unit/test_v3.py
@@ -924,3 +924,15 @@ class TestDataReturn(AnsibleZuulTestCase):
         self.assertIn('- data-return-relative '
                       'http://example.com/test/log/url/docs/index.html',
                       A.messages[-1])
+
+
+class TestDiskAccounting(AnsibleZuulTestCase):
+    config_file = 'zuul-disk-accounting.conf'
+    tenant_config_file = 'config/disk-accountant/main.yaml'
+
+    def test_disk_accountant_kills_job(self):
+        A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+        self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
+        self.waitUntilSettled()
+        self.assertHistory([
+            dict(name='dd-big-empty-file', result='ABORTED', changes='1,1')])
diff --git a/zuul/executor/server.py b/zuul/executor/server.py
index 84aab555c4..11b7c3ac9a 100644
--- a/zuul/executor/server.py
+++ b/zuul/executor/server.py
@@ -55,6 +55,88 @@ class RoleNotFoundError(ExecutorError):
     pass
 
 
+class DiskAccountant(object):
+    ''' A single thread to periodically run du and monitor a base directory
+
+    Whenever the accountant notices a dir over limit, it will call the
+    given func with an argument of the job directory. That function
+    should be used to remediate the problem, generally by killing the
+    job producing the disk bloat. The function will be called every
+    time the problem is noticed, so it should be handled synchronously
+    to avoid stacking up calls.
+    '''
+    log = logging.getLogger("zuul.ExecutorDiskAccountant")
+
+    def __init__(self, jobs_base, limit, func, cache_dir, usage_func=None):
+        '''
+        :param str jobs_base: absolute path name of dir to be monitored
+        :param int limit: maximum number of MB allowed to be in use in any one
+                          subdir
+        :param callable func: Function to call with overlimit dirs
+        :param str cache_dir: absolute path name of dir to be passed as the
+                              first argument to du. This will ensure du does
+                              not count any hardlinks to files in this
+                              directory against a single job.
+        :param callable usage_func: Optional function to call with usage
+                                    for every dir _NOT_ over limit
+        '''
+        # Don't cross the streams
+        if cache_dir == jobs_base:
+            raise Exception("Cache dir and jobs dir cannot be the same")
+        self.thread = threading.Thread(target=self._run,
+                                       name='executor-diskaccountant')
+        self.thread.daemon = True
+        self._running = False
+        self.jobs_base = jobs_base
+        self.limit = limit
+        self.func = func
+        self.cache_dir = cache_dir
+        self.usage_func = usage_func
+        self.stop_event = threading.Event()
+
+    def _run(self):
+        while self._running:
+            # Walk job base
+            before = time.time()
+            du = subprocess.Popen(
+                ['du', '-m', '--max-depth=1', self.cache_dir, self.jobs_base],
+                stdout=subprocess.PIPE, stderr=subprocess.DEVNULL)
+            for line in du.stdout:
+                (size, dirname) = line.rstrip().split()
+                dirname = dirname.decode('utf8')
+                if dirname == self.jobs_base or dirname == self.cache_dir:
+                    continue
+                if os.path.dirname(dirname) == self.cache_dir:
+                    continue
+                size = int(size)
+                if size > self.limit:
+                    self.log.info(
+                        "{job} is using {size}MB (limit={limit})"
+                        .format(size=size, job=dirname, limit=self.limit))
+                    self.func(dirname)
+                elif self.usage_func:
+                    self.log.debug(
+                        "{job} is using {size}MB (limit={limit})"
+                        .format(size=size, job=dirname, limit=self.limit))
+                    self.usage_func(dirname, size)
+            du.wait()
+            after = time.time()
+            # Sleep half as long as that took, or 1s, whichever is longer
+            delay_time = max((after - before) / 2, 1.0)
+            self.stop_event.wait(delay_time)
+
+    def start(self):
+        self._running = True
+        self.thread.start()
+
+    def stop(self):
+        self._running = False
+        self.stop_event.set()
+        # We join here to avoid whitelisting the thread -- if it takes more
+        # than 5s to stop in tests, there's a problem.
+        self.thread.join(timeout=5)
+
+
 class Watchdog(object):
     def __init__(self, timeout, function, args):
         self.timeout = timeout
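To make the intended use of the class concrete, here is a minimal driving sketch; the paths are invented for illustration, and the 250MB figure simply mirrors the default wired up in the hunks below:

    import time

    from zuul.executor.server import DiskAccountant

    def abort_build(jobdir):
        # Called (possibly repeatedly) for any build dir over the limit.
        print('aborting %s for excessive disk use' % jobdir)

    da = DiskAccountant('/var/lib/zuul/builds', 250, abort_build,
                        '/var/lib/zuul/executor-git')
    da.start()      # begins polling du in a daemon thread
    time.sleep(10)  # ... jobs run ...
    da.stop()       # sets the stop event and joins the thread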
@@ -443,6 +525,8 @@ class ExecutorServer(object):
                                    '/var/lib/zuul/executor-git')
         self.default_username = get_default(self.config, 'executor',
                                             'default_username', 'zuul')
+        self.disk_limit_per_job = int(get_default(self.config, 'executor',
+                                                  'disk_limit_per_job', 250))
         self.merge_email = get_default(self.config, 'merger', 'git_user_email')
         self.merge_name = get_default(self.config, 'merger', 'git_user_name')
         execution_wrapper_name = get_default(self.config, 'executor',
@@ -486,6 +570,10 @@ class ExecutorServer(object):
             pass
 
         self.job_workers = {}
+        self.disk_accountant = DiskAccountant(self.jobdir_root,
+                                              self.disk_limit_per_job,
+                                              self.stopJobByJobdir,
+                                              self.merge_root)
 
     def _getMerger(self, root, logger=None):
         if root != self.merge_root:
@@ -530,6 +618,7 @@ class ExecutorServer(object):
         self.executor_thread = threading.Thread(target=self.run_executor)
         self.executor_thread.daemon = True
         self.executor_thread.start()
+        self.disk_accountant.start()
 
     def register(self):
         self.executor_worker.registerFunction("executor:execute")
@@ -540,6 +629,7 @@ class ExecutorServer(object):
 
     def stop(self):
         self.log.debug("Stopping")
+        self.disk_accountant.stop()
         self._running = False
         self._command_running = False
         self.command_socket.stop()
@@ -675,23 +765,30 @@ class ExecutorServer(object):
     def finishJob(self, unique):
         del(self.job_workers[unique])
 
+    def stopJobByJobdir(self, jobdir):
+        unique = os.path.basename(jobdir)
+        self.stopJobByUnique(unique)
+
     def stopJob(self, job):
         try:
             args = json.loads(job.arguments)
             self.log.debug("Stop job with arguments: %s" % (args,))
             unique = args['uuid']
-            job_worker = self.job_workers.get(unique)
-            if not job_worker:
-                self.log.debug("Unable to find worker for job %s" % (unique,))
-                return
-            try:
-                job_worker.stop()
-            except Exception:
-                self.log.exception("Exception sending stop command "
-                                   "to worker:")
+            self.stopJobByUnique(unique)
         finally:
             job.sendWorkComplete()
 
+    def stopJobByUnique(self, unique):
+        job_worker = self.job_workers.get(unique)
+        if not job_worker:
+            self.log.debug("Unable to find worker for job %s" % (unique,))
+            return
+        try:
+            job_worker.stop()
+        except Exception:
+            self.log.exception("Exception sending stop command "
+                               "to worker:")
+
     def cat(self, job):
         args = json.loads(job.arguments)
         task = self.update(args['connection'], args['project'])
@@ -1429,6 +1526,7 @@ class AnsibleJob(object):
             if timeout:
                 watchdog.stop()
                 self.log.debug("Stopped watchdog")
+            self.log.debug("Stopped disk job killer")
 
         with self.proc_lock:
             self.proc = None
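One convention is implicit in stopJobByJobdir above: each build directory under the executor's job root is named after the job's unique id, which is also the key of ExecutorServer.job_workers, so the basename of an over-limit directory is enough to find the worker to stop. A trivial sketch of that mapping (the path here is hypothetical):

    import os

    # DiskAccountant reports an over-limit dir such as this; its
    # basename is the unique id used to look up the job worker.
    jobdir = '/var/lib/zuul/builds/0123456789abcdef'
    unique = os.path.basename(jobdir)
    assert unique == '0123456789abcdef'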