Dispatch Pagure webhook events via Zookeeper
Change-Id: I46be9f6001bba84e31c1bba4a0c1507affd42205
This commit is contained in:
committed by
Tobias Henkel
parent
614bd40341
commit
244166a688
@@ -15,24 +15,20 @@
|
||||
import logging
|
||||
import hmac
|
||||
import hashlib
|
||||
import queue
|
||||
import threading
|
||||
import time
|
||||
import re
|
||||
import json
|
||||
import requests
|
||||
import cherrypy
|
||||
import traceback
|
||||
import voluptuous as v
|
||||
|
||||
import gear
|
||||
|
||||
from zuul.connection import BaseConnection
|
||||
from zuul.lib.logutil import get_annotated_logger
|
||||
from zuul.web.handler import BaseWebController
|
||||
from zuul.lib.config import get_default
|
||||
from zuul.model import Ref, Branch, Tag
|
||||
from zuul.lib import dependson
|
||||
from zuul.zk.event_queues import ConnectionEventQueue
|
||||
|
||||
from zuul.driver.pagure.paguremodel import PagureTriggerEvent, PullRequest
|
||||
|
||||
@@ -100,94 +96,6 @@ def _sign_request(body, secret):
|
||||
return signature, body
|
||||
|
||||
|
||||
class PagureGearmanWorker(object):
|
||||
"""A thread that answers gearman requests"""
|
||||
log = logging.getLogger("zuul.PagureGearmanWorker")
|
||||
|
||||
def __init__(self, connection):
|
||||
self.config = connection.sched.config
|
||||
self.connection = connection
|
||||
self.thread = threading.Thread(target=self._run,
|
||||
name='pagure-gearman-worker')
|
||||
self._running = False
|
||||
handler = "pagure:%s:payload" % self.connection.connection_name
|
||||
self.jobs = {
|
||||
handler: self.handle_payload,
|
||||
}
|
||||
|
||||
def _run(self):
|
||||
while self._running:
|
||||
try:
|
||||
job = self.gearman.getJob()
|
||||
try:
|
||||
if job.name not in self.jobs:
|
||||
self.log.exception("Exception while running job")
|
||||
job.sendWorkException(
|
||||
traceback.format_exc().encode('utf8'))
|
||||
continue
|
||||
output = self.jobs[job.name](json.loads(job.arguments))
|
||||
job.sendWorkComplete(json.dumps(output))
|
||||
except Exception:
|
||||
self.log.exception("Exception while running job")
|
||||
job.sendWorkException(
|
||||
traceback.format_exc().encode('utf8'))
|
||||
except gear.InterruptedError:
|
||||
pass
|
||||
except Exception:
|
||||
self.log.exception("Exception while getting job")
|
||||
|
||||
def handle_payload(self, args):
|
||||
payload = args["payload"]
|
||||
|
||||
self.log.info(
|
||||
"Pagure Webhook Received (id: %(msg_id)s, topic: %(topic)s)" % (
|
||||
payload))
|
||||
|
||||
try:
|
||||
self.__dispatch_event(payload)
|
||||
output = {'return_code': 200}
|
||||
except Exception:
|
||||
output = {'return_code': 503}
|
||||
self.log.exception("Exception handling Pagure event:")
|
||||
|
||||
return output
|
||||
|
||||
def __dispatch_event(self, payload):
|
||||
event = payload['topic']
|
||||
try:
|
||||
self.log.info("Dispatching event %s" % event)
|
||||
self.connection.addEvent(payload, event)
|
||||
except Exception as err:
|
||||
message = 'Exception dispatching event: %s' % str(err)
|
||||
self.log.exception(message)
|
||||
raise Exception(message)
|
||||
|
||||
def start(self):
|
||||
self._running = True
|
||||
server = self.config.get('gearman', 'server')
|
||||
port = get_default(self.config, 'gearman', 'port', 4730)
|
||||
ssl_key = get_default(self.config, 'gearman', 'ssl_key')
|
||||
ssl_cert = get_default(self.config, 'gearman', 'ssl_cert')
|
||||
ssl_ca = get_default(self.config, 'gearman', 'ssl_ca')
|
||||
self.gearman = gear.TextWorker('Zuul Pagure Connector')
|
||||
self.log.debug("Connect to gearman")
|
||||
self.gearman.addServer(server, port, ssl_key, ssl_cert, ssl_ca)
|
||||
self.log.debug("Waiting for server")
|
||||
self.gearman.waitForServer()
|
||||
self.log.debug("Registering")
|
||||
for job in self.jobs:
|
||||
self.gearman.registerFunction(job)
|
||||
self.thread.start()
|
||||
|
||||
def stop(self):
|
||||
self._running = False
|
||||
self.gearman.stopWaitingForJobs()
|
||||
# We join here to avoid whitelisting the thread -- if it takes more
|
||||
# than 5s to stop in tests, there's a problem.
|
||||
self.thread.join(timeout=5)
|
||||
self.gearman.shutdown()
|
||||
|
||||
|
||||
class PagureEventConnector(threading.Thread):
|
||||
"""Move events from Pagure into the scheduler"""
|
||||
|
||||
@@ -197,7 +105,9 @@ class PagureEventConnector(threading.Thread):
|
||||
super(PagureEventConnector, self).__init__()
|
||||
self.daemon = True
|
||||
self.connection = connection
|
||||
self.event_queue = connection.event_queue
|
||||
self._stopped = False
|
||||
self._process_event = threading.Event()
|
||||
self.metadata_notif = re.compile(
|
||||
r"^\*\*Metadata Update", re.MULTILINE)
|
||||
self.event_handler_mapping = {
|
||||
@@ -218,14 +128,46 @@ class PagureEventConnector(threading.Thread):
|
||||
|
||||
def stop(self):
|
||||
self._stopped = True
|
||||
self.connection.addEvent(None)
|
||||
self._process_event.set()
|
||||
self.event_queue.election.cancel()
|
||||
|
||||
def _handleEvent(self):
|
||||
ts, json_body, event_type = self.connection.getEvent()
|
||||
def _onNewEvent(self):
|
||||
self._process_event.set()
|
||||
# Stop the data watch in case the connector was stopped
|
||||
return not self._stopped
|
||||
|
||||
def run(self):
|
||||
self.event_queue.registerEventWatch(self._onNewEvent)
|
||||
while not self._stopped:
|
||||
try:
|
||||
self.event_queue.election.run(self._run)
|
||||
except Exception:
|
||||
self.log.exception("Exception handling Pagure event:")
|
||||
|
||||
def _run(self):
|
||||
while not self._stopped:
|
||||
for event in self.event_queue:
|
||||
try:
|
||||
self._handleEvent(event)
|
||||
finally:
|
||||
self.event_queue.ack(event)
|
||||
if self._stopped:
|
||||
return
|
||||
self._process_event.wait(10)
|
||||
self._process_event.clear()
|
||||
|
||||
def _handleEvent(self, connection_event):
|
||||
if self._stopped:
|
||||
return
|
||||
|
||||
self.log.info("Received event: %s" % str(event_type))
|
||||
timestamp = time.time()
|
||||
json_body = connection_event["payload"]
|
||||
event_type = json_body['topic']
|
||||
|
||||
self.log.info(
|
||||
"Received event id: %s, topic: %s",
|
||||
json_body["msg_id"], event_type
|
||||
)
|
||||
# self.log.debug("Event payload: %s " % json_body)
|
||||
|
||||
if event_type not in self.event_handler_mapping:
|
||||
@@ -244,7 +186,7 @@ class PagureEventConnector(threading.Thread):
|
||||
event = None
|
||||
|
||||
if event:
|
||||
event.timestamp = ts
|
||||
event.timestamp = timestamp
|
||||
if event.change_number:
|
||||
project = self.connection.source.getProject(event.project_name)
|
||||
self.connection._getChange(project,
|
||||
@@ -389,17 +331,6 @@ class PagureEventConnector(threading.Thread):
|
||||
event.project_name].remove(event.branch)
|
||||
return event
|
||||
|
||||
def run(self):
|
||||
while True:
|
||||
if self._stopped:
|
||||
return
|
||||
try:
|
||||
self._handleEvent()
|
||||
except Exception:
|
||||
self.log.exception("Exception moving Pagure event:")
|
||||
finally:
|
||||
self.connection.eventDone()
|
||||
|
||||
|
||||
class PagureAPIClientException(Exception):
|
||||
pass
|
||||
@@ -542,18 +473,18 @@ class PagureConnection(BaseConnection):
|
||||
'source_whitelist', '').split(',')
|
||||
self.webhook_tokens = {}
|
||||
self.source = driver.getSource(self)
|
||||
self.event_queue = queue.Queue()
|
||||
self.metadata_notif = re.compile(
|
||||
r"^\*\*Metadata Update", re.MULTILINE)
|
||||
self.sched = None
|
||||
|
||||
def onLoad(self):
|
||||
self.log.info('Starting Pagure connection: %s' % self.connection_name)
|
||||
self.gearman_worker = PagureGearmanWorker(self)
|
||||
self.log.info('Starting Pagure connection: %s', self.connection_name)
|
||||
self.log.info('Creating Zookeeper event queue')
|
||||
self.event_queue = ConnectionEventQueue(
|
||||
self.sched.zk_client, self.connection_name
|
||||
)
|
||||
self.log.info('Starting event connector')
|
||||
self._start_event_connector()
|
||||
self.log.info('Starting GearmanWorker')
|
||||
self.gearman_worker.start()
|
||||
|
||||
def _start_event_connector(self):
|
||||
self.pagure_event_connector = PagureEventConnector(self)
|
||||
@@ -565,19 +496,9 @@ class PagureConnection(BaseConnection):
|
||||
self.pagure_event_connector.join()
|
||||
|
||||
def onStop(self):
|
||||
if hasattr(self, 'gearman_worker'):
|
||||
self.gearman_worker.stop()
|
||||
if hasattr(self, 'pagure_event_connector'):
|
||||
self._stop_event_connector()
|
||||
|
||||
def addEvent(self, data, event=None):
|
||||
return self.event_queue.put((time.time(), data, event))
|
||||
|
||||
def getEvent(self):
|
||||
return self.event_queue.get()
|
||||
|
||||
def eventDone(self):
|
||||
self.event_queue.task_done()
|
||||
|
||||
def set_my_username(self, client):
|
||||
self.log.debug("Fetching my username ...")
|
||||
self.username = client.whoami()
|
||||
@@ -867,6 +788,10 @@ class PagureWebController(BaseWebController):
|
||||
def __init__(self, zuul_web, connection):
|
||||
self.connection = connection
|
||||
self.zuul_web = zuul_web
|
||||
self.event_queue = ConnectionEventQueue(
|
||||
self.zuul_web.zk_client,
|
||||
self.connection.connection_name
|
||||
)
|
||||
|
||||
def _source_whitelisted(self, remote_ip, forwarded_ip):
|
||||
if remote_ip and remote_ip in self.connection.source_whitelist:
|
||||
@@ -922,11 +847,9 @@ class PagureWebController(BaseWebController):
|
||||
"Payload origin IP address whitelisted. Skip verify")
|
||||
|
||||
json_payload = json.loads(body.decode('utf-8'))
|
||||
job = self.zuul_web.rpc.submitJob(
|
||||
'pagure:%s:payload' % self.connection.connection_name,
|
||||
{'payload': json_payload})
|
||||
|
||||
return json.loads(job.data[0])
|
||||
data = {'payload': json_payload}
|
||||
self.event_queue.put(data)
|
||||
return data
|
||||
|
||||
|
||||
def getSchema():
|
||||
|
||||
Reference in New Issue
Block a user