From eb8564702c780cffc2ffac7c1c6f5cfb4465462a Mon Sep 17 00:00:00 2001 From: David Shrewsbury Date: Thu, 13 Apr 2017 14:23:04 -0400 Subject: [PATCH] Add a finger protocol log streamer This will be started along side the the executor process, similar to how the scheduler and gearman server are linked. This will require starting the process as the root user so that we can grab the finger port properly. The process listening on the finger port will drop its privileges to the designated user after grabbing the socket. The executor will also drop its privileges to the same user after starting the log streamer. Change-Id: Ib52585cafbd073ccdb7f87432888ce15c7a66f67 --- tests/unit/test_log_streamer.py | 53 +++++++++ zuul/cmd/executor.py | 57 +++++++++- zuul/lib/log_streamer.py | 196 ++++++++++++++++++++++++++++++++ 3 files changed, 300 insertions(+), 6 deletions(-) create mode 100644 tests/unit/test_log_streamer.py create mode 100644 zuul/lib/log_streamer.py diff --git a/tests/unit/test_log_streamer.py b/tests/unit/test_log_streamer.py new file mode 100644 index 0000000000..3ea5a8e50c --- /dev/null +++ b/tests/unit/test_log_streamer.py @@ -0,0 +1,53 @@ +#!/usr/bin/env python + +# Copyright 2017 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import logging +import socket +import tempfile + +import zuul.lib.log_streamer +import tests.base + + +class TestLogStreamer(tests.base.BaseTestCase): + + log = logging.getLogger("zuul.test.cloner") + + def setUp(self): + super(TestLogStreamer, self).setUp() + self.host = '0.0.0.0' + + def startStreamer(self, port, root=None): + if not root: + root = tempfile.gettempdir() + return zuul.lib.log_streamer.LogStreamer(None, self.host, port, root) + + def test_start_stop(self): + port = 7900 + streamer = self.startStreamer(port) + self.addCleanup(streamer.stop) + + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.addCleanup(s.close) + self.assertEqual(0, s.connect_ex((self.host, port))) + s.close() + + streamer.stop() + + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.addCleanup(s.close) + self.assertNotEqual(0, s.connect_ex((self.host, port))) + s.close() diff --git a/zuul/cmd/executor.py b/zuul/cmd/executor.py index 4f5b61cc8e..1124d688bc 100755 --- a/zuul/cmd/executor.py +++ b/zuul/cmd/executor.py @@ -24,9 +24,11 @@ pid_file_module = extras.try_imports(['daemon.pidlockfile', 'daemon.pidfile']) import logging import os +import pwd import socket import sys import signal +import tempfile import zuul.cmd import zuul.executor.server @@ -37,6 +39,12 @@ import zuul.executor.server # Similar situation with gear and statsd. +# TODO(Shrews): Get this from the config file +USER = 'zuul' + +FINGER_PORT = 79 + + class Executor(zuul.cmd.ZuulApp): def parse_arguments(self): @@ -72,21 +80,58 @@ class Executor(zuul.cmd.ZuulApp): self.executor.stop() self.executor.join() + def start_log_streamer(self): + pipe_read, pipe_write = os.pipe() + child_pid = os.fork() + if child_pid == 0: + os.close(pipe_write) + import zuul.lib.log_streamer + + self.log.info("Starting log streamer") + streamer = zuul.lib.log_streamer.LogStreamer( + USER, '0.0.0.0', FINGER_PORT, self.jobdir_root) + + # Keep running until the parent dies: + pipe_read = os.fdopen(pipe_read) + pipe_read.read() + self.log.info("Stopping log streamer") + streamer.stop() + os._exit(0) + else: + os.close(pipe_read) + self.log_streamer_pid = child_pid + + def change_privs(self): + ''' + Drop our privileges to the zuul user. + ''' + if os.getuid() != 0: + return + pw = pwd.getpwnam(USER) + os.setgroups([]) + os.setgid(pw.pw_gid) + os.setuid(pw.pw_uid) + os.umask(0o022) + def main(self, daemon=True): # See comment at top of file about zuul imports - self.setup_logging('executor', 'log_config') + self.jobroot_dir = None + if self.config.has_option('zuul', 'jobroot_dir'): + self.jobroot_dir = os.path.expanduser( + self.config.get('zuul', 'jobroot_dir')) + else: + self.jobdir_root = tempfile.gettempdir() + self.setup_logging('executor', 'log_config') self.log = logging.getLogger("zuul.Executor") - jobroot_dir = None - if self.config.has_option('zuul', 'jobroot_dir'): - jobroot_dir = os.path.expanduser( - self.config.get('zuul', 'jobroot_dir')) + self.start_log_streamer() + self.change_privs() ExecutorServer = zuul.executor.server.ExecutorServer self.executor = ExecutorServer(self.config, self.connections, - jobdir_root=jobroot_dir, + jobdir_root=self.jobroot_dir, keep_jobdir=self.args.keep_jobdir) self.executor.start() diff --git a/zuul/lib/log_streamer.py b/zuul/lib/log_streamer.py new file mode 100644 index 0000000000..9764237f5d --- /dev/null +++ b/zuul/lib/log_streamer.py @@ -0,0 +1,196 @@ +#!/usr/bin/env python + +# Copyright (c) 2016 IBM Corp. +# Copyright 2017 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import os +import os.path +import pwd +import re +import select +import socket +import threading +import time + +try: + import SocketServer as ss # python 2.x +except ImportError: + import socketserver as ss # python 3 + + +class Log(object): + + def __init__(self, path): + self.path = path + self.file = open(path) + self.stat = os.stat(path) + self.size = self.stat.st_size + + +class RequestHandler(ss.BaseRequestHandler): + ''' + Class to handle a single log streaming request. + + The log streaming code was blatantly stolen from zuul_console.py. Only + the (class/method/attribute) names were changed to protect the innocent. + ''' + + def handle(self): + build_uuid = self.request.recv(1024) + build_uuid = build_uuid.rstrip() + + # validate build ID + if not re.match("[0-9A-Fa-f]+$", build_uuid): + self.request.sendall('Build ID %s is not valid' % build_uuid) + return + + job_dir = os.path.join(self.server.jobdir_root, build_uuid) + if not os.path.exists(job_dir): + self.request.sendall('Build ID %s not found' % build_uuid) + return + + # check if log file exists + log_file = os.path.join(job_dir, 'ansible', 'ansible_log.txt') + if not os.path.exists(log_file): + self.request.sendall('Log not found for build ID %s' % build_uuid) + return + + self.stream_log(log_file) + + def stream_log(self, log_file): + log = None + while True: + if log is not None: + try: + log.file.close() + except: + pass + while True: + log = self.chunk_log(log_file) + if log: + break + time.sleep(0.5) + while True: + if self.follow_log(log): + break + else: + return + + def chunk_log(self, log_file): + try: + log = Log(log_file) + except Exception: + return + while True: + chunk = log.file.read(4096) + if not chunk: + break + self.request.send(chunk) + return log + + def follow_log(self, log): + while True: + # As long as we have unread data, keep reading/sending + while True: + chunk = log.file.read(4096) + if chunk: + self.request.send(chunk) + else: + break + + # At this point, we are waiting for more data to be written + time.sleep(0.5) + + # Check to see if the remote end has sent any data, if so, + # discard + r, w, e = select.select([self.request], [], [self.request], 0) + if self.request in e: + return False + if self.request in r: + ret = self.request.recv(1024) + # Discard anything read, if input is eof, it has + # disconnected. + if not ret: + return False + + # See if the file has been truncated + try: + st = os.stat(log.path) + if (st.st_ino != log.stat.st_ino or + st.st_size < log.size): + return True + except Exception: + return True + log.size = st.st_size + + +class CustomForkingTCPServer(ss.ForkingTCPServer): + ''' + Custom version that allows us to drop privileges after port binding. + ''' + def __init__(self, *args, **kwargs): + self.user = kwargs.pop('user') + self.jobdir_root = kwargs.pop('jobdir_root') + # For some reason, setting custom attributes does not work if we + # call the base class __init__ first. Wha?? + ss.ForkingTCPServer.__init__(self, *args, **kwargs) + + def change_privs(self): + ''' + Drop our privileges to the zuul user. + ''' + if os.getuid() != 0: + return + pw = pwd.getpwnam(self.user) + os.setgroups([]) + os.setgid(pw.pw_gid) + os.setuid(pw.pw_uid) + os.umask(0o022) + + def server_bind(self): + self.allow_reuse_address = True + ss.ForkingTCPServer.server_bind(self) + if self.user: + self.change_privs() + + def server_close(self): + ''' + Overridden from base class to shutdown the socket immediately. + ''' + self.socket.shutdown(socket.SHUT_RD) + self.socket.close() + + +class LogStreamer(object): + ''' + Class implementing log streaming over the finger daemon port. + ''' + + def __init__(self, user, host, port, jobdir_root): + self.server = CustomForkingTCPServer((host, port), + RequestHandler, + user=user, + jobdir_root=jobdir_root) + + # We start the actual serving within a thread so we can return to + # the owner. + self.thd = threading.Thread(target=self.server.serve_forever) + self.thd.daemon = True + self.thd.start() + + def stop(self): + if self.thd.isAlive(): + self.server.shutdown() + self.server.server_close()