Merge "add poll history to avoid duplicate samples"

This commit is contained in:
Jenkins 2015-08-20 20:10:08 +00:00 committed by Gerrit Code Review
commit c6710ff2b5
2 changed files with 48 additions and 23 deletions

View File

@ -132,11 +132,9 @@ class PollingTask(object):
"""Polling sample and notify."""
cache = {}
discovery_cache = {}
poll_history = {}
for source_name in self.pollster_matches:
for pollster in self.pollster_matches[source_name]:
LOG.info(_("Polling pollster %(poll)s in the context of "
"%(src)s"),
dict(poll=pollster.name, src=source_name))
key = Resources.key(source_name, pollster)
candidate_res = list(
self.resources[key].get(discovery_cache))
@ -147,36 +145,27 @@ class PollingTask(object):
# Remove duplicated resources and black resources. Using
# set() requires well defined __hash__ for each resource.
# Since __eq__ is defined, 'not in' is safe here.
seen = []
duplicated = []
polling_resources = []
black_res = self.resources[key].blacklist
history = poll_history.get(pollster.name, [])
for x in candidate_res:
if x not in seen:
seen.append(x)
if x not in history:
history.append(x)
if x not in black_res:
polling_resources.append(x)
else:
duplicated.append(x)
# Warn duplicated resources for the 1st time
if self.resources[key].last_dup != duplicated:
self.resources[key].last_dup = duplicated
LOG.warning(_(
'Found following duplicated resoures for '
'%(name)s in context of %(source)s:%(list)s. '
'Check pipeline configuration.')
% ({'name': pollster.name,
'source': source_name,
'list': duplicated
}))
poll_history[pollster.name] = history
# If no resources, skip for this pollster
if not polling_resources:
LOG.info(_("Skip polling pollster %s, no resources"
" found"), pollster.name)
p_context = 'new ' if history else ''
LOG.info(_("Skip pollster %(name)s, no %(p_context)s"
"resources found this cycle"),
{'name': pollster.name, 'p_context': p_context})
continue
LOG.info(_("Polling pollster %(poll)s in the context of "
"%(src)s"),
dict(poll=pollster.name, src=source_name))
try:
samples = pollster.obj.get_samples(
manager=self.manager,

View File

@ -678,3 +678,39 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
len(p_coord.extract_my_subset.call_args_list))
for c in expected:
self.assertIn(c, p_coord.extract_my_subset.call_args_list)
@mock.patch('ceilometer.agent.base.LOG')
def test_polling_and_notify_with_resources(self, LOG):
self.setup_polling()
polling_task = list(self.mgr.setup_polling_tasks().values())[0]
polling_task.poll_and_notify()
LOG.info.assert_called_with(
'Polling pollster %(poll)s in the context of %(src)s',
{'poll': 'test', 'src': 'test_pipeline'})
@mock.patch('ceilometer.agent.base.LOG')
def test_skip_polling_and_notify_with_no_resources(self, LOG):
self.pipeline_cfg['sources'][0]['resources'] = []
self.setup_polling()
polling_task = list(self.mgr.setup_polling_tasks().values())[0]
pollster = list(polling_task.pollster_matches['test_pipeline'])[0]
polling_task.poll_and_notify()
LOG.info.assert_called_with(
'Skip pollster %(name)s, no %(p_context)sresources found this '
'cycle', {'name': pollster.name, 'p_context': ''})
@mock.patch('ceilometer.agent.base.LOG')
def test_skip_polling_polled_resources(self, LOG):
self.pipeline_cfg['sources'].append({
'name': 'test_pipeline_1',
'interval': 60,
'meters': ['test'],
'resources': ['test://'],
'sinks': ['test_sink']
})
self.setup_polling()
polling_task = list(self.mgr.setup_polling_tasks().values())[0]
polling_task.poll_and_notify()
LOG.info.assert_called_with(
'Skip pollster %(name)s, no %(p_context)sresources found this '
'cycle', {'name': 'test', 'p_context': 'new '})