From 342488506f0953a6f7e534730f9fd62e69ffcff1 Mon Sep 17 00:00:00 2001 From: Rohit Jaiswal Date: Fri, 31 Jul 2015 21:57:11 +0000 Subject: [PATCH] Adds support for dynamic event pipeline Similar to how sample pipeline can be refreshed on-the-fly, this patch enables dynamic event pipeline via configuration. Closes-Bug: 1480412 Change-Id: Ia9f46f789a8c1e6a2e3e92fffcb3b6b2db29f98f --- ceilometer/agent/manager.py | 19 ++-- ceilometer/notification.py | 16 +-- ceilometer/pipeline.py | 4 + ceilometer/service_base.py | 96 +++++++++++++++--- .../tests/functional/test_notification.py | 97 ++++++++++++++++--- 5 files changed, 188 insertions(+), 44 deletions(-) diff --git a/ceilometer/agent/manager.py b/ceilometer/agent/manager.py index 274760d2..338747bb 100644 --- a/ceilometer/agent/manager.py +++ b/ceilometer/agent/manager.py @@ -436,14 +436,15 @@ class AgentManager(service_base.BaseService): self.pollster_timers = [] def reload_pipeline(self): - LOG.info(_LI("Reconfiguring polling tasks.")) + if self.pipeline_validated: + LOG.info(_LI("Reconfiguring polling tasks.")) - # stop existing pollsters and leave partitioning groups - self.stop_pollsters() - for group in self.groups: - self.partition_coordinator.leave_group(group) + # stop existing pollsters and leave partitioning groups + self.stop_pollsters() + for group in self.groups: + self.partition_coordinator.leave_group(group) - # re-create partitioning groups according to pipeline - # and configure polling tasks with latest pipeline conf - self.join_partitioning_groups() - self.pollster_timers = self.configure_polling_tasks() + # re-create partitioning groups according to pipeline + # and configure polling tasks with latest pipeline conf + self.join_partitioning_groups() + self.pollster_timers = self.configure_polling_tasks() diff --git a/ceilometer/notification.py b/ceilometer/notification.py index 7887cabf..9baa5b40 100644 --- a/ceilometer/notification.py +++ b/ceilometer/notification.py @@ -127,8 +127,6 @@ class NotificationService(service_base.BaseService): def _get_event_pipeline_manager(self, transport): if cfg.CONF.notification.store_events: - self.event_pipeline_manager = pipeline.setup_event_pipeline() - if cfg.CONF.notification.workload_partitioning: event_pipe_manager = pipeline.EventPipelineTransportManager() for pipe in self.event_pipeline_manager.pipelines: @@ -144,6 +142,10 @@ class NotificationService(service_base.BaseService): super(NotificationService, self).start() self.pipeline_manager = pipeline.setup_pipeline() + + if cfg.CONF.notification.store_events: + self.event_pipeline_manager = pipeline.setup_event_pipeline() + self.transport = messaging.get_transport() if cfg.CONF.notification.workload_partitioning: @@ -272,11 +274,13 @@ class NotificationService(service_base.BaseService): def reload_pipeline(self): LOG.info(_LI("Reloading notification agent and listeners.")) - self.pipe_manager = self._get_pipe_manager( - self.transport, self.pipeline_manager) + if self.pipeline_validated: + self.pipe_manager = self._get_pipe_manager( + self.transport, self.pipeline_manager) - self.event_pipe_manager = self._get_event_pipeline_manager( - self.transport) + if self.event_pipeline_validated: + self.event_pipe_manager = self._get_event_pipeline_manager( + self.transport) # re-start the main queue listeners. utils.kill_listeners(self.listeners) diff --git a/ceilometer/pipeline.py b/ceilometer/pipeline.py index d82a3b7f..ad58e863 100644 --- a/ceilometer/pipeline.py +++ b/ceilometer/pipeline.py @@ -51,6 +51,10 @@ OPTS = [ default=False, help="Refresh Pipeline configuration on-the-fly." ), + cfg.BoolOpt('refresh_event_pipeline_cfg', + default=False, + help="Refresh Event Pipeline configuration on-the-fly." + ), cfg.IntOpt('pipeline_polling_interval', default=20, help="Polling interval for pipeline file configuration" diff --git a/ceilometer/service_base.py b/ceilometer/service_base.py index 9ba9ea12..ff40dac6 100644 --- a/ceilometer/service_base.py +++ b/ceilometer/service_base.py @@ -29,30 +29,76 @@ LOG = log.getLogger(__name__) @six.add_metaclass(abc.ABCMeta) class BaseService(os_service.Service): + def clear_pipeline_validation_status(self): + """Clears pipeline validation status flags.""" + self.pipeline_validated = False + self.event_pipeline_validated = False + def init_pipeline_refresh(self): + """Initializes pipeline refresh state.""" + + self.clear_pipeline_validation_status() if cfg.CONF.refresh_pipeline_cfg: + self.set_pipeline_mtime(pipeline.get_pipeline_mtime()) + self.set_pipeline_hash(pipeline.get_pipeline_hash()) - self.pipeline_mtime = pipeline.get_pipeline_mtime() - self.pipeline_hash = pipeline.get_pipeline_hash() + if cfg.CONF.refresh_event_pipeline_cfg: + self.set_pipeline_mtime(pipeline.get_pipeline_mtime( + pipeline.EVENT_TYPE), pipeline.EVENT_TYPE) + self.set_pipeline_hash(pipeline.get_pipeline_hash( + pipeline.EVENT_TYPE), pipeline.EVENT_TYPE) + if (cfg.CONF.refresh_pipeline_cfg or + cfg.CONF.refresh_event_pipeline_cfg): self.tg.add_timer(cfg.CONF.pipeline_polling_interval, self.refresh_pipeline) + def get_pipeline_mtime(self, p_type=pipeline.SAMPLE_TYPE): + return (self.event_pipeline_mtime if p_type == pipeline.EVENT_TYPE else + self.pipeline_mtime) + + def set_pipeline_mtime(self, mtime, p_type=pipeline.SAMPLE_TYPE): + if p_type == pipeline.EVENT_TYPE: + self.event_pipeline_mtime = mtime + else: + self.pipeline_mtime = mtime + + def get_pipeline_hash(self, p_type=pipeline.SAMPLE_TYPE): + return (self.event_pipeline_hash if p_type == pipeline.EVENT_TYPE else + self.pipeline_hash) + + def set_pipeline_hash(self, _hash, p_type=pipeline.SAMPLE_TYPE): + if p_type == pipeline.EVENT_TYPE: + self.event_pipeline_hash = _hash + else: + self.pipeline_hash = _hash + @abc.abstractmethod def reload_pipeline(self): """Reload pipeline in the agents.""" - def refresh_pipeline(self): - mtime = pipeline.get_pipeline_mtime() - if mtime > self.pipeline_mtime: + def pipeline_changed(self, p_type=pipeline.SAMPLE_TYPE): + """Returns hash of changed pipeline else False.""" + + pipeline_mtime = self.get_pipeline_mtime(p_type) + mtime = pipeline.get_pipeline_mtime(p_type) + if mtime > pipeline_mtime: LOG.info(_LI('Pipeline configuration file has been updated.')) - self.pipeline_mtime = mtime - _hash = pipeline.get_pipeline_hash() - - if _hash != self.pipeline_hash: + self.set_pipeline_mtime(mtime, p_type) + _hash = pipeline.get_pipeline_hash(p_type) + pipeline_hash = self.get_pipeline_hash(p_type) + if _hash != pipeline_hash: LOG.info(_LI("Detected change in pipeline configuration.")) + return _hash + return False + def refresh_pipeline(self): + """Refreshes appropriate pipeline, then delegates to agent.""" + + if cfg.CONF.refresh_pipeline_cfg: + pipeline_hash = self.pipeline_changed() + if pipeline_hash: try: # Pipeline in the notification agent. if hasattr(self, 'pipeline_manager'): @@ -63,13 +109,37 @@ class BaseService(os_service.Service): LOG.debug("Pipeline has been refreshed. " "old hash: %(old)s, new hash: %(new)s", {'old': self.pipeline_hash, - 'new': _hash}) + 'new': pipeline_hash}) + self.set_pipeline_hash(pipeline_hash) + self.pipeline_validated = True except Exception as err: LOG.debug("Active pipeline config's hash is %s", self.pipeline_hash) LOG.exception(_LE('Unable to load changed pipeline: %s') % err) - return - self.pipeline_hash = _hash - self.reload_pipeline() + if cfg.CONF.refresh_event_pipeline_cfg: + ev_pipeline_hash = self.pipeline_changed(pipeline.EVENT_TYPE) + if ev_pipeline_hash: + try: + # Pipeline in the notification agent. + if hasattr(self, 'event_pipeline_manager'): + self.event_pipeline_manager = (pipeline. + setup_event_pipeline()) + + LOG.debug("Event Pipeline has been refreshed. " + "old hash: %(old)s, new hash: %(new)s", + {'old': self.event_pipeline_hash, + 'new': ev_pipeline_hash}) + self.set_pipeline_hash(ev_pipeline_hash, + pipeline.EVENT_TYPE) + self.event_pipeline_validated = True + except Exception as err: + LOG.debug("Active event pipeline config's hash is %s", + self.event_pipeline_hash) + LOG.exception(_LE('Unable to load changed event pipeline:' + ' %s') % err) + + if self.pipeline_validated or self.event_pipeline_validated: + self.reload_pipeline() + self.clear_pipeline_validation_status() diff --git a/ceilometer/tests/functional/test_notification.py b/ceilometer/tests/functional/test_notification.py index 5aa9c32b..31b85ad7 100644 --- a/ceilometer/tests/functional/test_notification.py +++ b/ceilometer/tests/functional/test_notification.py @@ -196,6 +196,25 @@ class BaseRealNotification(tests_base.BaseTestCase): suffix="yaml") return pipeline_cfg_file + def setup_event_pipeline(self, event_names): + ev_pipeline = yaml.dump({ + 'sources': [{ + 'name': 'test_event', + 'events': event_names, + 'sinks': ['test_sink'] + }], + 'sinks': [{ + 'name': 'test_sink', + 'publishers': ['test://'] + }] + }) + if six.PY3: + ev_pipeline = ev_pipeline.encode('utf-8') + + ev_pipeline_cfg_file = fileutils.write_to_tempfile( + content=ev_pipeline, prefix="event_pipeline", suffix="yaml") + return ev_pipeline_cfg_file + def setUp(self): super(BaseRealNotification, self).setUp() self.CONF = self.useFixture(fixture_config.Config()).conf @@ -211,23 +230,13 @@ class BaseRealNotification(tests_base.BaseTestCase): self.CONF.set_override("store_events", True, group="notification") self.CONF.set_override("disable_non_metric_meters", False, group="notification") - ev_pipeline = yaml.dump({ - 'sources': [{ - 'name': 'test_event', - 'events': ['compute.instance.*'], - 'sinks': ['test_sink'] - }], - 'sinks': [{ - 'name': 'test_sink', - 'publishers': ['test://'] - }] - }) - if six.PY3: - ev_pipeline = ev_pipeline.encode('utf-8') + + ev_pipeline_cfg_file = self.setup_event_pipeline( + ['compute.instance.*']) self.expected_events = 1 - ev_pipeline_cfg_file = fileutils.write_to_tempfile( - content=ev_pipeline, prefix="event_pipeline", suffix="yaml") - self.CONF.set_override("event_pipeline_cfg_file", ev_pipeline_cfg_file) + + self.CONF.set_override("event_pipeline_cfg_file", + ev_pipeline_cfg_file) self.CONF.set_override( "definitions_cfg_file", self.path_get('etc/ceilometer/event_definitions.yaml'), @@ -261,6 +270,7 @@ class TestRealNotificationReloadablePipeline(BaseRealNotification): def setUp(self): super(TestRealNotificationReloadablePipeline, self).setUp() self.CONF.set_override('refresh_pipeline_cfg', True) + self.CONF.set_override('refresh_event_pipeline_cfg', True) self.CONF.set_override('pipeline_polling_interval', 1) self.srv = notification.NotificationService() @@ -328,6 +338,61 @@ class TestRealNotificationReloadablePipeline(BaseRealNotification): (self.assertIn(sample.name, ['disk.root.size', 'vcpus']) for sample in self.publisher.samples) + @mock.patch('ceilometer.publisher.test.TestPublisher') + def test_notification_reloaded_event_pipeline(self, fake_publisher_cls): + fake_publisher_cls.return_value = self.publisher + + ev_pipeline_cfg_file = self.setup_event_pipeline( + ['compute.instance.create.start']) + self.CONF.set_override("event_pipeline_cfg_file", ev_pipeline_cfg_file) + + self.CONF.set_override("store_events", True, group="notification") + self.expected_events = 1 + self.srv.start() + + notifier = messaging.get_notifier(self.transport, + "compute.vagrant-precise") + notifier.info(context.RequestContext(), + 'compute.instance.create.start', + TEST_NOTICE_PAYLOAD) + + start = timeutils.utcnow() + while timeutils.delta_seconds(start, timeutils.utcnow()) < 600: + if len(self.publisher.events) >= self.expected_events: + break + eventlet.sleep(0) + + self.assertEqual(self.expected_events, len(self.publisher.events)) + + # Flush publisher events to test reloading + self.publisher.events = [] + # Modify the collection targets + updated_ev_pipeline_cfg_file = self.setup_event_pipeline( + ['compute.instance.*']) + # Move/re-name the updated pipeline file to the original pipeline + # file path as recorded in oslo config + shutil.move(updated_ev_pipeline_cfg_file, ev_pipeline_cfg_file) + + self.expected_events = 1 + # Random sleep to let the pipeline poller complete the reloading + eventlet.sleep(3) + # Send message again to verify the reload works + notifier = messaging.get_notifier(self.transport, + "compute.vagrant-precise") + notifier.info(context.RequestContext(), 'compute.instance.create.end', + TEST_NOTICE_PAYLOAD) + + start = timeutils.utcnow() + while timeutils.delta_seconds(start, timeutils.utcnow()) < 600: + if len(self.publisher.events) >= self.expected_events: + break + eventlet.sleep(0) + + self.assertEqual(self.expected_events, len(self.publisher.events)) + + self.assertEqual(self.publisher.events[0].event_type, + 'compute.instance.create.end') + class TestRealNotification(BaseRealNotification):