Keep reading stdout/stderr until after kill

Currently, when calling AsyncProcess.stop(), the code stops the stdout
and stderr readers and kills the process. There exists an end case (as
described in the bug report) in which after the readers have been
stopped the sub-process will generate a substantial amount of outputs to
either fd. Since the 'subprocess' module is launched with
subprocess.PIPE as stdout/stderr, and since Linux's pipes can be filled
to the point where writing new data to them will block, this may cause a
deadlock if the sub-process has a signal handler for the signal (for
example, the process is handling SIGTERM to produce a graceful exit of
the program).

Therefore, this patch proposes to only kill the readers until AFTER
wait() returned and the process truly died. Also, relying on _kill_event
had to cease since invoking its send() method caused a logical loop back
to _kill, causing eventlet errors.

A different possible solution is closing the stdout/stderr pipes. Alas,
this may raise an exception in the sub-process ("what? No stdout?!
Crash!") and defeats the 'graceful' part of the process.

Closes-Bug: #1506021
Change-Id: I506c41c634a8d656d81a8ad7963412b834bdfa5b
This commit is contained in:
John Schwarz 2015-10-14 15:39:33 +03:00
parent 44cc4b9a63
commit ddaee9f060
3 changed files with 93 additions and 45 deletions

View File

@ -76,6 +76,7 @@ class AsyncProcess(object):
raise ValueError(_('respawn_interval must be >= 0 if provided.'))
self.respawn_interval = respawn_interval
self._process = None
self._is_running = False
self._kill_event = None
self._reset_queues()
self._watchers = []
@ -104,10 +105,10 @@ class AsyncProcess(object):
:raises eventlet.timeout.Timeout if blocking is True and the process
did not start in time.
"""
if self._kill_event:
LOG.debug('Launching async process [%s].', self.cmd)
if self._is_running:
raise AsyncProcessException(_('Process is already started'))
else:
LOG.debug('Launching async process [%s].', self.cmd)
self._spawn()
if block:
@ -122,7 +123,7 @@ class AsyncProcess(object):
:raises eventlet.timeout.Timeout if blocking is True and the process
did not stop in time.
"""
if self._kill_event:
if self._is_running:
LOG.debug('Halting async process [%s].', self.cmd)
self._kill(kill_signal)
else:
@ -133,6 +134,7 @@ class AsyncProcess(object):
def _spawn(self):
"""Spawn a process and its watchers."""
self._is_running = True
self._kill_event = eventlet.event.Event()
self._process, cmd = utils.create_process(self._cmd,
run_as_root=self.run_as_root)
@ -154,22 +156,16 @@ class AsyncProcess(object):
self._process.pid,
run_as_root=self.run_as_root)
def _kill(self, kill_signal, respawning=False):
"""Kill the process and the associated watcher greenthreads.
:param respawning: Optional, whether respawn will be subsequently
attempted.
"""
# Halt the greenthreads
self._kill_event.send()
def _kill(self, kill_signal):
"""Kill the process and the associated watcher greenthreads."""
pid = self.pid
if pid:
self._is_running = False
self._kill_process(pid, kill_signal)
if not respawning:
# Clear the kill event to ensure the process can be
# explicitly started again.
# Halt the greenthreads if they weren't already.
if self._kill_event:
self._kill_event.send()
self._kill_event = None
def _kill_process(self, pid, kill_signal):
@ -194,15 +190,15 @@ class AsyncProcess(object):
"""Kill the async process and respawn if necessary."""
LOG.debug('Halting async process [%s] in response to an error.',
self.cmd)
self._kill(signal.SIGKILL)
if self.respawn_interval is not None and self.respawn_interval >= 0:
respawning = True
else:
respawning = False
self._kill(signal.SIGKILL, respawning=respawning)
if respawning:
eventlet.sleep(self.respawn_interval)
LOG.debug('Respawning async process [%s].', self.cmd)
self._spawn()
try:
self.start()
except AsyncProcessException:
# Process was already respawned by someone else...
pass
def _watch_process(self, callback, kill_event):
while not kill_event.ready():
@ -217,10 +213,11 @@ class AsyncProcess(object):
# Ensure that watching a process with lots of output does
# not block execution of other greenthreads.
eventlet.sleep()
# The kill event not being ready indicates that the loop was
# self._is_running being True indicates that the loop was
# broken out of due to an error in the watched process rather
# than the loop condition being satisfied.
if not kill_event.ready():
if self._is_running:
self._is_running = False
self._handle_process_error()
def _read(self, stream, queue):

View File

@ -0,0 +1,26 @@
# Copyright 2015 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import sys
def main():
filename = sys.argv[1]
if not os.path.exists(filename):
sys.exit(1)
if __name__ == '__main__':
main()

View File

