@ -11,15 +11,15 @@
# License for the specific language governing permissions and limitations
# under the License.
import inspect
import logging
from retrying import retry
from oslo_config import cfg
from oslo_utils import importutils
from oslo_messaging . _drivers . zmq_driver . matchmaker import base
from oslo_messaging . _drivers . zmq_driver . matchmaker import zmq_matchmaker_ base
from oslo_messaging . _drivers . zmq_driver import zmq_address
from retrying import retry
from oslo_messaging. _i18n import _LW
redis = importutils . try_import ( ' redis ' )
redis_sentinel = importutils . try_import ( ' redis.sentinel ' )
@ -53,10 +53,10 @@ matchmaker_redis_opts = [
default = ' oslo-messaging-zeromq ' ,
help = ' Redis replica set name. ' ) ,
cfg . IntOpt ( ' wait_timeout ' ,
default = 5 000,
default = 2 000,
help = ' Time in ms to wait between connection attempts. ' ) ,
cfg . IntOpt ( ' check_timeout ' ,
default = 6 0000,
default = 2 0000,
help = ' Time in ms to wait before the transaction is killed. ' ) ,
cfg . IntOpt ( ' socket_timeout ' ,
default = 10000 ,
@ -65,37 +65,52 @@ matchmaker_redis_opts = [
_PUBLISHERS_KEY = " PUBLISHERS "
_ROUTERS_KEY = " ROUTERS "
_RETRY_METHODS = ( " get_hosts " , " get_publishers " , " get_routers " )
def redis_connection_warn ( func ) :
def func_wrapper ( * args , * * kwargs ) :
try :
return func ( * args , * * kwargs )
except redis . ConnectionError :
LOG . warning ( _LW ( " Redis is currently not available. "
" Messages are being sent to known targets using "
" existing connections. But new nodes "
" can not be discovered until Redis is up "
" and running. " ) )
raise zmq_matchmaker_base . MatchmakerUnavailable ( )
return func_wrapper
def no_reraise ( func ) :
def func_wrapper ( * args , * * kwargs ) :
try :
return func ( * args , * * kwargs )
except zmq_matchmaker_base . MatchmakerUnavailable :
pass
return func_wrapper
def empty_list_on_error ( func ) :
def func_wrapper ( * args , * * kwargs ) :
try :
return func ( * args , * * kwargs )
except zmq_matchmaker_base . MatchmakerUnavailable :
return [ ]
return func_wrapper
def retry_if_connection_error ( ex ) :
return isinstance ( ex , redis . ConnectionError )
return isinstance ( ex , zmq_matchmaker_base. MatchmakerUnavailable )
def retry_if_empty ( hosts ) :
return not hosts
def apply_retrying ( obj , cfg ) :
for attr_name , attr in inspect . getmembers ( obj ) :
if not ( inspect . ismethod ( attr ) or inspect . isfunction ( attr ) ) :
continue
if attr_name in _RETRY_METHODS :
setattr (
obj ,
attr_name ,
retry (
wait_fixed = cfg . matchmaker_redis . wait_timeout ,
stop_max_delay = cfg . matchmaker_redis . check_timeout ,
retry_on_exception = retry_if_connection_error ,
retry_on_result = retry_if_empty
) ( attr ) )
class RedisMatchMaker ( base . MatchMakerBase ) :
class MatchmakerRedis ( zmq_matchmaker_base . MatchmakerBase ) :
def __init__ ( self , conf , * args , * * kwargs ) :
super ( RedisMatchMaker , self ) . __init__ ( conf , * args , * * kwargs )
super ( MatchmakerRedis , self ) . __init__ ( conf , * args , * * kwargs )
self . conf . register_opts ( matchmaker_redis_opts , " matchmaker_redis " )
self . sentinel_hosts = self . _extract_sentinel_options ( )
@ -117,7 +132,6 @@ class RedisMatchMaker(base.MatchMakerBase):
self . conf . matchmaker_redis . sentinel_group_name ,
socket_timeout = socket_timeout
)
apply_retrying ( self , self . conf )
def _extract_sentinel_options ( self ) :
if self . url and self . url . hosts :
@ -143,14 +157,20 @@ class RedisMatchMaker(base.MatchMakerBase):
if expire > 0 :
self . _redis . expire ( key , expire )
@no_reraise
@redis_connection_warn
def register_publisher ( self , hostname , expire = - 1 ) :
host_str = " , " . join ( hostname )
self . _add_key_with_expire ( _PUBLISHERS_KEY , host_str , expire )
@no_reraise
@redis_connection_warn
def unregister_publisher ( self , hostname ) :
host_str = " , " . join ( hostname )
self . _redis . srem ( _PUBLISHERS_KEY , host_str )
@empty_list_on_error
@redis_connection_warn
def get_publishers ( self ) :
hosts = [ ]
hosts . extend ( [ tuple ( host_str . split ( " , " ) )
@ -158,18 +178,25 @@ class RedisMatchMaker(base.MatchMakerBase):
self . _get_hosts_by_key ( _PUBLISHERS_KEY ) ] )
return hosts
@no_reraise
@redis_connection_warn
def register_router ( self , hostname , expire = - 1 ) :
self . _add_key_with_expire ( _ROUTERS_KEY , hostname , expire )
@no_reraise
@redis_connection_warn
def unregister_router ( self , hostname ) :
self . _redis . srem ( _ROUTERS_KEY , hostname )
@empty_list_on_error
@redis_connection_warn
def get_routers ( self ) :
return self . _get_hosts_by_key ( _ROUTERS_KEY )
def _get_hosts_by_key ( self , key ) :
return self . _redis . smembers ( key )
@redis_connection_warn
def register ( self , target , hostname , listener_type , expire = - 1 ) :
if target . topic and target . server :
key = zmq_address . target_to_key ( target , listener_type )
@ -179,6 +206,8 @@ class RedisMatchMaker(base.MatchMakerBase):
key = zmq_address . prefix_str ( target . topic , listener_type )
self . _add_key_with_expire ( key , hostname , expire )
@no_reraise
@redis_connection_warn
def unregister ( self , target , hostname , listener_type ) :
if target . topic and target . server :
key = zmq_address . target_to_key ( target , listener_type )
@ -188,9 +217,8 @@ class RedisMatchMaker(base.MatchMakerBase):
key = zmq_address . prefix_str ( target . topic , listener_type )
self . _redis . srem ( key , hostname )
@redis_connection_warn
def get_hosts ( self , target , listener_type ) :
LOG . debug ( " [Redis] get_hosts for target %s " , target )
hosts = [ ]
if target . topic and target . server :
@ -201,4 +229,39 @@ class RedisMatchMaker(base.MatchMakerBase):
key = zmq_address . prefix_str ( target . topic , listener_type )
hosts . extend ( self . _get_hosts_by_key ( key ) )
LOG . debug ( " [Redis] get_hosts for target %(target)s : %(hosts)s " ,
{ " target " : target , " hosts " : hosts } )
return hosts
def get_hosts_retry ( self , target , listener_type ) :
return self . _retry_method ( target , listener_type , self . get_hosts )
@redis_connection_warn
def get_hosts_fanout ( self , target , listener_type ) :
LOG . debug ( " [Redis] get_hosts for target %s " , target )
hosts = [ ]
if target . topic and target . server :
key = zmq_address . target_to_key ( target , listener_type )
hosts . extend ( self . _get_hosts_by_key ( key ) )
key = zmq_address . prefix_str ( target . topic , listener_type )
hosts . extend ( self . _get_hosts_by_key ( key ) )
return hosts
def get_hosts_fanout_retry ( self , target , listener_type ) :
return self . _retry_method ( target , listener_type , self . get_hosts_fanout )
def _retry_method ( self , target , listener_type , method ) :
conf = self . conf
@retry ( retry_on_result = retry_if_empty ,
wrap_exception = True ,
wait_fixed = conf . matchmaker_redis . wait_timeout ,
stop_max_delay = conf . matchmaker_redis . check_timeout )
def _get_hosts_retry ( target , listener_type ) :
return method ( target , listener_type )
return _get_hosts_retry ( target , listener_type )