Estimate job runtimes internally

Rather than relying on the workers to supply estimated job runtimes,
record the last 10 successful run times for each job and use their
average to estimate future runs.  This means that workers (which may
be highly distributed and lack access to a substantial job history)
no longer need to provide these values; instead the central scheduler,
which already sees every job run time and is therefore better placed
to do so, supplies them.

Failure times and a scoreboard of recent results are also kept for
each job, for potential future use in evaluating the likelihood of
job success.

Change-Id: If0955e15a3da9eb842dbee02a4750a177a092d3e
James E. Blair 2016-05-19 15:21:52 -07:00
parent 7c21047824
commit ce8a213fcd
6 changed files with 253 additions and 38 deletions
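To make the mechanism concrete, here is a minimal standalone sketch of the
rolling-window estimator described in the commit message.  The names are
illustrative only and are not the classes added by this change (those are
JobTimeData and TimeDataBase, below):

    import collections

    class RollingEstimate(object):
        """Keep the last N successful durations and average them."""

        def __init__(self, window=10):
            self.successes = collections.deque(maxlen=window)

        def record(self, elapsed, result):
            # Only successful runs contribute to the estimate.
            if result == 'SUCCESS':
                self.successes.append(float(elapsed))

        def estimate(self):
            # 0.0 means "no data yet"; callers can treat that as unknown.
            if not self.successes:
                return 0.0
            return sum(self.successes) / len(self.successes)

    # Two successes of 50s and 100s average to an estimate of 75s.
    e = RollingEstimate()
    e.record(50, 'SUCCESS')
    e.record(100, 'SUCCESS')
    assert e.estimate() == 75.0

The implementation below additionally persists the window to disk per job
name, so estimates survive scheduler restarts.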

View File

@@ -876,11 +876,13 @@ class ZuulTestCase(BaseTestCase):
        self.test_root = os.path.join(tmp_root, "zuul-test")
        self.upstream_root = os.path.join(self.test_root, "upstream")
        self.git_root = os.path.join(self.test_root, "git")
        self.state_root = os.path.join(self.test_root, "lib")
        if os.path.exists(self.test_root):
            shutil.rmtree(self.test_root)
        os.makedirs(self.test_root)
        os.makedirs(self.upstream_root)
        os.makedirs(self.state_root)
        # Make per test copy of Configuration.
        self.setup_config()
@@ -888,6 +890,7 @@ class ZuulTestCase(BaseTestCase):
            os.path.join(FIXTURE_DIR,
                         self.config.get('zuul', 'layout_config')))
        self.config.set('merger', 'git_dir', self.git_root)
        self.config.set('zuul', 'state_dir', self.state_root)
        # For each project in config:
        self.init_repo("org/project")
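The state_root plumbing above mirrors what a real deployment does: the
scheduler resolves the existing [zuul] state_dir option (already used for
the queue pickle) and keeps the times database in a "times" subdirectory
beneath it.  A rough sketch of that resolution, with illustrative paths:

    import os
    try:
        import configparser                      # Python 3
    except ImportError:
        import ConfigParser as configparser      # Python 2, as used by Zuul at this time

    config = configparser.ConfigParser()
    config.add_section('zuul')
    config.set('zuul', 'state_dir', '/var/lib/zuul')

    state_dir = os.path.expanduser(config.get('zuul', 'state_dir'))
    times_dir = os.path.join(state_dir, 'times')   # one small binary file per job
    print(times_dir)                               # /var/lib/zuul/times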

View File

@@ -12,6 +12,11 @@
# License for the specific language governing permissions and limitations
# under the License.
import os
import random
import fixtures
from zuul import change_matcher as cm
from zuul import model
@@ -62,3 +67,76 @@ class TestJob(BaseTestCase):
        metajob = model.Job('^job')
        job.copy(metajob)
        self._assert_job_booleans_are_not_none(job)


class TestJobTimeData(BaseTestCase):
    def setUp(self):
        super(TestJobTimeData, self).setUp()
        self.tmp_root = self.useFixture(fixtures.TempDir(
            rootdir=os.environ.get("ZUUL_TEST_ROOT"))
        ).path

    def test_empty_timedata(self):
        path = os.path.join(self.tmp_root, 'job-name')
        self.assertFalse(os.path.exists(path))
        self.assertFalse(os.path.exists(path + '.tmp'))
        td = model.JobTimeData(path)
        self.assertEqual(td.success_times, [0, 0, 0, 0, 0, 0, 0, 0, 0, 0])
        self.assertEqual(td.failure_times, [0, 0, 0, 0, 0, 0, 0, 0, 0, 0])
        self.assertEqual(td.results, [0, 0, 0, 0, 0, 0, 0, 0, 0, 0])

    def test_save_reload(self):
        path = os.path.join(self.tmp_root, 'job-name')
        self.assertFalse(os.path.exists(path))
        self.assertFalse(os.path.exists(path + '.tmp'))
        td = model.JobTimeData(path)
        self.assertEqual(td.success_times, [0, 0, 0, 0, 0, 0, 0, 0, 0, 0])
        self.assertEqual(td.failure_times, [0, 0, 0, 0, 0, 0, 0, 0, 0, 0])
        self.assertEqual(td.results, [0, 0, 0, 0, 0, 0, 0, 0, 0, 0])
        success_times = []
        failure_times = []
        results = []
        for x in range(10):
            success_times.append(int(random.random() * 1000))
            failure_times.append(int(random.random() * 1000))
            results.append(0)
            results.append(1)
        random.shuffle(results)
        s = f = 0
        for result in results:
            if result:
                td.add(failure_times[f], 'FAILURE')
                f += 1
            else:
                td.add(success_times[s], 'SUCCESS')
                s += 1
        self.assertEqual(td.success_times, success_times)
        self.assertEqual(td.failure_times, failure_times)
        self.assertEqual(td.results, results[10:])
        td.save()
        self.assertTrue(os.path.exists(path))
        self.assertFalse(os.path.exists(path + '.tmp'))
        td = model.JobTimeData(path)
        td.load()
        self.assertEqual(td.success_times, success_times)
        self.assertEqual(td.failure_times, failure_times)
        self.assertEqual(td.results, results[10:])


class TestTimeDataBase(BaseTestCase):
    def setUp(self):
        super(TestTimeDataBase, self).setUp()
        self.tmp_root = self.useFixture(fixtures.TempDir(
            rootdir=os.environ.get("ZUUL_TEST_ROOT"))
        ).path
        self.db = model.TimeDataBase(self.tmp_root)

    def test_timedatabase(self):
        self.assertEqual(self.db.getEstimatedTime('job-name'), 0)
        self.db.update('job-name', 50, 'SUCCESS')
        self.assertEqual(self.db.getEstimatedTime('job-name'), 50)
        self.db.update('job-name', 100, 'SUCCESS')
        self.assertEqual(self.db.getEstimatedTime('job-name'), 75)
        for x in range(10):
            self.db.update('job-name', 100, 'SUCCESS')
        self.assertEqual(self.db.getEstimatedTime('job-name'), 100)
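The expected values in test_timedatabase follow from the fact that unfilled
(zero) slots in the window are ignored when averaging: after recording 50
and 100 the estimate is the mean of just those two runs, and after ten more
100-second successes the window holds nothing but 100s.  Illustratively:

    window = [0, 0, 0, 0, 0, 0, 0, 0, 50, 100]   # two successes recorded so far
    nonzero = [t for t in window if t]
    print(float(sum(nonzero)) / len(nonzero))     # 75.0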

View File

@@ -34,7 +34,6 @@ import zuul.reporter.gerrit
import zuul.reporter.smtp

from tests.base import (
    BaseTestCase,
    ZuulTestCase,
    repack_repo,
)
@@ -44,40 +43,6 @@ logging.basicConfig(level=logging.DEBUG,
                    '%(levelname)-8s %(message)s')


class TestSchedulerConfigParsing(BaseTestCase):

    def test_parse_skip_if(self):
        job_yaml = """
jobs:
  - name: job_name
    skip-if:
      - project: ^project_name$
        branch: ^stable/icehouse$
        all-files-match-any:
          - ^filename$
      - project: ^project2_name$
        all-files-match-any:
          - ^filename2$
        """.strip()
        data = yaml.load(job_yaml)
        config_job = data.get('jobs')[0]
        sched = zuul.scheduler.Scheduler({})
        cm = zuul.change_matcher
        expected = cm.MatchAny([
            cm.MatchAll([
                cm.ProjectMatcher('^project_name$'),
                cm.BranchMatcher('^stable/icehouse$'),
                cm.MatchAllFiles([cm.FileMatcher('^filename$')]),
            ]),
            cm.MatchAll([
                cm.ProjectMatcher('^project2_name$'),
                cm.MatchAllFiles([cm.FileMatcher('^filename2$')]),
            ]),
        ])
        matcher = sched._parseSkipIf(config_job)
        self.assertEqual(expected, matcher)


class TestScheduler(ZuulTestCase):

    def test_jobs_launched(self):
@@ -495,6 +460,46 @@ class TestScheduler(ZuulTestCase):
        self.assertEqual(B.reported, 2)
        self.assertEqual(C.reported, 2)

    def _test_time_database(self, iteration):
        self.worker.hold_jobs_in_build = True
        A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
        A.addApproval('CRVW', 2)
        self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
        self.waitUntilSettled()
        time.sleep(2)
        data = json.loads(self.sched.formatStatusJSON())
        found_job = None
        for pipeline in data['pipelines']:
            if pipeline['name'] != 'gate':
                continue
            for queue in pipeline['change_queues']:
                for head in queue['heads']:
                    for item in head:
                        for job in item['jobs']:
                            if job['name'] == 'project-merge':
                                found_job = job
                                break

        self.assertIsNotNone(found_job)
        if iteration == 1:
            self.assertIsNotNone(found_job['estimated_time'])
            self.assertIsNone(found_job['remaining_time'])
        else:
            self.assertIsNotNone(found_job['estimated_time'])
            self.assertTrue(found_job['estimated_time'] >= 2)
            self.assertIsNotNone(found_job['remaining_time'])

        self.worker.hold_jobs_in_build = False
        self.worker.release()
        self.waitUntilSettled()

    def test_time_database(self):
        "Test the time database"
        self._test_time_database(1)
        self._test_time_database(2)

    def test_two_failed_changes_at_head(self):
        "Test that changes are reparented correctly if 2 fail at head"
@@ -600,6 +605,36 @@ class TestScheduler(ZuulTestCase):
        self.assertEqual(B.reported, 2)
        self.assertEqual(C.reported, 2)

    def test_parse_skip_if(self):
        job_yaml = """
jobs:
  - name: job_name
    skip-if:
      - project: ^project_name$
        branch: ^stable/icehouse$
        all-files-match-any:
          - ^filename$
      - project: ^project2_name$
        all-files-match-any:
          - ^filename2$
        """.strip()
        data = yaml.load(job_yaml)
        config_job = data.get('jobs')[0]
        cm = zuul.change_matcher
        expected = cm.MatchAny([
            cm.MatchAll([
                cm.ProjectMatcher('^project_name$'),
                cm.BranchMatcher('^stable/icehouse$'),
                cm.MatchAllFiles([cm.FileMatcher('^filename$')]),
            ]),
            cm.MatchAll([
                cm.ProjectMatcher('^project2_name$'),
                cm.MatchAllFiles([cm.FileMatcher('^filename2$')]),
            ]),
        ])
        matcher = self.sched._parseSkipIf(config_job)
        self.assertEqual(expected, matcher)

    def test_patch_order(self):
        "Test that dependent patches are tested in the right order"
        A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')

View File

@@ -456,9 +456,6 @@ class Gearman(object):
                build.number = data.get('number')
                build.__gearman_manager = data.get('manager')
                self.sched.onBuildStarted(build)

                if job.denominator:
                    build.estimated_time = float(job.denominator) / 1000
        else:
            self.log.error("Unable to find build %s" % job.unique)

View File

@@ -13,7 +13,9 @@
# under the License.

import copy
import os
import re
import struct
import time
from uuid import uuid4

import extras
@@ -1380,3 +1382,78 @@ class Layout(object):
                    job.copy(metajob)
            self.jobs[name] = job
        return job


class JobTimeData(object):
    format = 'B10H10H10B'
    version = 0

    def __init__(self, path):
        self.path = path
        self.success_times = [0 for x in range(10)]
        self.failure_times = [0 for x in range(10)]
        self.results = [0 for x in range(10)]

    def load(self):
        if not os.path.exists(self.path):
            return
        with open(self.path) as f:
            data = struct.unpack(self.format, f.read())
        version = data[0]
        if version != self.version:
raise Exception("Unkown data version")
        self.success_times = list(data[1:11])
        self.failure_times = list(data[11:21])
        self.results = list(data[21:32])

    def save(self):
        tmpfile = self.path + '.tmp'
        data = [self.version]
        data.extend(self.success_times)
        data.extend(self.failure_times)
        data.extend(self.results)
        data = struct.pack(self.format, *data)
        with open(tmpfile, 'w') as f:
            f.write(data)
        os.rename(tmpfile, self.path)

    def add(self, elapsed, result):
        elapsed = int(elapsed)
        if result == 'SUCCESS':
            self.success_times.append(elapsed)
            self.success_times.pop(0)
            result = 0
        else:
            self.failure_times.append(elapsed)
            self.failure_times.pop(0)
            result = 1
        self.results.append(result)
        self.results.pop(0)

    def getEstimatedTime(self):
        times = [x for x in self.success_times if x]
        if times:
            return float(sum(times)) / len(times)
        return 0.0


class TimeDataBase(object):
    def __init__(self, root):
        self.root = root
        self.jobs = {}

    def _getTD(self, name):
        td = self.jobs.get(name)
        if not td:
            td = JobTimeData(os.path.join(self.root, name))
            self.jobs[name] = td
            td.load()
        return td

    def getEstimatedTime(self, name):
        return self._getTD(name).getEstimatedTime()

    def update(self, name, elapsed, result):
        td = self._getTD(name)
        td.add(elapsed, result)
        td.save()
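For reference, the 'B10H10H10B' format above packs one version byte, ten
16-bit success times, ten 16-bit failure times, and ten result bytes into a
single small record, so each stored duration is capped at 65535 seconds.
A standalone round-trip with made-up values:

    import struct

    fmt = 'B10H10H10B'
    success = [300, 310, 295, 305, 290, 300, 315, 298, 302, 307]  # seconds
    failure = [0] * 10
    results = [0] * 10                                            # 0 = SUCCESS, 1 = FAILURE

    packed = struct.pack(fmt, 0, *(success + failure + results))
    data = struct.unpack(fmt, packed)
    assert data[0] == 0                     # version byte
    assert list(data[1:11]) == success      # the ten most recent successful durations
    assert list(data[11:21]) == failure
    assert list(data[21:31]) == results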

View File

@@ -262,6 +262,9 @@ class Scheduler(threading.Thread):
        self.management_event_queue = Queue.Queue()
        self.layout = model.Layout()

        time_dir = self._get_time_database_dir()
        self.time_database = model.TimeDataBase(time_dir)

        self.zuul_version = zuul_version.version_info.release_string()
        self.last_reconfigured = None
@@ -740,6 +743,17 @@ class Scheduler(threading.Thread):
            state_dir = '/var/lib/zuul'
        return os.path.join(state_dir, 'queue.pickle')

    def _get_time_database_dir(self):
        if self.config.has_option('zuul', 'state_dir'):
            state_dir = os.path.expanduser(self.config.get('zuul',
                                                           'state_dir'))
        else:
            state_dir = '/var/lib/zuul'
        d = os.path.join(state_dir, 'times')
        if not os.path.exists(d):
            os.mkdir(d)
        return d

    def _save_queue(self):
        pickle_file = self._get_queue_pickle_file()
        events = []
@@ -1069,6 +1083,11 @@ class Scheduler(threading.Thread):
            self.log.warning("Build %s is not associated with a pipeline" %
                             (build,))
            return
        try:
            build.estimated_time = float(self.time_database.getEstimatedTime(
                build.job.name))
        except Exception:
            self.log.exception("Exception estimating build time:")
        pipeline.manager.onBuildStarted(event.build)

    def _doBuildCompletedEvent(self, event):
@@ -1082,6 +1101,12 @@ class Scheduler(threading.Thread):
            self.log.warning("Build %s is not associated with a pipeline" %
                             (build,))
            return
        if build.end_time and build.start_time and build.result:
            duration = build.end_time - build.start_time
            try:
                self.time_database.update(build.job.name, duration, build.result)
            except Exception:
                self.log.exception("Exception recording build time:")
        pipeline.manager.onBuildCompleted(event.build)

    def _doMergeCompletedEvent(self, event):
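Note that the hunks above only set build.estimated_time; the remaining_time
value asserted in the status-JSON test is computed elsewhere and is not part
of this diff.  A hedged sketch of the obvious derivation, with illustrative
names, would be:

    import time

    def remaining_seconds(start_time, estimated_time):
        # Rough time left for a running build; None until both inputs exist.
        if not start_time or not estimated_time:
            return None
        elapsed = time.time() - start_time
        return max(estimated_time - elapsed, 0.0)

    # A build that started 30 seconds ago with a 75-second estimate
    # has roughly 45 seconds remaining.
    print(remaining_seconds(time.time() - 30, 75.0))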