diff --git a/tests/base.py b/tests/base.py index c463e33845..9092de474b 100644 --- a/tests/base.py +++ b/tests/base.py @@ -1331,6 +1331,21 @@ class FakeNodepool(object): if 'fedora-pod' in node_type: data['connection_type'] = 'kubectl' data['connection_port']['pod'] = 'fedora-abcdefg' + if 'remote-pod' in node_type: + data['connection_type'] = 'kubectl' + data['connection_port'] = { + 'name': os.environ['ZUUL_POD_REMOTE_NAME'], + 'namespace': os.environ.get( + 'ZUUL_POD_REMOTE_NAMESPACE', 'default'), + 'host': os.environ['ZUUL_POD_REMOTE_SERVER'], + 'skiptls': False, + 'token': os.environ['ZUUL_POD_REMOTE_TOKEN'], + 'ca_crt': os.environ['ZUUL_POD_REMOTE_CA'], + 'user': os.environ['ZUUL_POD_REMOTE_USER'], + 'pod': os.environ['ZUUL_POD_REMOTE_NAME'], + } + data['interface_ip'] = data['connection_port']['pod'] + data['public_ipv4'] = None data['tenant_name'] = request['tenant_name'] data['requestor'] = request['requestor'] diff --git a/tests/fixtures/config/remote-zuul-stream/git/org_project/playbooks/pod.yaml b/tests/fixtures/config/remote-zuul-stream/git/org_project/playbooks/pod.yaml new file mode 100644 index 0000000000..1df8a87fa0 --- /dev/null +++ b/tests/fixtures/config/remote-zuul-stream/git/org_project/playbooks/pod.yaml @@ -0,0 +1,21 @@ +- hosts: all + tasks: + - zuul_console: + state: absent + failed_when: false + + # This task should cause the port forwarder to crash since it + # attempts to connect (even though the task is skipped). + - name: "first shell" + shell: "echo first shell" + when: false + + - zuul_console: + + # Subsequent tasks should work, if the forwarder is up. + - name: "second shell" + shell: "echo second shell" + when: false + + - name: "third shell" + shell: "echo third shell" diff --git a/tests/fixtures/config/remote-zuul-stream/git/org_project/playbooks/vm.yaml b/tests/fixtures/config/remote-zuul-stream/git/org_project/playbooks/vm.yaml new file mode 100644 index 0000000000..dcffeb5cf0 --- /dev/null +++ b/tests/fixtures/config/remote-zuul-stream/git/org_project/playbooks/vm.yaml @@ -0,0 +1,20 @@ +- hosts: all + tasks: + - zuul_console: + state: absent + port: "{{ test_console_port }}" + failed_when: false + + - name: "first shell" + shell: "echo first shell" + when: false + + - zuul_console: + port: "{{ test_console_port }}" + + - name: "second shell" + shell: "echo second shell" + when: false + + - name: "third shell" + shell: "echo third shell" diff --git a/tests/fixtures/fake_kubectl.sh b/tests/fixtures/fake_kubectl.sh index 7395a582d9..86e26e5241 100755 --- a/tests/fixtures/fake_kubectl.sh +++ b/tests/fixtures/fake_kubectl.sh @@ -1,6 +1,15 @@ -#!/bin/sh +#!/bin/bash -echo "Forwarding from 127.0.0.1:1234 -> 19885" +# Script arguments look like: +# --kubeconfig=/tmp/tmppm0yyqvv/zuul-test/builds/c21fc1eb7e2c469cb4997d688252dc3c/work/.kube/config --context=zuul-ci-abcdefg:zuul-worker/ -n zuul-ci-abcdefg port-forward pod/fedora-abcdefg 37303:19885 + +# Get the last argument to the script +arg=${@:$#} + +# Split on the colon +ports=(${arg//:/ }) + +echo "Forwarding from 127.0.0.1:${ports[0]} -> ${ports[1]}" while true; do sleep 5 diff --git a/tests/remote/test_remote_zuul_stream.py b/tests/remote/test_remote_zuul_stream.py index bd9f4f6d27..89760b33ac 100644 --- a/tests/remote/test_remote_zuul_stream.py +++ b/tests/remote/test_remote_zuul_stream.py @@ -18,6 +18,8 @@ import logging import os import re import textwrap +import yaml +from unittest import skip from datetime import datetime, timedelta from tests.base import AnsibleZuulTestCase @@ -45,42 +47,43 @@ class FunctionalZuulStreamMixIn: ansible_remote = os.environ.get('ZUUL_REMOTE_IPV4') self.assertIsNotNone(ansible_remote) - def _run_job(self, job_name, create=True, split='false'): + def _run_job(self, job_name, create=True, nodes=None, split=False): # Keep the jobdir around so we can inspect contents if an # assert fails. It will be cleaned up anyway as it is contained # in a tmp dir which gets cleaned up after the test. self.executor_server.keep_jobdir = True + if nodes is None: + nodes = [ + {'name': 'compute1', + 'label': 'whatever'}, + {'name': 'controller', + 'label': 'whatever'}, + ] + # Output extra ansible info so we might see errors. self.executor_server.verbose = True if create: - conf = textwrap.dedent( - """ - - job: - name: {job_name} - run: playbooks/{job_name}.yaml - ansible-version: {version} - ansible-split-streams: {split} - vars: - test_console_port: {console_port} - roles: - - zuul: org/common-config - nodeset: - nodes: - - name: compute1 - label: whatever - - name: controller - label: whatever - - - project: - check: - jobs: - - {job_name} - """.format( - job_name=job_name, - version=self.ansible_version, - split=split, - console_port=self.log_console_port)) + conf = [ + { + 'job': { + 'name': job_name, + 'run': f'playbooks/{job_name}.yaml', + 'ansible-version': self.ansible_version, + 'ansible-split-streams': split, + 'vars': { + 'test_console_port': self.log_console_port, + }, + 'roles': [ + {'zuul': 'org/common-config'}, + ], + 'nodeset': {'nodes': nodes}, + } + }, { + 'project': {'check': {'jobs': [job_name]}} + } + ] + conf = yaml.safe_dump(conf) else: conf = textwrap.dedent( """ @@ -262,7 +265,7 @@ class FunctionalZuulStreamMixIn: r"""compute1 \| ok: \{'string': '\d.""", text) def test_command_split_streams(self): - job = self._run_job('command', split='true') + job = self._run_job('command', split=True) with self.jobLog(job): build = self.history[-1] self.assertEqual(build.result, 'SUCCESS') @@ -409,6 +412,39 @@ class FunctionalZuulStreamMixIn: r'handle its own exit"' self.assertLogLine(regex, text) + # These twe tests are helpful to have for local debugging, but we + # don't run them in the gate (yet) for two reasons: + # 1) The VM test has no useful assertions since it was created as + # a comparison for the pod test + # 2) The pod test can not be run in the gate. + @skip("No useful assertions") + def test_vm(self): + # This test is not particularly useful, it is mostly a + # benchmark to compare with the pod test below. + nodes = [{'name': 'controller', 'label': 'whatever'}] + job = self._run_job('vm', nodes=nodes) + with self.jobLog(job): + build = self.history[-1] + path = os.path.join(self.jobdir_root, build.uuid, + 'work', 'logs', 'job-output.txt') + with open(path) as f: + self.log.debug(f.read()) + + @skip("Pod unavailable in gate") + def test_pod(self): + # This test could be used to verify the kubectl restart + # functionality if we had a gate job with access to a pod. + # TODO: add microk8s or kind to the stream test and enable + # this + nodes = [{'name': 'controller', 'label': 'remote-pod'}] + job = self._run_job('pod', nodes=nodes) + with self.jobLog(job): + build = self.history[-1] + path = os.path.join(self.jobdir_root, build.uuid, + 'work', 'logs', 'job-output.txt') + with open(path) as f: + self.log.debug(f.read()) + class TestZuulStream8(AnsibleZuulTestCase, FunctionalZuulStreamMixIn): ansible_version = '8' diff --git a/zuul/executor/server.py b/zuul/executor/server.py index b5a54fcc3b..96e3d0a703 100644 --- a/zuul/executor/server.py +++ b/zuul/executor/server.py @@ -409,31 +409,54 @@ class KubeFwd(object): self.context = context self.namespace = namespace self.pod = pod + self.socket = None + + def _getSocket(self): + # Reserve a port so that we can restart the forwarder if it + # exits, which it will if there is any connection problem at + # all. + self.socket = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) + self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + self.socket.bind(('::', 0)) + self.port = self.socket.getsockname()[1] def start(self): if self.fwd: return + + if self.socket is None: + self._getSocket() + + cmd = [ + 'while', '!', + self.kubectl_command, + shlex.quote('--kubeconfig=%s' % self.kubeconfig), + shlex.quote('--context=%s' % self.context), + '-n', + shlex.quote(self.namespace), + 'port-forward', + shlex.quote('pod/%s' % self.pod), + '%s:19885' % self.port, + ';', 'do', ':;', 'done', + ] + cmd = ' '.join(cmd) + with open('/dev/null', 'r+') as devnull: - fwd = subprocess.Popen( - [self.kubectl_command, '--kubeconfig=%s' % self.kubeconfig, - '--context=%s' % self.context, - '-n', self.namespace, - 'port-forward', - 'pod/%s' % self.pod, ':19885'], - close_fds=True, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, - stdin=devnull) + fwd = subprocess.Popen(cmd, + shell=True, + close_fds=True, + start_new_session=True, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + stdin=devnull) line = fwd.stdout.readline().decode('utf8') m = re.match(r'^Forwarding from 127.0.0.1:(\d+) -> 19885', line) + port = None if m: - self.port = m.group(1) - else: - try: - self.log.error("Could not find the forwarded port: %s", line) - fwd.kill() - except Exception: - pass + port = m.group(1) + if port != str(self.port): + self.log.error("Could not find the forwarded port: %s", line) + self.stop() raise Exception("Unable to start kubectl port forward") self.fwd = fwd self.log.info('Started Kubectl port forward on port {}'.format( @@ -442,7 +465,8 @@ class KubeFwd(object): def stop(self): try: if self.fwd: - self.fwd.kill() + pgid = os.getpgid(self.fwd.pid) + os.killpg(pgid, signal.SIGKILL) self.fwd.wait() # clear stdout buffer before its gone to not miss out on @@ -456,6 +480,12 @@ class KubeFwd(object): self.fwd = None except Exception: self.log.exception('Unable to stop kubectl port-forward:') + try: + if self.socket: + self.socket.close() + self.socket = None + except Exception: + self.log.exception('Unable to close port-forward socket:') def __del__(self): try: