From 4c8b97eca32c9c2beadf95fef14ed5b7d8981c5a Mon Sep 17 00:00:00 2001 From: Brian Haley Date: Thu, 1 Mar 2018 15:42:59 +0000 Subject: [PATCH] Do not start conntrack worker thread from __init__ Instead, start it when the first entry is being added to the queue. Also, log any exceptions just in case get() throws something so we can do further debugging. Changed class from Queue to LightQueue was done after going through the eventlet.queue code looking at usage, since it's a little smaller and should be faster. Change-Id: Ie84be88382f327ebe312bf17ec2dc5c80a8de35f Closes-bug: 1750777 --- neutron/agent/linux/ip_conntrack.py | 23 +++++++++++++++++------ 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/neutron/agent/linux/ip_conntrack.py b/neutron/agent/linux/ip_conntrack.py index 4a9e70fb9db..648ab332ad8 100644 --- a/neutron/agent/linux/ip_conntrack.py +++ b/neutron/agent/linux/ip_conntrack.py @@ -17,7 +17,6 @@ import eventlet import netaddr from oslo_concurrency import lockutils from oslo_log import log as logging -from six.moves import queue as Queue from neutron.agent.linux import utils as linux_utils from neutron.common import constants as n_const @@ -44,7 +43,7 @@ class IpConntrackUpdate(object): class IpConntrackProcessingQueue(object): """Manager of the queue of conntrack updates to process.""" def __init__(self): - self._queue = Queue.Queue() + self._queue = eventlet.queue.LightQueue() def add(self, update): self._queue.put(update) @@ -52,8 +51,15 @@ class IpConntrackProcessingQueue(object): def updates(self): """Grabs the next conntrack update from the queue and processes.""" while not self._queue.empty(): - update = self._queue.get() - yield update + try: + update = self._queue.get() + yield update + except IndexError: + # queue was empty, another worker stole our entry + continue + except Exception as e: + LOG.error("Failed to yield ip_conntrack process queue " + "entry: %s", e) @lockutils.synchronized('conntrack') @@ -83,9 +89,11 @@ class IpConntrackManager(object): self.zone_per_port = zone_per_port # zone per port vs per network self._populate_initial_zone_map() self._queue = IpConntrackProcessingQueue() - self.start_process_queue() + # Don't start the queue processing thread here, do later when + # the first entry is added to the queue. + self._process_queue_started = False - def start_process_queue(self): + def _start_process_queue(self): eventlet.spawn_n(self._process_queue_loop) def _process_queue_loop(self): @@ -105,6 +113,9 @@ class IpConntrackManager(object): update.rule) def _process(self, device_info_list, rule, remote_ips=None): + if not self._process_queue_started: + self._process_queue_started = True + self._start_process_queue() # queue the update to allow the caller to resume its work update = IpConntrackUpdate(device_info_list, rule, remote_ips) self._queue.add(update)