Update type hints for ps tool integration

Change-Id: I61abfcd4602d0a17f8bef05070f404bbe0779b02
This commit is contained in:
Federico Ressi 2021-06-17 14:32:06 +02:00
parent b801a0453e
commit 46cba5ab1f
4 changed files with 208 additions and 75 deletions

View File

@ -15,6 +15,7 @@
# under the License.
from __future__ import absolute_import
from tobiko.shell.sh import _cmdline
from tobiko.shell.sh import _command
from tobiko.shell.sh import _exception
from tobiko.shell.sh import _execute
@ -30,6 +31,8 @@ from tobiko.shell.sh import _ssh
from tobiko.shell.sh import _uptime
get_command_line = _cmdline.get_command_line
ShellCommand = _command.ShellCommand
ShellCommandType = _command.ShellCommandType
shell_command = _command.shell_command

View File

@ -0,0 +1,61 @@
# Copyright (c) 2021 Red Hat, Inc.
#
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import
import functools
import os
from oslo_log import log
import tobiko
from tobiko.shell.sh import _command
from tobiko.shell.sh import _exception
from tobiko.shell.sh import _execute
from tobiko.shell import ssh
LOG = log.getLogger(__name__)
class GetCommandLineError(tobiko.TobikoException):
message = "Unable to get process command line: {error}"
class GetCommandLineMismatch(GetCommandLineError):
message = ("Command line of process ({pid}) doesn't match its command "
"({command}): {command_line}")
@functools.lru_cache(typed=True)
def get_command_line(pid: int,
ssh_client: ssh.SSHClientType = None,
command: str = None,
_cache_id: int = None) \
-> _command.ShellCommand:
try:
output = _execute.execute(f'cat /proc/{pid}/cmdline',
ssh_client=ssh_client).stdout
except _exception.ShellCommandFailed as ex:
raise GetCommandLineError(error=ex.stderr) from ex
command_line = _command.ShellCommand(output.strip().split('\0')[:-1])
if not command_line:
raise GetCommandLineError(error="command line is empty")
if command is not None and os.path.basename(command_line[0]) != command:
raise GetCommandLineMismatch(pid=pid, command=command,
command_line=command_line)
return command_line

View File

