Merge "Dispatch Github webhook events via Zookeeper"

This commit is contained in:
Zuul 2021-03-26 17:35:14 +00:00 committed by Gerrit Code Review
commit 21f682c3dd
2 changed files with 108 additions and 122 deletions

View File

@ -84,8 +84,11 @@ from zuul.driver.gitlab import GitlabDriver
from zuul.driver.gerrit import GerritDriver
from zuul.driver.github.githubconnection import GithubClientManager
from zuul.driver.elasticsearch import ElasticsearchDriver
from zuul.lib.collections import DefaultKeyDict
from zuul.lib.connections import ConnectionRegistry
from zuul.lib.queue import NamedQueue
from zuul.zk import ZooKeeperClient
from zuul.zk.event_queues import ConnectionEventQueue
from psutil import Popen
import tests.fakegithub
@ -260,7 +263,6 @@ class GithubDriverMock(GithubDriver):
changes_db=db,
upstream_root=self.upstream_root,
git_url_with_auth=self.git_url_with_auth)
self.additional_event_queues.append(connection.event_queue)
setattr(self.registry, 'fake_' + name, connection)
client = connection.getGithubClient(None)
registerProjects(connection.source.name, client, self.config)
@ -1124,6 +1126,7 @@ class FakeElasticsearchConnection(elconnection.ElasticsearchConnection):
def __init__(self, driver, connection_name, connection_config):
self.driver = driver
self.connection_name = connection_name
self.source_it = None
def add_docs(self, source_it, index):
@ -2722,10 +2725,9 @@ class FakeGithubConnection(githubconnection.GithubConnection):
% (self.zuul_web_port, self.connection_name),
json=data, headers=headers)
else:
job = self.rpcclient.submitJob(
'github:%s:payload' % self.connection_name,
{'headers': headers, 'body': data})
return json.loads(job.data[0])
data = {'headers': headers, 'body': data}
self.event_queue.put(data)
return data
def addProject(self, project):
# use the original method here and additionally register it in the
@ -4317,6 +4319,11 @@ class ZuulTestCase(BaseTestCase):
self.changes: Dict[str, Dict[str, Change]] = {}
self.additional_event_queues = []
self.zk_client = ZooKeeperClient.fromConfig(self.config)
self.zk_client.connect()
self.connection_event_queues = DefaultKeyDict(
lambda cn: ConnectionEventQueue(self.zk_client, cn)
)
self.poller_events = {}
self._configureSmtp()
self._configureMqtt()
@ -4671,6 +4678,7 @@ class ZuulTestCase(BaseTestCase):
self.rpcclient.shutdown()
self.gearman_server.shutdown()
self.fake_nodepool.stop()
self.zk_client.disconnect()
self.printHistory()
# We whitelist watchdog threads as they have relatively long delays
# before noticing they should exit, but they should exit on their own.
@ -4914,6 +4922,9 @@ class ZuulTestCase(BaseTestCase):
def __areZooKeeperEventQueuesEmpty(self, matcher=None) -> bool:
for sched in map(lambda app: app.sched, self.scheds.filter(matcher)):
for connection_name in sched.connections.connections:
if self.connection_event_queues[connection_name].hasEvents():
return False
if sched.management_events.hasEvents():
return False
if sched.trigger_events.hasEvents():

View File

