Introduce ip address monitor

In Juno, we used keepalived notifier scripts to report the local
state of an HA router's state. These have been found to be
unreliable. The proposed approach is to not use them altogether.
Instead, monitor the omnipresent VIP on the HA device - It is
only configured on the master instance. In order to do that,
we'll use the 'ip monitor address' wrapper introduced in this patch
to get address change events as they happen to avoid polling.

Related-Bug: #1402010
Change-Id: Icc2c07efb7e20008ff5b07d7df2104e6099091d7
This commit is contained in:
Assaf Muller 2015-02-21 19:28:54 -05:00
parent 7907d40075
commit aec3a94cd3
8 changed files with 210 additions and 18 deletions

View File

@ -31,6 +31,9 @@ kill_radvd: KillFilter, root, /sbin/radvd, -9, -HUP
ip: IpFilter, ip, root
ip_exec: IpNetnsExecFilter, ip, root
# For ip monitor
kill_ip_monitor: KillFilter, root, ip, -9
# ovs_lib (if OVSInterfaceDriver is used)
ovs-vsctl: CommandFilter, ovs-vsctl, root

View File

@ -87,10 +87,10 @@ class AsyncProcess(object):
return utils.pid_invoked_with_cmdline(
self.pid, self.cmd_without_namespace)
def start(self, blocking=False):
def start(self, block=False):
"""Launch a process and monitor it asynchronously.
:param blocking: Block until the process has started.
:param block: Block until the process has started.
:raises eventlet.timeout.Timeout if blocking is True and the process
did not start in time.
"""
@ -100,13 +100,13 @@ class AsyncProcess(object):
LOG.debug('Launching async process [%s].', self.cmd)
self._spawn()
if blocking:
if block:
utils.wait_until_true(self.is_active)
def stop(self, blocking=False):
def stop(self, block=False):
"""Halt the process and watcher threads.
:param blocking: Block until the process has stopped.
:param block: Block until the process has stopped.
:raises eventlet.timeout.Timeout if blocking is True and the process
did not stop in time.
"""
@ -116,7 +116,7 @@ class AsyncProcess(object):
else:
raise AsyncProcessException(_('Process is not running.'))
if blocking:
if block:
utils.wait_until_true(lambda: not self.is_active())
def _spawn(self):
@ -216,15 +216,15 @@ class AsyncProcess(object):
def _read_stderr(self):
return self._read(self._process.stderr, self._stderr_lines)
def _iter_queue(self, queue):
def _iter_queue(self, queue, block):
while True:
try:
yield queue.get_nowait()
yield queue.get(block=block)
except eventlet.queue.Empty:
break
def iter_stdout(self):
return self._iter_queue(self._stdout_lines)
def iter_stdout(self, block=False):
return self._iter_queue(self._stdout_lines, block)
def iter_stderr(self):
return self._iter_queue(self._stderr_lines)
def iter_stderr(self, block=False):
return self._iter_queue(self._stderr_lines, block)

View File

@ -0,0 +1,85 @@
# Copyright 2015 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_utils import excutils
from neutron.agent.linux import async_process
from neutron.i18n import _LE
from neutron.openstack.common import log as logging
LOG = logging.getLogger(__name__)
class IPMonitorEvent(object):
def __init__(self, line, added, interface, cidr):
self.line = line
self.added = added
self.interface = interface
self.cidr = cidr
def __str__(self):
return self.line
@classmethod
def from_text(cls, line):
route = line.split()
try:
first_word = route[0]
except IndexError:
with excutils.save_and_reraise_exception():
LOG.error(_LE('Unable to parse route "%s"'), line)
added = (first_word != 'Deleted')
if not added:
route = route[1:]
try:
interface = route[1]
cidr = route[3]
except IndexError:
with excutils.save_and_reraise_exception():
LOG.error(_LE('Unable to parse route "%s"'), line)
return cls(line, added, interface, cidr)
class IPMonitor(async_process.AsyncProcess):
"""Wrapper over `ip monitor address`.
To monitor and react indefinitely:
m = IPMonitor(namespace='tmp')
m.start()
for iterable in m:
event = IPMonitorEvent.from_text(iterable)
print event, event.added, event.interface, event.cidr
"""
def __init__(self,
namespace=None,
respawn_interval=None):
super(IPMonitor, self).__init__(['ip', '-o', 'monitor', 'address'],
run_as_root=True,
respawn_interval=respawn_interval,
namespace=namespace)
def __iter__(self):
return self.iter_stdout(block=True)
def start(self):
super(IPMonitor, self).start(block=True)
def stop(self):
super(IPMonitor, self).stop(block=True)

View File

