Merge "Replace "ip monitor" command with Pyroute2 implementation"

This commit is contained in:
Zuul 2019-12-12 01:59:07 +00:00 committed by Gerrit Code Review
commit cbe8ee4c5f
8 changed files with 165 additions and 331 deletions

View File

@ -368,9 +368,10 @@ class HaRouter(router.RouterInfo):
return external_process.ProcessManager(
self.agent_conf,
'%s.monitor' % self.router_id,
self.ha_namespace,
None,
service=KEEPALIVED_STATE_CHANGE_MONITOR_SERVICE_NAME,
default_cmd_callback=self._get_state_change_monitor_callback())
default_cmd_callback=self._get_state_change_monitor_callback(),
run_as_root=True)
def _get_state_change_monitor_callback(self):
ha_device = self.get_ha_device_name()

View File

@ -13,24 +13,23 @@
# under the License.
import os
import signal
import sys
import threading
import httplib2
import netaddr
from oslo_config import cfg
from oslo_log import log as logging
from six.moves import queue
from neutron._i18n import _
from neutron.agent.l3 import ha
from neutron.agent.l3 import ha_router
from neutron.agent.linux import daemon
from neutron.agent.linux import ip_lib
from neutron.agent.linux import ip_monitor
from neutron.agent.linux import utils as agent_utils
from neutron.common import config
from neutron.conf.agent import common as agent_config
from neutron.conf.agent.l3 import keepalived
from neutron import privileged
LOG = logging.getLogger(__name__)
@ -55,33 +54,40 @@ class MonitorDaemon(daemon.Daemon):
self.interface = interface
self.cidr = cidr
self.monitor = None
super(MonitorDaemon, self).__init__(
pidfile, uuid=router_id,
user=user, group=group,
procname=ha_router.STATE_CHANGE_PROC_NAME)
self.event_stop = threading.Event()
self.event_started = threading.Event()
self.queue = queue.Queue()
super(MonitorDaemon, self).__init__(pidfile, uuid=router_id,
user=user, group=group)
def run(self, run_as_root=False):
self.monitor = ip_monitor.IPMonitor(namespace=self.namespace,
run_as_root=run_as_root)
self.monitor.start()
# Only drop privileges if the process is currently running as root
# (The run_as_root variable name here is unfortunate - It means to
# use a root helper when the running process is NOT already running
# as root
if not run_as_root:
super(MonitorDaemon, self).run()
def run(self):
self._thread_ip_monitor = threading.Thread(
target=ip_lib.ip_monitor,
args=(self.namespace, self.queue, self.event_stop,
self.event_started))
self._thread_read_queue = threading.Thread(
target=self.read_queue,
args=(self.queue, self.event_stop, self.event_started))
self._thread_ip_monitor.start()
self._thread_read_queue.start()
self.handle_initial_state()
for iterable in self.monitor:
self.parse_and_handle_event(iterable)
self._thread_read_queue.join()
def parse_and_handle_event(self, iterable):
try:
event = ip_monitor.IPMonitorEvent.from_text(iterable)
if event.interface == self.interface and event.cidr == self.cidr:
new_state = 'master' if event.added else 'backup'
def read_queue(self, _queue, event_stop, event_started):
event_started.wait()
while not event_stop.is_set():
try:
event = _queue.get(timeout=2)
except queue.Empty:
event = None
if not event:
continue
if event['name'] == self.interface and event['cidr'] == self.cidr:
new_state = 'master' if event['event'] == 'added' else 'backup'
self.write_state_change(new_state)
self.notify_agent(new_state)
elif event.interface != self.interface and event.added:
elif event['name'] != self.interface and event['event'] == 'added':
# Send GARPs for all new router interfaces.
# REVISIT(jlibosva): keepalived versions 1.2.19 and below
# contain bug where gratuitous ARPs are not sent on receiving
@ -90,9 +96,6 @@ class MonitorDaemon(daemon.Daemon):
# packaged in some distributions (RHEL/CentOS/Ubuntu Xenial).
# Remove this code once new keepalived versions are available.
self.send_garp(event)
except Exception:
LOG.exception('Failed to process or handle event for line %s',
iterable)
def handle_initial_state(self):
try:
@ -133,26 +136,19 @@ class MonitorDaemon(daemon.Daemon):
def send_garp(self, event):
"""Send gratuitous ARP for given event."""
ip_address = str(netaddr.IPNetwork(event['cidr']).ip)
ip_lib.send_ip_addr_adv_notif(
self.namespace,
event.interface,
str(netaddr.IPNetwork(event.cidr).ip),
event['name'],
ip_address,
log_exception=False
)
def _kill_monitor(self):
if self.monitor:
# Kill PID instead of calling self.monitor.stop() because the ip
# monitor is running as root while keepalived-state-change is not
# (dropped privileges after launching the ip monitor) and will fail
# with "Permission denied". Also, we can safely do this because the
# monitor was launched with respawn_interval=None so it won't be
# automatically respawned
agent_utils.kill_process(self.monitor.pid, signal.SIGKILL,
run_as_root=True)
LOG.debug('Sent GARP to %(ip_address)s from %(device_name)s',
{'ip_address': ip_address, 'device_name': event['name']})
def handle_sigterm(self, signum, frame):
self._kill_monitor()
self.event_stop.set()
self._thread_read_queue.join(timeout=5)
super(MonitorDaemon, self).handle_sigterm(signum, frame)
@ -162,7 +158,7 @@ def configure(conf):
conf.set_override('debug', True)
conf.set_override('use_syslog', True)
config.setup_logging()
agent_config.setup_privsep()
privileged.default.set_client_mode(False)
def main():

