From 1a838653fb83ea903333fffc184be64af8ed827f Mon Sep 17 00:00:00 2001 From: jlarriba Date: Mon, 15 Sep 2025 12:32:17 +0200 Subject: [PATCH] Support TLS connections to Loki storage The Loki storage could be behind an HTTPS proxy that requires TLS encrypted connections. This change supports two different TLS configurations: * Connect via one-way TLS as a normal HTTPS connection via the ca_file parameter to provide encryption * Connect via two-way TLS as a mutual TLS connection for encryption and authentication. If cert_file and key_file are provided, they will be sent to Loki to authenticate the requests. Both methods supports the insecure parameter in case the ca_file has not been provided or it is incorrect. With that, the driver will not verify the certificates. Depends-On: https://review.opendev.org/c/openstack/cloudkitty/+/950868 Change-Id: Idfbd5f6e85b572b129b8595fa1eb122b80827d90 Signed-off-by: jlarriba --- cloudkitty/storage/v2/loki/__init__.py | 29 ++++-- cloudkitty/storage/v2/loki/client.py | 16 +++- .../tests/storage/v2/loki/test_client.py | 93 ++++++++----------- cloudkitty/tests/storage/v2/loki_utils.py | 6 +- doc/source/admin/configuration/storage.rst | 10 +- 5 files changed, 85 insertions(+), 69 deletions(-) diff --git a/cloudkitty/storage/v2/loki/__init__.py b/cloudkitty/storage/v2/loki/__init__.py index bef4cdd4..e6423ef5 100644 --- a/cloudkitty/storage/v2/loki/__init__.py +++ b/cloudkitty/storage/v2/loki/__init__.py @@ -37,8 +37,8 @@ loki_storage_opts = [ default='http://localhost:3100/loki/api/v1'), cfg.StrOpt( 'tenant', - help='The loki tenant to be used. Defaults to tenant1.', - default='tenant1'), + help='The loki tenant to be used. Defaults to cloudkitty.', + default='cloudkitty'), cfg.DictOpt( 'stream', help='The labels that are going to be used to define the Loki stream ' @@ -60,8 +60,18 @@ loki_storage_opts = [ help='Set to true to allow insecure HTTPS connections to Loki', default=False), cfg.StrOpt( - 'cafile', + 'ca_file', help='Path of the CA certificate to trust for HTTPS connections.', + default=None), + cfg.StrOpt( + 'cert_file', + help="Path to a client cert for establishing mTLS connections to " + "Loki.", + default=None), + cfg.StrOpt( + 'key_file', + help="Path to a client key for establishing mTLS connections to " + "Loki.", default=None) ] @@ -74,15 +84,22 @@ class LokiStorage(v2_storage.BaseStorage): super(LokiStorage, self).__init__(*args, **kwargs) verify = not CONF.storage_loki.insecure - if verify and CONF.storage_loki.cafile: - verify = CONF.storage_loki.cafile + if verify and CONF.storage_loki.ca_file: + verify = CONF.storage_loki.ca_file + + if CONF.storage_loki.cert_file and CONF.storage_loki.key_file: + cert = (CONF.storage_loki.cert_file, CONF.storage_loki.key_file) + else: + cert = None self._conn = os_client.LokiClient( CONF.storage_loki.url, CONF.storage_loki.tenant, CONF.storage_loki.stream, CONF.storage_loki.content_type, - CONF.storage_loki.buffer_size) + CONF.storage_loki.buffer_size, + cert, + verify) def init(self): LOG.debug('LokiStorage Init.') diff --git a/cloudkitty/storage/v2/loki/client.py b/cloudkitty/storage/v2/loki/client.py index 2b752123..0be31c2c 100644 --- a/cloudkitty/storage/v2/loki/client.py +++ b/cloudkitty/storage/v2/loki/client.py @@ -24,7 +24,8 @@ LOG = log.getLogger(__name__) class LokiClient(object): """Class used to ease interaction with Loki.""" - def __init__(self, url, tenant, stream_labels, content_type, buffer_size): + def __init__(self, url, tenant, stream_labels, content_type, buffer_size, + cert, verify): if content_type != "application/json": raise exceptions.UnsupportedContentType(content_type) @@ -37,6 +38,9 @@ class LokiClient(object): self._buffer_size = buffer_size self._points = [] + self._cert = cert + self._verify = verify + def _build_payload_json(self, batch): payload = { "streams": [ @@ -86,7 +90,8 @@ class LokiClient(object): "limit": limit } - response = requests.get(url, params=params, headers=self._headers) + response = requests.get(url, params=params, headers=self._headers, + cert=self._cert, verify=self._verify) if response.status_code == 200: data = response.json()['data'] @@ -103,7 +108,8 @@ class LokiClient(object): while self._points: payload = self._build_payload_json(self._points) - response = requests.post(url, json=payload, headers=self._headers) + response = requests.post(url, json=payload, headers=self._headers, + cert=self._cert, verify=self._verify) if response.status_code == 204: LOG.debug( @@ -130,8 +136,8 @@ class LokiClient(object): "end": int(end.timestamp()), } - LOG.debug(f"Request Params: {params}") - response = requests.post(url, params=params, headers=self._headers) + response = requests.post(url, params=params, headers=self._headers, + cert=self._cert, verify=self._verify) if response.status_code == 204: LOG.debug( diff --git a/cloudkitty/tests/storage/v2/loki/test_client.py b/cloudkitty/tests/storage/v2/loki/test_client.py index f5271927..4e0081ef 100644 --- a/cloudkitty/tests/storage/v2/loki/test_client.py +++ b/cloudkitty/tests/storage/v2/loki/test_client.py @@ -12,6 +12,7 @@ # License for the specific language governing permissions and limitations # under the License. # + from datetime import datetime from datetime import timezone import json @@ -47,12 +48,16 @@ class TestLokiClient(unittest.TestCase): self.stream_labels = {"app": "cloudkitty", "source": "test"} self.content_type = "application/json" self.buffer_size = 2 + self.cert = ('/path/to/cert', '/path/to/key') + self.verify = '/path/to/cafile' self.client = client.LokiClient( self.base_url, self.tenant, self.stream_labels, self.content_type, - self.buffer_size + self.buffer_size, + cert=self.cert, + verify=self.verify ) self.begin_dt = datetime(2024, 1, 1, 0, 0, 0, tzinfo=timezone.utc) self.end_dt = datetime(2024, 1, 1, 1, 0, 0, tzinfo=timezone.utc) @@ -65,11 +70,13 @@ class TestLokiClient(unittest.TestCase): self.content_type) self.assertEqual(self.client._buffer_size, self.buffer_size) self.assertEqual(self.client._points, []) + self.assertEqual(self.client._cert, self.cert) + self.assertEqual(self.client._verify, self.verify) def test_init_unsupported_content_type(self, mock_log, mock_requests): with self.assertRaises(exceptions.UnsupportedContentType): client.LokiClient(self.base_url, self.tenant, self.stream_labels, - "text/plain", self.buffer_size) + "text/plain", self.buffer_size, None, True) def test_build_payload_json(self, mock_log, mock_requests): batch = [["1609459200000000000", "log line 1"], @@ -97,7 +104,6 @@ class TestLokiClient(unittest.TestCase): {"foo": "bar", "baz": "qux"})) self.assertIn('baz="qux"', self.client._dict_to_loki_query( {"foo": "bar", "baz": "qux"})) - self.assertEqual( self.client._dict_to_loki_query({"foo": ["bar", "baz"]}), '{foo="bar"}' @@ -140,10 +146,8 @@ class TestLokiClient(unittest.TestCase): } } mock_requests.get.return_value = mock_response - query = '{app="test"} | json' data = self.client.search(query, self.begin_dt, self.end_dt, 100) - expected_url = f"{self.base_url}/query_range" expected_params = { "query": query, @@ -154,7 +158,9 @@ class TestLokiClient(unittest.TestCase): mock_requests.get.assert_called_once_with( expected_url, params=expected_params, - headers=self.client._headers + headers=self.client._headers, + cert=self.client._cert, + verify=self.client._verify ) self.assertEqual( data, @@ -166,23 +172,21 @@ class TestLokiClient(unittest.TestCase): mock_response.status_code = 200 mock_response.json.return_value = {"data": {"result": []}} mock_requests.get.return_value = mock_response - self.client.search(None, self.begin_dt, self.end_dt, 100) - expected_query = self.client._base_query() mock_requests.get.assert_called_once() _called_args, called_kwargs = mock_requests.get.call_args self.assertEqual(called_kwargs['params']['query'], expected_query) + self.assertEqual(called_kwargs['cert'], self.client._cert) + self.assertEqual(called_kwargs['verify'], self.client._verify) def test_search_failure(self, mock_log, mock_requests): mock_response = MagicMock() mock_response.status_code = 500 mock_response.text = "Internal Server Error" mock_requests.get.return_value = mock_response - query = '{app="test"} | json' data = self.client.search(query, self.begin_dt, self.end_dt, 100) - self.assertEqual(data, []) expected_msg = ("Failed to query logs or empty result: 500 - " "Internal Server Error") @@ -192,17 +196,15 @@ class TestLokiClient(unittest.TestCase): mock_response = MagicMock() mock_response.status_code = 204 mock_requests.post.return_value = mock_response - self.client._points = [["ts1", "log1"], ["ts2", "log2"]] self.client.push() - expected_url = f"{self.base_url}/push" expected_payload = self.client._build_payload_json( [["ts1", "log1"], ["ts2", "log2"]] ) - mock_requests.post.assert_called_once_with( - expected_url, json=expected_payload, headers=self.client._headers + expected_url, json=expected_payload, headers=self.client._headers, + cert=self.client._cert, verify=self.client._verify ) self.assertEqual(self.client._points, []) log_msg = "Batch of 2 messages pushed successfully." @@ -213,14 +215,19 @@ class TestLokiClient(unittest.TestCase): mock_response.status_code = 400 mock_response.text = "Bad Request" mock_requests.post.return_value = mock_response - initial_points = [["ts1", "log1"], ["ts2", "log2"]] self.client._points = list(initial_points) self.client.push() - self.assertEqual(self.client._points, initial_points) expected_msg = "Failed to push logs: 400 - Bad Request" mock_log.error.assert_called_once_with(expected_msg) + mock_requests.post.assert_called_once_with( + f"{self.base_url}/push", + json=self.client._build_payload_json(initial_points), + headers=self.client._headers, + cert=self.client._cert, + verify=self.client._verify + ) def test_push_no_points(self, mock_log, mock_requests): self.client._points = [] @@ -235,10 +242,8 @@ class TestLokiClient(unittest.TestCase): "result": [{"stream": {}, "values": [["ts", '{"key":"val"}']]}] } mock_search.return_value = mock_search_result - total, output = self.client.retrieve(self.begin_dt, self.end_dt, None, None, 100) - expected_base_query = self.client._base_query() mock_search.assert_called_once_with( expected_base_query, self.begin_dt, self.end_dt, 100 @@ -256,10 +261,8 @@ class TestLokiClient(unittest.TestCase): mock_search.return_value = mock_search_result filters = {"project_id": "proj1", "region": "reg1"} metric_types = "cpu_util" - total, output = self.client.retrieve(self.begin_dt, self.end_dt, filters, metric_types, 50) - base_query = self.client._base_query() filter_query_part = self.client._dict_to_loki_query( filters, groupby=True, brackets=False @@ -268,7 +271,6 @@ class TestLokiClient(unittest.TestCase): expected_full_query = ( f"{base_query} | {filter_query_part}, {metric_query_part}" ) - mock_search.assert_called_once_with( expected_full_query, self.begin_dt, self.end_dt, 50 ) @@ -283,10 +285,8 @@ class TestLokiClient(unittest.TestCase): "result": ["data"] } metric_types = ["cpu_util", "ram_util"] - self.client.retrieve(self.begin_dt, self.end_dt, None, metric_types, 50) - base_query = self.client._base_query() metric_query_part = f'type = "{metric_types[0]}"' expected_full_query = f"{base_query} | {metric_query_part}" @@ -299,13 +299,10 @@ class TestLokiClient(unittest.TestCase): self, mock_search, mock_loki_client_log, mock_requests_arg): mock_search.return_value = [] expected_query_for_log = self.client._base_query() - total, output = self.client.retrieve(self.begin_dt, self.end_dt, None, None, 100) - self.assertEqual(total, 0) self.assertEqual(output, []) - expected_log_message_case1 = ( f"Data from Loki search is not in the expected dictionary format " f"or is missing keys. Query: '{expected_query_for_log}'. " @@ -314,17 +311,13 @@ class TestLokiClient(unittest.TestCase): mock_loki_client_log.warning.assert_called_with( expected_log_message_case1 ) - mock_search.reset_mock() mock_loki_client_log.reset_mock() - mock_search.return_value = {"nodata": True} total, output = self.client.retrieve(self.begin_dt, self.end_dt, None, None, 100) - self.assertEqual(total, 0) self.assertEqual(output, []) - expected_log_message_case2 = ( f"Data from Loki search is not in the expected dictionary format " f"or is missing keys. Query: '{expected_query_for_log}'. " @@ -339,9 +332,7 @@ class TestLokiClient(unittest.TestCase): mock_requests_arg): self.client._buffer_size = 3 point = MockDataPoint(qty=1, price=10) - self.client.add_point(point, "test_type", self.begin_dt, self.end_dt) - self.assertEqual(len(self.client._points), 1) added_point_data = json.loads(self.client._points[0][1]) self.assertEqual(added_point_data['type'], "test_type") @@ -358,7 +349,6 @@ class TestLokiClient(unittest.TestCase): mock_requests_arg): self.client._buffer_size = 1 point = MockDataPoint() - self.client.add_point(point, "test_type", self.begin_dt, self.end_dt) self.assertEqual(len(self.client._points), 1) mock_push.assert_called_once() @@ -383,12 +373,10 @@ class TestLokiClient(unittest.TestCase): }, ] mock_retrieve.return_value = (2, loki_data_for_total) - count, result = self.client.total( self.begin_dt, self.end_dt, "some_type", None, None, None, 0, 100, False ) - mock_retrieve.assert_called_once_with( self.begin_dt, self.end_dt, None, "some_type", 100 ) @@ -437,19 +425,16 @@ class TestLokiClient(unittest.TestCase): ] mock_retrieve.return_value = (4, loki_data_for_total) groupby_fields = ["type", "project_id"] - count, result = self.client.total( self.begin_dt, self.end_dt, "any_metric_type", {"filter_key": "val"}, groupby_fields, None, 0, 100, False ) - mock_retrieve.assert_called_once_with( self.begin_dt, self.end_dt, {"filter_key": "val"}, "any_metric_type", 100 ) self.assertEqual(count, 3) - expected_results_map = { tuple(sorted({'type': 'typeA', 'project_id': 'proj1'}.items())): @@ -477,12 +462,10 @@ class TestLokiClient(unittest.TestCase): mock_retrieve.return_value = (0, []) custom_fields = ["field1", "field2"] offset = 5 - self.client.total( self.begin_dt, self.end_dt, None, None, None, custom_fields, offset, 100, False ) - mock_log.warning.assert_any_call( "'custom_fields' are not implemented yet for Loki. Therefore, " "the custom fields [%s] informed by the user will be ignored.", @@ -496,11 +479,18 @@ class TestLokiClient(unittest.TestCase): test_query = '{app="cloudkitty"} | json ' \ '| type="compute.instance.exists"}' self.client.delete_by_query(test_query, self.begin_dt, self.end_dt) - mr.post.assert_called_once() + mr.post.assert_called_once_with( + f"{self.base_url}/delete", + params={ + "query": test_query, + "start": int(self.begin_dt.timestamp()), + "end": int(self.end_dt.timestamp()), + }, + headers=self.client._headers, + cert=self.client._cert, + verify=self.client._verify + ) ml.debug.assert_has_calls([ - call(f"Request Params: {{'query': '{test_query}', " - f"'start': {int(self.begin_dt.timestamp())}, " - f"'end': {int(self.end_dt.timestamp())}}}"), call("Dataframes deleted successfully.") ]) mq.assert_not_called() @@ -512,22 +502,21 @@ class TestLokiClient(unittest.TestCase): mock_response.status_code = 500 mock_response.text = "Internal Server Error" mock_requests_arg.post.return_value = mock_response - - test_query = '{app="cloudkitty"} | json | ' - 'type="compute.instance.exists"' + test_query = '{app="cloudkitty"} | json | ' \ + 'type="compute.instance.exists"' self.client.delete_by_query(test_query, self.begin_dt, self.end_dt) - expected_url = f"{self.base_url}/delete" expected_params = { "query": test_query, "start": int(self.begin_dt.timestamp()), "end": int(self.end_dt.timestamp()), } - mock_requests_arg.post.assert_called_once_with( expected_url, params=expected_params, - headers=self.client._headers + headers=self.client._headers, + cert=self.client._cert, + verify=self.client._verify ) expected_error_msg = ("Failed to delete dataframes: " "500 - Internal Server Error") @@ -541,9 +530,7 @@ class TestLokiClient(unittest.TestCase): mock_base_query.return_value = '{app="cloudkitty", source="test"} ' \ '| json' filters = {"project_id": "proj1", "resource_type": "instance"} - self.client.delete(self.begin_dt, self.end_dt, filters) - exp_query_filters = 'groupby_project_id="proj1", ' \ 'groupby_resource_type="instance"' exp_query = f'{mock_base_query.return_value} | {exp_query_filters}' @@ -558,9 +545,7 @@ class TestLokiClient(unittest.TestCase): mock_log, mock_requests_arg): mock_base_query.return_value = '{app="cloudkitty", source="test"} ' \ '| json' - self.client.delete(self.begin_dt, self.end_dt, None) - mock_delete_by_query.assert_called_once_with( mock_base_query.return_value, self.begin_dt, self.end_dt ) diff --git a/cloudkitty/tests/storage/v2/loki_utils.py b/cloudkitty/tests/storage/v2/loki_utils.py index 457cfdc3..bc860436 100644 --- a/cloudkitty/tests/storage/v2/loki_utils.py +++ b/cloudkitty/tests/storage/v2/loki_utils.py @@ -23,7 +23,7 @@ from cloudkitty.utils import json class FakeLokiClient(loki_client_module.LokiClient): def __init__(self, url, tenant, stream_labels, content_type, - buffer_size, **kwargs): + buffer_size, cert, verify, **kwargs): if content_type != "application/json": raise loki_exceptions.UnsupportedContentType(content_type) @@ -36,7 +36,9 @@ class FakeLokiClient(loki_client_module.LokiClient): } self._buffer_size = buffer_size - self._logs = [] + self._cert = cert + self._verify = verify + self.init() def init(self): diff --git a/doc/source/admin/configuration/storage.rst b/doc/source/admin/configuration/storage.rst index 04800655..5024d8f7 100644 --- a/doc/source/admin/configuration/storage.rst +++ b/doc/source/admin/configuration/storage.rst @@ -121,7 +121,7 @@ Section ``storage_loki``: * ``url``: Defaults to ``http://localhost:3100/loki/api/v1``. Loki host, along with port and protocol. -* ``tenant``: Defaults to tenant1. Loki tenant. +* ``tenant``: Defaults to cloudkitty. Loki tenant. * ``stream``: Defaults to ``{"service": "cloudkitty"}``. The labels that are going to be used to define the Loki stream as Python dict. @@ -136,4 +136,10 @@ Section ``storage_loki``: * ``insecure``: Defaults to ``false``. Set to true to allow insecure HTTPS connections to Loki. -* ``cafile``: Path of the CA certificate to trust for HTTPS connections. +* ``ca_file``: Path of the CA certificate to trust for HTTPS connections. + +* ``cert_file``: Path to a client cert for establishing mTLS connections to + Loki. + +* ``key_file``: Path to a client key for establishing mTLS connections to + Loki.