Sketch out a plugin system for saving metering data.

Change-Id: Ib91213ac78fd5e3baf7200849b4fa8554c652a99
This commit is contained in:
Doug Hellmann 2012-06-06 15:32:42 -04:00
parent 0767165ebc
commit 1bd1fd6846
11 changed files with 319 additions and 2 deletions

View File

@ -26,6 +26,7 @@ from ceilometer import log
from ceilometer import publish from ceilometer import publish
from ceilometer import rpc from ceilometer import rpc
from ceilometer.collector import dispatcher from ceilometer.collector import dispatcher
from ceilometer import storage
# FIXME(dhellmann): There must be another way to do this. # FIXME(dhellmann): There must be another way to do this.
# Import rabbit_notifier to register notification_topics flag # Import rabbit_notifier to register notification_topics flag
@ -43,6 +44,10 @@ class CollectorManager(manager.Manager):
def init_host(self): def init_host(self):
self.connection = rpc.Connection(flags.FLAGS) self.connection = rpc.Connection(flags.FLAGS)
storage.register_opts(cfg.CONF)
self.storage_engine = storage.get_engine(cfg.CONF)
self.storage_conn = self.storage_engine.get_connection(cfg.CONF)
self.compute_handler = dispatcher.NotificationDispatcher( self.compute_handler = dispatcher.NotificationDispatcher(
COMPUTE_COLLECTOR_NAMESPACE, COMPUTE_COLLECTOR_NAMESPACE,
self._publish_counter, self._publish_counter,
@ -79,3 +84,8 @@ class CollectorManager(manager.Manager):
data['event_type'], data['event_type'],
data['resource_id'], data['resource_id'],
data['counter_volume']) data['counter_volume'])
try:
self.storage_conn.record_metering_data(data)
except Exception as err:
LOG.error('Failed to record metering data: %s', err)
LOG.exception(err)

View File

@ -0,0 +1,63 @@
# -*- encoding: utf-8 -*-
#
# Copyright © 2012 New Dream Network, LLC (DreamHost)
#
# Author: Doug Hellmann <doug.hellmann@dreamhost.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Storage backend management
"""
import pkg_resources
from ceilometer import cfg
from ceilometer import log
LOG = log.getLogger(__name__)
STORAGE_ENGINE_NAMESPACE = 'ceilometer.storage'
STORAGE_OPTS = [
cfg.StrOpt('metering_storage_engine',
default='log',
help='The name of the storage engine to use',
),
]
def register_opts(conf):
"""Register any options for the storage system.
"""
conf.register_opts(STORAGE_OPTS)
p = get_engine(conf)
p.register_opts(conf)
def get_engine(conf):
"""Load the configured engine and return an instance.
"""
engine_name = conf.metering_storage_engine
for ep in pkg_resources.iter_entry_points(STORAGE_ENGINE_NAMESPACE,
engine_name):
try:
engine_class = ep.load()
engine = engine_class()
except Exception as err:
LOG.error('Failed to load storage engine %s: %s',
engine_name, err)
LOG.exception(err)
raise
LOG.info('Loaded %s storage engine', engine_name)
return engine
else:
raise RuntimeError('No %r storage engine found' % engine_name)

View File

@ -0,0 +1,64 @@
# -*- encoding: utf-8 -*-
#
# Copyright © 2012 New Dream Network, LLC (DreamHost)
#
# Author: Doug Hellmann <doug.hellmann@dreamhost.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Base classes for storage engines
"""
import abc
from ceilometer import log
LOG = log.getLogger(__name__)
class StorageEngine(object):
"""Base class for storage engines.
"""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def register_opts(self, conf):
"""Register any configuration options used by this engine.
"""
@abc.abstractmethod
def get_connection(self, conf):
"""Return a Connection instance based on the configuration settings.
"""
class Connection(object):
"""Base class for storage system connections.
"""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def __init__(self, conf):
"""Constructor"""
@abc.abstractmethod
def record_metering_data(self, data):
"""Write the data to the backend storage system.
:param data: a dictionary such as returned by
ceilometer.meter.meter_message_from_counter
"""
# FIXME(dhellmann): We will eventually need to add query methods
# for the API server to use, too.

