Add OpenSearch/Elasticsearch datastream support

Currently, CloudKitty only uses legacy Elasticsearch/OpenSearch
indices, not datastreams. Datastreams are preferable because they
automate rotating data to colder storage when the cluster has
multiple storage tiers (such as hot / warm / cold).

This patch adds a use_datastream configuration option to enable them.

Change-Id: Iac717c948665effda82a2a87329647c74e3f0667
Thomas Goirand authored 2024-04-16 15:48:46 +02:00
committed by Matt Crees
parent 9bedcdc6fc
commit 0bcde58eb6
5 changed files with 83 additions and 27 deletions


@@ -52,6 +52,10 @@ elasticsearch_storage_opts = [
              "should be kept alive.",
         advanced=True,
         default=30, min=0, max=300),
+    cfg.BoolOpt('use_datastream',
+                help='Use a datastream rather than an index. This is useful '
+                     'starting with Elasticsearch 7.17.',
+                default=False),
 ]

 CONF.register_opts(elasticsearch_storage_opts, ELASTICSEARCH_STORAGE_GROUP)


@@ -12,11 +12,13 @@
 # License for the specific language governing permissions and limitations
 # under the License.
 #
+from datetime import datetime
+
 import itertools

 from oslo_log import log
 import requests

 from cloudkitty.storage.v2 import elasticsearch
 from cloudkitty.storage.v2.elasticsearch import exceptions
 from cloudkitty.utils import json
@@ -248,8 +250,12 @@ class ElasticsearchClient(object):
         data = '\n'.join(itertools.chain(
             *[(instruction, json.dumps(term)) for term in terms]
         )) + '\n'
-        url = '/'.join(
-            (self._url, self._index_name, self._mapping_name, '_bulk'))
+        if elasticsearch.CONF.storage_elasticsearch.use_datastream:
+            url = '/'.join(
+                (self._url, self._index_name, '_bulk'))
+        else:
+            url = '/'.join(
+                (self._url, self._index_name, self._mapping_name, '_bulk'))
         return self._req(self._sess.post, url, data, None, deserialize=False)

     def bulk_index(self, terms):
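For reference, the body built here is Bulk API NDJSON: one action line
followed by one document line per term, and a datastream is addressed
as /{name}/_bulk with no mapping name in the URL. A minimal sketch of
the payload using the stdlib json module and made-up sample data (the
real code uses cloudkitty.utils.json and the instruction passed in by
bulk_index()):

    import json

    # Hypothetical sample; real terms are serialized rated DataPoints.
    terms = [{'qty': 1.0, 'price': 0.5}]
    instruction = {'create': {}}  # or {'index': {}} for a legacy index

    # One action line, then one document line, per term; newline-terminated.
    data = '\n'.join(
        line
        for term in terms
        for line in (json.dumps(instruction), json.dumps(term))
    ) + '\n'
    # data == '{"create": {}}\n{"qty": 1.0, "price": 0.5}\n'
    # POSTed to <endpoint>/<index-or-datastream>/_bulk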
@@ -259,7 +265,10 @@ class ElasticsearchClient(object):
         :type terms: collections.abc.Iterable
         """
         LOG.debug("Indexing {} documents".format(len(terms)))
-        return self.bulk_with_instruction({"index": {}}, terms)
+        if elasticsearch.CONF.storage_elasticsearch.use_datastream:
+            return self.bulk_with_instruction({"create": {}}, terms)
+        else:
+            return self.bulk_with_instruction({"index": {}}, terms)

     def commit(self):
         """Index all documents"""
@@ -275,17 +284,31 @@ class ElasticsearchClient(object):
         :param type_: type of the DataPoint
         :type type_: str
         """
-        self._docs.append({
-            'start': start,
-            'end': end,
-            'type': type_,
-            'unit': point.unit,
-            'description': point.description,
-            'qty': point.qty,
-            'price': point.price,
-            'groupby': point.groupby,
-            'metadata': point.metadata,
-        })
+        if elasticsearch.CONF.storage_elasticsearch.use_datastream:
+            self._docs.append({
+                '@timestamp': datetime.now().strftime("%Y-%m-%dT%H:%M:%S"),
+                'start': start,
+                'end': end,
+                'type': type_,
+                'unit': point.unit,
+                'description': point.description,
+                'qty': point.qty,
+                'price': point.price,
+                'groupby': point.groupby,
+                'metadata': point.metadata,
+            })
+        else:
+            self._docs.append({
+                'start': start,
+                'end': end,
+                'type': type_,
+                'unit': point.unit,
+                'description': point.description,
+                'qty': point.qty,
+                'price': point.price,
+                'groupby': point.groupby,
+                'metadata': point.metadata,
+            })
         if self._autocommit and len(self._docs) >= self._chunk_size:
             self.commit()
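Data streams require every document to carry an @timestamp field, which
is what the datastream branch above adds. A small sketch of the value
the strftime call produces (local time, second resolution, no timezone
suffix; the sample output is illustrative):

    from datetime import datetime

    datetime.now().strftime("%Y-%m-%dT%H:%M:%S")  # e.g. '2024-04-16T15:48:46'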


@@ -52,6 +52,9 @@ opensearch_storage_opts = [
              "contexts should be kept alive.",
         advanced=True,
         default=30, min=0, max=300),
+    cfg.BoolOpt('use_datastream',
+                help='Use a datastream rather than an index.',
+                default=False),
 ]

 CONF.register_opts(opensearch_storage_opts, OPENSEARCH_STORAGE_GROUP)


@@ -12,11 +12,13 @@
 # License for the specific language governing permissions and limitations
 # under the License.
 #
+from datetime import datetime
+
 import itertools

 from oslo_log import log
 import requests

 from cloudkitty.storage.v2 import opensearch
 from cloudkitty.storage.v2.opensearch import exceptions
 from cloudkitty.utils import json
@@ -246,8 +248,7 @@ class OpenSearchClient(object):
         data = '\n'.join(itertools.chain(
             *[(instruction, json.dumps(term)) for term in terms]
         )) + '\n'
-        url = '/'.join(
-            (self._url, self._index_name, '_bulk'))
+        url = '/'.join((self._url, self._index_name, '_bulk'))
         return self._req(self._sess.post, url, data, None, deserialize=False)

     def bulk_index(self, terms):
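This hunk is a pure reflow: the OpenSearch bulk URL already omitted the
mapping name, so legacy indices and data streams share the same
/{name}/_bulk endpoint and, unlike the Elasticsearch client, no
use_datastream check is needed here.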
@@ -257,7 +258,10 @@ class OpenSearchClient(object):
         :type terms: collections.abc.Iterable
         """
         LOG.debug("Indexing {} documents".format(len(terms)))
-        return self.bulk_with_instruction({"index": {}}, terms)
+        if opensearch.CONF.storage_opensearch.use_datastream:
+            return self.bulk_with_instruction({"create": {}}, terms)
+        else:
+            return self.bulk_with_instruction({"index": {}}, terms)

     def commit(self):
         """Index all documents"""
@@ -273,16 +277,30 @@ class OpenSearchClient(object):
         :param type_: type of the DataPoint
         :type type_: str
         """
-        self._docs.append({
-            'start': start,
-            'end': end,
-            'type': type_,
-            'unit': point.unit,
-            'qty': point.qty,
-            'price': point.price,
-            'groupby': point.groupby,
-            'metadata': point.metadata,
-        })
+        if opensearch.CONF.storage_opensearch.use_datastream:
+            self._docs.append({
+                '@timestamp': datetime.now().strftime("%Y-%m-%dT%H:%M:%S"),
+                'start': start,
+                'end': end,
+                'type': type_,
+                'unit': point.unit,
+                'description': point.description,
+                'qty': point.qty,
+                'price': point.price,
+                'groupby': point.groupby,
+                'metadata': point.metadata,
+            })
+        else:
+            self._docs.append({
+                'start': start,
+                'end': end,
+                'type': type_,
+                'unit': point.unit,
+                'qty': point.qty,
+                'price': point.price,
+                'groupby': point.groupby,
+                'metadata': point.metadata,
+            })
         if self._autocommit and len(self._docs) >= self._chunk_size:
             self.commit()


@@ -0,0 +1,8 @@
+---
+features:
+  - |
+    Adds support to the OpenSearch and Elasticsearch v2 storage backends for
+    using datastreams, rather than indices. These are useful when a cluster
+    has multiple storage types, as data can be rotated from hot to cold storage
+    when not in active use. The option ``use_datastream`` can be set under
+    either ``[storage_elasticsearch]`` or ``[storage_opensearch]``.
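For operators, enabling the feature is then a one-line change in
cloudkitty.conf (a sketch; pick the section matching the configured
backend):

    [storage_elasticsearch]
    use_datastream = true

    # or, for the OpenSearch v2 backend:
    [storage_opensearch]
    use_datastream = true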