Annotate logs around reporting

It's useful to annotate the logs around reporting with the event id
that caused the action.

Change-Id: I282c28fb0156070696f3d231a2a28f8f62deffca
This commit is contained in:
Tobias Henkel
2019-05-26 19:37:50 +02:00
parent a703f42c24
commit 1078d767ac
11 changed files with 138 additions and 106 deletions

View File

@@ -688,7 +688,8 @@ class FakeGerritConnection(gerritconnection.GerritConnection):
}
return event
def review(self, change, message, action, file_comments):
def review(self, change, message, action={}, file_comments={},
zuul_event_id=None):
if self.web_server:
return super(FakeGerritConnection, self).review(
change, message, action, file_comments)
@@ -1263,13 +1264,14 @@ class FakeGithubConnection(githubconnection.GithubConnection):
def real_getGitUrl(self, project):
return super(FakeGithubConnection, self).getGitUrl(project)
def commentPull(self, project, pr_number, message):
def commentPull(self, project, pr_number, message, zuul_event_id=None):
# record that this got reported
self.reports.append((project, pr_number, 'comment'))
pull_request = self.pull_requests[int(pr_number)]
pull_request.addComment(message)
def mergePull(self, project, pr_number, commit_message='', sha=None):
def mergePull(self, project, pr_number, commit_message='', sha=None,
zuul_event_id=None):
# record that this got reported
self.reports.append((project, pr_number, 'merge'))
pull_request = self.pull_requests[int(pr_number)]
@@ -1282,20 +1284,20 @@ class FakeGithubConnection(githubconnection.GithubConnection):
pull_request.setMerged(commit_message)
def setCommitStatus(self, project, sha, state, url='', description='',
context='default', user='zuul'):
context='default', user='zuul', zuul_event_id=None):
# record that this got reported and call original method
self.reports.append((project, sha, 'status', (user, context, state)))
super(FakeGithubConnection, self).setCommitStatus(
project, sha, state,
url=url, description=description, context=context)
def labelPull(self, project, pr_number, label):
def labelPull(self, project, pr_number, label, zuul_event_id=None):
# record that this got reported
self.reports.append((project, pr_number, 'label', label))
pull_request = self.pull_requests[int(pr_number)]
pull_request.addLabel(label)
def unlabelPull(self, project, pr_number, label):
def unlabelPull(self, project, pr_number, label, zuul_event_id=None):
# record that this got reported
self.reports.append((project, pr_number, 'unlabel', label))
pull_request = self.pull_requests[pr_number]
@@ -2705,7 +2707,7 @@ class ZuulTestCase(BaseTestCase):
# Set up mqtt related fakes
self.mqtt_messages = []
def fakeMQTTPublish(_, topic, msg, qos):
def fakeMQTTPublish(_, topic, msg, qos, zuul_event_id):
log = logging.getLogger('zuul.FakeMQTTPubish')
log.info('Publishing message via mqtt')
self.mqtt_messages.append({'topic': topic, 'msg': msg, 'qos': qos})

View File

