Implement the client side of ZmqDriver
Get sending working with an initial version of the driver. There's a bunch of FIXMEs inline reflecting that even the client side needs a tonne of work yet. Change-Id: I6d69ebc9ae3b3999832209e0c4100ffe26e35919
This commit is contained in:
parent
ff3a4155bf
commit
5d3fc9c18c
@ -27,6 +27,7 @@ import eventlet
|
|||||||
import greenlet
|
import greenlet
|
||||||
from oslo.config import cfg
|
from oslo.config import cfg
|
||||||
|
|
||||||
|
from oslo.messaging._drivers import base
|
||||||
from oslo.messaging._drivers import common as rpc_common
|
from oslo.messaging._drivers import common as rpc_common
|
||||||
from oslo.messaging.openstack.common import excutils
|
from oslo.messaging.openstack.common import excutils
|
||||||
from oslo.messaging.openstack.common import importutils
|
from oslo.messaging.openstack.common import importutils
|
||||||
@ -54,7 +55,7 @@ zmq_opts = [
|
|||||||
# The module.Class to use for matchmaking.
|
# The module.Class to use for matchmaking.
|
||||||
cfg.StrOpt(
|
cfg.StrOpt(
|
||||||
'rpc_zmq_matchmaker',
|
'rpc_zmq_matchmaker',
|
||||||
default=('openstack.common.rpc.'
|
default=('oslo.messaging._drivers.'
|
||||||
'matchmaker.MatchMakerLocalhost'),
|
'matchmaker.MatchMakerLocalhost'),
|
||||||
help='MatchMaker driver',
|
help='MatchMaker driver',
|
||||||
),
|
),
|
||||||
@ -78,9 +79,7 @@ zmq_opts = [
|
|||||||
'IP address. Must match "host" option, if running Nova.')
|
'IP address. Must match "host" option, if running Nova.')
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
CONF = cfg.CONF
|
CONF = cfg.CONF
|
||||||
CONF.register_opts(zmq_opts)
|
|
||||||
|
|
||||||
ZMQ_CTX = None # ZeroMQ Context, must be global.
|
ZMQ_CTX = None # ZeroMQ Context, must be global.
|
||||||
matchmaker = None # memoized matchmaker object
|
matchmaker = None # memoized matchmaker object
|
||||||
@ -818,3 +817,72 @@ def _get_matchmaker(*args, **kwargs):
|
|||||||
orig=CONF.rpc_zmq_matchmaker, new=mm))
|
orig=CONF.rpc_zmq_matchmaker, new=mm))
|
||||||
matchmaker = importutils.import_object(mm, *args, **kwargs)
|
matchmaker = importutils.import_object(mm, *args, **kwargs)
|
||||||
return matchmaker
|
return matchmaker
|
||||||
|
|
||||||
|
|
||||||
|
class ZmqDriver(base.BaseDriver):
|
||||||
|
|
||||||
|
# FIXME(markmc): allow this driver to be used without eventlet
|
||||||
|
|
||||||
|
def __init__(self, conf, url, default_exchange=None,
|
||||||
|
allowed_remote_exmods=[]):
|
||||||
|
conf.register_opts(zmq_opts)
|
||||||
|
|
||||||
|
super(ZmqDriver, self).__init__(conf, url, default_exchange,
|
||||||
|
allowed_remote_exmods)
|
||||||
|
|
||||||
|
# FIXME(markmc): handle default_exchange
|
||||||
|
|
||||||
|
# FIXME(markmc): handle transport URL
|
||||||
|
if self._url.hosts:
|
||||||
|
raise NotImplementedError('The ZeroMQ driver does not yet support '
|
||||||
|
'transport URLs')
|
||||||
|
|
||||||
|
# FIXME(markmc): use self.conf everywhere
|
||||||
|
if self.conf is not CONF:
|
||||||
|
raise NotImplementedError('The ZeroMQ driver currently only works '
|
||||||
|
'with oslo.config.cfg.CONF')
|
||||||
|
|
||||||
|
def _send(self, target, ctxt, message,
|
||||||
|
wait_for_reply=None, timeout=None, envelope=True):
|
||||||
|
|
||||||
|
# FIXME(markmc): remove this temporary hack
|
||||||
|
class Context(object):
|
||||||
|
def __init__(self, d):
|
||||||
|
self.d = d
|
||||||
|
|
||||||
|
def to_dict(self):
|
||||||
|
return self.d
|
||||||
|
|
||||||
|
context = Context(ctxt)
|
||||||
|
|
||||||
|
if wait_for_reply:
|
||||||
|
method = _call
|
||||||
|
else:
|
||||||
|
method = _cast
|
||||||
|
|
||||||
|
topic = target.topic
|
||||||
|
if target.fanout:
|
||||||
|
# NOTE(ewindisch): fanout~ is used because it avoid splitting on
|
||||||
|
# and acts as a non-subtle hint to the matchmaker and ZmqProxy.
|
||||||
|
topic = 'fanout~' + topic
|
||||||
|
|
||||||
|
reply = _multi_send(method, context, topic, message,
|
||||||
|
envelope=envelope)
|
||||||
|
|
||||||
|
if wait_for_reply:
|
||||||
|
return reply[-1]
|
||||||
|
|
||||||
|
def send(self, target, ctxt, message, wait_for_reply=None, timeout=None):
|
||||||
|
return self._send(target, ctxt, message, wait_for_reply, timeout)
|
||||||
|
|
||||||
|
def send_notification(self, target, ctxt, message, version):
|
||||||
|
# NOTE(ewindisch): dot-priority in rpc notifier does not
|
||||||
|
# work with our assumptions.
|
||||||
|
target = target(topic=target.topic.replace('.', '-'))
|
||||||
|
return self._send(target, ctxt, message, envelope=(version == 2.0))
|
||||||
|
|
||||||
|
def listen(self, target):
|
||||||
|
raise NotImplementedError()
|
||||||
|
|
||||||
|
def cleanup(self):
|
||||||
|
cleanup()
|
||||||
|
Loading…
x
Reference in New Issue
Block a user