make telemetry sample payloads dictionaries

this patch makes the payload we generate via polling agents and
api a dictionary. this gives us more flexibility to include metadata
on how the data was generated that we don't necessary want to
include in sample metadata.

Change-Id: I1350c78c52fad8111241b3f77698fef6c4aa77a9
Closes-Bug: #1484937
This commit is contained in:
gordon chung 2015-08-17 16:20:49 -04:00
parent fcf5b4e7e2
commit 988a573f7f
6 changed files with 42 additions and 49 deletions

View File

@ -117,10 +117,7 @@ class PollingTask(object):
resource_factory = lambda: Resources(agent_manager) resource_factory = lambda: Resources(agent_manager)
self.resources = collections.defaultdict(resource_factory) self.resources = collections.defaultdict(resource_factory)
if cfg.CONF.batch_polled_samples: self._batch = cfg.CONF.batch_polled_samples
self._handle_sample = self._assemble_samples
else:
self._handle_sample = self._send_notification
self._telemetry_secret = cfg.CONF.publisher.telemetry_secret self._telemetry_secret = cfg.CONF.publisher.telemetry_secret
def add(self, pollster, source): def add(self, pollster, source):
@ -179,10 +176,11 @@ class PollingTask(object):
publisher_utils.meter_message_from_counter( publisher_utils.meter_message_from_counter(
sample, self._telemetry_secret sample, self._telemetry_secret
)) ))
self._handle_sample([sample_dict], sample_batch) if self._batch:
sample_batch.append(sample_dict)
else:
self._send_notification([sample_dict])
# sample_batch will contain samples if
# cfg.CONF.batch_polled_samples is True
if sample_batch: if sample_batch:
self._send_notification(sample_batch) self._send_notification(sample_batch)
@ -198,15 +196,11 @@ class PollingTask(object):
% ({'name': pollster.name, 'error': err}), % ({'name': pollster.name, 'error': err}),
exc_info=True) exc_info=True)
@staticmethod def _send_notification(self, samples):
def _assemble_samples(samples, batch):
batch.extend(samples)
def _send_notification(self, samples, batch=None):
self.manager.notifier.info( self.manager.notifier.info(
self.manager.context.to_dict(), self.manager.context.to_dict(),
'telemetry.api', 'telemetry.polling',
samples {'samples': samples}
) )

View File

@ -367,7 +367,8 @@ class MeterController(rest.RestController):
tenant=def_project_id, tenant=def_project_id,
is_admin=True) is_admin=True)
notifier = pecan.request.notifier notifier = pecan.request.notifier
notifier.info(ctxt.to_dict(), 'telemetry.api', published_samples) notifier.info(ctxt.to_dict(), 'telemetry.api',
{'samples': published_samples})
return samples return samples

View File

