promenade/promenade/kubeclient.py

138 lines
4.6 KiB
Python

# Copyright 2018 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import falcon
import kubernetes
from kubernetes.client.rest import ApiException
from urllib3.exceptions import MaxRetryError
from promenade import logging
from promenade.exceptions import KubernetesApiError
from promenade.exceptions import KubernetesConfigException
from promenade.exceptions import NodeNotFoundException
from promenade.utils.success_message import SuccessMessage
LOG = logging.getLogger(__name__)
class KubeClient(object):
"""
Class for Kubernetes APIs client
"""
def __init__(self):
""" Set Kubernetes APIs connection """
try:
LOG.info('Loading in-cluster Kubernetes configuration.')
kubernetes.config.load_incluster_config()
except kubernetes.config.config_exception.ConfigException:
LOG.debug('Failed to load in-cluster configuration')
try:
LOG.info('Loading out-of-cluster Kubernetes configuration.')
kubernetes.config.load_kube_config()
except FileNotFoundError:
LOG.exception(
'FileNotFoundError: Failed to load Kubernetes config file.'
)
raise KubernetesConfigException
self.client = kubernetes.client.CoreV1Api()
def update_node_labels(self, node_name, input_labels):
"""
Updating node labels
Args:
node_name(str): node for which updating labels
input_labels(dict): input labels dict
Returns:
SuccessMessage(dict): API success response
"""
resp_body_succ = SuccessMessage('Update node labels', falcon.HTTP_200)
try:
existing_labels = self.get_node_labels(node_name)
update_labels = _get_update_labels(existing_labels, input_labels)
# If there is a change
if bool(update_labels):
body = {"metadata": {"labels": update_labels}}
self.client.patch_node(node_name, body)
return resp_body_succ.get_output_json()
except (ApiException, MaxRetryError) as e:
LOG.exception("An exception occurred during node labels update: " +
str(e))
raise KubernetesApiError
def get_node_labels(self, node_name):
"""
Get existing registered node labels
Args:
node_name(str): node of which getting labels
Returns:
dict: labels dict
"""
try:
response = self.client.read_node(node_name)
if response is not None:
return response.metadata.labels
else:
return {}
except (ApiException, MaxRetryError) as e:
LOG.exception("An exception occurred in fetching node labels: " +
str(e))
if hasattr(e, 'status') and str(e.status) == "404":
raise NodeNotFoundException
else:
raise KubernetesApiError
def _get_update_labels(existing_labels, input_labels):
"""
Helper function to add new labels, delete labels, override
existing labels
Args:
existing_labels(dict): Existing node labels
input_labels(dict): Input/Req. labels
Returns:
update_labels(dict): Node labels to be updated
or
input_labels(dict): Node labels to be updated
"""
update_labels = {}
# no existing labels found
if not existing_labels:
# filter delete label request since there is no labels set on a node
update_labels.update({
k: v
for k, v in input_labels.items() if v is not None
})
return update_labels
# new labels or overriding labels
update_labels.update({
k: v
for k, v in input_labels.items()
if k not in existing_labels or v != existing_labels[k]
})
# deleted labels
update_labels.update({
k: None
for k in existing_labels.keys()
if k not in input_labels and "kubernetes.io" not in k
})
return update_labels