Allow AsyncProcess to block on process start and stop

* Move utility functions from the test tree to the non-test tree
* Add tests for the newly moved functions
* Use these functions in AsyncProcess

This will allow the ip monitor in the following patch to start
and stop in a synchronous manner.

Related-Bug: #1402010
Change-Id: I03727d8acc17e561d3473b0ebecfbe49cb5523b1
This commit is contained in:
Assaf Muller 2015-02-21 19:27:44 -05:00
parent 9ddcd1adc8
commit 7907d40075
9 changed files with 202 additions and 92 deletions

View File

@ -16,6 +16,7 @@ import eventlet
import eventlet.event
import eventlet.queue
from neutron.agent.linux import ip_lib
from neutron.agent.linux import utils
from neutron.i18n import _LE
from neutron.openstack.common import log as logging
@ -52,7 +53,8 @@ class AsyncProcess(object):
... print line
"""
def __init__(self, cmd, run_as_root=False, respawn_interval=None):
def __init__(self, cmd, run_as_root=False, respawn_interval=None,
namespace=None):
"""Constructor.
:param cmd: The list of command arguments to invoke.
@ -60,8 +62,11 @@ class AsyncProcess(object):
:param respawn_interval: Optional, the interval in seconds to wait
to respawn after unexpected process death. Respawn will
only be attempted if a value of 0 or greater is provided.
:param namespace: Optional, start the command in the specified
namespace.
"""
self.cmd = cmd
self.cmd_without_namespace = cmd
self.cmd = ip_lib.add_namespace_to_cmd(cmd, namespace)
self.run_as_root = run_as_root
if respawn_interval is not None and respawn_interval < 0:
raise ValueError(_('respawn_interval must be >= 0 if provided.'))
@ -75,22 +80,45 @@ class AsyncProcess(object):
self._stdout_lines = eventlet.queue.LightQueue()
self._stderr_lines = eventlet.queue.LightQueue()
def start(self):
"""Launch a process and monitor it asynchronously."""
def is_active(self):
# If using sudo rootwrap as a root_helper, we have to wait until sudo
# spawns rootwrap and rootwrap spawns the process.
return utils.pid_invoked_with_cmdline(
self.pid, self.cmd_without_namespace)
def start(self, blocking=False):
"""Launch a process and monitor it asynchronously.
:param blocking: Block until the process has started.
:raises eventlet.timeout.Timeout if blocking is True and the process
did not start in time.
"""
if self._kill_event:
raise AsyncProcessException(_('Process is already started'))
else:
LOG.debug('Launching async process [%s].', self.cmd)
self._spawn()
def stop(self):
"""Halt the process and watcher threads."""
if blocking:
utils.wait_until_true(self.is_active)
def stop(self, blocking=False):
"""Halt the process and watcher threads.
:param blocking: Block until the process has stopped.
:raises eventlet.timeout.Timeout if blocking is True and the process
did not stop in time.
"""
if self._kill_event:
LOG.debug('Halting async process [%s].', self.cmd)
self._kill()
else:
raise AsyncProcessException(_('Process is not running.'))
if blocking:
utils.wait_until_true(lambda: not self.is_active())
def _spawn(self):
"""Spawn a process and its watchers."""
self._kill_event = eventlet.event.Event()
@ -107,6 +135,13 @@ class AsyncProcess(object):
self._kill_event)
self._watchers.append(watcher)
@property
def pid(self):
if self._process:
return utils.get_root_helper_child_pid(
self._process.pid,
run_as_root=self.run_as_root)
def _kill(self, respawning=False):
"""Kill the process and the associated watcher greenthreads.
@ -116,8 +151,7 @@ class AsyncProcess(object):
# Halt the greenthreads
self._kill_event.send()
pid = utils.get_root_helper_child_pid(self._process.pid,
run_as_root=self.run_as_root)
pid = self.pid
if pid:
self._kill_process(pid)

View File

