Merge "Pollsters now send notifications without doing transforms"

This commit is contained in:
Jenkins 2015-07-22 18:43:51 +00:00 committed by Gerrit Code Review
commit ae7587a36a
5 changed files with 325 additions and 346 deletions

View File

@ -26,7 +26,7 @@ import random
from oslo_config import cfg
from oslo_context import context
from oslo_log import log
import six
import oslo_messaging
from six import moves
from six.moves.urllib import parse as urlparse
from stevedore import extension
@ -34,7 +34,9 @@ from stevedore import extension
from ceilometer.agent import plugin_base
from ceilometer import coordination
from ceilometer.i18n import _, _LI
from ceilometer import pipeline as publish_pipeline
from ceilometer import messaging
from ceilometer import pipeline
from ceilometer.publisher import utils as publisher_utils
from ceilometer import service_base
from ceilometer import utils
@ -49,6 +51,8 @@ OPTS = [
]
cfg.CONF.register_opts(OPTS)
cfg.CONF.import_opt('telemetry_driver', 'ceilometer.publisher.messaging',
group='publisher_notifier')
class PollsterListForbidden(Exception):
@ -68,9 +72,9 @@ class Resources(object):
self.blacklist = []
self.last_dup = []
def setup(self, pipeline):
self._resources = pipeline.resources
self._discovery = pipeline.discovery
def setup(self, source):
self._resources = source.resources
self._discovery = source.discovery
def get(self, discovery_cache=None):
source_discovery = (self.agent_manager.discover(self._discovery,
@ -91,7 +95,7 @@ class Resources(object):
class PollingTask(object):
"""Polling task for polling samples and inject into pipeline.
"""Polling task for polling samples and notifying.
A polling task can be invoked periodically or only once.
"""
@ -103,92 +107,94 @@ class PollingTask(object):
# with a common interval
self.pollster_matches = collections.defaultdict(set)
# per-sink publisher contexts associated with each source
self.publishers = {}
# we relate the static resources and per-source discovery to
# each combination of pollster and matching source
resource_factory = lambda: Resources(agent_manager)
self.resources = collections.defaultdict(resource_factory)
def add(self, pollster, pipeline):
if pipeline.source.name not in self.publishers:
publish_context = publish_pipeline.PublishContext(
self.manager.context)
self.publishers[pipeline.source.name] = publish_context
self.publishers[pipeline.source.name].add_pipelines([pipeline])
self.pollster_matches[pipeline.source.name].add(pollster)
key = Resources.key(pipeline.source.name, pollster)
self.resources[key].setup(pipeline)
def add(self, pollster, source):
self.pollster_matches[source.name].add(pollster)
key = Resources.key(source.name, pollster)
self.resources[key].setup(source)
def poll_and_publish(self):
"""Polling sample and publish into pipeline."""
def poll_and_notify(self):
"""Polling sample and notify."""
cache = {}
discovery_cache = {}
for source_name in self.pollster_matches:
with self.publishers[source_name] as publisher:
for pollster in self.pollster_matches[source_name]:
LOG.info(_("Polling pollster %(poll)s in the context of "
"%(src)s"),
dict(poll=pollster.name, src=source_name))
key = Resources.key(source_name, pollster)
candidate_res = list(
self.resources[key].get(discovery_cache))
if not candidate_res and pollster.obj.default_discovery:
candidate_res = self.manager.discover(
[pollster.obj.default_discovery], discovery_cache)
for pollster in self.pollster_matches[source_name]:
LOG.info(_("Polling pollster %(poll)s in the context of "
"%(src)s"),
dict(poll=pollster.name, src=source_name))
key = Resources.key(source_name, pollster)
candidate_res = list(
self.resources[key].get(discovery_cache))
if not candidate_res and pollster.obj.default_discovery:
candidate_res = self.manager.discover(
[pollster.obj.default_discovery], discovery_cache)
# Remove duplicated resources and black resources. Using
# set() requires well defined __hash__ for each resource.
# Since __eq__ is defined, 'not in' is safe here.
seen = []
duplicated = []
polling_resources = []
black_res = self.resources[key].blacklist
for x in candidate_res:
if x not in seen:
seen.append(x)
if x not in black_res:
polling_resources.append(x)
else:
duplicated.append(x)
# Remove duplicated resources and black resources. Using
# set() requires well defined __hash__ for each resource.
# Since __eq__ is defined, 'not in' is safe here.
seen = []
duplicated = []
polling_resources = []
black_res = self.resources[key].blacklist
for x in candidate_res:
if x not in seen:
seen.append(x)
if x not in black_res:
polling_resources.append(x)
else:
duplicated.append(x)
# Warn duplicated resources for the 1st time
if self.resources[key].last_dup != duplicated:
self.resources[key].last_dup = duplicated
LOG.warning(_(
'Found following duplicated resoures for '
'%(name)s in context of %(source)s:%(list)s. '
'Check pipeline configuration.')
% ({'name': pollster.name,
'source': source_name,
'list': duplicated
}))
# Warn duplicated resources for the 1st time
if self.resources[key].last_dup != duplicated:
self.resources[key].last_dup = duplicated
LOG.warning(_(
'Found following duplicated resoures for '
'%(name)s in context of %(source)s:%(list)s. '
'Check pipeline configuration.')
% ({'name': pollster.name,
'source': source_name,
'list': duplicated
}))
# If no resources, skip for this pollster
if not polling_resources:
LOG.info(_("Skip polling pollster %s, no resources"
" found"), pollster.name)
continue
# If no resources, skip for this pollster
if not polling_resources:
LOG.info(_("Skip polling pollster %s, no resources"
" found"), pollster.name)
continue
try:
samples = list(pollster.obj.get_samples(
manager=self.manager,
cache=cache,
resources=polling_resources
))
publisher(samples)
except plugin_base.PollsterPermanentError as err:
LOG.error(_(
'Prevent pollster %(name)s for '
'polling source %(source)s anymore!')
% ({'name': pollster.name, 'source': source_name}))
self.resources[key].blacklist.append(err.fail_res)
except Exception as err:
LOG.warning(_(
'Continue after error from %(name)s: %(error)s')
% ({'name': pollster.name, 'error': err}),
exc_info=True)
try:
samples = pollster.obj.get_samples(
manager=self.manager,
cache=cache,
resources=polling_resources
)
sample_messages = []
for sample in samples:
sample_dict = (
publisher_utils.meter_message_from_counter(
sample, cfg.CONF.publisher.telemetry_secret
))
sample_messages.append(sample_dict)
self.manager.notifier.info(
self.manager.context.to_dict(),
'telemetry.api',
sample_messages
)
except plugin_base.PollsterPermanentError as err:
LOG.error(_(
'Prevent pollster %(name)s for '
'polling source %(source)s anymore!')
% ({'name': pollster.name, 'source': source_name}))
self.resources[key].blacklist.append(err.fail_res)
except Exception as err:
LOG.warning(_(
'Continue after error from %(name)s: %(error)s')
% ({'name': pollster.name, 'error': err}),
exc_info=True)
class AgentManager(service_base.BaseService):
@ -230,6 +236,11 @@ class AgentManager(service_base.BaseService):
self.group_prefix = ('%s-%s' % (namespace_prefix, group_prefix)
if group_prefix else namespace_prefix)
self.notifier = oslo_messaging.Notifier(
messaging.get_transport(),
driver=cfg.CONF.publisher_notifier.telemetry_driver,
publisher_id="ceilometer.api")
@staticmethod
def _extensions(category, agent_ns=None):
namespace = ('ceilometer.%s.%s' % (category, agent_ns) if agent_ns
@ -261,7 +272,7 @@ class AgentManager(service_base.BaseService):
# let each set of statically-defined resources have its own group
static_resource_groups = set([
self.construct_group_id(utils.hash_of_set(p.resources))
for p in self.pipeline_manager.pipelines
for p in self.polling_manager.sources
if p.resources
])
self.groups.update(static_resource_groups)
@ -274,14 +285,18 @@ class AgentManager(service_base.BaseService):
def setup_polling_tasks(self):
polling_tasks = {}
for pipeline in self.pipeline_manager.pipelines:
for source in self.polling_manager.sources:
polling_task = None
for pollster in self.extensions:
if pipeline.support_meter(pollster.name):
polling_task = polling_tasks.get(pipeline.get_interval())
if source.support_meter(pollster.name):
if not polling_task:
polling_task = self.create_polling_task()
polling_tasks[pipeline.get_interval()] = polling_task
polling_task.add(pollster, pipeline)
polling_task.add(pollster, source)
if polling_task:
polling_tasks[source.name] = {
'task': polling_task,
'interval': source.get_interval()
}
return polling_tasks
@ -299,7 +314,10 @@ class AgentManager(service_base.BaseService):
0, cfg.CONF.shuffle_time_before_polling_task)
pollster_timers = []
for interval, task in six.iteritems(self.setup_polling_tasks()):
data = self.setup_polling_tasks()
for name, polling_task in data.items():
interval = polling_task['interval']
task = polling_task['task']
delay_time = (interval + delay_polling_time if delay_start
else delay_polling_time)
pollster_timers.append(self.tg.add_timer(interval,
@ -312,7 +330,7 @@ class AgentManager(service_base.BaseService):
return pollster_timers
def start(self):
self.pipeline_manager = publish_pipeline.setup_pipeline()
self.polling_manager = pipeline.setup_polling()
self.partition_coordinator.start()
self.join_partitioning_groups()
@ -328,7 +346,7 @@ class AgentManager(service_base.BaseService):
@staticmethod
def interval_task(task):
task.poll_and_publish()
task.poll_and_notify()
@staticmethod
def _parse_discoverer(url):

View File

@ -311,6 +311,9 @@ class SampleSource(Source):
raise PipelineException("Discovery should be a list", cfg)
self.check_source_filtering(self.meters, 'meters')
def get_interval(self):
return self.interval
# (yjiang5) To support meters like instance:m1.tiny,
# which include variable part at the end starting with ':'.
# Hope we will not add such meters in future.
@ -704,6 +707,35 @@ class PipelineManager(object):
return PublishContext(context, self.pipelines)
class PollingManager(object):
"""Polling Manager
Polling manager sets up polling according to config file.
"""
def __init__(self, cfg):
"""Setup the polling according to config.
The configuration is the sources half of the Pipeline Config.
"""
self.sources = []
if not ('sources' in cfg and 'sinks' in cfg):
raise PipelineException("Both sources & sinks are required",
cfg)
LOG.info(_('detected decoupled pipeline config format'))
unique_names = set()
for s in cfg.get('sources', []):
name = s.get('name')
if name in unique_names:
raise PipelineException("Duplicated source names: %s" %
name, self)
else:
unique_names.add(name)
self.sources.append(SampleSource(s))
unique_names.clear()
def _setup_pipeline_manager(cfg_file, transformer_manager, p_type=SAMPLE_TYPE):
if not os.path.exists(cfg_file):
cfg_file = cfg.CONF.find_file(cfg_file)
@ -723,6 +755,21 @@ def _setup_pipeline_manager(cfg_file, transformer_manager, p_type=SAMPLE_TYPE):
), p_type)
def _setup_polling_manager(cfg_file):
if not os.path.exists(cfg_file):
cfg_file = cfg.CONF.find_file(cfg_file)
LOG.debug(_("Polling config file: %s"), cfg_file)
with open(cfg_file) as fap:
data = fap.read()
pipeline_cfg = yaml.safe_load(data)
LOG.info(_("Pipeline config: %s"), pipeline_cfg)
return PollingManager(pipeline_cfg)
def setup_event_pipeline(transformer_manager=None):
"""Setup event pipeline manager according to yaml config file."""
cfg_file = cfg.CONF.event_pipeline_cfg_file
@ -762,3 +809,9 @@ def get_pipeline_hash(p_type=SAMPLE_TYPE):
file_hash = hashlib.md5(data).hexdigest()
return file_hash
def setup_polling():
"""Setup polling manager according to yaml config file."""
cfg_file = cfg.CONF.pipeline_cfg_file
return _setup_polling_manager(cfg_file)

View File

@ -54,7 +54,12 @@ class BaseService(os_service.Service):
LOG.info(_LI("Detected change in pipeline configuration."))
try:
self.pipeline_manager = pipeline.setup_pipeline()
# Pipeline in the notification agent.
if hasattr(self, 'pipeline_manager'):
self.pipeline_manager = pipeline.setup_pipeline()
# Polling in the polling agent.
elif hasattr(self, 'polling_manager'):
self.polling_manager = pipeline.setup_polling()
LOG.debug(_("Pipeline has been refreshed. "
"old hash: %(old)s, new hash: %(new)s") %
({'old': self.pipeline_hash,

View File

@ -24,18 +24,12 @@
import abc
import copy
import datetime
import shutil
import eventlet
import mock
from oslo_config import fixture as fixture_config
from oslo_service import service as os_service
from oslo_utils import fileutils
from oslo_utils import timeutils
from oslotest import mockpatch
import six
from stevedore import extension
import yaml
from ceilometer.agent import plugin_base
from ceilometer import pipeline
@ -177,13 +171,8 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
class DiscoveryException(TestDiscoveryException):
params = []
def setup_pipeline(self):
self.transformer_manager = extension.ExtensionManager(
'ceilometer.transformer',
)
self.mgr.pipeline_manager = pipeline.PipelineManager(
self.pipeline_cfg,
self.transformer_manager)
def setup_polling(self):
self.mgr.polling_manager = pipeline.PollingManager(self.pipeline_cfg)
def create_extension_list(self):
return [extension.Extension('test',
@ -228,7 +217,7 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
def create_manager(self):
"""Return subclass specific manager."""
@mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock())
@mock.patch('ceilometer.pipeline.setup_polling', mock.MagicMock())
def setUp(self):
super(BaseAgentManagerTestCase, self).setUp()
self.mgr = self.create_manager()
@ -250,7 +239,7 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
'transformers': [],
'publishers': ["test"]}]
}
self.setup_pipeline()
self.setup_polling()
self.CONF = self.useFixture(fixture_config.Config()).conf
self.CONF.set_override(
'pipeline_cfg_file',
@ -286,21 +275,23 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
self.DiscoveryAnother.resources = []
super(BaseAgentManagerTestCase, self).tearDown()
@mock.patch('ceilometer.pipeline.setup_pipeline')
def test_start(self, setup_pipeline):
@mock.patch('ceilometer.pipeline.setup_polling')
def test_start(self, setup_polling):
self.mgr.join_partitioning_groups = mock.MagicMock()
self.mgr.setup_polling_tasks = mock.MagicMock()
self.CONF.set_override('heartbeat', 1.0, group='coordination')
self.mgr.start()
setup_pipeline.assert_called_once_with()
setup_polling.assert_called_once_with()
self.mgr.partition_coordinator.start.assert_called_once_with()
self.mgr.join_partitioning_groups.assert_called_once_with()
self.mgr.setup_polling_tasks.assert_called_once_with()
timer_call = mock.call(1.0, self.mgr.partition_coordinator.heartbeat)
self.assertEqual([timer_call], self.mgr.tg.add_timer.call_args_list)
self.mgr.stop()
self.mgr.partition_coordinator.stop.assert_called_once_with()
@mock.patch('ceilometer.pipeline.setup_pipeline')
def test_start_with_pipeline_poller(self, setup_pipeline):
@mock.patch('ceilometer.pipeline.setup_polling')
def test_start_with_pipeline_poller(self, setup_polling):
self.mgr.join_partitioning_groups = mock.MagicMock()
self.mgr.setup_polling_tasks = mock.MagicMock()
@ -308,7 +299,7 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
self.CONF.set_override('refresh_pipeline_cfg', True)
self.CONF.set_override('pipeline_polling_interval', 5)
self.mgr.start()
setup_pipeline.assert_called_once_with()
setup_polling.assert_called_once_with()
self.mgr.partition_coordinator.start.assert_called_once_with()
self.mgr.join_partitioning_groups.assert_called_once_with()
self.mgr.setup_polling_tasks.assert_called_once_with()
@ -317,84 +308,6 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
self.assertEqual([timer_call, pipeline_poller_call],
self.mgr.tg.add_timer.call_args_list)
def test_start_with_reloadable_pipeline(self):
def setup_pipeline_file(pipeline):
if six.PY3:
pipeline = pipeline.encode('utf-8')
pipeline_cfg_file = fileutils.write_to_tempfile(content=pipeline,
prefix="pipeline",
suffix="yaml")
return pipeline_cfg_file
self.CONF.set_override('heartbeat', 1.0, group='coordination')
self.CONF.set_override('refresh_pipeline_cfg', True)
self.CONF.set_override('pipeline_polling_interval', 2)
pipeline = yaml.dump({
'sources': [{
'name': 'test_pipeline',
'interval': 1,
'meters': ['test'],
'resources': ['test://'] if self.source_resources else [],
'sinks': ['test_sink']}],
'sinks': [{
'name': 'test_sink',
'transformers': [],
'publishers': ["test"]}]
})
pipeline_cfg_file = setup_pipeline_file(pipeline)
self.CONF.set_override("pipeline_cfg_file", pipeline_cfg_file)
self.mgr.tg = os_service.threadgroup.ThreadGroup(1000)
self.mgr.start()
pub = self.mgr.pipeline_manager.pipelines[0].publishers[0]
self.expected_samples = 1
start = timeutils.utcnow()
while timeutils.delta_seconds(start, timeutils.utcnow()) < 600:
if len(pub.samples) >= self.expected_samples:
break
eventlet.sleep(0)
del pub.samples[0].resource_metadata['resources']
self.assertEqual(self.Pollster.test_data, pub.samples[0])
# Flush publisher samples to test reloading
pub.samples = []
# Modify the collection targets
pipeline = yaml.dump({
'sources': [{
'name': 'test_pipeline',
'interval': 1,
'meters': ['testanother'],
'resources': ['test://'] if self.source_resources else [],
'sinks': ['test_sink']}],
'sinks': [{
'name': 'test_sink',
'transformers': [],
'publishers': ["test"]}]
})
updated_pipeline_cfg_file = setup_pipeline_file(pipeline)
# Move/re-name the updated pipeline file to the original pipeline
# file path as recorded in oslo config
shutil.move(updated_pipeline_cfg_file, pipeline_cfg_file)
# Random sleep to let the pipeline poller complete the reloading
eventlet.sleep(3)
pub = self.mgr.pipeline_manager.pipelines[0].publishers[0]
self.expected_samples = 1
start = timeutils.utcnow()
while timeutils.delta_seconds(start, timeutils.utcnow()) < 600:
if len(pub.samples) >= self.expected_samples:
break
eventlet.sleep(0)
del pub.samples[0].resource_metadata['resources']
self.assertEqual(self.PollsterAnother.test_data, pub.samples[0])
def test_join_partitioning_groups(self):
self.mgr.discovery_manager = self.create_discovery_manager()
self.mgr.join_partitioning_groups()
@ -411,16 +324,11 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
def test_setup_polling_tasks(self):
polling_tasks = self.mgr.setup_polling_tasks()
self.assertEqual(1, len(polling_tasks))
self.assertTrue(60 in polling_tasks.keys())
per_task_resources = polling_tasks[60].resources
self.assertTrue('test_pipeline' in polling_tasks.keys())
per_task_resources = polling_tasks['test_pipeline']['task'].resources
self.assertEqual(1, len(per_task_resources))
self.assertEqual(set(self.pipeline_cfg['sources'][0]['resources']),
set(per_task_resources['test_pipeline-test'].get({})))
task = list(polling_tasks.values())[0]
self.mgr.interval_task(task)
pub = self.mgr.pipeline_manager.pipelines[0].publishers[0]
del pub.samples[0].resource_metadata['resources']
self.assertEqual(self.Pollster.test_data, pub.samples[0])
def test_setup_polling_tasks_multiple_interval(self):
self.pipeline_cfg['sources'].append({
@ -430,11 +338,11 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
'resources': ['test://'] if self.source_resources else [],
'sinks': ['test_sink']
})
self.setup_pipeline()
self.setup_polling()
polling_tasks = self.mgr.setup_polling_tasks()
self.assertEqual(2, len(polling_tasks))
self.assertTrue(60 in polling_tasks.keys())
self.assertTrue(10 in polling_tasks.keys())
self.assertTrue('test_pipeline' in polling_tasks.keys())
self.assertTrue('test_pipeline_1' in polling_tasks.keys())
def test_setup_polling_tasks_mismatch_counter(self):
self.pipeline_cfg['sources'].append({
@ -446,58 +354,8 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
})
polling_tasks = self.mgr.setup_polling_tasks()
self.assertEqual(1, len(polling_tasks))
self.assertTrue(60 in polling_tasks.keys())
def test_setup_polling_task_same_interval(self):
self.pipeline_cfg['sources'].append({
'name': 'test_pipeline_1',
'interval': 60,
'meters': ['testanother'],
'resources': ['testanother://'] if self.source_resources else [],
'sinks': ['test_sink']
})
self.setup_pipeline()
polling_tasks = self.mgr.setup_polling_tasks()
self.assertEqual(1, len(polling_tasks))
pollsters = polling_tasks.get(60).pollster_matches
self.assertEqual(2, len(pollsters))
per_task_resources = polling_tasks[60].resources
self.assertEqual(2, len(per_task_resources))
key = 'test_pipeline-test'
self.assertEqual(set(self.pipeline_cfg['sources'][0]['resources']),
set(per_task_resources[key].get({})))
key = 'test_pipeline_1-testanother'
self.assertEqual(set(self.pipeline_cfg['sources'][1]['resources']),
set(per_task_resources[key].get({})))
def test_interval_exception_isolation(self):
self.pipeline_cfg = {
'sources': [{
'name': 'test_pipeline_1',
'interval': 10,
'meters': ['testexceptionanother'],
'resources': ['test://'] if self.source_resources else [],
'sinks': ['test_sink']},
{'name': 'test_pipeline_2',
'interval': 10,
'meters': ['testexception'],
'resources': ['test://'] if self.source_resources else [],
'sinks': ['test_sink']}],
'sinks': [{
'name': 'test_sink',
'transformers': [],
'publishers': ["test"]}]
}
self.mgr.pipeline_manager = pipeline.PipelineManager(
self.pipeline_cfg,
self.transformer_manager)
polling_tasks = self.mgr.setup_polling_tasks()
self.assertEqual(1, len(polling_tasks.keys()))
polling_tasks.get(10)
self.mgr.interval_task(polling_tasks.get(10))
pub = self.mgr.pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(0, len(pub.samples))
self.assertTrue('test_pipeline' in polling_tasks.keys())
self.assertFalse('test_pipeline_1' in polling_tasks.keys())
def test_agent_manager_start(self):
mgr = self.create_manager()
@ -514,7 +372,7 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
'meters': ['testanother'],
'sinks': ['test_sink']
})
self.setup_pipeline()
self.setup_polling()
def _verify_discovery_params(self, expected):
self.assertEqual(expected, self.Discovery.params)
@ -536,9 +394,9 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
'testdiscoverynonexistent',
'testdiscoveryexception']
self.pipeline_cfg['sources'][0]['resources'] = static_resources
self.setup_pipeline()
self.setup_polling()
polling_tasks = self.mgr.setup_polling_tasks()
self.mgr.interval_task(polling_tasks.get(60))
self.mgr.interval_task(polling_tasks['test_pipeline']['task'])
if static_resources:
self.assertEqual(set(static_resources +
self.DiscoveryAnother.resources),
@ -578,9 +436,9 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
self.Discovery.resources = discovered_resources
self.pipeline_cfg['sources'][0]['meters'].append('testanother')
self.pipeline_cfg['sources'][0]['resources'] = []
self.setup_pipeline()
self.setup_polling()
polling_tasks = self.mgr.setup_polling_tasks()
self.mgr.interval_task(polling_tasks.get(60))
self.mgr.interval_task(polling_tasks['test_pipeline']['task'])
self.assertEqual(1, len(self.Discovery.params))
self.assertEqual(discovered_resources, self.Pollster.resources)
self.assertEqual(discovered_resources, self.PollsterAnother.resources)
@ -596,9 +454,9 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
'testdiscovery', 'testdiscoveryanother',
'testdiscoverynonexistent', 'testdiscoveryexception']
self.pipeline_cfg['sources'][0]['resources'] = static_resources
self.setup_pipeline()
self.setup_polling()
polling_tasks = self.mgr.setup_polling_tasks()
self.mgr.interval_task(polling_tasks.get(60))
self.mgr.interval_task(polling_tasks['test_pipeline']['task'])
discovery = self.Discovery.resources + self.DiscoveryAnother.resources
# compare resource lists modulo ordering
self.assertEqual(set(static_resources + discovery),
@ -638,19 +496,16 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
'discovery': ['testdiscoveryanother'],
'sinks': ['test_sink_new']
})
self.pipeline_cfg['sinks'].append({
'name': "test_sink_new",
'transformers': [],
'publishers': ["new"],
})
self.mgr.discovery_manager = self.create_discovery_manager()
self.Discovery.resources = ['discovered_1', 'discovered_2']
self.DiscoveryAnother.resources = ['discovered_3', 'discovered_4']
self.setup_pipeline()
self.setup_polling()
polling_tasks = self.mgr.setup_polling_tasks()
self.assertEqual(1, len(polling_tasks))
self.assertTrue(60 in polling_tasks.keys())
self.mgr.interval_task(polling_tasks.get(60))
self.assertEqual(2, len(polling_tasks))
self.assertTrue('another_pipeline' in polling_tasks.keys())
self.assertTrue('test_pipeline' in polling_tasks.keys())
self.mgr.interval_task(polling_tasks['another_pipeline']['task'])
self.mgr.interval_task(polling_tasks['test_pipeline']['task'])
self.assertEqual([None], self.Discovery.params)
self.assertEqual([None], self.DiscoveryAnother.params)
self.assertEqual(2, len(self.Pollster.samples))
@ -663,23 +518,6 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
self.assertEqual(test_resources, samples[1][1])
else:
self.fail('unexpected sample resources %s' % samples)
all_resources = set(test_resources)
all_resources.update(another_resources)
expected_pipelines = {'test://': 'test_pipeline:test_sink',
'another://': 'another_pipeline:test_sink_new'}
sunk_resources = []
for pipe_line in self.mgr.pipeline_manager.pipelines:
self.assertEqual(1, len(pipe_line.publishers[0].samples))
published = pipe_line.publishers[0].samples[0]
published_resources = published.resource_metadata['resources']
self.assertEqual(3, len(published_resources))
self.assertTrue(published_resources[0] in expected_pipelines)
self.assertEqual(expected_pipelines[published_resources[0]],
pipe_line.name)
for published_resource in published_resources:
self.assertTrue(published_resource in all_resources)
sunk_resources.extend(published_resources)
self.assertEqual(all_resources, set(sunk_resources))
def test_multiple_sources_different_discoverers(self):
self.Discovery.resources = ['discovered_1', 'discovered_2']
@ -702,11 +540,13 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
'publishers': ['test://']}]
self.pipeline_cfg = {'sources': sources, 'sinks': sinks}
self.mgr.discovery_manager = self.create_discovery_manager()
self.setup_pipeline()
self.setup_polling()
polling_tasks = self.mgr.setup_polling_tasks()
self.assertEqual(1, len(polling_tasks))
self.assertTrue(60 in polling_tasks.keys())
self.mgr.interval_task(polling_tasks.get(60))
self.assertEqual(2, len(polling_tasks))
self.assertTrue('test_source_1' in polling_tasks.keys())
self.assertTrue('test_source_2' in polling_tasks.keys())
self.mgr.interval_task(polling_tasks['test_source_1']['task'])
self.mgr.interval_task(polling_tasks['test_source_2']['task'])
self.assertEqual(1, len(self.Pollster.samples))
self.assertEqual(['discovered_1', 'discovered_2'],
self.Pollster.resources)
@ -729,11 +569,11 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
'publishers': ['test://']}]
self.pipeline_cfg = {'sources': sources, 'sinks': sinks}
self.mgr.discovery_manager = self.create_discovery_manager()
self.setup_pipeline()
self.setup_polling()
polling_tasks = self.mgr.setup_polling_tasks()
self.assertEqual(1, len(polling_tasks))
self.assertTrue(60 in polling_tasks.keys())
self.mgr.interval_task(polling_tasks.get(60))
self.assertTrue('test_source_1' in polling_tasks.keys())
self.mgr.interval_task(polling_tasks['test_source_1']['task'])
self.assertEqual(1, len(self.Pollster.samples))
self.assertEqual(['discovered_1', 'discovered_2'],
self.Pollster.resources)
@ -745,9 +585,9 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
'testdiscovery', 'testdiscoveryanother',
'testdiscoverynonexistent', 'testdiscoveryexception']
self.pipeline_cfg['sources'][0]['resources'] = []
self.setup_pipeline()
self.setup_polling()
polling_tasks = self.mgr.setup_polling_tasks()
self.mgr.interval_task(polling_tasks.get(60))
self.mgr.interval_task(polling_tasks['test_pipeline']['task'])
expected = [mock.call(self.mgr.construct_group_id(d.obj.group_id),
d.obj.resources)
for d in self.mgr.discovery_manager
@ -777,9 +617,10 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
'resources': [],
'sinks': ['test_sink']
})
self.setup_pipeline()
self.setup_polling()
polling_tasks = self.mgr.setup_polling_tasks()
self.mgr.interval_task(polling_tasks.get(60))
for meter_name in polling_tasks:
self.mgr.interval_task(polling_tasks[meter_name]['task'])
# Only two groups need to be created, one for each pipeline,
# even though counter test is used twice
expected = [mock.call(self.mgr.construct_group_id(
@ -791,36 +632,3 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
len(p_coord.extract_my_subset.call_args_list))
for c in expected:
self.assertIn(c, p_coord.extract_my_subset.call_args_list)
def test_arithmetic_transformer(self):
self.pipeline_cfg['sources'][0]['meters'] = ['test', 'testanother']
self.pipeline_cfg['sinks'][0]['transformers'] = [
{'name': 'arithmetic',
'parameters': {
'target': {'name': 'test_sum',
'unit': default_test_data.unit,
'type': default_test_data.type,
'expr': '$(test) * 10 + $(testanother)'
}
}}
]
self.setup_pipeline()
self.mgr.setup_polling_tasks()[60].poll_and_publish()
samples = self.mgr.pipeline_manager.pipelines[0].publishers[0].samples
self.assertEqual(1, len(samples))
self.assertEqual('test_sum', samples[0].name)
self.assertEqual(11, samples[0].volume)
@mock.patch('ceilometer.agent.base.LOG')
@mock.patch('ceilometer.tests.agent.agentbase.TestPollster.get_samples')
def test_skip_polling_and_publish_with_no_resources(
self, get_samples, LOG):
self.pipeline_cfg['sources'][0]['resources'] = []
self.setup_pipeline()
polling_task = list(self.mgr.setup_polling_tasks().values())[0]
pollster = list(polling_task.pollster_matches['test_pipeline'])[0]
polling_task.poll_and_publish()
LOG.info.assert_called_with(
'Skip polling pollster %s, no resources found', pollster.name)
self.assertEqual(0, get_samples._mock_call_count)

View File

@ -15,10 +15,18 @@
"""Tests for ceilometer/central/manager.py
"""
import shutil
import eventlet
import mock
from oslo_service import service as os_service
from oslo_utils import fileutils
from oslo_utils import timeutils
from oslotest import base
from oslotest import mockpatch
import six
from stevedore import extension
import yaml
from ceilometer.agent import base as agent_base
from ceilometer.agent import manager
@ -33,7 +41,7 @@ class PollingException(Exception):
class TestManager(base.BaseTestCase):
@mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock())
@mock.patch('ceilometer.pipeline.setup_polling', mock.MagicMock())
def test_load_plugins(self):
mgr = manager.AgentManager()
self.assertIsNotNone(list(mgr.extensions))
@ -176,7 +184,17 @@ class TestRunTasks(agentbase.BaseAgentManagerTestCase):
def create_manager():
return manager.AgentManager()
def fake_notifier_sample(self, ctxt, event_type, payload):
for m in payload:
del m['message_signature']
self.notified_samples.append(m)
def setUp(self):
self.notified_samples = []
notifier = mock.Mock()
notifier.info.side_effect = self.fake_notifier_sample
self.useFixture(mockpatch.Patch('oslo_messaging.Notifier',
return_value=notifier))
self.source_resources = True
super(TestRunTasks, self).setUp()
self.useFixture(mockpatch.Patch(
@ -204,8 +222,7 @@ class TestRunTasks(agentbase.BaseAgentManagerTestCase):
def test_get_sample_resources(self):
polling_tasks = self.mgr.setup_polling_tasks()
task = list(polling_tasks.values())[0]
self.mgr.interval_task(task)
self.mgr.interval_task(polling_tasks['test_pipeline']['task'])
self.assertTrue(self.Pollster.resources)
def test_when_keystone_fail(self):
@ -225,18 +242,12 @@ class TestRunTasks(agentbase.BaseAgentManagerTestCase):
'transformers': [],
'publishers': ["test"]}]
}
self.mgr.pipeline_manager = pipeline.PipelineManager(
self.pipeline_cfg,
self.transformer_manager)
self.mgr.polling_manager = pipeline.PollingManager(self.pipeline_cfg)
polling_tasks = self.mgr.setup_polling_tasks()
task = list(polling_tasks.values())[0]
task = polling_tasks['test_keystone']['task']
self.mgr.interval_task(task)
self.assertFalse(self.PollsterKeystone.samples)
def test_interval_exception_isolation(self):
super(TestRunTasks, self).test_interval_exception_isolation()
self.assertEqual(1, len(self.PollsterException.samples))
self.assertEqual(1, len(self.PollsterExceptionAnother.samples))
self.assertFalse(self.notified_samples)
@mock.patch('ceilometer.agent.base.LOG')
def test_polling_exception(self, LOG):
@ -253,18 +264,102 @@ class TestRunTasks(agentbase.BaseAgentManagerTestCase):
'transformers': [],
'publishers': ["test"]}]
}
self.mgr.pipeline_manager = pipeline.PipelineManager(
self.pipeline_cfg,
self.transformer_manager)
polling_task = list(self.mgr.setup_polling_tasks().values())[0]
self.mgr.polling_manager = pipeline.PollingManager(self.pipeline_cfg)
polling_task = self.mgr.setup_polling_tasks()[source_name]['task']
pollster = list(polling_task.pollster_matches[source_name])[0]
# 2 samples after 4 pollings, as pollster got disabled unpon exception
# 2 samples after 4 pollings, as pollster got disabled upon exception
for x in range(0, 4):
self.mgr.interval_task(polling_task)
pub = self.mgr.pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(2, len(pub.samples))
samples = self.notified_samples
self.assertEqual(2, len(samples))
LOG.error.assert_called_once_with((
'Prevent pollster %(name)s for '
'polling source %(source)s anymore!')
% ({'name': pollster.name, 'source': source_name}))
def test_start_with_reloadable_pipeline(self):
def setup_pipeline_file(pipeline):
if six.PY3:
pipeline = pipeline.encode('utf-8')
pipeline_cfg_file = fileutils.write_to_tempfile(content=pipeline,
prefix="pipeline",
suffix="yaml")
return pipeline_cfg_file
self.CONF.set_override('heartbeat', 1.0, group='coordination')
self.CONF.set_override('refresh_pipeline_cfg', True)
self.CONF.set_override('pipeline_polling_interval', 2)
pipeline = yaml.dump({
'sources': [{
'name': 'test_pipeline',
'interval': 1,
'meters': ['test'],
'resources': ['test://'] if self.source_resources else [],
'sinks': ['test_sink']}],
'sinks': [{
'name': 'test_sink',
'transformers': [],
'publishers': ["test"]}]
})
pipeline_cfg_file = setup_pipeline_file(pipeline)
self.CONF.set_override("pipeline_cfg_file", pipeline_cfg_file)
self.mgr.tg = os_service.threadgroup.ThreadGroup(1000)
self.mgr.start()
expected_samples = 1
start = timeutils.utcnow()
while timeutils.delta_seconds(start, timeutils.utcnow()) < 600:
if len(self.notified_samples) >= expected_samples:
break
eventlet.sleep(0)
# we only got the old name of meters
for sample in self.notified_samples:
self.assertEqual('test', sample['counter_name'])
self.assertEqual(1, sample['counter_volume'])
self.assertEqual('test_run_tasks', sample['resource_id'])
# Modify the collection targets
pipeline = yaml.dump({
'sources': [{
'name': 'test_pipeline',
'interval': 1,
'meters': ['testanother'],
'resources': ['test://'] if self.source_resources else [],
'sinks': ['test_sink']}],
'sinks': [{
'name': 'test_sink',
'transformers': [],
'publishers': ["test"]}]
})
updated_pipeline_cfg_file = setup_pipeline_file(pipeline)
# Move/re-name the updated pipeline file to the original pipeline
# file path as recorded in oslo config
shutil.move(updated_pipeline_cfg_file, pipeline_cfg_file)
# Random sleep to let the pipeline poller complete the reloading
eventlet.sleep(3)
# Flush notified samples to test only new, nothing latent on
# fake message bus.
self.notified_samples = []
expected_samples = 1
start = timeutils.utcnow()
while timeutils.delta_seconds(start, timeutils.utcnow()) < 600:
if len(self.notified_samples) >= expected_samples:
break
eventlet.sleep(0)
# we only got the new name of meters
for sample in self.notified_samples:
self.assertEqual('testanother', sample['counter_name'])
self.assertEqual(1, sample['counter_volume'])
self.assertEqual('test_run_tasks', sample['resource_id'])