diff --git a/oslo_messaging/_drivers/zmq_driver/poller/green_poller.py b/oslo_messaging/_drivers/zmq_driver/poller/green_poller.py index dcf9da588..58f8d8af1 100644 --- a/oslo_messaging/_drivers/zmq_driver/poller/green_poller.py +++ b/oslo_messaging/_drivers/zmq_driver/poller/green_poller.py @@ -16,9 +16,7 @@ import logging import threading import eventlet -import six -from oslo_messaging._drivers import common as rpc_common from oslo_messaging._drivers.zmq_driver import zmq_poller LOG = logging.getLogger(__name__) @@ -27,7 +25,7 @@ LOG = logging.getLogger(__name__) class GreenPoller(zmq_poller.ZmqPoller): def __init__(self): - self.incoming_queue = six.moves.queue.Queue() + self.incoming_queue = eventlet.queue.LightQueue() self.green_pool = eventlet.GreenPool() self.thread_by_socket = {} @@ -46,17 +44,10 @@ class GreenPoller(zmq_poller.ZmqPoller): eventlet.sleep() def poll(self, timeout=None): - incoming = None try: - with eventlet.Timeout(timeout, exception=rpc_common.Timeout): - while incoming is None: - try: - incoming = self.incoming_queue.get_nowait() - except six.moves.queue.Empty: - eventlet.sleep() - except rpc_common.Timeout: - return None, None - return incoming[0], incoming[1] + return self.incoming_queue.get(timeout=timeout) + except eventlet.queue.Empty: + return (None, None) def close(self): for thread in self.thread_by_socket.values():