diff --git a/tobiko/shell/sh/_ps.py b/tobiko/shell/sh/_ps.py index 25f9db796..f20d514e5 100644 --- a/tobiko/shell/sh/_ps.py +++ b/tobiko/shell/sh/_ps.py @@ -16,7 +16,6 @@ from __future__ import absolute_import import re -import time import typing from oslo_log import log @@ -51,6 +50,7 @@ class PsProcessBase: command: str pid: int ssh_client: ssh.SSHClientType + is_cirros: typing.Optional[bool] = None @property def is_kernel(self) -> bool: @@ -68,13 +68,23 @@ class PsProcessBase: return None def kill(self, signal: int = None, **execute_params): + execute_params.update(ssh_client=self.ssh_client) command_line = _command.shell_command("kill") if signal is not None: command_line += f"-s {signal}" command_line += str(self.pid) - _execute.execute(command_line, - ssh_client=self.ssh_client, - **execute_params) + _execute.execute(command_line, **execute_params) + + def wait(self, + timeout: tobiko.Seconds = None, + sleep_interval: tobiko.Seconds = None, + **execute_params): + execute_params.update(ssh_client=self.ssh_client) + wait_for_processes(timeout=timeout, + sleep_interval=sleep_interval, + is_cirros=self.is_cirros, + pid=self.pid, + **execute_params) class PsProcessTuple(typing.NamedTuple): @@ -83,6 +93,7 @@ class PsProcessTuple(typing.NamedTuple): command: str pid: int ssh_client: ssh.SSHClientType + is_cirros: typing.Optional[bool] = None class PsProcess(PsProcessTuple, PsProcessBase): @@ -141,21 +152,32 @@ def list_processes( is_kernel: typing.Optional[bool] = False, ssh_client: ssh.SSHClientType = None, command_line: _command.ShellCommandType = None, + is_cirros: bool = None, **execute_params) -> tobiko.Selection[PsProcess]: """Returns list of running process """ - result = _execute.execute('ps -A', expect_exit_status=None, - ssh_client=ssh_client, **execute_params) + ps_command = _command.shell_command('ps') + if pid is None or is_cirros in [True, None]: + ps_command += '-A' + else: + ps_command += f"-p {pid}" + + result = _execute.execute(ps_command, + expect_exit_status=None, + ssh_client=ssh_client, + **execute_params) output = result.stdout and result.stdout.strip() - if result.exit_status or not output: + if not output: raise PsError(error=result.stderr) # Extract a list of PsProcess instances from table body processes = tobiko.Selection[PsProcess]() for process_data in parse_table(lines=output.splitlines(), schema=PS_TABLE_SCHEMA): - processes.append(PsProcess(ssh_client=ssh_client, **process_data)) + processes.append(PsProcess(ssh_client=ssh_client, + is_cirros=is_cirros, + **process_data)) return select_processes(processes, pid=pid, @@ -164,28 +186,31 @@ def list_processes( command_line=command_line) -def wait_for_processes(timeout=float('inf'), sleep_interval=5., - ssh_client=None, **list_params): - start_time = time.time() - time_left = timeout - while True: - processes = list_processes(timeout=time_left, - ssh_client=ssh_client, +def wait_for_processes(timeout: tobiko.Seconds = None, + sleep_interval: tobiko.Seconds = None, + ssh_client: ssh.SSHClientType = None, + is_cirros: bool = None, + **list_params): + for attempt in tobiko.retry(timeout=timeout, + interval=sleep_interval, + default_interval=5.): + processes = list_processes(ssh_client=ssh_client, + is_cirros=is_cirros, **list_params) if not processes: break - time_left = timeout - (time.time() - start_time) - if time_left < sleep_interval: - hostname = _hostname.get_hostname(ssh_client=ssh_client) - process_lines = [ - ' {pid} {command}'.format(pid=process.pid, - command=process.command) - for process in processes] + hostname = _hostname.get_hostname(ssh_client=ssh_client) + process_lines = [ + ' {pid} {command}'.format(pid=process.pid, + command=process.command) + for process in processes] + + if attempt.is_last: raise PsWaitTimeout(timeout=timeout, hostname=hostname, processes='\n'.join(process_lines)) - - time.sleep(sleep_interval) + LOG.debug(f"Waiting for process(es) on host {hostname}...\n" + '\n'.join(process_lines)) def parse_pid(value): @@ -216,7 +241,7 @@ def parse_table(lines, schema, header_line=None): getters.append((position, getter)) for line in lines: - row = line.strip().split() + row = line.strip().split(maxsplit=len(column_names) - 1) if row: yield dict(getter(row[position]) for position, getter in getters) diff --git a/tobiko/tests/functional/shell/test_ps.py b/tobiko/tests/functional/shell/test_ps.py index b80c2dec9..def00cc55 100644 --- a/tobiko/tests/functional/shell/test_ps.py +++ b/tobiko/tests/functional/shell/test_ps.py @@ -15,6 +15,8 @@ # under the License. from __future__ import absolute_import +import typing + from oslo_log import log import testtools @@ -34,28 +36,35 @@ class LocalPsTest(testtools.TestCase): def ssh_client(self) -> ssh.SSHClientType: return False + is_cirros = False + + @property + def parameters(self) -> typing.Dict[str, typing.Any]: + return dict(is_cirros=self.is_cirros, + ssh_client=self.ssh_client) + def test_list_processes(self): - processes = sh.list_processes(ssh_client=self.ssh_client) + processes = sh.list_processes(**self.parameters) self._check_processes(processes, is_kernel=False) def test_list_kernel_processes(self): - processes = sh.list_kernel_processes(ssh_client=self.ssh_client) + processes = sh.list_kernel_processes(**self.parameters) self._check_processes(processes=processes, is_kernel=True) def test_list_all_processes(self): - processes = sh.list_all_processes(ssh_client=self.ssh_client) + processes = sh.list_all_processes(**self.parameters) self._check_processes(processes=processes, is_kernel=None) def test_list_processes_with_pid(self): - processes = sh.list_processes(ssh_client=self.ssh_client) + processes = sh.list_processes(**self.parameters) processes_with_pid = sh.list_processes(pid=processes[0].pid, - ssh_client=self.ssh_client) + **self.parameters) self.assertEqual(processes[:1], processes_with_pid) def test_list_processes_with_command(self): processes = sh.list_processes(command='systemd', - ssh_client=self.ssh_client) + **self.parameters) for process in processes: self.assertTrue(process.command.startswith('systemd'), process) @@ -64,18 +73,18 @@ class LocalPsTest(testtools.TestCase): ssh_client=self.ssh_client).execute() self.addCleanup(cat_process.kill) processes = sh.list_processes(command_line='cat -', - ssh_client=self.ssh_client) + **self.parameters) for process in processes: self.assertEqual('cat', process.command) self.assertEqual(('cat', '-'), process.command_line) cat_process.kill() sh.wait_for_processes(command_line='cat -', timeout=30., - ssh_client=self.ssh_client) + **self.parameters) def test_list_processes_with_exact_command(self): processes = sh.list_processes(command='^systemd$', - ssh_client=self.ssh_client) + **self.parameters) self.assertEqual(processes.with_attributes(command='systemd'), processes) @@ -94,17 +103,35 @@ class LocalPsTest(testtools.TestCase): # assume the PID of the first execution of PS process is not more there # at the second execution process = sh.list_processes(command='ps', - ssh_client=self.ssh_client)[-1] + **self.parameters).first sh.wait_for_processes(pid=process.pid, command='ps', timeout=30., - ssh_client=self.ssh_client) + **self.parameters) def test_wait_for_processes_timeout(self): # assume there are always to be running processes on host - ex = self.assertRaises(sh.PsWaitTimeout, sh.wait_for_processes, + ex = self.assertRaises(sh.PsWaitTimeout, + sh.wait_for_processes, + pid=1, timeout=3., - ssh_client=self.ssh_client) + **self.parameters) + self.assertEqual(3., ex.timeout) + self.assertEqual(sh.get_hostname(ssh_client=self.ssh_client), + ex.hostname) + + def test_wait(self): + process = sh.list_processes(command='ps', + **self.parameters).first + process.wait() + + def test_wait_with_timeout(self): + # assume there are always to be running processes on host + process = sh.list_processes(pid=1, + **self.parameters).unique + ex = self.assertRaises(sh.PsWaitTimeout, + process.wait, + timeout=3.) self.assertEqual(3., ex.timeout) self.assertEqual(sh.get_hostname(ssh_client=self.ssh_client), ex.hostname) @@ -112,6 +139,7 @@ class LocalPsTest(testtools.TestCase): class CirrosPsTest(LocalPsTest): + is_cirros = True stack = tobiko.required_fixture(stacks.CirrosServerStackFixture) @property