Merge "Use component registry in fingergw routing"

This commit is contained in:
Zuul 2021-07-30 00:51:37 +00:00 committed by Gerrit Code Review
commit 32235f980d
8 changed files with 75 additions and 138 deletions

View File

@ -1044,15 +1044,17 @@ sections of ``zuul.conf`` are used by the web server:
.. attr:: zone
The zone in which zuul-web is deployed. This is only needed if there are
executors with different zones in the environment and not all executors
are directly addressable from zuul-web. This can be the case within a
k8s based zuul deployment spread over multiple sites. The zone defines
the zone where the executors are directly adressable. Live log streaming
will go directly to the executors of the same zone and be routed to
a finger gateway of the target zone of the zones are different. The
finger gateway of the other zone is a central entrypoint for all live
log streams of that zone.
The zone in which zuul-web is deployed. This is only needed if
there are executors with different zones in the environment and
not all executors are directly addressable from zuul-web. The
parameter specifies the zone where the executors are directly
addressable. Live log streaming will go directly to the executors
of the same zone and be routed to a finger gateway of the target
zone if the zones are different.
In a mixed system (with zoned and unzoned executors) there may
also be zoned and unzoned zuul-web services. Omit the zone
parameter for any unzoned zuul-web servers.
If this is used the finger gateways should be configured accordingly.
@ -1310,6 +1312,16 @@ sections of ``zuul.conf`` are used by the finger gateway:
user during startup. It is recommended to set this option to an
unprivileged user.
.. attr:: hostname
:default: hostname of the server
When running finger gateways in a multi-zone configuration, the
gateway needs to know the hostname under which it is reachable
by zuul-web. Otherwise live console log streaming doesn't
work. In most cases this is automatically detected
correctly, but in environments where the gateway cannot
determine its hostname correctly it can be overridden here.
.. attr:: zone
The zone where the finger gateway is located. This is only needed for
@ -1317,6 +1329,10 @@ sections of ``zuul.conf`` are used by the finger gateway:
zones without the ability to directly connect to all executors from
zuul-web. See :attr:`executor.zone` for further information.
In a mixed system (with zoned and unzoned executors) there may
also be zoned and unzoned finger gateway services. Omit the zone
parameter for any unzoned finger gateway servers.
Operation
~~~~~~~~~

View File

@ -3,5 +3,5 @@ features:
- |
Zuul now can route live log streams via finger gateways to make it possible
to distribute executors over multiple datacenters without the possibility
to directly contact every executor from within zuul-web. This is typical
the case in an k8s based deployment.
to directly contact every executor from within zuul-web. This is typically
the case in a Kubernetes based deployment.

View File

