diff --git a/oslo_messaging/_cmd/zmq_proxy.py b/oslo_messaging/_cmd/zmq_proxy.py index 80f8ad69c..29bf34e03 100644 --- a/oslo_messaging/_cmd/zmq_proxy.py +++ b/oslo_messaging/_cmd/zmq_proxy.py @@ -1,4 +1,4 @@ -# Copyright 2015 Mirantis, Inc. +# Copyright 2015-2016 Mirantis, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain @@ -12,86 +12,36 @@ # License for the specific language governing permissions and limitations # under the License. -import argparse import logging from oslo_config import cfg from oslo_messaging._drivers.zmq_driver.proxy import zmq_proxy -from oslo_messaging._drivers.zmq_driver.proxy import zmq_queue_proxy from oslo_messaging._drivers.zmq_driver import zmq_options +from oslo_messaging._i18n import _LI -CONF = cfg.CONF - -zmq_options.register_opts(CONF) - -opt_group = cfg.OptGroup(name='zmq_proxy_opts', - title='ZeroMQ proxy options') -CONF.register_opts(zmq_proxy.zmq_proxy_opts, group=opt_group) - - -USAGE = """ Usage: ./zmq-proxy.py [-h] [] ... - -Usage example: - python oslo_messaging/_cmd/zmq-proxy.py""" +LOG = logging.getLogger(__name__) def main(): - parser = argparse.ArgumentParser( - description='ZeroMQ proxy service', - usage=USAGE - ) - parser.add_argument('-c', '--config-file', dest='config_file', type=str, - help='Path to configuration file') - parser.add_argument('-l', '--log-file', dest='log_file', type=str, - help='Path to log file') + conf = cfg.CONF + opt_group = cfg.OptGroup(name='zmq_proxy_opts', + title='ZeroMQ proxy options') + conf.register_opts(zmq_proxy.zmq_proxy_opts, group=opt_group) + zmq_options.register_opts(conf) + zmq_proxy.parse_command_line_args(conf) - parser.add_argument('-H', '--host', dest='host', type=str, - help='Host FQDN for current proxy') - parser.add_argument('-f', '--frontend-port', dest='frontend_port', - type=int, - help='Front-end ROUTER port number') - parser.add_argument('-b', '--backend-port', dest='backend_port', type=int, - help='Back-end ROUTER port number') - parser.add_argument('-p', '--publisher-port', dest='publisher_port', - type=int, - help='Front-end PUBLISHER port number') - - parser.add_argument('-d', '--debug', dest='debug', type=bool, - default=False, - help='Turn on DEBUG logging level instead of INFO') - - args = parser.parse_args() - - if args.config_file: - cfg.CONF(['--config-file', args.config_file]) - - log_kwargs = {'level': logging.DEBUG if args.debug else logging.INFO, - 'format': '%(asctime)s %(name)s %(levelname)-8s %(message)s'} - if args.log_file: - log_kwargs.update({'filename': args.log_file}) - logging.basicConfig(**log_kwargs) - - if args.host: - CONF.zmq_proxy_opts.host = args.host - if args.frontend_port: - CONF.set_override('frontend_port', args.frontend_port, - group='zmq_proxy_opts') - if args.backend_port: - CONF.set_override('backend_port', args.backend_port, - group='zmq_proxy_opts') - if args.publisher_port: - CONF.set_override('publisher_port', args.publisher_port, - group='zmq_proxy_opts') - - reactor = zmq_proxy.ZmqProxy(CONF, zmq_queue_proxy.UniversalQueueProxy) + reactor = zmq_proxy.ZmqProxy(conf) try: while True: reactor.run() except (KeyboardInterrupt, SystemExit): + LOG.info(_LI("Exit proxy by interrupt signal.")) + finally: reactor.close() + if __name__ == "__main__": main() diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_request.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_request.py index b3f8aae86..c3d45e185 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/zmq_request.py +++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_request.py @@ -55,7 +55,7 @@ class Request(object): :type retry: int """ - if self.msg_type not in zmq_names.MESSAGE_TYPES: + if self.msg_type not in zmq_names.REQUEST_TYPES: raise RuntimeError("Unknown message type!") self.target = target diff --git a/oslo_messaging/_drivers/zmq_driver/proxy/central/__init__.py b/oslo_messaging/_drivers/zmq_driver/proxy/central/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/oslo_messaging/_drivers/zmq_driver/proxy/zmq_queue_proxy.py b/oslo_messaging/_drivers/zmq_driver/proxy/central/zmq_central_proxy.py similarity index 60% rename from oslo_messaging/_drivers/zmq_driver/proxy/zmq_queue_proxy.py rename to oslo_messaging/_drivers/zmq_driver/proxy/central/zmq_central_proxy.py index 1b2ebd433..02bff5a83 100644 --- a/oslo_messaging/_drivers/zmq_driver/proxy/zmq_queue_proxy.py +++ b/oslo_messaging/_drivers/zmq_driver/proxy/central/zmq_central_proxy.py @@ -1,4 +1,4 @@ -# Copyright 2015 Mirantis, Inc. +# Copyright 2015-2016 Mirantis, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain @@ -14,114 +14,123 @@ import logging -import six - -from oslo_messaging._drivers.zmq_driver.proxy import zmq_publisher_proxy +from oslo_messaging._drivers.zmq_driver.proxy.central \ + import zmq_publisher_proxy +from oslo_messaging._drivers.zmq_driver.proxy import zmq_sender from oslo_messaging._drivers.zmq_driver import zmq_async from oslo_messaging._drivers.zmq_driver import zmq_names from oslo_messaging._drivers.zmq_driver import zmq_socket from oslo_messaging._drivers.zmq_driver import zmq_updater -from oslo_messaging._i18n import _LE, _LI +from oslo_messaging._i18n import _LI, _LE zmq = zmq_async.import_zmq() LOG = logging.getLogger(__name__) -class UniversalQueueProxy(object): +def check_message_format(func): + def _check_message_format(*args, **kwargs): + try: + return func(*args, **kwargs) + except Exception as e: + LOG.error(_LE("Received message with wrong format")) + LOG.exception(e) + return _check_message_format + + +class SingleRouterProxy(object): def __init__(self, conf, context, matchmaker): self.conf = conf self.context = context - super(UniversalQueueProxy, self).__init__() + super(SingleRouterProxy, self).__init__() self.matchmaker = matchmaker + host = conf.zmq_proxy_opts.host + self.poller = zmq_async.get_poller() port = conf.zmq_proxy_opts.frontend_port - host = conf.zmq_proxy_opts.host self.fe_router_socket = zmq_socket.ZmqFixedPortSocket( conf, context, zmq.ROUTER, host, conf.zmq_proxy_opts.frontend_port) if port != 0 else \ - zmq_socket.ZmqRandomPortSocket(conf, context, zmq.ROUTER, host) + zmq_socket.ZmqRandomPortSocket(conf, context, zmq.ROUTER, + host) - port = conf.zmq_proxy_opts.backend_port - self.be_router_socket = zmq_socket.ZmqFixedPortSocket( - conf, context, zmq.ROUTER, host, - conf.zmq_proxy_opts.backend_port) if port != 0 else \ - zmq_socket.ZmqRandomPortSocket(conf, context, zmq.ROUTER, host) + self.poller.register(self.fe_router_socket, self._receive_message) - self.poller.register(self.fe_router_socket, self._receive_in_request) - self.poller.register(self.be_router_socket, self._receive_in_request) - - self.pub_publisher = zmq_publisher_proxy.PublisherProxy( + self.publisher = zmq_publisher_proxy.PublisherProxy( conf, matchmaker) - - self._router_updater = RouterUpdater( - conf, matchmaker, self.pub_publisher.host, - self.fe_router_socket.connect_address, - self.be_router_socket.connect_address) + self.router_sender = zmq_sender.CentralRouterSender() + self._router_updater = self._create_router_updater() def run(self): message, socket = self.poller.poll() if message is None: return - msg_type = message[0] + msg_type = int(message[zmq_names.MESSAGE_TYPE_IDX]) if self.conf.oslo_messaging_zmq.use_pub_sub and \ msg_type in (zmq_names.CAST_FANOUT_TYPE, zmq_names.NOTIFY_TYPE): - self.pub_publisher.send_request(message) + self.publisher.send_request(message) else: - self._redirect_message(self.be_router_socket - if socket is self.fe_router_socket - else self.fe_router_socket, message) + self.router_sender.send_message( + self._get_socket_to_dispatch_on(socket), message) + + def _create_router_updater(self): + return RouterUpdater( + self.conf, self.matchmaker, self.publisher.host, + self.fe_router_socket.connect_address, + self.fe_router_socket.connect_address) + + def _get_socket_to_dispatch_on(self, socket): + return self.fe_router_socket @staticmethod - def _receive_in_request(socket): - try: - reply_id = socket.recv() - assert reply_id is not None, "Valid id expected" - empty = socket.recv() - assert empty == b'', "Empty delimiter expected" - msg_type = int(socket.recv()) - routing_key = socket.recv() - payload = socket.recv_multipart() - payload.insert(0, reply_id) - payload.insert(0, routing_key) - payload.insert(0, msg_type) - return payload - except (AssertionError, ValueError): - LOG.error(_LE("Received message with wrong format")) - if socket.getsockopt(zmq.RCVMORE): - # NOTE(ozamiatin): Drop the left parts of broken message - socket.recv_multipart() - except zmq.ZMQError as e: - LOG.exception(e) - return None - - @staticmethod - def _redirect_message(socket, multipart_message): - message_type = multipart_message.pop(0) - routing_key = multipart_message.pop(0) - reply_id = multipart_message.pop(0) - message_id = multipart_message[0] - socket.send(routing_key, zmq.SNDMORE) - socket.send(b'', zmq.SNDMORE) - socket.send(reply_id, zmq.SNDMORE) - socket.send(six.b(str(message_type)), zmq.SNDMORE) - LOG.debug("Dispatching %(msg_type)s message %(msg_id)s - from %(rid)s " - "to -> %(rkey)s" % - {"msg_type": zmq_names.message_type_str(message_type), - "msg_id": message_id, - "rkey": routing_key, - "rid": reply_id}) - socket.send_multipart(multipart_message) + @check_message_format + def _receive_message(socket): + message = socket.recv_multipart() + assert len(message) > zmq_names.MESSAGE_ID_IDX, "Not enough parts" + assert message[zmq_names.REPLY_ID_IDX] != b'', "Valid id expected" + message_type = int(message[zmq_names.MESSAGE_TYPE_IDX]) + assert message_type in zmq_names.MESSAGE_TYPES, "Known type expected!" + assert message[zmq_names.EMPTY_IDX] == b'', "Empty delimiter expected" + return message def cleanup(self): + self._router_updater.cleanup() self.poller.close() self.fe_router_socket.close() + self.publisher.cleanup() + + +class DoubleRouterProxy(SingleRouterProxy): + + def __init__(self, conf, context, matchmaker): + LOG.info(_LI('Running double router proxy')) + port = conf.zmq_proxy_opts.backend_port + host = conf.zmq_proxy_opts.host + self.be_router_socket = zmq_socket.ZmqFixedPortSocket( + conf, context, zmq.ROUTER, host, + conf.zmq_proxy_opts.backend_port) if port != 0 else \ + zmq_socket.ZmqRandomPortSocket( + conf, context, zmq.ROUTER, host) + super(DoubleRouterProxy, self).__init__(conf, context, matchmaker) + self.poller.register(self.be_router_socket, self._receive_message) + + def _create_router_updater(self): + return RouterUpdater( + self.conf, self.matchmaker, self.publisher.host, + self.fe_router_socket.connect_address, + self.be_router_socket.connect_address) + + def _get_socket_to_dispatch_on(self, socket): + return self.be_router_socket \ + if socket is self.fe_router_socket \ + else self.fe_router_socket + + def cleanup(self): + super(DoubleRouterProxy, self).cleanup() self.be_router_socket.close() - self.pub_publisher.cleanup() - self._router_updater.cleanup() class RouterUpdater(zmq_updater.UpdaterBase): diff --git a/oslo_messaging/_drivers/zmq_driver/proxy/zmq_publisher_proxy.py b/oslo_messaging/_drivers/zmq_driver/proxy/central/zmq_publisher_proxy.py similarity index 71% rename from oslo_messaging/_drivers/zmq_driver/proxy/zmq_publisher_proxy.py rename to oslo_messaging/_drivers/zmq_driver/proxy/central/zmq_publisher_proxy.py index 727b41903..09f578552 100644 --- a/oslo_messaging/_drivers/zmq_driver/proxy/zmq_publisher_proxy.py +++ b/oslo_messaging/_drivers/zmq_driver/proxy/central/zmq_publisher_proxy.py @@ -1,4 +1,4 @@ -# Copyright 2015 Mirantis, Inc. +# Copyright 2015-2016 Mirantis, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain @@ -14,8 +14,8 @@ import logging +from oslo_messaging._drivers.zmq_driver.proxy import zmq_sender from oslo_messaging._drivers.zmq_driver import zmq_async -from oslo_messaging._drivers.zmq_driver import zmq_names from oslo_messaging._drivers.zmq_driver import zmq_socket LOG = logging.getLogger(__name__) @@ -44,7 +44,6 @@ class PublisherProxy(object): self.matchmaker = matchmaker port = conf.zmq_proxy_opts.publisher_port - self.socket = zmq_socket.ZmqFixedPortSocket( self.conf, self.zmq_context, zmq.PUB, conf.zmq_proxy_opts.host, port) if port != 0 else \ @@ -52,23 +51,10 @@ class PublisherProxy(object): self.conf, self.zmq_context, zmq.PUB, conf.zmq_proxy_opts.host) self.host = self.socket.connect_address + self.sender = zmq_sender.CentralPublisherSender() def send_request(self, multipart_message): - message_type = multipart_message.pop(0) - assert message_type in (zmq_names.CAST_FANOUT_TYPE, - zmq_names.NOTIFY_TYPE), "Fanout expected!" - topic_filter = multipart_message.pop(0) - reply_id = multipart_message.pop(0) - message_id = multipart_message.pop(0) - assert reply_id is not None, "Reply id expected!" - - self.socket.send(topic_filter, zmq.SNDMORE) - self.socket.send(message_id, zmq.SNDMORE) - self.socket.send_multipart(multipart_message) - - LOG.debug("Publishing message %(message_id)s on [%(topic)s]", - {"topic": topic_filter, - "message_id": message_id}) + self.sender.send_message(self.socket, multipart_message) def cleanup(self): self.socket.close() diff --git a/oslo_messaging/_drivers/zmq_driver/proxy/zmq_proxy.py b/oslo_messaging/_drivers/zmq_driver/proxy/zmq_proxy.py index 15c777489..886da5464 100644 --- a/oslo_messaging/_drivers/zmq_driver/proxy/zmq_proxy.py +++ b/oslo_messaging/_drivers/zmq_driver/proxy/zmq_proxy.py @@ -1,4 +1,4 @@ -# Copyright 2015 Mirantis, Inc. +# Copyright 2015-2016 Mirantis, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain @@ -12,12 +12,14 @@ # License for the specific language governing permissions and limitations # under the License. +import argparse import logging import socket +from oslo_config import cfg from stevedore import driver -from oslo_config import cfg +from oslo_messaging._drivers.zmq_driver.proxy.central import zmq_central_proxy from oslo_messaging._drivers.zmq_driver import zmq_async from oslo_messaging._i18n import _LI @@ -25,6 +27,12 @@ zmq = zmq_async.import_zmq() LOG = logging.getLogger(__name__) +USAGE = """ Usage: ./zmq-proxy.py [-h] [] ... + +Usage example: + python oslo_messaging/_cmd/zmq-proxy.py""" + + zmq_proxy_opts = [ cfg.StrOpt('host', default=socket.gethostname(), help='Hostname (FQDN) of current proxy' @@ -41,6 +49,56 @@ zmq_proxy_opts = [ ] +def parse_command_line_args(conf): + parser = argparse.ArgumentParser( + description='ZeroMQ proxy service', + usage=USAGE + ) + + parser.add_argument('-c', '--config-file', dest='config_file', type=str, + help='Path to configuration file') + parser.add_argument('-l', '--log-file', dest='log_file', type=str, + help='Path to log file') + + parser.add_argument('-H', '--host', dest='host', type=str, + help='Host FQDN for current proxy') + parser.add_argument('-f', '--frontend-port', dest='frontend_port', + type=int, + help='Front-end ROUTER port number') + parser.add_argument('-b', '--backend-port', dest='backend_port', type=int, + help='Back-end ROUTER port number') + parser.add_argument('-p', '--publisher-port', dest='publisher_port', + type=int, + help='Front-end PUBLISHER port number') + + parser.add_argument('-d', '--debug', dest='debug', type=bool, + default=False, + help='Turn on DEBUG logging level instead of INFO') + + args = parser.parse_args() + + if args.config_file: + conf(['--config-file', args.config_file]) + + log_kwargs = {'level': logging.DEBUG if args.debug else logging.INFO, + 'format': '%(asctime)s %(name)s %(levelname)-8s %(message)s'} + if args.log_file: + log_kwargs.update({'filename': args.log_file}) + logging.basicConfig(**log_kwargs) + + if args.host: + conf.zmq_proxy_opts.host = args.host + if args.frontend_port: + conf.set_override('frontend_port', args.frontend_port, + group='zmq_proxy_opts') + if args.backend_port: + conf.set_override('backend_port', args.backend_port, + group='zmq_proxy_opts') + if args.publisher_port: + conf.set_override('publisher_port', args.publisher_port, + group='zmq_proxy_opts') + + class ZmqProxy(object): """Wrapper class for Publishers and Routers proxies. The main reason to have a proxy is high complexity of TCP sockets number @@ -80,7 +138,7 @@ class ZmqProxy(object): """ - def __init__(self, conf, proxy_cls): + def __init__(self, conf): super(ZmqProxy, self).__init__() self.conf = conf self.matchmaker = driver.DriverManager( @@ -88,7 +146,16 @@ class ZmqProxy(object): self.conf.oslo_messaging_zmq.rpc_zmq_matchmaker, ).driver(self.conf) self.context = zmq.Context() - self.proxy = proxy_cls(conf, self.context, self.matchmaker) + self.proxy = self._choose_proxy_implementation() + + def _choose_proxy_implementation(self): + if self.conf.zmq_proxy_opts.frontend_port != 0 and \ + self.conf.zmq_proxy_opts.backend_port == 0: + return zmq_central_proxy.SingleRouterProxy(self.conf, self.context, + self.matchmaker) + else: + return zmq_central_proxy.DoubleRouterProxy(self.conf, self.context, + self.matchmaker) def run(self): self.proxy.run() diff --git a/oslo_messaging/_drivers/zmq_driver/proxy/zmq_sender.py b/oslo_messaging/_drivers/zmq_driver/proxy/zmq_sender.py new file mode 100644 index 000000000..3499292ff --- /dev/null +++ b/oslo_messaging/_drivers/zmq_driver/proxy/zmq_sender.py @@ -0,0 +1,69 @@ +# Copyright 2016 Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import abc +import logging + +import six + +from oslo_messaging._drivers.zmq_driver import zmq_async +from oslo_messaging._drivers.zmq_driver import zmq_names + +zmq = zmq_async.import_zmq() +LOG = logging.getLogger(__name__) + + +@six.add_metaclass(abc.ABCMeta) +class Sender(object): + + @abc.abstractmethod + def send_message(self, socket, multipart_message): + """Send message to a socket from multipart list""" + + +class CentralRouterSender(Sender): + + def send_message(self, socket, multipart_message): + message_type = int(multipart_message[zmq_names.MESSAGE_TYPE_IDX]) + routing_key = multipart_message[zmq_names.ROUTING_KEY_IDX] + reply_id = multipart_message[zmq_names.REPLY_ID_IDX] + message_id = multipart_message[zmq_names.MESSAGE_ID_IDX] + socket.send(routing_key, zmq.SNDMORE) + socket.send(b'', zmq.SNDMORE) + socket.send(reply_id, zmq.SNDMORE) + socket.send(multipart_message[zmq_names.MESSAGE_TYPE_IDX], zmq.SNDMORE) + LOG.debug("Dispatching %(msg_type)s message %(msg_id)s - from %(rid)s " + "to -> %(rkey)s" % + {"msg_type": zmq_names.message_type_str(message_type), + "msg_id": message_id, + "rkey": routing_key, + "rid": reply_id}) + socket.send_multipart(multipart_message[zmq_names.MESSAGE_ID_IDX:]) + + +class CentralPublisherSender(Sender): + + def send_message(self, socket, multipart_message): + message_type = int(multipart_message[zmq_names.MESSAGE_TYPE_IDX]) + assert message_type in (zmq_names.CAST_FANOUT_TYPE, + zmq_names.NOTIFY_TYPE), "Fanout expected!" + topic_filter = multipart_message[zmq_names.ROUTING_KEY_IDX] + message_id = multipart_message[zmq_names.MESSAGE_ID_IDX] + + socket.send(topic_filter, zmq.SNDMORE) + socket.send_multipart(multipart_message[zmq_names.MESSAGE_ID_IDX:]) + + LOG.debug("Publishing message %(message_id)s on [%(topic)s]", + {"topic": topic_filter, + "message_id": message_id}) diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_address.py b/oslo_messaging/_drivers/zmq_driver/zmq_address.py index a1ca86d2f..800a26a3a 100644 --- a/oslo_messaging/_drivers/zmq_driver/zmq_address.py +++ b/oslo_messaging/_drivers/zmq_driver/zmq_address.py @@ -27,10 +27,6 @@ def get_tcp_random_address(conf): return "tcp://%s" % conf.oslo_messaging_zmq.rpc_zmq_bind_address -def get_broker_address(conf): - return "ipc://%s/zmq-broker" % conf.oslo_messaging_zmq.rpc_zmq_ipc_dir - - def prefix_str(key, listener_type): return listener_type + "/" + key diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_names.py b/oslo_messaging/_drivers/zmq_driver/zmq_names.py index 6ec99cb83..83361a2d6 100644 --- a/oslo_messaging/_drivers/zmq_driver/zmq_names.py +++ b/oslo_messaging/_drivers/zmq_driver/zmq_names.py @@ -23,13 +23,14 @@ FIELD_REPLY_BODY = 'reply_body' FIELD_FAILURE = 'failure' -IDX_REPLY_TYPE = 1 -IDX_REPLY_BODY = 2 - -MULTIPART_IDX_ENVELOPE = 0 -MULTIPART_IDX_BODY = 1 +REPLY_ID_IDX = 0 +EMPTY_IDX = 1 +MESSAGE_TYPE_IDX = 2 +ROUTING_KEY_IDX = 3 +MESSAGE_ID_IDX = 4 +DEFAULT_TYPE = 0 CALL_TYPE = 1 CAST_TYPE = 2 CAST_FANOUT_TYPE = 3 @@ -37,13 +38,17 @@ NOTIFY_TYPE = 4 REPLY_TYPE = 5 ACK_TYPE = 6 -MESSAGE_TYPES = (CALL_TYPE, +REQUEST_TYPES = (CALL_TYPE, CAST_TYPE, CAST_FANOUT_TYPE, NOTIFY_TYPE) +RESPONSE_TYPES = (REPLY_TYPE, ACK_TYPE) + +MESSAGE_TYPES = REQUEST_TYPES + RESPONSE_TYPES + MULTISEND_TYPES = (CAST_FANOUT_TYPE, NOTIFY_TYPE) -DIRECT_TYPES = (CALL_TYPE, CAST_TYPE, REPLY_TYPE) +DIRECT_TYPES = (CALL_TYPE, CAST_TYPE, REPLY_TYPE, ACK_TYPE) CAST_TYPES = (CAST_TYPE, CAST_FANOUT_TYPE) NOTIFY_TYPES = (NOTIFY_TYPE,) NON_BLOCKING_TYPES = CAST_TYPES + NOTIFY_TYPES diff --git a/oslo_messaging/tests/drivers/zmq/test_pub_sub.py b/oslo_messaging/tests/drivers/zmq/test_pub_sub.py index 81a708cfd..da21c604b 100644 --- a/oslo_messaging/tests/drivers/zmq/test_pub_sub.py +++ b/oslo_messaging/tests/drivers/zmq/test_pub_sub.py @@ -22,8 +22,9 @@ import testscenarios from oslo_config import cfg import oslo_messaging +from oslo_messaging._drivers.zmq_driver.proxy.central \ + import zmq_publisher_proxy from oslo_messaging._drivers.zmq_driver.proxy import zmq_proxy -from oslo_messaging._drivers.zmq_driver.proxy import zmq_publisher_proxy from oslo_messaging._drivers.zmq_driver import zmq_address from oslo_messaging._drivers.zmq_driver import zmq_async from oslo_messaging._drivers.zmq_driver import zmq_names @@ -82,9 +83,8 @@ class TestPubSub(zmq_common.ZmqBaseTestCase): message = {'method': 'hello-world'} self.publisher.send_request( - [zmq_names.CAST_FANOUT_TYPE, + [b'', b'', zmq_names.CAST_FANOUT_TYPE, zmq_address.target_to_subscribe_filter(target), - b"message", b"0000-0000", self.dumps([context, message])]) diff --git a/oslo_messaging/tests/drivers/zmq/test_zmq_ack_manager.py b/oslo_messaging/tests/drivers/zmq/test_zmq_ack_manager.py index dea640c19..d1b45acca 100644 --- a/oslo_messaging/tests/drivers/zmq/test_zmq_ack_manager.py +++ b/oslo_messaging/tests/drivers/zmq/test_zmq_ack_manager.py @@ -20,7 +20,6 @@ import oslo_messaging from oslo_messaging._drivers.zmq_driver.client import zmq_receivers from oslo_messaging._drivers.zmq_driver.client import zmq_senders from oslo_messaging._drivers.zmq_driver.proxy import zmq_proxy -from oslo_messaging._drivers.zmq_driver.proxy import zmq_queue_proxy from oslo_messaging._drivers.zmq_driver.server import zmq_incoming_message from oslo_messaging._drivers.zmq_driver import zmq_async from oslo_messaging._drivers.zmq_driver import zmq_options @@ -70,8 +69,7 @@ class TestZmqAckManager(test_utils.BaseTestCase): self.driver = transport._driver # prepare and launch proxy - self.proxy = zmq_proxy.ZmqProxy(self.conf, - zmq_queue_proxy.UniversalQueueProxy) + self.proxy = zmq_proxy.ZmqProxy(self.conf) vars(self.driver.matchmaker).update(vars(self.proxy.matchmaker)) self.executor = zmq_async.get_executor(self.proxy.run) self.executor.execute()