Allow to publish several counters in a row

Support publishing of multiple counters at once. This change the pipeline
and RPC calls to send a list of counter by default instead of only once
counter. This allows to make only one RPC calls when sending multiple
counters to be more efficient.

This implements blueprint publish-counters-list-rpc

Change-Id: Ie4155b35585f261e6ff9816e5a845a479151eefd
Signed-off-by: Julien Danjou <julien@danjou.info>
This commit is contained in:
Julien Danjou 2013-02-05 11:32:58 +01:00
parent 3340461656
commit 59b5a5a488
11 changed files with 177 additions and 89 deletions

View File

@ -18,7 +18,7 @@
from stevedore import dispatch from stevedore import dispatch
from ceilometer.collector import meter from ceilometer.collector import meter as meter_api
from ceilometer import extension_manager from ceilometer import extension_manager
from ceilometer import pipeline from ceilometer import pipeline
from ceilometer import service from ceilometer import service
@ -124,27 +124,32 @@ class CollectorService(service.PeriodicService):
"""This method is triggered when metering data is """This method is triggered when metering data is
cast from an agent. cast from an agent.
""" """
#LOG.info('metering data: %r', data) # We may have receive only one counter on the wire
if not isinstance(data, list):
data = [data]
for meter in data:
LOG.info('metering data %s for %s @ %s: %s', LOG.info('metering data %s for %s @ %s: %s',
data['counter_name'], meter['counter_name'],
data['resource_id'], meter['resource_id'],
data.get('timestamp', 'NO TIMESTAMP'), meter.get('timestamp', 'NO TIMESTAMP'),
data['counter_volume']) meter['counter_volume'])
if not meter.verify_signature(data, cfg.CONF.metering_secret): if meter_api.verify_signature(meter, cfg.CONF.metering_secret):
LOG.warning('message signature invalid, discarding message: %r',
data)
else:
try: try:
# Convert the timestamp to a datetime instance. # Convert the timestamp to a datetime instance.
# Storage engines are responsible for converting # Storage engines are responsible for converting
# that value to something they can store. # that value to something they can store.
if data.get('timestamp'): if meter.get('timestamp'):
ts = timeutils.parse_isotime(data['timestamp']) ts = timeutils.parse_isotime(meter['timestamp'])
data['timestamp'] = timeutils.normalize_time(ts) meter['timestamp'] = timeutils.normalize_time(ts)
self.storage_conn.record_metering_data(data) self.storage_conn.record_metering_data(meter)
except Exception as err: except Exception as err:
LOG.error('Failed to record metering data: %s', err) LOG.error('Failed to record metering data: %s', err)
LOG.exception(err) LOG.exception(err)
else:
LOG.warning(
'message signature invalid, discarding message: %r',
meter)
def periodic_tasks(self, context): def periodic_tasks(self, context):
pass pass

View File

@ -96,9 +96,9 @@ class CeilometerMiddleware(object):
now = timeutils.utcnow().isoformat() now = timeutils.utcnow().isoformat()
if bytes_received: if bytes_received:
self.pipeline_manager.publish_counter( self.pipeline_manager.publish_counters(
context.get_admin_context(), context.get_admin_context(),
counter.Counter( [counter.Counter(
name='storage.objects.incoming.bytes', name='storage.objects.incoming.bytes',
type='delta', type='delta',
unit='B', unit='B',
@ -112,13 +112,13 @@ class CeilometerMiddleware(object):
"version": version, "version": version,
"container": container, "container": container,
"object": obj, "object": obj,
}, ), }, )],
cfg.CONF.counter_source) cfg.CONF.counter_source)
if bytes_sent: if bytes_sent:
self.pipeline_manager.publish_counter( self.pipeline_manager.publish_counters(
context.get_admin_context(), context.get_admin_context(),
counter.Counter( [counter.Counter(
name='storage.objects.outgoing.bytes', name='storage.objects.outgoing.bytes',
type='delta', type='delta',
unit='B', unit='B',
@ -132,7 +132,7 @@ class CeilometerMiddleware(object):
"version": version, "version": version,
"container": container, "container": container,
"object": obj, "object": obj,
}), })],
cfg.CONF.counter_source) cfg.CONF.counter_source)

