Use component registry in fingergw routing
This uses the component registry rather than gearman to perform fingergw routing lookups. It also adjusts the logic for routing to match the latest version of the spec, where unzoned fingergw process are expected to route to zoned fingergws if they exist (because the unzoned fingergw might be a public gateway outside of the zone). Change-Id: I2f9fed03159db59cc4e496802b9dab05f746e1a2
This commit is contained in:
parent
5c4e8d7ddd
commit
a0974f9f8c
|
@ -1038,15 +1038,17 @@ sections of ``zuul.conf`` are used by the web server:
|
|||
|
||||
.. attr:: zone
|
||||
|
||||
The zone in which zuul-web is deployed. This is only needed if there are
|
||||
executors with different zones in the environment and not all executors
|
||||
are directly addressable from zuul-web. This can be the case within a
|
||||
k8s based zuul deployment spread over multiple sites. The zone defines
|
||||
the zone where the executors are directly adressable. Live log streaming
|
||||
will go directly to the executors of the same zone and be routed to
|
||||
a finger gateway of the target zone of the zones are different. The
|
||||
finger gateway of the other zone is a central entrypoint for all live
|
||||
log streams of that zone.
|
||||
The zone in which zuul-web is deployed. This is only needed if
|
||||
there are executors with different zones in the environment and
|
||||
not all executors are directly addressable from zuul-web. The
|
||||
parameter specifies the zone where the executors are directly
|
||||
adressable. Live log streaming will go directly to the executors
|
||||
of the same zone and be routed to a finger gateway of the target
|
||||
zone if the zones are different.
|
||||
|
||||
In a mixed system (with zoned and unzoned executors) there may
|
||||
also be zoned and unzoned zuul-web services. Omit the zone
|
||||
parameter for any unzoned zuul-web servers.
|
||||
|
||||
If this is used the finger gateways should be configured accordingly.
|
||||
|
||||
|
@ -1304,6 +1306,16 @@ sections of ``zuul.conf`` are used by the finger gateway:
|
|||
user during startup. It is recommended to set this option to an
|
||||
unprivileged user.
|
||||
|
||||
.. attr:: hostname
|
||||
:default: hostname of the server
|
||||
|
||||
When running finger gateways in a multi-zone configuration, the
|
||||
gateway needs to know its hostname under which it is reachable
|
||||
by zuul-web. Otherwise live console log streaming doesn't
|
||||
work. In most cases This is automatically detected
|
||||
correctly. But when running in environments where it cannot
|
||||
determine its hostname correctly this can be overridden here.
|
||||
|
||||
.. attr:: zone
|
||||
|
||||
The zone where the finger gateway is located. This is only needed for
|
||||
|
@ -1311,6 +1323,10 @@ sections of ``zuul.conf`` are used by the finger gateway:
|
|||
zones without the ability to directly connect to all executors from
|
||||
zuul-web. See :attr:`executor.zone` for further information.
|
||||
|
||||
In a mixed system (with zoned and unzoned executors) there may
|
||||
also be zoned and unzoned finger gateway services. Omit the zone
|
||||
parameter for any unzoned finger gateway servers.
|
||||
|
||||
Operation
|
||||
~~~~~~~~~
|
||||
|
||||
|
|
|
@ -3,5 +3,5 @@ features:
|
|||
- |
|
||||
Zuul now can route live log streams via finger gateways to make it possible
|
||||
to distribute executors over multiple datacenters without the possibility
|
||||
to directly contact every executor from within zuul-web. This is typical
|
||||
the case in an k8s based deployment.
|
||||
to directly contact every executor from within zuul-web. This is typically
|
||||
the case in a Kubernetes based deployment.
|
||||
|
|
|
@ -106,7 +106,7 @@ class TestComponentRegistry(ZuulTestCase):
|
|||
'fingergw': {
|
||||
'listen_address': self.host,
|
||||
'port': '0',
|
||||
'hostname': 'localhost',
|
||||
'hostname': 'janine',
|
||||
}
|
||||
})
|
||||
gateway = FingerGateway(
|
||||
|
@ -118,6 +118,7 @@ class TestComponentRegistry(ZuulTestCase):
|
|||
|
||||
try:
|
||||
self.assertComponentState("fingergw", BaseComponent.RUNNING)
|
||||
self.assertComponentAttr("fingergw", "hostname", "janine")
|
||||
finally:
|
||||
gateway.stop()
|
||||
|
||||
|
|
|
@ -32,8 +32,6 @@ from tests.base import iterate_timeout, ZuulWebFixture
|
|||
|
||||
from ws4py.client import WebSocketBaseClient
|
||||
|
||||
from zuul.lib.gear_utils import getGearmanFunctions
|
||||
|
||||
|
||||
class WSClient(WebSocketBaseClient):
|
||||
def __init__(self, port, build_uuid):
|
||||
|
@ -187,9 +185,13 @@ class TestStreamingBase(tests.base.AnsibleZuulTestCase):
|
|||
|
||||
if zone:
|
||||
for _ in iterate_timeout(20, 'fingergw is registered'):
|
||||
functions = getGearmanFunctions(gateway.gearworker.gearman)
|
||||
jobname = 'fingergw:info:%s' % zone
|
||||
if jobname in functions:
|
||||
found = False
|
||||
for gw in self.scheds.first.sched.component_registry.\
|
||||
all('fingergw'):
|
||||
if gw.zone == zone:
|
||||
found = True
|
||||
break
|
||||
if found:
|
||||
break
|
||||
|
||||
gateway_port = gateway.server.socket.getsockname()[1]
|
||||
|
@ -724,14 +726,15 @@ class TestStreamingZones(TestStreamingBase):
|
|||
gateway_eu_central.history.clear()
|
||||
gateway_us_west.history.clear()
|
||||
|
||||
# This finger client runs against an unzoned finger gateway while there
|
||||
# is a target finger client. As it is unzoned it should not route via
|
||||
# The finger gateway in eu-central.
|
||||
# This finger client runs against an unzoned finger gateway
|
||||
# while there is a gateway in the worker zone. It should still
|
||||
# route via the gateway in the worker zone since that may be
|
||||
# the only way it's accessible.
|
||||
finger_client_unzoned2 = self._run_finger_client(
|
||||
build, gateway_unzoned_address, name='unzoned2')
|
||||
wait_for_stream('unzoned2')
|
||||
self.assertEqual(1, len(gateway_unzoned.history))
|
||||
self.assertEqual(0, len(gateway_eu_central.history))
|
||||
self.assertEqual(1, len(gateway_eu_central.history))
|
||||
self.assertEqual(0, len(gateway_us_west.history))
|
||||
gateway_unzoned.history.clear()
|
||||
gateway_eu_central.history.clear()
|
||||
|
|
|
@ -13,23 +13,16 @@
|
|||
# under the License.
|
||||
|
||||
import functools
|
||||
import json
|
||||
import logging
|
||||
import socket
|
||||
import threading
|
||||
import time
|
||||
from configparser import ConfigParser
|
||||
from typing import Optional
|
||||
|
||||
import gear
|
||||
|
||||
import zuul.rpcclient
|
||||
from zuul.lib import streamer_utils
|
||||
from zuul.lib.commandsocket import CommandSocket
|
||||
from zuul.lib.config import get_default
|
||||
from zuul.lib.gear_utils import getGearmanFunctions
|
||||
from zuul.lib.gearworker import ZuulGearWorker
|
||||
from zuul.rpcclient import RPCFailure
|
||||
from zuul.zk import ZooKeeperClient
|
||||
from zuul.zk.components import FingerGatewayComponent
|
||||
|
||||
|
@ -114,8 +107,6 @@ class FingerGateway(object):
|
|||
log = logging.getLogger("zuul.fingergw")
|
||||
handler_class = RequestHandler
|
||||
|
||||
gearworker: Optional[ZuulGearWorker]
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
config: ConfigParser,
|
||||
|
@ -174,36 +165,16 @@ class FingerGateway(object):
|
|||
socket.getfqdn())
|
||||
self.zone = get_default(config, 'fingergw', 'zone')
|
||||
|
||||
if self.zone is not None:
|
||||
jobs = {
|
||||
'fingergw:info:%s' % self.zone: self.handle_info,
|
||||
}
|
||||
self.gearworker = ZuulGearWorker(
|
||||
'Finger Gateway',
|
||||
'zuul.fingergw',
|
||||
'fingergw-gearman-worker',
|
||||
config,
|
||||
jobs)
|
||||
else:
|
||||
self.gearworker = None
|
||||
|
||||
self.zk_client = ZooKeeperClient.fromConfig(config)
|
||||
self.zk_client.connect()
|
||||
self.component_info = FingerGatewayComponent(
|
||||
self.zk_client, self.hostname
|
||||
)
|
||||
if self.zone is not None:
|
||||
self.component_info.zone = self.zone
|
||||
self.component_info.public_port = self.public_port
|
||||
self.component_info.register()
|
||||
|
||||
def handle_info(self, job):
|
||||
self.log.debug('Got %s job: %s', job.name, job.unique)
|
||||
info = {
|
||||
'server': self.hostname,
|
||||
'port': self.public_port,
|
||||
}
|
||||
if self.zone:
|
||||
info['zone'] = self.zone
|
||||
job.sendWorkComplete(json.dumps(info))
|
||||
|
||||
def _runCommand(self):
|
||||
while self.command_running:
|
||||
try:
|
||||
|
@ -240,6 +211,7 @@ class FingerGateway(object):
|
|||
# Update port that we really use if we configured a port of 0
|
||||
if self.public_port == 0:
|
||||
self.public_port = self.server.socket.getsockname()[1]
|
||||
self.component_info.public_port = self.public_port
|
||||
|
||||
# Start the command processor after the server and privilege drop
|
||||
if self.command_socket_path:
|
||||
|
@ -259,19 +231,11 @@ class FingerGateway(object):
|
|||
self.server_thread.start()
|
||||
self.component_info.state = self.component_info.RUNNING
|
||||
|
||||
# Register this finger gateway in case we are zoned
|
||||
if self.gearworker:
|
||||
self.log.info('Starting gearworker')
|
||||
self.gearworker.start()
|
||||
|
||||
self.log.info("Finger gateway is started")
|
||||
|
||||
def stop(self):
|
||||
self.component_info.state = self.component_info.STOPPED
|
||||
|
||||
if self.gearworker:
|
||||
self.gearworker.stop()
|
||||
|
||||
if self.server:
|
||||
try:
|
||||
self.server.shutdown()
|
||||
|
@ -303,55 +267,7 @@ class FingerGateway(object):
|
|||
'''
|
||||
Wait on the gateway to shutdown.
|
||||
'''
|
||||
self.gearworker.join()
|
||||
self.server_thread.join()
|
||||
|
||||
if self.command_thread:
|
||||
self.command_thread.join()
|
||||
|
||||
|
||||
class FingerClient:
|
||||
log = logging.getLogger("zuul.FingerClient")
|
||||
|
||||
def __init__(self, server, port, ssl_key=None, ssl_cert=None, ssl_ca=None):
|
||||
self.log.debug("Connecting to gearman at %s:%s" % (server, port))
|
||||
self.gearman = gear.Client()
|
||||
self.gearman.addServer(server, port, ssl_key, ssl_cert, ssl_ca,
|
||||
keepalive=True, tcp_keepidle=60,
|
||||
tcp_keepintvl=30, tcp_keepcnt=5)
|
||||
self.log.debug("Waiting for gearman")
|
||||
self.gearman.waitForServer()
|
||||
|
||||
def submitJob(self, name, data):
|
||||
self.log.debug("Submitting job %s with data %s" % (name, data))
|
||||
job = gear.TextJob(name,
|
||||
json.dumps(data),
|
||||
unique=str(time.time()))
|
||||
self.gearman.submitJob(job, timeout=300)
|
||||
|
||||
self.log.debug("Waiting for job completion")
|
||||
while not job.complete:
|
||||
time.sleep(0.1)
|
||||
if job.exception:
|
||||
raise RPCFailure(job.exception)
|
||||
self.log.debug("Job complete, success: %s" % (not job.failure))
|
||||
return job
|
||||
|
||||
def shutdown(self):
|
||||
self.gearman.shutdown()
|
||||
|
||||
def get_fingergw_in_zone(self, zone):
|
||||
job_name = 'fingergw:info:%s' % zone
|
||||
functions = getGearmanFunctions(self.gearman)
|
||||
if job_name not in functions:
|
||||
# There is no matching fingergw
|
||||
self.log.warning('No fingergw found in zone %s', zone)
|
||||
return None
|
||||
|
||||
job = self.submitJob(job_name, {})
|
||||
if job.failure:
|
||||
self.log.warning('Failed to get fingergw info from zone %s: '
|
||||
'%s', zone, job)
|
||||
return None
|
||||
else:
|
||||
return json.loads(job.data[0])
|
||||
|
|
|
@ -260,6 +260,11 @@ class RPCListener(RPCListenerBase):
|
|||
|
||||
job.sendWorkComplete(json.dumps(running_items))
|
||||
|
||||
def _get_fingergw_in_zone(self, zone):
|
||||
for gw in self.sched.component_registry.all('fingergw'):
|
||||
if gw.zone == zone:
|
||||
return gw
|
||||
|
||||
def handle_get_job_log_stream_address(self, job):
|
||||
# TODO: map log files to ports. Currently there is only one
|
||||
# log stream for a given job. But many jobs produce many
|
||||
|
@ -287,26 +292,28 @@ class RPCListener(RPCListenerBase):
|
|||
# If zone and worker zone are both given check if we need to route
|
||||
# via a finger gateway in that zone.
|
||||
source_zone = args.get('source_zone')
|
||||
if (source_zone and build.worker.zone and
|
||||
source_zone != build.worker.zone):
|
||||
info = self.sched.finger_client.get_fingergw_in_zone(
|
||||
build.worker.zone)
|
||||
if (build.worker.zone and source_zone != build.worker.zone):
|
||||
info = self._get_fingergw_in_zone(build.worker.zone)
|
||||
if info:
|
||||
job_log_stream_address['server'] = info['server']
|
||||
job_log_stream_address['port'] = info['port']
|
||||
use_ssl = info.get('use_ssl')
|
||||
job_log_stream_address['server'] = info.hostname
|
||||
job_log_stream_address['port'] = info.public_port
|
||||
use_ssl = getattr(info, 'use_ssl', False)
|
||||
if use_ssl:
|
||||
job_log_stream_address['use_ssl'] = use_ssl
|
||||
self.log.debug('Source and worker zone are different, '
|
||||
'routing via %s:%s', info['server'],
|
||||
info['port'])
|
||||
self.log.debug('Source (%s) and worker (%s) zone '
|
||||
'are different, routing via %s:%s',
|
||||
source_zone, build.worker.zone,
|
||||
info.hostname, info.public_port)
|
||||
else:
|
||||
self.log.warning('Source and worker zone are different '
|
||||
'but no fingergw in target zone found. '
|
||||
'Falling back to direct connection.')
|
||||
self.log.warning('Source (%s) and worker (%s) zone '
|
||||
'are different but no fingergw in '
|
||||
'target zone found. '
|
||||
'Falling back to direct connection.',
|
||||
source_zone, build.worker.zone)
|
||||
else:
|
||||
self.log.debug('Source or worker zone undefined or equal, no'
|
||||
' routing is needed.')
|
||||
self.log.debug('Source (%s) or worker zone (%s) undefined '
|
||||
'or equal, no routing is needed.',
|
||||
source_zone, build.worker.zone)
|
||||
|
||||
if 'server' not in job_log_stream_address:
|
||||
job_log_stream_address['server'] = build.worker.hostname
|
||||
|
|
|
@ -36,7 +36,6 @@ from zuul import rpclistener
|
|||
from zuul.lib import commandsocket
|
||||
from zuul.lib.ansible import AnsibleManager
|
||||
from zuul.lib.config import get_default
|
||||
from zuul.lib.fingergw import FingerClient
|
||||
from zuul.lib.gear_utils import getGearmanFunctions
|
||||
from zuul.lib.keystorage import FileKeyStorage, ZooKeeperKeyStorage
|
||||
from zuul.lib.logutil import get_annotated_logger
|
||||
|
@ -223,14 +222,6 @@ class Scheduler(threading.Thread):
|
|||
self.nodepool = nodepool.Nodepool(
|
||||
self.zk_client, self.hostname, self.statsd, self)
|
||||
|
||||
self.gear_server = get_default(config, 'gearman', 'server')
|
||||
self.gear_port = get_default(config, 'gearman', 'port', 4730)
|
||||
self.gear_ssl_key = get_default(config, 'gearman', 'ssl_key')
|
||||
self.gear_ssl_cert = get_default(config, 'gearman', 'ssl_cert')
|
||||
self.gear_ssl_ca = get_default(config, 'gearman', 'ssl_ca')
|
||||
|
||||
self.finger_client = None
|
||||
|
||||
def start(self):
|
||||
super(Scheduler, self).start()
|
||||
self.keystore = ZooKeeperKeyStorage(
|
||||
|
@ -246,10 +237,6 @@ class Scheduler(threading.Thread):
|
|||
self.command_thread.daemon = True
|
||||
self.command_thread.start()
|
||||
|
||||
self.finger_client = FingerClient(
|
||||
self.gear_server, self.gear_port, self.gear_ssl_key,
|
||||
self.gear_ssl_cert, self.gear_ssl_ca)
|
||||
|
||||
self.rpc.start()
|
||||
self.rpc_slow.start()
|
||||
self.stats_thread.start()
|
||||
|
@ -270,7 +257,6 @@ class Scheduler(threading.Thread):
|
|||
self.rpc_slow.stop()
|
||||
self.rpc_slow.join()
|
||||
self.stop_repl()
|
||||
self.finger_client.shutdown()
|
||||
self._command_running = False
|
||||
self.command_socket.stop()
|
||||
self.command_thread.join()
|
||||
|
|
|
@ -143,6 +143,14 @@ class MergerComponent(BaseComponent):
|
|||
class FingerGatewayComponent(BaseComponent):
|
||||
kind = "fingergw"
|
||||
|
||||
def __init__(self, client, hostname):
|
||||
super().__init__(client, hostname)
|
||||
self.initial_state = {
|
||||
"zone": None,
|
||||
"public_port": None,
|
||||
}
|
||||
self.content.update(self.initial_state)
|
||||
|
||||
|
||||
class WebComponent(BaseComponent):
|
||||
kind = "web"
|
||||
|
|
Loading…
Reference in New Issue