Merge "Allow fake driver to consume multiple topics"
This commit is contained in:
commit
286ac38905
@ -66,8 +66,8 @@ class AMQPIncomingMessage(base.IncomingMessage):
|
||||
|
||||
class AMQPListener(base.Listener):
|
||||
|
||||
def __init__(self, driver, target, conn):
|
||||
super(AMQPListener, self).__init__(driver, target)
|
||||
def __init__(self, driver, conn):
|
||||
super(AMQPListener, self).__init__(driver)
|
||||
self.conn = conn
|
||||
self.msg_id_cache = rpc_amqp._MsgIdCache()
|
||||
self.incoming = []
|
||||
@ -395,7 +395,7 @@ class AMQPDriverBase(base.BaseDriver):
|
||||
def listen(self, target):
|
||||
conn = self._get_connection(pooled=False)
|
||||
|
||||
listener = AMQPListener(self, target, conn)
|
||||
listener = AMQPListener(self, conn)
|
||||
|
||||
conn.declare_topic_consumer(target.topic, listener)
|
||||
conn.declare_topic_consumer('%s.%s' % (target.topic, target.server),
|
||||
|
@ -41,10 +41,9 @@ class IncomingMessage(object):
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class Listener(object):
|
||||
|
||||
def __init__(self, driver, target):
|
||||
def __init__(self, driver):
|
||||
self.conf = driver.conf
|
||||
self.driver = driver
|
||||
self.target = target
|
||||
|
||||
@abc.abstractmethod
|
||||
def poll(self):
|
||||
|
@ -39,15 +39,17 @@ class FakeIncomingMessage(base.IncomingMessage):
|
||||
|
||||
class FakeListener(base.Listener):
|
||||
|
||||
def __init__(self, driver, target, exchange):
|
||||
super(FakeListener, self).__init__(driver, target)
|
||||
def __init__(self, driver, exchange, targets):
|
||||
super(FakeListener, self).__init__(driver)
|
||||
self._exchange = exchange
|
||||
self._targets = targets
|
||||
|
||||
def poll(self):
|
||||
while True:
|
||||
(ctxt, message, reply_q) = self._exchange.poll(self.target)
|
||||
if message is not None:
|
||||
return FakeIncomingMessage(self, ctxt, message, reply_q)
|
||||
for target in self._targets:
|
||||
(ctxt, message, reply_q) = self._exchange.poll(target)
|
||||
if message is not None:
|
||||
return FakeIncomingMessage(self, ctxt, message, reply_q)
|
||||
time.sleep(.05)
|
||||
|
||||
|
||||
@ -80,8 +82,9 @@ class FakeExchange(object):
|
||||
|
||||
def poll(self, target):
|
||||
with self._queues_lock:
|
||||
queue = self._get_server_queue(target.topic, target.server)
|
||||
if not queue:
|
||||
if target.server:
|
||||
queue = self._get_server_queue(target.topic, target.server)
|
||||
else:
|
||||
queue = self._get_topic_queue(target.topic)
|
||||
return queue.pop(0) if queue else (None, None, None)
|
||||
|
||||
@ -152,7 +155,11 @@ class FakeDriver(base.BaseDriver):
|
||||
exchange = self._get_exchange(target.exchange or
|
||||
self._default_exchange)
|
||||
|
||||
return FakeListener(self, target, exchange)
|
||||
listener = FakeListener(self, exchange,
|
||||
[messaging.Target(topic=target.topic,
|
||||
server=target.server),
|
||||
messaging.Target(topic=target.topic)])
|
||||
return listener
|
||||
|
||||
def cleanup(self):
|
||||
pass
|
||||
|
@ -846,8 +846,8 @@ class ZmqIncomingMessage(base.IncomingMessage):
|
||||
|
||||
class ZmqListener(base.Listener):
|
||||
|
||||
def __init__(self, driver, target):
|
||||
super(ZmqListener, self).__init__(driver, target)
|
||||
def __init__(self, driver):
|
||||
super(ZmqListener, self).__init__(driver)
|
||||
self.incoming_queue = moves.queue.Queue()
|
||||
|
||||
def dispatch(self, ctxt, version, method, namespace, **kwargs):
|
||||
@ -948,7 +948,7 @@ class ZmqDriver(base.BaseDriver):
|
||||
def listen(self, target):
|
||||
conn = create_connection(self.conf)
|
||||
|
||||
listener = ZmqListener(self, target)
|
||||
listener = ZmqListener(self)
|
||||
|
||||
conn.create_consumer(target.topic, listener)
|
||||
conn.create_consumer('%s.%s' % (target.topic, target.server),
|
||||
|
Loading…
Reference in New Issue
Block a user