From bfb8e049487d76acbf685762d6a4725f49670e5a Mon Sep 17 00:00:00 2001 From: "James E. Blair" Date: Tue, 30 Dec 2014 17:01:44 -0800 Subject: [PATCH] Support cross-repo-dependencies in independent pipelines Changes supplied by the Depends-On header in git commit messages will now additionally pull in changes from any project, on any branch into independent pipelines. They are considered "non-live" changes, that is, changes that should not have any tests run on them. To accomodate this, the internal structure of an independent pipeline is now a dynamically adjusting list of ChangeQueue objects where a new ChangeQueue is created for each change added to the pipeline (and that ChangeQueue might contain non-live changes enqueued ahead of the change we are interested in for the purpose of stacking commits in the associated Zuul refs). This actually more closely matches the visual and intuitive understanding of independent pipelines. Change-Id: I8ba0bc0918263f297666a50c607bca4f87c903b8 --- tests/test_scheduler.py | 44 +++++++++++++++- tests/test_zuultrigger.py | 2 +- zuul/model.py | 34 +++++++----- zuul/scheduler.py | 107 +++++++++++++++++++++++++++++++------- 4 files changed, 153 insertions(+), 34 deletions(-) diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py index 3837cfac5b..059f155f42 100755 --- a/tests/test_scheduler.py +++ b/tests/test_scheduler.py @@ -1921,7 +1921,7 @@ class TestScheduler(ZuulTestCase): status_jobs = set() for p in data['pipelines']: for q in p['change_queues']: - if q['dependent']: + if p['name'] in ['gate', 'conflict']: self.assertEqual(q['window'], 20) else: self.assertEqual(q['window'], 0) @@ -3014,3 +3014,45 @@ For CI problems and help debugging, contact ci@example.org""" self.assertEqual(B.reported, 0) self.assertEqual(A.data['status'], 'NEW') self.assertEqual(B.data['status'], 'NEW') + + def test_crd_check(self): + "Test cross-repo dependencies in independent pipelines" + + self.gearman_server.hold_jobs_in_queue = True + A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A') + B = self.fake_gerrit.addFakeChange('org/project2', 'master', 'B') + + # A Depends-On: B + A.data['commitMessage'] = '%s\n\nDepends-On: %s\n' % ( + A.subject, B.data['id']) + + self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1)) + self.waitUntilSettled() + + queue = self.gearman_server.getQueue() + ref = self.getParameter(queue[-1], 'ZUUL_REF') + self.gearman_server.hold_jobs_in_queue = False + self.gearman_server.release() + self.waitUntilSettled() + + path = os.path.join(self.git_root, "org/project1") + repo = git.Repo(path) + repo_messages = [c.message.strip() for c in repo.iter_commits(ref)] + repo_messages.reverse() + correct_messages = ['initial commit', 'A-1'] + self.assertEqual(repo_messages, correct_messages) + + path = os.path.join(self.git_root, "org/project2") + repo = git.Repo(path) + repo_messages = [c.message.strip() for c in repo.iter_commits(ref)] + repo_messages.reverse() + correct_messages = ['initial commit', 'B-1'] + self.assertEqual(repo_messages, correct_messages) + + self.assertEqual(A.data['status'], 'NEW') + self.assertEqual(B.data['status'], 'NEW') + self.assertEqual(A.reported, 1) + self.assertEqual(B.reported, 0) + + self.assertEqual(self.history[0].changes, '2,1 1,1') + self.assertEqual(len(self.sched.layout.pipelines['check'].queues), 0) diff --git a/tests/test_zuultrigger.py b/tests/test_zuultrigger.py index 3f339beffe..a26fa8605d 100644 --- a/tests/test_zuultrigger.py +++ b/tests/test_zuultrigger.py @@ -65,7 +65,7 @@ class TestZuulTrigger(ZuulTestCase): for job in self.history: if job.changes == '1,1': self.assertEqual(job.name, 'project-gate') - elif job.changes == '2,1': + elif job.changes == '1,1 2,1': self.assertEqual(job.name, 'project-check') elif job.changes == '1,1 3,1': self.assertEqual(job.name, 'project-gate') diff --git a/zuul/model.py b/zuul/model.py index 3bba28467c..2a69b79029 100644 --- a/zuul/model.py +++ b/zuul/model.py @@ -111,6 +111,9 @@ class Pipeline(object): return queue return None + def removeQueue(self, queue): + self.queues.remove(queue) + def getJobTree(self, project): tree = self.job_trees.get(project) return tree @@ -148,6 +151,8 @@ class Pipeline(object): return torun def findJobsToRun(self, item): + if not item.live: + return [] tree = self.getJobTree(item.change.project) if not tree: return [] @@ -193,6 +198,8 @@ class Pipeline(object): return False def isHoldingFollowingChanges(self, item): + if not item.live: + return False for job in self.getJobs(item.change): if not job.hold_following_changes: continue @@ -256,7 +263,6 @@ class Pipeline(object): j_queues.append(j_queue) j_queue['heads'] = [] j_queue['window'] = queue.window - j_queue['dependent'] = queue.dependent j_changes = [] for e in queue.queue: @@ -303,8 +309,8 @@ class ChangeQueue(object): different projects; this is one of them. For instance, there may a queue shared by interrelated projects foo and bar, and a second queue for independent project baz. Pipelines have one or more - PipelineQueues.""" - def __init__(self, pipeline, dependent=True, window=0, window_floor=1, + ChangeQueues.""" + def __init__(self, pipeline, window=0, window_floor=1, window_increase_type='linear', window_increase_factor=1, window_decrease_type='exponential', window_decrease_factor=2): self.pipeline = pipeline @@ -314,7 +320,6 @@ class ChangeQueue(object): self.projects = [] self._jobs = set() self.queue = [] - self.dependent = dependent self.window = window self.window_floor = window_floor self.window_increase_type = window_increase_type @@ -348,14 +353,15 @@ class ChangeQueue(object): self.name = self.assigned_name or self.generated_name def enqueueChange(self, change): - item = QueueItem(self.pipeline, change) + item = QueueItem(self, change) self.enqueueItem(item) item.enqueue_time = time.time() return item def enqueueItem(self, item): item.pipeline = self.pipeline - if self.dependent and self.queue: + item.queue = self + if self.queue: item.item_ahead = self.queue[-1] item.item_ahead.items_behind.append(item) self.queue.append(item) @@ -374,8 +380,6 @@ class ChangeQueue(object): item.dequeue_time = time.time() def moveItem(self, item, item_ahead): - if not self.dependent: - return False if item.item_ahead == item_ahead: return False # Remove from current location @@ -399,20 +403,20 @@ class ChangeQueue(object): # TODO merge semantics def isActionable(self, item): - if self.dependent and self.window: + if self.window: return item in self.queue[:self.window] else: return True def increaseWindowSize(self): - if self.dependent: + if self.window: if self.window_increase_type == 'linear': self.window += self.window_increase_factor elif self.window_increase_type == 'exponential': self.window *= self.window_increase_factor def decreaseWindowSize(self): - if self.dependent: + if self.window: if self.window_decrease_type == 'linear': self.window = max( self.window_floor, @@ -650,8 +654,9 @@ class BuildSet(object): class QueueItem(object): """A changish inside of a Pipeline queue""" - def __init__(self, pipeline, change): - self.pipeline = pipeline + def __init__(self, queue, change): + self.pipeline = queue.pipeline + self.queue = queue self.change = change # a changeish self.build_sets = [] self.dequeued_needing_change = False @@ -662,7 +667,8 @@ class QueueItem(object): self.enqueue_time = None self.dequeue_time = None self.reported = False - self.active = False + self.active = False # Whether an item is within an active window + self.live = True # Whether an item is intended to be processed at all def __repr__(self): if self.pipeline: diff --git a/zuul/scheduler.py b/zuul/scheduler.py index 3c09e0a92f..ffb8e3f83d 100644 --- a/zuul/scheduler.py +++ b/zuul/scheduler.py @@ -638,11 +638,15 @@ class Scheduler(threading.Thread): self.log.debug("Re-enqueueing changes for pipeline %s" % name) items_to_remove = [] builds_to_remove = [] + last_head = None for shared_queue in old_pipeline.queues: for item in shared_queue.queue: + if not item.item_ahead: + last_head = item item.item_ahead = None item.items_behind = [] item.pipeline = None + item.queue = None project = layout.projects.get(item.change.project.name) if not project: self.log.warning("Unable to find project for " @@ -658,7 +662,8 @@ class Scheduler(threading.Thread): build.job = job else: builds_to_remove.append(build) - if not new_pipeline.manager.reEnqueueItem(item): + if not new_pipeline.manager.reEnqueueItem(item, + last_head): items_to_remove.append(item) for item in items_to_remove: for build in item.current_build_set.getBuilds(): @@ -1114,8 +1119,11 @@ class BasePipelineManager(object): self.log.debug("Change %s abandoned, removing." % change) self.removeChange(change) - def reEnqueueItem(self, item): - change_queue = self.pipeline.getQueue(item.change.project) + def reEnqueueItem(self, item, last_head): + if last_head.queue: + change_queue = last_head.queue + else: + change_queue = self.getChangeQueue(item.change) if change_queue: self.log.debug("Re-enqueing change %s in queue %s" % (item.change, change_queue)) @@ -1128,7 +1136,8 @@ class BasePipelineManager(object): return False def addChange(self, change, quiet=False, enqueue_time=None, - ignore_requirements=False, change_queue=None): + ignore_requirements=False, live=True, + change_queue=None): self.log.debug("Considering adding change %s" % change) if self.isChangeAlreadyInQueue(change): self.log.debug("Change %s is already in queue, ignoring" % change) @@ -1171,6 +1180,7 @@ class BasePipelineManager(object): item = change_queue.enqueueChange(change) if enqueue_time: item.enqueue_time = enqueue_time + item.live = live self.reportStats(item) self.enqueueChangesBehind(change, quiet, ignore_requirements, change_queue) @@ -1180,8 +1190,7 @@ class BasePipelineManager(object): def dequeueItem(self, item): self.log.debug("Removing change %s from queue" % item.change) - change_queue = self.pipeline.getQueue(item.change.project) - change_queue.dequeueItem(item) + item.queue.dequeueItem(item) def removeChange(self, change): # Remove a change from the queue, probably because it has been @@ -1292,7 +1301,9 @@ class BasePipelineManager(object): def _processOneItem(self, item, nnfi, ready_ahead): changed = False item_ahead = item.item_ahead - change_queue = self.pipeline.getQueue(item.change.project) + if item_ahead and (not item_ahead.live): + item_ahead = None + change_queue = item.queue failing_reasons = [] # Reasons this item is failing if self.checkForChangesNeededBy(item.change) is not True: @@ -1316,8 +1327,7 @@ class BasePipelineManager(object): self.cancelJobs(item, prime=False) else: item_ahead_merged = False - if ((item_ahead and item_ahead.change.is_merged) or - not change_queue.dependent): + if (item_ahead and item_ahead.change.is_merged): item_ahead_merged = True if (item_ahead != nnfi and not item_ahead_merged): # Our current base is different than what we expected, @@ -1340,6 +1350,10 @@ class BasePipelineManager(object): changed = True if self.pipeline.didAnyJobFail(item): failing_reasons.append("at least one job failed") + if (not item.live) and (not item.items_behind): + failing_reasons.append("is a non-live item with no items behind") + self.dequeueItem(item) + changed = True if (not item_ahead) and self.pipeline.areAllJobsComplete(item): try: self.reportItem(item) @@ -1352,7 +1366,7 @@ class BasePipelineManager(object): self.cancelJobs(item_behind) self.dequeueItem(item) changed = True - elif not failing_reasons: + elif not failing_reasons and item.live: nnfi = item item.current_build_set.failing_reasons = failing_reasons if failing_reasons: @@ -1436,7 +1450,7 @@ class BasePipelineManager(object): item.change.branch) self.log.info("Reported change %s status: all-succeeded: %s, " "merged: %s" % (item.change, succeeded, merged)) - change_queue = self.pipeline.getQueue(item.change.project) + change_queue = item.queue if not (succeeded and merged): self.log.debug("Reported change %s failed tests or failed " "to merge" % (item.change)) @@ -1693,14 +1707,71 @@ class IndependentPipelineManager(BasePipelineManager): def _postConfig(self, layout): super(IndependentPipelineManager, self)._postConfig(layout) - change_queue = ChangeQueue(self.pipeline, dependent=False) - for project in self.pipeline.getProjects(): - change_queue.addProject(project) - - self.pipeline.addQueue(change_queue) - def getChangeQueue(self, change): - return self.pipeline.getQueue(change.project) + # creates a new change queue for every change + if change.project not in self.pipeline.getProjects(): + return None + change_queue = ChangeQueue(self.pipeline) + change_queue.addProject(change.project) + self.pipeline.addQueue(change_queue) + return change_queue + + def enqueueChangesAhead(self, change, quiet, ignore_requirements, + change_queue): + ret = self.checkForChangesNeededBy(change) + if ret in [True, False]: + return ret + self.log.debug(" Changes %s must be merged ahead of %s" % + (ret, change)) + for needed_change in ret: + # This differs from the dependent pipeline by enqueuing + # changes ahead as "not live", that is, not intended to + # have jobs run. Also, pipeline requirements are always + # ignored (which is safe because the changes are not + # live). + r = self.addChange(needed_change, quiet=True, + ignore_requirements=True, + live=False, change_queue=change_queue) + if not r: + return False + return True + + def checkForChangesNeededBy(self, change): + self.log.debug("Checking for changes needed by %s:" % change) + # Return true if okay to proceed enqueing this change, + # false if the change should not be enqueued. + if not hasattr(change, 'needs_changes'): + self.log.debug(" Changeish does not support dependencies") + return True + if not change.needs_changes: + self.log.debug(" No changes needed") + return True + changes_needed = [] + for needed_change in change.needs_changes: + self.log.debug(" Change %s needs change %s:" % ( + change, needed_change)) + if needed_change.is_merged: + self.log.debug(" Needed change is merged") + continue + if self.isChangeAlreadyInQueue(needed_change): + self.log.debug(" Needed change is already ahead in the queue") + continue + self.log.debug(" Change %s is needed" % needed_change) + if needed_change not in changes_needed: + changes_needed.append(needed_change) + continue + # This differs from the dependent pipeline check in not + # verifying that the dependent change is mergable. + if changes_needed: + return changes_needed + return True + + def dequeueItem(self, item): + super(IndependentPipelineManager, self).dequeueItem(item) + # An independent pipeline manager dynamically removes empty + # queues + if not item.queue.queue: + self.pipeline.removeQueue(item.queue) class DependentPipelineManager(BasePipelineManager):