Merge "[k8s] Update cluster health status by native API"
This commit is contained in:
commit
91d84ff01d
@ -138,7 +138,7 @@ class Cluster(base.APIBase):
|
||||
status_reason = wtypes.text
|
||||
"""Status reason of the cluster from the heat stack"""
|
||||
|
||||
health_status = wtypes.Enum(wtypes.text, *fields.ClusterStatus.ALL)
|
||||
health_status = wtypes.Enum(wtypes.text, *fields.ClusterHealthStatus.ALL)
|
||||
"""Health status of the cluster from the native COE API"""
|
||||
|
||||
health_status_reason = wtypes.DictType(wtypes.text, wtypes.text)
|
||||
@ -222,7 +222,8 @@ class Cluster(base.APIBase):
|
||||
status=fields.ClusterStatus.CREATE_COMPLETE,
|
||||
status_reason="CREATE completed successfully",
|
||||
health_status=fields.ClusterHealthStatus.HEALTHY,
|
||||
health_status_reason='{"api_server": "OK"}',
|
||||
health_status_reason={"api": "ok",
|
||||
"node-0.Ready": 'True'},
|
||||
api_address='172.24.4.3',
|
||||
node_addresses=['172.24.4.4', '172.24.4.5'],
|
||||
created_at=timeutils.utcnow(),
|
||||
|
@ -17,6 +17,8 @@ import tempfile
|
||||
from kubernetes import client as k8s_config
|
||||
from kubernetes.client import api_client
|
||||
from kubernetes.client.apis import core_v1_api
|
||||
from kubernetes.client import configuration as k8s_configuration
|
||||
from kubernetes.client import rest
|
||||
from oslo_log import log as logging
|
||||
|
||||
from magnum.conductor.handlers.common.cert_manager import create_client_files
|
||||
@ -24,6 +26,69 @@ from magnum.conductor.handlers.common.cert_manager import create_client_files
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ApiClient(api_client.ApiClient):
|
||||
|
||||
def __init__(self, configuration=None, header_name=None,
|
||||
header_value=None, cookie=None):
|
||||
if configuration is None:
|
||||
configuration = k8s_configuration.Configuration()
|
||||
self.configuration = configuration
|
||||
|
||||
self.rest_client = rest.RESTClientObject(configuration)
|
||||
self.default_headers = {}
|
||||
if header_name is not None:
|
||||
self.default_headers[header_name] = header_value
|
||||
self.cookie = cookie
|
||||
|
||||
def __del__(self):
|
||||
pass
|
||||
|
||||
def call_api(self, resource_path, method,
|
||||
path_params=None, query_params=None, header_params=None,
|
||||
body=None, post_params=None, files=None, async=None,
|
||||
response_type=None, auth_settings=None, async_req=None,
|
||||
_return_http_data_only=None, collection_formats=None,
|
||||
_preload_content=True, _request_timeout=None):
|
||||
"""Makes http request (synchronous) and return the deserialized data
|
||||
|
||||
:param resource_path: Path to method endpoint.
|
||||
:param method: Method to call.
|
||||
:param path_params: Path parameters in the url.
|
||||
:param query_params: Query parameters in the url.
|
||||
:param header_params: Header parameters to be
|
||||
placed in the request header.
|
||||
:param body: Request body.
|
||||
:param post_params dict: Request post form parameters,
|
||||
for `application/x-www-form-urlencoded`, `multipart/form-data`.
|
||||
:param auth_settings list: Auth Settings names for the request.
|
||||
:param response: Response data type.
|
||||
:param files dict: key -> filename, value -> filepath,
|
||||
for `multipart/form-data`.
|
||||
:param async bool: to be compatible with k8s-client before 7.0.0
|
||||
:param async_req bool: execute request asynchronously
|
||||
:param _return_http_data_only: response data without head status code
|
||||
and headers
|
||||
:param collection_formats: dict of collection formats for path, query,
|
||||
header, and post parameters.
|
||||
:param _preload_content: if False, the urllib3.HTTPResponse object will
|
||||
be returned without reading/decoding response
|
||||
data. Default is True.
|
||||
:param _request_timeout: timeout setting for this request. If one
|
||||
number provided, it will be total request
|
||||
timeout. It can also be a pair (tuple) of
|
||||
(connection, read) timeouts.
|
||||
|
||||
:return: The method will return the response directly
|
||||
|
||||
"""
|
||||
return self.__call_api(resource_path, method,
|
||||
path_params, query_params, header_params,
|
||||
body, post_params, files,
|
||||
response_type, auth_settings,
|
||||
_return_http_data_only, collection_formats,
|
||||
_preload_content, _request_timeout)
|
||||
|
||||
|
||||
class K8sAPI(core_v1_api.CoreV1Api):
|
||||
|
||||
def _create_temp_file_with_content(self, content):
|
||||
@ -57,7 +122,7 @@ class K8sAPI(core_v1_api.CoreV1Api):
|
||||
config.key_file = self.key_file.name
|
||||
|
||||
# build a connection with Kubernetes master
|
||||
client = api_client.ApiClient(configuration=config)
|
||||
client = ApiClient(configuration=config)
|
||||
|
||||
super(K8sAPI, self).__init__(client)
|
||||
|
||||
|
@ -12,9 +12,12 @@
|
||||
|
||||
import ast
|
||||
|
||||
from oslo_utils import strutils
|
||||
|
||||
from magnum.common import utils
|
||||
from magnum.conductor import k8s_api as k8s
|
||||
from magnum.conductor import monitors
|
||||
from magnum.objects import fields as m_fields
|
||||
|
||||
|
||||
class K8sMonitor(monitors.MonitorBase):
|
||||
@ -45,6 +48,12 @@ class K8sMonitor(monitors.MonitorBase):
|
||||
pods = k8s_api.list_namespaced_pod('default')
|
||||
self.data['pods'] = self._parse_pod_info(pods)
|
||||
|
||||
def poll_health_status(self):
|
||||
k8s_api = k8s.create_k8s_api(self.context, self.cluster)
|
||||
status, reason = self._poll_health_status(k8s_api)
|
||||
self.data['health_status'] = status
|
||||
self.data['health_status_reason'] = reason
|
||||
|
||||
def _compute_res_util(self, res):
|
||||
res_total = 0
|
||||
for node in self.data['nodes']:
|
||||
@ -164,3 +173,66 @@ class K8sMonitor(monitors.MonitorBase):
|
||||
parsed_nodes.append({'Memory': memory, 'Cpu': cpu})
|
||||
|
||||
return parsed_nodes
|
||||
|
||||
def _poll_health_status(self, k8s_api):
|
||||
"""Poll health status of API and nodes for given cluster
|
||||
|
||||
Design Policy:
|
||||
1. How to calculate the overall health status?
|
||||
Any node (including API and minion nodes) is not OK, then the
|
||||
overall health status is UNHEALTHY
|
||||
|
||||
2. The data structure of health_status_reason
|
||||
As an attribute of the cluster, the health_status_reason have to
|
||||
use the field type from
|
||||
oslo.versionedobjects/blob/master/oslo_versionedobjects/fields.py
|
||||
|
||||
3. How to get the health_status and health_status_reason?
|
||||
3.1 Call /healthz to get the API health status
|
||||
3.2 Call list_node (using API /api/v1/nodes) to get the nodes
|
||||
health status
|
||||
|
||||
:param k8s_api: The api client to the cluster
|
||||
:return: Tumple including status and reason. Example:
|
||||
(
|
||||
ClusterHealthStatus.HEALTHY,
|
||||
{
|
||||
'api': 'ok',
|
||||
'k8scluster-ydz7cfbxqqu3-node-0.Ready': False,
|
||||
'k8scluster-ydz7cfbxqqu3-node-1.Ready': True,
|
||||
'k8scluster-ydz7cfbxqqu3-node-2.Ready': True,
|
||||
}
|
||||
)
|
||||
|
||||
"""
|
||||
|
||||
health_status = m_fields.ClusterHealthStatus.UNHEALTHY
|
||||
health_status_reason = {}
|
||||
api_status = None
|
||||
|
||||
try:
|
||||
api_status, _, _ = k8s_api.api_client.call_api(
|
||||
'/healthz', 'GET', response_type=object)
|
||||
|
||||
for node in k8s_api.list_node().items:
|
||||
node_key = node.metadata.name + ".Ready"
|
||||
ready = False
|
||||
for condition in node.status.conditions:
|
||||
if condition.type == 'Ready':
|
||||
ready = strutils.bool_from_string(condition.status)
|
||||
break
|
||||
|
||||
health_status_reason[node_key] = ready
|
||||
|
||||
if (api_status == 'ok' and
|
||||
all(n for n in health_status_reason.values())):
|
||||
health_status = m_fields.ClusterHealthStatus.HEALTHY
|
||||
|
||||
health_status_reason['api'] = api_status
|
||||
except Exception as exp_api:
|
||||
if not api_status:
|
||||
api_status = (getattr(exp_api, 'body', None) or
|
||||
getattr(exp_api, 'message', None))
|
||||
health_status_reason['api'] = api_status
|
||||
|
||||
return health_status, health_status_reason
|
||||
|
@ -88,6 +88,46 @@ class ClusterUpdateJob(object):
|
||||
raise loopingcall.LoopingCallDone()
|
||||
|
||||
|
||||
class ClusterHealthUpdateJob(object):
|
||||
|
||||
def __init__(self, ctx, cluster):
|
||||
self.ctx = ctx
|
||||
self.cluster = cluster
|
||||
|
||||
def _update_health_status(self):
|
||||
monitor = monitors.create_monitor(self.ctx, self.cluster)
|
||||
if monitor is None:
|
||||
return
|
||||
|
||||
try:
|
||||
monitor.poll_health_status()
|
||||
except Exception as e:
|
||||
LOG.warning(
|
||||
"Skip pulling data from cluster %(cluster)s due to "
|
||||
"error: %(e)s",
|
||||
{'e': e, 'cluster': self.cluster.uuid}, exc_info=True)
|
||||
# TODO(flwang): Should we mark this cluster's health status as
|
||||
# UNKNOWN if Magnum failed to pull data from the cluster? Because
|
||||
# that basically means the k8s API doesn't work at that moment.
|
||||
return
|
||||
|
||||
if monitor.data.get('health_status'):
|
||||
self.cluster.health_status = monitor.data.get('health_status')
|
||||
self.cluster.health_status_reason = monitor.data.get(
|
||||
'health_status_reason')
|
||||
self.cluster.save()
|
||||
|
||||
def update_health_status(self):
|
||||
LOG.debug("Updating health status for cluster %s", self.cluster.id)
|
||||
self._update_health_status()
|
||||
LOG.debug("Status for cluster %s updated to %s (%s)",
|
||||
self.cluster.id, self.cluster.health_status,
|
||||
self.cluster.health_status_reason)
|
||||
# TODO(flwang): Health status update notifications?
|
||||
# end the "loop"
|
||||
raise loopingcall.LoopingCallDone()
|
||||
|
||||
|
||||
@profiler.trace_cls("rpc")
|
||||
class MagnumPeriodicTasks(periodic_task.PeriodicTasks):
|
||||
"""Magnum periodic Task class
|
||||
@ -139,6 +179,36 @@ class MagnumPeriodicTasks(periodic_task.PeriodicTasks):
|
||||
"Ignore error [%s] when syncing up cluster status.",
|
||||
e, exc_info=True)
|
||||
|
||||
@periodic_task.periodic_task(spacing=10, run_immediately=True)
|
||||
@set_context
|
||||
def sync_cluster_health_status(self, ctx):
|
||||
try:
|
||||
LOG.debug('Starting to sync up cluster health status')
|
||||
|
||||
status = [objects.fields.ClusterStatus.CREATE_COMPLETE,
|
||||
objects.fields.ClusterStatus.UPDATE_COMPLETE,
|
||||
objects.fields.ClusterStatus.UPDATE_IN_PROGRESS,
|
||||
objects.fields.ClusterStatus.ROLLBACK_IN_PROGRESS]
|
||||
filters = {'status': status}
|
||||
clusters = objects.Cluster.list(ctx, filters=filters)
|
||||
if not clusters:
|
||||
return
|
||||
|
||||
# synchronize using native COE API
|
||||
for cluster in clusters:
|
||||
job = ClusterHealthUpdateJob(ctx, cluster)
|
||||
# though this call isn't really looping, we use this
|
||||
# abstraction anyway to avoid dealing directly with eventlet
|
||||
# hooey
|
||||
lc = loopingcall.FixedIntervalLoopingCall(
|
||||
f=job.update_health_status)
|
||||
lc.start(1, stop_on_exception=True)
|
||||
|
||||
except Exception as e:
|
||||
LOG.warning(
|
||||
"Ignore error [%s] when syncing up cluster status.",
|
||||
e, exc_info=True)
|
||||
|
||||
@periodic_task.periodic_task(run_immediately=True)
|
||||
@set_context
|
||||
@deprecated(as_of=deprecated.ROCKY)
|
||||
|
@ -13,17 +13,24 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from collections import namedtuple
|
||||
|
||||
import mock
|
||||
from oslo_serialization import jsonutils
|
||||
|
||||
from magnum.common import exception
|
||||
from magnum.drivers.common import k8s_monitor
|
||||
from magnum.drivers.mesos_ubuntu_v1 import monitor as mesos_monitor
|
||||
from magnum.drivers.swarm_fedora_atomic_v1 import monitor as swarm_monitor
|
||||
from magnum.drivers.swarm_fedora_atomic_v2 import monitor as swarm_v2_monitor
|
||||
from magnum import objects
|
||||
from magnum.objects import fields as m_fields
|
||||
from magnum.tests import base
|
||||
from magnum.tests.unit.db import utils
|
||||
|
||||
NODE_STATUS_CONDITION = namedtuple('Condition',
|
||||
['type', 'status'])
|
||||
|
||||
|
||||
class MonitorsTestCase(base.TestCase):
|
||||
|
||||
@ -418,3 +425,75 @@ class MonitorsTestCase(base.TestCase):
|
||||
self.mesos_monitor.data = test_data
|
||||
cpu_util = self.mesos_monitor.compute_cpu_util()
|
||||
self.assertEqual(0, cpu_util)
|
||||
|
||||
@mock.patch('magnum.conductor.k8s_api.create_k8s_api')
|
||||
def test_k8s_monitor_health_healthy(self, mock_k8s_api):
|
||||
mock_nodes = mock.MagicMock()
|
||||
mock_node = mock.MagicMock()
|
||||
mock_api_client = mock.MagicMock()
|
||||
mock_node.status = mock.MagicMock()
|
||||
mock_node.metadata.name = 'k8s-cluster-node-0'
|
||||
mock_node.status.conditions = [NODE_STATUS_CONDITION(type='Ready',
|
||||
status='True')]
|
||||
mock_nodes.items = [mock_node]
|
||||
mock_k8s_api.return_value.list_node.return_value = (
|
||||
mock_nodes)
|
||||
mock_k8s_api.return_value.api_client = mock_api_client
|
||||
mock_api_client.call_api.return_value = ('ok', None, None)
|
||||
|
||||
self.k8s_monitor.poll_health_status()
|
||||
self.assertEqual(self.k8s_monitor.data['health_status'],
|
||||
m_fields.ClusterHealthStatus.HEALTHY)
|
||||
self.assertEqual(self.k8s_monitor.data['health_status_reason'],
|
||||
{'api': 'ok', 'k8s-cluster-node-0.Ready': True})
|
||||
|
||||
@mock.patch('magnum.conductor.k8s_api.create_k8s_api')
|
||||
def test_k8s_monitor_health_unhealthy_api(self, mock_k8s_api):
|
||||
mock_nodes = mock.MagicMock()
|
||||
mock_node = mock.MagicMock()
|
||||
mock_api_client = mock.MagicMock()
|
||||
mock_node.status = mock.MagicMock()
|
||||
mock_node.metadata.name = 'k8s-cluster-node-0'
|
||||
mock_node.status.conditions = [NODE_STATUS_CONDITION(type='Ready',
|
||||
status='True')]
|
||||
mock_nodes.items = [mock_node]
|
||||
mock_k8s_api.return_value.list_node.return_value = (
|
||||
mock_nodes)
|
||||
mock_k8s_api.return_value.api_client = mock_api_client
|
||||
mock_api_client.call_api.side_effect = exception.MagnumException(
|
||||
message='failed')
|
||||
|
||||
self.k8s_monitor.poll_health_status()
|
||||
self.assertEqual(self.k8s_monitor.data['health_status'],
|
||||
m_fields.ClusterHealthStatus.UNHEALTHY)
|
||||
self.assertEqual(self.k8s_monitor.data['health_status_reason'],
|
||||
{'api': 'failed'})
|
||||
|
||||
@mock.patch('magnum.conductor.k8s_api.create_k8s_api')
|
||||
def test_k8s_monitor_health_unhealthy_node(self, mock_k8s_api):
|
||||
mock_nodes = mock.MagicMock()
|
||||
mock_api_client = mock.MagicMock()
|
||||
|
||||
mock_node0 = mock.MagicMock()
|
||||
mock_node0.status = mock.MagicMock()
|
||||
mock_node0.metadata.name = 'k8s-cluster-node-0'
|
||||
mock_node0.status.conditions = [NODE_STATUS_CONDITION(type='Ready',
|
||||
status='False')]
|
||||
mock_node1 = mock.MagicMock()
|
||||
mock_node1.status = mock.MagicMock()
|
||||
mock_node1.metadata.name = 'k8s-cluster-node-1'
|
||||
mock_node1.status.conditions = [NODE_STATUS_CONDITION(type='Ready',
|
||||
status='True')]
|
||||
|
||||
mock_nodes.items = [mock_node0, mock_node1]
|
||||
mock_k8s_api.return_value.list_node.return_value = (
|
||||
mock_nodes)
|
||||
mock_k8s_api.return_value.api_client = mock_api_client
|
||||
mock_api_client.call_api.return_value = ('ok', None, None)
|
||||
|
||||
self.k8s_monitor.poll_health_status()
|
||||
self.assertEqual(self.k8s_monitor.data['health_status'],
|
||||
m_fields.ClusterHealthStatus.UNHEALTHY)
|
||||
self.assertEqual(self.k8s_monitor.data['health_status_reason'],
|
||||
{'api': 'ok', 'k8s-cluster-node-0.Ready': False,
|
||||
'api': 'ok', 'k8s-cluster-node-1.Ready': True})
|
||||
|
@ -18,7 +18,9 @@ from magnum.common import context
|
||||
from magnum.common.rpc_service import CONF
|
||||
from magnum.db.sqlalchemy import api as dbapi
|
||||
from magnum.drivers.common import driver
|
||||
from magnum.drivers.common import k8s_monitor
|
||||
from magnum import objects
|
||||
from magnum.objects.fields import ClusterHealthStatus as cluster_health_status
|
||||
from magnum.objects.fields import ClusterStatus as cluster_status
|
||||
from magnum.service import periodic
|
||||
from magnum.tests import base
|
||||
@ -361,3 +363,31 @@ class PeriodicTestCase(base.TestCase):
|
||||
|
||||
self.assertEqual(0, mock_create_monitor.call_count)
|
||||
self.assertEqual(0, notifier.info.call_count)
|
||||
|
||||
@mock.patch('oslo_service.loopingcall.FixedIntervalLoopingCall',
|
||||
new=fakes.FakeLoopingCall)
|
||||
@mock.patch('magnum.conductor.monitors.create_monitor')
|
||||
@mock.patch('magnum.objects.Cluster.list')
|
||||
@mock.patch('magnum.common.rpc.get_notifier')
|
||||
@mock.patch('magnum.common.context.make_admin_context')
|
||||
def test_sync_cluster_health_status(self, mock_make_admin_context,
|
||||
mock_get_notifier, mock_cluster_list,
|
||||
mock_create_monitor):
|
||||
"""Test sync cluster health status"""
|
||||
mock_make_admin_context.return_value = self.context
|
||||
notifier = mock.MagicMock()
|
||||
mock_get_notifier.return_value = notifier
|
||||
mock_cluster_list.return_value = [self.cluster4]
|
||||
self.cluster4.status = cluster_status.CREATE_COMPLETE
|
||||
health = {'health_status': cluster_health_status.UNHEALTHY,
|
||||
'health_status_reason': {'api': 'ok', 'node-0.Ready': False}}
|
||||
monitor = mock.MagicMock(spec=k8s_monitor.K8sMonitor, name='test',
|
||||
data=health)
|
||||
mock_create_monitor.return_value = monitor
|
||||
periodic.MagnumPeriodicTasks(CONF).sync_cluster_health_status(
|
||||
self.context)
|
||||
|
||||
self.assertEqual(cluster_health_status.UNHEALTHY,
|
||||
self.cluster4.health_status)
|
||||
self.assertEqual({'api': 'ok', 'node-0.Ready': 'False'},
|
||||
self.cluster4.health_status_reason)
|
||||
|
Loading…
x
Reference in New Issue
Block a user