diff --git a/zuul/driver/github/githubconnection.py b/zuul/driver/github/githubconnection.py index 6cc4c2e04c..218c82c93f 100644 --- a/zuul/driver/github/githubconnection.py +++ b/zuul/driver/github/githubconnection.py @@ -171,30 +171,37 @@ class GithubGearmanWorker(object): self.gearman.shutdown() -class GithubEventConnector(threading.Thread): - """Move events from GitHub into the scheduler""" +class GithubEventLogAdapter(logging.LoggerAdapter): + def process(self, msg, kwargs): + msg, kwargs = super(GithubEventLogAdapter, self).process(msg, kwargs) + msg = '[delivery: %s] %s' % (kwargs['extra']['delivery'], msg) + return msg, kwargs - log = logging.getLogger("zuul.GithubEventConnector") - def __init__(self, connection): - super(GithubEventConnector, self).__init__() - self.daemon = True - self.connection = connection - self._stopped = False +class GithubEventProcessor(object): + def __init__(self, connector, event_tuple): + self.connector = connector + self.connection = connector.connection + self.ts, self.body, self.event_type, self.delivery = event_tuple + logger = logging.getLogger("zuul.GithubEventConnector") + self.log = GithubEventLogAdapter(logger, {'delivery': self.delivery}) - def stop(self): - self._stopped = True - self.connection.addEvent(None) + def run(self): + self.log.debug("Starting event processing, queue length %s", + self.connection.getEventQueueSize()) + try: + self._handle_event() + finally: + self.log.debug("Finished event processing") - def _handleEvent(self): - ts, json_body, event_type, delivery = self.connection.getEvent() - if self._stopped: + def _handle_event(self): + if self.connector._stopped: return # If there's any installation mapping information in the body then # update the project mapping before any requests are made. - installation_id = json_body.get('installation', {}).get('id') - project_name = json_body.get('repository', {}).get('full_name') + installation_id = self.body.get('installation', {}).get('id') + project_name = self.body.get('repository', {}).get('full_name') if installation_id and project_name: old_id = self.connection.installation_map.get(project_name) @@ -206,29 +213,33 @@ class GithubEventConnector(threading.Thread): self.connection.installation_map[project_name] = installation_id try: - method = getattr(self, '_event_' + event_type) + method = getattr(self, '_event_' + self.event_type) except AttributeError: # TODO(jlk): Gracefully handle event types we don't care about # instead of logging an exception. - message = "Unhandled X-Github-Event: {0}".format(event_type) + message = "Unhandled X-Github-Event: {0}".format(self.event_type) self.log.debug(message) # Returns empty on unhandled events return + self.log.debug("Handling %s event", self.event_type) + try: - event = method(json_body) + event = method() except Exception: self.log.exception('Exception when handling event:') event = None if event: - event.delivery = delivery + event.delivery = self.delivery project = self.connection.source.getProject(event.project_name) if event.change_number: self.connection._getChange(project, event.change_number, event.patch_number, refresh=True) + self.log.debug("Refreshed change %s,%s", + event.change_number, event.patch_number) # If this event references a branch and we're excluding unprotected # branches, we might need to check whether the branch is now @@ -237,8 +248,8 @@ class GithubEventConnector(threading.Thread): b = self.connection.getBranch(project.name, event.branch) if b is not None: branch_protected = b.get('protected') - self.connection.checkBranchCache(project, event.branch, - branch_protected) + self.connection.checkBranchCache( + project, event.branch, branch_protected, self.log) event.branch_protected = branch_protected else: # This can happen if the branch was deleted in GitHub. In @@ -251,8 +262,8 @@ class GithubEventConnector(threading.Thread): self.connection.logEvent(event) self.connection.sched.addEvent(event) - def _event_push(self, body): - base_repo = body.get('repository') + def _event_push(self): + base_repo = self.body.get('repository') event = GithubTriggerEvent() event.trigger_name = 'github' @@ -260,10 +271,10 @@ class GithubEventConnector(threading.Thread): event.type = 'push' event.branch_updated = True - event.ref = body.get('ref') - event.oldrev = body.get('before') - event.newrev = body.get('after') - event.commits = body.get('commits') + event.ref = self.body.get('ref') + event.oldrev = self.body.get('before') + event.newrev = self.body.get('after') + event.commits = self.body.get('commits') ref_parts = event.ref.split('/', 2) # ie, ['refs', 'heads', 'foo/bar'] @@ -289,20 +300,20 @@ class GithubEventConnector(threading.Thread): # know if branch protection has been disabled before deletion # of the branch. # FIXME(tobiash): Find a way to handle that case - self.connection._clearBranchCache(project) + self.connection._clearBranchCache(project, self.log) elif event.branch_created: # A new branch never can be protected because that needs to be # configured after it has been created. - self.connection._clearBranchCache(project) + self.connection._clearBranchCache(project, self.log) return event - def _event_pull_request(self, body): - action = body.get('action') - pr_body = body.get('pull_request') + def _event_pull_request(self): + action = self.body.get('action') + pr_body = self.body.get('pull_request') event = self._pull_request_to_event(pr_body) - event.account = self._get_sender(body) + event.account = self._get_sender(self.body) event.type = 'pull_request' if action == 'opened': @@ -315,10 +326,10 @@ class GithubEventConnector(threading.Thread): event.action = 'reopened' elif action == 'labeled': event.action = 'labeled' - event.label = body['label']['name'] + event.label = self.body['label']['name'] elif action == 'unlabeled': event.action = 'unlabeled' - event.label = body['label']['name'] + event.label = self.body['label']['name'] elif action == 'edited': event.action = 'edited' else: @@ -326,66 +337,67 @@ class GithubEventConnector(threading.Thread): return event - def _event_issue_comment(self, body): + def _event_issue_comment(self): """Handles pull request comments""" - action = body.get('action') + action = self.body.get('action') if action != 'created': return - if not body.get('issue', {}).get('pull_request'): + if not self.body.get('issue', {}).get('pull_request'): # Do not process non-PR issue comment return - pr_body = self._issue_to_pull_request(body) + pr_body = self._issue_to_pull_request(self.body) if pr_body is None: return event = self._pull_request_to_event(pr_body) - event.account = self._get_sender(body) - event.comment = body.get('comment').get('body') + event.account = self._get_sender(self.body) + event.comment = self.body.get('comment').get('body') event.type = 'pull_request' event.action = 'comment' return event - def _event_pull_request_review(self, body): + def _event_pull_request_review(self): """Handles pull request reviews""" - pr_body = body.get('pull_request') + pr_body = self.body.get('pull_request') if pr_body is None: return - review = body.get('review') + review = self.body.get('review') if review is None: return event = self._pull_request_to_event(pr_body) event.state = review.get('state') - event.account = self._get_sender(body) + event.account = self._get_sender(self.body) event.type = 'pull_request_review' - event.action = body.get('action') + event.action = self.body.get('action') return event - def _event_status(self, body): - action = body.get('action') + def _event_status(self): + action = self.body.get('action') if action == 'pending': return - project = body.get('name') - pr_body = self.connection.getPullBySha(body['sha'], project) + project = self.body.get('name') + pr_body = self.connection.getPullBySha( + self.body['sha'], project, self.log) if pr_body is None: return event = self._pull_request_to_event(pr_body) - event.account = self._get_sender(body) + event.account = self._get_sender(self.body) event.type = 'pull_request' event.action = 'status' # Github API is silly. Webhook blob sets author data in # 'sender', but API call to get status puts it in 'creator'. # Duplicate the data so our code can look in one place - body['creator'] = body['sender'] - event.status = "%s:%s:%s" % _status_as_tuple(body) + self.body['creator'] = self.body['sender'] + event.status = "%s:%s:%s" % _status_as_tuple(self.body) return event def _issue_to_pull_request(self, body): number = body.get('issue').get('number') project_name = body.get('repository').get('full_name') - pr_body = self.connection.getPull(project_name, number) + pr_body = self.connection.getPull(project_name, number, self.log) if pr_body is None: self.log.debug('Pull request #%s not found in project %s' % (number, project_name)) @@ -417,14 +429,33 @@ class GithubEventConnector(threading.Thread): if login: # TODO(tobiash): it might be better to plumb in the installation id project = body.get('repository', {}).get('full_name') - return self.connection.getUser(login, project) + user = self.connection.getUser(login, project) + self.log.debug("Got user %s", user) + return user + + +class GithubEventConnector(threading.Thread): + """Move events from GitHub into the scheduler""" + + log = logging.getLogger("zuul.GithubEventConnector") + + def __init__(self, connection): + super(GithubEventConnector, self).__init__() + self.daemon = True + self.connection = connection + self._stopped = False + + def stop(self): + self._stopped = True + self.connection.addEvent(None) def run(self): while True: if self._stopped: return try: - self._handleEvent() + data = self.connection.getEvent() + GithubEventProcessor(self, data).run() except Exception: self.log.exception("Exception moving GitHub event:") finally: @@ -754,6 +785,9 @@ class GithubConnection(BaseConnection): def getEvent(self): return self.event_queue.get() + def getEventQueueSize(self): + return self.event_queue.qsize() + def eventDone(self): self.event_queue.task_done() @@ -1065,7 +1099,9 @@ class GithubConnection(BaseConnection): def getPullUrl(self, project, number): return '%s/pull/%s' % (self.getGitwebUrl(project), number) - def getPull(self, project_name, number): + def getPull(self, project_name, number, log=None): + if log is None: + log = self.log github = self.getGithubClient(project_name) owner, proj = project_name.split('/') for retry in range(5): @@ -1089,7 +1125,7 @@ class GithubConnection(BaseConnection): pr['files'] = [] pr['labels'] = [l.name for l in issueobj.labels()] - self.log.debug('Got PR %s#%s', project_name, number) + log.debug('Got PR %s#%s', project_name, number) self.log_rate_limit(self.log, github) return pr @@ -1125,7 +1161,7 @@ class GithubConnection(BaseConnection): return True - def getPullBySha(self, sha, project): + def getPullBySha(self, sha, project, log): pulls = [] owner, project = project.split('/') github = self.getGithubClient("%s/%s" % (owner, project)) @@ -1137,7 +1173,7 @@ class GithubConnection(BaseConnection): continue pulls.append(pr.as_dict()) - self.log.debug('Got PR on project %s for sha %s', project, sha) + log.debug('Got PR on project %s for sha %s', project, sha) self.log_rate_limit(self.log, github) if len(pulls) > 1: raise Exception('Multiple pulls found with head sha %s' % sha) @@ -1433,8 +1469,8 @@ class GithubConnection(BaseConnection): log.debug('GitHub API rate limit remaining: %s reset: %s', remaining, reset) - def _clearBranchCache(self, project): - self.log.debug("Clearing branch cache for %s", project.name) + def _clearBranchCache(self, project, log): + log.debug("Clearing branch cache for %s", project.name) for cache in [ self._project_branch_cache_exclude_unprotected, self._project_branch_cache_include_unprotected, @@ -1444,7 +1480,7 @@ class GithubConnection(BaseConnection): except KeyError: pass - def checkBranchCache(self, project, branch, protected): + def checkBranchCache(self, project, branch, protected, log): # If the branch appears in the exclude_unprotected cache but # is unprotected, clear the exclude cache. @@ -1457,16 +1493,16 @@ class GithubConnection(BaseConnection): cache = self._project_branch_cache_exclude_unprotected branches = cache.get(project.name, []) if (branch in branches) and (not protected): - self.log.debug("Clearing protected branch cache for %s", - project.name) + log.debug("Clearing protected branch cache for %s", + project.name) try: del cache[project.name] except KeyError: pass return if (branch not in branches) and (protected): - self.log.debug("Clearing protected branch cache for %s", - project.name) + log.debug("Clearing protected branch cache for %s", + project.name) try: del cache[project.name] except KeyError: