Merge "Add HTTPS and auth support to Prometheus collector"
This commit is contained in:
commit
16646f1afc
|
@ -41,6 +41,24 @@ pcollector_collector_opts = [
|
|||
default='',
|
||||
help='Prometheus service URL',
|
||||
),
|
||||
cfg.StrOpt(
|
||||
'prometheus_user',
|
||||
help='Prometheus user (for basic auth only)',
|
||||
),
|
||||
cfg.StrOpt(
|
||||
'prometheus_password',
|
||||
help='Prometheus user password (for basic auth only)',
|
||||
secret=True,
|
||||
),
|
||||
cfg.StrOpt(
|
||||
'cafile',
|
||||
help='Custom certificate authority file path',
|
||||
),
|
||||
cfg.BoolOpt(
|
||||
'insecure',
|
||||
default=False,
|
||||
help='Explicitly trust untrusted HTTPS responses',
|
||||
),
|
||||
]
|
||||
cfg.CONF.register_opts(pcollector_collector_opts, PROMETHEUS_COLLECTOR_OPTS)
|
||||
|
||||
|
@ -57,39 +75,58 @@ class PrometheusConfigError(collect_exceptions.CollectError):
|
|||
pass
|
||||
|
||||
|
||||
class PrometheusResponseError(collect_exceptions.CollectError):
|
||||
pass
|
||||
|
||||
|
||||
class PrometheusClient(object):
|
||||
@classmethod
|
||||
def build_query(cls, source, query, start, end, period, metric_name):
|
||||
"""Build PromQL instant queries."""
|
||||
start = ck_utils.iso8601_from_timestamp(start)
|
||||
end = ck_utils.iso8601_from_timestamp(end)
|
||||
INSTANT_QUERY_ENDPOINT = 'query'
|
||||
RANGE_QUERY_ENDPOINT = 'query_range'
|
||||
|
||||
if '$period' in query:
|
||||
try:
|
||||
query = ck_utils.template_str_substitute(
|
||||
query, {'period': str(period) + 's'},
|
||||
)
|
||||
except (KeyError, ValueError):
|
||||
raise PrometheusConfigError(
|
||||
'Invalid prometheus query: {}'.format(query))
|
||||
def __init__(self, url, auth=None, verify=True):
|
||||
self.url = url
|
||||
self.auth = auth
|
||||
self.verify = verify
|
||||
|
||||
# Due to the design of Cloudkitty, only instant queries are supported.
|
||||
# In that case 'time' equals 'end' and
|
||||
# the window time is reprezented by the period.
|
||||
return source + '/query?query=' + query + '&time=' + end
|
||||
|
||||
@classmethod
|
||||
def get_data(cls, source, query, start, end, period, metric_name):
|
||||
url = cls.build_query(
|
||||
source,
|
||||
query,
|
||||
start,
|
||||
end,
|
||||
period,
|
||||
metric_name,
|
||||
def _get(self, endpoint, params):
|
||||
return requests.get(
|
||||
'{}/{}'.format(self.url, endpoint),
|
||||
params=params,
|
||||
auth=self.auth,
|
||||
verify=self.verify,
|
||||
)
|
||||
|
||||
return requests.get(url).json()
|
||||
def get_instant(self, query, time=None, timeout=None):
|
||||
res = self._get(
|
||||
self.INSTANT_QUERY_ENDPOINT,
|
||||
params={'query': query, 'time': time, 'timeout': timeout},
|
||||
)
|
||||
try:
|
||||
return res.json()
|
||||
except ValueError:
|
||||
raise PrometheusResponseError(
|
||||
'Could not get a valid json response for '
|
||||
'{} (response: {})'.format(res.url, res.text)
|
||||
)
|
||||
|
||||
def get_range(self, query, start, end, step, timeout=None):
|
||||
res = self._get(
|
||||
self.RANGE_QUERY_ENDPOINT,
|
||||
params={
|
||||
'query': query,
|
||||
'start': start,
|
||||
'end': end,
|
||||
'step': step,
|
||||
'timeout': timeout,
|
||||
},
|
||||
)
|
||||
try:
|
||||
return res.json()
|
||||
except ValueError:
|
||||
raise PrometheusResponseError(
|
||||
'Could not get a valid json response for '
|
||||
'{} (response: {})'.format(res.url, res.text)
|
||||
)
|
||||
|
||||
|
||||
class PrometheusCollector(collector.BaseCollector):
|
||||
|
@ -97,6 +134,22 @@ class PrometheusCollector(collector.BaseCollector):
|
|||
|
||||
def __init__(self, transformers, **kwargs):
|
||||
super(PrometheusCollector, self).__init__(transformers, **kwargs)
|
||||
url = CONF.collector_prometheus.prometheus_url
|
||||
|
||||
user = CONF.collector_prometheus.prometheus_user
|
||||
password = CONF.collector_prometheus.prometheus_password
|
||||
|
||||
verify = True
|
||||
if CONF.collector_prometheus.cafile:
|
||||
verify = CONF.collector_prometheus.cafile
|
||||
elif CONF.collector_prometheus.insecure:
|
||||
verify = False
|
||||
|
||||
self._conn = PrometheusClient(
|
||||
url,
|
||||
auth=(user, password) if user and password else None,
|
||||
verify=verify,
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def check_configuration(conf):
|
||||
|
@ -138,19 +191,21 @@ class PrometheusCollector(collector.BaseCollector):
|
|||
|
||||
def fetch_all(self, metric_name, start, end, project_id, q_filter=None):
|
||||
"""Returns metrics to be valorized."""
|
||||
# NOTE(mc): Remove potential trailing '/' to avoid
|
||||
# url building problems
|
||||
url = CONF.collector_prometheus.prometheus_url
|
||||
if url.endswith('/'):
|
||||
url = url[:-1]
|
||||
query = self.conf[metric_name]['extra_args']['query']
|
||||
period = CONF.collect.period
|
||||
|
||||
res = PrometheusClient.get_data(
|
||||
url,
|
||||
self.conf[metric_name]['extra_args']['query'],
|
||||
start,
|
||||
if '$period' in query:
|
||||
try:
|
||||
query = ck_utils.template_str_substitute(
|
||||
query, {'period': str(period) + 's'},
|
||||
)
|
||||
except (KeyError, ValueError):
|
||||
raise PrometheusConfigError(
|
||||
'Invalid prometheus query: {}'.format(query))
|
||||
|
||||
res = self._conn.get_instant(
|
||||
query,
|
||||
end,
|
||||
self.period,
|
||||
metric_name,
|
||||
)
|
||||
|
||||
# If the query returns an empty dataset,
|
||||
|
|
|
@ -16,10 +16,12 @@
|
|||
# @author: Martin CAMEY
|
||||
#
|
||||
from decimal import Decimal
|
||||
|
||||
import mock
|
||||
|
||||
from cloudkitty import collector
|
||||
from cloudkitty.collector import prometheus
|
||||
from cloudkitty import json_utils as json
|
||||
from cloudkitty import tests
|
||||
from cloudkitty.tests import samples
|
||||
from cloudkitty import transformer
|
||||
|
@ -96,7 +98,7 @@ class PrometheusCollectorTest(tests.TestCase):
|
|||
}
|
||||
|
||||
no_response = mock.patch(
|
||||
'cloudkitty.collector.prometheus.PrometheusClient.get_data',
|
||||
'cloudkitty.collector.prometheus.PrometheusClient.get_instant',
|
||||
return_value=samples.PROMETHEUS_RESP_INSTANT_QUERY,
|
||||
)
|
||||
|
||||
|
@ -113,7 +115,7 @@ class PrometheusCollectorTest(tests.TestCase):
|
|||
|
||||
def test_format_retrieve_raise_NoDataCollected(self):
|
||||
no_response = mock.patch(
|
||||
'cloudkitty.collector.prometheus.PrometheusClient.get_data',
|
||||
'cloudkitty.collector.prometheus.PrometheusClient.get_instant',
|
||||
return_value=samples.PROMETHEUS_EMPTY_RESP_INSTANT_QUERY,
|
||||
)
|
||||
|
||||
|
@ -130,49 +132,133 @@ class PrometheusCollectorTest(tests.TestCase):
|
|||
|
||||
|
||||
class PrometheusClientTest(tests.TestCase):
|
||||
class FakeResponse(object):
|
||||
"""Mimics an HTTP ``requests`` response"""
|
||||
|
||||
def __init__(self, url, text, status_code):
|
||||
self.url = url
|
||||
self.text = text
|
||||
self.status_code = status_code
|
||||
|
||||
def json(self):
|
||||
return json.loads(self.text)
|
||||
|
||||
@staticmethod
|
||||
def _mock_requests_get(text):
|
||||
"""Factory to build FakeResponse with desired response body text"""
|
||||
return lambda *args, **kwargs: PrometheusClientTest.FakeResponse(
|
||||
args[0], text, 200,
|
||||
)
|
||||
|
||||
def setUp(self):
|
||||
super(PrometheusClientTest, self).setUp()
|
||||
self.client = prometheus.PrometheusClient
|
||||
|
||||
def test_build_instant_query_first_period(self):
|
||||
expected = 'http://localhost:9090/api/v1/query?' \
|
||||
'query=increase(http_requests_total[3600s])' \
|
||||
'&time=2015-01-01T01:00:00Z'
|
||||
params = {
|
||||
'source': 'http://localhost:9090/api/v1',
|
||||
'query': 'increase(http_requests_total[$period])',
|
||||
'start': samples.FIRST_PERIOD_BEGIN,
|
||||
'end': samples.FIRST_PERIOD_END,
|
||||
'period': '3600',
|
||||
'metric_name': 'http_requests_total',
|
||||
}
|
||||
actual = self.client.build_query(**params)
|
||||
self.assertEqual(expected, actual)
|
||||
|
||||
def test_build_instant_query_second_period(self):
|
||||
expected = 'http://localhost:9090/api/v1/query?' \
|
||||
'query=increase(http_requests_total[3600s])' \
|
||||
'&time=2015-01-01T02:00:00Z'
|
||||
params = {
|
||||
'source': 'http://localhost:9090/api/v1',
|
||||
'query': 'increase(http_requests_total[$period])',
|
||||
'start': samples.SECOND_PERIOD_BEGIN,
|
||||
'end': samples.SECOND_PERIOD_END,
|
||||
'period': '3600',
|
||||
'metric_name': 'http_requests_total',
|
||||
}
|
||||
actual = self.client.build_query(**params)
|
||||
self.assertEqual(expected, actual)
|
||||
|
||||
def test_build_query_raises_PrometheusConfigError(self):
|
||||
class InvalidPeriod(object):
|
||||
def __str__(self):
|
||||
raise ValueError
|
||||
|
||||
period = InvalidPeriod()
|
||||
|
||||
self.assertRaises(
|
||||
prometheus.PrometheusConfigError,
|
||||
self.client.build_query,
|
||||
None, '$period', 0, 0, period, 'broken_metric',
|
||||
self.client = prometheus.PrometheusClient(
|
||||
'http://localhost:9090/api/v1',
|
||||
)
|
||||
|
||||
def test_get_with_no_options(self):
|
||||
with mock.patch('requests.get') as mock_get:
|
||||
self.client._get(
|
||||
'query_range',
|
||||
params={
|
||||
'query': 'max(http_requests_total) by (project_id)',
|
||||
'start': samples.FIRST_PERIOD_BEGIN,
|
||||
'end': samples.FIRST_PERIOD_END,
|
||||
'step': 10,
|
||||
},
|
||||
)
|
||||
mock_get.assert_called_once_with(
|
||||
'http://localhost:9090/api/v1/query_range',
|
||||
params={
|
||||
'query': 'max(http_requests_total) by (project_id)',
|
||||
'start': samples.FIRST_PERIOD_BEGIN,
|
||||
'end': samples.FIRST_PERIOD_END,
|
||||
'step': 10,
|
||||
},
|
||||
auth=None,
|
||||
verify=True,
|
||||
)
|
||||
|
||||
def test_get_with_options(self):
|
||||
client = prometheus.PrometheusClient(
|
||||
'http://localhost:9090/api/v1',
|
||||
auth=('foo', 'bar'),
|
||||
verify='/some/random/path',
|
||||
)
|
||||
with mock.patch('requests.get') as mock_get:
|
||||
client._get(
|
||||
'query_range',
|
||||
params={
|
||||
'query': 'max(http_requests_total) by (project_id)',
|
||||
'start': samples.FIRST_PERIOD_BEGIN,
|
||||
'end': samples.FIRST_PERIOD_END,
|
||||
'step': 10,
|
||||
},
|
||||
)
|
||||
mock_get.assert_called_once_with(
|
||||
'http://localhost:9090/api/v1/query_range',
|
||||
params={
|
||||
'query': 'max(http_requests_total) by (project_id)',
|
||||
'start': samples.FIRST_PERIOD_BEGIN,
|
||||
'end': samples.FIRST_PERIOD_END,
|
||||
'step': 10,
|
||||
},
|
||||
auth=('foo', 'bar'),
|
||||
verify='/some/random/path',
|
||||
)
|
||||
|
||||
def test_get_instant(self):
|
||||
mock_get = mock.patch(
|
||||
'requests.get',
|
||||
side_effect=self._mock_requests_get('{"foo": "bar"}'),
|
||||
)
|
||||
|
||||
with mock_get:
|
||||
res = self.client.get_instant(
|
||||
'max(http_requests_total) by (project_id)',
|
||||
)
|
||||
self.assertEqual(res, {'foo': 'bar'})
|
||||
|
||||
def test_get_range(self):
|
||||
mock_get = mock.patch(
|
||||
'requests.get',
|
||||
side_effect=self._mock_requests_get('{"foo": "bar"}'),
|
||||
)
|
||||
|
||||
with mock_get:
|
||||
res = self.client.get_range(
|
||||
'max(http_requests_total) by (project_id)',
|
||||
samples.FIRST_PERIOD_BEGIN,
|
||||
samples.FIRST_PERIOD_END,
|
||||
10,
|
||||
)
|
||||
self.assertEqual(res, {'foo': 'bar'})
|
||||
|
||||
def test_get_instant_raises_error_on_bad_json(self):
|
||||
# Simulating malformed JSON response from HTTP+PromQL instant request
|
||||
mock_get = mock.patch(
|
||||
'requests.get',
|
||||
side_effect=self._mock_requests_get('{"foo": "bar"'),
|
||||
)
|
||||
with mock_get:
|
||||
self.assertRaises(
|
||||
prometheus.PrometheusResponseError,
|
||||
self.client.get_instant,
|
||||
'max(http_requests_total) by (project_id)',
|
||||
)
|
||||
|
||||
def test_get_range_raises_error_on_bad_json(self):
|
||||
# Simulating malformed JSON response from HTTP+PromQL range request
|
||||
mock_get = mock.patch(
|
||||
'requests.get',
|
||||
side_effect=self._mock_requests_get('{"foo": "bar"'),
|
||||
)
|
||||
with mock_get:
|
||||
self.assertRaises(
|
||||
prometheus.PrometheusResponseError,
|
||||
self.client.get_range,
|
||||
'max(http_requests_total) by (project_id)',
|
||||
samples.FIRST_PERIOD_BEGIN,
|
||||
samples.FIRST_PERIOD_END,
|
||||
10,
|
||||
)
|
||||
|
|
|
@ -0,0 +1,6 @@
|
|||
---
|
||||
features:
|
||||
- |
|
||||
Prometheus collector now supports HTTPS with custom CA file,
|
||||
an insecure option to allow untrusted certificate
|
||||
and basic HTTP authentication.
|
Loading…
Reference in New Issue