From d47bbae4e888f6f51e5faaf2f6ce2f775cc83427 Mon Sep 17 00:00:00 2001 From: Witek Bedyk Date: Tue, 30 Jul 2019 13:19:58 +0200 Subject: [PATCH] Use Confluent Kafka client The change introduces the possibility to run the persister with the new confluent-kafka client. It has to be enabled in the configuration file. Story: 2003705 Task: 35858 Depends-On: https://review.opendev.org/675297 Change-Id: I8d9e173e9a252712fb285c0676c26334e1f677a7 --- lower-constraints.txt | 2 +- monasca_notification/conf/kafka.py | 8 ++++++- monasca_notification/notification_engine.py | 16 +++++++------ monasca_notification/periodic_engine.py | 23 ++++++++++--------- .../processors/alarm_processor.py | 6 ++--- monasca_notification/retry_engine.py | 19 +++++++-------- ...lient_enabled_option-11fdacc063e95ae2.yaml | 8 +++++++ requirements.txt | 2 +- tests/test_alarm_processor.py | 12 ++++++---- 9 files changed, 59 insertions(+), 37 deletions(-) create mode 100644 releasenotes/notes/add_legacy_kafka_client_enabled_option-11fdacc063e95ae2.yaml diff --git a/lower-constraints.txt b/lower-constraints.txt index f45d867..bdeadf9 100644 --- a/lower-constraints.txt +++ b/lower-constraints.txt @@ -23,7 +23,7 @@ keystoneauth1==3.4.0 linecache2==1.0.0 mccabe==0.2.1 mock==2.0.0 -monasca-common==2.7.0 +monasca-common==2.16.0 monasca-statsd==1.4.0 monotonic==0.6 mox3==0.20.0 diff --git a/monasca_notification/conf/kafka.py b/monasca_notification/conf/kafka.py index 8a07c2a..0fddbf0 100644 --- a/monasca_notification/conf/kafka.py +++ b/monasca_notification/conf/kafka.py @@ -62,7 +62,13 @@ kafka_opts = [ required=True, advanced=True, help='Maximum lag for topic that is acceptable by ' 'the monasca-notification. Notifications that are older ' - 'than this offset are skipped.') + 'than this offset are skipped.'), + cfg.BoolOpt(name='legacy_kafka_client_enabled', default=True, + required=True, advanced=True, + help='Enable legacy Kafka client. When set old version of ' + 'kafka-python library is used. Message format version ' + 'for the brokers should be set to 0.9.0.0 to avoid ' + 'performance issues until all consumers are upgraded.') ] diff --git a/monasca_notification/notification_engine.py b/monasca_notification/notification_engine.py index 3673560..41ef936 100644 --- a/monasca_notification/notification_engine.py +++ b/monasca_notification/notification_engine.py @@ -19,8 +19,7 @@ import time from oslo_config import cfg from oslo_log import log as logging -from monasca_common.kafka import consumer -from monasca_common.kafka import producer +from monasca_common.kafka import client_factory from monasca_notification.common.utils import get_statsd_client from monasca_notification.processors import alarm_processor as ap from monasca_notification.processors import notification_processor as np @@ -32,13 +31,16 @@ CONF = cfg.CONF class NotificationEngine(object): def __init__(self): self._statsd = get_statsd_client() - self._consumer = consumer.KafkaConsumer( + self._consumer = client_factory.get_kafka_consumer( CONF.kafka.url, - ','.join(CONF.zookeeper.url), - CONF.zookeeper.notification_path, CONF.kafka.group, - CONF.kafka.alarm_topic) - self._producer = producer.KafkaProducer(CONF.kafka.url) + CONF.kafka.alarm_topic, + CONF.zookeeper.url, + CONF.zookeeper.notification_path, + CONF.kafka.legacy_kafka_client_enabled) + self._producer = client_factory.get_kafka_producer( + CONF.kafka.url, + CONF.kafka.legacy_kafka_client_enabled) self._alarms = ap.AlarmProcessor() self._notifier = np.NotificationProcessor() diff --git a/monasca_notification/periodic_engine.py b/monasca_notification/periodic_engine.py index ba2c083..4e772dc 100644 --- a/monasca_notification/periodic_engine.py +++ b/monasca_notification/periodic_engine.py @@ -20,8 +20,7 @@ from oslo_config import cfg from oslo_log import log as logging from oslo_serialization import jsonutils -from monasca_common.kafka import consumer -from monasca_common.kafka import producer +from monasca_common.kafka import client_factory from monasca_notification.common.repositories import exceptions from monasca_notification.common.utils import construct_notification_object from monasca_notification.common.utils import get_db_repo @@ -38,14 +37,16 @@ class PeriodicEngine(object): self._statsd = get_statsd_client() - zookeeper_path = CONF.zookeeper.periodic_path[period] - self._consumer = consumer.KafkaConsumer(CONF.kafka.url, - ','.join(CONF.zookeeper.url), - zookeeper_path, - CONF.kafka.group, - self._topic_name) - - self._producer = producer.KafkaProducer(CONF.kafka.url) + self._consumer = client_factory.get_kafka_consumer( + CONF.kafka.url, + CONF.kafka.group, + self._topic_name, + CONF.zookeeper.url, + CONF.zookeeper.periodic_path[period], + CONF.kafka.legacy_kafka_client_enabled) + self._producer = client_factory.get_kafka_producer( + CONF.kafka.url, + CONF.kafka.legacy_kafka_client_enabled) self._notifier = notification_processor.NotificationProcessor() self._db_repo = get_db_repo() @@ -74,7 +75,7 @@ class PeriodicEngine(object): def run(self): for raw_notification in self._consumer: - message = raw_notification[1].message.value + message = raw_notification.value() notification_data = jsonutils.loads(message) notification = construct_notification_object(self._db_repo, notification_data) diff --git a/monasca_notification/processors/alarm_processor.py b/monasca_notification/processors/alarm_processor.py index bb58108..704192f 100644 --- a/monasca_notification/processors/alarm_processor.py +++ b/monasca_notification/processors/alarm_processor.py @@ -107,10 +107,10 @@ class AlarmProcessor(object): no_notification_count = self._statsd.get_counter(name='alarms_no_notification_count') notification_count = self._statsd.get_counter(name='created_count') - partition = raw_alarm[0] - offset = raw_alarm[1].offset + partition = raw_alarm.partition() + offset = raw_alarm.offset() try: - alarm = self._parse_alarm(raw_alarm[1].message.value) + alarm = self._parse_alarm(raw_alarm.value()) except Exception as e: # This is general because of a lack of json exception base class failed_parse_count += 1 log.exception( diff --git a/monasca_notification/retry_engine.py b/monasca_notification/retry_engine.py index 045168d..78979f0 100644 --- a/monasca_notification/retry_engine.py +++ b/monasca_notification/retry_engine.py @@ -16,12 +16,11 @@ import time -from monasca_common.kafka import consumer -from monasca_common.kafka import producer from oslo_config import cfg from oslo_log import log as logging from oslo_serialization import jsonutils +from monasca_common.kafka import client_factory from monasca_notification.common.utils import construct_notification_object from monasca_notification.common.utils import get_db_repo from monasca_notification.common.utils import get_statsd_client @@ -35,21 +34,23 @@ class RetryEngine(object): def __init__(self): self._statsd = get_statsd_client() - self._consumer = consumer.KafkaConsumer( + self._consumer = client_factory.get_kafka_consumer( CONF.kafka.url, - ','.join(CONF.zookeeper.url), - CONF.zookeeper.notification_retry_path, CONF.kafka.group, - CONF.kafka.notification_retry_topic - ) - self._producer = producer.KafkaProducer(CONF.kafka.url) + CONF.kafka.notification_retry_topic, + CONF.zookeeper.url, + CONF.zookeeper.notification_retry_path, + CONF.kafka.legacy_kafka_client_enabled) + self._producer = client_factory.get_kafka_producer( + CONF.kafka.url, + CONF.kafka.legacy_kafka_client_enabled) self._notifier = notification_processor.NotificationProcessor() self._db_repo = get_db_repo() def run(self): for raw_notification in self._consumer: - message = raw_notification[1].message.value + message = raw_notification.value() notification_data = jsonutils.loads(message) notification = construct_notification_object(self._db_repo, notification_data) diff --git a/releasenotes/notes/add_legacy_kafka_client_enabled_option-11fdacc063e95ae2.yaml b/releasenotes/notes/add_legacy_kafka_client_enabled_option-11fdacc063e95ae2.yaml new file mode 100644 index 0000000..3d6e8b8 --- /dev/null +++ b/releasenotes/notes/add_legacy_kafka_client_enabled_option-11fdacc063e95ae2.yaml @@ -0,0 +1,8 @@ +--- +features: + - | + Configuration option `legacy_kafka_client_enabled` added to allow working + with both legacy *kafka-python* and new *Confluent Kafka* client. Setting + this to *true* enables *kafka-python* client which is default. Please set + message format version for the Kafka brokers to 0.9.0.0 to avoid + performance issues until all consumers are upgraded. diff --git a/requirements.txt b/requirements.txt index 1d85502..f45bae6 100644 --- a/requirements.txt +++ b/requirements.txt @@ -8,7 +8,7 @@ monasca-statsd>=1.4.0 # Apache-2.0 requests>=2.14.2 # Apache-2.0 PyYAML>=3.12 # MIT six>=1.10.0 # MIT -monasca-common>=2.7.0 # Apache-2.0 +monasca-common>=2.16.0 # Apache-2.0 oslo.config>=5.2.0 # Apache-2.0 oslo.log>=3.36.0 # Apache-2.0 oslo.serialization>=2.18.0 # Apache-2.0 diff --git a/tests/test_alarm_processor.py b/tests/test_alarm_processor.py index dba5f17..f341495 100644 --- a/tests/test_alarm_processor.py +++ b/tests/test_alarm_processor.py @@ -21,13 +21,15 @@ import json import mock import time +from monasca_common.kafka import legacy_kafka_message + from monasca_notification import notification as m_notification from monasca_notification.processors import alarm_processor from tests import base alarm_tuple = collections.namedtuple('alarm_tuple', ['offset', 'message']) -message_tuple = collections.namedtuple('message_tuple', ['value']) +message_tuple = collections.namedtuple('message_tuple', ['key', 'value']) class TestAlarmProcessor(base.BaseTestCase): @@ -35,12 +37,14 @@ class TestAlarmProcessor(base.BaseTestCase): super(TestAlarmProcessor, self).setUp() self.trap = [] - def _create_raw_alarm(self, partition, offset, message): + def _create_raw_alarm(self, partition, offset, message, key=1): """Create a raw alarm, with the given message dictionary. """ json_msg = json.dumps({'alarm-transitioned': message}) - msg_tuple = message_tuple(json_msg) - return [partition, alarm_tuple(offset, msg_tuple)] + msg_tuple = message_tuple(key, json_msg) + return legacy_kafka_message.LegacyKafkaMessage([partition, + alarm_tuple(offset, + msg_tuple)]) @mock.patch('pymysql.connect') @mock.patch('monasca_notification.processors.alarm_processor.log')