diff --git a/neutron/agent/linux/async_process.py b/neutron/agent/linux/async_process.py index f94cfa6a2e8..9a114b7c98c 100644 --- a/neutron/agent/linux/async_process.py +++ b/neutron/agent/linux/async_process.py @@ -76,6 +76,7 @@ class AsyncProcess(object): raise ValueError(_('respawn_interval must be >= 0 if provided.')) self.respawn_interval = respawn_interval self._process = None + self._is_running = False self._kill_event = None self._reset_queues() self._watchers = [] @@ -104,10 +105,10 @@ class AsyncProcess(object): :raises eventlet.timeout.Timeout if blocking is True and the process did not start in time. """ - if self._kill_event: + LOG.debug('Launching async process [%s].', self.cmd) + if self._is_running: raise AsyncProcessException(_('Process is already started')) else: - LOG.debug('Launching async process [%s].', self.cmd) self._spawn() if block: @@ -122,7 +123,7 @@ class AsyncProcess(object): :raises eventlet.timeout.Timeout if blocking is True and the process did not stop in time. """ - if self._kill_event: + if self._is_running: LOG.debug('Halting async process [%s].', self.cmd) self._kill(kill_signal) else: @@ -133,6 +134,7 @@ class AsyncProcess(object): def _spawn(self): """Spawn a process and its watchers.""" + self._is_running = True self._kill_event = eventlet.event.Event() self._process, cmd = utils.create_process(self._cmd, run_as_root=self.run_as_root) @@ -154,22 +156,16 @@ class AsyncProcess(object): self._process.pid, run_as_root=self.run_as_root) - def _kill(self, kill_signal, respawning=False): - """Kill the process and the associated watcher greenthreads. - - :param respawning: Optional, whether respawn will be subsequently - attempted. - """ - # Halt the greenthreads - self._kill_event.send() - + def _kill(self, kill_signal): + """Kill the process and the associated watcher greenthreads.""" pid = self.pid if pid: + self._is_running = False self._kill_process(pid, kill_signal) - if not respawning: - # Clear the kill event to ensure the process can be - # explicitly started again. + # Halt the greenthreads if they weren't already. + if self._kill_event: + self._kill_event.send() self._kill_event = None def _kill_process(self, pid, kill_signal): @@ -194,15 +190,15 @@ class AsyncProcess(object): """Kill the async process and respawn if necessary.""" LOG.debug('Halting async process [%s] in response to an error.', self.cmd) + self._kill(signal.SIGKILL) if self.respawn_interval is not None and self.respawn_interval >= 0: - respawning = True - else: - respawning = False - self._kill(signal.SIGKILL, respawning=respawning) - if respawning: eventlet.sleep(self.respawn_interval) LOG.debug('Respawning async process [%s].', self.cmd) - self._spawn() + try: + self.start() + except AsyncProcessException: + # Process was already respawned by someone else... + pass def _watch_process(self, callback, kill_event): while not kill_event.ready(): @@ -217,10 +213,11 @@ class AsyncProcess(object): # Ensure that watching a process with lots of output does # not block execution of other greenthreads. eventlet.sleep() - # The kill event not being ready indicates that the loop was + # self._is_running being True indicates that the loop was # broken out of due to an error in the watched process rather # than the loop condition being satisfied. - if not kill_event.ready(): + if self._is_running: + self._is_running = False self._handle_process_error() def _read(self, stream, queue): diff --git a/neutron/tests/unit/agent/linux/failing_process.py b/neutron/tests/unit/agent/linux/failing_process.py new file mode 100644 index 00000000000..29547ca1530 --- /dev/null +++ b/neutron/tests/unit/agent/linux/failing_process.py @@ -0,0 +1,26 @@ +# Copyright 2015 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import os +import sys + + +def main(): + filename = sys.argv[1] + if not os.path.exists(filename): + sys.exit(1) + + +if __name__ == '__main__': + main() diff --git a/neutron/tests/unit/agent/linux/test_async_process.py b/neutron/tests/unit/agent/linux/test_async_process.py index ad3dfac011d..6e72061723f 100644 --- a/neutron/tests/unit/agent/linux/test_async_process.py +++ b/neutron/tests/unit/agent/linux/test_async_process.py @@ -23,6 +23,7 @@ import testtools from neutron.agent.linux import async_process from neutron.agent.linux import utils from neutron.tests import base +from neutron.tests.unit.agent.linux import failing_process class TestAsyncProcess(base.BaseTestCase): @@ -43,6 +44,7 @@ class TestAsyncProcess(base.BaseTestCase): with mock.patch('eventlet.spawn') as mock_spawn: proc._spawn() + self.assertTrue(self.proc._is_running) self.assertIsInstance(proc._kill_event, eventlet.event.Event) self.assertEqual(proc._process, expected_process) mock_spawn.assert_has_calls([ @@ -59,7 +61,7 @@ class TestAsyncProcess(base.BaseTestCase): with mock.patch.object(self.proc, '_kill') as kill: self.proc._handle_process_error() - kill.assert_has_calls([mock.call(signal.SIGKILL, respawning=False)]) + kill.assert_has_calls([mock.call(signal.SIGKILL)]) def test__handle_process_error_kills_without_respawn(self): self.proc.respawn_interval = 1 @@ -68,11 +70,22 @@ class TestAsyncProcess(base.BaseTestCase): with mock.patch('eventlet.sleep') as sleep: self.proc._handle_process_error() - kill.assert_has_calls([mock.call(signal.SIGKILL, respawning=True)]) + kill.assert_has_calls([mock.call(signal.SIGKILL)]) sleep.assert_has_calls([mock.call(self.proc.respawn_interval)]) spawn.assert_called_once_with() + def test__handle_process_error_no_crash_if_started(self): + self.proc._is_running = True + with mock.patch.object(self.proc, '_kill'): + with mock.patch.object(self.proc, '_spawn') as mock_spawn: + self.proc._handle_process_error() + mock_spawn.assert_not_called() + + def _watch_process_exception(self): + raise Exception('Error!') + def _test__watch_process(self, callback, kill_event): + self.proc._is_running = True self.proc._kill_event = kill_event # Ensure the test times out eventually if the watcher loops endlessly with eventlet.timeout.Timeout(5): @@ -87,9 +100,13 @@ class TestAsyncProcess(base.BaseTestCase): self._test__watch_process(lambda: None, eventlet.event.Event()) def test__watch_process_exits_on_exception(self): - def foo(): - raise Exception('Error!') - self._test__watch_process(foo, eventlet.event.Event()) + self._test__watch_process(self._watch_process_exception, + eventlet.event.Event()) + with mock.patch.object(self.proc, + '_handle_process_error') as func: + self.proc._watch_process(self._watch_process_exception, + self.proc._kill_event) + func.assert_not_called() def test__watch_process_exits_on_sent_kill_event(self): kill_event = eventlet.event.Event() @@ -117,7 +134,7 @@ class TestAsyncProcess(base.BaseTestCase): self._test_read_output_queues_and_returns_result('') def test_start_raises_exception_if_process_already_started(self): - self.proc._kill_event = True + self.proc._is_running = True with testtools.ExpectedException(async_process.AsyncProcessException): self.proc.start() @@ -155,7 +172,9 @@ class TestAsyncProcess(base.BaseTestCase): def test_iter_stderr(self): self._test_iter_output_calls_iter_queue_on_output_queue('stderr') - def _test__kill(self, respawning, pid=None): + def test__kill_targets_process_for_pid(self): + pid = 1 + with mock.patch.object(self.proc, '_kill_event' ) as mock_kill_event,\ mock.patch.object(utils, 'get_root_helper_child_pid', @@ -163,26 +182,15 @@ class TestAsyncProcess(base.BaseTestCase): mock.patch.object(self.proc, '_kill_process' ) as mock_kill_process,\ mock.patch.object(self.proc, '_process'): - self.proc._kill(signal.SIGKILL, respawning) + self.proc._kill(signal.SIGKILL) - if respawning: - self.assertIsNotNone(self.proc._kill_event) - else: - self.assertIsNone(self.proc._kill_event) + self.assertIsNone(self.proc._kill_event) + self.assertFalse(self.proc._is_running) mock_kill_event.send.assert_called_once_with() if pid: mock_kill_process.assert_called_once_with(pid, signal.SIGKILL) - def test__kill_when_respawning_does_not_clear_kill_event(self): - self._test__kill(True) - - def test__kill_when_not_respawning_clears_kill_event(self): - self._test__kill(False) - - def test__kill_targets_process_for_pid(self): - self._test__kill(False, pid='1') - def _test__kill_process(self, pid, expected, exception_message=None, kill_signal=signal.SIGKILL): self.proc.run_as_root = True @@ -211,7 +219,7 @@ class TestAsyncProcess(base.BaseTestCase): self._test__kill_process('1', True, kill_signal=signal.SIGTERM) def test_stop_calls_kill_with_provided_signal_number(self): - self.proc._kill_event = True + self.proc._is_running = True with mock.patch.object(self.proc, '_kill') as mock_kill: self.proc.stop(kill_signal=signal.SIGTERM) mock_kill.assert_called_once_with(signal.SIGTERM) @@ -267,3 +275,20 @@ class TestAsyncProcessDieOnError(base.BaseTestCase): with mock.patch.object(proc, '_read', return_value='fakedata'),\ mock.patch.object(proc, '_process'): self.assertIsNone(proc._read_stderr()) + + +class TestFailingAsyncProcess(base.BaseTestCase): + def setUp(self): + super(TestFailingAsyncProcess, self).setUp() + path = self.get_temp_file_path('async.tmp', self.get_new_temp_dir()) + self.process = async_process.AsyncProcess(['python', + failing_process.__file__, + path], + respawn_interval=0) + + def test_failing_async_process_handle_error_once(self): + with mock.patch.object(self.process, '_handle_process_error')\ + as handle_error_mock: + self.process.start() + self.process._process.wait() + self.assertEqual(1, handle_error_mock.call_count)