Merge "add flexible grouping key"

This commit is contained in:
Jenkins 2015-08-24 16:57:37 +00:00 committed by Gerrit Code Review
commit e1fc1ca8df
8 changed files with 182 additions and 29 deletions

View File

@ -109,8 +109,9 @@ class NotificationService(service_base.BaseService):
if cfg.CONF.notification.workload_partitioning:
pipe_manager = pipeline.SamplePipelineTransportManager()
for pipe in pipeline_manager.pipelines:
key = pipeline.get_pipeline_grouping_key(pipe)
pipe_manager.add_transporter(
(pipe.source.support_meter,
(pipe.source.support_meter, key or ['resource_id'],
self._get_notifiers(transport, pipe)))
else:
pipe_manager = pipeline_manager
@ -126,7 +127,7 @@ class NotificationService(service_base.BaseService):
event_pipe_manager = pipeline.EventPipelineTransportManager()
for pipe in self.event_pipeline_manager.pipelines:
event_pipe_manager.add_transporter(
(pipe.source.support_event,
(pipe.source.support_event, ['event_type'],
self._get_notifiers(transport, pipe)))
else:
event_pipe_manager = self.event_pipeline_manager

View File

@ -125,12 +125,19 @@ class _PipelineTransportManager(object):
def __init__(self):
self.transporters = []
@staticmethod
def hash_grouping(datapoint, grouping_keys):
value = ''
for key in grouping_keys or []:
value += datapoint.get(key) if datapoint.get(key) else ''
return hash(value)
def add_transporter(self, transporter):
self.transporters.append(transporter)
def publisher(self, context):
serializer = self.serializer
hash_to_bucketise = self.hash_to_bucketise
hash_grouping = self.hash_grouping
transporters = self.transporters
filter_attr = self.filter_attr
event_type = self.event_type
@ -144,10 +151,11 @@ class _PipelineTransportManager(object):
data = [data] if not isinstance(data, list) else data
for datapoint in data:
serialized_data = serializer(datapoint)
for d_filter, notifiers in transporters:
for d_filter, grouping_keys, notifiers in transporters:
if d_filter(serialized_data[filter_attr]):
key = (hash_to_bucketise(serialized_data) %
len(notifiers))
key = (hash_grouping(serialized_data,
grouping_keys)
% len(notifiers))
notifier = notifiers[key]
notifier.sample(context.to_dict(),
event_type=event_type,
@ -164,10 +172,6 @@ class SamplePipelineTransportManager(_PipelineTransportManager):
filter_attr = 'counter_name'
event_type = 'ceilometer.pipeline'
@staticmethod
def hash_to_bucketise(datapoint):
return hash(datapoint['resource_id'])
@staticmethod
def serializer(data):
return publisher_utils.meter_message_from_counter(
@ -178,10 +182,6 @@ class EventPipelineTransportManager(_PipelineTransportManager):
filter_attr = 'event_type'
event_type = 'pipeline.event'
@staticmethod
def hash_to_bucketise(datapoint):
return hash(datapoint['event_type'])
@staticmethod
def serializer(data):
return publisher_utils.message_from_event(
@ -853,3 +853,10 @@ def setup_polling():
"""Setup polling manager according to yaml config file."""
cfg_file = cfg.CONF.pipeline_cfg_file
return _setup_polling_manager(cfg_file)
def get_pipeline_grouping_key(pipe):
keys = []
for transformer in pipe.sink.transformers:
keys += transformer.grouping_keys
return list(set(keys))

View File

@ -425,10 +425,50 @@ class TestRealNotificationHA(BaseRealNotification):
mock_notifier.call_args_list[2][1]['event_type'])
self.srv.stop()
@mock.patch('ceilometer.publisher.test.TestPublisher')
def test_multiple_agents(self, fake_publisher_cls):
fake_publisher_cls.return_value = self.publisher
class TestRealNotificationMultipleAgents(tests_base.BaseTestCase):
def setup_pipeline(self, transformers):
pipeline = yaml.dump({
'sources': [{
'name': 'test_pipeline',
'interval': 5,
'meters': ['instance', 'memory'],
'sinks': ['test_sink']
}],
'sinks': [{
'name': 'test_sink',
'transformers': transformers,
'publishers': ['test://']
}]
})
if six.PY3:
pipeline = pipeline.encode('utf-8')
pipeline_cfg_file = fileutils.write_to_tempfile(content=pipeline,
prefix="pipeline",
suffix="yaml")
return pipeline_cfg_file
def setUp(self):
super(TestRealNotificationMultipleAgents, self).setUp()
self.CONF = self.useFixture(fixture_config.Config()).conf
service.prepare_service([])
self.setup_messaging(self.CONF, 'nova')
pipeline_cfg_file = self.setup_pipeline(['instance', 'memory'])
self.CONF.set_override("pipeline_cfg_file", pipeline_cfg_file)
self.CONF.set_override("store_events", False, group="notification")
self.CONF.set_override("disable_non_metric_meters", False,
group="notification")
self.CONF.set_override('workload_partitioning', True,
group='notification')
self.publisher = test_publisher.TestPublisher("")
self.publisher2 = test_publisher.TestPublisher("")
def _check_notifications(self, fake_publisher_cls):
fake_publisher_cls.side_effect = [self.publisher, self.publisher2]
self.srv = notification.NotificationService()
self.srv2 = notification.NotificationService()
with mock.patch('ceilometer.coordination.PartitionCoordinator'
'._get_members', return_value=['harry', 'lloyd']):
@ -448,17 +488,63 @@ class TestRealNotificationHA(BaseRealNotification):
notifier.info(context.RequestContext(), 'compute.instance.create.end',
payload2)
self.expected_samples = 4
self.expected_events = 2
start = timeutils.utcnow()
while timeutils.delta_seconds(start, timeutils.utcnow()) < 60:
if (len(self.publisher.samples) >= self.expected_samples and
len(self.publisher.events) >= self.expected_events):
break
eventlet.sleep(0)
self.srv.stop()
self.srv2.stop()
with mock.patch('six.moves.builtins.hash', lambda x: int(x)):
while timeutils.delta_seconds(start, timeutils.utcnow()) < 60:
if (len(self.publisher.samples + self.publisher2.samples) >=
self.expected_samples):
break
eventlet.sleep(0)
self.srv.stop()
self.srv2.stop()
resources = set(s.resource_id for s in self.publisher.samples)
self.assertEqual(self.expected_samples, len(self.publisher.samples))
self.assertEqual(self.expected_events, len(self.publisher.events))
self.assertEqual(set(['1', '0']), resources)
self.assertEqual(2, len(self.publisher.samples))
self.assertEqual(2, len(self.publisher2.samples))
self.assertEqual(1, len(set(
s.resource_id for s in self.publisher.samples)))
self.assertEqual(1, len(set(
s.resource_id for s in self.publisher2.samples)))
@mock.patch('ceilometer.publisher.test.TestPublisher')
def test_multiple_agents_no_transform(self, fake_publisher_cls):
pipeline_cfg_file = self.setup_pipeline([])
self.CONF.set_override("pipeline_cfg_file", pipeline_cfg_file)
self._check_notifications(fake_publisher_cls)
@mock.patch('ceilometer.publisher.test.TestPublisher')
def test_multiple_agents_transform(self, fake_publisher_cls):
pipeline_cfg_file = self.setup_pipeline(
[{
'name': 'unit_conversion',
'parameters': {
'source': {},
'target': {'name': 'cpu_mins',
'unit': 'min',
'scale': 'volume'},
}
}])
self.CONF.set_override("pipeline_cfg_file", pipeline_cfg_file)
self._check_notifications(fake_publisher_cls)
@mock.patch('ceilometer.publisher.test.TestPublisher')
def test_multiple_agents_multiple_transform(self, fake_publisher_cls):
pipeline_cfg_file = self.setup_pipeline(
[{
'name': 'unit_conversion',
'parameters': {
'source': {},
'target': {'name': 'cpu_mins',
'unit': 'min',
'scale': 'volume'},
}
}, {
'name': 'unit_conversion',
'parameters': {
'source': {},
'target': {'name': 'cpu_mins',
'unit': 'min',
'scale': 'volume'},
}
}])
self.CONF.set_override("pipeline_cfg_file", pipeline_cfg_file)
self._check_notifications(fake_publisher_cls)

View File

@ -86,6 +86,7 @@ class BasePipelineTestCase(base.BaseTestCase):
class TransformerClass(transformer.TransformerBase):
samples = []
grouping_keys = ['counter_name']
def __init__(self, append_name='_update'):
self.__class__.samples = []
@ -111,6 +112,7 @@ class BasePipelineTestCase(base.BaseTestCase):
class TransformerClassDrop(transformer.TransformerBase):
samples = []
grouping_keys = ['resource_id']
def __init__(self):
self.__class__.samples = []
@ -119,6 +121,8 @@ class BasePipelineTestCase(base.BaseTestCase):
self.__class__.samples.append(counter)
class TransformerClassException(object):
grouping_keys = ['resource_id']
@staticmethod
def handle_sample(ctxt, counter):
raise Exception()
@ -1897,3 +1901,48 @@ class BasePipelineTestCase(base.BaseTestCase):
def test_unique_pipeline_names(self):
self._dup_pipeline_name_cfg()
self._exception_create_pipelinemanager()
def test_get_pipeline_grouping_key(self):
transformer_cfg = [
{
'name': 'update',
'parameters': {}
},
{
'name': 'unit_conversion',
'parameters': {
'source': {},
'target': {'name': 'cpu_mins',
'unit': 'min',
'scale': 'volume'},
}
},
{
'name': 'update',
'parameters': {}
},
]
self._set_pipeline_cfg('transformers', transformer_cfg)
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager)
self.assertEqual(set(['resource_id', 'counter_name']),
set(pipeline.get_pipeline_grouping_key(
pipeline_manager.pipelines[0])))
def test_get_pipeline_duplicate_grouping_key(self):
transformer_cfg = [
{
'name': 'update',
'parameters': {}
},
{
'name': 'update',
'parameters': {}
},
]
self._set_pipeline_cfg('transformers', transformer_cfg)
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager)
self.assertEqual(['counter_name'],
pipeline.get_pipeline_grouping_key(
pipeline_manager.pipelines[0]))

View File

@ -43,6 +43,10 @@ class TransformerBase(object):
:param sample: A sample.
"""
@abc.abstractproperty
def grouping_keys(self):
"""Keys used to group transformer."""
def flush(self, context):
"""Flush samples cached previously.

View File

@ -22,6 +22,8 @@ class TransformerAccumulator(transformer.TransformerBase):
And then flushes them out into the wild.
"""
grouping_keys = ['resource_id']
def __init__(self, size=1, **kwargs):
if size >= 1:
self.samples = []

View File

@ -35,6 +35,8 @@ class ArithmeticTransformer(transformer.TransformerBase):
over one or more meters and/or their metadata.
"""
grouping_keys = ['resource_id']
meter_name_re = re.compile(r'\$\(([\w\.\-]+)\)')
def __init__(self, target=None, **kwargs):

View File

@ -30,6 +30,8 @@ LOG = log.getLogger(__name__)
class ScalingTransformer(transformer.TransformerBase):
"""Transformer to apply a scaling conversion."""
grouping_keys = ['resource_id']
def __init__(self, source=None, target=None, **kwargs):
"""Initialize transformer with configured parameters.