diff --git a/doc/source/admin/components.rst b/doc/source/admin/components.rst index 86b01efc37..68dbf926b6 100644 --- a/doc/source/admin/components.rst +++ b/doc/source/admin/components.rst @@ -287,7 +287,7 @@ The following section of ``zuul.conf`` is used by the merger: .. attr:: merger - ,, attr:: command_socket + .. attr:: command_socket :default: /var/lib/zuul/merger.socket Path to command socket file for the merger process. @@ -627,3 +627,65 @@ Operation To start the web server, run ``zuul-web``. To stop it, kill the PID which was saved in the pidfile specified in the configuration. + +Finger Gateway +-------------- + +The Zuul finger gateway connects to the standard finger port (79) and listens +for finger requests specifying a build UUID for which it should stream log +results. The gateway will determine which executor is currently running that +build and query that executor for the log stream. + +This is intended to be used with the standard finger command line client. +For example:: + + finger UUID@zuul.example.com + +The above would stream the logs for the build identified by `UUID`. + +Configuration +~~~~~~~~~~~~~ + +In addition to the common configuration sections, the following +sections of ``zuul.conf`` are used by the finger gateway: + +.. attr:: fingergw + + .. attr:: command_socket + :default: /var/lib/zuul/fingergw.socket + + Path to command socket file for the executor process. + + .. attr:: listen_address + :default: all addresses + + IP address or domain name on which to listen. + + .. attr:: log_config + + Path to log config file for the finger gateway process. + + .. attr:: pidfile + :default: /var/run/zuul-fingergw/zuul-fingergw.pid + + Path to PID lock file for the finger gateway process. + + .. attr:: port + :default: 79 + + Port to use for the finger gateway. Note that since command line + finger clients cannot usually specify the port, leaving this set to + the default value is highly recommended. + + .. attr:: user + :default: zuul + + User ID for the zuul-fingergw process. In normal operation as a + daemon, the finger gateway should be started as the ``root`` user, but + it will drop privileges to this user during startup. + +Operation +~~~~~~~~~ + +To start the finger gateway, run ``zuul-fingergw``. To stop it, kill the +PID which was saved in the pidfile specified in the configuration. diff --git a/setup.cfg b/setup.cfg index 63ff562ebe..dea31582fa 100644 --- a/setup.cfg +++ b/setup.cfg @@ -28,6 +28,7 @@ console_scripts = zuul-bwrap = zuul.driver.bubblewrap:main zuul-web = zuul.cmd.web:main zuul-migrate = zuul.cmd.migrate:main + zuul-fingergw = zuul.cmd.fingergw:main [build_sphinx] source-dir = doc/source diff --git a/tests/base.py b/tests/base.py index ea01d20a1c..69d9f55227 100755 --- a/tests/base.py +++ b/tests/base.py @@ -2421,7 +2421,7 @@ class ZuulTestCase(BaseTestCase): 'pydevd.CommandThread', 'pydevd.Reader', 'pydevd.Writer', - 'FingerStreamer', + 'socketserver_Thread', ] threads = [t for t in threading.enumerate() if t.name not in whitelist] diff --git a/tests/unit/test_log_streamer.py b/tests/unit/test_streaming.py similarity index 69% rename from tests/unit/test_log_streamer.py rename to tests/unit/test_streaming.py index 27368e33ab..4bb541a687 100644 --- a/tests/unit/test_log_streamer.py +++ b/tests/unit/test_streaming.py @@ -28,6 +28,7 @@ import time import zuul.web import zuul.lib.log_streamer +import zuul.lib.fingergw import tests.base @@ -60,7 +61,7 @@ class TestLogStreamer(tests.base.BaseTestCase): class TestStreaming(tests.base.AnsibleZuulTestCase): tenant_config_file = 'config/streamer/main.yaml' - log = logging.getLogger("zuul.test.test_log_streamer.TestStreaming") + log = logging.getLogger("zuul.test_streaming") def setUp(self): super(TestStreaming, self).setUp() @@ -181,9 +182,38 @@ class TestStreaming(tests.base.AnsibleZuulTestCase): loop.run_until_complete(client(loop, build_uuid, event)) loop.close() + def runFingerClient(self, build_uuid, gateway_address, event): + # Wait until the gateway is started + while True: + try: + # NOTE(Shrews): This causes the gateway to begin to handle + # a request for which it never receives data, and thus + # causes the getCommand() method to timeout (seen in the + # test results, but is harmless). + with socket.create_connection(gateway_address) as s: + break + except ConnectionRefusedError: + time.sleep(0.1) + + with socket.create_connection(gateway_address) as s: + msg = "%s\n" % build_uuid + s.sendall(msg.encode('utf-8')) + event.set() # notify we are connected and req sent + while True: + data = s.recv(1024) + if not data: + break + self.streaming_data += data.decode('utf-8') + s.shutdown(socket.SHUT_RDWR) + def test_websocket_streaming(self): + # Start the finger streamer daemon + streamer = zuul.lib.log_streamer.LogStreamer( + None, self.host, 0, self.executor_server.jobdir_root) + self.addCleanup(streamer.stop) + # Need to set the streaming port before submitting the job - finger_port = 7902 + finger_port = streamer.server.socket.getsockname()[1] self.executor_server.log_streaming_port = finger_port A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A') @@ -216,11 +246,6 @@ class TestStreaming(tests.base.AnsibleZuulTestCase): logfile = open(ansible_log, 'r') self.addCleanup(logfile.close) - # Start the finger streamer daemon - streamer = zuul.lib.log_streamer.LogStreamer( - None, self.host, finger_port, self.executor_server.jobdir_root) - self.addCleanup(streamer.stop) - # Start the web server web_server = zuul.web.ZuulWeb( listen_address='::', listen_port=9000, @@ -265,3 +290,83 @@ class TestStreaming(tests.base.AnsibleZuulTestCase): self.log.debug("\n\nFile contents: %s\n\n", file_contents) self.log.debug("\n\nStreamed: %s\n\n", self.ws_client_results) self.assertEqual(file_contents, self.ws_client_results) + + def test_finger_gateway(self): + # Start the finger streamer daemon + streamer = zuul.lib.log_streamer.LogStreamer( + None, self.host, 0, self.executor_server.jobdir_root) + self.addCleanup(streamer.stop) + finger_port = streamer.server.socket.getsockname()[1] + + # Need to set the streaming port before submitting the job + self.executor_server.log_streaming_port = finger_port + + A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A') + self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1)) + + # We don't have any real synchronization for the ansible jobs, so + # just wait until we get our running build. + while not len(self.builds): + time.sleep(0.1) + build = self.builds[0] + self.assertEqual(build.name, 'python27') + + build_dir = os.path.join(self.executor_server.jobdir_root, build.uuid) + while not os.path.exists(build_dir): + time.sleep(0.1) + + # Need to wait to make sure that jobdir gets set + while build.jobdir is None: + time.sleep(0.1) + build = self.builds[0] + + # Wait for the job to begin running and create the ansible log file. + # The job waits to complete until the flag file exists, so we can + # safely access the log here. We only open it (to force a file handle + # to be kept open for it after the job finishes) but wait to read the + # contents until the job is done. + ansible_log = os.path.join(build.jobdir.log_root, 'job-output.txt') + while not os.path.exists(ansible_log): + time.sleep(0.1) + logfile = open(ansible_log, 'r') + self.addCleanup(logfile.close) + + # Start the finger gateway daemon + gateway = zuul.lib.fingergw.FingerGateway( + ('127.0.0.1', self.gearman_server.port, None, None, None), + (self.host, 0), + user=None, + command_socket=None, + pid_file=None + ) + gateway.start() + self.addCleanup(gateway.stop) + + gateway_port = gateway.server.socket.getsockname()[1] + gateway_address = (self.host, gateway_port) + + # Start a thread with the finger client + finger_client_event = threading.Event() + self.finger_client_results = '' + finger_client_thread = threading.Thread( + target=self.runFingerClient, + args=(build.uuid, gateway_address, finger_client_event) + ) + finger_client_thread.start() + finger_client_event.wait() + + # Allow the job to complete + flag_file = os.path.join(build_dir, 'test_wait') + open(flag_file, 'w').close() + + # Wait for the finger client to complete, which it should when + # it's received the full log. + finger_client_thread.join() + + self.waitUntilSettled() + + file_contents = logfile.read() + logfile.close() + self.log.debug("\n\nFile contents: %s\n\n", file_contents) + self.log.debug("\n\nStreamed: %s\n\n", self.streaming_data) + self.assertEqual(file_contents, self.streaming_data) diff --git a/zuul/cmd/fingergw.py b/zuul/cmd/fingergw.py new file mode 100644 index 0000000000..920eed8f2f --- /dev/null +++ b/zuul/cmd/fingergw.py @@ -0,0 +1,109 @@ +#!/usr/bin/env python +# Copyright 2017 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import logging +import signal +import sys + +import zuul.cmd +import zuul.lib.fingergw + +from zuul.lib.config import get_default + + +class FingerGatewayApp(zuul.cmd.ZuulDaemonApp): + ''' + Class for the daemon that will distribute any finger requests to the + appropriate Zuul executor handling the specified build UUID. + ''' + app_name = 'fingergw' + app_description = 'The Zuul finger gateway.' + + def __init__(self): + super(FingerGatewayApp, self).__init__() + self.gateway = None + + def createParser(self): + parser = super(FingerGatewayApp, self).createParser() + parser.add_argument('command', + choices=zuul.lib.fingergw.COMMANDS, + nargs='?') + return parser + + def parseArguments(self, args=None): + super(FingerGatewayApp, self).parseArguments() + if self.args.command: + self.args.nodaemon = True + + def run(self): + ''' + Main entry point for the FingerGatewayApp. + + Called by the main() method of the parent class. + ''' + if self.args.command in zuul.lib.fingergw.COMMANDS: + self.send_command(self.args.command) + sys.exit(0) + + self.setup_logging('fingergw', 'log_config') + self.log = logging.getLogger('zuul.fingergw') + + # Get values from configuration file + host = get_default(self.config, 'fingergw', 'listen_address', '::') + port = int(get_default(self.config, 'fingergw', 'port', 79)) + user = get_default(self.config, 'fingergw', 'user', 'zuul') + cmdsock = get_default( + self.config, 'fingergw', 'command_socket', + '/var/lib/zuul/%s.socket' % self.app_name) + gear_server = get_default(self.config, 'gearman', 'server') + gear_port = get_default(self.config, 'gearman', 'port', 4730) + ssl_key = get_default(self.config, 'gearman', 'ssl_key') + ssl_cert = get_default(self.config, 'gearman', 'ssl_cert') + ssl_ca = get_default(self.config, 'gearman', 'ssl_ca') + + self.gateway = zuul.lib.fingergw.FingerGateway( + (gear_server, gear_port, ssl_key, ssl_cert, ssl_ca), + (host, port), + user, + cmdsock, + self.getPidFile(), + ) + + self.log.info('Starting Zuul finger gateway app') + self.gateway.start() + + if self.args.nodaemon: + # NOTE(Shrews): When running in non-daemon mode, although sending + # the 'stop' command via the command socket will shutdown the + # gateway, it's still necessary to Ctrl+C to stop the app. + while True: + try: + signal.pause() + except KeyboardInterrupt: + print("Ctrl + C: asking gateway to exit nicely...\n") + self.stop() + break + else: + self.gateway.wait() + + self.log.info('Stopped Zuul finger gateway app') + + def stop(self): + if self.gateway: + self.gateway.stop() + + +def main(): + FingerGatewayApp().main() diff --git a/zuul/lib/fingergw.py b/zuul/lib/fingergw.py new file mode 100644 index 0000000000..c89ed0f4a0 --- /dev/null +++ b/zuul/lib/fingergw.py @@ -0,0 +1,206 @@ +#!/usr/bin/env python +# Copyright 2017 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import functools +import logging +import socket +import threading + +import zuul.rpcclient + +from zuul.lib import commandsocket +from zuul.lib import streamer_utils + + +COMMANDS = ['stop'] + + +class RequestHandler(streamer_utils.BaseFingerRequestHandler): + ''' + Class implementing the logic for handling a single finger request. + ''' + + log = logging.getLogger("zuul.fingergw") + + def __init__(self, *args, **kwargs): + self.rpc = kwargs.pop('rpc') + super(RequestHandler, self).__init__(*args, **kwargs) + + def _fingerClient(self, server, port, build_uuid): + ''' + Open a finger connection and return all streaming results. + + :param server: The remote server. + :param port: The remote port. + :param build_uuid: The build UUID to stream. + + Both IPv4 and IPv6 are supported. + ''' + with socket.create_connection((server, port), timeout=10) as s: + msg = "%s\n" % build_uuid # Must have a trailing newline! + s.sendall(msg.encode('utf-8')) + while True: + data = s.recv(1024) + if data: + self.request.sendall(data) + else: + break + + def handle(self): + ''' + This method is called by the socketserver framework to handle an + incoming request. + ''' + try: + build_uuid = self.getCommand() + port_location = self.rpc.get_job_log_stream_address(build_uuid) + self._fingerClient( + port_location['server'], + port_location['port'], + build_uuid, + ) + except Exception: + self.log.exception('Finger request handling exception:') + msg = 'Internal streaming error' + self.request.sendall(msg.encode('utf-8')) + return + + +class FingerGateway(object): + ''' + Class implementing the finger multiplexing/gateway logic. + + For each incoming finger request, a new thread is started that will + be responsible for finding which Zuul executor is executing the + requested build (by asking Gearman), forwarding the request to that + executor, and streaming the results back to our client. + ''' + + log = logging.getLogger("zuul.fingergw") + + def __init__(self, gearman, address, user, command_socket, pid_file): + ''' + Initialize the finger gateway. + + :param tuple gearman: Gearman connection information. This should + include the server, port, SSL key, SSL cert, and SSL CA. + :param tuple address: The address and port to bind to for our gateway. + :param str user: The user to which we should drop privileges after + binding to our address. + :param str command_socket: Path to the daemon command socket. + :param str pid_file: Path to the daemon PID file. + ''' + self.gear_server = gearman[0] + self.gear_port = gearman[1] + self.gear_ssl_key = gearman[2] + self.gear_ssl_cert = gearman[3] + self.gear_ssl_ca = gearman[4] + self.address = address + self.user = user + self.pid_file = pid_file + + self.rpc = None + self.server = None + self.server_thread = None + + self.command_thread = None + self.command_running = False + self.command_socket = command_socket + + self.command_map = dict( + stop=self.stop, + ) + + def _runCommand(self): + while self.command_running: + try: + command = self.command_socket.get().decode('utf8') + if command != '_stop': + self.command_map[command]() + else: + return + except Exception: + self.log.exception("Exception while processing command") + + def _run(self): + try: + self.server.serve_forever() + except Exception: + self.log.exception('Abnormal termination:') + raise + + def start(self): + self.rpc = zuul.rpcclient.RPCClient( + self.gear_server, + self.gear_port, + self.gear_ssl_key, + self.gear_ssl_cert, + self.gear_ssl_ca) + + self.server = streamer_utils.CustomThreadingTCPServer( + self.address, + functools.partial(RequestHandler, rpc=self.rpc), + user=self.user, + pid_file=self.pid_file) + + # Start the command processor after the server and privilege drop + if self.command_socket: + self.log.debug("Starting command processor") + self.command_socket = commandsocket.CommandSocket( + self.command_socket) + self.command_socket.start() + self.command_running = True + self.command_thread = threading.Thread( + target=self._runCommand, name='command') + self.command_thread.daemon = True + self.command_thread.start() + + # The socketserver shutdown() call will hang unless the call + # to server_forever() happens in another thread. So let's do that. + self.server_thread = threading.Thread(target=self._run) + self.server_thread.daemon = True + self.server_thread.start() + self.log.info("Finger gateway is started") + + def stop(self): + if self.command_socket: + self.command_running = False + try: + self.command_socket.stop() + except Exception: + self.log.exception("Error stopping command socket:") + + if self.server: + try: + self.server.shutdown() + self.server.server_close() + self.server = None + except Exception: + self.log.exception("Error stopping TCP server:") + + if self.rpc: + try: + self.rpc.shutdown() + self.rpc = None + except Exception: + self.log.exception("Error stopping RCP client:") + + self.log.info("Finger gateway is stopped") + + def wait(self): + ''' + Wait on the gateway to shutdown. + ''' + self.server_thread.join() diff --git a/zuul/lib/log_streamer.py b/zuul/lib/log_streamer.py index 1906be7344..5c894b44c5 100644 --- a/zuul/lib/log_streamer.py +++ b/zuul/lib/log_streamer.py @@ -18,14 +18,13 @@ import logging import os import os.path -import pwd import re import select -import socket -import socketserver import threading import time +from zuul.lib import streamer_utils + class Log(object): @@ -38,7 +37,7 @@ class Log(object): self.size = self.stat.st_size -class RequestHandler(socketserver.BaseRequestHandler): +class RequestHandler(streamer_utils.BaseFingerRequestHandler): ''' Class to handle a single log streaming request. @@ -46,47 +45,13 @@ class RequestHandler(socketserver.BaseRequestHandler): the (class/method/attribute) names were changed to protect the innocent. ''' - MAX_REQUEST_LEN = 1024 - REQUEST_TIMEOUT = 10 - - # NOTE(Shrews): We only use this to log exceptions since a new process - # is used per-request (and having multiple processes write to the same - # log file constantly is bad). - log = logging.getLogger("zuul.log_streamer.RequestHandler") - - def get_command(self): - poll = select.poll() - bitmask = (select.POLLIN | select.POLLERR | - select.POLLHUP | select.POLLNVAL) - poll.register(self.request, bitmask) - buffer = b'' - ret = None - start = time.time() - while True: - elapsed = time.time() - start - timeout = max(self.REQUEST_TIMEOUT - elapsed, 0) - if not timeout: - raise Exception("Timeout while waiting for input") - for fd, event in poll.poll(timeout): - if event & select.POLLIN: - buffer += self.request.recv(self.MAX_REQUEST_LEN) - else: - raise Exception("Received error event") - if len(buffer) >= self.MAX_REQUEST_LEN: - raise Exception("Request too long") - try: - ret = buffer.decode('utf-8') - x = ret.find('\n') - if x > 0: - return ret[:x] - except UnicodeDecodeError: - pass + log = logging.getLogger("zuul.log_streamer") def handle(self): try: - build_uuid = self.get_command() + build_uuid = self.getCommand() except Exception: - self.log.exception("Failure during get_command:") + self.log.exception("Failure during getCommand:") msg = 'Internal streaming error' self.request.sendall(msg.encode("utf-8")) return @@ -182,59 +147,11 @@ class RequestHandler(socketserver.BaseRequestHandler): return False -class CustomThreadingTCPServer(socketserver.ThreadingTCPServer): - ''' - Custom version that allows us to drop privileges after port binding. - ''' - address_family = socket.AF_INET6 +class LogStreamerServer(streamer_utils.CustomThreadingTCPServer): def __init__(self, *args, **kwargs): - self.user = kwargs.pop('user') self.jobdir_root = kwargs.pop('jobdir_root') - # For some reason, setting custom attributes does not work if we - # call the base class __init__ first. Wha?? - socketserver.ThreadingTCPServer.__init__(self, *args, **kwargs) - - def change_privs(self): - ''' - Drop our privileges to the zuul user. - ''' - if os.getuid() != 0: - return - pw = pwd.getpwnam(self.user) - os.setgroups([]) - os.setgid(pw.pw_gid) - os.setuid(pw.pw_uid) - os.umask(0o022) - - def server_bind(self): - self.allow_reuse_address = True - socketserver.ThreadingTCPServer.server_bind(self) - if self.user: - self.change_privs() - - def server_close(self): - ''' - Overridden from base class to shutdown the socket immediately. - ''' - try: - self.socket.shutdown(socket.SHUT_RD) - self.socket.close() - except socket.error as e: - # If it's already closed, don't error. - if e.errno == socket.EBADF: - return - raise - - def process_request(self, request, client_address): - ''' - Overridden from the base class to name the thread. - ''' - t = threading.Thread(target=self.process_request_thread, - name='FingerStreamer', - args=(request, client_address)) - t.daemon = self.daemon_threads - t.start() + super(LogStreamerServer, self).__init__(*args, **kwargs) class LogStreamer(object): @@ -243,12 +160,12 @@ class LogStreamer(object): ''' def __init__(self, user, host, port, jobdir_root): - self.log = logging.getLogger('zuul.lib.LogStreamer') + self.log = logging.getLogger('zuul.log_streamer') self.log.debug("LogStreamer starting on port %s", port) - self.server = CustomThreadingTCPServer((host, port), - RequestHandler, - user=user, - jobdir_root=jobdir_root) + self.server = LogStreamerServer((host, port), + RequestHandler, + user=user, + jobdir_root=jobdir_root) # We start the actual serving within a thread so we can return to # the owner. diff --git a/zuul/lib/streamer_utils.py b/zuul/lib/streamer_utils.py new file mode 100644 index 0000000000..985f3c37a4 --- /dev/null +++ b/zuul/lib/streamer_utils.py @@ -0,0 +1,130 @@ +#!/usr/bin/env python +# Copyright 2017 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +''' +This file contains code common to finger log streaming functionality. +The log streamer process within each executor, the finger gateway service, +and the web interface will all make use of this module. +''' + +import os +import pwd +import select +import socket +import socketserver +import threading +import time + + +class BaseFingerRequestHandler(socketserver.BaseRequestHandler): + ''' + Base class for common methods for handling finger requests. + ''' + + MAX_REQUEST_LEN = 1024 + REQUEST_TIMEOUT = 10 + + def getCommand(self): + poll = select.poll() + bitmask = (select.POLLIN | select.POLLERR | + select.POLLHUP | select.POLLNVAL) + poll.register(self.request, bitmask) + buffer = b'' + ret = None + start = time.time() + while True: + elapsed = time.time() - start + timeout = max(self.REQUEST_TIMEOUT - elapsed, 0) + if not timeout: + raise Exception("Timeout while waiting for input") + for fd, event in poll.poll(timeout): + if event & select.POLLIN: + buffer += self.request.recv(self.MAX_REQUEST_LEN) + else: + raise Exception("Received error event") + if len(buffer) >= self.MAX_REQUEST_LEN: + raise Exception("Request too long") + try: + ret = buffer.decode('utf-8') + x = ret.find('\n') + if x > 0: + return ret[:x] + except UnicodeDecodeError: + pass + + +class CustomThreadingTCPServer(socketserver.ThreadingTCPServer): + ''' + Custom version that allows us to drop privileges after port binding. + ''' + + address_family = socket.AF_INET6 + + def __init__(self, *args, **kwargs): + self.user = kwargs.pop('user') + self.pid_file = kwargs.pop('pid_file', None) + socketserver.ThreadingTCPServer.__init__(self, *args, **kwargs) + + def change_privs(self): + ''' + Drop our privileges to another user. + ''' + if os.getuid() != 0: + return + + pw = pwd.getpwnam(self.user) + + # Change owner on our pid file so it can be removed by us after + # dropping privileges. May not exist if not a daemon. + if self.pid_file and os.path.exists(self.pid_file): + os.chown(self.pid_file, pw.pw_uid, pw.pw_gid) + + os.setgroups([]) + os.setgid(pw.pw_gid) + os.setuid(pw.pw_uid) + os.umask(0o022) + + def server_bind(self): + ''' + Overridden from the base class to allow address reuse and to drop + privileges after binding to the listening socket. + ''' + self.allow_reuse_address = True + socketserver.ThreadingTCPServer.server_bind(self) + if self.user: + self.change_privs() + + def server_close(self): + ''' + Overridden from base class to shutdown the socket immediately. + ''' + try: + self.socket.shutdown(socket.SHUT_RD) + self.socket.close() + except socket.error as e: + # If it's already closed, don't error. + if e.errno == socket.EBADF: + return + raise + + def process_request(self, request, client_address): + ''' + Overridden from the base class to name the thread. + ''' + t = threading.Thread(target=self.process_request_thread, + name='socketserver_Thread', + args=(request, client_address)) + t.daemon = self.daemon_threads + t.start() diff --git a/zuul/web/__init__.py b/zuul/web/__init__.py index e4a361205e..3c0b855898 100755 --- a/zuul/web/__init__.py +++ b/zuul/web/__init__.py @@ -42,17 +42,6 @@ class LogStreamingHandler(object): def setEventLoop(self, event_loop): self.event_loop = event_loop - def _getPortLocation(self, job_uuid): - """ - Query Gearman for the executor running the given job. - - :param str job_uuid: The job UUID we want to stream. - """ - # TODO: Fetch the entire list of uuid/file/server/ports once and - # share that, and fetch a new list on cache misses perhaps? - ret = self.rpc.get_job_log_stream_address(job_uuid) - return ret - async def _fingerClient(self, ws, server, port, job_uuid): """ Create a client to connect to the finger streamer and pull results. @@ -94,7 +83,10 @@ class LogStreamingHandler(object): # Schedule the blocking gearman work in an Executor gear_task = self.event_loop.run_in_executor( - None, self._getPortLocation, request['uuid']) + None, + self.rpc.get_job_log_stream_address, + request['uuid'], + ) try: port_location = await asyncio.wait_for(gear_task, 10)