Permit usage of notifications for metering

This patch introduces a new publisher that uses notification instead
of RPC.

And set it by default.

Closes bug: #1005933
Implements blueprint replace-rpc-cast-with-notifications
Co-Authored-By: Ala Rezmerita <ala.rezmerita@cloudwatt.com>

Change-Id: Idc40c148ef60d5e1349d30c66ba85691d93c5675
This commit is contained in:
Mehdi Abaakouk 2014-07-29 12:12:06 +02:00
parent 4fc82d477b
commit fec77dbc85
7 changed files with 182 additions and 106 deletions

View File

@ -19,6 +19,7 @@ import socket
import msgpack import msgpack
from oslo.config import cfg from oslo.config import cfg
import oslo.messaging
from oslo.utils import units from oslo.utils import units
from ceilometer import dispatcher from ceilometer import dispatcher
@ -38,8 +39,10 @@ OPTS = [
] ]
cfg.CONF.register_opts(OPTS, group="collector") cfg.CONF.register_opts(OPTS, group="collector")
cfg.CONF.import_opt('metering_topic', 'ceilometer.publisher.rpc', cfg.CONF.import_opt('metering_topic', 'ceilometer.publisher.messaging',
group="publisher_rpc") group="publisher_rpc")
cfg.CONF.import_opt('metering_topic', 'ceilometer.publisher.messaging',
group="publisher_notifier")
LOG = log.getLogger(__name__) LOG = log.getLogger(__name__)
@ -52,6 +55,7 @@ class CollectorService(os_service.Service):
# ensure dispatcher is configured before starting other services # ensure dispatcher is configured before starting other services
self.dispatcher_manager = dispatcher.load_dispatcher_manager() self.dispatcher_manager = dispatcher.load_dispatcher_manager()
self.rpc_server = None self.rpc_server = None
self.notification_server = None
super(CollectorService, self).start() super(CollectorService, self).start()
if cfg.CONF.collector.udp_address: if cfg.CONF.collector.udp_address:
@ -61,7 +65,14 @@ class CollectorService(os_service.Service):
if transport: if transport:
self.rpc_server = messaging.get_rpc_server( self.rpc_server = messaging.get_rpc_server(
transport, cfg.CONF.publisher_rpc.metering_topic, self) transport, cfg.CONF.publisher_rpc.metering_topic, self)
target = oslo.messaging.Target(
topic=cfg.CONF.publisher_notifier.metering_topic)
self.notification_server = messaging.get_notification_listener(
transport, [target], [self])
self.rpc_server.start() self.rpc_server.start()
self.notification_server.start()
if not cfg.CONF.collector.udp_address: if not cfg.CONF.collector.udp_address:
# Add a dummy thread to have wait() working # Add a dummy thread to have wait() working
@ -94,8 +105,20 @@ class CollectorService(os_service.Service):
self.udp_run = False self.udp_run = False
if self.rpc_server: if self.rpc_server:
self.rpc_server.stop() self.rpc_server.stop()
if self.notification_server:
self.notification_server.stop()
super(CollectorService, self).stop() super(CollectorService, self).stop()
def sample(self, ctxt, publisher_id, event_type, payload, metadata):
"""RPC endpoint for notification messages
When another service sends a notification over the message
bus, this method receives it.
"""
self.dispatcher_manager.map_method('record_metering_data',
data=payload)
def record_metering_data(self, context, data): def record_metering_data(self, context, data):
"""RPC endpoint for messages we send to ourselves. """RPC endpoint for messages we send to ourselves.

View File