View File

@ -16,12 +16,12 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import itertools
import os import os
from stevedore import extension from stevedore import extension
import yaml import yaml
from ceilometer import extension_manager
from ceilometer.openstack.common import cfg from ceilometer.openstack.common import cfg
from ceilometer.openstack.common import log from ceilometer.openstack.common import log
@ -170,13 +170,12 @@ class Pipeline(object):
return transformers return transformers
def _publish_counter_to_one_publisher(self, ext, ctxt, counter, source): def _publish_counters_to_one_publisher(self, ext, ctxt, counters, source):
try: try:
ext.obj.publish_counter(ctxt, counter, source) ext.obj.publish_counters(ctxt, counters, source)
except Exception as err: except Exception as err:
LOG.warning("Pipeline %s: Continue after error " LOG.warning("Pipeline %s: Continue after error "
"from publisher %s for %s", "from publisher %s", self, ext.name)
self, ext.name, counter)
LOG.exception(err) LOG.exception(err)
def _transform_counter(self, start, ctxt, counter, source): def _transform_counter(self, start, ctxt, counter, source):
@ -194,36 +193,45 @@ class Pipeline(object):
self, transformer, counter) self, transformer, counter)
LOG.exception(err) LOG.exception(err)
def _publish_counter(self, start, ctxt, counter, source): def _publish_counters(self, start, ctxt, counters, source):
"""Push counter into pipeline for publishing. """Push counter into pipeline for publishing.
param start: the first transformer that the counter will be injected. param start: the first transformer that the counter will be injected.
This is mainly for flush() invocation that transformer This is mainly for flush() invocation that transformer
may emit counters may emit counters
param ctxt: execution context from the manager or service param ctxt: execution context from the manager or service
param counter: counter param counters: counter list
param source: counter source param source: counter source
""" """
transformed_counters = []
for counter in counters:
LOG.audit("Pipeline %s: Transform counter %s from %s transformer", LOG.audit("Pipeline %s: Transform counter %s from %s transformer",
self, counter, start) self, counter, start)
counter = self._transform_counter(start, ctxt, counter, source) counter = self._transform_counter(start, ctxt, counter, source)
if not counter: if counter:
return transformed_counters.append(counter)
LOG.audit("Pipeline %s: Publish counter %s", self, counter) LOG.audit("Pipeline %s: Publishing counters", self)
self.publisher_manager.map(self.publishers, self.publisher_manager.map(self.publishers,
self._publish_counter_to_one_publisher, self._publish_counters_to_one_publisher,
ctxt=ctxt, ctxt=ctxt,
counter=counter, counters=transformed_counters,
source=source, source=source,
) )
LOG.audit("Pipeline %s: Published counter %s", self, counter) LOG.audit("Pipeline %s: Published counters", self)
def publish_counter(self, ctxt, counter, source): def publish_counter(self, ctxt, counter, source):
if self.support_counter(counter.name): self.publish_counters(ctxt, [counter], source)
self._publish_counter(0, ctxt, counter, source)
def publish_counters(self, ctxt, counters, source):
for counter_name, counters in itertools.groupby(
sorted(counters, key=lambda c: c.name),
lambda c: c.name):
if self.support_counter(counter_name):
self._publish_counters(0, ctxt, counters, source)
# (yjiang5) To support counters like instance:m1.tiny, # (yjiang5) To support counters like instance:m1.tiny,
# which include variable part at the end starting with ':'. # which include variable part at the end starting with ':'.
@ -251,8 +259,9 @@ class Pipeline(object):
LOG.audit("Flush pipeline %s", self) LOG.audit("Flush pipeline %s", self)
for (i, transformer) in enumerate(self.transformers): for (i, transformer) in enumerate(self.transformers):
for c in transformer.flush(ctxt, source): self._publish_counters(i + 1, ctxt,
self._publish_counter(i + 1, ctxt, c, source) list(transformer.flush(ctxt, source)),
source)
def get_interval(self): def get_interval(self):
return self.interval return self.interval

