Factorize yaml loading of declarative stuffs

This change continues to remove code duplication by factorizing the
yaml file loading.

This also homogenizes the error handling.
The precision of error message will be the same for all.
When the yaml file is incorrectly formatted, we now raise an exception,
instead of sometimes raising and sometimes loading an empty definitions,
that can generated an unwanted behavior of collector and notification agents.

Change-Id: I9ae8f519472f1ae2a14e61028a427ba6f3b0d1f3
This commit is contained in:
Mehdi Abaakouk 2015-09-17 09:34:07 +02:00
parent e08188f5b8
commit 72edfb6093
6 changed files with 122 additions and 181 deletions

View File

@ -11,11 +11,17 @@
# License for the specific language governing permissions and limitations
# under the License.
import os
from jsonpath_rw_ext import parser
from oslo_config import cfg
from oslo_log import log
import six
import yaml
from ceilometer.i18n import _
from ceilometer.i18n import _, _LI
LOG = log.getLogger(__name__)
class DefinitionException(Exception):
@ -116,3 +122,45 @@ class Definition(object):
return values
else:
return values[0] if values else None
def load_definitions(defaults, config_file, fallback_file=None):
"""Setup a definitions from yaml config file."""
if not os.path.exists(config_file):
config_file = cfg.CONF.find_file(config_file)
if not config_file and fallback_file is not None:
LOG.debug("No Definitions configuration file found!"
"Using default config.")
config_file = fallback_file
if config_file is not None:
LOG.debug("Loading definitions configuration file: %s", config_file)
with open(config_file) as cf:
config = cf.read()
try:
definition_cfg = yaml.safe_load(config)
except yaml.YAMLError as err:
if hasattr(err, 'problem_mark'):
mark = err.problem_mark
errmsg = (_("Invalid YAML syntax in Definitions file "
"%(file)s at line: %(line)s, column: %(column)s.")
% dict(file=config_file,
line=mark.line + 1,
column=mark.column + 1))
else:
errmsg = (_("YAML error reading Definitions file "
"%(file)s")
% dict(file=config_file))
LOG.error(errmsg)
raise
else:
LOG.debug("No Definitions configuration file found!"
"Using default config.")
definition_cfg = defaults
LOG.info(_LI("Definitions: %s"), definition_cfg)
return definition_cfg

View File

@ -22,7 +22,6 @@ from oslo_config import cfg
from oslo_log import log
import six
from stevedore import extension
import yaml
from ceilometer import declarative
from ceilometer import dispatcher
@ -149,16 +148,8 @@ class GnocchiDispatcher(dispatcher.Base):
def _load_resources_definitions(cls, conf):
plugin_manager = extension.ExtensionManager(
namespace='ceilometer.event.trait_plugin')
res_def_file = cls._get_config_file(
conf, conf.dispatcher_gnocchi.resources_definition_file)
data = {}
if res_def_file is not None:
with open(res_def_file) as data_file:
try:
data = yaml.safe_load(data_file)
except ValueError:
data = {}
data = declarative.load_definitions(
{}, conf.dispatcher_gnocchi.resources_definition_file)
return [ResourcesDefinition(r, conf.dispatcher_gnocchi.archive_policy,
plugin_manager)
for r in data.get('resources', [])]

View File

