From 972e3c78e8c7b37a7aecffef8e5f27d77d41a99d Mon Sep 17 00:00:00 2001 From: "James E. Blair" Date: Thu, 29 Aug 2013 12:04:55 -0700 Subject: [PATCH] Use NNFI scheduler algorithm Update the scheduler algorithm to NNFI -- Nearest Non-Failing Item. A stateless description of the algorithm is that jobs for every item should always be run based on the repository state(s) set by the nearest non-failing item ahead of it in its change queue. This means that should an item fail (for any reason -- failure to merge, a merge conflict, or job failure) changes after it will have their builds canceled and restarted with the assumption that the failed change will not merge but the nearest non-failing change ahead will merge. This should mean that dependent queues will always be running jobs and no longer need to wait for a failing change to merge or not merge before restarting jobs. This removes the dequeue-on-conflict behavior because there is now no cost to keeping an item that can not merge in the queue. The documentation and associated test for this are removed. This also removes the concept of severed heads because a failing change at the head will not prevent other changes from proceeding with their tests. If the jobs for the change at the head run longer than following changes, it could still impact them while it completes, but the reduction in code complexity is worth this minor de-optimization. The debugging representation of QueueItem is changed to make it more useful. Change-Id: I0d2d416fb0dd88647490ec06ed69deae71d39374 --- NEWS.rst | 6 -- doc/source/zuul.rst | 8 -- tests/fixtures/layout.yaml | 1 - tests/test_scheduler.py | 112 +++++++++++++++-------- zuul/layoutvalidator.py | 1 - zuul/model.py | 120 ++++++++++++------------- zuul/scheduler.py | 179 ++++++++++++++++++++----------------- 7 files changed, 231 insertions(+), 196 deletions(-) diff --git a/NEWS.rst b/NEWS.rst index db269a45d5..c4901a8a6a 100644 --- a/NEWS.rst +++ b/NEWS.rst @@ -25,12 +25,6 @@ Since 1.3.0: triggers later). See the sample layout.yaml and Zuul section of the documentation. -* The default behavior is now to immediately dequeue changes that have - merge conflicts, even those not at the head of the queue. To enable - the old behavior (which would wait until the conflicting change was - at the head before dequeuing it), see the new "dequeue-on-conflict" - option. - * Some statsd keys have changed in a backwards incompatible way: * The counters and timers of the form zuul.job.{name} is now split into several keys of the form: diff --git a/doc/source/zuul.rst b/doc/source/zuul.rst index f8e070c3dd..6adfa30f88 100644 --- a/doc/source/zuul.rst +++ b/doc/source/zuul.rst @@ -346,14 +346,6 @@ explanation of each of the parameters:: well. To suppress this behavior (and allow jobs to continue running), set this to ``false``. Default: ``true``. -**dequeue-on-conflict** - Normally, if there is a merge conflict or similar error with a - change, Zuul will immediately remove it from the queue, even if the - error is only due to a change that happened to be enqueued ahead of - it. If you would like to keep the change in the queue until it is - at the head to be certain that the merge conflict is intrinsic to - the change, set this to ``false``. Default: ``true``. - **success** Describes where Zuul should report to if all the jobs complete successfully. diff --git a/tests/fixtures/layout.yaml b/tests/fixtures/layout.yaml index 1cb86880e2..dc659fb911 100644 --- a/tests/fixtures/layout.yaml +++ b/tests/fixtures/layout.yaml @@ -75,7 +75,6 @@ pipelines: verified: -1 - name: conflict - dequeue-on-conflict: false manager: DependentPipelineManager failure-message: Build failed. For information on how to proceed, see http://wiki.example.org/Test_Failures trigger: diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py index 70b68c54e9..395ff2595a 100644 --- a/tests/test_scheduler.py +++ b/tests/test_scheduler.py @@ -1068,9 +1068,6 @@ class TestScheduler(testtools.TestCase): print 'pipeline %s queue %s contents %s' % ( pipeline.name, queue.name, queue.queue) self.assertEqual(len(queue.queue), 0) - if len(queue.severed_heads) != 0: - print 'heads', queue.severed_heads - self.assertEqual(len(queue.severed_heads), 0) def assertReportedStat(self, key, value=None, kind=None): start = time.time() @@ -1403,11 +1400,20 @@ class TestScheduler(testtools.TestCase): self.release(self.builds[2]) self.waitUntilSettled() - # project-test1 and project-test2 for A, project-test2 for B - self.assertEqual(len(self.builds), 3) + # project-test1 and project-test2 for A + # project-test2 for B + # project-merge for C (without B) + self.assertEqual(len(self.builds), 4) self.assertEqual(self.countJobResults(self.history, 'ABORTED'), 2) - # check that build status of aborted jobs are masked ('CANCELED') + self.worker.release('.*-merge') + self.waitUntilSettled() + + # project-test1 and project-test2 for A + # project-test2 for B + # project-test1 and project-test2 for C + self.assertEqual(len(self.builds), 5) + items = self.sched.layout.pipelines['gate'].getAllItems() builds = items[0].current_build_set.getBuilds() self.assertEqual(self.countJobResults(builds, 'SUCCESS'), 1) @@ -1418,7 +1424,7 @@ class TestScheduler(testtools.TestCase): self.assertEqual(self.countJobResults(builds, None), 1) builds = items[2].current_build_set.getBuilds() self.assertEqual(self.countJobResults(builds, 'SUCCESS'), 1) - self.assertEqual(self.countJobResults(builds, 'CANCELED'), 2) + self.assertEqual(self.countJobResults(builds, None), 2) self.worker.hold_jobs_in_build = False self.worker.release() @@ -1667,8 +1673,9 @@ class TestScheduler(testtools.TestCase): self.waitUntilSettled() self.gearman_server.release('.*-merge') self.waitUntilSettled() - queue = self.gearman_server.getQueue() - self.getParameter(queue[-1], 'ZUUL_REF') + + self.assertEqual(len(self.history), 2) # A and C merge jobs + self.gearman_server.hold_jobs_in_queue = False self.gearman_server.release() self.waitUntilSettled() @@ -1679,32 +1686,7 @@ class TestScheduler(testtools.TestCase): self.assertEqual(A.reported, 2) self.assertEqual(B.reported, 2) self.assertEqual(C.reported, 2) - - def test_dequeue_conflict(self): - "Test that the option to dequeue merge conflicts works" - - self.gearman_server.hold_jobs_in_queue = True - A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A') - A.addPatchset(['conflict']) - B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B') - B.addPatchset(['conflict']) - A.addApproval('CRVW', 2) - B.addApproval('CRVW', 2) - self.fake_gerrit.addEvent(A.addApproval('APRV', 1)) - self.fake_gerrit.addEvent(B.addApproval('APRV', 1)) - self.waitUntilSettled() - - self.assertEqual(A.reported, 1) - self.assertEqual(B.reported, 2) - - self.gearman_server.hold_jobs_in_queue = False - self.gearman_server.release() - self.waitUntilSettled() - - self.assertEqual(A.data['status'], 'MERGED') - self.assertEqual(B.data['status'], 'NEW') - self.assertEqual(A.reported, 2) - self.assertEqual(B.reported, 2) + self.assertEqual(len(self.history), 6) def test_post(self): "Test that post jobs run" @@ -1888,6 +1870,66 @@ class TestScheduler(testtools.TestCase): self.assertEqual(C.reported, 2) self.assertEqual(len(self.history), 1) + def test_failing_dependent_changes(self): + "Test that failing dependent patches are taken out of stream" + self.worker.hold_jobs_in_build = True + A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A') + B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B') + C = self.fake_gerrit.addFakeChange('org/project', 'master', 'C') + D = self.fake_gerrit.addFakeChange('org/project', 'master', 'D') + E = self.fake_gerrit.addFakeChange('org/project', 'master', 'E') + A.addApproval('CRVW', 2) + B.addApproval('CRVW', 2) + C.addApproval('CRVW', 2) + D.addApproval('CRVW', 2) + E.addApproval('CRVW', 2) + + # E, D -> C -> B, A + + D.setDependsOn(C, 1) + C.setDependsOn(B, 1) + + self.worker.addFailTest('project-test1', B) + + self.fake_gerrit.addEvent(A.addApproval('APRV', 1)) + self.fake_gerrit.addEvent(D.addApproval('APRV', 1)) + self.fake_gerrit.addEvent(C.addApproval('APRV', 1)) + self.fake_gerrit.addEvent(B.addApproval('APRV', 1)) + self.fake_gerrit.addEvent(E.addApproval('APRV', 1)) + + self.waitUntilSettled() + self.worker.release('.*-merge') + self.waitUntilSettled() + self.worker.release('.*-merge') + self.waitUntilSettled() + self.worker.release('.*-merge') + self.waitUntilSettled() + self.worker.release('.*-merge') + self.waitUntilSettled() + self.worker.release('.*-merge') + self.waitUntilSettled() + + self.worker.hold_jobs_in_build = False + for build in self.builds: + if build.parameters['ZUUL_CHANGE'] != '1': + build.release() + self.waitUntilSettled() + + self.worker.release() + self.waitUntilSettled() + + self.assertEqual(A.data['status'], 'MERGED') + self.assertEqual(A.reported, 2) + self.assertEqual(B.data['status'], 'NEW') + self.assertEqual(B.reported, 2) + self.assertEqual(C.data['status'], 'NEW') + self.assertEqual(C.reported, 2) + self.assertEqual(D.data['status'], 'NEW') + self.assertEqual(D.reported, 2) + self.assertEqual(E.data['status'], 'MERGED') + self.assertEqual(E.reported, 2) + self.assertEqual(len(self.history), 18) + def test_head_is_dequeued_once(self): "Test that if a change at the head fails it is dequeued only once" # If it's dequeued more than once, we should see extra diff --git a/zuul/layoutvalidator.py b/zuul/layoutvalidator.py index 00900a03ca..0d08f1b850 100644 --- a/zuul/layoutvalidator.py +++ b/zuul/layoutvalidator.py @@ -64,7 +64,6 @@ class LayoutSchema(object): 'success-message': str, 'failure-message': str, 'dequeue-on-new-patchset': bool, - 'dequeue-on-conflict': bool, 'trigger': trigger, 'success': report_actions, 'failure': report_actions, diff --git a/zuul/model.py b/zuul/model.py index d68ac915dd..ffbad0152f 100644 --- a/zuul/model.py +++ b/zuul/model.py @@ -51,7 +51,6 @@ class Pipeline(object): self.failure_message = None self.success_message = None self.dequeue_on_new_patchset = True - self.dequeue_on_conflict = True self.job_trees = {} # project -> JobTree self.manager = None self.queues = [] @@ -169,6 +168,7 @@ class Pipeline(object): return True if build.result != 'SUCCESS': return True + if not item.item_ahead: return False return self.isHoldingFollowingChanges(item.item_ahead) @@ -212,7 +212,6 @@ class Pipeline(object): items = [] for shared_queue in self.queues: items.extend(shared_queue.queue) - items.extend(shared_queue.severed_heads) return items def formatStatusHTML(self): @@ -222,8 +221,8 @@ class Pipeline(object): s = 'Change queue: %s' % queue.name ret += s + '\n' ret += '-' * len(s) + '\n' - for head in queue.getHeads(): - ret += self.formatStatus(head, html=True) + for item in queue.queue: + ret += self.formatStatus(item, html=True) return ret def formatStatusJSON(self): @@ -235,18 +234,21 @@ class Pipeline(object): j_queue = dict(name=queue.name) j_queues.append(j_queue) j_queue['heads'] = [] - for head in queue.getHeads(): - j_changes = [] - e = head - while e: - j_changes.append(self.formatItemJSON(e)) - if (len(j_changes) > 1 and - (j_changes[-2]['remaining_time'] is not None) and - (j_changes[-1]['remaining_time'] is not None)): - j_changes[-1]['remaining_time'] = max( - j_changes[-2]['remaining_time'], - j_changes[-1]['remaining_time']) - e = e.item_behind + + j_changes = [] + for e in queue.queue: + if not e.item_ahead: + if j_changes: + j_queue['heads'].append(j_changes) + j_changes = [] + j_changes.append(self.formatItemJSON(e)) + if (len(j_changes) > 1 and + (j_changes[-2]['remaining_time'] is not None) and + (j_changes[-1]['remaining_time'] is not None)): + j_changes[-1]['remaining_time'] = max( + j_changes[-2]['remaining_time'], + j_changes[-1]['remaining_time']) + if j_changes: j_queue['heads'].append(j_changes) return j_pipeline @@ -261,9 +263,11 @@ class Pipeline(object): changeish.url, changeish._id()) else: - ret += '%sProject %s change %s\n' % (indent_str, - changeish.project.name, - changeish._id()) + ret += '%sProject %s change %s based on %s\n' % ( + indent_str, + changeish.project.name, + changeish._id(), + item.item_ahead) for job in self.getJobs(changeish): build = item.current_build_set.getBuild(job.name) if build: @@ -284,9 +288,6 @@ class Pipeline(object): job_name = '%s' % (url, job_name) ret += '%s %s: %s%s' % (indent_str, job_name, result, voting) ret += '\n' - if item.item_behind: - ret += '%sFollowed by:\n' % (indent_str) - ret += self.formatStatus(item.item_behind, indent + 2, html) return ret def formatItemJSON(self, item): @@ -333,19 +334,7 @@ class Pipeline(object): result=result, voting=job.voting)) if self.haveAllJobsStarted(item): - # if a change ahead has failed, we are unknown. - item_ahead_failed = False - i = item.item_ahead - while i: - if self.didAnyJobFail(i): - item_ahead_failed = True - i = None # safe to stop looking - else: - i = i.item_ahead - if item_ahead_failed: - ret['remaining_time'] = None - else: - ret['remaining_time'] = max_remaining + ret['remaining_time'] = max_remaining else: ret['remaining_time'] = None return ret @@ -385,7 +374,6 @@ class ChangeQueue(object): self.projects = [] self._jobs = set() self.queue = [] - self.severed_heads = [] self.dependent = dependent def __repr__(self): @@ -411,45 +399,44 @@ class ChangeQueue(object): def enqueueItem(self, item): if self.dependent and self.queue: item.item_ahead = self.queue[-1] - item.item_ahead.item_behind = item + item.item_ahead.items_behind.append(item) self.queue.append(item) def dequeueItem(self, item): if item in self.queue: self.queue.remove(item) - if item in self.severed_heads: - self.severed_heads.remove(item) if item.item_ahead: - item.item_ahead.item_behind = item.item_behind - if item.item_behind: - item.item_behind.item_ahead = item.item_ahead + item.item_ahead.items_behind.remove(item) + for item_behind in item.items_behind: + if item.item_ahead: + item.item_ahead.items_behind.append(item_behind) + item_behind.item_ahead = item.item_ahead item.item_ahead = None - item.item_behind = None + item.items_behind = [] item.dequeue_time = time.time() - def addSeveredHead(self, item): - self.severed_heads.append(item) + def moveItem(self, item, item_ahead): + if not self.dependent: + return False + if item.item_ahead == item_ahead: + return False + # Remove from current location + if item.item_ahead: + item.item_ahead.items_behind.remove(item) + for item_behind in item.items_behind: + if item.item_ahead: + item.item_ahead.items_behind.append(item_behind) + item_behind.item_ahead = item.item_ahead + # Add to new location + item.item_ahead = item_ahead + if item.item_ahead: + item.item_ahead.items_behind.append(item) + return True def mergeChangeQueue(self, other): for project in other.projects: self.addProject(project) - def getHead(self): - if not self.queue: - return None - return self.queue[0] - - def getHeads(self): - heads = [] - if self.dependent: - h = self.getHead() - if h: - heads.append(h) - else: - heads.extend(self.queue) - heads.extend(self.severed_heads) - return heads - class Project(object): def __init__(self, name): @@ -592,6 +579,7 @@ class BuildSet(object): self.commit = None self.unable_to_merge = False self.unable_to_merge_message = None + self.failing_reasons = [] def setConfiguration(self): # The change isn't enqueued until after it's created @@ -632,11 +620,19 @@ class QueueItem(object): self.current_build_set = BuildSet(self) self.build_sets.append(self.current_build_set) self.item_ahead = None - self.item_behind = None + self.items_behind = [] self.enqueue_time = None self.dequeue_time = None self.reported = False + def __repr__(self): + if self.pipeline: + pipeline = self.pipeline.name + else: + pipeline = None + return '' % ( + id(self), self.change, pipeline) + def resetAllBuilds(self): old = self.current_build_set self.current_build_set.result = 'CANCELED' diff --git a/zuul/scheduler.py b/zuul/scheduler.py index 8a4d942857..514be2fea6 100644 --- a/zuul/scheduler.py +++ b/zuul/scheduler.py @@ -133,8 +133,6 @@ class Scheduler(threading.Thread): "Build succeeded.") pipeline.dequeue_on_new_patchset = conf_pipeline.get( 'dequeue-on-new-patchset', True) - pipeline.dequeue_on_conflict = conf_pipeline.get( - 'dequeue-on-conflict', True) action_reporters = {} for action in ['start', 'success', 'failure']: @@ -456,10 +454,9 @@ class Scheduler(threading.Thread): name) items_to_remove = [] for shared_queue in old_pipeline.queues: - for item in (shared_queue.queue + - shared_queue.severed_heads): + for item in shared_queue.queue: item.item_ahead = None - item.item_behind = None + item.items_behind = [] item.pipeline = None project = layout.projects.get(item.change.project.name) if not project: @@ -470,9 +467,7 @@ class Scheduler(threading.Thread): items_to_remove.append(item) continue item.change.project = project - severed = item in shared_queue.severed_heads - if not new_pipeline.manager.reEnqueueItem( - item, severed=severed): + if not new_pipeline.manager.reEnqueueItem(item): items_to_remove.append(item) builds_to_remove = [] for build, item in old_pipeline.manager.building_jobs.items(): @@ -794,6 +789,9 @@ class BasePipelineManager(object): def checkForChangesNeededBy(self, change): return True + def getFailingDependentItem(self, item): + return None + def getDependentItems(self, item): orig_item = item items = [] @@ -805,6 +803,12 @@ class BasePipelineManager(object): [x.change for x in items])) return items + def getItemForChange(self, change): + for item in self.pipeline.getAllItems(): + if item.change.equals(change): + return item + return None + def findOldVersionOfChangeAlreadyInQueue(self, change): for c in self.pipeline.getChangesInQueue(): if change.isUpdateOf(c): @@ -820,15 +824,12 @@ class BasePipelineManager(object): (change, old_change, old_change)) self.removeChange(old_change) - def reEnqueueItem(self, item, severed=False): + def reEnqueueItem(self, item): change_queue = self.pipeline.getQueue(item.change.project) if change_queue: self.log.debug("Re-enqueing change %s in queue %s" % (item.change, change_queue)) - if severed: - change_queue.addSeveredHead(item) - else: - change_queue.enqueueItem(item) + change_queue.enqueueItem(item) self.reportStats(item) return True else: @@ -869,15 +870,10 @@ class BasePipelineManager(object): change.project) return False - def dequeueItem(self, item, keep_severed_heads=True): + def dequeueItem(self, item): self.log.debug("Removing change %s from queue" % item.change) - item_ahead = item.item_ahead change_queue = self.pipeline.getQueue(item.change.project) change_queue.dequeueItem(item) - if (keep_severed_heads and not item_ahead and - (item.change.is_reportable and not item.reported)): - self.log.debug("Adding %s as a severed head" % item.change) - change_queue.addSeveredHead(item) self.sched._maintain_trigger_cache = True def removeChange(self, change): @@ -888,7 +884,7 @@ class BasePipelineManager(object): self.log.debug("Canceling builds behind change: %s " "because it is being removed." % item.change) self.cancelJobs(item) - self.dequeueItem(item, keep_severed_heads=False) + self.dequeueItem(item) self.reportStats(item) def prepareRef(self, item): @@ -901,29 +897,14 @@ class BasePipelineManager(object): ref = item.current_build_set.ref dependent_items = self.getDependentItems(item) dependent_items.reverse() - dependent_str = ', '.join( - ['%s' % i.change.number for i in dependent_items - if i.change.project == item.change.project]) - if dependent_str: - msg = \ - "This change was unable to be automatically merged "\ - "with the current state of the repository and the "\ - "following changes which were enqueued ahead of it: "\ - "%s. Please rebase your change and upload a new "\ - "patchset." % dependent_str - else: - msg = "This change was unable to be automatically merged "\ - "with the current state of the repository. Please "\ - "rebase your change and upload a new patchset." all_items = dependent_items + [item] - if (dependent_items and - not dependent_items[-1].current_build_set.commit): - self.pipeline.setUnableToMerge(item, msg) - return True commit = self.sched.merger.mergeChanges(all_items, ref) item.current_build_set.commit = commit if not commit: self.log.info("Unable to merge change %s" % item.change) + msg = ("This change was unable to be automatically merged " + "with the current state of the repository. Please " + "rebase your change and upload a new patchset.") self.pipeline.setUnableToMerge(item, msg) return True return False @@ -971,74 +952,94 @@ class BasePipelineManager(object): self.log.debug("Removing build %s from running builds" % build) build.result = 'CANCELED' del self.building_jobs[build] - if item.item_behind: + for item_behind in item.items_behind: self.log.debug("Canceling jobs for change %s, behind change %s" % - (item.item_behind.change, item.change)) - if self.cancelJobs(item.item_behind, prime=prime): + (item_behind.change, item.change)) + if self.cancelJobs(item_behind, prime=prime): canceled = True return canceled - def _processOneItem(self, item): + def _processOneItem(self, item, nnfi): changed = False item_ahead = item.item_ahead - item_behind = item.item_behind - if self.prepareRef(item): - changed = True - if self.pipeline.dequeue_on_conflict: - self.log.info("Dequeuing change %s because " - "of a git merge error" % item.change) - self.dequeueItem(item, keep_severed_heads=False) - try: - self.reportItem(item) - except MergeFailure: - pass - return changed + change_queue = self.pipeline.getQueue(item.change.project) + failing_reasons = [] # Reasons this item is failing + if self.checkForChangesNeededBy(item.change) is not True: # It's not okay to enqueue this change, we should remove it. self.log.info("Dequeuing change %s because " "it can no longer merge" % item.change) self.cancelJobs(item) - self.dequeueItem(item, keep_severed_heads=False) + self.dequeueItem(item) self.pipeline.setDequeuedNeedingChange(item) try: self.reportItem(item) except MergeFailure: pass - changed = True - return changed - if not item_ahead: - merge_failed = False - if self.pipeline.areAllJobsComplete(item): - try: - self.reportItem(item) - except MergeFailure: - merge_failed = True - self.dequeueItem(item) - changed = True - if merge_failed or self.pipeline.didAnyJobFail(item): - if item_behind: - self.cancelJobs(item_behind) - changed = True - self.dequeueItem(item) + return (True, nnfi) + dep_item = self.getFailingDependentItem(item) + if dep_item: + failing_reasons.append('a needed change is failing') + self.cancelJobs(item, prime=False) else: - if self.pipeline.didAnyJobFail(item): - if item_behind: - if self.cancelJobs(item_behind, prime=False): - changed = True - # don't restart yet; this change will eventually become - # the head + if (item_ahead and item_ahead != nnfi and + not item_ahead.change.is_merged): + # Our current base is different than what we expected, + # and it's not because our current base merged. Something + # ahead must have failed. + self.log.info("Resetting builds for change %s because the " + "item ahead, %s, is not the nearest non-failing " + "item, %s" % (item.change, item_ahead, nnfi)) + change_queue.moveItem(item, nnfi) + changed = True + self.cancelJobs(item) + self.prepareRef(item) + if item.current_build_set.unable_to_merge: + failing_reasons.append("merge conflict") if self.launchJobs(item): changed = True - return changed + if self.pipeline.didAnyJobFail(item): + failing_reasons.append("at least one job failed") + if (not item_ahead) and self.pipeline.areAllJobsComplete(item): + try: + self.reportItem(item) + except MergeFailure: + failing_reasons.append("did not merge") + for item_behind in item.items_behind: + self.log.info("Resetting builds for change %s because the " + "item ahead, %s, failed to merge" % + (item_behind.change, item)) + self.cancelJobs(item_behind) + self.dequeueItem(item) + changed = True + elif not failing_reasons: + nnfi = item + item.current_build_set.failing_reasons = failing_reasons + if failing_reasons: + self.log.debug("%s is a failing item because %s" % + (item, failing_reasons)) + return (changed, nnfi) def processQueue(self): # Do whatever needs to be done for each change in the queue self.log.debug("Starting queue processor: %s" % self.pipeline.name) changed = False - for item in self.pipeline.getAllItems(): - if self._processOneItem(item): + for queue in self.pipeline.queues: + queue_changed = False + nnfi = None # Nearest non-failing item + for item in queue.queue[:]: + item_changed, nnfi = self._processOneItem(item, nnfi) + if item_changed: + queue_changed = True + self.reportStats(item) + if queue_changed: changed = True - self.reportStats(item) + status = '' + for item in queue.queue: + status += self.pipeline.formatStatus(item) + if status: + self.log.debug("Queue %s status is now:\n %s" % + (queue.name, status)) self.log.debug("Finished queue processor: %s (changed: %s)" % (self.pipeline.name, changed)) return changed @@ -1079,8 +1080,8 @@ class BasePipelineManager(object): del self.building_jobs[build] self.pipeline.setResult(change, build) - self.log.info("Change %s status is now:\n %s" % - (change, self.pipeline.formatStatus(change))) + self.log.debug("Change %s status is now:\n %s" % + (change, self.pipeline.formatStatus(change))) self.updateBuildDescriptions(build.build_set) while self.processQueue(): pass @@ -1444,3 +1445,15 @@ class DependentPipelineManager(BasePipelineManager): self.log.debug(" Change %s is needed but can not be merged" % change.needs_change) return False + + def getFailingDependentItem(self, item): + if not hasattr(item.change, 'needs_change'): + return None + if not item.change.needs_change: + return None + needs_item = self.getItemForChange(item.change.needs_change) + if not needs_item: + return None + if needs_item.current_build_set.failing_reasons: + return needs_item + return None