Add facility for plugins to register web routes

Rather than special-casing github and sql in zuul-web, add a
registration method to Connection to allow a Connection to register
routes.

Provide two types of WebHandler classes - a 'raw' class and a 'driver'
class. The driver class prepends '/connection/{connection}' to the
provided path.

Shift the github webhook and SQL web methods to use the plugin interface.

Change-Id: I065937b20447248e7894e1dfeec269faa6fd30d2
This commit is contained in:
Monty Taylor 2018-01-23 16:39:30 -06:00
parent e0bad8dc05
commit 64bf8e0839
No known key found for this signature in database
GPG Key ID: 7BAE94BC7141A594
8 changed files with 303 additions and 237 deletions

View File

@ -1006,7 +1006,7 @@ class FakeGithubConnection(githubconnection.GithubConnection):
if use_zuulweb:
req = urllib.request.Request(
'http://127.0.0.1:%s/driver/github/%s/payload'
'http://127.0.0.1:%s/connection/%s/payload'
% (self.zuul_web_port, self.connection_name),
data=payload, headers=headers)
return urllib.request.urlopen(req)

View File

@ -751,7 +751,7 @@ class TestGithubWebhook(ZuulTestCase):
self.web = zuul.web.ZuulWeb(
listen_address='127.0.0.1', listen_port=0,
gear_server='127.0.0.1', gear_port=self.gearman_server.port,
github_connections={'github': self.fake_github})
connections=[self.fake_github])
loop = asyncio.new_event_loop()
loop.set_debug(True)
ws_thread = threading.Thread(target=self.web.run, args=(loop,))

View File

@ -22,8 +22,6 @@ import threading
import zuul.cmd
import zuul.web
from zuul.driver.sql import sqlconnection
from zuul.driver.github import githubconnection
from zuul.lib.config import get_default
@ -50,34 +48,15 @@ class WebServer(zuul.cmd.ZuulDaemonApp):
params['ssl_cert'] = get_default(self.config, 'gearman', 'ssl_cert')
params['ssl_ca'] = get_default(self.config, 'gearman', 'ssl_ca')
sql_conn_name = get_default(self.config, 'web',
'sql_connection_name')
sql_conn = None
if sql_conn_name:
# we want a specific sql connection
sql_conn = self.connections.connections.get(sql_conn_name)
if not sql_conn:
self.log.error("Couldn't find sql connection '%s'" %
sql_conn_name)
sys.exit(1)
else:
# look for any sql connection
connections = [c for c in self.connections.connections.values()
if isinstance(c, sqlconnection.SQLConnection)]
if len(connections) > 1:
self.log.error("Multiple sql connection found, "
"set the sql_connection_name option "
"in zuul.conf [web] section")
sys.exit(1)
if connections:
# use this sql connection by default
sql_conn = connections[0]
params['sql_connection'] = sql_conn
params['github_connections'] = {}
params['connections'] = []
# Validate config here before we spin up the ZuulWeb object
for conn_name, connection in self.connections.connections.items():
if isinstance(connection, githubconnection.GithubConnection):
params['github_connections'][conn_name] = connection
try:
if connection.validateWebConfig(self.config, self.connections):
params['connections'].append(connection)
except Exception:
self.log.exception("Error validating config")
sys.exit(1)
try:
self.web = zuul.web.ZuulWeb(**params)

View File

