Merge "Handle kubectl port-foward terminations"

This commit is contained in:
Zuul 2024-10-31 23:37:46 +00:00 committed by Gerrit Code Review
commit 5bd020a2ae
6 changed files with 180 additions and 49 deletions

View File

@ -1331,6 +1331,21 @@ class FakeNodepool(object):
if 'fedora-pod' in node_type:
data['connection_type'] = 'kubectl'
data['connection_port']['pod'] = 'fedora-abcdefg'
if 'remote-pod' in node_type:
data['connection_type'] = 'kubectl'
data['connection_port'] = {
'name': os.environ['ZUUL_POD_REMOTE_NAME'],
'namespace': os.environ.get(
'ZUUL_POD_REMOTE_NAMESPACE', 'default'),
'host': os.environ['ZUUL_POD_REMOTE_SERVER'],
'skiptls': False,
'token': os.environ['ZUUL_POD_REMOTE_TOKEN'],
'ca_crt': os.environ['ZUUL_POD_REMOTE_CA'],
'user': os.environ['ZUUL_POD_REMOTE_USER'],
'pod': os.environ['ZUUL_POD_REMOTE_NAME'],
}
data['interface_ip'] = data['connection_port']['pod']
data['public_ipv4'] = None
data['tenant_name'] = request['tenant_name']
data['requestor'] = request['requestor']

View File

@ -0,0 +1,21 @@
- hosts: all
tasks:
- zuul_console:
state: absent
failed_when: false
# This task should cause the port forwarder to crash since it
# attempts to connect (even though the task is skipped).
- name: "first shell"
shell: "echo first shell"
when: false
- zuul_console:
# Subsequent tasks should work, if the forwarder is up.
- name: "second shell"
shell: "echo second shell"
when: false
- name: "third shell"
shell: "echo third shell"

View File

@ -0,0 +1,20 @@
- hosts: all
tasks:
- zuul_console:
state: absent
port: "{{ test_console_port }}"
failed_when: false
- name: "first shell"
shell: "echo first shell"
when: false
- zuul_console:
port: "{{ test_console_port }}"
- name: "second shell"
shell: "echo second shell"
when: false
- name: "third shell"
shell: "echo third shell"

View File