@ -15,7 +15,6 @@
# under the License.
from __future__ import absolute_import
import collections
import re
import time
import typing
@ -23,10 +22,11 @@ import typing
from oslo_log import log
import tobiko
from tobiko.shell.sh import _cmdline
from tobiko.shell.sh import _command
from tobiko.shell.sh import _exception
from tobiko.shell.sh import _execute
from tobiko.shell.sh import _hostname
from tobiko.shell import ssh
LOG = log.getLogger(__name__)
@ -47,36 +47,75 @@ IS_KERNEL_RE = re.compile('^\\[.*\\]$')
_NOT_FOUND = object()
class PsProcess(collections.namedtuple('PsProcess', ['ssh_client',
'pid',
'command'])):
"""Process listed by ps command
"""
class PsProcessBase:
command: str
pid: int
ssh_client: ssh.SSHClientType
@property
def is_kernel(self):
def is_kernel(self) -> bool:
return IS_KERNEL_RE.match(self.command) is not None
@property
def command_line(self) -> typing.Optional[_command.ShellCommand]:
command_line = self.__dict__.get('_command_line', _NOT_FOUND)
if command_line is _NOT_FOUND:
command_line = None
try:
output = _execute.execute(f'cat /proc/{self.pid}/cmdline',
ssh_client=self.ssh_client).stdout
except _exception.ShellCommandFailed as ex:
LOG.error(f"Unable to get process command line: {ex.stderr}")
else:
line = _command.ShellCommand(output.strip().split('\0')[:-1])
if line[0] != self.command:
LOG.error(f"Command line of process {self.pid} "
"doesn't match its command "
f"({self.command}): {line}")
else:
command_line = line
self.__dict__['command_line'] = command_line
return command_line
try:
return _cmdline.get_command_line(command=self.command,
pid=self.pid,
ssh_client=self.ssh_client,
_cache_id=id(self))
except _cmdline.GetCommandLineError as ex:
LOG.error(str(ex))
return None
class PsProcessTuple(typing.NamedTuple):
"""Process listed by ps command
"""
command: str
pid: int
ssh_client: ssh.SSHClientType
class PsProcess(PsProcessTuple, PsProcessBase):
pass
P = typing.TypeVar('P', bound=PsProcessBase)
def select_processes(
processes: typing.Iterable[PsProcessBase],
command: str = None,
pid: int = None,
is_kernel: typing.Optional[bool] = False,
command_line: _command.ShellCommandType = None) \
-> tobiko.Selection[P]:
selection = tobiko.Selection[P](processes)
if selection and pid is not None:
# filter files by PID
selection = selection.with_attributes(pid=pid)
if selection and command_line is not None:
if command is None:
command = _command.shell_command(command_line)[0]
if selection and command is not None:
# filter processes by command
pattern = re.compile(command)
selection = selection.select(
lambda process: bool(pattern.match(str(process.command))))
if selection and is_kernel is not None:
# filter kernel processes
selection = selection.with_attributes(is_kernel=bool(is_kernel))
if selection and command_line is not None:
pattern = re.compile(str(command_line))
selection = selection.select(
lambda process: bool(pattern.match(str(process.command_line))))
return selection
def list_kernel_processes(**list_params):
@ -87,17 +126,15 @@ def list_all_processes(**list_params):
return list_processes(is_kernel=None, **list_params)
def list_processes(pid=None,
command: typing.Optional[str] = None,
is_kernel=False,
ssh_client=None,
command_line: typing.Optional[str] = None,
**execute_params) -> tobiko.Selection[PsProcess]:
"""Returns the number of seconds passed since last host reboot
def list_processes(
pid: int = None,
command: str = None,
is_kernel: typing.Optional[bool] = False,
ssh_client: ssh.SSHClientType = None,
command_line: _command.ShellCommandType = None,
**execute_params) -> tobiko.Selection[PsProcess]:
"""Returns list of running process
It reads and parses remote special file /proc/uptime and returns a floating
point value that represents the number of seconds passed since last host
reboot
"""
result = _execute.execute('ps -A', expect_exit_status=None,
ssh_client=ssh_client, **execute_params)
@ -111,33 +148,11 @@ def list_processes(pid=None,
schema=PS_TABLE_SCHEMA):
processes.append(PsProcess(ssh_client=ssh_client, **process_data))
if processes and pid:
# filter processes by PID
pid = int(pid)
assert pid > 0
processes = processes.with_attributes(pid=pid)
if processes and command is not None:
# filter processes by command
pattern = re.compile(command)
processes = tobiko.Selection[PsProcess](
process
for process in processes
if pattern.match(process.command))
if processes and is_kernel is not None:
# filter kernel processes
processes = processes.with_attributes(is_kernel=bool(is_kernel))
if processes and command_line is not None:
pattern = re.compile(command_line)
processes = tobiko.Selection[PsProcess](
process
for process in processes
if (process.command_line is not None and
pattern.match(f"{process.command_line}")))
return processes
return select_processes(processes,
pid=pid,
command=command,
is_kernel=is_kernel,
command_line=command_line)
def wait_for_processes(timeout=float('inf'), sleep_interval=5.,

View File

@ -19,39 +19,63 @@ from oslo_log import log
import testtools
import tobiko
from tobiko.openstack import stacks
from tobiko.openstack import topology
from tobiko.shell import sh
from tobiko.shell import ssh
LOG = log.getLogger(__name__)
class RebootHostTest(testtools.TestCase):
class LocalPsTest(testtools.TestCase):
@property
def ssh_client(self) -> ssh.SSHClientType:
return False
def test_list_processes(self):
processes = sh.list_processes()
processes = sh.list_processes(ssh_client=self.ssh_client)
self._check_processes(processes,
is_kernel=False)
def test_list_kernel_processes(self):
processes = sh.list_kernel_processes()
processes = sh.list_kernel_processes(ssh_client=self.ssh_client)
self._check_processes(processes=processes, is_kernel=True)
def test_list_all_processes(self):
processes = sh.list_all_processes()
processes = sh.list_all_processes(ssh_client=self.ssh_client)
self._check_processes(processes=processes, is_kernel=None)
def test_list_processes_with_pid(self):
processes = sh.list_processes()
processes_with_pid = sh.list_processes(pid=processes[0].pid)
processes = sh.list_processes(ssh_client=self.ssh_client)
processes_with_pid = sh.list_processes(pid=processes[0].pid,
ssh_client=self.ssh_client)
self.assertEqual(processes[:1], processes_with_pid)
def test_list_processes_with_command(self):
processes = sh.list_processes(command='systemd')
processes = sh.list_processes(command='systemd',
ssh_client=self.ssh_client)
for process in processes:
self.assertTrue(process.command.startswith('systemd'), process)
def test_list_processes_with_command_line(self):
cat_process = sh.process('cat -',
ssh_client=self.ssh_client).execute()
self.addCleanup(cat_process.kill)
processes = sh.list_processes(command_line='cat -',
ssh_client=self.ssh_client)
for process in processes:
self.assertTrue('cat', process.command)
self.assertEqual(('cat', '-'), process.command_line)
cat_process.kill()
sh.wait_for_processes(command_line='cat -',
timeout=30.,
ssh_client=self.ssh_client)
def test_list_processes_with_exact_command(self):
processes = sh.list_processes(command='^systemd$')
processes = sh.list_processes(command='^systemd$',
ssh_client=self.ssh_client)
self.assertEqual(processes.with_attributes(command='systemd'),
processes)
@ -69,12 +93,42 @@ class RebootHostTest(testtools.TestCase):
def test_wait_for_processes(self):
# assume the PID of the first execution of PS process is not more there
# at the second execution
process = sh.list_processes(command='ps')[-1]
sh.wait_for_processes(pid=process.pid, command='ps', timeout=30.)
process = sh.list_processes(command='ps',
ssh_client=self.ssh_client)[-1]
sh.wait_for_processes(pid=process.pid,
command='ps',
timeout=30.,
ssh_client=self.ssh_client)
def test_wait_for_processes_timeout(self):
# assume there are always to be running processes on host
ex = self.assertRaises(sh.PsWaitTimeout, sh.wait_for_processes,
timeout=3.)
timeout=3.,
ssh_client=self.ssh_client)
self.assertEqual(3., ex.timeout)
self.assertEqual(sh.get_hostname(), ex.hostname)
self.assertEqual(sh.get_hostname(ssh_client=self.ssh_client),
ex.hostname)
class CirrosPsTest(LocalPsTest):
stack = tobiko.required_setup_fixture(stacks.CirrosServerStackFixture)
@property
def ssh_client(self) -> ssh.SSHClientType:
return self.stack.ssh_client
class SSHPsTest(LocalPsTest):
@property
def ssh_client(self) -> ssh.SSHClientType:
ssh_client = ssh.ssh_proxy_client()
if isinstance(ssh_client, ssh.SSHClientFixture):
return ssh_client
nodes = topology.list_openstack_nodes()
for node in nodes:
if isinstance(node.ssh_client, ssh.SSHClientFixture):
return ssh_client
tobiko.skip_test('No such SSH server host to connect to')