diff --git a/zuul/exceptions.py b/zuul/exceptions.py index 834404f5ea..ec1e5cdaf4 100644 --- a/zuul/exceptions.py +++ b/zuul/exceptions.py @@ -39,6 +39,10 @@ class ConfigurationError(Exception): pass +class StreamingError(Exception): + pass + + # Authentication Exceptions class AuthTokenException(Exception): diff --git a/zuul/executor/server.py b/zuul/executor/server.py index f69a5f996d..5d25856c54 100644 --- a/zuul/executor/server.py +++ b/zuul/executor/server.py @@ -3579,6 +3579,12 @@ class ExecutorServer(BaseMergeServer): # clearing the params so if we fail, no one tries to # re-run the job. build_request.state = BuildRequest.RUNNING + # Set the hostname on the build request so it can be used by + # zuul-web for the live log streaming. + build_request.worker_info = { + "hostname": self.hostname, + "log_port": self.log_streaming_port, + } self.executor_api.update(build_request) except Exception: log.exception("Exception while preparing to start worker") diff --git a/zuul/lib/fingergw.py b/zuul/lib/fingergw.py index daabaf72f1..1614a39763 100644 --- a/zuul/lib/fingergw.py +++ b/zuul/lib/fingergw.py @@ -20,12 +20,13 @@ import threading from configparser import ConfigParser from typing import Optional -import zuul.rpcclient +from zuul.exceptions import StreamingError from zuul.lib import streamer_utils from zuul.lib.commandsocket import CommandSocket from zuul.lib.config import get_default from zuul.zk import ZooKeeperClient -from zuul.zk.components import FingerGatewayComponent +from zuul.zk.components import ComponentRegistry, FingerGatewayComponent +from zuul.zk.executor import ExecutorApi COMMANDS = ['stop'] @@ -81,7 +82,8 @@ class RequestHandler(streamer_utils.BaseFingerRequestHandler): port = None try: build_uuid = self.getCommand() - port_location = self.fingergw.rpc.get_job_log_stream_address( + port_location = streamer_utils.getJobLogStreamAddress( + self.fingergw.executor_api, self.fingergw.component_registry, build_uuid, source_zone=self.fingergw.zone) if not port_location: @@ -93,6 +95,8 @@ class RequestHandler(streamer_utils.BaseFingerRequestHandler): port = port_location['port'] use_ssl = port_location.get('use_ssl', False) self._fingerClient(server, port, build_uuid, use_ssl) + except StreamingError as e: + self.request.sendall(str(e).encode("utf-8")) except BrokenPipeError: # Client disconnect return except Exception: @@ -110,7 +114,7 @@ class FingerGateway(object): For each incoming finger request, a new thread is started that will be responsible for finding which Zuul executor is executing the - requested build (by asking Gearman), forwarding the request to that + requested build (by asking ZooKeeper), forwarding the request to that executor, and streaming the results back to our client. ''' @@ -127,27 +131,10 @@ class FingerGateway(object): Initialize the finger gateway. :param config: The parsed Zuul configuration. - :param tuple gearman: Gearman connection information. This should - include the server, port, SSL key, SSL cert, and SSL CA. - :param tuple address: The address and port to bind to for our gateway. - :param str user: The user to which we should drop privileges after - binding to our address. :param str command_socket: Path to the daemon command socket. :param str pid_file: Path to the daemon PID file. ''' - gear_server = get_default(config, 'gearman', 'server') - gear_port = get_default(config, 'gearman', 'port', 4730) - gear_ssl_key = get_default(config, 'gearman', 'ssl_key') - gear_ssl_cert = get_default(config, 'gearman', 'ssl_cert') - gear_ssl_ca = get_default(config, 'gearman', 'ssl_ca') - - self.gear_server = gear_server - self.gear_port = gear_port - self.gear_ssl_key = gear_ssl_key - self.gear_ssl_cert = gear_ssl_cert - self.gear_ssl_ca = gear_ssl_ca - host = get_default(config, 'fingergw', 'listen_address', '::') self.port = int(get_default(config, 'fingergw', 'port', 79)) self.public_port = int(get_default( @@ -158,7 +145,6 @@ class FingerGateway(object): self.user = user self.pid_file = pid_file - self.rpc = None self.server = None self.server_thread = None @@ -200,6 +186,10 @@ class FingerGateway(object): self.component_info.use_ssl = True self.component_info.register() + self.component_registry = ComponentRegistry(self.zk_client) + + self.executor_api = ExecutorApi(self.zk_client, use_cache=False) + def _runCommand(self): while self.command_running: try: @@ -219,14 +209,6 @@ class FingerGateway(object): raise def start(self): - self.rpc = zuul.rpcclient.RPCClient( - self.gear_server, - self.gear_port, - self.gear_ssl_key, - self.gear_ssl_cert, - self.gear_ssl_ca, - client_id='Zuul Finger Gateway') - kwargs = dict( user=self.user, pid_file=self.pid_file, @@ -279,13 +261,6 @@ class FingerGateway(object): except Exception: self.log.exception("Error stopping TCP server:") - if self.rpc: - try: - self.rpc.shutdown() - self.rpc = None - except Exception: - self.log.exception("Error stopping RCP client:") - if self.command_socket: self.command_running = False diff --git a/zuul/lib/streamer_utils.py b/zuul/lib/streamer_utils.py index 171f61dd8f..1882dd64a1 100644 --- a/zuul/lib/streamer_utils.py +++ b/zuul/lib/streamer_utils.py @@ -18,8 +18,10 @@ The log streamer process within each executor, the finger gateway service, and the web interface will all make use of this module. ''' +import logging import os import pwd +import random import select import socket import socketserver @@ -27,6 +29,11 @@ import ssl import threading import time +from zuul.exceptions import StreamingError + + +log = logging.getLogger("zuul.lib.streamer_utils") + class BaseFingerRequestHandler(socketserver.BaseRequestHandler): ''' @@ -166,3 +173,60 @@ class CustomThreadingTCPServer(socketserver.ThreadingTCPServer): context.verify_mode = ssl.CERT_REQUIRED sock = context.wrap_socket(sock, server_side=True) return sock, addr + + +def getJobLogStreamAddress(executor_api, component_registry, uuid, + source_zone): + """ + Looks up the log stream address for the given build UUID. + + Try to find the build request for the given UUID in ZooKeeper + by searching through all available zones. If a build request + was found we use the worker information to build the log stream + address. + """ + # Search for the build request in ZooKeeper. This iterates over all + # available zones (inlcuding unzoned) and stops when the UUID is + # found. + build_request, worker_zone = executor_api.getByUuid(uuid) + + if build_request is None: + raise StreamingError("Build not found") + + worker_info = build_request.worker_info + if not worker_info: + raise StreamingError("Build did not start yet") + + job_log_stream_address = {} + if worker_zone and source_zone != worker_zone: + info = _getFingerGatewayInZone(component_registry, worker_zone) + if info: + job_log_stream_address['server'] = info.hostname + job_log_stream_address['port'] = info.public_port + job_log_stream_address['use_ssl'] = info.use_ssl + log.debug('Source (%s) and worker (%s) zone ' + 'are different, routing via %s:%s', + source_zone, worker_zone, + info.hostname, info.public_port) + else: + log.warning('Source (%s) and worker (%s) zone are different' + 'but no fingergw in target zone found. ' + 'Falling back to direct connection.', + source_zone, worker_zone) + else: + log.debug('Source (%s) or worker zone (%s) undefined ' + 'or equal, no routing is needed.', + source_zone, worker_zone) + + if 'server' not in job_log_stream_address: + job_log_stream_address['server'] = worker_info["hostname"] + job_log_stream_address['port'] = worker_info["log_port"] + + return job_log_stream_address + + +def _getFingerGatewayInZone(component_registry, zone): + gws = [gw for gw in component_registry.all('fingergw') if gw.zone == zone] + if gws: + return random.choice(gws) + return None diff --git a/zuul/model.py b/zuul/model.py index 02de935894..0c4ad63e3e 100644 --- a/zuul/model.py +++ b/zuul/model.py @@ -2223,6 +2223,10 @@ class BuildRequest(JobRequest): self.tenant_name = tenant_name self.pipeline_name = pipeline_name self.event_id = event_id + # The executor sets the worker info when it locks the build + # request so that zuul web can use this information to + # build the url for the live log stream. + self.worker_info = None def toDict(self): d = super().toDict() @@ -2231,12 +2235,13 @@ class BuildRequest(JobRequest): "tenant_name": self.tenant_name, "pipeline_name": self.pipeline_name, "event_id": self.event_id, + "worker_info": self.worker_info, }) return d @classmethod def fromDict(cls, data): - return cls( + request = cls( data["uuid"], data["zone"], data["tenant_name"], @@ -2247,6 +2252,10 @@ class BuildRequest(JobRequest): result_path=data["result_path"] ) + request.worker_info = data["worker_info"] + + return request + def __repr__(self): return ( f"