Use Confluent Kafka client

This change makes it possible to run the notification engine with the
new confluent-kafka client. The new client has to be enabled in the
configuration file.
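
A minimal sketch of the configuration change, assuming the standard
oslo.config file layout and the [kafka] option group added in this
diff:

    [kafka]
    # Defaults to true (legacy kafka-python client); set to false to
    # switch to the new confluent-kafka client.
    legacy_kafka_client_enabled = false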

Story: 2003705
Task: 35858

Depends-On: https://review.opendev.org/675297
Change-Id: I8d9e173e9a252712fb285c0676c26334e1f677a7
Author: Witek Bedyk
Date:   2019-07-30 13:19:58 +02:00
Parent: 3db7b179ef
Commit: d47bbae4e8

9 changed files with 59 additions and 37 deletions


@@ -23,7 +23,7 @@ keystoneauth1==3.4.0
 linecache2==1.0.0
 mccabe==0.2.1
 mock==2.0.0
-monasca-common==2.7.0
+monasca-common==2.16.0
 monasca-statsd==1.4.0
 monotonic==0.6
 mox3==0.20.0


@@ -62,7 +62,13 @@ kafka_opts = [
                required=True, advanced=True,
                help='Maximum lag for topic that is acceptable by '
                     'the monasca-notification. Notifications that are older '
-                    'than this offset are skipped.')
+                    'than this offset are skipped.'),
+    cfg.BoolOpt(name='legacy_kafka_client_enabled', default=True,
+                required=True, advanced=True,
+                help='Enable legacy Kafka client. When set old version of '
+                     'kafka-python library is used. Message format version '
+                     'for the brokers should be set to 0.9.0.0 to avoid '
+                     'performance issues until all consumers are upgraded.')
 ]
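
The engines below obtain their Kafka clients from
monasca_common.kafka.client_factory, which dispatches on this flag. A
minimal sketch of that dispatch pattern follows; the function name and
argument order are taken from the call sites in this diff, but the
body is illustrative, not the actual monasca-common implementation
(the real wrapper, for instance, also makes the consumer iterable):

    # Illustrative sketch of a flag-based client factory.
    import confluent_kafka

    from monasca_common.kafka import consumer  # legacy kafka-python wrapper


    def get_kafka_consumer(kafka_url, group, topic,
                           zookeeper_url, zookeeper_path,
                           legacy_client_enabled):
        if legacy_client_enabled:
            # Legacy path: consumer offsets are coordinated through
            # Zookeeper, hence the extra connection arguments.
            return consumer.KafkaConsumer(kafka_url, ','.join(zookeeper_url),
                                          zookeeper_path, group, topic)
        # New path: confluent-kafka stores offsets in Kafka itself, so
        # the Zookeeper arguments are simply ignored.
        new_consumer = confluent_kafka.Consumer({'bootstrap.servers': kafka_url,
                                                 'group.id': group})
        new_consumer.subscribe([topic])
        return new_consumer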


@@ -19,8 +19,7 @@ import time
 from oslo_config import cfg
 from oslo_log import log as logging

-from monasca_common.kafka import consumer
-from monasca_common.kafka import producer
+from monasca_common.kafka import client_factory
 from monasca_notification.common.utils import get_statsd_client
 from monasca_notification.processors import alarm_processor as ap
 from monasca_notification.processors import notification_processor as np

@@ -32,13 +31,16 @@ CONF = cfg.CONF
 class NotificationEngine(object):
     def __init__(self):
         self._statsd = get_statsd_client()
-        self._consumer = consumer.KafkaConsumer(
+        self._consumer = client_factory.get_kafka_consumer(
             CONF.kafka.url,
-            ','.join(CONF.zookeeper.url),
-            CONF.zookeeper.notification_path,
             CONF.kafka.group,
-            CONF.kafka.alarm_topic)
-        self._producer = producer.KafkaProducer(CONF.kafka.url)
+            CONF.kafka.alarm_topic,
+            CONF.zookeeper.url,
+            CONF.zookeeper.notification_path,
+            CONF.kafka.legacy_kafka_client_enabled)
+        self._producer = client_factory.get_kafka_producer(
+            CONF.kafka.url,
+            CONF.kafka.legacy_kafka_client_enabled)
         self._alarms = ap.AlarmProcessor()
         self._notifier = np.NotificationProcessor()


@@ -20,8 +20,7 @@ from oslo_config import cfg
 from oslo_log import log as logging
 from oslo_serialization import jsonutils

-from monasca_common.kafka import consumer
-from monasca_common.kafka import producer
+from monasca_common.kafka import client_factory
 from monasca_notification.common.repositories import exceptions
 from monasca_notification.common.utils import construct_notification_object
 from monasca_notification.common.utils import get_db_repo

@@ -38,14 +37,16 @@ class PeriodicEngine(object):
         self._statsd = get_statsd_client()

-        zookeeper_path = CONF.zookeeper.periodic_path[period]
-        self._consumer = consumer.KafkaConsumer(CONF.kafka.url,
-                                                ','.join(CONF.zookeeper.url),
-                                                zookeeper_path,
-                                                CONF.kafka.group,
-                                                self._topic_name)
-
-        self._producer = producer.KafkaProducer(CONF.kafka.url)
+        self._consumer = client_factory.get_kafka_consumer(
+            CONF.kafka.url,
+            CONF.kafka.group,
+            self._topic_name,
+            CONF.zookeeper.url,
+            CONF.zookeeper.periodic_path[period],
+            CONF.kafka.legacy_kafka_client_enabled)
+        self._producer = client_factory.get_kafka_producer(
+            CONF.kafka.url,
+            CONF.kafka.legacy_kafka_client_enabled)

         self._notifier = notification_processor.NotificationProcessor()
         self._db_repo = get_db_repo()

@@ -74,7 +75,7 @@ class PeriodicEngine(object):
     def run(self):
         for raw_notification in self._consumer:
-            message = raw_notification[1].message.value
+            message = raw_notification.value()

             notification_data = jsonutils.loads(message)
             notification = construct_notification_object(self._db_repo, notification_data)


@@ -107,10 +107,10 @@ class AlarmProcessor(object):
         no_notification_count = self._statsd.get_counter(name='alarms_no_notification_count')
         notification_count = self._statsd.get_counter(name='created_count')

-        partition = raw_alarm[0]
-        offset = raw_alarm[1].offset
+        partition = raw_alarm.partition()
+        offset = raw_alarm.offset()
         try:
-            alarm = self._parse_alarm(raw_alarm[1].message.value)
+            alarm = self._parse_alarm(raw_alarm.value())
         except Exception as e:  # This is general because of a lack of json exception base class
             failed_parse_count += 1
             log.exception(
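
This interface change is the crux of the migration: the legacy
kafka-python wrapper yielded (partition, OffsetAndMessage) tuples,
while both new-style clients expose the method-based interface of
confluent-kafka's Message. A self-contained illustration of the two
access styles (the namedtuple shapes mirror the test helpers at the
end of this diff):

    import collections

    # Legacy kafka-python message shapes.
    OffsetAndMessage = collections.namedtuple('OffsetAndMessage',
                                              ['offset', 'message'])
    Message = collections.namedtuple('Message', ['key', 'value'])

    raw = (3, OffsetAndMessage(42, Message(b'key', b'{"alarm-transitioned": {}}')))

    # Old access pattern, removed above:
    assert raw[0] == 3              # partition
    assert raw[1].offset == 42      # offset
    payload = raw[1].message.value  # message body

    # New access pattern, added above: raw_alarm.partition(),
    # raw_alarm.offset() and raw_alarm.value(), matching
    # confluent_kafka.Message.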


@@ -16,12 +16,11 @@
 import time

-from monasca_common.kafka import consumer
-from monasca_common.kafka import producer
 from oslo_config import cfg
 from oslo_log import log as logging
 from oslo_serialization import jsonutils

+from monasca_common.kafka import client_factory
 from monasca_notification.common.utils import construct_notification_object
 from monasca_notification.common.utils import get_db_repo
 from monasca_notification.common.utils import get_statsd_client

@@ -35,21 +34,23 @@ class RetryEngine(object):
     def __init__(self):
         self._statsd = get_statsd_client()

-        self._consumer = consumer.KafkaConsumer(
+        self._consumer = client_factory.get_kafka_consumer(
             CONF.kafka.url,
-            ','.join(CONF.zookeeper.url),
-            CONF.zookeeper.notification_retry_path,
             CONF.kafka.group,
-            CONF.kafka.notification_retry_topic
-        )
-        self._producer = producer.KafkaProducer(CONF.kafka.url)
+            CONF.kafka.notification_retry_topic,
+            CONF.zookeeper.url,
+            CONF.zookeeper.notification_retry_path,
+            CONF.kafka.legacy_kafka_client_enabled)
+        self._producer = client_factory.get_kafka_producer(
+            CONF.kafka.url,
+            CONF.kafka.legacy_kafka_client_enabled)

         self._notifier = notification_processor.NotificationProcessor()
         self._db_repo = get_db_repo()

     def run(self):
         for raw_notification in self._consumer:
-            message = raw_notification[1].message.value
+            message = raw_notification.value()

             notification_data = jsonutils.loads(message)
             notification = construct_notification_object(self._db_repo, notification_data)


@@ -0,0 +1,8 @@
+---
+features:
+  - |
+    Configuration option `legacy_kafka_client_enabled` added to allow working
+    with both legacy *kafka-python* and new *Confluent Kafka* client. Setting
+    this to *true* enables *kafka-python* client which is default. Please set
+    message format version for the Kafka brokers to 0.9.0.0 to avoid
+    performance issues until all consumers are upgraded.
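
The broker-side setting referred to here is Kafka's
log.message.format.version; a minimal sketch of the corresponding
server.properties entry (standard Kafka broker configuration, not part
of this change):

    # server.properties on each broker: keep the message format at
    # 0.9.0.0 until every consumer runs the new client, so the broker
    # does not down-convert messages for old kafka-python consumers.
    log.message.format.version=0.9.0.0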


@@ -8,7 +8,7 @@ monasca-statsd>=1.4.0 # Apache-2.0
 requests>=2.14.2 # Apache-2.0
 PyYAML>=3.12 # MIT
 six>=1.10.0 # MIT
-monasca-common>=2.7.0 # Apache-2.0
+monasca-common>=2.16.0 # Apache-2.0
 oslo.config>=5.2.0 # Apache-2.0
 oslo.log>=3.36.0 # Apache-2.0
 oslo.serialization>=2.18.0 # Apache-2.0


@@ -21,13 +21,15 @@ import json
 import mock
 import time

+from monasca_common.kafka import legacy_kafka_message
 from monasca_notification import notification as m_notification
 from monasca_notification.processors import alarm_processor
 from tests import base

 alarm_tuple = collections.namedtuple('alarm_tuple', ['offset', 'message'])
-message_tuple = collections.namedtuple('message_tuple', ['value'])
+message_tuple = collections.namedtuple('message_tuple', ['key', 'value'])

@@ -35,12 +37,14 @@ class TestAlarmProcessor(base.BaseTestCase):
         super(TestAlarmProcessor, self).setUp()
         self.trap = []

-    def _create_raw_alarm(self, partition, offset, message):
+    def _create_raw_alarm(self, partition, offset, message, key=1):
         """Create a raw alarm, with the given message dictionary.
         """
         json_msg = json.dumps({'alarm-transitioned': message})
-        msg_tuple = message_tuple(json_msg)
-        return [partition, alarm_tuple(offset, msg_tuple)]
+        msg_tuple = message_tuple(key, json_msg)
+        return legacy_kafka_message.LegacyKafkaMessage([partition,
+                                                        alarm_tuple(offset,
+                                                                    msg_tuple)])

     @mock.patch('pymysql.connect')
     @mock.patch('monasca_notification.processors.alarm_processor.log')
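
The LegacyKafkaMessage import above suggests that monasca-common wraps
old-style [partition, OffsetAndMessage] pairs so they answer to the
new method-based interface, which is why the processors can call
partition()/offset()/value() with either client. A minimal sketch of
such an adapter, with the internal layout inferred from this test (the
real class in monasca_common.kafka.legacy_kafka_message may differ):

    class LegacyKafkaMessage(object):
        """Adapt a legacy [partition, OffsetAndMessage] pair to the
        method-based message interface used by the processors."""

        def __init__(self, raw_message):
            # raw_message: [partition, OffsetAndMessage(offset,
            #                                           Message(key, value))]
            self._partition = raw_message[0]
            self._offset = raw_message[1].offset
            self._key = raw_message[1].message.key
            self._value = raw_message[1].message.value

        def partition(self):
            return self._partition

        def offset(self):
            return self._offset

        def key(self):
            return self._key

        def value(self):
            return self._value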