Add liveness checks to Kuryr Controller

This patch adds liveness checks for watcher and handlers, without passing the
manager's reference to modules that probably should not be aware of it.

Related-Bug: #1705429
Change-Id: I0192756c556b13f98302a57acedce269c278e260
This commit is contained in:
Maysa Macedo 2018-01-19 00:31:25 +00:00
parent d614d04caf
commit c00897c02e
15 changed files with 199 additions and 44 deletions

View File

@ -439,10 +439,15 @@ spec:
subPath: kuryr.conf
readinessProbe:
httpGet:
path: /healthz
path: /ready
port: ${health_server_port}
scheme: HTTP
timeoutSeconds: 5
livenessProbe:
httpGet:
path: /alive
port: ${health_server_port}
initialDelaySeconds: 15
EOF
cat >> "${output_dir}/controller_deployment.yml" << EOF

View File

@ -23,8 +23,8 @@ The purpose of this document is to present the design decision behind
Kuryr Kubernetes Health Manager.
The main purpose of the Health Manager is to perform Health verifications
that assures Kuryr Controller readiness and so improve the management that
Kubernetes does on Kuryr Controller pod.
that assure Kuryr Controller readiness and liveness, and so improve the
management that Kubernetes performs on the Kuryr Controller pod.
Overview
--------
@ -34,19 +34,19 @@ unable to connect with services it depends on and they being not healthy.
It is important to check health of these services so that Kubernetes and
its users know when Kuryr Controller is ready to perform its networking
tasks. To provide this functionality, Health Manager will verify and serve
the health state of these services to the probe.
tasks. Also, it is necessary to check the health state of Kuryr components in
order to assure Kuryr Controller service is alive. To provide these
functionalities, Health Manager will verify and serve the health state of
these services and components to the probe.
Proposed Solution
-----------------
The Health Manager will provide an endpoint that will check whether it is
One of the endpoints provided by the Health Manager will check whether it is
able to watch the Kubernetes API, authenticate with Keystone and talk to
Neutron, since these are services needed by Kuryr Controller. These checks
will assure the Controller readiness.
will assure the Controller readiness. The other endpoint will verify
the health state of Kuryr components and guarantee Controller liveness.
The idea behind the Manager is to combine all the necessary checks in a
server running inside Kuryr Controller pod and provide the checks result
to the probe.
This design focuses on providing health checks for readiness probe, but
another endpoint can be created for liveness probes.

View File

@ -40,6 +40,7 @@ class LBaaSSpecHandler(k8s_base.ResourceEventHandler):
OBJECT_KIND = k_const.K8S_OBJ_SERVICE
def __init__(self):
super(LBaaSSpecHandler, self).__init__()
self._drv_project = drv_base.ServiceProjectDriver.get_instance()
self._drv_subnets = drv_base.ServiceSubnetsDriver.get_instance()
self._drv_sg = drv_base.ServiceSecurityGroupsDriver.get_instance()
@ -219,6 +220,7 @@ class LoadBalancerHandler(k8s_base.ResourceEventHandler):
OBJECT_KIND = k_const.K8S_OBJ_ENDPOINTS
def __init__(self):
super(LoadBalancerHandler, self).__init__()
self._drv_lbaas = drv_base.LBaaSDriver.get_instance()
self._drv_pod_project = drv_base.PodProjectDriver.get_instance()
self._drv_pod_subnets = drv_base.PodSubnetsDriver.get_instance()

View File

@ -40,6 +40,7 @@ class VIFHandler(k8s_base.ResourceEventHandler):
OBJECT_KIND = constants.K8S_OBJ_POD
def __init__(self):
super(VIFHandler, self).__init__()
self._drv_project = drivers.PodProjectDriver.get_instance()
self._drv_subnets = drivers.PodSubnetsDriver.get_instance()
self._drv_sg = drivers.PodSecurityGroupsDriver.get_instance()

View File