@ -116,7 +116,7 @@ class TestComponentRegistry(ZuulTestCase):
'fingergw': {
'listen_address': self.host,
'port': '0',
'hostname': 'localhost',
'hostname': 'janine',
}
})
gateway = FingerGateway(
@ -128,6 +128,7 @@ class TestComponentRegistry(ZuulTestCase):
try:
self.assertComponentState("fingergw", BaseComponent.RUNNING)
self.assertComponentAttr("fingergw", "hostname", "janine")
finally:
gateway.stop()

View File

@ -32,8 +32,6 @@ from tests.base import iterate_timeout, ZuulWebFixture
from ws4py.client import WebSocketBaseClient
from zuul.lib.gear_utils import getGearmanFunctions
class WSClient(WebSocketBaseClient):
def __init__(self, port, build_uuid):
@ -187,9 +185,13 @@ class TestStreamingBase(tests.base.AnsibleZuulTestCase):
if zone:
for _ in iterate_timeout(20, 'fingergw is registered'):
functions = getGearmanFunctions(gateway.gearworker.gearman)
jobname = 'fingergw:info:%s' % zone
if jobname in functions:
found = False
for gw in self.scheds.first.sched.component_registry.\
all('fingergw'):
if gw.zone == zone:
found = True
break
if found:
break
gateway_port = gateway.server.socket.getsockname()[1]
@ -724,14 +726,15 @@ class TestStreamingZones(TestStreamingBase):
gateway_eu_central.history.clear()
gateway_us_west.history.clear()
# This finger client runs against an unzoned finger gateway while there
# is a target finger client. As it is unzoned it should not route via
# The finger gateway in eu-central.
# This finger client runs against an unzoned finger gateway
# while there is a gateway in the worker zone. It should still
# route via the gateway in the worker zone since that may be
# the only way it's accessible.
finger_client_unzoned2 = self._run_finger_client(
build, gateway_unzoned_address, name='unzoned2')
wait_for_stream('unzoned2')
self.assertEqual(1, len(gateway_unzoned.history))
self.assertEqual(0, len(gateway_eu_central.history))
self.assertEqual(1, len(gateway_eu_central.history))
self.assertEqual(0, len(gateway_us_west.history))
gateway_unzoned.history.clear()
gateway_eu_central.history.clear()

View File

@ -13,23 +13,16 @@
# under the License.
import functools
import json
import logging
import socket
import threading
import time
from configparser import ConfigParser
from typing import Optional
import gear
import zuul.rpcclient
from zuul.lib import streamer_utils
from zuul.lib.commandsocket import CommandSocket
from zuul.lib.config import get_default
from zuul.lib.gear_utils import getGearmanFunctions
from zuul.lib.gearworker import ZuulGearWorker
from zuul.rpcclient import RPCFailure
from zuul.zk import ZooKeeperClient
from zuul.zk.components import FingerGatewayComponent
@ -114,8 +107,6 @@ class FingerGateway(object):
log = logging.getLogger("zuul.fingergw")
handler_class = RequestHandler
gearworker: Optional[ZuulGearWorker]
def __init__(
self,
config: ConfigParser,
@ -174,36 +165,16 @@ class FingerGateway(object):
socket.getfqdn())
self.zone = get_default(config, 'fingergw', 'zone')
if self.zone is not None:
jobs = {
'fingergw:info:%s' % self.zone: self.handle_info,
}
self.gearworker = ZuulGearWorker(
'Finger Gateway',
'zuul.fingergw',
'fingergw-gearman-worker',
config,
jobs)
else:
self.gearworker = None
self.zk_client = ZooKeeperClient.fromConfig(config)
self.zk_client.connect()
self.component_info = FingerGatewayComponent(
self.zk_client, self.hostname
)
if self.zone is not None:
self.component_info.zone = self.zone
self.component_info.public_port = self.public_port
self.component_info.register()
def handle_info(self, job):
self.log.debug('Got %s job: %s', job.name, job.unique)
info = {
'server': self.hostname,
'port': self.public_port,
}
if self.zone:
info['zone'] = self.zone
job.sendWorkComplete(json.dumps(info))
def _runCommand(self):
while self.command_running:
try:
@ -240,6 +211,7 @@ class FingerGateway(object):
# Update port that we really use if we configured a port of 0
if self.public_port == 0:
self.public_port = self.server.socket.getsockname()[1]
self.component_info.public_port = self.public_port
# Start the command processor after the server and privilege drop
if self.command_socket_path:
@ -259,19 +231,11 @@ class FingerGateway(object):
self.server_thread.start()
self.component_info.state = self.component_info.RUNNING
# Register this finger gateway in case we are zoned
if self.gearworker:
self.log.info('Starting gearworker')
self.gearworker.start()
self.log.info("Finger gateway is started")
def stop(self):
self.component_info.state = self.component_info.STOPPED
if self.gearworker:
self.gearworker.stop()
if self.server:
try:
self.server.shutdown()
@ -303,55 +267,7 @@ class FingerGateway(object):
'''
Wait on the gateway to shutdown.
'''
self.gearworker.join()
self.server_thread.join()
if self.command_thread:
self.command_thread.join()
class FingerClient:
log = logging.getLogger("zuul.FingerClient")
def __init__(self, server, port, ssl_key=None, ssl_cert=None, ssl_ca=None):
self.log.debug("Connecting to gearman at %s:%s" % (server, port))
self.gearman = gear.Client()
self.gearman.addServer(server, port, ssl_key, ssl_cert, ssl_ca,
keepalive=True, tcp_keepidle=60,
tcp_keepintvl=30, tcp_keepcnt=5)
self.log.debug("Waiting for gearman")
self.gearman.waitForServer()
def submitJob(self, name, data):
self.log.debug("Submitting job %s with data %s" % (name, data))
job = gear.TextJob(name,
json.dumps(data),
unique=str(time.time()))
self.gearman.submitJob(job, timeout=300)
self.log.debug("Waiting for job completion")
while not job.complete:
time.sleep(0.1)
if job.exception:
raise RPCFailure(job.exception)
self.log.debug("Job complete, success: %s" % (not job.failure))
return job
def shutdown(self):
self.gearman.shutdown()
def get_fingergw_in_zone(self, zone):
job_name = 'fingergw:info:%s' % zone
functions = getGearmanFunctions(self.gearman)
if job_name not in functions:
# There is no matching fingergw
self.log.warning('No fingergw found in zone %s', zone)
return None
job = self.submitJob(job_name, {})
if job.failure:
self.log.warning('Failed to get fingergw info from zone %s: '
'%s', zone, job)
return None
else:
return json.loads(job.data[0])

View File

@ -260,6 +260,11 @@ class RPCListener(RPCListenerBase):
job.sendWorkComplete(json.dumps(running_items))
def _get_fingergw_in_zone(self, zone):
    """Look up a finger gateway registered for ``zone``.

    Walks the ``'fingergw'`` entries of the scheduler's component
    registry in the order the registry yields them and picks the first
    one whose ``zone`` attribute equals ``zone``.

    :param zone: Zone name to match against each gateway's ``zone``.
    :returns: The first matching gateway component, or ``None`` when no
        registered gateway reports that zone.
    """
    gateways = self.sched.component_registry.all('fingergw')
    # Explicit ``None`` default: callers test the result for truthiness
    # to decide whether to fall back to a direct connection.
    return next((gw for gw in gateways if gw.zone == zone), None)
def handle_get_job_log_stream_address(self, job):
# TODO: map log files to ports. Currently there is only one
# log stream for a given job. But many jobs produce many
@ -287,26 +292,28 @@ class RPCListener(RPCListenerBase):
# If zone and worker zone are both given check if we need to route
# via a finger gateway in that zone.
source_zone = args.get('source_zone')
if (source_zone and build.worker.zone and
source_zone != build.worker.zone):
info = self.sched.finger_client.get_fingergw_in_zone(
build.worker.zone)
if (build.worker.zone and source_zone != build.worker.zone):
info = self._get_fingergw_in_zone(build.worker.zone)
if info:
job_log_stream_address['server'] = info['server']
job_log_stream_address['port'] = info['port']
use_ssl = info.get('use_ssl')
job_log_stream_address['server'] = info.hostname
job_log_stream_address['port'] = info.public_port
use_ssl = getattr(info, 'use_ssl', False)
if use_ssl:
job_log_stream_address['use_ssl'] = use_ssl
self.log.debug('Source and worker zone are different, '
'routing via %s:%s', info['server'],
info['port'])
self.log.debug('Source (%s) and worker (%s) zone '
'are different, routing via %s:%s',
source_zone, build.worker.zone,
info.hostname, info.public_port)
else:
self.log.warning('Source and worker zone are different '
'but no fingergw in target zone found. '
'Falling back to direct connection.')
self.log.warning('Source (%s) and worker (%s) zone '
'are different but no fingergw in '
'target zone found. '
'Falling back to direct connection.',
source_zone, build.worker.zone)
else:
self.log.debug('Source or worker zone undefined or equal, no'
' routing is needed.')
self.log.debug('Source (%s) or worker zone (%s) undefined '
'or equal, no routing is needed.',
source_zone, build.worker.zone)
if 'server' not in job_log_stream_address:
job_log_stream_address['server'] = build.worker.hostname

View File

@ -37,7 +37,6 @@ from zuul import rpclistener
from zuul.lib import commandsocket
from zuul.lib.ansible import AnsibleManager
from zuul.lib.config import get_default
from zuul.lib.fingergw import FingerClient
from zuul.lib.gear_utils import getGearmanFunctions
from zuul.lib.keystorage import FileKeyStorage, ZooKeeperKeyStorage
from zuul.lib.logutil import get_annotated_logger
@ -240,14 +239,6 @@ class Scheduler(threading.Thread):
self.nodepool = nodepool.Nodepool(
self.zk_client, self.hostname, self.statsd, self)
self.gear_server = get_default(config, 'gearman', 'server')
self.gear_port = get_default(config, 'gearman', 'port', 4730)
self.gear_ssl_key = get_default(config, 'gearman', 'ssl_key')
self.gear_ssl_cert = get_default(config, 'gearman', 'ssl_cert')
self.gear_ssl_ca = get_default(config, 'gearman', 'ssl_ca')
self.finger_client = None
def start(self):
super(Scheduler, self).start()
self.keystore = ZooKeeperKeyStorage(
@ -263,10 +254,6 @@ class Scheduler(threading.Thread):
self.command_thread.daemon = True
self.command_thread.start()
self.finger_client = FingerClient(
self.gear_server, self.gear_port, self.gear_ssl_key,
self.gear_ssl_cert, self.gear_ssl_ca)
self.rpc.start()
self.rpc_slow.start()
self.stats_thread.start()
@ -291,7 +278,6 @@ class Scheduler(threading.Thread):
self.rpc_slow.stop()
self.rpc_slow.join()
self.stop_repl()
self.finger_client.shutdown()
self._command_running = False
self.command_socket.stop()
self.command_thread.join()

View File

@ -150,6 +150,14 @@ class MergerComponent(BaseComponent):
class FingerGatewayComponent(BaseComponent):
    """Component record for a finger gateway (kind ``"fingergw"``).

    In addition to whatever ``BaseComponent`` sets up, the record starts
    out with ``zone`` and ``public_port`` entries, both ``None``.
    """

    kind = "fingergw"

    def __init__(self, client, hostname):
        super().__init__(client, hostname)
        # Gateway-specific fields; None until the owning gateway fills
        # them in.
        defaults = {
            "zone": None,
            "public_port": None,
        }
        self.initial_state = defaults
        # NOTE(review): presumably ``content`` is the mapping that
        # BaseComponent publishes to the registry -- confirm there.
        self.content.update(defaults)
class WebComponent(BaseComponent):
kind = "web"