Merge "Do not use threading.Event"

This commit is contained in:
Zuul 2019-12-18 23:58:11 +00:00 committed by Gerrit Code Review
commit 4e49a286e7
11 changed files with 40 additions and 31 deletions

View File

@ -48,7 +48,7 @@ oslo.log==3.36.0
oslo.middleware==3.31.0 oslo.middleware==3.31.0
oslo.serialization==2.18.0 oslo.serialization==2.18.0
oslo.service==1.24.0 oslo.service==1.24.0
oslo.utils==3.33.0 oslo.utils==3.37.0
oslotest==3.2.0 oslotest==3.2.0
Paste==2.0.2 Paste==2.0.2
PasteDeploy==1.5.0 PasteDeploy==1.5.0

View File

@ -35,6 +35,7 @@ import threading
import time import time
import uuid import uuid
from oslo_utils import eventletutils
import proton import proton
import pyngus import pyngus
from six import iteritems from six import iteritems
@ -85,7 +86,7 @@ class SubscribeTask(Task):
self._subscriber_id = listener.id self._subscriber_id = listener.id
self._in_queue = listener.incoming self._in_queue = listener.incoming
self._service = SERVICE_NOTIFY if notifications else SERVICE_RPC self._service = SERVICE_NOTIFY if notifications else SERVICE_RPC
self._wakeup = threading.Event() self._wakeup = eventletutils.Event()
def wait(self): def wait(self):
self._wakeup.wait() self._wakeup.wait()
@ -112,7 +113,7 @@ class SendTask(Task):
self.service = SERVICE_NOTIFY if notification else SERVICE_RPC self.service = SERVICE_NOTIFY if notification else SERVICE_RPC
self.timer = None self.timer = None
self._retry = None if retry is None or retry < 0 else retry self._retry = None if retry is None or retry < 0 else retry
self._wakeup = threading.Event() self._wakeup = eventletutils.Event()
self._error = None self._error = None
self._sender = None self._sender = None

View File

@ -19,6 +19,7 @@ import time
import uuid import uuid
import cachetools import cachetools
from oslo_utils import eventletutils
from oslo_utils import timeutils from oslo_utils import timeutils
from six import moves from six import moves
@ -49,7 +50,7 @@ class MessageOperationsHandler(object):
self.name = "%s (%s)" % (name, hex(id(self))) self.name = "%s (%s)" % (name, hex(id(self)))
self._tasks = moves.queue.Queue() self._tasks = moves.queue.Queue()
self._shutdown = threading.Event() self._shutdown = eventletutils.Event()
self._shutdown_thread = threading.Thread( self._shutdown_thread = threading.Thread(
target=self._process_in_background) target=self._process_in_background)
self._shutdown_thread.daemon = True self._shutdown_thread.daemon = True
@ -270,8 +271,8 @@ class AMQPListener(base.PollStyleListener):
self.conn = conn self.conn = conn
self.msg_id_cache = rpc_amqp._MsgIdCache() self.msg_id_cache = rpc_amqp._MsgIdCache()
self.incoming = [] self.incoming = []
self._shutdown = threading.Event() self._shutdown = eventletutils.Event()
self._shutoff = threading.Event() self._shutoff = eventletutils.Event()
self._obsolete_reply_queues = ObsoleteReplyQueuesCache() self._obsolete_reply_queues = ObsoleteReplyQueuesCache()
self._message_operations_handler = MessageOperationsHandler( self._message_operations_handler = MessageOperationsHandler(
"AMQPListener") "AMQPListener")
@ -434,7 +435,7 @@ class ReplyWaiter(object):
self.conn.declare_direct_consumer(reply_q, self) self.conn.declare_direct_consumer(reply_q, self)
self._thread_exit_event = threading.Event() self._thread_exit_event = eventletutils.Event()
self._thread = threading.Thread(target=self.poll) self._thread = threading.Thread(target=self.poll)
self._thread.daemon = True self._thread.daemon = True
self._thread.start() self._thread.start()

View File

@ -18,6 +18,7 @@ import threading
import time import time
from oslo_serialization import jsonutils from oslo_serialization import jsonutils
from oslo_utils import eventletutils
from six import moves from six import moves
import oslo_messaging import oslo_messaging
@ -49,7 +50,7 @@ class FakeListener(base.PollStyleListener):
self._exchange_manager = exchange_manager self._exchange_manager = exchange_manager
self._targets = targets self._targets = targets
self._pool = pool self._pool = pool
self._stopped = threading.Event() self._stopped = eventletutils.Event()
# NOTE(sileht): Ensure that all needed queues exists even the listener # NOTE(sileht): Ensure that all needed queues exists even the listener
# have not been polled yet # have not been polled yet

View File

@ -347,7 +347,7 @@ class KafkaListener(base.PollStyleListener):
def __init__(self, conn): def __init__(self, conn):
super(KafkaListener, self).__init__() super(KafkaListener, self).__init__()
self._stopped = threading.Event() self._stopped = eventletutils.Event()
self.conn = conn self.conn = conn
self.incoming_queue = [] self.incoming_queue = []

