Merge "group pollsters by interval"
This commit is contained in:
commit
003047bdb5
@ -313,15 +313,11 @@ class AgentManager(service_base.BaseService):
|
||||
polling_task = None
|
||||
for pollster in self.extensions:
|
||||
if source.support_meter(pollster.name):
|
||||
polling_task = polling_tasks.get(source.get_interval())
|
||||
if not polling_task:
|
||||
polling_task = self.create_polling_task()
|
||||
polling_tasks[source.get_interval()] = polling_task
|
||||
polling_task.add(pollster, source)
|
||||
if polling_task:
|
||||
polling_tasks[source.name] = {
|
||||
'task': polling_task,
|
||||
'interval': source.get_interval()
|
||||
}
|
||||
|
||||
return polling_tasks
|
||||
|
||||
def construct_group_id(self, discovery_group_id):
|
||||
@ -339,15 +335,13 @@ class AgentManager(service_base.BaseService):
|
||||
|
||||
pollster_timers = []
|
||||
data = self.setup_polling_tasks()
|
||||
for name, polling_task in data.items():
|
||||
interval = polling_task['interval']
|
||||
task = polling_task['task']
|
||||
for interval, polling_task in data.items():
|
||||
delay_time = (interval + delay_polling_time if delay_start
|
||||
else delay_polling_time)
|
||||
pollster_timers.append(self.tg.add_timer(interval,
|
||||
self.interval_task,
|
||||
initial_delay=delay_time,
|
||||
task=task))
|
||||
task=polling_task))
|
||||
self.tg.add_timer(cfg.CONF.coordination.heartbeat,
|
||||
self.partition_coordinator.heartbeat)
|
||||
|
||||
|
@ -353,8 +353,8 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
|
||||
def test_setup_polling_tasks(self):
|
||||
polling_tasks = self.mgr.setup_polling_tasks()
|
||||
self.assertEqual(1, len(polling_tasks))
|
||||
self.assertTrue('test_pipeline' in polling_tasks.keys())
|
||||
per_task_resources = polling_tasks['test_pipeline']['task'].resources
|
||||
self.assertTrue(60 in polling_tasks.keys())
|
||||
per_task_resources = polling_tasks[60].resources
|
||||
self.assertEqual(1, len(per_task_resources))
|
||||
self.assertEqual(set(self.pipeline_cfg['sources'][0]['resources']),
|
||||
set(per_task_resources['test_pipeline-test'].get({})))
|
||||
@ -370,8 +370,8 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
|
||||
self.setup_polling()
|
||||
polling_tasks = self.mgr.setup_polling_tasks()
|
||||
self.assertEqual(2, len(polling_tasks))
|
||||
self.assertTrue('test_pipeline' in polling_tasks.keys())
|
||||
self.assertTrue('test_pipeline_1' in polling_tasks.keys())
|
||||
self.assertTrue(60 in polling_tasks.keys())
|
||||
self.assertTrue(10 in polling_tasks.keys())
|
||||
|
||||
def test_setup_polling_tasks_mismatch_counter(self):
|
||||
self.pipeline_cfg['sources'].append({
|
||||
@ -383,8 +383,30 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
|
||||
})
|
||||
polling_tasks = self.mgr.setup_polling_tasks()
|
||||
self.assertEqual(1, len(polling_tasks))
|
||||
self.assertTrue('test_pipeline' in polling_tasks.keys())
|
||||
self.assertFalse('test_pipeline_1' in polling_tasks.keys())
|
||||
self.assertTrue(60 in polling_tasks.keys())
|
||||
self.assertFalse(10 in polling_tasks.keys())
|
||||
|
||||
def test_setup_polling_task_same_interval(self):
|
||||
self.pipeline_cfg['sources'].append({
|
||||
'name': 'test_pipeline_1',
|
||||
'interval': 60,
|
||||
'meters': ['testanother'],
|
||||
'resources': ['testanother://'] if self.source_resources else [],
|
||||
'sinks': ['test_sink']
|
||||
})
|
||||
self.setup_polling()
|
||||
polling_tasks = self.mgr.setup_polling_tasks()
|
||||
self.assertEqual(1, len(polling_tasks))
|
||||
pollsters = polling_tasks.get(60).pollster_matches
|
||||
self.assertEqual(2, len(pollsters))
|
||||
per_task_resources = polling_tasks[60].resources
|
||||
self.assertEqual(2, len(per_task_resources))
|
||||
key = 'test_pipeline-test'
|
||||
self.assertEqual(set(self.pipeline_cfg['sources'][0]['resources']),
|
||||
set(per_task_resources[key].get({})))
|
||||
key = 'test_pipeline_1-testanother'
|
||||
self.assertEqual(set(self.pipeline_cfg['sources'][1]['resources']),
|
||||
set(per_task_resources[key].get({})))
|
||||
|
||||
def test_agent_manager_start(self):
|
||||
mgr = self.create_manager()
|
||||
@ -425,7 +447,7 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
|
||||
self.pipeline_cfg['sources'][0]['resources'] = static_resources
|
||||
self.setup_polling()
|
||||
polling_tasks = self.mgr.setup_polling_tasks()
|
||||
self.mgr.interval_task(polling_tasks['test_pipeline']['task'])
|
||||
self.mgr.interval_task(polling_tasks.get(60))
|
||||
if static_resources:
|
||||
self.assertEqual(set(static_resources +
|
||||
self.DiscoveryAnother.resources),
|
||||
@ -467,7 +489,7 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
|
||||
self.pipeline_cfg['sources'][0]['resources'] = []
|
||||
self.setup_polling()
|
||||
polling_tasks = self.mgr.setup_polling_tasks()
|
||||
self.mgr.interval_task(polling_tasks['test_pipeline']['task'])
|
||||
self.mgr.interval_task(polling_tasks.get(60))
|
||||
self.assertEqual(1, len(self.Discovery.params))
|
||||
self.assertEqual(discovered_resources, self.Pollster.resources)
|
||||
self.assertEqual(discovered_resources, self.PollsterAnother.resources)
|
||||
@ -485,7 +507,7 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
|
||||
self.pipeline_cfg['sources'][0]['resources'] = static_resources
|
||||
self.setup_polling()
|
||||
polling_tasks = self.mgr.setup_polling_tasks()
|
||||
self.mgr.interval_task(polling_tasks['test_pipeline']['task'])
|
||||
self.mgr.interval_task(polling_tasks.get(60))
|
||||
discovery = self.Discovery.resources + self.DiscoveryAnother.resources
|
||||
# compare resource lists modulo ordering
|
||||
self.assertEqual(set(static_resources + discovery),
|
||||
@ -530,11 +552,9 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
|
||||
self.DiscoveryAnother.resources = ['discovered_3', 'discovered_4']
|
||||
self.setup_polling()
|
||||
polling_tasks = self.mgr.setup_polling_tasks()
|
||||
self.assertEqual(2, len(polling_tasks))
|
||||
self.assertTrue('another_pipeline' in polling_tasks.keys())
|
||||
self.assertTrue('test_pipeline' in polling_tasks.keys())
|
||||
self.mgr.interval_task(polling_tasks['another_pipeline']['task'])
|
||||
self.mgr.interval_task(polling_tasks['test_pipeline']['task'])
|
||||
self.assertEqual(1, len(polling_tasks))
|
||||
self.assertTrue(60 in polling_tasks.keys())
|
||||
self.mgr.interval_task(polling_tasks.get(60))
|
||||
self.assertEqual([None], self.Discovery.params)
|
||||
self.assertEqual([None], self.DiscoveryAnother.params)
|
||||
self.assertEqual(2, len(self.Pollster.samples))
|
||||
@ -571,11 +591,9 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
|
||||
self.mgr.discovery_manager = self.create_discovery_manager()
|
||||
self.setup_polling()
|
||||
polling_tasks = self.mgr.setup_polling_tasks()
|
||||
self.assertEqual(2, len(polling_tasks))
|
||||
self.assertTrue('test_source_1' in polling_tasks.keys())
|
||||
self.assertTrue('test_source_2' in polling_tasks.keys())
|
||||
self.mgr.interval_task(polling_tasks['test_source_1']['task'])
|
||||
self.mgr.interval_task(polling_tasks['test_source_2']['task'])
|
||||
self.assertEqual(1, len(polling_tasks))
|
||||
self.assertTrue(60 in polling_tasks.keys())
|
||||
self.mgr.interval_task(polling_tasks.get(60))
|
||||
self.assertEqual(1, len(self.Pollster.samples))
|
||||
self.assertEqual(['discovered_1', 'discovered_2'],
|
||||
self.Pollster.resources)
|
||||
@ -601,8 +619,8 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
|
||||
self.setup_polling()
|
||||
polling_tasks = self.mgr.setup_polling_tasks()
|
||||
self.assertEqual(1, len(polling_tasks))
|
||||
self.assertTrue('test_source_1' in polling_tasks.keys())
|
||||
self.mgr.interval_task(polling_tasks['test_source_1']['task'])
|
||||
self.assertTrue(60 in polling_tasks.keys())
|
||||
self.mgr.interval_task(polling_tasks.get(60))
|
||||
self.assertEqual(1, len(self.Pollster.samples))
|
||||
self.assertEqual(['discovered_1', 'discovered_2'],
|
||||
self.Pollster.resources)
|
||||
@ -616,7 +634,7 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
|
||||
self.pipeline_cfg['sources'][0]['resources'] = []
|
||||
self.setup_polling()
|
||||
polling_tasks = self.mgr.setup_polling_tasks()
|
||||
self.mgr.interval_task(polling_tasks['test_pipeline']['task'])
|
||||
self.mgr.interval_task(polling_tasks.get(60))
|
||||
expected = [mock.call(self.mgr.construct_group_id(d.obj.group_id),
|
||||
d.obj.resources)
|
||||
for d in self.mgr.discovery_manager
|
||||
@ -648,8 +666,7 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
|
||||
})
|
||||
self.setup_polling()
|
||||
polling_tasks = self.mgr.setup_polling_tasks()
|
||||
for meter_name in polling_tasks:
|
||||
self.mgr.interval_task(polling_tasks[meter_name]['task'])
|
||||
self.mgr.interval_task(polling_tasks.get(60))
|
||||
# Only two groups need to be created, one for each pipeline,
|
||||
# even though counter test is used twice
|
||||
expected = [mock.call(self.mgr.construct_group_id(
|
||||
|
@ -232,7 +232,7 @@ class TestRunTasks(agentbase.BaseAgentManagerTestCase):
|
||||
|
||||
def test_get_sample_resources(self):
|
||||
polling_tasks = self.mgr.setup_polling_tasks()
|
||||
self.mgr.interval_task(polling_tasks['test_pipeline']['task'])
|
||||
self.mgr.interval_task(list(polling_tasks.values())[0])
|
||||
self.assertTrue(self.Pollster.resources)
|
||||
|
||||
def test_when_keystone_fail(self):
|
||||
@ -254,8 +254,7 @@ class TestRunTasks(agentbase.BaseAgentManagerTestCase):
|
||||
}
|
||||
self.mgr.polling_manager = pipeline.PollingManager(self.pipeline_cfg)
|
||||
polling_tasks = self.mgr.setup_polling_tasks()
|
||||
task = polling_tasks['test_keystone']['task']
|
||||
self.mgr.interval_task(task)
|
||||
self.mgr.interval_task(list(polling_tasks.values())[0])
|
||||
self.assertFalse(self.PollsterKeystone.samples)
|
||||
self.assertFalse(self.notified_samples)
|
||||
|
||||
@ -275,7 +274,7 @@ class TestRunTasks(agentbase.BaseAgentManagerTestCase):
|
||||
'publishers': ["test"]}]
|
||||
}
|
||||
self.mgr.polling_manager = pipeline.PollingManager(self.pipeline_cfg)
|
||||
polling_task = self.mgr.setup_polling_tasks()[source_name]['task']
|
||||
polling_task = list(self.mgr.setup_polling_tasks().values())[0]
|
||||
pollster = list(polling_task.pollster_matches[source_name])[0]
|
||||
|
||||
# 2 samples after 4 pollings, as pollster got disabled upon exception
|
||||
|
Loading…
Reference in New Issue
Block a user