Implement consumer cancel notification in astute

- auto_recover exchange/queue in case of consumer canceled queue
- amqp dependency changed to 1.4.1
- autorecovery performed for each each exchange/queue

Change-Id: I28d44841a207585d5d7b4a7920b58132be7b7394
Closes-Bug: #1340555
This commit is contained in:
Dima Shulyak 2014-07-11 10:22:15 +03:00
parent 18a6861ecf
commit 6498789c89
3 changed files with 9 additions and 2 deletions

View File

@ -18,7 +18,7 @@ Gem::Specification.new do |s|
s.add_dependency 'net-ssh-multi', '~> 1.1'
# Astute as service
s.add_dependency 'amqp', '0.9.10'
s.add_dependency 'amqp', '1.4.1'
s.add_dependency 'raemon', '0.3.0'
s.add_development_dependency 'rake', '10.0.4'

View File

@ -48,6 +48,13 @@ module Astute
def main_worker
@consumer = AMQP::Consumer.new(@channel, @queue, consumer_tag=nil, exclusive=false, no_ack=true)
@consumer.on_cancel do |basic_cancel|
Astute.logger.debug("Received cancel notification from in main worker.")
@exchange.auto_recover
@service_exchange.auto_recover
@queue.auto_recover
@service_queue.auto_recover
end
@consumer.on_delivery do |metadata, payload|
if @main_work_thread.nil? || !@main_work_thread.alive?
Astute.logger.debug "Process message from worker queue: #{payload.inspect}"

View File

@ -95,7 +95,7 @@ module Astute
def create_channel(connection, prefetch=true)
prefetch_opts = ( prefetch ? {:prefetch => 1} : {} )
channel = AMQP::Channel.new(connection, AMQP::Channel.next_channel_id, prefetch_opts)
channel = AMQP::Channel.new(connection, connection.next_channel_id, prefetch_opts)
channel.auto_recovery = true
channel.on_error do |ch, error|
if error.reply_code == 406 #PRECONDITION_FAILED