Add a finger protocol log streamer

This will be started along side the the executor process, similar
to how the scheduler and gearman server are linked.

This will require starting the process as the root user so that we
can grab the finger port properly. The process listening on the
finger port will drop its privileges to the designated user after
grabbing the socket. The executor will also drop its privileges
to the same user after starting the log streamer.

Change-Id: Ib52585cafbd073ccdb7f87432888ce15c7a66f67
changes/21/456721/16
David Shrewsbury 6 years ago
parent d678429560
commit eb8564702c
  1. 53
      tests/unit/test_log_streamer.py
  2. 57
      zuul/cmd/executor.py
  3. 196
      zuul/lib/log_streamer.py

@ -0,0 +1,53 @@
#!/usr/bin/env python
# Copyright 2017 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import socket
import tempfile
import zuul.lib.log_streamer
import tests.base
class TestLogStreamer(tests.base.BaseTestCase):
log = logging.getLogger("zuul.test.cloner")
def setUp(self):
super(TestLogStreamer, self).setUp()
self.host = '0.0.0.0'
def startStreamer(self, port, root=None):
if not root:
root = tempfile.gettempdir()
return zuul.lib.log_streamer.LogStreamer(None, self.host, port, root)
def test_start_stop(self):
port = 7900
streamer = self.startStreamer(port)
self.addCleanup(streamer.stop)
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.addCleanup(s.close)
self.assertEqual(0, s.connect_ex((self.host, port)))
s.close()
streamer.stop()
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.addCleanup(s.close)
self.assertNotEqual(0, s.connect_ex((self.host, port)))
s.close()

@ -24,9 +24,11 @@ pid_file_module = extras.try_imports(['daemon.pidlockfile', 'daemon.pidfile'])
import logging
import os
import pwd
import socket
import sys
import signal
import tempfile
import zuul.cmd
import zuul.executor.server
@ -37,6 +39,12 @@ import zuul.executor.server
# Similar situation with gear and statsd.
# TODO(Shrews): Get this from the config file
USER = 'zuul'
FINGER_PORT = 79
class Executor(zuul.cmd.ZuulApp):
def parse_arguments(self):
@ -72,21 +80,58 @@ class Executor(zuul.cmd.ZuulApp):
self.executor.stop()
self.executor.join()
def start_log_streamer(self):
pipe_read, pipe_write = os.pipe()
child_pid = os.fork()
if child_pid == 0:
os.close(pipe_write)
import zuul.lib.log_streamer
self.log.info("Starting log streamer")
streamer = zuul.lib.log_streamer.LogStreamer(
USER, '0.0.0.0', FINGER_PORT, self.jobdir_root)
# Keep running until the parent dies:
pipe_read = os.fdopen(pipe_read)
pipe_read.read()
self.log.info("Stopping log streamer")
streamer.stop()
os._exit(0)
else:
os.close(pipe_read)
self.log_streamer_pid = child_pid
def change_privs(self):
'''
Drop our privileges to the zuul user.
'''
if os.getuid() != 0:
return
pw = pwd.getpwnam(USER)
os.setgroups([])
os.setgid(pw.pw_gid)
os.setuid(pw.pw_uid)
os.umask(0o022)
def main(self, daemon=True):
# See comment at top of file about zuul imports
self.setup_logging('executor', 'log_config')
self.jobroot_dir = None
if self.config.has_option('zuul', 'jobroot_dir'):
self.jobroot_dir = os.path.expanduser(
self.config.get('zuul', 'jobroot_dir'))
else:
self.jobdir_root = tempfile.gettempdir()
self.setup_logging('executor', 'log_config')
self.log = logging.getLogger("zuul.Executor")
jobroot_dir = None
if self.config.has_option('zuul', 'jobroot_dir'):
jobroot_dir = os.path.expanduser(
self.config.get('zuul', 'jobroot_dir'))
self.start_log_streamer()
self.change_privs()
ExecutorServer = zuul.executor.server.ExecutorServer
self.executor = ExecutorServer(self.config, self.connections,
jobdir_root=jobroot_dir,
jobdir_root=self.jobroot_dir,
keep_jobdir=self.args.keep_jobdir)
self.executor.start()

