Add an Elasticsearch v2 storage driver

This introduces a v2 storage driver for Elasticsearch. It is considered
experimental for now and should not be used in production.

Support for this storage backend will be added to the devstack plugin.

Change-Id: I2f962a32ed541e62fcb847018f270613a69c8677
Story: 2006332
Task: 36076
Luka Peschke 2019-08-28 16:30:00 +02:00
parent a81c01d6e2
commit 15f6118ece
10 changed files with 1254 additions and 12 deletions
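
For reviewers who want to try the driver, enabling it should look roughly like the
following in cloudkitty.conf (an illustrative sketch: the [storage_elasticsearch]
options come from the driver below, the [storage] backend/version options are
assumed to be the usual v2 backend selection, and paths/values are placeholders):

    [storage]
    backend = elasticsearch
    version = 2

    [storage_elasticsearch]
    host = http://localhost:9200
    index_name = cloudkitty
    insecure = false
    # cafile = /etc/ssl/certs/my-ca.crt
    scroll_duration = 30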

View File

@@ -30,6 +30,7 @@ import cloudkitty.orchestrator
import cloudkitty.service
import cloudkitty.storage
import cloudkitty.storage.v1.hybrid.backends.gnocchi
import cloudkitty.storage.v2.elasticsearch
import cloudkitty.storage.v2.influx
import cloudkitty.utils
@@ -70,6 +71,8 @@ _opts = [
cloudkitty.storage.storage_opts))),
('storage_influxdb', list(itertools.chain(
cloudkitty.storage.v2.influx.influx_storage_opts))),
('storage_elasticsearch', list(itertools.chain(
cloudkitty.storage.v2.elasticsearch.elasticsearch_storage_opts))),
('storage_gnocchi', list(itertools.chain(
cloudkitty.storage.v1.hybrid.backends.gnocchi.gnocchi_storage_opts))),
(None, list(itertools.chain(

View File

@@ -0,0 +1,204 @@
# Copyright 2019 Objectif Libre
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import datetime
from oslo_config import cfg
from oslo_log import log
from cloudkitty import dataframe
from cloudkitty.storage import v2 as v2_storage
from cloudkitty.storage.v2.elasticsearch import client as es_client
from cloudkitty.storage.v2.elasticsearch import exceptions
from cloudkitty import tzutils
LOG = log.getLogger(__name__)
CONF = cfg.CONF
ELASTICSEARCH_STORAGE_GROUP = 'storage_elasticsearch'
elasticsearch_storage_opts = [
cfg.StrOpt(
'host',
help='Elasticsearch host, along with port and protocol. '
'Defaults to http://localhost:9200',
default='http://localhost:9200'),
cfg.StrOpt(
'index_name',
help='Elasticsearch index to use. Defaults to "cloudkitty".',
default='cloudkitty'),
cfg.BoolOpt('insecure',
help='Set to true to allow insecure HTTPS '
'connections to Elasticsearch',
default=False),
cfg.StrOpt('cafile',
help='Path of the CA certificate to trust for '
'HTTPS connections.',
default=None),
cfg.IntOpt('scroll_duration',
help="Duration (in seconds) for which the ES scroll contexts "
"should be kept alive.",
advanced=True,
default=30, min=0, max=300),
]
CONF.register_opts(elasticsearch_storage_opts, ELASTICSEARCH_STORAGE_GROUP)
CLOUDKITTY_INDEX_MAPPING = {
"dynamic_templates": [
{
"strings_as_keywords": {
"match_mapping_type": "string",
"mapping": {
"type": "keyword"
}
}
}
],
"dynamic": False,
"properties": {
"start": {"type": "date"},
"end": {"type": "date"},
"type": {"type": "keyword"},
"unit": {"type": "keyword"},
"qty": {"type": "double"},
"price": {"type": "double"},
"groupby": {"dynamic": True, "type": "object"},
"metadata": {"dynamic": True, "type": "object"}
},
}
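# NOTE: purely as an illustration (not part of the driver), a document indexed
# with the mapping above looks roughly like the following; the keys under
# "groupby" and "metadata" are dynamic, and the ones shown here (project_id,
# flavor) are invented examples:
#
# {
#     "start": "2019-08-01T00:00:00+00:00",
#     "end": "2019-08-01T01:00:00+00:00",
#     "type": "cpu",
#     "unit": "instance",
#     "qty": 1.0,
#     "price": 0.5,
#     "groupby": {"project_id": "abc123"},
#     "metadata": {"flavor": "m1.small"}
# }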
class ElasticsearchStorage(v2_storage.BaseStorage):
def __init__(self, *args, **kwargs):
super(ElasticsearchStorage, self).__init__(*args, **kwargs)
LOG.warning('The Elasticsearch storage driver is experimental. '
'DO NOT USE IT IN PRODUCTION.')
verify = not CONF.storage_elasticsearch.insecure
if verify and CONF.storage_elasticsearch.cafile:
verify = CONF.storage_elasticsearch.cafile
self._conn = es_client.ElasticsearchClient(
CONF.storage_elasticsearch.host,
CONF.storage_elasticsearch.index_name,
"_doc",
verify=verify)
def init(self):
r = self._conn.get_index()
if r.status_code != 200:
raise exceptions.IndexDoesNotExist(
CONF.storage_elasticsearch.index_name)
LOG.info('Creating mapping "_doc" on index {}...'.format(
CONF.storage_elasticsearch.index_name))
self._conn.put_mapping(CLOUDKITTY_INDEX_MAPPING)
LOG.info('Mapping created.')
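# NOTE: init() does not create the index itself: it only checks that the index
# exists and installs the "_doc" mapping, raising IndexDoesNotExist otherwise.
# Creating the index is left to the deployer (or to the devstack plugin
# mentioned in the commit message).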
def push(self, dataframes, scope_id=None):
for frame in dataframes:
for type_, point in frame.iterpoints():
start, end = self._local_to_utc(frame.start, frame.end)
self._conn.add_point(point, type_, start, end)
self._conn.commit()
@staticmethod
def _local_to_utc(*args):
return [tzutils.local_to_utc(arg) for arg in args]
@staticmethod
def _doc_to_datapoint(doc):
return dataframe.DataPoint(
doc['unit'],
doc['qty'],
doc['price'],
doc['groupby'],
doc['metadata'],
)
def _build_dataframes(self, docs):
dataframes = {}
nb_points = 0
for doc in docs:
source = doc['_source']
start = tzutils.dt_from_iso(source['start'])
end = tzutils.dt_from_iso(source['end'])
key = (start, end)
if key not in dataframes.keys():
dataframes[key] = dataframe.DataFrame(start=start, end=end)
dataframes[key].add_point(
self._doc_to_datapoint(source), source['type'])
nb_points += 1
output = list(dataframes.values())
output.sort(key=lambda frame: (frame.start, frame.end))
return output
def retrieve(self, begin=None, end=None,
filters=None,
metric_types=None,
offset=0, limit=1000, paginate=True):
begin, end = self._local_to_utc(begin or tzutils.get_month_start(),
end or tzutils.get_next_month())
total, docs = self._conn.retrieve(
begin, end, filters, metric_types,
offset=offset, limit=limit, paginate=paginate)
return {
'total': total,
'dataframes': self._build_dataframes(docs),
}
def delete(self, begin=None, end=None, filters=None):
self._conn.delete_by_query(begin, end, filters)
@staticmethod
def _normalize_time(t):
if isinstance(t, datetime.datetime):
return tzutils.utc_to_local(t)
return tzutils.dt_from_iso(t)
def _doc_to_total_result(self, doc, start, end):
output = {
'begin': self._normalize_time(doc.get('start', start)),
'end': self._normalize_time(doc.get('end', end)),
'qty': doc['sum_qty']['value'],
'rate': doc['sum_price']['value'],
}
# Means we had a composite aggregation
if 'key' in doc.keys():
for key, value in doc['key'].items():
output[key] = value
return output
def total(self, groupby=None,
begin=None, end=None,
metric_types=None,
filters=None,
offset=0, limit=1000, paginate=True):
begin, end = self._local_to_utc(begin or tzutils.get_month_start(),
end or tzutils.get_next_month())
total, docs = self._conn.total(
begin, end, metric_types, filters, groupby,
offset=offset, limit=limit, paginate=paginate)
return {
'total': total,
'results': [self._doc_to_total_result(doc, begin, end)
for doc in docs],
}
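
As a usage illustration (not part of the change), the driver follows the v2
storage interface; below is a minimal sketch, assuming a reachable Elasticsearch
host at the default URL and a pre-created "cloudkitty" index, with argument
shapes taken from the driver and its tests:

    import datetime

    from dateutil import tz

    from cloudkitty import dataframe
    from cloudkitty.storage.v2 import elasticsearch

    # Build one rated dataframe containing a single point.
    start = datetime.datetime(2019, 8, 1, tzinfo=tz.UTC)
    end = datetime.datetime(2019, 8, 1, 1, tzinfo=tz.UTC)
    frame = dataframe.DataFrame(start=start, end=end)
    frame.add_point(
        dataframe.DataPoint('instance', '1.0', '0.5',
                            {'project_id': 'abc123'}, {'flavor': 'm1.small'}),
        'cpu')

    storage = elasticsearch.ElasticsearchStorage()
    storage.init()         # checks the index and installs the mapping
    storage.push([frame])

    retrieved = storage.retrieve(begin=start, end=end)
    print(retrieved['total'], len(retrieved['dataframes']))

    totals = storage.total(groupby=['type'], begin=start, end=end)
    for entry in totals['results']:
        print(entry['type'], entry['qty'], entry['rate'])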

View File

@@ -0,0 +1,399 @@
# Copyright 2019 Objectif Libre
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import itertools
from oslo_log import log
import requests
from cloudkitty import json_utils as json
from cloudkitty.storage.v2.elasticsearch import exceptions
LOG = log.getLogger(__name__)
class ElasticsearchClient(object):
"""Class used to ease interaction with Elasticsearch.
:param autocommit: Defaults to True. Automatically push documents to
Elasticsearch once chunk_size has been reached.
:type autocommit: bool
:param chunk_size: Maximal number of documents to commit/retrieve at once.
:type chunk_size: int
:param scroll_duration: Defaults to 60. Duration, in seconds, for which
search contexts should be kept alive
:type scroll_duration: int
"""
def __init__(self, url, index_name, mapping_name,
verify=True,
autocommit=True,
chunk_size=5000,
scroll_duration=60):
self._url = url.strip('/')
self._index_name = index_name.strip('/')
self._mapping_name = mapping_name.strip('/')
self._autocommit = autocommit
self._chunk_size = chunk_size
self._scroll_duration = str(scroll_duration) + 's'
self._scroll_params = {'scroll': self._scroll_duration}
self._docs = []
self._scroll_ids = set()
self._sess = requests.Session()
self._verify = self._sess.verify = verify
self._sess.headers = {'Content-Type': 'application/json'}
@staticmethod
def _log_query(url, query, response):
message = 'Query on {} with body "{}" took {}ms'.format(
url, query, response['took'])
if 'hits' in response.keys():
message += ' for {} hits'.format(response['hits']['total'])
LOG.debug(message)
@staticmethod
def _build_must(start, end, metric_types, filters):
must = []
if start:
must.append({"range": {"start": {"gte": start.isoformat()}}})
if end:
must.append({"range": {"end": {"lte": end.isoformat()}}})
if filters and 'type' in filters.keys():
must.append({'term': {'type': filters['type']}})
if metric_types:
must.append({"terms": {"type": metric_types}})
return must
@staticmethod
def _build_should(filters):
if not filters:
return []
should = []
for k, v in filters.items():
if k != 'type':
should += [{'term': {'groupby.' + k: v}},
{'term': {'metadata.' + k: v}}]
return should
@staticmethod
def _build_composite(groupby):
if not groupby:
return []
sources = []
for elem in groupby:
if elem == 'type':
sources.append({'type': {'terms': {'field': 'type'}}})
else:
sources.append({elem: {'terms': {'field': 'groupby.' + elem}}})
return {"sources": sources}
@staticmethod
def _build_query(must, should, composite):
query = {}
if must or should:
query["query"] = {"bool": {}}
if must:
query["query"]["bool"]["must"] = must
if should:
query["query"]["bool"]["should"] = should
# We want each term to match exactly once, and each term introduces
# two "term" queries: one for "groupby" and one for "metadata"
query["query"]["bool"]["minimum_should_match"] = len(should) // 2
if composite:
query["aggs"] = {"sum_and_price": {
"composite": composite,
"aggregations": {
"sum_price": {"sum": {"field": "price"}},
"sum_qty": {"sum": {"field": "qty"}},
}
}}
return query
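# NOTE: purely as an illustration, for filters={'project_id': 'abc'} and
# metric_types=['cpu'], the helpers above build a query roughly like:
#
# {"query": {"bool": {
#     "must": [{"terms": {"type": ["cpu"]}}],
#     "should": [{"term": {"groupby.project_id": "abc"}},
#                {"term": {"metadata.project_id": "abc"}}],
#     "minimum_should_match": 1}}}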
def _req(self, method, url, data, params, deserialize=True):
r = method(url, data=data, params=params)
if r.status_code < 200 or r.status_code >= 300:
raise exceptions.InvalidStatusCode(
200, r.status_code, r.text, data)
if not deserialize:
return r
output = r.json()
self._log_query(url, data, output)
return output
def put_mapping(self, mapping):
"""Does a PUT request against ES's mapping API.
The PUT request will be done against
`/<index_name>/_mapping/<mapping_name>`
:param mapping: body of the request
:type mapping: dict
:rtype: requests.models.Response
"""
url = '/'.join(
(self._url, self._index_name, '_mapping', self._mapping_name))
# NOTE(peschk_l): This is done for compatibility with
# Elasticsearch 6 and 7.
param = {"include_type_name": "true"}
return self._req(
self._sess.put, url, json.dumps(mapping), param, deserialize=False)
def get_index(self):
"""Does a GET request against ES's index API.
The GET request will be done against `/<index_name>`
:rtype: requests.models.Response
"""
url = '/'.join((self._url, self._index_name))
return self._req(self._sess.get, url, None, None, deserialize=False)
def search(self, body, scroll=True):
"""Does a GET request against ES's search API.
The GET request will be done against `/<index_name>/_search`
:param body: body of the request
:type body: dict
:rtype: dict
"""
url = '/'.join((self._url, self._index_name, '_search'))
params = self._scroll_params if scroll else None
return self._req(
self._sess.get, url, json.dumps(body), params)
def scroll(self, body):
"""Does a GET request against ES's scroll API.
The GET request will be done against `/_search/scroll`
:param body: body of the request
:type body: dict
:rtype: dict
"""
url = '/'.join((self._url, '_search/scroll'))
return self._req(self._sess.get, url, json.dumps(body), None)
def close_scroll(self, body):
"""Does a DELETE request against ES's scroll API.
The DELETE request will be done against `/_search/scroll`
:param body: body of the request
:type body: dict
:rtype: dict
"""
url = '/'.join((self._url, '_search/scroll'))
resp = self._req(
self._sess.delete, url, json.dumps(body), None, deserialize=False)
body = resp.json()
LOG.debug('Freed {} scroll contexts'.format(body['num_freed']))
return body
def close_scrolls(self):
"""Closes all scroll contexts opened by this client."""
ids = list(self._scroll_ids)
LOG.debug('Closing {} scroll contexts: {}'.format(len(ids), ids))
self.close_scroll({'scroll_id': ids})
self._scroll_ids = set()
def bulk_with_instruction(self, instruction, terms):
"""Does a POST request against ES's bulk API
The POST request will be done against
`/<index_name>/<mapping_name>/_bulk`
The instruction will be appended before each term. For example,
bulk_with_instruction('instr', ['one', 'two']) will produce::
instr
one
instr
two
:param instruction: instruction to execute for each term
:type instruction: dict
:param terms: list of terms for which instruction should be executed
:type terms: collections.Iterable
:rtype: requests.models.Response
"""
instruction = json.dumps(instruction)
data = '\n'.join(itertools.chain(
*[(instruction, json.dumps(term)) for term in terms]
)) + '\n'
url = '/'.join(
(self._url, self._index_name, self._mapping_name, '_bulk'))
return self._req(self._sess.post, url, data, None, deserialize=False)
def bulk_index(self, terms):
"""Indexes each of the documents in 'terms'
:param terms: list of documents to index
:type terms: collections.Iterable
"""
LOG.debug("Indexing {} documents".format(len(terms)))
return self.bulk_with_instruction({"index": {}}, terms)
def commit(self):
"""Index all documents"""
while self._docs:
self.bulk_index(self._docs[:self._chunk_size])
self._docs = self._docs[self._chunk_size:]
def add_point(self, point, type_, start, end):
"""Append a point to the client.
:param point: DataPoint to append
:type point: cloudkitty.dataframe.DataPoint
:param type_: type of the DataPoint
:type type_: str
:param start: Start of the point's period
:type start: datetime.datetime
:param end: End of the point's period
:type end: datetime.datetime
"""
self._docs.append({
'start': start,
'end': end,
'type': type_,
'unit': point.unit,
'qty': point.qty,
'price': point.price,
'groupby': point.groupby,
'metadata': point.metadata,
})
if self._autocommit and len(self._docs) >= self._chunk_size:
self.commit()
def _get_chunk_size(self, offset, limit, paginate):
if paginate and offset + limit < self._chunk_size:
return offset + limit
return self._chunk_size
def retrieve(self, begin, end, filters, metric_types,
offset=0, limit=1000, paginate=True):
"""Retrieves a paginated list of documents from Elasticsearch."""
if not paginate:
offset = 0
query = self._build_query(
self._build_must(begin, end, metric_types, filters),
self._build_should(filters), None)
query['size'] = self._get_chunk_size(offset, limit, paginate)
resp = self.search(query)
scroll_id = resp['_scroll_id']
self._scroll_ids.add(scroll_id)
total = resp['hits']['total']
chunk = resp['hits']['hits']
output = chunk[offset:offset+limit if paginate else len(chunk)]
offset = 0 if len(chunk) > offset else offset - len(chunk)
while (not paginate) or len(output) < limit:
resp = self.scroll({
'scroll_id': scroll_id,
'scroll': self._scroll_duration,
})
scroll_id, chunk = resp['_scroll_id'], resp['hits']['hits']
self._scroll_ids.add(scroll_id)
# Means we've scrolled until the end
if not chunk:
break
output += chunk[offset:offset+limit if paginate else len(chunk)]
offset = 0 if len(chunk) > offset else offset - len(chunk)
self.close_scrolls()
return total, output
def delete_by_query(self, begin=None, end=None, filters=None):
"""Does a POST request against ES's Delete By Query API.
The POST request will be done against
`/<index_name>/_delete_by_query`
:param filters: Optional filters for the documents to delete
:type filters: dict
:rtype: requests.models.Response
"""
url = '/'.join((self._url, self._index_name, '_delete_by_query'))
must = self._build_must(begin, end, None, filters)
data = (json.dumps({"query": {"bool": {"must": must}}})
if must else None)
return self._req(self._sess.post, url, data, None)
def total(self, begin, end, metric_types, filters, groupby,
offset=0, limit=1000, paginate=True):
if not paginate:
offset = 0
must = self._build_must(begin, end, metric_types, filters)
should = self._build_should(filters)
composite = self._build_composite(groupby) if groupby else None
if composite:
composite['size'] = self._chunk_size
query = self._build_query(must, should, composite)
if "aggs" not in query.keys():
query["aggs"] = {
"sum_price": {"sum": {"field": "price"}},
"sum_qty": {"sum": {"field": "qty"}},
}
query['size'] = 0
resp = self.search(query, scroll=False)
# Means we didn't group, so length is 1
if not composite:
return 1, [resp["aggregations"]]
after = resp["aggregations"]["sum_and_price"].get("after_key")
chunk = resp["aggregations"]["sum_and_price"]["buckets"]
total = len(chunk)
output = chunk[offset:offset+limit if paginate else len(chunk)]
offset = 0 if len(chunk) > offset else offset - len(chunk)
# FIXME(peschk_l): We have to iterate over ALL buckets in order to get
# the total length. If there is a way for composite aggregations to get
# the total amount of buckets, please fix this
while after:
composite_query = query["aggs"]["sum_and_price"]["composite"]
composite_query["size"] = self._chunk_size
composite_query["after"] = after
resp = self.search(query, scroll=False)
after = resp["aggregations"]["sum_and_price"].get("after_key")
chunk = resp["aggregations"]["sum_and_price"]["buckets"]
if not chunk:
break
output += chunk[offset:offset+limit if paginate else len(chunk)]
offset = 0 if len(chunk) > offset else offset - len(chunk)
total += len(chunk)
if paginate:
output = output[offset:offset+limit]
return total, output
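
For reference, when a groupby is given, each bucket returned by total() has the
shape consumed by ElasticsearchStorage._doc_to_total_result in the driver; an
illustrative bucket (values invented, "doc_count" elided) looks like:

    {
        "key": {"type": "cpu", "project_id": "abc123"},
        "sum_qty": {"value": 42.0},
        "sum_price": {"value": 13.37},
    }

Without groupby, a single pseudo-bucket holding only the top-level "sum_qty" and
"sum_price" aggregations is returned, and the reported total is 1.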

View File

@@ -0,0 +1,32 @@
# Copyright 2019 Objectif Libre
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
class BaseElasticsearchException(Exception):
"""Base exception raised by the Elasticsearch v2 storage driver"""
class InvalidStatusCode(BaseElasticsearchException):
def __init__(self, expected, actual, msg, query):
super(InvalidStatusCode, self).__init__(
"Expected {} status code, got {}: {}. Query was {}".format(
expected, actual, msg, query))
class IndexDoesNotExist(BaseElasticsearchException):
def __init__(self, index_name):
super(IndexDoesNotExist, self).__init__(
"Elasticsearch index {} does not exist".format(index_name)
)

View File

@@ -0,0 +1,482 @@
# Copyright 2019 Objectif Libre
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import collections
import datetime
import unittest
from dateutil import tz
import mock
from cloudkitty import dataframe
from cloudkitty.storage.v2.elasticsearch import client
from cloudkitty.storage.v2.elasticsearch import exceptions
class TestElasticsearchClient(unittest.TestCase):
def setUp(self):
super(TestElasticsearchClient, self).setUp()
self.client = client.ElasticsearchClient(
'http://elasticsearch:9200',
'index_name',
'test_mapping',
autocommit=False)
def test_build_must_no_params(self):
self.assertEqual(self.client._build_must(None, None, None, None), [])
def test_build_must_with_start_end(self):
start = datetime.datetime(2019, 8, 30, tzinfo=tz.UTC)
end = datetime.datetime(2019, 8, 31, tzinfo=tz.UTC)
self.assertEqual(
self.client._build_must(start, end, None, None),
[{'range': {'start': {'gte': '2019-08-30T00:00:00+00:00'}}},
{'range': {'end': {'lte': '2019-08-31T00:00:00+00:00'}}}],
)
def test_build_must_with_filters(self):
filters = {'one': '1', 'two': '2', 'type': 'awesome'}
self.assertEqual(
self.client._build_must(None, None, None, filters),
[{'term': {'type': 'awesome'}}],
)
def test_build_must_with_metric_types(self):
types = ['awesome', 'amazing']
self.assertEqual(
self.client._build_must(None, None, types, None),
[{'terms': {'type': ['awesome', 'amazing']}}],
)
def test_build_should_no_filters(self):
self.assertEqual(
self.client._build_should(None),
[],
)
def test_build_should_with_filters(self):
filters = collections.OrderedDict([
('one', '1'), ('two', '2'), ('type', 'awesome')])
self.assertEqual(
self.client._build_should(filters),
[
{'term': {'groupby.one': '1'}},
{'term': {'metadata.one': '1'}},
{'term': {'groupby.two': '2'}},
{'term': {'metadata.two': '2'}},
],
)
def test_build_composite_no_groupby(self):
self.assertEqual(self.client._build_composite(None), [])
def test_build_composite(self):
self.assertEqual(
self.client._build_composite(['one', 'type', 'two']),
{'sources': [
{'one': {'terms': {'field': 'groupby.one'}}},
{'type': {'terms': {'field': 'type'}}},
{'two': {'terms': {'field': 'groupby.two'}}},
]},
)
def test_build_query_no_args(self):
self.assertEqual(self.client._build_query(None, None, None), {})
def test_build_query(self):
must = [{'range': {'start': {'gte': '2019-08-30T00:00:00+00:00'}}},
{'range': {'start': {'lt': '2019-08-31T00:00:00+00:00'}}}]
should = [
{'term': {'groupby.one': '1'}},
{'term': {'metadata.one': '1'}},
{'term': {'groupby.two': '2'}},
{'term': {'metadata.two': '2'}},
]
composite = {'sources': [
{'one': {'terms': {'field': 'groupby.one'}}},
{'type': {'terms': {'field': 'type'}}},
{'two': {'terms': {'field': 'groupby.two'}}},
]}
expected = {
'query': {
'bool': {
'must': must,
'should': should,
'minimum_should_match': 2,
},
},
'aggs': {
'sum_and_price': {
'composite': composite,
'aggregations': {
"sum_price": {"sum": {"field": "price"}},
"sum_qty": {"sum": {"field": "qty"}},
},
},
},
}
self.assertEqual(
self.client._build_query(must, should, composite), expected)
def test_log_query_no_hits(self):
url = '/endpoint'
body = {'1': 'one'}
response = {'took': 42}
expected = """Query on /endpoint with body "{'1': 'one'}" took 42ms"""
with mock.patch.object(client.LOG, 'debug') as debug_mock:
self.client._log_query(url, body, response)
debug_mock.assert_called_once_with(expected)
def test_log_query_with_hits(self):
url = '/endpoint'
body = {'1': 'one'}
response = {'took': 42, 'hits': {'total': 1337}}
expected = """Query on /endpoint with body "{'1': 'one'}" took 42ms"""
expected += " for 1337 hits"
with mock.patch.object(client.LOG, 'debug') as debug_mock:
self.client._log_query(url, body, response)
debug_mock.assert_called_once_with(expected)
def test_req_valid_status_code_no_deserialize(self):
resp_mock = mock.MagicMock()
resp_mock.status_code = 200
method_mock = mock.MagicMock()
method_mock.return_value = resp_mock
req_resp = self.client._req(
method_mock, None, None, None, deserialize=False)
method_mock.assert_called_once_with(None, data=None, params=None)
self.assertEqual(req_resp, resp_mock)
def test_req_valid_status_code_deserialize(self):
resp_mock = mock.MagicMock()
resp_mock.status_code = 200
resp_mock.json.return_value = 'output'
method_mock = mock.MagicMock()
method_mock.return_value = resp_mock
with mock.patch.object(self.client, '_log_query') as log_mock:
req_resp = self.client._req(
method_mock, None, None, None, deserialize=True)
method_mock.assert_called_once_with(None, data=None, params=None)
self.assertEqual(req_resp, 'output')
log_mock.assert_called_once_with(None, None, 'output')
def test_req_invalid_status_code(self):
resp_mock = mock.MagicMock()
resp_mock.status_code = 400
method_mock = mock.MagicMock()
method_mock.return_value = resp_mock
self.assertRaises(exceptions.InvalidStatusCode,
self.client._req,
method_mock, None, None, None)
def test_put_mapping(self):
mapping = {'a': 'b'}
with mock.patch.object(self.client, '_req') as rmock:
self.client.put_mapping(mapping)
rmock.assert_called_once_with(
self.client._sess.put,
'http://elasticsearch:9200/index_name/_mapping/test_mapping',
'{"a": "b"}', {'include_type_name': 'true'}, deserialize=False)
def test_get_index(self):
with mock.patch.object(self.client, '_req') as rmock:
self.client.get_index()
rmock.assert_called_once_with(
self.client._sess.get,
'http://elasticsearch:9200/index_name',
None, None, deserialize=False)
def test_search_without_scroll(self):
mapping = {'a': 'b'}
with mock.patch.object(self.client, '_req') as rmock:
self.client.search(mapping, scroll=False)
rmock.assert_called_once_with(
self.client._sess.get,
'http://elasticsearch:9200/index_name/_search',
'{"a": "b"}', None)
def test_search_with_scroll(self):
mapping = {'a': 'b'}
with mock.patch.object(self.client, '_req') as rmock:
self.client.search(mapping, scroll=True)
rmock.assert_called_once_with(
self.client._sess.get,
'http://elasticsearch:9200/index_name/_search',
'{"a": "b"}', {'scroll': '60s'})
def test_scroll(self):
body = {'a': 'b'}
with mock.patch.object(self.client, '_req') as rmock:
self.client.scroll(body)
rmock.assert_called_once_with(
self.client._sess.get,
'http://elasticsearch:9200/_search/scroll',
'{"a": "b"}', None)
def test_close_scroll(self):
body = {'a': 'b'}
with mock.patch.object(self.client, '_req') as rmock:
self.client.close_scroll(body)
rmock.assert_called_once_with(
self.client._sess.delete,
'http://elasticsearch:9200/_search/scroll',
'{"a": "b"}', None, deserialize=False)
def test_close_scrolls(self):
with mock.patch.object(self.client, 'close_scroll') as func_mock:
with mock.patch.object(self.client, '_scroll_ids',
new=['a', 'b', 'c']):
self.client.close_scrolls()
func_mock.assert_called_once_with(
{'scroll_id': ['a', 'b', 'c']})
self.assertSetEqual(set(), self.client._scroll_ids)
def test_bulk_with_instruction(self):
instruction = {'instruction': {}}
terms = ('one', 'two', 'three')
expected_data = ''.join([
'{"instruction": {}}\n'
'"one"\n'
'{"instruction": {}}\n'
'"two"\n'
'{"instruction": {}}\n'
'"three"\n',
])
with mock.patch.object(self.client, '_req') as rmock:
self.client.bulk_with_instruction(instruction, terms)
rmock.assert_called_once_with(
self.client._sess.post,
'http://elasticsearch:9200/index_name/test_mapping/_bulk',
expected_data, None, deserialize=False)
def test_bulk_index(self):
terms = ('one', 'two', 'three')
with mock.patch.object(self.client, 'bulk_with_instruction') as fmock:
self.client.bulk_index(terms)
fmock.assert_called_once_with({'index': {}}, terms)
def test_commit(self):
docs = ['one', 'two', 'three', 'four', 'five', 'six', 'seven']
size = 3
with mock.patch.object(self.client, 'bulk_index') as bulk_mock:
with mock.patch.object(self.client, '_docs', new=docs):
with mock.patch.object(self.client, '_chunk_size', new=size):
self.client.commit()
bulk_mock.assert_has_calls([
mock.call(['one', 'two', 'three']),
mock.call(['four', 'five', 'six']),
mock.call(['seven']),
])
def test_add_point_no_autocommit(self):
point = dataframe.DataPoint(
'unit', '0.42', '0.1337', {}, {})
start = datetime.datetime(2019, 1, 1)
end = datetime.datetime(2019, 1, 1, 1)
with mock.patch.object(self.client, 'commit') as func_mock:
with mock.patch.object(self.client, '_autocommit', new=False):
with mock.patch.object(self.client, '_chunk_size', new=3):
self.client._docs = []
for _ in range(5):
self.client.add_point(
point, 'awesome_type', start, end)
func_mock.assert_not_called()
self.assertEqual(self.client._docs, [{
'start': start,
'end': end,
'type': 'awesome_type',
'unit': point.unit,
'qty': point.qty,
'price': point.price,
'groupby': point.groupby,
'metadata': point.metadata,
} for _ in range(5)])
self.client._docs = []
def test_add_point_with_autocommit(self):
point = dataframe.DataPoint(
'unit', '0.42', '0.1337', {}, {})
start = datetime.datetime(2019, 1, 1)
end = datetime.datetime(2019, 1, 1, 1)
commit_calls = {'count': 0}
def commit():
# We can't re-assign nonlocal variables in python2
commit_calls['count'] += 1
self.client._docs = []
with mock.patch.object(self.client, 'commit', new=commit):
with mock.patch.object(self.client, '_autocommit', new=True):
with mock.patch.object(self.client, '_chunk_size', new=3):
self.client._docs = []
for i in range(5):
self.client.add_point(
point, 'awesome_type', start, end)
self.assertEqual(commit_calls['count'], 1)
self.assertEqual(self.client._docs, [{
'start': start,
'end': end,
'type': 'awesome_type',
'unit': point.unit,
'qty': point.qty,
'price': point.price,
'groupby': point.groupby,
'metadata': point.metadata,
} for _ in range(2)])
# cleanup
self.client._docs = []
def test_delete_by_query_with_must(self):
with mock.patch.object(self.client, '_req') as rmock:
with mock.patch.object(self.client, '_build_must') as func_mock:
func_mock.return_value = {'a': 'b'}
self.client.delete_by_query()
rmock.assert_called_once_with(
self.client._sess.post,
'http://elasticsearch:9200/index_name/_delete_by_query',
'{"query": {"bool": {"must": {"a": "b"}}}}', None)
def test_delete_by_query_no_must(self):
with mock.patch.object(self.client, '_req') as rmock:
with mock.patch.object(self.client, '_build_must') as func_mock:
func_mock.return_value = {}
self.client.delete_by_query()
rmock.assert_called_once_with(
self.client._sess.post,
'http://elasticsearch:9200/index_name/_delete_by_query',
None, None)
def test_retrieve_no_pagination(self):
search_resp = {
'_scroll_id': '000',
'hits': {'hits': ['one', 'two', 'three'], 'total': 12},
}
scroll_resps = [{
'_scroll_id': str(i + 1) * 3,
'hits': {'hits': ['one', 'two', 'three']},
} for i in range(3)]
scroll_resps.append({'_scroll_id': '444', 'hits': {'hits': []}})
self.client._scroll_ids = set()
with mock.patch.object(self.client, 'search') as search_mock:
with mock.patch.object(self.client, 'scroll') as scroll_mock:
with mock.patch.object(self.client, 'close_scrolls') as close:
search_mock.return_value = search_resp
scroll_mock.side_effect = scroll_resps
total, resp = self.client.retrieve(
None, None, None, None, paginate=False)
search_mock.assert_called_once()
scroll_mock.assert_has_calls([
mock.call({
'scroll_id': str(i) * 3,
'scroll': '60s',
}) for i in range(4)
])
self.assertEqual(total, 12)
self.assertEqual(resp, ['one', 'two', 'three'] * 4)
self.assertSetEqual(self.client._scroll_ids,
set(str(i) * 3 for i in range(5)))
close.assert_called_once()
self.client._scroll_ids = set()
def test_retrieve_with_pagination(self):
search_resp = {
'_scroll_id': '000',
'hits': {'hits': ['one', 'two', 'three'], 'total': 12},
}
scroll_resps = [{
'_scroll_id': str(i + 1) * 3,
'hits': {'hits': ['one', 'two', 'three']},
} for i in range(3)]
scroll_resps.append({'_scroll_id': '444', 'hits': {'hits': []}})
self.client._scroll_ids = set()
with mock.patch.object(self.client, 'search') as search_mock:
with mock.patch.object(self.client, 'scroll') as scroll_mock:
with mock.patch.object(self.client, 'close_scrolls') as close:
search_mock.return_value = search_resp
scroll_mock.side_effect = scroll_resps
total, resp = self.client.retrieve(
None, None, None, None,
offset=2, limit=4, paginate=True)
search_mock.assert_called_once()
scroll_mock.assert_called_once_with({
'scroll_id': '000',
'scroll': '60s',
})
self.assertEqual(total, 12)
self.assertEqual(resp, ['three', 'one', 'two', 'three'])
self.assertSetEqual(self.client._scroll_ids,
set(str(i) * 3 for i in range(2)))
close.assert_called_once()
self.client._scroll_ids = set()
def _do_test_total(self, groupby, paginate):
with mock.patch.object(self.client, 'search') as search_mock:
if groupby:
search_resps = [{
'aggregations': {
'sum_and_price': {
'buckets': ['one', 'two', 'three'],
'after_key': str(i),
}
}
} for i in range(3)]
last_resp_aggs = search_resps[2]['aggregations']
last_resp_aggs['sum_and_price'].pop('after_key')
last_resp_aggs['sum_and_price']['buckets'] = []
search_mock.side_effect = search_resps
else:
search_mock.return_value = {
'aggregations': ['one', 'two', 'three'],
}
resp = self.client.total(None, None, None, None, groupby,
offset=2, limit=4, paginate=paginate)
if not groupby:
search_mock.assert_called_once()
return resp
def test_total_no_groupby_no_pagination(self):
total, aggs = self._do_test_total(None, False)
self.assertEqual(total, 1)
self.assertEqual(aggs, [['one', 'two', 'three']])
def test_total_no_groupby_with_pagination(self):
total, aggs = self._do_test_total(None, True)
self.assertEqual(total, 1)
self.assertEqual(aggs, [['one', 'two', 'three']])
def test_total_with_groupby_no_pagination(self):
total, aggs = self._do_test_total(['x'], False)
self.assertEqual(total, 6)
self.assertEqual(aggs, ['one', 'two', 'three'] * 2)
def test_total_with_groupby_with_pagination(self):
total, aggs = self._do_test_total(['x'], True)
self.assertEqual(total, 6)
self.assertEqual(aggs, ['three', 'one', 'two', 'three'])

View File

@@ -0,0 +1,96 @@
# Copyright 2019 Objectif Libre
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import copy
import functools
import itertools
import requests
from cloudkitty.storage.v2.elasticsearch import client
class FakeElasticsearchClient(client.ElasticsearchClient):
def __init__(self, *args, **kwargs):
kwargs["autocommit"] = False
super(FakeElasticsearchClient, self).__init__(*args, **kwargs)
for method in ('get_index', 'put_mapping'):
setattr(self, method, self.__base_response)
@staticmethod
def __base_response(*args, **kwargs):
r = requests.Response()
r.status_code = 200
return r
def commit(self):
pass
@staticmethod
def __filter_func(begin, end, filters, mtypes, doc):
type_filter = lambda doc: doc['type'] in mtypes if mtypes else True
time_filter = lambda doc: (
(doc['start'] >= begin if begin else True)
and (doc['start'] < end if end else True))
def filter_(doc):
return all((doc['groupby'].get(k) == v
or (doc['metadata'].get(k) == v)
for k, v in filters.items())) if filters else True
return type_filter(doc) and time_filter(doc) and filter_(doc)
def retrieve(self, begin, end, filters, metric_types,
offset=0, limit=1000, paginate=True):
filter_func = functools.partial(
self.__filter_func, begin, end, filters, metric_types)
output = list(filter(filter_func, self._docs))[offset:offset+limit]
for doc in output:
doc["start"] = doc["start"].isoformat()
doc["end"] = doc["end"].isoformat()
doc["_source"] = copy.deepcopy(doc)
return len(output), output
def total(self, begin, end, metric_types, filters, groupby,
offset=0, limit=1000, paginate=True):
filter_func = functools.partial(
self.__filter_func, begin, end, filters, metric_types)
docs = list(filter(filter_func, self._docs))
if not groupby:
return 1, [{
'sum_qty': {'value': sum(doc['qty'] for doc in docs)},
'sum_price': {'value': sum(doc['price'] for doc in docs)},
'begin': begin,
'end': end,
}]
output = []
key_func = lambda d: tuple(
d['type'] if g == 'type' else d['groupby'][g] for g in groupby)
docs.sort(key=key_func)
for groups, values in itertools.groupby(docs, key_func):
val_list = list(values)
output.append({
'begin': begin,
'end': end,
'sum_qty': {'value': sum(doc['qty'] for doc in val_list)},
'sum_price': {'value': sum(doc['price'] for doc in val_list)},
'key': dict(zip(groupby, groups)),
})
return len(output), output[offset:offset+limit]
def _req(self, method, url, data, params, deserialize=True):
pass

View File

@@ -19,16 +19,24 @@ import testscenarios
from cloudkitty import storage
from cloudkitty.tests import samples
from cloudkitty.tests.storage.v2 import es_utils
from cloudkitty.tests.storage.v2 import influx_utils
from cloudkitty.tests import TestCase
from cloudkitty.tests import utils as test_utils
from cloudkitty import tzutils
_ES_CLIENT_PATH = ('cloudkitty.storage.v2.elasticsearch'
'.client.ElasticsearchClient')
_INFLUX_CLIENT_PATH = 'cloudkitty.storage.v2.influx.InfluxClient'
class StorageUnitTest(TestCase):
storage_scenarios = [
('influx', dict(storage_backend='influxdb'))]
('influx', dict(storage_backend='influxdb')),
('elastic', dict(storage_backend='elasticsearch'))]
@classmethod
def generate_scenarios(cls):
@@ -36,7 +44,9 @@ class StorageUnitTest(TestCase):
cls.scenarios,
cls.storage_scenarios)
@mock.patch('cloudkitty.storage.v2.influx.InfluxClient',
@mock.patch(_ES_CLIENT_PATH,
new=es_utils.FakeElasticsearchClient)
@mock.patch(_INFLUX_CLIENT_PATH,
new=influx_utils.FakeInfluxClient)
@mock.patch('cloudkitty.utils.load_conf', new=test_utils.load_conf)
def setUp(self):
@@ -92,10 +102,12 @@ class StorageUnitTest(TestCase):
self.assertEqual(total['total'], expected_total_len)
returned_total = round(sum(r['rate'] for r in total['results']), 5)
self.assertLessEqual(abs(expected_total - returned_total), 0.00001)
self.assertLessEqual(
abs(expected_total - float(returned_total)), 0.00001)
returned_qty = round(sum(r['qty'] for r in total['results']), 5)
self.assertLessEqual(abs(expected_qty - returned_qty), 0.00001)
self.assertLessEqual(
abs(expected_qty - float(returned_qty)), 0.00001)
def test_get_total_all_scopes_all_periods(self):
expected_total, expected_qty, _ = self._expected_total_qty_len(
@@ -178,19 +190,23 @@ class StorageUnitTest(TestCase):
total['results'].sort(key=lambda x: x['project_id'], reverse=True)
self.assertLessEqual(
abs(round(total['results'][0]['rate'] - expected_total_first, 5)),
abs(round(float(total['results'][0]['rate'])
- expected_total_first, 5)),
0.00001,
)
self.assertLessEqual(
abs(round(total['results'][1]['rate'] - expected_total_second, 5)),
abs(round(float(total['results'][1]['rate'])
- expected_total_second, 5)),
0.00001,
)
self.assertLessEqual(
abs(round(total['results'][0]['qty'] - expected_qty_first, 5)),
abs(round(float(total['results'][0]['qty'])
- expected_qty_first, 5)),
0.00001,
)
self.assertLessEqual(
abs(round(total['results'][1]['qty'] - expected_qty_second, 5)),
abs(round(float(total['results'][1]['qty'])
- expected_qty_second, 5)),
0.00001,
)
@@ -213,19 +229,23 @@ class StorageUnitTest(TestCase):
total['results'].sort(key=lambda x: x['project_id'], reverse=True)
self.assertLessEqual(
abs(round(total['results'][0]['rate'] - expected_total_first, 5)),
abs(round(float(total['results'][0]['rate'])
- expected_total_first, 5)),
0.00001,
)
self.assertLessEqual(
abs(round(total['results'][1]['rate'] - expected_total_second, 5)),
abs(round(float(total['results'][1]['rate'])
- expected_total_second, 5)),
0.00001,
)
self.assertLessEqual(
abs(round(total['results'][0]['qty'] - expected_qty_first, 5)),
abs(round(float(total['results'][0]['qty'])
- expected_qty_first, 5)),
0.00001,
)
self.assertLessEqual(
abs(round(total['results'][1]['qty'] - expected_qty_second, 5)),
abs(round(float(total['results'][1]['qty'])
- expected_qty_second, 5)),
0.00001,
)

View File

@@ -0,0 +1,5 @@
---
features:
- |
A v2 storage driver for Elasticsearch has been added. It is marked as
``EXPERIMENTAL`` for now.

View File

@@ -68,6 +68,7 @@ cloudkitty.storage.v1.backends =
cloudkitty.storage.v2.backends =
influxdb = cloudkitty.storage.v2.influx:InfluxStorage
elasticsearch = cloudkitty.storage.v2.elasticsearch:ElasticsearchStorage
cloudkitty.storage.hybrid.backends =
gnocchi = cloudkitty.storage.v1.hybrid.backends.gnocchi:GnocchiStorage
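
For completeness, the new entry point can be exercised directly with stevedore;
a minimal sketch (CloudKitty normally resolves the backend itself from the
[storage] options):

    from stevedore import driver

    mgr = driver.DriverManager(
        'cloudkitty.storage.v2.backends',  # namespace declared in setup.cfg
        'elasticsearch',                   # name of the new entry point
        invoke_on_load=True)
    storage = mgr.driver                   # an ElasticsearchStorage instance
    storage.init()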