Browse Source

Replace use of aiohttp with cherrypy

* Aiohttp (and related libraries) have a python support policy
  which is causing us problems.
* Cherrypy supports threads which integrates well with the rest
  of Zuul.

Change-Id: Ib611df06035890d3e87fc5ad92fdfc7ac441edce
changes/59/567959/23
James E. Blair 3 years ago
parent
commit
0eeceba5a5
  1. 4
      doc/source/developer/ansible.rst
  2. 3
      requirements.txt
  3. 15
      tests/base.py
  4. 17
      tests/unit/test_github_driver.py
  5. 28
      tests/unit/test_streaming.py
  6. 19
      tests/unit/test_web.py
  7. 18
      zuul/cmd/web.py
  8. 16
      zuul/connection/__init__.py
  9. 54
      zuul/driver/github/githubconnection.py
  10. 42
      zuul/driver/sql/__init__.py
  11. 13
      zuul/driver/sql/sqlconnection.py
  12. 626
      zuul/web/__init__.py
  13. 47
      zuul/web/handler.py

4
doc/source/developer/ansible.rst

@ -43,7 +43,7 @@ that starts a log streaming daemon on the build node.
All jobs run with the :py:mod:`zuul.ansible.callback.zuul_stream` callback
plugin enabled, which writes the build log to a file so that the
:py:class:`zuul.lib.log_streamer.LogStreamer` can provide the data on demand
over the finger protocol. Finally, :py:class:`zuul.web.LogStreamingHandler`
over the finger protocol. Finally, :py:class:`zuul.web.LogStreamHandler`
exposes that log stream over a websocket connection as part of
:py:class:`zuul.web.ZuulWeb`.
@ -51,7 +51,7 @@ exposes that log stream over a websocket connection as part of
:members:
.. autoclass:: zuul.lib.log_streamer.LogStreamer
.. autoclass:: zuul.web.LogStreamingHandler
.. autoclass:: zuul.web.LogStreamHandler
.. autoclass:: zuul.web.ZuulWeb
In addition to real-time streaming, Zuul also installs another callback module,

3
requirements.txt

@ -29,3 +29,6 @@ uvloop;python_version>='3.5'
psutil
fb-re2>=1.0.6
paho-mqtt
cherrypy
ws4py
routes

15
tests/base.py

@ -15,7 +15,6 @@
# License for the specific language governing permissions and limitations
# under the License.
import asyncio
import configparser
from contextlib import contextmanager
import datetime
@ -1894,22 +1893,14 @@ class ZuulWebFixture(fixtures.Fixture):
listen_address='127.0.0.1', listen_port=0,
gear_server='127.0.0.1', gear_port=self.gearman_server_port,
info=zuul.model.WebInfo(),
_connections=self.connections)
loop = asyncio.new_event_loop()
loop.set_debug(True)
ws_thread = threading.Thread(target=self.web.run, args=(loop,))
ws_thread.start()
self.addCleanup(loop.close)
self.addCleanup(ws_thread.join)
connections=self.connections)
self.web.start()
self.addCleanup(self.web.stop)
self.host = 'localhost'
# Wait until web server is started
while True:
time.sleep(0.1)
if self.web.server is None:
continue
self.port = self.web.server.sockets[0].getsockname()[1]
self.port = self.web.port
try:
with socket.create_connection((self.host, self.port)):
break

17
tests/unit/test_github_driver.py

@ -12,8 +12,6 @@
# License for the specific language governing permissions and limitations
# under the License.
import asyncio
import threading
import os
import re
from testtools.matchers import MatchesRegex, StartsWith
@ -789,23 +787,14 @@ class TestGithubWebhook(ZuulTestCase):
self.web = zuul.web.ZuulWeb(
listen_address='127.0.0.1', listen_port=0,
gear_server='127.0.0.1', gear_port=self.gearman_server.port,
connections=[self.fake_github],
_connections=self.connections)
loop = asyncio.new_event_loop()
loop.set_debug(True)
ws_thread = threading.Thread(target=self.web.run, args=(loop,))
ws_thread.start()
self.addCleanup(loop.close)
self.addCleanup(ws_thread.join)
connections=self.connections)
self.web.start()
self.addCleanup(self.web.stop)
host = '127.0.0.1'
# Wait until web server is started
while True:
time.sleep(0.1)
if self.web.server is None:
continue
port = self.web.server.sockets[0].getsockname()[1]
port = self.web.port
try:
with socket.create_connection((host, port)):
break

28
tests/unit/test_streaming.py