@ -74,3 +74,30 @@ class BaseConnection(object, metaclass=abc.ABCMeta):
This lets the user supply a list of change objects that are
still in use. Anything in our cache that isn't in the supplied
list should be safe to remove from the cache."""
def getWebHandlers(self, zuul_web):
"""Return a list of web handlers to register with zuul-web.
:param zuul.web.ZuulWeb zuul_web:
Zuul Web instance.
:returns: List of `zuul.web.handler.BaseWebHandler` instances.
"""
return []
def validateWebConfig(self, config, connections):
"""Validate config and determine whether to register web handlers.
By default this method returns False, which means this connection
has no web handlers to register.
If the method returns True, then its `getWebHandlers` method
should be called during route registration.
If there is a fatal error, the method should raise an exception.
:param config:
The parsed config object.
:param zuul.lib.connections.ConnectionRegistry connections:
Registry of all configured connections.
"""
return False

View File

@ -24,6 +24,7 @@ import re
import json
import traceback
from aiohttp import web
import cachecontrol
from cachecontrol.cache import DictCache
from cachecontrol.heuristics import BaseHeuristic
@ -37,6 +38,7 @@ import github3.exceptions
import gear
from zuul.connection import BaseConnection
from zuul.web.handler import BaseDriverWebHandler
from zuul.lib.config import get_default
from zuul.model import Ref, Branch, Tag, Project
from zuul.exceptions import MergeFailure
@ -1134,6 +1136,69 @@ class GithubConnection(BaseConnection):
return statuses
def getWebHandlers(self, zuul_web):
return [GithubWebhookHandler(self, zuul_web, 'POST', 'payload')]
def validateWebConfig(self, config, connections):
if 'webhook_token' not in self.connection_config:
raise Exception(
"webhook_token not found in config for connection %s" %
self.connection_name)
return True
class GithubWebhookHandler(BaseDriverWebHandler):
log = logging.getLogger("zuul.GithubWebhookHandler")
def __init__(self, connection, zuul_web, method, path):
super(GithubWebhookHandler, self).__init__(
connection=connection, zuul_web=zuul_web, method=method, path=path)
self.token = self.connection.connection_config.get('webhook_token')
def _validate_signature(self, body, headers):
try:
request_signature = headers['x-hub-signature']
except KeyError:
raise web.HTTPUnauthorized(
reason='X-Hub-Signature header missing.')
payload_signature = _sign_request(body, self.token)
self.log.debug("Payload Signature: {0}".format(str(payload_signature)))
self.log.debug("Request Signature: {0}".format(str(request_signature)))
if not hmac.compare_digest(
str(payload_signature), str(request_signature)):
raise web.HTTPUnauthorized(
reason=('Request signature does not match calculated payload '
'signature. Check that secret is correct.'))
return True
async def handleRequest(self, request):
# Note(tobiash): We need to normalize the headers. Otherwise we will
# have trouble to get them from the dict afterwards.
# e.g.
# GitHub: sent: X-GitHub-Event received: X-GitHub-Event
# urllib: sent: X-GitHub-Event received: X-Github-Event
#
# We cannot easily solve this mismatch as every http processing lib
# modifies the header casing in its own way and by specification http
# headers are case insensitive so just lowercase all so we don't have
# to take care later.
headers = dict()
for key, value in request.headers.items():
headers[key.lower()] = value
body = await request.read()
self._validate_signature(body, headers)
# We cannot send the raw body through gearman, so it's easy to just
# encode it as json, after decoding it as utf-8
json_body = json.loads(body.decode('utf-8'))
job = self.zuul_web.rpc.submitJob(
'github:%s:payload' % self.connection.connection_name,
{'headers': headers, 'body': json_body})
return web.json_response(json.loads(job.data[0]))
def _status_as_tuple(status):
"""Translate a status into a tuple of user, context, state"""

View File

@ -14,14 +14,19 @@
import logging
from aiohttp import web
import alembic
import alembic.command
import alembic.config
import sqlalchemy as sa
import sqlalchemy.pool
import voluptuous as v
from sqlalchemy.sql import select
import urllib.parse
import voluptuous
from zuul.connection import BaseConnection
from zuul.lib.config import get_default
from zuul.web.handler import BaseWebHandler, StaticHandler
BUILDSET_TABLE = 'zuul_buildset'
BUILD_TABLE = 'zuul_build'
@ -120,7 +125,122 @@ class SQLConnection(BaseConnection):
return zuul_buildset_table, zuul_build_table
def getWebHandlers(self, zuul_web):
return [
SqlWebHandler(self, zuul_web, 'GET', '/{tenant}/builds.json'),
StaticHandler(zuul_web, '/{tenant}/builds.html'),
]
def validateWebConfig(self, config, connections):
sql_conn_name = get_default(config, 'web', 'sql_connection_name')
if sql_conn_name:
# The config wants a specific sql connection. Check the whole
# list of connections to make sure it can be satisfied.
sql_conn = connections.connections.get(sql_conn_name)
if not sql_conn:
raise Exception(
"Couldn't find sql connection '%s'" % sql_conn_name)
if self.connection_name == sql_conn.connection_name:
return True
else:
# Check to see if there is more than one connection
conn_objects = [c for c in connections.connections.values()
if isinstance(c, SQLConnection)]
if len(conn_objects) > 1:
raise Exception("Multiple sql connection found, "
"set the sql_connection_name option "
"in zuul.conf [web] section")
return True
class SqlWebHandler(BaseWebHandler):
log = logging.getLogger("zuul.web.SqlHandler")
filters = ("project", "pipeline", "change", "patchset", "ref",
"result", "uuid", "job_name", "voting", "node_name", "newrev")
def __init__(self, connection, zuul_web, method, path):
super(SqlWebHandler, self).__init__(
connection=connection, zuul_web=zuul_web, method=method, path=path)
def query(self, args):
build = self.connection.zuul_build_table
buildset = self.connection.zuul_buildset_table
query = select([
buildset.c.project,
buildset.c.pipeline,
buildset.c.change,
buildset.c.patchset,
buildset.c.ref,
buildset.c.newrev,
buildset.c.ref_url,
build.c.result,
build.c.uuid,
build.c.job_name,
build.c.voting,
build.c.node_name,
build.c.start_time,
build.c.end_time,
build.c.log_url]).select_from(build.join(buildset))
for table in ('build', 'buildset'):
for key, val in args['%s_filters' % table].items():
if table == 'build':
column = build.c
else:
column = buildset.c
query = query.where(getattr(column, key).in_(val))
return query.limit(args['limit']).offset(args['skip']).order_by(
build.c.id.desc())
async def get_builds(self, args):
"""Return a list of build"""
builds = []
with self.connection.engine.begin() as conn:
query = self.query(args)
for row in conn.execute(query):
build = dict(row)
# Convert date to iso format
if row.start_time:
build['start_time'] = row.start_time.strftime(
'%Y-%m-%dT%H:%M:%S')
if row.end_time:
build['end_time'] = row.end_time.strftime(
'%Y-%m-%dT%H:%M:%S')
# Compute run duration
if row.start_time and row.end_time:
build['duration'] = (row.end_time -
row.start_time).total_seconds()
builds.append(build)
return builds
async def handleRequest(self, request):
try:
args = {
'buildset_filters': {},
'build_filters': {},
'limit': 50,
'skip': 0,
}
for k, v in urllib.parse.parse_qsl(request.rel_url.query_string):
if k in ("tenant", "project", "pipeline", "change",
"patchset", "ref", "newrev"):
args['buildset_filters'].setdefault(k, []).append(v)
elif k in ("uuid", "job_name", "voting", "node_name",
"result"):
args['build_filters'].setdefault(k, []).append(v)
elif k in ("limit", "skip"):
args[k] = int(v)
else:
raise ValueError("Unknown parameter %s" % k)
data = await self.get_builds(args)
resp = web.json_response(data)
resp.headers['Access-Control-Allow-Origin'] = '*'
except Exception as e:
self.log.exception("Jobs exception:")
resp = web.json_response({'error_description': 'Internal error'},
status=500)
return resp
def getSchema():
sql_connection = v.Any(str, v.Schema(dict))
sql_connection = voluptuous.Any(str, voluptuous.Schema(dict))
return sql_connection

View File

@ -16,31 +16,21 @@
import asyncio
import hashlib
import hmac
import json
import logging
import os
import time
import urllib.parse
import uvloop
import aiohttp
from aiohttp import web
from sqlalchemy.sql import select
import zuul.rpcclient
from zuul.web.handler import StaticHandler
STATIC_DIR = os.path.join(os.path.dirname(__file__), 'static')
def _sign_request(body, secret):
signature = 'sha1=' + hmac.new(
secret.encode('utf-8'), body, hashlib.sha1).hexdigest()
return signature
class LogStreamingHandler(object):
log = logging.getLogger("zuul.web.LogStreamingHandler")
@ -158,9 +148,8 @@ class GearmanHandler(object):
# Tenant status cache expiry
cache_expiry = 1
def __init__(self, rpc, github_connections):
def __init__(self, rpc):
self.rpc = rpc
self.github_connections = github_connections
self.cache = {}
self.cache_time = {}
self.controllers = {
@ -168,7 +157,6 @@ class GearmanHandler(object):
'status_get': self.status_get,
'job_list': self.job_list,
'key_get': self.key_get,
'payload_post': self.payload_post,
}
async def tenant_list(self, request):
@ -203,65 +191,6 @@ class GearmanHandler(object):
'project': project})
return web.Response(body=job.data[0])
def _validate_signature(self, body, headers, secret):
try:
request_signature = headers['x-hub-signature']
except KeyError:
raise web.HTTPUnauthorized(
reason='X-Hub-Signature header missing.')
payload_signature = _sign_request(body, secret)
self.log.debug("Payload Signature: {0}".format(str(payload_signature)))
self.log.debug("Request Signature: {0}".format(str(request_signature)))
if not hmac.compare_digest(
str(payload_signature), str(request_signature)):
raise web.HTTPUnauthorized(
reason=('Request signature does not match calculated payload '
'signature. Check that secret is correct.'))
return True
async def github_payload(self, post):
connection = post.match_info["connection"]
github_connection = self.github_connections.get(connection)
token = github_connection.connection_config.get('webhook_token')
# Note(tobiash): We need to normalize the headers. Otherwise we will
# have trouble to get them from the dict afterwards.
# e.g.
# GitHub: sent: X-GitHub-Event received: X-GitHub-Event
# urllib: sent: X-GitHub-Event received: X-Github-Event
#
# We cannot easily solve this mismatch as every http processing lib
# modifies the header casing in its own way and by specification http
# headers are case insensitive so just lowercase all so we don't have
# to take care later.
headers = dict()
for key, value in post.headers.items():
headers[key.lower()] = value
body = await post.read()
self._validate_signature(body, headers, token)
# We cannot send the raw body through gearman, so it's easy to just
# encode it as json, after decoding it as utf-8
json_body = json.loads(body.decode('utf-8'))
job = self.rpc.submitJob('github:%s:payload' % connection,
{'headers': headers, 'body': json_body})
jobdata = json.loads(job.data[0])
return web.json_response(jobdata, status=jobdata['return_code'])
async def payload_post(self, post):
# Allow for other drivers to also accept a payload in the future,
# instead of hardcoding this to GitHub
driver = post.match_info["driver"]
try:
method = getattr(self, driver + '_payload')
except AttributeError as e:
self.log.exception("Unknown driver error:")
raise web.HTTPNotFound
return await method(post)
async def processRequest(self, request, action):
try:
resp = await self.controllers[action](request)
@ -274,93 +203,6 @@ class GearmanHandler(object):
return resp
class SqlHandler(object):
log = logging.getLogger("zuul.web.SqlHandler")
filters = ("project", "pipeline", "change", "patchset", "ref",
"result", "uuid", "job_name", "voting", "node_name", "newrev")
def __init__(self, connection):
self.connection = connection
def query(self, args):
build = self.connection.zuul_build_table
buildset = self.connection.zuul_buildset_table
query = select([
buildset.c.project,
buildset.c.pipeline,
buildset.c.change,
buildset.c.patchset,
buildset.c.ref,
buildset.c.newrev,
buildset.c.ref_url,
build.c.result,
build.c.uuid,
build.c.job_name,
build.c.voting,
build.c.node_name,
build.c.start_time,
build.c.end_time,
build.c.log_url]).select_from(build.join(buildset))
for table in ('build', 'buildset'):
for k, v in args['%s_filters' % table].items():
if table == 'build':
column = build.c
else:
column = buildset.c
query = query.where(getattr(column, k).in_(v))
return query.limit(args['limit']).offset(args['skip']).order_by(
build.c.id.desc())
def get_builds(self, args):
"""Return a list of build"""
builds = []
with self.connection.engine.begin() as conn:
query = self.query(args)
for row in conn.execute(query):
build = dict(row)
# Convert date to iso format
if row.start_time:
build['start_time'] = row.start_time.strftime(
'%Y-%m-%dT%H:%M:%S')
if row.end_time:
build['end_time'] = row.end_time.strftime(
'%Y-%m-%dT%H:%M:%S')
# Compute run duration
if row.start_time and row.end_time:
build['duration'] = (row.end_time -
row.start_time).total_seconds()
builds.append(build)
return builds
async def processRequest(self, request):
try:
args = {
'buildset_filters': {},
'build_filters': {},
'limit': 50,
'skip': 0,
}
for k, v in urllib.parse.parse_qsl(request.rel_url.query_string):
if k in ("tenant", "project", "pipeline", "change",
"patchset", "ref", "newrev"):
args['buildset_filters'].setdefault(k, []).append(v)
elif k in ("uuid", "job_name", "voting", "node_name",
"result"):
args['build_filters'].setdefault(k, []).append(v)
elif k in ("limit", "skip"):
args[k] = int(v)
else:
raise ValueError("Unknown parameter %s" % k)
data = self.get_builds(args)
resp = web.json_response(data)
resp.headers['Access-Control-Allow-Origin'] = '*'
except Exception as e:
self.log.exception("Jobs exception:")
resp = web.json_response({'error_description': 'Internal error'},
status=500)
return resp
class ZuulWeb(object):
log = logging.getLogger("zuul.web.ZuulWeb")
@ -369,8 +211,7 @@ class ZuulWeb(object):
gear_server, gear_port,
ssl_key=None, ssl_cert=None, ssl_ca=None,
static_cache_expiry=3600,
sql_connection=None,
github_connections={}):
connections=None):
self.listen_address = listen_address
self.listen_port = listen_port
self.event_loop = None
@ -381,11 +222,11 @@ class ZuulWeb(object):
self.rpc = zuul.rpcclient.RPCClient(gear_server, gear_port,
ssl_key, ssl_cert, ssl_ca)
self.log_streaming_handler = LogStreamingHandler(self.rpc)
self.gearman_handler = GearmanHandler(self.rpc, github_connections)
if sql_connection:
self.sql_handler = SqlHandler(sql_connection)
else:
self.sql_handler = None
self.gearman_handler = GearmanHandler(self.rpc)
self._plugin_routes = [] # type: List[zuul.web.handler.BaseWebHandler]
connections = connections or []
for connection in connections:
self._plugin_routes.extend(connection.getWebHandlers(self))
async def _handleWebsocket(self, request):
return await self.log_streaming_handler.processRequest(
@ -401,34 +242,9 @@ class ZuulWeb(object):
async def _handleJobsRequest(self, request):
return await self.gearman_handler.processRequest(request, 'job_list')
async def _handleSqlRequest(self, request):
return await self.sql_handler.processRequest(request)
async def _handleKeyRequest(self, request):
return await self.gearman_handler.processRequest(request, 'key_get')
async def _handlePayloadPost(self, post):
return await self.gearman_handler.processRequest(post,
'payload_post')
async def _handleStaticRequest(self, request):
fp = None
if request.path.endswith("tenants.html") or request.path.endswith("/"):
fp = os.path.join(STATIC_DIR, "index.html")
elif request.path.endswith("status.html"):
fp = os.path.join(STATIC_DIR, "status.html")
elif request.path.endswith("jobs.html"):
fp = os.path.join(STATIC_DIR, "jobs.html")
elif request.path.endswith("builds.html"):
fp = os.path.join(STATIC_DIR, "builds.html")
elif request.path.endswith("stream.html"):
fp = os.path.join(STATIC_DIR, "stream.html")
headers = {}
if self.static_cache_expiry:
headers['Cache-Control'] = "public, max-age=%d" % \
self.static_cache_expiry
return web.FileResponse(fp, headers=headers)
def run(self, loop=None):
"""
Run the websocket daemon.
@ -446,20 +262,18 @@ class ZuulWeb(object):
('GET', '/{tenant}/jobs.json', self._handleJobsRequest),
('GET', '/{tenant}/console-stream', self._handleWebsocket),
('GET', '/{tenant}/{project:.*}.pub', self._handleKeyRequest),
('GET', '/{tenant}/status.html', self._handleStaticRequest),
('GET', '/{tenant}/jobs.html', self._handleStaticRequest),
('GET', '/{tenant}/stream.html', self._handleStaticRequest),
('GET', '/tenants.html', self._handleStaticRequest),
('POST', '/driver/{driver}/{connection}/payload',
self._handlePayloadPost),
('GET', '/', self._handleStaticRequest),
]
if self.sql_handler:
routes.append(('GET', '/{tenant}/builds.json',
self._handleSqlRequest))
routes.append(('GET', '/{tenant}/builds.html',
self._handleStaticRequest))
static_routes = [
StaticHandler(self, '/{tenant}/status.html'),
StaticHandler(self, '/{tenant}/jobs.html'),
StaticHandler(self, '/{tenant}/stream.html'),
StaticHandler(self, '/tenants.html', 'index.html'),
StaticHandler(self, '/', 'index.html'),
]
for route in static_routes + self._plugin_routes:
routes.append((route.method, route.path, route.handleRequest))
self.log.debug("ZuulWeb starting")
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())

