Implements the ProcessMonitor in the l3_agent

The ProcessMonitor class will watch over spawned external processes,
taking the administrator configured action in the case of any
of the external processes dying unexpectedly.

It covers both the neutron-ns-metadata-proxy for non-ha routers
and the IPv6 radvd external processes. Keepalived +
neutron-ns-metadata-proxy needs to be covered in a second follow up
patch when neutron-ns-metadata-proxy is handled by the l3-agent
(instead keepalived) in the ha-routers.

Implements: blueprint agent-child-processes-status

Change-Id: Id6cc4786d837b96c61429d51485bc86ae37872cb
This commit is contained in:
Miguel Angel Ajo 2014-08-18 13:00:58 +02:00
parent 2e5bf64a51
commit dcd95ee343
9 changed files with 97 additions and 77 deletions

View File

@ -29,6 +29,7 @@ from neutron.agent.l3 import ha
from neutron.agent.l3 import ha_router
from neutron.agent.l3 import legacy_router
from neutron.agent.l3 import router_processing_queue as queue
from neutron.agent.linux import external_process
from neutron.agent.linux import ip_lib
from neutron.agent.linux import ra
from neutron.agent.metadata import driver as metadata_driver
@ -148,6 +149,11 @@ class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback,
self._check_config_params()
self.process_monitor = external_process.ProcessMonitor(
config=self.conf,
root_helper=self.root_helper,
resource_type='router')
try:
self.driver = importutils.import_object(
self.conf.interface_driver,
@ -281,7 +287,7 @@ class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback,
def _destroy_router_namespace(self, ns):
router_id = self.get_router_id(ns)
ra.disable_ipv6_ra(router_id, ns, self.root_helper)
ra.disable_ipv6_ra(router_id, self.process_monitor)
ns_ip = ip_lib.IPWrapper(self.root_helper, namespace=ns)
for d in ns_ip.get_devices(exclude_loopback=True):
if d.name.startswith(INTERNAL_DEV_PREFIX):
@ -450,7 +456,7 @@ class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback,
ri.ns_name,
internal_ports,
self.get_internal_device_name,
self.root_helper)
self.process_monitor)
existing_devices = self._get_existing_devices(ri)
current_internal_devs = set([n for n in existing_devices

View File

@ -147,12 +147,15 @@ class AgentMixin(object):
callback = (
metadata_driver.MetadataDriver._get_metadata_proxy_callback(
ri.router_id, self.conf))
# TODO(mangelajo): use the process monitor in keepalived when
# keepalived stops killing/starting metadata
# proxy on its own
pm = (
metadata_driver.MetadataDriver.
_get_metadata_proxy_process_manager(ri.router_id,
ri.ns_name,
self.conf))
pid = pm.get_pid_file_name(ensure_pids_dir=True)
pid = pm.get_pid_file_name()
ri.keepalived_manager.add_notifier(
callback(pid), 'master', ri.ha_vr_id)
for state in ('backup', 'fault'):

View File

@ -66,11 +66,13 @@ class ProcessManager(object):
self.service_pid_fname = 'pid'
self.service = 'default-service'
utils.ensure_dir(os.path.dirname(self.get_pid_file_name()))
def enable(self, cmd_callback=None, reload_cfg=False):
if not self.active:
if not cmd_callback:
cmd_callback = self.default_cmd_callback
cmd = cmd_callback(self.get_pid_file_name(ensure_pids_dir=True))
cmd = cmd_callback(self.get_pid_file_name())
ip_wrapper = ip_lib.IPWrapper(self.root_helper, self.namespace)
ip_wrapper.netns.execute(cmd, addl_env=self.cmd_addl_env)
@ -96,17 +98,14 @@ class ProcessManager(object):
else:
LOG.debug('No process started for %s', self.uuid)
def get_pid_file_name(self, ensure_pids_dir=False):
def get_pid_file_name(self):
"""Returns the file name for a given kind of config file."""
if self.pid_file:
if ensure_pids_dir:
utils.ensure_dir(os.path.dirname(self.pid_file))
return self.pid_file
else:
return utils.get_conf_file_name(self.pids_path,
self.uuid,
self.service_pid_fname,
ensure_pids_dir)
self.service_pid_fname)
@property
def pid(self):
@ -223,6 +222,11 @@ class ProcessMonitor(object):
service=service,
pid_file=pid_file).pid
def get_pid_file_name(self, uuid, service=None):
return self._ensure_process_manager(
uuid=uuid,
service=service).get_pid_file_name()
def _ensure_process_manager(self, uuid, cmd_callback=None,
namespace=None, service=None,
cmd_addl_env=None,

View File

@ -18,12 +18,14 @@ import netaddr
from oslo.config import cfg
import six
from neutron.agent.linux import external_process
from neutron.agent.linux import utils
from neutron.common import constants
from neutron.openstack.common import log as logging
RADVD_SERVICE_NAME = 'radvd'
RADVD_SERVICE_CMD = 'radvd'
LOG = logging.getLogger(__name__)
OPTS = [
@ -76,49 +78,43 @@ def _generate_radvd_conf(router_id, router_ports, dev_name_helper):
return radvd_conf
def _spawn_radvd(router_id, radvd_conf, router_ns, root_helper):
def _spawn_radvd(router_id, radvd_conf, router_ns, process_monitor):
def callback(pid_file):
# we need to use -m syslog and f.e. not -m stderr (the default)
# or -m stderr_syslog so that radvd 2.0+ will close stderr and
# exit after daemonization; otherwise, the current thread will
# be locked waiting for result from radvd that won't ever come
# until the process dies
radvd_cmd = ['radvd',
radvd_cmd = [RADVD_SERVICE_CMD,
'-C', '%s' % radvd_conf,
'-p', '%s' % pid_file,
'-m', 'syslog']
return radvd_cmd
radvd = external_process.ProcessManager(cfg.CONF,
router_id,
root_helper,
router_ns,
'radvd')
radvd.enable(callback, True)
process_monitor.enable(uuid=router_id,
cmd_callback=callback,
namespace=router_ns,
service=RADVD_SERVICE_NAME,
reload_cfg=True)
LOG.debug("radvd enabled for router %s", router_id)
def enable_ipv6_ra(router_id, router_ns, router_ports,
dev_name_helper, root_helper):
dev_name_helper, process_monitor):
for p in router_ports:
if netaddr.IPNetwork(p['subnet']['cidr']).version == 6:
break
else:
# Kill the daemon if it's running
disable_ipv6_ra(router_id, router_ns, root_helper)
disable_ipv6_ra(router_id, process_monitor)
return
LOG.debug("Enable IPv6 RA for router %s", router_id)
radvd_conf = _generate_radvd_conf(router_id, router_ports, dev_name_helper)
_spawn_radvd(router_id, radvd_conf, router_ns, root_helper)
_spawn_radvd(router_id, radvd_conf, router_ns, process_monitor)
def disable_ipv6_ra(router_id, router_ns, root_helper):
radvd = external_process.ProcessManager(cfg.CONF,
router_id,
root_helper,
router_ns,
'radvd')
radvd.disable()
def disable_ipv6_ra(router_id, process_monitor):
process_monitor.disable(router_id, service=RADVD_SERVICE_NAME)
utils.remove_conf_files(cfg.CONF.ra_confs, router_id)
LOG.debug("radvd disabled for router %s", router_id)

View File

@ -141,6 +141,8 @@ def ensure_dir(dir_path):
def _get_conf_base(cfg_root, uuid, ensure_conf_dir):
#TODO(mangelajo): separate responsibilities here, ensure_conf_dir
# should be a separate function
conf_dir = os.path.abspath(os.path.normpath(cfg_root))
conf_base = os.path.join(conf_dir, uuid)
if ensure_conf_dir:

View File

@ -36,9 +36,8 @@ class MetadataDriver(advanced_service.AdvancedService):
router.iptables_manager.apply()
if not router.is_ha:
self._spawn_metadata_proxy(router.router_id,
router.ns_name,
self.l3_agent.conf)
self._spawn_monitored_metadata_proxy(router.router_id,
router.ns_name)
def before_router_removed(self, router):
for c, r in self.metadata_filter_rules(self.metadata_port):
@ -47,9 +46,8 @@ class MetadataDriver(advanced_service.AdvancedService):
router.iptables_manager.ipv4['nat'].remove_rule(c, r)
router.iptables_manager.apply()
self._destroy_metadata_proxy(router.router['id'],
router.ns_name,
self.l3_agent.conf)
self._destroy_monitored_metadata_proxy(router.router['id'],
router.ns_name)
@classmethod
def metadata_filter_rules(cls, port):
@ -89,6 +87,17 @@ class MetadataDriver(advanced_service.AdvancedService):
return callback
def _spawn_monitored_metadata_proxy(self, router_id, ns_name):
callback = self._get_metadata_proxy_callback(
router_id, self.l3_agent.conf)
self.l3_agent.process_monitor.enable(router_id, callback, ns_name)
def _destroy_monitored_metadata_proxy(self, router_id, ns_name):
self.l3_agent.process_monitor.disable(router_id, ns_name)
# TODO(mangelajo): remove the unmonitored _get_*_process_manager,
# _spawn_* and _destroy* when keepalived stops
# spawning and killing proxies on its own.
@classmethod
def _get_metadata_proxy_process_manager(cls, router_id, ns_name, conf):
return external_process.ProcessManager(

View File

@ -63,6 +63,7 @@ class L3AgentTestFramework(base.BaseOVSLinuxTestCase):
config.register_cli_opts(logging.logging_cli_opts)
config.register_opts(logging.generic_log_opts)
config.register_opts(logging.log_opts)
agent_config.register_process_monitor_opts(config)
return config
def _configure_agent(self, host):

View File

@ -19,7 +19,6 @@ import eventlet
import mock
import netaddr
from oslo.config import cfg
from oslo import messaging
from testtools import matchers
@ -31,6 +30,7 @@ from neutron.agent.l3 import dvr_router
from neutron.agent.l3 import ha
from neutron.agent.l3 import link_local_allocator as lla
from neutron.agent.l3 import router_info as l3router
from neutron.agent.linux import external_process
from neutron.agent.linux import interface
from neutron.agent.linux import ra
from neutron.agent.metadata import driver as metadata_driver
@ -180,7 +180,9 @@ class TestBasicRouterOperations(base.BaseTestCase):
agent_config.register_interface_driver_opts_helper(self.conf)
agent_config.register_use_namespaces_opts_helper(self.conf)
agent_config.register_root_helper(self.conf)
agent_config.register_process_monitor_opts(self.conf)
self.conf.register_opts(interface.OPTS)
self.conf.register_opts(external_process.OPTS)
self.conf.set_override('router_id', 'fake_id')
self.conf.set_override('interface_driver',
'neutron.agent.linux.interface.NullDriver')
@ -1276,23 +1278,34 @@ class TestBasicRouterOperations(base.BaseTestCase):
self.assertFalse(nat_rules_delta)
return ri
def _expected_call_lookup_ri_process(self, ri, process):
def _expected_call_lookup_ri_process_enabled(self, ri, process):
"""Expected call if a process is looked up in a router instance."""
return [mock.call(cfg.CONF,
ri.router['id'],
self.conf.root_helper,
ri.ns_name,
process)]
return [mock.call(uuid=ri.router['id'],
service=process,
default_cmd_callback=mock.ANY,
namespace=ri.ns_name,
root_helper=self.conf.root_helper,
conf=self.conf,
pid_file=None,
cmd_addl_env=None)]
def _expected_call_lookup_ri_process_disabled(self, ri, process):
"""Expected call if a process is looked up in a router instance."""
# The ProcessManager does already exist, and it's found via
# ProcessMonitor lookup _ensure_process_manager
return [mock.call().__nonzero__()]
def _assert_ri_process_enabled(self, ri, process):
"""Verify that process was enabled for a router instance."""
expected_calls = self._expected_call_lookup_ri_process(ri, process)
expected_calls.append(mock.call().enable(mock.ANY, True))
expected_calls = self._expected_call_lookup_ri_process_enabled(
ri, process)
expected_calls.append(mock.call().enable(reload_cfg=True))
self.assertEqual(expected_calls, self.external_process.mock_calls)
def _assert_ri_process_disabled(self, ri, process):
"""Verify that process was disabled for a router instance."""
expected_calls = self._expected_call_lookup_ri_process(ri, process)
expected_calls = self._expected_call_lookup_ri_process_disabled(
ri, process)
expected_calls.append(mock.call().disable())
self.assertEqual(expected_calls, self.external_process.mock_calls)
@ -1677,20 +1690,18 @@ class TestBasicRouterOperations(base.BaseTestCase):
'distributed': False}
driver = metadata_driver.MetadataDriver
with mock.patch.object(
driver, '_destroy_metadata_proxy') as destroy_proxy:
driver, '_destroy_monitored_metadata_proxy') as destroy_proxy:
with mock.patch.object(
driver, '_spawn_metadata_proxy') as spawn_proxy:
driver, '_spawn_monitored_metadata_proxy') as spawn_proxy:
agent._process_added_router(router)
if enableflag:
spawn_proxy.assert_called_with(router_id,
mock.ANY,
mock.ANY)
else:
self.assertFalse(spawn_proxy.call_count)
agent._router_removed(router_id)
if enableflag:
destroy_proxy.assert_called_with(router_id,
mock.ANY,
mock.ANY)
else:
self.assertFalse(destroy_proxy.call_count)
@ -2195,15 +2206,17 @@ class TestBasicRouterOperations(base.BaseTestCase):
self.external_process_p.stop()
self.ip_cls_p.stop()
ensure_dir = 'neutron.agent.linux.utils.ensure_dir'
get_pid_file_name = ('neutron.agent.linux.external_process.'
'ProcessManager.get_pid_file_name')
with mock.patch('neutron.agent.linux.utils.execute') as execute:
with mock.patch(get_pid_file_name) as get_pid:
get_pid.return_value = pidfile
ra._spawn_radvd(router['id'],
conffile,
agent.get_ns_name(router['id']),
self.conf.root_helper)
with mock.patch(ensure_dir) as ensure_dir:
get_pid.return_value = pidfile
ra._spawn_radvd(router['id'],
conffile,
agent.get_ns_name(router['id']),
agent.process_monitor)
cmd = execute.call_args[0][0]
self.assertIn('radvd', cmd)

