diff --git a/doc/source/zmq_driver.rst b/doc/source/zmq_driver.rst index 9f9a74d23..4c5e4119b 100644 --- a/doc/source/zmq_driver.rst +++ b/doc/source/zmq_driver.rst @@ -118,7 +118,6 @@ To specify the Redis server for RedisMatchMaker, use options in [matchmaker_redis] host = 127.0.0.1 port = 6379 - password = None In order to cleanup redis storage from expired records (e.g. target listener goes down) TTL may be applied for keys. Configure 'zmq_target_expire' option @@ -138,16 +137,51 @@ stored in Redis is that the key is a base topic and the corresponding values are hostname arrays to be sent to. +Proxy and huge number of TCP sockets +------------------------------------ + +The most heavily used RPC pattern (CALL) may consume too many TCP sockets in +directly connected configuration. To solve the issue ROUTER proxy may be used. +In order to configure driver to use ROUTER proxy set up the 'use_router_proxy' +option to True in [DEFAULT] section (False is set by default). + +For example:: + + use_router_proxy = True + +Not less than 3 proxies should be running on controllers or on stand alone +nodes. The parameters for the script oslo-messaging-zmq-proxy should be:: + + oslo-messaging-zmq-proxy + --type ROUTER + --config-file /etc/oslo/zeromq.conf + --log-file /var/log/oslo/zmq-router-proxy.log + + Proxy for fanout publishing --------------------------- -Each machine running OpenStack services, or sending RPC messages, should run -the 'oslo-messaging-zmq-broker' daemon. - Fanout-based patterns like CAST+Fanout and notifications always use proxy -as they act over PUB/SUB, 'use_pub_sub' - defaults to True. If not using -PUB/SUB (use_pub_sub = False) then fanout will be emulated over direct -DEALER/ROUTER unicast which is possible but less efficient and therefore +as they act over PUB/SUB, 'use_pub_sub' option defaults to True. In such case +publisher proxy should be running. Publisher-proxies are independent from each +other. Recommended number of proxies in the cloud is not less than 3. You +may run them on a standalone nodes or on controller nodes. +The parameters for the script oslo-messaging-zmq-proxy should be:: + + oslo-messaging-zmq-proxy + --type PUBLISHER + --config-file /etc/oslo/zeromq.conf + --log-file /var/log/oslo/zmq-publisher-proxy.log + +Actually PUBLISHER is the default value for the parameter --type, so +could be omitted:: + + oslo-messaging-zmq-proxy + --config-file /etc/oslo/zeromq.conf + --log-file /var/log/oslo/zmq-publisher-proxy.log + +If not using PUB/SUB (use_pub_sub = False) then fanout will be emulated over +direct DEALER/ROUTER unicast which is possible but less efficient and therefore is not recommended. In a case of direct DEALER/ROUTER unicast proxy is not needed. @@ -158,23 +192,12 @@ For example:: use_pub_sub = True -In case of using the broker all publishers (clients) talk to servers over -the local broker connecting to it via IPC transport. - -The IPC runtime directory, 'rpc_zmq_ipc_dir', can be set in [DEFAULT] section. - -For example:: - - rpc_zmq_ipc_dir = /var/run/openstack - -The parameters for the script oslo-messaging-zmq-receiver should be:: - - oslo-messaging-zmq-broker - --config-file /etc/oslo/zeromq.conf - --log-file /var/log/oslo/zmq-broker.log +In case of using a proxy all publishers (clients) talk to servers over +the proxy connecting to it via TCP. You can specify ZeroMQ options in /etc/oslo/zeromq.conf if necessary. + Listening Address (optional) ---------------------------- diff --git a/oslo_messaging/_cmd/zmq_broker.py b/oslo_messaging/_cmd/zmq_broker.py deleted file mode 100644 index 82c1580af..000000000 --- a/oslo_messaging/_cmd/zmq_broker.py +++ /dev/null @@ -1,42 +0,0 @@ -# Copyright 2015 Mirantis, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import logging -import sys -import time - -from oslo_config import cfg - -from oslo_messaging._drivers import impl_zmq -from oslo_messaging._drivers.zmq_driver.broker import zmq_broker -from oslo_messaging import server - -CONF = cfg.CONF -CONF.register_opts(impl_zmq.zmq_opts) -CONF.register_opts(server._pool_opts) -CONF.rpc_zmq_native = True - - -def main(): - CONF(sys.argv[1:], project='oslo') - logging.basicConfig(level=logging.DEBUG) - - reactor = zmq_broker.ZmqBroker(CONF) - reactor.start() - - while True: - time.sleep(1) - -if __name__ == "__main__": - main() diff --git a/oslo_messaging/_cmd/zmq_proxy.py b/oslo_messaging/_cmd/zmq_proxy.py new file mode 100644 index 000000000..5669f5aa5 --- /dev/null +++ b/oslo_messaging/_cmd/zmq_proxy.py @@ -0,0 +1,74 @@ +# Copyright 2015 Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import argparse +import logging +import time + +from oslo_config import cfg + +from oslo_messaging._drivers import impl_zmq +from oslo_messaging._drivers.zmq_driver.broker import zmq_proxy +from oslo_messaging import server + +CONF = cfg.CONF +CONF.register_opts(impl_zmq.zmq_opts) +CONF.register_opts(server._pool_opts) +CONF.rpc_zmq_native = True + + +USAGE = """ Usage: ./zmq-proxy.py --type {PUBLISHER,ROUTER} [-h] [] ... + +Usage example: + python oslo_messaging/_cmd/zmq-proxy.py\ + --type PUBLISHER""" + + +PUBLISHER = 'PUBLISHER' +ROUTER = 'ROUTER' +PROXY_TYPES = (PUBLISHER, ROUTER) + + +def main(): + logging.basicConfig(level=logging.DEBUG) + + parser = argparse.ArgumentParser( + description='ZeroMQ proxy service', + usage=USAGE + ) + + parser.add_argument('--type', dest='proxy_type', type=str, + default=PUBLISHER, + help='Proxy type PUBLISHER or ROUTER') + parser.add_argument('--config-file', dest='config_file', type=str, + help='Path to configuration file') + args = parser.parse_args() + + if args.config_file: + cfg.CONF(["--config-file", args.config_file]) + + if args.proxy_type not in PROXY_TYPES: + raise Exception("Bad proxy type %s, should be one of %s" % + (args.proxy_type, PROXY_TYPES)) + + reactor = zmq_proxy.ZmqPublisher(CONF) if args.proxy_type == PUBLISHER \ + else zmq_proxy.ZmqRouter(CONF) + + reactor.start() + + while True: + time.sleep(1) + +if __name__ == "__main__": + main() diff --git a/oslo_messaging/_drivers/impl_zmq.py b/oslo_messaging/_drivers/impl_zmq.py index 0726ae097..568df14da 100644 --- a/oslo_messaging/_drivers/impl_zmq.py +++ b/oslo_messaging/_drivers/impl_zmq.py @@ -83,6 +83,9 @@ zmq_opts = [ help='Use PUB/SUB pattern for fanout methods. ' 'PUB/SUB always uses proxy.'), + cfg.BoolOpt('use_router_proxy', default=False, + help='Use ROUTER remote proxy for direct methods.'), + cfg.PortOpt('rpc_zmq_min_port', default=49153, help='Minimal port number for random ports range.'), diff --git a/oslo_messaging/_drivers/zmq_driver/broker/zmq_broker.py b/oslo_messaging/_drivers/zmq_driver/broker/zmq_broker.py deleted file mode 100644 index 8351e2ef9..000000000 --- a/oslo_messaging/_drivers/zmq_driver/broker/zmq_broker.py +++ /dev/null @@ -1,78 +0,0 @@ -# Copyright 2015 Mirantis, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import logging -import os - -from oslo_utils import excutils -from stevedore import driver - -from oslo_messaging._drivers.zmq_driver.broker import zmq_queue_proxy -from oslo_messaging._drivers.zmq_driver import zmq_async -from oslo_messaging._i18n import _LE, _LI - -zmq = zmq_async.import_zmq(zmq_concurrency='native') -LOG = logging.getLogger(__name__) - - -class ZmqBroker(object): - """Local messaging IPC broker (nodes are still peers). - The main purpose is to have native zeromq application. - Benefits of such approach are following: - - 1. No risk to block the main thread of the process by unpatched - native parts of the libzmq (c-library is completely monkey-patch - unfriendly) - 2. Making use of standard zmq approaches as async pollers, - devices, queues etc. - 3. Possibility to implement queue persistence not touching existing - clients (staying in a separate process). - """ - - def __init__(self, conf): - super(ZmqBroker, self).__init__() - self.conf = conf - self._create_ipc_dirs() - self.matchmaker = driver.DriverManager( - 'oslo.messaging.zmq.matchmaker', - self.conf.rpc_zmq_matchmaker, - ).driver(self.conf) - - self.context = zmq.Context() - self.proxies = [zmq_queue_proxy.UniversalQueueProxy( - conf, self.context, self.matchmaker) - ] - - def _create_ipc_dirs(self): - ipc_dir = self.conf.rpc_zmq_ipc_dir - try: - os.makedirs("%s/fanout" % ipc_dir) - except os.error: - if not os.path.isdir(ipc_dir): - with excutils.save_and_reraise_exception(): - LOG.error(_LE("Required IPC directory does not exist at" - " %s"), ipc_dir) - - def start(self): - for proxy in self.proxies: - proxy.start() - - def wait(self): - for proxy in self.proxies: - proxy.wait() - - def close(self): - LOG.info(_LI("Broker shutting down ...")) - for proxy in self.proxies: - proxy.stop() diff --git a/oslo_messaging/_drivers/zmq_driver/broker/zmq_proxy.py b/oslo_messaging/_drivers/zmq_driver/broker/zmq_proxy.py new file mode 100644 index 000000000..03f48b312 --- /dev/null +++ b/oslo_messaging/_drivers/zmq_driver/broker/zmq_proxy.py @@ -0,0 +1,115 @@ +# Copyright 2015 Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import logging +import os + +from oslo_utils import excutils +from stevedore import driver + +from oslo_messaging._drivers.zmq_driver.broker import zmq_queue_proxy +from oslo_messaging._drivers.zmq_driver import zmq_async +from oslo_messaging._i18n import _LE, _LI + +zmq = zmq_async.import_zmq(zmq_concurrency='native') +LOG = logging.getLogger(__name__) + + +class ZmqProxy(object): + """Base class for Publishers and Routers proxies. + The main reason to have a proxy is high complexity of TCP sockets number + growth with direct connections (when services connect directly to + each other). The general complexity for ZeroMQ+Openstack deployment + with direct connections may be square(N) (where N is a number of nodes + in deployment). With proxy the complexity is reduced to k*N where + k is a number of services. + + Currently there are 2 types of proxy, they are Publishers and Routers. + Publisher proxy serves for PUB-SUB pattern implementation where + Publisher is a server which performs broadcast to subscribers. + Router is used for direct message types in case of number of TCP socket + connections is critical for specific deployment. Generally 3 publishers + is enough for deployment. Routers should be + """ + + def __init__(self, conf): + super(ZmqProxy, self).__init__() + self.conf = conf + self._create_ipc_dirs() + self.matchmaker = driver.DriverManager( + 'oslo.messaging.zmq.matchmaker', + self.conf.rpc_zmq_matchmaker, + ).driver(self.conf) + self.context = zmq.Context() + self.proxies = [] + + def _create_ipc_dirs(self): + ipc_dir = self.conf.rpc_zmq_ipc_dir + try: + os.makedirs("%s/fanout" % ipc_dir) + except os.error: + if not os.path.isdir(ipc_dir): + with excutils.save_and_reraise_exception(): + LOG.error(_LE("Required IPC directory does not exist at" + " %s"), ipc_dir) + + def start(self): + for proxy in self.proxies: + proxy.start() + + def wait(self): + for proxy in self.proxies: + proxy.wait() + + def close(self): + LOG.info(_LI("Broker shutting down ...")) + for proxy in self.proxies: + proxy.stop() + + +class ZmqPublisher(ZmqProxy): + + def __init__(self, conf): + super(ZmqPublisher, self).__init__(conf) + self.proxies.append(zmq_queue_proxy.PublisherProxy( + conf, self.context, self.matchmaker)) + + +class ZmqRouter(ZmqProxy): + """Router is used for direct messages in order to reduce the number of + allocated TCP sockets in controller. The list of requirements to Router: + + 1. There may be any number of routers in the deployment. Routers are + registered in a name-server and client connects dynamically to all of + them performing load balancing. + 2. Routers should be transparent for clients and servers. Which means + it doesn't change the way of messaging between client and the final + target by hiding the target from a client. + 3. Router may be restarted or get down at any time loosing all messages + in its queue. Smart retrying (based on acknowledgements from server + side) and load balancing between other Router instances from the + client side should handle the situation. + 4. Router takes all the routing information from message envelope and + doesn't perform Target-resolution in any way. + 5. Routers don't talk to each other and no synchronization is needed. + 6. Load balancing is performed by the client in a round-robin fashion. + + Those requirements should limit the performance impact caused by using + of proxies making proxies as lightweight as possible. + """ + + def __init__(self, conf): + super(ZmqRouter, self).__init__(conf) + self.proxies.append(zmq_queue_proxy.RouterProxy( + conf, self.context, self.matchmaker)) diff --git a/oslo_messaging/_drivers/zmq_driver/broker/zmq_queue_proxy.py b/oslo_messaging/_drivers/zmq_driver/broker/zmq_queue_proxy.py index 6d92465c5..e4845fe13 100644 --- a/oslo_messaging/_drivers/zmq_driver/broker/zmq_queue_proxy.py +++ b/oslo_messaging/_drivers/zmq_driver/broker/zmq_queue_proxy.py @@ -12,15 +12,19 @@ # License for the specific language governing permissions and limitations # under the License. +import abc import logging from oslo_messaging._drivers.zmq_driver.broker import zmq_base_proxy +from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \ + import zmq_dealer_publisher_proxy from oslo_messaging._drivers.zmq_driver.client.publishers \ import zmq_pub_publisher from oslo_messaging._drivers.zmq_driver import zmq_address from oslo_messaging._drivers.zmq_driver import zmq_async from oslo_messaging._drivers.zmq_driver import zmq_names -from oslo_messaging._i18n import _LE, _LI +from oslo_messaging._drivers.zmq_driver import zmq_socket +from oslo_messaging._i18n import _LI zmq = zmq_async.import_zmq(zmq_concurrency='native') LOG = logging.getLogger(__name__) @@ -30,25 +34,62 @@ class UniversalQueueProxy(zmq_base_proxy.BaseProxy): def __init__(self, conf, context, matchmaker): super(UniversalQueueProxy, self).__init__(conf, context) + self.matchmaker = matchmaker self.poller = zmq_async.get_poller(zmq_concurrency='native') - self.router_socket = context.socket(zmq.ROUTER) - self.router_socket.bind(zmq_address.get_broker_address(conf)) + self.router_socket = zmq_socket.ZmqRandomPortSocket( + conf, context, zmq.ROUTER) - self.poller.register(self.router_socket, self._receive_in_request) - LOG.info(_LI("Polling at universal proxy")) + self.poller.register(self.router_socket.handle, + self._receive_in_request) - self.matchmaker = matchmaker - self.pub_publisher = zmq_pub_publisher.PubPublisherProxy( - conf, matchmaker) + self.router_address = zmq_address.combine_address( + self.conf.rpc_zmq_host, self.router_socket.port) def run(self): message, socket = self.poller.poll(self.conf.rpc_poll_timeout) if message is None: return - if socket == self.router_socket: + if socket == self.router_socket.handle: self._redirect_in_request(message) + else: + self._redirect_reply(message) + + @abc.abstractmethod + def _redirect_in_request(self, multipart_message): + """Redirect incoming request to a publisher.""" + + @abc.abstractmethod + def _redirect_reply(self, multipart_message): + """Redirect reply to client. Implement in a concrete proxy.""" + + def _receive_in_request(self, socket): + reply_id = socket.recv() + assert reply_id is not None, "Valid id expected" + empty = socket.recv() + assert empty == b'', "Empty delimiter expected" + envelope = socket.recv_pyobj() + envelope.reply_id = reply_id + payload = socket.recv_multipart() + payload.insert(zmq_names.MULTIPART_IDX_ENVELOPE, envelope) + return payload + + +class PublisherProxy(UniversalQueueProxy): + + def __init__(self, conf, context, matchmaker): + super(PublisherProxy, self).__init__(conf, context, matchmaker) + LOG.info(_LI("Polling at PUBLISHER proxy")) + + self.pub_publisher = zmq_pub_publisher.PubPublisherProxy( + conf, matchmaker) + + self.matchmaker.register_publisher( + (self.pub_publisher.host, self.router_address)) + LOG.info(_LI("[PUB:%(pub)s, ROUTER:%(router)s] Run PUB publisher"), + {"pub": self.pub_publisher.host, + "router": self.router_address}) def _redirect_in_request(self, multipart_message): LOG.debug("-> Redirecting request %s to TCP publisher", @@ -57,15 +98,38 @@ class UniversalQueueProxy(zmq_base_proxy.BaseProxy): if self.conf.use_pub_sub and envelope.is_mult_send: self.pub_publisher.send_request(multipart_message) - def _receive_in_request(self, socket): - reply_id = socket.recv() - assert reply_id is not None, "Valid id expected" - empty = socket.recv() - assert empty == b'', "Empty delimiter expected" - envelope = socket.recv_pyobj() + def _redirect_reply(self, multipart_message): + """No reply is possible for publisher.""" + + +class RouterProxy(UniversalQueueProxy): + + def __init__(self, conf, context, matchmaker): + super(RouterProxy, self).__init__(conf, context, matchmaker) + LOG.info(_LI("Polling at ROUTER proxy")) + + self.dealer_publisher \ + = zmq_dealer_publisher_proxy.DealerPublisherProxy( + conf, matchmaker, self.poller) + + self.matchmaker.register_router(self.router_address) + LOG.info(_LI("ROUTER:%(router)s] Run ROUTER publisher"), + {"router": self.router_address}) + + def _redirect_in_request(self, multipart_message): + LOG.debug("-> Redirecting request %s to TCP publisher", + multipart_message) + envelope = multipart_message[zmq_names.MULTIPART_IDX_ENVELOPE] + LOG.debug("Envelope: %s", envelope) if not envelope.is_mult_send: - LOG.error(_LE("Message type %s is not supported by proxy"), - envelope.msg_type) - payload = socket.recv_multipart() - payload.insert(0, envelope) - return payload + self.dealer_publisher.send_request(multipart_message) + + def _redirect_reply(self, multipart_message): + envelope = multipart_message[zmq_names.MULTIPART_IDX_ENVELOPE] + LOG.debug("Envelope.reply_id: %s", envelope.reply_id) + response_binary = multipart_message[zmq_names.MULTIPART_IDX_BODY] + + self.router_socket.send(envelope.reply_id, zmq.SNDMORE) + self.router_socket.send(b'', zmq.SNDMORE) + self.router_socket.send_pyobj(envelope, zmq.SNDMORE) + self.router_socket.send(response_binary) diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_call_publisher.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_call_publisher.py index 3ced6ce4c..25cf2ed7d 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_call_publisher.py +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_call_publisher.py @@ -42,20 +42,26 @@ class DealerCallPublisher(object): self.conf = conf self.matchmaker = matchmaker self.reply_waiter = ReplyWaiter(conf) - sockets_manager = zmq_publisher_base.SocketsManager( + self.sockets_manager = zmq_publisher_base.SocketsManager( conf, matchmaker, zmq.ROUTER, zmq.DEALER) def _do_send_request(socket, request): - # DEALER socket specific envelope empty delimiter + target_hosts = self.sockets_manager.get_hosts(request.target) + envelope = request.create_envelope(target_hosts) + # DEALER socket specific envelope empty delimiter socket.send(b'', zmq.SNDMORE) + socket.send_pyobj(envelope, zmq.SNDMORE) socket.send_pyobj(request) LOG.debug("Sent message_id %(message)s to a target %(target)s", {"message": request.message_id, "target": request.target}) - self.sender = CallSender(sockets_manager, _do_send_request, - self.reply_waiter) + self.sender = CallSender(self.sockets_manager, _do_send_request, + self.reply_waiter) \ + if not conf.use_router_proxy else \ + CallSenderLight(self.sockets_manager, _do_send_request, + self.reply_waiter) def send_request(self, request): reply_future = self.sender.send_request(request) @@ -99,6 +105,14 @@ class CallSender(zmq_publisher_base.QueuedSender): return socket +class CallSenderLight(CallSender): + + def _connect_socket(self, target): + socket = self.outbound_sockets.get_socket_to_routers() + self.reply_waiter.poll_socket(socket) + return socket + + class ReplyWaiter(object): def __init__(self, conf): @@ -122,6 +136,8 @@ class ReplyWaiter(object): def _receive_method(socket): empty = socket.recv() assert empty == b'', "Empty expected!" + envelope = socket.recv_pyobj() + assert envelope is not None, "Invalid envelope!" reply = socket.recv_pyobj() LOG.debug("Received reply %s", reply) return reply diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher.py index cba029426..74fbee2ca 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher.py +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher.py @@ -16,7 +16,6 @@ import logging from oslo_messaging._drivers.zmq_driver.client.publishers\ import zmq_publisher_base -from oslo_messaging._drivers.zmq_driver import zmq_address from oslo_messaging._drivers.zmq_driver import zmq_async from oslo_messaging._drivers.zmq_driver import zmq_names @@ -31,6 +30,7 @@ class DealerPublisher(zmq_publisher_base.QueuedSender): def _send_message_data(socket, request): socket.send(b'', zmq.SNDMORE) + socket.send_pyobj(request.create_envelope(), zmq.SNDMORE) socket.send_pyobj(request) LOG.debug("Sent message_id %(message)s to a target %(target)s", @@ -75,13 +75,13 @@ class DealerPublisherLight(zmq_publisher_base.QueuedSender): "a target %(target)s", {"message": request.message_id, "target": request.target, - "addr": zmq_address.get_broker_address(conf)}) + "addr": list(socket.connections)}) sockets_manager = zmq_publisher_base.SocketsManager( conf, matchmaker, zmq.ROUTER, zmq.DEALER) super(DealerPublisherLight, self).__init__( sockets_manager, _do_send_request) - self.socket = self.outbound_sockets.get_socket_to_broker() + self.socket = self.outbound_sockets.get_socket_to_publishers() def _connect_socket(self, target): return self.socket diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py new file mode 100644 index 000000000..0c0ebb7e2 --- /dev/null +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py @@ -0,0 +1,67 @@ +# Copyright 2015 Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import logging + +from oslo_messaging._drivers.zmq_driver.client.publishers \ + import zmq_publisher_base +from oslo_messaging._drivers.zmq_driver import zmq_async +from oslo_messaging._drivers.zmq_driver import zmq_names + +zmq = zmq_async.import_zmq() + +LOG = logging.getLogger(__name__) + + +class DealerPublisherProxy(object): + + def __init__(self, conf, matchmaker, poller): + super(DealerPublisherProxy, self).__init__() + self.conf = conf + self.matchmaker = matchmaker + self.poller = poller + self.sockets_manager = zmq_publisher_base.SocketsManager( + conf, matchmaker, zmq.ROUTER, zmq.DEALER) + + def send_request(self, multipart_message): + envelope = multipart_message[zmq_names.MULTIPART_IDX_ENVELOPE] + if envelope.is_mult_send: + raise zmq_publisher_base.UnsupportedSendPattern(envelope.msg_type) + if not envelope.target_hosts: + raise Exception("Target hosts are expected!") + + dealer_socket = self.sockets_manager.get_socket_to_hosts( + envelope.target, envelope.target_hosts) + self.poller.register(dealer_socket.handle, self.receive_reply) + + LOG.debug("Sending message %(message)s to a target %(target)s" + % {"message": envelope.message_id, + "target": envelope.target}) + + # Empty delimiter - DEALER socket specific + dealer_socket.send(b'', zmq.SNDMORE) + dealer_socket.send_pyobj(envelope, zmq.SNDMORE) + dealer_socket.send(multipart_message[zmq_names.MULTIPART_IDX_BODY]) + + def receive_reply(self, socket): + empty = socket.recv() + assert empty == b'', "Empty expected!" + envelope = socket.recv_pyobj() + assert envelope is not None, "Invalid envelope!" + reply = socket.recv() + LOG.debug("Received reply %s", reply) + return [envelope, reply] + + def cleanup(self): + self.sockets_manager.cleanup() diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_pub_publisher.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_pub_publisher.py index 14c6444bf..890a32392 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_pub_publisher.py +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_pub_publisher.py @@ -20,7 +20,6 @@ from oslo_messaging._drivers.zmq_driver import zmq_address from oslo_messaging._drivers.zmq_driver import zmq_async from oslo_messaging._drivers.zmq_driver import zmq_names from oslo_messaging._drivers.zmq_driver import zmq_socket -from oslo_messaging._i18n import _LI LOG = logging.getLogger(__name__) @@ -55,13 +54,6 @@ class PubPublisherProxy(object): self.sync_channel = SyncChannel(conf, matchmaker, self.zmq_context) - LOG.info(_LI("[PUB:%(pub)s, PULL:%(pull)s] Run PUB publisher"), - {"pub": self.host, - "pull": self.sync_channel.sync_host}) - - self.matchmaker.register_publisher( - (self.host, self.sync_channel.sync_host)) - def send_request(self, multipart_message): envelope = multipart_message[zmq_names.MULTIPART_IDX_ENVELOPE] diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py index 929d1a76d..86d1326fc 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py @@ -19,7 +19,6 @@ import time import six from oslo_messaging._drivers import common as rpc_common -from oslo_messaging._drivers.zmq_driver import zmq_address from oslo_messaging._drivers.zmq_driver import zmq_async from oslo_messaging._drivers.zmq_driver import zmq_names from oslo_messaging._drivers.zmq_driver import zmq_socket @@ -114,9 +113,15 @@ class SocketsManager(object): def _track_socket(self, socket, target): self.outbound_sockets[str(target)] = (socket, time.time()) - def _get_hosts_and_connect(self, socket, target): - hosts = self.matchmaker.get_hosts( + def get_hosts(self, target): + return self.matchmaker.get_hosts( target, zmq_names.socket_type_str(self.listener_type)) + + def _get_hosts_and_connect(self, socket, target): + hosts = self.get_hosts(target) + self._connect_to_hosts(socket, target, hosts) + + def _connect_to_hosts(self, socket, target, hosts): for host in hosts: socket.connect_to_host(host) self._track_socket(socket, target) @@ -136,11 +141,29 @@ class SocketsManager(object): self._get_hosts_and_connect(socket, target) return socket - def get_socket_to_broker(self): + def get_socket_to_hosts(self, target, hosts): + if str(target) in self.outbound_sockets: + socket = self._check_for_new_hosts(target) + else: + socket = zmq_socket.ZmqSocket(self.conf, self.zmq_context, + self.socket_type) + self._connect_to_hosts(socket, target, hosts) + return socket + + def get_socket_to_publishers(self): socket = zmq_socket.ZmqSocket(self.conf, self.zmq_context, self.socket_type) - address = zmq_address.get_broker_address(self.conf) - socket.connect_to_address(address) + publishers = self.matchmaker.get_publishers() + for pub_address, router_address in publishers: + socket.connect_to_host(router_address) + return socket + + def get_socket_to_routers(self): + socket = zmq_socket.ZmqSocket(self.conf, self.zmq_context, + self.socket_type) + routers = self.matchmaker.get_routers() + for router_address in routers: + socket.connect_to_host(router_address) return socket def cleanup(self): diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py index d52667475..7183c6114 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py +++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py @@ -17,8 +17,6 @@ from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \ import zmq_dealer_call_publisher from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \ import zmq_dealer_publisher -from oslo_messaging._drivers.zmq_driver.client.publishers \ - import zmq_push_publisher from oslo_messaging._drivers.zmq_driver.client import zmq_client_base from oslo_messaging._drivers.zmq_driver import zmq_async from oslo_messaging._drivers.zmq_driver import zmq_names @@ -43,9 +41,6 @@ class ZmqClient(zmq_client_base.ZmqClientBase): zmq_dealer_call_publisher.DealerCallPublisher( conf, matchmaker), - zmq_names.CAST_TYPE: - zmq_push_publisher.PushPublisher(conf, matchmaker), - # Here use DealerPublisherLight for sending request to proxy # which finally uses PubPublisher to send fanout in case of # 'use_pub_sub' option configured. diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_envelope.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_envelope.py index 1b3d023a7..a6d7d862e 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/zmq_envelope.py +++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_envelope.py @@ -18,12 +18,23 @@ from oslo_messaging._drivers.zmq_driver import zmq_names class Envelope(object): - def __init__(self, msg_type=None, message_id=None, target=None, **kwargs): + def __init__(self, msg_type=None, message_id=None, target=None, + target_hosts=None, **kwargs): self._msg_type = msg_type self._message_id = message_id self._target = target + self._target_hosts = target_hosts + self._reply_id = None self._kwargs = kwargs + @property + def reply_id(self): + return self._reply_id + + @reply_id.setter + def reply_id(self, value): + self._reply_id = value + @property def msg_type(self): return self._msg_type @@ -36,6 +47,10 @@ class Envelope(object): def target(self): return self._target + @property + def target_hosts(self): + return self._target_hosts + @property def is_mult_send(self): return self._msg_type in zmq_names.MULTISEND_TYPES @@ -44,6 +59,9 @@ class Envelope(object): def topic_filter(self): return zmq_address.target_to_subscribe_filter(self._target) + def has(self, key): + return key in self._kwargs + def set(self, key, value): self._kwargs[key] = value diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_request.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_request.py index d957288bf..372a0c9e1 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/zmq_request.py +++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_request.py @@ -70,10 +70,12 @@ class Request(object): self.message_id = str(uuid.uuid1()) - def create_envelope(self): - return zmq_envelope.Envelope(msg_type=self.msg_type, - message_id=self.message_id, - target=self.target) + def create_envelope(self, hosts=None): + envelope = zmq_envelope.Envelope(msg_type=self.msg_type, + message_id=self.message_id, + target=self.target, + target_hosts=hosts) + return envelope @abc.abstractproperty def msg_type(self): @@ -112,8 +114,8 @@ class CallRequest(RpcRequest): super(CallRequest, self).__init__(*args, **kwargs) - def create_envelope(self): - envelope = super(CallRequest, self).create_envelope() + def create_envelope(self, hosts=None): + envelope = super(CallRequest, self).create_envelope(hosts) envelope.set('timeout', self.timeout) return envelope diff --git a/oslo_messaging/_drivers/zmq_driver/matchmaker/base.py b/oslo_messaging/_drivers/zmq_driver/matchmaker/base.py index cc99aed22..65ade7ab4 100644 --- a/oslo_messaging/_drivers/zmq_driver/matchmaker/base.py +++ b/oslo_messaging/_drivers/zmq_driver/matchmaker/base.py @@ -56,6 +56,33 @@ class MatchMakerBase(object): :returns: a list of tuples of strings "hostname:port" hosts """ + @abc.abstractmethod + def register_router(self, hostname): + """Register router on the nameserver. + + This works for ROUTER proxy only + + :param hostname: host for the topic in "host:port" format + :type hostname: string + """ + + @abc.abstractmethod + def unregister_router(self, hostname): + """Unregister router on the nameserver. + + This works for ROUTER proxy only + + :param hostname: host for the topic in "host:port" format + :type hostname: string + """ + + @abc.abstractmethod + def get_routers(self): + """Get all router-hosts from nameserver. + + :returns: a list of strings "hostname:port" hosts + """ + @abc.abstractmethod def register(self, target, hostname, listener_type, expire=-1): """Register target on nameserver. @@ -101,6 +128,7 @@ class DummyMatchMaker(MatchMakerBase): self._cache = collections.defaultdict(list) self._publishers = set() + self._routers = set() def register_publisher(self, hostname): if hostname not in self._publishers: @@ -113,6 +141,17 @@ class DummyMatchMaker(MatchMakerBase): def get_publishers(self): return list(self._publishers) + def register_router(self, hostname): + if hostname not in self._routers: + self._routers.add(hostname) + + def unregister_router(self, hostname): + if hostname in self._routers: + self._routers.remove(hostname) + + def get_routers(self): + return list(self._routers) + def register(self, target, hostname, listener_type, expire=-1): key = zmq_address.target_to_key(target, listener_type) if hostname not in self._cache[key]: diff --git a/oslo_messaging/_drivers/zmq_driver/matchmaker/matchmaker_redis.py b/oslo_messaging/_drivers/zmq_driver/matchmaker/matchmaker_redis.py index 0d0ce8bca..b463a8b5e 100644 --- a/oslo_messaging/_drivers/zmq_driver/matchmaker/matchmaker_redis.py +++ b/oslo_messaging/_drivers/zmq_driver/matchmaker/matchmaker_redis.py @@ -56,7 +56,8 @@ matchmaker_redis_opts = [ ] _PUBLISHERS_KEY = "PUBLISHERS" -_RETRY_METHODS = ("get_hosts", "get_publishers") +_ROUTERS_KEY = "ROUTERS" +_RETRY_METHODS = ("get_hosts", "get_publishers", "get_routers") def retry_if_connection_error(ex): @@ -144,6 +145,15 @@ class RedisMatchMaker(base.MatchMakerBase): self._get_hosts_by_key(_PUBLISHERS_KEY)]) return hosts + def register_router(self, hostname): + self._redis.sadd(_ROUTERS_KEY, hostname) + + def unregister_router(self, hostname): + self._redis.srem(_ROUTERS_KEY, hostname) + + def get_routers(self): + return self._get_hosts_by_key(_ROUTERS_KEY) + def _get_hosts_by_key(self, key): return self._redis.smembers(key) diff --git a/oslo_messaging/_drivers/zmq_driver/poller/threading_poller.py b/oslo_messaging/_drivers/zmq_driver/poller/threading_poller.py index 414e22042..a5a486858 100644 --- a/oslo_messaging/_drivers/zmq_driver/poller/threading_poller.py +++ b/oslo_messaging/_drivers/zmq_driver/poller/threading_poller.py @@ -48,7 +48,7 @@ class ThreadingPoller(zmq_poller.ZmqPoller): def poll(self, timeout=None): if timeout: - timeout *= 1000 # zmq poller waits milliseconds + timeout *= 1000 # zmq poller expects milliseconds sockets = None @@ -65,9 +65,6 @@ class ThreadingPoller(zmq_poller.ZmqPoller): else: return socket.recv_multipart(), socket - def resume_polling(self, socket): - pass # Nothing to do for threading poller - def close(self): pass # Nothing to do for threading poller diff --git a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py index eac2bb727..c8a0f58e6 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py +++ b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py @@ -36,7 +36,6 @@ class RouterIncomingMessage(base.RpcIncomingMessage): self.reply_id = reply_id self.msg_id = msg_id self.message = message - poller.resume_polling(socket) def reply(self, reply=None, failure=None, log_failure=True): """Reply is not needed for non-call messages""" @@ -58,12 +57,13 @@ class RouterConsumer(zmq_consumer_base.SingleSocketConsumer): reply_id = socket.recv() empty = socket.recv() assert empty == b'', 'Bad format: empty delimiter expected' + envelope = socket.recv_pyobj() request = socket.recv_pyobj() - return request, reply_id + return request, envelope, reply_id def receive_message(self, socket): try: - request, reply_id = self._receive_request(socket) + request, envelope, reply_id = self._receive_request(socket) LOG.debug("[%(host)s] Received %(type)s, %(id)s, %(target)s", {"host": self.host, "type": request.msg_type, @@ -72,7 +72,7 @@ class RouterConsumer(zmq_consumer_base.SingleSocketConsumer): if request.msg_type == zmq_names.CALL_TYPE: return zmq_incoming_message.ZmqIncomingRequest( - socket, reply_id, request, self.poller) + socket, reply_id, request, envelope, self.poller) elif request.msg_type in zmq_names.NON_BLOCKING_TYPES: return RouterIncomingMessage( request.context, request.message, socket, reply_id, diff --git a/oslo_messaging/_drivers/zmq_driver/server/zmq_incoming_message.py b/oslo_messaging/_drivers/zmq_driver/server/zmq_incoming_message.py index d0d3b0470..e3bd186d2 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/zmq_incoming_message.py +++ b/oslo_messaging/_drivers/zmq_driver/server/zmq_incoming_message.py @@ -29,12 +29,13 @@ zmq = zmq_async.import_zmq() class ZmqIncomingRequest(base.RpcIncomingMessage): - def __init__(self, socket, rep_id, request, poller): + def __init__(self, socket, rep_id, request, envelope, poller): super(ZmqIncomingRequest, self).__init__(request.context, request.message) self.reply_socket = socket self.reply_id = rep_id self.request = request + self.envelope = envelope self.received = None self.poller = poller @@ -54,8 +55,8 @@ class ZmqIncomingRequest(base.RpcIncomingMessage): self.received = True self.reply_socket.send(self.reply_id, zmq.SNDMORE) self.reply_socket.send(b'', zmq.SNDMORE) + self.reply_socket.send_pyobj(self.envelope, zmq.SNDMORE) self.reply_socket.send_pyobj(response) - self.poller.resume_polling(self.reply_socket) def requeue(self): """Requeue is not supported""" diff --git a/oslo_messaging/_drivers/zmq_driver/server/zmq_server.py b/oslo_messaging/_drivers/zmq_driver/server/zmq_server.py index cf9262ece..6f1b939c6 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/zmq_server.py +++ b/oslo_messaging/_drivers/zmq_driver/server/zmq_server.py @@ -16,8 +16,6 @@ import copy import logging from oslo_messaging._drivers import base -from oslo_messaging._drivers.zmq_driver.server.consumers\ - import zmq_pull_consumer from oslo_messaging._drivers.zmq_driver.server.consumers\ import zmq_router_consumer from oslo_messaging._drivers.zmq_driver.server.consumers\ @@ -41,12 +39,10 @@ class ZmqServer(base.Listener): self.poller = poller or zmq_async.get_poller() self.router_consumer = zmq_router_consumer.RouterConsumer( conf, self.poller, self) - self.pull_consumer = zmq_pull_consumer.PullConsumer( - conf, self.poller, self) self.sub_consumer = zmq_sub_consumer.SubConsumer( conf, self.poller, self) if conf.use_pub_sub else None - self.consumers = [self.router_consumer, self.pull_consumer] + self.consumers = [self.router_consumer] if self.sub_consumer: self.consumers.append(self.sub_consumer) diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_async.py b/oslo_messaging/_drivers/zmq_driver/zmq_async.py index ff57046f2..741a08b1e 100644 --- a/oslo_messaging/_drivers/zmq_driver/zmq_async.py +++ b/oslo_messaging/_drivers/zmq_driver/zmq_async.py @@ -54,11 +54,6 @@ def get_executor(method, zmq_concurrency='eventlet'): return threading_poller.ThreadingExecutor(method) -def get_proc_executor(method): - from oslo_messaging._drivers.zmq_driver import zmq_poller - return zmq_poller.MutliprocessingExecutor(method) - - def _is_eventlet_zmq_available(): return importutils.try_import('eventlet.green.zmq') diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_poller.py b/oslo_messaging/_drivers/zmq_driver/zmq_poller.py index dccdc0cfa..28fe6c8e1 100644 --- a/oslo_messaging/_drivers/zmq_driver/zmq_poller.py +++ b/oslo_messaging/_drivers/zmq_driver/zmq_poller.py @@ -13,7 +13,6 @@ # under the License. import abc -import multiprocessing import six @@ -79,13 +78,6 @@ class ZmqPoller(object): def close(self): """Terminate polling""" - def resume_polling(self, socket): - """Resume with polling - - Some implementations of poller may provide hold polling before reply - This method is intended to explicitly resume polling afterwards. - """ - @six.add_metaclass(abc.ABCMeta) class Executor(object): @@ -109,27 +101,3 @@ class Executor(object): @abc.abstractmethod def done(self): """More soft way to stop rather than killing thread""" - - -class MutliprocessingExecutor(Executor): - - def __init__(self, method): - process = multiprocessing.Process(target=self._loop) - self._method = method - super(MutliprocessingExecutor, self).__init__(process) - - def _loop(self): - while not self._stop.is_set(): - self._method() - - def execute(self): - self.thread.start() - - def stop(self): - self._stop.set() - - def wait(self): - self.thread.join() - - def done(self): - self._stop.set() diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_socket.py b/oslo_messaging/_drivers/zmq_driver/zmq_socket.py index 30710f5e4..5a61cbcf2 100644 --- a/oslo_messaging/_drivers/zmq_driver/zmq_socket.py +++ b/oslo_messaging/_drivers/zmq_driver/zmq_socket.py @@ -134,7 +134,6 @@ class ZmqRandomPortSocket(ZmqSocket): min_port=conf.rpc_zmq_min_port, max_port=conf.rpc_zmq_max_port, max_tries=conf.rpc_zmq_bind_port_retries) - self.connected = True except zmq.ZMQBindError: LOG.error(_LE("Random ports range exceeded!")) raise ZmqPortRangeExceededException() diff --git a/oslo_messaging/tests/drivers/zmq/test_pub_sub.py b/oslo_messaging/tests/drivers/zmq/test_pub_sub.py index 1b37be510..091397cf8 100644 --- a/oslo_messaging/tests/drivers/zmq/test_pub_sub.py +++ b/oslo_messaging/tests/drivers/zmq/test_pub_sub.py @@ -40,6 +40,8 @@ class TestPubSub(zmq_common.ZmqBaseTestCase): self.publisher = zmq_pub_publisher.PubPublisherProxy( self.conf, self.driver.matchmaker) + self.driver.matchmaker.register_publisher( + (self.publisher.host, "")) self.listeners = [] for i in range(self.LISTENERS_COUNT): diff --git a/setup-test-env-zmq.sh b/setup-test-env-zmq.sh index effe0091b..816c01637 100755 --- a/setup-test-env-zmq.sh +++ b/setup-test-env-zmq.sh @@ -16,12 +16,14 @@ cat > ${DATADIR}/zmq.conf < ${DATADIR}/zmq-broker.log 2>&1 & +oslo-messaging-zmq-proxy --type PUBLISHER --config-file ${DATADIR}/zmq.conf > ${DATADIR}/zmq-publisher.log 2>&1 & +oslo-messaging-zmq-proxy --type ROUTER --config-file ${DATADIR}/zmq.conf > ${DATADIR}/zmq-router.log 2>&1 & $* diff --git a/setup.cfg b/setup.cfg index 98ca0270f..1b8d408b7 100644 --- a/setup.cfg +++ b/setup.cfg @@ -25,7 +25,8 @@ packages = [entry_points] console_scripts = - oslo-messaging-zmq-broker = oslo_messaging._cmd.zmq_broker:main + oslo-messaging-zmq-proxy = oslo_messaging._cmd.zmq_proxy:main + oslo-messaging-zmq-broker = oslo_messaging._cmd.zmq_proxy:main oslo.messaging.drivers = rabbit = oslo_messaging._drivers.impl_rabbit:RabbitDriver