View File

@ -94,8 +94,8 @@ class PublisherBase(PluginBase):
__metaclass__ = abc.ABCMeta __metaclass__ = abc.ABCMeta
@abc.abstractmethod @abc.abstractmethod
def publish_counter(self, context, counter, source): def publish_counters(self, context, counters, source):
"publish counters into final conduit" "Publish counters into final conduit"
class TransformerBase(PluginBase): class TransformerBase(PluginBase):

View File

@ -18,7 +18,9 @@
"""Publish a counter using the preferred RPC mechanism. """Publish a counter using the preferred RPC mechanism.
""" """
from ceilometer.collector import meter import itertools
from ceilometer.collector import meter as meter_api
from ceilometer.openstack.common import cfg from ceilometer.openstack.common import cfg
from ceilometer.openstack.common import log from ceilometer.openstack.common import log
from ceilometer.openstack.common import rpc from ceilometer.openstack.common import rpc
@ -44,23 +46,36 @@ register_opts(cfg.CONF)
class MeterPublisher(plugin.PublisherBase): class MeterPublisher(plugin.PublisherBase):
def publish_counter(self, context, counter, source): def publish_counters(self, context, counters, source):
"""Send a metering message for publishing """Send a metering message for publishing
:param context: Execution context from the service or RPC call :param context: Execution context from the service or RPC call
:param counter: Counter from pipeline after transformation :param counter: Counter from pipeline after transformation
:param source: counter source :param source: counter source
""" """
meters = [
meter_api.meter_message_from_counter(counter,
cfg.CONF.metering_secret,
source)
for counter in counters
]
topic = cfg.CONF.metering_topic topic = cfg.CONF.metering_topic
msg = { msg = {
'method': 'record_metering_data', 'method': 'record_metering_data',
'version': '1.0', 'version': '1.0',
'args': {'data': meter.meter_message_from_counter( 'args': {'data': meters},
counter,
cfg.CONF.metering_secret,
source),
},
} }
LOG.debug('PUBLISH: %s', str(msg)) LOG.debug('PUBLISH: %s', str(msg))
rpc.cast(context, topic, msg) rpc.cast(context, topic, msg)
rpc.cast(context, topic + '.' + counter.name, msg)
for meter_name, meter_list in itertools.groupby(
sorted(meters, key=lambda m: m['counter_name']),
lambda m: m['counter_name']):
msg = {
'method': 'record_metering_data',
'version': '1.0',
'args': {'data': list(meter_list)},
}
rpc.cast(context, topic + '.' + meter_name, msg)

View File

@ -19,13 +19,13 @@
"""Test base classes. """Test base classes.
""" """
import unittest import unittest2
import mox import mox
import stubout import stubout
class TestCase(unittest.TestCase): class TestCase(unittest2.TestCase):
def setUp(self): def setUp(self):
super(TestCase, self).setUp() super(TestCase, self).setUp()

View File

@ -46,8 +46,8 @@ class TestSwiftMiddleware(base.TestCase):
def __init__(self): def __init__(self):
self.counters = [] self.counters = []
def publish_counter(self, context, counter, source): def publish_counters(self, context, counters, source):
self.counters.append(counter) self.counters.extend(counters)
def _faux_setup_pipeline(self, publisher_manager): def _faux_setup_pipeline(self, publisher_manager):
return self.pipeline_manager return self.pipeline_manager

View File