@ -17,13 +17,14 @@
"""Publish a sample using the preferred RPC mechanism. """Publish a sample using the preferred RPC mechanism.
""" """
import abc
import itertools import itertools
import operator import operator
from oslo.config import cfg from oslo.config import cfg
import oslo.messaging import oslo.messaging
import oslo.messaging._drivers.common import oslo.messaging._drivers.common
import six
import six.moves.urllib.parse as urlparse import six.moves.urllib.parse as urlparse
from ceilometer import messaging from ceilometer import messaging
@ -35,7 +36,7 @@ from ceilometer.publisher import utils
LOG = log.getLogger(__name__) LOG = log.getLogger(__name__)
METER_PUBLISH_OPTS = [ METER_PUBLISH_RPC_OPTS = [
cfg.StrOpt('metering_topic', cfg.StrOpt('metering_topic',
default='metering', default='metering',
help='The topic that ceilometer uses for metering messages.', help='The topic that ceilometer uses for metering messages.',
@ -43,13 +44,24 @@ METER_PUBLISH_OPTS = [
), ),
] ]
METER_PUBLISH_NOTIFIER_OPTS = [
cfg.StrOpt('metering_topic',
default='metering',
help='The topic that ceilometer uses for metering '
'notifications.',
),
cfg.StrOpt('metering_driver',
default='messagingv2',
help='The driver that ceilometer uses for metering '
'notifications.',
)
]
def register_opts(config): cfg.CONF.register_opts(METER_PUBLISH_RPC_OPTS,
"""Register the options for publishing metering messages.""" group="publisher_rpc")
config.register_opts(METER_PUBLISH_OPTS, group="publisher_rpc") cfg.CONF.register_opts(METER_PUBLISH_NOTIFIER_OPTS,
group="publisher_notifier")
cfg.CONF.import_opt('host', 'ceilometer.service')
register_opts(cfg.CONF)
def oslo_messaging_is_rabbit(): def oslo_messaging_is_rabbit():
@ -76,7 +88,8 @@ def override_backend_retry_config(value):
cfg.CONF.set_override('rabbit_max_retries', value) cfg.CONF.set_override('rabbit_max_retries', value)
class RPCPublisher(publisher.PublisherBase): @six.add_metaclass(abc.ABCMeta)
class MessagingPublisher(publisher.PublisherBase):
def __init__(self, parsed_url): def __init__(self, parsed_url):
options = urlparse.parse_qs(parsed_url.query) options = urlparse.parse_qs(parsed_url.query)
@ -86,8 +99,6 @@ class RPCPublisher(publisher.PublisherBase):
self.per_meter_topic = bool(int( self.per_meter_topic = bool(int(
options.get('per_meter_topic', [0])[-1])) options.get('per_meter_topic', [0])[-1]))
self.target = options.get('target', ['record_metering_data'])[0]
self.policy = options.get('policy', ['default'])[-1] self.policy = options.get('policy', ['default'])[-1]
self.max_queue_length = int(options.get( self.max_queue_length = int(options.get(
'max_queue_length', [1024])[-1]) 'max_queue_length', [1024])[-1])
@ -105,9 +116,6 @@ class RPCPublisher(publisher.PublisherBase):
% self.policy) % self.policy)
self.policy = 'default' self.policy = 'default'
transport = messaging.get_transport()
self.rpc_client = messaging.get_rpc_client(transport, version='1.0')
def publish_samples(self, context, samples): def publish_samples(self, context, samples):
"""Publish samples on RPC. """Publish samples on RPC.
@ -174,8 +182,7 @@ class RPCPublisher(publisher.PublisherBase):
while queue: while queue:
context, topic, meters = queue[0] context, topic, meters = queue[0]
try: try:
self.rpc_client.prepare(topic=topic).cast( self._send(context, topic, meters)
context, self.target, data=meters)
except oslo.messaging._drivers.common.RPCException: except oslo.messaging._drivers.common.RPCException:
samples = sum([len(m) for __, __, m in queue]) samples = sum([len(m) for __, __, m in queue])
if policy == 'queue': if policy == 'queue':
@ -191,3 +198,39 @@ class RPCPublisher(publisher.PublisherBase):
else: else:
queue.pop(0) queue.pop(0)
return [] return []
@abc.abstractmethod
def _send(self, context, topic, meters):
"""Send the meters to the messaging topic."""
class RPCPublisher(MessagingPublisher):
def __init__(self, parsed_url):
super(RPCPublisher, self).__init__(parsed_url)
options = urlparse.parse_qs(parsed_url.query)
self.target = options.get('target', ['record_metering_data'])[0]
self.rpc_client = messaging.get_rpc_client(
messaging.get_transport(),
version='1.0'
)
def _send(self, context, topic, meters):
self.rpc_client.prepare(topic=topic).cast(context, self.target,
data=meters)
class NotifierPublisher(MessagingPublisher):
def __init__(self, parsed_url):
super(NotifierPublisher, self).__init__(parsed_url)
self.notifier = oslo.messaging.Notifier(
messaging.get_transport(),
driver=cfg.CONF.publisher_notifier.metering_driver,
publisher_id='metering.publisher.%s' % cfg.CONF.host,
topic=cfg.CONF.publisher_notifier.metering_topic
)
def _send(self, context, event_type, meters):
self.notifier.sample(context.to_dict(), event_type=event_type,
payload=meters)

View File

@ -30,23 +30,17 @@ from ceilometer.tests import db as tests_db
class TestPostSamples(v2.FunctionalTest, class TestPostSamples(v2.FunctionalTest,
tests_db.MixinTestsWithBackendScenarios): tests_db.MixinTestsWithBackendScenarios):
def fake_cast(self, ctxt, target, data): def fake_notifier_sample(self, ctxt, event_type, payload):
for m in data: for m in payload:
del m['message_signature'] del m['message_signature']
self.published.append(data) self.published.append(payload)
def fake_get_rpc_client(self, *args, **kwargs):
cast_ctxt = mock.Mock()
cast_ctxt.cast.side_effect = self.fake_cast
client = mock.Mock()
client.prepare.return_value = cast_ctxt
return client
def setUp(self): def setUp(self):
self.published = [] self.published = []
self.useFixture(mockpatch.Patch( notifier = mock.Mock()
'ceilometer.messaging.get_rpc_client', notifier.sample.side_effect = self.fake_notifier_sample
new=self.fake_get_rpc_client)) self.useFixture(mockpatch.Patch('oslo.messaging.Notifier',
return_value=notifier))
super(TestPostSamples, self).setUp() super(TestPostSamples, self).setUp()
def test_one(self): def test_one(self):

View File

