Use threads insted of greethreads in IP monitor

IP monitor is a method that is going to be executed in a separate
process, to monitor the IP addresses changes in a namespace.

This method spawns a thread to read from a socket opened by Pyroute2.
The read function is a blocking method that will end only when the
socket is closed. To avoid thread starvation that can happen using
greenthreads, IP monitor will use kernel threads.

This will increase the resources used but will ensure that no message
is lost when reading the monitor socket.

Reduced the number of IPs generated in "test_add_and_remove_multiple_ips"
to shrink the testing time used.

Change-Id: I3fbba2854d40ab0f683443aa30c2a95752345d2e
Closes-Bug: #1849547
This commit is contained in:
Rodolfo Alonso Hernandez 2019-10-22 17:51:18 +00:00
parent 471dc98707
commit 48730d9449
3 changed files with 44 additions and 31 deletions

View File

@ -15,6 +15,7 @@
import errno
import re
import threading
import time
import eventlet
@ -1412,7 +1413,7 @@ def ip_monitor(namespace, queue, event_stop, event_started):
return
raise
def read_ip_updates(_queue):
def read_ip_updates(_ip, _queue):
"""Read Pyroute2.IPRoute input socket
The aim of this function is to open and bind an IPRoute socket only for
@ -1420,13 +1421,14 @@ def ip_monitor(namespace, queue, event_stop, event_started):
opened socket. This function is executed in a separate thread,
dedicated only to this task.
"""
with privileged.get_iproute(namespace) as ip:
ip.bind()
_ip.bind(async_cache=True)
try:
while True:
eventlet.sleep(0)
ip_addresses = ip.get()
ip_addresses = _ip.get()
for ip_address in ip_addresses:
_queue.put(ip_address)
except EOFError:
pass
_queue = eventlet.Queue()
try:
@ -1435,13 +1437,14 @@ def ip_monitor(namespace, queue, event_stop, event_started):
for device in ip.get_links():
cache_devices[device['index']] = get_attr(device,
'IFLA_IFNAME')
pool = eventlet.GreenPool(1)
ip_updates_thread = pool.spawn(read_ip_updates, _queue)
event_started.send()
while not event_stop.ready():
eventlet.sleep(0)
_ip = privileged.get_iproute(namespace)
ip_updates_thread = threading.Thread(target=read_ip_updates,
args=(_ip, _queue))
ip_updates_thread.start()
event_started.set()
while not event_stop.is_set():
try:
ip_address = _queue.get(timeout=2)
ip_address = _queue.get(timeout=1)
except eventlet.queue.Empty:
continue
if 'index' in ip_address and 'prefixlen' in ip_address:
@ -1454,7 +1457,8 @@ def ip_monitor(namespace, queue, event_stop, event_started):
cache_devices[index] = name
queue.put(_parse_ip_address(ip_address, name))
ip_updates_thread.kill()
_ip.close()
ip_updates_thread.join(timeout=5)
except OSError as e:
if e.errno == errno.ENOENT:

View File

@ -17,24 +17,27 @@
import signal
import sys
import threading
import eventlet
from eventlet import queue
from oslo_serialization import jsonutils
from six.moves import queue
from neutron.agent.linux import ip_lib
EVENT_STOP = eventlet.Event()
EVENT_STARTED = eventlet.Event()
POOL = eventlet.GreenPool(2)
EVENT_STOP = threading.Event()
EVENT_STARTED = threading.Event()
IP_MONITOR = None
READ_QUEUE = None
def sigterm_handler(_signo, _stack_frame):
global EVENT_STOP
global POOL
EVENT_STOP.send()
POOL.waitall()
global IP_MONITOR
global READ_QUEUE
EVENT_STOP.set()
IP_MONITOR.join()
READ_QUEUE.join()
exit(0)
@ -45,11 +48,10 @@ def read_queue(temp_file, _queue, event_stop, event_started):
event_started.wait()
with open(temp_file, 'w') as f:
f.write('')
while not event_stop.ready():
eventlet.sleep(0)
while not event_stop.is_set():
try:
retval = _queue.get(timeout=2)
except eventlet.queue.Empty:
retval = _queue.get(timeout=1)
except queue.Empty:
retval = None
if retval:
with open(temp_file, 'a+') as f:
@ -57,12 +59,19 @@ def read_queue(temp_file, _queue, event_stop, event_started):
def main(temp_file, namespace):
global POOL
global IP_MONITOR
global READ_QUEUE
namespace = None if namespace == 'None' else namespace
_queue = queue.Queue()
POOL.spawn(ip_lib.ip_monitor, namespace, _queue, EVENT_STOP, EVENT_STARTED)
POOL.spawn(read_queue, temp_file, _queue, EVENT_STOP, EVENT_STARTED)
POOL.waitall()
IP_MONITOR = threading.Thread(
target=ip_lib.ip_monitor,
args=(namespace, _queue, EVENT_STOP, EVENT_STARTED))
IP_MONITOR.start()
READ_QUEUE = threading.Thread(
target=read_queue,
args=(temp_file, _queue, EVENT_STOP, EVENT_STARTED))
READ_QUEUE.start()
READ_QUEUE.join()
if __name__ == "__main__":

View File

@ -694,7 +694,7 @@ class IpMonitorTestCase(testscenarios.WithScenarios,
self.proc = self._run_ip_monitor(ip_monitor)
def _cleanup(self):
self.proc.stop(block=True, kill_signal=signal.SIGTERM)
self.proc.stop(kill_timeout=10, kill_signal=signal.SIGTERM)
if self.namespace:
priv_ip_lib.remove_netns(self.namespace)
else:
@ -814,7 +814,7 @@ class IpMonitorTestCase(testscenarios.WithScenarios,
utils.wait_until_true(lambda: self._read_file({}), timeout=30)
self.ip_wrapper.add_dummy(self.devices[0])
ip_addresses = []
for i in range(250):
for i in range(100):
_cidr = str(netaddr.IPNetwork('192.168.252.1/32').ip + i) + '/32'
ip_addresses.append({'cidr': _cidr, 'event': 'added',
'name': self.devices[0]})
@ -822,7 +822,7 @@ class IpMonitorTestCase(testscenarios.WithScenarios,
self._handle_ip_addresses('added', ip_addresses)
self._check_read_file(ip_addresses)
for i in range(250):
for i in range(100):
_cidr = str(netaddr.IPNetwork('192.168.252.1/32').ip + i) + '/32'
ip_addresses.append({'cidr': _cidr, 'event': 'removed',
'name': self.devices[0]})