From 47c5ad37d5a4bdcf1a37fd1aa76f7f062a4416bf Mon Sep 17 00:00:00 2001 From: Witek Bedyk Date: Tue, 13 Aug 2019 09:50:10 +0200 Subject: [PATCH] Use Confluent Kafka client The change introduces the possibility to run the API with the new confluent-kafka client. It has to be enabled in the configuration file. Story: 2003705 Task: 35859 Depends-On: https://review.opendev.org/680653 Change-Id: Id513e01c60ea584548c954a8d2e61b9510eee8de --- devstack/plugin.sh | 1 + lower-constraints.txt | 2 +- .../common/messaging/kafka_publisher.py | 7 +- monasca_api/conf/kafka.py | 71 +++++++------------ ...lient_enabled_option-7be9bc4e0fcecc70.yaml | 7 ++ requirements.txt | 2 +- 6 files changed, 40 insertions(+), 50 deletions(-) create mode 100644 releasenotes/notes/add_legacy_kafka_client_enabled_option-7be9bc4e0fcecc70.yaml diff --git a/devstack/plugin.sh b/devstack/plugin.sh index 88278eead..eaefdb60b 100755 --- a/devstack/plugin.sh +++ b/devstack/plugin.sh @@ -829,6 +829,7 @@ function configure_monasca_api_python { # messaging iniset "$MONASCA_API_CONF" messaging driver "monasca_api.common.messaging.kafka_publisher:KafkaPublisher" iniset "$MONASCA_API_CONF" kafka uri "$SERVICE_HOST:9092" + iniset "$MONASCA_API_CONF" kafka legacy_kafka_client_enabled false # databases iniset "$MONASCA_API_CONF" database connection $dbAlarmUrl diff --git a/lower-constraints.txt b/lower-constraints.txt index 2f99db682..fb32190d8 100644 --- a/lower-constraints.txt +++ b/lower-constraints.txt @@ -46,7 +46,7 @@ Mako==0.4.0 MarkupSafe==1.0 mccabe==0.2.1 mock==2.0.0 -monasca-common==2.7.0 +monasca-common==2.16.0 monotonic==0.6 mox3==0.20.0 msgpack-python==0.4.0 diff --git a/monasca_api/common/messaging/kafka_publisher.py b/monasca_api/common/messaging/kafka_publisher.py index a874e58a9..1cb28eba9 100644 --- a/monasca_api/common/messaging/kafka_publisher.py +++ b/monasca_api/common/messaging/kafka_publisher.py @@ -12,14 +12,14 @@ # License for the specific language governing permissions and limitations # under the License. +from monasca_common.kafka import client_factory +import monasca_common.kafka_lib.common as kafka_common from oslo_config import cfg from oslo_log import log from monasca_api.common.messaging import exceptions from monasca_api.common.messaging import publisher -import monasca_common.kafka.producer as kafka_producer -import monasca_common.kafka_lib.common as kafka_common LOG = log.getLogger(__name__) @@ -44,7 +44,8 @@ class KafkaPublisher(publisher.Publisher): self.partitions = cfg.CONF.kafka.partitions self.drop_data = cfg.CONF.kafka.drop_data - self._producer = kafka_producer.KafkaProducer(self.uri) + self._producer = client_factory.get_kafka_producer( + self.uri, cfg.CONF.kafka.legacy_kafka_client_enabled) def close(self): pass diff --git a/monasca_api/conf/kafka.py b/monasca_api/conf/kafka.py index 2e682a922..4c916b765 100644 --- a/monasca_api/conf/kafka.py +++ b/monasca_api/conf/kafka.py @@ -23,65 +23,46 @@ kafka_opts = [ cfg.ListOpt('uri', default=['127.0.0.1:9092'], item_type=types.HostAddressPortType(), - help=''' -Comma separated list of Kafka broker host:port -'''), + help='Comma separated list of Kafka broker host:port'), cfg.StrOpt('metrics_topic', default='metrics', - help=''' -The topic that metrics will be published to -'''), + help='The topic that metrics will be published to'), cfg.StrOpt('events_topic', default='events', - help=''' -The topic that events will be published too -'''), + help='The topic that events will be published to'), cfg.StrOpt('alarm_state_transitions_topic', default='alarm-state-transitions', - help=''' -The topic that alarm state will be published too -'''), + help='The topic that alarm state will be published to'), cfg.StrOpt('group', default='api', - help=''' -The group name that this service belongs to -'''), + help='The group name that this service belongs to'), cfg.IntOpt('wait_time', default=1, advanced=True, min=1, - help=''' -The wait time when no messages on kafka queue -'''), + help='The wait time when no messages on kafka queue (NOT USED)'), cfg.IntOpt('ack_time', default=20, - help=''' -The ack time back to kafka. -'''), + help='The ack time back to kafka. (NOT USED)'), cfg.IntOpt('max_retry', default=3, - help=''' -The number of retry when there is a connection error -'''), + help='Number of retries in case of connection error (NOT USED)'), cfg.BoolOpt('auto_commit', default=False, - advanced=True, help=''' -Should messages be automatically committed -'''), + advanced=True, + help='Whether the message is automatically committed ' + '(NOT USED)'), cfg.BoolOpt('is_async', default=True, - help=''' -The type of posting -'''), + help='Whether posting is asynchronous or not (NOT USED)'), cfg.BoolOpt('compact', default=True, - help=''' -Specify if the message received should be parsed. -If True, message will not be parsed, otherwise -messages will be parsed -'''), + help='Specify if the message received should be parsed. If ' + 'True, message will not be parsed, otherwise messages ' + 'will be parsed (NOT USED)'), cfg.ListOpt('partitions', item_type=int, - default=[0], help=''' -The partitions this connection should -listen for messages on. Currently does not -support multiple partitions. -Default is to listen on partition 0 -'''), + default=[0], + help='The partitions this connection should listen for ' + 'messages on. (NOT USED)'), cfg.BoolOpt('drop_data', default=False, - help=''' -Specify if received data should be simply dropped. -This parameter is only for testing purposes -''') + help='Specify if received data should be simply dropped. ' + 'This parameter is only for testing purposes. (NOT USED)'), + cfg.BoolOpt(name='legacy_kafka_client_enabled', default=True, + required=True, advanced=True, + help='Enable legacy Kafka client. When set old version of ' + 'kafka-python library is used. Message format version ' + 'for the brokers should be set to 0.9.0.0 to avoid ' + 'performance issues until all consumers are upgraded.') ] kafka_group = cfg.OptGroup(name='kafka', title='kafka') diff --git a/releasenotes/notes/add_legacy_kafka_client_enabled_option-7be9bc4e0fcecc70.yaml b/releasenotes/notes/add_legacy_kafka_client_enabled_option-7be9bc4e0fcecc70.yaml new file mode 100644 index 000000000..d8d2cc0b0 --- /dev/null +++ b/releasenotes/notes/add_legacy_kafka_client_enabled_option-7be9bc4e0fcecc70.yaml @@ -0,0 +1,7 @@ +--- +upgrade: + - | + Configuration option `legacy_kafka_client_enabled` added to allow working + with both legacy kafka-python and new Confluent Kafka client. Please set + message format version for the Kafka brokers to 0.9.0.0 to avoid + performance issues until all consumers are upgraded. diff --git a/requirements.txt b/requirements.txt index 1cc04e4d6..1d776fd22 100644 --- a/requirements.txt +++ b/requirements.txt @@ -21,5 +21,5 @@ six>=1.10.0 # MIT pyparsing>=2.1.0 # MIT voluptuous>=0.8.9 # BSD License eventlet!=0.18.3,!=0.20.1,!=0.21.0,!=0.23.0,!=0.25.0,>=0.18.2 # MIT -monasca-common>=2.7.0 # Apache-2.0 +monasca-common>=2.16.0 # Apache-2.0 SQLAlchemy!=1.1.5,!=1.1.6,!=1.1.7,!=1.1.8,>=1.0.10 # MIT