@ -45,10 +45,10 @@ class TelemetryBase(plugin_base.NotificationBase):
class TelemetryApiPost(TelemetryBase): class TelemetryApiPost(TelemetryBase):
"""Handle sample from notification bus, which is posted via API.""" """Handle sample from notification bus, which is posted via API."""
event_types = ['telemetry.api'] event_types = ['telemetry.api', 'telemetry.polling']
def process_notification(self, message): def process_notification(self, message):
samples = message['payload'] samples = message['payload']['samples']
for sample_dict in samples: for sample_dict in samples:
yield sample.Sample( yield sample.Sample(
name=sample_dict['counter_name'], name=sample_dict['counter_name'],

View File

@ -29,9 +29,10 @@ from ceilometer.tests.functional.api import v2
class TestPostSamples(v2.FunctionalTest, class TestPostSamples(v2.FunctionalTest,
tests_db.MixinTestsWithBackendScenarios): tests_db.MixinTestsWithBackendScenarios):
def fake_notifier_sample(self, ctxt, event_type, payload): def fake_notifier_sample(self, ctxt, event_type, payload):
for m in payload: samples = payload['samples']
for m in samples:
del m['message_signature'] del m['message_signature']
self.published.append(payload) self.published.append(samples)
def setUp(self): def setUp(self):
self.published = [] self.published = []

View File

@ -201,7 +201,7 @@ class TestRunTasks(agentbase.BaseAgentManagerTestCase):
return pipeline_cfg_file return pipeline_cfg_file
def fake_notifier_sample(self, ctxt, event_type, payload): def fake_notifier_sample(self, ctxt, event_type, payload):
for m in payload: for m in payload['samples']:
del m['message_signature'] del m['message_signature']
self.notified_samples.append(m) self.notified_samples.append(m)

View File

@ -21,34 +21,31 @@ NOTIFICATION = {
'timestamp': u'2015-06-1909: 19: 35.786893', 'timestamp': u'2015-06-1909: 19: 35.786893',
u'_context_auth_token': None, u'_context_auth_token': None,
u'_context_read_only': False, u'_context_read_only': False,
'payload': [{ 'payload': {'samples':
u'counter_name': u'instance100', [{'counter_name': u'instance100',
u'user_id': u'e1d870e51c7340cb9d555b15cbfcaec2', u'user_id': u'e1d870e51c7340cb9d555b15cbfcaec2',
u'resource_id': u'instance', u'resource_id': u'instance',
u'timestamp': u'2015-06-19T09: 19: 35.785330', u'timestamp': u'2015-06-19T09: 19: 35.785330',
u'message_signature': u'fake_signature1', u'message_signature': u'fake_signature1',
u'resource_metadata': {u'foo': u'bar'}, u'resource_metadata': {u'foo': u'bar'},
u'source': u'30be1fc9a03c4e94ab05c403a8a377f2: openstack', u'source': u'30be1fc9a03c4e94ab05c403a8a377f2: openstack',
u'counter_unit': u'instance', u'counter_unit': u'instance',
u'counter_volume': 1.0, u'counter_volume': 1.0,
u'project_id': u'30be1fc9a03c4e94ab05c403a8a377f2', u'project_id': u'30be1fc9a03c4e94ab05c403a8a377f2',
u'message_id': u'4d865c6e-1664-11e5-9d41-0819a6cff905', u'message_id': u'4d865c6e-1664-11e5-9d41-0819a6cff905',
u'counter_type': u'gauge' u'counter_type': u'gauge'},
}, {u'counter_name': u'instance100',
{ u'user_id': u'e1d870e51c7340cb9d555b15cbfcaec2',
u'counter_name': u'instance100', u'resource_id': u'instance',
u'user_id': u'e1d870e51c7340cb9d555b15cbfcaec2', u'timestamp': u'2015-06-19T09: 19: 35.785330',
u'resource_id': u'instance', u'message_signature': u'fake_signature12',
u'timestamp': u'2015-06-19T09: 19: 35.785330', u'resource_metadata': {u'foo': u'bar'},
u'message_signature': u'fake_signature12', u'source': u'30be1fc9a03c4e94ab05c403a8a377f2: openstack',
u'resource_metadata': {u'foo': u'bar'}, u'counter_unit': u'instance',
u'source': u'30be1fc9a03c4e94ab05c403a8a377f2: openstack', u'counter_volume': 1.0,
u'counter_unit': u'instance', u'project_id': u'30be1fc9a03c4e94ab05c403a8a377f2',
u'counter_volume': 1.0, u'message_id': u'4d866da8-1664-11e5-9d41-0819a6cff905',
u'project_id': u'30be1fc9a03c4e94ab05c403a8a377f2', u'counter_type': u'gauge'}]},
u'message_id': u'4d866da8-1664-11e5-9d41-0819a6cff905',
u'counter_type': u'gauge'
}],
u'_context_resource_uuid': None, u'_context_resource_uuid': None,
u'_context_user_identity': u'fake_user_identity---', u'_context_user_identity': u'fake_user_identity---',
u'_context_show_deleted': False, u'_context_show_deleted': False,
@ -69,7 +66,7 @@ class TelemetryApiPostTestCase(base.BaseTestCase):
sample_creation = notifications.TelemetryApiPost(None) sample_creation = notifications.TelemetryApiPost(None)
samples = list(sample_creation.process_notification(NOTIFICATION)) samples = list(sample_creation.process_notification(NOTIFICATION))
self.assertEqual(2, len(samples)) self.assertEqual(2, len(samples))
payload = NOTIFICATION["payload"] payload = NOTIFICATION["payload"]['samples']
for index, sample in enumerate(samples): for index, sample in enumerate(samples):
self.assertEqual(payload[index]["user_id"], sample.user_id) self.assertEqual(payload[index]["user_id"], sample.user_id)
self.assertEqual(payload[index]["counter_name"], sample.name) self.assertEqual(payload[index]["counter_name"], sample.name)