distcloud/distributedcloud/dccommon/kubeoperator.py
Li Zhu d951fcf20b Debian: Update python-kubernetes function calls
On Debian, a newer version of python-kubernetes is used, and it has
some function definitions changes, mainly, for some functions,
the argument 'body' is no longer a positional argument, these have
been updated to work on both OS's.

Test Plan:
PASS: Subcloud delete on CentOS
PASS: Subcloud delete on Debian

Story: 2010119
Task: 45840

Signed-off-by: Li Zhu <li.zhu@windriver.com>
Change-Id: I7be9a09b715713c9f8d4c47241cf33558223a6a6
2022-07-21 02:41:16 +00:00

152 lines
5.0 KiB
Python

#
# Copyright (c) 2020-2022 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
from kubernetes import __version__ as K8S_MODULE_VERSION
from kubernetes import client
from kubernetes.client import Configuration
from kubernetes.client.rest import ApiException
from kubernetes import config
from oslo_log import log as logging
from six.moves import http_client as httplib
LOG = logging.getLogger(__name__)
K8S_MODULE_MAJOR_VERSION = int(K8S_MODULE_VERSION.split('.')[0])
KUBE_CONFIG_PATH = '/etc/kubernetes/admin.conf'
CERT_MANAGER_GROUP = 'cert-manager.io'
CERT_MANAGER_VERSION = 'v1'
CERT_MANAGER_CERTIFICATE = 'certificates'
class KubeOperator(object):
def __init__(self):
self._kube_client_batch = None
self._kube_client_core = None
self._kube_client_custom_objects = None
def _load_kube_config(self):
config.load_kube_config(KUBE_CONFIG_PATH)
if K8S_MODULE_MAJOR_VERSION < 12:
c = Configuration()
else:
c = Configuration().get_default_copy()
# Workaround: Turn off SSL/TLS verification
c.verify_ssl = False
Configuration.set_default(c)
def _get_kubernetesclient_batch(self):
if not self._kube_client_batch:
self._load_kube_config()
self._kube_client_batch = client.BatchV1Api()
return self._kube_client_batch
def _get_kubernetesclient_core(self):
if not self._kube_client_core:
self._load_kube_config()
self._kube_client_core = client.CoreV1Api()
return self._kube_client_core
def _get_kubernetesclient_custom_objects(self):
if not self._kube_client_custom_objects:
self._load_kube_config()
self._kube_client_custom_objects = client.CustomObjectsApi()
return self._kube_client_custom_objects
def kube_get_secret(self, name, namespace):
c = self._get_kubernetesclient_core()
try:
return c.read_namespaced_secret(name, namespace)
except ApiException as e:
if e.status == httplib.NOT_FOUND:
return None
else:
LOG.error("Failed to get Secret %s under "
"Namespace %s: %s" % (name, namespace, e.body))
raise
except Exception as e:
LOG.error("Kubernetes exception in kube_get_secret: %s" % e)
raise
def kube_delete_secret(self, name, namespace, **kwargs):
body = {}
if kwargs:
body.update(kwargs)
c = self._get_kubernetesclient_core()
try:
c.delete_namespaced_secret(name, namespace, body=body)
except ApiException as e:
if e.status == httplib.NOT_FOUND:
LOG.warn("Secret %s under Namespace %s "
"not found." % (name, namespace))
else:
LOG.error("Failed to clean up Secret %s under "
"Namespace %s: %s" % (name, namespace, e.body))
raise
except Exception as e:
LOG.error("Kubernetes exception in kube_delete_secret: %s" % e)
raise
def get_cert_manager_certificate(self, namespace, name):
custom_object_api = self._get_kubernetesclient_custom_objects()
try:
cert = custom_object_api.get_namespaced_custom_object(
CERT_MANAGER_GROUP,
CERT_MANAGER_VERSION,
namespace,
CERT_MANAGER_CERTIFICATE,
name)
except ApiException as e:
if e.status == httplib.NOT_FOUND:
return None
else:
LOG.error("Fail to access %s:%s. %s" % (namespace, name, e))
raise
else:
return cert
def apply_cert_manager_certificate(self, namespace, name, body):
custom_object_api = self._get_kubernetesclient_custom_objects()
cert = self.get_cert_manager_certificate(namespace, name)
if cert:
custom_object_api.patch_namespaced_custom_object(
CERT_MANAGER_GROUP,
CERT_MANAGER_VERSION,
namespace,
CERT_MANAGER_CERTIFICATE,
name,
body
)
else:
custom_object_api.create_namespaced_custom_object(
CERT_MANAGER_GROUP,
CERT_MANAGER_VERSION,
namespace,
CERT_MANAGER_CERTIFICATE,
body)
def delete_cert_manager_certificate(self, namespace, name):
custom_object_api = self._get_kubernetesclient_custom_objects()
try:
custom_object_api.delete_namespaced_custom_object(
CERT_MANAGER_GROUP,
CERT_MANAGER_VERSION,
namespace,
CERT_MANAGER_CERTIFICATE,
name,
body={}
)
except ApiException as e:
if e.status != httplib.NOT_FOUND:
LOG.error("Fail to delete %s:%s. %s" % (namespace, name, e))
raise