Merge "Dispatch Pagure webhook events via Zookeeper"

This commit is contained in:
Zuul 2021-03-26 21:12:07 +00:00 committed by Gerrit Code Review
commit 6a353e824f
2 changed files with 56 additions and 135 deletions

View File

@ -287,7 +287,6 @@ class PagureDriverMock(PagureDriver):
self, name, config, self.rpcclient,
changes_db=db,
upstream_root=self.upstream_root)
self.additional_event_queues.append(connection.event_queue)
setattr(self.registry, 'fake_' + name, connection)
return connection
@ -1722,10 +1721,9 @@ class FakePagureConnection(pagureconnection.PagureConnection):
% (self.zuul_web_port, self.connection_name),
data=payload, headers=headers)
else:
job = self.rpcclient.submitJob(
'pagure:%s:payload' % self.connection_name,
{'payload': payload})
return json.loads(job.data[0])
data = {'payload': payload}
self.event_queue.put(data)
return data
def openFakePullRequest(self, project, branch, subject, files=[],
initial_comment=None):

View File

@ -15,24 +15,20 @@
import logging
import hmac
import hashlib
import queue
import threading
import time
import re
import json
import requests
import cherrypy
import traceback
import voluptuous as v
import gear
from zuul.connection import BaseConnection
from zuul.lib.logutil import get_annotated_logger
from zuul.web.handler import BaseWebController
from zuul.lib.config import get_default
from zuul.model import Ref, Branch, Tag
from zuul.lib import dependson
from zuul.zk.event_queues import ConnectionEventQueue
from zuul.driver.pagure.paguremodel import PagureTriggerEvent, PullRequest
@ -100,94 +96,6 @@ def _sign_request(body, secret):
return signature, body
class PagureGearmanWorker(object):
"""A thread that answers gearman requests"""
log = logging.getLogger("zuul.PagureGearmanWorker")
def __init__(self, connection):
self.config = connection.sched.config
self.connection = connection
self.thread = threading.Thread(target=self._run,
name='pagure-gearman-worker')
self._running = False
handler = "pagure:%s:payload" % self.connection.connection_name
self.jobs = {
handler: self.handle_payload,
}
def _run(self):
while self._running:
try:
job = self.gearman.getJob()
try:
if job.name not in self.jobs:
self.log.exception("Exception while running job")
job.sendWorkException(
traceback.format_exc().encode('utf8'))
continue
output = self.jobs[job.name](json.loads(job.arguments))
job.sendWorkComplete(json.dumps(output))
except Exception:
self.log.exception("Exception while running job")
job.sendWorkException(
traceback.format_exc().encode('utf8'))
except gear.InterruptedError:
pass
except Exception:
self.log.exception("Exception while getting job")
def handle_payload(self, args):
payload = args["payload"]
self.log.info(
"Pagure Webhook Received (id: %(msg_id)s, topic: %(topic)s)" % (
payload))
try:
self.__dispatch_event(payload)
output = {'return_code': 200}
except Exception:
output = {'return_code': 503}
self.log.exception("Exception handling Pagure event:")
return output
def __dispatch_event(self, payload):
event = payload['topic']
try:
self.log.info("Dispatching event %s" % event)
self.connection.addEvent(payload, event)
except Exception as err:
message = 'Exception dispatching event: %s' % str(err)
self.log.exception(message)
raise Exception(message)
def start(self):
self._running = True
server = self.config.get('gearman', 'server')
port = get_default(self.config, 'gearman', 'port', 4730)
ssl_key = get_default(self.config, 'gearman', 'ssl_key')
ssl_cert = get_default(self.config, 'gearman', 'ssl_cert')
ssl_ca = get_default(self.config, 'gearman', 'ssl_ca')
self.gearman = gear.TextWorker('Zuul Pagure Connector')
self.log.debug("Connect to gearman")
self.gearman.addServer(server, port, ssl_key, ssl_cert, ssl_ca)
self.log.debug("Waiting for server")
self.gearman.waitForServer()
self.log.debug("Registering")
for job in self.jobs:
self.gearman.registerFunction(job)
self.thread.start()
def stop(self):
self._running = False
self.gearman.stopWaitingForJobs()
# We join here to avoid whitelisting the thread -- if it takes more
# than 5s to stop in tests, there's a problem.
self.thread.join(timeout=5)
self.gearman.shutdown()
class PagureEventConnector(threading.Thread):
"""Move events from Pagure into the scheduler"""
@ -197,7 +105,9 @@ class PagureEventConnector(threading.Thread):
super(PagureEventConnector, self).__init__()
self.daemon = True
self.connection = connection
self.event_queue = connection.event_queue
self._stopped = False
self._process_event = threading.Event()
self.metadata_notif = re.compile(
r"^\*\*Metadata Update", re.MULTILINE)
self.event_handler_mapping = {
@ -218,14 +128,46 @@ class PagureEventConnector(threading.Thread):
def stop(self):
self._stopped = True
self.connection.addEvent(None)
self._process_event.set()
self.event_queue.election.cancel()
def _handleEvent(self):
ts, json_body, event_type = self.connection.getEvent()
def _onNewEvent(self):
self._process_event.set()
# Stop the data watch in case the connector was stopped
return not self._stopped
def run(self):
self.event_queue.registerEventWatch(self._onNewEvent)
while not self._stopped:
try:
self.event_queue.election.run(self._run)
except Exception:
self.log.exception("Exception handling Pagure event:")
def _run(self):
while not self._stopped:
for event in self.event_queue:
try:
self._handleEvent(event)
finally:
self.event_queue.ack(event)
if self._stopped:
return
self._process_event.wait(10)
self._process_event.clear()
def _handleEvent(self, connection_event):
if self._stopped:
return
self.log.info("Received event: %s" % str(event_type))
timestamp = time.time()
json_body = connection_event["payload"]
event_type = json_body['topic']
self.log.info(
"Received event id: %s, topic: %s",
json_body["msg_id"], event_type
)
# self.log.debug("Event payload: %s " % json_body)
if event_type not in self.event_handler_mapping:
@ -244,7 +186,7 @@ class PagureEventConnector(threading.Thread):
event = None
if event:
event.timestamp = ts
event.timestamp = timestamp
if event.change_number:
project = self.connection.source.getProject(event.project_name)
self.connection._getChange(project,
@ -389,17 +331,6 @@ class PagureEventConnector(threading.Thread):
event.project_name].remove(event.branch)
return event
def run(self):
while True:
if self._stopped:
return
try:
self._handleEvent()
except Exception:
self.log.exception("Exception moving Pagure event:")
finally:
self.connection.eventDone()
class PagureAPIClientException(Exception):
pass
@ -542,18 +473,18 @@ class PagureConnection(BaseConnection):
'source_whitelist', '').split(',')
self.webhook_tokens = {}
self.source = driver.getSource(self)
self.event_queue = queue.Queue()
self.metadata_notif = re.compile(
r"^\*\*Metadata Update", re.MULTILINE)
self.sched = None
def onLoad(self):
self.log.info('Starting Pagure connection: %s' % self.connection_name)
self.gearman_worker = PagureGearmanWorker(self)
self.log.info('Starting Pagure connection: %s', self.connection_name)
self.log.info('Creating Zookeeper event queue')
self.event_queue = ConnectionEventQueue(
self.sched.zk_client, self.connection_name
)
self.log.info('Starting event connector')
self._start_event_connector()
self.log.info('Starting GearmanWorker')
self.gearman_worker.start()
def _start_event_connector(self):
self.pagure_event_connector = PagureEventConnector(self)
@ -565,19 +496,9 @@ class PagureConnection(BaseConnection):
self.pagure_event_connector.join()
def onStop(self):
if hasattr(self, 'gearman_worker'):
self.gearman_worker.stop()
if hasattr(self, 'pagure_event_connector'):
self._stop_event_connector()
def addEvent(self, data, event=None):
return self.event_queue.put((time.time(), data, event))
def getEvent(self):
return self.event_queue.get()
def eventDone(self):
self.event_queue.task_done()
def set_my_username(self, client):
self.log.debug("Fetching my username ...")
self.username = client.whoami()
@ -867,6 +788,10 @@ class PagureWebController(BaseWebController):
def __init__(self, zuul_web, connection):
self.connection = connection
self.zuul_web = zuul_web
self.event_queue = ConnectionEventQueue(
self.zuul_web.zk_client,
self.connection.connection_name
)
def _source_whitelisted(self, remote_ip, forwarded_ip):
if remote_ip and remote_ip in self.connection.source_whitelist:
@ -922,11 +847,9 @@ class PagureWebController(BaseWebController):
"Payload origin IP address whitelisted. Skip verify")
json_payload = json.loads(body.decode('utf-8'))
job = self.zuul_web.rpc.submitJob(
'pagure:%s:payload' % self.connection.connection_name,
{'payload': json_payload})
return json.loads(job.data[0])
data = {'payload': json_payload}
self.event_queue.put(data)
return data
def getSchema():