Shard the requests to Loki

The default Loki options are very conservative about resource usage
and access, so that queries remain manageable for the user. This patch
introduces query sharding so the user can decide to issue more queries
to Loki (with the corresponding time overhead) in exchange for being
able to calculate and retrieve more granular data.

The user can also decide to store less granular data in Loki and use
less query sharding, improving the response time.

Change-Id: I309025e2e4c914aeb593cff7d88469a351edf2fd
Signed-off-by: Juan Larriba <jlarriba@redhat.com>
Author: Juan Larriba
Date:   2025-09-18 10:05:05 +02:00
Commit: 24197d2cf8 (parent c22aef560d)
4 changed files with 90 additions and 9 deletions
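For reference, the trade-off described in the commit message is controlled by the new shard_days option in the [storage_loki] section of cloudkitty.conf (option name and limits taken from the diff below; the values shown are purely illustrative):

[storage_loki]
# Split Loki data retrieval into per-interval requests of at most N days.
# Defaults to 7 days; the option is capped at 30.
shard_days = 7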


@@ -52,9 +52,19 @@ loki_storage_opts = [
     cfg.StrOpt(
         'content_type',
         help='The http Content-Type that will be used to send info to Loki. '
-             'Defaults to application/json. It can also be '
-             'application/x-protobuf',
+             'Currently only application/json is supported.',
         default='application/json'),
+    cfg.IntOpt(
+        'shard_days',
+        help='Controls how data retrieval requests are split across time. '
+             'When fetching data from Loki over a long period, the time '
+             'range is divided into smaller intervals of N days, and '
+             'separate requests are made for each interval. This prevents '
+             'timeouts and improves performance. Defaults to 7 days. '
+             'Maximum is 30 days.',
+        default=7,
+        min=1,
+        max=30),
     cfg.BoolOpt(
         'insecure',
         help='Set to true to allow insecure HTTPS connections to Loki',
@@ -98,6 +108,7 @@ class LokiStorage(v2_storage.BaseStorage):
             CONF.storage_loki.stream,
             CONF.storage_loki.content_type,
             CONF.storage_loki.buffer_size,
+            CONF.storage_loki.shard_days,
             cert,
             verify)
@@ -127,7 +138,13 @@ class LokiStorage(v2_storage.BaseStorage):
     def _build_dataframes(self, logs):
         dataframes = {}
         for log in logs:
-            labels = json.loads(log['values'][0][1])
+            try:
+                labels = json.loads(log['values'][0][1])
+            except json.JSONDecodeError:
+                # In case we have non-JSON compliant log lines in Loki
+                LOG.error(f"Failed to decode log line: {log['values'][0][1]}, "
+                          f"ignoring.")
+                continue
             start = tzutils.dt_from_iso(labels['start'])
             end = tzutils.dt_from_iso(labels['end'])
             key = (start, end)
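The _build_dataframes() change above makes the driver tolerant of malformed entries instead of failing the whole build. A minimal standalone sketch of the same skip-on-decode-error pattern (not part of the patch, sample data invented for illustration):

import json

lines = ['{"start": "2024-01-01T00:00:00+00:00"}', 'not json at all']
parsed = []
for line in lines:
    try:
        parsed.append(json.loads(line))
    except json.JSONDecodeError:
        # Malformed log lines are skipped (and, in the driver, logged)
        # rather than aborting the whole dataframe build.
        continue
print(len(parsed))  # 1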


@@ -12,6 +12,8 @@
 # License for the specific language governing permissions and limitations
 # under the License.
 #
+import datetime
+
 from oslo_log import log
 import requests
@@ -25,7 +27,7 @@ class LokiClient(object):
     """Class used to ease interaction with Loki."""

     def __init__(self, url, tenant, stream_labels, content_type, buffer_size,
-                 cert, verify):
+                 shard_days, cert, verify):

         if content_type != "application/json":
             raise exceptions.UnsupportedContentType(content_type)
@@ -37,6 +39,7 @@ class LokiClient(object):
         }
         self._buffer_size = buffer_size
         self._points = []
+        self._shard_days = shard_days
         self._cert = cert
         self._verify = verify
@@ -164,7 +167,7 @@ class LokiClient(object):
         self.delete_by_query(query, begin, end)

-    def retrieve(self, begin, end, filters, metric_types, limit):
+    def _retrieve(self, begin, end, filters, metric_types, limit):
         """Retrieves dataframes stored in Loki."""
         query = self._base_query()
         loki_query_parts = []
@@ -186,6 +189,11 @@ class LokiClient(object):
         if loki_query_parts:
             query += ' | ' + ', '.join(loki_query_parts)

+        LOG.debug(
+            f"Loki query: '{query}', begin: '{begin}', end: '{end}', "
+            f"limit: '{limit}'"
+        )
+
         data_response = self.search(query, begin, end, limit)

         if not isinstance(data_response, dict) or \
@@ -205,6 +213,26 @@ class LokiClient(object):

         return total, output

+    def retrieve(self, begin, end, filters, metric_types, limit):
+        total = 0
+        data = []
+
+        if end - begin > datetime.timedelta(days=self._shard_days):
+            step = datetime.timedelta(days=self._shard_days)
+            current_begin = begin
+            while current_begin < end:
+                current_end = min(current_begin + step, end)
+                t, d = self._retrieve(
+                    current_begin, current_end, filters, metric_types, limit)
+                total += t
+                data += d
+                current_begin = current_end
+        else:
+            total, data = self._retrieve(
+                begin, end, filters, metric_types, limit)
+
+        return total, data
+
     def add_point(self, point, type, start, end):
         """Append a point to the client."""
         timestamp_ns = int(end.timestamp() * 1_000_000_000)
@@ -246,8 +274,7 @@ class LokiClient(object):
             LOG.warning("offset is not supported by Loki.")

         total_count, data = self.retrieve(
-            begin, end, filters, metric_types, limit
-        )
+            begin, end, filters, metric_types, limit)

         if not groupby:
             total_qty = 0.0
@@ -297,4 +324,5 @@ class LokiClient(object):
                'sum_qty': {'value': values['sum_qty']},
                'sum_price': {'value': values['sum_price']}
            })
+
        return len(result), result
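As a quick illustration of the sharding behaviour implemented in retrieve() above: a 14-day range with shard_days=7 is split into two back-to-back sub-ranges, matching the two _retrieve() calls asserted in the tests further down. A standalone sketch, not part of the patch; the shard_ranges() helper name is hypothetical:

import datetime


def shard_ranges(begin, end, shard_days):
    # Yield (begin, end) sub-ranges no longer than shard_days days,
    # mirroring the while loop in LokiClient.retrieve() above.
    step = datetime.timedelta(days=shard_days)
    current_begin = begin
    while current_begin < end:
        current_end = min(current_begin + step, end)
        yield current_begin, current_end
        current_begin = current_end


begin = datetime.datetime(2024, 1, 1, tzinfo=datetime.timezone.utc)
end = datetime.datetime(2024, 1, 15, tzinfo=datetime.timezone.utc)
for sub_begin, sub_end in shard_ranges(begin, end, shard_days=7):
    print(sub_begin.date(), '->', sub_end.date())
# 2024-01-01 -> 2024-01-08
# 2024-01-08 -> 2024-01-15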


@@ -14,6 +14,7 @@
 #
 from datetime import datetime
 from datetime import timedelta
+from datetime import timezone
 import json
 import unittest
@@ -48,6 +49,7 @@ class TestLokiClient(unittest.TestCase):
         self.stream_labels = {"app": "cloudkitty", "source": "test"}
         self.content_type = "application/json"
         self.buffer_size = 2
+        self.shard_days = 7
         self.cert = ('/path/to/cert', '/path/to/key')
         self.verify = '/path/to/cafile'
         self.client = client.LokiClient(
@@ -56,6 +58,7 @@ class TestLokiClient(unittest.TestCase):
             self.stream_labels,
             self.content_type,
             self.buffer_size,
+            self.shard_days,
             cert=self.cert,
             verify=self.verify
         )
@@ -76,7 +79,8 @@ class TestLokiClient(unittest.TestCase):
     def test_init_unsupported_content_type(self, mock_log, mock_requests):
         with self.assertRaises(exceptions.UnsupportedContentType):
             client.LokiClient(self.base_url, self.tenant, self.stream_labels,
-                              "text/plain", self.buffer_size, None, True)
+                              "text/plain", self.buffer_size, self.shard_days,
+                              None, True)

     def test_build_payload_json(self, mock_log, mock_requests):
         batch = [["1609459200000000000", "log line 1"],
@@ -550,3 +554,34 @@ class TestLokiClient(unittest.TestCase):
             mock_base_query.return_value, self.begin_dt, self.end_dt
         )
         mock_base_query.assert_called_once()
+
+    @patch.object(client.LokiClient, '_retrieve')
+    def test_retrieve_shards_with_long_time_range(
+            self, mock_retrieve, mock_log, mock_requests_arg):
+        begin_dt = datetime(2024, 1, 1, tzinfo=timezone.utc)
+        end_dt = datetime(2024, 1, 15, tzinfo=timezone.utc)
+        mock_retrieve.return_value = (1, ["dummy_data"])
+
+        self.client.retrieve(begin_dt, end_dt, None, None, 100)
+
+        calls = [
+            call(begin_dt, begin_dt + timedelta(days=self.shard_days),
+                 None, None, 100),
+            call(begin_dt + timedelta(days=self.shard_days), end_dt,
+                 None, None, 100)
+        ]
+        mock_retrieve.assert_has_calls(calls)
+        self.assertEqual(mock_retrieve.call_count, 2)
+
+    @patch.object(client.LokiClient, '_retrieve')
+    def test_retrieve_no_sharding_with_short_time_range(
+            self, mock_retrieve, mock_log, mock_requests_arg):
+        begin_dt = datetime(2024, 1, 1, tzinfo=timezone.utc)
+        end_dt = datetime(2024, 1, 5, tzinfo=timezone.utc)
+        mock_retrieve.return_value = (1, ["dummy_data"])
+
+        self.client.retrieve(begin_dt, end_dt, None, None, 100)
+
+        mock_retrieve.assert_called_once_with(
+            begin_dt, end_dt, None, None, 100
+        )


@@ -23,7 +23,7 @@ from cloudkitty.utils import json
 class FakeLokiClient(loki_client_module.LokiClient):

     def __init__(self, url, tenant, stream_labels, content_type,
-                 buffer_size, cert, verify, **kwargs):
+                 buffer_size, shard_days, cert, verify, **kwargs):

         if content_type != "application/json":
             raise loki_exceptions.UnsupportedContentType(content_type)
@@ -35,6 +35,7 @@ class FakeLokiClient(loki_client_module.LokiClient):
             'Content-Type': content_type
         }
         self._buffer_size = buffer_size
+        self._shard_days = shard_days
         self._cert = cert
         self._verify = verify