View File

@ -24,6 +24,8 @@ import threading
import six import six
from oslo_utils import eventletutils
from oslo_messaging import _utils as utils from oslo_messaging import _utils as utils
from oslo_messaging import dispatcher from oslo_messaging import dispatcher
from oslo_messaging import serializer as msg_serializer from oslo_messaging import serializer as msg_serializer
@ -249,7 +251,7 @@ class RPCDispatcher(dispatcher.DispatcherBase):
# is executing if it runs for some time. The thread will wait # is executing if it runs for some time. The thread will wait
# for the event to be signaled, which we do explicitly below # for the event to be signaled, which we do explicitly below
# after dispatching the method call. # after dispatching the method call.
completion_event = threading.Event() completion_event = eventletutils.Event()
watchdog_thread = threading.Thread(target=self._watchdog, watchdog_thread = threading.Thread(target=self._watchdog,
args=(completion_event, incoming)) args=(completion_event, incoming))
if incoming.client_timeout: if incoming.client_timeout:

View File

@ -27,6 +27,7 @@ import threading
import time import time
import uuid import uuid
from oslo_utils import eventletutils
from oslo_utils import importutils from oslo_utils import importutils
from six import moves from six import moves
from string import Template from string import Template
@ -75,8 +76,8 @@ class _ListenerThread(threading.Thread):
self._msg_ack = msg_ack self._msg_ack = msg_ack
self.messages = moves.queue.Queue() self.messages = moves.queue.Queue()
self.daemon = True self.daemon = True
self.started = threading.Event() self.started = eventletutils.Event()
self._done = threading.Event() self._done = eventletutils.Event()
self.start() self.start()
self.started.wait() self.started.wait()
@ -1146,7 +1147,7 @@ class TestLinkRecovery(_AmqpBrokerTestCase):
self._addrs = {'unicast.test-topic': 2, self._addrs = {'unicast.test-topic': 2,
'broadcast.test-topic.all': 2, 'broadcast.test-topic.all': 2,
'exclusive.test-topic.server': 2} 'exclusive.test-topic.server': 2}
self._recovered = threading.Event() self._recovered = eventletutils.Event()
self._count = 0 self._count = 0
def _on_active(link): def _on_active(link):
@ -2100,7 +2101,7 @@ class FakeBroker(threading.Thread):
self._connections = {} self._connections = {}
self._sources = {} self._sources = {}
self._pause = threading.Event() self._pause = eventletutils.Event()
# count of messages forwarded, by messaging pattern # count of messages forwarded, by messaging pattern
self.direct_count = 0 self.direct_count = 0
self.topic_count = 0 self.topic_count = 0

View File

@ -23,6 +23,7 @@ import fixtures
import kombu import kombu
import kombu.transport.memory import kombu.transport.memory
from oslo_serialization import jsonutils from oslo_serialization import jsonutils
from oslo_utils import eventletutils
import testscenarios import testscenarios
import oslo_messaging import oslo_messaging
@ -49,7 +50,7 @@ class TestHeartbeat(test_utils.BaseTestCase):
fake_logger, heartbeat_side_effect=None, fake_logger, heartbeat_side_effect=None,
info=None): info=None):
event = threading.Event() event = eventletutils.Event()
def heartbeat_check(rate=2): def heartbeat_check(rate=2):
event.set() event.set()

View File

