Browse Source
Interacting with a long-running asynchronous process requires the
use of non-blocking io. This change adds a helper class that can
launch a long-running process and read stdout and stderr in a
non-blocking fashion via eventlet.
This functionality is intended to support monitoring ovsdb via
a long-running and root-privileged invocation of ovsdb-client.
The complexity of the system interaction in this patch suggested
the addition of a functional test that validated actual behaviour.
The test was added under the neutron/tests/functional path which
is now included in the testr search path.
Partial-Bug: #1177973
Change-Id: I9969e556acecf7a9e77d873371cc2ec2647be011
(cherry picked from commit acf0209b28
)
changes/08/65808/1
12 changed files with 681 additions and 18 deletions
@ -1,4 +1,4 @@
|
||||
[DEFAULT] |
||||
test_command=OS_STDOUT_CAPTURE=1 OS_STDERR_CAPTURE=1 OS_LOG_CAPTURE=1 ${PYTHON:-python} -m subunit.run discover -t ./ neutron/tests/unit $LISTOPT $IDOPTION |
||||
test_command=OS_STDOUT_CAPTURE=1 OS_STDERR_CAPTURE=1 OS_LOG_CAPTURE=1 ${PYTHON:-python} -m subunit.run discover -t ./ neutron/tests $LISTOPT $IDOPTION |
||||
test_id_option=--load-list $IDFILE |
||||
test_list_option=--list |
||||
|
@ -0,0 +1,214 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4 |
||||
|
||||
# Copyright 2013 Red Hat, Inc. |
||||
# |
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may |
||||
# not use this file except in compliance with the License. You may obtain |
||||
# a copy of the License at |
||||
# |
||||
# http://www.apache.org/licenses/LICENSE-2.0 |
||||
# |
||||
# Unless required by applicable law or agreed to in writing, software |
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
||||
# License for the specific language governing permissions and limitations |
||||
# under the License. |
||||
|
||||
import eventlet |
||||
import eventlet.event |
||||
import eventlet.queue |
||||
import eventlet.timeout |
||||
|
||||
from neutron.agent.linux import utils |
||||
from neutron.openstack.common import log as logging |
||||
|
||||
|
||||
LOG = logging.getLogger(__name__) |
||||
|
||||
|
||||
class AsyncProcessException(Exception): |
||||
pass |
||||
|
||||
|
||||
class AsyncProcess(object): |
||||
"""Manages an asynchronous process. |
||||
|
||||
This class spawns a new process via subprocess and uses |
||||
greenthreads to read stderr and stdout asynchronously into queues |
||||
that can be read via repeatedly calling iter_stdout() and |
||||
iter_stderr(). |
||||
|
||||
If respawn_interval is non-zero, any error in communicating with |
||||
the managed process will result in the process and greenthreads |
||||
being cleaned up and the process restarted after the specified |
||||
interval. |
||||
|
||||
Example usage: |
||||
|
||||
>>> import time |
||||
>>> proc = AsyncProcess(['ping']) |
||||
>>> proc.start() |
||||
>>> time.sleep(5) |
||||
>>> proc.stop() |
||||
>>> for line in proc.iter_stdout(): |
||||
... print line |
||||
""" |
||||
|
||||
def __init__(self, cmd, root_helper=None, respawn_interval=None): |
||||
"""Constructor. |
||||
|
||||
:param cmd: The list of command arguments to invoke. |
||||
:param root_helper: Optional, utility to use when running shell cmds. |
||||
:param respawn_interval: Optional, the interval in seconds to wait |
||||
to respawn after unexpected process death. Respawn will |
||||
only be attempted if a value of 0 or greater is provided. |
||||
""" |
||||
self.cmd = cmd |
||||
self.root_helper = root_helper |
||||
if respawn_interval is not None and respawn_interval < 0: |
||||
raise ValueError(_('respawn_interval must be >= 0 if provided.')) |
||||
self.respawn_interval = respawn_interval |
||||
self._process = None |
||||
self._kill_event = None |
||||
self._stdout_lines = eventlet.queue.LightQueue() |
||||
self._stderr_lines = eventlet.queue.LightQueue() |
||||
self._watchers = [] |
||||
|
||||
def start(self): |
||||
"""Launch a process and monitor it asynchronously.""" |
||||
if self._kill_event: |
||||
raise AsyncProcessException(_('Process is already started')) |
||||
else: |
||||
LOG.debug(_('Launching async process [%s].'), self.cmd) |
||||
self._spawn() |
||||
|
||||
def stop(self): |
||||
"""Halt the process and watcher threads.""" |
||||
if self._kill_event: |
||||
LOG.debug(_('Halting async process [%s].'), self.cmd) |
||||
self._kill() |
||||
else: |
||||
raise AsyncProcessException(_('Process is not running.')) |
||||
|
||||
def _spawn(self): |
||||
"""Spawn a process and its watchers.""" |
||||
self._kill_event = eventlet.event.Event() |
||||
self._process, cmd = utils.create_process(self.cmd, |
||||
root_helper=self.root_helper) |
||||
self._watchers = [] |
||||
for reader in (self._read_stdout, self._read_stderr): |
||||
# Pass the stop event directly to the greenthread to |
||||
# ensure that assignment of a new event to the instance |
||||
# attribute does not prevent the greenthread from using |
||||
# the original event. |
||||
watcher = eventlet.spawn(self._watch_process, |
||||
reader, |
||||
self._kill_event) |
||||
self._watchers.append(watcher) |
||||
|
||||
def _kill(self, respawning=False): |
||||
"""Kill the process and the associated watcher greenthreads. |
||||
|
||||
:param respawning: Optional, whether respawn will be subsequently |
||||
attempted. |
||||
""" |
||||
# Halt the greenthreads |
||||
self._kill_event.send() |
||||
|
||||
pid = self._get_pid_to_kill() |
||||
if pid: |
||||
self._kill_process(pid) |
||||
|
||||
if not respawning: |
||||
# Clear the kill event to ensure the process can be |
||||
# explicitly started again. |
||||
self._kill_event = None |
||||
|
||||
def _get_pid_to_kill(self): |
||||
pid = self._process.pid |
||||
# If root helper was used, two processes will be created: |
||||
# |
||||
# - a root helper process (e.g. sudo myscript) |
||||
# - a child process (e.g. myscript) |
||||
# |
||||
# Killing the root helper process will leave the child process |
||||
# as a zombie, so the only way to ensure that both die is to |
||||
# target the child process directly. |
||||
if self.root_helper: |
||||
pids = utils.find_child_pids(pid) |
||||
if pids: |
||||
# The root helper will only ever launch a single child. |
||||
pid = pids[0] |
||||
else: |
||||
# Process is already dead. |
||||
pid = None |
||||
return pid |
||||
|
||||
def _kill_process(self, pid): |
||||
try: |
||||
# A process started by a root helper will be running as |
||||
# root and need to be killed via the same helper. |
||||
utils.execute(['kill', '-9', pid], root_helper=self.root_helper) |
||||
except Exception as ex: |
||||
stale_pid = (isinstance(ex, RuntimeError) and |
||||
'No such process' in str(ex)) |
||||
if not stale_pid: |
||||
LOG.exception(_('An error occurred while killing [%s].'), |
||||
self.cmd) |
||||
return False |
||||
return True |
||||
|
||||
def _handle_process_error(self): |
||||
"""Kill the async process and respawn if necessary.""" |
||||
LOG.debug(_('Halting async process [%s] in response to an error.'), |
||||
self.cmd) |
||||
respawning = self.respawn_interval >= 0 |
||||
self._kill(respawning=respawning) |
||||
if respawning: |
||||
eventlet.sleep(self.respawn_interval) |
||||
LOG.debug(_('Respawning async process [%s].'), self.cmd) |
||||
self._spawn() |
||||
|
||||
def _watch_process(self, callback, kill_event): |
||||
while not kill_event.ready(): |
||||
try: |
||||
if not callback(): |
||||
break |
||||
except Exception: |
||||
LOG.exception(_('An error occured while communicating ' |
||||
'with async process [%s].'), self.cmd) |
||||
break |
||||
# Ensure that watching a process with lots of output does |
||||
# not block execution of other greenthreads. |
||||
eventlet.sleep() |
||||
# The kill event not being ready indicates that the loop was |
||||
# broken out of due to an error in the watched process rather |
||||
# than the loop condition being satisfied. |
||||
if not kill_event.ready(): |
||||
self._handle_process_error() |
||||
|
||||
def _read(self, stream, queue): |
||||
data = stream.readline() |
||||
if data: |
||||
data = data.strip() |
||||
queue.put(data) |
||||
return data |
||||
|
||||
def _read_stdout(self): |
||||
return self._read(self._process.stdout, self._stdout_lines) |
||||
|
||||
def _read_stderr(self): |
||||
return self._read(self._process.stderr, self._stderr_lines) |
||||
|
||||
def _iter_queue(self, queue): |
||||
while True: |
||||
try: |
||||
yield queue.get_nowait() |
||||
except eventlet.queue.Empty: |
||||
break |
||||
|
||||
def iter_stdout(self): |
||||
return self._iter_queue(self._stdout_lines) |
||||
|
||||
def iter_stderr(self): |
||||
return self._iter_queue(self._stderr_lines) |
@ -0,0 +1,15 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4 |
||||
|
||||
# Copyright 2013 Red Hat, Inc. |
||||
# |
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may |
||||
# not use this file except in compliance with the License. You may obtain |
||||
# a copy of the License at |
||||
# |
||||
# http://www.apache.org/licenses/LICENSE-2.0 |
||||
# |
||||
# Unless required by applicable law or agreed to in writing, software |
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
||||
# License for the specific language governing permissions and limitations |
||||
# under the License. |
@ -0,0 +1,15 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4 |
||||
|
||||
# Copyright 2013 Red Hat, Inc. |
||||
# |
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may |
||||
# not use this file except in compliance with the License. You may obtain |
||||
# a copy of the License at |
||||
# |
||||
# http://www.apache.org/licenses/LICENSE-2.0 |
||||
# |
||||
# Unless required by applicable law or agreed to in writing, software |
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
||||
# License for the specific language governing permissions and limitations |
||||
# under the License. |
@ -0,0 +1,15 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4 |
||||
|
||||
# Copyright 2013 Red Hat, Inc. |
||||
# |
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may |
||||
# not use this file except in compliance with the License. You may obtain |
||||
# a copy of the License at |
||||
# |
||||
# http://www.apache.org/licenses/LICENSE-2.0 |
||||
# |
||||
# Unless required by applicable law or agreed to in writing, software |
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
||||
# License for the specific language governing permissions and limitations |
||||
# under the License. |
@ -0,0 +1,79 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4 |
||||
|
||||
# Copyright 2013 Red Hat, Inc. |
||||
# |
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may |
||||
# not use this file except in compliance with the License. You may obtain |
||||
# a copy of the License at |
||||
# |
||||
# http://www.apache.org/licenses/LICENSE-2.0 |
||||
# |
||||
# Unless required by applicable law or agreed to in writing, software |
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
||||
# License for the specific language governing permissions and limitations |
||||
# under the License. |
||||
|
||||
import contextlib |
||||
|
||||
import eventlet |
||||
import eventlet.timeout |
||||
import fixtures |
||||
|
||||
from neutron.agent.linux import async_process |
||||
from neutron.tests import base |
||||
|
||||
|
||||
class TestAsyncProcess(base.BaseTestCase): |
||||
|
||||
def setUp(self): |
||||
super(TestAsyncProcess, self).setUp() |
||||
self.test_file_path = self.useFixture( |
||||
fixtures.TempDir()).join("test_async_process.tmp") |
||||
self.data = [str(x) for x in xrange(4)] |
||||
with file(self.test_file_path, 'w') as f: |
||||
f.writelines('%s\n' % item for item in self.data) |
||||
|
||||
def _check_stdout(self, proc): |
||||
# Ensure that all the output from the file is read |
||||
output = [] |
||||
while output != self.data: |
||||
new_output = list(proc.iter_stdout()) |
||||
if new_output: |
||||
output += new_output |
||||
eventlet.sleep(0.01) |
||||
|
||||
@contextlib.contextmanager |
||||
def assert_max_execution_time(self, max_execution_time=5): |
||||
with eventlet.timeout.Timeout(max_execution_time, False): |
||||
yield |
||||
return |
||||
self.fail('Execution of this test timed out') |
||||
|
||||
def test_stopping_async_process_lifecycle(self): |
||||
with self.assert_max_execution_time(): |
||||
proc = async_process.AsyncProcess(['tail', '-f', |
||||
self.test_file_path]) |
||||
proc.start() |
||||
self._check_stdout(proc) |
||||
proc.stop() |
||||
|
||||
# Ensure that the process and greenthreads have stopped |
||||
proc._process.wait() |
||||
self.assertEqual(proc._process.returncode, -9) |
||||
for watcher in proc._watchers: |
||||
watcher.wait() |
||||
|
||||
def test_async_process_respawns(self): |
||||
with self.assert_max_execution_time(): |
||||
proc = async_process.AsyncProcess(['tail', '-f', |
||||
self.test_file_path], |
||||
respawn_interval=0) |
||||
proc.start() |
||||
|
||||
# Ensure that the same output is read twice |
||||
self._check_stdout(proc) |
||||
pid = proc._get_pid_to_kill() |
||||
proc._kill_process(pid) |
||||
self._check_stdout(proc) |
||||
proc.stop() |
@ -0,0 +1,15 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4 |
||||
|
||||
# Copyright 2013 Red Hat, Inc. |
||||
# |
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may |
||||
# not use this file except in compliance with the License. You may obtain |
||||
# a copy of the License at |
||||
# |
||||
# http://www.apache.org/licenses/LICENSE-2.0 |
||||
# |
||||
# Unless required by applicable law or agreed to in writing, software |
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
||||
# License for the specific language governing permissions and limitations |
||||
# under the License. |
@ -0,0 +1,15 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4 |
||||
|
||||
# Copyright 2013 Red Hat, Inc. |
||||
# |
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may |
||||
# not use this file except in compliance with the License. You may obtain |
||||
# a copy of the License at |
||||
# |
||||
# http://www.apache.org/licenses/LICENSE-2.0 |
||||
# |
||||
# Unless required by applicable law or agreed to in writing, software |
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
||||
# License for the specific language governing permissions and limitations |
||||
# under the License. |
@ -0,0 +1,239 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4 |
||||
|
||||
# Copyright 2013 Red Hat, Inc. |
||||
# |
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may |
||||
# not use this file except in compliance with the License. You may obtain |
||||
# a copy of the License at |
||||
# |
||||
# http://www.apache.org/licenses/LICENSE-2.0 |
||||
# |
||||
# Unless required by applicable law or agreed to in writing, software |
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
||||
# License for the specific language governing permissions and limitations |
||||
# under the License. |
||||
|
||||
import eventlet.event |
||||
import eventlet.queue |
||||
import eventlet.timeout |
||||
import mock |
||||
import testtools |
||||
|
||||
from neutron.agent.linux import async_process |
||||
from neutron.agent.linux import utils |
||||
from neutron.tests import base |
||||
|
||||
|
||||
_marker = () |
||||
|
||||
|
||||
class TestAsyncProcess(base.BaseTestCase): |
||||
|
||||
def setUp(self): |
||||
super(TestAsyncProcess, self).setUp() |
||||
self.proc = async_process.AsyncProcess(['fake']) |
||||
|
||||
def test_construtor_raises_exception_for_negative_respawn_interval(self): |
||||
with testtools.ExpectedException(ValueError): |
||||
async_process.AsyncProcess(['fake'], respawn_interval=-1) |
||||
|
||||
def test__spawn(self): |
||||
expected_process = 'Foo' |
||||
proc = self.proc |
||||
with mock.patch.object(utils, 'create_process') as mock_create_process: |
||||
mock_create_process.return_value = [expected_process, None] |
||||
with mock.patch('eventlet.spawn') as mock_spawn: |
||||
proc._spawn() |
||||
|
||||
self.assertIsInstance(proc._kill_event, eventlet.event.Event) |
||||
self.assertEqual(proc._process, expected_process) |
||||
mock_spawn.assert_has_calls([ |
||||
mock.call(proc._watch_process, |
||||
proc._read_stdout, |
||||
proc._kill_event), |
||||
mock.call(proc._watch_process, |
||||
proc._read_stderr, |
||||
proc._kill_event), |
||||
]) |
||||
self.assertEqual(len(proc._watchers), 2) |
||||
|
||||
def test__handle_process_error_kills_with_respawn(self): |
||||
with mock.patch.object(self.proc, '_kill') as kill: |
||||
self.proc._handle_process_error() |
||||
|
||||
kill.assert_has_calls(mock.call(respawning=False)) |
||||
|
||||
def test__handle_process_error_kills_without_respawn(self): |
||||
self.proc.respawn_interval = 1 |
||||
with mock.patch.object(self.proc, '_kill') as kill: |
||||
with mock.patch.object(self.proc, '_spawn') as spawn: |
||||
with mock.patch('eventlet.sleep') as sleep: |
||||
self.proc._handle_process_error() |
||||
|
||||
kill.assert_has_calls(mock.call(respawning=True)) |
||||
sleep.assert_has_calls(mock.call(self.proc.respawn_interval)) |
||||
spawn.assert_called_once() |
||||
|
||||
def _test__watch_process(self, callback, kill_event): |
||||
self.proc._kill_event = kill_event |
||||
# Ensure the test times out eventually if the watcher loops endlessly |
||||
with eventlet.timeout.Timeout(5): |
||||
with mock.patch.object(self.proc, |
||||
'_handle_process_error') as func: |
||||
self.proc._watch_process(callback, kill_event) |
||||
|
||||
if not kill_event.ready(): |
||||
func.assert_called_once() |
||||
|
||||
def test__watch_process_exits_on_callback_failure(self): |
||||
self._test__watch_process(lambda: False, eventlet.event.Event()) |
||||
|
||||
def test__watch_process_exits_on_exception(self): |
||||
def foo(): |
||||
raise Exception('Error!') |
||||
self._test__watch_process(foo, eventlet.event.Event()) |
||||
|
||||
def test__watch_process_exits_on_sent_kill_event(self): |
||||
kill_event = eventlet.event.Event() |
||||
kill_event.send() |
||||
self._test__watch_process(None, kill_event) |
||||
|
||||
def _test_read_output_queues_and_returns_result(self, output): |
||||
queue = eventlet.queue.LightQueue() |
||||
mock_stream = mock.Mock() |
||||
with mock.patch.object(mock_stream, 'readline') as mock_readline: |
||||
mock_readline.return_value = output |
||||
result = self.proc._read(mock_stream, queue) |
||||
|
||||
if output: |
||||
self.assertEqual(output, result) |
||||
self.assertEqual(output, queue.get_nowait()) |
||||
else: |
||||
self.assertFalse(result) |
||||
self.assertTrue(queue.empty()) |
||||
|
||||
def test__read_queues_and_returns_output(self): |
||||
self._test_read_output_queues_and_returns_result('foo') |
||||
|
||||
def test__read_returns_none_for_missing_output(self): |
||||
self._test_read_output_queues_and_returns_result('') |
||||
|
||||
def test_start_raises_exception_if_process_already_started(self): |
||||
self.proc._kill_event = True |
||||
with testtools.ExpectedException(async_process.AsyncProcessException): |
||||
self.proc.start() |
||||
|
||||
def test_start_invokes__spawn(self): |
||||
with mock.patch.object(self.proc, '_spawn') as mock_start: |
||||
self.proc.start() |
||||
|
||||
mock_start.assert_called_once() |
||||
|
||||
def test__iter_queue_returns_empty_list_for_empty_queue(self): |
||||
result = list(self.proc._iter_queue(eventlet.queue.LightQueue())) |
||||
self.assertEqual(result, []) |
||||
|
||||
def test__iter_queue_returns_queued_data(self): |
||||
queue = eventlet.queue.LightQueue() |
||||
queue.put('foo') |
||||
result = list(self.proc._iter_queue(queue)) |
||||
self.assertEqual(result, ['foo']) |
||||
|
||||
def _test_iter_output_calls_iter_queue_on_output_queue(self, output_type): |
||||
expected_value = 'foo' |
||||
with mock.patch.object(self.proc, '_iter_queue') as mock_iter_queue: |
||||
mock_iter_queue.return_value = expected_value |
||||
target_func = getattr(self.proc, 'iter_%s' % output_type, None) |
||||
value = target_func() |
||||
|
||||
self.assertEqual(value, expected_value) |
||||
queue = getattr(self.proc, '_%s_lines' % output_type, None) |
||||
mock_iter_queue.assert_called_with(queue) |
||||
|
||||
def test_iter_stdout(self): |
||||
self._test_iter_output_calls_iter_queue_on_output_queue('stdout') |
||||
|
||||
def test_iter_stderr(self): |
||||
self._test_iter_output_calls_iter_queue_on_output_queue('stderr') |
||||
|
||||
def _test__kill(self, respawning, pid=None): |
||||
with mock.patch.object(self.proc, '_kill_event') as mock_kill_event: |
||||
with mock.patch.object(self.proc, '_get_pid_to_kill', |
||||
return_value=pid): |
||||
with mock.patch.object(self.proc, |
||||
'_kill_process') as mock_kill_process: |
||||
self.proc._kill(respawning) |
||||
|
||||
if respawning: |
||||
self.assertIsNotNone(self.proc._kill_event) |
||||
else: |
||||
self.assertIsNone(self.proc._kill_event) |
||||
|
||||
mock_kill_event.send.assert_called_once() |
||||
if pid: |
||||
mock_kill_process.assert_called_once(pid) |
||||
|
||||
def test__kill_when_respawning_does_not_clear_kill_event(self): |
||||
self._test__kill(True) |
||||
|
||||
def test__kill_when_not_respawning_clears_kill_event(self): |
||||
self._test__kill(False) |
||||
|
||||
def test__kill_targets_process_for_pid(self): |
||||
self._test__kill(False, pid='1') |
||||
|
||||
def _test__get_pid_to_kill(self, expected=_marker, |
||||
root_helper=None, pids=None): |
||||
if root_helper: |
||||
self.proc.root_helper = root_helper |
||||
with mock.patch.object(self.proc, '_process') as mock_process: |
||||
with mock.patch.object(mock_process, 'pid') as mock_pid: |
||||
with mock.patch.object(utils, 'find_child_pids', |
||||
return_value=pids): |
||||
actual = self.proc._get_pid_to_kill() |
||||
if expected is _marker: |
||||
expected = mock_pid |
||||
self.assertEqual(expected, actual) |
||||
|
||||
def test__get_pid_to_kill_returns_process_pid_without_root_helper(self): |
||||
self._test__get_pid_to_kill() |
||||
|
||||
def test__get_pid_to_kill_returns_child_pid_with_root_helper(self): |
||||
self._test__get_pid_to_kill(expected='1', pids=['1'], root_helper='a') |
||||
|
||||
def test__get_pid_to_kill_returns_none_with_root_helper(self): |
||||
self._test__get_pid_to_kill(expected=None, root_helper='a') |
||||
|
||||
def _test__kill_process(self, pid, expected, exception_message=None): |
||||
self.proc.root_helper = 'foo' |
||||
if exception_message: |
||||
exc = RuntimeError(exception_message) |
||||
else: |
||||
exc = None |
||||
with mock.patch.object(utils, 'execute', |
||||
side_effect=exc) as mock_execute: |
||||
actual = self.proc._kill_process(pid) |
||||
|
||||
self.assertEqual(expected, actual) |
||||
mock_execute.assert_called_with(['kill', '-9', pid], |
||||
root_helper=self.proc.root_helper) |
||||
|
||||
def test__kill_process_returns_true_for_valid_pid(self): |
||||
self._test__kill_process('1', True) |
||||
|
||||
def test__kill_process_returns_true_for_stale_pid(self): |
||||
self._test__kill_process('1', True, 'No such process') |
||||
|
||||
def test__kill_process_returns_false_for_execute_exception(self): |
||||
self._test__kill_process('1', False, 'Invalid') |
||||
|
||||
def test_stop_calls_kill(self): |
||||
self.proc._kill_event = True |
||||
with mock.patch.object(self.proc, '_kill') as mock_kill: |
||||
self.proc.stop() |
||||
mock_kill.called_once() |
||||
|
||||
def test_stop_raises_exception_if_already_started(self): |
||||
with testtools.ExpectedException(async_process.AsyncProcessException): |
||||
self.proc.stop() |
Loading…
Reference in new issue