Merge "Add html based websocket client for console stream" into feature/zuulv3

This commit is contained in:
Jenkins 2017-07-19 16:05:35 +00:00 committed by Gerrit Code Review
commit a8b35774b3
6 changed files with 172 additions and 21 deletions

View File

@ -385,6 +385,10 @@ web
port=9000
**websocket_url**
Base URL on which the websocket service is exposed, if different than the
base URL of the web app.
Operation
~~~~~~~~~

View File

@ -2289,11 +2289,15 @@ class TestScheduler(ZuulTestCase):
status_jobs.append(job)
self.assertEqual('project-merge', status_jobs[0]['name'])
# TODO(mordred) pull uuids from self.builds
self.assertEqual(
'static/stream.html?uuid={uuid}&logfile=console.log'.format(
uuid=status_jobs[0]['uuid']),
status_jobs[0]['url'])
self.assertEqual(
'finger://{hostname}/{uuid}'.format(
hostname=self.executor_server.hostname,
uuid=status_jobs[0]['uuid']),
status_jobs[0]['url'])
status_jobs[0]['finger_url'])
# TOOD(mordred) configure a success-url on the base job
self.assertEqual(
'finger://{hostname}/{uuid}'.format(
@ -2301,11 +2305,15 @@ class TestScheduler(ZuulTestCase):
uuid=status_jobs[0]['uuid']),
status_jobs[0]['report_url'])
self.assertEqual('project-test1', status_jobs[1]['name'])
self.assertEqual(
'static/stream.html?uuid={uuid}&logfile=console.log'.format(
uuid=status_jobs[1]['uuid']),
status_jobs[1]['url'])
self.assertEqual(
'finger://{hostname}/{uuid}'.format(
hostname=self.executor_server.hostname,
uuid=status_jobs[1]['uuid']),
status_jobs[1]['url'])
status_jobs[1]['finger_url'])
self.assertEqual(
'finger://{hostname}/{uuid}'.format(
hostname=self.executor_server.hostname,
@ -2313,11 +2321,15 @@ class TestScheduler(ZuulTestCase):
status_jobs[1]['report_url'])
self.assertEqual('project-test2', status_jobs[2]['name'])
self.assertEqual(
'static/stream.html?uuid={uuid}&logfile=console.log'.format(
uuid=status_jobs[2]['uuid']),
status_jobs[2]['url'])
self.assertEqual(
'finger://{hostname}/{uuid}'.format(
hostname=self.executor_server.hostname,
uuid=status_jobs[2]['uuid']),
status_jobs[2]['url'])
status_jobs[2]['finger_url'])
self.assertEqual(
'finger://{hostname}/{uuid}'.format(
hostname=self.executor_server.hostname,
@ -3606,11 +3618,14 @@ For CI problems and help debugging, contact ci@example.org"""
self.assertEqual('project-merge', job['name'])
self.assertEqual('gate', job['pipeline'])
self.assertEqual(False, job['retry'])
self.assertEqual(
'static/stream.html?uuid={uuid}&logfile=console.log'
.format(uuid=job['uuid']), job['url'])
self.assertEqual(
'finger://{hostname}/{uuid}'.format(
hostname=self.executor_server.hostname,
uuid=job['uuid']),
job['url'])
job['finger_url'])
self.assertEqual(2, len(job['worker']))
self.assertEqual(False, job['canceled'])
self.assertEqual(True, job['voting'])

View File

@ -164,7 +164,7 @@ class Pipeline(object):
items.extend(shared_queue.queue)
return items
def formatStatusJSON(self):
def formatStatusJSON(self, websocket_url=None):
j_pipeline = dict(name=self.name,
description=self.description)
j_queues = []
@ -181,7 +181,7 @@ class Pipeline(object):
if j_changes:
j_queue['heads'].append(j_changes)
j_changes = []
j_changes.append(e.formatJSON())
j_changes.append(e.formatJSON(websocket_url))
if (len(j_changes) > 1 and
(j_changes[-2]['remaining_time'] is not None) and
(j_changes[-1]['remaining_time'] is not None)):
@ -1673,7 +1673,7 @@ class QueueItem(object):
url = default_url or build.url or job.name
return (result, url)
def formatJSON(self):
def formatJSON(self, websocket_url=None):
ret = {}
ret['active'] = self.active
ret['live'] = self.live
@ -1710,11 +1710,20 @@ class QueueItem(object):
remaining = None
result = None
build_url = None
finger_url = None
report_url = None
worker = None
if build:
result = build.result
build_url = build.url
finger_url = build.url
# TODO(tobiash): add support for custom web root
urlformat = 'static/stream.html?' \
'uuid={build.uuid}&' \
'logfile=console.log'
if websocket_url:
urlformat += '&websocket_url={websocket_url}'
build_url = urlformat.format(
build=build, websocket_url=websocket_url)
(unused, report_url) = self.formatJobResult(job)
if build.start_time:
if build.end_time:
@ -1740,6 +1749,7 @@ class QueueItem(object):
'elapsed_time': elapsed,
'remaining_time': remaining,
'url': build_url,
'finger_url': finger_url,
'report_url': report_url,
'result': result,
'voting': job.voting,

View File

@ -889,6 +889,7 @@ class Scheduler(threading.Thread):
data = {}
data['zuul_version'] = self.zuul_version
websocket_url = get_default(self.config, 'web', 'websocket_url', None)
if self._pause:
ret = '<p><b>Queue only mode:</b> preparing to '
@ -912,5 +913,5 @@ class Scheduler(threading.Thread):
data['pipelines'] = pipelines
tenant = self.abide.tenants.get(tenant_name)
for pipeline in tenant.layout.pipelines.values():
pipelines.append(pipeline.formatStatusJSON())
pipelines.append(pipeline.formatStatusJSON(websocket_url))
return json.dumps(data)

View File

@ -18,6 +18,7 @@
import asyncio
import json
import logging
import os
import uvloop
import aiohttp
@ -25,6 +26,8 @@ from aiohttp import web
import zuul.rpcclient
STATIC_DIR = os.path.join(os.path.dirname(__file__), 'static')
class LogStreamingHandler(object):
log = logging.getLogger("zuul.web.LogStreamingHandler")
@ -39,11 +42,11 @@ class LogStreamingHandler(object):
self.ssl_ca = ssl_ca
def _getPortLocation(self, job_uuid):
'''
"""
Query Gearman for the executor running the given job.
:param str job_uuid: The job UUID we want to stream.
'''
"""
# TODO: Fetch the entire list of uuid/file/server/ports once and
# share that, and fetch a new list on cache misses perhaps?
# TODO: Avoid recreating a client for each request.
@ -55,14 +58,14 @@ class LogStreamingHandler(object):
return ret
async def _fingerClient(self, ws, server, port, job_uuid):
'''
"""
Create a client to connect to the finger streamer and pull results.
:param aiohttp.web.WebSocketResponse ws: The websocket response object.
:param str server: The executor server running the job.
:param str port: The executor server port.
:param str job_uuid: The job UUID to stream.
'''
"""
self.log.debug("Connecting to finger server %s:%s", server, port)
reader, writer = await asyncio.open_connection(host=server, port=port,
loop=self.event_loop)
@ -82,12 +85,12 @@ class LogStreamingHandler(object):
return
async def _streamLog(self, ws, request):
'''
"""
Stream the log for the requested job back to the client.
:param aiohttp.web.WebSocketResponse ws: The websocket response object.
:param dict request: The client request parameters.
'''
"""
for key in ('uuid', 'logfile'):
if key not in request:
return (4000, "'{key}' missing from request payload".format(
@ -112,11 +115,11 @@ class LogStreamingHandler(object):
return (1000, "No more data")
async def processRequest(self, request):
'''
"""
Handle a client websocket request for log streaming.
:param aiohttp.web.Request request: The client request.
'''
"""
try:
ws = web.WebSocketResponse()
await ws.prepare(request)
@ -161,6 +164,8 @@ class ZuulWeb(object):
self.ssl_key = ssl_key
self.ssl_cert = ssl_cert
self.ssl_ca = ssl_ca
self.event_loop = None
self.term = None
async def _handleWebsocket(self, request):
handler = LogStreamingHandler(self.event_loop,
@ -169,7 +174,7 @@ class ZuulWeb(object):
return await handler.processRequest(request)
def run(self, loop=None):
'''
"""
Run the websocket daemon.
Because this method can be the target of a new thread, we need to
@ -178,9 +183,9 @@ class ZuulWeb(object):
:param loop: The event loop to use. If not supplied, the default main
thread event loop is used. This should be supplied if ZuulWeb
is run within a separate (non-main) thread.
'''
"""
routes = [
('GET', '/console-stream', self._handleWebsocket)
('GET', '/console-stream', self._handleWebsocket),
]
self.log.debug("ZuulWeb starting")
@ -195,6 +200,7 @@ class ZuulWeb(object):
app = web.Application()
for method, path, handler in routes:
app.router.add_route(method, path, handler)
app.router.add_static('/static', STATIC_DIR)
handler = app.make_handler(loop=self.event_loop)
# create the server
@ -224,7 +230,8 @@ class ZuulWeb(object):
loop.close()
def stop(self):
self.event_loop.call_soon_threadsafe(self.term.set_result, True)
if self.event_loop and self.term:
self.event_loop.call_soon_threadsafe(self.term.set_result, True)
if __name__ == "__main__":

114
zuul/web/static/stream.html Normal file
View File

@ -0,0 +1,114 @@
<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01//EN"
"http://www.w3.org/TR/html4/strict.dtd">
<html>
<head>
<style type="text/css">
body {
font-family: monospace;
background-color: black;
color: lightgrey;
}
#overlay {
position: fixed;
top: 5px;
right: 5px;
background-color: darkgrey;
color: black;
}
pre {
white-space: pre;
margin: 0px 10px;
}
</style>
<script type="text/javascript">
function escapeLog(text) {
var pattern = /[<>&"']/g;
return text.replace(pattern, function(match) {
return '&#' + match.charCodeAt(0) + ';';
});
}
window.onload = function() {
pageUpdateInMS = 250;
var receiveBuffer = "";
var websocket_url = null
setInterval(function() {
console.log("autoScroll");
if (receiveBuffer != "") {
document.getElementById('pagecontent').innerHTML += receiveBuffer;
receiveBuffer = "";
if (document.getElementById('autoscroll').checked) {
window.scrollTo(0, document.body.scrollHeight);
}
}
}, pageUpdateInMS);
var url = new URL(window.location);
var params = {
uuid: url.searchParams.get('uuid')
}
document.getElementById('pagetitle').innerHTML = params['uuid'];
if (url.searchParams.has('logfile')) {
params['logfile'] = url.searchParams.get('logfile');
var logfile_suffix = "(" + params['logfile'] + ")";
document.getElementById('pagetitle').innerHTML += logfile_suffix;
}
if (url.searchParams.has('websocket_url')) {
params['websocket_url'] = url.searchParams.get('websocket_url');
} else {
// Websocket doesn't accept relative urls so construct an
// absolute one.
var protocol = '';
if (url['protocol'] == 'https:') {
protocol = 'wss://';
} else {
protocol = 'ws://';
}
path = url['pathname'].replace(/static\/.*$/g, '') + 'console-stream';
params['websocket_url'] = protocol + url['host'] + path;
}
var ws = new WebSocket(params['websocket_url']);
ws.onmessage = function(event) {
console.log("onmessage");
receiveBuffer = receiveBuffer + escapeLog(event.data);
};
ws.onopen = function(event) {
console.log("onopen");
ws.send(JSON.stringify(params));
};
ws.onclose = function(event) {
console.log("onclose");
receiveBuffer = receiveBuffer + "\n--- END OF STREAM ---\n";
};
};
</script>
<title id="pagetitle"></title>
</head>
<body>
<div id="overlay">
<form>
<input type="checkbox" id="autoscroll" checked> autoscroll
</form>
</div>
<pre id="pagecontent"></pre>
</body>
</html>