diff --git a/tests/fixtures/layout-rate-limit.yaml b/tests/fixtures/layout-rate-limit.yaml new file mode 100644 index 0000000000..9f6748c98d --- /dev/null +++ b/tests/fixtures/layout-rate-limit.yaml @@ -0,0 +1,32 @@ +pipelines: + - name: gate + manager: DependentPipelineManager + failure-message: Build failed. For information on how to proceed, see http://wiki.example.org/Test_Failures + trigger: + gerrit: + - event: comment-added + approval: + - approved: 1 + start: + gerrit: + verified: 0 + success: + gerrit: + verified: 2 + submit: true + failure: + gerrit: + verified: -2 + window: 2 + window-floor: 1 + window-increase-type: linear + window-increase-factor: 1 + window-decrease-type: exponential + window-decrease-factor: 2 + +projects: + - name: org/project + gate: + - project-merge: + - project-test1 + - project-test2 diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py index 0b9e5a8a91..0ce0f88c60 100755 --- a/tests/test_scheduler.py +++ b/tests/test_scheduler.py @@ -3301,3 +3301,167 @@ class TestScheduler(testtools.TestCase): self.worker.hold_jobs_in_build = False self.worker.release() self.waitUntilSettled() + + def test_queue_rate_limiting(self): + "Test that DependentPipelines are rate limited with dep across window" + self.config.set('zuul', 'layout_config', + 'tests/fixtures/layout-rate-limit.yaml') + self.sched.reconfigure(self.config) + self.worker.hold_jobs_in_build = True + A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A') + B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B') + C = self.fake_gerrit.addFakeChange('org/project', 'master', 'C') + + C.setDependsOn(B, 1) + self.worker.addFailTest('project-test1', A) + + A.addApproval('CRVW', 2) + B.addApproval('CRVW', 2) + C.addApproval('CRVW', 2) + + self.fake_gerrit.addEvent(A.addApproval('APRV', 1)) + self.fake_gerrit.addEvent(B.addApproval('APRV', 1)) + self.fake_gerrit.addEvent(C.addApproval('APRV', 1)) + self.waitUntilSettled() + + # Only A and B will have their merge jobs queued because + # window is 2. + self.assertEqual(len(self.builds), 2) + self.assertEqual(self.builds[0].name, 'project-merge') + self.assertEqual(self.builds[1].name, 'project-merge') + + self.worker.release('.*-merge') + self.waitUntilSettled() + self.worker.release('.*-merge') + self.waitUntilSettled() + + # Only A and B will have their test jobs queued because + # window is 2. + self.assertEqual(len(self.builds), 4) + self.assertEqual(self.builds[0].name, 'project-test1') + self.assertEqual(self.builds[1].name, 'project-test2') + self.assertEqual(self.builds[2].name, 'project-test1') + self.assertEqual(self.builds[3].name, 'project-test2') + + self.worker.release('project-.*') + self.waitUntilSettled() + + queue = self.sched.layout.pipelines['gate'].queues[0] + # A failed so window is reduced by 1 to 1. + self.assertEqual(queue.window, 1) + self.assertEqual(queue.window_floor, 1) + self.assertEqual(A.data['status'], 'NEW') + + # Gate is reset and only B's merge job is queued because + # window shrunk to 1. + self.assertEqual(len(self.builds), 1) + self.assertEqual(self.builds[0].name, 'project-merge') + + self.worker.release('.*-merge') + self.waitUntilSettled() + + # Only B's test jobs are queued because window is still 1. + self.assertEqual(len(self.builds), 2) + self.assertEqual(self.builds[0].name, 'project-test1') + self.assertEqual(self.builds[1].name, 'project-test2') + + self.worker.release('project-.*') + self.waitUntilSettled() + + # B was successfully merged so window is increased to 2. + self.assertEqual(queue.window, 2) + self.assertEqual(queue.window_floor, 1) + self.assertEqual(B.data['status'], 'MERGED') + + # Only C is left and its merge job is queued. + self.assertEqual(len(self.builds), 1) + self.assertEqual(self.builds[0].name, 'project-merge') + + self.worker.release('.*-merge') + self.waitUntilSettled() + + # After successful merge job the test jobs for C are queued. + self.assertEqual(len(self.builds), 2) + self.assertEqual(self.builds[0].name, 'project-test1') + self.assertEqual(self.builds[1].name, 'project-test2') + + self.worker.release('project-.*') + self.waitUntilSettled() + + # C successfully merged so window is bumped to 3. + self.assertEqual(queue.window, 3) + self.assertEqual(queue.window_floor, 1) + self.assertEqual(C.data['status'], 'MERGED') + + def test_queue_rate_limiting_dependent(self): + "Test that DependentPipelines are rate limited with dep in window" + self.config.set('zuul', 'layout_config', + 'tests/fixtures/layout-rate-limit.yaml') + self.sched.reconfigure(self.config) + self.worker.hold_jobs_in_build = True + A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A') + B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B') + C = self.fake_gerrit.addFakeChange('org/project', 'master', 'C') + + B.setDependsOn(A, 1) + + self.worker.addFailTest('project-test1', A) + + A.addApproval('CRVW', 2) + B.addApproval('CRVW', 2) + C.addApproval('CRVW', 2) + + self.fake_gerrit.addEvent(A.addApproval('APRV', 1)) + self.fake_gerrit.addEvent(B.addApproval('APRV', 1)) + self.fake_gerrit.addEvent(C.addApproval('APRV', 1)) + self.waitUntilSettled() + + # Only A and B will have their merge jobs queued because + # window is 2. + self.assertEqual(len(self.builds), 2) + self.assertEqual(self.builds[0].name, 'project-merge') + self.assertEqual(self.builds[1].name, 'project-merge') + + self.worker.release('.*-merge') + self.waitUntilSettled() + self.worker.release('.*-merge') + self.waitUntilSettled() + + # Only A and B will have their test jobs queued because + # window is 2. + self.assertEqual(len(self.builds), 4) + self.assertEqual(self.builds[0].name, 'project-test1') + self.assertEqual(self.builds[1].name, 'project-test2') + self.assertEqual(self.builds[2].name, 'project-test1') + self.assertEqual(self.builds[3].name, 'project-test2') + + self.worker.release('project-.*') + self.waitUntilSettled() + + queue = self.sched.layout.pipelines['gate'].queues[0] + # A failed so window is reduced by 1 to 1. + self.assertEqual(queue.window, 1) + self.assertEqual(queue.window_floor, 1) + self.assertEqual(A.data['status'], 'NEW') + self.assertEqual(B.data['status'], 'NEW') + + # Gate is reset and only C's merge job is queued because + # window shrunk to 1 and A and B were dequeued. + self.assertEqual(len(self.builds), 1) + self.assertEqual(self.builds[0].name, 'project-merge') + + self.worker.release('.*-merge') + self.waitUntilSettled() + + # Only C's test jobs are queued because window is still 1. + self.assertEqual(len(self.builds), 2) + self.assertEqual(self.builds[0].name, 'project-test1') + self.assertEqual(self.builds[1].name, 'project-test2') + + self.worker.release('project-.*') + self.waitUntilSettled() + + # C was successfully merged so window is increased to 2. + self.assertEqual(queue.window, 2) + self.assertEqual(queue.window_floor, 1) + self.assertEqual(C.data['status'], 'MERGED') diff --git a/zuul/layoutvalidator.py b/zuul/layoutvalidator.py index 1c1a670e39..de1aec4d53 100644 --- a/zuul/layoutvalidator.py +++ b/zuul/layoutvalidator.py @@ -60,6 +60,10 @@ class LayoutSchema(object): 'subject': str, }, } + window = v.All(int, v.Range(min=1)) + window_floor = v.All(int, v.Range(min=1)) + window_type = v.Any('linear', 'exponential') + window_factor = v.All(int, v.Range(min=1)) pipeline = {v.Required('name'): str, v.Required('manager'): manager, @@ -72,6 +76,12 @@ class LayoutSchema(object): 'success': report_actions, 'failure': report_actions, 'start': report_actions, + 'window': window, + 'window-floor': window_floor, + 'window-increase-type': window_type, + 'window-increase-factor': window_factor, + 'window-decrease-type': window_type, + 'window-decrease-factor': window_factor, } pipelines = [pipeline] diff --git a/zuul/model.py b/zuul/model.py index 3bc284e3ea..904e8f3516 100644 --- a/zuul/model.py +++ b/zuul/model.py @@ -59,6 +59,12 @@ class Pipeline(object): self.start_actions = None self.success_actions = None self.failure_actions = None + self.window = None + self.window_floor = None + self.window_increase_type = None + self.window_increase_factor = None + self.window_decrease_type = None + self.window_decrease_factor = None def __repr__(self): return '' % self.name @@ -375,13 +381,21 @@ class ChangeQueue(object): a queue shared by interrelated projects foo and bar, and a second queue for independent project baz. Pipelines have one or more PipelineQueues.""" - def __init__(self, pipeline, dependent=True): + def __init__(self, pipeline, dependent=True, window=0, window_floor=1, + window_increase_type='linear', window_increase_factor=1, + window_decrease_type='exponential', window_decrease_factor=2): self.pipeline = pipeline self.name = '' self.projects = [] self._jobs = set() self.queue = [] self.dependent = dependent + self.window = window + self.window_floor = window_floor + self.window_increase_type = window_increase_type + self.window_increase_factor = window_increase_factor + self.window_decrease_type = window_decrease_type + self.window_decrease_factor = window_decrease_factor def __repr__(self): return '' % (self.pipeline.name, self.name) @@ -398,7 +412,7 @@ class ChangeQueue(object): self._jobs |= set(self.pipeline.getJobTree(project).getJobs()) def enqueueChange(self, change): - item = QueueItem(self.pipeline, change) + item = QueueItem(self, self.pipeline, change) self.enqueueItem(item) item.enqueue_time = time.time() return item @@ -444,6 +458,32 @@ class ChangeQueue(object): def mergeChangeQueue(self, other): for project in other.projects: self.addProject(project) + self.window = min(self.window, other.window) + # TODO merge semantics + + def getActionableItems(self): + if self.dependent and self.window: + return self.queue[:self.window] + else: + return self.queue[:] + + def increaseWindowSize(self): + if self.dependent: + if self.window_increase_type == 'linear': + self.window += self.window_increase_factor + elif self.window_increase_type == 'exponential': + self.window *= self.window_increase_factor + + def decreaseWindowSize(self): + if self.dependent: + if self.window_decrease_type == 'linear': + self.window = max( + self.window_floor, + self.window - self.window_decrease_factor) + elif self.window_decrease_type == 'exponential': + self.window = max( + self.window_floor, + self.window / self.window_decrease_factor) class Project(object): @@ -619,7 +659,8 @@ class BuildSet(object): class QueueItem(object): """A changish inside of a Pipeline queue""" - def __init__(self, pipeline, change): + def __init__(self, change_queue, pipeline, change): + self.change_queue = change_queue self.pipeline = pipeline self.change = change # a changeish self.build_sets = [] diff --git a/zuul/scheduler.py b/zuul/scheduler.py index 5be01013b3..7ca1e35cb2 100644 --- a/zuul/scheduler.py +++ b/zuul/scheduler.py @@ -200,6 +200,17 @@ class Scheduler(threading.Thread): pipeline.success_actions = action_reporters['success'] pipeline.failure_actions = action_reporters['failure'] + pipeline.window = conf_pipeline.get('window', 20) + pipeline.window_floor = conf_pipeline.get('window-floor', 3) + pipeline.window_increase_type = conf_pipeline.get( + 'window-increase-type', 'linear') + pipeline.window_increase_factor = conf_pipeline.get( + 'window-increase-factor', 1) + pipeline.window_decrease_type = conf_pipeline.get( + 'window-decrease-type', 'exponential') + pipeline.window_decrease_factor = conf_pipeline.get( + 'window-decrease-factor', 2) + manager = globals()[conf_pipeline['manager']](self, pipeline) pipeline.setManager(manager) layout.pipelines[conf_pipeline['name']] = pipeline @@ -1172,7 +1183,7 @@ class BasePipelineManager(object): for queue in self.pipeline.queues: queue_changed = False nnfi = None # Nearest non-failing item - for item in queue.queue[:]: + for item in queue.getActionableItems(): item_changed, nnfi = self._processOneItem(item, nnfi) if item_changed: queue_changed = True @@ -1247,7 +1258,16 @@ class BasePipelineManager(object): if not (succeeded and merged): self.log.debug("Reported change %s failed tests or failed " "to merge" % (item.change)) + item.change_queue.decreaseWindowSize() + self.log.debug("%s window size decreased to %s" % + (item.change_queue, + item.change_queue.window)) raise MergeFailure("Change %s failed to merge" % item.change) + else: + item.change_queue.increaseWindowSize() + self.log.debug("%s window size increased to %s" % + (item.change_queue, + item.change_queue.window)) def _reportItem(self, item): if item.reported: @@ -1504,7 +1524,14 @@ class DependentPipelineManager(BasePipelineManager): change_queues = [] for project in self.pipeline.getProjects(): - change_queue = ChangeQueue(self.pipeline) + change_queue = ChangeQueue( + self.pipeline, + window=self.pipeline.window, + window_floor=self.pipeline.window_floor, + window_increase_type=self.pipeline.window_increase_type, + window_increase_factor=self.pipeline.window_increase_factor, + window_decrease_type=self.pipeline.window_decrease_type, + window_decrease_factor=self.pipeline.window_decrease_factor) change_queue.addProject(project) change_queues.append(change_queue) self.log.debug("Created queue: %s" % change_queue)