@ -12,19 +12,20 @@
# License for the specific language governing permissions and limitations
# under the License.
import contextlib
import functools
import itertools
import logging
import os
import socket
import ssl
import threading
import time
import uuid
import kombu
import kombu.connection
import kombu.entity
import kombu.exceptions
import kombu.messaging
from oslo_config import cfg
from oslo_utils import netutils
@ -35,6 +36,7 @@ from oslo_messaging._drivers import amqp as rpc_amqp
from oslo_messaging._drivers import amqpdriver
from oslo_messaging._drivers import common as rpc_common
from oslo_messaging._i18n import _
from oslo_messaging._i18n import _LE
from oslo_messaging._i18n import _LI
from oslo_messaging._i18n import _LW
from oslo_messaging import exceptions
@ -120,6 +122,15 @@ rabbit_opts = [
help = ' Use HA queues in RabbitMQ (x-ha-policy: all). '
' If you change this option, you must wipe the '
' RabbitMQ database. ' ) ,
cfg . IntOpt ( ' heartbeat_timeout_threshold ' ,
default = 60 ,
help = " Number of seconds after which the Rabbit broker is "
" considered down if heartbeat ' s keep-alive fails "
" (0 disable the heartbeat). " ) ,
cfg . IntOpt ( ' heartbeat_rate ' ,
default = 2 ,
help = ' How often times during the heartbeat_timeout_threshold '
' we check the heartbeat. ' ) ,
# NOTE(sileht): deprecated option since oslo_messaging 1.5.0,
cfg . BoolOpt ( ' fake_rabbit ' ,
@ -460,12 +471,119 @@ class NotifyPublisher(TopicPublisher):
queue . declare ( )
class DummyConnectionLock ( object ) :
def acquire ( self ) :
pass
def release ( self ) :
pass
def heartbeat_acquire ( self ) :
pass
def __enter__ ( self ) :
self . acquire ( )
def __exit__ ( self , type , value , traceback ) :
self . release ( )
class ConnectionLock ( DummyConnectionLock ) :
""" Lock object to protect access the the kombu connection
This is a lock object to protect access the the kombu connection
object between the heartbeat thread and the driver thread .
They are two way to acquire this lock :
* lock . acquire ( )
* lock . heartbeat_acquire ( )
In both case lock . release ( ) , release the lock .
The goal is that the heartbeat thread always have the priority
for acquiring the lock . This ensures we have no heartbeat
starvation when the driver sends a lot of messages .
So when lock . heartbeat_acquire ( ) is called next time the lock
is released ( ) , the caller unconditionnaly acquires
the lock , even someone else have asked for the lock before it .
"""
def __init__ ( self ) :
self . _workers_waiting = 0
self . _heartbeat_waiting = False
self . _lock_acquired = None
self . _monitor = threading . Lock ( )
self . _workers_locks = threading . Condition ( self . _monitor )
self . _heartbeat_lock = threading . Condition ( self . _monitor )
self . _get_thread_id = self . _fetch_current_thread_functor ( )
def acquire ( self ) :
with self . _monitor :
while self . _lock_acquired :
self . _workers_waiting + = 1
self . _workers_locks . wait ( )
self . _workers_waiting - = 1
self . _lock_acquired = self . _get_thread_id ( )
def heartbeat_acquire ( self ) :
# NOTE(sileht): must be called only one time
with self . _monitor :
while self . _lock_acquired is not None :
self . _heartbeat_waiting = True
self . _heartbeat_lock . wait ( )
self . _heartbeat_waiting = False
self . _lock_acquired = self . _get_thread_id ( )
def release ( self ) :
with self . _monitor :
if self . _lock_acquired is None :
raise RuntimeError ( " We can ' t release a not acquired lock " )
thread_id = self . _get_thread_id ( )
if self . _lock_acquired != thread_id :
raise RuntimeError ( " We can ' t release lock acquired by another "
" thread/greenthread; %s vs %s " %
( self . _lock_acquired , thread_id ) )
self . _lock_acquired = None
if self . _heartbeat_waiting :
self . _heartbeat_lock . notify ( )
elif self . _workers_waiting > 0 :
self . _workers_locks . notify ( )
@contextlib.contextmanager
def for_heartbeat ( self ) :
self . heartbeat_acquire ( )
try :
yield
finally :
self . release ( )
@staticmethod
def _fetch_current_thread_functor ( ) :
# Until https://github.com/eventlet/eventlet/issues/172 is resolved
# or addressed we have to use complicated workaround to get a object
# that will not be recycled; the usage of threading.current_thread()
# doesn't appear to currently be monkey patched and therefore isn't
# reliable to use (and breaks badly when used as all threads share
# the same current_thread() object)...
try :
import eventlet
from eventlet import patcher
green_threaded = patcher . is_monkey_patched ( ' thread ' )
except ImportError :
green_threaded = False
if green_threaded :
return lambda : eventlet . getcurrent ( )
else :
return lambda : threading . current_thread ( )
class Connection ( object ) :
""" Connection object. """
pools = { }
def __init__ ( self , conf , url ) :
def __init__ ( self , conf , url , purpose ) :
self . consumers = [ ]
self . consumer_num = itertools . count ( 1 )
self . conf = conf
@ -527,18 +645,47 @@ class Connection(object):
self . do_consume = True
self . _consume_loop_stopped = False
self . channel = None
# NOTE(sileht): if purpose is PURPOSE_LISTEN
# we don't need the lock because we don't
# have a heartbeat thread
if purpose == rpc_amqp . PURPOSE_SEND :
self . _connection_lock = ConnectionLock ( )
else :
self . _connection_lock = DummyConnectionLock ( )
self . connection = kombu . connection . Connection (
self . _url , ssl = self . _fetch_ssl_params ( ) ,
login_method = self . _login_method ,
failover_strategy = " shuffle " )
failover_strategy = " shuffle " ,
heartbeat = self . driver_conf . heartbeat_timeout_threshold )
LOG . info ( _LI ( ' Connecting to AMQP server on %(hostname)s : %(port)d ' ) ,
self . connection . info ( ) )
# NOTE(sileht): kombu recommend to run heartbeat_check every
# seconds, but we use a lock around the kombu connection
# so, to not lock to much this lock to most of the time do nothing
# expected waiting the events drain, we start heartbeat_check and
# retreive the server heartbeat packet only two times more than
# the minimum required for the heartbeat works
# (heatbeat_timeout/heartbeat_rate/2.0, default kombu
# heartbeat_rate is 2)
self . _heartbeat_wait_timeout = (
float ( self . driver_conf . heartbeat_timeout_threshold ) /
float ( self . driver_conf . heartbeat_rate ) / 2.0 )
self . _heartbeat_support_log_emitted = False
# NOTE(sileht): just ensure the connection is setuped at startup
self . ensure ( error_callback = None ,
method = lambda : True )
self . ensure_connection ( )
# NOTE(sileht): if purpose is PURPOSE_LISTEN
# the consume code does the heartbeat stuff
# we don't need a thread
if purpose == rpc_amqp . PURPOSE_SEND :
self . _heartbeat_start ( )
LOG . info ( _LI ( ' Connected to AMQP server on %(hostname)s : %(port)d ' ) ,
self . connection . info ( ) )
@ -602,6 +749,10 @@ class Connection(object):
return ssl_params or True
return False
def ensure_connection ( self ) :
self . ensure ( error_callback = None ,
method = lambda : True )
def ensure ( self , error_callback , method , retry = None ,
timeout_is_error = True ) :
""" Will retry up to retry number of times.
@ -609,6 +760,8 @@ class Connection(object):
retry = - 1 means to retry forever
retry = 0 means no retry
retry = N means N retries
NOTE ( sileht ) : Must be called within the connection lock
"""
current_pid = os . getpid ( )
@ -676,6 +829,7 @@ class Connection(object):
recoverable_errors = ( self . connection . recoverable_channel_errors +
self . connection . recoverable_connection_errors )
try :
autoretry_method = self . connection . autoretry (
execute_method , channel = self . channel ,
@ -703,12 +857,17 @@ class Connection(object):
raise exceptions . MessageDeliveryFailure ( msg )
def _set_current_channel ( self , new_channel ) :
""" Change the channel to use.
NOTE ( sileht ) : Must be called within the connection lock
"""
if self . channel is not None and new_channel != self . channel :
self . connection . maybe_close_channel ( self . channel )
self . channel = new_channel
def close ( self ) :
""" Close/release this connection. """
self . _heartbeat_stop ( )
if self . connection :
self . _set_current_channel ( None )
self . connection . release ( )
@ -716,10 +875,74 @@ class Connection(object):
def reset ( self ) :
""" Reset a connection so it can be used again. """
self . _set_current_channel ( self . connection . channel ( ) )
with self . _connection_lock :
self . _set_current_channel ( self . connection . channel ( ) )
self . consumers = [ ]
self . consumer_num = itertools . count ( 1 )
def _heartbeat_supported_and_enabled ( self ) :
if self . driver_conf . heartbeat_timeout_threshold < = 0 :
return False
if self . connection . supports_heartbeats :
return True
elif not self . _heartbeat_support_log_emitted :
LOG . warn ( _LW ( " Heartbeat support requested but it is not supported "
" by the kombu driver or the broker " ) )
self . _heartbeat_support_log_emitted = True
return False
def _heartbeat_start ( self ) :
if self . _heartbeat_supported_and_enabled ( ) :
self . _heartbeat_exit_event = threading . Event ( )
self . _heartbeat_thread = threading . Thread (
target = self . _heartbeat_thread_job )
self . _heartbeat_thread . daemon = True
self . _heartbeat_thread . start ( )
else :
self . _heartbeat_thread = None
def _heartbeat_stop ( self ) :
if self . _heartbeat_thread is not None :
self . _heartbeat_exit_event . set ( )
self . _heartbeat_thread . join ( )
self . _heartbeat_thread = None
def _heartbeat_thread_job ( self ) :
""" Thread that maintains inactive connections
"""
while not self . _heartbeat_exit_event . is_set ( ) :
with self . _connection_lock . for_heartbeat ( ) :
recoverable_errors = (
self . connection . recoverable_channel_errors +
self . connection . recoverable_connection_errors )
try :
try :
self . connection . heartbeat_check (
rate = self . driver_conf . heartbeat_rate )
# NOTE(sileht): We need to drain event to receive
# heartbeat from the broker but don't hold the
# connection too much times. In amqpdriver a connection
# is used exclusivly for read or for write, so we have
# to do this for connection used for write drain_events
# already do that for other connection
try :
self . connection . drain_events ( timeout = 0.001 )
except socket . timeout :
pass
except recoverable_errors as exc :
LOG . info ( _LI ( " A recoverable connection/channel error "
" occurs, try to reconnect: %s " ) , exc )
except Exception :
LOG . exception ( _LE ( " Unexpected error during heartbeart "
" thread processing, retrying... " ) )
self . _heartbeat_exit_event . wait (
timeout = self . _heartbeat_wait_timeout )
self . _heartbeat_exit_event . clear ( )
def declare_consumer ( self , consumer_cls , topic , callback ) :
""" Create a Consumer using the class that was passed in and
add it to our list of consumers
@ -736,10 +959,14 @@ class Connection(object):
self . consumers . append ( consumer )
return consumer
return self . ensure ( _connect_error , _declare_consumer )
with self . _connection_lock :
return self . ensure ( _connect_error , _declare_consumer )
def iterconsume ( self , limit = None , timeout = None ) :
""" Return an iterator that will consume from all queues/consumers. """
""" Return an iterator that will consume from all queues/consumers.
NOTE ( sileht ) : Must be called within the connection lock
"""
timer = rpc_common . DecayingTimer ( duration = timeout )
timer . start ( )
@ -770,6 +997,9 @@ class Connection(object):
self . _consume_loop_stopped = False
raise StopIteration
if self . _heartbeat_supported_and_enabled ( ) :
self . connection . heartbeat_check (
rate = self . driver_conf . heartbeat_rate )
try :
return self . connection . drain_events ( timeout = poll_timeout )
except socket . timeout as exc :
@ -795,7 +1025,8 @@ class Connection(object):
* * kwargs )
publisher . send ( msg , timeout )
self . ensure ( _error_callback , _publish , retry = retry )
with self . _connection_lock :
self . ensure ( _error_callback , _publish , retry = retry )
def declare_direct_consumer ( self , topic , callback ) :
""" Create a ' direct ' queue.
@ -861,12 +1092,13 @@ class Connection(object):
def consume ( self , limit = None , timeout = None ) :
""" Consume from all queues/consumers. """
it = self . iterconsume ( limit = limit , timeout = timeout )
while True :
try :
six . next ( it )
except StopIteration :
return
with self . _connection_lock :
it = self . iterconsume ( limit = limit , timeout = timeout )
while True :
try :
six . next ( it )
except StopIteration :
return
def stop_consuming ( self ) :
self . _consume_loop_stopped = True