Add wait method to PsProcess class

Change-Id: I66cb378b358eb4074e023327440cac1775c12f1a
This commit is contained in:
Federico Ressi 2022-02-17 06:19:02 +01:00
parent 462b4b5f2e
commit 6fe739fde7
2 changed files with 91 additions and 38 deletions

View File

@ -16,7 +16,6 @@
from __future__ import absolute_import from __future__ import absolute_import
import re import re
import time
import typing import typing
from oslo_log import log from oslo_log import log
@ -51,6 +50,7 @@ class PsProcessBase:
command: str command: str
pid: int pid: int
ssh_client: ssh.SSHClientType ssh_client: ssh.SSHClientType
is_cirros: typing.Optional[bool] = None
@property @property
def is_kernel(self) -> bool: def is_kernel(self) -> bool:
@ -68,13 +68,23 @@ class PsProcessBase:
return None return None
def kill(self, signal: int = None, **execute_params): def kill(self, signal: int = None, **execute_params):
execute_params.update(ssh_client=self.ssh_client)
command_line = _command.shell_command("kill") command_line = _command.shell_command("kill")
if signal is not None: if signal is not None:
command_line += f"-s {signal}" command_line += f"-s {signal}"
command_line += str(self.pid) command_line += str(self.pid)
_execute.execute(command_line, _execute.execute(command_line, **execute_params)
ssh_client=self.ssh_client,
**execute_params) def wait(self,
timeout: tobiko.Seconds = None,
sleep_interval: tobiko.Seconds = None,
**execute_params):
execute_params.update(ssh_client=self.ssh_client)
wait_for_processes(timeout=timeout,
sleep_interval=sleep_interval,
is_cirros=self.is_cirros,
pid=self.pid,
**execute_params)
class PsProcessTuple(typing.NamedTuple): class PsProcessTuple(typing.NamedTuple):
@ -83,6 +93,7 @@ class PsProcessTuple(typing.NamedTuple):
command: str command: str
pid: int pid: int
ssh_client: ssh.SSHClientType ssh_client: ssh.SSHClientType
is_cirros: typing.Optional[bool] = None
class PsProcess(PsProcessTuple, PsProcessBase): class PsProcess(PsProcessTuple, PsProcessBase):
@ -141,21 +152,32 @@ def list_processes(
is_kernel: typing.Optional[bool] = False, is_kernel: typing.Optional[bool] = False,
ssh_client: ssh.SSHClientType = None, ssh_client: ssh.SSHClientType = None,
command_line: _command.ShellCommandType = None, command_line: _command.ShellCommandType = None,
is_cirros: bool = None,
**execute_params) -> tobiko.Selection[PsProcess]: **execute_params) -> tobiko.Selection[PsProcess]:
"""Returns list of running process """Returns list of running process
""" """
result = _execute.execute('ps -A', expect_exit_status=None, ps_command = _command.shell_command('ps')
ssh_client=ssh_client, **execute_params) if pid is None or is_cirros in [True, None]:
ps_command += '-A'
else:
ps_command += f"-p {pid}"
result = _execute.execute(ps_command,
expect_exit_status=None,
ssh_client=ssh_client,
**execute_params)
output = result.stdout and result.stdout.strip() output = result.stdout and result.stdout.strip()
if result.exit_status or not output: if not output:
raise PsError(error=result.stderr) raise PsError(error=result.stderr)
# Extract a list of PsProcess instances from table body # Extract a list of PsProcess instances from table body
processes = tobiko.Selection[PsProcess]() processes = tobiko.Selection[PsProcess]()
for process_data in parse_table(lines=output.splitlines(), for process_data in parse_table(lines=output.splitlines(),
schema=PS_TABLE_SCHEMA): schema=PS_TABLE_SCHEMA):
processes.append(PsProcess(ssh_client=ssh_client, **process_data)) processes.append(PsProcess(ssh_client=ssh_client,
is_cirros=is_cirros,
**process_data))
return select_processes(processes, return select_processes(processes,
pid=pid, pid=pid,
@ -164,28 +186,31 @@ def list_processes(
command_line=command_line) command_line=command_line)
def wait_for_processes(timeout=float('inf'), sleep_interval=5., def wait_for_processes(timeout: tobiko.Seconds = None,
ssh_client=None, **list_params): sleep_interval: tobiko.Seconds = None,
start_time = time.time() ssh_client: ssh.SSHClientType = None,
time_left = timeout is_cirros: bool = None,
while True: **list_params):
processes = list_processes(timeout=time_left, for attempt in tobiko.retry(timeout=timeout,
ssh_client=ssh_client, interval=sleep_interval,
default_interval=5.):
processes = list_processes(ssh_client=ssh_client,
is_cirros=is_cirros,
**list_params) **list_params)
if not processes: if not processes:
break break
time_left = timeout - (time.time() - start_time) hostname = _hostname.get_hostname(ssh_client=ssh_client)
if time_left < sleep_interval: process_lines = [
hostname = _hostname.get_hostname(ssh_client=ssh_client) ' {pid} {command}'.format(pid=process.pid,
process_lines = [ command=process.command)
' {pid} {command}'.format(pid=process.pid, for process in processes]
command=process.command)
for process in processes] if attempt.is_last:
raise PsWaitTimeout(timeout=timeout, hostname=hostname, raise PsWaitTimeout(timeout=timeout, hostname=hostname,
processes='\n'.join(process_lines)) processes='\n'.join(process_lines))
LOG.debug(f"Waiting for process(es) on host {hostname}...\n"
time.sleep(sleep_interval) '\n'.join(process_lines))
def parse_pid(value): def parse_pid(value):
@ -216,7 +241,7 @@ def parse_table(lines, schema, header_line=None):
getters.append((position, getter)) getters.append((position, getter))
for line in lines: for line in lines:
row = line.strip().split() row = line.strip().split(maxsplit=len(column_names) - 1)
if row: if row:
yield dict(getter(row[position]) yield dict(getter(row[position])
for position, getter in getters) for position, getter in getters)

View File

@ -15,6 +15,8 @@
# under the License. # under the License.
from __future__ import absolute_import from __future__ import absolute_import
import typing
from oslo_log import log from oslo_log import log
import testtools import testtools
@ -34,28 +36,35 @@ class LocalPsTest(testtools.TestCase):
def ssh_client(self) -> ssh.SSHClientType: def ssh_client(self) -> ssh.SSHClientType:
return False return False
is_cirros = False
@property
def parameters(self) -> typing.Dict[str, typing.Any]:
return dict(is_cirros=self.is_cirros,
ssh_client=self.ssh_client)
def test_list_processes(self): def test_list_processes(self):
processes = sh.list_processes(ssh_client=self.ssh_client) processes = sh.list_processes(**self.parameters)
self._check_processes(processes, self._check_processes(processes,
is_kernel=False) is_kernel=False)
def test_list_kernel_processes(self): def test_list_kernel_processes(self):
processes = sh.list_kernel_processes(ssh_client=self.ssh_client) processes = sh.list_kernel_processes(**self.parameters)
self._check_processes(processes=processes, is_kernel=True) self._check_processes(processes=processes, is_kernel=True)
def test_list_all_processes(self): def test_list_all_processes(self):
processes = sh.list_all_processes(ssh_client=self.ssh_client) processes = sh.list_all_processes(**self.parameters)
self._check_processes(processes=processes, is_kernel=None) self._check_processes(processes=processes, is_kernel=None)
def test_list_processes_with_pid(self): def test_list_processes_with_pid(self):
processes = sh.list_processes(ssh_client=self.ssh_client) processes = sh.list_processes(**self.parameters)
processes_with_pid = sh.list_processes(pid=processes[0].pid, processes_with_pid = sh.list_processes(pid=processes[0].pid,
ssh_client=self.ssh_client) **self.parameters)
self.assertEqual(processes[:1], processes_with_pid) self.assertEqual(processes[:1], processes_with_pid)
def test_list_processes_with_command(self): def test_list_processes_with_command(self):
processes = sh.list_processes(command='systemd', processes = sh.list_processes(command='systemd',
ssh_client=self.ssh_client) **self.parameters)
for process in processes: for process in processes:
self.assertTrue(process.command.startswith('systemd'), process) self.assertTrue(process.command.startswith('systemd'), process)
@ -64,18 +73,18 @@ class LocalPsTest(testtools.TestCase):
ssh_client=self.ssh_client).execute() ssh_client=self.ssh_client).execute()
self.addCleanup(cat_process.kill) self.addCleanup(cat_process.kill)
processes = sh.list_processes(command_line='cat -', processes = sh.list_processes(command_line='cat -',
ssh_client=self.ssh_client) **self.parameters)
for process in processes: for process in processes:
self.assertEqual('cat', process.command) self.assertEqual('cat', process.command)
self.assertEqual(('cat', '-'), process.command_line) self.assertEqual(('cat', '-'), process.command_line)
cat_process.kill() cat_process.kill()
sh.wait_for_processes(command_line='cat -', sh.wait_for_processes(command_line='cat -',
timeout=30., timeout=30.,
ssh_client=self.ssh_client) **self.parameters)
def test_list_processes_with_exact_command(self): def test_list_processes_with_exact_command(self):
processes = sh.list_processes(command='^systemd$', processes = sh.list_processes(command='^systemd$',
ssh_client=self.ssh_client) **self.parameters)
self.assertEqual(processes.with_attributes(command='systemd'), self.assertEqual(processes.with_attributes(command='systemd'),
processes) processes)
@ -94,17 +103,35 @@ class LocalPsTest(testtools.TestCase):
# assume the PID of the first execution of PS process is not more there # assume the PID of the first execution of PS process is not more there
# at the second execution # at the second execution
process = sh.list_processes(command='ps', process = sh.list_processes(command='ps',
ssh_client=self.ssh_client)[-1] **self.parameters).first
sh.wait_for_processes(pid=process.pid, sh.wait_for_processes(pid=process.pid,
command='ps', command='ps',
timeout=30., timeout=30.,
ssh_client=self.ssh_client) **self.parameters)
def test_wait_for_processes_timeout(self): def test_wait_for_processes_timeout(self):
# assume there are always to be running processes on host # assume there are always to be running processes on host
ex = self.assertRaises(sh.PsWaitTimeout, sh.wait_for_processes, ex = self.assertRaises(sh.PsWaitTimeout,
sh.wait_for_processes,
pid=1,
timeout=3., timeout=3.,
ssh_client=self.ssh_client) **self.parameters)
self.assertEqual(3., ex.timeout)
self.assertEqual(sh.get_hostname(ssh_client=self.ssh_client),
ex.hostname)
def test_wait(self):
process = sh.list_processes(command='ps',
**self.parameters).first
process.wait()
def test_wait_with_timeout(self):
# assume there are always to be running processes on host
process = sh.list_processes(pid=1,
**self.parameters).unique
ex = self.assertRaises(sh.PsWaitTimeout,
process.wait,
timeout=3.)
self.assertEqual(3., ex.timeout) self.assertEqual(3., ex.timeout)
self.assertEqual(sh.get_hostname(ssh_client=self.ssh_client), self.assertEqual(sh.get_hostname(ssh_client=self.ssh_client),
ex.hostname) ex.hostname)
@ -112,6 +139,7 @@ class LocalPsTest(testtools.TestCase):
class CirrosPsTest(LocalPsTest): class CirrosPsTest(LocalPsTest):
is_cirros = True
stack = tobiko.required_fixture(stacks.CirrosServerStackFixture) stack = tobiko.required_fixture(stacks.CirrosServerStackFixture)
@property @property