add event listener to collector

enable a listener in collector that listens to events queue. this
patch also renames 'metering_*' options which are now used by both
events and samples.

Change-Id: I6250c91e913864c3973e2d93022e38ad8bed8328
Implements: blueprint notification-pipelines
This commit is contained in:
gordon chung 2015-01-16 14:03:13 -05:00
parent de0c2944b7
commit 7fc0d1e6c7
25 changed files with 225 additions and 174 deletions

View File

@ -175,7 +175,7 @@ class NotificationBase(PluginBase):
if self.requeue: if self.requeue:
meters = [ meters = [
utils.meter_message_from_counter( utils.meter_message_from_counter(
sample, cfg.CONF.publisher.metering_secret) sample, cfg.CONF.publisher.telemetry_secret)
for sample in self.process_notification(notification) for sample in self.process_notification(notification)
] ]
for notifier in self.transporter: for notifier in self.transporter:

View File

@ -18,13 +18,16 @@ import socket
import msgpack import msgpack
import oslo.messaging import oslo.messaging
from oslo_config import cfg from oslo_config import cfg
from oslo_utils import timeutils
from oslo_utils import units from oslo_utils import units
from ceilometer import dispatcher from ceilometer import dispatcher
from ceilometer.event.storage import models
from ceilometer import messaging from ceilometer import messaging
from ceilometer.i18n import _, _LE from ceilometer.i18n import _, _LE
from ceilometer.openstack.common import log from ceilometer.openstack.common import log
from ceilometer.openstack.common import service as os_service from ceilometer.openstack.common import service as os_service
from ceilometer import utils
OPTS = [ OPTS = [
cfg.StrOpt('udp_address', cfg.StrOpt('udp_address',
@ -39,13 +42,21 @@ OPTS = [
help='Requeue the sample on the collector sample queue ' help='Requeue the sample on the collector sample queue '
'when the collector fails to dispatch it. This is only valid ' 'when the collector fails to dispatch it. This is only valid '
'if the sample come from the notifier publisher.'), 'if the sample come from the notifier publisher.'),
cfg.BoolOpt('requeue_event_on_dispatcher_error',
default=False,
help='Requeue the event on the collector event queue '
'when the collector fails to dispatch it.'),
] ]
cfg.CONF.register_opts(OPTS, group="collector") cfg.CONF.register_opts(OPTS, group="collector")
cfg.CONF.import_opt('metering_topic', 'ceilometer.publisher.messaging', cfg.CONF.import_opt('metering_topic', 'ceilometer.publisher.messaging',
group="publisher_rpc") group='publisher_rpc')
cfg.CONF.import_opt('metering_topic', 'ceilometer.publisher.messaging', cfg.CONF.import_opt('metering_topic', 'ceilometer.publisher.messaging',
group="publisher_notifier") group='publisher_notifier')
cfg.CONF.import_opt('event_topic', 'ceilometer.publisher.messaging',
group='publisher_notifier')
cfg.CONF.import_opt('store_events', 'ceilometer.notification',
group='notification')
LOG = log.getLogger(__name__) LOG = log.getLogger(__name__)
@ -58,26 +69,38 @@ class CollectorService(os_service.Service):
# ensure dispatcher is configured before starting other services # ensure dispatcher is configured before starting other services
self.dispatcher_manager = dispatcher.load_dispatcher_manager() self.dispatcher_manager = dispatcher.load_dispatcher_manager()
self.rpc_server = None self.rpc_server = None
self.notification_server = None self.sample_listener = None
self.event_listener = None
super(CollectorService, self).start() super(CollectorService, self).start()
if cfg.CONF.collector.udp_address: if cfg.CONF.collector.udp_address:
self.tg.add_thread(self.start_udp) self.tg.add_thread(self.start_udp)
allow_requeue = cfg.CONF.collector.requeue_sample_on_dispatcher_error
transport = messaging.get_transport(optional=True) transport = messaging.get_transport(optional=True)
if transport: if transport:
self.rpc_server = messaging.get_rpc_server( self.rpc_server = messaging.get_rpc_server(
transport, cfg.CONF.publisher_rpc.metering_topic, self) transport, cfg.CONF.publisher_rpc.metering_topic, self)
target = oslo.messaging.Target( sample_target = oslo.messaging.Target(
topic=cfg.CONF.publisher_notifier.metering_topic) topic=cfg.CONF.publisher_notifier.metering_topic)
self.notification_server = messaging.get_notification_listener( self.sample_listener = messaging.get_notification_listener(
transport, [target], [self], transport, [sample_target],
allow_requeue=allow_requeue) [SampleEndpoint(self.dispatcher_manager)],
allow_requeue=(cfg.CONF.collector.
requeue_sample_on_dispatcher_error))
if cfg.CONF.notification.store_events:
event_target = oslo.messaging.Target(
topic=cfg.CONF.publisher_notifier.event_topic)
self.event_listener = messaging.get_notification_listener(
transport, [event_target],
[EventEndpoint(self.dispatcher_manager)],
allow_requeue=(cfg.CONF.collector.
requeue_event_on_dispatcher_error))
self.event_listener.start()
self.rpc_server.start() self.rpc_server.start()
self.notification_server.start() self.sample_listener.start()
if not cfg.CONF.collector.udp_address: if not cfg.CONF.collector.udp_address:
# Add a dummy thread to have wait() working # Add a dummy thread to have wait() working
@ -110,27 +133,12 @@ class CollectorService(os_service.Service):
self.udp_run = False self.udp_run = False
if self.rpc_server: if self.rpc_server:
self.rpc_server.stop() self.rpc_server.stop()
if self.notification_server: if self.sample_listener:
self.notification_server.stop() utils.kill_listeners([self.sample_listener])
if self.event_listener:
utils.kill_listeners([self.event_listener])
super(CollectorService, self).stop() super(CollectorService, self).stop()
def sample(self, ctxt, publisher_id, event_type, payload, metadata):
"""RPC endpoint for notification messages
When another service sends a notification over the message
bus, this method receives it.
"""
try:
self.dispatcher_manager.map_method('record_metering_data',
data=payload)
except Exception:
if cfg.CONF.collector.requeue_sample_on_dispatcher_error:
LOG.exception(_LE("Dispatcher failed to handle the sample, "
"requeue it."))
return oslo.messaging.NotificationResult.REQUEUE
raise
def record_metering_data(self, context, data): def record_metering_data(self, context, data):
"""RPC endpoint for messages we send to ourselves. """RPC endpoint for messages we send to ourselves.
@ -138,3 +146,65 @@ class CollectorService(os_service.Service):
RPC publisher, this method receives them for processing. RPC publisher, this method receives them for processing.
""" """
self.dispatcher_manager.map_method('record_metering_data', data=data) self.dispatcher_manager.map_method('record_metering_data', data=data)
class CollectorEndpoint(object):
def __init__(self, dispatcher_manager, requeue_on_error):
self.dispatcher_manager = dispatcher_manager
self.requeue_on_error = requeue_on_error
def sample(self, ctxt, publisher_id, event_type, payload, metadata):
"""RPC endpoint for notification messages
When another service sends a notification over the message
bus, this method receives it.
"""
try:
self.dispatcher_manager.map_method(self.method, payload)
except Exception:
if self.requeue_on_error:
LOG.exception(_LE("Dispatcher failed to handle the %s, "
"requeue it."), self.ep_type)
return oslo.messaging.NotificationResult.REQUEUE
raise
class SampleEndpoint(CollectorEndpoint):
method = 'record_metering_data'
ep_type = 'sample'
def __init__(self, dispatcher_manager):
super(SampleEndpoint, self).__init__(
dispatcher_manager,
cfg.CONF.collector.requeue_sample_on_dispatcher_error)
class EventEndpoint(CollectorEndpoint):
method = 'record_events'
ep_type = 'event'
def __init__(self, dispatcher_manager):
super(EventEndpoint, self).__init__(
dispatcher_manager,
cfg.CONF.collector.requeue_event_on_dispatcher_error)
def sample(self, ctxt, publisher_id, event_type, payload, metadata):
events = []
for ev in payload:
try:
events.append(
models.Event(
message_id=ev['message_id'],
event_type=ev['event_type'],
generated=timeutils.normalize_time(
timeutils.parse_isotime(ev['generated'])),
traits=[models.Trait(
name, dtype,
models.Trait.convert_value(dtype, value))
for name, dtype, value in ev['traits']])
)
except Exception:
LOG.exception(_LE("Error processing event and it will be "
"dropped: %s"), ev)
return super(EventEndpoint, self).sample(
ctxt, publisher_id, event_type, events, metadata)

View File

@ -78,8 +78,7 @@ class DatabaseDispatcher(dispatcher.Base):
'timestamp': meter.get('timestamp', 'NO TIMESTAMP'), 'timestamp': meter.get('timestamp', 'NO TIMESTAMP'),
'counter_volume': meter['counter_volume']})) 'counter_volume': meter['counter_volume']}))
if publisher_utils.verify_signature( if publisher_utils.verify_signature(
meter, meter, self.conf.publisher.telemetry_secret):
self.conf.publisher.metering_secret):
try: try:
# Convert the timestamp to a datetime instance. # Convert the timestamp to a datetime instance.
# Storage engines are responsible for converting # Storage engines are responsible for converting

