diff --git a/tests/unit/test_log_streamer.py b/tests/unit/test_log_streamer.py new file mode 100644 index 0000000000..3ea5a8e50c --- /dev/null +++ b/tests/unit/test_log_streamer.py @@ -0,0 +1,53 @@ +#!/usr/bin/env python + +# Copyright 2017 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import logging +import socket +import tempfile + +import zuul.lib.log_streamer +import tests.base + + +class TestLogStreamer(tests.base.BaseTestCase): + + log = logging.getLogger("zuul.test.cloner") + + def setUp(self): + super(TestLogStreamer, self).setUp() + self.host = '0.0.0.0' + + def startStreamer(self, port, root=None): + if not root: + root = tempfile.gettempdir() + return zuul.lib.log_streamer.LogStreamer(None, self.host, port, root) + + def test_start_stop(self): + port = 7900 + streamer = self.startStreamer(port) + self.addCleanup(streamer.stop) + + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.addCleanup(s.close) + self.assertEqual(0, s.connect_ex((self.host, port))) + s.close() + + streamer.stop() + + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.addCleanup(s.close) + self.assertNotEqual(0, s.connect_ex((self.host, port))) + s.close() diff --git a/zuul/cmd/executor.py b/zuul/cmd/executor.py index 4f5b61cc8e..1124d688bc 100755 --- a/zuul/cmd/executor.py +++ b/zuul/cmd/executor.py @@ -24,9 +24,11 @@ pid_file_module = extras.try_imports(['daemon.pidlockfile', 'daemon.pidfile']) import logging import os +import pwd import socket import sys import signal +import tempfile import zuul.cmd import zuul.executor.server @@ -37,6 +39,12 @@ import zuul.executor.server # Similar situation with gear and statsd. +# TODO(Shrews): Get this from the config file +USER = 'zuul' + +FINGER_PORT = 79 + + class Executor(zuul.cmd.ZuulApp): def parse_arguments(self): @@ -72,21 +80,58 @@ class Executor(zuul.cmd.ZuulApp): self.executor.stop() self.executor.join() + def start_log_streamer(self): + pipe_read, pipe_write = os.pipe() + child_pid = os.fork() + if child_pid == 0: + os.close(pipe_write) + import zuul.lib.log_streamer + + self.log.info("Starting log streamer") + streamer = zuul.lib.log_streamer.LogStreamer( + USER, '0.0.0.0', FINGER_PORT, self.jobdir_root) + + # Keep running until the parent dies: + pipe_read = os.fdopen(pipe_read) + pipe_read.read() + self.log.info("Stopping log streamer") + streamer.stop() + os._exit(0) + else: + os.close(pipe_read) + self.log_streamer_pid = child_pid + + def change_privs(self): + ''' + Drop our privileges to the zuul user. + ''' + if os.getuid() != 0: + return + pw = pwd.getpwnam(USER) + os.setgroups([]) + os.setgid(pw.pw_gid) + os.setuid(pw.pw_uid) + os.umask(0o022) + def main(self, daemon=True): # See comment at top of file about zuul imports - self.setup_logging('executor', 'log_config') + self.jobroot_dir = None + if self.config.has_option('zuul', 'jobroot_dir'): + self.jobroot_dir = os.path.expanduser( + self.config.get('zuul', 'jobroot_dir')) + else: + self.jobdir_root = tempfile.gettempdir() + self.setup_logging('executor', 'log_config') self.log = logging.getLogger("zuul.Executor") - jobroot_dir = None - if self.config.has_option('zuul', 'jobroot_dir'): - jobroot_dir = os.path.expanduser( - self.config.get('zuul', 'jobroot_dir')) + self.start_log_streamer() + self.change_privs() ExecutorServer = zuul.executor.server.ExecutorServer self.executor = ExecutorServer(self.config, self.connections, - jobdir_root=jobroot_dir, + jobdir_root=self.jobroot_dir, keep_jobdir=self.args.keep_jobdir) self.executor.start() diff --git a/zuul/lib/log_streamer.py b/zuul/lib/log_streamer.py new file mode 100644 index 0000000000..9764237f5d --- /dev/null +++ b/zuul/lib/log_streamer.py @@ -0,0 +1,196 @@ +#!/usr/bin/env python + +# Copyright (c) 2016 IBM Corp. +# Copyright 2017 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import os +import os.path +import pwd +import re +import select +import socket +import threading +import time + +try: + import SocketServer as ss # python 2.x +except ImportError: + import socketserver as ss # python 3 + + +class Log(object): + + def __init__(self, path): + self.path = path + self.file = open(path) + self.stat = os.stat(path) + self.size = self.stat.st_size + + +class RequestHandler(ss.BaseRequestHandler): + ''' + Class to handle a single log streaming request. + + The log streaming code was blatantly stolen from zuul_console.py. Only + the (class/method/attribute) names were changed to protect the innocent. + ''' + + def handle(self): + build_uuid = self.request.recv(1024) + build_uuid = build_uuid.rstrip() + + # validate build ID + if not re.match("[0-9A-Fa-f]+$", build_uuid): + self.request.sendall('Build ID %s is not valid' % build_uuid) + return + + job_dir = os.path.join(self.server.jobdir_root, build_uuid) + if not os.path.exists(job_dir): + self.request.sendall('Build ID %s not found' % build_uuid) + return + + # check if log file exists + log_file = os.path.join(job_dir, 'ansible', 'ansible_log.txt') + if not os.path.exists(log_file): + self.request.sendall('Log not found for build ID %s' % build_uuid) + return + + self.stream_log(log_file) + + def stream_log(self, log_file): + log = None + while True: + if log is not None: + try: + log.file.close() + except: + pass + while True: + log = self.chunk_log(log_file) + if log: + break + time.sleep(0.5) + while True: + if self.follow_log(log): + break + else: + return + + def chunk_log(self, log_file): + try: + log = Log(log_file) + except Exception: + return + while True: + chunk = log.file.read(4096) + if not chunk: + break + self.request.send(chunk) + return log + + def follow_log(self, log): + while True: + # As long as we have unread data, keep reading/sending + while True: + chunk = log.file.read(4096) + if chunk: + self.request.send(chunk) + else: + break + + # At this point, we are waiting for more data to be written + time.sleep(0.5) + + # Check to see if the remote end has sent any data, if so, + # discard + r, w, e = select.select([self.request], [], [self.request], 0) + if self.request in e: + return False + if self.request in r: + ret = self.request.recv(1024) + # Discard anything read, if input is eof, it has + # disconnected. + if not ret: + return False + + # See if the file has been truncated + try: + st = os.stat(log.path) + if (st.st_ino != log.stat.st_ino or + st.st_size < log.size): + return True + except Exception: + return True + log.size = st.st_size + + +class CustomForkingTCPServer(ss.ForkingTCPServer): + ''' + Custom version that allows us to drop privileges after port binding. + ''' + def __init__(self, *args, **kwargs): + self.user = kwargs.pop('user') + self.jobdir_root = kwargs.pop('jobdir_root') + # For some reason, setting custom attributes does not work if we + # call the base class __init__ first. Wha?? + ss.ForkingTCPServer.__init__(self, *args, **kwargs) + + def change_privs(self): + ''' + Drop our privileges to the zuul user. + ''' + if os.getuid() != 0: + return + pw = pwd.getpwnam(self.user) + os.setgroups([]) + os.setgid(pw.pw_gid) + os.setuid(pw.pw_uid) + os.umask(0o022) + + def server_bind(self): + self.allow_reuse_address = True + ss.ForkingTCPServer.server_bind(self) + if self.user: + self.change_privs() + + def server_close(self): + ''' + Overridden from base class to shutdown the socket immediately. + ''' + self.socket.shutdown(socket.SHUT_RD) + self.socket.close() + + +class LogStreamer(object): + ''' + Class implementing log streaming over the finger daemon port. + ''' + + def __init__(self, user, host, port, jobdir_root): + self.server = CustomForkingTCPServer((host, port), + RequestHandler, + user=user, + jobdir_root=jobdir_root) + + # We start the actual serving within a thread so we can return to + # the owner. + self.thd = threading.Thread(target=self.server.serve_forever) + self.thd.daemon = True + self.thd.start() + + def stop(self): + if self.thd.isAlive(): + self.server.shutdown() + self.server.server_close()