diff --git a/oslo_messaging/_cmd/zmq_receiver.py b/oslo_messaging/_cmd/zmq_receiver.py index f259299f9..abd24e8d4 100644 --- a/oslo_messaging/_cmd/zmq_receiver.py +++ b/oslo_messaging/_cmd/zmq_receiver.py @@ -14,9 +14,6 @@ # License for the specific language governing permissions and limitations # under the License. -import eventlet -eventlet.monkey_patch() - import contextlib import logging import sys @@ -30,6 +27,9 @@ from oslo_messaging._executors import base # FIXME(markmc) CONF = cfg.CONF CONF.register_opts(impl_zmq.zmq_opts) CONF.register_opts(base._pool_opts) +# TODO(ozamiatin): Move this option assignment to an external config file +# Use efficient zmq poller in real-world deployment +CONF.rpc_zmq_native = True def main(): diff --git a/oslo_messaging/_drivers/base.py b/oslo_messaging/_drivers/base.py index 2051e9a30..1d2620825 100644 --- a/oslo_messaging/_drivers/base.py +++ b/oslo_messaging/_drivers/base.py @@ -75,8 +75,7 @@ class Listener(object): def cleanup(self): """Cleanup listener. - Close connection used by listener if any. For some listeners like - zmq there is no connection so no need to close connection. + Close connection (socket) used by listener if any. As this is listener specific method, overwrite it in to derived class if cleanup of listener required. """ diff --git a/oslo_messaging/_drivers/impl_zmq.py b/oslo_messaging/_drivers/impl_zmq.py index b75bf8f9c..fbd9f081f 100644 --- a/oslo_messaging/_drivers/impl_zmq.py +++ b/oslo_messaging/_drivers/impl_zmq.py @@ -45,9 +45,13 @@ zmq_opts = [ cfg.BoolOpt('rpc_zmq_all_req_rep', default=True, - deprecated_group='DEFAULT', help='Use REQ/REP pattern for all methods CALL/CAST/FANOUT.'), + cfg.BoolOpt('rpc_zmq_native', + default=False, + help='Switches ZeroMQ eventlet/threading way of usage.' + 'Affects pollers, executors etc.'), + # The following port is unassigned by IANA as of 2012-05-21 cfg.IntOpt('rpc_zmq_port', default=9501, help='ZeroMQ receiver listening port.'), diff --git a/oslo_messaging/_drivers/zmq_driver/broker/zmq_base_proxy.py b/oslo_messaging/_drivers/zmq_driver/broker/zmq_base_proxy.py index 59cd42a79..5443e6af0 100644 --- a/oslo_messaging/_drivers/zmq_driver/broker/zmq_base_proxy.py +++ b/oslo_messaging/_drivers/zmq_driver/broker/zmq_base_proxy.py @@ -41,7 +41,8 @@ class BaseProxy(object): super(BaseProxy, self).__init__() self.conf = conf self.context = context - self.executor = zmq_async.get_executor(self.run) + self.executor = zmq_async.get_executor( + self.run, native_zmq=conf.rpc_zmq_native) @abc.abstractmethod def run(self): @@ -132,9 +133,8 @@ class DirectBackendMatcher(BaseBackendMatcher): def _match_backend(self, message): topic = self._get_topic(message) ipc_address = self._get_ipc_address(topic) - if ipc_address not in self.backends: - self._create_backend(ipc_address) - return self.backend, topic + backend = self._create_backend(ipc_address) + return backend, topic @abc.abstractmethod def _get_topic(self, message): diff --git a/oslo_messaging/_drivers/zmq_driver/broker/zmq_broker.py b/oslo_messaging/_drivers/zmq_driver/broker/zmq_broker.py index 08c5d7f79..e3835bae6 100644 --- a/oslo_messaging/_drivers/zmq_driver/broker/zmq_broker.py +++ b/oslo_messaging/_drivers/zmq_driver/broker/zmq_broker.py @@ -24,8 +24,6 @@ from oslo_messaging._i18n import _LE, _LI LOG = logging.getLogger(__name__) -zmq = zmq_async.import_zmq() - class ZmqBroker(object): """Local messaging IPC broker (nodes are still peers). @@ -42,6 +40,7 @@ class ZmqBroker(object): def __init__(self, conf): super(ZmqBroker, self).__init__() + zmq = zmq_async.import_zmq(native_zmq=conf.rpc_zmq_native) self.conf = conf self.context = zmq.Context() proxy = zmq_universal_proxy.UniversalProxy(conf, self.context) diff --git a/oslo_messaging/_drivers/zmq_driver/broker/zmq_call_proxy.py b/oslo_messaging/_drivers/zmq_driver/broker/zmq_call_proxy.py index 57c7d80e8..a2150fa30 100644 --- a/oslo_messaging/_drivers/zmq_driver/broker/zmq_call_proxy.py +++ b/oslo_messaging/_drivers/zmq_driver/broker/zmq_call_proxy.py @@ -46,12 +46,11 @@ class CallProxy(base_proxy.BaseProxy): class DealerBackend(base_proxy.DirectBackendMatcher): - def __init__(self, conf, context): - super(DealerBackend, self).__init__(conf, - zmq_async.get_poller(), - context) - self.backend = self.context.socket(zmq.DEALER) - self.poller.register(self.backend) + def __init__(self, conf, context, poller=None): + if poller is None: + poller = zmq_async.get_poller( + native_zmq=conf.rpc_zmq_native) + super(DealerBackend, self).__init__(conf, poller, context) def receive_outgoing_reply(self): reply_message = self.poller.poll(1) @@ -71,16 +70,22 @@ class DealerBackend(base_proxy.DirectBackendMatcher): backend.send_multipart(message) def _create_backend(self, ipc_address): - self.backend.connect(ipc_address) - self.backends[str(ipc_address)] = True + if ipc_address in self.backends: + return self.backends[ipc_address] + backend = self.context.socket(zmq.DEALER) + backend.connect(ipc_address) + self.poller.register(backend) + self.backends[ipc_address] = backend + return backend class FrontendTcpRouter(base_proxy.BaseTcpFrontend): - def __init__(self, conf, context): - super(FrontendTcpRouter, self).__init__(conf, - zmq_async.get_poller(), - context, + def __init__(self, conf, context, poller=None): + if poller is None: + poller = zmq_async.get_poller( + native_zmq=conf.rpc_zmq_native) + super(FrontendTcpRouter, self).__init__(conf, poller, context, socket_type=zmq.ROUTER, port_number=conf.rpc_zmq_port) diff --git a/oslo_messaging/_drivers/zmq_driver/broker/zmq_cast_proxy.py b/oslo_messaging/_drivers/zmq_driver/broker/zmq_cast_proxy.py index 8eef8befc..9779779df 100644 --- a/oslo_messaging/_drivers/zmq_driver/broker/zmq_cast_proxy.py +++ b/oslo_messaging/_drivers/zmq_driver/broker/zmq_cast_proxy.py @@ -42,8 +42,8 @@ class CastProxy(base_proxy.BaseProxy): class FrontendTcpPull(base_proxy.BaseTcpFrontend): def __init__(self, conf, context): - super(FrontendTcpPull, self).__init__(conf, zmq_async.get_poller(), - context) + poller = zmq_async.get_poller(native_zmq=conf.rpc_zmq_native) + super(FrontendTcpPull, self).__init__(conf, poller, context) self.frontend = self.context.socket(zmq.PULL) address = zmq_topic.get_tcp_bind_address(conf.rpc_zmq_fanout_port) LOG.info(_LI("Binding to TCP PULL %s") % address) @@ -58,9 +58,8 @@ class FrontendTcpPull(base_proxy.BaseTcpFrontend): class CastPushBackendMatcher(base_proxy.BaseBackendMatcher): def __init__(self, conf, context): - super(CastPushBackendMatcher, self).__init__(conf, - zmq_async.get_poller(), - context) + poller = zmq_async.get_poller(native_zmq=conf.rpc_zmq_native) + super(CastPushBackendMatcher, self).__init__(conf, poller, context) self.backend = self.context.socket(zmq.PUSH) def _get_topic(self, message): diff --git a/oslo_messaging/_drivers/zmq_driver/broker/zmq_fanout_proxy.py b/oslo_messaging/_drivers/zmq_driver/broker/zmq_fanout_proxy.py index 131101661..8d1f8b185 100644 --- a/oslo_messaging/_drivers/zmq_driver/broker/zmq_fanout_proxy.py +++ b/oslo_messaging/_drivers/zmq_driver/broker/zmq_fanout_proxy.py @@ -24,9 +24,8 @@ zmq = zmq_async.import_zmq() class PublisherBackend(base_proxy.BaseBackendMatcher): def __init__(self, conf, context): - super(PublisherBackend, self).__init__(conf, - zmq_async.get_poller(), - context) + poller = zmq_async.get_poller(native_zmq=conf.rpc_zmq_native) + super(PublisherBackend, self).__init__(conf, poller, context) self.backend = self.context.socket(zmq.PUB) self.backend.bind(zmq_topic.get_ipc_address_fanout(conf)) diff --git a/oslo_messaging/_drivers/zmq_driver/broker/zmq_universal_proxy.py b/oslo_messaging/_drivers/zmq_driver/broker/zmq_universal_proxy.py index 1d8982d41..c57a60f9f 100644 --- a/oslo_messaging/_drivers/zmq_driver/broker/zmq_universal_proxy.py +++ b/oslo_messaging/_drivers/zmq_driver/broker/zmq_universal_proxy.py @@ -17,6 +17,7 @@ import logging import oslo_messaging._drivers.zmq_driver.broker.zmq_base_proxy as base_proxy from oslo_messaging._drivers.zmq_driver.broker import zmq_call_proxy from oslo_messaging._drivers.zmq_driver.broker import zmq_fanout_proxy +from oslo_messaging._drivers.zmq_driver import zmq_async from oslo_messaging._drivers.zmq_driver import zmq_serializer from oslo_messaging._i18n import _LI @@ -27,27 +28,34 @@ class UniversalProxy(base_proxy.BaseProxy): def __init__(self, conf, context): super(UniversalProxy, self).__init__(conf, context) - self.tcp_frontend = zmq_call_proxy.FrontendTcpRouter(conf, context) - self.backend_matcher = BackendMatcher(conf, context) + self.poller = zmq_async.get_poller( + native_zmq=conf.rpc_zmq_native) + self.tcp_frontend = zmq_call_proxy.FrontendTcpRouter( + conf, context, poller=self.poller) + self.backend_matcher = BackendMatcher( + conf, context, poller=self.poller) call = zmq_serializer.CALL_TYPE self.call_backend = self.backend_matcher.backends[call] LOG.info(_LI("Starting universal-proxy thread")) def run(self): - message = self.tcp_frontend.receive_incoming() - if message is not None: - self.backend_matcher.redirect_to_backend(message) + message, socket = self.poller.poll(self.conf.rpc_poll_timeout) + if message is None: + return - reply, socket = self.call_backend.receive_outgoing_reply() - if reply is not None: - self.tcp_frontend.redirect_outgoing_reply(reply) + LOG.info(_LI("Received message at universal proxy: %s") % str(message)) + + if socket == self.tcp_frontend.frontend: + self.backend_matcher.redirect_to_backend(message) + else: + self.tcp_frontend.redirect_outgoing_reply(message) class BackendMatcher(base_proxy.BaseBackendMatcher): - def __init__(self, conf, context): - super(BackendMatcher, self).__init__(conf, None, context) - direct_backend = zmq_call_proxy.DealerBackend(conf, context) + def __init__(self, conf, context, poller=None): + super(BackendMatcher, self).__init__(conf, poller, context) + direct_backend = zmq_call_proxy.DealerBackend(conf, context, poller) self.backends[zmq_serializer.CALL_TYPE] = direct_backend self.backends[zmq_serializer.CAST_TYPE] = direct_backend fanout_backend = zmq_fanout_proxy.PublisherBackend(conf, context) diff --git a/oslo_messaging/_drivers/zmq_driver/poller/threading_poller.py b/oslo_messaging/_drivers/zmq_driver/poller/threading_poller.py index e4317c487..db9c1463a 100644 --- a/oslo_messaging/_drivers/zmq_driver/poller/threading_poller.py +++ b/oslo_messaging/_drivers/zmq_driver/poller/threading_poller.py @@ -15,36 +15,60 @@ import logging import threading +from oslo_utils import eventletutils import zmq from oslo_messaging._drivers.zmq_driver import zmq_poller LOG = logging.getLogger(__name__) +_threading = threading + +if eventletutils.EVENTLET_AVAILABLE: + import eventlet + _threading = eventlet.patcher.original('threading') + class ThreadingPoller(zmq_poller.ZmqPoller): def __init__(self): self.poller = zmq.Poller() + self.recv_methods = {} - def register(self, socket): - self.poller.register(socket, zmq.POLLOUT) + def register(self, socket, recv_method=None): + if recv_method is not None: + self.recv_methods[socket] = recv_method + self.poller.register(socket, zmq.POLLIN) def poll(self, timeout=None): - socks = dict(self.poller.poll(timeout)) - for socket in socks: - incoming = socket.recv() - return incoming + timeout = timeout * 1000 # zmq poller waits milliseconds + sockets = dict(self.poller.poll(timeout=timeout)) + if not sockets: + return None, None + for socket in sockets: + if socket in self.recv_methods: + return self.recv_methods[socket](socket) + else: + return socket.recv_multipart(), socket class ThreadingExecutor(zmq_poller.Executor): def __init__(self, method): - thread = threading.Thread(target=method) - super(ThreadingExecutor, self).__init__(thread) + self._method = method + super(ThreadingExecutor, self).__init__( + _threading.Thread(target=self._loop)) + self._stop = _threading.Event() + + def _loop(self): + while not self._stop.is_set(): + self._method() def execute(self): self.thread.start() + def stop(self): + self._stop.set() + def wait(self): self.thread.join() diff --git a/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_call_request.py b/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_call_request.py index d66e5d4de..47a87d1d2 100644 --- a/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_call_request.py +++ b/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_call_request.py @@ -60,8 +60,9 @@ class CallRequest(Request): raise oslo_messaging.MessagingTimeout( "Timeout %s seconds was reached" % self.timeout) - if reply['failure']: + if reply[zmq_serializer.FIELD_FAILURE]: raise rpc_common.deserialize_remote_exception( - reply['failure'], self.allowed_remote_exmods) + reply[zmq_serializer.FIELD_FAILURE], + self.allowed_remote_exmods) else: - return reply['reply'] + return reply[zmq_serializer.FIELD_REPLY] diff --git a/oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_call_responder.py b/oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_call_responder.py index 59b46e535..7f7ec57a3 100644 --- a/oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_call_responder.py +++ b/oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_call_responder.py @@ -19,6 +19,7 @@ from oslo_messaging._drivers import base from oslo_messaging._drivers import common as rpc_common from oslo_messaging._drivers.zmq_driver.rpc.server import zmq_base_consumer from oslo_messaging._drivers.zmq_driver import zmq_async +from oslo_messaging._drivers.zmq_driver import zmq_serializer from oslo_messaging._drivers.zmq_driver import zmq_topic as topic_utils from oslo_messaging._i18n import _LE @@ -41,9 +42,9 @@ class ZmqIncomingRequest(base.IncomingMessage): if failure is not None: failure = rpc_common.serialize_remote_exception(failure, log_failure) - message_reply = {u'reply': reply, - u'failure': failure, - u'log_failure': log_failure} + message_reply = {zmq_serializer.FIELD_REPLY: reply, + zmq_serializer.FIELD_FAILURE: failure, + zmq_serializer.FIELD_LOG_FAILURE: log_failure} LOG.debug("Replying %s REP", (str(message_reply))) self.received = True self.reply_socket.send(self.reply_id, zmq.SNDMORE) diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_async.py b/oslo_messaging/_drivers/zmq_driver/zmq_async.py index 3694d0f5a..261746392 100644 --- a/oslo_messaging/_drivers/zmq_driver/zmq_async.py +++ b/oslo_messaging/_drivers/zmq_driver/zmq_async.py @@ -23,8 +23,12 @@ LOG = logging.getLogger(__name__) green_zmq = importutils.try_import('eventlet.green.zmq') -def import_zmq(): - imported_zmq = green_zmq or importutils.try_import('zmq') +def import_zmq(native_zmq=False): + if native_zmq: + imported_zmq = importutils.try_import('zmq') + else: + imported_zmq = green_zmq or importutils.try_import('zmq') + if imported_zmq is None: errmsg = _LE("ZeroMQ not found!") LOG.error(errmsg) @@ -32,28 +36,28 @@ def import_zmq(): return imported_zmq -def get_poller(): - if green_zmq: +def get_poller(native_zmq=False): + if native_zmq or green_zmq is None: + from oslo_messaging._drivers.zmq_driver.poller import threading_poller + return threading_poller.ThreadingPoller() + else: from oslo_messaging._drivers.zmq_driver.poller import green_poller return green_poller.GreenPoller() - else: + + +def get_reply_poller(native_zmq=False): + if native_zmq or green_zmq is None: from oslo_messaging._drivers.zmq_driver.poller import threading_poller return threading_poller.ThreadingPoller() - - -def get_reply_poller(): - if green_zmq: + else: from oslo_messaging._drivers.zmq_driver.poller import green_poller return green_poller.HoldReplyPoller() - else: + + +def get_executor(method, native_zmq=False): + if native_zmq or green_zmq is None: from oslo_messaging._drivers.zmq_driver.poller import threading_poller - return threading_poller.ThreadingPoller() - - -def get_executor(method): - if green_zmq is not None: + return threading_poller.ThreadingExecutor(method) + else: from oslo_messaging._drivers.zmq_driver.poller import green_poller return green_poller.GreenExecutor(method) - else: - from oslo_messaging._drivers.zmq_driver.poller import threading_poller - return threading_poller.ThreadingExecutor() diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_serializer.py b/oslo_messaging/_drivers/zmq_driver/zmq_serializer.py index 64145ab4f..ef422ff8d 100644 --- a/oslo_messaging/_drivers/zmq_driver/zmq_serializer.py +++ b/oslo_messaging/_drivers/zmq_driver/zmq_serializer.py @@ -26,6 +26,10 @@ LOG = logging.getLogger(__name__) MESSAGE_CALL_TYPE_POSITION = 2 MESSAGE_CALL_TOPIC_POSITION = 3 +FIELD_FAILURE = 'failure' +FIELD_REPLY = 'reply' +FIELD_LOG_FAILURE = 'log_failure' + CALL_TYPE = 'call' CAST_TYPE = 'cast' FANOUT_TYPE = 'fanout' diff --git a/oslo_messaging/tests/drivers/test_impl_zmq.py b/oslo_messaging/tests/drivers/test_impl_zmq.py index a2499ce10..dd6df4448 100644 --- a/oslo_messaging/tests/drivers/test_impl_zmq.py +++ b/oslo_messaging/tests/drivers/test_impl_zmq.py @@ -150,7 +150,7 @@ class TestZmqBasics(ZmqBaseTestCase): target, {}, {'method': 'hello-world', 'tx_id': 1}, wait_for_reply=True) - self.assertIsNotNone(result) + self.assertTrue(result) def test_send_noreply(self): """Cast() with topic.""" diff --git a/setup-test-env-zmq.sh b/setup-test-env-zmq.sh index c3c8e33c1..b27ee9d3f 100755 --- a/setup-test-env-zmq.sh +++ b/setup-test-env-zmq.sh @@ -24,7 +24,4 @@ redis-server --port $ZMQ_REDIS_PORT & oslo-messaging-zmq-receiver --config-file ${DATADIR}/zmq.conf > ${DATADIR}/receiver.log 2>&1 & -# FIXME(sileht): This does the same kind of setup that devstack does -# But this doesn't work yet, a zeromq maintener should take a look on that - $*