Merge "[zmq] Switch notifications to PUB/SUB pattern"

This commit is contained in:
Jenkins 2016-01-06 14:51:34 +00:00 committed by Gerrit Code Review
commit 9ed56f8a1a
7 changed files with 26 additions and 31 deletions

View File

@ -42,8 +42,8 @@ class UniversalQueueProxy(zmq_base_proxy.BaseProxy):
self.matchmaker = matchmaker
reply_receiver = zmq_dealer_publisher_proxy.ReplyReceiver(self.poller)
self.publisher = zmq_dealer_publisher_proxy.DealerPublisherProxy(
conf, matchmaker, reply_receiver)
self.direct_publisher = zmq_dealer_publisher_proxy \
.DealerPublisherProxy(conf, matchmaker, reply_receiver)
self.pub_publisher = zmq_pub_publisher.PubPublisherProxy(
conf, matchmaker)
@ -62,11 +62,11 @@ class UniversalQueueProxy(zmq_base_proxy.BaseProxy):
% multipart_message)
envelope = multipart_message[zmq_names.MULTIPART_IDX_ENVELOPE]
if self.conf.use_pub_sub and \
envelope[zmq_names.FIELD_MSG_TYPE] \
== zmq_names.CAST_FANOUT_TYPE:
envelope[zmq_names.FIELD_MSG_TYPE] \
in zmq_names.MULTISEND_TYPES:
self.pub_publisher.send_request(multipart_message)
else:
self.publisher.send_request(multipart_message)
self.direct_publisher.send_request(multipart_message)
def _redirect_reply(self, reply):
LOG.debug("Reply proxy %s" % reply)

View File

@ -34,6 +34,10 @@ class ZmqClient(zmq_client_base.ZmqClientBase):
zmq_dealer_publisher.DealerPublisherLight(
conf, zmq_address.get_broker_address(conf))
fanout_publisher = zmq_dealer_publisher.DealerPublisherLight(
conf, zmq_address.get_broker_address(conf)) \
if conf.use_pub_sub else default_publisher
super(ZmqClient, self).__init__(
conf, matchmaker, allowed_remote_exmods,
publishers={
@ -44,10 +48,9 @@ class ZmqClient(zmq_client_base.ZmqClientBase):
# Here use DealerPublisherLight for sending request to proxy
# which finally uses PubPublisher to send fanout in case of
# 'use_pub_sub' option configured.
zmq_names.CAST_FANOUT_TYPE:
zmq_dealer_publisher.DealerPublisherLight(
conf, zmq_address.get_broker_address(conf))
if conf.use_pub_sub else default_publisher,
zmq_names.CAST_FANOUT_TYPE: fanout_publisher,
zmq_names.NOTIFY_TYPE: fanout_publisher,
"default": default_publisher
}

View File

@ -134,8 +134,3 @@ class NotificationRequest(Request):
def __init__(self, *args, **kwargs):
self.version = kwargs.pop("version")
super(NotificationRequest, self).__init__(*args, **kwargs)
class NotificationFanoutRequest(NotificationRequest):
msg_type = zmq_names.NOTIFY_FANOUT_TYPE

View File

@ -33,15 +33,16 @@ class ZmqServer(base.Listener):
super(ZmqServer, self).__init__(driver)
self.matchmaker = matchmaker
self.poller = zmq_async.get_poller()
self.rpc_consumer = zmq_router_consumer.RouterConsumerBroker(
self.router_consumer = zmq_router_consumer.RouterConsumerBroker(
conf, self.poller, self) if conf.direct_over_proxy else \
zmq_router_consumer.RouterConsumer(
conf, self.poller, self)
self.notify_consumer = self.rpc_consumer
self.sub_consumer = zmq_sub_consumer.SubConsumer(
conf, self.poller, self) if conf.use_pub_sub else None
self.notify_consumer = self.sub_consumer if conf.use_pub_sub \
else self.router_consumer
self.consumers = [self.rpc_consumer]
self.consumers = [self.router_consumer]
if self.sub_consumer:
self.consumers.append(self.sub_consumer)
@ -52,7 +53,7 @@ class ZmqServer(base.Listener):
return message
def stop(self):
consumer = self.rpc_consumer
consumer = self.router_consumer
LOG.info("Stop server %s:%d" % (consumer.address, consumer.port))
def cleanup(self):
@ -61,19 +62,14 @@ class ZmqServer(base.Listener):
consumer.cleanup()
def listen(self, target):
consumer = self.rpc_consumer
consumer = self.router_consumer
consumer.listen(target)
if self.sub_consumer:
self.sub_consumer.listen(target)
def listen_notification(self, targets_and_priorities):
consumer = self.notify_consumer
LOG.info("Listen for notifications on %s:%d"
% (consumer.address, consumer.port))
for target, priority in targets_and_priorities:
t = copy.deepcopy(target)
t.topic = target.topic + '.' + priority

View File

@ -39,7 +39,6 @@ CALL_TYPE = 'call'
CAST_TYPE = 'cast'
CAST_FANOUT_TYPE = 'cast-f'
NOTIFY_TYPE = 'notify'
NOTIFY_FANOUT_TYPE = 'notify-f'
REPLY_TYPE = 'reply'
ACK_TYPE = 'ack'
@ -47,13 +46,12 @@ ACK_TYPE = 'ack'
MESSAGE_TYPES = (CALL_TYPE,
CAST_TYPE,
CAST_FANOUT_TYPE,
NOTIFY_TYPE,
NOTIFY_FANOUT_TYPE)
NOTIFY_TYPE)
MULTISEND_TYPES = (CAST_FANOUT_TYPE, NOTIFY_FANOUT_TYPE)
DIRECT_TYPES = (CALL_TYPE, CAST_TYPE, NOTIFY_TYPE)
MULTISEND_TYPES = (CAST_FANOUT_TYPE, NOTIFY_TYPE)
DIRECT_TYPES = (CALL_TYPE, CAST_TYPE)
CAST_TYPES = (CAST_TYPE, CAST_FANOUT_TYPE)
NOTIFY_TYPES = (NOTIFY_TYPE, NOTIFY_FANOUT_TYPE)
NOTIFY_TYPES = (NOTIFY_TYPE,)
NON_BLOCKING_TYPES = CAST_TYPES + NOTIFY_TYPES

View File

@ -29,7 +29,6 @@ LOG = logging.getLogger(__name__)
zmq = zmq_async.import_zmq()
class ZmqTestPortsRange(zmq_common.ZmqBaseTestCase):
@testtools.skipIf(zmq is None, "zmq not available")
@ -57,6 +56,7 @@ class ZmqTestPortsRange(zmq_common.ZmqBaseTestCase):
for l in listeners:
l.cleanup()
class TestConfZmqDriverLoad(test_utils.BaseTestCase):
@testtools.skipIf(zmq is None, "zmq not available")

View File

@ -254,6 +254,9 @@ class NotifyTestCase(utils.SkipIfNoTransportURL):
def test_multiple_servers(self):
if self.url.startswith("amqp:"):
self.skipTest("QPID-6307")
if self.url.startswith("zmq:"):
self.skipTest("ZeroMQ-PUB-SUB")
listener_a = self.useFixture(
utils.NotificationFixture(self.conf, self.url, ['test-topic']))