From 7ffae8e9fce377a1e3e2b22721c0564f3aff46ae Mon Sep 17 00:00:00 2001 From: Michael James Hoppal Date: Tue, 31 Jan 2017 10:32:30 -0700 Subject: [PATCH] Add Kubernetes API Plugin Plugin that connects to the Kubernetes API to gather metrics about the Kubernetes environment. Taken from original review https://review.openstack.org/#/c/391559/ Change-Id: Ifff9285e9a2ac06d59383b986619ee62c59c712e --- conf.d/kubernetes_api.yaml.example | 18 ++ docs/Plugins.md | 59 +++++ .../collector/checks_d/kubernetes_api.py | 216 ++++++++++++++++++ tests/checks_d/test_kubernetes_api.py | 147 ++++++++++++ 4 files changed, 440 insertions(+) create mode 100644 conf.d/kubernetes_api.yaml.example create mode 100644 monasca_agent/collector/checks_d/kubernetes_api.py create mode 100644 tests/checks_d/test_kubernetes_api.py diff --git a/conf.d/kubernetes_api.yaml.example b/conf.d/kubernetes_api.yaml.example new file mode 100644 index 00000000..2748fb86 --- /dev/null +++ b/conf.d/kubernetes_api.yaml.example @@ -0,0 +1,18 @@ +# (C) Copyright 2017 Hewlett Packard Enterprise Development LP + +init_config: + # Timeout on GET requests endpoints + connection_timeout: 3 +instances: + # There is two options for connecting to the api. Either by passing in the host the api is running on and the port + # it is bound to or by deriving the api url from the kubernetes environment variables (if the agent is running in + # a kubernetes container. You must set one or the other. + - host: "127.0.0.1" + # Port of kubernetes master to connect to defaults to 8080 + # kubernetes_api_port: 8080 + + # Derive kubernetes api url from kubernetes environmental variables. 
+ # derive_api_url: True + + # Set of kubernetes labels that we search for in the kubernetes metadata to set as dimensions + # kubernetes_labels: ['k8s-app', 'version'] diff --git a/docs/Plugins.md b/docs/Plugins.md index f7fdcf0a..bc0a7755 100644 --- a/docs/Plugins.md +++ b/docs/Plugins.md @@ -57,6 +57,7 @@ - [The monasca.json_plugin.status Metric](#the-monascajson_pluginstatus-metric) - [Kafka Checks](#kafka-checks) - [Kubernetes](#kubernetes) + - [Kubernetes API](#kubernetes_api) - [KyotoTycoon](#kyototycoon) - [Libvirt VM Monitoring](#libvirt-vm-monitoring) - [Open vSwitch Neutron Router Monitoring](#open-vswitch-neutron-router-monitoring) @@ -155,6 +156,7 @@ The following plugins are delivered via setup as part of the standard plugin che | kafka_consumer | | | | kibana | **kibana_install_dir**/kibana.yml | Integration to Kibana | | kubernetes | | | +| kubernetes_api | | | | kyototycoon | | | | libvirt | | | | lighttpd | | | @@ -1303,6 +1305,18 @@ Sample configs: Without custom labels and host being manually set: +## Kubernetes_API + +This plugin collects metrics from the kubernetes api on kubernetes components, nodes, deployments and replication controllers. + +When setting the kubernetes configuration there is a parameter "kubernetes_labels" where it will look for kubernetes tags that are user defined to use as dimensions for replication controller and deployment metrics. + +There are two ways you can configure the plugin to connect to the kubernetes api. Either by setting the host and port or by setting the derive_api_url to True. If deriving the plugin sets the kubernetes api url by looking at the environment variables. 
(This should be used if the agent is running in a kubernetes container)
+
+Sample configs:
+
+Without custom labels:
+
 ```
 init_config:
     # Timeout on GET requests
@@ -1316,6 +1330,13 @@ instances:
 ```
 
 With custom labels and host being manually set:
+instances:
+    # Set to the host that the plugin will use when connecting to the Kubernetes API
+    - host: "127.0.0.1"
+      kubernetes_api_port: 8080
+```
+
+With custom labels:
 
 ```
 init_config:
@@ -1331,6 +1352,14 @@ instances:
 ```
 
 With custom labels and derive host being set:
+instances:
+    # Set to the host that the plugin will use when connecting to the Kubernetes API
+    - host: "127.0.0.1"
+      kubernetes_api_port: 8080
+      kubernetes_labels: ['k8s-app', 'version']
+```
+
+With custom labels and derive_api_url set to True:
 
 ```
 init_config:
@@ -1461,6 +1490,36 @@ Pod Phase Mapping:
 | 3 | Failed |
 | 4 | Unknown |
 
+instances:
+    - derive_api_url: True
+      kubernetes_labels: ['k8s-app', 'version']
+```
+
+Note this plugin only supports one instance in the config file.
+ +Metrics (Note for replication controller and deployment metrics they can also have custom dimensions set from the configuration option 'kubernetes_labels') + +| Metric Name | Dimensions | Semantics | +| ----------- | ---------- | --------- | +| kubernetes.api.health_status | | Health status of the api +| kubernetes.component_status | component_name | Status of cluster's components +| kubernetes.node.out_of_disk | hostname | The node is out of disk +| kubernetes.node.memory_pressure | hostname | Available memory on the node has satisfied an eviction threshold +| kubernetes.node.disk_pressure | hostname | Available disk space and inodes on either the node’s root filesystem or image filesystem has satisfied an eviction threshold +| kubernetes.node.ready_status | hostname | The ready status of the kubernetes node +| kubernetes.node.allocatable.memory_bytes | hostname, unit | Total allocatable memory in bytes available for scheduling on the node +| kubernetes.node.allocatable.cpu | hostname, unit | Total allocatable cpu cores available for scheduling on the node +| kubernetes.node.allocatable.pods | hostname | Total allocatable pods available for scheduling on the node +| kubernetes.node.capacity.memory_bytes | hostname, unit | Total memory on the node +| kubernetes.node.capacity.cpu | hostname, unit | Total amount of cpu cores on the node +| kubernetes.node.capacity.pods | hostname | Total amount of pods that could be run on the node +| kubernetes.deployment.available_replicas | deployment, namespace | The number of available replicas for the deployment +| kubernetes.deployment.replicas | deployment, namespace | The number of replicas for the deployment +| kubernetes.deployment.unavailable_replicas | deployment, namespace | The number of unavailable replicas for the deployment +| kubernetes.deployment.updated_replicas | deployment, namespace | The number of updated replicas for the deployment +| kubernetes.replication.controller.ready_replicas | 
replication_controller, namespace | The number of ready replicas for the replication controller +| kubernetes.replication.controller.replicas | replication_controller, namespace | The number of replicas for the replication controller + ## KyotoTycoon See [the example configuration](https://github.com/openstack/monasca-agent/blob/master/conf.d/kyototycoon.yaml.example) for how to configure the KyotoTycoon plugin. diff --git a/monasca_agent/collector/checks_d/kubernetes_api.py b/monasca_agent/collector/checks_d/kubernetes_api.py new file mode 100644 index 00000000..89d24df1 --- /dev/null +++ b/monasca_agent/collector/checks_d/kubernetes_api.py @@ -0,0 +1,216 @@ +# (C) Copyright 2017 Hewlett Packard Enterprise Development LP +import requests + +from monasca_agent.collector import checks +from monasca_agent.collector.checks import utils + +DEFAULT_TIMEOUT = 5 +NODE_CONDITIONS_MAP = { + "OutOfDisk": { + "metric_name": "node.out_of_disk", + "expected_status": "False" + }, + "MemoryPressure": { + "metric_name": "node.memory_pressure", + "expected_status": "False" + }, + "DiskPressure": { + "metric_name": "node.disk_pressure", + "expected_status": "False" + }, + "Ready": { + "metric_name": "node.ready_status", + "expected_status": "True" + } +} + + +class KubernetesAPI(checks.AgentCheck): + """Queries Kubernetes API to get metrics about the Kubernetes deployment + """ + def __init__(self, name, init_config, agent_config, instances=None): + checks.AgentCheck.__init__(self, name, init_config, agent_config, instances) + if instances is not None and len(instances) > 1: + raise Exception('Kubernetes api check only supports one configured instance.') + self.connection_timeout = int(init_config.get('connection_timeout', DEFAULT_TIMEOUT)) + self.kubernetes_connector = None + self.kubernetes_api = None + + def prepare_run(self): + """Set up Kubernetes connection information""" + instance = self.instances[0] + host = instance.get("host", None) + derive_api_url = 
instance.get("derive_api_url", None) + if not host: + if derive_api_url: + self.kubernetes_connector = utils.KubernetesConnector(self.connection_timeout) + else: + exception_message = "Either Kubernetes API url (host and port)" \ + " or derive_api_url=True must be set" \ + " when running Kubernetes API plugin." + self.log.error(exception_message) + raise Exception(exception_message) + else: + kubernetes_api_port = instance.get("kubernetes_api_port", "8080") + self.kubernetes_api = "http://{}:{}".format(host, kubernetes_api_port) + + def check(self, instance): + kubernetes_labels = instance.get('kubernetes_labels', ["app"]) + dimensions = self._set_dimensions(None, instance) + # Remove hostname from dimensions as the majority of the metrics are not tied to the hostname. + del dimensions['hostname'] + kubernetes_api_health = self._get_api_health() + self.gauge("kubernetes.api.health_status", 0 if kubernetes_api_health else 1, dimensions, + hostname="SUPPRESS") + self._report_cluster_component_statuses(dimensions) + self._report_nodes_metrics(dimensions) + self._report_deployment_metrics(dimensions, kubernetes_labels) + self._report_replication_controller_metrics(dimensions, kubernetes_labels) + + def _send_request(self, endpoint, as_json=True): + if self.kubernetes_connector: + return self.kubernetes_connector.get_request(endpoint, as_json=as_json) + else: + result = requests.get("{}/{}".format(self.kubernetes_api, endpoint)) + return result.json() if as_json else result + + def _get_api_health(self): + try: + result = self._send_request("healthz", as_json=False) + except Exception as e: + self.log.error("Error connecting to the health endpoint with exception {}".format(e)) + return False + else: + # Return true if 'ok' is in result + return 'ok' in result.iter_lines() + + def _report_cluster_component_statuses(self, dimensions): + try: + component_statuses = self._send_request("/api/v1/componentstatuses") + except Exception as e: + self.log.error("Error getting data 
from Kubernetes API - {}".format(e)) + return + for component in component_statuses['items']: + component_dimensions = dimensions.copy() + component_dimensions['component_name'] = component['metadata']['name'] + component_status = False + component_conditions = component['conditions'] + for condition in component_conditions: + if 'type' in condition and condition['type'] == 'Healthy': + if condition['status']: + component_status = True + break + self.gauge("kubernetes.component_status", 0 if component_status else 1, component_dimensions, + hostname="SUPPRESS") + + def _set_kubernetes_dimensions(self, dimensions, type, metadata, kubernetes_labels): + dimensions['type'] = metadata['name'] + dimensions['namespace'] = metadata['namespace'] + if 'labels' in metadata: + labels = metadata['labels'] + for label in kubernetes_labels: + if label in labels: + dimensions[label] = labels[label] + + def _report_node_resource_metrics(self, resource, metrics, node_dimensions): + resource_metrics_dimensions = node_dimensions.copy() + for metric_name, metric_value in metrics.items(): + if "gpu" in metric_name: + continue + if metric_name == "memory": + metric_name += "_bytes" + metric_value = utils.convert_memory_string_to_bytes(metric_value) + resource_metrics_dimensions.update({'unit': 'bytes'}) + elif metric_name == "cpu": + resource_metrics_dimensions.update({'unit': 'cores'}) + metric_name = "kubernetes.node.{}.{}".format(resource, metric_name) + self.gauge(metric_name, float(metric_value), resource_metrics_dimensions) + + def _report_node_conditions_metrics(self, node_conditions, node_dimensions): + for condition in node_conditions: + condition_type = condition["type"] + if condition_type in NODE_CONDITIONS_MAP: + condition_map = NODE_CONDITIONS_MAP[condition_type] + condition_status = condition['status'] + if condition_status == condition_map['expected_status']: + self.gauge("kubernetes." 
+ condition_map['metric_name'], 0, node_dimensions) + else: + value_meta = {"reason": condition['message'][:1024]} + self.gauge("kubernetes." + condition_map['metric_name'], 1, node_dimensions, value_meta=value_meta) + + def _report_nodes_metrics(self, dimensions): + try: + nodes = self._send_request("/api/v1/nodes") + except Exception as e: + self.log.error("Error getting node data from Kubernetes API - {}".format(e)) + return + for node in nodes['items']: + node_dimensions = dimensions.copy() + node_dimensions['hostname'] = node['metadata']['name'] + node_status = node['status'] + self._report_node_conditions_metrics(node_status['conditions'], node_dimensions) + if 'spec' in node and 'unschedulable' in node['spec']: + if node['spec']['unschedulable']: + continue + node_capacity = node_status['capacity'] + node_allocatable = node_status['allocatable'] + self._report_node_resource_metrics('capacity', node_capacity, node_dimensions) + self._report_node_resource_metrics('allocatable', node_allocatable, node_dimensions) + + def _report_deployment_metrics(self, dimensions, kubernetes_labels): + try: + deployments = self._send_request("/apis/extensions/v1beta1/deployments") + except Exception as e: + self.log.error("Error getting deployment data from Kubernetes API - {}".format(e)) + return + for deployment in deployments['items']: + try: + deployment_dimensions = dimensions.copy() + self._set_kubernetes_dimensions(deployment_dimensions, "deployment", deployment['metadata'], + kubernetes_labels) + deployment_status = deployment['status'] + deployment_replicas = deployment_status['replicas'] + deployment_updated_replicas = deployment_status['updatedReplicas'] + deployment_available_replicas = deployment_status['availableReplicas'] + deployment_unavailable_replicas = deployment_available_replicas - deployment_replicas + self.gauge("kubernetes.deployment.replicas", deployment_replicas, + deployment_dimensions, hostname="SUPPRESS") + 
self.gauge("kubernetes.deployment.available_replicas", deployment_available_replicas, + deployment_dimensions, hostname="SUPPRESS") + self.gauge("kubernetes.deployment.unavailable_replicas", deployment_unavailable_replicas, + deployment_dimensions, hostname="SUPPRESS") + self.gauge("kubernetes.deployment.updated_replicas", deployment_updated_replicas, + deployment_dimensions, hostname="SUPPRESS") + except Exception as e: + self.log.info("Error {} parsing deployment {}. Skipping".format(e, deployment), exc_info=e) + + def _report_replication_controller_metrics(self, dimensions, kubernetes_labels): + # Get namespaces first + try: + namespaces = self._send_request("/api/v1/namespaces") + except Exception as e: + self.log.error("Error getting namespaces from API - {}. " + "Skipping getting replication controller metrics".format(e)) + return + for namespace in namespaces['items']: + namespace_name = namespace['metadata']['name'] + try: + replication_controllers = self._send_request( + "/api/v1/namespaces/{}/replicationcontrollers".format(namespace_name)) + except Exception as e: + self.log.error("Error getting replication controllers for the namespace {} " + "with the error {}".format(namespace, e)) + continue + if 'items' not in replication_controllers: + continue + for rc in replication_controllers['items']: + rc_dimensions = dimensions.copy() + self._set_kubernetes_dimensions(rc_dimensions, "replication_controller", rc['metadata'], + kubernetes_labels) + rc_status = rc['status'] + if 'replicas' not in rc_status or not rc_status['replicas']: + continue + self.gauge("kubernetes.replication.controller.replicas", rc_status['replicas'], + rc_dimensions, hostname="SUPPRESS") + self.gauge("kubernetes.replication.controller.ready_replicas", rc_status['readyReplicas'], + rc_dimensions, hostname="SUPPRESS") diff --git a/tests/checks_d/test_kubernetes_api.py b/tests/checks_d/test_kubernetes_api.py new file mode 100644 index 00000000..0d7db5b2 --- /dev/null +++ 
b/tests/checks_d/test_kubernetes_api.py @@ -0,0 +1,147 @@ +# (C) Copyright 2017 Hewlett Packard Enterprise Development LP + +import mock +import unittest + +from monasca_agent.collector.checks_d.kubernetes_api import KubernetesAPI + +SUCCESS = 0 +FAILURE = 1 +KUBERNETES_LABELS = ['app'] + + +class TestKubernetesAPI(unittest.TestCase): + def setUp(self): + unittest.TestCase.setUp(self) + init_config = {} + agent_config = {} + self._kubernetes_api = KubernetesAPI('TestKubernetesAPI', + init_config, + agent_config) + self._gauge = mock.Mock() + self._kubernetes_api.gauge = self._gauge + self._hostname = 'SUPPRESS' + self._instance = {'derive_api_url': True} + self._base_dimensions = {} + + def _get_api_health_check(self, instance, result_input): + mock_check = mock.Mock(return_value=result_input) + self._kubernetes_api._get_api_health = mock_check + self._kubernetes_api.check(instance) + + def test_kubernetes_api_is_healthy(self): + api_health_result = True + self._get_api_health_check(self._instance, + api_health_result) + self._gauge.assert_called_with('kubernetes.api.health_status', + SUCCESS, + self._base_dimensions, + hostname=self._hostname) + + def test_kubernetes_api_is_not_healthy(self): + api_health_result = False + self._get_api_health_check(self._instance, + api_health_result) + + self._gauge.assert_called_with('kubernetes.api.health_status', + FAILURE, + self._base_dimensions, + hostname=self._hostname) + + def _send_request(self, result_input): + mock_check = mock.Mock(return_value=result_input) + self._kubernetes_api._send_request = mock_check + + def test_report_cluster_component_statuses(self): + component_statuses_request_result = { + u'items': [ + {u'conditions': [{ + u'status': u'True', + u'message': u'{"health": "true"}', + u'type': u'Healthy'}], + u'metadata': {u'creationTimestamp': None, + u'name': u'etcd-0'}}], + u'kind': u'ComponentStatusList', + u'apiVersion': u'v1', + u'metadata': {u'selfLink': u'/api/v1/componentstatuses'}} + 
self._send_request(component_statuses_request_result) + self._kubernetes_api._report_cluster_component_statuses( + self._base_dimensions) + self._gauge.assert_called_with('kubernetes.component_status', + SUCCESS, + {'component_name': u'etcd-0'}, + hostname=self._hostname) + + def test_nodes_capacity_metric(self): + nodes_request_result = { + u'items': [ + {u'status': { + u'capacity': {u'cpu': u'4'}, + u'allocatable': {}, + u'daemonEndpoints': { + u'kubeletEndpoint': {u'Port': 10250}}, + u'images': [{u'sizeBytes': 821774423, + u'names': [u'image_name', + u'image_name:latest']}], + u'conditions': [{u'status': u'False', + u'type': u'OutOfDisk'}]}, + u'metadata': {u'name': u'node01', + u'uid': u'e3600619-2557-11e7-9d76-aab101'}}]} + self._send_request(nodes_request_result) + self._kubernetes_api._report_nodes_metrics(self._base_dimensions) + self._gauge.assert_called_with('kubernetes.node.capacity.cpu', + 4.0, + {'hostname': u'node01', + 'unit': 'cores'}) + + def test_nodes_allocatable_metric(self): + nodes_request_result = { + u'items': [ + {u'status': { + u'capacity': {}, + u'allocatable': { + u'alpha.kubernetes.io/nvidia-gpu': u'0', + u'pods': u'110'}, + u'daemonEndpoints': { + u'kubeletEndpoint': {u'Port': 10250}}, + u'images': [{u'sizeBytes': 821774423, + u'names': [u'image_name', + u'image_name:latest']}], + u'conditions': [{u'status': u'False', + u'type': u'OutOfDisk'}, + {u'status': u'False', + u'type': u'MemoryPressure'}, + {u'status': u'False', + u'type': u'DiskPressure'}, + {u'status': u'True', + u'type': u'Ready'}]}, + u'metadata': {u'name': u'node01', + u'uid': u'e3600619-2557-11e7-9d76-aa3201'}}]} + self._send_request(nodes_request_result) + self._kubernetes_api._report_nodes_metrics(self._base_dimensions) + self._gauge.assert_called_with('kubernetes.node.allocatable.pods', + 110.0, + {'hostname': u'node01'}) + + def test_deployment_metrics(self): + deployments_request_result = { + u'items': [ + {u'status': { + u'observedGeneration': 1, + 
u'updatedReplicas': 2, + u'availableReplicas': 3, + u'replicas': 4 + }, + u'metadata': {u'name': u'kube-controller-manager', + u'labels': { + u'k8s-app': u'kube-controller-manager'}, + u'namespace': u'kube-system', + u'uid': u'e61835b9-2557-11e7-9d76-aabbcc201'}}]} + self._send_request(deployments_request_result) + self._kubernetes_api._report_deployment_metrics(self._base_dimensions, + KUBERNETES_LABELS) + self._gauge.assert_called_with( + 'kubernetes.deployment.updated_replicas', 2, + {'type': u'kube-controller-manager', + 'namespace': u'kube-system'}, + hostname=self._hostname)