From 7c5d039fd355e60e099a0a36408c85a08bfcc2ad Mon Sep 17 00:00:00 2001 From: Oleksii Zamiatin Date: Thu, 4 Aug 2016 15:31:45 +0300 Subject: [PATCH] Move zmq driver options into its own group ZeroMQ driver options are current stored into the DEFAULT group. This change makes the zmq configuration clearer by putting its options into oslo_messaging_zmq group. Change-Id: Ia00fda005b1664750d2646f8c82ebdf295b156fb Closes-bug: #1417040 Co-Authored-By: Oleksii Zamiatin --- doc/source/zmq_driver.rst | 43 +++--- oslo_messaging/_cmd/zmq_proxy.py | 5 +- oslo_messaging/_drivers/impl_pika.py | 0 oslo_messaging/_drivers/impl_zmq.py | 96 ++------------ .../dealer/zmq_dealer_publisher_proxy.py | 2 +- .../_drivers/zmq_driver/client/zmq_client.py | 8 +- .../zmq_driver/client/zmq_receivers.py | 3 +- .../zmq_driver/client/zmq_routing_table.py | 3 +- .../zmq_driver/client/zmq_sockets_manager.py | 3 +- .../_drivers/zmq_driver/proxy/zmq_proxy.py | 2 +- .../zmq_driver/proxy/zmq_queue_proxy.py | 12 +- .../server/consumers/zmq_consumer_base.py | 6 +- .../_drivers/zmq_driver/server/zmq_server.py | 13 +- .../_drivers/zmq_driver/zmq_address.py | 4 +- .../_drivers/zmq_driver/zmq_options.py | 122 ++++++++++++++++++ .../_drivers/zmq_driver/zmq_socket.py | 26 ++-- .../_drivers/zmq_driver/zmq_updater.py | 2 +- oslo_messaging/conffixture.py | 3 +- oslo_messaging/opts.py | 5 +- .../tests/drivers/zmq/test_impl_zmq.py | 2 +- .../tests/drivers/zmq/test_pub_sub.py | 2 +- .../tests/drivers/zmq/zmq_common.py | 6 +- oslo_messaging/tests/functional/utils.py | 12 +- .../tests/functional/zmq/multiproc_utils.py | 3 +- .../tests/functional/zmq/test_startup.py | 6 +- oslo_messaging/tests/test_opts.py | 3 +- setup-test-env-zmq-proxy.sh | 1 + setup-test-env-zmq-pub-sub.sh | 1 + setup-test-env-zmq.sh | 1 + tools/simulator.py | 2 +- 30 files changed, 230 insertions(+), 167 deletions(-) mode change 100755 => 100644 oslo_messaging/_drivers/impl_pika.py create mode 100644 oslo_messaging/_drivers/zmq_driver/zmq_options.py diff --git a/doc/source/zmq_driver.rst b/doc/source/zmq_driver.rst index e73fdf93a..bcc3d668a 100644 --- a/doc/source/zmq_driver.rst +++ b/doc/source/zmq_driver.rst @@ -85,12 +85,14 @@ Configuration Enabling (mandatory) -------------------- -To enable the driver, in the section [DEFAULT] of the conf file, -the 'rpc_backend' flag must be set to 'zmq' and the 'rpc_zmq_host' flag +To enable the driver the 'transport_url' option must be set to 'zmq://' +in the section [DEFAULT] of the conf file, the 'rpc_zmq_host' flag must be set to the hostname of the current node. :: [DEFAULT] - rpc_backend = zmq + transport_url = "zmq://" + + [oslo_messaging_zmq] rpc_zmq_host = {hostname} @@ -110,27 +112,17 @@ RedisMatchMaker: loads the hash table from a remote Redis server, supports dynamic host/topic registrations, host expiration, and hooks for consuming applications to acknowledge or neg-acknowledge topic.host service availability. -To set the MatchMaker class, use option 'rpc_zmq_matchmaker' in [DEFAULT]. :: +For ZeroMQ driver Redis is configured in transport_url also. For using Redis +specify the URL as follows:: - rpc_zmq_matchmaker = dummy - -or:: - - rpc_zmq_matchmaker = redis - -To specify the Redis server for RedisMatchMaker, use options in -[matchmaker_redis] of each project. :: - - [matchmaker_redis] - host = 127.0.0.1 - port = 6379 + [DEFAULT] + transport_url = "zmq+redis://127.0.0.1:6379" In order to cleanup redis storage from expired records (e.g. target listener goes down) TTL may be applied for keys. Configure 'zmq_target_expire' option which is 120 (seconds) by default. The option is related not specifically to -redis so it is also defined in [DEFAULT] section. If option value is <= 0 -then keys don't expire and live forever in the storage. - +redis so it is also defined in [oslo_messaging_zmq] section. If option value +is <= 0 then keys don't expire and live forever in the storage. MatchMaker Data Source (mandatory) ---------------------------------- @@ -159,11 +151,10 @@ we use Sentinel solution and redis master-slave-slave configuration (if we have 3 controllers and run Redis on each of them). To deploy redis with HA follow the `sentinel-install`_ instructions. From the -messaging driver's side you will need to setup following configuration which -is different from a single-node redis deployment :: +messaging driver's side you will need to setup following configuration :: - [matchmaker_redis] - sentinel_hosts=host1:26379, host2:26379, host3:26379 + [DEFAULT] + transport_url = "zmq+redis://host1:26379,host2:26379,host3:26379" Restrict the number of TCP sockets on controller @@ -174,7 +165,7 @@ controller node in directly connected configuration. To solve the issue ROUTER proxy may be used. In order to configure driver to use ROUTER proxy set up the 'use_router_proxy' -option to true in [DEFAULT] section (false is set by default). +option to true in [oslo_messaging_zmq] section (false is set by default). For example:: @@ -198,7 +189,7 @@ direct DEALER/ROUTER unicast which is possible but less efficient and therefore is not recommended. In a case of direct DEALER/ROUTER unicast proxy is not needed. -This option can be set in [DEFAULT] section. +This option can be set in [oslo_messaging_zmq] section. For example:: @@ -218,7 +209,7 @@ All services bind to an IP address or Ethernet adapter. By default, all services bind to '*', effectively binding to 0.0.0.0. This may be changed with the option 'rpc_zmq_bind_address' which accepts a wildcard, IP address, or Ethernet adapter. -This configuration can be set in [DEFAULT] section. +This configuration can be set in [oslo_messaging_zmq] section. For example:: diff --git a/oslo_messaging/_cmd/zmq_proxy.py b/oslo_messaging/_cmd/zmq_proxy.py index a76b0b534..3126a41b3 100644 --- a/oslo_messaging/_cmd/zmq_proxy.py +++ b/oslo_messaging/_cmd/zmq_proxy.py @@ -17,12 +17,13 @@ import logging from oslo_config import cfg -from oslo_messaging._drivers import impl_zmq from oslo_messaging._drivers.zmq_driver.proxy import zmq_proxy from oslo_messaging._drivers.zmq_driver.proxy import zmq_queue_proxy +from oslo_messaging._drivers.zmq_driver import zmq_options CONF = cfg.CONF -CONF.register_opts(impl_zmq.zmq_opts) + +zmq_options.register_opts(CONF) opt_group = cfg.OptGroup(name='zmq_proxy_opts', title='ZeroMQ proxy options') diff --git a/oslo_messaging/_drivers/impl_pika.py b/oslo_messaging/_drivers/impl_pika.py old mode 100755 new mode 100644 diff --git a/oslo_messaging/_drivers/impl_zmq.py b/oslo_messaging/_drivers/impl_zmq.py index 5636d0118..90c2c20bc 100644 --- a/oslo_messaging/_drivers/impl_zmq.py +++ b/oslo_messaging/_drivers/impl_zmq.py @@ -14,10 +14,8 @@ import logging import os -import socket import threading -from oslo_config import cfg from stevedore import driver from oslo_messaging._drivers import base @@ -25,90 +23,14 @@ from oslo_messaging._drivers import common as rpc_common from oslo_messaging._drivers.zmq_driver.client import zmq_client from oslo_messaging._drivers.zmq_driver.server import zmq_server from oslo_messaging._drivers.zmq_driver import zmq_async +from oslo_messaging._drivers.zmq_driver import zmq_options from oslo_messaging._i18n import _LE -from oslo_messaging import server RPCException = rpc_common.RPCException -_MATCHMAKER_BACKENDS = ('redis', 'dummy') -_MATCHMAKER_DEFAULT = 'redis' LOG = logging.getLogger(__name__) -zmq_opts = [ - cfg.StrOpt('rpc_zmq_bind_address', default='*', - help='ZeroMQ bind address. Should be a wildcard (*), ' - 'an ethernet interface, or IP. ' - 'The "host" option should point or resolve to this ' - 'address.'), - - cfg.StrOpt('rpc_zmq_matchmaker', default=_MATCHMAKER_DEFAULT, - choices=_MATCHMAKER_BACKENDS, - help='MatchMaker driver.'), - - cfg.IntOpt('rpc_zmq_contexts', default=1, - help='Number of ZeroMQ contexts, defaults to 1.'), - - cfg.IntOpt('rpc_zmq_topic_backlog', - help='Maximum number of ingress messages to locally buffer ' - 'per topic. Default is unlimited.'), - - cfg.StrOpt('rpc_zmq_ipc_dir', default='/var/run/openstack', - help='Directory for holding IPC sockets.'), - - cfg.StrOpt('rpc_zmq_host', default=socket.gethostname(), - sample_default='localhost', - help='Name of this node. Must be a valid hostname, FQDN, or ' - 'IP address. Must match "host" option, if running Nova.'), - - cfg.IntOpt('rpc_cast_timeout', default=-1, - help='Seconds to wait before a cast expires (TTL). ' - 'The default value of -1 specifies an infinite linger ' - 'period. The value of 0 specifies no linger period. ' - 'Pending messages shall be discarded immediately ' - 'when the socket is closed. Only supported by impl_zmq.'), - - cfg.IntOpt('rpc_poll_timeout', default=1, - help='The default number of seconds that poll should wait. ' - 'Poll raises timeout exception when timeout expired.'), - - cfg.IntOpt('zmq_target_expire', default=300, - help='Expiration timeout in seconds of a name service record ' - 'about existing target ( < 0 means no timeout).'), - - cfg.IntOpt('zmq_target_update', default=180, - help='Update period in seconds of a name service record ' - 'about existing target.'), - - cfg.BoolOpt('use_pub_sub', default=True, - help='Use PUB/SUB pattern for fanout methods. ' - 'PUB/SUB always uses proxy.'), - - cfg.BoolOpt('use_router_proxy', default=True, - help='Use ROUTER remote proxy.'), - - cfg.PortOpt('rpc_zmq_min_port', - default=49153, - help='Minimal port number for random ports range.'), - - cfg.IntOpt('rpc_zmq_max_port', - min=1, - max=65536, - default=65536, - help='Maximal port number for random ports range.'), - - cfg.IntOpt('rpc_zmq_bind_port_retries', - default=100, - help='Number of retries to find free port number before ' - 'fail with ZMQBindError.'), - - cfg.StrOpt('rpc_zmq_serialization', default='json', - choices=('json', 'msgpack'), - help='Default serialization mechanism for ' - 'serializing/deserializing outgoing/incoming messages') -] - - class LazyDriverItem(object): def __init__(self, item_cls, *args, **kwargs): @@ -174,9 +96,7 @@ class ZmqDriver(base.BaseDriver): if zmq is None: raise ImportError(_LE("ZeroMQ is not available!")) - conf.register_opts(zmq_opts) - conf.register_opts(server._pool_opts) - conf.register_opts(base.base_opts) + zmq_options.register_opts(conf) self.conf = conf self.allowed_remote_exmods = allowed_remote_exmods @@ -186,9 +106,11 @@ class ZmqDriver(base.BaseDriver): ).driver(self.conf, url=url) client_cls = zmq_client.ZmqClientProxy - if conf.use_pub_sub and not conf.use_router_proxy: + if conf.oslo_messaging_zmq.use_pub_sub and not \ + conf.oslo_messaging_zmq.use_router_proxy: client_cls = zmq_client.ZmqClientMixDirectPubSub - elif not conf.use_pub_sub and not conf.use_router_proxy: + elif not conf.oslo_messaging_zmq.use_pub_sub and not \ + conf.oslo_messaging_zmq.use_router_proxy: client_cls = zmq_client.ZmqClientDirect self.client = LazyDriverItem( @@ -206,13 +128,13 @@ class ZmqDriver(base.BaseDriver): zmq_transport, p, matchmaker_backend = url.transport.partition('+') assert zmq_transport == 'zmq', "Needs to be zmq for this transport!" if not matchmaker_backend: - return self.conf.rpc_zmq_matchmaker - elif matchmaker_backend not in _MATCHMAKER_BACKENDS: + return self.conf.oslo_messaging_zmq.rpc_zmq_matchmaker + elif matchmaker_backend not in zmq_options.MATCHMAKER_BACKENDS: raise rpc_common.RPCException( _LE("Incorrect matchmaker backend name %(backend_name)s!" "Available names are: %(available_names)s") % {"backend_name": matchmaker_backend, - "available_names": _MATCHMAKER_BACKENDS}) + "available_names": zmq_options.MATCHMAKER_BACKENDS}) return matchmaker_backend def send(self, target, ctxt, message, wait_for_reply=None, timeout=None, diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py index 29dd3fcd3..fb10ce77f 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py @@ -63,7 +63,7 @@ class DealerPublisherProxy(zmq_dealer_publisher_base.DealerPublisherBase): else: return \ [zmq_address.target_to_subscribe_filter(request.target)] \ - if self.conf.use_pub_sub else \ + if self.conf.oslo_messaging_zmq.use_pub_sub else \ self.routing_table.get_all_hosts(request.target) except retrying.RetryError: return [] diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py index e7362e2f6..0ec27e9d3 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py +++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py @@ -39,7 +39,8 @@ class ZmqClientMixDirectPubSub(zmq_client_base.ZmqClientBase): def __init__(self, conf, matchmaker=None, allowed_remote_exmods=None): - if conf.use_router_proxy or not conf.use_pub_sub: + if conf.oslo_messaging_zmq.use_router_proxy or not \ + conf.oslo_messaging_zmq.use_pub_sub: raise WrongClientException() publisher_direct = \ @@ -68,7 +69,8 @@ class ZmqClientDirect(zmq_client_base.ZmqClientBase): def __init__(self, conf, matchmaker=None, allowed_remote_exmods=None): - if conf.use_pub_sub or conf.use_router_proxy: + if conf.oslo_messaging_zmq.use_pub_sub or \ + conf.oslo_messaging_zmq.use_router_proxy: raise WrongClientException() publisher = \ @@ -92,7 +94,7 @@ class ZmqClientProxy(zmq_client_base.ZmqClientBase): def __init__(self, conf, matchmaker=None, allowed_remote_exmods=None): - if not conf.use_router_proxy: + if not conf.oslo_messaging_zmq.use_router_proxy: raise WrongClientException() publisher = \ diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_receivers.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_receivers.py index 63c683f2d..96ebeada0 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/zmq_receivers.py +++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_receivers.py @@ -87,7 +87,8 @@ class ReceiverBase(object): return self._requests.pop((message_id, message_type), None) def _run_loop(self): - data, socket = self._poller.poll(timeout=self.conf.rpc_poll_timeout) + data, socket = self._poller.poll( + timeout=self.conf.oslo_messaging_zmq.rpc_poll_timeout) if data is None: return reply_id, message_type, message_id, response = data diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_routing_table.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_routing_table.py index 2abb21b7e..16de0bc63 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/zmq_routing_table.py +++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_routing_table.py @@ -46,7 +46,8 @@ class RoutingTable(object): return host def _is_tm_expired(self, tm): - return 0 <= self.conf.zmq_target_expire <= time.time() - tm + return 0 <= self.conf.oslo_messaging_zmq.zmq_target_expire \ + <= time.time() - tm def _update_routing_table(self, target): routing_record = self.routing_table.get(str(target)) diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_sockets_manager.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_sockets_manager.py index 890d5a168..aa82b8453 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/zmq_sockets_manager.py +++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_sockets_manager.py @@ -57,7 +57,8 @@ class SocketsManager(object): def _check_for_new_hosts(self, target): key = self._key_from_target(target) socket, tm = self.outbound_sockets[key] - if 0 <= self.conf.zmq_target_expire <= time.time() - tm: + if 0 <= self.conf.oslo_messaging_zmq.zmq_target_expire \ + <= time.time() - tm: self._get_hosts_and_connect(socket, target) return socket diff --git a/oslo_messaging/_drivers/zmq_driver/proxy/zmq_proxy.py b/oslo_messaging/_drivers/zmq_driver/proxy/zmq_proxy.py index b35a7f9be..15c777489 100644 --- a/oslo_messaging/_drivers/zmq_driver/proxy/zmq_proxy.py +++ b/oslo_messaging/_drivers/zmq_driver/proxy/zmq_proxy.py @@ -85,7 +85,7 @@ class ZmqProxy(object): self.conf = conf self.matchmaker = driver.DriverManager( 'oslo.messaging.zmq.matchmaker', - self.conf.rpc_zmq_matchmaker, + self.conf.oslo_messaging_zmq.rpc_zmq_matchmaker, ).driver(self.conf) self.context = zmq.Context() self.proxy = proxy_cls(conf, self.context, self.matchmaker) diff --git a/oslo_messaging/_drivers/zmq_driver/proxy/zmq_queue_proxy.py b/oslo_messaging/_drivers/zmq_driver/proxy/zmq_queue_proxy.py index 39de56698..4c747abed 100644 --- a/oslo_messaging/_drivers/zmq_driver/proxy/zmq_queue_proxy.py +++ b/oslo_messaging/_drivers/zmq_driver/proxy/zmq_queue_proxy.py @@ -68,8 +68,9 @@ class UniversalQueueProxy(object): return msg_type = message[0] - if self.conf.use_pub_sub and msg_type in (zmq_names.CAST_FANOUT_TYPE, - zmq_names.NOTIFY_TYPE): + if self.conf.oslo_messaging_zmq.use_pub_sub and \ + msg_type in (zmq_names.CAST_FANOUT_TYPE, + zmq_names.NOTIFY_TYPE): self.pub_publisher.send_request(message) else: self._redirect_message(self.be_router_socket.handle @@ -133,12 +134,13 @@ class RouterUpdater(zmq_updater.UpdaterBase): def _update_records(self): self.matchmaker.register_publisher( (self.publisher_address, self.fe_router_address), - expire=self.conf.zmq_target_expire) + expire=self.conf.oslo_messaging_zmq.zmq_target_expire) LOG.info(_LI("[PUB:%(pub)s, ROUTER:%(router)s] Update PUB publisher"), {"pub": self.publisher_address, "router": self.fe_router_address}) - self.matchmaker.register_router(self.be_router_address, - expire=self.conf.zmq_target_expire) + self.matchmaker.register_router( + self.be_router_address, + expire=self.conf.oslo_messaging_zmq.zmq_target_expire) LOG.info(_LI("[Backend ROUTER:%(router)s] Update ROUTER"), {"router": self.be_router_address}) diff --git a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_consumer_base.py b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_consumer_base.py index d413a988c..69a707789 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_consumer_base.py +++ b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_consumer_base.py @@ -79,8 +79,8 @@ class SingleSocketConsumer(ConsumerBase): {"stype": zmq_names.socket_type_str(socket_type), "addr": socket.bind_address, "port": socket.port}) - self.host = zmq_address.combine_address(self.conf.rpc_zmq_host, - socket.port) + self.host = zmq_address.combine_address( + self.conf.oslo_messaging_zmq.rpc_zmq_host, socket.port) self.poller.register(socket, self.receive_message) return socket except zmq.ZMQError as e: @@ -119,7 +119,7 @@ class TargetUpdater(zmq_updater.UpdaterBase): self.matchmaker.register( self.target, self.host, zmq_names.socket_type_str(self.socket_type), - expire=self.conf.zmq_target_expire) + expire=self.conf.oslo_messaging_zmq.zmq_target_expire) def stop(self): super(TargetUpdater, self).stop() diff --git a/oslo_messaging/_drivers/zmq_driver/server/zmq_server.py b/oslo_messaging/_drivers/zmq_driver/server/zmq_server.py index fa7b0bc9c..b40bdc098 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/zmq_server.py +++ b/oslo_messaging/_drivers/zmq_driver/server/zmq_server.py @@ -41,11 +41,14 @@ class ZmqServer(base.PollStyleListener): self.poller = poller or zmq_async.get_poller() self.router_consumer = zmq_router_consumer.RouterConsumer( - conf, self.poller, self) if not conf.use_router_proxy else None + conf, self.poller, self) \ + if not conf.oslo_messaging_zmq.use_router_proxy else None self.dealer_consumer = zmq_dealer_consumer.DealerConsumer( - conf, self.poller, self) if conf.use_router_proxy else None + conf, self.poller, self) \ + if conf.oslo_messaging_zmq.use_router_proxy else None self.sub_consumer = zmq_sub_consumer.SubConsumer( - conf, self.poller, self) if conf.use_pub_sub else None + conf, self.poller, self) \ + if conf.oslo_messaging_zmq.use_pub_sub else None self.consumers = [] if self.router_consumer is not None: @@ -58,7 +61,7 @@ class ZmqServer(base.PollStyleListener): @base.batch_poll_helper def poll(self, timeout=None): message, socket = self.poller.poll( - timeout or self.conf.rpc_poll_timeout) + timeout or self.conf.oslo_messaging_zmq.rpc_poll_timeout) return message def stop(self): @@ -94,7 +97,7 @@ class ZmqNotificationServer(base.PollStyleListener): @base.batch_poll_helper def poll(self, timeout=None): message, socket = self.poller.poll( - timeout or self.conf.rpc_poll_timeout) + timeout or self.conf.oslo_messaging_zmq.rpc_poll_timeout) return message def stop(self): diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_address.py b/oslo_messaging/_drivers/zmq_driver/zmq_address.py index b33c288be..0175e7ec0 100644 --- a/oslo_messaging/_drivers/zmq_driver/zmq_address.py +++ b/oslo_messaging/_drivers/zmq_driver/zmq_address.py @@ -24,11 +24,11 @@ def get_tcp_direct_address(host): def get_tcp_random_address(conf): - return "tcp://%s" % conf.rpc_zmq_bind_address + return "tcp://%s" % conf.oslo_messaging_zmq.rpc_zmq_bind_address def get_broker_address(conf): - return "ipc://%s/zmq-broker" % conf.rpc_zmq_ipc_dir + return "ipc://%s/zmq-broker" % conf.oslo_messaging_zmq.rpc_zmq_ipc_dir def prefix_str(key, listener_type): diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_options.py b/oslo_messaging/_drivers/zmq_driver/zmq_options.py new file mode 100644 index 000000000..2ac76f9a8 --- /dev/null +++ b/oslo_messaging/_drivers/zmq_driver/zmq_options.py @@ -0,0 +1,122 @@ +# Copyright 2016 Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import socket + +from oslo_config import cfg + +from oslo_messaging._drivers import base +from oslo_messaging import server + + +MATCHMAKER_BACKENDS = ('redis', 'dummy') +MATCHMAKER_DEFAULT = 'redis' + + +zmq_opts = [ + cfg.StrOpt('rpc_zmq_bind_address', default='*', + deprecated_group='DEFAULT', + help='ZeroMQ bind address. Should be a wildcard (*), ' + 'an ethernet interface, or IP. ' + 'The "host" option should point or resolve to this ' + 'address.'), + + cfg.StrOpt('rpc_zmq_matchmaker', default=MATCHMAKER_DEFAULT, + choices=MATCHMAKER_BACKENDS, + deprecated_group='DEFAULT', + help='MatchMaker driver.'), + + cfg.IntOpt('rpc_zmq_contexts', default=1, + deprecated_group='DEFAULT', + help='Number of ZeroMQ contexts, defaults to 1.'), + + cfg.IntOpt('rpc_zmq_topic_backlog', + deprecated_group='DEFAULT', + help='Maximum number of ingress messages to locally buffer ' + 'per topic. Default is unlimited.'), + + cfg.StrOpt('rpc_zmq_ipc_dir', default='/var/run/openstack', + deprecated_group='DEFAULT', + help='Directory for holding IPC sockets.'), + + cfg.StrOpt('rpc_zmq_host', default=socket.gethostname(), + sample_default='localhost', + deprecated_group='DEFAULT', + help='Name of this node. Must be a valid hostname, FQDN, or ' + 'IP address. Must match "host" option, if running Nova.'), + + cfg.IntOpt('rpc_cast_timeout', default=-1, + deprecated_group='DEFAULT', + help='Seconds to wait before a cast expires (TTL). ' + 'The default value of -1 specifies an infinite linger ' + 'period. The value of 0 specifies no linger period. ' + 'Pending messages shall be discarded immediately ' + 'when the socket is closed. Only supported by impl_zmq.'), + + cfg.IntOpt('rpc_poll_timeout', default=1, + deprecated_group='DEFAULT', + help='The default number of seconds that poll should wait. ' + 'Poll raises timeout exception when timeout expired.'), + + cfg.IntOpt('zmq_target_expire', default=300, + deprecated_group='DEFAULT', + help='Expiration timeout in seconds of a name service record ' + 'about existing target ( < 0 means no timeout).'), + + cfg.IntOpt('zmq_target_update', default=180, + deprecated_group='DEFAULT', + help='Update period in seconds of a name service record ' + 'about existing target.'), + + cfg.BoolOpt('use_pub_sub', default=True, + deprecated_group='DEFAULT', + help='Use PUB/SUB pattern for fanout methods. ' + 'PUB/SUB always uses proxy.'), + + cfg.BoolOpt('use_router_proxy', default=True, + deprecated_group='DEFAULT', + help='Use ROUTER remote proxy.'), + + cfg.PortOpt('rpc_zmq_min_port', + default=49153, + deprecated_group='DEFAULT', + help='Minimal port number for random ports range.'), + + cfg.IntOpt('rpc_zmq_max_port', + min=1, + max=65536, + default=65536, + deprecated_group='DEFAULT', + help='Maximal port number for random ports range.'), + + cfg.IntOpt('rpc_zmq_bind_port_retries', + default=100, + deprecated_group='DEFAULT', + help='Number of retries to find free port number before ' + 'fail with ZMQBindError.'), + + cfg.StrOpt('rpc_zmq_serialization', default='json', + choices=('json', 'msgpack'), + deprecated_group='DEFAULT', + help='Default serialization mechanism for ' + 'serializing/deserializing outgoing/incoming messages') +] + + +def register_opts(conf): + opt_group = cfg.OptGroup(name='oslo_messaging_zmq', + title='ZeroMQ driver options') + conf.register_opts(zmq_opts, group=opt_group) + conf.register_opts(server._pool_opts) + conf.register_opts(base.base_opts) diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_socket.py b/oslo_messaging/_drivers/zmq_driver/zmq_socket.py index 14061b2eb..285eafa6c 100644 --- a/oslo_messaging/_drivers/zmq_driver/zmq_socket.py +++ b/oslo_messaging/_drivers/zmq_driver/zmq_socket.py @@ -47,8 +47,9 @@ class ZmqSocket(object): self.handle.set_hwm(high_watermark) self.close_linger = -1 - if self.conf.rpc_cast_timeout > 0: - self.close_linger = self.conf.rpc_cast_timeout * 1000 + if self.conf.oslo_messaging_zmq.rpc_cast_timeout > 0: + self.close_linger = \ + self.conf.oslo_messaging_zmq.rpc_cast_timeout * 1000 self.handle.setsockopt(zmq.LINGER, self.close_linger) # Put messages to only connected queues self.handle.setsockopt(zmq.IMMEDIATE, 1 if immediate else 0) @@ -96,8 +97,9 @@ class ZmqSocket(object): self.handle.send_multipart(*args, **kwargs) def send_dumped(self, obj, *args, **kwargs): - serialization = kwargs.pop('serialization', - self.conf.rpc_zmq_serialization) + serialization = kwargs.pop( + 'serialization', + self.conf.oslo_messaging_zmq.rpc_zmq_serialization) serializer = self._get_serializer(serialization) s = serializer.dump_as_bytes(obj) self.handle.send(s, *args, **kwargs) @@ -118,8 +120,9 @@ class ZmqSocket(object): return self.handle.recv_multipart(*args, **kwargs) def recv_loaded(self, *args, **kwargs): - serialization = kwargs.pop('serialization', - self.conf.rpc_zmq_serialization) + serialization = kwargs.pop( + 'serialization', + self.conf.oslo_messaging_zmq.rpc_zmq_serialization) serializer = self._get_serializer(serialization) s = self.handle.recv(*args, **kwargs) obj = serializer.load_from_bytes(s) @@ -170,13 +173,13 @@ class ZmqRandomPortSocket(ZmqSocket): high_watermark=high_watermark) self.bind_address = zmq_address.get_tcp_random_address(self.conf) if host is None: - host = conf.rpc_zmq_host + host = conf.oslo_messaging_zmq.rpc_zmq_host try: self.port = self.handle.bind_to_random_port( self.bind_address, - min_port=conf.rpc_zmq_min_port, - max_port=conf.rpc_zmq_max_port, - max_tries=conf.rpc_zmq_bind_port_retries) + min_port=conf.oslo_messaging_zmq.rpc_zmq_min_port, + max_port=conf.oslo_messaging_zmq.rpc_zmq_max_port, + max_tries=conf.oslo_messaging_zmq.rpc_zmq_bind_port_retries) self.connect_address = zmq_address.combine_address(host, self.port) except zmq.ZMQBindError: LOG.error(_LE("Random ports range exceeded!")) @@ -192,7 +195,8 @@ class ZmqFixedPortSocket(ZmqSocket): high_watermark=high_watermark) self.connect_address = zmq_address.combine_address(host, port) self.bind_address = zmq_address.get_tcp_direct_address( - zmq_address.combine_address(conf.rpc_zmq_bind_address, port)) + zmq_address.combine_address( + conf.oslo_messaging_zmq.rpc_zmq_bind_address, port)) self.host = host self.port = port diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_updater.py b/oslo_messaging/_drivers/zmq_driver/zmq_updater.py index 302915d1c..2d4f9e0a1 100644 --- a/oslo_messaging/_drivers/zmq_driver/zmq_updater.py +++ b/oslo_messaging/_drivers/zmq_driver/zmq_updater.py @@ -41,7 +41,7 @@ class UpdaterBase(object): def _update_loop(self): self.update_method() - time.sleep(self.conf.zmq_target_update) + time.sleep(self.conf.oslo_messaging_zmq.zmq_target_update) def cleanup(self): self.executor.stop() diff --git a/oslo_messaging/conffixture.py b/oslo_messaging/conffixture.py index 4e6c9d59b..5eb4e5ef7 100644 --- a/oslo_messaging/conffixture.py +++ b/oslo_messaging/conffixture.py @@ -58,7 +58,8 @@ class ConfFixture(fixtures.Fixture): 'oslo_messaging._drivers.amqp1_driver.opts', 'amqp1_opts', 'oslo_messaging_amqp') _import_opts(self.conf, - 'oslo_messaging._drivers.impl_zmq', 'zmq_opts') + 'oslo_messaging._drivers.zmq_driver.zmq_options', + 'zmq_opts', 'oslo_messaging_zmq') _import_opts(self.conf, 'oslo_messaging._drivers.zmq_driver.' 'matchmaker.matchmaker_redis', diff --git a/oslo_messaging/opts.py b/oslo_messaging/opts.py index b04768a28..c252496ae 100644 --- a/oslo_messaging/opts.py +++ b/oslo_messaging/opts.py @@ -25,7 +25,7 @@ from oslo_messaging._drivers.amqp1_driver import opts as amqp_opts from oslo_messaging._drivers import base as drivers_base from oslo_messaging._drivers import impl_pika from oslo_messaging._drivers import impl_rabbit -from oslo_messaging._drivers import impl_zmq +from oslo_messaging._drivers.impl_zmq import zmq_options from oslo_messaging._drivers.pika_driver import pika_connection_factory from oslo_messaging._drivers.zmq_driver.matchmaker import matchmaker_redis from oslo_messaging.notify import notifier @@ -36,7 +36,7 @@ from oslo_messaging import transport _global_opt_lists = [ drivers_base.base_opts, - impl_zmq.zmq_opts, + zmq_options.zmq_opts, server._pool_opts, client._client_opts, transport._transport_opts, @@ -45,6 +45,7 @@ _global_opt_lists = [ _opts = [ (None, list(itertools.chain(*_global_opt_lists))), ('matchmaker_redis', matchmaker_redis.matchmaker_redis_opts), + ('oslo_messaging_zmq', zmq_options.zmq_opts), ('oslo_messaging_amqp', amqp_opts.amqp1_opts), ('oslo_messaging_notifications', notifier._notifier_opts), ('oslo_messaging_rabbit', list( diff --git a/oslo_messaging/tests/drivers/zmq/test_impl_zmq.py b/oslo_messaging/tests/drivers/zmq/test_impl_zmq.py index 76b61cf1c..04d86d9de 100644 --- a/oslo_messaging/tests/drivers/zmq/test_impl_zmq.py +++ b/oslo_messaging/tests/drivers/zmq/test_impl_zmq.py @@ -35,7 +35,7 @@ class ZmqTestPortsRange(zmq_common.ZmqBaseTestCase): # Set config values kwargs = {'rpc_zmq_min_port': 5555, 'rpc_zmq_max_port': 5560} - self.config(**kwargs) + self.config(group='oslo_messaging_zmq', **kwargs) def test_ports_range(self): listeners = [] diff --git a/oslo_messaging/tests/drivers/zmq/test_pub_sub.py b/oslo_messaging/tests/drivers/zmq/test_pub_sub.py index 02519def9..297352156 100644 --- a/oslo_messaging/tests/drivers/zmq/test_pub_sub.py +++ b/oslo_messaging/tests/drivers/zmq/test_pub_sub.py @@ -54,7 +54,7 @@ class TestPubSub(zmq_common.ZmqBaseTestCase): kwargs = {'use_pub_sub': True, 'rpc_zmq_serialization': self.serialization} - self.config(**kwargs) + self.config(group='oslo_messaging_zmq', **kwargs) self.config(host="127.0.0.1", group="zmq_proxy_opts") self.config(publisher_port="0", group="zmq_proxy_opts") diff --git a/oslo_messaging/tests/drivers/zmq/zmq_common.py b/oslo_messaging/tests/drivers/zmq/zmq_common.py index f0ef4a419..ff48bfbed 100644 --- a/oslo_messaging/tests/drivers/zmq/zmq_common.py +++ b/oslo_messaging/tests/drivers/zmq/zmq_common.py @@ -20,6 +20,7 @@ import testtools import oslo_messaging from oslo_messaging._drivers.zmq_driver import zmq_async +from oslo_messaging._drivers.zmq_driver import zmq_options from oslo_messaging._i18n import _LE from oslo_messaging.tests import utils as test_utils @@ -71,17 +72,18 @@ class ZmqBaseTestCase(test_utils.BaseTestCase): def setUp(self): super(ZmqBaseTestCase, self).setUp() self.messaging_conf.transport_driver = 'zmq' + zmq_options.register_opts(self.conf) # Set config values self.internal_ipc_dir = self.useFixture(fixtures.TempDir()).path kwargs = {'rpc_zmq_bind_address': '127.0.0.1', 'rpc_zmq_host': '127.0.0.1', - 'rpc_response_timeout': 5, 'rpc_zmq_ipc_dir': self.internal_ipc_dir, 'use_pub_sub': False, 'use_router_proxy': False, 'rpc_zmq_matchmaker': 'dummy'} - self.config(**kwargs) + self.config(group='oslo_messaging_zmq', **kwargs) + self.config(rpc_response_timeout=5) # Get driver transport = oslo_messaging.get_transport(self.conf) diff --git a/oslo_messaging/tests/functional/utils.py b/oslo_messaging/tests/functional/utils.py index 0dcc04778..1d215a593 100644 --- a/oslo_messaging/tests/functional/utils.py +++ b/oslo_messaging/tests/functional/utils.py @@ -293,10 +293,12 @@ class SkipIfNoTransportURL(test_utils.BaseTestCase): zmq_matchmaker = os.environ.get('ZMQ_MATCHMAKER') if zmq_matchmaker: - self.config(rpc_zmq_matchmaker=zmq_matchmaker) + self.config(rpc_zmq_matchmaker=zmq_matchmaker, + group="oslo_messaging_zmq") zmq_ipc_dir = os.environ.get('ZMQ_IPC_DIR') if zmq_ipc_dir: - self.config(rpc_zmq_ipc_dir=zmq_ipc_dir) + self.config(group="oslo_messaging_zmq", + rpc_zmq_ipc_dir=zmq_ipc_dir) zmq_redis_port = os.environ.get('ZMQ_REDIS_PORT') if zmq_redis_port: self.config(port=zmq_redis_port, group="matchmaker_redis") @@ -304,10 +306,12 @@ class SkipIfNoTransportURL(test_utils.BaseTestCase): self.config(wait_timeout=1000, group="matchmaker_redis") zmq_use_pub_sub = os.environ.get('ZMQ_USE_PUB_SUB') if zmq_use_pub_sub: - self.config(use_pub_sub=zmq_use_pub_sub) + self.config(use_pub_sub=zmq_use_pub_sub, + group='oslo_messaging_zmq') zmq_use_router_proxy = os.environ.get('ZMQ_USE_ROUTER_PROXY') if zmq_use_router_proxy: - self.config(use_router_proxy=zmq_use_router_proxy) + self.config(use_router_proxy=zmq_use_router_proxy, + group='oslo_messaging_zmq') class NotificationFixture(fixtures.Fixture): diff --git a/oslo_messaging/tests/functional/zmq/multiproc_utils.py b/oslo_messaging/tests/functional/zmq/multiproc_utils.py index ee9f56e03..4a1498a13 100644 --- a/oslo_messaging/tests/functional/zmq/multiproc_utils.py +++ b/oslo_messaging/tests/functional/zmq/multiproc_utils.py @@ -70,7 +70,8 @@ def listener_configurer(conf): '%(levelname)-8s %(message)s') h.setFormatter(f) root.addHandler(h) - log_path = conf.rpc_zmq_ipc_dir + "/" + "zmq_multiproc.log" + log_path = conf.oslo_messaging_zmq.rpc_zmq_ipc_dir + \ + "/" + "zmq_multiproc.log" file_handler = logging.StreamHandler(open(log_path, 'w')) file_handler.setFormatter(f) root.addHandler(file_handler) diff --git a/oslo_messaging/tests/functional/zmq/test_startup.py b/oslo_messaging/tests/functional/zmq/test_startup.py index ea287b3b4..f1b89b06b 100644 --- a/oslo_messaging/tests/functional/zmq/test_startup.py +++ b/oslo_messaging/tests/functional/zmq/test_startup.py @@ -30,10 +30,10 @@ class StartupOrderTestCase(multiproc_utils.MutliprocTestCase): self.conf.prog = "test_prog" self.conf.project = "test_project" - kwargs = {'rpc_response_timeout': 30} - self.config(**kwargs) + self.config(rpc_response_timeout=30) - log_path = self.conf.rpc_zmq_ipc_dir + "/" + str(os.getpid()) + ".log" + log_path = os.path.join(self.conf.oslo_messaging_zmq.rpc_zmq_ipc_dir, + str(os.getpid()) + ".log") sys.stdout = open(log_path, "w", buffering=0) def test_call_server_before_client(self): diff --git a/oslo_messaging/tests/test_opts.py b/oslo_messaging/tests/test_opts.py index 2ca8f8a2e..0e4b1f89e 100644 --- a/oslo_messaging/tests/test_opts.py +++ b/oslo_messaging/tests/test_opts.py @@ -32,11 +32,12 @@ class OptsTestCase(test_utils.BaseTestCase): super(OptsTestCase, self).setUp() def _test_list_opts(self, result): - self.assertEqual(5, len(result)) + self.assertEqual(6, len(result)) groups = [g for (g, l) in result] self.assertIn(None, groups) self.assertIn('matchmaker_redis', groups) + self.assertIn('oslo_messaging_zmq', groups) self.assertIn('oslo_messaging_amqp', groups) self.assertIn('oslo_messaging_notifications', groups) self.assertIn('oslo_messaging_rabbit', groups) diff --git a/setup-test-env-zmq-proxy.sh b/setup-test-env-zmq-proxy.sh index ebce12ca7..12649c88b 100755 --- a/setup-test-env-zmq-proxy.sh +++ b/setup-test-env-zmq-proxy.sh @@ -18,6 +18,7 @@ export ZMQ_PROXY_HOST=127.0.0.1 cat > ${DATADIR}/zmq.conf < ${DATADIR}/zmq.conf < ${DATADIR}/zmq.conf <