Let zuul-web look up the live log streaming address from ZooKeeper

This removes the RPC call (Gearman) in zuul-web to look up the live log
streaming address from the build objects in the scheduler and instead
uses the build requests stored in ZooKeeper.

As the address lookup is implemented as a shared library function which
is used by zuul-web and the fingergw, the fingergw is also switched from
RPC to ZooKeeper. The implementation itself was moved from
zuul.rpclistener to zuul.lib.streamer_utils.

To make the lookup via ZooKeeper work, the executor now stores its
worker information (hostname, log_port) on the build request when it
locks the request.

Additionally, the rpc client was removed from the fingergw as it's not
needed anymore. Instead the fingergw has now access to the component
registry and the executor api in ZooKeeper as both are needed to look up
the streaming address.

To not create unnecessary watches for build requests in each fingergw
and zuul-web component, the executor api (resp. the job_request_queue
base class) now provides a "use_cache" flag. The cache is enabled by
default, but if the flag is set to False, no watches will be created.

Overall this should reduce the load on the scheduler as it doesn't need
to handle the related RPC calls anymore.

Change-Id: I359b70f2d5700b0435544db3ce81d64cb8b73464
This commit is contained in:
Felix Edel 2021-09-16 10:44:18 +02:00
parent 010bf70c8b
commit df32d4cf58
10 changed files with 146 additions and 115 deletions

View File

@ -39,6 +39,10 @@ class ConfigurationError(Exception):
pass pass
class StreamingError(Exception):
pass
# Authentication Exceptions # Authentication Exceptions
class AuthTokenException(Exception): class AuthTokenException(Exception):

View File

@ -3579,6 +3579,12 @@ class ExecutorServer(BaseMergeServer):
# clearing the params so if we fail, no one tries to # clearing the params so if we fail, no one tries to
# re-run the job. # re-run the job.
build_request.state = BuildRequest.RUNNING build_request.state = BuildRequest.RUNNING
# Set the hostname on the build request so it can be used by
# zuul-web for the live log streaming.
build_request.worker_info = {
"hostname": self.hostname,
"log_port": self.log_streaming_port,
}
self.executor_api.update(build_request) self.executor_api.update(build_request)
except Exception: except Exception:
log.exception("Exception while preparing to start worker") log.exception("Exception while preparing to start worker")

View File

