From aec3a94cd360be94a20d095241e5b2a6dd5ec21f Mon Sep 17 00:00:00 2001 From: Assaf Muller Date: Sat, 21 Feb 2015 19:28:54 -0500 Subject: [PATCH] Introduce ip address monitor In Juno, we used keepalived notifier scripts to report the local state of an HA router's state. These have been found to be unreliable. The proposed approach is to not use them altogether. Instead, monitor the omnipresent VIP on the HA device - It is only configured on the master instance. In order to do that, we'll use the 'ip monitor address' wrapper introduced in this patch to get address change events as they happen to avoid polling. Related-Bug: #1402010 Change-Id: Icc2c07efb7e20008ff5b07d7df2104e6099091d7 --- etc/neutron/rootwrap.d/l3.filters | 3 + neutron/agent/linux/async_process.py | 24 +++--- neutron/agent/linux/ip_monitor.py | 85 +++++++++++++++++++ .../agent/linux/test_async_process.py | 4 +- .../functional/agent/linux/test_ip_monitor.py | 67 +++++++++++++++ .../functional/agent/linux/test_utils.py | 2 +- .../unit/agent/linux/test_async_process.py | 7 +- .../tests/unit/agent/linux/test_ip_monitor.py | 36 ++++++++ 8 files changed, 210 insertions(+), 18 deletions(-) create mode 100644 neutron/agent/linux/ip_monitor.py create mode 100644 neutron/tests/functional/agent/linux/test_ip_monitor.py create mode 100644 neutron/tests/unit/agent/linux/test_ip_monitor.py diff --git a/etc/neutron/rootwrap.d/l3.filters b/etc/neutron/rootwrap.d/l3.filters index be69b32c57e..a3818a2dcf7 100644 --- a/etc/neutron/rootwrap.d/l3.filters +++ b/etc/neutron/rootwrap.d/l3.filters @@ -31,6 +31,9 @@ kill_radvd: KillFilter, root, /sbin/radvd, -9, -HUP ip: IpFilter, ip, root ip_exec: IpNetnsExecFilter, ip, root +# For ip monitor +kill_ip_monitor: KillFilter, root, ip, -9 + # ovs_lib (if OVSInterfaceDriver is used) ovs-vsctl: CommandFilter, ovs-vsctl, root diff --git a/neutron/agent/linux/async_process.py b/neutron/agent/linux/async_process.py index 79f80ac298f..8da4ae2c30f 100644 --- a/neutron/agent/linux/async_process.py +++ b/neutron/agent/linux/async_process.py @@ -87,10 +87,10 @@ class AsyncProcess(object): return utils.pid_invoked_with_cmdline( self.pid, self.cmd_without_namespace) - def start(self, blocking=False): + def start(self, block=False): """Launch a process and monitor it asynchronously. - :param blocking: Block until the process has started. + :param block: Block until the process has started. :raises eventlet.timeout.Timeout if blocking is True and the process did not start in time. """ @@ -100,13 +100,13 @@ class AsyncProcess(object): LOG.debug('Launching async process [%s].', self.cmd) self._spawn() - if blocking: + if block: utils.wait_until_true(self.is_active) - def stop(self, blocking=False): + def stop(self, block=False): """Halt the process and watcher threads. - :param blocking: Block until the process has stopped. + :param block: Block until the process has stopped. :raises eventlet.timeout.Timeout if blocking is True and the process did not stop in time. """ @@ -116,7 +116,7 @@ class AsyncProcess(object): else: raise AsyncProcessException(_('Process is not running.')) - if blocking: + if block: utils.wait_until_true(lambda: not self.is_active()) def _spawn(self): @@ -216,15 +216,15 @@ class AsyncProcess(object): def _read_stderr(self): return self._read(self._process.stderr, self._stderr_lines) - def _iter_queue(self, queue): + def _iter_queue(self, queue, block): while True: try: - yield queue.get_nowait() + yield queue.get(block=block) except eventlet.queue.Empty: break - def iter_stdout(self): - return self._iter_queue(self._stdout_lines) + def iter_stdout(self, block=False): + return self._iter_queue(self._stdout_lines, block) - def iter_stderr(self): - return self._iter_queue(self._stderr_lines) + def iter_stderr(self, block=False): + return self._iter_queue(self._stderr_lines, block) diff --git a/neutron/agent/linux/ip_monitor.py b/neutron/agent/linux/ip_monitor.py new file mode 100644 index 00000000000..f7485c2ec4b --- /dev/null +++ b/neutron/agent/linux/ip_monitor.py @@ -0,0 +1,85 @@ +# Copyright 2015 Red Hat, Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from oslo_utils import excutils + +from neutron.agent.linux import async_process +from neutron.i18n import _LE +from neutron.openstack.common import log as logging + +LOG = logging.getLogger(__name__) + + +class IPMonitorEvent(object): + def __init__(self, line, added, interface, cidr): + self.line = line + self.added = added + self.interface = interface + self.cidr = cidr + + def __str__(self): + return self.line + + @classmethod + def from_text(cls, line): + route = line.split() + + try: + first_word = route[0] + except IndexError: + with excutils.save_and_reraise_exception(): + LOG.error(_LE('Unable to parse route "%s"'), line) + + added = (first_word != 'Deleted') + if not added: + route = route[1:] + + try: + interface = route[1] + cidr = route[3] + except IndexError: + with excutils.save_and_reraise_exception(): + LOG.error(_LE('Unable to parse route "%s"'), line) + + return cls(line, added, interface, cidr) + + +class IPMonitor(async_process.AsyncProcess): + """Wrapper over `ip monitor address`. + + To monitor and react indefinitely: + m = IPMonitor(namespace='tmp') + m.start() + for iterable in m: + event = IPMonitorEvent.from_text(iterable) + print event, event.added, event.interface, event.cidr + """ + + def __init__(self, + namespace=None, + respawn_interval=None): + super(IPMonitor, self).__init__(['ip', '-o', 'monitor', 'address'], + run_as_root=True, + respawn_interval=respawn_interval, + namespace=namespace) + + def __iter__(self): + return self.iter_stdout(block=True) + + def start(self): + super(IPMonitor, self).start(block=True) + + def stop(self): + super(IPMonitor, self).stop(block=True) diff --git a/neutron/tests/functional/agent/linux/test_async_process.py b/neutron/tests/functional/agent/linux/test_async_process.py index afb9d354a6f..8fddb6d4b39 100644 --- a/neutron/tests/functional/agent/linux/test_async_process.py +++ b/neutron/tests/functional/agent/linux/test_async_process.py @@ -50,9 +50,9 @@ class TestAsyncProcess(AsyncProcessTestFramework): proc = async_process.AsyncProcess(['tail', '-f', self.test_file_path]) self.addCleanup(self._safe_stop, proc) - proc.start(blocking=True) + proc.start(block=True) self._check_stdout(proc) - proc.stop(blocking=True) + proc.stop(block=True) # Ensure that the process and greenthreads have stopped proc._process.wait() diff --git a/neutron/tests/functional/agent/linux/test_ip_monitor.py b/neutron/tests/functional/agent/linux/test_ip_monitor.py new file mode 100644 index 00000000000..dd0fd125299 --- /dev/null +++ b/neutron/tests/functional/agent/linux/test_ip_monitor.py @@ -0,0 +1,67 @@ +# Copyright 2015 Red Hat, Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from neutron.agent.linux import async_process +from neutron.agent.linux import ip_monitor +from neutron.tests.functional.agent.linux import test_ip_lib + + +class TestIPMonitor(test_ip_lib.IpLibTestFramework): + def setUp(self): + super(TestIPMonitor, self).setUp() + attr = self.generate_device_details() + self.device = self.manage_device(attr) + self.monitor = ip_monitor.IPMonitor(attr.namespace) + self.addCleanup(self._safe_stop_monitor) + + def _safe_stop_monitor(self): + try: + self.monitor.stop() + except async_process.AsyncProcessException: + pass + + def test_ip_monitor_lifecycle(self): + self.assertFalse(self.monitor.is_active()) + self.monitor.start() + self.assertTrue(self.monitor.is_active()) + self.monitor.stop() + self.assertFalse(self.monitor.is_active()) + + def test_ip_monitor_events(self): + self.monitor.start() + + cidr = '169.254.128.1/24' + self.device.addr.add(4, cidr, '169.254.128.255') + self._assert_event(expected_name=self.device.name, + expected_cidr=cidr, + expected_added=True, + event=ip_monitor.IPMonitorEvent.from_text( + next(self.monitor.iter_stdout(block=True)))) + + self.device.addr.delete(4, cidr) + self._assert_event(expected_name=self.device.name, + expected_cidr=cidr, + expected_added=False, + event=ip_monitor.IPMonitorEvent.from_text( + next(self.monitor.iter_stdout(block=True)))) + + def _assert_event(self, + expected_name, + expected_cidr, + expected_added, + event): + self.assertEqual(expected_name, event.interface) + self.assertEqual(expected_added, event.added) + self.assertEqual(expected_cidr, event.cidr) diff --git a/neutron/tests/functional/agent/linux/test_utils.py b/neutron/tests/functional/agent/linux/test_utils.py index a9e8671816e..5508457218e 100644 --- a/neutron/tests/functional/agent/linux/test_utils.py +++ b/neutron/tests/functional/agent/linux/test_utils.py @@ -24,7 +24,7 @@ class TestPIDHelpers(test_async_process.AsyncProcessTestFramework): def test_get_cmdline_from_pid_and_pid_invoked_with_cmdline(self): cmd = ['tail', '-f', self.test_file_path] proc = async_process.AsyncProcess(cmd) - proc.start(blocking=True) + proc.start(block=True) self.addCleanup(proc.stop) pid = proc.pid diff --git a/neutron/tests/unit/agent/linux/test_async_process.py b/neutron/tests/unit/agent/linux/test_async_process.py index b35af6e372a..6856d864682 100644 --- a/neutron/tests/unit/agent/linux/test_async_process.py +++ b/neutron/tests/unit/agent/linux/test_async_process.py @@ -128,13 +128,14 @@ class TestAsyncProcess(base.BaseTestCase): mock_start.assert_called_once_with() def test__iter_queue_returns_empty_list_for_empty_queue(self): - result = list(self.proc._iter_queue(eventlet.queue.LightQueue())) + result = list(self.proc._iter_queue(eventlet.queue.LightQueue(), + False)) self.assertEqual(result, []) def test__iter_queue_returns_queued_data(self): queue = eventlet.queue.LightQueue() queue.put('foo') - result = list(self.proc._iter_queue(queue)) + result = list(self.proc._iter_queue(queue, False)) self.assertEqual(result, ['foo']) def _test_iter_output_calls_iter_queue_on_output_queue(self, output_type): @@ -146,7 +147,7 @@ class TestAsyncProcess(base.BaseTestCase): self.assertEqual(value, expected_value) queue = getattr(self.proc, '_%s_lines' % output_type, None) - mock_iter_queue.assert_called_with(queue) + mock_iter_queue.assert_called_with(queue, False) def test_iter_stdout(self): self._test_iter_output_calls_iter_queue_on_output_queue('stdout') diff --git a/neutron/tests/unit/agent/linux/test_ip_monitor.py b/neutron/tests/unit/agent/linux/test_ip_monitor.py new file mode 100644 index 00000000000..ea71237862d --- /dev/null +++ b/neutron/tests/unit/agent/linux/test_ip_monitor.py @@ -0,0 +1,36 @@ +# Copyright 2015 Red Hat, Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from neutron.agent.linux import ip_monitor +from neutron.tests import base + + +class TestIPMonitorEvent(base.BaseTestCase): + def test_from_text_parses_added_line(self): + event = ip_monitor.IPMonitorEvent.from_text( + '3: wlp3s0 inet 192.168.3.59/24 brd 192.168.3.255 ' + 'scope global dynamic wlp3s0\ valid_lft 300sec ' + 'preferred_lft 300sec') + self.assertEqual('wlp3s0', event.interface) + self.assertTrue(event.added) + self.assertEqual('192.168.3.59/24', event.cidr) + + def test_from_text_parses_deleted_line(self): + event = ip_monitor.IPMonitorEvent.from_text( + 'Deleted 1: lo inet 127.0.0.2/8 scope host secondary lo\'' + ' valid_lft forever preferred_lft forever') + self.assertEqual('lo', event.interface) + self.assertFalse(event.added) + self.assertEqual('127.0.0.2/8', event.cidr)