@ -75,10 +75,7 @@ class SubProcessBase(object):
def _execute(cls, options, command, args, run_as_root=False,
namespace=None, log_fail_as_error=True):
opt_list = ['-%s' % o for o in options]
if namespace:
ip_cmd = ['ip', 'netns', 'exec', namespace, 'ip']
else:
ip_cmd = ['ip']
ip_cmd = add_namespace_to_cmd(['ip'], namespace)
cmd = ip_cmd + opt_list + [command] + list(args)
return utils.execute(cmd, run_as_root=run_as_root,
log_fail_as_error=log_fail_as_error)
@ -689,3 +686,9 @@ def send_garp_for_proxyarp(ns_name, iface_name, address, count):
if count > 0:
eventlet.spawn_n(arping_with_temporary_address)
def add_namespace_to_cmd(cmd, namespace=None):
"""Add an optional namespace to the command."""
return ['ip', 'netns', 'exec', namespace] + cmd if namespace else cmd

View File

@ -21,6 +21,7 @@ import socket
import struct
import tempfile
import eventlet
from eventlet.green import subprocess
from eventlet import greenthread
from oslo_config import cfg
@ -213,3 +214,60 @@ def get_root_helper_child_pid(pid, run_as_root=False):
# Last process in the tree, return it
break
return pid
def remove_abs_path(cmd):
"""Remove absolute path of executable in cmd
Note: New instance of list is returned
:param cmd: parsed shlex command (e.g. ['/bin/foo', 'param1', 'param two'])
"""
if cmd and os.path.isabs(cmd[0]):
cmd = list(cmd)
cmd[0] = os.path.basename(cmd[0])
return cmd
def get_cmdline_from_pid(pid):
if pid is None or not os.path.exists('/proc/%s' % pid):
return []
with open('/proc/%s/cmdline' % pid, 'r') as f:
return f.readline().split('\0')[:-1]
def cmdlines_are_equal(cmd1, cmd2):
"""Validate provided lists containing output of /proc/cmdline are equal
This function ignores absolute paths of executables in order to have
correct results in case one list uses absolute path and the other does not.
"""
cmd1 = remove_abs_path(cmd1)
cmd2 = remove_abs_path(cmd2)
return cmd1 == cmd2
def pid_invoked_with_cmdline(pid, expected_cmd):
"""Validate process with given pid is running with provided parameters
"""
cmdline = get_cmdline_from_pid(pid)
return cmdlines_are_equal(expected_cmd, cmdline)
def wait_until_true(predicate, timeout=60, sleep=1, exception=None):
"""
Wait until callable predicate is evaluated as True
:param predicate: Callable deciding whether waiting should continue.
Best practice is to instantiate predicate with functools.partial()
:param timeout: Timeout in seconds how long should function wait.
:param sleep: Polling interval for results in seconds.
:param exception: Exception class for eventlet.Timeout.
(see doc for eventlet.Timeout for more information)
"""
with eventlet.timeout.Timeout(timeout, exception):
while not predicate():
eventlet.sleep(sleep)

View File

