diff --git a/oslo_messaging/_cmd/__init__.py b/oslo_messaging/_cmd/__init__.py new file mode 100644 index 000000000..8b1378917 --- /dev/null +++ b/oslo_messaging/_cmd/__init__.py @@ -0,0 +1 @@ + diff --git a/oslo_messaging/_cmd/zmq_broker.py b/oslo_messaging/_cmd/zmq_broker.py new file mode 100644 index 000000000..8b4220524 --- /dev/null +++ b/oslo_messaging/_cmd/zmq_broker.py @@ -0,0 +1,42 @@ +# Copyright 2015 Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import contextlib +import logging +import sys + +from oslo_config import cfg + +from oslo_messaging._drivers import impl_zmq +from oslo_messaging._drivers.zmq_driver.broker import zmq_broker +from oslo_messaging._executors import impl_pooledexecutor + +CONF = cfg.CONF +CONF.register_opts(impl_zmq.zmq_opts) +CONF.register_opts(impl_pooledexecutor._pool_opts) +# TODO(ozamiatin): Move this option assignment to an external config file +# Use efficient zmq poller in real-world deployment +CONF.rpc_zmq_native = True + + +def main(): + CONF(sys.argv[1:], project='oslo') + logging.basicConfig(level=logging.DEBUG) + + with contextlib.closing(zmq_broker.ZmqBroker(CONF)) as reactor: + reactor.start() + reactor.wait() + +if __name__ == "__main__": + main() diff --git a/oslo_messaging/_drivers/impl_zmq.py b/oslo_messaging/_drivers/impl_zmq.py index d30dc283c..0213bdaf9 100644 --- a/oslo_messaging/_drivers/impl_zmq.py +++ b/oslo_messaging/_drivers/impl_zmq.py @@ -80,6 +80,10 @@ zmq_opts = [ default=1, help='The default number of seconds that poll should wait. ' 'Poll raises timeout exception when timeout expired.'), + + cfg.BoolOpt('zmq_use_broker', + default=True, + help='Shows whether zmq-messaging uses broker or not.') ] diff --git a/oslo_messaging/_drivers/zmq_driver/broker/__init__.py b/oslo_messaging/_drivers/zmq_driver/broker/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/oslo_messaging/_drivers/zmq_driver/broker/zmq_base_proxy.py b/oslo_messaging/_drivers/zmq_driver/broker/zmq_base_proxy.py new file mode 100644 index 000000000..c309474f8 --- /dev/null +++ b/oslo_messaging/_drivers/zmq_driver/broker/zmq_base_proxy.py @@ -0,0 +1,53 @@ +# Copyright 2015 Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import abc +import logging + +import six + +from oslo_messaging._drivers.zmq_driver import zmq_async + +LOG = logging.getLogger(__name__) + + +@six.add_metaclass(abc.ABCMeta) +class BaseProxy(object): + + """Base TCP-proxy. + + TCP-proxy redirects messages received by TCP from clients to servers + over IPC. Consists of TCP-frontend and IPC-backend objects. Runs + in async executor. + """ + + def __init__(self, conf, context): + super(BaseProxy, self).__init__() + self.conf = conf + self.context = context + self.executor = zmq_async.get_executor(self.run, + zmq_concurrency='native') + + @abc.abstractmethod + def run(self): + """Main execution point of the proxy""" + + def start(self): + self.executor.execute() + + def stop(self): + self.executor.stop() + + def wait(self): + self.executor.wait() diff --git a/oslo_messaging/_drivers/zmq_driver/broker/zmq_broker.py b/oslo_messaging/_drivers/zmq_driver/broker/zmq_broker.py new file mode 100644 index 000000000..163d3c174 --- /dev/null +++ b/oslo_messaging/_drivers/zmq_driver/broker/zmq_broker.py @@ -0,0 +1,82 @@ +# Copyright 2015 Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import logging +import os + +from oslo_utils import excutils +import six +from stevedore import driver +import zmq + +from oslo_messaging._drivers.zmq_driver.broker import zmq_queue_proxy +from oslo_messaging._i18n import _LE, _LI + + +LOG = logging.getLogger(__name__) + + +class ZmqBroker(object): + """Local messaging IPC broker (nodes are still peers). + The main purpose is to have native zeromq application. + Benefits of such approach are following: + + 1. No risk to block the main thread of the process by unpatched + native parts of the libzmq (c-library is completely monkey-patch + unfriendly) + 2. Making use of standard zmq approaches as async pollers, + devices, queues etc. + 3. Possibility to implement queue persistence not touching existing + clients (staying in a separate process). + """ + + def __init__(self, conf): + super(ZmqBroker, self).__init__() + self.conf = conf + self._create_ipc_dirs() + self.matchmaker = driver.DriverManager( + 'oslo.messaging.zmq.matchmaker', + self.conf.rpc_zmq_matchmaker, + ).driver(self.conf) + + self.context = zmq.Context() + self.queue = six.moves.queue.Queue() + self.proxies = [zmq_queue_proxy.OutgoingQueueProxy( + conf, self.context, self.queue, self.matchmaker), + zmq_queue_proxy.IncomingQueueProxy( + conf, self.context, self.queue) + ] + + def _create_ipc_dirs(self): + ipc_dir = self.conf.rpc_zmq_ipc_dir + try: + os.makedirs("%s/fanout" % ipc_dir) + except os.error: + if not os.path.isdir(ipc_dir): + with excutils.save_and_reraise_exception(): + LOG.error(_LE("Required IPC directory does not exist at" + " %s"), ipc_dir) + + def start(self): + for proxy in self.proxies: + proxy.start() + + def wait(self): + for proxy in self.proxies: + proxy.wait() + + def close(self): + LOG.info(_LI("Broker shutting down ...")) + for proxy in self.proxies: + proxy.stop() diff --git a/oslo_messaging/_drivers/zmq_driver/broker/zmq_queue_proxy.py b/oslo_messaging/_drivers/zmq_driver/broker/zmq_queue_proxy.py new file mode 100644 index 000000000..c3c547fe5 --- /dev/null +++ b/oslo_messaging/_drivers/zmq_driver/broker/zmq_queue_proxy.py @@ -0,0 +1,78 @@ +# Copyright 2015 Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import logging + +import six +import zmq + +from oslo_messaging._drivers.zmq_driver.broker import zmq_base_proxy +from oslo_messaging._drivers.zmq_driver.client.publishers\ + import zmq_dealer_publisher +from oslo_messaging._drivers.zmq_driver import zmq_address +from oslo_messaging._drivers.zmq_driver import zmq_async +from oslo_messaging._i18n import _LI + +LOG = logging.getLogger(__name__) + + +class OutgoingQueueProxy(zmq_base_proxy.BaseProxy): + + def __init__(self, conf, context, queue, matchmaker): + super(OutgoingQueueProxy, self).__init__(conf, context) + self.queue = queue + self.matchmaker = matchmaker + self.publisher = zmq_dealer_publisher.DealerPublisher( + conf, matchmaker) + LOG.info(_LI("Polling at outgoing proxy ...")) + + def run(self): + try: + request = self.queue.get(timeout=self.conf.rpc_poll_timeout) + LOG.info(_LI("Redirecting request %s to TCP publisher ...") + % request) + self.publisher.send_request(request) + except six.moves.queue.Empty: + return + + +class IncomingQueueProxy(zmq_base_proxy.BaseProxy): + + def __init__(self, conf, context, queue): + super(IncomingQueueProxy, self).__init__(conf, context) + self.poller = zmq_async.get_poller( + zmq_concurrency='native') + + self.queue = queue + + self.socket = context.socket(zmq.ROUTER) + self.socket.bind(zmq_address.get_broker_address(conf)) + self.poller.register(self.socket, self.receive_request) + LOG.info(_LI("Polling at incoming proxy ...")) + + def run(self): + request, socket = self.poller.poll(self.conf.rpc_poll_timeout) + if request is None: + return + + LOG.info(_LI("Received request and queue it: %s") % str(request)) + + self.queue.put(request) + + def receive_request(self, socket): + reply_id = socket.recv() + assert reply_id is not None, "Valid id expected" + empty = socket.recv() + assert empty == b'', "Empty delimiter expected" + return socket.recv_pyobj() diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_dealer_publisher.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_dealer_publisher.py index a5c3f0fdf..2c8fc5ec5 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_dealer_publisher.py +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_dealer_publisher.py @@ -13,7 +13,6 @@ # under the License. import logging -import uuid from oslo_messaging._drivers.zmq_driver.client.publishers\ import zmq_publisher_base @@ -58,13 +57,8 @@ class DealerPublisher(zmq_publisher_base.PublisherMultisend): def _send_request(self, socket, request): - message_id = str(uuid.uuid1()) - socket.send(b'', zmq.SNDMORE) - socket.send_string(request.msg_type, zmq.SNDMORE) - socket.send_string(message_id, zmq.SNDMORE) - socket.send_pyobj(request.context, zmq.SNDMORE) - socket.send_pyobj(request.message) + socket.send_pyobj(request) LOG.info(_LI("Sending message %(message)s to a target %(target)s") % {"message": request.message, @@ -75,6 +69,26 @@ class DealerPublisher(zmq_publisher_base.PublisherMultisend): super(DealerPublisher, self).cleanup() +class DealerPublisherLight(zmq_publisher_base.PublisherBase): + + def __init__(self, conf, address): + super(DealerPublisherLight, self).__init__(conf) + self.socket = self.zmq_context.socket(zmq.DEALER) + self.socket.connect(address) + + def send_request(self, request): + + if request.msg_type == zmq_names.CALL_TYPE: + raise zmq_publisher_base.UnsupportedSendPattern(request.msg_type) + + self.socket.send(b'', zmq.SNDMORE) + self.socket.send_pyobj(request) + + def cleanup(self): + self.socket.setsockopt(zmq.LINGER, 0) + self.socket.close() + + class AcknowledgementReceiver(object): def __init__(self): diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py index 5ff1a41c6..faee64d25 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py @@ -56,7 +56,7 @@ class PublisherBase(object): Publisher can send request objects from zmq_request. """ - def __init__(self, conf, matchmaker): + def __init__(self, conf): """Construct publisher @@ -65,13 +65,10 @@ class PublisherBase(object): :param conf: configuration object :type conf: oslo_config.CONF - :param matchmaker: Name Service interface object - :type matchmaker: matchmaker.MatchMakerBase """ self.conf = conf self.zmq_context = zmq.Context() - self.matchmaker = matchmaker self.outbound_sockets = {} super(PublisherBase, self).__init__() @@ -92,9 +89,7 @@ class PublisherBase(object): :param request: Message data and destination container object :type request: zmq_request.Request """ - socket.send_string(request.msg_type, zmq.SNDMORE) - socket.send_pyobj(request.context, zmq.SNDMORE) - socket.send_pyobj(request.message) + socket.send_pyobj(request) def cleanup(self): """Cleanup publisher. Close allocated connections.""" @@ -106,8 +101,19 @@ class PublisherBase(object): class PublisherMultisend(PublisherBase): def __init__(self, conf, matchmaker, socket_type): + + """Construct publisher multi-send + + Base class for fanout-sending publishers. + + :param conf: configuration object + :type conf: oslo_config.CONF + :param matchmaker: Name Service interface object + :type matchmaker: matchmaker.MatchMakerBase + """ + super(PublisherMultisend, self).__init__(conf) self.socket_type = socket_type - super(PublisherMultisend, self).__init__(conf, matchmaker) + self.matchmaker = matchmaker def _check_hosts_connections(self, target): # TODO(ozamiatin): Place for significant optimization @@ -126,6 +132,7 @@ class PublisherMultisend(PublisherBase): def _connect_to_host(self, socket, host, target): address = zmq_address.get_tcp_direct_address(host) + LOG.info(address) stype = zmq_names.socket_type_str(self.socket_type) try: LOG.info(_LI("Connecting %(stype)s to %(address)s for %(target)s") diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_req_publisher.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_req_publisher.py index 001fe026f..ab171930f 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_req_publisher.py +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_req_publisher.py @@ -31,6 +31,10 @@ zmq = zmq_async.import_zmq() class ReqPublisher(zmq_publisher_base.PublisherBase): + def __init__(self, conf, matchmaker): + super(ReqPublisher, self).__init__(conf) + self.matchmaker = matchmaker + def send_request(self, request): if request.msg_type != zmq_names.CALL_TYPE: diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py index 26a358f67..3e7888d5f 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py +++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py @@ -19,6 +19,7 @@ from oslo_messaging._drivers.zmq_driver.client.publishers\ from oslo_messaging._drivers.zmq_driver.client.publishers\ import zmq_req_publisher from oslo_messaging._drivers.zmq_driver.client import zmq_request +from oslo_messaging._drivers.zmq_driver import zmq_address from oslo_messaging._drivers.zmq_driver import zmq_async zmq = zmq_async.import_zmq() @@ -31,8 +32,14 @@ class ZmqClient(object): self.context = zmq.Context() self.matchmaker = matchmaker self.allowed_remote_exmods = allowed_remote_exmods or [] - self.dealer_publisher = zmq_dealer_publisher.DealerPublisher( - conf, matchmaker) + + self.dealer_publisher = None + if self.conf.zmq_use_broker: + self.dealer_publisher = zmq_dealer_publisher.DealerPublisherLight( + conf, zmq_address.get_broker_address(self.conf)) + else: + self.dealer_publisher = zmq_dealer_publisher.DealerPublisher( + conf, matchmaker) def send_call(self, target, context, message, timeout=None, retry=None): with contextlib.closing(zmq_request.CallRequest( diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_request.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_request.py index e692a3aab..92d444a33 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/zmq_request.py +++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_request.py @@ -14,6 +14,7 @@ import abc import logging +import uuid import six @@ -61,6 +62,7 @@ class Request(object): self.context = context self.message = message self.retry = retry + self.message_id = str(uuid.uuid1()) @abc.abstractproperty def msg_type(self): diff --git a/oslo_messaging/_drivers/zmq_driver/poller/threading_poller.py b/oslo_messaging/_drivers/zmq_driver/poller/threading_poller.py index 7719310cf..0c2beea0d 100644 --- a/oslo_messaging/_drivers/zmq_driver/poller/threading_poller.py +++ b/oslo_messaging/_drivers/zmq_driver/poller/threading_poller.py @@ -41,7 +41,7 @@ class ThreadingPoller(zmq_poller.ZmqPoller): self.poller.register(socket, zmq.POLLIN) def poll(self, timeout=None): - timeout = timeout * 1000 # zmq poller waits milliseconds + timeout *= 1000 # zmq poller waits milliseconds sockets = None try: diff --git a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py index bdac85949..f6016607e 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py +++ b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py @@ -81,29 +81,22 @@ class RouterConsumer(zmq_consumer_base.SingleSocketConsumer): reply_id = socket.recv() empty = socket.recv() assert empty == b'', 'Bad format: empty delimiter expected' - msg_type = socket.recv_string() - assert msg_type is not None, 'Bad format: msg type expected' + request = socket.recv_pyobj() - msg_id = None - if msg_type != zmq_names.CALL_TYPE: - msg_id = socket.recv_string() - - context = socket.recv_pyobj() - message = socket.recv_pyobj() LOG.info(_LI("Received %(msg_type)s message %(msg)s") - % {"msg_type": msg_type, - "msg": str(message)}) + % {"msg_type": request.msg_type, + "msg": str(request.message)}) - if msg_type == zmq_names.CALL_TYPE: + if request.msg_type == zmq_names.CALL_TYPE: return zmq_incoming_message.ZmqIncomingRequest( - self.server, context, message, socket, reply_id, - self.poller) - elif msg_type in (zmq_names.CAST_TYPES + zmq_names.NOTIFY_TYPES): + self.server, request.context, request.message, socket, + reply_id, self.poller) + elif request.msg_type in zmq_names.NON_BLOCKING_TYPES: return RouterIncomingMessage( - self.server, context, message, socket, reply_id, - msg_id, self.poller) + self.server, request.context, request.message, socket, + reply_id, request.message_id, self.poller) else: - LOG.error(_LE("Unknown message type: %s") % msg_type) + LOG.error(_LE("Unknown message type: %s") % request.msg_type) except zmq.ZMQError as e: LOG.error(_LE("Receiving message failed: %s") % str(e)) diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_address.py b/oslo_messaging/_drivers/zmq_driver/zmq_address.py index 7feb05d89..e8c48291b 100644 --- a/oslo_messaging/_drivers/zmq_driver/zmq_address.py +++ b/oslo_messaging/_drivers/zmq_driver/zmq_address.py @@ -18,8 +18,12 @@ def combine_address(host, port): def get_tcp_direct_address(host): - return "tcp://%s" % (host) + return "tcp://%s" % str(host) def get_tcp_random_address(conf): return "tcp://*" + + +def get_broker_address(conf): + return "ipc://%s/zmq-broker" % conf.rpc_zmq_ipc_dir diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_names.py b/oslo_messaging/_drivers/zmq_driver/zmq_names.py index 1c3c33440..0f3112e3b 100644 --- a/oslo_messaging/_drivers/zmq_driver/zmq_names.py +++ b/oslo_messaging/_drivers/zmq_driver/zmq_names.py @@ -47,6 +47,7 @@ MULTISEND_TYPES = (CAST_FANOUT_TYPE, NOTIFY_FANOUT_TYPE) DIRECT_TYPES = (CALL_TYPE, CAST_TYPE, NOTIFY_TYPE) CAST_TYPES = (CAST_TYPE, CAST_FANOUT_TYPE) NOTIFY_TYPES = (NOTIFY_TYPE, NOTIFY_FANOUT_TYPE) +NON_BLOCKING_TYPES = CAST_TYPES + NOTIFY_TYPES def socket_type_str(socket_type): diff --git a/oslo_messaging/tests/drivers/zmq/test_impl_zmq.py b/oslo_messaging/tests/drivers/zmq/test_impl_zmq.py index 21641dd51..c40007523 100644 --- a/oslo_messaging/tests/drivers/zmq/test_impl_zmq.py +++ b/oslo_messaging/tests/drivers/zmq/test_impl_zmq.py @@ -77,6 +77,7 @@ class ZmqBaseTestCase(test_utils.BaseTestCase): 'rpc_zmq_host': '127.0.0.1', 'rpc_response_timeout': 5, 'rpc_zmq_ipc_dir': self.internal_ipc_dir, + 'zmq_use_broker': False, 'rpc_zmq_matchmaker': 'dummy'} self.config(**kwargs) diff --git a/setup-test-env-zmq.sh b/setup-test-env-zmq.sh index 353c2602c..effe0091b 100755 --- a/setup-test-env-zmq.sh +++ b/setup-test-env-zmq.sh @@ -22,4 +22,6 @@ EOF redis-server --port $ZMQ_REDIS_PORT & +oslo-messaging-zmq-broker --config-file ${DATADIR}/zmq.conf > ${DATADIR}/zmq-broker.log 2>&1 & + $* diff --git a/setup.cfg b/setup.cfg index 8932415f1..ee63dc5bb 100644 --- a/setup.cfg +++ b/setup.cfg @@ -22,7 +22,7 @@ packages = [entry_points] console_scripts = - oslo-messaging-zmq-receiver = oslo_messaging._cmd.zmq_receiver:main + oslo-messaging-zmq-broker = oslo_messaging._cmd.zmq_broker:main oslo.messaging.drivers = rabbit = oslo_messaging._drivers.impl_rabbit:RabbitDriver