Annotate gerrit event logs
Gerrit events don't have a uuid so generate one when we receive it and use it to annotate gerrit logs. Change-Id: I0bd6256064eab390eb1595b8d11e1906512fc0c1
This commit is contained in:
parent
f0479f5606
commit
2e08cd812e
|
@ -78,6 +78,7 @@ import zuul.zk
|
|||
import zuul.configloader
|
||||
from zuul.exceptions import MergeFailure
|
||||
from zuul.lib.config import get_default
|
||||
from zuul.lib.logutil import get_annotated_logger
|
||||
|
||||
FIXTURE_DIR = os.path.join(os.path.dirname(__file__),
|
||||
'fixtures')
|
||||
|
@ -727,12 +728,12 @@ class FakeGerritConnection(gerritconnection.GerritConnection):
|
|||
if message:
|
||||
change.setReported()
|
||||
|
||||
def query(self, number):
|
||||
def query(self, number, event=None):
|
||||
if type(number) == int:
|
||||
return self.queryChange(number)
|
||||
return self.queryChange(number, event=event)
|
||||
raise Exception("Could not query %s %s" % (type(number, number)))
|
||||
|
||||
def queryChange(self, number):
|
||||
def queryChange(self, number, event=None):
|
||||
change = self.changes.get(int(number))
|
||||
if change:
|
||||
return change.query()
|
||||
|
@ -761,8 +762,9 @@ class FakeGerritConnection(gerritconnection.GerritConnection):
|
|||
l = [change.query() for change in self.changes.values()]
|
||||
return l
|
||||
|
||||
def simpleQuery(self, query):
|
||||
self.log.debug("simpleQuery: %s" % query)
|
||||
def simpleQuery(self, query, event=None):
|
||||
log = get_annotated_logger(self.log, event)
|
||||
log.debug("simpleQuery: %s", query)
|
||||
self.queries.append(query)
|
||||
results = []
|
||||
if query.startswith('(') and 'OR' in query:
|
||||
|
|
|
@ -29,8 +29,10 @@ import urllib.parse
|
|||
import requests
|
||||
|
||||
from typing import Dict, List
|
||||
from uuid import uuid4
|
||||
|
||||
from zuul.connection import BaseConnection
|
||||
from zuul.lib.logutil import get_annotated_logger
|
||||
from zuul.model import Ref, Tag, Branch, Project
|
||||
from zuul import exceptions
|
||||
from zuul.driver.gerrit.gerritmodel import GerritChange, GerritTriggerEvent
|
||||
|
@ -70,6 +72,12 @@ class GerritEventConnector(threading.Thread):
|
|||
now = time.time()
|
||||
time.sleep(max((ts + self.delay) - now, 0.0))
|
||||
event = GerritTriggerEvent()
|
||||
|
||||
# Gerrit events don't have an event id that could be used to globally
|
||||
# identify this event in the system so we have to generate one.
|
||||
event.zuul_event_id = str(uuid4().hex)
|
||||
log = get_annotated_logger(self.log, event)
|
||||
|
||||
event.type = data.get('type')
|
||||
# This catches when a change is merged, as it could potentially
|
||||
# have merged layout info which will need to be read in.
|
||||
|
@ -129,9 +137,9 @@ class GerritEventConnector(threading.Thread):
|
|||
if field:
|
||||
event.account = data.get(accountfield_from_type[event.type])
|
||||
else:
|
||||
self.log.warning("Received unrecognized event type '%s' "
|
||||
"from Gerrit. Can not get account information." %
|
||||
(event.type,))
|
||||
log.warning("Received unrecognized event type '%s' "
|
||||
"from Gerrit. Can not get account information." %
|
||||
(event.type,))
|
||||
|
||||
# This checks whether the event created or deleted a branch so
|
||||
# that Zuul may know to perform a reconfiguration on the
|
||||
|
@ -184,7 +192,7 @@ class GerritEventConnector(threading.Thread):
|
|||
# gerrit multiple times.
|
||||
self.connection._getChange(event.change_number,
|
||||
event.patch_number,
|
||||
refresh=True)
|
||||
refresh=True, event=event)
|
||||
|
||||
def run(self):
|
||||
while True:
|
||||
|
@ -464,7 +472,8 @@ class GerritConnection(BaseConnection):
|
|||
change = None
|
||||
return change
|
||||
|
||||
def _getChange(self, number, patchset, refresh=False, history=None):
|
||||
def _getChange(self, number, patchset, refresh=False, history=None,
|
||||
event=None):
|
||||
# Ensure number and patchset are str
|
||||
number = str(number)
|
||||
patchset = str(patchset)
|
||||
|
@ -478,7 +487,7 @@ class GerritConnection(BaseConnection):
|
|||
self._change_cache.setdefault(change.number, {})
|
||||
self._change_cache[change.number][change.patchset] = change
|
||||
try:
|
||||
self._updateChange(change, history)
|
||||
self._updateChange(change, event, history)
|
||||
except Exception:
|
||||
if self._change_cache.get(change.number, {}).get(change.patchset):
|
||||
del self._change_cache[change.number][change.patchset]
|
||||
|
@ -487,30 +496,29 @@ class GerritConnection(BaseConnection):
|
|||
raise
|
||||
return change
|
||||
|
||||
def _getDependsOnFromCommit(self, message, change):
|
||||
def _getDependsOnFromCommit(self, message, change, event):
|
||||
log = get_annotated_logger(self.log, event)
|
||||
records = []
|
||||
seen = set()
|
||||
for match in self.depends_on_re.findall(message):
|
||||
if match in seen:
|
||||
self.log.debug("Ignoring duplicate Depends-On: %s" %
|
||||
(match,))
|
||||
log.debug("Ignoring duplicate Depends-On: %s", match)
|
||||
continue
|
||||
seen.add(match)
|
||||
query = "change:%s" % (match,)
|
||||
self.log.debug("Updating %s: Running query %s "
|
||||
"to find needed changes" %
|
||||
(change, query,))
|
||||
records.extend(self.simpleQuery(query))
|
||||
log.debug("Updating %s: Running query %s to find needed changes",
|
||||
change, query)
|
||||
records.extend(self.simpleQuery(query, event=event))
|
||||
return records
|
||||
|
||||
def _getNeededByFromCommit(self, change_id, change):
|
||||
def _getNeededByFromCommit(self, change_id, change, event):
|
||||
log = get_annotated_logger(self.log, event)
|
||||
records = []
|
||||
seen = set()
|
||||
query = 'message:{%s}' % change_id
|
||||
self.log.debug("Updating %s: Running query %s "
|
||||
"to find changes needed-by" %
|
||||
(change, query,))
|
||||
results = self.simpleQuery(query)
|
||||
log.debug("Updating %s: Running query %s to find changes needed-by",
|
||||
change, query)
|
||||
results = self.simpleQuery(query, event=event)
|
||||
for result in results:
|
||||
for match in self.depends_on_re.findall(
|
||||
result['commitMessage']):
|
||||
|
@ -520,14 +528,15 @@ class GerritConnection(BaseConnection):
|
|||
str(result['currentPatchSet']['number']))
|
||||
if key in seen:
|
||||
continue
|
||||
self.log.debug("Updating %s: Found change %s,%s "
|
||||
"needs %s from commit" %
|
||||
(change, key[0], key[1], change_id))
|
||||
log.debug("Updating %s: Found change %s,%s "
|
||||
"needs %s from commit",
|
||||
change, key[0], key[1], change_id)
|
||||
seen.add(key)
|
||||
records.append(result)
|
||||
return records
|
||||
|
||||
def _updateChange(self, change, history=None):
|
||||
def _updateChange(self, change, event, history):
|
||||
log = get_annotated_logger(self.log, event)
|
||||
|
||||
# In case this change is already in the history we have a
|
||||
# cyclic dependency and don't need to update ourselves again
|
||||
|
@ -547,11 +556,11 @@ class GerritConnection(BaseConnection):
|
|||
# no patchsets before adding the entry to the change cache.
|
||||
if (history and change.number and change.patchset and
|
||||
(change.number, change.patchset) in history):
|
||||
self.log.debug("Change %s is in history" % (change,))
|
||||
log.debug("Change %s is in history", change)
|
||||
return change
|
||||
|
||||
self.log.info("Updating %s" % (change,))
|
||||
data = self.queryChange(change.number)
|
||||
log.info("Updating %s", change)
|
||||
data = self.queryChange(change.number, event=event)
|
||||
change._data = data
|
||||
|
||||
if change.patchset is None:
|
||||
|
@ -597,7 +606,7 @@ class GerritConnection(BaseConnection):
|
|||
if change.is_merged:
|
||||
# This change is merged, so we don't need to look any further
|
||||
# for dependencies.
|
||||
self.log.debug("Updating %s: change is merged" % (change,))
|
||||
log.debug("Updating %s: change is merged", change)
|
||||
return change
|
||||
|
||||
if history is None:
|
||||
|
@ -611,9 +620,10 @@ class GerritConnection(BaseConnection):
|
|||
if 'dependsOn' in data:
|
||||
parts = data['dependsOn'][0]['ref'].split('/')
|
||||
dep_num, dep_ps = parts[3], parts[4]
|
||||
self.log.debug("Updating %s: Getting git-dependent change %s,%s" %
|
||||
(change, dep_num, dep_ps))
|
||||
dep = self._getChange(dep_num, dep_ps, history=history)
|
||||
log.debug("Updating %s: Getting git-dependent change %s,%s",
|
||||
change, dep_num, dep_ps)
|
||||
dep = self._getChange(dep_num, dep_ps, history=history,
|
||||
event=event)
|
||||
# This is a git commit dependency. So we only ignore it if it is
|
||||
# already merged. So even if it is "ABANDONED", we should not
|
||||
# ignore it.
|
||||
|
@ -624,13 +634,13 @@ class GerritConnection(BaseConnection):
|
|||
|
||||
compat_needs_changes = []
|
||||
for record in self._getDependsOnFromCommit(data['commitMessage'],
|
||||
change):
|
||||
change, event):
|
||||
dep_num = str(record['number'])
|
||||
dep_ps = str(record['currentPatchSet']['number'])
|
||||
self.log.debug("Updating %s: Getting commit-dependent "
|
||||
"change %s,%s" %
|
||||
(change, dep_num, dep_ps))
|
||||
dep = self._getChange(dep_num, dep_ps, history=history)
|
||||
log.debug("Updating %s: Getting commit-dependent "
|
||||
"change %s,%s", change, dep_num, dep_ps)
|
||||
dep = self._getChange(dep_num, dep_ps, history=history,
|
||||
event=event)
|
||||
if dep.open and dep not in needs_changes:
|
||||
compat_needs_changes.append(dep)
|
||||
needs_changes.add(dep)
|
||||
|
@ -642,9 +652,10 @@ class GerritConnection(BaseConnection):
|
|||
for needed in data['neededBy']:
|
||||
parts = needed['ref'].split('/')
|
||||
dep_num, dep_ps = parts[3], parts[4]
|
||||
self.log.debug("Updating %s: Getting git-needed change %s,%s" %
|
||||
(change, dep_num, dep_ps))
|
||||
dep = self._getChange(dep_num, dep_ps, history=history)
|
||||
log.debug("Updating %s: Getting git-needed change %s,%s",
|
||||
change, dep_num, dep_ps)
|
||||
dep = self._getChange(dep_num, dep_ps, history=history,
|
||||
event=event)
|
||||
if (dep.open and dep.is_current_patchset and
|
||||
dep not in needed_by_changes):
|
||||
git_needed_by_changes.append(dep)
|
||||
|
@ -652,11 +663,11 @@ class GerritConnection(BaseConnection):
|
|||
change.git_needed_by_changes = git_needed_by_changes
|
||||
|
||||
compat_needed_by_changes = []
|
||||
for record in self._getNeededByFromCommit(data['id'], change):
|
||||
for record in self._getNeededByFromCommit(data['id'], change, event):
|
||||
dep_num = str(record['number'])
|
||||
dep_ps = str(record['currentPatchSet']['number'])
|
||||
self.log.debug("Updating %s: Getting commit-needed change %s,%s" %
|
||||
(change, dep_num, dep_ps))
|
||||
log.debug("Updating %s: Getting commit-needed change %s,%s",
|
||||
change, dep_num, dep_ps)
|
||||
# Because a commit needed-by may be a cross-repo
|
||||
# dependency, cause that change to refresh so that it will
|
||||
# reference the latest patchset of its Depends-On (this
|
||||
|
@ -664,7 +675,7 @@ class GerritConnection(BaseConnection):
|
|||
# refreshed this change so refresh is not needed in this case.
|
||||
refresh = (dep_num, dep_ps) not in history
|
||||
dep = self._getChange(
|
||||
dep_num, dep_ps, refresh=refresh, history=history)
|
||||
dep_num, dep_ps, refresh=refresh, history=history, event=event)
|
||||
if (dep.open and dep.is_current_patchset
|
||||
and dep not in needed_by_changes):
|
||||
compat_needed_by_changes.append(dep)
|
||||
|
@ -886,7 +897,7 @@ class GerritConnection(BaseConnection):
|
|||
"Error submitting data to gerrit, attempt %s", x)
|
||||
time.sleep(x * 10)
|
||||
|
||||
def query(self, query):
|
||||
def query(self, query, event=None):
|
||||
args = '--all-approvals --comments --commit-message'
|
||||
args += ' --current-patch-set --dependencies --files'
|
||||
args += ' --patch-sets --submit-records'
|
||||
|
@ -901,15 +912,16 @@ class GerritConnection(BaseConnection):
|
|||
data = json.loads(lines[0])
|
||||
if not data:
|
||||
return False
|
||||
self.iolog.debug("Received data from Gerrit query: \n%s" %
|
||||
(pprint.pformat(data)))
|
||||
iolog = get_annotated_logger(self.iolog, event)
|
||||
iolog.debug("Received data from Gerrit query: \n%s",
|
||||
pprint.pformat(data))
|
||||
return data
|
||||
|
||||
def queryChange(self, number):
|
||||
return self.query('change:%s' % number)
|
||||
def queryChange(self, number, event=None):
|
||||
return self.query('change:%s' % number, event=event)
|
||||
|
||||
def simpleQuery(self, query):
|
||||
def _query_chunk(query):
|
||||
def simpleQuery(self, query, event=None):
|
||||
def _query_chunk(query, event):
|
||||
args = '--commit-message --current-patch-set'
|
||||
|
||||
cmd = 'gerrit query --format json %s %s' % (
|
||||
|
@ -935,14 +947,15 @@ class GerritConnection(BaseConnection):
|
|||
|
||||
if not data:
|
||||
return False, more_changes
|
||||
self.iolog.debug("Received data from Gerrit query: \n%s" %
|
||||
(pprint.pformat(data)))
|
||||
iolog = get_annotated_logger(self.iolog, event)
|
||||
iolog.debug("Received data from Gerrit query: \n%s",
|
||||
pprint.pformat(data))
|
||||
return data, more_changes
|
||||
|
||||
# gerrit returns 500 results by default, so implement paging
|
||||
# for large projects like nova
|
||||
alldata = []
|
||||
chunk, more_changes = _query_chunk(query)
|
||||
chunk, more_changes = _query_chunk(query, event)
|
||||
while(chunk):
|
||||
alldata.extend(chunk)
|
||||
if more_changes is None:
|
||||
|
@ -955,7 +968,8 @@ class GerritConnection(BaseConnection):
|
|||
# no more changes
|
||||
break
|
||||
|
||||
chunk, more_changes = _query_chunk("%s %s" % (query, resume))
|
||||
chunk, more_changes = _query_chunk(
|
||||
"%s %s" % (query, resume), event)
|
||||
return alldata
|
||||
|
||||
def _uploadPack(self, project: Project) -> str:
|
||||
|
|
Loading…
Reference in New Issue