Drop Kubernetes Python client dependency

We depend on the Kubernetes Python client for several things such as
health checks & metrics polling.  Those are both run inside periodic
jobs which spawn in greenthreads.

The Kubernetes API uses it's own thread pools which seem to use
native pools and cause several different deadlocks when it comes to
logging.  Since we don't make extensive use of the Kubernetes API
and we want something that doesn't use any threadpools, we can
simply use a simple wrapper using Requests.

This patch takes care of dropping the dependency and refactoring
all the code to use this simple mechansim instead, which should
reduce the overall dependency list as well as avoid any deadlock
issues which are present in the upstream client.

Change-Id: If0b7c96cb77bba0c79a678c9885622f1fe0f7ebc
This commit is contained in:
Mohammed Naser 2021-12-04 11:16:18 +04:00 committed by Spyros Trigazis
parent a9b9ba2361
commit 6eb907cc89
8 changed files with 326 additions and 232 deletions

View File

@ -49,7 +49,6 @@ jsonschema==2.6.0
keystoneauth1==3.14.0
keystonemiddleware==9.0.0
kombu==5.0.1
kubernetes==12.0.0
linecache2==1.0.0
logutils==0.3.5
Mako==1.0.7
@ -124,6 +123,7 @@ repoze.lru==0.7
requests-oauthlib==0.8.0
requests-toolbelt==0.8.0
requests==2.20.1
requests-mock==1.2.0
requestsexceptions==1.4.0
restructuredtext-lint==1.1.3
rfc3986==1.2.0

View File

