diff --git a/oslo_messaging/_drivers/impl_kafka.py b/oslo_messaging/_drivers/impl_kafka.py index deb5da0f3..676b1adba 100644 --- a/oslo_messaging/_drivers/impl_kafka.py +++ b/oslo_messaging/_drivers/impl_kafka.py @@ -33,7 +33,6 @@ import tenacity from oslo_messaging._drivers import base from oslo_messaging._drivers import common as driver_common from oslo_messaging._drivers.kafka_driver import kafka_options -from oslo_messaging._drivers import pool as driver_pool from oslo_messaging._i18n import _LE from oslo_messaging._i18n import _LW from oslo_serialization import jsonutils @@ -81,17 +80,21 @@ def pack_message(ctxt, msg): return msg -def target_to_topic(target, priority=None): +def concat(sep, items): + return sep.join(filter(bool, items)) + + +def target_to_topic(target, priority=None, vhost=None): """Convert target into topic string :param target: Message destination target :type target: oslo_messaging.Target :param priority: Notification priority :type priority: string + :param priority: Notification vhost + :type priority: string """ - if not priority: - return target.topic - return target.topic + '.' + priority + return concat(".", [target.topic, priority, vhost]) def retry_on_retriable_kafka_error(exc): @@ -114,22 +117,12 @@ def with_reconnect(retries=None): class Connection(object): - def __init__(self, conf, url, purpose): + def __init__(self, conf, url): - self.client = None - driver_conf = conf.oslo_messaging_kafka - self.batch_size = driver_conf.producer_batch_size - self.linger_ms = driver_conf.producer_batch_timeout * 1000 self.conf = conf - self.producer = None - self.producer_lock = threading.Lock() - self.consumer = None - self.consumer_timeout = driver_conf.kafka_consumer_timeout - self.max_fetch_bytes = driver_conf.kafka_max_fetch_bytes - self.group_id = driver_conf.consumer_group self.url = url + self.virtual_host = url.virtual_host self._parse_url() - self._consume_loop_stopped = False def _parse_url(self): driver_conf = self.conf.oslo_messaging_kafka @@ -145,33 +138,22 @@ class Connection(object): self.hostaddrs.append("%s:%s" % (driver_conf.kafka_default_host, driver_conf.kafka_default_port)) - def notify_send(self, topic, ctxt, msg, retry): - """Send messages to Kafka broker. + def reset(self): + """Reset a connection so it can be used again.""" + pass - :param topic: String of the topic - :param ctxt: context for the messages - :param msg: messages for publishing - :param retry: the number of retry - """ - retry = retry if retry >= 0 else None - message = pack_message(ctxt, msg) - message = jsonutils.dumps(message) - @with_reconnect(retries=retry) - def wrapped_with_reconnect(): - self._ensure_producer() - # NOTE(sileht): This returns a future, we can use get() - # if we want to block like other driver - future = self.producer.send(topic, message) - future.get() +class ConsumerConnection(Connection): - try: - wrapped_with_reconnect() - except Exception: - # NOTE(sileht): if something goes wrong close the producer - # connection - self._close_producer() - raise + def __init__(self, conf, url): + + super(ConsumerConnection, self).__init__(conf, url) + driver_conf = self.conf.oslo_messaging_kafka + self.consumer = None + self.consumer_timeout = driver_conf.kafka_consumer_timeout + self.max_fetch_bytes = driver_conf.kafka_max_fetch_bytes + self.group_id = driver_conf.consumer_group + self._consume_loop_stopped = False @with_reconnect() def _poll_messages(self, timeout): @@ -215,16 +197,67 @@ class Connection(object): def stop_consuming(self): self._consume_loop_stopped = True - def reset(self): - """Reset a connection so it can be used again.""" - pass - def close(self): - self._close_producer() if self.consumer: self.consumer.close() self.consumer = None + @with_reconnect() + def declare_topic_consumer(self, topics, group=None): + # TODO(Support for manual/auto_commit functionality) + # When auto_commit is False, consumer can manually notify + # the completion of the subscription. + # Currently we don't support for non auto commit option + self.consumer = kafka.KafkaConsumer( + *topics, group_id=(group or self.group_id), + bootstrap_servers=self.hostaddrs, + max_partition_fetch_bytes=self.max_fetch_bytes, + selector=KAFKA_SELECTOR + ) + + +class ProducerConnection(Connection): + + def __init__(self, conf, url): + + super(ProducerConnection, self).__init__(conf, url) + driver_conf = self.conf.oslo_messaging_kafka + self.batch_size = driver_conf.producer_batch_size + self.linger_ms = driver_conf.producer_batch_timeout * 1000 + self.producer = None + self.producer_lock = threading.Lock() + + def notify_send(self, topic, ctxt, msg, retry): + """Send messages to Kafka broker. + + :param topic: String of the topic + :param ctxt: context for the messages + :param msg: messages for publishing + :param retry: the number of retry + """ + retry = retry if retry >= 0 else None + message = pack_message(ctxt, msg) + message = jsonutils.dumps(message) + + @with_reconnect(retries=retry) + def wrapped_with_reconnect(): + self._ensure_producer() + # NOTE(sileht): This returns a future, we can use get() + # if we want to block like other driver + future = self.producer.send(topic, message) + future.get() + + try: + wrapped_with_reconnect() + except Exception: + # NOTE(sileht): if something goes wrong close the producer + # connection + self._close_producer() + raise + + def close(self): + self._close_producer() + def _close_producer(self): with self.producer_lock: if self.producer: @@ -243,19 +276,6 @@ class Connection(object): batch_size=self.batch_size, selector=KAFKA_SELECTOR) - @with_reconnect() - def declare_topic_consumer(self, topics, group=None): - # TODO(Support for manual/auto_commit functionality) - # When auto_commit is False, consumer can manually notify - # the completion of the subscription. - # Currently we don't support for non auto commit option - self.consumer = kafka.KafkaConsumer( - *topics, group_id=(group or self.group_id), - bootstrap_servers=self.hostaddrs, - max_partition_fetch_bytes=self.max_fetch_bytes, - selector=KAFKA_SELECTOR - ) - class OsloKafkaMessage(base.RpcIncomingMessage): @@ -314,17 +334,12 @@ class KafkaDriver(base.BaseDriver): super(KafkaDriver, self).__init__( conf, url, default_exchange, allowed_remote_exmods) - # the pool configuration properties - max_size = self.conf.oslo_messaging_kafka.pool_size - min_size = self.conf.oslo_messaging_kafka.conn_pool_min_size - ttl = self.conf.oslo_messaging_kafka.conn_pool_ttl - - self.connection_pool = driver_pool.ConnectionPool( - self.conf, max_size, min_size, ttl, - self._url, Connection) self.listeners = [] + self.virtual_host = url.virtual_host + self.pconn = ProducerConnection(conf, url) def cleanup(self): + self.pconn.close() for c in self.listeners: c.close() self.listeners = [] @@ -351,8 +366,9 @@ class KafkaDriver(base.BaseDriver): N means N retries :type retry: int """ - with self._get_connection(purpose=driver_common.PURPOSE_SEND) as conn: - conn.notify_send(target_to_topic(target), ctxt, message, retry) + self.pconn.notify_send(target_to_topic(target, + vhost=self.virtual_host), + ctxt, message, retry) def listen(self, target, batch_size, batch_timeout): raise NotImplementedError( @@ -370,7 +386,7 @@ class KafkaDriver(base.BaseDriver): :param pool: consumer group of Kafka consumers :type pool: string """ - conn = self._get_connection(purpose=driver_common.PURPOSE_LISTEN) + conn = ConsumerConnection(self.conf, self._url) topics = set() for target, priority in targets_and_priorities: topics.add(target_to_topic(target, priority)) @@ -380,6 +396,3 @@ class KafkaDriver(base.BaseDriver): listener = KafkaListener(conn) return base.PollStyleListenerAdapter(listener, batch_size, batch_timeout) - - def _get_connection(self, purpose): - return driver_common.ConnectionContext(self.connection_pool, purpose) diff --git a/oslo_messaging/_drivers/kafka_driver/kafka_options.py b/oslo_messaging/_drivers/kafka_driver/kafka_options.py index 0bdcde532..398f707c5 100644 --- a/oslo_messaging/_drivers/kafka_driver/kafka_options.py +++ b/oslo_messaging/_drivers/kafka_driver/kafka_options.py @@ -33,12 +33,18 @@ KAFKA_OPTS = [ help='Default timeout(s) for Kafka consumers'), cfg.IntOpt('pool_size', default=10, + deprecated_for_removal=True, + deprecated_reason='Driver no longer uses connection pool. ', help='Pool Size for Kafka Consumers'), cfg.IntOpt('conn_pool_min_size', default=2, + deprecated_for_removal=True, + deprecated_reason='Driver no longer uses connection pool. ', help='The pool size limit for connections expiration policy'), cfg.IntOpt('conn_pool_ttl', default=1200, + deprecated_for_removal=True, + deprecated_reason='Driver no longer uses connection pool. ', help='The time-to-live in sec of idle connections in the pool'), cfg.StrOpt('consumer_group', default="oslo_messaging_consumer", diff --git a/oslo_messaging/tests/drivers/test_impl_kafka.py b/oslo_messaging/tests/drivers/test_impl_kafka.py index e7dc11599..f2f6f7f73 100644 --- a/oslo_messaging/tests/drivers/test_impl_kafka.py +++ b/oslo_messaging/tests/drivers/test_impl_kafka.py @@ -17,7 +17,6 @@ from six.moves import mock import testscenarios import oslo_messaging -from oslo_messaging._drivers import common as common_driver from oslo_messaging._drivers import impl_kafka as kafka_driver from oslo_messaging.tests import utils as test_utils @@ -39,16 +38,24 @@ class TestKafkaTransportURL(test_utils.BaseTestCase): scenarios = [ ('none', dict(url=None, - expected=dict(hostaddrs=['localhost:9092']))), + expected=dict(hostaddrs=['localhost:9092'], + vhost=None))), ('empty', dict(url='kafka:///', - expected=dict(hostaddrs=['localhost:9092']))), + expected=dict(hostaddrs=['localhost:9092'], + vhost=''))), ('host', dict(url='kafka://127.0.0.1', - expected=dict(hostaddrs=['127.0.0.1:9092']))), + expected=dict(hostaddrs=['127.0.0.1:9092'], + vhost=None))), ('port', dict(url='kafka://localhost:1234', - expected=dict(hostaddrs=['localhost:1234']))), + expected=dict(hostaddrs=['localhost:1234'], + vhost=None))), + ('vhost', dict(url='kafka://localhost:1234/my_host', + expected=dict(hostaddrs=['localhost:1234'], + vhost='my_host'))), ('two', dict(url='kafka://localhost:1234,localhost2:1234', expected=dict(hostaddrs=['localhost:1234', - 'localhost2:1234']))), + 'localhost2:1234'], + vhost=None))), ] @@ -62,8 +69,8 @@ class TestKafkaTransportURL(test_utils.BaseTestCase): self.addCleanup(transport.cleanup) driver = transport._driver - conn = driver._get_connection(common_driver.PURPOSE_SEND) - self.assertEqual(self.expected['hostaddrs'], conn.hostaddrs) + self.assertEqual(self.expected['hostaddrs'], driver.pconn.hostaddrs) + self.assertEqual(self.expected['vhost'], driver.virtual_host) class TestKafkaDriver(test_utils.BaseTestCase): @@ -130,10 +137,11 @@ class TestKafkaConnection(test_utils.BaseTestCase): self.driver = transport._driver def test_notify(self): - conn = self.driver._get_connection(common_driver.PURPOSE_SEND) with mock.patch("kafka.KafkaProducer") as fake_producer_class: fake_producer = fake_producer_class.return_value - conn.notify_send("fake_topic", {"fake_ctxt": "fake_param"}, - {"fake_text": "fake_message_1"}, 10) + self.driver.pconn.notify_send("fake_topic", + {"fake_ctxt": "fake_param"}, + {"fake_text": "fake_message_1"}, + 10) self.assertEqual(2, len(fake_producer.send.mock_calls)) diff --git a/tox.ini b/tox.ini index bbf06cd68..d772882a6 100644 --- a/tox.ini +++ b/tox.ini @@ -53,7 +53,7 @@ commands = pifpaf run rabbitmq -- python setup.py testr --slowest --testr-args= [testenv:py27-func-kafka] setenv = {[testenv]setenv} - TRANSPORT_URL=kafka://127.0.0.1:9092// + TRANSPORT_URL=kafka://127.0.0.1:9092/ OS_GROUP_REGEX=oslo_messaging.tests.functional commands = {toxinidir}/setup-test-env-kafka.sh python setup.py testr --slowest --testr-args='{posargs:oslo_messaging.tests.functional}'