From 0eeceba5a55b051a6b62e591850d0489d1fdf770 Mon Sep 17 00:00:00 2001 From: "James E. Blair" Date: Fri, 11 May 2018 14:59:24 -0700 Subject: [PATCH] Replace use of aiohttp with cherrypy * Aiohttp (and related libraries) have a python support policy which is causing us problems. * Cherrypy supports threads which integrates well with the rest of Zuul. Change-Id: Ib611df06035890d3e87fc5ad92fdfc7ac441edce --- doc/source/developer/ansible.rst | 4 +- requirements.txt | 3 + tests/base.py | 15 +- tests/unit/test_github_driver.py | 17 +- tests/unit/test_streaming.py | 28 +- tests/unit/test_web.py | 19 +- zuul/cmd/web.py | 18 +- zuul/connection/__init__.py | 16 +- zuul/driver/github/githubconnection.py | 54 +- zuul/driver/sql/__init__.py | 42 -- zuul/driver/sql/sqlconnection.py | 13 +- zuul/web/__init__.py | 664 ++++++++++++------------- zuul/web/handler.py | 47 +- 13 files changed, 366 insertions(+), 574 deletions(-) diff --git a/doc/source/developer/ansible.rst b/doc/source/developer/ansible.rst index e3ebca7cac..c5fcbb4422 100644 --- a/doc/source/developer/ansible.rst +++ b/doc/source/developer/ansible.rst @@ -43,7 +43,7 @@ that starts a log streaming daemon on the build node. All jobs run with the :py:mod:`zuul.ansible.callback.zuul_stream` callback plugin enabled, which writes the build log to a file so that the :py:class:`zuul.lib.log_streamer.LogStreamer` can provide the data on demand -over the finger protocol. Finally, :py:class:`zuul.web.LogStreamingHandler` +over the finger protocol. Finally, :py:class:`zuul.web.LogStreamHandler` exposes that log stream over a websocket connection as part of :py:class:`zuul.web.ZuulWeb`. @@ -51,7 +51,7 @@ exposes that log stream over a websocket connection as part of :members: .. autoclass:: zuul.lib.log_streamer.LogStreamer -.. autoclass:: zuul.web.LogStreamingHandler +.. autoclass:: zuul.web.LogStreamHandler .. autoclass:: zuul.web.ZuulWeb In addition to real-time streaming, Zuul also installs another callback module, diff --git a/requirements.txt b/requirements.txt index 1b790f750e..4a0cad0c15 100644 --- a/requirements.txt +++ b/requirements.txt @@ -29,3 +29,6 @@ uvloop;python_version>='3.5' psutil fb-re2>=1.0.6 paho-mqtt +cherrypy +ws4py +routes diff --git a/tests/base.py b/tests/base.py index 70068e71ee..61e7e17a67 100755 --- a/tests/base.py +++ b/tests/base.py @@ -15,7 +15,6 @@ # License for the specific language governing permissions and limitations # under the License. -import asyncio import configparser from contextlib import contextmanager import datetime @@ -1894,22 +1893,14 @@ class ZuulWebFixture(fixtures.Fixture): listen_address='127.0.0.1', listen_port=0, gear_server='127.0.0.1', gear_port=self.gearman_server_port, info=zuul.model.WebInfo(), - _connections=self.connections) - loop = asyncio.new_event_loop() - loop.set_debug(True) - ws_thread = threading.Thread(target=self.web.run, args=(loop,)) - ws_thread.start() - self.addCleanup(loop.close) - self.addCleanup(ws_thread.join) + connections=self.connections) + self.web.start() self.addCleanup(self.web.stop) self.host = 'localhost' # Wait until web server is started while True: - time.sleep(0.1) - if self.web.server is None: - continue - self.port = self.web.server.sockets[0].getsockname()[1] + self.port = self.web.port try: with socket.create_connection((self.host, self.port)): break diff --git a/tests/unit/test_github_driver.py b/tests/unit/test_github_driver.py index 09b20fd1be..e8347aafcd 100644 --- a/tests/unit/test_github_driver.py +++ b/tests/unit/test_github_driver.py @@ -12,8 +12,6 @@ # License for the specific language governing permissions and limitations # under the License. -import asyncio -import threading import os import re from testtools.matchers import MatchesRegex, StartsWith @@ -789,23 +787,14 @@ class TestGithubWebhook(ZuulTestCase): self.web = zuul.web.ZuulWeb( listen_address='127.0.0.1', listen_port=0, gear_server='127.0.0.1', gear_port=self.gearman_server.port, - connections=[self.fake_github], - _connections=self.connections) - loop = asyncio.new_event_loop() - loop.set_debug(True) - ws_thread = threading.Thread(target=self.web.run, args=(loop,)) - ws_thread.start() - self.addCleanup(loop.close) - self.addCleanup(ws_thread.join) + connections=self.connections) + self.web.start() self.addCleanup(self.web.stop) host = '127.0.0.1' # Wait until web server is started while True: - time.sleep(0.1) - if self.web.server is None: - continue - port = self.web.server.sockets[0].getsockname()[1] + port = self.web.port try: with socket.create_connection((host, port)): break diff --git a/tests/unit/test_streaming.py b/tests/unit/test_streaming.py index 56b3488811..597c906497 100644 --- a/tests/unit/test_streaming.py +++ b/tests/unit/test_streaming.py @@ -283,21 +283,13 @@ class TestStreaming(tests.base.AnsibleZuulTestCase): listen_address='::', listen_port=0, gear_server='127.0.0.1', gear_port=self.gearman_server.port, static_path=tempfile.gettempdir(), - _connections=self.connections) - loop = asyncio.new_event_loop() - loop.set_debug(True) - ws_thread = threading.Thread(target=web_server.run, args=(loop,)) - ws_thread.start() - self.addCleanup(loop.close) - self.addCleanup(ws_thread.join) + connections=self.connections) + web_server.start() self.addCleanup(web_server.stop) # Wait until web server is started while True: - if web_server.server is None: - time.sleep(0.1) - continue - port = web_server.server.sockets[0].getsockname()[1] + port = web_server.port try: with socket.create_connection((self.host, port)): break @@ -374,21 +366,13 @@ class TestStreaming(tests.base.AnsibleZuulTestCase): listen_address='::', listen_port=0, gear_server='127.0.0.1', gear_port=self.gearman_server.port, static_path=tempfile.gettempdir(), - _connections=self.connections) - loop = asyncio.new_event_loop() - loop.set_debug(True) - ws_thread = threading.Thread(target=web_server.run, args=(loop,)) - ws_thread.start() - self.addCleanup(loop.close) - self.addCleanup(ws_thread.join) + connections=self.connections) + web_server.start() self.addCleanup(web_server.stop) # Wait until web server is started while True: - if web_server.server is None: - time.sleep(0.1) - continue - port = web_server.server.sockets[0].getsockname()[1] + port = web_server.port try: with socket.create_connection((self.host, port)): break diff --git a/tests/unit/test_web.py b/tests/unit/test_web.py index 33c3a8e544..569048322a 100644 --- a/tests/unit/test_web.py +++ b/tests/unit/test_web.py @@ -15,12 +15,9 @@ # License for the specific language governing permissions and limitations # under the License. -import asyncio import json -import threading import os import urllib.parse -import time import socket import requests @@ -63,25 +60,15 @@ class BaseTestWeb(ZuulTestCase): listen_address='127.0.0.1', listen_port=0, gear_server='127.0.0.1', gear_port=self.gearman_server.port, info=zuul.model.WebInfo.fromConfig(self.zuul_ini_config), - connections=self.connections.connections.values(), - _connections=self.connections + connections=self.connections ) - loop = asyncio.new_event_loop() - loop.set_debug(True) - ws_thread = threading.Thread(target=self.web.run, args=(loop,)) - ws_thread.start() - self.addCleanup(loop.close) - self.addCleanup(ws_thread.join) + self.web.start() self.addCleanup(self.web.stop) self.host = 'localhost' + self.port = self.web.port # Wait until web server is started while True: - time.sleep(0.1) - if self.web.server is None: - continue - self.port = self.web.server.sockets[0].getsockname()[1] - print(self.host, self.port) try: with socket.create_connection((self.host, self.port)): break diff --git a/zuul/cmd/web.py b/zuul/cmd/web.py index 19057a74a9..36c4eded18 100755 --- a/zuul/cmd/web.py +++ b/zuul/cmd/web.py @@ -13,11 +13,9 @@ # License for the specific language governing permissions and limitations # under the License. -import asyncio import logging import signal import sys -import threading import zuul.cmd import zuul.model @@ -55,13 +53,11 @@ class WebServer(zuul.cmd.ZuulDaemonApp): params['ssl_cert'] = get_default(self.config, 'gearman', 'ssl_cert') params['ssl_ca'] = get_default(self.config, 'gearman', 'ssl_ca') - params['_connections'] = self.connections - params['connections'] = [] + params['connections'] = self.connections # Validate config here before we spin up the ZuulWeb object for conn_name, connection in self.connections.connections.items(): try: - if connection.validateWebConfig(self.config, self.connections): - params['connections'].append(connection) + connection.validateWebConfig(self.config, self.connections) except Exception: self.log.exception("Error validating config") sys.exit(1) @@ -72,15 +68,11 @@ class WebServer(zuul.cmd.ZuulDaemonApp): self.log.exception("Error creating ZuulWeb:") sys.exit(1) - loop = asyncio.get_event_loop() signal.signal(signal.SIGUSR1, self.exit_handler) signal.signal(signal.SIGTERM, self.exit_handler) self.log.info('Zuul Web Server starting') - self.thread = threading.Thread(target=self.web.run, - args=(loop,), - name='web') - self.thread.start() + self.web.start() try: signal.pause() @@ -88,9 +80,7 @@ class WebServer(zuul.cmd.ZuulDaemonApp): print("Ctrl + C: asking web server to exit nicely...\n") self.exit_handler(signal.SIGINT, None) - self.thread.join() - loop.stop() - loop.close() + self.web.stop() self.log.info("Zuul Web Server stopped") def run(self): diff --git a/zuul/connection/__init__.py b/zuul/connection/__init__.py index 1c62f4dab0..9011891d64 100644 --- a/zuul/connection/__init__.py +++ b/zuul/connection/__init__.py @@ -75,26 +75,20 @@ class BaseConnection(object, metaclass=abc.ABCMeta): still in use. Anything in our cache that isn't in the supplied list should be safe to remove from the cache.""" - def getWebHandlers(self, zuul_web, info): - """Return a list of web handlers to register with zuul-web. + def getWebController(self, zuul_web, info): + """Return a cherrypy web controller to register with zuul-web. :param zuul.web.ZuulWeb zuul_web: Zuul Web instance. :param zuul.model.WebInfo info: The WebInfo object for the Zuul Web instance. Can be used by plugins to toggle API capabilities. - :returns: List of `zuul.web.handler.BaseWebHandler` instances. + :returns: A `zuul.web.handler.BaseWebController` instance. """ - return [] + return None def validateWebConfig(self, config, connections): - """Validate config and determine whether to register web handlers. - - By default this method returns False, which means this connection - has no web handlers to register. - - If the method returns True, then its `getWebHandlers` method - should be called during route registration. + """Validate web config. If there is a fatal error, the method should raise an exception. diff --git a/zuul/driver/github/githubconnection.py b/zuul/driver/github/githubconnection.py index ce166b4c3d..426d23377e 100644 --- a/zuul/driver/github/githubconnection.py +++ b/zuul/driver/github/githubconnection.py @@ -12,7 +12,6 @@ # License for the specific language governing permissions and limitations # under the License. -import asyncio import collections import datetime import logging @@ -25,7 +24,7 @@ import re import json import traceback -from aiohttp import web +import cherrypy import cachecontrol from cachecontrol.cache import DictCache from cachecontrol.heuristics import BaseHeuristic @@ -39,7 +38,7 @@ import github3.exceptions import gear from zuul.connection import BaseConnection -from zuul.web.handler import BaseDriverWebHandler +from zuul.web.handler import BaseWebController from zuul.lib.config import get_default from zuul.model import Ref, Branch, Tag, Project from zuul.exceptions import MergeFailure @@ -1149,8 +1148,8 @@ class GithubConnection(BaseConnection): return statuses - def getWebHandlers(self, zuul_web, info): - return [GithubWebhookHandler(self, zuul_web, 'POST', 'payload')] + def getWebController(self, zuul_web, info): + return GithubWebController(zuul_web, self) def validateWebConfig(self, config, connections): if 'webhook_token' not in self.connection_config: @@ -1160,21 +1159,20 @@ class GithubConnection(BaseConnection): return True -class GithubWebhookHandler(BaseDriverWebHandler): +class GithubWebController(BaseWebController): - log = logging.getLogger("zuul.GithubWebhookHandler") + log = logging.getLogger("zuul.GithubWebController") - def __init__(self, connection, zuul_web, method, path): - super(GithubWebhookHandler, self).__init__( - connection=connection, zuul_web=zuul_web, method=method, path=path) + def __init__(self, zuul_web, connection): + self.connection = connection + self.zuul_web = zuul_web self.token = self.connection.connection_config.get('webhook_token') def _validate_signature(self, body, headers): try: request_signature = headers['x-hub-signature'] except KeyError: - raise web.HTTPUnauthorized( - reason='X-Hub-Signature header missing.') + raise cherrypy.HTTPError(401, 'X-Hub-Signature header missing.') payload_signature = _sign_request(body, self.token) @@ -1182,16 +1180,16 @@ class GithubWebhookHandler(BaseDriverWebHandler): self.log.debug("Request Signature: {0}".format(str(request_signature))) if not hmac.compare_digest( str(payload_signature), str(request_signature)): - raise web.HTTPUnauthorized( - reason=('Request signature does not match calculated payload ' - 'signature. Check that secret is correct.')) + raise cherrypy.HTTPError( + 401, + 'Request signature does not match calculated payload ' + 'signature. Check that secret is correct.') return True - def setEventLoop(self, event_loop): - self.event_loop = event_loop - - async def handleRequest(self, request): + @cherrypy.expose + @cherrypy.tools.json_out(content_type='application/json; charset=utf-8') + def payload(self): # Note(tobiash): We need to normalize the headers. Otherwise we will # have trouble to get them from the dict afterwards. # e.g. @@ -1202,28 +1200,22 @@ class GithubWebhookHandler(BaseDriverWebHandler): # modifies the header casing in its own way and by specification http # headers are case insensitive so just lowercase all so we don't have # to take care later. + # Note(corvus): Don't use cherrypy's json_in here so that we + # can validate the signature. headers = dict() - for key, value in request.headers.items(): + for key, value in cherrypy.request.headers.items(): headers[key.lower()] = value - body = await request.read() + body = cherrypy.request.body.read() self._validate_signature(body, headers) # We cannot send the raw body through gearman, so it's easy to just # encode it as json, after decoding it as utf-8 json_body = json.loads(body.decode('utf-8')) - gear_task = self.event_loop.run_in_executor( - None, self.zuul_web.rpc.submitJob, + job = self.zuul_web.rpc.submitJob( 'github:%s:payload' % self.connection.connection_name, {'headers': headers, 'body': json_body}) - try: - job = await asyncio.wait_for(gear_task, 300) - except asyncio.TimeoutError: - self.log.exception("Gearman timeout:") - return web.json_response({'error_description': 'Internal error'}, - status=500) - - return web.json_response(json.loads(job.data[0])) + return json.loads(job.data[0]) def _status_as_tuple(status): diff --git a/zuul/driver/sql/__init__.py b/zuul/driver/sql/__init__.py index 2340e10825..dbb70ea254 100644 --- a/zuul/driver/sql/__init__.py +++ b/zuul/driver/sql/__init__.py @@ -12,10 +12,6 @@ # License for the specific language governing permissions and limitations # under the License. -import logging -from aiohttp import web -import urllib.parse - from zuul.driver import Driver, ConnectionInterface, ReporterInterface from zuul.driver.sql import sqlconnection from zuul.driver.sql import sqlreporter @@ -23,7 +19,6 @@ from zuul.driver.sql import sqlreporter class SQLDriver(Driver, ConnectionInterface, ReporterInterface): name = 'sql' - log = logging.getLogger("zuul.SQLDriver") def __init__(self): self.tenant_connections = {} @@ -57,40 +52,3 @@ class SQLDriver(Driver, ConnectionInterface, ReporterInterface): def getReporterSchema(self): return sqlreporter.getSchema() - - # TODO(corvus): these are temporary, remove after cherrypy conversion - def setEventLoop(self, event_loop): - self.event_loop = event_loop - - async def handleRequest(self, request): - tenant_name = request.match_info["tenant"] - connection = self.tenant_connections.get(tenant_name) - if not connection: - return - try: - args = { - 'buildset_filters': {}, - 'build_filters': {}, - 'limit': 50, - 'skip': 0, - 'tenant': tenant_name, - } - for k, v in urllib.parse.parse_qsl(request.rel_url.query_string): - if k in ("project", "pipeline", "change", "branch", - "patchset", "ref", "newrev"): - args['buildset_filters'].setdefault(k, []).append(v) - elif k in ("uuid", "job_name", "voting", "node_name", - "result"): - args['build_filters'].setdefault(k, []).append(v) - elif k in ("limit", "skip"): - args[k] = int(v) - else: - raise ValueError("Unknown parameter %s" % k) - data = await connection.get_builds(args, self.event_loop) - resp = web.json_response(data) - resp.headers['Access-Control-Allow-Origin'] = '*' - except Exception as e: - self.log.exception("Jobs exception:") - resp = web.json_response({'error_description': 'Internal error'}, - status=500) - return resp diff --git a/zuul/driver/sql/sqlconnection.py b/zuul/driver/sql/sqlconnection.py index 06ba0bb8c0..822ba0a0fc 100644 --- a/zuul/driver/sql/sqlconnection.py +++ b/zuul/driver/sql/sqlconnection.py @@ -12,7 +12,6 @@ # License for the specific language governing permissions and limitations # under the License. -import asyncio import logging import alembic @@ -157,19 +156,11 @@ class SQLConnection(BaseConnection): return query.limit(args['limit']).offset(args['skip']).order_by( build.c.id.desc()) - async def get_builds(self, args, event_loop): + def get_builds(self, args): """Return a list of build""" builds = [] with self.engine.begin() as conn: - query = self.query(args) - query_task = event_loop.run_in_executor( - None, - conn.execute, - query - ) - rows = await asyncio.wait_for(query_task, 30) - - for row in rows: + for row in conn.execute(self.query(args)): build = dict(row) # Convert date to iso format if row.start_time: diff --git a/zuul/web/__init__.py b/zuul/web/__init__.py index fcbbe136d2..03ac4c35bb 100755 --- a/zuul/web/__init__.py +++ b/zuul/web/__init__.py @@ -15,227 +15,48 @@ # limitations under the License. -import asyncio +import cherrypy +import socket +from ws4py.server.cherrypyserver import WebSocketPlugin, WebSocketTool +from ws4py.websocket import WebSocket import codecs import copy import json import logging import os import time -import uvloop - -import aiohttp -from aiohttp import web import zuul.model import zuul.rpcclient -from zuul.web.handler import StaticHandler STATIC_DIR = os.path.join(os.path.dirname(__file__), 'static') +cherrypy.tools.websocket = WebSocketTool() -class LogStreamingHandler(object): - log = logging.getLogger("zuul.web.LogStreamingHandler") +class SaveParamsTool(cherrypy.Tool): + """ + Save the URL parameters to allow them to take precedence over query + string parameters. + """ + def __init__(self): + cherrypy.Tool.__init__(self, 'on_start_resource', + self.saveParams) - def __init__(self, rpc): - self.rpc = rpc + def _setup(self): + cherrypy.Tool._setup(self) + cherrypy.request.hooks.attach('before_handler', + self.restoreParams) - def setEventLoop(self, event_loop): - self.event_loop = event_loop + def saveParams(self, restore=True): + cherrypy.request.url_params = cherrypy.request.params.copy() + cherrypy.request.url_params_restore = restore - async def _fingerClient(self, ws, server, port, job_uuid): - """ - Create a client to connect to the finger streamer and pull results. - - :param aiohttp.web.WebSocketResponse ws: The websocket response object. - :param str server: The executor server running the job. - :param str port: The executor server port. - :param str job_uuid: The job UUID to stream. - """ - self.log.debug("Connecting to finger server %s:%s", server, port) - reader, writer = await asyncio.open_connection(host=server, port=port, - loop=self.event_loop) - - self.log.debug("Sending finger request for %s", job_uuid) - msg = "%s\n" % job_uuid # Must have a trailing newline! - - writer.write(msg.encode('utf8')) - await writer.drain() - - Decoder = codecs.getincrementaldecoder('utf8') - decoder = Decoder() - - while True: - data = await reader.read(1024) - if data: - data = decoder.decode(data) - if data: - await ws.send_str(data) - else: - # Make sure we flush anything left in the decoder - data = decoder.decode(b'', final=True) - if data: - await ws.send_str(data) - writer.close() - return - - async def _streamLog(self, ws, request): - """ - Stream the log for the requested job back to the client. - - :param aiohttp.web.WebSocketResponse ws: The websocket response object. - :param dict request: The client request parameters. - """ - for key in ('uuid', 'logfile'): - if key not in request: - return (4000, "'{key}' missing from request payload".format( - key=key)) - - # Schedule the blocking gearman work in an Executor - gear_task = self.event_loop.run_in_executor( - None, - self.rpc.get_job_log_stream_address, - request['uuid'], - ) - - try: - port_location = await asyncio.wait_for(gear_task, 10) - except asyncio.TimeoutError: - return (4010, "Gearman timeout") - - if not port_location: - return (4011, "Error with Gearman") - - try: - await self._fingerClient( - ws, port_location['server'], port_location['port'], - request['uuid'] - ) - except Exception as e: - self.log.exception("Finger client exception:") - await ws.send_str("Failure from finger client: %s" % e) - - return (1000, "No more data") - - async def processRequest(self, request): - """ - Handle a client websocket request for log streaming. - - :param aiohttp.web.Request request: The client request. - """ - try: - ws = web.WebSocketResponse() - await ws.prepare(request) - async for msg in ws: - if msg.type == aiohttp.WSMsgType.TEXT: - req = json.loads(msg.data) - self.log.debug("Websocket request: %s", req) - code, msg = await self._streamLog(ws, req) - - # We expect to process only a single message. I.e., we - # can stream only a single file at a time. - await ws.close(code=code, message=msg) - break - elif msg.type == aiohttp.WSMsgType.ERROR: - self.log.error( - "Websocket connection closed with exception %s", - ws.exception() - ) - break - elif msg.type == aiohttp.WSMsgType.CLOSED: - break - except asyncio.CancelledError: - self.log.debug("Websocket request handling cancelled") - pass - except Exception as e: - self.log.exception("Websocket exception:") - await ws.close(code=4009, message=str(e).encode('utf-8')) - return ws + def restoreParams(self): + if cherrypy.request.url_params_restore: + cherrypy.request.params.update(cherrypy.request.url_params) -class GearmanHandler(object): - log = logging.getLogger("zuul.web.GearmanHandler") - - # Tenant status cache expiry - cache_expiry = 1 - - def __init__(self, rpc): - self.rpc = rpc - self.cache = {} - self.cache_time = {} - self.controllers = { - 'tenant_list': self.tenant_list, - 'status_get': self.status_get, - 'job_list': self.job_list, - 'key_get': self.key_get, - } - - def setEventLoop(self, event_loop): - self.event_loop = event_loop - - # TODO: At some point, we should make this use a gear.Client, rather than - # the RPC client, so we can use that to make async Gearman calls. This - # implementation will create additional threads by putting the call onto - # the asycio ThreadPool, which is not ideal. - async def asyncSubmitJob(self, name, data): - ''' - Submit a job to Gearman asynchronously. - - This will raise a asyncio.TimeoutError if we hit the timeout. It is - up to the caller to handle the exception. - ''' - gear_task = self.event_loop.run_in_executor( - None, self.rpc.submitJob, name, data) - job = await asyncio.wait_for(gear_task, 300) - return job - - async def tenant_list(self, request, result_filter=None): - job = await self.asyncSubmitJob('zuul:tenant_list', {}) - return web.json_response(json.loads(job.data[0])) - - async def status_get(self, request, result_filter=None): - tenant = request.match_info["tenant"] - if tenant not in self.cache or \ - (time.time() - self.cache_time[tenant]) > self.cache_expiry: - job = await self.asyncSubmitJob('zuul:status_get', - {'tenant': tenant}) - self.cache[tenant] = json.loads(job.data[0]) - self.cache_time[tenant] = time.time() - payload = self.cache[tenant] - if payload.get('code') == 404: - return web.HTTPNotFound(reason=payload['message']) - if result_filter: - payload = result_filter.filterPayload(payload) - resp = web.json_response(payload) - resp.headers["Cache-Control"] = "public, max-age=%d" % \ - self.cache_expiry - resp.last_modified = self.cache_time[tenant] - return resp - - async def job_list(self, request, result_filter=None): - tenant = request.match_info["tenant"] - job = await self.asyncSubmitJob('zuul:job_list', {'tenant': tenant}) - return web.json_response(json.loads(job.data[0])) - - async def key_get(self, request, result_filter=None): - tenant = request.match_info["tenant"] - project = request.match_info["project"] - job = await self.asyncSubmitJob('zuul:key_get', {'tenant': tenant, - 'project': project}) - return web.Response(body=job.data[0]) - - async def processRequest(self, request, action, result_filter=None): - resp = None - try: - resp = await self.controllers[action](request, result_filter) - resp.headers['Access-Control-Allow-Origin'] = '*' - except asyncio.CancelledError: - self.log.debug("request handling cancelled") - except Exception as e: - self.log.exception("exception:") - resp = web.json_response({'error_description': 'Internal error'}, - status=500) - return resp +cherrypy.tools.save_params = SaveParamsTool() class ChangeFilter(object): @@ -256,8 +77,222 @@ class ChangeFilter(object): return change['id'] == self.desired -class ZuulWeb(object): +class LogStreamHandler(WebSocket): + log = logging.getLogger("zuul.web") + def received_message(self, message): + if message.is_text: + req = json.loads(message.data.decode('utf-8')) + self.log.debug("Websocket request: %s", req) + code, msg = self._streamLog(req) + self.log.debug("close Websocket request: %s %s", code, msg) + self.close(code, msg) + + def _streamLog(self, request): + """ + Stream the log for the requested job back to the client. + + :param dict request: The client request parameters. + """ + for key in ('uuid', 'logfile'): + if key not in request: + return (4000, "'{key}' missing from request payload".format( + key=key)) + + port_location = self.rpc.get_job_log_stream_address(request['uuid']) + if not port_location: + return (4011, "Error with Gearman") + + self._fingerClient( + port_location['server'], port_location['port'], + request['uuid']) + + return (1000, "No more data") + + def _fingerClient(self, server, port, build_uuid): + """ + Create a client to connect to the finger streamer and pull results. + + :param str server: The executor server running the job. + :param str port: The executor server port. + :param str build_uuid: The build UUID to stream. + """ + self.log.debug("Connecting to finger server %s:%s", server, port) + Decoder = codecs.getincrementaldecoder('utf8') + decoder = Decoder() + with socket.create_connection((server, port), timeout=10) as s: + # timeout only on the connection, let recv() wait forever + s.settimeout(None) + msg = "%s\n" % build_uuid # Must have a trailing newline! + s.sendall(msg.encode('utf-8')) + while True: + data = s.recv(1024) + if data: + data = decoder.decode(data) + if data: + self.send(data, False) + else: + # Make sure we flush anything left in the decoder + data = decoder.decode(b'', final=True) + if data: + self.send(data, False) + self.close() + return + + +class ZuulWebAPI(object): + log = logging.getLogger("zuul.web") + + def __init__(self, zuulweb): + self.rpc = zuulweb.rpc + self.zuulweb = zuulweb + self.cache = {} + self.cache_time = {} + self.cache_expiry = 1 + self.static_cache_expiry = zuulweb.static_cache_expiry + + @cherrypy.expose + @cherrypy.tools.json_out(content_type='application/json; charset=utf-8') + def info(self): + return self._handleInfo(self.zuulweb.info) + + @cherrypy.expose + @cherrypy.tools.save_params() + @cherrypy.tools.json_out(content_type='application/json; charset=utf-8') + def tenant_info(self, tenant): + info = self.zuulweb.info.copy() + info.tenant = tenant + return self._handleInfo(info) + + def _handleInfo(self, info): + ret = {'info': info.toDict()} + resp = cherrypy.response + resp.headers['Access-Control-Allow-Origin'] = '*' + if self.static_cache_expiry: + resp.headers['Cache-Control'] = "public, max-age=%d" % \ + self.static_cache_expiry + resp.last_modified = self.zuulweb.start_time + return ret + + @cherrypy.expose + @cherrypy.tools.json_out(content_type='application/json; charset=utf-8') + def tenants(self): + job = self.rpc.submitJob('zuul:tenant_list', {}) + ret = json.loads(job.data[0]) + resp = cherrypy.response + resp.headers['Access-Control-Allow-Origin'] = '*' + return ret + + def _getStatus(self, tenant): + if tenant not in self.cache or \ + (time.time() - self.cache_time[tenant]) > self.cache_expiry: + job = self.rpc.submitJob('zuul:status_get', + {'tenant': tenant}) + self.cache[tenant] = json.loads(job.data[0]) + self.cache_time[tenant] = time.time() + payload = self.cache[tenant] + if payload.get('code') == 404: + raise cherrypy.HTTPError(404, payload['message']) + resp = cherrypy.response + resp.headers["Cache-Control"] = "public, max-age=%d" % \ + self.cache_expiry + resp.headers["Last-modified"] = self.cache_time[tenant] + resp.headers['Access-Control-Allow-Origin'] = '*' + return payload + + @cherrypy.expose + @cherrypy.tools.save_params() + @cherrypy.tools.json_out(content_type='application/json; charset=utf-8') + def status(self, tenant): + return self._getStatus(tenant) + + @cherrypy.expose + @cherrypy.tools.save_params() + @cherrypy.tools.json_out(content_type='application/json; charset=utf-8') + def status_change(self, tenant, change): + payload = self._getStatus(tenant) + result_filter = ChangeFilter(change) + return result_filter.filterPayload(payload) + + @cherrypy.expose + @cherrypy.tools.save_params() + @cherrypy.tools.json_out(content_type='application/json; charset=utf-8') + def jobs(self, tenant): + job = self.rpc.submitJob('zuul:job_list', {'tenant': tenant}) + ret = json.loads(job.data[0]) + resp = cherrypy.response + resp.headers['Access-Control-Allow-Origin'] = '*' + return ret + + @cherrypy.expose + @cherrypy.tools.save_params() + def key(self, tenant, project): + job = self.rpc.submitJob('zuul:key_get', {'tenant': tenant, + 'project': project}) + resp = cherrypy.response + resp.headers['Access-Control-Allow-Origin'] = '*' + return job.data[0] + + @cherrypy.expose + @cherrypy.tools.save_params() + @cherrypy.tools.json_out(content_type='application/json; charset=utf-8') + def builds(self, tenant, project=None, pipeline=None, change=None, + branch=None, patchset=None, ref=None, newrev=None, + uuid=None, job_name=None, voting=None, node_name=None, + result=None, limit=50, skip=0): + sql_driver = self.zuulweb.connections.drivers['sql'] + connection = sql_driver.tenant_connections.get(tenant) + if not connection: + raise Exception("Unable to find connection for tenant %s" % tenant) + + args = { + 'buildset_filters': {'tenant': [tenant]}, + 'build_filters': {}, + 'limit': limit, + 'skip': skip, + } + + for k in ("project", "pipeline", "change", "branch", + "patchset", "ref", "newrev"): + v = locals()[k] + if v: + args['buildset_filters'].setdefault(k, []).append(v) + for k in ("uuid", "job_name", "voting", "node_name", + "result"): + v = locals()[k] + if v: + args['build_filters'].setdefault(k, []).append(v) + data = connection.get_builds(args) + resp = cherrypy.response + resp.headers['Access-Control-Allow-Origin'] = '*' + return data + + @cherrypy.expose + @cherrypy.tools.save_params() + @cherrypy.tools.websocket(handler_cls=LogStreamHandler) + def console_stream(self, tenant): + cherrypy.request.ws_handler.rpc = self.rpc + + +class TenantStaticHandler(object): + def __init__(self, path): + self._cp_config = { + 'tools.staticdir.on': True, + 'tools.staticdir.dir': path, + 'tools.staticdir.index': 'status.html', + } + + +class RootStaticHandler(object): + def __init__(self, path): + self._cp_config = { + 'tools.staticdir.on': True, + 'tools.staticdir.dir': path, + 'tools.staticdir.index': 'tenants.html', + } + + +class ZuulWeb(object): log = logging.getLogger("zuul.web.ZuulWeb") def __init__(self, listen_address, listen_port, @@ -265,7 +300,6 @@ class ZuulWeb(object): ssl_key=None, ssl_cert=None, ssl_ca=None, static_cache_expiry=3600, connections=None, - _connections=None, info=None, static_path=None): self.start_time = time.time() @@ -276,168 +310,90 @@ class ZuulWeb(object): self.server = None self.static_cache_expiry = static_cache_expiry self.info = info - self.static_path = static_path or STATIC_DIR + self.static_path = os.path.abspath(static_path or STATIC_DIR) # instanciate handlers self.rpc = zuul.rpcclient.RPCClient(gear_server, gear_port, ssl_key, ssl_cert, ssl_ca) - self.log_streaming_handler = LogStreamingHandler(self.rpc) - self.gearman_handler = GearmanHandler(self.rpc) - self._plugin_routes = [] # type: List[zuul.web.handler.BaseWebHandler] - self._connection_handlers = [] - connections = connections or [] - for connection in connections: - self._connection_handlers.extend( - connection.getWebHandlers(self, self.info)) - self.connections = _connections - self._plugin_routes.extend(self._connection_handlers) + self.connections = connections - async def _handleWebsocket(self, request): - return await self.log_streaming_handler.processRequest( - request) + route_map = cherrypy.dispatch.RoutesDispatcher() + api = ZuulWebAPI(self) + tenant_static = TenantStaticHandler(self.static_path) + root_static = RootStaticHandler(self.static_path) + route_map.connect('api', '/api/info', + controller=api, action='info') + route_map.connect('api', '/api/tenants', + controller=api, action='tenants') + route_map.connect('api', '/api/tenant/{tenant}/info', + controller=api, action='tenant_info') + route_map.connect('api', '/api/tenant/{tenant}/status', + controller=api, action='status') + route_map.connect('api', '/api/tenant/{tenant}/status/change/{change}', + controller=api, action='status_change') + route_map.connect('api', '/api/tenant/{tenant}/jobs', + controller=api, action='jobs') + route_map.connect('api', '/api/tenant/{tenant}/key/{project:.*}.pub', + controller=api, action='key') + route_map.connect('api', '/api/tenant/{tenant}/console-stream', + controller=api, action='console_stream') + route_map.connect('api', '/api/tenant/{tenant}/builds', + controller=api, action='builds') - def _handleRootInfo(self, request): - return self._handleInfo(self.info) - - def _handleTenantInfo(self, request): - info = self.info.copy() - info.tenant = request.match_info["tenant"] - return self._handleInfo(info) - - def _handleInfo(self, info): - resp = web.json_response({'info': info.toDict()}, status=200) - resp.headers['Access-Control-Allow-Origin'] = '*' - if self.static_cache_expiry: - resp.headers['Cache-Control'] = "public, max-age=%d" % \ - self.static_cache_expiry - resp.last_modified = self.start_time - return resp - - async def _handleTenantsRequest(self, request): - return await self.gearman_handler.processRequest(request, - 'tenant_list') - - async def _handleStatusRequest(self, request): - return await self.gearman_handler.processRequest(request, 'status_get') - - async def _handleStatusChangeRequest(self, request): - change = request.match_info["change"] - return await self.gearman_handler.processRequest( - request, 'status_get', ChangeFilter(change)) - - async def _handleJobsRequest(self, request): - return await self.gearman_handler.processRequest(request, 'job_list') - - async def _handleKeyRequest(self, request): - return await self.gearman_handler.processRequest(request, 'key_get') - - async def _handleStatic(self, request): - # http://example.com//status.html comes in as '/status.html' - target_path = request.match_info['path'].lstrip('/') - fs_path = os.path.abspath(os.path.join(self.static_path, target_path)) - if not fs_path.startswith(os.path.abspath(self.static_path)): - return web.HTTPForbidden() - if not os.path.exists(fs_path): - return web.HTTPNotFound() - return web.FileResponse(fs_path) - - def run(self, loop=None): - """ - Run the websocket daemon. - - Because this method can be the target of a new thread, we need to - set the thread event loop here, rather than in __init__(). - - :param loop: The event loop to use. If not supplied, the default main - thread event loop is used. This should be supplied if ZuulWeb - is run within a separate (non-main) thread. - """ - sql_driver = self.connections.drivers['sql'] - routes = [ - ('GET', '/api/info', self._handleRootInfo), - ('GET', '/api/tenants', self._handleTenantsRequest), - ('GET', '/api/tenant/{tenant}/info', self._handleTenantInfo), - ('GET', '/api/tenant/{tenant}/status', self._handleStatusRequest), - ('GET', '/api/tenant/{tenant}/jobs', self._handleJobsRequest), - ('GET', '/api/tenant/{tenant}/status/change/{change}', - self._handleStatusChangeRequest), - ('GET', '/api/tenant/{tenant}/console-stream', - self._handleWebsocket), - ('GET', '/api/tenant/{tenant}/key/{project:.*}.pub', - self._handleKeyRequest), - ('GET', '/api/tenant/{tenant}/builds', - sql_driver.handleRequest), - ] - - static_routes = [ - StaticHandler(self, '/t/{tenant}/', 'status.html'), - StaticHandler(self, '/', 'tenants.html'), - ] - - for route in static_routes + self._plugin_routes: - routes.append((route.method, route.path, route.handleRequest)) + for connection in connections.connections.values(): + controller = connection.getWebController(self, self.info) + if controller: + cherrypy.tree.mount( + controller, + '/api/connection/%s' % connection.connection_name) # Add fallthrough routes at the end for the static html/js files - routes.append(('GET', '/t/{tenant}/{path:.*}', self._handleStatic)) - routes.append(('GET', '/{path:.*}', self._handleStatic)) + route_map.connect('root_static', '/{path:.*}', + controller=root_static, action='default') + route_map.connect('tenant_static', '/t/{tenant}/{path:.*}', + controller=tenant_static, action='default') + conf = { + '/': { + 'request.dispatch': route_map + } + } + cherrypy.config.update({ + 'global': { + 'environment': 'production', + 'server.socket_host': listen_address, + 'server.socket_port': listen_port, + }, + }) + + cherrypy.tree.mount(api, '/', config=conf) + + @property + def port(self): + return cherrypy.server.bound_addr[1] + + def start(self): self.log.debug("ZuulWeb starting") - asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) - user_supplied_loop = loop is not None - if not loop: - loop = asyncio.get_event_loop() - asyncio.set_event_loop(loop) - - self.event_loop = loop - self.log_streaming_handler.setEventLoop(loop) - self.gearman_handler.setEventLoop(loop) - sql_driver.setEventLoop(loop) - - for handler in self._connection_handlers: - if hasattr(handler, 'setEventLoop'): - handler.setEventLoop(loop) - - app = web.Application() - for method, path, handler in routes: - app.router.add_route(method, path, handler) - handler = app.make_handler(loop=self.event_loop) - - # create the server - coro = self.event_loop.create_server(handler, - self.listen_address, - self.listen_port) - self.server = self.event_loop.run_until_complete(coro) - - self.term = asyncio.Future() - - # start the server - self.event_loop.run_until_complete(self.term) - - # cleanup - self.log.debug("ZuulWeb stopping") - self.server.close() - self.event_loop.run_until_complete(self.server.wait_closed()) - self.event_loop.run_until_complete(app.shutdown()) - self.event_loop.run_until_complete(handler.shutdown(60.0)) - self.event_loop.run_until_complete(app.cleanup()) - self.log.debug("ZuulWeb stopped") - - # Only run these if we are controlling the loop - they need to be - # run from the main thread - if not user_supplied_loop: - loop.stop() - loop.close() - - self.rpc.shutdown() + self.wsplugin = WebSocketPlugin(cherrypy.engine) + self.wsplugin.subscribe() + cherrypy.engine.start() def stop(self): - if self.event_loop and self.term: - self.event_loop.call_soon_threadsafe(self.term.set_result, True) + self.log.debug("ZuulWeb stopping") + self.rpc.shutdown() + cherrypy.engine.exit() + # Not strictly necessary, but without this, if the server is + # started again (e.g., in the unit tests) it will reuse the + # same host/port settings. + cherrypy.server.httpserver = None + self.wsplugin.unsubscribe() if __name__ == "__main__": logging.basicConfig(level=logging.DEBUG) - loop = asyncio.get_event_loop() - loop.set_debug(True) + import zuul.lib.connections + connections = zuul.lib.connections.ConnectionRegistry() z = ZuulWeb(listen_address="127.0.0.1", listen_port=9000, - gear_server="127.0.0.1", gear_port=4730) - z.run(loop) + gear_server="127.0.0.1", gear_port=4730, + connections=connections) + z.start() + cherrypy.engine.block() diff --git a/zuul/web/handler.py b/zuul/web/handler.py index ff631b2b85..75a9a7b443 100644 --- a/zuul/web/handler.py +++ b/zuul/web/handler.py @@ -12,49 +12,6 @@ # License for the specific language governing permissions and limitations # under the License. -import abc -import os -from aiohttp import web - - -class BaseWebHandler(object, metaclass=abc.ABCMeta): - - def __init__(self, connection, zuul_web, method, path): - self.connection = connection - self.zuul_web = zuul_web - self.method = method - self.path = path - - @abc.abstractmethod - async def handleRequest(self, request): - """Process a web request.""" - - -class BaseDriverWebHandler(BaseWebHandler): - - def __init__(self, connection, zuul_web, method, path): - super(BaseDriverWebHandler, self).__init__( - connection=connection, zuul_web=zuul_web, method=method, path=path) - if path.startswith('/'): - path = path[1:] - self.path = '/api/connection/{connection}/{path}'.format( - connection=self.connection.connection_name, - path=path) - - -class StaticHandler(BaseWebHandler): - - def __init__(self, zuul_web, path, file_path=None): - super(StaticHandler, self).__init__(None, zuul_web, 'GET', path) - self.static_path = zuul_web.static_path - self.file_path = file_path or path.split('/')[-1] - - async def handleRequest(self, request): - """Process a web request.""" - headers = {} - fp = os.path.join(self.static_path, self.file_path) - if self.zuul_web.static_cache_expiry: - headers['Cache-Control'] = "public, max-age=%d" % \ - self.zuul_web.static_cache_expiry - return web.FileResponse(fp, headers=headers) +class BaseWebController(object): + pass