diff --git a/etc/neutron/rootwrap.d/l3.filters b/etc/neutron/rootwrap.d/l3.filters index be69b32c57e..a3818a2dcf7 100644 --- a/etc/neutron/rootwrap.d/l3.filters +++ b/etc/neutron/rootwrap.d/l3.filters @@ -31,6 +31,9 @@ kill_radvd: KillFilter, root, /sbin/radvd, -9, -HUP ip: IpFilter, ip, root ip_exec: IpNetnsExecFilter, ip, root +# For ip monitor +kill_ip_monitor: KillFilter, root, ip, -9 + # ovs_lib (if OVSInterfaceDriver is used) ovs-vsctl: CommandFilter, ovs-vsctl, root diff --git a/neutron/agent/linux/async_process.py b/neutron/agent/linux/async_process.py index 79f80ac298f..8da4ae2c30f 100644 --- a/neutron/agent/linux/async_process.py +++ b/neutron/agent/linux/async_process.py @@ -87,10 +87,10 @@ class AsyncProcess(object): return utils.pid_invoked_with_cmdline( self.pid, self.cmd_without_namespace) - def start(self, blocking=False): + def start(self, block=False): """Launch a process and monitor it asynchronously. - :param blocking: Block until the process has started. + :param block: Block until the process has started. :raises eventlet.timeout.Timeout if blocking is True and the process did not start in time. """ @@ -100,13 +100,13 @@ class AsyncProcess(object): LOG.debug('Launching async process [%s].', self.cmd) self._spawn() - if blocking: + if block: utils.wait_until_true(self.is_active) - def stop(self, blocking=False): + def stop(self, block=False): """Halt the process and watcher threads. - :param blocking: Block until the process has stopped. + :param block: Block until the process has stopped. :raises eventlet.timeout.Timeout if blocking is True and the process did not stop in time. """ @@ -116,7 +116,7 @@ class AsyncProcess(object): else: raise AsyncProcessException(_('Process is not running.')) - if blocking: + if block: utils.wait_until_true(lambda: not self.is_active()) def _spawn(self): @@ -216,15 +216,15 @@ class AsyncProcess(object): def _read_stderr(self): return self._read(self._process.stderr, self._stderr_lines) - def _iter_queue(self, queue): + def _iter_queue(self, queue, block): while True: try: - yield queue.get_nowait() + yield queue.get(block=block) except eventlet.queue.Empty: break - def iter_stdout(self): - return self._iter_queue(self._stdout_lines) + def iter_stdout(self, block=False): + return self._iter_queue(self._stdout_lines, block) - def iter_stderr(self): - return self._iter_queue(self._stderr_lines) + def iter_stderr(self, block=False): + return self._iter_queue(self._stderr_lines, block) diff --git a/neutron/agent/linux/ip_monitor.py b/neutron/agent/linux/ip_monitor.py new file mode 100644 index 00000000000..f7485c2ec4b --- /dev/null +++ b/neutron/agent/linux/ip_monitor.py @@ -0,0 +1,85 @@ +# Copyright 2015 Red Hat, Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from oslo_utils import excutils + +from neutron.agent.linux import async_process +from neutron.i18n import _LE +from neutron.openstack.common import log as logging + +LOG = logging.getLogger(__name__) + + +class IPMonitorEvent(object): + def __init__(self, line, added, interface, cidr): + self.line = line + self.added = added + self.interface = interface + self.cidr = cidr + + def __str__(self): + return self.line + + @classmethod + def from_text(cls, line): + route = line.split() + + try: + first_word = route[0] + except IndexError: + with excutils.save_and_reraise_exception(): + LOG.error(_LE('Unable to parse route "%s"'), line) + + added = (first_word != 'Deleted') + if not added: + route = route[1:] + + try: + interface = route[1] + cidr = route[3] + except IndexError: + with excutils.save_and_reraise_exception(): + LOG.error(_LE('Unable to parse route "%s"'), line) + + return cls(line, added, interface, cidr) + + +class IPMonitor(async_process.AsyncProcess): + """Wrapper over `ip monitor address`. + + To monitor and react indefinitely: + m = IPMonitor(namespace='tmp') + m.start() + for iterable in m: + event = IPMonitorEvent.from_text(iterable) + print event, event.added, event.interface, event.cidr + """ + + def __init__(self, + namespace=None, + respawn_interval=None): + super(IPMonitor, self).__init__(['ip', '-o', 'monitor', 'address'], + run_as_root=True, + respawn_interval=respawn_interval, + namespace=namespace) + + def __iter__(self): + return self.iter_stdout(block=True) + + def start(self): + super(IPMonitor, self).start(block=True) + + def stop(self): + super(IPMonitor, self).stop(block=True) diff --git a/neutron/tests/functional/agent/linux/test_async_process.py b/neutron/tests/functional/agent/linux/test_async_process.py index afb9d354a6f..8fddb6d4b39 100644 --- a/neutron/tests/functional/agent/linux/test_async_process.py +++ b/neutron/tests/functional/agent/linux/test_async_process.py @@ -50,9 +50,9 @@ class TestAsyncProcess(AsyncProcessTestFramework): proc = async_process.AsyncProcess(['tail', '-f', self.test_file_path]) self.addCleanup(self._safe_stop, proc) - proc.start(blocking=True) + proc.start(block=True) self._check_stdout(proc) - proc.stop(blocking=True) + proc.stop(block=True) # Ensure that the process and greenthreads have stopped proc._process.wait() diff --git a/neutron/tests/functional/agent/linux/test_ip_monitor.py b/neutron/tests/functional/agent/linux/test_ip_monitor.py new file mode 100644 index 00000000000..dd0fd125299 --- /dev/null +++ b/neutron/tests/functional/agent/linux/test_ip_monitor.py @@ -0,0 +1,67 @@ +# Copyright 2015 Red Hat, Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from neutron.agent.linux import async_process +from neutron.agent.linux import ip_monitor +from neutron.tests.functional.agent.linux import test_ip_lib + + +class TestIPMonitor(test_ip_lib.IpLibTestFramework): + def setUp(self): + super(TestIPMonitor, self).setUp() + attr = self.generate_device_details() + self.device = self.manage_device(attr) + self.monitor = ip_monitor.IPMonitor(attr.namespace) + self.addCleanup(self._safe_stop_monitor) + + def _safe_stop_monitor(self): + try: + self.monitor.stop() + except async_process.AsyncProcessException: + pass + + def test_ip_monitor_lifecycle(self): + self.assertFalse(self.monitor.is_active()) + self.monitor.start() + self.assertTrue(self.monitor.is_active()) + self.monitor.stop() + self.assertFalse(self.monitor.is_active()) + + def test_ip_monitor_events(self): + self.monitor.start() + + cidr = '169.254.128.1/24' + self.device.addr.add(4, cidr, '169.254.128.255') + self._assert_event(expected_name=self.device.name, + expected_cidr=cidr, + expected_added=True, + event=ip_monitor.IPMonitorEvent.from_text( + next(self.monitor.iter_stdout(block=True)))) + + self.device.addr.delete(4, cidr) + self._assert_event(expected_name=self.device.name, + expected_cidr=cidr, + expected_added=False, + event=ip_monitor.IPMonitorEvent.from_text( + next(self.monitor.iter_stdout(block=True)))) + + def _assert_event(self, + expected_name, + expected_cidr, + expected_added, + event): + self.assertEqual(expected_name, event.interface) + self.assertEqual(expected_added, event.added) + self.assertEqual(expected_cidr, event.cidr) diff --git a/neutron/tests/functional/agent/linux/test_utils.py b/neutron/tests/functional/agent/linux/test_utils.py index a9e8671816e..5508457218e 100644 --- a/neutron/tests/functional/agent/linux/test_utils.py +++ b/neutron/tests/functional/agent/linux/test_utils.py @@ -24,7 +24,7 @@ class TestPIDHelpers(test_async_process.AsyncProcessTestFramework): def test_get_cmdline_from_pid_and_pid_invoked_with_cmdline(self): cmd = ['tail', '-f', self.test_file_path] proc = async_process.AsyncProcess(cmd) - proc.start(blocking=True) + proc.start(block=True) self.addCleanup(proc.stop) pid = proc.pid diff --git a/neutron/tests/unit/agent/linux/test_async_process.py b/neutron/tests/unit/agent/linux/test_async_process.py index b35af6e372a..6856d864682 100644 --- a/neutron/tests/unit/agent/linux/test_async_process.py +++ b/neutron/tests/unit/agent/linux/test_async_process.py @@ -128,13 +128,14 @@ class TestAsyncProcess(base.BaseTestCase): mock_start.assert_called_once_with() def test__iter_queue_returns_empty_list_for_empty_queue(self): - result = list(self.proc._iter_queue(eventlet.queue.LightQueue())) + result = list(self.proc._iter_queue(eventlet.queue.LightQueue(), + False)) self.assertEqual(result, []) def test__iter_queue_returns_queued_data(self): queue = eventlet.queue.LightQueue() queue.put('foo') - result = list(self.proc._iter_queue(queue)) + result = list(self.proc._iter_queue(queue, False)) self.assertEqual(result, ['foo']) def _test_iter_output_calls_iter_queue_on_output_queue(self, output_type): @@ -146,7 +147,7 @@ class TestAsyncProcess(base.BaseTestCase): self.assertEqual(value, expected_value) queue = getattr(self.proc, '_%s_lines' % output_type, None) - mock_iter_queue.assert_called_with(queue) + mock_iter_queue.assert_called_with(queue, False) def test_iter_stdout(self): self._test_iter_output_calls_iter_queue_on_output_queue('stdout') diff --git a/neutron/tests/unit/agent/linux/test_ip_monitor.py b/neutron/tests/unit/agent/linux/test_ip_monitor.py new file mode 100644 index 00000000000..ea71237862d --- /dev/null +++ b/neutron/tests/unit/agent/linux/test_ip_monitor.py @@ -0,0 +1,36 @@ +# Copyright 2015 Red Hat, Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from neutron.agent.linux import ip_monitor +from neutron.tests import base + + +class TestIPMonitorEvent(base.BaseTestCase): + def test_from_text_parses_added_line(self): + event = ip_monitor.IPMonitorEvent.from_text( + '3: wlp3s0 inet 192.168.3.59/24 brd 192.168.3.255 ' + 'scope global dynamic wlp3s0\ valid_lft 300sec ' + 'preferred_lft 300sec') + self.assertEqual('wlp3s0', event.interface) + self.assertTrue(event.added) + self.assertEqual('192.168.3.59/24', event.cidr) + + def test_from_text_parses_deleted_line(self): + event = ip_monitor.IPMonitorEvent.from_text( + 'Deleted 1: lo inet 127.0.0.2/8 scope host secondary lo\'' + ' valid_lft forever preferred_lft forever') + self.assertEqual('lo', event.interface) + self.assertFalse(event.added) + self.assertEqual('127.0.0.2/8', event.cidr)