diff --git a/cloudkitty/storage/v2/elasticsearch/__init__.py b/cloudkitty/storage/v2/elasticsearch/__init__.py
index 0384d323..49d43ba7 100644
--- a/cloudkitty/storage/v2/elasticsearch/__init__.py
+++ b/cloudkitty/storage/v2/elasticsearch/__init__.py
@@ -52,6 +52,10 @@ elasticsearch_storage_opts = [
                     "should be kept alive.",
                advanced=True,
                default=30, min=0, max=300),
+    cfg.BoolOpt('use_datastream',
+                help='Use a datastream rather than an index. This is useful '
+                     'starting with Elasticsearch 7.17.',
+                default=False),
 ]
 
 CONF.register_opts(elasticsearch_storage_opts, ELASTICSEARCH_STORAGE_GROUP)
diff --git a/cloudkitty/storage/v2/elasticsearch/client.py b/cloudkitty/storage/v2/elasticsearch/client.py
index 525f8655..98dc8a29 100644
--- a/cloudkitty/storage/v2/elasticsearch/client.py
+++ b/cloudkitty/storage/v2/elasticsearch/client.py
@@ -12,11 +12,13 @@
 # License for the specific language governing permissions and limitations
 # under the License.
 #
+from datetime import datetime
 import itertools
 
 from oslo_log import log
 import requests
 
+from cloudkitty.storage.v2 import elasticsearch
 from cloudkitty.storage.v2.elasticsearch import exceptions
 from cloudkitty.utils import json
 
@@ -248,8 +250,12 @@ class ElasticsearchClient(object):
         data = '\n'.join(itertools.chain(
             *[(instruction, json.dumps(term)) for term in terms]
         )) + '\n'
-        url = '/'.join(
-            (self._url, self._index_name, self._mapping_name, '_bulk'))
+        if elasticsearch.CONF.storage_elasticsearch.use_datastream:
+            url = '/'.join(
+                (self._url, self._index_name, '_bulk'))
+        else:
+            url = '/'.join(
+                (self._url, self._index_name, self._mapping_name, '_bulk'))
         return self._req(self._sess.post, url, data, None, deserialize=False)
 
     def bulk_index(self, terms):
@@ -259,7 +265,10 @@ class ElasticsearchClient(object):
         :type terms: collections.abc.Iterable
         """
         LOG.debug("Indexing {} documents".format(len(terms)))
-        return self.bulk_with_instruction({"index": {}}, terms)
+        if elasticsearch.CONF.storage_elasticsearch.use_datastream:
+            return self.bulk_with_instruction({"create": {}}, terms)
+        else:
+            return self.bulk_with_instruction({"index": {}}, terms)
 
     def commit(self):
         """Index all documents"""
@@ -275,17 +284,31 @@ class ElasticsearchClient(object):
         :param type_: type of the DataPoint
         :type type_: str
         """
-        self._docs.append({
-            'start': start,
-            'end': end,
-            'type': type_,
-            'unit': point.unit,
-            'description': point.description,
-            'qty': point.qty,
-            'price': point.price,
-            'groupby': point.groupby,
-            'metadata': point.metadata,
-        })
+        if elasticsearch.CONF.storage_elasticsearch.use_datastream:
+            self._docs.append({
+                '@timestamp': datetime.now().strftime("%Y-%m-%dT%H:%M:%S"),
+                'start': start,
+                'end': end,
+                'type': type_,
+                'unit': point.unit,
+                'description': point.description,
+                'qty': point.qty,
+                'price': point.price,
+                'groupby': point.groupby,
+                'metadata': point.metadata,
+            })
+        else:
+            self._docs.append({
+                'start': start,
+                'end': end,
+                'type': type_,
+                'unit': point.unit,
+                'description': point.description,
+                'qty': point.qty,
+                'price': point.price,
+                'groupby': point.groupby,
+                'metadata': point.metadata,
+            })
 
         if self._autocommit and len(self._docs) >= self._chunk_size:
             self.commit()
diff --git a/cloudkitty/storage/v2/opensearch/__init__.py b/cloudkitty/storage/v2/opensearch/__init__.py
index 65b07fba..0f431049 100644
--- a/cloudkitty/storage/v2/opensearch/__init__.py
+++ b/cloudkitty/storage/v2/opensearch/__init__.py
@@ -52,6 +52,9 @@ opensearch_storage_opts = [
                     "contexts should be kept alive.",
                advanced=True,
                default=30, min=0, max=300),
+    cfg.BoolOpt('use_datastream',
+                help='Use a datastream rather than an index.',
+                default=False),
 ]
 
 CONF.register_opts(opensearch_storage_opts, OPENSEARCH_STORAGE_GROUP)
diff --git a/cloudkitty/storage/v2/opensearch/client.py b/cloudkitty/storage/v2/opensearch/client.py
index 6242019f..868af2c4 100644
--- a/cloudkitty/storage/v2/opensearch/client.py
+++ b/cloudkitty/storage/v2/opensearch/client.py
@@ -12,11 +12,13 @@
 # License for the specific language governing permissions and limitations
 # under the License.
 #
+from datetime import datetime
 import itertools
 
 from oslo_log import log
 import requests
 
+from cloudkitty.storage.v2 import opensearch
 from cloudkitty.storage.v2.opensearch import exceptions
 from cloudkitty.utils import json
 
@@ -246,8 +248,7 @@ class OpenSearchClient(object):
         data = '\n'.join(itertools.chain(
             *[(instruction, json.dumps(term)) for term in terms]
         )) + '\n'
-        url = '/'.join(
-            (self._url, self._index_name, '_bulk'))
+        url = '/'.join((self._url, self._index_name, '_bulk'))
         return self._req(self._sess.post, url, data, None, deserialize=False)
 
     def bulk_index(self, terms):
@@ -257,7 +258,10 @@ class OpenSearchClient(object):
         :type terms: collections.abc.Iterable
         """
         LOG.debug("Indexing {} documents".format(len(terms)))
-        return self.bulk_with_instruction({"index": {}}, terms)
+        if opensearch.CONF.storage_opensearch.use_datastream:
+            return self.bulk_with_instruction({"create": {}}, terms)
+        else:
+            return self.bulk_with_instruction({"index": {}}, terms)
 
     def commit(self):
         """Index all documents"""
@@ -273,16 +277,30 @@ class OpenSearchClient(object):
         :param type_: type of the DataPoint
         :type type_: str
         """
-        self._docs.append({
-            'start': start,
-            'end': end,
-            'type': type_,
-            'unit': point.unit,
-            'qty': point.qty,
-            'price': point.price,
-            'groupby': point.groupby,
-            'metadata': point.metadata,
-        })
+        if opensearch.CONF.storage_opensearch.use_datastream:
+            self._docs.append({
+                '@timestamp': datetime.now().strftime("%Y-%m-%dT%H:%M:%S"),
+                'start': start,
+                'end': end,
+                'type': type_,
+                'unit': point.unit,
+                'description': point.description,
+                'qty': point.qty,
+                'price': point.price,
+                'groupby': point.groupby,
+                'metadata': point.metadata,
+            })
+        else:
+            self._docs.append({
+                'start': start,
+                'end': end,
+                'type': type_,
+                'unit': point.unit,
+                'qty': point.qty,
+                'price': point.price,
+                'groupby': point.groupby,
+                'metadata': point.metadata,
+            })
 
         if self._autocommit and len(self._docs) >= self._chunk_size:
             self.commit()
diff --git a/releasenotes/notes/add-opensearch-elasticsearch-datastreams-support-28b7c1ce700d33c0.yaml b/releasenotes/notes/add-opensearch-elasticsearch-datastreams-support-28b7c1ce700d33c0.yaml
new file mode 100644
index 00000000..f41fb186
--- /dev/null
+++ b/releasenotes/notes/add-opensearch-elasticsearch-datastreams-support-28b7c1ce700d33c0.yaml
@@ -0,0 +1,8 @@
+---
+features:
+  - |
+    Adds support to the OpenSearch and Elasticsearch v2 storage backends for
+    using datastreams, rather than indices. These are useful when a cluster
+    has multiple storage types, as data can be rotated from hot to cold storage
+    when not in active use. The option ``use_datastream`` can be set under
+    either ``[storage_elasticsearch]`` or ``[storage_opensearch]``.
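
Usage note (not part of the patch): the option is plain oslo.config, so
enabling it is a one-line change in cloudkitty.conf. A minimal sketch for
the Elasticsearch v2 backend; the ``host`` and ``index_name`` values below
are illustrative placeholders, not something this patch sets:

    [storage]
    version = 2
    backend = elasticsearch

    [storage_elasticsearch]
    host = http://localhost:9200
    index_name = cloudkitty
    use_datastream = True

The matching data stream (and its index template declaring ``@timestamp``)
must already exist on the cluster side; the patch only changes what the
client sends.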
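
The behavioural difference is easiest to see as the request each mode
produces. Data streams are append-only, which is why ``bulk_index``
switches from the ``index`` bulk action to ``create``, and every document
routed into a data stream must carry an ``@timestamp`` field. A short
standalone sketch of the patched bulk path (placeholder names and values,
not code from the tree):

    import json
    from datetime import datetime

    URL_BASE = 'http://localhost:9200'   # placeholder host
    INDEX_NAME = 'cloudkitty'            # placeholder index/stream name
    MAPPING_NAME = '_doc'

    def bulk_request(doc, use_datastream):
        if use_datastream:
            # Data streams only accept the "create" bulk action and
            # require an @timestamp field on every document.
            url = '/'.join((URL_BASE, INDEX_NAME, '_bulk'))
            action = {'create': {}}
            doc = dict(doc, **{
                '@timestamp': datetime.now().strftime('%Y-%m-%dT%H:%M:%S')})
        else:
            url = '/'.join((URL_BASE, INDEX_NAME, MAPPING_NAME, '_bulk'))
            action = {'index': {}}
        # NDJSON body, as built by bulk_with_instruction: one action line
        # and one document line per term, with a trailing newline.
        body = json.dumps(action) + '\n' + json.dumps(doc) + '\n'
        return url, body

    url, body = bulk_request({'type': 'instance', 'qty': 1}, True)
    print(url)    # http://localhost:9200/cloudkitty/_bulk
    print(body)   # {"create": {}} followed by the document line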