Merge "Stream output from kubectl pods"

Zuul 2020-02-28 23:07:50 +00:00 committed by Gerrit Code Review
commit 5823ca6e23
6 changed files with 133 additions and 13 deletions

View File

@@ -44,7 +44,7 @@ FROM opendevorg/python-base as zuul
COPY --from=builder /output/ /output
RUN echo "deb http://ftp.debian.org/debian stretch-backports main" >> /etc/apt/sources.list \
&& apt-get update \
&& apt-get install -t stretch-backports -y bubblewrap \
&& apt-get install -t stretch-backports -y bubblewrap socat \
&& apt-get clean \
&& rm -rf /var/lib/apt/lists/*
RUN /output/install-from-bindep \

View File

@@ -0,0 +1,11 @@
---
upgrade:
- |
Kubectl and socat must now be installed on Zuul executors if using
Kubernetes or OpenShift `pod` resources from Nodepool. Additionally,
Nodepool version 3.12.0 or later is required.
fixes:
- |
Previously, no output from shell or command tasks on pods was placed
in the job output; that has been corrected and streaming output is
now available.
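
The requirement above is a runtime one on the executor host rather than a Python packaging change; a minimal preflight sketch an operator might run (a hypothetical helper, not part of this change) to confirm both tools are available:

import shutil

def check_pod_streaming_prereqs():
    # Hypothetical preflight check; not part of Zuul itself.
    # Both binaries must be on the executor's PATH for pod log streaming.
    missing = [tool for tool in ("kubectl", "socat") if shutil.which(tool) is None]
    if missing:
        raise RuntimeError("missing on this executor: %s" % ", ".join(missing))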

tests/fixtures/fake_kubectl.sh vendored Executable file
View File

@@ -0,0 +1,7 @@
#!/bin/sh
echo "Forwarding from 127.0.0.1:1234 -> 19885"
while true; do
sleep 5
done
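
This fixture only has to emit the same "Forwarding from ..." line that a real `kubectl port-forward` prints, because KubeFwd.start (added later in this commit) parses the local port out of the first line of output; a small check using the same regular expression:

import re

line = "Forwarding from 127.0.0.1:1234 -> 19885"
m = re.match(r'^Forwarding from 127.0.0.1:(\d+) -> 19885', line)
assert m is not None and m.group(1) == "1234"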

View File

@@ -5507,6 +5507,10 @@ class TestContainerJobs(AnsibleZuulTestCase):
tenant_config_file = "config/container-build-resources/main.yaml"
def test_container_jobs(self):
self.patch(zuul.executor.server.KubeFwd,
'kubectl_command',
os.path.join(FIXTURE_DIR, 'fake_kubectl.sh'))
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
self.waitUntilSettled()

View File

@@ -129,12 +129,12 @@ class CallbackModule(default.CallbackModule):
else:
self._display.display(msg)
def _read_log(self, host, ip, log_id, task_name, hosts):
def _read_log(self, host, ip, port, log_id, task_name, hosts):
self._log("[%s] Starting to log %s for task %s"
% (host, log_id, task_name), job=False, executor=True)
while True:
try:
s = socket.create_connection((ip, LOG_STREAM_PORT), 5)
s = socket.create_connection((ip, port), 5)
# Disable the socket timeout after we have successfully
# connected to accommodate the fact that jobs may not be writing
# logs continuously. Without this we can easily trip the 5
@@ -144,12 +144,12 @@ class CallbackModule(default.CallbackModule):
self._log(
"Timeout exception waiting for the logger. "
"Please check connectivity to [%s:%s]"
% (ip, LOG_STREAM_PORT), executor=True)
% (ip, port), executor=True)
self._log_streamline(
"localhost",
"Timeout exception waiting for the logger. "
"Please check connectivity to [%s:%s]"
% (ip, LOG_STREAM_PORT))
% (ip, port))
return
except Exception:
self._log("[%s] Waiting on logger" % host,
@@ -254,6 +254,7 @@ class CallbackModule(default.CallbackModule):
hosts = self._get_task_hosts(task)
for host, inventory_hostname in hosts:
port = LOG_STREAM_PORT
if host in ('localhost', '127.0.0.1'):
# Don't try to stream from localhost
continue
@@ -267,14 +268,21 @@ class CallbackModule(default.CallbackModule):
# Don't try to stream from loops
continue
if play_vars[host].get('ansible_connection') in ('kubectl', ):
# Don't try to stream from kubectl connection
continue
# Stream from the forwarded port on kubectl conns
port = play_vars[host]['zuul']['resources'][
inventory_hostname].get('stream_port')
if port is None:
self._log("[Zuul] Kubectl and socat must be installed "
"on the Zuul executor for streaming output "
"from pods")
continue
ip = '127.0.0.1'
log_id = "%s-%s" % (
task._uuid, paths._sanitize_filename(inventory_hostname))
streamer = threading.Thread(
target=self._read_log, args=(
host, ip, log_id, task_name, hosts))
host, ip, port, log_id, task_name, hosts))
streamer.daemon = True
streamer.start()
self._streamers.append(streamer)
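
With the forwarded port published as `stream_port`, the callback connects to 127.0.0.1 on that port instead of the node's IP on LOG_STREAM_PORT. A condensed standalone sketch of that read loop, assuming the zuul_console streamer protocol of sending the log id followed by a newline:

import socket

def stream_pod_log(port, log_id):
    # Connect to the kubectl port-forward bound to the executor's loopback.
    # The forwarded port may arrive as a string in the inventory vars.
    s = socket.create_connection(("127.0.0.1", int(port)), 5)
    s.settimeout(None)
    # Assumption: the streamer expects the log id followed by a newline,
    # as the zuul_console daemon does.
    s.sendall(("%s\n" % log_id).encode("utf-8"))
    while True:
        chunk = s.recv(4096)
        if not chunk:
            break
        print(chunk.decode("utf-8", "replace"), end="")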

View File

@@ -29,6 +29,7 @@ import threading
import time
import traceback
from concurrent.futures.process import ProcessPoolExecutor, BrokenProcessPool
import re
import git
from urllib.parse import urlsplit
@@ -318,6 +319,70 @@ class SshAgent(object):
return result
class KubeFwd(object):
kubectl_command = 'kubectl'
def __init__(self, zuul_event_id, build, kubeconfig, context,
namespace, pod):
self.port = None
self.fwd = None
self.log = get_annotated_logger(
logging.getLogger("zuul.ExecutorServer"),
zuul_event_id, build=build)
self.kubeconfig = kubeconfig
self.context = context
self.namespace = namespace
self.pod = pod
def start(self):
if self.fwd:
return
with open('/dev/null', 'r+') as devnull:
fwd = subprocess.Popen(
[self.kubectl_command, '--kubeconfig=%s' % self.kubeconfig,
'--context=%s' % self.context,
'-n', self.namespace,
'port-forward',
'--address', '127.0.0.1',
'pod/%s' % self.pod, ':19885'],
close_fds=True,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
stdin=devnull)
line = fwd.stdout.readline().decode('utf8')
m = re.match(r'^Forwarding from 127.0.0.1:(\d+) -> 19885', line)
if m:
self.port = m.group(1)
else:
try:
fwd.kill()
except Exception:
pass
raise Exception("Unable to start kubectl port forward")
self.fwd = fwd
self.log.info('Started Kubectl port forward on port {}'.format(
self.port))
def stop(self):
try:
if self.fwd:
self.fwd.kill()
self.fwd.wait()
self.fwd = None
except Exception:
self.log.exception('Unable to stop kubectl port-forward:')
def __del__(self):
try:
self.stop()
except Exception:
self.log.exception('Exception in KubeFwd destructor')
try:
super().__del__(self)
except AttributeError:
pass
class JobDirPlaybook(object):
def __init__(self, root):
self.root = root
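
The lifecycle of the KubeFwd helper above is: start once, read the local port that kubectl chose, and kill the child process on cleanup. A minimal usage sketch with placeholder values:

# Placeholder values for illustration only.
fwd = KubeFwd(zuul_event_id="event-1", build="build-uuid",
              kubeconfig="/tmp/jobdir/work/.kube/config",
              context="example-context", namespace="example-ns",
              pod="example-pod")
try:
    fwd.start()                  # spawns: kubectl ... port-forward pod/example-pod :19885
    print("streaming port:", fwd.port)
finally:
    fwd.stop()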
@@ -377,6 +442,8 @@ class JobDir(object):
# work (mounted in bwrap read-write)
# .ssh
# known_hosts
# .kube
# config
# src
# <git.example.com>
# <project>
@@ -415,6 +482,9 @@ class JobDir(object):
os.makedirs(self.untrusted_root)
ssh_dir = os.path.join(self.work_root, '.ssh')
os.mkdir(ssh_dir, 0o700)
kube_dir = os.path.join(self.work_root, ".kube")
os.makedirs(kube_dir)
self.kubeconfig = os.path.join(kube_dir, "config")
# Create ansible cache directory
self.ansible_cache_root = os.path.join(self.root, '.ansible')
self.fact_cache = os.path.join(self.ansible_cache_root, 'fact-cache')
@@ -770,7 +840,7 @@ class AnsibleJob(object):
'winrm_read_timeout_sec')
self.ssh_agent = SshAgent(zuul_event_id=self.zuul_event_id,
build=self.job.unique)
self.port_forwards = []
self.executor_variables_file = None
self.cpu_times = {'user': 0, 'system': 0,
@@ -885,6 +955,11 @@ class AnsibleJob(object):
self.ssh_agent.stop()
except Exception:
self.log.exception("Error stopping SSH agent:")
for fwd in self.port_forwards:
try:
fwd.stop()
except Exception:
self.log.exception("Error stopping port forward:")
try:
self.executor_server.finishJob(self.job.unique)
except Exception:
@@ -1800,12 +1875,11 @@ class AnsibleJob(object):
self.log.debug("Adding role path %s", role_path)
jobdir_playbook.roles_path.append(role_path)
def prepareKubeConfig(self, data):
kube_cfg_path = os.path.join(self.jobdir.work_root, ".kube", "config")
def prepareKubeConfig(self, jobdir, data):
kube_cfg_path = jobdir.kubeconfig
if os.path.exists(kube_cfg_path):
kube_cfg = yaml.safe_load(open(kube_cfg_path))
else:
os.makedirs(os.path.dirname(kube_cfg_path), exist_ok=True)
kube_cfg = {
'apiVersion': 'v1',
'kind': 'Config',
@@ -1874,7 +1948,7 @@ class AnsibleJob(object):
# TODO: decrypt resource data using scheduler key
data = node['connection_port']
# Setup kube/config file
self.prepareKubeConfig(data)
self.prepareKubeConfig(self.jobdir, data)
# Convert connection_port in kubectl connection parameters
node['connection_port'] = None
node['kubectl_namespace'] = data['namespace']
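
prepareKubeConfig now writes into the per-job .kube/config that JobDir creates above. The entries it appends per pod are not shown in full in this diff; a hypothetical sketch of the general shape (field names and values here are illustrative, not taken from this change):

# Illustrative field names; the actual helper's keys may differ.
kube_cfg['clusters'].append({'name': 'example-cluster',
                             'cluster': {'server': 'https://k8s.example.com'}})
kube_cfg['users'].append({'name': 'example-user',
                          'user': {'token': 'example-token'}})
kube_cfg['contexts'].append({'name': data['context_name'],
                             'context': {'cluster': 'example-cluster',
                                         'user': 'example-user',
                                         'namespace': data['namespace']}})
kube_cfg['current-context'] = data['context_name']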
@@ -1891,6 +1965,22 @@ class AnsibleJob(object):
# Add the real pod name to the resources_var
all_vars['zuul']['resources'][
node['name'][0]]['pod'] = data['pod']
fwd = KubeFwd(zuul_event_id=self.zuul_event_id,
build=self.job.unique,
kubeconfig=self.jobdir.kubeconfig,
context=data['context_name'],
namespace=data['namespace'],
pod=data['pod'])
try:
fwd.start()
self.port_forwards.append(fwd)
all_vars['zuul']['resources'][
node['name'][0]]['stream_port'] = fwd.port
except Exception:
self.log.exception("Unable to start port forward:")
self.log.error("Kubectl and socat are required for "
"streaming logs")
# Remove resource node from nodes list
for node in resources_nodes:
args['nodes'].remove(node)
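
After this block runs, each pod entry in the job's zuul.resources carries the forwarded port that the zuul_stream callback looks up above; roughly (names and values are illustrative):

# Illustrative shape only; other resource keys are unchanged by this commit.
all_vars['zuul']['resources']['pod-node'] = {
    'namespace': 'example-ns',
    'pod': 'example-pod',
    'stream_port': '40123',   # local port chosen by kubectl port-forward
}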