Implement Prometheus fetcher

A Prometheus scope fetcher has been added in order to dynamically discover
scopes from a Prometheus service using a user defined metric and a scope
attribute.
It can also filter out the response from Prometheus using metadata filters
to have a more fine-grained control over scope discovery.
It features HTTP basic auth capabilities and HTTPS configuration options
similar to Prometheus collector.

Change-Id: If3c2da8d7949e0aec08f3699547faf34af4ddee4
Story: 2005427
Task: 30458
This commit is contained in:
Justin Ferrieu 2019-04-09 10:21:52 +02:00 committed by Luka Peschke
parent 7ca8b43cb4
commit 46a54ad05f
10 changed files with 517 additions and 192 deletions

View File

@ -18,13 +18,14 @@ from decimal import ROUND_HALF_UP
from oslo_config import cfg
from oslo_log import log
import requests
from voluptuous import In
from voluptuous import Required
from voluptuous import Schema
from cloudkitty import collector
from cloudkitty.collector import exceptions as collect_exceptions
from cloudkitty.collector.exceptions import CollectError
from cloudkitty.common.prometheus_client import PrometheusClient
from cloudkitty.common.prometheus_client import PrometheusResponseError
from cloudkitty import utils as ck_utils
@ -72,60 +73,6 @@ PROMETHEUS_EXTRA_SCHEMA = {
}
class PrometheusResponseError(collect_exceptions.CollectError):
pass
class PrometheusClient(object):
INSTANT_QUERY_ENDPOINT = 'query'
RANGE_QUERY_ENDPOINT = 'query_range'
def __init__(self, url, auth=None, verify=True):
self.url = url
self.auth = auth
self.verify = verify
def _get(self, endpoint, params):
return requests.get(
'{}/{}'.format(self.url, endpoint),
params=params,
auth=self.auth,
verify=self.verify,
)
def get_instant(self, query, time=None, timeout=None):
res = self._get(
self.INSTANT_QUERY_ENDPOINT,
params={'query': query, 'time': time, 'timeout': timeout},
)
try:
return res.json()
except ValueError:
raise PrometheusResponseError(
'Could not get a valid json response for '
'{} (response: {})'.format(res.url, res.text)
)
def get_range(self, query, start, end, step, timeout=None):
res = self._get(
self.RANGE_QUERY_ENDPOINT,
params={
'query': query,
'start': start,
'end': end,
'step': step,
'timeout': timeout,
},
)
try:
return res.json()
except ValueError:
raise PrometheusResponseError(
'Could not get a valid json response for '
'{} (response: {})'.format(res.url, res.text)
)
class PrometheusCollector(collector.BaseCollector):
collector_name = 'prometheus'
@ -203,10 +150,14 @@ class PrometheusCollector(collector.BaseCollector):
period,
', '.join(groupby + metadata),
)
res = self._conn.get_instant(
query,
time,
)
try:
res = self._conn.get_instant(
query,
time,
)
except PrometheusResponseError as e:
raise CollectError(*e.args)
# If the query returns an empty dataset,
# return an empty list

View File

@ -54,6 +54,8 @@ _opts = [
('fetcher_keystone', list(itertools.chain(
cloudkitty.fetcher.keystone.keystone_opts,
cloudkitty.fetcher.keystone.fetcher_keystone_opts))),
('fetcher_prometheus', list(itertools.chain(
cloudkitty.fetcher.prometheus.fetcher_prometheus_opts))),
('fetcher_source', list(itertools.chain(
cloudkitty.fetcher.source.fetcher_source_opts))),
('orchestrator', list(itertools.chain(

View File

@ -0,0 +1,69 @@
# Copyright 2019 Objectif Libre
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import requests
class PrometheusResponseError(Exception):
pass
class PrometheusClient(object):
INSTANT_QUERY_ENDPOINT = 'query'
RANGE_QUERY_ENDPOINT = 'query_range'
def __init__(self, url, auth=None, verify=True):
self.url = url
self.auth = auth
self.verify = verify
def _get(self, endpoint, params):
return requests.get(
'{}/{}'.format(self.url, endpoint),
params=params,
auth=self.auth,
verify=self.verify,
)
def get_instant(self, query, time=None, timeout=None):
res = self._get(
self.INSTANT_QUERY_ENDPOINT,
params={'query': query, 'time': time, 'timeout': timeout},
)
try:
return res.json()
except ValueError:
raise PrometheusResponseError(
'Could not get a valid json response for '
'{} (response: {})'.format(res.url, res.text)
)
def get_range(self, query, start, end, step, timeout=None):
res = self._get(
self.RANGE_QUERY_ENDPOINT,
params={
'query': query,
'start': start,
'end': end,
'step': step,
'timeout': timeout,
},
)
try:
return res.json()
except ValueError:
raise PrometheusResponseError(
'Could not get a valid json response for '
'{} (response: {})'.format(res.url, res.text)
)

View File

@ -0,0 +1,148 @@
# -*- coding: utf-8 -*-
# Copyright 2015 Objectif Libre
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
from oslo_config import cfg
from cloudkitty.common.prometheus_client import PrometheusClient
from cloudkitty.common.prometheus_client import PrometheusResponseError
from cloudkitty import fetcher
class PrometheusFetcherError(Exception):
pass
FETCHER_PROMETHEUS_OPTS = 'fetcher_prometheus'
fetcher_prometheus_opts = [
cfg.StrOpt(
'metric',
help='Metric from which scope_ids should be requested',
),
cfg.StrOpt(
'scope_attribute',
default='project_id',
help='Attribute from which scope_ids should be collected',
),
cfg.StrOpt(
'prometheus_url',
help='Prometheus service URL',
),
cfg.StrOpt(
'prometheus_user',
default='',
help='Prometheus user (for basic auth only)',
),
cfg.StrOpt(
'prometheus_password',
default='',
help='Prometheus user (for basic auth only)',
),
cfg.StrOpt(
'cafile',
help='Custom certificate authority file path',
),
cfg.BoolOpt(
'insecure',
default=False,
help='Explicitly trust untrusted HTTPS responses',
),
cfg.DictOpt(
'filters',
default=dict(),
help='Metadata to filter out the scope_ids discovery request response',
),
]
cfg.CONF.register_opts(fetcher_prometheus_opts, FETCHER_PROMETHEUS_OPTS)
CONF = cfg.CONF
class PrometheusFetcher(fetcher.BaseFetcher):
"""Prometheus scope_id fetcher"""
name = 'prometheus'
def __init__(self):
super(PrometheusFetcher, self).__init__()
url = CONF.fetcher_prometheus.prometheus_url
user = CONF.fetcher_prometheus.prometheus_user
password = CONF.fetcher_prometheus.prometheus_password
verify = True
if CONF.fetcher_prometheus.cafile:
verify = CONF.fetcher_prometheus.cafile
elif CONF.fetcher_prometheus.insecure:
verify = False
self._conn = PrometheusClient(
url,
auth=(user, password) if user and password else None,
verify=verify,
)
def get_tenants(self):
metric = CONF.fetcher_prometheus.metric
scope_attribute = CONF.fetcher_prometheus.scope_attribute
filters = CONF.fetcher_prometheus.filters
metadata = ''
# Preformatting filters as {label1="value1", label2="value2"}
if filters:
metadata = '{{{}}}'.format(', '.join([
'{}="{}"'.format(k, v) for k, v in filters.items()
]))
# Formatting PromQL query
query = 'max({}{}) by ({})'.format(
metric,
metadata,
scope_attribute,
)
try:
res = self._conn.get_instant(query)
except PrometheusResponseError as e:
raise PrometheusFetcherError(*e.args)
try:
result = res['data']['result']
if not result:
return []
scope_ids = [
item['metric'][scope_attribute] for item in result
if item['metric'][scope_attribute]
]
except KeyError as e:
missing_key = e.args[0]
if missing_key in ['data', 'result', 'metric']:
msg = (
'Unexpected Prometheus server response '
'"{}" for "{}"'
).format(
res,
query,
)
else:
msg = '"{}" not found in Prometheus server response'.format(
missing_key
)
raise PrometheusFetcherError(msg)
# Returning unique ids
return list(set(scope_ids))

View File

@ -20,8 +20,9 @@ from decimal import Decimal
import mock
from cloudkitty import collector
from cloudkitty.collector import exceptions
from cloudkitty.collector import prometheus
from cloudkitty import json_utils as json
from cloudkitty.common.prometheus_client import PrometheusResponseError
from cloudkitty import tests
from cloudkitty.tests import samples
from cloudkitty import transformer
@ -150,7 +151,7 @@ class PrometheusCollectorTest(tests.TestCase):
}
no_response = mock.patch(
'cloudkitty.collector.prometheus.PrometheusClient.get_instant',
'cloudkitty.common.prometheus_client.PrometheusClient.get_instant',
return_value=samples.PROMETHEUS_RESP_INSTANT_QUERY,
)
@ -167,7 +168,7 @@ class PrometheusCollectorTest(tests.TestCase):
def test_format_retrieve_raise_NoDataCollected(self):
no_response = mock.patch(
'cloudkitty.collector.prometheus.PrometheusClient.get_instant',
'cloudkitty.common.prometheus_client.PrometheusClient.get_instant',
return_value=samples.PROMETHEUS_EMPTY_RESP_INSTANT_QUERY,
)
@ -182,136 +183,19 @@ class PrometheusCollectorTest(tests.TestCase):
q_filter=None,
)
class PrometheusClientTest(tests.TestCase):
class FakeResponse(object):
"""Mimics an HTTP ``requests`` response"""
def __init__(self, url, text, status_code):
self.url = url
self.text = text
self.status_code = status_code
def json(self):
return json.loads(self.text)
@staticmethod
def _mock_requests_get(text):
"""Factory to build FakeResponse with desired response body text"""
return lambda *args, **kwargs: PrometheusClientTest.FakeResponse(
args[0], text, 200,
def test_format_retrieve_all_raises_exception(self):
invalid_response = mock.patch(
'cloudkitty.common.prometheus_client.PrometheusClient.get_instant',
side_effect=PrometheusResponseError,
)
def setUp(self):
super(PrometheusClientTest, self).setUp()
self.client = prometheus.PrometheusClient(
'http://localhost:9090/api/v1',
)
def test_get_with_no_options(self):
with mock.patch('requests.get') as mock_get:
self.client._get(
'query_range',
params={
'query': 'max(http_requests_total) by (project_id)',
'start': samples.FIRST_PERIOD_BEGIN,
'end': samples.FIRST_PERIOD_END,
'step': 10,
},
)
mock_get.assert_called_once_with(
'http://localhost:9090/api/v1/query_range',
params={
'query': 'max(http_requests_total) by (project_id)',
'start': samples.FIRST_PERIOD_BEGIN,
'end': samples.FIRST_PERIOD_END,
'step': 10,
},
auth=None,
verify=True,
)
def test_get_with_options(self):
client = prometheus.PrometheusClient(
'http://localhost:9090/api/v1',
auth=('foo', 'bar'),
verify='/some/random/path',
)
with mock.patch('requests.get') as mock_get:
client._get(
'query_range',
params={
'query': 'max(http_requests_total) by (project_id)',
'start': samples.FIRST_PERIOD_BEGIN,
'end': samples.FIRST_PERIOD_END,
'step': 10,
},
)
mock_get.assert_called_once_with(
'http://localhost:9090/api/v1/query_range',
params={
'query': 'max(http_requests_total) by (project_id)',
'start': samples.FIRST_PERIOD_BEGIN,
'end': samples.FIRST_PERIOD_END,
'step': 10,
},
auth=('foo', 'bar'),
verify='/some/random/path',
)
def test_get_instant(self):
mock_get = mock.patch(
'requests.get',
side_effect=self._mock_requests_get('{"foo": "bar"}'),
)
with mock_get:
res = self.client.get_instant(
'max(http_requests_total) by (project_id)',
)
self.assertEqual(res, {'foo': 'bar'})
def test_get_range(self):
mock_get = mock.patch(
'requests.get',
side_effect=self._mock_requests_get('{"foo": "bar"}'),
)
with mock_get:
res = self.client.get_range(
'max(http_requests_total) by (project_id)',
samples.FIRST_PERIOD_BEGIN,
samples.FIRST_PERIOD_END,
10,
)
self.assertEqual(res, {'foo': 'bar'})
def test_get_instant_raises_error_on_bad_json(self):
# Simulating malformed JSON response from HTTP+PromQL instant request
mock_get = mock.patch(
'requests.get',
side_effect=self._mock_requests_get('{"foo": "bar"'),
)
with mock_get:
with invalid_response:
self.assertRaises(
prometheus.PrometheusResponseError,
self.client.get_instant,
'max(http_requests_total) by (project_id)',
)
def test_get_range_raises_error_on_bad_json(self):
# Simulating malformed JSON response from HTTP+PromQL range request
mock_get = mock.patch(
'requests.get',
side_effect=self._mock_requests_get('{"foo": "bar"'),
)
with mock_get:
self.assertRaises(
prometheus.PrometheusResponseError,
self.client.get_range,
'max(http_requests_total) by (project_id)',
samples.FIRST_PERIOD_BEGIN,
samples.FIRST_PERIOD_END,
10,
exceptions.CollectError,
self.collector.retrieve,
metric_name='http_requests_total',
start=samples.FIRST_PERIOD_BEGIN,
end=samples.FIRST_PERIOD_END,
project_id=samples.TENANT,
q_filter=None,
)

View File

@ -0,0 +1,155 @@
# -*- coding: utf-8 -*-
# Copyright 2019 Objectif Libre
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import mock
from cloudkitty.collector import prometheus
from cloudkitty import json_utils as json
from cloudkitty import tests
from cloudkitty.tests import samples
class PrometheusClientTest(tests.TestCase):
class FakeResponse(object):
"""Mimics an HTTP ``requests`` response"""
def __init__(self, url, text, status_code):
self.url = url
self.text = text
self.status_code = status_code
def json(self):
return json.loads(self.text)
@staticmethod
def _mock_requests_get(text):
"""Factory to build FakeResponse with desired response body text"""
return lambda *args, **kwargs: PrometheusClientTest.FakeResponse(
args[0], text, 200,
)
def setUp(self):
super(PrometheusClientTest, self).setUp()
self.client = prometheus.PrometheusClient(
'http://localhost:9090/api/v1',
)
def test_get_with_no_options(self):
with mock.patch('requests.get') as mock_get:
self.client._get(
'query_range',
params={
'query': 'max(http_requests_total) by (project_id)',
'start': samples.FIRST_PERIOD_BEGIN,
'end': samples.FIRST_PERIOD_END,
'step': 10,
},
)
mock_get.assert_called_once_with(
'http://localhost:9090/api/v1/query_range',
params={
'query': 'max(http_requests_total) by (project_id)',
'start': samples.FIRST_PERIOD_BEGIN,
'end': samples.FIRST_PERIOD_END,
'step': 10,
},
auth=None,
verify=True,
)
def test_get_with_options(self):
client = prometheus.PrometheusClient(
'http://localhost:9090/api/v1',
auth=('foo', 'bar'),
verify='/some/random/path',
)
with mock.patch('requests.get') as mock_get:
client._get(
'query_range',
params={
'query': 'max(http_requests_total) by (project_id)',
'start': samples.FIRST_PERIOD_BEGIN,
'end': samples.FIRST_PERIOD_END,
'step': 10,
},
)
mock_get.assert_called_once_with(
'http://localhost:9090/api/v1/query_range',
params={
'query': 'max(http_requests_total) by (project_id)',
'start': samples.FIRST_PERIOD_BEGIN,
'end': samples.FIRST_PERIOD_END,
'step': 10,
},
auth=('foo', 'bar'),
verify='/some/random/path',
)
def test_get_instant(self):
mock_get = mock.patch(
'requests.get',
side_effect=self._mock_requests_get('{"foo": "bar"}'),
)
with mock_get:
res = self.client.get_instant(
'max(http_requests_total) by (project_id)',
)
self.assertEqual(res, {'foo': 'bar'})
def test_get_range(self):
mock_get = mock.patch(
'requests.get',
side_effect=self._mock_requests_get('{"foo": "bar"}'),
)
with mock_get:
res = self.client.get_range(
'max(http_requests_total) by (project_id)',
samples.FIRST_PERIOD_BEGIN,
samples.FIRST_PERIOD_END,
10,
)
self.assertEqual(res, {'foo': 'bar'})
def test_get_instant_raises_error_on_bad_json(self):
# Simulating malformed JSON response from HTTP+PromQL instant request
mock_get = mock.patch(
'requests.get',
side_effect=self._mock_requests_get('{"foo": "bar"'),
)
with mock_get:
self.assertRaises(
prometheus.PrometheusResponseError,
self.client.get_instant,
'max(http_requests_total) by (project_id)',
)
def test_get_range_raises_error_on_bad_json(self):
# Simulating malformed JSON response from HTTP+PromQL range request
mock_get = mock.patch(
'requests.get',
side_effect=self._mock_requests_get('{"foo": "bar"'),
)
with mock_get:
self.assertRaises(
prometheus.PrometheusResponseError,
self.client.get_range,
'max(http_requests_total) by (project_id)',
samples.FIRST_PERIOD_BEGIN,
samples.FIRST_PERIOD_END,
10,
)

View File

View File

@ -0,0 +1,105 @@
# -*- coding: utf-8 -*-
# Copyright 2019 Objectif Libre
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
#
import mock
from cloudkitty.common.prometheus_client import PrometheusClient
from cloudkitty.common.prometheus_client import PrometheusResponseError
from cloudkitty.fetcher import prometheus
from cloudkitty import tests
class PrometheusFetcherTest(tests.TestCase):
def setUp(self):
super(PrometheusFetcherTest, self).setUp()
self.conf.set_override(
'metric', 'http_requests_total', 'fetcher_prometheus',
)
self.conf.set_override(
'scope_attribute', 'namespace', 'fetcher_prometheus',
)
self.fetcher = prometheus.PrometheusFetcher()
def test_get_tenants_build_query(self):
query = (
'max(http_requests_total) by (namespace)'
)
with mock.patch.object(
PrometheusClient, 'get_instant',
) as mock_get:
self.fetcher.get_tenants()
mock_get.assert_called_once_with(query)
def test_get_tenants_build_query_with_filter(self):
query = (
'max(http_requests_total{label1="foo"})'
' by (namespace)'
)
self.conf.set_override(
'filters', 'label1:foo', 'fetcher_prometheus',
)
with mock.patch.object(
PrometheusClient, 'get_instant',
) as mock_get:
self.fetcher.get_tenants()
mock_get.assert_called_once_with(query)
def test_get_tenants_raises_exception(self):
no_response = mock.patch(
'cloudkitty.common.prometheus_client.PrometheusClient.get_instant',
return_value={},
)
with no_response:
self.assertRaises(
prometheus.PrometheusFetcherError,
self.fetcher.get_tenants,
)
def test_get_tenants_raises_exception2(self):
no_response = mock.patch(
'cloudkitty.common.prometheus_client.PrometheusClient.get_instant',
return_value={
'data': {
'result': [{
'metric': {
'foo': 'bar'
}
}]
}
},
)
with no_response:
self.assertRaises(
prometheus.PrometheusFetcherError,
self.fetcher.get_tenants,
)
def test_get_tenants_raises_exception3(self):
invalid_response = mock.patch(
'cloudkitty.common.prometheus_client.PrometheusClient.get_instant',
side_effect=PrometheusResponseError,
)
with invalid_response:
self.assertRaises(
prometheus.PrometheusFetcherError,
self.fetcher.get_tenants,
)

View File

@ -0,0 +1,10 @@
---
features:
- |
A Prometheus scope fetcher has been added in order to dynamically discover
scopes from a Prometheus service using a user defined metric and a scope
attribute.
It can also filter out the response from Prometheus using metadata filters
to have a more fine-grained control over scope discovery.
It features HTTP basic auth capabilities and HTTPS configuration options
similar to Prometheus collector.

View File

@ -54,6 +54,7 @@ cloudkitty.fetchers =
keystone = cloudkitty.fetcher.keystone:KeystoneFetcher
source = cloudkitty.fetcher.source:SourceFetcher
gnocchi = cloudkitty.fetcher.gnocchi:GnocchiFetcher
prometheus = cloudkitty.fetcher.prometheus:PrometheusFetcher
cloudkitty.transformers =
CloudKittyFormatTransformer = cloudkitty.transformer.format:CloudKittyFormatTransformer