@ -12,13 +12,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import eventlet
from flask import Flask
from keystoneauth1 import exceptions as k_exc
from keystoneclient import client as keystone_client
from kuryr.lib._i18n import _
from kuryr.lib import config as kuryr_config
from kuryr.lib import utils
from kuryr_kubernetes.handlers import health as h_health
import os
from oslo_config import cfg
from oslo_log import log as logging
@ -38,16 +38,25 @@ CONF.register_opts(health_server_opts, "health_server")
class HealthServer(object):
"""Proxy server used by readiness and liveness probes to manage health checks.
Allows to verify connectivity with Kubernetes API, Keystone and Neutron.
If pool ports functionality is enabled it is verified whether
the precreated ports are loaded into the pools. Also, checks handlers
states.
"""
def __init__(self):
self.ctx = None
self._registry = h_health.HealthRegister.get_instance().registry
self.application = Flask('health-daemon')
self.application.add_url_rule(
'/healthz', methods=['GET'], view_func=self.read)
'/ready', methods=['GET'], view_func=self.readiness_status)
self.application.add_url_rule(
'/alive', methods=['GET'], view_func=self.liveness_status)
self.headers = {'Connection': 'close'}
def read(self):
def readiness_status(self):
data = 'ok'
if CONF.kubernetes.vif_pool_driver != 'noop':
@ -81,6 +90,15 @@ class HealthServer(object):
LOG.info('Kuryr Controller readiness verified.')
return data, httplib.OK, self.headers
def liveness_status(self):
    """Answer the liveness endpoint from the registered components' health.

    :returns: a ``(body, status, headers)`` tuple: ``'ok'`` with ``OK`` when
              every component in the registry reports healthy, otherwise
              an empty body with ``INTERNAL_SERVER_ERROR``.
    """
    # all() stops at the first component that reports unhealthy, so no
    # further components are queried once a failure is found.
    if all(member.is_healthy() for member in self._registry):
        LOG.debug('Kuryr Controller Liveness verified.')
        return 'ok', httplib.OK, self.headers

    LOG.debug('Kuryr Controller not healthy.')
    return '', httplib.INTERNAL_SERVER_ERROR, self.headers
def run(self):
address = ''
try:
@ -109,19 +127,3 @@ class HealthServer(object):
def verify_neutron_connection(self):
neutron = utils.get_neutron_client()
neutron.list_extensions()
class ReadinessChecker(object):
"""Proxy server used by readiness probe to manage health checks.
Allows to verify connectivity with Kubernetes API, Keystone and Neutron.
Also, if pool ports functionality is enabled it is verified whether
the precreated ports are loaded into the pools.
"""
def __init__(self):
eventlet.spawn(self._start_readiness_checker_daemon)
def _start_readiness_checker_daemon(self):
server = HealthServer()
server.run()

View File

@ -41,6 +41,7 @@ class KuryrK8sService(service.Service):
objects.register_locally_defined_vifs()
pipeline = h_pipeline.ControllerPipeline(self.tg)
self.watcher = watcher.Watcher(pipeline, self.tg)
self.health_manager = health.HealthServer()
# TODO(ivc): pluggable resource/handler registration
for resource in ["pods", "services", "endpoints"]:
self.watcher.add("%s/%s" % (constants.K8S_API_BASE, resource))
@ -50,9 +51,9 @@ class KuryrK8sService(service.Service):
def start(self):
LOG.info("Service '%s' starting", self.__class__.__name__)
health.ReadinessChecker()
super(KuryrK8sService, self).start()
self.watcher.start()
self.health_manager.run()
LOG.info("Service '%s' started", self.__class__.__name__)
def wait(self):

View File

@ -72,6 +72,9 @@ class EventConsumer(h_base.EventHandler):
registered by the `EventPipeline`.
"""
def __init__(self):
super(EventConsumer, self).__init__()
@abc.abstractproperty
def consumes(self):
"""Predicates determining events supported by this handler.

View File

@ -0,0 +1,44 @@
# Copyright 2018 Maysa de Macedo Souza.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
class HealthRegister(object):
    """Shared register of components that report a health state.

    Components are added through `register`; `get_instance` hands out the
    single shared register, creating it on first use.
    """

    # Lazily created shared register; None until `get_instance` is first
    # called.
    instance = None

    def __init__(self):
        # Registered components, kept in registration order.
        self.registry = []

    def register(self, elem):
        """Append `elem` to the list of registered components.

        :param elem: component to add to the registry
        """
        self.registry.append(elem)

    @classmethod
    def get_instance(cls):
        """Return the shared register, creating it on the first call.

        :returns: the instance stored on `HealthRegister.instance`
        """
        # Compare against None explicitly: a truthiness test would treat a
        # falsy register (e.g. a subclass defining __len__ while its
        # registry is empty) as missing and silently replace it.
        if HealthRegister.instance is None:
            HealthRegister.instance = cls()
        return HealthRegister.instance
class HealthHandler(object):
    """Base class for components that expose a health flag.

    Each instance starts out healthy and adds itself to the shared
    `HealthRegister` while it is being constructed.
    """

    def __init__(self):
        super(HealthHandler, self).__init__()
        self._healthy = True
        # Hold on to the shared register and enlist this component in it.
        shared_register = HealthRegister.get_instance()
        self._manager = shared_register
        shared_register.register(self)

    def set_health_status(self, healthy):
        """Record the component's current health state.

        :param healthy: the new health state to store
        """
        self._healthy = healthy

    def is_healthy(self):
        """Return the most recently recorded health state."""
        return self._healthy

