Do not use threading.Event

Waiting on a threading.Event with eventlet can cause busy looping via
epoll_wait, see related bug for more details.

Change-Id: I007613058a2d21d1712c02fa6d1602b63705c1ab
Related-bug: #1518430
This commit is contained in:
John Eckersberg 2018-04-04 12:55:44 -04:00 committed by Stephen Finucane
parent 859e0d4eaa
commit d873c0d8f5
11 changed files with 40 additions and 31 deletions

View File

@ -48,7 +48,7 @@ oslo.log==3.36.0
oslo.middleware==3.31.0
oslo.serialization==2.18.0
oslo.service==1.24.0
oslo.utils==3.33.0
oslo.utils==3.37.0
oslotest==3.2.0
Paste==2.0.2
PasteDeploy==1.5.0

View File

@ -35,6 +35,7 @@ import threading
import time
import uuid
from oslo_utils import eventletutils
import proton
import pyngus
from six import iteritems
@ -85,7 +86,7 @@ class SubscribeTask(Task):
self._subscriber_id = listener.id
self._in_queue = listener.incoming
self._service = SERVICE_NOTIFY if notifications else SERVICE_RPC
self._wakeup = threading.Event()
self._wakeup = eventletutils.Event()
def wait(self):
self._wakeup.wait()
@ -112,7 +113,7 @@ class SendTask(Task):
self.service = SERVICE_NOTIFY if notification else SERVICE_RPC
self.timer = None
self._retry = None if retry is None or retry < 0 else retry
self._wakeup = threading.Event()
self._wakeup = eventletutils.Event()
self._error = None
self._sender = None

View File

@ -19,6 +19,7 @@ import time
import uuid
import cachetools
from oslo_utils import eventletutils
from oslo_utils import timeutils
from six import moves
@ -49,7 +50,7 @@ class MessageOperationsHandler(object):
self.name = "%s (%s)" % (name, hex(id(self)))
self._tasks = moves.queue.Queue()
self._shutdown = threading.Event()
self._shutdown = eventletutils.Event()
self._shutdown_thread = threading.Thread(
target=self._process_in_background)
self._shutdown_thread.daemon = True
@ -270,8 +271,8 @@ class AMQPListener(base.PollStyleListener):
self.conn = conn
self.msg_id_cache = rpc_amqp._MsgIdCache()
self.incoming = []
self._shutdown = threading.Event()
self._shutoff = threading.Event()
self._shutdown = eventletutils.Event()
self._shutoff = eventletutils.Event()
self._obsolete_reply_queues = ObsoleteReplyQueuesCache()
self._message_operations_handler = MessageOperationsHandler(
"AMQPListener")
@ -434,7 +435,7 @@ class ReplyWaiter(object):
self.conn.declare_direct_consumer(reply_q, self)
self._thread_exit_event = threading.Event()
self._thread_exit_event = eventletutils.Event()
self._thread = threading.Thread(target=self.poll)
self._thread.daemon = True
self._thread.start()

View File

@ -18,6 +18,7 @@ import threading
import time
from oslo_serialization import jsonutils
from oslo_utils import eventletutils
from six import moves
import oslo_messaging
@ -49,7 +50,7 @@ class FakeListener(base.PollStyleListener):
self._exchange_manager = exchange_manager
self._targets = targets
self._pool = pool
self._stopped = threading.Event()
self._stopped = eventletutils.Event()
# NOTE(sileht): Ensure that all needed queues exists even the listener
# have not been polled yet

View File

@ -347,7 +347,7 @@ class KafkaListener(base.PollStyleListener):
def __init__(self, conn):
super(KafkaListener, self).__init__()
self._stopped = threading.Event()
self._stopped = eventletutils.Event()
self.conn = conn
self.incoming_queue = []

View File

@ -24,6 +24,8 @@ import threading
import six
from oslo_utils import eventletutils
from oslo_messaging import _utils as utils
from oslo_messaging import dispatcher
from oslo_messaging import serializer as msg_serializer
@ -249,7 +251,7 @@ class RPCDispatcher(dispatcher.DispatcherBase):
# is executing if it runs for some time. The thread will wait
# for the event to be signaled, which we do explicitly below
# after dispatching the method call.
completion_event = threading.Event()
completion_event = eventletutils.Event()
watchdog_thread = threading.Thread(target=self._watchdog,
args=(completion_event, incoming))
if incoming.client_timeout:

View File

@ -27,6 +27,7 @@ import threading
import time
import uuid
from oslo_utils import eventletutils
from oslo_utils import importutils
from six import moves
from string import Template
@ -75,8 +76,8 @@ class _ListenerThread(threading.Thread):
self._msg_ack = msg_ack
self.messages = moves.queue.Queue()
self.daemon = True
self.started = threading.Event()
self._done = threading.Event()
self.started = eventletutils.Event()
self._done = eventletutils.Event()
self.start()
self.started.wait()
@ -1146,7 +1147,7 @@ class TestLinkRecovery(_AmqpBrokerTestCase):
self._addrs = {'unicast.test-topic': 2,
'broadcast.test-topic.all': 2,
'exclusive.test-topic.server': 2}
self._recovered = threading.Event()
self._recovered = eventletutils.Event()
self._count = 0
def _on_active(link):
@ -2100,7 +2101,7 @@ class FakeBroker(threading.Thread):
self._connections = {}
self._sources = {}
self._pause = threading.Event()
self._pause = eventletutils.Event()
# count of messages forwarded, by messaging pattern
self.direct_count = 0
self.topic_count = 0

View File

@ -23,6 +23,7 @@ import fixtures
import kombu
import kombu.transport.memory
from oslo_serialization import jsonutils
from oslo_utils import eventletutils
import testscenarios
import oslo_messaging
@ -49,7 +50,7 @@ class TestHeartbeat(test_utils.BaseTestCase):
fake_logger, heartbeat_side_effect=None,
info=None):
event = threading.Event()
event = eventletutils.Event()
def heartbeat_check(rate=2):
event.set()

View File

@ -19,6 +19,7 @@ import warnings
import eventlet
import fixtures
from oslo_config import cfg
from oslo_utils import eventletutils
from six.moves import mock
import testscenarios
@ -61,7 +62,7 @@ class ServerSetupMixin(object):
class ServerController(object):
def __init__(self):
self.stopped = threading.Event()
self.stopped = eventletutils.Event()
def stop(self, ctxt):
self.stopped.set()
@ -704,11 +705,11 @@ class TestServerLocking(test_utils.BaseTestCase):
# Test that if 2 threads call a method simultaneously, both will wait,
# but only 1 will call the underlying executor method.
start_event = threading.Event()
finish_event = threading.Event()
start_event = eventletutils.Event()
finish_event = eventletutils.Event()
running_event = threading.Event()
done_event = threading.Event()
running_event = eventletutils.Event()
done_event = eventletutils.Event()
_runner = [None]
@ -734,7 +735,7 @@ class TestServerLocking(test_utils.BaseTestCase):
runner = _runner[0]
waiter = start2 if runner == start1 else start2
waiter_finished = threading.Event()
waiter_finished = eventletutils.Event()
waiter.link(lambda _: waiter_finished.set())
# At this point, runner is running start(), and waiter() is waiting for
@ -783,8 +784,8 @@ class TestServerLocking(test_utils.BaseTestCase):
# Ensure that if 2 threads wait for the completion of 'start', the
# first will wait until complete_event is signalled, but the second
# will continue
complete_event = threading.Event()
complete_waiting_callback = threading.Event()
complete_event = eventletutils.Event()
complete_waiting_callback = eventletutils.Event()
start_state = self.server._states['start']
old_wait_for_completion = start_state.wait_for_completion
@ -801,7 +802,7 @@ class TestServerLocking(test_utils.BaseTestCase):
# thread1 will wait for start to complete until we signal it
thread1 = eventlet.spawn(self.server.stop)
thread1_finished = threading.Event()
thread1_finished = eventletutils.Event()
thread1.link(lambda _: thread1_finished.set())
self.server.start()
@ -847,7 +848,7 @@ class TestServerLocking(test_utils.BaseTestCase):
# Test that we generate a log message if we wait longer than
# DEFAULT_LOG_AFTER
log_event = threading.Event()
log_event = eventletutils.Event()
mock_log.warning.side_effect = lambda _, __: log_event.set()
# Call stop without calling start. We should log a wait after 1 second
@ -863,7 +864,7 @@ class TestServerLocking(test_utils.BaseTestCase):
# Test that we generate a log message if we wait longer than
# the number of seconds passed to log_after
log_event = threading.Event()
log_event = eventletutils.Event()
mock_log.warning.side_effect = lambda _, __: log_event.set()
# Call stop without calling start. We should log a wait after 1 second
@ -879,7 +880,7 @@ class TestServerLocking(test_utils.BaseTestCase):
# Test that we log a message after log_after seconds if we've also
# specified an absolute timeout
log_event = threading.Event()
log_event = eventletutils.Event()
mock_log.warning.side_effect = lambda _, __: log_event.set()
# Call stop without calling start. We should log a wait after 1 second
@ -904,7 +905,7 @@ class TestServerLocking(test_utils.BaseTestCase):
# Start the server, which will also instantiate an executor
self.server.start()
self.server.stop()
shutdown_called = threading.Event()
shutdown_called = eventletutils.Event()
# Patch the executor's stop method to be very slow
def slow_shutdown(wait):

View File

@ -22,6 +22,7 @@
import threading
from oslo_config import cfg
from oslo_utils import eventletutils
from oslotest import base
@ -63,8 +64,8 @@ class ServerThreadHelper(threading.Thread):
super(ServerThreadHelper, self).__init__()
self.daemon = True
self._server = server
self._stop_event = threading.Event()
self._start_event = threading.Event()
self._stop_event = eventletutils.Event()
self._start_event = eventletutils.Event()
def start(self):
super(ServerThreadHelper, self).start()

View File

@ -7,7 +7,7 @@ pbr!=2.1.0,>=2.0.0 # Apache-2.0
futurist>=1.2.0 # Apache-2.0
oslo.config>=5.2.0 # Apache-2.0
oslo.log>=3.36.0 # Apache-2.0
oslo.utils>=3.33.0 # Apache-2.0
oslo.utils>=3.37.0 # Apache-2.0
oslo.serialization!=2.19.1,>=2.18.0 # Apache-2.0
oslo.service!=1.28.1,>=1.24.0 # Apache-2.0
stevedore>=1.20.0 # Apache-2.0