diff --git a/Dockerfile b/Dockerfile index e1f3051843..a1d4636340 100644 --- a/Dockerfile +++ b/Dockerfile @@ -44,7 +44,7 @@ FROM opendevorg/python-base as zuul COPY --from=builder /output/ /output RUN echo "deb http://ftp.debian.org/debian stretch-backports main" >> /etc/apt/sources.list \ && apt-get update \ - && apt-get install -t stretch-backports -y bubblewrap \ + && apt-get install -t stretch-backports -y bubblewrap socat \ && apt-get clean \ && rm -rf /var/lib/apt/lists/* RUN /output/install-from-bindep \ diff --git a/releasenotes/notes/kubectl-port-forward-93de41c7c0b107dc.yaml b/releasenotes/notes/kubectl-port-forward-93de41c7c0b107dc.yaml new file mode 100644 index 0000000000..f59ac3c9de --- /dev/null +++ b/releasenotes/notes/kubectl-port-forward-93de41c7c0b107dc.yaml @@ -0,0 +1,11 @@ +--- +upgrade: + - | + Kubectl and socat must now be installed on Zuul executors if using + Kubernetes or OpenShift `pod` resources from Nodepool. Additionally, + Nodepool version 3.12.0 or later is required. +fixes: + - | + Previously, no output from shell or command tasks on pods was placed + in the job output; that has been corrected and streaming output is + now available. diff --git a/tests/fixtures/fake_kubectl.sh b/tests/fixtures/fake_kubectl.sh new file mode 100755 index 0000000000..7395a582d9 --- /dev/null +++ b/tests/fixtures/fake_kubectl.sh @@ -0,0 +1,7 @@ +#!/bin/sh + +echo "Forwarding from 127.0.0.1:1234 -> 19885" + +while true; do + sleep 5 +done diff --git a/tests/unit/test_v3.py b/tests/unit/test_v3.py index b5e62e2391..8cffbc815e 100644 --- a/tests/unit/test_v3.py +++ b/tests/unit/test_v3.py @@ -5507,6 +5507,10 @@ class TestContainerJobs(AnsibleZuulTestCase): tenant_config_file = "config/container-build-resources/main.yaml" def test_container_jobs(self): + self.patch(zuul.executor.server.KubeFwd, + 'kubectl_command', + os.path.join(FIXTURE_DIR, 'fake_kubectl.sh')) + A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A') self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1)) self.waitUntilSettled() diff --git a/zuul/ansible/base/callback/zuul_stream.py b/zuul/ansible/base/callback/zuul_stream.py index 5b2206e144..ca34dc2abc 100644 --- a/zuul/ansible/base/callback/zuul_stream.py +++ b/zuul/ansible/base/callback/zuul_stream.py @@ -129,12 +129,12 @@ class CallbackModule(default.CallbackModule): else: self._display.display(msg) - def _read_log(self, host, ip, log_id, task_name, hosts): + def _read_log(self, host, ip, port, log_id, task_name, hosts): self._log("[%s] Starting to log %s for task %s" % (host, log_id, task_name), job=False, executor=True) while True: try: - s = socket.create_connection((ip, LOG_STREAM_PORT), 5) + s = socket.create_connection((ip, port), 5) # Disable the socket timeout after we have successfully # connected to accomodate the fact that jobs may not be writing # logs continously. Without this we can easily trip the 5 @@ -144,12 +144,12 @@ class CallbackModule(default.CallbackModule): self._log( "Timeout exception waiting for the logger. " "Please check connectivity to [%s:%s]" - % (ip, LOG_STREAM_PORT), executor=True) + % (ip, port), executor=True) self._log_streamline( "localhost", "Timeout exception waiting for the logger. " "Please check connectivity to [%s:%s]" - % (ip, LOG_STREAM_PORT)) + % (ip, port)) return except Exception: self._log("[%s] Waiting on logger" % host, @@ -254,6 +254,7 @@ class CallbackModule(default.CallbackModule): hosts = self._get_task_hosts(task) for host, inventory_hostname in hosts: + port = LOG_STREAM_PORT if host in ('localhost', '127.0.0.1'): # Don't try to stream from localhost continue @@ -267,14 +268,21 @@ class CallbackModule(default.CallbackModule): # Don't try to stream from loops continue if play_vars[host].get('ansible_connection') in ('kubectl', ): - # Don't try to stream from kubectl connection - continue + # Stream from the forwarded port on kubectl conns + port = play_vars[host]['zuul']['resources'][ + inventory_hostname].get('stream_port') + if port is None: + self._log("[Zuul] Kubectl and socat must be installed " + "on the Zuul executor for streaming output " + "from pods") + continue + ip = '127.0.0.1' log_id = "%s-%s" % ( task._uuid, paths._sanitize_filename(inventory_hostname)) streamer = threading.Thread( target=self._read_log, args=( - host, ip, log_id, task_name, hosts)) + host, ip, port, log_id, task_name, hosts)) streamer.daemon = True streamer.start() self._streamers.append(streamer) diff --git a/zuul/executor/server.py b/zuul/executor/server.py index e8f5717f60..d0ce36a311 100644 --- a/zuul/executor/server.py +++ b/zuul/executor/server.py @@ -29,6 +29,7 @@ import threading import time import traceback from concurrent.futures.process import ProcessPoolExecutor, BrokenProcessPool +import re import git from urllib.parse import urlsplit @@ -311,6 +312,70 @@ class SshAgent(object): return result +class KubeFwd(object): + kubectl_command = 'kubectl' + + def __init__(self, zuul_event_id, build, kubeconfig, context, + namespace, pod): + self.port = None + self.fwd = None + self.log = get_annotated_logger( + logging.getLogger("zuul.ExecutorServer"), + zuul_event_id, build=build) + self.kubeconfig = kubeconfig + self.context = context + self.namespace = namespace + self.pod = pod + + def start(self): + if self.fwd: + return + with open('/dev/null', 'r+') as devnull: + fwd = subprocess.Popen( + [self.kubectl_command, '--kubeconfig=%s' % self.kubeconfig, + '--context=%s' % self.context, + '-n', self.namespace, + 'port-forward', + '--address', '127.0.0.1', + 'pod/%s' % self.pod, ':19885'], + close_fds=True, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + stdin=devnull) + line = fwd.stdout.readline().decode('utf8') + m = re.match(r'^Forwarding from 127.0.0.1:(\d+) -> 19885', line) + if m: + self.port = m.group(1) + else: + try: + fwd.kill() + except Exception: + pass + raise Exception("Unable to start kubectl port forward") + self.fwd = fwd + self.log.info('Started Kubectl port forward on port {}'.format( + self.port)) + + def stop(self): + try: + if self.fwd: + self.fwd.kill() + self.fwd.wait() + self.fwd = None + except Exception: + self.log.exception('Unable to stop kubectl port-forward:') + + def __del__(self): + try: + self.stop() + except Exception: + self.log.exception('Exception in KubeFwd destructor') + try: + super().__del__(self) + except AttributeError: + pass + + class JobDirPlaybook(object): def __init__(self, root): self.root = root @@ -369,6 +434,8 @@ class JobDir(object): # work (mounted in bwrap read-write) # .ssh # known_hosts + # .kube + # config # src # # @@ -403,6 +470,9 @@ class JobDir(object): os.makedirs(self.untrusted_root) ssh_dir = os.path.join(self.work_root, '.ssh') os.mkdir(ssh_dir, 0o700) + kube_dir = os.path.join(self.work_root, ".kube") + os.makedirs(kube_dir) + self.kubeconfig = os.path.join(kube_dir, "config") # Create ansible cache directory self.ansible_cache_root = os.path.join(self.root, '.ansible') self.fact_cache = os.path.join(self.ansible_cache_root, 'fact-cache') @@ -758,7 +828,7 @@ class AnsibleJob(object): 'winrm_read_timeout_sec') self.ssh_agent = SshAgent(zuul_event_id=self.zuul_event_id, build=self.job.unique) - + self.port_forwards = [] self.executor_variables_file = None self.cpu_times = {'user': 0, 'system': 0, @@ -873,6 +943,11 @@ class AnsibleJob(object): self.ssh_agent.stop() except Exception: self.log.exception("Error stopping SSH agent:") + for fwd in self.port_forwards: + try: + fwd.stop() + except Exception: + self.log.exception("Error stopping port forward:") try: self.executor_server.finishJob(self.job.unique) except Exception: @@ -1780,12 +1855,11 @@ class AnsibleJob(object): self.log.debug("Adding role path %s", role_path) jobdir_playbook.roles_path.append(role_path) - def prepareKubeConfig(self, data): - kube_cfg_path = os.path.join(self.jobdir.work_root, ".kube", "config") + def prepareKubeConfig(self, jobdir, data): + kube_cfg_path = jobdir.kubeconfig if os.path.exists(kube_cfg_path): kube_cfg = yaml.safe_load(open(kube_cfg_path)) else: - os.makedirs(os.path.dirname(kube_cfg_path), exist_ok=True) kube_cfg = { 'apiVersion': 'v1', 'kind': 'Config', @@ -1854,7 +1928,7 @@ class AnsibleJob(object): # TODO: decrypt resource data using scheduler key data = node['connection_port'] # Setup kube/config file - self.prepareKubeConfig(data) + self.prepareKubeConfig(self.jobdir, data) # Convert connection_port in kubectl connection parameters node['connection_port'] = None node['kubectl_namespace'] = data['namespace'] @@ -1871,6 +1945,22 @@ class AnsibleJob(object): # Add the real pod name to the resources_var all_vars['zuul']['resources'][ node['name'][0]]['pod'] = data['pod'] + fwd = KubeFwd(zuul_event_id=self.zuul_event_id, + build=self.job.unique, + kubeconfig=self.jobdir.kubeconfig, + context=data['context_name'], + namespace=data['namespace'], + pod=data['pod']) + try: + fwd.start() + self.port_forwards.append(fwd) + all_vars['zuul']['resources'][ + node['name'][0]]['stream_port'] = fwd.port + except Exception: + self.log.exception("Unable to start port forward:") + self.log.error("Kubectl and socat are required for " + "streaming logs") + # Remove resource node from nodes list for node in resources_nodes: args['nodes'].remove(node)