View File

@ -1,86 +0,0 @@
# Copyright 2015 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import log as logging
from oslo_utils import excutils
from neutron.agent.common import async_process
from neutron.agent.linux import ip_lib
LOG = logging.getLogger(__name__)
class IPMonitorEvent(object):
def __init__(self, line, added, interface, cidr):
self.line = line
self.added = added
self.interface = interface
self.cidr = cidr
def __str__(self):
return self.line
@classmethod
def from_text(cls, line):
route = line.split()
try:
first_word = route[0]
except IndexError:
with excutils.save_and_reraise_exception():
LOG.error('Unable to parse route "%s"', line)
added = (first_word != 'Deleted')
if not added:
route = route[1:]
try:
interface = ip_lib.remove_interface_suffix(route[1])
cidr = route[3]
except IndexError:
with excutils.save_and_reraise_exception():
LOG.error('Unable to parse route "%s"', line)
return cls(line, added, interface, cidr)
class IPMonitor(async_process.AsyncProcess):
"""Wrapper over `ip monitor address`.
To monitor and react indefinitely:
m = IPMonitor(namespace='tmp', root_as_root=True)
m.start()
for iterable in m:
event = IPMonitorEvent.from_text(iterable)
print(event, event.added, event.interface, event.cidr)
"""
def __init__(self,
namespace=None,
run_as_root=True,
respawn_interval=None):
super(IPMonitor, self).__init__(['ip', '-o', 'monitor', 'address'],
run_as_root=run_as_root,
respawn_interval=respawn_interval,
namespace=namespace)
def __iter__(self):
return self.iter_stdout(block=True)
def start(self):
super(IPMonitor, self).start(block=True)
def stop(self):
super(IPMonitor, self).stop(block=True)

View File

@ -0,0 +1,22 @@
#!/usr/bin/python
# Copyright (c) 2015 Red Hat Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import sys
from neutron.agent.l3.keepalived_state_change import main
if __name__ == "__main__":
sys.exit(main())

View File