@ -20,12 +20,13 @@ import threading
from configparser import ConfigParser from configparser import ConfigParser
from typing import Optional from typing import Optional
import zuul.rpcclient from zuul.exceptions import StreamingError
from zuul.lib import streamer_utils from zuul.lib import streamer_utils
from zuul.lib.commandsocket import CommandSocket from zuul.lib.commandsocket import CommandSocket
from zuul.lib.config import get_default from zuul.lib.config import get_default
from zuul.zk import ZooKeeperClient from zuul.zk import ZooKeeperClient
from zuul.zk.components import FingerGatewayComponent from zuul.zk.components import ComponentRegistry, FingerGatewayComponent
from zuul.zk.executor import ExecutorApi
COMMANDS = ['stop'] COMMANDS = ['stop']
@ -81,7 +82,8 @@ class RequestHandler(streamer_utils.BaseFingerRequestHandler):
port = None port = None
try: try:
build_uuid = self.getCommand() build_uuid = self.getCommand()
port_location = self.fingergw.rpc.get_job_log_stream_address( port_location = streamer_utils.getJobLogStreamAddress(
self.fingergw.executor_api, self.fingergw.component_registry,
build_uuid, source_zone=self.fingergw.zone) build_uuid, source_zone=self.fingergw.zone)
if not port_location: if not port_location:
@ -93,6 +95,8 @@ class RequestHandler(streamer_utils.BaseFingerRequestHandler):
port = port_location['port'] port = port_location['port']
use_ssl = port_location.get('use_ssl', False) use_ssl = port_location.get('use_ssl', False)
self._fingerClient(server, port, build_uuid, use_ssl) self._fingerClient(server, port, build_uuid, use_ssl)
except StreamingError as e:
self.request.sendall(str(e).encode("utf-8"))
except BrokenPipeError: # Client disconnect except BrokenPipeError: # Client disconnect
return return
except Exception: except Exception:
@ -110,7 +114,7 @@ class FingerGateway(object):
For each incoming finger request, a new thread is started that will For each incoming finger request, a new thread is started that will
be responsible for finding which Zuul executor is executing the be responsible for finding which Zuul executor is executing the
requested build (by asking Gearman), forwarding the request to that requested build (by asking ZooKeeper), forwarding the request to that
executor, and streaming the results back to our client. executor, and streaming the results back to our client.
''' '''
@ -127,27 +131,10 @@ class FingerGateway(object):
Initialize the finger gateway. Initialize the finger gateway.
:param config: The parsed Zuul configuration. :param config: The parsed Zuul configuration.
:param tuple gearman: Gearman connection information. This should
include the server, port, SSL key, SSL cert, and SSL CA.
:param tuple address: The address and port to bind to for our gateway.
:param str user: The user to which we should drop privileges after
binding to our address.
:param str command_socket: Path to the daemon command socket. :param str command_socket: Path to the daemon command socket.
:param str pid_file: Path to the daemon PID file. :param str pid_file: Path to the daemon PID file.
''' '''
gear_server = get_default(config, 'gearman', 'server')
gear_port = get_default(config, 'gearman', 'port', 4730)
gear_ssl_key = get_default(config, 'gearman', 'ssl_key')
gear_ssl_cert = get_default(config, 'gearman', 'ssl_cert')
gear_ssl_ca = get_default(config, 'gearman', 'ssl_ca')
self.gear_server = gear_server
self.gear_port = gear_port
self.gear_ssl_key = gear_ssl_key
self.gear_ssl_cert = gear_ssl_cert
self.gear_ssl_ca = gear_ssl_ca
host = get_default(config, 'fingergw', 'listen_address', '::') host = get_default(config, 'fingergw', 'listen_address', '::')
self.port = int(get_default(config, 'fingergw', 'port', 79)) self.port = int(get_default(config, 'fingergw', 'port', 79))
self.public_port = int(get_default( self.public_port = int(get_default(
@ -158,7 +145,6 @@ class FingerGateway(object):
self.user = user self.user = user
self.pid_file = pid_file self.pid_file = pid_file
self.rpc = None
self.server = None self.server = None
self.server_thread = None self.server_thread = None
@ -200,6 +186,10 @@ class FingerGateway(object):
self.component_info.use_ssl = True self.component_info.use_ssl = True
self.component_info.register() self.component_info.register()
self.component_registry = ComponentRegistry(self.zk_client)
self.executor_api = ExecutorApi(self.zk_client, use_cache=False)
def _runCommand(self): def _runCommand(self):
while self.command_running: while self.command_running:
try: try:
@ -219,14 +209,6 @@ class FingerGateway(object):
raise raise
def start(self): def start(self):
self.rpc = zuul.rpcclient.RPCClient(
self.gear_server,
self.gear_port,
self.gear_ssl_key,
self.gear_ssl_cert,
self.gear_ssl_ca,
client_id='Zuul Finger Gateway')
kwargs = dict( kwargs = dict(
user=self.user, user=self.user,
pid_file=self.pid_file, pid_file=self.pid_file,
@ -279,13 +261,6 @@ class FingerGateway(object):
except Exception: except Exception:
self.log.exception("Error stopping TCP server:") self.log.exception("Error stopping TCP server:")
if self.rpc:
try:
self.rpc.shutdown()
self.rpc = None
except Exception:
self.log.exception("Error stopping RCP client:")
if self.command_socket: if self.command_socket:
self.command_running = False self.command_running = False

View File

@ -18,8 +18,10 @@ The log streamer process within each executor, the finger gateway service,
and the web interface will all make use of this module. and the web interface will all make use of this module.
''' '''
import logging
import os import os
import pwd import pwd
import random
import select import select
import socket import socket
import socketserver import socketserver
@ -27,6 +29,11 @@ import ssl
import threading import threading
import time import time
from zuul.exceptions import StreamingError
log = logging.getLogger("zuul.lib.streamer_utils")
class BaseFingerRequestHandler(socketserver.BaseRequestHandler): class BaseFingerRequestHandler(socketserver.BaseRequestHandler):
''' '''
@ -166,3 +173,60 @@ class CustomThreadingTCPServer(socketserver.ThreadingTCPServer):
context.verify_mode = ssl.CERT_REQUIRED context.verify_mode = ssl.CERT_REQUIRED
sock = context.wrap_socket(sock, server_side=True) sock = context.wrap_socket(sock, server_side=True)
return sock, addr return sock, addr
def getJobLogStreamAddress(executor_api, component_registry, uuid,
source_zone):
"""
Looks up the log stream address for the given build UUID.
Try to find the build request for the given UUID in ZooKeeper
by searching through all available zones. If a build request
was found we use the worker information to build the log stream
address.
"""
# Search for the build request in ZooKeeper. This iterates over all
# available zones (inlcuding unzoned) and stops when the UUID is
# found.
build_request, worker_zone = executor_api.getByUuid(uuid)
if build_request is None:
raise StreamingError("Build not found")
worker_info = build_request.worker_info
if not worker_info:
raise StreamingError("Build did not start yet")
job_log_stream_address = {}
if worker_zone and source_zone != worker_zone:
info = _getFingerGatewayInZone(component_registry, worker_zone)
if info:
job_log_stream_address['server'] = info.hostname
job_log_stream_address['port'] = info.public_port
job_log_stream_address['use_ssl'] = info.use_ssl
log.debug('Source (%s) and worker (%s) zone '
'are different, routing via %s:%s',
source_zone, worker_zone,
info.hostname, info.public_port)
else:
log.warning('Source (%s) and worker (%s) zone are different'
'but no fingergw in target zone found. '
'Falling back to direct connection.',
source_zone, worker_zone)
else:
log.debug('Source (%s) or worker zone (%s) undefined '
'or equal, no routing is needed.',
source_zone, worker_zone)
if 'server' not in job_log_stream_address:
job_log_stream_address['server'] = worker_info["hostname"]
job_log_stream_address['port'] = worker_info["log_port"]
return job_log_stream_address
def _getFingerGatewayInZone(component_registry, zone):
gws = [gw for gw in component_registry.all('fingergw') if gw.zone == zone]
if gws:
return random.choice(gws)
return None

View File

@ -2223,6 +2223,10 @@ class BuildRequest(JobRequest):
self.tenant_name = tenant_name self.tenant_name = tenant_name
self.pipeline_name = pipeline_name self.pipeline_name = pipeline_name
self.event_id = event_id self.event_id = event_id
# The executor sets the worker info when it locks the build
# request so that zuul web can use this information to
# build the url for the live log stream.
self.worker_info = None
def toDict(self): def toDict(self):
d = super().toDict() d = super().toDict()
@ -2231,12 +2235,13 @@ class BuildRequest(JobRequest):
"tenant_name": self.tenant_name, "tenant_name": self.tenant_name,
"pipeline_name": self.pipeline_name, "pipeline_name": self.pipeline_name,
"event_id": self.event_id, "event_id": self.event_id,
"worker_info": self.worker_info,
}) })
return d return d
@classmethod @classmethod
def fromDict(cls, data): def fromDict(cls, data):
return cls( request = cls(
data["uuid"], data["uuid"],
data["zone"], data["zone"],
data["tenant_name"], data["tenant_name"],
@ -2247,6 +2252,10 @@ class BuildRequest(JobRequest):
result_path=data["result_path"] result_path=data["result_path"]
) )
request.worker_info = data["worker_info"]
return request
def __repr__(self): def __repr__(self):
return ( return (
f"<BuildRequest {self.uuid}, state={self.state}, " f"<BuildRequest {self.uuid}, state={self.state}, "

View File

@ -16,7 +16,6 @@
import json import json
import logging import logging
import time import time
import random
from abc import ABCMeta from abc import ABCMeta
from typing import List from typing import List
@ -151,7 +150,6 @@ class RPCListener(RPCListenerBase):
'allowed_labels_get', 'allowed_labels_get',
'get_admin_tenants', 'get_admin_tenants',
'get_running_jobs', 'get_running_jobs',
'get_job_log_stream_address',
'tenant_list', 'tenant_list',
'status_get', 'status_get',
'job_get', 'job_get',
@ -261,66 +259,6 @@ class RPCListener(RPCListenerBase):
job.sendWorkComplete(json.dumps(running_items)) job.sendWorkComplete(json.dumps(running_items))
def _get_fingergw_in_zone(self, zone):
gws = [gw for gw in self.sched.component_registry.all('fingergw')
if gw.zone == zone]
if gws:
return random.choice(gws)
return None
def handle_get_job_log_stream_address(self, job):
# TODO: map log files to ports. Currently there is only one
# log stream for a given job. But many jobs produce many
# log files, so this is forwards compatible with a future
# where there are more logs to potentially request than
# "console.log"
def find_build(uuid):
for tenant in self.sched.abide.tenants.values():
for pipeline_name, pipeline in tenant.layout.pipelines.items():
for queue in pipeline.queues:
for item in queue.queue:
for bld in item.current_build_set.getBuilds():
if bld.uuid == uuid:
return bld
return None
args = json.loads(job.arguments)
self.log.debug('Handle get_job_log_stream_address with arguments %s',
job.arguments)
uuid = args['uuid']
# TODO: logfile = args['logfile']
job_log_stream_address = {}
build = find_build(uuid)
if build:
# If zone and worker zone are both given check if we need to route
# via a finger gateway in that zone.
source_zone = args.get('source_zone')
if (build.worker.zone and source_zone != build.worker.zone):
info = self._get_fingergw_in_zone(build.worker.zone)
if info:
job_log_stream_address['server'] = info.hostname
job_log_stream_address['port'] = info.public_port
job_log_stream_address['use_ssl'] = info.use_ssl
self.log.debug('Source (%s) and worker (%s) zone '
'are different, routing via %s:%s',
source_zone, build.worker.zone,
info.hostname, info.public_port)
else:
self.log.warning('Source (%s) and worker (%s) zone '
'are different but no fingergw in '
'target zone found. '
'Falling back to direct connection.',
source_zone, build.worker.zone)
else:
self.log.debug('Source (%s) or worker zone (%s) undefined '
'or equal, no routing is needed.',
source_zone, build.worker.zone)
if 'server' not in job_log_stream_address:
job_log_stream_address['server'] = build.worker.hostname
job_log_stream_address['port'] = build.worker.log_port
job.sendWorkComplete(json.dumps(job_log_stream_address))
def _is_authorized(self, tenant, claims): def _is_authorized(self, tenant, claims):
authorized = False authorized = False
if tenant: if tenant:

View File

@ -32,11 +32,13 @@ import threading
from zuul import exceptions from zuul import exceptions
import zuul.lib.repl import zuul.lib.repl
from zuul.lib import commandsocket from zuul.lib import commandsocket
from zuul.lib import streamer_utils
from zuul.lib.re2util import filter_allowed_disallowed from zuul.lib.re2util import filter_allowed_disallowed
import zuul.model import zuul.model
import zuul.rpcclient import zuul.rpcclient
from zuul.zk import ZooKeeperClient from zuul.zk import ZooKeeperClient
from zuul.zk.components import WebComponent from zuul.zk.components import ComponentRegistry, WebComponent
from zuul.zk.executor import ExecutorApi
from zuul.zk.nodepool import ZooKeeperNodepool from zuul.zk.nodepool import ZooKeeperNodepool
from zuul.zk.system import ZuulSystem from zuul.zk.system import ZuulSystem
from zuul.lib.auth import AuthenticatorRegistry from zuul.lib.auth import AuthenticatorRegistry
@ -166,10 +168,15 @@ class LogStreamHandler(WebSocket):
"'{key}' missing from request payload".format( "'{key}' missing from request payload".format(
key=key)) key=key))
port_location = self.zuulweb.rpc.get_job_log_stream_address( try:
request['uuid'], source_zone=self.zuulweb.zone) port_location = streamer_utils.getJobLogStreamAddress(
self.zuulweb.executor_api, self.zuulweb.component_registry,
request['uuid'], source_zone=self.zuulweb.zone)
except exceptions.StreamingError as e:
return self.logClose(4011, str(e))
if not port_location: if not port_location:
return self.logClose(4011, "Error with Gearman") return self.logClose(4011, "Error with log streaming")
self.streamer = LogStreamer( self.streamer = LogStreamer(
self.zuulweb, self, self.zuulweb, self,
@ -1293,9 +1300,14 @@ class ZuulWeb(object):
client_id='Zuul Web Server') client_id='Zuul Web Server')
self.zk_client = ZooKeeperClient.fromConfig(self.config) self.zk_client = ZooKeeperClient.fromConfig(self.config)
self.zk_client.connect() self.zk_client.connect()
self.executor_api = ExecutorApi(self.zk_client, use_cache=False)
self.component_info = WebComponent(self.zk_client, self.hostname) self.component_info = WebComponent(self.zk_client, self.hostname)
self.component_info.register() self.component_info.register()
self.component_registry = ComponentRegistry(self.zk_client)
self.connections = connections self.connections = connections
self.authenticators = authenticators self.authenticators = authenticators
self.stream_manager = StreamManager() self.stream_manager = StreamManager()

View File

@ -27,11 +27,13 @@ class ExecutorQueue(JobRequestQueue):
def __init__(self, client, root, def __init__(self, client, root,
initial_state_getter, initial_state_getter,
use_cache=True,
request_callback=None, request_callback=None,
event_callback=None): event_callback=None):
self.log.debug("Creating executor queue at root %s", root) self.log.debug("Creating executor queue at root %s", root)
self._initial_state_getter = initial_state_getter self._initial_state_getter = initial_state_getter
super().__init__(client, root, request_callback, event_callback) super().__init__(
client, root, use_cache, request_callback, event_callback)
@property @property
def initial_state(self): def initial_state(self):
@ -51,10 +53,11 @@ class ExecutorQueue(JobRequestQueue):
class ExecutorApi: class ExecutorApi:
log = logging.getLogger("zuul.ExecutorApi") log = logging.getLogger("zuul.ExecutorApi")
def __init__(self, client, zone_filter=None, def __init__(self, client, zone_filter=None, use_cache=True,
build_request_callback=None, build_request_callback=None,
build_event_callback=None): build_event_callback=None):
self.client = client self.client = client
self.use_cache = use_cache
self.request_callback = build_request_callback self.request_callback = build_request_callback
self.event_callback = build_event_callback self.event_callback = build_event_callback
self.zone_filter = zone_filter self.zone_filter = zone_filter
@ -68,6 +71,7 @@ class ExecutorApi:
self.client, self.client,
self._getZoneRoot(zone), self._getZoneRoot(zone),
self._getInitialState, self._getInitialState,
self.use_cache,
self.request_callback, self.request_callback,
self.event_callback)) self.event_callback))
@ -138,6 +142,17 @@ class ExecutorApi:
zone = None zone = None
return self.zone_queues[zone].get(path) return self.zone_queues[zone].get(path)
def getByUuid(self, uuid):
"""Find a build request by its UUID.
This method will search for the UUID in all available zones.
"""
for zone in self._getAllZones():
request = self.zone_queues[zone].getByUuid(uuid)
if request:
return request, zone
return None, None
def remove(self, request): def remove(self, request):
return self.zone_queues[request.zone].remove(request) return self.zone_queues[request.zone].remove(request)

