Merge "Ensures listener queues exist in fake driver"
This commit is contained in:
@@ -45,6 +45,12 @@ class FakeListener(base.Listener):
|
|||||||
self._exchange_manager = exchange_manager
|
self._exchange_manager = exchange_manager
|
||||||
self._targets = targets
|
self._targets = targets
|
||||||
|
|
||||||
|
# NOTE(sileht): Ensure that all needed queues exists even the listener
|
||||||
|
# have not been polled yet
|
||||||
|
for target in self._targets:
|
||||||
|
exchange = self._exchange_manager.get_exchange(target.exchange)
|
||||||
|
exchange.ensure_queue(target)
|
||||||
|
|
||||||
def poll(self, timeout=None):
|
def poll(self, timeout=None):
|
||||||
if timeout is not None:
|
if timeout is not None:
|
||||||
deadline = time.time() + timeout
|
deadline = time.time() + timeout
|
||||||
@@ -77,6 +83,13 @@ class FakeExchange(object):
|
|||||||
self._topic_queues = {}
|
self._topic_queues = {}
|
||||||
self._server_queues = {}
|
self._server_queues = {}
|
||||||
|
|
||||||
|
def ensure_queue(self, target):
|
||||||
|
with self._queues_lock:
|
||||||
|
if target.server:
|
||||||
|
self._get_server_queue(target.topic, target.server)
|
||||||
|
else:
|
||||||
|
self._get_topic_queue(target.topic)
|
||||||
|
|
||||||
def _get_topic_queue(self, topic):
|
def _get_topic_queue(self, topic):
|
||||||
return self._topic_queues.setdefault(topic, [])
|
return self._topic_queues.setdefault(topic, [])
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user