diff --git a/oslo/messaging/_drivers/impl_zmq.py b/oslo/messaging/_drivers/impl_zmq.py index 5eae1af27..dea2cadac 100644 --- a/oslo/messaging/_drivers/impl_zmq.py +++ b/oslo/messaging/_drivers/impl_zmq.py @@ -27,6 +27,7 @@ import eventlet import greenlet from oslo.config import cfg +from oslo.messaging._drivers import base from oslo.messaging._drivers import common as rpc_common from oslo.messaging.openstack.common import excutils from oslo.messaging.openstack.common import importutils @@ -54,7 +55,7 @@ zmq_opts = [ # The module.Class to use for matchmaking. cfg.StrOpt( 'rpc_zmq_matchmaker', - default=('openstack.common.rpc.' + default=('oslo.messaging._drivers.' 'matchmaker.MatchMakerLocalhost'), help='MatchMaker driver', ), @@ -78,9 +79,7 @@ zmq_opts = [ 'IP address. Must match "host" option, if running Nova.') ] - CONF = cfg.CONF -CONF.register_opts(zmq_opts) ZMQ_CTX = None # ZeroMQ Context, must be global. matchmaker = None # memoized matchmaker object @@ -818,3 +817,72 @@ def _get_matchmaker(*args, **kwargs): orig=CONF.rpc_zmq_matchmaker, new=mm)) matchmaker = importutils.import_object(mm, *args, **kwargs) return matchmaker + + +class ZmqDriver(base.BaseDriver): + + # FIXME(markmc): allow this driver to be used without eventlet + + def __init__(self, conf, url, default_exchange=None, + allowed_remote_exmods=[]): + conf.register_opts(zmq_opts) + + super(ZmqDriver, self).__init__(conf, url, default_exchange, + allowed_remote_exmods) + + # FIXME(markmc): handle default_exchange + + # FIXME(markmc): handle transport URL + if self._url.hosts: + raise NotImplementedError('The ZeroMQ driver does not yet support ' + 'transport URLs') + + # FIXME(markmc): use self.conf everywhere + if self.conf is not CONF: + raise NotImplementedError('The ZeroMQ driver currently only works ' + 'with oslo.config.cfg.CONF') + + def _send(self, target, ctxt, message, + wait_for_reply=None, timeout=None, envelope=True): + + # FIXME(markmc): remove this temporary hack + class Context(object): + def __init__(self, d): + self.d = d + + def to_dict(self): + return self.d + + context = Context(ctxt) + + if wait_for_reply: + method = _call + else: + method = _cast + + topic = target.topic + if target.fanout: + # NOTE(ewindisch): fanout~ is used because it avoid splitting on + # and acts as a non-subtle hint to the matchmaker and ZmqProxy. + topic = 'fanout~' + topic + + reply = _multi_send(method, context, topic, message, + envelope=envelope) + + if wait_for_reply: + return reply[-1] + + def send(self, target, ctxt, message, wait_for_reply=None, timeout=None): + return self._send(target, ctxt, message, wait_for_reply, timeout) + + def send_notification(self, target, ctxt, message, version): + # NOTE(ewindisch): dot-priority in rpc notifier does not + # work with our assumptions. + target = target(topic=target.topic.replace('.', '-')) + return self._send(target, ctxt, message, envelope=(version == 2.0)) + + def listen(self, target): + raise NotImplementedError() + + def cleanup(self): + cleanup()