Fix shell process execution:

- fix waiting for local process termination on Python2
- fix waiting for remote process termination with big timeouts
- exit from communicate method loop when all streams are closed
- fix attribute error when killing remote process


Change-Id: I6a54e8af45385aa25aa8562e031913c97e9e87ce
This commit is contained in:
Federico Ressi 2019-09-19 16:36:06 +02:00
parent e8af17614b
commit c74821dd88
4 changed files with 78 additions and 46 deletions

View File

@ -78,7 +78,7 @@ def execute(command, environment=None, timeout=None, shell=None,
def execute_process(process, stdin, expect_exit_status):
with process:
if stdin and isinstance(stdin, DATA_TYPES):
process.send(data=stdin)
process.send_all(data=stdin)
if expect_exit_status is not None:
process.check_exit_status(expect_exit_status)

View File

@ -19,6 +19,7 @@ import fcntl
import os
import subprocess
import sys
import time
from oslo_log import log
@ -32,6 +33,9 @@ from tobiko.shell.sh import _process
LOG = log.getLogger(__name__)
TimeoutExpired = getattr(subprocess, 'TimeoutExpired', None)
def local_execute(command, environment=None, timeout=None, shell=None,
stdin=None, stdout=None, stderr=None, expect_exit_status=0,
**kwargs):
@ -103,12 +107,22 @@ class LocalShellProcessFixture(_process.ShellProcessFixture):
return self.process.poll()
def get_exit_status(self, timeout=None):
exit_status = self.process.returncode
if exit_status is None:
exit_status = self.process.poll()
while exit_status is None:
timeout = self.check_timeout(timeout=timeout)
LOG.debug("Waiting for remote command termination: "
"timeout=%r, command=%r", timeout, self.command)
exit_status = timeout.wait(timeout=timeout)
if TimeoutExpired is None:
# Workaround for old Python versions that don't accept timeout
# as parameters for wait method
time.sleep(0.1)
exit_status = self.process.poll()
else:
try:
exit_status = self.process.wait(timeout=min(5., timeout))
except TimeoutExpired:
LOG.exception("Failed waiting for subprocess termination")
return exit_status
def kill(self):

View File

@ -243,63 +243,77 @@ class ShellProcessFixture(tobiko.SharedFixture):
stdout=str_from_stream(self.stdout),
stderr=str_from_stream(self.stderr))
def send(self, data, timeout=None):
self.comunicate(stdin=data, timeout=timeout, wait=False)
def send_all(self, data, **kwargs):
self.communicate(stdin=data, **kwargs)
self.stdin.flush()
def wait(self, timeout=None):
def receive_all(self, **kwargs):
self.communicate(receive_all=True, **kwargs)
def wait(self, timeout=None, receive_all=True, **kwargs):
timeout = shell_process_timeout(timeout=timeout)
try:
self.comunicate(stdin=None, timeout=timeout, wait=True)
self.communicate(timeout=timeout, receive_all=receive_all,
**kwargs)
finally:
self.get_exit_status(timeout=timeout)
def comunicate(self, stdin=None, stdout=True, stderr=True, timeout=None,
wait=True, buffer_size=None):
def communicate(self, stdin=None, stdout=True, stderr=True, timeout=None,
receive_all=False, buffer_size=None):
timeout = shell_process_timeout(timeout=timeout)
# Avoid waiting for data in the first loop
poll_interval = 0.
poll_files = _io.select_opened_files([stdin and self.stdin,
stdout and self.stdout,
stderr and self.stderr])
while wait or stdin:
self.check_timeout(timeout=timeout)
LOG.debug('poll_files: %r', poll_files)
LOG.debug('is_running: %r', self.is_running)
wait = wait and bool(poll_files or self.is_running)
LOG.debug('wait: %r', wait)
read_ready, write_ready = _io.select_files(files=poll_files,
timeout=poll_interval)
LOG.debug('read_ready: %r', read_ready)
LOG.debug('write_ready: %r', write_ready)
streams = _io.select_opened_files([stdin and self.stdin,
stdout and self.stdout,
stderr and self.stderr])
while self._is_communicating(streams=streams, send=stdin,
receive=receive_all):
# Remove closed streams
streams = _io.select_opened_files(streams)
# Select ready streams
read_ready, write_ready = _io.select_files(
files=streams, timeout=poll_interval)
if read_ready or write_ready:
LOG.debug('Communicating with process (%s): %r', self.command,
read_ready | write_ready)
# Avoid waiting for data the next time
poll_interval = 0.
if self.stdin in write_ready:
# Write data to remote STDIN
stdin = self._write_to_stdin(stdin)
if not stdin:
if wait:
self.stdin.close()
else:
# Stop polling STDIN for write
self.stdin.flush()
poll_files.remove(self.stdin)
streams.remove(self.stdin)
if self.stdout in read_ready:
# Read data from remote STDOUT
if not self._read_from_stdout(buffer_size=buffer_size):
poll_files.remove(self.stdout)
stdout = self._read_from_stdout(buffer_size=buffer_size)
if not stdout:
streams.remove(self.stdout)
if self.stderr in read_ready:
# Read data from remote STDERR
if not self._read_from_stderr(buffer_size=buffer_size):
poll_files.remove(self.stderr)
stderr = self._read_from_stderr(buffer_size=buffer_size)
if not stderr:
streams.remove(self.stderr)
else:
# Wait for data in the following loops
poll_interval = min(self.poll_interval,
self.check_timeout(timeout=timeout))
# Remove closed streams
poll_files = _io.select_opened_files(poll_files)
LOG.debug('Waiting for process (%s): %r', self.command,
streams)
def _is_communicating(self, streams, send, receive):
if send and self.stdin in streams:
LOG.debug('Trying to send data to process (%s): %r', self.command,
streams)
return True
elif receive and {self.stdout, self.stderr} & streams:
LOG.debug('Trying to receive data from process (%s): %r',
self.command, streams)
return True
else:
LOG.debug('Stop communicating with process (%s): %r', self.command,
streams)
return False
def _write_to_stdin(self, data, check=True):
"""Write data to STDIN"""

View File

@ -118,21 +118,25 @@ class SSHShellProcessFixture(_process.ShellProcessFixture):
process = self.process
exit_status = process.exit_status
if exit_status < 0:
timeout = self.check_timeout(timeout=timeout)
LOG.debug("Waiting for remote command termination: "
"timeout=%r, command=%r", timeout, self.command)
process.status_event.wait(timeout=timeout)
assert process.status_event.is_set()
exit_status = process.exit_status
if exit_status < 0:
exit_status = None
while True:
timeout = self.check_timeout(timeout=timeout)
LOG.debug("Waiting for remote command termination: "
"timeout=%r, command=%r", timeout, self.command)
process.status_event.wait(timeout=min(timeout, 5.))
if process.status_event.is_set():
exit_status = process.exit_status
break
if exit_status < 0:
exit_status = None
return exit_status
def kill(self):
process = self.process
LOG.debug('Killing remote process: %r', self.command)
try:
process.kill()
process.close()
except Exception:
LOG.exception("Failed killing remote process: %r",
self.command)