@ -18,7 +18,6 @@ import datetime
import logging
import hmac
import hashlib
import queue
import threading
import time
import json
@ -42,7 +41,6 @@ from github3.session import AppInstallationTokenAuth
from zuul.connection import CachedBranchConnection
from zuul.driver.github.graphql import GraphQLClient
from zuul.lib.gearworker import ZuulGearWorker
from zuul.lib.queue import NamedQueue
from zuul.web.handler import BaseWebController
from zuul.lib.logutil import get_annotated_logger
@ -50,6 +48,7 @@ from zuul.model import Ref, Branch, Tag, Project
from zuul.exceptions import MergeFailure
from zuul.driver.github.githubmodel import PullRequest, GithubTriggerEvent
from zuul.model import DequeueEvent
from zuul.zk.event_queues import ConnectionEventQueue
GITHUB_BASE_URL = 'https://api.github.com'
PREVIEW_JSON_ACCEPT = 'application/vnd.github.machine-man-preview+json'
@ -66,6 +65,10 @@ ANNOTATION_LEVELS = {
"error": "failure",
}
EventTuple = collections.namedtuple(
"EventTuple", ["timestamp", "body", "event_type", "delivery"]
)
def _sign_request(body, secret):
signature = 'sha1=' + hmac.new(
@ -303,90 +306,26 @@ class GithubShaCache(object):
return cached_prs
class GithubGearmanWorker(object):
"""A thread that answers gearman requests"""
log = logging.getLogger("zuul.GithubGearmanWorker")
def __init__(self, connection):
self.config = connection.sched.config
self.connection = connection
handler = "github:%s:payload" % self.connection.connection_name
self.jobs = {
handler: self.handle_payload,
}
self.gearworker = ZuulGearWorker(
'Zuul Github Connector',
'zuul.GithubGearmanWorker',
'github-gearman-worker',
self.config,
self.jobs)
def handle_payload(self, job):
args = json.loads(job.arguments)
headers = args.get("headers")
body = args.get("body")
delivery = headers.get('x-github-delivery')
log = get_annotated_logger(self.log, delivery)
log.debug("Github Webhook Received")
# TODO(jlk): Validate project in the request is a project we know
try:
self.__dispatch_event(body, headers, log)
output = {'return_code': 200}
except Exception:
output = {'return_code': 503}
log.exception("Exception handling Github event:")
job.sendWorkComplete(json.dumps(output))
def __dispatch_event(self, body, headers, log):
try:
event = headers['x-github-event']
log.debug("X-Github-Event: " + event)
except KeyError:
log.debug("Request headers missing the X-Github-Event.")
raise Exception('Please specify a X-Github-Event header.')
delivery = headers.get('x-github-delivery')
try:
self.connection.addEvent(body, event, delivery)
except Exception:
message = 'Exception deserializing JSON body'
log.exception(message)
# TODO(jlk): Raise this as something different?
raise Exception(message)
def start(self):
self.gearworker.start()
def stop(self):
self.gearworker.stop()
class GithubEventProcessor(object):
def __init__(self, connector, event_tuple):
def __init__(self, connector, event_tuple, connection_event):
self.connector = connector
self.connection = connector.connection
self.ts, self.body, self.event_type, self.delivery = event_tuple
logger = logging.getLogger("zuul.GithubEventProcessor")
self.zuul_event_id = self.delivery
self.log = get_annotated_logger(logger, self.zuul_event_id)
self.connection_event = connection_event
self.event = None
def run(self):
self.log.debug("Starting event processing, queue length %s",
self.connection.getEventQueueSize())
self.log.debug("Starting event processing")
try:
self._process_event()
except Exception:
self.log.exception("Exception when processing event:")
finally:
self.log.debug("Finished event processing")
return self.event
return self.event, self.connection_event
def _process_event(self):
if self.connector._stopped:
@ -706,7 +645,10 @@ class GithubEventConnector:
def __init__(self, connection):
self.connection = connection
self.event_queue = connection.event_queue
self._stopped = False
self._events_in_progress = set()
self._dispatcher_wake_event = threading.Event()
self._event_dispatcher = threading.Thread(
name='GithubEventDispatcher', target=self.run_event_dispatcher,
daemon=True)
@ -718,7 +660,8 @@ class GithubEventConnector:
def stop(self):
self._stopped = True
self.connection.addEvent(None)
self._dispatcher_wake_event.set()
self.event_queue.election.cancel()
self._event_dispatcher.join()
self._event_forward_queue.put(None)
@ -729,19 +672,47 @@ class GithubEventConnector:
self._event_forwarder.start()
self._event_dispatcher.start()
def _onNewEvent(self):
self._dispatcher_wake_event.set()
# Stop the data watch in case the connector was stopped
return not self._stopped
def run_event_dispatcher(self):
while True:
if self._stopped:
return
self.event_queue.registerEventWatch(self._onNewEvent)
while not self._stopped:
try:
data = self.connection.getEvent()
processor = GithubEventProcessor(self, data)
future = self._thread_pool.submit(processor.run)
self._event_forward_queue.put(future)
self.event_queue.election.run(self._dispatchEvents)
except Exception:
self.log.exception("Exception moving GitHub event:")
finally:
self.connection.eventDone()
self.log.exception("Exception handling GitHub event:")
def _dispatchEvents(self):
while not self._stopped:
# We need to create a copy of the in-progress set in order to
# prevent a race between the dispatcher and forwarder thread.
# This could happen if a previously seen event finished and was
# removed from the set between start of the iteration and the
# in-progress check for this event.
in_progress = set(self._events_in_progress)
for event in self.event_queue:
if event.ack_ref in in_progress:
continue
etuple = self._eventAsTuple(event)
log = get_annotated_logger(self.log, etuple.delivery)
log.debug("Github Webhook Received")
log.debug("X-Github-Event: %s", etuple.event_type)
processor = GithubEventProcessor(self, etuple, event)
future = self._thread_pool.submit(processor.run)
# Events are acknowledged in the event forwarder loop after
# pre-processing. This way we can ensure that no events are
# lost.
self._events_in_progress.add(event.ack_ref)
self._event_forward_queue.put(future)
if self._stopped:
return
self._dispatcher_wake_event.wait(10)
self._dispatcher_wake_event.clear()
def run_event_forwarder(self):
while True:
@ -751,21 +722,35 @@ class GithubEventConnector:
future = self._event_forward_queue.get()
if future is None:
return
event = future.result()
if not event:
continue
self.connection.logEvent(event)
if isinstance(event, DequeueEvent):
self.connection.sched.addManagementEvent(event)
else:
self.connection.sched.addTriggerEvent(
self.connection.driver_name, event
)
event, connection_event = future.result()
try:
if not event:
continue
self.connection.logEvent(event)
if isinstance(event, DequeueEvent):
self.connection.sched.addManagementEvent(event)
else:
self.connection.sched.addTriggerEvent(
self.connection.driver_name, event
)
finally:
# Ack event in Zookeeper
self.event_queue.ack(connection_event)
self._events_in_progress.remove(connection_event.ack_ref)
except Exception:
self.log.exception("Exception moving GitHub event:")
finally:
# Ack task in forward queue
self._event_forward_queue.task_done()
@staticmethod
def _eventAsTuple(event):
body = event.get("body")
headers = event.get("headers", {})
event_type = headers.get('x-github-event')
delivery = headers.get('x-github-delivery')
return EventTuple(time.time(), body, event_type, delivery)
class GithubUser(collections.Mapping):
log = logging.getLogger('zuul.GithubUser')
@ -1155,6 +1140,7 @@ class GithubConnection(CachedBranchConnection):
log = logging.getLogger("zuul.GithubConnection")
payload_path = 'payload'
client_manager_class = GithubClientManager
_event_connector_class = GithubEventConnector
def __init__(self, driver, connection_name, connection_config):
super(GithubConnection, self).__init__(driver, connection_name,
@ -1167,7 +1153,6 @@ class GithubConnection(CachedBranchConnection):
self.canonical_hostname = self.connection_config.get(
'canonical_hostname', self.server)
self.source = driver.getSource(self)
self.event_queue = queue.Queue()
self._sha_pr_cache = GithubShaCache()
self._request_locks = {}
@ -1192,24 +1177,24 @@ class GithubConnection(CachedBranchConnection):
return d
def onLoad(self):
self.log.info('Starting GitHub connection: %s' % self.connection_name)
self.gearman_worker = GithubGearmanWorker(self)
self.log.info('Starting GitHub connection: %s', self.connection_name)
self._github_client_manager.initialize()
self.log.debug('Creating Zookeeper event queue')
self.event_queue = ConnectionEventQueue(
self.sched.zk_client, self.connection_name
)
self.log.info('Starting event connector')
self._start_event_connector()
self.log.info('Starting GearmanWorker')
self.gearman_worker.start()
def onStop(self):
# TODO(jeblair): remove this check which is here only so that
# zuul-web can call connections.stop to shut down the sql
# connection.
if hasattr(self, 'gearman_worker'):
self.gearman_worker.stop()
if hasattr(self, 'github_event_connector'):
self._stop_event_connector()
def _start_event_connector(self):
self.github_event_connector = GithubEventConnector(self)
self.github_event_connector = self._event_connector_class(self)
self.github_event_connector.start()
def _stop_event_connector(self):
@ -1230,18 +1215,6 @@ class GithubConnection(CachedBranchConnection):
installation_id, threading.Semaphore(
value=self.max_threads_per_installation))
def addEvent(self, data, event=None, delivery=None):
return self.event_queue.put((time.time(), data, event, delivery))
def getEvent(self):
return self.event_queue.get()
def getEventQueueSize(self):
return self.event_queue.qsize()
def eventDone(self):
self.event_queue.task_done()
def getGithubClient(self,
project_name=None,
zuul_event_id=None):
@ -2229,6 +2202,10 @@ class GithubWebController(BaseWebController):
def __init__(self, zuul_web, connection):
self.connection = connection
self.zuul_web = zuul_web
self.event_queue = ConnectionEventQueue(
self.zuul_web.zk_client,
self.connection.connection_name
)
self.token = self.connection.connection_config.get('webhook_token')
def _validate_signature(self, body, headers):
@ -2270,15 +2247,13 @@ class GithubWebController(BaseWebController):
headers[key.lower()] = value
body = cherrypy.request.body.read()
self._validate_signature(body, headers)
# We cannot send the raw body through gearman, so it's easy to just
# We cannot send the raw body through zookeeper, so it's easy to just
# encode it as json, after decoding it as utf-8
json_body = json.loads(body.decode('utf-8'))
job = self.zuul_web.rpc.submitJob(
'github:%s:payload' % self.connection.connection_name,
{'headers': headers, 'body': json_body})
return json.loads(job.data[0])
data = {'headers': headers, 'body': json_body}
self.event_queue.put(data)
return data
def _status_as_tuple(status):