Merge "Use NNFI scheduler algorithm"

This commit is contained in:
Jenkins 2013-09-20 16:06:59 +00:00 committed by Gerrit Code Review
commit 2da48b8180
7 changed files with 231 additions and 196 deletions

View File

@ -25,12 +25,6 @@ Since 1.3.0:
triggers later). See the sample layout.yaml and Zuul section of the triggers later). See the sample layout.yaml and Zuul section of the
documentation. documentation.
* The default behavior is now to immediately dequeue changes that have
merge conflicts, even those not at the head of the queue. To enable
the old behavior (which would wait until the conflicting change was
at the head before dequeuing it), see the new "dequeue-on-conflict"
option.
* Some statsd keys have changed in a backwards incompatible way: * Some statsd keys have changed in a backwards incompatible way:
* The counters and timers of the form zuul.job.{name} is now split * The counters and timers of the form zuul.job.{name} is now split
into several keys of the form: into several keys of the form:

View File

@ -346,14 +346,6 @@ explanation of each of the parameters::
well. To suppress this behavior (and allow jobs to continue well. To suppress this behavior (and allow jobs to continue
running), set this to ``false``. Default: ``true``. running), set this to ``false``. Default: ``true``.
**dequeue-on-conflict**
Normally, if there is a merge conflict or similar error with a
change, Zuul will immediately remove it from the queue, even if the
error is only due to a change that happened to be enqueued ahead of
it. If you would like to keep the change in the queue until it is
at the head to be certain that the merge conflict is intrinsic to
the change, set this to ``false``. Default: ``true``.
**success** **success**
Describes where Zuul should report to if all the jobs complete Describes where Zuul should report to if all the jobs complete
successfully. successfully.

View File

@ -75,7 +75,6 @@ pipelines:
verified: -1 verified: -1
- name: conflict - name: conflict
dequeue-on-conflict: false
manager: DependentPipelineManager manager: DependentPipelineManager
failure-message: Build failed. For information on how to proceed, see http://wiki.example.org/Test_Failures failure-message: Build failed. For information on how to proceed, see http://wiki.example.org/Test_Failures
trigger: trigger:

View File

@ -1068,9 +1068,6 @@ class TestScheduler(testtools.TestCase):
print 'pipeline %s queue %s contents %s' % ( print 'pipeline %s queue %s contents %s' % (
pipeline.name, queue.name, queue.queue) pipeline.name, queue.name, queue.queue)
self.assertEqual(len(queue.queue), 0) self.assertEqual(len(queue.queue), 0)
if len(queue.severed_heads) != 0:
print 'heads', queue.severed_heads
self.assertEqual(len(queue.severed_heads), 0)
def assertReportedStat(self, key, value=None, kind=None): def assertReportedStat(self, key, value=None, kind=None):
start = time.time() start = time.time()
@ -1403,11 +1400,20 @@ class TestScheduler(testtools.TestCase):
self.release(self.builds[2]) self.release(self.builds[2])
self.waitUntilSettled() self.waitUntilSettled()
# project-test1 and project-test2 for A, project-test2 for B # project-test1 and project-test2 for A
self.assertEqual(len(self.builds), 3) # project-test2 for B
# project-merge for C (without B)
self.assertEqual(len(self.builds), 4)
self.assertEqual(self.countJobResults(self.history, 'ABORTED'), 2) self.assertEqual(self.countJobResults(self.history, 'ABORTED'), 2)
# check that build status of aborted jobs are masked ('CANCELED') self.worker.release('.*-merge')
self.waitUntilSettled()
# project-test1 and project-test2 for A
# project-test2 for B
# project-test1 and project-test2 for C
self.assertEqual(len(self.builds), 5)
items = self.sched.layout.pipelines['gate'].getAllItems() items = self.sched.layout.pipelines['gate'].getAllItems()
builds = items[0].current_build_set.getBuilds() builds = items[0].current_build_set.getBuilds()
self.assertEqual(self.countJobResults(builds, 'SUCCESS'), 1) self.assertEqual(self.countJobResults(builds, 'SUCCESS'), 1)
@ -1418,7 +1424,7 @@ class TestScheduler(testtools.TestCase):
self.assertEqual(self.countJobResults(builds, None), 1) self.assertEqual(self.countJobResults(builds, None), 1)
builds = items[2].current_build_set.getBuilds() builds = items[2].current_build_set.getBuilds()
self.assertEqual(self.countJobResults(builds, 'SUCCESS'), 1) self.assertEqual(self.countJobResults(builds, 'SUCCESS'), 1)
self.assertEqual(self.countJobResults(builds, 'CANCELED'), 2) self.assertEqual(self.countJobResults(builds, None), 2)
self.worker.hold_jobs_in_build = False self.worker.hold_jobs_in_build = False
self.worker.release() self.worker.release()
@ -1667,8 +1673,9 @@ class TestScheduler(testtools.TestCase):
self.waitUntilSettled() self.waitUntilSettled()
self.gearman_server.release('.*-merge') self.gearman_server.release('.*-merge')
self.waitUntilSettled() self.waitUntilSettled()
queue = self.gearman_server.getQueue()
self.getParameter(queue[-1], 'ZUUL_REF') self.assertEqual(len(self.history), 2) # A and C merge jobs
self.gearman_server.hold_jobs_in_queue = False self.gearman_server.hold_jobs_in_queue = False
self.gearman_server.release() self.gearman_server.release()
self.waitUntilSettled() self.waitUntilSettled()
@ -1679,32 +1686,7 @@ class TestScheduler(testtools.TestCase):
self.assertEqual(A.reported, 2) self.assertEqual(A.reported, 2)
self.assertEqual(B.reported, 2) self.assertEqual(B.reported, 2)
self.assertEqual(C.reported, 2) self.assertEqual(C.reported, 2)
self.assertEqual(len(self.history), 6)
def test_dequeue_conflict(self):
"Test that the option to dequeue merge conflicts works"
self.gearman_server.hold_jobs_in_queue = True
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
A.addPatchset(['conflict'])
B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
B.addPatchset(['conflict'])
A.addApproval('CRVW', 2)
B.addApproval('CRVW', 2)
self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
self.fake_gerrit.addEvent(B.addApproval('APRV', 1))
self.waitUntilSettled()
self.assertEqual(A.reported, 1)
self.assertEqual(B.reported, 2)
self.gearman_server.hold_jobs_in_queue = False
self.gearman_server.release()
self.waitUntilSettled()
self.assertEqual(A.data['status'], 'MERGED')
self.assertEqual(B.data['status'], 'NEW')
self.assertEqual(A.reported, 2)
self.assertEqual(B.reported, 2)
def test_post(self): def test_post(self):
"Test that post jobs run" "Test that post jobs run"
@ -1888,6 +1870,66 @@ class TestScheduler(testtools.TestCase):
self.assertEqual(C.reported, 2) self.assertEqual(C.reported, 2)
self.assertEqual(len(self.history), 1) self.assertEqual(len(self.history), 1)
def test_failing_dependent_changes(self):
"Test that failing dependent patches are taken out of stream"
self.worker.hold_jobs_in_build = True
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
C = self.fake_gerrit.addFakeChange('org/project', 'master', 'C')
D = self.fake_gerrit.addFakeChange('org/project', 'master', 'D')
E = self.fake_gerrit.addFakeChange('org/project', 'master', 'E')
A.addApproval('CRVW', 2)
B.addApproval('CRVW', 2)
C.addApproval('CRVW', 2)
D.addApproval('CRVW', 2)
E.addApproval('CRVW', 2)
# E, D -> C -> B, A
D.setDependsOn(C, 1)
C.setDependsOn(B, 1)
self.worker.addFailTest('project-test1', B)
self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
self.fake_gerrit.addEvent(D.addApproval('APRV', 1))
self.fake_gerrit.addEvent(C.addApproval('APRV', 1))
self.fake_gerrit.addEvent(B.addApproval('APRV', 1))
self.fake_gerrit.addEvent(E.addApproval('APRV', 1))
self.waitUntilSettled()
self.worker.release('.*-merge')
self.waitUntilSettled()
self.worker.release('.*-merge')
self.waitUntilSettled()
self.worker.release('.*-merge')
self.waitUntilSettled()
self.worker.release('.*-merge')
self.waitUntilSettled()
self.worker.release('.*-merge')
self.waitUntilSettled()
self.worker.hold_jobs_in_build = False
for build in self.builds:
if build.parameters['ZUUL_CHANGE'] != '1':
build.release()
self.waitUntilSettled()
self.worker.release()
self.waitUntilSettled()
self.assertEqual(A.data['status'], 'MERGED')
self.assertEqual(A.reported, 2)
self.assertEqual(B.data['status'], 'NEW')
self.assertEqual(B.reported, 2)
self.assertEqual(C.data['status'], 'NEW')
self.assertEqual(C.reported, 2)
self.assertEqual(D.data['status'], 'NEW')
self.assertEqual(D.reported, 2)
self.assertEqual(E.data['status'], 'MERGED')
self.assertEqual(E.reported, 2)
self.assertEqual(len(self.history), 18)
def test_head_is_dequeued_once(self): def test_head_is_dequeued_once(self):
"Test that if a change at the head fails it is dequeued only once" "Test that if a change at the head fails it is dequeued only once"
# If it's dequeued more than once, we should see extra # If it's dequeued more than once, we should see extra

View File

@ -64,7 +64,6 @@ class LayoutSchema(object):
'success-message': str, 'success-message': str,
'failure-message': str, 'failure-message': str,
'dequeue-on-new-patchset': bool, 'dequeue-on-new-patchset': bool,
'dequeue-on-conflict': bool,
'trigger': trigger, 'trigger': trigger,
'success': report_actions, 'success': report_actions,
'failure': report_actions, 'failure': report_actions,

View File

@ -51,7 +51,6 @@ class Pipeline(object):
self.failure_message = None self.failure_message = None
self.success_message = None self.success_message = None
self.dequeue_on_new_patchset = True self.dequeue_on_new_patchset = True
self.dequeue_on_conflict = True
self.job_trees = {} # project -> JobTree self.job_trees = {} # project -> JobTree
self.manager = None self.manager = None
self.queues = [] self.queues = []
@ -169,6 +168,7 @@ class Pipeline(object):
return True return True
if build.result != 'SUCCESS': if build.result != 'SUCCESS':
return True return True
if not item.item_ahead: if not item.item_ahead:
return False return False
return self.isHoldingFollowingChanges(item.item_ahead) return self.isHoldingFollowingChanges(item.item_ahead)
@ -212,7 +212,6 @@ class Pipeline(object):
items = [] items = []
for shared_queue in self.queues: for shared_queue in self.queues:
items.extend(shared_queue.queue) items.extend(shared_queue.queue)
items.extend(shared_queue.severed_heads)
return items return items
def formatStatusHTML(self): def formatStatusHTML(self):
@ -222,8 +221,8 @@ class Pipeline(object):
s = 'Change queue: %s' % queue.name s = 'Change queue: %s' % queue.name
ret += s + '\n' ret += s + '\n'
ret += '-' * len(s) + '\n' ret += '-' * len(s) + '\n'
for head in queue.getHeads(): for item in queue.queue:
ret += self.formatStatus(head, html=True) ret += self.formatStatus(item, html=True)
return ret return ret
def formatStatusJSON(self): def formatStatusJSON(self):
@ -235,18 +234,21 @@ class Pipeline(object):
j_queue = dict(name=queue.name) j_queue = dict(name=queue.name)
j_queues.append(j_queue) j_queues.append(j_queue)
j_queue['heads'] = [] j_queue['heads'] = []
for head in queue.getHeads():
j_changes = [] j_changes = []
e = head for e in queue.queue:
while e: if not e.item_ahead:
j_changes.append(self.formatItemJSON(e)) if j_changes:
if (len(j_changes) > 1 and j_queue['heads'].append(j_changes)
(j_changes[-2]['remaining_time'] is not None) and j_changes = []
(j_changes[-1]['remaining_time'] is not None)): j_changes.append(self.formatItemJSON(e))
j_changes[-1]['remaining_time'] = max( if (len(j_changes) > 1 and
j_changes[-2]['remaining_time'], (j_changes[-2]['remaining_time'] is not None) and
j_changes[-1]['remaining_time']) (j_changes[-1]['remaining_time'] is not None)):
e = e.item_behind j_changes[-1]['remaining_time'] = max(
j_changes[-2]['remaining_time'],
j_changes[-1]['remaining_time'])
if j_changes:
j_queue['heads'].append(j_changes) j_queue['heads'].append(j_changes)
return j_pipeline return j_pipeline
@ -261,9 +263,11 @@ class Pipeline(object):
changeish.url, changeish.url,
changeish._id()) changeish._id())
else: else:
ret += '%sProject %s change %s\n' % (indent_str, ret += '%sProject %s change %s based on %s\n' % (
changeish.project.name, indent_str,
changeish._id()) changeish.project.name,
changeish._id(),
item.item_ahead)
for job in self.getJobs(changeish): for job in self.getJobs(changeish):
build = item.current_build_set.getBuild(job.name) build = item.current_build_set.getBuild(job.name)
if build: if build:
@ -284,9 +288,6 @@ class Pipeline(object):
job_name = '<a href="%s">%s</a>' % (url, job_name) job_name = '<a href="%s">%s</a>' % (url, job_name)
ret += '%s %s: %s%s' % (indent_str, job_name, result, voting) ret += '%s %s: %s%s' % (indent_str, job_name, result, voting)
ret += '\n' ret += '\n'
if item.item_behind:
ret += '%sFollowed by:\n' % (indent_str)
ret += self.formatStatus(item.item_behind, indent + 2, html)
return ret return ret
def formatItemJSON(self, item): def formatItemJSON(self, item):
@ -333,19 +334,7 @@ class Pipeline(object):
result=result, result=result,
voting=job.voting)) voting=job.voting))
if self.haveAllJobsStarted(item): if self.haveAllJobsStarted(item):
# if a change ahead has failed, we are unknown. ret['remaining_time'] = max_remaining
item_ahead_failed = False
i = item.item_ahead
while i:
if self.didAnyJobFail(i):
item_ahead_failed = True
i = None # safe to stop looking
else:
i = i.item_ahead
if item_ahead_failed:
ret['remaining_time'] = None
else:
ret['remaining_time'] = max_remaining
else: else:
ret['remaining_time'] = None ret['remaining_time'] = None
return ret return ret
@ -385,7 +374,6 @@ class ChangeQueue(object):
self.projects = [] self.projects = []
self._jobs = set() self._jobs = set()
self.queue = [] self.queue = []
self.severed_heads = []
self.dependent = dependent self.dependent = dependent
def __repr__(self): def __repr__(self):
@ -411,45 +399,44 @@ class ChangeQueue(object):
def enqueueItem(self, item): def enqueueItem(self, item):
if self.dependent and self.queue: if self.dependent and self.queue:
item.item_ahead = self.queue[-1] item.item_ahead = self.queue[-1]
item.item_ahead.item_behind = item item.item_ahead.items_behind.append(item)
self.queue.append(item) self.queue.append(item)
def dequeueItem(self, item): def dequeueItem(self, item):
if item in self.queue: if item in self.queue:
self.queue.remove(item) self.queue.remove(item)
if item in self.severed_heads:
self.severed_heads.remove(item)
if item.item_ahead: if item.item_ahead:
item.item_ahead.item_behind = item.item_behind item.item_ahead.items_behind.remove(item)
if item.item_behind: for item_behind in item.items_behind:
item.item_behind.item_ahead = item.item_ahead if item.item_ahead:
item.item_ahead.items_behind.append(item_behind)
item_behind.item_ahead = item.item_ahead
item.item_ahead = None item.item_ahead = None
item.item_behind = None item.items_behind = []
item.dequeue_time = time.time() item.dequeue_time = time.time()
def addSeveredHead(self, item): def moveItem(self, item, item_ahead):
self.severed_heads.append(item) if not self.dependent:
return False
if item.item_ahead == item_ahead:
return False
# Remove from current location
if item.item_ahead:
item.item_ahead.items_behind.remove(item)
for item_behind in item.items_behind:
if item.item_ahead:
item.item_ahead.items_behind.append(item_behind)
item_behind.item_ahead = item.item_ahead
# Add to new location
item.item_ahead = item_ahead
if item.item_ahead:
item.item_ahead.items_behind.append(item)
return True
def mergeChangeQueue(self, other): def mergeChangeQueue(self, other):
for project in other.projects: for project in other.projects:
self.addProject(project) self.addProject(project)
def getHead(self):
if not self.queue:
return None
return self.queue[0]
def getHeads(self):
heads = []
if self.dependent:
h = self.getHead()
if h:
heads.append(h)
else:
heads.extend(self.queue)
heads.extend(self.severed_heads)
return heads
class Project(object): class Project(object):
def __init__(self, name): def __init__(self, name):
@ -592,6 +579,7 @@ class BuildSet(object):
self.commit = None self.commit = None
self.unable_to_merge = False self.unable_to_merge = False
self.unable_to_merge_message = None self.unable_to_merge_message = None
self.failing_reasons = []
def setConfiguration(self): def setConfiguration(self):
# The change isn't enqueued until after it's created # The change isn't enqueued until after it's created
@ -632,11 +620,19 @@ class QueueItem(object):
self.current_build_set = BuildSet(self) self.current_build_set = BuildSet(self)
self.build_sets.append(self.current_build_set) self.build_sets.append(self.current_build_set)
self.item_ahead = None self.item_ahead = None
self.item_behind = None self.items_behind = []
self.enqueue_time = None self.enqueue_time = None
self.dequeue_time = None self.dequeue_time = None
self.reported = False self.reported = False
def __repr__(self):
if self.pipeline:
pipeline = self.pipeline.name
else:
pipeline = None
return '<QueueItem 0x%x for %s in %s>' % (
id(self), self.change, pipeline)
def resetAllBuilds(self): def resetAllBuilds(self):
old = self.current_build_set old = self.current_build_set
self.current_build_set.result = 'CANCELED' self.current_build_set.result = 'CANCELED'