@@ -835,15 +835,16 @@ class GerritConnection(BaseConnection):
self.event_queue.task_done()
def review(self, change, message, action={},
file_comments={}):
file_comments={}, zuul_event_id=None):
if self.session:
meth = self.review_http
else:
meth = self.review_ssh
return meth(change, message, action, file_comments)
return meth(change, message, action=action,
file_comments=file_comments, zuul_event_id=zuul_event_id)
def review_ssh(self, change, message, action={},
file_comments={}):
file_comments={}, zuul_event_id=None):
project = change.project.name
cmd = 'gerrit review --project %s' % project
if message:
@@ -855,11 +856,12 @@ class GerritConnection(BaseConnection):
cmd += ' --label %s=%s' % (key, val)
changeid = '%s,%s' % (change.number, change.patchset)
cmd += ' %s' % changeid
out, err = self._ssh(cmd)
out, err = self._ssh(cmd, zuul_event_id=zuul_event_id)
return err
def review_http(self, change, message, action={},
file_comments={}):
file_comments={}, zuul_event_id=None):
log = get_annotated_logger(self.log, zuul_event_id)
data = dict(message=message,
strict_labels=False)
submit = False
@@ -886,15 +888,13 @@ class GerritConnection(BaseConnection):
data)
break
except Exception:
self.log.exception(
"Error submitting data to gerrit, attempt %s", x)
log.exception("Error submitting data to gerrit, attempt %s", x)
time.sleep(x * 10)
if change.is_current_patchset and submit:
try:
self.post('changes/%s/submit' % (changeid,), {})
except Exception:
self.log.exception(
"Error submitting data to gerrit, attempt %s", x)
log.exception("Error submitting data to gerrit, attempt %s", x)
time.sleep(x * 10)
def query(self, query, event=None):
@@ -998,12 +998,13 @@ class GerritConnection(BaseConnection):
self.client = None
raise
def _ssh(self, command, stdin_data=None):
def _ssh(self, command, stdin_data=None, zuul_event_id=None):
log = get_annotated_logger(self.log, zuul_event_id)
if not self.client:
self._open()
try:
self.log.debug("SSH command:\n%s" % command)
log.debug("SSH command:\n%s", command)
stdin, stdout, stderr = self.client.exec_command(command)
except Exception:
self._open()
@@ -1016,14 +1017,14 @@ class GerritConnection(BaseConnection):
self.iolog.debug("SSH received stdout:\n%s" % out)
ret = stdout.channel.recv_exit_status()
self.log.debug("SSH exit status: %s" % ret)
log.debug("SSH exit status: %s", ret)
err = stderr.read().decode('utf-8')
if err.strip():
self.log.debug("SSH received stderr:\n%s" % err)
log.debug("SSH received stderr:\n%s", err)
if ret:
self.log.debug("SSH received stdout:\n%s" % out)
log.debug("SSH received stdout:\n%s", out)
raise Exception("Gerrit error executing %s" % command)
return (out, err)

View File

