Merge "Add opensearch/elasticsearch datastreams support"

Zuul
2025-01-20 15:07:22 +00:00
committed by Gerrit Code Review
5 changed files with 83 additions and 27 deletions


@@ -52,6 +52,10 @@ elasticsearch_storage_opts = [
"should be kept alive.",
advanced=True,
default=30, min=0, max=300),
cfg.BoolOpt('use_datastream',
help='Use a datastream rather than an index. This is useful '
'starting with Elasticsearch 7.17.',
default=False),
]
CONF.register_opts(elasticsearch_storage_opts, ELASTICSEARCH_STORAGE_GROUP)
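
For context, a minimal sketch of how a flag registered this way surfaces at runtime. The standalone registration below mirrors the option above, and the cloudkitty.conf snippet in the comment is how an operator would enable it; this is an illustration, not code from the commit:

# Hypothetical standalone reproduction of the option wiring above.
# An operator would enable the flag in cloudkitty.conf with:
#
#     [storage_elasticsearch]
#     use_datastream = True
#
from oslo_config import cfg

CONF = cfg.CONF
CONF.register_opts(
    [cfg.BoolOpt('use_datastream', default=False)],
    'storage_elasticsearch')
CONF([])  # parse an empty command line; config files could be passed here

if CONF.storage_elasticsearch.use_datastream:
    print("bulk writes will target a datastream")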


@@ -12,11 +12,13 @@
 # License for the specific language governing permissions and limitations
 # under the License.
 #
+from datetime import datetime
 import itertools

 from oslo_log import log
 import requests

 from cloudkitty.storage.v2 import elasticsearch
 from cloudkitty.storage.v2.elasticsearch import exceptions
 from cloudkitty.utils import json
@@ -248,8 +250,12 @@ class ElasticsearchClient(object):
         data = '\n'.join(itertools.chain(
             *[(instruction, json.dumps(term)) for term in terms]
         )) + '\n'
-        url = '/'.join(
-            (self._url, self._index_name, self._mapping_name, '_bulk'))
+        if elasticsearch.CONF.storage_elasticsearch.use_datastream:
+            url = '/'.join(
+                (self._url, self._index_name, '_bulk'))
+        else:
+            url = '/'.join(
+                (self._url, self._index_name, self._mapping_name, '_bulk'))
         return self._req(self._sess.post, url, data, None, deserialize=False)

     def bulk_index(self, terms):
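
To make the two URL shapes above concrete, a hedged sketch with hypothetical endpoint, index, and mapping-type names (the real values come from the client's configuration):

# Datastream case: no mapping-type segment in the path.
'/'.join(('http://localhost:9200', 'cloudkitty', '_bulk'))
# -> 'http://localhost:9200/cloudkitty/_bulk'

# Classic index case: the legacy mapping type (e.g. '_doc') stays in the path.
'/'.join(('http://localhost:9200', 'cloudkitty', '_doc', '_bulk'))
# -> 'http://localhost:9200/cloudkitty/_doc/_bulk'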
@@ -259,7 +265,10 @@ class ElasticsearchClient(object):
         :type terms: collections.abc.Iterable
         """
         LOG.debug("Indexing {} documents".format(len(terms)))
-        return self.bulk_with_instruction({"index": {}}, terms)
+        if elasticsearch.CONF.storage_elasticsearch.use_datastream:
+            return self.bulk_with_instruction({"create": {}}, terms)
+        else:
+            return self.bulk_with_instruction({"index": {}}, terms)

     def commit(self):
         """Index all documents"""
@@ -275,17 +284,31 @@ class ElasticsearchClient(object):
         :param type_: type of the DataPoint
         :type type_: str
         """
-        self._docs.append({
-            'start': start,
-            'end': end,
-            'type': type_,
-            'unit': point.unit,
-            'description': point.description,
-            'qty': point.qty,
-            'price': point.price,
-            'groupby': point.groupby,
-            'metadata': point.metadata,
-        })
+        if elasticsearch.CONF.storage_elasticsearch.use_datastream:
+            self._docs.append({
+                '@timestamp': datetime.now().strftime("%Y-%m-%dT%H:%M:%S"),
+                'start': start,
+                'end': end,
+                'type': type_,
+                'unit': point.unit,
+                'description': point.description,
+                'qty': point.qty,
+                'price': point.price,
+                'groupby': point.groupby,
+                'metadata': point.metadata,
+            })
+        else:
+            self._docs.append({
+                'start': start,
+                'end': end,
+                'type': type_,
+                'unit': point.unit,
+                'description': point.description,
+                'qty': point.qty,
+                'price': point.price,
+                'groupby': point.groupby,
+                'metadata': point.metadata,
+            })
         if self._autocommit and len(self._docs) >= self._chunk_size:
             self.commit()
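
The two branches above differ only in the '@timestamp' key, which every document written to a datastream must carry. An equivalent sketch that builds the dict once; the helper name and signature are illustrative, not part of this change:

from datetime import datetime

def build_doc(point, start, end, type_, use_datastream):
    # Same fields as the dicts appended above; `point` is assumed to
    # expose the DataPoint attributes used by push_doc.
    doc = {
        'start': start, 'end': end, 'type': type_,
        'unit': point.unit, 'description': point.description,
        'qty': point.qty, 'price': point.price,
        'groupby': point.groupby, 'metadata': point.metadata,
    }
    if use_datastream:
        # Every document routed into a datastream must carry @timestamp.
        doc['@timestamp'] = datetime.now().strftime("%Y-%m-%dT%H:%M:%S")
    return doc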


@@ -52,6 +52,9 @@ opensearch_storage_opts = [
"contexts should be kept alive.",
advanced=True,
default=30, min=0, max=300),
cfg.BoolOpt('use_datastream',
help='Use a datastream rather than an index.',
default=False),
]
CONF.register_opts(opensearch_storage_opts, OPENSEARCH_STORAGE_GROUP)


@@ -12,11 +12,13 @@
 # License for the specific language governing permissions and limitations
 # under the License.
 #
+from datetime import datetime
 import itertools

 from oslo_log import log
 import requests

 from cloudkitty.storage.v2 import opensearch
 from cloudkitty.storage.v2.opensearch import exceptions
 from cloudkitty.utils import json
@@ -246,8 +248,7 @@ class OpenSearchClient(object):
         data = '\n'.join(itertools.chain(
             *[(instruction, json.dumps(term)) for term in terms]
         )) + '\n'
-        url = '/'.join(
-            (self._url, self._index_name, '_bulk'))
+        url = '/'.join((self._url, self._index_name, '_bulk'))
         return self._req(self._sess.post, url, data, None, deserialize=False)

     def bulk_index(self, terms):
@@ -257,7 +258,10 @@ class OpenSearchClient(object):
         :type terms: collections.abc.Iterable
         """
         LOG.debug("Indexing {} documents".format(len(terms)))
-        return self.bulk_with_instruction({"index": {}}, terms)
+        if opensearch.CONF.storage_opensearch.use_datastream:
+            return self.bulk_with_instruction({"create": {}}, terms)
+        else:
+            return self.bulk_with_instruction({"index": {}}, terms)

     def commit(self):
         """Index all documents"""
@@ -273,16 +277,30 @@ class OpenSearchClient(object):
         :param type_: type of the DataPoint
         :type type_: str
         """
-        self._docs.append({
-            'start': start,
-            'end': end,
-            'type': type_,
-            'unit': point.unit,
-            'qty': point.qty,
-            'price': point.price,
-            'groupby': point.groupby,
-            'metadata': point.metadata,
-        })
+        if opensearch.CONF.storage_opensearch.use_datastream:
+            self._docs.append({
+                '@timestamp': datetime.now().strftime("%Y-%m-%dT%H:%M:%S"),
+                'start': start,
+                'end': end,
+                'type': type_,
+                'unit': point.unit,
+                'description': point.description,
+                'qty': point.qty,
+                'price': point.price,
+                'groupby': point.groupby,
+                'metadata': point.metadata,
+            })
+        else:
+            self._docs.append({
+                'start': start,
+                'end': end,
+                'type': type_,
+                'unit': point.unit,
+                'qty': point.qty,
+                'price': point.price,
+                'groupby': point.groupby,
+                'metadata': point.metadata,
+            })
         if self._autocommit and len(self._docs) >= self._chunk_size:
             self.commit()
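
For completeness, a hedged sketch of what one of these bulk calls looks like on the wire once commit() flushes the buffered documents (hypothetical host, datastream name, and payload; the _bulk endpoint requires the NDJSON content type):

import requests

resp = requests.post(
    'http://localhost:9200/cloudkitty/_bulk',
    data='{"create": {}}\n'
         '{"@timestamp": "2025-01-20T15:07:22", "qty": 1, "price": 0.5}\n',
    headers={'Content-Type': 'application/x-ndjson'},
)
resp.raise_for_status()
print(resp.json()['errors'])  # _bulk reports per-item failures in the body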


@@ -0,0 +1,8 @@
---
features:
- |
Adds support to the OpenSearch and Elasticsearch v2 storage backends for
using datastreams, rather than indices. These are useful when a cluster
has multiple storage types, as data can be rotated from hot to cold storage
when not in active use. The option ``use_datastream`` can be set under
either ``[storage_elasticsearch]`` or ``[storage_opensearch]``.
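
One operational note implied by the feature: a datastream only comes into existence if the cluster has a matching index template with data_stream enabled. A hedged operator-side sketch with hypothetical names (the composable index template API exists in Elasticsearch 7.8+ and in OpenSearch):

import requests

template = {
    'index_patterns': ['cloudkitty*'],  # hypothetical datastream name pattern
    'data_stream': {},  # marks matching names as datastreams
}
requests.put(
    'http://localhost:9200/_index_template/cloudkitty',
    json=template,
).raise_for_status()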