From 1385df618107400330c4bc58e2bdce8489bb1dcd Mon Sep 17 00:00:00 2001 From: Oleksii Zamiatin Date: Mon, 22 Feb 2016 13:55:01 +0200 Subject: [PATCH] [zmq] Refactoring consumer side * Introduce ZmqNotificationServer * Implement single listener per target Change-Id: I874c3fa6a86d3110a2145bea8ad06ca0bbd522c7 --- oslo_messaging/_drivers/impl_zmq.py | 9 +-- .../dealer/zmq_dealer_call_publisher.py | 10 +-- .../publishers/dealer/zmq_dealer_publisher.py | 4 +- .../zmq_driver/poller/green_poller.py | 37 +--------- .../server/consumers/zmq_consumer_base.py | 62 +++++++--------- .../server/consumers/zmq_pull_consumer.py | 14 ---- .../server/consumers/zmq_router_consumer.py | 15 ---- .../server/consumers/zmq_sub_consumer.py | 73 ++++--------------- .../_drivers/zmq_driver/server/zmq_server.py | 45 ++++++++---- .../_drivers/zmq_driver/zmq_async.py | 11 --- .../_drivers/zmq_driver/zmq_socket.py | 1 + .../tests/drivers/zmq/test_impl_zmq.py | 55 -------------- .../tests/drivers/zmq/test_pub_sub.py | 2 +- .../tests/drivers/zmq/test_zmq_async.py | 12 +-- 14 files changed, 92 insertions(+), 258 deletions(-) diff --git a/oslo_messaging/_drivers/impl_zmq.py b/oslo_messaging/_drivers/impl_zmq.py index d27f8c036..0726ae097 100644 --- a/oslo_messaging/_drivers/impl_zmq.py +++ b/oslo_messaging/_drivers/impl_zmq.py @@ -254,9 +254,7 @@ class ZmqDriver(base.BaseDriver): :param target: Message destination target :type target: oslo_messaging.Target """ - server = zmq_server.ZmqServer(self, self.conf, self.matchmaker) - server.listen(target) - return server + return zmq_server.ZmqServer(self, self.conf, self.matchmaker, target) def listen_for_notifications(self, targets_and_priorities, pool): """Listen to a specified list of targets on a server side @@ -266,9 +264,8 @@ class ZmqDriver(base.BaseDriver): :param pool: Not used for zmq implementation :type pool: object """ - server = zmq_server.ZmqServer(self, self.conf, self.matchmaker) - server.listen_notification(targets_and_priorities) - return server + return zmq_server.ZmqNotificationServer( + self, self.conf, self.matchmaker, targets_and_priorities) def cleanup(self): """Cleanup all driver's connections finally diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_call_publisher.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_call_publisher.py index 383c41485..3ced6ce4c 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_call_publisher.py +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_call_publisher.py @@ -110,14 +110,12 @@ class ReplyWaiter(object): self._lock = threading.Lock() def track_reply(self, reply_future, message_id): - self._lock.acquire() - self.replies[message_id] = reply_future - self._lock.release() + with self._lock: + self.replies[message_id] = reply_future def untrack_id(self, message_id): - self._lock.acquire() - self.replies.pop(message_id) - self._lock.release() + with self._lock: + self.replies.pop(message_id) def poll_socket(self, socket): diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher.py index f8fa60552..cba029426 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher.py +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher.py @@ -54,9 +54,6 @@ class DealerPublisher(zmq_publisher_base.QueuedSender): raise zmq_publisher_base.UnsupportedSendPattern(request.msg_type) super(DealerPublisher, self).send_request(request) - def cleanup(self): - super(DealerPublisher, self).cleanup() - class DealerPublisherLight(zmq_publisher_base.QueuedSender): """Used when publishing to proxy. """ @@ -91,3 +88,4 @@ class DealerPublisherLight(zmq_publisher_base.QueuedSender): def cleanup(self): self.socket.close() + super(DealerPublisherLight, self).cleanup() diff --git a/oslo_messaging/_drivers/zmq_driver/poller/green_poller.py b/oslo_messaging/_drivers/zmq_driver/poller/green_poller.py index e17879b7f..591b2ac7a 100644 --- a/oslo_messaging/_drivers/zmq_driver/poller/green_poller.py +++ b/oslo_messaging/_drivers/zmq_driver/poller/green_poller.py @@ -44,7 +44,7 @@ class GreenPoller(zmq_poller.ZmqPoller): try: return self.incoming_queue.get(timeout=timeout) except eventlet.queue.Empty: - return (None, None) + return None, None def close(self): for thread in self.thread_by_socket.values(): @@ -53,41 +53,6 @@ class GreenPoller(zmq_poller.ZmqPoller): self.thread_by_socket = {} -class HoldReplyPoller(GreenPoller): - - def __init__(self): - super(HoldReplyPoller, self).__init__() - self.event_by_socket = {} - self._is_running = threading.Event() - - def register(self, socket, recv_method=None): - super(HoldReplyPoller, self).register(socket, recv_method) - self.event_by_socket[socket] = threading.Event() - - def resume_polling(self, socket): - pause = self.event_by_socket[socket] - pause.set() - - def _socket_receive(self, socket, recv_method=None): - pause = self.event_by_socket[socket] - while not self._is_running.is_set(): - pause.clear() - if recv_method: - incoming = recv_method(socket) - else: - incoming = socket.recv_multipart() - self.incoming_queue.put((incoming, socket)) - pause.wait() - - def close(self): - self._is_running.set() - for pause in self.event_by_socket.values(): - pause.set() - eventlet.sleep() - - super(HoldReplyPoller, self).close() - - class GreenExecutor(zmq_poller.Executor): def __init__(self, method): diff --git a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_consumer_base.py b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_consumer_base.py index c6ac324b0..86dddee61 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_consumer_base.py +++ b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_consumer_base.py @@ -14,12 +14,12 @@ import abc import logging -import threading import time import six from oslo_messaging._drivers import common as rpc_common +from oslo_messaging._drivers.zmq_driver import zmq_address from oslo_messaging._drivers.zmq_driver import zmq_async from oslo_messaging._drivers.zmq_driver import zmq_names from oslo_messaging._drivers.zmq_driver import zmq_socket @@ -40,10 +40,6 @@ class ConsumerBase(object): self.sockets = [] self.context = zmq.Context() - @abc.abstractmethod - def listen(self, target): - """Associate new sockets with targets here""" - @abc.abstractmethod def receive_message(self, target): """Method for poller - receiving message routine""" @@ -59,18 +55,26 @@ class SingleSocketConsumer(ConsumerBase): def __init__(self, conf, poller, server, socket_type): super(SingleSocketConsumer, self).__init__(conf, poller, server) + self.matchmaker = server.matchmaker + self.target = server.target + self.socket_type = socket_type + self.host = None self.socket = self.subscribe_socket(socket_type) + self.target_updater = TargetUpdater(conf, self.matchmaker, self.target, + self.host, socket_type) def subscribe_socket(self, socket_type): try: socket = zmq_socket.ZmqRandomPortSocket( self.conf, self.context, socket_type) self.sockets.append(socket) - self.poller.register(socket, self.receive_message) LOG.debug("Run %(stype)s consumer on %(addr)s:%(port)d", {"stype": zmq_names.socket_type_str(socket_type), "addr": socket.bind_address, "port": socket.port}) + self.host = zmq_address.combine_address(self.conf.rpc_zmq_host, + socket.port) + self.poller.register(socket, self.receive_message) return socket except zmq.ZMQError as e: errmsg = _LE("Failed binding to port %(port)d: %(e)s")\ @@ -87,42 +91,30 @@ class SingleSocketConsumer(ConsumerBase): def port(self): return self.socket.port + def cleanup(self): + self.target_updater.cleanup() + super(SingleSocketConsumer, self).cleanup() -class TargetsManager(object): - def __init__(self, conf, matchmaker, host, socket_type): - self.targets = [] +class TargetUpdater(object): + """This entity performs periodic async updates + to the matchmaker. + """ + + def __init__(self, conf, matchmaker, target, host, socket_type): self.conf = conf self.matchmaker = matchmaker + self.target = target self.host = host self.socket_type = socket_type - self.targets_lock = threading.Lock() - self.updater = zmq_async.get_executor(method=self._update_targets) \ - if conf.zmq_target_expire > 0 else None - if self.updater: - self.updater.execute() + self.executor = zmq_async.get_executor(method=self._update_target) + self.executor.execute() - def _update_targets(self): - with self.targets_lock: - for target in self.targets: - self.matchmaker.register( - target, self.host, - zmq_names.socket_type_str(self.socket_type)) - - # Update target-records once per half expiration time + def _update_target(self): + self.matchmaker.register( + self.target, self.host, + zmq_names.socket_type_str(self.socket_type)) time.sleep(self.conf.zmq_target_expire / 2) - def listen(self, target): - with self.targets_lock: - self.targets.append(target) - self.matchmaker.register( - target, self.host, - zmq_names.socket_type_str(self.socket_type)) - def cleanup(self): - if self.updater: - self.updater.stop() - for target in self.targets: - self.matchmaker.unregister( - target, self.host, - zmq_names.socket_type_str(self.socket_type)) + self.executor.stop() diff --git a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_pull_consumer.py b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_pull_consumer.py index 6943d1f69..611d854aa 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_pull_consumer.py +++ b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_pull_consumer.py @@ -17,7 +17,6 @@ import logging from oslo_messaging._drivers import base from oslo_messaging._drivers.zmq_driver.server.consumers\ import zmq_consumer_base -from oslo_messaging._drivers.zmq_driver import zmq_address from oslo_messaging._drivers.zmq_driver import zmq_async from oslo_messaging._drivers.zmq_driver import zmq_names from oslo_messaging._i18n import _LE, _LI @@ -46,21 +45,8 @@ class PullConsumer(zmq_consumer_base.SingleSocketConsumer): def __init__(self, conf, poller, server): super(PullConsumer, self).__init__(conf, poller, server, zmq.PULL) - self.matchmaker = server.matchmaker - self.host = zmq_address.combine_address(self.conf.rpc_zmq_host, - self.port) - self.targets = zmq_consumer_base.TargetsManager( - conf, self.matchmaker, self.host, zmq.PULL) LOG.info(_LI("[%s] Run PULL consumer"), self.host) - def listen(self, target): - LOG.info(_LI("Listen to target %s"), str(target)) - self.targets.listen(target) - - def cleanup(self): - super(PullConsumer, self).cleanup() - self.targets.cleanup() - def receive_message(self, socket): try: request = socket.recv_pyobj() diff --git a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py index 010eecb62..eac2bb727 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py +++ b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py @@ -18,7 +18,6 @@ from oslo_messaging._drivers import base from oslo_messaging._drivers.zmq_driver.server.consumers\ import zmq_consumer_base from oslo_messaging._drivers.zmq_driver.server import zmq_incoming_message -from oslo_messaging._drivers.zmq_driver import zmq_address from oslo_messaging._drivers.zmq_driver import zmq_async from oslo_messaging._drivers.zmq_driver import zmq_names from oslo_messaging._i18n import _LE, _LI @@ -53,22 +52,8 @@ class RouterConsumer(zmq_consumer_base.SingleSocketConsumer): def __init__(self, conf, poller, server): super(RouterConsumer, self).__init__(conf, poller, server, zmq.ROUTER) - self.matchmaker = server.matchmaker - self.host = zmq_address.combine_address(self.conf.rpc_zmq_host, - self.port) - self.targets = zmq_consumer_base.TargetsManager( - conf, self.matchmaker, self.host, zmq.ROUTER) LOG.info(_LI("[%s] Run ROUTER consumer"), self.host) - def listen(self, target): - LOG.info(_LI("[%(host)s] Listen to target %(target)s"), - {'host': self.host, 'target': target}) - self.targets.listen(target) - - def cleanup(self): - super(RouterConsumer, self).cleanup() - self.targets.cleanup() - def _receive_request(self, socket): reply_id = socket.recv() empty = socket.recv() diff --git a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_sub_consumer.py b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_sub_consumer.py index 19a00c042..3baffe685 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_sub_consumer.py +++ b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_sub_consumer.py @@ -13,7 +13,6 @@ # under the License. import logging -import threading import uuid import six @@ -34,12 +33,11 @@ zmq = zmq_async.import_zmq() class SubIncomingMessage(base.RpcIncomingMessage): - def __init__(self, request, socket, poller): + def __init__(self, request, socket): super(SubIncomingMessage, self).__init__( request.context, request.message) self.socket = socket self.msg_id = request.message_id - poller.resume_polling(socket) def reply(self, reply=None, failure=None, log_failure=True): """Reply is not needed for non-call messages.""" @@ -56,16 +54,24 @@ class SubConsumer(zmq_consumer_base.ConsumerBase): def __init__(self, conf, poller, server): super(SubConsumer, self).__init__(conf, poller, server) self.matchmaker = server.matchmaker + self.target = server.target self.subscriptions = set() - self.targets = [] - self._socket_lock = threading.Lock() self.socket = zmq_socket.ZmqSocket(self.conf, self.context, zmq.SUB) self.sockets.append(self.socket) self.id = uuid.uuid4() - self.publishers_poller = MatchmakerPoller( - self.matchmaker, on_result=self.on_publishers) + self._subscribe_on_target(self.target) + self.on_publishers(self.matchmaker.get_publishers()) + self.poller.register(self.socket, self.receive_message) + + def on_publishers(self, publishers): + for host, sync in publishers: + self.socket.connect(zmq_address.get_tcp_direct_address(host)) + LOG.debug("[%s] SUB consumer connected to publishers %s", + self.id, publishers) def _subscribe_on_target(self, target): + # NOTE(ozamiatin): No locks needed here, because this is called + # before the async updater loop started topic_filter = zmq_address.target_to_subscribe_filter(target) if target.topic: self.socket.setsockopt(zmq.SUBSCRIBE, six.b(target.topic)) @@ -80,20 +86,6 @@ class SubConsumer(zmq_consumer_base.ConsumerBase): LOG.debug("[%(host)s] Subscribing to topic %(filter)s", {"host": self.id, "filter": topic_filter}) - def on_publishers(self, publishers): - with self._socket_lock: - for host, sync in publishers: - self.socket.connect(zmq_address.get_tcp_direct_address(host)) - - self.poller.register(self.socket, self.receive_message) - LOG.debug("[%s] SUB consumer connected to publishers %s", - self.id, publishers) - - def listen(self, target): - LOG.debug("Listen to target %s", target) - with self._socket_lock: - self._subscribe_on_target(target) - def _receive_request(self, socket): topic_filter = socket.recv() LOG.debug("[%(id)s] Received %(topic_filter)s topic", @@ -115,42 +107,9 @@ class SubConsumer(zmq_consumer_base.ConsumerBase): if request.msg_type not in zmq_names.MULTISEND_TYPES: LOG.error(_LE("Unknown message type: %s"), request.msg_type) else: - return SubIncomingMessage(request, socket, self.poller) + return SubIncomingMessage(request, socket) except zmq.ZMQError as e: LOG.error(_LE("Receiving message failed: %s"), str(e)) - -class MatchmakerPoller(object): - """This entity performs periodical async polling - to the matchmaker if no hosts were registered for - specified target before. - """ - - def __init__(self, matchmaker, on_result): - self.matchmaker = matchmaker - self.executor = zmq_async.get_executor( - method=self._poll_for_publishers) - self.on_result = on_result - self.executor.execute() - - def _poll_for_publishers(self): - publishers = self.matchmaker.get_publishers() - if publishers: - self.on_result(publishers) - self.executor.done() - - -class BackChatter(object): - - def __init__(self, conf, context): - self.socket = zmq_socket.ZmqSocket(conf, context, zmq.PUSH) - - def connect(self, address): - self.socket.connect(address) - - def send_ready(self): - for i in range(self.socket.connections_count()): - self.socket.send(zmq_names.ACK_TYPE) - - def close(self): - self.socket.close() + def cleanup(self): + super(SubConsumer, self).cleanup() diff --git a/oslo_messaging/_drivers/zmq_driver/server/zmq_server.py b/oslo_messaging/_drivers/zmq_driver/server/zmq_server.py index 94d79360c..cf9262ece 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/zmq_server.py +++ b/oslo_messaging/_drivers/zmq_driver/server/zmq_server.py @@ -32,27 +32,26 @@ zmq = zmq_async.import_zmq() class ZmqServer(base.Listener): - def __init__(self, driver, conf, matchmaker=None): + def __init__(self, driver, conf, matchmaker, target, poller=None): super(ZmqServer, self).__init__() self.driver = driver self.conf = conf self.matchmaker = matchmaker - self.poller = zmq_async.get_poller() + self.target = target + self.poller = poller or zmq_async.get_poller() self.router_consumer = zmq_router_consumer.RouterConsumer( conf, self.poller, self) self.pull_consumer = zmq_pull_consumer.PullConsumer( conf, self.poller, self) self.sub_consumer = zmq_sub_consumer.SubConsumer( conf, self.poller, self) if conf.use_pub_sub else None - self.notify_consumer = self.sub_consumer if conf.use_pub_sub \ - else self.router_consumer self.consumers = [self.router_consumer, self.pull_consumer] if self.sub_consumer: self.consumers.append(self.sub_consumer) @base.batch_poll_helper - def poll(self, timeout=None): + def poll(self, timeout=None, prefetch_size=1): message, socket = self.poller.poll( timeout or self.conf.rpc_poll_timeout) return message @@ -67,15 +66,35 @@ class ZmqServer(base.Listener): for consumer in self.consumers: consumer.cleanup() - def listen(self, target): - self.router_consumer.listen(target) - self.pull_consumer.listen(target) - if self.sub_consumer: - self.sub_consumer.listen(target) - def listen_notification(self, targets_and_priorities): - consumer = self.notify_consumer +class ZmqNotificationServer(base.Listener): + + def __init__(self, driver, conf, matchmaker, targets_and_priorities): + super(ZmqNotificationServer, self).__init__() + self.driver = driver + self.conf = conf + self.matchmaker = matchmaker + self.servers = [] + self.poller = zmq_async.get_poller() + self._listen(targets_and_priorities) + + def _listen(self, targets_and_priorities): for target, priority in targets_and_priorities: t = copy.deepcopy(target) t.topic = target.topic + '.' + priority - consumer.listen(t) + self.servers.append(ZmqServer( + self.driver, self.conf, self.matchmaker, t, self.poller)) + + @base.batch_poll_helper + def poll(self, timeout=None, prefetch_size=1): + message, socket = self.poller.poll( + timeout or self.conf.rpc_poll_timeout) + return message + + def stop(self): + for server in self.servers: + server.stop() + + def cleanup(self): + for server in self.servers: + server.cleanup() diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_async.py b/oslo_messaging/_drivers/zmq_driver/zmq_async.py index a8c81795b..ff57046f2 100644 --- a/oslo_messaging/_drivers/zmq_driver/zmq_async.py +++ b/oslo_messaging/_drivers/zmq_driver/zmq_async.py @@ -43,17 +43,6 @@ def get_poller(zmq_concurrency='eventlet'): return threading_poller.ThreadingPoller() -def get_reply_poller(zmq_concurrency='eventlet'): - _raise_error_if_invalid_config_value(zmq_concurrency) - - if zmq_concurrency == 'eventlet' and _is_eventlet_zmq_available(): - from oslo_messaging._drivers.zmq_driver.poller import green_poller - return green_poller.HoldReplyPoller() - - from oslo_messaging._drivers.zmq_driver.poller import threading_poller - return threading_poller.ThreadingPoller() - - def get_executor(method, zmq_concurrency='eventlet'): _raise_error_if_invalid_config_value(zmq_concurrency) diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_socket.py b/oslo_messaging/_drivers/zmq_driver/zmq_socket.py index 5a61cbcf2..30710f5e4 100644 --- a/oslo_messaging/_drivers/zmq_driver/zmq_socket.py +++ b/oslo_messaging/_drivers/zmq_driver/zmq_socket.py @@ -134,6 +134,7 @@ class ZmqRandomPortSocket(ZmqSocket): min_port=conf.rpc_zmq_min_port, max_port=conf.rpc_zmq_max_port, max_tries=conf.rpc_zmq_bind_port_retries) + self.connected = True except zmq.ZMQBindError: LOG.error(_LE("Random ports range exceeded!")) raise ZmqPortRangeExceededException() diff --git a/oslo_messaging/tests/drivers/zmq/test_impl_zmq.py b/oslo_messaging/tests/drivers/zmq/test_impl_zmq.py index f3955fa7f..621204d33 100644 --- a/oslo_messaging/tests/drivers/zmq/test_impl_zmq.py +++ b/oslo_messaging/tests/drivers/zmq/test_impl_zmq.py @@ -12,7 +12,6 @@ # License for the specific language governing permissions and limitations # under the License. -import fixtures import testtools import oslo_messaging @@ -150,57 +149,3 @@ class TestZmqBasics(zmq_common.ZmqBaseTestCase): self.driver.send_notification(target, context, message, '3.0') self.listener._received.wait(5) self.assertTrue(self.listener._received.isSet()) - - -class TestPoller(test_utils.BaseTestCase): - - @testtools.skipIf(zmq is None, "zmq not available") - def setUp(self): - super(TestPoller, self).setUp() - self.poller = zmq_async.get_poller() - self.ctx = zmq.Context() - self.internal_ipc_dir = self.useFixture(fixtures.TempDir()).path - self.ADDR_REQ = "ipc://%s/request1" % self.internal_ipc_dir - - def test_poll_blocking(self): - - rep = self.ctx.socket(zmq.REP) - rep.bind(self.ADDR_REQ) - - reply_poller = zmq_async.get_reply_poller() - reply_poller.register(rep) - - def listener(): - incoming, socket = reply_poller.poll() - self.assertEqual(b'Hello', incoming[0]) - socket.send_string('Reply') - reply_poller.resume_polling(socket) - - executor = zmq_async.get_executor(listener) - executor.execute() - - req1 = self.ctx.socket(zmq.REQ) - req1.connect(self.ADDR_REQ) - - req2 = self.ctx.socket(zmq.REQ) - req2.connect(self.ADDR_REQ) - - req1.send_string('Hello') - req2.send_string('Hello') - - reply = req1.recv_string() - self.assertEqual('Reply', reply) - - reply = req2.recv_string() - self.assertEqual('Reply', reply) - - def test_poll_timeout(self): - rep = self.ctx.socket(zmq.REP) - rep.bind(self.ADDR_REQ) - - reply_poller = zmq_async.get_reply_poller() - reply_poller.register(rep) - - incoming, socket = reply_poller.poll(1) - self.assertIsNone(incoming) - self.assertIsNone(socket) diff --git a/oslo_messaging/tests/drivers/zmq/test_pub_sub.py b/oslo_messaging/tests/drivers/zmq/test_pub_sub.py index 30132e852..1b37be510 100644 --- a/oslo_messaging/tests/drivers/zmq/test_pub_sub.py +++ b/oslo_messaging/tests/drivers/zmq/test_pub_sub.py @@ -46,7 +46,7 @@ class TestPubSub(zmq_common.ZmqBaseTestCase): self.listeners.append(zmq_common.TestServerListener(self.driver)) def _send_request(self, target): - # Needed only in test env to get listener a chance to connect + # Needed only in test env to give listener a chance to connect # before request fires time.sleep(1) with contextlib.closing(zmq_request.FanoutRequest( diff --git a/oslo_messaging/tests/drivers/zmq/test_zmq_async.py b/oslo_messaging/tests/drivers/zmq/test_zmq_async.py index ccfae3348..f929ab3dc 100644 --- a/oslo_messaging/tests/drivers/zmq/test_zmq_async.py +++ b/oslo_messaging/tests/drivers/zmq/test_zmq_async.py @@ -105,21 +105,21 @@ class TestGetReplyPoller(test_utils.BaseTestCase): def test_default_reply_poller_is_HoldReplyPoller(self): zmq_async._is_eventlet_zmq_available = lambda: True - actual = zmq_async.get_reply_poller() + actual = zmq_async.get_poller() - self.assertTrue(isinstance(actual, green_poller.HoldReplyPoller)) + self.assertTrue(isinstance(actual, green_poller.GreenPoller)) def test_when_eventlet_is_available_then_return_HoldReplyPoller(self): zmq_async._is_eventlet_zmq_available = lambda: True - actual = zmq_async.get_reply_poller('eventlet') + actual = zmq_async.get_poller('eventlet') - self.assertTrue(isinstance(actual, green_poller.HoldReplyPoller)) + self.assertTrue(isinstance(actual, green_poller.GreenPoller)) def test_when_eventlet_is_unavailable_then_return_ThreadingPoller(self): zmq_async._is_eventlet_zmq_available = lambda: False - actual = zmq_async.get_reply_poller('eventlet') + actual = zmq_async.get_poller('eventlet') self.assertTrue(isinstance(actual, threading_poller.ThreadingPoller)) @@ -128,7 +128,7 @@ class TestGetReplyPoller(test_utils.BaseTestCase): errmsg = 'Invalid zmq_concurrency value: x' with self.assertRaisesRegexp(ValueError, errmsg): - zmq_async.get_reply_poller(invalid_opt) + zmq_async.get_poller(invalid_opt) class TestGetExecutor(test_utils.BaseTestCase):