[zmq] Redesign router proxy

In this change router was redesigned in a way most
appropriate for routing concept of zmq.ROUTER socket.

DEALER(cli)-ROUTER(proxy)-DEALER(srv) instead of
DEALER-ROUTER-DEALER-ROUTER (3 layers instead of 4)

The main reason is to use zmq.DEALER identity in message
routing. For this reason DealerConsumer was introduced
server-side. RouterConsumer is left for peer-to-peer
DEALER-ROUTER deployment option.

Also handled assertions in receive-methods in order
to not stop server when received message with wrong format.

Change-Id: If25edf500fa8d220d4233bb13d67121824e841c6
Closes-Bug: #1558601
Related-Bug: #1555007
changes/53/308653/25
ozamiatin 7 years ago
parent 7f4826737d
commit b5955b6ca9

@ -42,6 +42,11 @@ services) is both ZeroMQ client and server. As a result, each host needs to
listen to a certain TCP port for incoming connections and directly connect
to other hosts simultaneously.
Another option is to use a router proxy. It is not a broker because it
doesn't assume any message ownership or persistence or replication etc. It
performs only a redirection of messages to endpoints taking routing info from
message envelope.
Topics are used to identify the destination for a ZeroMQ RPC call. There are
two types of topics, bare topics and directed topics. Bare topics look like
'compute', while directed topics look like 'compute.machine1'.
@ -66,9 +71,10 @@ Assuming the following systems as a goal.
| Keystone | | Nova |
| Glance | | nova-compute |
| Neutron | | Ceilometer |
| Cinder | | Oslo-zmq-receiver |
| Cinder | | |
| Ceilometer | +------------------------+
| Oslo-zmq-receiver |
| zmq-proxy |
| Redis |
| Horizon |
+---------------------+
@ -125,6 +131,7 @@ which is 120 (seconds) by default. The option is related not specifically to
redis so it is also defined in [DEFAULT] section. If option value is <= 0
then keys don't expire and live forever in the storage.
MatchMaker Data Source (mandatory)
----------------------------------
@ -137,50 +144,34 @@ stored in Redis is that the key is a base topic and the corresponding values are
hostname arrays to be sent to.
Proxy and huge number of TCP sockets
------------------------------------
Restrict the number of TCP sockets on controller
------------------------------------------------
The most heavily used RPC pattern (CALL) may consume too many TCP sockets on
controller node in directly connected configuration. To solve the issue
ROUTER proxy may be used.
The most heavily used RPC pattern (CALL) may consume too many TCP sockets in
directly connected configuration. To solve the issue ROUTER proxy may be used.
In order to configure driver to use ROUTER proxy set up the 'use_router_proxy'
option to True in [DEFAULT] section (False is set by default).
option to true in [DEFAULT] section (false is set by default).
For example::
use_router_proxy = True
use_router_proxy = true
Not less than 3 proxies should be running on controllers or on stand alone
nodes. The parameters for the script oslo-messaging-zmq-proxy should be::
oslo-messaging-zmq-proxy
--type ROUTER
--config-file /etc/oslo/zeromq.conf
--log-file /var/log/oslo/zmq-router-proxy.log
Proxy for fanout publishing
---------------------------
Fanout-based patterns like CAST+Fanout and notifications always use proxy
as they act over PUB/SUB, 'use_pub_sub' option defaults to True. In such case
publisher proxy should be running. Publisher-proxies are independent from each
other. Recommended number of proxies in the cloud is not less than 3. You
may run them on a standalone nodes or on controller nodes.
The parameters for the script oslo-messaging-zmq-proxy should be::
oslo-messaging-zmq-proxy
--type PUBLISHER
--config-file /etc/oslo/zeromq.conf
--log-file /var/log/oslo/zmq-publisher-proxy.log
Actually PUBLISHER is the default value for the parameter --type, so
could be omitted::
as they act over PUB/SUB, 'use_pub_sub' option defaults to true. In such case
publisher proxy should be running. Actually proxy does both: routing to a
DEALER endpoint for direct messages and publishing to all subscribers over
zmq.PUB socket.
oslo-messaging-zmq-proxy
--config-file /etc/oslo/zeromq.conf
--log-file /var/log/oslo/zmq-publisher-proxy.log
If not using PUB/SUB (use_pub_sub = False) then fanout will be emulated over
If not using PUB/SUB (use_pub_sub = false) then fanout will be emulated over
direct DEALER/ROUTER unicast which is possible but less efficient and therefore
is not recommended. In a case of direct DEALER/ROUTER unicast proxy is not
needed.
@ -189,7 +180,7 @@ This option can be set in [DEFAULT] section.
For example::
use_pub_sub = True
use_pub_sub = true
In case of using a proxy all publishers (clients) talk to servers over
@ -239,12 +230,23 @@ For example::
enable_plugin zmq https://github.com/openstack/devstack-plugin-zmq.git
.. _devstack-plugin-zmq: https://github.com/openstack/devstack-plugin-zmq.git
Example of local.conf::
Current Status
--------------
[[local|localrc]]
DATABASE_PASSWORD=password
ADMIN_PASSWORD=password
SERVICE_PASSWORD=password
SERVICE_TOKEN=password
The current development status of ZeroMQ driver is shown in `wiki`_.
enable_plugin zmq https://github.com/openstack/devstack-plugin-zmq.git
.. _wiki: https://wiki.openstack.org/ZeroMQ
OSLOMSG_REPO=https://review.openstack.org/openstack/oslo.messaging
OSLOMSG_BRANCH=master
ZEROMQ_MATCHMAKER=redis
LIBS_FROM_GIT=oslo.messaging
ENABLE_DEBUG_LOG_LEVEL=True
.. _devstack-plugin-zmq: https://github.com/openstack/devstack-plugin-zmq.git

