Configure collector to only record meter or event

Added the configuration options to allow the administrator to configure
the collector to only record meter or event by setting the
meter_dispatchers and event_dispatchers configuration.

Separated dispatcher managers load the dispatchers from different
namespaces for meter/event respectively.

Doc-Impact: separated meter/event dispatchers

Change-Id: I161318f4503ce79205b54a93f2ae9b4e052f7971
Closes-Bug: #1480333
This commit is contained in:
Lianhao Lu 2015-08-20 14:12:28 +08:00
parent 6a10d00a38
commit adaa57f062
11 changed files with 148 additions and 53 deletions

View File

@ -51,7 +51,7 @@ API_OPTS = [
] ]
cfg.CONF.register_opts(API_OPTS, group='api') cfg.CONF.register_opts(API_OPTS, group='api')
cfg.CONF.import_opt('dispatcher', 'ceilometer.dispatcher') cfg.CONF.import_opt('meter_dispatchers', 'ceilometer.dispatcher')
LOG = log.getLogger(__name__) LOG = log.getLogger(__name__)
@ -109,8 +109,8 @@ class V2Controller(object):
if cfg.CONF.api.gnocchi_is_enabled is not None: if cfg.CONF.api.gnocchi_is_enabled is not None:
self._gnocchi_is_enabled = cfg.CONF.api.gnocchi_is_enabled self._gnocchi_is_enabled = cfg.CONF.api.gnocchi_is_enabled
elif ("gnocchi" not in cfg.CONF.dispatcher elif ("gnocchi" not in cfg.CONF.meter_dispatchers
or "database" in cfg.CONF.dispatcher): or "database" in cfg.CONF.meter_dispatchers):
self._gnocchi_is_enabled = False self._gnocchi_is_enabled = False
else: else:
try: try:

View File

@ -72,7 +72,8 @@ class CollectorService(os_service.Service):
def start(self): def start(self):
"""Bind the UDP socket and handle incoming data.""" """Bind the UDP socket and handle incoming data."""
# ensure dispatcher is configured before starting other services # ensure dispatcher is configured before starting other services
self.dispatcher_manager = dispatcher.load_dispatcher_manager() dispatcher_managers = dispatcher.load_dispatcher_manager()
(self.meter_manager, self.event_manager) = dispatcher_managers
self.rpc_server = None self.rpc_server = None
self.sample_listener = None self.sample_listener = None
self.event_listener = None self.event_listener = None
@ -89,27 +90,28 @@ class CollectorService(os_service.Service):
self.rpc_server = messaging.get_rpc_server( self.rpc_server = messaging.get_rpc_server(
transport, cfg.CONF.publisher_rpc.metering_topic, self) transport, cfg.CONF.publisher_rpc.metering_topic, self)
sample_target = oslo_messaging.Target( if list(self.meter_manager):
topic=cfg.CONF.publisher_notifier.metering_topic) sample_target = oslo_messaging.Target(
self.sample_listener = messaging.get_notification_listener( topic=cfg.CONF.publisher_notifier.metering_topic)
transport, [sample_target], self.sample_listener = messaging.get_notification_listener(
[SampleEndpoint(self.dispatcher_manager)], transport, [sample_target],
allow_requeue=(cfg.CONF.collector. [SampleEndpoint(self.meter_manager)],
requeue_sample_on_dispatcher_error)) allow_requeue=(cfg.CONF.collector.
requeue_sample_on_dispatcher_error))
self.sample_listener.start()
if cfg.CONF.notification.store_events: if cfg.CONF.notification.store_events and list(self.event_manager):
event_target = oslo_messaging.Target( event_target = oslo_messaging.Target(
topic=cfg.CONF.publisher_notifier.event_topic) topic=cfg.CONF.publisher_notifier.event_topic)
self.event_listener = messaging.get_notification_listener( self.event_listener = messaging.get_notification_listener(
transport, [event_target], transport, [event_target],
[EventEndpoint(self.dispatcher_manager)], [EventEndpoint(self.event_manager)],
allow_requeue=(cfg.CONF.collector. allow_requeue=(cfg.CONF.collector.
requeue_event_on_dispatcher_error)) requeue_event_on_dispatcher_error))
self.event_listener.start() self.event_listener.start()
if cfg.CONF.collector.enable_rpc: if cfg.CONF.collector.enable_rpc:
self.rpc_server.start() self.rpc_server.start()
self.sample_listener.start()
if not cfg.CONF.collector.udp_address: if not cfg.CONF.collector.udp_address:
# Add a dummy thread to have wait() working # Add a dummy thread to have wait() working
@ -136,8 +138,8 @@ class CollectorService(os_service.Service):
else: else:
try: try:
LOG.debug("UDP: Storing %s", sample) LOG.debug("UDP: Storing %s", sample)
self.dispatcher_manager.map_method('record_metering_data', self.meter_manager.map_method('record_metering_data',
sample) sample)
except Exception: except Exception:
LOG.exception(_("UDP: Unable to store meter")) LOG.exception(_("UDP: Unable to store meter"))
@ -157,7 +159,7 @@ class CollectorService(os_service.Service):
When the notification messages are re-published through the When the notification messages are re-published through the
RPC publisher, this method receives them for processing. RPC publisher, this method receives them for processing.
""" """
self.dispatcher_manager.map_method('record_metering_data', data=data) self.meter_manager.map_method('record_metering_data', data=data)
class CollectorEndpoint(object): class CollectorEndpoint(object):

