Files
monasca-api/monasca_api/common/messaging/kafka_publisher.py
Witek Bedyk a76a745978 Set maximum buffer size for Kafka producer
The change sets queue.buffering.max.messages configuration option for
Kafka producer effectively limiting the number of messages in the buffer
before sending them to Apache Kafka.

Depends-On: https://review.opendev.org/694738
Change-Id: I6ebd4e21e9d55d1ac836e92dd8bf02a678170c68
Story: 2006059
Task: 37532
2019-11-18 17:57:24 +01:00

65 lines
2.4 KiB
Python

# Copyright 2014,2017 Hewlett-Packard
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from monasca_common.kafka import client_factory
import monasca_common.kafka_lib.common as kafka_common
from oslo_config import cfg
from oslo_log import log
from monasca_api.common.messaging import exceptions
from monasca_api.common.messaging import publisher
LOG = log.getLogger(__name__)
class KafkaPublisher(publisher.Publisher):
def __init__(self, topic):
if not cfg.CONF.kafka.uri:
raise Exception('Kafka is not configured correctly! '
'Use configuration file to specify Kafka '
'uri, for example: '
'uri=192.168.1.191:9092')
self.uri = cfg.CONF.kafka.uri
self.topic = topic
self.group = cfg.CONF.kafka.group
self.wait_time = cfg.CONF.kafka.wait_time
self.is_async = cfg.CONF.kafka.is_async
self.ack_time = cfg.CONF.kafka.ack_time
self.max_retry = cfg.CONF.kafka.max_retry
self.auto_commit = cfg.CONF.kafka.auto_commit
self.compact = cfg.CONF.kafka.compact
self.partitions = cfg.CONF.kafka.partitions
self.drop_data = cfg.CONF.kafka.drop_data
config = {'queue.buffering.max.messages':
cfg.CONF.kafka.queue_buffering_max_messages}
self._producer = client_factory.get_kafka_producer(
self.uri, cfg.CONF.kafka.legacy_kafka_client_enabled, **config)
def close(self):
pass
def send_message(self, message):
try:
self._producer.publish(self.topic, message)
except (kafka_common.KafkaUnavailableError,
kafka_common.LeaderNotAvailableError):
LOG.exception('Error occurred while posting data to Kafka.')
raise exceptions.MessageQueueException()
except Exception:
LOG.exception('Unknown error.')
raise exceptions.MessageQueueException()