diff --git a/oslo_messaging/_drivers/impl_kafka.py b/oslo_messaging/_drivers/impl_kafka.py index 6729f8701..09abfc550 100644 --- a/oslo_messaging/_drivers/impl_kafka.py +++ b/oslo_messaging/_drivers/impl_kafka.py @@ -265,18 +265,17 @@ class ProducerConnection(Connection): self.producer = None self.producer_lock = threading.Lock() - def _produce_message(self, topic, message): - while True: - try: - self.producer.produce(topic, message) - except KafkaException as e: - LOG.error("Produce message failed: %s" % str(e)) - except BufferError: - LOG.debug("Produce message queue full, waiting for deliveries") - self.producer.poll(0.5) - continue - break - + def _produce_message(self, topic, message, poll): + if poll: + self.producer.poll(poll) + try: + self.producer.produce(topic, message) + except KafkaException as e: + self.producer.poll(0) + raise e + except BufferError as e: + # We'll have to poll next time + raise e self.producer.poll(0) def notify_send(self, topic, ctxt, msg, retry): @@ -293,9 +292,22 @@ class ProducerConnection(Connection): try: self._ensure_producer() - if eventletutils.is_monkey_patched('thread'): - return tpool.execute(self._produce_message, topic, message) - return self._produce_message(topic, message) + poll = 0 + while True: + try: + if eventletutils.is_monkey_patched('thread'): + return tpool.execute(self._produce_message, topic, + message, poll) + return self._produce_message(topic, message, poll) + except KafkaException as e: + LOG.error("Produce message failed: %s" % str(e)) + break + except BufferError: + LOG.debug("Produce message queue full, " + "waiting for deliveries") + # We'll retry with .5s polling + poll = 0.5 + except Exception: # NOTE(sileht): if something goes wrong close the producer # connection diff --git a/oslo_messaging/tests/drivers/test_impl_kafka.py b/oslo_messaging/tests/drivers/test_impl_kafka.py index 77b2ed6cf..5e78369a1 100644 --- a/oslo_messaging/tests/drivers/test_impl_kafka.py +++ b/oslo_messaging/tests/drivers/test_impl_kafka.py @@ -15,6 +15,8 @@ import testscenarios from unittest import mock +from confluent_kafka import KafkaException + import oslo_messaging from oslo_messaging._drivers import impl_kafka as kafka_driver from oslo_messaging.tests import utils as test_utils @@ -120,6 +122,36 @@ class TestKafkaDriver(test_utils.BaseTestCase): 'ssl.key.password': '', }) + def test_send_notification_retries_on_buffer_error(self): + target = oslo_messaging.Target(topic="topic_test") + + with mock.patch("confluent_kafka.Producer") as producer: + fake_producer = mock.MagicMock() + fake_producer.produce = mock.Mock( + side_effect=[BufferError, BufferError, None]) + producer.return_value = fake_producer + + self.driver.send_notification( + target, {}, {"payload": ["test_1"]}, + None, retry=3) + + assert fake_producer.produce.call_count == 3 + + def test_send_notification_stops_on_kafka_error(self): + target = oslo_messaging.Target(topic="topic_test") + + with mock.patch("confluent_kafka.Producer") as producer: + fake_producer = mock.MagicMock() + fake_producer.produce = mock.Mock( + side_effect=[KafkaException, None]) + producer.return_value = fake_producer + + self.driver.send_notification( + target, {}, {"payload": ["test_1"]}, + None, retry=3) + + assert fake_producer.produce.call_count == 1 + def test_listen(self): target = oslo_messaging.Target(topic="topic_test") self.assertRaises(NotImplementedError, self.driver.listen, target, diff --git a/releasenotes/notes/bug-1981093-kafka-dont-log-in-tpool-execute-fa50ceee2d55ebae.yaml b/releasenotes/notes/bug-1981093-kafka-dont-log-in-tpool-execute-fa50ceee2d55ebae.yaml new file mode 100644 index 000000000..1103247f1 --- /dev/null +++ b/releasenotes/notes/bug-1981093-kafka-dont-log-in-tpool-execute-fa50ceee2d55ebae.yaml @@ -0,0 +1,8 @@ +--- +fixes: + - | + [`bug 1981093 `_] + Pulls calls to logging functions out of ``impl_kafka._produce_message``. + Since ``_produce_message`` is called through tpool.execute, calling logging + functions inside ``_produce_message`` could cause subsequent calls to + logging functions to deadlock.