Merge "Move github webhook from webapp to zuul-web"

This commit is contained in:
Zuul
2018-02-03 00:16:44 +00:00
committed by Gerrit Code Review
9 changed files with 292 additions and 92 deletions

View File

@@ -66,9 +66,11 @@ import zuul.merger.merger
import zuul.merger.server
import zuul.model
import zuul.nodepool
import zuul.rpcclient
import zuul.zk
import zuul.configloader
from zuul.exceptions import MergeFailure
from zuul.lib.config import get_default
FIXTURE_DIR = os.path.join(os.path.dirname(__file__),
'fixtures')
@@ -939,7 +941,7 @@ class FakeGithubPullRequest(object):
class FakeGithubConnection(githubconnection.GithubConnection):
log = logging.getLogger("zuul.test.FakeGithubConnection")
def __init__(self, driver, connection_name, connection_config,
def __init__(self, driver, connection_name, connection_config, rpcclient,
changes_db=None, upstream_root=None):
super(FakeGithubConnection, self).__init__(driver, connection_name,
connection_config)
@@ -952,12 +954,16 @@ class FakeGithubConnection(githubconnection.GithubConnection):
self.merge_not_allowed_count = 0
self.reports = []
self.github_client = tests.fakegithub.FakeGithub(changes_db)
self.rpcclient = rpcclient
def getGithubClient(self,
project=None,
user_id=None):
return self.github_client
def setZuulWebPort(self, port):
self.zuul_web_port = port
def openFakePullRequest(self, project, branch, subject, files=[],
body=None):
self.pr_number += 1
@@ -991,19 +997,25 @@ class FakeGithubConnection(githubconnection.GithubConnection):
}
return (name, data)
def emitEvent(self, event):
def emitEvent(self, event, use_zuulweb=False):
"""Emulates sending the GitHub webhook event to the connection."""
port = self.webapp.server.socket.getsockname()[1]
name, data = event
payload = json.dumps(data).encode('utf8')
secret = self.connection_config['webhook_token']
signature = githubconnection._sign_request(payload, secret)
headers = {'X-Github-Event': name, 'X-Hub-Signature': signature}
req = urllib.request.Request(
'http://localhost:%s/connection/%s/payload'
% (port, self.connection_name),
data=payload, headers=headers)
return urllib.request.urlopen(req)
headers = {'x-github-event': name, 'x-hub-signature': signature}
if use_zuulweb:
req = urllib.request.Request(
'http://127.0.0.1:%s/driver/github/%s/payload'
% (self.zuul_web_port, self.connection_name),
data=payload, headers=headers)
return urllib.request.urlopen(req)
else:
job = self.rpcclient.submitJob(
'github:%s:payload' % self.connection_name,
{'headers': headers, 'body': data})
return json.loads(job.data[0])
def addProject(self, project):
# use the original method here and additionally register it in the
@@ -1974,6 +1986,13 @@ class ZuulTestCase(BaseTestCase):
'gearman', 'ssl_key',
os.path.join(FIXTURE_DIR, 'gearman/client.key'))
self.rpcclient = zuul.rpcclient.RPCClient(
self.config.get('gearman', 'server'),
self.gearman_server.port,
get_default(self.config, 'gearman', 'ssl_key'),
get_default(self.config, 'gearman', 'ssl_cert'),
get_default(self.config, 'gearman', 'ssl_ca'))
gerritsource.GerritSource.replication_timeout = 1.5
gerritsource.GerritSource.replication_retry_interval = 0.5
gerritconnection.GerritEventConnector.delay = 0.0
@@ -1991,7 +2010,7 @@ class ZuulTestCase(BaseTestCase):
]
self.configure_connections()
self.sched.registerConnections(self.connections, self.webapp)
self.sched.registerConnections(self.connections)
self.executor_server = RecordingExecutorServer(
self.config, self.connections,
@@ -2057,6 +2076,7 @@ class ZuulTestCase(BaseTestCase):
server = config.get('server', 'github.com')
db = self.github_changes_dbs.setdefault(server, {})
con = FakeGithubConnection(driver, name, config,
self.rpcclient,
changes_db=db,
upstream_root=self.upstream_root)
self.event_queues.append(con.event_queue)
@@ -2291,6 +2311,7 @@ class ZuulTestCase(BaseTestCase):
self.statsd.join()
self.webapp.stop()
self.webapp.join()
self.rpcclient.shutdown()
self.gearman_server.shutdown()
self.fake_nodepool.stop()
self.zk.disconnect()

View File

@@ -12,15 +12,20 @@
# License for the specific language governing permissions and limitations
# under the License.
import asyncio
import threading
import os
import re
from testtools.matchers import MatchesRegex, StartsWith
import urllib
import socket
import time
from unittest import skip
import git
import zuul.web
from tests.base import ZuulTestCase, simple_layout, random_sha1
@@ -734,3 +739,85 @@ class TestGithubUnprotectedBranches(ZuulTestCase):
# project2 should have no parsed branch
self.assertEqual(0, len(project2.unparsed_branch_config.keys()))
class TestGithubWebhook(ZuulTestCase):
config_file = 'zuul-github-driver.conf'
def setUp(self):
super(TestGithubWebhook, self).setUp()
# Start the web server
self.web = zuul.web.ZuulWeb(
listen_address='127.0.0.1', listen_port=0,
gear_server='127.0.0.1', gear_port=self.gearman_server.port,
github_connections={'github': self.fake_github})
loop = asyncio.new_event_loop()
loop.set_debug(True)
ws_thread = threading.Thread(target=self.web.run, args=(loop,))
ws_thread.start()
self.addCleanup(loop.close)
self.addCleanup(ws_thread.join)
self.addCleanup(self.web.stop)
host = '127.0.0.1'
# Wait until web server is started
while True:
time.sleep(0.1)
if self.web.server is None:
continue
port = self.web.server.sockets[0].getsockname()[1]
try:
with socket.create_connection((host, port)):
break
except ConnectionRefusedError:
pass
self.fake_github.setZuulWebPort(port)
def tearDown(self):
super(TestGithubWebhook, self).tearDown()
@simple_layout('layouts/basic-github.yaml', driver='github')
def test_webhook(self):
"""Test that we can get github events via zuul-web."""
self.executor_server.hold_jobs_in_build = True
A = self.fake_github.openFakePullRequest('org/project', 'master', 'A')
self.fake_github.emitEvent(A.getPullRequestOpenedEvent(),
use_zuulweb=True)
self.waitUntilSettled()
self.executor_server.hold_jobs_in_build = False
self.executor_server.release()
self.waitUntilSettled()
self.assertEqual('SUCCESS',
self.getJobFromHistory('project-test1').result)
self.assertEqual('SUCCESS',
self.getJobFromHistory('project-test2').result)
job = self.getJobFromHistory('project-test2')
zuulvars = job.parameters['zuul']
self.assertEqual(str(A.number), zuulvars['change'])
self.assertEqual(str(A.head_sha), zuulvars['patchset'])
self.assertEqual('master', zuulvars['branch'])
self.assertEqual(1, len(A.comments))
self.assertThat(
A.comments[0],
MatchesRegex('.*\[project-test1 \]\(.*\).*', re.DOTALL))
self.assertThat(
A.comments[0],
MatchesRegex('.*\[project-test2 \]\(.*\).*', re.DOTALL))
self.assertEqual(2, len(self.history))
# test_pull_unmatched_branch_event(self):
self.create_branch('org/project', 'unmatched_branch')
B = self.fake_github.openFakePullRequest(
'org/project', 'unmatched_branch', 'B')
self.fake_github.emitEvent(B.getPullRequestOpenedEvent(),
use_zuulweb=True)
self.waitUntilSettled()
self.assertEqual(2, len(self.history))

View File

@@ -162,7 +162,7 @@ class Scheduler(zuul.cmd.ZuulDaemonApp):
self.log.info('Starting scheduler')
try:
self.sched.start()
self.sched.registerConnections(self.connections, webapp)
self.sched.registerConnections(self.connections)
self.sched.reconfigure(self.config)
self.sched.resume()
except Exception:

View File

@@ -23,6 +23,7 @@ import zuul.cmd
import zuul.web
from zuul.driver.sql import sqlconnection
from zuul.driver.github import githubconnection
from zuul.lib.config import get_default
@@ -73,6 +74,11 @@ class WebServer(zuul.cmd.ZuulDaemonApp):
sql_conn = connections[0]
params['sql_connection'] = sql_conn
params['github_connections'] = {}
for conn_name, connection in self.connections.connections.items():
if isinstance(connection, githubconnection.GithubConnection):
params['github_connections'][conn_name] = connection
try:
self.web = zuul.web.ZuulWeb(**params)
except Exception as e:

View File

@@ -74,21 +74,3 @@ class BaseConnection(object, metaclass=abc.ABCMeta):
This lets the user supply a list of change objects that are
still in use. Anything in our cache that isn't in the supplied
list should be safe to remove from the cache."""
def registerWebapp(self, webapp):
self.webapp = webapp
def registerHttpHandler(self, path, handler):
"""Add connection handler for HTTP URI.
Connection can use builtin HTTP server for listening on incoming event
requests. The resulting path will be /connection/connection_name/path.
"""
self.webapp.register_path(self._connectionPath(path), handler)
def unregisterHttpHandler(self, path):
"""Remove the connection handler for HTTP URI."""
self.webapp.unregister_path(self._connectionPath(path))
def _connectionPath(self, path):
return '/connection/%s/%s' % (self.connection_name, path)

View File

@@ -21,6 +21,8 @@ import queue
import threading
import time
import re
import json
import traceback
import cachecontrol
from cachecontrol.cache import DictCache
@@ -28,13 +30,14 @@ from cachecontrol.heuristics import BaseHeuristic
import iso8601
import jwt
import requests
import webob
import webob.dec
import voluptuous as v
import github3
import github3.exceptions
import gear
from zuul.connection import BaseConnection
from zuul.lib.config import get_default
from zuul.model import Ref, Branch, Tag, Project
from zuul.exceptions import MergeFailure
from zuul.driver.github.githubmodel import PullRequest, GithubTriggerEvent
@@ -65,71 +68,101 @@ class UTC(datetime.tzinfo):
utc = UTC()
class GithubWebhookListener():
log = logging.getLogger("zuul.GithubWebhookListener")
class GithubGearmanWorker(object):
"""A thread that answers gearman requests"""
log = logging.getLogger("zuul.GithubGearmanWorker")
def __init__(self, connection):
self.config = connection.sched.config
self.connection = connection
self.thread = threading.Thread(target=self._run,
name='github-gearman-worker')
self._running = False
handler = "github:%s:payload" % self.connection.connection_name
self.jobs = {
handler: self.handle_payload,
}
def handle_request(self, path, tenant_name, request):
if request.method != 'POST':
self.log.debug("Only POST method is allowed.")
raise webob.exc.HTTPMethodNotAllowed(
'Only POST method is allowed.')
def _run(self):
while self._running:
try:
job = self.gearman.getJob()
try:
if job.name not in self.jobs:
self.log.exception("Exception while running job")
job.sendWorkException(
traceback.format_exc().encode('utf8'))
continue
output = self.jobs[job.name](json.loads(job.arguments))
job.sendWorkComplete(json.dumps(output))
except Exception:
self.log.exception("Exception while running job")
job.sendWorkException(
traceback.format_exc().encode('utf8'))
except gear.InterruptedError:
pass
except Exception:
self.log.exception("Exception while getting job")
delivery = request.headers.get('X-GitHub-Delivery')
def handle_payload(self, args):
headers = args.get("headers")
body = args.get("body")
delivery = headers.get('X-GitHub-Delivery')
self.log.debug("Github Webhook Received: {delivery}".format(
delivery=delivery))
self._validate_signature(request)
# TODO(jlk): Validate project in the request is a project we know
try:
self.__dispatch_event(request)
self.__dispatch_event(body, headers)
output = {'return_code': 200}
except Exception:
output = {'return_code': 503}
self.log.exception("Exception handling Github event:")
def __dispatch_event(self, request):
return output
def __dispatch_event(self, body, headers):
try:
event = request.headers['X-Github-Event']
event = headers['x-github-event']
self.log.debug("X-Github-Event: " + event)
except KeyError:
self.log.debug("Request headers missing the X-Github-Event.")
raise webob.exc.HTTPBadRequest('Please specify a X-Github-Event '
'header.')
raise Exception('Please specify a X-Github-Event header.')
try:
json_body = request.json_body
self.connection.addEvent(json_body, event)
self.connection.addEvent(body, event)
except Exception:
message = 'Exception deserializing JSON body'
self.log.exception(message)
raise webob.exc.HTTPBadRequest(message)
# TODO(jlk): Raise this as something different?
raise Exception(message)
def _validate_signature(self, request):
secret = self.connection.connection_config.get('webhook_token', None)
if secret is None:
raise RuntimeError("webhook_token is required")
def start(self):
self._running = True
server = self.config.get('gearman', 'server')
port = get_default(self.config, 'gearman', 'port', 4730)
ssl_key = get_default(self.config, 'gearman', 'ssl_key')
ssl_cert = get_default(self.config, 'gearman', 'ssl_cert')
ssl_ca = get_default(self.config, 'gearman', 'ssl_ca')
self.gearman = gear.TextWorker('Zuul Github Connector')
self.log.debug("Connect to gearman")
self.gearman.addServer(server, port, ssl_key, ssl_cert, ssl_ca)
self.log.debug("Waiting for server")
self.gearman.waitForServer()
self.log.debug("Registering")
for job in self.jobs:
self.gearman.registerFunction(job)
self.thread.start()
body = request.body
try:
request_signature = request.headers['X-Hub-Signature']
except KeyError:
raise webob.exc.HTTPUnauthorized(
'Please specify a X-Hub-Signature header with secret.')
payload_signature = _sign_request(body, secret)
self.log.debug("Payload Signature: {0}".format(str(payload_signature)))
self.log.debug("Request Signature: {0}".format(str(request_signature)))
if not hmac.compare_digest(
str(payload_signature), str(request_signature)):
raise webob.exc.HTTPUnauthorized(
'Request signature does not match calculated payload '
'signature. Check that secret is correct.')
return True
def stop(self):
self._running = False
self.gearman.stopWaitingForJobs()
# We join here to avoid whitelisting the thread -- if it takes more
# than 5s to stop in tests, there's a problem.
self.thread.join(timeout=5)
self.gearman.shutdown()
class GithubEventConnector(threading.Thread):
@@ -458,15 +491,18 @@ class GithubConnection(BaseConnection):
re.MULTILINE | re.IGNORECASE)
def onLoad(self):
webhook_listener = GithubWebhookListener(self)
self.registerHttpHandler(self.payload_path,
webhook_listener.handle_request)
self.log.info('Starting GitHub connection: %s' % self.connection_name)
self.gearman_worker = GithubGearmanWorker(self)
self.log.info('Authing to GitHub')
self._authenticateGithubAPI()
self._prime_installation_map()
self.log.info('Starting event connector')
self._start_event_connector()
self.log.info('Starting GearmanWorker')
self.gearman_worker.start()
def onStop(self):
self.unregisterHttpHandler(self.payload_path)
self.gearman_worker.stop()
self._stop_event_connector()
def _start_event_connector(self):

View File

@@ -66,13 +66,6 @@ class ConnectionRegistry(object):
if load:
connection.onLoad()
def registerWebapp(self, webapp):
for driver_name, driver in self.drivers.items():
if hasattr(driver, 'registerWebapp'):
driver.registerWebapp(webapp)
for connection_name, connection in self.connections.items():
connection.registerWebapp(webapp)
def reconfigureDrivers(self, tenant):
for driver in self.drivers.values():
if hasattr(driver, 'reconfigure'):

View File

@@ -292,11 +292,10 @@ class Scheduler(threading.Thread):
except Exception:
self.log.exception("Exception while processing command")
def registerConnections(self, connections, webapp, load=True):
def registerConnections(self, connections, load=True):
# load: whether or not to trigger the onLoad for the connection. This
# is useful for not doing a full load during layout validation.
self.connections = connections
self.connections.registerWebapp(webapp)
self.connections.registerScheduler(self, load)
def stopConnections(self):

View File

@@ -16,6 +16,8 @@
import asyncio
import hashlib
import hmac
import json
import logging
import os
@@ -33,6 +35,12 @@ import zuul.rpcclient
STATIC_DIR = os.path.join(os.path.dirname(__file__), 'static')
def _sign_request(body, secret):
signature = 'sha1=' + hmac.new(
secret.encode('utf-8'), body, hashlib.sha1).hexdigest()
return signature
class LogStreamingHandler(object):
log = logging.getLogger("zuul.web.LogStreamingHandler")
@@ -150,8 +158,9 @@ class GearmanHandler(object):
# Tenant status cache expiry
cache_expiry = 1
def __init__(self, rpc):
def __init__(self, rpc, github_connections):
self.rpc = rpc
self.github_connections = github_connections
self.cache = {}
self.cache_time = {}
self.controllers = {
@@ -159,13 +168,14 @@ class GearmanHandler(object):
'status_get': self.status_get,
'job_list': self.job_list,
'key_get': self.key_get,
'payload_post': self.payload_post,
}
def tenant_list(self, request):
async def tenant_list(self, request):
job = self.rpc.submitJob('zuul:tenant_list', {})
return web.json_response(json.loads(job.data[0]))
def status_get(self, request):
async def status_get(self, request):
tenant = request.match_info["tenant"]
if tenant not in self.cache or \
(time.time() - self.cache_time[tenant]) > self.cache_expiry:
@@ -179,23 +189,82 @@ class GearmanHandler(object):
resp.last_modified = self.cache_time[tenant]
return resp
def job_list(self, request):
async def job_list(self, request):
tenant = request.match_info["tenant"]
job = self.rpc.submitJob('zuul:job_list', {'tenant': tenant})
resp = web.json_response(json.loads(job.data[0]))
resp.headers['Access-Control-Allow-Origin'] = '*'
return resp
def key_get(self, request):
async def key_get(self, request):
tenant = request.match_info["tenant"]
project = request.match_info["project"]
job = self.rpc.submitJob('zuul:key_get', {'tenant': tenant,
'project': project})
return web.Response(body=job.data[0])
def _validate_signature(self, body, headers, secret):
try:
request_signature = headers['x-hub-signature']
except KeyError:
raise web.HTTPUnauthorized(
reason='X-Hub-Signature header missing.')
payload_signature = _sign_request(body, secret)
self.log.debug("Payload Signature: {0}".format(str(payload_signature)))
self.log.debug("Request Signature: {0}".format(str(request_signature)))
if not hmac.compare_digest(
str(payload_signature), str(request_signature)):
raise web.HTTPUnauthorized(
reason=('Request signature does not match calculated payload '
'signature. Check that secret is correct.'))
return True
async def github_payload(self, post):
connection = post.match_info["connection"]
github_connection = self.github_connections.get(connection)
token = github_connection.connection_config.get('webhook_token')
# Note(tobiash): We need to normalize the headers. Otherwise we will
# have trouble to get them from the dict afterwards.
# e.g.
# GitHub: sent: X-GitHub-Event received: X-GitHub-Event
# urllib: sent: X-GitHub-Event received: X-Github-Event
#
# We cannot easily solve this mismatch as every http processing lib
# modifies the header casing in its own way and by specification http
# headers are case insensitive so just lowercase all so we don't have
# to take care later.
headers = dict()
for key, value in post.headers.items():
headers[key.lower()] = value
body = await post.read()
self._validate_signature(body, headers, token)
# We cannot send the raw body through gearman, so it's easy to just
# encode it as json, after decoding it as utf-8
json_body = json.loads(body.decode('utf-8'))
job = self.rpc.submitJob('github:%s:payload' % connection,
{'headers': headers, 'body': json_body})
jobdata = json.loads(job.data[0])
return web.json_response(jobdata, status=jobdata['return_code'])
async def payload_post(self, post):
# Allow for other drivers to also accept a payload in the future,
# instead of hardcoding this to GitHub
driver = post.match_info["driver"]
try:
method = getattr(self, driver + '_payload')
except AttributeError as e:
self.log.exception("Unknown driver error:")
raise web.HTTPNotFound
return await method(post)
async def processRequest(self, request, action):
try:
resp = self.controllers[action](request)
resp = await self.controllers[action](request)
except asyncio.CancelledError:
self.log.debug("request handling cancelled")
except Exception as e:
@@ -300,7 +369,8 @@ class ZuulWeb(object):
gear_server, gear_port,
ssl_key=None, ssl_cert=None, ssl_ca=None,
static_cache_expiry=3600,
sql_connection=None):
sql_connection=None,
github_connections={}):
self.listen_address = listen_address
self.listen_port = listen_port
self.event_loop = None
@@ -311,7 +381,7 @@ class ZuulWeb(object):
self.rpc = zuul.rpcclient.RPCClient(gear_server, gear_port,
ssl_key, ssl_cert, ssl_ca)
self.log_streaming_handler = LogStreamingHandler(self.rpc)
self.gearman_handler = GearmanHandler(self.rpc)
self.gearman_handler = GearmanHandler(self.rpc, github_connections)
if sql_connection:
self.sql_handler = SqlHandler(sql_connection)
else:
@@ -337,6 +407,10 @@ class ZuulWeb(object):
async def _handleKeyRequest(self, request):
return await self.gearman_handler.processRequest(request, 'key_get')
async def _handlePayloadPost(self, post):
return await self.gearman_handler.processRequest(post,
'payload_post')
async def _handleStaticRequest(self, request):
fp = None
if request.path.endswith("tenants.html") or request.path.endswith("/"):
@@ -376,6 +450,8 @@ class ZuulWeb(object):
('GET', '/{tenant}/jobs.html', self._handleStaticRequest),
('GET', '/{tenant}/stream.html', self._handleStaticRequest),
('GET', '/tenants.html', self._handleStaticRequest),
('POST', '/driver/{driver}/{connection}/payload',
self._handlePayloadPost),
('GET', '/', self._handleStaticRequest),
]