Merge "[zmq] Routing table refactoring, dynamic direct connections"
This commit is contained in:
commit
7768b5d8e8
@ -34,15 +34,18 @@ class DealerPublisherBase(zmq_publisher_base.PublisherBase):
|
||||
|
||||
def __init__(self, conf, matchmaker, sender, receiver):
|
||||
sockets_manager = zmq_sockets_manager.SocketsManager(
|
||||
conf, matchmaker, zmq.ROUTER, zmq.DEALER
|
||||
)
|
||||
super(DealerPublisherBase, self).__init__(sockets_manager, sender,
|
||||
receiver)
|
||||
conf, matchmaker, zmq.ROUTER, zmq.DEALER)
|
||||
self.socket_type = zmq.DEALER
|
||||
super(DealerPublisherBase, self).__init__(
|
||||
sockets_manager, sender, receiver)
|
||||
|
||||
def _check_received_data(self, reply_id, reply, request):
|
||||
assert isinstance(reply, zmq_response.Reply), "Reply expected!"
|
||||
|
||||
def _recv_reply(self, request, socket):
|
||||
def _finally_unregister(self, socket, request):
|
||||
self.receiver.untrack_request(request)
|
||||
|
||||
def receive_reply(self, socket, request):
|
||||
self.receiver.register_socket(socket)
|
||||
reply_future = \
|
||||
self.receiver.track_request(request)[zmq_names.REPLY_TYPE]
|
||||
@ -57,11 +60,10 @@ class DealerPublisherBase(zmq_publisher_base.PublisherBase):
|
||||
except futures.TimeoutError:
|
||||
self._raise_timeout(request)
|
||||
finally:
|
||||
self.receiver.untrack_request(request)
|
||||
self._finally_unregister(socket, request)
|
||||
|
||||
if reply.failure:
|
||||
raise rpc_common.deserialize_remote_exception(
|
||||
reply.failure, request.allowed_remote_exmods
|
||||
)
|
||||
reply.failure, request.allowed_remote_exmods)
|
||||
else:
|
||||
return reply.reply_body
|
||||
|
@ -12,16 +12,18 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import logging
|
||||
|
||||
import retrying
|
||||
import logging
|
||||
|
||||
from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \
|
||||
import zmq_dealer_publisher_base
|
||||
from oslo_messaging._drivers.zmq_driver.client import zmq_receivers
|
||||
from oslo_messaging._drivers.zmq_driver.client import zmq_routing_table
|
||||
from oslo_messaging._drivers.zmq_driver.client import zmq_senders
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_async
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_names
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_socket
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
@ -29,9 +31,32 @@ zmq = zmq_async.import_zmq()
|
||||
|
||||
|
||||
class DealerPublisherDirect(zmq_dealer_publisher_base.DealerPublisherBase):
|
||||
"""DEALER-publisher using direct connections."""
|
||||
"""DEALER-publisher using direct connections.
|
||||
|
||||
Publishing directly to remote services assumes the following:
|
||||
- All direct connections are dynamic - so they live per message,
|
||||
thus each message send executes the following:
|
||||
* Open a new socket
|
||||
* Connect to some host got from the RoutingTable
|
||||
* Send message(s)
|
||||
* Close connection, destroy socket
|
||||
- RoutingTable/RoutingTableUpdater implements local cache of
|
||||
matchmaker (e.g. Redis) for target resolution to the list of
|
||||
available hosts. Cache updates in a background thread.
|
||||
- Caching of connections is not appropriate for directly connected
|
||||
OS services, because finally it results in a full-mesh of
|
||||
connections between services.
|
||||
- Yes we lose on performance opening and closing connections
|
||||
for each message, but that is done intentionally to implement
|
||||
the dynamic connections concept. The key thought here is to
|
||||
have minimum number of connected services at the moment.
|
||||
- Using the local RoutingTable cache is done to optimise access
|
||||
to the matchmaker so we don't call the matchmaker per each message
|
||||
"""
|
||||
|
||||
def __init__(self, conf, matchmaker):
|
||||
self.routing_table = zmq_routing_table.RoutingTableAdaptor(
|
||||
conf, matchmaker, zmq.ROUTER)
|
||||
sender = zmq_senders.RequestSenderDirect(conf)
|
||||
if conf.oslo_messaging_zmq.rpc_use_acks:
|
||||
receiver = zmq_receivers.AckAndReplyReceiverDirect(conf)
|
||||
@ -40,19 +65,34 @@ class DealerPublisherDirect(zmq_dealer_publisher_base.DealerPublisherBase):
|
||||
super(DealerPublisherDirect, self).__init__(conf, matchmaker, sender,
|
||||
receiver)
|
||||
|
||||
def _connect_socket(self, request):
|
||||
try:
|
||||
return self.sockets_manager.get_socket(request.target)
|
||||
except retrying.RetryError:
|
||||
return None
|
||||
def _get_round_robin_host_connection(self, target, socket):
|
||||
host = self.routing_table.get_round_robin_host(target)
|
||||
socket.connect_to_host(host)
|
||||
|
||||
def _send_request(self, request):
|
||||
socket = self._connect_socket(request)
|
||||
if not socket:
|
||||
return None
|
||||
def _get_fanout_connection(self, target, socket):
|
||||
for host in self.routing_table.get_fanout_hosts(target):
|
||||
socket.connect_to_host(host)
|
||||
|
||||
def acquire_connection(self, request):
|
||||
socket = zmq_socket.ZmqSocket(self.conf, self.context,
|
||||
self.socket_type, immediate=False)
|
||||
if request.msg_type in zmq_names.DIRECT_TYPES:
|
||||
self._get_round_robin_host_connection(request.target, socket)
|
||||
elif request.msg_type in zmq_names.MULTISEND_TYPES:
|
||||
self._get_fanout_connection(request.target, socket)
|
||||
return socket
|
||||
|
||||
def _finally_unregister(self, socket, request):
|
||||
super(DealerPublisherDirect, self)._finally_unregister(socket, request)
|
||||
self.receiver.unregister_socket(socket)
|
||||
|
||||
def send_request(self, socket, request):
|
||||
if request.msg_type in zmq_names.MULTISEND_TYPES:
|
||||
for _ in range(socket.connections_count()):
|
||||
self.sender.send(socket, request)
|
||||
else:
|
||||
self.sender.send(socket, request)
|
||||
return socket
|
||||
|
||||
def cleanup(self):
|
||||
self.routing_table.cleanup()
|
||||
super(DealerPublisherDirect, self).cleanup()
|
||||
|
@ -13,6 +13,7 @@
|
||||
# under the License.
|
||||
|
||||
import logging
|
||||
import random
|
||||
import uuid
|
||||
|
||||
import six
|
||||
@ -22,11 +23,13 @@ from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \
|
||||
from oslo_messaging._drivers.zmq_driver.client import zmq_receivers
|
||||
from oslo_messaging._drivers.zmq_driver.client import zmq_routing_table
|
||||
from oslo_messaging._drivers.zmq_driver.client import zmq_senders
|
||||
from oslo_messaging._drivers.zmq_driver.matchmaker import zmq_matchmaker_base
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_address
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_async
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_names
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_socket
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_updater
|
||||
from oslo_messaging._i18n import _LW
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
@ -46,8 +49,10 @@ class DealerPublisherProxy(zmq_dealer_publisher_base.DealerPublisherBase):
|
||||
receiver)
|
||||
self.socket = self.sockets_manager.get_socket_to_publishers(
|
||||
self._generate_identity())
|
||||
self.routing_table = zmq_routing_table.RoutingTable(self.conf,
|
||||
self.matchmaker)
|
||||
|
||||
self.routing_table = zmq_routing_table.RoutingTableAdaptor(
|
||||
conf, matchmaker, zmq.DEALER)
|
||||
|
||||
self.connection_updater = \
|
||||
PublisherConnectionUpdater(self.conf, self.matchmaker, self.socket)
|
||||
|
||||
@ -63,30 +68,24 @@ class DealerPublisherProxy(zmq_dealer_publisher_base.DealerPublisherBase):
|
||||
|
||||
def _get_routing_keys(self, request):
|
||||
if request.msg_type in zmq_names.DIRECT_TYPES:
|
||||
return [self.routing_table.get_routable_host(request.target)]
|
||||
return [self.routing_table.get_round_robin_host(request.target)]
|
||||
else:
|
||||
return \
|
||||
[zmq_address.target_to_subscribe_filter(request.target)] \
|
||||
if self.conf.oslo_messaging_zmq.use_pub_sub else \
|
||||
self.routing_table.get_all_hosts(request.target)
|
||||
self.routing_table.get_fanout_hosts(request.target)
|
||||
|
||||
def _send_request(self, request):
|
||||
routing_keys = [routing_key
|
||||
for routing_key in self._get_routing_keys(request)
|
||||
if routing_key is not None]
|
||||
if not routing_keys:
|
||||
LOG.warning(_LW("Matchmaker contains no records for specified "
|
||||
"target %(target)s. Dropping message %(msg_id)s.")
|
||||
% {"target": request.target,
|
||||
"msg_id": request.message_id})
|
||||
return None
|
||||
for routing_key in routing_keys:
|
||||
request.routing_key = routing_key
|
||||
self.sender.send(self.socket, request)
|
||||
def acquire_connection(self, request):
|
||||
return self.socket
|
||||
|
||||
def send_request(self, socket, request):
|
||||
for routing_key in self._get_routing_keys(request):
|
||||
request.routing_key = routing_key
|
||||
self.sender.send(socket, request)
|
||||
|
||||
def cleanup(self):
|
||||
super(DealerPublisherProxy, self).cleanup()
|
||||
self.routing_table.cleanup()
|
||||
self.connection_updater.stop()
|
||||
self.socket.close()
|
||||
|
||||
@ -97,3 +96,49 @@ class PublisherConnectionUpdater(zmq_updater.ConnectionUpdater):
|
||||
publishers = self.matchmaker.get_publishers()
|
||||
for pub_address, router_address in publishers:
|
||||
self.socket.connect_to_host(router_address)
|
||||
|
||||
|
||||
class DealerPublisherProxyDynamic(
|
||||
zmq_dealer_publisher_base.DealerPublisherBase):
|
||||
|
||||
def __init__(self, conf, matchmaker):
|
||||
self.publishers = set()
|
||||
self.updater = DynamicPublishersUpdater(conf, matchmaker,
|
||||
self.publishers)
|
||||
self.updater.update_publishers()
|
||||
sender = zmq_senders.RequestSenderProxy(conf)
|
||||
receiver = zmq_receivers.ReplyReceiverDirect(conf)
|
||||
super(DealerPublisherProxyDynamic, self).__init__(
|
||||
conf, matchmaker, sender, receiver)
|
||||
|
||||
def acquire_connection(self, request):
|
||||
socket = zmq_socket.ZmqSocket(self.conf, self.context,
|
||||
self.socket_type, immediate=False)
|
||||
if not self.publishers:
|
||||
raise zmq_matchmaker_base.MatchmakerUnavailable()
|
||||
socket.connect_to_host(random.choice(tuple(self.publishers)))
|
||||
return socket
|
||||
|
||||
def send_request(self, socket, request):
|
||||
assert request.msg_type in zmq_names.MULTISEND_TYPES
|
||||
request.routing_key = zmq_address.target_to_subscribe_filter(
|
||||
request.target)
|
||||
self.sender.send(socket, request)
|
||||
|
||||
def cleanup(self):
|
||||
super(DealerPublisherProxyDynamic, self).cleanup()
|
||||
self.updater.cleanup()
|
||||
|
||||
|
||||
class DynamicPublishersUpdater(zmq_updater.UpdaterBase):
|
||||
|
||||
def __init__(self, conf, matchmaker, publishers):
|
||||
super(DynamicPublishersUpdater, self).__init__(
|
||||
conf, matchmaker, self.update_publishers,
|
||||
sleep_for=conf.oslo_messaging_zmq.zmq_target_update
|
||||
)
|
||||
self.publishers = publishers
|
||||
|
||||
def update_publishers(self):
|
||||
for _, pub_frontend in self.matchmaker.get_publishers():
|
||||
self.publishers.add(pub_frontend)
|
||||
|
@ -18,32 +18,13 @@ import logging
|
||||
import six
|
||||
|
||||
import oslo_messaging
|
||||
from oslo_messaging._drivers import common as rpc_common
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_async
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_names
|
||||
from oslo_messaging._i18n import _LE
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
zmq = zmq_async.import_zmq()
|
||||
|
||||
|
||||
class UnsupportedSendPattern(rpc_common.RPCException):
|
||||
|
||||
"""Exception to raise from publishers in case of unsupported
|
||||
sending pattern called.
|
||||
"""
|
||||
|
||||
def __init__(self, pattern_name):
|
||||
"""Construct exception object
|
||||
|
||||
:param pattern_name: Message type name from zmq_names
|
||||
:type pattern_name: str
|
||||
"""
|
||||
errmsg = _LE("Sending pattern %s is unsupported.") % pattern_name
|
||||
super(UnsupportedSendPattern, self).__init__(errmsg)
|
||||
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class PublisherBase(object):
|
||||
|
||||
@ -68,16 +49,40 @@ class PublisherBase(object):
|
||||
:param receiver: reply receiver object
|
||||
:type receiver: zmq_receivers.ReplyReceiver
|
||||
"""
|
||||
self.context = zmq.Context()
|
||||
self.sockets_manager = sockets_manager
|
||||
self.conf = sockets_manager.conf
|
||||
self.matchmaker = sockets_manager.matchmaker
|
||||
self.sender = sender
|
||||
self.receiver = receiver
|
||||
|
||||
@staticmethod
|
||||
def _check_message_pattern(expected, actual):
|
||||
if expected != actual:
|
||||
raise UnsupportedSendPattern(zmq_names.message_type_str(actual))
|
||||
@abc.abstractmethod
|
||||
def acquire_connection(self, request):
|
||||
"""Get socket to publish request on it
|
||||
|
||||
:param request: request object
|
||||
:type senders: zmq_request.Request
|
||||
"""
|
||||
|
||||
@abc.abstractmethod
|
||||
def send_request(self, socket, request):
|
||||
"""Publish request on a socket
|
||||
|
||||
:param socket: socket object to publish request on
|
||||
:type socket: zmq_socket.ZmqSocket
|
||||
:param request: request object
|
||||
:type senders: zmq_request.Request
|
||||
"""
|
||||
|
||||
@abc.abstractmethod
|
||||
def receive_reply(self, socket, request):
|
||||
"""Wait for a reply via the socket used for sending the request.
|
||||
|
||||
:param socket: socket object to receive reply from
|
||||
:type socket: zmq_socket.ZmqSocket
|
||||
:param request: request object
|
||||
:type senders: zmq_request.Request
|
||||
"""
|
||||
|
||||
@staticmethod
|
||||
def _raise_timeout(request):
|
||||
@ -86,37 +91,6 @@ class PublisherBase(object):
|
||||
{"tout": request.timeout, "msg_id": request.message_id}
|
||||
)
|
||||
|
||||
@abc.abstractmethod
|
||||
def _send_request(self, request):
|
||||
"""Send the request and return a socket used for that.
|
||||
Return value of None means some failure (e.g. connection
|
||||
can't be established, etc).
|
||||
"""
|
||||
|
||||
@abc.abstractmethod
|
||||
def _recv_reply(self, request, socket):
|
||||
"""Wait for a reply via the socket used for sending the request."""
|
||||
|
||||
def send_call(self, request):
|
||||
self._check_message_pattern(zmq_names.CALL_TYPE, request.msg_type)
|
||||
socket = self._send_request(request)
|
||||
if not socket:
|
||||
raise self._raise_timeout(request)
|
||||
return self._recv_reply(request, socket)
|
||||
|
||||
def send_cast(self, request):
|
||||
self._check_message_pattern(zmq_names.CAST_TYPE, request.msg_type)
|
||||
self._send_request(request)
|
||||
|
||||
def send_fanout(self, request):
|
||||
self._check_message_pattern(zmq_names.CAST_FANOUT_TYPE,
|
||||
request.msg_type)
|
||||
self._send_request(request)
|
||||
|
||||
def send_notify(self, request):
|
||||
self._check_message_pattern(zmq_names.NOTIFY_TYPE, request.msg_type)
|
||||
self._send_request(request)
|
||||
|
||||
def cleanup(self):
|
||||
"""Cleanup publisher. Close allocated connections."""
|
||||
self.receiver.stop()
|
||||
|
@ -15,6 +15,7 @@
|
||||
from concurrent import futures
|
||||
import logging
|
||||
|
||||
from oslo_messaging._drivers.zmq_driver.client import zmq_publisher_manager
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_async
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_names
|
||||
from oslo_messaging._i18n import _LE, _LW
|
||||
@ -24,38 +25,10 @@ LOG = logging.getLogger(__name__)
|
||||
zmq = zmq_async.import_zmq()
|
||||
|
||||
|
||||
class AckManagerBase(object):
|
||||
class AckManager(zmq_publisher_manager.PublisherManagerBase):
|
||||
|
||||
def __init__(self, publisher):
|
||||
self.publisher = publisher
|
||||
self.conf = publisher.conf
|
||||
self.sender = publisher.sender
|
||||
self.receiver = publisher.receiver
|
||||
|
||||
def send_call(self, request):
|
||||
return self.publisher.send_call(request)
|
||||
|
||||
def send_cast(self, request):
|
||||
self.publisher.send_cast(request)
|
||||
|
||||
def send_fanout(self, request):
|
||||
self.publisher.send_fanout(request)
|
||||
|
||||
def send_notify(self, request):
|
||||
self.publisher.send_notify(request)
|
||||
|
||||
def cleanup(self):
|
||||
self.publisher.cleanup()
|
||||
|
||||
|
||||
class AckManagerDirect(AckManagerBase):
|
||||
pass
|
||||
|
||||
|
||||
class AckManagerProxy(AckManagerBase):
|
||||
|
||||
def __init__(self, publisher):
|
||||
super(AckManagerProxy, self).__init__(publisher)
|
||||
super(AckManager, self).__init__(publisher)
|
||||
self._pool = zmq_async.get_pool(
|
||||
size=self.conf.oslo_messaging_zmq.rpc_thread_pool_size
|
||||
)
|
||||
@ -99,33 +72,39 @@ class AckManagerProxy(AckManagerBase):
|
||||
if request.msg_type != zmq_names.CALL_TYPE:
|
||||
self.receiver.untrack_request(request)
|
||||
|
||||
def _send_request_and_get_ack_future(self, request):
|
||||
socket = self.publisher._send_request(request)
|
||||
if not socket:
|
||||
return None
|
||||
def _schedule_request_for_ack(self, request):
|
||||
socket = self.publisher.acquire_connection(request)
|
||||
self.publisher.send_request(socket, request)
|
||||
self.receiver.register_socket(socket)
|
||||
ack_future = self.receiver.track_request(request)[zmq_names.ACK_TYPE]
|
||||
futures_by_type = self.receiver.track_request(request)
|
||||
ack_future = futures_by_type[zmq_names.ACK_TYPE]
|
||||
ack_future.request = request
|
||||
ack_future.socket = socket
|
||||
return ack_future
|
||||
|
||||
@zmq_publisher_manager.target_not_found_timeout
|
||||
def send_call(self, request):
|
||||
ack_future = self._send_request_and_get_ack_future(request)
|
||||
if not ack_future:
|
||||
self.publisher._raise_timeout(request)
|
||||
self._pool.submit(self._wait_for_ack, ack_future)
|
||||
try:
|
||||
return self.publisher._recv_reply(request, ack_future.socket)
|
||||
ack_future = self._schedule_request_for_ack(request)
|
||||
self._pool.submit(self._wait_for_ack, ack_future)
|
||||
return self.publisher.receive_reply(ack_future.socket, request)
|
||||
finally:
|
||||
if not ack_future.done():
|
||||
ack_future.set_result((None, None))
|
||||
|
||||
@zmq_publisher_manager.target_not_found_warn
|
||||
def send_cast(self, request):
|
||||
ack_future = self._send_request_and_get_ack_future(request)
|
||||
if not ack_future:
|
||||
return
|
||||
ack_future = self._schedule_request_for_ack(request)
|
||||
self._pool.submit(self._wait_for_ack, ack_future)
|
||||
|
||||
@zmq_publisher_manager.target_not_found_warn
|
||||
def _send_request(self, request):
|
||||
socket = self.publisher.acquire_connection(request)
|
||||
self.publisher.send_request(socket, request)
|
||||
|
||||
def cleanup(self):
|
||||
self._pool.shutdown(wait=True)
|
||||
super(AckManagerProxy, self).cleanup()
|
||||
super(AckManager, self).cleanup()
|
||||
|
||||
send_fanout = _send_request
|
||||
send_notify = _send_request
|
||||
|
@ -14,11 +14,6 @@
|
||||
|
||||
|
||||
from oslo_messaging._drivers import common
|
||||
from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \
|
||||
import zmq_dealer_publisher_direct
|
||||
from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \
|
||||
import zmq_dealer_publisher_proxy
|
||||
from oslo_messaging._drivers.zmq_driver.client import zmq_ack_manager
|
||||
from oslo_messaging._drivers.zmq_driver.client import zmq_client_base
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_async
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_names
|
||||
@ -44,17 +39,9 @@ class ZmqClientMixDirectPubSub(zmq_client_base.ZmqClientBase):
|
||||
conf.oslo_messaging_zmq.use_pub_sub:
|
||||
raise WrongClientException()
|
||||
|
||||
publisher_direct = self.create_publisher(
|
||||
conf, matchmaker,
|
||||
zmq_dealer_publisher_direct.DealerPublisherDirect,
|
||||
zmq_ack_manager.AckManagerDirect
|
||||
)
|
||||
|
||||
publisher_proxy = self.create_publisher(
|
||||
conf, matchmaker,
|
||||
zmq_dealer_publisher_proxy.DealerPublisherProxy,
|
||||
zmq_ack_manager.AckManagerProxy
|
||||
)
|
||||
publisher_direct = self._create_publisher_direct(conf, matchmaker)
|
||||
publisher_proxy = self._create_publisher_proxy_dynamic(conf,
|
||||
matchmaker)
|
||||
|
||||
super(ZmqClientMixDirectPubSub, self).__init__(
|
||||
conf, matchmaker, allowed_remote_exmods,
|
||||
@ -80,15 +67,10 @@ class ZmqClientDirect(zmq_client_base.ZmqClientBase):
|
||||
conf.oslo_messaging_zmq.use_router_proxy:
|
||||
raise WrongClientException()
|
||||
|
||||
publisher = self.create_publisher(
|
||||
conf, matchmaker,
|
||||
zmq_dealer_publisher_direct.DealerPublisherDirect,
|
||||
zmq_ack_manager.AckManagerDirect
|
||||
)
|
||||
|
||||
super(ZmqClientDirect, self).__init__(
|
||||
conf, matchmaker, allowed_remote_exmods,
|
||||
publishers={"default": publisher}
|
||||
publishers={"default": self._create_publisher_direct(
|
||||
conf, matchmaker)}
|
||||
)
|
||||
|
||||
|
||||
@ -107,13 +89,8 @@ class ZmqClientProxy(zmq_client_base.ZmqClientBase):
|
||||
if not conf.oslo_messaging_zmq.use_router_proxy:
|
||||
raise WrongClientException()
|
||||
|
||||
publisher = self.create_publisher(
|
||||
conf, matchmaker,
|
||||
zmq_dealer_publisher_proxy.DealerPublisherProxy,
|
||||
zmq_ack_manager.AckManagerProxy
|
||||
)
|
||||
|
||||
super(ZmqClientProxy, self).__init__(
|
||||
conf, matchmaker, allowed_remote_exmods,
|
||||
publishers={"default": publisher}
|
||||
publishers={"default": self._create_publisher_proxy(
|
||||
conf, matchmaker)}
|
||||
)
|
||||
|
@ -12,6 +12,12 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \
|
||||
import zmq_dealer_publisher_direct
|
||||
from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \
|
||||
import zmq_dealer_publisher_proxy
|
||||
from oslo_messaging._drivers.zmq_driver.client import zmq_ack_manager
|
||||
from oslo_messaging._drivers.zmq_driver.client import zmq_publisher_manager
|
||||
from oslo_messaging._drivers.zmq_driver.client import zmq_request
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_async
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_names
|
||||
@ -37,13 +43,6 @@ class ZmqClientBase(object):
|
||||
self.notify_publisher = publishers.get(zmq_names.NOTIFY_TYPE,
|
||||
publishers["default"])
|
||||
|
||||
@staticmethod
|
||||
def create_publisher(conf, matchmaker, publisher_cls, ack_manager_cls):
|
||||
publisher = publisher_cls(conf, matchmaker)
|
||||
if conf.oslo_messaging_zmq.rpc_use_acks:
|
||||
publisher = ack_manager_cls(publisher)
|
||||
return publisher
|
||||
|
||||
def send_call(self, target, context, message, timeout=None, retry=None):
|
||||
request = zmq_request.CallRequest(
|
||||
target, context=context, message=message, retry=retry,
|
||||
@ -70,6 +69,27 @@ class ZmqClientBase(object):
|
||||
)
|
||||
self.notify_publisher.send_notify(request)
|
||||
|
||||
@staticmethod
|
||||
def _create_publisher_direct(conf, matchmaker):
|
||||
publisher_direct = zmq_dealer_publisher_direct.DealerPublisherDirect(
|
||||
conf, matchmaker)
|
||||
return zmq_publisher_manager.PublisherManagerDynamic(
|
||||
publisher_direct)
|
||||
|
||||
@staticmethod
|
||||
def _create_publisher_proxy(conf, matchmaker):
|
||||
publisher_proxy = zmq_dealer_publisher_proxy.DealerPublisherProxy(
|
||||
conf, matchmaker)
|
||||
return zmq_ack_manager.AckManager(publisher_proxy) \
|
||||
if conf.oslo_messaging_zmq.rpc_use_acks else \
|
||||
zmq_publisher_manager.PublisherManagerStatic(publisher_proxy)
|
||||
|
||||
@staticmethod
|
||||
def _create_publisher_proxy_dynamic(conf, matchmaker):
|
||||
return zmq_publisher_manager.PublisherManagerDynamic(
|
||||
zmq_dealer_publisher_proxy.DealerPublisherProxyDynamic(
|
||||
conf, matchmaker))
|
||||
|
||||
def cleanup(self):
|
||||
cleaned = set()
|
||||
for publisher in self.publishers.values():
|
||||
|
@ -0,0 +1,143 @@
|
||||
# Copyright 2016 Mirantis, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import abc
|
||||
import contextlib
|
||||
import logging
|
||||
|
||||
import retrying
|
||||
import six
|
||||
|
||||
from oslo_messaging._drivers.zmq_driver.matchmaker import zmq_matchmaker_base
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_async
|
||||
from oslo_messaging._i18n import _LW
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
zmq = zmq_async.import_zmq()
|
||||
|
||||
|
||||
def target_not_found_warn(func):
|
||||
def _target_not_found_warn(self, request, *args, **kwargs):
|
||||
try:
|
||||
return func(self, request, *args, **kwargs)
|
||||
except (zmq_matchmaker_base.MatchmakerUnavailable,
|
||||
retrying.RetryError):
|
||||
LOG.warning(_LW("Matchmaker contains no records for specified "
|
||||
"target %(target)s. Dropping message %(msg_id)s.")
|
||||
% {"target": request.target,
|
||||
"msg_id": request.message_id})
|
||||
return _target_not_found_warn
|
||||
|
||||
|
||||
def target_not_found_timeout(func):
|
||||
def _target_not_found_timeout(self, request, *args, **kwargs):
|
||||
try:
|
||||
return func(self, request, *args, **kwargs)
|
||||
except (zmq_matchmaker_base.MatchmakerUnavailable,
|
||||
retrying.RetryError):
|
||||
self.publisher._raise_timeout(request)
|
||||
return _target_not_found_timeout
|
||||
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class PublisherManagerBase(object):
|
||||
|
||||
"""Abstract publisher manager class
|
||||
|
||||
Publisher knows how to establish connection, how to send message,
|
||||
and how to receive reply. PublisherManager coordinates all these steps
|
||||
regarding retrying logic in AckManager implementations
|
||||
"""
|
||||
|
||||
def __init__(self, publisher):
|
||||
self.publisher = publisher
|
||||
self.conf = publisher.conf
|
||||
self.sender = publisher.sender
|
||||
self.receiver = publisher.receiver
|
||||
|
||||
@abc.abstractmethod
|
||||
def send_call(self, request):
|
||||
"""Send call request
|
||||
|
||||
:param request: request object
|
||||
:type senders: zmq_request.Request
|
||||
"""
|
||||
|
||||
@abc.abstractmethod
|
||||
def send_cast(self, request):
|
||||
"""Send call request
|
||||
|
||||
:param request: request object
|
||||
:type senders: zmq_request.Request
|
||||
"""
|
||||
|
||||
@abc.abstractmethod
|
||||
def send_fanout(self, request):
|
||||
"""Send call request
|
||||
|
||||
:param request: request object
|
||||
:type senders: zmq_request.Request
|
||||
"""
|
||||
|
||||
@abc.abstractmethod
|
||||
def send_notify(self, request):
|
||||
"""Send call request
|
||||
|
||||
:param request: request object
|
||||
:type senders: zmq_request.Request
|
||||
"""
|
||||
|
||||
def cleanup(self):
|
||||
self.publisher.cleanup()
|
||||
|
||||
|
||||
class PublisherManagerDynamic(PublisherManagerBase):
|
||||
|
||||
@target_not_found_timeout
|
||||
def send_call(self, request):
|
||||
with contextlib.closing(
|
||||
self.publisher.acquire_connection(request)) as socket:
|
||||
self.publisher.send_request(socket, request)
|
||||
reply = self.publisher.receive_reply(socket, request)
|
||||
return reply
|
||||
|
||||
@target_not_found_warn
|
||||
def _send(self, request):
|
||||
with contextlib.closing(self.publisher.acquire_connection(request)) \
|
||||
as socket:
|
||||
self.publisher.send_request(socket, request)
|
||||
|
||||
send_cast = _send
|
||||
send_fanout = _send
|
||||
send_notify = _send
|
||||
|
||||
|
||||
class PublisherManagerStatic(PublisherManagerBase):
|
||||
|
||||
@target_not_found_timeout
|
||||
def send_call(self, request):
|
||||
socket = self.publisher.acquire_connection(request)
|
||||
self.publisher.send_request(socket, request)
|
||||
reply = self.publisher.receive_reply(socket, request)
|
||||
return reply
|
||||
|
||||
@target_not_found_warn
|
||||
def _send(self, request):
|
||||
socket = self.publisher.acquire_connection(request)
|
||||
self.publisher.send_request(socket, request)
|
||||
|
||||
send_cast = _send
|
||||
send_fanout = _send
|
||||
send_notify = _send
|
@ -95,8 +95,7 @@ class ReceiverBase(object):
|
||||
|
||||
def _run_loop(self):
|
||||
data, socket = self._poller.poll(
|
||||
timeout=self.conf.oslo_messaging_zmq.rpc_poll_timeout
|
||||
)
|
||||
timeout=self.conf.oslo_messaging_zmq.rpc_poll_timeout)
|
||||
if data is None:
|
||||
return
|
||||
reply_id, message_type, message_id, response = data
|
||||
|
@ -13,12 +13,16 @@
|
||||
# under the License.
|
||||
|
||||
import logging
|
||||
import retrying
|
||||
import threading
|
||||
import time
|
||||
|
||||
import itertools
|
||||
|
||||
from oslo_messaging._drivers.zmq_driver.matchmaker import zmq_matchmaker_base
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_address
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_async
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_names
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_updater
|
||||
from oslo_messaging._i18n import _LW
|
||||
|
||||
zmq = zmq_async.import_zmq()
|
||||
@ -26,75 +30,162 @@ zmq = zmq_async.import_zmq()
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class RoutingTable(object):
|
||||
"""This class implements local routing-table cache
|
||||
taken from matchmaker. Its purpose is to give the next routable
|
||||
host id (remote DEALER's id) by request for specific target in
|
||||
round-robin fashion.
|
||||
"""
|
||||
class RoutingTableAdaptor(object):
|
||||
|
||||
def __init__(self, conf, matchmaker):
|
||||
def __init__(self, conf, matchmaker, listener_type):
|
||||
self.conf = conf
|
||||
self.matchmaker = matchmaker
|
||||
self.routing_table = {}
|
||||
self.routable_hosts = {}
|
||||
self.listener_type = listener_type
|
||||
self.routing_table = RoutingTable(conf)
|
||||
self.routing_table_updater = RoutingTableUpdater(
|
||||
conf, matchmaker, self.routing_table)
|
||||
self.round_robin_targets = {}
|
||||
|
||||
def get_all_hosts(self, target):
|
||||
self._update_routing_table(
|
||||
target,
|
||||
get_hosts=self.matchmaker.get_hosts_fanout,
|
||||
get_hosts_retry=self.matchmaker.get_hosts_fanout_retry)
|
||||
return self.routable_hosts.get(str(target), [])
|
||||
def get_round_robin_host(self, target):
|
||||
target_key = zmq_address.target_to_key(
|
||||
target, zmq_names.socket_type_str(self.listener_type))
|
||||
|
||||
def get_routable_host(self, target):
|
||||
self._update_routing_table(
|
||||
target,
|
||||
get_hosts=self.matchmaker.get_hosts,
|
||||
get_hosts_retry=self.matchmaker.get_hosts_retry)
|
||||
hosts_for_target = self.routable_hosts.get(str(target))
|
||||
if not hosts_for_target:
|
||||
# Matchmaker doesn't contain any target
|
||||
return None
|
||||
host = hosts_for_target.pop(0)
|
||||
if not hosts_for_target:
|
||||
self._renew_routable_hosts(target)
|
||||
LOG.debug("Processing target %s for round-robin." % target_key)
|
||||
|
||||
if target_key not in self.round_robin_targets:
|
||||
LOG.debug("Target %s is not in cache. Check matchmaker server."
|
||||
% target_key)
|
||||
hosts = self.matchmaker.get_hosts_retry(
|
||||
target, zmq_names.socket_type_str(self.listener_type))
|
||||
LOG.debug("Received hosts %s" % hosts)
|
||||
self.routing_table.update_hosts(target_key, hosts)
|
||||
self.round_robin_targets[target_key] = \
|
||||
self.routing_table.get_hosts_round_robin(target_key)
|
||||
|
||||
rr_gen = self.round_robin_targets[target_key]
|
||||
host = next(rr_gen)
|
||||
LOG.debug("Host resolved for the current connection is %s" % host)
|
||||
return host
|
||||
|
||||
def _is_tm_expired(self, tm):
|
||||
return 0 <= self.conf.oslo_messaging_zmq.zmq_target_expire \
|
||||
<= time.time() - tm
|
||||
def get_fanout_hosts(self, target):
|
||||
target_key = zmq_address.target_to_key(
|
||||
target, zmq_names.socket_type_str(self.listener_type))
|
||||
|
||||
def _update_routing_table(self, target, get_hosts, get_hosts_retry):
|
||||
routing_record = self.routing_table.get(str(target))
|
||||
if routing_record is None:
|
||||
self._fetch_hosts(target, get_hosts, get_hosts_retry)
|
||||
self._renew_routable_hosts(target)
|
||||
elif self._is_tm_expired(routing_record[1]):
|
||||
self._fetch_hosts(target, get_hosts, get_hosts_retry)
|
||||
LOG.debug("Processing target %s for fanout." % target_key)
|
||||
|
||||
def _fetch_hosts(self, target, get_hosts, get_hosts_retry):
|
||||
key = str(target)
|
||||
if key not in self.routing_table:
|
||||
try:
|
||||
hosts = get_hosts_retry(
|
||||
target, zmq_names.socket_type_str(zmq.DEALER))
|
||||
self.routing_table[key] = (hosts, time.time())
|
||||
except retrying.RetryError:
|
||||
LOG.warning(_LW("Matchmaker contains no hosts for target %s")
|
||||
% key)
|
||||
if not self.routing_table.contains(target_key):
|
||||
LOG.debug("Target %s is not in cache. Check matchmaker server."
|
||||
% target_key)
|
||||
hosts = self.matchmaker.get_hosts_fanout_retry(
|
||||
target, zmq_names.socket_type_str(self.listener_type))
|
||||
LOG.debug("Received hosts %s" % hosts)
|
||||
self.routing_table.update_hosts(target_key, hosts)
|
||||
else:
|
||||
try:
|
||||
hosts = get_hosts(
|
||||
target, zmq_names.socket_type_str(zmq.DEALER))
|
||||
self.routing_table[key] = (hosts, time.time())
|
||||
except zmq_matchmaker_base.MatchmakerUnavailable:
|
||||
LOG.warning(_LW("Matchmaker contains no hosts for target %s")
|
||||
% key)
|
||||
LOG.debug("Target %s has been found in cache." % target_key)
|
||||
return self.routing_table.get_hosts_fanout(target_key)
|
||||
|
||||
def cleanup(self):
|
||||
self.routing_table_updater.cleanup()
|
||||
|
||||
|
||||
class RoutingTable(object):
|
||||
|
||||
def __init__(self, conf):
|
||||
self.conf = conf
|
||||
self.targets = {}
|
||||
self._lock = threading.Lock()
|
||||
|
||||
def register(self, target_key, host):
|
||||
with self._lock:
|
||||
if target_key in self.targets:
|
||||
hosts, tm = self.targets[target_key]
|
||||
if host not in hosts:
|
||||
hosts.add(host)
|
||||
self.targets[target_key] = (hosts, self._create_tm())
|
||||
else:
|
||||
self.targets[target_key] = ({host}, self._create_tm())
|
||||
|
||||
def get_targets(self):
|
||||
with self._lock:
|
||||
return list(self.targets.keys())
|
||||
|
||||
def unregister(self, target_key, host):
|
||||
with self._lock:
|
||||
hosts, tm = self.targets.get(target_key)
|
||||
if hosts and host in hosts:
|
||||
hosts.discard(host)
|
||||
self.targets[target_key] = (hosts, self._create_tm())
|
||||
|
||||
def update_hosts(self, target_key, hosts_updated):
|
||||
with self._lock:
|
||||
if target_key in self.targets and not hosts_updated:
|
||||
self.targets.pop(target_key)
|
||||
return
|
||||
hosts_current, _ = self.targets.get(target_key, (set(), None))
|
||||
hosts_updated = set(hosts_updated)
|
||||
has_differences = hosts_updated ^ hosts_current
|
||||
if has_differences:
|
||||
self.targets[target_key] = (hosts_updated, self._create_tm())
|
||||
|
||||
def get_hosts_round_robin(self, target_key):
|
||||
while self._contains_hosts(target_key):
|
||||
for host in self._get_hosts_rr(target_key):
|
||||
yield host
|
||||
|
||||
def get_hosts_fanout(self, target_key):
|
||||
hosts, _ = self._get_hosts(target_key)
|
||||
for host in hosts:
|
||||
yield host
|
||||
|
||||
def contains(self, target_key):
|
||||
with self._lock:
|
||||
return target_key in self.targets
|
||||
|
||||
def _get_hosts(self, target_key):
|
||||
with self._lock:
|
||||
hosts, tm = self.targets.get(target_key, ([], None))
|
||||
hosts = list(hosts)
|
||||
return hosts, tm
|
||||
|
||||
def _get_tm(self, target_key):
|
||||
with self._lock:
|
||||
_, tm = self.targets.get(target_key)
|
||||
return tm
|
||||
|
||||
def _contains_hosts(self, target_key):
|
||||
with self._lock:
|
||||
return target_key in self.targets
|
||||
|
||||
def _is_target_changed(self, target_key, tm_orig):
|
||||
return self._get_tm(target_key) != tm_orig
|
||||
|
||||
@staticmethod
|
||||
def _create_tm():
|
||||
return time.time()
|
||||
|
||||
def _get_hosts_rr(self, target_key):
|
||||
hosts, tm_original = self._get_hosts(target_key)
|
||||
for host in itertools.cycle(hosts):
|
||||
if self._is_target_changed(target_key, tm_original):
|
||||
raise StopIteration()
|
||||
yield host
|
||||
|
||||
|
||||
class RoutingTableUpdater(zmq_updater.UpdaterBase):
|
||||
|
||||
def __init__(self, conf, matchmaker, routing_table):
|
||||
self.routing_table = routing_table
|
||||
super(RoutingTableUpdater, self).__init__(
|
||||
conf, matchmaker, self._update_routing_table,
|
||||
conf.oslo_messaging_zmq.zmq_target_update)
|
||||
|
||||
def _update_routing_table(self):
|
||||
target_keys = self.routing_table.get_targets()
|
||||
|
||||
def _renew_routable_hosts(self, target):
|
||||
key = str(target)
|
||||
try:
|
||||
hosts, _ = self.routing_table[key]
|
||||
self.routable_hosts[key] = list(hosts)
|
||||
except KeyError:
|
||||
self.routable_hosts[key] = []
|
||||
for target_key in target_keys:
|
||||
hosts = self.matchmaker.get_hosts_by_key(target_key)
|
||||
if not hosts:
|
||||
LOG.warning(_LW("Target %s has been removed") % target_key)
|
||||
else:
|
||||
self.routing_table.update_hosts(target_key, hosts)
|
||||
LOG.debug("Updating routing table from the matchmaker. "
|
||||
"%d target(s) updated %s." % (len(target_keys),
|
||||
target_keys))
|
||||
except zmq_matchmaker_base.MatchmakerUnavailable:
|
||||
LOG.warning(_LW("Not updated. Matchmaker was not available."))
|
||||
|
@ -45,26 +45,29 @@ class SocketsManager(object):
|
||||
def _key_from_target(target):
|
||||
return target.topic if target.fanout else str(target)
|
||||
|
||||
def _get_hosts_and_track(self, socket, target):
|
||||
self._get_hosts_and_connect(socket, target)
|
||||
self._track_socket(socket, target)
|
||||
|
||||
def _get_hosts_and_connect(self, socket, target):
|
||||
get_hosts = self.get_hosts_fanout if target.fanout else self.get_hosts
|
||||
hosts = get_hosts(target)
|
||||
self._connect_to_hosts(socket, target, hosts)
|
||||
self._connect_to_hosts(socket, hosts)
|
||||
|
||||
def _track_socket(self, socket, target):
|
||||
key = self._key_from_target(target)
|
||||
self.outbound_sockets[key] = (socket, time.time())
|
||||
|
||||
def _connect_to_hosts(self, socket, target, hosts):
|
||||
def _connect_to_hosts(self, socket, hosts):
|
||||
for host in hosts:
|
||||
socket.connect_to_host(host)
|
||||
self._track_socket(socket, target)
|
||||
|
||||
def _check_for_new_hosts(self, target):
|
||||
key = self._key_from_target(target)
|
||||
socket, tm = self.outbound_sockets[key]
|
||||
if 0 <= self.conf.oslo_messaging_zmq.zmq_target_expire \
|
||||
<= time.time() - tm:
|
||||
self._get_hosts_and_connect(socket, target)
|
||||
self._get_hosts_and_track(socket, target)
|
||||
return socket
|
||||
|
||||
def get_socket(self, target):
|
||||
@ -74,7 +77,7 @@ class SocketsManager(object):
|
||||
else:
|
||||
socket = zmq_socket.ZmqSocket(self.conf, self.zmq_context,
|
||||
self.socket_type, immediate=False)
|
||||
self._get_hosts_and_connect(socket, target)
|
||||
self._get_hosts_and_track(socket, target)
|
||||
return socket
|
||||
|
||||
def get_socket_to_publishers(self, identity=None):
|
||||
|
@ -195,6 +195,10 @@ class MatchmakerRedis(zmq_matchmaker_base.MatchmakerBase):
|
||||
def get_routers(self):
|
||||
return self._get_hosts_by_key(_ROUTERS_KEY)
|
||||
|
||||
@redis_connection_warn
|
||||
def get_hosts_by_key(self, key):
|
||||
return self._get_hosts_by_key(key)
|
||||
|
||||
def _get_hosts_by_key(self, key):
|
||||
return self._redis.smembers(key)
|
||||
|
||||
|
@ -156,8 +156,9 @@ class RouterUpdater(zmq_updater.UpdaterBase):
|
||||
self.publisher_address = publisher_address
|
||||
self.fe_router_address = fe_router_address
|
||||
self.be_router_address = be_router_address
|
||||
super(RouterUpdater, self).__init__(conf, matchmaker,
|
||||
self._update_records)
|
||||
super(RouterUpdater, self).__init__(
|
||||
conf, matchmaker, self._update_records,
|
||||
conf.oslo_messaging_zmq.zmq_target_update)
|
||||
|
||||
def _update_records(self):
|
||||
self.matchmaker.register_publisher(
|
||||
|
@ -112,8 +112,17 @@ class TargetUpdater(zmq_updater.UpdaterBase):
|
||||
self.target = target
|
||||
self.host = host
|
||||
self.socket_type = socket_type
|
||||
super(TargetUpdater, self).__init__(conf, matchmaker,
|
||||
self._update_target)
|
||||
self.conf = conf
|
||||
self.matchmaker = matchmaker
|
||||
self._sleep_for = conf.oslo_messaging_zmq.zmq_target_update
|
||||
|
||||
# NOTE(ozamiatin): Update target immediately not waiting
|
||||
# for background executor to initialize.
|
||||
self._update_target()
|
||||
|
||||
super(TargetUpdater, self).__init__(
|
||||
conf, matchmaker, self._update_target,
|
||||
conf.oslo_messaging_zmq.zmq_target_update)
|
||||
|
||||
def _update_target(self):
|
||||
try:
|
||||
|
@ -87,7 +87,7 @@ zmq_opts = [
|
||||
help='Use PUB/SUB pattern for fanout methods. '
|
||||
'PUB/SUB always uses proxy.'),
|
||||
|
||||
cfg.BoolOpt('use_router_proxy', default=True,
|
||||
cfg.BoolOpt('use_router_proxy', default=False,
|
||||
deprecated_group='DEFAULT',
|
||||
help='Use ROUTER remote proxy.'),
|
||||
|
||||
|
@ -21,7 +21,7 @@ from oslo_messaging._drivers import common as rpc_common
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_address
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_async
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_names
|
||||
from oslo_messaging._i18n import _LE, _LI
|
||||
from oslo_messaging._i18n import _LE
|
||||
from oslo_messaging import exceptions
|
||||
from oslo_serialization.serializer import json_serializer
|
||||
from oslo_serialization.serializer import msgpack_serializer
|
||||
@ -174,7 +174,9 @@ class ZmqSocket(object):
|
||||
return obj
|
||||
|
||||
def close(self, *args, **kwargs):
|
||||
identity = self.handle.identity
|
||||
self.handle.close(*args, **kwargs)
|
||||
LOG.debug("Socket %s closed" % identity)
|
||||
|
||||
def connect_to_address(self, address):
|
||||
if address in self.connections:
|
||||
@ -182,8 +184,8 @@ class ZmqSocket(object):
|
||||
stype = zmq_names.socket_type_str(self.socket_type)
|
||||
sid = self.handle.identity
|
||||
try:
|
||||
LOG.info(_LI("Connecting %(stype)s socket %(sid)s to %(address)s"),
|
||||
{"stype": stype, "sid": sid, "address": address})
|
||||
LOG.debug("Connecting %(stype)s socket %(sid)s to %(address)s",
|
||||
{"stype": stype, "sid": sid, "address": address})
|
||||
self.connect(address)
|
||||
except zmq.ZMQError as e:
|
||||
LOG.error(_LE("Failed connecting %(stype)s-%(sid)s to "
|
||||
|
@ -27,12 +27,11 @@ zmq = zmq_async.import_zmq()
|
||||
|
||||
class UpdaterBase(object):
|
||||
|
||||
def __init__(self, conf, matchmaker, update_method):
|
||||
def __init__(self, conf, matchmaker, update_method, sleep_for):
|
||||
self.conf = conf
|
||||
self.matchmaker = matchmaker
|
||||
self.update_method = update_method
|
||||
self._sleep_for = self.conf.oslo_messaging_zmq.zmq_target_update
|
||||
self.update_method()
|
||||
self._sleep_for = sleep_for
|
||||
self.executor = zmq_async.get_executor(method=self._update_loop)
|
||||
self.executor.execute()
|
||||
|
||||
@ -53,7 +52,8 @@ class ConnectionUpdater(UpdaterBase):
|
||||
def __init__(self, conf, matchmaker, socket):
|
||||
self.socket = socket
|
||||
super(ConnectionUpdater, self).__init__(
|
||||
conf, matchmaker, self._update_connection)
|
||||
conf, matchmaker, self._update_connection,
|
||||
conf.oslo_messaging_zmq.zmq_target_update)
|
||||
|
||||
@abc.abstractmethod
|
||||
def _update_connection(self):
|
||||
|
80
oslo_messaging/tests/drivers/zmq/test_routing_table.py
Normal file
80
oslo_messaging/tests/drivers/zmq/test_routing_table.py
Normal file
@ -0,0 +1,80 @@
|
||||
# Copyright 2016 Mirantis, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from oslo_messaging._drivers.zmq_driver.client import zmq_routing_table
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_async
|
||||
from oslo_messaging.tests import utils as test_utils
|
||||
|
||||
|
||||
zmq = zmq_async.import_zmq()
|
||||
|
||||
|
||||
class TestRoutingTable(test_utils.BaseTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(TestRoutingTable, self).setUp()
|
||||
|
||||
def test_get_next_while_origin_changed(self):
|
||||
table = zmq_routing_table.RoutingTable(self.conf)
|
||||
table.register("topic1.server1", "1")
|
||||
table.register("topic1.server1", "2")
|
||||
table.register("topic1.server1", "3")
|
||||
|
||||
rr_gen = table.get_hosts_round_robin("topic1.server1")
|
||||
|
||||
result = []
|
||||
for i in range(3):
|
||||
result.append(next(rr_gen))
|
||||
|
||||
self.assertEqual(3, len(result))
|
||||
self.assertIn("1", result)
|
||||
self.assertIn("2", result)
|
||||
self.assertIn("3", result)
|
||||
|
||||
table.register("topic1.server1", "4")
|
||||
table.register("topic1.server1", "5")
|
||||
table.register("topic1.server1", "6")
|
||||
|
||||
result = []
|
||||
for i in range(6):
|
||||
result.append(next(rr_gen))
|
||||
|
||||
self.assertEqual(6, len(result))
|
||||
self.assertIn("1", result)
|
||||
self.assertIn("2", result)
|
||||
self.assertIn("3", result)
|
||||
self.assertIn("4", result)
|
||||
self.assertIn("5", result)
|
||||
self.assertIn("6", result)
|
||||
|
||||
def test_no_targets(self):
|
||||
table = zmq_routing_table.RoutingTable(self.conf)
|
||||
rr_gen = table.get_hosts_round_robin("topic1.server1")
|
||||
|
||||
result = []
|
||||
for t in rr_gen:
|
||||
result.append(t)
|
||||
self.assertEqual(0, len(result))
|
||||
|
||||
def test_target_unchanged(self):
|
||||
table = zmq_routing_table.RoutingTable(self.conf)
|
||||
table.register("topic1.server1", "1")
|
||||
|
||||
rr_gen = table.get_hosts_round_robin("topic1.server1")
|
||||
|
||||
result = []
|
||||
for i in range(3):
|
||||
result.append(next(rr_gen))
|
||||
|
||||
self.assertEqual(["1", "1", "1"], result)
|
@ -165,6 +165,7 @@ class CastTestCase(utils.SkipIfNoTransportURL):
|
||||
client.append(text='stack')
|
||||
client.add(increment=2)
|
||||
client.add(increment=10)
|
||||
time.sleep(0.3)
|
||||
client.sync()
|
||||
|
||||
group.sync(1)
|
||||
@ -205,6 +206,7 @@ class CastTestCase(utils.SkipIfNoTransportURL):
|
||||
client.append(text='stack')
|
||||
client.add(increment=2)
|
||||
client.add(increment=10)
|
||||
time.sleep(0.3)
|
||||
client.sync()
|
||||
group.sync(server='all')
|
||||
for s in group.servers:
|
||||
|
@ -177,6 +177,7 @@ class RpcServerGroupFixture(fixtures.Fixture):
|
||||
if server is None:
|
||||
for i in range(len(self.servers)):
|
||||
self.client(i).ping()
|
||||
time.sleep(0.3)
|
||||
else:
|
||||
if server == 'all':
|
||||
for s in self.servers:
|
||||
|
Loading…
Reference in New Issue
Block a user