[zmq] Routing table refactoring, dynamic direct connections

* Refactored RoutingTable, in order to work on string target
      representation.
    * RoutingTableAdaptor - provides usable adaptation of RoutingTable
      to use with Target object.
    * RoutingTableUpdater - implements asynchronous table updates
      from the matchmaker.
    * Implemented dynamic connections for DealerPublisherDirect
    * Use dynamic connections for direct messages

Change-Id: I20a76c3b2e8f9e71ffcc5ac658fbc659ad4c8153
This commit is contained in:
ozamiatin 2016-09-18 03:51:15 +03:00 committed by Gevorg Davoian
parent e491573654
commit a5e2a633e3
20 changed files with 627 additions and 255 deletions

View File

@ -34,15 +34,18 @@ class DealerPublisherBase(zmq_publisher_base.PublisherBase):
def __init__(self, conf, matchmaker, sender, receiver):
sockets_manager = zmq_sockets_manager.SocketsManager(
conf, matchmaker, zmq.ROUTER, zmq.DEALER
)
super(DealerPublisherBase, self).__init__(sockets_manager, sender,
receiver)
conf, matchmaker, zmq.ROUTER, zmq.DEALER)
self.socket_type = zmq.DEALER
super(DealerPublisherBase, self).__init__(
sockets_manager, sender, receiver)
def _check_received_data(self, reply_id, reply, request):
assert isinstance(reply, zmq_response.Reply), "Reply expected!"
def _recv_reply(self, request, socket):
def _finally_unregister(self, socket, request):
self.receiver.untrack_request(request)
def receive_reply(self, socket, request):
self.receiver.register_socket(socket)
reply_future = \
self.receiver.track_request(request)[zmq_names.REPLY_TYPE]
@ -57,11 +60,10 @@ class DealerPublisherBase(zmq_publisher_base.PublisherBase):
except futures.TimeoutError:
self._raise_timeout(request)
finally:
self.receiver.untrack_request(request)
self._finally_unregister(socket, request)
if reply.failure:
raise rpc_common.deserialize_remote_exception(
reply.failure, request.allowed_remote_exmods
)
reply.failure, request.allowed_remote_exmods)
else:
return reply.reply_body

View File

@ -12,16 +12,18 @@
# License for the specific language governing permissions and limitations
# under the License.
import logging
import retrying
import logging
from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \
import zmq_dealer_publisher_base
from oslo_messaging._drivers.zmq_driver.client import zmq_receivers
from oslo_messaging._drivers.zmq_driver.client import zmq_routing_table
from oslo_messaging._drivers.zmq_driver.client import zmq_senders
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
from oslo_messaging._drivers.zmq_driver import zmq_socket
LOG = logging.getLogger(__name__)
@ -29,9 +31,32 @@ zmq = zmq_async.import_zmq()
class DealerPublisherDirect(zmq_dealer_publisher_base.DealerPublisherBase):
"""DEALER-publisher using direct connections."""
"""DEALER-publisher using direct connections.
Publishing directly to remote services assumes the following:
- All direct connections are dynamic - so they live per message,
thus each message send executes the following:
* Open a new socket
* Connect to some host got from the RoutingTable
* Send message(s)
* Close connection, destroy socket
- RoutingTable/RoutingTableUpdater implements local cache of
matchmaker (e.g. Redis) for target resolution to the list of
available hosts. Cache updates in a background thread.
- Caching of connections is not appropriate for directly connected
OS services, because finally it results in a full-mesh of
connections between services.
- Yes we lose on performance opening and closing connections
for each message, but that is done intentionally to implement
the dynamic connections concept. The key thought here is to
have minimum number of connected services at the moment.
- Using the local RoutingTable cache is done to optimise access
to the matchmaker so we don't call the matchmaker per each message
"""
def __init__(self, conf, matchmaker):
self.routing_table = zmq_routing_table.RoutingTableAdaptor(
conf, matchmaker, zmq.ROUTER)
sender = zmq_senders.RequestSenderDirect(conf)
if conf.oslo_messaging_zmq.rpc_use_acks:
receiver = zmq_receivers.AckAndReplyReceiverDirect(conf)
@ -40,19 +65,34 @@ class DealerPublisherDirect(zmq_dealer_publisher_base.DealerPublisherBase):
super(DealerPublisherDirect, self).__init__(conf, matchmaker, sender,
receiver)
def _connect_socket(self, request):
try:
return self.sockets_manager.get_socket(request.target)
except retrying.RetryError:
return None
def _get_round_robin_host_connection(self, target, socket):
host = self.routing_table.get_round_robin_host(target)
socket.connect_to_host(host)
def _send_request(self, request):
socket = self._connect_socket(request)
if not socket:
return None
def _get_fanout_connection(self, target, socket):
for host in self.routing_table.get_fanout_hosts(target):
socket.connect_to_host(host)
def acquire_connection(self, request):
socket = zmq_socket.ZmqSocket(self.conf, self.context,
self.socket_type, immediate=False)
if request.msg_type in zmq_names.DIRECT_TYPES:
self._get_round_robin_host_connection(request.target, socket)
elif request.msg_type in zmq_names.MULTISEND_TYPES:
self._get_fanout_connection(request.target, socket)
return socket
def _finally_unregister(self, socket, request):
super(DealerPublisherDirect, self)._finally_unregister(socket, request)
self.receiver.unregister_socket(socket)
def send_request(self, socket, request):
if request.msg_type in zmq_names.MULTISEND_TYPES:
for _ in range(socket.connections_count()):
self.sender.send(socket, request)
else:
self.sender.send(socket, request)
return socket
def cleanup(self):
self.routing_table.cleanup()
super(DealerPublisherDirect, self).cleanup()