@ -18,13 +18,14 @@ import os
import eventlet
import mock
import netaddr
from oslo_config import fixture as fixture_config
from oslo_utils import uuidutils
from neutron.agent.l3 import keepalived_state_change
from neutron.agent.l3 import ha
from neutron.agent.l3 import ha_router
from neutron.agent.linux import external_process
from neutron.agent.linux import ip_lib
from neutron.agent.linux import utils as linux_utils
from neutron.common import utils
from neutron.conf.agent.l3 import keepalived as kd
from neutron.tests.common import machine_fixtures as mf
from neutron.tests.common import net_helpers
from neutron.tests.functional import base
@ -37,104 +38,71 @@ def has_expected_arp_entry(device_name, namespace, ip, mac):
return entry != []
class TestKeepalivedStateChange(base.BaseSudoTestCase):
def setUp(self):
super(TestKeepalivedStateChange, self).setUp()
self.conf_fixture = self.useFixture(fixture_config.Config())
kd.register_l3_agent_keepalived_opts(self.conf_fixture)
self.router_id = uuidutils.generate_uuid()
self.conf_dir = self.get_default_temp_dir().path
self.cidr = '169.254.128.1/24'
self.interface_name = utils.get_rand_name()
self.monitor = keepalived_state_change.MonitorDaemon(
self.get_temp_file_path('monitor.pid'),
self.router_id,
1,
2,
utils.get_rand_name(),
self.conf_dir,
self.interface_name,
self.cidr)
mock.patch.object(self.monitor, 'notify_agent').start()
self.line = '1: %s inet %s' % (self.interface_name, self.cidr)
def test_parse_and_handle_event_wrong_device_completes_without_error(self):
self.monitor.parse_and_handle_event(
'1: wrong_device inet wrong_cidr')
def _get_state(self):
with open(os.path.join(self.monitor.conf_dir, 'state')) as state_file:
return state_file.read()
def test_parse_and_handle_event_writes_to_file(self):
self.monitor.parse_and_handle_event('Deleted %s' % self.line)
self.assertEqual('backup', self._get_state())
self.monitor.parse_and_handle_event(self.line)
self.assertEqual('master', self._get_state())
def test_parse_and_handle_event_fails_writing_state(self):
with mock.patch.object(
self.monitor, 'write_state_change', side_effect=OSError):
self.monitor.parse_and_handle_event(self.line)
def test_parse_and_handle_event_fails_notifying_agent(self):
with mock.patch.object(
self.monitor, 'notify_agent', side_effect=Exception):
self.monitor.parse_and_handle_event(self.line)
def test_handle_initial_state_backup(self):
ip = ip_lib.IPWrapper(namespace=self.monitor.namespace)
ip.netns.add(self.monitor.namespace)
self.addCleanup(ip.netns.delete, self.monitor.namespace)
ip.add_dummy(self.interface_name)
with mock.patch.object(
self.monitor, 'write_state_change') as write_state_change,\
mock.patch.object(
self.monitor, 'notify_agent') as notify_agent:
self.monitor.handle_initial_state()
write_state_change.assert_not_called()
notify_agent.assert_not_called()
def test_handle_initial_state_master(self):
ip = ip_lib.IPWrapper(namespace=self.monitor.namespace)
ip.netns.add(self.monitor.namespace)
self.addCleanup(ip.netns.delete, self.monitor.namespace)
ha_interface = ip.add_dummy(self.interface_name)
ha_interface.addr.add(self.cidr)
self.monitor.handle_initial_state()
self.assertEqual('master', self._get_state())
class TestMonitorDaemon(base.BaseSudoTestCase):
class TestMonitorDaemon(base.BaseLoggingTestCase):
def setUp(self):
super(TestMonitorDaemon, self).setUp()
self.conf_dir = self.get_default_temp_dir().path
self.pid_file = os.path.join(self.conf_dir, 'pid_file')
self.log_file = os.path.join(self.conf_dir, 'log_file')
self.state_file = os.path.join(self.conf_dir,
'keepalived-state-change')
self.cidr = '169.254.151.1/24'
bridge = self.useFixture(net_helpers.OVSBridgeFixture()).bridge
self.machines = self.useFixture(mf.PeerMachines(bridge))
self.router, self.peer = self.machines.machines[:2]
self.router_id = uuidutils.generate_uuid()
conf_dir = self.get_default_temp_dir().path
monitor = keepalived_state_change.MonitorDaemon(
self.get_temp_file_path('monitor.pid'),
uuidutils.generate_uuid(),
1,
2,
self.router.namespace,
conf_dir,
'foo-iface',
self.machines.ip_cidr
)
eventlet.spawn_n(monitor.run, run_as_root=True)
monitor_started = functools.partial(
lambda mon: mon.monitor is not None, monitor)
utils.wait_until_true(monitor_started)
self.addCleanup(monitor.monitor.stop)
self.cmd_opts = [
ha_router.STATE_CHANGE_PROC_NAME,
'--router_id=%s' % self.router_id,
'--namespace=%s' % self.router.namespace,
'--conf_dir=%s' % self.conf_dir,
'--log-file=%s' % self.log_file,
'--monitor_interface=%s' % self.router.port.name,
'--monitor_cidr=%s' % self.cidr,
'--pid_file=%s' % self.pid_file,
'--state_path=%s' % self.conf_dir,
'--user=%s' % os.geteuid(),
'--group=%s' % os.getegid()
]
self.ext_process = external_process.ProcessManager(
None, '%s.monitor' % self.pid_file, None, service='test_ip_mon',
pids_path=self.conf_dir, default_cmd_callback=self._callback,
run_as_root=True)
server = linux_utils.UnixDomainWSGIServer(
'neutron-keepalived-state-change', num_threads=1)
server.start(ha.KeepalivedStateChangeHandler(mock.Mock()),
self.state_file, workers=0,
backlog=ha.KEEPALIVED_STATE_CHANGE_SERVER_BACKLOG)
self.addCleanup(server.stop)
def _run_monitor(self):
self.ext_process.enable()
self.addCleanup(self.ext_process.disable)
eventlet.sleep(5)
def _callback(self, *args):
return self.cmd_opts
def _search_in_file(self, file_name, text):
def text_in_file():
try:
return text in open(file_name).read()
except FileNotFoundError:
return False
try:
utils.wait_until_true(text_in_file, timeout=15)
except utils.WaitTimeout:
# NOTE: we need to read here the content of the file.
raise RuntimeError(
'Text not found in file %(file_name)s: "%(text)s". File '
'content: %(file_content)s' %
{'file_name': file_name, 'text': text,
'file_content': open(file_name).read()})
def test_new_fip_sends_garp(self):
self._run_monitor()
next_ip_cidr = net_helpers.increment_ip_cidr(self.machines.ip_cidr, 2)
expected_ip = str(netaddr.IPNetwork(next_ip_cidr).ip)
# Create incomplete ARP entry
@ -159,6 +127,42 @@ class TestMonitorDaemon(base.BaseSudoTestCase):
self.peer.namespace,
expected_ip,
self.router.port.link.address))
utils.wait_until_true(
has_arp_entry_predicate,
exception=exc)
utils.wait_until_true(has_arp_entry_predicate, timeout=15,
exception=exc)
def test_read_queue_change_state(self):
self._run_monitor()
msg = 'Wrote router %s state %s'
self.router.port.addr.add(self.cidr)
self._search_in_file(self.log_file, msg % (self.router_id, 'master'))
self.router.port.addr.delete(self.cidr)
self._search_in_file(self.log_file, msg % (self.router_id, 'backup'))
def test_read_queue_send_garp(self):
self._run_monitor()
dev_dummy = 'dev_dummy'
ip_wrapper = ip_lib.IPWrapper(namespace=self.router.namespace)
ip_wrapper.add_dummy(dev_dummy)
ip_device = ip_lib.IPDevice(dev_dummy, namespace=self.router.namespace)
ip_device.link.set_up()
msg = 'Sent GARP to %(ip_address)s from %(device_name)s'
for idx in range(2, 20):
next_cidr = net_helpers.increment_ip_cidr(self.cidr, idx)
ip_device.addr.add(next_cidr)
msg_args = {'ip_address': str(netaddr.IPNetwork(next_cidr).ip),
'device_name': dev_dummy}
self._search_in_file(self.log_file, msg % msg_args)
ip_device.addr.delete(next_cidr)
def test_handle_initial_state_backup(self):
# No tracked IP (self.cidr) is configured in the monitored interface
# (self.router.port)
self._run_monitor()
msg = 'Initial status of router %s is %s' % (self.router_id, 'backup')
self._search_in_file(self.log_file, msg)
def test_handle_initial_state_master(self):
self.router.port.addr.add(self.cidr)
self._run_monitor()
msg = 'Initial status of router %s is %s' % (self.router_id, 'master')
self._search_in_file(self.log_file, msg)

