From 5b9e5d8e7a45badc4741aff241f887adcf2312ab Mon Sep 17 00:00:00 2001 From: Chris Dent Date: Mon, 6 Jul 2015 19:17:55 +0000 Subject: [PATCH] Pollsters now send notifications without doing transforms This makes the polling agents not use pipelines. Instead it simply sends notifications for the notification agent to pick up and transform if the pipeline.yaml says it should. Inside the AgentManager and the PollingTask the data representation is adjusted somewhat. Rather than making a single task for any given interval, we make a single task for any name in the "sources" list. This ought to mean (given that the interval is the same across various sources in the default config) that we will get some I/Ox interleaving. At the moment all samples gathered by one pollng task are sent as an individual notification. This is being done to minimize the apparent surface area of this change. The expected long term change is for single samples to be sent so as to increase granularity and I/O interleaving. The unit tests have been updated to reflect the new data representation. The agent tests are fairly strongly oriented towards testing that discovery and resource handling behave correctly. Some additions have been made to make sure that samples traverse a fake messaging bus as expected. Coverage of the ceilometer/agent/base has increased from 98 to 99%. Additional functional testing should be implemented when we have established the infrastructure for such things. Implements blueprint pollsters-no-transform DocImpact Change-Id: I25c22077e80509799713571dfd79c87fe21c8677 --- ceilometer/agent/base.py | 194 +++++++++-------- ceilometer/pipeline.py | 53 +++++ ceilometer/service_base.py | 7 +- ceilometer/tests/agent/agentbase.py | 284 ++++--------------------- ceilometer/tests/agent/test_manager.py | 133 ++++++++++-- 5 files changed, 325 insertions(+), 346 deletions(-) diff --git a/ceilometer/agent/base.py b/ceilometer/agent/base.py index bde2acd9..050c1d19 100644 --- a/ceilometer/agent/base.py +++ b/ceilometer/agent/base.py @@ -26,7 +26,7 @@ import random from oslo_config import cfg from oslo_context import context from oslo_log import log -import six +import oslo_messaging from six import moves from six.moves.urllib import parse as urlparse from stevedore import extension @@ -34,7 +34,9 @@ from stevedore import extension from ceilometer.agent import plugin_base from ceilometer import coordination from ceilometer.i18n import _, _LI -from ceilometer import pipeline as publish_pipeline +from ceilometer import messaging +from ceilometer import pipeline +from ceilometer.publisher import utils as publisher_utils from ceilometer import service_base from ceilometer import utils @@ -49,6 +51,8 @@ OPTS = [ ] cfg.CONF.register_opts(OPTS) +cfg.CONF.import_opt('telemetry_driver', 'ceilometer.publisher.messaging', + group='publisher_notifier') class PollsterListForbidden(Exception): @@ -68,9 +72,9 @@ class Resources(object): self.blacklist = [] self.last_dup = [] - def setup(self, pipeline): - self._resources = pipeline.resources - self._discovery = pipeline.discovery + def setup(self, source): + self._resources = source.resources + self._discovery = source.discovery def get(self, discovery_cache=None): source_discovery = (self.agent_manager.discover(self._discovery, @@ -91,7 +95,7 @@ class Resources(object): class PollingTask(object): - """Polling task for polling samples and inject into pipeline. + """Polling task for polling samples and notifying. A polling task can be invoked periodically or only once. """ @@ -103,92 +107,94 @@ class PollingTask(object): # with a common interval self.pollster_matches = collections.defaultdict(set) - # per-sink publisher contexts associated with each source - self.publishers = {} - # we relate the static resources and per-source discovery to # each combination of pollster and matching source resource_factory = lambda: Resources(agent_manager) self.resources = collections.defaultdict(resource_factory) - def add(self, pollster, pipeline): - if pipeline.source.name not in self.publishers: - publish_context = publish_pipeline.PublishContext( - self.manager.context) - self.publishers[pipeline.source.name] = publish_context - self.publishers[pipeline.source.name].add_pipelines([pipeline]) - self.pollster_matches[pipeline.source.name].add(pollster) - key = Resources.key(pipeline.source.name, pollster) - self.resources[key].setup(pipeline) + def add(self, pollster, source): + self.pollster_matches[source.name].add(pollster) + key = Resources.key(source.name, pollster) + self.resources[key].setup(source) - def poll_and_publish(self): - """Polling sample and publish into pipeline.""" + def poll_and_notify(self): + """Polling sample and notify.""" cache = {} discovery_cache = {} for source_name in self.pollster_matches: - with self.publishers[source_name] as publisher: - for pollster in self.pollster_matches[source_name]: - LOG.info(_("Polling pollster %(poll)s in the context of " - "%(src)s"), - dict(poll=pollster.name, src=source_name)) - key = Resources.key(source_name, pollster) - candidate_res = list( - self.resources[key].get(discovery_cache)) - if not candidate_res and pollster.obj.default_discovery: - candidate_res = self.manager.discover( - [pollster.obj.default_discovery], discovery_cache) + for pollster in self.pollster_matches[source_name]: + LOG.info(_("Polling pollster %(poll)s in the context of " + "%(src)s"), + dict(poll=pollster.name, src=source_name)) + key = Resources.key(source_name, pollster) + candidate_res = list( + self.resources[key].get(discovery_cache)) + if not candidate_res and pollster.obj.default_discovery: + candidate_res = self.manager.discover( + [pollster.obj.default_discovery], discovery_cache) - # Remove duplicated resources and black resources. Using - # set() requires well defined __hash__ for each resource. - # Since __eq__ is defined, 'not in' is safe here. - seen = [] - duplicated = [] - polling_resources = [] - black_res = self.resources[key].blacklist - for x in candidate_res: - if x not in seen: - seen.append(x) - if x not in black_res: - polling_resources.append(x) - else: - duplicated.append(x) + # Remove duplicated resources and black resources. Using + # set() requires well defined __hash__ for each resource. + # Since __eq__ is defined, 'not in' is safe here. + seen = [] + duplicated = [] + polling_resources = [] + black_res = self.resources[key].blacklist + for x in candidate_res: + if x not in seen: + seen.append(x) + if x not in black_res: + polling_resources.append(x) + else: + duplicated.append(x) - # Warn duplicated resources for the 1st time - if self.resources[key].last_dup != duplicated: - self.resources[key].last_dup = duplicated - LOG.warning(_( - 'Found following duplicated resoures for ' - '%(name)s in context of %(source)s:%(list)s. ' - 'Check pipeline configuration.') - % ({'name': pollster.name, - 'source': source_name, - 'list': duplicated - })) + # Warn duplicated resources for the 1st time + if self.resources[key].last_dup != duplicated: + self.resources[key].last_dup = duplicated + LOG.warning(_( + 'Found following duplicated resoures for ' + '%(name)s in context of %(source)s:%(list)s. ' + 'Check pipeline configuration.') + % ({'name': pollster.name, + 'source': source_name, + 'list': duplicated + })) - # If no resources, skip for this pollster - if not polling_resources: - LOG.info(_("Skip polling pollster %s, no resources" - " found"), pollster.name) - continue + # If no resources, skip for this pollster + if not polling_resources: + LOG.info(_("Skip polling pollster %s, no resources" + " found"), pollster.name) + continue - try: - samples = list(pollster.obj.get_samples( - manager=self.manager, - cache=cache, - resources=polling_resources - )) - publisher(samples) - except plugin_base.PollsterPermanentError as err: - LOG.error(_( - 'Prevent pollster %(name)s for ' - 'polling source %(source)s anymore!') - % ({'name': pollster.name, 'source': source_name})) - self.resources[key].blacklist.append(err.fail_res) - except Exception as err: - LOG.warning(_( - 'Continue after error from %(name)s: %(error)s') - % ({'name': pollster.name, 'error': err}), - exc_info=True) + try: + samples = pollster.obj.get_samples( + manager=self.manager, + cache=cache, + resources=polling_resources + ) + sample_messages = [] + for sample in samples: + sample_dict = ( + publisher_utils.meter_message_from_counter( + sample, cfg.CONF.publisher.telemetry_secret + )) + sample_messages.append(sample_dict) + self.manager.notifier.info( + self.manager.context.to_dict(), + 'telemetry.api', + sample_messages + ) + except plugin_base.PollsterPermanentError as err: + LOG.error(_( + 'Prevent pollster %(name)s for ' + 'polling source %(source)s anymore!') + % ({'name': pollster.name, 'source': source_name})) + self.resources[key].blacklist.append(err.fail_res) + except Exception as err: + LOG.warning(_( + 'Continue after error from %(name)s: %(error)s') + % ({'name': pollster.name, 'error': err}), + exc_info=True) class AgentManager(service_base.BaseService): @@ -230,6 +236,11 @@ class AgentManager(service_base.BaseService): self.group_prefix = ('%s-%s' % (namespace_prefix, group_prefix) if group_prefix else namespace_prefix) + self.notifier = oslo_messaging.Notifier( + messaging.get_transport(), + driver=cfg.CONF.publisher_notifier.telemetry_driver, + publisher_id="ceilometer.api") + @staticmethod def _extensions(category, agent_ns=None): namespace = ('ceilometer.%s.%s' % (category, agent_ns) if agent_ns @@ -261,7 +272,7 @@ class AgentManager(service_base.BaseService): # let each set of statically-defined resources have its own group static_resource_groups = set([ self.construct_group_id(utils.hash_of_set(p.resources)) - for p in self.pipeline_manager.pipelines + for p in self.polling_manager.sources if p.resources ]) self.groups.update(static_resource_groups) @@ -274,14 +285,18 @@ class AgentManager(service_base.BaseService): def setup_polling_tasks(self): polling_tasks = {} - for pipeline in self.pipeline_manager.pipelines: + for source in self.polling_manager.sources: + polling_task = None for pollster in self.extensions: - if pipeline.support_meter(pollster.name): - polling_task = polling_tasks.get(pipeline.get_interval()) + if source.support_meter(pollster.name): if not polling_task: polling_task = self.create_polling_task() - polling_tasks[pipeline.get_interval()] = polling_task - polling_task.add(pollster, pipeline) + polling_task.add(pollster, source) + if polling_task: + polling_tasks[source.name] = { + 'task': polling_task, + 'interval': source.get_interval() + } return polling_tasks @@ -299,7 +314,10 @@ class AgentManager(service_base.BaseService): 0, cfg.CONF.shuffle_time_before_polling_task) pollster_timers = [] - for interval, task in six.iteritems(self.setup_polling_tasks()): + data = self.setup_polling_tasks() + for name, polling_task in data.items(): + interval = polling_task['interval'] + task = polling_task['task'] delay_time = (interval + delay_polling_time if delay_start else delay_polling_time) pollster_timers.append(self.tg.add_timer(interval, @@ -312,7 +330,7 @@ class AgentManager(service_base.BaseService): return pollster_timers def start(self): - self.pipeline_manager = publish_pipeline.setup_pipeline() + self.polling_manager = pipeline.setup_polling() self.partition_coordinator.start() self.join_partitioning_groups() @@ -328,7 +346,7 @@ class AgentManager(service_base.BaseService): @staticmethod def interval_task(task): - task.poll_and_publish() + task.poll_and_notify() @staticmethod def _parse_discoverer(url): diff --git a/ceilometer/pipeline.py b/ceilometer/pipeline.py index 17f6e7db..6aff21a4 100644 --- a/ceilometer/pipeline.py +++ b/ceilometer/pipeline.py @@ -311,6 +311,9 @@ class SampleSource(Source): raise PipelineException("Discovery should be a list", cfg) self.check_source_filtering(self.meters, 'meters') + def get_interval(self): + return self.interval + # (yjiang5) To support meters like instance:m1.tiny, # which include variable part at the end starting with ':'. # Hope we will not add such meters in future. @@ -704,6 +707,35 @@ class PipelineManager(object): return PublishContext(context, self.pipelines) +class PollingManager(object): + """Polling Manager + + Polling manager sets up polling according to config file. + """ + + def __init__(self, cfg): + """Setup the polling according to config. + + The configuration is the sources half of the Pipeline Config. + """ + self.sources = [] + if not ('sources' in cfg and 'sinks' in cfg): + raise PipelineException("Both sources & sinks are required", + cfg) + LOG.info(_('detected decoupled pipeline config format')) + + unique_names = set() + for s in cfg.get('sources', []): + name = s.get('name') + if name in unique_names: + raise PipelineException("Duplicated source names: %s" % + name, self) + else: + unique_names.add(name) + self.sources.append(SampleSource(s)) + unique_names.clear() + + def _setup_pipeline_manager(cfg_file, transformer_manager, p_type=SAMPLE_TYPE): if not os.path.exists(cfg_file): cfg_file = cfg.CONF.find_file(cfg_file) @@ -723,6 +755,21 @@ def _setup_pipeline_manager(cfg_file, transformer_manager, p_type=SAMPLE_TYPE): ), p_type) +def _setup_polling_manager(cfg_file): + if not os.path.exists(cfg_file): + cfg_file = cfg.CONF.find_file(cfg_file) + + LOG.debug(_("Polling config file: %s"), cfg_file) + + with open(cfg_file) as fap: + data = fap.read() + + pipeline_cfg = yaml.safe_load(data) + LOG.info(_("Pipeline config: %s"), pipeline_cfg) + + return PollingManager(pipeline_cfg) + + def setup_event_pipeline(transformer_manager=None): """Setup event pipeline manager according to yaml config file.""" cfg_file = cfg.CONF.event_pipeline_cfg_file @@ -762,3 +809,9 @@ def get_pipeline_hash(p_type=SAMPLE_TYPE): file_hash = hashlib.md5(data).hexdigest() return file_hash + + +def setup_polling(): + """Setup polling manager according to yaml config file.""" + cfg_file = cfg.CONF.pipeline_cfg_file + return _setup_polling_manager(cfg_file) diff --git a/ceilometer/service_base.py b/ceilometer/service_base.py index 0da8cdf0..93d15d15 100644 --- a/ceilometer/service_base.py +++ b/ceilometer/service_base.py @@ -54,7 +54,12 @@ class BaseService(os_service.Service): LOG.info(_LI("Detected change in pipeline configuration.")) try: - self.pipeline_manager = pipeline.setup_pipeline() + # Pipeline in the notification agent. + if hasattr(self, 'pipeline_manager'): + self.pipeline_manager = pipeline.setup_pipeline() + # Polling in the polling agent. + elif hasattr(self, 'polling_manager'): + self.polling_manager = pipeline.setup_polling() LOG.debug(_("Pipeline has been refreshed. " "old hash: %(old)s, new hash: %(new)s") % ({'old': self.pipeline_hash, diff --git a/ceilometer/tests/agent/agentbase.py b/ceilometer/tests/agent/agentbase.py index f8840ecb..4fd2920f 100644 --- a/ceilometer/tests/agent/agentbase.py +++ b/ceilometer/tests/agent/agentbase.py @@ -24,18 +24,12 @@ import abc import copy import datetime -import shutil -import eventlet import mock from oslo_config import fixture as fixture_config -from oslo_service import service as os_service -from oslo_utils import fileutils -from oslo_utils import timeutils from oslotest import mockpatch import six from stevedore import extension -import yaml from ceilometer.agent import plugin_base from ceilometer import pipeline @@ -177,13 +171,8 @@ class BaseAgentManagerTestCase(base.BaseTestCase): class DiscoveryException(TestDiscoveryException): params = [] - def setup_pipeline(self): - self.transformer_manager = extension.ExtensionManager( - 'ceilometer.transformer', - ) - self.mgr.pipeline_manager = pipeline.PipelineManager( - self.pipeline_cfg, - self.transformer_manager) + def setup_polling(self): + self.mgr.polling_manager = pipeline.PollingManager(self.pipeline_cfg) def create_extension_list(self): return [extension.Extension('test', @@ -228,7 +217,7 @@ class BaseAgentManagerTestCase(base.BaseTestCase): def create_manager(self): """Return subclass specific manager.""" - @mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock()) + @mock.patch('ceilometer.pipeline.setup_polling', mock.MagicMock()) def setUp(self): super(BaseAgentManagerTestCase, self).setUp() self.mgr = self.create_manager() @@ -250,7 +239,7 @@ class BaseAgentManagerTestCase(base.BaseTestCase): 'transformers': [], 'publishers': ["test"]}] } - self.setup_pipeline() + self.setup_polling() self.CONF = self.useFixture(fixture_config.Config()).conf self.CONF.set_override( 'pipeline_cfg_file', @@ -286,21 +275,23 @@ class BaseAgentManagerTestCase(base.BaseTestCase): self.DiscoveryAnother.resources = [] super(BaseAgentManagerTestCase, self).tearDown() - @mock.patch('ceilometer.pipeline.setup_pipeline') - def test_start(self, setup_pipeline): + @mock.patch('ceilometer.pipeline.setup_polling') + def test_start(self, setup_polling): self.mgr.join_partitioning_groups = mock.MagicMock() self.mgr.setup_polling_tasks = mock.MagicMock() self.CONF.set_override('heartbeat', 1.0, group='coordination') self.mgr.start() - setup_pipeline.assert_called_once_with() + setup_polling.assert_called_once_with() self.mgr.partition_coordinator.start.assert_called_once_with() self.mgr.join_partitioning_groups.assert_called_once_with() self.mgr.setup_polling_tasks.assert_called_once_with() timer_call = mock.call(1.0, self.mgr.partition_coordinator.heartbeat) self.assertEqual([timer_call], self.mgr.tg.add_timer.call_args_list) + self.mgr.stop() + self.mgr.partition_coordinator.stop.assert_called_once_with() - @mock.patch('ceilometer.pipeline.setup_pipeline') - def test_start_with_pipeline_poller(self, setup_pipeline): + @mock.patch('ceilometer.pipeline.setup_polling') + def test_start_with_pipeline_poller(self, setup_polling): self.mgr.join_partitioning_groups = mock.MagicMock() self.mgr.setup_polling_tasks = mock.MagicMock() @@ -308,7 +299,7 @@ class BaseAgentManagerTestCase(base.BaseTestCase): self.CONF.set_override('refresh_pipeline_cfg', True) self.CONF.set_override('pipeline_polling_interval', 5) self.mgr.start() - setup_pipeline.assert_called_once_with() + setup_polling.assert_called_once_with() self.mgr.partition_coordinator.start.assert_called_once_with() self.mgr.join_partitioning_groups.assert_called_once_with() self.mgr.setup_polling_tasks.assert_called_once_with() @@ -317,84 +308,6 @@ class BaseAgentManagerTestCase(base.BaseTestCase): self.assertEqual([timer_call, pipeline_poller_call], self.mgr.tg.add_timer.call_args_list) - def test_start_with_reloadable_pipeline(self): - - def setup_pipeline_file(pipeline): - if six.PY3: - pipeline = pipeline.encode('utf-8') - - pipeline_cfg_file = fileutils.write_to_tempfile(content=pipeline, - prefix="pipeline", - suffix="yaml") - return pipeline_cfg_file - - self.CONF.set_override('heartbeat', 1.0, group='coordination') - self.CONF.set_override('refresh_pipeline_cfg', True) - self.CONF.set_override('pipeline_polling_interval', 2) - - pipeline = yaml.dump({ - 'sources': [{ - 'name': 'test_pipeline', - 'interval': 1, - 'meters': ['test'], - 'resources': ['test://'] if self.source_resources else [], - 'sinks': ['test_sink']}], - 'sinks': [{ - 'name': 'test_sink', - 'transformers': [], - 'publishers': ["test"]}] - }) - - pipeline_cfg_file = setup_pipeline_file(pipeline) - - self.CONF.set_override("pipeline_cfg_file", pipeline_cfg_file) - self.mgr.tg = os_service.threadgroup.ThreadGroup(1000) - self.mgr.start() - pub = self.mgr.pipeline_manager.pipelines[0].publishers[0] - self.expected_samples = 1 - start = timeutils.utcnow() - while timeutils.delta_seconds(start, timeutils.utcnow()) < 600: - if len(pub.samples) >= self.expected_samples: - break - eventlet.sleep(0) - - del pub.samples[0].resource_metadata['resources'] - self.assertEqual(self.Pollster.test_data, pub.samples[0]) - - # Flush publisher samples to test reloading - pub.samples = [] - # Modify the collection targets - pipeline = yaml.dump({ - 'sources': [{ - 'name': 'test_pipeline', - 'interval': 1, - 'meters': ['testanother'], - 'resources': ['test://'] if self.source_resources else [], - 'sinks': ['test_sink']}], - 'sinks': [{ - 'name': 'test_sink', - 'transformers': [], - 'publishers': ["test"]}] - }) - - updated_pipeline_cfg_file = setup_pipeline_file(pipeline) - # Move/re-name the updated pipeline file to the original pipeline - # file path as recorded in oslo config - shutil.move(updated_pipeline_cfg_file, pipeline_cfg_file) - # Random sleep to let the pipeline poller complete the reloading - eventlet.sleep(3) - - pub = self.mgr.pipeline_manager.pipelines[0].publishers[0] - self.expected_samples = 1 - start = timeutils.utcnow() - while timeutils.delta_seconds(start, timeutils.utcnow()) < 600: - if len(pub.samples) >= self.expected_samples: - break - eventlet.sleep(0) - - del pub.samples[0].resource_metadata['resources'] - self.assertEqual(self.PollsterAnother.test_data, pub.samples[0]) - def test_join_partitioning_groups(self): self.mgr.discovery_manager = self.create_discovery_manager() self.mgr.join_partitioning_groups() @@ -411,16 +324,11 @@ class BaseAgentManagerTestCase(base.BaseTestCase): def test_setup_polling_tasks(self): polling_tasks = self.mgr.setup_polling_tasks() self.assertEqual(1, len(polling_tasks)) - self.assertTrue(60 in polling_tasks.keys()) - per_task_resources = polling_tasks[60].resources + self.assertTrue('test_pipeline' in polling_tasks.keys()) + per_task_resources = polling_tasks['test_pipeline']['task'].resources self.assertEqual(1, len(per_task_resources)) self.assertEqual(set(self.pipeline_cfg['sources'][0]['resources']), set(per_task_resources['test_pipeline-test'].get({}))) - task = list(polling_tasks.values())[0] - self.mgr.interval_task(task) - pub = self.mgr.pipeline_manager.pipelines[0].publishers[0] - del pub.samples[0].resource_metadata['resources'] - self.assertEqual(self.Pollster.test_data, pub.samples[0]) def test_setup_polling_tasks_multiple_interval(self): self.pipeline_cfg['sources'].append({ @@ -430,11 +338,11 @@ class BaseAgentManagerTestCase(base.BaseTestCase): 'resources': ['test://'] if self.source_resources else [], 'sinks': ['test_sink'] }) - self.setup_pipeline() + self.setup_polling() polling_tasks = self.mgr.setup_polling_tasks() self.assertEqual(2, len(polling_tasks)) - self.assertTrue(60 in polling_tasks.keys()) - self.assertTrue(10 in polling_tasks.keys()) + self.assertTrue('test_pipeline' in polling_tasks.keys()) + self.assertTrue('test_pipeline_1' in polling_tasks.keys()) def test_setup_polling_tasks_mismatch_counter(self): self.pipeline_cfg['sources'].append({ @@ -446,58 +354,8 @@ class BaseAgentManagerTestCase(base.BaseTestCase): }) polling_tasks = self.mgr.setup_polling_tasks() self.assertEqual(1, len(polling_tasks)) - self.assertTrue(60 in polling_tasks.keys()) - - def test_setup_polling_task_same_interval(self): - self.pipeline_cfg['sources'].append({ - 'name': 'test_pipeline_1', - 'interval': 60, - 'meters': ['testanother'], - 'resources': ['testanother://'] if self.source_resources else [], - 'sinks': ['test_sink'] - }) - self.setup_pipeline() - polling_tasks = self.mgr.setup_polling_tasks() - self.assertEqual(1, len(polling_tasks)) - pollsters = polling_tasks.get(60).pollster_matches - self.assertEqual(2, len(pollsters)) - per_task_resources = polling_tasks[60].resources - self.assertEqual(2, len(per_task_resources)) - key = 'test_pipeline-test' - self.assertEqual(set(self.pipeline_cfg['sources'][0]['resources']), - set(per_task_resources[key].get({}))) - key = 'test_pipeline_1-testanother' - self.assertEqual(set(self.pipeline_cfg['sources'][1]['resources']), - set(per_task_resources[key].get({}))) - - def test_interval_exception_isolation(self): - self.pipeline_cfg = { - 'sources': [{ - 'name': 'test_pipeline_1', - 'interval': 10, - 'meters': ['testexceptionanother'], - 'resources': ['test://'] if self.source_resources else [], - 'sinks': ['test_sink']}, - {'name': 'test_pipeline_2', - 'interval': 10, - 'meters': ['testexception'], - 'resources': ['test://'] if self.source_resources else [], - 'sinks': ['test_sink']}], - 'sinks': [{ - 'name': 'test_sink', - 'transformers': [], - 'publishers': ["test"]}] - } - self.mgr.pipeline_manager = pipeline.PipelineManager( - self.pipeline_cfg, - self.transformer_manager) - - polling_tasks = self.mgr.setup_polling_tasks() - self.assertEqual(1, len(polling_tasks.keys())) - polling_tasks.get(10) - self.mgr.interval_task(polling_tasks.get(10)) - pub = self.mgr.pipeline_manager.pipelines[0].publishers[0] - self.assertEqual(0, len(pub.samples)) + self.assertTrue('test_pipeline' in polling_tasks.keys()) + self.assertFalse('test_pipeline_1' in polling_tasks.keys()) def test_agent_manager_start(self): mgr = self.create_manager() @@ -514,7 +372,7 @@ class BaseAgentManagerTestCase(base.BaseTestCase): 'meters': ['testanother'], 'sinks': ['test_sink'] }) - self.setup_pipeline() + self.setup_polling() def _verify_discovery_params(self, expected): self.assertEqual(expected, self.Discovery.params) @@ -536,9 +394,9 @@ class BaseAgentManagerTestCase(base.BaseTestCase): 'testdiscoverynonexistent', 'testdiscoveryexception'] self.pipeline_cfg['sources'][0]['resources'] = static_resources - self.setup_pipeline() + self.setup_polling() polling_tasks = self.mgr.setup_polling_tasks() - self.mgr.interval_task(polling_tasks.get(60)) + self.mgr.interval_task(polling_tasks['test_pipeline']['task']) if static_resources: self.assertEqual(set(static_resources + self.DiscoveryAnother.resources), @@ -578,9 +436,9 @@ class BaseAgentManagerTestCase(base.BaseTestCase): self.Discovery.resources = discovered_resources self.pipeline_cfg['sources'][0]['meters'].append('testanother') self.pipeline_cfg['sources'][0]['resources'] = [] - self.setup_pipeline() + self.setup_polling() polling_tasks = self.mgr.setup_polling_tasks() - self.mgr.interval_task(polling_tasks.get(60)) + self.mgr.interval_task(polling_tasks['test_pipeline']['task']) self.assertEqual(1, len(self.Discovery.params)) self.assertEqual(discovered_resources, self.Pollster.resources) self.assertEqual(discovered_resources, self.PollsterAnother.resources) @@ -596,9 +454,9 @@ class BaseAgentManagerTestCase(base.BaseTestCase): 'testdiscovery', 'testdiscoveryanother', 'testdiscoverynonexistent', 'testdiscoveryexception'] self.pipeline_cfg['sources'][0]['resources'] = static_resources - self.setup_pipeline() + self.setup_polling() polling_tasks = self.mgr.setup_polling_tasks() - self.mgr.interval_task(polling_tasks.get(60)) + self.mgr.interval_task(polling_tasks['test_pipeline']['task']) discovery = self.Discovery.resources + self.DiscoveryAnother.resources # compare resource lists modulo ordering self.assertEqual(set(static_resources + discovery), @@ -638,19 +496,16 @@ class BaseAgentManagerTestCase(base.BaseTestCase): 'discovery': ['testdiscoveryanother'], 'sinks': ['test_sink_new'] }) - self.pipeline_cfg['sinks'].append({ - 'name': "test_sink_new", - 'transformers': [], - 'publishers': ["new"], - }) self.mgr.discovery_manager = self.create_discovery_manager() self.Discovery.resources = ['discovered_1', 'discovered_2'] self.DiscoveryAnother.resources = ['discovered_3', 'discovered_4'] - self.setup_pipeline() + self.setup_polling() polling_tasks = self.mgr.setup_polling_tasks() - self.assertEqual(1, len(polling_tasks)) - self.assertTrue(60 in polling_tasks.keys()) - self.mgr.interval_task(polling_tasks.get(60)) + self.assertEqual(2, len(polling_tasks)) + self.assertTrue('another_pipeline' in polling_tasks.keys()) + self.assertTrue('test_pipeline' in polling_tasks.keys()) + self.mgr.interval_task(polling_tasks['another_pipeline']['task']) + self.mgr.interval_task(polling_tasks['test_pipeline']['task']) self.assertEqual([None], self.Discovery.params) self.assertEqual([None], self.DiscoveryAnother.params) self.assertEqual(2, len(self.Pollster.samples)) @@ -663,23 +518,6 @@ class BaseAgentManagerTestCase(base.BaseTestCase): self.assertEqual(test_resources, samples[1][1]) else: self.fail('unexpected sample resources %s' % samples) - all_resources = set(test_resources) - all_resources.update(another_resources) - expected_pipelines = {'test://': 'test_pipeline:test_sink', - 'another://': 'another_pipeline:test_sink_new'} - sunk_resources = [] - for pipe_line in self.mgr.pipeline_manager.pipelines: - self.assertEqual(1, len(pipe_line.publishers[0].samples)) - published = pipe_line.publishers[0].samples[0] - published_resources = published.resource_metadata['resources'] - self.assertEqual(3, len(published_resources)) - self.assertTrue(published_resources[0] in expected_pipelines) - self.assertEqual(expected_pipelines[published_resources[0]], - pipe_line.name) - for published_resource in published_resources: - self.assertTrue(published_resource in all_resources) - sunk_resources.extend(published_resources) - self.assertEqual(all_resources, set(sunk_resources)) def test_multiple_sources_different_discoverers(self): self.Discovery.resources = ['discovered_1', 'discovered_2'] @@ -702,11 +540,13 @@ class BaseAgentManagerTestCase(base.BaseTestCase): 'publishers': ['test://']}] self.pipeline_cfg = {'sources': sources, 'sinks': sinks} self.mgr.discovery_manager = self.create_discovery_manager() - self.setup_pipeline() + self.setup_polling() polling_tasks = self.mgr.setup_polling_tasks() - self.assertEqual(1, len(polling_tasks)) - self.assertTrue(60 in polling_tasks.keys()) - self.mgr.interval_task(polling_tasks.get(60)) + self.assertEqual(2, len(polling_tasks)) + self.assertTrue('test_source_1' in polling_tasks.keys()) + self.assertTrue('test_source_2' in polling_tasks.keys()) + self.mgr.interval_task(polling_tasks['test_source_1']['task']) + self.mgr.interval_task(polling_tasks['test_source_2']['task']) self.assertEqual(1, len(self.Pollster.samples)) self.assertEqual(['discovered_1', 'discovered_2'], self.Pollster.resources) @@ -729,11 +569,11 @@ class BaseAgentManagerTestCase(base.BaseTestCase): 'publishers': ['test://']}] self.pipeline_cfg = {'sources': sources, 'sinks': sinks} self.mgr.discovery_manager = self.create_discovery_manager() - self.setup_pipeline() + self.setup_polling() polling_tasks = self.mgr.setup_polling_tasks() self.assertEqual(1, len(polling_tasks)) - self.assertTrue(60 in polling_tasks.keys()) - self.mgr.interval_task(polling_tasks.get(60)) + self.assertTrue('test_source_1' in polling_tasks.keys()) + self.mgr.interval_task(polling_tasks['test_source_1']['task']) self.assertEqual(1, len(self.Pollster.samples)) self.assertEqual(['discovered_1', 'discovered_2'], self.Pollster.resources) @@ -745,9 +585,9 @@ class BaseAgentManagerTestCase(base.BaseTestCase): 'testdiscovery', 'testdiscoveryanother', 'testdiscoverynonexistent', 'testdiscoveryexception'] self.pipeline_cfg['sources'][0]['resources'] = [] - self.setup_pipeline() + self.setup_polling() polling_tasks = self.mgr.setup_polling_tasks() - self.mgr.interval_task(polling_tasks.get(60)) + self.mgr.interval_task(polling_tasks['test_pipeline']['task']) expected = [mock.call(self.mgr.construct_group_id(d.obj.group_id), d.obj.resources) for d in self.mgr.discovery_manager @@ -777,9 +617,10 @@ class BaseAgentManagerTestCase(base.BaseTestCase): 'resources': [], 'sinks': ['test_sink'] }) - self.setup_pipeline() + self.setup_polling() polling_tasks = self.mgr.setup_polling_tasks() - self.mgr.interval_task(polling_tasks.get(60)) + for meter_name in polling_tasks: + self.mgr.interval_task(polling_tasks[meter_name]['task']) # Only two groups need to be created, one for each pipeline, # even though counter test is used twice expected = [mock.call(self.mgr.construct_group_id( @@ -791,36 +632,3 @@ class BaseAgentManagerTestCase(base.BaseTestCase): len(p_coord.extract_my_subset.call_args_list)) for c in expected: self.assertIn(c, p_coord.extract_my_subset.call_args_list) - - def test_arithmetic_transformer(self): - self.pipeline_cfg['sources'][0]['meters'] = ['test', 'testanother'] - self.pipeline_cfg['sinks'][0]['transformers'] = [ - {'name': 'arithmetic', - 'parameters': { - 'target': {'name': 'test_sum', - 'unit': default_test_data.unit, - 'type': default_test_data.type, - 'expr': '$(test) * 10 + $(testanother)' - } - }} - ] - self.setup_pipeline() - self.mgr.setup_polling_tasks()[60].poll_and_publish() - samples = self.mgr.pipeline_manager.pipelines[0].publishers[0].samples - self.assertEqual(1, len(samples)) - self.assertEqual('test_sum', samples[0].name) - self.assertEqual(11, samples[0].volume) - - @mock.patch('ceilometer.agent.base.LOG') - @mock.patch('ceilometer.tests.agent.agentbase.TestPollster.get_samples') - def test_skip_polling_and_publish_with_no_resources( - self, get_samples, LOG): - self.pipeline_cfg['sources'][0]['resources'] = [] - self.setup_pipeline() - polling_task = list(self.mgr.setup_polling_tasks().values())[0] - pollster = list(polling_task.pollster_matches['test_pipeline'])[0] - - polling_task.poll_and_publish() - LOG.info.assert_called_with( - 'Skip polling pollster %s, no resources found', pollster.name) - self.assertEqual(0, get_samples._mock_call_count) diff --git a/ceilometer/tests/agent/test_manager.py b/ceilometer/tests/agent/test_manager.py index 2b2fcd29..effe5dbf 100644 --- a/ceilometer/tests/agent/test_manager.py +++ b/ceilometer/tests/agent/test_manager.py @@ -15,10 +15,18 @@ """Tests for ceilometer/central/manager.py """ +import shutil + +import eventlet import mock +from oslo_service import service as os_service +from oslo_utils import fileutils +from oslo_utils import timeutils from oslotest import base from oslotest import mockpatch +import six from stevedore import extension +import yaml from ceilometer.agent import base as agent_base from ceilometer.agent import manager @@ -33,7 +41,7 @@ class PollingException(Exception): class TestManager(base.BaseTestCase): - @mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock()) + @mock.patch('ceilometer.pipeline.setup_polling', mock.MagicMock()) def test_load_plugins(self): mgr = manager.AgentManager() self.assertIsNotNone(list(mgr.extensions)) @@ -176,7 +184,17 @@ class TestRunTasks(agentbase.BaseAgentManagerTestCase): def create_manager(): return manager.AgentManager() + def fake_notifier_sample(self, ctxt, event_type, payload): + for m in payload: + del m['message_signature'] + self.notified_samples.append(m) + def setUp(self): + self.notified_samples = [] + notifier = mock.Mock() + notifier.info.side_effect = self.fake_notifier_sample + self.useFixture(mockpatch.Patch('oslo_messaging.Notifier', + return_value=notifier)) self.source_resources = True super(TestRunTasks, self).setUp() self.useFixture(mockpatch.Patch( @@ -204,8 +222,7 @@ class TestRunTasks(agentbase.BaseAgentManagerTestCase): def test_get_sample_resources(self): polling_tasks = self.mgr.setup_polling_tasks() - task = list(polling_tasks.values())[0] - self.mgr.interval_task(task) + self.mgr.interval_task(polling_tasks['test_pipeline']['task']) self.assertTrue(self.Pollster.resources) def test_when_keystone_fail(self): @@ -225,18 +242,12 @@ class TestRunTasks(agentbase.BaseAgentManagerTestCase): 'transformers': [], 'publishers': ["test"]}] } - self.mgr.pipeline_manager = pipeline.PipelineManager( - self.pipeline_cfg, - self.transformer_manager) + self.mgr.polling_manager = pipeline.PollingManager(self.pipeline_cfg) polling_tasks = self.mgr.setup_polling_tasks() - task = list(polling_tasks.values())[0] + task = polling_tasks['test_keystone']['task'] self.mgr.interval_task(task) self.assertFalse(self.PollsterKeystone.samples) - - def test_interval_exception_isolation(self): - super(TestRunTasks, self).test_interval_exception_isolation() - self.assertEqual(1, len(self.PollsterException.samples)) - self.assertEqual(1, len(self.PollsterExceptionAnother.samples)) + self.assertFalse(self.notified_samples) @mock.patch('ceilometer.agent.base.LOG') def test_polling_exception(self, LOG): @@ -253,18 +264,102 @@ class TestRunTasks(agentbase.BaseAgentManagerTestCase): 'transformers': [], 'publishers': ["test"]}] } - self.mgr.pipeline_manager = pipeline.PipelineManager( - self.pipeline_cfg, - self.transformer_manager) - polling_task = list(self.mgr.setup_polling_tasks().values())[0] + self.mgr.polling_manager = pipeline.PollingManager(self.pipeline_cfg) + polling_task = self.mgr.setup_polling_tasks()[source_name]['task'] pollster = list(polling_task.pollster_matches[source_name])[0] - # 2 samples after 4 pollings, as pollster got disabled unpon exception + # 2 samples after 4 pollings, as pollster got disabled upon exception for x in range(0, 4): self.mgr.interval_task(polling_task) - pub = self.mgr.pipeline_manager.pipelines[0].publishers[0] - self.assertEqual(2, len(pub.samples)) + samples = self.notified_samples + self.assertEqual(2, len(samples)) LOG.error.assert_called_once_with(( 'Prevent pollster %(name)s for ' 'polling source %(source)s anymore!') % ({'name': pollster.name, 'source': source_name})) + + def test_start_with_reloadable_pipeline(self): + + def setup_pipeline_file(pipeline): + if six.PY3: + pipeline = pipeline.encode('utf-8') + + pipeline_cfg_file = fileutils.write_to_tempfile(content=pipeline, + prefix="pipeline", + suffix="yaml") + return pipeline_cfg_file + + self.CONF.set_override('heartbeat', 1.0, group='coordination') + self.CONF.set_override('refresh_pipeline_cfg', True) + self.CONF.set_override('pipeline_polling_interval', 2) + + pipeline = yaml.dump({ + 'sources': [{ + 'name': 'test_pipeline', + 'interval': 1, + 'meters': ['test'], + 'resources': ['test://'] if self.source_resources else [], + 'sinks': ['test_sink']}], + 'sinks': [{ + 'name': 'test_sink', + 'transformers': [], + 'publishers': ["test"]}] + }) + + pipeline_cfg_file = setup_pipeline_file(pipeline) + + self.CONF.set_override("pipeline_cfg_file", pipeline_cfg_file) + self.mgr.tg = os_service.threadgroup.ThreadGroup(1000) + self.mgr.start() + expected_samples = 1 + start = timeutils.utcnow() + while timeutils.delta_seconds(start, timeutils.utcnow()) < 600: + if len(self.notified_samples) >= expected_samples: + break + eventlet.sleep(0) + + # we only got the old name of meters + for sample in self.notified_samples: + self.assertEqual('test', sample['counter_name']) + self.assertEqual(1, sample['counter_volume']) + self.assertEqual('test_run_tasks', sample['resource_id']) + + # Modify the collection targets + pipeline = yaml.dump({ + 'sources': [{ + 'name': 'test_pipeline', + 'interval': 1, + 'meters': ['testanother'], + 'resources': ['test://'] if self.source_resources else [], + 'sinks': ['test_sink']}], + 'sinks': [{ + 'name': 'test_sink', + 'transformers': [], + 'publishers': ["test"]}] + }) + + updated_pipeline_cfg_file = setup_pipeline_file(pipeline) + + # Move/re-name the updated pipeline file to the original pipeline + # file path as recorded in oslo config + shutil.move(updated_pipeline_cfg_file, pipeline_cfg_file) + + # Random sleep to let the pipeline poller complete the reloading + eventlet.sleep(3) + + # Flush notified samples to test only new, nothing latent on + # fake message bus. + self.notified_samples = [] + + expected_samples = 1 + start = timeutils.utcnow() + while timeutils.delta_seconds(start, timeutils.utcnow()) < 600: + if len(self.notified_samples) >= expected_samples: + break + eventlet.sleep(0) + + # we only got the new name of meters + for sample in self.notified_samples: + self.assertEqual('testanother', sample['counter_name']) + self.assertEqual(1, sample['counter_volume']) + self.assertEqual('test_run_tasks', sample['resource_id'])