Handle kubectl port-forward terminations

The Kubernetes project made an interesting and not universally
loved decision[1] to fix an issue where kubectl port-forward could
get stuck in an infinite loop on some errors: it now exits on any
error, even benign ones such as a connection being refused because
the target service is not yet listening.

This means that, in Zuul, if a playbook ever runs a command task
in a job on a pod before the zuul_console task, the port-forward
will crash and never recover (and so will not be available for
subsequent tasks).  "Don't do that" is one solution to this, but
isn't particularly robust, and there could be other temporary issues
that might similarly cause a disconnect.

Further, it's conceivable that if another job starts a port-forward
that happens to land on the same port, the first job, if it's still
running, might end up streaming the logs from the second job.

To avoid all of this, this change does the following (a minimal
sketch of the resulting mechanism appears after the list):

1) It runs the port-forward in a shell with an infinite while loop.
   This essentially lets us outsource process monitoring to a shell
   subprocess rather than needing to dedicate an internal thread
   to restarting the port-forward on failure.
2) In order to kill the port-forward (which is now protected from
   signals by the shell), we start it in a process group and kill
   the whole group (as we do for Ansible).
3) Because the port used for each "host" (i.e., pod) must be known
   before we start running Ansible, we can no longer let kubectl
   pick the ephemeral local port.  Instead, we will bind a socket
   ourselves to reserve an ephemeral port number, and then provide
   that port number to kubectl.  As long as we aren't listening
   on the socket, kubectl is able to create its own socket and
   bind it.
4) Because of #3, we take extra care to close the socket when
   finished.
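
For illustration only, here is a minimal, self-contained sketch of
the mechanism in points 1-4; the namespace and pod name are
placeholders rather than anything used by this change:

  # Not the actual implementation: a standalone sketch of points 1-4.
  # The namespace and pod name below are placeholders.
  import os
  import shlex
  import signal
  import socket
  import subprocess

  # (3) Bind to port 0 to reserve an ephemeral port number; as long
  # as we never listen() on it, kubectl can bind the port itself.
  sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
  sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  sock.bind(('::', 0))
  port = sock.getsockname()[1]

  # (1) A shell "while" loop restarts kubectl whenever it exits.
  cmd = ('while ! kubectl -n %s port-forward %s %d:19885; do :; done'
         % (shlex.quote('default'), shlex.quote('pod/example-pod'),
            port))

  # (2) start_new_session puts the shell and kubectl in their own
  # process group so they can be signaled together later.
  proc = subprocess.Popen(cmd, shell=True, start_new_session=True,
                          stdout=subprocess.PIPE,
                          stderr=subprocess.STDOUT)

  # ... stream logs via 127.0.0.1:<port> while the job runs ...

  # (2) Kill the whole process group, then (4) release the port.
  os.killpg(os.getpgid(proc.pid), signal.SIGKILL)
  proc.wait()
  sock.close()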

A test was written to verify the problem and solution described
above, but running it in the gate is currently impractical since
no real pod is available there.  It is likely that we can add
such a system, but that is left to future work.

In the interim, though, this change also adds all of the
infrastructure that would be needed to run such a test, including
new test-only environment variables that indicate how to connect
to a pod (this mirrors the way we specify how to connect to a VM).
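
As a rough, hypothetical illustration (this helper is not part of
the change), a locally run pod test could guard on those variables
much as the stream tests guard on ZUUL_REMOTE_IPV4:

  # Hypothetical sketch only: skip a test unless the ZUUL_POD_REMOTE_*
  # variables (read by FakeNodepool in the hunk below) are present.
  import os
  import unittest

  _REQUIRED = ('ZUUL_POD_REMOTE_NAME', 'ZUUL_POD_REMOTE_SERVER',
               'ZUUL_POD_REMOTE_TOKEN', 'ZUUL_POD_REMOTE_CA',
               'ZUUL_POD_REMOTE_USER')

  def needs_remote_pod(test_item):
      # Skip the decorated test when no remote pod is configured.
      missing = [v for v in _REQUIRED if not os.environ.get(v)]
      return unittest.skipIf(
          missing,
          'Missing pod settings: %s' % ', '.join(missing))(test_item)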

[1] https://github.com/kubernetes/kubernetes/pull/103526

Change-Id: Iaa5ab61d6dabd08ad9f9e04d78b4d2110ce24c13
James E. Blair
2024-09-19 15:55:31 -07:00
parent 21be3b2d4b
commit 657ed505aa
6 changed files with 180 additions and 49 deletions


@@ -1314,6 +1314,21 @@ class FakeNodepool(object):
if 'fedora-pod' in node_type:
data['connection_type'] = 'kubectl'
data['connection_port']['pod'] = 'fedora-abcdefg'
if 'remote-pod' in node_type:
data['connection_type'] = 'kubectl'
data['connection_port'] = {
'name': os.environ['ZUUL_POD_REMOTE_NAME'],
'namespace': os.environ.get(
'ZUUL_POD_REMOTE_NAMESPACE', 'default'),
'host': os.environ['ZUUL_POD_REMOTE_SERVER'],
'skiptls': False,
'token': os.environ['ZUUL_POD_REMOTE_TOKEN'],
'ca_crt': os.environ['ZUUL_POD_REMOTE_CA'],
'user': os.environ['ZUUL_POD_REMOTE_USER'],
'pod': os.environ['ZUUL_POD_REMOTE_NAME'],
}
data['interface_ip'] = data['connection_port']['pod']
data['public_ipv4'] = None
data['tenant_name'] = request['tenant_name']
data['requestor'] = request['requestor']


@@ -0,0 +1,21 @@
- hosts: all
  tasks:
    - zuul_console:
        state: absent
      failed_when: false
    # This task should cause the port forwarder to crash since it
    # attempts to connect (even though the task is skipped).
    - name: "first shell"
      shell: "echo first shell"
      when: false
    - zuul_console:
    # Subsequent tasks should work, if the forwarder is up.
    - name: "second shell"
      shell: "echo second shell"
      when: false
    - name: "third shell"
      shell: "echo third shell"


@@ -0,0 +1,20 @@
- hosts: all
  tasks:
    - zuul_console:
        state: absent
        port: "{{ test_console_port }}"
      failed_when: false
    - name: "first shell"
      shell: "echo first shell"
      when: false
    - zuul_console:
        port: "{{ test_console_port }}"
    - name: "second shell"
      shell: "echo second shell"
      when: false
    - name: "third shell"
      shell: "echo third shell"


@@ -1,6 +1,15 @@
#!/bin/sh
#!/bin/bash
echo "Forwarding from 127.0.0.1:1234 -> 19885"
# Script arguments look like:
# --kubeconfig=/tmp/tmppm0yyqvv/zuul-test/builds/c21fc1eb7e2c469cb4997d688252dc3c/work/.kube/config --context=zuul-ci-abcdefg:zuul-worker/ -n zuul-ci-abcdefg port-forward pod/fedora-abcdefg 37303:19885
# Get the last argument to the script
arg=${@:$#}
# Split on the colon
ports=(${arg//:/ })
echo "Forwarding from 127.0.0.1:${ports[0]} -> ${ports[1]}"
while true; do
sleep 5

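As an illustrative sanity check (not part of the change), the line
the fake script now prints should satisfy the same regular
expression that KubeFwd.start() parses in the executor hunk below:

  # Illustrative only: the fake fixture's output line must match the
  # pattern the executor uses to confirm the forward's local port.
  import re

  line = 'Forwarding from 127.0.0.1:37303 -> 19885\n'
  m = re.match(r'^Forwarding from 127.0.0.1:(\d+) -> 19885', line)
  assert m is not None and m.group(1) == '37303'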

@@ -18,6 +18,8 @@ import logging
import os
import re
import textwrap
import yaml
from unittest import skip
from datetime import datetime, timedelta
from tests.base import AnsibleZuulTestCase
@@ -45,42 +47,43 @@ class FunctionalZuulStreamMixIn:
ansible_remote = os.environ.get('ZUUL_REMOTE_IPV4')
self.assertIsNotNone(ansible_remote)
def _run_job(self, job_name, create=True, split='false'):
def _run_job(self, job_name, create=True, nodes=None, split=False):
# Keep the jobdir around so we can inspect contents if an
# assert fails. It will be cleaned up anyway as it is contained
# in a tmp dir which gets cleaned up after the test.
self.executor_server.keep_jobdir = True
if nodes is None:
nodes = [
{'name': 'compute1',
'label': 'whatever'},
{'name': 'controller',
'label': 'whatever'},
]
# Output extra ansible info so we might see errors.
self.executor_server.verbose = True
if create:
conf = textwrap.dedent(
"""
- job:
name: {job_name}
run: playbooks/{job_name}.yaml
ansible-version: {version}
ansible-split-streams: {split}
vars:
test_console_port: {console_port}
roles:
- zuul: org/common-config
nodeset:
nodes:
- name: compute1
label: whatever
- name: controller
label: whatever
- project:
check:
jobs:
- {job_name}
""".format(
job_name=job_name,
version=self.ansible_version,
split=split,
console_port=self.log_console_port))
conf = [
{
'job': {
'name': job_name,
'run': f'playbooks/{job_name}.yaml',
'ansible-version': self.ansible_version,
'ansible-split-streams': split,
'vars': {
'test_console_port': self.log_console_port,
},
'roles': [
{'zuul': 'org/common-config'},
],
'nodeset': {'nodes': nodes},
}
}, {
'project': {'check': {'jobs': [job_name]}}
}
]
conf = yaml.safe_dump(conf)
else:
conf = textwrap.dedent(
"""
@@ -262,7 +265,7 @@ class FunctionalZuulStreamMixIn:
r"""compute1 \| ok: \{'string': '\d.""", text)
def test_command_split_streams(self):
job = self._run_job('command', split='true')
job = self._run_job('command', split=True)
with self.jobLog(job):
build = self.history[-1]
self.assertEqual(build.result, 'SUCCESS')
@@ -409,6 +412,39 @@ class FunctionalZuulStreamMixIn:
r'handle its own exit"'
self.assertLogLine(regex, text)
# These two tests are helpful to have for local debugging, but we
# don't run them in the gate (yet) for two reasons:
# 1) The VM test has no useful assertions since it was created as
# a comparison for the pod test.
# 2) The pod test cannot be run in the gate.
@skip("No useful assertions")
def test_vm(self):
# This test is not particularly useful, it is mostly a
# benchmark to compare with the pod test below.
nodes = [{'name': 'controller', 'label': 'whatever'}]
job = self._run_job('vm', nodes=nodes)
with self.jobLog(job):
build = self.history[-1]
path = os.path.join(self.jobdir_root, build.uuid,
'work', 'logs', 'job-output.txt')
with open(path) as f:
self.log.debug(f.read())
@skip("Pod unavailable in gate")
def test_pod(self):
# This test could be used to verify the kubectl restart
# functionality if we had a gate job with access to a pod.
# TODO: add microk8s or kind to the stream test and enable
# this
nodes = [{'name': 'controller', 'label': 'remote-pod'}]
job = self._run_job('pod', nodes=nodes)
with self.jobLog(job):
build = self.history[-1]
path = os.path.join(self.jobdir_root, build.uuid,
'work', 'logs', 'job-output.txt')
with open(path) as f:
self.log.debug(f.read())
class TestZuulStream8(AnsibleZuulTestCase, FunctionalZuulStreamMixIn):
ansible_version = '8'


@@ -409,31 +409,54 @@ class KubeFwd(object):
self.context = context
self.namespace = namespace
self.pod = pod
self.socket = None
def _getSocket(self):
# Reserve a port so that we can restart the forwarder if it
# exits, which it will if there is any connection problem at
# all.
self.socket = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.socket.bind(('::', 0))
self.port = self.socket.getsockname()[1]
def start(self):
if self.fwd:
return
if self.socket is None:
self._getSocket()
cmd = [
'while', '!',
self.kubectl_command,
shlex.quote('--kubeconfig=%s' % self.kubeconfig),
shlex.quote('--context=%s' % self.context),
'-n',
shlex.quote(self.namespace),
'port-forward',
shlex.quote('pod/%s' % self.pod),
'%s:19885' % self.port,
';', 'do', ':;', 'done',
]
cmd = ' '.join(cmd)
with open('/dev/null', 'r+') as devnull:
fwd = subprocess.Popen(
[self.kubectl_command, '--kubeconfig=%s' % self.kubeconfig,
'--context=%s' % self.context,
'-n', self.namespace,
'port-forward',
'pod/%s' % self.pod, ':19885'],
close_fds=True,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
stdin=devnull)
fwd = subprocess.Popen(cmd,
shell=True,
close_fds=True,
start_new_session=True,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
stdin=devnull)
line = fwd.stdout.readline().decode('utf8')
m = re.match(r'^Forwarding from 127.0.0.1:(\d+) -> 19885', line)
port = None
if m:
self.port = m.group(1)
else:
try:
self.log.error("Could not find the forwarded port: %s", line)
fwd.kill()
except Exception:
pass
port = m.group(1)
if port != str(self.port):
self.log.error("Could not find the forwarded port: %s", line)
self.stop()
raise Exception("Unable to start kubectl port forward")
self.fwd = fwd
self.log.info('Started Kubectl port forward on port {}'.format(
@@ -442,7 +465,8 @@ class KubeFwd(object):
def stop(self):
try:
if self.fwd:
self.fwd.kill()
pgid = os.getpgid(self.fwd.pid)
os.killpg(pgid, signal.SIGKILL)
self.fwd.wait()
# clear stdout buffer before it's gone to not miss out on
@@ -456,6 +480,12 @@ class KubeFwd(object):
self.fwd = None
except Exception:
self.log.exception('Unable to stop kubectl port-forward:')
try:
if self.socket:
self.socket.close()
self.socket = None
except Exception:
self.log.exception('Unable to close port-forward socket:')
def __del__(self):
try: