Gnocchi Dispatcher support in Ceilometer

Moving dispatcher code from gnocchi to ceilometer.

Change-Id: Iaa9c31a4330b1bf40617afbab8be152a4103fca0
This commit is contained in:
Pradeep Kilambi 2015-04-28 10:04:39 -04:00
parent baca6ebcdc
commit a61b6dfa80
15 changed files with 1136 additions and 0 deletions

View File

@ -0,0 +1,394 @@
#
# Copyright 2014 eNovance
#
# Authors: Julien Danjou <julien@danjou.info>
# Mehdi Abaakouk <mehdi.abaakouk@enovance.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import fnmatch
import threading
import itertools
import json
import operator
import os
import yaml
from ceilometer import dispatcher
from ceilometer.i18n import _
from ceilometer import keystone_client
from oslo_config import cfg
from oslo_log import log
import requests
import six
import stevedore.dispatch
LOG = log.getLogger(__name__)
dispatcher_opts = [
cfg.BoolOpt('filter_service_activity',
default=True,
help='Filter out samples generated by Gnocchi '
'service activity'),
cfg.StrOpt('filter_project',
default='gnocchi',
help='Gnocchi project used to filter out samples '
'generated by Gnocchi service activity'),
cfg.StrOpt('url',
default="http://localhost:8041",
help='URL to Gnocchi.'),
cfg.StrOpt('archive_policy',
default="low",
help='The archive policy to use when the dispatcher '
'create a new metric.'),
cfg.StrOpt('archive_policy_file',
default='gnocchi_archive_policy_map.yaml',
help=_('The Yaml file that defines per metric archive '
'policies.')),
]
cfg.CONF.register_opts(dispatcher_opts, group="dispatcher_gnocchi")
class UnexpectedWorkflowError(Exception):
pass
class NoSuchMetric(Exception):
pass
class MetricAlreadyExists(Exception):
pass
class NoSuchResource(Exception):
pass
class ResourceAlreadyExists(Exception):
pass
def log_and_ignore_unexpected_workflow_error(func):
def log_and_ignore(self, *args, **kwargs):
try:
func(self, *args, **kwargs)
except requests.ConnectionError as e:
with self._gnocchi_api_lock:
self._gnocchi_api = None
LOG.warn("Connection error, reconnecting...")
except UnexpectedWorkflowError as e:
LOG.error(six.text_type(e))
return log_and_ignore
class GnocchiDispatcher(dispatcher.Base):
def __init__(self, conf):
super(GnocchiDispatcher, self).__init__(conf)
self.conf = conf
self.filter_service_activity = (
conf.dispatcher_gnocchi.filter_service_activity)
self._ks_client = keystone_client.get_client()
self.gnocchi_url = conf.dispatcher_gnocchi.url
self.gnocchi_archive_policy_default = (
conf.dispatcher_gnocchi.archive_policy)
self.gnocchi_archive_policy_data = self._load_archive_policy(conf)
self.mgmr = stevedore.dispatch.DispatchExtensionManager(
'ceilometer.dispatcher.resource', lambda x: True,
invoke_on_load=True)
self._gnocchi_project_id = None
self._gnocchi_project_id_lock = threading.Lock()
self._gnocchi_api = None
self._gnocchi_api_lock = threading.Lock()
def _get_headers(self, content_type="application/json"):
return {
'Content-Type': content_type,
'X-Auth-Token': self._ks_client.auth_token,
}
def _load_archive_policy(self, conf):
policy_config_file = self._get_config_file(conf)
data = {}
if policy_config_file is not None:
with open(policy_config_file) as data_file:
try:
data = yaml.safe_load(data_file)
except ValueError:
data = {}
return data
def get_archive_policy(self, metric_name):
archive_policy = {}
if self.gnocchi_archive_policy_data is not None:
policy_match = self._match_metric(metric_name)
archive_policy['archive_policy_name'] = (
policy_match or self.gnocchi_archive_policy_default)
else:
LOG.debug(_("No archive policy file found!"
" Using default config."))
archive_policy['archive_policy_name'] = (
self.gnocchi_archive_policy_default)
return archive_policy
@staticmethod
def _get_config_file(conf):
config_file = conf.dispatcher_gnocchi.archive_policy_file
if not os.path.exists(config_file):
config_file = cfg.CONF.find_file(config_file)
return config_file
def _match_metric(self, metric_name):
for metric, policy in self.gnocchi_archive_policy_data.items():
# Support wild cards such as disk.*
if fnmatch.fnmatch(metric_name, metric):
return policy
@property
def gnocchi_project_id(self):
if self._gnocchi_project_id is not None:
return self._gnocchi_project_id
with self._gnocchi_project_id_lock:
if self._gnocchi_project_id is None:
try:
project = self._ks_client.tenants.find(
name=self.conf.dispatcher_gnocchi.filter_project)
except Exception:
LOG.exception('fail to retreive user of Gnocchi service')
raise
self._gnocchi_project_id = project.id
LOG.debug("gnocchi project found: %s" %
self.gnocchi_project_id)
return self._gnocchi_project_id
@property
def gnocchi_api(self):
"""return a working requests session object"""
if self._gnocchi_api is not None:
return self._gnocchi_api
with self._gnocchi_api_lock:
if self._gnocchi_api is None:
self._gnocchi_api = requests.session()
# NOTE(sileht): wait when the pool is empty
# instead of raising errors.
adapter = requests.adapters.HTTPAdapter(pool_block=True)
self._gnocchi_api.mount("http://", adapter)
self._gnocchi_api.mount("https://", adapter)
return self._gnocchi_api
def _is_gnocchi_activity(self, sample):
return (self.filter_service_activity and (
# avoid anything from the user used by gnocchi
sample['project_id'] == self.gnocchi_project_id or
# avoid anything in the swift account used by gnocchi
(sample['resource_id'] == self.gnocchi_project_id and
sample['counter_name'] in
self.mgmr['swift_account'].obj.get_metrics_names())
))
def record_metering_data(self, data):
# NOTE(sileht): skip sample generated by gnocchi itself
data = [s for s in data if not self._is_gnocchi_activity(s)]
# FIXME(sileht): This method bulk the processing of samples
# grouped by resource_id and metric_name but this is not
# efficient yet because the data received here doesn't often
# contains a lot of different kind of samples
# So perhaps the next step will be to pool the received data from
# message bus.
resource_grouped_samples = itertools.groupby(
data, key=operator.itemgetter('resource_id'))
for resource_id, samples_of_resource in resource_grouped_samples:
resource_need_to_be_updated = True
metric_grouped_samples = itertools.groupby(
list(samples_of_resource),
key=operator.itemgetter('counter_name'))
for metric_name, samples in metric_grouped_samples:
for ext in self.mgmr:
if metric_name in ext.obj.get_metrics_names():
self._process_samples(
ext, resource_id, metric_name, list(samples),
resource_need_to_be_updated)
# FIXME(sileht): Does it reasonable to skip the resource
# update here ? Does differents kind of counter_name
# can have different metadata set ?
# (ie: one have only flavor_id, and an other one have only
# image_ref ?)
#
# resource_need_to_be_updated = False
@log_and_ignore_unexpected_workflow_error
def _process_samples(self, ext, resource_id, metric_name, samples,
resource_need_to_be_updated):
resource_type = ext.name
measure_attributes = [{'timestamp': sample['timestamp'],
'value': sample['counter_volume']}
for sample in samples]
try:
self._post_measure(resource_type, resource_id, metric_name,
measure_attributes)
except NoSuchMetric:
# NOTE(sileht): we try first to create the resource, because
# they more chance that the resource doesn't exists than the metric
# is missing, the should be reduce the number of resource API call
resource_attributes = self._get_resource_attributes(
ext, resource_id, metric_name, samples)
try:
self._create_resource(resource_type, resource_id,
resource_attributes)
except ResourceAlreadyExists:
try:
self._create_metric(resource_type, resource_id,
metric_name)
except MetricAlreadyExists:
# NOTE(sileht): Just ignore the metric have been created in
# the meantime.
pass
else:
# No need to update it we just created it
# with everything we need
resource_need_to_be_updated = False
# NOTE(sileht): we retry to post the measure but if it fail we
# don't catch the exception to just log it and continue to process
# other samples
self._post_measure(resource_type, resource_id, metric_name,
measure_attributes)
if resource_need_to_be_updated:
resource_attributes = self._get_resource_attributes(
ext, resource_id, metric_name, samples, for_update=True)
self._update_resource(resource_type, resource_id,
resource_attributes)
def _get_resource_attributes(self, ext, resource_id, metric_name, samples,
for_update=False):
# FIXME(sileht): Should I merge attibutes of all samples ?
# Or keep only the last one is sufficient ?
attributes = ext.obj.get_resource_extra_attributes(
samples[-1])
if not for_update:
attributes["id"] = resource_id
attributes["user_id"] = samples[-1]['user_id']
attributes["project_id"] = samples[-1]['project_id']
attributes["metrics"] = dict(
(metric_name, self.get_archive_policy(metric_name))
for metric_name in ext.obj.get_metrics_names()
)
return attributes
def _post_measure(self, resource_type, resource_id, metric_name,
measure_attributes):
r = self.gnocchi_api.post("%s/v1/resource/%s/%s/metric/%s/measures"
% (self.gnocchi_url, resource_type,
resource_id, metric_name),
headers=self._get_headers(),
data=json.dumps(measure_attributes))
if r.status_code == 404:
LOG.debug(_("The metric %(metric_name)s of "
"resource %(resource_id)s doesn't exists: "
"%(status_code)d"),
{'metric_name': metric_name,
'resource_id': resource_id,
'status_code': r.status_code})
raise NoSuchMetric
elif int(r.status_code / 100) != 2:
raise UnexpectedWorkflowError(
_("Fail to post measure on metric %(metric_name)s of "
"resource %(resource_id)s with status: "
"%(status_code)d: %(msg)s") %
{'metric_name': metric_name,
'resource_id': resource_id,
'status_code': r.status_code,
'msg': r.text})
else:
LOG.debug("Measure posted on metric %s of resource %s",
metric_name, resource_id)
def _create_resource(self, resource_type, resource_id,
resource_attributes):
r = self.gnocchi_api.post("%s/v1/resource/%s"
% (self.gnocchi_url, resource_type),
headers=self._get_headers(),
data=json.dumps(resource_attributes))
if r.status_code == 409:
LOG.debug("Resource %s already exists", resource_id)
raise ResourceAlreadyExists
elif int(r.status_code / 100) != 2:
raise UnexpectedWorkflowError(
_("Resource %(resource_id)s creation failed with "
"status: %(status_code)d: %(msg)s") %
{'resource_id': resource_id,
'status_code': r.status_code,
'msg': r.text})
else:
LOG.debug("Resource %s created", resource_id)
def _update_resource(self, resource_type, resource_id,
resource_attributes):
r = self.gnocchi_api.patch(
"%s/v1/resource/%s/%s"
% (self.gnocchi_url, resource_type, resource_id),
headers=self._get_headers(),
data=json.dumps(resource_attributes))
if int(r.status_code / 100) != 2:
raise UnexpectedWorkflowError(
_("Resource %(resource_id)s update failed with "
"status: %(status_code)d: %(msg)s") %
{'resource_id': resource_id,
'status_code': r.status_code,
'msg': r.text})
else:
LOG.debug("Resource %s updated", resource_id)
def _create_metric(self, resource_type, resource_id, metric_name):
params = {metric_name: self.get_archive_policy(metric_name)}
r = self.gnocchi_api.post("%s/v1/resource/%s/%s/metric"
% (self.gnocchi_url, resource_type,
resource_id),
headers=self._get_headers(),
data=json.dumps(params))
if r.status_code == 409:
LOG.debug("Metric %s of resource %s already exists",
metric_name, resource_id)
raise MetricAlreadyExists
elif int(r.status_code / 100) != 2:
raise UnexpectedWorkflowError(
_("Fail to create metric %(metric_name)s of "
"resource %(resource_id)s with status: "
"%(status_code)d: %(msg)s") %
{'metric_name': metric_name,
'resource_id': resource_id,
'status_code': r.status_code,
'msg': r.text})
else:
LOG.debug("Metric %s of resource %s created",
metric_name, resource_id)
@staticmethod
def record_events(events):
raise NotImplementedError

View File

@ -0,0 +1,40 @@
#
# Copyright 2014 eNovance
#
# Authors: Mehdi Abaakouk <mehdi.abaakouk@enovance.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import six
@six.add_metaclass(abc.ABCMeta)
class ResourceBase(object):
"""Base class for resource."""
@abc.abstractmethod
def get_resource_extra_attributes(self, sample):
"""Extract the metadata from a ceilometer sample.
:param sample: The ceilometer sample
:returns: the resource attributes
"""
@abc.abstractmethod
def get_metrics_names(self):
"""Return the metric handled by this resource.
:returns: list of metric names
"""

View File

@ -0,0 +1,31 @@
#
# Copyright 2015 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from ceilometer.dispatcher.resources import base
class CephAccount(base.ResourceBase):
@staticmethod
def get_resource_extra_attributes(sample):
return {}
@staticmethod
def get_metrics_names():
return ['radosgw.api.request',
'radosgw.objects.size',
'radosgw.objects',
'radosgw.objects.containers',
'radosgw.containers.objects',
'radosgw.containers.objects.size',
]

View File

@ -0,0 +1,44 @@
#
# Copyright 2015 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from ceilometer.dispatcher.resources import base
class Identity(base.ResourceBase):
@staticmethod
def get_resource_extra_attributes(sample):
return {}
@staticmethod
def get_metrics_names():
return ['identity.authenticate.success',
'identity.authenticate.pending',
'identity.authenticate.failure',
'identity.user.created',
'identity.user.deleted',
'identity.user.updated',
'identity.group.created',
'identity.group.deleted',
'identity.group.updated',
'identity.role.created',
'identity.role.deleted',
'identity.role.updated',
'identity.project.created',
'identity.project.deleted',
'identity.project.updated',
'identity.trust.created',
'identity.trust.deleted',
'identity.role_assignment.created',
'identity.role_assignment.deleted',
]

