Merge "Allow AsyncProcess to block on process start and stop"
This commit is contained in:
commit
592f641b46
@ -16,6 +16,7 @@ import eventlet
|
||||
import eventlet.event
|
||||
import eventlet.queue
|
||||
|
||||
from neutron.agent.linux import ip_lib
|
||||
from neutron.agent.linux import utils
|
||||
from neutron.i18n import _LE
|
||||
from neutron.openstack.common import log as logging
|
||||
@ -52,7 +53,8 @@ class AsyncProcess(object):
|
||||
... print line
|
||||
"""
|
||||
|
||||
def __init__(self, cmd, run_as_root=False, respawn_interval=None):
|
||||
def __init__(self, cmd, run_as_root=False, respawn_interval=None,
|
||||
namespace=None):
|
||||
"""Constructor.
|
||||
|
||||
:param cmd: The list of command arguments to invoke.
|
||||
@ -60,8 +62,11 @@ class AsyncProcess(object):
|
||||
:param respawn_interval: Optional, the interval in seconds to wait
|
||||
to respawn after unexpected process death. Respawn will
|
||||
only be attempted if a value of 0 or greater is provided.
|
||||
:param namespace: Optional, start the command in the specified
|
||||
namespace.
|
||||
"""
|
||||
self.cmd = cmd
|
||||
self.cmd_without_namespace = cmd
|
||||
self.cmd = ip_lib.add_namespace_to_cmd(cmd, namespace)
|
||||
self.run_as_root = run_as_root
|
||||
if respawn_interval is not None and respawn_interval < 0:
|
||||
raise ValueError(_('respawn_interval must be >= 0 if provided.'))
|
||||
@ -75,22 +80,45 @@ class AsyncProcess(object):
|
||||
self._stdout_lines = eventlet.queue.LightQueue()
|
||||
self._stderr_lines = eventlet.queue.LightQueue()
|
||||
|
||||
def start(self):
|
||||
"""Launch a process and monitor it asynchronously."""
|
||||
def is_active(self):
|
||||
# If using sudo rootwrap as a root_helper, we have to wait until sudo
|
||||
# spawns rootwrap and rootwrap spawns the process.
|
||||
|
||||
return utils.pid_invoked_with_cmdline(
|
||||
self.pid, self.cmd_without_namespace)
|
||||
|
||||
def start(self, blocking=False):
|
||||
"""Launch a process and monitor it asynchronously.
|
||||
|
||||
:param blocking: Block until the process has started.
|
||||
:raises eventlet.timeout.Timeout if blocking is True and the process
|
||||
did not start in time.
|
||||
"""
|
||||
if self._kill_event:
|
||||
raise AsyncProcessException(_('Process is already started'))
|
||||
else:
|
||||
LOG.debug('Launching async process [%s].', self.cmd)
|
||||
self._spawn()
|
||||
|
||||
def stop(self):
|
||||
"""Halt the process and watcher threads."""
|
||||
if blocking:
|
||||
utils.wait_until_true(self.is_active)
|
||||
|
||||
def stop(self, blocking=False):
|
||||
"""Halt the process and watcher threads.
|
||||
|
||||
:param blocking: Block until the process has stopped.
|
||||
:raises eventlet.timeout.Timeout if blocking is True and the process
|
||||
did not stop in time.
|
||||
"""
|
||||
if self._kill_event:
|
||||
LOG.debug('Halting async process [%s].', self.cmd)
|
||||
self._kill()
|
||||
else:
|
||||
raise AsyncProcessException(_('Process is not running.'))
|
||||
|
||||
if blocking:
|
||||
utils.wait_until_true(lambda: not self.is_active())
|
||||
|
||||
def _spawn(self):
|
||||
"""Spawn a process and its watchers."""
|
||||
self._kill_event = eventlet.event.Event()
|
||||
@ -107,6 +135,13 @@ class AsyncProcess(object):
|
||||
self._kill_event)
|
||||
self._watchers.append(watcher)
|
||||
|
||||
@property
|
||||
def pid(self):
|
||||
if self._process:
|
||||
return utils.get_root_helper_child_pid(
|
||||
self._process.pid,
|
||||
run_as_root=self.run_as_root)
|
||||
|
||||
def _kill(self, respawning=False):
|
||||
"""Kill the process and the associated watcher greenthreads.
|
||||
|
||||
@ -116,8 +151,7 @@ class AsyncProcess(object):
|
||||
# Halt the greenthreads
|
||||
self._kill_event.send()
|
||||
|
||||
pid = utils.get_root_helper_child_pid(self._process.pid,
|
||||
run_as_root=self.run_as_root)
|
||||
pid = self.pid
|
||||
if pid:
|
||||
self._kill_process(pid)
|
||||
|
||||
|
@ -75,10 +75,7 @@ class SubProcessBase(object):
|
||||
def _execute(cls, options, command, args, run_as_root=False,
|
||||
namespace=None, log_fail_as_error=True):
|
||||
opt_list = ['-%s' % o for o in options]
|
||||
if namespace:
|
||||
ip_cmd = ['ip', 'netns', 'exec', namespace, 'ip']
|
||||
else:
|
||||
ip_cmd = ['ip']
|
||||
ip_cmd = add_namespace_to_cmd(['ip'], namespace)
|
||||
cmd = ip_cmd + opt_list + [command] + list(args)
|
||||
return utils.execute(cmd, run_as_root=run_as_root,
|
||||
log_fail_as_error=log_fail_as_error)
|
||||
@ -689,3 +686,9 @@ def send_garp_for_proxyarp(ns_name, iface_name, address, count):
|
||||
|
||||
if count > 0:
|
||||
eventlet.spawn_n(arping_with_temporary_address)
|
||||
|
||||
|
||||
def add_namespace_to_cmd(cmd, namespace=None):
|
||||
"""Add an optional namespace to the command."""
|
||||
|
||||
return ['ip', 'netns', 'exec', namespace] + cmd if namespace else cmd
|
||||
|
@ -21,6 +21,7 @@ import socket
|
||||
import struct
|
||||
import tempfile
|
||||
|
||||
import eventlet
|
||||
from eventlet.green import subprocess
|
||||
from eventlet import greenthread
|
||||
from oslo_config import cfg
|
||||
@ -213,3 +214,60 @@ def get_root_helper_child_pid(pid, run_as_root=False):
|
||||
# Last process in the tree, return it
|
||||
break
|
||||
return pid
|
||||
|
||||
|
||||
def remove_abs_path(cmd):
|
||||
"""Remove absolute path of executable in cmd
|
||||
|
||||
Note: New instance of list is returned
|
||||
|
||||
:param cmd: parsed shlex command (e.g. ['/bin/foo', 'param1', 'param two'])
|
||||
|
||||
"""
|
||||
if cmd and os.path.isabs(cmd[0]):
|
||||
cmd = list(cmd)
|
||||
cmd[0] = os.path.basename(cmd[0])
|
||||
|
||||
return cmd
|
||||
|
||||
|
||||
def get_cmdline_from_pid(pid):
|
||||
if pid is None or not os.path.exists('/proc/%s' % pid):
|
||||
return []
|
||||
with open('/proc/%s/cmdline' % pid, 'r') as f:
|
||||
return f.readline().split('\0')[:-1]
|
||||
|
||||
|
||||
def cmdlines_are_equal(cmd1, cmd2):
|
||||
"""Validate provided lists containing output of /proc/cmdline are equal
|
||||
|
||||
This function ignores absolute paths of executables in order to have
|
||||
correct results in case one list uses absolute path and the other does not.
|
||||
"""
|
||||
cmd1 = remove_abs_path(cmd1)
|
||||
cmd2 = remove_abs_path(cmd2)
|
||||
return cmd1 == cmd2
|
||||
|
||||
|
||||
def pid_invoked_with_cmdline(pid, expected_cmd):
|
||||
"""Validate process with given pid is running with provided parameters
|
||||
|
||||
"""
|
||||
cmdline = get_cmdline_from_pid(pid)
|
||||
return cmdlines_are_equal(expected_cmd, cmdline)
|
||||
|
||||
|
||||
def wait_until_true(predicate, timeout=60, sleep=1, exception=None):
|
||||
"""
|
||||
Wait until callable predicate is evaluated as True
|
||||
|
||||
:param predicate: Callable deciding whether waiting should continue.
|
||||
Best practice is to instantiate predicate with functools.partial()
|
||||
:param timeout: Timeout in seconds how long should function wait.
|
||||
:param sleep: Polling interval for results in seconds.
|
||||
:param exception: Exception class for eventlet.Timeout.
|
||||
(see doc for eventlet.Timeout for more information)
|
||||
"""
|
||||
with eventlet.timeout.Timeout(timeout, exception):
|
||||
while not predicate():
|
||||
eventlet.sleep(sleep)
|
||||
|
@ -20,8 +20,6 @@ import select
|
||||
import shlex
|
||||
import subprocess
|
||||
|
||||
import eventlet
|
||||
|
||||
from neutron.agent.common import config
|
||||
from neutron.agent.linux import ip_lib
|
||||
from neutron.agent.linux import utils
|
||||
@ -72,63 +70,6 @@ def get_unused_port(used, start=1024, end=65535):
|
||||
return random.choice(list(candidates - used))
|
||||
|
||||
|
||||
def wait_until_true(predicate, timeout=60, sleep=1, exception=None):
|
||||
"""
|
||||
Wait until callable predicate is evaluated as True
|
||||
|
||||
:param predicate: Callable deciding whether waiting should continue.
|
||||
Best practice is to instantiate predicate with functools.partial()
|
||||
:param timeout: Timeout in seconds how long should function wait.
|
||||
:param sleep: Polling interval for results in seconds.
|
||||
:param exception: Exception class for eventlet.Timeout.
|
||||
(see doc for eventlet.Timeout for more information)
|
||||
"""
|
||||
with eventlet.timeout.Timeout(timeout, exception):
|
||||
while not predicate():
|
||||
eventlet.sleep(sleep)
|
||||
|
||||
|
||||
def remove_abs_path(cmd):
|
||||
"""Remove absolute path of executable in cmd
|
||||
|
||||
Note: New instance of list is returned
|
||||
|
||||
:param cmd: parsed shlex command (e.g. ['/bin/foo', 'param1', 'param two'])
|
||||
|
||||
"""
|
||||
if cmd and os.path.isabs(cmd[0]):
|
||||
cmd = list(cmd)
|
||||
cmd[0] = os.path.basename(cmd[0])
|
||||
|
||||
return cmd
|
||||
|
||||
|
||||
def get_cmdline_from_pid(pid):
|
||||
if pid is None or not os.path.exists('/proc/%s' % pid):
|
||||
return list()
|
||||
with open('/proc/%s/cmdline' % pid, 'r') as f:
|
||||
return f.readline().split('\0')[:-1]
|
||||
|
||||
|
||||
def cmdlines_are_equal(cmd1, cmd2):
|
||||
"""Validate provided lists containing output of /proc/cmdline are equal
|
||||
|
||||
This function ignores absolute paths of executables in order to have
|
||||
correct results in case one list uses absolute path and the other does not.
|
||||
"""
|
||||
cmd1 = remove_abs_path(cmd1)
|
||||
cmd2 = remove_abs_path(cmd2)
|
||||
return cmd1 == cmd2
|
||||
|
||||
|
||||
def pid_invoked_with_cmdline(pid, expected_cmd):
|
||||
"""Validate process with given pid is running with provided parameters
|
||||
|
||||
"""
|
||||
cmdline = get_cmdline_from_pid(pid)
|
||||
return cmdlines_are_equal(expected_cmd, cmdline)
|
||||
|
||||
|
||||
class Pinger(object):
|
||||
def __init__(self, namespace, timeout=1, max_attempts=1):
|
||||
self.namespace = namespace
|
||||
@ -182,9 +123,9 @@ class RootHelperProcess(subprocess.Popen):
|
||||
poller = select.poll()
|
||||
poller.register(stream.fileno())
|
||||
poll_predicate = functools.partial(poller.poll, 1)
|
||||
wait_until_true(poll_predicate, timeout, 0.1,
|
||||
RuntimeError(
|
||||
'No output in %.2f seconds' % timeout))
|
||||
utils.wait_until_true(poll_predicate, timeout, 0.1,
|
||||
RuntimeError(
|
||||
'No output in %.2f seconds' % timeout))
|
||||
return stream.readline()
|
||||
|
||||
def writeline(self, data):
|
||||
@ -196,10 +137,10 @@ class RootHelperProcess(subprocess.Popen):
|
||||
def child_is_running():
|
||||
child_pid = utils.get_root_helper_child_pid(
|
||||
self.pid, run_as_root=self.run_as_root)
|
||||
if pid_invoked_with_cmdline(child_pid, self.cmd):
|
||||
if utils.pid_invoked_with_cmdline(child_pid, self.cmd):
|
||||
return True
|
||||
|
||||
wait_until_true(
|
||||
utils.wait_until_true(
|
||||
child_is_running,
|
||||
timeout,
|
||||
exception=RuntimeError("Process %s hasn't been spawned "
|
||||
|
@ -17,14 +17,13 @@ import eventlet
|
||||
from six import moves
|
||||
|
||||
from neutron.agent.linux import async_process
|
||||
from neutron.agent.linux import utils
|
||||
from neutron.tests import base
|
||||
|
||||
|
||||
class TestAsyncProcess(base.BaseTestCase):
|
||||
class AsyncProcessTestFramework(base.BaseTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(TestAsyncProcess, self).setUp()
|
||||
super(AsyncProcessTestFramework, self).setUp()
|
||||
self.test_file_path = self.get_temp_file_path('test_async_process.tmp')
|
||||
self.data = [str(x) for x in moves.xrange(4)]
|
||||
with file(self.test_file_path, 'w') as f:
|
||||
@ -39,12 +38,21 @@ class TestAsyncProcess(base.BaseTestCase):
|
||||
output += new_output
|
||||
eventlet.sleep(0.01)
|
||||
|
||||
|
||||
class TestAsyncProcess(AsyncProcessTestFramework):
|
||||
def _safe_stop(self, proc):
|
||||
try:
|
||||
proc.stop()
|
||||
except async_process.AsyncProcessException:
|
||||
pass
|
||||
|
||||
def test_stopping_async_process_lifecycle(self):
|
||||
proc = async_process.AsyncProcess(['tail', '-f',
|
||||
self.test_file_path])
|
||||
proc.start()
|
||||
self.addCleanup(self._safe_stop, proc)
|
||||
proc.start(blocking=True)
|
||||
self._check_stdout(proc)
|
||||
proc.stop()
|
||||
proc.stop(blocking=True)
|
||||
|
||||
# Ensure that the process and greenthreads have stopped
|
||||
proc._process.wait()
|
||||
@ -56,12 +64,10 @@ class TestAsyncProcess(base.BaseTestCase):
|
||||
proc = async_process.AsyncProcess(['tail', '-f',
|
||||
self.test_file_path],
|
||||
respawn_interval=0)
|
||||
self.addCleanup(self._safe_stop, proc)
|
||||
proc.start()
|
||||
|
||||
# Ensure that the same output is read twice
|
||||
self._check_stdout(proc)
|
||||
pid = utils.get_root_helper_child_pid(proc._process.pid,
|
||||
proc.run_as_root)
|
||||
proc._kill_process(pid)
|
||||
proc._kill_process(proc.pid)
|
||||
self._check_stdout(proc)
|
||||
proc.stop()
|
||||
|
40
neutron/tests/functional/agent/linux/test_utils.py
Normal file
40
neutron/tests/functional/agent/linux/test_utils.py
Normal file
@ -0,0 +1,40 @@
|
||||
# Copyright 2015 Red Hat, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import eventlet
|
||||
import testtools
|
||||
|
||||
from neutron.agent.linux import async_process
|
||||
from neutron.agent.linux import utils
|
||||
from neutron.tests.functional.agent.linux import test_async_process
|
||||
|
||||
|
||||
class TestPIDHelpers(test_async_process.AsyncProcessTestFramework):
|
||||
def test_get_cmdline_from_pid_and_pid_invoked_with_cmdline(self):
|
||||
cmd = ['tail', '-f', self.test_file_path]
|
||||
proc = async_process.AsyncProcess(cmd)
|
||||
proc.start(blocking=True)
|
||||
self.addCleanup(proc.stop)
|
||||
|
||||
pid = proc.pid
|
||||
self.assertEqual(cmd, utils.get_cmdline_from_pid(pid))
|
||||
self.assertTrue(utils.pid_invoked_with_cmdline(pid, cmd))
|
||||
self.assertEqual([], utils.get_cmdline_from_pid(-1))
|
||||
|
||||
def test_wait_until_true_predicate_succeeds(self):
|
||||
utils.wait_until_true(lambda: True)
|
||||
|
||||
def test_wait_until_true_predicate_fails(self):
|
||||
with testtools.ExpectedException(eventlet.timeout.Timeout):
|
||||
utils.wait_until_true(lambda: False, 2)
|
@ -31,6 +31,7 @@ from neutron.agent.linux import dhcp
|
||||
from neutron.agent.linux import external_process
|
||||
from neutron.agent.linux import ip_lib
|
||||
from neutron.agent.linux import ovs_lib
|
||||
from neutron.agent.linux import utils
|
||||
from neutron.agent.metadata import agent as metadata_agent
|
||||
from neutron.common import config as common_config
|
||||
from neutron.common import constants as l3_constants
|
||||
@ -395,7 +396,7 @@ class L3AgentTestCase(L3AgentTestFramework):
|
||||
port = router.get_ex_gw_port()
|
||||
interface_name = self.agent.get_external_device_name(port['id'])
|
||||
self._assert_no_ip_addresses_on_interface(router, interface_name)
|
||||
helpers.wait_until_true(lambda: router.ha_state == 'master')
|
||||
utils.wait_until_true(lambda: router.ha_state == 'master')
|
||||
|
||||
# Keepalived notifies of a state transition when it starts,
|
||||
# not when it ends. Thus, we have to wait until keepalived finishes
|
||||
@ -407,7 +408,7 @@ class L3AgentTestCase(L3AgentTestFramework):
|
||||
device,
|
||||
self.agent.get_internal_device_name,
|
||||
router.ns_name)
|
||||
helpers.wait_until_true(device_exists)
|
||||
utils.wait_until_true(device_exists)
|
||||
|
||||
self.assertTrue(self._namespace_exists(router.ns_name))
|
||||
self.assertTrue(self._metadata_proxy_exists(self.agent.conf, router))
|
||||
@ -478,7 +479,7 @@ class L3AgentTestCase(L3AgentTestFramework):
|
||||
restarted_agent = l3_test_agent.TestL3NATAgent(self.agent.host,
|
||||
self.agent.conf)
|
||||
self._create_router(restarted_agent, router1.router)
|
||||
helpers.wait_until_true(lambda: self._floating_ips_configured(router1))
|
||||
utils.wait_until_true(lambda: self._floating_ips_configured(router1))
|
||||
|
||||
|
||||
class L3HATestFramework(L3AgentTestFramework):
|
||||
@ -506,16 +507,16 @@ class L3HATestFramework(L3AgentTestFramework):
|
||||
|
||||
router2 = self.manage_router(self.failover_agent, router_info_2)
|
||||
|
||||
helpers.wait_until_true(lambda: router1.ha_state == 'master')
|
||||
helpers.wait_until_true(lambda: router2.ha_state == 'backup')
|
||||
utils.wait_until_true(lambda: router1.ha_state == 'master')
|
||||
utils.wait_until_true(lambda: router2.ha_state == 'backup')
|
||||
|
||||
device_name = router1.get_ha_device_name(
|
||||
router1.router[l3_constants.HA_INTERFACE_KEY]['id'])
|
||||
ha_device = ip_lib.IPDevice(device_name, namespace=router1.ns_name)
|
||||
ha_device.link.set_down()
|
||||
|
||||
helpers.wait_until_true(lambda: router2.ha_state == 'master')
|
||||
helpers.wait_until_true(lambda: router1.ha_state == 'fault')
|
||||
utils.wait_until_true(lambda: router2.ha_state == 'master')
|
||||
utils.wait_until_true(lambda: router1.ha_state == 'fault')
|
||||
|
||||
|
||||
class MetadataFakeProxyHandler(object):
|
||||
|
@ -187,3 +187,19 @@ class TestGetRoothelperChildPid(base.BaseTestCase):
|
||||
|
||||
def test_returns_none_as_root(self):
|
||||
self._test_get_root_helper_child_pid(expected=None, run_as_root=True)
|
||||
|
||||
|
||||
class TestPathUtilities(base.BaseTestCase):
|
||||
def test_remove_abs_path(self):
|
||||
self.assertEqual(['ping', '8.8.8.8'],
|
||||
utils.remove_abs_path(['/usr/bin/ping', '8.8.8.8']))
|
||||
|
||||
def test_cmdlines_are_equal(self):
|
||||
self.assertTrue(utils.cmdlines_are_equal(
|
||||
['ping', '8.8.8.8'],
|
||||
['/usr/bin/ping', '8.8.8.8']))
|
||||
|
||||
def test_cmdlines_are_equal_different_commands(self):
|
||||
self.assertFalse(utils.cmdlines_are_equal(
|
||||
['ping', '8.8.8.8'],
|
||||
['/usr/bin/ping6', '8.8.8.8']))
|
||||
|
@ -1002,3 +1002,14 @@ class TestArpPing(TestIPCmdBase):
|
||||
|
||||
# If this was called then check_added_address probably had a assert
|
||||
self.assertFalse(device.addr.add.called)
|
||||
|
||||
|
||||
class TestAddNamespaceToCmd(base.BaseTestCase):
|
||||
def test_add_namespace_to_cmd_with_namespace(self):
|
||||
cmd = ['ping', '8.8.8.8']
|
||||
self.assertEqual(['ip', 'netns', 'exec', 'tmp'] + cmd,
|
||||
ip_lib.add_namespace_to_cmd(cmd, 'tmp'))
|
||||
|
||||
def test_add_namespace_to_cmd_without_namespace(self):
|
||||
cmd = ['ping', '8.8.8.8']
|
||||
self.assertEqual(cmd, ip_lib.add_namespace_to_cmd(cmd, None))
|
||||
|
Loading…
Reference in New Issue
Block a user