Add Kubernetes Connector util class

Class to establish a connection to the Kubernetes API if the
container is running within a Kubernetes container.

In Kubernetes access details are passed into the container in how
to connect to the API. This includes host, port, certs. The class
will configure those details for you.

Also add in method to convert memory from string format to bytes

Taken from original review https://review.openstack.org/#/c/391559/

Change-Id: Iea809ea9ece423ef47efffe7c37f3f6eccd81d44
This commit is contained in:
Michael James Hoppal 2017-01-31 09:08:37 -07:00 committed by Kaiyan Sheng
parent 2a8bf2926d
commit 03c202d629
3 changed files with 147 additions and 3 deletions

View File

@ -6,6 +6,8 @@
- [Examples](#examples)
- [Adding a new instance](#adding-a-new-instance)
- [Changing the current instance](#changing-the-current-instance)
- [Connector](#connector)
- [Kubernetes Connector](#kubernetes-connector)
- [License](#license)
<!-- END doctoc generated TOC please keep comment here to allow auto update -->
@ -135,7 +137,16 @@ output_config from modify_config:
}
```
# Connector
## Kubernetes Connector
Kubernetes Connector is a class within [monasca-collector utils](https://github.com/openstack/monasca-agent/blob/master/monasca_agent/collector/checks/utils.py)
that is used for connecting to the Kubernetes API from within a container that is running in a k8 cluster.
When a container is brought up in Kubernetes by default there are environmental variables passed in that include needed
configurations to connect to the API. Also, the cacert and token that is tied to the serviceaccount the container is
under is mounted to the container file system. This class processes both and allows requests to the Kubernetes API.
# License
(C) Copyright 2016 Hewlett Packard Enterprise Development LP
(C) Copyright 2016,2017 Hewlett Packard Enterprise Development LP

View File

@ -1,6 +1,16 @@
# (C) Copyright 2015 Hewlett Packard Enterprise Development Company LP
# (C) Copyright 2015,2017 Hewlett Packard Enterprise Development LP
import base64
import logging
import math
import os
import requests
from monasca_agent.common import exceptions
log = logging.getLogger(__name__)
DEFAULT_TIMEOUT = 20
def add_basic_auth(request, username, password):
@ -47,3 +57,118 @@ def get_tenant_list(config, log):
log.error(msg.format(e))
return tenants
def convert_memory_string_to_bytes(memory_string):
"""Conversion from memory represented in string format to bytes"""
if "m" in memory_string:
memory = float(memory_string.split('m')[0])
return memory / 1000
elif "K" in memory_string:
memory = float(memory_string.split('K')[0])
return _compute_memory_bytes(memory_string, memory, 1)
elif "M" in memory_string:
memory = float(memory_string.split('M')[0])
return _compute_memory_bytes(memory_string, memory, 2)
elif "G" in memory_string:
memory = float(memory_string.split('G')[0])
return _compute_memory_bytes(memory_string, memory, 3)
elif "T" in memory_string:
memory = float(memory_string.split('T')[0])
return _compute_memory_bytes(memory_string, memory, 4)
else:
return float(memory_string)
def _compute_memory_bytes(memory_string, memory, power):
if "i" in memory_string:
return memory * math.pow(1024, power)
return memory * math.pow(1000, power)
class KubernetesConnector(object):
"""Class for connecting to Kubernetes API from within a container running
in a Kubernetes environment
"""
CACERT_PATH = '/var/run/secrets/kubernetes.io/serviceaccount/ca.crt'
TOKEN_PATH = '/var/run/secrets/kubernetes.io/serviceaccount/token'
def __init__(self, connection_timeout):
self.api_url = None
self.api_verify = None
self.api_request_header = None
if connection_timeout is None:
self.connection_timeout = DEFAULT_TIMEOUT
else:
self.connection_timeout = connection_timeout
self._set_kubernetes_api_connection_info()
self._set_kubernetes_service_account_info()
def _set_kubernetes_api_connection_info(self):
"""Set kubernetes API string from default container environment
variables
"""
api_host = os.environ.get('KUBERNETES_SERVICE_HOST', "kubernetes")
api_port = os.environ.get('KUBERNETES_SERVICE_PORT', "443")
self.api_url = "https://{}:{}".format(api_host, api_port)
def _set_kubernetes_service_account_info(self):
"""Set cert and token info to included on requests to the API"""
try:
with open(self.TOKEN_PATH) as token_file:
token = token_file.read()
except Exception as e:
log.error("Unable to read token - {}. Defaulting to using no token".format(e))
token = None
self.api_request_header = {'Authorization': 'Bearer {}'.format(token)} if token else None
self.api_verify = self.CACERT_PATH if os.path.exists(self.CACERT_PATH) else False
def get_agent_pod_host(self, return_host_name=False):
"""Obtain host the agent is running on in Kubernetes.
Used when trying to connect to services running on the node (Kubelet, cAdvisor)
"""
# Get pod name and namespace from environment variables
pod_name = os.environ.get("AGENT_POD_NAME")
pod_namespace = os.environ.get("AGENT_POD_NAMESPACE")
if not pod_name:
raise exceptions.MissingEnvironmentVariables(
"pod_name is not set as environment variables cannot derive"
" host from Kubernetes API")
if not pod_namespace:
raise exceptions.MissingEnvironmentVariables(
"pod_namespace is not set as environment variables cannot "
"derive host from Kubernetes API")
pod_url = "/api/v1/namespaces/{}/pods/{}".format(pod_namespace, pod_name)
try:
agent_pod = self.get_request(pod_url)
except Exception as e:
exception_message = "Could not get agent pod from Kubernetes API" \
" to get host IP with error - {}".format(e)
log.exception(exception_message)
raise exceptions.KubernetesAPIConnectionError(exception_message)
if not return_host_name:
return agent_pod['status']['hostIP']
else:
return agent_pod['spec']['nodeName']
def get_request(self, request_endpoint, as_json=True, retried=False):
"""Sends request to Kubernetes API with given endpoint.
Will retry the request once, with updated token/cert, if unauthorized.
"""
request_url = "{}/{}".format(self.api_url, request_endpoint)
result = requests.get(request_url,
timeout=self.connection_timeout,
headers=self.api_request_header,
verify=self.api_verify)
if result.status_code >= 300:
if result.status_code == 401 and not retried:
log.info("Could not authenticate with Kubernetes API at the"
" first time. Rereading in cert and token.")
self._set_kubernetes_service_account_info()
return self.get_request(request_endpoint, as_json=as_json,
retried=True)
exception_message = "Could not obtain data from {} with the " \
"given status code {} and return text {}".\
format(request_url, result.status_code, result.text)
raise exceptions.KubernetesAPIConnectionError(exception_message)
return result.json() if as_json else result

View File

@ -1,4 +1,4 @@
# (C) Copyright 2015 Hewlett Packard Enterprise Development Company LP
# (C) Copyright 2015,2017 Hewlett Packard Enterprise Development LP
class Infinity(Exception):
@ -19,3 +19,11 @@ class NaN(CheckException):
class PathNotFound(Exception):
pass
class MissingEnvironmentVariables(Exception):
pass
class KubernetesAPIConnectionError(Exception):
pass