[zmq] Add config options to specify dynamic ports range
Operators may need a possibility to restrict ports ranges for a specified services in order to distinguish ports related to zmq messaging from all other ports in a system. Change-Id: Ibe5b02c1211b16859ff58bc02a59d96e1d2fa660 Closes-Bug: #1511181
This commit is contained in:
parent
3f6ef7be46
commit
4a3ddce05b
|
@ -79,7 +79,20 @@ zmq_opts = [
|
||||||
|
|
||||||
cfg.BoolOpt('zmq_use_broker',
|
cfg.BoolOpt('zmq_use_broker',
|
||||||
default=True,
|
default=True,
|
||||||
help='Shows whether zmq-messaging uses broker or not.')
|
help='Shows whether zmq-messaging uses broker or not.'),
|
||||||
|
|
||||||
|
cfg.IntOpt('rpc_zmq_min_port',
|
||||||
|
default=49152,
|
||||||
|
help='Minimal port number for random ports range.'),
|
||||||
|
|
||||||
|
cfg.IntOpt('rpc_zmq_max_port',
|
||||||
|
default=65536,
|
||||||
|
help='Maximal port number for random ports range.'),
|
||||||
|
|
||||||
|
cfg.IntOpt('rpc_zmq_bind_port_retries',
|
||||||
|
default=100,
|
||||||
|
help='Number of retries to find free port number before '
|
||||||
|
'fail with ZMQBindError.')
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -17,6 +17,8 @@ import logging
|
||||||
from oslo_messaging._drivers.zmq_driver import zmq_address
|
from oslo_messaging._drivers.zmq_driver import zmq_address
|
||||||
from oslo_messaging._drivers.zmq_driver import zmq_async
|
from oslo_messaging._drivers.zmq_driver import zmq_async
|
||||||
from oslo_messaging._drivers.zmq_driver import zmq_names
|
from oslo_messaging._drivers.zmq_driver import zmq_names
|
||||||
|
from oslo_messaging._i18n import _LE
|
||||||
|
from oslo_messaging import exceptions
|
||||||
|
|
||||||
LOG = logging.getLogger(__name__)
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
@ -79,10 +81,23 @@ class ZmqSocket(object):
|
||||||
self.handle.close(*args, **kwargs)
|
self.handle.close(*args, **kwargs)
|
||||||
|
|
||||||
|
|
||||||
|
class ZmqPortRangeExceededException(exceptions.MessagingException):
|
||||||
|
"""Raised by ZmqRandomPortSocket - wrapping zmq.ZMQBindError"""
|
||||||
|
|
||||||
|
|
||||||
class ZmqRandomPortSocket(ZmqSocket):
|
class ZmqRandomPortSocket(ZmqSocket):
|
||||||
|
|
||||||
def __init__(self, conf, context, socket_type):
|
def __init__(self, conf, context, socket_type):
|
||||||
super(ZmqRandomPortSocket, self).__init__(context, socket_type)
|
super(ZmqRandomPortSocket, self).__init__(context, socket_type)
|
||||||
self.conf = conf
|
self.conf = conf
|
||||||
self.bind_address = zmq_address.get_tcp_random_address(self.conf)
|
self.bind_address = zmq_address.get_tcp_random_address(self.conf)
|
||||||
self.port = self.handle.bind_to_random_port(self.bind_address)
|
|
||||||
|
try:
|
||||||
|
self.port = self.handle.bind_to_random_port(
|
||||||
|
self.bind_address,
|
||||||
|
min_port=conf.rpc_zmq_min_port,
|
||||||
|
max_port=conf.rpc_zmq_max_port,
|
||||||
|
max_tries=conf.rpc_zmq_bind_port_retries)
|
||||||
|
except zmq.ZMQBindError:
|
||||||
|
LOG.error(_LE("Random ports range exceeded!"))
|
||||||
|
raise ZmqPortRangeExceededException()
|
||||||
|
|
|
@ -21,6 +21,7 @@ import testtools
|
||||||
import oslo_messaging
|
import oslo_messaging
|
||||||
from oslo_messaging._drivers import impl_zmq
|
from oslo_messaging._drivers import impl_zmq
|
||||||
from oslo_messaging._drivers.zmq_driver import zmq_async
|
from oslo_messaging._drivers.zmq_driver import zmq_async
|
||||||
|
from oslo_messaging._drivers.zmq_driver import zmq_socket
|
||||||
from oslo_messaging._i18n import _
|
from oslo_messaging._i18n import _
|
||||||
from oslo_messaging.tests import utils as test_utils
|
from oslo_messaging.tests import utils as test_utils
|
||||||
|
|
||||||
|
@ -78,7 +79,9 @@ class ZmqBaseTestCase(test_utils.BaseTestCase):
|
||||||
'rpc_response_timeout': 5,
|
'rpc_response_timeout': 5,
|
||||||
'rpc_zmq_ipc_dir': self.internal_ipc_dir,
|
'rpc_zmq_ipc_dir': self.internal_ipc_dir,
|
||||||
'zmq_use_broker': False,
|
'zmq_use_broker': False,
|
||||||
'rpc_zmq_matchmaker': 'dummy'}
|
'rpc_zmq_matchmaker': 'dummy',
|
||||||
|
'rpc_zmq_min_port': 5555,
|
||||||
|
'rpc_zmq_max_port': 5560}
|
||||||
self.config(**kwargs)
|
self.config(**kwargs)
|
||||||
|
|
||||||
# Get driver
|
# Get driver
|
||||||
|
@ -115,6 +118,22 @@ class stopRpc(object):
|
||||||
|
|
||||||
class TestZmqBasics(ZmqBaseTestCase):
|
class TestZmqBasics(ZmqBaseTestCase):
|
||||||
|
|
||||||
|
def test_ports_range(self):
|
||||||
|
listeners = []
|
||||||
|
|
||||||
|
for i in range(10):
|
||||||
|
try:
|
||||||
|
target = oslo_messaging.Target(topic='testtopic_'+str(i))
|
||||||
|
new_listener = self.driver.listen(target)
|
||||||
|
listeners.append(new_listener)
|
||||||
|
except zmq_socket.ZmqPortRangeExceededException:
|
||||||
|
pass
|
||||||
|
|
||||||
|
self.assertLessEqual(len(listeners), 5)
|
||||||
|
|
||||||
|
for l in listeners:
|
||||||
|
l.cleanup()
|
||||||
|
|
||||||
def test_send_receive_raises(self):
|
def test_send_receive_raises(self):
|
||||||
"""Call() without method."""
|
"""Call() without method."""
|
||||||
target = oslo_messaging.Target(topic='testtopic')
|
target = oslo_messaging.Target(topic='testtopic')
|
||||||
|
|
Loading…
Reference in New Issue