Rework zmq setup and cleanup
Register config options at import time and remove the register_opts() function. It seems this isn't being used. Add accessor methods for ZMQ_CTXT and matchmaker so that they are lazily created rather than created at import time. Remove the rpc_zmq_matchmaker override in the tests since MatchMakerLocalhost is the default anyway. We were setting it with an incorrect value, but only after the matchmaker had already been created. Change-Id: I2520252c64d70e4c4903e34d07952fed43e70ebe
This commit is contained in:
parent
9827528b14
commit
4a6ef49850
|
@ -76,9 +76,9 @@ zmq_opts = [
|
|||
]
|
||||
|
||||
|
||||
# These globals are defined in register_opts(conf),
|
||||
# a mandatory initialization call
|
||||
CONF = None
|
||||
CONF = cfg.CONF
|
||||
CONF.register_opts(zmq_opts)
|
||||
|
||||
ZMQ_CTX = None # ZeroMQ Context, must be global.
|
||||
matchmaker = None # memoized matchmaker object
|
||||
|
||||
|
@ -113,7 +113,7 @@ class ZmqSocket(object):
|
|||
"""
|
||||
|
||||
def __init__(self, addr, zmq_type, bind=True, subscribe=None):
|
||||
self.sock = ZMQ_CTX.socket(zmq_type)
|
||||
self.sock = _get_ctxt().socket(zmq_type)
|
||||
self.addr = addr
|
||||
self.type = zmq_type
|
||||
self.subscriptions = []
|
||||
|
@ -685,7 +685,7 @@ def _multi_send(method, context, topic, msg, timeout=None, serialize=True,
|
|||
conf = CONF
|
||||
LOG.debug(_("%(msg)s") % {'msg': ' '.join(map(pformat, (topic, msg)))})
|
||||
|
||||
queues = matchmaker.queues(topic)
|
||||
queues = _get_matchmaker().queues(topic)
|
||||
LOG.debug(_("Sending message(s) to: %s"), queues)
|
||||
|
||||
# Don't stack if we have no matchmaker results
|
||||
|
@ -753,32 +753,26 @@ def notify(conf, context, topic, msg, **kwargs):
|
|||
def cleanup():
|
||||
"""Clean up resources in use by implementation."""
|
||||
global ZMQ_CTX
|
||||
global matchmaker
|
||||
matchmaker = None
|
||||
ZMQ_CTX.term()
|
||||
if ZMQ_CTX:
|
||||
ZMQ_CTX.term()
|
||||
ZMQ_CTX = None
|
||||
|
||||
|
||||
def register_opts(conf):
|
||||
"""Registration of options for this driver."""
|
||||
#NOTE(ewindisch): ZMQ_CTX and matchmaker
|
||||
# are initialized here as this is as good
|
||||
# an initialization method as any.
|
||||
|
||||
# We memoize through these globals
|
||||
global ZMQ_CTX
|
||||
global matchmaker
|
||||
global CONF
|
||||
matchmaker = None
|
||||
|
||||
if not CONF:
|
||||
conf.register_opts(zmq_opts)
|
||||
CONF = conf
|
||||
# Don't re-set, if this method is called twice.
|
||||
|
||||
def _get_ctxt():
|
||||
global ZMQ_CTX
|
||||
if not ZMQ_CTX:
|
||||
ZMQ_CTX = zmq.Context(conf.rpc_zmq_contexts)
|
||||
ZMQ_CTX = zmq.Context(CONF.rpc_zmq_contexts)
|
||||
return ZMQ_CTX
|
||||
|
||||
|
||||
def _get_matchmaker():
|
||||
global matchmaker
|
||||
if not matchmaker:
|
||||
# rpc_zmq_matchmaker should be set to a 'module.Class'
|
||||
mm_path = conf.rpc_zmq_matchmaker.split('.')
|
||||
mm_path = CONF.rpc_zmq_matchmaker.split('.')
|
||||
mm_module = '.'.join(mm_path[:-1])
|
||||
mm_class = mm_path[-1]
|
||||
|
||||
|
@ -791,6 +785,4 @@ def register_opts(conf):
|
|||
mm_impl = importutils.import_module(mm_module)
|
||||
mm_constructor = getattr(mm_impl, mm_class)
|
||||
matchmaker = mm_constructor()
|
||||
|
||||
|
||||
register_opts(cfg.CONF)
|
||||
return matchmaker
|
||||
|
|
|
@ -59,7 +59,6 @@ class _RpcZmqBaseTestCase(common.BaseRpcTestCase):
|
|||
self.config(rpc_zmq_bind_address='127.0.0.1')
|
||||
self.config(rpc_zmq_host='127.0.0.1')
|
||||
self.config(rpc_response_timeout=5)
|
||||
self.config(rpc_zmq_matchmaker='mod_matchmaker.MatchMakerLocalhost')
|
||||
|
||||
# We'll change this if we detect no daemon running.
|
||||
ipc_dir = FLAGS.rpc_zmq_ipc_dir
|
||||
|
|
Loading…
Reference in New Issue