@@ -16,6 +16,7 @@ import logging
import voluptuous as v
from zuul.driver.gerrit.gerritsource import GerritSource
from zuul.lib.logutil import get_annotated_logger
from zuul.reporter import BaseReporter
@@ -39,6 +40,7 @@ class GerritReporter(BaseReporter):
def report(self, item):
"""Send a message to gerrit."""
log = get_annotated_logger(self.log, item.event)
# If the source is no GerritSource we cannot report anything here.
if not isinstance(item.change.project.source, GerritSource):
@@ -54,14 +56,13 @@ class GerritReporter(BaseReporter):
self.filterComments(item, comments)
message = self._formatItemReport(item)
self.log.debug("Report change %s, params %s,"
" message: %s, comments: %s" %
(item.change, self.config, message, comments))
log.debug("Report change %s, params %s, message: %s, comments: %s",
item.change, self.config, message, comments)
item.change._ref_sha = item.change.project.source.getRefSha(
item.change.project, 'refs/heads/' + item.change.branch)
return self.connection.review(item.change, message, self.config,
comments)
comments, zuul_event_id=item.event)
def getSubmitAllowNeeds(self):
"""Get a list of code review labels that are allowed to be

View File

@@ -1409,17 +1409,20 @@ class GithubConnection(BaseConnection):
# get permissions from the data
return perms.json().get('permission', 'none')
def commentPull(self, project, pr_number, message):
github = self.getGithubClient(project)
def commentPull(self, project, pr_number, message, zuul_event_id=None):
log = get_annotated_logger(self.log, zuul_event_id)
github = self.getGithubClient(project, zuul_event_id=zuul_event_id)
owner, proj = project.split('/')
repository = github.repository(owner, proj)
pull_request = repository.issue(pr_number)
pull_request.create_comment(message)
self.log.debug("Commented on PR %s/%s#%s", owner, proj, pr_number)
log.debug("Commented on PR %s/%s#%s", owner, proj, pr_number)
self.log_rate_limit(self.log, github)
def mergePull(self, project, pr_number, commit_message='', sha=None):
github = self.getGithubClient(project)
def mergePull(self, project, pr_number, commit_message='', sha=None,
zuul_event_id=None):
log = get_annotated_logger(self.log, zuul_event_id)
github = self.getGithubClient(project, zuul_event_id=zuul_event_id)
owner, proj = project.split('/')
pull_request = github.pull_request(owner, proj, pr_number)
try:
@@ -1428,7 +1431,7 @@ class GithubConnection(BaseConnection):
raise MergeFailure('Merge was not successful due to mergeability'
' conflict, original error is %s' % e)
self.log.debug("Merged PR %s/%s#%s", owner, proj, pr_number)
log.debug("Merged PR %s/%s#%s", owner, proj, pr_number)
self.log_rate_limit(self.log, github)
if not result:
raise Exception('Pull request was not merged')
@@ -1461,37 +1464,41 @@ class GithubConnection(BaseConnection):
return statuses
def setCommitStatus(self, project, sha, state, url='', description='',
context=''):
github = self.getGithubClient(project)
context='', zuul_event_id=None):
log = get_annotated_logger(self.log, zuul_event_id)
github = self.getGithubClient(project, zuul_event_id=zuul_event_id)
owner, proj = project.split('/')
repository = github.repository(owner, proj)
repository.create_status(sha, state, url, description, context)
self.log.debug("Set commit status to %s for sha %s on %s",
state, sha, project)
log.debug("Set commit status to %s for sha %s on %s",
state, sha, project)
self.log_rate_limit(self.log, github)
def reviewPull(self, project, pr_number, sha, review, body):
github = self.getGithubClient(project)
def reviewPull(self, project, pr_number, sha, review, body,
zuul_event_id=None):
github = self.getGithubClient(project, zuul_event_id=zuul_event_id)
owner, proj = project.split('/')
pull_request = github.pull_request(owner, proj, pr_number)
event = review.replace('-', '_')
event = event.upper()
pull_request.create_review(body=body, commit_id=sha, event=event)
def labelPull(self, project, pr_number, label):
github = self.getGithubClient(project)
def labelPull(self, project, pr_number, label, zuul_event_id=None):
log = get_annotated_logger(self.log, zuul_event_id)
github = self.getGithubClient(project, zuul_event_id=zuul_event_id)
owner, proj = project.split('/')
pull_request = github.issue(owner, proj, pr_number)
pull_request.add_labels(label)
self.log.debug("Added label %s to %s#%s", label, proj, pr_number)
log.debug("Added label %s to %s#%s", label, proj, pr_number)
self.log_rate_limit(self.log, github)
def unlabelPull(self, project, pr_number, label):
github = self.getGithubClient(project)
def unlabelPull(self, project, pr_number, label, zuul_event_id=None):
log = get_annotated_logger(self.log, zuul_event_id)
github = self.getGithubClient(project, zuul_event_id=zuul_event_id)
owner, proj = project.split('/')
pull_request = github.issue(owner, proj, pr_number)
pull_request.remove_label(label)
self.log.debug("Removed label %s from %s#%s", label, proj, pr_number)
log.debug("Removed label %s from %s#%s", label, proj, pr_number)
self.log_rate_limit(self.log, github)
def getPushedFileNames(self, event):

View File

@@ -16,6 +16,7 @@ import logging
import voluptuous as v
import time
from zuul.lib.logutil import get_annotated_logger
from zuul.reporter import BaseReporter
from zuul.exceptions import MergeFailure
from zuul.driver.util import scalar_or_list
@@ -45,7 +46,6 @@ class GithubReporter(BaseReporter):
def report(self, item):
"""Report on an event."""
# If the source is not GithubSource we cannot report anything here.
if not isinstance(item.change.project.source, GithubSource):
return
@@ -89,15 +89,18 @@ class GithubReporter(BaseReporter):
return ret
def addPullComment(self, item, comment=None):
log = get_annotated_logger(self.log, item.event)
message = comment or self._formatItemReport(item)
project = item.change.project.name
pr_number = item.change.number
self.log.debug(
'Reporting change %s, params %s, message: %s' %
(item.change, self.config, message))
self.connection.commentPull(project, pr_number, message)
log.debug('Reporting change %s, params %s, message: %s',
item.change, self.config, message)
self.connection.commentPull(project, pr_number, message,
zuul_event_id=item.event)
def setCommitStatus(self, item):
log = get_annotated_logger(self.log, item.event)
project = item.change.project.name
if hasattr(item.change, 'patchset'):
sha = item.change.patchset
@@ -122,66 +125,72 @@ class GithubReporter(BaseReporter):
# characters seems to trip the limit.
description = 'status: %s' % self._commit_status
self.log.debug(
log.debug(
'Reporting change %s, params %s, '
'context: %s, state: %s, description: %s, url: %s' %
(item.change, self.config,
self.context, state, description, url))
'context: %s, state: %s, description: %s, url: %s',
item.change, self.config, self.context, state, description, url)
self.connection.setCommitStatus(
project, sha, state, url, description, self.context)
project, sha, state, url, description, self.context,
zuul_event_id=item.event)
def mergePull(self, item):
log = get_annotated_logger(self.log, item.event)
project = item.change.project.name
pr_number = item.change.number
sha = item.change.patchset
self.log.debug('Reporting change %s, params %s, merging via API' %
(item.change, self.config))
log.debug('Reporting change %s, params %s, merging via API',
item.change, self.config)
message = self._formatMergeMessage(item.change)
for i in [1, 2]:
try:
self.connection.mergePull(project, pr_number, message, sha)
self.connection.mergePull(project, pr_number, message, sha,
zuul_event_id=item.event)
item.change.is_merged = True
return
except MergeFailure:
self.log.exception(
'Merge attempt of change %s %s/2 failed.' %
(item.change, i), exc_info=True)
log.exception('Merge attempt of change %s %s/2 failed.',
item.change, i, exc_info=True)
if i == 1:
time.sleep(2)
self.log.warning(
'Merge of change %s failed after 2 attempts, giving up' %
item.change)
log.warning('Merge of change %s failed after 2 attempts, giving up',
item.change)
def addReview(self, item):
log = get_annotated_logger(self.log, item.event)
project = item.change.project.name
pr_number = item.change.number
sha = item.change.patchset
self.log.debug('Reporting change %s, params %s, review:\n%s' %
(item.change, self.config, self._review))
log.debug('Reporting change %s, params %s, review:\n%s',
item.change, self.config, self._review)
self.connection.reviewPull(
project,
pr_number,
sha,
self._review,
self._review_body)
self._review_body,
zuul_event_id=item.event)
for label in self._unlabels:
self.connection.unlabelPull(project, pr_number, label)
self.connection.unlabelPull(project, pr_number, label,
zuul_event_id=item.event)
def setLabels(self, item):
log = get_annotated_logger(self.log, item.event)
project = item.change.project.name
pr_number = item.change.number
if self._labels:
self.log.debug('Reporting change %s, params %s, labels:\n%s' %
(item.change, self.config, self._labels))
log.debug('Reporting change %s, params %s, labels:\n%s',
item.change, self.config, self._labels)
for label in self._labels:
self.connection.labelPull(project, pr_number, label)
self.connection.labelPull(project, pr_number, label,
zuul_event_id=item.event)
if self._unlabels:
self.log.debug('Reporting change %s, params %s, unlabels:\n%s' %
(item.change, self.config, self._unlabels))
log.debug('Reporting change %s, params %s, unlabels:\n%s',
item.change, self.config, self._unlabels)
for label in self._unlabels:
self.connection.unlabelPull(project, pr_number, label)
self.connection.unlabelPull(project, pr_number, label,
zuul_event_id=item.event)
def _formatMergeMessage(self, change):
message = ''

View File

@@ -19,6 +19,7 @@ import paho.mqtt.client as mqtt
from zuul.connection import BaseConnection
from zuul.exceptions import ConfigurationError
from zuul.lib.logutil import get_annotated_logger
class MQTTConnection(BaseConnection):
@@ -80,12 +81,13 @@ class MQTTConnection(BaseConnection):
self.client.disconnect()
self.connected = False
def publish(self, topic, message, qos):
def publish(self, topic, message, qos, zuul_event_id):
log = get_annotated_logger(self.log, zuul_event_id)
if not self.connected:
self.log.warn("MQTT reporter (%s) is disabled" % self)
log.warning("MQTT reporter (%s) is disabled", self)
return
try:
self.client.publish(topic, payload=json.dumps(message), qos=qos)
except Exception:
self.log.exception(
log.exception(
"Could not publish message to topic '%s' via mqtt", topic)

View File

@@ -16,6 +16,7 @@ import logging
import time
import voluptuous as v
from zuul.lib.logutil import get_annotated_logger
from zuul.reporter import BaseReporter
@@ -26,8 +27,8 @@ class MQTTReporter(BaseReporter):
log = logging.getLogger("zuul.MQTTReporter")
def report(self, item):
self.log.debug("Report change %s, params %s" %
(item.change, self.config))
log = get_annotated_logger(self.log, item.event)
log.debug("Report change %s, params %s", item.change, self.config)
message = {
'timestamp': time.time(),
'action': self._action,
@@ -77,11 +78,11 @@ class MQTTReporter(BaseReporter):
patchset=getattr(item.change, 'patchset', None),
ref=getattr(item.change, 'ref', None))
except Exception:
self.log.exception("Error while formatting MQTT topic %s:"
% self.config['topic'])
log.exception("Error while formatting MQTT topic %s:",
self.config['topic'])
if topic is not None:
self.connection.publish(
topic, message, qos=self.config.get('qos', 0))
topic, message, self.config.get('qos', 0), item.event)
def topicValue(value):

View File

@@ -18,6 +18,7 @@ import smtplib
from email.mime.text import MIMEText
from zuul.connection import BaseConnection
from zuul.lib.logutil import get_annotated_logger
class SMTPConnection(BaseConnection):
@@ -43,7 +44,8 @@ class SMTPConnection(BaseConnection):
else:
self.smtp_starttls = True
def sendMail(self, subject, message, from_email=None, to_email=None):
def sendMail(self, subject, message, from_email=None, to_email=None,
zuul_event_id=None):
# Create a text/plain email message
from_email = from_email \
if from_email is not None else self.smtp_default_from
@@ -64,4 +66,5 @@ class SMTPConnection(BaseConnection):
s.sendmail(from_email, to_email.split(','), msg.as_string())
s.quit()
except Exception as e:
self.log.warning("Error sending mail via SMTP: %s", e)
log = get_annotated_logger(self.log, zuul_event_id)
log.warning("Error sending mail via SMTP: %s", e)

View File

@@ -15,6 +15,7 @@
import logging
import voluptuous as v
from zuul.lib.logutil import get_annotated_logger
from zuul.reporter import BaseReporter
@@ -26,10 +27,11 @@ class SMTPReporter(BaseReporter):
def report(self, item):
"""Send the compiled report message via smtp."""
log = get_annotated_logger(self.log, item.event)
message = self._formatItemReport(item)
self.log.debug("Report change %s, params %s, message: %s" %
(item.change, self.config, message))
log.debug("Report change %s, params %s, message: %s",
item.change, self.config, message)
from_email = self.config['from'] \
if 'from' in self.config else None
@@ -42,7 +44,8 @@ class SMTPReporter(BaseReporter):
else:
subject = "Report for change %s" % item.change
self.connection.sendMail(subject, message, from_email, to_email)
self.connection.sendMail(subject, message, from_email, to_email,
zuul_event_id=item.event)
def getSchema():

View File

@@ -18,6 +18,7 @@ import logging
import time
import voluptuous as v
from zuul.lib.logutil import get_annotated_logger
from zuul.reporter import BaseReporter
from zuul.lib.artifacts import get_artifacts_from_result_data
@@ -30,9 +31,10 @@ class SQLReporter(BaseReporter):
def report(self, item):
"""Create an entry into a database."""
log = get_annotated_logger(self.log, item.event)
if not self.connection.tables_established:
self.log.warn("SQL reporter (%s) is disabled " % self)
log.warning("SQL reporter (%s) is disabled ", self)
return
with self.connection.getSession() as db:

View File

@@ -165,6 +165,7 @@ class PipelineManager(object):
Takes the action_reporters, item, message and extra options and
sends them to the pluggable reporters.
"""
log = get_annotated_logger(self.log, item.event)
report_errors = []
if len(action_reporters) > 0:
for reporter in action_reporters:
@@ -174,7 +175,7 @@ class PipelineManager(object):
report_errors.append(ret)
except Exception as e:
item.setReportedResult('ERROR')
self.log.exception("Exception while reporting")
log.exception("Exception while reporting")
report_errors.append(str(e))
return report_errors
@@ -953,6 +954,7 @@ class PipelineManager(object):
request, request.job, build_set.item, request.nodeset)
def reportItem(self, item):
log = get_annotated_logger(self.log, item.event)
if not item.reported:
# _reportItem() returns True if it failed to report.
item.reported = not self._reportItem(item)
@@ -962,28 +964,29 @@ class PipelineManager(object):
source = item.change.project.source
if merged:
merged = source.isMerged(item.change, item.change.branch)
self.log.info("Reported change %s status: all-succeeded: %s, "
"merged: %s" % (item.change, succeeded, merged))
log.info("Reported change %s status: all-succeeded: %s, "
"merged: %s", item.change, succeeded, merged)
change_queue = item.queue
if not (succeeded and merged):
self.log.debug("Reported change %s failed tests or failed "
"to merge" % (item.change))
log.debug("Reported change %s failed tests or failed to merge",
item.change)
change_queue.decreaseWindowSize()
self.log.debug("%s window size decreased to %s" %
(change_queue, change_queue.window))
log.debug("%s window size decreased to %s",
change_queue, change_queue.window)
raise exceptions.MergeFailure(
"Change %s failed to merge" % item.change)
else:
change_queue.increaseWindowSize()
self.log.debug("%s window size increased to %s" %
(change_queue, change_queue.window))
log.debug("%s window size increased to %s",
change_queue, change_queue.window)
zuul_driver = self.sched.connections.drivers['zuul']
tenant = self.pipeline.tenant
zuul_driver.onChangeMerged(tenant, item.change, source)
def _reportItem(self, item):
self.log.debug("Reporting change %s" % item.change)
log = get_annotated_logger(self.log, item.event)
log.debug("Reporting change %s", item.change)
ret = True # Means error as returned by trigger.report
# In the case of failure, we may not hove completed an initial
@@ -999,14 +1002,14 @@ class PipelineManager(object):
try:
ppc = layout.getProjectPipelineConfig(item)
except Exception:
self.log.exception("Invalid config for change %s" % item.change)
log.exception("Invalid config for change %s", item.change)
if not ppc:
self.log.debug("Project %s not in pipeline %s for change %s" % (
item.change.project, self.pipeline, item.change))
log.debug("Project %s not in pipeline %s for change %s",
item.change.project, self.pipeline, item.change)
project_in_pipeline = False
actions = []
elif item.getConfigErrors():
self.log.debug("Invalid config for change %s" % item.change)
log.debug("Invalid config for change %s", item.change)
# TODOv3(jeblair): consider a new reporter action for this
actions = self.pipeline.merge_failure_actions
item.setReportedResult('CONFIG_ERROR')
@@ -1018,10 +1021,10 @@ class PipelineManager(object):
item.setReportedResult('FAILURE')
elif not item.getJobs():
# We don't send empty reports with +1
self.log.debug("No jobs for change %s" % (item.change,))
log.debug("No jobs for change %s", item.change)
actions = []
elif item.didAllJobsSucceed():
self.log.debug("success %s" % (self.pipeline.success_actions))
log.debug("success %s", self.pipeline.success_actions)
actions = self.pipeline.success_actions
item.setReportedResult('SUCCESS')
self.pipeline._consecutive_failures = 0
@@ -1038,12 +1041,10 @@ class PipelineManager(object):
self.pipeline._consecutive_failures >= self.pipeline.disable_at):
self.pipeline._disabled = True
if actions:
self.log.info("Reporting item %s, actions: %s" %
(item, actions))
log.info("Reporting item %s, actions: %s", item, actions)
ret = self.sendReport(actions, item)
if ret:
self.log.error("Reporting item %s received: %s" %
(item, ret))
log.error("Reporting item %s received: %s", item, ret)
return ret
def reportStats(self, item):