config/sysinv/sysinv/sysinv/sysinv/common/kubernetes.py

1006 lines
39 KiB
Python

#
# Copyright (c) 2013-2021 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# All Rights Reserved.
#
""" System Inventory Kubernetes Utilities and helper functions."""
from __future__ import absolute_import
from distutils.version import LooseVersion
import json
import os
import re
import time
from kubernetes import config
from kubernetes import client
from kubernetes.client import Configuration
from kubernetes.client.rest import ApiException
from kubernetes.client.models.v1_container_image import V1ContainerImage
from kubernetes.stream import stream
from six.moves import http_client as httplib
from oslo_log import log as logging
from sysinv.common import exception
LOG = logging.getLogger(__name__)
# Kubernetes groups
CERT_MANAGER_GROUP = 'cert-manager.io'
# Kubernetes API versions
V1_ALPHA_2 = 'v1alpha2'
# Kubernetes Files
KUBERNETES_ADMIN_CONF = '/etc/kubernetes/admin.conf'
KUBERNETES_ROOTCA_CERT = '/etc/kubernetes/pki/ca.crt'
KUBERNETES_NEW_ROOTCA_CERT = '/etc/kubernetes/pki/ca_new.crt'
KUBERNETES_APISERVER_CERT = '/etc/kubernetes/pki/apiserver.crt'
# Kubernetes clusters
KUBERNETES_CLUSTER_DEFAULT = "kubernetes"
# Kubernetes users
KUBERNETES_ADMIN_USER = "kubernetes-admin"
# Possible states for each supported kubernetes version
KUBE_STATE_AVAILABLE = 'available'
KUBE_STATE_UNAVAILABLE = 'unavailable'
KUBE_STATE_ACTIVE = 'active'
KUBE_STATE_PARTIAL = 'partial'
# Kubernetes namespaces
NAMESPACE_KUBE_SYSTEM = 'kube-system'
NAMESPACE_DEPLOYMENT = 'deployment'
# Kubernetes control plane components
KUBE_APISERVER = 'kube-apiserver'
KUBE_CONTROLLER_MANAGER = 'kube-controller-manager'
KUBE_SCHEDULER = 'kube-scheduler'
# Kubernetes upgrade states
KUBE_UPGRADE_STARTED = 'upgrade-started'
KUBE_UPGRADE_DOWNLOADING_IMAGES = 'downloading-images'
KUBE_UPGRADE_DOWNLOADING_IMAGES_FAILED = 'downloading-images-failed'
KUBE_UPGRADE_DOWNLOADED_IMAGES = 'downloaded-images'
KUBE_UPGRADING_NETWORKING = 'upgrading-networking'
KUBE_UPGRADING_NETWORKING_FAILED = 'upgrading-networking-failed'
KUBE_UPGRADED_NETWORKING = 'upgraded-networking'
KUBE_UPGRADING_FIRST_MASTER = 'upgrading-first-master'
KUBE_UPGRADING_FIRST_MASTER_FAILED = 'upgrading-first-master-failed'
KUBE_UPGRADED_FIRST_MASTER = 'upgraded-first-master'
KUBE_UPGRADING_SECOND_MASTER = 'upgrading-second-master'
KUBE_UPGRADING_SECOND_MASTER_FAILED = 'upgrading-second-master-failed'
KUBE_UPGRADED_SECOND_MASTER = 'upgraded-second-master'
KUBE_UPGRADING_KUBELETS = 'upgrading-kubelets'
KUBE_UPGRADE_COMPLETE = 'upgrade-complete'
# Kubernetes host upgrade statuses
KUBE_HOST_UPGRADING_CONTROL_PLANE = 'upgrading-control-plane'
KUBE_HOST_UPGRADING_CONTROL_PLANE_FAILED = 'upgrading-control-plane-failed'
KUBE_HOST_UPGRADING_KUBELET = 'upgrading-kubelet'
KUBE_HOST_UPGRADING_KUBELET_FAILED = 'upgrading-kubelet-failed'
KUBE_HOST_UPGRADED_KUBELET = 'upgraded-kubelet'
# Kubernetes rootca update states
KUBE_ROOTCA_UPDATE_STARTED = 'update-started'
KUBE_ROOTCA_UPDATE_CERT_UPLOADED = 'update-new-rootca-cert-uploaded'
KUBE_ROOTCA_UPDATE_CERT_GENERATED = 'update-new-rootca-cert-generated'
KUBE_ROOTCA_UPDATING_PODS_TRUSTBOTHCAS = 'updating-pods-trust-both-cas'
KUBE_ROOTCA_UPDATED_PODS_TRUSTBOTHCAS = 'updated-pods-trust-both-cas'
KUBE_ROOTCA_UPDATING_PODS_TRUSTBOTHCAS_FAILED = 'updating-pods-trust-both-cas-failed'
KUBE_ROOTCA_UPDATING_PODS_TRUSTNEWCA = 'updating-pods-trust-new-ca'
KUBE_ROOTCA_UPDATED_PODS_TRUSTNEWCA = 'updated-pods-trust-new-ca'
KUBE_ROOTCA_UPDATING_PODS_TRUSTNEWCA_FAILED = 'updating-pods-trust-new-ca-failed'
KUBE_ROOTCA_UPDATE_COMPLETED = 'update-completed'
KUBE_ROOTCA_UPDATE_ABORTED = 'update-aborted'
# Kubernetes rootca host update states
KUBE_ROOTCA_UPDATING_HOST_TRUSTBOTHCAS = 'updating-host-trust-both-cas'
KUBE_ROOTCA_UPDATED_HOST_TRUSTBOTHCAS = 'updated-host-trust-both-cas'
KUBE_ROOTCA_UPDATING_HOST_TRUSTBOTHCAS_FAILED = 'updating-host-trust-both-cas-failed'
KUBE_ROOTCA_UPDATING_HOST_UPDATECERTS = 'updating-host-update-certs'
KUBE_ROOTCA_UPDATED_HOST_UPDATECERTS = 'updated-host-update-certs'
KUBE_ROOTCA_UPDATING_HOST_UPDATECERTS_FAILED = 'updating-host-update-certs-failed'
KUBE_ROOTCA_UPDATING_HOST_TRUSTNEWCA = 'updating-host-trust-new-ca'
KUBE_ROOTCA_UPDATED_HOST_TRUSTNEWCA = 'updated-host-trust-new-ca'
KUBE_ROOTCA_UPDATING_HOST_TRUSTNEWCA_FAILED = 'updating-host-trust-new-ca-failed'
# Kubeadm and Kubelet default versions
KUBERNETES_DEFAULT_VERSION = '1.18.1'
# Kubernetes constants
MANIFEST_APPLY_TIMEOUT = 60 * 15
MANIFEST_APPLY_INTERVAL = 10
POD_START_TIMEOUT = 60 * 2
POD_START_INTERVAL = 10
def get_kube_versions():
"""Provides a list of supported kubernetes versions."""
return [
{'version': 'v1.18.1',
'upgrade_from': [],
'downgrade_to': [],
'applied_patches': [],
'available_patches': [],
},
{'version': 'v1.19.13',
'upgrade_from': ['v1.18.1'],
'downgrade_to': [],
'applied_patches': [],
'available_patches': [],
},
{'version': 'v1.20.9',
'upgrade_from': ['v1.19.13'],
'downgrade_to': [],
'applied_patches': [],
'available_patches': [],
},
{'version': 'v1.21.3',
'upgrade_from': ['v1.20.9'],
'downgrade_to': [],
'applied_patches': [],
'available_patches': [],
},
]
def is_kube_version_supported(kube_version, min_version=None, max_version=None):
"""Check if the k8s version is supported by the application.
:param kube_version: the running or target k8s version
:param min_version (optional): minimum k8s version supported by the app
:param max_version (optional): maximum k8s version supported by the app
:returns bool: True if k8s version is supported
"""
if ((min_version is not None and LooseVersion(kube_version) < LooseVersion(min_version)) or
(max_version is not None and LooseVersion(kube_version) > LooseVersion(max_version))):
return False
return True
def get_kube_networking_upgrade_version(kube_upgrade):
"""Determine the version that kubernetes networking
should be upgraded to."""
if kube_upgrade.state in [
KUBE_UPGRADE_STARTED,
KUBE_UPGRADE_DOWNLOADING_IMAGES,
KUBE_UPGRADE_DOWNLOADING_IMAGES_FAILED,
KUBE_UPGRADE_DOWNLOADED_IMAGES]:
return kube_upgrade.from_version
else:
return kube_upgrade.to_version
def is_k8s_configured():
"""Check to see if the k8s admin config file exists."""
if os.path.isfile(KUBERNETES_ADMIN_CONF):
return True
return False
# https://github.com/kubernetes-client/python/issues/895
# If a container image contains no tag or digest, patch/list
# node requests sent via python Kubernetes client will be
# returned with exception because python Kubernetes client
# deserializes the ContainerImage response from kube-apiserver
# and it fails the validation due to the empty image name.
#
# Implement this workaround to replace the V1ContainerImage.names
# in the python Kubernetes client to bypass the "none image"
# check because the error is not from kubernetes. If patching
# a node with a new host label, we can see the label is
# created successfully in Kubernetes.
#
# This workaround should be removed if the proposed solutions
# can be made in kubernetes or a workaround can be implemented
# in containerd.
# https://github.com/kubernetes/kubernetes/pull/79018
# https://github.com/containerd/containerd/issues/4771
def names(self, names):
"""Monkey patch V1ContainerImage with this to set the names."""
self._names = names
# Replacing address of "names" in V1ContainerImage
# with the "names" defined above
V1ContainerImage.names = V1ContainerImage.names.setter(names)
class KubeOperator(object):
def __init__(self):
self._kube_client_batch = None
self._kube_client_core = None
self._kube_client_custom_objects = None
self._kube_client_admission_registration = None
def _load_kube_config(self):
if not is_k8s_configured():
raise exception.KubeNotConfigured()
config.load_kube_config(KUBERNETES_ADMIN_CONF)
# Workaround: Turn off SSL/TLS verification
c = Configuration()
c.verify_ssl = False
Configuration.set_default(c)
return c
def _get_kubernetesclient_batch(self):
if not self._kube_client_batch:
self._load_kube_config()
self._kube_client_batch = client.BatchV1Api()
return self._kube_client_batch
def _get_kubernetesclient_core(self):
if not self._kube_client_core:
self._load_kube_config()
self._kube_client_core = client.CoreV1Api()
return self._kube_client_core
def _get_kubernetesclient_custom_objects(self):
if not self._kube_client_custom_objects:
self._load_kube_config()
self._kube_client_custom_objects = client.CustomObjectsApi()
return self._kube_client_custom_objects
def _get_kubernetesclient_admission_registration(self):
if not self._kube_client_admission_registration:
self._load_kube_config()
self._kube_client_admission_registration = client.AdmissionregistrationV1beta1Api()
return self._kube_client_admission_registration
def kube_get_kubernetes_config(self):
return self._load_kube_config()
def kube_patch_node(self, name, body):
try:
api_response = self._get_kubernetesclient_core().patch_node(name, body)
LOG.debug("Response: %s" % api_response)
except ApiException as e:
if e.status == httplib.UNPROCESSABLE_ENTITY:
reason = json.loads(e.body).get('message', "")
raise exception.HostLabelInvalid(reason=reason)
elif e.status == httplib.NOT_FOUND:
raise exception.KubeNodeNotFound(name=name)
else:
raise
except Exception as e:
LOG.error("Kubernetes exception in kube_patch_node: %s" % e)
raise
def kube_get_nodes(self):
try:
api_response = self._get_kubernetesclient_core().list_node()
LOG.debug("Response: %s" % api_response)
return api_response.items
except Exception as e:
LOG.error("Kubernetes exception in kube_get_nodes: %s" % e)
raise
def kube_namespaced_pods_exist(self, namespace):
LOG.debug("kube_namespaced_pods_exist, namespace=%s" %
(namespace))
try:
api_response = self._get_kubernetesclient_core().list_namespaced_pod(
namespace)
if api_response.items:
return True
else:
return False
except ApiException as e:
LOG.error("Kubernetes exception in list_namespaced_pod: %s" % e)
raise
def kube_get_image_by_selector(self, template_name, namespace, container_name):
LOG.debug("kube_get_image_by_selector template_name=%s, namespace=%s" %
(template_name, namespace))
try:
# Retrieve the named pod.
api_response = self._get_kubernetesclient_core().list_namespaced_pod(
namespace, label_selector="name=%s" % template_name)
for pod in api_response.items:
if template_name in pod.metadata.name:
for container in pod.spec.containers:
if container.name == container_name:
return container.image
return None
except ApiException as e:
LOG.error("Kubernetes exception in list_namespaced_pod: %s" % e)
raise
def kube_get_image_by_pod_name(self, pod_name, namespace, container_name):
"""Returns the image for the specified container."""
LOG.debug("kube_get_image_by_pod_name pod_name=%s, namespace=%s, "
"container_name=%s" % (pod_name, namespace, container_name))
try:
# Retrieve the named pod
api_response = \
self._get_kubernetesclient_core().list_namespaced_pod(
namespace, field_selector="metadata.name=%s" % pod_name)
# We expect only one pod with this name
if len(api_response.items) != 1:
LOG.warn("Expected one pod with pod_name=%s, namespace=%s, "
"container_name=%s but found %d" %
(pod_name, namespace, container_name,
len(api_response.items)))
# Use the first pod
if len(api_response.items) >= 1:
pod = api_response.items[0]
for container in pod.spec.containers:
if container.name == container_name:
return container.image
return None
except ApiException as e:
LOG.error("Kubernetes exception in list_namespaced_pod: %s" % e)
raise
def kube_create_namespace(self, namespace):
body = {'metadata': {'name': namespace}}
c = self._get_kubernetesclient_core()
try:
c.create_namespace(body)
except ApiException as e:
if e.status == httplib.CONFLICT:
# Already exist
LOG.warn("Namespace %s already exist." % namespace)
else:
LOG.error("Failed to create Namespace %s: %s" % (namespace, e.body))
raise
except Exception as e:
LOG.error("Kubernetes exception in "
"_kube_create_namespace %s: %s" % (namespace, e))
raise
def kube_get_namespace(self, namespace):
c = self._get_kubernetesclient_core()
try:
c.read_namespace(namespace)
return True
except ApiException as e:
if e.status == httplib.NOT_FOUND:
return False
else:
LOG.error("Failed to get Namespace %s: %s" % (namespace, e.body))
raise
except Exception as e:
LOG.error("Kubernetes exception in "
"kube_get_namespace %s: %s" % (namespace, e))
raise
def kube_get_namespace_name_list(self):
c = self._get_kubernetesclient_core()
try:
ns_list = c.list_namespace()
return list(set(ns.metadata.name for ns in ns_list.items))
except Exception as e:
LOG.error("Failed to get Namespace list: %s" % e)
raise
def kube_list_secret_for_all_namespaces(self, selector=None):
c = self._get_kubernetesclient_core()
try:
secret_list = c.list_secret_for_all_namespaces(field_selector=selector)
return secret_list.items
except ApiException as e:
if e.status == httplib.NOT_FOUND:
return None
else:
LOG.error("Failed to list secrets in all namespaces %s" % e.body)
raise
except Exception as e:
LOG.exception(e)
raise
def kube_list_secret(self, namespace):
c = self._get_kubernetesclient_core()
try:
secret_list = c.list_namespaced_secret(namespace)
return secret_list.items
except ApiException as e:
if e.status == httplib.NOT_FOUND:
return None
else:
LOG.error("Failed to list secret under "
"Namespace %s: %s" % (namespace, e.body))
raise
except Exception as e:
LOG.exception(e)
raise
def kube_get_secret(self, name, namespace):
c = self._get_kubernetesclient_core()
try:
return c.read_namespaced_secret(name, namespace)
except ApiException as e:
if e.status == httplib.NOT_FOUND:
return None
else:
LOG.error("Failed to get Secret %s under "
"Namespace %s: %s" % (name, namespace, e.body))
raise
except Exception as e:
LOG.error("Kubernetes exception in kube_get_secret: %s" % e)
raise
def kube_create_secret(self, namespace, body):
c = self._get_kubernetesclient_core()
try:
c.create_namespaced_secret(namespace, body)
except Exception as e:
LOG.error("Failed to create Secret %s under Namespace %s: "
"%s" % (body['metadata']['name'], namespace, e))
raise
def kube_copy_secret(self, name, src_namespace, dst_namespace):
c = self._get_kubernetesclient_core()
try:
body = c.read_namespaced_secret(name, src_namespace, export=True)
body.metadata.namespace = dst_namespace
c.create_namespaced_secret(dst_namespace, body)
except Exception as e:
LOG.error("Failed to copy Secret %s from Namespace %s to Namespace "
"%s: %s" % (name, src_namespace, dst_namespace, e))
raise
def kube_patch_secret(self, name, namespace, body):
c = self._get_kubernetesclient_core()
try:
return c.patch_namespaced_secret(name, namespace, body)
except Exception as e:
LOG.error("Failed to patch Secret %s under Namespace %s: "
"%s" % (name, namespace, e))
raise
def kube_delete_persistent_volume_claim(self, namespace, **kwargs):
c = self._get_kubernetesclient_core()
try:
c.delete_collection_namespaced_persistent_volume_claim(
namespace, **kwargs)
except Exception as e:
LOG.error("Failed to delete Persistent Volume Claim "
"under Namespace %s: %s" % (namespace, e))
raise
def kube_delete_secret(self, name, namespace, **kwargs):
body = {}
if kwargs:
body.update(kwargs)
c = self._get_kubernetesclient_core()
try:
return c.delete_namespaced_secret(name, namespace, body)
except ApiException as e:
if e.status == httplib.NOT_FOUND:
LOG.warn("Secret %s under Namespace %s "
"not found." % (name, namespace))
else:
LOG.error("Failed to clean up Secret %s under "
"Namespace %s: %s" % (name, namespace, e.body))
raise
except Exception as e:
LOG.error("Kubernetes exception in kube_delete_secret: %s" % e)
raise
def kube_delete_namespace(self, namespace, **kwargs):
body = {}
if kwargs:
body.update(kwargs)
c = self._get_kubernetesclient_core()
try:
c.delete_namespace(namespace, body)
except ApiException as e:
if e.status == httplib.NOT_FOUND:
LOG.warn("Namespace %s not found." % namespace)
else:
LOG.error("Failed to clean up Namespace %s: "
"%s" % (namespace, e.body))
raise
except Exception as e:
LOG.error("Kubernetes exception in kube_delete_namespace: %s" % e)
raise
def kube_get_config_map(self, name, namespace):
c = self._get_kubernetesclient_core()
try:
c.read_namespaced_config_map(name, namespace)
return True
except ApiException as e:
if e.status == httplib.NOT_FOUND:
return False
else:
LOG.error("Failed to get ConfigMap %s under "
"Namespace %s: %s" % (name, namespace, e.body))
raise
except Exception as e:
LOG.error("Kubernetes exception in kube_get_config_map: %s" % e)
raise
def kube_read_config_map(self, name, namespace):
c = self._get_kubernetesclient_core()
try:
configmap = c.read_namespaced_config_map(name, namespace)
return configmap
except ApiException as e:
if e.status == httplib.NOT_FOUND:
LOG.error("Failed to read Configmap")
return None
else:
LOG.error("Failed to get ConfigMap %s under "
"Namespace %s: %s" % (name, namespace, e.body))
raise
except Exception as e:
LOG.error("Kubernetes exception in kube_read_config_map: %s" % e)
raise
return None
def kube_create_config_map(self, namespace, body):
c = self._get_kubernetesclient_core()
try:
c.create_namespaced_config_map(namespace, body)
except Exception as e:
LOG.error("Failed to create ConfigMap %s under Namespace %s: "
"%s" % (body['metadata']['name'], namespace, e))
raise
def kube_copy_config_map(self, name, src_namespace, dst_namespace):
c = self._get_kubernetesclient_core()
try:
body = c.read_namespaced_config_map(name, src_namespace, export=True)
body.metadata.namespace = dst_namespace
c.create_namespaced_config_map(dst_namespace, body)
except Exception as e:
LOG.error("Failed to copy ConfigMap %s from Namespace %s to Namespace "
"%s: %s" % (name, src_namespace, dst_namespace, e))
raise
def kube_delete_config_map(self, name, namespace, **kwargs):
body = {}
if kwargs:
body.update(kwargs)
c = self._get_kubernetesclient_core()
try:
c.delete_namespaced_config_map(name, namespace, body)
except ApiException as e:
if e.status == httplib.NOT_FOUND:
LOG.warn("ConfigMap %s under Namespace %s "
"not found." % (name, namespace))
else:
LOG.error("Failed to clean up ConfigMap %s under "
"Namespace %s: %s" % (name, namespace, e.body))
raise
except Exception as e:
LOG.error("Kubernetes exception in kube_delete_config_map: %s" % e)
raise
def kube_delete_collection_namespaced_job(self, namespace, label):
c = self._get_kubernetesclient_batch()
try:
c.delete_collection_namespaced_job(namespace, label_selector=label)
except Exception as e:
LOG.error("Failed to delete Jobs with label %s under "
"Namespace %s: %s" % (label, namespace, e))
raise
def get_custom_resource(self, group, version, namespace, plural, name):
custom_resource_api = self._get_kubernetesclient_custom_objects()
try:
cert = custom_resource_api.get_namespaced_custom_object(
group,
version,
namespace,
plural,
name)
except ApiException as e:
if e.status == httplib.NOT_FOUND:
return None
else:
LOG.error("Fail to access %s:%s. %s" % (namespace, name, e))
raise
else:
return cert
def apply_custom_resource(self, group, version, namespace, plural, name, body):
custom_resource_api = self._get_kubernetesclient_custom_objects()
# if resource already exists we apply just a patch
cert = self.get_custom_resource(group, version, namespace, plural, name)
if cert:
custom_resource_api.patch_namespaced_custom_object(group,
version,
namespace,
plural,
name,
body)
else:
custom_resource_api.create_namespaced_custom_object(group,
version,
namespace,
plural,
body)
def delete_custom_resource(self, group, version, namespace, plural, name):
c = self._get_kubernetesclient_custom_objects()
body = {}
try:
c.delete_namespaced_custom_object(group, version, namespace,
plural, name, body)
except ApiException as ex:
if ex.reason == "Not Found":
LOG.warn("Failed to delete custom object, Namespace %s: %s"
% (namespace, str(ex.body).replace('\n', ' ')))
pass
except Exception as e:
LOG.error("Failed to delete custom object, Namespace %s: %s"
% (namespace, e))
raise
def kube_get_service_account(self, name, namespace):
c = self._get_kubernetesclient_core()
try:
return c.read_namespaced_service_account(name, namespace)
except ApiException as e:
if e.status == httplib.NOT_FOUND:
return None
else:
LOG.error("Failed to get ServiceAccount %s under "
"Namespace %s: %s" % (name, namespace, e.body))
raise
except Exception as e:
LOG.error("Kubernetes exception in kube_get_service_account: %s" % e)
raise
def kube_get_service_account_token(self, name, namespace):
sa = self.kube_get_service_account(name, namespace)
if not sa:
# ServiceAccount does not exist, no token available
return None
secret = self.kube_get_secret(sa.secrets[0].name, namespace)
return secret.data.get('token')
def kube_get_control_plane_pod_ready_status(self):
"""Returns the ready status of the control plane pods."""
c = self._get_kubernetesclient_core()
# First get a list of master nodes
master_nodes = list()
api_response = c.list_node(
label_selector="node-role.kubernetes.io/master")
for node in api_response.items:
master_nodes.append(node.metadata.name)
# Populate status dictionary
ready_status = dict()
for node_name in master_nodes:
for component in [KUBE_APISERVER,
KUBE_CONTROLLER_MANAGER,
KUBE_SCHEDULER]:
# Control plane pods are named by component and node.
# E.g. kube-apiserver-controller-0
pod_name = component + '-' + node_name
ready_status[pod_name] = None
# Retrieve the control plane pods
api_response = c.list_pod_for_all_namespaces(
label_selector="component in (%s,%s,%s)" % (
KUBE_APISERVER, KUBE_CONTROLLER_MANAGER, KUBE_SCHEDULER)
)
pods = api_response.items
for pod in pods:
if pod.status.conditions is not None:
for condition in pod.status.conditions:
if condition.type == "Ready":
ready_status[pod.metadata.name] = condition.status
return ready_status
def kube_get_control_plane_versions(self):
"""Returns the lowest control plane component version on each
master node."""
c = self._get_kubernetesclient_core()
# First get a list of master nodes
master_nodes = list()
api_response = c.list_node(
label_selector="node-role.kubernetes.io/master")
for node in api_response.items:
master_nodes.append(node.metadata.name)
node_versions = dict()
for node_name in master_nodes:
versions = list()
for component in [KUBE_APISERVER,
KUBE_CONTROLLER_MANAGER,
KUBE_SCHEDULER]:
# Control plane pods are named by component and node.
# E.g. kube-apiserver-controller-0
pod_name = component + '-' + node_name
image = self.kube_get_image_by_pod_name(
pod_name, NAMESPACE_KUBE_SYSTEM, component)
if image is not None:
versions.append(LooseVersion(image.rsplit(':')[-1]))
# Calculate the lowest version
node_versions[node_name] = str(min(versions))
return node_versions
def kube_get_kubelet_versions(self):
"""Returns the kubelet version on each node."""
c = self._get_kubernetesclient_core()
kubelet_versions = dict()
api_response = c.list_node()
for node in api_response.items:
kubelet_versions[node.metadata.name] = \
node.status.node_info.kubelet_version
return kubelet_versions
def kube_get_version_states(self):
"""Returns the state of each known kubernetes version."""
# Set counts to 0
version_counts = dict()
kube_versions = get_kube_versions()
for version in kube_versions:
version_counts[version['version']] = 0
# Count versions running on control plane
cp_versions = self.kube_get_control_plane_versions()
for cp_version in cp_versions.values():
if cp_version in version_counts:
version_counts[cp_version] += 1
else:
LOG.error("Unknown control plane version %s running." %
cp_version)
# Count versions running on kubelets
kubelet_versions = self.kube_get_kubelet_versions()
for kubelet_version in kubelet_versions.values():
if kubelet_version in version_counts:
version_counts[kubelet_version] += 1
else:
LOG.error("Unknown kubelet version %s running." %
kubelet_version)
version_states = dict()
active_candidates = list()
for version, count in version_counts.items():
if count > 0:
# This version is at least partially running
version_states[version] = KUBE_STATE_PARTIAL
active_candidates.append(version)
else:
# This version is not running anywhere
version_states[version] = KUBE_STATE_UNAVAILABLE
# If only a single version is running, then mark it as active
if len(active_candidates) == 1:
active_version = active_candidates[0]
version_states[active_version] = KUBE_STATE_ACTIVE
# mark the versions who can upgrade_from the active one as available
for version in kube_versions:
if active_version in version['upgrade_from']:
version_states[version['version']] = KUBE_STATE_AVAILABLE
return version_states
def kube_get_kubernetes_version(self):
"""Returns the kubernetes version from the kubadm config map."""
c = self._get_kubernetesclient_core()
# Get the kubernetes version from the kubeadm config map
config_map = c.read_namespaced_config_map('kubeadm-config',
NAMESPACE_KUBE_SYSTEM)
cluster_config = config_map.data['ClusterConfiguration']
match = re.search('\nkubernetesVersion: (.*)\n', cluster_config)
if match is None:
LOG.error("Unable to find kubernetesVersion in kubeadm-config %s" %
config_map)
return None
else:
return match.group(1)
def kube_get_all_pods(self):
c = self._get_kubernetesclient_core()
try:
api_response = c.list_pod_for_all_namespaces(watch=False)
return api_response.items
except Exception as e:
LOG.error("Kubernetes exception in "
"kube_get_pods: %s" % e)
raise
def kube_delete_pod(self, name, namespace, **kwargs):
body = {}
if kwargs:
body.update(kwargs)
c = self._get_kubernetesclient_core()
try:
api_response = c.delete_namespaced_pod(name, namespace, body)
LOG.debug("%s" % api_response)
return True
except ApiException as e:
if e.status == httplib.NOT_FOUND:
LOG.warn("Pod %s/%s not found." % (namespace, name))
return False
else:
LOG.error("Failed to delete Pod %s/%s: "
"%s" % (namespace, name, e.body))
raise
except Exception as e:
LOG.error("Kubernetes exception in kube_delete_pod: %s" % e)
raise
def kube_get_pod(self, name, namespace):
c = self._get_kubernetesclient_core()
try:
api_response = c.read_namespaced_pod(name, namespace)
return api_response
except ApiException as e:
if e.status == httplib.NOT_FOUND:
return None
else:
LOG.error("Failed to get Pod %s/%s: %s" % (namespace, name,
e.body))
raise
except Exception as e:
LOG.error("Kubernetes exception in "
"kube_get_pod %s/%s: %s" % (namespace, name, e))
def kube_get_pods_by_selector(self, namespace, label_selector,
field_selector):
c = self._get_kubernetesclient_core()
try:
api_response = c.list_namespaced_pod(namespace,
label_selector="%s" % label_selector,
field_selector="%s" % field_selector)
LOG.debug("Response: %s" % api_response)
return api_response.items
except ApiException as e:
LOG.error("Kubernetes exception in "
"kube_get_pods_by_selector %s/%s/%s: %s",
namespace, label_selector, field_selector, e)
raise
# NOTE: This is desired method to exec commands in a container.
# The minimal usage example indicates this can get separate streams for
# stdout and stderr. The code below produces a string of merged output,
# so we cannot deduce whether the provided exec_command is failing.
# This API can replace Popen/poll/kubectl exec calls if we peek at
# api_response. We require ability to poll, read and flush output from
# long running commands, wait for command completion, and timeout.
# See the following documentation:
# https://github.com/kubernetes-client/python/blob/master/examples/pod_exec.py
# https://github.com/kubernetes-client/python/blob/master/kubernetes/docs/CoreV1Api.md
def kube_exec_container_stream(self, name, namespace, exec_command, container=None):
c = self._get_kubernetesclient_core()
try:
api_response = stream(c.connect_get_namespaced_pod_exec,
name,
namespace,
container=container,
command=exec_command,
stderr=True, stdin=False,
stdout=True, tty=False)
return api_response
except ApiException as e:
LOG.error("Failed to exec Pod %s/%s: %s" % (namespace, name,
e.body))
raise
except Exception as e:
LOG.error("Kubernetes exception in "
"kube_exec_container %s/%s: %s" % (namespace, name, e))
raise
def kube_delete_validating_webhook_configuration(self, name, **kwargs):
c = self._get_kubernetesclient_admission_registration()
body = {}
if kwargs:
body.update(kwargs)
try:
c.delete_validating_webhook_configuration(name, body)
except ApiException as e:
if e.status == httplib.NOT_FOUND:
LOG.warn("ValidatingWebhookConfiguration %s "
"not found." % name)
else:
LOG.error("Failed to clean up ValidatingWebhookConfiguration "
"%s : %s" % (name, e.body))
raise
except Exception as e:
LOG.error("Kubernetes exception "
"in kube_delete_validating_webhook_configuration: %s" % e)
raise
def kube_get_validating_webhook_configurations_by_selector(self, label_selector, field_selector):
c = self._get_kubernetesclient_admission_registration()
try:
api_response = c.list_validating_webhook_configuration(
label_selector="%s" % label_selector,
field_selector="%s" % field_selector)
LOG.debug("kube_get_validating_webhook_configurations_by_selector "
"Response: %s" % api_response)
return api_response.items
except ApiException as e:
LOG.error("Kubernetes exception in "
"kube_get_validating_webhook_configurations_by_selector %s/%s: %s",
label_selector, field_selector, e)
raise
def kube_delete_mutating_webhook_configuration(self, name, **kwargs):
c = self._get_kubernetesclient_admission_registration()
body = {}
if kwargs:
body.update(kwargs)
try:
c.delete_mutating_webhook_configuration(name, body)
except ApiException as e:
if e.status == httplib.NOT_FOUND:
LOG.warn("MutatingWebhookConfiguration %s "
"not found." % name)
else:
LOG.error("Failed to clean up MutatingWebhookConfiguration "
"%s : %s" % (name, e.body))
raise
except Exception as e:
LOG.error("Kubernetes exception "
"in kube_delete_mutating_webhook_configuration: %s" % e)
raise
def kube_get_mutating_webhook_configurations_by_selector(self, label_selector, field_selector):
c = self._get_kubernetesclient_admission_registration()
try:
api_response = c.list_mutating_webhook_configuration(
label_selector="%s" % label_selector,
field_selector="%s" % field_selector)
LOG.debug("kube_get_mutating_webhook_configurations_by_selector "
"Response: %s" % api_response)
return api_response.items
except ApiException as e:
LOG.error("Kubernetes exception in "
"kube_get_mutating_webhook_configurations_by_selector %s/%s: %s",
label_selector, field_selector, e)
raise
def get_cert_secret(self, name, namespace, max_retries=4):
for i in range(0, max_retries):
secret = self.kube_get_secret(name, NAMESPACE_DEPLOYMENT)
if secret is not None:
return secret
time.sleep(5)
return None