View File

@ -14,6 +14,7 @@
# under the License.
from kuryr_kubernetes.handlers import dispatch
from kuryr_kubernetes.handlers import health
def object_kind(event):
@ -30,7 +31,7 @@ def object_link(event):
return None
class ResourceEventHandler(dispatch.EventConsumer):
class ResourceEventHandler(dispatch.EventConsumer, health.HealthHandler):
"""Base class for K8s event handlers.
Implementing classes should override the `OBJECT_KIND` attribute with a
@ -48,6 +49,9 @@ class ResourceEventHandler(dispatch.EventConsumer):
OBJECT_KIND = None
def __init__(self):
super(ResourceEventHandler, self).__init__()
@property
def consumes(self):
return {object_kind: self.OBJECT_KIND}

View File

@ -64,6 +64,12 @@ class Retry(base.EventHandler):
with excutils.save_and_reraise_exception() as ex:
if self._sleep(deadline, attempt, ex.value):
ex.reraise = False
else:
LOG.debug('Report handler unhealthy %s', self._handler)
self._handler.set_health_status(healthy=False)
except Exception:
LOG.debug('Report handler unhealthy %s', self._handler)
self._handler.set_health_status(healthy=False)
def _sleep(self, deadline, attempt, exception):
now = time.time()

View File

@ -14,11 +14,17 @@
from keystoneauth1 import exceptions
from kuryr_kubernetes.controller.managers import health
from kuryr_kubernetes.handlers import health as h_health
from kuryr_kubernetes.tests import base
import mock
from oslo_config import cfg as oslo_cfg
class _TestHandler(h_health.HealthHandler):
    """Minimal HealthHandler subclass used as a registry entry in tests."""

    def is_healthy(self):
        # Placeholder override that yields None; the liveness tests patch
        # this method to supply the health state they need.
        return None
class TestHealthServer(base.TestCase):
def setUp(self):
@ -38,7 +44,7 @@ class TestHealthServer(base.TestCase):
m_verify_neutron_conn, m_exist):
m_verify_k8s_conn.return_value = True, 200
m_exist.return_value = True
resp = self.test_client.get('/healthz')
resp = self.test_client.get('/ready')
m_verify_k8s_conn.assert_called_once()
m_verify_keystone_conn.assert_called_once()
m_verify_neutron_conn.assert_called_once_with()
@ -51,7 +57,7 @@ class TestHealthServer(base.TestCase):
m_exist.return_value = False
oslo_cfg.CONF.set_override('vif_pool_driver', 'neutron',
group='kubernetes')
resp = self.test_client.get('/healthz')
resp = self.test_client.get('/ready')
self.assertEqual(404, resp.status_code)
@mock.patch('kuryr_kubernetes.controller.managers.health.HealthServer.'
@ -60,7 +66,7 @@ class TestHealthServer(base.TestCase):
def test_read_k8s_error(self, m_exist, m_verify_k8s_conn):
m_exist.return_value = True
m_verify_k8s_conn.return_value = False, 503
resp = self.test_client.get('/healthz')
resp = self.test_client.get('/ready')
m_verify_k8s_conn.assert_called_once()
self.assertEqual(503, resp.status_code)
@ -75,7 +81,7 @@ class TestHealthServer(base.TestCase):
m_exist.return_value = True
m_verify_k8s_conn.return_value = True, 200
m_verify_keystone_conn.side_effect = exceptions.http.Unauthorized
resp = self.test_client.get('/healthz')
resp = self.test_client.get('/ready')
m_verify_keystone_conn.assert_called_once()
self.assertEqual(401, resp.status_code)
@ -92,7 +98,26 @@ class TestHealthServer(base.TestCase):
m_exist.return_value = True
m_verify_k8s_conn.return_value = True, 200
m_verify_neutron_conn.side_effect = Exception
resp = self.test_client.get('/healthz')
resp = self.test_client.get('/ready')
m_verify_neutron_conn.assert_called_once()
self.assertEqual(500, resp.status_code)
@mock.patch.object(_TestHandler, 'is_healthy')
def test_liveness(self, m_status):
    # A single healthy component in the registry must yield 200 on /alive.
    healthy_component = _TestHandler()
    m_status.return_value = True
    self.srv._registry = [healthy_component]

    response = self.test_client.get('/alive')

    m_status.assert_called_once()
    self.assertEqual(200, response.status_code)
@mock.patch.object(_TestHandler, 'is_healthy')
def test_liveness_error(self, m_status):
    # A single unhealthy component in the registry must yield 500 on /alive.
    unhealthy_component = _TestHandler()
    m_status.return_value = False
    self.srv._registry = [unhealthy_component]

    response = self.test_client.get('/alive')

    m_status.assert_called_once()
    self.assertEqual(500, response.status_code)

View File

@ -0,0 +1,47 @@
# Copyright 2018 Maysa de Macedo Souza.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from kuryr_kubernetes.handlers import health as h_health
from kuryr_kubernetes.tests import base as test_base
import mock
class _TestHandler(h_health.HealthHandler):
    """Minimal HealthHandler subclass instantiated by the tests below."""

    def is_healthy(self):
        # Placeholder override: yields None rather than the stored flag.
        return None
class TestHealthRegister(test_base.TestCase):

    def test_register(self):
        # A registered component must appear in the register's list.
        m_component = mock.Mock()
        health_register = h_health.HealthRegister()

        health_register.register(m_component)

        # Expected value goes first so a failure message labels the
        # expected and observed values correctly.
        self.assertEqual([m_component], health_register.registry)
class TestHealthHandler(test_base.TestCase):

    @mock.patch.object(h_health.HealthRegister, 'get_instance')
    def test_init(self, m_health_register):
        # Constructing a handler must leave it healthy and enlist it in
        # the register returned by HealthRegister.get_instance().
        register_double = mock.Mock(spec=h_health.HealthRegister)
        m_health_register.return_value = register_double

        health_handler = _TestHandler()

        self.assertTrue(health_handler._healthy)
        register_double.register.assert_called_once_with(health_handler)
        self.assertEqual(register_double, health_handler._manager)

View File

@ -137,14 +137,15 @@ class TestRetryHandler(test_base.TestCase):
@mock.patch('itertools.count')
@mock.patch.object(h_retry.Retry, '_sleep')
def test_call_raises_no_retry(self, m_sleep, m_count):
def test_call_should_not_raise(self, m_sleep, m_count):
event = mock.sentinel.event
m_handler = mock.Mock()
m_handler.side_effect = _EX1()
m_count.return_value = list(range(1, 5))
retry = h_retry.Retry(m_handler, exceptions=(_EX11, _EX2))
self.assertRaises(_EX1, retry, event)
retry(event)
m_handler.assert_called_once_with(event)
m_handler.assert_called_with(event)
m_handler.set_health_status.assert_called_with(healthy=False)
m_sleep.assert_not_called()

View File

@ -19,6 +19,7 @@ import mock
from kuryr_kubernetes.tests import base as test_base
from kuryr_kubernetes.tests.unit import kuryr_fixtures as kuryr_fixtures
from kuryr_kubernetes import watcher
from requests import exceptions
class TestWatcher(test_base.TestCase):
@ -289,3 +290,14 @@ class TestWatcher(test_base.TestCase):
m_handler.assert_called_once_with(events[0])
self.assertNotIn(path, watcher_obj._idle)
self.assertNotIn(path, watcher_obj._watching)
def test_watch_client_request_failed(self):
    # A watch request that fails must leave the watcher marked unhealthy.
    path = '/test'
    m_handler = mock.Mock()
    watcher_obj = self._test_watch_create_watcher(path, m_handler)
    # Arm the failure BEFORE watching: a side_effect configured after
    # _watch() has returned is never raised by the call under test, so
    # the scenario named by this test would not actually be exercised.
    self.client.watch.side_effect = exceptions.ChunkedEncodingError(
        "Connection Broken")

    watcher_obj._watch(path)

    self.client.watch.assert_called_once()
    self.assertFalse(watcher_obj._healthy)

View File

@ -13,14 +13,14 @@
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import log as logging
from kuryr_kubernetes import clients
from kuryr_kubernetes.handlers import health
from oslo_log import log as logging
LOG = logging.getLogger(__name__)
class Watcher(object):
class Watcher(health.HealthHandler):
"""Observes K8s resources' events using K8s '?watch=true' API.
The `Watcher` maintains a list of K8s resources and manages the event
@ -65,11 +65,11 @@ class Watcher(object):
specified, the `Watcher` will operate in a
synchronous mode.
"""
super(Watcher, self).__init__()
self._client = clients.get_kubernetes_client()
self._handler = handler
self._thread_group = thread_group
self._running = False
self._resources = set()
self._watching = {}
self._idle = {}
@ -141,6 +141,8 @@ class Watcher(object):
self._idle[path] = True
if not (self._running and path in self._resources):
return
except Exception:
self._healthy = False
finally:
self._watching.pop(path)
self._idle.pop(path)