@ -20,8 +20,6 @@ import select
import shlex
import subprocess
import eventlet
from neutron.agent.common import config
from neutron.agent.linux import ip_lib
from neutron.agent.linux import utils
@ -72,63 +70,6 @@ def get_unused_port(used, start=1024, end=65535):
return random.choice(list(candidates - used))
def wait_until_true(predicate, timeout=60, sleep=1, exception=None):
"""
Wait until callable predicate is evaluated as True
:param predicate: Callable deciding whether waiting should continue.
Best practice is to instantiate predicate with functools.partial()
:param timeout: Timeout in seconds how long should function wait.
:param sleep: Polling interval for results in seconds.
:param exception: Exception class for eventlet.Timeout.
(see doc for eventlet.Timeout for more information)
"""
with eventlet.timeout.Timeout(timeout, exception):
while not predicate():
eventlet.sleep(sleep)
def remove_abs_path(cmd):
"""Remove absolute path of executable in cmd
Note: New instance of list is returned
:param cmd: parsed shlex command (e.g. ['/bin/foo', 'param1', 'param two'])
"""
if cmd and os.path.isabs(cmd[0]):
cmd = list(cmd)
cmd[0] = os.path.basename(cmd[0])
return cmd
def get_cmdline_from_pid(pid):
if pid is None or not os.path.exists('/proc/%s' % pid):
return list()
with open('/proc/%s/cmdline' % pid, 'r') as f:
return f.readline().split('\0')[:-1]
def cmdlines_are_equal(cmd1, cmd2):
"""Validate provided lists containing output of /proc/cmdline are equal
This function ignores absolute paths of executables in order to have
correct results in case one list uses absolute path and the other does not.
"""
cmd1 = remove_abs_path(cmd1)
cmd2 = remove_abs_path(cmd2)
return cmd1 == cmd2
def pid_invoked_with_cmdline(pid, expected_cmd):
"""Validate process with given pid is running with provided parameters
"""
cmdline = get_cmdline_from_pid(pid)
return cmdlines_are_equal(expected_cmd, cmdline)
class Pinger(object):
def __init__(self, namespace, timeout=1, max_attempts=1):
self.namespace = namespace
@ -182,9 +123,9 @@ class RootHelperProcess(subprocess.Popen):
poller = select.poll()
poller.register(stream.fileno())
poll_predicate = functools.partial(poller.poll, 1)
wait_until_true(poll_predicate, timeout, 0.1,
RuntimeError(
'No output in %.2f seconds' % timeout))
utils.wait_until_true(poll_predicate, timeout, 0.1,
RuntimeError(
'No output in %.2f seconds' % timeout))
return stream.readline()
def writeline(self, data):
@ -196,10 +137,10 @@ class RootHelperProcess(subprocess.Popen):
def child_is_running():
child_pid = utils.get_root_helper_child_pid(
self.pid, run_as_root=self.run_as_root)
if pid_invoked_with_cmdline(child_pid, self.cmd):
if utils.pid_invoked_with_cmdline(child_pid, self.cmd):
return True
wait_until_true(
utils.wait_until_true(
child_is_running,
timeout,
exception=RuntimeError("Process %s hasn't been spawned "

View File

@ -17,14 +17,13 @@ import eventlet
from six import moves
from neutron.agent.linux import async_process
from neutron.agent.linux import utils
from neutron.tests import base
class TestAsyncProcess(base.BaseTestCase):
class AsyncProcessTestFramework(base.BaseTestCase):
def setUp(self):
super(TestAsyncProcess, self).setUp()
super(AsyncProcessTestFramework, self).setUp()
self.test_file_path = self.get_temp_file_path('test_async_process.tmp')
self.data = [str(x) for x in moves.xrange(4)]
with file(self.test_file_path, 'w') as f:
@ -39,12 +38,21 @@ class TestAsyncProcess(base.BaseTestCase):
output += new_output
eventlet.sleep(0.01)
class TestAsyncProcess(AsyncProcessTestFramework):
def _safe_stop(self, proc):
try:
proc.stop()
except async_process.AsyncProcessException:
pass
def test_stopping_async_process_lifecycle(self):
proc = async_process.AsyncProcess(['tail', '-f',
self.test_file_path])
proc.start()
self.addCleanup(self._safe_stop, proc)
proc.start(blocking=True)
self._check_stdout(proc)
proc.stop()
proc.stop(blocking=True)
# Ensure that the process and greenthreads have stopped
proc._process.wait()
@ -56,12 +64,10 @@ class TestAsyncProcess(base.BaseTestCase):
proc = async_process.AsyncProcess(['tail', '-f',
self.test_file_path],
respawn_interval=0)
self.addCleanup(self._safe_stop, proc)
proc.start()
# Ensure that the same output is read twice
self._check_stdout(proc)
pid = utils.get_root_helper_child_pid(proc._process.pid,
proc.run_as_root)
proc._kill_process(pid)
proc._kill_process(proc.pid)
self._check_stdout(proc)
proc.stop()

View File

@ -0,0 +1,40 @@
# Copyright 2015 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import eventlet
import testtools
from neutron.agent.linux import async_process
from neutron.agent.linux import utils
from neutron.tests.functional.agent.linux import test_async_process
class TestPIDHelpers(test_async_process.AsyncProcessTestFramework):
def test_get_cmdline_from_pid_and_pid_invoked_with_cmdline(self):
cmd = ['tail', '-f', self.test_file_path]
proc = async_process.AsyncProcess(cmd)
proc.start(blocking=True)
self.addCleanup(proc.stop)
pid = proc.pid
self.assertEqual(cmd, utils.get_cmdline_from_pid(pid))
self.assertTrue(utils.pid_invoked_with_cmdline(pid, cmd))
self.assertEqual([], utils.get_cmdline_from_pid(-1))
def test_wait_until_true_predicate_succeeds(self):
utils.wait_until_true(lambda: True)
def test_wait_until_true_predicate_fails(self):
with testtools.ExpectedException(eventlet.timeout.Timeout):
utils.wait_until_true(lambda: False, 2)

View File

@ -31,6 +31,7 @@ from neutron.agent.linux import dhcp
from neutron.agent.linux import external_process
from neutron.agent.linux import ip_lib
from neutron.agent.linux import ovs_lib
from neutron.agent.linux import utils
from neutron.agent.metadata import agent as metadata_agent
from neutron.common import config as common_config
from neutron.common import constants as l3_constants
@ -395,7 +396,7 @@ class L3AgentTestCase(L3AgentTestFramework):
port = router.get_ex_gw_port()
interface_name = self.agent.get_external_device_name(port['id'])
self._assert_no_ip_addresses_on_interface(router, interface_name)
helpers.wait_until_true(lambda: router.ha_state == 'master')
utils.wait_until_true(lambda: router.ha_state == 'master')
# Keepalived notifies of a state transition when it starts,
# not when it ends. Thus, we have to wait until keepalived finishes
@ -407,7 +408,7 @@ class L3AgentTestCase(L3AgentTestFramework):
device,
self.agent.get_internal_device_name,
router.ns_name)
helpers.wait_until_true(device_exists)
utils.wait_until_true(device_exists)
self.assertTrue(self._namespace_exists(router.ns_name))
self.assertTrue(self._metadata_proxy_exists(self.agent.conf, router))
@ -478,7 +479,7 @@ class L3AgentTestCase(L3AgentTestFramework):
restarted_agent = l3_test_agent.TestL3NATAgent(self.agent.host,
self.agent.conf)
self._create_router(restarted_agent, router1.router)
helpers.wait_until_true(lambda: self._floating_ips_configured(router1))
utils.wait_until_true(lambda: self._floating_ips_configured(router1))
class L3HATestFramework(L3AgentTestFramework):
@ -506,16 +507,16 @@ class L3HATestFramework(L3AgentTestFramework):
router2 = self.manage_router(self.failover_agent, router_info_2)
helpers.wait_until_true(lambda: router1.ha_state == 'master')
helpers.wait_until_true(lambda: router2.ha_state == 'backup')
utils.wait_until_true(lambda: router1.ha_state == 'master')
utils.wait_until_true(lambda: router2.ha_state == 'backup')
device_name = router1.get_ha_device_name(
router1.router[l3_constants.HA_INTERFACE_KEY]['id'])
ha_device = ip_lib.IPDevice(device_name, namespace=router1.ns_name)
ha_device.link.set_down()
helpers.wait_until_true(lambda: router2.ha_state == 'master')
helpers.wait_until_true(lambda: router1.ha_state == 'fault')
utils.wait_until_true(lambda: router2.ha_state == 'master')
utils.wait_until_true(lambda: router1.ha_state == 'fault')
class MetadataFakeProxyHandler(object):

View File

@ -187,3 +187,19 @@ class TestGetRoothelperChildPid(base.BaseTestCase):
def test_returns_none_as_root(self):
self._test_get_root_helper_child_pid(expected=None, run_as_root=True)
class TestPathUtilities(base.BaseTestCase):
def test_remove_abs_path(self):
self.assertEqual(['ping', '8.8.8.8'],
utils.remove_abs_path(['/usr/bin/ping', '8.8.8.8']))
def test_cmdlines_are_equal(self):
self.assertTrue(utils.cmdlines_are_equal(
['ping', '8.8.8.8'],
['/usr/bin/ping', '8.8.8.8']))
def test_cmdlines_are_equal_different_commands(self):
self.assertFalse(utils.cmdlines_are_equal(
['ping', '8.8.8.8'],
['/usr/bin/ping6', '8.8.8.8']))

View File

@ -1002,3 +1002,14 @@ class TestArpPing(TestIPCmdBase):
# If this was called then check_added_address probably had a assert
self.assertFalse(device.addr.add.called)
class TestAddNamespaceToCmd(base.BaseTestCase):
def test_add_namespace_to_cmd_with_namespace(self):
cmd = ['ping', '8.8.8.8']
self.assertEqual(['ip', 'netns', 'exec', 'tmp'] + cmd,
ip_lib.add_namespace_to_cmd(cmd, 'tmp'))
def test_add_namespace_to_cmd_without_namespace(self):
cmd = ['ping', '8.8.8.8']
self.assertEqual(cmd, ip_lib.add_namespace_to_cmd(cmd, None))