View File

@ -20,49 +20,62 @@ from oslo_log import log
import six import six
from stevedore import named from stevedore import named
from ceilometer.i18n import _ from ceilometer.i18n import _LW
LOG = log.getLogger(__name__) LOG = log.getLogger(__name__)
OPTS = [ OPTS = [
cfg.MultiStrOpt('dispatcher', cfg.MultiStrOpt('meter_dispatchers',
deprecated_group="collector", deprecated_name='dispatcher',
default=['database'], default=['database'],
help='Dispatcher to process data.'), help='Dispatchers to process metering data.'),
cfg.MultiStrOpt('event_dispatchers',
default=['database'],
deprecated_name='dispatcher',
help='Dispatchers to process event data.'),
] ]
cfg.CONF.register_opts(OPTS) cfg.CONF.register_opts(OPTS)
DISPATCHER_NAMESPACE = 'ceilometer.dispatcher' def _load_dispatcher_manager(dispatcher_type):
namespace = 'ceilometer.dispatcher.%s' % dispatcher_type
conf_name = '%s_dispatchers' % dispatcher_type
LOG.debug('loading dispatchers from %s', namespace)
def load_dispatcher_manager():
LOG.debug('loading dispatchers from %s', DISPATCHER_NAMESPACE)
# set propagate_map_exceptions to True to enable stevedore # set propagate_map_exceptions to True to enable stevedore
# to propagate exceptions. # to propagate exceptions.
dispatcher_manager = named.NamedExtensionManager( dispatcher_manager = named.NamedExtensionManager(
namespace=DISPATCHER_NAMESPACE, namespace=namespace,
names=cfg.CONF.dispatcher, names=getattr(cfg.CONF, conf_name),
invoke_on_load=True, invoke_on_load=True,
invoke_args=[cfg.CONF], invoke_args=[cfg.CONF],
propagate_map_exceptions=True) propagate_map_exceptions=True)
if not list(dispatcher_manager): if not list(dispatcher_manager):
LOG.warning(_('Failed to load any dispatchers for %s'), LOG.warning(_LW('Failed to load any dispatchers for %s'),
DISPATCHER_NAMESPACE) namespace)
return dispatcher_manager return dispatcher_manager
@six.add_metaclass(abc.ABCMeta) def load_dispatcher_manager():
class Base(object): return (_load_dispatcher_manager('meter'),
_load_dispatcher_manager('event'))
class Base(object):
def __init__(self, conf): def __init__(self, conf):
self.conf = conf self.conf = conf
@six.add_metaclass(abc.ABCMeta)
class MeterDispatcherBase(Base):
@abc.abstractmethod @abc.abstractmethod
def record_metering_data(self, data): def record_metering_data(self, data):
"""Recording metering data interface.""" """Recording metering data interface."""
@six.add_metaclass(abc.ABCMeta)
class EventDispatcherBase(Base):
@abc.abstractmethod @abc.abstractmethod
def record_events(self, events): def record_events(self, events):
"""Recording events interface.""" """Recording events interface."""

View File

