Debug and fix execution loop in comunicate process

Change-Id: I41016e8df4125160ff276f8e71d3c369b86524df
This commit is contained in:
Federico Ressi 2019-08-23 17:56:19 +02:00
parent 3d247cabdd
commit be4cec3203
3 changed files with 123 additions and 43 deletions

View File

@ -102,15 +102,28 @@ class LocalShellProcessFixture(_process.ShellProcessFixture):
def poll_exit_status(self):
return self.process.poll()
def get_exit_status(self, timeout=None):
exit_status = self.process.returncode
if exit_status is None:
timeout = self.check_timeout(timeout=timeout)
LOG.debug("Waiting for remote command termination: "
"timeout=%r, command=%r", timeout, self.command)
exit_status = timeout.wait(timeout=timeout)
return exit_status
def kill(self):
process = self.process
LOG.debug('Killing local process: %r', self.command)
try:
self.process.kill()
process.kill()
except Exception:
LOG.exception('Failed killing subprocess')
LOG.exception('Failed killing local process: %r (PID=%r)',
self.command, self.pid)
@property
def pid(self):
return self.process.pid
process = self.process
return process and process.pid or None
def set_non_blocking(fd):

View File

@ -85,6 +85,8 @@ class ShellProcessFixture(tobiko.SharedFixture):
stdout = None
stderr = None
_exit_status = None
def __init__(self, **kwargs):
super(ShellProcessFixture, self).__init__()
self.parameters = self.init_parameters(**kwargs)
@ -125,9 +127,11 @@ class ShellProcessFixture(tobiko.SharedFixture):
self.command = command
def setup_timeout(self):
self.timeout = ShellProcessTimeout(self.parameters.timeout)
self.timeout = shell_process_timeout(self.parameters.timeout)
def setup_process(self):
if self._exit_status:
del self._exit_status
self.process = self.create_process()
self.addCleanup(self.close)
@ -173,11 +177,22 @@ class ShellProcessFixture(tobiko.SharedFixture):
# Drain all incoming data from STDOUT and STDERR
self.wait(timeout=timeout)
finally:
# Avoid leaving zombie processes
self.timeout = None
self.close_stdout()
self.close_stderr()
if self.is_running:
try:
self._terminate()
except Exception:
LOG.debug('Exception terminating process: %r',
self.command, exc_info=1)
def _terminate(self):
self.close_stdout()
self.close_stderr()
try:
exit_status = self.get_exit_status()
except Exception:
LOG.exception('Error getting exit status')
exit_status = None
finally:
if exit_status is None:
self.kill()
def __getattr__(self, name):
@ -194,16 +209,24 @@ class ShellProcessFixture(tobiko.SharedFixture):
def poll_exit_status(self):
raise NotImplementedError
def get_exit_status(self, timeout=None):
raise NotImplementedError
@property
def exit_status(self):
return self.poll_exit_status()
exit_status = self._exit_status
if exit_status is None:
exit_status = self.poll_exit_status()
if exit_status is not None:
self._exit_status = exit_status
return exit_status
@property
def is_running(self):
return self.exit_status is None
def check_is_running(self):
exit_status = self.poll_exit_status()
exit_status = self.exit_status
if exit_status is not None:
raise _exception.ShellProcessTeriminated(
command=str(self.command),
@ -224,11 +247,15 @@ class ShellProcessFixture(tobiko.SharedFixture):
self.comunicate(stdin=data, timeout=timeout, wait=False)
def wait(self, timeout=None):
self.comunicate(stdin=None, timeout=timeout, wait=True)
timeout = shell_process_timeout(timeout=timeout)
try:
self.comunicate(stdin=None, timeout=timeout, wait=True)
finally:
self.get_exit_status(timeout=timeout)
def comunicate(self, stdin=None, stdout=True, stderr=True, timeout=None,
wait=True, buffer_size=None):
timeout = ShellProcessTimeout(timeout=timeout)
timeout = shell_process_timeout(timeout=timeout)
# Avoid waiting for data in the first loop
poll_interval = 0.
poll_files = _io.select_opened_files([stdin and self.stdin,
@ -236,9 +263,16 @@ class ShellProcessFixture(tobiko.SharedFixture):
stderr and self.stderr])
while wait or stdin:
self.check_timeout(timeout=timeout)
wait = wait and (poll_files or self.is_running)
LOG.debug('poll_files: %r', poll_files)
LOG.debug('is_running: %r', self.is_running)
wait = wait and bool(poll_files or self.is_running)
LOG.debug('wait: %r', wait)
read_ready, write_ready = _io.select_files(files=poll_files,
timeout=poll_interval)
LOG.debug('read_ready: %r', read_ready)
LOG.debug('write_ready: %r', write_ready)
if read_ready or write_ready:
# Avoid waiting for data the next time
poll_interval = 0.
@ -251,13 +285,15 @@ class ShellProcessFixture(tobiko.SharedFixture):
else:
# Stop polling STDIN for write
self.stdin.flush()
poll_files.remove(self.stdin)
poll_files.remove(self.stdin)
if self.stdout in read_ready:
# Read data from remote STDOUT
self._read_from_stdout(buffer_size=buffer_size)
if not self._read_from_stdout(buffer_size=buffer_size):
poll_files.remove(self.stdout)
if self.stderr in read_ready:
# Read data from remote STDERR
self._read_from_stderr(buffer_size=buffer_size)
if not self._read_from_stderr(buffer_size=buffer_size):
poll_files.remove(self.stderr)
else:
# Wait for data in the following loops
poll_interval = min(self.poll_interval,
@ -305,28 +341,17 @@ class ShellProcessFixture(tobiko.SharedFixture):
self.stderr.close()
return None
def time_left(self, now=None, timeout=None):
now = now or time.time()
time_left = self.timeout.time_left(now=now)
if timeout:
time_left = min(time_left, timeout.time_left(now=now))
return time_left
def check_timeout(self, timeout=None, now=None):
now = now or time.time()
time_left = float('inf')
for timeout in [self.timeout, timeout]:
if timeout is not None:
time_left = min(time_left, timeout.time_left(now=now))
if time_left <= 0.:
ex = _exception.ShellTimeoutExpired(
command=str(self.command),
timeout=timeout.timeout,
stdin=str_from_stream(self.stdin),
stdout=str_from_stream(self.stdout),
stderr=str_from_stream(self.stderr))
LOG.debug("%s", ex)
raise ex
time_left, timeout = get_time_left([self.timeout, timeout], now=now)
if time_left <= 0.:
ex = _exception.ShellTimeoutExpired(
command=str(self.command),
timeout=timeout and timeout.timeout or None,
stdin=str_from_stream(self.stdin),
stdout=str_from_stream(self.stdout),
stderr=str_from_stream(self.stderr))
LOG.debug("%s", ex)
raise ex
return time_left
def check_exit_status(self, expected_status=0):
@ -369,6 +394,27 @@ def merge_dictionaries(*dictionaries):
return merged
def shell_process_timeout(timeout):
if isinstance(timeout, ShellProcessTimeout):
return timeout
else:
return ShellProcessTimeout(timeout=timeout)
def get_time_left(timeouts, now=None):
now = now or time.time()
min_time_left = float('inf')
min_timeout = None
for timeout in timeouts:
if timeout is not None:
timeout = shell_process_timeout(timeout=timeout)
time_left = timeout.time_left(now=now)
if time_left < min_time_left:
min_time_left = time_left
min_timeout = timeout
return min_time_left, min_timeout
class ShellProcessTimeout(object):
timeout = float('inf')

View File

@ -108,13 +108,34 @@ class SSHShellProcessFixture(_process.ShellProcessFixture):
buffer_size=self.parameters.buffer_size)
def poll_exit_status(self):
if self.process.exit_status_ready():
return self.process.recv_exit_status()
else:
return None
process = self.process
exit_status = process.exit_status
if exit_status < 0:
exit_status = None
return exit_status
def get_exit_status(self, timeout=None):
process = self.process
exit_status = process.exit_status
if exit_status < 0:
timeout = self.check_timeout(timeout=timeout)
LOG.debug("Waiting for remote command termination: "
"timeout=%r, command=%r", timeout, self.command)
process.status_event.wait(timeout=timeout)
assert process.status_event.is_set()
exit_status = process.exit_status
if exit_status < 0:
exit_status = None
return exit_status
def kill(self):
self.process.close()
process = self.process
LOG.debug('Killing remote process: %r', self.command)
try:
process.kill()
except Exception:
LOG.exception("Failed killing remote process: %r",
self.command)
class SSHChannelFile(paramiko.ChannelFile):