Merge "Shard the requests to Loki"
This commit is contained in:
@@ -52,9 +52,19 @@ loki_storage_opts = [
|
||||
cfg.StrOpt(
|
||||
'content_type',
|
||||
help='The http Content-Type that will be used to send info to Loki. '
|
||||
'Defaults to application/json. It can also be '
|
||||
'application/x-protobuf',
|
||||
'Currently only application/json is supported.',
|
||||
default='application/json'),
|
||||
cfg.IntOpt(
|
||||
'shard_days',
|
||||
help='Controls how data retrieval requests are split across time. '
|
||||
'When fetching data from Loki over a long period, the time '
|
||||
'range is divided into smaller intervals of N days, and '
|
||||
'separate requests are made for each interval. This prevents '
|
||||
'timeouts and improves performance. Defaults to 7 days. '
|
||||
'Maximum is 30 days.',
|
||||
default=7,
|
||||
min=1,
|
||||
max=30),
|
||||
cfg.BoolOpt(
|
||||
'insecure',
|
||||
help='Set to true to allow insecure HTTPS connections to Loki',
|
||||
@@ -98,6 +108,7 @@ class LokiStorage(v2_storage.BaseStorage):
|
||||
CONF.storage_loki.stream,
|
||||
CONF.storage_loki.content_type,
|
||||
CONF.storage_loki.buffer_size,
|
||||
CONF.storage_loki.shard_days,
|
||||
cert,
|
||||
verify)
|
||||
|
||||
@@ -127,7 +138,13 @@ class LokiStorage(v2_storage.BaseStorage):
|
||||
def _build_dataframes(self, logs):
|
||||
dataframes = {}
|
||||
for log in logs:
|
||||
labels = json.loads(log['values'][0][1])
|
||||
try:
|
||||
labels = json.loads(log['values'][0][1])
|
||||
except json.JSONDecodeError:
|
||||
# In case that we have non-json compliant log lines in Loki
|
||||
LOG.error(f"Failed to decode log line: {log['values'][0][1]}, "
|
||||
f"ignoring.")
|
||||
continue
|
||||
start = tzutils.dt_from_iso(labels['start'])
|
||||
end = tzutils.dt_from_iso(labels['end'])
|
||||
key = (start, end)
|
||||
|
||||
@@ -12,6 +12,8 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
#
|
||||
import datetime
|
||||
|
||||
from oslo_log import log
|
||||
import requests
|
||||
|
||||
@@ -25,7 +27,7 @@ class LokiClient(object):
|
||||
"""Class used to ease interaction with Loki."""
|
||||
|
||||
def __init__(self, url, tenant, stream_labels, content_type, buffer_size,
|
||||
cert, verify):
|
||||
shard_days, cert, verify):
|
||||
if content_type != "application/json":
|
||||
raise exceptions.UnsupportedContentType(content_type)
|
||||
|
||||
@@ -37,6 +39,7 @@ class LokiClient(object):
|
||||
}
|
||||
self._buffer_size = buffer_size
|
||||
self._points = []
|
||||
self._shard_days = shard_days
|
||||
|
||||
self._cert = cert
|
||||
self._verify = verify
|
||||
@@ -164,7 +167,7 @@ class LokiClient(object):
|
||||
|
||||
self.delete_by_query(query, begin, end)
|
||||
|
||||
def retrieve(self, begin, end, filters, metric_types, limit):
|
||||
def _retrieve(self, begin, end, filters, metric_types, limit):
|
||||
"""Retrieves dataframes stored in Loki."""
|
||||
query = self._base_query()
|
||||
loki_query_parts = []
|
||||
@@ -186,6 +189,11 @@ class LokiClient(object):
|
||||
if loki_query_parts:
|
||||
query += ' | ' + ', '.join(loki_query_parts)
|
||||
|
||||
LOG.debug(
|
||||
f"Loki query: '{query}', begin: '{begin}', end: '{end}', "
|
||||
f"limit: '{limit}'"
|
||||
)
|
||||
|
||||
data_response = self.search(query, begin, end, limit)
|
||||
|
||||
if not isinstance(data_response, dict) or \
|
||||
@@ -205,6 +213,26 @@ class LokiClient(object):
|
||||
|
||||
return total, output
|
||||
|
||||
def retrieve(self, begin, end, filters, metric_types, limit):
|
||||
total = 0
|
||||
data = []
|
||||
if end - begin > datetime.timedelta(days=self._shard_days):
|
||||
step = datetime.timedelta(days=self._shard_days)
|
||||
current_begin = begin
|
||||
|
||||
while current_begin < end:
|
||||
current_end = min(current_begin + step, end)
|
||||
t, d = self._retrieve(
|
||||
current_begin, current_end, filters, metric_types, limit)
|
||||
total += t
|
||||
data += d
|
||||
current_begin = current_end
|
||||
else:
|
||||
total, data = self._retrieve(
|
||||
begin, end, filters, metric_types, limit)
|
||||
|
||||
return total, data
|
||||
|
||||
def add_point(self, point, type, start, end):
|
||||
"""Append a point to the client."""
|
||||
timestamp_ns = int(end.timestamp() * 1_000_000_000)
|
||||
@@ -246,8 +274,7 @@ class LokiClient(object):
|
||||
LOG.warning("offset is not supported by Loki.")
|
||||
|
||||
total_count, data = self.retrieve(
|
||||
begin, end, filters, metric_types, limit
|
||||
)
|
||||
begin, end, filters, metric_types, limit)
|
||||
|
||||
if not groupby:
|
||||
total_qty = 0.0
|
||||
@@ -297,4 +324,5 @@ class LokiClient(object):
|
||||
'sum_qty': {'value': values['sum_qty']},
|
||||
'sum_price': {'value': values['sum_price']}
|
||||
})
|
||||
|
||||
return len(result), result
|
||||
|
||||
@@ -14,6 +14,7 @@
|
||||
#
|
||||
|
||||
from datetime import datetime
|
||||
from datetime import timedelta
|
||||
from datetime import timezone
|
||||
import json
|
||||
import unittest
|
||||
@@ -48,6 +49,7 @@ class TestLokiClient(unittest.TestCase):
|
||||
self.stream_labels = {"app": "cloudkitty", "source": "test"}
|
||||
self.content_type = "application/json"
|
||||
self.buffer_size = 2
|
||||
self.shard_days = 7
|
||||
self.cert = ('/path/to/cert', '/path/to/key')
|
||||
self.verify = '/path/to/cafile'
|
||||
self.client = client.LokiClient(
|
||||
@@ -56,6 +58,7 @@ class TestLokiClient(unittest.TestCase):
|
||||
self.stream_labels,
|
||||
self.content_type,
|
||||
self.buffer_size,
|
||||
self.shard_days,
|
||||
cert=self.cert,
|
||||
verify=self.verify
|
||||
)
|
||||
@@ -76,7 +79,8 @@ class TestLokiClient(unittest.TestCase):
|
||||
def test_init_unsupported_content_type(self, mock_log, mock_requests):
|
||||
with self.assertRaises(exceptions.UnsupportedContentType):
|
||||
client.LokiClient(self.base_url, self.tenant, self.stream_labels,
|
||||
"text/plain", self.buffer_size, None, True)
|
||||
"text/plain", self.buffer_size, self.shard_days,
|
||||
None, True)
|
||||
|
||||
def test_build_payload_json(self, mock_log, mock_requests):
|
||||
batch = [["1609459200000000000", "log line 1"],
|
||||
@@ -550,3 +554,34 @@ class TestLokiClient(unittest.TestCase):
|
||||
mock_base_query.return_value, self.begin_dt, self.end_dt
|
||||
)
|
||||
mock_base_query.assert_called_once()
|
||||
|
||||
@patch.object(client.LokiClient, '_retrieve')
|
||||
def test_retrieve_shards_with_long_time_range(
|
||||
self, mock_retrieve, mock_log, mock_requests_arg):
|
||||
begin_dt = datetime(2024, 1, 1, tzinfo=timezone.utc)
|
||||
end_dt = datetime(2024, 1, 15, tzinfo=timezone.utc)
|
||||
mock_retrieve.return_value = (1, ["dummy_data"])
|
||||
|
||||
self.client.retrieve(begin_dt, end_dt, None, None, 100)
|
||||
|
||||
calls = [
|
||||
call(begin_dt, begin_dt + timedelta(days=self.shard_days),
|
||||
None, None, 100),
|
||||
call(begin_dt + timedelta(days=self.shard_days), end_dt,
|
||||
None, None, 100)
|
||||
]
|
||||
mock_retrieve.assert_has_calls(calls)
|
||||
self.assertEqual(mock_retrieve.call_count, 2)
|
||||
|
||||
@patch.object(client.LokiClient, '_retrieve')
|
||||
def test_retrieve_no_sharding_with_short_time_range(
|
||||
self, mock_retrieve, mock_log, mock_requests_arg):
|
||||
begin_dt = datetime(2024, 1, 1, tzinfo=timezone.utc)
|
||||
end_dt = datetime(2024, 1, 5, tzinfo=timezone.utc)
|
||||
mock_retrieve.return_value = (1, ["dummy_data"])
|
||||
|
||||
self.client.retrieve(begin_dt, end_dt, None, None, 100)
|
||||
|
||||
mock_retrieve.assert_called_once_with(
|
||||
begin_dt, end_dt, None, None, 100
|
||||
)
|
||||
|
||||
@@ -23,7 +23,7 @@ from cloudkitty.utils import json
|
||||
class FakeLokiClient(loki_client_module.LokiClient):
|
||||
|
||||
def __init__(self, url, tenant, stream_labels, content_type,
|
||||
buffer_size, cert, verify, **kwargs):
|
||||
buffer_size, shard_days, cert, verify, **kwargs):
|
||||
if content_type != "application/json":
|
||||
raise loki_exceptions.UnsupportedContentType(content_type)
|
||||
|
||||
@@ -35,6 +35,7 @@ class FakeLokiClient(loki_client_module.LokiClient):
|
||||
'Content-Type': content_type
|
||||
}
|
||||
self._buffer_size = buffer_size
|
||||
self._shard_days = shard_days
|
||||
|
||||
self._cert = cert
|
||||
self._verify = verify
|
||||
|
||||
Reference in New Issue
Block a user