Route streams to different zones via finger gateway

In some distributed deployments we need to route traffic via single
entry points that need to dispatch the traffic. For this use case make
all components aware of their zone so it is possible to compute if
traffic needs to go via an intermediate finger gateway or not.

Therefore we register the gearman function 'fingergw:info:<zone>' if
the fingergw is zoned. That way the scheduler will be able to route
streams from different zones via finger gateways that are responsible
for their zone.

Change-Id: I655427283205ea02de6f0f271b4aa5092ac05278
This commit is contained in:
Tobias Henkel 2019-06-12 16:33:28 +02:00 committed by Simon Westphahl
parent 46d0ed8e8f
commit 5c4e8d7ddd
13 changed files with 460 additions and 67 deletions

View File

@ -1036,6 +1036,20 @@ sections of ``zuul.conf`` are used by the web server:
The Cache-Control max-age response header value for static files served
by the zuul-web. Set to 0 during development to disable Cache-Control.
.. attr:: zone
The zone in which zuul-web is deployed. This is only needed if there are
executors with different zones in the environment and not all executors
are directly addressable from zuul-web. This can be the case within a
k8s based zuul deployment spread over multiple sites. The zone defines
the zone where the executors are directly adressable. Live log streaming
will go directly to the executors of the same zone and be routed to
a finger gateway of the target zone of the zones are different. The
finger gateway of the other zone is a central entrypoint for all live
log streams of that zone.
If this is used the finger gateways should be configured accordingly.
.. _web-server-tenant-scoped-api:
Enabling tenant-scoped access to privileged actions
@ -1290,6 +1304,13 @@ sections of ``zuul.conf`` are used by the finger gateway:
user during startup. It is recommended to set this option to an
unprivileged user.
.. attr:: zone
The zone where the finger gateway is located. This is only needed for
live log streaming if the zuul deployment is spread over multiple
zones without the ability to directly connect to all executors from
zuul-web. See :attr:`executor.zone` for further information.
Operation
~~~~~~~~~

View File

@ -0,0 +1,7 @@
---
features:
- |
Zuul now can route live log streams via finger gateways to make it possible
to distribute executors over multiple datacenters without the possibility
to directly contact every executor from within zuul-web. This is typical
the case in an k8s based deployment.

View File

@ -1,7 +1,7 @@
# NOTE(Shrews): Do not run any tasks that will need zuul_console to stream
# output because that will not work. Since we just need any output in our
# ansible log, the test coordination tasks should be sufficient.
- hosts: all
- hosts: localhost
tasks:
- debug: var=waitpath

View File

@ -24,3 +24,7 @@
vars:
waitpath: '{{zuul._test.test_root}}/builds/{{zuul.build}}/test_wait'
run: playbooks/python27.yaml
nodeset:
nodes:
- name: controller
label: ubuntu-trusty

View File

@ -11,6 +11,7 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import configparser
from zuul.lib.fingergw import FingerGateway
from zuul.zk import ZooKeeperClient
@ -99,14 +100,17 @@ class TestComponentRegistry(ZuulTestCase):
self.assertComponentStopped("merger")
def test_fingergw_component(self):
self.config.read_dict({
config = configparser.ConfigParser()
config.read_dict(self.config)
config.read_dict({
'fingergw': {
'listen_address': self.host,
'port': '0',
'hostname': 'localhost',
}
})
gateway = FingerGateway(
self.config,
config,
command_socket=None,
pid_file=None
)
@ -116,7 +120,6 @@ class TestComponentRegistry(ZuulTestCase):
self.assertComponentState("fingergw", BaseComponent.RUNNING)
finally:
gateway.stop()
gateway.wait()
self.assertComponentStopped("fingergw")

View File

@ -11,7 +11,7 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import configparser
import io
import logging
import json
@ -32,6 +32,8 @@ from tests.base import iterate_timeout, ZuulWebFixture
from ws4py.client import WebSocketBaseClient
from zuul.lib.gear_utils import getGearmanFunctions
class WSClient(WebSocketBaseClient):
def __init__(self, port, build_uuid):
@ -95,17 +97,17 @@ class TestLogStreamer(tests.base.BaseTestCase):
s.close()
class TestStreaming(tests.base.AnsibleZuulTestCase):
class TestStreamingBase(tests.base.AnsibleZuulTestCase):
tenant_config_file = 'config/streamer/main.yaml'
log = logging.getLogger("zuul.test_streaming")
def setUp(self):
super(TestStreaming, self).setUp()
super().setUp()
self.host = '::'
self.streamer = None
self.stop_streamer = False
self.streaming_data = ''
self.streaming_data = {}
self.test_streaming_event = threading.Event()
def stopStreamer(self):
@ -124,16 +126,78 @@ class TestStreaming(tests.base.AnsibleZuulTestCase):
s.sendall(req.encode('utf-8'))
self.test_streaming_event.set()
self.streaming_data.setdefault(None, '')
while not self.stop_streamer:
data = s.recv(2048)
if not data:
break
self.streaming_data += data.decode('utf-8')
self.streaming_data[None] += data.decode('utf-8')
s.shutdown(socket.SHUT_RDWR)
s.close()
self.streamer.stop()
def runFingerClient(self, build_uuid, gateway_address, event, name=None):
# Wait until the gateway is started
for x in iterate_timeout(30, "finger client to start"):
try:
# NOTE(Shrews): This causes the gateway to begin to handle
# a request for which it never receives data, and thus
# causes the getCommand() method to timeout (seen in the
# test results, but is harmless).
with socket.create_connection(gateway_address) as s:
break
except ConnectionRefusedError:
pass
self.streaming_data[name] = ''
with socket.create_connection(gateway_address) as s:
msg = "%s\r\n" % build_uuid
s.sendall(msg.encode('utf-8'))
event.set() # notify we are connected and req sent
while True:
data = s.recv(1024)
if not data:
break
self.streaming_data[name] += data.decode('utf-8')
s.shutdown(socket.SHUT_RDWR)
def runFingerGateway(self, zone=None):
self.log.info('Starting fingergw with zone %s', zone)
config = configparser.ConfigParser()
config.read_dict(self.config)
config.read_dict({
'fingergw': {
'listen_address': self.host,
'port': '0',
'hostname': 'localhost',
}
})
if zone:
config.set('fingergw', 'zone', zone)
gateway = FingerGateway(
config,
command_socket=None,
pid_file=None
)
gateway.history = []
gateway.start()
self.addCleanup(gateway.stop)
if zone:
for _ in iterate_timeout(20, 'fingergw is registered'):
functions = getGearmanFunctions(gateway.gearworker.gearman)
jobname = 'fingergw:info:%s' % zone
if jobname in functions:
break
gateway_port = gateway.server.socket.getsockname()[1]
return gateway, (self.host, gateway_port)
class TestStreaming(TestStreamingBase):
def test_streaming(self):
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
@ -195,14 +259,14 @@ class TestStreaming(tests.base.AnsibleZuulTestCase):
logfile.close()
self.log.debug("\n\nFile contents: %s\n\n", file_contents)
self.log.debug("\n\nStreamed: %s\n\n", self.streaming_data)
self.assertEqual(file_contents, self.streaming_data)
self.log.debug("\n\nStreamed: %s\n\n", self.streaming_data[None])
self.assertEqual(file_contents, self.streaming_data[None])
# Check that we logged a multiline debug message
pattern = (r'^\d\d\d\d-\d\d-\d\d \d\d:\d\d\:\d\d\.\d\d\d\d\d\d \| '
r'Debug Test Token String$')
r = re.compile(pattern, re.MULTILINE)
match = r.search(self.streaming_data)
match = r.search(self.streaming_data[None])
self.assertNotEqual(match, None)
def runWSClient(self, port, build_uuid):
@ -210,30 +274,6 @@ class TestStreaming(tests.base.AnsibleZuulTestCase):
client.event.wait()
return client
def runFingerClient(self, build_uuid, gateway_address, event):
# Wait until the gateway is started
for x in iterate_timeout(30, "finger client to start"):
try:
# NOTE(Shrews): This causes the gateway to begin to handle
# a request for which it never receives data, and thus
# causes the getCommand() method to timeout (seen in the
# test results, but is harmless).
with socket.create_connection(gateway_address) as s:
break
except ConnectionRefusedError:
pass
with socket.create_connection(gateway_address) as s:
msg = "%s\r\n" % build_uuid
s.sendall(msg.encode('utf-8'))
event.set() # notify we are connected and req sent
while True:
data = s.recv(1024)
if not data:
break
self.streaming_data += data.decode('utf-8')
s.shutdown(socket.SHUT_RDWR)
def test_decode_boundaries(self):
'''
Test multi-byte characters crossing read buffer boundaries.
@ -522,23 +562,7 @@ class TestStreaming(tests.base.AnsibleZuulTestCase):
self.addCleanup(logfile.close)
# Start the finger gateway daemon
self.config.read_dict({
'fingergw': {
'listen_address': self.host,
'port': '0',
}
})
gateway = FingerGateway(
self.config,
command_socket=None,
pid_file=None
)
gateway.start()
self.addCleanup(gateway.stop)
gateway_port = gateway.server.socket.getsockname()[1]
gateway_address = (self.host, gateway_port)
_, gateway_address = self.runFingerGateway()
# Start a thread with the finger client
finger_client_event = threading.Event()
@ -563,5 +587,188 @@ class TestStreaming(tests.base.AnsibleZuulTestCase):
file_contents = logfile.read()
logfile.close()
self.log.debug("\n\nFile contents: %s\n\n", file_contents)
self.log.debug("\n\nStreamed: %s\n\n", self.streaming_data)
self.assertEqual(file_contents, self.streaming_data)
self.log.debug("\n\nStreamed: %s\n\n", self.streaming_data[None])
self.assertEqual(file_contents, self.streaming_data[None])
class CountingFingerRequestHandler(zuul.lib.fingergw.RequestHandler):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# if not hasattr(self.fingergw, 'history'):
# self.fingergw.history = []
def _fingerClient(self, server, port, build_uuid):
self.fingergw.history.append(build_uuid)
super()._fingerClient(server, port, build_uuid)
class TestStreamingZones(TestStreamingBase):
def setUp(self):
super().setUp()
self.fake_nodepool.attributes = {'executor-zone': 'eu-central'}
zuul.lib.fingergw.FingerGateway.handler_class = \
CountingFingerRequestHandler
def setup_config(self, config_file: str):
config = super().setup_config(config_file)
config.set('executor', 'zone', 'eu-central')
return config
def _run_finger_client(self, build, address, name):
# Start a thread with the finger client
finger_client_event = threading.Event()
self.finger_client_results = ''
finger_client_thread = threading.Thread(
target=self.runFingerClient,
args=(build.uuid, address, finger_client_event),
kwargs={'name': name}
)
finger_client_thread.start()
finger_client_event.wait()
return finger_client_thread
def test_finger_gateway(self):
# Start the finger streamer daemon
streamer = zuul.lib.log_streamer.LogStreamer(
self.host, 0, self.executor_server.jobdir_root)
self.addCleanup(streamer.stop)
finger_port = streamer.server.socket.getsockname()[1]
# Need to set the streaming port before submitting the job
self.executor_server.log_streaming_port = finger_port
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
# We don't have any real synchronization for the ansible jobs, so
# just wait until we get our running build.
for x in iterate_timeout(30, "build"):
if len(self.builds):
break
build = self.builds[0]
self.assertEqual(build.name, 'python27')
build_dir = os.path.join(self.executor_server.jobdir_root, build.uuid)
for x in iterate_timeout(30, "build dir"):
if os.path.exists(build_dir):
break
# Need to wait to make sure that jobdir gets set
for x in iterate_timeout(30, "jobdir"):
if build.jobdir is not None:
break
# Wait for the job to begin running and create the ansible log file.
# The job waits to complete until the flag file exists, so we can
# safely access the log here. We only open it (to force a file handle
# to be kept open for it after the job finishes) but wait to read the
# contents until the job is done.
ansible_log = os.path.join(build.jobdir.log_root, 'job-output.txt')
for x in iterate_timeout(30, "ansible log"):
if os.path.exists(ansible_log):
break
logfile = open(ansible_log, 'r')
self.addCleanup(logfile.close)
def wait_for_stream(name):
for x in iterate_timeout(30, "incoming streaming data"):
if len(self.streaming_data.get(name, '')) > 0:
break
# Start the finger gateway daemons
try:
gateway_unzoned, gateway_unzoned_address = self.runFingerGateway()
gateway_us_west, gateway_us_west_address = self.runFingerGateway(
zone='us-west')
except Exception:
self.log.exception("Failed to run finger gateway")
raise
# This finger client runs against a finger gateway in a different zone
# while there is no gateway in the worker zone yet. This should work.
finger_client_us_west_alone = self._run_finger_client(
build, gateway_us_west_address, name='us-west-alone')
# The stream must go only via gateway_us_west
wait_for_stream('us-west-alone')
self.assertEqual(0, len(gateway_unzoned.history))
self.assertEqual(1, len(gateway_us_west.history))
gateway_unzoned.history.clear()
gateway_us_west.history.clear()
# This finger client runs against an unzoned finger gateway
finger_client_unzoned = self._run_finger_client(
build, gateway_unzoned_address, name='unzoned')
wait_for_stream('unzoned')
self.assertEqual(1, len(gateway_unzoned.history))
self.assertEqual(0, len(gateway_us_west.history))
gateway_unzoned.history.clear()
gateway_us_west.history.clear()
# Now start a finger gateway in the target zone.
gateway_eu_central, gateway_eu_central_address = self.runFingerGateway(
zone='eu-central')
# This finger client runs against a finger gateway in a different zone
# while there is a gateway in the worker zone. This should route via
# the gateway in the worker zone.
finger_client_us_west = self._run_finger_client(
build, gateway_us_west_address, name='us-west')
# The stream must go only via gateway_us_west
wait_for_stream('us-west')
self.assertEqual(0, len(gateway_unzoned.history))
self.assertEqual(1, len(gateway_eu_central.history))
self.assertEqual(1, len(gateway_us_west.history))
gateway_unzoned.history.clear()
gateway_eu_central.history.clear()
gateway_us_west.history.clear()
# This finger client runs against an unzoned finger gateway while there
# is a target finger client. As it is unzoned it should not route via
# The finger gateway in eu-central.
finger_client_unzoned2 = self._run_finger_client(
build, gateway_unzoned_address, name='unzoned2')
wait_for_stream('unzoned2')
self.assertEqual(1, len(gateway_unzoned.history))
self.assertEqual(0, len(gateway_eu_central.history))
self.assertEqual(0, len(gateway_us_west.history))
gateway_unzoned.history.clear()
gateway_eu_central.history.clear()
gateway_us_west.history.clear()
# This finger client runs against the target finger gateway.
finger_client_eu_central = self._run_finger_client(
build, gateway_eu_central_address, name='eu-central')
wait_for_stream('eu-central')
self.assertEqual(0, len(gateway_unzoned.history))
self.assertEqual(1, len(gateway_eu_central.history))
self.assertEqual(0, len(gateway_us_west.history))
gateway_unzoned.history.clear()
gateway_eu_central.history.clear()
gateway_us_west.history.clear()
# Allow the job to complete
flag_file = os.path.join(build_dir, 'test_wait')
open(flag_file, 'w').close()
# Wait for the finger client to complete, which it should when
# it's received the full log.
finger_client_us_west_alone.join()
finger_client_us_west.join()
finger_client_eu_central.join()
finger_client_unzoned.join()
finger_client_unzoned2.join()
self.waitUntilSettled()
file_contents = logfile.read()
logfile.close()
self.log.debug("\n\nFile contents: %s\n\n", file_contents)
self.log.debug("\n\nStreamed: %s\n\n",
self.streaming_data['us-west-alone'])
self.assertEqual(file_contents, self.streaming_data['us-west-alone'])
self.assertEqual(file_contents, self.streaming_data['us-west'])
self.assertEqual(file_contents, self.streaming_data['unzoned'])
self.assertEqual(file_contents, self.streaming_data['unzoned2'])
self.assertEqual(file_contents, self.streaming_data['eu-central'])

View File

@ -1001,7 +1001,7 @@ class AnsibleJob(object):
time.monotonic() - self.time_starting_build))
def _base_job_data(self):
return {
data = {
# TODO(mordred) worker_name is needed as a unique name for the
# client to use for cancelling jobs on an executor. It's
# defaulting to the hostname for now, but in the future we
@ -1011,6 +1011,9 @@ class AnsibleJob(object):
'worker_hostname': self.executor_server.hostname,
'worker_log_port': self.executor_server.log_streaming_port,
}
if self.executor_server.zone:
data['worker_zone'] = self.executor_server.zone
return data
def _send_aborted(self):
result = dict(result='ABORTED')

View File

@ -13,20 +13,26 @@
# under the License.
import functools
import json
import logging
import socket
import threading
import time
from configparser import ConfigParser
from typing import Optional
import gear
import zuul.rpcclient
from zuul.lib import streamer_utils
from zuul.lib.commandsocket import CommandSocket
from zuul.lib.config import get_default
from zuul.lib.gear_utils import getGearmanFunctions
from zuul.lib.gearworker import ZuulGearWorker
from zuul.rpcclient import RPCFailure
from zuul.zk import ZooKeeperClient
from zuul.zk.components import FingerGatewayComponent
COMMANDS = ['stop']
@ -38,7 +44,8 @@ class RequestHandler(streamer_utils.BaseFingerRequestHandler):
log = logging.getLogger("zuul.fingergw")
def __init__(self, *args, **kwargs):
self.rpc = kwargs.pop('rpc')
self.fingergw = kwargs.pop('fingergw')
super(RequestHandler, self).__init__(*args, **kwargs)
def _fingerClient(self, server, port, build_uuid):
@ -72,7 +79,8 @@ class RequestHandler(streamer_utils.BaseFingerRequestHandler):
port = None
try:
build_uuid = self.getCommand()
port_location = self.rpc.get_job_log_stream_address(build_uuid)
port_location = self.fingergw.rpc.get_job_log_stream_address(
build_uuid, source_zone=self.fingergw.zone)
if not port_location:
msg = 'Invalid build UUID %s' % build_uuid
@ -104,6 +112,9 @@ class FingerGateway(object):
'''
log = logging.getLogger("zuul.fingergw")
handler_class = RequestHandler
gearworker: Optional[ZuulGearWorker]
def __init__(
self,
@ -137,10 +148,12 @@ class FingerGateway(object):
self.gear_ssl_ca = gear_ssl_ca
host = get_default(config, 'fingergw', 'listen_address', '::')
port = int(get_default(config, 'fingergw', 'port', 79))
self.port = int(get_default(config, 'fingergw', 'port', 79))
self.public_port = int(get_default(
config, 'fingergw', 'public_port', self.port))
user = get_default(config, 'fingergw', 'user', None)
self.address = (host, port)
self.address = (host, self.port)
self.user = user
self.pid_file = pid_file
@ -157,14 +170,40 @@ class FingerGateway(object):
stop=self.stop,
)
self.hostname = get_default(config, 'fingergw', 'hostname',
socket.getfqdn())
self.zone = get_default(config, 'fingergw', 'zone')
if self.zone is not None:
jobs = {
'fingergw:info:%s' % self.zone: self.handle_info,
}
self.gearworker = ZuulGearWorker(
'Finger Gateway',
'zuul.fingergw',
'fingergw-gearman-worker',
config,
jobs)
else:
self.gearworker = None
self.zk_client = ZooKeeperClient.fromConfig(config)
self.zk_client.connect()
self.hostname = socket.getfqdn()
self.component_info = FingerGatewayComponent(
self.zk_client, self.hostname
)
self.component_info.register()
def handle_info(self, job):
self.log.debug('Got %s job: %s', job.name, job.unique)
info = {
'server': self.hostname,
'port': self.public_port,
}
if self.zone:
info['zone'] = self.zone
job.sendWorkComplete(json.dumps(info))
def _runCommand(self):
while self.command_running:
try:
@ -194,10 +233,14 @@ class FingerGateway(object):
self.server = streamer_utils.CustomThreadingTCPServer(
self.address,
functools.partial(RequestHandler, rpc=self.rpc),
functools.partial(self.handler_class, fingergw=self),
user=self.user,
pid_file=self.pid_file)
# Update port that we really use if we configured a port of 0
if self.public_port == 0:
self.public_port = self.server.socket.getsockname()[1]
# Start the command processor after the server and privilege drop
if self.command_socket_path:
self.log.debug("Starting command processor")
@ -215,10 +258,20 @@ class FingerGateway(object):
self.server_thread.daemon = True
self.server_thread.start()
self.component_info.state = self.component_info.RUNNING
# Register this finger gateway in case we are zoned
if self.gearworker:
self.log.info('Starting gearworker')
self.gearworker.start()
self.log.info("Finger gateway is started")
def stop(self):
self.component_info.state = self.component_info.STOPPED
if self.gearworker:
self.gearworker.stop()
if self.server:
try:
self.server.shutdown()
@ -250,7 +303,55 @@ class FingerGateway(object):
'''
Wait on the gateway to shutdown.
'''
self.gearworker.join()
self.server_thread.join()
if self.command_thread:
self.command_thread.join()
class FingerClient:
log = logging.getLogger("zuul.FingerClient")
def __init__(self, server, port, ssl_key=None, ssl_cert=None, ssl_ca=None):
self.log.debug("Connecting to gearman at %s:%s" % (server, port))
self.gearman = gear.Client()
self.gearman.addServer(server, port, ssl_key, ssl_cert, ssl_ca,
keepalive=True, tcp_keepidle=60,
tcp_keepintvl=30, tcp_keepcnt=5)
self.log.debug("Waiting for gearman")
self.gearman.waitForServer()
def submitJob(self, name, data):
self.log.debug("Submitting job %s with data %s" % (name, data))
job = gear.TextJob(name,
json.dumps(data),
unique=str(time.time()))
self.gearman.submitJob(job, timeout=300)
self.log.debug("Waiting for job completion")
while not job.complete:
time.sleep(0.1)
if job.exception:
raise RPCFailure(job.exception)
self.log.debug("Job complete, success: %s" % (not job.failure))
return job
def shutdown(self):
self.gearman.shutdown()
def get_fingergw_in_zone(self, zone):
job_name = 'fingergw:info:%s' % zone
functions = getGearmanFunctions(self.gearman)
if job_name not in functions:
# There is no matching fingergw
self.log.warning('No fingergw found in zone %s', zone)
return None
job = self.submitJob(job_name, {})
if job.failure:
self.log.warning('Failed to get fingergw info from zone %s: '
'%s', zone, job)
return None
else:
return json.loads(job.data[0])

View File

@ -2075,12 +2075,14 @@ class Worker(object):
self.name = "Unknown"
self.hostname = None
self.log_port = None
self.zone = None
def updateFromData(self, data):
"""Update worker information if contained in the WORK_DATA response."""
self.name = data.get('worker_name', self.name)
self.hostname = data.get('worker_hostname', self.hostname)
self.log_port = data.get('worker_log_port', self.log_port)
self.zone = data.get('worker_zone', self.zone)
def __repr__(self):
return '<Worker %s>' % self.name

View File

@ -136,8 +136,11 @@ class RPCClient(object):
def shutdown(self):
self.gearman.shutdown()
def get_job_log_stream_address(self, uuid, logfile='console.log'):
def get_job_log_stream_address(self, uuid, logfile='console.log',
source_zone=None):
data = {'uuid': uuid, 'logfile': logfile}
if source_zone:
data['source_zone'] = source_zone
job = self.submitJob('zuul:get_job_log_stream_address', data)
if job.failure:
return False

View File

@ -277,13 +277,40 @@ class RPCListener(RPCListenerBase):
return None
args = json.loads(job.arguments)
self.log.debug('Handle get_job_log_stream_address with arguments %s',
job.arguments)
uuid = args['uuid']
# TODO: logfile = args['logfile']
job_log_stream_address = {}
build = find_build(uuid)
if build:
job_log_stream_address['server'] = build.worker.hostname
job_log_stream_address['port'] = build.worker.log_port
# If zone and worker zone are both given check if we need to route
# via a finger gateway in that zone.
source_zone = args.get('source_zone')
if (source_zone and build.worker.zone and
source_zone != build.worker.zone):
info = self.sched.finger_client.get_fingergw_in_zone(
build.worker.zone)
if info:
job_log_stream_address['server'] = info['server']
job_log_stream_address['port'] = info['port']
use_ssl = info.get('use_ssl')
if use_ssl:
job_log_stream_address['use_ssl'] = use_ssl
self.log.debug('Source and worker zone are different, '
'routing via %s:%s', info['server'],
info['port'])
else:
self.log.warning('Source and worker zone are different '
'but no fingergw in target zone found. '
'Falling back to direct connection.')
else:
self.log.debug('Source or worker zone undefined or equal, no'
' routing is needed.')
if 'server' not in job_log_stream_address:
job_log_stream_address['server'] = build.worker.hostname
job_log_stream_address['port'] = build.worker.log_port
job.sendWorkComplete(json.dumps(job_log_stream_address))
def _is_authorized(self, tenant, claims):

View File

@ -36,6 +36,7 @@ from zuul import rpclistener
from zuul.lib import commandsocket
from zuul.lib.ansible import AnsibleManager
from zuul.lib.config import get_default
from zuul.lib.fingergw import FingerClient
from zuul.lib.gear_utils import getGearmanFunctions
from zuul.lib.keystorage import FileKeyStorage, ZooKeeperKeyStorage
from zuul.lib.logutil import get_annotated_logger
@ -222,6 +223,14 @@ class Scheduler(threading.Thread):
self.nodepool = nodepool.Nodepool(
self.zk_client, self.hostname, self.statsd, self)
self.gear_server = get_default(config, 'gearman', 'server')
self.gear_port = get_default(config, 'gearman', 'port', 4730)
self.gear_ssl_key = get_default(config, 'gearman', 'ssl_key')
self.gear_ssl_cert = get_default(config, 'gearman', 'ssl_cert')
self.gear_ssl_ca = get_default(config, 'gearman', 'ssl_ca')
self.finger_client = None
def start(self):
super(Scheduler, self).start()
self.keystore = ZooKeeperKeyStorage(
@ -237,6 +246,10 @@ class Scheduler(threading.Thread):
self.command_thread.daemon = True
self.command_thread.start()
self.finger_client = FingerClient(
self.gear_server, self.gear_port, self.gear_ssl_key,
self.gear_ssl_cert, self.gear_ssl_ca)
self.rpc.start()
self.rpc_slow.start()
self.stats_thread.start()
@ -257,6 +270,7 @@ class Scheduler(threading.Thread):
self.rpc_slow.stop()
self.rpc_slow.join()
self.stop_repl()
self.finger_client.shutdown()
self._command_running = False
self.command_socket.stop()
self.command_thread.join()

View File

@ -165,7 +165,7 @@ class LogStreamHandler(WebSocket):
key=key))
port_location = self.zuulweb.rpc.get_job_log_stream_address(
request['uuid'])
request['uuid'], source_zone=self.zuulweb.zone)
if not port_location:
return self.logClose(4011, "Error with Gearman")
@ -1272,6 +1272,7 @@ class ZuulWeb(object):
self.connections = connections
self.authenticators = authenticators
self.stream_manager = StreamManager()
self.zone = get_default(self.config, 'web', 'zone')
command_socket = get_default(
self.config, 'web', 'command_socket',