Merge "Dispatch Gitlab webhook events via Zookeeper"

This commit is contained in:
Zuul 2021-03-26 21:12:29 +00:00 committed by Gerrit Code Review
commit ffc6bf5c6d
2 changed files with 54 additions and 97 deletions

View File

@ -310,7 +310,6 @@ class GitlabDriverMock(GitlabDriver):
self, name, config, self.rpcclient,
changes_db=db,
upstream_root=self.upstream_root)
self.additional_event_queues.append(connection.event_queue)
setattr(self.registry, 'fake_' + name, connection)
registerProjects(connection.source.name, connection.gl_client,
self.config)
@ -1836,10 +1835,9 @@ class FakeGitlabConnection(gitlabconnection.GitlabConnection):
% (self.zuul_web_port, self.connection_name),
data=payload, headers=headers)
else:
job = self.rpcclient.submitJob(
'gitlab:%s:payload' % self.connection_name,
{'payload': payload})
return json.loads(job.data[0])
data = {'payload': payload}
self.event_queue.put(data)
return data
def setZuulWebPort(self, port):
self.zuul_web_port = port

View File

@ -15,7 +15,6 @@
import logging
import threading
import json
import queue
import cherrypy
import voluptuous as v
import time
@ -29,63 +28,10 @@ from typing import List, Optional
from zuul.connection import CachedBranchConnection
from zuul.web.handler import BaseWebController
from zuul.lib.gearworker import ZuulGearWorker
from zuul.lib.logutil import get_annotated_logger
from zuul.model import Branch, Project, Ref, Tag
from zuul.driver.gitlab.gitlabmodel import GitlabTriggerEvent, MergeRequest
class GitlabGearmanWorker(object):
"""A thread that answers gearman requests"""
log = logging.getLogger("zuul.GitlabGearmanWorker")
def __init__(self, connection):
self.config = connection.sched.config
self.connection = connection
handler = "gitlab:%s:payload" % self.connection.connection_name
self.jobs = {
handler: self.handle_payload,
}
self.gearworker = ZuulGearWorker(
'Zuul Gitlab Worker',
'zuul.GitlabGearmanWorker',
'gitlab',
self.config,
self.jobs)
def handle_payload(self, job):
args = json.loads(job.arguments)
payload = args["payload"]
self.log.info(
"Gitlab Webhook Received event kind: %(object_kind)s" % payload)
try:
self.__dispatch_event(payload)
output = {'return_code': 200}
except Exception:
output = {'return_code': 503}
self.log.exception("Exception handling Gitlab event:")
job.sendWorkComplete(json.dumps(output))
def __dispatch_event(self, payload):
self.log.info(payload)
event = payload['object_kind']
try:
self.log.info("Dispatching event %s" % event)
self.connection.addEvent(payload, event)
except Exception as err:
message = 'Exception dispatching event: %s' % str(err)
self.log.exception(message)
raise Exception(message)
def start(self):
self.gearworker.start()
def stop(self):
self.gearworker.stop()
from zuul.zk.event_queues import ConnectionEventQueue
class GitlabEventConnector(threading.Thread):
@ -97,7 +43,9 @@ class GitlabEventConnector(threading.Thread):
super(GitlabEventConnector, self).__init__()
self.daemon = True
self.connection = connection
self.event_queue = connection.event_queue
self._stopped = False
self._process_event = threading.Event()
self.event_handler_mapping = {
'merge_request': self._event_merge_request,
'note': self._event_note,
@ -107,7 +55,33 @@ class GitlabEventConnector(threading.Thread):
def stop(self):
self._stopped = True
self.connection.addEvent(None)
self._process_event.set()
self.event_queue.election.cancel()
def _onNewEvent(self):
self._process_event.set()
# Stop the data watch in case the connector was stopped
return not self._stopped
def run(self):
self.event_queue.registerEventWatch(self._onNewEvent)
while not self._stopped:
try:
self.event_queue.election.run(self._run)
except Exception:
self.log.exception("Exception handling Gitlab event:")
def _run(self):
while not self._stopped:
for event in self.event_queue:
try:
self._handleEvent(event)
finally:
self.event_queue.ack(event)
if self._stopped:
return
self._process_event.wait(10)
self._process_event.clear()
def _event_base(self, body):
event = GitlabTriggerEvent()
@ -195,12 +169,16 @@ class GitlabEventConnector(threading.Thread):
event.type = 'gl_push'
return event
def _handleEvent(self):
ts, json_body, event_type = self.connection.getEvent()
def _handleEvent(self, connection_event):
if self._stopped:
return
self.log.info("Received event: %s" % str(event_type))
timestamp = time.time()
json_body = connection_event["payload"]
self.log.info(json_body)
event_type = json_body['object_kind']
self.log.info("Received event: %s", event_type)
if event_type not in self.event_handler_mapping:
message = "Unhandled Gitlab event: %s" % event_type
@ -219,7 +197,7 @@ class GitlabEventConnector(threading.Thread):
if event:
event.zuul_event_id = str(uuid.uuid4())
event.timestamp = ts
event.timestamp = timestamp
event.project_hostname = self.connection.canonical_hostname
if event.change_number:
project = self.connection.source.getProject(event.project_name)
@ -241,17 +219,6 @@ class GitlabEventConnector(threading.Thread):
self.connection.driver_name, event
)
def run(self):
while True:
if self._stopped:
return
try:
self._handleEvent()
except Exception:
self.log.exception("Exception moving Gitlab event:")
finally:
self.connection.eventDone()
class GitlabAPIClientException(Exception):
pass
@ -416,7 +383,6 @@ class GitlabConnection(CachedBranchConnection):
'api_token', '')
self.gl_client = GitlabAPIClient(self.baseurl, self.api_token)
self.sched = None
self.event_queue = queue.Queue()
self.source = driver.getSource(self)
def _start_event_connector(self):
@ -429,27 +395,18 @@ class GitlabConnection(CachedBranchConnection):
self.gitlab_event_connector.join()
def onLoad(self):
self.log.info('Starting Gitlab connection: %s' % self.connection_name)
self.gearman_worker = GitlabGearmanWorker(self)
self.log.info('Starting Gitlab connection: %s', self.connection_name)
self.log.info('Creating Zookeeper event queue')
self.event_queue = ConnectionEventQueue(
self.sched.zk_client, self.connection_name
)
self.log.info('Starting event connector')
self._start_event_connector()
self.log.info('Starting GearmanWorker')
self.gearman_worker.start()
def onStop(self):
if hasattr(self, 'gearman_worker'):
self.gearman_worker.stop()
if hasattr(self, 'gitlab_event_connector'):
self._stop_event_connector()
def addEvent(self, data, event=None):
return self.event_queue.put((time.time(), data, event))
def getEvent(self):
return self.event_queue.get()
def eventDone(self):
self.event_queue.task_done()
def getWebController(self, zuul_web):
return GitlabWebController(zuul_web, self)
@ -630,6 +587,10 @@ class GitlabWebController(BaseWebController):
def __init__(self, zuul_web, connection):
self.connection = connection
self.zuul_web = zuul_web
self.event_queue = ConnectionEventQueue(
self.zuul_web.zk_client,
self.connection.connection_name
)
def _validate_token(self, headers):
try:
@ -658,11 +619,9 @@ class GitlabWebController(BaseWebController):
self._validate_token(headers)
json_payload = json.loads(body.decode('utf-8'))
job = self.zuul_web.rpc.submitJob(
'gitlab:%s:payload' % self.connection.connection_name,
{'payload': json_payload})
return json.loads(job.data[0])
data = {'payload': json_payload}
self.event_queue.put(data)
return data
def getSchema():