Merge "[k8s] Support updating k8s cluster health status"
This commit is contained in:
commit
8768888b0e
|
@ -37,6 +37,7 @@ created and managed by Magnum to support the COE's.
|
|||
#. `Rolling Upgrade`_
|
||||
#. `Keystone Authentication and Authorization for Kubernetes`_
|
||||
#. `Node Groups`_
|
||||
#. `Kubernetes Health Monitoring`_
|
||||
|
||||
Overview
|
||||
========
|
||||
|
@ -3497,7 +3498,7 @@ Rolling Upgrade
|
|||
===============
|
||||
|
||||
.. include:: rolling-upgrade.rst
|
||||
=======
|
||||
|
||||
|
||||
Keystone Authentication and Authorization for Kubernetes
|
||||
========================================================
|
||||
|
@ -3508,3 +3509,8 @@ Node Groups
|
|||
===========
|
||||
|
||||
.. include:: node-groups.rst
|
||||
|
||||
Kubernetes Health Monitoring
|
||||
============================
|
||||
|
||||
.. include:: k8s-health-monitoring.rst
|
||||
|
|
|
@ -0,0 +1,26 @@
|
|||
Currently Magnum can support health monitoring for Kubernetes clusters. There
|
||||
are two scenarios supported now: internal and external.
|
||||
|
||||
Internal Health Monitoring
|
||||
--------------------------
|
||||
|
||||
Magnum has a periodic job to poll the k8s cluster if it is a reachable cluster.
|
||||
If the floating IP is enabled, or the master loadbalancer is enabled and the
|
||||
master loadbalancer has floating IP associated, then Magnum will take this
|
||||
cluster as reachable. Then Magnum will call the k8s API every 10 seconds to poll
|
||||
the health status of the cluster and then update the two attributes:
|
||||
`health_status` and `health_status_reason`.
|
||||
|
||||
External Health Monitoring
|
||||
--------------------------
|
||||
|
||||
Currently, only `magnum-auto-healer
|
||||
<https://github.com/kubernetes/cloud-provider-openstack/tree/master/pkg/autohealing>`_
|
||||
is able to update cluster's `health_status` and `health_status_reason`
|
||||
attributes. Both the label `auto_healing_enabled=True` and
|
||||
`auto_healing_controller=magnum-auto-healer` must be set, otherwise, the two
|
||||
attributes' value will be overwritten with 'UNKNOWN' and 'The cluster is not
|
||||
accessible'. The health_status attribute can be `HEALTHY`,
|
||||
`UNHEALTHY` or `UNKNOWN` and the health_status_reason is a dictionary
|
||||
of the hostnames and their current health statuses and the API health status.
|
||||
|
|
@ -524,8 +524,12 @@ class ClustersController(base.Controller):
|
|||
:param cluster_ident: UUID or logical name of a cluster.
|
||||
:param patch: a json PATCH document to apply to this cluster.
|
||||
"""
|
||||
cluster, node_count = self._patch(cluster_ident, patch)
|
||||
pecan.request.rpcapi.cluster_update_async(cluster, node_count)
|
||||
(cluster, node_count,
|
||||
health_status,
|
||||
health_status_reason) = self._patch(cluster_ident, patch)
|
||||
pecan.request.rpcapi.cluster_update_async(cluster, node_count,
|
||||
health_status,
|
||||
health_status_reason)
|
||||
return ClusterID(cluster.uuid)
|
||||
|
||||
@base.Controller.api_version("1.3") # noqa
|
||||
|
@ -539,8 +543,12 @@ class ClustersController(base.Controller):
|
|||
:param rollback: whether to rollback cluster on update failure.
|
||||
:param patch: a json PATCH document to apply to this cluster.
|
||||
"""
|
||||
cluster, node_count = self._patch(cluster_ident, patch)
|
||||
(cluster, node_count,
|
||||
health_status,
|
||||
health_status_reason) = self._patch(cluster_ident, patch)
|
||||
pecan.request.rpcapi.cluster_update_async(cluster, node_count,
|
||||
health_status,
|
||||
health_status_reason,
|
||||
rollback)
|
||||
return ClusterID(cluster.uuid)
|
||||
|
||||
|
@ -554,6 +562,8 @@ class ClustersController(base.Controller):
|
|||
cluster = api_utils.get_resource('Cluster', cluster_ident)
|
||||
policy.enforce(context, 'cluster:update', cluster.as_dict(),
|
||||
action='cluster:update')
|
||||
policy.enforce(context, "cluster:update_health_status",
|
||||
action="cluster:update_health_status")
|
||||
try:
|
||||
cluster_dict = cluster.as_dict()
|
||||
new_cluster = Cluster(**api_utils.apply_jsonpatch(cluster_dict,
|
||||
|
@ -571,7 +581,8 @@ class ClustersController(base.Controller):
|
|||
delta.add(field)
|
||||
|
||||
validation.validate_cluster_properties(delta)
|
||||
return cluster, new_cluster.node_count
|
||||
return (cluster, new_cluster.node_count,
|
||||
new_cluster.health_status, new_cluster.health_status_reason)
|
||||
|
||||
@expose.expose(None, types.uuid_or_name, status_code=204)
|
||||
def delete(self, cluster_ident):
|
||||
|
|
|
@ -82,7 +82,8 @@ def apply_jsonpatch(doc, patch):
|
|||
"'replace' operation instead.") % p['path']
|
||||
raise wsme.exc.ClientSideError(msg)
|
||||
|
||||
if p['op'] == 'replace' and p['path'] == '/labels':
|
||||
if (p['op'] == 'replace' and (p['path'] == '/labels' or
|
||||
p['path'] == '/health_status_reason')):
|
||||
try:
|
||||
val = p['value']
|
||||
dict_val = val if type(val) == dict else ast.literal_eval(val)
|
||||
|
|
|
@ -29,7 +29,8 @@ from magnum import objects
|
|||
|
||||
CONF = magnum.conf.CONF
|
||||
|
||||
cluster_update_allowed_properties = set(['node_count'])
|
||||
cluster_update_allowed_properties = set(['node_count', 'health_status',
|
||||
'health_status_reason'])
|
||||
federation_update_allowed_properties = set(['member_ids', 'properties'])
|
||||
|
||||
|
||||
|
|
|
@ -129,6 +129,17 @@ rules = [
|
|||
}
|
||||
]
|
||||
),
|
||||
policy.DocumentedRuleDefault(
|
||||
name=CLUSTER % 'update_health_status',
|
||||
check_str=base.RULE_ADMIN_OR_USER + " or " + base.RULE_CLUSTER_USER,
|
||||
description='Update the health status of an existing cluster.',
|
||||
operations=[
|
||||
{
|
||||
'path': '/v1/clusters/{cluster_ident}',
|
||||
'method': 'PATCH'
|
||||
}
|
||||
]
|
||||
),
|
||||
policy.DocumentedRuleDefault(
|
||||
name=CLUSTER % 'update_all_projects',
|
||||
check_str=base.RULE_ADMIN_API,
|
||||
|
|
|
@ -49,13 +49,21 @@ class API(rpc_service.API):
|
|||
def cluster_delete_async(self, uuid):
|
||||
self._cast('cluster_delete', uuid=uuid)
|
||||
|
||||
def cluster_update(self, cluster, node_count):
|
||||
def cluster_update(self, cluster, node_count,
|
||||
health_status, health_status_reason):
|
||||
return self._call(
|
||||
'cluster_update', cluster=cluster, node_count=node_count)
|
||||
'cluster_update', cluster=cluster, node_count=node_count,
|
||||
health_status=health_status,
|
||||
health_status_reason=health_status_reason)
|
||||
|
||||
def cluster_update_async(self, cluster, node_count, rollback=False):
|
||||
def cluster_update_async(self, cluster, node_count,
|
||||
health_status, health_status_reason,
|
||||
rollback=False):
|
||||
self._cast('cluster_update', cluster=cluster,
|
||||
node_count=node_count, rollback=rollback)
|
||||
node_count=node_count,
|
||||
health_status=health_status,
|
||||
health_status_reason=health_status_reason,
|
||||
rollback=rollback)
|
||||
|
||||
def cluster_resize(self, cluster, node_count, nodes_to_remove,
|
||||
nodegroup, rollback=False):
|
||||
|
|
|
@ -97,7 +97,8 @@ class Handler(object):
|
|||
|
||||
return cluster
|
||||
|
||||
def cluster_update(self, context, cluster, node_count, rollback=False):
|
||||
def cluster_update(self, context, cluster, node_count,
|
||||
health_status, health_status_reason, rollback=False):
|
||||
LOG.debug('cluster_heat cluster_update')
|
||||
|
||||
osc = clients.OpenStackClients(context)
|
||||
|
@ -122,8 +123,20 @@ class Handler(object):
|
|||
# Updates will be only reflected to the default worker
|
||||
# nodegroup.
|
||||
worker_ng = cluster.default_ng_worker
|
||||
if worker_ng.node_count == node_count:
|
||||
if (worker_ng.node_count == node_count and
|
||||
cluster.health_status == health_status and
|
||||
cluster.health_status_reason == health_status_reason):
|
||||
return
|
||||
|
||||
cluster.health_status = health_status
|
||||
cluster.health_status_reason = health_status_reason
|
||||
|
||||
# It's not necessary to trigger driver's cluster update if it's
|
||||
# only health status update
|
||||
if worker_ng.node_count == node_count:
|
||||
cluster.save()
|
||||
return cluster
|
||||
|
||||
# Backup the old node count so that we can restore it
|
||||
# in case of an exception.
|
||||
old_node_count = worker_ng.node_count
|
||||
|
|
|
@ -49,6 +49,9 @@ class K8sMonitor(monitors.MonitorBase):
|
|||
self.data['pods'] = self._parse_pod_info(pods)
|
||||
|
||||
def poll_health_status(self):
|
||||
if self._is_magnum_auto_healer_running():
|
||||
return
|
||||
|
||||
k8s_api = k8s.create_k8s_api(self.context, self.cluster)
|
||||
if self._is_cluster_accessible():
|
||||
status, reason = self._poll_health_status(k8s_api)
|
||||
|
@ -60,6 +63,12 @@ class K8sMonitor(monitors.MonitorBase):
|
|||
self.data['health_status'] = status
|
||||
self.data['health_status_reason'] = reason
|
||||
|
||||
def _is_magnum_auto_healer_running(self):
|
||||
auto_healing = self.cluster.labels.get("auto_healing_enabled")
|
||||
auto_healing_enabled = strutils.bool_from_string(auto_healing)
|
||||
controller = self.cluster.labels.get("auto_healing_controller")
|
||||
return (auto_healing_enabled and controller == "magnum-auto-healer")
|
||||
|
||||
def _is_cluster_accessible(self):
|
||||
if self.cluster.cluster_template.master_lb_enabled:
|
||||
lb_fip = self.cluster.labels.get("master_lb_floating_ip_enabled",
|
||||
|
|
|
@ -259,13 +259,15 @@ class TestPatch(api_base.FunctionalTest):
|
|||
self.cluster_template_obj = obj_utils.create_test_cluster_template(
|
||||
self.context)
|
||||
self.cluster_obj = obj_utils.create_test_cluster(
|
||||
self.context, name='cluster_example_A', node_count=3)
|
||||
self.context, name='cluster_example_A', node_count=3,
|
||||
health_status='UNKNOWN', health_status_reason={})
|
||||
p = mock.patch.object(rpcapi.API, 'cluster_update_async')
|
||||
self.mock_cluster_update = p.start()
|
||||
self.mock_cluster_update.side_effect = self._sim_rpc_cluster_update
|
||||
self.addCleanup(p.stop)
|
||||
|
||||
def _sim_rpc_cluster_update(self, cluster, node_count, rollback=False):
|
||||
def _sim_rpc_cluster_update(self, cluster, node_count, health_status,
|
||||
health_status_reason, rollback=False):
|
||||
cluster.status = 'UPDATE_IN_PROGRESS'
|
||||
default_ng_worker = cluster.default_ng_worker
|
||||
default_ng_worker.node_count = node_count
|
||||
|
@ -434,7 +436,8 @@ class TestPatch(api_base.FunctionalTest):
|
|||
headers={'OpenStack-API-Version': 'container-infra 1.3'})
|
||||
|
||||
self.mock_cluster_update.assert_called_once_with(
|
||||
mock.ANY, node_count, True)
|
||||
mock.ANY, node_count, self.cluster_obj.health_status,
|
||||
self.cluster_obj.health_status_reason, True)
|
||||
self.assertEqual(202, response.status_code)
|
||||
|
||||
def test_update_cluster_with_rollback_disabled(self):
|
||||
|
@ -446,7 +449,8 @@ class TestPatch(api_base.FunctionalTest):
|
|||
headers={'OpenStack-API-Version': 'container-infra 1.3'})
|
||||
|
||||
self.mock_cluster_update.assert_called_once_with(
|
||||
mock.ANY, node_count, False)
|
||||
mock.ANY, node_count, self.cluster_obj.health_status,
|
||||
self.cluster_obj.health_status_reason, False)
|
||||
self.assertEqual(202, response.status_code)
|
||||
|
||||
def test_remove_ok(self):
|
||||
|
|
|
@ -27,6 +27,7 @@ from magnum.conductor.handlers import cluster_conductor
|
|||
import magnum.conf
|
||||
from magnum.drivers.k8s_fedora_atomic_v1 import driver as k8s_atomic_dr
|
||||
from magnum import objects
|
||||
from magnum.objects.fields import ClusterHealthStatus
|
||||
from magnum.objects.fields import ClusterStatus as cluster_status
|
||||
from magnum.tests import fake_notifier
|
||||
from magnum.tests.unit.db import base as db_base
|
||||
|
@ -79,7 +80,8 @@ class TestHandler(db_base.DbTestCase):
|
|||
self.master.create()
|
||||
self.worker.create()
|
||||
self.cluster.status = cluster_status.CREATE_COMPLETE
|
||||
self.handler.cluster_update(self.context, self.cluster, node_count)
|
||||
self.handler.cluster_update(self.context, self.cluster, node_count,
|
||||
ClusterHealthStatus.UNKNOWN, {})
|
||||
|
||||
notifications = fake_notifier.NOTIFICATIONS
|
||||
self.assertEqual(1, len(notifications))
|
||||
|
@ -111,7 +113,8 @@ class TestHandler(db_base.DbTestCase):
|
|||
self.worker.create()
|
||||
self.cluster.status = cluster_status.CREATE_FAILED
|
||||
self.assertRaises(exception.NotSupported, self.handler.cluster_update,
|
||||
self.context, self.cluster, node_count)
|
||||
self.context, self.cluster, node_count,
|
||||
ClusterHealthStatus.UNKNOWN, {})
|
||||
|
||||
notifications = fake_notifier.NOTIFICATIONS
|
||||
self.assertEqual(1, len(notifications))
|
||||
|
@ -144,7 +147,8 @@ class TestHandler(db_base.DbTestCase):
|
|||
self.cluster.status = cluster_status.CREATE_COMPLETE
|
||||
self.master.create()
|
||||
self.worker.create()
|
||||
self.handler.cluster_update(self.context, self.cluster, node_count)
|
||||
self.handler.cluster_update(self.context, self.cluster, node_count,
|
||||
ClusterHealthStatus.UNKNOWN, {})
|
||||
|
||||
notifications = fake_notifier.NOTIFICATIONS
|
||||
self.assertEqual(1, len(notifications))
|
||||
|
|
|
@ -542,3 +542,24 @@ class MonitorsTestCase(base.TestCase):
|
|||
self.k8s_monitor.poll_health_status()
|
||||
self.assertEqual(self.k8s_monitor.data['health_status'],
|
||||
m_fields.ClusterHealthStatus.UNKNOWN)
|
||||
|
||||
def test_is_magnum_auto_healer_running(self):
|
||||
cluster = self.k8s_monitor.cluster
|
||||
cluster.labels['auto_healing_enabled'] = True
|
||||
cluster.labels['auto_healing_controller'] = 'magnum-auto-healer'
|
||||
self.k8s_monitor._is_magnum_auto_healer_running()
|
||||
self.assertTrue(self.k8s_monitor._is_magnum_auto_healer_running())
|
||||
|
||||
cluster.labels['auto_healing_enabled'] = False
|
||||
cluster.labels['auto_healing_controller'] = 'magnum-auto-healer'
|
||||
self.k8s_monitor._is_magnum_auto_healer_running()
|
||||
self.assertFalse(self.k8s_monitor._is_magnum_auto_healer_running())
|
||||
|
||||
cluster.labels['auto_healing_enabled'] = True
|
||||
cluster.labels['auto_healing_controller'] = 'draino'
|
||||
self.k8s_monitor._is_magnum_auto_healer_running()
|
||||
self.assertFalse(self.k8s_monitor._is_magnum_auto_healer_running())
|
||||
|
||||
cluster.labels = {}
|
||||
self.k8s_monitor._is_magnum_auto_healer_running()
|
||||
self.assertFalse(self.k8s_monitor._is_magnum_auto_healer_running())
|
||||
|
|
|
@ -19,6 +19,7 @@ import mock
|
|||
|
||||
from magnum.conductor import api as conductor_rpcapi
|
||||
from magnum import objects
|
||||
from magnum.objects.fields import ClusterHealthStatus
|
||||
from magnum.tests.unit.db import base
|
||||
from magnum.tests.unit.db import utils as dbutils
|
||||
|
||||
|
@ -99,7 +100,9 @@ class RPCAPITestCase(base.DbTestCase):
|
|||
'call',
|
||||
version='1.1',
|
||||
cluster=self.fake_cluster['name'],
|
||||
node_count=2)
|
||||
node_count=2,
|
||||
health_status=ClusterHealthStatus.UNKNOWN,
|
||||
health_status_reason={})
|
||||
|
||||
def test_ping_conductor(self):
|
||||
self._test_rpcapi('ping_conductor',
|
||||
|
|
|
@ -116,7 +116,7 @@ def get_test_cluster(**kw):
|
|||
if attr in kw:
|
||||
attrs[attr] = kw[attr]
|
||||
# Required only in PeriodicTestCase, may break other tests
|
||||
for attr in ['keypair', 'health_status']:
|
||||
for attr in ['keypair', 'health_status', 'health_status_reason']:
|
||||
if attr in kw:
|
||||
attrs[attr] = kw[attr]
|
||||
|
||||
|
|
|
@ -0,0 +1,10 @@
|
|||
---
|
||||
features:
|
||||
- |
|
||||
The original design of k8s cluster health status is allowing
|
||||
the health status being updated by Magnum control plane. However,
|
||||
it doesn't work when the cluster is private. Now Magnum supports
|
||||
updating the k8s cluster health status via the Magnum cluster
|
||||
update API so that a controller (e.g. magnum-auto-healer) running
|
||||
inside the k8s cluster can call the Magnum update API to update
|
||||
the cluster health status.
|
Loading…
Reference in New Issue