[zmq] Send fanouts without pub/sub in background

Change-Id: Ibfab90bb1dac06cd54671bc9a358927b3519ce63
This commit is contained in:
Gevorg Davoian 2016-10-31 19:50:36 +02:00
parent ea8fad47a5
commit feefead288
10 changed files with 114 additions and 56 deletions

View File

@ -20,7 +20,6 @@ from oslo_messaging._drivers.zmq_driver.client import zmq_senders
from oslo_messaging._drivers.zmq_driver import zmq_async from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names from oslo_messaging._drivers.zmq_driver import zmq_names
zmq = zmq_async.import_zmq() zmq = zmq_async.import_zmq()

View File

@ -28,7 +28,6 @@ from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names from oslo_messaging._drivers.zmq_driver import zmq_names
from oslo_messaging._drivers.zmq_driver import zmq_updater from oslo_messaging._drivers.zmq_driver import zmq_updater
zmq = zmq_async.import_zmq() zmq = zmq_async.import_zmq()

View File

@ -1,4 +1,4 @@
# Copyright 2015 Mirantis, Inc. # Copyright 2015-2016 Mirantis, Inc.
# #
# Licensed under the Apache License, Version 2.0 (the "License"); you may # Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain # not use this file except in compliance with the License. You may obtain
@ -19,7 +19,6 @@ import six
import oslo_messaging import oslo_messaging
from oslo_messaging._drivers.zmq_driver import zmq_async from oslo_messaging._drivers.zmq_driver import zmq_async
zmq = zmq_async.import_zmq() zmq = zmq_async.import_zmq()
@ -89,5 +88,7 @@ class PublisherBase(object):
) )
def cleanup(self): def cleanup(self):
"""Cleanup publisher. Close allocated connections.""" """Cleanup publisher: stop receiving responses, close allocated
connections etc.
"""
self.receiver.stop() self.receiver.stop()

View File

@ -29,10 +29,7 @@ zmq = zmq_async.import_zmq()
class AckManager(zmq_publisher_manager.PublisherManagerBase): class AckManager(zmq_publisher_manager.PublisherManagerBase):
def __init__(self, publisher): def __init__(self, publisher):
super(AckManager, self).__init__(publisher) super(AckManager, self).__init__(publisher, with_pool=True)
self._pool = zmq_async.get_pool(
size=self.conf.oslo_messaging_zmq.rpc_thread_pool_size
)
@staticmethod @staticmethod
def _check_ack(ack, request): def _check_ack(ack, request):
@ -98,7 +95,7 @@ class AckManager(zmq_publisher_manager.PublisherManagerBase):
ack_future = self._schedule_request_for_ack(request) ack_future = self._schedule_request_for_ack(request)
if ack_future is None: if ack_future is None:
self.publisher._raise_timeout(request) self.publisher._raise_timeout(request)
self._pool.submit(self._wait_for_ack, request, ack_future) self.pool.submit(self._wait_for_ack, request, ack_future)
try: try:
return self.publisher.receive_reply(ack_future.socket, request) return self.publisher.receive_reply(ack_future.socket, request)
finally: finally:
@ -106,14 +103,16 @@ class AckManager(zmq_publisher_manager.PublisherManagerBase):
ack_future.set_result(None) ack_future.set_result(None)
def send_cast(self, request): def send_cast(self, request):
self._pool.submit(self._wait_for_ack, request) self.pool.submit(self._wait_for_ack, request)
def send_fanout(self, request): send_fanout = _send_request
self._send_request(request) send_notify = _send_request
def send_notify(self, request):
self._send_request(request)
def cleanup(self): class AckManagerAsyncMultisend(AckManager):
self._pool.shutdown(wait=True)
super(AckManager, self).cleanup() def _send_request_async(self, request):
self.pool.submit(self._send_request, request)
send_fanout = _send_request_async
send_notify = _send_request_async

View File