61
zuul/web/handler.py Normal file
View File

@ -0,0 +1,61 @@
# Copyright 2018 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import os
from aiohttp import web
STATIC_DIR = os.path.join(os.path.dirname(__file__), 'static')
class BaseWebHandler(object, metaclass=abc.ABCMeta):
def __init__(self, connection, zuul_web, method, path):
self.connection = connection
self.zuul_web = zuul_web
self.method = method
self.path = path
@abc.abstractmethod
async def handleRequest(self, request):
"""Process a web request."""
class BaseDriverWebHandler(BaseWebHandler):
def __init__(self, connection, zuul_web, method, path):
super(BaseDriverWebHandler, self).__init__(
connection=connection, zuul_web=zuul_web, method=method, path=path)
if path.startswith('/'):
path = path[1:]
self.path = '/connection/{connection}/{path}'.format(
connection=self.connection.connection_name,
path=path)
class StaticHandler(BaseWebHandler):
def __init__(self, zuul_web, path, file_path=None):
super(StaticHandler, self).__init__(None, zuul_web, 'GET', path)
self.file_path = file_path or path.split('/')[-1]
async def handleRequest(self, request):
"""Process a web request."""
headers = {}
fp = os.path.join(STATIC_DIR, self.file_path)
if self.zuul_web.static_cache_expiry:
headers['Cache-Control'] = "public, max-age=%d" % \
self.zuul_web.static_cache_expiry
return web.FileResponse(fp, headers=headers)