View File

@ -1,67 +0,0 @@
# Copyright 2015 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron.agent.common import async_process
from neutron.agent.linux import ip_monitor
from neutron.tests.functional.agent.linux import test_ip_lib
class TestIPMonitor(test_ip_lib.IpLibTestFramework):
def setUp(self):
super(TestIPMonitor, self).setUp()
attr = self.generate_device_details()
self.device = self.manage_device(attr)
self.monitor = ip_monitor.IPMonitor(attr.namespace)
self.addCleanup(self._safe_stop_monitor)
def _safe_stop_monitor(self):
try:
self.monitor.stop()
except async_process.AsyncProcessException:
pass
def test_ip_monitor_lifecycle(self):
self.assertFalse(self.monitor.is_active())
self.monitor.start()
self.assertTrue(self.monitor.is_active())
self.monitor.stop()
self.assertFalse(self.monitor.is_active())
def test_ip_monitor_events(self):
self.monitor.start()
cidr = '169.254.128.1/24'
self.device.addr.add(cidr)
self._assert_event(expected_name=self.device.name,
expected_cidr=cidr,
expected_added=True,
event=ip_monitor.IPMonitorEvent.from_text(
next(self.monitor.iter_stdout(block=True))))
self.device.addr.delete(cidr)
self._assert_event(expected_name=self.device.name,
expected_cidr=cidr,
expected_added=False,
event=ip_monitor.IPMonitorEvent.from_text(
next(self.monitor.iter_stdout(block=True))))
def _assert_event(self,
expected_name,
expected_cidr,
expected_added,
event):
self.assertEqual(expected_name, event.interface)
self.assertEqual(expected_added, event.added)
self.assertEqual(expected_cidr, event.cidr)

View File

@ -1,36 +0,0 @@
# Copyright 2015 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron.agent.linux import ip_monitor
from neutron.tests import base
class TestIPMonitorEvent(base.BaseTestCase):
def test_from_text_parses_added_line(self):
event = ip_monitor.IPMonitorEvent.from_text(
'3: wlp3s0 inet 192.168.3.59/24 brd 192.168.3.255 '
r'scope global dynamic wlp3s0\ valid_lft 300sec '
'preferred_lft 300sec')
self.assertEqual('wlp3s0', event.interface)
self.assertTrue(event.added)
self.assertEqual('192.168.3.59/24', event.cidr)
def test_from_text_parses_deleted_line(self):
event = ip_monitor.IPMonitorEvent.from_text(
'Deleted 1: lo inet 127.0.0.2/8 scope host secondary lo\''
' valid_lft forever preferred_lft forever')
self.assertEqual('lo', event.interface)
self.assertFalse(event.added)
self.assertEqual('127.0.0.2/8', event.cidr)