@ -71,23 +71,34 @@ class ZmqClientBase(object):
@staticmethod @staticmethod
def _create_publisher_direct(conf, matchmaker): def _create_publisher_direct(conf, matchmaker):
publisher_direct = zmq_dealer_publisher_direct.DealerPublisherDirect( publisher_direct = \
conf, matchmaker) zmq_dealer_publisher_direct.DealerPublisherDirect(conf, matchmaker)
return zmq_publisher_manager.PublisherManagerDynamic(publisher_direct) publisher_manager_cls = zmq_publisher_manager.PublisherManagerDynamic \
if conf.oslo_messaging_zmq.use_pub_sub else \
zmq_publisher_manager.PublisherManagerDynamicAsyncMultisend
return publisher_manager_cls(publisher_direct)
@staticmethod @staticmethod
def _create_publisher_proxy(conf, matchmaker): def _create_publisher_proxy(conf, matchmaker):
publisher_proxy = zmq_dealer_publisher_proxy.DealerPublisherProxy( publisher_proxy = \
conf, matchmaker) zmq_dealer_publisher_proxy.DealerPublisherProxy(conf, matchmaker)
return zmq_ack_manager.AckManager(publisher_proxy) \ if conf.oslo_messaging_zmq.rpc_use_acks:
if conf.oslo_messaging_zmq.rpc_use_acks else \ ack_manager_cls = zmq_ack_manager.AckManager \
zmq_publisher_manager.PublisherManagerStatic(publisher_proxy) if conf.oslo_messaging_zmq.use_pub_sub else \
zmq_ack_manager.AckManagerAsyncMultisend
return ack_manager_cls(publisher_proxy)
else:
publisher_manager_cls = \
zmq_publisher_manager.PublisherManagerStatic \
if conf.oslo_messaging_zmq.use_pub_sub else \
zmq_publisher_manager.PublisherManagerStaticAsyncMultisend
return publisher_manager_cls(publisher_proxy)
@staticmethod @staticmethod
def _create_publisher_proxy_dynamic(conf, matchmaker): def _create_publisher_proxy_dynamic(conf, matchmaker):
publisher_proxy = \ publisher_proxy = \
zmq_dealer_publisher_proxy.DealerPublisherProxyDynamic( zmq_dealer_publisher_proxy.DealerPublisherProxyDynamic(conf,
conf, matchmaker) matchmaker)
return zmq_publisher_manager.PublisherManagerDynamic(publisher_proxy) return zmq_publisher_manager.PublisherManagerDynamic(publisher_proxy)
def cleanup(self): def cleanup(self):

View File

@ -63,14 +63,21 @@ class PublisherManagerBase(object):
Publisher knows how to establish connection, how to send message, Publisher knows how to establish connection, how to send message,
and how to receive reply. PublisherManager coordinates all these steps and how to receive reply. PublisherManager coordinates all these steps
regarding retrying logic in AckManager implementations regarding retrying logic in AckManager implementations. May also have an
additional thread pool for scheduling background tasks.
""" """
def __init__(self, publisher): def __init__(self, publisher, with_pool=False):
self.publisher = publisher self.publisher = publisher
self.conf = publisher.conf self.conf = publisher.conf
self.sender = publisher.sender self.sender = publisher.sender
self.receiver = publisher.receiver self.receiver = publisher.receiver
if with_pool:
self.pool = zmq_async.get_pool(
size=self.conf.oslo_messaging_zmq.rpc_thread_pool_size
)
else:
self.pool = None
@abc.abstractmethod @abc.abstractmethod
def send_call(self, request): def send_call(self, request):
@ -105,6 +112,8 @@ class PublisherManagerBase(object):
""" """
def cleanup(self): def cleanup(self):
if self.pool:
self.pool.shutdown(wait=True)
self.publisher.cleanup() self.publisher.cleanup()
@ -129,6 +138,20 @@ class PublisherManagerDynamic(PublisherManagerBase):
send_notify = _send send_notify = _send
class PublisherManagerDynamicAsyncMultisend(PublisherManagerDynamic):
def __init__(self, publisher):
super(PublisherManagerDynamicAsyncMultisend, self).__init__(
publisher, with_pool=True
)
def _send_async(self, request):
self.pool.submit(self._send, request)
send_fanout = _send_async
send_notify = _send_async
class PublisherManagerStatic(PublisherManagerBase): class PublisherManagerStatic(PublisherManagerBase):
@target_not_found_timeout @target_not_found_timeout
@ -146,3 +169,17 @@ class PublisherManagerStatic(PublisherManagerBase):
send_cast = _send send_cast = _send
send_fanout = _send send_fanout = _send
send_notify = _send send_notify = _send
class PublisherManagerStaticAsyncMultisend(PublisherManagerStatic):
def __init__(self, publisher):
super(PublisherManagerStaticAsyncMultisend, self).__init__(
publisher, with_pool=True
)
def _send_async(self, request):
self.pool.submit(self._send, request)
send_fanout = _send_async
send_notify = _send_async

View File

@ -39,6 +39,7 @@ class RoutingTableAdaptor(object):
self.routing_table_updater = RoutingTableUpdater( self.routing_table_updater = RoutingTableUpdater(
conf, matchmaker, self.routing_table) conf, matchmaker, self.routing_table)
self.round_robin_targets = {} self.round_robin_targets = {}
self._lock = threading.Lock()
def get_round_robin_host(self, target): def get_round_robin_host(self, target):
target_key = zmq_address.target_to_key( target_key = zmq_address.target_to_key(
@ -47,20 +48,25 @@ class RoutingTableAdaptor(object):
LOG.debug("Processing target %s for round-robin." % target_key) LOG.debug("Processing target %s for round-robin." % target_key)
if target_key not in self.round_robin_targets: if target_key not in self.round_robin_targets:
LOG.debug("Target %s is not in cache. Check matchmaker server." self._fetch_round_robin_hosts_from_matchmaker(target, target_key)
% target_key)
hosts = self.matchmaker.get_hosts_retry(
target, zmq_names.socket_type_str(self.listener_type))
LOG.debug("Received hosts %s" % hosts)
self.routing_table.update_hosts(target_key, hosts)
self.round_robin_targets[target_key] = \
self.routing_table.get_hosts_round_robin(target_key)
rr_gen = self.round_robin_targets[target_key] rr_gen = self.round_robin_targets[target_key]
host = next(rr_gen) host = next(rr_gen)
LOG.debug("Host resolved for the current connection is %s" % host) LOG.debug("Host resolved for the current connection is %s" % host)
return host return host
def _fetch_round_robin_hosts_from_matchmaker(self, target, target_key):
with self._lock:
if target_key not in self.round_robin_targets:
LOG.debug("Target %s is not in cache. Check matchmaker server."
% target_key)
hosts = self.matchmaker.get_hosts_retry(
target, zmq_names.socket_type_str(self.listener_type))
LOG.debug("Received hosts %s" % hosts)
self.routing_table.update_hosts(target_key, hosts)
self.round_robin_targets[target_key] = \
self.routing_table.get_hosts_round_robin(target_key)
def get_fanout_hosts(self, target): def get_fanout_hosts(self, target):
target_key = zmq_address.prefix_str( target_key = zmq_address.prefix_str(
target.topic, zmq_names.socket_type_str(self.listener_type)) target.topic, zmq_names.socket_type_str(self.listener_type))
@ -68,16 +74,20 @@ class RoutingTableAdaptor(object):
LOG.debug("Processing target %s for fanout." % target_key) LOG.debug("Processing target %s for fanout." % target_key)
if not self.routing_table.contains(target_key): if not self.routing_table.contains(target_key):
LOG.debug("Target %s is not in cache. Check matchmaker server." self._fetch_fanout_hosts_from_matchmaker(target, target_key)
% target_key)
hosts = self.matchmaker.get_hosts_fanout(
target, zmq_names.socket_type_str(self.listener_type))
LOG.debug("Received hosts %s" % hosts)
self.routing_table.update_hosts(target_key, hosts)
else:
LOG.debug("Target %s has been found in cache." % target_key)
return self.routing_table.get_hosts_fanout(target_key) return self.routing_table.get_hosts_fanout(target_key)
def _fetch_fanout_hosts_from_matchmaker(self, target, target_key):
with self._lock:
if not self.routing_table.contains(target_key):
LOG.debug("Target %s is not in cache. Check matchmaker server."
% target_key)
hosts = self.matchmaker.get_hosts_fanout(
target, zmq_names.socket_type_str(self.listener_type))
LOG.debug("Received hosts %s" % hosts)
self.routing_table.update_hosts(target_key, hosts)
def cleanup(self): def cleanup(self):
self.routing_table_updater.cleanup() self.routing_table_updater.cleanup()

View File

@ -154,11 +154,12 @@ class DealerConsumerWithAcks(DealerConsumer):
"msg_type": zmq_names.message_type_str(message_type), "msg_type": zmq_names.message_type_str(message_type),
"msg_id": message_id} "msg_id": message_id}
) )
# NOTE(gdavoian): send yet another ack for the non-CALL # NOTE(gdavoian): send yet another ack for the direct
# message, since the old one might be lost; # message, since the old one might be lost;
# for the CALL message also try to resend its reply # for the CALL message also try to resend its reply
# (of course, if it was already obtained and cached). # (of course, if it was already obtained and cached).
self._acknowledge(reply_id, message_id, socket) if message_type in zmq_names.DIRECT_TYPES:
self._acknowledge(reply_id, message_id, socket)
if message_type == zmq_names.CALL_TYPE: if message_type == zmq_names.CALL_TYPE:
self._reply_from_cache(message_id, socket) self._reply_from_cache(message_id, socket)
return None return None
@ -168,7 +169,8 @@ class DealerConsumerWithAcks(DealerConsumer):
# NOTE(gdavoian): send an immediate ack, since it may # NOTE(gdavoian): send an immediate ack, since it may
# be too late to wait until the message will be # be too late to wait until the message will be
# dispatched and processed by a RPC server # dispatched and processed by a RPC server
self._acknowledge(reply_id, message_id, socket) if message_type in zmq_names.DIRECT_TYPES:
self._acknowledge(reply_id, message_id, socket)
return super(DealerConsumerWithAcks, self)._create_message( return super(DealerConsumerWithAcks, self)._create_message(
context, message, reply_id, message_id, socket, message_type context, message, reply_id, message_id, socket, message_type

View File

@ -82,7 +82,7 @@ zmq_opts = [
help='Update period in seconds of a name service record ' help='Update period in seconds of a name service record '
'about existing target.'), 'about existing target.'),
cfg.BoolOpt('use_pub_sub', default=True, cfg.BoolOpt('use_pub_sub', default=False,
deprecated_group='DEFAULT', deprecated_group='DEFAULT',
help='Use PUB/SUB pattern for fanout methods. ' help='Use PUB/SUB pattern for fanout methods. '
'PUB/SUB always uses proxy.'), 'PUB/SUB always uses proxy.'),

View File

@ -109,7 +109,7 @@ class TestZmqAckManager(test_utils.BaseTestCase):
self.target, {}, self.message, wait_for_reply=False self.target, {}, self.message, wait_for_reply=False
) )
self.assertIsNone(result) self.assertIsNone(result)
self.ack_manager._pool.shutdown(wait=True) self.ack_manager.pool.shutdown(wait=True)
self.assertTrue(self.listener._received.isSet()) self.assertTrue(self.listener._received.isSet())
self.assertEqual(self.message, self.listener.message.message) self.assertEqual(self.message, self.listener.message.message)
self.assertEqual(1, self.send.call_count) self.assertEqual(1, self.send.call_count)
@ -133,7 +133,7 @@ class TestZmqAckManager(test_utils.BaseTestCase):
with mock.patch.object(DealerConsumerWithAcks, '_acknowledge', with mock.patch.object(DealerConsumerWithAcks, '_acknowledge',
side_effect=DealerConsumerWithAcks._acknowledge, side_effect=DealerConsumerWithAcks._acknowledge,
autospec=True) as received_ack_mock: autospec=True) as received_ack_mock:
self.ack_manager._pool.shutdown(wait=True) self.ack_manager.pool.shutdown(wait=True)
self.assertFalse(self.listener._received.isSet()) self.assertFalse(self.listener._received.isSet())
self.assertEqual(2, self.send.call_count) self.assertEqual(2, self.send.call_count)
self.assertEqual(1, received_ack_mock.call_count) self.assertEqual(1, received_ack_mock.call_count)
@ -161,7 +161,7 @@ class TestZmqAckManager(test_utils.BaseTestCase):
with mock.patch.object(DealerConsumerWithAcks, '_acknowledge', with mock.patch.object(DealerConsumerWithAcks, '_acknowledge',
side_effect=DealerConsumerWithAcks._acknowledge, side_effect=DealerConsumerWithAcks._acknowledge,
autospec=True) as received_ack_mock: autospec=True) as received_ack_mock:
self.ack_manager._pool.shutdown(wait=True) self.ack_manager.pool.shutdown(wait=True)
self.assertFalse(self.listener._received.isSet()) self.assertFalse(self.listener._received.isSet())
self.assertEqual(3, self.send.call_count) self.assertEqual(3, self.send.call_count)
self.assertEqual(1, received_ack_mock.call_count) self.assertEqual(1, received_ack_mock.call_count)
@ -173,7 +173,7 @@ class TestZmqAckManager(test_utils.BaseTestCase):
self.target, {}, self.message, wait_for_reply=False self.target, {}, self.message, wait_for_reply=False
) )
self.assertIsNone(result) self.assertIsNone(result)
self.ack_manager._pool.shutdown(wait=True) self.ack_manager.pool.shutdown(wait=True)
self.assertTrue(self.listener._received.isSet()) self.assertTrue(self.listener._received.isSet())
self.assertEqual(self.message, self.listener.message.message) self.assertEqual(self.message, self.listener.message.message)
self.assertEqual(3, self.send.call_count) self.assertEqual(3, self.send.call_count)
@ -196,7 +196,7 @@ class TestZmqAckManager(test_utils.BaseTestCase):
self.target, {}, self.message, wait_for_reply=True, timeout=10 self.target, {}, self.message, wait_for_reply=True, timeout=10
) )
self.assertIsNotNone(result) self.assertIsNotNone(result)
self.ack_manager._pool.shutdown(wait=True) self.ack_manager.pool.shutdown(wait=True)
self.assertTrue(self.listener._received.isSet()) self.assertTrue(self.listener._received.isSet())
self.assertEqual(self.message, self.listener.message.message) self.assertEqual(self.message, self.listener.message.message)
self.assertEqual(1, self.send.call_count) self.assertEqual(1, self.send.call_count)
@ -215,7 +215,7 @@ class TestZmqAckManager(test_utils.BaseTestCase):
self.driver.send, self.driver.send,
self.target, {}, self.message, self.target, {}, self.message,
wait_for_reply=True, timeout=20) wait_for_reply=True, timeout=20)
self.ack_manager._pool.shutdown(wait=True) self.ack_manager.pool.shutdown(wait=True)
self.assertTrue(self.listener._received.isSet()) self.assertTrue(self.listener._received.isSet())
self.assertEqual(self.message, self.listener.message.message) self.assertEqual(self.message, self.listener.message.message)
self.assertEqual(3, self.send.call_count) self.assertEqual(3, self.send.call_count)