@ -28,29 +28,22 @@ CONF.register_opts(server._pool_opts)
CONF.rpc_zmq_native = True
USAGE = """ Usage: ./zmq-proxy.py --type {PUBLISHER,ROUTER} [-h] [] ...
USAGE = """ Usage: ./zmq-proxy.py [-h] [] ...
Usage example:
python oslo_messaging/_cmd/zmq-proxy.py\
--type PUBLISHER"""
PUBLISHER = 'PUBLISHER'
ROUTER = 'ROUTER'
PROXY_TYPES = (PUBLISHER, ROUTER)
python oslo_messaging/_cmd/zmq-proxy.py"""
def main():
logging.basicConfig(level=logging.DEBUG)
logging.basicConfig(level=logging.DEBUG,
format='%(asctime)s %(name)s '
'%(levelname)-8s %(message)s')
parser = argparse.ArgumentParser(
description='ZeroMQ proxy service',
usage=USAGE
)
parser.add_argument('--type', dest='proxy_type', type=str,
default=PUBLISHER,
help='Proxy type PUBLISHER or ROUTER')
parser.add_argument('--config-file', dest='config_file', type=str,
help='Path to configuration file')
args = parser.parse_args()
@ -58,18 +51,12 @@ def main():
if args.config_file:
cfg.CONF(["--config-file", args.config_file])
if args.proxy_type not in PROXY_TYPES:
raise Exception("Bad proxy type %s, should be one of %s" %
(args.proxy_type, PROXY_TYPES))
reactor = zmq_proxy.ZmqProxy(CONF, zmq_queue_proxy.PublisherProxy) \
if args.proxy_type == PUBLISHER \
else zmq_proxy.ZmqProxy(CONF, zmq_queue_proxy.RouterProxy)
reactor = zmq_proxy.ZmqProxy(CONF, zmq_queue_proxy.UniversalQueueProxy)
try:
while True:
reactor.run()
except KeyboardInterrupt:
except (KeyboardInterrupt, SystemExit):
reactor.close()
if __name__ == "__main__":

