Test changes in dependency order.

For dependent change queues, test the changes in dependency order,
as reported by gerrit.

Effectively, this means that a change is enqueued only if:
  * it can be merged
  * the change it depends on has been merged or:
    * can be merged
    * is the current patchset

Also, once a change is enqueued, changes that depend on it are
examined to see if they can be merged (assuming this one succeeds).
If so, they are enqueued.

Change-Id: I917b2a2d1fc94c3aa2de406ed4b9f795a8abb079
This commit is contained in:
James E. Blair 2012-07-25 16:59:21 -07:00
parent 1c860d7f13
commit e421a23f4f
4 changed files with 234 additions and 37 deletions

View File

@ -91,6 +91,7 @@ class Gerrit(object):
self.keyfile = keyfile
self.watcher_thread = None
self.event_queue = None
self.client = None
def startWatching(self):
self.event_queue = Queue.Queue()
@ -120,9 +121,12 @@ class Gerrit(object):
out, err = self._ssh(cmd)
return err
def query(self, change):
cmd = 'gerrit query --format json %s"' % (
change)
def query(self, query):
args = '--all-approvals --comments --commit-message'
args += ' --current-patch-set --dependencies --files'
args += ' --patch-sets --submit-records'
cmd = 'gerrit query --format json %s %s' % (
args, query)
out, err = self._ssh(cmd)
if not out:
return False
@ -136,7 +140,7 @@ class Gerrit(object):
pprint.pformat(data)))
return data
def _ssh(self, command):
def _open(self):
client = paramiko.SSHClient()
client.load_system_host_keys()
client.set_missing_host_key_policy(paramiko.WarningPolicy())
@ -144,9 +148,18 @@ class Gerrit(object):
username=self.username,
port=self.port,
key_filename=self.keyfile)
self.client = client
self.log.debug("SSH command:\n%s" % command)
stdin, stdout, stderr = client.exec_command(command)
def _ssh(self, command):
if not self.client:
self._open()
try:
self.log.debug("SSH command:\n%s" % command)
stdin, stdout, stderr = self.client.exec_command(command)
except:
self._open()
stdin, stdout, stderr = self.client.exec_command(command)
out = stdout.read()
self.log.debug("SSH received stdout:\n%s" % out)

View File

