Merge "Implements the ProcessMonitor in the l3_agent"
This commit is contained in:
commit
6a797f354e
|
@ -29,6 +29,7 @@ from neutron.agent.l3 import ha
|
|||
from neutron.agent.l3 import ha_router
|
||||
from neutron.agent.l3 import legacy_router
|
||||
from neutron.agent.l3 import router_processing_queue as queue
|
||||
from neutron.agent.linux import external_process
|
||||
from neutron.agent.linux import ip_lib
|
||||
from neutron.agent.linux import ra
|
||||
from neutron.agent.metadata import driver as metadata_driver
|
||||
|
@ -148,6 +149,11 @@ class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback,
|
|||
|
||||
self._check_config_params()
|
||||
|
||||
self.process_monitor = external_process.ProcessMonitor(
|
||||
config=self.conf,
|
||||
root_helper=self.root_helper,
|
||||
resource_type='router')
|
||||
|
||||
try:
|
||||
self.driver = importutils.import_object(
|
||||
self.conf.interface_driver,
|
||||
|
@ -281,7 +287,7 @@ class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback,
|
|||
|
||||
def _destroy_router_namespace(self, ns):
|
||||
router_id = self.get_router_id(ns)
|
||||
ra.disable_ipv6_ra(router_id, ns, self.root_helper)
|
||||
ra.disable_ipv6_ra(router_id, self.process_monitor)
|
||||
ns_ip = ip_lib.IPWrapper(self.root_helper, namespace=ns)
|
||||
for d in ns_ip.get_devices(exclude_loopback=True):
|
||||
if d.name.startswith(INTERNAL_DEV_PREFIX):
|
||||
|
@ -452,7 +458,7 @@ class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback,
|
|||
ri.ns_name,
|
||||
internal_ports,
|
||||
self.get_internal_device_name,
|
||||
self.root_helper)
|
||||
self.process_monitor)
|
||||
|
||||
existing_devices = self._get_existing_devices(ri)
|
||||
current_internal_devs = set([n for n in existing_devices
|
||||
|
|
|
@ -147,12 +147,15 @@ class AgentMixin(object):
|
|||
callback = (
|
||||
metadata_driver.MetadataDriver._get_metadata_proxy_callback(
|
||||
ri.router_id, self.conf))
|
||||
# TODO(mangelajo): use the process monitor in keepalived when
|
||||
# keepalived stops killing/starting metadata
|
||||
# proxy on its own
|
||||
pm = (
|
||||
metadata_driver.MetadataDriver.
|
||||
_get_metadata_proxy_process_manager(ri.router_id,
|
||||
ri.ns_name,
|
||||
self.conf))
|
||||
pid = pm.get_pid_file_name(ensure_pids_dir=True)
|
||||
pid = pm.get_pid_file_name()
|
||||
ri.keepalived_manager.add_notifier(
|
||||
callback(pid), 'master', ri.ha_vr_id)
|
||||
for state in ('backup', 'fault'):
|
||||
|
|
|
@ -66,11 +66,13 @@ class ProcessManager(object):
|
|||
self.service_pid_fname = 'pid'
|
||||
self.service = 'default-service'
|
||||
|
||||
utils.ensure_dir(os.path.dirname(self.get_pid_file_name()))
|
||||
|
||||
def enable(self, cmd_callback=None, reload_cfg=False):
|
||||
if not self.active:
|
||||
if not cmd_callback:
|
||||
cmd_callback = self.default_cmd_callback
|
||||
cmd = cmd_callback(self.get_pid_file_name(ensure_pids_dir=True))
|
||||
cmd = cmd_callback(self.get_pid_file_name())
|
||||
|
||||
ip_wrapper = ip_lib.IPWrapper(self.root_helper, self.namespace)
|
||||
ip_wrapper.netns.execute(cmd, addl_env=self.cmd_addl_env)
|
||||
|
@ -96,17 +98,14 @@ class ProcessManager(object):
|
|||
else:
|
||||
LOG.debug('No process started for %s', self.uuid)
|
||||
|
||||
def get_pid_file_name(self, ensure_pids_dir=False):
|
||||
def get_pid_file_name(self):
|
||||
"""Returns the file name for a given kind of config file."""
|
||||
if self.pid_file:
|
||||
if ensure_pids_dir:
|
||||
utils.ensure_dir(os.path.dirname(self.pid_file))
|
||||
return self.pid_file
|
||||
else:
|
||||
return utils.get_conf_file_name(self.pids_path,
|
||||
self.uuid,
|
||||
self.service_pid_fname,
|
||||
ensure_pids_dir)
|
||||
self.service_pid_fname)
|
||||
|
||||
@property
|
||||
def pid(self):
|
||||
|
@ -223,6 +222,11 @@ class ProcessMonitor(object):
|
|||
service=service,
|
||||
pid_file=pid_file).pid
|
||||
|
||||
def get_pid_file_name(self, uuid, service=None):
|
||||
return self._ensure_process_manager(
|
||||
uuid=uuid,
|
||||
service=service).get_pid_file_name()
|
||||
|
||||
def _ensure_process_manager(self, uuid, cmd_callback=None,
|
||||
namespace=None, service=None,
|
||||
cmd_addl_env=None,
|
||||
|
|
|
@ -18,12 +18,14 @@ import netaddr
|
|||
from oslo.config import cfg
|
||||
import six
|
||||
|
||||
from neutron.agent.linux import external_process
|
||||
from neutron.agent.linux import utils
|
||||
from neutron.common import constants
|
||||
from neutron.openstack.common import log as logging
|
||||
|
||||
|
||||
RADVD_SERVICE_NAME = 'radvd'
|
||||
RADVD_SERVICE_CMD = 'radvd'
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
OPTS = [
|
||||
|
@ -76,49 +78,43 @@ def _generate_radvd_conf(router_id, router_ports, dev_name_helper):
|
|||
return radvd_conf
|
||||
|
||||
|
||||
def _spawn_radvd(router_id, radvd_conf, router_ns, root_helper):
|
||||
def _spawn_radvd(router_id, radvd_conf, router_ns, process_monitor):
|
||||
def callback(pid_file):
|
||||
# we need to use -m syslog and f.e. not -m stderr (the default)
|
||||
# or -m stderr_syslog so that radvd 2.0+ will close stderr and
|
||||
# exit after daemonization; otherwise, the current thread will
|
||||
# be locked waiting for result from radvd that won't ever come
|
||||
# until the process dies
|
||||
radvd_cmd = ['radvd',
|
||||
radvd_cmd = [RADVD_SERVICE_CMD,
|
||||
'-C', '%s' % radvd_conf,
|
||||
'-p', '%s' % pid_file,
|
||||
'-m', 'syslog']
|
||||
return radvd_cmd
|
||||
|
||||
radvd = external_process.ProcessManager(cfg.CONF,
|
||||
router_id,
|
||||
root_helper,
|
||||
router_ns,
|
||||
'radvd')
|
||||
radvd.enable(callback, True)
|
||||
process_monitor.enable(uuid=router_id,
|
||||
cmd_callback=callback,
|
||||
namespace=router_ns,
|
||||
service=RADVD_SERVICE_NAME,
|
||||
reload_cfg=True)
|
||||
LOG.debug("radvd enabled for router %s", router_id)
|
||||
|
||||
|
||||
def enable_ipv6_ra(router_id, router_ns, router_ports,
|
||||
dev_name_helper, root_helper):
|
||||
dev_name_helper, process_monitor):
|
||||
for p in router_ports:
|
||||
if netaddr.IPNetwork(p['subnet']['cidr']).version == 6:
|
||||
break
|
||||
else:
|
||||
# Kill the daemon if it's running
|
||||
disable_ipv6_ra(router_id, router_ns, root_helper)
|
||||
disable_ipv6_ra(router_id, process_monitor)
|
||||
return
|
||||
|
||||
LOG.debug("Enable IPv6 RA for router %s", router_id)
|
||||
radvd_conf = _generate_radvd_conf(router_id, router_ports, dev_name_helper)
|
||||
_spawn_radvd(router_id, radvd_conf, router_ns, root_helper)
|
||||
_spawn_radvd(router_id, radvd_conf, router_ns, process_monitor)
|
||||
|
||||
|
||||
def disable_ipv6_ra(router_id, router_ns, root_helper):
|
||||
radvd = external_process.ProcessManager(cfg.CONF,
|
||||
router_id,
|
||||
root_helper,
|
||||
router_ns,
|
||||
'radvd')
|
||||
radvd.disable()
|
||||
def disable_ipv6_ra(router_id, process_monitor):
|
||||
process_monitor.disable(router_id, service=RADVD_SERVICE_NAME)
|
||||
utils.remove_conf_files(cfg.CONF.ra_confs, router_id)
|
||||
LOG.debug("radvd disabled for router %s", router_id)
|
||||
|
|
|
@ -141,6 +141,8 @@ def ensure_dir(dir_path):
|
|||
|
||||
|
||||
def _get_conf_base(cfg_root, uuid, ensure_conf_dir):
|
||||
#TODO(mangelajo): separate responsibilities here, ensure_conf_dir
|
||||
# should be a separate function
|
||||
conf_dir = os.path.abspath(os.path.normpath(cfg_root))
|
||||
conf_base = os.path.join(conf_dir, uuid)
|
||||
if ensure_conf_dir:
|
||||
|
|
|
@ -56,9 +56,8 @@ class MetadataDriver(advanced_service.AdvancedService):
|
|||
router.iptables_manager.apply()
|
||||
|
||||
if not router.is_ha:
|
||||
self._spawn_metadata_proxy(router.router_id,
|
||||
router.ns_name,
|
||||
self.l3_agent.conf)
|
||||
self._spawn_monitored_metadata_proxy(router.router_id,
|
||||
router.ns_name)
|
||||
|
||||
def before_router_removed(self, router):
|
||||
for c, r in self.metadata_filter_rules(self.metadata_port):
|
||||
|
@ -67,9 +66,8 @@ class MetadataDriver(advanced_service.AdvancedService):
|
|||
router.iptables_manager.ipv4['nat'].remove_rule(c, r)
|
||||
router.iptables_manager.apply()
|
||||
|
||||
self._destroy_metadata_proxy(router.router['id'],
|
||||
router.ns_name,
|
||||
self.l3_agent.conf)
|
||||
self._destroy_monitored_metadata_proxy(router.router['id'],
|
||||
router.ns_name)
|
||||
|
||||
@classmethod
|
||||
def metadata_filter_rules(cls, port):
|
||||
|
@ -109,6 +107,17 @@ class MetadataDriver(advanced_service.AdvancedService):
|
|||
|
||||
return callback
|
||||
|
||||
def _spawn_monitored_metadata_proxy(self, router_id, ns_name):
|
||||
callback = self._get_metadata_proxy_callback(
|
||||
router_id, self.l3_agent.conf)
|
||||
self.l3_agent.process_monitor.enable(router_id, callback, ns_name)
|
||||
|
||||
def _destroy_monitored_metadata_proxy(self, router_id, ns_name):
|
||||
self.l3_agent.process_monitor.disable(router_id, ns_name)
|
||||
|
||||
# TODO(mangelajo): remove the unmonitored _get_*_process_manager,
|
||||
# _spawn_* and _destroy* when keepalived stops
|
||||
# spawning and killing proxies on its own.
|
||||
@classmethod
|
||||
def _get_metadata_proxy_process_manager(cls, router_id, ns_name, conf):
|
||||
return external_process.ProcessManager(
|
||||
|
|
|
@ -62,6 +62,7 @@ class L3AgentTestFramework(base.BaseOVSLinuxTestCase):
|
|||
config.register_cli_opts(logging.logging_cli_opts)
|
||||
config.register_opts(logging.generic_log_opts)
|
||||
config.register_opts(logging.log_opts)
|
||||
agent_config.register_process_monitor_opts(config)
|
||||
return config
|
||||
|
||||
def _configure_agent(self, host):
|
||||
|
|
|
@ -19,7 +19,6 @@ import eventlet
|
|||
|
||||
import mock
|
||||
import netaddr
|
||||
from oslo.config import cfg
|
||||
from oslo import messaging
|
||||
from testtools import matchers
|
||||
|
||||
|
@ -31,6 +30,7 @@ from neutron.agent.l3 import dvr_router
|
|||
from neutron.agent.l3 import ha
|
||||
from neutron.agent.l3 import link_local_allocator as lla
|
||||
from neutron.agent.l3 import router_info as l3router
|
||||
from neutron.agent.linux import external_process
|
||||
from neutron.agent.linux import interface
|
||||
from neutron.agent.linux import ra
|
||||
from neutron.agent.metadata import driver as metadata_driver
|
||||
|
@ -185,7 +185,9 @@ class TestBasicRouterOperations(base.BaseTestCase):
|
|||
agent_config.register_interface_driver_opts_helper(self.conf)
|
||||
agent_config.register_use_namespaces_opts_helper(self.conf)
|
||||
agent_config.register_root_helper(self.conf)
|
||||
agent_config.register_process_monitor_opts(self.conf)
|
||||
self.conf.register_opts(interface.OPTS)
|
||||
self.conf.register_opts(external_process.OPTS)
|
||||
self.conf.set_override('router_id', 'fake_id')
|
||||
self.conf.set_override('interface_driver',
|
||||
'neutron.agent.linux.interface.NullDriver')
|
||||
|
@ -1267,23 +1269,34 @@ class TestBasicRouterOperations(base.BaseTestCase):
|
|||
self.assertFalse(nat_rules_delta)
|
||||
return ri
|
||||
|
||||
def _expected_call_lookup_ri_process(self, ri, process):
|
||||
def _expected_call_lookup_ri_process_enabled(self, ri, process):
|
||||
"""Expected call if a process is looked up in a router instance."""
|
||||
return [mock.call(cfg.CONF,
|
||||
ri.router['id'],
|
||||
self.conf.root_helper,
|
||||
ri.ns_name,
|
||||
process)]
|
||||
return [mock.call(uuid=ri.router['id'],
|
||||
service=process,
|
||||
default_cmd_callback=mock.ANY,
|
||||
namespace=ri.ns_name,
|
||||
root_helper=self.conf.root_helper,
|
||||
conf=self.conf,
|
||||
pid_file=None,
|
||||
cmd_addl_env=None)]
|
||||
|
||||
def _expected_call_lookup_ri_process_disabled(self, ri, process):
|
||||
"""Expected call if a process is looked up in a router instance."""
|
||||
# The ProcessManager does already exist, and it's found via
|
||||
# ProcessMonitor lookup _ensure_process_manager
|
||||
return [mock.call().__nonzero__()]
|
||||
|
||||
def _assert_ri_process_enabled(self, ri, process):
|
||||
"""Verify that process was enabled for a router instance."""
|
||||
expected_calls = self._expected_call_lookup_ri_process(ri, process)
|
||||
expected_calls.append(mock.call().enable(mock.ANY, True))
|
||||
expected_calls = self._expected_call_lookup_ri_process_enabled(
|
||||
ri, process)
|
||||
expected_calls.append(mock.call().enable(reload_cfg=True))
|
||||
self.assertEqual(expected_calls, self.external_process.mock_calls)
|
||||
|
||||
def _assert_ri_process_disabled(self, ri, process):
|
||||
"""Verify that process was disabled for a router instance."""
|
||||
expected_calls = self._expected_call_lookup_ri_process(ri, process)
|
||||
expected_calls = self._expected_call_lookup_ri_process_disabled(
|
||||
ri, process)
|
||||
expected_calls.append(mock.call().disable())
|
||||
self.assertEqual(expected_calls, self.external_process.mock_calls)
|
||||
|
||||
|
@ -1659,20 +1672,18 @@ class TestBasicRouterOperations(base.BaseTestCase):
|
|||
'distributed': False}
|
||||
driver = metadata_driver.MetadataDriver
|
||||
with mock.patch.object(
|
||||
driver, '_destroy_metadata_proxy') as destroy_proxy:
|
||||
driver, '_destroy_monitored_metadata_proxy') as destroy_proxy:
|
||||
with mock.patch.object(
|
||||
driver, '_spawn_metadata_proxy') as spawn_proxy:
|
||||
driver, '_spawn_monitored_metadata_proxy') as spawn_proxy:
|
||||
agent._process_added_router(router)
|
||||
if enableflag:
|
||||
spawn_proxy.assert_called_with(router_id,
|
||||
mock.ANY,
|
||||
mock.ANY)
|
||||
else:
|
||||
self.assertFalse(spawn_proxy.call_count)
|
||||
agent._router_removed(router_id)
|
||||
if enableflag:
|
||||
destroy_proxy.assert_called_with(router_id,
|
||||
mock.ANY,
|
||||
mock.ANY)
|
||||
else:
|
||||
self.assertFalse(destroy_proxy.call_count)
|
||||
|
@ -2172,15 +2183,17 @@ class TestBasicRouterOperations(base.BaseTestCase):
|
|||
self.external_process_p.stop()
|
||||
self.ip_cls_p.stop()
|
||||
|
||||
ensure_dir = 'neutron.agent.linux.utils.ensure_dir'
|
||||
get_pid_file_name = ('neutron.agent.linux.external_process.'
|
||||
'ProcessManager.get_pid_file_name')
|
||||
with mock.patch('neutron.agent.linux.utils.execute') as execute:
|
||||
with mock.patch(get_pid_file_name) as get_pid:
|
||||
get_pid.return_value = pidfile
|
||||
ra._spawn_radvd(router['id'],
|
||||
conffile,
|
||||
agent.get_ns_name(router['id']),
|
||||
self.conf.root_helper)
|
||||
with mock.patch(ensure_dir) as ensure_dir:
|
||||
get_pid.return_value = pidfile
|
||||
ra._spawn_radvd(router['id'],
|
||||
conffile,
|
||||
agent.get_ns_name(router['id']),
|
||||
agent.process_monitor)
|
||||
cmd = execute.call_args[0][0]
|
||||
|
||||
self.assertIn('radvd', cmd)
|
||||
|
|
|
@ -13,6 +13,7 @@
|
|||
# under the License.
|
||||
|
||||
import mock
|
||||
import os.path
|
||||
|
||||
from neutron.agent.linux import external_process as ep
|
||||
from neutron.tests import base
|
||||
|
@ -25,10 +26,16 @@ class TestProcessManager(base.BaseTestCase):
|
|||
self.execute = self.execute_p.start()
|
||||
self.delete_if_exists = mock.patch(
|
||||
'neutron.openstack.common.fileutils.delete_if_exists').start()
|
||||
self.makedirs = mock.patch('os.makedirs').start()
|
||||
|
||||
self.conf = mock.Mock()
|
||||
self.conf.external_pids = '/var/path'
|
||||
|
||||
def test_processmanager_ensures_pid_dir(self):
|
||||
pid_file = os.path.join(self.conf.external_pids, 'pid')
|
||||
ep.ProcessManager(self.conf, 'uuid', pid_file=pid_file)
|
||||
self.makedirs.assert_called_once_with(self.conf.external_pids, 0o755)
|
||||
|
||||
def test_enable_no_namespace(self):
|
||||
callback = mock.Mock()
|
||||
callback.return_value = ['the', 'cmd']
|
||||
|
@ -41,7 +48,6 @@ class TestProcessManager(base.BaseTestCase):
|
|||
manager = ep.ProcessManager(self.conf, 'uuid')
|
||||
manager.enable(callback)
|
||||
callback.assert_called_once_with('pidfile')
|
||||
name.assert_called_once_with(ensure_pids_dir=True)
|
||||
self.execute.assert_called_once_with(['the', 'cmd'],
|
||||
root_helper='sudo',
|
||||
check_exit_code=True,
|
||||
|
@ -60,7 +66,6 @@ class TestProcessManager(base.BaseTestCase):
|
|||
with mock.patch.object(ep, 'ip_lib') as ip_lib:
|
||||
manager.enable(callback)
|
||||
callback.assert_called_once_with('pidfile')
|
||||
name.assert_called_once_with(ensure_pids_dir=True)
|
||||
ip_lib.assert_has_calls([
|
||||
mock.call.IPWrapper('sudo', 'ns'),
|
||||
mock.call.IPWrapper().netns.execute(['the', 'cmd'],
|
||||
|
@ -121,29 +126,10 @@ class TestProcessManager(base.BaseTestCase):
|
|||
manager.disable()
|
||||
debug.assert_called_once_with(mock.ANY, mock.ANY)
|
||||
|
||||
def test_get_pid_file_name_existing(self):
|
||||
with mock.patch.object(ep.utils.os.path, 'isdir') as isdir:
|
||||
isdir.return_value = True
|
||||
manager = ep.ProcessManager(self.conf, 'uuid')
|
||||
retval = manager.get_pid_file_name(ensure_pids_dir=True)
|
||||
self.assertEqual(retval, '/var/path/uuid.pid')
|
||||
|
||||
def test_get_pid_file_name_not_existing(self):
|
||||
with mock.patch.object(ep.utils.os.path, 'isdir') as isdir:
|
||||
with mock.patch.object(ep.utils.os, 'makedirs') as makedirs:
|
||||
isdir.return_value = False
|
||||
manager = ep.ProcessManager(self.conf, 'uuid')
|
||||
retval = manager.get_pid_file_name(ensure_pids_dir=True)
|
||||
self.assertEqual(retval, '/var/path/uuid.pid')
|
||||
makedirs.assert_called_once_with('/var/path', 0o755)
|
||||
|
||||
def test_get_pid_file_name_default(self):
|
||||
with mock.patch.object(ep.utils.os.path, 'isdir') as isdir:
|
||||
isdir.return_value = True
|
||||
manager = ep.ProcessManager(self.conf, 'uuid')
|
||||
retval = manager.get_pid_file_name(ensure_pids_dir=False)
|
||||
self.assertEqual(retval, '/var/path/uuid.pid')
|
||||
self.assertFalse(isdir.called)
|
||||
manager = ep.ProcessManager(self.conf, 'uuid')
|
||||
retval = manager.get_pid_file_name()
|
||||
self.assertEqual(retval, '/var/path/uuid.pid')
|
||||
|
||||
def test_pid(self):
|
||||
with mock.patch('__builtin__.open') as mock_open:
|
||||
|
|
Loading…
Reference in New Issue