@ -50,24 +50,6 @@ qpid_opts = [
cfg . StrOpt ( ' qpid_sasl_mechanisms ' ,
default = ' ' ,
help = ' Space separated list of SASL mechanisms to use for auth ' ) ,
cfg . BoolOpt ( ' qpid_reconnect ' ,
default = True ,
help = ' Automatically reconnect ' ) ,
cfg . IntOpt ( ' qpid_reconnect_timeout ' ,
default = 0 ,
help = ' Reconnection timeout in seconds ' ) ,
cfg . IntOpt ( ' qpid_reconnect_limit ' ,
default = 0 ,
help = ' Max reconnections before giving up ' ) ,
cfg . IntOpt ( ' qpid_reconnect_interval_min ' ,
default = 0 ,
help = ' Minimum seconds between reconnection attempts ' ) ,
cfg . IntOpt ( ' qpid_reconnect_interval_max ' ,
default = 0 ,
help = ' Maximum seconds between reconnection attempts ' ) ,
cfg . IntOpt ( ' qpid_reconnect_interval ' ,
default = 0 ,
help = ' Equivalent to setting max and min to the same value ' ) ,
cfg . IntOpt ( ' qpid_heartbeat ' ,
default = 60 ,
help = ' Seconds between connection keepalive heartbeats ' ) ,
@ -294,50 +276,36 @@ class Connection(object):
self . consumer_thread = None
self . conf = conf
if server_params is None :
server_params = { }
default_params = dict ( hostname = self . conf . qpid_hostname ,
port = self . conf . qpid_port ,
username = self . conf . qpid_username ,
password = self . conf . qpid_password )
params = server_params
for key in default_params . keys ( ) :
params . setdefault ( key , default_params [ key ] )
params = {
' hostname ' : self . conf . qpid_hostname ,
' port ' : self . conf . qpid_port ,
' username ' : self . conf . qpid_username ,
' password ' : self . conf . qpid_password ,
}
params . update ( server_params or { } )
self . broker = params [ ' hostname ' ] + " : " + str ( params [ ' port ' ] )
self . username = params [ ' username ' ]
self . password = params [ ' password ' ]
self . connection_create ( )
self . reconnect ( )
def connection_create ( self ) :
# Create the connection - this does not open the connection
self . connection = qpid . messaging . Connection ( self . broker )
# Check if flags are set and if so set them for the connection
# before we call open
self . connection . username = params [ ' username ' ]
self . connection . password = params [ ' password ' ]
self . connection . username = self . username
self . connection . password = self . password
self . connection . sasl_mechanisms = self . conf . qpid_sasl_mechanisms
self . connection . reconnect = self . conf . qpid_reconnect
if self . conf . qpid_reconnect_timeout :
self . connection . reconnect_timeout = (
self . conf . qpid_reconnect_timeout )
if self . conf . qpid_reconnect_limit :
self . connection . reconnect_limit = self . conf . qpid_reconnect_limit
if self . conf . qpid_reconnect_interval_max :
self . connection . reconnect_interval_max = (
self . conf . qpid_reconnect_interval_max )
if self . conf . qpid_reconnect_interval_min :
self . connection . reconnect_interval_min = (
self . conf . qpid_reconnect_interval_min )
if self . conf . qpid_reconnect_interval :
self . connection . reconnect_interval = (
self . conf . qpid_reconnect_interval )
# Reconnection is done by self.reconnect()
self . connection . reconnect = False
self . connection . heartbeat = self . conf . qpid_heartbeat
self . connection . protocol = self . conf . qpid_protocol
self . connection . tcp_nodelay = self . conf . qpid_tcp_nodelay
# Open is part of reconnect -
# NOTE(WGH) not sure we need this with the reconnect flags
self . reconnect ( )
def _register_consumer ( self , consumer ) :
self . consumers [ str ( consumer . get_receiver ( ) ) ] = consumer
@ -352,12 +320,18 @@ class Connection(object):
except qpid . messaging . exceptions . ConnectionError :
pass
delay = 1
while True :
try :
self . connection_create ( )
self . connection . open ( )
except qpid . messaging . exceptions . ConnectionError , e :
LOG . error ( _ ( ' Unable to connect to AMQP server: %s ' ) , e )
time . sleep ( self . conf . qpid_reconnect_interval or 1 )
msg_dict = dict ( e = e , delay = delay )
msg = _ ( " Unable to connect to AMQP server: %(e)s . "
" Sleeping %(delay)s seconds " ) % msg_dict
LOG . error ( msg )
time . sleep ( delay )
delay = min ( 2 * delay , 60 )
else :
break
@ -365,10 +339,14 @@ class Connection(object):
self . session = self . connection . session ( )
for consumer in self . consumers . itervalues ( ) :
consumer . reconnect ( self . session )
if self . consumers :
consumers = self . consumers
self . consumers = { }
for consumer in consumers . itervalues ( ) :
consumer . reconnect ( self . session )
self . _register_consumer ( consumer )
LOG . debug ( _ ( " Re-established AMQP queues " ) )
def ensure ( self , error_callback , method , * args , * * kwargs ) :