From 8657983c6d7f5f40c980ef7ff9c95f57c259f97b Mon Sep 17 00:00:00 2001 From: Rodolfo Alonso Hernandez Date: Tue, 22 Oct 2019 17:51:18 +0000 Subject: [PATCH] Use threads insted of greethreads in IP monitor IP monitor is a method that is going to be executed in a separate process, to monitor the IP addresses changes in a namespace. This method spawns a thread to read from a socket opened by Pyroute2. The read function is a blocking method that will end only when the socket is closed. To avoid thread starvation that can happen using greenthreads, IP monitor will use kernel threads. This will increase the resources used but will ensure that no message is lost when reading the monitor socket. Reduced the number of IPs generated in "test_add_and_remove_multiple_ips" to shrink the testing time used. Change-Id: I3fbba2854d40ab0f683443aa30c2a95752345d2e Closes-Bug: #1849547 (cherry picked from commit 48730d9449c0e425637f0f1182d81f60a3d259ac) --- neutron/agent/linux/ip_lib.py | 28 +++++++------ .../functional/agent/linux/bin/ip_monitor.py | 41 +++++++++++-------- .../functional/agent/linux/test_ip_lib.py | 6 +-- 3 files changed, 44 insertions(+), 31 deletions(-) diff --git a/neutron/agent/linux/ip_lib.py b/neutron/agent/linux/ip_lib.py index e1747be00dc..2315a598c95 100644 --- a/neutron/agent/linux/ip_lib.py +++ b/neutron/agent/linux/ip_lib.py @@ -15,6 +15,7 @@ import errno import re +import threading import time import eventlet @@ -1412,7 +1413,7 @@ def ip_monitor(namespace, queue, event_stop, event_started): return raise - def read_ip_updates(_queue): + def read_ip_updates(_ip, _queue): """Read Pyroute2.IPRoute input socket The aim of this function is to open and bind an IPRoute socket only for @@ -1420,13 +1421,14 @@ def ip_monitor(namespace, queue, event_stop, event_started): opened socket. This function is executed in a separate thread, dedicated only to this task. """ - with privileged.get_iproute(namespace) as ip: - ip.bind() + _ip.bind(async_cache=True) + try: while True: - eventlet.sleep(0) - ip_addresses = ip.get() + ip_addresses = _ip.get() for ip_address in ip_addresses: _queue.put(ip_address) + except EOFError: + pass _queue = eventlet.Queue() try: @@ -1435,13 +1437,14 @@ def ip_monitor(namespace, queue, event_stop, event_started): for device in ip.get_links(): cache_devices[device['index']] = get_attr(device, 'IFLA_IFNAME') - pool = eventlet.GreenPool(1) - ip_updates_thread = pool.spawn(read_ip_updates, _queue) - event_started.send() - while not event_stop.ready(): - eventlet.sleep(0) + _ip = privileged.get_iproute(namespace) + ip_updates_thread = threading.Thread(target=read_ip_updates, + args=(_ip, _queue)) + ip_updates_thread.start() + event_started.set() + while not event_stop.is_set(): try: - ip_address = _queue.get(timeout=2) + ip_address = _queue.get(timeout=1) except eventlet.queue.Empty: continue if 'index' in ip_address and 'prefixlen' in ip_address: @@ -1454,7 +1457,8 @@ def ip_monitor(namespace, queue, event_stop, event_started): cache_devices[index] = name queue.put(_parse_ip_address(ip_address, name)) - ip_updates_thread.kill() + _ip.close() + ip_updates_thread.join(timeout=5) except OSError as e: if e.errno == errno.ENOENT: diff --git a/neutron/tests/functional/agent/linux/bin/ip_monitor.py b/neutron/tests/functional/agent/linux/bin/ip_monitor.py index b86b58a5e73..d27d053ff99 100755 --- a/neutron/tests/functional/agent/linux/bin/ip_monitor.py +++ b/neutron/tests/functional/agent/linux/bin/ip_monitor.py @@ -17,24 +17,27 @@ import signal import sys +import threading -import eventlet -from eventlet import queue from oslo_serialization import jsonutils +from six.moves import queue from neutron.agent.linux import ip_lib -EVENT_STOP = eventlet.Event() -EVENT_STARTED = eventlet.Event() -POOL = eventlet.GreenPool(2) +EVENT_STOP = threading.Event() +EVENT_STARTED = threading.Event() +IP_MONITOR = None +READ_QUEUE = None def sigterm_handler(_signo, _stack_frame): global EVENT_STOP - global POOL - EVENT_STOP.send() - POOL.waitall() + global IP_MONITOR + global READ_QUEUE + EVENT_STOP.set() + IP_MONITOR.join() + READ_QUEUE.join() exit(0) @@ -45,11 +48,10 @@ def read_queue(temp_file, _queue, event_stop, event_started): event_started.wait() with open(temp_file, 'w') as f: f.write('') - while not event_stop.ready(): - eventlet.sleep(0) + while not event_stop.is_set(): try: - retval = _queue.get(timeout=2) - except eventlet.queue.Empty: + retval = _queue.get(timeout=1) + except queue.Empty: retval = None if retval: with open(temp_file, 'a+') as f: @@ -57,12 +59,19 @@ def read_queue(temp_file, _queue, event_stop, event_started): def main(temp_file, namespace): - global POOL + global IP_MONITOR + global READ_QUEUE namespace = None if namespace == 'None' else namespace _queue = queue.Queue() - POOL.spawn(ip_lib.ip_monitor, namespace, _queue, EVENT_STOP, EVENT_STARTED) - POOL.spawn(read_queue, temp_file, _queue, EVENT_STOP, EVENT_STARTED) - POOL.waitall() + IP_MONITOR = threading.Thread( + target=ip_lib.ip_monitor, + args=(namespace, _queue, EVENT_STOP, EVENT_STARTED)) + IP_MONITOR.start() + READ_QUEUE = threading.Thread( + target=read_queue, + args=(temp_file, _queue, EVENT_STOP, EVENT_STARTED)) + READ_QUEUE.start() + READ_QUEUE.join() if __name__ == "__main__": diff --git a/neutron/tests/functional/agent/linux/test_ip_lib.py b/neutron/tests/functional/agent/linux/test_ip_lib.py index 14df5f0b3de..7c81f9eac6c 100644 --- a/neutron/tests/functional/agent/linux/test_ip_lib.py +++ b/neutron/tests/functional/agent/linux/test_ip_lib.py @@ -694,7 +694,7 @@ class IpMonitorTestCase(testscenarios.WithScenarios, self.proc = self._run_ip_monitor(ip_monitor) def _cleanup(self): - self.proc.stop(block=True, kill_signal=signal.SIGTERM) + self.proc.stop(kill_timeout=10, kill_signal=signal.SIGTERM) if self.namespace: priv_ip_lib.remove_netns(self.namespace) else: @@ -814,7 +814,7 @@ class IpMonitorTestCase(testscenarios.WithScenarios, utils.wait_until_true(lambda: self._read_file({}), timeout=30) self.ip_wrapper.add_dummy(self.devices[0]) ip_addresses = [] - for i in range(250): + for i in range(100): _cidr = str(netaddr.IPNetwork('192.168.252.1/32').ip + i) + '/32' ip_addresses.append({'cidr': _cidr, 'event': 'added', 'name': self.devices[0]}) @@ -822,7 +822,7 @@ class IpMonitorTestCase(testscenarios.WithScenarios, self._handle_ip_addresses('added', ip_addresses) self._check_read_file(ip_addresses) - for i in range(250): + for i in range(100): _cidr = str(netaddr.IPNetwork('192.168.252.1/32').ip + i) + '/32' ip_addresses.append({'cidr': _cidr, 'event': 'removed', 'name': self.devices[0]})