pipeline: stop using global conf

Change-Id: Ieedc84ebe87dfb45941332db25e1fbb0739da455
This commit is contained in:
Mehdi Abaakouk 2016-10-10 23:40:56 +02:00
parent 32fc7ebd20
commit bf46cee8b6
12 changed files with 164 additions and 77 deletions

View File

@ -416,7 +416,7 @@ class AgentManager(service_base.PipelineBasedService):
def run(self):
super(AgentManager, self).run()
self.polling_manager = pipeline.setup_polling()
self.polling_manager = pipeline.setup_polling(self.conf)
self.join_partitioning_groups()
self.start_polling_tasks()
self.init_pipeline_refresh()

View File

@ -65,7 +65,7 @@ def send_sample():
help='Meter metadata.'),
])
service.prepare_service()
conf = service.prepare_service()
# Set up logging to use the console
console = logging.StreamHandler(sys.stderr)
@ -77,17 +77,17 @@ def send_sample():
root_logger.setLevel(logging.DEBUG)
pipeline_manager = pipeline.setup_pipeline(
extension.ExtensionManager('ceilometer.transformer'))
conf, extension.ExtensionManager('ceilometer.transformer'))
with pipeline_manager.publisher() as p:
p([sample.Sample(
name=cfg.CONF.sample_name,
type=cfg.CONF.sample_type,
unit=cfg.CONF.sample_unit,
volume=cfg.CONF.sample_volume,
user_id=cfg.CONF.sample_user,
project_id=cfg.CONF.sample_project,
resource_id=cfg.CONF.sample_resource,
timestamp=cfg.CONF.sample_timestamp,
resource_metadata=cfg.CONF.sample_metadata and eval(
cfg.CONF.sample_metadata))])
name=conf.sample_name,
type=conf.sample_type,
unit=conf.sample_unit,
volume=conf.sample_volume,
user_id=conf.sample_user,
project_id=conf.sample_project,
resource_id=conf.sample_resource,
timestamp=conf.sample_timestamp,
resource_metadata=conf.sample_metadata and eval(
conf.sample_metadata))])

View File

