add poll history to avoid duplicate samples
this is a patch until we properly separate polling logic. this patch adds history so if a resource is defined in separate pipelines it will only be processed in one. by doing this, we send only a single copy of each sample and it can be processed appropriately by the notification agent this removes duplication check as it is covered by poll history and removes the pipeline references in polling. Change-Id: I9c1d3e25740fff5281d796c7bfea2f0e105bb5c5 Closes-Bug: #1480442
This commit is contained in:
@@ -132,11 +132,9 @@ class PollingTask(object):
|
||||
"""Polling sample and notify."""
|
||||
cache = {}
|
||||
discovery_cache = {}
|
||||
poll_history = {}
|
||||
for source_name in self.pollster_matches:
|
||||
for pollster in self.pollster_matches[source_name]:
|
||||
LOG.info(_("Polling pollster %(poll)s in the context of "
|
||||
"%(src)s"),
|
||||
dict(poll=pollster.name, src=source_name))
|
||||
key = Resources.key(source_name, pollster)
|
||||
candidate_res = list(
|
||||
self.resources[key].get(discovery_cache))
|
||||
@@ -147,36 +145,27 @@ class PollingTask(object):
|
||||
# Remove duplicated resources and black resources. Using
|
||||
# set() requires well defined __hash__ for each resource.
|
||||
# Since __eq__ is defined, 'not in' is safe here.
|
||||
seen = []
|
||||
duplicated = []
|
||||
polling_resources = []
|
||||
black_res = self.resources[key].blacklist
|
||||
history = poll_history.get(pollster.name, [])
|
||||
for x in candidate_res:
|
||||
if x not in seen:
|
||||
seen.append(x)
|
||||
if x not in history:
|
||||
history.append(x)
|
||||
if x not in black_res:
|
||||
polling_resources.append(x)
|
||||
else:
|
||||
duplicated.append(x)
|
||||
|
||||
# Warn duplicated resources for the 1st time
|
||||
if self.resources[key].last_dup != duplicated:
|
||||
self.resources[key].last_dup = duplicated
|
||||
LOG.warning(_(
|
||||
'Found following duplicated resoures for '
|
||||
'%(name)s in context of %(source)s:%(list)s. '
|
||||
'Check pipeline configuration.')
|
||||
% ({'name': pollster.name,
|
||||
'source': source_name,
|
||||
'list': duplicated
|
||||
}))
|
||||
poll_history[pollster.name] = history
|
||||
|
||||
# If no resources, skip for this pollster
|
||||
if not polling_resources:
|
||||
LOG.info(_("Skip polling pollster %s, no resources"
|
||||
" found"), pollster.name)
|
||||
p_context = 'new ' if history else ''
|
||||
LOG.info(_("Skip pollster %(name)s, no %(p_context)s"
|
||||
"resources found this cycle"),
|
||||
{'name': pollster.name, 'p_context': p_context})
|
||||
continue
|
||||
|
||||
LOG.info(_("Polling pollster %(poll)s in the context of "
|
||||
"%(src)s"),
|
||||
dict(poll=pollster.name, src=source_name))
|
||||
try:
|
||||
samples = pollster.obj.get_samples(
|
||||
manager=self.manager,
|
||||
|
||||
@@ -678,3 +678,39 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
|
||||
len(p_coord.extract_my_subset.call_args_list))
|
||||
for c in expected:
|
||||
self.assertIn(c, p_coord.extract_my_subset.call_args_list)
|
||||
|
||||
@mock.patch('ceilometer.agent.base.LOG')
|
||||
def test_polling_and_notify_with_resources(self, LOG):
|
||||
self.setup_polling()
|
||||
polling_task = list(self.mgr.setup_polling_tasks().values())[0]
|
||||
polling_task.poll_and_notify()
|
||||
LOG.info.assert_called_with(
|
||||
'Polling pollster %(poll)s in the context of %(src)s',
|
||||
{'poll': 'test', 'src': 'test_pipeline'})
|
||||
|
||||
@mock.patch('ceilometer.agent.base.LOG')
|
||||
def test_skip_polling_and_notify_with_no_resources(self, LOG):
|
||||
self.pipeline_cfg['sources'][0]['resources'] = []
|
||||
self.setup_polling()
|
||||
polling_task = list(self.mgr.setup_polling_tasks().values())[0]
|
||||
pollster = list(polling_task.pollster_matches['test_pipeline'])[0]
|
||||
polling_task.poll_and_notify()
|
||||
LOG.info.assert_called_with(
|
||||
'Skip pollster %(name)s, no %(p_context)sresources found this '
|
||||
'cycle', {'name': pollster.name, 'p_context': ''})
|
||||
|
||||
@mock.patch('ceilometer.agent.base.LOG')
|
||||
def test_skip_polling_polled_resources(self, LOG):
|
||||
self.pipeline_cfg['sources'].append({
|
||||
'name': 'test_pipeline_1',
|
||||
'interval': 60,
|
||||
'meters': ['test'],
|
||||
'resources': ['test://'],
|
||||
'sinks': ['test_sink']
|
||||
})
|
||||
self.setup_polling()
|
||||
polling_task = list(self.mgr.setup_polling_tasks().values())[0]
|
||||
polling_task.poll_and_notify()
|
||||
LOG.info.assert_called_with(
|
||||
'Skip pollster %(name)s, no %(p_context)sresources found this '
|
||||
'cycle', {'name': 'test', 'p_context': 'new '})
|
||||
|
||||
Reference in New Issue
Block a user