From 08dd23d1d412cfdd8d3a8923a5245daba2e89a3d Mon Sep 17 00:00:00 2001 From: Oleksii Zamiatin Date: Mon, 25 Jan 2016 15:00:38 +0200 Subject: [PATCH] [zmq] Reduce proxy for direct messaging Since the change I643a111fca8bac32f41ced232d54ff2a2ebcbf77 we don't need proxy for direct types because any message woud be sent before server listening to the target appears on name server registry, so DEALER wouldn't block. Change-Id: I3c0f3e6930a4092cac5a6e18529d98e6d6e65f32 --- doc/source/zmq_driver.rst | 18 ++-- oslo_messaging/_drivers/impl_zmq.py | 6 +- .../zmq_driver/broker/zmq_queue_proxy.py | 27 +----- .../dealer/zmq_dealer_call_publisher.py | 37 +------- .../dealer/zmq_dealer_publisher_proxy.py | 87 ------------------- .../_drivers/zmq_driver/client/zmq_client.py | 4 +- .../_drivers/zmq_driver/client/zmq_request.py | 1 - .../server/consumers/zmq_router_consumer.py | 17 ---- .../zmq_driver/server/zmq_incoming_message.py | 5 -- .../_drivers/zmq_driver/server/zmq_server.py | 6 +- .../tests/drivers/zmq/test_impl_zmq.py | 2 +- .../tests/drivers/zmq/zmq_common.py | 1 - .../tests/functional/zmq/test_startup.py | 3 +- 13 files changed, 18 insertions(+), 196 deletions(-) delete mode 100644 oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py diff --git a/doc/source/zmq_driver.rst b/doc/source/zmq_driver.rst index 0ab7903a1..9f9a74d23 100644 --- a/doc/source/zmq_driver.rst +++ b/doc/source/zmq_driver.rst @@ -138,28 +138,24 @@ stored in Redis is that the key is a base topic and the corresponding values are hostname arrays to be sent to. -Proxy to avoid blocking (optional) ----------------------------------- +Proxy for fanout publishing +--------------------------- -Each machine running OpenStack services, or sending RPC messages, may run the -'oslo-messaging-zmq-broker' daemon. This is needed to avoid blocking -if a listener (server) appears after the sender (client). +Each machine running OpenStack services, or sending RPC messages, should run +the 'oslo-messaging-zmq-broker' daemon. Fanout-based patterns like CAST+Fanout and notifications always use proxy as they act over PUB/SUB, 'use_pub_sub' - defaults to True. If not using PUB/SUB (use_pub_sub = False) then fanout will be emulated over direct DEALER/ROUTER unicast which is possible but less efficient and therefore -is not recommended. +is not recommended. In a case of direct DEALER/ROUTER unicast proxy is not +needed. -Running direct RPC methods like CALL and CAST over a proxy is controlled by -the option 'direct_over_proxy' which is True by default. - -These options can be set in [DEFAULT] section. +This option can be set in [DEFAULT] section. For example:: use_pub_sub = True - direct_over_proxy = False In case of using the broker all publishers (clients) talk to servers over diff --git a/oslo_messaging/_drivers/impl_zmq.py b/oslo_messaging/_drivers/impl_zmq.py index 5eb3fdc35..3d02f68c8 100644 --- a/oslo_messaging/_drivers/impl_zmq.py +++ b/oslo_messaging/_drivers/impl_zmq.py @@ -72,11 +72,7 @@ zmq_opts = [ help='Expiration timeout in seconds of a name service record ' 'about existing target ( < 0 means no timeout).'), - cfg.BoolOpt('direct_over_proxy', default=False, - help='Configures zmq-messaging to use proxy with ' - 'non PUB/SUB patterns.'), - - cfg.BoolOpt('use_pub_sub', default=True, + cfg.BoolOpt('use_pub_sub', default=False, help='Use PUB/SUB pattern for fanout methods. ' 'PUB/SUB always uses proxy.'), diff --git a/oslo_messaging/_drivers/zmq_driver/broker/zmq_queue_proxy.py b/oslo_messaging/_drivers/zmq_driver/broker/zmq_queue_proxy.py index 1d5729c80..b5e71c13d 100644 --- a/oslo_messaging/_drivers/zmq_driver/broker/zmq_queue_proxy.py +++ b/oslo_messaging/_drivers/zmq_driver/broker/zmq_queue_proxy.py @@ -15,14 +15,12 @@ import logging from oslo_messaging._drivers.zmq_driver.broker import zmq_base_proxy -from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \ - import zmq_dealer_publisher_proxy from oslo_messaging._drivers.zmq_driver.client.publishers \ import zmq_pub_publisher from oslo_messaging._drivers.zmq_driver import zmq_address from oslo_messaging._drivers.zmq_driver import zmq_async from oslo_messaging._drivers.zmq_driver import zmq_names -from oslo_messaging._i18n import _LI +from oslo_messaging._i18n import _LE, _LI zmq = zmq_async.import_zmq(zmq_concurrency='native') LOG = logging.getLogger(__name__) @@ -41,9 +39,6 @@ class UniversalQueueProxy(zmq_base_proxy.BaseProxy): LOG.info(_LI("Polling at universal proxy")) self.matchmaker = matchmaker - reply_receiver = zmq_dealer_publisher_proxy.ReplyReceiver(self.poller) - self.direct_publisher = zmq_dealer_publisher_proxy \ - .DealerPublisherProxy(conf, matchmaker, reply_receiver) self.pub_publisher = zmq_pub_publisher.PubPublisherProxy( conf, matchmaker) @@ -54,8 +49,6 @@ class UniversalQueueProxy(zmq_base_proxy.BaseProxy): if socket == self.router_socket: self._redirect_in_request(message) - else: - self._redirect_reply(message) def _redirect_in_request(self, multipart_message): LOG.debug("-> Redirecting request %s to TCP publisher", @@ -65,19 +58,6 @@ class UniversalQueueProxy(zmq_base_proxy.BaseProxy): envelope[zmq_names.FIELD_MSG_TYPE] \ in zmq_names.MULTISEND_TYPES: self.pub_publisher.send_request(multipart_message) - else: - self.direct_publisher.send_request(multipart_message) - - def _redirect_reply(self, reply): - LOG.debug("Reply proxy %s", reply) - if reply[zmq_names.IDX_REPLY_TYPE] == zmq_names.ACK_TYPE: - LOG.debug("Acknowledge dropped %s", reply) - return - - LOG.debug("<- Redirecting reply to ROUTER: reply: %s", - reply[zmq_names.IDX_REPLY_BODY:]) - - self.router_socket.send_multipart(reply[zmq_names.IDX_REPLY_BODY:]) def _receive_in_request(self, socket): reply_id = socket.recv() @@ -85,8 +65,9 @@ class UniversalQueueProxy(zmq_base_proxy.BaseProxy): empty = socket.recv() assert empty == b'', "Empty delimiter expected" envelope = socket.recv_pyobj() - if envelope[zmq_names.FIELD_MSG_TYPE] == zmq_names.CALL_TYPE: - envelope[zmq_names.FIELD_REPLY_ID] = reply_id + if envelope[zmq_names.FIELD_MSG_TYPE] not in zmq_names.MULTISEND_TYPES: + LOG.error(_LE("Message type %s is not supported by proxy"), + envelope[zmq_names.FIELD_MSG_TYPE]) payload = socket.recv_multipart() payload.insert(0, envelope) return payload diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_call_publisher.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_call_publisher.py index c1115588e..dfccc70a0 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_call_publisher.py +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_call_publisher.py @@ -43,9 +43,7 @@ class DealerCallPublisher(object): self.conf = conf self.matchmaker = matchmaker self.reply_waiter = ReplyWaiter(conf) - self.sender = RequestSender(conf, matchmaker, self.reply_waiter) \ - if not conf.direct_over_proxy else \ - RequestSenderLight(conf, matchmaker, self.reply_waiter) + self.sender = RequestSender(conf, matchmaker, self.reply_waiter) def send_request(self, request): reply_future = self.sender.send_request(request) @@ -113,39 +111,6 @@ class RequestSender(zmq_publisher_base.PublisherBase): super(RequestSender, self).cleanup() -class RequestSenderLight(RequestSender): - """This class used with proxy. - - Simplified address matching because there is only - one proxy IPC address. - """ - - def __init__(self, conf, matchmaker, reply_waiter): - if not conf.direct_over_proxy: - raise rpc_common.RPCException("RequestSenderLight needs a proxy!") - - super(RequestSenderLight, self).__init__( - conf, matchmaker, reply_waiter) - - self.socket = None - - def _connect_socket(self, target): - return self.outbound_sockets.get_socket_to_broker(target) - - def _do_send_request(self, socket, request): - LOG.debug("Sending %(type)s message_id %(message)s" - " to a target %(target)s", - {"type": request.msg_type, - "message": request.message_id, - "target": request.target}) - - envelope = request.create_envelope() - - socket.send(b'', zmq.SNDMORE) - socket.send_pyobj(envelope, zmq.SNDMORE) - socket.send_pyobj(request) - - class ReplyWaiter(object): def __init__(self, conf): diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py deleted file mode 100644 index f233d099b..000000000 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py +++ /dev/null @@ -1,87 +0,0 @@ -# Copyright 2015 Mirantis, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import logging - -from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \ - import zmq_dealer_publisher -from oslo_messaging._drivers.zmq_driver import zmq_async -from oslo_messaging._drivers.zmq_driver import zmq_names -from oslo_messaging._i18n import _LI, _LW - -zmq = zmq_async.import_zmq() - -LOG = logging.getLogger(__name__) - - -class DealerPublisherProxy(zmq_dealer_publisher.DealerPublisher): - - def __init__(self, conf, matchmaker, reply_receiver): - super(DealerPublisherProxy, self).__init__(conf, matchmaker) - self.reply_receiver = reply_receiver - - def send_request(self, multipart_message): - - envelope = multipart_message[zmq_names.MULTIPART_IDX_ENVELOPE] - - LOG.debug("Envelope: %s", envelope) - - target = envelope[zmq_names.FIELD_TARGET] - dealer_socket = self._check_hosts_connections( - target, zmq_names.socket_type_str(zmq.ROUTER)) - - if not dealer_socket.connections: - # NOTE(ozamiatin): Here we can provide - # a queue for keeping messages to send them later - # when some listener appears. However such approach - # being more reliable will consume additional memory. - LOG.warning(_LW("Request %s was dropped because no connection"), - envelope[zmq_names.FIELD_MSG_TYPE]) - return - - self.reply_receiver.track_socket(dealer_socket.handle) - - LOG.debug("Sending message %(message)s to a target %(target)s" - % {"message": envelope[zmq_names.FIELD_MSG_ID], - "target": envelope[zmq_names.FIELD_TARGET]}) - - if envelope[zmq_names.FIELD_MSG_TYPE] in zmq_names.MULTISEND_TYPES: - for _ in range(dealer_socket.connections_count()): - self._send_request(dealer_socket, multipart_message) - else: - self._send_request(dealer_socket, multipart_message) - - def _send_request(self, socket, multipart_message): - - socket.send(b'', zmq.SNDMORE) - socket.send_pyobj( - multipart_message[zmq_names.MULTIPART_IDX_ENVELOPE], - zmq.SNDMORE) - socket.send(multipart_message[zmq_names.MULTIPART_IDX_BODY]) - - -class ReplyReceiver(object): - - def __init__(self, poller): - self.poller = poller - LOG.info(_LI("Reply waiter created in broker")) - - def _receive_reply(self, socket): - return socket.recv_multipart() - - def track_socket(self, socket): - self.poller.register(socket, self._receive_reply) - - def cleanup(self): - self.poller.close() diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py index ffe484557..c6bc67993 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py +++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py @@ -30,9 +30,7 @@ class ZmqClient(zmq_client_base.ZmqClientBase): def __init__(self, conf, matchmaker=None, allowed_remote_exmods=None): default_publisher = zmq_dealer_publisher.DealerPublisher( - conf, matchmaker) if not conf.direct_over_proxy else \ - zmq_dealer_publisher.DealerPublisherLight( - conf, zmq_address.get_broker_address(conf)) + conf, matchmaker) fanout_publisher = zmq_dealer_publisher.DealerPublisherLight( conf, zmq_address.get_broker_address(conf)) \ diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_request.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_request.py index 05edefffd..2646451bf 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/zmq_request.py +++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_request.py @@ -68,7 +68,6 @@ class Request(object): "retry must be an integer, not {0}".format(type(retry))) self.message_id = str(uuid.uuid1()) - self.proxy_reply_id = None def create_envelope(self): return {'msg_type': self.msg_type, diff --git a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py index c317b9929..94617b3d7 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py +++ b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py @@ -100,23 +100,6 @@ class RouterConsumer(zmq_consumer_base.SingleSocketConsumer): LOG.error(_LE("Receiving message failed: %s"), str(e)) -class RouterConsumerBroker(RouterConsumer): - - def __init__(self, conf, poller, server): - super(RouterConsumerBroker, self).__init__(conf, poller, server) - - def _receive_request(self, socket): - reply_id = socket.recv() - empty = socket.recv() - assert empty == b'', 'Bad format: empty delimiter expected' - envelope = socket.recv_pyobj() - request = socket.recv_pyobj() - - if zmq_names.FIELD_REPLY_ID in envelope: - request.proxy_reply_id = envelope[zmq_names.FIELD_REPLY_ID] - return request, reply_id - - class TargetsManager(object): def __init__(self, conf, matchmaker, host): diff --git a/oslo_messaging/_drivers/zmq_driver/server/zmq_incoming_message.py b/oslo_messaging/_drivers/zmq_driver/server/zmq_incoming_message.py index e009d55c9..f1db740c0 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/zmq_incoming_message.py +++ b/oslo_messaging/_drivers/zmq_driver/server/zmq_incoming_message.py @@ -45,7 +45,6 @@ class ZmqIncomingRequest(base.IncomingMessage): zmq_names.FIELD_REPLY: reply, zmq_names.FIELD_FAILURE: failure, zmq_names.FIELD_LOG_FAILURE: log_failure, - zmq_names.FIELD_ID: self.request.proxy_reply_id, zmq_names.FIELD_MSG_ID: self.request.message_id} LOG.debug("Replying %s", (str(self.request.message_id))) @@ -53,10 +52,6 @@ class ZmqIncomingRequest(base.IncomingMessage): self.received = True self.reply_socket.send(self.reply_id, zmq.SNDMORE) self.reply_socket.send(b'', zmq.SNDMORE) - if self.request.proxy_reply_id: - self.reply_socket.send_string(zmq_names.REPLY_TYPE, zmq.SNDMORE) - self.reply_socket.send(self.request.proxy_reply_id, zmq.SNDMORE) - self.reply_socket.send(b'', zmq.SNDMORE) self.reply_socket.send_pyobj(message_reply) self.poller.resume_polling(self.reply_socket) diff --git a/oslo_messaging/_drivers/zmq_driver/server/zmq_server.py b/oslo_messaging/_drivers/zmq_driver/server/zmq_server.py index 62c12c7ff..8a95a1f6e 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/zmq_server.py +++ b/oslo_messaging/_drivers/zmq_driver/server/zmq_server.py @@ -34,10 +34,8 @@ class ZmqServer(base.Listener): super(ZmqServer, self).__init__(driver) self.matchmaker = matchmaker self.poller = zmq_async.get_poller() - self.router_consumer = zmq_router_consumer.RouterConsumerBroker( - conf, self.poller, self) if conf.direct_over_proxy else \ - zmq_router_consumer.RouterConsumer( - conf, self.poller, self) + self.router_consumer = zmq_router_consumer.RouterConsumer( + conf, self.poller, self) self.sub_consumer = zmq_sub_consumer.SubConsumer( conf, self.poller, self) if conf.use_pub_sub else None self.notify_consumer = self.sub_consumer if conf.use_pub_sub \ diff --git a/oslo_messaging/tests/drivers/zmq/test_impl_zmq.py b/oslo_messaging/tests/drivers/zmq/test_impl_zmq.py index 5f86679b3..5b957e625 100644 --- a/oslo_messaging/tests/drivers/zmq/test_impl_zmq.py +++ b/oslo_messaging/tests/drivers/zmq/test_impl_zmq.py @@ -145,7 +145,7 @@ class TestZmqBasics(zmq_common.ZmqBaseTestCase): message = {'method': 'hello-world', 'tx_id': 1} context = {} - target.topic = target.topic + '.info' + target.topic += '.info' self.driver.send_notification(target, context, message, '3.0') self.listener._received.wait(5) self.assertTrue(self.listener._received.isSet()) diff --git a/oslo_messaging/tests/drivers/zmq/zmq_common.py b/oslo_messaging/tests/drivers/zmq/zmq_common.py index f5c6bafac..21b56e6a9 100644 --- a/oslo_messaging/tests/drivers/zmq/zmq_common.py +++ b/oslo_messaging/tests/drivers/zmq/zmq_common.py @@ -78,7 +78,6 @@ class ZmqBaseTestCase(test_utils.BaseTestCase): 'rpc_response_timeout': 5, 'rpc_zmq_ipc_dir': self.internal_ipc_dir, 'use_pub_sub': False, - 'direct_over_proxy': False, 'rpc_zmq_matchmaker': 'dummy'} self.config(**kwargs) diff --git a/oslo_messaging/tests/functional/zmq/test_startup.py b/oslo_messaging/tests/functional/zmq/test_startup.py index c3258131f..6f114ae4a 100644 --- a/oslo_messaging/tests/functional/zmq/test_startup.py +++ b/oslo_messaging/tests/functional/zmq/test_startup.py @@ -31,8 +31,7 @@ class StartupOrderTestCase(multiproc_utils.MutliprocTestCase): self.conf.project = "test_project" kwargs = {'rpc_response_timeout': 30, - 'use_pub_sub': False, - 'direct_over_proxy': False} + 'use_pub_sub': False} self.config(**kwargs) log_path = self.conf.rpc_zmq_ipc_dir + "/" + str(os.getpid()) + ".log"