Merge "Add tool to connect to a remote unix socket"
This commit is contained in:
commit
d048486959
|
@ -15,6 +15,8 @@
|
|||
# under the License.
|
||||
from __future__ import absolute_import
|
||||
|
||||
import shlex
|
||||
|
||||
from oslo_log import log
|
||||
import paramiko
|
||||
from paramiko import channel
|
||||
|
@ -28,7 +30,6 @@ from tobiko.shell.sh import _process
|
|||
from tobiko.shell import ssh
|
||||
import typing # noqa
|
||||
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
|
@ -107,7 +108,7 @@ class SSHShellProcessFixture(_process.ShellProcessFixture):
|
|||
process = client.get_transport().open_session()
|
||||
if environment:
|
||||
variables = " ".join(
|
||||
f"{name}='{value}'"
|
||||
f"{name}={shlex.quote(value)}"
|
||||
for name, value in self.environment.items())
|
||||
command = variables + " " + command
|
||||
process.exec_command(command)
|
||||
|
|
|
@ -21,11 +21,14 @@ import getpass
|
|||
import os
|
||||
import socket
|
||||
import subprocess
|
||||
import typing # noqa
|
||||
import time
|
||||
import threading
|
||||
import typing
|
||||
|
||||
import netaddr
|
||||
from oslo_log import log
|
||||
import paramiko
|
||||
from paramiko import common
|
||||
import six
|
||||
|
||||
import tobiko
|
||||
|
@ -353,7 +356,8 @@ class SSHClientFixture(tobiko.SharedFixture):
|
|||
|
||||
def connect(self, retry_count: typing.Optional[int] = 2,
|
||||
retry_timeout: tobiko.Seconds = None,
|
||||
retry_interval: tobiko.Seconds = None, **ssh_parameters):
|
||||
retry_interval: tobiko.Seconds = None,
|
||||
**ssh_parameters) -> paramiko.SSHClient:
|
||||
"""Ensures it is connected to remote SSH server
|
||||
"""
|
||||
with self.use_connect_parameters(**ssh_parameters):
|
||||
|
@ -372,7 +376,7 @@ class SSHClientFixture(tobiko.SharedFixture):
|
|||
LOG.debug("SSH connection is safe to use "
|
||||
f"(attempt={attempt})")
|
||||
connected = True
|
||||
return client
|
||||
break
|
||||
else:
|
||||
LOG.warning("SSH connection is not safe to use "
|
||||
f"(attempt={attempt})")
|
||||
|
@ -383,6 +387,10 @@ class SSHClientFixture(tobiko.SharedFixture):
|
|||
finally:
|
||||
if not connected:
|
||||
self.close()
|
||||
else:
|
||||
raise RuntimeError("Broken retry loop")
|
||||
|
||||
return client
|
||||
|
||||
def close(self):
|
||||
"""Ensures it is disconnected from remote SSH server
|
||||
|
@ -436,6 +444,68 @@ class SSHClientFixture(tobiko.SharedFixture):
|
|||
def details(self):
|
||||
return f"login='{self.login}'"
|
||||
|
||||
def open_unix_socket(self,
|
||||
socket_path: str,
|
||||
window_size: int = common.DEFAULT_WINDOW_SIZE,
|
||||
max_packet_size: int = common.DEFAULT_MAX_PACKET_SIZE,
|
||||
timeout: tobiko.Seconds = None) \
|
||||
-> paramiko.Channel:
|
||||
# pylint: disable=protected-access
|
||||
transport: typing.Any = self.connect().get_transport()
|
||||
if transport is None or not transport.active:
|
||||
raise paramiko.SSHException("SSH session not active")
|
||||
timeout = 3600 if timeout is None else timeout
|
||||
transport.lock.acquire()
|
||||
try:
|
||||
window_size = transport._sanitize_window_size(window_size)
|
||||
max_packet_size = transport._sanitize_packet_size(max_packet_size)
|
||||
chanid = transport._next_channel()
|
||||
|
||||
# Documented here:
|
||||
# https://github.com/openssh/openssh-portable/blob/master/PROTOCOL
|
||||
m = paramiko.Message()
|
||||
m.add_byte(b'Z')
|
||||
m.add_string('direct-streamlocal@openssh.com')
|
||||
m.add_int(chanid)
|
||||
m.add_int(window_size)
|
||||
m.add_int(max_packet_size)
|
||||
m.add_string(socket_path)
|
||||
# Reserved stuff
|
||||
m.add_string('')
|
||||
m.add_int(0)
|
||||
|
||||
sock: typing.Any = paramiko.Channel(chanid)
|
||||
transport._channels.put(chanid, sock)
|
||||
transport.channel_events[chanid] = event = threading.Event()
|
||||
transport.channels_seen[chanid] = True
|
||||
sock._set_transport(transport)
|
||||
sock._set_window(window_size, max_packet_size)
|
||||
finally:
|
||||
transport.lock.release()
|
||||
|
||||
transport._send_user_message(m)
|
||||
start_ts = tobiko.time()
|
||||
while True:
|
||||
event.wait(0.1)
|
||||
if not transport.active:
|
||||
e = transport.get_exception()
|
||||
if e is None:
|
||||
e = paramiko.SSHException("Unable to open channel.")
|
||||
raise e
|
||||
if event.is_set():
|
||||
break
|
||||
elif start_ts + timeout < time.time():
|
||||
raise paramiko.SSHException("Timeout opening channel.")
|
||||
|
||||
sock = transport._channels.get(chanid)
|
||||
if sock is None:
|
||||
e = transport.get_exception()
|
||||
if e is None:
|
||||
e = paramiko.SSHException("Unable to open channel.")
|
||||
raise e
|
||||
|
||||
return sock
|
||||
|
||||
def __repr__(self):
|
||||
return f"SSHClientFixture <{self.details}>"
|
||||
|
||||
|
|
Loading…
Reference in New Issue