@ -20,6 +20,7 @@
import datetime import datetime
from ceilometer.openstack.common import cfg
from ceilometer.openstack.common import rpc from ceilometer.openstack.common import rpc
from ceilometer.tests import base from ceilometer.tests import base
@ -29,7 +30,8 @@ from ceilometer.publisher import meter_publish
class TestPublish(base.TestCase): class TestPublish(base.TestCase):
test_data = counter.Counter( test_data = [
counter.Counter(
name='test', name='test',
type=counter.TYPE_CUMULATIVE, type=counter.TYPE_CUMULATIVE,
unit='', unit='',
@ -39,24 +41,79 @@ class TestPublish(base.TestCase):
resource_id='test_run_tasks', resource_id='test_run_tasks',
timestamp=datetime.datetime.utcnow().isoformat(), timestamp=datetime.datetime.utcnow().isoformat(),
resource_metadata={'name': 'TestPublish'}, resource_metadata={'name': 'TestPublish'},
) ),
counter.Counter(
name='test',
type=counter.TYPE_CUMULATIVE,
unit='',
volume=1,
user_id='test',
project_id='test',
resource_id='test_run_tasks',
timestamp=datetime.datetime.utcnow().isoformat(),
resource_metadata={'name': 'TestPublish'},
),
counter.Counter(
name='test2',
type=counter.TYPE_CUMULATIVE,
unit='',
volume=1,
user_id='test',
project_id='test',
resource_id='test_run_tasks',
timestamp=datetime.datetime.utcnow().isoformat(),
resource_metadata={'name': 'TestPublish'},
),
counter.Counter(
name='test2',
type=counter.TYPE_CUMULATIVE,
unit='',
volume=1,
user_id='test',
project_id='test',
resource_id='test_run_tasks',
timestamp=datetime.datetime.utcnow().isoformat(),
resource_metadata={'name': 'TestPublish'},
),
counter.Counter(
name='test3',
type=counter.TYPE_CUMULATIVE,
unit='',
volume=1,
user_id='test',
project_id='test',
resource_id='test_run_tasks',
timestamp=datetime.datetime.utcnow().isoformat(),
resource_metadata={'name': 'TestPublish'},
),
]
def faux_notify(self, context, topic, msg): def faux_cast(self, context, topic, msg):
self.notifications.append((topic, msg)) self.published.append((topic, msg))
def setUp(self): def setUp(self):
super(TestPublish, self).setUp() super(TestPublish, self).setUp()
self.notifications = [] self.published = []
self.stubs.Set(rpc, 'cast', self.faux_notify) self.stubs.Set(rpc, 'cast', self.faux_cast)
publisher = meter_publish.MeterPublisher() publisher = meter_publish.MeterPublisher()
publisher.publish_counter(None, publisher.publish_counters(None,
self.test_data, self.test_data,
'test', 'test')
)
def test_notify(self): def test_published(self):
assert len(self.notifications) == 2 self.assertEqual(len(self.published), 4)
for topic, rpc_call in self.published:
meters = rpc_call['args']['data']
self.assertIsInstance(meters, list)
if topic != cfg.CONF.metering_topic:
self.assertEqual(len(set(meter['counter_name']
for meter in meters)),
1,
"Meter are published grouped by name")
def test_notify_topics(self): def test_published_topics(self):
topics = [n[0] for n in self.notifications] topics = [topic for topic, meter in self.published]
assert topics == ['metering', 'metering.test'] self.assertIn(cfg.CONF.metering_topic, topics)
self.assertIn(cfg.CONF.metering_topic + '.' + 'test', topics)
self.assertIn(cfg.CONF.metering_topic + '.' + 'test2', topics)
self.assertIn(cfg.CONF.metering_topic + '.' + 'test3', topics)

View File

@ -57,11 +57,11 @@ class TestPipeline(base.TestCase):
def __init__(self): def __init__(self):
self.counters = [] self.counters = []
def publish_counter(self, ctxt, counter, source): def publish_counters(self, ctxt, counters, source):
self.counters.append(counter) self.counters.extend(counters)
class PublisherClassException(): class PublisherClassException():
def publish_counter(self, ctxt, counter, source): def publish_counters(self, ctxt, counters, source):
raise Exception() raise Exception()
class TransformerClass(object): class TransformerClass(object):

View File

@ -1,4 +1,5 @@
nose nose
unittest2
coverage coverage
mock mock
mox mox

View File

@ -1,5 +1,6 @@
http://tarballs.openstack.org/nova/nova-stable-folsom.tar.gz http://tarballs.openstack.org/nova/nova-stable-folsom.tar.gz
nose nose
unittest2
coverage coverage
mock mock
mox mox