Add prometheus client plugin

The plugin can be configured in two ways:

1. Scrape metrics from a configured set of endpoints that were set up
via prometheus client libraries.

2. Attempt to autodetect running endpoints we should be monitoring in
a kubernetes environment by looking at either running services or
pods and evaluating their annotations to see if they are set up for scraping.

Change-Id: I6d174daa705aff23fbfece9de759a1718a91050f
This commit is contained in:
Michael James Hoppal 2017-02-08 09:13:58 -07:00 committed by Kaiyan Sheng
parent e04f5df4ad
commit dc634a73b8
5 changed files with 406 additions and 67 deletions


@@ -0,0 +1,20 @@
# (C) Copyright 2017 Hewlett Packard Enterprise Development LP
init_config:
timeout: 3
auto_detect_endpoints: True
# Detection method can be either service or pod. Default is pod
detect_method: "pod"
instances:
# Instances to use when configuring each metric_endpoint manually
- metric_endpoint: "http://127.0.0.1:8000"
# Dimensions to add to every metric coming out of the plugin
default_dimensions:
app: my_app
- metric_endpoint: "http://127.0.0.1:9000"
# Kubernetes labels to match on when using auto detection in a Kubernetes environment.
# There can only be one instance when auto_detect_endpoints is set to true
- kubernetes_labels: ['app']


@@ -73,6 +73,7 @@
- [Postfix Checks](#postfix-checks)
- [PostgreSQL](#postgresql)
- [Process Checks](#process-checks)
- [Prometheus](#prometheus)
- [RabbitMQ Checks](#rabbitmq-checks)
- [RedisDB](#redisdb)
- [Riak](#riak)
@@ -176,6 +177,7 @@ The following plugins are delivered via setup as part of the standard plugin che
| postfix | | Provides metrics on the number of messages in a given postfix queue|
| postgres | | |
| process | | |
| prometheus | | |
| rabbitmq | /root/.rabbitmq.cnf |
| redisdb | | |
| riak | | |
@@ -1866,7 +1868,7 @@ monasca-setup -d ProcessCheck -a "conf_file_path=/home/stack/myprocess.yaml"
```
Example yaml input file format for process check by process names:
```
---
process_config:
- process_names:
- monasca-notification
@@ -1876,7 +1878,7 @@ process_config:
```
Example yaml input file format for multiple process_names entries:
```
---
process_config:
- process_names:
- monasca-notification
@@ -1942,6 +1944,72 @@ The process checks return the following metrics ( if detailed is set to true, ot
On Linux, if the Agent is not run as root or as the owner of a process, the io metrics and the open_file_descriptors metric will not be reported when the mon-agent user lacks permission to read them for that process.
## Prometheus Client
This plugin scrapes metrics from endpoints that are created by prometheus client libraries - https://prometheus.io/docs/instrumenting/clientlibs/
It can be configured in two ways: either manually set all the endpoints you want to scrape, or, when
running in a Kubernetes environment, autodetect endpoints from either services or pods based on the annotations set on them.
### Manually Configuring Endpoints
In this mode the plugin goes to a configured list of prometheus client endpoints and scrapes the posted metrics from each.
When configuring an endpoint you can define a set of dimensions that is attached to every metric scraped from it.
By default the labels defined on each metric are used as dimensions.
Example yaml file:
```
init_config:
# Timeout on connections to each endpoint
timeout: 3
instances:
- metric_endpoint: "http://127.0.0.1:8000"
# Dimensions to add to every metric coming out of the plugin
default_dimensions:
app: my_app
- metric_endpoint: "http://127.0.0.1:9000"
```
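How the configured `default_dimensions` combine with each sample's Prometheus labels can be sketched as follows. This is a simplified illustration of the plugin's merge behavior; `merge_dimensions` is a hypothetical helper, not part of the plugin:

```python
def merge_dimensions(default_dimensions, metric_labels):
    """Layer a sample's Prometheus labels over the configured defaults.

    Every non-empty label becomes a dimension; empty-string labels
    are dropped rather than forwarded.
    """
    merged = default_dimensions.copy()
    for key, value in metric_labels.items():
        if len(value) > 0:  # skip empty-string labels
            merged[key] = value
    return merged


print(merge_dimensions({"app": "my_app"},
                       {"instance": "127.0.0.1:8000", "le": ""}))
# -> {'app': 'my_app', 'instance': '127.0.0.1:8000'}
```

Note that a label with the same key as a configured dimension wins, since labels are applied on top of the defaults.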
### Running in a Kubernetes Environment with autodetection
There are two ways to set up autodetection: one detects endpoints based on pods, the other based on
services. In both cases the plugin looks at the annotations set on the Kubernetes service or pod.
The annotations the plugin looks for are:
* prometheus.io/scrape: Only scrape pods that have a value of 'true'.
* prometheus.io/path: If the metrics path is not '/metrics', override this.
* prometheus.io/port: Scrape the pod on the indicated port instead of the default of '9102'.
These annotations are pulled from the Kubelet for pod autodetection and from the Kubernetes API for service autodetection.
There is also a "kubernetes_labels" configuration parameter that lists Kubernetes labels to use as dimensions
on the outgoing metrics. By default it is set to "app".
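The annotation handling described above can be sketched in isolation. This is a simplified, single-port sketch (the plugin additionally matches annotation ports against the container or service ports); `should_scrape` and `scrape_url` are hypothetical helper names:

```python
def should_scrape(annotations):
    # Only endpoints annotated with the literal string 'true' are scraped
    return annotations.get("prometheus.io/scrape", "false").lower() == "true"


def scrape_url(annotations, ip, default_port=9102):
    # Fall back to port 9102 and path /metrics when not overridden
    port = annotations.get("prometheus.io/port", default_port)
    path = annotations.get("prometheus.io/path", "/metrics")
    return "http://{}:{}{}".format(ip, port, path)


print(scrape_url({"prometheus.io/scrape": "true"}, "10.0.0.5"))
# -> http://10.0.0.5:9102/metrics
```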
Example yaml file (by pod):
```
init_config:
timeout: 3
auto_detect_endpoints: True
detect_method: "pod"
instances:
- kubernetes_labels: ['app']
```
Example yaml file (by service):
```
init_config:
timeout: 3
auto_detect_endpoints: True
detect_method: "service"
instances:
- kubernetes_labels: ['app']
```
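The effect of the `kubernetes_labels` setting in both examples above can be sketched as a simple filter over the pod or service metadata. This is an illustrative sketch of the label-to-dimension mapping, with a hypothetical helper name:

```python
def label_dimensions(metadata, kubernetes_labels):
    # Copy only the configured Kubernetes labels onto the metric as dimensions
    dims = {}
    labels = metadata.get("labels", {})
    for label in kubernetes_labels:
        if label in labels:
            dims[label] = labels[label]
    return dims


print(label_dimensions({"labels": {"app": "web", "tier": "frontend"}}, ["app"]))
# -> {'app': 'web'}
```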
**NOTE** This plugin only supports one configured instance when auto_detect_endpoints is set to true.
## RabbitMQ Checks
This section describes the RabbitMQ check that can be performed by the Agent. The RabbitMQ check gathers metrics on Nodes, Exchanges and Queues from the rabbit server. The RabbitMQ check requires a configuration file called rabbitmq.yaml to be available in the agent conf.d configuration directory. The config file must contain the names of the Exchanges and Queues that you are interested in monitoring.


@@ -1,6 +1,7 @@
# (C) Copyright 2015,2017 Hewlett Packard Enterprise Development LP
import base64
import json
import logging
import math
from numbers import Number
@@ -745,3 +746,70 @@ class DynamicCheckHelper(object):
metric = re.sub(r"_\.", ".", metric)
return metric
def get_pod_dimensions(kubernetes_connector, pod_metadata, kubernetes_labels):
pod_name = pod_metadata['name']
pod_dimensions = {'pod_name': pod_name, 'namespace': pod_metadata['namespace']}
if "labels" in pod_metadata:
pod_labels = pod_metadata['labels']
for label in kubernetes_labels:
if label in pod_labels:
pod_dimensions[label] = pod_labels[label]
# Get owner of pod to set as a dimension
# Try to get from pod owner references
pod_owner_references = pod_metadata.get('ownerReferences', None)
if pod_owner_references:
try:
if len(pod_owner_references) > 1:
log.warn("More than one owner for pod {}".format(pod_name))
pod_owner_reference = pod_owner_references[0]
pod_owner_type = pod_owner_reference['kind']
pod_owner_name = pod_owner_reference['name']
_set_pod_owner_dimension(kubernetes_connector, pod_dimensions, pod_owner_type, pod_owner_name)
except Exception:
log.info("Could not get pod owner from ownerReferences for pod {}".format(pod_name))
# Try to get owner from annotations
else:
try:
pod_created_by = json.loads(pod_metadata['annotations']['kubernetes.io/created-by'])
pod_owner_type = pod_created_by['reference']['kind']
pod_owner_name = pod_created_by['reference']['name']
_set_pod_owner_dimension(kubernetes_connector, pod_dimensions, pod_owner_type, pod_owner_name)
except Exception:
log.info("Could not get pod owner from annotations for pod {}".format(pod_name))
return pod_dimensions
def _get_deployment_name(kubernetes_connector, pod_owner_name, pod_namespace):
replica_set_endpoint = "/apis/extensions/v1beta1/namespaces/{}/replicasets/{}".format(pod_namespace, pod_owner_name)
try:
replica_set = kubernetes_connector.get_request(replica_set_endpoint)
replica_set_annotations = replica_set['metadata']['annotations']
if "deployment.kubernetes.io/revision" in replica_set_annotations:
return "-".join(pod_owner_name.split("-")[:-1])
except Exception as e:
log.warn("Could not connect to api to get replicaset data - {}".format(e))
return None
return None
def _set_pod_owner_dimension(kubernetes_connector, pod_dimensions, pod_owner_type, pod_owner_name):
if pod_owner_type == "ReplicationController":
pod_dimensions['replication_controller'] = pod_owner_name
elif pod_owner_type == "ReplicaSet":
if not kubernetes_connector:
log.error("Can not set deployment name as connection information to API is not set. "
"Setting ReplicaSet as dimension")
deployment_name = None
else:
deployment_name = _get_deployment_name(kubernetes_connector, pod_owner_name, pod_dimensions['namespace'])
if not deployment_name:
pod_dimensions['replica_set'] = pod_owner_name
else:
pod_dimensions['deployment'] = deployment_name
elif pod_owner_type == "DaemonSet":
pod_dimensions['daemon_set'] = pod_owner_name
else:
log.info("Unsupported pod owner kind {} as a dimension for pod {}".format(pod_owner_type,
pod_dimensions))
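The `_get_deployment_name` helper above relies on the convention that a Deployment names its ReplicaSets `<deployment>-<pod-template-hash>`, so dropping the last dash-separated token recovers the Deployment name. A standalone illustration (hypothetical function, not part of the module):

```python
def deployment_from_replica_set(replica_set_name):
    # Deployments name their ReplicaSets "<deployment>-<pod-template-hash>",
    # so stripping the final dash-separated token yields the Deployment name
    return "-".join(replica_set_name.split("-")[:-1])


print(deployment_from_replica_set("my-app-3271985242"))
# -> my-app
```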


@@ -172,7 +172,8 @@ class Kubernetes(checks.AgentCheck):
# Pod does not have any containers assigned to it no-op going to next pod
continue
pod_dimensions = dimensions.copy()
pod_dimensions.update(self._get_pod_dimensions(pod['metadata'], kubernetes_labels))
pod_dimensions.update(utils.get_pod_dimensions(self.kubernetes_connector, pod['metadata'],
kubernetes_labels))
pod_key = pod_dimensions['pod_name'] + pod_dimensions['namespace']
pod_dimensions_map[pod_key] = pod_dimensions
pod_retry_count = 0
@@ -243,70 +244,6 @@ class Kubernetes(checks.AgentCheck):
return cpu / 1000
return float(cpu_string)
def _get_pod_dimensions(self, pod_metadata, kubernetes_labels):
pod_name = pod_metadata['name']
pod_dimensions = {'pod_name': pod_name, 'namespace': pod_metadata['namespace']}
if "labels" in pod_metadata:
pod_labels = pod_metadata['labels']
for label in kubernetes_labels:
if label in pod_labels:
pod_dimensions[label] = pod_labels[label]
# Get owner of pod to set as a dimension
# Try to get from pod owner references
pod_owner_references = pod_metadata.get('ownerReferences', None)
if pod_owner_references:
try:
if len(pod_owner_references) > 1:
self.log.warn("More then one owner for pod {}".format(pod_name))
pod_owner_reference = pod_owner_references[0]
pod_owner_type = pod_owner_reference['kind']
pod_owner_name = pod_owner_reference['name']
self._set_pod_owner_dimension(pod_dimensions, pod_owner_type, pod_owner_name)
except Exception:
self.log.info("Could not get pod owner from ownerReferences for pod {}".format(pod_name))
# Try to get owner from annotations
else:
try:
pod_created_by = json.loads(pod_metadata['annotations']['kubernetes.io/created-by'])
pod_owner_type = pod_created_by['reference']['kind']
pod_owner_name = pod_created_by['reference']['name']
self._set_pod_owner_dimension(pod_dimensions, pod_owner_type, pod_owner_name)
except Exception:
self.log.info("Could not get pod owner from annotations for pod {}".format(pod_name))
return pod_dimensions
def _get_deployment_name(self, pod_owner_name, pod_namespace):
replica_set_endpoint = "/apis/extensions/v1beta1/namespaces/{}" \
"/replicasets/{}".format(pod_namespace,
pod_owner_name)
try:
replica_set = self.kubernetes_connector.get_request(replica_set_endpoint)
replica_set_annotations = replica_set['metadata']['annotations']
if "deployment.kubernetes.io/revision" in replica_set_annotations:
return "-".join(pod_owner_name.split("-")[:-1])
except Exception as e:
self.log.warn("Could not connect to api to get replicaset data - {}".format(e))
def _set_pod_owner_dimension(self, pod_dimensions, pod_owner_type, pod_owner_name):
if pod_owner_type == "ReplicationController":
pod_dimensions['replication_controller'] = pod_owner_name
elif pod_owner_type == "ReplicaSet":
if not self.kubernetes_connector:
self.log.error("Can not set deployment name as connection information to API is not set."
" Setting ReplicaSet as dimension")
deployment_name = None
else:
deployment_name = self._get_deployment_name(pod_owner_name, pod_dimensions['namespace'])
if not deployment_name:
pod_dimensions['replica_set'] = pod_owner_name
else:
pod_dimensions['deployment'] = deployment_name
elif pod_owner_type == "DaemonSet":
pod_dimensions['daemon_set'] = pod_owner_name
else:
self.log.info("Unsupported pod owner kind {} as a dimension for"
" pod {}".format(pod_owner_type, pod_dimensions))
def _send_metrics(self, metric_name, value, dimensions, metric_types,
metric_units):
for metric_type in metric_types:


@@ -0,0 +1,246 @@
# (C) Copyright 2017 Hewlett Packard Enterprise Development LP
import math
import requests
import six
from prometheus_client.parser import text_string_to_metric_families
import monasca_agent.collector.checks as checks
import monasca_agent.collector.checks.utils as utils
KUBERNETES_LABELS = ['app']
class Prometheus(checks.AgentCheck):
"""Scrapes metrics from Prometheus endpoints
Can be configured three ways:
1. Autodetect endpoints by pod annotations
2. Autodetect endpoints by services
3. Manually configure each prometheus endpoint to scrape
We autodetect based on the annotations assigned to pods/services.
We look for the following entries:
'prometheus.io/scrape': Only scrape pods that have a value of 'true'
'prometheus.io/path': If the metrics path is not '/metrics' override this.
'prometheus.io/port': Scrape the pod on the indicated port instead of the default of '9102'.
"""
def __init__(self, name, init_config, agent_config, instances=None):
super(Prometheus, self).__init__(name, init_config, agent_config, instances)
self.connection_timeout = init_config.get("timeout", 3)
self.auto_detect_endpoints = init_config.get("auto_detect_endpoints", False)
if self.auto_detect_endpoints:
self.kubernetes_connector = None
self.detect_method = init_config.get("detect_method", "pod").lower()
self.kubelet_url = None
if instances is not None and len(instances) > 1:
raise Exception('Prometheus Client only supports one configured instance if auto detection is set')
if self.detect_method not in ['pod', 'service']:
raise Exception('Invalid detect method {}. Must be either pod or service'.format(self.detect_method))
def check(self, instance):
dimensions = self._set_dimensions(None, instance)
del dimensions['hostname']
if not self.auto_detect_endpoints:
metric_endpoint = instance.get("metric_endpoint", None)
if not metric_endpoint:
self.log.error("metric_endpoint must be defined for each instance")
return
endpoint_dimensions = instance.get("default_dimensions", {})
endpoint_dimensions.update(dimensions)
self.report_endpoint_metrics(metric_endpoint, endpoint_dimensions)
else:
self.kubernetes_labels = instance.get('kubernetes_labels', KUBERNETES_LABELS)
if not self.kubernetes_connector:
self.kubernetes_connector = utils.KubernetesConnector(self.connection_timeout)
if self.detect_method == "pod":
if not self.kubelet_url:
try:
host = self.kubernetes_connector.get_agent_pod_host()
self.kubelet_url = "http://{}:10255/pods".format(host)
except Exception as e:
self.log.error("Could not obtain current host from Kubernetes API {}. "
"Skipping check".format(e))
return
metric_endpoints = self._get_metric_endpoints_by_pod(dimensions)
# Detect by service
else:
metric_endpoints = self._get_metric_endpoints_by_service(dimensions)
for metric_endpoint, endpoint_dimensions in six.iteritems(metric_endpoints):
endpoint_dimensions.update(dimensions)
self.report_endpoint_metrics(metric_endpoint, endpoint_dimensions)
def _get_metric_endpoints_by_pod(self, dimensions):
scrape_endpoints = {}
# Grab running pods from local Kubelet
try:
pods = requests.get(self.kubelet_url, timeout=self.connection_timeout).json()
except Exception as e:
exception_message = "Could not get pods from local kubelet with error - {}".format(e)
self.log.exception(exception_message)
raise Exception(exception_message)
# Iterate through each pod and check if it contains a scrape endpoint
for pod in pods['items']:
try:
pod_metadata = pod['metadata']
pod_spec = pod['spec']
pod_status = pod['status']
if "annotations" not in pod_metadata or not ('containers' in pod_spec and 'podIP' in pod_status):
# No annotations, containers, or pod IP - skipping pod
continue
# Check pod annotations if we should scrape pod
pod_annotations = pod_metadata['annotations']
prometheus_scrape = pod_annotations.get("prometheus.io/scrape", "false").lower()
if prometheus_scrape != "true":
continue
pod_ports = []
pod_containers = pod_spec['containers']
for container in pod_containers:
if "ports" in container:
pod_ports += container['ports']
pod_name = pod_metadata['name']
endpoints = self._get_prometheus_endpoint(pod_annotations, pod_ports, pod_name)
if not endpoints:
continue
# Add pod endpoint to scrape endpoints
pod_ip = pod_status['podIP']
# Loop through list of ports and build list of endpoints
pod_dimensions = dimensions.copy()
pod_dimensions.update(utils.get_pod_dimensions(
self.kubernetes_connector, pod['metadata'],
self.kubernetes_labels))
for endpoint in endpoints:
scrape_endpoint = "http://{}:{}".format(pod_ip, endpoint)
scrape_endpoints[scrape_endpoint] = pod_dimensions
self.log.info("Detected pod endpoint - {} with metadata "
"of {}".format(scrape_endpoint,
pod_dimensions))
except Exception as e:
self.log.warn("Error parsing {} to detect for scraping - {}".format(pod, e))
continue
return scrape_endpoints
def _get_metric_endpoints_by_service(self, dimensions):
scrape_endpoints = {}
# Grab services from Kubernetes API
try:
services = self.kubernetes_connector.get_request("/api/v1/services")
except Exception as e:
exception_message = "Could not get services from Kubernetes API with error - {}".format(e)
self.log.exception(exception_message)
raise Exception(exception_message)
# Iterate through each service and check if it is a scrape endpoint
for service in services['items']:
service_metadata = service['metadata']
service_spec = service['spec']
if "annotations" not in service_metadata or "ports" not in service_spec:
# No annotations or ports - skipping service
continue
# Check service annotations if we should scrape service
service_annotations = service_metadata['annotations']
prometheus_scrape = service_annotations.get("prometheus.io/scrape", "false").lower()
if prometheus_scrape != "true":
continue
service_name = service_metadata['name']
service_ports = service_spec['ports']
endpoints = self._get_prometheus_endpoint(service_annotations,
service_ports,
service_name)
if not endpoints:
continue
# Add service endpoint to scrape endpoints
cluster_ip = service_spec['clusterIP']
service_dimensions = dimensions.copy()
service_dimensions.update(
self._get_service_dimensions(service_metadata))
for endpoint in endpoints:
scrape_endpoint = "http://{}:{}".format(cluster_ip, endpoint)
scrape_endpoints[scrape_endpoint] = service_dimensions
self.log.info("Detected service endpoint - {} with metadata "
"of {}".format(scrape_endpoint,
service_dimensions))
return scrape_endpoints
def _get_service_dimensions(self, service_metadata):
service_dimensions = {'service_name': service_metadata['name'],
'namespace': service_metadata['namespace']}
if "labels" in service_metadata:
service_labels = service_metadata['labels']
for label in self.kubernetes_labels:
if label in service_labels:
service_dimensions[label] = service_labels[label]
return service_dimensions
def _get_prometheus_endpoint(self, annotations, ports, name):
"""Analyzes annotations and ports to generate a scrape target"""
pod_index = "containerPort" if self.detect_method == "pod" else "port"
configured_ports = []
if "prometheus.io/port" in annotations:
configured_ports = annotations.get("prometheus.io/port").split(',')
configured_ports = [int(i) for i in configured_ports]
if self.detect_method == "pod" and not configured_ports:
configured_ports = [9102]
prometheus_endpoint = annotations.get("prometheus.io/path", "/metrics")
endpoints = []
for port in ports:
for configured_port in configured_ports:
if port[pod_index] == configured_port:
# Build up list of ports and prometheus endpoints to return
endpoints.append("{}/{}".format(configured_port,
prometheus_endpoint))
if len(ports) == 1 and not endpoints:
self.log.info("Could not find a matching port - falling back "
"to the only port configured")
endpoints.append("{}/{}".format(ports[0][pod_index], prometheus_endpoint))
if not endpoints:
self.log.error("Can not derive which port to use. More "
"than one port is configured and none of them "
"is selected via configuration. {} {} skipped for "
"scraping".format(self.detect_method, name))
return endpoints
def _send_metrics(self, metric_families, dimensions):
for metric_family in metric_families:
for metric in metric_family.samples:
metric_dimensions = dimensions.copy()
metric_name = metric[0]
metric_labels = metric[1]
metric_value = float(metric[2])
if math.isnan(metric_value):
self.log.debug('filtering out NaN value provided for metric %s{%s}', metric_name, metric_labels)
continue
# remove empty string dimensions from prometheus labels
for dim_key, dim_value in metric_labels.items():
if len(dim_value) > 0:
metric_dimensions[dim_key] = dim_value
self.gauge(metric_name, metric_value, dimensions=metric_dimensions, hostname="SUPPRESS")
def report_endpoint_metrics(self, metric_endpoint, endpoint_dimensions):
# Hit metric endpoint
try:
result = requests.get(metric_endpoint, timeout=self.connection_timeout)
except Exception as e:
self.log.error("Could not get metrics from {} with error {}".format(metric_endpoint, e))
else:
result_content_type = result.headers['Content-Type']
if "text/plain" in result_content_type:
try:
metric_families = text_string_to_metric_families(result.text)
self._send_metrics(metric_families, endpoint_dimensions)
except Exception as e:
self.log.error("Error parsing data from {} with error {}".format(metric_endpoint, e))
else:
self.log.error("Unsupported content type - {}".format(result_content_type))