@ -14,18 +14,16 @@
# under the License.
import fnmatch
import os
from debtcollector import moves
from oslo_config import cfg
from oslo_log import log
from oslo_utils import timeutils
import six
import yaml
from ceilometer import declarative
from ceilometer.event.storage import models
from ceilometer.i18n import _, _LI
from ceilometer.i18n import _
OPTS = [
cfg.StrOpt('definitions_cfg_file',
@ -296,47 +294,9 @@ class NotificationEventsConverter(object):
return edef.to_event(notification_body)
def get_config_file():
config_file = cfg.CONF.event.definitions_cfg_file
if not os.path.exists(config_file):
config_file = cfg.CONF.find_file(config_file)
return config_file
def setup_events(trait_plugin_mgr):
"""Setup the event definitions from yaml config file."""
config_file = get_config_file()
if config_file is not None:
LOG.debug("Event Definitions configuration file: %s", config_file)
with open(config_file) as cf:
config = cf.read()
try:
events_config = yaml.safe_load(config)
except yaml.YAMLError as err:
if hasattr(err, 'problem_mark'):
mark = err.problem_mark
errmsg = (_("Invalid YAML syntax in Event Definitions file "
"%(file)s at line: %(line)s, column: %(column)s.")
% dict(file=config_file,
line=mark.line + 1,
column=mark.column + 1))
else:
errmsg = (_("YAML error reading Event Definitions file "
"%(file)s")
% dict(file=config_file))
LOG.error(errmsg)
raise
else:
LOG.debug("No Event Definitions configuration file found!"
" Using default config.")
events_config = []
LOG.info(_LI("Event Definitions: %s"), events_config)
allow_drop = cfg.CONF.event.drop_unmatched_notifications
return NotificationEventsConverter(events_config,
trait_plugin_mgr,
add_catchall=not allow_drop)
return NotificationEventsConverter(
declarative.load_definitions([], cfg.CONF.event.definitions_cfg_file),
trait_plugin_mgr,
add_catchall=not cfg.CONF.event.drop_unmatched_notifications)

View File

@ -13,10 +13,8 @@
import fnmatch
import itertools
import os
import pkg_resources
import six
import yaml
from debtcollector import moves
from oslo_config import cfg
@ -26,7 +24,7 @@ from stevedore import extension
from ceilometer.agent import plugin_base
from ceilometer import declarative
from ceilometer.i18n import _LE, _LI
from ceilometer.i18n import _LE
from ceilometer import sample
OPTS = [
@ -168,79 +166,35 @@ class MeterDefinition(object):
yield sample
def get_config_file():
config_file = cfg.CONF.meter.meter_definitions_cfg_file
if not os.path.exists(config_file):
config_file = cfg.CONF.find_file(config_file)
if not config_file:
config_file = pkg_resources.resource_filename(
__name__, "data/meters.yaml")
return config_file
def setup_meters_config():
"""Setup the meters definitions from yaml config file."""
config_file = get_config_file()
if config_file is not None:
LOG.debug(_LE("Meter Definitions configuration file: %s"), config_file)
with open(config_file) as cf:
config = cf.read()
try:
meters_config = yaml.safe_load(config)
except yaml.YAMLError as err:
if hasattr(err, 'problem_mark'):
mark = err.problem_mark
errmsg = (_LE("Invalid YAML syntax in Meter Definitions file "
"%(file)s at line: %(line)s, column: %(column)s.")
% dict(file=config_file,
line=mark.line + 1,
column=mark.column + 1))
else:
errmsg = (_LE("YAML error reading Meter Definitions file "
"%(file)s")
% dict(file=config_file))
LOG.error(errmsg)
raise
else:
LOG.debug(_LE("No Meter Definitions configuration file found!"
" Using default config."))
meters_config = {}
LOG.info(_LI("Meter Definitions: %s"), meters_config)
return meters_config
def load_definitions(config_def):
if not config_def:
return []
plugin_manager = extension.ExtensionManager(
namespace='ceilometer.event.trait_plugin')
meter_defs = []
for event_def in reversed(config_def['metric']):
try:
if (event_def['volume'] != 1 or
not cfg.CONF.notification.disable_non_metric_meters):
meter_defs.append(MeterDefinition(event_def, plugin_manager))
except declarative.DefinitionException as me:
errmsg = (_LE("Error loading meter definition : %(err)s")
% dict(err=six.text_type(me)))
LOG.error(errmsg)
return meter_defs
class ProcessMeterNotifications(plugin_base.NotificationBase):
event_types = []
def __init__(self, manager):
super(ProcessMeterNotifications, self).__init__(manager)
self.definitions = load_definitions(setup_meters_config())
self.definitions = self._load_definitions()
@staticmethod
def _load_definitions():
plugin_manager = extension.ExtensionManager(
namespace='ceilometer.event.trait_plugin')
meters_cfg = declarative.load_definitions(
{}, cfg.CONF.meter.meter_definitions_cfg_file,
pkg_resources.resource_filename(__name__, "data/meters.yaml"))
definitions = []
for meter_cfg in reversed(meters_cfg['metric']):
if (meter_cfg['volume'] != 1
or not cfg.CONF.notification.disable_non_metric_meters):
try:
md = MeterDefinition(meter_cfg, plugin_manager)
except declarative.DefinitionException as me:
errmsg = (_LE("Error loading meter definition : %(err)s")
% dict(err=six.text_type(me)))
LOG.error(errmsg)
else:
definitions.append(md)
return definitions
def get_targets(self, conf):
"""Return a sequence of oslo_messaging.Target

View File

@ -23,6 +23,7 @@ import six
from ceilometer import declarative
from ceilometer.event import converter
from ceilometer.event.storage import models
from ceilometer import service as ceilometer_service
from ceilometer.tests import base
@ -597,6 +598,7 @@ class TestNotificationConverter(ConverterBase):
def setUp(self):
super(TestNotificationConverter, self).setUp()
self.CONF = self.useFixture(fixture_config.Config()).conf
ceilometer_service.prepare_service(argv=[], config_files=[])
self.valid_event_def1 = [{
'event_type': 'compute.instance.create.*',
'traits': {
@ -760,9 +762,9 @@ class TestNotificationConverter(ConverterBase):
self.assertTrue(self._convert_message(c, 'info').raw)
self.assertFalse(self._convert_message(c, 'error').raw)
@mock.patch('ceilometer.event.converter.get_config_file',
mock.Mock(return_value=None))
def test_setup_events_default_config(self):
self.CONF.set_override('definitions_cfg_file',
'/not/existing/file', group='event')
self.CONF.set_override('drop_unmatched_notifications',
False, group='event')

View File

@ -14,14 +14,15 @@
"""
import copy
import mock
import os
import six
import yaml
from oslo_config import fixture as fixture_config
from oslo_utils import encodeutils
from oslo_utils import fileutils
from oslotest import mockpatch
import ceilometer
from ceilometer import declarative
from ceilometer.meter import notifications
from ceilometer import service as ceilometer_service
@ -264,22 +265,27 @@ class TestMeterProcessing(test.BaseTestCase):
self.handler = notifications.ProcessMeterNotifications(mock.Mock())
def test_fallback_meter_path(self):
self.useFixture(mockpatch.PatchObject(self.CONF,
'find_file', return_value=None))
fall_bak_path = notifications.get_config_file()
self.assertIn("meter/data/meters.yaml", fall_bak_path)
self.CONF.set_override('meter_definitions_cfg_file',
'/not/existing/path', group='meter')
with mock.patch('ceilometer.declarative.open',
mock.mock_open(read_data='---\nmetric: []'),
create=True) as mock_open:
self.handler._load_definitions()
if six.PY3:
path = os.path.dirname(ceilometer.__file__)
else:
path = "ceilometer"
mock_open.assert_called_with(path + "/meter/data/meters.yaml")
def __setup_meter_def_file(self, cfg):
def _load_meter_def_file(self, cfg):
if six.PY3:
cfg = cfg.encode('utf-8')
meter_cfg_file = fileutils.write_to_tempfile(content=cfg,
prefix="meters",
suffix="yaml")
self.CONF.set_override(
'meter_definitions_cfg_file',
meter_cfg_file, group='meter')
cfg = notifications.setup_meters_config()
return cfg
self.CONF.set_override('meter_definitions_cfg_file',
meter_cfg_file, group='meter')
self.handler.definitions = self.handler._load_definitions()
@mock.patch('ceilometer.meter.notifications.LOG')
def test_bad_meter_definition_skip(self, LOG):
@ -302,9 +308,8 @@ class TestMeterProcessing(test.BaseTestCase):
volume="$.payload.volume",
resource_id="$.payload.resource_id",
project_id="$.payload.project_id")]})
data = self.__setup_meter_def_file(cfg)
meter_loaded = notifications.load_definitions(data)
self.assertEqual(2, len(meter_loaded))
self._load_meter_def_file(cfg)
self.assertEqual(2, len(self.handler.definitions))
LOG.error.assert_called_with(
"Error loading meter definition : "
"Invalid type bad_type specified")
@ -318,8 +323,7 @@ class TestMeterProcessing(test.BaseTestCase):
volume="$.payload.volume",
resource_id="$.payload.resource_id",
project_id="$.payload.project_id")]})
self.handler.definitions = notifications.load_definitions(
self.__setup_meter_def_file(cfg))
self._load_meter_def_file(cfg)
c = list(self.handler.process_notification(NOTIFICATION))
self.assertEqual(1, len(c))
s1 = c[0].as_dict()
@ -344,8 +348,7 @@ class TestMeterProcessing(test.BaseTestCase):
volume="$.payload.volume",
resource_id="$.payload.resource_id",
project_id="$.payload.project_id")]})
self.handler.definitions = notifications.load_definitions(
self.__setup_meter_def_file(cfg))
self._load_meter_def_file(cfg)
c = list(self.handler.process_notification(NOTIFICATION))
self.assertEqual(2, len(c))
s1 = c[0].as_dict()
@ -362,8 +365,7 @@ class TestMeterProcessing(test.BaseTestCase):
volume="$.payload.volume",
resource_id="$.payload.resource_id",
project_id="$.payload.project_id")]})
self.handler.definitions = notifications.load_definitions(
self.__setup_meter_def_file(cfg))
self._load_meter_def_file(cfg)
c = list(self.handler.process_notification(NOTIFICATION))
self.assertEqual(0, len(c))
@ -376,8 +378,7 @@ class TestMeterProcessing(test.BaseTestCase):
volume="$.payload.volume",
resource_id="$.payload.resource_id",
project_id="$.payload.project_id")]})
self.handler.definitions = notifications.load_definitions(
self.__setup_meter_def_file(cfg))
self._load_meter_def_file(cfg)
c = list(self.handler.process_notification(NOTIFICATION))
self.assertEqual(1, len(c))
@ -393,8 +394,7 @@ class TestMeterProcessing(test.BaseTestCase):
resource_id="$.payload.target_id",
project_id="$.payload.initiator.project_id",
multi="name")]})
self.handler.definitions = notifications.load_definitions(
self.__setup_meter_def_file(cfg))
self._load_meter_def_file(cfg)
c = list(self.handler.process_notification(event))
self.assertEqual(1, len(c))
s1 = c[0].as_dict()
@ -413,8 +413,7 @@ class TestMeterProcessing(test.BaseTestCase):
project_id="$.payload.initiator.project_id",
multi="name",
timestamp='$.payload.eventTime')]})
self.handler.definitions = notifications.load_definitions(
self.__setup_meter_def_file(cfg))
self._load_meter_def_file(cfg)
c = list(self.handler.process_notification(event))
self.assertEqual(1, len(c))
s1 = c[0].as_dict()
@ -432,8 +431,7 @@ class TestMeterProcessing(test.BaseTestCase):
resource_id="'prefix-' + $.payload.nodename",
timestamp="$.payload.metrics"
"[?(@.name='cpu.frequency')].timestamp")]})
self.handler.definitions = notifications.load_definitions(
self.__setup_meter_def_file(cfg))
self._load_meter_def_file(cfg)
c = list(self.handler.process_notification(METRICS_UPDATE))
self.assertEqual(1, len(c))
s1 = c[0].as_dict()
@ -449,8 +447,7 @@ class TestMeterProcessing(test.BaseTestCase):
volume="$.payload.volume",
resource_id="$.payload.resource_id",
project_id="$.payload.project_id")]})
self.handler.definitions = notifications.load_definitions(
self.__setup_meter_def_file(cfg))
self._load_meter_def_file(cfg)
c = list(self.handler.process_notification(NOTIFICATION))
self.assertEqual(1, len(c))
s1 = c[0].as_dict()
@ -470,8 +467,7 @@ class TestMeterProcessing(test.BaseTestCase):
project_id="$.payload.project_id",
metadata={'proj': '$.payload.project_id',
'dict': '$.payload.resource_metadata'})]})
self.handler.definitions = notifications.load_definitions(
self.__setup_meter_def_file(cfg))
self._load_meter_def_file(cfg)
c = list(self.handler.process_notification(NOTIFICATION))
self.assertEqual(1, len(c))
s1 = c[0].as_dict()
@ -495,8 +491,7 @@ class TestMeterProcessing(test.BaseTestCase):
volume="$.payload.volume",
resource_id="$.payload.resource_id",
project_id="$.payload.project_id")]})
self.handler.definitions = notifications.load_definitions(
self.__setup_meter_def_file(cfg))
self._load_meter_def_file(cfg)
c = list(self.handler.process_notification(NOTIFICATION))
self.assertEqual(2, len(c))
@ -510,8 +505,7 @@ class TestMeterProcessing(test.BaseTestCase):
resource_id="$.payload.target_id",
project_id="$.payload.initiator.project_id",
lookup=["name", "volume", "unit"])]})
self.handler.definitions = notifications.load_definitions(
self.__setup_meter_def_file(cfg))
self._load_meter_def_file(cfg)
c = list(self.handler.process_notification(MIDDLEWARE_EVENT))
self.assertEqual(2, len(c))
s1 = c[0].as_dict()
@ -535,8 +529,7 @@ class TestMeterProcessing(test.BaseTestCase):
resource_id="$.payload.target_id",
project_id="$.payload.initiator.project_id",
lookup=["name", "unit"])]})
self.handler.definitions = notifications.load_definitions(
self.__setup_meter_def_file(cfg))
self._load_meter_def_file(cfg)
c = list(self.handler.process_notification(event))
self.assertEqual(1, len(c))
s1 = c[0].as_dict()
@ -556,8 +549,7 @@ class TestMeterProcessing(test.BaseTestCase):
resource_id="$.payload.target_id",
project_id="$.payload.initiator.project_id",
lookup="name")]})
self.handler.definitions = notifications.load_definitions(
self.__setup_meter_def_file(cfg))
self._load_meter_def_file(cfg)
c = list(self.handler.process_notification(event))
self.assertEqual(0, len(c))
@ -573,8 +565,7 @@ class TestMeterProcessing(test.BaseTestCase):
user_id="$.payload.[*].user_id",
lookup=['name', 'type', 'unit', 'volume',
'resource_id', 'project_id', 'user_id'])]})
self.handler.definitions = notifications.load_definitions(
self.__setup_meter_def_file(cfg))
self._load_meter_def_file(cfg)
c = list(self.handler.process_notification(FULL_MULTI_MSG))
self.assertEqual(2, len(c))
msg = FULL_MULTI_MSG['payload']
@ -602,8 +593,7 @@ class TestMeterProcessing(test.BaseTestCase):
resource_id="$.payload.target_id",
project_id="$.payload.initiator.project_id",
lookup=["name", "unit", "volume"])]})
self.handler.definitions = notifications.load_definitions(
self.__setup_meter_def_file(cfg))
self._load_meter_def_file(cfg)
c = list(self.handler.process_notification(event))
self.assertEqual(0, len(c))
LOG.warning.assert_called_with('Only 0 fetched meters contain '
@ -622,8 +612,7 @@ class TestMeterProcessing(test.BaseTestCase):
resource_id="$.payload.target_id",
project_id="$.payload.initiator.project_id",
lookup=["name", "unit", "volume"])]})
self.handler.definitions = notifications.load_definitions(
self.__setup_meter_def_file(cfg))
self._load_meter_def_file(cfg)
c = list(self.handler.process_notification(event))
self.assertEqual(0, len(c))
LOG.warning.assert_called_with('Only 1 fetched meters contain '
@ -640,8 +629,7 @@ class TestMeterProcessing(test.BaseTestCase):
" * 100",
resource_id="$.payload.host + '_'"
" + $.payload.nodename")]})
self.handler.definitions = notifications.load_definitions(
self.__setup_meter_def_file(cfg))
self._load_meter_def_file(cfg)
c = list(self.handler.process_notification(METRICS_UPDATE))
self.assertEqual(1, len(c))
s1 = c[0].as_dict()
@ -660,8 +648,7 @@ class TestMeterProcessing(test.BaseTestCase):
".value",
resource_id="$.payload.host + '_'"
" + $.payload.nodename")]})
self.handler.definitions = notifications.load_definitions(
self.__setup_meter_def_file(cfg))
self._load_meter_def_file(cfg)
c = list(self.handler.process_notification(METRICS_UPDATE))
self.assertEqual(1, len(c))
s1 = c[0].as_dict()
@ -679,8 +666,7 @@ class TestMeterProcessing(test.BaseTestCase):
volume="$.payload.metrics[?(@.name='cpu.frequency')]"
".value",
resource_id="'prefix-' + $.payload.nodename")]})
self.handler.definitions = notifications.load_definitions(
self.__setup_meter_def_file(cfg))
self._load_meter_def_file(cfg)
c = list(self.handler.process_notification(METRICS_UPDATE))
self.assertEqual(1, len(c))
s1 = c[0].as_dict()