Support cross-repo-dependencies in independent pipelines

Changes supplied by the Depends-On header in git commit messages
will now additionally pull in changes from any project, on any
branch into independent pipelines.  They are considered "non-live"
changes, that is, changes that should not have any tests run on
them.

To accomodate this, the internal structure of an independent
pipeline is now a dynamically adjusting list of ChangeQueue objects
where a new ChangeQueue is created for each change added to the
pipeline (and that ChangeQueue might contain non-live changes
enqueued ahead of the change we are interested in for the purpose
of stacking commits in the associated Zuul refs).  This actually
more closely matches the visual and intuitive understanding of
independent pipelines.

Change-Id: I8ba0bc0918263f297666a50c607bca4f87c903b8
This commit is contained in:
James E. Blair 2014-12-30 17:01:44 -08:00
parent 5ee2425651
commit bfb8e04948
4 changed files with 153 additions and 34 deletions

View File

@ -1921,7 +1921,7 @@ class TestScheduler(ZuulTestCase):
status_jobs = set()
for p in data['pipelines']:
for q in p['change_queues']:
if q['dependent']:
if p['name'] in ['gate', 'conflict']:
self.assertEqual(q['window'], 20)
else:
self.assertEqual(q['window'], 0)
@ -3014,3 +3014,45 @@ For CI problems and help debugging, contact ci@example.org"""
self.assertEqual(B.reported, 0)
self.assertEqual(A.data['status'], 'NEW')
self.assertEqual(B.data['status'], 'NEW')
def test_crd_check(self):
"Test cross-repo dependencies in independent pipelines"
self.gearman_server.hold_jobs_in_queue = True
A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
B = self.fake_gerrit.addFakeChange('org/project2', 'master', 'B')
# A Depends-On: B
A.data['commitMessage'] = '%s\n\nDepends-On: %s\n' % (
A.subject, B.data['id'])
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
queue = self.gearman_server.getQueue()
ref = self.getParameter(queue[-1], 'ZUUL_REF')
self.gearman_server.hold_jobs_in_queue = False
self.gearman_server.release()
self.waitUntilSettled()
path = os.path.join(self.git_root, "org/project1")
repo = git.Repo(path)
repo_messages = [c.message.strip() for c in repo.iter_commits(ref)]
repo_messages.reverse()
correct_messages = ['initial commit', 'A-1']
self.assertEqual(repo_messages, correct_messages)
path = os.path.join(self.git_root, "org/project2")
repo = git.Repo(path)
repo_messages = [c.message.strip() for c in repo.iter_commits(ref)]
repo_messages.reverse()
correct_messages = ['initial commit', 'B-1']
self.assertEqual(repo_messages, correct_messages)
self.assertEqual(A.data['status'], 'NEW')
self.assertEqual(B.data['status'], 'NEW')
self.assertEqual(A.reported, 1)
self.assertEqual(B.reported, 0)
self.assertEqual(self.history[0].changes, '2,1 1,1')
self.assertEqual(len(self.sched.layout.pipelines['check'].queues), 0)

View File

@ -65,7 +65,7 @@ class TestZuulTrigger(ZuulTestCase):
for job in self.history:
if job.changes == '1,1':
self.assertEqual(job.name, 'project-gate')
elif job.changes == '2,1':
elif job.changes == '1,1 2,1':
self.assertEqual(job.name, 'project-check')
elif job.changes == '1,1 3,1':
self.assertEqual(job.name, 'project-gate')

View File

@ -111,6 +111,9 @@ class Pipeline(object):
return queue
return None
def removeQueue(self, queue):
self.queues.remove(queue)
def getJobTree(self, project):
tree = self.job_trees.get(project)
return tree
@ -148,6 +151,8 @@ class Pipeline(object):
return torun
def findJobsToRun(self, item):
if not item.live:
return []
tree = self.getJobTree(item.change.project)
if not tree:
return []
@ -193,6 +198,8 @@ class Pipeline(object):
return False
def isHoldingFollowingChanges(self, item):
if not item.live:
return False
for job in self.getJobs(item.change):
if not job.hold_following_changes:
continue
@ -256,7 +263,6 @@ class Pipeline(object):
j_queues.append(j_queue)
j_queue['heads'] = []
j_queue['window'] = queue.window
j_queue['dependent'] = queue.dependent
j_changes = []
for e in queue.queue:
@ -303,8 +309,8 @@ class ChangeQueue(object):
different projects; this is one of them. For instance, there may
a queue shared by interrelated projects foo and bar, and a second
queue for independent project baz. Pipelines have one or more
PipelineQueues."""
def __init__(self, pipeline, dependent=True, window=0, window_floor=1,
ChangeQueues."""
def __init__(self, pipeline, window=0, window_floor=1,
window_increase_type='linear', window_increase_factor=1,
window_decrease_type='exponential', window_decrease_factor=2):
self.pipeline = pipeline
@ -314,7 +320,6 @@ class ChangeQueue(object):
self.projects = []
self._jobs = set()
self.queue = []
self.dependent = dependent
self.window = window
self.window_floor = window_floor
self.window_increase_type = window_increase_type
@ -348,14 +353,15 @@ class ChangeQueue(object):
self.name = self.assigned_name or self.generated_name
def enqueueChange(self, change):
item = QueueItem(self.pipeline, change)
item = QueueItem(self, change)
self.enqueueItem(item)
item.enqueue_time = time.time()
return item
def enqueueItem(self, item):
item.pipeline = self.pipeline
if self.dependent and self.queue:
item.queue = self
if self.queue:
item.item_ahead = self.queue[-1]
item.item_ahead.items_behind.append(item)
self.queue.append(item)
@ -374,8 +380,6 @@ class ChangeQueue(object):
item.dequeue_time = time.time()
def moveItem(self, item, item_ahead):
if not self.dependent:
return False
if item.item_ahead == item_ahead:
return False
# Remove from current location
@ -399,20 +403,20 @@ class ChangeQueue(object):
# TODO merge semantics
def isActionable(self, item):
if self.dependent and self.window:
if self.window:
return item in self.queue[:self.window]
else:
return True
def increaseWindowSize(self):
if self.dependent:
if self.window:
if self.window_increase_type == 'linear':
self.window += self.window_increase_factor
elif self.window_increase_type == 'exponential':
self.window *= self.window_increase_factor
def decreaseWindowSize(self):
if self.dependent:
if self.window:
if self.window_decrease_type == 'linear':
self.window = max(
self.window_floor,
@ -650,8 +654,9 @@ class BuildSet(object):
class QueueItem(object):
"""A changish inside of a Pipeline queue"""
def __init__(self, pipeline, change):
self.pipeline = pipeline
def __init__(self, queue, change):
self.pipeline = queue.pipeline
self.queue = queue
self.change = change # a changeish
self.build_sets = []
self.dequeued_needing_change = False
@ -662,7 +667,8 @@ class QueueItem(object):
self.enqueue_time = None
self.dequeue_time = None
self.reported = False
self.active = False
self.active = False # Whether an item is within an active window
self.live = True # Whether an item is intended to be processed at all
def __repr__(self):
if self.pipeline:

View File

@ -638,11 +638,15 @@ class Scheduler(threading.Thread):
self.log.debug("Re-enqueueing changes for pipeline %s" % name)
items_to_remove = []
builds_to_remove = []
last_head = None
for shared_queue in old_pipeline.queues:
for item in shared_queue.queue:
if not item.item_ahead:
last_head = item
item.item_ahead = None
item.items_behind = []
item.pipeline = None
item.queue = None
project = layout.projects.get(item.change.project.name)
if not project:
self.log.warning("Unable to find project for "
@ -658,7 +662,8 @@ class Scheduler(threading.Thread):
build.job = job
else:
builds_to_remove.append(build)
if not new_pipeline.manager.reEnqueueItem(item):
if not new_pipeline.manager.reEnqueueItem(item,
last_head):
items_to_remove.append(item)
for item in items_to_remove:
for build in item.current_build_set.getBuilds():
@ -1114,8 +1119,11 @@ class BasePipelineManager(object):
self.log.debug("Change %s abandoned, removing." % change)
self.removeChange(change)
def reEnqueueItem(self, item):
change_queue = self.pipeline.getQueue(item.change.project)
def reEnqueueItem(self, item, last_head):
if last_head.queue:
change_queue = last_head.queue
else:
change_queue = self.getChangeQueue(item.change)
if change_queue:
self.log.debug("Re-enqueing change %s in queue %s" %
(item.change, change_queue))
@ -1128,7 +1136,8 @@ class BasePipelineManager(object):
return False
def addChange(self, change, quiet=False, enqueue_time=None,
ignore_requirements=False, change_queue=None):
ignore_requirements=False, live=True,
change_queue=None):
self.log.debug("Considering adding change %s" % change)
if self.isChangeAlreadyInQueue(change):
self.log.debug("Change %s is already in queue, ignoring" % change)
@ -1171,6 +1180,7 @@ class BasePipelineManager(object):
item = change_queue.enqueueChange(change)
if enqueue_time:
item.enqueue_time = enqueue_time
item.live = live
self.reportStats(item)
self.enqueueChangesBehind(change, quiet, ignore_requirements,
change_queue)
@ -1180,8 +1190,7 @@ class BasePipelineManager(object):
def dequeueItem(self, item):
self.log.debug("Removing change %s from queue" % item.change)
change_queue = self.pipeline.getQueue(item.change.project)
change_queue.dequeueItem(item)
item.queue.dequeueItem(item)
def removeChange(self, change):
# Remove a change from the queue, probably because it has been
@ -1292,7 +1301,9 @@ class BasePipelineManager(object):
def _processOneItem(self, item, nnfi, ready_ahead):
changed = False
item_ahead = item.item_ahead
change_queue = self.pipeline.getQueue(item.change.project)
if item_ahead and (not item_ahead.live):
item_ahead = None
change_queue = item.queue
failing_reasons = [] # Reasons this item is failing
if self.checkForChangesNeededBy(item.change) is not True:
@ -1316,8 +1327,7 @@ class BasePipelineManager(object):
self.cancelJobs(item, prime=False)
else:
item_ahead_merged = False
if ((item_ahead and item_ahead.change.is_merged) or
not change_queue.dependent):
if (item_ahead and item_ahead.change.is_merged):
item_ahead_merged = True
if (item_ahead != nnfi and not item_ahead_merged):
# Our current base is different than what we expected,
@ -1340,6 +1350,10 @@ class BasePipelineManager(object):
changed = True
if self.pipeline.didAnyJobFail(item):
failing_reasons.append("at least one job failed")
if (not item.live) and (not item.items_behind):
failing_reasons.append("is a non-live item with no items behind")
self.dequeueItem(item)
changed = True
if (not item_ahead) and self.pipeline.areAllJobsComplete(item):
try:
self.reportItem(item)
@ -1352,7 +1366,7 @@ class BasePipelineManager(object):
self.cancelJobs(item_behind)
self.dequeueItem(item)
changed = True
elif not failing_reasons:
elif not failing_reasons and item.live:
nnfi = item
item.current_build_set.failing_reasons = failing_reasons
if failing_reasons:
@ -1436,7 +1450,7 @@ class BasePipelineManager(object):
item.change.branch)
self.log.info("Reported change %s status: all-succeeded: %s, "
"merged: %s" % (item.change, succeeded, merged))
change_queue = self.pipeline.getQueue(item.change.project)
change_queue = item.queue
if not (succeeded and merged):
self.log.debug("Reported change %s failed tests or failed "
"to merge" % (item.change))
@ -1693,14 +1707,71 @@ class IndependentPipelineManager(BasePipelineManager):
def _postConfig(self, layout):
super(IndependentPipelineManager, self)._postConfig(layout)
change_queue = ChangeQueue(self.pipeline, dependent=False)
for project in self.pipeline.getProjects():
change_queue.addProject(project)
self.pipeline.addQueue(change_queue)
def getChangeQueue(self, change):
return self.pipeline.getQueue(change.project)
# creates a new change queue for every change
if change.project not in self.pipeline.getProjects():
return None
change_queue = ChangeQueue(self.pipeline)
change_queue.addProject(change.project)
self.pipeline.addQueue(change_queue)
return change_queue
def enqueueChangesAhead(self, change, quiet, ignore_requirements,
change_queue):
ret = self.checkForChangesNeededBy(change)
if ret in [True, False]:
return ret
self.log.debug(" Changes %s must be merged ahead of %s" %
(ret, change))
for needed_change in ret:
# This differs from the dependent pipeline by enqueuing
# changes ahead as "not live", that is, not intended to
# have jobs run. Also, pipeline requirements are always
# ignored (which is safe because the changes are not
# live).
r = self.addChange(needed_change, quiet=True,
ignore_requirements=True,
live=False, change_queue=change_queue)
if not r:
return False
return True
def checkForChangesNeededBy(self, change):
self.log.debug("Checking for changes needed by %s:" % change)
# Return true if okay to proceed enqueing this change,
# false if the change should not be enqueued.
if not hasattr(change, 'needs_changes'):
self.log.debug(" Changeish does not support dependencies")
return True
if not change.needs_changes:
self.log.debug(" No changes needed")
return True
changes_needed = []
for needed_change in change.needs_changes:
self.log.debug(" Change %s needs change %s:" % (
change, needed_change))
if needed_change.is_merged:
self.log.debug(" Needed change is merged")
continue
if self.isChangeAlreadyInQueue(needed_change):
self.log.debug(" Needed change is already ahead in the queue")
continue
self.log.debug(" Change %s is needed" % needed_change)
if needed_change not in changes_needed:
changes_needed.append(needed_change)
continue
# This differs from the dependent pipeline check in not
# verifying that the dependent change is mergable.
if changes_needed:
return changes_needed
return True
def dequeueItem(self, item):
super(IndependentPipelineManager, self).dequeueItem(item)
# An independent pipeline manager dynamically removes empty
# queues
if not item.queue.queue:
self.pipeline.removeQueue(item.queue)
class DependentPipelineManager(BasePipelineManager):