Merge "Revert "Do not start conntrack worker thread from __init__""
This commit is contained in:
commit
78d14d3b46
@ -17,6 +17,7 @@ import eventlet
|
|||||||
import netaddr
|
import netaddr
|
||||||
from oslo_concurrency import lockutils
|
from oslo_concurrency import lockutils
|
||||||
from oslo_log import log as logging
|
from oslo_log import log as logging
|
||||||
|
from six.moves import queue as Queue
|
||||||
|
|
||||||
from neutron.agent.linux import utils as linux_utils
|
from neutron.agent.linux import utils as linux_utils
|
||||||
from neutron.common import constants as n_const
|
from neutron.common import constants as n_const
|
||||||
@ -43,7 +44,7 @@ class IpConntrackUpdate(object):
|
|||||||
class IpConntrackProcessingQueue(object):
|
class IpConntrackProcessingQueue(object):
|
||||||
"""Manager of the queue of conntrack updates to process."""
|
"""Manager of the queue of conntrack updates to process."""
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self._queue = eventlet.queue.LightQueue()
|
self._queue = Queue.Queue()
|
||||||
|
|
||||||
def add(self, update):
|
def add(self, update):
|
||||||
self._queue.put(update)
|
self._queue.put(update)
|
||||||
@ -51,15 +52,8 @@ class IpConntrackProcessingQueue(object):
|
|||||||
def updates(self):
|
def updates(self):
|
||||||
"""Grabs the next conntrack update from the queue and processes."""
|
"""Grabs the next conntrack update from the queue and processes."""
|
||||||
while not self._queue.empty():
|
while not self._queue.empty():
|
||||||
try:
|
update = self._queue.get()
|
||||||
update = self._queue.get()
|
yield update
|
||||||
yield update
|
|
||||||
except IndexError:
|
|
||||||
# queue was empty, another worker stole our entry
|
|
||||||
continue
|
|
||||||
except Exception as e:
|
|
||||||
LOG.error("Failed to yield ip_conntrack process queue "
|
|
||||||
"entry: %s", e)
|
|
||||||
|
|
||||||
|
|
||||||
@lockutils.synchronized('conntrack')
|
@lockutils.synchronized('conntrack')
|
||||||
@ -89,11 +83,9 @@ class IpConntrackManager(object):
|
|||||||
self.zone_per_port = zone_per_port # zone per port vs per network
|
self.zone_per_port = zone_per_port # zone per port vs per network
|
||||||
self._populate_initial_zone_map()
|
self._populate_initial_zone_map()
|
||||||
self._queue = IpConntrackProcessingQueue()
|
self._queue = IpConntrackProcessingQueue()
|
||||||
# Don't start the queue processing thread here, do later when
|
self.start_process_queue()
|
||||||
# the first entry is added to the queue.
|
|
||||||
self._process_queue_started = False
|
|
||||||
|
|
||||||
def _start_process_queue(self):
|
def start_process_queue(self):
|
||||||
eventlet.spawn_n(self._process_queue_loop)
|
eventlet.spawn_n(self._process_queue_loop)
|
||||||
|
|
||||||
def _process_queue_loop(self):
|
def _process_queue_loop(self):
|
||||||
@ -113,9 +105,6 @@ class IpConntrackManager(object):
|
|||||||
update.rule)
|
update.rule)
|
||||||
|
|
||||||
def _process(self, device_info_list, rule, remote_ips=None):
|
def _process(self, device_info_list, rule, remote_ips=None):
|
||||||
if not self._process_queue_started:
|
|
||||||
self._process_queue_started = True
|
|
||||||
self._start_process_queue()
|
|
||||||
# queue the update to allow the caller to resume its work
|
# queue the update to allow the caller to resume its work
|
||||||
update = IpConntrackUpdate(device_info_list, rule, remote_ips)
|
update = IpConntrackUpdate(device_info_list, rule, remote_ips)
|
||||||
self._queue.add(update)
|
self._queue.add(update)
|
||||||
|
Loading…
Reference in New Issue
Block a user