Create reboot host operation

Filter listed processes using regular expressions so
that a test can wait for the init process to terminate
before rebooting the remote VM

Change-Id: I2f0736a64acb7868494740dbcf339bfaf59f1736
This commit is contained in:
Federico Ressi 2020-02-05 10:39:32 +01:00
parent 087cadb617
commit 60bb46639d
7 changed files with 389 additions and 108 deletions

View File

@ -22,6 +22,7 @@ from tobiko.shell.sh import _hostname
from tobiko.shell.sh import _io
from tobiko.shell.sh import _local
from tobiko.shell.sh import _process
from tobiko.shell.sh import _ps
from tobiko.shell.sh import _reboot
from tobiko.shell.sh import _ssh
from tobiko.shell.sh import _uptime
@ -53,7 +54,15 @@ LocalExecutePathFixture = _local.LocalExecutePathFixture
process = _process.process
ShellProcessFixture = _process.ShellProcessFixture
PsError = _ps.PsError
PsWaitTimeout = _ps.PsWaitTimeout
list_all_processes = _ps.list_all_processes
list_kernel_processes = _ps.list_kernel_processes
list_processes = _ps.list_processes
wait_for_processes = _ps.wait_for_processes
reboot_host = _reboot.reboot_host
RebootHostOperation = _reboot.RebootHostOperation
ssh_process = _ssh.ssh_process
ssh_execute = _ssh.ssh_execute

153
tobiko/shell/sh/_ps.py Normal file
View File

@ -0,0 +1,153 @@
# Copyright (c) 2020 Red Hat, Inc.
#
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import
import collections
import re
import time
import tobiko
from tobiko.shell.sh import _execute
from tobiko.shell.sh import _hostname
class PsError(tobiko.TobikoException):
    """Error raised when the process list can't be obtained from a host

    The {error} placeholder of the message receives the details of the
    failure.
    """

    message = "Unable to list processes from host: {error}"
class PsWaitTimeout(PsError):
    """Error raised when processes are still running once the wait expires

    The message reports the host name, the timeout and one line for every
    process that was still listed.
    """

    message = ("Process(es) still running on host {hostname!r} after "
               "{timeout} seconds:\n{processes!s}")
# Matches a command that is entirely wrapped in square brackets (for example
# '[kthreadd]'); PsProcess.is_kernel relies on it.
IS_KERNEL_RE = re.compile(r'^\[.*\]$')
class PsProcess(collections.namedtuple(
        'PsProcess', ['ssh_client', 'pid', 'command'])):
    """One row of the process table printed by the ps command

    Fields:
      ssh_client: the SSH client the process was listed with (as received
        by the function that built this entry)
      pid: process identifier
      command: text of the command column
    """

    @property
    def is_kernel(self):
        """True when the whole command is enclosed in square brackets

        This is the criterion this module uses to tell kernel processes
        apart from the other ones.
        """
        bracketed = IS_KERNEL_RE.match(self.command)
        return bracketed is not None
def list_kernel_processes(**list_params):
    """List only the kernel processes

    Shortcut for list_processes(is_kernel=True, ...): every keyword argument
    is forwarded as it is.
    """
    kernel_processes = list_processes(is_kernel=True, **list_params)
    return kernel_processes
def list_all_processes(**list_params):
    """List every process, kernel ones included

    Shortcut for list_processes(is_kernel=None, ...), which disables the
    kernel / non-kernel filter; every keyword argument is forwarded as it is.
    """
    all_processes = list_processes(is_kernel=None, **list_params)
    return all_processes
def list_processes(pid=None, command=None, is_kernel=False, ssh_client=None,
                   **execute_params):
    """Lists the processes running on a host

    It executes 'ps -A', parses its output table and returns the entries
    that pass all the requested filters.

    Parameters:
      pid: when given (and not zero) only the process with this PID is kept;
        it must be convertible to a positive integer
      command: regular expression applied with re.match (therefore anchored
        at the beginning of the command); None disables the filter
      is_kernel: True keeps only kernel processes, False (the default) keeps
        only the other ones, None disables the filter
      ssh_client: passed to the command execution and stored in every
        returned PsProcess
      execute_params: extra keyword arguments passed to the command execution

    Returns a tobiko.Selection of PsProcess instances (it can be empty).

    Raises PsError when ps exits with a non-zero status or prints nothing.
    """
    # expect_exit_status=None: the exit status is checked here below so that
    # a failure is reported as PsError
    result = _execute.execute('ps -A', expect_exit_status=None,
                              ssh_client=ssh_client, **execute_params)
    output = result.stdout and result.stdout.strip()
    if result.exit_status or not output:
        raise PsError(error=result.stderr)

    # Extract a list of PsProcess instances from table body
    processes = tobiko.Selection()
    for process_data in parse_table(lines=output.splitlines(),
                                    schema=PS_TABLE_SCHEMA):
        processes.append(PsProcess(ssh_client=ssh_client, **process_data))

    if processes and pid:
        # filter processes by PID
        pid = int(pid)
        assert pid > 0
        processes = processes.with_attributes(pid=pid)

    if processes and command is not None:
        # filter processes by command
        command = re.compile(command)
        processes = tobiko.select(process
                                  for process in processes
                                  if command.match(process.command))

    if processes and is_kernel is not None:
        # filter kernel processes
        processes = processes.with_attributes(is_kernel=bool(is_kernel))

    return processes
def wait_for_processes(timeout=float('inf'), sleep_interval=5.,
                       ssh_client=None, **list_params):
    """Waits until no process matches the given filters

    It polls list_processes() every sleep_interval seconds and returns as
    soon as the listing is empty. The time still available is passed to
    list_processes() as its timeout.

    Raises PsWaitTimeout when some process is still listed and the time left
    is shorter than sleep_interval.
    """
    started_at = time.time()
    remaining = timeout
    while True:
        running = list_processes(timeout=remaining,
                                 ssh_client=ssh_client,
                                 **list_params)
        if not running:
            return

        remaining = timeout - (time.time() - started_at)
        if remaining < sleep_interval:
            # Not enough time for another polling round: give up reporting
            # the processes that are still there
            hostname = _hostname.get_hostname(ssh_client=ssh_client)
            summary = '\n'.join(
                ' {pid} {command}'.format(pid=process.pid,
                                          command=process.command)
                for process in running)
            raise PsWaitTimeout(timeout=timeout, hostname=hostname,
                                processes=summary)

        time.sleep(sleep_interval)
def parse_pid(value):
    """Converts a PID table cell to a ('pid', <int>) pair"""
    pid = int(value)
    return ('pid', pid)
def parse_command(value):
    """Converts a command table cell to a ('command', <str>) pair"""
    command = str(value)
    return ('command', command)
# Maps the lower-cased column names of the ps table header to the function
# that parses the cells of that column. Every parser returns a
# (field name, value) pair, so both the 'cmd' and the 'command' headers end up
# in the 'command' field. Columns that are not listed here are ignored by
# parse_table().
PS_TABLE_SCHEMA = {
'pid': parse_pid,
'cmd': parse_command,
'command': parse_command,
}
def parse_table(lines, schema, header_line=None):
    """Parses a table made of white-space separated columns

    Parameters:
      lines: iterable of text lines; unless header_line is given the first
        non-blank line is taken as the table header
      schema: dictionary that maps lower-cased column names to a function
        that receives the text of a cell and returns a (key, value) pair;
        columns that are not in the schema are ignored
      header_line: optional header, to be used when lines contains only
        the table body

    It generates one dictionary for every non-blank body line, built from
    the pairs returned by the schema functions. When no header can be found
    nothing is generated.
    """
    lines = iter(lines)
    header_line = (header_line or '').strip()
    while not header_line:
        # Look for the first non-blank line. StopIteration must not escape
        # from here: inside a generator Python 3.7+ would convert it to
        # a RuntimeError (PEP 479)
        try:
            header_line = next(lines).strip()
        except StopIteration:
            return

    getters = []
    for position, name in enumerate(header_line.lower().split()):
        getter = schema.get(name)
        if getter:
            getters.append((position, getter))

    for line in lines:
        row = line.split()
        if row:
            yield dict(getter(row[position])
                       for position, getter in getters)

View File

@ -1,4 +1,4 @@
# Copyright 2019 Red Hat
# Copyright 2020 Red Hat
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@ -19,8 +19,8 @@ from oslo_log import log
import tobiko
from tobiko.shell.sh import _execute
from tobiko.shell.sh import _hostname
from tobiko.shell.sh import _uptime
from tobiko.shell import ssh
LOG = log.getLogger(__name__)
@ -30,105 +30,122 @@ class RebootHostTimeoutError(tobiko.TobikoException):
message = "host {hostname!r} not rebooted after {timeout!s} seconds"
def reboot_host(ssh_client, wait=True, timeout=None, sleep_interval=None,
retry_interval=None):
"""Gracefully reboots a remote host using an SSH client
Given an SSH client to a remote host it executes /sbin/reboot command
and then it start polling for remote host uptime value to make sure
the node is actually rebooted before a given timeout.
"""
with ssh_client:
hostname = _hostname.get_hostname(ssh_client=ssh_client,
timeout=timeout)
LOG.debug('Rebooting host %r...', hostname)
_execute.execute('sudo /sbin/reboot', timeout=timeout, stdout=False,
ssh_client=ssh_client)
if wait:
if timeout is None:
timeout = 300.
if sleep_interval is None:
sleep_interval = 1.
if retry_interval is None:
retry_interval = 100.
else:
retry_interval = max(retry_interval, 5.)
start_time = time.time()
elapsed_time = 0.
retry_time = retry_interval
while True:
try:
_wait_for_host_rebooted(ssh_client=ssh_client,
hostname=hostname,
start_time=start_time,
timeout=min(retry_time, timeout),
sleep_interval=sleep_interval)
break
except RebootHostTimeoutError:
elapsed_time = time.time() - start_time
if elapsed_time >= timeout:
raise
LOG.debug("Retrying rebooting host %r %s seconds after "
"reboot...", hostname, elapsed_time)
with ssh_client:
_execute.execute('sudo /sbin/reboot', timeout=(
timeout - elapsed_time), ssh_client=ssh_client)
elapsed_time = time.time() - start_time
retry_time = elapsed_time + retry_interval
def reboot_host(ssh_client, wait=True, timeout=None, sleep_interval=None):
reboot = RebootHostOperation(ssh_client=ssh_client,
wait=wait,
timeout=timeout,
sleep_interval=sleep_interval)
return tobiko.setup_fixture(reboot)
def _wait_for_host_rebooted(ssh_client, hostname, start_time, timeout,
sleep_interval):
while not _is_host_rebooted(ssh_client=ssh_client,
hostname=hostname,
start_time=start_time,
timeout=timeout):
if sleep_interval > 0.:
time.sleep(sleep_interval)
class RebootHostOperation(tobiko.Operation):
wait = True
start_time = None
hostname = None
timeout = 600.
ssh_client = None
sleep_interval = 1.
is_rebooted = False
def _is_host_rebooted(ssh_client, hostname, start_time, timeout):
# ensure SSH connection is closed before retrying connecting
tobiko.cleanup_fixture(ssh_client)
assert ssh_client.client is None
def __init__(self, ssh_client=None, timeout=None, wait=None,
sleep_interval=None):
super(RebootHostOperation, self).__init__()
if ssh_client:
self.ssh_client = ssh_client
tobiko.check_valid_type(self.ssh_client, ssh.SSHClientFixture)
elapsed_time = time.time() - start_time
if elapsed_time >= timeout:
raise RebootHostTimeoutError(hostname=hostname,
timeout=timeout)
if timeout is not None:
self.timeout = float(timeout)
assert self.timeout > 0.
LOG.debug("Reconnecting to host %r %s seconds after reboot...",
hostname, elapsed_time)
try:
uptime = _uptime.get_uptime(ssh_client=ssh_client,
timeout=(timeout-elapsed_time))
except Exception as ex:
# if disconnected while getting uptime we assume the VM is just
# rebooting. These are good news!
if wait is not None:
self.wait = bool(wait)
if sleep_interval is not None:
self.sleep_interval = float(sleep_interval)
assert self.sleep_interval >= 0.
def run_operation(self):
self.start_time = time.time()
ssh_client = self.ssh_client
with ssh_client:
self.is_rebooted = False
self.hostname = hostname = ssh_client.hostname
LOG.debug('Rebooting host %r...', hostname)
_execute.execute('sudo /sbin/reboot', timeout=self.timeout,
stdout=False, ssh_client=ssh_client)
if self.wait:
self.wait_for_operation()
def cleanup_fixture(self):
if self.hostname is not None:
del self.hostname
if self.start_time is not None:
del self.start_time
self.is_rebooted = False
def wait_for_operation(self):
sleep_interval = self.sleep_interval
while not self.check_is_rebooted():
if sleep_interval > 0.:
time.sleep(sleep_interval)
def check_is_rebooted(self):
if self.is_rebooted:
return True
# ensure SSH connection is closed before retrying connecting
ssh_client = self.ssh_client
tobiko.cleanup_fixture(ssh_client)
assert ssh_client.client is None
elapsed_time = time.time() - start_time
LOG.debug("Unable to get uptime from %r host after %r "
"seconds: %s", hostname, elapsed_time, ex)
return False
# verify that reboot actually happened by comparing elapsed time with
# uptime
elapsed_time = time.time() - start_time
if uptime >= elapsed_time:
tobiko.cleanup_fixture(ssh_client)
assert ssh_client.client is None
LOG.warning("Host %r still not rebooted after %s seconds after reboot "
"(uptime=%r)", hostname, elapsed_time, uptime)
return False
elapsed_time = self.check_elapsed_time()
LOG.debug("Reconnecting to host %r %s seconds after reboot...",
self.hostname, elapsed_time)
if elapsed_time is None:
raise RuntimeError("Reboot operation didn't started")
LOG.debug("Reconnected to host %r %s seconds after reboot "
"(uptime=%r)", hostname, elapsed_time, uptime)
assert ssh_client.client is not None
return True
try:
uptime = _uptime.get_uptime(ssh_client=ssh_client,
timeout=(self.timeout-elapsed_time))
except Exception:
# if disconnected while getting uptime we assume the VM is just
# rebooting. These are good news!
tobiko.cleanup_fixture(ssh_client)
assert ssh_client.client is None
LOG.debug("Unable to get uptime from host %r", self.hostname,
exc_info=1)
return False
# verify that reboot actually happened by comparing elapsed time with
# uptime
elapsed_time = self.get_elapsed_time()
if uptime >= elapsed_time:
tobiko.cleanup_fixture(ssh_client)
assert ssh_client.client is None
LOG.warning("Host %r still not restarted %s seconds after "
"reboot operation (uptime=%r)", self.hostname,
elapsed_time, uptime)
return False
self.is_rebooted = True
LOG.debug("Host %r resterted %s seconds after reboot operation"
"(uptime=%r)", self.hostname, elapsed_time - uptime, uptime)
assert ssh_client.client is not None
return True
def check_elapsed_time(self):
elapsed_time = self.get_elapsed_time()
if elapsed_time is None:
return None
if elapsed_time >= self.timeout:
raise RebootHostTimeoutError(hostname=self.hostname,
timeout=self.timeout)
return elapsed_time
def get_elapsed_time(self):
start_time = self.start_time
if start_time is None:
return None
return time.time() - start_time

View File

@ -1,4 +1,4 @@
# Copyright (c) 2019 Red Hat, Inc.
# Copyright (c) 2020 Red Hat, Inc.
#
# All Rights Reserved.
#

View File

@ -353,6 +353,11 @@ class SSHClientFixture(tobiko.SharedFixture):
username=parameters['username'],
port=parameters['port'])
@property
def hostname(self):
    """Host name taken from the 'hostname' SSH connection parameter"""
    return self.setup_connect_parameters()['hostname']
UNDEFINED_CLIENT = 'UNDEFINED_CLIENT'

View File

@ -0,0 +1,80 @@
# Copyright (c) 2020 Red Hat, Inc.
#
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import
from oslo_log import log
import testtools
import tobiko
from tobiko.shell import sh
LOG = log.getLogger(__name__)
class RebootHostTest(testtools.TestCase):
    """Functional tests for the process listing helpers of tobiko.shell.sh

    No SSH client is passed to the helpers by any of these tests.
    """

    def test_list_processes(self):
        # by default kernel processes are filtered out
        self._check_processes(sh.list_processes(), is_kernel=False)

    def test_list_kernel_processes(self):
        kernel_processes = sh.list_kernel_processes()
        self._check_processes(processes=kernel_processes, is_kernel=True)

    def test_list_all_processes(self):
        all_processes = sh.list_all_processes()
        self._check_processes(processes=all_processes, is_kernel=None)

    def test_list_processes_with_pid(self):
        # filtering by the PID of the first listed process must select that
        # process only
        listed = sh.list_processes()
        selected = sh.list_processes(pid=listed[0].pid)
        self.assertEqual(listed[:1], selected)

    def test_list_processes_with_command(self):
        # the regular expression is matched from the beginning of the command
        for process in sh.list_processes(command='systemd'):
            self.assertTrue(process.command.startswith('systemd'), process)

    def test_list_processes_with_exact_command(self):
        selected = sh.list_processes(command='^systemd$')
        self.assertEqual(selected.with_attributes(command='systemd'),
                         selected)

    def _check_processes(self, processes, is_kernel):
        # Common checks: valid PIDs, is_kernel consistent with the brackets
        # around the command and, when requested, with the expected value
        self.assertIsInstance(processes, tobiko.Selection)
        for process in processes:
            self.assertGreater(process.pid, 0)
            command = process.command
            in_brackets = command.startswith('[') and command.endswith(']')
            self.assertIs(in_brackets, process.is_kernel)
            if is_kernel is not None:
                self.assertIs(bool(is_kernel), process.is_kernel)

    def test_wait_for_processes(self):
        # assume the ps process listed by the first execution of ps is no
        # longer there at the second execution
        last_ps = sh.list_processes(command='ps')[-1]
        sh.wait_for_processes(pid=last_ps.pid, command='ps', timeout=30.)

    def test_wait_for_processes_timeout(self):
        # assume there are always some processes running on the host
        ex = self.assertRaises(sh.PsWaitTimeout, sh.wait_for_processes,
                               timeout=3.)
        self.assertEqual(3., ex.timeout)
        self.assertEqual(sh.get_hostname(), ex.hostname)

View File

@ -1,4 +1,4 @@
# Copyright (c) 2019 Red Hat, Inc.
# Copyright (c) 2020 Red Hat, Inc.
#
# All Rights Reserved.
#
@ -47,21 +47,38 @@ class RebootHostTest(testtools.TestCase):
"uptime=%r", uptime_0)
boottime_0 = time.time() - uptime_0
sh.reboot_host(ssh_client=ssh_client, **params)
# Wait for CirrOS init script to terminate before rebooting the VM
sh.wait_for_processes(command='^{.*}',
ssh_client=ssh_client,
timeout=90.)
reboot = sh.reboot_host(ssh_client=ssh_client, **params)
self.assertIs(ssh_client, reboot.ssh_client)
self.assertEqual(ssh_client.hostname, reboot.hostname)
self.assertGreater(reboot.start_time, 0.)
self.assertEqual(params.get('timeout', sh.RebootHostOperation.timeout),
reboot.timeout)
self.assertIs(params.get('wait', True), reboot.wait)
self.assertEqual(params.get('sleep_interval', 1.),
reboot.sleep_interval)
if not reboot.wait:
self.assertFalse(reboot.is_rebooted)
self.assert_is_not_connected(ssh_client)
reboot.wait_for_operation()
self.assertTrue(reboot.is_rebooted)
self.assert_is_connected(ssh_client)
server = nova.wait_for_server_status(server, 'ACTIVE')
self.assertEqual('ACTIVE', server.status)
wait = params.get('wait', True)
if wait:
self.assert_is_connected(ssh_client)
uptime_1 = sh.get_uptime(ssh_client=ssh_client)
boottime_1 = time.time() - uptime_1
LOG.debug("Reboot operation executed on remote host: "
"uptime=%r", uptime_1)
self.assertGreater(boottime_1, boottime_0)
else:
self.assert_is_not_connected(ssh_client)
uptime_1 = sh.get_uptime(ssh_client=ssh_client)
boottime_1 = time.time() - uptime_1
LOG.debug("Reboot operation executed on remote host: "
"uptime=%r", uptime_1)
self.assertGreater(boottime_1, boottime_0)
def test_reboot_host_with_wait(self):
self.test_reboot_host(wait=True)
@ -79,7 +96,7 @@ class RebootHostTest(testtools.TestCase):
server = nova.shutoff_server(self.stack.server_id)
self.assertEqual('SHUTOFF', server.status)
self.assertRaises(sh.HostNameError, sh.reboot_host,
self.assertRaises(sh.ShellTimeoutExpired, sh.reboot_host,
ssh_client=ssh_client, timeout=5.0)
self.assert_is_not_connected(ssh_client)
server = nova.wait_for_server_status(self.stack.server_id, 'SHUTOFF')