View File

@ -0,0 +1,30 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from ceilometer.dispatcher.resources import base
class Image(base.ResourceBase):
@staticmethod
def get_resource_extra_attributes(sample):
metadata = sample['resource_metadata']
params = {
"name": metadata['name'],
"container_format": metadata["container_format"],
"disk_format": metadata["disk_format"]
}
return params
@staticmethod
def get_metrics_names():
return ['image',
'image.size']

View File

@ -0,0 +1,54 @@
#
# Copyright 2014 eNovance
#
# Authors: Julien Danjou <julien@danjou.info>
# Mehdi Abaakouk <mehdi.abaakouk@enovance.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from ceilometer.dispatcher.resources import base
class Instance(base.ResourceBase):
@staticmethod
def get_resource_extra_attributes(sample):
metadata = sample['resource_metadata']
params = {
"host": metadata['host'],
"image_ref": metadata['image_ref_url'],
"display_name": metadata['display_name'],
}
if "instance_flavor_id" in metadata:
params["flavor_id"] = int(metadata['instance_flavor_id'])
else:
# NOTE(sileht): instance.exists have the flavor here
params["flavor_id"] = int(metadata["flavor"]["id"])
server_group = metadata.get('user_metadata', {}).get('server_group')
if server_group:
params["server_group"] = server_group
return params
@staticmethod
def get_metrics_names():
# NOTE(sileht): Can we generate the list by loading ceilometer
# plugin ?
return ['instance',
'disk.root.size',
'disk.ephemeral.size',
'memory',
'memory.usage',
'vcpus',
'cpu',
'cpu_util']

