diff --git a/ceilometer/collector/dispatcher/__init__.py b/ceilometer/collector/dispatcher/__init__.py new file mode 100644 index 00000000..94003b69 --- /dev/null +++ b/ceilometer/collector/dispatcher/__init__.py @@ -0,0 +1,31 @@ +# -*- encoding: utf-8 -*- +# +# Copyright © 2013 IBM +# +# Author: Tong Li +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import abc + + +class Base(object): + + __metaclass__ = abc.ABCMeta + + def __init__(self, conf): + self.conf = conf + + @abc.abstractmethod + def record_metering_data(self, context, data): + """Recording metering data interface.""" diff --git a/ceilometer/collector/dispatcher/database.py b/ceilometer/collector/dispatcher/database.py new file mode 100644 index 00000000..732a26e7 --- /dev/null +++ b/ceilometer/collector/dispatcher/database.py @@ -0,0 +1,72 @@ +# -*- encoding: utf-8 -*- +# +# Copyright 2013 IBM Corp +# +# Author: Tong Li +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +from ceilometer import storage +from ceilometer.collector import dispatcher +from ceilometer.openstack.common import log +from ceilometer.openstack.common import timeutils +from ceilometer.publisher import rpc as publisher_rpc + +LOG = log.getLogger(__name__) + + +class DatabaseDispatcher(dispatcher.Base): + '''Dispatcher class for recording metering data into database. + + The dispatcher class which records each meter into a database configured + in ceilometer configuration file. + + To enable this dispatcher, the following section needs to be present in + ceilometer.conf file + + dispatchers = database + ''' + def __init__(self, conf): + super(DatabaseDispatcher, self).__init__(conf) + self.storage_conn = storage.get_connection(conf) + + def record_metering_data(self, context, data): + # We may have receive only one counter on the wire + if not isinstance(data, list): + data = [data] + + for meter in data: + LOG.debug('metering data %s for %s @ %s: %s', + meter['counter_name'], + meter['resource_id'], + meter.get('timestamp', 'NO TIMESTAMP'), + meter['counter_volume']) + if publisher_rpc.verify_signature( + meter, + self.conf.publisher_rpc.metering_secret): + try: + # Convert the timestamp to a datetime instance. + # Storage engines are responsible for converting + # that value to something they can store. + if meter.get('timestamp'): + ts = timeutils.parse_isotime(meter['timestamp']) + meter['timestamp'] = timeutils.normalize_time(ts) + self.storage_conn.record_metering_data(meter) + except Exception as err: + LOG.error('Failed to record metering data: %s', err) + LOG.exception(err) + else: + LOG.warning( + 'message signature invalid, discarding message: %r', + meter) diff --git a/ceilometer/collector/dispatcher/file.py b/ceilometer/collector/dispatcher/file.py new file mode 100644 index 00000000..1c818df4 --- /dev/null +++ b/ceilometer/collector/dispatcher/file.py @@ -0,0 +1,81 @@ +# -*- encoding: utf-8 -*- +# +# Copyright 2013 IBM Corp +# +# Author: Tong Li +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import logging +import logging.handlers +from oslo.config import cfg +from ceilometer.collector import dispatcher + +file_dispatcher_opts = [ + cfg.StrOpt('file_path', + default=None, + help='Name and the location of the file to record ' + 'meters.'), + cfg.IntOpt('max_bytes', + default=0, + help='The max size of the file'), + cfg.IntOpt('backup_count', + default=0, + help='The max number of the files to keep'), +] + +cfg.CONF.register_opts(file_dispatcher_opts, group="dispatcher_file") + + +class FileDispatcher(dispatcher.Base): + '''Dispatcher class for recording metering data to a file. + + The dispatcher class which logs each meter into a file configured in + ceilometer configuration file. An example configuration may look like the + following: + + [dispatcher_file] + file_path = /tmp/meters + + To enable this dispatcher, the following section needs to be present in + ceilometer.conf file + + [collector] + dispatchers = file + ''' + + def __init__(self, conf): + super(FileDispatcher, self).__init__(conf) + self.log = None + + # if the directory and path are configured, then log to the file + if self.conf.dispatcher_file.file_path: + dispatcher_logger = logging.Logger('dispather.file') + dispatcher_logger.setLevel(logging.INFO) + # create rotating file handler which logs meters + rfh = logging.handlers.RotatingFileHandler( + self.conf.dispatcher_file.file_path, + maxBytes=self.conf.dispatcher_file.max_bytes, + backupCount=self.conf.dispatcher_file.backup_count, + encoding='utf8') + + rfh.setLevel(logging.INFO) + # Only wanted the meters to be saved in the file, not the + # project root logger. + dispatcher_logger.propagate = False + dispatcher_logger.addHandler(rfh) + self.log = dispatcher_logger + + def record_metering_data(self, context, data): + if self.log: + self.log.info(data) diff --git a/ceilometer/collector/service.py b/ceilometer/collector/service.py index d4c08a5b..3c646d86 100644 --- a/ceilometer/collector/service.py +++ b/ceilometer/collector/service.py @@ -20,8 +20,8 @@ import msgpack from oslo.config import cfg import socket from stevedore import extension +from stevedore import named -from ceilometer.publisher import rpc as publisher_rpc from ceilometer.service import prepare_service from ceilometer.openstack.common import context from ceilometer.openstack.common.gettextutils import _ @@ -30,7 +30,6 @@ from ceilometer.openstack.common import service as os_service from ceilometer.openstack.common.rpc import dispatcher as rpc_dispatcher from ceilometer.openstack.common.rpc import service as rpc_service - from ceilometer.openstack.common import timeutils from ceilometer import pipeline from ceilometer import storage @@ -51,6 +50,9 @@ OPTS = [ cfg.BoolOpt('store_events', default=False, help='Save event details'), + cfg.MultiStrOpt('dispatcher', + default=['database'], + help='dispatcher to process metering data'), ] cfg.CONF.register_opts(OPTS, group="collector") @@ -108,10 +110,10 @@ def udp_collector(): class CollectorService(rpc_service.Service): COLLECTOR_NAMESPACE = 'ceilometer.collector' + DISPATCHER_NAMESPACE = 'ceilometer.dispatcher' def __init__(self, host, topic, manager=None): super(CollectorService, self).__init__(host, topic, manager) - self.storage_conn = storage.get_connection(cfg.CONF) def start(self): super(CollectorService, self).start() @@ -140,6 +142,18 @@ class CollectorService(rpc_service.Service): self.COLLECTOR_NAMESPACE) self.notification_manager.map(self._setup_subscription) + # Load all configured dispatchers + self.dispatchers = [] + for dispatcher in named.NamedExtensionManager( + namespace=self.DISPATCHER_NAMESPACE, + names=cfg.CONF.collector.dispatcher, + invoke_on_load=True, + invoke_args=[cfg.CONF]): + if dispatcher.obj: + self.dispatchers.append(dispatcher.obj) + + LOG.info('dispatchers loaded %s' % str(self.dispatchers)) + # Set ourselves up as a separate worker for the metering data, # since the default for service is to use create_consumer(). self.conn.create_worker( @@ -168,6 +182,10 @@ class CollectorService(rpc_service.Service): LOG.exception('Could not join consumer pool %s/%s' % (topic, exchange_topic.exchange)) + def record_metering_data(self, context, data): + for dispatcher in self.dispatchers: + dispatcher.record_metering_data(context, data) + def process_notification(self, notification): """Make a notification processed by an handler.""" LOG.debug('notification %r', notification.get('event_type')) @@ -236,39 +254,6 @@ class CollectorService(rpc_service.Service): # FIXME(dhellmann): Spawn green thread? p(list(handler.process_notification(notification))) - def record_metering_data(self, context, data): - """This method is triggered when metering data is - cast from an agent. - """ - # We may have receive only one counter on the wire - if not isinstance(data, list): - data = [data] - - for meter in data: - LOG.debug('metering data %s for %s @ %s: %s', - meter['counter_name'], - meter['resource_id'], - meter.get('timestamp', 'NO TIMESTAMP'), - meter['counter_volume']) - if publisher_rpc.verify_signature( - meter, - cfg.CONF.publisher_rpc.metering_secret): - try: - # Convert the timestamp to a datetime instance. - # Storage engines are responsible for converting - # that value to something they can store. - if meter.get('timestamp'): - ts = timeutils.parse_isotime(meter['timestamp']) - meter['timestamp'] = timeutils.normalize_time(ts) - self.storage_conn.record_metering_data(meter) - except Exception as err: - LOG.error('Failed to record metering data: %s', err) - LOG.exception(err) - else: - LOG.warning( - 'message signature invalid, discarding message: %r', - meter) - def collector(): prepare_service() diff --git a/doc/source/configuration.rst b/doc/source/configuration.rst index 5847ea2e..53d41521 100644 --- a/doc/source/configuration.rst +++ b/doc/source/configuration.rst @@ -247,6 +247,7 @@ rpc_conn_pool_size 30 Size of RPC c rpc_response_timeout 60 Seconds to wait for a response from call or multicall rpc_cast_timeout 30 Seconds to wait before a cast expires (TTL). Only supported by impl_zmq. +dispatchers database The list of dispatchers to process metering data. =========================== ==================================== ============================================================== A sample configuration file can be found in `ceilometer.conf.sample`_. diff --git a/doc/source/install/manual.rst b/doc/source/install/manual.rst index ade08acd..b37526ff 100644 --- a/doc/source/install/manual.rst +++ b/doc/source/install/manual.rst @@ -375,3 +375,46 @@ Configuring keystone to work with API default port value for ceilometer API is 8777. If the port value has been customized, adjust accordingly. + +Use multiple dispatchers +======================== + +.. index:: + double: installing; multiple dispatchers + +.. note:: + The Ceilometer collector allows multiple dispatchers to be configured so that + metering data can be easily sent to multiple internal and external systems. + + Ceilometer by default only saves metering data in a database, to allow + Ceilometer to send metering data to other systems in addition to the + database, multiple dispatchers can be developed and enabled by modifying + Ceilometer configuration file. + + Ceilometer ships two dispatchers currently. One is called database + dispatcher, and the other is called file dispatcher. As the names imply, + database dispatcher basically sends metering data to a database driver, + eventually metering data will be saved in database. File dispatcher sends + metering data into a file. The location, name, size of the file can be + configured in ceilometer configuration file. These two dispatchers are + shipped in the Ceilometer egg and defined in the entry_points as follows: + + [ceilometer.dispatcher] + file = ceilometer.collector.dispatcher.file:FileDispatcher + database = ceilometer.collector.dispatcher.database:DatabaseDispatcher + + To use both dispatchers on a Ceilometer collector service, add the following + line in file ceilometer.conf + + [collector] + dispatcher=database + dispatcher=file + + If there is no dispatcher present, database dispatcher is used as the + default. If in some cases such as traffic tests, no dispatcher is needed, + one can configure the line like the following: + + dispatcher= + + With above configuration, no dispatcher is used by the Ceilometer collector + service, all metering data received by Ceilometer collector will be dropped. diff --git a/etc/ceilometer/ceilometer.conf.sample b/etc/ceilometer/ceilometer.conf.sample index 45d7689b..92e0f26b 100644 --- a/etc/ceilometer/ceilometer.conf.sample +++ b/etc/ceilometer/ceilometer.conf.sample @@ -597,6 +597,23 @@ #os_endpoint_type=publicURL +[dispatcher_file] + +# +# Options defined in ceilometer.collector.dispatcher.file +# + +# Name and the location of the file to record meters. (string +# value) +#file_path= + +# The max size of the file (integer value) +#max_bytes=0 + +# The max number of the files to keep (integer value) +#backup_count=0 + + [collector] # @@ -617,6 +634,9 @@ # Save event details (boolean value) #store_events=false +# dispatcher to process metering data (multi valued) +#dispatcher=database + [matchmaker_ring] @@ -644,4 +664,4 @@ #password= -# Total option count: 123 +# Total option count: 127 diff --git a/setup.cfg b/setup.cfg index 95070144..3f32931d 100644 --- a/setup.cfg +++ b/setup.cfg @@ -119,6 +119,10 @@ console_scripts = ceilometer-alarm-singleton = ceilometer.alarm.service:singleton_alarm ceilometer-alarm-notifier = ceilometer.alarm.service:alarm_notifier +ceilometer.dispatcher = + database = ceilometer.collector.dispatcher.database:DatabaseDispatcher + file = ceilometer.collector.dispatcher.file:FileDispatcher + [build_sphinx] all_files = 1 build-dir = doc/build diff --git a/tests/collector/dispatcher/__init__.py b/tests/collector/dispatcher/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/collector/dispatcher/test_db.py b/tests/collector/dispatcher/test_db.py new file mode 100644 index 00000000..16e1d47a --- /dev/null +++ b/tests/collector/dispatcher/test_db.py @@ -0,0 +1,114 @@ +# -*- encoding: utf-8 -*- +# +# Copyright © 2013 IBM Corp +# +# Author: Tong Li +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +"""Tests for ceilometer/collector/dispatcher/database.py +""" +from oslo.config import cfg +from datetime import datetime + +from ceilometer.collector.dispatcher import database +from ceilometer.publisher import rpc +from ceilometer.tests import base as tests_base +from ceilometer.storage import base + + +class TestDispatcherDB(tests_base.TestCase): + + def setUp(self): + super(TestDispatcherDB, self).setUp() + self.dispatcher = database.DatabaseDispatcher(cfg.CONF) + self.ctx = None + + def test_valid_message(self): + msg = {'counter_name': 'test', + 'resource_id': self.id(), + 'counter_volume': 1, + } + msg['message_signature'] = rpc.compute_signature( + msg, + cfg.CONF.publisher_rpc.metering_secret, + ) + + self.dispatcher.storage_conn = self.mox.CreateMock(base.Connection) + self.dispatcher.storage_conn.record_metering_data(msg) + self.mox.ReplayAll() + + self.dispatcher.record_metering_data(self.ctx, msg) + self.mox.VerifyAll() + + def test_invalid_message(self): + msg = {'counter_name': 'test', + 'resource_id': self.id(), + 'counter_volume': 1, + } + msg['message_signature'] = 'invalid-signature' + + class ErrorConnection: + + called = False + + def record_metering_data(self, data): + self.called = True + + self.dispatcher.storage_conn = ErrorConnection() + + self.dispatcher.record_metering_data(self.ctx, msg) + + assert not self.dispatcher.storage_conn.called, \ + 'Should not have called the storage connection' + + def test_timestamp_conversion(self): + msg = {'counter_name': 'test', + 'resource_id': self.id(), + 'counter_volume': 1, + 'timestamp': '2012-07-02T13:53:40Z', + } + msg['message_signature'] = rpc.compute_signature( + msg, + cfg.CONF.publisher_rpc.metering_secret, + ) + + expected = {} + expected.update(msg) + expected['timestamp'] = datetime(2012, 7, 2, 13, 53, 40) + + self.dispatcher.storage_conn = self.mox.CreateMock(base.Connection) + self.dispatcher.storage_conn.record_metering_data(expected) + self.mox.ReplayAll() + + self.dispatcher.record_metering_data(self.ctx, msg) + + def test_timestamp_tzinfo_conversion(self): + msg = {'counter_name': 'test', + 'resource_id': self.id(), + 'counter_volume': 1, + 'timestamp': '2012-09-30T15:31:50.262-08:00', + } + msg['message_signature'] = rpc.compute_signature( + msg, + cfg.CONF.publisher_rpc.metering_secret, + ) + + expected = {} + expected.update(msg) + expected['timestamp'] = datetime(2012, 9, 30, 23, 31, 50, 262000) + + self.dispatcher.storage_conn = self.mox.CreateMock(base.Connection) + self.dispatcher.storage_conn.record_metering_data(expected) + self.mox.ReplayAll() + + self.dispatcher.record_metering_data(self.ctx, msg) diff --git a/tests/collector/dispatcher/test_file.py b/tests/collector/dispatcher/test_file.py new file mode 100644 index 00000000..7262ddcd --- /dev/null +++ b/tests/collector/dispatcher/test_file.py @@ -0,0 +1,105 @@ +# -*- encoding: utf-8 -*- +# +# Copyright © 2013 IBM Corp +# +# Author: Tong Li +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +"""Tests for ceilometer/collector/dispatcher/file.py +""" + +import os +import tempfile +import logging.handlers +from oslo.config import cfg + +from ceilometer.collector.dispatcher import file +from ceilometer.publisher import rpc +from ceilometer.tests import base as tests_base + + +class TestDispatcherFile(tests_base.TestCase): + + def setUp(self): + super(TestDispatcherFile, self).setUp() + + def test_file_dispatcher_with_all_config(self): + # Create a temporaryFile to get a file name + tf = tempfile.NamedTemporaryFile('r') + filename = tf.name + tf.close() + + cfg.CONF.dispatcher_file.file_path = filename + cfg.CONF.dispatcher_file.max_bytes = 50 + cfg.CONF.dispatcher_file.backup_count = 5 + dispatcher = file.FileDispatcher(cfg.CONF) + + # The number of the handlers should be 1 + self.assertEqual(1, len(dispatcher.log.handlers)) + # The handler should be RotatingFileHandler + handler = dispatcher.log.handlers[0] + self.assertTrue(isinstance(handler, + logging.handlers.RotatingFileHandler)) + + msg = {'counter_name': 'test', + 'resource_id': self.id(), + 'counter_volume': 1, + } + msg['message_signature'] = rpc.compute_signature( + msg, + cfg.CONF.publisher_rpc.metering_secret, + ) + + # The record_metering_data method should exist and not produce errors. + dispatcher.record_metering_data(None, msg) + # After the method call above, the file should have been created. + self.assertTrue(os.path.exists(handler.baseFilename)) + + def test_file_dispatcher_with_path_only(self): + # Create a temporaryFile to get a file name + tf = tempfile.NamedTemporaryFile('r') + filename = tf.name + tf.close() + + cfg.CONF.dispatcher_file.file_path = filename + cfg.CONF.dispatcher_file.max_bytes = None + cfg.CONF.dispatcher_file.backup_count = None + dispatcher = file.FileDispatcher(cfg.CONF) + + # The number of the handlers should be 1 + self.assertEqual(1, len(dispatcher.log.handlers)) + # The handler should be RotatingFileHandler + handler = dispatcher.log.handlers[0] + self.assertTrue(isinstance(handler, + logging.FileHandler)) + + msg = {'counter_name': 'test', + 'resource_id': self.id(), + 'counter_volume': 1, + } + msg['message_signature'] = rpc.compute_signature( + msg, + cfg.CONF.publisher_rpc.metering_secret, + ) + + # The record_metering_data method should exist and not produce errors. + dispatcher.record_metering_data(None, msg) + # After the method call above, the file should have been created. + self.assertTrue(os.path.exists(handler.baseFilename)) + + def test_file_dispatcher_with_no_path(self): + cfg.CONF.dispatcher_file.file_path = None + dispatcher = file.FileDispatcher(cfg.CONF) + + # The log should be None + self.assertIsNone(dispatcher.log) diff --git a/tests/collector/test_service.py b/tests/collector/test_service.py index 8b1b001a..9882fe47 100644 --- a/tests/collector/test_service.py +++ b/tests/collector/test_service.py @@ -30,7 +30,6 @@ from stevedore.tests import manager as test_manager from ceilometer import counter from ceilometer.openstack.common import timeutils -from ceilometer.publisher import rpc from ceilometer.collector import service from ceilometer.storage import base from ceilometer.tests import base as tests_base @@ -184,87 +183,6 @@ class TestCollectorService(TestCollector): with patch('ceilometer.openstack.common.rpc.create_connection'): self.srv.start() - def test_valid_message(self): - msg = {'counter_name': 'test', - 'resource_id': self.id(), - 'counter_volume': 1, - } - msg['message_signature'] = rpc.compute_signature( - msg, - cfg.CONF.publisher_rpc.metering_secret, - ) - - self.srv.storage_conn = self.mox.CreateMock(base.Connection) - self.srv.storage_conn.record_metering_data(msg) - self.mox.ReplayAll() - - self.srv.record_metering_data(self.ctx, msg) - self.mox.VerifyAll() - - def test_invalid_message(self): - msg = {'counter_name': 'test', - 'resource_id': self.id(), - 'counter_volume': 1, - } - msg['message_signature'] = 'invalid-signature' - - class ErrorConnection: - - called = False - - def record_metering_data(self, data): - self.called = True - - self.srv.storage_conn = ErrorConnection() - - self.srv.record_metering_data(self.ctx, msg) - - assert not self.srv.storage_conn.called, \ - 'Should not have called the storage connection' - - def test_timestamp_conversion(self): - msg = {'counter_name': 'test', - 'resource_id': self.id(), - 'counter_volume': 1, - 'timestamp': '2012-07-02T13:53:40Z', - } - msg['message_signature'] = rpc.compute_signature( - msg, - cfg.CONF.publisher_rpc.metering_secret, - ) - - expected = {} - expected.update(msg) - expected['timestamp'] = datetime.datetime(2012, 7, 2, 13, 53, 40) - - self.srv.storage_conn = self.mox.CreateMock(base.Connection) - self.srv.storage_conn.record_metering_data(expected) - self.mox.ReplayAll() - - self.srv.record_metering_data(self.ctx, msg) - - def test_timestamp_tzinfo_conversion(self): - msg = {'counter_name': 'test', - 'resource_id': self.id(), - 'counter_volume': 1, - 'timestamp': '2012-09-30T15:31:50.262-08:00', - } - msg['message_signature'] = rpc.compute_signature( - msg, - cfg.CONF.publisher_rpc.metering_secret, - ) - - expected = {} - expected.update(msg) - expected['timestamp'] = datetime.datetime(2012, 9, 30, - 23, 31, 50, 262000) - - self.srv.storage_conn = self.mox.CreateMock(base.Connection) - self.srv.storage_conn.record_metering_data(expected) - self.mox.ReplayAll() - - self.srv.record_metering_data(self.ctx, msg) - @patch('ceilometer.pipeline.setup_pipeline', MagicMock()) def test_process_notification(self): # If we try to create a real RPC connection, init_host() never