separate polling code

polling is different from pipeline.

Change-Id: I5ec3ece1295181acd10c3b598523e796d23264bf
This commit is contained in:
gord chung 2017-10-06 13:53:36 +00:00
parent 50415c0d08
commit 9e58f1a6f4
5 changed files with 136 additions and 137 deletions
ceilometer

@ -46,11 +46,6 @@ OPTS = [
LOG = log.getLogger(__name__)
class PollingException(agent.ConfigException):
def __init__(self, message, cfg):
super(PollingException, self).__init__('Polling', message, cfg)
class PipelineException(agent.ConfigException):
def __init__(self, message, cfg):
super(PipelineException, self).__init__('Pipeline', message, cfg)
@ -273,52 +268,6 @@ class SampleSource(PipelineSource):
return self.is_supported(self.meters, meter_name)
class PollingSource(agent.Source):
"""Represents a source of pollsters
In effect it is a set of pollsters emitting
samples for a set of matching meters. Each source encapsulates meter name
matching, polling interval determination, optional resource enumeration or
discovery.
"""
def __init__(self, cfg):
try:
super(PollingSource, self).__init__(cfg)
except agent.SourceException as err:
raise PipelineException(err.msg, cfg)
try:
self.meters = cfg['meters']
except KeyError:
raise PipelineException("Missing meters value", cfg)
try:
self.interval = int(cfg['interval'])
except ValueError:
raise PipelineException("Invalid interval value", cfg)
except KeyError:
raise PipelineException("Missing interval value", cfg)
if self.interval <= 0:
raise PipelineException("Interval value should > 0", cfg)
self.resources = cfg.get('resources') or []
if not isinstance(self.resources, list):
raise PipelineException("Resources should be a list", cfg)
self.discovery = cfg.get('discovery') or []
if not isinstance(self.discovery, list):
raise PipelineException("Discovery should be a list", cfg)
try:
self.check_source_filtering(self.meters, 'meters')
except agent.SourceException as err:
raise PipelineException(err.msg, cfg)
def get_interval(self):
return self.interval
def support_meter(self, meter_name):
return self.is_supported(self.meters, meter_name)
class Sink(object):
"""Represents a sink for the transformation and publication of data.
@ -725,54 +674,6 @@ class PipelineManager(agent.ConfigManagerBase):
return PublishContext(self.pipelines)
class PollingManager(agent.ConfigManagerBase):
"""Polling Manager
Polling manager sets up polling according to config file.
"""
def __init__(self, conf, cfg_file):
"""Setup the polling according to config.
The configuration is supported as follows:
{"sources": [{"name": source_1,
"interval": interval_time,
"meters" : ["meter_1", "meter_2"],
"resources": ["resource_uri1", "resource_uri2"],
},
{"name": source_2,
"interval": interval_time,
"meters" : ["meter_3"],
},
]}
}
The interval determines the cadence of sample polling
Valid meter format is '*', '!meter_name', or 'meter_name'.
'*' is wildcard symbol means any meters; '!meter_name' means
"meter_name" will be excluded; 'meter_name' means 'meter_name'
will be included.
Valid meters definition is all "included meter names", all
"excluded meter names", wildcard and "excluded meter names", or
only wildcard.
The resources is list of URI indicating the resources from where
the meters should be polled. It's optional and it's up to the
specific pollster to decide how to use it.
"""
super(PollingManager, self).__init__(conf)
cfg = self.load_config(cfg_file)
self.sources = []
if 'sources' not in cfg:
raise PollingException("sources required", cfg)
for s in cfg.get('sources'):
self.sources.append(PollingSource(s))
def setup_event_pipeline(conf, transformer_manager=None):
"""Setup event pipeline manager according to yaml config file."""
default = extension.ExtensionManager('ceilometer.transformer')
@ -789,12 +690,6 @@ def setup_pipeline(conf, transformer_manager=None):
SAMPLE_TYPE)
def setup_polling(conf):
"""Setup polling manager according to yaml config file."""
cfg_file = conf.polling.cfg_file
return PollingManager(conf, cfg_file)
def get_pipeline_grouping_key(pipe):
keys = []
for transformer in pipe.sink.transformers:

@ -33,9 +33,9 @@ from six.moves.urllib import parse as urlparse
from stevedore import extension
from tooz import coordination
from ceilometer import agent
from ceilometer import keystone_client
from ceilometer import messaging
from ceilometer import pipeline
from ceilometer.polling import plugin_base
from ceilometer.publisher import utils as publisher_utils
from ceilometer import utils
@ -58,7 +58,7 @@ OPTS = [
POLLING_OPTS = [
cfg.StrOpt('cfg_file',
default="polling.yaml",
help="Configuration file for pipeline definition."
help="Configuration file for polling definition."
),
cfg.StrOpt('partitioning_group_prefix',
deprecated_group='central',
@ -77,6 +77,11 @@ class EmptyPollstersList(Exception):
super(EmptyPollstersList, self).__init__(msg)
class PollingException(agent.ConfigException):
def __init__(self, message, cfg):
super(PollingException, self).__init__('Polling', message, cfg)
class Resources(object):
def __init__(self, agent_manager):
self.agent_manager = agent_manager
@ -387,7 +392,7 @@ class AgentManager(cotyledon.Service):
def run(self):
super(AgentManager, self).run()
self.polling_manager = pipeline.setup_polling(self.conf)
self.polling_manager = setup_polling(self.conf)
if self.partition_coordinator:
self.partition_coordinator.start()
self.join_partitioning_groups()
@ -495,3 +500,103 @@ class AgentManager(cotyledon.Service):
self.polling_periodics.stop()
self.polling_periodics.wait()
self.polling_periodics = None
class PollingManager(agent.ConfigManagerBase):
"""Polling Manager
Polling manager sets up polling according to config file.
"""
def __init__(self, conf, cfg_file):
"""Setup the polling according to config.
The configuration is supported as follows:
{"sources": [{"name": source_1,
"interval": interval_time,
"meters" : ["meter_1", "meter_2"],
"resources": ["resource_uri1", "resource_uri2"],
},
{"name": source_2,
"interval": interval_time,
"meters" : ["meter_3"],
},
]}
}
The interval determines the cadence of sample polling
Valid meter format is '*', '!meter_name', or 'meter_name'.
'*' is wildcard symbol means any meters; '!meter_name' means
"meter_name" will be excluded; 'meter_name' means 'meter_name'
will be included.
Valid meters definition is all "included meter names", all
"excluded meter names", wildcard and "excluded meter names", or
only wildcard.
The resources is list of URI indicating the resources from where
the meters should be polled. It's optional and it's up to the
specific pollster to decide how to use it.
"""
super(PollingManager, self).__init__(conf)
cfg = self.load_config(cfg_file)
self.sources = []
if 'sources' not in cfg:
raise PollingException("sources required", cfg)
for s in cfg.get('sources'):
self.sources.append(PollingSource(s))
class PollingSource(agent.Source):
"""Represents a source of pollsters
In effect it is a set of pollsters emitting
samples for a set of matching meters. Each source encapsulates meter name
matching, polling interval determination, optional resource enumeration or
discovery.
"""
def __init__(self, cfg):
try:
super(PollingSource, self).__init__(cfg)
except agent.SourceException as err:
raise PollingException(err.msg, cfg)
try:
self.meters = cfg['meters']
except KeyError:
raise PollingException("Missing meters value", cfg)
try:
self.interval = int(cfg['interval'])
except ValueError:
raise PollingException("Invalid interval value", cfg)
except KeyError:
raise PollingException("Missing interval value", cfg)
if self.interval <= 0:
raise PollingException("Interval value should > 0", cfg)
self.resources = cfg.get('resources') or []
if not isinstance(self.resources, list):
raise PollingException("Resources should be a list", cfg)
self.discovery = cfg.get('discovery') or []
if not isinstance(self.discovery, list):
raise PollingException("Discovery should be a list", cfg)
try:
self.check_source_filtering(self.meters, 'meters')
except agent.SourceException as err:
raise PollingException(err.msg, cfg)
def get_interval(self):
return self.interval
def support_meter(self, meter_name):
return self.is_supported(self.meters, meter_name)
def setup_polling(conf):
"""Setup polling manager according to yaml config file."""
cfg_file = conf.polling.cfg_file
return PollingManager(conf, cfg_file)

@ -24,7 +24,7 @@ import mock
import six
from stevedore import extension
from ceilometer import pipeline
from ceilometer.polling import manager as poll_manager
from ceilometer.polling import plugin_base
from ceilometer import sample
from ceilometer import service
@ -189,7 +189,7 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
params = []
def setup_polling(self):
self.mgr.polling_manager = pipeline.PollingManager(
self.mgr.polling_manager = poll_manager.PollingManager(
self.CONF, self.cfg2file(self.polling_cfg))
def create_extension_list(self):
@ -240,7 +240,7 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
def create_manager(self):
"""Return subclass specific manager."""
@mock.patch('ceilometer.pipeline.setup_polling', mock.MagicMock())
@mock.patch('ceilometer.polling.manager.setup_polling', mock.MagicMock())
def setUp(self):
super(BaseAgentManagerTestCase, self).setUp()
self.CONF = service.prepare_service([], [])
@ -288,7 +288,7 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
self.DiscoveryAnother.resources = []
super(BaseAgentManagerTestCase, self).tearDown()
@mock.patch('ceilometer.pipeline.setup_polling')
@mock.patch('ceilometer.polling.manager.setup_polling')
def test_start(self, setup_polling):
self.mgr.setup_polling_tasks = mock.MagicMock()
self.mgr.run()

@ -23,7 +23,6 @@ from stevedore import extension
from ceilometer.compute import discovery as nova_discover
from ceilometer.hardware import discovery
from ceilometer import pipeline
from ceilometer.polling import manager
from ceilometer.polling import plugin_base
from ceilometer import service
@ -49,7 +48,7 @@ class TestManager(base.BaseTestCase):
super(TestManager, self).setUp()
self.conf = service.prepare_service([], [])
@mock.patch('ceilometer.pipeline.setup_polling', mock.MagicMock())
@mock.patch('ceilometer.polling.manager.setup_polling', mock.MagicMock())
def test_load_plugins(self):
mgr = manager.AgentManager(0, self.conf)
self.assertIsNotNone(list(mgr.extensions))
@ -266,7 +265,7 @@ class TestRunTasks(agentbase.BaseAgentManagerTestCase):
'transformers': [],
'publishers': ["test"]}]
}
self.mgr.polling_manager = pipeline.PollingManager(
self.mgr.polling_manager = manager.PollingManager(
self.CONF,
self.cfg2file(self.pipeline_cfg))
polling_tasks = self.mgr.setup_polling_tasks()
@ -311,7 +310,7 @@ class TestRunTasks(agentbase.BaseAgentManagerTestCase):
'transformers': [],
'publishers': ["test"]}]
}
self.mgr.polling_manager = pipeline.PollingManager(
self.mgr.polling_manager = manager.PollingManager(
self.CONF,
self.cfg2file(self.pipeline_cfg))
polling_tasks = self.mgr.setup_polling_tasks()
@ -335,7 +334,7 @@ class TestRunTasks(agentbase.BaseAgentManagerTestCase):
'transformers': [],
'publishers': ["test"]}]
}
self.mgr.polling_manager = pipeline.PollingManager(
self.mgr.polling_manager = manager.PollingManager(
self.CONF,
self.cfg2file(self.pipeline_cfg))
polling_task = list(self.mgr.setup_polling_tasks().values())[0]
@ -366,7 +365,7 @@ class TestRunTasks(agentbase.BaseAgentManagerTestCase):
'transformers': [],
'publishers': ["test"]}]
}
self.mgr.polling_manager = pipeline.PollingManager(
self.mgr.polling_manager = manager.PollingManager(
self.CONF, self.cfg2file(self.polling_cfg))
polling_task = list(self.mgr.setup_polling_tasks().values())[0]
pollster = list(polling_task.pollster_matches[source_name])[0]
@ -411,7 +410,7 @@ class TestRunTasks(agentbase.BaseAgentManagerTestCase):
'publishers': ["test"]}]
}
self.mgr.polling_manager = pipeline.PollingManager(
self.mgr.polling_manager = manager.PollingManager(
self.CONF,
self.cfg2file(pipeline_cfg))
polling_task = list(self.mgr.setup_polling_tasks().values())[0]

@ -17,7 +17,7 @@ import tempfile
from oslotest import base
import yaml
from ceilometer import pipeline
from ceilometer.polling import manager
from ceilometer import service
@ -43,60 +43,60 @@ class PollingTestCase(base.BaseTestCase):
def test_no_name(self):
del self.poll_cfg['sources'][0]['name']
self.assertRaises(pipeline.PipelineException,
pipeline.PollingManager,
self.assertRaises(manager.PollingException,
manager.PollingManager,
self.CONF, self.cfg2file(self.poll_cfg))
def test_no_interval(self):
del self.poll_cfg['sources'][0]['interval']
self.assertRaises(pipeline.PipelineException,
pipeline.PollingManager,
self.assertRaises(manager.PollingException,
manager.PollingManager,
self.CONF, self.cfg2file(self.poll_cfg))
def test_invalid_string_interval(self):
self.poll_cfg['sources'][0]['interval'] = 'string'
self.assertRaises(pipeline.PipelineException,
pipeline.PollingManager,
self.assertRaises(manager.PollingException,
manager.PollingManager,
self.CONF, self.cfg2file(self.poll_cfg))
def test_get_interval(self):
poll_manager = pipeline.PollingManager(
poll_manager = manager.PollingManager(
self.CONF, self.cfg2file(self.poll_cfg))
source = poll_manager.sources[0]
self.assertEqual(600, source.get_interval())
def test_invalid_resources(self):
self.poll_cfg['sources'][0]['resources'] = {'invalid': 1}
self.assertRaises(pipeline.PipelineException,
pipeline.PollingManager,
self.assertRaises(manager.PollingException,
manager.PollingManager,
self.CONF, self.cfg2file(self.poll_cfg))
def test_resources(self):
resources = ['test1://', 'test2://']
self.poll_cfg['sources'][0]['resources'] = resources
poll_manager = pipeline.PollingManager(
poll_manager = manager.PollingManager(
self.CONF, self.cfg2file(self.poll_cfg))
self.assertEqual(resources, poll_manager.sources[0].resources)
def test_no_resources(self):
poll_manager = pipeline.PollingManager(
poll_manager = manager.PollingManager(
self.CONF, self.cfg2file(self.poll_cfg))
self.assertEqual(0, len(poll_manager.sources[0].resources))
def test_check_meters_include_exclude_same(self):
self.poll_cfg['sources'][0]['meters'] = ['a', '!a']
self.assertRaises(pipeline.PipelineException,
pipeline.PollingManager,
self.assertRaises(manager.PollingException,
manager.PollingManager,
self.CONF, self.cfg2file(self.poll_cfg))
def test_check_meters_include_exclude(self):
self.poll_cfg['sources'][0]['meters'] = ['a', '!b']
self.assertRaises(pipeline.PipelineException,
pipeline.PollingManager,
self.assertRaises(manager.PollingException,
manager.PollingManager,
self.CONF, self.cfg2file(self.poll_cfg))
def test_check_meters_wildcard_included(self):
self.poll_cfg['sources'][0]['meters'] = ['a', '*']
self.assertRaises(pipeline.PipelineException,
pipeline.PollingManager,
self.assertRaises(manager.PollingException,
manager.PollingManager,
self.CONF, self.cfg2file(self.poll_cfg))