View File

@ -44,10 +44,12 @@ class JobRequestQueue(ZooKeeperSimpleBase):
log = logging.getLogger("zuul.JobRequestQueue") log = logging.getLogger("zuul.JobRequestQueue")
request_class = JobRequest request_class = JobRequest
def __init__(self, client, root, def __init__(self, client, root, use_cache=True,
request_callback=None, event_callback=None): request_callback=None, event_callback=None):
super().__init__(client) super().__init__(client)
self.use_cache = use_cache
self.REQUEST_ROOT = f"{root}/requests" self.REQUEST_ROOT = f"{root}/requests"
self.LOCK_ROOT = f"{root}/locks" self.LOCK_ROOT = f"{root}/locks"
self.PARAM_ROOT = f"{root}/params" self.PARAM_ROOT = f"{root}/params"
@ -76,12 +78,13 @@ class JobRequestQueue(ZooKeeperSimpleBase):
return self.request_class.REQUESTED return self.request_class.REQUESTED
def register(self): def register(self):
# Register a child watch that listens for new requests if self.use_cache:
self.kazoo_client.ChildrenWatch( # Register a child watch that listens for new requests
self.REQUEST_ROOT, self.kazoo_client.ChildrenWatch(
self._makeRequestWatcher(self.REQUEST_ROOT), self.REQUEST_ROOT,
send_event=True, self._makeRequestWatcher(self.REQUEST_ROOT),
) send_event=True,
)
def _makeRequestWatcher(self, path): def _makeRequestWatcher(self, path):
def watch(requests, event=None): def watch(requests, event=None):
@ -292,6 +295,11 @@ class JobRequestQueue(ZooKeeperSimpleBase):
return request return request
def getByUuid(self, uuid):
"""Get a request by its UUID without using the cache."""
path = f"{self.REQUEST_ROOT}/{uuid}"
return self.get(path)
def refresh(self, request): def refresh(self, request):
"""Refreshs a request object with the current data from ZooKeeper. """ """Refreshs a request object with the current data from ZooKeeper. """
try: try:

View File

@ -22,6 +22,6 @@ class MergerApi(JobRequestQueue):
log = logging.getLogger("zuul.MergerApi") log = logging.getLogger("zuul.MergerApi")
request_class = MergeRequest request_class = MergeRequest
def __init__(self, client, merge_request_callback=None): def __init__(self, client, use_cache=True, merge_request_callback=None):
root = '/zuul/merger' root = '/zuul/merger'
super().__init__(client, root, merge_request_callback) super().__init__(client, root, use_cache, merge_request_callback)