View File

@ -13,6 +13,7 @@
# under the License.
import logging
import random
import uuid
import six
@ -22,11 +23,13 @@ from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \
from oslo_messaging._drivers.zmq_driver.client import zmq_receivers
from oslo_messaging._drivers.zmq_driver.client import zmq_routing_table
from oslo_messaging._drivers.zmq_driver.client import zmq_senders
from oslo_messaging._drivers.zmq_driver.matchmaker import zmq_matchmaker_base
from oslo_messaging._drivers.zmq_driver import zmq_address
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
from oslo_messaging._drivers.zmq_driver import zmq_socket
from oslo_messaging._drivers.zmq_driver import zmq_updater
from oslo_messaging._i18n import _LW
LOG = logging.getLogger(__name__)
@ -46,8 +49,10 @@ class DealerPublisherProxy(zmq_dealer_publisher_base.DealerPublisherBase):
receiver)
self.socket = self.sockets_manager.get_socket_to_publishers(
self._generate_identity())
self.routing_table = zmq_routing_table.RoutingTable(self.conf,
self.matchmaker)
self.routing_table = zmq_routing_table.RoutingTableAdaptor(
conf, matchmaker, zmq.DEALER)
self.connection_updater = \
PublisherConnectionUpdater(self.conf, self.matchmaker, self.socket)
@ -63,30 +68,24 @@ class DealerPublisherProxy(zmq_dealer_publisher_base.DealerPublisherBase):
def _get_routing_keys(self, request):
if request.msg_type in zmq_names.DIRECT_TYPES:
return [self.routing_table.get_routable_host(request.target)]
return [self.routing_table.get_round_robin_host(request.target)]
else:
return \
[zmq_address.target_to_subscribe_filter(request.target)] \
if self.conf.oslo_messaging_zmq.use_pub_sub else \
self.routing_table.get_all_hosts(request.target)
self.routing_table.get_fanout_hosts(request.target)
def _send_request(self, request):
routing_keys = [routing_key
for routing_key in self._get_routing_keys(request)
if routing_key is not None]
if not routing_keys:
LOG.warning(_LW("Matchmaker contains no records for specified "
"target %(target)s. Dropping message %(msg_id)s.")
% {"target": request.target,
"msg_id": request.message_id})
return None
for routing_key in routing_keys:
request.routing_key = routing_key
self.sender.send(self.socket, request)
def acquire_connection(self, request):
return self.socket
def send_request(self, socket, request):
for routing_key in self._get_routing_keys(request):
request.routing_key = routing_key
self.sender.send(socket, request)
def cleanup(self):
super(DealerPublisherProxy, self).cleanup()
self.routing_table.cleanup()
self.connection_updater.stop()
self.socket.close()
@ -97,3 +96,49 @@ class PublisherConnectionUpdater(zmq_updater.ConnectionUpdater):
publishers = self.matchmaker.get_publishers()
for pub_address, router_address in publishers:
self.socket.connect_to_host(router_address)
class DealerPublisherProxyDynamic(
zmq_dealer_publisher_base.DealerPublisherBase):
def __init__(self, conf, matchmaker):
self.publishers = set()
self.updater = DynamicPublishersUpdater(conf, matchmaker,
self.publishers)
self.updater.update_publishers()
sender = zmq_senders.RequestSenderProxy(conf)
receiver = zmq_receivers.ReplyReceiverDirect(conf)
super(DealerPublisherProxyDynamic, self).__init__(
conf, matchmaker, sender, receiver)
def acquire_connection(self, request):
socket = zmq_socket.ZmqSocket(self.conf, self.context,
self.socket_type, immediate=False)
if not self.publishers:
raise zmq_matchmaker_base.MatchmakerUnavailable()
socket.connect_to_host(random.choice(tuple(self.publishers)))
return socket
def send_request(self, socket, request):
assert request.msg_type in zmq_names.MULTISEND_TYPES
request.routing_key = zmq_address.target_to_subscribe_filter(
request.target)
self.sender.send(socket, request)
def cleanup(self):
super(DealerPublisherProxyDynamic, self).cleanup()
self.updater.cleanup()
class DynamicPublishersUpdater(zmq_updater.UpdaterBase):
def __init__(self, conf, matchmaker, publishers):
super(DynamicPublishersUpdater, self).__init__(
conf, matchmaker, self.update_publishers,
sleep_for=conf.oslo_messaging_zmq.zmq_target_update
)
self.publishers = publishers
def update_publishers(self):
for _, pub_frontend in self.matchmaker.get_publishers():
self.publishers.add(pub_frontend)

