Merge "publisher.rpc: make per counter topic optional"
This commit is contained in:
commit
e087e7e143
@ -20,7 +20,7 @@
|
|||||||
|
|
||||||
import abc
|
import abc
|
||||||
from stevedore import driver
|
from stevedore import driver
|
||||||
import urlparse
|
from ceilometer.openstack.common import network_utils
|
||||||
|
|
||||||
|
|
||||||
def get_publisher(url, namespace='ceilometer.publisher'):
|
def get_publisher(url, namespace='ceilometer.publisher'):
|
||||||
@ -29,7 +29,7 @@ def get_publisher(url, namespace='ceilometer.publisher'):
|
|||||||
:param URL: URL for the publisher
|
:param URL: URL for the publisher
|
||||||
:param namespace: Namespace to use to look for drivers.
|
:param namespace: Namespace to use to look for drivers.
|
||||||
"""
|
"""
|
||||||
parse_result = urlparse.urlparse(url)
|
parse_result = network_utils.urlsplit(url)
|
||||||
loaded_driver = driver.DriverManager(namespace, parse_result.scheme)
|
loaded_driver = driver.DriverManager(namespace, parse_result.scheme)
|
||||||
return loaded_driver.driver(parse_result)
|
return loaded_driver.driver(parse_result)
|
||||||
|
|
||||||
|
@ -21,7 +21,9 @@
|
|||||||
import hashlib
|
import hashlib
|
||||||
import hmac
|
import hmac
|
||||||
import itertools
|
import itertools
|
||||||
|
import operator
|
||||||
import uuid
|
import uuid
|
||||||
|
import urlparse
|
||||||
|
|
||||||
from oslo.config import cfg
|
from oslo.config import cfg
|
||||||
|
|
||||||
@ -103,12 +105,19 @@ def meter_message_from_counter(counter, secret, source):
|
|||||||
|
|
||||||
|
|
||||||
class RPCPublisher(publisher.PublisherBase):
|
class RPCPublisher(publisher.PublisherBase):
|
||||||
def publish_counters(self, context, counters, source):
|
|
||||||
"""Send a metering message for publishing
|
|
||||||
|
|
||||||
:param context: Execution context from the service or RPC call
|
def __init__(self, parsed_url):
|
||||||
:param counter: Counter from pipeline after transformation
|
options = urlparse.parse_qs(parsed_url.query)
|
||||||
:param source: counter source
|
self.per_meter_topic = bool(int(
|
||||||
|
options.get('per_meter_topic', [0])[-1]))
|
||||||
|
|
||||||
|
def publish_counters(self, context, counters, source):
|
||||||
|
"""Publish counters on RPC.
|
||||||
|
|
||||||
|
:param context: Execution context from the service or RPC call.
|
||||||
|
:param counters: Counters from pipeline after transformation.
|
||||||
|
:param source: Counter source.
|
||||||
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
meters = [
|
meters = [
|
||||||
@ -125,15 +134,20 @@ class RPCPublisher(publisher.PublisherBase):
|
|||||||
'version': '1.0',
|
'version': '1.0',
|
||||||
'args': {'data': meters},
|
'args': {'data': meters},
|
||||||
}
|
}
|
||||||
LOG.debug('PUBLISH: %s', str(msg))
|
LOG.audit('Publishing %d counters on %s',
|
||||||
|
len(msg['args']['data']), topic)
|
||||||
rpc.cast(context, topic, msg)
|
rpc.cast(context, topic, msg)
|
||||||
|
|
||||||
for meter_name, meter_list in itertools.groupby(
|
if self.per_meter_topic:
|
||||||
sorted(meters, key=lambda m: m['counter_name']),
|
for meter_name, meter_list in itertools.groupby(
|
||||||
lambda m: m['counter_name']):
|
sorted(meters, key=operator.itemgetter('counter_name')),
|
||||||
msg = {
|
operator.itemgetter('counter_name')):
|
||||||
'method': 'record_metering_data',
|
msg = {
|
||||||
'version': '1.0',
|
'method': 'record_metering_data',
|
||||||
'args': {'data': list(meter_list)},
|
'version': '1.0',
|
||||||
}
|
'args': {'data': list(meter_list)},
|
||||||
rpc.cast(context, topic + '.' + meter_name, msg)
|
}
|
||||||
|
topic_name = topic + '.' + meter_name
|
||||||
|
LOG.audit('Publishing %d counters on %s',
|
||||||
|
len(msg['args']['data']), topic_name)
|
||||||
|
rpc.cast(context, topic_name, msg)
|
||||||
|
@ -3,6 +3,7 @@
|
|||||||
# Copyright © 2012 New Dream Network, LLC (DreamHost)
|
# Copyright © 2012 New Dream Network, LLC (DreamHost)
|
||||||
#
|
#
|
||||||
# Author: Doug Hellmann <doug.hellmann@dreamhost.com>
|
# Author: Doug Hellmann <doug.hellmann@dreamhost.com>
|
||||||
|
# Julien Danjou <julien@danjou.info>
|
||||||
#
|
#
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
# not use this file except in compliance with the License. You may obtain
|
# not use this file except in compliance with the License. You may obtain
|
||||||
@ -24,6 +25,7 @@ from oslo.config import cfg
|
|||||||
|
|
||||||
from ceilometer import counter
|
from ceilometer import counter
|
||||||
from ceilometer.openstack.common import jsonutils
|
from ceilometer.openstack.common import jsonutils
|
||||||
|
from ceilometer.openstack.common import network_utils
|
||||||
from ceilometer.openstack.common import rpc as oslo_rpc
|
from ceilometer.openstack.common import rpc as oslo_rpc
|
||||||
from ceilometer.publisher import rpc
|
from ceilometer.publisher import rpc
|
||||||
from ceilometer.tests import base
|
from ceilometer.tests import base
|
||||||
@ -255,12 +257,24 @@ class TestPublish(base.TestCase):
|
|||||||
super(TestPublish, self).setUp()
|
super(TestPublish, self).setUp()
|
||||||
self.published = []
|
self.published = []
|
||||||
self.stubs.Set(oslo_rpc, 'cast', self.faux_cast)
|
self.stubs.Set(oslo_rpc, 'cast', self.faux_cast)
|
||||||
publisher = rpc.RPCPublisher(None)
|
|
||||||
|
def test_published(self):
|
||||||
|
publisher = rpc.RPCPublisher(
|
||||||
|
network_utils.urlsplit('rpc://'))
|
||||||
publisher.publish_counters(None,
|
publisher.publish_counters(None,
|
||||||
self.test_data,
|
self.test_data,
|
||||||
'test')
|
'test')
|
||||||
|
self.assertEqual(len(self.published), 1)
|
||||||
|
self.assertEqual(self.published[0][0],
|
||||||
|
cfg.CONF.publisher_rpc.metering_topic)
|
||||||
|
self.assertIsInstance(self.published[0][1]['args']['data'], list)
|
||||||
|
|
||||||
def test_published(self):
|
def test_published_with_per_meter_topic(self):
|
||||||
|
publisher = rpc.RPCPublisher(
|
||||||
|
network_utils.urlsplit('rpc://?per_meter_topic=1'))
|
||||||
|
publisher.publish_counters(None,
|
||||||
|
self.test_data,
|
||||||
|
'test')
|
||||||
self.assertEqual(len(self.published), 4)
|
self.assertEqual(len(self.published), 4)
|
||||||
for topic, rpc_call in self.published:
|
for topic, rpc_call in self.published:
|
||||||
meters = rpc_call['args']['data']
|
meters = rpc_call['args']['data']
|
||||||
@ -271,7 +285,6 @@ class TestPublish(base.TestCase):
|
|||||||
1,
|
1,
|
||||||
"Meter are published grouped by name")
|
"Meter are published grouped by name")
|
||||||
|
|
||||||
def test_published_topics(self):
|
|
||||||
topics = [topic for topic, meter in self.published]
|
topics = [topic for topic, meter in self.published]
|
||||||
self.assertIn(cfg.CONF.publisher_rpc.metering_topic, topics)
|
self.assertIn(cfg.CONF.publisher_rpc.metering_topic, topics)
|
||||||
self.assertIn(
|
self.assertIn(
|
||||||
|
@ -22,11 +22,11 @@ import datetime
|
|||||||
import mock
|
import mock
|
||||||
import msgpack
|
import msgpack
|
||||||
from oslo.config import cfg
|
from oslo.config import cfg
|
||||||
import urlparse
|
|
||||||
|
|
||||||
from ceilometer import counter
|
from ceilometer import counter
|
||||||
from ceilometer.publisher import udp
|
from ceilometer.publisher import udp
|
||||||
from ceilometer.tests import base
|
from ceilometer.tests import base
|
||||||
|
from ceilometer.openstack.common import network_utils
|
||||||
|
|
||||||
|
|
||||||
class TestUDPPublisher(base.TestCase):
|
class TestUDPPublisher(base.TestCase):
|
||||||
@ -105,7 +105,8 @@ class TestUDPPublisher(base.TestCase):
|
|||||||
self.data_sent = []
|
self.data_sent = []
|
||||||
with mock.patch('socket.socket',
|
with mock.patch('socket.socket',
|
||||||
self._make_fake_socket(self.data_sent)):
|
self._make_fake_socket(self.data_sent)):
|
||||||
publisher = udp.UDPPublisher(urlparse.urlparse('udp://somehost'))
|
publisher = udp.UDPPublisher(
|
||||||
|
network_utils.urlsplit('udp://somehost'))
|
||||||
publisher.publish_counters(None,
|
publisher.publish_counters(None,
|
||||||
self.test_data,
|
self.test_data,
|
||||||
self.COUNTER_SOURCE)
|
self.COUNTER_SOURCE)
|
||||||
@ -142,7 +143,8 @@ class TestUDPPublisher(base.TestCase):
|
|||||||
def test_publish_error(self):
|
def test_publish_error(self):
|
||||||
with mock.patch('socket.socket',
|
with mock.patch('socket.socket',
|
||||||
self._make_broken_socket):
|
self._make_broken_socket):
|
||||||
publisher = udp.UDPPublisher(urlparse.urlparse('udp://localhost'))
|
publisher = udp.UDPPublisher(
|
||||||
|
network_utils.urlsplit('udp://localhost'))
|
||||||
publisher.publish_counters(None,
|
publisher.publish_counters(None,
|
||||||
self.test_data,
|
self.test_data,
|
||||||
self.COUNTER_SOURCE)
|
self.COUNTER_SOURCE)
|
||||||
|
Loading…
Reference in New Issue
Block a user