Browse Source

[k8s] Update cluster health status by native API

Calling the Kubernetes native API to update the cluster health status
so that it can be used for cluster auto-healing.

Task: 24593
Story: 2002742

Change-Id: Ia76eeeb2f1734dff38d9660c804d7d2d0f65b9fb
changes/97/572897/21
Feilong Wang 4 years ago
parent
commit
c38edc6929
  1. 5
      magnum/api/controllers/v1/cluster.py
  2. 67
      magnum/conductor/k8s_api.py
  3. 72
      magnum/drivers/common/k8s_monitor.py
  4. 70
      magnum/service/periodic.py
  5. 79
      magnum/tests/unit/conductor/test_monitors.py
  6. 30
      magnum/tests/unit/service/test_periodic.py

5
magnum/api/controllers/v1/cluster.py

@ -138,7 +138,7 @@ class Cluster(base.APIBase):
status_reason = wtypes.text
"""Status reason of the cluster from the heat stack"""
health_status = wtypes.Enum(wtypes.text, *fields.ClusterStatus.ALL)
health_status = wtypes.Enum(wtypes.text, *fields.ClusterHealthStatus.ALL)
"""Health status of the cluster from the native COE API"""
health_status_reason = wtypes.DictType(wtypes.text, wtypes.text)
@ -222,7 +222,8 @@ class Cluster(base.APIBase):
status=fields.ClusterStatus.CREATE_COMPLETE,
status_reason="CREATE completed successfully",
health_status=fields.ClusterHealthStatus.HEALTHY,
health_status_reason='{"api_server": "OK"}',
health_status_reason={"api": "ok",
"node-0.Ready": 'True'},
api_address='172.24.4.3',
node_addresses=['172.24.4.4', '172.24.4.5'],
created_at=timeutils.utcnow(),

67
magnum/conductor/k8s_api.py

@ -17,6 +17,8 @@ import tempfile
from kubernetes import client as k8s_config
from kubernetes.client import api_client
from kubernetes.client.apis import core_v1_api
from kubernetes.client import configuration as k8s_configuration
from kubernetes.client import rest
from oslo_log import log as logging
from magnum.conductor.handlers.common.cert_manager import create_client_files
@ -24,6 +26,69 @@ from magnum.conductor.handlers.common.cert_manager import create_client_files
LOG = logging.getLogger(__name__)
class ApiClient(api_client.ApiClient):
    """Synchronous Kubernetes API client without a background thread pool.

    The upstream ApiClient creates a worker pool in __init__ and tears it
    down in __del__; this subclass skips both so a client can be created
    and discarded cheaply for each periodic health poll.
    """

    def __init__(self, configuration=None, header_name=None,
                 header_value=None, cookie=None):
        if configuration is None:
            configuration = k8s_configuration.Configuration()
        self.configuration = configuration

        self.rest_client = rest.RESTClientObject(configuration)
        self.default_headers = {}
        if header_name is not None:
            self.default_headers[header_name] = header_value
        self.cookie = cookie

    def __del__(self):
        # Nothing to clean up: __init__ above never creates the worker
        # pool that the parent's __del__ would otherwise join.
        pass

    def call_api(self, resource_path, method,
                 path_params=None, query_params=None, header_params=None,
                 body=None, post_params=None, files=None,
                 response_type=None, auth_settings=None, async_req=None,
                 _return_http_data_only=None, collection_formats=None,
                 _preload_content=True, _request_timeout=None, **kwargs):
        """Makes http request (synchronous) and return the deserialized data

        NOTE: the previous signature carried an ``async`` keyword for
        compatibility with python-kubernetes < 7.0.0, but ``async`` is a
        reserved word since Python 3.7 and made this module unimportable.
        Any legacy ``async=...`` argument is absorbed by ``**kwargs`` and
        ignored — this client always executes synchronously anyway.

        :param resource_path: Path to method endpoint.
        :param method: Method to call.
        :param path_params: Path parameters in the url.
        :param query_params: Query parameters in the url.
        :param header_params: Header parameters to be
            placed in the request header.
        :param body: Request body.
        :param post_params dict: Request post form parameters,
            for `application/x-www-form-urlencoded`, `multipart/form-data`.
        :param auth_settings list: Auth Settings names for the request.
        :param response_type: Response data type.
        :param files dict: key -> filename, value -> filepath,
            for `multipart/form-data`.
        :param async_req bool: execute request asynchronously (ignored here;
            requests are always synchronous)
        :param _return_http_data_only: response data without head status code
                                       and headers
        :param collection_formats: dict of collection formats for path, query,
            header, and post parameters.
        :param _preload_content: if False, the urllib3.HTTPResponse object will
                                 be returned without reading/decoding response
                                 data. Default is True.
        :param _request_timeout: timeout setting for this request. If one
                                 number provided, it will be total request
                                 timeout. It can also be a pair (tuple) of
                                 (connection, read) timeouts.
        :return: The method will return the response directly
        """
        # Name-mangled call resolves to the parent ApiClient's private
        # __call_api because both classes share the name "ApiClient".
        return self.__call_api(resource_path, method,
                               path_params, query_params, header_params,
                               body, post_params, files,
                               response_type, auth_settings,
                               _return_http_data_only, collection_formats,
                               _preload_content, _request_timeout)