@ -120,7 +120,7 @@ class NotificationService(service_base.PipelineBasedService):
def _get_pipe_manager(self, transport, pipeline_manager):
if self.conf.notification.workload_partitioning:
pipe_manager = pipeline.SamplePipelineTransportManager()
pipe_manager = pipeline.SamplePipelineTransportManager(self.conf)
for pipe in pipeline_manager.pipelines:
key = pipeline.get_pipeline_grouping_key(pipe)
pipe_manager.add_transporter(
@ -133,7 +133,8 @@ class NotificationService(service_base.PipelineBasedService):
def _get_event_pipeline_manager(self, transport):
if self.conf.notification.workload_partitioning:
event_pipe_manager = pipeline.EventPipelineTransportManager()
event_pipe_manager = pipeline.EventPipelineTransportManager(
self.conf)
for pipe in self.event_pipeline_manager.pipelines:
event_pipe_manager.add_transporter(
(pipe.source.support_event, ['event_type'],
@ -156,9 +157,9 @@ class NotificationService(service_base.PipelineBasedService):
# hence only one listener is required
self.pipeline_listener = None
self.pipeline_manager = pipeline.setup_pipeline()
self.pipeline_manager = pipeline.setup_pipeline(self.conf)
self.event_pipeline_manager = pipeline.setup_event_pipeline()
self.event_pipeline_manager = pipeline.setup_event_pipeline(self.conf)
self.transport = messaging.get_transport(self.conf)

View File

@ -82,6 +82,7 @@ class PipelineEndpoint(object):
self.filter_rule = oslo_messaging.NotificationFilter(
publisher_id=pipeline.name)
self.publish_context = PublishContext([pipeline])
self.conf = pipeline.conf
@abc.abstractmethod
def sample(self, messages):
@ -103,7 +104,7 @@ class SamplePipelineEndpoint(PipelineEndpoint):
resource_metadata=s['resource_metadata'],
source=s.get('source'))
for s in samples if publisher_utils.verify_signature(
s, cfg.CONF.publisher.telemetry_secret)
s, self.conf.publisher.telemetry_secret)
]
with self.publish_context as p:
p(sorted(samples, key=methodcaller('get_iso_timestamp')))
@ -123,20 +124,21 @@ class EventPipelineEndpoint(PipelineEndpoint):
for name, dtype, value in ev['traits']],
raw=ev.get('raw', {}))
for ev in events if publisher_utils.verify_signature(
ev, cfg.CONF.publisher.telemetry_secret)
ev, self.conf.publisher.telemetry_secret)
]
try:
with self.publish_context as p:
p(events)
except Exception:
if not cfg.CONF.notification.ack_on_event_error:
if not self.conf.notification.ack_on_event_error:
return oslo_messaging.NotificationResult.REQUEUE
raise
return oslo_messaging.NotificationResult.HANDLED
class _PipelineTransportManager(object):
def __init__(self):
def __init__(self, conf):
self.conf = conf
self.transporters = []
@staticmethod
@ -186,20 +188,18 @@ class SamplePipelineTransportManager(_PipelineTransportManager):
filter_attr = 'counter_name'
event_type = 'ceilometer.pipeline'
@staticmethod
def serializer(data):
def serializer(self, data):
return publisher_utils.meter_message_from_counter(
data, cfg.CONF.publisher.telemetry_secret)
data, self.conf.publisher.telemetry_secret)
class EventPipelineTransportManager(_PipelineTransportManager):
filter_attr = 'event_type'
event_type = 'pipeline.event'
@staticmethod
def serializer(data):
def serializer(self, data):
return publisher_utils.message_from_event(
data, cfg.CONF.publisher.telemetry_secret)
data, self.conf.publisher.telemetry_secret)
class PublishContext(object):
@ -517,7 +517,8 @@ class SampleSink(Sink):
class Pipeline(object):
"""Represents a coupling between a sink and a corresponding source."""
def __init__(self, source, sink):
def __init__(self, conf, source, sink):
self.conf = conf
self.source = source
self.sink = sink
self.name = str(self)
@ -623,7 +624,8 @@ EVENT_TYPE = {'pipeline': EventPipeline,
class ConfigManagerBase(object):
"""Base class for managing configuration file refresh"""
def __init__(self):
def __init__(self, conf):
self.conf = conf
self.cfg_loc = None
def load_config(self, cfg_info):
@ -632,7 +634,7 @@ class ConfigManagerBase(object):
conf = cfg_info
else:
if not os.path.exists(cfg_info):
cfg_info = cfg.CONF.find_file(cfg_info)
cfg_info = self.conf.find_file(cfg_info)
with open(cfg_info) as fap:
data = fap.read()
@ -682,7 +684,8 @@ class PipelineManager(ConfigManagerBase):
"""
def __init__(self, cfg_info, transformer_manager, p_type=SAMPLE_TYPE):
def __init__(self, conf, cfg_info, transformer_manager,
p_type=SAMPLE_TYPE):
"""Setup the pipelines according to config.
The configuration is supported as follows:
@ -749,7 +752,7 @@ class PipelineManager(ConfigManagerBase):
Publisher's name is plugin name in setup.cfg
"""
super(PipelineManager, self).__init__()
super(PipelineManager, self).__init__(conf)
cfg = self.load_config(cfg_info)
self.pipelines = []
if not ('sources' in cfg and 'sinks' in cfg):
@ -783,7 +786,7 @@ class PipelineManager(ConfigManagerBase):
for source in sources:
source.check_sinks(sinks)
for target in source.sinks:
pipe = p_type['pipeline'](source, sinks[target])
pipe = p_type['pipeline'](self.conf, source, sinks[target])
if pipe.name in unique_names:
raise PipelineException(
"Duplicate pipeline name: %s. Ensure pipeline"
@ -808,12 +811,12 @@ class PollingManager(ConfigManagerBase):
Polling manager sets up polling according to config file.
"""
def __init__(self, cfg_info):
def __init__(self, conf, cfg_info):
"""Setup the polling according to config.
The configuration is the sources half of the Pipeline Config.
"""
super(PollingManager, self).__init__()
super(PollingManager, self).__init__(conf)
cfg = self.load_config(cfg_info)
self.sources = []
if not ('sources' in cfg and 'sinks' in cfg):
@ -833,26 +836,26 @@ class PollingManager(ConfigManagerBase):
unique_names.clear()
def setup_event_pipeline(transformer_manager=None):
def setup_event_pipeline(conf, transformer_manager=None):
"""Setup event pipeline manager according to yaml config file."""
default = extension.ExtensionManager('ceilometer.transformer')
cfg_file = cfg.CONF.event_pipeline_cfg_file
return PipelineManager(cfg_file, transformer_manager or default,
cfg_file = conf.event_pipeline_cfg_file
return PipelineManager(conf, cfg_file, transformer_manager or default,
EVENT_TYPE)
def setup_pipeline(transformer_manager=None):
def setup_pipeline(conf, transformer_manager=None):
"""Setup pipeline manager according to yaml config file."""
default = extension.ExtensionManager('ceilometer.transformer')
cfg_file = cfg.CONF.pipeline_cfg_file
return PipelineManager(cfg_file, transformer_manager or default,
cfg_file = conf.pipeline_cfg_file
return PipelineManager(conf, cfg_file, transformer_manager or default,
SAMPLE_TYPE)
def setup_polling():
def setup_polling(conf):
"""Setup polling manager according to yaml config file."""
cfg_file = cfg.CONF.pipeline_cfg_file
return PollingManager(cfg_file)
cfg_file = conf.pipeline_cfg_file
return PollingManager(conf, cfg_file)
def get_pipeline_grouping_key(pipe):

View File

@ -62,12 +62,13 @@ cfg.CONF.register_opt(COLL_OPT, 'collector')
keystone_client.register_keystoneauth_opts(cfg.CONF)
def prepare_service(argv=None, config_files=None):
def prepare_service(argv=None, config_files=None, conf=None):
if argv is None:
argv = sys.argv
# FIXME(sileht): Use ConfigOpts() instead
conf = cfg.CONF
if conf is None:
conf = cfg.CONF
oslo_i18n.enable_lazy()
log.register_options(conf)

View File

@ -76,10 +76,12 @@ class PipelineBasedService(cotyledon.Service):
'new': pipeline_hash})
# Pipeline in the notification agent.
if hasattr(self, 'pipeline_manager'):
self.pipeline_manager = pipeline.setup_pipeline()
self.pipeline_manager = pipeline.setup_pipeline(
self.conf)
# Polling in the polling agent.
elif hasattr(self, 'polling_manager'):
self.polling_manager = pipeline.setup_polling()
self.polling_manager = pipeline.setup_polling(
self.conf)
self.pipeline_validated = True
except Exception as err:
LOG.exception(_LE('Unable to load changed pipeline: %s')
@ -96,8 +98,8 @@ class PipelineBasedService(cotyledon.Service):
"old hash: %(old)s, new hash: %(new)s",
{'old': manager.cfg_hash,
'new': ev_pipeline_hash})
self.event_pipeline_manager = (pipeline.
setup_event_pipeline())
self.event_pipeline_manager = (
pipeline. setup_event_pipeline(self.conf))
self.event_pipeline_validated = True
except Exception as err:
LOG.exception(_LE('Unable to load changed event pipeline:'

View File

@ -22,6 +22,7 @@ import tempfile
import traceback
import mock
from oslo_config import fixture as fixture_config
from oslo_utils import timeutils
from oslotest import base
from oslotest import mockpatch
@ -137,6 +138,7 @@ class BasePipelineTestCase(base.BaseTestCase):
def setUp(self):
super(BasePipelineTestCase, self).setUp()
self.CONF = self.useFixture(fixture_config.Config()).conf
self.test_counter = sample.Sample(
name='a',
@ -204,6 +206,7 @@ class BasePipelineTestCase(base.BaseTestCase):
def _exception_create_pipelinemanager(self):
self.assertRaises(pipeline.PipelineException,
pipeline.PipelineManager,
self.CONF,
self.cfg2file(self.pipeline_cfg),
self.transformer_manager)
@ -213,7 +216,8 @@ class BasePipelineTestCase(base.BaseTestCase):
def test_no_transformers(self):
self._unset_pipeline_cfg('transformers')
pipeline.PipelineManager(self.cfg2file(self.pipeline_cfg),
pipeline.PipelineManager(self.CONF,
self.cfg2file(self.pipeline_cfg),
self.transformer_manager)
def test_no_name(self):
@ -223,6 +227,7 @@ class BasePipelineTestCase(base.BaseTestCase):
def test_no_interval(self):
self._unset_pipeline_cfg('interval')
pipeline_manager = pipeline.PipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg), self.transformer_manager)
pipe = pipeline_manager.pipelines[0]
self.assertEqual(600, pipe.get_interval())
@ -269,12 +274,14 @@ class BasePipelineTestCase(base.BaseTestCase):
def test_get_interval(self):
pipeline_manager = pipeline.PipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg), self.transformer_manager)
pipe = pipeline_manager.pipelines[0]
self.assertEqual(5, pipe.get_interval())
def test_publisher_transformer_invoked(self):
pipeline_manager = pipeline.PipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg), self.transformer_manager)
with pipeline_manager.publisher() as p:
p([self.test_counter])
@ -290,6 +297,7 @@ class BasePipelineTestCase(base.BaseTestCase):
counter_cfg = ['a', 'b']
self._set_pipeline_cfg('counters', counter_cfg)
pipeline_manager = pipeline.PipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg), self.transformer_manager)
with pipeline_manager.publisher() as p:
@ -322,6 +330,7 @@ class BasePipelineTestCase(base.BaseTestCase):
def test_none_volume_counter(self, LOG):
self._set_pipeline_cfg('counters', ['empty_volume'])
pipeline_manager = pipeline.PipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg), self.transformer_manager)
publisher = pipeline_manager.pipelines[0].publishers[0]
@ -355,6 +364,7 @@ class BasePipelineTestCase(base.BaseTestCase):
def test_fake_volume_counter(self, LOG):
self._set_pipeline_cfg('counters', ['fake_volume'])
pipeline_manager = pipeline.PipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg), self.transformer_manager)
publisher = pipeline_manager.pipelines[0].publishers[0]
@ -388,6 +398,7 @@ class BasePipelineTestCase(base.BaseTestCase):
counter_cfg = ['nomatch']
self._set_pipeline_cfg('counters', counter_cfg)
pipeline_manager = pipeline.PipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg), self.transformer_manager)
with pipeline_manager.publisher() as p:
p([self.test_counter])
@ -400,6 +411,7 @@ class BasePipelineTestCase(base.BaseTestCase):
counter_cfg = ['*']
self._set_pipeline_cfg('counters', counter_cfg)
pipeline_manager = pipeline.PipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg), self.transformer_manager)
with pipeline_manager.publisher() as p:
p([self.test_counter])
@ -413,6 +425,7 @@ class BasePipelineTestCase(base.BaseTestCase):
counter_cfg = ['*', '!a']
self._set_pipeline_cfg('counters', counter_cfg)
pipeline_manager = pipeline.PipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg), self.transformer_manager)
self.assertFalse(pipeline_manager.pipelines[0].support_meter('a'))
@ -420,6 +433,7 @@ class BasePipelineTestCase(base.BaseTestCase):
counter_cfg = ['*', '!b']
self._set_pipeline_cfg('counters', counter_cfg)
pipeline_manager = pipeline.PipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg), self.transformer_manager)
with pipeline_manager.publisher() as p:
p([self.test_counter])
@ -432,6 +446,7 @@ class BasePipelineTestCase(base.BaseTestCase):
counter_cfg = ['!b', '!c']
self._set_pipeline_cfg('counters', counter_cfg)
pipeline_manager = pipeline.PipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg), self.transformer_manager)
with pipeline_manager.publisher() as p:
p([self.test_counter])
@ -447,6 +462,7 @@ class BasePipelineTestCase(base.BaseTestCase):
counter_cfg = ['!a', '!c']
self._set_pipeline_cfg('counters', counter_cfg)
pipeline_manager = pipeline.PipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg), self.transformer_manager)
self.assertFalse(pipeline_manager.pipelines[0].support_meter('a'))
self.assertTrue(pipeline_manager.pipelines[0].support_meter('b'))
@ -456,6 +472,7 @@ class BasePipelineTestCase(base.BaseTestCase):
counter_cfg = ['*', '!disk.*']
self._set_pipeline_cfg('counters', counter_cfg)
pipeline_manager = pipeline.PipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg), self.transformer_manager)
self.assertFalse(pipeline_manager.pipelines[0].
support_meter('disk.read.bytes'))
@ -465,6 +482,7 @@ class BasePipelineTestCase(base.BaseTestCase):
counter_cfg = ['cpu', 'disk.*']
self._set_pipeline_cfg('counters', counter_cfg)
pipeline_manager = pipeline.PipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg), self.transformer_manager)
self.assertTrue(pipeline_manager.pipelines[0].
support_meter('disk.read.bytes'))
@ -476,6 +494,7 @@ class BasePipelineTestCase(base.BaseTestCase):
counter_cfg = ['!cpu', '!disk.*']
self._set_pipeline_cfg('counters', counter_cfg)
pipeline_manager = pipeline.PipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg), self.transformer_manager)
self.assertFalse(pipeline_manager.pipelines[0].
support_meter('disk.read.bytes'))
@ -487,6 +506,7 @@ class BasePipelineTestCase(base.BaseTestCase):
self._augment_pipeline_cfg()
pipeline_manager = pipeline.PipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg), self.transformer_manager)
with pipeline_manager.publisher() as p:
p([self.test_counter])
@ -524,6 +544,7 @@ class BasePipelineTestCase(base.BaseTestCase):
self._reraise_exception = False
self._break_pipeline_cfg()
pipeline_manager = pipeline.PipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg), self.transformer_manager)
with pipeline_manager.publisher() as p:
@ -557,6 +578,7 @@ class BasePipelineTestCase(base.BaseTestCase):
def test_none_transformer_pipeline(self):
self._set_pipeline_cfg('transformers', None)
pipeline_manager = pipeline.PipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg), self.transformer_manager)
with pipeline_manager.publisher() as p:
p([self.test_counter])
@ -568,6 +590,7 @@ class BasePipelineTestCase(base.BaseTestCase):
def test_empty_transformer_pipeline(self):
self._set_pipeline_cfg('transformers', [])
pipeline_manager = pipeline.PipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg), self.transformer_manager)
with pipeline_manager.publisher() as p:
p([self.test_counter])
@ -589,6 +612,7 @@ class BasePipelineTestCase(base.BaseTestCase):
]
self._set_pipeline_cfg('transformers', transformer_cfg)
pipeline_manager = pipeline.PipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg), self.transformer_manager)
with pipeline_manager.publisher() as p:
p([self.test_counter])
@ -623,6 +647,7 @@ class BasePipelineTestCase(base.BaseTestCase):
]
self._set_pipeline_cfg('transformers', transformer_cfg)
pipeline_manager = pipeline.PipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg), self.transformer_manager)
with pipeline_manager.publisher() as p:
p([self.test_counter])
@ -661,6 +686,7 @@ class BasePipelineTestCase(base.BaseTestCase):
]
self._set_pipeline_cfg('transformers', transformer_cfg)
pipeline_manager = pipeline.PipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg), self.transformer_manager)
with pipeline_manager.publisher() as p:
p([self.test_counter])
@ -678,6 +704,7 @@ class BasePipelineTestCase(base.BaseTestCase):
def test_multiple_publisher(self):
self._set_pipeline_cfg('publishers', ['test://', 'new://'])
pipeline_manager = pipeline.PipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg), self.transformer_manager)
with pipeline_manager.publisher() as p:
p([self.test_counter])
@ -695,6 +722,7 @@ class BasePipelineTestCase(base.BaseTestCase):
self._reraise_exception = False
self._set_pipeline_cfg('publishers', ['except://', 'new://'])
pipeline_manager = pipeline.PipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg), self.transformer_manager)
with pipeline_manager.publisher() as p:
p([self.test_counter])
@ -707,6 +735,7 @@ class BasePipelineTestCase(base.BaseTestCase):
def test_multiple_counter_pipeline(self):
self._set_pipeline_cfg('counters', ['a', 'b'])
pipeline_manager = pipeline.PipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg), self.transformer_manager)
with pipeline_manager.publisher() as p:
p([self.test_counter,
@ -746,6 +775,7 @@ class BasePipelineTestCase(base.BaseTestCase):
]
self._extend_pipeline_cfg('transformers', extra_transformer_cfg)
pipeline_manager = pipeline.PipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg), self.transformer_manager)
pipe = pipeline_manager.pipelines[0]
@ -783,6 +813,7 @@ class BasePipelineTestCase(base.BaseTestCase):
self._extend_pipeline_cfg('transformers', extra_transformer_cfg)
self._set_pipeline_cfg('counters', ['a', 'b'])
pipeline_manager = pipeline.PipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg), self.transformer_manager)
with pipeline_manager.publisher() as p:
p([self.test_counter,
@ -817,6 +848,7 @@ class BasePipelineTestCase(base.BaseTestCase):
}]
self._extend_pipeline_cfg('transformers', extra_transformer_cfg)
pipeline_manager = pipeline.PipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg), self.transformer_manager)
pipe = pipeline_manager.pipelines[0]
@ -857,6 +889,7 @@ class BasePipelineTestCase(base.BaseTestCase):
),
]
pipeline_manager = pipeline.PipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg), self.transformer_manager)
pipe = pipeline_manager.pipelines[0]
@ -909,7 +942,8 @@ class BasePipelineTestCase(base.BaseTestCase):
resource_metadata={}
),
]
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
pipeline_manager = pipeline.PipelineManager(self.CONF,
self.pipeline_cfg,
self.transformer_manager)
pipe = pipeline_manager.pipelines[0]
@ -999,6 +1033,7 @@ class BasePipelineTestCase(base.BaseTestCase):
),
]
pipeline_manager = pipeline.PipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg), self.transformer_manager)
pipe = pipeline_manager.pipelines[0]
@ -1083,6 +1118,7 @@ class BasePipelineTestCase(base.BaseTestCase):
),
]
pipeline_manager = pipeline.PipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg), self.transformer_manager)
pipe = pipeline_manager.pipelines[0]
@ -1110,6 +1146,7 @@ class BasePipelineTestCase(base.BaseTestCase):
self._set_pipeline_cfg('transformers', transformer_cfg)
self._set_pipeline_cfg('counters', ['cpu'])
pipeline_manager = pipeline.PipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg), self.transformer_manager)
pipe = pipeline_manager.pipelines[0]
@ -1170,12 +1207,14 @@ class BasePipelineTestCase(base.BaseTestCase):
resources = ['test1://', 'test2://']
self._set_pipeline_cfg('resources', resources)
pipeline_manager = pipeline.PipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg), self.transformer_manager)
self.assertEqual(resources,
pipeline_manager.pipelines[0].resources)
def test_no_resources(self):
pipeline_manager = pipeline.PipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg), self.transformer_manager)
self.assertEqual(0, len(pipeline_manager.pipelines[0].resources))
@ -1244,6 +1283,7 @@ class BasePipelineTestCase(base.BaseTestCase):
self._set_pipeline_cfg('counters', ['disk.read.bytes',
'disk.write.requests'])
pipeline_manager = pipeline.PipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg), self.transformer_manager)
pipe = pipeline_manager.pipelines[0]
meters = ('disk.read.bytes', 'disk.write.requests')
@ -1327,7 +1367,8 @@ class BasePipelineTestCase(base.BaseTestCase):
resource_metadata={'version': '3.0'}
),
]
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
pipeline_manager = pipeline.PipelineManager(self.CONF,
self.pipeline_cfg,
self.transformer_manager)
pipe = pipeline_manager.pipelines[0]
@ -1363,6 +1404,7 @@ class BasePipelineTestCase(base.BaseTestCase):
resource_metadata={'version': '1.0'}
))
pipeline_manager = pipeline.PipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg), self.transformer_manager)
pipe = pipeline_manager.pipelines[0]
@ -1558,6 +1600,7 @@ class BasePipelineTestCase(base.BaseTestCase):
)
]
pipeline_manager = pipeline.PipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg), self.transformer_manager)
pipe = pipeline_manager.pipelines[0]
@ -1595,6 +1638,7 @@ class BasePipelineTestCase(base.BaseTestCase):
),
]
pipeline_manager = pipeline.PipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg), self.transformer_manager)
pipe = pipeline_manager.pipelines[0]
@ -1642,6 +1686,7 @@ class BasePipelineTestCase(base.BaseTestCase):
)
]
pipeline_manager = pipeline.PipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg), self.transformer_manager)
pipe = pipeline_manager.pipelines[0]
@ -1788,6 +1833,7 @@ class BasePipelineTestCase(base.BaseTestCase):
resource_metadata=s.get('metadata')
))
pipeline_manager = pipeline.PipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg), self.transformer_manager)
pipe = pipeline_manager.pipelines[0]
for s in counters:
@ -1907,6 +1953,7 @@ class BasePipelineTestCase(base.BaseTestCase):
resource_metadata=None
)
pipeline_manager = pipeline.PipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg), self.transformer_manager)
pipe = pipeline_manager.pipelines[0]
@ -1937,6 +1984,7 @@ class BasePipelineTestCase(base.BaseTestCase):
self._set_pipeline_cfg('transformers', transformer_cfg)
self._set_pipeline_cfg('counters', ['unrelated-sample'])
pipeline_manager = pipeline.PipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg), self.transformer_manager)
timeutils.advance_time_seconds(200)
pipe = pipeline_manager.pipelines[0]
@ -1957,6 +2005,7 @@ class BasePipelineTestCase(base.BaseTestCase):
self._set_pipeline_cfg('transformers', transformer_cfg)
self._set_pipeline_cfg('counters', ['cpu'])
pipeline_manager = pipeline.PipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg), self.transformer_manager)
pipe = pipeline_manager.pipelines[0]
@ -2127,6 +2176,7 @@ class BasePipelineTestCase(base.BaseTestCase):
]
self._set_pipeline_cfg('transformers', transformer_cfg)
pipeline_manager = pipeline.PipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg), self.transformer_manager)
self.assertEqual(set(['resource_id', 'counter_name']),
set(pipeline.get_pipeline_grouping_key(
@ -2145,6 +2195,7 @@ class BasePipelineTestCase(base.BaseTestCase):
]
self._set_pipeline_cfg('transformers', transformer_cfg)
pipeline_manager = pipeline.PipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg), self.transformer_manager)
self.assertEqual(['counter_name'],
pipeline.get_pipeline_grouping_key(

View File

@ -194,7 +194,8 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
params = []
def setup_polling(self):
self.mgr.polling_manager = pipeline.PollingManager(self.pipeline_cfg)
self.mgr.polling_manager = pipeline.PollingManager(self.CONF,
self.pipeline_cfg)
def create_extension_list(self):
return [extension.Extension('test',
@ -311,7 +312,7 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
self.CONF.set_override('heartbeat', 1.0, group='coordination')
self.mgr.partition_coordinator.heartbeat = mock.MagicMock()
self.mgr.run()
setup_polling.assert_called_once_with()
setup_polling.assert_called_once_with(self.CONF)
mpc.start.assert_called_once_with()
self.assertEqual(2, mpc.join_group.call_count)
self.mgr.setup_polling_tasks.assert_called_once_with()

View File

@ -265,6 +265,8 @@ class TestRunTasks(agentbase.BaseAgentManagerTestCase):
self.notified_samples.append(m)
def setUp(self):
self.conf = self.useFixture(fixture_config.Config()).conf
self.conf([])
self.notified_samples = []
self.notifier = mock.Mock()
self.notifier.sample.side_effect = self.fake_notifier_sample
@ -316,7 +318,8 @@ class TestRunTasks(agentbase.BaseAgentManagerTestCase):
'transformers': [],
'publishers': ["test"]}]
}
self.mgr.polling_manager = pipeline.PollingManager(self.pipeline_cfg)
self.mgr.polling_manager = pipeline.PollingManager(self.CONF,
self.pipeline_cfg)
polling_tasks = self.mgr.setup_polling_tasks()
self.mgr.interval_task(list(polling_tasks.values())[0])
self.assertFalse(self.PollsterKeystone.samples)
@ -364,7 +367,8 @@ class TestRunTasks(agentbase.BaseAgentManagerTestCase):
'transformers': [],
'publishers': ["test"]}]
}
self.mgr.polling_manager = pipeline.PollingManager(self.pipeline_cfg)
self.mgr.polling_manager = pipeline.PollingManager(self.CONF,
self.pipeline_cfg)
polling_tasks = self.mgr.setup_polling_tasks()
self.mgr.interval_task(list(polling_tasks.values())[0])
self.assertEqual(1, novalog.exception.call_count)
@ -385,7 +389,8 @@ class TestRunTasks(agentbase.BaseAgentManagerTestCase):
'transformers': [],
'publishers': ["test"]}]
}
self.mgr.polling_manager = pipeline.PollingManager(self.pipeline_cfg)
self.mgr.polling_manager = pipeline.PollingManager(self.CONF,
self.pipeline_cfg)
polling_task = list(self.mgr.setup_polling_tasks().values())[0]
pollster = list(polling_task.pollster_matches[source_name])[0]
@ -426,7 +431,8 @@ class TestRunTasks(agentbase.BaseAgentManagerTestCase):
'publishers': ["test"]}]
}
self.mgr.polling_manager = pipeline.PollingManager(pipeline_cfg)
self.mgr.polling_manager = pipeline.PollingManager(self.CONF,
pipeline_cfg)
polling_task = list(self.mgr.setup_polling_tasks().values())[0]
self.mgr.interval_task(polling_task)