View File

@ -85,8 +85,7 @@ class HttpDispatcher(dispatcher.Base):
'timestamp': meter.get('timestamp', 'NO TIMESTAMP'), 'timestamp': meter.get('timestamp', 'NO TIMESTAMP'),
'counter_volume': meter['counter_volume']})) 'counter_volume': meter['counter_volume']}))
if publisher_utils.verify_signature( if publisher_utils.verify_signature(
meter, meter, self.conf.publisher.telemetry_secret):
self.conf.publisher.metering_secret):
try: try:
if self.cadf_only: if self.cadf_only:
# Only cadf messages are being wanted. # Only cadf messages are being wanted.

View File

@ -68,7 +68,7 @@ class EventsNotificationEndpoint(object):
self.ctxt.to_dict(), self.ctxt.to_dict(),
event_type='pipeline.event', event_type='pipeline.event',
payload=[utils.message_from_event( payload=[utils.message_from_event(
event, cfg.CONF.publisher.metering_secret)]) event, cfg.CONF.publisher.telemetry_secret)])
else: else:
with self.transporter.publisher(self.ctxt) as p: with self.transporter.publisher(self.ctxt) as p:
p(event) p(event)

View File

@ -25,6 +25,7 @@ from ceilometer import messaging
from ceilometer.openstack.common import log from ceilometer.openstack.common import log
from ceilometer.openstack.common import service as os_service from ceilometer.openstack.common import service as os_service
from ceilometer import pipeline from ceilometer import pipeline
from ceilometer import utils
LOG = log.getLogger(__name__) LOG = log.getLogger(__name__)
@ -52,7 +53,7 @@ OPTS = [
] ]
cfg.CONF.register_opts(OPTS, group="notification") cfg.CONF.register_opts(OPTS, group="notification")
cfg.CONF.import_opt('metering_driver', 'ceilometer.publisher.messaging', cfg.CONF.import_opt('telemetry_driver', 'ceilometer.publisher.messaging',
group='publisher_notifier') group='publisher_notifier')
@ -80,7 +81,7 @@ class NotificationService(os_service.Service):
def _get_notifier(self, transport, pipe): def _get_notifier(self, transport, pipe):
return oslo.messaging.Notifier( return oslo.messaging.Notifier(
transport, transport,
driver=cfg.CONF.publisher_notifier.metering_driver, driver=cfg.CONF.publisher_notifier.telemetry_driver,
publisher_id='ceilometer.notification', publisher_id='ceilometer.notification',
topic='%s-%s' % (self.NOTIFICATION_IPC, pipe.name)) topic='%s-%s' % (self.NOTIFICATION_IPC, pipe.name))
@ -174,17 +175,8 @@ class NotificationService(os_service.Service):
listener.start() listener.start()
self.listeners.append(listener) self.listeners.append(listener)
@staticmethod
def _kill_listeners(listeners):
# NOTE(gordc): correct usage of oslo.messaging listener is to stop(),
# which stops new messages, and wait(), which processes remaining
# messages and closes connection
for listener in listeners:
listener.stop()
listener.wait()
def _refresh_agent(self, event): def _refresh_agent(self, event):
self._kill_listeners(self.pipeline_listeners) utils.kill_listeners(self.pipeline_listeners)
self._configure_pipeline_listeners() self._configure_pipeline_listeners()
def _configure_pipeline_listeners(self): def _configure_pipeline_listeners(self):
@ -210,5 +202,5 @@ class NotificationService(os_service.Service):
def stop(self): def stop(self):
self.partition_coordinator.leave_group(self.group_id) self.partition_coordinator.leave_group(self.group_id)
self._kill_listeners(self.listeners + self.pipeline_listeners) utils.kill_listeners(self.listeners + self.pipeline_listeners)
super(NotificationService, self).stop() super(NotificationService, self).stop()

View File

@ -42,8 +42,7 @@ class DirectPublisher(publisher.PublisherBase):
# Transform the Sample objects into a list of dicts # Transform the Sample objects into a list of dicts
meters = [ meters = [
utils.meter_message_from_counter( utils.meter_message_from_counter(
sample, sample, cfg.CONF.publisher.telemetry_secret)
cfg.CONF.publisher.metering_secret)
for sample in samples for sample in samples
] ]

View File

@ -52,10 +52,11 @@ NOTIFIER_OPTS = [
help='The topic that ceilometer uses for event ' help='The topic that ceilometer uses for event '
'notifications.', 'notifications.',
), ),
cfg.StrOpt('metering_driver', cfg.StrOpt('telemetry_driver',
default='messagingv2', default='messagingv2',
help='The driver that ceilometer uses for metering ' help='The driver that ceilometer uses for metering '
'notifications.', 'notifications.',
deprecated_name='metering_driver',
) )
] ]
@ -102,8 +103,7 @@ class MessagingPublisher(publisher.PublisherBase):
meters = [ meters = [
utils.meter_message_from_counter( utils.meter_message_from_counter(
sample, sample, cfg.CONF.publisher.telemetry_secret)
cfg.CONF.publisher.metering_secret)
for sample in samples for sample in samples
] ]
topic = cfg.CONF.publisher_rpc.metering_topic topic = cfg.CONF.publisher_rpc.metering_topic
@ -171,7 +171,7 @@ class MessagingPublisher(publisher.PublisherBase):
:param events: events from pipeline after transformation :param events: events from pipeline after transformation
""" """
ev_list = [utils.message_from_event( ev_list = [utils.message_from_event(
event, cfg.CONF.publisher.metering_secret) for event in events] event, cfg.CONF.publisher.telemetry_secret) for event in events]
topic = cfg.CONF.publisher_notifier.event_topic topic = cfg.CONF.publisher_notifier.event_topic
self.local_queue.append((context, topic, ev_list)) self.local_queue.append((context, topic, ev_list))
@ -204,7 +204,7 @@ class NotifierPublisher(MessagingPublisher):
super(NotifierPublisher, self).__init__(parsed_url) super(NotifierPublisher, self).__init__(parsed_url)
self.notifier = oslo.messaging.Notifier( self.notifier = oslo.messaging.Notifier(
messaging.get_transport(), messaging.get_transport(),
driver=cfg.CONF.publisher_notifier.metering_driver, driver=cfg.CONF.publisher_notifier.telemetry_driver,
publisher_id='telemetry.publisher.%s' % cfg.CONF.host, publisher_id='telemetry.publisher.%s' % cfg.CONF.host,
topic=topic, topic=topic,
retry=self.retry retry=self.retry

View File

@ -50,8 +50,7 @@ class UDPPublisher(publisher.PublisherBase):
for sample in samples: for sample in samples:
msg = utils.meter_message_from_counter( msg = utils.meter_message_from_counter(
sample, sample, cfg.CONF.publisher.telemetry_secret)
cfg.CONF.publisher.metering_secret)
host = self.host host = self.host
port = self.port port = self.port
LOG.debug(_("Publishing sample %(msg)s over UDP to " LOG.debug(_("Publishing sample %(msg)s over UDP to "

View File

@ -24,14 +24,16 @@ import six
from ceilometer import utils from ceilometer import utils
OPTS = [ OPTS = [
cfg.StrOpt('metering_secret', cfg.StrOpt('telemetry_secret',
secret=True, secret=True,
default='change this or be hacked', default='change this or be hacked',
help='Secret value for signing metering messages.', help='Secret value for signing metering messages.',
deprecated_opts=[cfg.DeprecatedOpt("metering_secret", deprecated_opts=[cfg.DeprecatedOpt("metering_secret",
"DEFAULT"), "DEFAULT"),
cfg.DeprecatedOpt("metering_secret", cfg.DeprecatedOpt("metering_secret",
"publisher_rpc")] "publisher_rpc"),
cfg.DeprecatedOpt("metering_secret",
"publisher")]
), ),
] ]
cfg.CONF.register_opts(OPTS, group="publisher") cfg.CONF.register_opts(OPTS, group="publisher")

View File

@ -101,8 +101,7 @@ class TestAPIACL(v2.FunctionalTest,
'tag': 'self.sample4'}, 'tag': 'self.sample4'},
source='test_source')]: source='test_source')]:
msg = utils.meter_message_from_counter( msg = utils.meter_message_from_counter(
cnt, cnt, self.CONF.publisher.telemetry_secret)
self.CONF.publisher.metering_secret)
self.conn.record_metering_data(msg) self.conn.record_metering_data(msg)
def get_json(self, path, expect_errors=False, headers=None, def get_json(self, path, expect_errors=False, headers=None,

View File

@ -89,8 +89,7 @@ class TestQueryMetersController(tests_api.FunctionalTest,
source='test_source')]: source='test_source')]:
msg = utils.meter_message_from_counter( msg = utils.meter_message_from_counter(
cnt, cnt, self.CONF.publisher.telemetry_secret)
self.CONF.publisher.metering_secret)
self.conn.record_metering_data(msg) self.conn.record_metering_data(msg)
def test_query_fields_are_optional(self): def test_query_fields_are_optional(self):

