diff --git a/magnum/api/controllers/v1/cluster.py b/magnum/api/controllers/v1/cluster.py index f41980b43c..1bf75abc6b 100755 --- a/magnum/api/controllers/v1/cluster.py +++ b/magnum/api/controllers/v1/cluster.py @@ -138,7 +138,7 @@ class Cluster(base.APIBase): status_reason = wtypes.text """Status reason of the cluster from the heat stack""" - health_status = wtypes.Enum(wtypes.text, *fields.ClusterStatus.ALL) + health_status = wtypes.Enum(wtypes.text, *fields.ClusterHealthStatus.ALL) """Health status of the cluster from the native COE API""" health_status_reason = wtypes.DictType(wtypes.text, wtypes.text) @@ -222,7 +222,8 @@ class Cluster(base.APIBase): status=fields.ClusterStatus.CREATE_COMPLETE, status_reason="CREATE completed successfully", health_status=fields.ClusterHealthStatus.HEALTHY, - health_status_reason='{"api_server": "OK"}', + health_status_reason={"api": "ok", + "node-0.Ready": 'True'}, api_address='172.24.4.3', node_addresses=['172.24.4.4', '172.24.4.5'], created_at=timeutils.utcnow(), diff --git a/magnum/conductor/k8s_api.py b/magnum/conductor/k8s_api.py index 496a3f1924..b72b362462 100755 --- a/magnum/conductor/k8s_api.py +++ b/magnum/conductor/k8s_api.py @@ -17,6 +17,8 @@ import tempfile from kubernetes import client as k8s_config from kubernetes.client import api_client from kubernetes.client.apis import core_v1_api +from kubernetes.client import configuration as k8s_configuration +from kubernetes.client import rest from oslo_log import log as logging from magnum.conductor.handlers.common.cert_manager import create_client_files @@ -24,6 +26,69 @@ from magnum.conductor.handlers.common.cert_manager import create_client_files LOG = logging.getLogger(__name__) +class ApiClient(api_client.ApiClient): + + def __init__(self, configuration=None, header_name=None, + header_value=None, cookie=None): + if configuration is None: + configuration = k8s_configuration.Configuration() + self.configuration = configuration + + self.rest_client = 
rest.RESTClientObject(configuration) + self.default_headers = {} + if header_name is not None: + self.default_headers[header_name] = header_value + self.cookie = cookie + + def __del__(self): + pass + + def call_api(self, resource_path, method, + path_params=None, query_params=None, header_params=None, + body=None, post_params=None, files=None, async=None, + response_type=None, auth_settings=None, async_req=None, + _return_http_data_only=None, collection_formats=None, + _preload_content=True, _request_timeout=None): + """Makes http request (synchronous) and return the deserialized data + + :param resource_path: Path to method endpoint. + :param method: Method to call. + :param path_params: Path parameters in the url. + :param query_params: Query parameters in the url. + :param header_params: Header parameters to be + placed in the request header. + :param body: Request body. + :param post_params dict: Request post form parameters, + for `application/x-www-form-urlencoded`, `multipart/form-data`. + :param auth_settings list: Auth Settings names for the request. + :param response: Response data type. + :param files dict: key -> filename, value -> filepath, + for `multipart/form-data`. + :param async bool: to be compatible with k8s-client before 7.0.0 + :param async_req bool: execute request asynchronously + :param _return_http_data_only: response data without head status code + and headers + :param collection_formats: dict of collection formats for path, query, + header, and post parameters. + :param _preload_content: if False, the urllib3.HTTPResponse object will + be returned without reading/decoding response + data. Default is True. + :param _request_timeout: timeout setting for this request. If one + number provided, it will be total request + timeout. It can also be a pair (tuple) of + (connection, read) timeouts. 
+ + :return: The method will return the response directly + + """ + return self.__call_api(resource_path, method, + path_params, query_params, header_params, + body, post_params, files, + response_type, auth_settings, + _return_http_data_only, collection_formats, + _preload_content, _request_timeout) + + class K8sAPI(core_v1_api.CoreV1Api): def _create_temp_file_with_content(self, content): @@ -57,7 +122,7 @@ class K8sAPI(core_v1_api.CoreV1Api): config.key_file = self.key_file.name # build a connection with Kubernetes master - client = api_client.ApiClient(configuration=config) + client = ApiClient(configuration=config) super(K8sAPI, self).__init__(client) diff --git a/magnum/drivers/common/k8s_monitor.py b/magnum/drivers/common/k8s_monitor.py index c6ac1472d3..7739569345 100644 --- a/magnum/drivers/common/k8s_monitor.py +++ b/magnum/drivers/common/k8s_monitor.py @@ -12,9 +12,12 @@ import ast +from oslo_utils import strutils + from magnum.common import utils from magnum.conductor import k8s_api as k8s from magnum.conductor import monitors +from magnum.objects import fields as m_fields class K8sMonitor(monitors.MonitorBase): @@ -45,6 +48,12 @@ class K8sMonitor(monitors.MonitorBase): pods = k8s_api.list_namespaced_pod('default') self.data['pods'] = self._parse_pod_info(pods) + def poll_health_status(self): + k8s_api = k8s.create_k8s_api(self.context, self.cluster) + status, reason = self._poll_health_status(k8s_api) + self.data['health_status'] = status + self.data['health_status_reason'] = reason + def _compute_res_util(self, res): res_total = 0 for node in self.data['nodes']: @@ -164,3 +173,66 @@ class K8sMonitor(monitors.MonitorBase): parsed_nodes.append({'Memory': memory, 'Cpu': cpu}) return parsed_nodes + + def _poll_health_status(self, k8s_api): + """Poll health status of API and nodes for given cluster + + Design Policy: + 1. How to calculate the overall health status? 
+ + Any node (including API and minion nodes) is not OK, then the + overall health status is UNHEALTHY + + 2. The data structure of health_status_reason + As an attribute of the cluster, the health_status_reason have to + use the field type from + oslo.versionedobjects/blob/master/oslo_versionedobjects/fields.py + + 3. How to get the health_status and health_status_reason? + 3.1 Call /healthz to get the API health status + 3.2 Call list_node (using API /api/v1/nodes) to get the nodes + health status + + :param k8s_api: The api client to the cluster + :return: Tuple including status and reason. Example: + ( + ClusterHealthStatus.HEALTHY, + { + 'api': 'ok', + 'k8scluster-ydz7cfbxqqu3-node-0.Ready': False, + 'k8scluster-ydz7cfbxqqu3-node-1.Ready': True, + 'k8scluster-ydz7cfbxqqu3-node-2.Ready': True, + } + ) + + """ + + health_status = m_fields.ClusterHealthStatus.UNHEALTHY + health_status_reason = {} + api_status = None + + try: + api_status, _, _ = k8s_api.api_client.call_api( + '/healthz', 'GET', response_type=object) + + for node in k8s_api.list_node().items: + node_key = node.metadata.name + ".Ready" + ready = False + for condition in node.status.conditions: + if condition.type == 'Ready': + ready = strutils.bool_from_string(condition.status) + break + + health_status_reason[node_key] = ready + + if (api_status == 'ok' and + all(n for n in health_status_reason.values())): + health_status = m_fields.ClusterHealthStatus.HEALTHY + + health_status_reason['api'] = api_status + except Exception as exp_api: + if not api_status: + api_status = (getattr(exp_api, 'body', None) or + getattr(exp_api, 'message', None)) + health_status_reason['api'] = api_status + + return health_status, health_status_reason diff --git a/magnum/service/periodic.py b/magnum/service/periodic.py index c4852f0509..dc3010b1bc 100755 --- a/magnum/service/periodic.py +++ b/magnum/service/periodic.py @@ -88,6 +88,46 @@ class ClusterUpdateJob(object): raise loopingcall.LoopingCallDone() +class 
ClusterHealthUpdateJob(object): + + def __init__(self, ctx, cluster): + self.ctx = ctx + self.cluster = cluster + + def _update_health_status(self): + monitor = monitors.create_monitor(self.ctx, self.cluster) + if monitor is None: + return + + try: + monitor.poll_health_status() + except Exception as e: + LOG.warning( + "Skip pulling data from cluster %(cluster)s due to " + "error: %(e)s", + {'e': e, 'cluster': self.cluster.uuid}, exc_info=True) + # TODO(flwang): Should we mark this cluster's health status as + # UNKNOWN if Magnum failed to pull data from the cluster? Because + # that basically means the k8s API doesn't work at that moment. + return + + if monitor.data.get('health_status'): + self.cluster.health_status = monitor.data.get('health_status') + self.cluster.health_status_reason = monitor.data.get( + 'health_status_reason') + self.cluster.save() + + def update_health_status(self): + LOG.debug("Updating health status for cluster %s", self.cluster.id) + self._update_health_status() + LOG.debug("Status for cluster %s updated to %s (%s)", + self.cluster.id, self.cluster.health_status, + self.cluster.health_status_reason) + # TODO(flwang): Health status update notifications? 
+ # end the "loop" + raise loopingcall.LoopingCallDone() + + @profiler.trace_cls("rpc") class MagnumPeriodicTasks(periodic_task.PeriodicTasks): """Magnum periodic Task class @@ -139,6 +179,36 @@ class MagnumPeriodicTasks(periodic_task.PeriodicTasks): "Ignore error [%s] when syncing up cluster status.", e, exc_info=True) + @periodic_task.periodic_task(spacing=10, run_immediately=True) + @set_context + def sync_cluster_health_status(self, ctx): + try: + LOG.debug('Starting to sync up cluster health status') + + status = [objects.fields.ClusterStatus.CREATE_COMPLETE, + objects.fields.ClusterStatus.UPDATE_COMPLETE, + objects.fields.ClusterStatus.UPDATE_IN_PROGRESS, + objects.fields.ClusterStatus.ROLLBACK_IN_PROGRESS] + filters = {'status': status} + clusters = objects.Cluster.list(ctx, filters=filters) + if not clusters: + return + + # synchronize using native COE API + for cluster in clusters: + job = ClusterHealthUpdateJob(ctx, cluster) + # though this call isn't really looping, we use this + # abstraction anyway to avoid dealing directly with eventlet + # hooey + lc = loopingcall.FixedIntervalLoopingCall( + f=job.update_health_status) + lc.start(1, stop_on_exception=True) + + except Exception as e: + LOG.warning( + "Ignore error [%s] when syncing up cluster health status.", + e, exc_info=True) + @periodic_task.periodic_task(run_immediately=True) @set_context @deprecated(as_of=deprecated.ROCKY) diff --git a/magnum/tests/unit/conductor/test_monitors.py b/magnum/tests/unit/conductor/test_monitors.py index 5442665435..0cb5bbfc6f 100644 --- a/magnum/tests/unit/conductor/test_monitors.py +++ b/magnum/tests/unit/conductor/test_monitors.py @@ -13,17 +13,24 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+from collections import namedtuple + import mock from oslo_serialization import jsonutils +from magnum.common import exception from magnum.drivers.common import k8s_monitor from magnum.drivers.mesos_ubuntu_v1 import monitor as mesos_monitor from magnum.drivers.swarm_fedora_atomic_v1 import monitor as swarm_monitor from magnum.drivers.swarm_fedora_atomic_v2 import monitor as swarm_v2_monitor from magnum import objects +from magnum.objects import fields as m_fields from magnum.tests import base from magnum.tests.unit.db import utils +NODE_STATUS_CONDITION = namedtuple('Condition', + ['type', 'status']) + class MonitorsTestCase(base.TestCase): @@ -418,3 +425,75 @@ class MonitorsTestCase(base.TestCase): self.mesos_monitor.data = test_data cpu_util = self.mesos_monitor.compute_cpu_util() self.assertEqual(0, cpu_util) + + @mock.patch('magnum.conductor.k8s_api.create_k8s_api') + def test_k8s_monitor_health_healthy(self, mock_k8s_api): + mock_nodes = mock.MagicMock() + mock_node = mock.MagicMock() + mock_api_client = mock.MagicMock() + mock_node.status = mock.MagicMock() + mock_node.metadata.name = 'k8s-cluster-node-0' + mock_node.status.conditions = [NODE_STATUS_CONDITION(type='Ready', + status='True')] + mock_nodes.items = [mock_node] + mock_k8s_api.return_value.list_node.return_value = ( + mock_nodes) + mock_k8s_api.return_value.api_client = mock_api_client + mock_api_client.call_api.return_value = ('ok', None, None) + + self.k8s_monitor.poll_health_status() + self.assertEqual(self.k8s_monitor.data['health_status'], + m_fields.ClusterHealthStatus.HEALTHY) + self.assertEqual(self.k8s_monitor.data['health_status_reason'], + {'api': 'ok', 'k8s-cluster-node-0.Ready': True}) + + @mock.patch('magnum.conductor.k8s_api.create_k8s_api') + def test_k8s_monitor_health_unhealthy_api(self, mock_k8s_api): + mock_nodes = mock.MagicMock() + mock_node = mock.MagicMock() + mock_api_client = mock.MagicMock() + mock_node.status = mock.MagicMock() + mock_node.metadata.name = 
'k8s-cluster-node-0' + mock_node.status.conditions = [NODE_STATUS_CONDITION(type='Ready', + status='True')] + mock_nodes.items = [mock_node] + mock_k8s_api.return_value.list_node.return_value = ( + mock_nodes) + mock_k8s_api.return_value.api_client = mock_api_client + mock_api_client.call_api.side_effect = exception.MagnumException( + message='failed') + + self.k8s_monitor.poll_health_status() + self.assertEqual(self.k8s_monitor.data['health_status'], + m_fields.ClusterHealthStatus.UNHEALTHY) + self.assertEqual(self.k8s_monitor.data['health_status_reason'], + {'api': 'failed'}) + + @mock.patch('magnum.conductor.k8s_api.create_k8s_api') + def test_k8s_monitor_health_unhealthy_node(self, mock_k8s_api): + mock_nodes = mock.MagicMock() + mock_api_client = mock.MagicMock() + + mock_node0 = mock.MagicMock() + mock_node0.status = mock.MagicMock() + mock_node0.metadata.name = 'k8s-cluster-node-0' + mock_node0.status.conditions = [NODE_STATUS_CONDITION(type='Ready', + status='False')] + mock_node1 = mock.MagicMock() + mock_node1.status = mock.MagicMock() + mock_node1.metadata.name = 'k8s-cluster-node-1' + mock_node1.status.conditions = [NODE_STATUS_CONDITION(type='Ready', + status='True')] + + mock_nodes.items = [mock_node0, mock_node1] + mock_k8s_api.return_value.list_node.return_value = ( + mock_nodes) + mock_k8s_api.return_value.api_client = mock_api_client + mock_api_client.call_api.return_value = ('ok', None, None) + + self.k8s_monitor.poll_health_status() + self.assertEqual(self.k8s_monitor.data['health_status'], + m_fields.ClusterHealthStatus.UNHEALTHY) + self.assertEqual(self.k8s_monitor.data['health_status_reason'], + {'api': 'ok', 'k8s-cluster-node-0.Ready': False, + 'k8s-cluster-node-1.Ready': True}) diff --git a/magnum/tests/unit/service/test_periodic.py b/magnum/tests/unit/service/test_periodic.py index 4d50765a33..f32f9f4844 100644 --- a/magnum/tests/unit/service/test_periodic.py +++ b/magnum/tests/unit/service/test_periodic.py @@ -18,7 +18,9 @@ 
from magnum.common import context from magnum.common.rpc_service import CONF from magnum.db.sqlalchemy import api as dbapi from magnum.drivers.common import driver +from magnum.drivers.common import k8s_monitor from magnum import objects +from magnum.objects.fields import ClusterHealthStatus as cluster_health_status from magnum.objects.fields import ClusterStatus as cluster_status from magnum.service import periodic from magnum.tests import base @@ -361,3 +363,31 @@ class PeriodicTestCase(base.TestCase): self.assertEqual(0, mock_create_monitor.call_count) self.assertEqual(0, notifier.info.call_count) + + @mock.patch('oslo_service.loopingcall.FixedIntervalLoopingCall', + new=fakes.FakeLoopingCall) + @mock.patch('magnum.conductor.monitors.create_monitor') + @mock.patch('magnum.objects.Cluster.list') + @mock.patch('magnum.common.rpc.get_notifier') + @mock.patch('magnum.common.context.make_admin_context') + def test_sync_cluster_health_status(self, mock_make_admin_context, + mock_get_notifier, mock_cluster_list, + mock_create_monitor): + """Test sync cluster health status""" + mock_make_admin_context.return_value = self.context + notifier = mock.MagicMock() + mock_get_notifier.return_value = notifier + mock_cluster_list.return_value = [self.cluster4] + self.cluster4.status = cluster_status.CREATE_COMPLETE + health = {'health_status': cluster_health_status.UNHEALTHY, + 'health_status_reason': {'api': 'ok', 'node-0.Ready': False}} + monitor = mock.MagicMock(spec=k8s_monitor.K8sMonitor, name='test', + data=health) + mock_create_monitor.return_value = monitor + periodic.MagnumPeriodicTasks(CONF).sync_cluster_health_status( + self.context) + + self.assertEqual(cluster_health_status.UNHEALTHY, + self.cluster4.health_status) + self.assertEqual({'api': 'ok', 'node-0.Ready': 'False'}, + self.cluster4.health_status_reason)