diff --git a/neutron/agent/linux/ip_lib.py b/neutron/agent/linux/ip_lib.py index 81d0acca866..59472657f59 100644 --- a/neutron/agent/linux/ip_lib.py +++ b/neutron/agent/linux/ip_lib.py @@ -15,6 +15,7 @@ import errno import re +import threading import time import eventlet @@ -1412,7 +1413,7 @@ def ip_monitor(namespace, queue, event_stop, event_started): return raise - def read_ip_updates(_queue): + def read_ip_updates(_ip, _queue): """Read Pyroute2.IPRoute input socket The aim of this function is to open and bind an IPRoute socket only for @@ -1420,13 +1421,14 @@ def ip_monitor(namespace, queue, event_stop, event_started): opened socket. This function is executed in a separate thread, dedicated only to this task. """ - with privileged.get_iproute(namespace) as ip: - ip.bind() + _ip.bind(async_cache=True) + try: while True: - eventlet.sleep(0) - ip_addresses = ip.get() + ip_addresses = _ip.get() for ip_address in ip_addresses: _queue.put(ip_address) + except EOFError: + pass _queue = eventlet.Queue() try: @@ -1435,13 +1437,14 @@ def ip_monitor(namespace, queue, event_stop, event_started): for device in ip.get_links(): cache_devices[device['index']] = get_attr(device, 'IFLA_IFNAME') - pool = eventlet.GreenPool(1) - ip_updates_thread = pool.spawn(read_ip_updates, _queue) - event_started.send() - while not event_stop.ready(): - eventlet.sleep(0) + _ip = privileged.get_iproute(namespace) + ip_updates_thread = threading.Thread(target=read_ip_updates, + args=(_ip, _queue)) + ip_updates_thread.start() + event_started.set() + while not event_stop.is_set(): try: - ip_address = _queue.get(timeout=2) + ip_address = _queue.get(timeout=1) except eventlet.queue.Empty: continue if 'index' in ip_address and 'prefixlen' in ip_address: @@ -1454,7 +1457,8 @@ def ip_monitor(namespace, queue, event_stop, event_started): cache_devices[index] = name queue.put(_parse_ip_address(ip_address, name)) - ip_updates_thread.kill() + _ip.close() + ip_updates_thread.join(timeout=5) except OSError as e: if e.errno == errno.ENOENT: diff --git a/neutron/tests/functional/agent/linux/bin/ip_monitor.py b/neutron/tests/functional/agent/linux/bin/ip_monitor.py index b86b58a5e73..d27d053ff99 100755 --- a/neutron/tests/functional/agent/linux/bin/ip_monitor.py +++ b/neutron/tests/functional/agent/linux/bin/ip_monitor.py @@ -17,24 +17,27 @@ import signal import sys +import threading -import eventlet -from eventlet import queue from oslo_serialization import jsonutils +from six.moves import queue from neutron.agent.linux import ip_lib -EVENT_STOP = eventlet.Event() -EVENT_STARTED = eventlet.Event() -POOL = eventlet.GreenPool(2) +EVENT_STOP = threading.Event() +EVENT_STARTED = threading.Event() +IP_MONITOR = None +READ_QUEUE = None def sigterm_handler(_signo, _stack_frame): global EVENT_STOP - global POOL - EVENT_STOP.send() - POOL.waitall() + global IP_MONITOR + global READ_QUEUE + EVENT_STOP.set() + IP_MONITOR.join() + READ_QUEUE.join() exit(0) @@ -45,11 +48,10 @@ def read_queue(temp_file, _queue, event_stop, event_started): event_started.wait() with open(temp_file, 'w') as f: f.write('') - while not event_stop.ready(): - eventlet.sleep(0) + while not event_stop.is_set(): try: - retval = _queue.get(timeout=2) - except eventlet.queue.Empty: + retval = _queue.get(timeout=1) + except queue.Empty: retval = None if retval: with open(temp_file, 'a+') as f: @@ -57,12 +59,19 @@ def read_queue(temp_file, _queue, event_stop, event_started): def main(temp_file, namespace): - global POOL + global IP_MONITOR + global READ_QUEUE namespace = None if namespace == 'None' else namespace _queue = queue.Queue() - POOL.spawn(ip_lib.ip_monitor, namespace, _queue, EVENT_STOP, EVENT_STARTED) - POOL.spawn(read_queue, temp_file, _queue, EVENT_STOP, EVENT_STARTED) - POOL.waitall() + IP_MONITOR = threading.Thread( + target=ip_lib.ip_monitor, + args=(namespace, _queue, EVENT_STOP, EVENT_STARTED)) + IP_MONITOR.start() + READ_QUEUE = threading.Thread( + target=read_queue, + args=(temp_file, _queue, EVENT_STOP, EVENT_STARTED)) + READ_QUEUE.start() + READ_QUEUE.join() if __name__ == "__main__": diff --git a/neutron/tests/functional/agent/linux/test_ip_lib.py b/neutron/tests/functional/agent/linux/test_ip_lib.py index 2bf15ec4fa3..c24271dd7b4 100644 --- a/neutron/tests/functional/agent/linux/test_ip_lib.py +++ b/neutron/tests/functional/agent/linux/test_ip_lib.py @@ -694,7 +694,7 @@ class IpMonitorTestCase(testscenarios.WithScenarios, self.proc = self._run_ip_monitor(ip_monitor) def _cleanup(self): - self.proc.stop(block=True, kill_signal=signal.SIGTERM) + self.proc.stop(kill_timeout=10, kill_signal=signal.SIGTERM) if self.namespace: priv_ip_lib.remove_netns(self.namespace) else: @@ -814,7 +814,7 @@ class IpMonitorTestCase(testscenarios.WithScenarios, utils.wait_until_true(lambda: self._read_file({}), timeout=30) self.ip_wrapper.add_dummy(self.devices[0]) ip_addresses = [] - for i in range(250): + for i in range(100): _cidr = str(netaddr.IPNetwork('192.168.252.1/32').ip + i) + '/32' ip_addresses.append({'cidr': _cidr, 'event': 'added', 'name': self.devices[0]}) @@ -822,7 +822,7 @@ class IpMonitorTestCase(testscenarios.WithScenarios, self._handle_ip_addresses('added', ip_addresses) self._check_read_file(ip_addresses) - for i in range(250): + for i in range(100): _cidr = str(netaddr.IPNetwork('192.168.252.1/32').ip + i) + '/32' ip_addresses.append({'cidr': _cidr, 'event': 'removed', 'name': self.devices[0]})