@ -283,21 +283,13 @@ class TestStreaming(tests.base.AnsibleZuulTestCase):
listen_address='::', listen_port=0,
gear_server='127.0.0.1', gear_port=self.gearman_server.port,
static_path=tempfile.gettempdir(),
_connections=self.connections)
loop = asyncio.new_event_loop()
loop.set_debug(True)
ws_thread = threading.Thread(target=web_server.run, args=(loop,))
ws_thread.start()
self.addCleanup(loop.close)
self.addCleanup(ws_thread.join)
connections=self.connections)
web_server.start()
self.addCleanup(web_server.stop)
# Wait until web server is started
while True:
if web_server.server is None:
time.sleep(0.1)
continue
port = web_server.server.sockets[0].getsockname()[1]
port = web_server.port
try:
with socket.create_connection((self.host, port)):
break
@ -374,21 +366,13 @@ class TestStreaming(tests.base.AnsibleZuulTestCase):
listen_address='::', listen_port=0,
gear_server='127.0.0.1', gear_port=self.gearman_server.port,
static_path=tempfile.gettempdir(),
_connections=self.connections)
loop = asyncio.new_event_loop()
loop.set_debug(True)
ws_thread = threading.Thread(target=web_server.run, args=(loop,))
ws_thread.start()
self.addCleanup(loop.close)
self.addCleanup(ws_thread.join)
connections=self.connections)
web_server.start()
self.addCleanup(web_server.stop)
# Wait until web server is started
while True:
if web_server.server is None:
time.sleep(0.1)
continue
port = web_server.server.sockets[0].getsockname()[1]
port = web_server.port
try:
with socket.create_connection((self.host, port)):
break

19
tests/unit/test_web.py

@ -15,12 +15,9 @@
# License for the specific language governing permissions and limitations
# under the License.
import asyncio
import json
import threading
import os
import urllib.parse
import time
import socket
import requests
@ -63,25 +60,15 @@ class BaseTestWeb(ZuulTestCase):
listen_address='127.0.0.1', listen_port=0,
gear_server='127.0.0.1', gear_port=self.gearman_server.port,
info=zuul.model.WebInfo.fromConfig(self.zuul_ini_config),
connections=self.connections.connections.values(),
_connections=self.connections
connections=self.connections
)
loop = asyncio.new_event_loop()
loop.set_debug(True)
ws_thread = threading.Thread(target=self.web.run, args=(loop,))
ws_thread.start()
self.addCleanup(loop.close)
self.addCleanup(ws_thread.join)
self.web.start()
self.addCleanup(self.web.stop)
self.host = 'localhost'
self.port = self.web.port
# Wait until web server is started
while True:
time.sleep(0.1)
if self.web.server is None:
continue
self.port = self.web.server.sockets[0].getsockname()[1]
print(self.host, self.port)
try:
with socket.create_connection((self.host, self.port)):
break

18
zuul/cmd/web.py

@ -13,11 +13,9 @@
# License for the specific language governing permissions and limitations
# under the License.
import asyncio
import logging
import signal
import sys
import threading
import zuul.cmd
import zuul.model
@ -55,13 +53,11 @@ class WebServer(zuul.cmd.ZuulDaemonApp):
params['ssl_cert'] = get_default(self.config, 'gearman', 'ssl_cert')
params['ssl_ca'] = get_default(self.config, 'gearman', 'ssl_ca')
params['_connections'] = self.connections
params['connections'] = []
params['connections'] = self.connections
# Validate config here before we spin up the ZuulWeb object
for conn_name, connection in self.connections.connections.items():
try:
if connection.validateWebConfig(self.config, self.connections):
params['connections'].append(connection)
connection.validateWebConfig(self.config, self.connections)
except Exception:
self.log.exception("Error validating config")
sys.exit(1)
@ -72,15 +68,11 @@ class WebServer(zuul.cmd.ZuulDaemonApp):
self.log.exception("Error creating ZuulWeb:")
sys.exit(1)
loop = asyncio.get_event_loop()
signal.signal(signal.SIGUSR1, self.exit_handler)
signal.signal(signal.SIGTERM, self.exit_handler)
self.log.info('Zuul Web Server starting')
self.thread = threading.Thread(target=self.web.run,
args=(loop,),
name='web')
self.thread.start()
self.web.start()
try:
signal.pause()
@ -88,9 +80,7 @@ class WebServer(zuul.cmd.ZuulDaemonApp):
print("Ctrl + C: asking web server to exit nicely...\n")
self.exit_handler(signal.SIGINT, None)
self.thread.join()
loop.stop()
loop.close()
self.web.stop()
self.log.info("Zuul Web Server stopped")
def run(self):

16
zuul/connection/__init__.py

@ -75,26 +75,20 @@ class BaseConnection(object, metaclass=abc.ABCMeta):
still in use. Anything in our cache that isn't in the supplied
list should be safe to remove from the cache."""
def getWebHandlers(self, zuul_web, info):
"""Return a list of web handlers to register with zuul-web.
def getWebController(self, zuul_web, info):
"""Return a cherrypy web controller to register with zuul-web.
:param zuul.web.ZuulWeb zuul_web:
Zuul Web instance.
:param zuul.model.WebInfo info:
The WebInfo object for the Zuul Web instance. Can be used by
plugins to toggle API capabilities.
:returns: List of `zuul.web.handler.BaseWebHandler` instances.
:returns: A `zuul.web.handler.BaseWebController` instance.
"""
return []
return None
def validateWebConfig(self, config, connections):
"""Validate config and determine whether to register web handlers.
By default this method returns False, which means this connection
has no web handlers to register.
If the method returns True, then its `getWebHandlers` method
should be called during route registration.
"""Validate web config.
If there is a fatal error, the method should raise an exception.

54
zuul/driver/github/githubconnection.py