@ -50,9 +50,9 @@ class TestAsyncProcess(AsyncProcessTestFramework):
proc = async_process.AsyncProcess(['tail', '-f',
self.test_file_path])
self.addCleanup(self._safe_stop, proc)
proc.start(blocking=True)
proc.start(block=True)
self._check_stdout(proc)
proc.stop(blocking=True)
proc.stop(block=True)
# Ensure that the process and greenthreads have stopped
proc._process.wait()

View File

@ -0,0 +1,67 @@
# Copyright 2015 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron.agent.linux import async_process
from neutron.agent.linux import ip_monitor
from neutron.tests.functional.agent.linux import test_ip_lib
class TestIPMonitor(test_ip_lib.IpLibTestFramework):
def setUp(self):
super(TestIPMonitor, self).setUp()
attr = self.generate_device_details()
self.device = self.manage_device(attr)
self.monitor = ip_monitor.IPMonitor(attr.namespace)
self.addCleanup(self._safe_stop_monitor)
def _safe_stop_monitor(self):
try:
self.monitor.stop()
except async_process.AsyncProcessException:
pass
def test_ip_monitor_lifecycle(self):
self.assertFalse(self.monitor.is_active())
self.monitor.start()
self.assertTrue(self.monitor.is_active())
self.monitor.stop()
self.assertFalse(self.monitor.is_active())
def test_ip_monitor_events(self):
self.monitor.start()
cidr = '169.254.128.1/24'
self.device.addr.add(4, cidr, '169.254.128.255')
self._assert_event(expected_name=self.device.name,
expected_cidr=cidr,
expected_added=True,
event=ip_monitor.IPMonitorEvent.from_text(
next(self.monitor.iter_stdout(block=True))))
self.device.addr.delete(4, cidr)
self._assert_event(expected_name=self.device.name,
expected_cidr=cidr,
expected_added=False,
event=ip_monitor.IPMonitorEvent.from_text(
next(self.monitor.iter_stdout(block=True))))
def _assert_event(self,
expected_name,
expected_cidr,
expected_added,
event):
self.assertEqual(expected_name, event.interface)
self.assertEqual(expected_added, event.added)
self.assertEqual(expected_cidr, event.cidr)

View File

@ -24,7 +24,7 @@ class TestPIDHelpers(test_async_process.AsyncProcessTestFramework):
def test_get_cmdline_from_pid_and_pid_invoked_with_cmdline(self):
cmd = ['tail', '-f', self.test_file_path]
proc = async_process.AsyncProcess(cmd)
proc.start(blocking=True)
proc.start(block=True)
self.addCleanup(proc.stop)
pid = proc.pid

View File

@ -128,13 +128,14 @@ class TestAsyncProcess(base.BaseTestCase):
mock_start.assert_called_once_with()
def test__iter_queue_returns_empty_list_for_empty_queue(self):
result = list(self.proc._iter_queue(eventlet.queue.LightQueue()))
result = list(self.proc._iter_queue(eventlet.queue.LightQueue(),
False))
self.assertEqual(result, [])
def test__iter_queue_returns_queued_data(self):
queue = eventlet.queue.LightQueue()
queue.put('foo')
result = list(self.proc._iter_queue(queue))
result = list(self.proc._iter_queue(queue, False))
self.assertEqual(result, ['foo'])
def _test_iter_output_calls_iter_queue_on_output_queue(self, output_type):
@ -146,7 +147,7 @@ class TestAsyncProcess(base.BaseTestCase):
self.assertEqual(value, expected_value)
queue = getattr(self.proc, '_%s_lines' % output_type, None)
mock_iter_queue.assert_called_with(queue)
mock_iter_queue.assert_called_with(queue, False)
def test_iter_stdout(self):
self._test_iter_output_calls_iter_queue_on_output_queue('stdout')

View File

@ -0,0 +1,36 @@
# Copyright 2015 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron.agent.linux import ip_monitor
from neutron.tests import base
class TestIPMonitorEvent(base.BaseTestCase):
def test_from_text_parses_added_line(self):
event = ip_monitor.IPMonitorEvent.from_text(
'3: wlp3s0 inet 192.168.3.59/24 brd 192.168.3.255 '
'scope global dynamic wlp3s0\ valid_lft 300sec '
'preferred_lft 300sec')
self.assertEqual('wlp3s0', event.interface)
self.assertTrue(event.added)
self.assertEqual('192.168.3.59/24', event.cidr)
def test_from_text_parses_deleted_line(self):
event = ip_monitor.IPMonitorEvent.from_text(
'Deleted 1: lo inet 127.0.0.2/8 scope host secondary lo\''
' valid_lft forever preferred_lft forever')
self.assertEqual('lo', event.interface)
self.assertFalse(event.added)
self.assertEqual('127.0.0.2/8', event.cidr)