Dispatch Github webhook events via Zookeeper

Change-Id: I5d06351ed1475afd1862b9f38db4382a7f7fc10d
This commit is contained in:
Simon Westphahl
2021-02-12 14:20:16 +01:00
parent 1ef47dbbd2
commit 614bd40341
2 changed files with 108 additions and 122 deletions

View File

@@ -18,7 +18,6 @@ import datetime
import logging
import hmac
import hashlib
import queue
import threading
import time
import json
@@ -42,7 +41,6 @@ from github3.session import AppInstallationTokenAuth
from zuul.connection import CachedBranchConnection
from zuul.driver.github.graphql import GraphQLClient
from zuul.lib.gearworker import ZuulGearWorker
from zuul.lib.queue import NamedQueue
from zuul.web.handler import BaseWebController
from zuul.lib.logutil import get_annotated_logger
@@ -50,6 +48,7 @@ from zuul.model import Ref, Branch, Tag, Project
from zuul.exceptions import MergeFailure
from zuul.driver.github.githubmodel import PullRequest, GithubTriggerEvent
from zuul.model import DequeueEvent
from zuul.zk.event_queues import ConnectionEventQueue
GITHUB_BASE_URL = 'https://api.github.com'
PREVIEW_JSON_ACCEPT = 'application/vnd.github.machine-man-preview+json'
@@ -66,6 +65,10 @@ ANNOTATION_LEVELS = {
"error": "failure",
}
EventTuple = collections.namedtuple(
"EventTuple", ["timestamp", "body", "event_type", "delivery"]
)
def _sign_request(body, secret):
signature = 'sha1=' + hmac.new(
@@ -303,90 +306,26 @@ class GithubShaCache(object):
return cached_prs
class GithubGearmanWorker(object):
"""A thread that answers gearman requests"""
log = logging.getLogger("zuul.GithubGearmanWorker")
def __init__(self, connection):
self.config = connection.sched.config
self.connection = connection
handler = "github:%s:payload" % self.connection.connection_name
self.jobs = {
handler: self.handle_payload,
}
self.gearworker = ZuulGearWorker(
'Zuul Github Connector',
'zuul.GithubGearmanWorker',
'github-gearman-worker',
self.config,
self.jobs)
def handle_payload(self, job):
args = json.loads(job.arguments)
headers = args.get("headers")
body = args.get("body")
delivery = headers.get('x-github-delivery')
log = get_annotated_logger(self.log, delivery)
log.debug("Github Webhook Received")
# TODO(jlk): Validate project in the request is a project we know
try:
self.__dispatch_event(body, headers, log)
output = {'return_code': 200}
except Exception:
output = {'return_code': 503}
log.exception("Exception handling Github event:")
job.sendWorkComplete(json.dumps(output))
def __dispatch_event(self, body, headers, log):
try:
event = headers['x-github-event']
log.debug("X-Github-Event: " + event)
except KeyError:
log.debug("Request headers missing the X-Github-Event.")
raise Exception('Please specify a X-Github-Event header.')
delivery = headers.get('x-github-delivery')
try:
self.connection.addEvent(body, event, delivery)
except Exception:
message = 'Exception deserializing JSON body'
log.exception(message)
# TODO(jlk): Raise this as something different?
raise Exception(message)
def start(self):
self.gearworker.start()
def stop(self):
self.gearworker.stop()
class GithubEventProcessor(object):
def __init__(self, connector, event_tuple):
def __init__(self, connector, event_tuple, connection_event):
self.connector = connector
self.connection = connector.connection
self.ts, self.body, self.event_type, self.delivery = event_tuple
logger = logging.getLogger("zuul.GithubEventProcessor")
self.zuul_event_id = self.delivery
self.log = get_annotated_logger(logger, self.zuul_event_id)
self.connection_event = connection_event
self.event = None
def run(self):
self.log.debug("Starting event processing, queue length %s",
self.connection.getEventQueueSize())
self.log.debug("Starting event processing")
try:
self._process_event()
except Exception:
self.log.exception("Exception when processing event:")
finally:
self.log.debug("Finished event processing")
return self.event
return self.event, self.connection_event
def _process_event(self):
if self.connector._stopped:
@@ -706,7 +645,10 @@ class GithubEventConnector:
def __init__(self, connection):
self.connection = connection
self.event_queue = connection.event_queue
self._stopped = False
self._events_in_progress = set()
self._dispatcher_wake_event = threading.Event()
self._event_dispatcher = threading.Thread(
name='GithubEventDispatcher', target=self.run_event_dispatcher,
daemon=True)
@@ -718,7 +660,8 @@ class GithubEventConnector:
def stop(self):
self._stopped = True
self.connection.addEvent(None)
self._dispatcher_wake_event.set()
self.event_queue.election.cancel()
self._event_dispatcher.join()
self._event_forward_queue.put(None)
@@ -729,19 +672,47 @@ class GithubEventConnector:
self._event_forwarder.start()
self._event_dispatcher.start()
def _onNewEvent(self):
self._dispatcher_wake_event.set()
# Stop the data watch in case the connector was stopped
return not self._stopped
def run_event_dispatcher(self):
while True:
if self._stopped:
return
self.event_queue.registerEventWatch(self._onNewEvent)
while not self._stopped:
try:
data = self.connection.getEvent()
processor = GithubEventProcessor(self, data)
future = self._thread_pool.submit(processor.run)
self._event_forward_queue.put(future)
self.event_queue.election.run(self._dispatchEvents)
except Exception:
self.log.exception("Exception moving GitHub event:")
finally:
self.connection.eventDone()
self.log.exception("Exception handling GitHub event:")
def _dispatchEvents(self):
while not self._stopped:
# We need to create a copy of the in-progress set in order to
# prevent a race between the dispatcher and forwarder thread.
# This could happen if a previously seen event finished and was
# removed from the set between start of the iteration and the
# in-progress check for this event.
in_progress = set(self._events_in_progress)
for event in self.event_queue:
if event.ack_ref in in_progress:
continue
etuple = self._eventAsTuple(event)
log = get_annotated_logger(self.log, etuple.delivery)
log.debug("Github Webhook Received")
log.debug("X-Github-Event: %s", etuple.event_type)
processor = GithubEventProcessor(self, etuple, event)
future = self._thread_pool.submit(processor.run)
# Events are acknowledged in the event forwarder loop after
# pre-processing. This way we can ensure that no events are
# lost.
self._events_in_progress.add(event.ack_ref)
self._event_forward_queue.put(future)
if self._stopped:
return
self._dispatcher_wake_event.wait(10)
self._dispatcher_wake_event.clear()
def run_event_forwarder(self):
while True:
@@ -751,21 +722,35 @@ class GithubEventConnector:
future = self._event_forward_queue.get()
if future is None:
return
event = future.result()
if not event:
continue
self.connection.logEvent(event)
if isinstance(event, DequeueEvent):
self.connection.sched.addManagementEvent(event)
else:
self.connection.sched.addTriggerEvent(
self.connection.driver_name, event
)
event, connection_event = future.result()
try:
if not event:
continue
self.connection.logEvent(event)
if isinstance(event, DequeueEvent):
self.connection.sched.addManagementEvent(event)
else:
self.connection.sched.addTriggerEvent(
self.connection.driver_name, event
)
finally:
# Ack event in Zookeeper
self.event_queue.ack(connection_event)
self._events_in_progress.remove(connection_event.ack_ref)
except Exception:
self.log.exception("Exception moving GitHub event:")
finally:
# Ack task in forward queue
self._event_forward_queue.task_done()
@staticmethod
def _eventAsTuple(event):
body = event.get("body")
headers = event.get("headers", {})
event_type = headers.get('x-github-event')
delivery = headers.get('x-github-delivery')
return EventTuple(time.time(), body, event_type, delivery)
class GithubUser(collections.Mapping):
log = logging.getLogger('zuul.GithubUser')
@@ -1155,6 +1140,7 @@ class GithubConnection(CachedBranchConnection):
log = logging.getLogger("zuul.GithubConnection")
payload_path = 'payload'
client_manager_class = GithubClientManager
_event_connector_class = GithubEventConnector
def __init__(self, driver, connection_name, connection_config):
super(GithubConnection, self).__init__(driver, connection_name,
@@ -1167,7 +1153,6 @@ class GithubConnection(CachedBranchConnection):
self.canonical_hostname = self.connection_config.get(
'canonical_hostname', self.server)
self.source = driver.getSource(self)
self.event_queue = queue.Queue()
self._sha_pr_cache = GithubShaCache()
self._request_locks = {}
@@ -1192,24 +1177,24 @@ class GithubConnection(CachedBranchConnection):
return d
def onLoad(self):
self.log.info('Starting GitHub connection: %s' % self.connection_name)
self.gearman_worker = GithubGearmanWorker(self)
self.log.info('Starting GitHub connection: %s', self.connection_name)
self._github_client_manager.initialize()
self.log.debug('Creating Zookeeper event queue')
self.event_queue = ConnectionEventQueue(
self.sched.zk_client, self.connection_name
)
self.log.info('Starting event connector')
self._start_event_connector()
self.log.info('Starting GearmanWorker')
self.gearman_worker.start()
def onStop(self):
# TODO(jeblair): remove this check which is here only so that
# zuul-web can call connections.stop to shut down the sql
# connection.
if hasattr(self, 'gearman_worker'):
self.gearman_worker.stop()
if hasattr(self, 'github_event_connector'):
self._stop_event_connector()
def _start_event_connector(self):
self.github_event_connector = GithubEventConnector(self)
self.github_event_connector = self._event_connector_class(self)
self.github_event_connector.start()
def _stop_event_connector(self):
@@ -1230,18 +1215,6 @@ class GithubConnection(CachedBranchConnection):
installation_id, threading.Semaphore(
value=self.max_threads_per_installation))
def addEvent(self, data, event=None, delivery=None):
return self.event_queue.put((time.time(), data, event, delivery))
def getEvent(self):
return self.event_queue.get()
def getEventQueueSize(self):
return self.event_queue.qsize()
def eventDone(self):
self.event_queue.task_done()
def getGithubClient(self,
project_name=None,
zuul_event_id=None):
@@ -2229,6 +2202,10 @@ class GithubWebController(BaseWebController):
def __init__(self, zuul_web, connection):
self.connection = connection
self.zuul_web = zuul_web
self.event_queue = ConnectionEventQueue(
self.zuul_web.zk_client,
self.connection.connection_name
)
self.token = self.connection.connection_config.get('webhook_token')
def _validate_signature(self, body, headers):
@@ -2270,15 +2247,13 @@ class GithubWebController(BaseWebController):
headers[key.lower()] = value
body = cherrypy.request.body.read()
self._validate_signature(body, headers)
# We cannot send the raw body through gearman, so it's easy to just
# We cannot send the raw body through zookeeper, so it's easy to just
# encode it as json, after decoding it as utf-8
json_body = json.loads(body.decode('utf-8'))
job = self.zuul_web.rpc.submitJob(
'github:%s:payload' % self.connection.connection_name,
{'headers': headers, 'body': json_body})
return json.loads(job.data[0])
data = {'headers': headers, 'body': json_body}
self.event_queue.put(data)
return data
def _status_as_tuple(status):