add flexible grouping key

all transformers will now require a grouping key. the pipeline will
take grouping keys from all transformers in it's definition. using
the set of keys, the values will be pulled from the datapoint, hashed,
and sent to a pipeline. if no transformers are applied, samples will
be grouped by resource_id and events will be grouped by event_type.

Implement blueprint distributed-coordinated-notifications

Change-Id: Ief462d19655c238e5951881a58e183084d37ac13
This commit is contained in:
gordon chung 2015-07-21 10:52:39 -04:00
parent 003047bdb5
commit c79b598259
8 changed files with 182 additions and 29 deletions

View File

@ -109,8 +109,9 @@ class NotificationService(service_base.BaseService):
if cfg.CONF.notification.workload_partitioning:
pipe_manager = pipeline.SamplePipelineTransportManager()
for pipe in pipeline_manager.pipelines:
key = pipeline.get_pipeline_grouping_key(pipe)
pipe_manager.add_transporter(
(pipe.source.support_meter,
(pipe.source.support_meter, key or ['resource_id'],
self._get_notifiers(transport, pipe)))
else:
pipe_manager = pipeline_manager
@ -126,7 +127,7 @@ class NotificationService(service_base.BaseService):
event_pipe_manager = pipeline.EventPipelineTransportManager()
for pipe in self.event_pipeline_manager.pipelines:
event_pipe_manager.add_transporter(
(pipe.source.support_event,
(pipe.source.support_event, ['event_type'],
self._get_notifiers(transport, pipe)))
else:
event_pipe_manager = self.event_pipeline_manager

View File

@ -125,12 +125,19 @@ class _PipelineTransportManager(object):
def __init__(self):
self.transporters = []
@staticmethod
def hash_grouping(datapoint, grouping_keys):
value = ''
for key in grouping_keys or []:
value += datapoint.get(key) if datapoint.get(key) else ''
return hash(value)
def add_transporter(self, transporter):
self.transporters.append(transporter)
def publisher(self, context):
serializer = self.serializer
hash_to_bucketise = self.hash_to_bucketise
hash_grouping = self.hash_grouping
transporters = self.transporters
filter_attr = self.filter_attr
event_type = self.event_type
@ -144,10 +151,11 @@ class _PipelineTransportManager(object):
data = [data] if not isinstance(data, list) else data
for datapoint in data:
serialized_data = serializer(datapoint)
for d_filter, notifiers in transporters:
for d_filter, grouping_keys, notifiers in transporters:
if d_filter(serialized_data[filter_attr]):
key = (hash_to_bucketise(serialized_data) %
len(notifiers))
key = (hash_grouping(serialized_data,
grouping_keys)
% len(notifiers))
notifier = notifiers[key]
notifier.sample(context.to_dict(),
event_type=event_type,
@ -164,10 +172,6 @@ class SamplePipelineTransportManager(_PipelineTransportManager):
filter_attr = 'counter_name'
event_type = 'ceilometer.pipeline'
@staticmethod
def hash_to_bucketise(datapoint):
return hash(datapoint['resource_id'])
@staticmethod
def serializer(data):
return publisher_utils.meter_message_from_counter(
@ -178,10 +182,6 @@ class EventPipelineTransportManager(_PipelineTransportManager):
filter_attr = 'event_type'
event_type = 'pipeline.event'
@staticmethod
def hash_to_bucketise(datapoint):
return hash(datapoint['event_type'])
@staticmethod
def serializer(data):
return publisher_utils.message_from_event(
@ -849,3 +849,10 @@ def setup_polling():
"""Setup polling manager according to yaml config file."""
cfg_file = cfg.CONF.pipeline_cfg_file
return _setup_polling_manager(cfg_file)
def get_pipeline_grouping_key(pipe):
keys = []
for transformer in pipe.sink.transformers:
keys += transformer.grouping_keys
return list(set(keys))

View File

@ -425,10 +425,50 @@ class TestRealNotificationHA(BaseRealNotification):
mock_notifier.call_args_list[2][1]['event_type'])
self.srv.stop()
@mock.patch('ceilometer.publisher.test.TestPublisher')
def test_multiple_agents(self, fake_publisher_cls):
fake_publisher_cls.return_value = self.publisher
class TestRealNotificationMultipleAgents(tests_base.BaseTestCase):
def setup_pipeline(self, transformers):
pipeline = yaml.dump({
'sources': [{
'name': 'test_pipeline',
'interval': 5,
'meters': ['instance', 'memory'],
'sinks': ['test_sink']
}],
'sinks': [{
'name': 'test_sink',
'transformers': transformers,
'publishers': ['test://']
}]
})
if six.PY3:
pipeline = pipeline.encode('utf-8')
pipeline_cfg_file = fileutils.write_to_tempfile(content=pipeline,
prefix="pipeline",
suffix="yaml")
return pipeline_cfg_file
def setUp(self):
super(TestRealNotificationMultipleAgents, self).setUp()
self.CONF = self.useFixture(fixture_config.Config()).conf
service.prepare_service([])
self.setup_messaging(self.CONF, 'nova')
pipeline_cfg_file = self.setup_pipeline(['instance', 'memory'])
self.CONF.set_override("pipeline_cfg_file", pipeline_cfg_file)
self.CONF.set_override("store_events", False, group="notification")
self.CONF.set_override("disable_non_metric_meters", False,
group="notification")
self.CONF.set_override('workload_partitioning', True,
group='notification')
self.publisher = test_publisher.TestPublisher("")
self.publisher2 = test_publisher.TestPublisher("")
def _check_notifications(self, fake_publisher_cls):
fake_publisher_cls.side_effect = [self.publisher, self.publisher2]
self.srv = notification.NotificationService()
self.srv2 = notification.NotificationService()
with mock.patch('ceilometer.coordination.PartitionCoordinator'
'._get_members', return_value=['harry', 'lloyd']):
@ -448,17 +488,63 @@ class TestRealNotificationHA(BaseRealNotification):
notifier.info(context.RequestContext(), 'compute.instance.create.end',
payload2)
self.expected_samples = 4
self.expected_events = 2
start = timeutils.utcnow()
while timeutils.delta_seconds(start, timeutils.utcnow()) < 60:
if (len(self.publisher.samples) >= self.expected_samples and
len(self.publisher.events) >= self.expected_events):
break
eventlet.sleep(0)
self.srv.stop()
self.srv2.stop()
with mock.patch('six.moves.builtins.hash', lambda x: int(x)):
while timeutils.delta_seconds(start, timeutils.utcnow()) < 60:
if (len(self.publisher.samples + self.publisher2.samples) >=
self.expected_samples):
break
eventlet.sleep(0)
self.srv.stop()
self.srv2.stop()
resources = set(s.resource_id for s in self.publisher.samples)
self.assertEqual(self.expected_samples, len(self.publisher.samples))
self.assertEqual(self.expected_events, len(self.publisher.events))
self.assertEqual(set(['1', '0']), resources)
self.assertEqual(2, len(self.publisher.samples))
self.assertEqual(2, len(self.publisher2.samples))
self.assertEqual(1, len(set(
s.resource_id for s in self.publisher.samples)))
self.assertEqual(1, len(set(
s.resource_id for s in self.publisher2.samples)))
@mock.patch('ceilometer.publisher.test.TestPublisher')
def test_multiple_agents_no_transform(self, fake_publisher_cls):
pipeline_cfg_file = self.setup_pipeline([])
self.CONF.set_override("pipeline_cfg_file", pipeline_cfg_file)
self._check_notifications(fake_publisher_cls)
@mock.patch('ceilometer.publisher.test.TestPublisher')
def test_multiple_agents_transform(self, fake_publisher_cls):
pipeline_cfg_file = self.setup_pipeline(
[{
'name': 'unit_conversion',
'parameters': {
'source': {},
'target': {'name': 'cpu_mins',
'unit': 'min',
'scale': 'volume'},
}
}])
self.CONF.set_override("pipeline_cfg_file", pipeline_cfg_file)
self._check_notifications(fake_publisher_cls)
@mock.patch('ceilometer.publisher.test.TestPublisher')
def test_multiple_agents_multiple_transform(self, fake_publisher_cls):
pipeline_cfg_file = self.setup_pipeline(
[{
'name': 'unit_conversion',
'parameters': {
'source': {},
'target': {'name': 'cpu_mins',
'unit': 'min',
'scale': 'volume'},
}
}, {
'name': 'unit_conversion',
'parameters': {
'source': {},
'target': {'name': 'cpu_mins',
'unit': 'min',
'scale': 'volume'},
}
}])
self.CONF.set_override("pipeline_cfg_file", pipeline_cfg_file)
self._check_notifications(fake_publisher_cls)

View File

@ -86,6 +86,7 @@ class BasePipelineTestCase(base.BaseTestCase):
class TransformerClass(transformer.TransformerBase):
samples = []
grouping_keys = ['counter_name']
def __init__(self, append_name='_update'):
self.__class__.samples = []
@ -111,6 +112,7 @@ class BasePipelineTestCase(base.BaseTestCase):
class TransformerClassDrop(transformer.TransformerBase):
samples = []
grouping_keys = ['resource_id']
def __init__(self):
self.__class__.samples = []
@ -119,6 +121,8 @@ class BasePipelineTestCase(base.BaseTestCase):
self.__class__.samples.append(counter)
class TransformerClassException(object):
grouping_keys = ['resource_id']
@staticmethod
def handle_sample(ctxt, counter):
raise Exception()
@ -1897,3 +1901,48 @@ class BasePipelineTestCase(base.BaseTestCase):
def test_unique_pipeline_names(self):
self._dup_pipeline_name_cfg()
self._exception_create_pipelinemanager()
def test_get_pipeline_grouping_key(self):
transformer_cfg = [
{
'name': 'update',
'parameters': {}
},
{
'name': 'unit_conversion',
'parameters': {
'source': {},
'target': {'name': 'cpu_mins',
'unit': 'min',
'scale': 'volume'},
}
},
{
'name': 'update',
'parameters': {}
},
]
self._set_pipeline_cfg('transformers', transformer_cfg)
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager)
self.assertEqual(set(['resource_id', 'counter_name']),
set(pipeline.get_pipeline_grouping_key(
pipeline_manager.pipelines[0])))
def test_get_pipeline_duplicate_grouping_key(self):
transformer_cfg = [
{
'name': 'update',
'parameters': {}
},
{
'name': 'update',
'parameters': {}
},
]
self._set_pipeline_cfg('transformers', transformer_cfg)
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager)
self.assertEqual(['counter_name'],
pipeline.get_pipeline_grouping_key(
pipeline_manager.pipelines[0]))

View File

@ -43,6 +43,10 @@ class TransformerBase(object):
:param sample: A sample.
"""
@abc.abstractproperty
def grouping_keys(self):
"""Keys used to group transformer."""
def flush(self, context):
"""Flush samples cached previously.

View File

@ -22,6 +22,8 @@ class TransformerAccumulator(transformer.TransformerBase):
And then flushes them out into the wild.
"""
grouping_keys = ['resource_id']
def __init__(self, size=1, **kwargs):
if size >= 1:
self.samples = []

View File

@ -35,6 +35,8 @@ class ArithmeticTransformer(transformer.TransformerBase):
over one or more meters and/or their metadata.
"""
grouping_keys = ['resource_id']
meter_name_re = re.compile(r'\$\(([\w\.\-]+)\)')
def __init__(self, target=None, **kwargs):

View File

@ -30,6 +30,8 @@ LOG = log.getLogger(__name__)
class ScalingTransformer(transformer.TransformerBase):
"""Transformer to apply a scaling conversion."""
grouping_keys = ['resource_id']
def __init__(self, source=None, target=None, **kwargs):
"""Initialize transformer with configured parameters.