@ -25,7 +25,8 @@ from ceilometer import storage
LOG = log.getLogger(__name__) LOG = log.getLogger(__name__)
class DatabaseDispatcher(dispatcher.Base): class DatabaseDispatcher(dispatcher.MeterDispatcherBase,
dispatcher.EventDispatcherBase):
"""Dispatcher class for recording metering data into database. """Dispatcher class for recording metering data into database.
The dispatcher class which records each meter into a database configured The dispatcher class which records each meter into a database configured
@ -35,8 +36,10 @@ class DatabaseDispatcher(dispatcher.Base):
ceilometer.conf file ceilometer.conf file
[DEFAULT] [DEFAULT]
dispatcher = database meter_dispatchers = database
event_dispatchers = database
""" """
def __init__(self, conf): def __init__(self, conf):
super(DatabaseDispatcher, self).__init__(conf) super(DatabaseDispatcher, self).__init__(conf)

View File

@ -35,12 +35,13 @@ OPTS = [
cfg.CONF.register_opts(OPTS, group="dispatcher_file") cfg.CONF.register_opts(OPTS, group="dispatcher_file")
class FileDispatcher(dispatcher.Base): class FileDispatcher(dispatcher.MeterDispatcherBase,
dispatcher.EventDispatcherBase):
"""Dispatcher class for recording metering data to a file. """Dispatcher class for recording metering data to a file.
The dispatcher class which logs each meter into a file configured in The dispatcher class which logs each meter and/or event into a file
ceilometer configuration file. An example configuration may look like the configured in ceilometer configuration file. An example configuration may
following: look like the following:
[dispatcher_file] [dispatcher_file]
file_path = /tmp/meters file_path = /tmp/meters
@ -49,7 +50,8 @@ class FileDispatcher(dispatcher.Base):
ceilometer.conf file ceilometer.conf file
[DEFAULT] [DEFAULT]
dispatcher = file meter_dispatchers = file
event_dispatchers = file
""" """
def __init__(self, conf): def __init__(self, conf):

View File

@ -171,7 +171,23 @@ class ResourcesDefinition(object):
return metrics return metrics
class GnocchiDispatcher(dispatcher.Base): class GnocchiDispatcher(dispatcher.MeterDispatcherBase):
"""Dispatcher class for recording metering data into database.
The dispatcher class records each meter into the gnocchi service
configured in ceilometer configuration file. An example configuration may
look like the following:
[dispatcher_gnocchi]
url = http://localhost:8041
archive_policy = low
To enable this dispatcher, the following section needs to be present in
ceilometer.conf file
[DEFAULT]
meter_dispatchers = gnocchi
"""
def __init__(self, conf): def __init__(self, conf):
super(GnocchiDispatcher, self).__init__(conf) super(GnocchiDispatcher, self).__init__(conf)
self.conf = conf self.conf = conf
@ -347,7 +363,3 @@ class GnocchiDispatcher(dispatcher.Base):
# NOTE(sileht): Just ignore the metric have been # NOTE(sileht): Just ignore the metric have been
# created in the meantime. # created in the meantime.
pass pass
@staticmethod
def record_events(events):
pass

View File

@ -50,14 +50,16 @@ http_dispatcher_opts = [
cfg.CONF.register_opts(http_dispatcher_opts, group="dispatcher_http") cfg.CONF.register_opts(http_dispatcher_opts, group="dispatcher_http")
class HttpDispatcher(dispatcher.Base): class HttpDispatcher(dispatcher.MeterDispatcherBase,
"""Dispatcher class for posting metering data into a http target. dispatcher.EventDispatcherBase):
"""Dispatcher class for posting metering/event data into a http target.
To enable this dispatcher, the following option needs to be present in To enable this dispatcher, the following option needs to be present in
ceilometer.conf file:: ceilometer.conf file::
[DEFAULT] [DEFAULT]
dispatcher = http meter_dispatchers = http
event_dispatchers = http
Dispatcher specific options can be added as follows:: Dispatcher specific options can be added as follows::
@ -67,6 +69,7 @@ class HttpDispatcher(dispatcher.Base):
cadf_only = true cadf_only = true
timeout = 2 timeout = 2
""" """
def __init__(self, conf): def __init__(self, conf):
super(HttpDispatcher, self).__init__(conf) super(HttpDispatcher, self).__init__(conf)
self.headers = {'Content-type': 'application/json'} self.headers = {'Content-type': 'application/json'}

View File

@ -29,7 +29,7 @@ class TestAPIUpgradePath(v2.FunctionalTest):
self.CONF.set_override('gnocchi_is_enabled', None, group='api') self.CONF.set_override('gnocchi_is_enabled', None, group='api')
self.CONF.set_override('aodh_is_enabled', None, group='api') self.CONF.set_override('aodh_is_enabled', None, group='api')
self.CONF.set_override('aodh_url', None, group='api') self.CONF.set_override('aodh_url', None, group='api')
self.CONF.set_override('dispatcher', ['database']) self.CONF.set_override('meter_dispatchers', ['database'])
self.ks = mock.Mock() self.ks = mock.Mock()
self.ks.service_catalog.url_for.side_effect = self._url_for self.ks.service_catalog.url_for.side_effect = self._url_for
self.useFixture(mockpatch.Patch( self.useFixture(mockpatch.Patch(
@ -44,7 +44,7 @@ class TestAPIUpgradePath(v2.FunctionalTest):
raise exceptions.EndpointNotFound() raise exceptions.EndpointNotFound()
def _do_test_gnocchi_enabled_without_database_backend(self): def _do_test_gnocchi_enabled_without_database_backend(self):
self.CONF.set_override('dispatcher', 'gnocchi') self.CONF.set_override('meter_dispatchers', 'gnocchi')
for endpoint in ['meters', 'samples', 'resources']: for endpoint in ['meters', 'samples', 'resources']:
response = self.app.get(self.PATH_PREFIX + '/' + endpoint, response = self.app.get(self.PATH_PREFIX + '/' + endpoint,
status=410) status=410)

View File

@ -102,7 +102,7 @@ class TestCollector(tests_base.BaseTestCase):
], propagate_map_exceptions=True) ], propagate_map_exceptions=True)
self.useFixture(mockpatch.Patch( self.useFixture(mockpatch.Patch(
'ceilometer.dispatcher.load_dispatcher_manager', 'ceilometer.dispatcher.load_dispatcher_manager',
return_value=fake_dispatcher)) return_value=(fake_dispatcher, fake_dispatcher)))
return plugin return plugin
def _make_fake_socket(self, sample): def _make_fake_socket(self, sample):
@ -124,7 +124,8 @@ class TestCollector(tests_base.BaseTestCase):
def test_record_metering_data(self): def test_record_metering_data(self):
mock_dispatcher = self._setup_fake_dispatcher() mock_dispatcher = self._setup_fake_dispatcher()
self.srv.dispatcher_manager = dispatcher.load_dispatcher_manager() dps = dispatcher.load_dispatcher_manager()
(self.srv.meter_manager, self.srv.manager) = dps
self.srv.record_metering_data(None, self.counter) self.srv.record_metering_data(None, self.counter)
mock_dispatcher.record_metering_data.assert_called_once_with( mock_dispatcher.record_metering_data.assert_called_once_with(
data=self.counter) data=self.counter)
@ -187,6 +188,7 @@ class TestCollector(tests_base.BaseTestCase):
def test_udp_receive_bad_decoding(self): def test_udp_receive_bad_decoding(self):
self._setup_messaging(False) self._setup_messaging(False)
self._setup_fake_dispatcher()
udp_socket = self._make_fake_socket(self.counter) udp_socket = self._make_fake_socket(self.counter)
with mock.patch('socket.socket', return_value=udp_socket): with mock.patch('socket.socket', return_value=udp_socket):
with mock.patch('msgpack.loads', self._raise_error): with mock.patch('msgpack.loads', self._raise_error):
@ -199,6 +201,7 @@ class TestCollector(tests_base.BaseTestCase):
def test_only_udp(self, udp_start, rpc_start): def test_only_udp(self, udp_start, rpc_start):
"""Check that only UDP is started if messaging transport is unset.""" """Check that only UDP is started if messaging transport is unset."""
self._setup_messaging(False) self._setup_messaging(False)
self._setup_fake_dispatcher()
udp_socket = self._make_fake_socket(self.counter) udp_socket = self._make_fake_socket(self.counter)
with mock.patch('socket.socket', return_value=udp_socket): with mock.patch('socket.socket', return_value=udp_socket):
self.srv.start() self.srv.start()
@ -211,6 +214,7 @@ class TestCollector(tests_base.BaseTestCase):
"""Check that only RPC is started if udp_address is empty.""" """Check that only RPC is started if udp_address is empty."""
self.CONF.set_override('enable_rpc', True, group='collector') self.CONF.set_override('enable_rpc', True, group='collector')
self.CONF.set_override('udp_address', '', group='collector') self.CONF.set_override('udp_address', '', group='collector')
self._setup_fake_dispatcher()
self.srv.start() self.srv.start()
# two calls because two servers (notification and rpc) # two calls because two servers (notification and rpc)
self.assertEqual(2, rpc_start.call_count) self.assertEqual(2, rpc_start.call_count)
@ -231,14 +235,13 @@ class TestCollector(tests_base.BaseTestCase):
def test_collector_no_mock(self, mylog): def test_collector_no_mock(self, mylog):
self.CONF.set_override('enable_rpc', True, group='collector') self.CONF.set_override('enable_rpc', True, group='collector')
self.CONF.set_override('udp_address', '', group='collector') self.CONF.set_override('udp_address', '', group='collector')
self.srv.start()
mylog.info.side_effect = lambda *args: self.srv.stop() mylog.info.side_effect = lambda *args: self.srv.stop()
self.srv.start()
client = messaging.get_rpc_client(self.transport, version='1.0') client = messaging.get_rpc_client(self.transport, version='1.0')
cclient = client.prepare(topic='metering') cclient = client.prepare(topic='metering')
cclient.cast(context.RequestContext(), cclient.cast(context.RequestContext(),
'record_metering_data', data=[self.utf8_msg]) 'record_metering_data', data=[self.utf8_msg])
self.srv.rpc_server.wait() self.srv.rpc_server.wait()
mylog.info.assert_called_once_with( mylog.info.assert_called_once_with(
'metering data test for test_run_tasks: 1') 'metering data test for test_run_tasks: 1')

View File

@ -0,0 +1,52 @@
# Copyright 2015 Intel Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_config import fixture
from oslotest import mockpatch
from ceilometer import dispatcher
from ceilometer.tests import base
class FakeDispatcherSample(dispatcher.MeterDispatcherBase):
def record_metering_data(self, data):
pass
class FakeDispatcher(dispatcher.MeterDispatcherBase,
dispatcher.EventDispatcherBase):
def record_metering_data(self, data):
pass
def record_events(self, events):
pass
class TestDispatchManager(base.BaseTestCase):
def setUp(self):
super(TestDispatchManager, self).setUp()
self.conf = self.useFixture(fixture.Config())
self.conf.config(meter_dispatchers=['database', 'gnocchi'],
event_dispatchers=['database'])
self.useFixture(mockpatch.Patch(
'ceilometer.dispatcher.gnocchi.GnocchiDispatcher',
new=FakeDispatcherSample))
self.useFixture(mockpatch.Patch(
'ceilometer.dispatcher.database.DatabaseDispatcher',
new=FakeDispatcher))
def test_load(self):
sample_mg, event_mg = dispatcher.load_dispatcher_manager()
self.assertEqual(2, len(list(sample_mg)))
self.assertEqual(1, len(list(event_mg)))

View File

@ -281,12 +281,17 @@ console_scripts =
ceilometer-alarm-evaluator = ceilometer.cmd.eventlet.alarm:evaluator ceilometer-alarm-evaluator = ceilometer.cmd.eventlet.alarm:evaluator
ceilometer-alarm-notifier = ceilometer.cmd.eventlet.alarm:notifier ceilometer-alarm-notifier = ceilometer.cmd.eventlet.alarm:notifier
ceilometer.dispatcher = ceilometer.dispatcher.meter =
database = ceilometer.dispatcher.database:DatabaseDispatcher database = ceilometer.dispatcher.database:DatabaseDispatcher
file = ceilometer.dispatcher.file:FileDispatcher file = ceilometer.dispatcher.file:FileDispatcher
http = ceilometer.dispatcher.http:HttpDispatcher http = ceilometer.dispatcher.http:HttpDispatcher
gnocchi = ceilometer.dispatcher.gnocchi:GnocchiDispatcher gnocchi = ceilometer.dispatcher.gnocchi:GnocchiDispatcher
ceilometer.dispatcher.event =
database = ceilometer.dispatcher.database:DatabaseDispatcher
file = ceilometer.dispatcher.file:FileDispatcher
http = ceilometer.dispatcher.http:HttpDispatcher
network.statistics.drivers = network.statistics.drivers =
opendaylight = ceilometer.network.statistics.opendaylight.driver:OpenDayLightDriver opendaylight = ceilometer.network.statistics.opendaylight.driver:OpenDayLightDriver
opencontrail = ceilometer.network.statistics.opencontrail.driver:OpencontrailDriver opencontrail = ceilometer.network.statistics.opencontrail.driver:OpencontrailDriver