View File

@ -18,32 +18,13 @@ import logging
import six
import oslo_messaging
from oslo_messaging._drivers import common as rpc_common
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
from oslo_messaging._i18n import _LE
LOG = logging.getLogger(__name__)
zmq = zmq_async.import_zmq()
class UnsupportedSendPattern(rpc_common.RPCException):
"""Exception to raise from publishers in case of unsupported
sending pattern called.
"""
def __init__(self, pattern_name):
"""Construct exception object
:param pattern_name: Message type name from zmq_names
:type pattern_name: str
"""
errmsg = _LE("Sending pattern %s is unsupported.") % pattern_name
super(UnsupportedSendPattern, self).__init__(errmsg)
@six.add_metaclass(abc.ABCMeta)
class PublisherBase(object):
@ -68,16 +49,40 @@ class PublisherBase(object):
:param receiver: reply receiver object
:type receiver: zmq_receivers.ReplyReceiver
"""
self.context = zmq.Context()
self.sockets_manager = sockets_manager
self.conf = sockets_manager.conf
self.matchmaker = sockets_manager.matchmaker
self.sender = sender
self.receiver = receiver
@staticmethod
def _check_message_pattern(expected, actual):
if expected != actual:
raise UnsupportedSendPattern(zmq_names.message_type_str(actual))
@abc.abstractmethod
def acquire_connection(self, request):
"""Get socket to publish request on it
:param request: request object
:type senders: zmq_request.Request
"""
@abc.abstractmethod
def send_request(self, socket, request):
"""Publish request on a socket
:param socket: socket object to publish request on
:type socket: zmq_socket.ZmqSocket
:param request: request object
:type senders: zmq_request.Request
"""
@abc.abstractmethod
def receive_reply(self, socket, request):
"""Wait for a reply via the socket used for sending the request.
:param socket: socket object to receive reply from
:type socket: zmq_socket.ZmqSocket
:param request: request object
:type senders: zmq_request.Request
"""
@staticmethod
def _raise_timeout(request):
@ -86,37 +91,6 @@ class PublisherBase(object):
{"tout": request.timeout, "msg_id": request.message_id}
)
@abc.abstractmethod
def _send_request(self, request):
"""Send the request and return a socket used for that.
Return value of None means some failure (e.g. connection
can't be established, etc).
"""
@abc.abstractmethod
def _recv_reply(self, request, socket):
"""Wait for a reply via the socket used for sending the request."""
def send_call(self, request):
self._check_message_pattern(zmq_names.CALL_TYPE, request.msg_type)
socket = self._send_request(request)
if not socket:
raise self._raise_timeout(request)
return self._recv_reply(request, socket)
def send_cast(self, request):
self._check_message_pattern(zmq_names.CAST_TYPE, request.msg_type)
self._send_request(request)
def send_fanout(self, request):
self._check_message_pattern(zmq_names.CAST_FANOUT_TYPE,
request.msg_type)
self._send_request(request)
def send_notify(self, request):
self._check_message_pattern(zmq_names.NOTIFY_TYPE, request.msg_type)
self._send_request(request)
def cleanup(self):
"""Cleanup publisher. Close allocated connections."""
self.receiver.stop()

View File

@ -15,6 +15,7 @@
from concurrent import futures
import logging
from oslo_messaging._drivers.zmq_driver.client import zmq_publisher_manager
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
from oslo_messaging._i18n import _LE, _LW
@ -24,38 +25,10 @@ LOG = logging.getLogger(__name__)
zmq = zmq_async.import_zmq()
class AckManagerBase(object):
class AckManager(zmq_publisher_manager.PublisherManagerBase):
def __init__(self, publisher):
self.publisher = publisher
self.conf = publisher.conf
self.sender = publisher.sender
self.receiver = publisher.receiver
def send_call(self, request):
return self.publisher.send_call(request)
def send_cast(self, request):
self.publisher.send_cast(request)
def send_fanout(self, request):
self.publisher.send_fanout(request)
def send_notify(self, request):
self.publisher.send_notify(request)
def cleanup(self):
self.publisher.cleanup()
class AckManagerDirect(AckManagerBase):
pass
class AckManagerProxy(AckManagerBase):
def __init__(self, publisher):
super(AckManagerProxy, self).__init__(publisher)
super(AckManager, self).__init__(publisher)
self._pool = zmq_async.get_pool(
size=self.conf.oslo_messaging_zmq.rpc_thread_pool_size
)
@ -99,33 +72,39 @@ class AckManagerProxy(AckManagerBase):
if request.msg_type != zmq_names.CALL_TYPE:
self.receiver.untrack_request(request)
def _send_request_and_get_ack_future(self, request):
socket = self.publisher._send_request(request)
if not socket:
return None
def _schedule_request_for_ack(self, request):
socket = self.publisher.acquire_connection(request)
self.publisher.send_request(socket, request)
self.receiver.register_socket(socket)
ack_future = self.receiver.track_request(request)[zmq_names.ACK_TYPE]
futures_by_type = self.receiver.track_request(request)
ack_future = futures_by_type[zmq_names.ACK_TYPE]
ack_future.request = request
ack_future.socket = socket
return ack_future
@zmq_publisher_manager.target_not_found_timeout
def send_call(self, request):
ack_future = self._send_request_and_get_ack_future(request)
if not ack_future:
self.publisher._raise_timeout(request)
self._pool.submit(self._wait_for_ack, ack_future)
try:
return self.publisher._recv_reply(request, ack_future.socket)
ack_future = self._schedule_request_for_ack(request)
self._pool.submit(self._wait_for_ack, ack_future)
return self.publisher.receive_reply(ack_future.socket, request)
finally:
if not ack_future.done():
ack_future.set_result((None, None))
@zmq_publisher_manager.target_not_found_warn
def send_cast(self, request):
ack_future = self._send_request_and_get_ack_future(request)
if not ack_future:
return
ack_future = self._schedule_request_for_ack(request)
self._pool.submit(self._wait_for_ack, ack_future)
@zmq_publisher_manager.target_not_found_warn
def _send_request(self, request):
socket = self.publisher.acquire_connection(request)
self.publisher.send_request(socket, request)
def cleanup(self):
self._pool.shutdown(wait=True)
super(AckManagerProxy, self).cleanup()
super(AckManager, self).cleanup()
send_fanout = _send_request
send_notify = _send_request

View File

@ -14,11 +14,6 @@
from oslo_messaging._drivers import common
from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \
import zmq_dealer_publisher_direct
from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \
import zmq_dealer_publisher_proxy
from oslo_messaging._drivers.zmq_driver.client import zmq_ack_manager
from oslo_messaging._drivers.zmq_driver.client import zmq_client_base
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
@ -44,17 +39,9 @@ class ZmqClientMixDirectPubSub(zmq_client_base.ZmqClientBase):
conf.oslo_messaging_zmq.use_pub_sub:
raise WrongClientException()
publisher_direct = self.create_publisher(
conf, matchmaker,
zmq_dealer_publisher_direct.DealerPublisherDirect,
zmq_ack_manager.AckManagerDirect
)
publisher_proxy = self.create_publisher(
conf, matchmaker,
zmq_dealer_publisher_proxy.DealerPublisherProxy,
zmq_ack_manager.AckManagerProxy
)
publisher_direct = self._create_publisher_direct(conf, matchmaker)
publisher_proxy = self._create_publisher_proxy_dynamic(conf,
matchmaker)
super(ZmqClientMixDirectPubSub, self).__init__(
conf, matchmaker, allowed_remote_exmods,
@ -80,15 +67,10 @@ class ZmqClientDirect(zmq_client_base.ZmqClientBase):
conf.oslo_messaging_zmq.use_router_proxy:
raise WrongClientException()
publisher = self.create_publisher(
conf, matchmaker,
zmq_dealer_publisher_direct.DealerPublisherDirect,
zmq_ack_manager.AckManagerDirect
)
super(ZmqClientDirect, self).__init__(
conf, matchmaker, allowed_remote_exmods,
publishers={"default": publisher}
publishers={"default": self._create_publisher_direct(
conf, matchmaker)}
)
@ -107,13 +89,8 @@ class ZmqClientProxy(zmq_client_base.ZmqClientBase):
if not conf.oslo_messaging_zmq.use_router_proxy:
raise WrongClientException()
publisher = self.create_publisher(
conf, matchmaker,
zmq_dealer_publisher_proxy.DealerPublisherProxy,
zmq_ack_manager.AckManagerProxy
)
super(ZmqClientProxy, self).__init__(
conf, matchmaker, allowed_remote_exmods,
publishers={"default": publisher}
publishers={"default": self._create_publisher_proxy(
conf, matchmaker)}
)

View File

@ -12,6 +12,12 @@
# License for the specific language governing permissions and limitations
# under the License.
from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \
import zmq_dealer_publisher_direct
from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \
import zmq_dealer_publisher_proxy
from oslo_messaging._drivers.zmq_driver.client import zmq_ack_manager
from oslo_messaging._drivers.zmq_driver.client import zmq_publisher_manager
from oslo_messaging._drivers.zmq_driver.client import zmq_request
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
@ -37,13 +43,6 @@ class ZmqClientBase(object):
self.notify_publisher = publishers.get(zmq_names.NOTIFY_TYPE,
publishers["default"])
@staticmethod
def create_publisher(conf, matchmaker, publisher_cls, ack_manager_cls):
publisher = publisher_cls(conf, matchmaker)
if conf.oslo_messaging_zmq.rpc_use_acks:
publisher = ack_manager_cls(publisher)
return publisher
def send_call(self, target, context, message, timeout=None, retry=None):
request = zmq_request.CallRequest(
target, context=context, message=message, retry=retry,
@ -70,6 +69,27 @@ class ZmqClientBase(object):
)
self.notify_publisher.send_notify(request)
@staticmethod
def _create_publisher_direct(conf, matchmaker):
publisher_direct = zmq_dealer_publisher_direct.DealerPublisherDirect(
conf, matchmaker)
return zmq_publisher_manager.PublisherManagerDynamic(
publisher_direct)
@staticmethod
def _create_publisher_proxy(conf, matchmaker):
publisher_proxy = zmq_dealer_publisher_proxy.DealerPublisherProxy(
conf, matchmaker)
return zmq_ack_manager.AckManager(publisher_proxy) \
if conf.oslo_messaging_zmq.rpc_use_acks else \
zmq_publisher_manager.PublisherManagerStatic(publisher_proxy)
@staticmethod
def _create_publisher_proxy_dynamic(conf, matchmaker):
return zmq_publisher_manager.PublisherManagerDynamic(
zmq_dealer_publisher_proxy.DealerPublisherProxyDynamic(
conf, matchmaker))
def cleanup(self):
cleaned = set()
for publisher in self.publishers.values():

View File

@ -0,0 +1,143 @@
# Copyright 2016 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import contextlib
import logging
import retrying
import six
from oslo_messaging._drivers.zmq_driver.matchmaker import zmq_matchmaker_base
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._i18n import _LW
LOG = logging.getLogger(__name__)
zmq = zmq_async.import_zmq()
def target_not_found_warn(func):
def _target_not_found_warn(self, request, *args, **kwargs):
try:
return func(self, request, *args, **kwargs)
except (zmq_matchmaker_base.MatchmakerUnavailable,
retrying.RetryError):
LOG.warning(_LW("Matchmaker contains no records for specified "
"target %(target)s. Dropping message %(msg_id)s.")
% {"target": request.target,
"msg_id": request.message_id})
return _target_not_found_warn
def target_not_found_timeout(func):
def _target_not_found_timeout(self, request, *args, **kwargs):
try:
return func(self, request, *args, **kwargs)
except (zmq_matchmaker_base.MatchmakerUnavailable,
retrying.RetryError):
self.publisher._raise_timeout(request)
return _target_not_found_timeout
@six.add_metaclass(abc.ABCMeta)
class PublisherManagerBase(object):
"""Abstract publisher manager class
Publisher knows how to establish connection, how to send message,
and how to receive reply. PublisherManager coordinates all these steps
regarding retrying logic in AckManager implementations
"""
def __init__(self, publisher):
self.publisher = publisher
self.conf = publisher.conf
self.sender = publisher.sender
self.receiver = publisher.receiver
@abc.abstractmethod
def send_call(self, request):
"""Send call request
:param request: request object
:type senders: zmq_request.Request
"""
@abc.abstractmethod
def send_cast(self, request):
"""Send call request
:param request: request object
:type senders: zmq_request.Request
"""
@abc.abstractmethod
def send_fanout(self, request):
"""Send call request
:param request: request object
:type senders: zmq_request.Request
"""
@abc.abstractmethod
def send_notify(self, request):
"""Send call request
:param request: request object
:type senders: zmq_request.Request
"""
def cleanup(self):
self.publisher.cleanup()
class PublisherManagerDynamic(PublisherManagerBase):
@target_not_found_timeout
def send_call(self, request):
with contextlib.closing(
self.publisher.acquire_connection(request)) as socket:
self.publisher.send_request(socket, request)
reply = self.publisher.receive_reply(socket, request)
return reply
@target_not_found_warn
def _send(self, request):
with contextlib.closing(self.publisher.acquire_connection(request)) \
as socket:
self.publisher.send_request(socket, request)
send_cast = _send
send_fanout = _send
send_notify = _send
class PublisherManagerStatic(PublisherManagerBase):
@target_not_found_timeout
def send_call(self, request):
socket = self.publisher.acquire_connection(request)
self.publisher.send_request(socket, request)
reply = self.publisher.receive_reply(socket, request)
return reply
@target_not_found_warn
def _send(self, request):
socket = self.publisher.acquire_connection(request)
self.publisher.send_request(socket, request)
send_cast = _send
send_fanout = _send
send_notify = _send

View File

@ -95,8 +95,7 @@ class ReceiverBase(object):
def _run_loop(self):
data, socket = self._poller.poll(
timeout=self.conf.oslo_messaging_zmq.rpc_poll_timeout
)
timeout=self.conf.oslo_messaging_zmq.rpc_poll_timeout)
if data is None:
return
reply_id, message_type, message_id, response = data

View File

@ -13,12 +13,16 @@
# under the License.
import logging
import retrying
import threading
import time
import itertools
from oslo_messaging._drivers.zmq_driver.matchmaker import zmq_matchmaker_base
from oslo_messaging._drivers.zmq_driver import zmq_address
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
from oslo_messaging._drivers.zmq_driver import zmq_updater
from oslo_messaging._i18n import _LW
zmq = zmq_async.import_zmq()
@ -26,75 +30,162 @@ zmq = zmq_async.import_zmq()
LOG = logging.getLogger(__name__)
class RoutingTable(object):
"""This class implements local routing-table cache
taken from matchmaker. Its purpose is to give the next routable
host id (remote DEALER's id) by request for specific target in
round-robin fashion.
"""
class RoutingTableAdaptor(object):
def __init__(self, conf, matchmaker):
def __init__(self, conf, matchmaker, listener_type):
self.conf = conf
self.matchmaker = matchmaker
self.routing_table = {}
self.routable_hosts = {}
self.listener_type = listener_type
self.routing_table = RoutingTable(conf)
self.routing_table_updater = RoutingTableUpdater(
conf, matchmaker, self.routing_table)
self.round_robin_targets = {}
def get_all_hosts(self, target):
self._update_routing_table(
target,
get_hosts=self.matchmaker.get_hosts_fanout,
get_hosts_retry=self.matchmaker.get_hosts_fanout_retry)
return self.routable_hosts.get(str(target), [])
def get_round_robin_host(self, target):
target_key = zmq_address.target_to_key(
target, zmq_names.socket_type_str(self.listener_type))
def get_routable_host(self, target):
self._update_routing_table(
target,
get_hosts=self.matchmaker.get_hosts,
get_hosts_retry=self.matchmaker.get_hosts_retry)
hosts_for_target = self.routable_hosts.get(str(target))
if not hosts_for_target:
# Matchmaker doesn't contain any target
return None
host = hosts_for_target.pop(0)
if not hosts_for_target:
self._renew_routable_hosts(target)
LOG.debug("Processing target %s for round-robin." % target_key)
if target_key not in self.round_robin_targets:
LOG.debug("Target %s is not in cache. Check matchmaker server."
% target_key)
hosts = self.matchmaker.get_hosts_retry(
target, zmq_names.socket_type_str(self.listener_type))
LOG.debug("Received hosts %s" % hosts)
self.routing_table.update_hosts(target_key, hosts)
self.round_robin_targets[target_key] = \
self.routing_table.get_hosts_round_robin(target_key)
rr_gen = self.round_robin_targets[target_key]
host = next(rr_gen)
LOG.debug("Host resolved for the current connection is %s" % host)
return host
def _is_tm_expired(self, tm):
return 0 <= self.conf.oslo_messaging_zmq.zmq_target_expire \
<= time.time() - tm
def get_fanout_hosts(self, target):
target_key = zmq_address.target_to_key(
target, zmq_names.socket_type_str(self.listener_type))
def _update_routing_table(self, target, get_hosts, get_hosts_retry):
routing_record = self.routing_table.get(str(target))
if routing_record is None:
self._fetch_hosts(target, get_hosts, get_hosts_retry)
self._renew_routable_hosts(target)
elif self._is_tm_expired(routing_record[1]):
self._fetch_hosts(target, get_hosts, get_hosts_retry)
LOG.debug("Processing target %s for fanout." % target_key)
def _fetch_hosts(self, target, get_hosts, get_hosts_retry):
key = str(target)
if key not in self.routing_table:
try:
hosts = get_hosts_retry(
target, zmq_names.socket_type_str(zmq.DEALER))
self.routing_table[key] = (hosts, time.time())
except retrying.RetryError:
LOG.warning(_LW("Matchmaker contains no hosts for target %s")
% key)
if not self.routing_table.contains(target_key):
LOG.debug("Target %s is not in cache. Check matchmaker server."
% target_key)
hosts = self.matchmaker.get_hosts_fanout_retry(
target, zmq_names.socket_type_str(self.listener_type))
LOG.debug("Received hosts %s" % hosts)
self.routing_table.update_hosts(target_key, hosts)
else:
try:
hosts = get_hosts(
target, zmq_names.socket_type_str(zmq.DEALER))
self.routing_table[key] = (hosts, time.time())
except zmq_matchmaker_base.MatchmakerUnavailable:
LOG.warning(_LW("Matchmaker contains no hosts for target %s")
% key)
LOG.debug("Target %s has been found in cache." % target_key)
return self.routing_table.get_hosts_fanout(target_key)
def cleanup(self):
self.routing_table_updater.cleanup()
class RoutingTable(object):
def __init__(self, conf):
self.conf = conf
self.targets = {}
self._lock = threading.Lock()
def register(self, target_key, host):
with self._lock:
if target_key in self.targets:
hosts, tm = self.targets[target_key]
if host not in hosts:
hosts.add(host)
self.targets[target_key] = (hosts, self._create_tm())
else:
self.targets[target_key] = ({host}, self._create_tm())
def get_targets(self):
with self._lock:
return list(self.targets.keys())
def unregister(self, target_key, host):
with self._lock:
hosts, tm = self.targets.get(target_key)
if hosts and host in hosts:
hosts.discard(host)
self.targets[target_key] = (hosts, self._create_tm())
def update_hosts(self, target_key, hosts_updated):
with self._lock:
if target_key in self.targets and not hosts_updated:
self.targets.pop(target_key)
return
hosts_current, _ = self.targets.get(target_key, (set(), None))
hosts_updated = set(hosts_updated)
has_differences = hosts_updated ^ hosts_current
if has_differences:
self.targets[target_key] = (hosts_updated, self._create_tm())
def get_hosts_round_robin(self, target_key):
while self._contains_hosts(target_key):
for host in self._get_hosts_rr(target_key):
yield host
def get_hosts_fanout(self, target_key):
hosts, _ = self._get_hosts(target_key)
for host in hosts:
yield host
def contains(self, target_key):
with self._lock:
return target_key in self.targets
def _get_hosts(self, target_key):
with self._lock:
hosts, tm = self.targets.get(target_key, ([], None))
hosts = list(hosts)
return hosts, tm
def _get_tm(self, target_key):
with self._lock:
_, tm = self.targets.get(target_key)
return tm
def _contains_hosts(self, target_key):
with self._lock:
return target_key in self.targets
def _is_target_changed(self, target_key, tm_orig):
return self._get_tm(target_key) != tm_orig
@staticmethod
def _create_tm():
return time.time()
def _get_hosts_rr(self, target_key):
hosts, tm_original = self._get_hosts(target_key)
for host in itertools.cycle(hosts):
if self._is_target_changed(target_key, tm_original):
raise StopIteration()
yield host
class RoutingTableUpdater(zmq_updater.UpdaterBase):
def __init__(self, conf, matchmaker, routing_table):
self.routing_table = routing_table
super(RoutingTableUpdater, self).__init__(
conf, matchmaker, self._update_routing_table,
conf.oslo_messaging_zmq.zmq_target_update)
def _update_routing_table(self):
target_keys = self.routing_table.get_targets()
def _renew_routable_hosts(self, target):
key = str(target)
try:
hosts, _ = self.routing_table[key]
self.routable_hosts[key] = list(hosts)
except KeyError:
self.routable_hosts[key] = []
for target_key in target_keys:
hosts = self.matchmaker.get_hosts_by_key(target_key)
if not hosts:
LOG.warning(_LW("Target %s has been removed") % target_key)
else:
self.routing_table.update_hosts(target_key, hosts)
LOG.debug("Updating routing table from the matchmaker. "
"%d target(s) updated %s." % (len(target_keys),
target_keys))
except zmq_matchmaker_base.MatchmakerUnavailable:
LOG.warning(_LW("Not updated. Matchmaker was not available."))

View File

@ -45,26 +45,29 @@ class SocketsManager(object):
def _key_from_target(target):
return target.topic if target.fanout else str(target)
def _get_hosts_and_track(self, socket, target):
self._get_hosts_and_connect(socket, target)
self._track_socket(socket, target)
def _get_hosts_and_connect(self, socket, target):
get_hosts = self.get_hosts_fanout if target.fanout else self.get_hosts
hosts = get_hosts(target)
self._connect_to_hosts(socket, target, hosts)
self._connect_to_hosts(socket, hosts)
def _track_socket(self, socket, target):
key = self._key_from_target(target)
self.outbound_sockets[key] = (socket, time.time())
def _connect_to_hosts(self, socket, target, hosts):
def _connect_to_hosts(self, socket, hosts):
for host in hosts:
socket.connect_to_host(host)
self._track_socket(socket, target)
def _check_for_new_hosts(self, target):
key = self._key_from_target(target)
socket, tm = self.outbound_sockets[key]
if 0 <= self.conf.oslo_messaging_zmq.zmq_target_expire \
<= time.time() - tm:
self._get_hosts_and_connect(socket, target)
self._get_hosts_and_track(socket, target)
return socket
def get_socket(self, target):
@ -74,7 +77,7 @@ class SocketsManager(object):
else:
socket = zmq_socket.ZmqSocket(self.conf, self.zmq_context,
self.socket_type, immediate=False)
self._get_hosts_and_connect(socket, target)
self._get_hosts_and_track(socket, target)
return socket
def get_socket_to_publishers(self, identity=None):

View File

@ -193,6 +193,10 @@ class MatchmakerRedis(zmq_matchmaker_base.MatchmakerBase):
def get_routers(self):
return self._get_hosts_by_key(_ROUTERS_KEY)
@redis_connection_warn
def get_hosts_by_key(self, key):
return self._get_hosts_by_key(key)
def _get_hosts_by_key(self, key):
return self._redis.smembers(key)

View File

@ -156,8 +156,9 @@ class RouterUpdater(zmq_updater.UpdaterBase):
self.publisher_address = publisher_address
self.fe_router_address = fe_router_address
self.be_router_address = be_router_address
super(RouterUpdater, self).__init__(conf, matchmaker,
self._update_records)
super(RouterUpdater, self).__init__(
conf, matchmaker, self._update_records,
conf.oslo_messaging_zmq.zmq_target_update)
def _update_records(self):
self.matchmaker.register_publisher(

View File

@ -112,8 +112,17 @@ class TargetUpdater(zmq_updater.UpdaterBase):
self.target = target
self.host = host
self.socket_type = socket_type
super(TargetUpdater, self).__init__(conf, matchmaker,
self._update_target)
self.conf = conf
self.matchmaker = matchmaker
self._sleep_for = conf.oslo_messaging_zmq.zmq_target_update
# NOTE(ozamiatin): Update target immediately not waiting
# for background executor to initialize.
self._update_target()
super(TargetUpdater, self).__init__(
conf, matchmaker, self._update_target,
conf.oslo_messaging_zmq.zmq_target_update)
def _update_target(self):
try:

View File

@ -87,7 +87,7 @@ zmq_opts = [
help='Use PUB/SUB pattern for fanout methods. '
'PUB/SUB always uses proxy.'),
cfg.BoolOpt('use_router_proxy', default=True,
cfg.BoolOpt('use_router_proxy', default=False,
deprecated_group='DEFAULT',
help='Use ROUTER remote proxy.'),

View File

@ -21,7 +21,7 @@ from oslo_messaging._drivers import common as rpc_common
from oslo_messaging._drivers.zmq_driver import zmq_address
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
from oslo_messaging._i18n import _LE, _LI
from oslo_messaging._i18n import _LE
from oslo_messaging import exceptions
from oslo_serialization.serializer import json_serializer
from oslo_serialization.serializer import msgpack_serializer
@ -174,7 +174,9 @@ class ZmqSocket(object):
return obj
def close(self, *args, **kwargs):
identity = self.handle.identity
self.handle.close(*args, **kwargs)
LOG.debug("Socket %s closed" % identity)
def connect_to_address(self, address):
if address in self.connections:
@ -182,8 +184,8 @@ class ZmqSocket(object):
stype = zmq_names.socket_type_str(self.socket_type)
sid = self.handle.identity
try:
LOG.info(_LI("Connecting %(stype)s socket %(sid)s to %(address)s"),
{"stype": stype, "sid": sid, "address": address})
LOG.debug("Connecting %(stype)s socket %(sid)s to %(address)s",
{"stype": stype, "sid": sid, "address": address})
self.connect(address)
except zmq.ZMQError as e:
LOG.error(_LE("Failed connecting %(stype)s-%(sid)s to "

View File

@ -27,12 +27,11 @@ zmq = zmq_async.import_zmq()
class UpdaterBase(object):
def __init__(self, conf, matchmaker, update_method):
def __init__(self, conf, matchmaker, update_method, sleep_for):
self.conf = conf
self.matchmaker = matchmaker
self.update_method = update_method
self._sleep_for = self.conf.oslo_messaging_zmq.zmq_target_update
self.update_method()
self._sleep_for = sleep_for
self.executor = zmq_async.get_executor(method=self._update_loop)
self.executor.execute()
@ -53,7 +52,8 @@ class ConnectionUpdater(UpdaterBase):
def __init__(self, conf, matchmaker, socket):
self.socket = socket
super(ConnectionUpdater, self).__init__(
conf, matchmaker, self._update_connection)
conf, matchmaker, self._update_connection,
conf.oslo_messaging_zmq.zmq_target_update)
@abc.abstractmethod
def _update_connection(self):

View File

@ -0,0 +1,80 @@
# Copyright 2016 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_messaging._drivers.zmq_driver.client import zmq_routing_table
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging.tests import utils as test_utils
zmq = zmq_async.import_zmq()
class TestRoutingTable(test_utils.BaseTestCase):
def setUp(self):
super(TestRoutingTable, self).setUp()
def test_get_next_while_origin_changed(self):
table = zmq_routing_table.RoutingTable(self.conf)
table.register("topic1.server1", "1")
table.register("topic1.server1", "2")
table.register("topic1.server1", "3")
rr_gen = table.get_hosts_round_robin("topic1.server1")
result = []
for i in range(3):
result.append(next(rr_gen))
self.assertEqual(3, len(result))
self.assertIn("1", result)
self.assertIn("2", result)
self.assertIn("3", result)
table.register("topic1.server1", "4")
table.register("topic1.server1", "5")
table.register("topic1.server1", "6")
result = []
for i in range(6):
result.append(next(rr_gen))
self.assertEqual(6, len(result))
self.assertIn("1", result)
self.assertIn("2", result)
self.assertIn("3", result)
self.assertIn("4", result)
self.assertIn("5", result)
self.assertIn("6", result)
def test_no_targets(self):
table = zmq_routing_table.RoutingTable(self.conf)
rr_gen = table.get_hosts_round_robin("topic1.server1")
result = []
for t in rr_gen:
result.append(t)
self.assertEqual(0, len(result))
def test_target_unchanged(self):
table = zmq_routing_table.RoutingTable(self.conf)
table.register("topic1.server1", "1")
rr_gen = table.get_hosts_round_robin("topic1.server1")
result = []
for i in range(3):
result.append(next(rr_gen))
self.assertEqual(["1", "1", "1"], result)

View File

@ -165,6 +165,7 @@ class CastTestCase(utils.SkipIfNoTransportURL):
client.append(text='stack')
client.add(increment=2)
client.add(increment=10)
time.sleep(0.3)
client.sync()
group.sync(1)
@ -205,6 +206,7 @@ class CastTestCase(utils.SkipIfNoTransportURL):
client.append(text='stack')
client.add(increment=2)
client.add(increment=10)
time.sleep(0.3)
client.sync()
group.sync(server='all')
for s in group.servers:

View File

@ -177,6 +177,7 @@ class RpcServerGroupFixture(fixtures.Fixture):
if server is None:
for i in range(len(self.servers)):
self.client(i).ping()
time.sleep(0.3)
else:
if server == 'all':
for s in self.servers: