publisher.rpc: make per counter topic optional

Fixes: bug#1133206

Change-Id: Id56b4f367647f313de23b639991827723cb18d47
This commit is contained in:
Julien Danjou 2013-06-18 16:23:14 +02:00
parent ced2b691c1
commit 2ba6e5e2a4
4 changed files with 52 additions and 23 deletions

View File

@ -20,7 +20,7 @@
import abc import abc
from stevedore import driver from stevedore import driver
import urlparse from ceilometer.openstack.common import network_utils
def get_publisher(url, namespace='ceilometer.publisher'): def get_publisher(url, namespace='ceilometer.publisher'):
@ -29,7 +29,7 @@ def get_publisher(url, namespace='ceilometer.publisher'):
:param URL: URL for the publisher :param URL: URL for the publisher
:param namespace: Namespace to use to look for drivers. :param namespace: Namespace to use to look for drivers.
""" """
parse_result = urlparse.urlparse(url) parse_result = network_utils.urlsplit(url)
loaded_driver = driver.DriverManager(namespace, parse_result.scheme) loaded_driver = driver.DriverManager(namespace, parse_result.scheme)
return loaded_driver.driver(parse_result) return loaded_driver.driver(parse_result)

View File

@ -21,7 +21,9 @@
import hashlib import hashlib
import hmac import hmac
import itertools import itertools
import operator
import uuid import uuid
import urlparse
from oslo.config import cfg from oslo.config import cfg
@ -103,12 +105,19 @@ def meter_message_from_counter(counter, secret, source):
class RPCPublisher(publisher.PublisherBase): class RPCPublisher(publisher.PublisherBase):
def publish_counters(self, context, counters, source):
"""Send a metering message for publishing
:param context: Execution context from the service or RPC call def __init__(self, parsed_url):
:param counter: Counter from pipeline after transformation options = urlparse.parse_qs(parsed_url.query)
:param source: counter source self.per_meter_topic = bool(int(
options.get('per_meter_topic', [0])[-1]))
def publish_counters(self, context, counters, source):
"""Publish counters on RPC.
:param context: Execution context from the service or RPC call.
:param counters: Counters from pipeline after transformation.
:param source: Counter source.
""" """
meters = [ meters = [
@ -125,15 +134,20 @@ class RPCPublisher(publisher.PublisherBase):
'version': '1.0', 'version': '1.0',
'args': {'data': meters}, 'args': {'data': meters},
} }
LOG.debug('PUBLISH: %s', str(msg)) LOG.audit('Publishing %d counters on %s',
len(msg['args']['data']), topic)
rpc.cast(context, topic, msg) rpc.cast(context, topic, msg)
for meter_name, meter_list in itertools.groupby( if self.per_meter_topic:
sorted(meters, key=lambda m: m['counter_name']), for meter_name, meter_list in itertools.groupby(
lambda m: m['counter_name']): sorted(meters, key=operator.itemgetter('counter_name')),
msg = { operator.itemgetter('counter_name')):
'method': 'record_metering_data', msg = {
'version': '1.0', 'method': 'record_metering_data',
'args': {'data': list(meter_list)}, 'version': '1.0',
} 'args': {'data': list(meter_list)},
rpc.cast(context, topic + '.' + meter_name, msg) }
topic_name = topic + '.' + meter_name
LOG.audit('Publishing %d counters on %s',
len(msg['args']['data']), topic_name)
rpc.cast(context, topic_name, msg)

View File