@ -12,7 +12,6 @@
# License for the specific language governing permissions and limitations
# under the License.
import asyncio
import collections
import datetime
import logging
@ -25,7 +24,7 @@ import re
import json
import traceback
from aiohttp import web
import cherrypy
import cachecontrol
from cachecontrol.cache import DictCache
from cachecontrol.heuristics import BaseHeuristic
@ -39,7 +38,7 @@ import github3.exceptions
import gear
from zuul.connection import BaseConnection
from zuul.web.handler import BaseDriverWebHandler
from zuul.web.handler import BaseWebController
from zuul.lib.config import get_default
from zuul.model import Ref, Branch, Tag, Project
from zuul.exceptions import MergeFailure
@ -1149,8 +1148,8 @@ class GithubConnection(BaseConnection):
return statuses
def getWebHandlers(self, zuul_web, info):
return [GithubWebhookHandler(self, zuul_web, 'POST', 'payload')]
def getWebController(self, zuul_web, info):
return GithubWebController(zuul_web, self)
def validateWebConfig(self, config, connections):
if 'webhook_token' not in self.connection_config:
@ -1160,21 +1159,20 @@ class GithubConnection(BaseConnection):
return True
class GithubWebhookHandler(BaseDriverWebHandler):
class GithubWebController(BaseWebController):
log = logging.getLogger("zuul.GithubWebhookHandler")
log = logging.getLogger("zuul.GithubWebController")
def __init__(self, connection, zuul_web, method, path):
super(GithubWebhookHandler, self).__init__(
connection=connection, zuul_web=zuul_web, method=method, path=path)
def __init__(self, zuul_web, connection):
self.connection = connection
self.zuul_web = zuul_web
self.token = self.connection.connection_config.get('webhook_token')
def _validate_signature(self, body, headers):
try:
request_signature = headers['x-hub-signature']
except KeyError:
raise web.HTTPUnauthorized(
reason='X-Hub-Signature header missing.')
raise cherrypy.HTTPError(401, 'X-Hub-Signature header missing.')
payload_signature = _sign_request(body, self.token)
@ -1182,16 +1180,16 @@ class GithubWebhookHandler(BaseDriverWebHandler):
self.log.debug("Request Signature: {0}".format(str(request_signature)))
if not hmac.compare_digest(
str(payload_signature), str(request_signature)):
raise web.HTTPUnauthorized(
reason=('Request signature does not match calculated payload '
'signature. Check that secret is correct.'))
raise cherrypy.HTTPError(
401,
'Request signature does not match calculated payload '
'signature. Check that secret is correct.')
return True
def setEventLoop(self, event_loop):
self.event_loop = event_loop
async def handleRequest(self, request):
@cherrypy.expose
@cherrypy.tools.json_out(content_type='application/json; charset=utf-8')
def payload(self):
# Note(tobiash): We need to normalize the headers. Otherwise we will
# have trouble to get them from the dict afterwards.
# e.g.
@ -1202,28 +1200,22 @@ class GithubWebhookHandler(BaseDriverWebHandler):
# modifies the header casing in its own way and by specification http
# headers are case insensitive so just lowercase all so we don't have
# to take care later.
# Note(corvus): Don't use cherrypy's json_in here so that we
# can validate the signature.
headers = dict()
for key, value in request.headers.items():
for key, value in cherrypy.request.headers.items():
headers[key.lower()] = value
body = await request.read()
body = cherrypy.request.body.read()
self._validate_signature(body, headers)
# We cannot send the raw body through gearman, so it's easy to just
# encode it as json, after decoding it as utf-8
json_body = json.loads(body.decode('utf-8'))
gear_task = self.event_loop.run_in_executor(
None, self.zuul_web.rpc.submitJob,
job = self.zuul_web.rpc.submitJob(
'github:%s:payload' % self.connection.connection_name,
{'headers': headers, 'body': json_body})
try:
job = await asyncio.wait_for(gear_task, 300)
except asyncio.TimeoutError:
self.log.exception("Gearman timeout:")
return web.json_response({'error_description': 'Internal error'},
status=500)
return web.json_response(json.loads(job.data[0]))
return json.loads(job.data[0])
def _status_as_tuple(status):

42
zuul/driver/sql/__init__.py

@ -12,10 +12,6 @@
# License for the specific language governing permissions and limitations
# under the License.
import logging
from aiohttp import web
import urllib.parse
from zuul.driver import Driver, ConnectionInterface, ReporterInterface
from zuul.driver.sql import sqlconnection
from zuul.driver.sql import sqlreporter
@ -23,7 +19,6 @@ from zuul.driver.sql import sqlreporter
class SQLDriver(Driver, ConnectionInterface, ReporterInterface):
name = 'sql'
log = logging.getLogger("zuul.SQLDriver")
def __init__(self):
self.tenant_connections = {}
@ -57,40 +52,3 @@ class SQLDriver(Driver, ConnectionInterface, ReporterInterface):
def getReporterSchema(self):
return sqlreporter.getSchema()
# TODO(corvus): these are temporary, remove after cherrypy conversion
def setEventLoop(self, event_loop):
self.event_loop = event_loop
async def handleRequest(self, request):
tenant_name = request.match_info["tenant"]
connection = self.tenant_connections.get(tenant_name)
if not connection:
return
try:
args = {
'buildset_filters': {},
'build_filters': {},
'limit': 50,
'skip': 0,
'tenant': tenant_name,
}
for k, v in urllib.parse.parse_qsl(request.rel_url.query_string):
if k in ("project", "pipeline", "change", "branch",
"patchset", "ref", "newrev"):
args['buildset_filters'].setdefault(k, []).append(v)
elif k in ("uuid", "job_name", "voting", "node_name",
"result"):
args['build_filters'].setdefault(k, []).append(v)
elif k in ("limit", "skip"):
args[k] = int(v)
else:
raise ValueError("Unknown parameter %s" % k)
data = await connection.get_builds(args, self.event_loop)
resp = web.json_response(data)
resp.headers['Access-Control-Allow-Origin'] = '*'
except Exception as e:
self.log.exception("Jobs exception:")
resp = web.json_response({'error_description': 'Internal error'},
status=500)
return resp

13
zuul/driver/sql/sqlconnection.py

@ -12,7 +12,6 @@
# License for the specific language governing permissions and limitations
# under the License.
import asyncio
import logging
import alembic
@ -157,19 +156,11 @@ class SQLConnection(BaseConnection):
return query.limit(args['limit']).offset(args['skip']).order_by(
build.c.id.desc())
async def get_builds(self, args, event_loop):
def get_builds(self, args):
"""Return a list of build"""
builds = []
with self.engine.begin() as conn:
query = self.query(args)
query_task = event_loop.run_in_executor(
None,
conn.execute,
query
)
rows = await asyncio.wait_for(query_task, 30)
for row in rows:
for row in conn.execute(self.query(args)):
build = dict(row)
# Convert date to iso format
if row.start_time:

626
zuul/web/__init__.py

@ -15,75 +15,83 @@
# limitations under the License.
import asyncio
import cherrypy
import socket
from ws4py.server.cherrypyserver import WebSocketPlugin, WebSocketTool
from ws4py.websocket import WebSocket
import codecs
import copy
import json
import logging
import os
import time
import uvloop
import aiohttp
from aiohttp import web
import zuul.model
import zuul.rpcclient
from zuul.web.handler import StaticHandler
STATIC_DIR = os.path.join(os.path.dirname(__file__), 'static')
cherrypy.tools.websocket = WebSocketTool()
class LogStreamingHandler(object):
log = logging.getLogger("zuul.web.LogStreamingHandler")
class SaveParamsTool(cherrypy.Tool):
"""
Save the URL parameters to allow them to take precedence over query
string parameters.
"""
def __init__(self):
cherrypy.Tool.__init__(self, 'on_start_resource',
self.saveParams)
def __init__(self, rpc):
self.rpc = rpc
def _setup(self):
cherrypy.Tool._setup(self)
cherrypy.request.hooks.attach('before_handler',
self.restoreParams)
def setEventLoop(self, event_loop):
self.event_loop = event_loop
def saveParams(self, restore=True):
cherrypy.request.url_params = cherrypy.request.params.copy()
cherrypy.request.url_params_restore = restore
async def _fingerClient(self, ws, server, port, job_uuid):
"""
Create a client to connect to the finger streamer and pull results.
def restoreParams(self):
if cherrypy.request.url_params_restore:
cherrypy.request.params.update(cherrypy.request.url_params)
:param aiohttp.web.WebSocketResponse ws: The websocket response object.
:param str server: The executor server running the job.
:param str port: The executor server port.
:param str job_uuid: The job UUID to stream.
"""
self.log.debug("Connecting to finger server %s:%s", server, port)
reader, writer = await asyncio.open_connection(host=server, port=port,
loop=self.event_loop)
self.log.debug("Sending finger request for %s", job_uuid)
msg = "%s\n" % job_uuid # Must have a trailing newline!
cherrypy.tools.save_params = SaveParamsTool()
writer.write(msg.encode('utf8'))
await writer.drain()
Decoder = codecs.getincrementaldecoder('utf8')
decoder = Decoder()
class ChangeFilter(object):
def __init__(self, desired):
self.desired = desired
while True:
data = await reader.read(1024)
if data:
data = decoder.decode(data)
if data:
await ws.send_str(data)
else:
# Make sure we flush anything left in the decoder
data = decoder.decode(b'', final=True)
if data:
await ws.send_str(data)
writer.close()
return
def filterPayload(self, payload):
status = []
for pipeline in payload['pipelines']:
for change_queue in pipeline['change_queues']:
for head in change_queue['heads']:
for change in head:
if self.wantChange(change):
status.append(copy.deepcopy(change))
return status
def wantChange(self, change):
return change['id'] == self.desired
class LogStreamHandler(WebSocket):
log = logging.getLogger("zuul.web")
async def _streamLog(self, ws, request):
def received_message(self, message):
if message.is_text:
req = json.loads(message.data.decode('utf-8'))
self.log.debug("Websocket request: %s", req)
code, msg = self._streamLog(req)
self.log.debug("close Websocket request: %s %s", code, msg)
self.close(code, msg)
def _streamLog(self, request):
"""
Stream the log for the requested job back to the client.
:param aiohttp.web.WebSocketResponse ws: The websocket response object.
:param dict request: The client request parameters.
"""
for key in ('uuid', 'logfile'):
@ -91,173 +99,200 @@ class LogStreamingHandler(object):
return (4000, "'{key}' missing from request payload".format(
key=key))
# Schedule the blocking gearman work in an Executor
gear_task = self.event_loop.run_in_executor(
None,
self.rpc.get_job_log_stream_address,
request['uuid'],
)
try:
port_location = await asyncio.wait_for(gear_task, 10)
except asyncio.TimeoutError:
return (4010, "Gearman timeout")
port_location = self.rpc.get_job_log_stream_address(request['uuid'])
if not port_location:
return (4011, "Error with Gearman")
try:
await self._fingerClient(
ws, port_location['server'], port_location['port'],
request['uuid']
)
except Exception as e:
self.log.exception("Finger client exception:")
await ws.send_str("Failure from finger client: %s" % e)
self._fingerClient(
port_location['server'], port_location['port'],
request['uuid'])
return (1000, "No more data")
async def processRequest(self, request):
def _fingerClient(self, server, port, build_uuid):
"""
Handle a client websocket request for log streaming.
Create a client to connect to the finger streamer and pull results.
:param aiohttp.web.Request request: The client request.
:param str server: The executor server running the job.
:param str port: The executor server port.
:param str build_uuid: The build UUID to stream.
"""
try:
ws = web.WebSocketResponse()
await ws.prepare(request)
async for msg in ws:
if msg.type == aiohttp.WSMsgType.TEXT:
req = json.loads(msg.data)
self.log.debug("Websocket request: %s", req)
code, msg = await self._streamLog(ws, req)
# We expect to process only a single message. I.e., we
# can stream only a single file at a time.
await ws.close(code=code, message=msg)
break
elif msg.type == aiohttp.WSMsgType.ERROR:
self.log.error(
"Websocket connection closed with exception %s",
ws.exception()
)
break
elif msg.type == aiohttp.WSMsgType.CLOSED:
break
except asyncio.CancelledError:
self.log.debug("Websocket request handling cancelled")
pass
except Exception as e:
self.log.exception("Websocket exception:")
await ws.close(code=4009, message=str(e).encode('utf-8'))
return ws
class GearmanHandler(object):
log = logging.getLogger("zuul.web.GearmanHandler")
# Tenant status cache expiry
cache_expiry = 1
def __init__(self, rpc):
self.rpc = rpc
self.log.debug("Connecting to finger server %s:%s", server, port)
Decoder = codecs.getincrementaldecoder('utf8')
decoder = Decoder()
with socket.create_connection((server, port), timeout=10) as s:
# timeout only on the connection, let recv() wait forever
s.settimeout(None)
msg = "%s\n" % build_uuid # Must have a trailing newline!
s.sendall(msg.encode('utf-8'))
while True:
data = s.recv(1024)
if data:
data = decoder.decode(data)
if data:
self.send(data, False)
else:
# Make sure we flush anything left in the decoder
data = decoder.decode(b'', final=True)
if data:
self.send(data, False)
self.close()
return
class ZuulWebAPI(object):
log = logging.getLogger("zuul.web")
def __init__(self, zuulweb):
self.rpc = zuulweb.rpc
self.zuulweb = zuulweb
self.cache = {}
self.cache_time = {}
self.controllers = {
'tenant_list': self.tenant_list,
'status_get': self.status_get,
'job_list': self.job_list,
'key_get': self.key_get,
}
self.cache_expiry = 1
self.static_cache_expiry = zuulweb.static_cache_expiry
@cherrypy.expose
@cherrypy.tools.json_out(content_type='application/json; charset=utf-8')
def info(self):
return self._handleInfo(self.zuulweb.info)
@cherrypy.expose
@cherrypy.tools.save_params()
@cherrypy.tools.json_out(content_type='application/json; charset=utf-8')
def tenant_info(self, tenant):
info = self.zuulweb.info.copy()
info.tenant = tenant
return self._handleInfo(info)
def _handleInfo(self, info):
ret = {'info': info.toDict()}
resp = cherrypy.response
resp.headers['Access-Control-Allow-Origin'] = '*'
if self.static_cache_expiry:
resp.headers['Cache-Control'] = "public, max-age=%d" % \
self.static_cache_expiry
resp.last_modified = self.zuulweb.start_time
return ret
@cherrypy.expose
@cherrypy.tools.json_out(content_type='application/json; charset=utf-8')
def tenants(self):
job = self.rpc.submitJob('zuul:tenant_list', {})
ret = json.loads(job.data[0])
resp = cherrypy.response
resp.headers['Access-Control-Allow-Origin'] = '*'
return ret
def setEventLoop(self, event_loop):
self.event_loop = event_loop
# TODO: At some point, we should make this use a gear.Client, rather than
# the RPC client, so we can use that to make async Gearman calls. This
# implementation will create additional threads by putting the call onto
# the asycio ThreadPool, which is not ideal.
async def asyncSubmitJob(self, name, data):
'''
Submit a job to Gearman asynchronously.
This will raise a asyncio.TimeoutError if we hit the timeout. It is
up to the caller to handle the exception.
'''
gear_task = self.event_loop.run_in_executor(
None, self.rpc.submitJob, name, data)
job = await asyncio.wait_for(gear_task, 300)
return job
async def tenant_list(self, request, result_filter=None):
job = await self.asyncSubmitJob('zuul:tenant_list', {})
return web.json_response(json.loads(job.data[0]))
async def status_get(self, request, result_filter=None):
tenant = request.match_info["tenant"]
def _getStatus(self, tenant):
if tenant not in self.cache or \
(time.time() - self.cache_time[tenant]) > self.cache_expiry:
job = await self.asyncSubmitJob('zuul:status_get',
{'tenant': tenant})
job = self.rpc.submitJob('zuul:status_get',
{'tenant': tenant})
self.cache[tenant] = json.loads(job.data[0])
self.cache_time[tenant] = time.time()
payload = self.cache[tenant]
if payload.get('code') == 404:
return web.HTTPNotFound(reason=payload['message'])
if result_filter:
payload = result_filter.filterPayload(payload)
resp = web.json_response(payload)
raise cherrypy.HTTPError(404, payload['message'])
resp = cherrypy.response
resp.headers["Cache-Control"] = "public, max-age=%d" % \
self.cache_expiry
resp.last_modified = self.cache_time[tenant]
return resp
async def job_list(self, request, result_filter=None):
tenant = request.match_info["tenant"]
job = await self.asyncSubmitJob('zuul:job_list', {'tenant': tenant})
return web.json_response(json.loads(job.data[0]))
async def key_get(self, request, result_filter=None):
tenant = request.match_info["tenant"]
project = request.match_info["project"]
job = await self.asyncSubmitJob('zuul:key_get', {'tenant': tenant,
'project': project})
return web.Response(body=job.data[0])
async def processRequest(self, request, action, result_filter=None):
resp = None
try:
resp = await self.controllers[action](request, result_filter)
resp.headers['Access-Control-Allow-Origin'] = '*'
except asyncio.CancelledError:
self.log.debug("request handling cancelled")
except Exception as e:
self.log.exception("exception:")
resp = web.json_response({'error_description': 'Internal error'},
status=500)
return resp
resp.headers["Last-modified"] = self.cache_time[tenant]
resp.headers['Access-Control-Allow-Origin'] = '*'
return payload
@cherrypy.expose
@cherrypy.tools.save_params()
@cherrypy.tools.json_out(content_type='application/json; charset=utf-8')
def status(self, tenant):
return self._getStatus(tenant)
@cherrypy.expose
@cherrypy.tools.save_params()
@cherrypy.tools.json_out(content_type='application/json; charset=utf-8')
def status_change(self, tenant, change):
payload = self._getStatus(tenant)
result_filter = ChangeFilter(change)
return result_filter.filterPayload(payload)
@cherrypy.expose
@cherrypy.tools.save_params()
@cherrypy.tools.json_out(content_type='application/json; charset=utf-8')
def jobs(self, tenant):
job = self.rpc.submitJob('zuul:job_list', {'tenant': tenant})
ret = json.loads(job.data[0])
resp = cherrypy.response
resp.headers['Access-Control-Allow-Origin'] = '*'
return ret
@cherrypy.expose
@cherrypy.tools.save_params()
def key(self, tenant, project):
job = self.rpc.submitJob('zuul:key_get', {'tenant': tenant,
'project': project})
resp = cherrypy.response
resp.headers['Access-Control-Allow-Origin'] = '*'
return job.data[0]
@cherrypy.expose
@cherrypy.tools.save_params()
@cherrypy.tools.json_out(content_type='application/json; charset=utf-8')
def builds(self, tenant, project=None, pipeline=None, change=None,
branch=None, patchset=None, ref=None, newrev=None,
uuid=None, job_name=None, voting=None, node_name=None,
result=None, limit=50, skip=0):
sql_driver = self.zuulweb.connections.drivers['sql']
connection = sql_driver.tenant_connections.get(tenant)
if not connection:
raise Exception("Unable to find connection for tenant %s" % tenant)
args = {
'buildset_filters': {'tenant': [tenant]},
'build_filters': {},
'limit': limit,
'skip': skip,
}
for k in ("project", "pipeline", "change", "branch",
"patchset", "ref", "newrev"):
v = locals()[k]
if v:
args['buildset_filters'].setdefault(k, []).append(v)
for k in ("uuid", "job_name", "voting", "node_name",
"result"):
v = locals()[k]
if v:
args['build_filters'].setdefault(k, []).append(v)
data = connection.get_builds(args)
resp = cherrypy.response
resp.headers['Access-Control-Allow-Origin'] = '*'
return data
class ChangeFilter(object):
def __init__(self, desired):
self.desired = desired
@cherrypy.expose
@cherrypy.tools.save_params()
@cherrypy.tools.websocket(handler_cls=LogStreamHandler)
def console_stream(self, tenant):
cherrypy.request.ws_handler.rpc = self.rpc
def filterPayload(self, payload):
status = []
for pipeline in payload['pipelines']:
for change_queue in pipeline['change_queues']:
for head in change_queue['heads']:
for change in head:
if self.wantChange(change):
status.append(copy.deepcopy(change))
return status
def wantChange(self, change):
return change['id'] == self.desired
class TenantStaticHandler(object):
def __init__(self, path):
self._cp_config = {
'tools.staticdir.on': True,
'tools.staticdir.dir': path,
'tools.staticdir.index': 'status.html',
}
class ZuulWeb(object):
class RootStaticHandler(object):
def __init__(self, path):
self._cp_config = {
'tools.staticdir.on': True,
'tools.staticdir.dir': path,
'tools.staticdir.index': 'tenants.html',
}
class ZuulWeb(object):
log = logging.getLogger("zuul.web.ZuulWeb")
def __init__(self, listen_address, listen_port,
@ -265,7 +300,6 @@ class ZuulWeb(object):
ssl_key=None, ssl_cert=None, ssl_ca=None,
static_cache_expiry=3600,
connections=None,
_connections=None,
info=None,
static_path=None):
self.start_time = time.time()
@ -276,168 +310,90 @@ class ZuulWeb(object):
self.server = None
self.static_cache_expiry = static_cache_expiry
self.info = info
self.static_path = static_path or STATIC_DIR
self.static_path = os.path.abspath(static_path or STATIC_DIR)
# instanciate handlers
self.rpc = zuul.rpcclient.RPCClient(gear_server, gear_port,
ssl_key, ssl_cert, ssl_ca)
self.log_streaming_handler = LogStreamingHandler(self.rpc)
self.gearman_handler = GearmanHandler(self.rpc)
self._plugin_routes = [] # type: List[zuul.web.handler.BaseWebHandler]
self._connection_handlers = []
connections = connections or []
for connection in connections:
self._connection_handlers.extend(
connection.getWebHandlers(self, self.info))
self.connections = _connections
self._plugin_routes.extend(self._connection_handlers)
async def _handleWebsocket(self, request):
return await self.log_streaming_handler.processRequest(
request)
def _handleRootInfo(self, request):
return self._handleInfo(self.info)
def _handleTenantInfo(self, request):
info = self.info.copy()
info.tenant = request.match_info["tenant"]
return self._handleInfo(info)
def _handleInfo(self, info):
resp = web.json_response({'info': info.toDict()}, status=200)
resp.headers['Access-Control-Allow-Origin'] = '*'
if self.static_cache_expiry:
resp.headers['Cache-Control'] = "public, max-age=%d" % \
self.static_cache_expiry
resp.last_modified = self.start_time
return resp
async def _handleTenantsRequest(self, request):
return await self.gearman_handler.processRequest(request,
'tenant_list')
async def _handleStatusRequest(self, request):
return await self.gearman_handler.processRequest(request, 'status_get')
async def _handleStatusChangeRequest(self, request):
change = request.match_info["change"]
return await self.gearman_handler.processRequest(
request, 'status_get', ChangeFilter(change))
async def _handleJobsRequest(self, request):
return await self.gearman_handler.processRequest(request, 'job_list')
async def _handleKeyRequest(self, request):
return await self.gearman_handler.processRequest(request, 'key_get')
async def _handleStatic(self, request):
# http://example.com//status.html comes in as '/status.html'
target_path = request.match_info['path'].lstrip('/')
fs_path = os.path.abspath(os.path.join(self.static_path, target_path))
if not fs_path.startswith(os.path.abspath(self.static_path)):
return web.HTTPForbidden()
if not os.path.exists(fs_path):
return web.HTTPNotFound()
return web.FileResponse(fs_path)
def run(self, loop=None):
"""
Run the websocket daemon.
self.connections = connections
route_map = cherrypy.dispatch.RoutesDispatcher()
api = ZuulWebAPI(self)
tenant_static = TenantStaticHandler(self.static_path)
root_static = RootStaticHandler(self.static_path)
route_map.connect('api', '/api/info',
controller=api, action='info')
route_map.connect('api', '/api/tenants',
controller=api, action='tenants')
route_map.connect('api', '/api/tenant/{tenant}/info',
controller=api, action='tenant_info')
route_map.connect('api', '/api/tenant/{tenant}/status',
controller=api, action='status')
route_map.connect('api', '/api/tenant/{tenant}/status/change/{change}',
controller=api, action='status_change')
route_map.connect('api', '/api/tenant/{tenant}/jobs',
controller=api, action='jobs')
route_map.connect('api', '/api/tenant/{tenant}/key/{project:.*}.pub',
controller=api, action='key')
route_map.connect('api', '/api/tenant/{tenant}/console-stream',
controller=api, action='console_stream')
route_map.connect('api', '/api/tenant/{tenant}/builds',
controller=api, action='builds')
for connection in connections.connections.values():
controller = connection.getWebController(self, self.info)
if controller:
cherrypy.tree.mount(
controller,
'/api/connection/%s' % connection.connection_name)
Because this method can be the target of a new thread, we need to
set the thread event loop here, rather than in __init__().
# Add fallthrough routes at the end for the static html/js files
route_map.connect('root_static', '/{path:.*}',
controller=root_static, action='default')
route_map.connect('tenant_static', '/t/{tenant}/{path:.*}',
controller=tenant_static, action='default')
conf = {
'/': {
'request.dispatch': route_map
}
}
cherrypy.config.update({
'global': {
'environment': 'production',
'server.socket_host': listen_address,
'server.socket_port': listen_port,
},
})
:param loop: The event loop to use. If not supplied, the default main
thread event loop is used. This should be supplied if ZuulWeb
is run within a separate (non-main) thread.
"""
sql_driver = self.connections.drivers['sql']
routes = [
('GET', '/api/info', self._handleRootInfo),
('GET', '/api/tenants', self._handleTenantsRequest),
('GET', '/api/tenant/{tenant}/info', self._handleTenantInfo),
('GET', '/api/tenant/{tenant}/status', self._handleStatusRequest),
('GET', '/api/tenant/{tenant}/jobs', self._handleJobsRequest),
('GET', '/api/tenant/{tenant}/status/change/{change}',
self._handleStatusChangeRequest),
('GET', '/api/tenant/{tenant}/console-stream',
self._handleWebsocket),
('GET', '/api/tenant/{tenant}/key/{project:.*}.pub',
self._handleKeyRequest),
('GET', '/api/tenant/{tenant}/builds',
sql_driver.handleRequest),
]
static_routes = [
StaticHandler(self, '/t/{tenant}/', 'status.html'),
StaticHandler(self, '/', 'tenants.html'),
]
for route in static_routes + self._plugin_routes:
routes.append((route.method, route.path, route.handleRequest))
cherrypy.tree.mount(api, '/', config=conf)
# Add fallthrough routes at the end for the static html/js files
routes.append(('GET', '/t/{tenant}/{path:.*}', self._handleStatic))
routes.append(('GET', '/{path:.*}', self._handleStatic))
@property
def port(self):
return cherrypy.server.bound_addr[1]
def start(self):
self.log.debug("ZuulWeb starting")
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
user_supplied_loop = loop is not None
if not loop:
loop = asyncio.get_event_loop()
asyncio.set_event_loop(loop)
self.event_loop = loop
self.log_streaming_handler.setEventLoop(loop)
self.gearman_handler.setEventLoop(loop)
sql_driver.setEventLoop(loop)
for handler in self._connection_handlers:
if hasattr(handler, 'setEventLoop'):
handler.setEventLoop(loop)
app = web.Application()
for method, path, handler in routes:
app.router.add_route(method, path, handler)
handler = app.make_handler(loop=self.event_loop)
# create the server
coro = self.event_loop.create_server(handler,
self.listen_address,
self.listen_port)
self.server = self.event_loop.run_until_complete(coro)
self.term = asyncio.Future()
# start the server
self.event_loop.run_until_complete(self.term)
# cleanup
self.log.debug("ZuulWeb stopping")
self.server.close()
self.event_loop.run_until_complete(self.server.wait_closed())
self.event_loop.run_until_complete(app.shutdown())
self.event_loop.run_until_complete(handler.shutdown(60.0))
self.event_loop.run_until_complete(app.cleanup())
self.log.debug("ZuulWeb stopped")
# Only run these if we are controlling the loop - they need to be
# run from the main thread
if not user_supplied_loop:
loop.stop()
loop.close()
self.rpc.shutdown()
self.wsplugin = WebSocketPlugin(cherrypy.engine)
self.wsplugin.subscribe()
cherrypy.engine.start()
def stop(self):
if self.event_loop and self.term:
self.event_loop.call_soon_threadsafe(self.term.set_result, True)
self.log.debug("ZuulWeb stopping")
self.rpc.shutdown()
cherrypy.engine.exit()
# Not strictly necessary, but without this, if the server is
# started again (e.g., in the unit tests) it will reuse the
# same host/port settings.
cherrypy.server.httpserver = None
self.wsplugin.unsubscribe()
if __name__ == "__main__":
logging.basicConfig(level=logging.DEBUG)
loop = asyncio.get_event_loop()
loop.set_debug(True)
import zuul.lib.connections
connections = zuul.lib.connections.ConnectionRegistry()
z = ZuulWeb(listen_address="127.0.0.1", listen_port=9000,
gear_server="127.0.0.1", gear_port=4730)
z.run(loop)
gear_server="127.0.0.1", gear_port=4730,
connections=connections)
z.start()
cherrypy.engine.block()

