Replace the Gnocchi dispatcher by a publisher

This removes the deprecated Gnocchi dispatcher and replaces it with its
equivalent publisher.

Change-Id: Ie44baf20ccb8de5794f5f0c3d4717f7e56afa63b
Julien Danjou 2017-09-04 13:47:58 +02:00
parent bca9d45ea4
commit 83ffaffcb2
18 changed files with 267 additions and 321 deletions
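
In short, the options that used to live in the [dispatcher_gnocchi] section of ceilometer.conf now travel as query parameters on the gnocchi:// publisher URL in pipeline.yaml. A minimal, standalone sketch of that option-parsing pattern (the helper name and defaults are illustrative, not Ceilometer's exact code; the diff itself uses six.moves.urllib.parse for Python 2/3 compatibility)::

    from urllib import parse as urlparse

    def publisher_options(url, defaults):
        # Illustrative helper, not Ceilometer's code: query parameters
        # override defaults, and when a parameter is repeated the last
        # occurrence wins, mirroring the [-1] indexing in the publisher.
        parsed = urlparse.urlsplit(url)
        query = urlparse.parse_qs(parsed.query)
        return {name: query.get(name, [default])[-1]
                for name, default in defaults.items()}

    opts = publisher_options(
        "gnocchi://?archive_policy=low&filter_project=service",
        {"archive_policy": None,
         "filter_project": "gnocchi",
         "resources_definition_file": "gnocchi_resources.yaml"})
    print(opts["archive_policy"])   # low
    print(opts["filter_project"])   # service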

View File

@ -110,10 +110,6 @@ class V2Controller(object):
if pecan.request.cfg.api.gnocchi_is_enabled is not None:
self._gnocchi_is_enabled = (
pecan.request.cfg.api.gnocchi_is_enabled)
elif ("gnocchi" not in pecan.request.cfg.meter_dispatchers
or "database" in pecan.request.cfg.meter_dispatchers):
self._gnocchi_is_enabled = False
else:
try:
catalog = keystone_client.get_service_catalog(

View File

@ -1,34 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_config import cfg
dispatcher_opts = [
cfg.BoolOpt('filter_service_activity',
default=True,
help='Filter out samples generated by Gnocchi '
'service activity'),
cfg.StrOpt('filter_project',
default='gnocchi',
help='Gnocchi project used to filter out samples '
'generated by Gnocchi service activity'),
cfg.StrOpt('archive_policy',
help='The archive policy to use when the dispatcher '
'create a new metric.'),
cfg.StrOpt('resources_definition_file',
default='gnocchi_resources.yaml',
help=('The Yaml file that defines mapping between samples '
'and gnocchi resources/metrics')),
cfg.FloatOpt('request_timeout', default=6.05, min=0.0,
help='Number of seconds before request to gnocchi times out'),
]

View File

@ -21,12 +21,10 @@ from ceilometer import keystone_client
LOG = log.getLogger(__name__)
def get_gnocchiclient(conf, timeout_override=False):
group = conf.dispatcher_gnocchi.auth_section
timeout = (None if (not conf.dispatcher_gnocchi.request_timeout or
timeout_override)
else conf.dispatcher_gnocchi.request_timeout)
session = keystone_client.get_session(conf, group=group, timeout=timeout)
def get_gnocchiclient(conf, request_timeout=None):
group = conf.gnocchi.auth_section
session = keystone_client.get_session(conf, group=group,
timeout=request_timeout)
adapter = keystoneauth1.session.TCPKeepAliveAdapter(
pool_maxsize=conf.max_parallel_requests)
session.mount("http://", adapter)
@ -188,7 +186,7 @@ resources_update_operations = [
def upgrade_resource_types(conf):
gnocchi = get_gnocchiclient(conf, True)
gnocchi = get_gnocchiclient(conf)
for name, attributes in resources_initial.items():
try:
gnocchi.resource_type.get(name=name)

View File

@ -23,7 +23,7 @@ DEFAULT_GROUP = "service_credentials"
# List of group that can set auth_section to use a different
# credentials section
OVERRIDABLE_GROUPS = ['dispatcher_gnocchi', 'zaqar']
OVERRIDABLE_GROUPS = ['gnocchi', 'zaqar']
def get_session(conf, requests_session=None, group=None, timeout=None):

View File

@ -27,7 +27,6 @@ import ceilometer.compute.virt.libvirt.utils
import ceilometer.compute.virt.vmware.inspector
import ceilometer.compute.virt.xenapi.inspector
import ceilometer.dispatcher
import ceilometer.dispatcher.gnocchi_opts
import ceilometer.event.converter
import ceilometer.hardware.discovery
import ceilometer.hardware.pollsters.generic
@ -105,8 +104,28 @@ def list_opts():
'membership has changed'),
]),
('database', ceilometer.storage.OPTS),
('dispatcher_gnocchi',
ceilometer.dispatcher.gnocchi_opts.dispatcher_opts),
('dispatcher_gnocchi', (
cfg.StrOpt(
'filter_project',
deprecated_for_removal=True,
default='gnocchi',
help='Gnocchi project used to filter out samples '
'generated by Gnocchi service activity'),
cfg.StrOpt(
'archive_policy',
deprecated_for_removal=True,
help='The archive policy to use when the dispatcher '
'create a new metric.'),
cfg.StrOpt(
'resources_definition_file',
deprecated_for_removal=True,
default='gnocchi_resources.yaml',
help=('The Yaml file that defines mapping between samples '
'and gnocchi resources/metrics')),
cfg.FloatOpt(
'request_timeout', default=6.05, min=0.0,
deprecated_for_removal=True,
help='Number of seconds before request to gnocchi times out'))),
('event', ceilometer.event.converter.OPTS),
('hardware', itertools.chain(
ceilometer.hardware.discovery.OPTS,
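
The old [dispatcher_gnocchi] options are kept registered but flagged deprecated_for_removal, so an existing ceilometer.conf keeps parsing and merely logs a deprecation warning when they are set. A minimal sketch of that oslo.config pattern, assuming oslo.config is installed (the option subset shown is illustrative)::

    from oslo_config import cfg

    opts = [
        cfg.StrOpt('archive_policy',
                   deprecated_for_removal=True,
                   help='The archive policy to use when the publisher '
                        'creates a new metric.'),
        cfg.FloatOpt('request_timeout', default=6.05, min=0.0,
                     deprecated_for_removal=True,
                     help='Number of seconds before a request to gnocchi '
                          'times out.'),
    ]

    conf = cfg.ConfigOpts()
    conf.register_opts(opts, group='dispatcher_gnocchi')
    conf([])  # parse an (empty) command line and any default config files
    print(conf.dispatcher_gnocchi.request_timeout)  # 6.05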

View File

@ -31,14 +31,14 @@ class DirectPublisher(publisher.ConfigPublisherBase):
are required.
By default, the database dispatcher is used to select another one we
can use direct://?dispatcher=gnocchi, ...
can use direct://?dispatcher=name_of_dispatcher, ...
"""
def __init__(self, conf, parsed_url):
super(DirectPublisher, self).__init__(conf, parsed_url)
default_dispatcher = parsed_url.scheme
if default_dispatcher == 'direct':
LOG.warning('Direct publisher is deprecated for removal. Use '
'an explicit publisher instead, e.g. "gnocchi", '
'an explicit publisher instead, e.g. '
'"database", "file", ...')
default_dispatcher = 'database'
options = urlparse.parse_qs(parsed_url.query)

View File

@ -27,13 +27,14 @@ from oslo_serialization import jsonutils
from oslo_utils import fnmatch
from oslo_utils import timeutils
import six
import six.moves.urllib.parse as urlparse
from stevedore import extension
from ceilometer import declarative
from ceilometer import dispatcher
from ceilometer import gnocchi_client
from ceilometer.i18n import _
from ceilometer import keystone_client
from ceilometer import publisher
NAME_ENCODED = __name__.encode('utf-8')
CACHE_NAMESPACE = uuid.UUID(bytes=hashlib.md5(NAME_ENCODED).digest())
@ -129,14 +130,14 @@ class ResourcesDefinition(object):
def sample_attributes(self, sample):
attrs = {}
for name, definition in self._attributes.items():
value = definition.parse(sample)
value = definition.parse(sample.as_dict())
if value is not None:
attrs[name] = value
return attrs
def event_attributes(self, event):
attrs = {'type': self.cfg['resource_type']}
traits = dict([(trait[0], trait[2]) for trait in event['traits']])
traits = dict([(trait[0], trait[2]) for trait in event.traits])
for attr, field in self.cfg.get('event_attributes', {}).items():
value = traits.get(field)
if value is not None:
@ -168,44 +169,49 @@ class LockedDefaultDict(defaultdict):
key_lock.release()
class GnocchiDispatcher(dispatcher.MeterDispatcherBase,
dispatcher.EventDispatcherBase):
"""Dispatcher class for recording metering data into the Gnocchi service.
class GnocchiPublisher(publisher.ConfigPublisherBase):
"""Publisher class for recording metering data into the Gnocchi service.
The dispatcher class records each meter into the gnocchi service
configured in ceilometer configuration file. An example configuration may
The publisher class records each meter into the gnocchi service
configured in Ceilometer pipeline file. An example target may
look like the following:
[dispatcher_gnocchi]
archive_policy = low
To enable this dispatcher, the following section needs to be present in
ceilometer.conf file
[DEFAULT]
meter_dispatchers = gnocchi
event_dispatchers = gnocchi
gnocchi://?archive_policy=low&filter_project=gnocchi
"""
def __init__(self, conf):
super(GnocchiDispatcher, self).__init__(conf)
self.conf = conf
self.filter_service_activity = (
conf.dispatcher_gnocchi.filter_service_activity)
def __init__(self, conf, parsed_url):
super(GnocchiPublisher, self).__init__(conf, parsed_url)
# TODO(jd) allow to override Gnocchi endpoint via the host in the URL
options = urlparse.parse_qs(parsed_url.query)
self.filter_project = options.get(
'filter_project',
[conf.dispatcher_gnocchi.filter_project])[-1]
resources_definition_file = options.get(
'resources_definition_file',
[conf.dispatcher_gnocchi.resources_definition_file])[-1]
archive_policy = options.get(
'archive_policy',
[conf.dispatcher_gnocchi.archive_policy])[-1]
self.resources_definition = self._load_resources_definitions(
conf, archive_policy, resources_definition_file)
timeout = options.get('timeout',
[conf.dispatcher_gnocchi.request_timeout])[-1]
self._ks_client = keystone_client.get_client(conf)
self.resources_definition = self._load_resources_definitions(conf)
self.cache = None
try:
import oslo_cache
oslo_cache.configure(self.conf)
oslo_cache.configure(conf)
# NOTE(cdent): The default cache backend is a real but
# noop backend. We don't want to use that here because
# we want to avoid the cache pathways entirely if the
# cache has not been configured explicitly.
if self.conf.cache.enabled:
if conf.cache.enabled:
cache_region = oslo_cache.create_region()
self.cache = oslo_cache.configure_cache_region(
self.conf, cache_region)
conf, cache_region)
self.cache.key_mangler = cache_key_mangler
except ImportError:
pass
@ -216,16 +222,18 @@ class GnocchiDispatcher(dispatcher.MeterDispatcherBase,
self._gnocchi_project_id_lock = threading.Lock()
self._gnocchi_resource_lock = LockedDefaultDict(threading.Lock)
self._gnocchi = gnocchi_client.get_gnocchiclient(conf)
self._gnocchi = gnocchi_client.get_gnocchiclient(
conf, request_timeout=timeout)
self._already_logged_event_types = set()
self._already_logged_metric_names = set()
@classmethod
def _load_resources_definitions(cls, conf):
@staticmethod
def _load_resources_definitions(conf, archive_policy,
resources_definition_file):
plugin_manager = extension.ExtensionManager(
namespace='ceilometer.event.trait_plugin')
data = declarative.load_definitions(
conf, {}, conf.dispatcher_gnocchi.resources_definition_file,
conf, {}, resources_definition_file,
pkg_resources.resource_filename(__name__,
"data/gnocchi_resources.yaml"))
resource_defs = []
@ -233,7 +241,7 @@ class GnocchiDispatcher(dispatcher.MeterDispatcherBase,
try:
resource_defs.append(ResourcesDefinition(
resource,
conf.dispatcher_gnocchi.archive_policy, plugin_manager))
archive_policy, plugin_manager))
except Exception as exc:
LOG.error("Failed to load resource due to error %s" %
exc)
@ -247,32 +255,32 @@ class GnocchiDispatcher(dispatcher.MeterDispatcherBase,
if self._gnocchi_project_id is None:
try:
project = self._ks_client.projects.find(
name=self.conf.dispatcher_gnocchi.filter_project)
name=self.filter_project)
except ka_exceptions.NotFound:
LOG.warning('gnocchi project not found in keystone,'
' ignoring the filter_service_activity '
LOG.warning('filtered project not found in keystone,'
' ignoring the filter_project '
'option')
self.filter_service_activity = False
self.filter_project = None
return None
except Exception:
LOG.exception('fail to retrieve user of Gnocchi '
'service')
LOG.exception('fail to retrieve filtered project ')
raise
self._gnocchi_project_id = project.id
LOG.debug("gnocchi project found: %s", self.gnocchi_project_id)
LOG.debug("filtered project found: %s",
self.gnocchi_project_id)
return self._gnocchi_project_id
def _is_swift_account_sample(self, sample):
return bool([rd for rd in self.resources_definition
if rd.cfg['resource_type'] == 'swift_account'
and rd.metric_match(sample['counter_name'])])
and rd.metric_match(sample.name)])
def _is_gnocchi_activity(self, sample):
return (self.filter_service_activity and self.gnocchi_project_id and (
return (self.filter_project and self.gnocchi_project_id and (
# avoid anything from the user used by gnocchi
sample['project_id'] == self.gnocchi_project_id or
sample.project_id == self.gnocchi_project_id or
# avoid anything in the swift account used by gnocchi
(sample['resource_id'] == self.gnocchi_project_id and
(sample.resource_id == self.gnocchi_project_id and
self._is_swift_account_sample(sample))
))
@ -287,16 +295,13 @@ class GnocchiDispatcher(dispatcher.MeterDispatcherBase,
if operation:
return rd, operation
def record_metering_data(self, data):
# We may have receive only one counter on the wire
if not isinstance(data, list):
data = [data]
def publish_samples(self, data):
# NOTE(sileht): skip sample generated by gnocchi itself
data = [s for s in data if not self._is_gnocchi_activity(s)]
data.sort(key=lambda s: (s['resource_id'], s['counter_name']))
data.sort(key=lambda s: (s.resource_id, s.name))
resource_grouped_samples = itertools.groupby(
data, key=operator.itemgetter('resource_id'))
data, key=operator.attrgetter('resource_id'))
gnocchi_data = {}
measures = {}
@ -308,7 +313,7 @@ class GnocchiDispatcher(dispatcher.MeterDispatcherBase,
stats['resources'] += 1
metric_grouped_samples = itertools.groupby(
list(samples_of_resource),
key=operator.itemgetter('counter_name'))
key=operator.attrgetter('name'))
res_info = {}
for metric_name, samples in metric_grouped_samples:
@ -328,8 +333,8 @@ class GnocchiDispatcher(dispatcher.MeterDispatcherBase,
res_info['resource_type'] = rd.cfg['resource_type']
res_info.setdefault("resource", {}).update({
"id": resource_id,
"user_id": samples[0]['user_id'],
"project_id": samples[0]['project_id'],
"user_id": samples[0].user_id,
"project_id": samples[0].project_id,
"metrics": rd.metrics,
})
@ -338,10 +343,10 @@ class GnocchiDispatcher(dispatcher.MeterDispatcherBase,
rd.sample_attributes(sample))
m = measures.setdefault(resource_id, {}).setdefault(
metric_name, [])
m.append({'timestamp': sample['timestamp'],
'value': sample['counter_volume']})
unit = sample['counter_unit']
metric = sample['counter_name']
m.append({'timestamp': sample.timestamp,
'value': sample.volume})
unit = sample.unit
metric = sample.name
res_info['resource']['metrics'][metric]['unit'] = unit
stats['measures'] += len(measures[resource_id][metric_name])
@ -463,14 +468,14 @@ class GnocchiDispatcher(dispatcher.MeterDispatcherBase,
else:
return None
def record_events(self, events):
def publish_events(self, events):
for event in events:
rd = self._get_resource_definition_from_event(event['event_type'])
rd = self._get_resource_definition_from_event(event.event_type)
if not rd:
if event['event_type'] not in self._already_logged_event_types:
if event.event_type not in self._already_logged_event_types:
LOG.debug("No gnocchi definition for event type: %s",
event['event_type'])
self._already_logged_event_types.add(event['event_type'])
event.event_type)
self._already_logged_event_types.add(event.event_type)
continue
rd, operation = rd
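
Because the publisher now receives Sample objects rather than dictionaries, publish_samples() sorts and groups by attribute (operator.attrgetter) instead of by key (operator.itemgetter). A standalone sketch of that grouping pattern, with a namedtuple standing in for ceilometer.sample.Sample::

    import itertools
    import operator
    from collections import namedtuple

    # Stand-in for ceilometer.sample.Sample; only the fields used here.
    Sample = namedtuple('Sample', 'name resource_id volume timestamp')

    samples = [
        Sample('disk.root.size', 'res-1', 2, '2017-09-04T13:00:00'),
        Sample('cpu', 'res-1', 100, '2017-09-04T13:00:00'),
        Sample('disk.root.size', 'res-2', 4, '2017-09-04T13:00:00'),
    ]

    # Sort by (resource_id, name) so groupby sees contiguous runs, then
    # group once per resource and once per metric, as publish_samples() does.
    samples.sort(key=lambda s: (s.resource_id, s.name))
    for resource_id, resource_samples in itertools.groupby(
            samples, key=operator.attrgetter('resource_id')):
        for metric_name, metric_samples in itertools.groupby(
                list(resource_samples), key=operator.attrgetter('name')):
            measures = [{'timestamp': s.timestamp, 'value': s.volume}
                        for s in metric_samples]
            print(resource_id, metric_name, measures)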

View File

@ -64,7 +64,6 @@ class TestAPIUpgradePath(v2.FunctionalTest):
return 'http://event-endpoint:8009/'
def _do_test_gnocchi_enabled_without_database_backend(self):
self.CONF.set_override('meter_dispatchers', ['gnocchi'])
for endpoint in ['meters', 'samples', 'resources']:
response = self.app.get(self.PATH_PREFIX + '/' + endpoint,
status=410)

View File

@ -30,16 +30,13 @@ class TestDispatchManager(base.BaseTestCase):
super(TestDispatchManager, self).setUp()
conf = service.prepare_service([], [])
self.conf = self.useFixture(fixture.Config(conf))
self.conf.config(meter_dispatchers=['database', 'gnocchi'],
self.conf.config(meter_dispatchers=['database'],
event_dispatchers=['database'])
self.CONF = self.conf.conf
self.useFixture(fixtures.MockPatch(
'ceilometer.dispatcher.gnocchi.GnocchiDispatcher',
new=FakeMeterDispatcher))
self.useFixture(fixtures.MockPatch(
'ceilometer.dispatcher.database.MeterDatabaseDispatcher',
new=FakeMeterDispatcher))
def test_load(self):
sample_mg, event_mg = dispatcher.load_dispatcher_manager(self.CONF)
self.assertEqual(2, len(list(sample_mg)))
self.assertEqual(1, len(list(sample_mg)))

View File

@ -23,22 +23,24 @@ import mock
from oslo_config import fixture as config_fixture
from oslo_utils import fileutils
from oslo_utils import fixture as utils_fixture
from oslo_utils import netutils
from oslo_utils import timeutils
import requests
import six
from stevedore import extension
import testscenarios
from ceilometer.dispatcher import gnocchi
from ceilometer.publisher import utils
from ceilometer.event.storage import models
from ceilometer.publisher import gnocchi
from ceilometer import sample
from ceilometer import service as ceilometer_service
from ceilometer.tests import base
load_tests = testscenarios.load_tests_apply_scenarios
INSTANCE_DELETE_START = {
'event_type': u'compute.instance.delete.start',
'traits': [
INSTANCE_DELETE_START = models.Event(
event_type=u'compute.instance.delete.start',
traits=[
[
'state',
1,
@ -120,16 +122,14 @@ INSTANCE_DELETE_START = {
'2012-05-08T20:23:47'
]
],
'message_signature':
'831719d54059734f82e7d6498c6d7a8fd637568732e79c1fd375e128f142373a',
'raw': {},
'generated': '2012-05-08T20:24:14.824743',
'message_id': u'a15b94ee-cb8e-4c71-9abe-14aa80055fb4'
}
raw={},
generated='2012-05-08T20:24:14.824743',
message_id=u'a15b94ee-cb8e-4c71-9abe-14aa80055fb4',
)
IMAGE_DELETE_START = {
u'event_type': u'image.delete',
u'traits': [
IMAGE_DELETE_START = models.Event(
event_type=u'image.delete',
traits=[
[
u'status',
1,
@ -176,17 +176,15 @@ IMAGE_DELETE_START = {
u'13287936'
]
],
u'message_signature':
u'46fb4958e911f45007a3bb5934c5de7610892a6d742a4900695fd5929cd4c9b3',
u'raw': {},
u'generated': u'2016-11-04T04:25:56.493820',
u'message_id': u'7f5280f7-1d10-46a5-ba58-4d5508e49f99'
}
raw={},
generated=u'2016-11-04T04:25:56.493820',
message_id=u'7f5280f7-1d10-46a5-ba58-4d5508e49f99'
)
VOLUME_DELETE_START = {
'event_type': u'volume.delete.start',
'traits': [
VOLUME_DELETE_START = models.Event(
event_type=u'volume.delete.start',
traits=[
[
u'availability_zone',
1,
@ -258,16 +256,14 @@ VOLUME_DELETE_START = {
u'819bbd28f5374506b8502521c89430b5'
]
],
'message_signature':
'831719d54059734f82e7d6498c6d7a8fd637568732e79c1fd375e128f142373a',
'raw': {},
'generated': '2016-11-28T13:42:15.484674',
'message_id': u'a15b94ee-cb8e-4c71-9abe-14aa80055fb4'
}
raw={},
generated='2016-11-28T13:42:15.484674',
message_id=u'a15b94ee-cb8e-4c71-9abe-14aa80055fb4',
)
FLOATINGIP_DELETE_END = {
'event_type': u'floatingip.delete.end',
'traits': [
FLOATINGIP_DELETE_END = models.Event(
event_type=u'floatingip.delete.end',
traits=[
[
u'project_id',
1,
@ -299,63 +295,54 @@ FLOATINGIP_DELETE_END = {
u'819bbd28f5374506b8502521c89430b5'
]
],
'message_signature':
'831719d54059734f82e7d6498c6d7a8fd637568732e79c1fd375e128f142373a',
'raw': {},
'generated': '2016-11-29T09:25:55.474710',
'message_id': u'a15b94ee-cb8e-4c71-9abe-14aa80055fb4'
}
raw={},
generated='2016-11-29T09:25:55.474710',
message_id=u'a15b94ee-cb8e-4c71-9abe-14aa80055fb4'
)
class DispatcherTest(base.BaseTestCase):
class PublisherTest(base.BaseTestCase):
def setUp(self):
super(DispatcherTest, self).setUp()
super(PublisherTest, self).setUp()
conf = ceilometer_service.prepare_service(argv=[], config_files=[])
self.conf = self.useFixture(config_fixture.Config(conf))
self.conf.config(
resources_definition_file=self.path_get(
'etc/ceilometer/gnocchi_resources.yaml'),
group="dispatcher_gnocchi"
)
self.resource_id = str(uuid.uuid4())
self.samples = [{
'counter_name': 'disk.root.size',
'counter_unit': 'GB',
'counter_type': 'gauge',
'counter_volume': '2',
'user_id': 'test_user',
'project_id': 'test_project',
'source': 'openstack',
'timestamp': '2012-05-08 20:23:48.028195',
'resource_id': self.resource_id,
'resource_metadata': {
self.samples = [sample.Sample(
name='disk.root.size',
unit='GB',
type=sample.TYPE_GAUGE,
volume=2,
user_id='test_user',
project_id='test_project',
source='openstack',
timestamp='2012-05-08 20:23:48.028195',
resource_id=self.resource_id,
resource_metadata={
'host': 'foo',
'image_ref': 'imageref!',
'instance_flavor_id': 1234,
'display_name': 'myinstance',
}
},
{
'counter_name': 'disk.root.size',
'counter_unit': 'GB',
'counter_type': 'gauge',
'counter_volume': '2',
'user_id': 'test_user',
'project_id': 'test_project',
'source': 'openstack',
'timestamp': '2014-05-08 20:23:48.028195',
'resource_id': self.resource_id,
'resource_metadata': {
),
sample.Sample(
name='disk.root.size',
unit='GB',
type=sample.TYPE_GAUGE,
volume=2,
user_id='test_user',
project_id='test_project',
source='openstack',
timestamp='2014-05-08 20:23:48.028195',
resource_id=self.resource_id,
resource_metadata={
'host': 'foo',
'image_ref': 'imageref!',
'instance_flavor_id': 1234,
'display_name': 'myinstance',
}
}]
for sample in self.samples:
sample['message_signature'] = utils.compute_signature(
sample, self.conf.conf.publisher.telemetry_secret)
},
),
]
ks_client = mock.Mock(auth_token='fake_token')
ks_client.projects.find.return_value = mock.Mock(
@ -364,12 +351,10 @@ class DispatcherTest(base.BaseTestCase):
'ceilometer.keystone_client.get_client',
return_value=ks_client))
self.ks_client = ks_client
self.conf.conf.dispatcher_gnocchi.filter_service_activity = True
def test_config_load(self):
self.conf.config(filter_service_activity=False,
group='dispatcher_gnocchi')
d = gnocchi.GnocchiDispatcher(self.conf.conf)
url = netutils.urlsplit("gnocchi://")
d = gnocchi.GnocchiPublisher(self.conf.conf, url)
names = [rd.cfg['resource_type'] for rd in d.resources_definition]
self.assertIn('instance', names)
self.assertIn('volume', names)
@ -388,13 +373,12 @@ class DispatcherTest(base.BaseTestCase):
plugin_manager = extension.ExtensionManager(
namespace='ceilometer.event.trait.trait_plugin')
rd = gnocchi.ResourcesDefinition(
resource, self.conf.conf.dispatcher_gnocchi.archive_policy,
plugin_manager)
resource, "low", plugin_manager)
operation = rd.event_match("image.delete")
self.assertEqual('delete', operation)
self.assertEqual(True, rd.metric_match('image'))
@mock.patch('ceilometer.dispatcher.gnocchi.LOG')
@mock.patch('ceilometer.publisher.gnocchi.LOG')
def test_broken_config_load(self, mylog):
contents = [("---\n"
"resources:\n"
@ -419,63 +403,63 @@ class DispatcherTest(base.BaseTestCase):
prefix='gnocchi_resources',
suffix='.yaml')
self.addCleanup(os.remove, temp)
self.conf.config(filter_service_activity=False,
resources_definition_file=temp,
group='dispatcher_gnocchi')
d = gnocchi.GnocchiDispatcher(self.conf.conf)
url = netutils.urlsplit(
"gnocchi://?resources_definition_file=" + temp)
d = gnocchi.GnocchiPublisher(self.conf.conf, url)
self.assertTrue(mylog.error.called)
self.assertEqual(0, len(d.resources_definition))
@mock.patch('ceilometer.dispatcher.gnocchi.GnocchiDispatcher'
@mock.patch('ceilometer.publisher.gnocchi.GnocchiPublisher'
'._if_not_cached')
@mock.patch('ceilometer.dispatcher.gnocchi.GnocchiDispatcher'
@mock.patch('ceilometer.publisher.gnocchi.GnocchiPublisher'
'.batch_measures')
def _do_test_activity_filter(self, expected_measures, fake_batch, __):
d = gnocchi.GnocchiDispatcher(self.conf.conf)
d.record_metering_data(self.samples)
url = netutils.urlsplit("gnocchi://")
d = gnocchi.GnocchiPublisher(self.conf.conf, url)
d.publish_samples(self.samples)
fake_batch.assert_called_with(
mock.ANY, mock.ANY,
{'metrics': 1, 'resources': 1, 'measures': expected_measures})
def test_activity_filter_match_project_id(self):
self.samples[0]['project_id'] = (
self.samples[0].project_id = (
'a2d42c23-d518-46b6-96ab-3fba2e146859')
self._do_test_activity_filter(1)
@mock.patch('ceilometer.dispatcher.gnocchi.LOG')
@mock.patch('ceilometer.publisher.gnocchi.LOG')
def test_activity_gnocchi_project_not_found(self, logger):
self.ks_client.projects.find.side_effect = ka_exceptions.NotFound
self._do_test_activity_filter(2)
logger.warning.assert_called_with('gnocchi project not found in '
logger.warning.assert_called_with('filtered project not found in '
'keystone, ignoring the '
'filter_service_activity option')
'filter_project option')
def test_activity_filter_match_swift_event(self):
self.samples[0]['counter_name'] = 'storage.api.request'
self.samples[0]['resource_id'] = 'a2d42c23-d518-46b6-96ab-3fba2e146859'
self.samples[0].name = 'storage.api.request'
self.samples[0].resource_id = 'a2d42c23-d518-46b6-96ab-3fba2e146859'
self._do_test_activity_filter(1)
def test_activity_filter_nomatch(self):
self._do_test_activity_filter(2)
@mock.patch('ceilometer.dispatcher.gnocchi.GnocchiDispatcher'
@mock.patch('ceilometer.publisher.gnocchi.GnocchiPublisher'
'.batch_measures')
def test_unhandled_meter(self, fake_batch):
samples = [{
'counter_name': 'unknown.meter',
'counter_unit': 'GB',
'counter_type': 'gauge',
'counter_volume': '2',
'user_id': 'test_user',
'project_id': 'test_project',
'source': 'openstack',
'timestamp': '2014-05-08 20:23:48.028195',
'resource_id': 'randomid',
'resource_metadata': {}
}]
d = gnocchi.GnocchiDispatcher(self.conf.conf)
d.record_metering_data(samples)
samples = [sample.Sample(
name='unknown.meter',
unit='GB',
type=sample.TYPE_GAUGE,
volume=2,
user_id='test_user',
project_id='test_project',
source='openstack',
timestamp='2014-05-08 20:23:48.028195',
resource_id='randomid',
resource_metadata={}
)]
url = netutils.urlsplit("gnocchi://")
d = gnocchi.GnocchiPublisher(self.conf.conf, url)
d.publish_samples(samples)
self.assertEqual(0, len(fake_batch.call_args[0][1]))
@ -491,30 +475,31 @@ class MockResponse(mock.NonCallableMock):
text=text)
class DispatcherWorkflowTest(base.BaseTestCase,
testscenarios.TestWithScenarios):
class PublisherWorkflowTest(base.BaseTestCase,
testscenarios.TestWithScenarios):
sample_scenarios = [
('disk.root.size', dict(
sample={
'counter_name': 'disk.root.size',
'counter_unit': 'GB',
'counter_type': 'gauge',
'counter_volume': '2',
'user_id': 'test_user',
'project_id': 'test_project',
'source': 'openstack',
'timestamp': '2012-05-08 20:23:48.028195',
'resource_metadata': {
sample=sample.Sample(
resource_id=str(uuid.uuid4()) + "_foobar",
name='disk.root.size',
unit='GB',
type=sample.TYPE_GAUGE,
volume=2,
user_id='test_user',
project_id='test_project',
source='openstack',
timestamp='2012-05-08 20:23:48.028195',
resource_metadata={
'host': 'foo',
'image_ref': 'imageref!',
'instance_flavor_id': 1234,
'display_name': 'myinstance',
}
},
},
),
measures_attributes=[{
'timestamp': '2012-05-08 20:23:48.028195',
'value': '2'
'value': 2
}],
postable_attributes={
'user_id': 'test_user',
@ -542,22 +527,23 @@ class DispatcherWorkflowTest(base.BaseTestCase,
'compute.instance.booting.time'],
resource_type='instance')),
('hardware.ipmi.node.power', dict(
sample={
'counter_name': 'hardware.ipmi.node.power',
'counter_unit': 'W',
'counter_type': 'gauge',
'counter_volume': '2',
'user_id': 'test_user',
'project_id': 'test_project',
'source': 'openstack',
'timestamp': '2012-05-08 20:23:48.028195',
'resource_metadata': {
sample=sample.Sample(
resource_id=str(uuid.uuid4()) + "_foobar",
name='hardware.ipmi.node.power',
unit='W',
type=sample.TYPE_GAUGE,
volume=2,
user_id='test_user',
project_id='test_project',
source='openstack',
timestamp='2012-05-08 20:23:48.028195',
resource_metadata={
'useless': 'not_used',
}
},
},
),
measures_attributes=[{
'timestamp': '2012-05-08 20:23:48.028195',
'value': '2'
'value': 2
}],
postable_attributes={
'user_id': 'test_user',
@ -608,7 +594,7 @@ class DispatcherWorkflowTest(base.BaseTestCase,
workflow_scenarios)
def setUp(self):
super(DispatcherWorkflowTest, self).setUp()
super(PublisherWorkflowTest, self).setUp()
conf = ceilometer_service.prepare_service(argv=[], config_files=[])
self.conf = self.useFixture(config_fixture.Config(conf))
ks_client = mock.Mock()
@ -619,19 +605,10 @@ class DispatcherWorkflowTest(base.BaseTestCase,
return_value=ks_client))
self.ks_client = ks_client
self.conf.config(
resources_definition_file=self.path_get(
'etc/ceilometer/gnocchi_resources.yaml'),
group="dispatcher_gnocchi"
)
self.sample['resource_id'] = str(uuid.uuid4()) + "_foobar"
self.sample['message_signature'] = utils.compute_signature(
self.sample, self.conf.conf.publisher.telemetry_secret)
@mock.patch('gnocchiclient.v1.client.Client')
def test_event_workflow(self, fakeclient_cls):
self.dispatcher = gnocchi.GnocchiDispatcher(self.conf.conf)
url = netutils.urlsplit("gnocchi://")
self.publisher = gnocchi.GnocchiPublisher(self.conf.conf, url)
fakeclient = fakeclient_cls.return_value
@ -677,23 +654,24 @@ class DispatcherWorkflowTest(base.BaseTestCase,
{'ended_at': now.isoformat()})
]
self.dispatcher.record_events([INSTANCE_DELETE_START,
IMAGE_DELETE_START,
VOLUME_DELETE_START,
FLOATINGIP_DELETE_END])
self.publisher.publish_events([INSTANCE_DELETE_START,
IMAGE_DELETE_START,
VOLUME_DELETE_START,
FLOATINGIP_DELETE_END])
self.assertEqual(8, len(fakeclient.mock_calls))
for call in expected_calls:
self.assertIn(call, fakeclient.mock_calls)
@mock.patch('ceilometer.dispatcher.gnocchi.LOG')
@mock.patch('ceilometer.publisher.gnocchi.LOG')
@mock.patch('gnocchiclient.v1.client.Client')
def test_workflow(self, fakeclient_cls, logger):
self.dispatcher = gnocchi.GnocchiDispatcher(self.conf.conf)
url = netutils.urlsplit("gnocchi://")
self.publisher = gnocchi.GnocchiPublisher(self.conf.conf, url)
fakeclient = fakeclient_cls.return_value
resource_id = self.sample['resource_id'].replace("/", "_")
metric_name = self.sample['counter_name']
resource_id = self.sample.resource_id.replace("/", "_")
metric_name = self.sample.name
gnocchi_id = uuid.uuid4()
expected_calls = [
@ -702,7 +680,7 @@ class DispatcherWorkflowTest(base.BaseTestCase,
create_metrics=True)
]
expected_debug = [
mock.call('gnocchi project found: %s',
mock.call('filtered project found: %s',
'a2d42c23-d518-46b6-96ab-3fba2e146859'),
]
@ -720,7 +698,7 @@ class DispatcherWorkflowTest(base.BaseTestCase,
attributes = self.postable_attributes.copy()
attributes.update(self.patchable_attributes)
attributes['id'] = self.sample['resource_id']
attributes['id'] = self.sample.resource_id
attributes['metrics'] = dict((metric_name, {})
for metric_name in self.metric_names)
for k, v in six.iteritems(attributes['metrics']):
@ -740,7 +718,7 @@ class DispatcherWorkflowTest(base.BaseTestCase,
gnocchi_exc.ResourceAlreadyExists(409)]
else: # not resource_exists
expected_debug.append(mock.call(
'Resource %s created', self.sample['resource_id']))
'Resource %s created', self.sample.resource_id))
if not self.create_resource_fail:
expected_calls.append(
@ -774,12 +752,12 @@ class DispatcherWorkflowTest(base.BaseTestCase,
fakeclient.resource.update.side_effect = [Exception('boom!')]
else:
expected_debug.append(mock.call(
'Resource %s updated', self.sample['resource_id']))
'Resource %s updated', self.sample.resource_id))
batch = fakeclient.metric.batch_resources_metrics_measures
batch.side_effect = batch_side_effect
self.dispatcher.record_metering_data([self.sample])
self.publisher.publish_samples([self.sample])
# Check that the last log message is the expected one
if (self.post_measure_fail
@ -792,4 +770,4 @@ class DispatcherWorkflowTest(base.BaseTestCase,
self.assertEqual(expected_calls, fakeclient.mock_calls)
self.assertEqual(expected_debug, logger.debug.mock_calls)
DispatcherWorkflowTest.generate_scenarios()
PublisherWorkflowTest.generate_scenarios()
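
The fixtures above now build event and sample model objects directly, so the message_signature fields and compute_signature() calls disappear; the publisher reads traits and fields by attribute. A standalone sketch of how event traits collapse into an attribute dictionary, with a namedtuple standing in for the Event model and made-up trait values::

    from collections import namedtuple

    # Stand-in for ceilometer.event.storage.models.Event; the real model
    # also carries raw, generated and message_id, omitted here.
    Event = namedtuple('Event', 'event_type traits')

    event = Event(
        event_type='image.delete',
        traits=[['status', 1, 'deleted'],
                ['resource_id', 1, 'fake-resource-id']])

    # event_attributes() keeps only name/value pairs, i.e. trait[0] and
    # trait[2] of each [name, dtype, value] triple.
    traits = dict((t[0], t[2]) for t in event.traits)
    print(traits['resource_id'])  # fake-resource-id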

View File

@ -299,13 +299,7 @@ function _ceilometer_configure_storage_backend {
# NOTE(gordc): set batching to better handle recording on a slow machine
iniset $CEILOMETER_CONF collector batch_size 50
iniset $CEILOMETER_CONF collector batch_timeout 5
iniset $CEILOMETER_CONF dispatcher_gnocchi archive_policy ${GNOCCHI_ARCHIVE_POLICY}
if is_service_enabled swift && [[ "$GNOCCHI_STORAGE_BACKEND" = 'swift' ]] ; then
iniset $CEILOMETER_CONF dispatcher_gnocchi filter_service_activity "True"
iniset $CEILOMETER_CONF dispatcher_gnocchi filter_project "gnocchi_swift"
else
iniset $CEILOMETER_CONF dispatcher_gnocchi filter_service_activity "False"
fi
sed -i "s/gnocchi:\/\//gnocchi:\/\/?archive_policy=${GNOCCHI_ARCHIVE_POLICY}\&filter_project=gnocchi_swift/" $CEILOMETER_CONF_DIR/event_pipeline.yaml $CEILOMETER_CONF_DIR/pipeline.yaml
else
die $LINENO "Unable to configure unknown CEILOMETER_BACKEND $CEILOMETER_BACKEND"
fi
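
Instead of writing a [dispatcher_gnocchi] section, the devstack plugin now rewrites the bare gnocchi:// publisher already present in pipeline.yaml and event_pipeline.yaml. A rough Python equivalent of that sed substitution, with an illustrative policy value::

    import re

    archive_policy = 'low'  # stands in for $GNOCCHI_ARCHIVE_POLICY
    line = '    - gnocchi://'
    print(re.sub(
        'gnocchi://',
        'gnocchi://?archive_policy=%s&filter_project=gnocchi_swift'
        % archive_policy,
        line))
    # -> "    - gnocchi://?archive_policy=low&filter_project=gnocchi_swift"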

View File

@ -92,6 +92,10 @@ configuration file ``pipeline.yaml`` and/or ``event_pipeline.yaml`` which is
normally located at /etc/ceilometer directory and make changes accordingly.
Your configuration file can be in a different directory.
For the Gnocchi publisher, the archive policy can be defined as a configuration
setting. The value specified for ``archive_policy`` should correspond to the
name of an ``archive_policy`` configured within Gnocchi.
To use multiple publishers, add multiple publisher lines in ``pipeline.yaml`` and/or
``event_pipeline.yaml`` file like the following::
@ -107,17 +111,9 @@ To use multiple publishers, add multiple publisher lines in ``pipeline.yaml`` an
transformers:
publishers:
- database://
- gnocchi://
- gnocchi://?archive_policy=low
- file://
For the Gnocchi publisher, the following configuration settings should be added
into /etc/ceilometer/ceilometer.conf::
[dispatcher_gnocchi]
archive_policy = low
The value specified for ``archive_policy`` should correspond to the name of an
``archive_policy`` configured within Gnocchi.
For the Gnocchi publisher backed by Swift storage, the following additional
configuration settings should be added::

View File

@ -39,10 +39,6 @@ Gnocchi
* With Keystone authentication enabled::
[dispatcher_gnocchi]
filter_service_activity = False # Enable if using swift backend
filter_project = <project name associated with gnocchi user> # if using swift backend
[service_credentials]
auth_url = <auth_url>:5000
region_name = RegionOne
@ -58,9 +54,7 @@ Gnocchi
authentication doesn't matter. This will increase the performance of
Gnocchi::
[dispatcher_gnocchi]
filter_service_activity = False # Enable if using swift backend
filter_project = <project name associated with gnocchi user> # if using swift backend
[gnocchi]
auth_section=service_credentials_gnocchi
[service_credentials_gnocchi]

View File

@ -1,15 +1,15 @@
2. Edit the ``/etc/ceilometer/ceilometer.conf`` file and complete
2. Edit the ``/etc/ceilometer/pipeline.yaml`` file and complete
the following actions:
* Configure Gnocchi connection:
.. code-block:: ini
.. code-block:: yaml
[dispatcher_gnocchi]
# filter out Gnocchi-related activity meters (Swift driver)
filter_service_activity = False
# default metric storage archival policy
archive_policy = low
publishers:
# set address of Gnocchi
# + filter out Gnocchi-related activity meters (Swift driver)
# + set default archive policy
- gnocchi://?filter_project=service&archive_policy=low
* In the ``[DEFAULT]`` section,
configure ``RabbitMQ`` message queue access:

View File

@ -0,0 +1,8 @@
---
upgrade:
- |
The Gnocchi dispatcher has been removed and replaced by a native Gnocchi
publisher. The configuration options from the `[dispatcher_gnocchi]` section
have been removed and should now be passed via the publisher URL in
`pipeline.yaml`. Service authentication can still be overridden by adding
specific credentials to a `[gnocchi]` section instead.

View File

@ -247,7 +247,7 @@ ceilometer.sample.publisher =
direct = ceilometer.publisher.direct:DirectPublisher
http = ceilometer.publisher.http:HttpPublisher
https = ceilometer.publisher.http:HttpPublisher
gnocchi = ceilometer.publisher.direct:DirectPublisher
gnocchi = ceilometer.publisher.gnocchi:GnocchiPublisher
database = ceilometer.publisher.direct:DirectPublisher
file_alt = ceilometer.publisher.direct:DirectPublisher
http_alt = ceilometer.publisher.direct:DirectPublisher
@ -259,7 +259,7 @@ ceilometer.event.publisher =
notifier = ceilometer.publisher.messaging:EventNotifierPublisher
http = ceilometer.publisher.http:HttpPublisher
https = ceilometer.publisher.http:HttpPublisher
gnocchi = ceilometer.publisher.direct:DirectPublisher
gnocchi = ceilometer.publisher.gnocchi:GnocchiPublisher
database = ceilometer.publisher.direct:DirectPublisher
file_alt = ceilometer.publisher.direct:DirectPublisher
http_alt = ceilometer.publisher.direct:DirectPublisher
@ -285,10 +285,6 @@ console_scripts =
ceilometer.dispatcher.meter =
database = ceilometer.dispatcher.database:MeterDatabaseDispatcher
gnocchi = ceilometer.dispatcher.gnocchi:GnocchiDispatcher
ceilometer.dispatcher.event =
gnocchi = ceilometer.dispatcher.gnocchi:GnocchiDispatcher
network.statistics.drivers =
opendaylight = ceilometer.network.statistics.opendaylight.driver:OpenDayLightDriver
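
With the setup.cfg entries above, the gnocchi name under both publisher namespaces resolves straight to the new class. A minimal sketch of loading it through stevedore (already used by Ceilometer for its plugins), assuming the package is installed::

    from stevedore import driver

    mgr = driver.DriverManager(
        namespace='ceilometer.sample.publisher',
        name='gnocchi',
        invoke_on_load=False)  # just resolve the class, do not instantiate
    print(mgr.driver)  # ceilometer.publisher.gnocchi.GnocchiPublisher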

View File

@ -37,7 +37,7 @@ from oslo_config import cfg
from oslo_db import options as db_options
from oslo_log import log
from ceilometer.dispatcher import gnocchi
from ceilometer.publisher import gnocchi
from ceilometer import service
from ceilometer import storage
from ceilometer.storage import impl_mongodb
@ -60,7 +60,7 @@ def get_parser():
parser.add_argument(
'--ceilometer-config-file',
help="The config file of ceilometer, it is main used for gnocchi "
"dispatcher to init gnocchiclient with the service credentials "
"publisher to init gnocchiclient with the service credentials "
"defined in the ceilometer config file. Default as "
"/etc/ceilometer/ceilometer.conf",
)
@ -144,7 +144,7 @@ def main():
if args.end_timestamp:
time_filters.append({"<": {'timestamp': args.end_timestamp}})
gnocchi_dispatcher = gnocchi.GnocchiDispatcher(gnocchi_conf)
gnocchi_publisher = gnocchi.GnocchiPublisher(gnocchi_conf, "gnocchi://")
batch_size = args.batch_migration_size
if total_amount == 'Unknown':
@ -181,7 +181,7 @@ def main():
sample.counter_name,
sample.resource_id))
samples_dict = [sample.as_dict() for sample in samples]
gnocchi_dispatcher.record_metering_data(samples_dict)
gnocchi_publisher.publish_samples(samples_dict)
length = len(samples)
migrated_amount += length
if pbar: