From 0bcde58eb642acfa53fc9da1650a0b03352a9864 Mon Sep 17 00:00:00 2001
From: Thomas Goirand
Date: Tue, 16 Apr 2024 15:48:46 +0200
Subject: [PATCH] Add opensearch/elasticsearch datastreams support

Currently, cloudkitty only uses legacy Elasticsearch/OpenSearch
indices, not datastreams. Datastreams are preferable because they
automate the rotation of data to cold storage (when the cluster has
multiple storage tiers, such as hot / warm / cold). This patch adds a
configuration option to use datastreams.

Change-Id: Iac717c948665effda82a2a87329647c74e3f0667
---
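Note: a minimal cloudkitty.conf enabling this against the v2
Elasticsearch backend could look like the sketch below. Only the
use_datastream option is introduced by this patch; the [storage]
selectors and the host/index_name values are the usual defaults, shown
here for illustration, and the same flag exists under
[storage_opensearch] for the OpenSearch backend:

    [storage]
    backend = elasticsearch
    version = 2

    [storage_elasticsearch]
    host = http://localhost:9200
    index_name = cloudkitty
    use_datastream = True

With the flag set, index_name is treated as the name of the datastream,
documents are sent with the bulk "create" action, and each one carries
the @timestamp field that datastreams require; with it unset, the
existing "index" behaviour is unchanged.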
 .../storage/v2/elasticsearch/__init__.py      |  4 ++
 cloudkitty/storage/v2/elasticsearch/client.py | 51 ++++++++++++++-----
 cloudkitty/storage/v2/opensearch/__init__.py  |  3 ++
 cloudkitty/storage/v2/opensearch/client.py    | 44 +++++++++++-----
 ...-datastreams-support-28b7c1ce700d33c0.yaml |  8 +++
 5 files changed, 83 insertions(+), 27 deletions(-)
 create mode 100644 releasenotes/notes/add-opensearch-elasticsearch-datastreams-support-28b7c1ce700d33c0.yaml

diff --git a/cloudkitty/storage/v2/elasticsearch/__init__.py b/cloudkitty/storage/v2/elasticsearch/__init__.py
index 0384d323..49d43ba7 100644
--- a/cloudkitty/storage/v2/elasticsearch/__init__.py
+++ b/cloudkitty/storage/v2/elasticsearch/__init__.py
@@ -52,6 +52,10 @@ elasticsearch_storage_opts = [
                     "should be kept alive.",
                advanced=True,
                default=30, min=0, max=300),
+    cfg.BoolOpt('use_datastream',
+                help='Use a datastream rather than an index. This is useful '
+                     'starting with Elasticsearch 7.17.',
+                default=False),
 ]
 
 CONF.register_opts(elasticsearch_storage_opts, ELASTICSEARCH_STORAGE_GROUP)
diff --git a/cloudkitty/storage/v2/elasticsearch/client.py b/cloudkitty/storage/v2/elasticsearch/client.py
index 525f8655..98dc8a29 100644
--- a/cloudkitty/storage/v2/elasticsearch/client.py
+++ b/cloudkitty/storage/v2/elasticsearch/client.py
@@ -12,11 +12,13 @@
 # License for the specific language governing permissions and limitations
 # under the License.
 #
+from datetime import datetime
 import itertools
 
 from oslo_log import log
 import requests
 
+from cloudkitty.storage.v2 import elasticsearch
 from cloudkitty.storage.v2.elasticsearch import exceptions
 from cloudkitty.utils import json
 
@@ -248,8 +250,12 @@ class ElasticsearchClient(object):
         data = '\n'.join(itertools.chain(
             *[(instruction, json.dumps(term)) for term in terms]
         )) + '\n'
-        url = '/'.join(
-            (self._url, self._index_name, self._mapping_name, '_bulk'))
+        if elasticsearch.CONF.storage_elasticsearch.use_datastream:
+            url = '/'.join(
+                (self._url, self._index_name, '_bulk'))
+        else:
+            url = '/'.join(
+                (self._url, self._index_name, self._mapping_name, '_bulk'))
         return self._req(self._sess.post, url, data, None, deserialize=False)
 
     def bulk_index(self, terms):
@@ -259,7 +265,10 @@ class ElasticsearchClient(object):
         :type terms: collections.abc.Iterable
         """
         LOG.debug("Indexing {} documents".format(len(terms)))
-        return self.bulk_with_instruction({"index": {}}, terms)
+        if elasticsearch.CONF.storage_elasticsearch.use_datastream:
+            return self.bulk_with_instruction({"create": {}}, terms)
+        else:
+            return self.bulk_with_instruction({"index": {}}, terms)
 
     def commit(self):
         """Index all documents"""
@@ -275,17 +284,31 @@ class ElasticsearchClient(object):
         :param type_: type of the DataPoint
         :type type_: str
         """
-        self._docs.append({
-            'start': start,
-            'end': end,
-            'type': type_,
-            'unit': point.unit,
-            'description': point.description,
-            'qty': point.qty,
-            'price': point.price,
-            'groupby': point.groupby,
-            'metadata': point.metadata,
-        })
+        if elasticsearch.CONF.storage_elasticsearch.use_datastream:
+            self._docs.append({
+                '@timestamp': datetime.now().strftime("%Y-%m-%dT%H:%M:%S"),
+                'start': start,
+                'end': end,
+                'type': type_,
+                'unit': point.unit,
+                'description': point.description,
+                'qty': point.qty,
+                'price': point.price,
+                'groupby': point.groupby,
+                'metadata': point.metadata,
+            })
+        else:
+            self._docs.append({
+                'start': start,
+                'end': end,
+                'type': type_,
+                'unit': point.unit,
+                'description': point.description,
+                'qty': point.qty,
+                'price': point.price,
+                'groupby': point.groupby,
+                'metadata': point.metadata,
+            })
 
         if self._autocommit and len(self._docs) >= self._chunk_size:
             self.commit()
diff --git a/cloudkitty/storage/v2/opensearch/__init__.py b/cloudkitty/storage/v2/opensearch/__init__.py
index 65b07fba..0f431049 100644
--- a/cloudkitty/storage/v2/opensearch/__init__.py
+++ b/cloudkitty/storage/v2/opensearch/__init__.py
@@ -52,6 +52,9 @@ opensearch_storage_opts = [
                     "contexts should be kept alive.",
                advanced=True,
                default=30, min=0, max=300),
+    cfg.BoolOpt('use_datastream',
+                help='Use a datastream rather than an index.',
+                default=False),
 ]
 
 CONF.register_opts(opensearch_storage_opts, OPENSEARCH_STORAGE_GROUP)
diff --git a/cloudkitty/storage/v2/opensearch/client.py b/cloudkitty/storage/v2/opensearch/client.py
index 6242019f..868af2c4 100644
--- a/cloudkitty/storage/v2/opensearch/client.py
+++ b/cloudkitty/storage/v2/opensearch/client.py
@@ -12,11 +12,13 @@
 # License for the specific language governing permissions and limitations
 # under the License.
 #
+from datetime import datetime
 import itertools
 
 from oslo_log import log
 import requests
 
+from cloudkitty.storage.v2 import opensearch
 from cloudkitty.storage.v2.opensearch import exceptions
 from cloudkitty.utils import json
 
@@ -246,8 +248,7 @@ class OpenSearchClient(object):
         data = '\n'.join(itertools.chain(
             *[(instruction, json.dumps(term)) for term in terms]
         )) + '\n'
-        url = '/'.join(
-            (self._url, self._index_name, '_bulk'))
+        url = '/'.join((self._url, self._index_name, '_bulk'))
         return self._req(self._sess.post, url, data, None, deserialize=False)
 
     def bulk_index(self, terms):
@@ -257,7 +258,10 @@ class OpenSearchClient(object):
         :type terms: collections.abc.Iterable
         """
         LOG.debug("Indexing {} documents".format(len(terms)))
-        return self.bulk_with_instruction({"index": {}}, terms)
+        if opensearch.CONF.storage_opensearch.use_datastream:
+            return self.bulk_with_instruction({"create": {}}, terms)
+        else:
+            return self.bulk_with_instruction({"index": {}}, terms)
 
     def commit(self):
         """Index all documents"""
@@ -273,16 +277,30 @@ class OpenSearchClient(object):
         :param type_: type of the DataPoint
         :type type_: str
         """
-        self._docs.append({
-            'start': start,
-            'end': end,
-            'type': type_,
-            'unit': point.unit,
-            'qty': point.qty,
-            'price': point.price,
-            'groupby': point.groupby,
-            'metadata': point.metadata,
-        })
+        if opensearch.CONF.storage_opensearch.use_datastream:
+            self._docs.append({
+                '@timestamp': datetime.now().strftime("%Y-%m-%dT%H:%M:%S"),
+                'start': start,
+                'end': end,
+                'type': type_,
+                'unit': point.unit,
+                'description': point.description,
+                'qty': point.qty,
+                'price': point.price,
+                'groupby': point.groupby,
+                'metadata': point.metadata,
+            })
+        else:
+            self._docs.append({
+                'start': start,
+                'end': end,
+                'type': type_,
+                'unit': point.unit,
+                'qty': point.qty,
+                'price': point.price,
+                'groupby': point.groupby,
+                'metadata': point.metadata,
+            })
 
         if self._autocommit and len(self._docs) >= self._chunk_size:
             self.commit()
diff --git a/releasenotes/notes/add-opensearch-elasticsearch-datastreams-support-28b7c1ce700d33c0.yaml b/releasenotes/notes/add-opensearch-elasticsearch-datastreams-support-28b7c1ce700d33c0.yaml
new file mode 100644
index 00000000..f41fb186
--- /dev/null
+++ b/releasenotes/notes/add-opensearch-elasticsearch-datastreams-support-28b7c1ce700d33c0.yaml
@@ -0,0 +1,8 @@
+---
+features:
+  - |
+    Adds support to the OpenSearch and Elasticsearch v2 storage backends for
+    using datastreams, rather than indices. These are useful when a cluster
+    has multiple storage types, as data can be rotated from hot to cold storage
+    when not in active use. The option ``use_datastream`` can be set under
+    either ``[storage_elasticsearch]`` or ``[storage_opensearch]``.