class K8sAPI(core_v1_api.CoreV1Api):
def _create_temp_file_with_content(self, content):
@ -57,7 +122,7 @@ class K8sAPI(core_v1_api.CoreV1Api):
config.key_file = self.key_file.name
# build a connection with Kubernetes master
client = api_client.ApiClient(configuration=config)
client = ApiClient(configuration=config)
super(K8sAPI, self).__init__(client)

72
magnum/drivers/common/k8s_monitor.py

@ -12,9 +12,12 @@
import ast
from oslo_utils import strutils
from magnum.common import utils
from magnum.conductor import k8s_api as k8s
from magnum.conductor import monitors
from magnum.objects import fields as m_fields
class K8sMonitor(monitors.MonitorBase):
@ -45,6 +48,12 @@ class K8sMonitor(monitors.MonitorBase):
pods = k8s_api.list_namespaced_pod('default')
self.data['pods'] = self._parse_pod_info(pods)
def poll_health_status(self):
    """Refresh self.data with the cluster's health status and reason."""
    client = k8s.create_k8s_api(self.context, self.cluster)
    status, reason = self._poll_health_status(client)
    self.data['health_status'] = status
    self.data['health_status_reason'] = reason
def _compute_res_util(self, res):
res_total = 0
for node in self.data['nodes']:
@ -164,3 +173,66 @@ class K8sMonitor(monitors.MonitorBase):
parsed_nodes.append({'Memory': memory, 'Cpu': cpu})
return parsed_nodes
def _poll_health_status(self, k8s_api):
    """Poll health status of API and nodes for given cluster

    Design Policy:

    1.  How to calculate the overall health status?
        Any node (including API and minion nodes) is not OK, then the
        overall health status is UNHEALTHY

    2.  The data structure of health_status_reason
        As an attribute of the cluster, the health_status_reason has to
        use the field type from
        oslo.versionedobjects/blob/master/oslo_versionedobjects/fields.py

    3.  How to get the health_status and health_status_reason?
        3.1 Call /healthz to get the API health status
        3.2 Call list_node (using API /api/v1/nodes) to get the nodes
            health status

    :param k8s_api: The api client to the cluster
    :return: Tuple including status and reason. Example:
        (
            ClusterHealthStatus.HEALTHY,
            {
                'api': 'ok',
                'k8scluster-ydz7cfbxqqu3-node-0.Ready': False,
                'k8scluster-ydz7cfbxqqu3-node-1.Ready': True,
                'k8scluster-ydz7cfbxqqu3-node-2.Ready': True,
            }
        )
    """
    # Default to UNHEALTHY; only flip to HEALTHY when the API answers
    # 'ok' AND every node reports Ready.
    health_status = m_fields.ClusterHealthStatus.UNHEALTHY
    health_status_reason = {}
    api_status = None

    try:
        # /healthz returns the plain string 'ok' when the API is healthy.
        api_status, _, _ = k8s_api.api_client.call_api(
            '/healthz', 'GET', response_type=object)

        for node in k8s_api.list_node().items:
            node_key = node.metadata.name + ".Ready"
            ready = False
            # Condition status comes back as a string ('True'/'False'/
            # 'Unknown'); normalize to a bool for the reason dict.
            for condition in node.status.conditions:
                if condition.type == 'Ready':
                    ready = strutils.bool_from_string(condition.status)
                    break

            health_status_reason[node_key] = ready

        if api_status == 'ok' and all(health_status_reason.values()):
            health_status = m_fields.ClusterHealthStatus.HEALTHY

        health_status_reason['api'] = api_status
    except Exception as exp_api:
        if not api_status:
            # Fall back to whatever detail the client exception carries.
            api_status = (getattr(exp_api, 'body', None) or
                          getattr(exp_api, 'message', None))

        health_status_reason['api'] = api_status

    return health_status, health_status_reason

