From 9cdc9e006bfa12d4ca9c1a0dd434813ace7c5b9c Mon Sep 17 00:00:00 2001 From: ozamiatin Date: Sat, 7 May 2016 01:21:55 +0300 Subject: [PATCH] [zmq] Add backend ROUTER to increase bandwidth As for zmq we have a native thread running per each zmq socket to perform async send/receive. Bandwidth should increase by adding one more ROUTER socket to proxy. We can not add more because it will increase the number of connections, but with FE and BE sockets number of connections will stay the same as with a single ROUTER, because all clients will connect to FE, and all servers will connect to BE. Change-Id: Ib1f070a503272164ec0e9c28ce20530cfa6b79aa --- oslo_messaging/_cmd/zmq_proxy.py | 14 +++-- .../zmq_driver/broker/zmq_queue_proxy.py | 62 ++++++++++++------- .../client/publishers/zmq_publisher_base.py | 11 ++++ .../server/consumers/zmq_dealer_consumer.py | 2 +- 4 files changed, 62 insertions(+), 27 deletions(-) diff --git a/oslo_messaging/_cmd/zmq_proxy.py b/oslo_messaging/_cmd/zmq_proxy.py index cea63d78..03ccea19 100644 --- a/oslo_messaging/_cmd/zmq_proxy.py +++ b/oslo_messaging/_cmd/zmq_proxy.py @@ -35,10 +35,6 @@ Usage example: def main(): - logging.basicConfig(level=logging.DEBUG, - format='%(asctime)s %(name)s ' - '%(levelname)-8s %(message)s') - parser = argparse.ArgumentParser( description='ZeroMQ proxy service', usage=USAGE @@ -46,11 +42,21 @@ def main(): parser.add_argument('--config-file', dest='config_file', type=str, help='Path to configuration file') + parser.add_argument('-d', '--debug', dest='debug', type=bool, + default=False, + help="Turn on DEBUG logging level instead of INFO") args = parser.parse_args() if args.config_file: cfg.CONF(["--config-file", args.config_file]) + log_level = logging.INFO + if args.debug: + log_level = logging.DEBUG + logging.basicConfig(level=log_level, + format='%(asctime)s %(name)s ' + '%(levelname)-8s %(message)s') + reactor = zmq_proxy.ZmqProxy(CONF, zmq_queue_proxy.UniversalQueueProxy) try: diff --git a/oslo_messaging/_drivers/zmq_driver/broker/zmq_queue_proxy.py b/oslo_messaging/_drivers/zmq_driver/broker/zmq_queue_proxy.py index e3e3755d..31143646 100644 --- a/oslo_messaging/_drivers/zmq_driver/broker/zmq_queue_proxy.py +++ b/oslo_messaging/_drivers/zmq_driver/broker/zmq_queue_proxy.py @@ -35,23 +35,32 @@ class UniversalQueueProxy(object): self.matchmaker = matchmaker self.poller = zmq_async.get_poller(zmq_concurrency='native') - self.router_socket = zmq_socket.ZmqRandomPortSocket( + self.fe_router_socket = zmq_socket.ZmqRandomPortSocket( + conf, context, zmq.ROUTER) + self.be_router_socket = zmq_socket.ZmqRandomPortSocket( conf, context, zmq.ROUTER) - self.poller.register(self.router_socket.handle, + self.poller.register(self.fe_router_socket.handle, + self._receive_in_request) + self.poller.register(self.be_router_socket.handle, self._receive_in_request) - self.router_address = zmq_address.combine_address( - self.conf.rpc_zmq_host, self.router_socket.port) + self.fe_router_address = zmq_address.combine_address( + self.conf.rpc_zmq_host, self.fe_router_socket.port) + self.be_router_address = zmq_address.combine_address( + self.conf.rpc_zmq_host, self.fe_router_socket.port) self.pub_publisher = zmq_pub_publisher.PubPublisherProxy( conf, matchmaker) self.matchmaker.register_publisher( - (self.pub_publisher.host, self.router_address)) + (self.pub_publisher.host, self.fe_router_address)) LOG.info(_LI("[PUB:%(pub)s, ROUTER:%(router)s] Run PUB publisher"), {"pub": self.pub_publisher.host, - "router": self.router_address}) + "router": self.fe_router_address}) + self.matchmaker.register_router(self.be_router_address) + LOG.info(_LI("[Backend ROUTER:%(router)s] Run ROUTER"), + {"router": self.be_router_address}) def run(self): message, socket = self.poller.poll(self.conf.rpc_poll_timeout) @@ -63,31 +72,40 @@ class UniversalQueueProxy(object): LOG.debug("-> Redirecting request %s to TCP publisher", envelope) self.pub_publisher.send_request(message) elif not envelope.is_mult_send: - self._redirect_message(message) + self._redirect_message(self.be_router_socket + if socket is self.fe_router_socket + else self.fe_router_socket, message) @staticmethod def _receive_in_request(socket): - reply_id = socket.recv() - assert reply_id is not None, "Valid id expected" - empty = socket.recv() - assert empty == b'', "Empty delimiter expected" - envelope = socket.recv_pyobj() - payload = socket.recv_multipart() - payload.insert(zmq_names.MULTIPART_IDX_ENVELOPE, envelope) - return payload + try: + reply_id = socket.recv() + assert reply_id is not None, "Valid id expected" + empty = socket.recv() + assert empty == b'', "Empty delimiter expected" + envelope = socket.recv_pyobj() + payload = socket.recv_multipart() + payload.insert(zmq_names.MULTIPART_IDX_ENVELOPE, envelope) + return payload + except (AssertionError, zmq.ZMQError): + LOG.error("Received message with wrong format") + return None - def _redirect_message(self, multipart_message): + @staticmethod + def _redirect_message(socket, multipart_message): envelope = multipart_message[zmq_names.MULTIPART_IDX_ENVELOPE] LOG.debug("<-> Dispatch message: %s", envelope) response_binary = multipart_message[zmq_names.MULTIPART_IDX_BODY] - self.router_socket.send(envelope.routing_key, zmq.SNDMORE) - self.router_socket.send(b'', zmq.SNDMORE) - self.router_socket.send_pyobj(envelope, zmq.SNDMORE) - self.router_socket.send(response_binary) + socket.send(envelope.routing_key, zmq.SNDMORE) + socket.send(b'', zmq.SNDMORE) + socket.send_pyobj(envelope, zmq.SNDMORE) + socket.send(response_binary) def cleanup(self): - self.router_socket.close() + self.fe_router_socket.close() + self.be_router_socket.close() self.pub_publisher.cleanup() self.matchmaker.unregister_publisher( - (self.pub_publisher.host, self.router_address)) + (self.pub_publisher.host, self.fe_router_address)) + self.matchmaker.unregister_router(self.be_router_address) diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py index a701dad9..51e8d418 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py @@ -110,6 +110,7 @@ class SocketsManager(object): self.zmq_context = zmq.Context() self.outbound_sockets = {} self.socket_to_publishers = None + self.socket_to_routers = None def _track_socket(self, socket, target): self.outbound_sockets[str(target)] = (socket, time.time()) @@ -162,6 +163,16 @@ class SocketsManager(object): self.socket_to_publishers.connect_to_host(router_address) return self.socket_to_publishers + def get_socket_to_routers(self): + if self.socket_to_routers is not None: + return self.socket_to_routers + self.socket_to_routers = zmq_socket.ZmqSocket( + self.conf, self.zmq_context, self.socket_type) + routers = self.matchmaker.get_routers() + for router_address in routers: + self.socket_to_routers.connect_to_host(router_address) + return self.socket_to_routers + def cleanup(self): for socket, tm in self.outbound_sockets.values(): socket.close() diff --git a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_dealer_consumer.py b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_dealer_consumer.py index 8fd7b562..dc541945 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_dealer_consumer.py +++ b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_dealer_consumer.py @@ -87,7 +87,7 @@ class DealerConsumer(zmq_consumer_base.ConsumerBase): self.target = server.target self.sockets_manager = zmq_publisher_base.SocketsManager( conf, self.matchmaker, zmq.ROUTER, zmq.DEALER) - self.socket = self.sockets_manager.get_socket_to_publishers() + self.socket = self.sockets_manager.get_socket_to_routers() self.poller.register(self.socket, self.receive_message) self.host = self.socket.handle.identity self.target_updater = zmq_consumer_base.TargetUpdater(