Add rate limiting to dependent pipeline queues

When changes report failure reduce the total number of changes that will
be tested concurrently in the pipeline queue the reported change
belonged too. Increase the number of changes that will be tested when
changes report success. This implements simple rate limiting which
should reduce resource thrash when tests are unstable.

Change-Id: Id092446c83649b3916751c4e4665d2adc75d0458
This commit is contained in:
Clark Boylan 2014-01-21 11:43:20 -08:00
parent f39b9ca80c
commit 7603a37725
5 changed files with 279 additions and 5 deletions

32
tests/fixtures/layout-rate-limit.yaml vendored Normal file
View File

@ -0,0 +1,32 @@
pipelines:
- name: gate
manager: DependentPipelineManager
failure-message: Build failed. For information on how to proceed, see http://wiki.example.org/Test_Failures
trigger:
gerrit:
- event: comment-added
approval:
- approved: 1
start:
gerrit:
verified: 0
success:
gerrit:
verified: 2
submit: true
failure:
gerrit:
verified: -2
window: 2
window-floor: 1
window-increase-type: linear
window-increase-factor: 1
window-decrease-type: exponential
window-decrease-factor: 2
projects:
- name: org/project
gate:
- project-merge:
- project-test1
- project-test2

View File

@ -3301,3 +3301,167 @@ class TestScheduler(testtools.TestCase):
self.worker.hold_jobs_in_build = False
self.worker.release()
self.waitUntilSettled()
def test_queue_rate_limiting(self):
"Test that DependentPipelines are rate limited with dep across window"
self.config.set('zuul', 'layout_config',
'tests/fixtures/layout-rate-limit.yaml')
self.sched.reconfigure(self.config)
self.worker.hold_jobs_in_build = True
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
C = self.fake_gerrit.addFakeChange('org/project', 'master', 'C')
C.setDependsOn(B, 1)
self.worker.addFailTest('project-test1', A)
A.addApproval('CRVW', 2)
B.addApproval('CRVW', 2)
C.addApproval('CRVW', 2)
self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
self.fake_gerrit.addEvent(B.addApproval('APRV', 1))
self.fake_gerrit.addEvent(C.addApproval('APRV', 1))
self.waitUntilSettled()
# Only A and B will have their merge jobs queued because
# window is 2.
self.assertEqual(len(self.builds), 2)
self.assertEqual(self.builds[0].name, 'project-merge')
self.assertEqual(self.builds[1].name, 'project-merge')
self.worker.release('.*-merge')
self.waitUntilSettled()
self.worker.release('.*-merge')
self.waitUntilSettled()
# Only A and B will have their test jobs queued because
# window is 2.
self.assertEqual(len(self.builds), 4)
self.assertEqual(self.builds[0].name, 'project-test1')
self.assertEqual(self.builds[1].name, 'project-test2')
self.assertEqual(self.builds[2].name, 'project-test1')
self.assertEqual(self.builds[3].name, 'project-test2')
self.worker.release('project-.*')
self.waitUntilSettled()
queue = self.sched.layout.pipelines['gate'].queues[0]
# A failed so window is reduced by 1 to 1.
self.assertEqual(queue.window, 1)
self.assertEqual(queue.window_floor, 1)
self.assertEqual(A.data['status'], 'NEW')
# Gate is reset and only B's merge job is queued because
# window shrunk to 1.
self.assertEqual(len(self.builds), 1)
self.assertEqual(self.builds[0].name, 'project-merge')
self.worker.release('.*-merge')
self.waitUntilSettled()
# Only B's test jobs are queued because window is still 1.
self.assertEqual(len(self.builds), 2)
self.assertEqual(self.builds[0].name, 'project-test1')
self.assertEqual(self.builds[1].name, 'project-test2')
self.worker.release('project-.*')
self.waitUntilSettled()
# B was successfully merged so window is increased to 2.
self.assertEqual(queue.window, 2)
self.assertEqual(queue.window_floor, 1)
self.assertEqual(B.data['status'], 'MERGED')
# Only C is left and its merge job is queued.
self.assertEqual(len(self.builds), 1)
self.assertEqual(self.builds[0].name, 'project-merge')
self.worker.release('.*-merge')
self.waitUntilSettled()
# After successful merge job the test jobs for C are queued.
self.assertEqual(len(self.builds), 2)
self.assertEqual(self.builds[0].name, 'project-test1')
self.assertEqual(self.builds[1].name, 'project-test2')
self.worker.release('project-.*')
self.waitUntilSettled()
# C successfully merged so window is bumped to 3.
self.assertEqual(queue.window, 3)
self.assertEqual(queue.window_floor, 1)
self.assertEqual(C.data['status'], 'MERGED')
def test_queue_rate_limiting_dependent(self):
"Test that DependentPipelines are rate limited with dep in window"
self.config.set('zuul', 'layout_config',
'tests/fixtures/layout-rate-limit.yaml')
self.sched.reconfigure(self.config)
self.worker.hold_jobs_in_build = True
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
C = self.fake_gerrit.addFakeChange('org/project', 'master', 'C')
B.setDependsOn(A, 1)
self.worker.addFailTest('project-test1', A)
A.addApproval('CRVW', 2)
B.addApproval('CRVW', 2)
C.addApproval('CRVW', 2)
self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
self.fake_gerrit.addEvent(B.addApproval('APRV', 1))
self.fake_gerrit.addEvent(C.addApproval('APRV', 1))
self.waitUntilSettled()
# Only A and B will have their merge jobs queued because
# window is 2.
self.assertEqual(len(self.builds), 2)
self.assertEqual(self.builds[0].name, 'project-merge')
self.assertEqual(self.builds[1].name, 'project-merge')
self.worker.release('.*-merge')
self.waitUntilSettled()
self.worker.release('.*-merge')
self.waitUntilSettled()
# Only A and B will have their test jobs queued because
# window is 2.
self.assertEqual(len(self.builds), 4)
self.assertEqual(self.builds[0].name, 'project-test1')
self.assertEqual(self.builds[1].name, 'project-test2')
self.assertEqual(self.builds[2].name, 'project-test1')
self.assertEqual(self.builds[3].name, 'project-test2')
self.worker.release('project-.*')
self.waitUntilSettled()
queue = self.sched.layout.pipelines['gate'].queues[0]
# A failed so window is reduced by 1 to 1.
self.assertEqual(queue.window, 1)
self.assertEqual(queue.window_floor, 1)
self.assertEqual(A.data['status'], 'NEW')
self.assertEqual(B.data['status'], 'NEW')
# Gate is reset and only C's merge job is queued because
# window shrunk to 1 and A and B were dequeued.
self.assertEqual(len(self.builds), 1)
self.assertEqual(self.builds[0].name, 'project-merge')
self.worker.release('.*-merge')
self.waitUntilSettled()
# Only C's test jobs are queued because window is still 1.
self.assertEqual(len(self.builds), 2)
self.assertEqual(self.builds[0].name, 'project-test1')
self.assertEqual(self.builds[1].name, 'project-test2')
self.worker.release('project-.*')
self.waitUntilSettled()
# C was successfully merged so window is increased to 2.
self.assertEqual(queue.window, 2)
self.assertEqual(queue.window_floor, 1)
self.assertEqual(C.data['status'], 'MERGED')