View File

@ -54,8 +54,7 @@ class TestListEvents(v2.FunctionalTest,
source='test_source', source='test_source',
) )
msg = utils.meter_message_from_counter( msg = utils.meter_message_from_counter(
self.sample1, self.sample1, self.CONF.publisher.telemetry_secret,
self.CONF.publisher.metering_secret,
) )
self.conn.record_metering_data(msg) self.conn.record_metering_data(msg)
@ -74,8 +73,7 @@ class TestListEvents(v2.FunctionalTest,
source='source2', source='source2',
) )
msg2 = utils.meter_message_from_counter( msg2 = utils.meter_message_from_counter(
self.sample2, self.sample2, self.CONF.publisher.telemetry_secret,
self.CONF.publisher.metering_secret,
) )
self.conn.record_metering_data(msg2) self.conn.record_metering_data(msg2)

View File

@ -177,8 +177,7 @@ class TestListMeters(v2.FunctionalTest,
resource_metadata={}, resource_metadata={},
source='test_source1')]: source='test_source1')]:
msg = utils.meter_message_from_counter( msg = utils.meter_message_from_counter(
cnt, cnt, self.CONF.publisher.telemetry_secret)
self.CONF.publisher.metering_secret)
self.messages.append(msg) self.messages.append(msg)
self.conn.record_metering_data(msg) self.conn.record_metering_data(msg)

View File

