From eb1d64782a6db8bd3640ecf560879fcaf0af3474 Mon Sep 17 00:00:00 2001 From: Mehdi Abaakouk Date: Mon, 27 Feb 2017 23:05:35 +0100 Subject: [PATCH] storage: Add redis driver This change adds a redis driver and uses it by default in devstack. Because reading/writting from/to disks is too slow in our testing environment. Change-Id: If617260a9d8e38dc9ba9311c832be333346dd41e --- bindep.txt | 1 + devstack/plugin.sh | 5 +- devstack/settings | 7 +- doc/source/architecture.rst | 2 + doc/source/install.rst | 4 + gnocchi/opts.py | 2 + gnocchi/storage/common/redis.py | 127 +++++++++++++++++ gnocchi/storage/incoming/redis.py | 87 ++++++++++++ gnocchi/storage/redis.py | 129 ++++++++++++++++++ gnocchi/tests/base.py | 9 ++ .../notes/redis-driver-299dc443170364bc.yaml | 5 + run-tests.sh | 4 +- setup.cfg | 7 + tox.ini | 6 +- 14 files changed, 388 insertions(+), 7 deletions(-) create mode 100644 gnocchi/storage/common/redis.py create mode 100644 gnocchi/storage/incoming/redis.py create mode 100644 gnocchi/storage/redis.py create mode 100644 releasenotes/notes/redis-driver-299dc443170364bc.yaml diff --git a/bindep.txt b/bindep.txt index cd6bd714..50e6e0ca 100644 --- a/bindep.txt +++ b/bindep.txt @@ -6,3 +6,4 @@ build-essential [platform:dpkg] libffi-dev [platform:dpkg] librados-dev [platform:dpkg] ceph [platform:dpkg] +redis-server [platform:dpkg] diff --git a/devstack/plugin.sh b/devstack/plugin.sh index 43313145..9add5828 100644 --- a/devstack/plugin.sh +++ b/devstack/plugin.sh @@ -250,6 +250,9 @@ function configure_gnocchi { elif [[ "$GNOCCHI_STORAGE_BACKEND" = 'file' ]] ; then iniset $GNOCCHI_CONF storage driver file iniset $GNOCCHI_CONF storage file_basepath $GNOCCHI_DATA_DIR/ + elif [[ "$GNOCCHI_STORAGE_BACKEND" = 'redis' ]] ; then + iniset $GNOCCHI_CONF storage driver redis + iniset $GNOCCHI_CONF storage redis_url $GNOCCHI_REDIS_URL else echo "ERROR: could not configure storage driver" exit 1 @@ -353,7 +356,7 @@ function preinstall_gnocchi { # install_gnocchi() - Collect source and prepare function install_gnocchi { - if [ "${GNOCCHI_COORDINATOR_URL%%:*}" == "redis" ]; then + if [[ "$GNOCCHI_STORAGE_BACKEND" = 'redis' ]] || [[ "${GNOCCHI_COORDINATOR_URL%%:*}" == "redis" ]]; then _gnocchi_install_redis fi diff --git a/devstack/settings b/devstack/settings index 1d683536..769d149a 100644 --- a/devstack/settings +++ b/devstack/settings @@ -44,14 +44,17 @@ GNOCCHI_STATSD_RESOURCE_ID=${GNOCCHI_STATSD_RESOURCE_ID:-$(uuidgen)} GNOCCHI_STATSD_USER_ID=${GNOCCHI_STATSD_USER_ID:-$(uuidgen)} GNOCCHI_STATSD_PROJECT_ID=${GNOCCHI_STATSD_PROJECT_ID:-$(uuidgen)} -# ceph gnocchi info +# Ceph gnocchi info GNOCCHI_CEPH_USER=${GNOCCHI_CEPH_USER:-gnocchi} GNOCCHI_CEPH_POOL=${GNOCCHI_CEPH_POOL:-gnocchi} GNOCCHI_CEPH_POOL_PG=${GNOCCHI_CEPH_POOL_PG:-8} GNOCCHI_CEPH_POOL_PGP=${GNOCCHI_CEPH_POOL_PGP:-8} +# Redis gnocchi info +GNOCCHI_REDIS_URL=${GNOCCHI_REDIS_URL:-redis://localhost:6379} + # Gnocchi backend -GNOCCHI_STORAGE_BACKEND=${GNOCCHI_STORAGE_BACKEND:-file} +GNOCCHI_STORAGE_BACKEND=${GNOCCHI_STORAGE_BACKEND:-redis} # Grafana settings GRAFANA_RPM_PKG=${GRAFANA_RPM_PKG:-https://grafanarel.s3.amazonaws.com/builds/grafana-3.0.4-1464167696.x86_64.rpm} diff --git a/doc/source/architecture.rst b/doc/source/architecture.rst index 29dbd249..f62a67c7 100755 --- a/doc/source/architecture.rst +++ b/doc/source/architecture.rst @@ -44,6 +44,7 @@ Gnocchi currently offers different storage drivers: * `Ceph`_ (preferred) * `OpenStack Swift`_ * `S3`_ +* `Redis`_ The drivers are based on an intermediate library, named *Carbonara*, which handles the time series manipulation, since none of these storage technologies @@ -63,6 +64,7 @@ the recommended driver. .. _OpenStack Swift: http://docs.openstack.org/developer/swift/ .. _Ceph: https://ceph.com .. _`S3`: https://aws.amazon.com/s3/ +.. _`Redis`: https://redis.io Available index back-ends ~~~~~~~~~~~~~~~~~~~~~~~~~ diff --git a/doc/source/install.rst b/doc/source/install.rst index 10d4037f..5ed1b35b 100644 --- a/doc/source/install.rst +++ b/doc/source/install.rst @@ -31,6 +31,7 @@ The list of variants available is: * ceph_recommended_lib – provides Ceph (>=0.80) storage support * ceph_alternative_lib – provides Ceph (>=10.1.0) storage support * file – provides file driver support +* redis – provides Redis storage support * doc – documentation building support * test – unit and functional tests support @@ -117,6 +118,9 @@ options you want to change and configure: | storage.s3_* | Configuration options to access S3 | | | if you use the S3 storage driver. | +---------------------+---------------------------------------------------+ +| storage.redis_* | Configuration options to access Redis | +| | if you use the Redis storage driver. | ++---------------------+---------------------------------------------------+ Configuring authentication ----------------------------- diff --git a/gnocchi/opts.py b/gnocchi/opts.py index e86e9237..f8baeffd 100644 --- a/gnocchi/opts.py +++ b/gnocchi/opts.py @@ -26,6 +26,7 @@ import gnocchi.indexer import gnocchi.storage import gnocchi.storage.ceph import gnocchi.storage.file +import gnocchi.storage.redis import gnocchi.storage.s3 import gnocchi.storage.swift @@ -48,6 +49,7 @@ _STORAGE_OPTS = list(itertools.chain(gnocchi.storage.OPTS, gnocchi.storage.ceph.OPTS, gnocchi.storage.file.OPTS, gnocchi.storage.swift.OPTS, + gnocchi.storage.redis.OPTS, gnocchi.storage.s3.OPTS)) diff --git a/gnocchi/storage/common/redis.py b/gnocchi/storage/common/redis.py new file mode 100644 index 00000000..7986e25c --- /dev/null +++ b/gnocchi/storage/common/redis.py @@ -0,0 +1,127 @@ +# -*- encoding: utf-8 -*- +# +# Copyright © 2017 Red Hat +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from __future__ import absolute_import + +from six.moves.urllib import parse + +from oslo_utils import strutils + +try: + import redis + from redis import sentinel +except ImportError: + redis = None + sentinel = None + + +CLIENT_ARGS = frozenset([ + 'db', + 'encoding', + 'retry_on_timeout', + 'socket_keepalive', + 'socket_timeout', + 'ssl', + 'ssl_certfile', + 'ssl_keyfile', + 'sentinel', + 'sentinel_fallback', +]) +""" +Keys that we allow to proxy from the coordinator configuration into the +redis client (used to configure the redis client internals so that +it works as you expect/want it to). + +See: http://redis-py.readthedocs.org/en/latest/#redis.Redis + +See: https://github.com/andymccurdy/redis-py/blob/2.10.3/redis/client.py +""" + +#: Client arguments that are expected/allowed to be lists. +CLIENT_LIST_ARGS = frozenset([ + 'sentinel_fallback', +]) + +#: Client arguments that are expected to be boolean convertible. +CLIENT_BOOL_ARGS = frozenset([ + 'retry_on_timeout', + 'ssl', +]) + +#: Client arguments that are expected to be int convertible. +CLIENT_INT_ARGS = frozenset([ + 'db', + 'socket_keepalive', + 'socket_timeout', +]) + +#: Default socket timeout to use when none is provided. +CLIENT_DEFAULT_SOCKET_TO = 30 + + +def get_client(conf): + if redis is None: + raise RuntimeError("python-redis unavailable") + parsed_url = parse.urlparse(conf.redis_url) + options = parse.parse_qs(parsed_url.query) + + kwargs = {} + if parsed_url.hostname: + kwargs['host'] = parsed_url.hostname + if parsed_url.port: + kwargs['port'] = parsed_url.port + else: + if not parsed_url.path: + raise ValueError("Expected socket path in parsed urls path") + kwargs['unix_socket_path'] = parsed_url.path + if parsed_url.password: + kwargs['password'] = parsed_url.password + + for a in CLIENT_ARGS: + if a not in options: + continue + if a in CLIENT_BOOL_ARGS: + v = strutils.bool_from_string(options[a][-1]) + elif a in CLIENT_LIST_ARGS: + v = options[a][-1] + elif a in CLIENT_INT_ARGS: + v = int(options[a][-1]) + else: + v = options[a][-1] + kwargs[a] = v + if 'socket_timeout' not in kwargs: + kwargs['socket_timeout'] = CLIENT_DEFAULT_SOCKET_TO + + # Ask the sentinel for the current master if there is a + # sentinel arg. + if 'sentinel' in kwargs: + sentinel_hosts = [ + tuple(fallback.split(':')) + for fallback in kwargs.get('sentinel_fallback', []) + ] + sentinel_hosts.insert(0, (kwargs['host'], kwargs['port'])) + sentinel_server = sentinel.Sentinel( + sentinel_hosts, + socket_timeout=kwargs['socket_timeout']) + sentinel_name = kwargs['sentinel'] + del kwargs['sentinel'] + if 'sentinel_fallback' in kwargs: + del kwargs['sentinel_fallback'] + master_client = sentinel_server.master_for(sentinel_name, **kwargs) + # The master_client is a redis.StrictRedis using a + # Sentinel managed connection pool. + return master_client + return redis.StrictRedis(**kwargs) diff --git a/gnocchi/storage/incoming/redis.py b/gnocchi/storage/incoming/redis.py new file mode 100644 index 00000000..dcc9d529 --- /dev/null +++ b/gnocchi/storage/incoming/redis.py @@ -0,0 +1,87 @@ +# -*- encoding: utf-8 -*- +# +# Copyright © 2017 Red Hat +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +import contextlib +import datetime +import os +import uuid + +import six + +from gnocchi.storage.common import redis +from gnocchi.storage.incoming import _carbonara + + +class RedisStorage(_carbonara.CarbonaraBasedStorage): + + STORAGE_PREFIX = "incoming" + + def __init__(self, conf): + super(RedisStorage, self).__init__(conf) + self._client = redis.get_client(conf) + + def _build_measure_path(self, metric_id, random_id=None): + path = os.path.join(self.STORAGE_PREFIX, six.text_type(metric_id)) + if random_id: + if random_id is True: + now = datetime.datetime.utcnow().strftime("_%Y%m%d_%H:%M:%S") + random_id = six.text_type(uuid.uuid4()) + now + return os.path.join(path, random_id) + return path + + def _store_new_measures(self, metric, data): + path = self._build_measure_path(metric.id, True) + self._client.set(path.encode("utf8"), data) + + def _build_report(self, details): + match = os.path.join(self.STORAGE_PREFIX, "*") + metric_details = {} + for key in self._client.scan_iter(match=match.encode('utf8')): + metric = key.decode('utf8').split(os.path.sep)[1] + count = metric_details.setdefault(metric, 0) + count += 1 + return (len(metric_details.keys()), sum(metric_details.values()), + metric_details if details else None) + + def list_metric_with_measures_to_process(self, size, part, full=False): + match = os.path.join(self.STORAGE_PREFIX, "*") + keys = self._client.scan_iter(match=match.encode('utf8')) + measures = set([k.decode('utf8').split(os.path.sep)[1] for k in keys]) + if full: + return measures + return set(list(measures)[size * part:size * (part + 1)]) + + def _list_measures_container_for_metric_id(self, metric_id): + match = os.path.join(self._build_measure_path(metric_id), "*") + return list(self._client.scan_iter(match=match.encode("utf8"))) + + def delete_unprocessed_measures_for_metric_id(self, metric_id): + keys = self._list_measures_container_for_metric_id(metric_id) + if keys: + self._client.delete(*keys) + + @contextlib.contextmanager + def process_measure_for_metric(self, metric): + keys = self._list_measures_container_for_metric_id(metric.id) + measures = [] + for k in keys: + data = self._client.get(k) + sp_key = k.decode('utf8').split("/")[-1] + measures.extend(self._unserialize_measures(sp_key, data)) + + yield measures + + if keys: + self._client.delete(*keys) diff --git a/gnocchi/storage/redis.py b/gnocchi/storage/redis.py new file mode 100644 index 00000000..bfbaa670 --- /dev/null +++ b/gnocchi/storage/redis.py @@ -0,0 +1,129 @@ +# -*- encoding: utf-8 -*- +# +# Copyright © 2017 Red Hat +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +import os + +from oslo_config import cfg + +from gnocchi import storage +from gnocchi.storage import _carbonara +from gnocchi.storage.common import redis + + +OPTS = [ + cfg.StrOpt('redis_url', + default='redis://localhost:6379/', + help='Redis URL'), +] + + +class RedisStorage(_carbonara.CarbonaraBasedStorage): + WRITE_FULL = True + + STORAGE_PREFIX = "timeseries" + + def __init__(self, conf, incoming): + super(RedisStorage, self).__init__(conf, incoming) + self._client = redis.get_client(conf) + + def _build_metric_dir(self, metric): + return os.path.join(self.STORAGE_PREFIX, str(metric.id)) + + def _build_unaggregated_timeserie_path(self, metric, version=3): + return os.path.join( + self._build_metric_dir(metric), + 'none' + ("_v%s" % version if version else "")) + + def _build_metric_path(self, metric, aggregation): + return os.path.join(self._build_metric_dir(metric), + "agg_" + aggregation) + + def _build_metric_path_for_split(self, metric, aggregation, + timestamp_key, granularity, version=3): + path = os.path.join(self._build_metric_path(metric, aggregation), + timestamp_key + "_" + str(granularity)) + return path + '_v%s' % version if version else path + + def _create_metric(self, metric): + path = self._build_metric_dir(metric) + ret = self._client.set(path.encode("utf-8"), "created", nx=True) + if ret is None: + raise storage.MetricAlreadyExists(metric) + + def _store_unaggregated_timeserie(self, metric, data, version=3): + path = self._build_unaggregated_timeserie_path(metric, version) + self._client.set(path.encode("utf8"), data) + + def _get_unaggregated_timeserie(self, metric, version=3): + path = self._build_unaggregated_timeserie_path(metric, version) + data = self._client.get(path.encode("utf8")) + if data is None: + raise storage.MetricDoesNotExist(metric) + return data + + def _delete_unaggregated_timeserie(self, metric, version=3): + path = self._build_unaggregated_timeserie_path(metric, version) + data = self._client.get(path.encode("utf8")) + if data is None: + raise storage.MetricDoesNotExist(metric) + self._client.delete(path.encode("utf8")) + + def _list_split_keys_for_metric(self, metric, aggregation, granularity, + version=None): + path = self._build_metric_dir(metric) + if self._client.get(path.encode("utf8")) is None: + raise storage.MetricDoesNotExist(metric) + match = os.path.join(self._build_metric_path(metric, aggregation), + "*") + split_keys = set() + for key in self._client.scan_iter(match=match.encode("utf8")): + key = key.decode("utf8") + key = key.split(os.path.sep)[-1] + meta = key.split("_") + if meta[1] == str(granularity) and self._version_check(key, + version): + split_keys.add(meta[0]) + return split_keys + + def _delete_metric_measures(self, metric, timestamp_key, aggregation, + granularity, version=3): + path = self._build_metric_path_for_split( + metric, aggregation, timestamp_key, granularity, version) + self._client.delete(path.encode("utf8")) + + def _store_metric_measures(self, metric, timestamp_key, aggregation, + granularity, data, offset=None, version=3): + path = self._build_metric_path_for_split(metric, aggregation, + timestamp_key, granularity, + version) + self._client.set(path.encode("utf8"), data) + + def _delete_metric(self, metric): + path = self._build_metric_dir(metric) + self._client.delete(path.encode("utf8")) + + # Carbonara API + + def _get_measures(self, metric, timestamp_key, aggregation, granularity, + version=3): + path = self._build_metric_path_for_split( + metric, aggregation, timestamp_key, granularity, version) + data = self._client.get(path.encode("utf8")) + if data is None: + fpath = self._build_metric_dir(metric) + if self._client.get(fpath.encode("utf8")) is None: + raise storage.MetricDoesNotExist(metric) + raise storage.AggregationDoesNotExist(metric, aggregation) + return data diff --git a/gnocchi/tests/base.py b/gnocchi/tests/base.py index 04cbb574..b28ab604 100644 --- a/gnocchi/tests/base.py +++ b/gnocchi/tests/base.py @@ -17,6 +17,7 @@ import functools import json import os import subprocess +import threading import uuid import fixtures @@ -178,6 +179,9 @@ class FakeSwiftClient(object): @six.add_metaclass(SkipNotImplementedMeta) class TestCase(base.BaseTestCase): + REDIS_DB_INDEX = 0 + REDIS_DB_LOCK = threading.Lock() + ARCHIVE_POLICIES = { 'no_granularity_match': archive_policy.ArchivePolicy( "no_granularity_match", @@ -310,6 +314,11 @@ class TestCase(base.BaseTestCase): "storage") self.storage = storage.get_driver(self.conf) + if self.conf.storage.driver == 'redis': + # Create one prefix per test + self.storage.STORAGE_PREFIX = str(uuid.uuid4()) + self.storage.incoming.STORAGE_PREFIX = str(uuid.uuid4()) + # NOTE(jd) Do not upgrade the storage. We don't really need the storage # upgrade for now, and the code that upgrade from pre-1.3 # (TimeSerieArchive) uses a lot of parallel lock, which makes tooz diff --git a/releasenotes/notes/redis-driver-299dc443170364bc.yaml b/releasenotes/notes/redis-driver-299dc443170364bc.yaml new file mode 100644 index 00000000..b8214f27 --- /dev/null +++ b/releasenotes/notes/redis-driver-299dc443170364bc.yaml @@ -0,0 +1,5 @@ +--- +features: + - | + A Redis driver has been introduced for storing incoming measures and + computed timeseries. diff --git a/run-tests.sh b/run-tests.sh index ecb9797d..0e6d11f8 100755 --- a/run-tests.sh +++ b/run-tests.sh @@ -8,8 +8,8 @@ do for indexer in ${GNOCCHI_TEST_INDEXER_DRIVERS} do case $GNOCCHI_TEST_STORAGE_DRIVER in - ceph) - pifpaf run ceph -- pifpaf -g GNOCCHI_INDEXER_URL run $indexer -- ./tools/pretty_tox.sh $* + ceph|redis) + pifpaf run $GNOCCHI_TEST_STORAGE_DRIVER -- pifpaf -g GNOCCHI_INDEXER_URL run $indexer -- ./tools/pretty_tox.sh $* ;; s3) if ! which s3rver >/dev/null 2>&1 diff --git a/setup.cfg b/setup.cfg index 0d34d9bb..da5b5c8e 100644 --- a/setup.cfg +++ b/setup.cfg @@ -40,6 +40,11 @@ s3 = msgpack-python lz4 tooz>=1.38 +redis = + redis>=2.10.0 # MIT + msgpack-python + lz4 + tooz>=1.38 swift = python-swiftclient>=3.1.0 msgpack-python @@ -106,12 +111,14 @@ gnocchi.storage = ceph = gnocchi.storage.ceph:CephStorage file = gnocchi.storage.file:FileStorage s3 = gnocchi.storage.s3:S3Storage + redis = gnocchi.storage.redis:RedisStorage gnocchi.incoming = ceph = gnocchi.storage.incoming.ceph:CephStorage file = gnocchi.storage.incoming.file:FileStorage swift = gnocchi.storage.incoming.swift:SwiftStorage s3 = gnocchi.storage.incoming.s3:S3Storage + redis = gnocchi.storage.incoming.redis:RedisStorage gnocchi.indexer = mysql = gnocchi.indexer.sqlalchemy:SQLAlchemyIndexer diff --git a/tox.ini b/tox.ini index 4a3b9b03..7c1b72aa 100644 --- a/tox.ini +++ b/tox.ini @@ -9,19 +9,21 @@ passenv = LANG OS_DEBUG OS_TEST_TIMEOUT OS_STDOUT_CAPTURE OS_STDERR_CAPTURE OS_L setenv = GNOCCHI_TEST_STORAGE_DRIVER=file GNOCCHI_TEST_INDEXER_DRIVER=postgresql - GNOCCHI_TEST_STORAGE_DRIVERS=file swift ceph s3 + GNOCCHI_TEST_STORAGE_DRIVERS=file swift ceph s3 redis GNOCCHI_TEST_INDEXER_DRIVERS=postgresql mysql file: GNOCCHI_TEST_STORAGE_DRIVERS=file swift: GNOCCHI_TEST_STORAGE_DRIVERS=swift ceph: GNOCCHI_TEST_STORAGE_DRIVERS=ceph + redis: GNOCCHI_TEST_STORAGE_DRIVERS=redis s3: GNOCCHI_TEST_STORAGE_DRIVERS=s3 postgresql: GNOCCHI_TEST_INDEXER_DRIVERS=postgresql mysql: GNOCCHI_TEST_INDEXER_DRIVERS=mysql - GNOCCHI_STORAGE_DEPS=file,swift,s3,ceph,ceph_recommended_lib + GNOCCHI_STORAGE_DEPS=file,swift,s3,ceph,ceph_recommended_lib,redis ceph: GNOCCHI_STORAGE_DEPS=ceph,ceph_recommended_lib swift: GNOCCHI_STORAGE_DEPS=swift file: GNOCCHI_STORAGE_DEPS=file + redis: GNOCCHI_STORAGE_DEPS=redis s3: GNOCCHI_STORAGE_DEPS=s3 deps = .[test] postgresql: .[postgresql,{env:GNOCCHI_STORAGE_DEPS}]