Don't load many times the same publisher
This change save some resources usage by loading only once each publisher. Change-Id: Ibc0fd399ca0668937f53ac3a2e68938a0a24cb87
This commit is contained in:
parent
6cfb819c38
commit
a7c73445a0
@ -59,6 +59,7 @@ OPTS = [
|
||||
),
|
||||
]
|
||||
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
@ -360,7 +361,7 @@ class Sink(object):
|
||||
passed data directly from the sink which are published unchanged.
|
||||
"""
|
||||
|
||||
def __init__(self, conf, cfg, transformer_manager):
|
||||
def __init__(self, conf, cfg, transformer_manager, publisher_manager):
|
||||
self.conf = conf
|
||||
self.cfg = cfg
|
||||
|
||||
@ -380,9 +381,9 @@ class Sink(object):
|
||||
if '://' not in p:
|
||||
# Support old format without URL
|
||||
p = p + "://"
|
||||
|
||||
try:
|
||||
self.publishers.append(publisher.get_publisher(
|
||||
self.conf, p, self.NAMESPACE))
|
||||
self.publishers.append(publisher_manager.get(p))
|
||||
except Exception:
|
||||
LOG.error(_LE("Unable to load publisher %s"), p,
|
||||
exc_info=True)
|
||||
@ -415,7 +416,7 @@ class Sink(object):
|
||||
|
||||
class EventSink(Sink):
|
||||
|
||||
NAMESPACE = 'ceilometer.event.publisher'
|
||||
PUBLISHER_PURPOSE = 'event'
|
||||
|
||||
def publish_events(self, events):
|
||||
if events:
|
||||
@ -439,7 +440,7 @@ class EventSink(Sink):
|
||||
|
||||
class SampleSink(Sink):
|
||||
|
||||
NAMESPACE = 'ceilometer.publisher'
|
||||
PUBLISHER_PURPOSE = 'sample'
|
||||
|
||||
def _transform_sample(self, start, sample):
|
||||
try:
|
||||
@ -610,11 +611,13 @@ class SamplePipeline(Pipeline):
|
||||
self.sink.publish_samples(supported)
|
||||
|
||||
|
||||
SAMPLE_TYPE = {'pipeline': SamplePipeline,
|
||||
SAMPLE_TYPE = {'name': 'sample',
|
||||
'pipeline': SamplePipeline,
|
||||
'source': SampleSource,
|
||||
'sink': SampleSink}
|
||||
|
||||
EVENT_TYPE = {'pipeline': EventPipeline,
|
||||
EVENT_TYPE = {'name': 'event',
|
||||
'pipeline': EventPipeline,
|
||||
'source': EventSource,
|
||||
'sink': EventSink}
|
||||
|
||||
@ -670,6 +673,21 @@ class ConfigManagerBase(object):
|
||||
return False
|
||||
|
||||
|
||||
class PublisherManager(object):
|
||||
def __init__(self, conf, purpose):
|
||||
self._loaded_publishers = {}
|
||||
self._conf = conf
|
||||
self._purpose = purpose
|
||||
|
||||
def get(self, url):
|
||||
if url not in self._loaded_publishers:
|
||||
p = publisher.get_publisher(
|
||||
self._conf, url,
|
||||
'ceilometer.%s.publisher' % self._purpose)
|
||||
self._loaded_publishers[url] = p
|
||||
return self._loaded_publishers[url]
|
||||
|
||||
|
||||
class PipelineManager(ConfigManagerBase):
|
||||
"""Pipeline Manager
|
||||
|
||||
@ -754,6 +772,7 @@ class PipelineManager(ConfigManagerBase):
|
||||
raise PipelineException("Both sources & sinks are required",
|
||||
cfg)
|
||||
LOG.info(_LI('detected decoupled pipeline config format'))
|
||||
publisher_manager = PublisherManager(self.conf, p_type['name'])
|
||||
|
||||
unique_names = set()
|
||||
sources = []
|
||||
@ -776,7 +795,8 @@ class PipelineManager(ConfigManagerBase):
|
||||
else:
|
||||
unique_names.add(name)
|
||||
sinks[s['name']] = p_type['sink'](self.conf, s,
|
||||
transformer_manager)
|
||||
transformer_manager,
|
||||
publisher_manager)
|
||||
unique_names.clear()
|
||||
|
||||
for source in sources:
|
||||
|
@ -22,7 +22,7 @@ import six
|
||||
from stevedore import driver
|
||||
|
||||
|
||||
def get_publisher(conf, url, namespace='ceilometer.publisher'):
|
||||
def get_publisher(conf, url, namespace):
|
||||
"""Get publisher driver and load it.
|
||||
|
||||
:param URL: URL for the publisher
|
||||
|
@ -226,7 +226,7 @@ ceilometer.transformer =
|
||||
aggregator = ceilometer.transformer.conversions:AggregatorTransformer
|
||||
arithmetic = ceilometer.transformer.arithmetic:ArithmeticTransformer
|
||||
|
||||
ceilometer.publisher =
|
||||
ceilometer.sample.publisher =
|
||||
test = ceilometer.publisher.test:TestPublisher
|
||||
notifier = ceilometer.publisher.messaging:SampleNotifierPublisher
|
||||
udp = ceilometer.publisher.udp:UDPPublisher
|
||||
|
Loading…
x
Reference in New Issue
Block a user