Emma Foley cfcdbfdfd1 Add collectd_gnocchi support
- Create a new gnocchi dir for the collectd-gnocchi plugin.
- Get gnocchi endpoint from keystone
- Add new _base_url format
- Create metrics if they don't already exist
- Add instructions to doc/source/devstackGSG.rst

Change-Id: Id7ce8130cb22f33147b7f031cd65564375db10d6
2017-01-17 09:55:32 +00:00

140 lines
4.3 KiB
Python

# -*- coding: utf-8 -*-
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Gnocchi collectd plugin implementation"""
from __future__ import unicode_literals
from collectd_ceilometer.gnocchi.sender import Sender
from collections import defaultdict
from collections import namedtuple
import datetime
import json
import logging
import six
import threading
LOGGER = logging.getLogger(__name__)
class Sample(namedtuple('Sample', ['value', 'timestamp', 'meta',
'unit', 'metername'])):
"""Sample data"""
def to_payload(self):
"""Return a payload dictionary"""
return {
'value': self.value,
'timestamp': self.timestamp,
}
class SampleContainer(object):
"""Sample storage"""
def __init__(self):
self._lock = threading.Lock()
self._data = defaultdict(list)
def add(self, key, samples, limit):
"""Store list of samples under the key
Store the list of samples under the given key. If numer of stored
samples is greater than the given limit, all the samples are returned
and the stored samples are dropped. Otherwise None is returned.
@param key key of the samples
@param samples list of samples
@param limit sample list limit
"""
with self._lock:
current = self._data[key]
current += samples
if len(current) >= limit:
self._data[key] = []
return current
return None
def reset(self):
"""Reset stored samples
Returns all samples and removes them from the container.
"""
with self._lock:
retval = self._data
self._data = defaultdict(list)
return retval
class Writer(object):
"""Data collector"""
def __init__(self, meters, config):
self._meters = meters
self._samples = SampleContainer()
self._sender = Sender()
self._config = config
def write(self, vl, data):
"""Collect data from collectd
example of vl: collectd.Values(type='vmpage_action',
type_instance='interleave_hit',plugin='numa',plugin_instance='node0',
host='localhost',time=1443092594.625033,interval=10.0,values=[21383])
"""
# take the plugin (specialized or default) for parsing the data
plugin = self._meters.get(vl.plugin)
# prepare all data related to the sample
metername = plugin.meter_name(vl)
unit = plugin.unit(vl)
timestamp = datetime.datetime.utcfromtimestamp(vl.time).isoformat()
LOGGER.debug(
'Writing: plugin="%s", metername="%s"', vl.plugin, metername)
# store sample for every value
data = [
Sample(
value=value, timestamp=timestamp, meta=vl.meta,
unit=unit, metername=metername
)
for value in vl.values
]
# add data to cache and get the samples to send
to_send = self._samples.add(metername, data,
self._config.BATCH_SIZE)
if to_send:
self._send_data(metername, to_send, unit)
def flush(self):
"""Flush all pending samples"""
# get all stored samples
to_send = self._samples.reset()
# send all cached samples
for key, samples in six.iteritems(to_send):
if samples:
self._send_data(key, samples)
def _send_data(self, metername, to_send, unit=None):
"""Send data to gnocchi"""
LOGGER.debug('Sending %d samples of %s',
len(to_send), metername)
# gnocchi samples
payload = json.dumps([sample.to_payload() for sample in to_send])
self._sender.send(metername, payload, unit)