diff --git a/oslo_messaging/_drivers/impl_zmq.py b/oslo_messaging/_drivers/impl_zmq.py index f0ff1a64..09dde4b1 100644 --- a/oslo_messaging/_drivers/impl_zmq.py +++ b/oslo_messaging/_drivers/impl_zmq.py @@ -15,6 +15,7 @@ import logging import pprint import socket +import threading from oslo_config import cfg from stevedore import driver @@ -82,6 +83,36 @@ zmq_opts = [ ] +class LazyDriverItem(object): + + def __init__(self, item_cls, *args, **kwargs): + self._lock = threading.Lock() + self.item = None + self.item_class = item_cls + self.args = args + self.kwargs = kwargs + + def get(self): + # NOTE(ozamiatin): Lazy initialization. + # All init stuff moved closer to usage point - lazy init. + # Better design approach is to initialize in the driver's + # __init__, but 'fork' extensively used by services + # breaks all things. + + if self.item is not None: + return self.item + + self._lock.acquire() + if self.item is None: + self.item = self.item_class(*self.args, **self.kwargs) + self._lock.release() + return self.item + + def cleanup(self): + if self.item: + self.item.cleanup() + + class ZmqDriver(base.BaseDriver): """ZeroMQ Driver implementation. @@ -115,15 +146,27 @@ class ZmqDriver(base.BaseDriver): conf.register_opts(zmq_opts) conf.register_opts(executor_base._pool_opts) self.conf = conf + self.allowed_remote_exmods = allowed_remote_exmods self.matchmaker = driver.DriverManager( 'oslo.messaging.zmq.matchmaker', self.conf.rpc_zmq_matchmaker, ).driver(self.conf) - self.server = zmq_server.ZmqServer(self.conf, self.matchmaker) - self.client = zmq_client.ZmqClient(self.conf, self.matchmaker, - allowed_remote_exmods) + self.server = LazyDriverItem( + zmq_server.ZmqServer, self, self.conf, self.matchmaker) + + self.notify_server = LazyDriverItem( + zmq_server.ZmqServer, self, self.conf, self.matchmaker) + + self.client = LazyDriverItem( + zmq_client.ZmqClient, self.conf, self.matchmaker, + self.allowed_remote_exmods) + + self.notifier = LazyDriverItem( + zmq_client.ZmqClient, self.conf, self.matchmaker, + self.allowed_remote_exmods) + super(ZmqDriver, self).__init__(conf, url, default_exchange, allowed_remote_exmods) @@ -147,13 +190,14 @@ class ZmqDriver(base.BaseDriver): N means N retries :type retry: int """ + client = self.client.get() timeout = timeout or self.conf.rpc_response_timeout if wait_for_reply: - return self.client.send_call(target, ctxt, message, timeout, retry) + return client.send_call(target, ctxt, message, timeout, retry) elif target.fanout: - self.client.send_fanout(target, ctxt, message, timeout, retry) + client.send_fanout(target, ctxt, message, timeout, retry) else: - self.client.send_cast(target, ctxt, message, timeout, retry) + client.send_cast(target, ctxt, message, timeout, retry) def send_notification(self, target, ctxt, message, version, retry=None): """Send notification to server @@ -172,11 +216,11 @@ class ZmqDriver(base.BaseDriver): N means N retries :type retry: int """ + client = self.notifier.get() if target.fanout: - self.client.send_notify_fanout(target, ctxt, message, version, - retry) + client.send_notify_fanout(target, ctxt, message, version, retry) else: - self.client.send_notify(target, ctxt, message, version, retry) + client.send_notify(target, ctxt, message, version, retry) def listen(self, target): """Listen to a specified target on a server side @@ -184,8 +228,9 @@ class ZmqDriver(base.BaseDriver): :param target: Message destination target :type target: oslo_messaging.Target """ - self.server.listen(target) - return self.server + server = self.server.get() + server.listen(target) + return server def listen_for_notifications(self, targets_and_priorities, pool): """Listen to a specified list of targets on a server side @@ -195,11 +240,14 @@ class ZmqDriver(base.BaseDriver): :param pool: Not used for zmq implementation :type pool: object """ - self.server.listen_notification(targets_and_priorities) - return self.server + server = self.notify_server.get() + server.listen_notification(targets_and_priorities) + return server def cleanup(self): """Cleanup all driver's connections finally """ self.client.cleanup() self.server.cleanup() + self.notify_server.cleanup() + self.notifier.cleanup() diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_dealer_publisher.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_dealer_publisher.py index bf6f253f..9fdd6d7a 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_dealer_publisher.py +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_dealer_publisher.py @@ -14,20 +14,21 @@ import logging -from oslo_messaging._drivers import common as rpc_common from oslo_messaging._drivers.zmq_driver.client.publishers\ import zmq_publisher_base -from oslo_messaging._drivers.zmq_driver import zmq_address from oslo_messaging._drivers.zmq_driver import zmq_async from oslo_messaging._drivers.zmq_driver import zmq_names -from oslo_messaging._i18n import _LE, _LI +from oslo_messaging._i18n import _LI, _LW LOG = logging.getLogger(__name__) zmq = zmq_async.import_zmq() -class DealerPublisher(zmq_publisher_base.PublisherBase): +class DealerPublisher(zmq_publisher_base.PublisherMultisend): + + def __init__(self, conf, matchmaker): + super(DealerPublisher, self).__init__(conf, matchmaker, zmq.DEALER) def send_request(self, request): @@ -37,41 +38,25 @@ class DealerPublisher(zmq_publisher_base.PublisherBase): dealer_socket, hosts = self._check_hosts_connections(request.target) if request.msg_type in zmq_names.MULTISEND_TYPES: - for _ in range(len(hosts)): + for _ in range(dealer_socket.connections_count()): self._send_request(dealer_socket, request) else: self._send_request(dealer_socket, request) def _send_request(self, socket, request): + if not socket.connections: + # NOTE(ozamiatin): Here we can provide + # a queue for keeping messages to send them later + # when some listener appears. However such approach + # being more reliable will consume additional memory. + LOG.warning(_LW("Request %s was dropped because no connection") + % request.msg_type) + return + socket.send(b'', zmq.SNDMORE) super(DealerPublisher, self)._send_request(socket, request) LOG.info(_LI("Sending message %(message)s to a target %(target)s") % {"message": request.message, "target": request.target}) - - def _check_hosts_connections(self, target): - if str(target) in self.outbound_sockets: - dealer_socket, hosts = self.outbound_sockets[str(target)] - else: - dealer_socket = zmq.Context().socket(zmq.DEALER) - hosts = self.matchmaker.get_hosts(target) - for host in hosts: - self._connect_to_host(dealer_socket, host, target) - self.outbound_sockets[str(target)] = (dealer_socket, hosts) - return dealer_socket, hosts - - @staticmethod - def _connect_to_host(socket, host, target): - address = zmq_address.get_tcp_direct_address(host) - try: - LOG.info(_LI("Connecting DEALER to %(address)s for %(target)s") - % {"address": address, - "target": target}) - socket.connect(address) - except zmq.ZMQError as e: - errmsg = _LE("Failed connecting DEALER to %(address)s: %(e)s")\ - % (address, e) - LOG.error(errmsg) - raise rpc_common.RPCException(errmsg) diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py index a367e9ed..51de8a5e 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py @@ -13,13 +13,18 @@ # under the License. import abc +import logging import six from oslo_messaging._drivers import common as rpc_common +from oslo_messaging._drivers.zmq_driver import zmq_address from oslo_messaging._drivers.zmq_driver import zmq_async -from oslo_messaging._i18n import _LE +from oslo_messaging._drivers.zmq_driver import zmq_names +from oslo_messaging._drivers.zmq_driver import zmq_socket +from oslo_messaging._i18n import _LE, _LI +LOG = logging.getLogger(__name__) zmq = zmq_async.import_zmq() @@ -93,6 +98,42 @@ class PublisherBase(object): def cleanup(self): """Cleanup publisher. Close allocated connections.""" - for socket, hosts in self.outbound_sockets.values(): + for socket in self.outbound_sockets.values(): socket.setsockopt(zmq.LINGER, 0) socket.close() + + +class PublisherMultisend(PublisherBase): + + def __init__(self, conf, matchmaker, socket_type): + self.socket_type = socket_type + super(PublisherMultisend, self).__init__(conf, matchmaker) + + def _check_hosts_connections(self, target): + hosts = self.matchmaker.get_hosts(target) + + if str(target) in self.outbound_sockets: + socket = self.outbound_sockets[str(target)] + else: + socket = zmq_socket.ZmqSocket(self.zmq_context, self.socket_type) + self.outbound_sockets[str(target)] = socket + + for host in hosts: + self._connect_to_host(socket, host, target) + + return socket, hosts + + def _connect_to_host(self, socket, host, target): + address = zmq_address.get_tcp_direct_address(host) + stype = zmq_names.socket_type_str(self.socket_type) + try: + LOG.info(_LI("Connecting %(stype)s to %(address)s for %(target)s") + % {"stype": stype, + "address": address, + "target": target}) + socket.connect(address) + except zmq.ZMQError as e: + errmsg = _LE("Failed connecting %(stype) to %(address)s: %(e)s")\ + % (stype, address, e) + LOG.error(errmsg) + raise rpc_common.RPCException(errmsg) diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_req_publisher.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_req_publisher.py index 68beab90..a3096959 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_req_publisher.py +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_req_publisher.py @@ -52,7 +52,7 @@ class ReqPublisher(zmq_publisher_base.PublisherBase): LOG.info(_LI("Connecting REQ to %s") % connect_address) socket.connect(connect_address) - self.outbound_sockets[str(target)] = (socket, [host]) + self.outbound_sockets[str(target)] = socket return socket except zmq.ZMQError as e: diff --git a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py index 58680da9..92b9364b 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py +++ b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py @@ -30,6 +30,7 @@ class RouterConsumer(object): def __init__(self, conf, poller, server): + self.conf = conf self.poller = poller self.server = server @@ -38,6 +39,7 @@ class RouterConsumer(object): self.socket = self.context.socket(zmq.ROUTER) self.address = zmq_address.get_tcp_random_address(conf) self.port = self.socket.bind_to_random_port(self.address) + self.poller.register(self.socket, self._receive_message) LOG.info(_LI("Run ROUTER consumer on %(addr)s:%(port)d"), {"addr": self.address, "port": self.port}) @@ -49,7 +51,7 @@ class RouterConsumer(object): def listen(self, target): LOG.info(_LI("Listen to target %s") % str(target)) - self.poller.register(self.socket, self._receive_message) + # Do nothing here because we have single socket def cleanup(self): if not self.socket.closed: @@ -66,7 +68,9 @@ class RouterConsumer(object): assert msg_type is not None, 'Bad format: msg type expected' context = socket.recv_json() message = socket.recv_json() - LOG.debug("Received %s message %s" % (msg_type, str(message))) + LOG.info(_LI("Received %(msg_type)s message %(msg)s") + % {"msg_type": msg_type, + "msg": str(message)}) if msg_type == zmq_names.CALL_TYPE: return zmq_incoming_message.ZmqIncomingRequest( diff --git a/oslo_messaging/_drivers/zmq_driver/server/zmq_server.py b/oslo_messaging/_drivers/zmq_driver/server/zmq_server.py index 30cacd40..8f7f1265 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/zmq_server.py +++ b/oslo_messaging/_drivers/zmq_driver/server/zmq_server.py @@ -28,8 +28,8 @@ zmq = zmq_async.import_zmq() class ZmqServer(base.Listener): - def __init__(self, conf, matchmaker=None): - self.conf = conf + def __init__(self, driver, conf, matchmaker=None): + super(ZmqServer, self).__init__(driver) self.matchmaker = matchmaker self.poller = zmq_async.get_poller() self.rpc_consumer = zmq_router_consumer.RouterConsumer( diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_names.py b/oslo_messaging/_drivers/zmq_driver/zmq_names.py index 583600ec..33fe9247 100644 --- a/oslo_messaging/_drivers/zmq_driver/zmq_names.py +++ b/oslo_messaging/_drivers/zmq_driver/zmq_names.py @@ -12,6 +12,17 @@ # License for the specific language governing permissions and limitations # under the License. +from oslo_messaging._drivers.zmq_driver import zmq_async + +zmq = zmq_async.import_zmq() + + +ZMQ_SOCKET_STR = {zmq.DEALER: "DEALER", + zmq.ROUTER: "ROUTER", + zmq.REQ: "REQ", + zmq.REP: "REP", + zmq.PUB: "PUB", + zmq.SUB: "SUB"} FIELD_FAILURE = 'failure' FIELD_REPLY = 'reply' @@ -33,3 +44,7 @@ MULTISEND_TYPES = (CAST_FANOUT_TYPE, NOTIFY_FANOUT_TYPE) DIRECT_TYPES = (CALL_TYPE, CAST_TYPE, NOTIFY_TYPE) CAST_TYPES = (CAST_TYPE, CAST_FANOUT_TYPE) NOTIFY_TYPES = (NOTIFY_TYPE, NOTIFY_FANOUT_TYPE) + + +def socket_type_str(socket_type): + return ZMQ_SOCKET_STR[socket_type] diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_socket.py b/oslo_messaging/_drivers/zmq_driver/zmq_socket.py new file mode 100644 index 00000000..a4f77b7e --- /dev/null +++ b/oslo_messaging/_drivers/zmq_driver/zmq_socket.py @@ -0,0 +1,57 @@ +# Copyright 2015 Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import logging + +from oslo_messaging._drivers.zmq_driver import zmq_async +from oslo_messaging._drivers.zmq_driver import zmq_names + +LOG = logging.getLogger(__name__) + +zmq = zmq_async.import_zmq() + + +class ZmqSocket(object): + + def __init__(self, context, socket_type): + self.context = context + self.socket_type = socket_type + self.handle = context.socket(socket_type) + self.connections = set() + + def type_name(self): + return zmq_names(self.socket_type) + + def connections_count(self): + return len(self.connections) + + def connect(self, address): + if address not in self.connections: + self.handle.connect(address) + self.connections.add(address) + + def setsockopt(self, *args, **kwargs): + self.handle.setsockopt(*args, **kwargs) + + def send(self, *args, **kwargs): + self.handle.send(*args, **kwargs) + + def send_string(self, *args, **kwargs): + self.handle.send_string(*args, **kwargs) + + def send_json(self, *args, **kwargs): + self.handle.send_json(*args, **kwargs) + + def close(self, *args, **kwargs): + self.handle.close(*args, **kwargs)