From df32d4cf585babdded55f67f3a6f6b2c98881455 Mon Sep 17 00:00:00 2001 From: Felix Edel Date: Thu, 16 Sep 2021 10:44:18 +0200 Subject: [PATCH] Let zuul-web look up the live log streaming address from ZooKeeper This removes the RPC call (Gearman) in zuul-web to look up the live log streaming address from the build objects in the scheduler and instead uses the build requests stored in ZooKeeper. As the address lookup is implemented as a shared library function which is used by zuul-web and the fingergw, the fingergw is also switched from RPC to ZooKeeper. The implementation itself was moved from zuul.rpclistener to zuul.lib.streamer_utils. To make the lookup via ZooKeeper work, the executor now stores its worker information (hostname, log_port) on the build request when it locks the request. Additionally, the RPC client was removed from the fingergw as it's not needed anymore. Instead, the fingergw now has access to the component registry and the executor api in ZooKeeper as both are needed to look up the streaming address. To avoid creating unnecessary watches for build requests in each fingergw and zuul-web component, the executor api (or rather, the job_request_queue base class) now provides a "use_cache" flag. The cache is enabled by default, but if the flag is set to False, no watches will be created. Overall this should reduce the load on the scheduler as it doesn't need to handle the related RPC calls anymore. 
Change-Id: I359b70f2d5700b0435544db3ce81d64cb8b73464 --- zuul/exceptions.py | 4 +++ zuul/executor/server.py | 6 ++++ zuul/lib/fingergw.py | 49 +++++++-------------------- zuul/lib/streamer_utils.py | 64 ++++++++++++++++++++++++++++++++++++ zuul/model.py | 11 ++++++- zuul/rpclistener.py | 62 ---------------------------------- zuul/web/__init__.py | 20 ++++++++--- zuul/zk/executor.py | 19 +++++++++-- zuul/zk/job_request_queue.py | 22 +++++++++---- zuul/zk/merger.py | 4 +-- 10 files changed, 146 insertions(+), 115 deletions(-) diff --git a/zuul/exceptions.py b/zuul/exceptions.py index 834404f5ea..ec1e5cdaf4 100644 --- a/zuul/exceptions.py +++ b/zuul/exceptions.py @@ -39,6 +39,10 @@ class ConfigurationError(Exception): pass +class StreamingError(Exception): + pass + + # Authentication Exceptions class AuthTokenException(Exception): diff --git a/zuul/executor/server.py b/zuul/executor/server.py index f69a5f996d..5d25856c54 100644 --- a/zuul/executor/server.py +++ b/zuul/executor/server.py @@ -3579,6 +3579,12 @@ class ExecutorServer(BaseMergeServer): # clearing the params so if we fail, no one tries to # re-run the job. build_request.state = BuildRequest.RUNNING + # Set the hostname on the build request so it can be used by + # zuul-web for the live log streaming. 
+ build_request.worker_info = { + "hostname": self.hostname, + "log_port": self.log_streaming_port, + } self.executor_api.update(build_request) except Exception: log.exception("Exception while preparing to start worker") diff --git a/zuul/lib/fingergw.py b/zuul/lib/fingergw.py index daabaf72f1..1614a39763 100644 --- a/zuul/lib/fingergw.py +++ b/zuul/lib/fingergw.py @@ -20,12 +20,13 @@ import threading from configparser import ConfigParser from typing import Optional -import zuul.rpcclient +from zuul.exceptions import StreamingError from zuul.lib import streamer_utils from zuul.lib.commandsocket import CommandSocket from zuul.lib.config import get_default from zuul.zk import ZooKeeperClient -from zuul.zk.components import FingerGatewayComponent +from zuul.zk.components import ComponentRegistry, FingerGatewayComponent +from zuul.zk.executor import ExecutorApi COMMANDS = ['stop'] @@ -81,7 +82,8 @@ class RequestHandler(streamer_utils.BaseFingerRequestHandler): port = None try: build_uuid = self.getCommand() - port_location = self.fingergw.rpc.get_job_log_stream_address( + port_location = streamer_utils.getJobLogStreamAddress( + self.fingergw.executor_api, self.fingergw.component_registry, build_uuid, source_zone=self.fingergw.zone) if not port_location: @@ -93,6 +95,8 @@ class RequestHandler(streamer_utils.BaseFingerRequestHandler): port = port_location['port'] use_ssl = port_location.get('use_ssl', False) self._fingerClient(server, port, build_uuid, use_ssl) + except StreamingError as e: + self.request.sendall(str(e).encode("utf-8")) except BrokenPipeError: # Client disconnect return except Exception: @@ -110,7 +114,7 @@ class FingerGateway(object): For each incoming finger request, a new thread is started that will be responsible for finding which Zuul executor is executing the - requested build (by asking Gearman), forwarding the request to that + requested build (by asking ZooKeeper), forwarding the request to that executor, and streaming the results back to our 
client. ''' @@ -127,27 +131,10 @@ class FingerGateway(object): Initialize the finger gateway. :param config: The parsed Zuul configuration. - :param tuple gearman: Gearman connection information. This should - include the server, port, SSL key, SSL cert, and SSL CA. - :param tuple address: The address and port to bind to for our gateway. - :param str user: The user to which we should drop privileges after - binding to our address. :param str command_socket: Path to the daemon command socket. :param str pid_file: Path to the daemon PID file. ''' - gear_server = get_default(config, 'gearman', 'server') - gear_port = get_default(config, 'gearman', 'port', 4730) - gear_ssl_key = get_default(config, 'gearman', 'ssl_key') - gear_ssl_cert = get_default(config, 'gearman', 'ssl_cert') - gear_ssl_ca = get_default(config, 'gearman', 'ssl_ca') - - self.gear_server = gear_server - self.gear_port = gear_port - self.gear_ssl_key = gear_ssl_key - self.gear_ssl_cert = gear_ssl_cert - self.gear_ssl_ca = gear_ssl_ca - host = get_default(config, 'fingergw', 'listen_address', '::') self.port = int(get_default(config, 'fingergw', 'port', 79)) self.public_port = int(get_default( @@ -158,7 +145,6 @@ class FingerGateway(object): self.user = user self.pid_file = pid_file - self.rpc = None self.server = None self.server_thread = None @@ -200,6 +186,10 @@ class FingerGateway(object): self.component_info.use_ssl = True self.component_info.register() + self.component_registry = ComponentRegistry(self.zk_client) + + self.executor_api = ExecutorApi(self.zk_client, use_cache=False) + def _runCommand(self): while self.command_running: try: @@ -219,14 +209,6 @@ class FingerGateway(object): raise def start(self): - self.rpc = zuul.rpcclient.RPCClient( - self.gear_server, - self.gear_port, - self.gear_ssl_key, - self.gear_ssl_cert, - self.gear_ssl_ca, - client_id='Zuul Finger Gateway') - kwargs = dict( user=self.user, pid_file=self.pid_file, @@ -279,13 +261,6 @@ class FingerGateway(object): except 
Exception: self.log.exception("Error stopping TCP server:") - if self.rpc: - try: - self.rpc.shutdown() - self.rpc = None - except Exception: - self.log.exception("Error stopping RCP client:") - if self.command_socket: self.command_running = False diff --git a/zuul/lib/streamer_utils.py b/zuul/lib/streamer_utils.py index 171f61dd8f..1882dd64a1 100644 --- a/zuul/lib/streamer_utils.py +++ b/zuul/lib/streamer_utils.py @@ -18,8 +18,10 @@ The log streamer process within each executor, the finger gateway service, and the web interface will all make use of this module. ''' +import logging import os import pwd +import random import select import socket import socketserver @@ -27,6 +29,11 @@ import ssl import threading import time +from zuul.exceptions import StreamingError + + +log = logging.getLogger("zuul.lib.streamer_utils") + class BaseFingerRequestHandler(socketserver.BaseRequestHandler): ''' @@ -166,3 +173,60 @@ class CustomThreadingTCPServer(socketserver.ThreadingTCPServer): context.verify_mode = ssl.CERT_REQUIRED sock = context.wrap_socket(sock, server_side=True) return sock, addr + + +def getJobLogStreamAddress(executor_api, component_registry, uuid, + source_zone): + """ + Looks up the log stream address for the given build UUID. + + Try to find the build request for the given UUID in ZooKeeper + by searching through all available zones. If a build request + was found we use the worker information to build the log stream + address. + """ + # Search for the build request in ZooKeeper. This iterates over all + # available zones (including unzoned) and stops when the UUID is + # found. 
+ build_request, worker_zone = executor_api.getByUuid(uuid) + + if build_request is None: + raise StreamingError("Build not found") + + worker_info = build_request.worker_info + if not worker_info: + raise StreamingError("Build did not start yet") + + job_log_stream_address = {} + if worker_zone and source_zone != worker_zone: + info = _getFingerGatewayInZone(component_registry, worker_zone) + if info: + job_log_stream_address['server'] = info.hostname + job_log_stream_address['port'] = info.public_port + job_log_stream_address['use_ssl'] = info.use_ssl + log.debug('Source (%s) and worker (%s) zone ' + 'are different, routing via %s:%s', + source_zone, worker_zone, + info.hostname, info.public_port) + else: + log.warning('Source (%s) and worker (%s) zone are different' + 'but no fingergw in target zone found. ' + 'Falling back to direct connection.', + source_zone, worker_zone) + else: + log.debug('Source (%s) or worker zone (%s) undefined ' + 'or equal, no routing is needed.', + source_zone, worker_zone) + + if 'server' not in job_log_stream_address: + job_log_stream_address['server'] = worker_info["hostname"] + job_log_stream_address['port'] = worker_info["log_port"] + + return job_log_stream_address + + +def _getFingerGatewayInZone(component_registry, zone): + gws = [gw for gw in component_registry.all('fingergw') if gw.zone == zone] + if gws: + return random.choice(gws) + return None diff --git a/zuul/model.py b/zuul/model.py index 02de935894..0c4ad63e3e 100644 --- a/zuul/model.py +++ b/zuul/model.py @@ -2223,6 +2223,10 @@ class BuildRequest(JobRequest): self.tenant_name = tenant_name self.pipeline_name = pipeline_name self.event_id = event_id + # The executor sets the worker info when it locks the build + # request so that zuul web can use this information to + # build the url for the live log stream. 
+ self.worker_info = None def toDict(self): d = super().toDict() @@ -2231,12 +2235,13 @@ class BuildRequest(JobRequest): "tenant_name": self.tenant_name, "pipeline_name": self.pipeline_name, "event_id": self.event_id, + "worker_info": self.worker_info, }) return d @classmethod def fromDict(cls, data): - return cls( + request = cls( data["uuid"], data["zone"], data["tenant_name"], @@ -2247,6 +2252,10 @@ class BuildRequest(JobRequest): result_path=data["result_path"] ) + request.worker_info = data["worker_info"] + + return request + def __repr__(self): return ( f"