Add Kubernetes API Plugin

Plugin that connects to the Kubernetes API to gather metrics
about the Kubernetes environment.

Taken from original review https://review.openstack.org/#/c/391559/

Change-Id: Ifff9285e9a2ac06d59383b986619ee62c59c712e
Michael James Hoppal 2017-01-31 10:32:30 -07:00 committed by Kaiyan Sheng
parent 75ba59793a
commit 7ffae8e9fc
4 changed files with 440 additions and 0 deletions


@@ -0,0 +1,18 @@
# (C) Copyright 2017 Hewlett Packard Enterprise Development LP
init_config:
# Timeout, in seconds, on GET requests to API endpoints
connection_timeout: 3
instances:
# There are two options for connecting to the API: either pass in the host the API is running on and the port
# it is bound to, or derive the API URL from the Kubernetes environment variables (if the agent is running in
# a Kubernetes container). You must set one or the other.
- host: "127.0.0.1"
# Port of the Kubernetes master to connect to. Defaults to 8080.
# kubernetes_api_port: 8080
# Derive the Kubernetes API URL from the Kubernetes environment variables.
# derive_api_url: True
# Set of Kubernetes labels to search for in the Kubernetes metadata and set as dimensions
# kubernetes_labels: ['k8s-app', 'version']
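The `derive_api_url` option relies on the service environment variables that Kubernetes injects into every container it runs. A minimal sketch of that derivation (the helper name and the URL scheme are assumptions for illustration; the plugin itself delegates this to monasca_agent's KubernetesConnector utility):

```
import os

def derive_api_url():
    # Kubernetes sets these in every container scheduled into a pod;
    # they point at the in-cluster API service.
    host = os.environ.get("KUBERNETES_SERVICE_HOST")
    port = os.environ.get("KUBERNETES_SERVICE_PORT", "443")
    if not host:
        raise RuntimeError("KUBERNETES_SERVICE_HOST is not set; "
                           "the agent is not running inside a Kubernetes pod")
    return "https://{}:{}".format(host, port)
```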


@@ -57,6 +57,7 @@
- [The monasca.json_plugin.status Metric](#the-monascajson_pluginstatus-metric)
- [Kafka Checks](#kafka-checks)
- [Kubernetes](#kubernetes)
- [Kubernetes API](#kubernetes_api)
- [KyotoTycoon](#kyototycoon)
- [Libvirt VM Monitoring](#libvirt-vm-monitoring)
- [Open vSwitch Neutron Router Monitoring](#open-vswitch-neutron-router-monitoring)
@@ -155,6 +156,7 @@ The following plugins are delivered via setup as part of the standard plugin che
| kafka_consumer | | |
| kibana | **kibana_install_dir**/kibana.yml | Integration to Kibana |
| kubernetes | | |
| kubernetes_api | | |
| kyototycoon | | |
| libvirt | | |
| lighttpd | | |
@@ -1303,6 +1305,18 @@ Sample configs:
Without custom labels and host being manually set:
## Kubernetes_API
This plugin collects metrics from the Kubernetes API about Kubernetes components, nodes, deployments and replication controllers.
The optional `kubernetes_labels` configuration parameter lists user-defined Kubernetes labels that the plugin looks up in the Kubernetes metadata and sets as dimensions on replication controller and deployment metrics.
There are two ways to configure the plugin to connect to the Kubernetes API: either set the host and port, or set `derive_api_url` to True. When deriving, the plugin builds the Kubernetes API URL from the environment variables. (Use this when the agent is running in a Kubernetes container.)
Sample configs:
Without custom labels:
```
init_config:
# Timeout on GET requests
@@ -1316,6 +1330,13 @@ instances:
```
With custom labels and host being manually set:
```
instances:
# Set to the host that the plugin will use when connecting to the Kubernetes API
- host: "127.0.0.1"
kubernetes_api_port: 8080
```
With custom labels:
```
init_config:
@@ -1331,6 +1352,14 @@ instances:
```
With custom labels and derive host being set:
```
instances:
# Set to the host that the plugin will use when connecting to the Kubernetes API
- host: "127.0.0.1"
kubernetes_api_port: 8080
kubernetes_labels: ['k8s-app', 'version']
```
With custom labels and derive api url set to True:
```
init_config:
@@ -1461,6 +1490,36 @@ Pod Phase Mapping:
| 3 | Failed |
| 4 | Unknown |
instances:
- derive_api_url: True
kubernetes_labels: ['k8s-app', 'version']
```
Note that this plugin only supports one instance in the config file.
Metrics (note: replication controller and deployment metrics can also carry custom dimensions set via the `kubernetes_labels` configuration option):
| Metric Name | Dimensions | Semantics |
| ----------- | ---------- | --------- |
| kubernetes.api.health_status | | Health status of the API |
| kubernetes.component_status | component_name | Status of the cluster's components |
| kubernetes.node.out_of_disk | hostname | Whether the node is out of disk |
| kubernetes.node.memory_pressure | hostname | Whether available memory on the node has satisfied an eviction threshold |
| kubernetes.node.disk_pressure | hostname | Whether available disk space and inodes on either the node's root filesystem or image filesystem has satisfied an eviction threshold |
| kubernetes.node.ready_status | hostname | The ready status of the Kubernetes node |
| kubernetes.node.allocatable.memory_bytes | hostname, unit | Total allocatable memory in bytes available for scheduling on the node |
| kubernetes.node.allocatable.cpu | hostname, unit | Total allocatable CPU cores available for scheduling on the node |
| kubernetes.node.allocatable.pods | hostname | Total allocatable pods available for scheduling on the node |
| kubernetes.node.capacity.memory_bytes | hostname, unit | Total memory on the node |
| kubernetes.node.capacity.cpu | hostname, unit | Total number of CPU cores on the node |
| kubernetes.node.capacity.pods | hostname | Total number of pods that can run on the node |
| kubernetes.deployment.available_replicas | deployment, namespace | The number of available replicas for the deployment |
| kubernetes.deployment.replicas | deployment, namespace | The number of replicas for the deployment |
| kubernetes.deployment.unavailable_replicas | deployment, namespace | The number of unavailable replicas for the deployment |
| kubernetes.deployment.updated_replicas | deployment, namespace | The number of updated replicas for the deployment |
| kubernetes.replication.controller.ready_replicas | replication_controller, namespace | The number of ready replicas for the replication controller |
| kubernetes.replication.controller.replicas | replication_controller, namespace | The number of replicas for the replication controller |
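The deployment gauges above map directly onto fields of the deployment's `status` object returned by the API, with the unavailable count derived as desired replicas minus available replicas. A minimal sketch of that mapping (hypothetical helper, not part of the plugin):

```
def deployment_replica_metrics(status):
    # 'status' is a deployment's status block as returned by the
    # Kubernetes API, e.g. from /apis/extensions/v1beta1/deployments.
    replicas = status.get("replicas", 0)
    available = status.get("availableReplicas", 0)
    return {
        "kubernetes.deployment.replicas": replicas,
        "kubernetes.deployment.available_replicas": available,
        # Unavailable replicas: desired minus available.
        "kubernetes.deployment.unavailable_replicas": replicas - available,
        "kubernetes.deployment.updated_replicas": status.get("updatedReplicas", 0),
    }
```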
## KyotoTycoon
See [the example configuration](https://github.com/openstack/monasca-agent/blob/master/conf.d/kyototycoon.yaml.example) for how to configure the KyotoTycoon plugin.


@@ -0,0 +1,216 @@
# (C) Copyright 2017 Hewlett Packard Enterprise Development LP
import requests
from monasca_agent.collector import checks
from monasca_agent.collector.checks import utils
DEFAULT_TIMEOUT = 5
NODE_CONDITIONS_MAP = {
"OutOfDisk": {
"metric_name": "node.out_of_disk",
"expected_status": "False"
},
"MemoryPressure": {
"metric_name": "node.memory_pressure",
"expected_status": "False"
},
"DiskPressure": {
"metric_name": "node.disk_pressure",
"expected_status": "False"
},
"Ready": {
"metric_name": "node.ready_status",
"expected_status": "True"
}
}
class KubernetesAPI(checks.AgentCheck):
"""Queries Kubernetes API to get metrics about the Kubernetes deployment
"""
def __init__(self, name, init_config, agent_config, instances=None):
checks.AgentCheck.__init__(self, name, init_config, agent_config, instances)
if instances is not None and len(instances) > 1:
raise Exception('Kubernetes api check only supports one configured instance.')
self.connection_timeout = int(init_config.get('connection_timeout', DEFAULT_TIMEOUT))
self.kubernetes_connector = None
self.kubernetes_api = None
def prepare_run(self):
"""Set up Kubernetes connection information"""
instance = self.instances[0]
host = instance.get("host", None)
derive_api_url = instance.get("derive_api_url", None)
if not host:
if derive_api_url:
self.kubernetes_connector = utils.KubernetesConnector(self.connection_timeout)
else:
exception_message = "Either Kubernetes API url (host and port)" \
" or derive_api_url=True must be set" \
" when running Kubernetes API plugin."
self.log.error(exception_message)
raise Exception(exception_message)
else:
kubernetes_api_port = instance.get("kubernetes_api_port", "8080")
self.kubernetes_api = "http://{}:{}".format(host, kubernetes_api_port)
def check(self, instance):
kubernetes_labels = instance.get('kubernetes_labels', ["app"])
dimensions = self._set_dimensions(None, instance)
# Remove hostname from dimensions as the majority of the metrics are not tied to the hostname.
del dimensions['hostname']
kubernetes_api_health = self._get_api_health()
self.gauge("kubernetes.api.health_status", 0 if kubernetes_api_health else 1, dimensions,
hostname="SUPPRESS")
self._report_cluster_component_statuses(dimensions)
self._report_nodes_metrics(dimensions)
self._report_deployment_metrics(dimensions, kubernetes_labels)
self._report_replication_controller_metrics(dimensions, kubernetes_labels)
def _send_request(self, endpoint, as_json=True):
if self.kubernetes_connector:
return self.kubernetes_connector.get_request(endpoint, as_json=as_json)
else:
result = requests.get("{}/{}".format(self.kubernetes_api, endpoint))
return result.json() if as_json else result
def _get_api_health(self):
try:
result = self._send_request("healthz", as_json=False)
except Exception as e:
self.log.error("Error connecting to the health endpoint with exception {}".format(e))
return False
else:
# Return true if 'ok' is in result
return 'ok' in result.iter_lines()
def _report_cluster_component_statuses(self, dimensions):
try:
component_statuses = self._send_request("/api/v1/componentstatuses")
except Exception as e:
self.log.error("Error getting data from Kubernetes API - {}".format(e))
return
for component in component_statuses['items']:
component_dimensions = dimensions.copy()
component_dimensions['component_name'] = component['metadata']['name']
component_status = False
component_conditions = component['conditions']
for condition in component_conditions:
if 'type' in condition and condition['type'] == 'Healthy':
if condition['status'] == 'True':  # status is the string "True" or "False", not a boolean
component_status = True
break
self.gauge("kubernetes.component_status", 0 if component_status else 1, component_dimensions,
hostname="SUPPRESS")
def _set_kubernetes_dimensions(self, dimensions, type, metadata, kubernetes_labels):
dimensions['type'] = metadata['name']
dimensions['namespace'] = metadata['namespace']
if 'labels' in metadata:
labels = metadata['labels']
for label in kubernetes_labels:
if label in labels:
dimensions[label] = labels[label]
def _report_node_resource_metrics(self, resource, metrics, node_dimensions):
resource_metrics_dimensions = node_dimensions.copy()
for metric_name, metric_value in metrics.items():
if "gpu" in metric_name:
continue
if metric_name == "memory":
metric_name += "_bytes"
metric_value = utils.convert_memory_string_to_bytes(metric_value)
resource_metrics_dimensions.update({'unit': 'bytes'})
elif metric_name == "cpu":
resource_metrics_dimensions.update({'unit': 'cores'})
metric_name = "kubernetes.node.{}.{}".format(resource, metric_name)
self.gauge(metric_name, float(metric_value), resource_metrics_dimensions)
def _report_node_conditions_metrics(self, node_conditions, node_dimensions):
for condition in node_conditions:
condition_type = condition["type"]
if condition_type in NODE_CONDITIONS_MAP:
condition_map = NODE_CONDITIONS_MAP[condition_type]
condition_status = condition['status']
if condition_status == condition_map['expected_status']:
self.gauge("kubernetes." + condition_map['metric_name'], 0, node_dimensions)
else:
value_meta = {"reason": condition['message'][:1024]}
self.gauge("kubernetes." + condition_map['metric_name'], 1, node_dimensions, value_meta=value_meta)
def _report_nodes_metrics(self, dimensions):
try:
nodes = self._send_request("/api/v1/nodes")
except Exception as e:
self.log.error("Error getting node data from Kubernetes API - {}".format(e))
return
for node in nodes['items']:
node_dimensions = dimensions.copy()
node_dimensions['hostname'] = node['metadata']['name']
node_status = node['status']
self._report_node_conditions_metrics(node_status['conditions'], node_dimensions)
if 'spec' in node and 'unschedulable' in node['spec']:
if node['spec']['unschedulable']:
continue
node_capacity = node_status['capacity']
node_allocatable = node_status['allocatable']
self._report_node_resource_metrics('capacity', node_capacity, node_dimensions)
self._report_node_resource_metrics('allocatable', node_allocatable, node_dimensions)
def _report_deployment_metrics(self, dimensions, kubernetes_labels):
try:
deployments = self._send_request("/apis/extensions/v1beta1/deployments")
except Exception as e:
self.log.error("Error getting deployment data from Kubernetes API - {}".format(e))
return
for deployment in deployments['items']:
try:
deployment_dimensions = dimensions.copy()
self._set_kubernetes_dimensions(deployment_dimensions, "deployment", deployment['metadata'],
kubernetes_labels)
deployment_status = deployment['status']
deployment_replicas = deployment_status['replicas']
deployment_updated_replicas = deployment_status['updatedReplicas']
deployment_available_replicas = deployment_status['availableReplicas']
deployment_unavailable_replicas = deployment_replicas - deployment_available_replicas
self.gauge("kubernetes.deployment.replicas", deployment_replicas,
deployment_dimensions, hostname="SUPPRESS")
self.gauge("kubernetes.deployment.available_replicas", deployment_available_replicas,
deployment_dimensions, hostname="SUPPRESS")
self.gauge("kubernetes.deployment.unavailable_replicas", deployment_unavailable_replicas,
deployment_dimensions, hostname="SUPPRESS")
self.gauge("kubernetes.deployment.updated_replicas", deployment_updated_replicas,
deployment_dimensions, hostname="SUPPRESS")
except Exception as e:
self.log.info("Error {} parsing deployment {}. Skipping".format(e, deployment), exc_info=e)
def _report_replication_controller_metrics(self, dimensions, kubernetes_labels):
# Get namespaces first
try:
namespaces = self._send_request("/api/v1/namespaces")
except Exception as e:
self.log.error("Error getting namespaces from API - {}. "
"Skipping getting replication controller metrics".format(e))
return
for namespace in namespaces['items']:
namespace_name = namespace['metadata']['name']
try:
replication_controllers = self._send_request(
"/api/v1/namespaces/{}/replicationcontrollers".format(namespace_name))
except Exception as e:
self.log.error("Error getting replication controllers for the namespace {} "
"with the error {}".format(namespace_name, e))
continue
if 'items' not in replication_controllers:
continue
for rc in replication_controllers['items']:
rc_dimensions = dimensions.copy()
self._set_kubernetes_dimensions(rc_dimensions, "replication_controller", rc['metadata'],
kubernetes_labels)
rc_status = rc['status']
if 'replicas' not in rc_status or not rc_status['replicas']:
continue
self.gauge("kubernetes.replication.controller.replicas", rc_status['replicas'],
rc_dimensions, hostname="SUPPRESS")
self.gauge("kubernetes.replication.controller.ready_replicas", rc_status.get('readyReplicas', 0),
rc_dimensions, hostname="SUPPRESS")


@@ -0,0 +1,147 @@
# (C) Copyright 2017 Hewlett Packard Enterprise Development LP
import mock
import unittest
from monasca_agent.collector.checks_d.kubernetes_api import KubernetesAPI
SUCCESS = 0
FAILURE = 1
KUBERNETES_LABELS = ['app']
class TestKubernetesAPI(unittest.TestCase):
def setUp(self):
unittest.TestCase.setUp(self)
init_config = {}
agent_config = {}
self._kubernetes_api = KubernetesAPI('TestKubernetesAPI',
init_config,
agent_config)
self._gauge = mock.Mock()
self._kubernetes_api.gauge = self._gauge
self._hostname = 'SUPPRESS'
self._instance = {'derive_api_url': True}
self._base_dimensions = {}
def _get_api_health_check(self, instance, result_input):
mock_check = mock.Mock(return_value=result_input)
self._kubernetes_api._get_api_health = mock_check
self._kubernetes_api.check(instance)
def test_kubernetes_api_is_healthy(self):
api_health_result = True
self._get_api_health_check(self._instance,
api_health_result)
self._gauge.assert_called_with('kubernetes.api.health_status',
SUCCESS,
self._base_dimensions,
hostname=self._hostname)
def test_kubernetes_api_is_not_healthy(self):
api_health_result = False
self._get_api_health_check(self._instance,
api_health_result)
self._gauge.assert_called_with('kubernetes.api.health_status',
FAILURE,
self._base_dimensions,
hostname=self._hostname)
def _send_request(self, result_input):
mock_check = mock.Mock(return_value=result_input)
self._kubernetes_api._send_request = mock_check
def test_report_cluster_component_statuses(self):
component_statuses_request_result = {
u'items': [
{u'conditions': [{
u'status': u'True',
u'message': u'{"health": "true"}',
u'type': u'Healthy'}],
u'metadata': {u'creationTimestamp': None,
u'name': u'etcd-0'}}],
u'kind': u'ComponentStatusList',
u'apiVersion': u'v1',
u'metadata': {u'selfLink': u'/api/v1/componentstatuses'}}
self._send_request(component_statuses_request_result)
self._kubernetes_api._report_cluster_component_statuses(
self._base_dimensions)
self._gauge.assert_called_with('kubernetes.component_status',
SUCCESS,
{'component_name': u'etcd-0'},
hostname=self._hostname)
def test_nodes_capacity_metric(self):
nodes_request_result = {
u'items': [
{u'status': {
u'capacity': {u'cpu': u'4'},
u'allocatable': {},
u'daemonEndpoints': {
u'kubeletEndpoint': {u'Port': 10250}},
u'images': [{u'sizeBytes': 821774423,
u'names': [u'image_name',
u'image_name:latest']}],
u'conditions': [{u'status': u'False',
u'type': u'OutOfDisk'}]},
u'metadata': {u'name': u'node01',
u'uid': u'e3600619-2557-11e7-9d76-aab101'}}]}
self._send_request(nodes_request_result)
self._kubernetes_api._report_nodes_metrics(self._base_dimensions)
self._gauge.assert_called_with('kubernetes.node.capacity.cpu',
4.0,
{'hostname': u'node01',
'unit': 'cores'})
def test_nodes_allocatable_metric(self):
nodes_request_result = {
u'items': [
{u'status': {
u'capacity': {},
u'allocatable': {
u'alpha.kubernetes.io/nvidia-gpu': u'0',
u'pods': u'110'},
u'daemonEndpoints': {
u'kubeletEndpoint': {u'Port': 10250}},
u'images': [{u'sizeBytes': 821774423,
u'names': [u'image_name',
u'image_name:latest']}],
u'conditions': [{u'status': u'False',
u'type': u'OutOfDisk'},
{u'status': u'False',
u'type': u'MemoryPressure'},
{u'status': u'False',
u'type': u'DiskPressure'},
{u'status': u'True',
u'type': u'Ready'}]},
u'metadata': {u'name': u'node01',
u'uid': u'e3600619-2557-11e7-9d76-aa3201'}}]}
self._send_request(nodes_request_result)
self._kubernetes_api._report_nodes_metrics(self._base_dimensions)
self._gauge.assert_called_with('kubernetes.node.allocatable.pods',
110.0,
{'hostname': u'node01'})
def test_deployment_metrics(self):
deployments_request_result = {
u'items': [
{u'status': {
u'observedGeneration': 1,
u'updatedReplicas': 2,
u'availableReplicas': 3,
u'replicas': 4
},
u'metadata': {u'name': u'kube-controller-manager',
u'labels': {
u'k8s-app': u'kube-controller-manager'},
u'namespace': u'kube-system',
u'uid': u'e61835b9-2557-11e7-9d76-aabbcc201'}}]}
self._send_request(deployments_request_result)
self._kubernetes_api._report_deployment_metrics(self._base_dimensions,
KUBERNETES_LABELS)
self._gauge.assert_called_with(
'kubernetes.deployment.updated_replicas', 2,
{'type': u'kube-controller-manager',
'namespace': u'kube-system'},
hostname=self._hostname)