@ -86,7 +86,7 @@ zmq_opts = [
help='Use PUB/SUB pattern for fanout methods. '
'PUB/SUB always uses proxy.'),
cfg.BoolOpt('use_router_proxy', default=False,
cfg.BoolOpt('use_router_proxy', default=True,
help='Use ROUTER remote proxy for direct methods.'),
cfg.PortOpt('rpc_zmq_min_port',

@ -12,11 +12,8 @@
# License for the specific language governing permissions and limitations
# under the License.
import abc
import logging
from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \
import zmq_dealer_publisher_proxy
from oslo_messaging._drivers.zmq_driver.client.publishers \
import zmq_pub_publisher
from oslo_messaging._drivers.zmq_driver import zmq_address
@ -47,101 +44,50 @@ class UniversalQueueProxy(object):
self.router_address = zmq_address.combine_address(
self.conf.rpc_zmq_host, self.router_socket.port)
self.pub_publisher = zmq_pub_publisher.PubPublisherProxy(
conf, matchmaker)
self.matchmaker.register_publisher(
(self.pub_publisher.host, self.router_address))
LOG.info(_LI("[PUB:%(pub)s, ROUTER:%(router)s] Run PUB publisher"),
{"pub": self.pub_publisher.host,
"router": self.router_address})
def run(self):
message, socket = self.poller.poll(self.conf.rpc_poll_timeout)
if message is None:
return
if socket == self.router_socket.handle:
self._redirect_in_request(message)
else:
self._redirect_reply(message)
@abc.abstractmethod
def _redirect_in_request(self, multipart_message):
"""Redirect incoming request to a publisher."""
@abc.abstractmethod
def _redirect_reply(self, multipart_message):
"""Redirect reply to client. Implement in a concrete proxy."""
envelope = message[zmq_names.MULTIPART_IDX_ENVELOPE]
if self.conf.use_pub_sub and envelope.is_mult_send:
LOG.debug("-> Redirecting request %s to TCP publisher", envelope)
self.pub_publisher.send_request(message)
elif not envelope.is_mult_send:
self._redirect_message(message)
def _receive_in_request(self, socket):
@staticmethod
def _receive_in_request(socket):
reply_id = socket.recv()
assert reply_id is not None, "Valid id expected"
empty = socket.recv()
assert empty == b'', "Empty delimiter expected"
envelope = socket.recv_pyobj()
envelope.reply_id = reply_id
payload = socket.recv_multipart()
payload.insert(zmq_names.MULTIPART_IDX_ENVELOPE, envelope)
return payload
def cleanup(self):
self.router_socket.close()
class PublisherProxy(UniversalQueueProxy):
def __init__(self, conf, context, matchmaker):
super(PublisherProxy, self).__init__(conf, context, matchmaker)
LOG.info(_LI("Polling at PUBLISHER proxy"))
self.pub_publisher = zmq_pub_publisher.PubPublisherProxy(
conf, matchmaker)
self.matchmaker.register_publisher(
(self.pub_publisher.host, self.router_address))
LOG.info(_LI("[PUB:%(pub)s, ROUTER:%(router)s] Run PUB publisher"),
{"pub": self.pub_publisher.host,
"router": self.router_address})
def _redirect_in_request(self, multipart_message):
def _redirect_message(self, multipart_message):
envelope = multipart_message[zmq_names.MULTIPART_IDX_ENVELOPE]
if self.conf.use_pub_sub and envelope.is_mult_send:
LOG.debug("-> Redirecting request %s to TCP publisher", envelope)
self.pub_publisher.send_request(multipart_message)
def _redirect_reply(self, multipart_message):
"""No reply is possible for publisher."""
def cleanup(self):
super(PublisherProxy, self).cleanup()
self.pub_publisher.cleanup()
self.matchmaker.unregister_publisher(
(self.pub_publisher.host, self.router_address))
class RouterProxy(UniversalQueueProxy):
def __init__(self, conf, context, matchmaker):
super(RouterProxy, self).__init__(conf, context, matchmaker)
LOG.info(_LI("Polling at ROUTER proxy"))
self.dealer_publisher \
= zmq_dealer_publisher_proxy.DealerPublisherProxy(
conf, matchmaker, self.poller)
self.matchmaker.register_router(self.router_address)
LOG.info(_LI("ROUTER:%(router)s] Run ROUTER publisher"),
{"router": self.router_address})
def _redirect_in_request(self, multipart_message):
envelope = multipart_message[zmq_names.MULTIPART_IDX_ENVELOPE]
LOG.debug("-> Redirecting request %s to TCP publisher", envelope)
if not envelope.is_mult_send:
self.dealer_publisher.send_request(multipart_message)
def _redirect_reply(self, multipart_message):
envelope = multipart_message[zmq_names.MULTIPART_IDX_ENVELOPE]
LOG.debug("<- Redirecting reply: %s", envelope)
LOG.debug("<-> Route message: %s", envelope)
response_binary = multipart_message[zmq_names.MULTIPART_IDX_BODY]
self.router_socket.send(envelope.reply_id, zmq.SNDMORE)
self.router_socket.send(envelope.routing_key, zmq.SNDMORE)
self.router_socket.send(b'', zmq.SNDMORE)
self.router_socket.send_pyobj(envelope, zmq.SNDMORE)
self.router_socket.send(response_binary)
def cleanup(self):
super(RouterProxy, self).cleanup()
self.dealer_publisher.cleanup()
self.matchmaker.unregister_router(self.router_address)
self.router_socket.close()
self.pub_publisher.cleanup()
self.matchmaker.unregister_publisher(
(self.pub_publisher.host, self.router_address))

@ -13,17 +13,18 @@
# under the License.
import logging
import threading
from concurrent import futures
import futurist
import oslo_messaging
from oslo_messaging._drivers import common as rpc_common
from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \
import zmq_reply_waiter
from oslo_messaging._drivers.zmq_driver.client.publishers \
import zmq_publisher_base
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._i18n import _LW
from oslo_messaging._i18n import _LE
LOG = logging.getLogger(__name__)
@ -37,43 +38,33 @@ class DealerCallPublisher(object):
instead of ReqPublisher.
"""
def __init__(self, conf, matchmaker):
def __init__(self, conf, matchmaker, sockets_manager, sender=None,
reply_waiter=None):
super(DealerCallPublisher, self).__init__()
self.conf = conf
self.matchmaker = matchmaker
self.reply_waiter = ReplyWaiter(conf)
self.sockets_manager = zmq_publisher_base.SocketsManager(
conf, matchmaker, zmq.ROUTER, zmq.DEALER)
def _do_send_request(socket, request):
target_hosts = self.sockets_manager.get_hosts(request.target)
envelope = request.create_envelope(target_hosts)
# DEALER socket specific envelope empty delimiter
socket.send(b'', zmq.SNDMORE)
socket.send_pyobj(envelope, zmq.SNDMORE)
socket.send_pyobj(request)
LOG.debug("Sent message_id %(message)s to a target %(target)s",
{"message": request.message_id,
"target": request.target})
self.sender = CallSender(self.sockets_manager, _do_send_request,
self.reply_waiter) \
if not conf.use_router_proxy else \
CallSenderLight(self.sockets_manager, _do_send_request,
self.reply_waiter)
self.reply_waiter = reply_waiter or zmq_reply_waiter.ReplyWaiter(conf)
self.sockets_manager = sockets_manager
self.sender = sender or CallSender(self.sockets_manager,
self.reply_waiter)
def send_request(self, request):
reply_future = self.sender.send_request(request)
try:
reply = reply_future.result(timeout=request.timeout)
LOG.debug("Received reply %s", request.message_id)
except AssertionError:
LOG.error(_LE("Message format error in reply %s"),
request.message_id)
return None
except futures.TimeoutError:
raise oslo_messaging.MessagingTimeout(
"Timeout %s seconds was reached" % request.timeout)
"Timeout %(tout)s seconds was reached for message %(id)s" %
{"tout": request.timeout,
"id": request.message_id})
finally:
self.reply_waiter.untrack_id(request.message_id)
LOG.debug("Received reply %s", reply)
if reply.failure:
raise rpc_common.deserialize_remote_exception(
reply.failure,
@ -88,11 +79,23 @@ class DealerCallPublisher(object):
class CallSender(zmq_publisher_base.QueuedSender):
def __init__(self, sockets_manager, _do_send_request, reply_waiter):
super(CallSender, self).__init__(sockets_manager, _do_send_request)
def __init__(self, sockets_manager, reply_waiter):
super(CallSender, self).__init__(sockets_manager,
self._do_send_request)
assert reply_waiter, "Valid ReplyWaiter expected!"
self.reply_waiter = reply_waiter
def _do_send_request(self, socket, request):
envelope = request.create_envelope()
# DEALER socket specific envelope empty delimiter
socket.send(b'', zmq.SNDMORE)
socket.send_pyobj(envelope, zmq.SNDMORE)
socket.send_pyobj(request)
LOG.debug("Sent message_id %(message)s to a target %(target)s",
{"message": request.message_id,
"target": request.target})
def send_request(self, request):
reply_future = futurist.Future()
self.reply_waiter.track_reply(reply_future, request.message_id)
@ -103,61 +106,3 @@ class CallSender(zmq_publisher_base.QueuedSender):
socket = self.outbound_sockets.get_socket(target)
self.reply_waiter.poll_socket(socket)
return socket
class CallSenderLight(CallSender):
def __init__(self, sockets_manager, _do_send_request, reply_waiter):
super(CallSenderLight, self).__init__(
sockets_manager, _do_send_request, reply_waiter)
self.socket = self.outbound_sockets.get_socket_to_routers()
self.reply_waiter.poll_socket(self.socket)
def _connect_socket(self, target):
return self.socket
class ReplyWaiter(object):
def __init__(self, conf):
self.conf = conf
self.replies = {}
self.poller = zmq_async.get_poller()
self.executor = zmq_async.get_executor(self.run_loop)
self.executor.execute()
self._lock = threading.Lock()
def track_reply(self, reply_future, message_id):
with self._lock:
self.replies[message_id] = reply_future
def untrack_id(self, message_id):
with self._lock:
self.replies.pop(message_id)
def poll_socket(self, socket):
def _receive_method(socket):
empty = socket.recv()
assert empty == b'', "Empty expected!"
envelope = socket.recv_pyobj()
assert envelope is not None, "Invalid envelope!"
reply = socket.recv_pyobj()
LOG.debug("Received reply %s", reply)
return reply
self.poller.register(socket, recv_method=_receive_method)
def run_loop(self):
reply, socket = self.poller.poll(
timeout=self.conf.rpc_poll_timeout)
if reply is not None:
call_future = self.replies.get(reply.message_id)
if call_future:
call_future.set_result(reply)
else:
LOG.warning(_LW("Received timed out reply: %s"),
reply.message_id)
def cleanup(self):
self.poller.close()

@ -89,32 +89,3 @@ class DealerPublisherAsync(object):
def cleanup(self):
self.sockets_manager.cleanup()
class DealerPublisherLight(object):
"""Used when publishing to a proxy. """
def __init__(self, conf, matchmaker):
self.sockets_manager = zmq_publisher_base.SocketsManager(
conf, matchmaker, zmq.ROUTER, zmq.DEALER)
self.socket = self.sockets_manager.get_socket_to_publishers()
def send_request(self, request):
if request.msg_type == zmq_names.CALL_TYPE:
raise zmq_publisher_base.UnsupportedSendPattern(
request.msg_type)
envelope = request.create_envelope()
self.socket.send(b'', zmq.SNDMORE)
self.socket.send_pyobj(envelope, zmq.SNDMORE)
self.socket.send_pyobj(request)
LOG.debug("->[proxy:%(addr)s] Sending message_id %(message)s to "
"a target %(target)s",
{"message": request.message_id,
"target": request.target,
"addr": list(self.socket.connections)})
def cleanup(self):
self.socket.close()

@ -13,7 +13,12 @@
# under the License.
import logging
import time
from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \
import zmq_dealer_call_publisher
from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \
import zmq_reply_waiter
from oslo_messaging._drivers.zmq_driver.client.publishers \
import zmq_publisher_base
from oslo_messaging._drivers.zmq_driver import zmq_async
@ -25,43 +30,109 @@ LOG = logging.getLogger(__name__)
class DealerPublisherProxy(object):
"""Used when publishing to a proxy. """
def __init__(self, conf, matchmaker, poller):
super(DealerPublisherProxy, self).__init__()
self.conf = conf
self.matchmaker = matchmaker
self.poller = poller
def __init__(self, conf, matchmaker, socket_to_proxy):
self.sockets_manager = zmq_publisher_base.SocketsManager(
conf, matchmaker, zmq.ROUTER, zmq.DEALER)
self.socket = socket_to_proxy
self.routing_table = RoutingTable(conf, matchmaker)
def send_request(self, request):
if request.msg_type == zmq_names.CALL_TYPE:
raise zmq_publisher_base.UnsupportedSendPattern(
request.msg_type)
def send_request(self, multipart_message):
envelope = multipart_message[zmq_names.MULTIPART_IDX_ENVELOPE]
if envelope.is_mult_send:
raise zmq_publisher_base.UnsupportedSendPattern(envelope.msg_type)
if not envelope.target_hosts:
raise Exception("Target hosts are expected!")
dealer_socket = self.sockets_manager.get_socket_to_hosts(
envelope.target, envelope.target_hosts)
self.poller.register(dealer_socket.handle, self.receive_reply)
LOG.debug("Sending message %(message)s to a target %(target)s"
% {"message": envelope.message_id,
"target": envelope.target})
# Empty delimiter - DEALER socket specific
dealer_socket.send(b'', zmq.SNDMORE)
dealer_socket.send_pyobj(envelope, zmq.SNDMORE)
dealer_socket.send(multipart_message[zmq_names.MULTIPART_IDX_BODY])
def receive_reply(self, socket):
empty = socket.recv()
assert empty == b'', "Empty expected!"
envelope = socket.recv_pyobj()
assert envelope is not None, "Invalid envelope!"
reply = socket.recv()
LOG.debug("Received reply %s", envelope)
return [envelope, reply]
envelope = request.create_envelope(
routing_key=self.routing_table.get_routable_host(request.target)
if request.msg_type in zmq_names.DIRECT_TYPES else None)
self.socket.send(b'', zmq.SNDMORE)
self.socket.send_pyobj(envelope, zmq.SNDMORE)
self.socket.send_pyobj(request)
LOG.debug("->[proxy:%(addr)s] Sending message_id %(message)s to "
"a target %(target)s",
{"message": request.message_id,
"target": request.target,
"addr": list(self.socket.connections)})
def cleanup(self):
self.sockets_manager.cleanup()
self.socket.close()
class DealerCallPublisherProxy(zmq_dealer_call_publisher.DealerCallPublisher):
def __init__(self, conf, matchmaker, sockets_manager):
reply_waiter = zmq_reply_waiter.ReplyWaiter(conf)
sender = CallSenderProxy(conf, matchmaker, sockets_manager,
reply_waiter)
super(DealerCallPublisherProxy, self).__init__(
conf, matchmaker, sockets_manager, sender, reply_waiter)
class CallSenderProxy(zmq_dealer_call_publisher.CallSender):
def __init__(self, conf, matchmaker, sockets_manager, reply_waiter):
super(CallSenderProxy, self).__init__(
sockets_manager, reply_waiter)
self.socket = self.outbound_sockets.get_socket_to_publishers()
self.reply_waiter.poll_socket(self.socket)
self.routing_table = RoutingTable(conf, matchmaker)
def _connect_socket(self, target):
return self.socket
def _do_send_request(self, socket, request):
envelope = request.create_envelope(
routing_key=self.routing_table.get_routable_host(request.target),
reply_id=self.socket.handle.identity)
# DEALER socket specific envelope empty delimiter
socket.send(b'', zmq.SNDMORE)
socket.send_pyobj(envelope, zmq.SNDMORE)
socket.send_pyobj(request)
LOG.debug("Sent message_id %(message)s to a target %(target)s",
{"message": request.message_id,
"target": request.target})
class RoutingTable(object):
"""This class implements local routing-table cache
taken from matchmaker. Its purpose is to give the next routable
host id (remote DEALER's id) by request for specific target in
round-robin fashion.
"""
def __init__(self, conf, matchmaker):
self.conf = conf
self.matchmaker = matchmaker
self.routing_table = {}
self.routable_hosts = {}
def get_routable_host(self, target):
self._update_routing_table(target)
hosts_for_target = self.routable_hosts[str(target)]
host = hosts_for_target.pop(0)
if not hosts_for_target:
self._renew_routable_hosts(target)
return host
def _is_tm_expired(self, tm):
return 0 <= self.conf.zmq_target_expire <= time.time() - tm
def _update_routing_table(self, target):
routing_record = self.routing_table.get(str(target))
if routing_record is None:
self._fetch_hosts(target)
self._renew_routable_hosts(target)
elif self._is_tm_expired(routing_record[1]):
self._fetch_hosts(target)
def _fetch_hosts(self, target):
self.routing_table[str(target)] = (self.matchmaker.get_hosts(
target, zmq_names.socket_type_str(zmq.DEALER)), time.time())
def _renew_routable_hosts(self, target):
hosts, _ = self.routing_table[str(target)]
self.routable_hosts[str(target)] = list(hosts)

@ -0,0 +1,69 @@
# Copyright 2016 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import threading
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._i18n import _LW
LOG = logging.getLogger(__name__)
zmq = zmq_async.import_zmq()
class ReplyWaiter(object):
def __init__(self, conf):
self.conf = conf
self.replies = {}
self.poller = zmq_async.get_poller()
self.executor = zmq_async.get_executor(self.run_loop)
self.executor.execute()
self._lock = threading.Lock()
def track_reply(self, reply_future, message_id):
with self._lock:
self.replies[message_id] = reply_future
def untrack_id(self, message_id):
with self._lock:
self.replies.pop(message_id)
def poll_socket(self, socket):
def _receive_method(socket):
empty = socket.recv()
assert empty == b'', "Empty expected!"
envelope = socket.recv_pyobj()
assert envelope is not None, "Invalid envelope!"
reply = socket.recv_pyobj()
LOG.debug("Received reply %s", envelope)
return reply
self.poller.register(socket, recv_method=_receive_method)
def run_loop(self):
reply, socket = self.poller.poll(
timeout=self.conf.rpc_poll_timeout)
if reply is not None:
call_future = self.replies.get(reply.message_id)
if call_future:
call_future.set_result(reply)
else:
LOG.warning(_LW("Received timed out reply: %s"),
reply.message_id)
def cleanup(self):
self.poller.close()

@ -52,8 +52,6 @@ class PubPublisherProxy(object):
self.host = zmq_address.combine_address(self.conf.rpc_zmq_host,
self.socket.port)
self.sync_channel = SyncChannel(conf, matchmaker, self.zmq_context)
def send_request(self, multipart_message):
envelope = multipart_message[zmq_names.MULTIPART_IDX_ENVELOPE]
@ -72,42 +70,4 @@ class PubPublisherProxy(object):
"topic": topic_filter})
def cleanup(self):
self.matchmaker.unregister_publisher(
(self.host, self.sync_channel.sync_host))
self.socket.close()
class SyncChannel(object):
"""Subscribers synchronization channel
As far as PUB/SUB is one directed way pattern we need some
backwards channel to have a possibility of subscribers
to talk back to publisher.
May be used for heartbeats or some kind of acknowledgments etc.
"""
def __init__(self, conf, matchmaker, context):
self.conf = conf
self.matchmaker = matchmaker
self.context = context
self._ready = None
# NOTE(ozamiatin): May be used for heartbeats when we
# implement them
self.sync_socket = zmq_socket.ZmqRandomPortSocket(
self.conf, self.context, zmq.PULL)
self.poller = zmq_async.get_poller()
self.poller.register(self.sync_socket)
self.sync_host = zmq_address.combine_address(self.conf.rpc_zmq_host,
self.sync_socket.port)
def is_ready(self):
LOG.debug("[%s] Waiting for ready from first subscriber",
self.sync_host)
if self._ready is None:
self._ready = self.poller.poll()
LOG.debug("[%s] Received ready from first subscriber",
self.sync_host)
return self._ready is not None

@ -109,6 +109,7 @@ class SocketsManager(object):
self.socket_type = socket_type
self.zmq_context = zmq.Context()
self.outbound_sockets = {}
self.socket_to_publishers = None
def _track_socket(self, socket, target):
self.outbound_sockets[str(target)] = (socket, time.time())
@ -152,20 +153,14 @@ class SocketsManager(object):
return socket
def get_socket_to_publishers(self):
socket = zmq_socket.ZmqSocket(self.conf, self.zmq_context,
self.socket_type)
if self.socket_to_publishers is not None:
return self.socket_to_publishers
self.socket_to_publishers = zmq_socket.ZmqSocket(
self.conf, self.zmq_context, self.socket_type)
publishers = self.matchmaker.get_publishers()
for pub_address, router_address in publishers:
socket.connect_to_host(router_address)
return socket
def get_socket_to_routers(self):
socket = zmq_socket.ZmqSocket(self.conf, self.zmq_context,
self.socket_type)
routers = self.matchmaker.get_routers()
for router_address in routers:
socket.connect_to_host(router_address)
return socket
self.socket_to_publishers.connect_to_host(router_address)
return self.socket_to_publishers
def cleanup(self):
for socket, tm in self.outbound_sockets.values():

@ -17,6 +17,10 @@ from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \
import zmq_dealer_call_publisher
from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \
import zmq_dealer_publisher
from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \
import zmq_dealer_publisher_proxy
from oslo_messaging._drivers.zmq_driver.client.publishers \
import zmq_publisher_base
from oslo_messaging._drivers.zmq_driver.client import zmq_client_base
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
@ -28,23 +32,31 @@ class ZmqClient(zmq_client_base.ZmqClientBase):
def __init__(self, conf, matchmaker=None, allowed_remote_exmods=None):
self.sockets_manager = zmq_publisher_base.SocketsManager(
conf, matchmaker, zmq.ROUTER, zmq.DEALER)
default_publisher = zmq_dealer_publisher.DealerPublisher(
conf, matchmaker)
cast_publisher = zmq_dealer_publisher.DealerPublisherAsync(
conf, matchmaker) \
if zmq_async.is_eventlet_concurrency(conf) \
else default_publisher
publisher_to_proxy = zmq_dealer_publisher_proxy.DealerPublisherProxy(
conf, matchmaker, self.sockets_manager.get_socket_to_publishers())
call_publisher = zmq_dealer_publisher_proxy.DealerCallPublisherProxy(
conf, matchmaker, self.sockets_manager) if conf.use_router_proxy \
else zmq_dealer_call_publisher.DealerCallPublisher(
conf, matchmaker, self.sockets_manager)
cast_publisher = publisher_to_proxy if conf.use_router_proxy \
else zmq_dealer_publisher.DealerPublisherAsync(
conf, matchmaker)
fanout_publisher = zmq_dealer_publisher.DealerPublisherLight(
conf, matchmaker) if conf.use_pub_sub else default_publisher
fanout_publisher = publisher_to_proxy \
if conf.use_pub_sub else default_publisher
super(ZmqClient, self).__init__(
conf, matchmaker, allowed_remote_exmods,
publishers={
zmq_names.CALL_TYPE:
zmq_dealer_call_publisher.DealerCallPublisher(
conf, matchmaker),
zmq_names.CALL_TYPE: call_publisher,
zmq_names.CAST_TYPE: cast_publisher,

@ -19,12 +19,12 @@ from oslo_messaging._drivers.zmq_driver import zmq_names
class Envelope(object):
def __init__(self, msg_type=None, message_id=None, target=None,
target_hosts=None, **kwargs):
routing_key=None, **kwargs):
self._msg_type = msg_type
self._message_id = message_id
self._target = target
self._target_hosts = target_hosts
self._reply_id = None
self._routing_key = routing_key
self._kwargs = kwargs
@property
@ -35,10 +35,22 @@ class Envelope(object):
def reply_id(self, value):
self._reply_id = value
@property
def routing_key(self):
return self._routing_key
@routing_key.setter
def routing_key(self, value):
self._routing_key = value
@property
def msg_type(self):
return self._msg_type
@msg_type.setter
def msg_type(self, value):
self._msg_type = value
@property
def message_id(self):
return self._message_id
@ -47,10 +59,6 @@ class Envelope(object):
def target(self):
return self._target
@property
def target_hosts(self):
return self._target_hosts
@property
def is_mult_send(self):
return self._msg_type in zmq_names.MULTISEND_TYPES
@ -72,7 +80,7 @@ class Envelope(object):
envelope = {zmq_names.FIELD_MSG_TYPE: self._msg_type,
zmq_names.FIELD_MSG_ID: self._message_id,
zmq_names.FIELD_TARGET: self._target,
zmq_names.FIELD_TARGET_HOSTS: self._target_hosts}
zmq_names.FIELD_ROUTING_KEY: self._routing_key}
envelope.update({k: v for k, v in self._kwargs.items()
if v is not None})
return envelope

@ -70,11 +70,12 @@ class Request(object):
self.message_id = str(uuid.uuid1())
def create_envelope(self, hosts=None):
def create_envelope(self, routing_key=None, reply_id=None):
envelope = zmq_envelope.Envelope(msg_type=self.msg_type,
message_id=self.message_id,
target=self.target,
target_hosts=hosts)
routing_key=routing_key)
envelope.reply_id = reply_id
return envelope
@abc.abstractproperty
@ -114,8 +115,9 @@ class CallRequest(RpcRequest):
super(CallRequest, self).__init__(*args, **kwargs)
def create_envelope(self, hosts=None):
envelope = super(CallRequest, self).create_envelope(hosts)
def create_envelope(self, routing_key=None, reply_id=None):
envelope = super(CallRequest, self).create_envelope(
routing_key, reply_id)
envelope.set('timeout', self.timeout)
return envelope

@ -45,13 +45,13 @@ matchmaker_redis_opts = [
default='oslo-messaging-zeromq',
help='Redis replica set name.'),
cfg.IntOpt('wait_timeout',
default=500,
default=5000,
help='Time in ms to wait between connection attempts.'),
cfg.IntOpt('check_timeout',
default=20000,
default=60000,
help='Time in ms to wait before the transaction is killed.'),
cfg.IntOpt('socket_timeout',
default=1000,
default=10000,
help='Timeout in ms on blocking socket operations'),
]

@ -0,0 +1,123 @@
# Copyright 2016 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
from oslo_messaging._drivers import base
from oslo_messaging._drivers import common as rpc_common
from oslo_messaging._drivers.zmq_driver.client.publishers\
import zmq_publisher_base
from oslo_messaging._drivers.zmq_driver.client import zmq_response
from oslo_messaging._drivers.zmq_driver.server.consumers\
import zmq_consumer_base
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
from oslo_messaging._i18n import _LE, _LI
LOG = logging.getLogger(__name__)
zmq = zmq_async.import_zmq()
class DealerIncomingMessage(base.RpcIncomingMessage):
def __init__(self, context, message, msg_id):
super(DealerIncomingMessage, self).__init__(context, message)
self.msg_id = msg_id
def reply(self, reply=None, failure=None, log_failure=True):
"""Reply is not needed for non-call messages"""
def acknowledge(self):
LOG.debug("Not sending acknowledge for %s", self.msg_id)
def requeue(self):
"""Requeue is not supported"""
class DealerIncomingRequest(base.RpcIncomingMessage):
def __init__(self, socket, request, envelope):
super(DealerIncomingRequest, self).__init__(request.context,
request.message)
self.reply_socket = socket
self.request = request
self.envelope = envelope
def reply(self, reply=None, failure=None, log_failure=True):
if failure is not None:
failure = rpc_common.serialize_remote_exception(failure,
log_failure)
response = zmq_response.Response(type=zmq_names.REPLY_TYPE,
message_id=self.request.message_id,
reply_id=self.envelope.reply_id,
reply_body=reply,
failure=failure,
log_failure=log_failure)
LOG.debug("Replying %s", (str(self.request.message_id)))
self.envelope.routing_key = self.envelope.reply_id
self.envelope.msg_type = zmq_names.REPLY_TYPE
self.reply_socket.send(b'', zmq.SNDMORE)
self.reply_socket.send_pyobj(self.envelope, zmq.SNDMORE)
self.reply_socket.send_pyobj(response)
def requeue(self):
"""Requeue is not supported"""
class DealerConsumer(zmq_consumer_base.ConsumerBase):
def __init__(self, conf, poller, server):
super(DealerConsumer, self).__init__(conf, poller, server)
self.matchmaker = server.matchmaker
self.target = server.target
self.sockets_manager = zmq_publisher_base.SocketsManager(
conf, self.matchmaker, zmq.ROUTER, zmq.DEALER)
self.socket = self.sockets_manager.get_socket_to_publishers()
self.poller.register(self.socket, self.receive_message)
self.host = self.socket.handle.identity
self.target_updater = zmq_consumer_base.TargetUpdater(
conf, self.matchmaker, self.target, self.host,
zmq.DEALER)
LOG.info(_LI("[%s] Run DEALER consumer"), self.host)
def _receive_request(self, socket):
empty = socket.recv()
assert empty == b'', 'Bad format: empty delimiter expected'
envelope = socket.recv_pyobj()
request = socket.recv_pyobj()
return request, envelope
def receive_message(self, socket):
try:
request, envelope = self._receive_request(socket)
LOG.debug("[%(host)s] Received %(type)s, %(id)s, %(target)s",
{"host": self.host,
"type": request.msg_type,
"id": request.message_id,
"target": request.target})
if request.msg_type == zmq_names.CALL_TYPE:
return DealerIncomingRequest(socket, request, envelope)
elif request.msg_type in zmq_names.NON_BLOCKING_TYPES:
return DealerIncomingMessage(request.context, request.message,
request.message_id)
else:
LOG.error(_LE("Unknown message type: %s"), request.msg_type)
except (zmq.ZMQError, AssertionError) as e:
LOG.error(_LE("Receiving message failure: %s"), str(e))

@ -65,5 +65,5 @@ class PullConsumer(zmq_consumer_base.SingleSocketConsumer):
else:
LOG.error(_LE("Unknown message type: %s"), msg_type)
except zmq.ZMQError as e:
except (zmq.ZMQError, AssertionError) as e:
LOG.error(_LE("Receiving message failed: %s"), str(e))

@ -80,5 +80,5 @@ class RouterConsumer(zmq_consumer_base.SingleSocketConsumer):
else:
LOG.error(_LE("Unknown message type: %s"), request.msg_type)
except zmq.ZMQError as e:
except (zmq.ZMQError, AssertionError) as e:
LOG.error(_LE("Receiving message failed: %s"), str(e))

@ -55,7 +55,6 @@ class SubConsumer(zmq_consumer_base.ConsumerBase):
super(SubConsumer, self).__init__(conf, poller, server)
self.matchmaker = server.matchmaker
self.target = server.target
self.subscriptions = set()
self.socket = zmq_socket.ZmqSocket(self.conf, self.context, zmq.SUB)
self.sockets.append(self.socket)
self.id = uuid.uuid4()
@ -75,13 +74,10 @@ class SubConsumer(zmq_consumer_base.ConsumerBase):
topic_filter = zmq_address.target_to_subscribe_filter(target)
if target.topic:
self.socket.setsockopt(zmq.SUBSCRIBE, six.b(target.topic))
self.subscriptions.add(six.b(target.topic))
if target.server:
self.socket.setsockopt(zmq.SUBSCRIBE, six.b(target.server))
self.subscriptions.add(six.b(target.server))
if target.topic and target.server:
self.socket.setsockopt(zmq.SUBSCRIBE, topic_filter)
self.subscriptions.add(topic_filter)
LOG.debug("[%(host)s] Subscribing to topic %(filter)s",
{"host": self.id, "filter": topic_filter})
@ -90,7 +86,6 @@ class SubConsumer(zmq_consumer_base.ConsumerBase):
topic_filter = socket.recv()
LOG.debug("[%(id)s] Received %(topic_filter)s topic",
{'id': self.id, 'topic_filter': topic_filter})
assert topic_filter in self.subscriptions
request = socket.recv_pyobj()
return request
@ -108,7 +103,7 @@ class SubConsumer(zmq_consumer_base.ConsumerBase):
LOG.error(_LE("Unknown message type: %s"), request.msg_type)
else:
return SubIncomingMessage(request, socket)
except zmq.ZMQError as e:
except (zmq.ZMQError, AssertionError) as e:
LOG.error(_LE("Receiving message failed: %s"), str(e))
def cleanup(self):

@ -16,6 +16,8 @@ import copy
import logging
from oslo_messaging._drivers import base
from oslo_messaging._drivers.zmq_driver.server.consumers\
import zmq_dealer_consumer
from oslo_messaging._drivers.zmq_driver.server.consumers\
import zmq_router_consumer
from oslo_messaging._drivers.zmq_driver.server.consumers\
@ -37,12 +39,19 @@ class ZmqServer(base.PollStyleListener):
self.matchmaker = matchmaker
self.target = target
self.poller = poller or zmq_async.get_poller()
self.router_consumer = zmq_router_consumer.RouterConsumer(
conf, self.poller, self)
conf, self.poller, self) if not conf.use_router_proxy else None
self.dealer_consumer = zmq_dealer_consumer.DealerConsumer(
conf, self.poller, self) if conf.use_router_proxy else None
self.sub_consumer = zmq_sub_consumer.SubConsumer(
conf, self.poller, self) if conf.use_pub_sub else None
self.consumers = [self.router_consumer]
self.consumers = []
if self.router_consumer:
self.consumers.append(self.router_consumer)
if self.dealer_consumer:
self.consumers.append(self.dealer_consumer)
if self.sub_consumer:
self.consumers.append(self.sub_consumer)
@ -53,9 +62,10 @@ class ZmqServer(base.PollStyleListener):
return message
def stop(self):
consumer = self.router_consumer
LOG.info(_LI("Stop server %(address)s:%(port)s"),
{'address': consumer.address, 'port': consumer.port})
if self.router_consumer:
LOG.info(_LI("Stop server %(address)s:%(port)s"),
{'address': self.router_consumer.address,
'port': self.router_consumer.port})
def cleanup(self):
self.poller.close()

@ -26,7 +26,7 @@ FIELD_MSG_ID = 'message_id'
FIELD_MSG_TYPE = 'msg_type'
FIELD_REPLY_ID = 'reply_id'
FIELD_TARGET = 'target'
FIELD_TARGET_HOSTS = 'target_hosts'
FIELD_ROUTING_KEY = 'routing_key'
IDX_REPLY_TYPE = 1

@ -12,12 +12,12 @@
# License for the specific language governing permissions and limitations
# under the License.
from fixtures._fixtures import timeout
import retrying
from stevedore import driver
import testscenarios
import testtools
import retrying
import oslo_messaging
from oslo_messaging.tests import utils as test_utils
from oslo_utils import importutils
@ -97,6 +97,6 @@ class TestImplMatchmaker(test_utils.BaseTestCase):
hosts = []
try:
hosts = self.test_matcher.get_hosts(target, "test")
except retrying.RetryError:
except (timeout.TimeoutException, retrying.RetryError):
pass
self.assertEqual(hosts, [])

@ -30,8 +30,7 @@ class StartupOrderTestCase(multiproc_utils.MutliprocTestCase):
self.conf.prog = "test_prog"
self.conf.project = "test_project"
kwargs = {'rpc_response_timeout': 30,
'use_pub_sub': False}
kwargs = {'rpc_response_timeout': 30}
self.config(**kwargs)
log_path = self.conf.rpc_zmq_ipc_dir + "/" + str(os.getpid()) + ".log"

@ -23,7 +23,6 @@ EOF
redis-server --port $ZMQ_REDIS_PORT &
oslo-messaging-zmq-proxy --type PUBLISHER --config-file ${DATADIR}/zmq.conf > ${DATADIR}/zmq-publisher.log 2>&1 &
oslo-messaging-zmq-proxy --type ROUTER --config-file ${DATADIR}/zmq.conf > ${DATADIR}/zmq-router.log 2>&1 &
oslo-messaging-zmq-proxy --config-file ${DATADIR}/zmq.conf > ${DATADIR}/zmq-publisher.log 2>&1 &
$*

Loading…
Cancel
Save