View File

@ -0,0 +1,57 @@
# -*- encoding: utf-8 -*-
#
# Copyright © 2012 New Dream Network, LLC (DreamHost)
#
# Author: Doug Hellmann <doug.hellmann@dreamhost.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Simple logging storage backend.
"""
from ceilometer import log
from ceilometer.storage import base
LOG = log.getLogger(__name__)
class LogStorage(base.StorageEngine):
"""Log the data
"""
def register_opts(self, conf):
"""Register any configuration options used by this engine.
"""
def get_connection(self, conf):
"""Return a Connection instance based on the configuration settings.
"""
return Connection(conf)
class Connection(base.Connection):
"""Base class for storage system connections.
"""
def __init__(self, conf):
return
def record_metering_data(self, data):
"""Write the data to the backend storage system.
:param data: a dictionary such as returned by
ceilometer.meter.meter_message_from_counter
"""
LOG.info('metering data %s for %s: %s',
data['event_type'],
data['resource_id'],
data['counter_volume'])

View File

@ -41,5 +41,8 @@ setuptools.setup(
'network_floatingip' 'network_floatingip'
'= ceilometer.compute.network:FloatingIPPollster', '= ceilometer.compute.network:FloatingIPPollster',
], ],
'ceilometer.storage': [
'log = ceilometer.storage.impl_log:LogStorage',
],
}, },
) )

View File

View File

@ -0,0 +1,40 @@
# -*- encoding: utf-8 -*-
#
# Copyright © 2012 New Dream Network, LLC (DreamHost)
#
# Author: Doug Hellmann <doug.hellmann@dreamhost.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Tests for ceilometer/storage/
"""
import mox
from ceilometer import storage
from ceilometer.storage import impl_log
def test_get_engine():
conf = mox.Mox().CreateMockAnything()
conf.metering_storage_engine = 'log'
engine = storage.get_engine(conf)
assert isinstance(engine, impl_log.LogStorage)
def test_get_engine_no_such_engine():
conf = mox.Mox().CreateMockAnything()
conf.metering_storage_engine = 'no-such-engine'
try:
storage.get_engine(conf)
except RuntimeError as err:
assert 'no-such-engine' in unicode(err)

View File

@ -0,0 +1,34 @@
# -*- encoding: utf-8 -*-
#
# Copyright © 2012 New Dream Network, LLC (DreamHost)
#
# Author: Doug Hellmann <doug.hellmann@dreamhost.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Tests for ceilometer/storage/impl_log.py
"""
import mox
from ceilometer.storage import impl_log
def test_get_connection():
conf = mox.Mox().CreateMockAnything()
conf.metering_storage_engine = 'log'
log_stg = impl_log.LogStorage()
conn = log_stg.get_connection(conf)
conn.record_metering_data({'event_type': 'test',
'resource_id': __name__,
'counter_volume': 1,
})

View File

@ -0,0 +1,46 @@
# -*- encoding: utf-8 -*-
#
# Copyright © 2012 New Dream Network, LLC (DreamHost)
#
# Author: Doug Hellmann <doug.hellmann@dreamhost.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Tests for ceilometer/storage/
"""
from nova import flags
from nova import test
from nova import rpc
from ceilometer import cfg
from ceilometer import storage
from ceilometer.storage import base
# (dhellmann): This is needed to get the fake_rabbit config value set
# up for the test base class.
cfg.CONF.register_opts(rpc.rpc_opts)
class RegisterOpts(test.TestCase):
def faux_get_engine(self, conf):
return self._faux_engine
def test_register_opts(self):
self.stubs.Set(storage, 'get_engine', self.faux_get_engine)
flags.FLAGS.metering_storage_engine = 'log'
self._faux_engine = self.mox.CreateMock(base.StorageEngine)
self._faux_engine.register_opts(flags.FLAGS)
self.mox.ReplayAll()
storage.register_opts(flags.FLAGS)