zuul-web: refactor LogStreamingHandler to keep a single rpcclient

This change makes the LogStreamingHandler uses a single rpcclient so that
it can be shared with upcomming new zuul-web handlers.

Change-Id: Idee7e1a5750230dc321f661cb3a5f71800393d53
This commit is contained in:
Tristan Cacqueray 2017-09-22 12:10:48 +00:00
parent 5a7b52af3d
commit 41fa9eaa29
1 changed files with 15 additions and 23 deletions

38
zuul/web/__init__.py Normal file → Executable file
View File

@ -32,14 +32,11 @@ STATIC_DIR = os.path.join(os.path.dirname(__file__), 'static')
class LogStreamingHandler(object):
log = logging.getLogger("zuul.web.LogStreamingHandler")
def __init__(self, loop, gear_server, gear_port,
ssl_key=None, ssl_cert=None, ssl_ca=None):
self.event_loop = loop
self.gear_server = gear_server
self.gear_port = gear_port
self.ssl_key = ssl_key
self.ssl_cert = ssl_cert
self.ssl_ca = ssl_ca
def __init__(self, rpc):
self.rpc = rpc
def setEventLoop(self, event_loop):
self.event_loop = event_loop
def _getPortLocation(self, job_uuid):
"""
@ -49,12 +46,7 @@ class LogStreamingHandler(object):
"""
# TODO: Fetch the entire list of uuid/file/server/ports once and
# share that, and fetch a new list on cache misses perhaps?
# TODO: Avoid recreating a client for each request.
rpc = zuul.rpcclient.RPCClient(self.gear_server, self.gear_port,
self.ssl_key, self.ssl_cert,
self.ssl_ca)
ret = rpc.get_job_log_stream_address(job_uuid)
rpc.shutdown()
ret = self.rpc.get_job_log_stream_address(job_uuid)
return ret
async def _fingerClient(self, ws, server, port, job_uuid):
@ -159,19 +151,16 @@ class ZuulWeb(object):
ssl_key=None, ssl_cert=None, ssl_ca=None):
self.listen_address = listen_address
self.listen_port = listen_port
self.gear_server = gear_server
self.gear_port = gear_port
self.ssl_key = ssl_key
self.ssl_cert = ssl_cert
self.ssl_ca = ssl_ca
self.event_loop = None
self.term = None
# instanciate handlers
self.rpc = zuul.rpcclient.RPCClient(gear_server, gear_port,
ssl_key, ssl_cert, ssl_ca)
self.log_streaming_handler = LogStreamingHandler(self.rpc)
async def _handleWebsocket(self, request):
handler = LogStreamingHandler(self.event_loop,
self.gear_server, self.gear_port,
self.ssl_key, self.ssl_cert, self.ssl_ca)
return await handler.processRequest(request)
return await self.log_streaming_handler.processRequest(
request)
def run(self, loop=None):
"""
@ -196,6 +185,7 @@ class ZuulWeb(object):
asyncio.set_event_loop(loop)
self.event_loop = loop
self.log_streaming_handler.setEventLoop(loop)
app = web.Application()
for method, path, handler in routes:
@ -229,6 +219,8 @@ class ZuulWeb(object):
loop.stop()
loop.close()
self.rpc.shutdown()
def stop(self):
if self.event_loop and self.term:
self.event_loop.call_soon_threadsafe(self.term.set_result, True)