Re-connect to SSH server when connection is broken

Change-Id: Ifac16b1cc7fbc10ce01ee7f5346ddfe86de9fee7
This commit is contained in:
Federico Ressi 2020-08-05 14:43:37 +02:00
parent afa575291d
commit ab3debf791
5 changed files with 156 additions and 87 deletions

View File

@ -15,8 +15,6 @@
# under the License. # under the License.
from __future__ import absolute_import from __future__ import absolute_import
import time
from oslo_log import log from oslo_log import log
import paramiko import paramiko
@ -76,7 +74,8 @@ class SSHShellProcessParameters(_process.ShellProcessParameters):
class SSHShellProcessFixture(_process.ShellProcessFixture): class SSHShellProcessFixture(_process.ShellProcessFixture):
retry_create_process_count = 3 retry_create_process_count = 3
retry_create_process_intervall = 1. retry_create_process_intervall = 5.
retry_create_process_timeout = 120.
def init_parameters(self, **kwargs): def init_parameters(self, **kwargs):
return SSHShellProcessParameters(**kwargs) return SSHShellProcessParameters(**kwargs)
@ -91,48 +90,41 @@ class SSHShellProcessFixture(_process.ShellProcessFixture):
tobiko.check_valid_type(parameters, SSHShellProcessParameters) tobiko.check_valid_type(parameters, SSHShellProcessParameters)
environment = parameters.environment environment = parameters.environment
retry_count = self.retry_create_process_count for attempt in tobiko.retry(
for retry_number in range(1, retry_count + 1): timeout=self.parameters.timeout,
timeout = self.timeout and float(self.timeout) default_count=self.retry_create_process_count,
LOG.debug("Executing remote command: %r (login=%r, timeout=%r, " default_interval=self.retry_create_process_intervall,
"environment=%r)...", default_timeout=self.retry_create_process_timeout):
command, ssh_client.login, timeout, environment or {})
timeout = attempt.time_left
details = (f"command='{command}', "
f"login={ssh_client.login}, "
f"timeout={timeout}, "
f"attempt={attempt}, "
f"environment={environment}")
LOG.debug(f"Create remote process... ({details})")
try: try:
return self._try_create_process(command=command, client = ssh_client.connect(timeout=timeout)
environment=environment, process = client.get_transport().open_session(timeout=timeout)
ssh_client=ssh_client, if environment:
timeout=timeout) process.update_environment(environment)
except paramiko.SSHException as ex: process.exec_command(command)
try: LOG.debug(f"Remote process created. ({details})")
# Before doing anything else cleanup SSH connection return process
tobiko.cleanup_fixture(ssh_client) except Exception:
except Exception: # Before doing anything else cleanup SSH connection
LOG.exception('Failed closing SSH connection') ssh_client.close()
if "timeout" in str(ex).lower(): LOG.debug(f"Error creating remote process. ({details})",
LOG.debug('Timed out executing command %r (timeout=%s)', exc_info=1)
command, timeout, exc_info=1) try:
raise _exception.ShellTimeoutExpired(command=command, attempt.check_limits()
stdin=None, except tobiko.RetryTimeLimitError:
stdout=None, LOG.debug(f"Timed out creating remote process. ({details})")
stderr=None, raise _exception.ShellTimeoutExpired(command=command,
timeout=timeout) stdin=None,
stdout=None,
LOG.debug('Error creating SSH process (attempt %d of %d)', stderr=None,
retry_number, retry_count, exc_info=1) timeout=timeout)
if retry_number >= retry_count:
# Last attempt has failed!
raise
else:
# Be patient, this could help things getting better
time.sleep(self.retry_create_process_intervall)
def _try_create_process(self, command, environment, ssh_client, timeout):
client = ssh_client.connect()
process = client.get_transport().open_session(timeout=timeout)
if environment:
process.update_environment(environment)
process.exec_command(command)
return process
def setup_stdin(self): def setup_stdin(self):
self.stdin = _io.ShellStdin( self.stdin = _io.ShellStdin(

View File

@ -16,10 +16,10 @@
from __future__ import absolute_import from __future__ import absolute_import
import collections import collections
import contextlib
import getpass import getpass
import os import os
import socket import socket
import time
import subprocess import subprocess
import netaddr import netaddr
@ -143,6 +143,9 @@ SSH_CONNECT_PARAMETERS = {
#: Minimum amount of time to wait between two connection attempts #: Minimum amount of time to wait between two connection attempts
'connection_interval': positive_float, 'connection_interval': positive_float,
#: Amount of time before timing out connection attempt
'connection_timeout': positive_float,
#: Command to be executed to open proxy sock #: Command to be executed to open proxy sock
'proxy_command': str, 'proxy_command': str,
} }
@ -300,31 +303,80 @@ class SSHClientFixture(tobiko.SharedFixture):
client = self.client client = self.client
self.client = None self.client = None
if client: if client:
LOG.debug(f"Closing SSH connection... ({self.details})")
try: try:
client.close() client.close()
except Exception: except Exception:
LOG.exception('Error closing client (%r)', self) LOG.exception("Error closing SSH connection. "
f"({self.details})")
else:
LOG.debug(f"SSH connection closed. ({self.details})")
def cleanup_proxy_sock(self): def cleanup_proxy_sock(self):
proxy_sock = self.proxy_sock proxy_sock = self.proxy_sock
self.proxy_sock = None self.proxy_sock = None
if proxy_sock: if proxy_sock:
LOG.debug(f"Closing SSH proxy sock... ({self.details})")
try: try:
proxy_sock.close() proxy_sock.close()
except Exception: except Exception:
LOG.exception('Error closing proxy socket (%r)', self) LOG.exception("Error closing proxy socket. "
f"({self.details})")
else:
LOG.debug(f"SSH proxy sock closed. ({self.details})")
def connect(self): @contextlib.contextmanager
client = tobiko.setup_fixture(self).client def use_connect_parameters(self, **parameters):
if not client: if parameters:
# For some unknown reason at this point client could be None: restore_parameters = self._connect_parameters
# try reconnecting to it self._connect_parameters = dict(self._connect_parameters,
LOG.error('SSH Paramiko client found None, reconnecting...') **parameters)
client = tobiko.reset_fixture(self).client else:
return client restore_parameters = None
try:
yield
finally:
if restore_parameters is not None:
self._connect_parameters = restore_parameters
def connect(self, timeout: tobiko.Seconds = None, **parameters):
    """Ensure this fixture is connected to the remote SSH server.

    The fixture is set up and the resulting client is probed with
    check_ssh_connection(); when the probe fails or an error is raised the
    fixture is closed and another attempt is made.

    :param timeout: forwarded as ``timeout`` to tobiko.retry() to bound the
        attempts.
    :param parameters: connect parameters that override the fixture ones
        only for the duration of this call (see use_connect_parameters()).
    :returns: the client held by the fixture once the probe has succeeded.
    :raises: whatever attempt.check_limits() raises once retry limits are
        reached.
    """
    # NOTE(review): if the retry iterator ends without raising, this method
    # falls through and returns None -- confirm against tobiko.retry.
    with self.use_connect_parameters(**parameters):
        # This retry is mostly intended to ensure connection is
        # reestablished in case it is lost
        for attempt in tobiko.retry(timeout=timeout,
                                    default_count=2,
                                    default_interval=5.):
            LOG.debug(f"Ensuring SSH connection (attempt={attempt})")
            connected = False
            try:
                client = tobiko.setup_fixture(self).client
                # For any reason at this point client could
                # be None: force fixture cleanup
                if check_ssh_connection(client):
                    LOG.debug("SSH connection is safe to use "
                              f"(attempt={attempt})")
                    connected = True
                    return client
                else:
                    LOG.warning("SSH connection is not safe to use "
                                f"(attempt={attempt})")
            except Exception:
                # Raises when no further attempt is allowed; otherwise the
                # failure is logged and the loop goes on
                attempt.check_limits()
                # Both literals need the f prefix: adjacent string literals
                # are formatted independently of each other
                LOG.exception(f"Failed connecting to '{self.login}' "
                              f"(attempt={attempt})")
            finally:
                # Drop a connection that was not handed to the caller so
                # that the next attempt starts from a clean fixture
                if not connected:
                    self.close()
