From 1618ccd38fc4db3f9741389711c07277321636ff Mon Sep 17 00:00:00 2001
From: Komei Shimamura
Date: Wed, 8 Oct 2014 20:03:54 +0000
Subject: [PATCH] Add a Kafka publisher as a Ceilometer publisher

Add a kafka publisher to support queuing metering and event data to a
kafka MQ for external consumption.

Implements: blueprint kafka-publisher
Change-Id: I7f8727ef5a85627a5a605079b6acdcd0619f978c
---
 ceilometer/publisher/kafka_broker.py          | 199 +++++++++++++++++
 .../publisher/test_kafka_broker_publisher.py  | 206 ++++++++++++++++++
 requirements.txt                              |   1 +
 setup.cfg                                     |   2 +
 4 files changed, 408 insertions(+)
 create mode 100644 ceilometer/publisher/kafka_broker.py
 create mode 100644 ceilometer/tests/publisher/test_kafka_broker_publisher.py

diff --git a/ceilometer/publisher/kafka_broker.py b/ceilometer/publisher/kafka_broker.py
new file mode 100644
index 00000000..423ad070
--- /dev/null
+++ b/ceilometer/publisher/kafka_broker.py
@@ -0,0 +1,199 @@
+#
+# Copyright 2015 Cisco Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import json
+
+import kafka
+from oslo_config import cfg
+from oslo_utils import netutils
+from six.moves.urllib import parse as urlparse
+
+from ceilometer.i18n import _LE
+from ceilometer.i18n import _LI
+from ceilometer.i18n import _LW
+from ceilometer.openstack.common import log
+from ceilometer import publisher
+from ceilometer.publisher import utils
+
+LOG = log.getLogger(__name__)
+
+
+class KafkaBrokerPublisher(publisher.PublisherBase):
+    """Publish metering data to a kafka broker.
+
+    The IP address and port number of the kafka broker should be configured
+    in the ceilometer pipeline configuration file. If an IP address is not
+    specified, this kafka publisher will not publish any meters.
+
+    To enable this publisher, add the following section to the
+    /etc/ceilometer/pipeline.yaml file or simply add it to an existing
+    pipeline::
+
+        meter:
+            - name: meter_kafka
+              interval: 600
+              counters:
+                  - "*"
+              transformers:
+              sinks:
+                  - kafka_sink
+        sinks:
+            - name: kafka_sink
+              transformers:
+              publishers:
+                  - kafka://[kafka_broker_ip]:[kafka_broker_port]?topic=[topic]
+
+    The kafka topic name and the broker's port are optional. If the topic
+    parameter is missing, this publisher publishes metering data under the
+    topic name 'ceilometer'. If the port number is not specified, 9092 is
+    used as the broker's port.
+
+    This publisher supports three delivery policies: default (retry), queue
+    and drop, selected with the policy field of the URL query string. With
+    the queue policy, the maximum local queue length can be set with the
+    max_queue_length field. Under the default policy, a failed transfer is
+    retried up to max_retry times before the data is discarded; if max_retry
+    is not specified, the number of retries defaults to 100.
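+
+    For example, the following URL selects the queue policy with a bounded
+    local queue (an illustrative sketch only; the host address and the
+    max_queue_length value are placeholders, not recommendations)::
+
+        kafka://127.0.0.1:9092?topic=ceilometer&policy=queue&max_queue_length=512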
+    """
+
+    def __init__(self, parsed_url):
+        self.kafka_client = None
+
+        self.host, self.port = netutils.parse_host_port(
+            parsed_url.netloc, default_port=9092)
+
+        self.local_queue = []
+
+        params = urlparse.parse_qs(parsed_url.query)
+        self.topic = params.get('topic', ['ceilometer'])[-1]
+        self.policy = params.get('policy', ['default'])[-1]
+        self.max_queue_length = int(params.get(
+            'max_queue_length', [1024])[-1])
+        self.max_retry = int(params.get('max_retry', [100])[-1])
+
+        if self.policy in ['default', 'drop', 'queue']:
+            LOG.info(_LI('Publishing policy set to %s') % self.policy)
+        else:
+            LOG.warn(_LW('Publishing policy is unknown (%s), forcing to '
+                         'default') % self.policy)
+            self.policy = 'default'
+
+        try:
+            self._get_client()
+        except Exception as e:
+            LOG.exception(_LE("Failed to connect to Kafka service: %s"), e)
+
+    def publish_samples(self, context, samples):
+        """Send a metering message to the kafka broker.
+
+        :param context: Execution context from the service or RPC call
+        :param samples: Samples from pipeline after transformation
+        """
+        samples_list = [
+            utils.meter_message_from_counter(
+                sample, cfg.CONF.publisher.telemetry_secret)
+            for sample in samples
+        ]
+
+        self.local_queue.append(samples_list)
+
+        # Raises if the broker is unreachable; the queue policy keeps the
+        # local queue for a later retry, the other policies discard it.
+        self._check_kafka_connection()
+
+        self.flush()
+
+    def flush(self):
+        queue = self.local_queue
+        self.local_queue = self._process_queue(queue)
+        if self.policy == 'queue':
+            self._check_queue_length()
+
+    def publish_events(self, context, events):
+        """Send an event message to the kafka broker.
+
+        :param context: Execution context from the service or RPC call
+        :param events: events from pipeline after transformation
+        """
+        events_list = [utils.message_from_event(
+            event, cfg.CONF.publisher.telemetry_secret) for event in events]
+
+        self.local_queue.append(events_list)
+
+        self._check_kafka_connection()
+
+        self.flush()
+
+    def _process_queue(self, queue):
+        current_retry = 0
+        while queue:
+            data = queue[0]
+            try:
+                self._send(data)
+            except Exception:
+                LOG.warn(_LW("Failed to publish %d datapoints"),
+                         sum([len(d) for d in queue]))
+                if self.policy == 'queue':
+                    return queue
+                elif self.policy == 'drop':
+                    return []
+                # Default policy: retry sending the same batch until
+                # max_retry is reached, then give up and clear the queue.
+                current_retry += 1
+                if current_retry >= self.max_retry:
+                    self.local_queue = []
+                    LOG.exception(_LE("Failed to send sample data after "
+                                      "max_retry retries"))
+                    raise
+            else:
+                queue.pop(0)
+        return []
+
+    def _check_queue_length(self):
+        queue_length = len(self.local_queue)
+        if queue_length > self.max_queue_length > 0:
+            diff = queue_length - self.max_queue_length
+            self.local_queue = self.local_queue[diff:]
+            LOG.warn(_LW("Kafka Publisher max local queue length is "
+                         "exceeded, dropping %d oldest entries") % diff)
+
+    def _check_kafka_connection(self):
+        try:
+            self._get_client()
+        except Exception as e:
+            LOG.exception(_LE("Failed to connect to Kafka service: %s"), e)
+
+            if self.policy == 'queue':
+                self._check_queue_length()
+            else:
+                self.local_queue = []
+            raise Exception('Kafka Client is not available, '
+                            'please restart Kafka client')
+
+    def _get_client(self):
+        # Lazily create the client and producer on first use.
+        if not self.kafka_client:
+            self.kafka_client = kafka.KafkaClient(
+                "%s:%s" % (self.host, self.port))
+            self.kafka_producer = kafka.SimpleProducer(self.kafka_client)
+
+    def _send(self, data):
+        for d in data:
+            try:
+                self.kafka_producer.send_messages(
+                    self.topic, json.dumps(d))
+            except Exception as e:
+                LOG.exception(_LE("Failed to send sample data: %s"), e)
+                raise
diff --git a/ceilometer/tests/publisher/test_kafka_broker_publisher.py b/ceilometer/tests/publisher/test_kafka_broker_publisher.py
new file mode 100644
index 00000000..418918c3
--- /dev/null
+++ b/ceilometer/tests/publisher/test_kafka_broker_publisher.py
@@ -0,0 +1,206 @@
+#
+# Copyright 2015 Cisco Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+"""Tests for ceilometer/publisher/kafka_broker.py
+"""
+import datetime
+import uuid
+
+import mock
+from oslo_utils import netutils
+
+from ceilometer.event.storage import models as event
+from ceilometer.publisher import kafka_broker as kafka_publisher
+from ceilometer import sample
+from ceilometer.tests import base as tests_base
+
+
+class TestKafkaPublisher(tests_base.BaseTestCase):
+    test_event_data = [
+        event.Event(message_id=uuid.uuid4(),
+                    event_type='event_%d' % i,
+                    generated=datetime.datetime.utcnow(),
+                    traits=[], raw={})
+        for i in range(0, 5)
+    ]
+
+    test_data = [
+        sample.Sample(
+            name='test',
+            type=sample.TYPE_CUMULATIVE,
+            unit='',
+            volume=1,
+            user_id='test',
+            project_id='test',
+            resource_id='test_run_tasks',
+            timestamp=datetime.datetime.utcnow().isoformat(),
+            resource_metadata={'name': 'TestPublish'},
+        ),
+        sample.Sample(
+            name='test',
+            type=sample.TYPE_CUMULATIVE,
+            unit='',
+            volume=1,
+            user_id='test',
+            project_id='test',
+            resource_id='test_run_tasks',
+            timestamp=datetime.datetime.utcnow().isoformat(),
+            resource_metadata={'name': 'TestPublish'},
+        ),
+        sample.Sample(
+            name='test2',
+            type=sample.TYPE_CUMULATIVE,
+            unit='',
+            volume=1,
+            user_id='test',
+            project_id='test',
+            resource_id='test_run_tasks',
+            timestamp=datetime.datetime.utcnow().isoformat(),
+            resource_metadata={'name': 'TestPublish'},
+        ),
+        sample.Sample(
+            name='test2',
+            type=sample.TYPE_CUMULATIVE,
+            unit='',
+            volume=1,
+            user_id='test',
+            project_id='test',
+            resource_id='test_run_tasks',
+            timestamp=datetime.datetime.utcnow().isoformat(),
+            resource_metadata={'name': 'TestPublish'},
+        ),
+        sample.Sample(
+            name='test3',
+            type=sample.TYPE_CUMULATIVE,
+            unit='',
+            volume=1,
+            user_id='test',
+            project_id='test',
+            resource_id='test_run_tasks',
+            timestamp=datetime.datetime.utcnow().isoformat(),
+            resource_metadata={'name': 'TestPublish'},
+        ),
+    ]
+
+    def setUp(self):
+        super(TestKafkaPublisher, self).setUp()
+
+    def _make_fake_kafka_broker(self, published):
+        def _fake_kafka_broker():
+            def record_data(msg, dest):
+                published.append((msg, dest))
+
+            kafka_broker = mock.Mock()
+            kafka_broker.send_to = record_data
+            return kafka_broker
+
+        return _fake_kafka_broker
+
+    def test_publish(self):
+        publisher = kafka_publisher.KafkaBrokerPublisher(
+            netutils.urlsplit('kafka://127.0.0.1:9092?topic=ceilometer'))
+        publisher._get_client = mock.Mock(name="_get_client")
+        publisher._get_client.return_value = mock.Mock()
+
+        with mock.patch.object(publisher, '_send') as fake_send:
+            fake_send.side_effect = mock.Mock()
+            publisher.publish_samples(mock.MagicMock(), self.test_data)
+            self.assertEqual(1, len(fake_send.mock_calls))
+            self.assertEqual(0, len(publisher.local_queue))
+
+    def test_publish_without_options(self):
+        publisher = kafka_publisher.KafkaBrokerPublisher(
+            netutils.urlsplit('kafka://127.0.0.1:9092'))
+        publisher._get_client = mock.Mock(name="_get_client")
+        publisher._get_client.return_value = mock.Mock()
+
+        with mock.patch.object(publisher, '_send') as fake_send:
+            fake_send.side_effect = mock.Mock()
+            publisher.publish_samples(mock.MagicMock(), self.test_data)
+            self.assertEqual(1, len(fake_send.mock_calls))
+            self.assertEqual(0, len(publisher.local_queue))
+
+    def test_publish_to_unreachable_host_under_retry_policy(self):
+        publisher = kafka_publisher.KafkaBrokerPublisher(
+            netutils.urlsplit(
+                'kafka://127.0.0.1:9092?topic=ceilometer&policy=retry'))
+
+        with mock.patch.object(publisher, '_get_client') as fake_client:
+            fake_client.return_value = None
+            self.assertRaises(Exception, publisher.publish_samples,
+                              mock.MagicMock(), self.test_data)
+
+    def test_publish_to_unreachable_host_under_drop_policy(self):
+        publisher = kafka_publisher.KafkaBrokerPublisher(
+            netutils.urlsplit(
+                'kafka://127.0.0.1:9092?topic=ceilometer&policy=drop'))
+
+        with mock.patch.object(publisher, '_get_client') as fake_client:
+            fake_client.return_value = None
+            publisher.publish_samples(mock.MagicMock(), self.test_data)
+            self.assertEqual(0, len(publisher.local_queue))
+
+    def test_publish_to_unreachable_host_under_queue_policy(self):
+        publisher = kafka_publisher.KafkaBrokerPublisher(
+            netutils.urlsplit(
+                'kafka://127.0.0.1:9092?topic=ceilometer&policy=queue'))
+
+        with mock.patch.object(publisher, '_get_client') as fake_client:
+            fake_client.return_value = None
+            publisher.publish_samples(mock.MagicMock(), self.test_data)
+            self.assertEqual(1, len(publisher.local_queue))
+
+    def test_publish_to_unreachable_host_with_default_queue_size(self):
+        publisher = kafka_publisher.KafkaBrokerPublisher(
+            netutils.urlsplit(
+                'kafka://127.0.0.1:9092?topic=ceilometer&policy=queue'))
+
+        with mock.patch.object(publisher, '_get_client') as fake_client:
+            fake_client.return_value = None
+            for i in range(0, 2000):
+                for s in self.test_data:
+                    s.name = 'test-%d' % i
+                publisher.publish_samples(mock.MagicMock(),
+                                          self.test_data)
+
+            self.assertEqual(1024, len(publisher.local_queue))
+            self.assertEqual(
+                'test-976',
+                publisher.local_queue[0][0]['counter_name']
+            )
+            self.assertEqual(
+                'test-1999',
+                publisher.local_queue[1023][0]['counter_name']
+            )
+
+    def test_publish_to_host_from_down_to_up_with_local_queue(self):
+        publisher = kafka_publisher.KafkaBrokerPublisher(
+            netutils.urlsplit(
+                'kafka://127.0.0.1:9092?topic=ceilometer&policy=queue'))
+
+        with mock.patch.object(publisher, '_get_client') as fake_client:
+            fake_client.return_value = None
+            for i in range(0, 16):
+                for s in self.test_data:
+                    s.name = 'test-%d' % i
+                publisher.publish_samples(mock.MagicMock(), self.test_data)
+
+            self.assertEqual(16, len(publisher.local_queue))
+
+            fake_client.return_value = mock.Mock()
+
+            with mock.patch.object(publisher, '_send') as fake_send:
+                fake_send.return_value = mock.Mock()
+                for s in self.test_data:
+                    s.name = 'test-%d' % 16
+                publisher.publish_samples(mock.MagicMock(), self.test_data)
+                self.assertEqual(0, len(publisher.local_queue))
diff --git a/requirements.txt b/requirements.txt
index 2ba95174..e6e8adf9 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -9,6 +9,7 @@ eventlet>=0.16.1
 iso8601>=0.1.9
 jsonpath-rw>=1.2.0,<2.0
 jsonschema>=2.0.0,<3.0.0
+kafka-python>=0.9.2
 keystonemiddleware>=1.0.0
 lxml>=2.3
 msgpack-python>=0.4.0
diff --git a/setup.cfg b/setup.cfg
index 5d5bd564..fa1dd783 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -286,11 +286,13 @@ ceilometer.publisher =
     udp = ceilometer.publisher.udp:UDPPublisher
     file = ceilometer.publisher.file:FilePublisher
     direct = ceilometer.publisher.direct:DirectPublisher
+    kafka = ceilometer.publisher.kafka_broker:KafkaBrokerPublisher
 
 ceilometer.event.publisher =
     test = ceilometer.publisher.test:TestPublisher
     direct = ceilometer.publisher.direct:DirectPublisher
     notifier = ceilometer.publisher.messaging:EventNotifierPublisher
+    kafka = ceilometer.publisher.kafka_broker:KafkaBrokerPublisher
 
 ceilometer.alarm.rule =
     threshold = ceilometer.api.controllers.v2.alarm_rules.threshold:AlarmThresholdRule