Merge "Use project_id as part of the stream instead of part of the search"

This commit is contained in:
Zuul
2026-01-05 15:33:38 +00:00
committed by Gerrit Code Review
2 changed files with 134 additions and 56 deletions

View File

@@ -38,21 +38,27 @@ class LokiClient(object):
'Content-Type': content_type
}
self._buffer_size = buffer_size
self._points = []
self._points = {}
self._shard_days = shard_days
self._cert = cert
self._verify = verify
def _build_payload_json(self, batch):
payload = {
"streams": [
{
"stream": self._stream_labels,
"values": batch
}
]
}
"""Build payload with separate streams per project_id."""
streams = []
for project_id, values in batch.items():
# Combine base stream labels with project_id
stream_labels = self._stream_labels.copy()
# Only add project_id label if it's not None
if project_id is not None:
stream_labels['project_id'] = project_id
streams.append({
"stream": stream_labels,
"values": values
})
payload = {"streams": streams}
return payload
def _dict_to_loki_query(self, tags_dict, groupby=False, brackets=True):
@@ -76,9 +82,12 @@ class LokiClient(object):
else:
return ', '.join(pairs)
def _base_query(self):
def _base_query(self, project_id=None):
"""Makes sure that we always get json results."""
return self._dict_to_loki_query(self._stream_labels) + ' | json'
stream_labels = self._stream_labels.copy()
if project_id is not None:
stream_labels['project_id'] = project_id
return self._dict_to_loki_query(stream_labels) + ' | json'
def search(self, query, begin, end, limit):
url = f"{self._base_url}/query_range"
@@ -93,6 +102,7 @@ class LokiClient(object):
"limit": limit
}
LOG.debug(f"Executing Loki query: {params}")
response = requests.get(url, params=params, headers=self._headers,
cert=self._cert, verify=self._verify)
@@ -109,23 +119,27 @@ class LokiClient(object):
"""Send messages to Loki in batches."""
url = f"{self._base_url}/push"
while self._points:
payload = self._build_payload_json(self._points)
response = requests.post(url, json=payload, headers=self._headers,
cert=self._cert, verify=self._verify)
if not self._points:
return
if response.status_code == 204:
LOG.debug(
f"Batch of {len(self._points)} messages pushed "
f"successfully."
)
self._points = []
else:
LOG.error(
f"Failed to push logs: {response.status_code} - "
f"{response.text}"
)
break
# Count total points across all project_ids
total_points = sum(len(points) for points in self._points.values())
payload = self._build_payload_json(self._points)
response = requests.post(url, json=payload, headers=self._headers,
cert=self._cert, verify=self._verify)
if response.status_code == 204:
LOG.debug(
f"Batch of {total_points} messages across "
f"{len(self._points)} project(s) pushed successfully."
)
self._points = {}
else:
LOG.error(
f"Failed to push logs: {response.status_code} - "
f"{response.text}"
)
def delete_by_query(self, query, begin, end):
url = f"{self._base_url}/delete"
@@ -156,12 +170,20 @@ class LokiClient(object):
)
def delete(self, begin, end, filters):
query = self._base_query()
# Extract project_id from filters to use in stream selector
project_id = None
remaining_filters = filters
if filters and 'project_id' in filters:
project_id = filters['project_id']
remaining_filters = {k: v for k, v in filters.items()
if k != 'project_id'}
query = self._base_query(project_id=project_id)
loki_query_parts = []
if filters:
if remaining_filters:
loki_query_parts.append(
self._dict_to_loki_query(
filters, groupby=True, brackets=False
remaining_filters, groupby=True, brackets=False
)
)
@@ -172,13 +194,20 @@ class LokiClient(object):
def _retrieve(self, begin, end, filters, metric_types, limit):
"""Retrieves dataframes stored in Loki."""
query = self._base_query()
project_id = None
remaining_filters = filters
if filters and 'project_id' in filters:
project_id = filters['project_id']
remaining_filters = {k: v for k, v in filters.items()
if k != 'project_id'}
query = self._base_query(project_id=project_id)
loki_query_parts = []
if filters:
if remaining_filters:
loki_query_parts.append(
self._dict_to_loki_query(
filters, groupby=True, brackets=False
remaining_filters, groupby=True, brackets=False
)
)
@@ -237,10 +266,13 @@ class LokiClient(object):
return total, data
def add_point(self, point, type, start, end):
"""Append a point to the client."""
"""Append a point to the client, grouped by project_id."""
timestamp_ns = int(end.timestamp() * 1_000_000_000)
timestamp = str(timestamp_ns)
# Extract project_id from point.groupby (use None if not present)
project_id = point.groupby.get('project_id', None)
data = {
'start': start,
'end': end,
@@ -254,9 +286,16 @@ class LokiClient(object):
}
log_line = json.dumps(data)
self._points.append([timestamp, log_line])
if len(self._points) >= self._buffer_size:
# Group points by project_id
if project_id not in self._points:
self._points[project_id] = []
self._points[project_id].append([timestamp, log_line])
# Check if any project has reached buffer size
if any(len(points) >= self._buffer_size for points
in self._points.values()):
self.push()
def total(self, begin, end, metric_types, filters, groupby,

View File

@@ -72,7 +72,7 @@ class TestLokiClient(unittest.TestCase):
self.assertEqual(self.client._headers['Content-Type'],
self.content_type)
self.assertEqual(self.client._buffer_size, self.buffer_size)
self.assertEqual(self.client._points, [])
self.assertEqual(self.client._points, {})
self.assertEqual(self.client._cert, self.cert)
self.assertEqual(self.client._verify, self.verify)
@@ -83,18 +83,44 @@ class TestLokiClient(unittest.TestCase):
None, True)
def test_build_payload_json(self, mock_log, mock_requests):
batch = [["1609459200000000000", "log line 1"],
["1609459200000000001", "log line 2"]]
batch = {
"proj1": [["1609459200000000000", "log line 1"],
["1609459200000000001", "log line 2"]],
"proj2": [["1609459200000000002", "log line 3"]]
}
payload = self.client._build_payload_json(batch)
expected_payload = {
"streams": [
{
"stream": {**self.stream_labels, "project_id": "proj1"},
"values": batch["proj1"]
},
{
"stream": {**self.stream_labels, "project_id": "proj2"},
"values": batch["proj2"]
}
]
}
self.assertEqual(payload, expected_payload)
def test_build_payload_json_with_none_project_id(
self, mock_log, mock_requests):
batch = {
None: [["1609459200000000000", "log line 1"],
["1609459200000000001", "log line 2"]]
}
payload = self.client._build_payload_json(batch)
expected_payload = {
"streams": [
{
"stream": self.stream_labels,
"values": batch
"values": batch[None]
}
]
}
self.assertEqual(payload, expected_payload)
# Verify that project_id is NOT in the stream labels
self.assertNotIn('project_id', payload['streams'][0]['stream'])
def test_dict_to_loki_query(self, mock_log, mock_requests):
self.assertEqual(self.client._dict_to_loki_query({}), '{}')
@@ -200,18 +226,21 @@ class TestLokiClient(unittest.TestCase):
mock_response = MagicMock()
mock_response.status_code = 204
mock_requests.post.return_value = mock_response
self.client._points = [["ts1", "log1"], ["ts2", "log2"]]
self.client._points = {
"proj1": [["ts1", "log1"], ["ts2", "log2"]]
}
self.client.push()
expected_url = f"{self.base_url}/push"
expected_payload = self.client._build_payload_json(
[["ts1", "log1"], ["ts2", "log2"]]
{"proj1": [["ts1", "log1"], ["ts2", "log2"]]}
)
mock_requests.post.assert_called_once_with(
expected_url, json=expected_payload, headers=self.client._headers,
cert=self.client._cert, verify=self.client._verify
)
self.assertEqual(self.client._points, [])
log_msg = "Batch of 2 messages pushed successfully."
self.assertEqual(self.client._points, {})
log_msg = ("Batch of 2 messages across 1 project(s) "
"pushed successfully.")
mock_log.debug.assert_called_once_with(log_msg)
def test_push_failure(self, mock_log, mock_requests):
@@ -219,8 +248,8 @@ class TestLokiClient(unittest.TestCase):
mock_response.status_code = 400
mock_response.text = "Bad Request"
mock_requests.post.return_value = mock_response
initial_points = [["ts1", "log1"], ["ts2", "log2"]]
self.client._points = list(initial_points)
initial_points = {"proj1": [["ts1", "log1"], ["ts2", "log2"]]}
self.client._points = dict(initial_points)
self.client.push()
self.assertEqual(self.client._points, initial_points)
expected_msg = "Failed to push logs: 400 - Bad Request"
@@ -234,7 +263,7 @@ class TestLokiClient(unittest.TestCase):
)
def test_push_no_points(self, mock_log, mock_requests):
self.client._points = []
self.client._points = {}
self.client.push()
mock_requests.post.assert_not_called()
@@ -267,9 +296,11 @@ class TestLokiClient(unittest.TestCase):
metric_types = "cpu_util"
total, output = self.client.retrieve(self.begin_dt, self.end_dt,
filters, metric_types, 50)
base_query = self.client._base_query()
# project_id is now part of the stream selector, not groupby filter
base_query = self.client._base_query(project_id="proj1")
# Only remaining filters (region) go in groupby
filter_query_part = self.client._dict_to_loki_query(
filters, groupby=True, brackets=False
{"region": "reg1"}, groupby=True, brackets=False
)
metric_query_part = f'type = "{metric_types}"'
expected_full_query = (
@@ -337,8 +368,11 @@ class TestLokiClient(unittest.TestCase):
self.client._buffer_size = 3
point = MockDataPoint(qty=1, price=10)
self.client.add_point(point, "test_type", self.begin_dt, self.end_dt)
# _points is now a dict with project_id as key
self.assertEqual(len(self.client._points), 1)
added_point_data = json.loads(self.client._points[0][1])
self.assertIn('proj1', self.client._points)
self.assertEqual(len(self.client._points['proj1']), 1)
added_point_data = json.loads(self.client._points['proj1'][0][1])
self.assertEqual(added_point_data['type'], "test_type")
self.assertEqual(added_point_data['qty'], 1)
self.assertEqual(added_point_data['price'], 10)
@@ -354,7 +388,10 @@ class TestLokiClient(unittest.TestCase):
self.client._buffer_size = 1
point = MockDataPoint()
self.client.add_point(point, "test_type", self.begin_dt, self.end_dt)
# _points is now a dict with project_id as key
self.assertEqual(len(self.client._points), 1)
self.assertIn('proj1', self.client._points)
self.assertEqual(len(self.client._points['proj1']), 1)
mock_push.assert_called_once()
@patch.object(client.LokiClient, 'retrieve')
@@ -531,17 +568,19 @@ class TestLokiClient(unittest.TestCase):
@patch.object(client.LokiClient, '_base_query')
def test_delete_with_filters(self, mock_base_query, mock_delete_by_query,
mock_log, mock_requests_arg):
mock_base_query.return_value = '{app="cloudkitty", source="test"} ' \
'| json'
mock_base_query.return_value = (
'{app="cloudkitty", source="test", project_id="proj1"} | json'
)
filters = {"project_id": "proj1", "resource_type": "instance"}
self.client.delete(self.begin_dt, self.end_dt, filters)
exp_query_filters = 'groupby_project_id="proj1", ' \
'groupby_resource_type="instance"'
# project_id is now in stream selector via _base_query
# only resource_type goes in groupby filters
exp_query_filters = 'groupby_resource_type="instance"'
exp_query = f'{mock_base_query.return_value} | {exp_query_filters}'
mock_delete_by_query.assert_called_once_with(
exp_query, self.begin_dt, self.end_dt
)
mock_base_query.assert_called_once()
mock_base_query.assert_called_once_with(project_id="proj1")
@patch.object(client.LokiClient, 'delete_by_query')
@patch.object(client.LokiClient, '_base_query')
@@ -553,7 +592,7 @@ class TestLokiClient(unittest.TestCase):
mock_delete_by_query.assert_called_once_with(
mock_base_query.return_value, self.begin_dt, self.end_dt
)
mock_base_query.assert_called_once()
mock_base_query.assert_called_once_with(project_id=None)
@patch.object(client.LokiClient, '_retrieve')
def test_retrieve_shards_with_long_time_range(