View File

@ -60,6 +60,10 @@ class LayoutSchema(object):
'subject': str,
},
}
window = v.All(int, v.Range(min=1))
window_floor = v.All(int, v.Range(min=1))
window_type = v.Any('linear', 'exponential')
window_factor = v.All(int, v.Range(min=1))
pipeline = {v.Required('name'): str,
v.Required('manager'): manager,
@ -72,6 +76,12 @@ class LayoutSchema(object):
'success': report_actions,
'failure': report_actions,
'start': report_actions,
'window': window,
'window-floor': window_floor,
'window-increase-type': window_type,
'window-increase-factor': window_factor,
'window-decrease-type': window_type,
'window-decrease-factor': window_factor,
}
pipelines = [pipeline]

View File

@ -59,6 +59,12 @@ class Pipeline(object):
self.start_actions = None
self.success_actions = None
self.failure_actions = None
self.window = None
self.window_floor = None
self.window_increase_type = None
self.window_increase_factor = None
self.window_decrease_type = None
self.window_decrease_factor = None
def __repr__(self):
return '<Pipeline %s>' % self.name
@ -375,13 +381,21 @@ class ChangeQueue(object):
a queue shared by interrelated projects foo and bar, and a second
queue for independent project baz. Pipelines have one or more
PipelineQueues."""
def __init__(self, pipeline, dependent=True):
def __init__(self, pipeline, dependent=True, window=0, window_floor=1,
window_increase_type='linear', window_increase_factor=1,
window_decrease_type='exponential', window_decrease_factor=2):
self.pipeline = pipeline
self.name = ''
self.projects = []
self._jobs = set()
self.queue = []
self.dependent = dependent
self.window = window
self.window_floor = window_floor
self.window_increase_type = window_increase_type
self.window_increase_factor = window_increase_factor
self.window_decrease_type = window_decrease_type
self.window_decrease_factor = window_decrease_factor
def __repr__(self):
return '<ChangeQueue %s: %s>' % (self.pipeline.name, self.name)
@ -398,7 +412,7 @@ class ChangeQueue(object):
self._jobs |= set(self.pipeline.getJobTree(project).getJobs())
def enqueueChange(self, change):
item = QueueItem(self.pipeline, change)
item = QueueItem(self, self.pipeline, change)
self.enqueueItem(item)
item.enqueue_time = time.time()
return item
@ -444,6 +458,32 @@ class ChangeQueue(object):
def mergeChangeQueue(self, other):
for project in other.projects:
self.addProject(project)
self.window = min(self.window, other.window)
# TODO merge semantics
def getActionableItems(self):
if self.dependent and self.window:
return self.queue[:self.window]
else:
return self.queue[:]
def increaseWindowSize(self):
if self.dependent:
if self.window_increase_type == 'linear':
self.window += self.window_increase_factor
elif self.window_increase_type == 'exponential':
self.window *= self.window_increase_factor
def decreaseWindowSize(self):
if self.dependent:
if self.window_decrease_type == 'linear':
self.window = max(
self.window_floor,
self.window - self.window_decrease_factor)
elif self.window_decrease_type == 'exponential':
self.window = max(
self.window_floor,
self.window / self.window_decrease_factor)
class Project(object):
@ -619,7 +659,8 @@ class BuildSet(object):
class QueueItem(object):
"""A changish inside of a Pipeline queue"""
def __init__(self, pipeline, change):
def __init__(self, change_queue, pipeline, change):
self.change_queue = change_queue
self.pipeline = pipeline
self.change = change # a changeish
self.build_sets = []

View File

@ -200,6 +200,17 @@ class Scheduler(threading.Thread):
pipeline.success_actions = action_reporters['success']
pipeline.failure_actions = action_reporters['failure']
pipeline.window = conf_pipeline.get('window', 20)
pipeline.window_floor = conf_pipeline.get('window-floor', 3)
pipeline.window_increase_type = conf_pipeline.get(
'window-increase-type', 'linear')
pipeline.window_increase_factor = conf_pipeline.get(
'window-increase-factor', 1)
pipeline.window_decrease_type = conf_pipeline.get(
'window-decrease-type', 'exponential')
pipeline.window_decrease_factor = conf_pipeline.get(
'window-decrease-factor', 2)
manager = globals()[conf_pipeline['manager']](self, pipeline)
pipeline.setManager(manager)
layout.pipelines[conf_pipeline['name']] = pipeline
@ -1172,7 +1183,7 @@ class BasePipelineManager(object):
for queue in self.pipeline.queues:
queue_changed = False
nnfi = None # Nearest non-failing item
for item in queue.queue[:]:
for item in queue.getActionableItems():
item_changed, nnfi = self._processOneItem(item, nnfi)
if item_changed:
queue_changed = True
@ -1247,7 +1258,16 @@ class BasePipelineManager(object):
if not (succeeded and merged):
self.log.debug("Reported change %s failed tests or failed "
"to merge" % (item.change))
item.change_queue.decreaseWindowSize()
self.log.debug("%s window size decreased to %s" %
(item.change_queue,
item.change_queue.window))
raise MergeFailure("Change %s failed to merge" % item.change)
else:
item.change_queue.increaseWindowSize()
self.log.debug("%s window size increased to %s" %
(item.change_queue,
item.change_queue.window))
def _reportItem(self, item):
if item.reported:
@ -1504,7 +1524,14 @@ class DependentPipelineManager(BasePipelineManager):
change_queues = []
for project in self.pipeline.getProjects():
change_queue = ChangeQueue(self.pipeline)
change_queue = ChangeQueue(
self.pipeline,
window=self.pipeline.window,
window_floor=self.pipeline.window_floor,
window_increase_type=self.pipeline.window_increase_type,
window_increase_factor=self.pipeline.window_increase_factor,
window_decrease_type=self.pipeline.window_decrease_type,
window_decrease_factor=self.pipeline.window_decrease_factor)
change_queue.addProject(project)
change_queues.append(change_queue)
self.log.debug("Created queue: %s" % change_queue)