@ -19,6 +19,7 @@ import warnings
import eventlet import eventlet
import fixtures import fixtures
from oslo_config import cfg from oslo_config import cfg
from oslo_utils import eventletutils
from six.moves import mock from six.moves import mock
import testscenarios import testscenarios
@ -61,7 +62,7 @@ class ServerSetupMixin(object):
class ServerController(object): class ServerController(object):
def __init__(self): def __init__(self):
self.stopped = threading.Event() self.stopped = eventletutils.Event()
def stop(self, ctxt): def stop(self, ctxt):
self.stopped.set() self.stopped.set()
@ -704,11 +705,11 @@ class TestServerLocking(test_utils.BaseTestCase):
# Test that if 2 threads call a method simultaneously, both will wait, # Test that if 2 threads call a method simultaneously, both will wait,
# but only 1 will call the underlying executor method. # but only 1 will call the underlying executor method.
start_event = threading.Event() start_event = eventletutils.Event()
finish_event = threading.Event() finish_event = eventletutils.Event()
running_event = threading.Event() running_event = eventletutils.Event()
done_event = threading.Event() done_event = eventletutils.Event()
_runner = [None] _runner = [None]
@ -734,7 +735,7 @@ class TestServerLocking(test_utils.BaseTestCase):
runner = _runner[0] runner = _runner[0]
waiter = start2 if runner == start1 else start2 waiter = start2 if runner == start1 else start2
waiter_finished = threading.Event() waiter_finished = eventletutils.Event()
waiter.link(lambda _: waiter_finished.set()) waiter.link(lambda _: waiter_finished.set())
# At this point, runner is running start(), and waiter() is waiting for # At this point, runner is running start(), and waiter() is waiting for
@ -783,8 +784,8 @@ class TestServerLocking(test_utils.BaseTestCase):
# Ensure that if 2 threads wait for the completion of 'start', the # Ensure that if 2 threads wait for the completion of 'start', the
# first will wait until complete_event is signalled, but the second # first will wait until complete_event is signalled, but the second
# will continue # will continue
complete_event = threading.Event() complete_event = eventletutils.Event()
complete_waiting_callback = threading.Event() complete_waiting_callback = eventletutils.Event()
start_state = self.server._states['start'] start_state = self.server._states['start']
old_wait_for_completion = start_state.wait_for_completion old_wait_for_completion = start_state.wait_for_completion
@ -801,7 +802,7 @@ class TestServerLocking(test_utils.BaseTestCase):
# thread1 will wait for start to complete until we signal it # thread1 will wait for start to complete until we signal it
thread1 = eventlet.spawn(self.server.stop) thread1 = eventlet.spawn(self.server.stop)
thread1_finished = threading.Event() thread1_finished = eventletutils.Event()
thread1.link(lambda _: thread1_finished.set()) thread1.link(lambda _: thread1_finished.set())
self.server.start() self.server.start()
@ -847,7 +848,7 @@ class TestServerLocking(test_utils.BaseTestCase):
# Test that we generate a log message if we wait longer than # Test that we generate a log message if we wait longer than
# DEFAULT_LOG_AFTER # DEFAULT_LOG_AFTER
log_event = threading.Event() log_event = eventletutils.Event()
mock_log.warning.side_effect = lambda _, __: log_event.set() mock_log.warning.side_effect = lambda _, __: log_event.set()
# Call stop without calling start. We should log a wait after 1 second # Call stop without calling start. We should log a wait after 1 second
@ -863,7 +864,7 @@ class TestServerLocking(test_utils.BaseTestCase):
# Test that we generate a log message if we wait longer than # Test that we generate a log message if we wait longer than
# the number of seconds passed to log_after # the number of seconds passed to log_after
log_event = threading.Event() log_event = eventletutils.Event()
mock_log.warning.side_effect = lambda _, __: log_event.set() mock_log.warning.side_effect = lambda _, __: log_event.set()
# Call stop without calling start. We should log a wait after 1 second # Call stop without calling start. We should log a wait after 1 second
@ -879,7 +880,7 @@ class TestServerLocking(test_utils.BaseTestCase):
# Test that we log a message after log_after seconds if we've also # Test that we log a message after log_after seconds if we've also
# specified an absolute timeout # specified an absolute timeout
log_event = threading.Event() log_event = eventletutils.Event()
mock_log.warning.side_effect = lambda _, __: log_event.set() mock_log.warning.side_effect = lambda _, __: log_event.set()
# Call stop without calling start. We should log a wait after 1 second # Call stop without calling start. We should log a wait after 1 second
@ -904,7 +905,7 @@ class TestServerLocking(test_utils.BaseTestCase):
# Start the server, which will also instantiate an executor # Start the server, which will also instantiate an executor
self.server.start() self.server.start()
self.server.stop() self.server.stop()
shutdown_called = threading.Event() shutdown_called = eventletutils.Event()
# Patch the executor's stop method to be very slow # Patch the executor's stop method to be very slow
def slow_shutdown(wait): def slow_shutdown(wait):

View File

@ -22,6 +22,7 @@
import threading import threading
from oslo_config import cfg from oslo_config import cfg
from oslo_utils import eventletutils
from oslotest import base from oslotest import base
@ -63,8 +64,8 @@ class ServerThreadHelper(threading.Thread):
super(ServerThreadHelper, self).__init__() super(ServerThreadHelper, self).__init__()
self.daemon = True self.daemon = True
self._server = server self._server = server
self._stop_event = threading.Event() self._stop_event = eventletutils.Event()
self._start_event = threading.Event() self._start_event = eventletutils.Event()
def start(self): def start(self):
super(ServerThreadHelper, self).start() super(ServerThreadHelper, self).start()

View File

@ -7,7 +7,7 @@ pbr!=2.1.0,>=2.0.0 # Apache-2.0
futurist>=1.2.0 # Apache-2.0 futurist>=1.2.0 # Apache-2.0
oslo.config>=5.2.0 # Apache-2.0 oslo.config>=5.2.0 # Apache-2.0
oslo.log>=3.36.0 # Apache-2.0 oslo.log>=3.36.0 # Apache-2.0
oslo.utils>=3.33.0 # Apache-2.0 oslo.utils>=3.37.0 # Apache-2.0
oslo.serialization!=2.19.1,>=2.18.0 # Apache-2.0 oslo.serialization!=2.19.1,>=2.18.0 # Apache-2.0
oslo.service!=1.28.1,>=1.24.0 # Apache-2.0 oslo.service!=1.28.1,>=1.24.0 # Apache-2.0
stevedore>=1.20.0 # Apache-2.0 stevedore>=1.20.0 # Apache-2.0