[zmq] Driver optimizations for CALL

New DEALER-based publisher provided for CALL.
Futures-based reply waiting makes possible to
refuse using of REQ blocking socket and also
reduce number of openned sockets to a socket-per-target
instead of socket-per-message as it was for CALL.

Closes-Bug: #1517999

Optimized redis requests - request once instead of
per each message. This should be elaborated with an
autonomous nodes discovery mechanism to be correct
in general case.

Closes-Bug: #1517993

Reduced number of INFO log messages. Most of them switched
to the DEBUG level.

Closes-Bug: #1517997

Change-Id: Id017e79368cdc68613ddd7adef26411ea422dc8c
This commit is contained in:
Oleksii Zamiatin 2015-11-16 15:07:15 +02:00
parent e5a232ce03
commit eea60cfb36
20 changed files with 528 additions and 191 deletions

View File

@ -13,6 +13,7 @@
# under the License.
import logging
import os
import pprint
import socket
import threading
@ -23,6 +24,7 @@ from stevedore import driver
from oslo_messaging._drivers import base
from oslo_messaging._drivers import common as rpc_common
from oslo_messaging._drivers.zmq_driver.client import zmq_client
from oslo_messaging._drivers.zmq_driver.client import zmq_client_light
from oslo_messaging._drivers.zmq_driver.server import zmq_server
from oslo_messaging._executors import impl_pooledexecutor # FIXME(markmc)
@ -78,8 +80,8 @@ zmq_opts = [
'Poll raises timeout exception when timeout expired.'),
cfg.BoolOpt('zmq_use_broker',
default=True,
help='Shows whether zmq-messaging uses broker or not.'),
default=False,
help='Configures zmq-messaging to use broker or not.'),
cfg.PortOpt('rpc_zmq_min_port',
default=49152,
@ -106,6 +108,7 @@ class LazyDriverItem(object):
self.item_class = item_cls
self.args = args
self.kwargs = kwargs
self.process_id = os.getpid()
def get(self):
# NOTE(ozamiatin): Lazy initialization.
@ -114,11 +117,12 @@ class LazyDriverItem(object):
# __init__, but 'fork' extensively used by services
# breaks all things.
if self.item is not None:
if self.item is not None and os.getpid() == self.process_id:
return self.item
self._lock.acquire()
if self.item is None:
if self.item is None or os.getpid() != self.process_id:
self.process_id = os.getpid()
self.item = self.item_class(*self.args, **self.kwargs)
self._lock.release()
return self.item
@ -175,12 +179,15 @@ class ZmqDriver(base.BaseDriver):
self.notify_server = LazyDriverItem(
zmq_server.ZmqServer, self, self.conf, self.matchmaker)
client_cls = zmq_client_light.ZmqClientLight \
if conf.zmq_use_broker else zmq_client.ZmqClient
self.client = LazyDriverItem(
zmq_client.ZmqClient, self.conf, self.matchmaker,
client_cls, self.conf, self.matchmaker,
self.allowed_remote_exmods)
self.notifier = LazyDriverItem(
zmq_client.ZmqClient, self.conf, self.matchmaker,
client_cls, self.conf, self.matchmaker,
self.allowed_remote_exmods)
super(ZmqDriver, self).__init__(conf, url, default_exchange,

View File

@ -15,8 +15,8 @@
import logging
from oslo_messaging._drivers.zmq_driver.broker import zmq_base_proxy
from oslo_messaging._drivers.zmq_driver.client.publishers\
import zmq_dealer_publisher
from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \
import zmq_dealer_publisher_proxy
from oslo_messaging._drivers.zmq_driver import zmq_address
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
@ -39,8 +39,8 @@ class UniversalQueueProxy(zmq_base_proxy.BaseProxy):
LOG.info(_LI("Polling at universal proxy"))
self.matchmaker = matchmaker
reply_receiver = zmq_dealer_publisher.ReplyReceiver(self.poller)
self.publisher = zmq_dealer_publisher.DealerPublisherProxy(
reply_receiver = zmq_dealer_publisher_proxy.ReplyReceiver(self.poller)
self.publisher = zmq_dealer_publisher_proxy.DealerPublisherProxy(
conf, matchmaker, reply_receiver)
def run(self):
@ -54,18 +54,18 @@ class UniversalQueueProxy(zmq_base_proxy.BaseProxy):
self._redirect_reply(message)
def _redirect_in_request(self, request):
LOG.info(_LI("-> Redirecting request %s to TCP publisher")
% request)
LOG.debug("-> Redirecting request %s to TCP publisher"
% request)
self.publisher.send_request(request)
def _redirect_reply(self, reply):
LOG.info(_LI("Reply proxy %s") % reply)
LOG.debug("Reply proxy %s" % reply)
if reply[zmq_names.IDX_REPLY_TYPE] == zmq_names.ACK_TYPE:
LOG.info(_LI("Acknowledge dropped %s") % reply)
LOG.debug("Acknowledge dropped %s" % reply)
return
LOG.info(_LI("<- Redirecting reply to ROUTER: reply: %s")
% reply[zmq_names.IDX_REPLY_BODY:])
LOG.debug("<- Redirecting reply to ROUTER: reply: %s"
% reply[zmq_names.IDX_REPLY_BODY:])
self.router_socket.send_multipart(reply[zmq_names.IDX_REPLY_BODY:])

View File

@ -0,0 +1,194 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import threading
from concurrent import futures
import futurist
import oslo_messaging
from oslo_messaging._drivers import common as rpc_common
from oslo_messaging._drivers.zmq_driver.client.publishers\
import zmq_publisher_base
from oslo_messaging._drivers.zmq_driver import zmq_address
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
from oslo_messaging._drivers.zmq_driver import zmq_socket
LOG = logging.getLogger(__name__)
zmq = zmq_async.import_zmq()
class DealerCallPublisher(zmq_publisher_base.PublisherBase):
"""Thread-safe CALL publisher
Used as faster and thread-safe publisher for CALL
instead of ReqPublisher.
"""
def __init__(self, conf, matchmaker):
super(DealerCallPublisher, self).__init__(conf)
self.matchmaker = matchmaker
self.reply_waiter = ReplyWaiter(conf)
self.sender = RequestSender(conf, matchmaker, self.reply_waiter) \
if not conf.zmq_use_broker else \
RequestSenderLight(conf, matchmaker, self.reply_waiter)
def send_request(self, request):
reply_future = self.sender.send_request(request)
try:
reply = reply_future.result(timeout=request.timeout)
except futures.TimeoutError:
raise oslo_messaging.MessagingTimeout(
"Timeout %s seconds was reached" % request.timeout)
finally:
self.reply_waiter.untrack_id(request.message_id)
LOG.debug("Received reply %s" % reply)
if reply[zmq_names.FIELD_FAILURE]:
raise rpc_common.deserialize_remote_exception(
reply[zmq_names.FIELD_FAILURE],
request.allowed_remote_exmods)
else:
return reply[zmq_names.FIELD_REPLY]
class RequestSender(zmq_publisher_base.PublisherMultisend):
def __init__(self, conf, matchmaker, reply_waiter):
super(RequestSender, self).__init__(conf, matchmaker, zmq.DEALER)
self.reply_waiter = reply_waiter
self.queue, self.empty_except = zmq_async.get_queue()
self.executor = zmq_async.get_executor(self.run_loop)
self.executor.execute()
def send_request(self, request):
reply_future = futurist.Future()
self.reply_waiter.track_reply(reply_future, request.message_id)
self.queue.put(request)
return reply_future
def _do_send_request(self, socket, request):
socket.send(b'', zmq.SNDMORE)
socket.send_pyobj(request)
LOG.debug("Sending message_id %(message)s to a target %(target)s"
% {"message": request.message_id,
"target": request.target})
def _check_hosts_connections(self, target, listener_type):
if str(target) in self.outbound_sockets:
socket = self.outbound_sockets[str(target)]
else:
hosts = self.matchmaker.get_hosts(
target, listener_type)
socket = zmq_socket.ZmqSocket(self.zmq_context, self.socket_type)
self.outbound_sockets[str(target)] = socket
for host in hosts:
self._connect_to_host(socket, host, target)
return socket
def run_loop(self):
try:
request = self.queue.get(timeout=self.conf.rpc_poll_timeout)
except self.empty_except:
return
socket = self._check_hosts_connections(
request.target, zmq_names.socket_type_str(zmq.ROUTER))
self._do_send_request(socket, request)
self.reply_waiter.poll_socket(socket)
class RequestSenderLight(RequestSender):
"""This class used with proxy.
Simplified address matching because there is only
one proxy IPC address.
"""
def __init__(self, conf, matchmaker, reply_waiter):
if not conf.zmq_use_broker:
raise rpc_common.RPCException("RequestSenderLight needs a proxy!")
super(RequestSenderLight, self).__init__(
conf, matchmaker, reply_waiter)
self.socket = None
def _check_hosts_connections(self, target, listener_type):
if self.socket is None:
self.socket = zmq_socket.ZmqSocket(self.zmq_context,
self.socket_type)
self.outbound_sockets[str(target)] = self.socket
address = zmq_address.get_broker_address(self.conf)
self._connect_to_address(self.socket, address, target)
return self.socket
def _do_send_request(self, socket, request):
LOG.debug("Sending %(type)s message_id %(message)s"
" to a target %(target)s"
% {"type": request.msg_type,
"message": request.message_id,
"target": request.target})
envelope = request.create_envelope()
socket.send(b'', zmq.SNDMORE)
socket.send_pyobj(envelope, zmq.SNDMORE)
socket.send_pyobj(request)
class ReplyWaiter(object):
def __init__(self, conf):
self.conf = conf
self.replies = {}
self.poller = zmq_async.get_poller()
self.executor = zmq_async.get_executor(self.run_loop)
self.executor.execute()
self._lock = threading.Lock()
def track_reply(self, reply_future, message_id):
self._lock.acquire()
self.replies[message_id] = reply_future
self._lock.release()
def untrack_id(self, message_id):
self._lock.acquire()
self.replies.pop(message_id)
self._lock.release()
def poll_socket(self, socket):
def _receive_method(socket):
empty = socket.recv()
assert empty == b'', "Empty expected!"
reply = socket.recv_pyobj()
LOG.debug("Received reply %s" % reply)
return reply
self.poller.register(socket, recv_method=_receive_method)
def run_loop(self):
reply, socket = self.poller.poll(
timeout=self.conf.rpc_poll_timeout)
if reply is not None:
call_future = self.replies[reply[zmq_names.FIELD_MSG_ID]]
call_future.set_result(reply)

View File

@ -18,7 +18,7 @@ from oslo_messaging._drivers.zmq_driver.client.publishers\
import zmq_publisher_base
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
from oslo_messaging._i18n import _LI, _LW
from oslo_messaging._i18n import _LW
LOG = logging.getLogger(__name__)
@ -34,7 +34,7 @@ class DealerPublisher(zmq_publisher_base.PublisherMultisend):
self._check_request_pattern(request)
dealer_socket, hosts = self._check_hosts_connections(
dealer_socket = self._check_hosts_connections(
request.target, zmq_names.socket_type_str(zmq.ROUTER))
if not dealer_socket.connections:
@ -61,15 +61,16 @@ class DealerPublisher(zmq_publisher_base.PublisherMultisend):
socket.send(b'', zmq.SNDMORE)
socket.send_pyobj(request)
LOG.info(_LI("Sending message_id %(message)s to a target %(target)s")
% {"message": request.message_id,
"target": request.target})
LOG.debug("Sending message_id %(message)s to a target %(target)s"
% {"message": request.message_id,
"target": request.target})
def cleanup(self):
super(DealerPublisher, self).cleanup()
class DealerPublisherLight(zmq_publisher_base.PublisherBase):
"""Used when publishing to proxy. """
def __init__(self, conf, address):
super(DealerPublisherLight, self).__init__(conf)
@ -92,68 +93,6 @@ class DealerPublisherLight(zmq_publisher_base.PublisherBase):
self.socket.close()
class DealerPublisherProxy(DealerPublisher):
def __init__(self, conf, matchmaker, reply_receiver):
super(DealerPublisherProxy, self).__init__(conf, matchmaker)
self.reply_receiver = reply_receiver
def send_request(self, multipart_message):
envelope = multipart_message[zmq_names.MULTIPART_IDX_ENVELOPE]
LOG.info(_LI("Envelope: %s") % envelope)
target = envelope[zmq_names.FIELD_TARGET]
dealer_socket, hosts = self._check_hosts_connections(
target, zmq_names.socket_type_str(zmq.ROUTER))
if not dealer_socket.connections:
# NOTE(ozamiatin): Here we can provide
# a queue for keeping messages to send them later
# when some listener appears. However such approach
# being more reliable will consume additional memory.
LOG.warning(_LW("Request %s was dropped because no connection")
% envelope[zmq_names.FIELD_MSG_TYPE])
return
self.reply_receiver.track_socket(dealer_socket.handle)
LOG.info(_LI("Sending message %(message)s to a target %(target)s")
% {"message": envelope[zmq_names.FIELD_MSG_ID],
"target": envelope[zmq_names.FIELD_TARGET]})
if envelope[zmq_names.FIELD_MSG_TYPE] in zmq_names.MULTISEND_TYPES:
for _ in range(dealer_socket.connections_count()):
self._send_request(dealer_socket, multipart_message)
else:
self._send_request(dealer_socket, multipart_message)
def _send_request(self, socket, multipart_message):
socket.send(b'', zmq.SNDMORE)
socket.send_pyobj(
multipart_message[zmq_names.MULTIPART_IDX_ENVELOPE],
zmq.SNDMORE)
socket.send(multipart_message[zmq_names.MULTIPART_IDX_BODY])
class ReplyReceiver(object):
def __init__(self, poller):
self.poller = poller
LOG.info(_LI("Reply waiter created in broker"))
def _receive_reply(self, socket):
return socket.recv_multipart()
def track_socket(self, socket):
self.poller.register(socket, self._receive_reply)
def cleanup(self):
self.poller.close()
class AcknowledgementReceiver(object):
def __init__(self):
@ -172,8 +111,7 @@ class AcknowledgementReceiver(object):
def poll_for_acknowledgements(self):
ack_message, socket = self.poller.poll()
LOG.info(_LI("Message %s acknowledged")
% ack_message[zmq_names.FIELD_ID])
LOG.debug("Message %s acknowledged" % ack_message[zmq_names.FIELD_ID])
def cleanup(self):
self.thread.stop()

View File

@ -0,0 +1,87 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \
import zmq_dealer_publisher
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
from oslo_messaging._i18n import _LI, _LW
zmq = zmq_async.import_zmq()
LOG = logging.getLogger(__name__)
class DealerPublisherProxy(zmq_dealer_publisher.DealerPublisher):
def __init__(self, conf, matchmaker, reply_receiver):
super(DealerPublisherProxy, self).__init__(conf, matchmaker)
self.reply_receiver = reply_receiver
def send_request(self, multipart_message):
envelope = multipart_message[zmq_names.MULTIPART_IDX_ENVELOPE]
LOG.debug("Envelope: %s" % envelope)
target = envelope[zmq_names.FIELD_TARGET]
dealer_socket = self._check_hosts_connections(
target, zmq_names.socket_type_str(zmq.ROUTER))
if not dealer_socket.connections:
# NOTE(ozamiatin): Here we can provide
# a queue for keeping messages to send them later
# when some listener appears. However such approach
# being more reliable will consume additional memory.
LOG.warning(_LW("Request %s was dropped because no connection")
% envelope[zmq_names.FIELD_MSG_TYPE])
return
self.reply_receiver.track_socket(dealer_socket.handle)
LOG.debug("Sending message %(message)s to a target %(target)s"
% {"message": envelope[zmq_names.FIELD_MSG_ID],
"target": envelope[zmq_names.FIELD_TARGET]})
if envelope[zmq_names.FIELD_MSG_TYPE] in zmq_names.MULTISEND_TYPES:
for _ in range(dealer_socket.connections_count()):
self._send_request(dealer_socket, multipart_message)
else:
self._send_request(dealer_socket, multipart_message)
def _send_request(self, socket, multipart_message):
socket.send(b'', zmq.SNDMORE)
socket.send_pyobj(
multipart_message[zmq_names.MULTIPART_IDX_ENVELOPE],
zmq.SNDMORE)
socket.send(multipart_message[zmq_names.MULTIPART_IDX_BODY])
class ReplyReceiver(object):
def __init__(self, poller):
self.poller = poller
LOG.info(_LI("Reply waiter created in broker"))
def _receive_reply(self, socket):
return socket.recv_multipart()
def track_socket(self, socket):
self.poller.register(socket, self._receive_reply)
def cleanup(self):
self.poller.close()

View File

@ -18,7 +18,6 @@ from oslo_messaging._drivers.zmq_driver.client.publishers\
import zmq_publisher_base
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
from oslo_messaging._i18n import _LI
LOG = logging.getLogger(__name__)
@ -35,7 +34,7 @@ class PubPublisher(zmq_publisher_base.PublisherMultisend):
if request.msg_type not in zmq_names.NOTIFY_TYPES:
raise zmq_publisher_base.UnsupportedSendPattern(request.msg_type)
pub_socket, hosts = self._check_hosts_connections(
pub_socket = self._check_hosts_connections(
request.target, zmq_names.socket_type_str(zmq.SUB))
self._send_request(pub_socket, request)
@ -43,6 +42,6 @@ class PubPublisher(zmq_publisher_base.PublisherMultisend):
super(PubPublisher, self)._send_request(socket, request)
LOG.info(_LI("Publishing message %(message)s to a target %(target)s")
% {"message": request.message,
"target": request.target})
LOG.debug("Publishing message %(message)s to a target %(target)s"
% {"message": request.message,
"target": request.target})

View File

@ -14,6 +14,7 @@
import abc
import logging
import uuid
import six
@ -89,12 +90,11 @@ class PublisherBase(object):
:param request: Message data and destination container object
:type request: zmq_request.Request
"""
LOG.info(_LI("Sending %(type)s message_id %(message)s to a target"
"%(target)s host:%(host)s")
% {"type": request.msg_type,
"message": request.message_id,
"target": request.target,
"host": request.host})
LOG.debug("Sending %(type)s message_id %(message)s to a target"
"%(target)s"
% {"type": request.msg_type,
"message": request.message_id,
"target": request.target})
socket.send_pyobj(request)
def cleanup(self):
@ -124,28 +124,30 @@ class PublisherMultisend(PublisherBase):
def _check_hosts_connections(self, target, listener_type):
# TODO(ozamiatin): Place for significant optimization
# Matchmaker cache should be implemented
hosts = self.matchmaker.get_hosts(
target, listener_type)
if str(target) in self.outbound_sockets:
socket = self.outbound_sockets[str(target)]
else:
hosts = self.matchmaker.get_hosts(target, listener_type)
socket = zmq_socket.ZmqSocket(self.zmq_context, self.socket_type)
self.outbound_sockets[str(target)] = socket
for host in hosts:
self._connect_to_host(socket, host, target)
for host in hosts:
self._connect_to_host(socket, host, target)
return socket
return socket, hosts
def _connect_to_host(self, socket, host, target):
address = zmq_address.get_tcp_direct_address(host)
LOG.info(address)
def _connect_to_address(self, socket, address, target):
stype = zmq_names.socket_type_str(self.socket_type)
try:
LOG.info(_LI("Connecting %(stype)s to %(address)s for %(target)s")
% {"stype": stype,
"address": address,
"target": target})
if six.PY3:
socket.setsockopt_string(zmq.IDENTITY, str(uuid.uuid1()))
else:
socket.handle.identity = str(uuid.uuid1())
socket.connect(address)
except zmq.ZMQError as e:
errmsg = _LE("Failed connecting %(stype) to %(address)s: %(e)s")\
@ -153,3 +155,7 @@ class PublisherMultisend(PublisherBase):
LOG.error(_LE("Failed connecting %(stype) to %(address)s: %(e)s")
% (stype, address, e))
raise rpc_common.RPCException(errmsg)
def _connect_to_host(self, socket, host, target):
address = zmq_address.get_tcp_direct_address(host)
self._connect_to_address(socket, address, target)

View File

@ -18,7 +18,7 @@ from oslo_messaging._drivers.zmq_driver.client.publishers\
import zmq_publisher_base
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
from oslo_messaging._i18n import _LI, _LW
from oslo_messaging._i18n import _LW
LOG = logging.getLogger(__name__)
@ -35,7 +35,7 @@ class PushPublisher(zmq_publisher_base.PublisherMultisend):
if request.msg_type == zmq_names.CALL_TYPE:
raise zmq_publisher_base.UnsupportedSendPattern(request.msg_type)
push_socket, hosts = self._check_hosts_connections(
push_socket = self._check_hosts_connections(
request.target, zmq_names.socket_type_str(zmq.PULL))
if not push_socket.connections:
@ -53,6 +53,6 @@ class PushPublisher(zmq_publisher_base.PublisherMultisend):
super(PushPublisher, self)._send_request(socket, request)
LOG.info(_LI("Publishing message %(message)s to a target %(target)s")
% {"message": request.message,
"target": request.target})
LOG.debug("Publishing message %(message)s to a target %(target)s"
% {"message": request.message,
"target": request.target})

View File

@ -82,8 +82,11 @@ class ReqPublisher(zmq_publisher_base.PublisherBase):
def _receive_reply(socket, request):
def _receive_method(socket):
return socket.recv_pyobj()
reply = socket.recv_pyobj()
LOG.debug("Received reply %s" % reply)
return reply
LOG.debug("Start waiting reply")
# NOTE(ozamiatin): Check for retry here (no retries now)
with contextlib.closing(zmq_async.get_reply_poller()) as poller:
poller.register(socket, recv_method=_receive_method)
@ -91,7 +94,7 @@ class ReqPublisher(zmq_publisher_base.PublisherBase):
if reply is None:
raise oslo_messaging.MessagingTimeout(
"Timeout %s seconds was reached" % request.timeout)
LOG.info(_LI("Received reply %s") % reply)
LOG.debug("Received reply %s" % reply)
if reply[zmq_names.FIELD_FAILURE]:
raise rpc_common.deserialize_remote_exception(
reply[zmq_names.FIELD_FAILURE],
@ -114,12 +117,12 @@ class ReqPublisherLight(ReqPublisher):
def _send_request(self, socket, request):
LOG.info(_LI("Sending %(type)s message_id %(message)s"
" to a target %(target)s, host:%(host)s")
% {"type": request.msg_type,
"message": request.message_id,
"target": request.target,
"host": request.host})
LOG.debug("Sending %(type)s message_id %(message)s"
" to a target %(target)s, host:%(host)s"
% {"type": request.msg_type,
"message": request.message_id,
"target": request.target,
"host": request.host})
envelope = request.create_envelope()

View File

@ -12,70 +12,33 @@
# License for the specific language governing permissions and limitations
# under the License.
import contextlib
from oslo_messaging._drivers.zmq_driver.client.publishers\
from oslo_messaging._drivers import common as rpc_common
from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \
import zmq_dealer_call_publisher
from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \
import zmq_dealer_publisher
from oslo_messaging._drivers.zmq_driver.client.publishers\
import zmq_req_publisher
from oslo_messaging._drivers.zmq_driver.client import zmq_request
from oslo_messaging._drivers.zmq_driver import zmq_address
from oslo_messaging._drivers.zmq_driver.client import zmq_client_base
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
zmq = zmq_async.import_zmq()
class ZmqClient(object):
class ZmqClient(zmq_client_base.ZmqClientBase):
def __init__(self, conf, matchmaker=None, allowed_remote_exmods=None):
self.conf = conf
self.context = zmq.Context()
self.matchmaker = matchmaker
self.allowed_remote_exmods = allowed_remote_exmods or []
if conf.zmq_use_broker:
raise rpc_common.RPCException("This client doesn't need proxy!")
self.dealer_publisher = None
if self.conf.zmq_use_broker:
self.dealer_publisher = zmq_dealer_publisher.DealerPublisherLight(
conf, zmq_address.get_broker_address(self.conf))
self.req_publisher_cls = zmq_req_publisher.ReqPublisherLight
else:
self.dealer_publisher = zmq_dealer_publisher.DealerPublisher(
conf, matchmaker)
self.req_publisher_cls = zmq_req_publisher.ReqPublisher
super(ZmqClient, self).__init__(
conf, matchmaker, allowed_remote_exmods,
publishers={
zmq_names.CALL_TYPE:
zmq_dealer_call_publisher.DealerCallPublisher(
conf, matchmaker),
def send_call(self, target, context, message, timeout=None, retry=None):
with contextlib.closing(zmq_request.CallRequest(
target, context=context, message=message,
timeout=timeout, retry=retry,
allowed_remote_exmods=self.allowed_remote_exmods)) as request:
with contextlib.closing(self.req_publisher_cls(
self.conf, self.matchmaker)) as req_publisher:
return req_publisher.send_request(request)
def send_cast(self, target, context, message, timeout=None, retry=None):
with contextlib.closing(zmq_request.CastRequest(
target, context=context, message=message,
timeout=timeout, retry=retry)) as request:
self.dealer_publisher.send_request(request)
def send_fanout(self, target, context, message, timeout=None, retry=None):
with contextlib.closing(zmq_request.FanoutRequest(
target, context=context, message=message,
timeout=timeout, retry=retry)) as request:
self.dealer_publisher.send_request(request)
def send_notify(self, target, context, message, version, retry=None):
with contextlib.closing(zmq_request.NotificationRequest(
target, context, message, version=version,
retry=retry)) as request:
self.dealer_publisher.send_request(request)
def send_notify_fanout(self, target, context, message, version,
retry=None):
with contextlib.closing(zmq_request.NotificationFanoutRequest(
target, context, message, version=version,
retry=retry)) as request:
self.dealer_publisher.send_request(request)
def cleanup(self):
self.dealer_publisher.cleanup()
"default": zmq_dealer_publisher.DealerPublisher(
conf, matchmaker)
}
)

View File

@ -0,0 +1,77 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import contextlib
from oslo_messaging._drivers.zmq_driver.client import zmq_request
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
zmq = zmq_async.import_zmq()
class ZmqClientBase(object):
def __init__(self, conf, matchmaker=None, allowed_remote_exmods=None,
publishers=None):
self.conf = conf
self.context = zmq.Context()
self.matchmaker = matchmaker
self.allowed_remote_exmods = allowed_remote_exmods or []
self.publishers = publishers
self.call_publisher = publishers.get(zmq_names.CALL_TYPE) \
or publishers["default"]
self.cast_publisher = publishers.get(zmq_names.CAST_TYPE) \
or publishers["default"]
self.fanout_publisher = publishers.get(zmq_names.CAST_FANOUT_TYPE) \
or publishers["default"]
self.notify_publisher = publishers.get(zmq_names.NOTIFY_TYPE) \
or publishers["default"]
def send_call(self, target, context, message, timeout=None, retry=None):
with contextlib.closing(zmq_request.CallRequest(
target, context=context, message=message,
timeout=timeout, retry=retry,
allowed_remote_exmods=self.allowed_remote_exmods)) as request:
return self.call_publisher.send_request(request)
def send_cast(self, target, context, message, timeout=None, retry=None):
with contextlib.closing(zmq_request.CastRequest(
target, context=context, message=message,
timeout=timeout, retry=retry)) as request:
self.cast_publisher.send_request(request)
def send_fanout(self, target, context, message, timeout=None, retry=None):
with contextlib.closing(zmq_request.FanoutRequest(
target, context=context, message=message,
timeout=timeout, retry=retry)) as request:
self.fanout_publisher.send_request(request)
def send_notify(self, target, context, message, version, retry=None):
with contextlib.closing(zmq_request.NotificationRequest(
target, context, message, version=version,
retry=retry)) as request:
self.notify_publisher.send_request(request)
def send_notify_fanout(self, target, context, message, version,
retry=None):
with contextlib.closing(zmq_request.NotificationFanoutRequest(
target, context, message, version=version,
retry=retry)) as request:
self.notify_publisher.send_request(request)
def cleanup(self):
for publisher in self.publishers.values():
publisher.cleanup()

View File

@ -0,0 +1,46 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_messaging._drivers import common as rpc_common
from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \
import zmq_dealer_call_publisher
from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \
import zmq_dealer_publisher
from oslo_messaging._drivers.zmq_driver.client import zmq_client_base
from oslo_messaging._drivers.zmq_driver import zmq_address
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
zmq = zmq_async.import_zmq()
class ZmqClientLight(zmq_client_base.ZmqClientBase):
def __init__(self, conf, matchmaker=None, allowed_remote_exmods=None):
if not conf.zmq_use_broker:
raise rpc_common.RPCException(
"This client needs proxy to be configured!")
super(ZmqClientLight, self).__init__(
conf, matchmaker, allowed_remote_exmods,
publishers={
zmq_names.CALL_TYPE:
zmq_dealer_call_publisher.DealerCallPublisher(
conf, matchmaker),
"default": zmq_dealer_publisher.DealerPublisherLight(
conf, zmq_address.get_broker_address(self.conf))
}
)

View File

@ -38,6 +38,7 @@ class ThreadingPoller(zmq_poller.ZmqPoller):
self.recv_methods = {}
def register(self, socket, recv_method=None):
LOG.debug("Registering socket")
if socket in self.recv_methods:
return
if recv_method is not None:
@ -46,6 +47,8 @@ class ThreadingPoller(zmq_poller.ZmqPoller):
def poll(self, timeout=None):
LOG.debug("Entering poll method")
if timeout:
timeout *= 1000 # zmq poller waits milliseconds

View File

@ -21,7 +21,7 @@ from oslo_messaging._drivers import common as rpc_common
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
from oslo_messaging._drivers.zmq_driver import zmq_socket
from oslo_messaging._i18n import _LE, _LI
from oslo_messaging._i18n import _LE
LOG = logging.getLogger(__name__)
@ -44,10 +44,10 @@ class ConsumerBase(object):
self.conf, self.context, socket_type)
self.sockets.append(socket)
self.poller.register(socket, self.receive_message)
LOG.info(_LI("Run %(stype)s consumer on %(addr)s:%(port)d"),
{"stype": zmq_names.socket_type_str(socket_type),
"addr": socket.bind_address,
"port": socket.port})
LOG.debug("Run %(stype)s consumer on %(addr)s:%(port)d",
{"stype": zmq_names.socket_type_str(socket_type),
"addr": socket.bind_address,
"port": socket.port})
return socket
except zmq.ZMQError as e:
errmsg = _LE("Failed binding to port %(port)d: %(e)s")\

View File

@ -56,9 +56,9 @@ class PullConsumer(zmq_consumer_base.SingleSocketConsumer):
assert msg_type is not None, 'Bad format: msg type expected'
context = socket.recv_pyobj()
message = socket.recv_pyobj()
LOG.info(_LI("Received %(msg_type)s message %(msg)s")
% {"msg_type": msg_type,
"msg": str(message)})
LOG.debug("Received %(msg_type)s message %(msg)s"
% {"msg_type": msg_type,
"msg": str(message)})
if msg_type in (zmq_names.CAST_TYPES + zmq_names.NOTIFY_TYPES):
return PullIncomingMessage(self.server, context, message)

View File

@ -21,7 +21,7 @@ from oslo_messaging._drivers.zmq_driver.server import zmq_incoming_message
from oslo_messaging._drivers.zmq_driver import zmq_address
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
from oslo_messaging._i18n import _LE, _LI
from oslo_messaging._i18n import _LE
LOG = logging.getLogger(__name__)
@ -43,7 +43,7 @@ class RouterIncomingMessage(base.IncomingMessage):
"""Reply is not needed for non-call messages"""
def acknowledge(self):
LOG.info("Not sending acknowledge for %s", self.msg_id)
LOG.debug("Not sending acknowledge for %s", self.msg_id)
def requeue(self):
"""Requeue is not supported"""
@ -83,11 +83,11 @@ class RouterConsumer(zmq_consumer_base.SingleSocketConsumer):
def receive_message(self, socket):
try:
request, reply_id = self._receive_request(socket)
LOG.info(_LI("[%(host)s] Received %(type)s, %(id)s, %(target)s")
% {"host": self.host,
"type": request.msg_type,
"id": request.message_id,
"target": request.target})
LOG.debug("[%(host)s] Received %(type)s, %(id)s, %(target)s"
% {"host": self.host,
"type": request.msg_type,
"id": request.message_id,
"target": request.target})
if request.msg_type == zmq_names.CALL_TYPE:
return zmq_incoming_message.ZmqIncomingRequest(

View File

@ -45,9 +45,10 @@ class ZmqIncomingRequest(base.IncomingMessage):
zmq_names.FIELD_REPLY: reply,
zmq_names.FIELD_FAILURE: failure,
zmq_names.FIELD_LOG_FAILURE: log_failure,
zmq_names.FIELD_ID: self.request.proxy_reply_id}
zmq_names.FIELD_ID: self.request.proxy_reply_id,
zmq_names.FIELD_MSG_ID: self.request.message_id}
LOG.info("Replying %s REP", (str(self.request.message_id)))
LOG.debug("Replying %s", (str(self.request.message_id)))
self.received = True
self.reply_socket.send(self.reply_id, zmq.SNDMORE)

View File

@ -80,3 +80,13 @@ def _raise_error_if_invalid_config_value(zmq_concurrency):
if zmq_concurrency not in ZMQ_MODULES:
errmsg = _('Invalid zmq_concurrency value: %s')
raise ValueError(errmsg % zmq_concurrency)
def get_queue(zmq_concurrency='eventlet'):
_raise_error_if_invalid_config_value(zmq_concurrency)
if zmq_concurrency == 'eventlet' and _is_eventlet_zmq_available():
import eventlet
return eventlet.queue.Queue(), eventlet.queue.Empty
else:
import six
return six.moves.queue.Queue(), six.moves.queue.Empty

View File

@ -47,6 +47,9 @@ class ZmqSocket(object):
def setsockopt(self, *args, **kwargs):
self.handle.setsockopt(*args, **kwargs)
def setsockopt_string(self, *args, **kwargs):
self.handle.setsockopt_string(*args, **kwargs)
def send(self, *args, **kwargs):
self.handle.send(*args, **kwargs)