From e421a23f4ffad6330df4bc85f2b55b323b20895a Mon Sep 17 00:00:00 2001 From: "James E. Blair" Date: Wed, 25 Jul 2012 16:59:21 -0700 Subject: [PATCH] Test changes in dependency order. For dependent change queues, test the changes in dependency order, as reported by gerrit. Effectively, this means that a change is enqueued only if: * it can be merged * the change it depends on has been merged or: * can be merged * is the current patchset Also, once a change is enqueued, changes that depend on it are examined to see if they can be merged (assuming this one succeeds). If so, they are enqueued. Change-Id: I917b2a2d1fc94c3aa2de406ed4b9f795a8abb079 --- zuul/lib/gerrit.py | 25 +++++++--- zuul/model.py | 53 ++++++++++++-------- zuul/scheduler.py | 83 ++++++++++++++++++++++++++++--- zuul/trigger/gerrit.py | 110 ++++++++++++++++++++++++++++++++++++++++- 4 files changed, 234 insertions(+), 37 deletions(-) diff --git a/zuul/lib/gerrit.py b/zuul/lib/gerrit.py index 13b74fcef4..fd5ebe4857 100644 --- a/zuul/lib/gerrit.py +++ b/zuul/lib/gerrit.py @@ -91,6 +91,7 @@ class Gerrit(object): self.keyfile = keyfile self.watcher_thread = None self.event_queue = None + self.client = None def startWatching(self): self.event_queue = Queue.Queue() @@ -120,9 +121,12 @@ class Gerrit(object): out, err = self._ssh(cmd) return err - def query(self, change): - cmd = 'gerrit query --format json %s"' % ( - change) + def query(self, query): + args = '--all-approvals --comments --commit-message' + args += ' --current-patch-set --dependencies --files' + args += ' --patch-sets --submit-records' + cmd = 'gerrit query --format json %s %s' % ( + args, query) out, err = self._ssh(cmd) if not out: return False @@ -136,7 +140,7 @@ class Gerrit(object): pprint.pformat(data))) return data - def _ssh(self, command): + def _open(self): client = paramiko.SSHClient() client.load_system_host_keys() client.set_missing_host_key_policy(paramiko.WarningPolicy()) @@ -144,9 +148,18 @@ class Gerrit(object): username=self.username, port=self.port, key_filename=self.keyfile) + self.client = client - self.log.debug("SSH command:\n%s" % command) - stdin, stdout, stderr = client.exec_command(command) + def _ssh(self, command): + if not self.client: + self._open() + + try: + self.log.debug("SSH command:\n%s" % command) + stdin, stdout, stderr = self.client.exec_command(command) + except: + self._open() + stdin, stdout, stderr = self.client.exec_command(command) out = stdout.read() self.log.debug("SSH received stdout:\n%s" % out) diff --git a/zuul/model.py b/zuul/model.py index d16ab0f5d6..b0c47c8f47 100644 --- a/zuul/model.py +++ b/zuul/model.py @@ -62,7 +62,8 @@ class Job(object): self.success_message = None self.parameter_function = None self.hold_following_changes = False - self.event_filters = [] + self.branches = [] + self._branches = [] def __str__(self): return self.name @@ -75,13 +76,14 @@ class Job(object): self.success_message = other.success_message self.parameter_function = other.parameter_function self.hold_following_changes = other.hold_following_changes - self.event_filters = other.event_filters[:] + self.branches = other.branches[:] + self._branches = other._branches[:] - def eventMatches(self, event): - if not self.event_filters: + def changeMatches(self, change): + if not self.branches: return True - for ef in self.event_filters: - if ef.matches(event): + for branch in self.branches: + if branch.match(change.branch): return True return False @@ -293,7 +295,7 @@ class BuildSet(object): class Change(object): - def __init__(self, queue_name, project, event): + def __init__(self, queue_name, project): self.queue_name = queue_name self.project = project self.branch = None @@ -304,19 +306,12 @@ class Change(object): self.ref = None self.oldrev = None self.newrev = None - self.event = event self.reported = False - - if event.change_number: - self.branch = event.branch - self.number = event.change_number - self.url = event.change_url - self.patchset = event.patch_number - self.refspec = event.refspec - if event.ref: - self.ref = event.ref - self.oldrev = event.oldrev - self.newrev = event.newrev + self.needs_change = None + self.needed_by_changes = [] + self.is_current_patchset = True + self.can_merge = False + self.is_merged = False self.build_sets = [] self.change_ahead = None @@ -346,7 +341,7 @@ class Change(object): return False def _filterJobs(self, jobs): - return filter(lambda job: job.eventMatches(self.event), jobs) + return filter(lambda job: job.changeMatches(self), jobs) def formatStatus(self, indent=0, html=False): indent_str = ' ' * indent @@ -448,7 +443,7 @@ class Change(object): return [] for tree in job_trees: job = tree.job - if not job.eventMatches(self.event): + if not job.changeMatches(self): continue result = None if job: @@ -537,6 +532,22 @@ class TriggerEvent(object): return ret + def getChange(self, manager_name, project, trigger): + # TODO: make the scheduler deal with events (which may have + # changes) rather than changes so that we don't have to create + # "fake" changes for events that aren't associated with changes. + + if self.change_number: + change = trigger.getChange(self.change_number, self.patch_number, + manager_name) + if self.ref: + change = Change(manager_name, project) + change.ref = self.ref + change.oldrev = self.oldrev + change.newrev = self.newrev + + return change + class EventFilter(object): def __init__(self, types=[], branches=[], refs=[], approvals={}, diff --git a/zuul/scheduler.py b/zuul/scheduler.py index fe068164f5..e42ae26d06 100644 --- a/zuul/scheduler.py +++ b/zuul/scheduler.py @@ -20,7 +20,7 @@ import re import threading import yaml -from model import Job, Change, Project, ChangeQueue, EventFilter +from model import Job, Project, ChangeQueue, EventFilter class Scheduler(threading.Thread): @@ -113,8 +113,8 @@ class Scheduler(threading.Thread): job.parameter_function = func branches = toList(config_job.get('branch')) if branches: - f = EventFilter(branches=branches) - job.event_filters = [f] + job._branches = branches + job.branches = [re.compile(x) for x in branches] def add_jobs(job_tree, config_jobs): for job in config_jobs: @@ -313,8 +313,8 @@ class Scheduler(threading.Thread): if not manager.eventMatches(event): self.log.debug("Event %s ignored by %s" % (event, manager)) continue - change = Change(manager.name, project, event) - self.log.info("Adding %s, %s to to %s" % + change = event.getChange(manager.name, project, self.trigger) + self.log.info("Adding %s, %s to %s" % (project, change, manager)) manager.addChange(change) @@ -372,8 +372,8 @@ class BaseQueueManager(object): istr = ' ' + ' ' * indent if tree.job: efilters = '' - for e in tree.job.event_filters: - efilters += str(e) + for b in tree.job._branches: + efilters += str(b) if efilters: efilters = ' ' + efilters hold = '' @@ -400,6 +400,13 @@ class BaseQueueManager(object): self.log.info(" On failure:") self.log.info(" %s" % self.failure_action) + def getSubmitAllowNeeds(self): + # Get a list of code review labels that are allowed to be + # "needed" in the submit records for a change, with respect + # to this queue. In other words, the list of review labels + # this queue itself is likely to set before submitting. + return self.success_action.keys() + def eventMatches(self, event): for ef in self.event_filters: if ef.matches(event): @@ -595,10 +602,66 @@ class DependentQueueManager(BaseQueueManager): return queue self.log.error("Unable to find change queue for project %s" % project) + def _checkForChangesNeededBy(self, change): + self.log.debug("Checking for changes needed by %s:" % change) + # Return true if okay to proceed enqueing this change, + # false if the change should not be enqueued. + if not change.needs_change: + self.log.debug(" No changes needed") + return True + if change.needs_change.is_merged: + self.log.debug(" Needed change is merged") + return True + if not change.needs_change.is_current_patchset: + self.log.debug(" Needed change is not the current patchset") + return False + change_queue = self.getQueue(change.project) + if change.needs_change in change_queue.queue: + self.log.debug(" Needed change is already ahead in the queue") + return True + if change.needs_change.can_merge: + # It can merge, so attempt to enqueue it _ahead_ of this change. + # If that works we can enqueue this change, otherwise, we can't. + self.log.debug(" Change %s must be merged ahead of %s" % + (change.needs_change, change)) + return self.addChange(change.needs_change) + # The needed change can't be merged. + self.log.debug(" Change %s is needed but can not be merged" % + change.needs_change) + return False + + def _checkForChangesNeeding(self, change): + to_enqueue = [] + self.log.debug("Checking for changes needing %s:" % change) + for needs in change.needed_by_changes: + if needs.can_merge: + self.log.debug(" Change %s needs %s and is ready to merge" % + (needs, change)) + to_enqueue.append(needs) + return to_enqueue + def addChange(self, change): + # Returns true if added (or not needed), false if failed to add if self.isChangeAlreadyInQueue(change): self.log.debug("Change %s is already in queue, ignoring" % change) - return + return True + + if not change.can_merge: + self.log.debug("Change %s can not merge, ignoring" % change) + return False + + if not self._checkForChangesNeededBy(change): + return False + + to_enqueue = self._checkForChangesNeeding(change) + # TODO(jeblair): Consider re-ordering this so that the dependent + # changes aren't checked until closer when they are needed. + + if self.isChangeAlreadyInQueue(change): + self.log.debug("Change %s has been added to queue, ignoring" % + change) + return True + self.log.debug("Adding change %s" % change) change_queue = self.getQueue(change.project) if change_queue: @@ -606,6 +669,10 @@ class DependentQueueManager(BaseQueueManager): (change, change_queue)) change_queue.enqueueChange(change) self._addChange(change) + for needs in to_enqueue: + self.addChange(needs) + return True + return False def _getDependentChanges(self, change): orig_change = change diff --git a/zuul/trigger/gerrit.py b/zuul/trigger/gerrit.py index accfc0c776..9e57e56f98 100644 --- a/zuul/trigger/gerrit.py +++ b/zuul/trigger/gerrit.py @@ -15,7 +15,7 @@ import threading import logging from zuul.lib import gerrit -from zuul.model import TriggerEvent +from zuul.model import TriggerEvent, Change class GerritEventConnector(threading.Thread): @@ -95,13 +95,20 @@ class Gerrit(object): message, action) def isMerged(self, change): - self.log.debug("Checking if change %s is merged", change) + self.log.debug("Checking if change %s is merged" % change) if not change.number: self.log.debug("Change has no number; considering it merged") # Good question. It's probably ref-updated, which, ah, # means it's merged. return True + data = self.gerrit.query(change.number) + change._data = data + change.is_merged = self._isMerged(change) + return change.is_merged + + def _isMerged(self, change): + data = change._data if not data: return False status = data.get('status') @@ -110,3 +117,102 @@ class Gerrit(object): self.log.debug("Change %s status: %s" % (change, status)) if status == 'MERGED' or status == 'SUBMITTED': return True + + def _canMerge(self, change, allow_needs): + if not change.number: + self.log.debug("Change has no number; considering it merged") + # Good question. It's probably ref-updated, which, ah, + # means it's merged. + return True + data = change._data + if not data: + return False + if not 'submitRecords' in data: + return False + try: + for sr in data['submitRecords']: + if sr['status'] == 'OK': + return True + elif sr['status'] == 'NOT_READY': + for label in sr['labels']: + if label['status'] == 'OK': + continue + elif label['status'] in ['NEED', 'REJECT']: + # It may be our own rejection, so we ignore + if label['label'].lower() not in allow_needs: + return False + continue + else: + # IMPOSSIBLE + return False + else: + # CLOSED, RULE_ERROR + return False + except: + self.log.exception("Exception determining whether change" + "%s can merge:" % change) + return False + return True + + def getChange(self, number, patchset, queue_name, changes=None): + # TODO: queue_name is screwing up the data model, refactor + # the queue context so it isn't necessary. + self.log.info("Getting information for %s,%s" % (number, patchset)) + if changes is None: + changes = {} + data = self.gerrit.query(number) + project = self.sched.projects[data['project']] + change = Change(queue_name, project) + change._data = data + + change.number = number + change.patchset = patchset + change.project = project + change.branch = data['branch'] + change.url = data['url'] + max_ps = 0 + for ps in data['patchSets']: + if ps['number'] == patchset: + change.refspec = ps['ref'] + if int(ps['number']) > int(max_ps): + max_ps = ps['number'] + if max_ps == patchset: + change.is_current_patchset = True + else: + change.is_current_patchset = False + + manager = self.sched.queue_managers[queue_name] + change.can_merge = self._canMerge(change, + manager.getSubmitAllowNeeds()) + change.is_merged = self._isMerged(change) + if change.is_merged: + # This change is merged, so we don't need to look any further + # for dependencies. + return change + + key = '%s,%s' % (number, patchset) + changes[key] = change + + def cachedGetChange(num, ps): + key = '%s,%s' % (num, ps) + if key in changes: + return changes.get(key) + c = self.getChange(num, ps, queue_name, changes) + return c + + if 'dependsOn' in data: + parts = data['dependsOn'][0]['ref'].split('/') + dep_num, dep_ps = parts[3], parts[4] + dep = cachedGetChange(dep_num, dep_ps) + if not dep.is_merged: + change.needs_change = dep + + if 'neededBy' in data: + for needed in data['neededBy']: + parts = needed['ref'].split('/') + dep_num, dep_ps = parts[3], parts[4] + dep = cachedGetChange(dep_num, dep_ps) + if not dep.is_merged and dep.is_current_patchset: + change.needed_by_changes.append(dep) + + return change