View File

@ -13,6 +13,7 @@
# under the License.
import mock
import os.path
from neutron.agent.linux import external_process as ep
from neutron.tests import base
@ -25,10 +26,16 @@ class TestProcessManager(base.BaseTestCase):
self.execute = self.execute_p.start()
self.delete_if_exists = mock.patch(
'neutron.openstack.common.fileutils.delete_if_exists').start()
self.makedirs = mock.patch('os.makedirs').start()
self.conf = mock.Mock()
self.conf.external_pids = '/var/path'
def test_processmanager_ensures_pid_dir(self):
pid_file = os.path.join(self.conf.external_pids, 'pid')
ep.ProcessManager(self.conf, 'uuid', pid_file=pid_file)
self.makedirs.assert_called_once_with(self.conf.external_pids, 0o755)
def test_enable_no_namespace(self):
callback = mock.Mock()
callback.return_value = ['the', 'cmd']
@ -41,7 +48,6 @@ class TestProcessManager(base.BaseTestCase):
manager = ep.ProcessManager(self.conf, 'uuid')
manager.enable(callback)
callback.assert_called_once_with('pidfile')
name.assert_called_once_with(ensure_pids_dir=True)
self.execute.assert_called_once_with(['the', 'cmd'],
root_helper='sudo',
check_exit_code=True,
@ -60,7 +66,6 @@ class TestProcessManager(base.BaseTestCase):
with mock.patch.object(ep, 'ip_lib') as ip_lib:
manager.enable(callback)
callback.assert_called_once_with('pidfile')
name.assert_called_once_with(ensure_pids_dir=True)
ip_lib.assert_has_calls([
mock.call.IPWrapper('sudo', 'ns'),
mock.call.IPWrapper().netns.execute(['the', 'cmd'],
@ -121,29 +126,10 @@ class TestProcessManager(base.BaseTestCase):
manager.disable()
debug.assert_called_once_with(mock.ANY, mock.ANY)
def test_get_pid_file_name_existing(self):
with mock.patch.object(ep.utils.os.path, 'isdir') as isdir:
isdir.return_value = True
manager = ep.ProcessManager(self.conf, 'uuid')
retval = manager.get_pid_file_name(ensure_pids_dir=True)
self.assertEqual(retval, '/var/path/uuid.pid')
def test_get_pid_file_name_not_existing(self):
with mock.patch.object(ep.utils.os.path, 'isdir') as isdir:
with mock.patch.object(ep.utils.os, 'makedirs') as makedirs:
isdir.return_value = False
manager = ep.ProcessManager(self.conf, 'uuid')
retval = manager.get_pid_file_name(ensure_pids_dir=True)
self.assertEqual(retval, '/var/path/uuid.pid')
makedirs.assert_called_once_with('/var/path', 0o755)
def test_get_pid_file_name_default(self):
with mock.patch.object(ep.utils.os.path, 'isdir') as isdir:
isdir.return_value = True
manager = ep.ProcessManager(self.conf, 'uuid')
retval = manager.get_pid_file_name(ensure_pids_dir=False)
self.assertEqual(retval, '/var/path/uuid.pid')
self.assertFalse(isdir.called)
manager = ep.ProcessManager(self.conf, 'uuid')
retval = manager.get_pid_file_name()
self.assertEqual(retval, '/var/path/uuid.pid')
def test_pid(self):
with mock.patch('__builtin__.open') as mock_open: