Merge "Loki: Add timeout for HTTP requests"
This commit is contained in:
@@ -83,7 +83,12 @@ loki_storage_opts = [
|
||||
'key_file',
|
||||
help="Path to a client key for establishing mTLS connections to "
|
||||
"Loki.",
|
||||
default=None)
|
||||
default=None),
|
||||
cfg.FloatOpt(
|
||||
'timeout',
|
||||
default=60,
|
||||
min=0,
|
||||
help='Timeout value for http requests'),
|
||||
]
|
||||
|
||||
CONF.register_opts(loki_storage_opts, LOKI_STORAGE_GROUP)
|
||||
@@ -111,7 +116,9 @@ class LokiStorage(v2_storage.BaseStorage):
|
||||
CONF.storage_loki.buffer_size,
|
||||
CONF.storage_loki.shard_days,
|
||||
cert,
|
||||
verify)
|
||||
verify,
|
||||
CONF.storage_loki.timeout,
|
||||
)
|
||||
|
||||
def init(self):
|
||||
LOG.debug('LokiStorage Init.')
|
||||
|
||||
@@ -27,7 +27,7 @@ class LokiClient(object):
|
||||
"""Class used to ease interaction with Loki."""
|
||||
|
||||
def __init__(self, url, tenant, stream_labels, content_type, buffer_size,
|
||||
shard_days, cert, verify):
|
||||
shard_days, cert, verify, timeout):
|
||||
if content_type != "application/json":
|
||||
raise exceptions.UnsupportedContentType(content_type)
|
||||
|
||||
@@ -44,6 +44,8 @@ class LokiClient(object):
|
||||
self._cert = cert
|
||||
self._verify = verify
|
||||
|
||||
self.timeout = timeout
|
||||
|
||||
def _build_payload_json(self, batch):
|
||||
"""Build payload with separate streams per project_id."""
|
||||
streams = []
|
||||
@@ -104,7 +106,8 @@ class LokiClient(object):
|
||||
|
||||
LOG.debug("Executing Loki query: %s", params)
|
||||
response = requests.get(url, params=params, headers=self._headers,
|
||||
cert=self._cert, verify=self._verify)
|
||||
cert=self._cert, verify=self._verify,
|
||||
timeout=self.timeout)
|
||||
|
||||
if response.status_code == 200:
|
||||
data = response.json()['data']
|
||||
@@ -128,7 +131,8 @@ class LokiClient(object):
|
||||
|
||||
payload = self._build_payload_json(self._points)
|
||||
response = requests.post(url, json=payload, headers=self._headers,
|
||||
cert=self._cert, verify=self._verify)
|
||||
cert=self._cert, verify=self._verify,
|
||||
timeout=self.timeout)
|
||||
|
||||
if response.status_code == 204:
|
||||
LOG.debug(
|
||||
@@ -159,7 +163,8 @@ class LokiClient(object):
|
||||
}
|
||||
|
||||
response = requests.post(url, params=params, headers=self._headers,
|
||||
cert=self._cert, verify=self._verify)
|
||||
cert=self._cert, verify=self._verify,
|
||||
timeout=self.timeout)
|
||||
|
||||
if response.status_code == 204:
|
||||
LOG.debug(
|
||||
|
||||
@@ -52,6 +52,7 @@ class TestLokiClient(unittest.TestCase):
|
||||
self.shard_days = 7
|
||||
self.cert = ('/path/to/cert', '/path/to/key')
|
||||
self.verify = '/path/to/cafile'
|
||||
self.timeout = 60
|
||||
self.client = client.LokiClient(
|
||||
self.base_url,
|
||||
self.tenant,
|
||||
@@ -60,7 +61,8 @@ class TestLokiClient(unittest.TestCase):
|
||||
self.buffer_size,
|
||||
self.shard_days,
|
||||
cert=self.cert,
|
||||
verify=self.verify
|
||||
verify=self.verify,
|
||||
timeout=self.timeout,
|
||||
)
|
||||
self.begin_dt = datetime(2024, 1, 1, 0, 0, 0, tzinfo=timezone.utc)
|
||||
self.end_dt = datetime(2024, 1, 1, 1, 0, 0, tzinfo=timezone.utc)
|
||||
@@ -80,7 +82,7 @@ class TestLokiClient(unittest.TestCase):
|
||||
with self.assertRaises(exceptions.UnsupportedContentType):
|
||||
client.LokiClient(self.base_url, self.tenant, self.stream_labels,
|
||||
"text/plain", self.buffer_size, self.shard_days,
|
||||
None, True)
|
||||
None, True, 60.0)
|
||||
|
||||
def test_build_payload_json(self, mock_log, mock_requests):
|
||||
batch = {
|
||||
@@ -190,7 +192,8 @@ class TestLokiClient(unittest.TestCase):
|
||||
params=expected_params,
|
||||
headers=self.client._headers,
|
||||
cert=self.client._cert,
|
||||
verify=self.client._verify
|
||||
verify=self.client._verify,
|
||||
timeout=self.client.timeout,
|
||||
)
|
||||
self.assertEqual(
|
||||
data,
|
||||
@@ -239,7 +242,8 @@ class TestLokiClient(unittest.TestCase):
|
||||
)
|
||||
mock_requests.post.assert_called_once_with(
|
||||
expected_url, json=expected_payload, headers=self.client._headers,
|
||||
cert=self.client._cert, verify=self.client._verify
|
||||
cert=self.client._cert, verify=self.client._verify,
|
||||
timeout=self.client.timeout
|
||||
)
|
||||
self.assertEqual(self.client._points, {})
|
||||
expected_msg = ("Batch of 2 messages across 1 project(s) "
|
||||
@@ -268,7 +272,8 @@ class TestLokiClient(unittest.TestCase):
|
||||
json=self.client._build_payload_json(initial_points),
|
||||
headers=self.client._headers,
|
||||
cert=self.client._cert,
|
||||
verify=self.client._verify
|
||||
verify=self.client._verify,
|
||||
timeout=self.client.timeout
|
||||
)
|
||||
|
||||
def test_push_no_points(self, mock_log, mock_requests):
|
||||
@@ -540,7 +545,8 @@ class TestLokiClient(unittest.TestCase):
|
||||
},
|
||||
headers=self.client._headers,
|
||||
cert=self.client._cert,
|
||||
verify=self.client._verify
|
||||
verify=self.client._verify,
|
||||
timeout=self.client.timeout
|
||||
)
|
||||
ml.debug.assert_has_calls([
|
||||
call("Dataframes deleted successfully.")
|
||||
@@ -568,7 +574,8 @@ class TestLokiClient(unittest.TestCase):
|
||||
params=expected_params,
|
||||
headers=self.client._headers,
|
||||
cert=self.client._cert,
|
||||
verify=self.client._verify
|
||||
verify=self.client._verify,
|
||||
timeout=self.client.timeout
|
||||
)
|
||||
expected_error_msg = ("Failed to delete dataframes: "
|
||||
"500 - Internal Server Error")
|
||||
|
||||
@@ -23,7 +23,7 @@ from cloudkitty.utils import json
|
||||
class FakeLokiClient(loki_client_module.LokiClient):
|
||||
|
||||
def __init__(self, url, tenant, stream_labels, content_type,
|
||||
buffer_size, shard_days, cert, verify, **kwargs):
|
||||
buffer_size, shard_days, cert, verify, timeout, **kwargs):
|
||||
if content_type != "application/json":
|
||||
raise loki_exceptions.UnsupportedContentType(content_type)
|
||||
|
||||
@@ -40,6 +40,8 @@ class FakeLokiClient(loki_client_module.LokiClient):
|
||||
self._cert = cert
|
||||
self._verify = verify
|
||||
|
||||
self.timeout = timeout
|
||||
|
||||
self.init()
|
||||
|
||||
def init(self):
|
||||
|
||||
6
releasenotes/notes/loki-timeout-e07904458ab6cccb.yaml
Normal file
6
releasenotes/notes/loki-timeout-e07904458ab6cccb.yaml
Normal file
@@ -0,0 +1,6 @@
|
||||
---
|
||||
features:
|
||||
- |
|
||||
Introduced timeout for HTTP requests sent to Loki. Timeout threshold is
|
||||
60 seconds by default and can be customized by
|
||||
the ``[storage_loki] timeout`` option.
|
||||
Reference in New Issue
Block a user