@@ -133,8 +133,6 @@ class Scheduler(threading.Thread):
                "Build succeeded.")
            pipeline.dequeue_on_new_patchset = conf_pipeline.get(
                'dequeue-on-new-patchset', True)
            pipeline.dequeue_on_conflict = conf_pipeline.get(
                'dequeue-on-conflict', True)
            action_reporters = {}
            for action in ['start', 'success', 'failure']:
@@ -456,10 +454,9 @@ class Scheduler(threading.Thread):
                               name)
            items_to_remove = []
            for shared_queue in old_pipeline.queues:
                for item in (shared_queue.queue +
                             shared_queue.severed_heads):
                for item in shared_queue.queue:
                    item.item_ahead = None
                    item.item_behind = None
                    item.items_behind = []
                    item.pipeline = None
                    project = layout.projects.get(item.change.project.name)
                    if not project:
@@ -470,9 +467,7 @@ class Scheduler(threading.Thread):
                        items_to_remove.append(item)
                        continue
                    item.change.project = project
                    severed = item in shared_queue.severed_heads
                    if not new_pipeline.manager.reEnqueueItem(
                        item, severed=severed):
                    if not new_pipeline.manager.reEnqueueItem(item):
                        items_to_remove.append(item)
            builds_to_remove = []
            for build, item in old_pipeline.manager.building_jobs.items():
@@ -794,6 +789,9 @@ class BasePipelineManager(object):
    def checkForChangesNeededBy(self, change):
        return True

    def getFailingDependentItem(self, item):
        return None

    def getDependentItems(self, item):
        orig_item = item
        items = []
@@ -805,6 +803,12 @@ class BasePipelineManager(object):
                           [x.change for x in items]))
        return items

    def getItemForChange(self, change):
        for item in self.pipeline.getAllItems():
            if item.change.equals(change):
                return item
        return None

    def findOldVersionOfChangeAlreadyInQueue(self, change):
        for c in self.pipeline.getChangesInQueue():
            if change.isUpdateOf(c):
@@ -820,15 +824,12 @@ class BasePipelineManager(object):
                           (change, old_change, old_change))
            self.removeChange(old_change)

    def reEnqueueItem(self, item, severed=False):
    def reEnqueueItem(self, item):
        change_queue = self.pipeline.getQueue(item.change.project)
        if change_queue:
            self.log.debug("Re-enqueing change %s in queue %s" %
                           (item.change, change_queue))
            if severed:
                change_queue.addSeveredHead(item)
            else:
                change_queue.enqueueItem(item)
            change_queue.enqueueItem(item)
            self.reportStats(item)
            return True
        else:
@@ -869,15 +870,10 @@ class BasePipelineManager(object):
                           change.project)
            return False

    def dequeueItem(self, item, keep_severed_heads=True):
    def dequeueItem(self, item):
        self.log.debug("Removing change %s from queue" % item.change)
        item_ahead = item.item_ahead
        change_queue = self.pipeline.getQueue(item.change.project)
        change_queue.dequeueItem(item)
        if (keep_severed_heads and not item_ahead and
            (item.change.is_reportable and not item.reported)):
            self.log.debug("Adding %s as a severed head" % item.change)
            change_queue.addSeveredHead(item)
        self.sched._maintain_trigger_cache = True

    def removeChange(self, change):
@@ -888,7 +884,7 @@ class BasePipelineManager(object):
            self.log.debug("Canceling builds behind change: %s "
                           "because it is being removed." % item.change)
            self.cancelJobs(item)
            self.dequeueItem(item, keep_severed_heads=False)
            self.dequeueItem(item)
            self.reportStats(item)

    def prepareRef(self, item):
@@ -901,29 +897,14 @@ class BasePipelineManager(object):
        ref = item.current_build_set.ref
        dependent_items = self.getDependentItems(item)
        dependent_items.reverse()
        dependent_str = ', '.join(
            ['%s' % i.change.number for i in dependent_items
             if i.change.project == item.change.project])
        if dependent_str:
            msg = \
                "This change was unable to be automatically merged " \
                "with the current state of the repository and the " \
                "following changes which were enqueued ahead of it: " \
                "%s. Please rebase your change and upload a new " \
                "patchset." % dependent_str
        else:
            msg = "This change was unable to be automatically merged " \
                  "with the current state of the repository. Please " \
                  "rebase your change and upload a new patchset."
        all_items = dependent_items + [item]
        if (dependent_items and
            not dependent_items[-1].current_build_set.commit):
            self.pipeline.setUnableToMerge(item, msg)
            return True
        commit = self.sched.merger.mergeChanges(all_items, ref)
        item.current_build_set.commit = commit
        if not commit:
            self.log.info("Unable to merge change %s" % item.change)
            msg = ("This change was unable to be automatically merged "
                   "with the current state of the repository. Please "
                   "rebase your change and upload a new patchset.")
            self.pipeline.setUnableToMerge(item, msg)
            return True
        return False
@@ -971,74 +952,94 @@ class BasePipelineManager(object):
            self.log.debug("Removing build %s from running builds" % build)
            build.result = 'CANCELED'
            del self.building_jobs[build]
        if item.item_behind:
        for item_behind in item.items_behind:
            self.log.debug("Canceling jobs for change %s, behind change %s" %
                           (item.item_behind.change, item.change))
            if self.cancelJobs(item.item_behind, prime=prime):
                           (item_behind.change, item.change))
            if self.cancelJobs(item_behind, prime=prime):
                canceled = True
        return canceled

    def _processOneItem(self, item):
    def _processOneItem(self, item, nnfi):
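        # 'nnfi' is the nearest non-failing item ahead of this one in the
        # shared queue; processQueue() threads it through each call and the
        # method returns an updated (changed, nnfi) tuple.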
        changed = False
        item_ahead = item.item_ahead
        item_behind = item.item_behind
        if self.prepareRef(item):
            changed = True
            if self.pipeline.dequeue_on_conflict:
                self.log.info("Dequeuing change %s because "
                              "of a git merge error" % item.change)
                self.dequeueItem(item, keep_severed_heads=False)
                try:
                    self.reportItem(item)
                except MergeFailure:
                    pass
                return changed
        change_queue = self.pipeline.getQueue(item.change.project)
        failing_reasons = []  # Reasons this item is failing
        if self.checkForChangesNeededBy(item.change) is not True:
            # It's not okay to enqueue this change, we should remove it.
            self.log.info("Dequeuing change %s because "
                          "it can no longer merge" % item.change)
            self.cancelJobs(item)
            self.dequeueItem(item, keep_severed_heads=False)
            self.dequeueItem(item)
            self.pipeline.setDequeuedNeedingChange(item)
            try:
                self.reportItem(item)
            except MergeFailure:
                pass
            changed = True
            return changed
        if not item_ahead:
            merge_failed = False
            if self.pipeline.areAllJobsComplete(item):
                try:
                    self.reportItem(item)
                except MergeFailure:
                    merge_failed = True
                self.dequeueItem(item)
                changed = True
            if merge_failed or self.pipeline.didAnyJobFail(item):
                if item_behind:
                    self.cancelJobs(item_behind)
                    changed = True
                self.dequeueItem(item)
            return (True, nnfi)
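        # If the change this item depends on is itself failing, mark this
        # item as failing and cancel its jobs instead of resetting it onto
        # a new base.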
        dep_item = self.getFailingDependentItem(item)
        if dep_item:
            failing_reasons.append('a needed change is failing')
            self.cancelJobs(item, prime=False)
        else:
            if self.pipeline.didAnyJobFail(item):
                if item_behind:
                    if self.cancelJobs(item_behind, prime=False):
                        changed = True
                # don't restart yet; this change will eventually become
                # the head
            if (item_ahead and item_ahead != nnfi and
                not item_ahead.change.is_merged):
                # Our current base is different than what we expected,
                # and it's not because our current base merged.  Something
                # ahead must have failed.
                self.log.info("Resetting builds for change %s because the "
                              "item ahead, %s, is not the nearest non-failing "
                              "item, %s" % (item.change, item_ahead, nnfi))
                change_queue.moveItem(item, nnfi)
                changed = True
                self.cancelJobs(item)
            self.prepareRef(item)
            if item.current_build_set.unable_to_merge:
                failing_reasons.append("merge conflict")
        if self.launchJobs(item):
            changed = True
        return changed
        if self.pipeline.didAnyJobFail(item):
            failing_reasons.append("at least one job failed")
        if (not item_ahead) and self.pipeline.areAllJobsComplete(item):
            try:
                self.reportItem(item)
            except MergeFailure:
                failing_reasons.append("did not merge")
                for item_behind in item.items_behind:
                    self.log.info("Resetting builds for change %s because the "
                                  "item ahead, %s, failed to merge" %
                                  (item_behind.change, item))
                    self.cancelJobs(item_behind)
            self.dequeueItem(item)
            changed = True
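        # Otherwise, an item with no failing reasons becomes the nearest
        # non-failing item for everything behind it in the queue.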
        elif not failing_reasons:
            nnfi = item
        item.current_build_set.failing_reasons = failing_reasons
        if failing_reasons:
            self.log.debug("%s is a failing item because %s" %
                           (item, failing_reasons))
        return (changed, nnfi)

    def processQueue(self):
        # Do whatever needs to be done for each change in the queue
        self.log.debug("Starting queue processor: %s" % self.pipeline.name)
        changed = False
        for item in self.pipeline.getAllItems():
            if self._processOneItem(item):
        for queue in self.pipeline.queues:
            queue_changed = False
            nnfi = None  # Nearest non-failing item
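            # Walk the queue front to back, threading the nearest
            # non-failing item through each call so items stuck behind a
            # failure can be moved up behind a healthy base.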
            for item in queue.queue[:]:
                item_changed, nnfi = self._processOneItem(item, nnfi)
                if item_changed:
                    queue_changed = True
                self.reportStats(item)
            if queue_changed:
                changed = True
            self.reportStats(item)
            status = ''
            for item in queue.queue:
                status += self.pipeline.formatStatus(item)
            if status:
                self.log.debug("Queue %s status is now:\n%s" %
                               (queue.name, status))
        self.log.debug("Finished queue processor: %s (changed: %s)" %
                       (self.pipeline.name, changed))
        return changed
@@ -1079,8 +1080,8 @@ class BasePipelineManager(object):
        del self.building_jobs[build]
        self.pipeline.setResult(change, build)
        self.log.info("Change %s status is now:\n%s" %
                      (change, self.pipeline.formatStatus(change)))
        self.log.debug("Change %s status is now:\n%s" %
                       (change, self.pipeline.formatStatus(change)))
        self.updateBuildDescriptions(build.build_set)
        while self.processQueue():
            pass
@@ -1444,3 +1445,15 @@ class DependentPipelineManager(BasePipelineManager):
            self.log.debug("Change %s is needed but can not be merged" %
                           change.needs_change)
            return False

    def getFailingDependentItem(self, item):
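        # Return the queue item for the change this item depends on, but
        # only if that item is currently marked as failing.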
        if not hasattr(item.change, 'needs_change'):
            return None
        if not item.change.needs_change:
            return None
        needs_item = self.getItemForChange(item.change.needs_change)
        if not needs_item:
            return None
        if needs_item.current_build_set.failing_reasons:
            return needs_item
        return None