Move gerrit logic from source to connection

Sources and connections are very tightly coupled in practice.  Rather
than trying to maintain them as two separate abstractions, let the
connection hold all of the information and logic about the external
resource it represents, and make sources mere local data structures
that interface a connection with a pipeline.

As seen in subsequent changes, this will allow us to simplify the
interconnections between objects.

Change-Id: I2dd88e1165267e4f987a205ba55923eaec7ea9ce
James E. Blair 2016-03-11 10:38:36 -08:00
parent 96c6bf868a
commit 765e11b657
2 changed files with 301 additions and 304 deletions
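
Before diving into the diff, here is a minimal sketch of the pattern the
commit message describes (simplified stand-ins, not the real Zuul classes):
the connection owns the change cache and all Gerrit-facing logic, and the
source becomes a thin pipeline-facing wrapper that only delegates.

    # Illustrative sketch only -- simplified stand-ins for the real
    # GerritConnection/GerritSource classes shown in the diff below.
    class Connection(object):
        """Owns the external resource: change cache, queries, merge checks."""

        def __init__(self):
            # Keyed on "number,patchset", as in GerritConnection._getChange.
            self._change_cache = {}

        def getChange(self, event):
            key = '%s,%s' % (event.change_number, event.patch_number)
            # The real implementation queries Gerrit on a cache miss; this
            # sketch only shows where the cache now lives.
            return self._change_cache.get(key)

    class Source(object):
        """Thin, pipeline-facing wrapper around a connection."""

        def __init__(self, connection):
            self.connection = connection

        def getChange(self, event):
            # No local state or Gerrit logic; just delegate.
            return self.connection.getChange(event)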

View File

@@ -13,9 +13,10 @@
# License for the specific language governing permissions and limitations
# under the License.
import threading
import select
import json
import re
import select
import threading
import time
from six.moves import queue as Queue
import paramiko
@@ -25,7 +26,8 @@ import voluptuous as v
import urllib2
from zuul.connection import BaseConnection
from zuul.model import TriggerEvent, Project
from zuul.model import TriggerEvent, Project, Change, Ref, NullChange
from zuul import exceptions
class GerritEventConnector(threading.Thread):
@@ -107,17 +109,13 @@ class GerritEventConnector(threading.Thread):
# the main thread.
# NOTE(jhesketh): Ideally we'd just remove the change from the
# cache to denote that it needs updating. However the change
# object is already used by Item's and hence BuildSet's etc. and
# object is already used by Items and hence BuildSets etc. and
# we need to update those objects by reference so that they have
# the correct/new information and also avoid hitting gerrit
# multiple times.
if self.connection.attached_to['source']:
self.connection.attached_to['source'][0]._getChange(
event.change_number, event.patch_number, refresh=True)
# We only need to do this once since the connection maintains
# the cache (which is shared between all the sources)
# NOTE(jhesketh): We may couple sources and connections again
# at which point this becomes more sensible.
self.connection._getChange(event.change_number,
event.patch_number,
refresh=True)
self.connection.sched.addEvent(event)
def run(self):
@@ -210,6 +208,10 @@ class GerritWatcher(threading.Thread):
class GerritConnection(BaseConnection):
driver_name = 'gerrit'
log = logging.getLogger("connection.gerrit")
depends_on_re = re.compile(r"^Depends-On: (I[0-9a-f]{40})\s*$",
re.MULTILINE | re.IGNORECASE)
replication_timeout = 300
replication_retry_interval = 5
def __init__(self, connection_name, connection_config):
super(GerritConnection, self).__init__(connection_name,
@@ -241,18 +243,6 @@ class GerritConnection(BaseConnection):
self.projects[name] = Project(name)
return self.projects[name]
def getCachedChange(self, key):
if key in self._change_cache:
return self._change_cache.get(key)
return None
def updateChangeCache(self, key, value):
self._change_cache[key] = value
def deleteCachedChange(self, key):
if key in self._change_cache:
del self._change_cache[key]
def maintainCache(self, relevant):
# This lets the user supply a list of change objects that are
# still in use. Anything in our cache that isn't in the supplied
@@ -264,6 +254,287 @@ class GerritConnection(BaseConnection):
for key in remove:
del self._change_cache[key]
def getChange(self, event):
if event.change_number:
refresh = False
change = self._getChange(event.change_number, event.patch_number,
refresh=refresh)
elif event.ref:
project = self.getProject(event.project_name)
change = Ref(project)
change.ref = event.ref
change.oldrev = event.oldrev
change.newrev = event.newrev
change.url = self._getGitwebUrl(project, sha=event.newrev)
else:
# TODOv3(jeblair): we need to get the project from the event
change = NullChange(project)
return change
def _getChange(self, number, patchset, refresh=False, history=None):
key = '%s,%s' % (number, patchset)
change = self._change_cache.get(key)
if change and not refresh:
return change
if not change:
change = Change(None)
change.number = number
change.patchset = patchset
key = '%s,%s' % (change.number, change.patchset)
self._change_cache[key] = change
try:
self._updateChange(change, history)
except Exception:
if key in self._change_cache:
del self._change_cache[key]
raise
return change
def _getDependsOnFromCommit(self, message):
records = []
seen = set()
for match in self.depends_on_re.findall(message):
if match in seen:
self.log.debug("Ignoring duplicate Depends-On: %s" %
(match,))
continue
seen.add(match)
query = "change:%s" % (match,)
self.log.debug("Running query %s to find needed changes" %
(query,))
records.extend(self.simpleQuery(query))
return records
def _getNeededByFromCommit(self, change_id):
records = []
seen = set()
query = 'message:%s' % change_id
self.log.debug("Running query %s to find changes needed-by" %
(query,))
results = self.simpleQuery(query)
for result in results:
for match in self.depends_on_re.findall(
result['commitMessage']):
if match != change_id:
continue
key = (result['number'], result['currentPatchSet']['number'])
if key in seen:
continue
self.log.debug("Found change %s,%s needs %s from commit" %
(key[0], key[1], change_id))
seen.add(key)
records.append(result)
return records
def _updateChange(self, change, history=None):
self.log.info("Updating information for %s,%s" %
(change.number, change.patchset))
data = self.query(change.number)
change._data = data
if change.patchset is None:
change.patchset = data['currentPatchSet']['number']
if 'project' not in data:
raise exceptions.ChangeNotFound(change.number, change.patchset)
change.project = self.getProject(data['project'])
change.branch = data['branch']
change.url = data['url']
max_ps = 0
files = []
for ps in data['patchSets']:
if ps['number'] == change.patchset:
change.refspec = ps['ref']
for f in ps.get('files', []):
files.append(f['file'])
if int(ps['number']) > int(max_ps):
max_ps = ps['number']
if max_ps == change.patchset:
change.is_current_patchset = True
else:
change.is_current_patchset = False
change.files = files
change.is_merged = self._isMerged(change)
change.approvals = data['currentPatchSet'].get('approvals', [])
change.open = data['open']
change.status = data['status']
change.owner = data['owner']
if change.is_merged:
# This change is merged, so we don't need to look any further
# for dependencies.
return change
if history is None:
history = []
else:
history = history[:]
history.append(change.number)
needs_changes = []
if 'dependsOn' in data:
parts = data['dependsOn'][0]['ref'].split('/')
dep_num, dep_ps = parts[3], parts[4]
if dep_num in history:
raise Exception("Dependency cycle detected: %s in %s" % (
dep_num, history))
self.log.debug("Getting git-dependent change %s,%s" %
(dep_num, dep_ps))
dep = self._getChange(dep_num, dep_ps, history=history)
if (not dep.is_merged) and dep not in needs_changes:
needs_changes.append(dep)
for record in self._getDependsOnFromCommit(data['commitMessage']):
dep_num = record['number']
dep_ps = record['currentPatchSet']['number']
if dep_num in history:
raise Exception("Dependency cycle detected: %s in %s" % (
dep_num, history))
self.log.debug("Getting commit-dependent change %s,%s" %
(dep_num, dep_ps))
dep = self._getChange(dep_num, dep_ps, history=history)
if (not dep.is_merged) and dep not in needs_changes:
needs_changes.append(dep)
change.needs_changes = needs_changes
needed_by_changes = []
if 'neededBy' in data:
for needed in data['neededBy']:
parts = needed['ref'].split('/')
dep_num, dep_ps = parts[3], parts[4]
dep = self._getChange(dep_num, dep_ps)
if (not dep.is_merged) and dep.is_current_patchset:
needed_by_changes.append(dep)
for record in self._getNeededByFromCommit(data['id']):
dep_num = record['number']
dep_ps = record['currentPatchSet']['number']
self.log.debug("Getting commit-needed change %s,%s" %
(dep_num, dep_ps))
# Because a commit needed-by may be a cross-repo
# dependency, cause that change to refresh so that it will
# reference the latest patchset of its Depends-On (this
# change).
dep = self._getChange(dep_num, dep_ps, refresh=True)
if (not dep.is_merged) and dep.is_current_patchset:
needed_by_changes.append(dep)
change.needed_by_changes = needed_by_changes
return change
def isMerged(self, change, head=None):
self.log.debug("Checking if change %s is merged" % change)
if not change.number:
self.log.debug("Change has no number; considering it merged")
# Good question. It's probably ref-updated, which, ah,
# means it's merged.
return True
data = self.query(change.number)
change._data = data
change.is_merged = self._isMerged(change)
if not head:
return change.is_merged
if not change.is_merged:
return False
ref = 'refs/heads/' + change.branch
self.log.debug("Waiting for %s to appear in git repo" % (change))
if self._waitForRefSha(change.project, ref, change._ref_sha):
self.log.debug("Change %s is in the git repo" %
(change))
return True
self.log.debug("Change %s did not appear in the git repo" %
(change))
return False
def _isMerged(self, change):
data = change._data
if not data:
return False
status = data.get('status')
if not status:
return False
self.log.debug("Change %s status: %s" % (change, status))
if status == 'MERGED':
return True
return False
def _waitForRefSha(self, project, ref, old_sha=''):
# Wait for the ref to show up in the repo
start = time.time()
while time.time() - start < self.replication_timeout:
sha = self.getRefSha(project.name, ref)
if old_sha != sha:
return True
time.sleep(self.replication_retry_interval)
return False
def getRefSha(self, project, ref):
refs = {}
try:
refs = self.getInfoRefs(project)
except:
self.log.exception("Exception looking for ref %s" %
ref)
sha = refs.get(ref, '')
return sha
def canMerge(self, change, allow_needs):
if not change.number:
self.log.debug("Change has no number; considering it merged")
# Good question. It's probably ref-updated, which, ah,
# means it's merged.
return True
data = change._data
if not data:
return False
if 'submitRecords' not in data:
return False
try:
for sr in data['submitRecords']:
if sr['status'] == 'OK':
return True
elif sr['status'] == 'NOT_READY':
for label in sr['labels']:
if label['status'] in ['OK', 'MAY']:
continue
elif label['status'] in ['NEED', 'REJECT']:
# It may be our own rejection, so we ignore
if label['label'].lower() not in allow_needs:
return False
continue
else:
# IMPOSSIBLE
return False
else:
# CLOSED, RULE_ERROR
return False
except:
self.log.exception("Exception determining whether change"
"%s can merge:" % change)
return False
return True
def getProjectOpenChanges(self, project):
# This is a best-effort function in case Gerrit is unable to return
# a particular change. It happens.
query = "project:%s status:open" % (project.name,)
self.log.debug("Running query %s to get project open changes" %
(query,))
data = self.simpleQuery(query)
changes = []
for record in data:
try:
changes.append(
self._getChange(record['number'],
record['currentPatchSet']['number']))
except Exception:
self.log.exception("Unable to query change %s" %
(record.get('number'),))
return changes
def addEvent(self, data):
return self.event_queue.put((time.time(), data))
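
As an aside (not part of the commit), here is a small self-contained sketch
of how the Depends-On footer matching above behaves; the regex is copied from
GerritConnection, while the commit message and the printed query are invented
purely for illustration.

    import re

    # Same pattern as GerritConnection.depends_on_re above.
    depends_on_re = re.compile(r"^Depends-On: (I[0-9a-f]{40})\s*$",
                               re.MULTILINE | re.IGNORECASE)

    # Hypothetical commit message, for illustration only.
    message = """Add retry handling to the uploader

    Depends-On: I0123456789abcdef0123456789abcdef01234567
    Change-Id: Ifedcba9876543210fedcba9876543210fedcba98
    """

    for change_id in depends_on_re.findall(message):
        # _getDependsOnFromCommit runs a "change:<id>" query for each
        # unique match; here we only show what would be queried.
        print("would query: change:%s" % change_id)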

View File

@@ -13,307 +13,33 @@
# under the License.
import logging
import re
import time
from zuul import exceptions
from zuul.model import Change, Ref, NullChange
from zuul.source import BaseSource
class GerritSource(BaseSource):
name = 'gerrit'
log = logging.getLogger("zuul.source.Gerrit")
replication_timeout = 300
replication_retry_interval = 5
depends_on_re = re.compile(r"^Depends-On: (I[0-9a-f]{40})\s*$",
re.MULTILINE | re.IGNORECASE)
def getRefSha(self, project, ref):
refs = {}
try:
refs = self.connection.getInfoRefs(project)
except:
self.log.exception("Exception looking for ref %s" %
ref)
sha = refs.get(ref, '')
return sha
def _waitForRefSha(self, project, ref, old_sha=''):
# Wait for the ref to show up in the repo
start = time.time()
while time.time() - start < self.replication_timeout:
sha = self.getRefSha(project.name, ref)
if old_sha != sha:
return True
time.sleep(self.replication_retry_interval)
return False
return self.connection.getRefSha(project, ref)
def isMerged(self, change, head=None):
self.log.debug("Checking if change %s is merged" % change)
if not change.number:
self.log.debug("Change has no number; considering it merged")
# Good question. It's probably ref-updated, which, ah,
# means it's merged.
return True
data = self.connection.query(change.number)
change._data = data
change.is_merged = self._isMerged(change)
if not head:
return change.is_merged
if not change.is_merged:
return False
ref = 'refs/heads/' + change.branch
self.log.debug("Waiting for %s to appear in git repo" % (change))
if self._waitForRefSha(change.project, ref, change._ref_sha):
self.log.debug("Change %s is in the git repo" %
(change))
return True
self.log.debug("Change %s did not appear in the git repo" %
(change))
return False
def _isMerged(self, change):
data = change._data
if not data:
return False
status = data.get('status')
if not status:
return False
self.log.debug("Change %s status: %s" % (change, status))
if status == 'MERGED':
return True
return False
return self.connection.isMerged(change, head)
def canMerge(self, change, allow_needs):
if not change.number:
self.log.debug("Change has no number; considering it merged")
# Good question. It's probably ref-updated, which, ah,
# means it's merged.
return True
data = change._data
if not data:
return False
if 'submitRecords' not in data:
return False
try:
for sr in data['submitRecords']:
if sr['status'] == 'OK':
return True
elif sr['status'] == 'NOT_READY':
for label in sr['labels']:
if label['status'] in ['OK', 'MAY']:
continue
elif label['status'] in ['NEED', 'REJECT']:
# It may be our own rejection, so we ignore
if label['label'].lower() not in allow_needs:
return False
continue
else:
# IMPOSSIBLE
return False
else:
# CLOSED, RULE_ERROR
return False
except:
self.log.exception("Exception determining whether change"
"%s can merge:" % change)
return False
return True
return self.connection.canMerge(change, allow_needs)
def postConfig(self):
pass
def getChange(self, event):
return self.connection.getChange(event)
def getProject(self, name):
return self.connection.getProject(name)
def getChange(self, event):
if event.change_number:
refresh = False
change = self._getChange(event.change_number, event.patch_number,
refresh=refresh)
elif event.ref:
project = self.getProject(event.project_name)
change = Ref(project)
change.ref = event.ref
change.oldrev = event.oldrev
change.newrev = event.newrev
change.url = self._getGitwebUrl(project, sha=event.newrev)
else:
# TODOv3(jeblair): we need to get the project from the event
change = NullChange(project)
return change
def _getChange(self, number, patchset, refresh=False, history=None):
key = '%s,%s' % (number, patchset)
change = self.connection.getCachedChange(key)
if change and not refresh:
return change
if not change:
change = Change(None)
change.number = number
change.patchset = patchset
key = '%s,%s' % (change.number, change.patchset)
self.connection.updateChangeCache(key, change)
try:
self._updateChange(change, history)
except Exception:
self.connection.deleteCachedChange(key)
raise
return change
def getProjectOpenChanges(self, project):
# This is a best-effort function in case Gerrit is unable to return
# a particular change. It happens.
query = "project:%s status:open" % (project.name,)
self.log.debug("Running query %s to get project open changes" %
(query,))
data = self.connection.simpleQuery(query)
changes = []
for record in data:
try:
changes.append(
self._getChange(record['number'],
record['currentPatchSet']['number']))
except Exception:
self.log.exception("Unable to query change %s" %
(record.get('number'),))
return changes
def _getDependsOnFromCommit(self, message):
records = []
seen = set()
for match in self.depends_on_re.findall(message):
if match in seen:
self.log.debug("Ignoring duplicate Depends-On: %s" %
(match,))
continue
seen.add(match)
query = "change:%s" % (match,)
self.log.debug("Running query %s to find needed changes" %
(query,))
records.extend(self.connection.simpleQuery(query))
return records
def _getNeededByFromCommit(self, change_id):
records = []
seen = set()
query = 'message:%s' % change_id
self.log.debug("Running query %s to find changes needed-by" %
(query,))
results = self.connection.simpleQuery(query)
for result in results:
for match in self.depends_on_re.findall(
result['commitMessage']):
if match != change_id:
continue
key = (result['number'], result['currentPatchSet']['number'])
if key in seen:
continue
self.log.debug("Found change %s,%s needs %s from commit" %
(key[0], key[1], change_id))
seen.add(key)
records.append(result)
return records
def _updateChange(self, change, history=None):
self.log.info("Updating information for %s,%s" %
(change.number, change.patchset))
data = self.connection.query(change.number)
change._data = data
if change.patchset is None:
change.patchset = data['currentPatchSet']['number']
if 'project' not in data:
raise exceptions.ChangeNotFound(change.number, change.patchset)
change.project = self.getProject(data['project'])
change.branch = data['branch']
change.url = data['url']
max_ps = 0
files = []
for ps in data['patchSets']:
if ps['number'] == change.patchset:
change.refspec = ps['ref']
for f in ps.get('files', []):
files.append(f['file'])
if int(ps['number']) > int(max_ps):
max_ps = ps['number']
if max_ps == change.patchset:
change.is_current_patchset = True
else:
change.is_current_patchset = False
change.files = files
change.is_merged = self._isMerged(change)
change.approvals = data['currentPatchSet'].get('approvals', [])
change.open = data['open']
change.status = data['status']
change.owner = data['owner']
if change.is_merged:
# This change is merged, so we don't need to look any further
# for dependencies.
return change
if history is None:
history = []
else:
history = history[:]
history.append(change.number)
needs_changes = []
if 'dependsOn' in data:
parts = data['dependsOn'][0]['ref'].split('/')
dep_num, dep_ps = parts[3], parts[4]
if dep_num in history:
raise Exception("Dependency cycle detected: %s in %s" % (
dep_num, history))
self.log.debug("Getting git-dependent change %s,%s" %
(dep_num, dep_ps))
dep = self._getChange(dep_num, dep_ps, history=history)
if (not dep.is_merged) and dep not in needs_changes:
needs_changes.append(dep)
for record in self._getDependsOnFromCommit(data['commitMessage']):
dep_num = record['number']
dep_ps = record['currentPatchSet']['number']
if dep_num in history:
raise Exception("Dependency cycle detected: %s in %s" % (
dep_num, history))
self.log.debug("Getting commit-dependent change %s,%s" %
(dep_num, dep_ps))
dep = self._getChange(dep_num, dep_ps, history=history)
if (not dep.is_merged) and dep not in needs_changes:
needs_changes.append(dep)
change.needs_changes = needs_changes
needed_by_changes = []
if 'neededBy' in data:
for needed in data['neededBy']:
parts = needed['ref'].split('/')
dep_num, dep_ps = parts[3], parts[4]
dep = self._getChange(dep_num, dep_ps)
if (not dep.is_merged) and dep.is_current_patchset:
needed_by_changes.append(dep)
for record in self._getNeededByFromCommit(data['id']):
dep_num = record['number']
dep_ps = record['currentPatchSet']['number']
self.log.debug("Getting commit-needed change %s,%s" %
(dep_num, dep_ps))
# Because a commit needed-by may be a cross-repo
# dependency, cause that change to refresh so that it will
# reference the latest patchset of its Depends-On (this
# change).
dep = self._getChange(dep_num, dep_ps, refresh=True)
if (not dep.is_merged) and dep.is_current_patchset:
needed_by_changes.append(dep)
change.needed_by_changes = needed_by_changes
return change
return self.connection.getProjectOpenChanges(project)
def getGitUrl(self, project):
return self.connection.getGitUrl(project)
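
Taken together, the deletions above leave GerritSource as little more than a
set of one-line delegations. Condensed from the diff (and assuming the
remaining imports are just logging and BaseSource), the resulting class reads
roughly like this:

    import logging

    from zuul.source import BaseSource

    class GerritSource(BaseSource):
        name = 'gerrit'
        log = logging.getLogger("zuul.source.Gerrit")

        def getRefSha(self, project, ref):
            return self.connection.getRefSha(project, ref)

        def isMerged(self, change, head=None):
            return self.connection.isMerged(change, head)

        def canMerge(self, change, allow_needs):
            return self.connection.canMerge(change, allow_needs)

        def postConfig(self):
            pass

        def getChange(self, event):
            return self.connection.getChange(event)

        def getProject(self, name):
            return self.connection.getProject(name)

        def getProjectOpenChanges(self, project):
            return self.connection.getProjectOpenChanges(project)

        def getGitUrl(self, project):
            return self.connection.getGitUrl(project)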