Merge "InfluxDB: drop support"

Jenkins 2016-03-25 16:30:44 +00:00 committed by Gerrit Code Review
commit 21ef133466
13 changed files with 8 additions and 384 deletions

View File

@@ -39,9 +39,6 @@ case $STORAGE_DRIVER in
ENABLED_SERVICES+="ceph"
DEVSTACK_LOCAL_CONFIG+=$'\nexport GNOCCHI_STORAGE_BACKEND=ceph'
;;
influxdb)
DEVSTACK_LOCAL_CONFIG+=$'\nexport GNOCCHI_STORAGE_BACKEND=influxdb'
;;
esac

View File

@@ -119,32 +119,6 @@ function _gnocchi_install_redis {
pip_install_gr redis
}
# install influxdb
# NOTE(chdent): InfluxDB is not currently packaged by the distro at the
# version that gnocchi needs. Until that is true we're downloading
# the debs and rpms packaged by the InfluxDB company. When it is
# true this method can be changed to be similar to
# _gnocchi_install_redis above.
function _gnocchi_install_influxdb {
if is_package_installed influxdb; then
echo "influxdb already installed"
else
local file=$(mktemp /tmp/influxpkg-XXXXX)
if is_ubuntu; then
wget -O $file $GNOCCHI_INFLUXDB_DEB_PKG
sudo dpkg -i $file
elif is_fedora; then
wget -O $file $GNOCCHI_INFLUXDB_RPM_PKG
sudo rpm -i $file
fi
rm $file
fi
# restart influxdb via its initscript
sudo /opt/influxdb/init.sh restart
}
function _gnocchi_install_grafana {
if is_ubuntu; then
local file=$(mktemp /tmp/grafanapkg-XXXXX)
@@ -163,11 +137,6 @@ function _gnocchi_install_grafana {
sudo service grafana-server restart
}
# remove the influxdb database
function _gnocchi_cleanup_influxdb {
curl -G 'http://localhost:8086/query' --data-urlencode "q=DROP DATABASE $GNOCCHI_INFLUXDB_DBNAME"
}
function _cleanup_gnocchi_apache_wsgi {
sudo rm -f $GNOCCHI_WSGI_DIR/*.wsgi
sudo rm -f $(apache_site_config_for gnocchi)
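
For reference, the curl command in _gnocchi_cleanup_influxdb above talks to InfluxDB's HTTP /query endpoint; the same call in Python looks roughly like this (a sketch assuming the requests library and a local InfluxDB on its default port 8086):

# Sketch of the cleanup the curl command above performs, assuming the
# `requests` library and InfluxDB's HTTP API on localhost:8086.
import requests

def drop_database(dbname, host="localhost", port=8086):
    # InfluxDB 0.9 accepts management statements via the /query endpoint.
    resp = requests.get("http://%s:%d/query" % (host, port),
                        params={"q": "DROP DATABASE %s" % dbname})
    resp.raise_for_status()
    return resp.json()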
@@ -261,9 +230,6 @@ function configure_gnocchi {
elif [[ "$GNOCCHI_STORAGE_BACKEND" = 'file' ]] ; then
iniset $GNOCCHI_CONF storage driver file
iniset $GNOCCHI_CONF storage file_basepath $GNOCCHI_DATA_DIR/
elif [[ "$GNOCCHI_STORAGE_BACKEND" == 'influxdb' ]] ; then
iniset $GNOCCHI_CONF storage driver influxdb
iniset $GNOCCHI_CONF storage influxdb_database $GNOCCHI_INFLUXDB_DBNAME
else
echo "ERROR: could not configure storage driver"
exit 1
@@ -337,11 +303,6 @@ function install_gnocchi {
_gnocchi_install_redis
fi
if [[ "${GNOCCHI_STORAGE_BACKEND}" == 'influxdb' ]] ; then
_gnocchi_install_influxdb
pip_install influxdb
fi
if [[ "$GNOCCHI_STORAGE_BACKEND" = 'ceph' ]] ; then
pip_install cradox
fi
@@ -420,10 +381,6 @@ function stop_gnocchi {
stop_process $serv
done
if [[ "${GNOCCHI_STORAGE_BACKEND}" == 'influxdb' ]] ; then
_gnocchi_cleanup_influxdb
fi
if is_service_enabled gnocchi-grafana; then
sudo umount /usr/share/grafana/public/app/plugins/datasource/gnocchi
fi

View File

@@ -48,11 +48,6 @@ GNOCCHI_CEPH_POOL_PGP=${GNOCCHI_CEPH_POOL_PGP:-8}
# Gnocchi backend
GNOCCHI_STORAGE_BACKEND=${GNOCCHI_STORAGE_BACKEND:-file}
# InfluxDB Settings
GNOCCHI_INFLUXDB_DBNAME=${GNOCCHI_INFLUXDB_DBNAME:-gnocchidevstack}
GNOCCHI_INFLUXDB_RPM_PKG=${GNOCCHI_INFLUXDB_RPM_PKG:-https://s3.amazonaws.com/influxdb/influxdb-0.9.4.2-1.x86_64.rpm}
GNOCCHI_INFLUXDB_DEB_PKG=${GNOCCHI_INFLUXDB_DEB_PKG:-https://s3.amazonaws.com/influxdb/influxdb_0.9.4.2_amd64.deb}
# Grafana settings
GRAFANA_RPM_PKG=${GRAFANA_RPM_PKG:-https://grafanarel.s3.amazonaws.com/builds/grafana-2.6.0-1.x86_64.rpm}
GRAFANA_DEB_PKG=${GRAFANA_DEB_PKG:-https://grafanarel.s3.amazonaws.com/builds/grafana_2.6.0_amd64.deb}

View File

@@ -37,14 +37,10 @@ Gnocchi currently offers 4 storage drivers:
* File
* Swift
* Ceph (preferred)
* InfluxDB (experimental)
The first three drivers are based on an intermediate library, named
*Carbonara*, which handles the time series manipulation, since none of these
storage technologies handle time series natively. `InfluxDB`_ does not need
this layer since it is itself a time series database. However, the InfluxDB
driver is still experimental and suffers from bugs in InfluxDB itself that are
yet to be fixed as of this writing.
The drivers are based on an intermediate library, named *Carbonara*, which
handles the time series manipulation, since none of these storage technologies
handle time series natively.
The three *Carbonara* based drivers are working well and are as scalable as
their back-end technology permits. Ceph and Swift are inherently more scalable
@@ -57,8 +53,6 @@ Gnocchi processes. In any case, it is obvious that Ceph and Swift drivers are
largely more scalable. Ceph also offers better consistency, and hence is the
recommended driver.
.. _InfluxDB: http://influxdb.com
How to plan for Gnocchi's storage
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

View File

@@ -51,7 +51,6 @@ Gnocchi provides these storage drivers:
- File (default)
- `Swift`_
- `Ceph`_
- `InfluxDB`_ (experimental)
Gnocchi provides these indexer drivers:
@@ -62,7 +61,6 @@ Gnocchi provides these indexer drivers:
.. _`Ceph`: http://ceph.com/
.. _`PostgreSQL`: http://postgresql.org
.. _`MySQL`: http://mysql.com
.. _`InfluxDB`: http://influxdb.com
Configuring the WSGI pipeline
-----------------------------

View File

@@ -45,7 +45,6 @@ The list of variants available is:
* keystone – provides Keystone authentication support
* mysql – provides MySQL indexer support
* postgresql – provides PostgreSQL indexer support
* influxdb – provides InfluxDB storage support
* swift – provides OpenStack Swift storage support
* ceph – provides Ceph storage support
* file – provides file driver support

View File

@@ -21,7 +21,6 @@ import gnocchi.indexer
import gnocchi.storage
import gnocchi.storage.ceph
import gnocchi.storage.file
import gnocchi.storage.influxdb
import gnocchi.storage.swift
@@ -58,8 +57,7 @@ def list_opts():
gnocchi.storage.OPTS,
gnocchi.storage.ceph.OPTS,
gnocchi.storage.file.OPTS,
gnocchi.storage.swift.OPTS,
gnocchi.storage.influxdb.OPTS)),
gnocchi.storage.swift.OPTS)),
("statsd", (
cfg.StrOpt('host',
default='0.0.0.0',

View File

@@ -1,281 +0,0 @@
# -*- encoding: utf-8 -*-
#
# Copyright © 2015 eNovance
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import
import datetime
import logging
import operator
try:
import influxdb
except ImportError:
influxdb = None
import iso8601
from oslo_config import cfg
from oslo_utils import timeutils
import retrying
from gnocchi import exceptions
from gnocchi import storage
from gnocchi import utils
OPTS = [
cfg.StrOpt('influxdb_host',
default='localhost',
help='InfluxDB host'),
cfg.PortOpt('influxdb_port',
default=8086,
help='InfluxDB port'),
cfg.StrOpt('influxdb_username',
default='root',
help='InfluxDB username'),
cfg.StrOpt('influxdb_password',
secret=True,
help='InfluxDB password'),
cfg.StrOpt('influxdb_database',
default='gnocchi',
help='InfluxDB database'),
cfg.BoolOpt('influxdb_block_until_data_ingested',
default=False,
help='InfluxDB ingests data asynchronously. '
'Set to True to wait until data is ingested.'),
]
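# The options above are registered under gnocchi.conf's [storage] section;
# the devstack plugin removed by this commit set them with iniset, which
# amounts to a configuration along these lines (illustrative values):
#
#   [storage]
#   driver = influxdb
#   influxdb_host = localhost
#   influxdb_port = 8086
#   influxdb_database = gnocchidevstack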
LOG = logging.getLogger(__name__)
START_EPOCH = datetime.datetime(1, 1, 1, tzinfo=iso8601.iso8601.UTC)
class InfluxDBStorage(storage.StorageDriver):
def __init__(self, conf):
if not influxdb:
raise ImportError("Module influxdb could not be loaded")
super(InfluxDBStorage, self).__init__(conf)
self._block_until_data_ingested = (
conf.influxdb_block_until_data_ingested)
self.influx = influxdb.InfluxDBClient(conf.influxdb_host,
conf.influxdb_port,
conf.influxdb_username,
conf.influxdb_password,
conf.influxdb_database)
self.database = conf.influxdb_database
@staticmethod
def _get_metric_id(metric):
return str(metric.id)
def _metric_exists(self, metric):
list_series = [s['name'] for s in self.influx.get_list_series()]
return self._get_metric_id(metric) in list_series
def _query(self, metric, query):
try:
return self.influx.query(query, database=self.database)
except influxdb.client.InfluxDBClientError as e:
# NOTE(ityaptin) If a metric exists but doesn't have any measures
# with a `value` field, the influxdb client may raise an exception
# for an (aggregate) query. This is not an error in the Gnocchi
# context, so we return an empty result in this case.
if ("unknown field or tag name" in e.content
or "measurement not found" in e.content):
return {self._get_metric_id(metric): []}
raise
@retrying.retry(stop_max_delay=5000, wait_fixed=500,
retry_on_exception=utils.retry_if_retry_raised)
def _wait_points_exists(self, metric_id, where):
# NOTE(sileht): influxdb queries return even when the data is not yet
# inserted in the requested series; the work is done in an async
# fashion, so an immediate get_measures after an add_measures will not
# return the just-inserted data. Perhaps related:
# https://github.com/influxdb/influxdb/issues/2450 This is a workaround
# to wait until the data appears in influxdb...
if not self._block_until_data_ingested:
return
try:
result = self.influx.query("SELECT * FROM \"%(metric_id)s\" WHERE "
"%(where)s LIMIT 1" %
dict(metric_id=metric_id, where=where),
database=self.database)
except influxdb.client.InfluxDBClientError as e:
if "measurement not found" in e.content:
raise utils.Retry
raise
result = list(result[metric_id])
if not result:
raise utils.Retry
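# (The retrying decorator above re-invokes _wait_points_exists every
# 500 ms, for at most 5 s in total, as long as the retry predicate
# matches the raised utils.Retry.)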
def delete_metric(self, metric):
metric_id = self._get_metric_id(metric)
self._query(metric, "DROP MEASUREMENT \"%s\"" % metric_id)
def add_measures(self, metric, measures):
metric_id = self._get_metric_id(metric)
points = [dict(measurement=metric_id,
time=self._timestamp_to_utc(m.timestamp).isoformat(),
fields=dict(value=float(m.value)))
for m in measures]
self.influx.write_points(points=points, time_precision='n',
database=self.database,
retention_policy="default")
self._wait_points_exists(metric_id, "time = '%(time)s' AND "
"value = %(value)s" %
dict(time=points[-1]['time'],
value=points[-1]["fields"]["value"]))
def get_measures(self, metric, from_timestamp=None, to_timestamp=None,
aggregation='mean', granularity=None):
super(InfluxDBStorage, self).get_measures(
metric, from_timestamp, to_timestamp, aggregation)
if from_timestamp:
from_timestamp = self._timestamp_to_utc(from_timestamp)
if to_timestamp:
to_timestamp = self._timestamp_to_utc(to_timestamp)
metric_id = self._get_metric_id(metric)
if from_timestamp:
first_measure_timestamp = from_timestamp
else:
result = self._query(metric, "select * from \"%(metric_id)s\"" %
dict(metric_id=metric_id))
result = list(result[metric_id])
if result:
first_measure_timestamp = self._timestamp_to_utc(
timeutils.parse_isotime(result[0]['time']))
else:
first_measure_timestamp = None
query = ("SELECT %(aggregation)s(value) FROM \"%(metric_id)s\""
% dict(aggregation=aggregation,
metric_id=metric_id))
# NOTE(jd) So this is totally suboptimal as we CANNOT limit the range
# on time. InfluxDB is not smart enough yet to limit the result of the
# time we want based on the GROUP BY result, not based on the time
# value. If we do from_timestamp < t < to_timestamp, InfluxDB will
# limit the datapoints to those, and then run the aggregate function.
# What we want instead is something like:
# SELECT mean(value) FROM serie
# GROUP BY time(5s) as groupedtime
# WHERE from_timestamp <= groupedtime < to_timestamp
# Since we cannot do that, we aggregate everything and then limit
# the returned result.
# see https://github.com/influxdb/influxdb/issues/1973
# NOTE(sileht): But we have to set one time boundary to have the
# request accepted by influxdb.
# see https://github.com/influxdb/influxdb/issues/2444
#
# That's good enough until we support continuous query or the like.
results = []
defs = sorted(
(d
for d in metric.archive_policy.definition
if granularity is None or granularity == d.granularity),
key=operator.attrgetter('granularity'))
for definition in defs:
time_query = self._make_time_query(
first_measure_timestamp,
to_timestamp,
definition.granularity)
subquery = (query +
" WHERE %(times)s GROUP BY time(%(granularity)ds) "
"fill(none) LIMIT %(points)d" %
dict(times=time_query,
granularity=definition.granularity,
points=definition.points))
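# For illustration, with a 300 s granularity / 12 point archive
# definition the subquery built above renders roughly as (hypothetical
# metric id and time bound):
#   SELECT mean(value) FROM "<metric-id>" WHERE time >= '...'
#   GROUP BY time(300s) fill(none) LIMIT 12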
result = self._query(metric, subquery)
subresults = []
for point in result[metric_id]:
timestamp = self._timestamp_to_utc(
timeutils.parse_isotime(point['time']))
if (point[aggregation] is not None and
((from_timestamp is None or timestamp >= from_timestamp)
and (to_timestamp is None or timestamp < to_timestamp))):
subresults.insert(0, (timestamp,
definition.granularity,
point[aggregation]))
results.extend(subresults)
return list(reversed(results))
def search_value(self, metrics, query, from_timestamp=None,
to_timestamp=None,
aggregation='mean'):
results = {}
predicate = storage.MeasureQuery(query)
for metric in metrics:
measures = self.get_measures(metric, from_timestamp, to_timestamp,
aggregation)
results[metric] = [
(timestamp, granularity, value)
for timestamp, granularity, value in measures
if predicate(value)]
return results
@staticmethod
def _timestamp_to_utc(ts):
return timeutils.normalize_time(ts).replace(tzinfo=iso8601.iso8601.UTC)
def _make_time_query(self, from_timestamp, to_timestamp, granularity):
if from_timestamp:
from_timestamp = find_nearest_stable_point(from_timestamp,
granularity)
left_time = self._timestamp_to_utc(from_timestamp).isoformat()
else:
left_time = "now()"
if to_timestamp and to_timestamp >= from_timestamp:
right_time = self._timestamp_to_utc(to_timestamp).isoformat()
else:
right_time = None
return ("time >= '%s'" % left_time) + (" and time < '%s'" % right_time
if right_time else "")
def get_cross_metric_measures(self, metrics, from_timestamp=None,
to_timestamp=None, aggregation='mean',
needed_overlap=None):
super(InfluxDBStorage, self).get_cross_metric_measures(
metrics, from_timestamp, to_timestamp, aggregation, needed_overlap)
raise exceptions.NotImplementedError
def find_nearest_stable_point(timestamp, granularity, next=False):
"""Find the timetamp before another one for a particular granularity.
e.g. the nearest timestamp for 14:23:45
with a granularity of 60 is 14:23:00
:param timestamp: The timestamp to use as a reference point
:param granularity: Granularity to use to look for the nearest timestamp
:param next: Whether to return the next stable timestamp
rather than the previous one
"""
seconds = timeutils.delta_seconds(START_EPOCH, timestamp)
seconds = int(seconds - seconds % granularity)
stable_point = START_EPOCH + datetime.timedelta(seconds=seconds)
if next:
stable_point += datetime.timedelta(seconds=granularity)
return stable_point
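
A quick usage sketch of the helper above, with the values from its docstring (the input must be timezone-aware, since START_EPOCH is UTC):

import datetime
import iso8601

ts = datetime.datetime(2016, 3, 25, 14, 23, 45, tzinfo=iso8601.iso8601.UTC)
find_nearest_stable_point(ts, 60)             # -> 2016-03-25 14:23:00+00:00
find_nearest_stable_point(ts, 60, next=True)  # -> 2016-03-25 14:24:00+00:00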

View File

@@ -436,20 +436,6 @@ class TestCase(base.BaseTestCase):
self.conf.set_override('file_basepath',
tempdir.path,
'storage')
elif self.conf.storage.driver == 'influxdb':
self.conf.set_override('influxdb_block_until_data_ingested', True,
'storage')
self.conf.set_override('influxdb_database', 'test', 'storage')
self.conf.set_override('influxdb_password', 'root', 'storage')
self.conf.set_override('influxdb_port',
os.getenv("GNOCCHI_TEST_INFLUXDB_PORT",
51234), 'storage')
# NOTE(ityaptin) Creating a unique database for every test may cause
# tests to fail by timeout, but it may be useful in some cases
if os.getenv("GNOCCHI_TEST_INFLUXDB_UNIQUE_DATABASES"):
self.conf.set_override("influxdb_database",
"gnocchi_%s" % uuid.uuid4().hex,
'storage')
self.storage = storage.get_driver(self.conf)
# NOTE(jd) Do not upgrade the storage. We don't really need the storage

View File

@@ -322,11 +322,6 @@ class MetricTest(RestTest):
status=403)
def test_add_measures_back_window(self):
if self.conf.storage.driver == 'influxdb':
# FIXME(sileht): Won't pass with influxdb because it doesn't
# check archive policy
raise testcase.TestSkipped("InfluxDB issue")
ap_name = str(uuid.uuid4())
with self.app.use_admin_user():
self.app.post_json(

View File

@@ -5,13 +5,4 @@ set -x
GNOCCHI_TEST_INDEXER_DRIVER=${GNOCCHI_TEST_INDEXER_DRIVER:-postgresql}
source $(which overtest) $GNOCCHI_TEST_INDEXER_DRIVER
export GNOCCHI_INDEXER_URL=${OVERTEST_URL/#mysql:/mysql+pymysql:}
# Activate overtest for storage
case $GNOCCHI_TEST_STORAGE_DRIVER in
influxdb)
source $(which overtest) $GNOCCHI_TEST_STORAGE_DRIVER
GNOCCHI_TEST_INFLUXDB_PORT=${OVERTEST_INFLUXDB_PORT}
;;
*)
;;
esac
$*

View File

@@ -33,8 +33,6 @@ postgresql =
sqlalchemy
sqlalchemy-utils
alembic>=0.7.6,!=0.8.1
influxdb =
influxdb>=2.4
swift =
python-swiftclient>=3.0.0
msgpack-python
@@ -124,7 +122,6 @@ gnocchi.storage =
swift = gnocchi.storage.swift:SwiftStorage
ceph = gnocchi.storage.ceph:CephStorage
file = gnocchi.storage.file:FileStorage
influxdb = gnocchi.storage.influxdb:InfluxDBStorage
gnocchi.indexer =
null = gnocchi.indexer.null:NullIndexer
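
Dropping the influxdb entry point above is what actually unplugs the driver: the gnocchi.storage entry points are resolved by name at runtime through stevedore, roughly along these lines (a sketch of the mechanism, not gnocchi's exact code):

# Sketch: how a "gnocchi.storage" entry point is resolved by driver name.
from stevedore import driver

def load_storage_driver(conf):
    # conf.storage.driver is e.g. "file", "swift" or "ceph" -- and, before
    # this commit, "influxdb".
    mgr = driver.DriverManager('gnocchi.storage', conf.storage.driver,
                               invoke_on_load=True, invoke_args=(conf,))
    return mgr.driver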

tox.ini
View File

@@ -1,6 +1,6 @@
[tox]
minversion = 1.8
envlist = py{27,34},py{27,34}-{postgresql,mysql}{,-file,-swift,-ceph,-influxdb},pep8,bashate
envlist = py{27,34},py{27,34}-{postgresql,mysql}{,-file,-swift,-ceph},pep8,bashate
[testenv]
usedevelop = True
@@ -9,7 +9,6 @@ passenv = LANG OS_TEST_TIMEOUT OS_STDOUT_CAPTURE OS_STDERR_CAPTURE OS_LOG_CAPTUR
deps = .[test]
py{27,34}-postgresql: .[postgresql,swift,ceph,file]
py{27,34}-mysql: .[mysql,swift,ceph,file]
py{27,34}-{postgresql,mysql}-influxdb: .[influxdb]
setenv =
GNOCCHI_TEST_STORAGE_DRIVER=file
GNOCCHI_TEST_INDEXER_DRIVER=postgresql
@@ -18,9 +17,8 @@ setenv =
py{27,34}-{postgresql,mysql}-file: GNOCCHI_TEST_STORAGE_DRIVERS=file
py{27,34}-{postgresql,mysql}-swift: GNOCCHI_TEST_STORAGE_DRIVERS=swift
py{27,34}-{postgresql,mysql}-ceph: GNOCCHI_TEST_STORAGE_DRIVERS=ceph
py{27,34}-{postgresql,mysql}-influxdb: GNOCCHI_TEST_STORAGE_DRIVERS=influxdb
py{27,34}-postgresql{,-file,-swift,-ceph,-influxdb}: GNOCCHI_TEST_INDEXER_DRIVERS=postgresql
py{27,34}-mysql{,-file,-swift,-ceph,-influxdb}: GNOCCHI_TEST_INDEXER_DRIVERS=mysql
py{27,34}-postgresql{,-file,-swift,-ceph}: GNOCCHI_TEST_INDEXER_DRIVERS=postgresql
py{27,34}-mysql{,-file,-swift,-ceph}: GNOCCHI_TEST_INDEXER_DRIVERS=mysql
commands =
doc8 --ignore-path doc/source/rest.rst doc/source
@@ -64,7 +62,7 @@ exclude = .tox,.eggs,doc
show-source = true
[testenv:genconfig]
deps = .[mysql,postgresql,test,file,influxdb,ceph,swift]
deps = .[mysql,postgresql,test,file,ceph,swift]
commands = oslo-config-generator --config-file=gnocchi-config-generator.conf
[testenv:docs]