Send message ack in rpc.call and make queues durable.
This commit is contained in:
@@ -110,6 +110,7 @@ class TopicConsumer(Consumer):
|
|||||||
self.queue = topic
|
self.queue = topic
|
||||||
self.routing_key = topic
|
self.routing_key = topic
|
||||||
self.exchange = FLAGS.control_exchange
|
self.exchange = FLAGS.control_exchange
|
||||||
|
self.durable = False
|
||||||
super(TopicConsumer, self).__init__(connection=connection)
|
super(TopicConsumer, self).__init__(connection=connection)
|
||||||
|
|
||||||
|
|
||||||
@@ -195,7 +196,10 @@ def call(topic, msg):
|
|||||||
conn = Connection.instance()
|
conn = Connection.instance()
|
||||||
d = defer.Deferred()
|
d = defer.Deferred()
|
||||||
consumer = DirectConsumer(connection=conn, msg_id=msg_id)
|
consumer = DirectConsumer(connection=conn, msg_id=msg_id)
|
||||||
consumer.register_callback(lambda data, message: d.callback(data))
|
def deferred_receive(data, message):
|
||||||
|
message.ack()
|
||||||
|
d.callback(data)
|
||||||
|
consumer.register_callback(deferred_receive)
|
||||||
injected = consumer.attach_to_tornado()
|
injected = consumer.attach_to_tornado()
|
||||||
|
|
||||||
# clean up after the injected listened and return x
|
# clean up after the injected listened and return x
|
||||||
@@ -233,7 +237,8 @@ def send_message(topic, message, wait=True):
|
|||||||
exchange=msg_id,
|
exchange=msg_id,
|
||||||
auto_delete=True,
|
auto_delete=True,
|
||||||
exchange_type="direct",
|
exchange_type="direct",
|
||||||
routing_key=msg_id)
|
routing_key=msg_id,
|
||||||
|
durable=False)
|
||||||
consumer.register_callback(generic_response)
|
consumer.register_callback(generic_response)
|
||||||
|
|
||||||
publisher = messaging.Publisher(connection=Connection.instance(),
|
publisher = messaging.Publisher(connection=Connection.instance(),
|
||||||
|
|||||||
Reference in New Issue
Block a user