Use Confluent Kafka client

This change makes it possible to run the persister with the new
confluent-kafka client. The new client has to be enabled explicitly
in the configuration file.

Story: 2003705
Task: 30117

Depends-On: https://review.opendev.org/675297
Change-Id: I05428b8ae9e0ba9af5b81d3b103434ebd5657108
This commit is contained in:
Witek Bedyk 2019-03-20 00:14:42 +01:00
parent 4e8b1fae05
commit b88084003f
18 changed files with 153 additions and 44 deletions

View File

@ -22,7 +22,7 @@ keystoneauth1==3.4.0
linecache2==1.0.0
mccabe==0.4.0
mock==2.0.0
monasca-common==2.7.0
monasca-common==2.16.0
monotonic==0.6
mox3==0.20.0
msgpack-python==0.4.0

View File

@ -42,7 +42,13 @@ kafka_common_opts = [
default=32768),
cfg.IntOpt('num_processors',
help='Number of processes spawned by persister',
default=1)
default=1),
cfg.BoolOpt('legacy_kafka_client_enabled',
help='Enable legacy Kafka client. When set old version of '
'kafka-python library is used. Message format version '
'for the brokers should be set to 0.9.0.0 to avoid '
'performance issues until all consumers are upgraded.',
default=True)
]
kafka_common_group = cfg.OptGroup(name='kafka',

View File

View File

@ -0,0 +1,37 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from monasca_common.kafka import client_factory
import six
from monasca_persister.repositories import persister
from monasca_persister.repositories import singleton
@six.add_metaclass(singleton.Singleton)
class ConfluentKafkaPersister(persister.Persister):
    """Persister backed by the new Confluent Kafka consumer client.

    The Singleton metaclass ensures only one instance exists per process,
    which lets the static ``flush`` repartition callback recover the
    instance without holding a direct reference to it.
    """

    def __init__(self, kafka_conf, repository, client_id=""):
        # kafka_conf: the [kafka_*] oslo.config group for this persister.
        # repository: repository class (not instance); the base class
        #             instantiates it.
        super(ConfluentKafkaPersister, self).__init__(kafka_conf, repository)
        # No use_legacy_client flag here, so the factory returns the
        # confluent-kafka based consumer (contrast LegacyKafkaPersister).
        self._consumer = client_factory.get_kafka_consumer(
            kafka_url=kafka_conf.uri,
            kafka_consumer_group=kafka_conf.group_id,
            kafka_topic=kafka_conf.topic,
            client_id=client_id,
            # Static method is used as the callback because the consumer
            # cannot be handed a bound method before __init__ completes.
            repartition_callback=ConfluentKafkaPersister.flush,
            commit_callback=self._flush,
            max_commit_interval=kafka_conf.max_wait_time_seconds
        )

    @staticmethod
    def flush(kafka_consumer, partitions):
        """Flush buffered data points before a partition rebalance.

        NOTE(review): relies on the Singleton metaclass — calling the class
        with no arguments returns the already-created instance. If no
        instance exists yet this would fail (missing constructor args);
        presumably the callback only fires after construction — confirm.
        """
        p = ConfluentKafkaPersister()
        p._flush()

View File

@ -0,0 +1,34 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from monasca_common.kafka import client_factory
import six
from monasca_persister.repositories import persister
from monasca_persister.repositories import singleton
@six.add_metaclass(singleton.Singleton)
class LegacyKafkaPersister(persister.Persister):
    """Persister backed by the legacy kafka-python consumer client.

    Selected when ``[kafka] legacy_kafka_client_enabled`` is True. Uses
    ZooKeeper for partition coordination, unlike the confluent variant.
    The Singleton metaclass keeps one instance per process.
    """

    def __init__(self, kafka_conf, zookeeper_conf, repository):
        # kafka_conf: the [kafka_*] oslo.config group for this persister.
        # zookeeper_conf: the [zookeeper] group (only the legacy client
        #                 needs ZooKeeper coordination).
        # repository: repository class (not instance); the base class
        #             instantiates it.
        super(LegacyKafkaPersister, self).__init__(kafka_conf, repository)
        self._consumer = client_factory.get_kafka_consumer(
            kafka_url=kafka_conf.uri,
            kafka_consumer_group=kafka_conf.group_id,
            kafka_topic=kafka_conf.topic,
            zookeeper_url=zookeeper_conf.uri,
            zookeeper_path=kafka_conf.zookeeper_path,
            # Forces the factory to build the old kafka-python consumer.
            use_legacy_client=True,
            repartition_callback=self._flush,
            commit_callback=self._flush,
            max_commit_interval=kafka_conf.max_wait_time_seconds
        )

View File

@ -33,7 +33,8 @@ from oslo_config import cfg
from oslo_log import log
from monasca_persister import config
from monasca_persister.repositories import persister
from monasca_persister.kafka import confluent_kafka_persister
from monasca_persister.kafka import legacy_kafka_persister
LOG = log.getLogger(__name__)
@ -91,8 +92,12 @@ def clean_exit(signum, frame=None):
def start_process(respository, kafka_config):
LOG.info("start process: {}".format(respository))
m_persister = persister.Persister(kafka_config, cfg.CONF.zookeeper,
respository)
if kafka_config.legacy_kafka_client_enabled:
m_persister = legacy_kafka_persister.LegacyKafkaPersister(
kafka_config, cfg.CONF.zookeeper, respository)
else:
m_persister = confluent_kafka_persister.ConfluentKafkaPersister(
kafka_config, respository)
m_persister.run()

View File

@ -13,36 +13,25 @@
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from abc import ABCMeta
import os
from oslo_config import cfg
from oslo_log import log
import six
from monasca_common.kafka import consumer
from monasca_persister.repositories import singleton
LOG = log.getLogger(__name__)
class Persister(object):
def __init__(self, kafka_conf, zookeeper_conf, repository):
@six.add_metaclass(singleton.Singleton)
class Persister(six.with_metaclass(ABCMeta, object)):
def __init__(self, kafka_conf, repository):
self._data_points = []
self._kafka_topic = kafka_conf.topic
self._batch_size = kafka_conf.batch_size
self._consumer = consumer.KafkaConsumer(
kafka_conf.uri,
zookeeper_conf.uri,
kafka_conf.zookeeper_path,
kafka_conf.group_id,
kafka_conf.topic,
repartition_callback=self._flush,
commit_callback=self._flush,
commit_timeout=kafka_conf.max_wait_time_seconds)
self.repository = repository()
def _flush(self):
@ -76,9 +65,8 @@ class Persister(object):
def run(self):
try:
for raw_message in self._consumer:
for message in self._consumer:
try:
message = raw_message[1]
data_point = self.repository.process_message(message)
self._data_points.append(data_point)
except Exception:

View File

@ -0,0 +1,22 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
class Singleton(type):
    """Metaclass giving each class that uses it a single shared instance.

    The first call to the class constructs and caches the instance on the
    class itself (as ``instance``); every later call — regardless of
    arguments — returns that same cached object.
    """

    def __init__(cls, name, bases, namespace):
        super(Singleton, cls).__init__(name, bases, namespace)
        # Each class using this metaclass gets its own empty slot.
        cls.instance = None

    def __call__(cls, *args, **kwargs):
        existing = cls.instance
        if existing is not None:
            return existing
        created = super(Singleton, cls).__call__(*args, **kwargs)
        cls.instance = created
        return created

View File

@ -17,7 +17,8 @@ import ujson as json
def parse_measurement_message(message):
decoded_message = json.loads(message.message.value)
decoded_message = json.loads(message.value())
metric = decoded_message['metric']
@ -39,7 +40,8 @@ def parse_measurement_message(message):
def parse_alarm_state_hist_message(message):
decoded_message = json.loads(message.message.value)
decoded_message = json.loads(message.value())
alarm_transitioned = decoded_message['alarm-transitioned']
@ -94,7 +96,8 @@ def parse_alarm_state_hist_message(message):
def parse_events_message(message):
decoded_message = json.loads(message.message.value)
decoded_message = json.loads(message.value())
event_type = decoded_message['event']['event_type']
timestamp = decoded_message['event']['timestamp']
payload = decoded_message['event']['payload']

View File

@ -57,7 +57,7 @@ class TestAlarmStateHistoryRepo(base.BaseTestCase):
def test_process_message(self):
message = Mock()
message.message.value = """{
message.value.return_value = """{
"alarm-transitioned": {
"alarmId": "dummyid",
"metrics": "dummymetrics",

View File

@ -11,15 +11,16 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from datetime import datetime
import json
import os
from mock import patch
from oslotest import base
from testtools import matchers
from monasca_persister.repositories.elasticsearch import events_repository
from monasca_persister.repositories import utils
from mock import Mock
from testtools import matchers
from datetime import datetime
class TestEvents(base.BaseTestCase):
@ -58,10 +59,13 @@ class TestEvents(base.BaseTestCase):
self.assertEqual('2017-08-07', normalize_timestamp('2017-08-07 11:22:43'))
def _load_event(self, event_name):
@patch('monasca_common.kafka.legacy_kafka_message')
def _load_event(self, event_name, mock_kafka_message):
if self.events is None:
filepath = os.path.join(os.path.dirname(__file__), 'events.json')
self.events = json.load(open(filepath))
# create a kafka message envelope
value = json.dumps(self.events[event_name])
return Mock(message=Mock(value=value))
message = mock_kafka_message.LegacyKafkaMessage()
message.value.return_value = value
return message

View File

@ -35,7 +35,7 @@ class TestInfluxdbAlarmStateHistoryRepo(base.BaseTestCase):
def test_process_message(self):
message = Mock()
message.message.value = """{
message.value.return_value = """{
"alarm-transitioned": {
"alarmId": "dummyid",
"metrics": "dummymetrics",

View File

@ -60,4 +60,6 @@ class TestMetricInfluxdbRepository(base.BaseTestCase):
"creation_time":1554725988
}
'''
return Mock(message=Mock(value=metric))
message = Mock()
message.value.return_value = metric
return message

View File

@ -188,7 +188,8 @@ class TestPersister(base.BaseTestCase):
fake_kafka_config = Mock()
fake_repository = Mock()
with patch('monasca_persister.repositories.persister.Persister') as mock_persister_class:
with patch('monasca_persister.kafka.legacy_kafka_persister'
'.LegacyKafkaPersister') as mock_persister_class:
self.persister.start_process(fake_repository, fake_kafka_config)
mock_persister_class.assert_called_once_with(

View File

@ -21,7 +21,7 @@ from oslotest import base
from oslo_config import cfg
from monasca_common.kafka import consumer
from monasca_persister.repositories.persister import Persister
from monasca_persister.kafka.legacy_kafka_persister import LegacyKafkaPersister
from monasca_persister.repositories.persister import LOG
@ -36,7 +36,7 @@ class TestPersisterRepo(base.BaseTestCase):
self._set_patchers()
self._set_mocks()
self.persister = Persister(self.mock_kafka, self.mock_zookeeper, Mock())
self.persister = LegacyKafkaPersister(self.mock_kafka, self.mock_zookeeper, Mock())
def _set_mocks(self):
self.mock_kafka = Mock()
@ -102,7 +102,7 @@ class TestPersisterRepo(base.BaseTestCase):
return_value='message'):
with patch.object(self.persister, '_consumer', return_value=Mock()) as mock_consumer:
self.persister._data_points = ['a']
self.persister._consumer.__iter__.return_value = ['aa', 'bb']
self.persister._consumer.__iter__.return_value = ('aa', 'bb')
self.persister._batch_size = 1
self.persister.run()
mock_consumer.commit.assert_called()

View File

@ -27,7 +27,7 @@ class TestUtils(base.BaseTestCase):
def test_parse_measurement_message(self):
message = Mock()
message.message.value = """{
message.value.return_value = """{
"metric": {
"name": "metric_name",
"timestamp": "metric_timestamp",
@ -54,7 +54,7 @@ class TestUtils(base.BaseTestCase):
def test_parse_alarm_state_hist_message(self):
message = Mock()
message.message.value = """{
message.value.return_value = """{
"alarm-transitioned": {
"alarmId": "dummyid",
"metrics": "dummymetrics",
@ -92,7 +92,7 @@ class TestUtils(base.BaseTestCase):
def test_parse_events_message(self):
message = Mock()
message.message.value = """{
message.value.return_value = """{
"event": {
"event_type": "dummy_event_type",
"timestamp": "dummy_timestamp",

View File

@ -0,0 +1,7 @@
---
features:
- |
Configuration option `legacy_kafka_client_enabled` added to allow running
with either the legacy kafka-python client or the new Confluent Kafka
client. Please set the message format version for the Kafka brokers to
0.9.0.0 to avoid performance issues until all consumers are upgraded.

View File

@ -5,4 +5,4 @@ oslo.config>=5.2.0 # Apache-2.0
oslo.log>=3.36.0 # Apache-2.0
six>=1.10.0 # MIT
monasca-common>=2.7.0 # Apache-2.0
monasca-common>=2.16.0 # Apache-2.0