From adaa57f062e13de47076f897411f2825ac3219ea Mon Sep 17 00:00:00 2001 From: Lianhao Lu Date: Thu, 20 Aug 2015 14:12:28 +0800 Subject: [PATCH] Configure collector to only record meter or event Added the configuration options to allow the administrator to configure the collector to only record meter or event by setting the meter_dispatchers and event_dispatchers configuration. Separated dispatcher managers load the dispatchers from different namespaces for meter/event respectively. Doc-Impact: separated meter/event dispatchers Change-Id: I161318f4503ce79205b54a93f2ae9b4e052f7971 Closes-Bug: #1480333 --- ceilometer/api/controllers/v2/root.py | 6 +-- ceilometer/collector.py | 30 ++++++----- ceilometer/dispatcher/__init__.py | 41 ++++++++++----- ceilometer/dispatcher/database.py | 7 ++- ceilometer/dispatcher/file.py | 12 +++-- ceilometer/dispatcher/gnocchi.py | 22 ++++++-- ceilometer/dispatcher/http.py | 9 ++-- .../functional/api/v2/test_api_upgrade.py | 4 +- ceilometer/tests/functional/test_collector.py | 11 ++-- .../tests/unit/dispatcher/test_dispatcher.py | 52 +++++++++++++++++++ setup.cfg | 7 ++- 11 files changed, 148 insertions(+), 53 deletions(-) create mode 100644 ceilometer/tests/unit/dispatcher/test_dispatcher.py diff --git a/ceilometer/api/controllers/v2/root.py b/ceilometer/api/controllers/v2/root.py index 5ebd70a3..d9964bce 100644 --- a/ceilometer/api/controllers/v2/root.py +++ b/ceilometer/api/controllers/v2/root.py @@ -51,7 +51,7 @@ API_OPTS = [ ] cfg.CONF.register_opts(API_OPTS, group='api') -cfg.CONF.import_opt('dispatcher', 'ceilometer.dispatcher') +cfg.CONF.import_opt('meter_dispatchers', 'ceilometer.dispatcher') LOG = log.getLogger(__name__) @@ -109,8 +109,8 @@ class V2Controller(object): if cfg.CONF.api.gnocchi_is_enabled is not None: self._gnocchi_is_enabled = cfg.CONF.api.gnocchi_is_enabled - elif ("gnocchi" not in cfg.CONF.dispatcher - or "database" in cfg.CONF.dispatcher): + elif ("gnocchi" not in cfg.CONF.meter_dispatchers + or "database" in cfg.CONF.meter_dispatchers): self._gnocchi_is_enabled = False else: try: diff --git a/ceilometer/collector.py b/ceilometer/collector.py index 489da810..532cbe87 100644 --- a/ceilometer/collector.py +++ b/ceilometer/collector.py @@ -72,7 +72,8 @@ class CollectorService(os_service.Service): def start(self): """Bind the UDP socket and handle incoming data.""" # ensure dispatcher is configured before starting other services - self.dispatcher_manager = dispatcher.load_dispatcher_manager() + dispatcher_managers = dispatcher.load_dispatcher_manager() + (self.meter_manager, self.event_manager) = dispatcher_managers self.rpc_server = None self.sample_listener = None self.event_listener = None @@ -89,27 +90,28 @@ class CollectorService(os_service.Service): self.rpc_server = messaging.get_rpc_server( transport, cfg.CONF.publisher_rpc.metering_topic, self) - sample_target = oslo_messaging.Target( - topic=cfg.CONF.publisher_notifier.metering_topic) - self.sample_listener = messaging.get_notification_listener( - transport, [sample_target], - [SampleEndpoint(self.dispatcher_manager)], - allow_requeue=(cfg.CONF.collector. - requeue_sample_on_dispatcher_error)) + if list(self.meter_manager): + sample_target = oslo_messaging.Target( + topic=cfg.CONF.publisher_notifier.metering_topic) + self.sample_listener = messaging.get_notification_listener( + transport, [sample_target], + [SampleEndpoint(self.meter_manager)], + allow_requeue=(cfg.CONF.collector. + requeue_sample_on_dispatcher_error)) + self.sample_listener.start() - if cfg.CONF.notification.store_events: + if cfg.CONF.notification.store_events and list(self.event_manager): event_target = oslo_messaging.Target( topic=cfg.CONF.publisher_notifier.event_topic) self.event_listener = messaging.get_notification_listener( transport, [event_target], - [EventEndpoint(self.dispatcher_manager)], + [EventEndpoint(self.event_manager)], allow_requeue=(cfg.CONF.collector. requeue_event_on_dispatcher_error)) self.event_listener.start() if cfg.CONF.collector.enable_rpc: self.rpc_server.start() - self.sample_listener.start() if not cfg.CONF.collector.udp_address: # Add a dummy thread to have wait() working @@ -136,8 +138,8 @@ class CollectorService(os_service.Service): else: try: LOG.debug("UDP: Storing %s", sample) - self.dispatcher_manager.map_method('record_metering_data', - sample) + self.meter_manager.map_method('record_metering_data', + sample) except Exception: LOG.exception(_("UDP: Unable to store meter")) @@ -157,7 +159,7 @@ class CollectorService(os_service.Service): When the notification messages are re-published through the RPC publisher, this method receives them for processing. """ - self.dispatcher_manager.map_method('record_metering_data', data=data) + self.meter_manager.map_method('record_metering_data', data=data) class CollectorEndpoint(object): diff --git a/ceilometer/dispatcher/__init__.py b/ceilometer/dispatcher/__init__.py index ac1be731..8e56688d 100644 --- a/ceilometer/dispatcher/__init__.py +++ b/ceilometer/dispatcher/__init__.py @@ -20,49 +20,62 @@ from oslo_log import log import six from stevedore import named -from ceilometer.i18n import _ +from ceilometer.i18n import _LW LOG = log.getLogger(__name__) OPTS = [ - cfg.MultiStrOpt('dispatcher', - deprecated_group="collector", + cfg.MultiStrOpt('meter_dispatchers', + deprecated_name='dispatcher', default=['database'], - help='Dispatcher to process data.'), + help='Dispatchers to process metering data.'), + cfg.MultiStrOpt('event_dispatchers', + default=['database'], + deprecated_name='dispatcher', + help='Dispatchers to process event data.'), ] cfg.CONF.register_opts(OPTS) -DISPATCHER_NAMESPACE = 'ceilometer.dispatcher' +def _load_dispatcher_manager(dispatcher_type): + namespace = 'ceilometer.dispatcher.%s' % dispatcher_type + conf_name = '%s_dispatchers' % dispatcher_type - -def load_dispatcher_manager(): - LOG.debug('loading dispatchers from %s', DISPATCHER_NAMESPACE) + LOG.debug('loading dispatchers from %s', namespace) # set propagate_map_exceptions to True to enable stevedore # to propagate exceptions. dispatcher_manager = named.NamedExtensionManager( - namespace=DISPATCHER_NAMESPACE, - names=cfg.CONF.dispatcher, + namespace=namespace, + names=getattr(cfg.CONF, conf_name), invoke_on_load=True, invoke_args=[cfg.CONF], propagate_map_exceptions=True) if not list(dispatcher_manager): - LOG.warning(_('Failed to load any dispatchers for %s'), - DISPATCHER_NAMESPACE) + LOG.warning(_LW('Failed to load any dispatchers for %s'), + namespace) return dispatcher_manager -@six.add_metaclass(abc.ABCMeta) -class Base(object): +def load_dispatcher_manager(): + return (_load_dispatcher_manager('meter'), + _load_dispatcher_manager('event')) + +class Base(object): def __init__(self, conf): self.conf = conf + +@six.add_metaclass(abc.ABCMeta) +class MeterDispatcherBase(Base): @abc.abstractmethod def record_metering_data(self, data): """Recording metering data interface.""" + +@six.add_metaclass(abc.ABCMeta) +class EventDispatcherBase(Base): @abc.abstractmethod def record_events(self, events): """Recording events interface.""" diff --git a/ceilometer/dispatcher/database.py b/ceilometer/dispatcher/database.py index 0e1b7361..620b8a0f 100644 --- a/ceilometer/dispatcher/database.py +++ b/ceilometer/dispatcher/database.py @@ -25,7 +25,8 @@ from ceilometer import storage LOG = log.getLogger(__name__) -class DatabaseDispatcher(dispatcher.Base): +class DatabaseDispatcher(dispatcher.MeterDispatcherBase, + dispatcher.EventDispatcherBase): """Dispatcher class for recording metering data into database. The dispatcher class which records each meter into a database configured @@ -35,8 +36,10 @@ class DatabaseDispatcher(dispatcher.Base): ceilometer.conf file [DEFAULT] - dispatcher = database + meter_dispatchers = database + event_dispatchers = database """ + def __init__(self, conf): super(DatabaseDispatcher, self).__init__(conf) diff --git a/ceilometer/dispatcher/file.py b/ceilometer/dispatcher/file.py index 5ec6e3c5..a4da54d0 100644 --- a/ceilometer/dispatcher/file.py +++ b/ceilometer/dispatcher/file.py @@ -35,12 +35,13 @@ OPTS = [ cfg.CONF.register_opts(OPTS, group="dispatcher_file") -class FileDispatcher(dispatcher.Base): +class FileDispatcher(dispatcher.MeterDispatcherBase, + dispatcher.EventDispatcherBase): """Dispatcher class for recording metering data to a file. - The dispatcher class which logs each meter into a file configured in - ceilometer configuration file. An example configuration may look like the - following: + The dispatcher class which logs each meter and/or event into a file + configured in ceilometer configuration file. An example configuration may + look like the following: [dispatcher_file] file_path = /tmp/meters @@ -49,7 +50,8 @@ class FileDispatcher(dispatcher.Base): ceilometer.conf file [DEFAULT] - dispatcher = file + meter_dispatchers = file + event_dispatchers = file """ def __init__(self, conf): diff --git a/ceilometer/dispatcher/gnocchi.py b/ceilometer/dispatcher/gnocchi.py index 047638bf..92ca492e 100644 --- a/ceilometer/dispatcher/gnocchi.py +++ b/ceilometer/dispatcher/gnocchi.py @@ -171,7 +171,23 @@ class ResourcesDefinition(object): return metrics -class GnocchiDispatcher(dispatcher.Base): +class GnocchiDispatcher(dispatcher.MeterDispatcherBase): + """Dispatcher class for recording metering data into database. + + The dispatcher class records each meter into the gnocchi service + configured in ceilometer configuration file. An example configuration may + look like the following: + + [dispatcher_gnocchi] + url = http://localhost:8041 + archive_policy = low + + To enable this dispatcher, the following section needs to be present in + ceilometer.conf file + + [DEFAULT] + meter_dispatchers = gnocchi + """ def __init__(self, conf): super(GnocchiDispatcher, self).__init__(conf) self.conf = conf @@ -347,7 +363,3 @@ class GnocchiDispatcher(dispatcher.Base): # NOTE(sileht): Just ignore the metric have been # created in the meantime. pass - - @staticmethod - def record_events(events): - pass diff --git a/ceilometer/dispatcher/http.py b/ceilometer/dispatcher/http.py index f26c358a..eef68a81 100644 --- a/ceilometer/dispatcher/http.py +++ b/ceilometer/dispatcher/http.py @@ -50,14 +50,16 @@ http_dispatcher_opts = [ cfg.CONF.register_opts(http_dispatcher_opts, group="dispatcher_http") -class HttpDispatcher(dispatcher.Base): - """Dispatcher class for posting metering data into a http target. +class HttpDispatcher(dispatcher.MeterDispatcherBase, + dispatcher.EventDispatcherBase): + """Dispatcher class for posting metering/event data into a http target. To enable this dispatcher, the following option needs to be present in ceilometer.conf file:: [DEFAULT] - dispatcher = http + meter_dispatchers = http + event_dispatchers = http Dispatcher specific options can be added as follows:: @@ -67,6 +69,7 @@ class HttpDispatcher(dispatcher.Base): cadf_only = true timeout = 2 """ + def __init__(self, conf): super(HttpDispatcher, self).__init__(conf) self.headers = {'Content-type': 'application/json'} diff --git a/ceilometer/tests/functional/api/v2/test_api_upgrade.py b/ceilometer/tests/functional/api/v2/test_api_upgrade.py index 047c989e..15abca75 100644 --- a/ceilometer/tests/functional/api/v2/test_api_upgrade.py +++ b/ceilometer/tests/functional/api/v2/test_api_upgrade.py @@ -29,7 +29,7 @@ class TestAPIUpgradePath(v2.FunctionalTest): self.CONF.set_override('gnocchi_is_enabled', None, group='api') self.CONF.set_override('aodh_is_enabled', None, group='api') self.CONF.set_override('aodh_url', None, group='api') - self.CONF.set_override('dispatcher', ['database']) + self.CONF.set_override('meter_dispatchers', ['database']) self.ks = mock.Mock() self.ks.service_catalog.url_for.side_effect = self._url_for self.useFixture(mockpatch.Patch( @@ -44,7 +44,7 @@ class TestAPIUpgradePath(v2.FunctionalTest): raise exceptions.EndpointNotFound() def _do_test_gnocchi_enabled_without_database_backend(self): - self.CONF.set_override('dispatcher', 'gnocchi') + self.CONF.set_override('meter_dispatchers', 'gnocchi') for endpoint in ['meters', 'samples', 'resources']: response = self.app.get(self.PATH_PREFIX + '/' + endpoint, status=410) diff --git a/ceilometer/tests/functional/test_collector.py b/ceilometer/tests/functional/test_collector.py index 9bc0661a..5e6bf585 100644 --- a/ceilometer/tests/functional/test_collector.py +++ b/ceilometer/tests/functional/test_collector.py @@ -102,7 +102,7 @@ class TestCollector(tests_base.BaseTestCase): ], propagate_map_exceptions=True) self.useFixture(mockpatch.Patch( 'ceilometer.dispatcher.load_dispatcher_manager', - return_value=fake_dispatcher)) + return_value=(fake_dispatcher, fake_dispatcher))) return plugin def _make_fake_socket(self, sample): @@ -124,7 +124,8 @@ class TestCollector(tests_base.BaseTestCase): def test_record_metering_data(self): mock_dispatcher = self._setup_fake_dispatcher() - self.srv.dispatcher_manager = dispatcher.load_dispatcher_manager() + dps = dispatcher.load_dispatcher_manager() + (self.srv.meter_manager, self.srv.manager) = dps self.srv.record_metering_data(None, self.counter) mock_dispatcher.record_metering_data.assert_called_once_with( data=self.counter) @@ -187,6 +188,7 @@ class TestCollector(tests_base.BaseTestCase): def test_udp_receive_bad_decoding(self): self._setup_messaging(False) + self._setup_fake_dispatcher() udp_socket = self._make_fake_socket(self.counter) with mock.patch('socket.socket', return_value=udp_socket): with mock.patch('msgpack.loads', self._raise_error): @@ -199,6 +201,7 @@ class TestCollector(tests_base.BaseTestCase): def test_only_udp(self, udp_start, rpc_start): """Check that only UDP is started if messaging transport is unset.""" self._setup_messaging(False) + self._setup_fake_dispatcher() udp_socket = self._make_fake_socket(self.counter) with mock.patch('socket.socket', return_value=udp_socket): self.srv.start() @@ -211,6 +214,7 @@ class TestCollector(tests_base.BaseTestCase): """Check that only RPC is started if udp_address is empty.""" self.CONF.set_override('enable_rpc', True, group='collector') self.CONF.set_override('udp_address', '', group='collector') + self._setup_fake_dispatcher() self.srv.start() # two calls because two servers (notification and rpc) self.assertEqual(2, rpc_start.call_count) @@ -231,14 +235,13 @@ class TestCollector(tests_base.BaseTestCase): def test_collector_no_mock(self, mylog): self.CONF.set_override('enable_rpc', True, group='collector') self.CONF.set_override('udp_address', '', group='collector') - self.srv.start() mylog.info.side_effect = lambda *args: self.srv.stop() + self.srv.start() client = messaging.get_rpc_client(self.transport, version='1.0') cclient = client.prepare(topic='metering') cclient.cast(context.RequestContext(), 'record_metering_data', data=[self.utf8_msg]) - self.srv.rpc_server.wait() mylog.info.assert_called_once_with( 'metering data test for test_run_tasks: 1') diff --git a/ceilometer/tests/unit/dispatcher/test_dispatcher.py b/ceilometer/tests/unit/dispatcher/test_dispatcher.py new file mode 100644 index 00000000..780c3128 --- /dev/null +++ b/ceilometer/tests/unit/dispatcher/test_dispatcher.py @@ -0,0 +1,52 @@ +# Copyright 2015 Intel Corp. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from oslo_config import fixture +from oslotest import mockpatch + +from ceilometer import dispatcher +from ceilometer.tests import base + + +class FakeDispatcherSample(dispatcher.MeterDispatcherBase): + def record_metering_data(self, data): + pass + + +class FakeDispatcher(dispatcher.MeterDispatcherBase, + dispatcher.EventDispatcherBase): + def record_metering_data(self, data): + pass + + def record_events(self, events): + pass + + +class TestDispatchManager(base.BaseTestCase): + def setUp(self): + super(TestDispatchManager, self).setUp() + self.conf = self.useFixture(fixture.Config()) + self.conf.config(meter_dispatchers=['database', 'gnocchi'], + event_dispatchers=['database']) + self.useFixture(mockpatch.Patch( + 'ceilometer.dispatcher.gnocchi.GnocchiDispatcher', + new=FakeDispatcherSample)) + self.useFixture(mockpatch.Patch( + 'ceilometer.dispatcher.database.DatabaseDispatcher', + new=FakeDispatcher)) + + def test_load(self): + sample_mg, event_mg = dispatcher.load_dispatcher_manager() + self.assertEqual(2, len(list(sample_mg))) + self.assertEqual(1, len(list(event_mg))) diff --git a/setup.cfg b/setup.cfg index ff4905b4..68ef0683 100644 --- a/setup.cfg +++ b/setup.cfg @@ -281,12 +281,17 @@ console_scripts = ceilometer-alarm-evaluator = ceilometer.cmd.eventlet.alarm:evaluator ceilometer-alarm-notifier = ceilometer.cmd.eventlet.alarm:notifier -ceilometer.dispatcher = +ceilometer.dispatcher.meter = database = ceilometer.dispatcher.database:DatabaseDispatcher file = ceilometer.dispatcher.file:FileDispatcher http = ceilometer.dispatcher.http:HttpDispatcher gnocchi = ceilometer.dispatcher.gnocchi:GnocchiDispatcher +ceilometer.dispatcher.event = + database = ceilometer.dispatcher.database:DatabaseDispatcher + file = ceilometer.dispatcher.file:FileDispatcher + http = ceilometer.dispatcher.http:HttpDispatcher + network.statistics.drivers = opendaylight = ceilometer.network.statistics.opendaylight.driver:OpenDayLightDriver opencontrail = ceilometer.network.statistics.opencontrail.driver:OpencontrailDriver