@ -61,8 +61,7 @@ class TestListResources(v2.FunctionalTest,
source='test', source='test',
) )
msg = utils.meter_message_from_counter( msg = utils.meter_message_from_counter(
sample1, sample1, self.CONF.publisher.telemetry_secret,
self.CONF.publisher.metering_secret,
) )
self.conn.record_metering_data(msg) self.conn.record_metering_data(msg)
@ -90,8 +89,7 @@ class TestListResources(v2.FunctionalTest,
source='test', source='test',
) )
msg = utils.meter_message_from_counter( msg = utils.meter_message_from_counter(
sample1, sample1, self.CONF.publisher.telemetry_secret,
self.CONF.publisher.metering_secret,
) )
self.conn.record_metering_data(msg) self.conn.record_metering_data(msg)
@ -110,8 +108,7 @@ class TestListResources(v2.FunctionalTest,
source='test', source='test',
) )
msg2 = utils.meter_message_from_counter( msg2 = utils.meter_message_from_counter(
sample2, sample2, self.CONF.publisher.telemetry_secret,
self.CONF.publisher.metering_secret,
) )
self.conn.record_metering_data(msg2) self.conn.record_metering_data(msg2)
@ -144,7 +141,7 @@ class TestListResources(v2.FunctionalTest,
) )
msg = utils.meter_message_from_counter( msg = utils.meter_message_from_counter(
datapoint, datapoint,
self.CONF.publisher.metering_secret, self.CONF.publisher.telemetry_secret,
) )
self.conn.record_metering_data(msg) self.conn.record_metering_data(msg)
@ -169,8 +166,7 @@ class TestListResources(v2.FunctionalTest,
source='test', source='test',
) )
msg = utils.meter_message_from_counter( msg = utils.meter_message_from_counter(
sample1, sample1, self.CONF.publisher.telemetry_secret,
self.CONF.publisher.metering_secret,
) )
self.conn.record_metering_data(msg) self.conn.record_metering_data(msg)
@ -189,8 +185,7 @@ class TestListResources(v2.FunctionalTest,
source='test', source='test',
) )
msg2 = utils.meter_message_from_counter( msg2 = utils.meter_message_from_counter(
sample2, sample2, self.CONF.publisher.telemetry_secret,
self.CONF.publisher.metering_secret,
) )
self.conn.record_metering_data(msg2) self.conn.record_metering_data(msg2)
@ -213,8 +208,7 @@ class TestListResources(v2.FunctionalTest,
source='test_list_resources', source='test_list_resources',
) )
msg = utils.meter_message_from_counter( msg = utils.meter_message_from_counter(
sample1, sample1, self.CONF.publisher.telemetry_secret,
self.CONF.publisher.metering_secret,
) )
self.conn.record_metering_data(msg) self.conn.record_metering_data(msg)
@ -233,8 +227,7 @@ class TestListResources(v2.FunctionalTest,
source='not-test', source='not-test',
) )
msg2 = utils.meter_message_from_counter( msg2 = utils.meter_message_from_counter(
sample2, sample2, self.CONF.publisher.telemetry_secret,
self.CONF.publisher.metering_secret,
) )
self.conn.record_metering_data(msg2) self.conn.record_metering_data(msg2)
@ -262,8 +255,7 @@ class TestListResources(v2.FunctionalTest,
source='test_list_resources', source='test_list_resources',
) )
msg = utils.meter_message_from_counter( msg = utils.meter_message_from_counter(
sample1, sample1, self.CONF.publisher.telemetry_secret,
self.CONF.publisher.metering_secret,
) )
self.conn.record_metering_data(msg) self.conn.record_metering_data(msg)
@ -282,8 +274,7 @@ class TestListResources(v2.FunctionalTest,
source='test_list_resources', source='test_list_resources',
) )
msg2 = utils.meter_message_from_counter( msg2 = utils.meter_message_from_counter(
sample2, sample2, self.CONF.publisher.telemetry_secret,
self.CONF.publisher.metering_secret,
) )
self.conn.record_metering_data(msg2) self.conn.record_metering_data(msg2)
@ -315,8 +306,7 @@ class TestListResources(v2.FunctionalTest,
source='test_list_resources', source='test_list_resources',
) )
msg = utils.meter_message_from_counter( msg = utils.meter_message_from_counter(
sample1, sample1, self.CONF.publisher.telemetry_secret,
self.CONF.publisher.metering_secret,
) )
self.conn.record_metering_data(msg) self.conn.record_metering_data(msg)
@ -335,8 +325,7 @@ class TestListResources(v2.FunctionalTest,
source='not-test', source='not-test',
) )
msg2 = utils.meter_message_from_counter( msg2 = utils.meter_message_from_counter(
sample2, sample2, self.CONF.publisher.telemetry_secret,
self.CONF.publisher.metering_secret,
) )
self.conn.record_metering_data(msg2) self.conn.record_metering_data(msg2)
@ -362,8 +351,7 @@ class TestListResources(v2.FunctionalTest,
source='test_list_resources', source='test_list_resources',
) )
msg = utils.meter_message_from_counter( msg = utils.meter_message_from_counter(
sample1, sample1, self.CONF.publisher.telemetry_secret,
self.CONF.publisher.metering_secret,
) )
self.conn.record_metering_data(msg) self.conn.record_metering_data(msg)
@ -382,8 +370,7 @@ class TestListResources(v2.FunctionalTest,
source='not-test', source='not-test',
) )
msg2 = utils.meter_message_from_counter( msg2 = utils.meter_message_from_counter(
sample2, sample2, self.CONF.publisher.telemetry_secret,
self.CONF.publisher.metering_secret,
) )
self.conn.record_metering_data(msg2) self.conn.record_metering_data(msg2)
@ -409,8 +396,7 @@ class TestListResources(v2.FunctionalTest,
source='not-test', source='not-test',
) )
msg2 = utils.meter_message_from_counter( msg2 = utils.meter_message_from_counter(
sample1, sample1, self.CONF.publisher.telemetry_secret,
self.CONF.publisher.metering_secret,
) )
self.conn.record_metering_data(msg2) self.conn.record_metering_data(msg2)
@ -436,8 +422,7 @@ class TestListResources(v2.FunctionalTest,
source='not-test', source='not-test',
) )
msg2 = utils.meter_message_from_counter( msg2 = utils.meter_message_from_counter(
sample1, sample1, self.CONF.publisher.telemetry_secret,
self.CONF.publisher.metering_secret,
) )
self.conn.record_metering_data(msg2) self.conn.record_metering_data(msg2)
@ -465,8 +450,7 @@ class TestListResources(v2.FunctionalTest,
source='test', source='test',
) )
msg = utils.meter_message_from_counter( msg = utils.meter_message_from_counter(
sample1, sample1, self.CONF.publisher.telemetry_secret,
self.CONF.publisher.metering_secret,
) )
self.conn.record_metering_data(msg) self.conn.record_metering_data(msg)
@ -494,8 +478,7 @@ class TestListResources(v2.FunctionalTest,
source='test_list_resources', source='test_list_resources',
) )
msg = utils.meter_message_from_counter( msg = utils.meter_message_from_counter(
sample1, sample1, self.CONF.publisher.telemetry_secret,
self.CONF.publisher.metering_secret,
) )
self.conn.record_metering_data(msg) self.conn.record_metering_data(msg)
@ -526,8 +509,7 @@ class TestListResources(v2.FunctionalTest,
source='test_list_resources', source='test_list_resources',
) )
msg = utils.meter_message_from_counter( msg = utils.meter_message_from_counter(
sample1, sample1, self.CONF.publisher.telemetry_secret,
self.CONF.publisher.metering_secret,
) )
self.conn.record_metering_data(msg) self.conn.record_metering_data(msg)

View File

