Merge "Route streams to different zones via finger gateway"
This commit is contained in:
commit
306adb6674
|
@ -1042,6 +1042,20 @@ sections of ``zuul.conf`` are used by the web server:
|
|||
The Cache-Control max-age response header value for static files served
|
||||
by the zuul-web. Set to 0 during development to disable Cache-Control.
|
||||
|
||||
.. attr:: zone
|
||||
|
||||
The zone in which zuul-web is deployed. This is only needed if there are
|
||||
executors with different zones in the environment and not all executors
|
||||
are directly addressable from zuul-web. This can be the case within a
|
||||
k8s based zuul deployment spread over multiple sites. The zone defines
|
||||
the zone where the executors are directly adressable. Live log streaming
|
||||
will go directly to the executors of the same zone and be routed to
|
||||
a finger gateway of the target zone of the zones are different. The
|
||||
finger gateway of the other zone is a central entrypoint for all live
|
||||
log streams of that zone.
|
||||
|
||||
If this is used the finger gateways should be configured accordingly.
|
||||
|
||||
.. _web-server-tenant-scoped-api:
|
||||
|
||||
Enabling tenant-scoped access to privileged actions
|
||||
|
@ -1296,6 +1310,13 @@ sections of ``zuul.conf`` are used by the finger gateway:
|
|||
user during startup. It is recommended to set this option to an
|
||||
unprivileged user.
|
||||
|
||||
.. attr:: zone
|
||||
|
||||
The zone where the finger gateway is located. This is only needed for
|
||||
live log streaming if the zuul deployment is spread over multiple
|
||||
zones without the ability to directly connect to all executors from
|
||||
zuul-web. See :attr:`executor.zone` for further information.
|
||||
|
||||
Operation
|
||||
~~~~~~~~~
|
||||
|
||||
|
|
|
@ -0,0 +1,7 @@
|
|||
---
|
||||
features:
|
||||
- |
|
||||
Zuul now can route live log streams via finger gateways to make it possible
|
||||
to distribute executors over multiple datacenters without the possibility
|
||||
to directly contact every executor from within zuul-web. This is typical
|
||||
the case in an k8s based deployment.
|
|
@ -1,7 +1,7 @@
|
|||
# NOTE(Shrews): Do not run any tasks that will need zuul_console to stream
|
||||
# output because that will not work. Since we just need any output in our
|
||||
# ansible log, the test coordination tasks should be sufficient.
|
||||
- hosts: all
|
||||
- hosts: localhost
|
||||
tasks:
|
||||
- debug: var=waitpath
|
||||
|
||||
|
|
|
@ -24,3 +24,7 @@
|
|||
vars:
|
||||
waitpath: '{{zuul._test.test_root}}/builds/{{zuul.build}}/test_wait'
|
||||
run: playbooks/python27.yaml
|
||||
nodeset:
|
||||
nodes:
|
||||
- name: controller
|
||||
label: ubuntu-trusty
|
||||
|
|
|
@ -11,6 +11,7 @@
|
|||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
import configparser
|
||||
|
||||
from zuul.lib.fingergw import FingerGateway
|
||||
from zuul.zk import ZooKeeperClient
|
||||
|
@ -109,14 +110,17 @@ class TestComponentRegistry(ZuulTestCase):
|
|||
raise
|
||||
|
||||
def test_fingergw_component(self):
|
||||
self.config.read_dict({
|
||||
config = configparser.ConfigParser()
|
||||
config.read_dict(self.config)
|
||||
config.read_dict({
|
||||
'fingergw': {
|
||||
'listen_address': self.host,
|
||||
'port': '0',
|
||||
'hostname': 'localhost',
|
||||
}
|
||||
})
|
||||
gateway = FingerGateway(
|
||||
self.config,
|
||||
config,
|
||||
command_socket=None,
|
||||
pid_file=None
|
||||
)
|
||||
|
@ -126,7 +130,6 @@ class TestComponentRegistry(ZuulTestCase):
|
|||
self.assertComponentState("fingergw", BaseComponent.RUNNING)
|
||||
finally:
|
||||
gateway.stop()
|
||||
gateway.wait()
|
||||
|
||||
self.assertComponentStopped("fingergw")
|
||||
|
||||
|
|
|
@ -11,7 +11,7 @@
|
|||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import configparser
|
||||
import io
|
||||
import logging
|
||||
import json
|
||||
|
@ -32,6 +32,8 @@ from tests.base import iterate_timeout, ZuulWebFixture
|
|||
|
||||
from ws4py.client import WebSocketBaseClient
|
||||
|
||||
from zuul.lib.gear_utils import getGearmanFunctions
|
||||
|
||||
|
||||
class WSClient(WebSocketBaseClient):
|
||||
def __init__(self, port, build_uuid):
|
||||
|
@ -95,17 +97,17 @@ class TestLogStreamer(tests.base.BaseTestCase):
|
|||
s.close()
|
||||
|
||||
|
||||
class TestStreaming(tests.base.AnsibleZuulTestCase):
|
||||
class TestStreamingBase(tests.base.AnsibleZuulTestCase):
|
||||
|
||||
tenant_config_file = 'config/streamer/main.yaml'
|
||||
log = logging.getLogger("zuul.test_streaming")
|
||||
|
||||
def setUp(self):
|
||||
super(TestStreaming, self).setUp()
|
||||
super().setUp()
|
||||
self.host = '::'
|
||||
self.streamer = None
|
||||
self.stop_streamer = False
|
||||
self.streaming_data = ''
|
||||
self.streaming_data = {}
|
||||
self.test_streaming_event = threading.Event()
|
||||
|
||||
def stopStreamer(self):
|
||||
|
@ -124,16 +126,78 @@ class TestStreaming(tests.base.AnsibleZuulTestCase):
|
|||
s.sendall(req.encode('utf-8'))
|
||||
self.test_streaming_event.set()
|
||||
|
||||
self.streaming_data.setdefault(None, '')
|
||||
while not self.stop_streamer:
|
||||
data = s.recv(2048)
|
||||
if not data:
|
||||
break
|
||||
self.streaming_data += data.decode('utf-8')
|
||||
self.streaming_data[None] += data.decode('utf-8')
|
||||
|
||||
s.shutdown(socket.SHUT_RDWR)
|
||||
s.close()
|
||||
self.streamer.stop()
|
||||
|
||||
def runFingerClient(self, build_uuid, gateway_address, event, name=None):
|
||||
# Wait until the gateway is started
|
||||
for x in iterate_timeout(30, "finger client to start"):
|
||||
try:
|
||||
# NOTE(Shrews): This causes the gateway to begin to handle
|
||||
# a request for which it never receives data, and thus
|
||||
# causes the getCommand() method to timeout (seen in the
|
||||
# test results, but is harmless).
|
||||
with socket.create_connection(gateway_address) as s:
|
||||
break
|
||||
except ConnectionRefusedError:
|
||||
pass
|
||||
|
||||
self.streaming_data[name] = ''
|
||||
with socket.create_connection(gateway_address) as s:
|
||||
msg = "%s\r\n" % build_uuid
|
||||
s.sendall(msg.encode('utf-8'))
|
||||
event.set() # notify we are connected and req sent
|
||||
while True:
|
||||
data = s.recv(1024)
|
||||
if not data:
|
||||
break
|
||||
self.streaming_data[name] += data.decode('utf-8')
|
||||
s.shutdown(socket.SHUT_RDWR)
|
||||
|
||||
def runFingerGateway(self, zone=None):
|
||||
self.log.info('Starting fingergw with zone %s', zone)
|
||||
config = configparser.ConfigParser()
|
||||
config.read_dict(self.config)
|
||||
config.read_dict({
|
||||
'fingergw': {
|
||||
'listen_address': self.host,
|
||||
'port': '0',
|
||||
'hostname': 'localhost',
|
||||
}
|
||||
})
|
||||
if zone:
|
||||
config.set('fingergw', 'zone', zone)
|
||||
|
||||
gateway = FingerGateway(
|
||||
config,
|
||||
command_socket=None,
|
||||
pid_file=None
|
||||
)
|
||||
gateway.history = []
|
||||
gateway.start()
|
||||
self.addCleanup(gateway.stop)
|
||||
|
||||
if zone:
|
||||
for _ in iterate_timeout(20, 'fingergw is registered'):
|
||||
functions = getGearmanFunctions(gateway.gearworker.gearman)
|
||||
jobname = 'fingergw:info:%s' % zone
|
||||
if jobname in functions:
|
||||
break
|
||||
|
||||
gateway_port = gateway.server.socket.getsockname()[1]
|
||||
return gateway, (self.host, gateway_port)
|
||||
|
||||
|
||||
class TestStreaming(TestStreamingBase):
|
||||
|
||||
def test_streaming(self):
|
||||
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
|
||||
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
|
||||
|
@ -195,14 +259,14 @@ class TestStreaming(tests.base.AnsibleZuulTestCase):
|
|||
logfile.close()
|
||||
|
||||
self.log.debug("\n\nFile contents: %s\n\n", file_contents)
|
||||
self.log.debug("\n\nStreamed: %s\n\n", self.streaming_data)
|
||||
self.assertEqual(file_contents, self.streaming_data)
|
||||
self.log.debug("\n\nStreamed: %s\n\n", self.streaming_data[None])
|
||||
self.assertEqual(file_contents, self.streaming_data[None])
|
||||
|
||||
# Check that we logged a multiline debug message
|
||||
pattern = (r'^\d\d\d\d-\d\d-\d\d \d\d:\d\d\:\d\d\.\d\d\d\d\d\d \| '
|
||||
r'Debug Test Token String$')
|
||||
r = re.compile(pattern, re.MULTILINE)
|
||||
match = r.search(self.streaming_data)
|
||||
match = r.search(self.streaming_data[None])
|
||||
self.assertNotEqual(match, None)
|
||||
|
||||
def runWSClient(self, port, build_uuid):
|
||||
|
@ -210,30 +274,6 @@ class TestStreaming(tests.base.AnsibleZuulTestCase):
|
|||
client.event.wait()
|
||||
return client
|
||||
|
||||
def runFingerClient(self, build_uuid, gateway_address, event):
|
||||
# Wait until the gateway is started
|
||||
for x in iterate_timeout(30, "finger client to start"):
|
||||
try:
|
||||
# NOTE(Shrews): This causes the gateway to begin to handle
|
||||
# a request for which it never receives data, and thus
|
||||
# causes the getCommand() method to timeout (seen in the
|
||||
# test results, but is harmless).
|
||||
with socket.create_connection(gateway_address) as s:
|
||||
break
|
||||
except ConnectionRefusedError:
|
||||
pass
|
||||
|
||||
with socket.create_connection(gateway_address) as s:
|
||||
msg = "%s\r\n" % build_uuid
|
||||
s.sendall(msg.encode('utf-8'))
|
||||
event.set() # notify we are connected and req sent
|
||||
while True:
|
||||
data = s.recv(1024)
|
||||
if not data:
|
||||
break
|
||||
self.streaming_data += data.decode('utf-8')
|
||||
s.shutdown(socket.SHUT_RDWR)
|
||||
|
||||
def test_decode_boundaries(self):
|
||||
'''
|
||||
Test multi-byte characters crossing read buffer boundaries.
|
||||
|
@ -522,23 +562,7 @@ class TestStreaming(tests.base.AnsibleZuulTestCase):
|
|||
self.addCleanup(logfile.close)
|
||||
|
||||
# Start the finger gateway daemon
|
||||
self.config.read_dict({
|
||||
'fingergw': {
|
||||
'listen_address': self.host,
|
||||
'port': '0',
|
||||
}
|
||||
})
|
||||
|
||||
gateway = FingerGateway(
|
||||
self.config,
|
||||
command_socket=None,
|
||||
pid_file=None
|
||||
)
|
||||
gateway.start()
|
||||
self.addCleanup(gateway.stop)
|
||||
|
||||
gateway_port = gateway.server.socket.getsockname()[1]
|
||||
gateway_address = (self.host, gateway_port)
|
||||
_, gateway_address = self.runFingerGateway()
|
||||
|
||||
# Start a thread with the finger client
|
||||
finger_client_event = threading.Event()
|
||||
|
@ -563,5 +587,188 @@ class TestStreaming(tests.base.AnsibleZuulTestCase):
|
|||
file_contents = logfile.read()
|
||||
logfile.close()
|
||||
self.log.debug("\n\nFile contents: %s\n\n", file_contents)
|
||||
self.log.debug("\n\nStreamed: %s\n\n", self.streaming_data)
|
||||
self.assertEqual(file_contents, self.streaming_data)
|
||||
self.log.debug("\n\nStreamed: %s\n\n", self.streaming_data[None])
|
||||
self.assertEqual(file_contents, self.streaming_data[None])
|
||||
|
||||
|
||||
class CountingFingerRequestHandler(zuul.lib.fingergw.RequestHandler):
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
# if not hasattr(self.fingergw, 'history'):
|
||||
# self.fingergw.history = []
|
||||
|
||||
def _fingerClient(self, server, port, build_uuid):
|
||||
self.fingergw.history.append(build_uuid)
|
||||
super()._fingerClient(server, port, build_uuid)
|
||||
|
||||
|
||||
class TestStreamingZones(TestStreamingBase):
|
||||
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
self.fake_nodepool.attributes = {'executor-zone': 'eu-central'}
|
||||
zuul.lib.fingergw.FingerGateway.handler_class = \
|
||||
CountingFingerRequestHandler
|
||||
|
||||
def setup_config(self, config_file: str):
|
||||
config = super().setup_config(config_file)
|
||||
config.set('executor', 'zone', 'eu-central')
|
||||
return config
|
||||
|
||||
def _run_finger_client(self, build, address, name):
|
||||
# Start a thread with the finger client
|
||||
finger_client_event = threading.Event()
|
||||
self.finger_client_results = ''
|
||||
finger_client_thread = threading.Thread(
|
||||
target=self.runFingerClient,
|
||||
args=(build.uuid, address, finger_client_event),
|
||||
kwargs={'name': name}
|
||||
)
|
||||
finger_client_thread.start()
|
||||
finger_client_event.wait()
|
||||
return finger_client_thread
|
||||
|
||||
def test_finger_gateway(self):
|
||||
# Start the finger streamer daemon
|
||||
streamer = zuul.lib.log_streamer.LogStreamer(
|
||||
self.host, 0, self.executor_server.jobdir_root)
|
||||
self.addCleanup(streamer.stop)
|
||||
finger_port = streamer.server.socket.getsockname()[1]
|
||||
|
||||
# Need to set the streaming port before submitting the job
|
||||
self.executor_server.log_streaming_port = finger_port
|
||||
|
||||
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
|
||||
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
|
||||
|
||||
# We don't have any real synchronization for the ansible jobs, so
|
||||
# just wait until we get our running build.
|
||||
for x in iterate_timeout(30, "build"):
|
||||
if len(self.builds):
|
||||
break
|
||||
build = self.builds[0]
|
||||
self.assertEqual(build.name, 'python27')
|
||||
|
||||
build_dir = os.path.join(self.executor_server.jobdir_root, build.uuid)
|
||||
for x in iterate_timeout(30, "build dir"):
|
||||
if os.path.exists(build_dir):
|
||||
break
|
||||
|
||||
# Need to wait to make sure that jobdir gets set
|
||||
for x in iterate_timeout(30, "jobdir"):
|
||||
if build.jobdir is not None:
|
||||
break
|
||||
|
||||
# Wait for the job to begin running and create the ansible log file.
|
||||
# The job waits to complete until the flag file exists, so we can
|
||||
# safely access the log here. We only open it (to force a file handle
|
||||
# to be kept open for it after the job finishes) but wait to read the
|
||||
# contents until the job is done.
|
||||
ansible_log = os.path.join(build.jobdir.log_root, 'job-output.txt')
|
||||
for x in iterate_timeout(30, "ansible log"):
|
||||
if os.path.exists(ansible_log):
|
||||
break
|
||||
logfile = open(ansible_log, 'r')
|
||||
self.addCleanup(logfile.close)
|
||||
|
||||
def wait_for_stream(name):
|
||||
for x in iterate_timeout(30, "incoming streaming data"):
|
||||
if len(self.streaming_data.get(name, '')) > 0:
|
||||
break
|
||||
|
||||
# Start the finger gateway daemons
|
||||
try:
|
||||
gateway_unzoned, gateway_unzoned_address = self.runFingerGateway()
|
||||
gateway_us_west, gateway_us_west_address = self.runFingerGateway(
|
||||
zone='us-west')
|
||||
except Exception:
|
||||
self.log.exception("Failed to run finger gateway")
|
||||
raise
|
||||
|
||||
# This finger client runs against a finger gateway in a different zone
|
||||
# while there is no gateway in the worker zone yet. This should work.
|
||||
finger_client_us_west_alone = self._run_finger_client(
|
||||
build, gateway_us_west_address, name='us-west-alone')
|
||||
# The stream must go only via gateway_us_west
|
||||
wait_for_stream('us-west-alone')
|
||||
self.assertEqual(0, len(gateway_unzoned.history))
|
||||
self.assertEqual(1, len(gateway_us_west.history))
|
||||
gateway_unzoned.history.clear()
|
||||
gateway_us_west.history.clear()
|
||||
|
||||
# This finger client runs against an unzoned finger gateway
|
||||
finger_client_unzoned = self._run_finger_client(
|
||||
build, gateway_unzoned_address, name='unzoned')
|
||||
wait_for_stream('unzoned')
|
||||
self.assertEqual(1, len(gateway_unzoned.history))
|
||||
self.assertEqual(0, len(gateway_us_west.history))
|
||||
gateway_unzoned.history.clear()
|
||||
gateway_us_west.history.clear()
|
||||
|
||||
# Now start a finger gateway in the target zone.
|
||||
gateway_eu_central, gateway_eu_central_address = self.runFingerGateway(
|
||||
zone='eu-central')
|
||||
|
||||
# This finger client runs against a finger gateway in a different zone
|
||||
# while there is a gateway in the worker zone. This should route via
|
||||
# the gateway in the worker zone.
|
||||
finger_client_us_west = self._run_finger_client(
|
||||
build, gateway_us_west_address, name='us-west')
|
||||
# The stream must go only via gateway_us_west
|
||||
wait_for_stream('us-west')
|
||||
self.assertEqual(0, len(gateway_unzoned.history))
|
||||
self.assertEqual(1, len(gateway_eu_central.history))
|
||||
self.assertEqual(1, len(gateway_us_west.history))
|
||||
gateway_unzoned.history.clear()
|
||||
gateway_eu_central.history.clear()
|
||||
gateway_us_west.history.clear()
|
||||
|
||||
# This finger client runs against an unzoned finger gateway while there
|
||||
# is a target finger client. As it is unzoned it should not route via
|
||||
# The finger gateway in eu-central.
|
||||
finger_client_unzoned2 = self._run_finger_client(
|
||||
build, gateway_unzoned_address, name='unzoned2')
|
||||
wait_for_stream('unzoned2')
|
||||
self.assertEqual(1, len(gateway_unzoned.history))
|
||||
self.assertEqual(0, len(gateway_eu_central.history))
|
||||
self.assertEqual(0, len(gateway_us_west.history))
|
||||
gateway_unzoned.history.clear()
|
||||
gateway_eu_central.history.clear()
|
||||
gateway_us_west.history.clear()
|
||||
|
||||
# This finger client runs against the target finger gateway.
|
||||
finger_client_eu_central = self._run_finger_client(
|
||||
build, gateway_eu_central_address, name='eu-central')
|
||||
wait_for_stream('eu-central')
|
||||
self.assertEqual(0, len(gateway_unzoned.history))
|
||||
self.assertEqual(1, len(gateway_eu_central.history))
|
||||
self.assertEqual(0, len(gateway_us_west.history))
|
||||
gateway_unzoned.history.clear()
|
||||
gateway_eu_central.history.clear()
|
||||
gateway_us_west.history.clear()
|
||||
|
||||
# Allow the job to complete
|
||||
flag_file = os.path.join(build_dir, 'test_wait')
|
||||
open(flag_file, 'w').close()
|
||||
|
||||
# Wait for the finger client to complete, which it should when
|
||||
# it's received the full log.
|
||||
finger_client_us_west_alone.join()
|
||||
finger_client_us_west.join()
|
||||
finger_client_eu_central.join()
|
||||
finger_client_unzoned.join()
|
||||
finger_client_unzoned2.join()
|
||||
|
||||
self.waitUntilSettled()
|
||||
|
||||
file_contents = logfile.read()
|
||||
logfile.close()
|
||||
self.log.debug("\n\nFile contents: %s\n\n", file_contents)
|
||||
self.log.debug("\n\nStreamed: %s\n\n",
|
||||
self.streaming_data['us-west-alone'])
|
||||
self.assertEqual(file_contents, self.streaming_data['us-west-alone'])
|
||||
self.assertEqual(file_contents, self.streaming_data['us-west'])
|
||||
self.assertEqual(file_contents, self.streaming_data['unzoned'])
|
||||
self.assertEqual(file_contents, self.streaming_data['unzoned2'])
|
||||
self.assertEqual(file_contents, self.streaming_data['eu-central'])
|
||||
|
|
|
@ -1144,7 +1144,7 @@ class AnsibleJob(object):
|
|||
)
|
||||
|
||||
def _base_job_data(self):
|
||||
return {
|
||||
data = {
|
||||
# TODO(mordred) worker_name is needed as a unique name for the
|
||||
# client to use for cancelling jobs on an executor. It's
|
||||
# defaulting to the hostname for now, but in the future we
|
||||
|
@ -1154,6 +1154,9 @@ class AnsibleJob(object):
|
|||
'worker_hostname': self.executor_server.hostname,
|
||||
'worker_log_port': self.executor_server.log_streaming_port,
|
||||
}
|
||||
if self.executor_server.zone:
|
||||
data['worker_zone'] = self.executor_server.zone
|
||||
return data
|
||||
|
||||
def _send_aborted(self):
|
||||
result = dict(result='ABORTED')
|
||||
|
|
|
@ -13,20 +13,26 @@
|
|||
# under the License.
|
||||
|
||||
import functools
|
||||
import json
|
||||
import logging
|
||||
import socket
|
||||
import threading
|
||||
import time
|
||||
from configparser import ConfigParser
|
||||
from typing import Optional
|
||||
|
||||
import gear
|
||||
|
||||
import zuul.rpcclient
|
||||
from zuul.lib import streamer_utils
|
||||
from zuul.lib.commandsocket import CommandSocket
|
||||
from zuul.lib.config import get_default
|
||||
from zuul.lib.gear_utils import getGearmanFunctions
|
||||
from zuul.lib.gearworker import ZuulGearWorker
|
||||
from zuul.rpcclient import RPCFailure
|
||||
from zuul.zk import ZooKeeperClient
|
||||
from zuul.zk.components import FingerGatewayComponent
|
||||
|
||||
|
||||
COMMANDS = ['stop']
|
||||
|
||||
|
||||
|
@ -38,7 +44,8 @@ class RequestHandler(streamer_utils.BaseFingerRequestHandler):
|
|||
log = logging.getLogger("zuul.fingergw")
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
self.rpc = kwargs.pop('rpc')
|
||||
self.fingergw = kwargs.pop('fingergw')
|
||||
|
||||
super(RequestHandler, self).__init__(*args, **kwargs)
|
||||
|
||||
def _fingerClient(self, server, port, build_uuid):
|
||||
|
@ -72,7 +79,8 @@ class RequestHandler(streamer_utils.BaseFingerRequestHandler):
|
|||
port = None
|
||||
try:
|
||||
build_uuid = self.getCommand()
|
||||
port_location = self.rpc.get_job_log_stream_address(build_uuid)
|
||||
port_location = self.fingergw.rpc.get_job_log_stream_address(
|
||||
build_uuid, source_zone=self.fingergw.zone)
|
||||
|
||||
if not port_location:
|
||||
msg = 'Invalid build UUID %s' % build_uuid
|
||||
|
@ -104,6 +112,9 @@ class FingerGateway(object):
|
|||
'''
|
||||
|
||||
log = logging.getLogger("zuul.fingergw")
|
||||
handler_class = RequestHandler
|
||||
|
||||
gearworker: Optional[ZuulGearWorker]
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
|
@ -137,10 +148,12 @@ class FingerGateway(object):
|
|||
self.gear_ssl_ca = gear_ssl_ca
|
||||
|
||||
host = get_default(config, 'fingergw', 'listen_address', '::')
|
||||
port = int(get_default(config, 'fingergw', 'port', 79))
|
||||
self.port = int(get_default(config, 'fingergw', 'port', 79))
|
||||
self.public_port = int(get_default(
|
||||
config, 'fingergw', 'public_port', self.port))
|
||||
user = get_default(config, 'fingergw', 'user', None)
|
||||
|
||||
self.address = (host, port)
|
||||
self.address = (host, self.port)
|
||||
self.user = user
|
||||
self.pid_file = pid_file
|
||||
|
||||
|
@ -157,14 +170,40 @@ class FingerGateway(object):
|
|||
stop=self.stop,
|
||||
)
|
||||
|
||||
self.hostname = get_default(config, 'fingergw', 'hostname',
|
||||
socket.getfqdn())
|
||||
self.zone = get_default(config, 'fingergw', 'zone')
|
||||
|
||||
if self.zone is not None:
|
||||
jobs = {
|
||||
'fingergw:info:%s' % self.zone: self.handle_info,
|
||||
}
|
||||
self.gearworker = ZuulGearWorker(
|
||||
'Finger Gateway',
|
||||
'zuul.fingergw',
|
||||
'fingergw-gearman-worker',
|
||||
config,
|
||||
jobs)
|
||||
else:
|
||||
self.gearworker = None
|
||||
|
||||
self.zk_client = ZooKeeperClient.fromConfig(config)
|
||||
self.zk_client.connect()
|
||||
self.hostname = socket.getfqdn()
|
||||
self.component_info = FingerGatewayComponent(
|
||||
self.zk_client, self.hostname
|
||||
)
|
||||
self.component_info.register()
|
||||
|
||||
def handle_info(self, job):
|
||||
self.log.debug('Got %s job: %s', job.name, job.unique)
|
||||
info = {
|
||||
'server': self.hostname,
|
||||
'port': self.public_port,
|
||||
}
|
||||
if self.zone:
|
||||
info['zone'] = self.zone
|
||||
job.sendWorkComplete(json.dumps(info))
|
||||
|
||||
def _runCommand(self):
|
||||
while self.command_running:
|
||||
try:
|
||||
|
@ -194,10 +233,14 @@ class FingerGateway(object):
|
|||
|
||||
self.server = streamer_utils.CustomThreadingTCPServer(
|
||||
self.address,
|
||||
functools.partial(RequestHandler, rpc=self.rpc),
|
||||
functools.partial(self.handler_class, fingergw=self),
|
||||
user=self.user,
|
||||
pid_file=self.pid_file)
|
||||
|
||||
# Update port that we really use if we configured a port of 0
|
||||
if self.public_port == 0:
|
||||
self.public_port = self.server.socket.getsockname()[1]
|
||||
|
||||
# Start the command processor after the server and privilege drop
|
||||
if self.command_socket_path:
|
||||
self.log.debug("Starting command processor")
|
||||
|
@ -215,10 +258,20 @@ class FingerGateway(object):
|
|||
self.server_thread.daemon = True
|
||||
self.server_thread.start()
|
||||
self.component_info.state = self.component_info.RUNNING
|
||||
|
||||
# Register this finger gateway in case we are zoned
|
||||
if self.gearworker:
|
||||
self.log.info('Starting gearworker')
|
||||
self.gearworker.start()
|
||||
|
||||
self.log.info("Finger gateway is started")
|
||||
|
||||
def stop(self):
|
||||
self.component_info.state = self.component_info.STOPPED
|
||||
|
||||
if self.gearworker:
|
||||
self.gearworker.stop()
|
||||
|
||||
if self.server:
|
||||
try:
|
||||
self.server.shutdown()
|
||||
|
@ -250,7 +303,55 @@ class FingerGateway(object):
|
|||
'''
|
||||
Wait on the gateway to shutdown.
|
||||
'''
|
||||
self.gearworker.join()
|
||||
self.server_thread.join()
|
||||
|
||||
if self.command_thread:
|
||||
self.command_thread.join()
|
||||
|
||||
|
||||
class FingerClient:
|
||||
log = logging.getLogger("zuul.FingerClient")
|
||||
|
||||
def __init__(self, server, port, ssl_key=None, ssl_cert=None, ssl_ca=None):
|
||||
self.log.debug("Connecting to gearman at %s:%s" % (server, port))
|
||||
self.gearman = gear.Client()
|
||||
self.gearman.addServer(server, port, ssl_key, ssl_cert, ssl_ca,
|
||||
keepalive=True, tcp_keepidle=60,
|
||||
tcp_keepintvl=30, tcp_keepcnt=5)
|
||||
self.log.debug("Waiting for gearman")
|
||||
self.gearman.waitForServer()
|
||||
|
||||
def submitJob(self, name, data):
|
||||
self.log.debug("Submitting job %s with data %s" % (name, data))
|
||||
job = gear.TextJob(name,
|
||||
json.dumps(data),
|
||||
unique=str(time.time()))
|
||||
self.gearman.submitJob(job, timeout=300)
|
||||
|
||||
self.log.debug("Waiting for job completion")
|
||||
while not job.complete:
|
||||
time.sleep(0.1)
|
||||
if job.exception:
|
||||
raise RPCFailure(job.exception)
|
||||
self.log.debug("Job complete, success: %s" % (not job.failure))
|
||||
return job
|
||||
|
||||
def shutdown(self):
|
||||
self.gearman.shutdown()
|
||||
|
||||
def get_fingergw_in_zone(self, zone):
|
||||
job_name = 'fingergw:info:%s' % zone
|
||||
functions = getGearmanFunctions(self.gearman)
|
||||
if job_name not in functions:
|
||||
# There is no matching fingergw
|
||||
self.log.warning('No fingergw found in zone %s', zone)
|
||||
return None
|
||||
|
||||
job = self.submitJob(job_name, {})
|
||||
if job.failure:
|
||||
self.log.warning('Failed to get fingergw info from zone %s: '
|
||||
'%s', zone, job)
|
||||
return None
|
||||
else:
|
||||
return json.loads(job.data[0])
|
||||
|
|
|
@ -2203,12 +2203,14 @@ class Worker(object):
|
|||
self.name = "Unknown"
|
||||
self.hostname = None
|
||||
self.log_port = None
|
||||
self.zone = None
|
||||
|
||||
def updateFromData(self, data):
|
||||
"""Update worker information if contained in the WORK_DATA response."""
|
||||
self.name = data.get('worker_name', self.name)
|
||||
self.hostname = data.get('worker_hostname', self.hostname)
|
||||
self.log_port = data.get('worker_log_port', self.log_port)
|
||||
self.zone = data.get('worker_zone', self.zone)
|
||||
|
||||
def __repr__(self):
|
||||
return '<Worker %s>' % self.name
|
||||
|
|
|
@ -136,8 +136,11 @@ class RPCClient(object):
|
|||
def shutdown(self):
|
||||
self.gearman.shutdown()
|
||||
|
||||
def get_job_log_stream_address(self, uuid, logfile='console.log'):
|
||||
def get_job_log_stream_address(self, uuid, logfile='console.log',
|
||||
source_zone=None):
|
||||
data = {'uuid': uuid, 'logfile': logfile}
|
||||
if source_zone:
|
||||
data['source_zone'] = source_zone
|
||||
job = self.submitJob('zuul:get_job_log_stream_address', data)
|
||||
if job.failure:
|
||||
return False
|
||||
|
|
|
@ -277,13 +277,40 @@ class RPCListener(RPCListenerBase):
|
|||
return None
|
||||
|
||||
args = json.loads(job.arguments)
|
||||
self.log.debug('Handle get_job_log_stream_address with arguments %s',
|
||||
job.arguments)
|
||||
uuid = args['uuid']
|
||||
# TODO: logfile = args['logfile']
|
||||
job_log_stream_address = {}
|
||||
build = find_build(uuid)
|
||||
if build:
|
||||
job_log_stream_address['server'] = build.worker.hostname
|
||||
job_log_stream_address['port'] = build.worker.log_port
|
||||
# If zone and worker zone are both given check if we need to route
|
||||
# via a finger gateway in that zone.
|
||||
source_zone = args.get('source_zone')
|
||||
if (source_zone and build.worker.zone and
|
||||
source_zone != build.worker.zone):
|
||||
info = self.sched.finger_client.get_fingergw_in_zone(
|
||||
build.worker.zone)
|
||||
if info:
|
||||
job_log_stream_address['server'] = info['server']
|
||||
job_log_stream_address['port'] = info['port']
|
||||
use_ssl = info.get('use_ssl')
|
||||
if use_ssl:
|
||||
job_log_stream_address['use_ssl'] = use_ssl
|
||||
self.log.debug('Source and worker zone are different, '
|
||||
'routing via %s:%s', info['server'],
|
||||
info['port'])
|
||||
else:
|
||||
self.log.warning('Source and worker zone are different '
|
||||
'but no fingergw in target zone found. '
|
||||
'Falling back to direct connection.')
|
||||
else:
|
||||
self.log.debug('Source or worker zone undefined or equal, no'
|
||||
' routing is needed.')
|
||||
|
||||
if 'server' not in job_log_stream_address:
|
||||
job_log_stream_address['server'] = build.worker.hostname
|
||||
job_log_stream_address['port'] = build.worker.log_port
|
||||
job.sendWorkComplete(json.dumps(job_log_stream_address))
|
||||
|
||||
def _is_authorized(self, tenant, claims):
|
||||
|
|
|
@ -37,6 +37,7 @@ from zuul import rpclistener
|
|||
from zuul.lib import commandsocket
|
||||
from zuul.lib.ansible import AnsibleManager
|
||||
from zuul.lib.config import get_default
|
||||
from zuul.lib.fingergw import FingerClient
|
||||
from zuul.lib.gear_utils import getGearmanFunctions
|
||||
from zuul.lib.keystorage import FileKeyStorage, ZooKeeperKeyStorage
|
||||
from zuul.lib.logutil import get_annotated_logger
|
||||
|
@ -239,6 +240,14 @@ class Scheduler(threading.Thread):
|
|||
self.nodepool = nodepool.Nodepool(
|
||||
self.zk_client, self.hostname, self.statsd, self)
|
||||
|
||||
self.gear_server = get_default(config, 'gearman', 'server')
|
||||
self.gear_port = get_default(config, 'gearman', 'port', 4730)
|
||||
self.gear_ssl_key = get_default(config, 'gearman', 'ssl_key')
|
||||
self.gear_ssl_cert = get_default(config, 'gearman', 'ssl_cert')
|
||||
self.gear_ssl_ca = get_default(config, 'gearman', 'ssl_ca')
|
||||
|
||||
self.finger_client = None
|
||||
|
||||
def start(self):
|
||||
super(Scheduler, self).start()
|
||||
self.keystore = ZooKeeperKeyStorage(
|
||||
|
@ -254,6 +263,10 @@ class Scheduler(threading.Thread):
|
|||
self.command_thread.daemon = True
|
||||
self.command_thread.start()
|
||||
|
||||
self.finger_client = FingerClient(
|
||||
self.gear_server, self.gear_port, self.gear_ssl_key,
|
||||
self.gear_ssl_cert, self.gear_ssl_ca)
|
||||
|
||||
self.rpc.start()
|
||||
self.rpc_slow.start()
|
||||
self.stats_thread.start()
|
||||
|
@ -278,6 +291,7 @@ class Scheduler(threading.Thread):
|
|||
self.rpc_slow.stop()
|
||||
self.rpc_slow.join()
|
||||
self.stop_repl()
|
||||
self.finger_client.shutdown()
|
||||
self._command_running = False
|
||||
self.command_socket.stop()
|
||||
self.command_thread.join()
|
||||
|
|
|
@ -165,7 +165,7 @@ class LogStreamHandler(WebSocket):
|
|||
key=key))
|
||||
|
||||
port_location = self.zuulweb.rpc.get_job_log_stream_address(
|
||||
request['uuid'])
|
||||
request['uuid'], source_zone=self.zuulweb.zone)
|
||||
if not port_location:
|
||||
return self.logClose(4011, "Error with Gearman")
|
||||
|
||||
|
@ -1275,6 +1275,7 @@ class ZuulWeb(object):
|
|||
self.connections = connections
|
||||
self.authenticators = authenticators
|
||||
self.stream_manager = StreamManager()
|
||||
self.zone = get_default(self.config, 'web', 'zone')
|
||||
|
||||
command_socket = get_default(
|
||||
self.config, 'web', 'command_socket',
|
||||
|
|
Loading…
Reference in New Issue