From 315e56ae2b91cb7ab5a8e24ead1b9ad8c0120552 Mon Sep 17 00:00:00 2001 From: Oleksii Zamiatin Date: Thu, 23 Jul 2015 14:05:04 +0300 Subject: [PATCH] Get rid of proxy process in zmq As far as we use redis as a name service we don't need a proxy, becase we can pass binded port over name service too. Change-Id: I59bbe2b34dcedfeef113ef06d6a988e1c413405e --- oslo_messaging/_cmd/__init__.py | 1 - oslo_messaging/_cmd/zmq_receiver.py | 44 ----- .../_drivers/zmq_driver/broker/__init__.py | 1 - .../zmq_driver/broker/zmq_base_proxy.py | 163 ------------------ .../_drivers/zmq_driver/broker/zmq_broker.py | 71 -------- .../zmq_driver/broker/zmq_call_proxy.py | 106 ------------ .../zmq_driver/broker/zmq_fanout_proxy.py | 38 ---- .../zmq_driver/broker/zmq_universal_proxy.py | 72 -------- .../_drivers/zmq_driver/matchmaker/base.py | 2 +- .../zmq_driver/poller/green_poller.py | 13 +- .../zmq_driver/rpc/client/zmq_call_request.py | 17 +- .../zmq_driver/rpc/client/zmq_cast_dealer.py | 12 +- .../zmq_driver/rpc/client/zmq_request.py | 1 - .../rpc/server/zmq_base_consumer.py | 37 ---- .../rpc/server/zmq_fanout_consumer.py | 74 -------- ...l_responder.py => zmq_incoming_message.py} | 41 ++--- .../zmq_driver/rpc/server/zmq_server.py | 73 +++++--- .../_drivers/zmq_driver/zmq_poller.py | 3 + .../_drivers/zmq_driver/zmq_serializer.py | 2 +- .../_drivers/zmq_driver/zmq_target.py | 29 ++-- .../zmq/matchmaker/test_impl_matchmaker.py | 2 +- .../tests/drivers/zmq/test_impl_zmq.py | 32 +--- setup-test-env-zmq.sh | 2 - tests/drivers/test_impl_zmq.py | 43 ++--- 24 files changed, 135 insertions(+), 744 deletions(-) delete mode 100644 oslo_messaging/_cmd/__init__.py delete mode 100644 oslo_messaging/_cmd/zmq_receiver.py delete mode 100644 oslo_messaging/_drivers/zmq_driver/broker/__init__.py delete mode 100644 oslo_messaging/_drivers/zmq_driver/broker/zmq_base_proxy.py delete mode 100644 oslo_messaging/_drivers/zmq_driver/broker/zmq_broker.py delete mode 100644 oslo_messaging/_drivers/zmq_driver/broker/zmq_call_proxy.py delete mode 100644 oslo_messaging/_drivers/zmq_driver/broker/zmq_fanout_proxy.py delete mode 100644 oslo_messaging/_drivers/zmq_driver/broker/zmq_universal_proxy.py delete mode 100644 oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_base_consumer.py delete mode 100644 oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_fanout_consumer.py rename oslo_messaging/_drivers/zmq_driver/rpc/server/{zmq_call_responder.py => zmq_incoming_message.py} (58%) diff --git a/oslo_messaging/_cmd/__init__.py b/oslo_messaging/_cmd/__init__.py deleted file mode 100644 index 8b1378917..000000000 --- a/oslo_messaging/_cmd/__init__.py +++ /dev/null @@ -1 +0,0 @@ - diff --git a/oslo_messaging/_cmd/zmq_receiver.py b/oslo_messaging/_cmd/zmq_receiver.py deleted file mode 100644 index abd24e8d4..000000000 --- a/oslo_messaging/_cmd/zmq_receiver.py +++ /dev/null @@ -1,44 +0,0 @@ -#!/usr/bin/env python - -# Copyright 2011 OpenStack Foundation -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import contextlib -import logging -import sys - -from oslo_config import cfg - -from oslo_messaging._drivers import impl_zmq -from oslo_messaging._drivers.zmq_driver.broker import zmq_broker -from oslo_messaging._executors import base # FIXME(markmc) - -CONF = cfg.CONF -CONF.register_opts(impl_zmq.zmq_opts) -CONF.register_opts(base._pool_opts) -# TODO(ozamiatin): Move this option assignment to an external config file -# Use efficient zmq poller in real-world deployment -CONF.rpc_zmq_native = True - - -def main(): - CONF(sys.argv[1:], project='oslo') - logging.basicConfig(level=logging.DEBUG) - - with contextlib.closing(zmq_broker.ZmqBroker(CONF)) as reactor: - reactor.start() - reactor.wait() - -if __name__ == "__main__": - main() diff --git a/oslo_messaging/_drivers/zmq_driver/broker/__init__.py b/oslo_messaging/_drivers/zmq_driver/broker/__init__.py deleted file mode 100644 index 8af3e63a7..000000000 --- a/oslo_messaging/_drivers/zmq_driver/broker/__init__.py +++ /dev/null @@ -1 +0,0 @@ -__author__ = 'ozamiatin' diff --git a/oslo_messaging/_drivers/zmq_driver/broker/zmq_base_proxy.py b/oslo_messaging/_drivers/zmq_driver/broker/zmq_base_proxy.py deleted file mode 100644 index 758c15c5f..000000000 --- a/oslo_messaging/_drivers/zmq_driver/broker/zmq_base_proxy.py +++ /dev/null @@ -1,163 +0,0 @@ -# Copyright 2015 Mirantis, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import abc -import logging - -import six - -from oslo_messaging._drivers.common import RPCException -from oslo_messaging._drivers.zmq_driver import zmq_async -from oslo_messaging._drivers.zmq_driver import zmq_target -from oslo_messaging._i18n import _LE, _LI - -LOG = logging.getLogger(__name__) - -zmq = zmq_async.import_zmq() - - -@six.add_metaclass(abc.ABCMeta) -class BaseProxy(object): - - """Base TCP-proxy. - - TCP-proxy redirects messages received by TCP from clients to servers - over IPC. Consists of TCP-frontend and IPC-backend objects. Runs - in async executor. - """ - - def __init__(self, conf, context): - super(BaseProxy, self).__init__() - self.conf = conf - self.context = context - self.executor = zmq_async.get_executor( - self.run, native_zmq=conf.rpc_zmq_native) - - @abc.abstractmethod - def run(self): - """Main execution point of the proxy""" - - def start(self): - self.executor.execute() - - def stop(self): - self.executor.stop() - - def wait(self): - self.executor.wait() - - -@six.add_metaclass(abc.ABCMeta) -class BaseTcpFrontend(object): - - """Base frontend clause. - - TCP-frontend is a part of TCP-proxy which receives incoming - messages from clients. - """ - - def __init__(self, conf, poller, context, - socket_type=None, - port_number=None, - receive_meth=None): - - """Construct a TCP-frontend. - - Its attributes are: - - :param conf: Driver configuration object. - :type conf: ConfigOpts - :param poller: Messages poller-object green or threading. - :type poller: ZmqPoller - :param context: ZeroMQ context object. - :type context: zmq.Context - :param socket_type: ZeroMQ socket type. - :type socket_type: int - :param port_number: Current messaging pipeline port. - :type port_number: int - :param receive_meth: Receive method for poller. - :type receive_meth: method - """ - - self.conf = conf - self.poller = poller - self.context = context - try: - self.frontend = self.context.socket(socket_type) - bind_address = zmq_target.get_tcp_bind_address(port_number) - LOG.info(_LI("Binding to TCP %s") % bind_address) - self.frontend.bind(bind_address) - self.poller.register(self.frontend, receive_meth) - except zmq.ZMQError as e: - errmsg = _LE("Could not create ZeroMQ receiver daemon. " - "Socket may already be in use: %s") % str(e) - LOG.error(errmsg) - raise RPCException(errmsg) - - def receive_incoming(self): - message, socket = self.poller.poll(1) - LOG.info(_LI("Message %s received."), message) - return message - - def close(self): - self.frontend.close() - - -@six.add_metaclass(abc.ABCMeta) -class BaseBackendMatcher(object): - - def __init__(self, conf, poller, context): - self.conf = conf - self.context = context - self.backends = {} - self.poller = poller - - @abc.abstractmethod - def redirect_to_backend(self, message): - """Redirect message""" - - def close(self): - if self.backends: - for backend in self.backends.values(): - backend.close() - - -@six.add_metaclass(abc.ABCMeta) -class DirectBackendMatcher(BaseBackendMatcher): - - def redirect_to_backend(self, message): - backend, target = self._match_backend(message) - self._send_message(backend, message, target) - - def _match_backend(self, message): - target = self._get_target(message) - ipc_address = self._get_ipc_address(target) - backend = self._create_backend(ipc_address) - return backend, target - - @abc.abstractmethod - def _get_target(self, message): - """Extract topic from message""" - - @abc.abstractmethod - def _get_ipc_address(self, target): - """Get ipc backend address from topic""" - - @abc.abstractmethod - def _send_message(self, backend, message, target): - """Backend specific sending logic""" - - @abc.abstractmethod - def _create_backend(self, ipc_address): - """Backend specific socket opening logic""" diff --git a/oslo_messaging/_drivers/zmq_driver/broker/zmq_broker.py b/oslo_messaging/_drivers/zmq_driver/broker/zmq_broker.py deleted file mode 100644 index e3835bae6..000000000 --- a/oslo_messaging/_drivers/zmq_driver/broker/zmq_broker.py +++ /dev/null @@ -1,71 +0,0 @@ -# Copyright 2015 Mirantis, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import logging -import os - -from oslo_utils import excutils - -from oslo_messaging._drivers.zmq_driver.broker import zmq_universal_proxy -from oslo_messaging._drivers.zmq_driver import zmq_async -from oslo_messaging._i18n import _LE, _LI - - -LOG = logging.getLogger(__name__) - - -class ZmqBroker(object): - """Local messaging IPC broker (nodes are still peers). - - The main purpose is to have one TCP connection - (one TCP port assigned for ZMQ messaging) per node. - There could be a number of services running on a node. - Without such broker a number of opened TCP ports used for - messaging become unpredictable for the engine. - - All messages are coming to TCP ROUTER socket and then - distributed between their targets by topic via IPC. - """ - - def __init__(self, conf): - super(ZmqBroker, self).__init__() - zmq = zmq_async.import_zmq(native_zmq=conf.rpc_zmq_native) - self.conf = conf - self.context = zmq.Context() - proxy = zmq_universal_proxy.UniversalProxy(conf, self.context) - self.proxies = [proxy] - self._create_ipc_dirs() - - def _create_ipc_dirs(self): - ipc_dir = self.conf.rpc_zmq_ipc_dir - try: - os.makedirs("%s/fanout" % ipc_dir) - except os.error: - if not os.path.isdir(ipc_dir): - with excutils.save_and_reraise_exception(): - LOG.error(_LE("Required IPC directory does not exist at" - " %s"), ipc_dir) - - def start(self): - for proxy in self.proxies: - proxy.start() - - def wait(self): - for proxy in self.proxies: - proxy.wait() - - def close(self): - LOG.info(_LI("Broker shutting down ...")) - for proxy in self.proxies: - proxy.stop() diff --git a/oslo_messaging/_drivers/zmq_driver/broker/zmq_call_proxy.py b/oslo_messaging/_drivers/zmq_driver/broker/zmq_call_proxy.py deleted file mode 100644 index 623303902..000000000 --- a/oslo_messaging/_drivers/zmq_driver/broker/zmq_call_proxy.py +++ /dev/null @@ -1,106 +0,0 @@ -# Copyright 2015 Mirantis, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import logging - -from oslo_messaging._drivers.common import RPCException -import oslo_messaging._drivers.zmq_driver.broker.zmq_base_proxy as base_proxy -from oslo_messaging._drivers.zmq_driver import zmq_async -from oslo_messaging._drivers.zmq_driver import zmq_serializer -from oslo_messaging._drivers.zmq_driver import zmq_target -from oslo_messaging._i18n import _LE, _LI - -LOG = logging.getLogger(__name__) - -zmq = zmq_async.import_zmq() - - -class DealerBackend(base_proxy.DirectBackendMatcher): - - def __init__(self, conf, context, poller=None): - if poller is None: - poller = zmq_async.get_poller( - native_zmq=conf.rpc_zmq_native) - super(DealerBackend, self).__init__(conf, poller, context) - - def _get_target(self, message): - return zmq_serializer.get_target_from_call_message(message) - - def _get_ipc_address(self, target): - return zmq_target.get_ipc_address_call(self.conf, target) - - def _send_message(self, backend, message, topic): - # Empty needed for awaiting REP socket to work properly - # (DEALER-REP usage specific) - backend.send(b'', zmq.SNDMORE) - backend.send(message.pop(0), zmq.SNDMORE) - backend.send_string(message.pop(0), zmq.SNDMORE) - message.pop(0) # Drop target unneeded any more - backend.send_multipart(message) - - def _create_backend(self, ipc_address): - if ipc_address in self.backends: - return self.backends[ipc_address] - backend = self.context.socket(zmq.DEALER) - backend.connect(ipc_address) - self.poller.register(backend) - self.backends[ipc_address] = backend - return backend - - -class FrontendTcpRouter(base_proxy.BaseTcpFrontend): - - def __init__(self, conf, context, poller=None): - if poller is None: - poller = zmq_async.get_poller( - native_zmq=conf.rpc_zmq_native) - super(FrontendTcpRouter, self).__init__( - conf, poller, context, - socket_type=zmq.ROUTER, - port_number=conf.rpc_zmq_port, - receive_meth=self._receive_message) - - def _receive_message(self, socket): - - try: - reply_id = socket.recv() - empty = socket.recv() - assert empty == b'', "Empty delimiter expected" - msg_type = socket.recv_string() - target_dict = socket.recv_json() - target = zmq_target.target_from_dict(target_dict) - other = socket.recv_multipart() - except zmq.ZMQError as e: - LOG.error(_LE("Error receiving message %s") % str(e)) - return None - - if msg_type == zmq_serializer.FANOUT_TYPE: - other.insert(0, zmq_target.target_to_str(target).encode("utf-8")) - - return [reply_id, msg_type, target] + other - - @staticmethod - def _reduce_empty(reply): - reply.pop(0) - return reply - - def redirect_outgoing_reply(self, reply): - self._reduce_empty(reply) - try: - self.frontend.send_multipart(reply) - LOG.info(_LI("Redirecting reply to client %s") % reply) - except zmq.ZMQError: - errmsg = _LE("Failed redirecting reply to client %s") % reply - LOG.error(errmsg) - raise RPCException(errmsg) diff --git a/oslo_messaging/_drivers/zmq_driver/broker/zmq_fanout_proxy.py b/oslo_messaging/_drivers/zmq_driver/broker/zmq_fanout_proxy.py deleted file mode 100644 index bf6492ee1..000000000 --- a/oslo_messaging/_drivers/zmq_driver/broker/zmq_fanout_proxy.py +++ /dev/null @@ -1,38 +0,0 @@ -# Copyright 2015 Mirantis, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - - -import oslo_messaging._drivers.zmq_driver.broker.zmq_base_proxy as base_proxy -from oslo_messaging._drivers.zmq_driver import zmq_async -from oslo_messaging._drivers.zmq_driver import zmq_serializer -from oslo_messaging._drivers.zmq_driver import zmq_target - -zmq = zmq_async.import_zmq() - - -class PublisherBackend(base_proxy.BaseBackendMatcher): - - def __init__(self, conf, context): - poller = zmq_async.get_poller(native_zmq=conf.rpc_zmq_native) - super(PublisherBackend, self).__init__(conf, poller, context) - self.backend = self.context.socket(zmq.PUB) - self.backend.bind(zmq_target.get_ipc_address_fanout(conf)) - - def redirect_to_backend(self, message): - target_pos = zmq_serializer.MESSAGE_CALL_TARGET_POSITION + 1 - msg = message[target_pos:] - self.backend.send_multipart(msg) - - def close(self): - self.backend.close() diff --git a/oslo_messaging/_drivers/zmq_driver/broker/zmq_universal_proxy.py b/oslo_messaging/_drivers/zmq_driver/broker/zmq_universal_proxy.py deleted file mode 100644 index 82d3e024d..000000000 --- a/oslo_messaging/_drivers/zmq_driver/broker/zmq_universal_proxy.py +++ /dev/null @@ -1,72 +0,0 @@ -# Copyright 2015 Mirantis, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import logging - -import oslo_messaging._drivers.zmq_driver.broker.zmq_base_proxy as base_proxy -from oslo_messaging._drivers.zmq_driver.broker import zmq_call_proxy -from oslo_messaging._drivers.zmq_driver.broker import zmq_fanout_proxy -from oslo_messaging._drivers.zmq_driver import zmq_async -from oslo_messaging._drivers.zmq_driver import zmq_serializer -from oslo_messaging._i18n import _LI - -LOG = logging.getLogger(__name__) - - -class UniversalProxy(base_proxy.BaseProxy): - - def __init__(self, conf, context): - super(UniversalProxy, self).__init__(conf, context) - self.poller = zmq_async.get_poller( - native_zmq=conf.rpc_zmq_native) - self.tcp_frontend = zmq_call_proxy.FrontendTcpRouter( - conf, context, poller=self.poller) - self.backend_matcher = BackendMatcher( - conf, context, poller=self.poller) - call = zmq_serializer.CALL_TYPE - self.call_backend = self.backend_matcher.backends[call] - LOG.info(_LI("Starting universal-proxy thread")) - - def run(self): - message, socket = self.poller.poll(self.conf.rpc_poll_timeout) - if message is None: - return - - LOG.info(_LI("Received message at universal proxy: %s") % str(message)) - - if socket == self.tcp_frontend.frontend: - self.backend_matcher.redirect_to_backend(message) - else: - self.tcp_frontend.redirect_outgoing_reply(message) - - def stop(self): - self.poller.close() - super(UniversalProxy, self).stop() - self.tcp_frontend.close() - self.backend_matcher.close() - - -class BackendMatcher(base_proxy.BaseBackendMatcher): - - def __init__(self, conf, context, poller=None): - super(BackendMatcher, self).__init__(conf, poller, context) - direct_backend = zmq_call_proxy.DealerBackend(conf, context, poller) - self.backends[zmq_serializer.CALL_TYPE] = direct_backend - self.backends[zmq_serializer.CAST_TYPE] = direct_backend - fanout_backend = zmq_fanout_proxy.PublisherBackend(conf, context) - self.backends[zmq_serializer.FANOUT_TYPE] = fanout_backend - - def redirect_to_backend(self, message): - message_type = zmq_serializer.get_msg_type(message) - self.backends[message_type].redirect_to_backend(message) diff --git a/oslo_messaging/_drivers/zmq_driver/matchmaker/base.py b/oslo_messaging/_drivers/zmq_driver/matchmaker/base.py index 766e367c2..1bd75f2a2 100644 --- a/oslo_messaging/_drivers/zmq_driver/matchmaker/base.py +++ b/oslo_messaging/_drivers/zmq_driver/matchmaker/base.py @@ -46,7 +46,7 @@ class MatchMakerBase(object): if len(hosts) == 0: LOG.warning(_LW("No hosts were found for target %s. Using " "localhost") % target) - return "localhost" + return "localhost:" + str(self.conf.rpc_zmq_port) elif len(hosts) == 1: LOG.info(_LI("A single host found for target %s.") % target) return hosts[0] diff --git a/oslo_messaging/_drivers/zmq_driver/poller/green_poller.py b/oslo_messaging/_drivers/zmq_driver/poller/green_poller.py index 3e6f5148a..72429f1f1 100644 --- a/oslo_messaging/_drivers/zmq_driver/poller/green_poller.py +++ b/oslo_messaging/_drivers/zmq_driver/poller/green_poller.py @@ -62,12 +62,15 @@ class GreenPoller(zmq_poller.ZmqPoller): for thread in self.threads: thread.kill() + self.threads = [] + class HoldReplyPoller(GreenPoller): def __init__(self): super(HoldReplyPoller, self).__init__() self.event_by_socket = {} + self._is_running = threading.Event() def register(self, socket, recv_method=None): super(HoldReplyPoller, self).register(socket, recv_method) @@ -79,7 +82,7 @@ class HoldReplyPoller(GreenPoller): def _socket_receive(self, socket, recv_method=None): pause = self.event_by_socket[socket] - while True: + while not self._is_running.is_set(): pause.clear() if recv_method: incoming = recv_method(socket) @@ -88,6 +91,14 @@ class HoldReplyPoller(GreenPoller): self.incoming_queue.put((incoming, socket)) pause.wait() + def close(self): + self._is_running.set() + for pause in self.event_by_socket.values(): + pause.set() + eventlet.sleep() + + super(HoldReplyPoller, self).close() + class GreenExecutor(zmq_poller.Executor): diff --git a/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_call_request.py b/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_call_request.py index 8bdd9c0af..dea54d471 100644 --- a/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_call_request.py +++ b/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_call_request.py @@ -42,25 +42,26 @@ class CallRequest(Request): message, socket, zmq_serializer.CALL_TYPE, timeout, retry) - self.host = self.matchmaker.get_single_host(self.target) - self.connect_address = zmq_target.get_tcp_address_call(conf, - self.host) + self.connect_address = zmq_target.get_tcp_direct_address( + self.host) LOG.info(_LI("Connecting REQ to %s") % self.connect_address) self.socket.connect(self.connect_address) + self.reply_poller.register( + self.socket, recv_method=lambda socket: socket.recv_json()) + except zmq.ZMQError as e: - LOG.error(_LE("Error connecting to socket: %s") % str(e)) - raise + errmsg = _LE("Error connecting to socket: %s") % str(e) + LOG.error(errmsg) + raise rpc_common.RPCException(errmsg) def close(self): self.reply_poller.close() + self.socket.setsockopt(zmq.LINGER, 0) self.socket.close() def receive_reply(self): # NOTE(ozamiatin): Check for retry here (no retries now) - self.reply_poller.register( - self.socket, recv_method=lambda socket: socket.recv_json()) - reply, socket = self.reply_poller.poll(timeout=self.timeout) if reply is None: raise oslo_messaging.MessagingTimeout( diff --git a/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_cast_dealer.py b/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_cast_dealer.py index f28ed428f..75551a6a4 100644 --- a/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_cast_dealer.py +++ b/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_cast_dealer.py @@ -14,6 +14,7 @@ import logging +from oslo_messaging._drivers import common as rpc_common from oslo_messaging._drivers.zmq_driver.rpc.client import zmq_cast_publisher from oslo_messaging._drivers.zmq_driver.rpc.client.zmq_request import Request from oslo_messaging._drivers.zmq_driver import zmq_async @@ -58,7 +59,7 @@ class DealerCastPublisher(zmq_cast_publisher.CastPublisherBase): def cast(self, target, context, message, timeout=None, retry=None): host = self.matchmaker.get_single_host(target) - connect_address = zmq_target.get_tcp_address_call(self.conf, host) + connect_address = zmq_target.get_tcp_direct_address(host) dealer_socket = self._create_socket(connect_address) request = CastRequest(self.conf, target, context, message, dealer_socket, connect_address, timeout, retry) @@ -73,11 +74,14 @@ class DealerCastPublisher(zmq_cast_publisher.CastPublisherBase): dealer_socket.connect(address) self.outbound_sockets[address] = dealer_socket return dealer_socket - except zmq.ZMQError: - LOG.error(_LE("Failed connecting DEALER to %s") % address) - raise + except zmq.ZMQError as e: + errmsg = _LE("Failed connecting DEALER to %(address)s: %(e)s")\ + % (address, e) + LOG.error(errmsg) + raise rpc_common.RPCException(errmsg) def cleanup(self): if self.outbound_sockets: for socket in self.outbound_sockets.values(): + socket.setsockopt(zmq.LINGER, 0) socket.close() diff --git a/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_request.py b/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_request.py index 144c5c107..3f8e1da61 100644 --- a/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_request.py +++ b/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_request.py @@ -62,7 +62,6 @@ class Request(object): def send_request(self): self.socket.send_string(self.msg_type, zmq.SNDMORE) self.socket.send_json(self.target.__dict__, zmq.SNDMORE) - self.socket.send_string(self.msg_id, zmq.SNDMORE) self.socket.send_json(self.context, zmq.SNDMORE) self.socket.send_json(self.message) diff --git a/oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_base_consumer.py b/oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_base_consumer.py deleted file mode 100644 index a74b7dcba..000000000 --- a/oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_base_consumer.py +++ /dev/null @@ -1,37 +0,0 @@ -# Copyright 2015 Mirantis, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import abc - -import six - - -@six.add_metaclass(abc.ABCMeta) -class ConsumerBase(object): - - def __init__(self, listener, conf, zmq_poller, context): - self.listener = listener - self.conf = conf - self.poller = zmq_poller - self.context = context - self.sockets_per_target = {} - - def cleanup(self): - if self.sockets_per_target: - for socket in self.sockets_per_target.values(): - socket.close() - - @abc.abstractmethod - def listen(self, target): - """Listen for target""" diff --git a/oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_fanout_consumer.py b/oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_fanout_consumer.py deleted file mode 100644 index 0c54ca36b..000000000 --- a/oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_fanout_consumer.py +++ /dev/null @@ -1,74 +0,0 @@ -# Copyright 2015 Mirantis, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - - -import logging - -import six - -from oslo_messaging._drivers import base -from oslo_messaging._drivers.zmq_driver.rpc.server import zmq_base_consumer -from oslo_messaging._drivers.zmq_driver import zmq_async -from oslo_messaging._drivers.zmq_driver import zmq_target as topic_utils -from oslo_messaging._i18n import _LE - - -LOG = logging.getLogger(__name__) - -zmq = zmq_async.import_zmq() - - -class ZmqFanoutMessage(base.IncomingMessage): - - def __init__(self, listener, context, message, socket, poller): - super(ZmqFanoutMessage, self).__init__(listener, context, message) - poller.resume_polling(socket) - - def reply(self, reply=None, failure=None, log_failure=True): - """Reply is not needed for fanout(cast) messages""" - - def acknowledge(self): - pass - - def requeue(self): - pass - - -class FanoutConsumer(zmq_base_consumer.ConsumerBase): - - def _receive_message(self, socket): - try: - topic = socket.recv_string() - assert topic is not None, 'Bad format: Topic is expected' - msg_id = socket.recv_string() - assert msg_id is not None, 'Bad format: message ID expected' - context = socket.recv_json() - message = socket.recv_json() - LOG.debug("[Server] REP Received message %s" % str(message)) - incoming = ZmqFanoutMessage(self.listener, context, message, - socket, self.poller) - return incoming - except zmq.ZMQError as e: - LOG.error(_LE("Receiving message failed ... {}"), e) - - def listen(self, target): - topic = topic_utils.target_to_str(target) - ipc_address = topic_utils.get_ipc_address_fanout(self.conf) - sub_socket = self.context.socket(zmq.SUB) - sub_socket.connect(ipc_address) - if six.PY3: - sub_socket.setsockopt_string(zmq.SUBSCRIBE, str(topic)) - else: - sub_socket.setsockopt(zmq.SUBSCRIBE, str(topic)) - self.poller.register(sub_socket, self._receive_message) diff --git a/oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_call_responder.py b/oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_incoming_message.py similarity index 58% rename from oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_call_responder.py rename to oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_incoming_message.py index f440359a2..1373019e1 100644 --- a/oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_call_responder.py +++ b/oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_incoming_message.py @@ -17,11 +17,8 @@ import logging from oslo_messaging._drivers import base from oslo_messaging._drivers import common as rpc_common -from oslo_messaging._drivers.zmq_driver.rpc.server import zmq_base_consumer from oslo_messaging._drivers.zmq_driver import zmq_async from oslo_messaging._drivers.zmq_driver import zmq_serializer -from oslo_messaging._drivers.zmq_driver import zmq_target -from oslo_messaging._i18n import _LE LOG = logging.getLogger(__name__) @@ -59,31 +56,17 @@ class ZmqIncomingRequest(base.IncomingMessage): pass -class CallResponder(zmq_base_consumer.ConsumerBase): +class ZmqFanoutMessage(base.IncomingMessage): - def _receive_message(self, socket): - try: - reply_id = socket.recv() - msg_type = socket.recv_string() - assert msg_type is not None, 'Bad format: msg type expected' - msg_id = socket.recv_string() - assert msg_id is not None, 'Bad format: message ID expected' - context = socket.recv_json() - message = socket.recv_json() - LOG.debug("[Server] REP Received message %s" % str(message)) - incoming = ZmqIncomingRequest(self.listener, - context, - message, socket, - reply_id, - self.poller) - return incoming - except zmq.ZMQError as e: - LOG.error(_LE("Receiving message failed: %s") % str(e)) + def __init__(self, listener, context, message, socket, poller): + super(ZmqFanoutMessage, self).__init__(listener, context, message) + poller.resume_polling(socket) - def listen(self, target): - ipc_rep_address = zmq_target.get_ipc_address_call(self.conf, target) - rep_socket = self.context.socket(zmq.REP) - rep_socket.bind(ipc_rep_address) - str_target = zmq_target.target_to_str(target) - self.sockets_per_target[str_target] = rep_socket - self.poller.register(rep_socket, self._receive_message) + def reply(self, reply=None, failure=None, log_failure=True): + """Reply is not needed for fanout(cast) messages""" + + def acknowledge(self): + pass + + def requeue(self): + pass diff --git a/oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_server.py b/oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_server.py index 9621e8ce9..0132aacaf 100644 --- a/oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_server.py +++ b/oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_server.py @@ -15,9 +15,12 @@ import logging from oslo_messaging._drivers import base -from oslo_messaging._drivers.zmq_driver.rpc.server import zmq_call_responder -from oslo_messaging._drivers.zmq_driver.rpc.server import zmq_fanout_consumer +from oslo_messaging._drivers import common as rpc_common +from oslo_messaging._drivers.zmq_driver.rpc.server import zmq_incoming_message from oslo_messaging._drivers.zmq_driver import zmq_async +from oslo_messaging._drivers.zmq_driver import zmq_serializer +from oslo_messaging._drivers.zmq_driver import zmq_target +from oslo_messaging._i18n import _LE LOG = logging.getLogger(__name__) @@ -27,36 +30,64 @@ zmq = zmq_async.import_zmq() class ZmqServer(base.Listener): def __init__(self, conf, matchmaker=None): - LOG.info("[Server] __init__") self.conf = conf - self.context = zmq.Context() - self.poller = zmq_async.get_reply_poller() + try: + self.context = zmq.Context() + self.socket = self.context.socket(zmq.ROUTER) + self.address = zmq_target.get_tcp_random_address(conf) + self.port = self.socket.bind_to_random_port(self.address) + LOG.info("Run server on tcp://%s:%d" % + (self.address, self.port)) + except zmq.ZMQError as e: + errmsg = _LE("Failed binding to port %(port)d: %(e)s")\ + % (self.port, e) + LOG.error(errmsg) + raise rpc_common.RPCException(errmsg) + + self.poller = zmq_async.get_poller() + self.poller.register(self.socket, self._receive_message) self.matchmaker = matchmaker - self.call_resp = zmq_call_responder.CallResponder(self, conf, - self.poller, - self.context) - self.fanout_resp = zmq_fanout_consumer.FanoutConsumer(self, conf, - self.poller, - self.context) def poll(self, timeout=None): incoming = self.poller.poll(timeout or self.conf.rpc_poll_timeout) return incoming[0] def stop(self): - LOG.info("[Server] Stop") + LOG.info("Stop server tcp://%s:%d" % (self.address, self.port)) def cleanup(self): self.poller.close() - self.call_resp.cleanup() - self.fanout_resp.cleanup() + if not self.socket.closed: + self.socket.setsockopt(zmq.LINGER, 0) + self.socket.close() def listen(self, target): - LOG.info("[Server] Listen to Target %s" % target) - + LOG.info("Listen to Target %s on tcp://%s:%d" % + (target, self.address, self.port)) + host = zmq_target.combine_address(self.conf.rpc_zmq_host, self.port) self.matchmaker.register(target=target, - hostname=self.conf.rpc_zmq_host) - if target.fanout: - self.fanout_resp.listen(target) - else: - self.call_resp.listen(target) + hostname=host) + + def _receive_message(self, socket): + try: + reply_id = socket.recv() + empty = socket.recv() + assert empty == b'', 'Bad format: empty delimiter expected' + msg_type = socket.recv_string() + assert msg_type is not None, 'Bad format: msg type expected' + target_dict = socket.recv_json() + assert target_dict is not None, 'Bad format: target expected' + context = socket.recv_json() + message = socket.recv_json() + LOG.debug("Received CALL message %s" % str(message)) + + direct_type = (zmq_serializer.CALL_TYPE, zmq_serializer.CAST_TYPE) + if msg_type in direct_type: + return zmq_incoming_message.ZmqIncomingRequest( + self, context, message, socket, reply_id, self.poller) + elif msg_type == zmq_serializer.FANOUT_TYPE: + return zmq_incoming_message.ZmqFanoutMessage( + self, context, message, socket, self.poller) + + except zmq.ZMQError as e: + LOG.error(_LE("Receiving message failed: %s") % str(e)) diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_poller.py b/oslo_messaging/_drivers/zmq_driver/zmq_poller.py index 02d4ee87c..437c841ab 100644 --- a/oslo_messaging/_drivers/zmq_driver/zmq_poller.py +++ b/oslo_messaging/_drivers/zmq_driver/zmq_poller.py @@ -32,6 +32,9 @@ class ZmqPoller(object): def close(self): """Terminate polling""" + def resume_polling(self, socket): + """Resume with polling""" + @six.add_metaclass(abc.ABCMeta) class Executor(object): diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_serializer.py b/oslo_messaging/_drivers/zmq_driver/zmq_serializer.py index cda3aca4e..41663f639 100644 --- a/oslo_messaging/_drivers/zmq_driver/zmq_serializer.py +++ b/oslo_messaging/_drivers/zmq_driver/zmq_serializer.py @@ -44,7 +44,7 @@ def get_msg_type(message): if type not in MESSAGE_TYPES: errmsg = _LE("Unknown message type: %s") % str(type) LOG.error(errmsg) - rpc_common.RPCException(errmsg) + raise rpc_common.RPCException(errmsg) return type diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_target.py b/oslo_messaging/_drivers/zmq_driver/zmq_target.py index a5e5de8dd..3db6aace3 100644 --- a/oslo_messaging/_drivers/zmq_driver/zmq_target.py +++ b/oslo_messaging/_drivers/zmq_driver/zmq_target.py @@ -15,11 +15,6 @@ from oslo_messaging import target -def get_ipc_address_call(conf, target): - target_addr = target_to_str(target) - return "ipc://%s/%s" % (conf.rpc_zmq_ipc_dir, target_addr) - - def get_tcp_bind_address(port): return "tcp://*:%s" % port @@ -28,19 +23,27 @@ def get_tcp_address_call(conf, host): return "tcp://%s:%s" % (host, conf.rpc_zmq_port) -def get_ipc_address_cast(conf, target): - target_addr = target_to_str(target) - return "ipc://%s/fanout/%s" % (conf.rpc_zmq_ipc_dir, target_addr) +def combine_address(host, port): + return "%s:%s" % (host, port) -def get_ipc_address_fanout(conf): - return "ipc://%s/fanout_general" % conf.rpc_zmq_ipc_dir +def get_tcp_direct_address(host): + return "tcp://%s" % (host) + + +def get_tcp_random_address(conf): + return "tcp://*" def target_to_str(target): - if target.server is None: - return target.topic - return "%s.%s" % (target.server, target.topic) + items = [] + if target.topic: + items.append(target.topic) + if target.exchange: + items.append(target.exchange) + if target.server: + items.append(target.server) + return '.'.join(items) def target_from_dict(target_dict): diff --git a/oslo_messaging/tests/drivers/zmq/matchmaker/test_impl_matchmaker.py b/oslo_messaging/tests/drivers/zmq/matchmaker/test_impl_matchmaker.py index 9cae5fe6a..da296d82f 100644 --- a/oslo_messaging/tests/drivers/zmq/matchmaker/test_impl_matchmaker.py +++ b/oslo_messaging/tests/drivers/zmq/matchmaker/test_impl_matchmaker.py @@ -77,4 +77,4 @@ class TestImplMatchmaker(test_utils.BaseTestCase): def test_get_single_host_wrong_topic(self): target = oslo_messaging.Target(topic="no_such_topic") self.assertEqual(self.test_matcher.get_single_host(target), - "localhost") + "localhost:9501") diff --git a/oslo_messaging/tests/drivers/zmq/test_impl_zmq.py b/oslo_messaging/tests/drivers/zmq/test_impl_zmq.py index 563483a25..d191ae64c 100644 --- a/oslo_messaging/tests/drivers/zmq/test_impl_zmq.py +++ b/oslo_messaging/tests/drivers/zmq/test_impl_zmq.py @@ -13,7 +13,6 @@ # under the License. import logging -import socket import threading import fixtures @@ -21,7 +20,6 @@ import testtools import oslo_messaging from oslo_messaging._drivers import impl_zmq -from oslo_messaging._drivers.zmq_driver.broker.zmq_broker import ZmqBroker from oslo_messaging._drivers.zmq_driver import zmq_async from oslo_messaging._i18n import _ from oslo_messaging.tests import utils as test_utils @@ -61,40 +59,26 @@ class TestRPCServerListener(object): self.executor.stop() -def get_unused_port(): - """Returns an unused port on localhost.""" - s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - s.bind(('localhost', 0)) - port = s.getsockname()[1] - s.close() - return port - - class ZmqBaseTestCase(test_utils.BaseTestCase): - """Base test case for all ZMQ tests that make use of the ZMQ Proxy""" + """Base test case for all ZMQ tests """ @testtools.skipIf(zmq is None, "zmq not available") def setUp(self): super(ZmqBaseTestCase, self).setUp() self.messaging_conf.transport_driver = 'zmq' - # Get driver - transport = oslo_messaging.get_transport(self.conf) - self.driver = transport._driver # Set config values self.internal_ipc_dir = self.useFixture(fixtures.TempDir()).path kwargs = {'rpc_zmq_bind_address': '127.0.0.1', 'rpc_zmq_host': '127.0.0.1', 'rpc_response_timeout': 5, - 'rpc_zmq_port': get_unused_port(), - 'rpc_zmq_ipc_dir': self.internal_ipc_dir} + 'rpc_zmq_ipc_dir': self.internal_ipc_dir, + 'rpc_zmq_matchmaker': 'dummy'} self.config(**kwargs) - # Start RPC - LOG.info("Running internal zmq receiver.") - self.broker = ZmqBroker(self.conf) - self.broker.start() + # Get driver + transport = oslo_messaging.get_transport(self.conf) + self.driver = transport._driver self.listener = TestRPCServerListener(self.driver) @@ -118,8 +102,6 @@ class stopRpc(object): self.attrs = attrs def __call__(self): - if self.attrs['broker']: - self.attrs['broker'].close() if self.attrs['driver']: self.attrs['driver'].cleanup() if self.attrs['listener']: @@ -151,7 +133,7 @@ class TestZmqBasics(ZmqBaseTestCase): def test_send_noreply(self): """Cast() with topic.""" - target = oslo_messaging.Target(topic='testtopic', server="127.0.0.1") + target = oslo_messaging.Target(topic='testtopic', server="my@server") self.listener.listen(target) result = self.driver.send( target, {}, diff --git a/setup-test-env-zmq.sh b/setup-test-env-zmq.sh index b27ee9d3f..353c2602c 100755 --- a/setup-test-env-zmq.sh +++ b/setup-test-env-zmq.sh @@ -22,6 +22,4 @@ EOF redis-server --port $ZMQ_REDIS_PORT & -oslo-messaging-zmq-receiver --config-file ${DATADIR}/zmq.conf > ${DATADIR}/receiver.log 2>&1 & - $* diff --git a/tests/drivers/test_impl_zmq.py b/tests/drivers/test_impl_zmq.py index fb12ac760..d191ae64c 100644 --- a/tests/drivers/test_impl_zmq.py +++ b/tests/drivers/test_impl_zmq.py @@ -13,7 +13,6 @@ # under the License. import logging -import socket import threading import fixtures @@ -21,7 +20,6 @@ import testtools import oslo_messaging from oslo_messaging._drivers import impl_zmq -from oslo_messaging._drivers.zmq_driver.broker.zmq_broker import ZmqBroker from oslo_messaging._drivers.zmq_driver import zmq_async from oslo_messaging._i18n import _ from oslo_messaging.tests import utils as test_utils @@ -61,40 +59,26 @@ class TestRPCServerListener(object): self.executor.stop() -def get_unused_port(): - """Returns an unused port on localhost.""" - s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - s.bind(('localhost', 0)) - port = s.getsockname()[1] - s.close() - return port - - class ZmqBaseTestCase(test_utils.BaseTestCase): - """Base test case for all ZMQ tests that make use of the ZMQ Proxy""" + """Base test case for all ZMQ tests """ @testtools.skipIf(zmq is None, "zmq not available") def setUp(self): super(ZmqBaseTestCase, self).setUp() self.messaging_conf.transport_driver = 'zmq' - # Get driver - transport = oslo_messaging.get_transport(self.conf) - self.driver = transport._driver # Set config values self.internal_ipc_dir = self.useFixture(fixtures.TempDir()).path kwargs = {'rpc_zmq_bind_address': '127.0.0.1', 'rpc_zmq_host': '127.0.0.1', 'rpc_response_timeout': 5, - 'rpc_zmq_port': get_unused_port(), - 'rpc_zmq_ipc_dir': self.internal_ipc_dir} + 'rpc_zmq_ipc_dir': self.internal_ipc_dir, + 'rpc_zmq_matchmaker': 'dummy'} self.config(**kwargs) - # Start RPC - LOG.info("Running internal zmq receiver.") - self.broker = ZmqBroker(self.conf) - self.broker.start() + # Get driver + transport = oslo_messaging.get_transport(self.conf) + self.driver = transport._driver self.listener = TestRPCServerListener(self.driver) @@ -118,8 +102,6 @@ class stopRpc(object): self.attrs = attrs def __call__(self): - if self.attrs['broker']: - self.attrs['broker'].close() if self.attrs['driver']: self.attrs['driver'].cleanup() if self.attrs['listener']: @@ -146,12 +128,12 @@ class TestZmqBasics(ZmqBaseTestCase): target, {}, {'method': 'hello-world', 'tx_id': 1}, wait_for_reply=True) - self.assertIsNotNone(result) + self.assertTrue(result) def test_send_noreply(self): """Cast() with topic.""" - target = oslo_messaging.Target(topic='testtopic', server="127.0.0.1") + target = oslo_messaging.Target(topic='testtopic', server="my@server") self.listener.listen(target) result = self.driver.send( target, {}, @@ -165,20 +147,21 @@ class TestZmqBasics(ZmqBaseTestCase): method = self.listener.message.message[u'method'] self.assertEqual(u'hello-world', method) - @testtools.skip("Not implemented feature") def test_send_fanout(self): target = oslo_messaging.Target(topic='testtopic', fanout=True) - self.driver.listen(target) + self.listener.listen(target) result = self.driver.send( target, {}, {'method': 'hello-world', 'tx_id': 1}, wait_for_reply=False) + self.listener._received.wait() + self.assertIsNone(result) self.assertEqual(True, self.listener._received.isSet()) - msg_pattern = "{'method': 'hello-world', 'tx_id': 1}" - self.assertEqual(msg_pattern, self.listener.message) + method = self.listener.message.message[u'method'] + self.assertEqual(u'hello-world', method) def test_send_receive_direct(self): """Call() without topic."""