kludge for kombu 1.1.3 memory transport bug

This commit is contained in:
Chris Behrens
2011-08-31 11:54:19 -07:00
parent bfc4f94298
commit 44d59d00e7

View File

@@ -303,6 +303,7 @@ class Connection(object):
self.interval_stepping = FLAGS.rabbit_retry_backoff
# max retry-interval = 30 seconds
self.interval_max = 30
self.memory_transport = False
self.params = dict(hostname=FLAGS.rabbit_host,
port=FLAGS.rabbit_port,
@@ -311,6 +312,9 @@ class Connection(object):
virtual_host=FLAGS.rabbit_virtual_host)
if FLAGS.fake_rabbit:
self.params['transport'] = 'memory'
self.memory_transport = True
else:
self.memory_transport = False
self.connection = None
self.reconnect()
@@ -323,7 +327,7 @@ class Connection(object):
pass
time.sleep(1)
self.connection = kombu.connection.BrokerConnection(**self.params)
if FLAGS.fake_rabbit:
if self.memory_transport:
# Kludge to speed up tests.
self.connection.transport.polling_interval = 0.0
self.consumer_num = itertools.count(1)
@@ -345,6 +349,9 @@ class Connection(object):
LOG.info(_('Connected to AMQP server on %(hostname)s:%(port)d' %
self.params))
self.channel = self.connection.channel()
# work around 'memory' transport bug in 1.1.3
if self.memory_transport:
self.channel._new_queue('ae.undeliver')
for consumer in self.consumers:
consumer.reconnect(self.channel)
if self.consumers:
@@ -374,6 +381,9 @@ class Connection(object):
self.cancel_consumer_thread()
self.channel.close()
self.channel = self.connection.channel()
# work around 'memory' transport bug in 1.1.3
if self.memory_transport:
self.channel._new_queue('ae.undeliver')
self.consumers = []
def declare_consumer(self, consumer_cls, topic, callback):