[zmq] Add backend ROUTER to increase bandwidth

As for zmq we have a native thread running per each
zmq socket to perform async send/receive. Bandwidth
should increase by adding one more ROUTER socket to proxy.
We can not add more because it will increase the number
of connections, but with FE and BE sockets number of
connections will stay the same as with a single ROUTER,
because all clients will connect to FE, and all servers
will connect to BE.

Change-Id: Ib1f070a503272164ec0e9c28ce20530cfa6b79aa
This commit is contained in:
ozamiatin 2016-05-07 01:21:55 +03:00 committed by Oleksii Zamiatin
parent e65539bb70
commit 9cdc9e006b
4 changed files with 62 additions and 27 deletions

View File

@ -35,10 +35,6 @@ Usage example:
def main(): def main():
logging.basicConfig(level=logging.DEBUG,
format='%(asctime)s %(name)s '
'%(levelname)-8s %(message)s')
parser = argparse.ArgumentParser( parser = argparse.ArgumentParser(
description='ZeroMQ proxy service', description='ZeroMQ proxy service',
usage=USAGE usage=USAGE
@ -46,11 +42,21 @@ def main():
parser.add_argument('--config-file', dest='config_file', type=str, parser.add_argument('--config-file', dest='config_file', type=str,
help='Path to configuration file') help='Path to configuration file')
parser.add_argument('-d', '--debug', dest='debug', type=bool,
default=False,
help="Turn on DEBUG logging level instead of INFO")
args = parser.parse_args() args = parser.parse_args()
if args.config_file: if args.config_file:
cfg.CONF(["--config-file", args.config_file]) cfg.CONF(["--config-file", args.config_file])
log_level = logging.INFO
if args.debug:
log_level = logging.DEBUG
logging.basicConfig(level=log_level,
format='%(asctime)s %(name)s '
'%(levelname)-8s %(message)s')
reactor = zmq_proxy.ZmqProxy(CONF, zmq_queue_proxy.UniversalQueueProxy) reactor = zmq_proxy.ZmqProxy(CONF, zmq_queue_proxy.UniversalQueueProxy)
try: try:

View File

@ -35,23 +35,32 @@ class UniversalQueueProxy(object):
self.matchmaker = matchmaker self.matchmaker = matchmaker
self.poller = zmq_async.get_poller(zmq_concurrency='native') self.poller = zmq_async.get_poller(zmq_concurrency='native')
self.router_socket = zmq_socket.ZmqRandomPortSocket( self.fe_router_socket = zmq_socket.ZmqRandomPortSocket(
conf, context, zmq.ROUTER)
self.be_router_socket = zmq_socket.ZmqRandomPortSocket(
conf, context, zmq.ROUTER) conf, context, zmq.ROUTER)
self.poller.register(self.router_socket.handle, self.poller.register(self.fe_router_socket.handle,
self._receive_in_request)
self.poller.register(self.be_router_socket.handle,
self._receive_in_request) self._receive_in_request)
self.router_address = zmq_address.combine_address( self.fe_router_address = zmq_address.combine_address(
self.conf.rpc_zmq_host, self.router_socket.port) self.conf.rpc_zmq_host, self.fe_router_socket.port)
self.be_router_address = zmq_address.combine_address(
self.conf.rpc_zmq_host, self.fe_router_socket.port)
self.pub_publisher = zmq_pub_publisher.PubPublisherProxy( self.pub_publisher = zmq_pub_publisher.PubPublisherProxy(
conf, matchmaker) conf, matchmaker)
self.matchmaker.register_publisher( self.matchmaker.register_publisher(
(self.pub_publisher.host, self.router_address)) (self.pub_publisher.host, self.fe_router_address))
LOG.info(_LI("[PUB:%(pub)s, ROUTER:%(router)s] Run PUB publisher"), LOG.info(_LI("[PUB:%(pub)s, ROUTER:%(router)s] Run PUB publisher"),
{"pub": self.pub_publisher.host, {"pub": self.pub_publisher.host,
"router": self.router_address}) "router": self.fe_router_address})
self.matchmaker.register_router(self.be_router_address)
LOG.info(_LI("[Backend ROUTER:%(router)s] Run ROUTER"),
{"router": self.be_router_address})
def run(self): def run(self):
message, socket = self.poller.poll(self.conf.rpc_poll_timeout) message, socket = self.poller.poll(self.conf.rpc_poll_timeout)
@ -63,10 +72,13 @@ class UniversalQueueProxy(object):
LOG.debug("-> Redirecting request %s to TCP publisher", envelope) LOG.debug("-> Redirecting request %s to TCP publisher", envelope)
self.pub_publisher.send_request(message) self.pub_publisher.send_request(message)
elif not envelope.is_mult_send: elif not envelope.is_mult_send:
self._redirect_message(message) self._redirect_message(self.be_router_socket
if socket is self.fe_router_socket
else self.fe_router_socket, message)
@staticmethod @staticmethod
def _receive_in_request(socket): def _receive_in_request(socket):
try:
reply_id = socket.recv() reply_id = socket.recv()
assert reply_id is not None, "Valid id expected" assert reply_id is not None, "Valid id expected"
empty = socket.recv() empty = socket.recv()
@ -75,19 +87,25 @@ class UniversalQueueProxy(object):
payload = socket.recv_multipart() payload = socket.recv_multipart()
payload.insert(zmq_names.MULTIPART_IDX_ENVELOPE, envelope) payload.insert(zmq_names.MULTIPART_IDX_ENVELOPE, envelope)
return payload return payload
except (AssertionError, zmq.ZMQError):
LOG.error("Received message with wrong format")
return None
def _redirect_message(self, multipart_message): @staticmethod
def _redirect_message(socket, multipart_message):
envelope = multipart_message[zmq_names.MULTIPART_IDX_ENVELOPE] envelope = multipart_message[zmq_names.MULTIPART_IDX_ENVELOPE]
LOG.debug("<-> Dispatch message: %s", envelope) LOG.debug("<-> Dispatch message: %s", envelope)
response_binary = multipart_message[zmq_names.MULTIPART_IDX_BODY] response_binary = multipart_message[zmq_names.MULTIPART_IDX_BODY]
self.router_socket.send(envelope.routing_key, zmq.SNDMORE) socket.send(envelope.routing_key, zmq.SNDMORE)
self.router_socket.send(b'', zmq.SNDMORE) socket.send(b'', zmq.SNDMORE)
self.router_socket.send_pyobj(envelope, zmq.SNDMORE) socket.send_pyobj(envelope, zmq.SNDMORE)
self.router_socket.send(response_binary) socket.send(response_binary)
def cleanup(self): def cleanup(self):
self.router_socket.close() self.fe_router_socket.close()
self.be_router_socket.close()
self.pub_publisher.cleanup() self.pub_publisher.cleanup()
self.matchmaker.unregister_publisher( self.matchmaker.unregister_publisher(
(self.pub_publisher.host, self.router_address)) (self.pub_publisher.host, self.fe_router_address))
self.matchmaker.unregister_router(self.be_router_address)

View File

@ -110,6 +110,7 @@ class SocketsManager(object):
self.zmq_context = zmq.Context() self.zmq_context = zmq.Context()
self.outbound_sockets = {} self.outbound_sockets = {}
self.socket_to_publishers = None self.socket_to_publishers = None
self.socket_to_routers = None
def _track_socket(self, socket, target): def _track_socket(self, socket, target):
self.outbound_sockets[str(target)] = (socket, time.time()) self.outbound_sockets[str(target)] = (socket, time.time())
@ -162,6 +163,16 @@ class SocketsManager(object):
self.socket_to_publishers.connect_to_host(router_address) self.socket_to_publishers.connect_to_host(router_address)
return self.socket_to_publishers return self.socket_to_publishers
def get_socket_to_routers(self):
if self.socket_to_routers is not None:
return self.socket_to_routers
self.socket_to_routers = zmq_socket.ZmqSocket(
self.conf, self.zmq_context, self.socket_type)
routers = self.matchmaker.get_routers()
for router_address in routers:
self.socket_to_routers.connect_to_host(router_address)
return self.socket_to_routers
def cleanup(self): def cleanup(self):
for socket, tm in self.outbound_sockets.values(): for socket, tm in self.outbound_sockets.values():
socket.close() socket.close()

View File

@ -87,7 +87,7 @@ class DealerConsumer(zmq_consumer_base.ConsumerBase):
self.target = server.target self.target = server.target
self.sockets_manager = zmq_publisher_base.SocketsManager( self.sockets_manager = zmq_publisher_base.SocketsManager(
conf, self.matchmaker, zmq.ROUTER, zmq.DEALER) conf, self.matchmaker, zmq.ROUTER, zmq.DEALER)
self.socket = self.sockets_manager.get_socket_to_publishers() self.socket = self.sockets_manager.get_socket_to_routers()
self.poller.register(self.socket, self.receive_message) self.poller.register(self.socket, self.receive_message)
self.host = self.socket.handle.identity self.host = self.socket.handle.identity
self.target_updater = zmq_consumer_base.TargetUpdater( self.target_updater = zmq_consumer_base.TargetUpdater(