Add web-based console log streaming
Zuul now provides socket-based console streaming, which is super cool. In order to have parity with the Jenkins web-streaming experience, we also need to provide a websocket, since JavaScript running in a browser cannot connect to arbitrary ports on remote servers.

After surveying the existing Python websocket options, essentially all of them are built around twisted, eventlet, gevent or asyncio. That is not something we can easily accommodate in our current webob/paste structure, because it changes the fundamental HTTP handling. While we could write our own websocket server implementation, threaded like the rest of Zuul, that would be a large amount of work. Instead, we can run an async-based server that exists only for the websockets, so that we do not suddenly push async code into the rest of Zuul and wind up with a Frankenstein mix. Since this is new code, asyncio and Python 3 are an excellent starting place. aiohttp supports running a websocket server in a thread, and it also supports other HTTP/REST calls, so adopting aiohttp sets us up for a single answer for the HTTP tier.

To keep this from becoming an open socket relay, we expect two parameters in the first message on the websocket: the Zuul build uuid, and which log file to stream. (Multiple log files are not yet supported by the rest of Zuul, but one can imagine wanting that in the future, so it is part of the protocol.) The websocket server then asks Zuul over gearman for the IP and port associated with that build and logfile, and starts streaming it to the socket.

Ultimately we will want the status page to produce links of the form:

    /console.html?uuid=<uuid>&logfile=console.log

and we will want Apache to map the websocket server to something like /console.

Co-Authored-By: Monty Taylor <mordred@inaugust.com>
Change-Id: Idd0d3f9259e81fa9a60d7540664ce8d5ad2c298f
parent 0dbe15993a
commit 51139a0682
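For reference, the protocol described above can be exercised with a very small client. The sketch below is illustrative only and is not part of this change: it assumes a zuul-web instance listening on 127.0.0.1:9000, mirrors the aiohttp-1.x style calls used in the tests in this change (newer aiohttp requires awaiting send_str), and the build uuid argument is a placeholder.

    import asyncio
    import json

    import aiohttp


    async def stream_console(loop, build_uuid, logfile='console.log'):
        # Connect to the zuul-web websocket endpoint added by this change.
        session = aiohttp.ClientSession(loop=loop)
        async with session.ws_connect(
                'http://127.0.0.1:9000/console-stream') as ws:
            # The first message names the build and log file to stream;
            # everything after that is log data until the server closes.
            ws.send_str(json.dumps({'uuid': build_uuid, 'logfile': logfile}))
            async for msg in ws:
                if msg.type == aiohttp.WSMsgType.TEXT:
                    print(msg.data, end='')
                else:
                    break
        session.close()


    loop = asyncio.get_event_loop()
    # '<build uuid>' is a placeholder for a real zuul build uuid.
    loop.run_until_complete(stream_console(loop, '<build uuid>'))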
@@ -29,6 +29,10 @@ default_username=zuul
 trusted_ro_dirs=/opt/zuul-scripts:/var/cache
 trusted_rw_dirs=/opt/zuul-logs
 
+[web]
+listen_address=127.0.0.1
+port=9000
+
 [webapp]
 listen_address=0.0.0.0
 port=8001

@@ -24,3 +24,5 @@ cryptography>=1.6
 cachecontrol
 pyjwt
 iso8601
+aiohttp
+uvloop;python_version>='3.5'

@@ -26,6 +26,7 @@ console_scripts =
     zuul-cloner = zuul.cmd.cloner:main
     zuul-executor = zuul.cmd.executor:main
     zuul-bwrap = zuul.driver.bubblewrap:main
+    zuul-web = zuul.cmd.web:main
 
 [build_sphinx]
 source-dir = doc/source

@@ -1226,7 +1226,6 @@ class RecordingExecutorServer(zuul.executor.server.ExecutorServer):
         self.build_history = []
         self.fail_tests = {}
         self.job_builds = {}
-        self.hostname = 'zl.example.com'
 
     def failJob(self, name, change):
         """Instruct the executor to report matching builds as failures.

@@ -121,7 +121,8 @@ class TestSQLConnection(ZuulDBTestCase):
         self.assertEqual('project-merge', buildset0_builds[0]['job_name'])
         self.assertEqual("SUCCESS", buildset0_builds[0]['result'])
         self.assertEqual(
-            'finger://zl.example.com/{uuid}'.format(
+            'finger://{hostname}/{uuid}'.format(
+                hostname=self.executor_server.hostname,
                 uuid=buildset0_builds[0]['uuid']),
             buildset0_builds[0]['log_url'])
         self.assertEqual('check', buildset1['pipeline'])

@@ -144,7 +145,8 @@ class TestSQLConnection(ZuulDBTestCase):
         self.assertEqual('project-test1', buildset1_builds[-2]['job_name'])
         self.assertEqual("FAILURE", buildset1_builds[-2]['result'])
         self.assertEqual(
-            'finger://zl.example.com/{uuid}'.format(
+            'finger://{hostname}/{uuid}'.format(
+                hostname=self.executor_server.hostname,
                 uuid=buildset1_builds[-2]['uuid']),
             buildset1_builds[-2]['log_url'])
 

@@ -14,6 +14,10 @@
 # License for the specific language governing permissions and limitations
 # under the License.
 
+import aiohttp
+import asyncio
+import logging
+import json
 import os
 import os.path
 import socket

@@ -21,6 +25,7 @@ import tempfile
 import threading
 import time
 
+import zuul.web
 import zuul.lib.log_streamer
 import tests.base
 

@@ -57,6 +62,7 @@ class TestLogStreamer(tests.base.BaseTestCase):
 class TestStreaming(tests.base.AnsibleZuulTestCase):
 
     tenant_config_file = 'config/streamer/main.yaml'
+    log = logging.getLogger("zuul.test.test_log_streamer.TestStreaming")
 
     def setUp(self):
         super(TestStreaming, self).setUp()

@@ -146,9 +152,116 @@ class TestStreaming(tests.base.AnsibleZuulTestCase):
         # job and deleted. However, we still have a file handle to it, so we
         # can make sure that we read the entire contents at this point.
-        # Compact the returned lines into a single string for easy comparison.
-        file_contents = ''.join(logfile.readlines())
+        file_contents = logfile.read()
         logfile.close()
 
         self.log.debug("\n\nFile contents: %s\n\n", file_contents)
         self.log.debug("\n\nStreamed: %s\n\n", self.streaming_data)
         self.assertEqual(file_contents, self.streaming_data)
+
+    def runWSClient(self, build_uuid, event):
+        async def client(loop, build_uuid, event):
+            uri = 'http://127.0.0.1:9000/console-stream'
+            try:
+                session = aiohttp.ClientSession(loop=loop)
+                async with session.ws_connect(uri) as ws:
+                    req = {'uuid': build_uuid, 'logfile': None}
+                    ws.send_str(json.dumps(req))
+                    event.set()  # notify we are connected and req sent
+                    async for msg in ws:
+                        if msg.type == aiohttp.WSMsgType.TEXT:
+                            self.ws_client_results += msg.data
+                        elif msg.type == aiohttp.WSMsgType.CLOSED:
+                            break
+                        elif msg.type == aiohttp.WSMsgType.ERROR:
+                            break
+                session.close()
+            except Exception as e:
+                self.log.exception("client exception:")
+
+        loop = asyncio.new_event_loop()
+        loop.set_debug(True)
+        loop.run_until_complete(client(loop, build_uuid, event))
+        loop.close()
+
+    def test_websocket_streaming(self):
+        # Need to set the streaming port before submitting the job
+        finger_port = 7902
+        self.executor_server.log_streaming_port = finger_port
+
+        A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+        self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
+
+        # We don't have any real synchronization for the ansible jobs, so
+        # just wait until we get our running build.
+        while not len(self.builds):
+            time.sleep(0.1)
+        build = self.builds[0]
+        self.assertEqual(build.name, 'python27')
+
+        build_dir = os.path.join(self.executor_server.jobdir_root, build.uuid)
+        while not os.path.exists(build_dir):
+            time.sleep(0.1)
+
+        # Need to wait to make sure that jobdir gets set
+        while build.jobdir is None:
+            time.sleep(0.1)
+            build = self.builds[0]
+
+        # Wait for the job to begin running and create the ansible log file.
+        # The job waits to complete until the flag file exists, so we can
+        # safely access the log here. We only open it (to force a file handle
+        # to be kept open for it after the job finishes) but wait to read the
+        # contents until the job is done.
+        ansible_log = os.path.join(build.jobdir.log_root, 'job-output.txt')
+        while not os.path.exists(ansible_log):
+            time.sleep(0.1)
+        logfile = open(ansible_log, 'r')
+        self.addCleanup(logfile.close)
+
+        # Start the finger streamer daemon
+        streamer = zuul.lib.log_streamer.LogStreamer(
+            None, self.host, finger_port, self.executor_server.jobdir_root)
+        self.addCleanup(streamer.stop)
+
+        # Start the web server
+        web_server = zuul.web.ZuulWeb(
+            listen_address='127.0.0.1', listen_port=9000,
+            gear_server='127.0.0.1', gear_port=self.gearman_server.port)
+        loop = asyncio.new_event_loop()
+        loop.set_debug(True)
+        ws_thread = threading.Thread(target=web_server.run, args=(loop,))
+        ws_thread.start()
+        self.addCleanup(loop.close)
+        self.addCleanup(ws_thread.join)
+        self.addCleanup(web_server.stop)
+
+        # Wait until web server is started
+        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
+            while s.connect_ex((self.host, 9000)):
+                time.sleep(0.1)
+
+        # Start a thread with the websocket client
+        ws_client_event = threading.Event()
+        self.ws_client_results = ''
+        ws_client_thread = threading.Thread(
+            target=self.runWSClient, args=(build.uuid, ws_client_event)
+        )
+        ws_client_thread.start()
+        ws_client_event.wait()
+
+        # Allow the job to complete
+        flag_file = os.path.join(build_dir, 'test_wait')
+        open(flag_file, 'w').close()
+
+        # Wait for the websocket client to complete, which it should when
+        # it's received the full log.
+        ws_client_thread.join()
+
+        self.waitUntilSettled()
+
+        file_contents = logfile.read()
+        logfile.close()
+        self.log.debug("\n\nFile contents: %s\n\n", file_contents)
+        self.log.debug("\n\nStreamed: %s\n\n", self.ws_client_results)
+        self.assertEqual(file_contents, self.ws_client_results)

@@ -2289,22 +2289,40 @@ class TestScheduler(ZuulTestCase):
             status_jobs.append(job)
         self.assertEqual('project-merge', status_jobs[0]['name'])
         # TODO(mordred) pull uuids from self.builds
-        self.assertEqual('finger://zl.example.com/%s' % status_jobs[0]['uuid'],
-                         status_jobs[0]['url'])
+        self.assertEqual(
+            'finger://{hostname}/{uuid}'.format(
+                hostname=self.executor_server.hostname,
+                uuid=status_jobs[0]['uuid']),
+            status_jobs[0]['url'])
         # TOOD(mordred) configure a success-url on the base job
-        self.assertEqual('finger://zl.example.com/%s' % status_jobs[0]['uuid'],
-                         status_jobs[0]['report_url'])
+        self.assertEqual(
+            'finger://{hostname}/{uuid}'.format(
+                hostname=self.executor_server.hostname,
+                uuid=status_jobs[0]['uuid']),
+            status_jobs[0]['report_url'])
         self.assertEqual('project-test1', status_jobs[1]['name'])
-        self.assertEqual('finger://zl.example.com/%s' % status_jobs[1]['uuid'],
-                         status_jobs[1]['url'])
-        self.assertEqual('finger://zl.example.com/%s' % status_jobs[1]['uuid'],
-                         status_jobs[1]['report_url'])
+        self.assertEqual(
+            'finger://{hostname}/{uuid}'.format(
+                hostname=self.executor_server.hostname,
+                uuid=status_jobs[1]['uuid']),
+            status_jobs[1]['url'])
+        self.assertEqual(
+            'finger://{hostname}/{uuid}'.format(
+                hostname=self.executor_server.hostname,
+                uuid=status_jobs[1]['uuid']),
+            status_jobs[1]['report_url'])
 
         self.assertEqual('project-test2', status_jobs[2]['name'])
-        self.assertEqual('finger://zl.example.com/%s' % status_jobs[2]['uuid'],
-                         status_jobs[2]['url'])
-        self.assertEqual('finger://zl.example.com/%s' % status_jobs[2]['uuid'],
-                         status_jobs[2]['report_url'])
+        self.assertEqual(
+            'finger://{hostname}/{uuid}'.format(
+                hostname=self.executor_server.hostname,
+                uuid=status_jobs[2]['uuid']),
+            status_jobs[2]['url'])
+        self.assertEqual(
+            'finger://{hostname}/{uuid}'.format(
+                hostname=self.executor_server.hostname,
+                uuid=status_jobs[2]['uuid']),
+            status_jobs[2]['report_url'])
 
     def test_live_reconfiguration(self):
         "Test that live reconfiguration works"

@@ -3577,8 +3595,11 @@ For CI problems and help debugging, contact ci@example.org"""
         self.assertEqual('project-merge', job['name'])
         self.assertEqual('gate', job['pipeline'])
         self.assertEqual(False, job['retry'])
-        self.assertEqual('finger://zl.example.com/%s' % job['uuid'],
-                         job['url'])
+        self.assertEqual(
+            'finger://{hostname}/{uuid}'.format(
+                hostname=self.executor_server.hostname,
+                uuid=job['uuid']),
+            job['url'])
         self.assertEqual(2, len(job['worker']))
         self.assertEqual(False, job['canceled'])
         self.assertEqual(True, job['voting'])

@@ -4674,7 +4695,8 @@ class TestSchedulerSuccessURL(ZuulTestCase):
 
         # NOTE: This default URL is currently hard-coded in executor/server.py
         self.assertIn(
-            '- docs-draft-test2 finger://zl.example.com/{uuid}'.format(
+            '- docs-draft-test2 finger://{hostname}/{uuid}'.format(
+                hostname=self.executor_server.hostname,
                 uuid=uuid_test2),
             body[3])
 

@@ -0,0 +1,115 @@
+#!/usr/bin/env python
+# Copyright 2017 Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import argparse
+import asyncio
+import daemon
+import extras
+import logging
+import signal
+import sys
+import threading
+
+import zuul.cmd
+import zuul.web
+
+from zuul.lib.config import get_default
+
+# as of python-daemon 1.6 it doesn't bundle pidlockfile anymore
+# instead it depends on lockfile-0.9.1 which uses pidfile.
+pid_file_module = extras.try_imports(['daemon.pidlockfile', 'daemon.pidfile'])
+
+
+class WebServer(zuul.cmd.ZuulApp):
+
+    def parse_arguments(self):
+        parser = argparse.ArgumentParser(description='Zuul Web Server.')
+        parser.add_argument('-c', dest='config',
+                            help='specify the config file')
+        parser.add_argument('-d', dest='nodaemon', action='store_true',
+                            help='do not run as a daemon')
+        parser.add_argument('--version', dest='version', action='version',
+                            version=self._get_version(),
+                            help='show zuul version')
+        self.args = parser.parse_args()
+
+    def exit_handler(self, signum, frame):
+        self.web.stop()
+
+    def _main(self):
+        params = dict()
+
+        params['listen_address'] = get_default(self.config,
+                                               'web', 'listen_address',
+                                               '127.0.0.1')
+        params['listen_port'] = get_default(self.config, 'web', 'port', 9000)
+        params['gear_server'] = get_default(self.config, 'gearman', 'server')
+        params['gear_port'] = get_default(self.config, 'gearman', 'port', 4730)
+        params['ssl_key'] = get_default(self.config, 'gearman', 'ssl_key')
+        params['ssl_cert'] = get_default(self.config, 'gearman', 'ssl_cert')
+        params['ssl_ca'] = get_default(self.config, 'gearman', 'ssl_ca')
+
+        try:
+            self.web = zuul.web.ZuulWeb(**params)
+        except Exception as e:
+            self.log.exception("Error creating ZuulWeb:")
+            sys.exit(1)
+
+        loop = asyncio.get_event_loop()
+        signal.signal(signal.SIGUSR1, self.exit_handler)
+        signal.signal(signal.SIGTERM, self.exit_handler)
+
+        self.log.info('Zuul Web Server starting')
+        self.thread = threading.Thread(target=self.web.run,
+                                       args=(loop,),
+                                       name='web')
+        self.thread.start()
+
+        try:
+            signal.pause()
+        except KeyboardInterrupt:
+            print("Ctrl + C: asking web server to exit nicely...\n")
+            self.exit_handler(signal.SIGINT, None)
+
+        self.thread.join()
+        loop.stop()
+        loop.close()
+        self.log.info("Zuul Web Server stopped")
+
+    def main(self):
+        self.setup_logging('web', 'log_config')
+        self.log = logging.getLogger("zuul.WebServer")
+
+        try:
+            self._main()
+        except Exception:
+            self.log.exception("Exception from WebServer:")
+
+
+def main():
+    server = WebServer()
+    server.parse_arguments()
+    server.read_config()
+
+    pid_fn = get_default(server.config, 'web', 'pidfile',
+                         '/var/run/zuul-web/zuul-web.pid', expand_user=True)
+
+    pid = pid_file_module.TimeoutPIDLockFile(pid_fn, 10)
+
+    if server.args.nodaemon:
+        server.main()
+    else:
+        with daemon.DaemonContext(pidfile=pid):
+            server.main()

@@ -15,6 +15,7 @@
 # License for the specific language governing permissions and limitations
 # under the License.
 
+import logging
 import os
 import os.path
 import pwd

@@ -210,6 +211,8 @@ class LogStreamer(object):
     '''
 
     def __init__(self, user, host, port, jobdir_root):
+        self.log = logging.getLogger('zuul.lib.LogStreamer')
+        self.log.debug("LogStreamer starting on port %s", port)
         self.server = CustomForkingTCPServer((host, port),
                                              RequestHandler,
                                              user=user,

@@ -225,3 +228,4 @@ class LogStreamer(object):
         if self.thd.isAlive():
             self.server.shutdown()
             self.server.server_close()
+            self.log.debug("LogStreamer stopped")

@@ -86,3 +86,11 @@ class RPCClient(object):
 
     def shutdown(self):
        self.gearman.shutdown()
+
+    def get_job_log_stream_address(self, uuid, logfile='console.log'):
+        data = {'uuid': uuid, 'logfile': logfile}
+        job = self.submitJob('zuul:get_job_log_stream_address', data)
+        if job.failure:
+            return False
+        else:
+            return json.loads(job.data[0])

@@ -53,6 +53,7 @@ class RPCListener(object):
         self.worker.registerFunction("zuul:enqueue_ref")
         self.worker.registerFunction("zuul:promote")
         self.worker.registerFunction("zuul:get_running_jobs")
+        self.worker.registerFunction("zuul:get_job_log_stream_address")
 
     def stop(self):
         self.log.debug("Stopping")

@@ -173,3 +174,29 @@ class RPCListener(object):
             running_items.append(item.formatJSON())
 
         job.sendWorkComplete(json.dumps(running_items))
+
+    def handle_get_job_log_stream_address(self, job):
+        # TODO: map log files to ports. Currently there is only one
+        # log stream for a given job. But many jobs produce many
+        # log files, so this is forwards compatible with a future
+        # where there are more logs to potentially request than
+        # "console.log"
+        def find_build(uuid):
+            for tenant in self.sched.abide.tenants.values():
+                for pipeline_name, pipeline in tenant.layout.pipelines.items():
+                    for queue in pipeline.queues:
+                        for item in queue.queue:
+                            for bld in item.current_build_set.getBuilds():
+                                if bld.uuid == uuid:
+                                    return bld
+            return None
+
+        args = json.loads(job.arguments)
+        uuid = args['uuid']
+        # TODO: logfile = args['logfile']
+        job_log_stream_address = {}
+        build = find_build(uuid)
+        if build:
+            job_log_stream_address['server'] = build.worker.hostname
+            job_log_stream_address['port'] = build.worker.log_port
+        job.sendWorkComplete(json.dumps(job_log_stream_address))

@@ -0,0 +1,232 @@
+#!/usr/bin/env python
+# Copyright (c) 2017 Red Hat
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import asyncio
+import json
+import logging
+import uvloop
+
+import aiohttp
+from aiohttp import web
+
+import zuul.rpcclient
+
+
+class LogStreamingHandler(object):
+    log = logging.getLogger("zuul.web.LogStreamingHandler")
+
+    def __init__(self, loop, gear_server, gear_port,
+                 ssl_key=None, ssl_cert=None, ssl_ca=None):
+        self.event_loop = loop
+        self.gear_server = gear_server
+        self.gear_port = gear_port
+        self.ssl_key = ssl_key
+        self.ssl_cert = ssl_cert
+        self.ssl_ca = ssl_ca
+
+    def _getPortLocation(self, job_uuid):
+        '''
+        Query Gearman for the executor running the given job.
+
+        :param str job_uuid: The job UUID we want to stream.
+        '''
+        # TODO: Fetch the entire list of uuid/file/server/ports once and
+        # share that, and fetch a new list on cache misses perhaps?
+        # TODO: Avoid recreating a client for each request.
+        rpc = zuul.rpcclient.RPCClient(self.gear_server, self.gear_port,
+                                       self.ssl_key, self.ssl_cert,
+                                       self.ssl_ca)
+        ret = rpc.get_job_log_stream_address(job_uuid)
+        rpc.shutdown()
+        return ret
+
+    async def _fingerClient(self, ws, server, port, job_uuid):
+        '''
+        Create a client to connect to the finger streamer and pull results.
+
+        :param aiohttp.web.WebSocketResponse ws: The websocket response object.
+        :param str server: The executor server running the job.
+        :param str port: The executor server port.
+        :param str job_uuid: The job UUID to stream.
+        '''
+        self.log.debug("Connecting to finger server %s:%s", server, port)
+        reader, writer = await asyncio.open_connection(host=server, port=port,
+                                                       loop=self.event_loop)
+
+        self.log.debug("Sending finger request for %s", job_uuid)
+        msg = "%s\n" % job_uuid  # Must have a trailing newline!
+
+        writer.write(msg.encode('utf8'))
+        await writer.drain()
+
+        while True:
+            data = await reader.read(1024)
+            if data:
+                await ws.send_str(data.decode('utf8'))
+            else:
+                writer.close()
+                return
+
+    async def _streamLog(self, ws, request):
+        '''
+        Stream the log for the requested job back to the client.
+
+        :param aiohttp.web.WebSocketResponse ws: The websocket response object.
+        :param dict request: The client request parameters.
+        '''
+        for key in ('uuid', 'logfile'):
+            if key not in request:
+                return (4000, "'{key}' missing from request payload".format(
+                        key=key))
+
+        # Schedule the blocking gearman work in an Executor
+        gear_task = self.event_loop.run_in_executor(
+            None, self._getPortLocation, request['uuid'])
+
+        try:
+            port_location = await asyncio.wait_for(gear_task, 10)
+        except asyncio.TimeoutError:
+            return (4010, "Gearman timeout")
+
+        if not port_location:
+            return (4011, "Error with Gearman")
+
+        await self._fingerClient(
+            ws, port_location['server'], port_location['port'], request['uuid']
+        )
+
+        return (1000, "No more data")
+
+    async def processRequest(self, request):
+        '''
+        Handle a client websocket request for log streaming.
+
+        :param aiohttp.web.Request request: The client request.
+        '''
+        try:
+            ws = web.WebSocketResponse()
+            await ws.prepare(request)
+            async for msg in ws:
+                if msg.type == aiohttp.WSMsgType.TEXT:
+                    req = json.loads(msg.data)
+                    self.log.debug("Websocket request: %s", req)
+                    code, msg = await self._streamLog(ws, req)
+
+                    # We expect to process only a single message. I.e., we
+                    # can stream only a single file at a time.
+                    await ws.close(code=code, message=msg)
+                    break
+                elif msg.type == aiohttp.WSMsgType.ERROR:
+                    self.log.error(
+                        "Websocket connection closed with exception %s",
+                        ws.exception()
+                    )
+                    break
+                elif msg.type == aiohttp.WSMsgType.CLOSED:
+                    break
+        except Exception as e:
+            self.log.exception("Websocket exception:")
+            await ws.close(code=4009, message=str(e).encode('utf-8'))
+        return ws
+
+
+class ZuulWeb(object):
+
+    log = logging.getLogger("zuul.web.ZuulWeb")
+
+    def __init__(self, listen_address, listen_port,
+                 gear_server, gear_port,
+                 ssl_key=None, ssl_cert=None, ssl_ca=None):
+        self.listen_address = listen_address
+        self.listen_port = listen_port
+        self.gear_server = gear_server
+        self.gear_port = gear_port
+        self.ssl_key = ssl_key
+        self.ssl_cert = ssl_cert
+        self.ssl_ca = ssl_ca
+
+    async def _handleWebsocket(self, request):
+        handler = LogStreamingHandler(self.event_loop,
+                                      self.gear_server, self.gear_port,
+                                      self.ssl_key, self.ssl_cert, self.ssl_ca)
+        return await handler.processRequest(request)
+
+    def run(self, loop=None):
+        '''
+        Run the websocket daemon.
+
+        Because this method can be the target of a new thread, we need to
+        set the thread event loop here, rather than in __init__().
+
+        :param loop: The event loop to use. If not supplied, the default main
+            thread event loop is used. This should be supplied if ZuulWeb
+            is run within a separate (non-main) thread.
+        '''
+        routes = [
+            ('GET', '/console-stream', self._handleWebsocket)
+        ]
+
+        self.log.debug("ZuulWeb starting")
+        asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
+        user_supplied_loop = loop is not None
+        if not loop:
+            loop = asyncio.get_event_loop()
+        asyncio.set_event_loop(loop)
+
+        self.event_loop = loop
+
+        app = web.Application()
+        for method, path, handler in routes:
+            app.router.add_route(method, path, handler)
+        handler = app.make_handler(loop=self.event_loop)
+
+        # create the server
+        coro = self.event_loop.create_server(handler,
+                                             self.listen_address,
+                                             self.listen_port)
+        self.server = self.event_loop.run_until_complete(coro)
+
+        self.term = asyncio.Future()
+
+        # start the server
+        self.event_loop.run_until_complete(self.term)
+
+        # cleanup
+        self.log.debug("ZuulWeb stopping")
+        self.server.close()
+        self.event_loop.run_until_complete(self.server.wait_closed())
+        self.event_loop.run_until_complete(app.shutdown())
+        self.event_loop.run_until_complete(handler.shutdown(60.0))
+        self.event_loop.run_until_complete(app.cleanup())
+        self.log.debug("ZuulWeb stopped")
+
+        # Only run these if we are controlling the loop - they need to be
+        # run from the main thread
+        if not user_supplied_loop:
+            loop.stop()
+            loop.close()
+
+    def stop(self):
+        self.event_loop.call_soon_threadsafe(self.term.set_result, True)
+
+
+if __name__ == "__main__":
+    logging.basicConfig(level=logging.DEBUG)
+    loop = asyncio.get_event_loop()
+    loop.set_debug(True)
+    z = ZuulWeb()
+    z.run(loop)