70
magnum/service/periodic.py

@ -88,6 +88,46 @@ class ClusterUpdateJob(object):
raise loopingcall.LoopingCallDone()
class ClusterHealthUpdateJob(object):
    """One-shot job refreshing a single cluster's health fields."""

    def __init__(self, ctx, cluster):
        self.ctx = ctx
        self.cluster = cluster

    def _update_health_status(self):
        monitor = monitors.create_monitor(self.ctx, self.cluster)
        if monitor is None:
            return

        try:
            monitor.poll_health_status()
        except Exception as e:
            LOG.warning(
                "Skip pulling data from cluster %(cluster)s due to "
                "error: %(e)s",
                {'e': e, 'cluster': self.cluster.uuid}, exc_info=True)
            # TODO(flwang): Should we mark this cluster's health status as
            # UNKNOWN if Magnum failed to pull data from the cluster? Because
            # that basically means the k8s API doesn't work at that moment.
            return

        status = monitor.data.get('health_status')
        if status:
            # Persist both fields together so status and reason stay in sync.
            self.cluster.health_status = status
            self.cluster.health_status_reason = monitor.data.get(
                'health_status_reason')
            self.cluster.save()

    def update_health_status(self):
        LOG.debug("Updating health status for cluster %s", self.cluster.id)
        self._update_health_status()
        LOG.debug("Status for cluster %s updated to %s (%s)",
                  self.cluster.id, self.cluster.health_status,
                  self.cluster.health_status_reason)
        # TODO(flwang): Health status update notifications?

        # end the "loop"
        raise loopingcall.LoopingCallDone()
