Merge "Configure collector to only record meter or event"
This commit is contained in:
commit
77ec74f3f3
@ -52,7 +52,7 @@ API_OPTS = [
|
||||
]
|
||||
|
||||
cfg.CONF.register_opts(API_OPTS, group='api')
|
||||
cfg.CONF.import_opt('dispatcher', 'ceilometer.dispatcher')
|
||||
cfg.CONF.import_opt('meter_dispatchers', 'ceilometer.dispatcher')
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
@ -110,8 +110,8 @@ class V2Controller(object):
|
||||
if cfg.CONF.api.gnocchi_is_enabled is not None:
|
||||
self._gnocchi_is_enabled = cfg.CONF.api.gnocchi_is_enabled
|
||||
|
||||
elif ("gnocchi" not in cfg.CONF.dispatcher
|
||||
or "database" in cfg.CONF.dispatcher):
|
||||
elif ("gnocchi" not in cfg.CONF.meter_dispatchers
|
||||
or "database" in cfg.CONF.meter_dispatchers):
|
||||
self._gnocchi_is_enabled = False
|
||||
else:
|
||||
try:
|
||||
|
@ -71,7 +71,8 @@ class CollectorService(os_service.Service):
|
||||
def start(self):
|
||||
"""Bind the UDP socket and handle incoming data."""
|
||||
# ensure dispatcher is configured before starting other services
|
||||
self.dispatcher_manager = dispatcher.load_dispatcher_manager()
|
||||
dispatcher_managers = dispatcher.load_dispatcher_manager()
|
||||
(self.meter_manager, self.event_manager) = dispatcher_managers
|
||||
self.rpc_server = None
|
||||
self.sample_listener = None
|
||||
self.event_listener = None
|
||||
@ -88,27 +89,28 @@ class CollectorService(os_service.Service):
|
||||
self.rpc_server = messaging.get_rpc_server(
|
||||
transport, cfg.CONF.publisher_rpc.metering_topic, self)
|
||||
|
||||
sample_target = oslo_messaging.Target(
|
||||
topic=cfg.CONF.publisher_notifier.metering_topic)
|
||||
self.sample_listener = messaging.get_notification_listener(
|
||||
transport, [sample_target],
|
||||
[SampleEndpoint(self.dispatcher_manager)],
|
||||
allow_requeue=(cfg.CONF.collector.
|
||||
requeue_sample_on_dispatcher_error))
|
||||
if list(self.meter_manager):
|
||||
sample_target = oslo_messaging.Target(
|
||||
topic=cfg.CONF.publisher_notifier.metering_topic)
|
||||
self.sample_listener = messaging.get_notification_listener(
|
||||
transport, [sample_target],
|
||||
[SampleEndpoint(self.meter_manager)],
|
||||
allow_requeue=(cfg.CONF.collector.
|
||||
requeue_sample_on_dispatcher_error))
|
||||
self.sample_listener.start()
|
||||
|
||||
if cfg.CONF.notification.store_events:
|
||||
if cfg.CONF.notification.store_events and list(self.event_manager):
|
||||
event_target = oslo_messaging.Target(
|
||||
topic=cfg.CONF.publisher_notifier.event_topic)
|
||||
self.event_listener = messaging.get_notification_listener(
|
||||
transport, [event_target],
|
||||
[EventEndpoint(self.dispatcher_manager)],
|
||||
[EventEndpoint(self.event_manager)],
|
||||
allow_requeue=(cfg.CONF.collector.
|
||||
requeue_event_on_dispatcher_error))
|
||||
self.event_listener.start()
|
||||
|
||||
if cfg.CONF.collector.enable_rpc:
|
||||
self.rpc_server.start()
|
||||
self.sample_listener.start()
|
||||
|
||||
if not cfg.CONF.collector.udp_address:
|
||||
# Add a dummy thread to have wait() working
|
||||
@ -135,8 +137,8 @@ class CollectorService(os_service.Service):
|
||||
else:
|
||||
try:
|
||||
LOG.debug("UDP: Storing %s", sample)
|
||||
self.dispatcher_manager.map_method('record_metering_data',
|
||||
sample)
|
||||
self.meter_manager.map_method('record_metering_data',
|
||||
sample)
|
||||
except Exception:
|
||||
LOG.exception(_("UDP: Unable to store meter"))
|
||||
|
||||
@ -156,7 +158,7 @@ class CollectorService(os_service.Service):
|
||||
When the notification messages are re-published through the
|
||||
RPC publisher, this method receives them for processing.
|
||||
"""
|
||||
self.dispatcher_manager.map_method('record_metering_data', data=data)
|
||||
self.meter_manager.map_method('record_metering_data', data=data)
|
||||
|
||||
|
||||
class CollectorEndpoint(object):
|
||||
|
@ -20,49 +20,62 @@ from oslo_log import log
|
||||
import six
|
||||
from stevedore import named
|
||||
|
||||
from ceilometer.i18n import _
|
||||
from ceilometer.i18n import _LW
|
||||
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
OPTS = [
|
||||
cfg.MultiStrOpt('dispatcher',
|
||||
deprecated_group="collector",
|
||||
cfg.MultiStrOpt('meter_dispatchers',
|
||||
deprecated_name='dispatcher',
|
||||
default=['database'],
|
||||
help='Dispatcher to process data.'),
|
||||
help='Dispatchers to process metering data.'),
|
||||
cfg.MultiStrOpt('event_dispatchers',
|
||||
default=['database'],
|
||||
deprecated_name='dispatcher',
|
||||
help='Dispatchers to process event data.'),
|
||||
]
|
||||
cfg.CONF.register_opts(OPTS)
|
||||
|
||||
|
||||
DISPATCHER_NAMESPACE = 'ceilometer.dispatcher'
|
||||
def _load_dispatcher_manager(dispatcher_type):
|
||||
namespace = 'ceilometer.dispatcher.%s' % dispatcher_type
|
||||
conf_name = '%s_dispatchers' % dispatcher_type
|
||||
|
||||
|
||||
def load_dispatcher_manager():
|
||||
LOG.debug('loading dispatchers from %s', DISPATCHER_NAMESPACE)
|
||||
LOG.debug('loading dispatchers from %s', namespace)
|
||||
# set propagate_map_exceptions to True to enable stevedore
|
||||
# to propagate exceptions.
|
||||
dispatcher_manager = named.NamedExtensionManager(
|
||||
namespace=DISPATCHER_NAMESPACE,
|
||||
names=cfg.CONF.dispatcher,
|
||||
namespace=namespace,
|
||||
names=getattr(cfg.CONF, conf_name),
|
||||
invoke_on_load=True,
|
||||
invoke_args=[cfg.CONF],
|
||||
propagate_map_exceptions=True)
|
||||
if not list(dispatcher_manager):
|
||||
LOG.warning(_('Failed to load any dispatchers for %s'),
|
||||
DISPATCHER_NAMESPACE)
|
||||
LOG.warning(_LW('Failed to load any dispatchers for %s'),
|
||||
namespace)
|
||||
return dispatcher_manager
|
||||
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class Base(object):
|
||||
def load_dispatcher_manager():
|
||||
return (_load_dispatcher_manager('meter'),
|
||||
_load_dispatcher_manager('event'))
|
||||
|
||||
|
||||
class Base(object):
|
||||
def __init__(self, conf):
|
||||
self.conf = conf
|
||||
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class MeterDispatcherBase(Base):
|
||||
@abc.abstractmethod
|
||||
def record_metering_data(self, data):
|
||||
"""Recording metering data interface."""
|
||||
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class EventDispatcherBase(Base):
|
||||
@abc.abstractmethod
|
||||
def record_events(self, events):
|
||||
"""Recording events interface."""
|
||||
|
@ -25,7 +25,8 @@ from ceilometer import storage
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
class DatabaseDispatcher(dispatcher.Base):
|
||||
class DatabaseDispatcher(dispatcher.MeterDispatcherBase,
|
||||
dispatcher.EventDispatcherBase):
|
||||
"""Dispatcher class for recording metering data into database.
|
||||
|
||||
The dispatcher class which records each meter into a database configured
|
||||
@ -35,8 +36,10 @@ class DatabaseDispatcher(dispatcher.Base):
|
||||
ceilometer.conf file
|
||||
|
||||
[DEFAULT]
|
||||
dispatcher = database
|
||||
meter_dispatchers = database
|
||||
event_dispatchers = database
|
||||
"""
|
||||
|
||||
def __init__(self, conf):
|
||||
super(DatabaseDispatcher, self).__init__(conf)
|
||||
|
||||
|
@ -35,12 +35,13 @@ OPTS = [
|
||||
cfg.CONF.register_opts(OPTS, group="dispatcher_file")
|
||||
|
||||
|
||||
class FileDispatcher(dispatcher.Base):
|
||||
class FileDispatcher(dispatcher.MeterDispatcherBase,
|
||||
dispatcher.EventDispatcherBase):
|
||||
"""Dispatcher class for recording metering data to a file.
|
||||
|
||||
The dispatcher class which logs each meter into a file configured in
|
||||
ceilometer configuration file. An example configuration may look like the
|
||||
following:
|
||||
The dispatcher class which logs each meter and/or event into a file
|
||||
configured in ceilometer configuration file. An example configuration may
|
||||
look like the following:
|
||||
|
||||
[dispatcher_file]
|
||||
file_path = /tmp/meters
|
||||
@ -49,7 +50,8 @@ class FileDispatcher(dispatcher.Base):
|
||||
ceilometer.conf file
|
||||
|
||||
[DEFAULT]
|
||||
dispatcher = file
|
||||
meter_dispatchers = file
|
||||
event_dispatchers = file
|
||||
"""
|
||||
|
||||
def __init__(self, conf):
|
||||
|
@ -124,7 +124,23 @@ class ResourcesDefinition(object):
|
||||
return metrics
|
||||
|
||||
|
||||
class GnocchiDispatcher(dispatcher.Base):
|
||||
class GnocchiDispatcher(dispatcher.MeterDispatcherBase):
|
||||
"""Dispatcher class for recording metering data into database.
|
||||
|
||||
The dispatcher class records each meter into the gnocchi service
|
||||
configured in ceilometer configuration file. An example configuration may
|
||||
look like the following:
|
||||
|
||||
[dispatcher_gnocchi]
|
||||
url = http://localhost:8041
|
||||
archive_policy = low
|
||||
|
||||
To enable this dispatcher, the following section needs to be present in
|
||||
ceilometer.conf file
|
||||
|
||||
[DEFAULT]
|
||||
meter_dispatchers = gnocchi
|
||||
"""
|
||||
def __init__(self, conf):
|
||||
super(GnocchiDispatcher, self).__init__(conf)
|
||||
self.conf = conf
|
||||
@ -279,7 +295,3 @@ class GnocchiDispatcher(dispatcher.Base):
|
||||
# NOTE(sileht): Just ignore the metric have been
|
||||
# created in the meantime.
|
||||
pass
|
||||
|
||||
@staticmethod
|
||||
def record_events(events):
|
||||
pass
|
||||
|
@ -50,14 +50,16 @@ http_dispatcher_opts = [
|
||||
cfg.CONF.register_opts(http_dispatcher_opts, group="dispatcher_http")
|
||||
|
||||
|
||||
class HttpDispatcher(dispatcher.Base):
|
||||
"""Dispatcher class for posting metering data into a http target.
|
||||
class HttpDispatcher(dispatcher.MeterDispatcherBase,
|
||||
dispatcher.EventDispatcherBase):
|
||||
"""Dispatcher class for posting metering/event data into a http target.
|
||||
|
||||
To enable this dispatcher, the following option needs to be present in
|
||||
ceilometer.conf file::
|
||||
|
||||
[DEFAULT]
|
||||
dispatcher = http
|
||||
meter_dispatchers = http
|
||||
event_dispatchers = http
|
||||
|
||||
Dispatcher specific options can be added as follows::
|
||||
|
||||
@ -67,6 +69,7 @@ class HttpDispatcher(dispatcher.Base):
|
||||
cadf_only = true
|
||||
timeout = 2
|
||||
"""
|
||||
|
||||
def __init__(self, conf):
|
||||
super(HttpDispatcher, self).__init__(conf)
|
||||
self.headers = {'Content-type': 'application/json'}
|
||||
|
@ -29,7 +29,7 @@ class TestAPIUpgradePath(v2.FunctionalTest):
|
||||
self.CONF.set_override('gnocchi_is_enabled', None, group='api')
|
||||
self.CONF.set_override('aodh_is_enabled', None, group='api')
|
||||
self.CONF.set_override('aodh_url', None, group='api')
|
||||
self.CONF.set_override('dispatcher', ['database'])
|
||||
self.CONF.set_override('meter_dispatchers', ['database'])
|
||||
self.ks = mock.Mock()
|
||||
self.ks.service_catalog.url_for.side_effect = self._url_for
|
||||
self.useFixture(mockpatch.Patch(
|
||||
@ -44,7 +44,7 @@ class TestAPIUpgradePath(v2.FunctionalTest):
|
||||
raise exceptions.EndpointNotFound()
|
||||
|
||||
def _do_test_gnocchi_enabled_without_database_backend(self):
|
||||
self.CONF.set_override('dispatcher', 'gnocchi')
|
||||
self.CONF.set_override('meter_dispatchers', 'gnocchi')
|
||||
for endpoint in ['meters', 'samples', 'resources']:
|
||||
response = self.app.get(self.PATH_PREFIX + '/' + endpoint,
|
||||
status=410)
|
||||
|
@ -102,7 +102,7 @@ class TestCollector(tests_base.BaseTestCase):
|
||||
], propagate_map_exceptions=True)
|
||||
self.useFixture(mockpatch.Patch(
|
||||
'ceilometer.dispatcher.load_dispatcher_manager',
|
||||
return_value=fake_dispatcher))
|
||||
return_value=(fake_dispatcher, fake_dispatcher)))
|
||||
return plugin
|
||||
|
||||
def _make_fake_socket(self, sample):
|
||||
@ -124,7 +124,8 @@ class TestCollector(tests_base.BaseTestCase):
|
||||
|
||||
def test_record_metering_data(self):
|
||||
mock_dispatcher = self._setup_fake_dispatcher()
|
||||
self.srv.dispatcher_manager = dispatcher.load_dispatcher_manager()
|
||||
dps = dispatcher.load_dispatcher_manager()
|
||||
(self.srv.meter_manager, self.srv.manager) = dps
|
||||
self.srv.record_metering_data(None, self.counter)
|
||||
mock_dispatcher.record_metering_data.assert_called_once_with(
|
||||
data=self.counter)
|
||||
@ -187,6 +188,7 @@ class TestCollector(tests_base.BaseTestCase):
|
||||
|
||||
def test_udp_receive_bad_decoding(self):
|
||||
self._setup_messaging(False)
|
||||
self._setup_fake_dispatcher()
|
||||
udp_socket = self._make_fake_socket(self.counter)
|
||||
with mock.patch('socket.socket', return_value=udp_socket):
|
||||
with mock.patch('msgpack.loads', self._raise_error):
|
||||
@ -199,6 +201,7 @@ class TestCollector(tests_base.BaseTestCase):
|
||||
def test_only_udp(self, udp_start, rpc_start):
|
||||
"""Check that only UDP is started if messaging transport is unset."""
|
||||
self._setup_messaging(False)
|
||||
self._setup_fake_dispatcher()
|
||||
udp_socket = self._make_fake_socket(self.counter)
|
||||
with mock.patch('socket.socket', return_value=udp_socket):
|
||||
self.srv.start()
|
||||
@ -211,6 +214,7 @@ class TestCollector(tests_base.BaseTestCase):
|
||||
"""Check that only RPC is started if udp_address is empty."""
|
||||
self.CONF.set_override('enable_rpc', True, group='collector')
|
||||
self.CONF.set_override('udp_address', '', group='collector')
|
||||
self._setup_fake_dispatcher()
|
||||
self.srv.start()
|
||||
# two calls because two servers (notification and rpc)
|
||||
self.assertEqual(2, rpc_start.call_count)
|
||||
@ -231,14 +235,13 @@ class TestCollector(tests_base.BaseTestCase):
|
||||
def test_collector_no_mock(self, mylog):
|
||||
self.CONF.set_override('enable_rpc', True, group='collector')
|
||||
self.CONF.set_override('udp_address', '', group='collector')
|
||||
self.srv.start()
|
||||
mylog.info.side_effect = lambda *args: self.srv.stop()
|
||||
self.srv.start()
|
||||
|
||||
client = messaging.get_rpc_client(self.transport, version='1.0')
|
||||
cclient = client.prepare(topic='metering')
|
||||
cclient.cast(context.RequestContext(),
|
||||
'record_metering_data', data=[self.utf8_msg])
|
||||
|
||||
self.srv.rpc_server.wait()
|
||||
mylog.info.assert_called_once_with(
|
||||
'metering data test for test_run_tasks: 1')
|
||||
|
52
ceilometer/tests/unit/dispatcher/test_dispatcher.py
Normal file
52
ceilometer/tests/unit/dispatcher/test_dispatcher.py
Normal file
@ -0,0 +1,52 @@
|
||||
# Copyright 2015 Intel Corp.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from oslo_config import fixture
|
||||
from oslotest import mockpatch
|
||||
|
||||
from ceilometer import dispatcher
|
||||
from ceilometer.tests import base
|
||||
|
||||
|
||||
class FakeDispatcherSample(dispatcher.MeterDispatcherBase):
|
||||
def record_metering_data(self, data):
|
||||
pass
|
||||
|
||||
|
||||
class FakeDispatcher(dispatcher.MeterDispatcherBase,
|
||||
dispatcher.EventDispatcherBase):
|
||||
def record_metering_data(self, data):
|
||||
pass
|
||||
|
||||
def record_events(self, events):
|
||||
pass
|
||||
|
||||
|
||||
class TestDispatchManager(base.BaseTestCase):
|
||||
def setUp(self):
|
||||
super(TestDispatchManager, self).setUp()
|
||||
self.conf = self.useFixture(fixture.Config())
|
||||
self.conf.config(meter_dispatchers=['database', 'gnocchi'],
|
||||
event_dispatchers=['database'])
|
||||
self.useFixture(mockpatch.Patch(
|
||||
'ceilometer.dispatcher.gnocchi.GnocchiDispatcher',
|
||||
new=FakeDispatcherSample))
|
||||
self.useFixture(mockpatch.Patch(
|
||||
'ceilometer.dispatcher.database.DatabaseDispatcher',
|
||||
new=FakeDispatcher))
|
||||
|
||||
def test_load(self):
|
||||
sample_mg, event_mg = dispatcher.load_dispatcher_manager()
|
||||
self.assertEqual(2, len(list(sample_mg)))
|
||||
self.assertEqual(1, len(list(event_mg)))
|
@ -281,12 +281,17 @@ console_scripts =
|
||||
ceilometer-alarm-evaluator = ceilometer.cmd.eventlet.alarm:evaluator
|
||||
ceilometer-alarm-notifier = ceilometer.cmd.eventlet.alarm:notifier
|
||||
|
||||
ceilometer.dispatcher =
|
||||
ceilometer.dispatcher.meter =
|
||||
database = ceilometer.dispatcher.database:DatabaseDispatcher
|
||||
file = ceilometer.dispatcher.file:FileDispatcher
|
||||
http = ceilometer.dispatcher.http:HttpDispatcher
|
||||
gnocchi = ceilometer.dispatcher.gnocchi:GnocchiDispatcher
|
||||
|
||||
ceilometer.dispatcher.event =
|
||||
database = ceilometer.dispatcher.database:DatabaseDispatcher
|
||||
file = ceilometer.dispatcher.file:FileDispatcher
|
||||
http = ceilometer.dispatcher.http:HttpDispatcher
|
||||
|
||||
network.statistics.drivers =
|
||||
opendaylight = ceilometer.network.statistics.opendaylight.driver:OpenDayLightDriver
|
||||
opencontrail = ceilometer.network.statistics.opencontrail.driver:OpencontrailDriver
|
||||
|
Loading…
Reference in New Issue
Block a user