@ -1,6 +1,15 @@
#!/bin/sh
#!/bin/bash
echo "Forwarding from 127.0.0.1:1234 -> 19885"
# Script arguments look like:
# --kubeconfig=/tmp/tmppm0yyqvv/zuul-test/builds/c21fc1eb7e2c469cb4997d688252dc3c/work/.kube/config --context=zuul-ci-abcdefg:zuul-worker/ -n zuul-ci-abcdefg port-forward pod/fedora-abcdefg 37303:19885
# Get the last argument to the script
arg=${@:$#}
# Split on the colon
ports=(${arg//:/ })
echo "Forwarding from 127.0.0.1:${ports[0]} -> ${ports[1]}"
while true; do
sleep 5

View File

@ -18,6 +18,8 @@ import logging
import os
import re
import textwrap
import yaml
from unittest import skip
from datetime import datetime, timedelta
from tests.base import AnsibleZuulTestCase
@ -45,42 +47,43 @@ class FunctionalZuulStreamMixIn:
ansible_remote = os.environ.get('ZUUL_REMOTE_IPV4')
self.assertIsNotNone(ansible_remote)
def _run_job(self, job_name, create=True, split='false'):
def _run_job(self, job_name, create=True, nodes=None, split=False):
# Keep the jobdir around so we can inspect contents if an
# assert fails. It will be cleaned up anyway as it is contained
# in a tmp dir which gets cleaned up after the test.
self.executor_server.keep_jobdir = True
if nodes is None:
nodes = [
{'name': 'compute1',
'label': 'whatever'},
{'name': 'controller',
'label': 'whatever'},
]
# Output extra ansible info so we might see errors.
self.executor_server.verbose = True
if create:
conf = textwrap.dedent(
"""
- job:
name: {job_name}
run: playbooks/{job_name}.yaml
ansible-version: {version}
ansible-split-streams: {split}
vars:
test_console_port: {console_port}
roles:
- zuul: org/common-config
nodeset:
nodes:
- name: compute1
label: whatever
- name: controller
label: whatever
- project:
check:
jobs:
- {job_name}
""".format(
job_name=job_name,
version=self.ansible_version,
split=split,
console_port=self.log_console_port))
conf = [
{
'job': {
'name': job_name,
'run': f'playbooks/{job_name}.yaml',
'ansible-version': self.ansible_version,
'ansible-split-streams': split,
'vars': {
'test_console_port': self.log_console_port,
},
'roles': [
{'zuul': 'org/common-config'},
],
'nodeset': {'nodes': nodes},
}
}, {
'project': {'check': {'jobs': [job_name]}}
}
]
conf = yaml.safe_dump(conf)
else:
conf = textwrap.dedent(
"""
@ -262,7 +265,7 @@ class FunctionalZuulStreamMixIn:
r"""compute1 \| ok: \{'string': '\d.""", text)
def test_command_split_streams(self):
job = self._run_job('command', split='true')
job = self._run_job('command', split=True)
with self.jobLog(job):
build = self.history[-1]
self.assertEqual(build.result, 'SUCCESS')
@ -409,6 +412,39 @@ class FunctionalZuulStreamMixIn:
r'handle its own exit"'
self.assertLogLine(regex, text)
# These twe tests are helpful to have for local debugging, but we
# don't run them in the gate (yet) for two reasons:
# 1) The VM test has no useful assertions since it was created as
# a comparison for the pod test
# 2) The pod test can not be run in the gate.
@skip("No useful assertions")
def test_vm(self):
# This test is not particularly useful, it is mostly a
# benchmark to compare with the pod test below.
nodes = [{'name': 'controller', 'label': 'whatever'}]
job = self._run_job('vm', nodes=nodes)
with self.jobLog(job):
build = self.history[-1]
path = os.path.join(self.jobdir_root, build.uuid,
'work', 'logs', 'job-output.txt')
with open(path) as f:
self.log.debug(f.read())
@skip("Pod unavailable in gate")
def test_pod(self):
# This test could be used to verify the kubectl restart
# functionality if we had a gate job with access to a pod.
# TODO: add microk8s or kind to the stream test and enable
# this
nodes = [{'name': 'controller', 'label': 'remote-pod'}]
job = self._run_job('pod', nodes=nodes)
with self.jobLog(job):
build = self.history[-1]
path = os.path.join(self.jobdir_root, build.uuid,
'work', 'logs', 'job-output.txt')
with open(path) as f:
self.log.debug(f.read())
class TestZuulStream8(AnsibleZuulTestCase, FunctionalZuulStreamMixIn):
ansible_version = '8'

View File

@ -409,31 +409,54 @@ class KubeFwd(object):
self.context = context
self.namespace = namespace
self.pod = pod
self.socket = None
def _getSocket(self):
# Reserve a port so that we can restart the forwarder if it
# exits, which it will if there is any connection problem at
# all.
self.socket = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.socket.bind(('::', 0))
self.port = self.socket.getsockname()[1]
def start(self):
if self.fwd:
return
if self.socket is None:
self._getSocket()
cmd = [
'while', '!',
self.kubectl_command,
shlex.quote('--kubeconfig=%s' % self.kubeconfig),
shlex.quote('--context=%s' % self.context),
'-n',
shlex.quote(self.namespace),
'port-forward',
shlex.quote('pod/%s' % self.pod),
'%s:19885' % self.port,
';', 'do', ':;', 'done',
]
cmd = ' '.join(cmd)
with open('/dev/null', 'r+') as devnull:
fwd = subprocess.Popen(
[self.kubectl_command, '--kubeconfig=%s' % self.kubeconfig,
'--context=%s' % self.context,
'-n', self.namespace,
'port-forward',
'pod/%s' % self.pod, ':19885'],
close_fds=True,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
stdin=devnull)
fwd = subprocess.Popen(cmd,
shell=True,
close_fds=True,
start_new_session=True,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
stdin=devnull)
line = fwd.stdout.readline().decode('utf8')
m = re.match(r'^Forwarding from 127.0.0.1:(\d+) -> 19885', line)
port = None
if m:
self.port = m.group(1)
else:
try:
self.log.error("Could not find the forwarded port: %s", line)
fwd.kill()
except Exception:
pass
port = m.group(1)
if port != str(self.port):
self.log.error("Could not find the forwarded port: %s", line)
self.stop()
raise Exception("Unable to start kubectl port forward")
self.fwd = fwd
self.log.info('Started Kubectl port forward on port {}'.format(
@ -442,7 +465,8 @@ class KubeFwd(object):
def stop(self):
try:
if self.fwd:
self.fwd.kill()
pgid = os.getpgid(self.fwd.pid)
os.killpg(pgid, signal.SIGKILL)
self.fwd.wait()
# clear stdout buffer before its gone to not miss out on
@ -456,6 +480,12 @@ class KubeFwd(object):
self.fwd = None
except Exception:
self.log.exception('Unable to stop kubectl port-forward:')
try:
if self.socket:
self.socket.close()
self.socket = None
except Exception:
self.log.exception('Unable to close port-forward socket:')
def __del__(self):
try: