Add kill_timeout to AsyncProcess

AsyncProcess.stop() method has now additional parameter
kill_timeout. If this is set to some value different than
None, eventlet.green.subprocess.Popen.wait() will be called
with this timeout, so TimeoutExpired exception will be raised
in case if process will not be killed for this "kill_timeout"
time.
In such case process will be killed "again" with SIGKILL signal
to make sure that it is gone.

This should fix problem with failing fullstack tests, when
ovs_agent process is sometimes not killed and test timeout was
reached in this wait() method.

Conflicts:
    neutron/agent/linux/async_process.py

Change-Id: I1e12255e5e142c395adf4e67be9d9da0f7a3d4fd
Closes-Bug: #1798472
(cherry picked from commit 9b23abbdb6)
This commit is contained in:
Slawek Kaplonski 2018-11-14 21:31:04 +01:00
parent 182d13b9b5
commit c86473d1a6
3 changed files with 63 additions and 20 deletions

View File

@ -16,6 +16,7 @@ import signal
import eventlet import eventlet
import eventlet.event import eventlet.event
from eventlet.green import subprocess
import eventlet.queue import eventlet.queue
from neutron_lib.utils import helpers from neutron_lib.utils import helpers
from oslo_log import log as logging from oslo_log import log as logging
@ -117,18 +118,21 @@ class AsyncProcess(object):
if block: if block:
common_utils.wait_until_true(self.is_active) common_utils.wait_until_true(self.is_active)
def stop(self, block=False, kill_signal=signal.SIGKILL): def stop(self, block=False, kill_signal=signal.SIGKILL, kill_timeout=None):
"""Halt the process and watcher threads. """Halt the process and watcher threads.
:param block: Block until the process has stopped. :param block: Block until the process has stopped.
:param kill_signal: Number of signal that will be sent to the process :param kill_signal: Number of signal that will be sent to the process
when terminating the process when terminating the process
:param kill_timeout: If given, process will be killed with SIGKILL
if timeout will be reached and process will
still be running
:raises utils.WaitTimeout if blocking is True and the process :raises utils.WaitTimeout if blocking is True and the process
did not stop in time. did not stop in time.
""" """
if self._is_running: if self._is_running:
LOG.debug('Halting async process [%s].', self.cmd) LOG.debug('Halting async process [%s].', self.cmd)
self._kill(kill_signal) self._kill(kill_signal, kill_timeout)
else: else:
raise AsyncProcessException(_('Process is not running.')) raise AsyncProcessException(_('Process is not running.'))
@ -163,19 +167,38 @@ class AsyncProcess(object):
run_as_root=self.run_as_root) run_as_root=self.run_as_root)
return self._pid return self._pid
def _kill(self, kill_signal): def _kill(self, kill_signal, kill_timeout=None):
"""Kill the process and the associated watcher greenthreads.""" """Kill the process and the associated watcher greenthreads."""
pid = self.pid pid = self.pid
if pid: if pid:
self._is_running = False self._is_running = False
self._pid = None self._pid = None
self._kill_process(pid, kill_signal) self._kill_process_and_wait(pid, kill_signal, kill_timeout)
# Halt the greenthreads if they weren't already. # Halt the greenthreads if they weren't already.
if self._kill_event: if self._kill_event:
self._kill_event.send() self._kill_event.send()
self._kill_event = None self._kill_event = None
def _kill_process_and_wait(self, pid, kill_signal, kill_timeout=None):
kill_result = self._kill_process(pid, kill_signal)
if kill_result is False:
return kill_result
if self._process:
try:
self._process.wait(kill_timeout)
except subprocess.TimeoutExpired:
LOG.warning("Process %(pid)s [%(cmd)s] still running after "
"%(timeout)d seconds. Sending %(signal)d to kill "
"it.",
{'pid': pid,
'cmd': self.cmd,
'timeout': kill_timeout,
'signal': signal.SIGKILL})
return self._kill_process(pid, signal.SIGKILL)
return True
def _kill_process(self, pid, kill_signal): def _kill_process(self, pid, kill_signal):
try: try:
# A process started by a root helper will be running as # A process started by a root helper will be running as
@ -185,9 +208,6 @@ class AsyncProcess(object):
LOG.exception('An error occurred while killing [%s].', LOG.exception('An error occurred while killing [%s].',
self.cmd) self.cmd)
return False return False
if self._process:
self._process.wait()
return True return True
def _handle_process_error(self): def _handle_process_error(self):

View File

@ -75,7 +75,9 @@ class ProcessFixture(fixtures.Fixture):
def stop(self, kill_signal=None): def stop(self, kill_signal=None):
kill_signal = kill_signal or self.kill_signal kill_signal = kill_signal or self.kill_signal
try: try:
self.process.stop(block=True, kill_signal=kill_signal) self.process.stop(
block=True, kill_signal=kill_signal,
kill_timeout=15)
except async_process.AsyncProcessException as e: except async_process.AsyncProcessException as e:
if "Process is not running" not in str(e): if "Process is not running" not in str(e):
raise raise

View File

@ -15,6 +15,7 @@
import signal import signal
import eventlet.event import eventlet.event
from eventlet.green import subprocess
import eventlet.queue import eventlet.queue
import mock import mock
import testtools import testtools
@ -197,8 +198,8 @@ class TestAsyncProcess(base.BaseTestCase):
) as mock_kill_event,\ ) as mock_kill_event,\
mock.patch.object(utils, 'get_root_helper_child_pid', mock.patch.object(utils, 'get_root_helper_child_pid',
return_value=pid),\ return_value=pid),\
mock.patch.object(self.proc, '_kill_process' mock.patch.object(self.proc, '_kill_process_and_wait'
) as mock_kill_process,\ ) as mock_kill_process_and_wait,\
mock.patch.object(self.proc, '_process'): mock.patch.object(self.proc, '_process'):
self.proc._kill(signal.SIGKILL) self.proc._kill(signal.SIGKILL)
@ -208,10 +209,12 @@ class TestAsyncProcess(base.BaseTestCase):
mock_kill_event.send.assert_called_once_with() mock_kill_event.send.assert_called_once_with()
if pid: if pid:
mock_kill_process.assert_called_once_with(pid, signal.SIGKILL) mock_kill_process_and_wait.assert_called_once_with(
pid, signal.SIGKILL, None)
def _test__kill_process(self, pid, expected, exception_message=None, def _test__kill_process_and_wait(self, pid, expected,
kill_signal=signal.SIGKILL): exception_message=None,
kill_signal=signal.SIGKILL):
self.proc.run_as_root = True self.proc.run_as_root = True
if exception_message: if exception_message:
exc = RuntimeError(exception_message) exc = RuntimeError(exception_message)
@ -226,20 +229,38 @@ class TestAsyncProcess(base.BaseTestCase):
kill_signal, kill_signal,
self.proc.run_as_root) self.proc.run_as_root)
def test__kill_process_returns_true_for_valid_pid(self): def test__kill_process_and_wait_returns_true_for_valid_pid(self):
self._test__kill_process('1', True) self._test__kill_process_and_wait('1', True)
def test__kill_process_returns_false_for_execute_exception(self): def test__kill_process_and_wait_returns_false_for_execute_exception(self):
self._test__kill_process('1', False, 'Invalid') self._test__kill_process_and_wait('1', False, 'Invalid')
def test_kill_process_with_different_signal(self): def test_kill_process_and_wait_with_different_signal(self):
self._test__kill_process('1', True, kill_signal=signal.SIGTERM) self._test__kill_process_and_wait(
'1', True, kill_signal=signal.SIGTERM)
def test__kill_process_timeout_reached(self):
self.proc.run_as_root = True
kill_timeout = 5
pid = '1'
with mock.patch.object(utils, 'kill_process') as mock_kill_process, \
mock.patch.object(self.proc, '_process') as process_mock:
process_mock.wait.side_effect = subprocess.TimeoutExpired(
self.proc.cmd, kill_timeout)
self.assertTrue(
self.proc._kill_process_and_wait(
pid, signal.SIGTERM, kill_timeout))
process_mock.wait.assert_called_once_with(kill_timeout)
mock_kill_process.assert_has_calls([
mock.call(pid, signal.SIGTERM, self.proc.run_as_root),
mock.call(pid, signal.SIGKILL, self.proc.run_as_root)])
def test_stop_calls_kill_with_provided_signal_number(self): def test_stop_calls_kill_with_provided_signal_number(self):
self.proc._is_running = True self.proc._is_running = True
with mock.patch.object(self.proc, '_kill') as mock_kill: with mock.patch.object(self.proc, '_kill') as mock_kill:
self.proc.stop(kill_signal=signal.SIGTERM) self.proc.stop(kill_signal=signal.SIGTERM)
mock_kill.assert_called_once_with(signal.SIGTERM) mock_kill.assert_called_once_with(signal.SIGTERM, None)
def test_stop_raises_exception_if_already_started(self): def test_stop_raises_exception_if_already_started(self):
with testtools.ExpectedException(async_process.AsyncProcessException): with testtools.ExpectedException(async_process.AsyncProcessException):