View File

@ -108,7 +108,7 @@ class TestEventEndpoint(tests_base.BaseTestCase):
self.CONF.set_override('event_pipeline_cfg_file',
ev_pipeline_cfg_file)
ev_pipeline_mgr = pipeline.setup_event_pipeline()
ev_pipeline_mgr = pipeline.setup_event_pipeline(self.CONF)
return ev_pipeline_mgr
def _setup_endpoint(self, publishers):

View File

@ -137,6 +137,7 @@ class TestDecoupledPipeline(pipeline_base.BasePipelineTestCase):
})
self.pipeline_cfg['sources'][0]['sinks'].append('second_sink')
pipeline_manager = pipeline.PipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg), self.transformer_manager)
with pipeline_manager.publisher() as p:
p([self.test_counter])
@ -179,6 +180,7 @@ class TestDecoupledPipeline(pipeline_base.BasePipelineTestCase):
'sinks': ['test_sink']
})
pipeline_manager = pipeline.PipelineManager(
self.CONF,
self.cfg2file(self.pipeline_cfg), self.transformer_manager)
with pipeline_manager.publisher() as p:
p([self.test_counter])
@ -224,6 +226,7 @@ class TestDecoupledPipeline(pipeline_base.BasePipelineTestCase):
for s in pipeline_cfg['sinks']:
s['publishers'] = ['test://']
pipeline_manager = pipeline.PipelineManager(
self.CONF,
self.cfg2file(pipeline_cfg), self.transformer_manager)
pipe = pipeline_manager.pipelines[index]
self._do_test_rate_of_change_mapping(pipe, meters, units)
@ -277,6 +280,7 @@ class TestDecoupledPipeline(pipeline_base.BasePipelineTestCase):
})
self.assertRaises(pipeline.PipelineException,
pipeline.PipelineManager,
self.CONF,
self.cfg2file(self.pipeline_cfg),
self.transformer_manager)
@ -290,5 +294,6 @@ class TestDecoupledPipeline(pipeline_base.BasePipelineTestCase):
})
self.assertRaises(pipeline.PipelineException,
pipeline.PipelineManager,
self.CONF,
self.cfg2file(self.pipeline_cfg),
self.transformer_manager)

