publisher: clean out context usage
It turns out the context object is never used by anybody, and is anyway empty. So let's remove its usage completely. Change-Id: I4efeb629dfd84e7cddeb0df908207cf017fc741c
This commit is contained in:
parent
b2549ae93e
commit
5d850f961c
@ -18,7 +18,6 @@
|
|||||||
import abc
|
import abc
|
||||||
import collections
|
import collections
|
||||||
|
|
||||||
from oslo_context import context
|
|
||||||
from oslo_log import log
|
from oslo_log import log
|
||||||
import oslo_messaging
|
import oslo_messaging
|
||||||
import six
|
import six
|
||||||
@ -103,19 +102,18 @@ class NotificationBase(PluginBase):
|
|||||||
try:
|
try:
|
||||||
notification = messaging.convert_to_old_notification_format(
|
notification = messaging.convert_to_old_notification_format(
|
||||||
priority, notification)
|
priority, notification)
|
||||||
self.to_samples_and_publish(context.get_admin_context(),
|
self.to_samples_and_publish(notification)
|
||||||
notification)
|
|
||||||
except Exception:
|
except Exception:
|
||||||
LOG.error(_LE('Fail to process notification'), exc_info=True)
|
LOG.error(_LE('Fail to process notification'), exc_info=True)
|
||||||
|
|
||||||
def to_samples_and_publish(self, context, notification):
|
def to_samples_and_publish(self, notification):
|
||||||
"""Return samples produced by *process_notification*.
|
"""Return samples produced by *process_notification*.
|
||||||
|
|
||||||
Samples produced for the given notification.
|
Samples produced for the given notification.
|
||||||
:param context: Execution context from the service or RPC call
|
:param context: Execution context from the service or RPC call
|
||||||
:param notification: The notification to process.
|
:param notification: The notification to process.
|
||||||
"""
|
"""
|
||||||
with self.manager.publisher(context) as p:
|
with self.manager.publisher() as p:
|
||||||
p(list(self.process_notification(notification)))
|
p(list(self.process_notification(notification)))
|
||||||
|
|
||||||
|
|
||||||
|
@ -20,7 +20,6 @@ import logging
|
|||||||
import sys
|
import sys
|
||||||
|
|
||||||
from oslo_config import cfg
|
from oslo_config import cfg
|
||||||
from oslo_context import context
|
|
||||||
from oslo_utils import timeutils
|
from oslo_utils import timeutils
|
||||||
from stevedore import extension
|
from stevedore import extension
|
||||||
|
|
||||||
@ -80,7 +79,7 @@ def send_sample():
|
|||||||
pipeline_manager = pipeline.setup_pipeline(
|
pipeline_manager = pipeline.setup_pipeline(
|
||||||
extension.ExtensionManager('ceilometer.transformer'))
|
extension.ExtensionManager('ceilometer.transformer'))
|
||||||
|
|
||||||
with pipeline_manager.publisher(context.get_admin_context()) as p:
|
with pipeline_manager.publisher() as p:
|
||||||
p([sample.Sample(
|
p([sample.Sample(
|
||||||
name=cfg.CONF.sample_name,
|
name=cfg.CONF.sample_name,
|
||||||
type=cfg.CONF.sample_type,
|
type=cfg.CONF.sample_type,
|
||||||
|
@ -15,7 +15,6 @@
|
|||||||
import logging
|
import logging
|
||||||
|
|
||||||
from oslo_config import cfg
|
from oslo_config import cfg
|
||||||
from oslo_context import context
|
|
||||||
import oslo_messaging
|
import oslo_messaging
|
||||||
from stevedore import extension
|
from stevedore import extension
|
||||||
|
|
||||||
@ -30,7 +29,6 @@ class EventsNotificationEndpoint(object):
|
|||||||
def __init__(self, manager):
|
def __init__(self, manager):
|
||||||
super(EventsNotificationEndpoint, self).__init__()
|
super(EventsNotificationEndpoint, self).__init__()
|
||||||
LOG.debug('Loading event definitions')
|
LOG.debug('Loading event definitions')
|
||||||
self.ctxt = context.get_admin_context()
|
|
||||||
self.event_converter = event_converter.setup_events(
|
self.event_converter = event_converter.setup_events(
|
||||||
extension.ExtensionManager(
|
extension.ExtensionManager(
|
||||||
namespace='ceilometer.event.trait_plugin'))
|
namespace='ceilometer.event.trait_plugin'))
|
||||||
@ -61,7 +59,7 @@ class EventsNotificationEndpoint(object):
|
|||||||
try:
|
try:
|
||||||
event = self.event_converter.to_event(notification)
|
event = self.event_converter.to_event(notification)
|
||||||
if event is not None:
|
if event is not None:
|
||||||
with self.manager.publisher(self.ctxt) as p:
|
with self.manager.publisher() as p:
|
||||||
p(event)
|
p(event)
|
||||||
except Exception:
|
except Exception:
|
||||||
if not cfg.CONF.notification.ack_on_event_error:
|
if not cfg.CONF.notification.ack_on_event_error:
|
||||||
|
@ -16,7 +16,6 @@ import itertools
|
|||||||
import threading
|
import threading
|
||||||
|
|
||||||
from oslo_config import cfg
|
from oslo_config import cfg
|
||||||
from oslo_context import context
|
|
||||||
from oslo_log import log
|
from oslo_log import log
|
||||||
import oslo_messaging
|
import oslo_messaging
|
||||||
from stevedore import extension
|
from stevedore import extension
|
||||||
@ -156,7 +155,6 @@ class NotificationService(service_base.BaseService):
|
|||||||
self.transport = messaging.get_transport()
|
self.transport = messaging.get_transport()
|
||||||
|
|
||||||
if cfg.CONF.notification.workload_partitioning:
|
if cfg.CONF.notification.workload_partitioning:
|
||||||
self.ctxt = context.get_admin_context()
|
|
||||||
self.group_id = self.NOTIFICATION_NAMESPACE
|
self.group_id = self.NOTIFICATION_NAMESPACE
|
||||||
self.partition_coordinator = coordination.PartitionCoordinator()
|
self.partition_coordinator = coordination.PartitionCoordinator()
|
||||||
self.partition_coordinator.start()
|
self.partition_coordinator.start()
|
||||||
@ -285,7 +283,7 @@ class NotificationService(service_base.BaseService):
|
|||||||
listener = messaging.get_batch_notification_listener(
|
listener = messaging.get_batch_notification_listener(
|
||||||
transport,
|
transport,
|
||||||
[oslo_messaging.Target(topic=topic)],
|
[oslo_messaging.Target(topic=topic)],
|
||||||
[pipe_endpoint(self.ctxt, pipe)],
|
[pipe_endpoint(pipe)],
|
||||||
batch_size=cfg.CONF.notification.batch_size,
|
batch_size=cfg.CONF.notification.batch_size,
|
||||||
batch_timeout=cfg.CONF.notification.batch_timeout)
|
batch_timeout=cfg.CONF.notification.batch_timeout)
|
||||||
listener.start()
|
listener.start()
|
||||||
|
@ -80,8 +80,8 @@ class PipelineException(Exception):
|
|||||||
@six.add_metaclass(abc.ABCMeta)
|
@six.add_metaclass(abc.ABCMeta)
|
||||||
class PipelineEndpoint(object):
|
class PipelineEndpoint(object):
|
||||||
|
|
||||||
def __init__(self, context, pipeline):
|
def __init__(self, pipeline):
|
||||||
self.publish_context = PublishContext(context, [pipeline])
|
self.publish_context = PublishContext([pipeline])
|
||||||
|
|
||||||
@abc.abstractmethod
|
@abc.abstractmethod
|
||||||
def sample(self, messages):
|
def sample(self, messages):
|
||||||
@ -149,7 +149,7 @@ class _PipelineTransportManager(object):
|
|||||||
def add_transporter(self, transporter):
|
def add_transporter(self, transporter):
|
||||||
self.transporters.append(transporter)
|
self.transporters.append(transporter)
|
||||||
|
|
||||||
def publisher(self, context):
|
def publisher(self):
|
||||||
serializer = self.serializer
|
serializer = self.serializer
|
||||||
hash_grouping = self.hash_grouping
|
hash_grouping = self.hash_grouping
|
||||||
transporters = self.transporters
|
transporters = self.transporters
|
||||||
@ -171,7 +171,7 @@ class _PipelineTransportManager(object):
|
|||||||
grouping_keys)
|
grouping_keys)
|
||||||
% len(notifiers))
|
% len(notifiers))
|
||||||
notifier = notifiers[key]
|
notifier = notifiers[key]
|
||||||
notifier.sample(context.to_dict(),
|
notifier.sample({},
|
||||||
event_type=event_type,
|
event_type=event_type,
|
||||||
payload=[serialized_data])
|
payload=[serialized_data])
|
||||||
return p
|
return p
|
||||||
@ -204,10 +204,9 @@ class EventPipelineTransportManager(_PipelineTransportManager):
|
|||||||
|
|
||||||
class PublishContext(object):
|
class PublishContext(object):
|
||||||
|
|
||||||
def __init__(self, context, pipelines=None):
|
def __init__(self, pipelines=None):
|
||||||
pipelines = pipelines or []
|
pipelines = pipelines or []
|
||||||
self.pipelines = set(pipelines)
|
self.pipelines = set(pipelines)
|
||||||
self.context = context
|
|
||||||
|
|
||||||
def add_pipelines(self, pipelines):
|
def add_pipelines(self, pipelines):
|
||||||
self.pipelines.update(pipelines)
|
self.pipelines.update(pipelines)
|
||||||
@ -215,12 +214,12 @@ class PublishContext(object):
|
|||||||
def __enter__(self):
|
def __enter__(self):
|
||||||
def p(data):
|
def p(data):
|
||||||
for p in self.pipelines:
|
for p in self.pipelines:
|
||||||
p.publish_data(self.context, data)
|
p.publish_data(data)
|
||||||
return p
|
return p
|
||||||
|
|
||||||
def __exit__(self, exc_type, exc_value, traceback):
|
def __exit__(self, exc_type, exc_value, traceback):
|
||||||
for p in self.pipelines:
|
for p in self.pipelines:
|
||||||
p.flush(self.context)
|
p.flush()
|
||||||
|
|
||||||
|
|
||||||
class Source(object):
|
class Source(object):
|
||||||
@ -419,11 +418,11 @@ class EventSink(Sink):
|
|||||||
|
|
||||||
NAMESPACE = 'ceilometer.event.publisher'
|
NAMESPACE = 'ceilometer.event.publisher'
|
||||||
|
|
||||||
def publish_events(self, ctxt, events):
|
def publish_events(self, events):
|
||||||
if events:
|
if events:
|
||||||
for p in self.publishers:
|
for p in self.publishers:
|
||||||
try:
|
try:
|
||||||
p.publish_events(ctxt, events)
|
p.publish_events(events)
|
||||||
except Exception:
|
except Exception:
|
||||||
LOG.exception(_("Pipeline %(pipeline)s: %(status)s"
|
LOG.exception(_("Pipeline %(pipeline)s: %(status)s"
|
||||||
" after error from publisher %(pub)s") %
|
" after error from publisher %(pub)s") %
|
||||||
@ -433,19 +432,19 @@ class EventSink(Sink):
|
|||||||
if not self.multi_publish:
|
if not self.multi_publish:
|
||||||
raise
|
raise
|
||||||
|
|
||||||
def flush(self, ctxt):
|
@staticmethod
|
||||||
|
def flush():
|
||||||
"""Flush data after all events have been injected to pipeline."""
|
"""Flush data after all events have been injected to pipeline."""
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
class SampleSink(Sink):
|
class SampleSink(Sink):
|
||||||
|
|
||||||
NAMESPACE = 'ceilometer.publisher'
|
NAMESPACE = 'ceilometer.publisher'
|
||||||
|
|
||||||
def _transform_sample(self, start, ctxt, sample):
|
def _transform_sample(self, start, sample):
|
||||||
try:
|
try:
|
||||||
for transformer in self.transformers[start:]:
|
for transformer in self.transformers[start:]:
|
||||||
sample = transformer.handle_sample(ctxt, sample)
|
sample = transformer.handle_sample(sample)
|
||||||
if not sample:
|
if not sample:
|
||||||
LOG.debug(
|
LOG.debug(
|
||||||
"Pipeline %(pipeline)s: Sample dropped by "
|
"Pipeline %(pipeline)s: Sample dropped by "
|
||||||
@ -462,13 +461,12 @@ class SampleSink(Sink):
|
|||||||
'smp': sample}))
|
'smp': sample}))
|
||||||
LOG.exception(err)
|
LOG.exception(err)
|
||||||
|
|
||||||
def _publish_samples(self, start, ctxt, samples):
|
def _publish_samples(self, start, samples):
|
||||||
"""Push samples into pipeline for publishing.
|
"""Push samples into pipeline for publishing.
|
||||||
|
|
||||||
:param start: The first transformer that the sample will be injected.
|
:param start: The first transformer that the sample will be injected.
|
||||||
This is mainly for flush() invocation that transformer
|
This is mainly for flush() invocation that transformer
|
||||||
may emit samples.
|
may emit samples.
|
||||||
:param ctxt: Execution context from the manager or service.
|
|
||||||
:param samples: Sample list.
|
:param samples: Sample list.
|
||||||
|
|
||||||
"""
|
"""
|
||||||
@ -483,30 +481,30 @@ class SampleSink(Sink):
|
|||||||
"%(smp)s from %(trans)s transformer", {'pipeline': self,
|
"%(smp)s from %(trans)s transformer", {'pipeline': self,
|
||||||
'smp': sample,
|
'smp': sample,
|
||||||
'trans': start})
|
'trans': start})
|
||||||
sample = self._transform_sample(start, ctxt, sample)
|
sample = self._transform_sample(start, sample)
|
||||||
if sample:
|
if sample:
|
||||||
transformed_samples.append(sample)
|
transformed_samples.append(sample)
|
||||||
|
|
||||||
if transformed_samples:
|
if transformed_samples:
|
||||||
for p in self.publishers:
|
for p in self.publishers:
|
||||||
try:
|
try:
|
||||||
p.publish_samples(ctxt, transformed_samples)
|
p.publish_samples(transformed_samples)
|
||||||
except Exception:
|
except Exception:
|
||||||
LOG.exception(_(
|
LOG.exception(_(
|
||||||
"Pipeline %(pipeline)s: Continue after error "
|
"Pipeline %(pipeline)s: Continue after error "
|
||||||
"from publisher %(pub)s") % ({'pipeline': self,
|
"from publisher %(pub)s") % ({'pipeline': self,
|
||||||
'pub': p}))
|
'pub': p}))
|
||||||
|
|
||||||
def publish_samples(self, ctxt, samples):
|
def publish_samples(self, samples):
|
||||||
self._publish_samples(0, ctxt, samples)
|
self._publish_samples(0, samples)
|
||||||
|
|
||||||
def flush(self, ctxt):
|
def flush(self):
|
||||||
"""Flush data after all samples have been injected to pipeline."""
|
"""Flush data after all samples have been injected to pipeline."""
|
||||||
|
|
||||||
for (i, transformer) in enumerate(self.transformers):
|
for (i, transformer) in enumerate(self.transformers):
|
||||||
try:
|
try:
|
||||||
self._publish_samples(i + 1, ctxt,
|
self._publish_samples(i + 1,
|
||||||
list(transformer.flush(ctxt)))
|
list(transformer.flush()))
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
LOG.warning(_(
|
LOG.warning(_(
|
||||||
"Pipeline %(pipeline)s: Error flushing "
|
"Pipeline %(pipeline)s: Error flushing "
|
||||||
@ -528,15 +526,15 @@ class Pipeline(object):
|
|||||||
return (self.source.name if self.source.name == self.sink.name
|
return (self.source.name if self.source.name == self.sink.name
|
||||||
else '%s:%s' % (self.source.name, self.sink.name))
|
else '%s:%s' % (self.source.name, self.sink.name))
|
||||||
|
|
||||||
def flush(self, ctxt):
|
def flush(self):
|
||||||
self.sink.flush(ctxt)
|
self.sink.flush()
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def publishers(self):
|
def publishers(self):
|
||||||
return self.sink.publishers
|
return self.sink.publishers
|
||||||
|
|
||||||
@abc.abstractmethod
|
@abc.abstractmethod
|
||||||
def publish_data(self, ctxt, data):
|
def publish_data(self, data):
|
||||||
"""Publish data from pipeline."""
|
"""Publish data from pipeline."""
|
||||||
|
|
||||||
|
|
||||||
@ -551,12 +549,12 @@ class EventPipeline(Pipeline):
|
|||||||
def support_event(self, event_type):
|
def support_event(self, event_type):
|
||||||
return self.source.support_event(event_type)
|
return self.source.support_event(event_type)
|
||||||
|
|
||||||
def publish_data(self, ctxt, events):
|
def publish_data(self, events):
|
||||||
if not isinstance(events, list):
|
if not isinstance(events, list):
|
||||||
events = [events]
|
events = [events]
|
||||||
supported = [e for e in events
|
supported = [e for e in events
|
||||||
if self.source.support_event(e.event_type)]
|
if self.source.support_event(e.event_type)]
|
||||||
self.sink.publish_events(ctxt, supported)
|
self.sink.publish_events(supported)
|
||||||
|
|
||||||
|
|
||||||
class SamplePipeline(Pipeline):
|
class SamplePipeline(Pipeline):
|
||||||
@ -605,12 +603,12 @@ class SamplePipeline(Pipeline):
|
|||||||
return False
|
return False
|
||||||
return True
|
return True
|
||||||
|
|
||||||
def publish_data(self, ctxt, samples):
|
def publish_data(self, samples):
|
||||||
if not isinstance(samples, list):
|
if not isinstance(samples, list):
|
||||||
samples = [samples]
|
samples = [samples]
|
||||||
supported = [s for s in samples if self.source.support_meter(s.name)
|
supported = [s for s in samples if self.source.support_meter(s.name)
|
||||||
and self._validate_volume(s)]
|
and self._validate_volume(s)]
|
||||||
self.sink.publish_samples(ctxt, supported)
|
self.sink.publish_samples(supported)
|
||||||
|
|
||||||
|
|
||||||
SAMPLE_TYPE = {'pipeline': SamplePipeline,
|
SAMPLE_TYPE = {'pipeline': SamplePipeline,
|
||||||
@ -741,12 +739,12 @@ class PipelineManager(object):
|
|||||||
self.pipelines.append(pipe)
|
self.pipelines.append(pipe)
|
||||||
unique_names.clear()
|
unique_names.clear()
|
||||||
|
|
||||||
def publisher(self, context):
|
def publisher(self):
|
||||||
"""Build a new Publisher for these manager pipelines.
|
"""Build a new Publisher for these manager pipelines.
|
||||||
|
|
||||||
:param context: The context.
|
:param context: The context.
|
||||||
"""
|
"""
|
||||||
return PublishContext(context, self.pipelines)
|
return PublishContext(self.pipelines)
|
||||||
|
|
||||||
|
|
||||||
class PollingManager(object):
|
class PollingManager(object):
|
||||||
|
@ -40,9 +40,9 @@ class PublisherBase(object):
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
@abc.abstractmethod
|
@abc.abstractmethod
|
||||||
def publish_samples(self, context, samples):
|
def publish_samples(self, samples):
|
||||||
"""Publish samples into final conduit."""
|
"""Publish samples into final conduit."""
|
||||||
|
|
||||||
@abc.abstractmethod
|
@abc.abstractmethod
|
||||||
def publish_events(self, context, events):
|
def publish_events(self, events):
|
||||||
"""Publish events into final conduit."""
|
"""Publish events into final conduit."""
|
||||||
|
@ -35,7 +35,7 @@ class DirectPublisher(publisher.PublisherBase):
|
|||||||
self.meter_conn = dispatcher.meter_conn
|
self.meter_conn = dispatcher.meter_conn
|
||||||
self.event_conn = dispatcher.event_conn
|
self.event_conn = dispatcher.event_conn
|
||||||
|
|
||||||
def publish_samples(self, context, samples):
|
def publish_samples(self, samples):
|
||||||
if not isinstance(samples, list):
|
if not isinstance(samples, list):
|
||||||
samples = [samples]
|
samples = [samples]
|
||||||
|
|
||||||
@ -52,7 +52,7 @@ class DirectPublisher(publisher.PublisherBase):
|
|||||||
meter['timestamp'] = timeutils.normalize_time(ts)
|
meter['timestamp'] = timeutils.normalize_time(ts)
|
||||||
self.meter_conn.record_metering_data(meter)
|
self.meter_conn.record_metering_data(meter)
|
||||||
|
|
||||||
def publish_events(self, context, events):
|
def publish_events(self, events):
|
||||||
if not isinstance(events, list):
|
if not isinstance(events, list):
|
||||||
events = [events]
|
events = [events]
|
||||||
|
|
||||||
|
@ -87,20 +87,18 @@ class FilePublisher(publisher.PublisherBase):
|
|||||||
rfh.setLevel(logging.INFO)
|
rfh.setLevel(logging.INFO)
|
||||||
self.publisher_logger.addHandler(rfh)
|
self.publisher_logger.addHandler(rfh)
|
||||||
|
|
||||||
def publish_samples(self, context, samples):
|
def publish_samples(self, samples):
|
||||||
"""Send a metering message for publishing
|
"""Send a metering message for publishing
|
||||||
|
|
||||||
:param context: Execution context from the service or RPC call
|
|
||||||
:param samples: Samples from pipeline after transformation
|
:param samples: Samples from pipeline after transformation
|
||||||
"""
|
"""
|
||||||
if self.publisher_logger:
|
if self.publisher_logger:
|
||||||
for sample in samples:
|
for sample in samples:
|
||||||
self.publisher_logger.info(sample.as_dict())
|
self.publisher_logger.info(sample.as_dict())
|
||||||
|
|
||||||
def publish_events(self, context, events):
|
def publish_events(self, events):
|
||||||
"""Send an event message for publishing
|
"""Send an event message for publishing
|
||||||
|
|
||||||
:param context: Execution context from the service or RPC call
|
|
||||||
:param events: events from pipeline after transformation
|
:param events: events from pipeline after transformation
|
||||||
"""
|
"""
|
||||||
raise ceilometer.NotImplementedError
|
raise ceilometer.NotImplementedError
|
||||||
|
@ -119,19 +119,17 @@ class HttpPublisher(publisher.PublisherBase):
|
|||||||
LOG.error(_LE('Data post failed with status code %s') %
|
LOG.error(_LE('Data post failed with status code %s') %
|
||||||
res.status_code)
|
res.status_code)
|
||||||
|
|
||||||
def publish_samples(self, context, samples):
|
def publish_samples(self, samples):
|
||||||
"""Send a metering message for publishing
|
"""Send a metering message for publishing
|
||||||
|
|
||||||
:param context: Execution context from the service or RPC call
|
|
||||||
:param samples: Samples from pipeline after transformation
|
:param samples: Samples from pipeline after transformation
|
||||||
"""
|
"""
|
||||||
data = [sample.as_dict() for sample in samples]
|
data = [sample.as_dict() for sample in samples]
|
||||||
self._do_post(data)
|
self._do_post(data)
|
||||||
|
|
||||||
def publish_events(self, context, events):
|
def publish_events(self, events):
|
||||||
"""Send an event message for publishing
|
"""Send an event message for publishing
|
||||||
|
|
||||||
:param context: Execution context from the service or RPC call
|
|
||||||
:param events: events from pipeline after transformation
|
:param events: events from pipeline after transformation
|
||||||
"""
|
"""
|
||||||
data = [evt.as_dict()['raw']['payload'] for evt in events
|
data = [evt.as_dict()['raw']['payload'] for evt in events
|
||||||
|
@ -84,7 +84,7 @@ class KafkaBrokerPublisher(messaging.MessagingPublisher):
|
|||||||
raise messaging.DeliveryFailure('Kafka Client is not available, '
|
raise messaging.DeliveryFailure('Kafka Client is not available, '
|
||||||
'please restart Kafka client')
|
'please restart Kafka client')
|
||||||
|
|
||||||
def _send(self, context, event_type, data):
|
def _send(self, event_type, data):
|
||||||
self._ensure_connection()
|
self._ensure_connection()
|
||||||
# TODO(sileht): don't split the payload into multiple network
|
# TODO(sileht): don't split the payload into multiple network
|
||||||
# message ... but how to do that without breaking consuming
|
# message ... but how to do that without breaking consuming
|
||||||
|
@ -98,10 +98,9 @@ class MessagingPublisher(publisher.PublisherBase):
|
|||||||
|
|
||||||
self.retry = 1 if self.policy in ['queue', 'drop'] else None
|
self.retry = 1 if self.policy in ['queue', 'drop'] else None
|
||||||
|
|
||||||
def publish_samples(self, context, samples):
|
def publish_samples(self, samples):
|
||||||
"""Publish samples on RPC.
|
"""Publish samples on RPC.
|
||||||
|
|
||||||
:param context: Execution context from the service or RPC call.
|
|
||||||
:param samples: Samples from pipeline after transformation.
|
:param samples: Samples from pipeline after transformation.
|
||||||
|
|
||||||
"""
|
"""
|
||||||
@ -112,7 +111,7 @@ class MessagingPublisher(publisher.PublisherBase):
|
|||||||
for sample in samples
|
for sample in samples
|
||||||
]
|
]
|
||||||
topic = cfg.CONF.publisher_notifier.metering_topic
|
topic = cfg.CONF.publisher_notifier.metering_topic
|
||||||
self.local_queue.append((context, topic, meters))
|
self.local_queue.append((topic, meters))
|
||||||
|
|
||||||
if self.per_meter_topic:
|
if self.per_meter_topic:
|
||||||
for meter_name, meter_list in itertools.groupby(
|
for meter_name, meter_list in itertools.groupby(
|
||||||
@ -122,7 +121,7 @@ class MessagingPublisher(publisher.PublisherBase):
|
|||||||
topic_name = topic + '.' + meter_name
|
topic_name = topic + '.' + meter_name
|
||||||
LOG.debug('Publishing %(m)d samples on %(n)s',
|
LOG.debug('Publishing %(m)d samples on %(n)s',
|
||||||
{'m': len(meter_list), 'n': topic_name})
|
{'m': len(meter_list), 'n': topic_name})
|
||||||
self.local_queue.append((context, topic_name, meter_list))
|
self.local_queue.append((topic_name, meter_list))
|
||||||
|
|
||||||
self.flush()
|
self.flush()
|
||||||
|
|
||||||
@ -150,11 +149,11 @@ class MessagingPublisher(publisher.PublisherBase):
|
|||||||
def _process_queue(self, queue, policy):
|
def _process_queue(self, queue, policy):
|
||||||
current_retry = 0
|
current_retry = 0
|
||||||
while queue:
|
while queue:
|
||||||
context, topic, data = queue[0]
|
topic, data = queue[0]
|
||||||
try:
|
try:
|
||||||
self._send(context, topic, data)
|
self._send(topic, data)
|
||||||
except DeliveryFailure:
|
except DeliveryFailure:
|
||||||
data = sum([len(m) for __, __, m in queue])
|
data = sum([len(m) for __, m in queue])
|
||||||
if policy == 'queue':
|
if policy == 'queue':
|
||||||
LOG.warning(_("Failed to publish %d datapoints, queue "
|
LOG.warning(_("Failed to publish %d datapoints, queue "
|
||||||
"them"), data)
|
"them"), data)
|
||||||
@ -172,21 +171,20 @@ class MessagingPublisher(publisher.PublisherBase):
|
|||||||
queue.pop(0)
|
queue.pop(0)
|
||||||
return []
|
return []
|
||||||
|
|
||||||
def publish_events(self, context, events):
|
def publish_events(self, events):
|
||||||
"""Send an event message for publishing
|
"""Send an event message for publishing
|
||||||
|
|
||||||
:param context: Execution context from the service or RPC call
|
|
||||||
:param events: events from pipeline after transformation
|
:param events: events from pipeline after transformation
|
||||||
"""
|
"""
|
||||||
ev_list = [utils.message_from_event(
|
ev_list = [utils.message_from_event(
|
||||||
event, cfg.CONF.publisher.telemetry_secret) for event in events]
|
event, cfg.CONF.publisher.telemetry_secret) for event in events]
|
||||||
|
|
||||||
topic = cfg.CONF.publisher_notifier.event_topic
|
topic = cfg.CONF.publisher_notifier.event_topic
|
||||||
self.local_queue.append((context, topic, ev_list))
|
self.local_queue.append((topic, ev_list))
|
||||||
self.flush()
|
self.flush()
|
||||||
|
|
||||||
@abc.abstractmethod
|
@abc.abstractmethod
|
||||||
def _send(self, context, topic, meters):
|
def _send(self, topic, meters):
|
||||||
"""Send the meters to the messaging topic."""
|
"""Send the meters to the messaging topic."""
|
||||||
|
|
||||||
|
|
||||||
@ -203,9 +201,9 @@ class NotifierPublisher(MessagingPublisher):
|
|||||||
retry=self.retry
|
retry=self.retry
|
||||||
)
|
)
|
||||||
|
|
||||||
def _send(self, context, event_type, data):
|
def _send(self, event_type, data):
|
||||||
try:
|
try:
|
||||||
self.notifier.sample(context.to_dict(), event_type=event_type,
|
self.notifier.sample({}, event_type=event_type,
|
||||||
payload=data)
|
payload=data)
|
||||||
except oslo_messaging.MessageDeliveryFailure as e:
|
except oslo_messaging.MessageDeliveryFailure as e:
|
||||||
raise_delivery_failure(e)
|
raise_delivery_failure(e)
|
||||||
|
@ -26,19 +26,17 @@ class TestPublisher(publisher.PublisherBase):
|
|||||||
self.events = []
|
self.events = []
|
||||||
self.calls = 0
|
self.calls = 0
|
||||||
|
|
||||||
def publish_samples(self, context, samples):
|
def publish_samples(self, samples):
|
||||||
"""Send a metering message for publishing
|
"""Send a metering message for publishing
|
||||||
|
|
||||||
:param context: Execution context from the service or RPC call
|
|
||||||
:param samples: Samples from pipeline after transformation
|
:param samples: Samples from pipeline after transformation
|
||||||
"""
|
"""
|
||||||
self.samples.extend(samples)
|
self.samples.extend(samples)
|
||||||
self.calls += 1
|
self.calls += 1
|
||||||
|
|
||||||
def publish_events(self, context, events):
|
def publish_events(self, events):
|
||||||
"""Send an event message for publishing
|
"""Send an event message for publishing
|
||||||
|
|
||||||
:param context: Execution context from the service or RPC call
|
|
||||||
:param events: events from pipeline after transformation
|
:param events: events from pipeline after transformation
|
||||||
"""
|
"""
|
||||||
self.events.extend(events)
|
self.events.extend(events)
|
||||||
|
@ -45,10 +45,9 @@ class UDPPublisher(publisher.PublisherBase):
|
|||||||
self.socket = socket.socket(addr_family,
|
self.socket = socket.socket(addr_family,
|
||||||
socket.SOCK_DGRAM)
|
socket.SOCK_DGRAM)
|
||||||
|
|
||||||
def publish_samples(self, context, samples):
|
def publish_samples(self, samples):
|
||||||
"""Send a metering message for publishing
|
"""Send a metering message for publishing
|
||||||
|
|
||||||
:param context: Execution context from the service or RPC call
|
|
||||||
:param samples: Samples from pipeline after transformation
|
:param samples: Samples from pipeline after transformation
|
||||||
"""
|
"""
|
||||||
|
|
||||||
@ -67,10 +66,9 @@ class UDPPublisher(publisher.PublisherBase):
|
|||||||
LOG.warning(_("Unable to send sample over UDP"))
|
LOG.warning(_("Unable to send sample over UDP"))
|
||||||
LOG.exception(e)
|
LOG.exception(e)
|
||||||
|
|
||||||
def publish_events(self, context, events):
|
def publish_events(self, events):
|
||||||
"""Send an event message for publishing
|
"""Send an event message for publishing
|
||||||
|
|
||||||
:param context: Execution context from the service or RPC call
|
|
||||||
:param events: events from pipeline after transformation
|
:param events: events from pipeline after transformation
|
||||||
"""
|
"""
|
||||||
raise ceilometer.NotImplementedError
|
raise ceilometer.NotImplementedError
|
||||||
|
@ -72,8 +72,7 @@ class TestDirectPublisher(tests_db.TestBase):
|
|||||||
group='database')
|
group='database')
|
||||||
parsed_url = netutils.urlsplit('direct://')
|
parsed_url = netutils.urlsplit('direct://')
|
||||||
publisher = direct.DirectPublisher(parsed_url)
|
publisher = direct.DirectPublisher(parsed_url)
|
||||||
publisher.publish_samples(None,
|
publisher.publish_samples(self.test_data)
|
||||||
self.test_data)
|
|
||||||
|
|
||||||
meters = list(self.conn.get_meters(resource=self.resource_id))
|
meters = list(self.conn.get_meters(resource=self.resource_id))
|
||||||
names = sorted([meter.name for meter in meters])
|
names = sorted([meter.name for meter in meters])
|
||||||
@ -92,7 +91,7 @@ class TestEventDirectPublisher(tests_db.TestBase):
|
|||||||
def test_direct_publisher(self):
|
def test_direct_publisher(self):
|
||||||
parsed_url = netutils.urlsplit('direct://')
|
parsed_url = netutils.urlsplit('direct://')
|
||||||
publisher = direct.DirectPublisher(parsed_url)
|
publisher = direct.DirectPublisher(parsed_url)
|
||||||
publisher.publish_events(None, self.test_data)
|
publisher.publish_events(self.test_data)
|
||||||
|
|
||||||
e_types = list(self.event_conn.get_event_types())
|
e_types = list(self.event_conn.get_event_types())
|
||||||
self.assertEqual(5, len(e_types))
|
self.assertEqual(5, len(e_types))
|
||||||
|
@ -23,7 +23,6 @@ import datetime
|
|||||||
import traceback
|
import traceback
|
||||||
|
|
||||||
import mock
|
import mock
|
||||||
from oslo_context import context
|
|
||||||
from oslo_utils import timeutils
|
from oslo_utils import timeutils
|
||||||
from oslotest import base
|
from oslotest import base
|
||||||
from oslotest import mockpatch
|
from oslotest import mockpatch
|
||||||
@ -81,10 +80,10 @@ class BasePipelineTestCase(base.BaseTestCase):
|
|||||||
return fake_drivers[url](url)
|
return fake_drivers[url](url)
|
||||||
|
|
||||||
class PublisherClassException(publisher.PublisherBase):
|
class PublisherClassException(publisher.PublisherBase):
|
||||||
def publish_samples(self, ctxt, samples):
|
def publish_samples(self, samples):
|
||||||
raise Exception()
|
raise Exception()
|
||||||
|
|
||||||
def publish_events(self, ctxt, events):
|
def publish_events(self, events):
|
||||||
raise Exception()
|
raise Exception()
|
||||||
|
|
||||||
class TransformerClass(transformer.TransformerBase):
|
class TransformerClass(transformer.TransformerBase):
|
||||||
@ -95,10 +94,11 @@ class BasePipelineTestCase(base.BaseTestCase):
|
|||||||
self.__class__.samples = []
|
self.__class__.samples = []
|
||||||
self.append_name = append_name
|
self.append_name = append_name
|
||||||
|
|
||||||
def flush(self, ctxt):
|
@staticmethod
|
||||||
|
def flush():
|
||||||
return []
|
return []
|
||||||
|
|
||||||
def handle_sample(self, ctxt, counter):
|
def handle_sample(self, counter):
|
||||||
self.__class__.samples.append(counter)
|
self.__class__.samples.append(counter)
|
||||||
newname = getattr(counter, 'name') + self.append_name
|
newname = getattr(counter, 'name') + self.append_name
|
||||||
return sample.Sample(
|
return sample.Sample(
|
||||||
@ -120,14 +120,14 @@ class BasePipelineTestCase(base.BaseTestCase):
|
|||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.__class__.samples = []
|
self.__class__.samples = []
|
||||||
|
|
||||||
def handle_sample(self, ctxt, counter):
|
def handle_sample(self, counter):
|
||||||
self.__class__.samples.append(counter)
|
self.__class__.samples.append(counter)
|
||||||
|
|
||||||
class TransformerClassException(object):
|
class TransformerClassException(object):
|
||||||
grouping_keys = ['resource_id']
|
grouping_keys = ['resource_id']
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def handle_sample(ctxt, counter):
|
def handle_sample(counter):
|
||||||
raise Exception()
|
raise Exception()
|
||||||
|
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
@ -268,7 +268,7 @@ class BasePipelineTestCase(base.BaseTestCase):
|
|||||||
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
|
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
|
||||||
self.transformer_manager)
|
self.transformer_manager)
|
||||||
|
|
||||||
with pipeline_manager.publisher(None) as p:
|
with pipeline_manager.publisher() as p:
|
||||||
p([self.test_counter])
|
p([self.test_counter])
|
||||||
|
|
||||||
publisher = pipeline_manager.pipelines[0].publishers[0]
|
publisher = pipeline_manager.pipelines[0].publishers[0]
|
||||||
@ -284,7 +284,7 @@ class BasePipelineTestCase(base.BaseTestCase):
|
|||||||
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
|
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
|
||||||
self.transformer_manager)
|
self.transformer_manager)
|
||||||
|
|
||||||
with pipeline_manager.publisher(None) as p:
|
with pipeline_manager.publisher() as p:
|
||||||
p([self.test_counter])
|
p([self.test_counter])
|
||||||
|
|
||||||
publisher = pipeline_manager.pipelines[0].publishers[0]
|
publisher = pipeline_manager.pipelines[0].publishers[0]
|
||||||
@ -302,7 +302,7 @@ class BasePipelineTestCase(base.BaseTestCase):
|
|||||||
resource_metadata=self.test_counter.resource_metadata,
|
resource_metadata=self.test_counter.resource_metadata,
|
||||||
)
|
)
|
||||||
|
|
||||||
with pipeline_manager.publisher(None) as p:
|
with pipeline_manager.publisher() as p:
|
||||||
p([self.test_counter])
|
p([self.test_counter])
|
||||||
|
|
||||||
self.assertEqual(2, len(publisher.samples))
|
self.assertEqual(2, len(publisher.samples))
|
||||||
@ -329,7 +329,7 @@ class BasePipelineTestCase(base.BaseTestCase):
|
|||||||
resource_metadata=self.test_counter.resource_metadata,
|
resource_metadata=self.test_counter.resource_metadata,
|
||||||
)
|
)
|
||||||
|
|
||||||
with pipeline_manager.publisher(None) as p:
|
with pipeline_manager.publisher() as p:
|
||||||
p([test_s])
|
p([test_s])
|
||||||
|
|
||||||
LOG.warning.assert_called_once_with(
|
LOG.warning.assert_called_once_with(
|
||||||
@ -362,7 +362,7 @@ class BasePipelineTestCase(base.BaseTestCase):
|
|||||||
resource_metadata=self.test_counter.resource_metadata,
|
resource_metadata=self.test_counter.resource_metadata,
|
||||||
)
|
)
|
||||||
|
|
||||||
with pipeline_manager.publisher(None) as p:
|
with pipeline_manager.publisher() as p:
|
||||||
p([test_s])
|
p([test_s])
|
||||||
|
|
||||||
LOG.warning.assert_called_once_with(
|
LOG.warning.assert_called_once_with(
|
||||||
@ -381,7 +381,7 @@ class BasePipelineTestCase(base.BaseTestCase):
|
|||||||
self._set_pipeline_cfg('counters', counter_cfg)
|
self._set_pipeline_cfg('counters', counter_cfg)
|
||||||
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
|
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
|
||||||
self.transformer_manager)
|
self.transformer_manager)
|
||||||
with pipeline_manager.publisher(None) as p:
|
with pipeline_manager.publisher() as p:
|
||||||
p([self.test_counter])
|
p([self.test_counter])
|
||||||
|
|
||||||
publisher = pipeline_manager.pipelines[0].publishers[0]
|
publisher = pipeline_manager.pipelines[0].publishers[0]
|
||||||
@ -393,7 +393,7 @@ class BasePipelineTestCase(base.BaseTestCase):
|
|||||||
self._set_pipeline_cfg('counters', counter_cfg)
|
self._set_pipeline_cfg('counters', counter_cfg)
|
||||||
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
|
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
|
||||||
self.transformer_manager)
|
self.transformer_manager)
|
||||||
with pipeline_manager.publisher(None) as p:
|
with pipeline_manager.publisher() as p:
|
||||||
p([self.test_counter])
|
p([self.test_counter])
|
||||||
|
|
||||||
publisher = pipeline_manager.pipelines[0].publishers[0]
|
publisher = pipeline_manager.pipelines[0].publishers[0]
|
||||||
@ -413,7 +413,7 @@ class BasePipelineTestCase(base.BaseTestCase):
|
|||||||
self._set_pipeline_cfg('counters', counter_cfg)
|
self._set_pipeline_cfg('counters', counter_cfg)
|
||||||
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
|
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
|
||||||
self.transformer_manager)
|
self.transformer_manager)
|
||||||
with pipeline_manager.publisher(None) as p:
|
with pipeline_manager.publisher() as p:
|
||||||
p([self.test_counter])
|
p([self.test_counter])
|
||||||
publisher = pipeline_manager.pipelines[0].publishers[0]
|
publisher = pipeline_manager.pipelines[0].publishers[0]
|
||||||
self.assertEqual(1, len(publisher.samples))
|
self.assertEqual(1, len(publisher.samples))
|
||||||
@ -425,7 +425,7 @@ class BasePipelineTestCase(base.BaseTestCase):
|
|||||||
self._set_pipeline_cfg('counters', counter_cfg)
|
self._set_pipeline_cfg('counters', counter_cfg)
|
||||||
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
|
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
|
||||||
self.transformer_manager)
|
self.transformer_manager)
|
||||||
with pipeline_manager.publisher(None) as p:
|
with pipeline_manager.publisher() as p:
|
||||||
p([self.test_counter])
|
p([self.test_counter])
|
||||||
|
|
||||||
publisher = pipeline_manager.pipelines[0].publishers[0]
|
publisher = pipeline_manager.pipelines[0].publishers[0]
|
||||||
@ -480,7 +480,7 @@ class BasePipelineTestCase(base.BaseTestCase):
|
|||||||
|
|
||||||
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
|
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
|
||||||
self.transformer_manager)
|
self.transformer_manager)
|
||||||
with pipeline_manager.publisher(None) as p:
|
with pipeline_manager.publisher() as p:
|
||||||
p([self.test_counter])
|
p([self.test_counter])
|
||||||
|
|
||||||
self.test_counter = sample.Sample(
|
self.test_counter = sample.Sample(
|
||||||
@ -495,7 +495,7 @@ class BasePipelineTestCase(base.BaseTestCase):
|
|||||||
resource_metadata=self.test_counter.resource_metadata,
|
resource_metadata=self.test_counter.resource_metadata,
|
||||||
)
|
)
|
||||||
|
|
||||||
with pipeline_manager.publisher(None) as p:
|
with pipeline_manager.publisher() as p:
|
||||||
p([self.test_counter])
|
p([self.test_counter])
|
||||||
|
|
||||||
publisher = pipeline_manager.pipelines[0].publishers[0]
|
publisher = pipeline_manager.pipelines[0].publishers[0]
|
||||||
@ -518,7 +518,7 @@ class BasePipelineTestCase(base.BaseTestCase):
|
|||||||
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
|
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
|
||||||
self.transformer_manager)
|
self.transformer_manager)
|
||||||
|
|
||||||
with pipeline_manager.publisher(None) as p:
|
with pipeline_manager.publisher() as p:
|
||||||
p([self.test_counter])
|
p([self.test_counter])
|
||||||
|
|
||||||
self.test_counter = sample.Sample(
|
self.test_counter = sample.Sample(
|
||||||
@ -533,7 +533,7 @@ class BasePipelineTestCase(base.BaseTestCase):
|
|||||||
resource_metadata=self.test_counter.resource_metadata,
|
resource_metadata=self.test_counter.resource_metadata,
|
||||||
)
|
)
|
||||||
|
|
||||||
with pipeline_manager.publisher(None) as p:
|
with pipeline_manager.publisher() as p:
|
||||||
p([self.test_counter])
|
p([self.test_counter])
|
||||||
|
|
||||||
publisher = pipeline_manager.pipelines[0].publishers[0]
|
publisher = pipeline_manager.pipelines[0].publishers[0]
|
||||||
@ -550,7 +550,7 @@ class BasePipelineTestCase(base.BaseTestCase):
|
|||||||
self._set_pipeline_cfg('transformers', None)
|
self._set_pipeline_cfg('transformers', None)
|
||||||
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
|
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
|
||||||
self.transformer_manager)
|
self.transformer_manager)
|
||||||
with pipeline_manager.publisher(None) as p:
|
with pipeline_manager.publisher() as p:
|
||||||
p([self.test_counter])
|
p([self.test_counter])
|
||||||
publisher = pipeline_manager.pipelines[0].publishers[0]
|
publisher = pipeline_manager.pipelines[0].publishers[0]
|
||||||
self.assertEqual(1, len(publisher.samples))
|
self.assertEqual(1, len(publisher.samples))
|
||||||
@ -561,7 +561,7 @@ class BasePipelineTestCase(base.BaseTestCase):
|
|||||||
self._set_pipeline_cfg('transformers', [])
|
self._set_pipeline_cfg('transformers', [])
|
||||||
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
|
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
|
||||||
self.transformer_manager)
|
self.transformer_manager)
|
||||||
with pipeline_manager.publisher(None) as p:
|
with pipeline_manager.publisher() as p:
|
||||||
p([self.test_counter])
|
p([self.test_counter])
|
||||||
publisher = pipeline_manager.pipelines[0].publishers[0]
|
publisher = pipeline_manager.pipelines[0].publishers[0]
|
||||||
self.assertEqual(1, len(publisher.samples))
|
self.assertEqual(1, len(publisher.samples))
|
||||||
@ -583,7 +583,7 @@ class BasePipelineTestCase(base.BaseTestCase):
|
|||||||
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
|
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
|
||||||
self.transformer_manager)
|
self.transformer_manager)
|
||||||
|
|
||||||
with pipeline_manager.publisher(None) as p:
|
with pipeline_manager.publisher() as p:
|
||||||
p([self.test_counter])
|
p([self.test_counter])
|
||||||
|
|
||||||
publisher = pipeline_manager.pipelines[0].publishers[0]
|
publisher = pipeline_manager.pipelines[0].publishers[0]
|
||||||
@ -617,7 +617,7 @@ class BasePipelineTestCase(base.BaseTestCase):
|
|||||||
self._set_pipeline_cfg('transformers', transformer_cfg)
|
self._set_pipeline_cfg('transformers', transformer_cfg)
|
||||||
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
|
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
|
||||||
self.transformer_manager)
|
self.transformer_manager)
|
||||||
with pipeline_manager.publisher(None) as p:
|
with pipeline_manager.publisher() as p:
|
||||||
p([self.test_counter])
|
p([self.test_counter])
|
||||||
|
|
||||||
self.assertEqual(2, len(self.TransformerClass.samples))
|
self.assertEqual(2, len(self.TransformerClass.samples))
|
||||||
@ -655,7 +655,7 @@ class BasePipelineTestCase(base.BaseTestCase):
|
|||||||
self._set_pipeline_cfg('transformers', transformer_cfg)
|
self._set_pipeline_cfg('transformers', transformer_cfg)
|
||||||
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
|
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
|
||||||
self.transformer_manager)
|
self.transformer_manager)
|
||||||
with pipeline_manager.publisher(None) as p:
|
with pipeline_manager.publisher() as p:
|
||||||
p([self.test_counter])
|
p([self.test_counter])
|
||||||
|
|
||||||
publisher = pipeline_manager.pipelines[0].publishers[0]
|
publisher = pipeline_manager.pipelines[0].publishers[0]
|
||||||
@ -673,7 +673,7 @@ class BasePipelineTestCase(base.BaseTestCase):
|
|||||||
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
|
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
|
||||||
self.transformer_manager)
|
self.transformer_manager)
|
||||||
|
|
||||||
with pipeline_manager.publisher(None) as p:
|
with pipeline_manager.publisher() as p:
|
||||||
p([self.test_counter])
|
p([self.test_counter])
|
||||||
|
|
||||||
publisher = pipeline_manager.pipelines[0].publishers[0]
|
publisher = pipeline_manager.pipelines[0].publishers[0]
|
||||||
@ -690,7 +690,7 @@ class BasePipelineTestCase(base.BaseTestCase):
|
|||||||
self._set_pipeline_cfg('publishers', ['except://', 'new://'])
|
self._set_pipeline_cfg('publishers', ['except://', 'new://'])
|
||||||
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
|
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
|
||||||
self.transformer_manager)
|
self.transformer_manager)
|
||||||
with pipeline_manager.publisher(None) as p:
|
with pipeline_manager.publisher() as p:
|
||||||
p([self.test_counter])
|
p([self.test_counter])
|
||||||
|
|
||||||
new_publisher = pipeline_manager.pipelines[0].publishers[1]
|
new_publisher = pipeline_manager.pipelines[0].publishers[1]
|
||||||
@ -702,7 +702,7 @@ class BasePipelineTestCase(base.BaseTestCase):
|
|||||||
self._set_pipeline_cfg('counters', ['a', 'b'])
|
self._set_pipeline_cfg('counters', ['a', 'b'])
|
||||||
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
|
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
|
||||||
self.transformer_manager)
|
self.transformer_manager)
|
||||||
with pipeline_manager.publisher(None) as p:
|
with pipeline_manager.publisher() as p:
|
||||||
p([self.test_counter,
|
p([self.test_counter,
|
||||||
sample.Sample(
|
sample.Sample(
|
||||||
name='b',
|
name='b',
|
||||||
@ -743,17 +743,17 @@ class BasePipelineTestCase(base.BaseTestCase):
|
|||||||
self.transformer_manager)
|
self.transformer_manager)
|
||||||
pipe = pipeline_manager.pipelines[0]
|
pipe = pipeline_manager.pipelines[0]
|
||||||
|
|
||||||
pipe.publish_data(None, self.test_counter)
|
pipe.publish_data(self.test_counter)
|
||||||
publisher = pipeline_manager.pipelines[0].publishers[0]
|
publisher = pipeline_manager.pipelines[0].publishers[0]
|
||||||
self.assertEqual(0, len(publisher.samples))
|
self.assertEqual(0, len(publisher.samples))
|
||||||
pipe.flush(None)
|
pipe.flush()
|
||||||
self.assertEqual(0, len(publisher.samples))
|
self.assertEqual(0, len(publisher.samples))
|
||||||
pipe.publish_data(None, self.test_counter)
|
pipe.publish_data(self.test_counter)
|
||||||
pipe.flush(None)
|
pipe.flush()
|
||||||
self.assertEqual(0, len(publisher.samples))
|
self.assertEqual(0, len(publisher.samples))
|
||||||
for i in range(CACHE_SIZE - 2):
|
for i in range(CACHE_SIZE - 2):
|
||||||
pipe.publish_data(None, self.test_counter)
|
pipe.publish_data(self.test_counter)
|
||||||
pipe.flush(None)
|
pipe.flush()
|
||||||
self.assertEqual(CACHE_SIZE, len(publisher.samples))
|
self.assertEqual(CACHE_SIZE, len(publisher.samples))
|
||||||
self.assertEqual('a_update_new', getattr(publisher.samples[0], 'name'))
|
self.assertEqual('a_update_new', getattr(publisher.samples[0], 'name'))
|
||||||
|
|
||||||
@ -778,7 +778,7 @@ class BasePipelineTestCase(base.BaseTestCase):
|
|||||||
self._set_pipeline_cfg('counters', ['a', 'b'])
|
self._set_pipeline_cfg('counters', ['a', 'b'])
|
||||||
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
|
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
|
||||||
self.transformer_manager)
|
self.transformer_manager)
|
||||||
with pipeline_manager.publisher(None) as p:
|
with pipeline_manager.publisher() as p:
|
||||||
p([self.test_counter,
|
p([self.test_counter,
|
||||||
sample.Sample(
|
sample.Sample(
|
||||||
name='b',
|
name='b',
|
||||||
@ -795,7 +795,7 @@ class BasePipelineTestCase(base.BaseTestCase):
|
|||||||
publisher = pipeline_manager.pipelines[0].publishers[0]
|
publisher = pipeline_manager.pipelines[0].publishers[0]
|
||||||
self.assertEqual(0, len(publisher.samples))
|
self.assertEqual(0, len(publisher.samples))
|
||||||
|
|
||||||
with pipeline_manager.publisher(None) as p:
|
with pipeline_manager.publisher() as p:
|
||||||
p([self.test_counter])
|
p([self.test_counter])
|
||||||
|
|
||||||
self.assertEqual(CACHE_SIZE, len(publisher.samples))
|
self.assertEqual(CACHE_SIZE, len(publisher.samples))
|
||||||
@ -815,9 +815,9 @@ class BasePipelineTestCase(base.BaseTestCase):
|
|||||||
pipe = pipeline_manager.pipelines[0]
|
pipe = pipeline_manager.pipelines[0]
|
||||||
|
|
||||||
publisher = pipe.publishers[0]
|
publisher = pipe.publishers[0]
|
||||||
pipe.publish_data(None, self.test_counter)
|
pipe.publish_data(self.test_counter)
|
||||||
self.assertEqual(0, len(publisher.samples))
|
self.assertEqual(0, len(publisher.samples))
|
||||||
pipe.flush(None)
|
pipe.flush()
|
||||||
self.assertEqual(1, len(publisher.samples))
|
self.assertEqual(1, len(publisher.samples))
|
||||||
self.assertEqual('a_update',
|
self.assertEqual('a_update',
|
||||||
getattr(publisher.samples[0], 'name'))
|
getattr(publisher.samples[0], 'name'))
|
||||||
@ -855,10 +855,10 @@ class BasePipelineTestCase(base.BaseTestCase):
|
|||||||
self.transformer_manager)
|
self.transformer_manager)
|
||||||
pipe = pipeline_manager.pipelines[0]
|
pipe = pipeline_manager.pipelines[0]
|
||||||
|
|
||||||
pipe.publish_data(None, counters)
|
pipe.publish_data(counters)
|
||||||
publisher = pipeline_manager.pipelines[0].publishers[0]
|
publisher = pipeline_manager.pipelines[0].publishers[0]
|
||||||
self.assertEqual(1, len(publisher.samples))
|
self.assertEqual(1, len(publisher.samples))
|
||||||
pipe.flush(None)
|
pipe.flush()
|
||||||
self.assertEqual(1, len(publisher.samples))
|
self.assertEqual(1, len(publisher.samples))
|
||||||
cpu_mins = publisher.samples[-1]
|
cpu_mins = publisher.samples[-1]
|
||||||
self.assertEqual('cpu_mins', getattr(cpu_mins, 'name'))
|
self.assertEqual('cpu_mins', getattr(cpu_mins, 'name'))
|
||||||
@ -909,7 +909,7 @@ class BasePipelineTestCase(base.BaseTestCase):
|
|||||||
self.transformer_manager)
|
self.transformer_manager)
|
||||||
pipe = pipeline_manager.pipelines[0]
|
pipe = pipeline_manager.pipelines[0]
|
||||||
|
|
||||||
pipe.publish_data(None, counters)
|
pipe.publish_data(counters)
|
||||||
publisher = pipeline_manager.pipelines[0].publishers[0]
|
publisher = pipeline_manager.pipelines[0].publishers[0]
|
||||||
self.assertEqual(2, len(publisher.samples))
|
self.assertEqual(2, len(publisher.samples))
|
||||||
core_temp = publisher.samples[0]
|
core_temp = publisher.samples[0]
|
||||||
@ -999,10 +999,10 @@ class BasePipelineTestCase(base.BaseTestCase):
|
|||||||
self.transformer_manager)
|
self.transformer_manager)
|
||||||
pipe = pipeline_manager.pipelines[0]
|
pipe = pipeline_manager.pipelines[0]
|
||||||
|
|
||||||
pipe.publish_data(None, counters)
|
pipe.publish_data(counters)
|
||||||
publisher = pipeline_manager.pipelines[0].publishers[0]
|
publisher = pipeline_manager.pipelines[0].publishers[0]
|
||||||
self.assertEqual(2, len(publisher.samples))
|
self.assertEqual(2, len(publisher.samples))
|
||||||
pipe.flush(None)
|
pipe.flush()
|
||||||
self.assertEqual(2, len(publisher.samples))
|
self.assertEqual(2, len(publisher.samples))
|
||||||
cpu_util = publisher.samples[0]
|
cpu_util = publisher.samples[0]
|
||||||
self.assertEqual('cpu_util', getattr(cpu_util, 'name'))
|
self.assertEqual('cpu_util', getattr(cpu_util, 'name'))
|
||||||
@ -1084,10 +1084,10 @@ class BasePipelineTestCase(base.BaseTestCase):
|
|||||||
self.transformer_manager)
|
self.transformer_manager)
|
||||||
pipe = pipeline_manager.pipelines[0]
|
pipe = pipeline_manager.pipelines[0]
|
||||||
|
|
||||||
pipe.publish_data(None, counters)
|
pipe.publish_data(counters)
|
||||||
publisher = pipeline_manager.pipelines[0].publishers[0]
|
publisher = pipeline_manager.pipelines[0].publishers[0]
|
||||||
self.assertEqual(0, len(publisher.samples))
|
self.assertEqual(0, len(publisher.samples))
|
||||||
pipe.flush(None)
|
pipe.flush()
|
||||||
self.assertEqual(0, len(publisher.samples))
|
self.assertEqual(0, len(publisher.samples))
|
||||||
|
|
||||||
@mock.patch('ceilometer.transformer.conversions.LOG')
|
@mock.patch('ceilometer.transformer.conversions.LOG')
|
||||||
@ -1151,10 +1151,10 @@ class BasePipelineTestCase(base.BaseTestCase):
|
|||||||
),
|
),
|
||||||
]
|
]
|
||||||
|
|
||||||
pipe.publish_data(None, counters)
|
pipe.publish_data(counters)
|
||||||
publisher = pipe.publishers[0]
|
publisher = pipe.publishers[0]
|
||||||
self.assertEqual(1, len(publisher.samples))
|
self.assertEqual(1, len(publisher.samples))
|
||||||
pipe.flush(None)
|
pipe.flush()
|
||||||
self.assertEqual(1, len(publisher.samples))
|
self.assertEqual(1, len(publisher.samples))
|
||||||
|
|
||||||
cpu_util_sample = publisher.samples[0]
|
cpu_util_sample = publisher.samples[0]
|
||||||
@ -1201,10 +1201,10 @@ class BasePipelineTestCase(base.BaseTestCase):
|
|||||||
)
|
)
|
||||||
counters.append(s)
|
counters.append(s)
|
||||||
|
|
||||||
pipe.publish_data(None, counters)
|
pipe.publish_data(counters)
|
||||||
publisher = pipe.publishers[0]
|
publisher = pipe.publishers[0]
|
||||||
self.assertEqual(2, len(publisher.samples))
|
self.assertEqual(2, len(publisher.samples))
|
||||||
pipe.flush(None)
|
pipe.flush()
|
||||||
self.assertEqual(2, len(publisher.samples))
|
self.assertEqual(2, len(publisher.samples))
|
||||||
bps = publisher.samples[0]
|
bps = publisher.samples[0]
|
||||||
self.assertEqual('%s.rate' % meters[0], getattr(bps, 'name'))
|
self.assertEqual('%s.rate' % meters[0], getattr(bps, 'name'))
|
||||||
@ -1330,8 +1330,8 @@ class BasePipelineTestCase(base.BaseTestCase):
|
|||||||
self.transformer_manager)
|
self.transformer_manager)
|
||||||
pipe = pipeline_manager.pipelines[0]
|
pipe = pipeline_manager.pipelines[0]
|
||||||
|
|
||||||
pipe.publish_data(None, counters)
|
pipe.publish_data(counters)
|
||||||
pipe.flush(None)
|
pipe.flush()
|
||||||
publisher = pipeline_manager.pipelines[0].publishers[0]
|
publisher = pipeline_manager.pipelines[0].publishers[0]
|
||||||
self.assertEqual(expected_length, len(publisher.samples))
|
self.assertEqual(expected_length, len(publisher.samples))
|
||||||
return sorted(publisher.samples, key=lambda s: s.volume)
|
return sorted(publisher.samples, key=lambda s: s.volume)
|
||||||
@ -1366,8 +1366,8 @@ class BasePipelineTestCase(base.BaseTestCase):
|
|||||||
self.transformer_manager)
|
self.transformer_manager)
|
||||||
pipe = pipeline_manager.pipelines[0]
|
pipe = pipeline_manager.pipelines[0]
|
||||||
|
|
||||||
pipe.publish_data(None, counters)
|
pipe.publish_data(counters)
|
||||||
pipe.flush(None)
|
pipe.flush()
|
||||||
publisher = pipeline_manager.pipelines[0].publishers[0]
|
publisher = pipeline_manager.pipelines[0].publishers[0]
|
||||||
actual = sorted(s.volume for s in publisher.samples)
|
actual = sorted(s.volume for s in publisher.samples)
|
||||||
self.assertEqual([2.0, 3.0, 6.0], actual)
|
self.assertEqual([2.0, 3.0, 6.0], actual)
|
||||||
@ -1562,13 +1562,13 @@ class BasePipelineTestCase(base.BaseTestCase):
|
|||||||
self.transformer_manager)
|
self.transformer_manager)
|
||||||
pipe = pipeline_manager.pipelines[0]
|
pipe = pipeline_manager.pipelines[0]
|
||||||
|
|
||||||
pipe.publish_data(None, [counters[0]])
|
pipe.publish_data([counters[0]])
|
||||||
pipe.flush(None)
|
pipe.flush()
|
||||||
publisher = pipe.publishers[0]
|
publisher = pipe.publishers[0]
|
||||||
self.assertEqual(0, len(publisher.samples))
|
self.assertEqual(0, len(publisher.samples))
|
||||||
|
|
||||||
pipe.publish_data(None, [counters[1]])
|
pipe.publish_data([counters[1]])
|
||||||
pipe.flush(None)
|
pipe.flush()
|
||||||
publisher = pipe.publishers[0]
|
publisher = pipe.publishers[0]
|
||||||
self.assertEqual(2, len(publisher.samples))
|
self.assertEqual(2, len(publisher.samples))
|
||||||
|
|
||||||
@ -1600,13 +1600,13 @@ class BasePipelineTestCase(base.BaseTestCase):
|
|||||||
self.transformer_manager)
|
self.transformer_manager)
|
||||||
pipe = pipeline_manager.pipelines[0]
|
pipe = pipeline_manager.pipelines[0]
|
||||||
|
|
||||||
pipe.publish_data(None, counters)
|
pipe.publish_data(counters)
|
||||||
pipe.flush(None)
|
pipe.flush()
|
||||||
publisher = pipeline_manager.pipelines[0].publishers[0]
|
publisher = pipeline_manager.pipelines[0].publishers[0]
|
||||||
self.assertEqual(0, len(publisher.samples))
|
self.assertEqual(0, len(publisher.samples))
|
||||||
|
|
||||||
timeutils.advance_time_seconds(120)
|
timeutils.advance_time_seconds(120)
|
||||||
pipe.flush(None)
|
pipe.flush()
|
||||||
publisher = pipeline_manager.pipelines[0].publishers[0]
|
publisher = pipeline_manager.pipelines[0].publishers[0]
|
||||||
self.assertEqual(1, len(publisher.samples))
|
self.assertEqual(1, len(publisher.samples))
|
||||||
|
|
||||||
@ -1648,13 +1648,13 @@ class BasePipelineTestCase(base.BaseTestCase):
|
|||||||
self.transformer_manager)
|
self.transformer_manager)
|
||||||
pipe = pipeline_manager.pipelines[0]
|
pipe = pipeline_manager.pipelines[0]
|
||||||
|
|
||||||
pipe.publish_data(None, [counters[0]])
|
pipe.publish_data([counters[0]])
|
||||||
pipe.flush(None)
|
pipe.flush()
|
||||||
publisher = pipe.publishers[0]
|
publisher = pipe.publishers[0]
|
||||||
self.assertEqual(0, len(publisher.samples))
|
self.assertEqual(0, len(publisher.samples))
|
||||||
|
|
||||||
pipe.publish_data(None, [counters[1]])
|
pipe.publish_data([counters[1]])
|
||||||
pipe.flush(None)
|
pipe.flush()
|
||||||
publisher = pipe.publishers[0]
|
publisher = pipe.publishers[0]
|
||||||
|
|
||||||
self.assertEqual(1, len(publisher.samples))
|
self.assertEqual(1, len(publisher.samples))
|
||||||
@ -1680,20 +1680,19 @@ class BasePipelineTestCase(base.BaseTestCase):
|
|||||||
counter.volume = offset
|
counter.volume = offset
|
||||||
counter.type = sample.TYPE_CUMULATIVE
|
counter.type = sample.TYPE_CUMULATIVE
|
||||||
counter.unit = 'ns'
|
counter.unit = 'ns'
|
||||||
aggregator.handle_sample(context.get_admin_context(), counter)
|
aggregator.handle_sample(counter)
|
||||||
|
|
||||||
if offset == 1:
|
if offset == 1:
|
||||||
test_time = counter_time
|
test_time = counter_time
|
||||||
|
|
||||||
counter_time = counter_time + datetime.timedelta(0, 1)
|
counter_time = counter_time + datetime.timedelta(0, 1)
|
||||||
|
|
||||||
aggregated_counters = aggregator.flush(context.get_admin_context())
|
aggregated_counters = aggregator.flush()
|
||||||
self.assertEqual(len(aggregated_counters), 1)
|
self.assertEqual(len(aggregated_counters), 1)
|
||||||
self.assertEqual(aggregated_counters[0].timestamp,
|
self.assertEqual(aggregated_counters[0].timestamp,
|
||||||
timeutils.isotime(test_time))
|
timeutils.isotime(test_time))
|
||||||
|
|
||||||
rate_of_change_transformer.handle_sample(context.get_admin_context(),
|
rate_of_change_transformer.handle_sample(aggregated_counters[0])
|
||||||
aggregated_counters[0])
|
|
||||||
|
|
||||||
for offset in range(2):
|
for offset in range(2):
|
||||||
counter = copy.copy(self.test_counter)
|
counter = copy.copy(self.test_counter)
|
||||||
@ -1702,20 +1701,20 @@ class BasePipelineTestCase(base.BaseTestCase):
|
|||||||
counter.volume = 2
|
counter.volume = 2
|
||||||
counter.type = sample.TYPE_CUMULATIVE
|
counter.type = sample.TYPE_CUMULATIVE
|
||||||
counter.unit = 'ns'
|
counter.unit = 'ns'
|
||||||
aggregator.handle_sample(context.get_admin_context(), counter)
|
aggregator.handle_sample(counter)
|
||||||
|
|
||||||
if offset == 0:
|
if offset == 0:
|
||||||
test_time = counter_time
|
test_time = counter_time
|
||||||
|
|
||||||
counter_time = counter_time + datetime.timedelta(0, 1)
|
counter_time = counter_time + datetime.timedelta(0, 1)
|
||||||
|
|
||||||
aggregated_counters = aggregator.flush(context.get_admin_context())
|
aggregated_counters = aggregator.flush()
|
||||||
self.assertEqual(len(aggregated_counters), 2)
|
self.assertEqual(len(aggregated_counters), 2)
|
||||||
|
|
||||||
for counter in aggregated_counters:
|
for counter in aggregated_counters:
|
||||||
if counter.resource_id == resource_id[0]:
|
if counter.resource_id == resource_id[0]:
|
||||||
rateOfChange = rate_of_change_transformer.handle_sample(
|
rateOfChange = rate_of_change_transformer.handle_sample(
|
||||||
context.get_admin_context(), counter)
|
counter)
|
||||||
self.assertEqual(counter.timestamp,
|
self.assertEqual(counter.timestamp,
|
||||||
timeutils.isotime(test_time))
|
timeutils.isotime(test_time))
|
||||||
|
|
||||||
@ -1796,8 +1795,8 @@ class BasePipelineTestCase(base.BaseTestCase):
|
|||||||
self.transformer_manager)
|
self.transformer_manager)
|
||||||
pipe = pipeline_manager.pipelines[0]
|
pipe = pipeline_manager.pipelines[0]
|
||||||
for s in counters:
|
for s in counters:
|
||||||
pipe.publish_data(None, s)
|
pipe.publish_data(s)
|
||||||
pipe.flush(None)
|
pipe.flush()
|
||||||
publisher = pipeline_manager.pipelines[0].publishers[0]
|
publisher = pipeline_manager.pipelines[0].publishers[0]
|
||||||
expected_len = len(test_resources) * len(expected)
|
expected_len = len(test_resources) * len(expected)
|
||||||
self.assertEqual(expected_len, len(publisher.samples))
|
self.assertEqual(expected_len, len(publisher.samples))
|
||||||
@ -1916,19 +1915,19 @@ class BasePipelineTestCase(base.BaseTestCase):
|
|||||||
self.transformer_manager)
|
self.transformer_manager)
|
||||||
pipe = pipeline_manager.pipelines[0]
|
pipe = pipeline_manager.pipelines[0]
|
||||||
|
|
||||||
pipe.publish_data(None, [counter])
|
pipe.publish_data([counter])
|
||||||
publisher = pipeline_manager.pipelines[0].publishers[0]
|
publisher = pipeline_manager.pipelines[0].publishers[0]
|
||||||
self.assertEqual(0, len(publisher.samples))
|
self.assertEqual(0, len(publisher.samples))
|
||||||
pipe.flush(None)
|
pipe.flush()
|
||||||
self.assertEqual(1, len(publisher.samples))
|
self.assertEqual(1, len(publisher.samples))
|
||||||
self.assertEqual(1026.0, publisher.samples[0].volume)
|
self.assertEqual(1026.0, publisher.samples[0].volume)
|
||||||
|
|
||||||
pipe.flush(None)
|
pipe.flush()
|
||||||
self.assertEqual(1, len(publisher.samples))
|
self.assertEqual(1, len(publisher.samples))
|
||||||
|
|
||||||
counter.volume = 2048.0
|
counter.volume = 2048.0
|
||||||
pipe.publish_data(None, [counter])
|
pipe.publish_data([counter])
|
||||||
pipe.flush(None)
|
pipe.flush()
|
||||||
self.assertEqual(2, len(publisher.samples))
|
self.assertEqual(2, len(publisher.samples))
|
||||||
self.assertEqual(2050.0, publisher.samples[1].volume)
|
self.assertEqual(2050.0, publisher.samples[1].volume)
|
||||||
|
|
||||||
@ -1946,7 +1945,7 @@ class BasePipelineTestCase(base.BaseTestCase):
|
|||||||
self.transformer_manager)
|
self.transformer_manager)
|
||||||
timeutils.advance_time_seconds(200)
|
timeutils.advance_time_seconds(200)
|
||||||
pipe = pipeline_manager.pipelines[0]
|
pipe = pipeline_manager.pipelines[0]
|
||||||
pipe.flush(None)
|
pipe.flush()
|
||||||
publisher = pipeline_manager.pipelines[0].publishers[0]
|
publisher = pipeline_manager.pipelines[0].publishers[0]
|
||||||
self.assertEqual(0, len(publisher.samples))
|
self.assertEqual(0, len(publisher.samples))
|
||||||
|
|
||||||
@ -1967,8 +1966,8 @@ class BasePipelineTestCase(base.BaseTestCase):
|
|||||||
self.transformer_manager)
|
self.transformer_manager)
|
||||||
pipe = pipeline_manager.pipelines[0]
|
pipe = pipeline_manager.pipelines[0]
|
||||||
|
|
||||||
pipe.publish_data(None, data)
|
pipe.publish_data(data)
|
||||||
pipe.flush(None)
|
pipe.flush()
|
||||||
publisher = pipeline_manager.pipelines[0].publishers[0]
|
publisher = pipeline_manager.pipelines[0].publishers[0]
|
||||||
self.assertEqual(expected, len(publisher.samples))
|
self.assertEqual(expected, len(publisher.samples))
|
||||||
return publisher.samples
|
return publisher.samples
|
||||||
|
@ -57,5 +57,4 @@ class NotificationBaseTestCase(base.BaseTestCase):
|
|||||||
'payload': {'foo': 'bar'},
|
'payload': {'foo': 'bar'},
|
||||||
'message_id': '3577a84f-29ec-4904-9566-12c52289c2e8'
|
'message_id': '3577a84f-29ec-4904-9566-12c52289c2e8'
|
||||||
}
|
}
|
||||||
plugin.to_samples_and_publish.assert_called_with(mock.ANY,
|
plugin.to_samples_and_publish.assert_called_with(notification)
|
||||||
notification)
|
|
||||||
|
@ -72,8 +72,7 @@ class TestFilePublisher(base.BaseTestCase):
|
|||||||
parsed_url = netutils.urlsplit('file://%s?max_bytes=50&backup_count=3'
|
parsed_url = netutils.urlsplit('file://%s?max_bytes=50&backup_count=3'
|
||||||
% name)
|
% name)
|
||||||
publisher = file.FilePublisher(parsed_url)
|
publisher = file.FilePublisher(parsed_url)
|
||||||
publisher.publish_samples(None,
|
publisher.publish_samples(self.test_data)
|
||||||
self.test_data)
|
|
||||||
|
|
||||||
handler = publisher.publisher_logger.handlers[0]
|
handler = publisher.publisher_logger.handlers[0]
|
||||||
self.assertIsInstance(handler,
|
self.assertIsInstance(handler,
|
||||||
@ -90,8 +89,7 @@ class TestFilePublisher(base.BaseTestCase):
|
|||||||
name = '%s/log_file_plain' % tempdir
|
name = '%s/log_file_plain' % tempdir
|
||||||
parsed_url = netutils.urlsplit('file://%s' % name)
|
parsed_url = netutils.urlsplit('file://%s' % name)
|
||||||
publisher = file.FilePublisher(parsed_url)
|
publisher = file.FilePublisher(parsed_url)
|
||||||
publisher.publish_samples(None,
|
publisher.publish_samples(self.test_data)
|
||||||
self.test_data)
|
|
||||||
|
|
||||||
handler = publisher.publisher_logger.handlers[0]
|
handler = publisher.publisher_logger.handlers[0]
|
||||||
self.assertIsInstance(handler,
|
self.assertIsInstance(handler,
|
||||||
@ -114,7 +112,6 @@ class TestFilePublisher(base.BaseTestCase):
|
|||||||
'file://%s/log_file_bad'
|
'file://%s/log_file_bad'
|
||||||
'?max_bytes=yus&backup_count=5y' % tempdir)
|
'?max_bytes=yus&backup_count=5y' % tempdir)
|
||||||
publisher = file.FilePublisher(parsed_url)
|
publisher = file.FilePublisher(parsed_url)
|
||||||
publisher.publish_samples(None,
|
publisher.publish_samples(self.test_data)
|
||||||
self.test_data)
|
|
||||||
|
|
||||||
self.assertIsNone(publisher.publisher_logger)
|
self.assertIsNone(publisher.publisher_logger)
|
||||||
|
@ -123,14 +123,14 @@ class TestHttpPublisher(base.BaseTestCase):
|
|||||||
res = mock.Mock()
|
res = mock.Mock()
|
||||||
res.status_code = 200
|
res.status_code = 200
|
||||||
with mock.patch.object(Session, 'post', return_value=res) as m_req:
|
with mock.patch.object(Session, 'post', return_value=res) as m_req:
|
||||||
publisher.publish_samples(None, self.sample_data)
|
publisher.publish_samples(self.sample_data)
|
||||||
|
|
||||||
self.assertEqual(1, m_req.call_count)
|
self.assertEqual(1, m_req.call_count)
|
||||||
self.assertFalse(thelog.error.called)
|
self.assertFalse(thelog.error.called)
|
||||||
|
|
||||||
res.status_code = 401
|
res.status_code = 401
|
||||||
with mock.patch.object(Session, 'post', return_value=res) as m_req:
|
with mock.patch.object(Session, 'post', return_value=res) as m_req:
|
||||||
publisher.publish_samples(None, self.sample_data)
|
publisher.publish_samples(self.sample_data)
|
||||||
|
|
||||||
self.assertEqual(1, m_req.call_count)
|
self.assertEqual(1, m_req.call_count)
|
||||||
self.assertTrue(thelog.error.called)
|
self.assertTrue(thelog.error.called)
|
||||||
@ -144,14 +144,14 @@ class TestHttpPublisher(base.BaseTestCase):
|
|||||||
res = mock.Mock()
|
res = mock.Mock()
|
||||||
res.status_code = 200
|
res.status_code = 200
|
||||||
with mock.patch.object(Session, 'post', return_value=res) as m_req:
|
with mock.patch.object(Session, 'post', return_value=res) as m_req:
|
||||||
publisher.publish_events(None, self.event_data)
|
publisher.publish_events(self.event_data)
|
||||||
|
|
||||||
self.assertEqual(1, m_req.call_count)
|
self.assertEqual(1, m_req.call_count)
|
||||||
self.assertFalse(thelog.error.called)
|
self.assertFalse(thelog.error.called)
|
||||||
|
|
||||||
res.status_code = 401
|
res.status_code = 401
|
||||||
with mock.patch.object(Session, 'post', return_value=res) as m_req:
|
with mock.patch.object(Session, 'post', return_value=res) as m_req:
|
||||||
publisher.publish_samples(None, self.event_data)
|
publisher.publish_samples(self.event_data)
|
||||||
|
|
||||||
self.assertEqual(1, m_req.call_count)
|
self.assertEqual(1, m_req.call_count)
|
||||||
self.assertTrue(thelog.error.called)
|
self.assertTrue(thelog.error.called)
|
||||||
@ -164,7 +164,7 @@ class TestHttpPublisher(base.BaseTestCase):
|
|||||||
res = mock.Mock()
|
res = mock.Mock()
|
||||||
res.status_code = 200
|
res.status_code = 200
|
||||||
with mock.patch.object(Session, 'post', return_value=res) as m_req:
|
with mock.patch.object(Session, 'post', return_value=res) as m_req:
|
||||||
publisher.publish_events(None, self.empty_event_data)
|
publisher.publish_events(self.empty_event_data)
|
||||||
|
|
||||||
self.assertEqual(0, m_req.call_count)
|
self.assertEqual(0, m_req.call_count)
|
||||||
self.assertTrue(thelog.debug.called)
|
self.assertTrue(thelog.debug.called)
|
||||||
|
@ -102,7 +102,7 @@ class TestKafkaPublisher(tests_base.BaseTestCase):
|
|||||||
'kafka://127.0.0.1:9092?topic=ceilometer'))
|
'kafka://127.0.0.1:9092?topic=ceilometer'))
|
||||||
|
|
||||||
with mock.patch.object(publisher, '_producer') as fake_producer:
|
with mock.patch.object(publisher, '_producer') as fake_producer:
|
||||||
publisher.publish_samples(mock.MagicMock(), self.test_data)
|
publisher.publish_samples(self.test_data)
|
||||||
self.assertEqual(5, len(fake_producer.send_messages.mock_calls))
|
self.assertEqual(5, len(fake_producer.send_messages.mock_calls))
|
||||||
self.assertEqual(0, len(publisher.local_queue))
|
self.assertEqual(0, len(publisher.local_queue))
|
||||||
|
|
||||||
@ -111,7 +111,7 @@ class TestKafkaPublisher(tests_base.BaseTestCase):
|
|||||||
netutils.urlsplit('kafka://127.0.0.1:9092'))
|
netutils.urlsplit('kafka://127.0.0.1:9092'))
|
||||||
|
|
||||||
with mock.patch.object(publisher, '_producer') as fake_producer:
|
with mock.patch.object(publisher, '_producer') as fake_producer:
|
||||||
publisher.publish_samples(mock.MagicMock(), self.test_data)
|
publisher.publish_samples(self.test_data)
|
||||||
self.assertEqual(5, len(fake_producer.send_messages.mock_calls))
|
self.assertEqual(5, len(fake_producer.send_messages.mock_calls))
|
||||||
self.assertEqual(0, len(publisher.local_queue))
|
self.assertEqual(0, len(publisher.local_queue))
|
||||||
|
|
||||||
@ -132,7 +132,7 @@ class TestKafkaPublisher(tests_base.BaseTestCase):
|
|||||||
fake_producer.send_messages.side_effect = TypeError
|
fake_producer.send_messages.side_effect = TypeError
|
||||||
self.assertRaises(msg_publisher.DeliveryFailure,
|
self.assertRaises(msg_publisher.DeliveryFailure,
|
||||||
publisher.publish_samples,
|
publisher.publish_samples,
|
||||||
mock.MagicMock(), self.test_data)
|
self.test_data)
|
||||||
self.assertEqual(100, len(fake_producer.send_messages.mock_calls))
|
self.assertEqual(100, len(fake_producer.send_messages.mock_calls))
|
||||||
self.assertEqual(0, len(publisher.local_queue))
|
self.assertEqual(0, len(publisher.local_queue))
|
||||||
|
|
||||||
@ -142,7 +142,7 @@ class TestKafkaPublisher(tests_base.BaseTestCase):
|
|||||||
|
|
||||||
with mock.patch.object(publisher, '_producer') as fake_producer:
|
with mock.patch.object(publisher, '_producer') as fake_producer:
|
||||||
fake_producer.send_messages.side_effect = Exception("test")
|
fake_producer.send_messages.side_effect = Exception("test")
|
||||||
publisher.publish_samples(mock.MagicMock(), self.test_data)
|
publisher.publish_samples(self.test_data)
|
||||||
self.assertEqual(1, len(fake_producer.send_messages.mock_calls))
|
self.assertEqual(1, len(fake_producer.send_messages.mock_calls))
|
||||||
self.assertEqual(0, len(publisher.local_queue))
|
self.assertEqual(0, len(publisher.local_queue))
|
||||||
|
|
||||||
@ -152,7 +152,7 @@ class TestKafkaPublisher(tests_base.BaseTestCase):
|
|||||||
|
|
||||||
with mock.patch.object(publisher, '_producer') as fake_producer:
|
with mock.patch.object(publisher, '_producer') as fake_producer:
|
||||||
fake_producer.send_messages.side_effect = Exception("test")
|
fake_producer.send_messages.side_effect = Exception("test")
|
||||||
publisher.publish_samples(mock.MagicMock(), self.test_data)
|
publisher.publish_samples(self.test_data)
|
||||||
self.assertEqual(1, len(fake_producer.send_messages.mock_calls))
|
self.assertEqual(1, len(fake_producer.send_messages.mock_calls))
|
||||||
self.assertEqual(1, len(publisher.local_queue))
|
self.assertEqual(1, len(publisher.local_queue))
|
||||||
|
|
||||||
@ -166,13 +166,13 @@ class TestKafkaPublisher(tests_base.BaseTestCase):
|
|||||||
for i in range(0, 2000):
|
for i in range(0, 2000):
|
||||||
for s in self.test_data:
|
for s in self.test_data:
|
||||||
s.name = 'test-%d' % i
|
s.name = 'test-%d' % i
|
||||||
publisher.publish_samples(mock.MagicMock(), self.test_data)
|
publisher.publish_samples(self.test_data)
|
||||||
|
|
||||||
self.assertEqual(1024, len(publisher.local_queue))
|
self.assertEqual(1024, len(publisher.local_queue))
|
||||||
self.assertEqual('test-976',
|
self.assertEqual('test-976',
|
||||||
publisher.local_queue[0][2][0]['counter_name'])
|
publisher.local_queue[0][1][0]['counter_name'])
|
||||||
self.assertEqual('test-1999',
|
self.assertEqual('test-1999',
|
||||||
publisher.local_queue[1023][2][0]['counter_name'])
|
publisher.local_queue[1023][1][0]['counter_name'])
|
||||||
|
|
||||||
def test_publish_to_host_from_down_to_up_with_queue(self):
|
def test_publish_to_host_from_down_to_up_with_queue(self):
|
||||||
publisher = kafka.KafkaBrokerPublisher(netutils.urlsplit(
|
publisher = kafka.KafkaBrokerPublisher(netutils.urlsplit(
|
||||||
@ -183,14 +183,14 @@ class TestKafkaPublisher(tests_base.BaseTestCase):
|
|||||||
for i in range(0, 16):
|
for i in range(0, 16):
|
||||||
for s in self.test_data:
|
for s in self.test_data:
|
||||||
s.name = 'test-%d' % i
|
s.name = 'test-%d' % i
|
||||||
publisher.publish_samples(mock.MagicMock(), self.test_data)
|
publisher.publish_samples(self.test_data)
|
||||||
|
|
||||||
self.assertEqual(16, len(publisher.local_queue))
|
self.assertEqual(16, len(publisher.local_queue))
|
||||||
|
|
||||||
fake_producer.send_messages.side_effect = None
|
fake_producer.send_messages.side_effect = None
|
||||||
for s in self.test_data:
|
for s in self.test_data:
|
||||||
s.name = 'test-%d' % 16
|
s.name = 'test-%d' % 16
|
||||||
publisher.publish_samples(mock.MagicMock(), self.test_data)
|
publisher.publish_samples(self.test_data)
|
||||||
self.assertEqual(0, len(publisher.local_queue))
|
self.assertEqual(0, len(publisher.local_queue))
|
||||||
|
|
||||||
def test_publish_event_with_default_policy(self):
|
def test_publish_event_with_default_policy(self):
|
||||||
@ -198,13 +198,13 @@ class TestKafkaPublisher(tests_base.BaseTestCase):
|
|||||||
netutils.urlsplit('kafka://127.0.0.1:9092?topic=ceilometer'))
|
netutils.urlsplit('kafka://127.0.0.1:9092?topic=ceilometer'))
|
||||||
|
|
||||||
with mock.patch.object(publisher, '_producer') as fake_producer:
|
with mock.patch.object(publisher, '_producer') as fake_producer:
|
||||||
publisher.publish_events(mock.MagicMock(), self.test_event_data)
|
publisher.publish_events(self.test_event_data)
|
||||||
self.assertEqual(5, len(fake_producer.send_messages.mock_calls))
|
self.assertEqual(5, len(fake_producer.send_messages.mock_calls))
|
||||||
|
|
||||||
with mock.patch.object(publisher, '_producer') as fake_producer:
|
with mock.patch.object(publisher, '_producer') as fake_producer:
|
||||||
fake_producer.send_messages.side_effect = Exception("test")
|
fake_producer.send_messages.side_effect = Exception("test")
|
||||||
self.assertRaises(msg_publisher.DeliveryFailure,
|
self.assertRaises(msg_publisher.DeliveryFailure,
|
||||||
publisher.publish_events,
|
publisher.publish_events,
|
||||||
mock.MagicMock(), self.test_event_data)
|
self.test_event_data)
|
||||||
self.assertEqual(100, len(fake_producer.send_messages.mock_calls))
|
self.assertEqual(100, len(fake_producer.send_messages.mock_calls))
|
||||||
self.assertEqual(0, len(publisher.local_queue))
|
self.assertEqual(0, len(publisher.local_queue))
|
||||||
|
@ -151,12 +151,12 @@ class TestPublisherPolicy(TestPublisher):
|
|||||||
self.assertRaises(
|
self.assertRaises(
|
||||||
msg_publisher.DeliveryFailure,
|
msg_publisher.DeliveryFailure,
|
||||||
getattr(publisher, self.pub_func),
|
getattr(publisher, self.pub_func),
|
||||||
mock.MagicMock(), self.test_data)
|
self.test_data)
|
||||||
self.assertTrue(mylog.info.called)
|
self.assertTrue(mylog.info.called)
|
||||||
self.assertEqual('default', publisher.policy)
|
self.assertEqual('default', publisher.policy)
|
||||||
self.assertEqual(0, len(publisher.local_queue))
|
self.assertEqual(0, len(publisher.local_queue))
|
||||||
fake_send.assert_called_once_with(
|
fake_send.assert_called_once_with(
|
||||||
mock.ANY, self.topic, mock.ANY)
|
self.topic, mock.ANY)
|
||||||
|
|
||||||
@mock.patch('ceilometer.publisher.messaging.LOG')
|
@mock.patch('ceilometer.publisher.messaging.LOG')
|
||||||
def test_published_with_policy_block(self, mylog):
|
def test_published_with_policy_block(self, mylog):
|
||||||
@ -168,11 +168,11 @@ class TestPublisherPolicy(TestPublisher):
|
|||||||
self.assertRaises(
|
self.assertRaises(
|
||||||
msg_publisher.DeliveryFailure,
|
msg_publisher.DeliveryFailure,
|
||||||
getattr(publisher, self.pub_func),
|
getattr(publisher, self.pub_func),
|
||||||
mock.MagicMock(), self.test_data)
|
self.test_data)
|
||||||
self.assertTrue(mylog.info.called)
|
self.assertTrue(mylog.info.called)
|
||||||
self.assertEqual(0, len(publisher.local_queue))
|
self.assertEqual(0, len(publisher.local_queue))
|
||||||
fake_send.assert_called_once_with(
|
fake_send.assert_called_once_with(
|
||||||
mock.ANY, self.topic, mock.ANY)
|
self.topic, mock.ANY)
|
||||||
|
|
||||||
@mock.patch('ceilometer.publisher.messaging.LOG')
|
@mock.patch('ceilometer.publisher.messaging.LOG')
|
||||||
def test_published_with_policy_incorrect(self, mylog):
|
def test_published_with_policy_incorrect(self, mylog):
|
||||||
@ -184,12 +184,12 @@ class TestPublisherPolicy(TestPublisher):
|
|||||||
self.assertRaises(
|
self.assertRaises(
|
||||||
msg_publisher.DeliveryFailure,
|
msg_publisher.DeliveryFailure,
|
||||||
getattr(publisher, self.pub_func),
|
getattr(publisher, self.pub_func),
|
||||||
mock.MagicMock(), self.test_data)
|
self.test_data)
|
||||||
self.assertTrue(mylog.warning.called)
|
self.assertTrue(mylog.warning.called)
|
||||||
self.assertEqual('default', publisher.policy)
|
self.assertEqual('default', publisher.policy)
|
||||||
self.assertEqual(0, len(publisher.local_queue))
|
self.assertEqual(0, len(publisher.local_queue))
|
||||||
fake_send.assert_called_once_with(
|
fake_send.assert_called_once_with(
|
||||||
mock.ANY, self.topic, mock.ANY)
|
self.topic, mock.ANY)
|
||||||
|
|
||||||
|
|
||||||
@mock.patch('ceilometer.publisher.messaging.LOG', mock.Mock())
|
@mock.patch('ceilometer.publisher.messaging.LOG', mock.Mock())
|
||||||
@ -201,11 +201,10 @@ class TestPublisherPolicyReactions(TestPublisher):
|
|||||||
side_effect = msg_publisher.DeliveryFailure()
|
side_effect = msg_publisher.DeliveryFailure()
|
||||||
with mock.patch.object(publisher, '_send') as fake_send:
|
with mock.patch.object(publisher, '_send') as fake_send:
|
||||||
fake_send.side_effect = side_effect
|
fake_send.side_effect = side_effect
|
||||||
getattr(publisher, self.pub_func)(mock.MagicMock(),
|
getattr(publisher, self.pub_func)(self.test_data)
|
||||||
self.test_data)
|
|
||||||
self.assertEqual(0, len(publisher.local_queue))
|
self.assertEqual(0, len(publisher.local_queue))
|
||||||
fake_send.assert_called_once_with(
|
fake_send.assert_called_once_with(
|
||||||
mock.ANY, self.topic, mock.ANY)
|
self.topic, mock.ANY)
|
||||||
|
|
||||||
def test_published_with_policy_queue_and_rpc_down(self):
|
def test_published_with_policy_queue_and_rpc_down(self):
|
||||||
publisher = self.publisher_cls(
|
publisher = self.publisher_cls(
|
||||||
@ -214,11 +213,10 @@ class TestPublisherPolicyReactions(TestPublisher):
|
|||||||
with mock.patch.object(publisher, '_send') as fake_send:
|
with mock.patch.object(publisher, '_send') as fake_send:
|
||||||
fake_send.side_effect = side_effect
|
fake_send.side_effect = side_effect
|
||||||
|
|
||||||
getattr(publisher, self.pub_func)(mock.MagicMock(),
|
getattr(publisher, self.pub_func)(self.test_data)
|
||||||
self.test_data)
|
|
||||||
self.assertEqual(1, len(publisher.local_queue))
|
self.assertEqual(1, len(publisher.local_queue))
|
||||||
fake_send.assert_called_once_with(
|
fake_send.assert_called_once_with(
|
||||||
mock.ANY, self.topic, mock.ANY)
|
self.topic, mock.ANY)
|
||||||
|
|
||||||
def test_published_with_policy_queue_and_rpc_down_up(self):
|
def test_published_with_policy_queue_and_rpc_down_up(self):
|
||||||
self.rpc_unreachable = True
|
self.rpc_unreachable = True
|
||||||
@ -228,21 +226,19 @@ class TestPublisherPolicyReactions(TestPublisher):
|
|||||||
side_effect = msg_publisher.DeliveryFailure()
|
side_effect = msg_publisher.DeliveryFailure()
|
||||||
with mock.patch.object(publisher, '_send') as fake_send:
|
with mock.patch.object(publisher, '_send') as fake_send:
|
||||||
fake_send.side_effect = side_effect
|
fake_send.side_effect = side_effect
|
||||||
getattr(publisher, self.pub_func)(mock.MagicMock(),
|
getattr(publisher, self.pub_func)(self.test_data)
|
||||||
self.test_data)
|
|
||||||
|
|
||||||
self.assertEqual(1, len(publisher.local_queue))
|
self.assertEqual(1, len(publisher.local_queue))
|
||||||
|
|
||||||
fake_send.side_effect = mock.MagicMock()
|
fake_send.side_effect = mock.MagicMock()
|
||||||
getattr(publisher, self.pub_func)(mock.MagicMock(),
|
getattr(publisher, self.pub_func)(self.test_data)
|
||||||
self.test_data)
|
|
||||||
|
|
||||||
self.assertEqual(0, len(publisher.local_queue))
|
self.assertEqual(0, len(publisher.local_queue))
|
||||||
|
|
||||||
topic = self.topic
|
topic = self.topic
|
||||||
expected = [mock.call(mock.ANY, topic, mock.ANY),
|
expected = [mock.call(topic, mock.ANY),
|
||||||
mock.call(mock.ANY, topic, mock.ANY),
|
mock.call(topic, mock.ANY),
|
||||||
mock.call(mock.ANY, topic, mock.ANY)]
|
mock.call(topic, mock.ANY)]
|
||||||
self.assertEqual(expected, fake_send.mock_calls)
|
self.assertEqual(expected, fake_send.mock_calls)
|
||||||
|
|
||||||
def test_published_with_policy_sized_queue_and_rpc_down(self):
|
def test_published_with_policy_sized_queue_and_rpc_down(self):
|
||||||
@ -255,21 +251,20 @@ class TestPublisherPolicyReactions(TestPublisher):
|
|||||||
for i in range(0, 5):
|
for i in range(0, 5):
|
||||||
for s in self.test_data:
|
for s in self.test_data:
|
||||||
setattr(s, self.attr, 'test-%d' % i)
|
setattr(s, self.attr, 'test-%d' % i)
|
||||||
getattr(publisher, self.pub_func)(mock.MagicMock(),
|
getattr(publisher, self.pub_func)(self.test_data)
|
||||||
self.test_data)
|
|
||||||
|
|
||||||
self.assertEqual(3, len(publisher.local_queue))
|
self.assertEqual(3, len(publisher.local_queue))
|
||||||
self.assertEqual(
|
self.assertEqual(
|
||||||
'test-2',
|
'test-2',
|
||||||
publisher.local_queue[0][2][0][self.attr]
|
publisher.local_queue[0][1][0][self.attr]
|
||||||
)
|
)
|
||||||
self.assertEqual(
|
self.assertEqual(
|
||||||
'test-3',
|
'test-3',
|
||||||
publisher.local_queue[1][2][0][self.attr]
|
publisher.local_queue[1][1][0][self.attr]
|
||||||
)
|
)
|
||||||
self.assertEqual(
|
self.assertEqual(
|
||||||
'test-4',
|
'test-4',
|
||||||
publisher.local_queue[2][2][0][self.attr]
|
publisher.local_queue[2][1][0][self.attr]
|
||||||
)
|
)
|
||||||
|
|
||||||
def test_published_with_policy_default_sized_queue_and_rpc_down(self):
|
def test_published_with_policy_default_sized_queue_and_rpc_down(self):
|
||||||
@ -282,15 +277,14 @@ class TestPublisherPolicyReactions(TestPublisher):
|
|||||||
for i in range(0, 2000):
|
for i in range(0, 2000):
|
||||||
for s in self.test_data:
|
for s in self.test_data:
|
||||||
setattr(s, self.attr, 'test-%d' % i)
|
setattr(s, self.attr, 'test-%d' % i)
|
||||||
getattr(publisher, self.pub_func)(mock.MagicMock(),
|
getattr(publisher, self.pub_func)(self.test_data)
|
||||||
self.test_data)
|
|
||||||
|
|
||||||
self.assertEqual(1024, len(publisher.local_queue))
|
self.assertEqual(1024, len(publisher.local_queue))
|
||||||
self.assertEqual(
|
self.assertEqual(
|
||||||
'test-976',
|
'test-976',
|
||||||
publisher.local_queue[0][2][0][self.attr]
|
publisher.local_queue[0][1][0][self.attr]
|
||||||
)
|
)
|
||||||
self.assertEqual(
|
self.assertEqual(
|
||||||
'test-1999',
|
'test-1999',
|
||||||
publisher.local_queue[1023][2][0][self.attr]
|
publisher.local_queue[1023][1][0][self.attr]
|
||||||
)
|
)
|
||||||
|
@ -133,8 +133,7 @@ class TestUDPPublisher(base.BaseTestCase):
|
|||||||
self._make_fake_socket(self.data_sent)):
|
self._make_fake_socket(self.data_sent)):
|
||||||
publisher = udp.UDPPublisher(
|
publisher = udp.UDPPublisher(
|
||||||
netutils.urlsplit('udp://somehost'))
|
netutils.urlsplit('udp://somehost'))
|
||||||
publisher.publish_samples(None,
|
publisher.publish_samples(self.test_data)
|
||||||
self.test_data)
|
|
||||||
|
|
||||||
self.assertEqual(5, len(self.data_sent))
|
self.assertEqual(5, len(self.data_sent))
|
||||||
|
|
||||||
@ -172,5 +171,4 @@ class TestUDPPublisher(base.BaseTestCase):
|
|||||||
self._make_broken_socket):
|
self._make_broken_socket):
|
||||||
publisher = udp.UDPPublisher(
|
publisher = udp.UDPPublisher(
|
||||||
netutils.urlsplit('udp://localhost'))
|
netutils.urlsplit('udp://localhost'))
|
||||||
publisher.publish_samples(None,
|
publisher.publish_samples(self.test_data)
|
||||||
self.test_data)
|
|
||||||
|
@ -139,7 +139,7 @@ class TestDecoupledPipeline(pipeline_base.BasePipelineTestCase):
|
|||||||
|
|
||||||
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
|
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
|
||||||
self.transformer_manager)
|
self.transformer_manager)
|
||||||
with pipeline_manager.publisher(None) as p:
|
with pipeline_manager.publisher() as p:
|
||||||
p([self.test_counter])
|
p([self.test_counter])
|
||||||
|
|
||||||
self.test_counter = sample.Sample(
|
self.test_counter = sample.Sample(
|
||||||
@ -154,7 +154,7 @@ class TestDecoupledPipeline(pipeline_base.BasePipelineTestCase):
|
|||||||
resource_metadata=self.test_counter.resource_metadata,
|
resource_metadata=self.test_counter.resource_metadata,
|
||||||
)
|
)
|
||||||
|
|
||||||
with pipeline_manager.publisher(None) as p:
|
with pipeline_manager.publisher() as p:
|
||||||
p([self.test_counter])
|
p([self.test_counter])
|
||||||
|
|
||||||
self.assertEqual(2, len(pipeline_manager.pipelines))
|
self.assertEqual(2, len(pipeline_manager.pipelines))
|
||||||
@ -182,7 +182,7 @@ class TestDecoupledPipeline(pipeline_base.BasePipelineTestCase):
|
|||||||
|
|
||||||
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
|
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
|
||||||
self.transformer_manager)
|
self.transformer_manager)
|
||||||
with pipeline_manager.publisher(None) as p:
|
with pipeline_manager.publisher() as p:
|
||||||
p([self.test_counter])
|
p([self.test_counter])
|
||||||
|
|
||||||
self.test_counter = sample.Sample(
|
self.test_counter = sample.Sample(
|
||||||
@ -197,7 +197,7 @@ class TestDecoupledPipeline(pipeline_base.BasePipelineTestCase):
|
|||||||
resource_metadata=self.test_counter.resource_metadata,
|
resource_metadata=self.test_counter.resource_metadata,
|
||||||
)
|
)
|
||||||
|
|
||||||
with pipeline_manager.publisher(None) as p:
|
with pipeline_manager.publisher() as p:
|
||||||
p([self.test_counter])
|
p([self.test_counter])
|
||||||
|
|
||||||
self.assertEqual(2, len(pipeline_manager.pipelines))
|
self.assertEqual(2, len(pipeline_manager.pipelines))
|
||||||
|
@ -37,10 +37,10 @@ class EventPipelineTestCase(base.BaseTestCase):
|
|||||||
return fake_drivers[url](url)
|
return fake_drivers[url](url)
|
||||||
|
|
||||||
class PublisherClassException(publisher.PublisherBase):
|
class PublisherClassException(publisher.PublisherBase):
|
||||||
def publish_samples(self, ctxt, samples):
|
def publish_samples(self, samples):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
def publish_events(self, ctxt, events):
|
def publish_events(self, events):
|
||||||
raise Exception()
|
raise Exception()
|
||||||
|
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
@ -199,13 +199,13 @@ class EventPipelineTestCase(base.BaseTestCase):
|
|||||||
self.transformer_manager,
|
self.transformer_manager,
|
||||||
self.p_type)
|
self.p_type)
|
||||||
|
|
||||||
with pipeline_manager.publisher(None) as p:
|
with pipeline_manager.publisher() as p:
|
||||||
p([self.test_event])
|
p([self.test_event])
|
||||||
|
|
||||||
publisher = pipeline_manager.pipelines[0].publishers[0]
|
publisher = pipeline_manager.pipelines[0].publishers[0]
|
||||||
self.assertEqual(1, len(publisher.events))
|
self.assertEqual(1, len(publisher.events))
|
||||||
|
|
||||||
with pipeline_manager.publisher(None) as p:
|
with pipeline_manager.publisher() as p:
|
||||||
p([self.test_event2])
|
p([self.test_event2])
|
||||||
|
|
||||||
self.assertEqual(2, len(publisher.events))
|
self.assertEqual(2, len(publisher.events))
|
||||||
@ -218,7 +218,7 @@ class EventPipelineTestCase(base.BaseTestCase):
|
|||||||
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
|
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
|
||||||
self.transformer_manager,
|
self.transformer_manager,
|
||||||
self.p_type)
|
self.p_type)
|
||||||
with pipeline_manager.publisher(None) as p:
|
with pipeline_manager.publisher() as p:
|
||||||
p([self.test_event])
|
p([self.test_event])
|
||||||
|
|
||||||
publisher = pipeline_manager.pipelines[0].publishers[0]
|
publisher = pipeline_manager.pipelines[0].publishers[0]
|
||||||
@ -231,7 +231,7 @@ class EventPipelineTestCase(base.BaseTestCase):
|
|||||||
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
|
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
|
||||||
self.transformer_manager,
|
self.transformer_manager,
|
||||||
self.p_type)
|
self.p_type)
|
||||||
with pipeline_manager.publisher(None) as p:
|
with pipeline_manager.publisher() as p:
|
||||||
p([self.test_event])
|
p([self.test_event])
|
||||||
|
|
||||||
publisher = pipeline_manager.pipelines[0].publishers[0]
|
publisher = pipeline_manager.pipelines[0].publishers[0]
|
||||||
@ -252,7 +252,7 @@ class EventPipelineTestCase(base.BaseTestCase):
|
|||||||
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
|
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
|
||||||
self.transformer_manager,
|
self.transformer_manager,
|
||||||
self.p_type)
|
self.p_type)
|
||||||
with pipeline_manager.publisher(None) as p:
|
with pipeline_manager.publisher() as p:
|
||||||
p([self.test_event])
|
p([self.test_event])
|
||||||
publisher = pipeline_manager.pipelines[0].publishers[0]
|
publisher = pipeline_manager.pipelines[0].publishers[0]
|
||||||
self.assertEqual(1, len(publisher.events))
|
self.assertEqual(1, len(publisher.events))
|
||||||
@ -264,7 +264,7 @@ class EventPipelineTestCase(base.BaseTestCase):
|
|||||||
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
|
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
|
||||||
self.transformer_manager,
|
self.transformer_manager,
|
||||||
self.p_type)
|
self.p_type)
|
||||||
with pipeline_manager.publisher(None) as p:
|
with pipeline_manager.publisher() as p:
|
||||||
p([self.test_event])
|
p([self.test_event])
|
||||||
|
|
||||||
publisher = pipeline_manager.pipelines[0].publishers[0]
|
publisher = pipeline_manager.pipelines[0].publishers[0]
|
||||||
@ -324,7 +324,7 @@ class EventPipelineTestCase(base.BaseTestCase):
|
|||||||
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
|
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
|
||||||
self.transformer_manager,
|
self.transformer_manager,
|
||||||
self.p_type)
|
self.p_type)
|
||||||
with pipeline_manager.publisher(None) as p:
|
with pipeline_manager.publisher() as p:
|
||||||
p([self.test_event, self.test_event2])
|
p([self.test_event, self.test_event2])
|
||||||
|
|
||||||
publisher = pipeline_manager.pipelines[0].publishers[0]
|
publisher = pipeline_manager.pipelines[0].publishers[0]
|
||||||
@ -342,7 +342,7 @@ class EventPipelineTestCase(base.BaseTestCase):
|
|||||||
self.transformer_manager,
|
self.transformer_manager,
|
||||||
self.p_type)
|
self.p_type)
|
||||||
|
|
||||||
with pipeline_manager.publisher(None) as p:
|
with pipeline_manager.publisher() as p:
|
||||||
p([self.test_event])
|
p([self.test_event])
|
||||||
|
|
||||||
publisher = pipeline_manager.pipelines[0].publishers[0]
|
publisher = pipeline_manager.pipelines[0].publishers[0]
|
||||||
@ -358,7 +358,7 @@ class EventPipelineTestCase(base.BaseTestCase):
|
|||||||
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
|
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
|
||||||
self.transformer_manager,
|
self.transformer_manager,
|
||||||
self.p_type)
|
self.p_type)
|
||||||
with pipeline_manager.publisher(None) as p:
|
with pipeline_manager.publisher() as p:
|
||||||
p([self.test_event])
|
p([self.test_event])
|
||||||
|
|
||||||
publisher = pipeline_manager.pipelines[0].publishers[1]
|
publisher = pipeline_manager.pipelines[0].publishers[1]
|
||||||
@ -401,7 +401,7 @@ class EventPipelineTestCase(base.BaseTestCase):
|
|||||||
self.transformer_manager,
|
self.transformer_manager,
|
||||||
self.p_type)
|
self.p_type)
|
||||||
event_pipeline_endpoint = pipeline.EventPipelineEndpoint(
|
event_pipeline_endpoint = pipeline.EventPipelineEndpoint(
|
||||||
mock.Mock(), pipeline_manager.pipelines[0])
|
pipeline_manager.pipelines[0])
|
||||||
|
|
||||||
fake_publisher.publish_events.side_effect = Exception
|
fake_publisher.publish_events.side_effect = Exception
|
||||||
ret = event_pipeline_endpoint.sample([
|
ret = event_pipeline_endpoint.sample([
|
||||||
|
@ -14,7 +14,6 @@
|
|||||||
# under the License.
|
# under the License.
|
||||||
import copy
|
import copy
|
||||||
|
|
||||||
from oslo_context import context
|
|
||||||
from oslo_utils import timeutils
|
from oslo_utils import timeutils
|
||||||
from oslotest import base
|
from oslotest import base
|
||||||
|
|
||||||
@ -94,7 +93,7 @@ class AggregatorTransformerTestCase(base.BaseTestCase):
|
|||||||
retention_time="300")
|
retention_time="300")
|
||||||
self._insert_sample_data(aggregator)
|
self._insert_sample_data(aggregator)
|
||||||
|
|
||||||
samples = aggregator.flush(context.get_admin_context())
|
samples = aggregator.flush()
|
||||||
|
|
||||||
self.assertEqual([], samples)
|
self.assertEqual([], samples)
|
||||||
|
|
||||||
@ -102,7 +101,7 @@ class AggregatorTransformerTestCase(base.BaseTestCase):
|
|||||||
aggregator = conversions.AggregatorTransformer(size="100")
|
aggregator = conversions.AggregatorTransformer(size="100")
|
||||||
self._insert_sample_data(aggregator)
|
self._insert_sample_data(aggregator)
|
||||||
|
|
||||||
samples = aggregator.flush(context.get_admin_context())
|
samples = aggregator.flush()
|
||||||
|
|
||||||
self.assertEqual(100, len(samples))
|
self.assertEqual(100, len(samples))
|
||||||
|
|
||||||
@ -111,5 +110,5 @@ class AggregatorTransformerTestCase(base.BaseTestCase):
|
|||||||
sample = copy.copy(self.SAMPLE)
|
sample = copy.copy(self.SAMPLE)
|
||||||
sample.resource_id = sample.resource_id + str(self._sample_offset)
|
sample.resource_id = sample.resource_id + str(self._sample_offset)
|
||||||
sample.timestamp = timeutils.isotime()
|
sample.timestamp = timeutils.isotime()
|
||||||
aggregator.handle_sample(context.get_admin_context(), sample)
|
aggregator.handle_sample(sample)
|
||||||
self._sample_offset += 1
|
self._sample_offset += 1
|
||||||
|
@ -36,10 +36,9 @@ class TransformerBase(object):
|
|||||||
super(TransformerBase, self).__init__()
|
super(TransformerBase, self).__init__()
|
||||||
|
|
||||||
@abc.abstractmethod
|
@abc.abstractmethod
|
||||||
def handle_sample(self, context, sample):
|
def handle_sample(self, sample):
|
||||||
"""Transform a sample.
|
"""Transform a sample.
|
||||||
|
|
||||||
:param context: Passed from the data collector.
|
|
||||||
:param sample: A sample.
|
:param sample: A sample.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
@ -47,11 +46,9 @@ class TransformerBase(object):
|
|||||||
def grouping_keys(self):
|
def grouping_keys(self):
|
||||||
"""Keys used to group transformer."""
|
"""Keys used to group transformer."""
|
||||||
|
|
||||||
def flush(self, context):
|
@staticmethod
|
||||||
"""Flush samples cached previously.
|
def flush():
|
||||||
|
"""Flush samples cached previously."""
|
||||||
:param context: Passed from the data collector.
|
|
||||||
"""
|
|
||||||
return []
|
return []
|
||||||
|
|
||||||
|
|
||||||
|
@ -30,13 +30,13 @@ class TransformerAccumulator(transformer.TransformerBase):
|
|||||||
self.size = size
|
self.size = size
|
||||||
super(TransformerAccumulator, self).__init__(**kwargs)
|
super(TransformerAccumulator, self).__init__(**kwargs)
|
||||||
|
|
||||||
def handle_sample(self, context, sample):
|
def handle_sample(self, sample):
|
||||||
if self.size >= 1:
|
if self.size >= 1:
|
||||||
self.samples.append(sample)
|
self.samples.append(sample)
|
||||||
else:
|
else:
|
||||||
return sample
|
return sample
|
||||||
|
|
||||||
def flush(self, context):
|
def flush(self):
|
||||||
if len(self.samples) >= self.size:
|
if len(self.samples) >= self.size:
|
||||||
x = self.samples
|
x = self.samples
|
||||||
self.samples = []
|
self.samples = []
|
||||||
|
@ -95,11 +95,11 @@ class ArithmeticTransformer(transformer.TransformerBase):
|
|||||||
LOG.warning(_('Unable to evaluate expression %(expr)s: %(exc)s'),
|
LOG.warning(_('Unable to evaluate expression %(expr)s: %(exc)s'),
|
||||||
{'expr': self.expr, 'exc': e})
|
{'expr': self.expr, 'exc': e})
|
||||||
|
|
||||||
def handle_sample(self, context, _sample):
|
def handle_sample(self, _sample):
|
||||||
self._update_cache(_sample)
|
self._update_cache(_sample)
|
||||||
self.latest_timestamp = _sample.timestamp
|
self.latest_timestamp = _sample.timestamp
|
||||||
|
|
||||||
def flush(self, context):
|
def flush(self):
|
||||||
new_samples = []
|
new_samples = []
|
||||||
cache_clean_list = []
|
cache_clean_list = []
|
||||||
if not self.misconfigured:
|
if not self.misconfigured:
|
||||||
|
@ -72,7 +72,7 @@ class DeltaTransformer(BaseConversionTransformer):
|
|||||||
self.growth_only = growth_only
|
self.growth_only = growth_only
|
||||||
self.cache = {}
|
self.cache = {}
|
||||||
|
|
||||||
def handle_sample(self, context, s):
|
def handle_sample(self, s):
|
||||||
"""Handle a sample, converting if necessary."""
|
"""Handle a sample, converting if necessary."""
|
||||||
key = s.name + s.resource_id
|
key = s.name + s.resource_id
|
||||||
prev = self.cache.get(key)
|
prev = self.cache.get(key)
|
||||||
@ -159,7 +159,7 @@ class ScalingTransformer(BaseConversionTransformer):
|
|||||||
resource_metadata=s.resource_metadata
|
resource_metadata=s.resource_metadata
|
||||||
)
|
)
|
||||||
|
|
||||||
def handle_sample(self, context, s):
|
def handle_sample(self, s):
|
||||||
"""Handle a sample, converting if necessary."""
|
"""Handle a sample, converting if necessary."""
|
||||||
LOG.debug('handling sample %s', s)
|
LOG.debug('handling sample %s', s)
|
||||||
if self.source.get('unit', s.unit) == s.unit:
|
if self.source.get('unit', s.unit) == s.unit:
|
||||||
@ -181,7 +181,7 @@ class RateOfChangeTransformer(ScalingTransformer):
|
|||||||
self.cache = {}
|
self.cache = {}
|
||||||
self.scale = self.scale or '1'
|
self.scale = self.scale or '1'
|
||||||
|
|
||||||
def handle_sample(self, context, s):
|
def handle_sample(self, s):
|
||||||
"""Handle a sample, converting if necessary."""
|
"""Handle a sample, converting if necessary."""
|
||||||
LOG.debug('handling sample %s', s)
|
LOG.debug('handling sample %s', s)
|
||||||
key = s.name + s.resource_id
|
key = s.name + s.resource_id
|
||||||
@ -293,7 +293,7 @@ class AggregatorTransformer(ScalingTransformer):
|
|||||||
# NOTE(sileht): it assumes, a meter always have the same unit/type
|
# NOTE(sileht): it assumes, a meter always have the same unit/type
|
||||||
return "%s-%s-%s" % (s.name, s.resource_id, non_aggregated_keys)
|
return "%s-%s-%s" % (s.name, s.resource_id, non_aggregated_keys)
|
||||||
|
|
||||||
def handle_sample(self, context, sample_):
|
def handle_sample(self, sample_):
|
||||||
if not self.initial_timestamp:
|
if not self.initial_timestamp:
|
||||||
self.initial_timestamp = timeutils.parse_isotime(sample_.timestamp)
|
self.initial_timestamp = timeutils.parse_isotime(sample_.timestamp)
|
||||||
|
|
||||||
@ -317,7 +317,7 @@ class AggregatorTransformer(ScalingTransformer):
|
|||||||
setattr(self.samples[key], field,
|
setattr(self.samples[key], field,
|
||||||
getattr(sample_, field))
|
getattr(sample_, field))
|
||||||
|
|
||||||
def flush(self, context):
|
def flush(self):
|
||||||
if not self.initial_timestamp:
|
if not self.initial_timestamp:
|
||||||
return []
|
return []
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user