Merge "Remove references to kafka-python"

This commit is contained in:
Jenkins 2017-01-26 07:36:44 +00:00 committed by Gerrit Code Review
commit ca5e57d5db
3 changed files with 10 additions and 73 deletions

View File

@ -1,4 +1,4 @@
# Copyright 2014 Hewlett-Packard # Copyright 2014,2017 Hewlett-Packard
# #
# Licensed under the Apache License, Version 2.0 (the "License"); you may # Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain # not use this file except in compliance with the License. You may obtain
@ -12,17 +12,15 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import time
from kafka import client
from kafka import common
from kafka import producer
from oslo_config import cfg from oslo_config import cfg
from oslo_log import log from oslo_log import log
from monasca_api.common.messaging import exceptions from monasca_api.common.messaging import exceptions
from monasca_api.common.messaging import publisher from monasca_api.common.messaging import publisher
import monasca_common.kafka.producer as kafka_producer
import monasca_common.kafka_lib.common as kafka_common
LOG = log.getLogger(__name__) LOG = log.getLogger(__name__)
@ -46,77 +44,17 @@ class KafkaPublisher(publisher.Publisher):
self.partitions = cfg.CONF.kafka.partitions self.partitions = cfg.CONF.kafka.partitions
self.drop_data = cfg.CONF.kafka.drop_data self.drop_data = cfg.CONF.kafka.drop_data
self._client = None self._producer = kafka_producer.KafkaProducer(self.uri)
self._producer = None
def _init_client(self, wait_time=None):
for i in range(self.max_retry):
try:
# if there is a client instance, but _init_client is called
# again, most likely the connection has gone stale, close that
# connection and reconnect.
if self._client:
self._client.close()
if not wait_time:
wait_time = self.wait_time
time.sleep(wait_time)
self._client = client.KafkaClient(self.uri)
# when a client is re-initialized, existing consumer should be
# reset as well.
self._producer = None
break
except common.KafkaUnavailableError:
LOG.error('Kafka server at %s is down.' % self.uri)
except common.LeaderNotAvailableError:
LOG.error('Kafka at %s has no leader available.' % self.uri)
except Exception:
LOG.error('Kafka at %s initialization failed.' % self.uri)
# Wait a bit and try again to get a client
time.sleep(self.wait_time)
def _init_producer(self):
try:
if not self._client:
self._init_client()
self._producer = producer.SimpleProducer(
self._client, async=self.async, ack_timeout=self.ack_time)
LOG.debug('Kafka SimpleProducer was created successfully.')
except Exception:
self._producer = None
LOG.exception('Kafka (%s) producer can not be created.' % self.uri)
def close(self): def close(self):
if self._client: pass
self._producer = None
self._client.close()
def send_message(self, message): def send_message(self, message):
try: try:
if not self._producer: self._producer.publish(self.topic, message)
self._init_producer()
self._producer.send_messages(self.topic, message)
except (common.KafkaUnavailableError, except (kafka_common.KafkaUnavailableError,
common.LeaderNotAvailableError): kafka_common.LeaderNotAvailableError):
self._client = None
LOG.exception('Error occurred while posting data to Kafka.')
raise exceptions.MessageQueueException()
except Exception:
LOG.exception('Unknown error.')
raise exceptions.MessageQueueException()
def send_message_batch(self, messages):
try:
if not self._producer:
self._init_producer()
self._producer.send_messages(self.topic, *messages)
except (common.KafkaUnavailableError,
common.LeaderNotAvailableError):
self._client = None
LOG.exception('Error occurred while posting data to Kafka.') LOG.exception('Error occurred while posting data to Kafka.')
raise exceptions.MessageQueueException() raise exceptions.MessageQueueException()
except Exception: except Exception:

View File

@ -95,7 +95,7 @@ class Metrics(metrics_api_v2.MetricsV2API):
def _send_metrics(self, metrics): def _send_metrics(self, metrics):
try: try:
self._message_queue.send_message_batch(metrics) self._message_queue.send_message(metrics)
except message_queue_exceptions.MessageQueueException as ex: except message_queue_exceptions.MessageQueueException as ex:
LOG.exception(ex) LOG.exception(ex)
raise falcon.HTTPServiceUnavailable('Service unavailable', raise falcon.HTTPServiceUnavailable('Service unavailable',

View File

@ -20,7 +20,6 @@ voluptuous>=0.8.9 # BSD License
#influxdb #influxdb
#cassandra-driver>=2.1.4,!=3.6.0 # Apache-2.0 #cassandra-driver>=2.1.4,!=3.6.0 # Apache-2.0
eventlet!=0.18.3,>=0.18.2 # MIT eventlet!=0.18.3,>=0.18.2 # MIT
kafka-python<1.0.0,>=0.9.5 # Apache-2.0
simplejson>=2.2.0 # MIT simplejson>=2.2.0 # MIT
monasca-common>=1.4.0 # Apache-2.0 monasca-common>=1.4.0 # Apache-2.0
SQLAlchemy<1.1.0,>=1.0.10 # MIT SQLAlchemy<1.1.0,>=1.0.10 # MIT