[zmq] Fix send_cast in AckManager
This patch moves message sending closer to ack receiving. As a result we get more uniform load on servers and more reliable behavior when some proxies suddenly shut down. Change-Id: I0bc7b94c8259fcaa590c111bc5437520cdd302ef
This commit is contained in:
parent
a5e2a633e3
commit
86f4b80b0c
@ -34,8 +34,7 @@ class DealerPublisherBase(zmq_publisher_base.PublisherBase):
|
||||
|
||||
def __init__(self, conf, matchmaker, sender, receiver):
|
||||
sockets_manager = zmq_sockets_manager.SocketsManager(
|
||||
conf, matchmaker, zmq.ROUTER, zmq.DEALER)
|
||||
self.socket_type = zmq.DEALER
|
||||
conf, matchmaker, zmq.DEALER)
|
||||
super(DealerPublisherBase, self).__init__(
|
||||
sockets_manager, sender, receiver)
|
||||
|
||||
|
@ -1,4 +1,4 @@
|
||||
# Copyright 2015 Mirantis, Inc.
|
||||
# Copyright 2015-2016 Mirantis, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
@ -12,7 +12,6 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
|
||||
import logging
|
||||
|
||||
from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \
|
||||
@ -22,8 +21,6 @@ from oslo_messaging._drivers.zmq_driver.client import zmq_routing_table
|
||||
from oslo_messaging._drivers.zmq_driver.client import zmq_senders
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_async
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_names
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_socket
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
@ -55,8 +52,6 @@ class DealerPublisherDirect(zmq_dealer_publisher_base.DealerPublisherBase):
|
||||
"""
|
||||
|
||||
def __init__(self, conf, matchmaker):
|
||||
self.routing_table = zmq_routing_table.RoutingTableAdaptor(
|
||||
conf, matchmaker, zmq.ROUTER)
|
||||
sender = zmq_senders.RequestSenderDirect(conf)
|
||||
if conf.oslo_messaging_zmq.rpc_use_acks:
|
||||
receiver = zmq_receivers.AckAndReplyReceiverDirect(conf)
|
||||
@ -65,6 +60,9 @@ class DealerPublisherDirect(zmq_dealer_publisher_base.DealerPublisherBase):
|
||||
super(DealerPublisherDirect, self).__init__(conf, matchmaker, sender,
|
||||
receiver)
|
||||
|
||||
self.routing_table = zmq_routing_table.RoutingTableAdaptor(
|
||||
conf, matchmaker, zmq.ROUTER)
|
||||
|
||||
def _get_round_robin_host_connection(self, target, socket):
|
||||
host = self.routing_table.get_round_robin_host(target)
|
||||
socket.connect_to_host(host)
|
||||
@ -74,8 +72,7 @@ class DealerPublisherDirect(zmq_dealer_publisher_base.DealerPublisherBase):
|
||||
socket.connect_to_host(host)
|
||||
|
||||
def acquire_connection(self, request):
|
||||
socket = zmq_socket.ZmqSocket(self.conf, self.context,
|
||||
self.socket_type, immediate=False)
|
||||
socket = self.sockets_manager.get_socket()
|
||||
if request.msg_type in zmq_names.DIRECT_TYPES:
|
||||
self._get_round_robin_host_connection(request.target, socket)
|
||||
elif request.msg_type in zmq_names.MULTISEND_TYPES:
|
||||
|
@ -1,4 +1,4 @@
|
||||
# Copyright 2015 Mirantis, Inc.
|
||||
# Copyright 2015-2016 Mirantis, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
@ -27,10 +27,8 @@ from oslo_messaging._drivers.zmq_driver.matchmaker import zmq_matchmaker_base
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_address
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_async
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_names
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_socket
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_updater
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
zmq = zmq_async.import_zmq()
|
||||
@ -47,14 +45,13 @@ class DealerPublisherProxy(zmq_dealer_publisher_base.DealerPublisherBase):
|
||||
receiver = zmq_receivers.ReplyReceiverProxy(conf)
|
||||
super(DealerPublisherProxy, self).__init__(conf, matchmaker, sender,
|
||||
receiver)
|
||||
|
||||
self.socket = self.sockets_manager.get_socket_to_publishers(
|
||||
self._generate_identity())
|
||||
|
||||
self.routing_table = zmq_routing_table.RoutingTableAdaptor(
|
||||
conf, matchmaker, zmq.DEALER)
|
||||
|
||||
self.connection_updater = \
|
||||
PublisherConnectionUpdater(self.conf, self.matchmaker, self.socket)
|
||||
self.connection_updater = PublisherConnectionUpdater(
|
||||
self.conf, self.matchmaker, self.socket)
|
||||
|
||||
def _generate_identity(self):
|
||||
return six.b(self.conf.oslo_messaging_zmq.rpc_zmq_host + "/" +
|
||||
@ -84,50 +81,49 @@ class DealerPublisherProxy(zmq_dealer_publisher_base.DealerPublisherBase):
|
||||
self.sender.send(socket, request)
|
||||
|
||||
def cleanup(self):
|
||||
super(DealerPublisherProxy, self).cleanup()
|
||||
self.routing_table.cleanup()
|
||||
self.connection_updater.stop()
|
||||
self.routing_table.cleanup()
|
||||
self.socket.close()
|
||||
super(DealerPublisherProxy, self).cleanup()
|
||||
|
||||
|
||||
class PublisherConnectionUpdater(zmq_updater.ConnectionUpdater):
|
||||
|
||||
def _update_connection(self):
|
||||
publishers = self.matchmaker.get_publishers()
|
||||
for pub_address, router_address in publishers:
|
||||
self.socket.connect_to_host(router_address)
|
||||
for pub_address, fe_router_address in publishers:
|
||||
self.socket.connect_to_host(fe_router_address)
|
||||
|
||||
|
||||
class DealerPublisherProxyDynamic(
|
||||
zmq_dealer_publisher_base.DealerPublisherBase):
|
||||
|
||||
def __init__(self, conf, matchmaker):
|
||||
sender = zmq_senders.RequestSenderProxy(conf)
|
||||
receiver = zmq_receivers.ReplyReceiverDirect(conf)
|
||||
super(DealerPublisherProxyDynamic, self).__init__(conf, matchmaker,
|
||||
sender, receiver)
|
||||
|
||||
self.publishers = set()
|
||||
self.updater = DynamicPublishersUpdater(conf, matchmaker,
|
||||
self.publishers)
|
||||
self.updater.update_publishers()
|
||||
sender = zmq_senders.RequestSenderProxy(conf)
|
||||
receiver = zmq_receivers.ReplyReceiverDirect(conf)
|
||||
super(DealerPublisherProxyDynamic, self).__init__(
|
||||
conf, matchmaker, sender, receiver)
|
||||
|
||||
def acquire_connection(self, request):
|
||||
socket = zmq_socket.ZmqSocket(self.conf, self.context,
|
||||
self.socket_type, immediate=False)
|
||||
if not self.publishers:
|
||||
raise zmq_matchmaker_base.MatchmakerUnavailable()
|
||||
socket = self.sockets_manager.get_socket()
|
||||
socket.connect_to_host(random.choice(tuple(self.publishers)))
|
||||
return socket
|
||||
|
||||
def send_request(self, socket, request):
|
||||
assert request.msg_type in zmq_names.MULTISEND_TYPES
|
||||
request.routing_key = zmq_address.target_to_subscribe_filter(
|
||||
request.target)
|
||||
request.routing_key = \
|
||||
zmq_address.target_to_subscribe_filter(request.target)
|
||||
self.sender.send(socket, request)
|
||||
|
||||
def cleanup(self):
|
||||
super(DealerPublisherProxyDynamic, self).cleanup()
|
||||
self.updater.cleanup()
|
||||
super(DealerPublisherProxyDynamic, self).cleanup()
|
||||
|
||||
|
||||
class DynamicPublishersUpdater(zmq_updater.UpdaterBase):
|
||||
@ -140,5 +136,6 @@ class DynamicPublishersUpdater(zmq_updater.UpdaterBase):
|
||||
self.publishers = publishers
|
||||
|
||||
def update_publishers(self):
|
||||
for _, pub_frontend in self.matchmaker.get_publishers():
|
||||
self.publishers.add(pub_frontend)
|
||||
publishers = self.matchmaker.get_publishers()
|
||||
for pub_address, fe_router_address in publishers:
|
||||
self.publishers.add(fe_router_address)
|
||||
|
@ -20,8 +20,8 @@ import six
|
||||
import oslo_messaging
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_async
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
zmq = zmq_async.import_zmq()
|
||||
|
||||
|
||||
@ -49,7 +49,6 @@ class PublisherBase(object):
|
||||
:param receiver: reply receiver object
|
||||
:type receiver: zmq_receivers.ReplyReceiver
|
||||
"""
|
||||
self.context = zmq.Context()
|
||||
self.sockets_manager = sockets_manager
|
||||
self.conf = sockets_manager.conf
|
||||
self.matchmaker = sockets_manager.matchmaker
|
||||
@ -94,4 +93,3 @@ class PublisherBase(object):
|
||||
def cleanup(self):
|
||||
"""Cleanup publisher. Close allocated connections."""
|
||||
self.receiver.stop()
|
||||
self.sockets_manager.cleanup()
|
||||
|
@ -33,15 +33,17 @@ class AckManager(zmq_publisher_manager.PublisherManagerBase):
|
||||
size=self.conf.oslo_messaging_zmq.rpc_thread_pool_size
|
||||
)
|
||||
|
||||
def _wait_for_ack(self, ack_future):
|
||||
request = ack_future.request
|
||||
def _wait_for_ack(self, request, ack_future=None):
|
||||
if ack_future is None:
|
||||
ack_future = self._schedule_request_for_ack(request)
|
||||
|
||||
retries = \
|
||||
request.retry or self.conf.oslo_messaging_zmq.rpc_retry_attempts
|
||||
if retries is None:
|
||||
retries = -1
|
||||
timeout = self.conf.oslo_messaging_zmq.rpc_ack_timeout_base
|
||||
|
||||
done = False
|
||||
done = ack_future is None
|
||||
while not done:
|
||||
try:
|
||||
reply_id, response = ack_future.result(timeout=timeout)
|
||||
@ -72,39 +74,41 @@ class AckManager(zmq_publisher_manager.PublisherManagerBase):
|
||||
if request.msg_type != zmq_names.CALL_TYPE:
|
||||
self.receiver.untrack_request(request)
|
||||
|
||||
def _schedule_request_for_ack(self, request):
|
||||
@zmq_publisher_manager.target_not_found_warn
|
||||
def _send_request(self, request):
|
||||
socket = self.publisher.acquire_connection(request)
|
||||
self.publisher.send_request(socket, request)
|
||||
return socket
|
||||
|
||||
def _schedule_request_for_ack(self, request):
|
||||
socket = self._send_request(request)
|
||||
if socket is None:
|
||||
return None
|
||||
self.receiver.register_socket(socket)
|
||||
futures_by_type = self.receiver.track_request(request)
|
||||
ack_future = futures_by_type[zmq_names.ACK_TYPE]
|
||||
ack_future.request = request
|
||||
ack_future = self.receiver.track_request(request)[zmq_names.ACK_TYPE]
|
||||
ack_future.socket = socket
|
||||
return ack_future
|
||||
|
||||
@zmq_publisher_manager.target_not_found_timeout
|
||||
def send_call(self, request):
|
||||
try:
|
||||
ack_future = self._schedule_request_for_ack(request)
|
||||
self._pool.submit(self._wait_for_ack, ack_future)
|
||||
if ack_future is None:
|
||||
self.publisher._raise_timeout(request)
|
||||
self._pool.submit(self._wait_for_ack, request, ack_future)
|
||||
try:
|
||||
return self.publisher.receive_reply(ack_future.socket, request)
|
||||
finally:
|
||||
if not ack_future.done():
|
||||
ack_future.set_result((None, None))
|
||||
|
||||
@zmq_publisher_manager.target_not_found_warn
|
||||
def send_cast(self, request):
|
||||
ack_future = self._schedule_request_for_ack(request)
|
||||
self._pool.submit(self._wait_for_ack, ack_future)
|
||||
self._pool.submit(self._wait_for_ack, request)
|
||||
|
||||
@zmq_publisher_manager.target_not_found_warn
|
||||
def _send_request(self, request):
|
||||
socket = self.publisher.acquire_connection(request)
|
||||
self.publisher.send_request(socket, request)
|
||||
def send_fanout(self, request):
|
||||
self._send_request(request)
|
||||
|
||||
def send_notify(self, request):
|
||||
self._send_request(request)
|
||||
|
||||
def cleanup(self):
|
||||
self._pool.shutdown(wait=True)
|
||||
super(AckManager, self).cleanup()
|
||||
|
||||
send_fanout = _send_request
|
||||
send_notify = _send_request
|
||||
|
@ -1,4 +1,4 @@
|
||||
# Copyright 2015 Mirantis, Inc.
|
||||
# Copyright 2015-2016 Mirantis, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
@ -12,7 +12,6 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
|
||||
from oslo_messaging._drivers import common
|
||||
from oslo_messaging._drivers.zmq_driver.client import zmq_client_base
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_async
|
||||
@ -69,8 +68,9 @@ class ZmqClientDirect(zmq_client_base.ZmqClientBase):
|
||||
|
||||
super(ZmqClientDirect, self).__init__(
|
||||
conf, matchmaker, allowed_remote_exmods,
|
||||
publishers={"default": self._create_publisher_direct(
|
||||
conf, matchmaker)}
|
||||
publishers={
|
||||
"default": self._create_publisher_direct(conf, matchmaker)
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
@ -91,6 +91,7 @@ class ZmqClientProxy(zmq_client_base.ZmqClientBase):
|
||||
|
||||
super(ZmqClientProxy, self).__init__(
|
||||
conf, matchmaker, allowed_remote_exmods,
|
||||
publishers={"default": self._create_publisher_proxy(
|
||||
conf, matchmaker)}
|
||||
publishers={
|
||||
"default": self._create_publisher_proxy(conf, matchmaker)
|
||||
}
|
||||
)
|
||||
|
@ -1,4 +1,4 @@
|
||||
# Copyright 2015 Mirantis, Inc.
|
||||
# Copyright 2015-2016 Mirantis, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
@ -73,8 +73,7 @@ class ZmqClientBase(object):
|
||||
def _create_publisher_direct(conf, matchmaker):
|
||||
publisher_direct = zmq_dealer_publisher_direct.DealerPublisherDirect(
|
||||
conf, matchmaker)
|
||||
return zmq_publisher_manager.PublisherManagerDynamic(
|
||||
publisher_direct)
|
||||
return zmq_publisher_manager.PublisherManagerDynamic(publisher_direct)
|
||||
|
||||
@staticmethod
|
||||
def _create_publisher_proxy(conf, matchmaker):
|
||||
@ -86,9 +85,10 @@ class ZmqClientBase(object):
|
||||
|
||||
@staticmethod
|
||||
def _create_publisher_proxy_dynamic(conf, matchmaker):
|
||||
return zmq_publisher_manager.PublisherManagerDynamic(
|
||||
publisher_proxy = \
|
||||
zmq_dealer_publisher_proxy.DealerPublisherProxyDynamic(
|
||||
conf, matchmaker))
|
||||
conf, matchmaker)
|
||||
return zmq_publisher_manager.PublisherManagerDynamic(publisher_proxy)
|
||||
|
||||
def cleanup(self):
|
||||
cleaned = set()
|
||||
|
@ -28,16 +28,20 @@ LOG = logging.getLogger(__name__)
|
||||
zmq = zmq_async.import_zmq()
|
||||
|
||||
|
||||
def _drop_message_warn(request):
|
||||
LOG.warning(_LW("Matchmaker contains no records for specified "
|
||||
"target %(target)s. Dropping message %(msg_id)s.")
|
||||
% {"target": request.target,
|
||||
"msg_id": request.message_id})
|
||||
|
||||
|
||||
def target_not_found_warn(func):
|
||||
def _target_not_found_warn(self, request, *args, **kwargs):
|
||||
try:
|
||||
return func(self, request, *args, **kwargs)
|
||||
except (zmq_matchmaker_base.MatchmakerUnavailable,
|
||||
retrying.RetryError):
|
||||
LOG.warning(_LW("Matchmaker contains no records for specified "
|
||||
"target %(target)s. Dropping message %(msg_id)s.")
|
||||
% {"target": request.target,
|
||||
"msg_id": request.message_id})
|
||||
_drop_message_warn(request)
|
||||
return _target_not_found_warn
|
||||
|
||||
|
||||
@ -47,6 +51,7 @@ def target_not_found_timeout(func):
|
||||
return func(self, request, *args, **kwargs)
|
||||
except (zmq_matchmaker_base.MatchmakerUnavailable,
|
||||
retrying.RetryError):
|
||||
_drop_message_warn(request)
|
||||
self.publisher._raise_timeout(request)
|
||||
return _target_not_found_timeout
|
||||
|
||||
@ -72,31 +77,31 @@ class PublisherManagerBase(object):
|
||||
"""Send call request
|
||||
|
||||
:param request: request object
|
||||
:type senders: zmq_request.Request
|
||||
:type request: zmq_request.CallRequest
|
||||
"""
|
||||
|
||||
@abc.abstractmethod
|
||||
def send_cast(self, request):
|
||||
"""Send call request
|
||||
"""Send cast request
|
||||
|
||||
:param request: request object
|
||||
:type senders: zmq_request.Request
|
||||
:type request: zmq_request.CastRequest
|
||||
"""
|
||||
|
||||
@abc.abstractmethod
|
||||
def send_fanout(self, request):
|
||||
"""Send call request
|
||||
"""Send fanout request
|
||||
|
||||
:param request: request object
|
||||
:type senders: zmq_request.Request
|
||||
:type request: zmq_request.FanoutRequest
|
||||
"""
|
||||
|
||||
@abc.abstractmethod
|
||||
def send_notify(self, request):
|
||||
"""Send call request
|
||||
"""Send notification request
|
||||
|
||||
:param request: request object
|
||||
:type senders: zmq_request.Request
|
||||
:type request: zmq_request.NotificationRequest
|
||||
"""
|
||||
|
||||
def cleanup(self):
|
||||
@ -107,8 +112,8 @@ class PublisherManagerDynamic(PublisherManagerBase):
|
||||
|
||||
@target_not_found_timeout
|
||||
def send_call(self, request):
|
||||
with contextlib.closing(
|
||||
self.publisher.acquire_connection(request)) as socket:
|
||||
with contextlib.closing(self.publisher.acquire_connection(request)) \
|
||||
as socket:
|
||||
self.publisher.send_request(socket, request)
|
||||
reply = self.publisher.receive_reply(socket, request)
|
||||
return reply
|
||||
|
@ -12,12 +12,11 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import itertools
|
||||
import logging
|
||||
import threading
|
||||
import time
|
||||
|
||||
import itertools
|
||||
|
||||
from oslo_messaging._drivers.zmq_driver.matchmaker import zmq_matchmaker_base
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_address
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_async
|
||||
@ -25,10 +24,10 @@ from oslo_messaging._drivers.zmq_driver import zmq_names
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_updater
|
||||
from oslo_messaging._i18n import _LW
|
||||
|
||||
zmq = zmq_async.import_zmq()
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
zmq = zmq_async.import_zmq()
|
||||
|
||||
|
||||
class RoutingTableAdaptor(object):
|
||||
|
||||
@ -63,8 +62,8 @@ class RoutingTableAdaptor(object):
|
||||
return host
|
||||
|
||||
def get_fanout_hosts(self, target):
|
||||
target_key = zmq_address.target_to_key(
|
||||
target, zmq_names.socket_type_str(self.listener_type))
|
||||
target_key = zmq_address.prefix_str(
|
||||
target.topic, zmq_names.socket_type_str(self.listener_type))
|
||||
|
||||
LOG.debug("Processing target %s for fanout." % target_key)
|
||||
|
||||
@ -123,14 +122,13 @@ class RoutingTable(object):
|
||||
self.targets[target_key] = (hosts_updated, self._create_tm())
|
||||
|
||||
def get_hosts_round_robin(self, target_key):
|
||||
while self._contains_hosts(target_key):
|
||||
while self.contains(target_key):
|
||||
for host in self._get_hosts_rr(target_key):
|
||||
yield host
|
||||
|
||||
def get_hosts_fanout(self, target_key):
|
||||
hosts, _ = self._get_hosts(target_key)
|
||||
for host in hosts:
|
||||
yield host
|
||||
return hosts
|
||||
|
||||
def contains(self, target_key):
|
||||
with self._lock:
|
||||
@ -147,10 +145,6 @@ class RoutingTable(object):
|
||||
_, tm = self.targets.get(target_key)
|
||||
return tm
|
||||
|
||||
def _contains_hosts(self, target_key):
|
||||
with self._lock:
|
||||
return target_key in self.targets
|
||||
|
||||
def _is_target_changed(self, target_key, tm_orig):
|
||||
return self._get_tm(target_key) != tm_orig
|
||||
|
||||
|
@ -12,10 +12,7 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import time
|
||||
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_async
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_names
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_socket
|
||||
|
||||
zmq = zmq_async.import_zmq()
|
||||
@ -23,61 +20,17 @@ zmq = zmq_async.import_zmq()
|
||||
|
||||
class SocketsManager(object):
|
||||
|
||||
def __init__(self, conf, matchmaker, listener_type, socket_type):
|
||||
def __init__(self, conf, matchmaker, socket_type):
|
||||
self.conf = conf
|
||||
self.matchmaker = matchmaker
|
||||
self.listener_type = listener_type
|
||||
self.socket_type = socket_type
|
||||
self.zmq_context = zmq.Context()
|
||||
self.outbound_sockets = {}
|
||||
self.socket_to_publishers = None
|
||||
self.socket_to_routers = None
|
||||
|
||||
def get_hosts(self, target):
|
||||
return self.matchmaker.get_hosts_retry(
|
||||
target, zmq_names.socket_type_str(self.listener_type))
|
||||
|
||||
def get_hosts_fanout(self, target):
|
||||
return self.matchmaker.get_hosts_fanout_retry(
|
||||
target, zmq_names.socket_type_str(self.listener_type))
|
||||
|
||||
@staticmethod
|
||||
def _key_from_target(target):
|
||||
return target.topic if target.fanout else str(target)
|
||||
|
||||
def _get_hosts_and_track(self, socket, target):
|
||||
self._get_hosts_and_connect(socket, target)
|
||||
self._track_socket(socket, target)
|
||||
|
||||
def _get_hosts_and_connect(self, socket, target):
|
||||
get_hosts = self.get_hosts_fanout if target.fanout else self.get_hosts
|
||||
hosts = get_hosts(target)
|
||||
self._connect_to_hosts(socket, hosts)
|
||||
|
||||
def _track_socket(self, socket, target):
|
||||
key = self._key_from_target(target)
|
||||
self.outbound_sockets[key] = (socket, time.time())
|
||||
|
||||
def _connect_to_hosts(self, socket, hosts):
|
||||
for host in hosts:
|
||||
socket.connect_to_host(host)
|
||||
|
||||
def _check_for_new_hosts(self, target):
|
||||
key = self._key_from_target(target)
|
||||
socket, tm = self.outbound_sockets[key]
|
||||
if 0 <= self.conf.oslo_messaging_zmq.zmq_target_expire \
|
||||
<= time.time() - tm:
|
||||
self._get_hosts_and_track(socket, target)
|
||||
return socket
|
||||
|
||||
def get_socket(self, target):
|
||||
key = self._key_from_target(target)
|
||||
if key in self.outbound_sockets:
|
||||
socket = self._check_for_new_hosts(target)
|
||||
else:
|
||||
def get_socket(self):
|
||||
socket = zmq_socket.ZmqSocket(self.conf, self.zmq_context,
|
||||
self.socket_type, immediate=False)
|
||||
self._get_hosts_and_track(socket, target)
|
||||
return socket
|
||||
|
||||
def get_socket_to_publishers(self, identity=None):
|
||||
@ -88,8 +41,8 @@ class SocketsManager(object):
|
||||
immediate=self.conf.oslo_messaging_zmq.zmq_immediate,
|
||||
identity=identity)
|
||||
publishers = self.matchmaker.get_publishers()
|
||||
for pub_address, router_address in publishers:
|
||||
self.socket_to_publishers.connect_to_host(router_address)
|
||||
for pub_address, fe_router_address in publishers:
|
||||
self.socket_to_publishers.connect_to_host(fe_router_address)
|
||||
return self.socket_to_publishers
|
||||
|
||||
def get_socket_to_routers(self, identity=None):
|
||||
@ -100,10 +53,6 @@ class SocketsManager(object):
|
||||
immediate=self.conf.oslo_messaging_zmq.zmq_immediate,
|
||||
identity=identity)
|
||||
routers = self.matchmaker.get_routers()
|
||||
for router_address in routers:
|
||||
self.socket_to_routers.connect_to_host(router_address)
|
||||
for be_router_address in routers:
|
||||
self.socket_to_routers.connect_to_host(be_router_address)
|
||||
return self.socket_to_routers
|
||||
|
||||
def cleanup(self):
|
||||
for socket, tm in self.outbound_sockets.values():
|
||||
socket.close()
|
||||
|
@ -41,7 +41,7 @@ class DealerConsumer(zmq_consumer_base.SingleSocketConsumer):
|
||||
def __init__(self, conf, poller, server):
|
||||
self.reply_sender = zmq_senders.ReplySenderProxy(conf)
|
||||
self.sockets_manager = zmq_sockets_manager.SocketsManager(
|
||||
conf, server.matchmaker, zmq.ROUTER, zmq.DEALER)
|
||||
conf, server.matchmaker, zmq.DEALER)
|
||||
self.host = None
|
||||
super(DealerConsumer, self).__init__(conf, poller, server, zmq.DEALER)
|
||||
self.connection_updater = ConsumerConnectionUpdater(
|
||||
|
Loading…
Reference in New Issue
Block a user