Use retry tool for shell process synchronization

Change-Id: I4c8dbf2ed216ad01480930d908156f61757ade35
This commit is contained in:
Federico Ressi 2020-08-10 14:07:49 +02:00
parent c7d56d0213
commit 52405588eb
4 changed files with 111 additions and 160 deletions

View File

@ -19,7 +19,6 @@ import fcntl
import os
import subprocess
import sys
import time
from oslo_log import log
@ -33,12 +32,9 @@ from tobiko.shell.sh import _process
LOG = log.getLogger(__name__)
TimeoutExpired = getattr(subprocess, 'TimeoutExpired', None)
def local_execute(command, environment=None, timeout=None, shell=None,
stdin=None, stdout=None, stderr=None, expect_exit_status=0,
**kwargs):
def local_execute(command, environment=None, timeout: tobiko.Seconds = None,
shell=None, stdin=None, stdout=None, stderr=None,
expect_exit_status=0, **kwargs):
"""Execute command on local host using local shell"""
process = local_process(command=command,
environment=environment,
@ -53,9 +49,9 @@ def local_execute(command, environment=None, timeout=None, shell=None,
expect_exit_status=expect_exit_status)
def local_process(command, environment=None, current_dir=None, timeout=None,
shell=None, stdin=None, stdout=None, stderr=True, sudo=None,
network_namespace=None):
def local_process(command, environment=None, current_dir=None,
timeout: tobiko.Seconds = None, shell=None, stdin=None,
stdout=None, stderr=True, sudo=None, network_namespace=None):
return LocalShellProcessFixture(
command=command, environment=environment, current_dir=current_dir,
timeout=timeout, shell=shell, stdin=stdin, stdout=stdout,
@ -107,27 +103,15 @@ class LocalShellProcessFixture(_process.ShellProcessFixture):
def poll_exit_status(self):
return self.process.poll()
if TimeoutExpired is None:
# Workaround for old Python versions
def _get_exit_status(self, time_left):
start_time = now = time.time()
end_time = start_time + time_left
exit_status = self.poll_exit_status()
while exit_status is None and now < end_time:
time.sleep(0.1)
exit_status = self.poll_exit_status()
now = time.time()
def _get_exit_status(self, timeout: tobiko.Seconds):
try:
exit_status = self.process.wait(timeout=timeout)
except subprocess.TimeoutExpired:
LOG.exception("Failed waiting for subprocess termination")
return None
else:
assert exit_status is not None
return exit_status
else:
def _get_exit_status(self, time_left):
try:
exit_status = self.process.wait(timeout=time_left)
except TimeoutExpired:
LOG.exception("Failed waiting for subprocess termination")
return None
else:
assert exit_status is not None
return exit_status
def kill(self):
process = self.process

View File

@ -16,7 +16,7 @@
from __future__ import absolute_import
import io
import time
import typing # noqa
from oslo_log import log
@ -29,16 +29,12 @@ from tobiko.shell.sh import _io
LOG = log.getLogger(__name__)
MAX_TIMEOUT = 3600. # 1 hour
def process(command=None, environment=None, timeout=None, shell=None,
stdin=None, stdout=None, stderr=None, ssh_client=None, sudo=None,
**kwargs):
def process(command=None, environment=None, timeout: tobiko.Seconds = None,
shell=None, stdin=None, stdout=None, stderr=None, ssh_client=None,
sudo=None, **kwargs):
kwargs.update(command=command, environment=environment, timeout=timeout,
shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
sudo=sudo)
timeout = kwargs['timeout']
if timeout is not None:
if timeout < 0.:
raise ValueError("Invalid timeout for executing process: "
@ -75,7 +71,7 @@ class ShellProcessParameters(Parameters):
command = None
environment = None
current_dir = None
timeout = None
timeout: tobiko.Seconds = None
shell = None
stdin = False
stdout = True
@ -84,14 +80,15 @@ class ShellProcessParameters(Parameters):
poll_interval = 1.
sudo = None
network_namespace = None
retry_count: typing.Optional[int] = 3
retry_interval: tobiko.Seconds = 5.
retry_timeout: tobiko.Seconds = 120.
class ShellProcessFixture(tobiko.SharedFixture):
parameters = None
command = None
timeout = None
process = None
process: typing.Any = None
stdin = None
stdout = None
stderr = None
@ -102,7 +99,7 @@ class ShellProcessFixture(tobiko.SharedFixture):
super(ShellProcessFixture, self).__init__()
self.parameters = self.init_parameters(**kwargs)
def init_parameters(self, **kwargs):
def init_parameters(self, **kwargs) -> ShellProcessParameters:
return ShellProcessParameters(**kwargs)
def execute(self):
@ -112,9 +109,6 @@ class ShellProcessFixture(tobiko.SharedFixture):
parameters = self.parameters
self.setup_command()
if parameters.timeout:
self.setup_timeout()
self.setup_process()
if parameters.stdin:
@ -153,9 +147,6 @@ class ShellProcessFixture(tobiko.SharedFixture):
self.command = command
def setup_timeout(self):
self.timeout = shell_process_timeout(self.parameters.timeout)
def setup_process(self):
if self._exit_status:
del self._exit_status
@ -198,7 +189,7 @@ class ShellProcessFixture(tobiko.SharedFixture):
except Exception:
LOG.exception("Error closing STDERR stream: %r", self.stderr)
def close(self, timeout=None):
def close(self, timeout: tobiko.Seconds = None):
self.close_stdin()
try:
# Drain all incoming data from STDOUT and STDERR
@ -233,16 +224,16 @@ class ShellProcessFixture(tobiko.SharedFixture):
def poll_exit_status(self):
raise NotImplementedError
def get_exit_status(self, timeout=None):
time_left, timeout = get_time_left([self.timeout, timeout])
if time_left > 0.:
exit_status = self._get_exit_status(time_left=time_left)
if exit_status is not None:
return exit_status
def get_exit_status(self, timeout: tobiko.Seconds = None):
if timeout is None:
timeout = self.parameters.timeout
exit_status = self._get_exit_status(timeout=timeout)
if exit_status is not None:
return exit_status
ex = _exception.ShellTimeoutExpired(
command=str(self.command),
timeout=timeout and timeout.timeout or None,
timeout=timeout,
stdin=str_from_stream(self.stdin),
stdout=str_from_stream(self.stdout),
stderr=str_from_stream(self.stderr))
@ -250,7 +241,7 @@ class ShellProcessFixture(tobiko.SharedFixture):
self.command)
raise ex
def _get_exit_status(self, time_left):
def _get_exit_status(self, timeout):
raise NotImplementedError
@property
@ -291,24 +282,29 @@ class ShellProcessFixture(tobiko.SharedFixture):
def receive_all(self, **kwargs):
self.communicate(receive_all=True, **kwargs)
def wait(self, timeout=None, receive_all=True,
def wait(self, timeout: tobiko.Seconds = None, receive_all=True,
**kwargs):
self.communicate(timeout=timeout, receive_all=receive_all,
**kwargs)
def communicate(self, stdin=None, stdout=True, stderr=True, timeout=None,
def communicate(self, stdin=None, stdout=True, stderr=True,
timeout: tobiko.Seconds = None,
receive_all=False, buffer_size=None):
timeout = shell_process_timeout(timeout=timeout)
timeout = tobiko.to_seconds(timeout)
# Avoid waiting for data in the first loop
poll_interval = 0.
streams = _io.select_opened_files([stdin and self.stdin,
stdout and self.stdout,
stderr and self.stderr])
while self._is_communicating(streams=streams, send=stdin,
receive=receive_all):
for attempt in tobiko.retry(timeout=timeout):
if not self._is_communicating(streams=streams, send=stdin,
receive=receive_all):
break
# Remove closed streams
streams = _io.select_opened_files(streams)
# Select ready streams
read_ready, write_ready = _io.select_files(
files=streams, timeout=poll_interval)
@ -331,11 +327,27 @@ class ShellProcessFixture(tobiko.SharedFixture):
if not stderr:
streams.remove(self.stderr)
else:
self._check_communicate_timeout(attempt=attempt,
timeout=timeout)
# Wait for data in the following loops
poll_interval = min(self.poll_interval,
self.check_timeout(timeout=timeout))
LOG.debug('Waiting for process (%s): %r', self.command,
streams)
poll_interval = self.parameters.poll_interval
LOG.debug(f"Waiting for process data {poll_interval} "
f"seconds... \n"
f" command: '{self.command}'\n"
f" attempt: {attempt.details}\n"
f" streams: {streams}")
def _check_communicate_timeout(self, attempt: tobiko.RetryAttempt,
timeout: tobiko.Seconds):
try:
attempt.check_limits()
except tobiko.RetryTimeLimitError:
pass
else:
return
# Eventually raises ShellCommandTimeout exception
self.get_exit_status(timeout=timeout)
raise StopIteration
def _is_communicating(self, streams, send, receive):
if send and self.stdin in streams:
@ -379,18 +391,6 @@ class ShellProcessFixture(tobiko.SharedFixture):
self.stderr.close()
return None
def check_timeout(self, timeout=None, now=None):
time_left, timeout = get_time_left([self.timeout, timeout], now=now)
if time_left <= 0.:
ex = _exception.ShellTimeoutExpired(
command=str(self.command),
timeout=timeout and timeout.timeout or None,
stdin=str_from_stream(self.stdin),
stdout=str_from_stream(self.stdout),
stderr=str_from_stream(self.stderr))
raise ex
return time_left
def check_exit_status(self, expected_status=0):
exit_status = self.poll_exit_status()
if exit_status is None:
@ -422,51 +422,6 @@ def merge_dictionaries(*dictionaries):
return merged
def shell_process_timeout(timeout):
if isinstance(timeout, ShellProcessTimeout):
return timeout
else:
return ShellProcessTimeout(timeout=timeout)
def get_time_left(timeouts, now=None):
now = now or time.time()
min_time_left = float(MAX_TIMEOUT)
min_timeout = None
for timeout in timeouts:
if timeout is not None:
timeout = shell_process_timeout(timeout=timeout)
time_left = timeout.time_left(now=now)
if time_left < min_time_left:
min_time_left = time_left
min_timeout = timeout
return min_time_left, min_timeout
class ShellProcessTimeout(object):
timeout = MAX_TIMEOUT
def __init__(self, timeout=None, start_time=None):
if timeout is None:
timeout = self.timeout
else:
self.timeout = float(timeout)
start_time = start_time and float(start_time) or time.time()
self.start_time = start_time
self.end_time = start_time + timeout
def __float__(self):
return self.timeout
def time_left(self, now=None):
now = now or time.time()
return self.end_time - now
def is_expired(self, now=None):
raise self.time_left(now=now) <= 0.
def str_from_stream(stream):
if stream is not None:
return str(stream)

View File

@ -25,14 +25,15 @@ from tobiko.shell.sh import _io
from tobiko.shell.sh import _local
from tobiko.shell.sh import _process
from tobiko.shell import ssh
import typing # noqa
LOG = log.getLogger(__name__)
def ssh_execute(ssh_client, command, environment=None, timeout=None,
stdin=None, stdout=None, stderr=None, shell=None,
expect_exit_status=0, **kwargs):
def ssh_execute(ssh_client, command, environment=None,
timeout: tobiko.Seconds = None, stdin=None, stdout=None,
stderr=None, shell=None, expect_exit_status=0, **kwargs):
"""Execute command on remote host using SSH client"""
process = ssh_process(command=command,
environment=environment,
@ -48,9 +49,10 @@ def ssh_execute(ssh_client, command, environment=None, timeout=None,
expect_exit_status=expect_exit_status)
def ssh_process(command, environment=None, current_dir=None, timeout=None,
shell=None, stdin=None, stdout=None, stderr=None,
ssh_client=None, sudo=None, network_namespace=None):
def ssh_process(command, environment=None, current_dir=None,
timeout: tobiko.Seconds = None, shell=None, stdin=None,
stdout=None, stderr=None, ssh_client=None, sudo=None,
network_namespace=None):
if ssh_client is None:
ssh_client = ssh.ssh_proxy_client()
if ssh_client:
@ -73,11 +75,7 @@ class SSHShellProcessParameters(_process.ShellProcessParameters):
class SSHShellProcessFixture(_process.ShellProcessFixture):
retry_create_process_count = 3
retry_create_process_intervall = 5.
retry_create_process_timeout = 120.
def init_parameters(self, **kwargs):
def init_parameters(self, **kwargs) -> SSHShellProcessParameters:
return SSHShellProcessParameters(**kwargs)
def create_process(self):
@ -92,9 +90,9 @@ class SSHShellProcessFixture(_process.ShellProcessFixture):
for attempt in tobiko.retry(
timeout=self.parameters.timeout,
default_count=self.retry_create_process_count,
default_interval=self.retry_create_process_intervall,
default_timeout=self.retry_create_process_timeout):
default_count=self.parameters.retry_count,
default_interval=self.parameters.retry_interval,
default_timeout=self.parameters.retry_timeout):
timeout = attempt.time_left
details = (f"command='{command}', "
@ -147,17 +145,26 @@ class SSHShellProcessFixture(_process.ShellProcessFixture):
exit_status = None
return exit_status
def _get_exit_status(self, time_left=None):
def _get_exit_status(self, timeout: tobiko.Seconds = None):
process = self.process
if not process.exit_status_ready():
# workaround for paramiko timeout problem
time_left = min(time_left, 120.0)
# Workaround for Paramiko timeout problem
# CirrOS instances could close SSH channel without sending process
# exit status
if timeout is None:
timeout = 120.
else:
timeout = min(timeout, 120.0)
LOG.debug(f"Waiting for command '{self.command}' exit status "
f"(timeout={timeout})")
# recv_exit_status method doesn't accept timeout parameter
LOG.debug('Waiting for command (%s) exit status (time_left=%r)',
self.command, time_left)
if not process.status_event.wait(timeout=time_left):
LOG.debug('Timed out before status event being set')
# therefore here we wait for next channel event expecting it is
# actually the exit status
# TODO (fressi): we could use an itimer to set a timeout for
# recv_exit_status
if not process.status_event.wait(timeout=timeout):
LOG.error("Timed out before status event being set "
f"(timeout={timeout})")
if process.exit_status >= 0:
return process.exit_status
else:

View File

@ -21,6 +21,7 @@ import getpass
import os
import socket
import subprocess
import typing # noqa
import netaddr
from oslo_log import log
@ -326,11 +327,13 @@ class SSHClientFixture(tobiko.SharedFixture):
LOG.debug(f"SSH proxy sock closed. ({self.details})")
@contextlib.contextmanager
def use_connect_parameters(self, **parameters):
if parameters:
restore_parameters = self._connect_parameters
self._connect_parameters = dict(self._connect_parameters,
**parameters)
def use_connect_parameters(self, **kwargs):
if kwargs:
restore_parameters = dict(self._connect_parameters)
gather_ssh_connect_parameters(schema=self.schema,
destination=self._connect_parameters,
**kwargs)
self.connect_parameters = self.get_connect_parameters()
else:
restore_parameters = None
try:
@ -338,16 +341,19 @@ class SSHClientFixture(tobiko.SharedFixture):
finally:
if restore_parameters is not None:
self._connect_parameters = restore_parameters
self.connect_parameters = self.get_connect_parameters()
def connect(self, timeout: tobiko.Seconds = None, **parameters):
def connect(self, retry_count: typing.Optional[int] = 2,
retry_timeout: tobiko.Seconds = None,
retry_interval: tobiko.Seconds = None, **ssh_parameters):
"""Ensures it is connected to remote SSH server
"""
with self.use_connect_parameters(**parameters):
with self.use_connect_parameters(**ssh_parameters):
# This retry is mostly intended to ensure connection is
# reestablished in case it is lost
for attempt in tobiko.retry(timeout=timeout,
default_count=2,
default_interval=5.):
for attempt in tobiko.retry(count=retry_count,
timeout=retry_timeout,
interval=retry_interval):
LOG.debug(f"Ensuring SSH connection (attempt={attempt})")
connected = False
try:
@ -365,7 +371,7 @@ class SSHClientFixture(tobiko.SharedFixture):
except Exception:
attempt.check_limits()
LOG.exception(f"Failed connecting to '{self.login}' "
"(attempt={attempt})")
f"(attempt={attempt})")
finally:
if not connected:
self.close()
@ -504,7 +510,7 @@ def ssh_connect(hostname, username=None, port=None, connection_interval=None,
port=port,
command=proxy_command,
client=proxy_client,
timeout=attempt.time_left)
timeout=connection_timeout)
client.connect(hostname=hostname,
username=username,
port=port,
@ -514,7 +520,6 @@ def ssh_connect(hostname, username=None, port=None, connection_interval=None,
paramiko.SSHException) as ex:
attempt.check_limits()
LOG.debug(f"Error logging in to '{login}': {ex}")
else:
LOG.debug(f"Successfully logged in to '{login}'")
return client, proxy_sock
@ -541,7 +546,7 @@ def ssh_proxy_sock(hostname=None, port=None, command=None, client=None,
if client:
if isinstance(client, SSHClientFixture):
# Connect to proxy server
client = client.connect(retry=tobiko.retry(timeout=timeout))
client = client.connect(connection_timeout=timeout)
elif not isinstance(client, paramiko.SSHClient):
message = "Object {!r} is not an SSHClient".format(client)
raise TypeError(message)