def close(self): def close(self):
tobiko.cleanup_fixture(self) """Ensures it is disconnected from remote SSH server
"""
try:
tobiko.cleanup_fixture(self)
except Exception:
LOG.exception(f"Failed closing SSH connection to '{self.login}'")
def get_ssh_command(self, host=None, username=None, port=None, def get_ssh_command(self, host=None, username=None, port=None,
command=None, config_files=None, host_config=None, command=None, config_files=None, host_config=None,
@ -366,6 +418,13 @@ class SSHClientFixture(tobiko.SharedFixture):
parameters = self.setup_connect_parameters() parameters = self.setup_connect_parameters()
return parameters['hostname'] return parameters['hostname']
@property
def details(self):
    """Short ``login='...'`` text used to tag this fixture in log messages."""
    login = self.login
    return f"login='{login}'"
def __repr__(self):
    """Return the class name followed by the fixture details in brackets."""
    details = self.details
    return f"SSHClientFixture <{details}>"
UNDEFINED_CLIENT = 'UNDEFINED_CLIENT' UNDEFINED_CLIENT = 'UNDEFINED_CLIENT'
@ -424,23 +483,28 @@ def ssh_client(host, port=None, username=None, proxy_jump=None,
def ssh_connect(hostname, username=None, port=None, connection_interval=None, def ssh_connect(hostname, username=None, port=None, connection_interval=None,
connection_attempts=None, proxy_command=None, connection_attempts=None, connection_timeout=None,
proxy_client=None, **parameters): proxy_command=None, proxy_client=None, **parameters):
client = paramiko.SSHClient() client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.WarningPolicy()) client.set_missing_host_key_policy(paramiko.WarningPolicy())
login = _command.ssh_login(hostname=hostname, username=username, port=port) login = _command.ssh_login(hostname=hostname, username=username, port=port)
attempts = connection_attempts or 1
interval = connection_interval or 5. for attempt in tobiko.retry(count=connection_attempts,
for attempt in range(1, attempts + 1): timeout=connection_timeout,
LOG.debug("Logging in to %r (%r)... attempt %d out of %d", interval=connection_interval,
login, parameters, attempt, attempts) default_count=60,
start_time = time.time() default_timeout=300.,
proxy_sock = ssh_proxy_sock(hostname=hostname, default_interval=5.):
port=port, LOG.debug(f"Logging in to '{login}'...\n"
command=proxy_command, f" - parameters: {parameters}\n"
client=proxy_client) f" - attempt: {attempt.details}\n")
try: try:
proxy_sock = ssh_proxy_sock(hostname=hostname,
port=port,
command=proxy_command,
client=proxy_client,
timeout=attempt.time_left)
client.connect(hostname=hostname, client.connect(hostname=hostname,
username=username, username=username,
port=port, port=port,
@ -448,21 +512,16 @@ def ssh_connect(hostname, username=None, port=None, connection_interval=None,
**parameters) **parameters)
except (EOFError, socket.error, socket.timeout, except (EOFError, socket.error, socket.timeout,
paramiko.SSHException) as ex: paramiko.SSHException) as ex:
if attempt >= attempts: attempt.check_limits()
raise LOG.debug(f"Error logging in to '{login}': {ex}")
LOG.debug("Error logging in to %r: %s", login, ex)
sleep_time = start_time + interval - time.time()
if sleep_time > 0.:
time.sleep(sleep_time)
else: else:
LOG.debug("Successfully logged in to %s", login) LOG.debug(f"Successfully logged in to '{login}'")
return client, proxy_sock return client, proxy_sock
def ssh_proxy_sock(hostname=None, port=None, command=None, client=None, def ssh_proxy_sock(hostname=None, port=None, command=None, client=None,
source_address=None): source_address=None, timeout=None):
if not command: if not command:
if client: if client:
# I need a command to execute with proxy client # I need a command to execute with proxy client
@ -482,7 +541,7 @@ def ssh_proxy_sock(hostname=None, port=None, command=None, client=None,
if client: if client:
if isinstance(client, SSHClientFixture): if isinstance(client, SSHClientFixture):
# Connect to proxy server # Connect to proxy server
client = client.connect() client = client.connect(retry=tobiko.retry(timeout=timeout))
elif not isinstance(client, paramiko.SSHClient): elif not isinstance(client, paramiko.SSHClient):
message = "Object {!r} is not an SSHClient".format(client) message = "Object {!r} is not an SSHClient".format(client)
raise TypeError(message) raise TypeError(message)
@ -490,7 +549,7 @@ def ssh_proxy_sock(hostname=None, port=None, command=None, client=None,
# Open proxy channel # Open proxy channel
LOG.debug("Execute proxy command with proxy client %r: %r", LOG.debug("Execute proxy command with proxy client %r: %r",
client, command) client, command)
sock = client.get_transport().open_session() sock = client.get_transport().open_session(timeout=timeout)
sock.exec_command(command) sock.exec_command(command)
else: else:
LOG.debug("Execute proxy command on local host: %r", command) LOG.debug("Execute proxy command on local host: %r", command)
@ -505,3 +564,13 @@ def ssh_proxy_client(manager=None, host=None, host_config=None,
return manager.get_proxy_client(host=host, return manager.get_proxy_client(host=host,
host_config=host_config, host_config=host_config,
config_files=config_files) config_files=config_files)
def check_ssh_connection(client):
    """Tell whether an SSH client can be used, probing its transport.

    :param client: object exposing ``get_transport()``, or a falsy value
        such as ``None`` when no client is available.
    :returns: ``True`` when the transport reports being authenticated and a
        probe packet has been sent through it; ``False`` when there is no
        client, no transport, or the transport is not authenticated.
    """
    if not client:
        return False

    transport = client.get_transport()
    # A client that is closed (or was never connected) has no transport:
    # report it as unusable instead of failing with AttributeError
    if transport is None or not transport.is_authenticated():
        return False

    # Send a keep-alive message
    transport.send_ignore()
    return True

View File

@ -161,8 +161,11 @@ class SSHHostConfig(collections.namedtuple('SSHHostConfig', ['host',
@property @property
def connection_interval(self): def connection_interval(self):
return (self.host_config.get('connetcttimeout') or return self.default.connection_interval
self.default.connection_interval)
@property
def connection_timeout(self):
    """Connection timeout, read from the default settings object."""
    defaults = self.default
    return defaults.connection_timeout
@property @property
def connect_parameters(self): def connect_parameters(self):
@ -174,7 +177,8 @@ class SSHHostConfig(collections.namedtuple('SSHHostConfig', ['host',
timeout=self.timeout, timeout=self.timeout,
allow_agent=self.allow_agent, allow_agent=self.allow_agent,
connection_attempts=self.connection_attempts, connection_attempts=self.connection_attempts,
connection_interval=self.connection_interval) connection_interval=self.connection_interval,
connection_timeout=self.connection_timeout)
def is_yes(value): def is_yes(value):

View File

@ -47,16 +47,20 @@ OPTIONS = [
default=False, default=False,
help="Set to True to turn on compression"), help="Set to True to turn on compression"),
cfg.FloatOpt('timeout', cfg.FloatOpt('timeout',
default=10., default=15.,
help="SSH connect timeout in seconds"), help="SSH connect timeout in seconds"),
cfg.IntOpt('connection_attempts', cfg.IntOpt('connection_attempts',
default=100, default=120,
help=("Maximum number of connection attempts to be tried " help=("Maximum number of connection attempts to be tried "
"before timeout")), "before timeout")),
cfg.FloatOpt('connection_interval', cfg.FloatOpt('connection_interval',
default=10., default=5.,
help=("Minimal seconds to wait between every " help=("Minimal seconds to wait between every "
"failed SSH connection attempt")), "failed SSH connection attempt")),
cfg.IntOpt('connection_timeout',
default=600.,
help=("Time before stopping retrying establishing an SSH "
"connection")),
cfg.StrOpt('proxy_jump', cfg.StrOpt('proxy_jump',
default=None, default=None,
help="Default SSH proxy server"), help="Default SSH proxy server"),

View File

@ -80,7 +80,7 @@ class HasUndercloudFixture(tobiko.SharedFixture):
def setup_fixture(self): def setup_fixture(self):
ssh_client = undercloud_ssh_client() ssh_client = undercloud_ssh_client()
try: try:
ssh_client.connect() ssh_client.connect(connection_attempts=1, timeout=15.)
except Exception as ex: except Exception as ex:
LOG.debug('Unable to connect to undercloud host: %s', ex) LOG.debug('Unable to connect to undercloud host: %s', ex)
self.has_undercloud = False self.has_undercloud = False