@ -15,7 +15,7 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
"""Tests for ceilometer/publisher/rpc.py """Tests for ceilometer/publisher/messaging.py
""" """
import datetime import datetime
@ -25,15 +25,16 @@ from oslo.config import fixture as fixture_config
import oslo.messaging import oslo.messaging
import oslo.messaging._drivers.common import oslo.messaging._drivers.common
from oslo.utils import netutils from oslo.utils import netutils
import testscenarios.testcase
from ceilometer import messaging from ceilometer import messaging
from ceilometer.openstack.common import context from ceilometer.openstack.common import context
from ceilometer.publisher import rpc from ceilometer.publisher import messaging as msg_publisher
from ceilometer import sample from ceilometer import sample
from ceilometer.tests import base as tests_base from ceilometer.tests import base as tests_base
class TestPublish(tests_base.BaseTestCase): class BasePublisherTestCase(tests_base.BaseTestCase):
test_data = [ test_data = [
sample.Sample( sample.Sample(
name='test', name='test',
@ -93,13 +94,15 @@ class TestPublish(tests_base.BaseTestCase):
] ]
def setUp(self): def setUp(self):
super(TestPublish, self).setUp() super(BasePublisherTestCase, self).setUp()
self.CONF = self.useFixture(fixture_config.Config()).conf self.CONF = self.useFixture(fixture_config.Config()).conf
self.setup_messaging(self.CONF) self.setup_messaging(self.CONF)
self.published = [] self.published = []
class RpcOnlyPublisherTest(BasePublisherTestCase):
def test_published_no_mock(self): def test_published_no_mock(self):
publisher = rpc.RPCPublisher( publisher = msg_publisher.RPCPublisher(
netutils.urlsplit('rpc://')) netutils.urlsplit('rpc://'))
endpoint = mock.MagicMock(['record_metering_data']) endpoint = mock.MagicMock(['record_metering_data'])
@ -126,7 +129,7 @@ class TestPublish(tests_base.BaseTestCase):
mock.ANY, data=Matcher()) mock.ANY, data=Matcher())
def test_publish_target(self): def test_publish_target(self):
publisher = rpc.RPCPublisher( publisher = msg_publisher.RPCPublisher(
netutils.urlsplit('rpc://?target=custom_procedure_call')) netutils.urlsplit('rpc://?target=custom_procedure_call'))
cast_context = mock.MagicMock() cast_context = mock.MagicMock()
with mock.patch.object(publisher.rpc_client, 'prepare') as prepare: with mock.patch.object(publisher.rpc_client, 'prepare') as prepare:
@ -140,7 +143,7 @@ class TestPublish(tests_base.BaseTestCase):
mock.ANY, 'custom_procedure_call', data=mock.ANY) mock.ANY, 'custom_procedure_call', data=mock.ANY)
def test_published_with_per_meter_topic(self): def test_published_with_per_meter_topic(self):
publisher = rpc.RPCPublisher( publisher = msg_publisher.RPCPublisher(
netutils.urlsplit('rpc://?per_meter_topic=1')) netutils.urlsplit('rpc://?per_meter_topic=1'))
with mock.patch.object(publisher.rpc_client, 'prepare') as prepare: with mock.patch.object(publisher.rpc_client, 'prepare') as prepare:
publisher.publish_samples(mock.MagicMock(), publisher.publish_samples(mock.MagicMock(),
@ -166,23 +169,29 @@ class TestPublish(tests_base.BaseTestCase):
data=MeterGroupMatcher())] data=MeterGroupMatcher())]
self.assertEqual(expected, prepare.mock_calls) self.assertEqual(expected, prepare.mock_calls)
class TestPublisher(testscenarios.testcase.WithScenarios,
BasePublisherTestCase):
scenarios = [
('notifier', dict(protocol="notifier",
publisher_cls=msg_publisher.NotifierPublisher)),
('rpc', dict(protocol="rpc",
publisher_cls=msg_publisher.RPCPublisher)),
]
def test_published_concurrency(self): def test_published_concurrency(self):
"""Test concurrent access to the local queue of the rpc publisher.""" """Test concurrent access to the local queue of the rpc publisher."""
publisher = rpc.RPCPublisher(netutils.urlsplit('rpc://')) publisher = self.publisher_cls(
cast_context = mock.MagicMock() netutils.urlsplit('%s://' % self.protocol))
with mock.patch.object(publisher.rpc_client, 'prepare') as prepare: with mock.patch.object(publisher, '_send') as fake_send:
def fake_prepare_go(topic): def fake_send_wait(ctxt, topic, meters):
return cast_context fake_send.side_effect = mock.Mock()
def fake_prepare_wait(topic):
prepare.side_effect = fake_prepare_go
# Sleep to simulate concurrency and allow other threads to work # Sleep to simulate concurrency and allow other threads to work
eventlet.sleep(0) eventlet.sleep(0)
return cast_context
prepare.side_effect = fake_prepare_wait fake_send.side_effect = fake_send_wait
job1 = eventlet.spawn(publisher.publish_samples, job1 = eventlet.spawn(publisher.publish_samples,
mock.MagicMock(), self.test_data) mock.MagicMock(), self.test_data)
@ -193,16 +202,16 @@ class TestPublish(tests_base.BaseTestCase):
job2.wait() job2.wait()
self.assertEqual('default', publisher.policy) self.assertEqual('default', publisher.policy)
self.assertEqual(2, len(cast_context.cast.mock_calls)) self.assertEqual(2, len(fake_send.mock_calls))
self.assertEqual(0, len(publisher.local_queue)) self.assertEqual(0, len(publisher.local_queue))
@mock.patch('ceilometer.publisher.rpc.LOG') @mock.patch('ceilometer.publisher.messaging.LOG')
def test_published_with_no_policy(self, mylog): def test_published_with_no_policy(self, mylog):
publisher = rpc.RPCPublisher( publisher = self.publisher_cls(
netutils.urlsplit('rpc://')) netutils.urlsplit('%s://' % self.protocol))
side_effect = oslo.messaging._drivers.common.RPCException() side_effect = oslo.messaging._drivers.common.RPCException()
with mock.patch.object(publisher.rpc_client, 'prepare') as prepare: with mock.patch.object(publisher, '_send') as fake_send:
prepare.side_effect = side_effect fake_send.side_effect = side_effect
self.assertRaises( self.assertRaises(
oslo.messaging._drivers.common.RPCException, oslo.messaging._drivers.common.RPCException,
@ -211,32 +220,34 @@ class TestPublish(tests_base.BaseTestCase):
self.assertTrue(mylog.info.called) self.assertTrue(mylog.info.called)
self.assertEqual('default', publisher.policy) self.assertEqual('default', publisher.policy)
self.assertEqual(0, len(publisher.local_queue)) self.assertEqual(0, len(publisher.local_queue))
prepare.assert_called_once_with( fake_send.assert_called_once_with(
topic=self.CONF.publisher_rpc.metering_topic) mock.ANY, self.CONF.publisher_rpc.metering_topic,
mock.ANY)
@mock.patch('ceilometer.publisher.rpc.LOG') @mock.patch('ceilometer.publisher.messaging.LOG')
def test_published_with_policy_block(self, mylog): def test_published_with_policy_block(self, mylog):
publisher = rpc.RPCPublisher( publisher = self.publisher_cls(
netutils.urlsplit('rpc://?policy=default')) netutils.urlsplit('%s://?policy=default' % self.protocol))
side_effect = oslo.messaging._drivers.common.RPCException() side_effect = oslo.messaging._drivers.common.RPCException()
with mock.patch.object(publisher.rpc_client, 'prepare') as prepare: with mock.patch.object(publisher, '_send') as fake_send:
prepare.side_effect = side_effect fake_send.side_effect = side_effect
self.assertRaises( self.assertRaises(
oslo.messaging._drivers.common.RPCException, oslo.messaging._drivers.common.RPCException,
publisher.publish_samples, publisher.publish_samples,
mock.MagicMock(), self.test_data) mock.MagicMock(), self.test_data)
self.assertTrue(mylog.info.called) self.assertTrue(mylog.info.called)
self.assertEqual(0, len(publisher.local_queue)) self.assertEqual(0, len(publisher.local_queue))
prepare.assert_called_once_with( fake_send.assert_called_once_with(
topic=self.CONF.publisher_rpc.metering_topic) mock.ANY, self.CONF.publisher_rpc.metering_topic,
mock.ANY)
@mock.patch('ceilometer.publisher.rpc.LOG') @mock.patch('ceilometer.publisher.messaging.LOG')
def test_published_with_policy_incorrect(self, mylog): def test_published_with_policy_incorrect(self, mylog):
publisher = rpc.RPCPublisher( publisher = self.publisher_cls(
netutils.urlsplit('rpc://?policy=notexist')) netutils.urlsplit('%s://?policy=notexist' % self.protocol))
side_effect = oslo.messaging._drivers.common.RPCException() side_effect = oslo.messaging._drivers.common.RPCException()
with mock.patch.object(publisher.rpc_client, 'prepare') as prepare: with mock.patch.object(publisher, '_send') as fake_send:
prepare.side_effect = side_effect fake_send.side_effect = side_effect
self.assertRaises( self.assertRaises(
oslo.messaging._drivers.common.RPCException, oslo.messaging._drivers.common.RPCException,
publisher.publish_samples, publisher.publish_samples,
@ -244,66 +255,69 @@ class TestPublish(tests_base.BaseTestCase):
self.assertTrue(mylog.warn.called) self.assertTrue(mylog.warn.called)
self.assertEqual('default', publisher.policy) self.assertEqual('default', publisher.policy)
self.assertEqual(0, len(publisher.local_queue)) self.assertEqual(0, len(publisher.local_queue))
prepare.assert_called_once_with( fake_send.assert_called_once_with(
topic=self.CONF.publisher_rpc.metering_topic) mock.ANY, self.CONF.publisher_rpc.metering_topic,
mock.ANY)
def test_published_with_policy_drop_and_rpc_down(self): def test_published_with_policy_drop_and_rpc_down(self):
publisher = rpc.RPCPublisher( publisher = self.publisher_cls(
netutils.urlsplit('rpc://?policy=drop')) netutils.urlsplit('%s://?policy=drop' % self.protocol))
side_effect = oslo.messaging._drivers.common.RPCException() side_effect = oslo.messaging._drivers.common.RPCException()
with mock.patch.object(publisher.rpc_client, 'prepare') as prepare: with mock.patch.object(publisher, '_send') as fake_send:
prepare.side_effect = side_effect fake_send.side_effect = side_effect
publisher.publish_samples(mock.MagicMock(), publisher.publish_samples(mock.MagicMock(),
self.test_data) self.test_data)
self.assertEqual(0, len(publisher.local_queue)) self.assertEqual(0, len(publisher.local_queue))
prepare.assert_called_once_with( fake_send.assert_called_once_with(
topic=self.CONF.publisher_rpc.metering_topic) mock.ANY, self.CONF.publisher_rpc.metering_topic,
mock.ANY)
def test_published_with_policy_queue_and_rpc_down(self): def test_published_with_policy_queue_and_rpc_down(self):
publisher = rpc.RPCPublisher( publisher = self.publisher_cls(
netutils.urlsplit('rpc://?policy=queue')) netutils.urlsplit('%s://?policy=queue' % self.protocol))
side_effect = oslo.messaging._drivers.common.RPCException() side_effect = oslo.messaging._drivers.common.RPCException()
with mock.patch.object(publisher.rpc_client, 'prepare') as prepare: with mock.patch.object(publisher, '_send') as fake_send:
prepare.side_effect = side_effect fake_send.side_effect = side_effect
publisher.publish_samples(mock.MagicMock(), publisher.publish_samples(mock.MagicMock(),
self.test_data) self.test_data)
self.assertEqual(1, len(publisher.local_queue)) self.assertEqual(1, len(publisher.local_queue))
prepare.assert_called_once_with( fake_send.assert_called_once_with(
topic=self.CONF.publisher_rpc.metering_topic) mock.ANY, self.CONF.publisher_rpc.metering_topic,
mock.ANY)
def test_published_with_policy_queue_and_rpc_down_up(self): def test_published_with_policy_queue_and_rpc_down_up(self):
self.rpc_unreachable = True self.rpc_unreachable = True
publisher = rpc.RPCPublisher( publisher = self.publisher_cls(
netutils.urlsplit('rpc://?policy=queue')) netutils.urlsplit('%s://?policy=queue' % self.protocol))
side_effect = oslo.messaging._drivers.common.RPCException() side_effect = oslo.messaging._drivers.common.RPCException()
with mock.patch.object(publisher.rpc_client, 'prepare') as prepare: with mock.patch.object(publisher, '_send') as fake_send:
prepare.side_effect = side_effect fake_send.side_effect = side_effect
publisher.publish_samples(mock.MagicMock(), publisher.publish_samples(mock.MagicMock(),
self.test_data) self.test_data)
self.assertEqual(1, len(publisher.local_queue)) self.assertEqual(1, len(publisher.local_queue))
prepare.side_effect = mock.MagicMock() fake_send.side_effect = mock.MagicMock()
publisher.publish_samples(mock.MagicMock(), publisher.publish_samples(mock.MagicMock(),
self.test_data) self.test_data)
self.assertEqual(0, len(publisher.local_queue)) self.assertEqual(0, len(publisher.local_queue))
topic = self.CONF.publisher_rpc.metering_topic topic = self.CONF.publisher_rpc.metering_topic
expected = [mock.call(topic=topic), expected = [mock.call(mock.ANY, topic, mock.ANY),
mock.call(topic=topic), mock.call(mock.ANY, topic, mock.ANY),
mock.call(topic=topic)] mock.call(mock.ANY, topic, mock.ANY)]
self.assertEqual(expected, prepare.mock_calls) self.assertEqual(expected, fake_send.mock_calls)
def test_published_with_policy_sized_queue_and_rpc_down(self): def test_published_with_policy_sized_queue_and_rpc_down(self):
publisher = rpc.RPCPublisher( publisher = self.publisher_cls(netutils.urlsplit(
netutils.urlsplit('rpc://?policy=queue&max_queue_length=3')) '%s://?policy=queue&max_queue_length=3' % self.protocol))
side_effect = oslo.messaging._drivers.common.RPCException() side_effect = oslo.messaging._drivers.common.RPCException()
with mock.patch.object(publisher.rpc_client, 'prepare') as prepare: with mock.patch.object(publisher, '_send') as fake_send:
prepare.side_effect = side_effect fake_send.side_effect = side_effect
for i in range(0, 5): for i in range(0, 5):
for s in self.test_data: for s in self.test_data:
s.source = 'test-%d' % i s.source = 'test-%d' % i
@ -325,12 +339,12 @@ class TestPublish(tests_base.BaseTestCase):
) )
def test_published_with_policy_default_sized_queue_and_rpc_down(self): def test_published_with_policy_default_sized_queue_and_rpc_down(self):
publisher = rpc.RPCPublisher( publisher = self.publisher_cls(
netutils.urlsplit('rpc://?policy=queue')) netutils.urlsplit('%s://?policy=queue' % self.protocol))
side_effect = oslo.messaging._drivers.common.RPCException() side_effect = oslo.messaging._drivers.common.RPCException()
with mock.patch.object(publisher.rpc_client, 'prepare') as prepare: with mock.patch.object(publisher, '_send') as fake_send:
prepare.side_effect = side_effect fake_send.side_effect = side_effect
for i in range(0, 2000): for i in range(0, 2000):
for s in self.test_data: for s in self.test_data:
s.source = 'test-%d' % i s.source = 'test-%d' % i

View File

@ -200,7 +200,8 @@ class TestCollector(tests_base.BaseTestCase):
"""Check that only RPC is started if udp_address is empty.""" """Check that only RPC is started if udp_address is empty."""
self.CONF.set_override('udp_address', '', group='collector') self.CONF.set_override('udp_address', '', group='collector')
self.srv.start() self.srv.start()
self.assertEqual(1, rpc_start.call_count) # two calls because two servers (notification and rpc)
self.assertEqual(2, rpc_start.call_count)
self.assertEqual(0, udp_start.call_count) self.assertEqual(0, udp_start.call_count)
def test_udp_receive_valid_encoding(self): def test_udp_receive_valid_encoding(self):

View File

@ -98,7 +98,7 @@ sinks:
- name: meter_sink - name: meter_sink
transformers: transformers:
publishers: publishers:
- rpc:// - notifier://
- name: cpu_sink - name: cpu_sink
transformers: transformers:
- name: "rate_of_change" - name: "rate_of_change"
@ -109,7 +109,7 @@ sinks:
type: "gauge" type: "gauge"
scale: "100.0 / (10**9 * (resource_metadata.cpu_number or 1))" scale: "100.0 / (10**9 * (resource_metadata.cpu_number or 1))"
publishers: publishers:
- rpc:// - notifier://
- name: disk_sink - name: disk_sink
transformers: transformers:
- name: "rate_of_change" - name: "rate_of_change"
@ -124,7 +124,7 @@ sinks:
unit: "\\1/s" unit: "\\1/s"
type: "gauge" type: "gauge"
publishers: publishers:
- rpc:// - notifier://
- name: network_sink - name: network_sink
transformers: transformers:
- name: "rate_of_change" - name: "rate_of_change"
@ -139,4 +139,4 @@ sinks:
unit: "\\1/s" unit: "\\1/s"
type: "gauge" type: "gauge"
publishers: publishers:
- rpc:// - notifier://

View File

@ -212,9 +212,10 @@ ceilometer.transformer =
ceilometer.publisher = ceilometer.publisher =
test = ceilometer.publisher.test:TestPublisher test = ceilometer.publisher.test:TestPublisher
meter_publisher = ceilometer.publisher.rpc:RPCPublisher meter_publisher = ceilometer.publisher.messaging:RPCPublisher
meter = ceilometer.publisher.rpc:RPCPublisher meter = ceilometer.publisher.messaging:RPCPublisher
rpc = ceilometer.publisher.rpc:RPCPublisher rpc = ceilometer.publisher.messaging:RPCPublisher
notifier = ceilometer.publisher.messaging:NotifierPublisher
udp = ceilometer.publisher.udp:UDPPublisher udp = ceilometer.publisher.udp:UDPPublisher
file = ceilometer.publisher.file:FilePublisher file = ceilometer.publisher.file:FilePublisher