Merge "Export Prometheus metrics"

This commit is contained in:
Zuul 2021-10-05 17:11:35 +00:00 committed by Gerrit Code Review
commit dfe6daf2e6
11 changed files with 419 additions and 9 deletions

View File

@ -40,6 +40,7 @@ from kuryr_kubernetes import clients
from kuryr_kubernetes.cni import handlers as h_cni
from kuryr_kubernetes.cni import health
from kuryr_kubernetes.cni.plugins import k8s_cni_registry
from kuryr_kubernetes.cni import prometheus_exporter
from kuryr_kubernetes.cni import utils as cni_utils
from kuryr_kubernetes import config
from kuryr_kubernetes import constants as k_const
@ -57,10 +58,11 @@ ErrInternal = 999
class DaemonServer(object):
def __init__(self, plugin, healthy):
def __init__(self, plugin, healthy, metrics):
self.ctx = None
self.plugin = plugin
self.healthy = healthy
self.metrics = metrics
self.failure_count = multiprocessing.Value('i', 0)
self.application = flask.Flask('kuryr-daemon')
self.application.add_url_rule(
@ -86,6 +88,22 @@ class DaemonServer(object):
data = jsonutils.dumps(template)
return data
def _update_metrics(self, command, error, duration):
    """Publish one CNI request measurement into the shared metrics dict.

    The entry is keyed by ``export-<namespace>/<pod name>``, both taken
    from the CNI arguments of the current request, and it is written while
    holding the lock of that same name. When the request parameters cannot
    be read the failure is logged and nothing is stored.

    :param command: value stored under the ``command`` label.
    :param error: value stored under the ``error`` label.
    :param duration: value stored as the ``duration`` of the entry.
    """
    try:
        request = self._prepare_request()
    except Exception:
        LOG.exception('Exception when reading CNI params.')
        return

    cni_args = request.args
    namespace = cni_args.K8S_POD_NAMESPACE
    pod_name = cni_args.K8S_POD_NAME
    metric_key = f'export-{namespace}/{pod_name}'
    entry = {
        'labels': {'command': command, 'error': error},
        'duration': duration,
    }
    with lockutils.lock(metric_key):
        self.metrics[metric_key] = entry
@cni_utils.measure_time('ADD')
def add(self):
try:
params = self._prepare_request()
@ -138,6 +156,7 @@ class DaemonServer(object):
return data, httplib.ACCEPTED, self.headers
@cni_utils.measure_time('DEL')
def delete(self):
try:
params = self._prepare_request()
@ -215,13 +234,14 @@ class DaemonServer(object):
class CNIDaemonServerService(cotyledon.Service):
name = "server"
def __init__(self, worker_id, registry, healthy):
def __init__(self, worker_id, registry, healthy, metrics):
super(CNIDaemonServerService, self).__init__(worker_id)
self.registry = registry
self.healthy = healthy
self.plugin = k8s_cni_registry.K8sCNIRegistryPlugin(registry,
self.healthy)
self.server = DaemonServer(self.plugin, self.healthy)
self.metrics = metrics
self.server = DaemonServer(self.plugin, self.healthy, self.metrics)
def run(self):
# NOTE(dulek): We might do a *lot* of pyroute2 operations, let's
@ -340,6 +360,37 @@ class CNIDaemonHealthServerService(cotyledon.Service):
self.health_server.run()
class CNIDaemonExporterService(cotyledon.Service):
    """Cotyledon service exposing CNI request metrics to Prometheus.

    A background thread drains the shared ``metrics`` mapping and feeds
    every entry to the exporter, while ``run`` serves the exporter's HTTP
    application.
    """
    name = "Prometheus Exporter"

    # Seconds to wait before polling the shared mapping again when it is
    # empty. Without a pause the updater thread spins at full speed.
    _IDLE_POLL_INTERVAL = 0.1

    def __init__(self, worker_id, metrics):
        """Create the exporter and start the metric updater thread.

        :param worker_id: worker identifier passed on to cotyledon.Service.
        :param metrics: shared mapping of pending measurements; each value
                        holds a ``labels`` dict and a ``duration``.
        """
        super(CNIDaemonExporterService, self).__init__(worker_id)
        self.prometheus_exporter = prometheus_exporter.CNIPrometheusExporter()
        self.is_running = True
        self.metrics = metrics
        # Set by terminate() so that an idle wait is interrupted at once.
        self._stop_event = threading.Event()
        self.exporter_thread = threading.Thread(
            target=self._start_metric_updater)
        self.exporter_thread.start()

    def _start_metric_updater(self):
        """Move pending measurements into the Prometheus histogram.

        Loops until ``is_running`` is cleared. Each pass takes the first
        key of the shared mapping, observes its measurement and removes
        the entry, all while holding the lock named after that key.
        """
        while self.is_running:
            if not self.metrics:
                # Nothing to export yet: sleep instead of busy-looping.
                self._stop_event.wait(self._IDLE_POLL_INTERVAL)
                continue
            pod_name = list(self.metrics.keys())[0]
            with lockutils.lock(pod_name):
                labels = self.metrics[pod_name]['labels']
                duration = self.metrics[pod_name]['duration']
                self.prometheus_exporter.update_metric(labels, duration)
                del self.metrics[pod_name]

    def terminate(self):
        """Stop the updater thread and wait for it to finish."""
        self.is_running = False
        self._stop_event.set()
        if self.exporter_thread:
            self.exporter_thread.join()

    def run(self):
        """Run the exporter's HTTP application."""
        self.prometheus_exporter.run()
class CNIDaemonServiceManager(cotyledon.ServiceManager):
def __init__(self):
# NOTE(mdulko): Default shutdown timeout is 60 seconds and K8s won't
@ -359,10 +410,12 @@ class CNIDaemonServiceManager(cotyledon.ServiceManager):
self.manager = multiprocessing.Manager()
registry = self.manager.dict() # For Watcher->Server communication.
healthy = multiprocessing.Value(c_bool, True)
metrics = self.manager.dict()
self.add(CNIDaemonWatcherService, workers=1, args=(registry, healthy,))
self._server_service = self.add(
CNIDaemonServerService, workers=1, args=(registry, healthy))
self.add(CNIDaemonServerService, workers=1, args=(registry, healthy,
metrics,))
self.add(CNIDaemonHealthServerService, workers=1, args=(healthy,))
self.add(CNIDaemonExporterService, workers=1, args=(metrics,))
self.register_hooks(on_terminate=self.terminate)
def run(self):

View File

@ -0,0 +1,67 @@
# Copyright 2020 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import flask
import prometheus_client
from prometheus_client.exposition import generate_latest
from oslo_config import cfg
from oslo_log import log as logging
# Logger named after this module.
LOG = logging.getLogger(__name__)
# Global oslo.config object; CNIPrometheusExporter.run() reads the exporter
# port from it.
CONF = cfg.CONF
# Upper bound of the last histogram bucket.
_INF = float("inf")
class CNIPrometheusExporter(object):
    """Provides metrics to Prometheus"""

    def __init__(self):
        """Create the Flask application and the metric registry.

        ``GET /metrics`` is routed to :meth:`metrics`.
        """
        self.application = flask.Flask('prometheus-exporter')
        self.ctx = None
        self.application.add_url_rule(
            '/metrics', methods=['GET'], view_func=self.metrics)
        self.headers = {'Connection': 'close'}
        self._create_metric()

    def update_metric(self, labels, duration):
        """Observes the request duration value and count it in buckets

        :param labels: dict with the ``command`` and ``error`` label values.
        :param duration: request duration to observe.
        """
        self.cni_requests_duration.labels(**labels).observe(duration)

    def metrics(self):
        """Provides the registered metrics

        :returns: a ``text/plain`` Flask response with the registry content.
        """
        collected_metric = generate_latest(self.registry)
        return flask.Response(collected_metric, mimetype='text/plain')

    def run(self):
        """Serve the Flask application on the configured CNI exporter port.

        Any exception raised while running the application is logged and
        re-raised.
        """
        # '::' is the IPv6 unspecified (wildcard) address.
        address = '::'
        try:
            LOG.info('Starting CNI Prometheus exporter')
            self.application.run(
                address, CONF.prometheus_exporter.cni_exporter_port)
        except Exception:
            LOG.exception('Failed to start Prometheus exporter')
            raise

    def _create_metric(self):
        """Creates a registry and records a new Histogram metric."""
        self.registry = prometheus_client.CollectorRegistry()
        metric_name = 'kuryr_cni_request_duration_seconds'
        metric_description = 'The duration of CNI requests'
        buckets = (5, 10, 15, 20, 25, 30, 40, 50, 60, _INF)
        # Label names are given as a tuple rather than a set: a set has no
        # defined iteration order, so the order of the labels could differ
        # from one process to the next.
        self.cni_requests_duration = prometheus_client.Histogram(
            metric_name, metric_description,
            labelnames=('command', 'error'), buckets=buckets,
            registry=self.registry)

View File

@ -14,9 +14,12 @@
# under the License.
import functools
import time
from http import client as httplib
from oslo_log import log as logging
PROC_ONE_CGROUP_PATH = '/proc/1/cgroup'
CONTAINER_RUNTIME_CGROUP_IDS = (
'docker', # This is set by docker/moby
@ -24,6 +27,7 @@ CONTAINER_RUNTIME_CGROUP_IDS = (
)
LOG = logging.getLogger(__name__)
SUCCESSFUL_REQUEST_CODE = (httplib.NO_CONTENT, httplib.ACCEPTED)
def running_under_container_runtime(proc_one_cg_path=PROC_ONE_CGROUP_PATH):
@ -86,3 +90,19 @@ def log_ipdb(func):
pass
raise
return with_logging
def measure_time(command):
    """Measures CNI ADD/DEL request duration

    Returns a decorator for request-handling methods. The decorated method
    is timed, its result is classified as an error when ``result[1]`` is
    not one of SUCCESSFUL_REQUEST_CODE, and the outcome is reported through
    ``obj._update_metrics(command, error, duration)``. The result of the
    method is returned unchanged. Nothing is reported when the method
    raises.

    :param command: command name passed on to ``_update_metrics``.
    :returns: decorator to apply to a method.
    """
    def decorator(method):
        # functools.wraps keeps __name__, __doc__ and the other metadata of
        # the wrapped method instead of copying __name__ only.
        @functools.wraps(method)
        def wrapper(obj, *args, **kwargs):
            start_time = time.time()
            result = method(obj, *args, **kwargs)
            cni_request_error = (
                result[1] not in SUCCESSFUL_REQUEST_CODE)
            obj._update_metrics(
                command, cni_request_error, time.time() - start_time)
            return result
        return wrapper
    return decorator

View File

@ -342,6 +342,15 @@ vhostuser = [
default='/var/run/openvswitch/')
]
# Listening ports of the Prometheus exporters; registered below under the
# "prometheus_exporter" group.
prometheus_exporter_opts = [
    cfg.IntOpt(
        'controller_exporter_port',
        default=9654,
        help=_('port for the Controller Prometheus exporter.'),
    ),
    cfg.IntOpt(
        'cni_exporter_port',
        default=9655,
        help=_('port for the CNI Prometheus exporter.'),
    ),
]
CONF = cfg.CONF
CONF.register_opts(kuryr_k8s_opts)
CONF.register_opts(daemon_opts, group='cni_daemon')
@ -352,6 +361,7 @@ CONF.register_opts(cache_defaults, group='cache_defaults')
CONF.register_opts(nested_vif_driver_opts, group='pod_vif_nested')
CONF.register_opts(sriov_opts, group='sriov')
CONF.register_opts(vhostuser, group='vhostuser')
CONF.register_opts(prometheus_exporter_opts, "prometheus_exporter")
CONF.register_opts(lib_config.core_opts)
CONF.register_opts(lib_config.binding_opts, 'binding')

View File

@ -12,6 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
from openstack import exceptions as os_exc
from os_vif import objects
from oslo_config import cfg as oslo_cfg
@ -21,6 +23,7 @@ from kuryr_kubernetes import clients
from kuryr_kubernetes import constants
from kuryr_kubernetes.controller.drivers import base as drivers
from kuryr_kubernetes.controller.drivers import utils as driver_utils
from kuryr_kubernetes.controller.managers import prometheus_exporter as exp
from kuryr_kubernetes import exceptions as k_exc
from kuryr_kubernetes.handlers import k8s_base
from kuryr_kubernetes import utils
@ -111,6 +114,11 @@ class KuryrPortHandler(k8s_base.ResourceEventHandler):
security_groups)
except k_exc.K8sClientException:
raise k_exc.ResourceNotReady(pod['metadata']['name'])
try:
self._record_pod_creation_metric(pod)
except Exception:
LOG.debug("Failed to record metric for pod %s",
pod['metadata']['name'])
if driver_utils.is_network_policy_enabled():
crd_pod_selectors = self._drv_sg.create_sg_rules(pod)
@ -278,6 +286,17 @@ class KuryrPortHandler(k8s_base.ResourceEventHandler):
project_id)
self._drv_lbaas.update_lbaas_sg(service, sgs)
def _record_pod_creation_metric(self, pod):
    """Record the time elapsed since the pod was scheduled.

    For every ``PodScheduled`` condition of the pod whose status is true,
    the time between the condition's ``lastTransitionTime`` and now is
    reported to the controller Prometheus exporter.

    :param pod: pod dict; ``pod['status']`` is read for its conditions.
    """
    exporter = exp.ControllerPrometheusExporter.get_instance()
    time_format = "%Y-%m-%dT%H:%M:%SZ"
    # A pod without conditions yet simply records nothing.
    for condition in pod['status'].get('conditions') or ():
        if condition['type'] != 'PodScheduled':
            continue
        # Kubernetes reports the status as the string "True", "False" or
        # "Unknown". A plain truthiness test would accept all of them.
        if str(condition['status']) != 'True':
            continue
        scheduled_at = datetime.datetime.strptime(
            condition['lastTransitionTime'], time_format)
        # The timestamp carries a "Z" suffix, i.e. it is UTC, so it has to
        # be compared with the current UTC time, not the local time. The
        # timezone is dropped because strptime returned a naive datetime.
        now = datetime.datetime.now(
            datetime.timezone.utc).replace(tzinfo=None)
        pod_creation_sec = (now - scheduled_at).total_seconds()
        exporter.record_pod_creation_metric(pod_creation_sec)
