Support TLS connections to Loki storage
The Loki storage could be behind an HTTPS proxy that requires TLS encrypted connections. This change supports two different TLS configurations: * Connect via one-way TLS as a normal HTTPS connection via the ca_file parameter to provide encryption * Connect via two-way TLS as a mutual TLS connection for encryption and authentication. If cert_file and key_file are provided, they will be sent to Loki to authenticate the requests. Both methods supports the insecure parameter in case the ca_file has not been provided or it is incorrect. With that, the driver will not verify the certificates. Depends-On: https://review.opendev.org/c/openstack/cloudkitty/+/950868 Change-Id: Idfbd5f6e85b572b129b8595fa1eb122b80827d90 Signed-off-by: jlarriba <jlarriba@redhat.com>
This commit is contained in:
@@ -37,8 +37,8 @@ loki_storage_opts = [
|
|||||||
default='http://localhost:3100/loki/api/v1'),
|
default='http://localhost:3100/loki/api/v1'),
|
||||||
cfg.StrOpt(
|
cfg.StrOpt(
|
||||||
'tenant',
|
'tenant',
|
||||||
help='The loki tenant to be used. Defaults to tenant1.',
|
help='The loki tenant to be used. Defaults to cloudkitty.',
|
||||||
default='tenant1'),
|
default='cloudkitty'),
|
||||||
cfg.DictOpt(
|
cfg.DictOpt(
|
||||||
'stream',
|
'stream',
|
||||||
help='The labels that are going to be used to define the Loki stream '
|
help='The labels that are going to be used to define the Loki stream '
|
||||||
@@ -60,8 +60,18 @@ loki_storage_opts = [
|
|||||||
help='Set to true to allow insecure HTTPS connections to Loki',
|
help='Set to true to allow insecure HTTPS connections to Loki',
|
||||||
default=False),
|
default=False),
|
||||||
cfg.StrOpt(
|
cfg.StrOpt(
|
||||||
'cafile',
|
'ca_file',
|
||||||
help='Path of the CA certificate to trust for HTTPS connections.',
|
help='Path of the CA certificate to trust for HTTPS connections.',
|
||||||
|
default=None),
|
||||||
|
cfg.StrOpt(
|
||||||
|
'cert_file',
|
||||||
|
help="Path to a client cert for establishing mTLS connections to "
|
||||||
|
"Loki.",
|
||||||
|
default=None),
|
||||||
|
cfg.StrOpt(
|
||||||
|
'key_file',
|
||||||
|
help="Path to a client key for establishing mTLS connections to "
|
||||||
|
"Loki.",
|
||||||
default=None)
|
default=None)
|
||||||
]
|
]
|
||||||
|
|
||||||
@@ -74,15 +84,22 @@ class LokiStorage(v2_storage.BaseStorage):
|
|||||||
super(LokiStorage, self).__init__(*args, **kwargs)
|
super(LokiStorage, self).__init__(*args, **kwargs)
|
||||||
|
|
||||||
verify = not CONF.storage_loki.insecure
|
verify = not CONF.storage_loki.insecure
|
||||||
if verify and CONF.storage_loki.cafile:
|
if verify and CONF.storage_loki.ca_file:
|
||||||
verify = CONF.storage_loki.cafile
|
verify = CONF.storage_loki.ca_file
|
||||||
|
|
||||||
|
if CONF.storage_loki.cert_file and CONF.storage_loki.key_file:
|
||||||
|
cert = (CONF.storage_loki.cert_file, CONF.storage_loki.key_file)
|
||||||
|
else:
|
||||||
|
cert = None
|
||||||
|
|
||||||
self._conn = os_client.LokiClient(
|
self._conn = os_client.LokiClient(
|
||||||
CONF.storage_loki.url,
|
CONF.storage_loki.url,
|
||||||
CONF.storage_loki.tenant,
|
CONF.storage_loki.tenant,
|
||||||
CONF.storage_loki.stream,
|
CONF.storage_loki.stream,
|
||||||
CONF.storage_loki.content_type,
|
CONF.storage_loki.content_type,
|
||||||
CONF.storage_loki.buffer_size)
|
CONF.storage_loki.buffer_size,
|
||||||
|
cert,
|
||||||
|
verify)
|
||||||
|
|
||||||
def init(self):
|
def init(self):
|
||||||
LOG.debug('LokiStorage Init.')
|
LOG.debug('LokiStorage Init.')
|
||||||
|
|||||||
@@ -24,7 +24,8 @@ LOG = log.getLogger(__name__)
|
|||||||
class LokiClient(object):
|
class LokiClient(object):
|
||||||
"""Class used to ease interaction with Loki."""
|
"""Class used to ease interaction with Loki."""
|
||||||
|
|
||||||
def __init__(self, url, tenant, stream_labels, content_type, buffer_size):
|
def __init__(self, url, tenant, stream_labels, content_type, buffer_size,
|
||||||
|
cert, verify):
|
||||||
if content_type != "application/json":
|
if content_type != "application/json":
|
||||||
raise exceptions.UnsupportedContentType(content_type)
|
raise exceptions.UnsupportedContentType(content_type)
|
||||||
|
|
||||||
@@ -37,6 +38,9 @@ class LokiClient(object):
|
|||||||
self._buffer_size = buffer_size
|
self._buffer_size = buffer_size
|
||||||
self._points = []
|
self._points = []
|
||||||
|
|
||||||
|
self._cert = cert
|
||||||
|
self._verify = verify
|
||||||
|
|
||||||
def _build_payload_json(self, batch):
|
def _build_payload_json(self, batch):
|
||||||
payload = {
|
payload = {
|
||||||
"streams": [
|
"streams": [
|
||||||
@@ -86,7 +90,8 @@ class LokiClient(object):
|
|||||||
"limit": limit
|
"limit": limit
|
||||||
}
|
}
|
||||||
|
|
||||||
response = requests.get(url, params=params, headers=self._headers)
|
response = requests.get(url, params=params, headers=self._headers,
|
||||||
|
cert=self._cert, verify=self._verify)
|
||||||
|
|
||||||
if response.status_code == 200:
|
if response.status_code == 200:
|
||||||
data = response.json()['data']
|
data = response.json()['data']
|
||||||
@@ -103,7 +108,8 @@ class LokiClient(object):
|
|||||||
|
|
||||||
while self._points:
|
while self._points:
|
||||||
payload = self._build_payload_json(self._points)
|
payload = self._build_payload_json(self._points)
|
||||||
response = requests.post(url, json=payload, headers=self._headers)
|
response = requests.post(url, json=payload, headers=self._headers,
|
||||||
|
cert=self._cert, verify=self._verify)
|
||||||
|
|
||||||
if response.status_code == 204:
|
if response.status_code == 204:
|
||||||
LOG.debug(
|
LOG.debug(
|
||||||
@@ -130,8 +136,8 @@ class LokiClient(object):
|
|||||||
"end": int(end.timestamp()),
|
"end": int(end.timestamp()),
|
||||||
}
|
}
|
||||||
|
|
||||||
LOG.debug(f"Request Params: {params}")
|
response = requests.post(url, params=params, headers=self._headers,
|
||||||
response = requests.post(url, params=params, headers=self._headers)
|
cert=self._cert, verify=self._verify)
|
||||||
|
|
||||||
if response.status_code == 204:
|
if response.status_code == 204:
|
||||||
LOG.debug(
|
LOG.debug(
|
||||||
|
|||||||
@@ -12,6 +12,7 @@
|
|||||||
# License for the specific language governing permissions and limitations
|
# License for the specific language governing permissions and limitations
|
||||||
# under the License.
|
# under the License.
|
||||||
#
|
#
|
||||||
|
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
from datetime import timezone
|
from datetime import timezone
|
||||||
import json
|
import json
|
||||||
@@ -47,12 +48,16 @@ class TestLokiClient(unittest.TestCase):
|
|||||||
self.stream_labels = {"app": "cloudkitty", "source": "test"}
|
self.stream_labels = {"app": "cloudkitty", "source": "test"}
|
||||||
self.content_type = "application/json"
|
self.content_type = "application/json"
|
||||||
self.buffer_size = 2
|
self.buffer_size = 2
|
||||||
|
self.cert = ('/path/to/cert', '/path/to/key')
|
||||||
|
self.verify = '/path/to/cafile'
|
||||||
self.client = client.LokiClient(
|
self.client = client.LokiClient(
|
||||||
self.base_url,
|
self.base_url,
|
||||||
self.tenant,
|
self.tenant,
|
||||||
self.stream_labels,
|
self.stream_labels,
|
||||||
self.content_type,
|
self.content_type,
|
||||||
self.buffer_size
|
self.buffer_size,
|
||||||
|
cert=self.cert,
|
||||||
|
verify=self.verify
|
||||||
)
|
)
|
||||||
self.begin_dt = datetime(2024, 1, 1, 0, 0, 0, tzinfo=timezone.utc)
|
self.begin_dt = datetime(2024, 1, 1, 0, 0, 0, tzinfo=timezone.utc)
|
||||||
self.end_dt = datetime(2024, 1, 1, 1, 0, 0, tzinfo=timezone.utc)
|
self.end_dt = datetime(2024, 1, 1, 1, 0, 0, tzinfo=timezone.utc)
|
||||||
@@ -65,11 +70,13 @@ class TestLokiClient(unittest.TestCase):
|
|||||||
self.content_type)
|
self.content_type)
|
||||||
self.assertEqual(self.client._buffer_size, self.buffer_size)
|
self.assertEqual(self.client._buffer_size, self.buffer_size)
|
||||||
self.assertEqual(self.client._points, [])
|
self.assertEqual(self.client._points, [])
|
||||||
|
self.assertEqual(self.client._cert, self.cert)
|
||||||
|
self.assertEqual(self.client._verify, self.verify)
|
||||||
|
|
||||||
def test_init_unsupported_content_type(self, mock_log, mock_requests):
|
def test_init_unsupported_content_type(self, mock_log, mock_requests):
|
||||||
with self.assertRaises(exceptions.UnsupportedContentType):
|
with self.assertRaises(exceptions.UnsupportedContentType):
|
||||||
client.LokiClient(self.base_url, self.tenant, self.stream_labels,
|
client.LokiClient(self.base_url, self.tenant, self.stream_labels,
|
||||||
"text/plain", self.buffer_size)
|
"text/plain", self.buffer_size, None, True)
|
||||||
|
|
||||||
def test_build_payload_json(self, mock_log, mock_requests):
|
def test_build_payload_json(self, mock_log, mock_requests):
|
||||||
batch = [["1609459200000000000", "log line 1"],
|
batch = [["1609459200000000000", "log line 1"],
|
||||||
@@ -97,7 +104,6 @@ class TestLokiClient(unittest.TestCase):
|
|||||||
{"foo": "bar", "baz": "qux"}))
|
{"foo": "bar", "baz": "qux"}))
|
||||||
self.assertIn('baz="qux"', self.client._dict_to_loki_query(
|
self.assertIn('baz="qux"', self.client._dict_to_loki_query(
|
||||||
{"foo": "bar", "baz": "qux"}))
|
{"foo": "bar", "baz": "qux"}))
|
||||||
|
|
||||||
self.assertEqual(
|
self.assertEqual(
|
||||||
self.client._dict_to_loki_query({"foo": ["bar", "baz"]}),
|
self.client._dict_to_loki_query({"foo": ["bar", "baz"]}),
|
||||||
'{foo="bar"}'
|
'{foo="bar"}'
|
||||||
@@ -140,10 +146,8 @@ class TestLokiClient(unittest.TestCase):
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
mock_requests.get.return_value = mock_response
|
mock_requests.get.return_value = mock_response
|
||||||
|
|
||||||
query = '{app="test"} | json'
|
query = '{app="test"} | json'
|
||||||
data = self.client.search(query, self.begin_dt, self.end_dt, 100)
|
data = self.client.search(query, self.begin_dt, self.end_dt, 100)
|
||||||
|
|
||||||
expected_url = f"{self.base_url}/query_range"
|
expected_url = f"{self.base_url}/query_range"
|
||||||
expected_params = {
|
expected_params = {
|
||||||
"query": query,
|
"query": query,
|
||||||
@@ -154,7 +158,9 @@ class TestLokiClient(unittest.TestCase):
|
|||||||
mock_requests.get.assert_called_once_with(
|
mock_requests.get.assert_called_once_with(
|
||||||
expected_url,
|
expected_url,
|
||||||
params=expected_params,
|
params=expected_params,
|
||||||
headers=self.client._headers
|
headers=self.client._headers,
|
||||||
|
cert=self.client._cert,
|
||||||
|
verify=self.client._verify
|
||||||
)
|
)
|
||||||
self.assertEqual(
|
self.assertEqual(
|
||||||
data,
|
data,
|
||||||
@@ -166,23 +172,21 @@ class TestLokiClient(unittest.TestCase):
|
|||||||
mock_response.status_code = 200
|
mock_response.status_code = 200
|
||||||
mock_response.json.return_value = {"data": {"result": []}}
|
mock_response.json.return_value = {"data": {"result": []}}
|
||||||
mock_requests.get.return_value = mock_response
|
mock_requests.get.return_value = mock_response
|
||||||
|
|
||||||
self.client.search(None, self.begin_dt, self.end_dt, 100)
|
self.client.search(None, self.begin_dt, self.end_dt, 100)
|
||||||
|
|
||||||
expected_query = self.client._base_query()
|
expected_query = self.client._base_query()
|
||||||
mock_requests.get.assert_called_once()
|
mock_requests.get.assert_called_once()
|
||||||
_called_args, called_kwargs = mock_requests.get.call_args
|
_called_args, called_kwargs = mock_requests.get.call_args
|
||||||
self.assertEqual(called_kwargs['params']['query'], expected_query)
|
self.assertEqual(called_kwargs['params']['query'], expected_query)
|
||||||
|
self.assertEqual(called_kwargs['cert'], self.client._cert)
|
||||||
|
self.assertEqual(called_kwargs['verify'], self.client._verify)
|
||||||
|
|
||||||
def test_search_failure(self, mock_log, mock_requests):
|
def test_search_failure(self, mock_log, mock_requests):
|
||||||
mock_response = MagicMock()
|
mock_response = MagicMock()
|
||||||
mock_response.status_code = 500
|
mock_response.status_code = 500
|
||||||
mock_response.text = "Internal Server Error"
|
mock_response.text = "Internal Server Error"
|
||||||
mock_requests.get.return_value = mock_response
|
mock_requests.get.return_value = mock_response
|
||||||
|
|
||||||
query = '{app="test"} | json'
|
query = '{app="test"} | json'
|
||||||
data = self.client.search(query, self.begin_dt, self.end_dt, 100)
|
data = self.client.search(query, self.begin_dt, self.end_dt, 100)
|
||||||
|
|
||||||
self.assertEqual(data, [])
|
self.assertEqual(data, [])
|
||||||
expected_msg = ("Failed to query logs or empty result: 500 - "
|
expected_msg = ("Failed to query logs or empty result: 500 - "
|
||||||
"Internal Server Error")
|
"Internal Server Error")
|
||||||
@@ -192,17 +196,15 @@ class TestLokiClient(unittest.TestCase):
|
|||||||
mock_response = MagicMock()
|
mock_response = MagicMock()
|
||||||
mock_response.status_code = 204
|
mock_response.status_code = 204
|
||||||
mock_requests.post.return_value = mock_response
|
mock_requests.post.return_value = mock_response
|
||||||
|
|
||||||
self.client._points = [["ts1", "log1"], ["ts2", "log2"]]
|
self.client._points = [["ts1", "log1"], ["ts2", "log2"]]
|
||||||
self.client.push()
|
self.client.push()
|
||||||
|
|
||||||
expected_url = f"{self.base_url}/push"
|
expected_url = f"{self.base_url}/push"
|
||||||
expected_payload = self.client._build_payload_json(
|
expected_payload = self.client._build_payload_json(
|
||||||
[["ts1", "log1"], ["ts2", "log2"]]
|
[["ts1", "log1"], ["ts2", "log2"]]
|
||||||
)
|
)
|
||||||
|
|
||||||
mock_requests.post.assert_called_once_with(
|
mock_requests.post.assert_called_once_with(
|
||||||
expected_url, json=expected_payload, headers=self.client._headers
|
expected_url, json=expected_payload, headers=self.client._headers,
|
||||||
|
cert=self.client._cert, verify=self.client._verify
|
||||||
)
|
)
|
||||||
self.assertEqual(self.client._points, [])
|
self.assertEqual(self.client._points, [])
|
||||||
log_msg = "Batch of 2 messages pushed successfully."
|
log_msg = "Batch of 2 messages pushed successfully."
|
||||||
@@ -213,14 +215,19 @@ class TestLokiClient(unittest.TestCase):
|
|||||||
mock_response.status_code = 400
|
mock_response.status_code = 400
|
||||||
mock_response.text = "Bad Request"
|
mock_response.text = "Bad Request"
|
||||||
mock_requests.post.return_value = mock_response
|
mock_requests.post.return_value = mock_response
|
||||||
|
|
||||||
initial_points = [["ts1", "log1"], ["ts2", "log2"]]
|
initial_points = [["ts1", "log1"], ["ts2", "log2"]]
|
||||||
self.client._points = list(initial_points)
|
self.client._points = list(initial_points)
|
||||||
self.client.push()
|
self.client.push()
|
||||||
|
|
||||||
self.assertEqual(self.client._points, initial_points)
|
self.assertEqual(self.client._points, initial_points)
|
||||||
expected_msg = "Failed to push logs: 400 - Bad Request"
|
expected_msg = "Failed to push logs: 400 - Bad Request"
|
||||||
mock_log.error.assert_called_once_with(expected_msg)
|
mock_log.error.assert_called_once_with(expected_msg)
|
||||||
|
mock_requests.post.assert_called_once_with(
|
||||||
|
f"{self.base_url}/push",
|
||||||
|
json=self.client._build_payload_json(initial_points),
|
||||||
|
headers=self.client._headers,
|
||||||
|
cert=self.client._cert,
|
||||||
|
verify=self.client._verify
|
||||||
|
)
|
||||||
|
|
||||||
def test_push_no_points(self, mock_log, mock_requests):
|
def test_push_no_points(self, mock_log, mock_requests):
|
||||||
self.client._points = []
|
self.client._points = []
|
||||||
@@ -235,10 +242,8 @@ class TestLokiClient(unittest.TestCase):
|
|||||||
"result": [{"stream": {}, "values": [["ts", '{"key":"val"}']]}]
|
"result": [{"stream": {}, "values": [["ts", '{"key":"val"}']]}]
|
||||||
}
|
}
|
||||||
mock_search.return_value = mock_search_result
|
mock_search.return_value = mock_search_result
|
||||||
|
|
||||||
total, output = self.client.retrieve(self.begin_dt, self.end_dt,
|
total, output = self.client.retrieve(self.begin_dt, self.end_dt,
|
||||||
None, None, 100)
|
None, None, 100)
|
||||||
|
|
||||||
expected_base_query = self.client._base_query()
|
expected_base_query = self.client._base_query()
|
||||||
mock_search.assert_called_once_with(
|
mock_search.assert_called_once_with(
|
||||||
expected_base_query, self.begin_dt, self.end_dt, 100
|
expected_base_query, self.begin_dt, self.end_dt, 100
|
||||||
@@ -256,10 +261,8 @@ class TestLokiClient(unittest.TestCase):
|
|||||||
mock_search.return_value = mock_search_result
|
mock_search.return_value = mock_search_result
|
||||||
filters = {"project_id": "proj1", "region": "reg1"}
|
filters = {"project_id": "proj1", "region": "reg1"}
|
||||||
metric_types = "cpu_util"
|
metric_types = "cpu_util"
|
||||||
|
|
||||||
total, output = self.client.retrieve(self.begin_dt, self.end_dt,
|
total, output = self.client.retrieve(self.begin_dt, self.end_dt,
|
||||||
filters, metric_types, 50)
|
filters, metric_types, 50)
|
||||||
|
|
||||||
base_query = self.client._base_query()
|
base_query = self.client._base_query()
|
||||||
filter_query_part = self.client._dict_to_loki_query(
|
filter_query_part = self.client._dict_to_loki_query(
|
||||||
filters, groupby=True, brackets=False
|
filters, groupby=True, brackets=False
|
||||||
@@ -268,7 +271,6 @@ class TestLokiClient(unittest.TestCase):
|
|||||||
expected_full_query = (
|
expected_full_query = (
|
||||||
f"{base_query} | {filter_query_part}, {metric_query_part}"
|
f"{base_query} | {filter_query_part}, {metric_query_part}"
|
||||||
)
|
)
|
||||||
|
|
||||||
mock_search.assert_called_once_with(
|
mock_search.assert_called_once_with(
|
||||||
expected_full_query, self.begin_dt, self.end_dt, 50
|
expected_full_query, self.begin_dt, self.end_dt, 50
|
||||||
)
|
)
|
||||||
@@ -283,10 +285,8 @@ class TestLokiClient(unittest.TestCase):
|
|||||||
"result": ["data"]
|
"result": ["data"]
|
||||||
}
|
}
|
||||||
metric_types = ["cpu_util", "ram_util"]
|
metric_types = ["cpu_util", "ram_util"]
|
||||||
|
|
||||||
self.client.retrieve(self.begin_dt, self.end_dt, None,
|
self.client.retrieve(self.begin_dt, self.end_dt, None,
|
||||||
metric_types, 50)
|
metric_types, 50)
|
||||||
|
|
||||||
base_query = self.client._base_query()
|
base_query = self.client._base_query()
|
||||||
metric_query_part = f'type = "{metric_types[0]}"'
|
metric_query_part = f'type = "{metric_types[0]}"'
|
||||||
expected_full_query = f"{base_query} | {metric_query_part}"
|
expected_full_query = f"{base_query} | {metric_query_part}"
|
||||||
@@ -299,13 +299,10 @@ class TestLokiClient(unittest.TestCase):
|
|||||||
self, mock_search, mock_loki_client_log, mock_requests_arg):
|
self, mock_search, mock_loki_client_log, mock_requests_arg):
|
||||||
mock_search.return_value = []
|
mock_search.return_value = []
|
||||||
expected_query_for_log = self.client._base_query()
|
expected_query_for_log = self.client._base_query()
|
||||||
|
|
||||||
total, output = self.client.retrieve(self.begin_dt, self.end_dt,
|
total, output = self.client.retrieve(self.begin_dt, self.end_dt,
|
||||||
None, None, 100)
|
None, None, 100)
|
||||||
|
|
||||||
self.assertEqual(total, 0)
|
self.assertEqual(total, 0)
|
||||||
self.assertEqual(output, [])
|
self.assertEqual(output, [])
|
||||||
|
|
||||||
expected_log_message_case1 = (
|
expected_log_message_case1 = (
|
||||||
f"Data from Loki search is not in the expected dictionary format "
|
f"Data from Loki search is not in the expected dictionary format "
|
||||||
f"or is missing keys. Query: '{expected_query_for_log}'. "
|
f"or is missing keys. Query: '{expected_query_for_log}'. "
|
||||||
@@ -314,17 +311,13 @@ class TestLokiClient(unittest.TestCase):
|
|||||||
mock_loki_client_log.warning.assert_called_with(
|
mock_loki_client_log.warning.assert_called_with(
|
||||||
expected_log_message_case1
|
expected_log_message_case1
|
||||||
)
|
)
|
||||||
|
|
||||||
mock_search.reset_mock()
|
mock_search.reset_mock()
|
||||||
mock_loki_client_log.reset_mock()
|
mock_loki_client_log.reset_mock()
|
||||||
|
|
||||||
mock_search.return_value = {"nodata": True}
|
mock_search.return_value = {"nodata": True}
|
||||||
total, output = self.client.retrieve(self.begin_dt, self.end_dt,
|
total, output = self.client.retrieve(self.begin_dt, self.end_dt,
|
||||||
None, None, 100)
|
None, None, 100)
|
||||||
|
|
||||||
self.assertEqual(total, 0)
|
self.assertEqual(total, 0)
|
||||||
self.assertEqual(output, [])
|
self.assertEqual(output, [])
|
||||||
|
|
||||||
expected_log_message_case2 = (
|
expected_log_message_case2 = (
|
||||||
f"Data from Loki search is not in the expected dictionary format "
|
f"Data from Loki search is not in the expected dictionary format "
|
||||||
f"or is missing keys. Query: '{expected_query_for_log}'. "
|
f"or is missing keys. Query: '{expected_query_for_log}'. "
|
||||||
@@ -339,9 +332,7 @@ class TestLokiClient(unittest.TestCase):
|
|||||||
mock_requests_arg):
|
mock_requests_arg):
|
||||||
self.client._buffer_size = 3
|
self.client._buffer_size = 3
|
||||||
point = MockDataPoint(qty=1, price=10)
|
point = MockDataPoint(qty=1, price=10)
|
||||||
|
|
||||||
self.client.add_point(point, "test_type", self.begin_dt, self.end_dt)
|
self.client.add_point(point, "test_type", self.begin_dt, self.end_dt)
|
||||||
|
|
||||||
self.assertEqual(len(self.client._points), 1)
|
self.assertEqual(len(self.client._points), 1)
|
||||||
added_point_data = json.loads(self.client._points[0][1])
|
added_point_data = json.loads(self.client._points[0][1])
|
||||||
self.assertEqual(added_point_data['type'], "test_type")
|
self.assertEqual(added_point_data['type'], "test_type")
|
||||||
@@ -358,7 +349,6 @@ class TestLokiClient(unittest.TestCase):
|
|||||||
mock_requests_arg):
|
mock_requests_arg):
|
||||||
self.client._buffer_size = 1
|
self.client._buffer_size = 1
|
||||||
point = MockDataPoint()
|
point = MockDataPoint()
|
||||||
|
|
||||||
self.client.add_point(point, "test_type", self.begin_dt, self.end_dt)
|
self.client.add_point(point, "test_type", self.begin_dt, self.end_dt)
|
||||||
self.assertEqual(len(self.client._points), 1)
|
self.assertEqual(len(self.client._points), 1)
|
||||||
mock_push.assert_called_once()
|
mock_push.assert_called_once()
|
||||||
@@ -383,12 +373,10 @@ class TestLokiClient(unittest.TestCase):
|
|||||||
},
|
},
|
||||||
]
|
]
|
||||||
mock_retrieve.return_value = (2, loki_data_for_total)
|
mock_retrieve.return_value = (2, loki_data_for_total)
|
||||||
|
|
||||||
count, result = self.client.total(
|
count, result = self.client.total(
|
||||||
self.begin_dt, self.end_dt, "some_type", None, None,
|
self.begin_dt, self.end_dt, "some_type", None, None,
|
||||||
None, 0, 100, False
|
None, 0, 100, False
|
||||||
)
|
)
|
||||||
|
|
||||||
mock_retrieve.assert_called_once_with(
|
mock_retrieve.assert_called_once_with(
|
||||||
self.begin_dt, self.end_dt, None, "some_type", 100
|
self.begin_dt, self.end_dt, None, "some_type", 100
|
||||||
)
|
)
|
||||||
@@ -437,19 +425,16 @@ class TestLokiClient(unittest.TestCase):
|
|||||||
]
|
]
|
||||||
mock_retrieve.return_value = (4, loki_data_for_total)
|
mock_retrieve.return_value = (4, loki_data_for_total)
|
||||||
groupby_fields = ["type", "project_id"]
|
groupby_fields = ["type", "project_id"]
|
||||||
|
|
||||||
count, result = self.client.total(
|
count, result = self.client.total(
|
||||||
self.begin_dt, self.end_dt, "any_metric_type",
|
self.begin_dt, self.end_dt, "any_metric_type",
|
||||||
{"filter_key": "val"}, groupby_fields,
|
{"filter_key": "val"}, groupby_fields,
|
||||||
None, 0, 100, False
|
None, 0, 100, False
|
||||||
)
|
)
|
||||||
|
|
||||||
mock_retrieve.assert_called_once_with(
|
mock_retrieve.assert_called_once_with(
|
||||||
self.begin_dt, self.end_dt, {"filter_key": "val"},
|
self.begin_dt, self.end_dt, {"filter_key": "val"},
|
||||||
"any_metric_type", 100
|
"any_metric_type", 100
|
||||||
)
|
)
|
||||||
self.assertEqual(count, 3)
|
self.assertEqual(count, 3)
|
||||||
|
|
||||||
expected_results_map = {
|
expected_results_map = {
|
||||||
tuple(sorted({'type': 'typeA',
|
tuple(sorted({'type': 'typeA',
|
||||||
'project_id': 'proj1'}.items())):
|
'project_id': 'proj1'}.items())):
|
||||||
@@ -477,12 +462,10 @@ class TestLokiClient(unittest.TestCase):
|
|||||||
mock_retrieve.return_value = (0, [])
|
mock_retrieve.return_value = (0, [])
|
||||||
custom_fields = ["field1", "field2"]
|
custom_fields = ["field1", "field2"]
|
||||||
offset = 5
|
offset = 5
|
||||||
|
|
||||||
self.client.total(
|
self.client.total(
|
||||||
self.begin_dt, self.end_dt, None, None, None,
|
self.begin_dt, self.end_dt, None, None, None,
|
||||||
custom_fields, offset, 100, False
|
custom_fields, offset, 100, False
|
||||||
)
|
)
|
||||||
|
|
||||||
mock_log.warning.assert_any_call(
|
mock_log.warning.assert_any_call(
|
||||||
"'custom_fields' are not implemented yet for Loki. Therefore, "
|
"'custom_fields' are not implemented yet for Loki. Therefore, "
|
||||||
"the custom fields [%s] informed by the user will be ignored.",
|
"the custom fields [%s] informed by the user will be ignored.",
|
||||||
@@ -496,11 +479,18 @@ class TestLokiClient(unittest.TestCase):
|
|||||||
test_query = '{app="cloudkitty"} | json ' \
|
test_query = '{app="cloudkitty"} | json ' \
|
||||||
'| type="compute.instance.exists"}'
|
'| type="compute.instance.exists"}'
|
||||||
self.client.delete_by_query(test_query, self.begin_dt, self.end_dt)
|
self.client.delete_by_query(test_query, self.begin_dt, self.end_dt)
|
||||||
mr.post.assert_called_once()
|
mr.post.assert_called_once_with(
|
||||||
|
f"{self.base_url}/delete",
|
||||||
|
params={
|
||||||
|
"query": test_query,
|
||||||
|
"start": int(self.begin_dt.timestamp()),
|
||||||
|
"end": int(self.end_dt.timestamp()),
|
||||||
|
},
|
||||||
|
headers=self.client._headers,
|
||||||
|
cert=self.client._cert,
|
||||||
|
verify=self.client._verify
|
||||||
|
)
|
||||||
ml.debug.assert_has_calls([
|
ml.debug.assert_has_calls([
|
||||||
call(f"Request Params: {{'query': '{test_query}', "
|
|
||||||
f"'start': {int(self.begin_dt.timestamp())}, "
|
|
||||||
f"'end': {int(self.end_dt.timestamp())}}}"),
|
|
||||||
call("Dataframes deleted successfully.")
|
call("Dataframes deleted successfully.")
|
||||||
])
|
])
|
||||||
mq.assert_not_called()
|
mq.assert_not_called()
|
||||||
@@ -512,22 +502,21 @@ class TestLokiClient(unittest.TestCase):
|
|||||||
mock_response.status_code = 500
|
mock_response.status_code = 500
|
||||||
mock_response.text = "Internal Server Error"
|
mock_response.text = "Internal Server Error"
|
||||||
mock_requests_arg.post.return_value = mock_response
|
mock_requests_arg.post.return_value = mock_response
|
||||||
|
test_query = '{app="cloudkitty"} | json | ' \
|
||||||
test_query = '{app="cloudkitty"} | json | '
|
'type="compute.instance.exists"'
|
||||||
'type="compute.instance.exists"'
|
|
||||||
self.client.delete_by_query(test_query, self.begin_dt, self.end_dt)
|
self.client.delete_by_query(test_query, self.begin_dt, self.end_dt)
|
||||||
|
|
||||||
expected_url = f"{self.base_url}/delete"
|
expected_url = f"{self.base_url}/delete"
|
||||||
expected_params = {
|
expected_params = {
|
||||||
"query": test_query,
|
"query": test_query,
|
||||||
"start": int(self.begin_dt.timestamp()),
|
"start": int(self.begin_dt.timestamp()),
|
||||||
"end": int(self.end_dt.timestamp()),
|
"end": int(self.end_dt.timestamp()),
|
||||||
}
|
}
|
||||||
|
|
||||||
mock_requests_arg.post.assert_called_once_with(
|
mock_requests_arg.post.assert_called_once_with(
|
||||||
expected_url,
|
expected_url,
|
||||||
params=expected_params,
|
params=expected_params,
|
||||||
headers=self.client._headers
|
headers=self.client._headers,
|
||||||
|
cert=self.client._cert,
|
||||||
|
verify=self.client._verify
|
||||||
)
|
)
|
||||||
expected_error_msg = ("Failed to delete dataframes: "
|
expected_error_msg = ("Failed to delete dataframes: "
|
||||||
"500 - Internal Server Error")
|
"500 - Internal Server Error")
|
||||||
@@ -541,9 +530,7 @@ class TestLokiClient(unittest.TestCase):
|
|||||||
mock_base_query.return_value = '{app="cloudkitty", source="test"} ' \
|
mock_base_query.return_value = '{app="cloudkitty", source="test"} ' \
|
||||||
'| json'
|
'| json'
|
||||||
filters = {"project_id": "proj1", "resource_type": "instance"}
|
filters = {"project_id": "proj1", "resource_type": "instance"}
|
||||||
|
|
||||||
self.client.delete(self.begin_dt, self.end_dt, filters)
|
self.client.delete(self.begin_dt, self.end_dt, filters)
|
||||||
|
|
||||||
exp_query_filters = 'groupby_project_id="proj1", ' \
|
exp_query_filters = 'groupby_project_id="proj1", ' \
|
||||||
'groupby_resource_type="instance"'
|
'groupby_resource_type="instance"'
|
||||||
exp_query = f'{mock_base_query.return_value} | {exp_query_filters}'
|
exp_query = f'{mock_base_query.return_value} | {exp_query_filters}'
|
||||||
@@ -558,9 +545,7 @@ class TestLokiClient(unittest.TestCase):
|
|||||||
mock_log, mock_requests_arg):
|
mock_log, mock_requests_arg):
|
||||||
mock_base_query.return_value = '{app="cloudkitty", source="test"} ' \
|
mock_base_query.return_value = '{app="cloudkitty", source="test"} ' \
|
||||||
'| json'
|
'| json'
|
||||||
|
|
||||||
self.client.delete(self.begin_dt, self.end_dt, None)
|
self.client.delete(self.begin_dt, self.end_dt, None)
|
||||||
|
|
||||||
mock_delete_by_query.assert_called_once_with(
|
mock_delete_by_query.assert_called_once_with(
|
||||||
mock_base_query.return_value, self.begin_dt, self.end_dt
|
mock_base_query.return_value, self.begin_dt, self.end_dt
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -23,7 +23,7 @@ from cloudkitty.utils import json
|
|||||||
class FakeLokiClient(loki_client_module.LokiClient):
|
class FakeLokiClient(loki_client_module.LokiClient):
|
||||||
|
|
||||||
def __init__(self, url, tenant, stream_labels, content_type,
|
def __init__(self, url, tenant, stream_labels, content_type,
|
||||||
buffer_size, **kwargs):
|
buffer_size, cert, verify, **kwargs):
|
||||||
if content_type != "application/json":
|
if content_type != "application/json":
|
||||||
raise loki_exceptions.UnsupportedContentType(content_type)
|
raise loki_exceptions.UnsupportedContentType(content_type)
|
||||||
|
|
||||||
@@ -36,7 +36,9 @@ class FakeLokiClient(loki_client_module.LokiClient):
|
|||||||
}
|
}
|
||||||
self._buffer_size = buffer_size
|
self._buffer_size = buffer_size
|
||||||
|
|
||||||
self._logs = []
|
self._cert = cert
|
||||||
|
self._verify = verify
|
||||||
|
|
||||||
self.init()
|
self.init()
|
||||||
|
|
||||||
def init(self):
|
def init(self):
|
||||||
|
|||||||
@@ -121,7 +121,7 @@ Section ``storage_loki``:
|
|||||||
* ``url``: Defaults to ``http://localhost:3100/loki/api/v1``. Loki host, along
|
* ``url``: Defaults to ``http://localhost:3100/loki/api/v1``. Loki host, along
|
||||||
with port and protocol.
|
with port and protocol.
|
||||||
|
|
||||||
* ``tenant``: Defaults to tenant1. Loki tenant.
|
* ``tenant``: Defaults to cloudkitty. Loki tenant.
|
||||||
|
|
||||||
* ``stream``: Defaults to ``{"service": "cloudkitty"}``. The labels that are
|
* ``stream``: Defaults to ``{"service": "cloudkitty"}``. The labels that are
|
||||||
going to be used to define the Loki stream as Python dict.
|
going to be used to define the Loki stream as Python dict.
|
||||||
@@ -136,4 +136,10 @@ Section ``storage_loki``:
|
|||||||
* ``insecure``: Defaults to ``false``. Set to true to allow insecure HTTPS
|
* ``insecure``: Defaults to ``false``. Set to true to allow insecure HTTPS
|
||||||
connections to Loki.
|
connections to Loki.
|
||||||
|
|
||||||
* ``cafile``: Path of the CA certificate to trust for HTTPS connections.
|
* ``ca_file``: Path of the CA certificate to trust for HTTPS connections.
|
||||||
|
|
||||||
|
* ``cert_file``: Path to a client cert for establishing mTLS connections to
|
||||||
|
Loki.
|
||||||
|
|
||||||
|
* ``key_file``: Path to a client key for establishing mTLS connections to
|
||||||
|
Loki.
|
||||||
|
|||||||
Reference in New Issue
Block a user