From ce8a213fcde091077be644ce424816bb444d329f Mon Sep 17 00:00:00 2001 From: "James E. Blair" Date: Thu, 19 May 2016 15:21:52 -0700 Subject: [PATCH] Estimate job runtimes internally Rather than relying on the workers to supply estimated job runtimes, record the last 10 successful run times and use those to estimate the run time of each job. This means that workers (which may be highly distributed and lack access to a substantial job history) no longer need to provide these values, and the central scheduler, which is better placed to do so since italready sees all job run times, will. Failure times and a scoreboard of results are kept for each job as well for potential future use in evaluating likelihood of job success. Change-Id: If0955e15a3da9eb842dbee02a4750a177a092d3e --- tests/base.py | 3 ++ tests/test_model.py | 78 +++++++++++++++++++++++++++++ tests/test_scheduler.py | 105 ++++++++++++++++++++++++++------------- zuul/launcher/gearman.py | 3 -- zuul/model.py | 77 ++++++++++++++++++++++++++++ zuul/scheduler.py | 25 ++++++++++ 6 files changed, 253 insertions(+), 38 deletions(-) diff --git a/tests/base.py b/tests/base.py index 405caa0ded..585f2d203b 100755 --- a/tests/base.py +++ b/tests/base.py @@ -876,11 +876,13 @@ class ZuulTestCase(BaseTestCase): self.test_root = os.path.join(tmp_root, "zuul-test") self.upstream_root = os.path.join(self.test_root, "upstream") self.git_root = os.path.join(self.test_root, "git") + self.state_root = os.path.join(self.test_root, "lib") if os.path.exists(self.test_root): shutil.rmtree(self.test_root) os.makedirs(self.test_root) os.makedirs(self.upstream_root) + os.makedirs(self.state_root) # Make per test copy of Configuration. self.setup_config() @@ -888,6 +890,7 @@ class ZuulTestCase(BaseTestCase): os.path.join(FIXTURE_DIR, self.config.get('zuul', 'layout_config'))) self.config.set('merger', 'git_dir', self.git_root) + self.config.set('zuul', 'state_dir', self.state_root) # For each project in config: self.init_repo("org/project") diff --git a/tests/test_model.py b/tests/test_model.py index 271161869f..ac19383ff8 100644 --- a/tests/test_model.py +++ b/tests/test_model.py @@ -12,6 +12,11 @@ # License for the specific language governing permissions and limitations # under the License. +import os +import random + +import fixtures + from zuul import change_matcher as cm from zuul import model @@ -62,3 +67,76 @@ class TestJob(BaseTestCase): metajob = model.Job('^job') job.copy(metajob) self._assert_job_booleans_are_not_none(job) + + +class TestJobTimeData(BaseTestCase): + def setUp(self): + super(TestJobTimeData, self).setUp() + self.tmp_root = self.useFixture(fixtures.TempDir( + rootdir=os.environ.get("ZUUL_TEST_ROOT")) + ).path + + def test_empty_timedata(self): + path = os.path.join(self.tmp_root, 'job-name') + self.assertFalse(os.path.exists(path)) + self.assertFalse(os.path.exists(path + '.tmp')) + td = model.JobTimeData(path) + self.assertEqual(td.success_times, [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]) + self.assertEqual(td.failure_times, [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]) + self.assertEqual(td.results, [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]) + + def test_save_reload(self): + path = os.path.join(self.tmp_root, 'job-name') + self.assertFalse(os.path.exists(path)) + self.assertFalse(os.path.exists(path + '.tmp')) + td = model.JobTimeData(path) + self.assertEqual(td.success_times, [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]) + self.assertEqual(td.failure_times, [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]) + self.assertEqual(td.results, [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]) + success_times = [] + failure_times = [] + results = [] + for x in range(10): + success_times.append(int(random.random() * 1000)) + failure_times.append(int(random.random() * 1000)) + results.append(0) + results.append(1) + random.shuffle(results) + s = f = 0 + for result in results: + if result: + td.add(failure_times[f], 'FAILURE') + f += 1 + else: + td.add(success_times[s], 'SUCCESS') + s += 1 + self.assertEqual(td.success_times, success_times) + self.assertEqual(td.failure_times, failure_times) + self.assertEqual(td.results, results[10:]) + td.save() + self.assertTrue(os.path.exists(path)) + self.assertFalse(os.path.exists(path + '.tmp')) + td = model.JobTimeData(path) + td.load() + self.assertEqual(td.success_times, success_times) + self.assertEqual(td.failure_times, failure_times) + self.assertEqual(td.results, results[10:]) + + +class TestTimeDataBase(BaseTestCase): + def setUp(self): + super(TestTimeDataBase, self).setUp() + self.tmp_root = self.useFixture(fixtures.TempDir( + rootdir=os.environ.get("ZUUL_TEST_ROOT")) + ).path + self.db = model.TimeDataBase(self.tmp_root) + + def test_timedatabase(self): + self.assertEqual(self.db.getEstimatedTime('job-name'), 0) + self.db.update('job-name', 50, 'SUCCESS') + self.assertEqual(self.db.getEstimatedTime('job-name'), 50) + self.db.update('job-name', 100, 'SUCCESS') + self.assertEqual(self.db.getEstimatedTime('job-name'), 75) + for x in range(10): + self.db.update('job-name', 100, 'SUCCESS') + self.assertEqual(self.db.getEstimatedTime('job-name'), 100) diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py index fe7c7cc4fa..15d33c8aad 100755 --- a/tests/test_scheduler.py +++ b/tests/test_scheduler.py @@ -34,7 +34,6 @@ import zuul.reporter.gerrit import zuul.reporter.smtp from tests.base import ( - BaseTestCase, ZuulTestCase, repack_repo, ) @@ -44,40 +43,6 @@ logging.basicConfig(level=logging.DEBUG, '%(levelname)-8s %(message)s') -class TestSchedulerConfigParsing(BaseTestCase): - - def test_parse_skip_if(self): - job_yaml = """ -jobs: - - name: job_name - skip-if: - - project: ^project_name$ - branch: ^stable/icehouse$ - all-files-match-any: - - ^filename$ - - project: ^project2_name$ - all-files-match-any: - - ^filename2$ - """.strip() - data = yaml.load(job_yaml) - config_job = data.get('jobs')[0] - sched = zuul.scheduler.Scheduler({}) - cm = zuul.change_matcher - expected = cm.MatchAny([ - cm.MatchAll([ - cm.ProjectMatcher('^project_name$'), - cm.BranchMatcher('^stable/icehouse$'), - cm.MatchAllFiles([cm.FileMatcher('^filename$')]), - ]), - cm.MatchAll([ - cm.ProjectMatcher('^project2_name$'), - cm.MatchAllFiles([cm.FileMatcher('^filename2$')]), - ]), - ]) - matcher = sched._parseSkipIf(config_job) - self.assertEqual(expected, matcher) - - class TestScheduler(ZuulTestCase): def test_jobs_launched(self): @@ -495,6 +460,46 @@ class TestScheduler(ZuulTestCase): self.assertEqual(B.reported, 2) self.assertEqual(C.reported, 2) + def _test_time_database(self, iteration): + self.worker.hold_jobs_in_build = True + A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A') + A.addApproval('CRVW', 2) + self.fake_gerrit.addEvent(A.addApproval('APRV', 1)) + self.waitUntilSettled() + time.sleep(2) + + data = json.loads(self.sched.formatStatusJSON()) + found_job = None + for pipeline in data['pipelines']: + if pipeline['name'] != 'gate': + continue + for queue in pipeline['change_queues']: + for head in queue['heads']: + for item in head: + for job in item['jobs']: + if job['name'] == 'project-merge': + found_job = job + break + + self.assertIsNotNone(found_job) + if iteration == 1: + self.assertIsNotNone(found_job['estimated_time']) + self.assertIsNone(found_job['remaining_time']) + else: + self.assertIsNotNone(found_job['estimated_time']) + self.assertTrue(found_job['estimated_time'] >= 2) + self.assertIsNotNone(found_job['remaining_time']) + + self.worker.hold_jobs_in_build = False + self.worker.release() + self.waitUntilSettled() + + def test_time_database(self): + "Test the time database" + + self._test_time_database(1) + self._test_time_database(2) + def test_two_failed_changes_at_head(self): "Test that changes are reparented correctly if 2 fail at head" @@ -600,6 +605,36 @@ class TestScheduler(ZuulTestCase): self.assertEqual(B.reported, 2) self.assertEqual(C.reported, 2) + def test_parse_skip_if(self): + job_yaml = """ +jobs: + - name: job_name + skip-if: + - project: ^project_name$ + branch: ^stable/icehouse$ + all-files-match-any: + - ^filename$ + - project: ^project2_name$ + all-files-match-any: + - ^filename2$ + """.strip() + data = yaml.load(job_yaml) + config_job = data.get('jobs')[0] + cm = zuul.change_matcher + expected = cm.MatchAny([ + cm.MatchAll([ + cm.ProjectMatcher('^project_name$'), + cm.BranchMatcher('^stable/icehouse$'), + cm.MatchAllFiles([cm.FileMatcher('^filename$')]), + ]), + cm.MatchAll([ + cm.ProjectMatcher('^project2_name$'), + cm.MatchAllFiles([cm.FileMatcher('^filename2$')]), + ]), + ]) + matcher = self.sched._parseSkipIf(config_job) + self.assertEqual(expected, matcher) + def test_patch_order(self): "Test that dependent patches are tested in the right order" A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A') diff --git a/zuul/launcher/gearman.py b/zuul/launcher/gearman.py index 69fb71bc18..f3b867ce93 100644 --- a/zuul/launcher/gearman.py +++ b/zuul/launcher/gearman.py @@ -456,9 +456,6 @@ class Gearman(object): build.number = data.get('number') build.__gearman_manager = data.get('manager') self.sched.onBuildStarted(build) - - if job.denominator: - build.estimated_time = float(job.denominator) / 1000 else: self.log.error("Unable to find build %s" % job.unique) diff --git a/zuul/model.py b/zuul/model.py index 5bea5d03bb..3fb0577f23 100644 --- a/zuul/model.py +++ b/zuul/model.py @@ -13,7 +13,9 @@ # under the License. import copy +import os import re +import struct import time from uuid import uuid4 import extras @@ -1380,3 +1382,78 @@ class Layout(object): job.copy(metajob) self.jobs[name] = job return job + + +class JobTimeData(object): + format = 'B10H10H10B' + version = 0 + + def __init__(self, path): + self.path = path + self.success_times = [0 for x in range(10)] + self.failure_times = [0 for x in range(10)] + self.results = [0 for x in range(10)] + + def load(self): + if not os.path.exists(self.path): + return + with open(self.path) as f: + data = struct.unpack(self.format, f.read()) + version = data[0] + if version != self.version: + raise Exception("Unkown data version") + self.success_times = list(data[1:11]) + self.failure_times = list(data[11:21]) + self.results = list(data[21:32]) + + def save(self): + tmpfile = self.path + '.tmp' + data = [self.version] + data.extend(self.success_times) + data.extend(self.failure_times) + data.extend(self.results) + data = struct.pack(self.format, *data) + with open(tmpfile, 'w') as f: + f.write(data) + os.rename(tmpfile, self.path) + + def add(self, elapsed, result): + elapsed = int(elapsed) + if result == 'SUCCESS': + self.success_times.append(elapsed) + self.success_times.pop(0) + result = 0 + else: + self.failure_times.append(elapsed) + self.failure_times.pop(0) + result = 1 + self.results.append(result) + self.results.pop(0) + + def getEstimatedTime(self): + times = [x for x in self.success_times if x] + if times: + return float(sum(times)) / len(times) + return 0.0 + + +class TimeDataBase(object): + def __init__(self, root): + self.root = root + self.jobs = {} + + def _getTD(self, name): + td = self.jobs.get(name) + if not td: + td = JobTimeData(os.path.join(self.root, name)) + self.jobs[name] = td + td.load() + return td + + def getEstimatedTime(self, name): + return self._getTD(name).getEstimatedTime() + + def update(self, name, elapsed, result): + td = self._getTD(name) + td.add(elapsed, result) + td.save() diff --git a/zuul/scheduler.py b/zuul/scheduler.py index aea9a67e96..ee5cd2bca0 100644 --- a/zuul/scheduler.py +++ b/zuul/scheduler.py @@ -262,6 +262,9 @@ class Scheduler(threading.Thread): self.management_event_queue = Queue.Queue() self.layout = model.Layout() + time_dir = self._get_time_database_dir() + self.time_database = model.TimeDataBase(time_dir) + self.zuul_version = zuul_version.version_info.release_string() self.last_reconfigured = None @@ -740,6 +743,17 @@ class Scheduler(threading.Thread): state_dir = '/var/lib/zuul' return os.path.join(state_dir, 'queue.pickle') + def _get_time_database_dir(self): + if self.config.has_option('zuul', 'state_dir'): + state_dir = os.path.expanduser(self.config.get('zuul', + 'state_dir')) + else: + state_dir = '/var/lib/zuul' + d = os.path.join(state_dir, 'times') + if not os.path.exists(d): + os.mkdir(d) + return d + def _save_queue(self): pickle_file = self._get_queue_pickle_file() events = [] @@ -1069,6 +1083,11 @@ class Scheduler(threading.Thread): self.log.warning("Build %s is not associated with a pipeline" % (build,)) return + try: + build.estimated_time = float(self.time_database.getEstimatedTime( + build.job.name)) + except Exception: + self.log.exception("Exception estimating build time:") pipeline.manager.onBuildStarted(event.build) def _doBuildCompletedEvent(self, event): @@ -1082,6 +1101,12 @@ class Scheduler(threading.Thread): self.log.warning("Build %s is not associated with a pipeline" % (build,)) return + if build.end_time and build.start_time and build.result: + duration = build.end_time - build.start_time + try: + self.time_database.update(build.job.name, duration, build.result) + except Exception: + self.log.exception("Exception recording build time:") pipeline.manager.onBuildCompleted(event.build) def _doMergeCompletedEvent(self, event):