View File

@ -0,0 +1,30 @@
#
# Copyright 2015 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from ceilometer.dispatcher.resources import base
class IPMI(base.ResourceBase):
@staticmethod
def get_resource_extra_attributes(sample):
return {}
@staticmethod
def get_metrics_names():
return ['hardware.ipmi.node.power',
'hardware.ipmi.node.temperature',
'hardware.ipmi.node.fan',
'hardware.ipmi.node.current',
'hardware.ipmi.node.voltage',
]

View File

@ -0,0 +1,41 @@
#
# Copyright 2015 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from ceilometer.dispatcher.resources import base
class Network(base.ResourceBase):
@staticmethod
def get_resource_extra_attributes(sample):
return {}
@staticmethod
def get_metrics_names():
return ['bandwidth',
'network',
'network.create',
'network.update',
'subnet',
'subnet.create',
'subnet.update',
'port',
'port.create',
'port.update',
'router',
'router.create',
'router.update',
'ip.floating',
'ip.floating.create',
'ip.floating.update',
]

View File

@ -0,0 +1,30 @@
#
# Copyright 2015 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from ceilometer.dispatcher.resources import base
class Stack(base.ResourceBase):
@staticmethod
def get_resource_extra_attributes(sample):
return {}
@staticmethod
def get_metrics_names():
return ['stack.create',
'stack.update',
'stack.delete',
'stack.resume',
'stack.suspend',
]