View File

@ -45,6 +45,9 @@ class EventPipelineTestCase(base.BaseTestCase):
def setUp(self):
super(EventPipelineTestCase, self).setUp()
self.CONF = self.useFixture(fixture_config.Config()).conf
self.CONF([])
self.p_type = pipeline.EVENT_TYPE
self.transformer_manager = None
@ -150,6 +153,7 @@ class EventPipelineTestCase(base.BaseTestCase):
def _exception_create_pipelinemanager(self):
self.assertRaises(pipeline.PipelineException,
pipeline.PipelineManager,
self.CONF,
self.pipeline_cfg,
self.transformer_manager,
self.p_type)
@ -163,7 +167,8 @@ class EventPipelineTestCase(base.BaseTestCase):
self._exception_create_pipelinemanager()
def test_name(self):
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
pipeline_manager = pipeline.PipelineManager(self.CONF,
self.pipeline_cfg,
self.transformer_manager,
self.p_type)
for pipe in pipeline_manager.pipelines:
@ -195,7 +200,8 @@ class EventPipelineTestCase(base.BaseTestCase):
def test_multiple_included_events(self):
event_cfg = ['a', 'b']
self._set_pipeline_cfg('events', event_cfg)
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
pipeline_manager = pipeline.PipelineManager(self.CONF,
self.pipeline_cfg,
self.transformer_manager,
self.p_type)
@ -215,7 +221,8 @@ class EventPipelineTestCase(base.BaseTestCase):
def test_event_non_match(self):
event_cfg = ['nomatch']
self._set_pipeline_cfg('events', event_cfg)
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
pipeline_manager = pipeline.PipelineManager(self.CONF,
self.pipeline_cfg,
self.transformer_manager,
self.p_type)
with pipeline_manager.publisher() as p:
@ -228,7 +235,8 @@ class EventPipelineTestCase(base.BaseTestCase):
def test_wildcard_event(self):
event_cfg = ['*']
self._set_pipeline_cfg('events', event_cfg)
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
pipeline_manager = pipeline.PipelineManager(self.CONF,
self.pipeline_cfg,
self.transformer_manager,
self.p_type)
with pipeline_manager.publisher() as p:
@ -241,7 +249,8 @@ class EventPipelineTestCase(base.BaseTestCase):
def test_wildcard_excluded_events(self):
event_cfg = ['*', '!a']
self._set_pipeline_cfg('events', event_cfg)
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
pipeline_manager = pipeline.PipelineManager(self.CONF,
self.pipeline_cfg,
self.transformer_manager,
self.p_type)
self.assertFalse(pipeline_manager.pipelines[0].support_event('a'))
@ -249,7 +258,8 @@ class EventPipelineTestCase(base.BaseTestCase):
def test_wildcard_excluded_events_not_excluded(self):
event_cfg = ['*', '!b']
self._set_pipeline_cfg('events', event_cfg)
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
pipeline_manager = pipeline.PipelineManager(self.CONF,
self.pipeline_cfg,
self.transformer_manager,
self.p_type)
with pipeline_manager.publisher() as p:
@ -261,7 +271,8 @@ class EventPipelineTestCase(base.BaseTestCase):
def test_all_excluded_events_not_excluded(self):
event_cfg = ['!b', '!c']
self._set_pipeline_cfg('events', event_cfg)
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
pipeline_manager = pipeline.PipelineManager(self.CONF,
self.pipeline_cfg,
self.transformer_manager,
self.p_type)
with pipeline_manager.publisher() as p:
@ -274,7 +285,8 @@ class EventPipelineTestCase(base.BaseTestCase):
def test_all_excluded_events_excluded(self):
event_cfg = ['!a', '!c']
self._set_pipeline_cfg('events', event_cfg)
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
pipeline_manager = pipeline.PipelineManager(self.CONF,
self.pipeline_cfg,
self.transformer_manager,
self.p_type)
self.assertFalse(pipeline_manager.pipelines[0].support_event('a'))
@ -284,7 +296,8 @@ class EventPipelineTestCase(base.BaseTestCase):
def test_wildcard_and_excluded_wildcard_events(self):
event_cfg = ['*', '!compute.*']
self._set_pipeline_cfg('events', event_cfg)
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
pipeline_manager = pipeline.PipelineManager(self.CONF,
self.pipeline_cfg,
self.transformer_manager,
self.p_type)
self.assertFalse(pipeline_manager.pipelines[0].
@ -295,7 +308,8 @@ class EventPipelineTestCase(base.BaseTestCase):
def test_included_event_and_wildcard_events(self):
event_cfg = ['compute.instance.create.start', 'identity.*']
self._set_pipeline_cfg('events', event_cfg)
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
pipeline_manager = pipeline.PipelineManager(self.CONF,
self.pipeline_cfg,
self.transformer_manager,
self.p_type)
self.assertTrue(pipeline_manager.pipelines[0].
@ -308,7 +322,8 @@ class EventPipelineTestCase(base.BaseTestCase):
def test_excluded_event_and_excluded_wildcard_events(self):
event_cfg = ['!compute.instance.create.start', '!identity.*']
self._set_pipeline_cfg('events', event_cfg)
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
pipeline_manager = pipeline.PipelineManager(self.CONF,
self.pipeline_cfg,
self.transformer_manager,
self.p_type)
self.assertFalse(pipeline_manager.pipelines[0].
@ -321,7 +336,8 @@ class EventPipelineTestCase(base.BaseTestCase):
def test_multiple_pipeline(self):
self._augment_pipeline_cfg()
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
pipeline_manager = pipeline.PipelineManager(self.CONF,
self.pipeline_cfg,
self.transformer_manager,
self.p_type)
with pipeline_manager.publisher() as p:
@ -338,7 +354,8 @@ class EventPipelineTestCase(base.BaseTestCase):
def test_multiple_publisher(self):
self._set_pipeline_cfg('publishers', ['test://', 'new://'])
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
pipeline_manager = pipeline.PipelineManager(self.CONF,
self.pipeline_cfg,
self.transformer_manager,
self.p_type)
@ -355,7 +372,8 @@ class EventPipelineTestCase(base.BaseTestCase):
def test_multiple_publisher_isolation(self):
self._reraise_exception = False
self._set_pipeline_cfg('publishers', ['except://', 'new://'])
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
pipeline_manager = pipeline.PipelineManager(self.CONF,
self.pipeline_cfg,
self.transformer_manager,
self.p_type)
with pipeline_manager.publisher() as p:
@ -370,8 +388,6 @@ class EventPipelineTestCase(base.BaseTestCase):
self._exception_create_pipelinemanager()
def test_event_pipeline_endpoint_requeue_on_failure(self):
self.CONF = self.useFixture(fixture_config.Config()).conf
self.CONF([])
self.CONF.set_override("ack_on_event_error", False,
group="notification")
@ -397,7 +413,8 @@ class EventPipelineTestCase(base.BaseTestCase):
'ceilometer.publisher.test.TestPublisher',
return_value=fake_publisher))
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
pipeline_manager = pipeline.PipelineManager(self.CONF,
self.pipeline_cfg,
self.transformer_manager,
self.p_type)
event_pipeline_endpoint = pipeline.EventPipelineEndpoint(