[k8s] Support updating k8s cluster health status

The original design of the k8s cluster health status allowed the
health status to be updated only by the Magnum control plane. However,
that doesn't work when the cluster is private. This patch supports
updating the k8s cluster health status via the Magnum cluster
update API from a third-party service, so that a controller (e.g.
magnum-auto-healer) running inside the k8s cluster can call the
Magnum update API to update the cluster health status.

Task: 38583
Story: 2007242

Change-Id: Ie7189d328c4038403576b0324e7b0e8a9b305a5e
Feilong Wang 2020-02-28 17:34:28 +13:00
parent 0fffdd1956
commit 63e80c3108
15 changed files with 150 additions and 22 deletions


@ -37,6 +37,7 @@ created and managed by Magnum to support the COE's.
#. `Rolling Upgrade`_
#. `Keystone Authentication and Authorization for Kubernetes`_
#. `Node Groups`_
#. `Kubernetes Health Monitoring`_
Overview
========
@ -3479,7 +3480,7 @@ Rolling Upgrade
===============
.. include:: rolling-upgrade.rst
=======
Keystone Authentication and Authorization for Kubernetes
========================================================
@ -3490,3 +3491,8 @@ Node Groups
===========
.. include:: node-groups.rst
Kubernetes Health Monitoring
============================
.. include:: k8s-health-monitoring.rst


@ -0,0 +1,26 @@
Currently, Magnum supports health monitoring for Kubernetes clusters. Two
scenarios are supported: internal and external.
Internal Health Monitoring
--------------------------
Magnum runs a periodic job to check whether the k8s cluster is reachable.
If the floating IP is enabled, or the master load balancer is enabled and
has a floating IP associated, then Magnum considers the cluster reachable.
Magnum then calls the k8s API every 10 seconds to poll the health status of
the cluster and updates the two attributes `health_status` and
`health_status_reason`.
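The reachability check can be summarised with the following sketch. It is a
simplified, illustrative version of the `_is_cluster_accessible` helper shown
later in this change; the `cluster` attribute names mirror Magnum's objects
and are used here only for illustration::

    from oslo_utils import strutils

    def is_cluster_accessible(cluster):
        # With a master load balancer, the cluster is reachable only if
        # the load balancer has a floating IP associated.
        if cluster.cluster_template.master_lb_enabled:
            lb_fip = cluster.labels.get("master_lb_floating_ip_enabled",
                                        cluster.floating_ip_enabled)
            return strutils.bool_from_string(lb_fip)
        # Otherwise the cluster itself must expose a floating IP.
        return bool(cluster.floating_ip_enabled)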
External Health Monitoring
---------------------------
Currently, only `magnum-auto-healer
<https://github.com/kubernetes/cloud-provider-openstack/tree/master/pkg/autohealing>`_
is able to update a cluster's `health_status` and `health_status_reason`
attributes. Both labels `auto_healing_enabled=True` and
`auto_healing_controller=magnum-auto-healer` must be set; otherwise, the two
attributes will be overwritten with 'UNKNOWN' and 'The cluster is not
accessible'. The `health_status` attribute can be `HEALTHY`, `UNHEALTHY` or
`UNKNOWN`, and `health_status_reason` is a dictionary of the hostnames, their
current health statuses, and the API health status.
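For illustration, a third-party controller could send a JSON patch like the
one below to the cluster update endpoint. This is a minimal sketch using the
`requests` library; the endpoint URL, token, and cluster identifier are
placeholders, and real controllers such as magnum-auto-healer use their own
OpenStack clients rather than raw HTTP::

    import json
    import requests

    MAGNUM = "https://magnum.example.com:9511/v1"   # placeholder endpoint
    TOKEN = "<keystone-token>"                       # placeholder token
    CLUSTER = "<cluster-uuid-or-name>"               # placeholder cluster

    # JSON patch replacing the two health attributes of the cluster.
    patch = [
        {"op": "replace", "path": "/health_status", "value": "UNHEALTHY"},
        {"op": "replace", "path": "/health_status_reason",
         "value": {"api": "ok", "node-0.Ready": "False"}},
    ]

    resp = requests.patch(
        "%s/clusters/%s" % (MAGNUM, CLUSTER),
        headers={"X-Auth-Token": TOKEN,
                 "Content-Type": "application/json",
                 "OpenStack-API-Version": "container-infra latest"},
        data=json.dumps(patch))
    resp.raise_for_status()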


@ -524,8 +524,12 @@ class ClustersController(base.Controller):
:param cluster_ident: UUID or logical name of a cluster.
:param patch: a json PATCH document to apply to this cluster.
"""
cluster, node_count = self._patch(cluster_ident, patch)
pecan.request.rpcapi.cluster_update_async(cluster, node_count)
(cluster, node_count,
health_status,
health_status_reason) = self._patch(cluster_ident, patch)
pecan.request.rpcapi.cluster_update_async(cluster, node_count,
health_status,
health_status_reason)
return ClusterID(cluster.uuid)
@base.Controller.api_version("1.3") # noqa
@ -539,8 +543,12 @@ class ClustersController(base.Controller):
:param rollback: whether to rollback cluster on update failure.
:param patch: a json PATCH document to apply to this cluster.
"""
cluster, node_count = self._patch(cluster_ident, patch)
(cluster, node_count,
health_status,
health_status_reason) = self._patch(cluster_ident, patch)
pecan.request.rpcapi.cluster_update_async(cluster, node_count,
health_status,
health_status_reason,
rollback)
return ClusterID(cluster.uuid)
@ -554,6 +562,8 @@ class ClustersController(base.Controller):
cluster = api_utils.get_resource('Cluster', cluster_ident)
policy.enforce(context, 'cluster:update', cluster.as_dict(),
action='cluster:update')
policy.enforce(context, "cluster:update_health_status",
action="cluster:update_health_status")
try:
cluster_dict = cluster.as_dict()
new_cluster = Cluster(**api_utils.apply_jsonpatch(cluster_dict,
@ -571,7 +581,8 @@ class ClustersController(base.Controller):
delta.add(field)
validation.validate_cluster_properties(delta)
return cluster, new_cluster.node_count
return (cluster, new_cluster.node_count,
new_cluster.health_status, new_cluster.health_status_reason)
@expose.expose(None, types.uuid_or_name, status_code=204)
def delete(self, cluster_ident):


@ -82,7 +82,8 @@ def apply_jsonpatch(doc, patch):
"'replace' operation instead.") % p['path']
raise wsme.exc.ClientSideError(msg)
if p['op'] == 'replace' and p['path'] == '/labels':
if (p['op'] == 'replace' and (p['path'] == '/labels' or
p['path'] == '/health_status_reason')):
try:
val = p['value']
dict_val = val if type(val) == dict else ast.literal_eval(val)
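In other words, a `replace` operation on `/health_status_reason` is now
treated like one on `/labels`: the value may arrive either as a dict or as
its string representation. A small, standalone illustration of the
conversion performed above (the patch operation itself is hypothetical)::

    import ast

    # Hypothetical patch operation whose dict value arrived as a string.
    p = {'op': 'replace',
         'path': '/health_status_reason',
         'value': "{'api': 'ok', 'node-0.Ready': 'True'}"}

    val = p['value']
    # Keep dicts as-is; safely evaluate string representations.
    dict_val = val if type(val) == dict else ast.literal_eval(val)
    assert dict_val == {'api': 'ok', 'node-0.Ready': 'True'}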


@ -29,7 +29,8 @@ from magnum import objects
CONF = magnum.conf.CONF
cluster_update_allowed_properties = set(['node_count'])
cluster_update_allowed_properties = set(['node_count', 'health_status',
'health_status_reason'])
federation_update_allowed_properties = set(['member_ids', 'properties'])


@ -129,6 +129,17 @@ rules = [
}
]
),
policy.DocumentedRuleDefault(
name=CLUSTER % 'update_health_status',
check_str=base.RULE_ADMIN_OR_USER + " or " + base.RULE_CLUSTER_USER,
description='Update the health status of an existing cluster.',
operations=[
{
'path': '/v1/clusters/{cluster_ident}',
'method': 'PATCH'
}
]
),
policy.DocumentedRuleDefault(
name=CLUSTER % 'update_all_projects',
check_str=base.RULE_ADMIN_API,


@ -49,13 +49,21 @@ class API(rpc_service.API):
def cluster_delete_async(self, uuid):
self._cast('cluster_delete', uuid=uuid)
def cluster_update(self, cluster, node_count):
def cluster_update(self, cluster, node_count,
health_status, health_status_reason):
return self._call(
'cluster_update', cluster=cluster, node_count=node_count)
'cluster_update', cluster=cluster, node_count=node_count,
health_status=health_status,
health_status_reason=health_status_reason)
def cluster_update_async(self, cluster, node_count, rollback=False):
def cluster_update_async(self, cluster, node_count,
health_status, health_status_reason,
rollback=False):
self._cast('cluster_update', cluster=cluster,
node_count=node_count, rollback=rollback)
node_count=node_count,
health_status=health_status,
health_status_reason=health_status_reason,
rollback=rollback)
def cluster_resize(self, cluster, node_count, nodes_to_remove,
nodegroup, rollback=False):


@ -97,7 +97,8 @@ class Handler(object):
return cluster
def cluster_update(self, context, cluster, node_count, rollback=False):
def cluster_update(self, context, cluster, node_count,
health_status, health_status_reason, rollback=False):
LOG.debug('cluster_heat cluster_update')
osc = clients.OpenStackClients(context)
@ -122,8 +123,20 @@ class Handler(object):
# Updates will be only reflected to the default worker
# nodegroup.
worker_ng = cluster.default_ng_worker
if worker_ng.node_count == node_count:
if (worker_ng.node_count == node_count and
cluster.health_status == health_status and
cluster.health_status_reason == health_status_reason):
return
cluster.health_status = health_status
cluster.health_status_reason = health_status_reason
# It's not necessary to trigger driver's cluster update if it's
# only health status update
if worker_ng.node_count == node_count:
cluster.save()
return cluster
# Backup the old node count so that we can restore it
# in case of an exception.
old_node_count = worker_ng.node_count


@ -49,6 +49,9 @@ class K8sMonitor(monitors.MonitorBase):
self.data['pods'] = self._parse_pod_info(pods)
def poll_health_status(self):
if self._is_magnum_auto_healer_running():
return
k8s_api = k8s.create_k8s_api(self.context, self.cluster)
if self._is_cluster_accessible():
status, reason = self._poll_health_status(k8s_api)
@ -60,6 +63,12 @@ class K8sMonitor(monitors.MonitorBase):
self.data['health_status'] = status
self.data['health_status_reason'] = reason
def _is_magnum_auto_healer_running(self):
auto_healing = self.cluster.labels.get("auto_healing_enabled")
auto_healing_enabled = strutils.bool_from_string(auto_healing)
controller = self.cluster.labels.get("auto_healing_controller")
return (auto_healing_enabled and controller == "magnum-auto-healer")
def _is_cluster_accessible(self):
if self.cluster.cluster_template.master_lb_enabled:
lb_fip = self.cluster.labels.get("master_lb_floating_ip_enabled",


@ -259,13 +259,15 @@ class TestPatch(api_base.FunctionalTest):
self.cluster_template_obj = obj_utils.create_test_cluster_template(
self.context)
self.cluster_obj = obj_utils.create_test_cluster(
self.context, name='cluster_example_A', node_count=3)
self.context, name='cluster_example_A', node_count=3,
health_status='UNKNOWN', health_status_reason={})
p = mock.patch.object(rpcapi.API, 'cluster_update_async')
self.mock_cluster_update = p.start()
self.mock_cluster_update.side_effect = self._sim_rpc_cluster_update
self.addCleanup(p.stop)
def _sim_rpc_cluster_update(self, cluster, node_count, rollback=False):
def _sim_rpc_cluster_update(self, cluster, node_count, health_status,
health_status_reason, rollback=False):
cluster.status = 'UPDATE_IN_PROGRESS'
default_ng_worker = cluster.default_ng_worker
default_ng_worker.node_count = node_count
@ -434,7 +436,8 @@ class TestPatch(api_base.FunctionalTest):
headers={'OpenStack-API-Version': 'container-infra 1.3'})
self.mock_cluster_update.assert_called_once_with(
mock.ANY, node_count, True)
mock.ANY, node_count, self.cluster_obj.health_status,
self.cluster_obj.health_status_reason, True)
self.assertEqual(202, response.status_code)
def test_update_cluster_with_rollback_disabled(self):
@ -446,7 +449,8 @@ class TestPatch(api_base.FunctionalTest):
headers={'OpenStack-API-Version': 'container-infra 1.3'})
self.mock_cluster_update.assert_called_once_with(
mock.ANY, node_count, False)
mock.ANY, node_count, self.cluster_obj.health_status,
self.cluster_obj.health_status_reason, False)
self.assertEqual(202, response.status_code)
def test_remove_ok(self):


@ -27,6 +27,7 @@ from magnum.conductor.handlers import cluster_conductor
import magnum.conf
from magnum.drivers.k8s_fedora_atomic_v1 import driver as k8s_atomic_dr
from magnum import objects
from magnum.objects.fields import ClusterHealthStatus
from magnum.objects.fields import ClusterStatus as cluster_status
from magnum.tests import fake_notifier
from magnum.tests.unit.db import base as db_base
@ -79,7 +80,8 @@ class TestHandler(db_base.DbTestCase):
self.master.create()
self.worker.create()
self.cluster.status = cluster_status.CREATE_COMPLETE
self.handler.cluster_update(self.context, self.cluster, node_count)
self.handler.cluster_update(self.context, self.cluster, node_count,
ClusterHealthStatus.UNKNOWN, {})
notifications = fake_notifier.NOTIFICATIONS
self.assertEqual(1, len(notifications))
@ -111,7 +113,8 @@ class TestHandler(db_base.DbTestCase):
self.worker.create()
self.cluster.status = cluster_status.CREATE_FAILED
self.assertRaises(exception.NotSupported, self.handler.cluster_update,
self.context, self.cluster, node_count)
self.context, self.cluster, node_count,
ClusterHealthStatus.UNKNOWN, {})
notifications = fake_notifier.NOTIFICATIONS
self.assertEqual(1, len(notifications))
@ -144,7 +147,8 @@ class TestHandler(db_base.DbTestCase):
self.cluster.status = cluster_status.CREATE_COMPLETE
self.master.create()
self.worker.create()
self.handler.cluster_update(self.context, self.cluster, node_count)
self.handler.cluster_update(self.context, self.cluster, node_count,
ClusterHealthStatus.UNKNOWN, {})
notifications = fake_notifier.NOTIFICATIONS
self.assertEqual(1, len(notifications))


@ -542,3 +542,24 @@ class MonitorsTestCase(base.TestCase):
self.k8s_monitor.poll_health_status()
self.assertEqual(self.k8s_monitor.data['health_status'],
m_fields.ClusterHealthStatus.UNKNOWN)
def test_is_magnum_auto_healer_running(self):
cluster = self.k8s_monitor.cluster
cluster.labels['auto_healing_enabled'] = True
cluster.labels['auto_healing_controller'] = 'magnum-auto-healer'
self.k8s_monitor._is_magnum_auto_healer_running()
self.assertTrue(self.k8s_monitor._is_magnum_auto_healer_running())
cluster.labels['auto_healing_enabled'] = False
cluster.labels['auto_healing_controller'] = 'magnum-auto-healer'
self.k8s_monitor._is_magnum_auto_healer_running()
self.assertFalse(self.k8s_monitor._is_magnum_auto_healer_running())
cluster.labels['auto_healing_enabled'] = True
cluster.labels['auto_healing_controller'] = 'draino'
self.k8s_monitor._is_magnum_auto_healer_running()
self.assertFalse(self.k8s_monitor._is_magnum_auto_healer_running())
cluster.labels = {}
self.k8s_monitor._is_magnum_auto_healer_running()
self.assertFalse(self.k8s_monitor._is_magnum_auto_healer_running())


@ -19,6 +19,7 @@ import mock
from magnum.conductor import api as conductor_rpcapi
from magnum import objects
from magnum.objects.fields import ClusterHealthStatus
from magnum.tests.unit.db import base
from magnum.tests.unit.db import utils as dbutils
@ -99,7 +100,9 @@ class RPCAPITestCase(base.DbTestCase):
'call',
version='1.1',
cluster=self.fake_cluster['name'],
node_count=2)
node_count=2,
health_status=ClusterHealthStatus.UNKNOWN,
health_status_reason={})
def test_ping_conductor(self):
self._test_rpcapi('ping_conductor',


@ -116,7 +116,7 @@ def get_test_cluster(**kw):
if attr in kw:
attrs[attr] = kw[attr]
# Required only in PeriodicTestCase, may break other tests
for attr in ['keypair', 'health_status']:
for attr in ['keypair', 'health_status', 'health_status_reason']:
if attr in kw:
attrs[attr] = kw[attr]


@ -0,0 +1,10 @@
---
features:
- |
The original design of the k8s cluster health status allowed the
health status to be updated only by the Magnum control plane. However,
that doesn't work when the cluster is private. Now Magnum supports
updating the k8s cluster health status via the Magnum cluster
update API, so that a controller (e.g. magnum-auto-healer) running
inside the k8s cluster can call the Magnum update API to update
the cluster health status.