nearly pluggable notification agent

notification agent now just asks the pipeline managers for the
endpoints it should broadcast to. it only sets up a listener for the
main queue and a listener for the internal queue
(if applicable)

- pass a publishing/processing context into endpoints instead of the
manager. the context depends on whether partitioning is enabled
- move all endpoint/notifier setup to respective pipeline managers
- change interim broadcast filtering to use event_type rather than
publisher_id so all filtering uses event_type.
- add a namespace to load the supported pipeline managers
- remove some notification tests as they are redundant, differing
only in what they mock compared to other tests
- change relevant_endpoint test to verify endpoints cover all pipelines

Related-Bug: #1720021
Change-Id: I9f9073e3b15c4e3a502976c2e3e0306bc99282d9
gord chung 2017-11-01 22:00:08 +00:00
parent 48f35a35a4
commit 2d67bd21dd
12 changed files with 293 additions and 404 deletions
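
The heart of this change is in the NotificationService diff below:
instead of hard-wiring sample and event handling, the agent discovers
its pipeline managers through a stevedore namespace and asks each one
for endpoints. A minimal sketch of that flow, assuming the namespace
and constructor arguments shown in this commit (the helper name is
illustrative):

# sketch only: load_pipeline_managers is a hypothetical helper; the
# namespace and invoke_args mirror the diff below
from stevedore import extension

def load_pipeline_managers(conf):
    # entry points under 'ceilometer.notification.pipeline' (meter and
    # event in setup.cfg) are built with the config plus a flag saying
    # whether workload partitioning is enabled
    mgrs = extension.ExtensionManager(
        namespace='ceilometer.notification.pipeline',
        invoke_on_load=True,
        invoke_args=(conf, conf.notification.workload_partitioning))
    return [ext.obj for ext in mgrs]

# the agent then only collects endpoints from the managers:
#   main queue:     [e for m in mgrs for e in m.get_main_endpoints()]
#   interim queues: [e for m in mgrs for e in m.get_interim_endpoints()]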

View File

@@ -186,8 +186,8 @@ class ProcessMeterNotifications(endpoint.SampleEndpoint):
event_types = []
def __init__(self, manager):
super(ProcessMeterNotifications, self).__init__(manager)
def __init__(self, conf, publisher):
super(ProcessMeterNotifications, self).__init__(conf, publisher)
self.definitions = self._load_definitions()
def _load_definitions(self):
@@ -195,18 +195,18 @@ class ProcessMeterNotifications(endpoint.SampleEndpoint):
namespace='ceilometer.event.trait_plugin')
definitions = {}
mfs = []
for dir in self.manager.conf.meter.meter_definitions_dirs:
for dir in self.conf.meter.meter_definitions_dirs:
for filepath in sorted(glob.glob(os.path.join(dir, "*.yaml"))):
if filepath is not None:
mfs.append(filepath)
if self.manager.conf.meter.meter_definitions_cfg_file is not None:
if self.conf.meter.meter_definitions_cfg_file is not None:
mfs.append(
pkg_resources.resource_filename(
self.manager.conf.meter.meter_definitions_cfg_file)
self.conf.meter.meter_definitions_cfg_file)
)
for mf in mfs:
meters_cfg = declarative.load_definitions(
self.manager.conf, {}, mf)
self.conf, {}, mf)
for meter_cfg in reversed(meters_cfg['metric']):
if meter_cfg.get('name') in definitions:
@@ -215,8 +215,7 @@ class ProcessMeterNotifications(endpoint.SampleEndpoint):
% meter_cfg)
continue
try:
md = MeterDefinition(meter_cfg, self.manager.conf,
plugin_manager)
md = MeterDefinition(meter_cfg, self.conf, plugin_manager)
except declarative.DefinitionException as e:
errmsg = "Error loading meter definition: %s"
LOG.error(errmsg, six.text_type(e))
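
Endpoints such as ProcessMeterNotifications above now take
(conf, publisher) instead of the whole notification manager. A hedged
sketch of the new wiring, mirroring
SamplePipelineManager.get_main_endpoints() further down in this commit:

# sketch: how a pipeline manager builds its main-queue endpoints now
from stevedore import extension

def build_sample_endpoints(mgr):
    # each 'ceilometer.sample.endpoint' plugin gets the config and a
    # publishing context rather than the notification manager itself
    exts = extension.ExtensionManager(
        namespace='ceilometer.sample.endpoint',
        invoke_on_load=True,
        invoke_args=(mgr.conf, mgr.get_main_publisher()))
    return [ext.obj for ext in exts]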

View File

@@ -27,11 +27,7 @@ import oslo_messaging
from stevedore import extension
from tooz import coordination
from ceilometer.i18n import _
from ceilometer import messaging
from ceilometer import pipeline
from ceilometer.pipeline import event as event_pipe
from ceilometer.pipeline import sample as sample_pipe
from ceilometer import utils
@@ -105,7 +101,6 @@ class NotificationService(cotyledon.Service):
"""
NOTIFICATION_NAMESPACE = 'ceilometer.notification'
NOTIFICATION_IPC = 'ceilometer-pipe'
def __init__(self, worker_id, conf, coordination_id=None):
super(NotificationService, self).__init__(worker_id)
@@ -131,51 +126,6 @@ class NotificationService(cotyledon.Service):
else:
self.partition_coordinator = None
@classmethod
def _get_notifications_manager(cls, pm):
return extension.ExtensionManager(
namespace=cls.NOTIFICATION_NAMESPACE,
invoke_on_load=True,
invoke_args=(pm, )
)
def _get_notifiers(self, transport, pipe):
notifiers = []
for x in range(self.conf.notification.pipeline_processing_queues):
notifiers.append(oslo_messaging.Notifier(
transport,
driver=self.conf.publisher_notifier.telemetry_driver,
publisher_id=pipe.name,
topics=['%s-%s-%s' % (self.NOTIFICATION_IPC, pipe.name, x)]))
return notifiers
def _get_pipe_manager(self, transport, pipeline_manager):
if self.conf.notification.workload_partitioning:
pipe_manager = pipeline.SamplePipelineTransportManager(self.conf)
for pipe in pipeline_manager.pipelines:
key = pipe.get_grouping_key() or ['resource_id']
pipe_manager.add_transporter(
(pipe.source.support_meter, key,
self._get_notifiers(transport, pipe)))
else:
pipe_manager = pipeline_manager
return pipe_manager
def _get_event_pipeline_manager(self, transport):
if self.conf.notification.workload_partitioning:
event_pipe_manager = pipeline.EventPipelineTransportManager(
self.conf)
for pipe in self.event_pipeline_manager.pipelines:
event_pipe_manager.add_transporter(
(pipe.source.support_event, ['event_type'],
self._get_notifiers(transport, pipe)))
else:
event_pipe_manager = self.event_pipeline_manager
return event_pipe_manager
def get_targets(self):
"""Return a sequence of oslo_messaging.Target
@@ -196,9 +146,10 @@ class NotificationService(cotyledon.Service):
super(NotificationService, self).run()
self.coord_lock = threading.Lock()
self.pipeline_manager = sample_pipe.SamplePipelineManager(self.conf)
self.event_pipeline_manager = (
event_pipe.EventPipelineManager(self.conf))
self.managers = [ext.obj for ext in extension.ExtensionManager(
namespace='ceilometer.notification.pipeline', invoke_on_load=True,
invoke_args=(self.conf,
self.conf.notification.workload_partitioning))]
self.transport = messaging.get_transport(self.conf)
@@ -212,11 +163,7 @@ class NotificationService(cotyledon.Service):
# to ensure the option has been registered by oslo_messaging.
messaging.get_notifier(self.transport, '')
pipe_manager = self._get_pipe_manager(self.transport,
self.pipeline_manager)
event_pipe_manager = self._get_event_pipeline_manager(self.transport)
self._configure_main_queue_listeners(pipe_manager, event_pipe_manager)
self._configure_main_queue_listeners()
if self.conf.notification.workload_partitioning:
# join group after all manager set up is configured
@@ -237,28 +184,11 @@ class NotificationService(cotyledon.Service):
self.periodic.add(run_watchers)
utils.spawn_thread(self.periodic.start)
def _configure_main_queue_listeners(self, pipe_manager,
event_pipe_manager):
notification_manager = self._get_notifications_manager(pipe_manager)
if not list(notification_manager):
LOG.warning(_('Failed to load any notification handlers for %s'),
self.NOTIFICATION_NAMESPACE)
ack_on_error = self.conf.notification.ack_on_event_error
def _configure_main_queue_listeners(self):
endpoints = []
endpoints.append(
event_pipe.EventEndpoint(event_pipe_manager))
for pipe_mgr in self.managers:
endpoints.extend(pipe_mgr.get_main_endpoints())
targets = self.get_targets()
for ext in notification_manager:
handler = ext.obj
LOG.debug('Event types from %(name)s: %(type)s'
' (ack_on_error=%(error)s)',
{'name': ext.name,
'type': ', '.join(handler.event_types),
'error': ack_on_error})
endpoints.append(handler)
urls = self.conf.notification.messaging_urls or [None]
for url in urls:
@@ -281,36 +211,29 @@ class NotificationService(cotyledon.Service):
self._configure_pipeline_listener()
def _configure_pipeline_listener(self):
ev_pipes = self.event_pipeline_manager.pipelines
pipelines = self.pipeline_manager.pipelines + ev_pipes
transport = messaging.get_transport(self.conf)
partitioned = list(filter(
self.hashring.belongs_to_self, self.partition_set))
endpoints = []
for pipe_mgr in self.managers:
endpoints.extend(pipe_mgr.get_interim_endpoints())
targets = []
for pipe in pipelines:
if isinstance(pipe, event_pipe.EventPipeline):
endpoints.append(pipeline.EventPipelineEndpoint(pipe))
else:
endpoints.append(pipeline.SamplePipelineEndpoint(pipe))
for pipe_set, pipe in itertools.product(partitioned, pipelines):
LOG.debug('Pipeline endpoint: %s from set: %s',
pipe.name, pipe_set)
topic = '%s-%s-%s' % (self.NOTIFICATION_IPC,
pipe.name, pipe_set)
targets.append(oslo_messaging.Target(topic=topic))
for mgr in self.managers:
for pipe_set, pipe in itertools.product(partitioned,
mgr.pipelines):
LOG.debug('Pipeline endpoint: %s from set: %s',
pipe.name, pipe_set)
topic = '%s-%s-%s' % (pipe.NOTIFICATION_IPC,
pipe.name, pipe_set)
targets.append(oslo_messaging.Target(topic=topic))
if self.pipeline_listener:
self.pipeline_listener.stop()
self.pipeline_listener.wait()
self.pipeline_listener = messaging.get_batch_notification_listener(
transport,
targets,
endpoints,
self.transport, targets, endpoints,
batch_size=self.conf.notification.batch_size,
batch_timeout=self.conf.notification.batch_timeout)
# NOTE(gordc): set single thread to process data sequentially
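
In the partitioned case the listener above subscribes to one IPC topic
per (owned partition set, pipeline) pair. A standalone sketch of the
target fan-out, using the same variable names as
_configure_pipeline_listener:

# sketch of the IPC target fan-out; 'managers' holds the loaded
# pipeline managers, 'partitioned' the partition sets this agent owns
import itertools
import oslo_messaging

def build_targets(managers, partitioned):
    targets = []
    for mgr in managers:
        for pipe_set, pipe in itertools.product(partitioned,
                                                mgr.pipelines):
            # topics look like 'ceilometer-pipe-<pipeline name>-<set>'
            topic = '%s-%s-%s' % (pipe.NOTIFICATION_IPC,
                                  pipe.name, pipe_set)
            targets.append(oslo_messaging.Target(topic=topic))
    return targets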

View File

@@ -15,20 +15,15 @@
# under the License.
import abc
from itertools import chain
from operator import methodcaller
from oslo_config import cfg
from oslo_log import log
import oslo_messaging
from oslo_utils import timeutils
import six
from ceilometer import agent
from ceilometer.event import models
from ceilometer import messaging
from ceilometer import publisher
from ceilometer.publisher import utils as publisher_utils
from ceilometer import sample as sample_util
OPTS = [
cfg.StrOpt('pipeline_cfg_file',
@@ -50,143 +45,58 @@ class PipelineException(agent.ConfigException):
super(PipelineException, self).__init__('Pipeline', message, cfg)
@six.add_metaclass(abc.ABCMeta)
class PipelineEndpoint(object):
class InterimPublishContext(object):
"""Publisher to hash/shard data to pipelines"""
def __init__(self, pipeline):
self.filter_rule = oslo_messaging.NotificationFilter(
publisher_id=pipeline.name)
self.publish_context = PublishContext([pipeline])
self.conf = pipeline.conf
@abc.abstractmethod
def sample(self, messages):
pass
class SamplePipelineEndpoint(PipelineEndpoint):
def sample(self, messages):
samples = chain.from_iterable(m["payload"] for m in messages)
samples = [
sample_util.Sample(name=s['counter_name'],
type=s['counter_type'],
unit=s['counter_unit'],
volume=s['counter_volume'],
user_id=s['user_id'],
project_id=s['project_id'],
resource_id=s['resource_id'],
timestamp=s['timestamp'],
resource_metadata=s['resource_metadata'],
source=s.get('source'),
# NOTE(sileht): May come from an older node,
# Put None in this case.
monotonic_time=s.get('monotonic_time'))
for s in samples if publisher_utils.verify_signature(
s, self.conf.publisher.telemetry_secret)
]
with self.publish_context as p:
p(sorted(samples, key=methodcaller('get_iso_timestamp')))
class EventPipelineEndpoint(PipelineEndpoint):
def sample(self, messages):
events = chain.from_iterable(m["payload"] for m in messages)
events = [
models.Event(
message_id=ev['message_id'],
event_type=ev['event_type'],
generated=timeutils.normalize_time(
timeutils.parse_isotime(ev['generated'])),
traits=[models.Trait(name, dtype,
models.Trait.convert_value(dtype, value))
for name, dtype, value in ev['traits']],
raw=ev.get('raw', {}))
for ev in events if publisher_utils.verify_signature(
ev, self.conf.publisher.telemetry_secret)
]
try:
with self.publish_context as p:
p(events)
except Exception:
if not self.conf.notification.ack_on_event_error:
return oslo_messaging.NotificationResult.REQUEUE
raise
return oslo_messaging.NotificationResult.HANDLED
class _PipelineTransportManager(object):
def __init__(self, conf):
def __init__(self, conf, pipelines):
self.conf = conf
self.transporters = []
self.pipe_notifiers = []
transport = messaging.get_transport(conf)
for pipe in pipelines:
self.pipe_notifiers.append(
(pipe, self._get_notifiers(transport, pipe)))
def _get_notifiers(self, transport, pipe):
notifiers = []
for x in range(self.conf.notification.pipeline_processing_queues):
notifiers.append(oslo_messaging.Notifier(
transport,
driver=self.conf.publisher_notifier.telemetry_driver,
topics=['%s-%s-%s' % (pipe.NOTIFICATION_IPC, pipe.name, x)]))
return notifiers
@staticmethod
def hash_grouping(datapoint, grouping_keys):
# FIXME(gordc): this logic only supports a single grouping_key. we
# need to change to support pipeline with multiple transformers and
# different grouping_keys
value = ''
for key in grouping_keys or []:
value += datapoint.get(key) if datapoint.get(key) else ''
return hash(value)
def add_transporter(self, transporter):
self.transporters.append(transporter)
def __enter__(self):
def p(data):
data = [data] if not isinstance(data, list) else data
for datapoint in data:
for pipe, notifiers in self.pipe_notifiers:
if pipe.supported(datapoint):
serialized_data = pipe.serializer(datapoint)
key = (self.hash_grouping(serialized_data,
pipe.get_grouping_key())
% len(notifiers))
notifier = notifiers[key]
notifier.sample({}, event_type=pipe.name,
payload=[serialized_data])
return p
def publisher(self):
serializer = self.serializer
hash_grouping = self.hash_grouping
transporters = self.transporters
filter_attr = self.filter_attr
event_type = self.event_type
class PipelinePublishContext(object):
def __enter__(self):
def p(data):
# TODO(gordc): cleanup so payload is always single
# datapoint. we can't correctly bucketise
# datapoints if batched.
data = [data] if not isinstance(data, list) else data
for datapoint in data:
serialized_data = serializer(datapoint)
for d_filter, grouping_keys, notifiers in transporters:
if d_filter(serialized_data[filter_attr]):
key = (hash_grouping(serialized_data,
grouping_keys)
% len(notifiers))
notifier = notifiers[key]
notifier.sample({},
event_type=event_type,
payload=[serialized_data])
return p
def __exit__(self, exc_type, exc_value, traceback):
pass
return PipelinePublishContext()
class SamplePipelineTransportManager(_PipelineTransportManager):
filter_attr = 'counter_name'
event_type = 'ceilometer.pipeline'
def serializer(self, data):
return publisher_utils.meter_message_from_counter(
data, self.conf.publisher.telemetry_secret)
class EventPipelineTransportManager(_PipelineTransportManager):
filter_attr = 'event_type'
event_type = 'pipeline.event'
def serializer(self, data):
return publisher_utils.message_from_event(
data, self.conf.publisher.telemetry_secret)
def __exit__(self, exc_type, exc_value, traceback):
pass
class PublishContext(object):
def __init__(self, pipelines=None):
pipelines = pipelines or []
self.pipelines = set(pipelines)
def add_pipelines(self, pipelines):
self.pipelines.update(pipelines)
def __init__(self, pipelines):
self.pipelines = pipelines or []
def __enter__(self):
def p(data):
@@ -311,6 +221,8 @@ class Sink(object):
class Pipeline(object):
"""Represents a coupling between a sink and a corresponding source."""
NOTIFICATION_IPC = 'ceilometer-pipe'
def __init__(self, conf, source, sink):
self.conf = conf
self.source = source
@@ -332,11 +244,23 @@ class Pipeline(object):
def publish_data(self, data):
"""Publish data from pipeline."""
@abc.abstractproperty
def default_grouping_key(self):
"""Attribute to hash data on. Pass if no partitioning."""
@abc.abstractmethod
def supported(self, data):
"""Attribute to filter on. Pass if no partitioning."""
@abc.abstractmethod
def serializer(self, data):
"""Serialize data for interim transport. Pass if no partitioning."""
def get_grouping_key(self):
keys = []
for transformer in self.sink.transformers:
keys += transformer.grouping_keys
return list(set(keys))
return list(set(keys)) or self.default_grouping_key
class PublisherManager(object):
@@ -360,7 +284,7 @@ class PipelineManager(agent.ConfigManagerBase):
Pipeline manager sets up pipelines according to config file
"""
def __init__(self, conf, cfg_file, transformer_manager):
def __init__(self, conf, cfg_file, transformer_manager, partition):
"""Setup the pipelines according to config.
The configuration is supported as follows:
@@ -460,6 +384,7 @@ class PipelineManager(agent.ConfigManagerBase):
unique_names.add(pipe.name)
self.pipelines.append(pipe)
unique_names.clear()
self.partition = partition
@abc.abstractproperty
def pm_type(self):
@@ -478,31 +403,42 @@ class PipelineManager(agent.ConfigManagerBase):
"""Pipeline sink class"""
def publisher(self):
"""Build a new Publisher for these manager pipelines.
:param context: The context.
"""
"""Build publisher for pipeline publishing."""
return PublishContext(self.pipelines)
def interim_publisher(self):
"""Build publishing context for IPC."""
return InterimPublishContext(self.conf, self.pipelines)
def get_main_publisher(self):
"""Return the publishing context to use"""
return (self.interim_publisher() if self.partition else
self.publisher())
def get_main_endpoints(self):
"""Return endpoints for main queue."""
pass
def get_interim_endpoints(self):
"""Return endpoints for interim pipeline queues."""
pass
class NotificationEndpoint(object):
"""Base Endpoint for plugins that support the notification API."""
def __init__(self, manager):
event_types = []
"""List of strings to filter messages on."""
def __init__(self, conf, publisher):
super(NotificationEndpoint, self).__init__()
# NOTE(gordc): this is filter rule used by oslo.messaging to dispatch
# messages to an endpoint.
if self.event_types:
self.filter_rule = oslo_messaging.NotificationFilter(
event_type='|'.join(self.event_types))
self.manager = manager
@abc.abstractproperty
def event_types(self):
"""Return a sequence of strings to filter on.
Strings are defining the event types to be given to this plugin.
"""
self.conf = conf
self.publisher = publisher
@abc.abstractmethod
def process_notifications(self, priority, notifications):
@@ -511,58 +447,21 @@ class NotificationEndpoint(object):
:param message: Message to process.
"""
@staticmethod
def _consume_and_drop(notifications):
@classmethod
def _consume_and_drop(cls, notifications):
"""RPC endpoint for useless notification level"""
# NOTE(sileht): nothing special todo here, but because we listen
# for the generic notification exchange we have to consume all its
# queues
def audit(self, notifications):
"""endpoint for notification messages at audit level
:param notifications: list of notifications
"""
self._consume_and_drop(notifications)
class MainNotificationEndpoint(NotificationEndpoint):
"""Listens to queues on all priority levels and clears by default."""
def critical(self, notifications):
"""endpoint for notification messages at critical level
:param notifications: list of notifications
"""
self._consume_and_drop(notifications)
def debug(self, notifications):
"""endpoint for notification messages at debug level
:param notifications: list of notifications
"""
self._consume_and_drop(notifications)
def error(self, notifications):
"""endpoint for notification messages at error level
:param notifications: list of notifications
"""
self._consume_and_drop(notifications)
def info(self, notifications):
"""endpoint for notification messages at info level
:param notifications: list of notifications
"""
self._consume_and_drop(notifications)
def sample(self, notifications):
"""endpoint for notification messages at sample level
:param notifications: list of notifications
"""
self._consume_and_drop(notifications)
def warn(self, notifications):
"""endpoint for notification messages at warn level
:param notifications: list of notifications
"""
self._consume_and_drop(notifications)
audit = NotificationEndpoint._consume_and_drop
critical = NotificationEndpoint._consume_and_drop
debug = NotificationEndpoint._consume_and_drop
error = NotificationEndpoint._consume_and_drop
info = NotificationEndpoint._consume_and_drop
sample = NotificationEndpoint._consume_and_drop
warn = NotificationEndpoint._consume_and_drop
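
InterimPublishContext above shards each datapoint across a pipeline's
IPC queues by hashing its grouping key. The bucketing rule pulled out
as a standalone sketch (single grouping key only, per the FIXME above):

# sketch of the sharding rule used by InterimPublishContext
def pick_notifier(pipe, notifiers, datapoint):
    serialized = pipe.serializer(datapoint)
    value = ''
    for key in pipe.get_grouping_key() or []:
        value += serialized.get(key) or ''
    # same hash bucket -> same queue, so related datapoints stay ordered
    return notifiers[hash(value) % len(notifiers)]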

View File

@@ -11,27 +11,31 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from itertools import chain
from oslo_log import log
import oslo_messaging
from oslo_utils import timeutils
from stevedore import extension
from ceilometer import agent
from ceilometer.event import converter
from ceilometer.event import models
from ceilometer import pipeline
from ceilometer.publisher import utils as publisher_utils
LOG = log.getLogger(__name__)
class EventEndpoint(pipeline.NotificationEndpoint):
class EventEndpoint(pipeline.MainNotificationEndpoint):
event_types = []
def __init__(self, manager):
super(EventEndpoint, self).__init__(manager)
def __init__(self, conf, publisher):
super(EventEndpoint, self).__init__(conf, publisher)
LOG.debug('Loading event definitions')
self.event_converter = converter.setup_events(
manager.conf,
conf,
extension.ExtensionManager(
namespace='ceilometer.event.trait_plugin'))
@@ -54,15 +58,48 @@ class EventEndpoint(pipeline.NotificationEndpoint):
try:
event = self.event_converter.to_event(priority, message)
if event is not None:
with self.manager.publisher() as p:
with self.publisher as p:
p(event)
except Exception:
if not self.manager.conf.notification.ack_on_event_error:
if not self.conf.notification.ack_on_event_error:
return oslo_messaging.NotificationResult.REQUEUE
LOG.error('Fail to process a notification', exc_info=True)
return oslo_messaging.NotificationResult.HANDLED
class InterimEventEndpoint(pipeline.NotificationEndpoint):
def __init__(self, conf, publisher, pipe_name):
self.event_types = [pipe_name]
super(InterimEventEndpoint, self).__init__(conf, publisher)
def sample(self, notifications):
return self.process_notifications('sample', notifications)
def process_notifications(self, priority, notifications):
events = chain.from_iterable(m["payload"] for m in notifications)
events = [
models.Event(
message_id=ev['message_id'],
event_type=ev['event_type'],
generated=timeutils.normalize_time(
timeutils.parse_isotime(ev['generated'])),
traits=[models.Trait(name, dtype,
models.Trait.convert_value(dtype, value))
for name, dtype, value in ev['traits']],
raw=ev.get('raw', {}))
for ev in events if publisher_utils.verify_signature(
ev, self.conf.publisher.telemetry_secret)
]
try:
with self.publisher as p:
p(events)
except Exception:
if not self.conf.notification.ack_on_event_error:
return oslo_messaging.NotificationResult.REQUEUE
raise
return oslo_messaging.NotificationResult.HANDLED
class EventSource(pipeline.PipelineSource):
"""Represents a source of events.
@@ -103,21 +140,30 @@ class EventSink(pipeline.Sink):
class EventPipeline(pipeline.Pipeline):
"""Represents a pipeline for Events."""
default_grouping_key = ['event_type']
def __str__(self):
# NOTE(gordc): prepend a namespace so we ensure event and sample
# pipelines do not have the same name.
return 'event:%s' % super(EventPipeline, self).__str__()
def support_event(self, event_type):
# FIXME(gordc): this is only used in tests
return self.source.support_event(event_type)
def publish_data(self, events):
if not isinstance(events, list):
events = [events]
supported = [e for e in events
if self.source.support_event(e.event_type)]
supported = [e for e in events if self.supported(e)]
self.sink.publish_events(supported)
def serializer(self, event):
return publisher_utils.message_from_event(
event, self.conf.publisher.telemetry_secret)
def supported(self, event):
return self.source.support_event(event.event_type)
class EventPipelineManager(pipeline.PipelineManager):
@@ -126,6 +172,17 @@ class EventPipelineManager(pipeline.PipelineManager):
pm_source = EventSource
pm_sink = EventSink
def __init__(self, conf):
def __init__(self, conf, partition=False):
super(EventPipelineManager, self).__init__(
conf, conf.event_pipeline_cfg_file, {})
conf, conf.event_pipeline_cfg_file, {}, partition)
def get_main_endpoints(self):
return [EventEndpoint(self.conf, self.get_main_publisher())]
def get_interim_endpoints(self):
# FIXME(gordc): change this so we shard data rather than per
# pipeline. this will allow us to use self.publisher and less
# queues.
return [InterimEventEndpoint(
self.conf, pipeline.PublishContext([pipe]), pipe.name)
for pipe in self.pipelines]
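
Interim dispatch is now keyed on event_type end to end: the interim
publisher sends with event_type=pipe.name, and InterimEventEndpoint
above sets event_types=[pipe_name], from which the NotificationEndpoint
base class builds its filter. A sketch of the match (the pipeline name
is hypothetical):

# sketch: how a pipeline name becomes an oslo.messaging filter rule;
# 'event-pipeline-1' is a hypothetical pipeline name
import oslo_messaging

pipe_name = 'event-pipeline-1'
filter_rule = oslo_messaging.NotificationFilter(
    event_type='|'.join([pipe_name]))
# a message published via notifier.sample({}, event_type=pipe_name,
# payload=[...]) is dispatched only to the endpoint holding this rule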

View File

@@ -10,16 +10,21 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from itertools import chain
from operator import methodcaller
from oslo_log import log
from stevedore import extension
from ceilometer import agent
from ceilometer import pipeline
from ceilometer.publisher import utils as publisher_utils
from ceilometer import sample as sample_util
LOG = log.getLogger(__name__)
class SampleEndpoint(pipeline.NotificationEndpoint):
class SampleEndpoint(pipeline.MainNotificationEndpoint):
def info(self, notifications):
"""Convert message at info level to Ceilometer sample.
@@ -38,7 +43,7 @@ class SampleEndpoint(pipeline.NotificationEndpoint):
def process_notifications(self, priority, notifications):
for message in notifications:
try:
with self.manager.publisher() as p:
with self.publisher as p:
p(list(self.build_sample(message)))
except Exception:
LOG.error('Fail to process notification', exc_info=True)
@@ -48,6 +53,37 @@ class SampleEndpoint(pipeline.NotificationEndpoint):
pass
class InterimSampleEndpoint(pipeline.NotificationEndpoint):
def __init__(self, conf, publisher, pipe_name):
self.event_types = [pipe_name]
super(InterimSampleEndpoint, self).__init__(conf, publisher)
def sample(self, notifications):
return self.process_notifications('sample', notifications)
def process_notifications(self, priority, notifications):
samples = chain.from_iterable(m["payload"] for m in notifications)
samples = [
sample_util.Sample(name=s['counter_name'],
type=s['counter_type'],
unit=s['counter_unit'],
volume=s['counter_volume'],
user_id=s['user_id'],
project_id=s['project_id'],
resource_id=s['resource_id'],
timestamp=s['timestamp'],
resource_metadata=s['resource_metadata'],
source=s.get('source'),
# NOTE(sileht): May come from an older node,
# Put None in this case.
monotonic_time=s.get('monotonic_time'))
for s in samples if publisher_utils.verify_signature(
s, self.conf.publisher.telemetry_secret)
]
with self.publisher as p:
p(sorted(samples, key=methodcaller('get_iso_timestamp')))
class SampleSource(pipeline.PipelineSource):
"""Represents a source of samples.
@@ -146,7 +182,10 @@ class SampleSink(pipeline.Sink):
class SamplePipeline(pipeline.Pipeline):
"""Represents a pipeline for Samples."""
default_grouping_key = ['resource_id']
def support_meter(self, meter_name):
# FIXME(gordc): this is only used in tests
return self.source.support_meter(meter_name)
def _validate_volume(self, s):
@@ -181,10 +220,17 @@ class SamplePipeline(pipeline.Pipeline):
def publish_data(self, samples):
if not isinstance(samples, list):
samples = [samples]
supported = [s for s in samples if self.source.support_meter(s.name)
supported = [s for s in samples if self.supported(s)
and self._validate_volume(s)]
self.sink.publish_samples(supported)
def serializer(self, sample):
return publisher_utils.meter_message_from_counter(
sample, self.conf.publisher.telemetry_secret)
def supported(self, sample):
return self.source.support_meter(sample.name)
class SamplePipelineManager(pipeline.PipelineManager):
@@ -193,10 +239,26 @@ class SamplePipelineManager(pipeline.PipelineManager):
pm_source = SampleSource
pm_sink = SampleSink
def __init__(self, conf):
def __init__(self, conf, partition=False):
super(SamplePipelineManager, self).__init__(
conf, conf.pipeline_cfg_file, self.get_transform_manager())
conf, conf.pipeline_cfg_file, self.get_transform_manager(),
partition)
@staticmethod
def get_transform_manager():
return extension.ExtensionManager('ceilometer.transformer')
def get_main_endpoints(self):
exts = extension.ExtensionManager(
namespace='ceilometer.sample.endpoint',
invoke_on_load=True,
invoke_args=(self.conf, self.get_main_publisher()))
return [ext.obj for ext in exts]
def get_interim_endpoints(self):
# FIXME(gordc): change this so we shard data rather than per
# pipeline. this will allow us to use self.publisher and less
# queues.
return [InterimSampleEndpoint(
self.conf, pipeline.PublishContext([pipe]), pipe.name)
for pipe in self.pipelines]
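
Putting the pieces together, a usage sketch of the new manager API
(conf prepared the same way the tests below do it):

# usage sketch; conf comes from ceilometer.service.prepare_service()
from ceilometer import service
from ceilometer.pipeline import sample as sample_pipe

conf = service.prepare_service([], [])
mgr = sample_pipe.SamplePipelineManager(conf, partition=True)
main_endpoints = mgr.get_main_endpoints()        # main-queue listeners
interim_endpoints = mgr.get_interim_endpoints()  # one per pipeline
# get_main_publisher() returns the interim (sharding) context here;
# with partition=False it would return a plain PublishContext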

View File

@@ -113,7 +113,8 @@ class TestEventEndpoint(tests_base.BaseTestCase):
def _setup_endpoint(self, publishers):
ev_pipeline_mgr = self._setup_pipeline(publishers)
self.endpoint = event_pipe.EventEndpoint(ev_pipeline_mgr)
self.endpoint = event_pipe.EventEndpoint(
ev_pipeline_mgr.conf, ev_pipeline_mgr.publisher())
self.endpoint.event_converter = mock.MagicMock()
self.endpoint.event_converter.to_event.return_value = mock.MagicMock(

View File

@@ -38,7 +38,7 @@ class TestNotifications(base.BaseTestCase):
* some readings are skipped if the value is 'Disabled'
* metatata with the node id
"""
processor = ipmi.TemperatureSensorNotification(None)
processor = ipmi.TemperatureSensorNotification(None, None)
counters = dict([(counter.resource_id, counter) for counter in
processor.build_sample(
ipmi_test_data.SENSOR_DATA)])
@@ -64,7 +64,7 @@
A single current reading is effectively the same as temperature,
modulo "current".
"""
processor = ipmi.CurrentSensorNotification(None)
processor = ipmi.CurrentSensorNotification(None, None)
counters = dict([(counter.resource_id, counter) for counter in
processor.build_sample(
ipmi_test_data.SENSOR_DATA)])
@@ -85,7 +85,7 @@
A single fan reading is effectively the same as temperature,
modulo "fan".
"""
processor = ipmi.FanSensorNotification(None)
processor = ipmi.FanSensorNotification(None, None)
counters = dict([(counter.resource_id, counter) for counter in
processor.build_sample(
ipmi_test_data.SENSOR_DATA)])
@@ -106,7 +106,7 @@
A single voltage reading is effectively the same as temperature,
modulo "voltage".
"""
processor = ipmi.VoltageSensorNotification(None)
processor = ipmi.VoltageSensorNotification(None, None)
counters = dict([(counter.resource_id, counter) for counter in
processor.build_sample(
ipmi_test_data.SENSOR_DATA)])
@@ -123,7 +123,7 @@
def test_disabed_skips_metric(self):
"""Test that a meter which a disabled volume is skipped."""
processor = ipmi.TemperatureSensorNotification(None)
processor = ipmi.TemperatureSensorNotification(None, None)
counters = dict([(counter.resource_id, counter) for counter in
processor.build_sample(
ipmi_test_data.SENSOR_DATA)])
@@ -138,7 +138,7 @@
self.assertNotIn(resource_id, counters)
def test_empty_payload_no_metrics_success(self):
processor = ipmi.TemperatureSensorNotification(None)
processor = ipmi.TemperatureSensorNotification(None, None)
counters = dict([(counter.resource_id, counter) for counter in
processor.build_sample(
ipmi_test_data.EMPTY_PAYLOAD)])
@@ -147,7 +147,7 @@
@mock.patch('ceilometer.ipmi.notifications.ironic.LOG')
def test_missing_sensor_data(self, mylog):
processor = ipmi.TemperatureSensorNotification(None)
processor = ipmi.TemperatureSensorNotification(None, None)
messages = []
mylog.warning = lambda *args: messages.extend(args)
@@ -163,7 +163,7 @@
@mock.patch('ceilometer.ipmi.notifications.ironic.LOG')
def test_sensor_data_malformed(self, mylog):
processor = ipmi.TemperatureSensorNotification(None)
processor = ipmi.TemperatureSensorNotification(None, None)
messages = []
mylog.warning = lambda *args: messages.extend(args)
@@ -184,7 +184,7 @@
Presumably this will never happen given the way the data
is created, but better defensive than dead.
"""
processor = ipmi.TemperatureSensorNotification(None)
processor = ipmi.TemperatureSensorNotification(None, None)
messages = []
mylog.warning = lambda *args: messages.extend(args)
@@ -200,7 +200,7 @@
@mock.patch('ceilometer.ipmi.notifications.ironic.LOG')
def test_missing_sensor_id(self, mylog):
"""Test for desired error message when 'Sensor ID' missing."""
processor = ipmi.TemperatureSensorNotification(None)
processor = ipmi.TemperatureSensorNotification(None, None)
messages = []
mylog.warning = lambda *args: messages.extend(args)

View File

@@ -282,7 +282,7 @@ class TestMeterProcessing(test.BaseTestCase):
self.CONF = ceilometer_service.prepare_service([], [])
self.path = self.useFixture(fixtures.TempDir()).path
self.handler = notifications.ProcessMeterNotifications(
mock.Mock(conf=self.CONF))
self.CONF, mock.Mock())
def _load_meter_def_file(self, cfgs=None):
self.CONF.set_override('meter_definitions_dirs',

View File

@@ -357,7 +357,6 @@ class EventPipelineTestCase(base.BaseTestCase):
self._exception_create_pipelinemanager()
def test_event_pipeline_endpoint_requeue_on_failure(self):
self.CONF.set_override("ack_on_event_error", False,
group="notification")
self.CONF.set_override("telemetry_secret", "not-so-secret",
@@ -384,8 +383,9 @@
self._build_and_set_new_pipeline()
pipeline_manager = event.EventPipelineManager(self.CONF)
event_pipeline_endpoint = pipeline.EventPipelineEndpoint(
pipeline_manager.pipelines[0])
pipe = pipeline_manager.pipelines[0]
event_pipeline_endpoint = event.InterimEventEndpoint(
self.CONF, pipeline.PublishContext([pipe]), pipe.name)
fake_publisher.publish_events.side_effect = Exception
ret = event_pipeline_endpoint.sample([

View File

@@ -73,9 +73,8 @@ class TestNotifications(base.BaseTestCase):
self.setup_messaging(self.CONF)
def test_process_request_notification(self):
sample = list(middleware.HTTPRequest(mock.Mock()).build_sample(
HTTP_REQUEST
))[0]
sample = list(middleware.HTTPRequest(
mock.Mock(), mock.Mock()).build_sample(HTTP_REQUEST))[0]
self.assertEqual(HTTP_REQUEST['payload']['request']['HTTP_X_USER_ID'],
sample.user_id)
self.assertEqual(HTTP_REQUEST['payload']['request']
@@ -86,7 +85,7 @@
def test_process_response_notification(self):
sample = list(middleware.HTTPResponse(
mock.Mock()).build_sample(HTTP_RESPONSE))[0]
mock.Mock(), mock.Mock()).build_sample(HTTP_RESPONSE))[0]
self.assertEqual(HTTP_RESPONSE['payload']['request']['HTTP_X_USER_ID'],
sample.user_id)
self.assertEqual(HTTP_RESPONSE['payload']['request']

View File

@@ -20,12 +20,10 @@ import mock
import oslo_messaging
from oslo_utils import fileutils
import six
from stevedore import extension
import yaml
from ceilometer import messaging
from ceilometer import notification
from ceilometer import pipeline
from ceilometer.publisher import test as test_publisher
from ceilometer import service
from ceilometer.tests import base as tests_base
@@ -96,69 +94,24 @@
self.fail('Did not start pipeline queues')
class _FakeNotificationPlugin(pipeline.NotificationEndpoint):
event_types = ['fake.event']
def build_sample(self, message):
return []
class TestNotification(BaseNotificationTest):
def setUp(self):
super(TestNotification, self).setUp()
self.CONF = service.prepare_service([], [])
self.CONF.set_override("backend_url", "zake://", group="coordination")
self.setup_messaging(self.CONF)
self.srv = notification.NotificationService(0, self.CONF)
def test_targets(self):
self.assertEqual(15, len(self.srv.get_targets()))
def fake_get_notifications_manager(self, pm):
self.plugin = _FakeNotificationPlugin(pm)
return extension.ExtensionManager.make_test_instance(
[
extension.Extension('test',
None,
None,
self.plugin)
]
)
@mock.patch('ceilometer.pipeline.event.EventEndpoint')
def _do_process_notification_manager_start(self,
fake_event_endpoint_class):
with mock.patch.object(self.srv,
'_get_notifications_manager') as get_nm:
get_nm.side_effect = self.fake_get_notifications_manager
self.run_service(self.srv)
self.fake_event_endpoint = fake_event_endpoint_class.return_value
def test_start_multiple_listeners(self):
urls = ["fake://vhost1", "fake://vhost2"]
self.CONF.set_override("messaging_urls", urls, group="notification")
self._do_process_notification_manager_start()
self.srv.run()
self.addCleanup(self.srv.terminate)
self.assertEqual(2, len(self.srv.listeners))
def test_build_sample(self):
self._do_process_notification_manager_start()
self.srv.pipeline_manager.pipelines[0] = mock.MagicMock()
self.plugin.info([{'ctxt': TEST_NOTICE_CTXT,
'publisher_id': 'compute.vagrant-precise',
'event_type': 'compute.instance.create.end',
'payload': TEST_NOTICE_PAYLOAD,
'metadata': TEST_NOTICE_METADATA}])
self.assertEqual(2, len(self.srv.listeners[0].dispatcher.endpoints))
def test_process_notification_with_events(self):
self._do_process_notification_manager_start()
self.assertEqual(2, len(self.srv.listeners[0].dispatcher.endpoints))
self.assertEqual(self.fake_event_endpoint,
self.srv.listeners[0].dispatcher.endpoints[0])
@mock.patch('oslo_messaging.get_batch_notification_listener')
def test_unique_consumers(self, mock_listener):
self.CONF.set_override('notification_control_exchanges', ['dup'] * 2,
@@ -343,24 +296,18 @@ class TestRealNotificationHA(BaseRealNotification):
targets = mock_listener.call_args[0][1]
self.assertIsNotEmpty(targets)
endpoints = {}
for endpoint in mock_listener.call_args[0][2]:
self.assertEqual(1, len(endpoint.publish_context.pipelines))
pipe = list(endpoint.publish_context.pipelines)[0]
endpoints[pipe.name] = endpoint
pipe_list = []
for mgr in self.srv.managers:
for pipe in mgr.pipelines:
pipe_list.append(pipe.name)
notifiers = []
pipe_manager = self.srv._get_pipe_manager(
self.srv.transport, self.srv.pipeline_manager)
notifiers.extend(pipe_manager.transporters[0][2])
event_pipe_manager = self.srv._get_event_pipeline_manager(
self.srv.transport)
notifiers.extend(event_pipe_manager.transporters[0][2])
for notifier in notifiers:
filter_rule = endpoints[notifier.publisher_id].filter_rule
self.assertEqual(True, filter_rule.match(None,
notifier.publisher_id,
None, None, None))
for pipe in pipe_list:
for endpoint in mock_listener.call_args[0][2]:
self.assertTrue(hasattr(endpoint, 'filter_rule'))
if endpoint.filter_rule.match(None, None, pipe, None, None):
break
else:
self.fail('%s not handled by any endpoint' % pipe)
@mock.patch('oslo_messaging.Notifier.sample')
def test_broadcast_to_relevant_pipes_only(self, mock_notifier):
@@ -392,12 +339,10 @@
self.assertTrue(mock_notifier.called)
self.assertEqual(3, mock_notifier.call_count)
self.assertEqual('pipeline.event',
mock_notifier.call_args_list[0][1]['event_type'])
self.assertEqual('ceilometer.pipeline',
mock_notifier.call_args_list[1][1]['event_type'])
self.assertEqual('ceilometer.pipeline',
mock_notifier.call_args_list[2][1]['event_type'])
self.assertEqual(1, len([i for i in mock_notifier.call_args_list
if 'event_type' in i[1]['payload'][0]]))
self.assertEqual(2, len([i for i in mock_notifier.call_args_list
if 'counter_name' in i[1]['payload'][0]]))
class TestRealNotificationMultipleAgents(BaseNotificationTest):

View File

@@ -38,7 +38,11 @@ zaqar =
python-zaqarclient>=1.0.0 # Apache-2.0
[entry_points]
ceilometer.notification =
ceilometer.notification.pipeline =
meter = ceilometer.pipeline.sample:SamplePipelineManager
event = ceilometer.pipeline.event:EventPipelineManager
ceilometer.sample.endpoint =
http.request = ceilometer.middleware:HTTPRequest
http.response = ceilometer.middleware:HTTPResponse
hardware.ipmi.temperature = ceilometer.ipmi.notifications.ironic:TemperatureSensorNotification