Remove store_events options
The current option is misnamed, as it does not enable any storing but just the processing of events from the notification agent to the collector. This means that even if you set event_dispatchers=panko and forget to set store_events=true, nothing will happen. This patch enable the event processing as soon as something is configured in the pipeline. Change-Id: I5a906684f6371b0548ac08cacc13aa238f780f78
This commit is contained in:
parent
5c05c047a6
commit
7f8f6cccd6
@ -52,8 +52,6 @@ cfg.CONF.import_opt('metering_topic', 'ceilometer.publisher.messaging',
|
|||||||
group='publisher_notifier')
|
group='publisher_notifier')
|
||||||
cfg.CONF.import_opt('event_topic', 'ceilometer.publisher.messaging',
|
cfg.CONF.import_opt('event_topic', 'ceilometer.publisher.messaging',
|
||||||
group='publisher_notifier')
|
group='publisher_notifier')
|
||||||
cfg.CONF.import_opt('store_events', 'ceilometer.notification',
|
|
||||||
group='notification')
|
|
||||||
|
|
||||||
|
|
||||||
LOG = log.getLogger(__name__)
|
LOG = log.getLogger(__name__)
|
||||||
@ -89,7 +87,7 @@ class CollectorService(cotyledon.Service):
|
|||||||
batch_timeout=cfg.CONF.collector.batch_timeout))
|
batch_timeout=cfg.CONF.collector.batch_timeout))
|
||||||
self.sample_listener.start()
|
self.sample_listener.start()
|
||||||
|
|
||||||
if cfg.CONF.notification.store_events and list(self.event_manager):
|
if list(self.event_manager):
|
||||||
event_target = oslo_messaging.Target(
|
event_target = oslo_messaging.Target(
|
||||||
topic=cfg.CONF.publisher_notifier.event_topic)
|
topic=cfg.CONF.publisher_notifier.event_topic)
|
||||||
self.event_listener = (
|
self.event_listener = (
|
||||||
|
@ -47,10 +47,6 @@ OPTS = [
|
|||||||
default=True,
|
default=True,
|
||||||
deprecated_group='collector',
|
deprecated_group='collector',
|
||||||
help='Acknowledge message when event persistence fails.'),
|
help='Acknowledge message when event persistence fails.'),
|
||||||
cfg.BoolOpt('store_events',
|
|
||||||
deprecated_group='collector',
|
|
||||||
default=False,
|
|
||||||
help='Save event details.'),
|
|
||||||
cfg.BoolOpt('disable_non_metric_meters',
|
cfg.BoolOpt('disable_non_metric_meters',
|
||||||
default=True,
|
default=True,
|
||||||
help='WARNING: Ceilometer historically offered the ability to '
|
help='WARNING: Ceilometer historically offered the ability to '
|
||||||
@ -133,18 +129,16 @@ class NotificationService(service_base.PipelineBasedService):
|
|||||||
return pipe_manager
|
return pipe_manager
|
||||||
|
|
||||||
def _get_event_pipeline_manager(self, transport):
|
def _get_event_pipeline_manager(self, transport):
|
||||||
|
if cfg.CONF.notification.workload_partitioning:
|
||||||
|
event_pipe_manager = pipeline.EventPipelineTransportManager()
|
||||||
|
for pipe in self.event_pipeline_manager.pipelines:
|
||||||
|
event_pipe_manager.add_transporter(
|
||||||
|
(pipe.source.support_event, ['event_type'],
|
||||||
|
self._get_notifiers(transport, pipe)))
|
||||||
|
else:
|
||||||
|
event_pipe_manager = self.event_pipeline_manager
|
||||||
|
|
||||||
if cfg.CONF.notification.store_events:
|
return event_pipe_manager
|
||||||
if cfg.CONF.notification.workload_partitioning:
|
|
||||||
event_pipe_manager = pipeline.EventPipelineTransportManager()
|
|
||||||
for pipe in self.event_pipeline_manager.pipelines:
|
|
||||||
event_pipe_manager.add_transporter(
|
|
||||||
(pipe.source.support_event, ['event_type'],
|
|
||||||
self._get_notifiers(transport, pipe)))
|
|
||||||
else:
|
|
||||||
event_pipe_manager = self.event_pipeline_manager
|
|
||||||
|
|
||||||
return event_pipe_manager
|
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
super(NotificationService, self).run()
|
super(NotificationService, self).run()
|
||||||
@ -161,8 +155,7 @@ class NotificationService(service_base.PipelineBasedService):
|
|||||||
|
|
||||||
self.pipeline_manager = pipeline.setup_pipeline()
|
self.pipeline_manager = pipeline.setup_pipeline()
|
||||||
|
|
||||||
if cfg.CONF.notification.store_events:
|
self.event_pipeline_manager = pipeline.setup_event_pipeline()
|
||||||
self.event_pipeline_manager = pipeline.setup_event_pipeline()
|
|
||||||
|
|
||||||
self.transport = messaging.get_transport()
|
self.transport = messaging.get_transport()
|
||||||
|
|
||||||
@ -232,9 +225,8 @@ class NotificationService(service_base.PipelineBasedService):
|
|||||||
ack_on_error = cfg.CONF.notification.ack_on_event_error
|
ack_on_error = cfg.CONF.notification.ack_on_event_error
|
||||||
|
|
||||||
endpoints = []
|
endpoints = []
|
||||||
if cfg.CONF.notification.store_events:
|
endpoints.append(
|
||||||
endpoints.append(
|
event_endpoint.EventsNotificationEndpoint(event_pipe_manager))
|
||||||
event_endpoint.EventsNotificationEndpoint(event_pipe_manager))
|
|
||||||
|
|
||||||
targets = []
|
targets = []
|
||||||
for ext in notification_manager:
|
for ext in notification_manager:
|
||||||
@ -274,9 +266,7 @@ class NotificationService(service_base.PipelineBasedService):
|
|||||||
self._configure_pipeline_listener()
|
self._configure_pipeline_listener()
|
||||||
|
|
||||||
def _configure_pipeline_listener(self):
|
def _configure_pipeline_listener(self):
|
||||||
ev_pipes = []
|
ev_pipes = self.event_pipeline_manager.pipelines
|
||||||
if cfg.CONF.notification.store_events:
|
|
||||||
ev_pipes = self.event_pipeline_manager.pipelines
|
|
||||||
pipelines = self.pipeline_manager.pipelines + ev_pipes
|
pipelines = self.pipeline_manager.pipelines + ev_pipes
|
||||||
transport = messaging.get_transport()
|
transport = messaging.get_transport()
|
||||||
partitioned = self.partition_coordinator.extract_my_subset(
|
partitioned = self.partition_coordinator.extract_my_subset(
|
||||||
|
@ -63,8 +63,6 @@ class ConfigFixture(fixture.GabbiFixture):
|
|||||||
self.conf([], project='ceilometer', validate_default_values=True)
|
self.conf([], project='ceilometer', validate_default_values=True)
|
||||||
opts.set_defaults(self.conf)
|
opts.set_defaults(self.conf)
|
||||||
conf.import_group('api', 'ceilometer.api.controllers.v2.root')
|
conf.import_group('api', 'ceilometer.api.controllers.v2.root')
|
||||||
conf.import_opt('store_events', 'ceilometer.notification',
|
|
||||||
group='notification')
|
|
||||||
|
|
||||||
content = ('{"default": ""}')
|
content = ('{"default": ""}')
|
||||||
if six.PY3:
|
if six.PY3:
|
||||||
@ -94,8 +92,6 @@ class ConfigFixture(fixture.GabbiFixture):
|
|||||||
conf.set_override('gnocchi_is_enabled', False, group='api')
|
conf.set_override('gnocchi_is_enabled', False, group='api')
|
||||||
conf.set_override('aodh_is_enabled', False, group='api')
|
conf.set_override('aodh_is_enabled', False, group='api')
|
||||||
|
|
||||||
conf.set_override('store_events', True, group='notification')
|
|
||||||
|
|
||||||
def stop_fixture(self):
|
def stop_fixture(self):
|
||||||
"""Reset the config and remove data."""
|
"""Reset the config and remove data."""
|
||||||
if self.conf:
|
if self.conf:
|
||||||
|
@ -233,5 +233,4 @@ class TestCollector(tests_base.BaseTestCase):
|
|||||||
|
|
||||||
@mock.patch.object(collector.CollectorService, 'start_udp', mock.Mock())
|
@mock.patch.object(collector.CollectorService, 'start_udp', mock.Mock())
|
||||||
def test_collector_event_requeue(self):
|
def test_collector_event_requeue(self):
|
||||||
self.CONF.set_override('store_events', True, group='notification')
|
|
||||||
self._test_collector_requeue('event_listener')
|
self._test_collector_requeue('event_listener')
|
||||||
|
@ -90,7 +90,6 @@ class TestNotification(tests_base.BaseTestCase):
|
|||||||
self.CONF = self.useFixture(fixture_config.Config()).conf
|
self.CONF = self.useFixture(fixture_config.Config()).conf
|
||||||
self.CONF.set_override("connection", "log://", group='database')
|
self.CONF.set_override("connection", "log://", group='database')
|
||||||
self.CONF.set_override("backend_url", None, group="coordination")
|
self.CONF.set_override("backend_url", None, group="coordination")
|
||||||
self.CONF.set_override("store_events", False, group="notification")
|
|
||||||
self.CONF.set_override("disable_non_metric_meters", False,
|
self.CONF.set_override("disable_non_metric_meters", False,
|
||||||
group="notification")
|
group="notification")
|
||||||
self.setup_messaging(self.CONF)
|
self.setup_messaging(self.CONF)
|
||||||
@ -111,6 +110,7 @@ class TestNotification(tests_base.BaseTestCase):
|
|||||||
@mock.patch('ceilometer.event.endpoint.EventsNotificationEndpoint')
|
@mock.patch('ceilometer.event.endpoint.EventsNotificationEndpoint')
|
||||||
def _do_process_notification_manager_start(self,
|
def _do_process_notification_manager_start(self,
|
||||||
fake_event_endpoint_class):
|
fake_event_endpoint_class):
|
||||||
|
self.CONF([], project='ceilometer', validate_default_values=True)
|
||||||
with mock.patch.object(self.srv,
|
with mock.patch.object(self.srv,
|
||||||
'_get_notifications_manager') as get_nm:
|
'_get_notifications_manager') as get_nm:
|
||||||
get_nm.side_effect = self.fake_get_notifications_manager
|
get_nm.side_effect = self.fake_get_notifications_manager
|
||||||
@ -134,18 +134,10 @@ class TestNotification(tests_base.BaseTestCase):
|
|||||||
'payload': TEST_NOTICE_PAYLOAD,
|
'payload': TEST_NOTICE_PAYLOAD,
|
||||||
'metadata': TEST_NOTICE_METADATA}])
|
'metadata': TEST_NOTICE_METADATA}])
|
||||||
|
|
||||||
self.assertEqual(1, len(self.srv.listeners[0].dispatcher.endpoints))
|
self.assertEqual(2, len(self.srv.listeners[0].dispatcher.endpoints))
|
||||||
self.assertTrue(self.srv.pipeline_manager.publisher.called)
|
|
||||||
|
|
||||||
def test_process_notification_no_events(self):
|
|
||||||
self._do_process_notification_manager_start()
|
|
||||||
self.assertEqual(1, len(self.srv.listeners[0].dispatcher.endpoints))
|
|
||||||
self.assertNotEqual(self.fake_event_endpoint,
|
|
||||||
self.srv.listeners[0].dispatcher.endpoints[0])
|
|
||||||
|
|
||||||
@mock.patch('ceilometer.pipeline.setup_event_pipeline', mock.MagicMock())
|
@mock.patch('ceilometer.pipeline.setup_event_pipeline', mock.MagicMock())
|
||||||
def test_process_notification_with_events(self):
|
def test_process_notification_with_events(self):
|
||||||
self.CONF.set_override("store_events", True, group="notification")
|
|
||||||
self._do_process_notification_manager_start()
|
self._do_process_notification_manager_start()
|
||||||
self.assertEqual(2, len(self.srv.listeners[0].dispatcher.endpoints))
|
self.assertEqual(2, len(self.srv.listeners[0].dispatcher.endpoints))
|
||||||
self.assertEqual(self.fake_event_endpoint,
|
self.assertEqual(self.fake_event_endpoint,
|
||||||
@ -161,15 +153,14 @@ class TestNotification(tests_base.BaseTestCase):
|
|||||||
[extension.Extension('test', None, None, plugin),
|
[extension.Extension('test', None, None, plugin),
|
||||||
extension.Extension('test', None, None, plugin)])
|
extension.Extension('test', None, None, plugin)])
|
||||||
|
|
||||||
|
self.CONF([], project='ceilometer', validate_default_values=True)
|
||||||
with mock.patch.object(self.srv,
|
with mock.patch.object(self.srv,
|
||||||
'_get_notifications_manager') as get_nm:
|
'_get_notifications_manager') as get_nm:
|
||||||
get_nm.side_effect = fake_get_notifications_manager_dup_targets
|
get_nm.side_effect = fake_get_notifications_manager_dup_targets
|
||||||
self.srv.run()
|
self.srv.run()
|
||||||
self.addCleanup(self.srv.terminate)
|
self.addCleanup(self.srv.terminate)
|
||||||
self.assertEqual(1, len(mock_listener.call_args_list))
|
self.assertEqual(2, len(mock_listener.call_args_list))
|
||||||
args, kwargs = mock_listener.call_args
|
args, kwargs = mock_listener.call_args
|
||||||
self.assertEqual(1, len(args[1]))
|
|
||||||
self.assertIsInstance(args[1][0], oslo_messaging.Target)
|
|
||||||
self.assertEqual(1, len(self.srv.listeners))
|
self.assertEqual(1, len(self.srv.listeners))
|
||||||
|
|
||||||
|
|
||||||
@ -228,7 +219,6 @@ class BaseRealNotification(tests_base.BaseTestCase):
|
|||||||
self.expected_samples = 2
|
self.expected_samples = 2
|
||||||
|
|
||||||
self.CONF.set_override("backend_url", None, group="coordination")
|
self.CONF.set_override("backend_url", None, group="coordination")
|
||||||
self.CONF.set_override("store_events", True, group="notification")
|
|
||||||
self.CONF.set_override("disable_non_metric_meters", False,
|
self.CONF.set_override("disable_non_metric_meters", False,
|
||||||
group="notification")
|
group="notification")
|
||||||
|
|
||||||
@ -307,8 +297,6 @@ class TestRealNotificationReloadablePipeline(BaseRealNotification):
|
|||||||
['compute.instance.create.start'])
|
['compute.instance.create.start'])
|
||||||
self.CONF.set_override("event_pipeline_cfg_file", ev_pipeline_cfg_file)
|
self.CONF.set_override("event_pipeline_cfg_file", ev_pipeline_cfg_file)
|
||||||
|
|
||||||
self.CONF.set_override("store_events", True, group="notification")
|
|
||||||
|
|
||||||
self.srv.run()
|
self.srv.run()
|
||||||
self.addCleanup(self.srv.terminate)
|
self.addCleanup(self.srv.terminate)
|
||||||
|
|
||||||
@ -516,7 +504,6 @@ class TestRealNotificationMultipleAgents(tests_base.BaseTestCase):
|
|||||||
pipeline_cfg_file = self.setup_pipeline(['instance', 'memory'])
|
pipeline_cfg_file = self.setup_pipeline(['instance', 'memory'])
|
||||||
self.CONF.set_override("pipeline_cfg_file", pipeline_cfg_file)
|
self.CONF.set_override("pipeline_cfg_file", pipeline_cfg_file)
|
||||||
self.CONF.set_override("backend_url", None, group="coordination")
|
self.CONF.set_override("backend_url", None, group="coordination")
|
||||||
self.CONF.set_override("store_events", False, group="notification")
|
|
||||||
self.CONF.set_override("disable_non_metric_meters", False,
|
self.CONF.set_override("disable_non_metric_meters", False,
|
||||||
group="notification")
|
group="notification")
|
||||||
self.CONF.set_override('workload_partitioning', True,
|
self.CONF.set_override('workload_partitioning', True,
|
||||||
|
@ -341,7 +341,7 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
|
|||||||
self.mgr.run()
|
self.mgr.run()
|
||||||
self.addCleanup(self.mgr.terminate)
|
self.addCleanup(self.mgr.terminate)
|
||||||
setup_polling.assert_called_once_with()
|
setup_polling.assert_called_once_with()
|
||||||
mpc.start.assert_called_once_with()
|
mpc.start.assert_called_with()
|
||||||
self.assertEqual(2, mpc.join_group.call_count)
|
self.assertEqual(2, mpc.join_group.call_count)
|
||||||
self.mgr.setup_polling_tasks.assert_called_once_with()
|
self.mgr.setup_polling_tasks.assert_called_once_with()
|
||||||
|
|
||||||
|
@ -15,7 +15,6 @@
|
|||||||
"""Tests for Ceilometer notify daemon."""
|
"""Tests for Ceilometer notify daemon."""
|
||||||
|
|
||||||
import mock
|
import mock
|
||||||
from oslo_config import cfg
|
|
||||||
from oslo_config import fixture as fixture_config
|
from oslo_config import fixture as fixture_config
|
||||||
import oslo_messaging
|
import oslo_messaging
|
||||||
from oslo_utils import fileutils
|
from oslo_utils import fileutils
|
||||||
@ -82,10 +81,6 @@ TEST_NOTICE_PAYLOAD = {
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
cfg.CONF.import_opt('store_events', 'ceilometer.notification',
|
|
||||||
group='notification')
|
|
||||||
|
|
||||||
|
|
||||||
class TestEventEndpoint(tests_base.BaseTestCase):
|
class TestEventEndpoint(tests_base.BaseTestCase):
|
||||||
|
|
||||||
def get_publisher(self, url, namespace=''):
|
def get_publisher(self, url, namespace=''):
|
||||||
@ -130,7 +125,6 @@ class TestEventEndpoint(tests_base.BaseTestCase):
|
|||||||
self.CONF = self.useFixture(fixture_config.Config()).conf
|
self.CONF = self.useFixture(fixture_config.Config()).conf
|
||||||
self.CONF([])
|
self.CONF([])
|
||||||
self.CONF.set_override("connection", "log://", group='database')
|
self.CONF.set_override("connection", "log://", group='database')
|
||||||
self.CONF.set_override("store_events", True, group="notification")
|
|
||||||
self.setup_messaging(self.CONF)
|
self.setup_messaging(self.CONF)
|
||||||
|
|
||||||
self.useFixture(mockpatch.PatchObject(publisher, 'get_publisher',
|
self.useFixture(mockpatch.PatchObject(publisher, 'get_publisher',
|
||||||
|
@ -36,7 +36,6 @@
|
|||||||
# CEILOMETER_PIPELINE_INTERVAL: Seconds between pipeline processing runs. Default 600.
|
# CEILOMETER_PIPELINE_INTERVAL: Seconds between pipeline processing runs. Default 600.
|
||||||
# CEILOMETER_BACKEND: Database backend (e.g. 'mysql', 'mongodb', 'es')
|
# CEILOMETER_BACKEND: Database backend (e.g. 'mysql', 'mongodb', 'es')
|
||||||
# CEILOMETER_COORDINATION_URL: URL for group membership service provided by tooz.
|
# CEILOMETER_COORDINATION_URL: URL for group membership service provided by tooz.
|
||||||
# CEILOMETER_EVENTS: Set to True to enable event collection
|
|
||||||
# CEILOMETER_EVENT_ALARM: Set to True to enable publisher for event alarming
|
# CEILOMETER_EVENT_ALARM: Set to True to enable publisher for event alarming
|
||||||
|
|
||||||
# Save trace setting
|
# Save trace setting
|
||||||
@ -251,9 +250,7 @@ function _ceilometer_configure_storage_backend {
|
|||||||
elif [ "$CEILOMETER_BACKEND" = 'gnocchi' ] ; then
|
elif [ "$CEILOMETER_BACKEND" = 'gnocchi' ] ; then
|
||||||
gnocchi_url=$(gnocchi_service_url)
|
gnocchi_url=$(gnocchi_service_url)
|
||||||
iniset $CEILOMETER_CONF DEFAULT meter_dispatchers gnocchi
|
iniset $CEILOMETER_CONF DEFAULT meter_dispatchers gnocchi
|
||||||
# FIXME(sileht): We shouldn't load event_dispatchers if store_event is False
|
|
||||||
iniset $CEILOMETER_CONF DEFAULT event_dispatchers ""
|
iniset $CEILOMETER_CONF DEFAULT event_dispatchers ""
|
||||||
iniset $CEILOMETER_CONF notification store_events False
|
|
||||||
# NOTE(gordc): set higher retry in case gnocchi is started after ceilometer on a slow machine
|
# NOTE(gordc): set higher retry in case gnocchi is started after ceilometer on a slow machine
|
||||||
iniset $CEILOMETER_CONF storage max_retries 20
|
iniset $CEILOMETER_CONF storage max_retries 20
|
||||||
# NOTE(gordc): set batching to better handle recording on a slow machine
|
# NOTE(gordc): set batching to better handle recording on a slow machine
|
||||||
@ -327,8 +324,6 @@ function configure_ceilometer {
|
|||||||
|
|
||||||
configure_auth_token_middleware $CEILOMETER_CONF ceilometer $CEILOMETER_AUTH_CACHE_DIR
|
configure_auth_token_middleware $CEILOMETER_CONF ceilometer $CEILOMETER_AUTH_CACHE_DIR
|
||||||
|
|
||||||
iniset $CEILOMETER_CONF notification store_events $CEILOMETER_EVENTS
|
|
||||||
|
|
||||||
# Configure storage
|
# Configure storage
|
||||||
if is_service_enabled ceilometer-collector ceilometer-api; then
|
if is_service_enabled ceilometer-collector ceilometer-api; then
|
||||||
_ceilometer_configure_storage_backend
|
_ceilometer_configure_storage_backend
|
||||||
|
@ -29,7 +29,6 @@ CEILOMETER_USE_MOD_WSGI=${CEILOMETER_USE_MOD_WSGI:-${ENABLE_HTTPD_MOD_WSGI_SERVI
|
|||||||
|
|
||||||
# To enable OSprofiler change value of this variable to "notifications,profiler"
|
# To enable OSprofiler change value of this variable to "notifications,profiler"
|
||||||
CEILOMETER_NOTIFICATION_TOPICS=${CEILOMETER_NOTIFICATION_TOPICS:-notifications}
|
CEILOMETER_NOTIFICATION_TOPICS=${CEILOMETER_NOTIFICATION_TOPICS:-notifications}
|
||||||
CEILOMETER_EVENTS=${CEILOMETER_EVENTS:-True}
|
|
||||||
|
|
||||||
CEILOMETER_COORDINATION_URL=${CEILOMETER_COORDINATION_URL:-redis://localhost:6379}
|
CEILOMETER_COORDINATION_URL=${CEILOMETER_COORDINATION_URL:-redis://localhost:6379}
|
||||||
CEILOMETER_PIPELINE_INTERVAL=${CEILOMETER_PIPELINE_INTERVAL:-}
|
CEILOMETER_PIPELINE_INTERVAL=${CEILOMETER_PIPELINE_INTERVAL:-}
|
||||||
|
@ -47,9 +47,6 @@ Gnocchi
|
|||||||
meter_dispatchers = gnocchi
|
meter_dispatchers = gnocchi
|
||||||
event_dispatchers =
|
event_dispatchers =
|
||||||
|
|
||||||
[notification]
|
|
||||||
store_events = False
|
|
||||||
|
|
||||||
[dispatcher_gnocchi]
|
[dispatcher_gnocchi]
|
||||||
filter_service_activity = False # Enable if using swift backend
|
filter_service_activity = False # Enable if using swift backend
|
||||||
filter_project = <project name associated with gnocchi user> # if using swift backend
|
filter_project = <project name associated with gnocchi user> # if using swift backend
|
||||||
|
Loading…
Reference in New Issue
Block a user