Add ip_monitor command implemented using Pyroute2

This method allows to track any IP address change in a
namespace. In future patches, this method will replace
the current IP monitor used in the keepalived_state_change
daemon. The current implementation relays in a spawned shell,
executed in root mode, and the output of this shell,
conveniently parsed.

If the passed namespace is not None, this new method must
be executed in privileged mode (root user), but cannot use
privsep because is a blocking function and can exhaust the
number of working threads.

This function should be executed in a parallel thread, returning
the data using the eventlet queue. Pyroute does not implement yet
a non blocking method to retrieve the command output or to know if
the buffer has data. This method, spawned in a greenthread, must be
stopped by killing this thread.

An example of how to use it can be found in the functional tests
implemented in this patch.

Change-Id: I86e4487035d60e1b52e951dd3cd50d6bb54f388b
Related-Bug: #1680183
This commit is contained in:
Rodolfo Alonso Hernandez 2019-05-21 10:48:42 +00:00
parent 6ea01444dd
commit a477c31a23
4 changed files with 290 additions and 19 deletions

View File

@ -24,6 +24,7 @@ from neutron_lib import exceptions
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import excutils
from pyroute2.netlink import exceptions as netlink_exceptions
from pyroute2.netlink import rtnl
from pyroute2.netlink.rtnl import ifaddrmsg
from pyroute2.netlink.rtnl import ifinfmsg
@ -65,6 +66,9 @@ IP_ADDRESS_SCOPE = {rtnl.rtscopes['RT_SCOPE_UNIVERSE']: 'global',
IP_ADDRESS_SCOPE_NAME = {v: k for k, v in IP_ADDRESS_SCOPE.items()}
IP_ADDRESS_EVENTS = {'RTM_NEWADDR': 'added',
'RTM_DELADDR': 'removed'}
SYS_NET_PATH = '/sys/class/net'
DEFAULT_GW_PATTERN = re.compile(r"via (\S+)")
METRIC_PATTERN = re.compile(r"metric (\S+)")
@ -1374,6 +1378,26 @@ def get_attr(pyroute2_obj, attr_name):
return attr[1]
def _parse_ip_address(pyroute2_address, device_name):
ip = get_attr(pyroute2_address, 'IFA_ADDRESS')
ip_length = pyroute2_address['prefixlen']
event = IP_ADDRESS_EVENTS.get(pyroute2_address.get('event'))
cidr = common_utils.ip_to_cidr(ip, prefix=ip_length)
flags = get_attr(pyroute2_address, 'IFA_FLAGS')
dynamic = not bool(flags & ifaddrmsg.IFA_F_PERMANENT)
tentative = bool(flags & ifaddrmsg.IFA_F_TENTATIVE)
dadfailed = bool(flags & ifaddrmsg.IFA_F_DADFAILED)
scope = IP_ADDRESS_SCOPE[pyroute2_address['scope']]
return {'name': device_name,
'cidr': cidr,
'scope': scope,
'broadcast': get_attr(pyroute2_address, 'IFA_BROADCAST'),
'dynamic': dynamic,
'tentative': tentative,
'dadfailed': dadfailed,
'event': event}
def _parse_link_device(namespace, device, **kwargs):
"""Parse pytoute2 link device information
@ -1387,21 +1411,7 @@ def _parse_link_device(namespace, device, **kwargs):
index=device['index'],
**kwargs)
for ip_address in ip_addresses:
ip = get_attr(ip_address, 'IFA_ADDRESS')
ip_length = ip_address['prefixlen']
cidr = common_utils.ip_to_cidr(ip, prefix=ip_length)
flags = get_attr(ip_address, 'IFA_FLAGS')
dynamic = not bool(flags & ifaddrmsg.IFA_F_PERMANENT)
tentative = bool(flags & ifaddrmsg.IFA_F_TENTATIVE)
dadfailed = bool(flags & ifaddrmsg.IFA_F_DADFAILED)
scope = IP_ADDRESS_SCOPE[ip_address['scope']]
retval.append({'name': name,
'cidr': cidr,
'scope': scope,
'broadcast': get_attr(ip_address, 'IFA_BROADCAST'),
'dynamic': dynamic,
'tentative': tentative,
'dadfailed': dadfailed})
retval.append(_parse_ip_address(ip_address, name))
return retval
@ -1455,3 +1465,53 @@ def get_devices_info(namespace, **kwargs):
retval[device['vxlan_link_index']]['name'])
return list(retval.values())
def ip_monitor(namespace, queue, event_stop, event_started):
"""Monitor IP address changes
If namespace is not None, this function must be executed as root user, but
cannot use privsep because is a blocking function and can exhaust the
number of working threads.
"""
def get_device_name(ip, index):
try:
device = ip.link('get', index=index)
if device:
attrs = device[0].get('attrs', [])
for attr in (attr for attr in attrs
if attr[0] == 'IFLA_IFNAME'):
return attr[1]
except netlink_exceptions.NetlinkError as e:
if e.code == errno.ENODEV:
return
raise
try:
with privileged.get_iproute(namespace) as ip:
ip.bind()
cache_devices = {}
for device in ip.get_links():
cache_devices[device['index']] = get_attr(device,
'IFLA_IFNAME')
event_started.send()
while not event_stop.ready():
eventlet.sleep(0)
ip_address = []
with common_utils.Timer(timeout=2, raise_exception=False):
ip_address = ip.get()
if not ip_address:
continue
if 'index' in ip_address[0] and 'prefixlen' in ip_address[0]:
index = ip_address[0]['index']
name = (get_device_name(ip, index) or
cache_devices.get(index))
if not name:
continue
cache_devices[index] = name
queue.put(_parse_ip_address(ip_address[0], name))
except OSError as e:
if e.errno == errno.ENOENT:
raise privileged.NetworkNamespaceNotFound(netns_name=namespace)
raise

View File

@ -0,0 +1,69 @@
#! /usr/bin/env python
# Copyright (c) 2019 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import signal
import sys
import eventlet
from eventlet import queue
from oslo_serialization import jsonutils
from neutron.agent.linux import ip_lib
EVENT_STOP = eventlet.Event()
EVENT_STARTED = eventlet.Event()
POOL = eventlet.GreenPool(2)
def sigterm_handler(_signo, _stack_frame):
global EVENT_STOP
global POOL
EVENT_STOP.send()
POOL.waitall()
exit(0)
signal.signal(signal.SIGTERM, sigterm_handler)
def read_queue(temp_file, _queue, event_stop, event_started):
event_started.wait()
with open(temp_file, 'w') as f:
f.write('')
while not event_stop.ready():
eventlet.sleep(0)
try:
retval = _queue.get(timeout=2)
except eventlet.queue.Empty:
retval = None
if retval:
with open(temp_file, 'a+') as f:
f.write(jsonutils.dumps(retval) + '\n')
def main(temp_file, namespace):
global POOL
namespace = None if namespace == 'None' else namespace
_queue = queue.Queue()
POOL.spawn(ip_lib.ip_monitor, namespace, _queue, EVENT_STOP, EVENT_STARTED)
POOL.spawn(read_queue, temp_file, _queue, EVENT_STOP, EVENT_STARTED)
POOL.waitall()
if __name__ == "__main__":
main(sys.argv[1], sys.argv[2])

View File

@ -14,21 +14,26 @@
# under the License.
import collections
import signal
import netaddr
from neutron_lib import constants
from neutron_lib.utils import net
from oslo_config import cfg
from oslo_log import log as logging
from oslo_serialization import jsonutils
from oslo_utils import importutils
from oslo_utils import uuidutils
import testscenarios
import testtools
from neutron.agent.common import async_process
from neutron.agent.linux import ip_lib
from neutron.common import utils
from neutron.conf.agent import common as config
from neutron.privileged.agent.linux import ip_lib as priv_ip_lib
from neutron.tests.common import net_helpers
from neutron.tests.functional.agent.linux.bin import ip_monitor
from neutron.tests.functional import base as functional_base
LOG = logging.getLogger(__name__)
@ -661,3 +666,139 @@ class NamespaceTestCase(functional_base.BaseSudoTestCase):
def test_network_namespace_exists_ns_doesnt_exists_try_is_ready(self):
self.assertFalse(ip_lib.network_namespace_exists('another_ns',
try_is_ready=True))
class IpMonitorTestCase(testscenarios.WithScenarios,
functional_base.BaseLoggingTestCase):
scenarios = [
('namespace', {'namespace': 'ns_' + uuidutils.generate_uuid()}),
('no_namespace', {'namespace': None})
]
def setUp(self):
super(IpMonitorTestCase, self).setUp()
self.addCleanup(self._cleanup)
if self.namespace:
priv_ip_lib.create_netns(self.namespace)
self.devices = [('int_' + uuidutils.generate_uuid())[
:constants.DEVICE_NAME_MAX_LEN] for _ in range(5)]
self.ip_wrapper = ip_lib.IPWrapper(self.namespace)
self.temp_file = self.get_temp_file_path('out_' + self.devices[0] +
'.tmp')
self.proc = self._run_ip_monitor(ip_monitor)
def _cleanup(self):
self.proc.stop(block=True, kill_signal=signal.SIGTERM)
if self.namespace:
priv_ip_lib.remove_netns(self.namespace)
else:
for device in self.devices:
try:
priv_ip_lib.delete_interface(device, self.namespace)
except priv_ip_lib.NetworkInterfaceNotFound:
pass
@staticmethod
def _normalize_module_name(name):
for suf in ['.pyc', '.pyo']:
if name.endswith(suf):
return name[:-len(suf)] + '.py'
return name
def _run_ip_monitor(self, module):
executable = self._normalize_module_name(module.__file__)
proc = async_process.AsyncProcess(
[executable, self.temp_file, str(self.namespace)],
run_as_root=True)
proc.start(block=True)
return proc
def _read_file(self, ip_addresses):
try:
registers = []
with open(self.temp_file, 'r') as f:
data = f.read()
for line in data.splitlines():
register = jsonutils.loads(line)
registers.append({'name': register['name'],
'cidr': register['cidr'],
'event': register['event']})
for ip_address in ip_addresses:
if ip_address not in registers:
return False
return True
except (OSError, IOError) as e:
return False
def _check_read_file(self, ip_addresses):
try:
utils.wait_until_true(lambda: self._read_file(ip_addresses),
timeout=30)
except utils.WaitTimeout:
with open(self.temp_file, 'r') as f:
registers = f.read()
self.fail('Defined IP addresses: %s, IP addresses registered: %s' %
(ip_addresses, registers))
def _handle_ip_addresses(self, event, ip_addresses):
for ip_address in (_ip for _ip in ip_addresses
if _ip['event'] == event):
ip_device = ip_lib.IPDevice(ip_address['name'], self.namespace)
if event == 'removed':
ip_device.addr.delete(ip_address['cidr'])
if event == 'added':
ip_device.addr.add(ip_address['cidr'])
def test_add_remove_ip_address_and_interface(self):
for device in self.devices:
self.ip_wrapper.add_dummy(device)
utils.wait_until_true(lambda: self._read_file({}), timeout=30)
ip_addresses = [
{'cidr': '192.168.250.1/24', 'event': 'added',
'name': self.devices[0]},
{'cidr': '192.168.250.2/24', 'event': 'added',
'name': self.devices[1]},
{'cidr': '192.168.250.3/24', 'event': 'added',
'name': self.devices[2]},
{'cidr': '192.168.250.10/24', 'event': 'added',
'name': self.devices[3]},
{'cidr': '192.168.250.10/24', 'event': 'removed',
'name': self.devices[3]},
{'cidr': '2001:db8::1/64', 'event': 'added',
'name': self.devices[4]},
{'cidr': '2001:db8::2/64', 'event': 'added',
'name': self.devices[4]}]
self._handle_ip_addresses('added', ip_addresses)
self._handle_ip_addresses('removed', ip_addresses)
self._check_read_file(ip_addresses)
ip_device = ip_lib.IPDevice(self.devices[4], self.namespace)
ip_device.link.delete()
ip_addresses = [
{'cidr': '2001:db8::1/64', 'event': 'removed',
'name': self.devices[4]},
{'cidr': '2001:db8::2/64', 'event': 'removed',
'name': self.devices[4]}]
self._check_read_file(ip_addresses)
def test_interface_added_after_initilization(self):
for device in self.devices[:len(self.devices) - 1]:
self.ip_wrapper.add_dummy(device)
utils.wait_until_true(lambda: self._read_file({}), timeout=30)
ip_addresses = [
{'cidr': '192.168.250.21/24', 'event': 'added',
'name': self.devices[0]},
{'cidr': '192.168.250.22/24', 'event': 'added',
'name': self.devices[1]}]
self._handle_ip_addresses('added', ip_addresses)
self._check_read_file(ip_addresses)
self.ip_wrapper.add_dummy(self.devices[-1])
ip_addresses.append({'cidr': '192.168.250.23/24', 'event': 'added',
'name': self.devices[-1]})
self._handle_ip_addresses('added', [ip_addresses[-1]])
self._check_read_file(ip_addresses)

View File

@ -1890,20 +1890,21 @@ class ParseLinkDeviceTestCase(base.BaseTestCase):
def test_parse_link_devices(self):
device = ({'index': 1, 'attrs': [['IFLA_IFNAME', 'int_name']]})
self.mock_get_ip_addresses.return_value = [
{'prefixlen': 24, 'scope': 200, 'attrs': [
{'prefixlen': 24, 'scope': 200, 'event': 'RTM_NEWADDR', 'attrs': [
['IFA_ADDRESS', '192.168.10.20'],
['IFA_FLAGS', ifaddrmsg.IFA_F_PERMANENT]]},
{'prefixlen': 64, 'scope': 200, 'attrs': [
{'prefixlen': 64, 'scope': 200, 'event': 'RTM_DELADDR', 'attrs': [
['IFA_ADDRESS', '2001:db8::1'],
['IFA_FLAGS', ifaddrmsg.IFA_F_PERMANENT]]}]
retval = ip_lib._parse_link_device('namespace', device)
expected = [{'scope': 'site', 'cidr': '192.168.10.20/24',
'dynamic': False, 'dadfailed': False, 'name': 'int_name',
'broadcast': None, 'tentative': False},
'broadcast': None, 'tentative': False, 'event': 'added'},
{'scope': 'site', 'cidr': '2001:db8::1/64',
'dynamic': False, 'dadfailed': False, 'name': 'int_name',
'broadcast': None, 'tentative': False}]
'broadcast': None, 'tentative': False,
'event': 'removed'}]
self.assertEqual(expected, retval)