From 7603a37725344918b4edef99b8566172a3940c04 Mon Sep 17 00:00:00 2001 From: Clark Boylan Date: Tue, 21 Jan 2014 11:43:20 -0800 Subject: [PATCH] Add rate limiting to dependent pipeline queues When changes report failure reduce the total number of changes that will be tested concurrently in the pipeline queue the reported change belonged too. Increase the number of changes that will be tested when changes report success. This implements simple rate limiting which should reduce resource thrash when tests are unstable. Change-Id: Id092446c83649b3916751c4e4665d2adc75d0458 --- tests/fixtures/layout-rate-limit.yaml | 32 +++++ tests/test_scheduler.py | 164 ++++++++++++++++++++++++++ zuul/layoutvalidator.py | 10 ++ zuul/model.py | 47 +++++++- zuul/scheduler.py | 31 ++++- 5 files changed, 279 insertions(+), 5 deletions(-) create mode 100644 tests/fixtures/layout-rate-limit.yaml diff --git a/tests/fixtures/layout-rate-limit.yaml b/tests/fixtures/layout-rate-limit.yaml new file mode 100644 index 0000000000..9f6748c98d --- /dev/null +++ b/tests/fixtures/layout-rate-limit.yaml @@ -0,0 +1,32 @@ +pipelines: + - name: gate + manager: DependentPipelineManager + failure-message: Build failed. For information on how to proceed, see http://wiki.example.org/Test_Failures + trigger: + gerrit: + - event: comment-added + approval: + - approved: 1 + start: + gerrit: + verified: 0 + success: + gerrit: + verified: 2 + submit: true + failure: + gerrit: + verified: -2 + window: 2 + window-floor: 1 + window-increase-type: linear + window-increase-factor: 1 + window-decrease-type: exponential + window-decrease-factor: 2 + +projects: + - name: org/project + gate: + - project-merge: + - project-test1 + - project-test2 diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py index 0b9e5a8a91..0ce0f88c60 100755 --- a/tests/test_scheduler.py +++ b/tests/test_scheduler.py @@ -3301,3 +3301,167 @@ class TestScheduler(testtools.TestCase): self.worker.hold_jobs_in_build = False self.worker.release() self.waitUntilSettled() + + def test_queue_rate_limiting(self): + "Test that DependentPipelines are rate limited with dep across window" + self.config.set('zuul', 'layout_config', + 'tests/fixtures/layout-rate-limit.yaml') + self.sched.reconfigure(self.config) + self.worker.hold_jobs_in_build = True + A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A') + B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B') + C = self.fake_gerrit.addFakeChange('org/project', 'master', 'C') + + C.setDependsOn(B, 1) + self.worker.addFailTest('project-test1', A) + + A.addApproval('CRVW', 2) + B.addApproval('CRVW', 2) + C.addApproval('CRVW', 2) + + self.fake_gerrit.addEvent(A.addApproval('APRV', 1)) + self.fake_gerrit.addEvent(B.addApproval('APRV', 1)) + self.fake_gerrit.addEvent(C.addApproval('APRV', 1)) + self.waitUntilSettled() + + # Only A and B will have their merge jobs queued because + # window is 2. + self.assertEqual(len(self.builds), 2) + self.assertEqual(self.builds[0].name, 'project-merge') + self.assertEqual(self.builds[1].name, 'project-merge') + + self.worker.release('.*-merge') + self.waitUntilSettled() + self.worker.release('.*-merge') + self.waitUntilSettled() + + # Only A and B will have their test jobs queued because + # window is 2. + self.assertEqual(len(self.builds), 4) + self.assertEqual(self.builds[0].name, 'project-test1') + self.assertEqual(self.builds[1].name, 'project-test2') + self.assertEqual(self.builds[2].name, 'project-test1') + self.assertEqual(self.builds[3].name, 'project-test2') + + self.worker.release('project-.*') + self.waitUntilSettled() + + queue = self.sched.layout.pipelines['gate'].queues[0] + # A failed so window is reduced by 1 to 1. + self.assertEqual(queue.window, 1) + self.assertEqual(queue.window_floor, 1) + self.assertEqual(A.data['status'], 'NEW') + + # Gate is reset and only B's merge job is queued because + # window shrunk to 1. + self.assertEqual(len(self.builds), 1) + self.assertEqual(self.builds[0].name, 'project-merge') + + self.worker.release('.*-merge') + self.waitUntilSettled() + + # Only B's test jobs are queued because window is still 1. + self.assertEqual(len(self.builds), 2) + self.assertEqual(self.builds[0].name, 'project-test1') + self.assertEqual(self.builds[1].name, 'project-test2') + + self.worker.release('project-.*') + self.waitUntilSettled() + + # B was successfully merged so window is increased to 2. + self.assertEqual(queue.window, 2) + self.assertEqual(queue.window_floor, 1) + self.assertEqual(B.data['status'], 'MERGED') + + # Only C is left and its merge job is queued. + self.assertEqual(len(self.builds), 1) + self.assertEqual(self.builds[0].name, 'project-merge') + + self.worker.release('.*-merge') + self.waitUntilSettled() + + # After successful merge job the test jobs for C are queued. + self.assertEqual(len(self.builds), 2) + self.assertEqual(self.builds[0].name, 'project-test1') + self.assertEqual(self.builds[1].name, 'project-test2') + + self.worker.release('project-.*') + self.waitUntilSettled() + + # C successfully merged so window is bumped to 3. + self.assertEqual(queue.window, 3) + self.assertEqual(queue.window_floor, 1) + self.assertEqual(C.data['status'], 'MERGED') + + def test_queue_rate_limiting_dependent(self): + "Test that DependentPipelines are rate limited with dep in window" + self.config.set('zuul', 'layout_config', + 'tests/fixtures/layout-rate-limit.yaml') + self.sched.reconfigure(self.config) + self.worker.hold_jobs_in_build = True + A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A') + B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B') + C = self.fake_gerrit.addFakeChange('org/project', 'master', 'C') + + B.setDependsOn(A, 1) + + self.worker.addFailTest('project-test1', A) + + A.addApproval('CRVW', 2) + B.addApproval('CRVW', 2) + C.addApproval('CRVW', 2) + + self.fake_gerrit.addEvent(A.addApproval('APRV', 1)) + self.fake_gerrit.addEvent(B.addApproval('APRV', 1)) + self.fake_gerrit.addEvent(C.addApproval('APRV', 1)) + self.waitUntilSettled() + + # Only A and B will have their merge jobs queued because + # window is 2. + self.assertEqual(len(self.builds), 2) + self.assertEqual(self.builds[0].name, 'project-merge') + self.assertEqual(self.builds[1].name, 'project-merge') + + self.worker.release('.*-merge') + self.waitUntilSettled() + self.worker.release('.*-merge') + self.waitUntilSettled() + + # Only A and B will have their test jobs queued because + # window is 2. + self.assertEqual(len(self.builds), 4) + self.assertEqual(self.builds[0].name, 'project-test1') + self.assertEqual(self.builds[1].name, 'project-test2') + self.assertEqual(self.builds[2].name, 'project-test1') + self.assertEqual(self.builds[3].name, 'project-test2') + + self.worker.release('project-.*') + self.waitUntilSettled() + + queue = self.sched.layout.pipelines['gate'].queues[0] + # A failed so window is reduced by 1 to 1. + self.assertEqual(queue.window, 1) + self.assertEqual(queue.window_floor, 1) + self.assertEqual(A.data['status'], 'NEW') + self.assertEqual(B.data['status'], 'NEW') + + # Gate is reset and only C's merge job is queued because + # window shrunk to 1 and A and B were dequeued. + self.assertEqual(len(self.builds), 1) + self.assertEqual(self.builds[0].name, 'project-merge') + + self.worker.release('.*-merge') + self.waitUntilSettled() + + # Only C's test jobs are queued because window is still 1. + self.assertEqual(len(self.builds), 2) + self.assertEqual(self.builds[0].name, 'project-test1') + self.assertEqual(self.builds[1].name, 'project-test2') + + self.worker.release('project-.*') + self.waitUntilSettled() + + # C was successfully merged so window is increased to 2. + self.assertEqual(queue.window, 2) + self.assertEqual(queue.window_floor, 1) + self.assertEqual(C.data['status'], 'MERGED') diff --git a/zuul/layoutvalidator.py b/zuul/layoutvalidator.py index 1c1a670e39..de1aec4d53 100644 --- a/zuul/layoutvalidator.py +++ b/zuul/layoutvalidator.py @@ -60,6 +60,10 @@ class LayoutSchema(object): 'subject': str, }, } + window = v.All(int, v.Range(min=1)) + window_floor = v.All(int, v.Range(min=1)) + window_type = v.Any('linear', 'exponential') + window_factor = v.All(int, v.Range(min=1)) pipeline = {v.Required('name'): str, v.Required('manager'): manager, @@ -72,6 +76,12 @@ class LayoutSchema(object): 'success': report_actions, 'failure': report_actions, 'start': report_actions, + 'window': window, + 'window-floor': window_floor, + 'window-increase-type': window_type, + 'window-increase-factor': window_factor, + 'window-decrease-type': window_type, + 'window-decrease-factor': window_factor, } pipelines = [pipeline] diff --git a/zuul/model.py b/zuul/model.py index 3bc284e3ea..904e8f3516 100644 --- a/zuul/model.py +++ b/zuul/model.py @@ -59,6 +59,12 @@ class Pipeline(object): self.start_actions = None self.success_actions = None self.failure_actions = None + self.window = None + self.window_floor = None + self.window_increase_type = None + self.window_increase_factor = None + self.window_decrease_type = None + self.window_decrease_factor = None def __repr__(self): return '' % self.name @@ -375,13 +381,21 @@ class ChangeQueue(object): a queue shared by interrelated projects foo and bar, and a second queue for independent project baz. Pipelines have one or more PipelineQueues.""" - def __init__(self, pipeline, dependent=True): + def __init__(self, pipeline, dependent=True, window=0, window_floor=1, + window_increase_type='linear', window_increase_factor=1, + window_decrease_type='exponential', window_decrease_factor=2): self.pipeline = pipeline self.name = '' self.projects = [] self._jobs = set() self.queue = [] self.dependent = dependent + self.window = window + self.window_floor = window_floor + self.window_increase_type = window_increase_type + self.window_increase_factor = window_increase_factor + self.window_decrease_type = window_decrease_type + self.window_decrease_factor = window_decrease_factor def __repr__(self): return '' % (self.pipeline.name, self.name) @@ -398,7 +412,7 @@ class ChangeQueue(object): self._jobs |= set(self.pipeline.getJobTree(project).getJobs()) def enqueueChange(self, change): - item = QueueItem(self.pipeline, change) + item = QueueItem(self, self.pipeline, change) self.enqueueItem(item) item.enqueue_time = time.time() return item @@ -444,6 +458,32 @@ class ChangeQueue(object): def mergeChangeQueue(self, other): for project in other.projects: self.addProject(project) + self.window = min(self.window, other.window) + # TODO merge semantics + + def getActionableItems(self): + if self.dependent and self.window: + return self.queue[:self.window] + else: + return self.queue[:] + + def increaseWindowSize(self): + if self.dependent: + if self.window_increase_type == 'linear': + self.window += self.window_increase_factor + elif self.window_increase_type == 'exponential': + self.window *= self.window_increase_factor + + def decreaseWindowSize(self): + if self.dependent: + if self.window_decrease_type == 'linear': + self.window = max( + self.window_floor, + self.window - self.window_decrease_factor) + elif self.window_decrease_type == 'exponential': + self.window = max( + self.window_floor, + self.window / self.window_decrease_factor) class Project(object): @@ -619,7 +659,8 @@ class BuildSet(object): class QueueItem(object): """A changish inside of a Pipeline queue""" - def __init__(self, pipeline, change): + def __init__(self, change_queue, pipeline, change): + self.change_queue = change_queue self.pipeline = pipeline self.change = change # a changeish self.build_sets = [] diff --git a/zuul/scheduler.py b/zuul/scheduler.py index 5be01013b3..7ca1e35cb2 100644 --- a/zuul/scheduler.py +++ b/zuul/scheduler.py @@ -200,6 +200,17 @@ class Scheduler(threading.Thread): pipeline.success_actions = action_reporters['success'] pipeline.failure_actions = action_reporters['failure'] + pipeline.window = conf_pipeline.get('window', 20) + pipeline.window_floor = conf_pipeline.get('window-floor', 3) + pipeline.window_increase_type = conf_pipeline.get( + 'window-increase-type', 'linear') + pipeline.window_increase_factor = conf_pipeline.get( + 'window-increase-factor', 1) + pipeline.window_decrease_type = conf_pipeline.get( + 'window-decrease-type', 'exponential') + pipeline.window_decrease_factor = conf_pipeline.get( + 'window-decrease-factor', 2) + manager = globals()[conf_pipeline['manager']](self, pipeline) pipeline.setManager(manager) layout.pipelines[conf_pipeline['name']] = pipeline @@ -1172,7 +1183,7 @@ class BasePipelineManager(object): for queue in self.pipeline.queues: queue_changed = False nnfi = None # Nearest non-failing item - for item in queue.queue[:]: + for item in queue.getActionableItems(): item_changed, nnfi = self._processOneItem(item, nnfi) if item_changed: queue_changed = True @@ -1247,7 +1258,16 @@ class BasePipelineManager(object): if not (succeeded and merged): self.log.debug("Reported change %s failed tests or failed " "to merge" % (item.change)) + item.change_queue.decreaseWindowSize() + self.log.debug("%s window size decreased to %s" % + (item.change_queue, + item.change_queue.window)) raise MergeFailure("Change %s failed to merge" % item.change) + else: + item.change_queue.increaseWindowSize() + self.log.debug("%s window size increased to %s" % + (item.change_queue, + item.change_queue.window)) def _reportItem(self, item): if item.reported: @@ -1504,7 +1524,14 @@ class DependentPipelineManager(BasePipelineManager): change_queues = [] for project in self.pipeline.getProjects(): - change_queue = ChangeQueue(self.pipeline) + change_queue = ChangeQueue( + self.pipeline, + window=self.pipeline.window, + window_floor=self.pipeline.window_floor, + window_increase_type=self.pipeline.window_increase_type, + window_increase_factor=self.pipeline.window_increase_factor, + window_decrease_type=self.pipeline.window_decrease_type, + window_decrease_factor=self.pipeline.window_decrease_factor) change_queue.addProject(project) change_queues.append(change_queue) self.log.debug("Created queue: %s" % change_queue)