From 7bebd145ba766eb735d9df33ce5764e000c833ff Mon Sep 17 00:00:00 2001 From: Federico Ressi Date: Fri, 4 Jun 2021 11:41:03 +0200 Subject: [PATCH] Add tool to connect to a remote unix socket Change-Id: I9492d4e9dfaaf32e8db955fbe7c323f5281e7390 --- tobiko/shell/sh/_ssh.py | 5 ++- tobiko/shell/ssh/_client.py | 76 +++++++++++++++++++++++++++++++++++-- 2 files changed, 76 insertions(+), 5 deletions(-) diff --git a/tobiko/shell/sh/_ssh.py b/tobiko/shell/sh/_ssh.py index ad9c8eab4..005ff118c 100644 --- a/tobiko/shell/sh/_ssh.py +++ b/tobiko/shell/sh/_ssh.py @@ -15,6 +15,8 @@ # under the License. from __future__ import absolute_import +import shlex + from oslo_log import log import paramiko from paramiko import channel @@ -28,7 +30,6 @@ from tobiko.shell.sh import _process from tobiko.shell import ssh import typing # noqa - LOG = log.getLogger(__name__) @@ -107,7 +108,7 @@ class SSHShellProcessFixture(_process.ShellProcessFixture): process = client.get_transport().open_session() if environment: variables = " ".join( - f"{name}='{value}'" + f"{name}={shlex.quote(value)}" for name, value in self.environment.items()) command = variables + " " + command process.exec_command(command) diff --git a/tobiko/shell/ssh/_client.py b/tobiko/shell/ssh/_client.py index a7cf67b04..8f08d5576 100644 --- a/tobiko/shell/ssh/_client.py +++ b/tobiko/shell/ssh/_client.py @@ -21,11 +21,14 @@ import getpass import os import socket import subprocess -import typing # noqa +import time +import threading +import typing import netaddr from oslo_log import log import paramiko +from paramiko import common import six import tobiko @@ -353,7 +356,8 @@ class SSHClientFixture(tobiko.SharedFixture): def connect(self, retry_count: typing.Optional[int] = 2, retry_timeout: tobiko.Seconds = None, - retry_interval: tobiko.Seconds = None, **ssh_parameters): + retry_interval: tobiko.Seconds = None, + **ssh_parameters) -> paramiko.SSHClient: """Ensures it is connected to remote SSH server """ with self.use_connect_parameters(**ssh_parameters): @@ -372,7 +376,7 @@ class SSHClientFixture(tobiko.SharedFixture): LOG.debug("SSH connection is safe to use " f"(attempt={attempt})") connected = True - return client + break else: LOG.warning("SSH connection is not safe to use " f"(attempt={attempt})") @@ -383,6 +387,10 @@ class SSHClientFixture(tobiko.SharedFixture): finally: if not connected: self.close() + else: + raise RuntimeError("Broken retry loop") + + return client def close(self): """Ensures it is disconnected from remote SSH server @@ -436,6 +444,68 @@ class SSHClientFixture(tobiko.SharedFixture): def details(self): return f"login='{self.login}'" + def open_unix_socket(self, + socket_path: str, + window_size: int = common.DEFAULT_WINDOW_SIZE, + max_packet_size: int = common.DEFAULT_MAX_PACKET_SIZE, + timeout: tobiko.Seconds = None) \ + -> paramiko.Channel: + # pylint: disable=protected-access + transport: typing.Any = self.connect().get_transport() + if transport is None or not transport.active: + raise paramiko.SSHException("SSH session not active") + timeout = 3600 if timeout is None else timeout + transport.lock.acquire() + try: + window_size = transport._sanitize_window_size(window_size) + max_packet_size = transport._sanitize_packet_size(max_packet_size) + chanid = transport._next_channel() + + # Documented here: + # https://github.com/openssh/openssh-portable/blob/master/PROTOCOL + m = paramiko.Message() + m.add_byte(b'Z') + m.add_string('direct-streamlocal@openssh.com') + m.add_int(chanid) + m.add_int(window_size) + m.add_int(max_packet_size) + m.add_string(socket_path) + # Reserved stuff + m.add_string('') + m.add_int(0) + + sock: typing.Any = paramiko.Channel(chanid) + transport._channels.put(chanid, sock) + transport.channel_events[chanid] = event = threading.Event() + transport.channels_seen[chanid] = True + sock._set_transport(transport) + sock._set_window(window_size, max_packet_size) + finally: + transport.lock.release() + + transport._send_user_message(m) + start_ts = tobiko.time() + while True: + event.wait(0.1) + if not transport.active: + e = transport.get_exception() + if e is None: + e = paramiko.SSHException("Unable to open channel.") + raise e + if event.is_set(): + break + elif start_ts + timeout < time.time(): + raise paramiko.SSHException("Timeout opening channel.") + + sock = transport._channels.get(chanid) + if sock is None: + e = transport.get_exception() + if e is None: + e = paramiko.SSHException("Unable to open channel.") + raise e + + return sock + def __repr__(self): return f"SSHClientFixture <{self.details}>"