def _get_pod(self, kuryrport_crd):
try:
name = kuryrport_crd['metadata']['name']

View File

@ -0,0 +1,129 @@
# Copyright 2020 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import flask
import netaddr
import prometheus_client
from prometheus_client.exposition import generate_latest
from oslo_config import cfg
from oslo_log import log as logging
from kuryr_kubernetes import clients
from kuryr_kubernetes import config
# Logger named after this module.
LOG = logging.getLogger(__name__)
# Global oslo.config object; the exporter port is read from it.
CONF = cfg.CONF
# Network resources whose free quota is exported by
# ControllerPrometheusExporter._record_quota_free_count_metric.
RESOURCES = ('ports', 'subnets', 'networks', 'security_groups',
'security_group_rules')
# Lowest free quota value that is still exported; negative results are not.
_NO_QUOTA = 0
# Upper bound of the last pod creation latency histogram bucket.
_INF = float("inf")
# Quota limit that is exported as-is instead of computing limit - used.
_NO_LIMIT = -1
class ControllerPrometheusExporter(object):
    """Provides metrics to Prometheus"""

    # Shared instance handed out by get_instance().
    instance = None

    def __init__(self):
        """Create the Flask application, the network client and the metrics.

        ``GET /metrics`` is routed to :meth:`metrics`.
        """
        self.application = flask.Flask('prometheus-exporter')
        self.ctx = None
        self.application.add_url_rule(
            '/metrics', methods=['GET'], view_func=self.metrics)
        self.headers = {'Connection': 'close'}
        self._os_net = clients.get_network_client()
        self._project_id = config.CONF.neutron_defaults.project
        self._create_metrics()

    def metrics(self):
        """Provides the registered metrics

        Both gauges are refreshed through the network client on every
        request before the registry is rendered.

        :returns: a ``text/plain`` Flask response with the registry content.
        """
        self._record_quota_free_count_metric()
        self._record_ports_quota_per_subnet_metric()
        collected_metric = generate_latest(self.registry)
        return flask.Response(collected_metric, mimetype='text/plain')

    def record_pod_creation_metric(self, duration):
        """Records pod creation duration to the registry"""
        self.pod_creation_latency.observe(duration)

    @classmethod
    def get_instance(cls):
        """Return the shared exporter, creating it on first use."""
        if not ControllerPrometheusExporter.instance:
            ControllerPrometheusExporter.instance = cls()
        return ControllerPrometheusExporter.instance

    def run(self):
        """Serve the Flask application on the configured controller port.

        Any exception raised while running the application is logged and
        re-raised.
        """
        # '::' is the IPv6 unspecified (wildcard) address.
        address = '::'
        try:
            LOG.info('Starting Prometheus exporter')
            self.application.run(
                address, CONF.prometheus_exporter.controller_exporter_port)
        except Exception:
            LOG.exception('Failed to start Prometheus exporter')
            raise

    def _record_quota_free_count_metric(self):
        """Records Network resources availability to the registry

        For each resource in RESOURCES the gauge is set to ``limit - used``.
        A limit equal to _NO_LIMIT is exported unchanged, and a negative
        difference leaves the gauge untouched.
        """
        quota = self._os_net.get_quota(quota=self._project_id, details=True)
        for resource in RESOURCES:
            resource_quota = quota[resource]
            labels = {'resource': resource}
            quota_limit = resource_quota['limit']
            if quota_limit == _NO_LIMIT:
                self.quota_free_count.labels(**labels).set(quota_limit)
                continue
            quota_used = resource_quota['used']
            availability = quota_limit - quota_used
            if availability >= _NO_QUOTA:
                self.quota_free_count.labels(**labels).set(availability)

    def _record_ports_quota_per_subnet_metric(self):
        """Records the ports quota per subnet to the registry

        Only subnets whose name contains ``ns/`` are considered. The value
        is the number of addresses in the subnet's allocation pools minus
        the number of ports found on the subnet's network.
        """
        subnets = self._os_net.subnets(project_id=self._project_id)
        namespace_prefix = 'ns/'
        for subnet in subnets:
            if namespace_prefix not in subnet.name:
                continue
            total_num_addresses = 0
            for allocation in subnet.allocation_pools:
                total_num_addresses += netaddr.IPRange(
                    netaddr.IPAddress(allocation['start']),
                    netaddr.IPAddress(allocation['end'])).size
            ports_count = len(list(self._os_net.ports(
                network_id=subnet.network_id,
                project_id=self._project_id)))
            labels = {'subnet_id': subnet.id, 'subnet_name': subnet.name}
            ports_availability = total_num_addresses - ports_count
            self.port_quota_per_subnet.labels(**labels).set(ports_availability)

    def _create_metrics(self):
        """Creates a registry and records the Gauge and Histogram metrics"""
        self.registry = prometheus_client.CollectorRegistry()
        # Label names are given as tuples rather than sets: a set has no
        # defined iteration order, so the order of the labels could differ
        # from one process to the next.
        self.quota_free_count = prometheus_client.Gauge(
            'kuryr_quota_free_count', 'Amount of quota available'
            ' for the network resource', labelnames=('resource',),
            registry=self.registry)
        self.port_quota_per_subnet = prometheus_client.Gauge(
            'kuryr_port_quota_per_subnet', 'Amount of ports available'
            ' on Subnet', labelnames=('subnet_id', 'subnet_name'),
            registry=self.registry)
        buckets = (10, 20, 30, 40, 50, 60, _INF)
        self.pod_creation_latency = prometheus_client.Histogram(
            'kuryr_pod_creation_latency', 'Time taken for a pod to have'
            ' Kuryr annotations set', buckets=buckets, registry=self.registry)

View File

@ -12,7 +12,6 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import functools
import sys
@ -30,6 +29,7 @@ from kuryr_kubernetes import config
from kuryr_kubernetes.controller.drivers import base as drivers
from kuryr_kubernetes.controller.handlers import pipeline as h_pipeline
from kuryr_kubernetes.controller.managers import health
from kuryr_kubernetes.controller.managers import prometheus_exporter as exp
from kuryr_kubernetes import objects
from kuryr_kubernetes import utils
from kuryr_kubernetes import watcher
@ -85,6 +85,7 @@ class KuryrK8sService(service.Service, periodic_task.PeriodicTasks,
pipeline = h_pipeline.ControllerPipeline(self.tg)
self.watcher = watcher.Watcher(pipeline, self.tg)
self.health_manager = health.HealthServer()
self.exporter = exp.ControllerPrometheusExporter.get_instance()
self.current_leader = None
self.node_name = utils.get_node_name()
@ -112,7 +113,8 @@ class KuryrK8sService(service.Service, periodic_task.PeriodicTasks,
f = functools.partial(self.run_periodic_tasks, None)
self.tg.add_timer(1, f)
self.health_manager.run()
self.tg.add_thread(self.exporter.run)
self.tg.add_thread(self.health_manager.run)
LOG.info("Service '%s' started", self.__class__.__name__)
@periodic_task.periodic_task(spacing=5, run_immediately=True)

View File

@ -31,11 +31,14 @@ class TestDaemonServer(base.TestCase):
self.k8s_mock = self.useFixture(kuryr_fixtures.MockK8sClient())
self.plugin = k8s_cni_registry.K8sCNIRegistryPlugin({}, healthy)
self.health_registry = mock.Mock()
self.srv = service.DaemonServer(self.plugin, self.health_registry)
self.metrics = dict()
self.srv = service.DaemonServer(
self.plugin, self.health_registry, self.metrics)
self.srv.application.testing = True
self.test_client = self.srv.application.test_client()
params = {'config_kuryr': {}, 'CNI_ARGS': 'foo=bar',
cni_args = 'foo=bar;K8S_POD_NAMESPACE=test;K8S_POD_NAME=test'
params = {'config_kuryr': {}, 'CNI_ARGS': cni_args,
'CNI_CONTAINERID': 'baz', 'CNI_COMMAND': 'ADD'}
self.params_str = jsonutils.dumps(params)

View File

@ -0,0 +1,105 @@
# Copyright 2021 Red Hat
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import munch
import prometheus_client
from unittest import mock
from kuryr_kubernetes.controller.managers import prometheus_exporter
from kuryr_kubernetes.tests import base
from kuryr_kubernetes.tests.unit import kuryr_fixtures as k_fix
def get_quota_obj():
    """Build a fresh quota details dict with half of every quota used.

    A new dict is created on each call so tests may modify it freely.
    """
    # (resource, used, limit); nothing is reserved for any of them.
    usage = (
        ('subnets', 50, 100),
        ('networks', 50, 100),
        ('security_group_rules', 50, 100),
        ('security_groups', 5, 10),
        ('ports', 250, 500),
    )
    return {
        resource: {'used': used, 'limit': limit, 'reserved': 0}
        for resource, used, limit in usage
    }
class TestControllerPrometheusExporter(base.TestCase):
    """Tests for the metric recording methods of the controller exporter.

    The methods are called unbound on a MagicMock standing in for the
    exporter, so no real exporter instance is built.
    """

    def setUp(self):
        super(TestControllerPrometheusExporter, self).setUp()
        self.cls = prometheus_exporter.ControllerPrometheusExporter
        os_net = self.useFixture(k_fix.MockNetworkClient()).client

        srv = mock.MagicMock(spec=self.cls)
        srv.quota_free_count = mock.MagicMock(spec=prometheus_client.Gauge)
        srv.port_quota_per_subnet = mock.MagicMock(
            spec=prometheus_client.Gauge)
        srv._project_id = mock.sentinel.project_id
        srv._os_net = os_net
        self.srv = srv

    def test__record_quota_free_count_metric(self):
        # Every resource must be exported as limit - used.
        quota = get_quota_obj()
        self.srv._os_net.get_quota.return_value = quota

        self.cls._record_quota_free_count_metric(self.srv)

        expected = []
        for resource in prometheus_exporter.RESOURCES:
            free = quota[resource]['limit'] - quota[resource]['used']
            expected.append(mock.call(resource=resource))
            expected.append(mock.call().set(free))
        self.srv.quota_free_count.labels.assert_has_calls(expected)

    def test__record_no_quota_free_count_metric(self):
        # With the whole quota consumed every resource is exported as 0.
        quota = get_quota_obj()
        for details in quota.values():
            details['used'] = details['limit']
        self.srv._os_net.get_quota.return_value = quota

        self.cls._record_quota_free_count_metric(self.srv)

        expected = []
        for resource in prometheus_exporter.RESOURCES:
            expected.append(mock.call(resource=resource))
            expected.append(mock.call().set(0))
        self.srv.quota_free_count.labels.assert_has_calls(expected)

    def test__record_ports_quota_per_subnet_metric(self):
        # 509 addresses in the allocation pool minus the single port.
        subnet_id = mock.sentinel.id
        subnet_name = 'ns/cluster-version-net'
        pool = {'start': '10.128.70.2', 'end': '10.128.71.254'}
        subnet = munch.Munch({
            'id': subnet_id,
            'name': subnet_name,
            'network_id': mock.sentinel.network_id,
            'allocation_pools': [pool],
        })
        self.srv._os_net.subnets.return_value = [subnet]
        self.srv._os_net.ports.return_value = [mock.MagicMock()]

        self.cls._record_ports_quota_per_subnet_metric(self.srv)

        gauge = self.srv.port_quota_per_subnet
        gauge.labels.assert_called_with(
            subnet_id=subnet_id, subnet_name=subnet_name)
        gauge.labels().set.assert_called_with(508)

View File

@ -80,6 +80,7 @@ pbr==2.0.0
pika==0.10.0
pika-pool==0.1.3
prettytable==0.7.2
prometheus_client==0.6.0
protobuf==3.6.0
psutil==5.4.3
pycparser==2.18

View File

@ -24,3 +24,4 @@ retrying!=1.3.0,>=1.2.3 # Apache-2.0
stevedore>=1.20.0 # Apache-2.0
grpcio>=1.12.0 # Apache-2.0
protobuf>=3.6.0 # 3-Clause BSD
prometheus_client>=0.6.0 # Apache-2.0