tobiko/tobiko/shell/sh/_process.py

460 lines
15 KiB
Python

# Copyright (c) 2019 Red Hat, Inc.
#
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import
import io
import typing # noqa
from oslo_log import log
import tobiko
from tobiko.shell.sh import _command
from tobiko.shell.sh import _exception
from tobiko.shell.sh import _io
LOG = log.getLogger(__name__)
def process(command=None, environment=None, timeout: tobiko.Seconds = None,
shell=None, stdin=None, stdout=None, stderr=None, ssh_client=None,
sudo=None, **kwargs):
kwargs.update(command=command, environment=environment, timeout=timeout,
shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
sudo=sudo)
if timeout is not None:
if timeout < 0.:
raise ValueError("Invalid timeout for executing process: "
"{!r}".format(timeout))
try:
from tobiko.shell.sh import _ssh
from tobiko.shell import ssh
except ImportError:
if ssh_client:
raise
else:
if ssh_client is None:
ssh_client = ssh.ssh_proxy_client()
if ssh_client:
return _ssh.ssh_process(ssh_client=ssh_client, **kwargs)
from tobiko.shell.sh import _local
return _local.local_process(**kwargs)
class Parameters(object):
def __init__(self, **kwargs):
cls = type(self)
for name, value in kwargs.items():
if value is not None:
if not hasattr(cls, name):
raise ValueError('Invalid parameter: {!s}'.format(name))
setattr(self, name, value)
class ShellProcessParameters(Parameters):
command = None
environment = None
current_dir = None
timeout: tobiko.Seconds = None
stdin = False
stdout = True
stderr = True
buffer_size = io.DEFAULT_BUFFER_SIZE
poll_interval = 1.
network_namespace = None
retry_count: typing.Optional[int] = 3
retry_interval: tobiko.Seconds = 5.
retry_timeout: tobiko.Seconds = 120.
shell: typing.Union[None, bool, str] = None
sudo: typing.Union[None, bool, str] = None
class ShellProcessFixture(tobiko.SharedFixture):
command = None
process: typing.Any = None
stdin = None
stdout = None
stderr = None
default_shell: typing.Union[None, bool, str] = None
_exit_status = None
def __init__(self, **kwargs):
super(ShellProcessFixture, self).__init__()
self.parameters = self.init_parameters(**kwargs)
def init_parameters(self, **kwargs) -> ShellProcessParameters:
return ShellProcessParameters(**kwargs)
def execute(self) -> 'ShellProcessFixture':
return tobiko.setup_fixture(self)
def setup_fixture(self):
parameters = self.parameters
self.setup_command()
self.setup_process()
if parameters.stdin:
self.setup_stdin()
if parameters.stdout:
self.setup_stdout()
if parameters.stderr:
self.setup_stderr()
def setup_command(self):
command = _command.shell_command(self.parameters.command)
network_namespace = self.parameters.network_namespace
sudo = self.parameters.sudo
shell = self.parameters.shell
if shell is None:
shell = self.default_shell
if shell is not None:
tobiko.check_valid_type(shell, (bool, str))
if isinstance(shell, str):
command = _command.shell_command(shell) + [str(command)]
elif shell is True:
command = default_shell_command() + [str(command)]
if network_namespace:
if sudo is None:
sudo = True
command = network_namespace_command(network_namespace, command)
if sudo:
if sudo is True:
sudo = default_sudo_command()
else:
sudo = _command.shell_command(sudo)
command = sudo + command
self.command = command
def setup_process(self):
if self._exit_status:
del self._exit_status
self.process = self.create_process()
self.addCleanup(self.close)
def setup_stdin(self):
raise NotImplementedError
def setup_stdout(self):
raise NotImplementedError
def setup_stderr(self):
raise NotImplementedError
def create_process(self):
raise NotImplementedError
def close_stdin(self):
stdin = self.stdin
if stdin is not None:
try:
stdin.closed or stdin.close()
except Exception:
LOG.exception("Error closing STDIN stream: %r", self.stdin)
def close_stdout(self):
stdout = self.stdout
if stdout is not None:
try:
stdout.closed or stdout.close()
except Exception:
LOG.exception("Error closing STDOUT stream: %r", self.stdout)
def close_stderr(self):
stderr = self.stderr
if stderr is not None:
try:
stderr.closed or stderr.close()
except Exception:
LOG.exception("Error closing STDERR stream: %r", self.stderr)
def close(self, timeout: tobiko.Seconds = None):
self.close_stdin()
try:
# Drain all incoming data from STDOUT and STDERR
self.wait(timeout=timeout)
finally:
self._terminate()
def _terminate(self):
self.close_stdout()
self.close_stderr()
exit_status = None
try:
exit_status = self.get_exit_status()
finally:
if exit_status is None:
try:
self.kill()
except Exception:
LOG.exception('Error killing process: %r', self.command)
def __getattr__(self, name):
try:
# Get attributes from parameters class
return getattr(self.parameters, name)
except AttributeError as ex:
message = "object {!r} has not attribute {!r}".format(self, name)
raise AttributeError(message) from ex
def kill(self, sudo=False):
raise NotImplementedError
def poll_exit_status(self):
raise NotImplementedError
def get_exit_status(self, timeout: tobiko.Seconds = None):
if timeout is None:
timeout = self.parameters.timeout
exit_status = self._get_exit_status(timeout=timeout)
if exit_status is not None:
return exit_status
ex = _exception.ShellTimeoutExpired(
command=str(self.command),
timeout=timeout,
stdin=str_from_stream(self.stdin),
stdout=str_from_stream(self.stdout),
stderr=str_from_stream(self.stderr))
LOG.debug("Timed out while waiting for command termination:\n%s",
self.command)
raise ex
def _get_exit_status(self, timeout):
raise NotImplementedError
@property
def exit_status(self):
exit_status = self._exit_status
if exit_status is None:
exit_status = self.poll_exit_status()
if exit_status is not None:
self._exit_status = exit_status
return exit_status
@property
def is_running(self):
return self.exit_status is None
def check_is_running(self):
exit_status = self.exit_status
if exit_status is not None:
raise _exception.ShellProcessTerminated(
command=str(self.command),
exit_status=int(exit_status),
stdin=str_from_stream(self.stdin),
stdout=str_from_stream(self.stdout),
stderr=str_from_stream(self.stderr))
def check_stdin_is_opened(self):
if self.stdin.closed:
raise _exception.ShellStdinClosed(
command=str(self.command),
stdin=str_from_stream(self.stdin),
stdout=str_from_stream(self.stdout),
stderr=str_from_stream(self.stderr))
def send_all(self, data, **kwargs):
self.communicate(stdin=data, **kwargs)
self.stdin.flush()
def receive_all(self, **kwargs):
self.communicate(receive_all=True, **kwargs)
def wait(self, timeout: tobiko.Seconds = None, receive_all=True,
**kwargs):
self.communicate(timeout=timeout, receive_all=receive_all,
**kwargs)
def communicate(self, stdin=None, stdout=True, stderr=True,
timeout: tobiko.Seconds = None,
receive_all=False, buffer_size=None):
timeout = tobiko.to_seconds(timeout)
# Avoid waiting for data in the first loop
poll_interval = 0.
streams = _io.select_opened_files([stdin and self.stdin,
stdout and self.stdout,
stderr and self.stderr])
for attempt in tobiko.retry(timeout=timeout):
if not self._is_communicating(streams=streams, send=stdin,
receive=receive_all):
break
# Remove closed streams
streams = _io.select_opened_files(streams)
# Select ready streams
read_ready, write_ready = _io.select_files(
files=streams, timeout=poll_interval)
if read_ready or write_ready:
# Avoid waiting for data the next time
poll_interval = 0.
if self.stdin in write_ready:
# Write data to remote STDIN
stdin = self._write_to_stdin(stdin)
if not stdin:
streams.remove(self.stdin)
if self.stdout in read_ready:
# Read data from remote STDOUT
stdout = self._read_from_stdout(buffer_size=buffer_size)
if not stdout:
streams.remove(self.stdout)
if self.stderr in read_ready:
# Read data from remote STDERR
stderr = self._read_from_stderr(buffer_size=buffer_size)
if not stderr:
streams.remove(self.stderr)
else:
self._check_communicate_timeout(attempt=attempt,
timeout=timeout)
# Wait for data in the following loops
poll_interval = self.parameters.poll_interval
LOG.debug(f"Waiting for process data {poll_interval} "
f"seconds... \n"
f" command: {self.command}\n"
f" attempt: {attempt.details}\n"
f" streams: {streams}")
def _check_communicate_timeout(self, attempt: tobiko.RetryAttempt,
timeout: tobiko.Seconds):
try:
attempt.check_limits()
except tobiko.RetryTimeLimitError:
pass
else:
return
# Eventually raises ShellCommandTimeout exception
self.get_exit_status(timeout=timeout)
raise StopIteration
def _is_communicating(self, streams, send, receive):
if send and self.stdin in streams:
return True
elif receive and {self.stdout, self.stderr} & streams:
return True
else:
return False
def _write_to_stdin(self, data, check=True):
"""Write data to STDIN"""
if check:
self.check_stdin_is_opened()
sent_bytes = self.stdin.write(data)
if sent_bytes:
return data[sent_bytes:] or None
else:
LOG.debug("%r closed by peer on %r", self.stdin, self)
self.stdin.close()
return data
def _read_from_stdout(self, buffer_size=None):
"""Read data from remote stream"""
# Read data from remote stream
chunk = self.stdout.read(buffer_size)
if chunk:
return chunk
else:
LOG.debug("%r closed by peer on %r", self.stdout, self)
self.stdout.close()
return None
def _read_from_stderr(self, buffer_size=None):
"""Read data from remote stream"""
# Read data from remote stream
chunk = self.stderr.read(buffer_size)
if chunk:
return chunk
else:
LOG.debug("%r closed by peer on %r", self.stderr, self)
self.stderr.close()
return None
def check_exit_status(self, expected_status=0):
exit_status = self.poll_exit_status()
if exit_status is None:
time_left = self.check_timeout()
ex = _exception.ShellProcessNotTerminated(
command=str(self.command),
time_left=time_left,
stdin=self.stdin,
stdout=self.stdout,
stderr=self.stderr)
raise ex
exit_status = int(exit_status)
if expected_status != exit_status:
ex = _exception.ShellCommandFailed(
command=str(self.command),
exit_status=exit_status,
stdin=str_from_stream(self.stdin),
stdout=str_from_stream(self.stdout),
stderr=str_from_stream(self.stderr))
raise ex
def merge_dictionaries(*dictionaries):
merged = {}
for d in dictionaries:
if d:
merged.update(d)
return merged
def str_from_stream(stream):
if stream is not None:
try:
return str(stream)
except UnicodeDecodeError:
LOG.exception('Unable to decode as a string - '
'Returning the raw data')
return stream.data
else:
return None
def bytes_from_stream(stream):
if stream is not None:
return stream.data
else:
return None
def default_shell_command():
from tobiko import config
CONF = config.CONF
return _command.shell_command(CONF.tobiko.shell.command)
def default_sudo_command():
from tobiko import config
CONF = config.CONF
return _command.shell_command(CONF.tobiko.shell.sudo)
def network_namespace_command(network_namespace, command):
return _command.shell_command(['/sbin/ip', 'netns', 'exec',
network_namespace]) + command