Add prometheus data source for watcher decision engine

This adds a new data source for the Watcher decision engine that
implements the watcher.decision_engine.datasources.DataSourceBase.

The related spec was merged at [1].

Implements: blueprint prometheus-datasource

[1] https://review.opendev.org/c/openstack/watcher-specs/+/933300

Change-Id: I6a70c4acc70a864c418cf347f5f6951cb92ec906
m 2024-11-01 10:48:36 +02:00
parent d6cb38289e
commit 3f26dc47f2
9 changed files with 1076 additions and 0 deletions
doc/source/datasources
releasenotes/notes
requirements.txt
watcher
common
conf
decision_engine/datasources
tests/decision_engine/datasources

@ -0,0 +1,130 @@
=====================
Prometheus datasource
=====================

Synopsis
--------

The Prometheus datasource allows Watcher to use a Prometheus server as the
source for the collected metrics used by the Watcher decision engine. At a
minimum, deployers must configure the ``host`` and ``port`` at which the
Prometheus server is listening.
Requirements
------------

Prometheus metrics must contain a label identifying the hostname of the
exporter from which each metric was collected. This is used to match
against the Watcher cluster model ``ComputeNode.hostname``. The default
for this label is ``fqdn``, and in the Prometheus scrape configs it would
look like this:

.. code-block::

    scrape_configs:
      - job_name: node
        static_configs:
          - targets: ['10.1.2.3:9100']
            labels:
              fqdn: "testbox.controlplane.domain"
This default can be overridden when a deployer uses a different label to
identify the exporter host (for example ``hostname`` or ``host``, or any
other label, as long as it identifies the host).

Internally this label is used to create a ``fqdn_instance_map``, mapping
each fqdn to the Prometheus instance label associated with that exporter.
The keys of the resulting ``fqdn_instance_map`` are expected to match the
``ComputeNode.hostname`` used in the Watcher decision engine cluster model.
An example ``fqdn_instance_map`` is the following:

.. code-block::

    {
        'ena.controlplane.domain': '10.1.2.1:9100',
        'dio.controlplane.domain': '10.1.2.2:9100',
        'tria.controlplane.domain': '10.1.2.3:9100'
    }
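The mapping logic can be sketched in a few lines. This is an illustrative reconstruction, not the code added by this change; the payload shape follows the example output of the Prometheus ``/api/v1/targets`` endpoint shown in this document.

```python
# Sketch: derive the fqdn -> instance map from active Prometheus targets.
# Targets lacking the configured fqdn label are skipped, as described above.
def build_fqdn_instance_map(active_targets, fqdn_label="fqdn"):
    fqdn_map = {}
    for target in active_targets:
        labels = target.get("labels", {})
        if labels.get(fqdn_label):
            fqdn_map[labels[fqdn_label]] = labels.get("instance")
    return fqdn_map


targets = [
    {"labels": {"fqdn": "ena.controlplane.domain",
                "instance": "10.1.2.1:9100", "job": "node"}},
    {"labels": {"fqdn": "dio.controlplane.domain",
                "instance": "10.1.2.2:9100", "job": "node"}},
    {"labels": {"instance": "10.1.2.9:9100", "job": "node"}},  # no fqdn label
]
print(build_fqdn_instance_map(targets))
```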
Limitations
-----------

The current implementation does not support the ``statistic_series``
function of the Watcher ``DataSourceBase`` class. The
``statistic_aggregation`` function (which is implemented) is expected to
be sufficient for providing the **current** state of the managed
resources in the cluster. The ``statistic_aggregation`` function defaults
to querying back 300 seconds, starting from the present time (the time
period is a function parameter and can be set to a value as required).
Implementing ``statistic_series`` can be revisited if the requisite
interest and work cycles are volunteered by the interested parties.

One further note about a limitation in the implemented
``statistic_aggregation`` function. This function is defined with a
``granularity`` parameter, to be used when querying any of the Watcher
``DataSourceBase`` metrics providers. In the case of Prometheus, we do
not fetch and then process individual metrics across the specified time
period. Instead we use the PromQL query operators and functions, so that
the server itself processes the request across the specified parameters
and returns the result. The ``granularity`` parameter is therefore
redundant and remains unused in the Prometheus implementation of
``statistic_aggregation``. The granularity of the data fetched by the
Prometheus server is specified in its configuration as the
``scrape_interval`` (current default 15 seconds).
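Because aggregation happens server-side, the data source only needs to build a PromQL string. A simplified sketch of that construction follows; the function name and signature here are illustrative, mirroring the queries quoted above and implemented later in this change.

```python
# Sketch: build the PromQL string for the two supported meters.
# The Prometheus server evaluates the whole expression, so no
# client-side 'granularity' parameter is needed.
def build_promql(aggregate, meter, instance, period):
    if meter == "node_cpu_seconds_total":
        # cpu usage = 100 minus the aggregated idle-time percentage
        return ("100 - (%s by (instance)(rate(%s{mode='idle',"
                "instance='%s'}[%ss])) * 100)"
                % (aggregate, meter, instance, period))
    if meter == "node_memory_MemAvailable_bytes":
        # used MiB = total minus aggregated available, converted from bytes
        return ("(node_memory_MemTotal_bytes{instance='%s'} - "
                "%s_over_time(%s{instance='%s'}[%ss])) / 1024 / 1024"
                % (instance, aggregate, meter, instance, period))
    raise ValueError("unsupported meter: %s" % meter)


print(build_promql("avg", "node_cpu_seconds_total", "10.1.2.3:9100", 300))
```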
Configuration
-------------

A deployer must set the ``datasources`` parameter to include ``prometheus``
under the ``[watcher_datasources]`` section of watcher.conf (or add
``prometheus`` to the datasources of a specific strategy if preferred,
e.g. under the ``[watcher_strategies.workload_stabilization]`` section).

The watcher.conf configuration file is also used to set the parameter
values required by the Watcher Prometheus data source. The configuration
is added under the ``[prometheus_client]`` section and the available
options are duplicated below from the code, as they are self documenting:

.. code-block::

    cfg.StrOpt('host',
               help="The hostname or IP address for the prometheus server."),
    cfg.StrOpt('port',
               help="The port number used by the prometheus server."),
    cfg.StrOpt('fqdn_label',
               default="fqdn",
               help="The label that Prometheus uses to store the fqdn of "
                    "exporters. Defaults to 'fqdn'."),
    cfg.StrOpt('username',
               help="The basic_auth username to use to authenticate with the "
                    "Prometheus server."),
    cfg.StrOpt('password',
               secret=True,
               help="The basic_auth password to use to authenticate with the "
                    "Prometheus server."),
    cfg.StrOpt('cafile',
               help="Path to the CA certificate for establishing a TLS "
                    "connection with the Prometheus server."),
    cfg.StrOpt('certfile',
               help="Path to the client certificate for establishing a TLS "
                    "connection with the Prometheus server."),
    cfg.StrOpt('keyfile',
               help="Path to the client key for establishing a TLS "
                    "connection with the Prometheus server."),
The ``host`` and ``port`` are **required** configuration options with no
set default. These specify the hostname (or IP) and port at which the
Prometheus server is listening. The ``fqdn_label`` option allows deployers
to override the required metric label used to match Prometheus node
exporters against the Watcher ComputeNodes in the Watcher decision engine
cluster data model. The default is ``fqdn`` and deployers can specify any
other value (e.g. if they have an equivalent but different label such as
``host``).

So a sample watcher.conf configured to use the Prometheus server at
``10.2.3.4:9090`` would look like the following:

.. code-block::

    [watcher_datasources]
    datasources = prometheus

    [prometheus_client]
    host = 10.2.3.4
    port = 9090
    fqdn_label = fqdn
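Watcher itself parses this file with oslo.config. Purely for illustration, the stdlib ``configparser`` can read the same INI shape, which shows where each value ends up:

```python
# Illustration only: parse the sample watcher.conf shape with the
# stdlib configparser (Watcher actually uses oslo.config for this).
import configparser

SAMPLE_CONF = """
[watcher_datasources]
datasources = prometheus

[prometheus_client]
host = 10.2.3.4
port = 9090
fqdn_label = fqdn
"""

conf = configparser.ConfigParser()
conf.read_string(SAMPLE_CONF)
print(conf["prometheus_client"]["host"], conf["prometheus_client"]["port"])
```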

@ -0,0 +1,8 @@
---
features:
- |
A new Prometheus data source is added. This allows the Watcher decision
engine to collect metrics from a Prometheus server. For more information
about the Prometheus data source, including limitations and configuration
options, see
https://docs.openstack.org/watcher/latest/datasources/prometheus.html

@ -36,6 +36,7 @@ python-keystoneclient>=3.15.0 # Apache-2.0
python-monascaclient>=1.12.0 # Apache-2.0
python-neutronclient>=6.7.0 # Apache-2.0
python-novaclient>=14.1.0 # Apache-2.0
python-observabilityclient>=0.3.0 # Apache-2.0
python-openstackclient>=3.14.0 # Apache-2.0
python-ironicclient>=2.5.0 # Apache-2.0
SQLAlchemy>=1.2.5 # MIT

@ -154,6 +154,10 @@ class InvalidParameter(Invalid):
msg_fmt = _("%(parameter)s has to be of type %(parameter_type)s")
class MissingParameter(Invalid):
msg_fmt = _("%(parameter)s is required but missing. Check watcher.conf")
class InvalidIdentity(Invalid):
msg_fmt = _("Expected a uuid or int but received %(identity)s")

@ -42,6 +42,7 @@ from watcher.conf import nova_client
from watcher.conf import paths
from watcher.conf import placement_client
from watcher.conf import planner
from watcher.conf import prometheus_client
from watcher.conf import service
CONF = cfg.CONF
@ -70,3 +71,4 @@ clients_auth.register_opts(CONF)
ironic_client.register_opts(CONF)
collector.register_opts(CONF)
placement_client.register_opts(CONF)
prometheus_client.register_opts(CONF)

@ -0,0 +1,58 @@
# Copyright 2024 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
from oslo_config import cfg
prometheus_client = cfg.OptGroup(name='prometheus_client',
title='Configuration Options for Prometheus',
help="See https://docs.openstack.org/watcher/"
"latest/datasources/prometheus.html for "
"details on how these options are used.")
PROMETHEUS_CLIENT_OPTS = [
cfg.StrOpt('host',
help="The hostname or IP address for the prometheus server."),
cfg.StrOpt('port',
help="The port number used by the prometheus server."),
cfg.StrOpt('fqdn_label',
default="fqdn",
help="The label that Prometheus uses to store the fqdn of "
"exporters. Defaults to 'fqdn'."),
cfg.StrOpt('username',
help="The basic_auth username to use to authenticate with the "
"Prometheus server."),
cfg.StrOpt('password',
secret=True,
help="The basic_auth password to use to authenticate with the "
"Prometheus server."),
cfg.StrOpt('cafile',
help="Path to the CA certificate for establishing a TLS "
"connection with the Prometheus server."),
cfg.StrOpt('certfile',
help="Path to the client certificate for establishing a TLS "
"connection with the Prometheus server."),
cfg.StrOpt('keyfile',
help="Path to the client key for establishing a TLS "
"connection with the Prometheus server."),
]
def register_opts(conf):
conf.register_group(prometheus_client)
conf.register_opts(PROMETHEUS_CLIENT_OPTS, group=prometheus_client)
def list_opts():
return [(prometheus_client, PROMETHEUS_CLIENT_OPTS)]

@ -25,6 +25,7 @@ from watcher.decision_engine.datasources import ceilometer as ceil
from watcher.decision_engine.datasources import gnocchi as gnoc
from watcher.decision_engine.datasources import grafana as graf
from watcher.decision_engine.datasources import monasca as mon
from watcher.decision_engine.datasources import prometheus as prom
LOG = log.getLogger(__name__)
@ -36,6 +37,7 @@ class DataSourceManager(object):
(ceil.CeilometerHelper.NAME, ceil.CeilometerHelper.METRIC_MAP),
(mon.MonascaHelper.NAME, mon.MonascaHelper.METRIC_MAP),
(graf.GrafanaHelper.NAME, graf.GrafanaHelper.METRIC_MAP),
(prom.PrometheusHelper.NAME, prom.PrometheusHelper.METRIC_MAP),
])
"""Dictionary with all possible datasources, dictionary order is
the default order for attempting to use datasources
@ -48,6 +50,7 @@ class DataSourceManager(object):
self._monasca = None
self._gnocchi = None
self._grafana = None
self._prometheus = None
# Dynamically update grafana metric map, only available at runtime
# The metric map can still be overridden by a yaml config file
@ -104,6 +107,16 @@ class DataSourceManager(object):
def grafana(self, grafana):
self._grafana = grafana
@property
def prometheus(self):
if self._prometheus is None:
self._prometheus = prom.PrometheusHelper()
return self._prometheus
@prometheus.setter
def prometheus(self, prometheus):
self._prometheus = prometheus
def get_backend(self, metrics):
"""Determine the datasource to use from the configuration

@ -0,0 +1,442 @@
# Copyright 2024 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
from observabilityclient import prometheus_client
from oslo_config import cfg
from oslo_log import log
import re
from watcher._i18n import _
from watcher.common import exception
from watcher.decision_engine.datasources import base
CONF = cfg.CONF
LOG = log.getLogger(__name__)
class PrometheusHelper(base.DataSourceBase):
"""PrometheusHelper class for retrieving metrics from Prometheus server
This class implements the DataSourceBase to allow Watcher to query
Prometheus as a data source for metrics.
"""
NAME = 'prometheus'
METRIC_MAP = dict(host_cpu_usage='node_cpu_seconds_total',
host_ram_usage='node_memory_MemAvailable_bytes',
host_outlet_temp=None,
host_inlet_temp=None,
host_airflow=None,
host_power=None,
instance_cpu_usage=None,
instance_ram_usage=None,
instance_ram_allocated=None,
instance_l3_cache_usage=None,
instance_root_disk_size=None,
)
AGGREGATES_MAP = dict(mean='avg', max='max', min='min', count='avg')
def __init__(self):
"""Initialise the PrometheusHelper
The prometheus helper uses the PrometheusAPIClient provided by
python-observabilityclient.
The prometheus_fqdn_instance_map maps the fqdn of each node to the
Prometheus instance label added to all metrics on that node. When
making queries to Prometheus we use the instance label to specify
the node for which metrics are to be retrieved.
The host, port and fqdn_label come from the prometheus_client
config. The prometheus_fqdn_label allows override of the required label
in Prometheus scrape configs that specifies each target's fqdn.
"""
self.prometheus = self._setup_prometheus_client()
self.prometheus_fqdn_label = (
CONF.prometheus_client.fqdn_label
)
self.prometheus_fqdn_instance_map = (
self._build_prometheus_fqdn_instance_map()
)
self.prometheus_host_instance_map = (
self._build_prometheus_host_instance_map()
)
def _setup_prometheus_client(self):
"""Initialise the prometheus client with config options
Use the prometheus_client options in watcher.conf to setup
the PrometheusAPIClient client object and return it.
:raises watcher.common.exception.MissingParameter if
prometheus host or port is not set in the watcher.conf
under the [prometheus_client] section.
:raises watcher.common.exception.InvalidParameter if
the prometheus host or port have invalid format.
"""
def _validate_host_port(host, port):
if len(host) > 255:
return (False, "hostname is too long: '%s'" % host)
if host[-1] == '.':
host = host[:-1]
legal_hostname = re.compile(
"(?!-)[a-z0-9-]{1,63}(?<!-)$", re.IGNORECASE)
if not all(legal_hostname.match(host_part)
for host_part in host.split(".")):
return (False, "hostname '%s' failed regex match " % host)
try:
assert bool(1 <= int(port) <= 65535)
except (AssertionError, ValueError):
return (False, "missing or invalid port number '%s' "
% port)
return (True, "all good")
_host = CONF.prometheus_client.host
_port = CONF.prometheus_client.port
if (not _host or not _port):
raise exception.MissingParameter(
message=(_(
"prometheus host and port must be set in watcher.conf "
"under the [prometheus_client] section. Can't initialise "
"the datasource without valid host and port."))
)
validated, reason = _validate_host_port(_host, _port)
if (not validated):
raise exception.InvalidParameter(
message=(_(
"A valid prometheus host and port are required. The "
"values found in watcher.conf are '%(host)s' '%(port)s'. "
"This fails validation for the following reason: "
"%(reason)s.")
% {'host': _host, 'port': _port, 'reason': reason})
)
the_client = prometheus_client.PrometheusAPIClient(
"%s:%s" % (_host, _port))
# check if tls options or basic_auth options are set and use them
prometheus_user = CONF.prometheus_client.username
prometheus_pass = CONF.prometheus_client.password
prometheus_ca_cert = CONF.prometheus_client.cafile
prometheus_client_cert = CONF.prometheus_client.certfile
prometheus_client_key = CONF.prometheus_client.keyfile
if (prometheus_ca_cert):
the_client.set_ca_cert(prometheus_ca_cert)
if (prometheus_client_cert and prometheus_client_key):
the_client.set_client_cert(
prometheus_client_cert, prometheus_client_key)
if (prometheus_user and prometheus_pass):
the_client.set_basic_auth(prometheus_user, prometheus_pass)
return the_client
def _build_prometheus_fqdn_instance_map(self):
"""Build the fqdn<-->instance_label mapping needed for queries
Watcher knows nodes by their hostname. In Prometheus, however, the
scrape targets (also known as 'instances') are specified by IP address
(or hostname) and port number. This function creates a mapping between
the fully qualified domain name of each node and the corresponding
instance label used in the scrape config. This relies on a custom
'fqdn' label added to Prometheus scrape_configs. Operators can use
a different custom label instead by setting the prometheus_fqdn_label
config option under the prometheus_client section of watcher config.
The built prometheus_fqdn_instance_map is used to match watcher
node.hostname if watcher stores fqdn and otherwise the
host_instance_map is used instead.
:return a dict mapping fqdn to instance label. For example:
{'marios-env-again.controlplane.domain': '10.1.2.3:9100'}
"""
prometheus_targets = self.prometheus._get(
"targets?state=active")['data']['activeTargets']
# >>> prometheus_targets[0]['labels']
# {'fqdn': 'marios-env-again.controlplane.domain',
# 'instance': 'localhost:9100', 'job': 'node'}
fqdn_instance_map = {
fqdn: instance for (fqdn, instance) in (
(target['labels'].get(self.prometheus_fqdn_label),
target['labels'].get('instance'))
for target in prometheus_targets
if target.get('labels', {}).get(self.prometheus_fqdn_label)
)
}
if not fqdn_instance_map:
LOG.error(
"Could not create fqdn instance map from Prometheus "
"targets config. Prometheus returned the following: %s",
prometheus_targets
)
return {}
return fqdn_instance_map
def _build_prometheus_host_instance_map(self):
"""Build the hostname<-->instance_label mapping needed for queries
The prometheus_fqdn_instance_map has the fully qualified domain name
for hosts. This will create a duplicate map containing only the host
name part. Depending on the watcher node.hostname either the
fqdn_instance_map or the host_instance_map will be used to resolve
the correct prometheus instance label for queries. In the event the
fqdn_instance_map keys are not valid fqdn (for example it contains
hostnames, not fqdn) the host_instance_map cannot be created and
an empty dictionary is returned with a warning logged.
:return a dict mapping hostname to instance label. For example:
{'marios-env-again': 'localhost:9100'}
"""
if not self.prometheus_fqdn_instance_map:
LOG.error("Cannot build host_instance_map without "
"fqdn_instance_map")
return {}
host_instance_map = {
host: instance for (host, instance) in (
(fqdn.split('.')[0], inst)
for fqdn, inst in self.prometheus_fqdn_instance_map.items()
if '.' in fqdn
)
}
if not host_instance_map:
LOG.warning("Creating empty host instance map. Are the keys "
"in prometheus_fqdn_instance_map valid fqdn?")
return {}
return host_instance_map
def _resolve_prometheus_instance_label(self, node_name):
"""Resolve the prometheus instance label to use in queries
Given the watcher node.hostname, resolve the prometheus instance
label for use in queries, first trying the fqdn_instance_map and
then the host_instance_map (watcher.node_name can be fqdn or hostname).
If the name is not resolved after the first attempt, rebuild the fqdn
and host instance maps and try again. This allows for new hosts added
after the initialisation of the fqdn_instance_map.
:param node_name: the watcher node.hostname
:return String for the prometheus instance label and None if not found
"""
def _query_maps(node):
return self.prometheus_fqdn_instance_map.get(
node, self.prometheus_host_instance_map.get(node, None))
instance_label = _query_maps(node_name)
# refresh the fqdn and host instance maps and retry
if not instance_label:
self.prometheus_fqdn_instance_map = (
self._build_prometheus_fqdn_instance_map()
)
self.prometheus_host_instance_map = (
self._build_prometheus_host_instance_map()
)
instance_label = _query_maps(node_name)
if not instance_label:
LOG.error("Cannot query prometheus without instance label. "
"Could not resolve %s", node_name)
return None
return instance_label
def _resolve_prometheus_aggregate(self, watcher_aggregate, meter):
"""Resolve the prometheus aggregate using self.AGGREGATES_MAP
This uses the AGGREGATES_MAP to resolve the correct prometheus
aggregate to use in queries, from the given watcher aggregate
"""
if watcher_aggregate == 'count':
LOG.warning('Prometheus data source does not currently support '
'the count aggregate. Proceeding with mean (avg).')
promql_aggregate = self.AGGREGATES_MAP.get(watcher_aggregate)
if not promql_aggregate:
raise exception.InvalidParameter(
message=(_("Unknown Watcher aggregate %s. This does not "
"resolve to any valid prometheus query aggregate.")
% watcher_aggregate)
)
return promql_aggregate
def _build_prometheus_query(self, aggregate, meter, instance_label,
period):
"""Build and return the prometheus query string with the given args
This function builds and returns the string query that will be sent
to the Prometheus server /query endpoint. For host cpu usage we use:
100 - (avg by (instance)(rate(node_cpu_seconds_total{mode='idle',
instance='some_host'}[300s])) * 100)
so using prometheus rate function over the specified period, we average
per instance (all cpus) idle time and then 'everything else' is cpu
usage time.
For host memory usage we use:
(node_memory_MemTotal_bytes{instance='the_host'} -
avg_over_time(
node_memory_MemAvailable_bytes{instance='the_host'}[300s]))
/ 1024 / 1024
So we take total and subtract available memory to determine
how much is in use. We use the prometheus xxx_over_time functions
avg/max/min depending on the aggregate with the specified time period.
:param aggregate: one of the values of self.AGGREGATES_MAP
:param meter: the name of the Prometheus meter to use
:param instance_label: the Prometheus instance label (scrape target).
:param period: the period in seconds for which to query
:return: a String containing the Prometheus query
:raises watcher.common.exception.InvalidParameter if params are None
:raises watcher.common.exception.InvalidParameter if meter is not
known or currently supported (prometheus meter name).
"""
query_args = None
if (meter is None or aggregate is None or instance_label is None or
period is None):
raise exception.InvalidParameter(
message=(_(
"Cannot build prometheus query without args. "
"You provided: meter %(mtr)s, aggregate %(agg)s, "
"instance_label %(inst)s, period %(prd)s")
% {'mtr': meter, 'agg': aggregate,
'inst': instance_label, 'prd': period})
)
if meter == 'node_cpu_seconds_total':
query_args = (
"100 - (%s by (instance)(rate(%s"
"{mode='idle',instance='%s'}[%ss])) * 100)" %
(aggregate, meter, instance_label, period)
)
elif meter == 'node_memory_MemAvailable_bytes':
query_args = (
"(node_memory_MemTotal_bytes{instance='%s'} "
"- %s_over_time(%s{instance='%s'}[%ss])) "
"/ 1024 / 1024" %
(instance_label, aggregate, meter,
instance_label, period)
)
else:
raise exception.InvalidParameter(
message=(_("Cannot process prometheus meter %s") % meter)
)
return query_args
def check_availability(self):
"""check if Prometheus server is available for queries
Performs HTTP get on the prometheus API /status/runtimeinfo endpoint.
The prometheus_client will raise a PrometheusAPIClientError if the
call is unsuccessful, which is caught here and a warning logged.
"""
try:
self.prometheus._get("status/runtimeinfo")
except prometheus_client.PrometheusAPIClientError:
LOG.warning(
"check_availability raised PrometheusAPIClientError. "
"Is Prometheus server down?"
)
return 'not available'
return 'available'
def list_metrics(self):
"""Fetch all prometheus metrics from api/v1/label/__name__/values
The prometheus_client will raise a PrometheusAPIClientError if the
call is unsuccessful, which is caught here and a warning logged.
"""
try:
response = self.prometheus._get("label/__name__/values")
except prometheus_client.PrometheusAPIClientError:
LOG.warning(
"list_metrics raised PrometheusAPIClientError. Is Prometheus "
"server down?"
)
return set()
return set(response['data'])
def statistic_aggregation(self, resource=None, resource_type=None,
meter_name=None, period=300, aggregate='mean',
granularity=300):
meter = self._get_meter(meter_name)
query_args = ''
instance_label = ''
if resource_type == 'compute_node':
instance_label = self._resolve_prometheus_instance_label(
resource.hostname)
else:
LOG.warning(
"Prometheus data source does not currently support "
"resource_type %s", resource_type
)
return None
promql_aggregate = self._resolve_prometheus_aggregate(aggregate, meter)
query_args = self._build_prometheus_query(
promql_aggregate, meter, instance_label, period
)
if not query_args:
LOG.error("Cannot proceed without valid prometheus query")
return None
result = self.query_retry(
self.prometheus.query, query_args,
ignored_exc=prometheus_client.PrometheusAPIClientError,
)
return float(result[0].value) if result else None
def statistic_series(self, resource=None, resource_type=None,
meter_name=None, start_time=None, end_time=None,
granularity=300):
raise NotImplementedError(
_('Prometheus helper currently does not support statistic_series. '
'This can be considered for future enhancement.'))
def _invert_max_min_aggregate(self, agg):
"""Invert max and min for node/host metric queries from node-exporter
because we query for 'idle'/'unused' cpu and memory.
For Watcher 'max cpu used' we query for prometheus 'min idle time'.
For Watcher 'max memory used' we retrieve min 'unused'/'available'
memory from Prometheus. This internal function is used exclusively
by get_host_cpu_usage and get_host_ram_usage.
:param agg: the metric collection aggregate
:return: a String aggregate
"""
if agg == 'max':
return 'min'
elif agg == 'min':
return 'max'
return agg
def get_host_cpu_usage(self, resource, period=300,
aggregate="mean", granularity=None):
"""Query prometheus for node_cpu_seconds_total
This calculates the host cpu usage and returns it as a percentage
The calculation is made by using the cpu 'idle' time, per
instance (so all CPUs are included). For example the query looks like
(100 - (avg by (instance)(rate(node_cpu_seconds_total
{mode='idle',instance='localhost:9100'}[300s])) * 100))
"""
aggregate = self._invert_max_min_aggregate(aggregate)
cpu_usage = self.statistic_aggregation(
resource, 'compute_node',
'host_cpu_usage', period=period,
granularity=granularity, aggregate=aggregate)
return float(cpu_usage) if cpu_usage else None
def get_host_ram_usage(self, resource, period=300,
aggregate="mean", granularity=None):
aggregate = self._invert_max_min_aggregate(aggregate)
ram_usage = self.statistic_aggregation(
resource, 'compute_node',
'host_ram_usage', period=period,
granularity=granularity, aggregate=aggregate)
return float(ram_usage) if ram_usage else None

@ -0,0 +1,418 @@
# Copyright 2024 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
from unittest import mock
from observabilityclient import prometheus_client
from oslo_config import cfg
from watcher.common import exception
from watcher.decision_engine.datasources import prometheus as prometheus_helper
from watcher.tests import base
class TestPrometheusHelper(base.BaseTestCase):
def setUp(self):
super(TestPrometheusHelper, self).setUp()
with mock.patch.object(
prometheus_client.PrometheusAPIClient, '_get',
return_value={'data': {'activeTargets': [
{'labels': {
'fqdn': 'marios-env.controlplane.domain',
'instance': '10.0.1.2:9100', 'job': 'node',
}},
{'labels': {
'fqdn': 'marios-env-again.controlplane.domain',
'instance': 'localhost:9100', 'job': 'node'
}}
]}}):
cfg.CONF.prometheus_client.host = "foobarbaz"
cfg.CONF.prometheus_client.port = "1234"
self.helper = prometheus_helper.PrometheusHelper()
stat_agg_patcher = mock.patch.object(
self.helper, 'statistic_aggregation',
spec=prometheus_helper.PrometheusHelper.statistic_aggregation)
self.mock_aggregation = stat_agg_patcher.start()
self.addCleanup(stat_agg_patcher.stop)
def test_unset_missing_prometheus_host(self):
cfg.CONF.prometheus_client.port = '123'
cfg.CONF.prometheus_client.host = None
self.assertRaisesRegex(
exception.MissingParameter, 'prometheus host and port must be '
'set in watcher.conf',
prometheus_helper.PrometheusHelper
)
cfg.CONF.prometheus_client.host = ''
self.assertRaisesRegex(
exception.MissingParameter, 'prometheus host and port must be '
'set in watcher.conf',
prometheus_helper.PrometheusHelper
)
def test_unset_missing_prometheus_port(self):
cfg.CONF.prometheus_client.host = 'some.host.domain'
cfg.CONF.prometheus_client.port = None
self.assertRaisesRegex(
exception.MissingParameter, 'prometheus host and port must be '
'set in watcher.conf',
prometheus_helper.PrometheusHelper
)
cfg.CONF.prometheus_client.port = ''
self.assertRaisesRegex(
exception.MissingParameter, 'prometheus host and port must be '
'set in watcher.conf',
prometheus_helper.PrometheusHelper
)
def test_invalid_prometheus_port(self):
cfg.CONF.prometheus_client.host = "hostOK"
cfg.CONF.prometheus_client.port = "123badPort"
self.assertRaisesRegex(
exception.InvalidParameter, "missing or invalid port number "
"'123badPort'",
prometheus_helper.PrometheusHelper
)
cfg.CONF.prometheus_client.port = "123456"
self.assertRaisesRegex(
exception.InvalidParameter, "missing or invalid port number "
"'123456'",
prometheus_helper.PrometheusHelper
)
def test_invalid_prometheus_host(self):
cfg.CONF.prometheus_client.port = "123"
cfg.CONF.prometheus_client.host = "-badhost"
self.assertRaisesRegex(
exception.InvalidParameter, "hostname '-badhost' "
"failed regex match",
prometheus_helper.PrometheusHelper
)
too_long_hostname = ("a" * 256)
cfg.CONF.prometheus_client.host = too_long_hostname
self.assertRaisesRegex(
exception.InvalidParameter, ("hostname is too long: " +
"'" + too_long_hostname + "'"),
prometheus_helper.PrometheusHelper
)
@mock.patch.object(prometheus_client.PrometheusAPIClient, 'query')
@mock.patch.object(prometheus_client.PrometheusAPIClient, '_get')
def test_prometheus_statistic_aggregation(self, mock_prometheus_get,
mock_prometheus_query):
mock_node = mock.Mock(
uuid='1234',
hostname='marios-env.controlplane.domain')
expected_cpu_usage = 3.2706140350701673
mock_prom_metric = mock.Mock(
labels={'instance': '10.0.1.2:9100'},
timestamp=1731065985.408,
value=expected_cpu_usage
)
mock_prometheus_query.return_value = [mock_prom_metric]
mock_prometheus_get.return_value = {'data': {'activeTargets': [
{'labels': {
'fqdn': 'marios-env.controlplane.domain',
'instance': '10.0.1.2:9100', 'job': 'node',
}}]}}
helper = prometheus_helper.PrometheusHelper()
result = helper.statistic_aggregation(
resource=mock_node,
resource_type='compute_node',
meter_name='host_cpu_usage',
period=300,
aggregate='mean',
granularity=300,
)
self.assertEqual(expected_cpu_usage, result)
mock_prometheus_query.assert_called_once_with(
"100 - (avg by (instance)(rate(node_cpu_seconds_total"
"{mode='idle',instance='10.0.1.2:9100'}[300s])) * 100)")
def test_statistic_series_not_implemented(self):
self.assertRaisesRegex(
NotImplementedError, 'does not support statistic_series',
self.helper.statistic_series
)
@mock.patch.object(prometheus_client.PrometheusAPIClient, '_get')
def test_prometheus_list_metrics(self, mock_prometheus_get):
expected_metrics = set(
['go_gc_duration_seconds', 'go_gc_duration_seconds_count',
'go_gc_duration_seconds_sum', 'go_goroutines',]
)
mock_prometheus_get.return_value = {
'status': 'success', 'data': [
'go_gc_duration_seconds', 'go_gc_duration_seconds_count',
'go_gc_duration_seconds_sum', 'go_goroutines',
]
}
result = self.helper.list_metrics()
self.assertEqual(expected_metrics, result)
@mock.patch.object(prometheus_client.PrometheusAPIClient, '_get')
def test_prometheus_list_metrics_error(self, mock_prometheus_get):
mock_prometheus_get.side_effect = (
prometheus_client.PrometheusAPIClientError("nope"))
result = self.helper.list_metrics()
self.assertEqual(set(), result)
@mock.patch.object(prometheus_client.PrometheusAPIClient, '_get')
def test_prometheus_check_availability(self, mock_prometheus_get):
mock_prometheus_get.return_value = {
'status': 'success',
'data': {
'startTime': '2024-11-05T12:59:56.962333207Z',
'CWD': '/prometheus', 'reloadConfigSuccess': True,
'lastConfigTime': '2024-11-05T12:59:56Z',
'corruptionCount': 0, 'goroutineCount': 30,
'GOMAXPROCS': 8, 'GOMEMLIMIT': 9223372036854775807,
'GOGC': '75', 'GODEBUG': '', 'storageRetention': '15d'
}
}
result = self.helper.check_availability()
self.assertEqual('available', result)

    @mock.patch.object(prometheus_client.PrometheusAPIClient, '_get')
def test_prometheus_check_availability_error(self, mock_prometheus_get):
mock_prometheus_get.side_effect = (
prometheus_client.PrometheusAPIClientError("nope"))
result = self.helper.check_availability()
self.assertEqual('not available', result)

    def test_get_host_cpu_usage(self):
cpu_use = self.helper.get_host_cpu_usage('someNode', 345, 'mean', 300)
self.assertIsInstance(cpu_use, float)
self.mock_aggregation.assert_called_once_with(
'someNode', 'compute_node', 'host_cpu_usage', period=345,
granularity=300, aggregate='mean')

    def test_get_host_cpu_usage_none(self):
self.mock_aggregation.return_value = None
cpu_use = self.helper.get_host_cpu_usage('someNode', 345, 'mean', 300)
self.assertIsNone(cpu_use)

    # A requested 'max' usage resolves to the 'min' aggregate (and vice
    # versa) because the underlying query is computed from idle CPU time.
    def test_get_host_cpu_usage_max(self):
cpu_use = self.helper.get_host_cpu_usage('theNode', 223, 'max', 100)
self.assertIsInstance(cpu_use, float)
self.mock_aggregation.assert_called_once_with(
'theNode', 'compute_node', 'host_cpu_usage', period=223,
granularity=100, aggregate='min')

    def test_get_host_cpu_usage_min(self):
cpu_use = self.helper.get_host_cpu_usage('theNode', 223, 'min', 100)
self.assertIsInstance(cpu_use, float)
self.mock_aggregation.assert_called_once_with(
'theNode', 'compute_node', 'host_cpu_usage', period=223,
granularity=100, aggregate='max')

    def test_get_host_ram_usage(self):
ram_use = self.helper.get_host_ram_usage(
'anotherNode', 456, 'mean', 300)
self.assertIsInstance(ram_use, float)
self.mock_aggregation.assert_called_once_with(
'anotherNode', 'compute_node', 'host_ram_usage', period=456,
granularity=300, aggregate='mean')

    def test_get_host_ram_usage_none(self):
        self.mock_aggregation.return_value = None
        ram_use = self.helper.get_host_ram_usage('NOPE', 234, 'mean', 567)
        self.assertIsNone(ram_use)
        self.mock_aggregation.assert_called_once_with(
            'NOPE', 'compute_node', 'host_ram_usage', period=234,
            granularity=567, aggregate='mean')

    # As with cpu usage, 'max'/'min' ram usage swap the aggregate because
    # the underlying query is computed from available (free) memory.
    def test_get_host_ram_usage_max(self):
ram_use = self.helper.get_host_ram_usage(
'aNode', 456, 'max', 300)
self.assertIsInstance(ram_use, float)
self.mock_aggregation.assert_called_once_with(
'aNode', 'compute_node', 'host_ram_usage', period=456,
granularity=300, aggregate='min')

    def test_get_host_ram_usage_min(self):
ram_use = self.helper.get_host_ram_usage(
'aNode', 456, 'min', 300)
self.assertIsInstance(ram_use, float)
self.mock_aggregation.assert_called_once_with(
'aNode', 'compute_node', 'host_ram_usage', period=456,
granularity=300, aggregate='max')

    @mock.patch.object(prometheus_client.PrometheusAPIClient, '_get')
def test_build_prometheus_fqdn_host_instance_map(
self, mock_prometheus_get):
mock_prometheus_get.return_value = {'data': {'activeTargets': [
{'labels': {
'fqdn': 'foo.controlplane.domain',
'instance': '10.1.2.1:9100', 'job': 'node',
}},
{'labels': {
'fqdn': 'bar.controlplane.domain',
'instance': '10.1.2.2:9100', 'job': 'node',
}},
{'labels': {
'fqdn': 'baz.controlplane.domain',
'instance': '10.1.2.3:9100', 'job': 'node',
}},
]}}
expected_fqdn_map = {'foo.controlplane.domain': '10.1.2.1:9100',
'bar.controlplane.domain': '10.1.2.2:9100',
'baz.controlplane.domain': '10.1.2.3:9100'}
expected_host_map = {'foo': '10.1.2.1:9100',
'bar': '10.1.2.2:9100',
'baz': '10.1.2.3:9100'}
helper = prometheus_helper.PrometheusHelper()
self.assertEqual(helper.prometheus_fqdn_instance_map,
expected_fqdn_map)
self.assertEqual(helper.prometheus_host_instance_map,
expected_host_map)

    @mock.patch.object(prometheus_client.PrometheusAPIClient, '_get')
def test_missing_prometheus_fqdn_label(self, mock_prometheus_get):
mock_prometheus_get.return_value = {'data': {'activeTargets': [
{'labels': {
'instance': '10.1.2.1:9100', 'job': 'node',
}},
{'labels': {
'instance': '10.1.2.2:9100', 'job': 'node',
}},
]}}
helper = prometheus_helper.PrometheusHelper()
self.assertEqual({}, helper.prometheus_fqdn_instance_map)
self.assertEqual({}, helper.prometheus_host_instance_map)

    @mock.patch.object(prometheus_client.PrometheusAPIClient, '_get')
def test_using_hostnames_not_fqdn(self, mock_prometheus_get):
mock_prometheus_get.return_value = {'data': {'activeTargets': [
{'labels': {
'fqdn': 'ena',
'instance': '10.1.2.1:9100', 'job': 'node',
}},
{'labels': {
'fqdn': 'dyo',
'instance': '10.1.2.2:9100', 'job': 'node',
}},
]}}
helper = prometheus_helper.PrometheusHelper()
expected_fqdn_map = {'ena': '10.1.2.1:9100',
'dyo': '10.1.2.2:9100'}
self.assertEqual(
helper.prometheus_fqdn_instance_map, expected_fqdn_map)
self.assertEqual({}, helper.prometheus_host_instance_map)

    @mock.patch.object(prometheus_client.PrometheusAPIClient, '_get')
def test_override_prometheus_fqdn_label(self, mock_prometheus_get):
mock_prometheus_get.return_value = {'data': {'activeTargets': [
{'labels': {
'custom_fqdn_label': 'foo.controlplane.domain',
'instance': '10.1.2.1:9100', 'job': 'node',
}},
{'labels': {
'custom_fqdn_label': 'bar.controlplane.domain',
'instance': '10.1.2.2:9100', 'job': 'node',
}},
]}}
expected_fqdn_map = {'foo.controlplane.domain': '10.1.2.1:9100',
'bar.controlplane.domain': '10.1.2.2:9100'}
expected_host_map = {'foo': '10.1.2.1:9100',
'bar': '10.1.2.2:9100'}
cfg.CONF.prometheus_client.fqdn_label = 'custom_fqdn_label'
helper = prometheus_helper.PrometheusHelper()
self.assertEqual(helper.prometheus_fqdn_instance_map,
expected_fqdn_map)
self.assertEqual(helper.prometheus_host_instance_map,
expected_host_map)

    def test_resolve_prometheus_instance_label(self):
expected_instance_label = '10.0.1.2:9100'
result = self.helper._resolve_prometheus_instance_label(
'marios-env.controlplane.domain')
self.assertEqual(result, expected_instance_label)
result = self.helper._resolve_prometheus_instance_label(
'marios-env')
self.assertEqual(result, expected_instance_label)

    @mock.patch.object(prometheus_client.PrometheusAPIClient, '_get')
def test_resolve_prometheus_instance_label_none(self,
mock_prometheus_get):
mock_prometheus_get.return_value = {'data': {'activeTargets': []}}
result = self.helper._resolve_prometheus_instance_label('nope')
self.assertIsNone(result)
mock_prometheus_get.assert_called_once_with("targets?state=active")

    def test_build_prometheus_query_node_cpu_avg_agg(self):
expected_query = (
"100 - (avg by (instance)(rate(node_cpu_seconds_total"
"{mode='idle',instance='a_host'}[111s])) * 100)")
result = self.helper._build_prometheus_query(
'avg', 'node_cpu_seconds_total', 'a_host', '111')
self.assertEqual(result, expected_query)

    def test_build_prometheus_query_node_cpu_max_agg(self):
expected_query = (
"100 - (max by (instance)(rate(node_cpu_seconds_total"
"{mode='idle',instance='b_host'}[444s])) * 100)")
result = self.helper._build_prometheus_query(
'max', 'node_cpu_seconds_total', 'b_host', '444')
self.assertEqual(result, expected_query)

    def test_build_prometheus_query_node_memory_avg_agg(self):
expected_query = (
"(node_memory_MemTotal_bytes{instance='c_host'} - avg_over_time"
"(node_memory_MemAvailable_bytes{instance='c_host'}[555s])) "
"/ 1024 / 1024")
result = self.helper._build_prometheus_query(
'avg', 'node_memory_MemAvailable_bytes', 'c_host', '555')
self.assertEqual(result, expected_query)

    def test_build_prometheus_query_node_memory_min_agg(self):
expected_query = (
"(node_memory_MemTotal_bytes{instance='d_host'} - min_over_time"
"(node_memory_MemAvailable_bytes{instance='d_host'}[222s])) "
"/ 1024 / 1024")
result = self.helper._build_prometheus_query(
'min', 'node_memory_MemAvailable_bytes', 'd_host', '222')
self.assertEqual(result, expected_query)

    def test_build_prometheus_query_error(self):
self.assertRaisesRegex(
exception.InvalidParameter, 'Cannot process prometheus meter NOPE',
self.helper._build_prometheus_query,
'min', 'NOPE', 'the_host', '222'
)
self.assertRaisesRegex(
exception.InvalidParameter, 'instance_label None, period 333',
self.helper._build_prometheus_query,
'min', 'node_cpu_seconds_total', None, '333'
)

    def test_resolve_prometheus_aggregate_vanilla(self):
result = self.helper._resolve_prometheus_aggregate('mean', 'foo')
self.assertEqual(result, 'avg')
result = self.helper._resolve_prometheus_aggregate('count', 'foo')
self.assertEqual(result, 'avg')
result = self.helper._resolve_prometheus_aggregate('max', 'foometric')
self.assertEqual(result, 'max')
result = self.helper._resolve_prometheus_aggregate('min', 'barmetric')
self.assertEqual(result, 'min')

    def test_resolve_prometheus_aggregate_unknown(self):
self.assertRaisesRegex(
exception.InvalidParameter, 'Unknown Watcher aggregate NOPE.',
self.helper._resolve_prometheus_aggregate, 'NOPE', 'some_meter')