Read IP monitor changes in a parallel thread

In order to capture all IP address changes, the method reading the
netlink socket will be executed in a parallel thread. Once the
"ip_monitor" method is stopped, this blocking thread will be killed.

A new functional test, "test_add_multiple_ips", is added in order to
stress test this method.

Change-Id: I8f1de4a31f97bab734a33f94c3069444defd870f
Closes-Bug: #1832307
This commit is contained in:
Rodolfo Alonso Hernandez 2019-06-12 10:39:55 +00:00
parent 76754e06f5
commit a20f4c08c4
2 changed files with 70 additions and 27 deletions

View File

@ -1474,43 +1474,64 @@ def ip_monitor(namespace, queue, event_stop, event_started):
cannot use privsep because is a blocking function and can exhaust the cannot use privsep because is a blocking function and can exhaust the
number of working threads. number of working threads.
""" """
def get_device_name(ip, index): def get_device_name(index):
try: try:
device = ip.link('get', index=index) with privileged.get_iproute(namespace) as ip:
if device: device = ip.link('get', index=index)
attrs = device[0].get('attrs', []) if device:
for attr in (attr for attr in attrs attrs = device[0].get('attrs', [])
if attr[0] == 'IFLA_IFNAME'): for attr in (attr for attr in attrs
return attr[1] if attr[0] == 'IFLA_IFNAME'):
return attr[1]
except netlink_exceptions.NetlinkError as e: except netlink_exceptions.NetlinkError as e:
if e.code == errno.ENODEV: if e.code == errno.ENODEV:
return return
raise raise
try: def read_ip_updates(_queue):
"""Read Pyroute2.IPRoute input socket
The aim of this function is to open and bind an IPRoute socket only for
reading the netlink changes; no other operations are done with this
opened socket. This function is executed in a separate thread,
dedicated only to this task.
"""
with privileged.get_iproute(namespace) as ip: with privileged.get_iproute(namespace) as ip:
ip.bind() ip.bind()
cache_devices = {} while True:
eventlet.sleep(0)
ip_addresses = ip.get()
for ip_address in ip_addresses:
_queue.put(ip_address)
_queue = eventlet.Queue()
try:
cache_devices = {}
with privileged.get_iproute(namespace) as ip:
for device in ip.get_links(): for device in ip.get_links():
cache_devices[device['index']] = get_attr(device, cache_devices[device['index']] = get_attr(device,
'IFLA_IFNAME') 'IFLA_IFNAME')
event_started.send() pool = eventlet.GreenPool(1)
while not event_stop.ready(): ip_updates_thread = pool.spawn(read_ip_updates, _queue)
eventlet.sleep(0) event_started.send()
ip_address = [] while not event_stop.ready():
with common_utils.Timer(timeout=2, raise_exception=False): eventlet.sleep(0)
ip_address = ip.get() try:
if not ip_address: ip_address = _queue.get(timeout=2)
except eventlet.queue.Empty:
continue
if 'index' in ip_address and 'prefixlen' in ip_address:
index = ip_address['index']
name = (get_device_name(index) or
cache_devices.get(index))
if not name:
continue continue
if 'index' in ip_address[0] and 'prefixlen' in ip_address[0]:
index = ip_address[0]['index']
name = (get_device_name(ip, index) or
cache_devices.get(index))
if not name:
continue
cache_devices[index] = name cache_devices[index] = name
queue.put(_parse_ip_address(ip_address[0], name)) queue.put(_parse_ip_address(ip_address, name))
ip_updates_thread.kill()
except OSError as e: except OSError as e:
if e.errno == errno.ENOENT: if e.errno == errno.ENOENT:
raise privileged.NetworkNamespaceNotFound(netns_name=namespace) raise privileged.NetworkNamespaceNotFound(netns_name=namespace)

View File

@ -788,17 +788,39 @@ class IpMonitorTestCase(testscenarios.WithScenarios,
self.ip_wrapper.add_dummy(device) self.ip_wrapper.add_dummy(device)
utils.wait_until_true(lambda: self._read_file({}), timeout=30) utils.wait_until_true(lambda: self._read_file({}), timeout=30)
ip_addresses = [ ip_addresses = [
{'cidr': '192.168.250.21/24', 'event': 'added', {'cidr': '192.168.251.21/24', 'event': 'added',
'name': self.devices[0]}, 'name': self.devices[0]},
{'cidr': '192.168.250.22/24', 'event': 'added', {'cidr': '192.168.251.22/24', 'event': 'added',
'name': self.devices[1]}] 'name': self.devices[1]}]
self._handle_ip_addresses('added', ip_addresses) self._handle_ip_addresses('added', ip_addresses)
self._check_read_file(ip_addresses) self._check_read_file(ip_addresses)
self.ip_wrapper.add_dummy(self.devices[-1]) self.ip_wrapper.add_dummy(self.devices[-1])
ip_addresses.append({'cidr': '192.168.250.23/24', 'event': 'added', ip_addresses.append({'cidr': '192.168.251.23/24', 'event': 'added',
'name': self.devices[-1]}) 'name': self.devices[-1]})
self._handle_ip_addresses('added', [ip_addresses[-1]]) self._handle_ip_addresses('added', [ip_addresses[-1]])
self._check_read_file(ip_addresses) self._check_read_file(ip_addresses)
def test_add_and_remove_multiple_ips(self):
# NOTE(ralonsoh): testing [1], adding multiple IPs.
# [1] https://bugs.launchpad.net/neutron/+bug/1832307
utils.wait_until_true(lambda: self._read_file({}), timeout=30)
self.ip_wrapper.add_dummy(self.devices[0])
ip_addresses = []
for i in range(250):
_cidr = str(netaddr.IPNetwork('192.168.252.1/32').ip + i) + '/32'
ip_addresses.append({'cidr': _cidr, 'event': 'added',
'name': self.devices[0]})
self._handle_ip_addresses('added', ip_addresses)
self._check_read_file(ip_addresses)
for i in range(250):
_cidr = str(netaddr.IPNetwork('192.168.252.1/32').ip + i) + '/32'
ip_addresses.append({'cidr': _cidr, 'event': 'removed',
'name': self.devices[0]})
self._handle_ip_addresses('removed', ip_addresses)
self._check_read_file(ip_addresses)