View File

@ -0,0 +1,33 @@
#
# Copyright 2014 eNovance
#
# Authors: Julien Danjou <julien@danjou.info>
# Mehdi Abaakouk <mehdi.abaakouk@enovance.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from ceilometer.dispatcher.resources import base
class SwiftAccount(base.ResourceBase):
@staticmethod
def get_resource_extra_attributes(sample):
return {}
@staticmethod
def get_metrics_names():
return ['storage.objects.incoming.bytes',
'storage.objects.outgoing.bytes',
'storage.api.request',
'storage.objects.size',
'storage.objects',
'storage.objects.containers']

View File

@ -0,0 +1,38 @@
#
# Copyright 2015 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from ceilometer.dispatcher.resources import base
class Volume(base.ResourceBase):
@staticmethod
def get_resource_extra_attributes(sample):
metadata = sample['resource_metadata']
params = {
"display_name": metadata['display_name'],
}
return params
@staticmethod
def get_metrics_names():
return ['volume',
'volume.size',
'volume.create',
'volume.delete',
'volume.update',
'volume.resize',
'volume.attach',
'volume.detach',
]

View File

@ -0,0 +1,352 @@
#
# Copyright 2014 eNovance
#
# Authors: Mehdi Abaakouk <mehdi.abaakouk@enovance.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
import uuid
import mock
from oslo_config import fixture as config_fixture
from oslotest import base
from oslotest import mockpatch
import requests
import six.moves.urllib.parse as urlparse
import tempfile
import testscenarios
import yaml
from ceilometer.dispatcher import gnocchi
from ceilometer import service as ceilometer_service
load_tests = testscenarios.load_tests_apply_scenarios
class json_matcher(object):
def __init__(self, ref):
self.ref = ref
def __eq__(self, obj):
return self.ref == json.loads(obj)
def __repr__(self):
return "<json_matcher \"%s\">" % self.ref
class DispatcherTest(base.BaseTestCase):
def setUp(self):
super(DispatcherTest, self).setUp()
self.conf = self.useFixture(config_fixture.Config())
ceilometer_service.prepare_service([])
self.resource_id = str(uuid.uuid4())
self.samples = [{
'counter_name': 'disk.root.size',
'counter_type': 'gauge',
'counter_volume': '2',
'user_id': 'test_user',
'project_id': 'test_project',
'source': 'openstack',
'timestamp': '2012-05-08 20:23:48.028195',
'resource_id': self.resource_id,
'resource_metadata': {
'host': 'foo',
'image_ref_url': 'imageref!',
'instance_flavor_id': 1234,
'display_name': 'myinstance',
}},
{
'counter_name': 'disk.root.size',
'counter_type': 'gauge',
'counter_volume': '2',
'user_id': 'test_user',
'project_id': 'test_project',
'source': 'openstack',
'timestamp': '2014-05-08 20:23:48.028195',
'resource_id': self.resource_id,
'resource_metadata': {
'host': 'foo',
'image_ref_url': 'imageref!',
'instance_flavor_id': 1234,
'display_name': 'myinstance',
}
}]
ks_client = mock.Mock(auth_token='fake_token')
ks_client.tenants.find.return_value = mock.Mock(
name='gnocchi', id='a2d42c23-d518-46b6-96ab-3fba2e146859')
self.useFixture(mockpatch.Patch(
'ceilometer.keystone_client.get_client',
return_value=ks_client))
def test_extensions_load(self):
self.conf.config(filter_service_activity=False,
group='dispatcher_gnocchi')
d = gnocchi.GnocchiDispatcher(self.conf.conf)
self.assertIn('instance', d.mgmr.names())
self.assertIn('volume', d.mgmr.names())
@mock.patch('ceilometer.dispatcher.gnocchi.GnocchiDispatcher'
'._process_samples')
def _do_test_activity_filter(self, expected_samples, fake_process_samples):
d = gnocchi.GnocchiDispatcher(self.conf.conf)
d.record_metering_data(self.samples)
fake_process_samples.assert_called_with(
mock.ANY, self.resource_id, 'disk.root.size',
expected_samples, True,
)
def test_archive_policy_default(self):
d = gnocchi.GnocchiDispatcher(self.conf.conf)
self.assertEqual(d.gnocchi_archive_policy_default, "low")
def test_archive_policy_map_config(self):
archive_policy_map = yaml.dump({
'foo.*': 'low'
})
archive_policy_cfg_file = tempfile.NamedTemporaryFile(
mode='w+b', prefix="foo", suffix=".yaml")
archive_policy_cfg_file.write(archive_policy_map.encode())
archive_policy_cfg_file.seek(0)
self.conf.conf.dispatcher_gnocchi.archive_policy_file = (
archive_policy_cfg_file.name)
d = gnocchi.GnocchiDispatcher(self.conf.conf)
self.assertEqual(
d.get_archive_policy(
'foo.disk.rate')['archive_policy_name'], "low")
archive_policy_cfg_file.close()
def test_activity_filter_match_project_id(self):
self.samples[0]['project_id'] = (
'a2d42c23-d518-46b6-96ab-3fba2e146859')
self._do_test_activity_filter([self.samples[1]])
def test_activity_filter_match_swift_event(self):
self.samples[0]['counter_name'] = 'storage.api.request'
self.samples[0]['resource_id'] = 'a2d42c23-d518-46b6-96ab-3fba2e146859'
self._do_test_activity_filter([self.samples[1]])
def test_activity_filter_nomatch(self):
self._do_test_activity_filter(self.samples)
class MockResponse(mock.NonCallableMock):
def __init__(self, code):
text = {500: 'Internal Server Error',
404: 'Not Found',
204: 'Created',
409: 'Conflict',
}.get(code)
super(MockResponse, self).__init__(spec=requests.Response,
status_code=code,
text=text)
class DispatcherWorkflowTest(base.BaseTestCase,
testscenarios.TestWithScenarios):
sample_scenarios = [
('disk.root.size', dict(
sample={
'counter_name': 'disk.root.size',
'counter_type': 'gauge',
'counter_volume': '2',
'user_id': 'test_user',
'project_id': 'test_project',
'source': 'openstack',
'timestamp': '2012-05-08 20:23:48.028195',
'resource_metadata': {
'host': 'foo',
'image_ref_url': 'imageref!',
'instance_flavor_id': 1234,
'display_name': 'myinstance',
}
},
measures_attributes=[{
'timestamp': '2012-05-08 20:23:48.028195',
'value': '2'
}],
postable_attributes={
'user_id': 'test_user',
'project_id': 'test_project',
},
patchable_attributes={
'host': 'foo',
'image_ref': 'imageref!',
'flavor_id': 1234,
'display_name': 'myinstance',
},
metric_names=[
'instance', 'disk.root.size', 'disk.ephemeral.size',
'memory', 'vcpus', 'memory.usage', 'cpu', 'cpu_util'],
resource_type='instance')),
]
worflow_scenarios = [
('normal_workflow', dict(measure=204, post_resource=None, metric=None,
measure_retry=None, patch_resource=204)),
('new_resource', dict(measure=404, post_resource=204, metric=None,
measure_retry=204, patch_resource=None)),
('new_resource_fail', dict(measure=404, post_resource=500, metric=None,
measure_retry=None, patch_resource=None)),
('resource_update_fail', dict(measure=204, post_resource=None,
metric=None, measure_retry=None,
patch_resource=500)),
('new_metric', dict(measure=404, post_resource=409, metric=204,
measure_retry=204, patch_resource=204)),
('new_metric_fail', dict(measure=404, post_resource=409, metric=500,
measure_retry=None, patch_resource=None)),
('retry_fail', dict(measure=404, post_resource=409, metric=409,
measure_retry=500, patch_resource=None)),
('measure_fail', dict(measure=500, post_resource=None, metric=None,
measure_retry=None, patch_resource=None)),
]
@classmethod
def generate_scenarios(cls):
cls.scenarios = testscenarios.multiply_scenarios(cls.sample_scenarios,
cls.worflow_scenarios)
def setUp(self):
super(DispatcherWorkflowTest, self).setUp()
self.conf = self.useFixture(config_fixture.Config())
ks_client = mock.Mock(auth_token='fake_token')
ks_client.tenants.find.return_value = mock.Mock(
name='gnocchi', id='a2d42c23-d518-46b6-96ab-3fba2e146859')
self.useFixture(mockpatch.Patch(
'ceilometer.keystone_client.get_client',
return_value=ks_client))
ceilometer_service.prepare_service([])
self.dispatcher = gnocchi.GnocchiDispatcher(self.conf.conf)
self.sample['resource_id'] = str(uuid.uuid4())
@mock.patch('ceilometer.dispatcher.gnocchi.LOG')
@mock.patch('ceilometer.dispatcher.gnocchi.requests')
def test_workflow(self, fake_requests, logger):
base_url = self.dispatcher.conf.dispatcher_gnocchi.url
url_params = {
'url': urlparse.urljoin(base_url, '/v1/resource'),
'resource_id': self.sample['resource_id'],
'resource_type': self.resource_type,
'metric_name': self.sample['counter_name']
}
headers = {'Content-Type': 'application/json',
'X-Auth-Token': 'fake_token'}
expected_calls = []
patch_responses = []
post_responses = []
# This is needed to mock Exception in py3
fake_requests.ConnectionError = requests.ConnectionError
expected_calls.extend([
mock.call.session(),
mock.call.adapters.HTTPAdapter(pool_block=True),
mock.call.session().mount('http://', mock.ANY),
mock.call.session().mount('https://', mock.ANY),
mock.call.session().post(
"%(url)s/%(resource_type)s/%(resource_id)s/"
"metric/%(metric_name)s/measures" % url_params,
headers=headers,
data=json_matcher(self.measures_attributes))
])
post_responses.append(MockResponse(self.measure))
if self.post_resource:
attributes = self.postable_attributes.copy()
attributes.update(self.patchable_attributes)
attributes['id'] = self.sample['resource_id']
attributes['metrics'] = dict((metric_name,
{'archive_policy_name': 'low'})
for metric_name in self.metric_names)
expected_calls.append(mock.call.session().post(
"%(url)s/%(resource_type)s" % url_params,
headers=headers,
data=json_matcher(attributes)),
)
post_responses.append(MockResponse(self.post_resource))
if self.metric:
expected_calls.append(mock.call.session().post(
"%(url)s/%(resource_type)s/%(resource_id)s/metric"
% url_params,
headers=headers,
data=json_matcher({self.sample['counter_name']:
{'archive_policy_name': 'low'}})
))
post_responses.append(MockResponse(self.metric))
if self.measure_retry:
expected_calls.append(mock.call.session().post(
"%(url)s/%(resource_type)s/%(resource_id)s/"
"metric/%(metric_name)s/measures" % url_params,
headers=headers,
data=json_matcher(self.measures_attributes))
)
post_responses.append(MockResponse(self.measure_retry))
if self.patch_resource:
expected_calls.append(mock.call.session().patch(
"%(url)s/%(resource_type)s/%(resource_id)s" % url_params,
headers=headers,
data=json_matcher(self.patchable_attributes)),
)
patch_responses.append(MockResponse(self.patch_resource))
s = fake_requests.session.return_value
s.patch.side_effect = patch_responses
s.post.side_effect = post_responses
self.dispatcher.record_metering_data([self.sample])
# Check that the last log message is the expected one
if self.measure == 500 or self.measure_retry == 500:
logger.error.assert_called_with(
"Fail to post measure on metric %s of resource %s "
"with status: %d: Internal Server Error" %
(self.sample['counter_name'],
self.sample['resource_id'],
500))
elif self.post_resource == 500 or self.patch_resource == 500:
logger.error.assert_called_with(
"Resource %s %s failed with status: "
"%d: Internal Server Error" %
(self.sample['resource_id'],
'update' if self.patch_resource else 'creation',
500))
elif self.metric == 500:
logger.error.assert_called_with(
"Fail to create metric %s of resource %s "
"with status: %d: Internal Server Error" %
(self.sample['counter_name'],
self.sample['resource_id'],
500))
elif self.patch_resource == 204:
logger.debug.assert_called_with(
'Resource %s updated', self.sample['resource_id'])
else:
logger.debug.assert_called_with(
"Measure posted on metric %s of resource %s",
self.sample['counter_name'],
self.sample['resource_id'])
self.assertEqual(expected_calls, fake_requests.mock_calls)
DispatcherWorkflowTest.generate_scenarios()

