Add kill_timeout to AsyncProcess

AsyncProcess.stop() method has now additional parameter
kill_timeout. If this is set to some value different than
None, eventlet.green.subprocess.Popen.wait() will be called
with this timeout, so TimeoutExpired exception will be raised
in case if process will not be killed for this "kill_timeout"
time.
In such case process will be killed "again" with SIGKILL signal
to make sure that it is gone.

This should fix problem with failing fullstack tests, when
ovs_agent process is sometimes not killed and test timeout was
reached in this wait() method.

Change-Id: I1e12255e5e142c395adf4e67be9d9da0f7a3d4fd
Closes-Bug: #1798472
This commit is contained in:
Slawek Kaplonski 2018-11-14 21:31:04 +01:00
parent 84e22945ce
commit 9b23abbdb6
3 changed files with 63 additions and 20 deletions

View File

@ -16,6 +16,7 @@ import signal
import eventlet
import eventlet.event
from eventlet.green import subprocess
import eventlet.queue
from neutron_lib.utils import helpers
from oslo_log import log as logging
@ -117,19 +118,22 @@ class AsyncProcess(object):
if block:
common_utils.wait_until_true(self.is_active)
def stop(self, block=False, kill_signal=None):
def stop(self, block=False, kill_signal=None, kill_timeout=None):
"""Halt the process and watcher threads.
:param block: Block until the process has stopped.
:param kill_signal: Number of signal that will be sent to the process
when terminating the process
:param kill_timeout: If given, process will be killed with SIGKILL
if timeout will be reached and process will
still be running
:raises utils.WaitTimeout if blocking is True and the process
did not stop in time.
"""
kill_signal = kill_signal or getattr(signal, 'SIGKILL', signal.SIGTERM)
if self._is_running:
LOG.debug('Halting async process [%s].', self.cmd)
self._kill(kill_signal)
self._kill(kill_signal, kill_timeout)
else:
raise AsyncProcessException(_('Process is not running.'))
@ -164,19 +168,38 @@ class AsyncProcess(object):
run_as_root=self.run_as_root)
return self._pid
def _kill(self, kill_signal):
def _kill(self, kill_signal, kill_timeout=None):
"""Kill the process and the associated watcher greenthreads."""
pid = self.pid
if pid:
self._is_running = False
self._pid = None
self._kill_process(pid, kill_signal)
self._kill_process_and_wait(pid, kill_signal, kill_timeout)
# Halt the greenthreads if they weren't already.
if self._kill_event:
self._kill_event.send()
self._kill_event = None
def _kill_process_and_wait(self, pid, kill_signal, kill_timeout=None):
kill_result = self._kill_process(pid, kill_signal)
if kill_result is False:
return kill_result
if self._process:
try:
self._process.wait(kill_timeout)
except subprocess.TimeoutExpired:
LOG.warning("Process %(pid)s [%(cmd)s] still running after "
"%(timeout)d seconds. Sending %(signal)d to kill "
"it.",
{'pid': pid,
'cmd': self.cmd,
'timeout': kill_timeout,
'signal': signal.SIGKILL})
return self._kill_process(pid, signal.SIGKILL)
return True
def _kill_process(self, pid, kill_signal):
try:
# A process started by a root helper will be running as
@ -186,9 +209,6 @@ class AsyncProcess(object):
LOG.exception('An error occurred while killing [%s].',
self.cmd)
return False
if self._process:
self._process.wait()
return True
def _handle_process_error(self):

View File

@ -75,7 +75,9 @@ class ProcessFixture(fixtures.Fixture):
def stop(self, kill_signal=None):
kill_signal = kill_signal or self.kill_signal
try:
self.process.stop(block=True, kill_signal=kill_signal)
self.process.stop(
block=True, kill_signal=kill_signal,
kill_timeout=15)
except async_process.AsyncProcessException as e:
if "Process is not running" not in str(e):
raise

View File

@ -15,6 +15,7 @@
import signal
import eventlet.event
from eventlet.green import subprocess
import eventlet.queue
import mock
import testtools
@ -197,8 +198,8 @@ class TestAsyncProcess(base.BaseTestCase):
) as mock_kill_event,\
mock.patch.object(utils, 'get_root_helper_child_pid',
return_value=pid),\
mock.patch.object(self.proc, '_kill_process'
) as mock_kill_process,\
mock.patch.object(self.proc, '_kill_process_and_wait'
) as mock_kill_process_and_wait,\
mock.patch.object(self.proc, '_process'):
self.proc._kill(signal.SIGKILL)
@ -208,10 +209,12 @@ class TestAsyncProcess(base.BaseTestCase):
mock_kill_event.send.assert_called_once_with()
if pid:
mock_kill_process.assert_called_once_with(pid, signal.SIGKILL)
mock_kill_process_and_wait.assert_called_once_with(
pid, signal.SIGKILL, None)
def _test__kill_process(self, pid, expected, exception_message=None,
kill_signal=signal.SIGKILL):
def _test__kill_process_and_wait(self, pid, expected,
exception_message=None,
kill_signal=signal.SIGKILL):
self.proc.run_as_root = True
if exception_message:
exc = RuntimeError(exception_message)
@ -226,20 +229,38 @@ class TestAsyncProcess(base.BaseTestCase):
kill_signal,
self.proc.run_as_root)
def test__kill_process_returns_true_for_valid_pid(self):
self._test__kill_process('1', True)
def test__kill_process_and_wait_returns_true_for_valid_pid(self):
self._test__kill_process_and_wait('1', True)
def test__kill_process_returns_false_for_execute_exception(self):
self._test__kill_process('1', False, 'Invalid')
def test__kill_process_and_wait_returns_false_for_execute_exception(self):
self._test__kill_process_and_wait('1', False, 'Invalid')
def test_kill_process_with_different_signal(self):
self._test__kill_process('1', True, kill_signal=signal.SIGTERM)
def test_kill_process_and_wait_with_different_signal(self):
self._test__kill_process_and_wait(
'1', True, kill_signal=signal.SIGTERM)
def test__kill_process_timeout_reached(self):
self.proc.run_as_root = True
kill_timeout = 5
pid = '1'
with mock.patch.object(utils, 'kill_process') as mock_kill_process, \
mock.patch.object(self.proc, '_process') as process_mock:
process_mock.wait.side_effect = subprocess.TimeoutExpired(
self.proc.cmd, kill_timeout)
self.assertTrue(
self.proc._kill_process_and_wait(
pid, signal.SIGTERM, kill_timeout))
process_mock.wait.assert_called_once_with(kill_timeout)
mock_kill_process.assert_has_calls([
mock.call(pid, signal.SIGTERM, self.proc.run_as_root),
mock.call(pid, signal.SIGKILL, self.proc.run_as_root)])
def test_stop_calls_kill_with_provided_signal_number(self):
self.proc._is_running = True
with mock.patch.object(self.proc, '_kill') as mock_kill:
self.proc.stop(kill_signal=signal.SIGTERM)
mock_kill.assert_called_once_with(signal.SIGTERM)
mock_kill.assert_called_once_with(signal.SIGTERM, None)
def test_stop_raises_exception_if_already_started(self):
with testtools.ExpectedException(async_process.AsyncProcessException):