@ -3,6 +3,7 @@
# Copyright © 2012 New Dream Network, LLC (DreamHost) # Copyright © 2012 New Dream Network, LLC (DreamHost)
# #
# Author: Doug Hellmann <doug.hellmann@dreamhost.com> # Author: Doug Hellmann <doug.hellmann@dreamhost.com>
# Julien Danjou <julien@danjou.info>
# #
# Licensed under the Apache License, Version 2.0 (the "License"); you may # Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain # not use this file except in compliance with the License. You may obtain
@ -24,6 +25,7 @@ from oslo.config import cfg
from ceilometer import counter from ceilometer import counter
from ceilometer.openstack.common import jsonutils from ceilometer.openstack.common import jsonutils
from ceilometer.openstack.common import network_utils
from ceilometer.openstack.common import rpc as oslo_rpc from ceilometer.openstack.common import rpc as oslo_rpc
from ceilometer.publisher import rpc from ceilometer.publisher import rpc
from ceilometer.tests import base from ceilometer.tests import base
@ -255,12 +257,24 @@ class TestPublish(base.TestCase):
super(TestPublish, self).setUp() super(TestPublish, self).setUp()
self.published = [] self.published = []
self.stubs.Set(oslo_rpc, 'cast', self.faux_cast) self.stubs.Set(oslo_rpc, 'cast', self.faux_cast)
publisher = rpc.RPCPublisher(None)
def test_published(self):
publisher = rpc.RPCPublisher(
network_utils.urlsplit('rpc://'))
publisher.publish_counters(None, publisher.publish_counters(None,
self.test_data, self.test_data,
'test') 'test')
self.assertEqual(len(self.published), 1)
self.assertEqual(self.published[0][0],
cfg.CONF.publisher_rpc.metering_topic)
self.assertIsInstance(self.published[0][1]['args']['data'], list)
def test_published(self): def test_published_with_per_meter_topic(self):
publisher = rpc.RPCPublisher(
network_utils.urlsplit('rpc://?per_meter_topic=1'))
publisher.publish_counters(None,
self.test_data,
'test')
self.assertEqual(len(self.published), 4) self.assertEqual(len(self.published), 4)
for topic, rpc_call in self.published: for topic, rpc_call in self.published:
meters = rpc_call['args']['data'] meters = rpc_call['args']['data']
@ -271,7 +285,6 @@ class TestPublish(base.TestCase):
1, 1,
"Meter are published grouped by name") "Meter are published grouped by name")
def test_published_topics(self):
topics = [topic for topic, meter in self.published] topics = [topic for topic, meter in self.published]
self.assertIn(cfg.CONF.publisher_rpc.metering_topic, topics) self.assertIn(cfg.CONF.publisher_rpc.metering_topic, topics)
self.assertIn( self.assertIn(

View File

@ -22,11 +22,11 @@ import datetime
import mock import mock
import msgpack import msgpack
from oslo.config import cfg from oslo.config import cfg
import urlparse
from ceilometer import counter from ceilometer import counter
from ceilometer.publisher import udp from ceilometer.publisher import udp
from ceilometer.tests import base from ceilometer.tests import base
from ceilometer.openstack.common import network_utils
class TestUDPPublisher(base.TestCase): class TestUDPPublisher(base.TestCase):
@ -105,7 +105,8 @@ class TestUDPPublisher(base.TestCase):
self.data_sent = [] self.data_sent = []
with mock.patch('socket.socket', with mock.patch('socket.socket',
self._make_fake_socket(self.data_sent)): self._make_fake_socket(self.data_sent)):
publisher = udp.UDPPublisher(urlparse.urlparse('udp://somehost')) publisher = udp.UDPPublisher(
network_utils.urlsplit('udp://somehost'))
publisher.publish_counters(None, publisher.publish_counters(None,
self.test_data, self.test_data,
self.COUNTER_SOURCE) self.COUNTER_SOURCE)
@ -142,7 +143,8 @@ class TestUDPPublisher(base.TestCase):
def test_publish_error(self): def test_publish_error(self):
with mock.patch('socket.socket', with mock.patch('socket.socket',
self._make_broken_socket): self._make_broken_socket):
publisher = udp.UDPPublisher(urlparse.urlparse('udp://localhost')) publisher = udp.UDPPublisher(
network_utils.urlsplit('udp://localhost'))
publisher.publish_counters(None, publisher.publish_counters(None,
self.test_data, self.test_data,
self.COUNTER_SOURCE) self.COUNTER_SOURCE)