diff --git a/oslo_messaging/_drivers/amqpdriver.py b/oslo_messaging/_drivers/amqpdriver.py index d96ede0dc..ed94bccc5 100644 --- a/oslo_messaging/_drivers/amqpdriver.py +++ b/oslo_messaging/_drivers/amqpdriver.py @@ -469,7 +469,7 @@ class AMQPDriverBase(base.BaseDriver): return self._send(target, ctxt, message, envelope=(version == 2.0), notify=True, retry=retry) - def listen(self, target, on_incoming_callback, batch_size, batch_timeout): + def listen(self, target, batch_size, batch_timeout): conn = self._get_connection(rpc_common.PURPOSE_LISTEN) listener = AMQPListener(self, conn) @@ -483,12 +483,11 @@ class AMQPDriverBase(base.BaseDriver): callback=listener) conn.declare_fanout_consumer(target.topic, listener) - return base.PollStyleListenerAdapter(listener, on_incoming_callback, - batch_size, batch_timeout) + return base.PollStyleListenerAdapter(listener, batch_size, + batch_timeout) def listen_for_notifications(self, targets_and_priorities, pool, - on_incoming_callback, batch_size, - batch_timeout): + batch_size, batch_timeout): conn = self._get_connection(rpc_common.PURPOSE_LISTEN) listener = AMQPListener(self, conn) @@ -497,8 +496,8 @@ class AMQPDriverBase(base.BaseDriver): exchange_name=self._get_exchange(target), topic='%s.%s' % (target.topic, priority), callback=listener, queue_name=pool) - return base.PollStyleListenerAdapter(listener, on_incoming_callback, - batch_size, batch_timeout) + return base.PollStyleListenerAdapter(listener, batch_size, + batch_timeout) def cleanup(self): if self._connection_pool: diff --git a/oslo_messaging/_drivers/base.py b/oslo_messaging/_drivers/base.py index 107a24c53..78a379284 100644 --- a/oslo_messaging/_drivers/base.py +++ b/oslo_messaging/_drivers/base.py @@ -81,11 +81,11 @@ class IncomingMessage(object): self.message = message def acknowledge(self): - "Acknowledge the message." + """Acknowledge the message.""" @abc.abstractmethod def requeue(self): - "Requeue the message." + """Requeue the message.""" @six.add_metaclass(abc.ABCMeta) @@ -128,44 +128,39 @@ class PollStyleListener(object): @six.add_metaclass(abc.ABCMeta) class Listener(object): - def __init__(self, on_incoming_callback, batch_size, batch_timeout, + def __init__(self, batch_size, batch_timeout, prefetch_size=-1): """Init Listener - :param on_incoming_callback: callback function to be executed when - listener received messages. Messages should be processed and - acked/nacked by callback :param batch_size: desired number of messages passed to - single on_incoming_callback call + single on_incoming_callback notification :param batch_timeout: defines how long should we wait for batch_size messages if we already have some messages waiting for processing :param prefetch_size: defines how many massages we want to prefetch from backend (depend on driver type) by single request """ - self.on_incoming_callback = on_incoming_callback + self.on_incoming_callback = None self.batch_timeout = batch_timeout self.prefetch_size = prefetch_size if prefetch_size > 0: batch_size = min(batch_size, prefetch_size) self.batch_size = batch_size - @abc.abstractmethod - def start(self): - """Stop listener. - Stop the listener message polling - """ + def start(self, on_incoming_callback): + """Start listener. + Start the listener message polling - @abc.abstractmethod - def wait(self): - """Wait listener. - Wait for processing remained input after listener Stop + :param on_incoming_callback: callback function to be executed when + listener received messages. Messages should be processed and + acked/nacked by callback """ + self.on_incoming_callback = on_incoming_callback - @abc.abstractmethod def stop(self): """Stop listener. Stop the listener message polling """ + self.on_incoming_callback = None @abc.abstractmethod def cleanup(self): @@ -177,21 +172,24 @@ class Listener(object): class PollStyleListenerAdapter(Listener): - def __init__(self, poll_style_listener, on_incoming_callback, batch_size, - batch_timeout): + def __init__(self, poll_style_listener, batch_size, batch_timeout): super(PollStyleListenerAdapter, self).__init__( - on_incoming_callback, batch_size, batch_timeout, - poll_style_listener.prefetch_size + batch_size, batch_timeout, poll_style_listener.prefetch_size ) self._poll_style_listener = poll_style_listener self._listen_thread = threading.Thread(target=self._runner) self._listen_thread.daemon = True self._started = False - def start(self): + def start(self, on_incoming_callback): """Start listener. Start the listener message polling + + :param on_incoming_callback: callback function to be executed when + listener received messages. Messages should be processed and + acked/nacked by callback """ + super(PollStyleListenerAdapter, self).start(on_incoming_callback) self._started = True self._listen_thread.start() @@ -220,9 +218,8 @@ class PollStyleListenerAdapter(Listener): """ self._started = False self._poll_style_listener.stop() - - def wait(self): self._listen_thread.join() + super(PollStyleListenerAdapter, self).stop() def cleanup(self): """Cleanup listener. @@ -259,13 +256,12 @@ class BaseDriver(object): """Send a notification message to the given target.""" @abc.abstractmethod - def listen(self, target, on_incoming_callback, batch_size, batch_timeout): + def listen(self, target, batch_size, batch_timeout): """Construct a Listener for the given target.""" @abc.abstractmethod def listen_for_notifications(self, targets_and_priorities, pool, - on_incoming_callback, batch_size, - batch_timeout): + batch_size, batch_timeout): """Construct a notification Listener for the given list of tuple of (target, priority). """ diff --git a/oslo_messaging/_drivers/impl_fake.py b/oslo_messaging/_drivers/impl_fake.py index 7a8fc96d1..f35980493 100644 --- a/oslo_messaging/_drivers/impl_fake.py +++ b/oslo_messaging/_drivers/impl_fake.py @@ -222,7 +222,7 @@ class FakeDriver(base.BaseDriver): # transport always works self._send(target, ctxt, message) - def listen(self, target, on_incoming_callback, batch_size, batch_timeout): + def listen(self, target, batch_size, batch_timeout): exchange = target.exchange or self._default_exchange listener = FakeListener(self._exchange_manager, [oslo_messaging.Target( @@ -232,12 +232,11 @@ class FakeDriver(base.BaseDriver): oslo_messaging.Target( topic=target.topic, exchange=exchange)]) - return base.PollStyleListenerAdapter(listener, on_incoming_callback, - batch_size, batch_timeout) + return base.PollStyleListenerAdapter(listener, batch_size, + batch_timeout) def listen_for_notifications(self, targets_and_priorities, pool, - on_incoming_callback, batch_size, - batch_timeout): + batch_size, batch_timeout): targets = [ oslo_messaging.Target( topic='%s.%s' % (target.topic, priority), @@ -245,8 +244,8 @@ class FakeDriver(base.BaseDriver): for target, priority in targets_and_priorities] listener = FakeListener(self._exchange_manager, targets, pool) - return base.PollStyleListenerAdapter(listener, on_incoming_callback, - batch_size, batch_timeout) + return base.PollStyleListenerAdapter(listener, batch_size, + batch_timeout) def cleanup(self): pass diff --git a/oslo_messaging/_drivers/impl_kafka.py b/oslo_messaging/_drivers/impl_kafka.py index 471734cfb..7264d5fda 100644 --- a/oslo_messaging/_drivers/impl_kafka.py +++ b/oslo_messaging/_drivers/impl_kafka.py @@ -340,8 +340,7 @@ class KafkaDriver(base.BaseDriver): 'The RPC implementation for Kafka is not implemented') def listen_for_notifications(self, targets_and_priorities, pool, - on_incoming_callback, batch_size, - batch_timeout): + batch_size, batch_timeout): """Listen to a specified list of targets on Kafka brokers :param targets_and_priorities: List of pairs (target, priority) @@ -360,8 +359,8 @@ class KafkaDriver(base.BaseDriver): conn.declare_topic_consumer(topics, pool) listener = KafkaListener(conn) - return base.PollStyleListenerAdapter(listener, on_incoming_callback, - batch_size, batch_timeout) + return base.PollStyleListenerAdapter(listener, batch_size, + batch_timeout) def _get_connection(self, purpose): return driver_common.ConnectionContext(self.connection_pool, purpose) diff --git a/oslo_messaging/_drivers/impl_pika.py b/oslo_messaging/_drivers/impl_pika.py index 33f8c3312..42686350d 100644 --- a/oslo_messaging/_drivers/impl_pika.py +++ b/oslo_messaging/_drivers/impl_pika.py @@ -334,17 +334,16 @@ class PikaDriver(base.BaseDriver): retrier=retrier ) - def listen(self, target, on_incoming_callback, batch_size, batch_timeout): + def listen(self, target, batch_size, batch_timeout): listener = pika_drv_poller.RpcServicePikaPoller( self._pika_engine, target, prefetch_count=self._pika_engine.rpc_listener_prefetch_count ) listener.start() - return base.PollStyleListenerAdapter(listener, on_incoming_callback, - batch_size, batch_timeout) + return base.PollStyleListenerAdapter(listener, batch_size, + batch_timeout) def listen_for_notifications(self, targets_and_priorities, pool, - on_incoming_callback, batch_size, batch_timeout): listener = pika_drv_poller.NotificationPikaPoller( self._pika_engine, targets_and_priorities, @@ -354,8 +353,8 @@ class PikaDriver(base.BaseDriver): queue_name=pool ) listener.start() - return base.PollStyleListenerAdapter(listener, on_incoming_callback, - batch_size, batch_timeout) + return base.PollStyleListenerAdapter(listener, batch_size, + batch_timeout) def cleanup(self): self._reply_listener.cleanup() diff --git a/oslo_messaging/_drivers/impl_zmq.py b/oslo_messaging/_drivers/impl_zmq.py index f8175899d..739525670 100644 --- a/oslo_messaging/_drivers/impl_zmq.py +++ b/oslo_messaging/_drivers/impl_zmq.py @@ -254,7 +254,7 @@ class ZmqDriver(base.BaseDriver): client = self.notifier.get() client.send_notify(target, ctxt, message, version, retry) - def listen(self, target, on_incoming_callback, batch_size, batch_timeout): + def listen(self, target, batch_size, batch_timeout): """Listen to a specified target on a server side :param target: Message destination target @@ -262,12 +262,11 @@ class ZmqDriver(base.BaseDriver): """ listener = zmq_server.ZmqServer(self, self.conf, self.matchmaker, target) - return base.PollStyleListenerAdapter(listener, on_incoming_callback, - batch_size, batch_timeout) + return base.PollStyleListenerAdapter(listener, batch_size, + batch_timeout) def listen_for_notifications(self, targets_and_priorities, pool, - on_incoming_callback, batch_size, - batch_timeout): + batch_size, batch_timeout): """Listen to a specified list of targets on a server side :param targets_and_priorities: List of pairs (target, priority) @@ -277,8 +276,8 @@ class ZmqDriver(base.BaseDriver): """ listener = zmq_server.ZmqNotificationServer( self, self.conf, self.matchmaker, targets_and_priorities) - return base.PollStyleListenerAdapter(listener, on_incoming_callback, - batch_size, batch_timeout) + return base.PollStyleListenerAdapter(listener, batch_size, + batch_timeout) def cleanup(self): """Cleanup all driver's connections finally diff --git a/oslo_messaging/_drivers/protocols/amqp/driver.py b/oslo_messaging/_drivers/protocols/amqp/driver.py index 0a3f889e7..50d0036a9 100644 --- a/oslo_messaging/_drivers/protocols/amqp/driver.py +++ b/oslo_messaging/_drivers/protocols/amqp/driver.py @@ -267,19 +267,18 @@ class ProtonDriver(base.BaseDriver): return self.send(target, ctxt, message, envelope=(version == 2.0)) @_ensure_connect_called - def listen(self, target, on_incoming_callback, batch_size, batch_timeout): + def listen(self, target, batch_size, batch_timeout): """Construct a Listener for the given target.""" LOG.debug("Listen to %s", target) listener = ProtonListener(self) self._ctrl.add_task(drivertasks.ListenTask(target, listener)) - return base.PollStyleListenerAdapter(listener, on_incoming_callback, - batch_size, batch_timeout) + return base.PollStyleListenerAdapter(listener, batch_size, + batch_timeout) return listener @_ensure_connect_called def listen_for_notifications(self, targets_and_priorities, pool, - on_incoming_callback, batch_size, - batch_timeout): + batch_size, batch_timeout): LOG.debug("Listen for notifications %s", targets_and_priorities) if pool: raise NotImplementedError('"pool" not implemented by ' @@ -289,8 +288,8 @@ class ProtonDriver(base.BaseDriver): topic = '%s.%s' % (target.topic, priority) t = messaging_target.Target(topic=topic) self._ctrl.add_task(drivertasks.ListenTask(t, listener, True)) - return base.PollStyleListenerAdapter(listener, on_incoming_callback, - batch_size, batch_timeout) + return base.PollStyleListenerAdapter(listener, batch_size, + batch_timeout) def cleanup(self): """Release all resources.""" diff --git a/oslo_messaging/notify/listener.py b/oslo_messaging/notify/listener.py index 89f42f347..4026e9c55 100644 --- a/oslo_messaging/notify/listener.py +++ b/oslo_messaging/notify/listener.py @@ -113,11 +113,12 @@ from oslo_messaging import server as msg_server LOG = logging.getLogger(__name__) -class NotificationServer(msg_server.MessageHandlingServer): +class NotificationServerBase(msg_server.MessageHandlingServer): def __init__(self, transport, targets, dispatcher, executor='blocking', - allow_requeue=True, pool=None): - super(NotificationServer, self).__init__(transport, dispatcher, - executor) + allow_requeue=True, pool=None, batch_size=1, + batch_timeout=None): + super(NotificationServerBase, self).__init__(transport, dispatcher, + executor) self._allow_requeue = allow_requeue self._pool = pool self.targets = targets @@ -126,46 +127,42 @@ class NotificationServer(msg_server.MessageHandlingServer): self.dispatcher.supported_priorities) ) + self._batch_size = batch_size + self._batch_timeout = batch_timeout + def _create_listener(self): return self.transport._listen_for_notifications( - self._targets_priorities, self._pool, - lambda incoming: self._on_incoming(incoming[0]), 1, None + self._targets_priorities, self._pool, self._batch_size, + self._batch_timeout + ) + + +class NotificationServer(NotificationServerBase): + def __init__(self, transport, targets, dispatcher, executor='blocking', + allow_requeue=True, pool=None): + super(NotificationServer, self).__init__( + transport, targets, dispatcher, executor, allow_requeue, pool, 1, + None ) def _process_incoming(self, incoming): - res = notify_dispatcher.NotificationResult.REQUEUE + message = incoming[0] try: - res = self.dispatcher.dispatch(incoming) + res = self.dispatcher.dispatch(message) except Exception: LOG.error(_LE('Exception during message handling'), exc_info=True) try: if (res == notify_dispatcher.NotificationResult.REQUEUE and self._allow_requeue): - incoming.requeue() + message.requeue() else: - incoming.acknowledge() + message.acknowledge() except Exception: LOG.error(_LE("Fail to ack/requeue message"), exc_info=True) -class BatchNotificationServer(NotificationServer): - def __init__(self, transport, targets, dispatcher, executor='blocking', - allow_requeue=True, pool=None, batch_size=1, - batch_timeout=None): - super(BatchNotificationServer, self).__init__( - transport=transport, targets=targets, dispatcher=dispatcher, - executor=executor, allow_requeue=allow_requeue, pool=pool - ) - - self._batch_size = batch_size - self._batch_timeout = batch_timeout - - def _create_listener(self): - return self.transport._listen_for_notifications( - self._targets_priorities, self._pool, self._on_incoming, - self._batch_size, self._batch_timeout, - ) +class BatchNotificationServer(NotificationServerBase): def _process_incoming(self, incoming): try: diff --git a/oslo_messaging/rpc/server.py b/oslo_messaging/rpc/server.py index 37afc4512..a562ea38d 100644 --- a/oslo_messaging/rpc/server.py +++ b/oslo_messaging/rpc/server.py @@ -118,19 +118,17 @@ class RPCServer(msg_server.MessageHandlingServer): self._target = target def _create_listener(self): - return self.transport._listen( - self._target, - lambda incoming: self._on_incoming(incoming[0]), 1, None - ) + return self.transport._listen(self._target, 1, None) def _process_incoming(self, incoming): - incoming.acknowledge() + message = incoming[0] + message.acknowledge() try: - res = self.dispatcher.dispatch(incoming) + res = self.dispatcher.dispatch(message) except rpc_dispatcher.ExpectedException as e: LOG.debug(u'Expected exception during message handling (%s)', e.exc_info[1]) - incoming.reply(failure=e.exc_info) + message.reply(failure=e.exc_info) except Exception as e: # current sys.exc_info() content can be overriden # by another exception raise by a log handler during @@ -138,7 +136,7 @@ class RPCServer(msg_server.MessageHandlingServer): exc_info = sys.exc_info() try: LOG.exception(_LE('Exception during message handling: %s'), e) - incoming.reply(failure=exc_info) + message.reply(failure=exc_info) finally: # NOTE(dhellmann): Remove circular object reference # between the current stack frame and the traceback in @@ -146,9 +144,9 @@ class RPCServer(msg_server.MessageHandlingServer): del exc_info else: try: - incoming.reply(res) + message.reply(res) except Exception: - LOG.Exception("Can not send reply for message %s", incoming) + LOG.Exception("Can not send reply for message %s", message) def get_rpc_server(transport, target, endpoints, diff --git a/oslo_messaging/server.py b/oslo_messaging/server.py index d69eeb9c4..8b4f4197c 100644 --- a/oslo_messaging/server.py +++ b/oslo_messaging/server.py @@ -308,8 +308,8 @@ class MessageHandlingServer(service.ServiceBase, _OrderedTaskRunner): def __init__(self, transport, dispatcher, executor='blocking'): """Construct a message handling server. - The dispatcher parameter is a callable which is invoked with context - and message dictionaries each time a message is received. + The dispatcher parameter is a DispatcherBase instance which is used + for routing request to endpoint for processing. The executor parameter controls how incoming messages will be received and dispatched. By default, the most simple executor is used - the @@ -317,8 +317,9 @@ class MessageHandlingServer(service.ServiceBase, _OrderedTaskRunner): :param transport: the messaging transport :type transport: Transport - :param dispatcher: a callable which is invoked for each method - :type dispatcher: callable + :param dispatcher: has a dispatch() method which is invoked for each + incoming request + :type dispatcher: DispatcherBase :param executor: name of message executor - for example 'eventlet', 'blocking' :type executor: str @@ -347,7 +348,7 @@ class MessageHandlingServer(service.ServiceBase, _OrderedTaskRunner): super(MessageHandlingServer, self).__init__() def _on_incoming(self, incoming): - """Hanles on_incoming event + """Handles on_incoming event :param incoming: incoming request. """ @@ -411,7 +412,7 @@ class MessageHandlingServer(service.ServiceBase, _OrderedTaskRunner): except driver_base.TransportDriverError as ex: raise ServerListenError(self.target, ex) - return self.listener.start + self.listener.start(self._on_incoming) @ordered(after='start') def stop(self): @@ -436,7 +437,6 @@ class MessageHandlingServer(service.ServiceBase, _OrderedTaskRunner): Once it's finished, the underlying driver resources associated to this server are released (like closing useless network connections). """ - self.listener.wait() self._work_executor.shutdown(wait=True) # Close listener connection after processing all messages diff --git a/oslo_messaging/tests/drivers/test_impl_kafka.py b/oslo_messaging/tests/drivers/test_impl_kafka.py index e5528cc30..1ba3a85ed 100644 --- a/oslo_messaging/tests/drivers/test_impl_kafka.py +++ b/oslo_messaging/tests/drivers/test_impl_kafka.py @@ -204,7 +204,7 @@ class TestKafkaListener(test_utils.BaseTestCase): fake_target = oslo_messaging.Target(topic='fake_topic') fake_targets_and_priorities = [(fake_target, 'info')] self.driver.listen_for_notifications(fake_targets_and_priorities, None, - None, None, None) + None, None) self.assertEqual(1, len(fake_consumer.mock_calls)) @mock.patch.object(kafka_driver.Connection, '_ensure_connection') @@ -222,7 +222,7 @@ class TestKafkaListener(test_utils.BaseTestCase): exchange="test3"), 'error'), ] self.driver.listen_for_notifications(fake_targets_and_priorities, None, - None, None, None) + None, None) self.assertEqual(1, len(fake_consumer.mock_calls)) fake_consumer.assert_called_once_with(set(['fake_topic.error', 'fake_topic.info']), @@ -234,8 +234,7 @@ class TestKafkaListener(test_utils.BaseTestCase): fake_target = oslo_messaging.Target(topic='fake_topic') fake_targets_and_priorities = [(fake_target, 'info')] listener = self.driver.listen_for_notifications( - fake_targets_and_priorities, None, None, None, - None)._poll_style_listener + fake_targets_and_priorities, None, None, None)._poll_style_listener listener.conn.consume = mock.MagicMock() listener.conn.consume.return_value = ( iter([kafka.common.KafkaMessage( @@ -267,8 +266,7 @@ class TestWithRealKafkaBroker(test_utils.BaseTestCase): targets_and_priorities = [(target, 'fake_info')] listener = self.driver.listen_for_notifications( - targets_and_priorities, None, None, None, - None)._poll_style_listener + targets_and_priorities, None, None, None)._poll_style_listener fake_context = {"fake_context_key": "fake_context_value"} fake_message = {"fake_message_key": "fake_message_value"} self.driver.send_notification( @@ -285,8 +283,7 @@ class TestWithRealKafkaBroker(test_utils.BaseTestCase): targets_and_priorities = [(target, 'fake_info')] listener = self.driver.listen_for_notifications( - targets_and_priorities, None, None, None, - None)._poll_style_listener + targets_and_priorities, None, None, None)._poll_style_listener fake_context = {"fake_context_key": "fake_context_value"} fake_message = {"fake_message_key": "fake_message_value"} self.driver.send_notification( @@ -304,8 +301,7 @@ class TestWithRealKafkaBroker(test_utils.BaseTestCase): targets_and_priorities = [(target, 'fake_info')] listener = self.driver.listen_for_notifications( - targets_and_priorities, None, None, None, - None)._poll_style_listener + targets_and_priorities, None, None, None)._poll_style_listener deadline = time.time() + 3 received_message = listener.poll(batch_timeout=3) diff --git a/oslo_messaging/tests/drivers/test_impl_rabbit.py b/oslo_messaging/tests/drivers/test_impl_rabbit.py index 169c82c76..62597d65a 100644 --- a/oslo_messaging/tests/drivers/test_impl_rabbit.py +++ b/oslo_messaging/tests/drivers/test_impl_rabbit.py @@ -435,7 +435,7 @@ class TestSendReceive(test_utils.BaseTestCase): target = oslo_messaging.Target(topic='testtopic') - listener = driver.listen(target, None, None, None)._poll_style_listener + listener = driver.listen(target, None, None)._poll_style_listener senders = [] replies = [] @@ -525,7 +525,7 @@ class TestPollAsync(test_utils.BaseTestCase): self.addCleanup(transport.cleanup) driver = transport._driver target = oslo_messaging.Target(topic='testtopic') - listener = driver.listen(target, None, None, None)._poll_style_listener + listener = driver.listen(target, None, None)._poll_style_listener received = listener.poll(timeout=0.050) self.assertEqual([], received) @@ -541,7 +541,7 @@ class TestRacyWaitForReply(test_utils.BaseTestCase): target = oslo_messaging.Target(topic='testtopic') - listener = driver.listen(target, None, None, None)._poll_style_listener + listener = driver.listen(target, None, None)._poll_style_listener senders = [] replies = [] msgs = [] @@ -877,7 +877,7 @@ class TestReplyWireFormat(test_utils.BaseTestCase): server=self.server, fanout=self.fanout) - listener = driver.listen(target, None, None, None)._poll_style_listener + listener = driver.listen(target, None, None)._poll_style_listener connection, producer = _create_producer(target) self.addCleanup(connection.release) diff --git a/oslo_messaging/tests/drivers/zmq/test_impl_zmq.py b/oslo_messaging/tests/drivers/zmq/test_impl_zmq.py index e4e4dbfe1..f9f6f5265 100644 --- a/oslo_messaging/tests/drivers/zmq/test_impl_zmq.py +++ b/oslo_messaging/tests/drivers/zmq/test_impl_zmq.py @@ -43,7 +43,7 @@ class ZmqTestPortsRange(zmq_common.ZmqBaseTestCase): for i in range(10): try: target = oslo_messaging.Target(topic='testtopic_' + str(i)) - new_listener = self.driver.listen(target, None, None, None) + new_listener = self.driver.listen(target, None, None) listeners.append(new_listener) except zmq_socket.ZmqPortRangeExceededException: pass diff --git a/oslo_messaging/tests/drivers/zmq/zmq_common.py b/oslo_messaging/tests/drivers/zmq/zmq_common.py index c6dfddcb2..f0ef4a419 100644 --- a/oslo_messaging/tests/drivers/zmq/zmq_common.py +++ b/oslo_messaging/tests/drivers/zmq/zmq_common.py @@ -39,14 +39,13 @@ class TestServerListener(object): self.message = None def listen(self, target): - self.listener = self.driver.listen(target, None, None, + self.listener = self.driver.listen(target, None, None)._poll_style_listener self.executor.execute() def listen_notifications(self, targets_and_priorities): self.listener = self.driver.listen_for_notifications( - targets_and_priorities, None, None, None, - None)._poll_style_listener + targets_and_priorities, None, None, None)._poll_style_listener self.executor.execute() def _run(self): diff --git a/oslo_messaging/tests/test_amqp_driver.py b/oslo_messaging/tests/test_amqp_driver.py index 0c355170f..af6724dba 100644 --- a/oslo_messaging/tests/test_amqp_driver.py +++ b/oslo_messaging/tests/test_amqp_driver.py @@ -131,7 +131,7 @@ class TestAmqpSend(_AmqpBrokerTestCase): """Verify unused listener can cleanly shutdown.""" driver = amqp_driver.ProtonDriver(self.conf, self._broker_url) target = oslo_messaging.Target(topic="test-topic") - listener = driver.listen(target, None, None, None)._poll_style_listener + listener = driver.listen(target, None, None)._poll_style_listener self.assertIsInstance(listener, amqp_driver.ProtonListener) driver.cleanup() @@ -139,7 +139,7 @@ class TestAmqpSend(_AmqpBrokerTestCase): driver = amqp_driver.ProtonDriver(self.conf, self._broker_url) target = oslo_messaging.Target(topic="test-topic") listener = _ListenerThread( - driver.listen(target, None, None, None)._poll_style_listener, 1) + driver.listen(target, None, None)._poll_style_listener, 1) rc = driver.send(target, {"context": True}, {"msg": "value"}, wait_for_reply=False) self.assertIsNone(rc) @@ -152,10 +152,10 @@ class TestAmqpSend(_AmqpBrokerTestCase): driver = amqp_driver.ProtonDriver(self.conf, self._broker_url) target1 = oslo_messaging.Target(topic="test-topic", exchange="e1") listener1 = _ListenerThread( - driver.listen(target1, None, None, None)._poll_style_listener, 1) + driver.listen(target1, None, None)._poll_style_listener, 1) target2 = oslo_messaging.Target(topic="test-topic", exchange="e2") listener2 = _ListenerThread( - driver.listen(target2, None, None, None)._poll_style_listener, 1) + driver.listen(target2, None, None)._poll_style_listener, 1) rc = driver.send(target1, {"context": "whatever"}, {"method": "echo", "id": "e1"}, @@ -182,10 +182,10 @@ class TestAmqpSend(_AmqpBrokerTestCase): driver = amqp_driver.ProtonDriver(self.conf, self._broker_url) target1 = oslo_messaging.Target(topic="test-topic", server="server1") listener1 = _ListenerThread( - driver.listen(target1, None, None, None)._poll_style_listener, 4) + driver.listen(target1, None, None)._poll_style_listener, 4) target2 = oslo_messaging.Target(topic="test-topic", server="server2") listener2 = _ListenerThread( - driver.listen(target2, None, None, None)._poll_style_listener, 3) + driver.listen(target2, None, None)._poll_style_listener, 3) shared_target = oslo_messaging.Target(topic="test-topic") fanout_target = oslo_messaging.Target(topic="test-topic", @@ -256,7 +256,7 @@ class TestAmqpSend(_AmqpBrokerTestCase): driver = amqp_driver.ProtonDriver(self.conf, self._broker_url) target = oslo_messaging.Target(topic="test-topic") listener = _ListenerThread( - driver.listen(target, None, None, None)._poll_style_listener, 1) + driver.listen(target, None, None)._poll_style_listener, 1) # the listener will drop this message: try: @@ -283,7 +283,7 @@ class TestAmqpNotification(_AmqpBrokerTestCase): (oslo_messaging.Target(topic="topic-1"), 'error'), (oslo_messaging.Target(topic="topic-2"), 'debug')] nl = driver.listen_for_notifications( - notifications, None, None, None, None)._poll_style_listener + notifications, None, None, None)._poll_style_listener # send one for each support version: msg_count = len(notifications) * 2 @@ -345,7 +345,7 @@ class TestAuthentication(test_utils.BaseTestCase): driver = amqp_driver.ProtonDriver(self.conf, url) target = oslo_messaging.Target(topic="test-topic") listener = _ListenerThread( - driver.listen(target, None, None, None)._poll_style_listener, 1) + driver.listen(target, None, None)._poll_style_listener, 1) rc = driver.send(target, {"context": True}, {"method": "echo"}, wait_for_reply=True) self.assertIsNotNone(rc) @@ -364,7 +364,7 @@ class TestAuthentication(test_utils.BaseTestCase): driver = amqp_driver.ProtonDriver(self.conf, url) target = oslo_messaging.Target(topic="test-topic") _ListenerThread( - driver.listen(target, None, None, None)._poll_style_listener, 1) + driver.listen(target, None, None)._poll_style_listener, 1) self.assertRaises(oslo_messaging.MessagingTimeout, driver.send, target, {"context": True}, @@ -436,7 +436,7 @@ mech_list: ${mechs} driver = amqp_driver.ProtonDriver(self.conf, url) target = oslo_messaging.Target(topic="test-topic") listener = _ListenerThread( - driver.listen(target, None, None, None)._poll_style_listener, 1) + driver.listen(target, None, None)._poll_style_listener, 1) rc = driver.send(target, {"context": True}, {"method": "echo"}, wait_for_reply=True) self.assertIsNotNone(rc) @@ -455,7 +455,7 @@ mech_list: ${mechs} driver = amqp_driver.ProtonDriver(self.conf, url) target = oslo_messaging.Target(topic="test-topic") _ListenerThread( - driver.listen(target, None, None, None)._poll_style_listener, 1) + driver.listen(target, None, None)._poll_style_listener, 1) self.assertRaises(oslo_messaging.MessagingTimeout, driver.send, target, {"context": True}, @@ -476,7 +476,7 @@ mech_list: ${mechs} driver = amqp_driver.ProtonDriver(self.conf, url) target = oslo_messaging.Target(topic="test-topic") _ListenerThread( - driver.listen(target, None, None, None)._poll_style_listener, 1) + driver.listen(target, None, None)._poll_style_listener, 1) self.assertRaises(oslo_messaging.MessagingTimeout, driver.send, target, {"context": True}, @@ -497,7 +497,7 @@ mech_list: ${mechs} driver = amqp_driver.ProtonDriver(self.conf, url) target = oslo_messaging.Target(topic="test-topic") listener = _ListenerThread( - driver.listen(target, None, None, None)._poll_style_listener, 1) + driver.listen(target, None, None)._poll_style_listener, 1) rc = driver.send(target, {"context": True}, {"method": "echo"}, wait_for_reply=True) self.assertIsNotNone(rc) @@ -533,7 +533,7 @@ class TestFailover(test_utils.BaseTestCase): target = oslo_messaging.Target(topic="my-topic") listener = _ListenerThread( - driver.listen(target, None, None, None)._poll_style_listener, 2) + driver.listen(target, None, None)._poll_style_listener, 2) # wait for listener links to come up # 4 == 3 links per listener + 1 for the global reply queue @@ -620,9 +620,9 @@ class TestFailover(test_utils.BaseTestCase): target = oslo_messaging.Target(topic="my-topic") bcast = oslo_messaging.Target(topic="my-topic", fanout=True) listener1 = _ListenerThread( - driver.listen(target, None, None, None)._poll_style_listener, 2) + driver.listen(target, None, None)._poll_style_listener, 2) listener2 = _ListenerThread( - driver.listen(target, None, None, None)._poll_style_listener, 2) + driver.listen(target, None, None)._poll_style_listener, 2) # wait for 7 sending links to become active on the broker. # 7 = 3 links per Listener + 1 global reply link diff --git a/oslo_messaging/tests/test_transport.py b/oslo_messaging/tests/test_transport.py index feccaafc8..a9d7dc6b2 100644 --- a/oslo_messaging/tests/test_transport.py +++ b/oslo_messaging/tests/test_transport.py @@ -38,7 +38,7 @@ class _FakeDriver(object): def send_notification(self, *args, **kwargs): pass - def listen(self, target, on_incoming_callback, batch_size, batch_timeout): + def listen(self, target, batch_size, batch_timeout): pass @@ -314,10 +314,10 @@ class TestTransportMethodArgs(test_utils.BaseTestCase): t = transport.Transport(_FakeDriver(cfg.CONF)) self.mox.StubOutWithMock(t._driver, 'listen') - t._driver.listen(self._target, None, 1, None) + t._driver.listen(self._target, 1, None) self.mox.ReplayAll() - t._listen(self._target, None, 1, None) + t._listen(self._target, 1, None) class TestTransportUrlCustomisation(test_utils.BaseTestCase): diff --git a/oslo_messaging/transport.py b/oslo_messaging/transport.py index 745ea08a2..9d945640c 100644 --- a/oslo_messaging/transport.py +++ b/oslo_messaging/transport.py @@ -97,25 +97,23 @@ class Transport(object): self._driver.send_notification(target, ctxt, message, version, retry=retry) - def _listen(self, target, on_incoming_callback, batch_size, batch_timeout): + def _listen(self, target, batch_size, batch_timeout): if not (target.topic and target.server): raise exceptions.InvalidTarget('A server\'s target must have ' 'topic and server names specified', target) - return self._driver.listen(target, on_incoming_callback, batch_size, + return self._driver.listen(target, batch_size, batch_timeout) def _listen_for_notifications(self, targets_and_priorities, pool, - on_incoming_callback, batch_size, - batch_timeout): + batch_size, batch_timeout): for target, priority in targets_and_priorities: if not target.topic: raise exceptions.InvalidTarget('A target must have ' 'topic specified', target) return self._driver.listen_for_notifications( - targets_and_priorities, pool, on_incoming_callback, batch_size, - batch_timeout + targets_and_priorities, pool, batch_size, batch_timeout ) def cleanup(self):