Improve event logging in githubconnection
This moves event processing to its own class, so that it's easier to bundle all of the data related to an event along with an event-specific logger. This logs the delivery ID for every line when we're preparing the event. It also logs the start time and queue length as well as the end time, even on error. Change-Id: I941a74ecbdb418cf94537ca9f8f1917a5e38dd33
This commit is contained in:
parent
6d6c69f93e
commit
d540ebfe32
|
@ -171,30 +171,37 @@ class GithubGearmanWorker(object):
|
||||||
self.gearman.shutdown()
|
self.gearman.shutdown()
|
||||||
|
|
||||||
|
|
||||||
class GithubEventConnector(threading.Thread):
|
class GithubEventLogAdapter(logging.LoggerAdapter):
|
||||||
"""Move events from GitHub into the scheduler"""
|
def process(self, msg, kwargs):
|
||||||
|
msg, kwargs = super(GithubEventLogAdapter, self).process(msg, kwargs)
|
||||||
|
msg = '[delivery: %s] %s' % (kwargs['extra']['delivery'], msg)
|
||||||
|
return msg, kwargs
|
||||||
|
|
||||||
log = logging.getLogger("zuul.GithubEventConnector")
|
|
||||||
|
|
||||||
def __init__(self, connection):
|
class GithubEventProcessor(object):
|
||||||
super(GithubEventConnector, self).__init__()
|
def __init__(self, connector, event_tuple):
|
||||||
self.daemon = True
|
self.connector = connector
|
||||||
self.connection = connection
|
self.connection = connector.connection
|
||||||
self._stopped = False
|
self.ts, self.body, self.event_type, self.delivery = event_tuple
|
||||||
|
logger = logging.getLogger("zuul.GithubEventConnector")
|
||||||
|
self.log = GithubEventLogAdapter(logger, {'delivery': self.delivery})
|
||||||
|
|
||||||
def stop(self):
|
def run(self):
|
||||||
self._stopped = True
|
self.log.debug("Starting event processing, queue length %s",
|
||||||
self.connection.addEvent(None)
|
self.connection.getEventQueueSize())
|
||||||
|
try:
|
||||||
|
self._handle_event()
|
||||||
|
finally:
|
||||||
|
self.log.debug("Finished event processing")
|
||||||
|
|
||||||
def _handleEvent(self):
|
def _handle_event(self):
|
||||||
ts, json_body, event_type, delivery = self.connection.getEvent()
|
if self.connector._stopped:
|
||||||
if self._stopped:
|
|
||||||
return
|
return
|
||||||
|
|
||||||
# If there's any installation mapping information in the body then
|
# If there's any installation mapping information in the body then
|
||||||
# update the project mapping before any requests are made.
|
# update the project mapping before any requests are made.
|
||||||
installation_id = json_body.get('installation', {}).get('id')
|
installation_id = self.body.get('installation', {}).get('id')
|
||||||
project_name = json_body.get('repository', {}).get('full_name')
|
project_name = self.body.get('repository', {}).get('full_name')
|
||||||
|
|
||||||
if installation_id and project_name:
|
if installation_id and project_name:
|
||||||
old_id = self.connection.installation_map.get(project_name)
|
old_id = self.connection.installation_map.get(project_name)
|
||||||
|
@ -206,29 +213,33 @@ class GithubEventConnector(threading.Thread):
|
||||||
self.connection.installation_map[project_name] = installation_id
|
self.connection.installation_map[project_name] = installation_id
|
||||||
|
|
||||||
try:
|
try:
|
||||||
method = getattr(self, '_event_' + event_type)
|
method = getattr(self, '_event_' + self.event_type)
|
||||||
except AttributeError:
|
except AttributeError:
|
||||||
# TODO(jlk): Gracefully handle event types we don't care about
|
# TODO(jlk): Gracefully handle event types we don't care about
|
||||||
# instead of logging an exception.
|
# instead of logging an exception.
|
||||||
message = "Unhandled X-Github-Event: {0}".format(event_type)
|
message = "Unhandled X-Github-Event: {0}".format(self.event_type)
|
||||||
self.log.debug(message)
|
self.log.debug(message)
|
||||||
# Returns empty on unhandled events
|
# Returns empty on unhandled events
|
||||||
return
|
return
|
||||||
|
|
||||||
|
self.log.debug("Handling %s event", self.event_type)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
event = method(json_body)
|
event = method()
|
||||||
except Exception:
|
except Exception:
|
||||||
self.log.exception('Exception when handling event:')
|
self.log.exception('Exception when handling event:')
|
||||||
event = None
|
event = None
|
||||||
|
|
||||||
if event:
|
if event:
|
||||||
event.delivery = delivery
|
event.delivery = self.delivery
|
||||||
project = self.connection.source.getProject(event.project_name)
|
project = self.connection.source.getProject(event.project_name)
|
||||||
if event.change_number:
|
if event.change_number:
|
||||||
self.connection._getChange(project,
|
self.connection._getChange(project,
|
||||||
event.change_number,
|
event.change_number,
|
||||||
event.patch_number,
|
event.patch_number,
|
||||||
refresh=True)
|
refresh=True)
|
||||||
|
self.log.debug("Refreshed change %s,%s",
|
||||||
|
event.change_number, event.patch_number)
|
||||||
|
|
||||||
# If this event references a branch and we're excluding unprotected
|
# If this event references a branch and we're excluding unprotected
|
||||||
# branches, we might need to check whether the branch is now
|
# branches, we might need to check whether the branch is now
|
||||||
|
@ -237,8 +248,8 @@ class GithubEventConnector(threading.Thread):
|
||||||
b = self.connection.getBranch(project.name, event.branch)
|
b = self.connection.getBranch(project.name, event.branch)
|
||||||
if b is not None:
|
if b is not None:
|
||||||
branch_protected = b.get('protected')
|
branch_protected = b.get('protected')
|
||||||
self.connection.checkBranchCache(project, event.branch,
|
self.connection.checkBranchCache(
|
||||||
branch_protected)
|
project, event.branch, branch_protected, self.log)
|
||||||
event.branch_protected = branch_protected
|
event.branch_protected = branch_protected
|
||||||
else:
|
else:
|
||||||
# This can happen if the branch was deleted in GitHub. In
|
# This can happen if the branch was deleted in GitHub. In
|
||||||
|
@ -251,8 +262,8 @@ class GithubEventConnector(threading.Thread):
|
||||||
self.connection.logEvent(event)
|
self.connection.logEvent(event)
|
||||||
self.connection.sched.addEvent(event)
|
self.connection.sched.addEvent(event)
|
||||||
|
|
||||||
def _event_push(self, body):
|
def _event_push(self):
|
||||||
base_repo = body.get('repository')
|
base_repo = self.body.get('repository')
|
||||||
|
|
||||||
event = GithubTriggerEvent()
|
event = GithubTriggerEvent()
|
||||||
event.trigger_name = 'github'
|
event.trigger_name = 'github'
|
||||||
|
@ -260,10 +271,10 @@ class GithubEventConnector(threading.Thread):
|
||||||
event.type = 'push'
|
event.type = 'push'
|
||||||
event.branch_updated = True
|
event.branch_updated = True
|
||||||
|
|
||||||
event.ref = body.get('ref')
|
event.ref = self.body.get('ref')
|
||||||
event.oldrev = body.get('before')
|
event.oldrev = self.body.get('before')
|
||||||
event.newrev = body.get('after')
|
event.newrev = self.body.get('after')
|
||||||
event.commits = body.get('commits')
|
event.commits = self.body.get('commits')
|
||||||
|
|
||||||
ref_parts = event.ref.split('/', 2) # ie, ['refs', 'heads', 'foo/bar']
|
ref_parts = event.ref.split('/', 2) # ie, ['refs', 'heads', 'foo/bar']
|
||||||
|
|
||||||
|
@ -289,20 +300,20 @@ class GithubEventConnector(threading.Thread):
|
||||||
# know if branch protection has been disabled before deletion
|
# know if branch protection has been disabled before deletion
|
||||||
# of the branch.
|
# of the branch.
|
||||||
# FIXME(tobiash): Find a way to handle that case
|
# FIXME(tobiash): Find a way to handle that case
|
||||||
self.connection._clearBranchCache(project)
|
self.connection._clearBranchCache(project, self.log)
|
||||||
elif event.branch_created:
|
elif event.branch_created:
|
||||||
# A new branch never can be protected because that needs to be
|
# A new branch never can be protected because that needs to be
|
||||||
# configured after it has been created.
|
# configured after it has been created.
|
||||||
self.connection._clearBranchCache(project)
|
self.connection._clearBranchCache(project, self.log)
|
||||||
|
|
||||||
return event
|
return event
|
||||||
|
|
||||||
def _event_pull_request(self, body):
|
def _event_pull_request(self):
|
||||||
action = body.get('action')
|
action = self.body.get('action')
|
||||||
pr_body = body.get('pull_request')
|
pr_body = self.body.get('pull_request')
|
||||||
|
|
||||||
event = self._pull_request_to_event(pr_body)
|
event = self._pull_request_to_event(pr_body)
|
||||||
event.account = self._get_sender(body)
|
event.account = self._get_sender(self.body)
|
||||||
|
|
||||||
event.type = 'pull_request'
|
event.type = 'pull_request'
|
||||||
if action == 'opened':
|
if action == 'opened':
|
||||||
|
@ -315,10 +326,10 @@ class GithubEventConnector(threading.Thread):
|
||||||
event.action = 'reopened'
|
event.action = 'reopened'
|
||||||
elif action == 'labeled':
|
elif action == 'labeled':
|
||||||
event.action = 'labeled'
|
event.action = 'labeled'
|
||||||
event.label = body['label']['name']
|
event.label = self.body['label']['name']
|
||||||
elif action == 'unlabeled':
|
elif action == 'unlabeled':
|
||||||
event.action = 'unlabeled'
|
event.action = 'unlabeled'
|
||||||
event.label = body['label']['name']
|
event.label = self.body['label']['name']
|
||||||
elif action == 'edited':
|
elif action == 'edited':
|
||||||
event.action = 'edited'
|
event.action = 'edited'
|
||||||
else:
|
else:
|
||||||
|
@ -326,66 +337,67 @@ class GithubEventConnector(threading.Thread):
|
||||||
|
|
||||||
return event
|
return event
|
||||||
|
|
||||||
def _event_issue_comment(self, body):
|
def _event_issue_comment(self):
|
||||||
"""Handles pull request comments"""
|
"""Handles pull request comments"""
|
||||||
action = body.get('action')
|
action = self.body.get('action')
|
||||||
if action != 'created':
|
if action != 'created':
|
||||||
return
|
return
|
||||||
if not body.get('issue', {}).get('pull_request'):
|
if not self.body.get('issue', {}).get('pull_request'):
|
||||||
# Do not process non-PR issue comment
|
# Do not process non-PR issue comment
|
||||||
return
|
return
|
||||||
pr_body = self._issue_to_pull_request(body)
|
pr_body = self._issue_to_pull_request(self.body)
|
||||||
if pr_body is None:
|
if pr_body is None:
|
||||||
return
|
return
|
||||||
|
|
||||||
event = self._pull_request_to_event(pr_body)
|
event = self._pull_request_to_event(pr_body)
|
||||||
event.account = self._get_sender(body)
|
event.account = self._get_sender(self.body)
|
||||||
event.comment = body.get('comment').get('body')
|
event.comment = self.body.get('comment').get('body')
|
||||||
event.type = 'pull_request'
|
event.type = 'pull_request'
|
||||||
event.action = 'comment'
|
event.action = 'comment'
|
||||||
return event
|
return event
|
||||||
|
|
||||||
def _event_pull_request_review(self, body):
|
def _event_pull_request_review(self):
|
||||||
"""Handles pull request reviews"""
|
"""Handles pull request reviews"""
|
||||||
pr_body = body.get('pull_request')
|
pr_body = self.body.get('pull_request')
|
||||||
if pr_body is None:
|
if pr_body is None:
|
||||||
return
|
return
|
||||||
|
|
||||||
review = body.get('review')
|
review = self.body.get('review')
|
||||||
if review is None:
|
if review is None:
|
||||||
return
|
return
|
||||||
|
|
||||||
event = self._pull_request_to_event(pr_body)
|
event = self._pull_request_to_event(pr_body)
|
||||||
event.state = review.get('state')
|
event.state = review.get('state')
|
||||||
event.account = self._get_sender(body)
|
event.account = self._get_sender(self.body)
|
||||||
event.type = 'pull_request_review'
|
event.type = 'pull_request_review'
|
||||||
event.action = body.get('action')
|
event.action = self.body.get('action')
|
||||||
return event
|
return event
|
||||||
|
|
||||||
def _event_status(self, body):
|
def _event_status(self):
|
||||||
action = body.get('action')
|
action = self.body.get('action')
|
||||||
if action == 'pending':
|
if action == 'pending':
|
||||||
return
|
return
|
||||||
project = body.get('name')
|
project = self.body.get('name')
|
||||||
pr_body = self.connection.getPullBySha(body['sha'], project)
|
pr_body = self.connection.getPullBySha(
|
||||||
|
self.body['sha'], project, self.log)
|
||||||
if pr_body is None:
|
if pr_body is None:
|
||||||
return
|
return
|
||||||
|
|
||||||
event = self._pull_request_to_event(pr_body)
|
event = self._pull_request_to_event(pr_body)
|
||||||
event.account = self._get_sender(body)
|
event.account = self._get_sender(self.body)
|
||||||
event.type = 'pull_request'
|
event.type = 'pull_request'
|
||||||
event.action = 'status'
|
event.action = 'status'
|
||||||
# Github API is silly. Webhook blob sets author data in
|
# Github API is silly. Webhook blob sets author data in
|
||||||
# 'sender', but API call to get status puts it in 'creator'.
|
# 'sender', but API call to get status puts it in 'creator'.
|
||||||
# Duplicate the data so our code can look in one place
|
# Duplicate the data so our code can look in one place
|
||||||
body['creator'] = body['sender']
|
self.body['creator'] = self.body['sender']
|
||||||
event.status = "%s:%s:%s" % _status_as_tuple(body)
|
event.status = "%s:%s:%s" % _status_as_tuple(self.body)
|
||||||
return event
|
return event
|
||||||
|
|
||||||
def _issue_to_pull_request(self, body):
|
def _issue_to_pull_request(self, body):
|
||||||
number = body.get('issue').get('number')
|
number = body.get('issue').get('number')
|
||||||
project_name = body.get('repository').get('full_name')
|
project_name = body.get('repository').get('full_name')
|
||||||
pr_body = self.connection.getPull(project_name, number)
|
pr_body = self.connection.getPull(project_name, number, self.log)
|
||||||
if pr_body is None:
|
if pr_body is None:
|
||||||
self.log.debug('Pull request #%s not found in project %s' %
|
self.log.debug('Pull request #%s not found in project %s' %
|
||||||
(number, project_name))
|
(number, project_name))
|
||||||
|
@ -417,14 +429,33 @@ class GithubEventConnector(threading.Thread):
|
||||||
if login:
|
if login:
|
||||||
# TODO(tobiash): it might be better to plumb in the installation id
|
# TODO(tobiash): it might be better to plumb in the installation id
|
||||||
project = body.get('repository', {}).get('full_name')
|
project = body.get('repository', {}).get('full_name')
|
||||||
return self.connection.getUser(login, project)
|
user = self.connection.getUser(login, project)
|
||||||
|
self.log.debug("Got user %s", user)
|
||||||
|
return user
|
||||||
|
|
||||||
|
|
||||||
|
class GithubEventConnector(threading.Thread):
|
||||||
|
"""Move events from GitHub into the scheduler"""
|
||||||
|
|
||||||
|
log = logging.getLogger("zuul.GithubEventConnector")
|
||||||
|
|
||||||
|
def __init__(self, connection):
|
||||||
|
super(GithubEventConnector, self).__init__()
|
||||||
|
self.daemon = True
|
||||||
|
self.connection = connection
|
||||||
|
self._stopped = False
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
self._stopped = True
|
||||||
|
self.connection.addEvent(None)
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
while True:
|
while True:
|
||||||
if self._stopped:
|
if self._stopped:
|
||||||
return
|
return
|
||||||
try:
|
try:
|
||||||
self._handleEvent()
|
data = self.connection.getEvent()
|
||||||
|
GithubEventProcessor(self, data).run()
|
||||||
except Exception:
|
except Exception:
|
||||||
self.log.exception("Exception moving GitHub event:")
|
self.log.exception("Exception moving GitHub event:")
|
||||||
finally:
|
finally:
|
||||||
|
@ -754,6 +785,9 @@ class GithubConnection(BaseConnection):
|
||||||
def getEvent(self):
|
def getEvent(self):
|
||||||
return self.event_queue.get()
|
return self.event_queue.get()
|
||||||
|
|
||||||
|
def getEventQueueSize(self):
|
||||||
|
return self.event_queue.qsize()
|
||||||
|
|
||||||
def eventDone(self):
|
def eventDone(self):
|
||||||
self.event_queue.task_done()
|
self.event_queue.task_done()
|
||||||
|
|
||||||
|
@ -1065,7 +1099,9 @@ class GithubConnection(BaseConnection):
|
||||||
def getPullUrl(self, project, number):
|
def getPullUrl(self, project, number):
|
||||||
return '%s/pull/%s' % (self.getGitwebUrl(project), number)
|
return '%s/pull/%s' % (self.getGitwebUrl(project), number)
|
||||||
|
|
||||||
def getPull(self, project_name, number):
|
def getPull(self, project_name, number, log=None):
|
||||||
|
if log is None:
|
||||||
|
log = self.log
|
||||||
github = self.getGithubClient(project_name)
|
github = self.getGithubClient(project_name)
|
||||||
owner, proj = project_name.split('/')
|
owner, proj = project_name.split('/')
|
||||||
for retry in range(5):
|
for retry in range(5):
|
||||||
|
@ -1089,7 +1125,7 @@ class GithubConnection(BaseConnection):
|
||||||
pr['files'] = []
|
pr['files'] = []
|
||||||
|
|
||||||
pr['labels'] = [l.name for l in issueobj.labels()]
|
pr['labels'] = [l.name for l in issueobj.labels()]
|
||||||
self.log.debug('Got PR %s#%s', project_name, number)
|
log.debug('Got PR %s#%s', project_name, number)
|
||||||
self.log_rate_limit(self.log, github)
|
self.log_rate_limit(self.log, github)
|
||||||
return pr
|
return pr
|
||||||
|
|
||||||
|
@ -1125,7 +1161,7 @@ class GithubConnection(BaseConnection):
|
||||||
|
|
||||||
return True
|
return True
|
||||||
|
|
||||||
def getPullBySha(self, sha, project):
|
def getPullBySha(self, sha, project, log):
|
||||||
pulls = []
|
pulls = []
|
||||||
owner, project = project.split('/')
|
owner, project = project.split('/')
|
||||||
github = self.getGithubClient("%s/%s" % (owner, project))
|
github = self.getGithubClient("%s/%s" % (owner, project))
|
||||||
|
@ -1137,7 +1173,7 @@ class GithubConnection(BaseConnection):
|
||||||
continue
|
continue
|
||||||
pulls.append(pr.as_dict())
|
pulls.append(pr.as_dict())
|
||||||
|
|
||||||
self.log.debug('Got PR on project %s for sha %s', project, sha)
|
log.debug('Got PR on project %s for sha %s', project, sha)
|
||||||
self.log_rate_limit(self.log, github)
|
self.log_rate_limit(self.log, github)
|
||||||
if len(pulls) > 1:
|
if len(pulls) > 1:
|
||||||
raise Exception('Multiple pulls found with head sha %s' % sha)
|
raise Exception('Multiple pulls found with head sha %s' % sha)
|
||||||
|
@ -1433,8 +1469,8 @@ class GithubConnection(BaseConnection):
|
||||||
log.debug('GitHub API rate limit remaining: %s reset: %s',
|
log.debug('GitHub API rate limit remaining: %s reset: %s',
|
||||||
remaining, reset)
|
remaining, reset)
|
||||||
|
|
||||||
def _clearBranchCache(self, project):
|
def _clearBranchCache(self, project, log):
|
||||||
self.log.debug("Clearing branch cache for %s", project.name)
|
log.debug("Clearing branch cache for %s", project.name)
|
||||||
for cache in [
|
for cache in [
|
||||||
self._project_branch_cache_exclude_unprotected,
|
self._project_branch_cache_exclude_unprotected,
|
||||||
self._project_branch_cache_include_unprotected,
|
self._project_branch_cache_include_unprotected,
|
||||||
|
@ -1444,7 +1480,7 @@ class GithubConnection(BaseConnection):
|
||||||
except KeyError:
|
except KeyError:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
def checkBranchCache(self, project, branch, protected):
|
def checkBranchCache(self, project, branch, protected, log):
|
||||||
# If the branch appears in the exclude_unprotected cache but
|
# If the branch appears in the exclude_unprotected cache but
|
||||||
# is unprotected, clear the exclude cache.
|
# is unprotected, clear the exclude cache.
|
||||||
|
|
||||||
|
@ -1457,16 +1493,16 @@ class GithubConnection(BaseConnection):
|
||||||
cache = self._project_branch_cache_exclude_unprotected
|
cache = self._project_branch_cache_exclude_unprotected
|
||||||
branches = cache.get(project.name, [])
|
branches = cache.get(project.name, [])
|
||||||
if (branch in branches) and (not protected):
|
if (branch in branches) and (not protected):
|
||||||
self.log.debug("Clearing protected branch cache for %s",
|
log.debug("Clearing protected branch cache for %s",
|
||||||
project.name)
|
project.name)
|
||||||
try:
|
try:
|
||||||
del cache[project.name]
|
del cache[project.name]
|
||||||
except KeyError:
|
except KeyError:
|
||||||
pass
|
pass
|
||||||
return
|
return
|
||||||
if (branch not in branches) and (protected):
|
if (branch not in branches) and (protected):
|
||||||
self.log.debug("Clearing protected branch cache for %s",
|
log.debug("Clearing protected branch cache for %s",
|
||||||
project.name)
|
project.name)
|
||||||
try:
|
try:
|
||||||
del cache[project.name]
|
del cache[project.name]
|
||||||
except KeyError:
|
except KeyError:
|
||||||
|
|
Loading…
Reference in New Issue