View File

@ -133,8 +133,6 @@ class Scheduler(threading.Thread):
"Build succeeded.") "Build succeeded.")
pipeline.dequeue_on_new_patchset = conf_pipeline.get( pipeline.dequeue_on_new_patchset = conf_pipeline.get(
'dequeue-on-new-patchset', True) 'dequeue-on-new-patchset', True)
pipeline.dequeue_on_conflict = conf_pipeline.get(
'dequeue-on-conflict', True)
action_reporters = {} action_reporters = {}
for action in ['start', 'success', 'failure']: for action in ['start', 'success', 'failure']:
@ -456,10 +454,9 @@ class Scheduler(threading.Thread):
name) name)
items_to_remove = [] items_to_remove = []
for shared_queue in old_pipeline.queues: for shared_queue in old_pipeline.queues:
for item in (shared_queue.queue + for item in shared_queue.queue:
shared_queue.severed_heads):
item.item_ahead = None item.item_ahead = None
item.item_behind = None item.items_behind = []
item.pipeline = None item.pipeline = None
project = layout.projects.get(item.change.project.name) project = layout.projects.get(item.change.project.name)
if not project: if not project:
@ -470,9 +467,7 @@ class Scheduler(threading.Thread):
items_to_remove.append(item) items_to_remove.append(item)
continue continue
item.change.project = project item.change.project = project
severed = item in shared_queue.severed_heads if not new_pipeline.manager.reEnqueueItem(item):
if not new_pipeline.manager.reEnqueueItem(
item, severed=severed):
items_to_remove.append(item) items_to_remove.append(item)
builds_to_remove = [] builds_to_remove = []
for build, item in old_pipeline.manager.building_jobs.items(): for build, item in old_pipeline.manager.building_jobs.items():
@ -794,6 +789,9 @@ class BasePipelineManager(object):
def checkForChangesNeededBy(self, change): def checkForChangesNeededBy(self, change):
return True return True
def getFailingDependentItem(self, item):
return None
def getDependentItems(self, item): def getDependentItems(self, item):
orig_item = item orig_item = item
items = [] items = []
@ -805,6 +803,12 @@ class BasePipelineManager(object):
[x.change for x in items])) [x.change for x in items]))
return items return items
def getItemForChange(self, change):
for item in self.pipeline.getAllItems():
if item.change.equals(change):
return item
return None
def findOldVersionOfChangeAlreadyInQueue(self, change): def findOldVersionOfChangeAlreadyInQueue(self, change):
for c in self.pipeline.getChangesInQueue(): for c in self.pipeline.getChangesInQueue():
if change.isUpdateOf(c): if change.isUpdateOf(c):
@ -820,15 +824,12 @@ class BasePipelineManager(object):
(change, old_change, old_change)) (change, old_change, old_change))
self.removeChange(old_change) self.removeChange(old_change)
def reEnqueueItem(self, item, severed=False): def reEnqueueItem(self, item):
change_queue = self.pipeline.getQueue(item.change.project) change_queue = self.pipeline.getQueue(item.change.project)
if change_queue: if change_queue:
self.log.debug("Re-enqueing change %s in queue %s" % self.log.debug("Re-enqueing change %s in queue %s" %
(item.change, change_queue)) (item.change, change_queue))
if severed: change_queue.enqueueItem(item)
change_queue.addSeveredHead(item)
else:
change_queue.enqueueItem(item)
self.reportStats(item) self.reportStats(item)
return True return True
else: else:
@ -869,15 +870,10 @@ class BasePipelineManager(object):
change.project) change.project)
return False return False
def dequeueItem(self, item, keep_severed_heads=True): def dequeueItem(self, item):
self.log.debug("Removing change %s from queue" % item.change) self.log.debug("Removing change %s from queue" % item.change)
item_ahead = item.item_ahead
change_queue = self.pipeline.getQueue(item.change.project) change_queue = self.pipeline.getQueue(item.change.project)
change_queue.dequeueItem(item) change_queue.dequeueItem(item)
if (keep_severed_heads and not item_ahead and
(item.change.is_reportable and not item.reported)):
self.log.debug("Adding %s as a severed head" % item.change)
change_queue.addSeveredHead(item)
self.sched._maintain_trigger_cache = True self.sched._maintain_trigger_cache = True
def removeChange(self, change): def removeChange(self, change):
@ -888,7 +884,7 @@ class BasePipelineManager(object):
self.log.debug("Canceling builds behind change: %s " self.log.debug("Canceling builds behind change: %s "
"because it is being removed." % item.change) "because it is being removed." % item.change)
self.cancelJobs(item) self.cancelJobs(item)
self.dequeueItem(item, keep_severed_heads=False) self.dequeueItem(item)
self.reportStats(item) self.reportStats(item)
def prepareRef(self, item): def prepareRef(self, item):
@ -901,29 +897,14 @@ class BasePipelineManager(object):
ref = item.current_build_set.ref ref = item.current_build_set.ref
dependent_items = self.getDependentItems(item) dependent_items = self.getDependentItems(item)
dependent_items.reverse() dependent_items.reverse()
dependent_str = ', '.join(
['%s' % i.change.number for i in dependent_items
if i.change.project == item.change.project])
if dependent_str:
msg = \
"This change was unable to be automatically merged "\
"with the current state of the repository and the "\
"following changes which were enqueued ahead of it: "\
"%s. Please rebase your change and upload a new "\
"patchset." % dependent_str
else:
msg = "This change was unable to be automatically merged "\
"with the current state of the repository. Please "\
"rebase your change and upload a new patchset."
all_items = dependent_items + [item] all_items = dependent_items + [item]
if (dependent_items and
not dependent_items[-1].current_build_set.commit):
self.pipeline.setUnableToMerge(item, msg)
return True
commit = self.sched.merger.mergeChanges(all_items, ref) commit = self.sched.merger.mergeChanges(all_items, ref)
item.current_build_set.commit = commit item.current_build_set.commit = commit
if not commit: if not commit:
self.log.info("Unable to merge change %s" % item.change) self.log.info("Unable to merge change %s" % item.change)
msg = ("This change was unable to be automatically merged "
"with the current state of the repository. Please "
"rebase your change and upload a new patchset.")
self.pipeline.setUnableToMerge(item, msg) self.pipeline.setUnableToMerge(item, msg)
return True return True
return False return False
@ -971,74 +952,94 @@ class BasePipelineManager(object):
self.log.debug("Removing build %s from running builds" % build) self.log.debug("Removing build %s from running builds" % build)
build.result = 'CANCELED' build.result = 'CANCELED'
del self.building_jobs[build] del self.building_jobs[build]
if item.item_behind: for item_behind in item.items_behind:
self.log.debug("Canceling jobs for change %s, behind change %s" % self.log.debug("Canceling jobs for change %s, behind change %s" %
(item.item_behind.change, item.change)) (item_behind.change, item.change))
if self.cancelJobs(item.item_behind, prime=prime): if self.cancelJobs(item_behind, prime=prime):
canceled = True canceled = True
return canceled return canceled
def _processOneItem(self, item): def _processOneItem(self, item, nnfi):
changed = False changed = False
item_ahead = item.item_ahead item_ahead = item.item_ahead
item_behind = item.item_behind change_queue = self.pipeline.getQueue(item.change.project)
if self.prepareRef(item): failing_reasons = [] # Reasons this item is failing
changed = True
if self.pipeline.dequeue_on_conflict:
self.log.info("Dequeuing change %s because "
"of a git merge error" % item.change)
self.dequeueItem(item, keep_severed_heads=False)
try:
self.reportItem(item)
except MergeFailure:
pass
return changed
if self.checkForChangesNeededBy(item.change) is not True: if self.checkForChangesNeededBy(item.change) is not True:
# It's not okay to enqueue this change, we should remove it. # It's not okay to enqueue this change, we should remove it.
self.log.info("Dequeuing change %s because " self.log.info("Dequeuing change %s because "
"it can no longer merge" % item.change) "it can no longer merge" % item.change)
self.cancelJobs(item) self.cancelJobs(item)
self.dequeueItem(item, keep_severed_heads=False) self.dequeueItem(item)
self.pipeline.setDequeuedNeedingChange(item) self.pipeline.setDequeuedNeedingChange(item)
try: try:
self.reportItem(item) self.reportItem(item)
except MergeFailure: except MergeFailure:
pass pass
changed = True return (True, nnfi)
return changed dep_item = self.getFailingDependentItem(item)
if not item_ahead: if dep_item:
merge_failed = False failing_reasons.append('a needed change is failing')
if self.pipeline.areAllJobsComplete(item): self.cancelJobs(item, prime=False)
try:
self.reportItem(item)
except MergeFailure:
merge_failed = True
self.dequeueItem(item)
changed = True
if merge_failed or self.pipeline.didAnyJobFail(item):
if item_behind:
self.cancelJobs(item_behind)
changed = True
self.dequeueItem(item)
else: else:
if self.pipeline.didAnyJobFail(item): if (item_ahead and item_ahead != nnfi and
if item_behind: not item_ahead.change.is_merged):
if self.cancelJobs(item_behind, prime=False): # Our current base is different than what we expected,
changed = True # and it's not because our current base merged. Something
# don't restart yet; this change will eventually become # ahead must have failed.
# the head self.log.info("Resetting builds for change %s because the "
"item ahead, %s, is not the nearest non-failing "
"item, %s" % (item.change, item_ahead, nnfi))
change_queue.moveItem(item, nnfi)
changed = True
self.cancelJobs(item)
self.prepareRef(item)
if item.current_build_set.unable_to_merge:
failing_reasons.append("merge conflict")
if self.launchJobs(item): if self.launchJobs(item):
changed = True changed = True
return changed if self.pipeline.didAnyJobFail(item):
failing_reasons.append("at least one job failed")
if (not item_ahead) and self.pipeline.areAllJobsComplete(item):
try:
self.reportItem(item)
except MergeFailure:
failing_reasons.append("did not merge")
for item_behind in item.items_behind:
self.log.info("Resetting builds for change %s because the "
"item ahead, %s, failed to merge" %
(item_behind.change, item))
self.cancelJobs(item_behind)
self.dequeueItem(item)
changed = True
elif not failing_reasons:
nnfi = item
item.current_build_set.failing_reasons = failing_reasons
if failing_reasons:
self.log.debug("%s is a failing item because %s" %
(item, failing_reasons))
return (changed, nnfi)
def processQueue(self): def processQueue(self):
# Do whatever needs to be done for each change in the queue # Do whatever needs to be done for each change in the queue
self.log.debug("Starting queue processor: %s" % self.pipeline.name) self.log.debug("Starting queue processor: %s" % self.pipeline.name)
changed = False changed = False
for item in self.pipeline.getAllItems(): for queue in self.pipeline.queues:
if self._processOneItem(item): queue_changed = False
nnfi = None # Nearest non-failing item
for item in queue.queue[:]:
item_changed, nnfi = self._processOneItem(item, nnfi)
if item_changed:
queue_changed = True
self.reportStats(item)
if queue_changed:
changed = True changed = True
self.reportStats(item) status = ''
for item in queue.queue:
status += self.pipeline.formatStatus(item)
if status:
self.log.debug("Queue %s status is now:\n %s" %
(queue.name, status))
self.log.debug("Finished queue processor: %s (changed: %s)" % self.log.debug("Finished queue processor: %s (changed: %s)" %
(self.pipeline.name, changed)) (self.pipeline.name, changed))
return changed return changed
@ -1079,8 +1080,8 @@ class BasePipelineManager(object):
del self.building_jobs[build] del self.building_jobs[build]
self.pipeline.setResult(change, build) self.pipeline.setResult(change, build)
self.log.info("Change %s status is now:\n %s" % self.log.debug("Change %s status is now:\n %s" %
(change, self.pipeline.formatStatus(change))) (change, self.pipeline.formatStatus(change)))
self.updateBuildDescriptions(build.build_set) self.updateBuildDescriptions(build.build_set)
while self.processQueue(): while self.processQueue():
pass pass
@ -1444,3 +1445,15 @@ class DependentPipelineManager(BasePipelineManager):
self.log.debug(" Change %s is needed but can not be merged" % self.log.debug(" Change %s is needed but can not be merged" %
change.needs_change) change.needs_change)
return False return False
def getFailingDependentItem(self, item):
if not hasattr(item.change, 'needs_change'):
return None
if not item.change.needs_change:
return None
needs_item = self.getItemForChange(item.change.needs_change)
if not needs_item:
return None
if needs_item.current_build_set.failing_reasons:
return needs_item
return None