From 657ed505aaab8b24a88df415ed6c0e24b9699709 Mon Sep 17 00:00:00 2001
From: "James E. Blair"
Date: Thu, 19 Sep 2024 15:55:31 -0700
Subject: [PATCH] Handle kubectl port-forward terminations

The kubernetes project made an interesting and not universally loved
decision[1] to fix an issue where kubectl port-forward could get
stuck in an infinite loop on some errors: it now exits on any error,
even benign ones such as a connection refused because the target
service is not yet listening.

This means that, in Zuul, if a playbook ever runs a command task in a
job on a pod before the zuul_console task, the port-forward will
crash and never recover (and so will not be available for subsequent
tasks).  "Don't do that" is one solution, but it isn't particularly
robust, and other temporary issues could similarly cause a
disconnect.  Further, it's conceivable that if another job starts a
port-forward that happens to land on the same port, the first job, if
it's still running, might end up streaming the logs from the second
job.

To avoid all of this, this change does the following:

1) It runs the port-forward in a shell with an infinite while loop.
   This essentially lets us outsource process monitoring to a shell
   subprocess rather than needing to dedicate an internal thread to
   restarting the port-forward on failure.

2) In order to kill the port-forward (which is now protected from
   signals by the shell), we start it in a new process group and kill
   the whole group (as we do for Ansible).

3) Because the port used for each "host" (i.e., pod) must be known
   before we start running Ansible, we can no longer let kubectl pick
   the ephemeral local port.  Instead, we bind a socket ourselves to
   reserve an ephemeral port number, and then provide that port
   number to kubectl.  As long as we aren't listening on the socket,
   kubectl is able to create its own socket and bind it.  (A
   standalone sketch of this trick appears below.)

4) Because of #3, we take extra care to close the socket when
   finished.

A test was written to verify the problem and solution described
above, but running it in the gate is currently impractical since we
have no real pod running there.  It is likely that we can add such a
system, but that is left to future work.  In the interim, this change
also adds all of the infrastructure needed to run such a test,
including new test-only environment variables that indicate how to
connect to a pod (this mirrors the way we specify how to connect to a
vm).
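As an aside, the port-reservation trick in item 3 can be sketched as
a few lines of standalone Python (illustrative only; the real code
lives in KubeFwd below):

    import socket

    # Bind to port 0 so the kernel assigns a free ephemeral port,
    # then record the assigned number.  We keep the socket bound but
    # never call listen() on it, so (per item 3 above) kubectl can
    # still create its own socket and bind the same port.
    sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind(('::', 0))
    port = sock.getsockname()[1]
    print('Reserved local port %s for kubectl' % port)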
[1] https://github.com/kubernetes/kubernetes/pull/103526

Change-Id: Iaa5ab61d6dabd08ad9f9e04d78b4d2110ce24c13
---
 tests/base.py                                 | 15 +++
 .../git/org_project/playbooks/pod.yaml        | 21 +++++
 .../git/org_project/playbooks/vm.yaml         | 20 ++++
 tests/fixtures/fake_kubectl.sh                | 13 ++-
 tests/remote/test_remote_zuul_stream.py       | 94 +++++++++++++------
 zuul/executor/server.py                       | 66 +++++++++----
 6 files changed, 180 insertions(+), 49 deletions(-)
 create mode 100644 tests/fixtures/config/remote-zuul-stream/git/org_project/playbooks/pod.yaml
 create mode 100644 tests/fixtures/config/remote-zuul-stream/git/org_project/playbooks/vm.yaml

diff --git a/tests/base.py b/tests/base.py
index 149db52559..e89de957cf 100644
--- a/tests/base.py
+++ b/tests/base.py
@@ -1314,6 +1314,21 @@ class FakeNodepool(object):
         if 'fedora-pod' in node_type:
             data['connection_type'] = 'kubectl'
             data['connection_port']['pod'] = 'fedora-abcdefg'
+        if 'remote-pod' in node_type:
+            data['connection_type'] = 'kubectl'
+            data['connection_port'] = {
+                'name': os.environ['ZUUL_POD_REMOTE_NAME'],
+                'namespace': os.environ.get(
+                    'ZUUL_POD_REMOTE_NAMESPACE', 'default'),
+                'host': os.environ['ZUUL_POD_REMOTE_SERVER'],
+                'skiptls': False,
+                'token': os.environ['ZUUL_POD_REMOTE_TOKEN'],
+                'ca_crt': os.environ['ZUUL_POD_REMOTE_CA'],
+                'user': os.environ['ZUUL_POD_REMOTE_USER'],
+                'pod': os.environ['ZUUL_POD_REMOTE_NAME'],
+            }
+            data['interface_ip'] = data['connection_port']['pod']
+            data['public_ipv4'] = None
         data['tenant_name'] = request['tenant_name']
         data['requestor'] = request['requestor']
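The ZUUL_POD_REMOTE_* variables above are test-only.  A hypothetical
pre-flight check (not part of this change) shows how a developer
might verify the environment before running the pod tests locally:

    import os

    # All of these must be set before FakeNodepool can build the
    # 'remote-pod' connection data above; the namespace alone is
    # optional and defaults to 'default'.
    required = [
        'ZUUL_POD_REMOTE_NAME',    # pod name; also the interface_ip
        'ZUUL_POD_REMOTE_SERVER',  # kubernetes API server URL
        'ZUUL_POD_REMOTE_TOKEN',   # bearer token for authentication
        'ZUUL_POD_REMOTE_CA',      # CA certificate for the API server
        'ZUUL_POD_REMOTE_USER',    # user name for the kubectl context
    ]
    missing = [v for v in required if v not in os.environ]
    if missing:
        raise RuntimeError('Missing: %s' % ', '.join(missing))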
+ - name: "second shell" + shell: "echo second shell" + when: false + + - name: "third shell" + shell: "echo third shell" diff --git a/tests/fixtures/config/remote-zuul-stream/git/org_project/playbooks/vm.yaml b/tests/fixtures/config/remote-zuul-stream/git/org_project/playbooks/vm.yaml new file mode 100644 index 0000000000..dcffeb5cf0 --- /dev/null +++ b/tests/fixtures/config/remote-zuul-stream/git/org_project/playbooks/vm.yaml @@ -0,0 +1,20 @@ +- hosts: all + tasks: + - zuul_console: + state: absent + port: "{{ test_console_port }}" + failed_when: false + + - name: "first shell" + shell: "echo first shell" + when: false + + - zuul_console: + port: "{{ test_console_port }}" + + - name: "second shell" + shell: "echo second shell" + when: false + + - name: "third shell" + shell: "echo third shell" diff --git a/tests/fixtures/fake_kubectl.sh b/tests/fixtures/fake_kubectl.sh index 7395a582d9..86e26e5241 100755 --- a/tests/fixtures/fake_kubectl.sh +++ b/tests/fixtures/fake_kubectl.sh @@ -1,6 +1,15 @@ -#!/bin/sh +#!/bin/bash -echo "Forwarding from 127.0.0.1:1234 -> 19885" +# Script arguments look like: +# --kubeconfig=/tmp/tmppm0yyqvv/zuul-test/builds/c21fc1eb7e2c469cb4997d688252dc3c/work/.kube/config --context=zuul-ci-abcdefg:zuul-worker/ -n zuul-ci-abcdefg port-forward pod/fedora-abcdefg 37303:19885 + +# Get the last argument to the script +arg=${@:$#} + +# Split on the colon +ports=(${arg//:/ }) + +echo "Forwarding from 127.0.0.1:${ports[0]} -> ${ports[1]}" while true; do sleep 5 diff --git a/tests/remote/test_remote_zuul_stream.py b/tests/remote/test_remote_zuul_stream.py index 9c5afd59a5..697592b62d 100644 --- a/tests/remote/test_remote_zuul_stream.py +++ b/tests/remote/test_remote_zuul_stream.py @@ -18,6 +18,8 @@ import logging import os import re import textwrap +import yaml +from unittest import skip from datetime import datetime, timedelta from tests.base import AnsibleZuulTestCase @@ -45,42 +47,43 @@ class FunctionalZuulStreamMixIn: ansible_remote = os.environ.get('ZUUL_REMOTE_IPV4') self.assertIsNotNone(ansible_remote) - def _run_job(self, job_name, create=True, split='false'): + def _run_job(self, job_name, create=True, nodes=None, split=False): # Keep the jobdir around so we can inspect contents if an # assert fails. It will be cleaned up anyway as it is contained # in a tmp dir which gets cleaned up after the test. self.executor_server.keep_jobdir = True + if nodes is None: + nodes = [ + {'name': 'compute1', + 'label': 'whatever'}, + {'name': 'controller', + 'label': 'whatever'}, + ] + # Output extra ansible info so we might see errors. 
diff --git a/tests/remote/test_remote_zuul_stream.py b/tests/remote/test_remote_zuul_stream.py
index 9c5afd59a5..697592b62d 100644
--- a/tests/remote/test_remote_zuul_stream.py
+++ b/tests/remote/test_remote_zuul_stream.py
@@ -18,6 +18,8 @@ import logging
 import os
 import re
 import textwrap
+import yaml
+from unittest import skip
 from datetime import datetime, timedelta
 
 from tests.base import AnsibleZuulTestCase
@@ -45,42 +47,43 @@ class FunctionalZuulStreamMixIn:
         ansible_remote = os.environ.get('ZUUL_REMOTE_IPV4')
         self.assertIsNotNone(ansible_remote)
 
-    def _run_job(self, job_name, create=True, split='false'):
+    def _run_job(self, job_name, create=True, nodes=None, split=False):
         # Keep the jobdir around so we can inspect contents if an
         # assert fails. It will be cleaned up anyway as it is contained
         # in a tmp dir which gets cleaned up after the test.
         self.executor_server.keep_jobdir = True
+        if nodes is None:
+            nodes = [
+                {'name': 'compute1',
+                 'label': 'whatever'},
+                {'name': 'controller',
+                 'label': 'whatever'},
+            ]
 
         # Output extra ansible info so we might see errors.
         self.executor_server.verbose = True
         if create:
-            conf = textwrap.dedent(
-                """
-                - job:
-                    name: {job_name}
-                    run: playbooks/{job_name}.yaml
-                    ansible-version: {version}
-                    ansible-split-streams: {split}
-                    vars:
-                      test_console_port: {console_port}
-                    roles:
-                      - zuul: org/common-config
-                    nodeset:
-                      nodes:
-                        - name: compute1
-                          label: whatever
-                        - name: controller
-                          label: whatever
-
-                - project:
-                    check:
-                      jobs:
-                        - {job_name}
-                """.format(
-                    job_name=job_name,
-                    version=self.ansible_version,
-                    split=split,
-                    console_port=self.log_console_port))
+            conf = [
+                {
+                    'job': {
+                        'name': job_name,
+                        'run': f'playbooks/{job_name}.yaml',
+                        'ansible-version': self.ansible_version,
+                        'ansible-split-streams': split,
+                        'vars': {
+                            'test_console_port': self.log_console_port,
+                        },
+                        'roles': [
+                            {'zuul': 'org/common-config'},
+                        ],
+                        'nodeset': {'nodes': nodes},
+                    }
+                }, {
+                    'project': {'check': {'jobs': [job_name]}}
+                }
+            ]
+            conf = yaml.safe_dump(conf)
         else:
             conf = textwrap.dedent(
                 """
@@ -262,7 +265,7 @@ class FunctionalZuulStreamMixIn:
                 r"""compute1 \| ok: \{'string': '\d.""", text)
 
     def test_command_split_streams(self):
-        job = self._run_job('command', split='true')
+        job = self._run_job('command', split=True)
         with self.jobLog(job):
             build = self.history[-1]
             self.assertEqual(build.result, 'SUCCESS')
@@ -409,6 +412,39 @@ class FunctionalZuulStreamMixIn:
                 r'handle its own exit"'
             self.assertLogLine(regex, text)
 
+    # These two tests are helpful to have for local debugging, but we
+    # don't run them in the gate (yet) for two reasons:
+    # 1) The VM test has no useful assertions since it was created as
+    #    a comparison for the pod test
+    # 2) The pod test can not be run in the gate.
+    @skip("No useful assertions")
+    def test_vm(self):
+        # This test is not particularly useful, it is mostly a
+        # benchmark to compare with the pod test below.
+        nodes = [{'name': 'controller', 'label': 'whatever'}]
+        job = self._run_job('vm', nodes=nodes)
+        with self.jobLog(job):
+            build = self.history[-1]
+            path = os.path.join(self.jobdir_root, build.uuid,
+                                'work', 'logs', 'job-output.txt')
+            with open(path) as f:
+                self.log.debug(f.read())
+
+    @skip("Pod unavailable in gate")
+    def test_pod(self):
+        # This test could be used to verify the kubectl restart
+        # functionality if we had a gate job with access to a pod.
+        # TODO: add microk8s or kind to the stream test and enable
+        # this
+        nodes = [{'name': 'controller', 'label': 'remote-pod'}]
+        job = self._run_job('pod', nodes=nodes)
+        with self.jobLog(job):
+            build = self.history[-1]
+            path = os.path.join(self.jobdir_root, build.uuid,
+                                'work', 'logs', 'job-output.txt')
+            with open(path) as f:
+                self.log.debug(f.read())
+
 
 class TestZuulStream8(AnsibleZuulTestCase, FunctionalZuulStreamMixIn):
     ansible_version = '8'
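To make the refactored _run_job above concrete: dumping the new
structure produces configuration equivalent to the old textwrap
template.  For a hypothetical single-node 'vm' job (values here are
illustrative):

    import yaml

    # Roughly what _run_job now builds and safe_dumps.
    conf = [
        {'job': {
            'name': 'vm',
            'run': 'playbooks/vm.yaml',
            'ansible-version': '8',
            'ansible-split-streams': False,
            'vars': {'test_console_port': 19885},
            'roles': [{'zuul': 'org/common-config'}],
            'nodeset': {'nodes': [{'name': 'controller',
                                   'label': 'whatever'}]},
        }},
        {'project': {'check': {'jobs': ['vm']}}},
    ]
    print(yaml.safe_dump(conf))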
diff --git a/zuul/executor/server.py b/zuul/executor/server.py
index 4f89ed4e7b..0fc20dc730 100644
--- a/zuul/executor/server.py
+++ b/zuul/executor/server.py
@@ -409,31 +409,54 @@ class KubeFwd(object):
         self.context = context
         self.namespace = namespace
         self.pod = pod
+        self.socket = None
+
+    def _getSocket(self):
+        # Reserve a port so that we can restart the forwarder if it
+        # exits, which it will if there is any connection problem at
+        # all.
+        self.socket = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
+        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+        self.socket.bind(('::', 0))
+        self.port = self.socket.getsockname()[1]
 
     def start(self):
         if self.fwd:
             return
+
+        if self.socket is None:
+            self._getSocket()
+
+        cmd = [
+            'while', '!',
+            self.kubectl_command,
+            shlex.quote('--kubeconfig=%s' % self.kubeconfig),
+            shlex.quote('--context=%s' % self.context),
+            '-n',
+            shlex.quote(self.namespace),
+            'port-forward',
+            shlex.quote('pod/%s' % self.pod),
+            '%s:19885' % self.port,
+            ';', 'do', ':;', 'done',
+        ]
+        cmd = ' '.join(cmd)
+
         with open('/dev/null', 'r+') as devnull:
-            fwd = subprocess.Popen(
-                [self.kubectl_command, '--kubeconfig=%s' % self.kubeconfig,
-                 '--context=%s' % self.context,
-                 '-n', self.namespace,
-                 'port-forward',
-                 'pod/%s' % self.pod, ':19885'],
-                close_fds=True,
-                stdout=subprocess.PIPE,
-                stderr=subprocess.STDOUT,
-                stdin=devnull)
+            fwd = subprocess.Popen(cmd,
+                                   shell=True,
+                                   close_fds=True,
+                                   start_new_session=True,
+                                   stdout=subprocess.PIPE,
+                                   stderr=subprocess.STDOUT,
+                                   stdin=devnull)
         line = fwd.stdout.readline().decode('utf8')
         m = re.match(r'^Forwarding from 127.0.0.1:(\d+) -> 19885', line)
+        port = None
         if m:
-            self.port = m.group(1)
-        else:
-            try:
-                self.log.error("Could not find the forwarded port: %s", line)
-                fwd.kill()
-            except Exception:
-                pass
+            port = m.group(1)
+        if port != str(self.port):
+            self.log.error("Could not find the forwarded port: %s", line)
+            self.stop()
             raise Exception("Unable to start kubectl port forward")
         self.fwd = fwd
         self.log.info('Started Kubectl port forward on port {}'.format(
@@ -442,7 +465,8 @@
     def stop(self):
         try:
             if self.fwd:
-                self.fwd.kill()
+                pgid = os.getpgid(self.fwd.pid)
+                os.killpg(pgid, signal.SIGKILL)
                 self.fwd.wait()
 
                 # clear stdout buffer before its gone to not miss out on
@@ -456,6 +480,12 @@
             self.fwd = None
         except Exception:
             self.log.exception('Unable to stop kubectl port-forward:')
+        try:
+            if self.socket:
+                self.socket.close()
+            self.socket = None
+        except Exception:
+            self.log.exception('Unable to close port-forward socket:')
 
     def __del__(self):
         try:
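Taken together, the supervision pattern in KubeFwd reduces to the
following standalone sketch (a simplified stand-in that uses
/bin/false instead of kubectl, with a sleep added so the demo loop
does not spin hot):

    import os
    import signal
    import subprocess
    import time

    # The shell restarts the child every time it fails, so no Zuul
    # thread has to watch for crashes.  /bin/false stands in for a
    # kubectl port-forward that keeps dying.
    cmd = 'while ! /bin/false; do sleep 1; done'

    # start_new_session=True gives the shell its own process group,
    # so the whole group (shell plus current child) can be killed at
    # once, the same way KubeFwd.stop() now does.
    proc = subprocess.Popen(cmd, shell=True, start_new_session=True)
    time.sleep(3)   # the loop has restarted the child a few times

    pgid = os.getpgid(proc.pid)
    os.killpg(pgid, signal.SIGKILL)
    proc.wait()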