@ -45,8 +45,7 @@ class TestMaxProjectVolume(v2.FunctionalTest,
source='source1', source='source1',
) )
msg = utils.meter_message_from_counter( msg = utils.meter_message_from_counter(
s, s, self.CONF.publisher.telemetry_secret,
self.CONF.publisher.metering_secret,
) )
self.conn.record_metering_data(msg) self.conn.record_metering_data(msg)
@ -143,8 +142,7 @@ class TestMaxResourceVolume(v2.FunctionalTest,
source='source1', source='source1',
) )
msg = utils.meter_message_from_counter( msg = utils.meter_message_from_counter(
s, s, self.CONF.publisher.telemetry_secret,
self.CONF.publisher.metering_secret,
) )
self.conn.record_metering_data(msg) self.conn.record_metering_data(msg)
@ -273,8 +271,7 @@ class TestSumProjectVolume(v2.FunctionalTest,
source='source1', source='source1',
) )
msg = utils.meter_message_from_counter( msg = utils.meter_message_from_counter(
s, s, self.CONF.publisher.telemetry_secret,
self.CONF.publisher.metering_secret,
) )
self.conn.record_metering_data(msg) self.conn.record_metering_data(msg)
@ -373,8 +370,7 @@ class TestSumResourceVolume(v2.FunctionalTest,
source='source1', source='source1',
) )
msg = utils.meter_message_from_counter( msg = utils.meter_message_from_counter(
s, s, self.CONF.publisher.telemetry_secret,
self.CONF.publisher.metering_secret,
) )
self.conn.record_metering_data(msg) self.conn.record_metering_data(msg)
@ -530,8 +526,7 @@ class TestGroupByInstance(v2.FunctionalTest,
source=test_sample['source'], source=test_sample['source'],
) )
msg = utils.meter_message_from_counter( msg = utils.meter_message_from_counter(
c, c, self.CONF.publisher.telemetry_secret,
self.CONF.publisher.metering_secret,
) )
self.conn.record_metering_data(msg) self.conn.record_metering_data(msg)
@ -1281,8 +1276,7 @@ class TestGroupBySource(v2.FunctionalTest,
source=test_sample['source'], source=test_sample['source'],
) )
msg = utils.meter_message_from_counter( msg = utils.meter_message_from_counter(
c, c, self.CONF.publisher.telemetry_secret,
self.CONF.publisher.metering_secret,
) )
self.conn.record_metering_data(msg) self.conn.record_metering_data(msg)
@ -1383,8 +1377,7 @@ class TestSelectableAggregates(v2.FunctionalTest,
source=test_sample['source'], source=test_sample['source'],
) )
msg = utils.meter_message_from_counter( msg = utils.meter_message_from_counter(
c, c, self.CONF.publisher.telemetry_secret,
self.CONF.publisher.metering_secret,
) )
self.conn.record_metering_data(msg) self.conn.record_metering_data(msg)
@ -1482,8 +1475,7 @@ class TestSelectableAggregates(v2.FunctionalTest,
source='source', source='source',
) )
msg = utils.meter_message_from_counter( msg = utils.meter_message_from_counter(
s, s, self.CONF.publisher.telemetry_secret,
self.CONF.publisher.metering_secret,
) )
self.conn.record_metering_data(msg) self.conn.record_metering_data(msg)
@ -1635,8 +1627,7 @@ class TestUnparameterizedAggregates(v2.FunctionalTest,
source=test_sample['source'], source=test_sample['source'],
) )
msg = utils.meter_message_from_counter( msg = utils.meter_message_from_counter(
c, c, self.CONF.publisher.telemetry_secret,
self.CONF.publisher.metering_secret,
) )
self.conn.record_metering_data(msg) self.conn.record_metering_data(msg)

View File

@ -48,8 +48,7 @@ class TestDispatcherDB(base.BaseTestCase):
'counter_volume': 1, 'counter_volume': 1,
} }
msg['message_signature'] = utils.compute_signature( msg['message_signature'] = utils.compute_signature(
msg, msg, self.CONF.publisher.telemetry_secret,
self.CONF.publisher.metering_secret,
) )
with mock.patch.object(self.dispatcher.meter_conn, with mock.patch.object(self.dispatcher.meter_conn,
@ -85,8 +84,7 @@ class TestDispatcherDB(base.BaseTestCase):
'timestamp': '2012-07-02T13:53:40Z', 'timestamp': '2012-07-02T13:53:40Z',
} }
msg['message_signature'] = utils.compute_signature( msg['message_signature'] = utils.compute_signature(
msg, msg, self.CONF.publisher.telemetry_secret,
self.CONF.publisher.metering_secret,
) )
expected = msg.copy() expected = msg.copy()
@ -105,8 +103,7 @@ class TestDispatcherDB(base.BaseTestCase):
'timestamp': '2012-09-30T15:31:50.262-08:00', 'timestamp': '2012-09-30T15:31:50.262-08:00',
} }
msg['message_signature'] = utils.compute_signature( msg['message_signature'] = utils.compute_signature(
msg, msg, self.CONF.publisher.telemetry_secret,
self.CONF.publisher.metering_secret,
) )
expected = msg.copy() expected = msg.copy()

View File

@ -52,8 +52,7 @@ class TestDispatcherFile(base.BaseTestCase):
'counter_volume': 1, 'counter_volume': 1,
} }
msg['message_signature'] = utils.compute_signature( msg['message_signature'] = utils.compute_signature(
msg, msg, self.CONF.publisher.telemetry_secret,
self.CONF.publisher.metering_secret,
) )
# The record_metering_data method should exist and not produce errors. # The record_metering_data method should exist and not produce errors.
@ -84,8 +83,7 @@ class TestDispatcherFile(base.BaseTestCase):
'counter_volume': 1, 'counter_volume': 1,
} }
msg['message_signature'] = utils.compute_signature( msg['message_signature'] = utils.compute_signature(
msg, msg, self.CONF.publisher.telemetry_secret,
self.CONF.publisher.metering_secret,
) )
# The record_metering_data method should exist and not produce errors. # The record_metering_data method should exist and not produce errors.

View File

@ -32,8 +32,7 @@ class TestDispatcherHttp(base.BaseTestCase):
'counter_volume': 1, 'counter_volume': 1,
} }
self.msg['message_signature'] = utils.compute_signature( self.msg['message_signature'] = utils.compute_signature(
self.msg, self.msg, self.CONF.publisher.telemetry_secret,
self.CONF.publisher.metering_secret,
) )
def test_http_dispatcher_config_options(self): def test_http_dispatcher_config_options(self):
@ -78,8 +77,7 @@ class TestDispatcherHttp(base.BaseTestCase):
self.msg['resource_metadata'] = {'request': {'NONE_CADF_EVENT': { self.msg['resource_metadata'] = {'request': {'NONE_CADF_EVENT': {
'q1': 'v1', 'q2': 'v2'}, }, } 'q1': 'v1', 'q2': 'v2'}, }, }
self.msg['message_signature'] = utils.compute_signature( self.msg['message_signature'] = utils.compute_signature(
self.msg, self.msg, self.CONF.publisher.telemetry_secret,
self.CONF.publisher.metering_secret,
) )
with mock.patch.object(requests, 'post') as post: with mock.patch.object(requests, 'post') as post:
@ -97,8 +95,7 @@ class TestDispatcherHttp(base.BaseTestCase):
self.msg['resource_metadata'] = {'request': {'CADF_EVENT': { self.msg['resource_metadata'] = {'request': {'CADF_EVENT': {
'q1': 'v1', 'q2': 'v2'}, }, } 'q1': 'v1', 'q2': 'v2'}, }, }
self.msg['message_signature'] = utils.compute_signature( self.msg['message_signature'] = utils.compute_signature(
self.msg, self.msg, self.CONF.publisher.telemetry_secret,
self.CONF.publisher.metering_secret,
) )
with mock.patch.object(requests, 'post') as post: with mock.patch.object(requests, 'post') as post:
@ -114,8 +111,7 @@ class TestDispatcherHttp(base.BaseTestCase):
self.msg['resource_metadata'] = {'any': {'thing1': 'v1', self.msg['resource_metadata'] = {'any': {'thing1': 'v1',
'thing2': 'v2', }, } 'thing2': 'v2', }, }
self.msg['message_signature'] = utils.compute_signature( self.msg['message_signature'] = utils.compute_signature(
self.msg, self.msg, self.CONF.publisher.telemetry_secret,
self.CONF.publisher.metering_secret,
) )
with mock.patch.object(requests, 'post') as post: with mock.patch.object(requests, 'post') as post:

View File

@ -110,7 +110,7 @@ class TestUDPPublisher(base.BaseTestCase):
def setUp(self): def setUp(self):
super(TestUDPPublisher, self).setUp() super(TestUDPPublisher, self).setUp()
self.CONF = self.useFixture(fixture_config.Config()).conf self.CONF = self.useFixture(fixture_config.Config()).conf
self.CONF.publisher.metering_secret = 'not-so-secret' self.CONF.publisher.telemetry_secret = 'not-so-secret'
def test_published(self): def test_published(self):
self.data_sent = [] self.data_sent = []

View File

@ -51,7 +51,7 @@ class DBTestBase(tests_db.TestBase):
resource_metadata=metadata, source=source resource_metadata=metadata, source=source
) )
msg = utils.meter_message_from_counter( msg = utils.meter_message_from_counter(
s, self.CONF.publisher.metering_secret s, self.CONF.publisher.telemetry_secret
) )
self.conn.record_metering_data(msg) self.conn.record_metering_data(msg)
return msg return msg
@ -1409,8 +1409,7 @@ class StatisticsGroupByTest(DBTestBase,
source=test_sample['source'], source=test_sample['source'],
) )
msg = utils.meter_message_from_counter( msg = utils.meter_message_from_counter(
c, c, self.CONF.publisher.telemetry_secret,
self.CONF.publisher.metering_secret,
) )
self.conn.record_metering_data(msg) self.conn.record_metering_data(msg)
@ -2608,8 +2607,7 @@ class CounterDataTypeTest(DBTestBase,
source='test-1', source='test-1',
) )
msg = utils.meter_message_from_counter( msg = utils.meter_message_from_counter(
c, c, self.CONF.publisher.telemetry_secret,
self.CONF.publisher.metering_secret,
) )
self.conn.record_metering_data(msg) self.conn.record_metering_data(msg)
@ -2627,8 +2625,7 @@ class CounterDataTypeTest(DBTestBase,
source='test-1', source='test-1',
) )
msg = utils.meter_message_from_counter( msg = utils.meter_message_from_counter(
c, c, self.CONF.publisher.telemetry_secret,
self.CONF.publisher.metering_secret,
) )
self.conn.record_metering_data(msg) self.conn.record_metering_data(msg)
@ -2645,8 +2642,7 @@ class CounterDataTypeTest(DBTestBase,
source='test-1', source='test-1',
) )
msg = utils.meter_message_from_counter( msg = utils.meter_message_from_counter(
c, c, self.CONF.publisher.telemetry_secret,
self.CONF.publisher.metering_secret,
) )
self.conn.record_metering_data(msg) self.conn.record_metering_data(msg)
@ -3492,7 +3488,7 @@ class BigIntegerTest(tests_db.TestBase,
timestamp=datetime.datetime.utcnow(), timestamp=datetime.datetime.utcnow(),
resource_metadata=metadata) resource_metadata=metadata)
msg = utils.meter_message_from_counter( msg = utils.meter_message_from_counter(
s, self.CONF.publisher.metering_secret) s, self.CONF.publisher.telemetry_secret)
self.conn.record_metering_data(msg) self.conn.record_metering_data(msg)

View File

