From 708d9f938ca44696db40e6c26bd48055741ebbe7 Mon Sep 17 00:00:00 2001
From: Juan Larriba
Date: Tue, 2 Dec 2025 14:38:14 +0100
Subject: [PATCH] Use project_id as part of the stream instead of part of the search

Loki searches in two layers: the first one is the streams. Each stream
is uniquely identified by a set of attributes. Before this patch, these
attributes were just what the user passed in the "stream" parameter, or
{service="cloudkitty"} by default.

This patch adds a project_id="aaa-bbb-ccc-ddd" field to the stream,
effectively creating one stream per project. As the most common use
case retrieves resources on a per-project basis, this will make most
queries much faster. Moreover, the openstack rating CLI always puts the
project_id in the query, which makes all of those queries much more
efficient.

Change-Id: I35840e50a292ba122b978ac332849d793fe9f10b
Signed-off-by: Juan Larriba
---
 cloudkitty/storage/v2/loki/client.py          | 111 ++++++++++++------
 .../tests/storage/v2/loki/test_client.py      |  79 +++++++++----
 2 files changed, 134 insertions(+), 56 deletions(-)

diff --git a/cloudkitty/storage/v2/loki/client.py b/cloudkitty/storage/v2/loki/client.py
index 028ca152..560f20af 100644
--- a/cloudkitty/storage/v2/loki/client.py
+++ b/cloudkitty/storage/v2/loki/client.py
@@ -38,21 +38,27 @@ class LokiClient(object):
             'Content-Type': content_type
         }
         self._buffer_size = buffer_size
-        self._points = []
+        self._points = {}
         self._shard_days = shard_days
         self._cert = cert
         self._verify = verify
 
     def _build_payload_json(self, batch):
-        payload = {
-            "streams": [
-                {
-                    "stream": self._stream_labels,
-                    "values": batch
-                }
-            ]
-        }
+        """Build payload with separate streams per project_id."""
+        streams = []
+        for project_id, values in batch.items():
+            # Combine base stream labels with project_id
+            stream_labels = self._stream_labels.copy()
+            # Only add project_id label if it's not None
+            if project_id is not None:
+                stream_labels['project_id'] = project_id
+            streams.append({
+                "stream": stream_labels,
+                "values": values
+            })
+
+        payload = {"streams": streams}
         return payload
 
     def _dict_to_loki_query(self, tags_dict, groupby=False, brackets=True):
@@ -76,9 +82,12 @@ class LokiClient(object):
         else:
             return ', '.join(pairs)
 
-    def _base_query(self):
+    def _base_query(self, project_id=None):
         """Makes sure that we always get json results."""
-        return self._dict_to_loki_query(self._stream_labels) + ' | json'
+        stream_labels = self._stream_labels.copy()
+        if project_id is not None:
+            stream_labels['project_id'] = project_id
+        return self._dict_to_loki_query(stream_labels) + ' | json'
 
     def search(self, query, begin, end, limit):
         url = f"{self._base_url}/query_range"
@@ -93,6 +102,7 @@ class LokiClient(object):
             "limit": limit
         }
 
+        LOG.debug(f"Executing Loki query: {params}")
         response = requests.get(url, params=params, headers=self._headers,
                                 cert=self._cert, verify=self._verify)
 
@@ -109,23 +119,27 @@ class LokiClient(object):
         """Send messages to Loki in batches."""
         url = f"{self._base_url}/push"
 
-        while self._points:
-            payload = self._build_payload_json(self._points)
-            response = requests.post(url, json=payload, headers=self._headers,
-                                     cert=self._cert, verify=self._verify)
+        if not self._points:
+            return
 
-            if response.status_code == 204:
-                LOG.debug(
-                    f"Batch of {len(self._points)} messages pushed "
-                    f"successfully."
-                )
-                self._points = []
-            else:
-                LOG.error(
-                    f"Failed to push logs: {response.status_code} - "
-                    f"{response.text}"
-                )
-                break
+        # Count total points across all project_ids
+        total_points = sum(len(points) for points in self._points.values())
+
+        payload = self._build_payload_json(self._points)
+        response = requests.post(url, json=payload, headers=self._headers,
+                                 cert=self._cert, verify=self._verify)
+
+        if response.status_code == 204:
+            LOG.debug(
+                f"Batch of {total_points} messages across "
+                f"{len(self._points)} project(s) pushed successfully."
+            )
+            self._points = {}
+        else:
+            LOG.error(
+                f"Failed to push logs: {response.status_code} - "
+                f"{response.text}"
+            )
 
     def delete_by_query(self, query, begin, end):
         url = f"{self._base_url}/delete"
@@ -153,12 +167,20 @@ class LokiClient(object):
         )
 
     def delete(self, begin, end, filters):
-        query = self._base_query()
+        # Extract project_id from filters to use in stream selector
+        project_id = None
+        remaining_filters = filters
+        if filters and 'project_id' in filters:
+            project_id = filters['project_id']
+            remaining_filters = {k: v for k, v in filters.items()
+                                 if k != 'project_id'}
+
+        query = self._base_query(project_id=project_id)
         loki_query_parts = []
-        if filters:
+        if remaining_filters:
             loki_query_parts.append(
                 self._dict_to_loki_query(
-                    filters, groupby=True, brackets=False
+                    remaining_filters, groupby=True, brackets=False
                 )
             )
@@ -169,13 +191,20 @@ class LokiClient(object):
 
     def _retrieve(self, begin, end, filters, metric_types, limit):
         """Retrieves dataframes stored in Loki."""
-        query = self._base_query()
+        project_id = None
+        remaining_filters = filters
+        if filters and 'project_id' in filters:
+            project_id = filters['project_id']
+            remaining_filters = {k: v for k, v in filters.items()
+                                 if k != 'project_id'}
+
+        query = self._base_query(project_id=project_id)
         loki_query_parts = []
-        if filters:
+        if remaining_filters:
             loki_query_parts.append(
                 self._dict_to_loki_query(
-                    filters, groupby=True, brackets=False
+                    remaining_filters, groupby=True, brackets=False
                 )
             )
@@ -234,10 +263,13 @@ class LokiClient(object):
         return total, data
 
     def add_point(self, point, type, start, end):
-        """Append a point to the client."""
+        """Append a point to the client, grouped by project_id."""
        timestamp_ns = int(end.timestamp() * 1_000_000_000)
        timestamp = str(timestamp_ns)
 
+        # Extract project_id from point.groupby (use None if not present)
+        project_id = point.groupby.get('project_id', None)
+
         data = {
             'start': start,
             'end': end,
@@ -251,9 +283,16 @@ class LokiClient(object):
         }
         log_line = json.dumps(data)
-        self._points.append([timestamp, log_line])
-        if len(self._points) >= self._buffer_size:
+        # Group points by project_id
+        if project_id not in self._points:
+            self._points[project_id] = []
+
+        self._points[project_id].append([timestamp, log_line])
+
+        # Check if any project has reached buffer size
+        if any(len(points) >= self._buffer_size for points
+               in self._points.values()):
             self.push()
 
     def total(self, begin, end, metric_types, filters, groupby,
diff --git a/cloudkitty/tests/storage/v2/loki/test_client.py b/cloudkitty/tests/storage/v2/loki/test_client.py
index b3524543..745d1dbe 100644
--- a/cloudkitty/tests/storage/v2/loki/test_client.py
+++ b/cloudkitty/tests/storage/v2/loki/test_client.py
@@ -72,7 +72,7 @@ class TestLokiClient(unittest.TestCase):
         self.assertEqual(self.client._headers['Content-Type'],
                          self.content_type)
         self.assertEqual(self.client._buffer_size, self.buffer_size)
-        self.assertEqual(self.client._points, [])
+        self.assertEqual(self.client._points, {})
         self.assertEqual(self.client._cert, self.cert)
         self.assertEqual(self.client._verify, self.verify)
 
@@ -83,18 +83,44 @@ class TestLokiClient(unittest.TestCase):
                                          None, True)
 
     def test_build_payload_json(self, mock_log, mock_requests):
-        batch = [["1609459200000000000", "log line 1"],
-                 ["1609459200000000001", "log line 2"]]
+        batch = {
+            "proj1": [["1609459200000000000", "log line 1"],
+                      ["1609459200000000001", "log line 2"]],
+            "proj2": [["1609459200000000002", "log line 3"]]
+        }
+        payload = self.client._build_payload_json(batch)
+        expected_payload = {
+            "streams": [
+                {
+                    "stream": {**self.stream_labels, "project_id": "proj1"},
+                    "values": batch["proj1"]
+                },
+                {
+                    "stream": {**self.stream_labels, "project_id": "proj2"},
+                    "values": batch["proj2"]
+                }
+            ]
+        }
+        self.assertEqual(payload, expected_payload)
+
+    def test_build_payload_json_with_none_project_id(
+            self, mock_log, mock_requests):
+        batch = {
+            None: [["1609459200000000000", "log line 1"],
+                   ["1609459200000000001", "log line 2"]]
+        }
         payload = self.client._build_payload_json(batch)
         expected_payload = {
             "streams": [
                 {
                     "stream": self.stream_labels,
-                    "values": batch
+                    "values": batch[None]
                 }
             ]
         }
         self.assertEqual(payload, expected_payload)
+        # Verify that project_id is NOT in the stream labels
+        self.assertNotIn('project_id', payload['streams'][0]['stream'])
 
     def test_dict_to_loki_query(self, mock_log, mock_requests):
         self.assertEqual(self.client._dict_to_loki_query({}), '{}')
@@ -200,18 +226,21 @@ class TestLokiClient(unittest.TestCase):
         mock_response = MagicMock()
         mock_response.status_code = 204
         mock_requests.post.return_value = mock_response
-        self.client._points = [["ts1", "log1"], ["ts2", "log2"]]
+        self.client._points = {
+            "proj1": [["ts1", "log1"], ["ts2", "log2"]]
+        }
         self.client.push()
         expected_url = f"{self.base_url}/push"
         expected_payload = self.client._build_payload_json(
-            [["ts1", "log1"], ["ts2", "log2"]]
+            {"proj1": [["ts1", "log1"], ["ts2", "log2"]]}
         )
         mock_requests.post.assert_called_once_with(
             expected_url, json=expected_payload, headers=self.client._headers,
             cert=self.client._cert, verify=self.client._verify
         )
-        self.assertEqual(self.client._points, [])
-        log_msg = "Batch of 2 messages pushed successfully."
+        self.assertEqual(self.client._points, {})
+        log_msg = ("Batch of 2 messages across 1 project(s) "
+                   "pushed successfully.")
         mock_log.debug.assert_called_once_with(log_msg)
 
     def test_push_failure(self, mock_log, mock_requests):
         mock_response = MagicMock()
         mock_response.status_code = 400
         mock_response.text = "Bad Request"
         mock_requests.post.return_value = mock_response
-        initial_points = [["ts1", "log1"], ["ts2", "log2"]]
-        self.client._points = list(initial_points)
+        initial_points = {"proj1": [["ts1", "log1"], ["ts2", "log2"]]}
+        self.client._points = dict(initial_points)
         self.client.push()
         self.assertEqual(self.client._points, initial_points)
         expected_msg = "Failed to push logs: 400 - Bad Request"
@@ -234,7 +263,7 @@ class TestLokiClient(unittest.TestCase):
         )
 
     def test_push_no_points(self, mock_log, mock_requests):
-        self.client._points = []
+        self.client._points = {}
         self.client.push()
         mock_requests.post.assert_not_called()
 
@@ -267,9 +296,11 @@ class TestLokiClient(unittest.TestCase):
         metric_types = "cpu_util"
         total, output = self.client.retrieve(self.begin_dt, self.end_dt,
                                              filters, metric_types, 50)
-        base_query = self.client._base_query()
+        # project_id is now part of the stream selector, not groupby filter
+        base_query = self.client._base_query(project_id="proj1")
+        # Only remaining filters (region) go in groupby
         filter_query_part = self.client._dict_to_loki_query(
-            filters, groupby=True, brackets=False
+            {"region": "reg1"}, groupby=True, brackets=False
         )
         metric_query_part = f'type = "{metric_types}"'
         expected_full_query = (
@@ -337,8 +368,11 @@ class TestLokiClient(unittest.TestCase):
         self.client._buffer_size = 3
         point = MockDataPoint(qty=1, price=10)
         self.client.add_point(point, "test_type", self.begin_dt, self.end_dt)
+        # _points is now a dict with project_id as key
         self.assertEqual(len(self.client._points), 1)
-        added_point_data = json.loads(self.client._points[0][1])
+        self.assertIn('proj1', self.client._points)
+        self.assertEqual(len(self.client._points['proj1']), 1)
+        added_point_data = json.loads(self.client._points['proj1'][0][1])
         self.assertEqual(added_point_data['type'], "test_type")
         self.assertEqual(added_point_data['qty'], 1)
         self.assertEqual(added_point_data['price'], 10)
@@ -354,7 +388,10 @@ class TestLokiClient(unittest.TestCase):
         self.client._buffer_size = 1
         point = MockDataPoint()
         self.client.add_point(point, "test_type", self.begin_dt, self.end_dt)
+        # _points is now a dict with project_id as key
         self.assertEqual(len(self.client._points), 1)
+        self.assertIn('proj1', self.client._points)
+        self.assertEqual(len(self.client._points['proj1']), 1)
         mock_push.assert_called_once()
 
     @patch.object(client.LokiClient, 'retrieve')
@@ -531,17 +568,19 @@ class TestLokiClient(unittest.TestCase):
     @patch.object(client.LokiClient, '_base_query')
     def test_delete_with_filters(self, mock_base_query, mock_delete_by_query,
                                  mock_log, mock_requests_arg):
-        mock_base_query.return_value = '{app="cloudkitty", source="test"} ' \
-            '| json'
+        mock_base_query.return_value = (
+            '{app="cloudkitty", source="test", project_id="proj1"} | json'
+        )
         filters = {"project_id": "proj1", "resource_type": "instance"}
         self.client.delete(self.begin_dt, self.end_dt, filters)
-        exp_query_filters = 'groupby_project_id="proj1", ' \
-            'groupby_resource_type="instance"'
+        # project_id is now in stream selector via _base_query
+        # only resource_type goes in groupby filters
+        exp_query_filters = 'groupby_resource_type="instance"'
         exp_query = f'{mock_base_query.return_value} | {exp_query_filters}'
         mock_delete_by_query.assert_called_once_with(
             exp_query, self.begin_dt, self.end_dt
         )
-        mock_base_query.assert_called_once()
+        mock_base_query.assert_called_once_with(project_id="proj1")
 
     @patch.object(client.LokiClient, 'delete_by_query')
     @patch.object(client.LokiClient, '_base_query')
@@ -553,7 +592,7 @@ class TestLokiClient(unittest.TestCase):
         mock_delete_by_query.assert_called_once_with(
             mock_base_query.return_value, self.begin_dt, self.end_dt
         )
-        mock_base_query.assert_called_once()
+        mock_base_query.assert_called_once_with(project_id=None)
 
     @patch.object(client.LokiClient, '_retrieve')
     def test_retrieve_shards_with_long_time_range(
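
Illustrative note, not part of the patch: a minimal Python sketch of how the
effective LogQL changes once project_id moves from a post-parse filter into
the stream selector, assuming the default {service="cloudkitty"} stream
labels. The selector() helper below is hypothetical and only mimics the shape
of the selectors that LokiClient._dict_to_loki_query() builds.

    # Illustrative sketch only; assumes the default stream labels.
    def selector(labels):
        # Render a dict as a LogQL stream selector: {k1="v1", k2="v2"}
        return '{' + ', '.join(f'{k}="{v}"' for k, v in labels.items()) + '}'

    project = "aaa-bbb-ccc-ddd"

    # Before: a single {service="cloudkitty"} stream; project_id is only
    # matched after every line has been parsed with "| json".
    old_query = (selector({"service": "cloudkitty"})
                 + f' | json | groupby_project_id="{project}"')

    # After: project_id is part of the stream selector, so Loki narrows the
    # search to one per-project stream before parsing any line.
    new_query = (selector({"service": "cloudkitty", "project_id": project})
                 + ' | json')

    print(old_query)
    # {service="cloudkitty"} | json | groupby_project_id="aaa-bbb-ccc-ddd"
    print(new_query)
    # {service="cloudkitty", project_id="aaa-bbb-ccc-ddd"} | json

The groupby_project_id line filter still works, but the stream-level label
lets Loki skip non-matching streams entirely, which is the speedup the commit
message describes.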