diff --git a/neutron/agent/l3/ha_router.py b/neutron/agent/l3/ha_router.py index 995172451ea..8ac4556f86f 100644 --- a/neutron/agent/l3/ha_router.py +++ b/neutron/agent/l3/ha_router.py @@ -376,11 +376,11 @@ class HaRouter(router.RouterInfo): def _get_state_change_monitor_process_manager(self): return external_process.ProcessManager( - self.agent_conf, - '%s.monitor' % self.router_id, - self.ha_namespace, + conf=self.agent_conf, uuid='%s.monitor' % self.router_id, + namespace=self.ha_namespace, service=KEEPALIVED_STATE_CHANGE_MONITOR_SERVICE_NAME, - default_cmd_callback=self._get_state_change_monitor_callback()) + default_cmd_callback=self._get_state_change_monitor_callback(), + run_as_root=True) def _get_state_change_monitor_callback(self): ha_device = self.get_ha_device_name() diff --git a/neutron/agent/l3/keepalived_state_change.py b/neutron/agent/l3/keepalived_state_change.py index 412d30b9835..7f4afb27638 100644 --- a/neutron/agent/l3/keepalived_state_change.py +++ b/neutron/agent/l3/keepalived_state_change.py @@ -13,24 +13,24 @@ # under the License. import os -import signal import sys +import threading +import time import httplib2 import netaddr from oslo_config import cfg from oslo_log import log as logging +from six.moves import queue from neutron._i18n import _ from neutron.agent.l3 import ha -from neutron.agent.l3 import ha_router from neutron.agent.linux import daemon from neutron.agent.linux import ip_lib -from neutron.agent.linux import ip_monitor from neutron.agent.linux import utils as agent_utils from neutron.common import config -from neutron.conf.agent import common as agent_config from neutron.conf.agent.l3 import keepalived +from neutron import privileged LOG = logging.getLogger(__name__) @@ -55,33 +55,41 @@ class MonitorDaemon(daemon.Daemon): self.interface = interface self.cidr = cidr self.monitor = None - super(MonitorDaemon, self).__init__( - pidfile, uuid=router_id, - user=user, group=group, - procname=ha_router.STATE_CHANGE_PROC_NAME) + self.event_stop = threading.Event() + self.event_started = threading.Event() + self.queue = queue.Queue() + super(MonitorDaemon, self).__init__(pidfile, uuid=router_id, + user=user, group=group) - def run(self, run_as_root=False): - self.monitor = ip_monitor.IPMonitor(namespace=self.namespace, - run_as_root=run_as_root) - self.monitor.start() - # Only drop privileges if the process is currently running as root - # (The run_as_root variable name here is unfortunate - It means to - # use a root helper when the running process is NOT already running - # as root - if not run_as_root: - super(MonitorDaemon, self).run() + def run(self): + self._thread_ip_monitor = threading.Thread( + target=ip_lib.ip_monitor, + args=(self.namespace, self.queue, self.event_stop, + self.event_started)) + self._thread_read_queue = threading.Thread( + target=self.read_queue, + args=(self.queue, self.event_stop, self.event_started)) + self._thread_ip_monitor.start() + self._thread_read_queue.start() self.handle_initial_state() - for iterable in self.monitor: - self.parse_and_handle_event(iterable) + while True: + time.sleep(1) - def parse_and_handle_event(self, iterable): - try: - event = ip_monitor.IPMonitorEvent.from_text(iterable) - if event.interface == self.interface and event.cidr == self.cidr: - new_state = 'master' if event.added else 'backup' + def read_queue(self, _queue, event_stop, event_started): + event_started.wait() + while not event_stop.is_set(): + try: + event = _queue.get(timeout=2) + except queue.Empty: + event = None + if not event: + continue + + if event['name'] == self.interface and event['cidr'] == self.cidr: + new_state = 'master' if event['event'] == 'added' else 'backup' self.write_state_change(new_state) self.notify_agent(new_state) - elif event.interface != self.interface and event.added: + elif event['name'] != self.interface and event['event'] == 'added': # Send GARPs for all new router interfaces. # REVISIT(jlibosva): keepalived versions 1.2.19 and below # contain bug where gratuitous ARPs are not sent on receiving @@ -90,9 +98,6 @@ class MonitorDaemon(daemon.Daemon): # packaged in some distributions (RHEL/CentOS/Ubuntu Xenial). # Remove this code once new keepalived versions are available. self.send_garp(event) - except Exception: - LOG.exception('Failed to process or handle event for line %s', - iterable) def handle_initial_state(self): try: @@ -133,26 +138,19 @@ class MonitorDaemon(daemon.Daemon): def send_garp(self, event): """Send gratuitous ARP for given event.""" + ip_address = str(netaddr.IPNetwork(event['cidr']).ip) ip_lib.send_ip_addr_adv_notif( self.namespace, - event.interface, - str(netaddr.IPNetwork(event.cidr).ip), + event['name'], + ip_address, log_exception=False ) - - def _kill_monitor(self): - if self.monitor: - # Kill PID instead of calling self.monitor.stop() because the ip - # monitor is running as root while keepalived-state-change is not - # (dropped privileges after launching the ip monitor) and will fail - # with "Permission denied". Also, we can safely do this because the - # monitor was launched with respawn_interval=None so it won't be - # automatically respawned - agent_utils.kill_process(self.monitor.pid, signal.SIGKILL, - run_as_root=True) + LOG.debug('Sent GARP to %(ip_address)s from %(device_name)s', + {'ip_address': ip_address, 'device_name': event['name']}) def handle_sigterm(self, signum, frame): - self._kill_monitor() + self.event_stop.set() + self._thread_read_queue.join(timeout=5) super(MonitorDaemon, self).handle_sigterm(signum, frame) @@ -162,7 +160,7 @@ def configure(conf): conf.set_override('debug', True) conf.set_override('use_syslog', True) config.setup_logging() - agent_config.setup_privsep() + privileged.default.set_client_mode(False) def main(): diff --git a/neutron/agent/linux/ip_monitor.py b/neutron/agent/linux/ip_monitor.py deleted file mode 100644 index 2a386f06ae0..00000000000 --- a/neutron/agent/linux/ip_monitor.py +++ /dev/null @@ -1,86 +0,0 @@ -# Copyright 2015 Red Hat, Inc. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from oslo_log import log as logging -from oslo_utils import excutils - -from neutron.agent.common import async_process -from neutron.agent.linux import ip_lib - -LOG = logging.getLogger(__name__) - - -class IPMonitorEvent(object): - def __init__(self, line, added, interface, cidr): - self.line = line - self.added = added - self.interface = interface - self.cidr = cidr - - def __str__(self): - return self.line - - @classmethod - def from_text(cls, line): - route = line.split() - - try: - first_word = route[0] - except IndexError: - with excutils.save_and_reraise_exception(): - LOG.error('Unable to parse route "%s"', line) - - added = (first_word != 'Deleted') - if not added: - route = route[1:] - - try: - interface = ip_lib.remove_interface_suffix(route[1]) - cidr = route[3] - except IndexError: - with excutils.save_and_reraise_exception(): - LOG.error('Unable to parse route "%s"', line) - - return cls(line, added, interface, cidr) - - -class IPMonitor(async_process.AsyncProcess): - """Wrapper over `ip monitor address`. - - To monitor and react indefinitely: - m = IPMonitor(namespace='tmp', root_as_root=True) - m.start() - for iterable in m: - event = IPMonitorEvent.from_text(iterable) - print(event, event.added, event.interface, event.cidr) - """ - - def __init__(self, - namespace=None, - run_as_root=True, - respawn_interval=None): - super(IPMonitor, self).__init__(['ip', '-o', 'monitor', 'address'], - run_as_root=run_as_root, - respawn_interval=respawn_interval, - namespace=namespace) - - def __iter__(self): - return self.iter_stdout(block=True) - - def start(self): - super(IPMonitor, self).start(block=True) - - def stop(self): - super(IPMonitor, self).stop(block=True) diff --git a/neutron/tests/functional/agent/l3/bin/__init__.py b/neutron/tests/functional/agent/l3/bin/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/neutron/tests/functional/agent/l3/bin/cmd_keepalived_state_change.py b/neutron/tests/functional/agent/l3/bin/cmd_keepalived_state_change.py new file mode 100755 index 00000000000..8639b838955 --- /dev/null +++ b/neutron/tests/functional/agent/l3/bin/cmd_keepalived_state_change.py @@ -0,0 +1,22 @@ +#!/usr/bin/python +# Copyright (c) 2015 Red Hat Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import sys + +from neutron.agent.l3.keepalived_state_change import main + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/neutron/tests/functional/agent/l3/test_keepalived_state_change.py b/neutron/tests/functional/agent/l3/test_keepalived_state_change.py index 33976cdd7b9..aec78116227 100644 --- a/neutron/tests/functional/agent/l3/test_keepalived_state_change.py +++ b/neutron/tests/functional/agent/l3/test_keepalived_state_change.py @@ -18,13 +18,14 @@ import os import eventlet import mock import netaddr -from oslo_config import fixture as fixture_config from oslo_utils import uuidutils -from neutron.agent.l3 import keepalived_state_change +from neutron.agent.l3 import ha +from neutron.agent.l3 import ha_router +from neutron.agent.linux import external_process from neutron.agent.linux import ip_lib +from neutron.agent.linux import utils as linux_utils from neutron.common import utils -from neutron.conf.agent.l3 import keepalived as kd from neutron.tests.common import machine_fixtures as mf from neutron.tests.common import net_helpers from neutron.tests.functional import base @@ -37,104 +38,72 @@ def has_expected_arp_entry(device_name, namespace, ip, mac): return entry != [] -class TestKeepalivedStateChange(base.BaseSudoTestCase): - def setUp(self): - super(TestKeepalivedStateChange, self).setUp() - self.conf_fixture = self.useFixture(fixture_config.Config()) - kd.register_l3_agent_keepalived_opts(self.conf_fixture) - self.router_id = uuidutils.generate_uuid() - self.conf_dir = self.get_default_temp_dir().path - self.cidr = '169.254.128.1/24' - self.interface_name = utils.get_rand_name() - self.monitor = keepalived_state_change.MonitorDaemon( - self.get_temp_file_path('monitor.pid'), - self.router_id, - 1, - 2, - utils.get_rand_name(), - self.conf_dir, - self.interface_name, - self.cidr) - mock.patch.object(self.monitor, 'notify_agent').start() - self.line = '1: %s inet %s' % (self.interface_name, self.cidr) - - def test_parse_and_handle_event_wrong_device_completes_without_error(self): - self.monitor.parse_and_handle_event( - '1: wrong_device inet wrong_cidr') - - def _get_state(self): - with open(os.path.join(self.monitor.conf_dir, 'state')) as state_file: - return state_file.read() - - def test_parse_and_handle_event_writes_to_file(self): - self.monitor.parse_and_handle_event('Deleted %s' % self.line) - self.assertEqual('backup', self._get_state()) - - self.monitor.parse_and_handle_event(self.line) - self.assertEqual('master', self._get_state()) - - def test_parse_and_handle_event_fails_writing_state(self): - with mock.patch.object( - self.monitor, 'write_state_change', side_effect=OSError): - self.monitor.parse_and_handle_event(self.line) - - def test_parse_and_handle_event_fails_notifying_agent(self): - with mock.patch.object( - self.monitor, 'notify_agent', side_effect=Exception): - self.monitor.parse_and_handle_event(self.line) - - def test_handle_initial_state_backup(self): - ip = ip_lib.IPWrapper(namespace=self.monitor.namespace) - ip.netns.add(self.monitor.namespace) - self.addCleanup(ip.netns.delete, self.monitor.namespace) - ip.add_dummy(self.interface_name) - - with mock.patch.object( - self.monitor, 'write_state_change') as write_state_change,\ - mock.patch.object( - self.monitor, 'notify_agent') as notify_agent: - - self.monitor.handle_initial_state() - write_state_change.assert_not_called() - notify_agent.assert_not_called() - - def test_handle_initial_state_master(self): - ip = ip_lib.IPWrapper(namespace=self.monitor.namespace) - ip.netns.add(self.monitor.namespace) - self.addCleanup(ip.netns.delete, self.monitor.namespace) - ha_interface = ip.add_dummy(self.interface_name) - - ha_interface.addr.add(self.cidr) - - self.monitor.handle_initial_state() - self.assertEqual('master', self._get_state()) - - -class TestMonitorDaemon(base.BaseSudoTestCase): +class TestMonitorDaemon(base.BaseLoggingTestCase): def setUp(self): super(TestMonitorDaemon, self).setUp() + self.conf_dir = self.get_default_temp_dir().path + self.pid_file = os.path.join(self.conf_dir, 'pid_file') + self.log_file = os.path.join(self.conf_dir, 'log_file') + self.state_file = os.path.join(self.conf_dir, + 'keepalived-state-change') + self.cidr = '169.254.151.1/24' bridge = self.useFixture(net_helpers.OVSBridgeFixture()).bridge self.machines = self.useFixture(mf.PeerMachines(bridge)) self.router, self.peer = self.machines.machines[:2] + self.router_id = uuidutils.generate_uuid() - conf_dir = self.get_default_temp_dir().path - monitor = keepalived_state_change.MonitorDaemon( - self.get_temp_file_path('monitor.pid'), - uuidutils.generate_uuid(), - 1, - 2, - self.router.namespace, - conf_dir, - 'foo-iface', - self.machines.ip_cidr - ) - eventlet.spawn_n(monitor.run, run_as_root=True) - monitor_started = functools.partial( - lambda mon: mon.monitor is not None, monitor) - utils.wait_until_true(monitor_started) - self.addCleanup(monitor.monitor.stop) + self.cmd_opts = [ + ha_router.STATE_CHANGE_PROC_NAME, + '--router_id=%s' % self.router_id, + '--namespace=%s' % self.router.namespace, + '--conf_dir=%s' % self.conf_dir, + '--log-file=%s' % self.log_file, + '--monitor_interface=%s' % self.router.port.name, + '--monitor_cidr=%s' % self.cidr, + '--pid_file=%s' % self.pid_file, + '--state_path=%s' % self.conf_dir, + '--user=%s' % os.geteuid(), + '--group=%s' % os.getegid() + ] + self.ext_process = external_process.ProcessManager( + None, self.router_id, namespace=self.router.namespace, + service='test_ip_mon', pids_path=self.conf_dir, + default_cmd_callback=self._callback, run_as_root=True, + pid_file=self.pid_file) + + server = linux_utils.UnixDomainWSGIServer( + 'neutron-keepalived-state-change', num_threads=1) + server.start(ha.KeepalivedStateChangeHandler(mock.Mock()), + self.state_file, workers=0, + backlog=ha.KEEPALIVED_STATE_CHANGE_SERVER_BACKLOG) + self.addCleanup(server.stop) + + def _run_monitor(self): + self.ext_process.enable() + self.addCleanup(self.ext_process.disable) + eventlet.sleep(5) + + def _callback(self, *args): + return self.cmd_opts + + def _search_in_file(self, file_name, text): + def text_in_file(): + try: + return text in open(file_name).read() + except FileNotFoundError: + return False + try: + utils.wait_until_true(text_in_file, timeout=15) + except utils.WaitTimeout: + # NOTE: we need to read here the content of the file. + raise RuntimeError( + 'Text not found in file %(file_name)s: "%(text)s". File ' + 'content: %(file_content)s' % + {'file_name': file_name, 'text': text, + 'file_content': open(file_name).read()}) def test_new_fip_sends_garp(self): + self._run_monitor() next_ip_cidr = net_helpers.increment_ip_cidr(self.machines.ip_cidr, 2) expected_ip = str(netaddr.IPNetwork(next_ip_cidr).ip) # Create incomplete ARP entry @@ -159,6 +128,42 @@ class TestMonitorDaemon(base.BaseSudoTestCase): self.peer.namespace, expected_ip, self.router.port.link.address)) - utils.wait_until_true( - has_arp_entry_predicate, - exception=exc) + utils.wait_until_true(has_arp_entry_predicate, timeout=15, + exception=exc) + + def test_read_queue_change_state(self): + self._run_monitor() + msg = 'Wrote router %s state %s' + self.router.port.addr.add(self.cidr) + self._search_in_file(self.log_file, msg % (self.router_id, 'master')) + self.router.port.addr.delete(self.cidr) + self._search_in_file(self.log_file, msg % (self.router_id, 'backup')) + + def test_read_queue_send_garp(self): + self._run_monitor() + dev_dummy = 'dev_dummy' + ip_wrapper = ip_lib.IPWrapper(namespace=self.router.namespace) + ip_wrapper.add_dummy(dev_dummy) + ip_device = ip_lib.IPDevice(dev_dummy, namespace=self.router.namespace) + ip_device.link.set_up() + msg = 'Sent GARP to %(ip_address)s from %(device_name)s' + for idx in range(2, 20): + next_cidr = net_helpers.increment_ip_cidr(self.cidr, idx) + ip_device.addr.add(next_cidr) + msg_args = {'ip_address': str(netaddr.IPNetwork(next_cidr).ip), + 'device_name': dev_dummy} + self._search_in_file(self.log_file, msg % msg_args) + ip_device.addr.delete(next_cidr) + + def test_handle_initial_state_backup(self): + # No tracked IP (self.cidr) is configured in the monitored interface + # (self.router.port) + self._run_monitor() + msg = 'Initial status of router %s is %s' % (self.router_id, 'backup') + self._search_in_file(self.log_file, msg) + + def test_handle_initial_state_master(self): + self.router.port.addr.add(self.cidr) + self._run_monitor() + msg = 'Initial status of router %s is %s' % (self.router_id, 'master') + self._search_in_file(self.log_file, msg) diff --git a/neutron/tests/functional/agent/linux/test_ip_monitor.py b/neutron/tests/functional/agent/linux/test_ip_monitor.py deleted file mode 100644 index ae1a0379ee3..00000000000 --- a/neutron/tests/functional/agent/linux/test_ip_monitor.py +++ /dev/null @@ -1,67 +0,0 @@ -# Copyright 2015 Red Hat, Inc. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from neutron.agent.common import async_process -from neutron.agent.linux import ip_monitor -from neutron.tests.functional.agent.linux import test_ip_lib - - -class TestIPMonitor(test_ip_lib.IpLibTestFramework): - def setUp(self): - super(TestIPMonitor, self).setUp() - attr = self.generate_device_details() - self.device = self.manage_device(attr) - self.monitor = ip_monitor.IPMonitor(attr.namespace) - self.addCleanup(self._safe_stop_monitor) - - def _safe_stop_monitor(self): - try: - self.monitor.stop() - except async_process.AsyncProcessException: - pass - - def test_ip_monitor_lifecycle(self): - self.assertFalse(self.monitor.is_active()) - self.monitor.start() - self.assertTrue(self.monitor.is_active()) - self.monitor.stop() - self.assertFalse(self.monitor.is_active()) - - def test_ip_monitor_events(self): - self.monitor.start() - - cidr = '169.254.128.1/24' - self.device.addr.add(cidr) - self._assert_event(expected_name=self.device.name, - expected_cidr=cidr, - expected_added=True, - event=ip_monitor.IPMonitorEvent.from_text( - next(self.monitor.iter_stdout(block=True)))) - - self.device.addr.delete(cidr) - self._assert_event(expected_name=self.device.name, - expected_cidr=cidr, - expected_added=False, - event=ip_monitor.IPMonitorEvent.from_text( - next(self.monitor.iter_stdout(block=True)))) - - def _assert_event(self, - expected_name, - expected_cidr, - expected_added, - event): - self.assertEqual(expected_name, event.interface) - self.assertEqual(expected_added, event.added) - self.assertEqual(expected_cidr, event.cidr) diff --git a/neutron/tests/unit/agent/linux/test_ip_monitor.py b/neutron/tests/unit/agent/linux/test_ip_monitor.py deleted file mode 100644 index c73693f9b46..00000000000 --- a/neutron/tests/unit/agent/linux/test_ip_monitor.py +++ /dev/null @@ -1,36 +0,0 @@ -# Copyright 2015 Red Hat, Inc. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from neutron.agent.linux import ip_monitor -from neutron.tests import base - - -class TestIPMonitorEvent(base.BaseTestCase): - def test_from_text_parses_added_line(self): - event = ip_monitor.IPMonitorEvent.from_text( - '3: wlp3s0 inet 192.168.3.59/24 brd 192.168.3.255 ' - r'scope global dynamic wlp3s0\ valid_lft 300sec ' - 'preferred_lft 300sec') - self.assertEqual('wlp3s0', event.interface) - self.assertTrue(event.added) - self.assertEqual('192.168.3.59/24', event.cidr) - - def test_from_text_parses_deleted_line(self): - event = ip_monitor.IPMonitorEvent.from_text( - 'Deleted 1: lo inet 127.0.0.2/8 scope host secondary lo\'' - ' valid_lft forever preferred_lft forever') - self.assertEqual('lo', event.interface) - self.assertFalse(event.added) - self.assertEqual('127.0.0.2/8', event.cidr)