split discover into different namespaces
Polling agent doesn't need to load all discovers, loading by need can save some memory resource usage. Each discover has a group id, for each unique group id, there is a long connection to coordinator backend. We usually polling by namespace, hence central polling agent doesn't need local group id, and compute polling agent does't need global group id if workload_partition is disabled. So loading by need can save an additional long connection. Change-Id: I1b5a8d563ff10f448a5e19fe1c11f82c13cc6fd2
This commit is contained in:
@@ -217,7 +217,7 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
|
||||
None,
|
||||
self.PollsterExceptionAnother(), )]
|
||||
|
||||
def create_discovery_manager(self):
|
||||
def create_discoveries(self):
|
||||
return extension.ExtensionManager.make_test_instance(
|
||||
[
|
||||
extension.Extension(
|
||||
@@ -336,7 +336,7 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
|
||||
self.mgr.tg.add_timer.call_args_list)
|
||||
|
||||
def test_join_partitioning_groups(self):
|
||||
self.mgr.discovery_manager = self.create_discovery_manager()
|
||||
self.mgr.discoveries = self.create_discoveries()
|
||||
self.mgr.join_partitioning_groups()
|
||||
p_coord = self.mgr.partition_coordinator
|
||||
static_group_ids = [utils.hash_of_set(p['resources'])
|
||||
@@ -432,7 +432,7 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
|
||||
def _do_test_per_pollster_discovery(self, discovered_resources,
|
||||
static_resources):
|
||||
self.Pollster.discovery = 'testdiscovery'
|
||||
self.mgr.discovery_manager = self.create_discovery_manager()
|
||||
self.mgr.discoveries = self.create_discoveries()
|
||||
self.Discovery.resources = discovered_resources
|
||||
self.DiscoveryAnother.resources = [d[::-1]
|
||||
for d in discovered_resources]
|
||||
@@ -482,7 +482,7 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
|
||||
discovered_resources = ['discovered_1', 'discovered_2']
|
||||
self.Pollster.discovery = 'testdiscovery'
|
||||
self.PollsterAnother.discovery = 'testdiscovery'
|
||||
self.mgr.discovery_manager = self.create_discovery_manager()
|
||||
self.mgr.discoveries = self.create_discoveries()
|
||||
self.Discovery.resources = discovered_resources
|
||||
self.pipeline_cfg['sources'][0]['meters'].append('testanother')
|
||||
self.pipeline_cfg['sources'][0]['resources'] = []
|
||||
@@ -496,7 +496,7 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
|
||||
def _do_test_per_pipeline_discovery(self,
|
||||
discovered_resources,
|
||||
static_resources):
|
||||
self.mgr.discovery_manager = self.create_discovery_manager()
|
||||
self.mgr.discoveries = self.create_discoveries()
|
||||
self.Discovery.resources = discovered_resources
|
||||
self.DiscoveryAnother.resources = [d[::-1]
|
||||
for d in discovered_resources]
|
||||
@@ -546,7 +546,7 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
|
||||
'discovery': ['testdiscoveryanother'],
|
||||
'sinks': ['test_sink_new']
|
||||
})
|
||||
self.mgr.discovery_manager = self.create_discovery_manager()
|
||||
self.mgr.discoveries = self.create_discoveries()
|
||||
self.Discovery.resources = ['discovered_1', 'discovered_2']
|
||||
self.DiscoveryAnother.resources = ['discovered_3', 'discovered_4']
|
||||
self.setup_polling()
|
||||
@@ -587,7 +587,7 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
|
||||
'transformers': [],
|
||||
'publishers': ['test://']}]
|
||||
self.pipeline_cfg = {'sources': sources, 'sinks': sinks}
|
||||
self.mgr.discovery_manager = self.create_discovery_manager()
|
||||
self.mgr.discoveries = self.create_discoveries()
|
||||
self.setup_polling()
|
||||
polling_tasks = self.mgr.setup_polling_tasks()
|
||||
self.assertEqual(1, len(polling_tasks))
|
||||
@@ -614,7 +614,7 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
|
||||
'transformers': [],
|
||||
'publishers': ['test://']}]
|
||||
self.pipeline_cfg = {'sources': sources, 'sinks': sinks}
|
||||
self.mgr.discovery_manager = self.create_discovery_manager()
|
||||
self.mgr.discoveries = self.create_discoveries()
|
||||
self.setup_polling()
|
||||
polling_tasks = self.mgr.setup_polling_tasks()
|
||||
self.assertEqual(1, len(polling_tasks))
|
||||
@@ -625,7 +625,7 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
|
||||
self.Pollster.resources)
|
||||
|
||||
def test_discovery_partitioning(self):
|
||||
self.mgr.discovery_manager = self.create_discovery_manager()
|
||||
self.mgr.discoveries = self.create_discoveries()
|
||||
p_coord = self.mgr.partition_coordinator
|
||||
self.pipeline_cfg['sources'][0]['discovery'] = [
|
||||
'testdiscovery', 'testdiscoveryanother',
|
||||
@@ -636,7 +636,7 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
|
||||
self.mgr.interval_task(polling_tasks.get(60))
|
||||
expected = [mock.call(self.mgr.construct_group_id(d.obj.group_id),
|
||||
d.obj.resources)
|
||||
for d in self.mgr.discovery_manager
|
||||
for d in self.mgr.discoveries
|
||||
if hasattr(d.obj, 'resources')]
|
||||
self.assertEqual(len(expected),
|
||||
len(p_coord.extract_my_subset.call_args_list))
|
||||
|
||||
Reference in New Issue
Block a user