From 4a3ddce05bac17903fb768a47b042d4bc17fd0d9 Mon Sep 17 00:00:00 2001 From: Oleksii Zamiatin Date: Wed, 11 Nov 2015 14:12:20 +0200 Subject: [PATCH] [zmq] Add config options to specify dynamic ports range Operators may need a possibility to restrict ports ranges for a specified services in order to distinguish ports related to zmq messaging from all other ports in a system. Change-Id: Ibe5b02c1211b16859ff58bc02a59d96e1d2fa660 Closes-Bug: #1511181 --- oslo_messaging/_drivers/impl_zmq.py | 15 ++++++++++++- .../_drivers/zmq_driver/zmq_socket.py | 17 ++++++++++++++- .../tests/drivers/zmq/test_impl_zmq.py | 21 ++++++++++++++++++- 3 files changed, 50 insertions(+), 3 deletions(-) diff --git a/oslo_messaging/_drivers/impl_zmq.py b/oslo_messaging/_drivers/impl_zmq.py index aed6563a9..acff79c7b 100644 --- a/oslo_messaging/_drivers/impl_zmq.py +++ b/oslo_messaging/_drivers/impl_zmq.py @@ -79,7 +79,20 @@ zmq_opts = [ cfg.BoolOpt('zmq_use_broker', default=True, - help='Shows whether zmq-messaging uses broker or not.') + help='Shows whether zmq-messaging uses broker or not.'), + + cfg.IntOpt('rpc_zmq_min_port', + default=49152, + help='Minimal port number for random ports range.'), + + cfg.IntOpt('rpc_zmq_max_port', + default=65536, + help='Maximal port number for random ports range.'), + + cfg.IntOpt('rpc_zmq_bind_port_retries', + default=100, + help='Number of retries to find free port number before ' + 'fail with ZMQBindError.') ] diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_socket.py b/oslo_messaging/_drivers/zmq_driver/zmq_socket.py index 8e51e30f1..8f79bd083 100644 --- a/oslo_messaging/_drivers/zmq_driver/zmq_socket.py +++ b/oslo_messaging/_drivers/zmq_driver/zmq_socket.py @@ -17,6 +17,8 @@ import logging from oslo_messaging._drivers.zmq_driver import zmq_address from oslo_messaging._drivers.zmq_driver import zmq_async from oslo_messaging._drivers.zmq_driver import zmq_names +from oslo_messaging._i18n import _LE +from oslo_messaging import exceptions LOG = logging.getLogger(__name__) @@ -79,10 +81,23 @@ class ZmqSocket(object): self.handle.close(*args, **kwargs) +class ZmqPortRangeExceededException(exceptions.MessagingException): + """Raised by ZmqRandomPortSocket - wrapping zmq.ZMQBindError""" + + class ZmqRandomPortSocket(ZmqSocket): def __init__(self, conf, context, socket_type): super(ZmqRandomPortSocket, self).__init__(context, socket_type) self.conf = conf self.bind_address = zmq_address.get_tcp_random_address(self.conf) - self.port = self.handle.bind_to_random_port(self.bind_address) + + try: + self.port = self.handle.bind_to_random_port( + self.bind_address, + min_port=conf.rpc_zmq_min_port, + max_port=conf.rpc_zmq_max_port, + max_tries=conf.rpc_zmq_bind_port_retries) + except zmq.ZMQBindError: + LOG.error(_LE("Random ports range exceeded!")) + raise ZmqPortRangeExceededException() diff --git a/oslo_messaging/tests/drivers/zmq/test_impl_zmq.py b/oslo_messaging/tests/drivers/zmq/test_impl_zmq.py index c40007523..feb1f7656 100644 --- a/oslo_messaging/tests/drivers/zmq/test_impl_zmq.py +++ b/oslo_messaging/tests/drivers/zmq/test_impl_zmq.py @@ -21,6 +21,7 @@ import testtools import oslo_messaging from oslo_messaging._drivers import impl_zmq from oslo_messaging._drivers.zmq_driver import zmq_async +from oslo_messaging._drivers.zmq_driver import zmq_socket from oslo_messaging._i18n import _ from oslo_messaging.tests import utils as test_utils @@ -78,7 +79,9 @@ class ZmqBaseTestCase(test_utils.BaseTestCase): 'rpc_response_timeout': 5, 'rpc_zmq_ipc_dir': self.internal_ipc_dir, 'zmq_use_broker': False, - 'rpc_zmq_matchmaker': 'dummy'} + 'rpc_zmq_matchmaker': 'dummy', + 'rpc_zmq_min_port': 5555, + 'rpc_zmq_max_port': 5560} self.config(**kwargs) # Get driver @@ -115,6 +118,22 @@ class stopRpc(object): class TestZmqBasics(ZmqBaseTestCase): + def test_ports_range(self): + listeners = [] + + for i in range(10): + try: + target = oslo_messaging.Target(topic='testtopic_'+str(i)) + new_listener = self.driver.listen(target) + listeners.append(new_listener) + except zmq_socket.ZmqPortRangeExceededException: + pass + + self.assertLessEqual(len(listeners), 5) + + for l in listeners: + l.cleanup() + def test_send_receive_raises(self): """Call() without method.""" target = oslo_messaging.Target(topic='testtopic')