Merge "[zmq] Reduce proxy for direct messaging"
This commit is contained in:
commit
f3c8a5cce8
@ -138,28 +138,24 @@ stored in Redis is that the key is a base topic and the corresponding values are
|
||||
hostname arrays to be sent to.
|
||||
|
||||
|
||||
Proxy to avoid blocking (optional)
|
||||
----------------------------------
|
||||
Proxy for fanout publishing
|
||||
---------------------------
|
||||
|
||||
Each machine running OpenStack services, or sending RPC messages, may run the
|
||||
'oslo-messaging-zmq-broker' daemon. This is needed to avoid blocking
|
||||
if a listener (server) appears after the sender (client).
|
||||
Each machine running OpenStack services, or sending RPC messages, should run
|
||||
the 'oslo-messaging-zmq-broker' daemon.
|
||||
|
||||
Fanout-based patterns like CAST+Fanout and notifications always use proxy
|
||||
as they act over PUB/SUB, 'use_pub_sub' - defaults to True. If not using
|
||||
PUB/SUB (use_pub_sub = False) then fanout will be emulated over direct
|
||||
DEALER/ROUTER unicast which is possible but less efficient and therefore
|
||||
is not recommended.
|
||||
is not recommended. In a case of direct DEALER/ROUTER unicast proxy is not
|
||||
needed.
|
||||
|
||||
Running direct RPC methods like CALL and CAST over a proxy is controlled by
|
||||
the option 'direct_over_proxy' which is True by default.
|
||||
|
||||
These options can be set in [DEFAULT] section.
|
||||
This option can be set in [DEFAULT] section.
|
||||
|
||||
For example::
|
||||
|
||||
use_pub_sub = True
|
||||
direct_over_proxy = False
|
||||
|
||||
|
||||
In case of using the broker all publishers (clients) talk to servers over
|
||||
|
@ -72,11 +72,7 @@ zmq_opts = [
|
||||
help='Expiration timeout in seconds of a name service record '
|
||||
'about existing target ( < 0 means no timeout).'),
|
||||
|
||||
cfg.BoolOpt('direct_over_proxy', default=False,
|
||||
help='Configures zmq-messaging to use proxy with '
|
||||
'non PUB/SUB patterns.'),
|
||||
|
||||
cfg.BoolOpt('use_pub_sub', default=True,
|
||||
cfg.BoolOpt('use_pub_sub', default=False,
|
||||
help='Use PUB/SUB pattern for fanout methods. '
|
||||
'PUB/SUB always uses proxy.'),
|
||||
|
||||
|
@ -15,14 +15,12 @@
|
||||
import logging
|
||||
|
||||
from oslo_messaging._drivers.zmq_driver.broker import zmq_base_proxy
|
||||
from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \
|
||||
import zmq_dealer_publisher_proxy
|
||||
from oslo_messaging._drivers.zmq_driver.client.publishers \
|
||||
import zmq_pub_publisher
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_address
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_async
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_names
|
||||
from oslo_messaging._i18n import _LI
|
||||
from oslo_messaging._i18n import _LE, _LI
|
||||
|
||||
zmq = zmq_async.import_zmq(zmq_concurrency='native')
|
||||
LOG = logging.getLogger(__name__)
|
||||
@ -41,9 +39,6 @@ class UniversalQueueProxy(zmq_base_proxy.BaseProxy):
|
||||
LOG.info(_LI("Polling at universal proxy"))
|
||||
|
||||
self.matchmaker = matchmaker
|
||||
reply_receiver = zmq_dealer_publisher_proxy.ReplyReceiver(self.poller)
|
||||
self.direct_publisher = zmq_dealer_publisher_proxy \
|
||||
.DealerPublisherProxy(conf, matchmaker, reply_receiver)
|
||||
self.pub_publisher = zmq_pub_publisher.PubPublisherProxy(
|
||||
conf, matchmaker)
|
||||
|
||||
@ -54,8 +49,6 @@ class UniversalQueueProxy(zmq_base_proxy.BaseProxy):
|
||||
|
||||
if socket == self.router_socket:
|
||||
self._redirect_in_request(message)
|
||||
else:
|
||||
self._redirect_reply(message)
|
||||
|
||||
def _redirect_in_request(self, multipart_message):
|
||||
LOG.debug("-> Redirecting request %s to TCP publisher",
|
||||
@ -65,19 +58,6 @@ class UniversalQueueProxy(zmq_base_proxy.BaseProxy):
|
||||
envelope[zmq_names.FIELD_MSG_TYPE] \
|
||||
in zmq_names.MULTISEND_TYPES:
|
||||
self.pub_publisher.send_request(multipart_message)
|
||||
else:
|
||||
self.direct_publisher.send_request(multipart_message)
|
||||
|
||||
def _redirect_reply(self, reply):
|
||||
LOG.debug("Reply proxy %s", reply)
|
||||
if reply[zmq_names.IDX_REPLY_TYPE] == zmq_names.ACK_TYPE:
|
||||
LOG.debug("Acknowledge dropped %s", reply)
|
||||
return
|
||||
|
||||
LOG.debug("<- Redirecting reply to ROUTER: reply: %s",
|
||||
reply[zmq_names.IDX_REPLY_BODY:])
|
||||
|
||||
self.router_socket.send_multipart(reply[zmq_names.IDX_REPLY_BODY:])
|
||||
|
||||
def _receive_in_request(self, socket):
|
||||
reply_id = socket.recv()
|
||||
@ -85,8 +65,9 @@ class UniversalQueueProxy(zmq_base_proxy.BaseProxy):
|
||||
empty = socket.recv()
|
||||
assert empty == b'', "Empty delimiter expected"
|
||||
envelope = socket.recv_pyobj()
|
||||
if envelope[zmq_names.FIELD_MSG_TYPE] == zmq_names.CALL_TYPE:
|
||||
envelope[zmq_names.FIELD_REPLY_ID] = reply_id
|
||||
if envelope[zmq_names.FIELD_MSG_TYPE] not in zmq_names.MULTISEND_TYPES:
|
||||
LOG.error(_LE("Message type %s is not supported by proxy"),
|
||||
envelope[zmq_names.FIELD_MSG_TYPE])
|
||||
payload = socket.recv_multipart()
|
||||
payload.insert(0, envelope)
|
||||
return payload
|
||||
|
@ -43,9 +43,7 @@ class DealerCallPublisher(object):
|
||||
self.conf = conf
|
||||
self.matchmaker = matchmaker
|
||||
self.reply_waiter = ReplyWaiter(conf)
|
||||
self.sender = RequestSender(conf, matchmaker, self.reply_waiter) \
|
||||
if not conf.direct_over_proxy else \
|
||||
RequestSenderLight(conf, matchmaker, self.reply_waiter)
|
||||
self.sender = RequestSender(conf, matchmaker, self.reply_waiter)
|
||||
|
||||
def send_request(self, request):
|
||||
reply_future = self.sender.send_request(request)
|
||||
@ -113,39 +111,6 @@ class RequestSender(zmq_publisher_base.PublisherBase):
|
||||
super(RequestSender, self).cleanup()
|
||||
|
||||
|
||||
class RequestSenderLight(RequestSender):
|
||||
"""This class used with proxy.
|
||||
|
||||
Simplified address matching because there is only
|
||||
one proxy IPC address.
|
||||
"""
|
||||
|
||||
def __init__(self, conf, matchmaker, reply_waiter):
|
||||
if not conf.direct_over_proxy:
|
||||
raise rpc_common.RPCException("RequestSenderLight needs a proxy!")
|
||||
|
||||
super(RequestSenderLight, self).__init__(
|
||||
conf, matchmaker, reply_waiter)
|
||||
|
||||
self.socket = None
|
||||
|
||||
def _connect_socket(self, target):
|
||||
return self.outbound_sockets.get_socket_to_broker(target)
|
||||
|
||||
def _do_send_request(self, socket, request):
|
||||
LOG.debug("Sending %(type)s message_id %(message)s"
|
||||
" to a target %(target)s",
|
||||
{"type": request.msg_type,
|
||||
"message": request.message_id,
|
||||
"target": request.target})
|
||||
|
||||
envelope = request.create_envelope()
|
||||
|
||||
socket.send(b'', zmq.SNDMORE)
|
||||
socket.send_pyobj(envelope, zmq.SNDMORE)
|
||||
socket.send_pyobj(request)
|
||||
|
||||
|
||||
class ReplyWaiter(object):
|
||||
|
||||
def __init__(self, conf):
|
||||
|
@ -1,87 +0,0 @@
|
||||
# Copyright 2015 Mirantis, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import logging
|
||||
|
||||
from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \
|
||||
import zmq_dealer_publisher
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_async
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_names
|
||||
from oslo_messaging._i18n import _LI, _LW
|
||||
|
||||
zmq = zmq_async.import_zmq()
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class DealerPublisherProxy(zmq_dealer_publisher.DealerPublisher):
|
||||
|
||||
def __init__(self, conf, matchmaker, reply_receiver):
|
||||
super(DealerPublisherProxy, self).__init__(conf, matchmaker)
|
||||
self.reply_receiver = reply_receiver
|
||||
|
||||
def send_request(self, multipart_message):
|
||||
|
||||
envelope = multipart_message[zmq_names.MULTIPART_IDX_ENVELOPE]
|
||||
|
||||
LOG.debug("Envelope: %s", envelope)
|
||||
|
||||
target = envelope[zmq_names.FIELD_TARGET]
|
||||
dealer_socket = self._check_hosts_connections(
|
||||
target, zmq_names.socket_type_str(zmq.ROUTER))
|
||||
|
||||
if not dealer_socket.connections:
|
||||
# NOTE(ozamiatin): Here we can provide
|
||||
# a queue for keeping messages to send them later
|
||||
# when some listener appears. However such approach
|
||||
# being more reliable will consume additional memory.
|
||||
LOG.warning(_LW("Request %s was dropped because no connection"),
|
||||
envelope[zmq_names.FIELD_MSG_TYPE])
|
||||
return
|
||||
|
||||
self.reply_receiver.track_socket(dealer_socket.handle)
|
||||
|
||||
LOG.debug("Sending message %(message)s to a target %(target)s"
|
||||
% {"message": envelope[zmq_names.FIELD_MSG_ID],
|
||||
"target": envelope[zmq_names.FIELD_TARGET]})
|
||||
|
||||
if envelope[zmq_names.FIELD_MSG_TYPE] in zmq_names.MULTISEND_TYPES:
|
||||
for _ in range(dealer_socket.connections_count()):
|
||||
self._send_request(dealer_socket, multipart_message)
|
||||
else:
|
||||
self._send_request(dealer_socket, multipart_message)
|
||||
|
||||
def _send_request(self, socket, multipart_message):
|
||||
|
||||
socket.send(b'', zmq.SNDMORE)
|
||||
socket.send_pyobj(
|
||||
multipart_message[zmq_names.MULTIPART_IDX_ENVELOPE],
|
||||
zmq.SNDMORE)
|
||||
socket.send(multipart_message[zmq_names.MULTIPART_IDX_BODY])
|
||||
|
||||
|
||||
class ReplyReceiver(object):
|
||||
|
||||
def __init__(self, poller):
|
||||
self.poller = poller
|
||||
LOG.info(_LI("Reply waiter created in broker"))
|
||||
|
||||
def _receive_reply(self, socket):
|
||||
return socket.recv_multipart()
|
||||
|
||||
def track_socket(self, socket):
|
||||
self.poller.register(socket, self._receive_reply)
|
||||
|
||||
def cleanup(self):
|
||||
self.poller.close()
|
@ -30,9 +30,7 @@ class ZmqClient(zmq_client_base.ZmqClientBase):
|
||||
def __init__(self, conf, matchmaker=None, allowed_remote_exmods=None):
|
||||
|
||||
default_publisher = zmq_dealer_publisher.DealerPublisher(
|
||||
conf, matchmaker) if not conf.direct_over_proxy else \
|
||||
zmq_dealer_publisher.DealerPublisherLight(
|
||||
conf, zmq_address.get_broker_address(conf))
|
||||
conf, matchmaker)
|
||||
|
||||
fanout_publisher = zmq_dealer_publisher.DealerPublisherLight(
|
||||
conf, zmq_address.get_broker_address(conf)) \
|
||||
|
@ -68,7 +68,6 @@ class Request(object):
|
||||
"retry must be an integer, not {0}".format(type(retry)))
|
||||
|
||||
self.message_id = str(uuid.uuid1())
|
||||
self.proxy_reply_id = None
|
||||
|
||||
def create_envelope(self):
|
||||
return {'msg_type': self.msg_type,
|
||||
|
@ -100,23 +100,6 @@ class RouterConsumer(zmq_consumer_base.SingleSocketConsumer):
|
||||
LOG.error(_LE("Receiving message failed: %s"), str(e))
|
||||
|
||||
|
||||
class RouterConsumerBroker(RouterConsumer):
|
||||
|
||||
def __init__(self, conf, poller, server):
|
||||
super(RouterConsumerBroker, self).__init__(conf, poller, server)
|
||||
|
||||
def _receive_request(self, socket):
|
||||
reply_id = socket.recv()
|
||||
empty = socket.recv()
|
||||
assert empty == b'', 'Bad format: empty delimiter expected'
|
||||
envelope = socket.recv_pyobj()
|
||||
request = socket.recv_pyobj()
|
||||
|
||||
if zmq_names.FIELD_REPLY_ID in envelope:
|
||||
request.proxy_reply_id = envelope[zmq_names.FIELD_REPLY_ID]
|
||||
return request, reply_id
|
||||
|
||||
|
||||
class TargetsManager(object):
|
||||
|
||||
def __init__(self, conf, matchmaker, host):
|
||||
|
@ -45,7 +45,6 @@ class ZmqIncomingRequest(base.IncomingMessage):
|
||||
zmq_names.FIELD_REPLY: reply,
|
||||
zmq_names.FIELD_FAILURE: failure,
|
||||
zmq_names.FIELD_LOG_FAILURE: log_failure,
|
||||
zmq_names.FIELD_ID: self.request.proxy_reply_id,
|
||||
zmq_names.FIELD_MSG_ID: self.request.message_id}
|
||||
|
||||
LOG.debug("Replying %s", (str(self.request.message_id)))
|
||||
@ -53,10 +52,6 @@ class ZmqIncomingRequest(base.IncomingMessage):
|
||||
self.received = True
|
||||
self.reply_socket.send(self.reply_id, zmq.SNDMORE)
|
||||
self.reply_socket.send(b'', zmq.SNDMORE)
|
||||
if self.request.proxy_reply_id:
|
||||
self.reply_socket.send_string(zmq_names.REPLY_TYPE, zmq.SNDMORE)
|
||||
self.reply_socket.send(self.request.proxy_reply_id, zmq.SNDMORE)
|
||||
self.reply_socket.send(b'', zmq.SNDMORE)
|
||||
self.reply_socket.send_pyobj(message_reply)
|
||||
self.poller.resume_polling(self.reply_socket)
|
||||
|
||||
|
@ -34,10 +34,8 @@ class ZmqServer(base.Listener):
|
||||
super(ZmqServer, self).__init__(driver)
|
||||
self.matchmaker = matchmaker
|
||||
self.poller = zmq_async.get_poller()
|
||||
self.router_consumer = zmq_router_consumer.RouterConsumerBroker(
|
||||
conf, self.poller, self) if conf.direct_over_proxy else \
|
||||
zmq_router_consumer.RouterConsumer(
|
||||
conf, self.poller, self)
|
||||
self.router_consumer = zmq_router_consumer.RouterConsumer(
|
||||
conf, self.poller, self)
|
||||
self.sub_consumer = zmq_sub_consumer.SubConsumer(
|
||||
conf, self.poller, self) if conf.use_pub_sub else None
|
||||
self.notify_consumer = self.sub_consumer if conf.use_pub_sub \
|
||||
|
@ -145,7 +145,7 @@ class TestZmqBasics(zmq_common.ZmqBaseTestCase):
|
||||
|
||||
message = {'method': 'hello-world', 'tx_id': 1}
|
||||
context = {}
|
||||
target.topic = target.topic + '.info'
|
||||
target.topic += '.info'
|
||||
self.driver.send_notification(target, context, message, '3.0')
|
||||
self.listener._received.wait(5)
|
||||
self.assertTrue(self.listener._received.isSet())
|
||||
|
@ -78,7 +78,6 @@ class ZmqBaseTestCase(test_utils.BaseTestCase):
|
||||
'rpc_response_timeout': 5,
|
||||
'rpc_zmq_ipc_dir': self.internal_ipc_dir,
|
||||
'use_pub_sub': False,
|
||||
'direct_over_proxy': False,
|
||||
'rpc_zmq_matchmaker': 'dummy'}
|
||||
self.config(**kwargs)
|
||||
|
||||
|
@ -31,8 +31,7 @@ class StartupOrderTestCase(multiproc_utils.MutliprocTestCase):
|
||||
self.conf.project = "test_project"
|
||||
|
||||
kwargs = {'rpc_response_timeout': 30,
|
||||
'use_pub_sub': False,
|
||||
'direct_over_proxy': False}
|
||||
'use_pub_sub': False}
|
||||
self.config(**kwargs)
|
||||
|
||||
log_path = self.conf.rpc_zmq_ipc_dir + "/" + str(os.getpid()) + ".log"
|
||||
|
Loading…
Reference in New Issue
Block a user