@ -62,7 +62,8 @@ class Job(object):
self.success_message = None
self.parameter_function = None
self.hold_following_changes = False
self.event_filters = []
self.branches = []
self._branches = []
def __str__(self):
return self.name
@ -75,13 +76,14 @@ class Job(object):
self.success_message = other.success_message
self.parameter_function = other.parameter_function
self.hold_following_changes = other.hold_following_changes
self.event_filters = other.event_filters[:]
self.branches = other.branches[:]
self._branches = other._branches[:]
def eventMatches(self, event):
if not self.event_filters:
def changeMatches(self, change):
if not self.branches:
return True
for ef in self.event_filters:
if ef.matches(event):
for branch in self.branches:
if branch.match(change.branch):
return True
return False
@ -293,7 +295,7 @@ class BuildSet(object):
class Change(object):
def __init__(self, queue_name, project, event):
def __init__(self, queue_name, project):
self.queue_name = queue_name
self.project = project
self.branch = None
@ -304,19 +306,12 @@ class Change(object):
self.ref = None
self.oldrev = None
self.newrev = None
self.event = event
self.reported = False
if event.change_number:
self.branch = event.branch
self.number = event.change_number
self.url = event.change_url
self.patchset = event.patch_number
self.refspec = event.refspec
if event.ref:
self.ref = event.ref
self.oldrev = event.oldrev
self.newrev = event.newrev
self.needs_change = None
self.needed_by_changes = []
self.is_current_patchset = True
self.can_merge = False
self.is_merged = False
self.build_sets = []
self.change_ahead = None
@ -346,7 +341,7 @@ class Change(object):
return False
def _filterJobs(self, jobs):
return filter(lambda job: job.eventMatches(self.event), jobs)
return filter(lambda job: job.changeMatches(self), jobs)
def formatStatus(self, indent=0, html=False):
indent_str = ' ' * indent
@ -448,7 +443,7 @@ class Change(object):
return []
for tree in job_trees:
job = tree.job
if not job.eventMatches(self.event):
if not job.changeMatches(self):
continue
result = None
if job:
@ -537,6 +532,22 @@ class TriggerEvent(object):
return ret
def getChange(self, manager_name, project, trigger):
# TODO: make the scheduler deal with events (which may have
# changes) rather than changes so that we don't have to create
# "fake" changes for events that aren't associated with changes.
if self.change_number:
change = trigger.getChange(self.change_number, self.patch_number,
manager_name)
if self.ref:
change = Change(manager_name, project)
change.ref = self.ref
change.oldrev = self.oldrev
change.newrev = self.newrev
return change
class EventFilter(object):
def __init__(self, types=[], branches=[], refs=[], approvals={},

View File

@ -20,7 +20,7 @@ import re
import threading
import yaml
from model import Job, Change, Project, ChangeQueue, EventFilter
from model import Job, Project, ChangeQueue, EventFilter
class Scheduler(threading.Thread):
@ -113,8 +113,8 @@ class Scheduler(threading.Thread):
job.parameter_function = func
branches = toList(config_job.get('branch'))
if branches:
f = EventFilter(branches=branches)
job.event_filters = [f]
job._branches = branches
job.branches = [re.compile(x) for x in branches]
def add_jobs(job_tree, config_jobs):
for job in config_jobs:
@ -313,8 +313,8 @@ class Scheduler(threading.Thread):
if not manager.eventMatches(event):
self.log.debug("Event %s ignored by %s" % (event, manager))
continue
change = Change(manager.name, project, event)
self.log.info("Adding %s, %s to to %s" %
change = event.getChange(manager.name, project, self.trigger)
self.log.info("Adding %s, %s to %s" %
(project, change, manager))
manager.addChange(change)
@ -372,8 +372,8 @@ class BaseQueueManager(object):
istr = ' ' + ' ' * indent
if tree.job:
efilters = ''
for e in tree.job.event_filters:
efilters += str(e)
for b in tree.job._branches:
efilters += str(b)
if efilters:
efilters = ' ' + efilters
hold = ''
@ -400,6 +400,13 @@ class BaseQueueManager(object):
self.log.info(" On failure:")
self.log.info(" %s" % self.failure_action)
def getSubmitAllowNeeds(self):
# Get a list of code review labels that are allowed to be
# "needed" in the submit records for a change, with respect
# to this queue. In other words, the list of review labels
# this queue itself is likely to set before submitting.
return self.success_action.keys()
def eventMatches(self, event):
for ef in self.event_filters:
if ef.matches(event):
@ -595,10 +602,66 @@ class DependentQueueManager(BaseQueueManager):
return queue
self.log.error("Unable to find change queue for project %s" % project)
def _checkForChangesNeededBy(self, change):
self.log.debug("Checking for changes needed by %s:" % change)
# Return true if okay to proceed enqueing this change,
# false if the change should not be enqueued.
if not change.needs_change:
self.log.debug(" No changes needed")
return True
if change.needs_change.is_merged:
self.log.debug(" Needed change is merged")
return True
if not change.needs_change.is_current_patchset:
self.log.debug(" Needed change is not the current patchset")
return False
change_queue = self.getQueue(change.project)
if change.needs_change in change_queue.queue:
self.log.debug(" Needed change is already ahead in the queue")
return True
if change.needs_change.can_merge:
# It can merge, so attempt to enqueue it _ahead_ of this change.
# If that works we can enqueue this change, otherwise, we can't.
self.log.debug(" Change %s must be merged ahead of %s" %
(change.needs_change, change))
return self.addChange(change.needs_change)
# The needed change can't be merged.
self.log.debug(" Change %s is needed but can not be merged" %
change.needs_change)
return False
def _checkForChangesNeeding(self, change):
to_enqueue = []
self.log.debug("Checking for changes needing %s:" % change)
for needs in change.needed_by_changes:
if needs.can_merge:
self.log.debug(" Change %s needs %s and is ready to merge" %
(needs, change))
to_enqueue.append(needs)
return to_enqueue
def addChange(self, change):
# Returns true if added (or not needed), false if failed to add
if self.isChangeAlreadyInQueue(change):
self.log.debug("Change %s is already in queue, ignoring" % change)
return
return True
if not change.can_merge:
self.log.debug("Change %s can not merge, ignoring" % change)
return False
if not self._checkForChangesNeededBy(change):
return False
to_enqueue = self._checkForChangesNeeding(change)
# TODO(jeblair): Consider re-ordering this so that the dependent
# changes aren't checked until closer when they are needed.
if self.isChangeAlreadyInQueue(change):
self.log.debug("Change %s has been added to queue, ignoring" %
change)
return True
self.log.debug("Adding change %s" % change)
change_queue = self.getQueue(change.project)
if change_queue:
@ -606,6 +669,10 @@ class DependentQueueManager(BaseQueueManager):
(change, change_queue))
change_queue.enqueueChange(change)
self._addChange(change)
for needs in to_enqueue:
self.addChange(needs)
return True
return False
def _getDependentChanges(self, change):
orig_change = change

View File

@ -15,7 +15,7 @@
import threading
import logging
from zuul.lib import gerrit
from zuul.model import TriggerEvent
from zuul.model import TriggerEvent, Change
class GerritEventConnector(threading.Thread):
@ -95,13 +95,20 @@ class Gerrit(object):
message, action)
def isMerged(self, change):
self.log.debug("Checking if change %s is merged", change)
self.log.debug("Checking if change %s is merged" % change)
if not change.number:
self.log.debug("Change has no number; considering it merged")
# Good question. It's probably ref-updated, which, ah,
# means it's merged.
return True
data = self.gerrit.query(change.number)
change._data = data
change.is_merged = self._isMerged(change)
return change.is_merged
def _isMerged(self, change):
data = change._data
if not data:
return False
status = data.get('status')
@ -110,3 +117,102 @@ class Gerrit(object):
self.log.debug("Change %s status: %s" % (change, status))
if status == 'MERGED' or status == 'SUBMITTED':
return True
def _canMerge(self, change, allow_needs):
if not change.number:
self.log.debug("Change has no number; considering it merged")
# Good question. It's probably ref-updated, which, ah,
# means it's merged.
return True
data = change._data
if not data:
return False
if not 'submitRecords' in data:
return False
try:
for sr in data['submitRecords']:
if sr['status'] == 'OK':
return True
elif sr['status'] == 'NOT_READY':
for label in sr['labels']:
if label['status'] == 'OK':
continue
elif label['status'] in ['NEED', 'REJECT']:
# It may be our own rejection, so we ignore
if label['label'].lower() not in allow_needs:
return False
continue
else:
# IMPOSSIBLE
return False
else:
# CLOSED, RULE_ERROR
return False
except:
self.log.exception("Exception determining whether change"
"%s can merge:" % change)
return False
return True
def getChange(self, number, patchset, queue_name, changes=None):
# TODO: queue_name is screwing up the data model, refactor
# the queue context so it isn't necessary.
self.log.info("Getting information for %s,%s" % (number, patchset))
if changes is None:
changes = {}
data = self.gerrit.query(number)
project = self.sched.projects[data['project']]
change = Change(queue_name, project)
change._data = data
change.number = number
change.patchset = patchset
change.project = project
change.branch = data['branch']
change.url = data['url']
max_ps = 0
for ps in data['patchSets']:
if ps['number'] == patchset:
change.refspec = ps['ref']
if int(ps['number']) > int(max_ps):
max_ps = ps['number']
if max_ps == patchset:
change.is_current_patchset = True
else:
change.is_current_patchset = False
manager = self.sched.queue_managers[queue_name]
change.can_merge = self._canMerge(change,
manager.getSubmitAllowNeeds())
change.is_merged = self._isMerged(change)
if change.is_merged:
# This change is merged, so we don't need to look any further
# for dependencies.
return change
key = '%s,%s' % (number, patchset)
changes[key] = change
def cachedGetChange(num, ps):
key = '%s,%s' % (num, ps)
if key in changes:
return changes.get(key)
c = self.getChange(num, ps, queue_name, changes)
return c
if 'dependsOn' in data:
parts = data['dependsOn'][0]['ref'].split('/')
dep_num, dep_ps = parts[3], parts[4]
dep = cachedGetChange(dep_num, dep_ps)
if not dep.is_merged:
change.needs_change = dep
if 'neededBy' in data:
for needed in data['neededBy']:
parts = needed['ref'].split('/')
dep_num, dep_ps = parts[3], parts[4]
dep = cachedGetChange(dep_num, dep_ps)
if not dep.is_merged and dep.is_current_patchset:
change.needed_by_changes.append(dep)
return change