@ -23,6 +23,7 @@ import testtools
from neutron.agent.linux import async_process
from neutron.agent.linux import utils
from neutron.tests import base
from neutron.tests.unit.agent.linux import failing_process
class TestAsyncProcess(base.BaseTestCase):
@ -43,6 +44,7 @@ class TestAsyncProcess(base.BaseTestCase):
with mock.patch('eventlet.spawn') as mock_spawn:
proc._spawn()
self.assertTrue(self.proc._is_running)
self.assertIsInstance(proc._kill_event, eventlet.event.Event)
self.assertEqual(proc._process, expected_process)
mock_spawn.assert_has_calls([
@ -59,7 +61,7 @@ class TestAsyncProcess(base.BaseTestCase):
with mock.patch.object(self.proc, '_kill') as kill:
self.proc._handle_process_error()
kill.assert_has_calls([mock.call(signal.SIGKILL, respawning=False)])
kill.assert_has_calls([mock.call(signal.SIGKILL)])
def test__handle_process_error_kills_without_respawn(self):
self.proc.respawn_interval = 1
@ -68,11 +70,22 @@ class TestAsyncProcess(base.BaseTestCase):
with mock.patch('eventlet.sleep') as sleep:
self.proc._handle_process_error()
kill.assert_has_calls([mock.call(signal.SIGKILL, respawning=True)])
kill.assert_has_calls([mock.call(signal.SIGKILL)])
sleep.assert_has_calls([mock.call(self.proc.respawn_interval)])
spawn.assert_called_once_with()
def test__handle_process_error_no_crash_if_started(self):
self.proc._is_running = True
with mock.patch.object(self.proc, '_kill'):
with mock.patch.object(self.proc, '_spawn') as mock_spawn:
self.proc._handle_process_error()
mock_spawn.assert_not_called()
def _watch_process_exception(self):
raise Exception('Error!')
def _test__watch_process(self, callback, kill_event):
self.proc._is_running = True
self.proc._kill_event = kill_event
# Ensure the test times out eventually if the watcher loops endlessly
with eventlet.timeout.Timeout(5):
@ -87,9 +100,13 @@ class TestAsyncProcess(base.BaseTestCase):
self._test__watch_process(lambda: None, eventlet.event.Event())
def test__watch_process_exits_on_exception(self):
def foo():
raise Exception('Error!')
self._test__watch_process(foo, eventlet.event.Event())
self._test__watch_process(self._watch_process_exception,
eventlet.event.Event())
with mock.patch.object(self.proc,
'_handle_process_error') as func:
self.proc._watch_process(self._watch_process_exception,
self.proc._kill_event)
func.assert_not_called()
def test__watch_process_exits_on_sent_kill_event(self):
kill_event = eventlet.event.Event()
@ -117,7 +134,7 @@ class TestAsyncProcess(base.BaseTestCase):
self._test_read_output_queues_and_returns_result('')
def test_start_raises_exception_if_process_already_started(self):
self.proc._kill_event = True
self.proc._is_running = True
with testtools.ExpectedException(async_process.AsyncProcessException):
self.proc.start()
@ -155,7 +172,9 @@ class TestAsyncProcess(base.BaseTestCase):
def test_iter_stderr(self):
self._test_iter_output_calls_iter_queue_on_output_queue('stderr')
def _test__kill(self, respawning, pid=None):
def test__kill_targets_process_for_pid(self):
pid = 1
with mock.patch.object(self.proc, '_kill_event'
) as mock_kill_event,\
mock.patch.object(utils, 'get_root_helper_child_pid',
@ -163,26 +182,15 @@ class TestAsyncProcess(base.BaseTestCase):
mock.patch.object(self.proc, '_kill_process'
) as mock_kill_process,\
mock.patch.object(self.proc, '_process'):
self.proc._kill(signal.SIGKILL, respawning)
self.proc._kill(signal.SIGKILL)
if respawning:
self.assertIsNotNone(self.proc._kill_event)
else:
self.assertIsNone(self.proc._kill_event)
self.assertFalse(self.proc._is_running)
mock_kill_event.send.assert_called_once_with()
if pid:
mock_kill_process.assert_called_once_with(pid, signal.SIGKILL)
def test__kill_when_respawning_does_not_clear_kill_event(self):
self._test__kill(True)
def test__kill_when_not_respawning_clears_kill_event(self):
self._test__kill(False)
def test__kill_targets_process_for_pid(self):
self._test__kill(False, pid='1')
def _test__kill_process(self, pid, expected, exception_message=None,
kill_signal=signal.SIGKILL):
self.proc.run_as_root = True
@ -211,7 +219,7 @@ class TestAsyncProcess(base.BaseTestCase):
self._test__kill_process('1', True, kill_signal=signal.SIGTERM)
def test_stop_calls_kill_with_provided_signal_number(self):
self.proc._kill_event = True
self.proc._is_running = True
with mock.patch.object(self.proc, '_kill') as mock_kill:
self.proc.stop(kill_signal=signal.SIGTERM)
mock_kill.assert_called_once_with(signal.SIGTERM)
@ -267,3 +275,20 @@ class TestAsyncProcessDieOnError(base.BaseTestCase):
with mock.patch.object(proc, '_read', return_value='fakedata'),\
mock.patch.object(proc, '_process'):
self.assertIsNone(proc._read_stderr())
class TestFailingAsyncProcess(base.BaseTestCase):
def setUp(self):
super(TestFailingAsyncProcess, self).setUp()
path = self.get_temp_file_path('async.tmp', self.get_new_temp_dir())
self.process = async_process.AsyncProcess(['python',
failing_process.__file__,
path],
respawn_interval=0)
def test_failing_async_process_handle_error_once(self):
with mock.patch.object(self.process, '_handle_process_error')\
as handle_error_mock:
self.process.start()
self.process._process.wait()
self.assertEqual(1, handle_error_mock.call_count)