diff --git a/cloudkitty/storage/v2/loki/__init__.py b/cloudkitty/storage/v2/loki/__init__.py index e6423ef5..b704cb9b 100644 --- a/cloudkitty/storage/v2/loki/__init__.py +++ b/cloudkitty/storage/v2/loki/__init__.py @@ -52,9 +52,19 @@ loki_storage_opts = [ cfg.StrOpt( 'content_type', help='The http Content-Type that will be used to send info to Loki. ' - 'Defaults to application/json. It can also be ' - 'application/x-protobuf', + 'Currently only application/json is supported.', default='application/json'), + cfg.IntOpt( + 'shard_days', + help='Controls how data retrieval requests are split across time. ' + 'When fetching data from Loki over a long period, the time ' + 'range is divided into smaller intervals of N days, and ' + 'separate requests are made for each interval. This prevents ' + 'timeouts and improves performance. Defaults to 7 days. ' + 'Maximum is 30 days.', + default=7, + min=1, + max=30), cfg.BoolOpt( 'insecure', help='Set to true to allow insecure HTTPS connections to Loki', @@ -98,6 +108,7 @@ class LokiStorage(v2_storage.BaseStorage): CONF.storage_loki.stream, CONF.storage_loki.content_type, CONF.storage_loki.buffer_size, + CONF.storage_loki.shard_days, cert, verify) @@ -127,7 +138,13 @@ class LokiStorage(v2_storage.BaseStorage): def _build_dataframes(self, logs): dataframes = {} for log in logs: - labels = json.loads(log['values'][0][1]) + try: + labels = json.loads(log['values'][0][1]) + except json.JSONDecodeError: + # In case that we have non-json compliant log lines in Loki + LOG.error(f"Failed to decode log line: {log['values'][0][1]}, " + f"ignoring.") + continue start = tzutils.dt_from_iso(labels['start']) end = tzutils.dt_from_iso(labels['end']) key = (start, end) diff --git a/cloudkitty/storage/v2/loki/client.py b/cloudkitty/storage/v2/loki/client.py index 0be31c2c..028ca152 100644 --- a/cloudkitty/storage/v2/loki/client.py +++ b/cloudkitty/storage/v2/loki/client.py @@ -12,6 +12,8 @@ # License for the specific language governing permissions and limitations # under the License. # +import datetime + from oslo_log import log import requests @@ -25,7 +27,7 @@ class LokiClient(object): """Class used to ease interaction with Loki.""" def __init__(self, url, tenant, stream_labels, content_type, buffer_size, - cert, verify): + shard_days, cert, verify): if content_type != "application/json": raise exceptions.UnsupportedContentType(content_type) @@ -37,6 +39,7 @@ class LokiClient(object): } self._buffer_size = buffer_size self._points = [] + self._shard_days = shard_days self._cert = cert self._verify = verify @@ -164,7 +167,7 @@ class LokiClient(object): self.delete_by_query(query, begin, end) - def retrieve(self, begin, end, filters, metric_types, limit): + def _retrieve(self, begin, end, filters, metric_types, limit): """Retrieves dataframes stored in Loki.""" query = self._base_query() loki_query_parts = [] @@ -186,6 +189,11 @@ class LokiClient(object): if loki_query_parts: query += ' | ' + ', '.join(loki_query_parts) + LOG.debug( + f"Loki query: '{query}', begin: '{begin}', end: '{end}', " + f"limit: '{limit}'" + ) + data_response = self.search(query, begin, end, limit) if not isinstance(data_response, dict) or \ @@ -205,6 +213,26 @@ class LokiClient(object): return total, output + def retrieve(self, begin, end, filters, metric_types, limit): + total = 0 + data = [] + if end - begin > datetime.timedelta(days=self._shard_days): + step = datetime.timedelta(days=self._shard_days) + current_begin = begin + + while current_begin < end: + current_end = min(current_begin + step, end) + t, d = self._retrieve( + current_begin, current_end, filters, metric_types, limit) + total += t + data += d + current_begin = current_end + else: + total, data = self._retrieve( + begin, end, filters, metric_types, limit) + + return total, data + def add_point(self, point, type, start, end): """Append a point to the client.""" timestamp_ns = int(end.timestamp() * 1_000_000_000) @@ -246,8 +274,7 @@ class LokiClient(object): LOG.warning("offset is not supported by Loki.") total_count, data = self.retrieve( - begin, end, filters, metric_types, limit - ) + begin, end, filters, metric_types, limit) if not groupby: total_qty = 0.0 @@ -297,4 +324,5 @@ class LokiClient(object): 'sum_qty': {'value': values['sum_qty']}, 'sum_price': {'value': values['sum_price']} }) + return len(result), result diff --git a/cloudkitty/tests/storage/v2/loki/test_client.py b/cloudkitty/tests/storage/v2/loki/test_client.py index 4e0081ef..b3524543 100644 --- a/cloudkitty/tests/storage/v2/loki/test_client.py +++ b/cloudkitty/tests/storage/v2/loki/test_client.py @@ -14,6 +14,7 @@ # from datetime import datetime +from datetime import timedelta from datetime import timezone import json import unittest @@ -48,6 +49,7 @@ class TestLokiClient(unittest.TestCase): self.stream_labels = {"app": "cloudkitty", "source": "test"} self.content_type = "application/json" self.buffer_size = 2 + self.shard_days = 7 self.cert = ('/path/to/cert', '/path/to/key') self.verify = '/path/to/cafile' self.client = client.LokiClient( @@ -56,6 +58,7 @@ class TestLokiClient(unittest.TestCase): self.stream_labels, self.content_type, self.buffer_size, + self.shard_days, cert=self.cert, verify=self.verify ) @@ -76,7 +79,8 @@ class TestLokiClient(unittest.TestCase): def test_init_unsupported_content_type(self, mock_log, mock_requests): with self.assertRaises(exceptions.UnsupportedContentType): client.LokiClient(self.base_url, self.tenant, self.stream_labels, - "text/plain", self.buffer_size, None, True) + "text/plain", self.buffer_size, self.shard_days, + None, True) def test_build_payload_json(self, mock_log, mock_requests): batch = [["1609459200000000000", "log line 1"], @@ -550,3 +554,34 @@ class TestLokiClient(unittest.TestCase): mock_base_query.return_value, self.begin_dt, self.end_dt ) mock_base_query.assert_called_once() + + @patch.object(client.LokiClient, '_retrieve') + def test_retrieve_shards_with_long_time_range( + self, mock_retrieve, mock_log, mock_requests_arg): + begin_dt = datetime(2024, 1, 1, tzinfo=timezone.utc) + end_dt = datetime(2024, 1, 15, tzinfo=timezone.utc) + mock_retrieve.return_value = (1, ["dummy_data"]) + + self.client.retrieve(begin_dt, end_dt, None, None, 100) + + calls = [ + call(begin_dt, begin_dt + timedelta(days=self.shard_days), + None, None, 100), + call(begin_dt + timedelta(days=self.shard_days), end_dt, + None, None, 100) + ] + mock_retrieve.assert_has_calls(calls) + self.assertEqual(mock_retrieve.call_count, 2) + + @patch.object(client.LokiClient, '_retrieve') + def test_retrieve_no_sharding_with_short_time_range( + self, mock_retrieve, mock_log, mock_requests_arg): + begin_dt = datetime(2024, 1, 1, tzinfo=timezone.utc) + end_dt = datetime(2024, 1, 5, tzinfo=timezone.utc) + mock_retrieve.return_value = (1, ["dummy_data"]) + + self.client.retrieve(begin_dt, end_dt, None, None, 100) + + mock_retrieve.assert_called_once_with( + begin_dt, end_dt, None, None, 100 + ) diff --git a/cloudkitty/tests/storage/v2/loki_utils.py b/cloudkitty/tests/storage/v2/loki_utils.py index bc860436..fba5a469 100644 --- a/cloudkitty/tests/storage/v2/loki_utils.py +++ b/cloudkitty/tests/storage/v2/loki_utils.py @@ -23,7 +23,7 @@ from cloudkitty.utils import json class FakeLokiClient(loki_client_module.LokiClient): def __init__(self, url, tenant, stream_labels, content_type, - buffer_size, cert, verify, **kwargs): + buffer_size, shard_days, cert, verify, **kwargs): if content_type != "application/json": raise loki_exceptions.UnsupportedContentType(content_type) @@ -35,6 +35,7 @@ class FakeLokiClient(loki_client_module.LokiClient): 'Content-Type': content_type } self._buffer_size = buffer_size + self._shard_days = shard_days self._cert = cert self._verify = verify