[zmq] Let proxy serve on a static port numbers
Currently proxy binds to a random port from a port range specified in zmq config and therefore needs to register in redis to become visible to clients and servers. That could be done much simpler by using a static port(s) for proxy. Moreover zmq handles reconnect to a socket if restarted service uses the same port number as it had before restart. Change-Id: I088792fd08a4161d08e9160830fc3ec4d560cca4
This commit is contained in:
parent
0ecc25509f
commit
18c8bc933d
@ -18,14 +18,15 @@ import logging
|
||||
from oslo_config import cfg
|
||||
|
||||
from oslo_messaging._drivers import impl_zmq
|
||||
from oslo_messaging._drivers.zmq_driver.broker import zmq_proxy
|
||||
from oslo_messaging._drivers.zmq_driver.broker import zmq_queue_proxy
|
||||
from oslo_messaging import server
|
||||
from oslo_messaging._drivers.zmq_driver.proxy import zmq_proxy
|
||||
from oslo_messaging._drivers.zmq_driver.proxy import zmq_queue_proxy
|
||||
|
||||
CONF = cfg.CONF
|
||||
CONF.register_opts(impl_zmq.zmq_opts)
|
||||
CONF.register_opts(server._pool_opts)
|
||||
CONF.rpc_zmq_native = True
|
||||
|
||||
opt_group = cfg.OptGroup(name='zmq_proxy_opts',
|
||||
title='ZeroMQ proxy options')
|
||||
CONF.register_opts(zmq_proxy.zmq_proxy_opts, group=opt_group)
|
||||
|
||||
|
||||
USAGE = """ Usage: ./zmq-proxy.py [-h] [] ...
|
||||
@ -42,9 +43,20 @@ def main():
|
||||
|
||||
parser.add_argument('--config-file', dest='config_file', type=str,
|
||||
help='Path to configuration file')
|
||||
|
||||
parser.add_argument('--host', dest='host', type=str,
|
||||
help='Host FQDN for current proxy')
|
||||
parser.add_argument('--frontend-port', dest='frontend_port', type=int,
|
||||
help='Front-end ROUTER port number')
|
||||
parser.add_argument('--backend-port', dest='backend_port', type=int,
|
||||
help='Back-end ROUTER port number')
|
||||
parser.add_argument('--publisher-port', dest='publisher_port', type=int,
|
||||
help='Back-end PUBLISHER port number')
|
||||
|
||||
parser.add_argument('-d', '--debug', dest='debug', type=bool,
|
||||
default=False,
|
||||
help="Turn on DEBUG logging level instead of INFO")
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
if args.config_file:
|
||||
@ -57,6 +69,18 @@ def main():
|
||||
format='%(asctime)s %(name)s '
|
||||
'%(levelname)-8s %(message)s')
|
||||
|
||||
if args.host:
|
||||
CONF.zmq_proxy_opts.host = args.host
|
||||
if args.frontend_port:
|
||||
CONF.set_override('frontend_port', args.frontend_port,
|
||||
group='zmq_proxy_opts')
|
||||
if args.backend_port:
|
||||
CONF.set_override('backend_port', args.backend_port,
|
||||
group='zmq_proxy_opts')
|
||||
if args.publisher_port:
|
||||
CONF.set_override('publisher_port', args.publisher_port,
|
||||
group='zmq_proxy_opts')
|
||||
|
||||
reactor = zmq_proxy.ZmqProxy(CONF, zmq_queue_proxy.UniversalQueueProxy)
|
||||
|
||||
try:
|
||||
|
@ -13,9 +13,11 @@
|
||||
# under the License.
|
||||
|
||||
import logging
|
||||
import socket
|
||||
|
||||
from stevedore import driver
|
||||
|
||||
from oslo_config import cfg
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_async
|
||||
from oslo_messaging._i18n import _LI
|
||||
|
||||
@ -23,6 +25,22 @@ zmq = zmq_async.import_zmq()
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
zmq_proxy_opts = [
|
||||
cfg.StrOpt('host', default=socket.gethostname(),
|
||||
help='Hostname (FQDN) of current proxy'
|
||||
' an ethernet interface, or IP address.'),
|
||||
|
||||
cfg.IntOpt('frontend_port', default=0,
|
||||
help='Front-end ROUTER port number. Zero means random.'),
|
||||
|
||||
cfg.IntOpt('backend_port', default=0,
|
||||
help='Back-end ROUTER port number. Zero means random.'),
|
||||
|
||||
cfg.IntOpt('publisher_port', default=0,
|
||||
help='Publisher port number. Zero means random.'),
|
||||
]
|
||||
|
||||
|
||||
class ZmqProxy(object):
|
||||
"""Wrapper class for Publishers and Routers proxies.
|
||||
The main reason to have a proxy is high complexity of TCP sockets number
|
@ -14,7 +14,6 @@
|
||||
|
||||
import logging
|
||||
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_address
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_async
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_names
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_socket
|
||||
@ -24,7 +23,7 @@ LOG = logging.getLogger(__name__)
|
||||
zmq = zmq_async.import_zmq()
|
||||
|
||||
|
||||
class PubPublisherProxy(object):
|
||||
class PublisherProxy(object):
|
||||
"""PUB/SUB based request publisher
|
||||
|
||||
The publisher intended to be used for Fanout and Notify
|
||||
@ -39,16 +38,20 @@ class PubPublisherProxy(object):
|
||||
"""
|
||||
|
||||
def __init__(self, conf, matchmaker):
|
||||
super(PubPublisherProxy, self).__init__()
|
||||
super(PublisherProxy, self).__init__()
|
||||
self.conf = conf
|
||||
self.zmq_context = zmq.Context()
|
||||
self.matchmaker = matchmaker
|
||||
|
||||
self.socket = zmq_socket.ZmqRandomPortSocket(
|
||||
self.conf, self.zmq_context, zmq.PUB)
|
||||
port = conf.zmq_proxy_opts.publisher_port
|
||||
|
||||
self.host = zmq_address.combine_address(self.conf.rpc_zmq_host,
|
||||
self.socket.port)
|
||||
self.socket = zmq_socket.ZmqFixedPortSocket(
|
||||
self.conf, self.zmq_context, zmq.PUB, conf.zmq_proxy_opts.host,
|
||||
port) if port != 0 else \
|
||||
zmq_socket.ZmqRandomPortSocket(
|
||||
self.conf, self.zmq_context, zmq.PUB, conf.zmq_proxy_opts.host)
|
||||
|
||||
self.host = self.socket.connect_address
|
||||
|
||||
def send_request(self, multipart_message):
|
||||
message_type = multipart_message.pop(0)
|
@ -16,9 +16,7 @@ import logging
|
||||
|
||||
import six
|
||||
|
||||
from oslo_messaging._drivers.zmq_driver.client.publishers \
|
||||
import zmq_pub_publisher
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_address
|
||||
from oslo_messaging._drivers.zmq_driver.proxy import zmq_publisher_proxy
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_async
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_names
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_socket
|
||||
@ -38,27 +36,31 @@ class UniversalQueueProxy(object):
|
||||
self.matchmaker = matchmaker
|
||||
self.poller = zmq_async.get_poller()
|
||||
|
||||
self.fe_router_socket = zmq_socket.ZmqRandomPortSocket(
|
||||
conf, context, zmq.ROUTER)
|
||||
self.be_router_socket = zmq_socket.ZmqRandomPortSocket(
|
||||
conf, context, zmq.ROUTER)
|
||||
port = conf.zmq_proxy_opts.frontend_port
|
||||
host = conf.zmq_proxy_opts.host
|
||||
self.fe_router_socket = zmq_socket.ZmqFixedPortSocket(
|
||||
conf, context, zmq.ROUTER, host,
|
||||
conf.zmq_proxy_opts.frontend_port) if port != 0 else \
|
||||
zmq_socket.ZmqRandomPortSocket(conf, context, zmq.ROUTER, host)
|
||||
|
||||
port = conf.zmq_proxy_opts.backend_port
|
||||
self.be_router_socket = zmq_socket.ZmqFixedPortSocket(
|
||||
conf, context, zmq.ROUTER, host,
|
||||
conf.zmq_proxy_opts.backend_port) if port != 0 else \
|
||||
zmq_socket.ZmqRandomPortSocket(conf, context, zmq.ROUTER, host)
|
||||
|
||||
self.poller.register(self.fe_router_socket.handle,
|
||||
self._receive_in_request)
|
||||
self.poller.register(self.be_router_socket.handle,
|
||||
self._receive_in_request)
|
||||
|
||||
self.fe_router_address = zmq_address.combine_address(
|
||||
self.conf.rpc_zmq_host, self.fe_router_socket.port)
|
||||
self.be_router_address = zmq_address.combine_address(
|
||||
self.conf.rpc_zmq_host, self.be_router_socket.port)
|
||||
|
||||
self.pub_publisher = zmq_pub_publisher.PubPublisherProxy(
|
||||
self.pub_publisher = zmq_publisher_proxy.PublisherProxy(
|
||||
conf, matchmaker)
|
||||
|
||||
self._router_updater = RouterUpdater(
|
||||
conf, matchmaker, self.pub_publisher.host, self.fe_router_address,
|
||||
self.be_router_address)
|
||||
conf, matchmaker, self.pub_publisher.host,
|
||||
self.fe_router_socket.connect_address,
|
||||
self.be_router_socket.connect_address)
|
||||
|
||||
def run(self):
|
||||
message, socket = self.poller.poll()
|
||||
@ -102,16 +104,17 @@ class UniversalQueueProxy(object):
|
||||
socket.send(b'', zmq.SNDMORE)
|
||||
socket.send(reply_id, zmq.SNDMORE)
|
||||
socket.send(six.b(str(message_type)), zmq.SNDMORE)
|
||||
LOG.debug("Dispatching message %s" % message_id)
|
||||
LOG.debug("Dispatching %(msg_type)s message %(msg_id)s - to %(rkey)s" %
|
||||
{"msg_type": zmq_names.message_type_str(message_type),
|
||||
"msg_id": message_id,
|
||||
"rkey": routing_key})
|
||||
socket.send_multipart(multipart_message)
|
||||
|
||||
def cleanup(self):
|
||||
self.fe_router_socket.close()
|
||||
self.be_router_socket.close()
|
||||
self.pub_publisher.cleanup()
|
||||
self.matchmaker.unregister_publisher(
|
||||
(self.pub_publisher.host, self.fe_router_address))
|
||||
self.matchmaker.unregister_router(self.be_router_address)
|
||||
self._router_updater.cleanup()
|
||||
|
||||
|
||||
class RouterUpdater(zmq_updater.UpdaterBase):
|
||||
@ -138,3 +141,10 @@ class RouterUpdater(zmq_updater.UpdaterBase):
|
||||
expire=self.conf.zmq_target_expire)
|
||||
LOG.info(_LI("[Backend ROUTER:%(router)s] Update ROUTER"),
|
||||
{"router": self.be_router_address})
|
||||
|
||||
def cleanup(self):
|
||||
super(RouterUpdater, self).cleanup()
|
||||
self.matchmaker.unregister_publisher(
|
||||
(self.publisher_address, self.fe_router_address))
|
||||
self.matchmaker.unregister_router(
|
||||
self.be_router_address)
|
@ -150,23 +150,50 @@ class ZmqSocket(object):
|
||||
self.connect_to_address(address)
|
||||
|
||||
|
||||
class ZmqPortRangeExceededException(exceptions.MessagingException):
|
||||
"""Raised by ZmqRandomPortSocket - wrapping zmq.ZMQBindError"""
|
||||
class ZmqPortBusy(exceptions.MessagingException):
|
||||
"""Raised when binding to a port failure"""
|
||||
|
||||
def __init__(self, port_number):
|
||||
super(ZmqPortBusy, self).__init__()
|
||||
self.port_number = port_number
|
||||
|
||||
|
||||
class ZmqRandomPortSocket(ZmqSocket):
|
||||
|
||||
def __init__(self, conf, context, socket_type, high_watermark=0):
|
||||
def __init__(self, conf, context, socket_type, host=None,
|
||||
high_watermark=0):
|
||||
super(ZmqRandomPortSocket, self).__init__(conf, context, socket_type,
|
||||
high_watermark)
|
||||
self.bind_address = zmq_address.get_tcp_random_address(self.conf)
|
||||
|
||||
if host is None:
|
||||
host = conf.rpc_zmq_host
|
||||
try:
|
||||
self.port = self.handle.bind_to_random_port(
|
||||
self.bind_address,
|
||||
min_port=conf.rpc_zmq_min_port,
|
||||
max_port=conf.rpc_zmq_max_port,
|
||||
max_tries=conf.rpc_zmq_bind_port_retries)
|
||||
self.connect_address = zmq_address.combine_address(host, self.port)
|
||||
except zmq.ZMQBindError:
|
||||
LOG.error(_LE("Random ports range exceeded!"))
|
||||
raise ZmqPortRangeExceededException()
|
||||
raise ZmqPortBusy(port_number=0)
|
||||
|
||||
|
||||
class ZmqFixedPortSocket(ZmqSocket):
|
||||
|
||||
def __init__(self, conf, context, socket_type, host, port,
|
||||
high_watermark=0):
|
||||
super(ZmqFixedPortSocket, self).__init__(conf, context, socket_type,
|
||||
high_watermark)
|
||||
self.connect_address = zmq_address.combine_address(host, port)
|
||||
self.bind_address = zmq_address.get_tcp_direct_address(
|
||||
zmq_address.combine_address(conf.rpc_zmq_bind_address, port))
|
||||
self.host = host
|
||||
self.port = port
|
||||
|
||||
try:
|
||||
self.handle.bind(self.bind_address)
|
||||
except zmq.ZMQError as e:
|
||||
LOG.exception(e)
|
||||
LOG.error(_LE("Chosen port %d is being busy.") % self.port)
|
||||
raise ZmqPortBusy(port_number=port)
|
||||
|
@ -45,7 +45,7 @@ class ZmqTestPortsRange(zmq_common.ZmqBaseTestCase):
|
||||
target = oslo_messaging.Target(topic='testtopic_' + str(i))
|
||||
new_listener = self.driver.listen(target, None, None)
|
||||
listeners.append(new_listener)
|
||||
except zmq_socket.ZmqPortRangeExceededException:
|
||||
except zmq_socket.ZmqPortBusy:
|
||||
pass
|
||||
|
||||
self.assertLessEqual(len(listeners), 5)
|
||||
|
@ -13,15 +13,17 @@
|
||||
# under the License.
|
||||
|
||||
import json
|
||||
import msgpack
|
||||
import time
|
||||
|
||||
import msgpack
|
||||
import six
|
||||
import testscenarios
|
||||
|
||||
from oslo_config import cfg
|
||||
|
||||
import oslo_messaging
|
||||
from oslo_messaging._drivers.zmq_driver.client.publishers \
|
||||
import zmq_pub_publisher
|
||||
from oslo_messaging._drivers.zmq_driver.proxy import zmq_proxy
|
||||
from oslo_messaging._drivers.zmq_driver.proxy import zmq_publisher_proxy
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_address
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_async
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_names
|
||||
@ -31,6 +33,10 @@ load_tests = testscenarios.load_tests_apply_scenarios
|
||||
|
||||
zmq = zmq_async.import_zmq()
|
||||
|
||||
opt_group = cfg.OptGroup(name='zmq_proxy_opts',
|
||||
title='ZeroMQ proxy options')
|
||||
cfg.CONF.register_opts(zmq_proxy.zmq_proxy_opts, group=opt_group)
|
||||
|
||||
|
||||
class TestPubSub(zmq_common.ZmqBaseTestCase):
|
||||
|
||||
@ -50,7 +56,10 @@ class TestPubSub(zmq_common.ZmqBaseTestCase):
|
||||
'rpc_zmq_serialization': self.serialization}
|
||||
self.config(**kwargs)
|
||||
|
||||
self.publisher = zmq_pub_publisher.PubPublisherProxy(
|
||||
self.config(host="127.0.0.1", group="zmq_proxy_opts")
|
||||
self.config(publisher_port="0", group="zmq_proxy_opts")
|
||||
|
||||
self.publisher = zmq_publisher_proxy.PublisherProxy(
|
||||
self.conf, self.driver.matchmaker)
|
||||
self.driver.matchmaker.register_publisher(
|
||||
(self.publisher.host, ""))
|
||||
@ -59,6 +68,12 @@ class TestPubSub(zmq_common.ZmqBaseTestCase):
|
||||
for i in range(self.LISTENERS_COUNT):
|
||||
self.listeners.append(zmq_common.TestServerListener(self.driver))
|
||||
|
||||
def tearDown(self):
|
||||
super(TestPubSub, self).tearDown()
|
||||
self.publisher.cleanup()
|
||||
for listener in self.listeners:
|
||||
listener.stop()
|
||||
|
||||
def _send_request(self, target):
|
||||
# Needed only in test env to give listener a chance to connect
|
||||
# before request fires
|
||||
|
@ -13,6 +13,8 @@ export ZMQ_IPC_DIR=${DATADIR}
|
||||
export ZMQ_USE_PUB_SUB=false
|
||||
export ZMQ_USE_ROUTER_PROXY=true
|
||||
|
||||
export ZMQ_PROXY_HOST=127.0.0.1
|
||||
|
||||
cat > ${DATADIR}/zmq.conf <<EOF
|
||||
[DEFAULT]
|
||||
transport_url=${TRANSPORT_URL}
|
||||
@ -22,6 +24,9 @@ use_pub_sub=${ZMQ_USE_PUB_SUB}
|
||||
use_router_proxy=${ZMQ_USE_ROUTER_PROXY}
|
||||
[matchmaker_redis]
|
||||
port=${ZMQ_REDIS_PORT}
|
||||
|
||||
[zmq_proxy_opts]
|
||||
host=${ZMQ_PROXY_HOST}
|
||||
EOF
|
||||
|
||||
redis-server --port $ZMQ_REDIS_PORT &
|
||||
|
Loading…
Reference in New Issue
Block a user