fakerabbit's declare_consumer should support more than 1 consumer. also: make fakerabbit Backend.consume be an iterator like it should be..

This commit is contained in:
Chris Behrens
2011-05-25 15:42:24 -07:00
committed by termie
parent 1b75e5eba0
commit 2368bc4fe3

View File

@@ -77,6 +77,10 @@ class Queue(object):
class Backend(base.BaseBackend): class Backend(base.BaseBackend):
def __init__(self, connection, **kwargs):
super(Backend, self).__init__(connection, **kwargs)
self.consumers = []
def queue_declare(self, queue, **kwargs): def queue_declare(self, queue, **kwargs):
global QUEUES global QUEUES
if queue not in QUEUES: if queue not in QUEUES:
@@ -97,16 +101,20 @@ class Backend(base.BaseBackend):
EXCHANGES[exchange].bind(QUEUES[queue].push, routing_key) EXCHANGES[exchange].bind(QUEUES[queue].push, routing_key)
def declare_consumer(self, queue, callback, *args, **kwargs): def declare_consumer(self, queue, callback, *args, **kwargs):
self.current_queue = queue self.consumers.append((queue, callback))
self.current_callback = callback
def consume(self, limit=None): def consume(self, limit=None):
num = 0
while True: while True:
item = self.get(self.current_queue) for (queue, callback) in self.consumers:
if item: item = self.get(queue)
self.current_callback(item) if item:
raise StopIteration() callback(item)
greenthread.sleep(0) num += 1
yield
if limit and num == limit:
raise StopIteration()
greenthread.sleep(0.1)
def get(self, queue, no_ack=False): def get(self, queue, no_ack=False):
global QUEUES global QUEUES