@ -0,0 +1,196 @@
#!/usr/bin/env python
# Copyright (c) 2016 IBM Corp.
# Copyright 2017 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import os.path
import pwd
import re
import select
import socket
import threading
import time
try:
import SocketServer as ss # python 2.x
except ImportError:
import socketserver as ss # python 3
class Log(object):
def __init__(self, path):
self.path = path
self.file = open(path)
self.stat = os.stat(path)
self.size = self.stat.st_size
class RequestHandler(ss.BaseRequestHandler):
'''
Class to handle a single log streaming request.
The log streaming code was blatantly stolen from zuul_console.py. Only
the (class/method/attribute) names were changed to protect the innocent.
'''
def handle(self):
build_uuid = self.request.recv(1024)
build_uuid = build_uuid.rstrip()
# validate build ID
if not re.match("[0-9A-Fa-f]+$", build_uuid):
self.request.sendall('Build ID %s is not valid' % build_uuid)
return
job_dir = os.path.join(self.server.jobdir_root, build_uuid)
if not os.path.exists(job_dir):
self.request.sendall('Build ID %s not found' % build_uuid)
return
# check if log file exists
log_file = os.path.join(job_dir, 'ansible', 'ansible_log.txt')
if not os.path.exists(log_file):
self.request.sendall('Log not found for build ID %s' % build_uuid)
return
self.stream_log(log_file)
def stream_log(self, log_file):
log = None
while True:
if log is not None:
try:
log.file.close()
except:
pass
while True:
log = self.chunk_log(log_file)
if log:
break
time.sleep(0.5)
while True:
if self.follow_log(log):
break
else:
return
def chunk_log(self, log_file):
try:
log = Log(log_file)
except Exception:
return
while True:
chunk = log.file.read(4096)
if not chunk:
break
self.request.send(chunk)
return log
def follow_log(self, log):
while True:
# As long as we have unread data, keep reading/sending
while True:
chunk = log.file.read(4096)
if chunk:
self.request.send(chunk)
else:
break
# At this point, we are waiting for more data to be written
time.sleep(0.5)
# Check to see if the remote end has sent any data, if so,
# discard
r, w, e = select.select([self.request], [], [self.request], 0)
if self.request in e:
return False
if self.request in r:
ret = self.request.recv(1024)
# Discard anything read, if input is eof, it has
# disconnected.
if not ret:
return False
# See if the file has been truncated
try:
st = os.stat(log.path)
if (st.st_ino != log.stat.st_ino or
st.st_size < log.size):
return True
except Exception:
return True
log.size = st.st_size
class CustomForkingTCPServer(ss.ForkingTCPServer):
'''
Custom version that allows us to drop privileges after port binding.
'''
def __init__(self, *args, **kwargs):
self.user = kwargs.pop('user')
self.jobdir_root = kwargs.pop('jobdir_root')
# For some reason, setting custom attributes does not work if we
# call the base class __init__ first. Wha??
ss.ForkingTCPServer.__init__(self, *args, **kwargs)
def change_privs(self):
'''
Drop our privileges to the zuul user.
'''
if os.getuid() != 0:
return
pw = pwd.getpwnam(self.user)
os.setgroups([])
os.setgid(pw.pw_gid)
os.setuid(pw.pw_uid)
os.umask(0o022)
def server_bind(self):
self.allow_reuse_address = True
ss.ForkingTCPServer.server_bind(self)
if self.user:
self.change_privs()
def server_close(self):
'''
Overridden from base class to shutdown the socket immediately.
'''
self.socket.shutdown(socket.SHUT_RD)
self.socket.close()
class LogStreamer(object):
'''
Class implementing log streaming over the finger daemon port.
'''
def __init__(self, user, host, port, jobdir_root):
self.server = CustomForkingTCPServer((host, port),
RequestHandler,
user=user,
jobdir_root=jobdir_root)
# We start the actual serving within a thread so we can return to
# the owner.
self.thd = threading.Thread(target=self.server.serve_forever)
self.thd.daemon = True
self.thd.start()
def stop(self):
if self.thd.isAlive():
self.server.shutdown()
self.server.server_close()
Loading…
Cancel
Save