@profiler.trace_cls("rpc")
class MagnumPeriodicTasks(periodic_task.PeriodicTasks):
"""Magnum periodic Task class
@ -139,6 +179,36 @@ class MagnumPeriodicTasks(periodic_task.PeriodicTasks):
"Ignore error [%s] when syncing up cluster status.",
e, exc_info=True)
@periodic_task.periodic_task(spacing=10, run_immediately=True)
@set_context
def sync_cluster_health_status(self, ctx):
    """Poll the native COE API of each active cluster for health status.

    Only clusters in a stable or in-progress-update state are polled;
    each one is handled by a throwaway ClusterHealthUpdateJob.
    """
    try:
        LOG.debug('Starting to sync up cluster health status')
        status = [objects.fields.ClusterStatus.CREATE_COMPLETE,
                  objects.fields.ClusterStatus.UPDATE_COMPLETE,
                  objects.fields.ClusterStatus.UPDATE_IN_PROGRESS,
                  objects.fields.ClusterStatus.ROLLBACK_IN_PROGRESS]
        filters = {'status': status}
        clusters = objects.Cluster.list(ctx, filters=filters)
        if not clusters:
            return

        # synchronize using native COE API
        for cluster in clusters:
            job = ClusterHealthUpdateJob(ctx, cluster)
            # though this call isn't really looping, we use this
            # abstraction anyway to avoid dealing directly with eventlet
            # hooey
            lc = loopingcall.FixedIntervalLoopingCall(
                f=job.update_health_status)
            lc.start(1, stop_on_exception=True)
    except Exception as e:
        # Fixed: previously logged "cluster status", copy-pasted from
        # sync_cluster_status; this task syncs health status.
        LOG.warning(
            "Ignore error [%s] when syncing up cluster health status.",
            e, exc_info=True)
@periodic_task.periodic_task(run_immediately=True)
@set_context
@deprecated(as_of=deprecated.ROCKY)

79
magnum/tests/unit/conductor/test_monitors.py

@ -13,17 +13,24 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from collections import namedtuple
import mock
from oslo_serialization import jsonutils
from magnum.common import exception
from magnum.drivers.common import k8s_monitor
from magnum.drivers.mesos_ubuntu_v1 import monitor as mesos_monitor
from magnum.drivers.swarm_fedora_atomic_v1 import monitor as swarm_monitor
from magnum.drivers.swarm_fedora_atomic_v2 import monitor as swarm_v2_monitor
from magnum import objects
from magnum.objects import fields as m_fields
from magnum.tests import base
from magnum.tests.unit.db import utils
NODE_STATUS_CONDITION = namedtuple('Condition',
['type', 'status'])
class MonitorsTestCase(base.TestCase):
@ -418,3 +425,75 @@ class MonitorsTestCase(base.TestCase):
self.mesos_monitor.data = test_data
cpu_util = self.mesos_monitor.compute_cpu_util()
self.assertEqual(0, cpu_util)
@mock.patch('magnum.conductor.k8s_api.create_k8s_api')
def test_k8s_monitor_health_healthy(self, mock_k8s_api):
    """API answers 'ok' and the only node is Ready -> HEALTHY."""
    mock_nodes = mock.MagicMock()
    mock_node = mock.MagicMock()
    mock_api_client = mock.MagicMock()
    mock_node.status = mock.MagicMock()
    mock_node.metadata.name = 'k8s-cluster-node-0'
    mock_node.status.conditions = [NODE_STATUS_CONDITION(type='Ready',
                                                         status='True')]
    mock_nodes.items = [mock_node]
    mock_k8s_api.return_value.list_node.return_value = (
        mock_nodes)
    mock_k8s_api.return_value.api_client = mock_api_client
    # call_api returns a (data, status_code, headers) triple
    mock_api_client.call_api.return_value = ('ok', None, None)

    self.k8s_monitor.poll_health_status()
    # assertEqual(expected, observed) to match the file's convention
    self.assertEqual(m_fields.ClusterHealthStatus.HEALTHY,
                     self.k8s_monitor.data['health_status'])
    self.assertEqual({'api': 'ok', 'k8s-cluster-node-0.Ready': True},
                     self.k8s_monitor.data['health_status_reason'])
@mock.patch('magnum.conductor.k8s_api.create_k8s_api')
def test_k8s_monitor_health_unhealthy_api(self, mock_k8s_api):
    """/healthz raising makes the cluster UNHEALTHY with the message."""
    mock_nodes = mock.MagicMock()
    mock_node = mock.MagicMock()
    mock_api_client = mock.MagicMock()
    mock_node.status = mock.MagicMock()
    mock_node.metadata.name = 'k8s-cluster-node-0'
    mock_node.status.conditions = [NODE_STATUS_CONDITION(type='Ready',
                                                         status='True')]
    mock_nodes.items = [mock_node]
    mock_k8s_api.return_value.list_node.return_value = (
        mock_nodes)
    mock_k8s_api.return_value.api_client = mock_api_client
    mock_api_client.call_api.side_effect = exception.MagnumException(
        message='failed')

    self.k8s_monitor.poll_health_status()
    # assertEqual(expected, observed) to match the file's convention
    self.assertEqual(m_fields.ClusterHealthStatus.UNHEALTHY,
                     self.k8s_monitor.data['health_status'])
    self.assertEqual({'api': 'failed'},
                     self.k8s_monitor.data['health_status_reason'])
@mock.patch('magnum.conductor.k8s_api.create_k8s_api')
def test_k8s_monitor_health_unhealthy_node(self, mock_k8s_api):
    """One not-Ready node among two makes the cluster UNHEALTHY."""
    mock_nodes = mock.MagicMock()
    mock_api_client = mock.MagicMock()

    mock_node0 = mock.MagicMock()
    mock_node0.status = mock.MagicMock()
    mock_node0.metadata.name = 'k8s-cluster-node-0'
    mock_node0.status.conditions = [NODE_STATUS_CONDITION(type='Ready',
                                                          status='False')]
    mock_node1 = mock.MagicMock()
    mock_node1.status = mock.MagicMock()
    mock_node1.metadata.name = 'k8s-cluster-node-1'
    mock_node1.status.conditions = [NODE_STATUS_CONDITION(type='Ready',
                                                          status='True')]

    mock_nodes.items = [mock_node0, mock_node1]
    mock_k8s_api.return_value.list_node.return_value = (
        mock_nodes)
    mock_k8s_api.return_value.api_client = mock_api_client
    mock_api_client.call_api.return_value = ('ok', None, None)

    self.k8s_monitor.poll_health_status()
    # assertEqual(expected, observed); the previous expected dict
    # repeated the 'api' key, which Python silently collapses.
    self.assertEqual(m_fields.ClusterHealthStatus.UNHEALTHY,
                     self.k8s_monitor.data['health_status'])
    self.assertEqual({'api': 'ok',
                      'k8s-cluster-node-0.Ready': False,
                      'k8s-cluster-node-1.Ready': True},
                     self.k8s_monitor.data['health_status_reason'])

30
magnum/tests/unit/service/test_periodic.py

@ -18,7 +18,9 @@ from magnum.common import context
from magnum.common.rpc_service import CONF
from magnum.db.sqlalchemy import api as dbapi
from magnum.drivers.common import driver
from magnum.drivers.common import k8s_monitor
from magnum import objects
from magnum.objects.fields import ClusterHealthStatus as cluster_health_status
from magnum.objects.fields import ClusterStatus as cluster_status
from magnum.service import periodic
from magnum.tests import base
@ -361,3 +363,31 @@ class PeriodicTestCase(base.TestCase):
self.assertEqual(0, mock_create_monitor.call_count)
self.assertEqual(0, notifier.info.call_count)
@mock.patch('oslo_service.loopingcall.FixedIntervalLoopingCall',
            new=fakes.FakeLoopingCall)
@mock.patch('magnum.conductor.monitors.create_monitor')
@mock.patch('magnum.objects.Cluster.list')
@mock.patch('magnum.common.rpc.get_notifier')
@mock.patch('magnum.common.context.make_admin_context')
def test_sync_cluster_health_status(self, mock_make_admin_context,
                                    mock_get_notifier, mock_cluster_list,
                                    mock_create_monitor):
    """Test sync cluster health status"""
    mock_make_admin_context.return_value = self.context
    mock_get_notifier.return_value = mock.MagicMock()

    self.cluster4.status = cluster_status.CREATE_COMPLETE
    mock_cluster_list.return_value = [self.cluster4]

    health = {'health_status': cluster_health_status.UNHEALTHY,
              'health_status_reason': {'api': 'ok',
                                       'node-0.Ready': False}}
    mock_create_monitor.return_value = mock.MagicMock(
        spec=k8s_monitor.K8sMonitor, name='test', data=health)

    periodic.MagnumPeriodicTasks(CONF).sync_cluster_health_status(
        self.context)

    self.assertEqual(cluster_health_status.UNHEALTHY,
                     self.cluster4.health_status)
    # The bool in the reason dict comes back as the string 'False' —
    # presumably coerced by the cluster object's dict-of-strings field;
    # verify against the Cluster object definition.
    self.assertEqual({'api': 'ok', 'node-0.Ready': 'False'},
                     self.cluster4.health_status_reason)

Loading…
Cancel
Save