From ddaee9f060c8fd1585d3e06e04eb301518ff90e0 Mon Sep 17 00:00:00 2001 From: John Schwarz Date: Wed, 14 Oct 2015 15:39:33 +0300 Subject: [PATCH] Keep reading stdout/stderr until after kill Currently, when calling AsyncProcess.stop(), the code stops the stdout and stderr readers and kills the process. There exists an end case (as described in the bug report) in which after the readers have been stopped the sub-process will generate a substantial amount of outputs to either fd. Since the 'subprocess' module is launched with subprocess.PIPE as stdout/stderr, and since Linux's pipes can be filled to the point where writing new data to them will block, this may cause a deadlock if the sub-process has a signal handler for the signal (for example, the process is handling SIGTERM to produce a graceful exit of the program). Therefore, this patch proposes to only kill the readers until AFTER wait() returned and the process truly died. Also, relying on _kill_event had to cease since invoking its send() method caused a logical loop back to _kill, causing eventlet errors. A different possible solution is closing the stdout/stderr pipes. Alas, this may raise an exception in the sub-process ("what? No stdout?! Crash!") and defeats the 'graceful' part of the process. Closes-Bug: #1506021 Change-Id: I506c41c634a8d656d81a8ad7963412b834bdfa5b --- neutron/agent/linux/async_process.py | 43 ++++++------ .../tests/unit/agent/linux/failing_process.py | 26 +++++++ .../unit/agent/linux/test_async_process.py | 69 +++++++++++++------ 3 files changed, 93 insertions(+), 45 deletions(-) create mode 100644 neutron/tests/unit/agent/linux/failing_process.py diff --git a/neutron/agent/linux/async_process.py b/neutron/agent/linux/async_process.py index f94cfa6a2e8..9a114b7c98c 100644 --- a/neutron/agent/linux/async_process.py +++ b/neutron/agent/linux/async_process.py @@ -76,6 +76,7 @@ class AsyncProcess(object): raise ValueError(_('respawn_interval must be >= 0 if provided.')) self.respawn_interval = respawn_interval self._process = None + self._is_running = False self._kill_event = None self._reset_queues() self._watchers = [] @@ -104,10 +105,10 @@ class AsyncProcess(object): :raises eventlet.timeout.Timeout if blocking is True and the process did not start in time. """ - if self._kill_event: + LOG.debug('Launching async process [%s].', self.cmd) + if self._is_running: raise AsyncProcessException(_('Process is already started')) else: - LOG.debug('Launching async process [%s].', self.cmd) self._spawn() if block: @@ -122,7 +123,7 @@ class AsyncProcess(object): :raises eventlet.timeout.Timeout if blocking is True and the process did not stop in time. """ - if self._kill_event: + if self._is_running: LOG.debug('Halting async process [%s].', self.cmd) self._kill(kill_signal) else: @@ -133,6 +134,7 @@ class AsyncProcess(object): def _spawn(self): """Spawn a process and its watchers.""" + self._is_running = True self._kill_event = eventlet.event.Event() self._process, cmd = utils.create_process(self._cmd, run_as_root=self.run_as_root) @@ -154,22 +156,16 @@ class AsyncProcess(object): self._process.pid, run_as_root=self.run_as_root) - def _kill(self, kill_signal, respawning=False): - """Kill the process and the associated watcher greenthreads. - - :param respawning: Optional, whether respawn will be subsequently - attempted. - """ - # Halt the greenthreads - self._kill_event.send() - + def _kill(self, kill_signal): + """Kill the process and the associated watcher greenthreads.""" pid = self.pid if pid: + self._is_running = False self._kill_process(pid, kill_signal) - if not respawning: - # Clear the kill event to ensure the process can be - # explicitly started again. + # Halt the greenthreads if they weren't already. + if self._kill_event: + self._kill_event.send() self._kill_event = None def _kill_process(self, pid, kill_signal): @@ -194,15 +190,15 @@ class AsyncProcess(object): """Kill the async process and respawn if necessary.""" LOG.debug('Halting async process [%s] in response to an error.', self.cmd) + self._kill(signal.SIGKILL) if self.respawn_interval is not None and self.respawn_interval >= 0: - respawning = True - else: - respawning = False - self._kill(signal.SIGKILL, respawning=respawning) - if respawning: eventlet.sleep(self.respawn_interval) LOG.debug('Respawning async process [%s].', self.cmd) - self._spawn() + try: + self.start() + except AsyncProcessException: + # Process was already respawned by someone else... + pass def _watch_process(self, callback, kill_event): while not kill_event.ready(): @@ -217,10 +213,11 @@ class AsyncProcess(object): # Ensure that watching a process with lots of output does # not block execution of other greenthreads. eventlet.sleep() - # The kill event not being ready indicates that the loop was + # self._is_running being True indicates that the loop was # broken out of due to an error in the watched process rather # than the loop condition being satisfied. - if not kill_event.ready(): + if self._is_running: + self._is_running = False self._handle_process_error() def _read(self, stream, queue): diff --git a/neutron/tests/unit/agent/linux/failing_process.py b/neutron/tests/unit/agent/linux/failing_process.py new file mode 100644 index 00000000000..29547ca1530 --- /dev/null +++ b/neutron/tests/unit/agent/linux/failing_process.py @@ -0,0 +1,26 @@ +# Copyright 2015 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import os +import sys + + +def main(): + filename = sys.argv[1] + if not os.path.exists(filename): + sys.exit(1) + + +if __name__ == '__main__': + main() diff --git a/neutron/tests/unit/agent/linux/test_async_process.py b/neutron/tests/unit/agent/linux/test_async_process.py index ad3dfac011d..6e72061723f 100644 --- a/neutron/tests/unit/agent/linux/test_async_process.py +++ b/neutron/tests/unit/agent/linux/test_async_process.py @@ -23,6 +23,7 @@ import testtools from neutron.agent.linux import async_process from neutron.agent.linux import utils from neutron.tests import base +from neutron.tests.unit.agent.linux import failing_process class TestAsyncProcess(base.BaseTestCase): @@ -43,6 +44,7 @@ class TestAsyncProcess(base.BaseTestCase): with mock.patch('eventlet.spawn') as mock_spawn: proc._spawn() + self.assertTrue(self.proc._is_running) self.assertIsInstance(proc._kill_event, eventlet.event.Event) self.assertEqual(proc._process, expected_process) mock_spawn.assert_has_calls([ @@ -59,7 +61,7 @@ class TestAsyncProcess(base.BaseTestCase): with mock.patch.object(self.proc, '_kill') as kill: self.proc._handle_process_error() - kill.assert_has_calls([mock.call(signal.SIGKILL, respawning=False)]) + kill.assert_has_calls([mock.call(signal.SIGKILL)]) def test__handle_process_error_kills_without_respawn(self): self.proc.respawn_interval = 1 @@ -68,11 +70,22 @@ class TestAsyncProcess(base.BaseTestCase): with mock.patch('eventlet.sleep') as sleep: self.proc._handle_process_error() - kill.assert_has_calls([mock.call(signal.SIGKILL, respawning=True)]) + kill.assert_has_calls([mock.call(signal.SIGKILL)]) sleep.assert_has_calls([mock.call(self.proc.respawn_interval)]) spawn.assert_called_once_with() + def test__handle_process_error_no_crash_if_started(self): + self.proc._is_running = True + with mock.patch.object(self.proc, '_kill'): + with mock.patch.object(self.proc, '_spawn') as mock_spawn: + self.proc._handle_process_error() + mock_spawn.assert_not_called() + + def _watch_process_exception(self): + raise Exception('Error!') + def _test__watch_process(self, callback, kill_event): + self.proc._is_running = True self.proc._kill_event = kill_event # Ensure the test times out eventually if the watcher loops endlessly with eventlet.timeout.Timeout(5): @@ -87,9 +100,13 @@ class TestAsyncProcess(base.BaseTestCase): self._test__watch_process(lambda: None, eventlet.event.Event()) def test__watch_process_exits_on_exception(self): - def foo(): - raise Exception('Error!') - self._test__watch_process(foo, eventlet.event.Event()) + self._test__watch_process(self._watch_process_exception, + eventlet.event.Event()) + with mock.patch.object(self.proc, + '_handle_process_error') as func: + self.proc._watch_process(self._watch_process_exception, + self.proc._kill_event) + func.assert_not_called() def test__watch_process_exits_on_sent_kill_event(self): kill_event = eventlet.event.Event() @@ -117,7 +134,7 @@ class TestAsyncProcess(base.BaseTestCase): self._test_read_output_queues_and_returns_result('') def test_start_raises_exception_if_process_already_started(self): - self.proc._kill_event = True + self.proc._is_running = True with testtools.ExpectedException(async_process.AsyncProcessException): self.proc.start() @@ -155,7 +172,9 @@ class TestAsyncProcess(base.BaseTestCase): def test_iter_stderr(self): self._test_iter_output_calls_iter_queue_on_output_queue('stderr') - def _test__kill(self, respawning, pid=None): + def test__kill_targets_process_for_pid(self): + pid = 1 + with mock.patch.object(self.proc, '_kill_event' ) as mock_kill_event,\ mock.patch.object(utils, 'get_root_helper_child_pid', @@ -163,26 +182,15 @@ class TestAsyncProcess(base.BaseTestCase): mock.patch.object(self.proc, '_kill_process' ) as mock_kill_process,\ mock.patch.object(self.proc, '_process'): - self.proc._kill(signal.SIGKILL, respawning) + self.proc._kill(signal.SIGKILL) - if respawning: - self.assertIsNotNone(self.proc._kill_event) - else: - self.assertIsNone(self.proc._kill_event) + self.assertIsNone(self.proc._kill_event) + self.assertFalse(self.proc._is_running) mock_kill_event.send.assert_called_once_with() if pid: mock_kill_process.assert_called_once_with(pid, signal.SIGKILL) - def test__kill_when_respawning_does_not_clear_kill_event(self): - self._test__kill(True) - - def test__kill_when_not_respawning_clears_kill_event(self): - self._test__kill(False) - - def test__kill_targets_process_for_pid(self): - self._test__kill(False, pid='1') - def _test__kill_process(self, pid, expected, exception_message=None, kill_signal=signal.SIGKILL): self.proc.run_as_root = True @@ -211,7 +219,7 @@ class TestAsyncProcess(base.BaseTestCase): self._test__kill_process('1', True, kill_signal=signal.SIGTERM) def test_stop_calls_kill_with_provided_signal_number(self): - self.proc._kill_event = True + self.proc._is_running = True with mock.patch.object(self.proc, '_kill') as mock_kill: self.proc.stop(kill_signal=signal.SIGTERM) mock_kill.assert_called_once_with(signal.SIGTERM) @@ -267,3 +275,20 @@ class TestAsyncProcessDieOnError(base.BaseTestCase): with mock.patch.object(proc, '_read', return_value='fakedata'),\ mock.patch.object(proc, '_process'): self.assertIsNone(proc._read_stderr()) + + +class TestFailingAsyncProcess(base.BaseTestCase): + def setUp(self): + super(TestFailingAsyncProcess, self).setUp() + path = self.get_temp_file_path('async.tmp', self.get_new_temp_dir()) + self.process = async_process.AsyncProcess(['python', + failing_process.__file__, + path], + respawn_interval=0) + + def test_failing_async_process_handle_error_once(self): + with mock.patch.object(self.process, '_handle_process_error')\ + as handle_error_mock: + self.process.start() + self.process._process.wait() + self.assertEqual(1, handle_error_mock.call_count)