Merge "Add ip_monitor command implemented using Pyroute2"

This commit is contained in:
Zuul 2019-06-07 13:36:03 +00:00 committed by Gerrit Code Review
commit 6ceba7aa47
4 changed files with 290 additions and 19 deletions

View File

@ -24,6 +24,7 @@ from neutron_lib import exceptions
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import excutils
from pyroute2.netlink import exceptions as netlink_exceptions
from pyroute2.netlink import rtnl
from pyroute2.netlink.rtnl import ifaddrmsg
from pyroute2.netlink.rtnl import ifinfmsg
@ -65,6 +66,9 @@ IP_ADDRESS_SCOPE = {rtnl.rtscopes['RT_SCOPE_UNIVERSE']: 'global',
IP_ADDRESS_SCOPE_NAME = {v: k for k, v in IP_ADDRESS_SCOPE.items()}
IP_ADDRESS_EVENTS = {'RTM_NEWADDR': 'added',
'RTM_DELADDR': 'removed'}
SYS_NET_PATH = '/sys/class/net'
DEFAULT_GW_PATTERN = re.compile(r"via (\S+)")
METRIC_PATTERN = re.compile(r"metric (\S+)")
@ -1374,6 +1378,26 @@ def get_attr(pyroute2_obj, attr_name):
return attr[1]
def _parse_ip_address(pyroute2_address, device_name):
ip = get_attr(pyroute2_address, 'IFA_ADDRESS')
ip_length = pyroute2_address['prefixlen']
event = IP_ADDRESS_EVENTS.get(pyroute2_address.get('event'))
cidr = common_utils.ip_to_cidr(ip, prefix=ip_length)
flags = get_attr(pyroute2_address, 'IFA_FLAGS')
dynamic = not bool(flags & ifaddrmsg.IFA_F_PERMANENT)
tentative = bool(flags & ifaddrmsg.IFA_F_TENTATIVE)
dadfailed = bool(flags & ifaddrmsg.IFA_F_DADFAILED)
scope = IP_ADDRESS_SCOPE[pyroute2_address['scope']]
return {'name': device_name,
'cidr': cidr,
'scope': scope,
'broadcast': get_attr(pyroute2_address, 'IFA_BROADCAST'),
'dynamic': dynamic,
'tentative': tentative,
'dadfailed': dadfailed,
'event': event}
def _parse_link_device(namespace, device, **kwargs):
"""Parse pytoute2 link device information
@ -1387,21 +1411,7 @@ def _parse_link_device(namespace, device, **kwargs):
index=device['index'],
**kwargs)
for ip_address in ip_addresses:
ip = get_attr(ip_address, 'IFA_ADDRESS')
ip_length = ip_address['prefixlen']
cidr = common_utils.ip_to_cidr(ip, prefix=ip_length)
flags = get_attr(ip_address, 'IFA_FLAGS')
dynamic = not bool(flags & ifaddrmsg.IFA_F_PERMANENT)
tentative = bool(flags & ifaddrmsg.IFA_F_TENTATIVE)
dadfailed = bool(flags & ifaddrmsg.IFA_F_DADFAILED)
scope = IP_ADDRESS_SCOPE[ip_address['scope']]
retval.append({'name': name,
'cidr': cidr,
'scope': scope,
'broadcast': get_attr(ip_address, 'IFA_BROADCAST'),
'dynamic': dynamic,
'tentative': tentative,
'dadfailed': dadfailed})
retval.append(_parse_ip_address(ip_address, name))
return retval
@ -1455,3 +1465,53 @@ def get_devices_info(namespace, **kwargs):
retval[device['vxlan_link_index']]['name'])
return list(retval.values())
def ip_monitor(namespace, queue, event_stop, event_started):
"""Monitor IP address changes
If namespace is not None, this function must be executed as root user, but
cannot use privsep because is a blocking function and can exhaust the
number of working threads.
"""
def get_device_name(ip, index):
try:
device = ip.link('get', index=index)
if device:
attrs = device[0].get('attrs', [])
for attr in (attr for attr in attrs
if attr[0] == 'IFLA_IFNAME'):
return attr[1]
except netlink_exceptions.NetlinkError as e:
if e.code == errno.ENODEV:
return
raise
try:
with privileged.get_iproute(namespace) as ip:
ip.bind()
cache_devices = {}
for device in ip.get_links():
cache_devices[device['index']] = get_attr(device,
'IFLA_IFNAME')
event_started.send()
while not event_stop.ready():
eventlet.sleep(0)
ip_address = []
with common_utils.Timer(timeout=2, raise_exception=False):
ip_address = ip.get()
if not ip_address:
continue
if 'index' in ip_address[0] and 'prefixlen' in ip_address[0]:
index = ip_address[0]['index']
name = (get_device_name(ip, index) or
cache_devices.get(index))
if not name:
continue
cache_devices[index] = name
queue.put(_parse_ip_address(ip_address[0], name))
except OSError as e:
if e.errno == errno.ENOENT:
raise privileged.NetworkNamespaceNotFound(netns_name=namespace)
raise

View File

@ -0,0 +1,69 @@
#! /usr/bin/env python
# Copyright (c) 2019 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import signal
import sys
import eventlet
from eventlet import queue
from oslo_serialization import jsonutils
from neutron.agent.linux import ip_lib
EVENT_STOP = eventlet.Event()
EVENT_STARTED = eventlet.Event()
POOL = eventlet.GreenPool(2)
def sigterm_handler(_signo, _stack_frame):
global EVENT_STOP
global POOL
EVENT_STOP.send()
POOL.waitall()
exit(0)
signal.signal(signal.SIGTERM, sigterm_handler)
def read_queue(temp_file, _queue, event_stop, event_started):
event_started.wait()
with open(temp_file, 'w') as f:
f.write('')
while not event_stop.ready():
eventlet.sleep(0)
try:
retval = _queue.get(timeout=2)
except eventlet.queue.Empty:
retval = None
if retval:
with open(temp_file, 'a+') as f:
f.write(jsonutils.dumps(retval) + '\n')
def main(temp_file, namespace):
global POOL
namespace = None if namespace == 'None' else namespace
_queue = queue.Queue()
POOL.spawn(ip_lib.ip_monitor, namespace, _queue, EVENT_STOP, EVENT_STARTED)
POOL.spawn(read_queue, temp_file, _queue, EVENT_STOP, EVENT_STARTED)
POOL.waitall()
if __name__ == "__main__":
main(sys.argv[1], sys.argv[2])

View File

@ -14,21 +14,26 @@
# under the License.
import collections
import signal
import netaddr
from neutron_lib import constants
from neutron_lib.utils import net
from oslo_config import cfg
from oslo_log import log as logging
from oslo_serialization import jsonutils
from oslo_utils import importutils
from oslo_utils import uuidutils
import testscenarios
import testtools
from neutron.agent.common import async_process
from neutron.agent.linux import ip_lib
from neutron.common import utils
from neutron.conf.agent import common as config
from neutron.privileged.agent.linux import ip_lib as priv_ip_lib
from neutron.tests.common import net_helpers
from neutron.tests.functional.agent.linux.bin import ip_monitor
from neutron.tests.functional import base as functional_base
LOG = logging.getLogger(__name__)
@ -661,3 +666,139 @@ class NamespaceTestCase(functional_base.BaseSudoTestCase):
def test_network_namespace_exists_ns_doesnt_exists_try_is_ready(self):
self.assertFalse(ip_lib.network_namespace_exists('another_ns',
try_is_ready=True))
class IpMonitorTestCase(testscenarios.WithScenarios,
functional_base.BaseLoggingTestCase):
scenarios = [
('namespace', {'namespace': 'ns_' + uuidutils.generate_uuid()}),
('no_namespace', {'namespace': None})
]
def setUp(self):
super(IpMonitorTestCase, self).setUp()
self.addCleanup(self._cleanup)
if self.namespace:
priv_ip_lib.create_netns(self.namespace)
self.devices = [('int_' + uuidutils.generate_uuid())[
:constants.DEVICE_NAME_MAX_LEN] for _ in range(5)]
self.ip_wrapper = ip_lib.IPWrapper(self.namespace)
self.temp_file = self.get_temp_file_path('out_' + self.devices[0] +
'.tmp')
self.proc = self._run_ip_monitor(ip_monitor)
def _cleanup(self):
self.proc.stop(block=True, kill_signal=signal.SIGTERM)
if self.namespace:
priv_ip_lib.remove_netns(self.namespace)
else:
for device in self.devices:
try:
priv_ip_lib.delete_interface(device, self.namespace)
except priv_ip_lib.NetworkInterfaceNotFound:
pass
@staticmethod
def _normalize_module_name(name):
for suf in ['.pyc', '.pyo']:
if name.endswith(suf):
return name[:-len(suf)] + '.py'
return name
def _run_ip_monitor(self, module):
executable = self._normalize_module_name(module.__file__)
proc = async_process.AsyncProcess(
[executable, self.temp_file, str(self.namespace)],
run_as_root=True)
proc.start(block=True)
return proc
def _read_file(self, ip_addresses):
try:
registers = []
with open(self.temp_file, 'r') as f:
data = f.read()
for line in data.splitlines():
register = jsonutils.loads(line)
registers.append({'name': register['name'],
'cidr': register['cidr'],
'event': register['event']})
for ip_address in ip_addresses:
if ip_address not in registers:
return False
return True
except (OSError, IOError) as e:
return False
def _check_read_file(self, ip_addresses):
try:
utils.wait_until_true(lambda: self._read_file(ip_addresses),
timeout=30)
except utils.WaitTimeout:
with open(self.temp_file, 'r') as f:
registers = f.read()
self.fail('Defined IP addresses: %s, IP addresses registered: %s' %
(ip_addresses, registers))
def _handle_ip_addresses(self, event, ip_addresses):
for ip_address in (_ip for _ip in ip_addresses
if _ip['event'] == event):
ip_device = ip_lib.IPDevice(ip_address['name'], self.namespace)
if event == 'removed':
ip_device.addr.delete(ip_address['cidr'])
if event == 'added':
ip_device.addr.add(ip_address['cidr'])
def test_add_remove_ip_address_and_interface(self):
for device in self.devices:
self.ip_wrapper.add_dummy(device)
utils.wait_until_true(lambda: self._read_file({}), timeout=30)
ip_addresses = [
{'cidr': '192.168.250.1/24', 'event': 'added',
'name': self.devices[0]},
{'cidr': '192.168.250.2/24', 'event': 'added',
'name': self.devices[1]},
{'cidr': '192.168.250.3/24', 'event': 'added',
'name': self.devices[2]},
{'cidr': '192.168.250.10/24', 'event': 'added',
'name': self.devices[3]},
{'cidr': '192.168.250.10/24', 'event': 'removed',
'name': self.devices[3]},
{'cidr': '2001:db8::1/64', 'event': 'added',
'name': self.devices[4]},
{'cidr': '2001:db8::2/64', 'event': 'added',
'name': self.devices[4]}]
self._handle_ip_addresses('added', ip_addresses)
self._handle_ip_addresses('removed', ip_addresses)
self._check_read_file(ip_addresses)
ip_device = ip_lib.IPDevice(self.devices[4], self.namespace)
ip_device.link.delete()
ip_addresses = [
{'cidr': '2001:db8::1/64', 'event': 'removed',
'name': self.devices[4]},
{'cidr': '2001:db8::2/64', 'event': 'removed',
'name': self.devices[4]}]
self._check_read_file(ip_addresses)
def test_interface_added_after_initilization(self):
for device in self.devices[:len(self.devices) - 1]:
self.ip_wrapper.add_dummy(device)
utils.wait_until_true(lambda: self._read_file({}), timeout=30)
ip_addresses = [
{'cidr': '192.168.250.21/24', 'event': 'added',
'name': self.devices[0]},
{'cidr': '192.168.250.22/24', 'event': 'added',
'name': self.devices[1]}]
self._handle_ip_addresses('added', ip_addresses)
self._check_read_file(ip_addresses)
self.ip_wrapper.add_dummy(self.devices[-1])
ip_addresses.append({'cidr': '192.168.250.23/24', 'event': 'added',
'name': self.devices[-1]})
self._handle_ip_addresses('added', [ip_addresses[-1]])
self._check_read_file(ip_addresses)

View File

@ -1890,20 +1890,21 @@ class ParseLinkDeviceTestCase(base.BaseTestCase):
def test_parse_link_devices(self):
device = ({'index': 1, 'attrs': [['IFLA_IFNAME', 'int_name']]})
self.mock_get_ip_addresses.return_value = [
{'prefixlen': 24, 'scope': 200, 'attrs': [
{'prefixlen': 24, 'scope': 200, 'event': 'RTM_NEWADDR', 'attrs': [
['IFA_ADDRESS', '192.168.10.20'],
['IFA_FLAGS', ifaddrmsg.IFA_F_PERMANENT]]},
{'prefixlen': 64, 'scope': 200, 'attrs': [
{'prefixlen': 64, 'scope': 200, 'event': 'RTM_DELADDR', 'attrs': [
['IFA_ADDRESS', '2001:db8::1'],
['IFA_FLAGS', ifaddrmsg.IFA_F_PERMANENT]]}]
retval = ip_lib._parse_link_device('namespace', device)
expected = [{'scope': 'site', 'cidr': '192.168.10.20/24',
'dynamic': False, 'dadfailed': False, 'name': 'int_name',
'broadcast': None, 'tentative': False},
'broadcast': None, 'tentative': False, 'event': 'added'},
{'scope': 'site', 'cidr': '2001:db8::1/64',
'dynamic': False, 'dadfailed': False, 'name': 'int_name',
'broadcast': None, 'tentative': False}]
'broadcast': None, 'tentative': False,
'event': 'removed'}]
self.assertEqual(expected, retval)