View File

@ -0,0 +1,7 @@
# This file is used to map a metric name to corresponding archive policy
# and used by the ceilometer dispatcher.
# Format: <metric_name>: <policy>
#cpu_utils: "high"
#disk.*: "low"

View File

@ -342,6 +342,18 @@ ceilometer.dispatcher =
database = ceilometer.dispatcher.database:DatabaseDispatcher
file = ceilometer.dispatcher.file:FileDispatcher
http = ceilometer.dispatcher.http:HttpDispatcher
gnocchi = ceilometer.dispatcher.gnocchi.GnocchiDispatcher
ceilometer.dispatcher.resource =
instance = ceilometer.dispatcher.resources.instance:Instance
swift_account = ceilometer.dispatcher.resources.swift_account:SwiftAccount
volume = ceilometer.dispatcher.resources.volume:Volume
ceph_account = ceilometer.dispatcher.resources.ceph_account:CephAccount
network = ceilometer.dispatcher.resources.network:Network
identity = ceilometer.dispatcher.resources.identity:Identity
ipmi = ceilometer.dispatcher.resources.ipmi:IPMI
stack = ceilometer.dispatcher.resources.orchestration:Stack
image = ceilometer.dispatcher.resources.image:Image
network.statistics.drivers =
opendaylight = ceilometer.network.statistics.opendaylight.driver:OpenDayLightDriver