@ -12,134 +12,91 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import tempfile
from kubernetes import client as k8s_config
from kubernetes.client import api_client
from kubernetes.client.apis import core_v1_api
from kubernetes.client import configuration as k8s_configuration
from kubernetes.client import rest
from oslo_log import log as logging
import requests
from magnum.conductor.handlers.common.cert_manager import create_client_files
LOG = logging.getLogger(__name__)
class ApiClient(api_client.ApiClient):
def __init__(self, configuration=None, header_name=None,
header_value=None, cookie=None):
if configuration is None:
configuration = k8s_configuration.Configuration()
self.configuration = configuration
self.rest_client = rest.RESTClientObject(configuration)
self.default_headers = {}
if header_name is not None:
self.default_headers[header_name] = header_value
self.cookie = cookie
def __del__(self):
pass
def call_api(self, resource_path, method,
path_params=None, query_params=None, header_params=None,
body=None, post_params=None, files=None,
response_type=None, auth_settings=None,
_return_http_data_only=None, collection_formats=None,
_preload_content=True, _request_timeout=None, **kwargs):
"""Makes http request (synchronous) and return the deserialized data
:param resource_path: Path to method endpoint.
:param method: Method to call.
:param path_params: Path parameters in the url.
:param query_params: Query parameters in the url.
:param header_params: Header parameters to be
placed in the request header.
:param body: Request body.
:param post_params dict: Request post form parameters,
for `application/x-www-form-urlencoded`, `multipart/form-data`.
:param auth_settings list: Auth Settings names for the request.
:param response: Response data type.
:param files dict: key -> filename, value -> filepath,
for `multipart/form-data`.
:param _return_http_data_only: response data without head status code
and headers
:param collection_formats: dict of collection formats for path, query,
header, and post parameters.
:param _preload_content: if False, the urllib3.HTTPResponse object will
be returned without reading/decoding response
data. Default is True.
:param _request_timeout: timeout setting for this request. If one
number provided, it will be total request
timeout. It can also be a pair (tuple) of
(connection, read) timeouts.
:return: The method will return the response directly
class KubernetesAPI:
"""
return self.__call_api(resource_path, method,
path_params, query_params, header_params,
body, post_params, files,
response_type, auth_settings,
_return_http_data_only, collection_formats,
_preload_content, _request_timeout)
Simple Kubernetes API client using requests.
This API wrapper allows for a set of very simple operations to be
performed on a Kubernetes cluster using the `requests` library. The
reason behind it is that the native `kubernetes` library does not
seem to be quite thread-safe at the moment.
class K8sAPI(core_v1_api.CoreV1Api):
def _create_temp_file_with_content(self, content):
"""Creates temp file and write content to the file.
:param content: file content
:returns: temp file
Also, our interactions with the Kubernetes API are happening inside
Greenthreads so we don't need to use connection pooling on top of it,
in addition to pools not being something that you can disable with
the native Kubernetes API.
"""
try:
tmp = tempfile.NamedTemporaryFile(delete=True)
tmp.write(content)
tmp.flush()
except Exception as err:
LOG.error("Error while creating temp file: %s", err)
raise
return tmp
def __init__(self, context, cluster):
self.ca_file = None
self.cert_file = None
self.key_file = None
self.context = context
self.cluster = cluster
if cluster.magnum_cert_ref:
(self.ca_file, self.key_file,
self.cert_file) = create_client_files(cluster, context)
# Load certificates for cluster
(self.ca_file, self.key_file, self.cert_file) = create_client_files(
self.cluster, self.context
)
config = k8s_config.Configuration()
config.host = cluster.api_address
config.ssl_ca_cert = self.ca_file.name
config.cert_file = self.cert_file.name
config.key_file = self.key_file.name
def _request(self, method, url, json=True):
response = requests.request(
method,
url,
verify=self.ca_file.name,
cert=(self.cert_file.name, self.key_file.name)
)
response.raise_for_status()
if json:
return response.json()
else:
return response.text
# build a connection with Kubernetes master
client = ApiClient(configuration=config)
def get_healthz(self):
"""
Get the health of the cluster from API
"""
return self._request(
'GET',
f"{self.cluster.api_address}/healthz",
json=False
)
super(K8sAPI, self).__init__(client)
def list_node(self):
"""
List all nodes in the cluster.
:return: List of nodes.
"""
return self._request(
'GET',
f"{self.cluster.api_address}/api/v1/nodes"
)
def list_namespaced_pod(self, namespace):
"""
List all pods in the given namespace.
:param namespace: Namespace to list pods from.
:return: List of pods.
"""
return self._request(
'GET',
f"{self.cluster.api_address}/api/v1/namespaces/{namespace}/pods"
)
def __del__(self):
if self.ca_file:
self.ca_file.close()
if self.cert_file:
self.cert_file.close()
if self.key_file:
self.key_file.close()
def create_k8s_api(context, cluster):
"""Create a kubernetes API client
Creates connection with Kubernetes master and creates ApivApi instance
to call Kubernetes APIs.
:param context: The security context
:param cluster: Cluster object
"""
return K8sAPI(context, cluster)
Close all of the file descriptions for the certificates, since they
are left open by `create_client_files`.
TODO(mnaser): Use a context manager and avoid having these here.
"""
if hasattr(self, 'ca_file'):
self.ca_file.close()
if hasattr(self, 'cert_file'):
self.cert_file.close()
if hasattr(self, 'key_file'):
self.key_file.close()

View File

@ -42,7 +42,7 @@ class K8sMonitor(monitors.MonitorBase):
}
def pull_data(self):
k8s_api = k8s.create_k8s_api(self.context, self.cluster)
k8s_api = k8s.KubernetesAPI(self.context, self.cluster)
nodes = k8s_api.list_node()
self.data['nodes'] = self._parse_node_info(nodes)
pods = k8s_api.list_namespaced_pod('default')
@ -52,7 +52,7 @@ class K8sMonitor(monitors.MonitorBase):
if self._is_magnum_auto_healer_running():
return
k8s_api = k8s.create_k8s_api(self.context, self.cluster)
k8s_api = k8s.KubernetesAPI(self.context, self.cluster)
if self._is_cluster_accessible():
status, reason = self._poll_health_status(k8s_api)
else:
@ -132,20 +132,16 @@ class K8sMonitor(monitors.MonitorBase):
[{'Memory': 1280000.0, cpu: 0.5},
{'Memory': 1280000.0, cpu: 0.5}]
"""
pods = pods.items
pods = pods['items']
parsed_containers = []
for pod in pods:
containers = pod.spec.containers
containers = pod['spec']['containers']
for container in containers:
memory = 0
cpu = 0
resources = container.resources
limits = resources.limits
resources = container['resources']
limits = resources['limits']
if limits is not None:
# Output of resources.limits is string
# for example:
# limits = "{cpu': '500m': 'memory': '1000Ki'}"
limits = ast.literal_eval(limits)
if limits.get('memory', ''):
memory = utils.get_k8s_quantity(limits['memory'])
if limits.get('cpu', ''):
@ -184,13 +180,13 @@ class K8sMonitor(monitors.MonitorBase):
{'cpu': 1, 'Memory': 1024.0}]
"""
nodes = nodes.items
nodes = nodes['items']
parsed_nodes = []
for node in nodes:
# Output of node.status.capacity is strong
# for example:
# capacity = "{'cpu': '1', 'memory': '1000Ki'}"
capacity = node.status.capacity
capacity = node['status']['capacity']
memory = utils.get_k8s_quantity(capacity['memory'])
cpu = int(capacity['cpu'])
parsed_nodes.append({'Memory': memory, 'Cpu': cpu})
@ -234,15 +230,14 @@ class K8sMonitor(monitors.MonitorBase):
api_status = None
try:
api_status, _, _ = k8s_api.api_client.call_api(
'/healthz', 'GET', response_type=object)
api_status = k8s_api.get_healthz()
for node in k8s_api.list_node().items:
node_key = node.metadata.name + ".Ready"
for node in k8s_api.list_node()['items']:
node_key = node['metadata']['name'] + ".Ready"
ready = False
for condition in node.status.conditions:
if condition.type == 'Ready':
ready = strutils.bool_from_string(condition.status)
for condition in node['status']['conditions']:
if condition['type'] == 'Ready':
ready = strutils.bool_from_string(condition['status'])
break
health_status_reason[node_key] = ready

View File

@ -20,9 +20,9 @@ class K8sScaleManager(ScaleManager):
super(K8sScaleManager, self).__init__(context, osclient, cluster)
def _get_hosts_with_container(self, context, cluster):
k8s_api = k8s.create_k8s_api(self.context, cluster)
k8s_api = k8s.KubernetesAPI(context, cluster)
hosts = set()
for pod in k8s_api.list_namespaced_pod(namespace='default').items:
hosts.add(pod.spec.node_name)
for pod in k8s_api.list_namespaced_pod(namespace='default')['items']:
hosts.add(pod['spec']['node_name'])
return hosts

View File

@ -13,10 +13,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from collections import namedtuple
import tempfile
from unittest import mock
from oslo_serialization import jsonutils
from requests_mock.contrib import fixture
from magnum.common import exception
from magnum.drivers.common import k8s_monitor
@ -28,9 +29,6 @@ from magnum.objects import fields as m_fields
from magnum.tests import base
from magnum.tests.unit.db import utils
NODE_STATUS_CONDITION = namedtuple('Condition',
['type', 'status'])
class MonitorsTestCase(base.TestCase):
@ -47,6 +45,7 @@ class MonitorsTestCase(base.TestCase):
def setUp(self):
super(MonitorsTestCase, self).setUp()
self.requests_mock = self.useFixture(fixture.Fixture())
cluster = utils.get_test_cluster(node_addresses=['1.2.3.4'],
api_address='https://5.6.7.8:2376',
master_addresses=['10.0.0.6'],
@ -233,24 +232,50 @@ class MonitorsTestCase(base.TestCase):
mem_util = self.v2_monitor.compute_memory_util()
self.assertEqual(0, mem_util)
@mock.patch('magnum.conductor.k8s_api.create_k8s_api')
def test_k8s_monitor_pull_data_success(self, mock_k8s_api):
mock_nodes = mock.MagicMock()
mock_node = mock.MagicMock()
mock_node.status = mock.MagicMock()
mock_node.status.capacity = {'memory': '2000Ki', 'cpu': '1'}
mock_nodes.items = [mock_node]
mock_k8s_api.return_value.list_node.return_value = (
mock_nodes)
mock_pods = mock.MagicMock()
mock_pod = mock.MagicMock()
mock_pod.spec = mock.MagicMock()
mock_container = mock.MagicMock()
mock_container.resources = mock.MagicMock()
mock_container.resources.limits = "{'memory': '100Mi', 'cpu': '500m'}"
mock_pod.spec.containers = [mock_container]
mock_pods.items = [mock_pod]
mock_k8s_api.return_value.list_namespaced_pod.return_value = mock_pods
@mock.patch('magnum.conductor.k8s_api.create_client_files')
def test_k8s_monitor_pull_data_success(self, mock_create_client_files):
mock_create_client_files.return_value = (
tempfile.NamedTemporaryFile(),
tempfile.NamedTemporaryFile(),
tempfile.NamedTemporaryFile()
)
self.requests_mock.register_uri(
'GET',
f"{self.cluster.api_address}/api/v1/nodes",
json={
'items': [
{
'status': {
'capacity': {'memory': '2000Ki', 'cpu': '1'}
}
}
]
},
)
self.requests_mock.register_uri(
'GET',
f"{self.cluster.api_address}/api/v1/namespaces/default/pods",
json={
'items': [
{
'spec': {
'containers': [
{
'resources': {
'limits': {
'memory': '100Mi',
'cpu': '500m'
}
}
}
]
}
}
]
}
)
self.k8s_monitor.pull_data()
self.assertEqual(self.k8s_monitor.data['nodes'],
@ -444,20 +469,41 @@ class MonitorsTestCase(base.TestCase):
cpu_util = self.mesos_monitor.compute_cpu_util()
self.assertEqual(0, cpu_util)
@mock.patch('magnum.conductor.k8s_api.create_k8s_api')
def test_k8s_monitor_health_healthy(self, mock_k8s_api):
mock_nodes = mock.MagicMock()
mock_node = mock.MagicMock()
mock_api_client = mock.MagicMock()
mock_node.status = mock.MagicMock()
mock_node.metadata.name = 'k8s-cluster-node-0'
mock_node.status.conditions = [NODE_STATUS_CONDITION(type='Ready',
status='True')]
mock_nodes.items = [mock_node]
mock_k8s_api.return_value.list_node.return_value = (
mock_nodes)
mock_k8s_api.return_value.api_client = mock_api_client
mock_api_client.call_api.return_value = ('ok', None, None)
@mock.patch('magnum.conductor.k8s_api.create_client_files')
def test_k8s_monitor_health_healthy(self, mock_create_client_files):
mock_create_client_files.return_value = (
tempfile.NamedTemporaryFile(),
tempfile.NamedTemporaryFile(),
tempfile.NamedTemporaryFile()
)
self.requests_mock.register_uri(
'GET',
f"{self.cluster.api_address}/api/v1/nodes",
json={
'items': [
{
'metadata': {
'name': 'k8s-cluster-node-0'
},
'status': {
'conditions': [
{
'type': 'Ready',
'status': 'True',
}
]
}
}
]
}
)
self.requests_mock.register_uri(
'GET',
f"{self.cluster.api_address}/healthz",
text="ok",
)
self.k8s_monitor.poll_health_status()
self.assertEqual(self.k8s_monitor.data['health_status'],
@ -465,21 +511,41 @@ class MonitorsTestCase(base.TestCase):
self.assertEqual(self.k8s_monitor.data['health_status_reason'],
{'api': 'ok', 'k8s-cluster-node-0.Ready': True})
@mock.patch('magnum.conductor.k8s_api.create_k8s_api')
def test_k8s_monitor_health_unhealthy_api(self, mock_k8s_api):
mock_nodes = mock.MagicMock()
mock_node = mock.MagicMock()
mock_api_client = mock.MagicMock()
mock_node.status = mock.MagicMock()
mock_node.metadata.name = 'k8s-cluster-node-0'
mock_node.status.conditions = [NODE_STATUS_CONDITION(type='Ready',
status='True')]
mock_nodes.items = [mock_node]
mock_k8s_api.return_value.list_node.return_value = (
mock_nodes)
mock_k8s_api.return_value.api_client = mock_api_client
mock_api_client.call_api.side_effect = exception.MagnumException(
message='failed')
@mock.patch('magnum.conductor.k8s_api.create_client_files')
def test_k8s_monitor_health_unhealthy_api(self, mock_create_client_files):
mock_create_client_files.return_value = (
tempfile.NamedTemporaryFile(),
tempfile.NamedTemporaryFile(),
tempfile.NamedTemporaryFile()
)
self.requests_mock.register_uri(
'GET',
f"{self.cluster.api_address}/api/v1/nodes",
json={
'items': [
{
'metadata': {
'name': 'k8s-cluster-node-0'
},
'status': {
'conditions': [
{
'type': 'Ready',
'status': 'True',
}
]
}
}
]
}
)
self.requests_mock.register_uri(
'GET',
f"{self.cluster.api_address}/healthz",
exc=exception.MagnumException(message='failed'),
)
self.k8s_monitor.poll_health_status()
self.assertEqual(self.k8s_monitor.data['health_status'],
@ -487,27 +553,54 @@ class MonitorsTestCase(base.TestCase):
self.assertEqual(self.k8s_monitor.data['health_status_reason'],
{'api': 'failed'})
@mock.patch('magnum.conductor.k8s_api.create_k8s_api')
def test_k8s_monitor_health_unhealthy_node(self, mock_k8s_api):
mock_nodes = mock.MagicMock()
mock_api_client = mock.MagicMock()
@mock.patch('magnum.conductor.k8s_api.create_client_files')
def test_k8s_monitor_health_unhealthy_node(self, mock_create_client_files):
mock_create_client_files.return_value = (
tempfile.NamedTemporaryFile(),
tempfile.NamedTemporaryFile(),
tempfile.NamedTemporaryFile()
)
mock_node0 = mock.MagicMock()
mock_node0.status = mock.MagicMock()
mock_node0.metadata.name = 'k8s-cluster-node-0'
mock_node0.status.conditions = [NODE_STATUS_CONDITION(type='Ready',
status='False')]
mock_node1 = mock.MagicMock()
mock_node1.status = mock.MagicMock()
mock_node1.metadata.name = 'k8s-cluster-node-1'
mock_node1.status.conditions = [NODE_STATUS_CONDITION(type='Ready',
status='True')]
self.requests_mock.register_uri(
'GET',
f"{self.cluster.api_address}/api/v1/nodes",
json={
'items': [
{
'metadata': {
'name': 'k8s-cluster-node-0'
},
'status': {
'conditions': [
{
'type': 'Ready',
'status': 'False',
}
]
}
},
{
'metadata': {
'name': 'k8s-cluster-node-1'
},
'status': {
'conditions': [
{
'type': 'Ready',
'status': 'True',
}
]
}
}
]
}
)
mock_nodes.items = [mock_node0, mock_node1]
mock_k8s_api.return_value.list_node.return_value = (
mock_nodes)
mock_k8s_api.return_value.api_client = mock_api_client
mock_api_client.call_api.return_value = ('ok', None, None)
self.requests_mock.register_uri(
'GET',
f"{self.cluster.api_address}/healthz",
text="ok",
)
self.k8s_monitor.poll_health_status()
self.assertEqual(self.k8s_monitor.data['health_status'],
@ -516,24 +609,48 @@ class MonitorsTestCase(base.TestCase):
{'api': 'ok', 'k8s-cluster-node-0.Ready': False,
'api': 'ok', 'k8s-cluster-node-1.Ready': True})
@mock.patch('magnum.conductor.k8s_api.create_k8s_api')
def test_k8s_monitor_health_unreachable_cluster(self, mock_k8s_api):
mock_nodes = mock.MagicMock()
mock_node = mock.MagicMock()
mock_node.status = mock.MagicMock()
mock_nodes.items = [mock_node]
@mock.patch('magnum.conductor.k8s_api.create_client_files')
def test_k8s_monitor_health_unreachable_cluster(self, mock_create_client_files):
mock_create_client_files.return_value = (
tempfile.NamedTemporaryFile(),
tempfile.NamedTemporaryFile(),
tempfile.NamedTemporaryFile()
)
self.requests_mock.register_uri(
'GET',
f"{self.cluster.api_address}/api/v1/nodes",
json={
'items': [
{}
]
}
)
self.k8s_monitor.cluster.floating_ip_enabled = False
self.k8s_monitor.poll_health_status()
self.assertEqual(self.k8s_monitor.data['health_status'],
m_fields.ClusterHealthStatus.UNKNOWN)
@mock.patch('magnum.conductor.k8s_api.create_k8s_api')
def test_k8s_monitor_health_unreachable_with_master_lb(self, mock_k8s_api):
mock_nodes = mock.MagicMock()
mock_node = mock.MagicMock()
mock_node.status = mock.MagicMock()
mock_nodes.items = [mock_node]
@mock.patch('magnum.conductor.k8s_api.create_client_files')
def test_k8s_monitor_health_unreachable_with_master_lb(self, mock_create_client_files):
mock_create_client_files.return_value = (
tempfile.NamedTemporaryFile(),
tempfile.NamedTemporaryFile(),
tempfile.NamedTemporaryFile()
)
self.requests_mock.register_uri(
'GET',
f"{self.cluster.api_address}/api/v1/nodes",
json={
'items': [
{}
]
}
)
cluster = self.k8s_monitor.cluster
cluster.floating_ip_enabled = True
cluster.master_lb_enabled = True

View File

@ -12,8 +12,11 @@
# License for the specific language governing permissions and limitations
# under the License.
import tempfile
from unittest import mock
from requests_mock.contrib import fixture
from magnum.common import exception
from magnum.conductor import scale_manager
from magnum.drivers.common.k8s_scale_manager import K8sScaleManager
@ -181,23 +184,45 @@ class TestScaleManager(base.TestCase):
class TestK8sScaleManager(base.TestCase):
def setUp(self):
super(TestK8sScaleManager, self).setUp()
self.requests_mock = self.useFixture(fixture.Fixture())
@mock.patch('magnum.objects.Cluster.get_by_uuid')
@mock.patch('magnum.conductor.k8s_api.create_k8s_api')
def test_get_hosts_with_container(self, mock_create_api, mock_get):
pods = mock.MagicMock()
pod_1 = mock.MagicMock()
pod_1.spec.node_name = 'node1'
pod_2 = mock.MagicMock()
pod_2.spec.node_name = 'node2'
pods.items = [pod_1, pod_2]
mock_api = mock.MagicMock()
mock_api.list_namespaced_pod.return_value = pods
mock_create_api.return_value = mock_api
@mock.patch('magnum.conductor.k8s_api.create_client_files')
def test_get_hosts_with_container(self, mock_create_client_files, mock_get):
mock_cluster = mock.MagicMock()
mock_cluster.api_address = "https://foobar.com:6443"
mock_create_client_files.return_value = (
tempfile.NamedTemporaryFile(),
tempfile.NamedTemporaryFile(),
tempfile.NamedTemporaryFile()
)
self.requests_mock.register_uri(
'GET',
f"{mock_cluster.api_address}/api/v1/namespaces/default/pods",
json={
'items': [
{
'spec': {
'node_name': 'node1',
}
},
{
'spec': {
'node_name': 'node2',
}
}
]
},
)
mgr = K8sScaleManager(
mock.MagicMock(), mock.MagicMock(), mock.MagicMock())
hosts = mgr._get_hosts_with_container(
mock.MagicMock(), mock.MagicMock())
mock.MagicMock(), mock_cluster)
self.assertEqual(hosts, {'node1', 'node2'})

View File

@ -19,7 +19,6 @@ iso8601>=0.1.11 # MIT
jsonpatch!=1.20,>=1.16 # BSD
keystoneauth1>=3.14.0 # Apache-2.0
keystonemiddleware>=9.0.0 # Apache-2.0
kubernetes>=12.0.0 # Apache-2.0
marathon!=0.9.1,>=0.8.6 # MIT
netaddr>=0.7.18 # BSD
oslo.concurrency>=4.1.0 # Apache-2.0

View File

@ -16,6 +16,7 @@ osprofiler>=3.4.0 # Apache-2.0
Pygments>=2.7.2 # BSD license
python-subunit>=1.4.0 # Apache-2.0/BSD
pytz>=2020.4 # MIT
requests-mock>=1.2.0 # Apache-2.0
testrepository>=0.0.20 # Apache-2.0/BSD
stestr>=3.1.0 # Apache-2.0
testscenarios>=0.4 # Apache-2.0/BSD