From cca870abbc4f8a784c9cbed817ff307ab3094daa Mon Sep 17 00:00:00 2001
From: Thomas Morin <thomas.morin@orange.com>
Date: Sat, 3 Mar 2018 00:00:47 +0000
Subject: [PATCH] Remove race and simplify conntrack state management

This change:
* avoids creating lots of short-lived threads
  (because: why would we ?)
* adds a try/except block to be sure we can log
  any issue that would otherwise be hidden by
  spawn_n.
* changes class from Queue to LightQueue after
  eventlet.queue code inspection, since it's a
  little smaller and should be faster

Change-Id: Ic348c08af375099a919116188ae17d2017695ecb
Closes-Bug: 1750777
(cherry picked from commit 35c225aaa37ee685000bfa07a02e83481485138d)
---
 neutron/agent/linux/ip_conntrack.py           | 56 ++++++++++---------
 .../agent/linux/test_iptables_firewall.py     | 12 ++--
 2 files changed, 37 insertions(+), 31 deletions(-)

diff --git a/neutron/agent/linux/ip_conntrack.py b/neutron/agent/linux/ip_conntrack.py
index 4a9e70fb9db..799a8820337 100644
--- a/neutron/agent/linux/ip_conntrack.py
+++ b/neutron/agent/linux/ip_conntrack.py
@@ -17,7 +17,6 @@ import eventlet
 import netaddr
 from oslo_concurrency import lockutils
 from oslo_log import log as logging
-from six.moves import queue as Queue
 
 from neutron.agent.linux import utils as linux_utils
 from neutron.common import constants as n_const
@@ -28,6 +27,8 @@ CONTRACK_MGRS = {}
 MAX_CONNTRACK_ZONES = 65535
 ZONE_START = 4097
 
+WORKERS = 8
+
 
 class IpConntrackUpdate(object):
     """Encapsulates a conntrack update
@@ -40,20 +41,10 @@ class IpConntrackUpdate(object):
         self.rule = rule
         self.remote_ips = remote_ips
 
-
-class IpConntrackProcessingQueue(object):
-    """Manager of the queue of conntrack updates to process."""
-    def __init__(self):
-        self._queue = Queue.Queue()
-
-    def add(self, update):
-        self._queue.put(update)
-
-    def updates(self):
-        """Grabs the next conntrack update from the queue and processes."""
-        while not self._queue.empty():
-            update = self._queue.get()
-            yield update
+    def __repr__(self):
+        return ('<IpConntrackUpdate(device_info_list=%s, rule=%s, '
+                'remote_ips=%s>' % (self.device_info_list, self.rule,
+                                    self.remote_ips))
 
 
 @lockutils.synchronized('conntrack')
@@ -82,32 +73,43 @@ class IpConntrackManager(object):
         self.unfiltered_ports = unfiltered_ports
         self.zone_per_port = zone_per_port  # zone per port vs per network
         self._populate_initial_zone_map()
-        self._queue = IpConntrackProcessingQueue()
-        self.start_process_queue()
+        self._queue = eventlet.queue.LightQueue()
+        self._start_process_queue()
 
-    def start_process_queue(self):
-        eventlet.spawn_n(self._process_queue_loop)
+    def _start_process_queue(self):
+        LOG.debug("Starting ip_conntrack _process_queue_worker() threads")
+        pool = eventlet.GreenPool(size=WORKERS)
+        for i in range(WORKERS):
+            pool.spawn_n(self._process_queue_worker)
 
-    def _process_queue_loop(self):
-        LOG.debug("Starting ipconntrack _process_queue_loop()")
-        pool = eventlet.GreenPool(size=8)
+    def _process_queue_worker(self):
+        # While it's technically not necessary to have this method, the
+        # 'while True' could just be in _process_queue(), the tests have
+        # to be able to drain the queue without blocking, so _process_queue()
+        # is made standalone.
         while True:
-            pool.spawn_n(self._process_queue)
+            self._process_queue()
 
     def _process_queue(self):
-        for update in self._queue.updates():
+        update = None
+        try:
+            # this will block until an entry gets added to the queue
+            update = self._queue.get()
             if update.remote_ips:
                 for remote_ip in update.remote_ips:
                     self._delete_conntrack_state(
                         update.device_info_list, update.rule, remote_ip)
             else:
-                self._delete_conntrack_state(update.device_info_list,
-                                             update.rule)
+                self._delete_conntrack_state(
+                    update.device_info_list, update.rule)
+        except Exception:
+            LOG.exception("Failed to process ip_conntrack queue entry: %s",
+                          update)
 
     def _process(self, device_info_list, rule, remote_ips=None):
         # queue the update to allow the caller to resume its work
         update = IpConntrackUpdate(device_info_list, rule, remote_ips)
-        self._queue.add(update)
+        self._queue.put(update)
 
     @staticmethod
     def _generate_conntrack_cmd_by_rule(rule, namespace):
diff --git a/neutron/tests/unit/agent/linux/test_iptables_firewall.py b/neutron/tests/unit/agent/linux/test_iptables_firewall.py
index 37bd31a9d52..765a28f26aa 100644
--- a/neutron/tests/unit/agent/linux/test_iptables_firewall.py
+++ b/neutron/tests/unit/agent/linux/test_iptables_firewall.py
@@ -1249,7 +1249,8 @@ class IptablesFirewallTestCase(BaseIptablesFirewallTestCase):
                 self.assertFalse(self.utils_exec.called)
                 return
             # process conntrack updates in the queue
-            self.firewall.ipconntrack._process_queue()
+            while not self.firewall.ipconntrack._queue.empty():
+                self.firewall.ipconntrack._process_queue()
             cmd = ['conntrack', '-D']
             if protocol:
                 cmd.extend(['-p', protocol])
@@ -1339,7 +1340,8 @@ class IptablesFirewallTestCase(BaseIptablesFirewallTestCase):
                 self.assertFalse(self.utils_exec.called)
                 return
             # process conntrack updates in the queue
-            self.firewall.ipconntrack._process_queue()
+            while not self.firewall.ipconntrack._queue.empty():
+                self.firewall.ipconntrack._process_queue()
             calls = self._get_expected_conntrack_calls(
                 [('ipv4', '10.0.0.1'), ('ipv6', 'fe80::1')], ct_zone)
             self.utils_exec.assert_has_calls(calls)
@@ -1404,7 +1406,8 @@ class IptablesFirewallTestCase(BaseIptablesFirewallTestCase):
                    "ipv6": ['fe80::1', 'fe80::2']}
             calls = []
             # process conntrack updates in the queue
-            self.firewall.ipconntrack._process_queue()
+            while not self.firewall.ipconntrack._queue.empty():
+                self.firewall.ipconntrack._process_queue()
             for direction in ['ingress', 'egress']:
                 direction = '-d' if direction == 'ingress' else '-s'
                 remote_ip_direction = '-s' if direction == '-d' else '-d'
@@ -1650,7 +1653,8 @@ class IptablesFirewallTestCase(BaseIptablesFirewallTestCase):
             self.assertFalse(self.utils_exec.called)
             return
         # process conntrack updates in the queue
-        self.firewall.ipconntrack._process_queue()
+        while not self.firewall.ipconntrack._queue.empty():
+            self.firewall.ipconntrack._process_queue()
         calls = self._get_expected_conntrack_calls(
             [('ipv4', '10.0.0.1'), ('ipv6', 'fe80::1')], ct_zone)
         self.utils_exec.assert_has_calls(calls)