@ -47,7 +47,7 @@ class TestCollector(tests_base.BaseTestCase):
self.CONF = self.useFixture(fixture_config.Config()).conf self.CONF = self.useFixture(fixture_config.Config()).conf
self.CONF.import_opt("connection", "oslo.db.options", group="database") self.CONF.import_opt("connection", "oslo.db.options", group="database")
self.CONF.set_override("connection", "log://", group='database') self.CONF.set_override("connection", "log://", group='database')
self.CONF.set_override('metering_secret', 'not-so-secret', self.CONF.set_override('telemetry_secret', 'not-so-secret',
group='publisher') group='publisher')
self._setup_messaging() self._setup_messaging()
@ -230,25 +230,53 @@ class TestCollector(tests_base.BaseTestCase):
mylog.info.assert_called_once_with( mylog.info.assert_called_once_with(
'metering data test for test_run_tasks: 1') 'metering data test for test_run_tasks: 1')
@mock.patch.object(oslo.messaging.MessageHandlingServer, 'start') def _test_collector_requeue(self, listener):
@mock.patch.object(collector.CollectorService, 'start_udp')
def test_collector_requeue(self, udp_start, rpc_start):
self.CONF.set_override('requeue_sample_on_dispatcher_error', True,
group='collector')
self.srv.start() self.srv.start()
with mock.patch.object(self.srv.dispatcher_manager, 'map_method', with mock.patch.object(self.srv.dispatcher_manager, 'map_method',
side_effect=Exception('boom')): side_effect=Exception('boom')):
ret = self.srv.sample({}, 'pub_id', 'event', {}, {}) endp = getattr(self.srv, listener).dispatcher.endpoints[0]
ret = endp.sample({}, 'pub_id', 'event', {}, {})
self.assertEqual(oslo.messaging.NotificationResult.REQUEUE, self.assertEqual(oslo.messaging.NotificationResult.REQUEUE,
ret) ret)
@mock.patch.object(oslo.messaging.MessageHandlingServer, 'start') @mock.patch.object(oslo.messaging.MessageHandlingServer, 'start',
@mock.patch.object(collector.CollectorService, 'start_udp') mock.Mock())
def test_collector_no_requeue(self, udp_start, rpc_start): @mock.patch.object(collector.CollectorService, 'start_udp', mock.Mock())
self.CONF.set_override('requeue_sample_on_dispatcher_error', False, def test_collector_sample_requeue(self):
self.CONF.set_override('requeue_sample_on_dispatcher_error', True,
group='collector') group='collector')
self._test_collector_requeue('sample_listener')
@mock.patch.object(oslo.messaging.MessageHandlingServer, 'start',
mock.Mock())
@mock.patch.object(collector.CollectorService, 'start_udp', mock.Mock())
def test_collector_event_requeue(self):
self.CONF.set_override('requeue_event_on_dispatcher_error', True,
group='collector')
self.CONF.set_override('store_events', True, group='notification')
self._test_collector_requeue('event_listener')
def _test_collector_no_requeue(self, listener):
self.srv.start() self.srv.start()
with mock.patch.object(self.srv.dispatcher_manager, 'map_method', with mock.patch.object(self.srv.dispatcher_manager, 'map_method',
side_effect=FakeException('boom')): side_effect=FakeException('boom')):
self.assertRaises(FakeException, self.srv.sample, {}, 'pub_id', endp = getattr(self.srv, listener).dispatcher.endpoints[0]
self.assertRaises(FakeException, endp.sample, {}, 'pub_id',
'event', {}, {}) 'event', {}, {})
@mock.patch.object(oslo.messaging.MessageHandlingServer, 'start',
mock.Mock())
@mock.patch.object(collector.CollectorService, 'start_udp', mock.Mock())
def test_collector_sample_no_requeue(self):
self.CONF.set_override('requeue_sample_on_dispatcher_error', False,
group='collector')
self._test_collector_no_requeue('sample_listener')
@mock.patch.object(oslo.messaging.MessageHandlingServer, 'start',
mock.Mock())
@mock.patch.object(collector.CollectorService, 'start_udp', mock.Mock())
def test_collector_event_no_requeue(self):
self.CONF.set_override('requeue_event_on_dispatcher_error', False,
group='collector')
self.CONF.set_override('store_events', True, group='notification')
self._test_collector_no_requeue('event_listener')

View File

@ -249,3 +249,12 @@ class HashRing(object):
return None return None
pos = self._get_position_on_ring(key) pos = self._get_position_on_ring(key)
return self._ring[self._sorted_keys[pos]] return self._ring[self._sorted_keys[pos]]
def kill_listeners(listeners):
# NOTE(gordc): correct usage of oslo.messaging listener is to stop(),
# which stops new messages, and wait(), which processes remaining
# messages and closes connection
for listener in listeners:
listener.stop()
listener.wait()

View File

@ -217,9 +217,9 @@ Installing the notification agent
not yet been tested with ZeroMQ. We recommend using Rabbit or not yet been tested with ZeroMQ. We recommend using Rabbit or
qpid for now. qpid for now.
2. Set the ``metering_secret`` value. 2. Set the ``telemetry_secret`` value.
Set the ``metering_secret`` value to a large, random, value. Use Set the ``telemetry_secret`` value to a large, random, value. Use
the same value in all ceilometer configuration files, on all the same value in all ceilometer configuration files, on all
nodes, so that messages passing between the nodes can be nodes, so that messages passing between the nodes can be
validated. validated.
@ -289,9 +289,9 @@ Installing the collector
not yet been tested with ZeroMQ. We recommend using Rabbit or not yet been tested with ZeroMQ. We recommend using Rabbit or
qpid for now. qpid for now.
2. Set the ``metering_secret`` value. 2. Set the ``telemetry_secret`` value.
Set the ``metering_secret`` value to a large, random, value. Use Set the ``telemetry_secret`` value to a large, random, value. Use
the same value in all ceilometer configuration files, on all the same value in all ceilometer configuration files, on all
nodes, so that messages passing between the nodes can be nodes, so that messages passing between the nodes can be
validated. validated.
@ -372,9 +372,9 @@ Installing the Compute Agent
not yet been tested with ZeroMQ. We recommend using Rabbit or not yet been tested with ZeroMQ. We recommend using Rabbit or
qpid for now. qpid for now.
2. Set the ``metering_secret`` value. 2. Set the ``telemetry_secret`` value.
Set the ``metering_secret`` value to a large, random, value. Use Set the ``telemetry_secret`` value to a large, random, value. Use
the same value in all ceilometer configuration files, on all the same value in all ceilometer configuration files, on all
nodes, so that messages passing between the nodes can be nodes, so that messages passing between the nodes can be
validated. validated.
@ -446,9 +446,9 @@ Installing the Central Agent
not yet been tested with ZeroMQ. We recommend using Rabbit or not yet been tested with ZeroMQ. We recommend using Rabbit or
qpid for now. qpid for now.
2. Set the ``metering_secret`` value. 2. Set the ``telemetry_secret`` value.
Set the ``metering_secret`` value to a large, random, value. Use Set the ``telemetry_secret`` value to a large, random, value. Use
the same value in all ceilometer configuration files, on all the same value in all ceilometer configuration files, on all
nodes, so that messages passing between the nodes can be nodes, so that messages passing between the nodes can be
validated. validated.

View File

@ -81,8 +81,7 @@ def make_test_data(name, meter_type, unit, volume, random_min,
source=source, source=source,
) )
data = utils.meter_message_from_counter( data = utils.meter_message_from_counter(
c, c, cfg.CONF.publisher.telemetry_secret)
cfg.CONF.publisher.metering_secret)
yield data yield data
n += 1 n += 1