47
zuul/web/handler.py

@ -12,49 +12,6 @@
# License for the specific language governing permissions and limitations
# under the License.
import abc
import os
from aiohttp import web
class BaseWebHandler(object, metaclass=abc.ABCMeta):
def __init__(self, connection, zuul_web, method, path):
self.connection = connection
self.zuul_web = zuul_web
self.method = method
self.path = path
@abc.abstractmethod
async def handleRequest(self, request):
"""Process a web request."""
class BaseDriverWebHandler(BaseWebHandler):
def __init__(self, connection, zuul_web, method, path):
super(BaseDriverWebHandler, self).__init__(
connection=connection, zuul_web=zuul_web, method=method, path=path)
if path.startswith('/'):
path = path[1:]
self.path = '/api/connection/{connection}/{path}'.format(
connection=self.connection.connection_name,
path=path)
class StaticHandler(BaseWebHandler):
def __init__(self, zuul_web, path, file_path=None):
super(StaticHandler, self).__init__(None, zuul_web, 'GET', path)
self.static_path = zuul_web.static_path
self.file_path = file_path or path.split('/')[-1]
async def handleRequest(self, request):
"""Process a web request."""
headers = {}
fp = os.path.join(self.static_path, self.file_path)
if self.zuul_web.static_cache_expiry:
headers['Cache-Control'] = "public, max-age=%d" % \
self.zuul_web.static_cache_expiry
return web.FileResponse(fp, headers=headers)
class BaseWebController(object):
pass
Loading…
Cancel
Save