From d873c0d8f5722d5e2a08b5fc49d58b63dbe061a0 Mon Sep 17 00:00:00 2001 From: John Eckersberg Date: Wed, 4 Apr 2018 12:55:44 -0400 Subject: [PATCH] Do not use threading.Event Waiting on a threading.Event with eventlet can cause busy looping via epoll_wait, see related bug for more details. Change-Id: I007613058a2d21d1712c02fa6d1602b63705c1ab Related-bug: #1518430 --- lower-constraints.txt | 2 +- .../_drivers/amqp1_driver/controller.py | 5 ++-- oslo_messaging/_drivers/amqpdriver.py | 9 ++++--- oslo_messaging/_drivers/impl_fake.py | 3 ++- oslo_messaging/_drivers/impl_kafka.py | 2 +- oslo_messaging/rpc/dispatcher.py | 4 ++- .../tests/drivers/test_amqp_driver.py | 9 ++++--- .../tests/drivers/test_impl_rabbit.py | 3 ++- oslo_messaging/tests/rpc/test_server.py | 27 ++++++++++--------- oslo_messaging/tests/utils.py | 5 ++-- requirements.txt | 2 +- 11 files changed, 40 insertions(+), 31 deletions(-) diff --git a/lower-constraints.txt b/lower-constraints.txt index ff23649dc..abf4267a0 100644 --- a/lower-constraints.txt +++ b/lower-constraints.txt @@ -48,7 +48,7 @@ oslo.log==3.36.0 oslo.middleware==3.31.0 oslo.serialization==2.18.0 oslo.service==1.24.0 -oslo.utils==3.33.0 +oslo.utils==3.37.0 oslotest==3.2.0 Paste==2.0.2 PasteDeploy==1.5.0 diff --git a/oslo_messaging/_drivers/amqp1_driver/controller.py b/oslo_messaging/_drivers/amqp1_driver/controller.py index 6803c9194..24dfc99ba 100644 --- a/oslo_messaging/_drivers/amqp1_driver/controller.py +++ b/oslo_messaging/_drivers/amqp1_driver/controller.py @@ -35,6 +35,7 @@ import threading import time import uuid +from oslo_utils import eventletutils import proton import pyngus from six import iteritems @@ -85,7 +86,7 @@ class SubscribeTask(Task): self._subscriber_id = listener.id self._in_queue = listener.incoming self._service = SERVICE_NOTIFY if notifications else SERVICE_RPC - self._wakeup = threading.Event() + self._wakeup = eventletutils.Event() def wait(self): self._wakeup.wait() @@ -112,7 +113,7 @@ class SendTask(Task): self.service = SERVICE_NOTIFY if notification else SERVICE_RPC self.timer = None self._retry = None if retry is None or retry < 0 else retry - self._wakeup = threading.Event() + self._wakeup = eventletutils.Event() self._error = None self._sender = None diff --git a/oslo_messaging/_drivers/amqpdriver.py b/oslo_messaging/_drivers/amqpdriver.py index ccab0ce07..b4d0a234e 100644 --- a/oslo_messaging/_drivers/amqpdriver.py +++ b/oslo_messaging/_drivers/amqpdriver.py @@ -19,6 +19,7 @@ import time import uuid import cachetools +from oslo_utils import eventletutils from oslo_utils import timeutils from six import moves @@ -49,7 +50,7 @@ class MessageOperationsHandler(object): self.name = "%s (%s)" % (name, hex(id(self))) self._tasks = moves.queue.Queue() - self._shutdown = threading.Event() + self._shutdown = eventletutils.Event() self._shutdown_thread = threading.Thread( target=self._process_in_background) self._shutdown_thread.daemon = True @@ -270,8 +271,8 @@ class AMQPListener(base.PollStyleListener): self.conn = conn self.msg_id_cache = rpc_amqp._MsgIdCache() self.incoming = [] - self._shutdown = threading.Event() - self._shutoff = threading.Event() + self._shutdown = eventletutils.Event() + self._shutoff = eventletutils.Event() self._obsolete_reply_queues = ObsoleteReplyQueuesCache() self._message_operations_handler = MessageOperationsHandler( "AMQPListener") @@ -434,7 +435,7 @@ class ReplyWaiter(object): self.conn.declare_direct_consumer(reply_q, self) - self._thread_exit_event = threading.Event() + self._thread_exit_event = eventletutils.Event() self._thread = threading.Thread(target=self.poll) self._thread.daemon = True self._thread.start() diff --git a/oslo_messaging/_drivers/impl_fake.py b/oslo_messaging/_drivers/impl_fake.py index c5476fdf8..47fa4a382 100644 --- a/oslo_messaging/_drivers/impl_fake.py +++ b/oslo_messaging/_drivers/impl_fake.py @@ -18,6 +18,7 @@ import threading import time from oslo_serialization import jsonutils +from oslo_utils import eventletutils from six import moves import oslo_messaging @@ -49,7 +50,7 @@ class FakeListener(base.PollStyleListener): self._exchange_manager = exchange_manager self._targets = targets self._pool = pool - self._stopped = threading.Event() + self._stopped = eventletutils.Event() # NOTE(sileht): Ensure that all needed queues exists even the listener # have not been polled yet diff --git a/oslo_messaging/_drivers/impl_kafka.py b/oslo_messaging/_drivers/impl_kafka.py index 07b473d0a..88fdb7eb0 100644 --- a/oslo_messaging/_drivers/impl_kafka.py +++ b/oslo_messaging/_drivers/impl_kafka.py @@ -347,7 +347,7 @@ class KafkaListener(base.PollStyleListener): def __init__(self, conn): super(KafkaListener, self).__init__() - self._stopped = threading.Event() + self._stopped = eventletutils.Event() self.conn = conn self.incoming_queue = [] diff --git a/oslo_messaging/rpc/dispatcher.py b/oslo_messaging/rpc/dispatcher.py index 0db66d0d1..c118931e4 100644 --- a/oslo_messaging/rpc/dispatcher.py +++ b/oslo_messaging/rpc/dispatcher.py @@ -24,6 +24,8 @@ import threading import six +from oslo_utils import eventletutils + from oslo_messaging import _utils as utils from oslo_messaging import dispatcher from oslo_messaging import serializer as msg_serializer @@ -249,7 +251,7 @@ class RPCDispatcher(dispatcher.DispatcherBase): # is executing if it runs for some time. The thread will wait # for the event to be signaled, which we do explicitly below # after dispatching the method call. - completion_event = threading.Event() + completion_event = eventletutils.Event() watchdog_thread = threading.Thread(target=self._watchdog, args=(completion_event, incoming)) if incoming.client_timeout: diff --git a/oslo_messaging/tests/drivers/test_amqp_driver.py b/oslo_messaging/tests/drivers/test_amqp_driver.py index 831e1611e..cfcc0678b 100644 --- a/oslo_messaging/tests/drivers/test_amqp_driver.py +++ b/oslo_messaging/tests/drivers/test_amqp_driver.py @@ -27,6 +27,7 @@ import threading import time import uuid +from oslo_utils import eventletutils from oslo_utils import importutils from six import moves from string import Template @@ -75,8 +76,8 @@ class _ListenerThread(threading.Thread): self._msg_ack = msg_ack self.messages = moves.queue.Queue() self.daemon = True - self.started = threading.Event() - self._done = threading.Event() + self.started = eventletutils.Event() + self._done = eventletutils.Event() self.start() self.started.wait() @@ -1146,7 +1147,7 @@ class TestLinkRecovery(_AmqpBrokerTestCase): self._addrs = {'unicast.test-topic': 2, 'broadcast.test-topic.all': 2, 'exclusive.test-topic.server': 2} - self._recovered = threading.Event() + self._recovered = eventletutils.Event() self._count = 0 def _on_active(link): @@ -2100,7 +2101,7 @@ class FakeBroker(threading.Thread): self._connections = {} self._sources = {} - self._pause = threading.Event() + self._pause = eventletutils.Event() # count of messages forwarded, by messaging pattern self.direct_count = 0 self.topic_count = 0 diff --git a/oslo_messaging/tests/drivers/test_impl_rabbit.py b/oslo_messaging/tests/drivers/test_impl_rabbit.py index ce6d11d7c..b7ab4f3f8 100644 --- a/oslo_messaging/tests/drivers/test_impl_rabbit.py +++ b/oslo_messaging/tests/drivers/test_impl_rabbit.py @@ -23,6 +23,7 @@ import fixtures import kombu import kombu.transport.memory from oslo_serialization import jsonutils +from oslo_utils import eventletutils import testscenarios import oslo_messaging @@ -49,7 +50,7 @@ class TestHeartbeat(test_utils.BaseTestCase): fake_logger, heartbeat_side_effect=None, info=None): - event = threading.Event() + event = eventletutils.Event() def heartbeat_check(rate=2): event.set() diff --git a/oslo_messaging/tests/rpc/test_server.py b/oslo_messaging/tests/rpc/test_server.py index 693e88a52..b4ec519b6 100644 --- a/oslo_messaging/tests/rpc/test_server.py +++ b/oslo_messaging/tests/rpc/test_server.py @@ -19,6 +19,7 @@ import warnings import eventlet import fixtures from oslo_config import cfg +from oslo_utils import eventletutils from six.moves import mock import testscenarios @@ -61,7 +62,7 @@ class ServerSetupMixin(object): class ServerController(object): def __init__(self): - self.stopped = threading.Event() + self.stopped = eventletutils.Event() def stop(self, ctxt): self.stopped.set() @@ -704,11 +705,11 @@ class TestServerLocking(test_utils.BaseTestCase): # Test that if 2 threads call a method simultaneously, both will wait, # but only 1 will call the underlying executor method. - start_event = threading.Event() - finish_event = threading.Event() + start_event = eventletutils.Event() + finish_event = eventletutils.Event() - running_event = threading.Event() - done_event = threading.Event() + running_event = eventletutils.Event() + done_event = eventletutils.Event() _runner = [None] @@ -734,7 +735,7 @@ class TestServerLocking(test_utils.BaseTestCase): runner = _runner[0] waiter = start2 if runner == start1 else start2 - waiter_finished = threading.Event() + waiter_finished = eventletutils.Event() waiter.link(lambda _: waiter_finished.set()) # At this point, runner is running start(), and waiter() is waiting for @@ -783,8 +784,8 @@ class TestServerLocking(test_utils.BaseTestCase): # Ensure that if 2 threads wait for the completion of 'start', the # first will wait until complete_event is signalled, but the second # will continue - complete_event = threading.Event() - complete_waiting_callback = threading.Event() + complete_event = eventletutils.Event() + complete_waiting_callback = eventletutils.Event() start_state = self.server._states['start'] old_wait_for_completion = start_state.wait_for_completion @@ -801,7 +802,7 @@ class TestServerLocking(test_utils.BaseTestCase): # thread1 will wait for start to complete until we signal it thread1 = eventlet.spawn(self.server.stop) - thread1_finished = threading.Event() + thread1_finished = eventletutils.Event() thread1.link(lambda _: thread1_finished.set()) self.server.start() @@ -847,7 +848,7 @@ class TestServerLocking(test_utils.BaseTestCase): # Test that we generate a log message if we wait longer than # DEFAULT_LOG_AFTER - log_event = threading.Event() + log_event = eventletutils.Event() mock_log.warning.side_effect = lambda _, __: log_event.set() # Call stop without calling start. We should log a wait after 1 second @@ -863,7 +864,7 @@ class TestServerLocking(test_utils.BaseTestCase): # Test that we generate a log message if we wait longer than # the number of seconds passed to log_after - log_event = threading.Event() + log_event = eventletutils.Event() mock_log.warning.side_effect = lambda _, __: log_event.set() # Call stop without calling start. We should log a wait after 1 second @@ -879,7 +880,7 @@ class TestServerLocking(test_utils.BaseTestCase): # Test that we log a message after log_after seconds if we've also # specified an absolute timeout - log_event = threading.Event() + log_event = eventletutils.Event() mock_log.warning.side_effect = lambda _, __: log_event.set() # Call stop without calling start. We should log a wait after 1 second @@ -904,7 +905,7 @@ class TestServerLocking(test_utils.BaseTestCase): # Start the server, which will also instantiate an executor self.server.start() self.server.stop() - shutdown_called = threading.Event() + shutdown_called = eventletutils.Event() # Patch the executor's stop method to be very slow def slow_shutdown(wait): diff --git a/oslo_messaging/tests/utils.py b/oslo_messaging/tests/utils.py index c5b25e338..057796678 100644 --- a/oslo_messaging/tests/utils.py +++ b/oslo_messaging/tests/utils.py @@ -22,6 +22,7 @@ import threading from oslo_config import cfg +from oslo_utils import eventletutils from oslotest import base @@ -63,8 +64,8 @@ class ServerThreadHelper(threading.Thread): super(ServerThreadHelper, self).__init__() self.daemon = True self._server = server - self._stop_event = threading.Event() - self._start_event = threading.Event() + self._stop_event = eventletutils.Event() + self._start_event = eventletutils.Event() def start(self): super(ServerThreadHelper, self).start() diff --git a/requirements.txt b/requirements.txt index 6de25a490..ad19a3877 100644 --- a/requirements.txt +++ b/requirements.txt @@ -7,7 +7,7 @@ pbr!=2.1.0,>=2.0.0 # Apache-2.0 futurist>=1.2.0 # Apache-2.0 oslo.config>=5.2.0 # Apache-2.0 oslo.log>=3.36.0 # Apache-2.0 -oslo.utils>=3.33.0 # Apache-2.0 +oslo.utils>=3.37.0 # Apache-2.0 oslo.serialization!=2.19.1,>=2.18.0 # Apache-2.0 oslo.service!=1.28.1,>=1.24.0 # Apache-2.0 stevedore>=1.20.0 # Apache-2.0