[zmq] Switch notifications to PUB/SUB pattern
In this change PUB/SUB pattern was used for notifications. Reduced useless split to direct/fanout notifications. All notifications are fanout by default. Change-Id: I232c093af76757dc6fc5e942d47f0473877f9efa Closes-Bug: #1529845
This commit is contained in:
@@ -42,8 +42,8 @@ class UniversalQueueProxy(zmq_base_proxy.BaseProxy):
|
|||||||
|
|
||||||
self.matchmaker = matchmaker
|
self.matchmaker = matchmaker
|
||||||
reply_receiver = zmq_dealer_publisher_proxy.ReplyReceiver(self.poller)
|
reply_receiver = zmq_dealer_publisher_proxy.ReplyReceiver(self.poller)
|
||||||
self.publisher = zmq_dealer_publisher_proxy.DealerPublisherProxy(
|
self.direct_publisher = zmq_dealer_publisher_proxy \
|
||||||
conf, matchmaker, reply_receiver)
|
.DealerPublisherProxy(conf, matchmaker, reply_receiver)
|
||||||
self.pub_publisher = zmq_pub_publisher.PubPublisherProxy(
|
self.pub_publisher = zmq_pub_publisher.PubPublisherProxy(
|
||||||
conf, matchmaker)
|
conf, matchmaker)
|
||||||
|
|
||||||
@@ -62,11 +62,11 @@ class UniversalQueueProxy(zmq_base_proxy.BaseProxy):
|
|||||||
% multipart_message)
|
% multipart_message)
|
||||||
envelope = multipart_message[zmq_names.MULTIPART_IDX_ENVELOPE]
|
envelope = multipart_message[zmq_names.MULTIPART_IDX_ENVELOPE]
|
||||||
if self.conf.use_pub_sub and \
|
if self.conf.use_pub_sub and \
|
||||||
envelope[zmq_names.FIELD_MSG_TYPE] \
|
envelope[zmq_names.FIELD_MSG_TYPE] \
|
||||||
== zmq_names.CAST_FANOUT_TYPE:
|
in zmq_names.MULTISEND_TYPES:
|
||||||
self.pub_publisher.send_request(multipart_message)
|
self.pub_publisher.send_request(multipart_message)
|
||||||
else:
|
else:
|
||||||
self.publisher.send_request(multipart_message)
|
self.direct_publisher.send_request(multipart_message)
|
||||||
|
|
||||||
def _redirect_reply(self, reply):
|
def _redirect_reply(self, reply):
|
||||||
LOG.debug("Reply proxy %s" % reply)
|
LOG.debug("Reply proxy %s" % reply)
|
||||||
|
|||||||
@@ -34,6 +34,10 @@ class ZmqClient(zmq_client_base.ZmqClientBase):
|
|||||||
zmq_dealer_publisher.DealerPublisherLight(
|
zmq_dealer_publisher.DealerPublisherLight(
|
||||||
conf, zmq_address.get_broker_address(conf))
|
conf, zmq_address.get_broker_address(conf))
|
||||||
|
|
||||||
|
fanout_publisher = zmq_dealer_publisher.DealerPublisherLight(
|
||||||
|
conf, zmq_address.get_broker_address(conf)) \
|
||||||
|
if conf.use_pub_sub else default_publisher
|
||||||
|
|
||||||
super(ZmqClient, self).__init__(
|
super(ZmqClient, self).__init__(
|
||||||
conf, matchmaker, allowed_remote_exmods,
|
conf, matchmaker, allowed_remote_exmods,
|
||||||
publishers={
|
publishers={
|
||||||
@@ -44,10 +48,9 @@ class ZmqClient(zmq_client_base.ZmqClientBase):
|
|||||||
# Here use DealerPublisherLight for sending request to proxy
|
# Here use DealerPublisherLight for sending request to proxy
|
||||||
# which finally uses PubPublisher to send fanout in case of
|
# which finally uses PubPublisher to send fanout in case of
|
||||||
# 'use_pub_sub' option configured.
|
# 'use_pub_sub' option configured.
|
||||||
zmq_names.CAST_FANOUT_TYPE:
|
zmq_names.CAST_FANOUT_TYPE: fanout_publisher,
|
||||||
zmq_dealer_publisher.DealerPublisherLight(
|
|
||||||
conf, zmq_address.get_broker_address(conf))
|
zmq_names.NOTIFY_TYPE: fanout_publisher,
|
||||||
if conf.use_pub_sub else default_publisher,
|
|
||||||
|
|
||||||
"default": default_publisher
|
"default": default_publisher
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -134,8 +134,3 @@ class NotificationRequest(Request):
|
|||||||
def __init__(self, *args, **kwargs):
|
def __init__(self, *args, **kwargs):
|
||||||
self.version = kwargs.pop("version")
|
self.version = kwargs.pop("version")
|
||||||
super(NotificationRequest, self).__init__(*args, **kwargs)
|
super(NotificationRequest, self).__init__(*args, **kwargs)
|
||||||
|
|
||||||
|
|
||||||
class NotificationFanoutRequest(NotificationRequest):
|
|
||||||
|
|
||||||
msg_type = zmq_names.NOTIFY_FANOUT_TYPE
|
|
||||||
|
|||||||
@@ -33,15 +33,16 @@ class ZmqServer(base.Listener):
|
|||||||
super(ZmqServer, self).__init__(driver)
|
super(ZmqServer, self).__init__(driver)
|
||||||
self.matchmaker = matchmaker
|
self.matchmaker = matchmaker
|
||||||
self.poller = zmq_async.get_poller()
|
self.poller = zmq_async.get_poller()
|
||||||
self.rpc_consumer = zmq_router_consumer.RouterConsumerBroker(
|
self.router_consumer = zmq_router_consumer.RouterConsumerBroker(
|
||||||
conf, self.poller, self) if conf.direct_over_proxy else \
|
conf, self.poller, self) if conf.direct_over_proxy else \
|
||||||
zmq_router_consumer.RouterConsumer(
|
zmq_router_consumer.RouterConsumer(
|
||||||
conf, self.poller, self)
|
conf, self.poller, self)
|
||||||
self.notify_consumer = self.rpc_consumer
|
|
||||||
self.sub_consumer = zmq_sub_consumer.SubConsumer(
|
self.sub_consumer = zmq_sub_consumer.SubConsumer(
|
||||||
conf, self.poller, self) if conf.use_pub_sub else None
|
conf, self.poller, self) if conf.use_pub_sub else None
|
||||||
|
self.notify_consumer = self.sub_consumer if conf.use_pub_sub \
|
||||||
|
else self.router_consumer
|
||||||
|
|
||||||
self.consumers = [self.rpc_consumer]
|
self.consumers = [self.router_consumer]
|
||||||
if self.sub_consumer:
|
if self.sub_consumer:
|
||||||
self.consumers.append(self.sub_consumer)
|
self.consumers.append(self.sub_consumer)
|
||||||
|
|
||||||
@@ -52,7 +53,7 @@ class ZmqServer(base.Listener):
|
|||||||
return message
|
return message
|
||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
consumer = self.rpc_consumer
|
consumer = self.router_consumer
|
||||||
LOG.info("Stop server %s:%d" % (consumer.address, consumer.port))
|
LOG.info("Stop server %s:%d" % (consumer.address, consumer.port))
|
||||||
|
|
||||||
def cleanup(self):
|
def cleanup(self):
|
||||||
@@ -61,19 +62,14 @@ class ZmqServer(base.Listener):
|
|||||||
consumer.cleanup()
|
consumer.cleanup()
|
||||||
|
|
||||||
def listen(self, target):
|
def listen(self, target):
|
||||||
consumer = self.rpc_consumer
|
consumer = self.router_consumer
|
||||||
consumer.listen(target)
|
consumer.listen(target)
|
||||||
|
|
||||||
if self.sub_consumer:
|
if self.sub_consumer:
|
||||||
self.sub_consumer.listen(target)
|
self.sub_consumer.listen(target)
|
||||||
|
|
||||||
def listen_notification(self, targets_and_priorities):
|
def listen_notification(self, targets_and_priorities):
|
||||||
|
|
||||||
consumer = self.notify_consumer
|
consumer = self.notify_consumer
|
||||||
|
|
||||||
LOG.info("Listen for notifications on %s:%d"
|
|
||||||
% (consumer.address, consumer.port))
|
|
||||||
|
|
||||||
for target, priority in targets_and_priorities:
|
for target, priority in targets_and_priorities:
|
||||||
t = copy.deepcopy(target)
|
t = copy.deepcopy(target)
|
||||||
t.topic = target.topic + '.' + priority
|
t.topic = target.topic + '.' + priority
|
||||||
|
|||||||
@@ -39,7 +39,6 @@ CALL_TYPE = 'call'
|
|||||||
CAST_TYPE = 'cast'
|
CAST_TYPE = 'cast'
|
||||||
CAST_FANOUT_TYPE = 'cast-f'
|
CAST_FANOUT_TYPE = 'cast-f'
|
||||||
NOTIFY_TYPE = 'notify'
|
NOTIFY_TYPE = 'notify'
|
||||||
NOTIFY_FANOUT_TYPE = 'notify-f'
|
|
||||||
|
|
||||||
REPLY_TYPE = 'reply'
|
REPLY_TYPE = 'reply'
|
||||||
ACK_TYPE = 'ack'
|
ACK_TYPE = 'ack'
|
||||||
@@ -47,13 +46,12 @@ ACK_TYPE = 'ack'
|
|||||||
MESSAGE_TYPES = (CALL_TYPE,
|
MESSAGE_TYPES = (CALL_TYPE,
|
||||||
CAST_TYPE,
|
CAST_TYPE,
|
||||||
CAST_FANOUT_TYPE,
|
CAST_FANOUT_TYPE,
|
||||||
NOTIFY_TYPE,
|
NOTIFY_TYPE)
|
||||||
NOTIFY_FANOUT_TYPE)
|
|
||||||
|
|
||||||
MULTISEND_TYPES = (CAST_FANOUT_TYPE, NOTIFY_FANOUT_TYPE)
|
MULTISEND_TYPES = (CAST_FANOUT_TYPE, NOTIFY_TYPE)
|
||||||
DIRECT_TYPES = (CALL_TYPE, CAST_TYPE, NOTIFY_TYPE)
|
DIRECT_TYPES = (CALL_TYPE, CAST_TYPE)
|
||||||
CAST_TYPES = (CAST_TYPE, CAST_FANOUT_TYPE)
|
CAST_TYPES = (CAST_TYPE, CAST_FANOUT_TYPE)
|
||||||
NOTIFY_TYPES = (NOTIFY_TYPE, NOTIFY_FANOUT_TYPE)
|
NOTIFY_TYPES = (NOTIFY_TYPE,)
|
||||||
NON_BLOCKING_TYPES = CAST_TYPES + NOTIFY_TYPES
|
NON_BLOCKING_TYPES = CAST_TYPES + NOTIFY_TYPES
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -29,7 +29,6 @@ LOG = logging.getLogger(__name__)
|
|||||||
zmq = zmq_async.import_zmq()
|
zmq = zmq_async.import_zmq()
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
class ZmqTestPortsRange(zmq_common.ZmqBaseTestCase):
|
class ZmqTestPortsRange(zmq_common.ZmqBaseTestCase):
|
||||||
|
|
||||||
@testtools.skipIf(zmq is None, "zmq not available")
|
@testtools.skipIf(zmq is None, "zmq not available")
|
||||||
@@ -57,6 +56,7 @@ class ZmqTestPortsRange(zmq_common.ZmqBaseTestCase):
|
|||||||
for l in listeners:
|
for l in listeners:
|
||||||
l.cleanup()
|
l.cleanup()
|
||||||
|
|
||||||
|
|
||||||
class TestConfZmqDriverLoad(test_utils.BaseTestCase):
|
class TestConfZmqDriverLoad(test_utils.BaseTestCase):
|
||||||
|
|
||||||
@testtools.skipIf(zmq is None, "zmq not available")
|
@testtools.skipIf(zmq is None, "zmq not available")
|
||||||
|
|||||||
@@ -254,6 +254,9 @@ class NotifyTestCase(utils.SkipIfNoTransportURL):
|
|||||||
def test_multiple_servers(self):
|
def test_multiple_servers(self):
|
||||||
if self.url.startswith("amqp:"):
|
if self.url.startswith("amqp:"):
|
||||||
self.skipTest("QPID-6307")
|
self.skipTest("QPID-6307")
|
||||||
|
if self.url.startswith("zmq:"):
|
||||||
|
self.skipTest("ZeroMQ-PUB-SUB")
|
||||||
|
|
||||||
listener_a = self.useFixture(
|
listener_a = self.useFixture(
|
||||||
utils.NotificationFixture(self.conf, self.url, ['test-topic']))
|
utils.NotificationFixture(self.conf, self.url, ['test-topic']))
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user