From ab3debf791a166b8627383ccb33ded798f0f527e Mon Sep 17 00:00:00 2001 From: Federico Ressi Date: Wed, 5 Aug 2020 14:43:37 +0200 Subject: [PATCH] Re-connect to SSH server when connection is broken Change-Id: Ifac16b1cc7fbc10ce01ee7f5346ddfe86de9fee7 --- tobiko/shell/sh/_ssh.py | 80 +++++++++---------- tobiko/shell/ssh/_client.py | 141 +++++++++++++++++++++++++--------- tobiko/shell/ssh/_config.py | 10 ++- tobiko/shell/ssh/config.py | 10 ++- tobiko/tripleo/_undercloud.py | 2 +- 5 files changed, 156 insertions(+), 87 deletions(-) diff --git a/tobiko/shell/sh/_ssh.py b/tobiko/shell/sh/_ssh.py index e2b8d9565..c7d75c8a8 100644 --- a/tobiko/shell/sh/_ssh.py +++ b/tobiko/shell/sh/_ssh.py @@ -15,8 +15,6 @@ # under the License. from __future__ import absolute_import -import time - from oslo_log import log import paramiko @@ -76,7 +74,8 @@ class SSHShellProcessParameters(_process.ShellProcessParameters): class SSHShellProcessFixture(_process.ShellProcessFixture): retry_create_process_count = 3 - retry_create_process_intervall = 1. + retry_create_process_intervall = 5. + retry_create_process_timeout = 120. def init_parameters(self, **kwargs): return SSHShellProcessParameters(**kwargs) @@ -91,48 +90,41 @@ class SSHShellProcessFixture(_process.ShellProcessFixture): tobiko.check_valid_type(parameters, SSHShellProcessParameters) environment = parameters.environment - retry_count = self.retry_create_process_count - for retry_number in range(1, retry_count + 1): - timeout = self.timeout and float(self.timeout) - LOG.debug("Executing remote command: %r (login=%r, timeout=%r, " - "environment=%r)...", - command, ssh_client.login, timeout, environment or {}) + for attempt in tobiko.retry( + timeout=self.parameters.timeout, + default_count=self.retry_create_process_count, + default_interval=self.retry_create_process_intervall, + default_timeout=self.retry_create_process_timeout): + + timeout = attempt.time_left + details = (f"command='{command}', " + f"login={ssh_client.login}, " + f"timeout={timeout}, " + f"attempt={attempt}, " + f"environment={environment}") + LOG.debug(f"Create remote process... ({details})") try: - return self._try_create_process(command=command, - environment=environment, - ssh_client=ssh_client, - timeout=timeout) - except paramiko.SSHException as ex: - try: - # Before doing anything else cleanup SSH connection - tobiko.cleanup_fixture(ssh_client) - except Exception: - LOG.exception('Failed closing SSH connection') - if "timeout" in str(ex).lower(): - LOG.debug('Timed out executing command %r (timeout=%s)', - command, timeout, exc_info=1) - raise _exception.ShellTimeoutExpired(command=command, - stdin=None, - stdout=None, - stderr=None, - timeout=timeout) - - LOG.debug('Error creating SSH process (attempt %d of %d)', - retry_number, retry_count, exc_info=1) - if retry_number >= retry_count: - # Last attempt has failed! - raise - else: - # Be patient, this could help things getting better - time.sleep(self.retry_create_process_intervall) - - def _try_create_process(self, command, environment, ssh_client, timeout): - client = ssh_client.connect() - process = client.get_transport().open_session(timeout=timeout) - if environment: - process.update_environment(environment) - process.exec_command(command) - return process + client = ssh_client.connect(timeout=timeout) + process = client.get_transport().open_session(timeout=timeout) + if environment: + process.update_environment(environment) + process.exec_command(command) + LOG.debug(f"Remote process created. ({details})") + return process + except Exception: + # Before doing anything else cleanup SSH connection + ssh_client.close() + LOG.debug(f"Error creating remote process. ({details})", + exc_info=1) + try: + attempt.check_limits() + except tobiko.RetryTimeLimitError: + LOG.debug(f"Timed out creating remote process. ({details})") + raise _exception.ShellTimeoutExpired(command=command, + stdin=None, + stdout=None, + stderr=None, + timeout=timeout) def setup_stdin(self): self.stdin = _io.ShellStdin( diff --git a/tobiko/shell/ssh/_client.py b/tobiko/shell/ssh/_client.py index 96d24f972..ece36a5e6 100644 --- a/tobiko/shell/ssh/_client.py +++ b/tobiko/shell/ssh/_client.py @@ -16,10 +16,10 @@ from __future__ import absolute_import import collections +import contextlib import getpass import os import socket -import time import subprocess import netaddr @@ -143,6 +143,9 @@ SSH_CONNECT_PARAMETERS = { #: Minimum amount of time to wait between two connection attempts 'connection_interval': positive_float, + #: Amount of time before timing our connection attempt + 'connection_timeout': positive_float, + #: Command to be executed to open proxy sock 'proxy_command': str, } @@ -300,31 +303,80 @@ class SSHClientFixture(tobiko.SharedFixture): client = self.client self.client = None if client: + LOG.debug(f"Closing SSH connection... ({self.details})") try: client.close() except Exception: - LOG.exception('Error closing client (%r)', self) + LOG.exception("Error closing SSH connection. " + f"({self.details})") + else: + LOG.debug(f"SSH connection closed. ({self.details})") def cleanup_proxy_sock(self): proxy_sock = self.proxy_sock self.proxy_sock = None if proxy_sock: + LOG.debug(f"Closing SSH proxy sock... ({self.details})") try: proxy_sock.close() except Exception: - LOG.exception('Error closing proxy socket (%r)', self) + LOG.exception("Error closing proxy socket. " + f"({self.details})") + else: + LOG.debug(f"SSH proxy sock closed. ({self.details})") - def connect(self): - client = tobiko.setup_fixture(self).client - if not client: - # For some unknown reason at this point client could be None: - # try reconnecting to it - LOG.error('SSH Paramiko client found None, reconnecting...') - client = tobiko.reset_fixture(self).client - return client + @contextlib.contextmanager + def use_connect_parameters(self, **parameters): + if parameters: + restore_parameters = self._connect_parameters + self._connect_parameters = dict(self._connect_parameters, + **parameters) + else: + restore_parameters = None + try: + yield + finally: + if restore_parameters is not None: + self._connect_parameters = restore_parameters + + def connect(self, timeout: tobiko.Seconds = None, **parameters): + """Ensures it is connected to remote SSH server + """ + with self.use_connect_parameters(**parameters): + # This retry is mostly intended to ensure connection is + # reestablished in case it is lost + for attempt in tobiko.retry(timeout=timeout, + default_count=2, + default_interval=5.): + LOG.debug(f"Ensuring SSH connection (attempt={attempt})") + connected = False + try: + client = tobiko.setup_fixture(self).client + # For any reason at this point client could + # be None: force fixture cleanup + if check_ssh_connection(client): + LOG.debug("SSH connection is safe to use " + f"(attempt={attempt})") + connected = True + return client + else: + LOG.warning("SSH connection is not safe to use " + f"(attempt={attempt})") + except Exception: + attempt.check_limits() + LOG.exception(f"Failed connecting to '{self.login}' " + "(attempt={attempt})") + finally: + if not connected: + self.close() def close(self): - tobiko.cleanup_fixture(self) + """Ensures it is disconnected from remote SSH server + """ + try: + tobiko.cleanup_fixture(self) + except Exception: + LOG.exception(f"Failed closing SSH connection to '{self.login}'") def get_ssh_command(self, host=None, username=None, port=None, command=None, config_files=None, host_config=None, @@ -366,6 +418,13 @@ class SSHClientFixture(tobiko.SharedFixture): parameters = self.setup_connect_parameters() return parameters['hostname'] + @property + def details(self): + return f"login='{self.login}'" + + def __repr__(self): + return f"SSHClientFixture <{self.details}>" + UNDEFINED_CLIENT = 'UNDEFINED_CLIENT' @@ -424,23 +483,28 @@ def ssh_client(host, port=None, username=None, proxy_jump=None, def ssh_connect(hostname, username=None, port=None, connection_interval=None, - connection_attempts=None, proxy_command=None, - proxy_client=None, **parameters): + connection_attempts=None, connection_timeout=None, + proxy_command=None, proxy_client=None, **parameters): client = paramiko.SSHClient() client.set_missing_host_key_policy(paramiko.WarningPolicy()) - login = _command.ssh_login(hostname=hostname, username=username, port=port) - attempts = connection_attempts or 1 - interval = connection_interval or 5. - for attempt in range(1, attempts + 1): - LOG.debug("Logging in to %r (%r)... attempt %d out of %d", - login, parameters, attempt, attempts) - start_time = time.time() - proxy_sock = ssh_proxy_sock(hostname=hostname, - port=port, - command=proxy_command, - client=proxy_client) + + for attempt in tobiko.retry(count=connection_attempts, + timeout=connection_timeout, + interval=connection_interval, + default_count=60, + default_timeout=300., + default_interval=5.): + LOG.debug(f"Logging in to '{login}'...\n" + f" - parameters: {parameters}\n" + f" - attempt: {attempt.details}\n") + try: + proxy_sock = ssh_proxy_sock(hostname=hostname, + port=port, + command=proxy_command, + client=proxy_client, + timeout=attempt.time_left) client.connect(hostname=hostname, username=username, port=port, @@ -448,21 +512,16 @@ def ssh_connect(hostname, username=None, port=None, connection_interval=None, **parameters) except (EOFError, socket.error, socket.timeout, paramiko.SSHException) as ex: - if attempt >= attempts: - raise - - LOG.debug("Error logging in to %r: %s", login, ex) - sleep_time = start_time + interval - time.time() - if sleep_time > 0.: - time.sleep(sleep_time) + attempt.check_limits() + LOG.debug(f"Error logging in to '{login}': {ex}") else: - LOG.debug("Successfully logged in to %s", login) + LOG.debug(f"Successfully logged in to '{login}'") return client, proxy_sock def ssh_proxy_sock(hostname=None, port=None, command=None, client=None, - source_address=None): + source_address=None, timeout=None): if not command: if client: # I need a command to execute with proxy client @@ -482,7 +541,7 @@ def ssh_proxy_sock(hostname=None, port=None, command=None, client=None, if client: if isinstance(client, SSHClientFixture): # Connect to proxy server - client = client.connect() + client = client.connect(retry=tobiko.retry(timeout=timeout)) elif not isinstance(client, paramiko.SSHClient): message = "Object {!r} is not an SSHClient".format(client) raise TypeError(message) @@ -490,7 +549,7 @@ def ssh_proxy_sock(hostname=None, port=None, command=None, client=None, # Open proxy channel LOG.debug("Execute proxy command with proxy client %r: %r", client, command) - sock = client.get_transport().open_session() + sock = client.get_transport().open_session(timeout=timeout) sock.exec_command(command) else: LOG.debug("Execute proxy command on local host: %r", command) @@ -505,3 +564,13 @@ def ssh_proxy_client(manager=None, host=None, host_config=None, return manager.get_proxy_client(host=host, host_config=host_config, config_files=config_files) + + +def check_ssh_connection(client): + if client: + transport = client.get_transport() + if transport.is_authenticated(): + # Send a keep-alive message + transport.send_ignore() + return True + return False diff --git a/tobiko/shell/ssh/_config.py b/tobiko/shell/ssh/_config.py index 7613b454c..c16f70eb5 100644 --- a/tobiko/shell/ssh/_config.py +++ b/tobiko/shell/ssh/_config.py @@ -161,8 +161,11 @@ class SSHHostConfig(collections.namedtuple('SSHHostConfig', ['host', @property def connection_interval(self): - return (self.host_config.get('connetcttimeout') or - self.default.connection_interval) + return self.default.connection_interval + + @property + def connection_timeout(self): + return self.default.connection_timeout @property def connect_parameters(self): @@ -174,7 +177,8 @@ class SSHHostConfig(collections.namedtuple('SSHHostConfig', ['host', timeout=self.timeout, allow_agent=self.allow_agent, connection_attempts=self.connection_attempts, - connection_interval=self.connection_interval) + connection_interval=self.connection_interval, + connection_timeout=self.connection_timeout) def is_yes(value): diff --git a/tobiko/shell/ssh/config.py b/tobiko/shell/ssh/config.py index 9ad018189..e14f4da8f 100644 --- a/tobiko/shell/ssh/config.py +++ b/tobiko/shell/ssh/config.py @@ -47,16 +47,20 @@ OPTIONS = [ default=False, help="Set to True to turn on compression"), cfg.FloatOpt('timeout', - default=10., + default=15., help="SSH connect timeout in seconds"), cfg.IntOpt('connection_attempts', - default=100, + default=120, help=("Maximum number of connection attempts to be tried " "before timeout")), cfg.FloatOpt('connection_interval', - default=10., + default=5., help=("Minimal seconds to wait between every " "failed SSH connection attempt")), + cfg.IntOpt('connection_timeout', + default=600., + help=("Time before stopping retrying establishing an SSH " + "connection")), cfg.StrOpt('proxy_jump', default=None, help="Default SSH proxy server"), diff --git a/tobiko/tripleo/_undercloud.py b/tobiko/tripleo/_undercloud.py index 95c221442..097ba1061 100644 --- a/tobiko/tripleo/_undercloud.py +++ b/tobiko/tripleo/_undercloud.py @@ -80,7 +80,7 @@ class HasUndercloudFixture(tobiko.SharedFixture): def setup_fixture(self): ssh_client = undercloud_ssh_client() try: - ssh_client.connect() + ssh_client.connect(connection_attempts=1, timeout=15.) except Exception as ex: LOG.debug('Unable to connect to undercloud host: %s', ex) self.has_undercloud = False