diff --git a/oslo_messaging/_drivers/zmq_driver/broker/zmq_queue_proxy.py b/oslo_messaging/_drivers/zmq_driver/broker/zmq_queue_proxy.py index ffe913884..d39f9927c 100644 --- a/oslo_messaging/_drivers/zmq_driver/broker/zmq_queue_proxy.py +++ b/oslo_messaging/_drivers/zmq_driver/broker/zmq_queue_proxy.py @@ -42,8 +42,8 @@ class UniversalQueueProxy(zmq_base_proxy.BaseProxy): self.matchmaker = matchmaker reply_receiver = zmq_dealer_publisher_proxy.ReplyReceiver(self.poller) - self.publisher = zmq_dealer_publisher_proxy.DealerPublisherProxy( - conf, matchmaker, reply_receiver) + self.direct_publisher = zmq_dealer_publisher_proxy \ + .DealerPublisherProxy(conf, matchmaker, reply_receiver) self.pub_publisher = zmq_pub_publisher.PubPublisherProxy( conf, matchmaker) @@ -62,11 +62,11 @@ class UniversalQueueProxy(zmq_base_proxy.BaseProxy): % multipart_message) envelope = multipart_message[zmq_names.MULTIPART_IDX_ENVELOPE] if self.conf.use_pub_sub and \ - envelope[zmq_names.FIELD_MSG_TYPE] \ - == zmq_names.CAST_FANOUT_TYPE: + envelope[zmq_names.FIELD_MSG_TYPE] \ + in zmq_names.MULTISEND_TYPES: self.pub_publisher.send_request(multipart_message) else: - self.publisher.send_request(multipart_message) + self.direct_publisher.send_request(multipart_message) def _redirect_reply(self, reply): LOG.debug("Reply proxy %s" % reply) diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py index fa8ccdc3c..ffe484557 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py +++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py @@ -34,6 +34,10 @@ class ZmqClient(zmq_client_base.ZmqClientBase): zmq_dealer_publisher.DealerPublisherLight( conf, zmq_address.get_broker_address(conf)) + fanout_publisher = zmq_dealer_publisher.DealerPublisherLight( + conf, zmq_address.get_broker_address(conf)) \ + if conf.use_pub_sub else default_publisher + super(ZmqClient, self).__init__( conf, matchmaker, allowed_remote_exmods, publishers={ @@ -44,10 +48,9 @@ class ZmqClient(zmq_client_base.ZmqClientBase): # Here use DealerPublisherLight for sending request to proxy # which finally uses PubPublisher to send fanout in case of # 'use_pub_sub' option configured. - zmq_names.CAST_FANOUT_TYPE: - zmq_dealer_publisher.DealerPublisherLight( - conf, zmq_address.get_broker_address(conf)) - if conf.use_pub_sub else default_publisher, + zmq_names.CAST_FANOUT_TYPE: fanout_publisher, + + zmq_names.NOTIFY_TYPE: fanout_publisher, "default": default_publisher } diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_request.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_request.py index ae7ebc919..05edefffd 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/zmq_request.py +++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_request.py @@ -134,8 +134,3 @@ class NotificationRequest(Request): def __init__(self, *args, **kwargs): self.version = kwargs.pop("version") super(NotificationRequest, self).__init__(*args, **kwargs) - - -class NotificationFanoutRequest(NotificationRequest): - - msg_type = zmq_names.NOTIFY_FANOUT_TYPE diff --git a/oslo_messaging/_drivers/zmq_driver/server/zmq_server.py b/oslo_messaging/_drivers/zmq_driver/server/zmq_server.py index 0dfec71f5..4d6fa90ff 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/zmq_server.py +++ b/oslo_messaging/_drivers/zmq_driver/server/zmq_server.py @@ -33,15 +33,16 @@ class ZmqServer(base.Listener): super(ZmqServer, self).__init__(driver) self.matchmaker = matchmaker self.poller = zmq_async.get_poller() - self.rpc_consumer = zmq_router_consumer.RouterConsumerBroker( + self.router_consumer = zmq_router_consumer.RouterConsumerBroker( conf, self.poller, self) if conf.direct_over_proxy else \ zmq_router_consumer.RouterConsumer( conf, self.poller, self) - self.notify_consumer = self.rpc_consumer self.sub_consumer = zmq_sub_consumer.SubConsumer( conf, self.poller, self) if conf.use_pub_sub else None + self.notify_consumer = self.sub_consumer if conf.use_pub_sub \ + else self.router_consumer - self.consumers = [self.rpc_consumer] + self.consumers = [self.router_consumer] if self.sub_consumer: self.consumers.append(self.sub_consumer) @@ -52,7 +53,7 @@ class ZmqServer(base.Listener): return message def stop(self): - consumer = self.rpc_consumer + consumer = self.router_consumer LOG.info("Stop server %s:%d" % (consumer.address, consumer.port)) def cleanup(self): @@ -61,19 +62,14 @@ class ZmqServer(base.Listener): consumer.cleanup() def listen(self, target): - consumer = self.rpc_consumer + consumer = self.router_consumer consumer.listen(target) if self.sub_consumer: self.sub_consumer.listen(target) def listen_notification(self, targets_and_priorities): - consumer = self.notify_consumer - - LOG.info("Listen for notifications on %s:%d" - % (consumer.address, consumer.port)) - for target, priority in targets_and_priorities: t = copy.deepcopy(target) t.topic = target.topic + '.' + priority diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_names.py b/oslo_messaging/_drivers/zmq_driver/zmq_names.py index f7401ab21..d69f60b86 100644 --- a/oslo_messaging/_drivers/zmq_driver/zmq_names.py +++ b/oslo_messaging/_drivers/zmq_driver/zmq_names.py @@ -39,7 +39,6 @@ CALL_TYPE = 'call' CAST_TYPE = 'cast' CAST_FANOUT_TYPE = 'cast-f' NOTIFY_TYPE = 'notify' -NOTIFY_FANOUT_TYPE = 'notify-f' REPLY_TYPE = 'reply' ACK_TYPE = 'ack' @@ -47,13 +46,12 @@ ACK_TYPE = 'ack' MESSAGE_TYPES = (CALL_TYPE, CAST_TYPE, CAST_FANOUT_TYPE, - NOTIFY_TYPE, - NOTIFY_FANOUT_TYPE) + NOTIFY_TYPE) -MULTISEND_TYPES = (CAST_FANOUT_TYPE, NOTIFY_FANOUT_TYPE) -DIRECT_TYPES = (CALL_TYPE, CAST_TYPE, NOTIFY_TYPE) +MULTISEND_TYPES = (CAST_FANOUT_TYPE, NOTIFY_TYPE) +DIRECT_TYPES = (CALL_TYPE, CAST_TYPE) CAST_TYPES = (CAST_TYPE, CAST_FANOUT_TYPE) -NOTIFY_TYPES = (NOTIFY_TYPE, NOTIFY_FANOUT_TYPE) +NOTIFY_TYPES = (NOTIFY_TYPE,) NON_BLOCKING_TYPES = CAST_TYPES + NOTIFY_TYPES diff --git a/oslo_messaging/tests/drivers/zmq/test_impl_zmq.py b/oslo_messaging/tests/drivers/zmq/test_impl_zmq.py index 0957bfe88..566b9c7e8 100644 --- a/oslo_messaging/tests/drivers/zmq/test_impl_zmq.py +++ b/oslo_messaging/tests/drivers/zmq/test_impl_zmq.py @@ -29,7 +29,6 @@ LOG = logging.getLogger(__name__) zmq = zmq_async.import_zmq() - class ZmqTestPortsRange(zmq_common.ZmqBaseTestCase): @testtools.skipIf(zmq is None, "zmq not available") @@ -57,6 +56,7 @@ class ZmqTestPortsRange(zmq_common.ZmqBaseTestCase): for l in listeners: l.cleanup() + class TestConfZmqDriverLoad(test_utils.BaseTestCase): @testtools.skipIf(zmq is None, "zmq not available") diff --git a/oslo_messaging/tests/functional/test_functional.py b/oslo_messaging/tests/functional/test_functional.py index 8ee758dfc..d9ef04682 100644 --- a/oslo_messaging/tests/functional/test_functional.py +++ b/oslo_messaging/tests/functional/test_functional.py @@ -254,6 +254,9 @@ class NotifyTestCase(utils.SkipIfNoTransportURL): def test_multiple_servers(self): if self.url.startswith("amqp:"): self.skipTest("QPID-6307") + if self.url.startswith("zmq:"): + self.skipTest("ZeroMQ-PUB-SUB") + listener_a = self.useFixture( utils.NotificationFixture(self.conf, self.url, ['test-topic']))