From a76a51a78c9a5d07dfc2f57878738febe5fff4d9 Mon Sep 17 00:00:00 2001 From: Mehdi Abaakouk Date: Wed, 7 Dec 2016 18:28:55 +0100 Subject: [PATCH] kafka: Remove Producer singleton This producer singleton works only if one transport use kafka. If multiple transports with kafka are used, their overrides each other the producer on each send. This change creates only one producer per connection. A later optimisation can be one producer per driver instance. Change-Id: I429f7c5efcb41690dd1b17f856bda2425c788e53 --- oslo_messaging/_drivers/impl_kafka.py | 84 +++++++------------ .../tests/drivers/test_impl_kafka.py | 20 +++-- 2 files changed, 42 insertions(+), 62 deletions(-) diff --git a/oslo_messaging/_drivers/impl_kafka.py b/oslo_messaging/_drivers/impl_kafka.py index 52e793327..6c90d30cb 100644 --- a/oslo_messaging/_drivers/impl_kafka.py +++ b/oslo_messaging/_drivers/impl_kafka.py @@ -115,35 +115,6 @@ def with_reconnect(retries=None): return decorator -class Producer(object): - _producer = None - _servers = None - _lock = threading.Lock() - - @staticmethod - @with_reconnect() - def connect(servers, **kwargs): - return kafka.KafkaProducer( - bootstrap_servers=servers, - selector=KAFKA_SELECTOR, - **kwargs) - - @classmethod - def producer(cls, servers, **kwargs): - with cls._lock: - if not cls._producer or cls._servers != servers: - cls._servers = servers - cls._producer = cls.connect(servers, **kwargs) - return cls._producer - - @classmethod - def cleanup(cls): - with cls._lock: - if cls._producer: - cls._producer.close() - cls._producer = None - - class Connection(object): def __init__(self, conf, url, purpose): @@ -154,6 +125,7 @@ class Connection(object): self.linger_ms = driver_conf.producer_batch_timeout * 1000 self.conf = conf self.producer = None + self.producer_lock = threading.Lock() self.consumer = None self.consumer_timeout = float(driver_conf.kafka_consumer_timeout) self.max_fetch_bytes = driver_conf.kafka_max_fetch_bytes @@ -189,25 +161,24 @@ class Connection(object): :param msg: messages for publishing :param retry: the number of retry """ - - message = pack_message(ctxt, msg) - self._ensure_connection() - self._send_and_retry(message, topic, retry) - - def _send_and_retry(self, message, topic, retry): - if not isinstance(message, str): - message = jsonutils.dumps(message) retry = retry if retry >= 0 else None + message = pack_message(ctxt, msg) + message = jsonutils.dumps(message) @with_reconnect(retries=retry) - def _send(topic, message): + def wrapped_with_reconnect(): + self._ensure_producer() + # NOTE(sileht): This returns a future, we can use get() + # if we want to block like other driver self.producer.send(topic, message) try: - _send(topic, message) + wrapped_with_reconnect() except Exception: - Producer.cleanup() - LOG.exception(_LE("Failed to send message")) + # NOTE(sileht): if something goes wrong close the producer + # connection + self._close_producer() + raise @with_reconnect() def _poll_messages(self, timeout): @@ -239,12 +210,10 @@ class Connection(object): pass def close(self): - if self.producer: - self.producer.close() - self.producer = None + self._close_producer() if self.consumer: self.consumer.close() - self.consumer = None + self.consumer = None def commit(self): """Commit is used by subscribers belonging to the same group. @@ -257,14 +226,23 @@ class Connection(object): """ self.consumer.commit() - def _ensure_connection(self): - try: - self.producer = Producer.producer(self.hostaddrs, - linger_ms=self.linger_ms, - batch_size=self.batch_size) - except kafka.errors.KafkaError as e: - LOG.exception(_LE("KafkaProducer could not be initialized: %s"), e) - raise + def _close_producer(self): + with self.producer_lock: + if self.producer: + self.producer.close() + self.producer = None + + def _ensure_producer(self): + if self.producer: + return + with self.producer_lock: + if self.producer: + return + self.producer = kafka.KafkaProducer( + bootstrap_servers=self.hostaddrs, + linger_ms=self.linger_ms, + batch_size=self.batch_size, + selector=KAFKA_SELECTOR) @with_reconnect() def declare_topic_consumer(self, topics, group=None): diff --git a/oslo_messaging/tests/drivers/test_impl_kafka.py b/oslo_messaging/tests/drivers/test_impl_kafka.py index 6262aab4f..8d76cdc13 100644 --- a/oslo_messaging/tests/drivers/test_impl_kafka.py +++ b/oslo_messaging/tests/drivers/test_impl_kafka.py @@ -74,7 +74,6 @@ class TestKafkaDriver(test_utils.BaseTestCase): self.messaging_conf.transport_driver = 'kafka' transport = oslo_messaging.get_transport(self.conf) self.driver = transport._driver - self.addCleanup(kafka_driver.Producer.cleanup) def test_send(self): target = oslo_messaging.Target(topic="topic_test") @@ -87,8 +86,10 @@ class TestKafkaDriver(test_utils.BaseTestCase): with mock.patch("kafka.KafkaProducer") as fake_producer_class: fake_producer = fake_producer_class.return_value fake_producer.send.side_effect = kafka.errors.NoBrokersAvailable - self.driver.send_notification(target, {}, {"payload": ["test_1"]}, - None, retry=3) + self.assertRaises(kafka.errors.NoBrokersAvailable, + self.driver.send_notification, + target, {}, {"payload": ["test_1"]}, + None, retry=3) self.assertEqual(3, fake_producer.send.call_count) def test_listen(self): @@ -127,10 +128,11 @@ class TestKafkaConnection(test_utils.BaseTestCase): transport = oslo_messaging.get_transport(self.conf) self.driver = transport._driver - @mock.patch.object(kafka_driver.Connection, '_ensure_connection') - @mock.patch.object(kafka_driver.Connection, '_send_and_retry') - def test_notify(self, fake_send, fake_ensure_connection): + def test_notify(self): conn = self.driver._get_connection(common_driver.PURPOSE_SEND) - conn.notify_send("fake_topic", {"fake_ctxt": "fake_param"}, - {"fake_text": "fake_message_1"}, 10) - self.assertEqual(1, len(fake_send.mock_calls)) + + with mock.patch("kafka.KafkaProducer") as fake_producer_class: + fake_producer = fake_producer_class.return_value + conn.notify_send("fake_topic", {"fake_ctxt": "fake_param"}, + {"fake_text": "fake_message_1"}, 10) + self.assertEqual(1, len(fake_producer.send.mock_calls))