Add readiness and liveness checks to CNI.

This patch adds readiness and liveness to CNI. It checks presence
of NET_ADMIN capabilities, IPDB in working order, connection to
Kubernetes API, quantity of CNI add failures, health of CNI
components and existence of memory leaks.

Implements: blueprint cni-daemon-readiness-liveness
Change-Id: I9a4b871d196dbadfed687df93bb3cad97c957bfb
This commit is contained in:
Maysa Macedo 2018-01-25 02:05:44 +00:00
parent 519391fac0
commit 1e4b7f1109
16 changed files with 441 additions and 51 deletions

View File

@ -461,8 +461,10 @@ EOF
function generate_cni_daemon_set() { function generate_cni_daemon_set() {
output_dir=$1 output_dir=$1
cni_bin_dir=${2:-/opt/cni/bin} cni_health_server_port=$2
cni_conf_dir=${3:-/etc/cni/net.d} cni_daemon=${3:-False}
cni_bin_dir=${4:-/opt/cni/bin}
cni_conf_dir=${5:-/etc/cni/net.d}
mkdir -p "$output_dir" mkdir -p "$output_dir"
rm -f ${output_dir}/cni_ds.yml rm -f ${output_dir}/cni_ds.yml
cat >> "${output_dir}/cni_ds.yml" << EOF cat >> "${output_dir}/cni_ds.yml" << EOF
@ -513,6 +515,24 @@ spec:
mountPath: /host_proc mountPath: /host_proc
- name: openvswitch - name: openvswitch
mountPath: /var/run/openvswitch mountPath: /var/run/openvswitch
EOF
if [ "$cni_daemon" == "True" ]; then
cat >> "${output_dir}/cni_ds.yml" << EOF
readinessProbe:
httpGet:
path: /ready
port: ${cni_health_server_port}
scheme: HTTP
initialDelaySeconds: 15
timeoutSeconds: 5
livenessProbe:
httpGet:
path: /alive
port: ${cni_health_server_port}
initialDelaySeconds: 15
EOF
fi
cat >> "${output_dir}/cni_ds.yml" << EOF
volumes: volumes:
- name: bin - name: bin
hostPath: hostPath:

View File

@ -121,6 +121,9 @@ function configure_kuryr {
} }
function generate_containerized_kuryr_resources { function generate_containerized_kuryr_resources {
local cni_daemon
cni_daemon=$1
# Containerized deployment will use tokens provided by k8s itself. # Containerized deployment will use tokens provided by k8s itself.
inicomment "$KURYR_CONFIG" kubernetes ssl_client_crt_file inicomment "$KURYR_CONFIG" kubernetes ssl_client_crt_file
inicomment "$KURYR_CONFIG" kubernetes ssl_client_key_file inicomment "$KURYR_CONFIG" kubernetes ssl_client_key_file
@ -138,7 +141,7 @@ function generate_containerized_kuryr_resources {
generate_kuryr_configmap $output_dir $KURYR_CONFIG $KURYR_CNI_CONFIG generate_kuryr_configmap $output_dir $KURYR_CONFIG $KURYR_CNI_CONFIG
generate_kuryr_service_account $output_dir generate_kuryr_service_account $output_dir
generate_controller_deployment $output_dir $KURYR_HEALTH_SERVER_PORT generate_controller_deployment $output_dir $KURYR_HEALTH_SERVER_PORT
generate_cni_daemon_set $output_dir $CNI_BIN_DIR $CNI_CONF_DIR generate_cni_daemon_set $output_dir $KURYR_CNI_HEALTH_SERVER_PORT $cni_daemon $CNI_BIN_DIR $CNI_CONF_DIR
} }
function run_containerized_kuryr_resources { function run_containerized_kuryr_resources {
@ -712,10 +715,11 @@ if [[ "$1" == "stack" && "$2" == "extra" ]]; then
else else
if is_service_enabled kuryr-daemon; then if is_service_enabled kuryr-daemon; then
build_kuryr_containers $CNI_BIN_DIR $CNI_CONF_DIR True build_kuryr_containers $CNI_BIN_DIR $CNI_CONF_DIR True
generate_containerized_kuryr_resources True
else else
build_kuryr_containers $CNI_BIN_DIR $CNI_CONF_DIR False build_kuryr_containers $CNI_BIN_DIR $CNI_CONF_DIR False
generate_containerized_kuryr_resources False
fi fi
generate_containerized_kuryr_resources
run_containerized_kuryr_resources run_containerized_kuryr_resources
fi fi
fi fi

View File

@ -80,3 +80,6 @@ KURYR_HEALTH_SERVER_PORT=${KURYR_HEALTH_SERVER_PORT:-8082}
# OVS HOST PATH # OVS HOST PATH
OVS_HOST_PATH=${OVS_HOST_PATH:-/var/run/openvswitch} OVS_HOST_PATH=${OVS_HOST_PATH:-/var/run/openvswitch}
# Health Server
KURYR_CNI_HEALTH_SERVER_PORT=${KURYR_CNI_HEALTH_SERVER_PORT:-8090}

View File

@ -20,11 +20,11 @@ Kuryr Kubernetes Health Manager Design
Purpose Purpose
------- -------
The purpose of this document is to present the design decision behind The purpose of this document is to present the design decision behind
Kuryr Kubernetes Health Manager. Kuryr Kubernetes Health Managers.
The main purpose of the Health Manager is to perform Health verifications The main purpose of the Health Managers is to perform Health verifications that
that assures Kuryr Controller readiness and liveness, and so improve the assures readiness and liveness to Kuryr Controller and CNI pod, and so improve
management that Kubernetes does on Kuryr Controller pod. the management that Kubernetes does on Kuryr-Kubernetes pods.
Overview Overview
-------- --------
@ -36,17 +36,43 @@ It is important to check health of these services so that Kubernetes and
its users know when Kuryr Controller it is ready to perform its networking its users know when Kuryr Controller it is ready to perform its networking
tasks. Also, it is necessary to check the health state of Kuryr components in tasks. Also, it is necessary to check the health state of Kuryr components in
order to assure Kuryr Controller service is alive. To provide these order to assure Kuryr Controller service is alive. To provide these
functionalities, Health Manager will verify and serve the health state of functionalities, Controller's Health Manager will verify and serve the health
these services and components to the probe. state of these services and components to the probes.
Besides these problems on the Controller, Kuryr CNI daemon also might get to a
broken state as a result of its components being not healthy and necessary
configurations not present. It is essencial that CNI components health and
configurations are properly verified to assure CNI daemon is in a good shape.
On this way, the CNI Health Manager will check and serve the health state to
Kubenetes readiness and liveness probes.
Proposed Solution Proposed Solution
----------------- -----------------
One of the endpoints provided by The Health Manager will check whether it is One of the endpoints provided by the Controller Health Manager will check
able to watch the Kubernetes API, authenticate with Keystone and talk to whether it is able to watch the Kubernetes API, authenticate with Keystone
Neutron, since these are services needed by Kuryr Controller. These checks and talk to Neutron, since these are services needed by Kuryr Controller.
will assure the Controller readiness. The other endpoint, will verify These checks will assure the Controller readiness. The other endpoint, will
the health state of Kuryr components and guarantee Controller liveness. verify the health state of Kuryr components and guarantee Controller liveness.
The idea behind the Manager is to combine all the necessary checks in a The CNI Health Manager also provides two endpoins to Kubernetes probes.
server running inside Kuryr Controller pod and provide the checks result The endpoint that provides readiness state to the probe checks connection
to the probe. to Kubernetes API and presence of NET_ADMIN capabilities. The other endpoint,
which provides liveness, validates whether IPDB is in working order, maximum
CNI ADD failure is reached, health of CNI components and existence of memory
leak.
.. note::
The CNI Health Manager will be started with the check for memory leak disabled.
In order to enable, set the following option in kuryr.conf to a limit value
of memory in MiBs.
[cni_health_server]
max_memory_usage = -1
The CNI Health Manager is added as a process to CNI daemon and communicates
to the other two processes i.e. Watcher and Server with a shared boolean object,
which indicates the current health state of each component.
The idea behind these two Managers is to combine all the necessary checks in
servers running inside Kuryr Controller and CNI pods to provide the checks result
to the probes.

View File

@ -75,14 +75,18 @@ def _configure_l3(vif, ifname, netns):
dst='default').commit() dst='default').commit()
def connect(vif, instance_info, ifname, netns=None): def connect(vif, instance_info, ifname, netns=None, report_health=None):
driver = _get_binding_driver(vif) driver = _get_binding_driver(vif)
if report_health:
report_health(driver.is_healthy())
os_vif.plug(vif, instance_info) os_vif.plug(vif, instance_info)
driver.connect(vif, ifname, netns) driver.connect(vif, ifname, netns)
_configure_l3(vif, ifname, netns) _configure_l3(vif, ifname, netns)
def disconnect(vif, instance_info, ifname, netns=None): def disconnect(vif, instance_info, ifname, netns=None, report_health=None):
driver = _get_binding_driver(vif) driver = _get_binding_driver(vif)
if report_health:
report_health(driver.is_healthy())
driver.disconnect(vif, ifname, netns) driver.disconnect(vif, ifname, netns)
os_vif.unplug(vif, instance_info) os_vif.unplug(vif, instance_info)

View File

@ -12,18 +12,23 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import os import os
from oslo_config import cfg
from oslo_log import log from oslo_log import log
from kuryr_kubernetes.cni.binding import base as b_base from kuryr_kubernetes.cni.binding import base as b_base
from kuryr_kubernetes.handlers import health
from kuryr_kubernetes import linux_net_utils as net_utils from kuryr_kubernetes import linux_net_utils as net_utils
LOG = log.getLogger(__name__) LOG = log.getLogger(__name__)
CONF = cfg.CONF
class BaseBridgeDriver(object): class BaseBridgeDriver(health.HealthHandler):
def __init__(self):
super(BaseBridgeDriver, self).__init__()
def connect(self, vif, ifname, netns): def connect(self, vif, ifname, netns):
host_ifname = vif.vif_name host_ifname = vif.vif_name
@ -59,6 +64,9 @@ class BaseBridgeDriver(object):
class BridgeDriver(BaseBridgeDriver): class BridgeDriver(BaseBridgeDriver):
def __init__(self):
super(BridgeDriver, self).__init__()
def connect(self, vif, ifname, netns): def connect(self, vif, ifname, netns):
super(BridgeDriver, self).connect(vif, ifname, netns) super(BridgeDriver, self).connect(vif, ifname, netns)
host_ifname = vif.vif_name host_ifname = vif.vif_name
@ -75,6 +83,10 @@ class BridgeDriver(BaseBridgeDriver):
class VIFOpenVSwitchDriver(BaseBridgeDriver): class VIFOpenVSwitchDriver(BaseBridgeDriver):
def __init__(self):
super(VIFOpenVSwitchDriver, self).__init__()
def connect(self, vif, ifname, netns): def connect(self, vif, ifname, netns):
super(VIFOpenVSwitchDriver, self).connect(vif, ifname, netns) super(VIFOpenVSwitchDriver, self).connect(vif, ifname, netns)
# FIXME(irenab) use pod_id (neutron port device_id) # FIXME(irenab) use pod_id (neutron port device_id)
@ -86,3 +98,13 @@ class VIFOpenVSwitchDriver(BaseBridgeDriver):
def disconnect(self, vif, ifname, netns): def disconnect(self, vif, ifname, netns):
super(VIFOpenVSwitchDriver, self).disconnect(vif, ifname, netns) super(VIFOpenVSwitchDriver, self).disconnect(vif, ifname, netns)
net_utils.delete_ovs_vif_port(vif.bridge_name, vif.vif_name) net_utils.delete_ovs_vif_port(vif.bridge_name, vif.vif_name)
def is_healthy(self):
bridge_name = CONF.neutron_defaults.ovs_bridge
try:
with b_base.get_ipdb() as h_ipdb:
h_ipdb.interfaces[bridge_name]
return True
except Exception:
LOG.debug("Reporting Driver not healthy.")
return False

View File

@ -17,6 +17,7 @@ import six
from kuryr_kubernetes.cni.binding import base as b_base from kuryr_kubernetes.cni.binding import base as b_base
from kuryr_kubernetes import config from kuryr_kubernetes import config
from kuryr_kubernetes.handlers import health
from kuryr_kubernetes import utils from kuryr_kubernetes import utils
VLAN_KIND = 'vlan' VLAN_KIND = 'vlan'
@ -25,7 +26,10 @@ MACVLAN_MODE_BRIDGE = 'bridge'
@six.add_metaclass(abc.ABCMeta) @six.add_metaclass(abc.ABCMeta)
class NestedDriver(object): class NestedDriver(health.HealthHandler):
def __init__(self):
super(NestedDriver, self).__init__()
@abc.abstractmethod @abc.abstractmethod
def _get_iface_create_args(self, vif): def _get_iface_create_args(self, vif):
@ -65,10 +69,18 @@ class NestedDriver(object):
class VlanDriver(NestedDriver): class VlanDriver(NestedDriver):
def __init__(self):
super(VlanDriver, self).__init__()
def _get_iface_create_args(self, vif): def _get_iface_create_args(self, vif):
return {'kind': VLAN_KIND, 'vlan_id': vif.vlan_id} return {'kind': VLAN_KIND, 'vlan_id': vif.vlan_id}
class MacvlanDriver(NestedDriver): class MacvlanDriver(NestedDriver):
def __init__(self):
super(MacvlanDriver, self).__init__()
def _get_iface_create_args(self, vif): def _get_iface_create_args(self, vif):
return {'kind': MACVLAN_KIND, 'macvlan_mode': MACVLAN_MODE_BRIDGE} return {'kind': MACVLAN_KIND, 'macvlan_mode': MACVLAN_MODE_BRIDGE}

View File

@ -12,11 +12,14 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
from ctypes import c_bool
import multiprocessing import multiprocessing
import os import os
from six.moves import http_client as httplib from six.moves import http_client as httplib
import socket import socket
import sys import sys
import threading
import time
import cotyledon import cotyledon
import flask import flask
@ -35,6 +38,7 @@ from kuryr_kubernetes import clients
from kuryr_kubernetes.cni import api from kuryr_kubernetes.cni import api
from kuryr_kubernetes.cni.binding import base as b_base from kuryr_kubernetes.cni.binding import base as b_base
from kuryr_kubernetes.cni import handlers as h_cni from kuryr_kubernetes.cni import handlers as h_cni
from kuryr_kubernetes.cni import health
from kuryr_kubernetes.cni import utils from kuryr_kubernetes.cni import utils
from kuryr_kubernetes import config from kuryr_kubernetes import config
from kuryr_kubernetes import constants as k_const from kuryr_kubernetes import constants as k_const
@ -45,6 +49,7 @@ from kuryr_kubernetes import watcher as k_watcher
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
CONF = cfg.CONF CONF = cfg.CONF
RETRY_DELAY = 1000 # 1 second in milliseconds RETRY_DELAY = 1000 # 1 second in milliseconds
HEALTH_CHECKER_DELAY = 5
# TODO(dulek): Another corner case is (and was) when pod is deleted before it's # TODO(dulek): Another corner case is (and was) when pod is deleted before it's
# annotated by controller or even noticed by any watcher. Kubelet # annotated by controller or even noticed by any watcher. Kubelet
@ -55,7 +60,8 @@ RETRY_DELAY = 1000 # 1 second in milliseconds
class K8sCNIRegistryPlugin(api.CNIPlugin): class K8sCNIRegistryPlugin(api.CNIPlugin):
def __init__(self, registry): def __init__(self, registry, healthy):
self.healthy = healthy
self.registry = registry self.registry = registry
def _get_name(self, pod): def _get_name(self, pod):
@ -108,6 +114,12 @@ class K8sCNIRegistryPlugin(api.CNIPlugin):
pass pass
self._do_work(params, b_base.disconnect) self._do_work(params, b_base.disconnect)
def report_drivers_health(self, driver_healthy):
if not driver_healthy:
with self.healthy.get_lock():
LOG.debug("Reporting CNI driver not healthy.")
self.healthy.value = driver_healthy
def _do_work(self, params, fn): def _do_work(self, params, fn):
pod_name = params.args.K8S_POD_NAME pod_name = params.args.K8S_POD_NAME
@ -126,7 +138,8 @@ class K8sCNIRegistryPlugin(api.CNIPlugin):
except KeyError: except KeyError:
raise exceptions.ResourceNotReady(pod_name) raise exceptions.ResourceNotReady(pod_name)
fn(vif, self._get_inst(pod), params.CNI_IFNAME, params.CNI_NETNS) fn(vif, self._get_inst(pod), params.CNI_IFNAME, params.CNI_NETNS,
self.report_drivers_health)
return vif return vif
def _get_inst(self, pod): def _get_inst(self, pod):
@ -135,10 +148,11 @@ class K8sCNIRegistryPlugin(api.CNIPlugin):
class DaemonServer(object): class DaemonServer(object):
def __init__(self, plugin): def __init__(self, plugin, healthy):
self.ctx = None self.ctx = None
self.plugin = plugin self.plugin = plugin
self.healthy = healthy
self.failure_count = multiprocessing.Value('i', 0)
self.application = flask.Flask('kuryr-daemon') self.application = flask.Flask('kuryr-daemon')
self.application.add_url_rule( self.application.add_url_rule(
'/addNetwork', methods=['POST'], view_func=self.add) '/addNetwork', methods=['POST'], view_func=self.add)
@ -157,6 +171,7 @@ class DaemonServer(object):
try: try:
params = self._prepare_request() params = self._prepare_request()
except Exception: except Exception:
self._check_failure()
LOG.exception('Exception when reading CNI params.') LOG.exception('Exception when reading CNI params.')
return '', httplib.BAD_REQUEST, self.headers return '', httplib.BAD_REQUEST, self.headers
@ -164,10 +179,12 @@ class DaemonServer(object):
vif = self.plugin.add(params) vif = self.plugin.add(params)
data = jsonutils.dumps(vif.obj_to_primitive()) data = jsonutils.dumps(vif.obj_to_primitive())
except exceptions.ResourceNotReady as e: except exceptions.ResourceNotReady as e:
self._check_failure()
LOG.error("Timed out waiting for requested pod to appear in " LOG.error("Timed out waiting for requested pod to appear in "
"registry: %s.", e) "registry: %s.", e)
return '', httplib.GATEWAY_TIMEOUT, self.headers return '', httplib.GATEWAY_TIMEOUT, self.headers
except Exception: except Exception:
self._check_failure()
LOG.exception('Error when processing addNetwork request. CNI ' LOG.exception('Error when processing addNetwork request. CNI '
'Params: %s', params) 'Params: %s', params)
return '', httplib.INTERNAL_SERVER_ERROR, self.headers return '', httplib.INTERNAL_SERVER_ERROR, self.headers
@ -215,16 +232,26 @@ class DaemonServer(object):
LOG.exception('Failed to start kuryr-daemon.') LOG.exception('Failed to start kuryr-daemon.')
raise raise
def _check_failure(self):
with self.failure_count.get_lock():
if self.failure_count.value < CONF.cni_daemon.cni_failures_count:
self.failure_count.value += 1
else:
with self.healthy.get_lock():
LOG.debug("Reporting maximun CNI ADD failures reached.")
self.healthy.value = False
class CNIDaemonServerService(cotyledon.Service): class CNIDaemonServerService(cotyledon.Service):
name = "server" name = "server"
def __init__(self, worker_id, registry): def __init__(self, worker_id, registry, healthy):
super(CNIDaemonServerService, self).__init__(worker_id) super(CNIDaemonServerService, self).__init__(worker_id)
self.run_queue_reading = False self.run_queue_reading = False
self.registry = registry self.registry = registry
self.plugin = K8sCNIRegistryPlugin(registry) self.healthy = healthy
self.server = DaemonServer(self.plugin) self.plugin = K8sCNIRegistryPlugin(registry, self.healthy)
self.server = DaemonServer(self.plugin, self.healthy)
def run(self): def run(self):
# NOTE(dulek): We might do a *lot* of pyroute2 operations, let's # NOTE(dulek): We might do a *lot* of pyroute2 operations, let's
@ -239,11 +266,13 @@ class CNIDaemonServerService(cotyledon.Service):
class CNIDaemonWatcherService(cotyledon.Service): class CNIDaemonWatcherService(cotyledon.Service):
name = "watcher" name = "watcher"
def __init__(self, worker_id, registry): def __init__(self, worker_id, registry, healthy):
super(CNIDaemonWatcherService, self).__init__(worker_id) super(CNIDaemonWatcherService, self).__init__(worker_id)
self.pipeline = None self.pipeline = None
self.watcher = None self.watcher = None
self.health_thread = None
self.registry = registry self.registry = registry
self.healthy = healthy
def _get_nodename(self): def _get_nodename(self):
# NOTE(dulek): At first try to get it using environment variable, # NOTE(dulek): At first try to get it using environment variable,
@ -262,8 +291,20 @@ class CNIDaemonWatcherService(cotyledon.Service):
"%(base)s/pods?fieldSelector=spec.nodeName=%(node_name)s" % { "%(base)s/pods?fieldSelector=spec.nodeName=%(node_name)s" % {
'base': k_const.K8S_API_BASE, 'base': k_const.K8S_API_BASE,
'node_name': self._get_nodename()}) 'node_name': self._get_nodename()})
self.is_running = True
self.health_thread = threading.Thread(
target=self._start_watcher_health_checker)
self.health_thread.start()
self.watcher.start() self.watcher.start()
def _start_watcher_health_checker(self):
while self.is_running:
if not self.watcher.is_healthy():
LOG.debug("Reporting watcher not healthy.")
with self.healthy.get_lock():
self.healthy.value = False
time.sleep(HEALTH_CHECKER_DELAY)
def on_done(self, pod, vif): def on_done(self, pod, vif):
pod_name = pod['metadata']['name'] pod_name = pod['metadata']['name']
vif_dict = vif.obj_to_primitive() vif_dict = vif.obj_to_primitive()
@ -284,10 +325,24 @@ class CNIDaemonWatcherService(cotyledon.Service):
self.registry[pod_name] = pod_dict self.registry[pod_name] = pod_dict
def terminate(self): def terminate(self):
self.is_running = False
if self.health_thread:
self.health_thread.join()
if self.watcher: if self.watcher:
self.watcher.stop() self.watcher.stop()
class CNIDaemonHealthServerService(cotyledon.Service):
name = "health"
def __init__(self, worker_id, healthy):
super(CNIDaemonHealthServerService, self).__init__(worker_id)
self.health_server = health.CNIHealthServer(healthy)
def run(self):
self.health_server.run()
class CNIDaemonServiceManager(cotyledon.ServiceManager): class CNIDaemonServiceManager(cotyledon.ServiceManager):
def __init__(self): def __init__(self):
super(CNIDaemonServiceManager, self).__init__() super(CNIDaemonServiceManager, self).__init__()
@ -302,8 +357,10 @@ class CNIDaemonServiceManager(cotyledon.ServiceManager):
self.manager = multiprocessing.Manager() self.manager = multiprocessing.Manager()
registry = self.manager.dict() # For Watcher->Server communication. registry = self.manager.dict() # For Watcher->Server communication.
self.add(CNIDaemonWatcherService, workers=1, args=(registry,)) healthy = multiprocessing.Value(c_bool, True)
self.add(CNIDaemonServerService, workers=1, args=(registry,)) self.add(CNIDaemonWatcherService, workers=1, args=(registry, healthy,))
self.add(CNIDaemonServerService, workers=1, args=(registry, healthy,))
self.add(CNIDaemonHealthServerService, workers=1, args=(healthy,))
self.register_hooks(on_terminate=self.terminate) self.register_hooks(on_terminate=self.terminate)
def run(self): def run(self):

View File

@ -0,0 +1,129 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import gc
import os
import psutil
import requests
from six.moves import http_client as httplib
import subprocess
from flask import Flask
from pyroute2 import IPDB
from kuryr.lib._i18n import _
from oslo_config import cfg
from oslo_log import log as logging
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
cni_health_server_opts = [
cfg.IntOpt('port',
help=_('Port for CNI Health HTTP Server.'),
default=8090),
cfg.IntOpt('max_memory_usage',
help=_('Maximum memory usage (MiB) for CNI Health Server '
'process. If this value is exceeded kuryr-daemon '
'will be marked as unhealthy.'),
default=-1),
]
CONF.register_opts(cni_health_server_opts, "cni_health_server")
BYTES_AMOUNT = 1048576
class CNIHealthServer(object):
"""Server used by readiness and liveness probe to manage CNI health checks.
Verifies presence of NET_ADMIN capabilities, IPDB in working order,
connectivity to Kubernetes API, quantity of CNI add failure, health of
CNI components and existence of memory leaks.
"""
def __init__(self, components_healthy):
self.ctx = None
self._components_healthy = components_healthy
self.application = Flask('cni-health-daemon')
self.application.add_url_rule(
'/ready', methods=['GET'], view_func=self.readiness_status)
self.application.add_url_rule(
'/alive', methods=['GET'], view_func=self.liveness_status)
self.headers = {'Connection': 'close'}
def readiness_status(self):
net_admin_command = 'capsh --print | grep "Current:" | ' \
'cut -d" " -f3 | grep -q cap_net_admin'
return_code = subprocess.call(net_admin_command, shell=True)
data = 'ok'
k8s_conn, k8s_status = self.verify_k8s_connection()
if return_code != 0:
error_message = 'NET_ADMIN capabilities not present.'
LOG.error(error_message)
return error_message, httplib.INTERNAL_SERVER_ERROR, self.headers
if not k8s_conn:
error_message = 'Error when processing k8s healthz request.'
LOG.error(error_message)
return error_message, k8s_status, self.headers
LOG.info('CNI driver readiness verified.')
return data, httplib.OK, self.headers
def liveness_status(self):
data = 'ok'
no_limit = -1
try:
with IPDB() as a:
a.release()
except Exception:
error_message = 'IPDB not in working order.'
LOG.debug(error_message)
return error_message, httplib.INTERNAL_SERVER_ERROR, self.headers
if CONF.cni_health_server.max_memory_usage != no_limit:
# Force gc to release unreferenced memory before actually checking
# the memory.
gc.collect()
process = psutil.Process(os.getpid())
mem_usage = process.memory_info().rss / BYTES_AMOUNT
if mem_usage > CONF.cni_health_server.max_memory_usage:
err_message = 'CNI daemon exceeded maximum memory usage.'
LOG.debug(err_message)
return err_message, httplib.INTERNAL_SERVER_ERROR, self.headers
with self._components_healthy.get_lock():
if not self._components_healthy.value:
err_message = 'Kuryr CNI components not healthy.'
LOG.debug(err_message)
return err_message, httplib.INTERNAL_SERVER_ERROR, self.headers
LOG.debug('Kuryr CNI Liveness verified.')
return data, httplib.OK, self.headers
def run(self):
address = ''
try:
LOG.info('Starting CNI health check server.')
self.application.run(address, CONF.cni_health_server.port)
except Exception:
LOG.exception('Failed to start CNI health check server.')
raise
def verify_k8s_connection(self):
path = '/healthz'
address = CONF.kubernetes.api_root
url = address + path
resp = requests.get(url, headers={'Connection': 'close'})
return resp.content == 'ok', resp.status_code

View File

@ -70,6 +70,11 @@ daemon_opts = [
"host network namespaces, which is essential for Kuryr " "host network namespaces, which is essential for Kuryr "
"to work."), "to work."),
default=None), default=None),
cfg.IntOpt('cni_failures_count',
help=_('Maximum number of consecutive failures of kuryr-daemon '
'when processing requests. If this number is exceeded, '
'kuryr-daemon will be marked as unhealthy.'),
default=3),
] ]
k8s_opts = [ k8s_opts = [

View File

@ -14,6 +14,7 @@ import copy
from oslo_log import _options from oslo_log import _options
from kuryr.lib import opts as lib_opts from kuryr.lib import opts as lib_opts
from kuryr_kubernetes.cni import health as cni_health
from kuryr_kubernetes import config from kuryr_kubernetes import config
from kuryr_kubernetes.controller.drivers import default_subnet from kuryr_kubernetes.controller.drivers import default_subnet
from kuryr_kubernetes.controller.drivers import nested_vif from kuryr_kubernetes.controller.drivers import nested_vif
@ -33,6 +34,7 @@ _kuryr_k8s_opts = [
('pool_manager', pool.pool_manager_opts), ('pool_manager', pool.pool_manager_opts),
('cni_daemon', config.daemon_opts), ('cni_daemon', config.daemon_opts),
('health_server', health.health_server_opts), ('health_server', health.health_server_opts),
('cni_health_server', cni_health.cni_health_server_opts),
] ]

View File

@ -77,20 +77,26 @@ class TestDriverMixin(test_base.TestCase):
@mock.patch('kuryr_kubernetes.cni.binding.base.get_ipdb') @mock.patch('kuryr_kubernetes.cni.binding.base.get_ipdb')
@mock.patch('os_vif.plug') @mock.patch('os_vif.plug')
def _test_connect(self, m_vif_plug, m_get_ipdb): def _test_connect(self, m_vif_plug, m_get_ipdb, report=None):
def get_ipdb(netns=None): def get_ipdb(netns=None):
return self.ipdbs[netns] return self.ipdbs[netns]
m_get_ipdb.side_effect = get_ipdb m_get_ipdb.side_effect = get_ipdb
base.connect(self.vif, self.instance_info, self.ifname, self.netns) base.connect(self.vif, self.instance_info, self.ifname, self.netns,
report)
m_vif_plug.assert_called_once_with(self.vif, self.instance_info) m_vif_plug.assert_called_once_with(self.vif, self.instance_info)
self.m_c_iface.add_ip.assert_called_once_with('192.168.0.2/24') self.m_c_iface.add_ip.assert_called_once_with('192.168.0.2/24')
if report:
report.assert_called_once()
@mock.patch('os_vif.unplug') @mock.patch('os_vif.unplug')
def _test_disconnect(self, m_vif_unplug): def _test_disconnect(self, m_vif_unplug, report=None):
base.disconnect(self.vif, self.instance_info, self.ifname, self.netns) base.disconnect(self.vif, self.instance_info, self.ifname, self.netns,
report)
m_vif_unplug.assert_called_once_with(self.vif, self.instance_info) m_vif_unplug.assert_called_once_with(self.vif, self.instance_info)
if report:
report.assert_called_once()
class TestOpenVSwitchDriver(TestDriverMixin, test_base.TestCase): class TestOpenVSwitchDriver(TestDriverMixin, test_base.TestCase):
@ -98,11 +104,13 @@ class TestOpenVSwitchDriver(TestDriverMixin, test_base.TestCase):
super(TestOpenVSwitchDriver, self).setUp() super(TestOpenVSwitchDriver, self).setUp()
self.vif = fake._fake_vif(osv_objects.vif.VIFOpenVSwitch) self.vif = fake._fake_vif(osv_objects.vif.VIFOpenVSwitch)
@mock.patch('kuryr_kubernetes.cni.daemon.service.K8sCNIRegistryPlugin.'
'report_drivers_health')
@mock.patch('os.getpid', mock.Mock(return_value=123)) @mock.patch('os.getpid', mock.Mock(return_value=123))
@mock.patch('kuryr_kubernetes.linux_net_utils.create_ovs_vif_port') @mock.patch('kuryr_kubernetes.linux_net_utils.create_ovs_vif_port')
def test_connect(self, mock_create_ovs): def test_connect(self, mock_create_ovs, m_report):
self._test_connect() self._test_connect(report=m_report)
self.assertEqual(2, self.h_ipdb_exit.call_count) self.assertEqual(3, self.h_ipdb_exit.call_count)
self.assertEqual(2, self.c_ipdb_exit.call_count) self.assertEqual(2, self.c_ipdb_exit.call_count)
self.c_ipdb.create.assert_called_once_with( self.c_ipdb.create.assert_called_once_with(
ifname=self.ifname, peer='h_interface', kind='veth') ifname=self.ifname, peer='h_interface', kind='veth')
@ -118,9 +126,11 @@ class TestOpenVSwitchDriver(TestDriverMixin, test_base.TestCase):
'bridge', 'h_interface', '89eccd45-43e9-43d8-b4cc-4c13db13f782', 'bridge', 'h_interface', '89eccd45-43e9-43d8-b4cc-4c13db13f782',
'3e:94:b7:31:a0:83', 'kuryr') '3e:94:b7:31:a0:83', 'kuryr')
@mock.patch('kuryr_kubernetes.cni.daemon.service.K8sCNIRegistryPlugin.'
'report_drivers_health')
@mock.patch('kuryr_kubernetes.linux_net_utils.delete_ovs_vif_port') @mock.patch('kuryr_kubernetes.linux_net_utils.delete_ovs_vif_port')
def test_disconnect(self, mock_delete_ovs): def test_disconnect(self, mock_delete_ovs, m_report):
self._test_disconnect() self._test_disconnect(report=m_report)
mock_delete_ovs.assert_called_once_with('bridge', 'h_interface') mock_delete_ovs.assert_called_once_with('bridge', 'h_interface')

View File

@ -0,0 +1,89 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ctypes import c_bool
from kuryr_kubernetes.cni import health
from kuryr_kubernetes.tests import base
import mock
import multiprocessing
from oslo_config import cfg
class TestResourceUsage(object):
pass
class TestCNIHealthServer(base.TestCase):
def setUp(self):
super(TestCNIHealthServer, self).setUp()
healthy = multiprocessing.Value(c_bool, True)
self.srv = health.CNIHealthServer(healthy)
self.srv.application.testing = True
self.test_client = self.srv.application.test_client()
@mock.patch('subprocess.call')
@mock.patch('kuryr_kubernetes.cni.health.CNIHealthServer.'
'verify_k8s_connection')
def test_readiness_status(self, m_verify_k8s_conn, m_subprocess):
m_subprocess.return_value = 0
m_verify_k8s_conn.return_value = True, 200
resp = self.test_client.get('/ready')
self.assertEqual(200, resp.status_code)
@mock.patch('subprocess.call')
@mock.patch('kuryr_kubernetes.cni.health.CNIHealthServer.'
'verify_k8s_connection')
def test_readiness_status_net_admin_error(self, m_verify_k8s_conn,
m_subprocess):
m_subprocess.return_value = 1
m_verify_k8s_conn.return_value = True, 200
resp = self.test_client.get('/ready')
self.assertEqual(500, resp.status_code)
@mock.patch('subprocess.call')
@mock.patch('kuryr_kubernetes.cni.health.CNIHealthServer.'
'verify_k8s_connection')
def test_readiness_status_k8s_error(self, m_verify_k8s_conn, m_subprocess):
m_subprocess.return_value = 0
m_verify_k8s_conn.return_value = False, 503
resp = self.test_client.get('/ready')
self.assertEqual(503, resp.status_code)
@mock.patch('pyroute2.IPDB.release')
def test_liveness_status(self, m_ipdb):
self.srv._components_healthy.value = True
resp = self.test_client.get('/alive')
m_ipdb.assert_called()
self.assertEqual(200, resp.status_code)
def test_liveness_status_components_error(self):
self.srv._components_healthy.value = False
resp = self.test_client.get('/alive')
self.assertEqual(500, resp.status_code)
@mock.patch('pyroute2.IPDB.release')
def test_liveness_status_ipdb_error(self, m_ipdb):
m_ipdb.side_effect = Exception
resp = self.test_client.get('/alive')
self.assertEqual(500, resp.status_code)
@mock.patch('psutil.Process.memory_info')
def test_liveness_status_mem_usage_error(self, m_resource):
cfg.CONF.set_override('max_memory_usage', 4096,
group='cni_health_server')
cls = TestResourceUsage()
cls.rss = 5368709120
m_resource.return_value = cls
resp = self.test_client.get('/alive')
self.assertEqual(500, resp.status_code)

View File

@ -30,7 +30,8 @@ class TestK8sCNIRegistryPlugin(base.TestCase):
self.vif = fake._fake_vif_dict() self.vif = fake._fake_vif_dict()
registry = {'foo': {'pod': self.pod, 'vif': self.vif, registry = {'foo': {'pod': self.pod, 'vif': self.vif,
'containerid': None}} 'containerid': None}}
self.plugin = service.K8sCNIRegistryPlugin(registry) healthy = mock.Mock()
self.plugin = service.K8sCNIRegistryPlugin(registry, healthy)
self.params = mock.Mock(args=mock.Mock(K8S_POD_NAME='foo'), self.params = mock.Mock(args=mock.Mock(K8S_POD_NAME='foo'),
CNI_IFNAME='baz', CNI_NETNS=123, CNI_IFNAME='baz', CNI_NETNS=123,
CNI_CONTAINERID='cont_id') CNI_CONTAINERID='cont_id')
@ -41,20 +42,22 @@ class TestK8sCNIRegistryPlugin(base.TestCase):
self.plugin.add(self.params) self.plugin.add(self.params)
m_lock.assert_called_with('foo', external=True) m_lock.assert_called_with('foo', external=True)
m_connect.assert_called_with(mock.ANY, mock.ANY, 'baz', 123) m_connect.assert_called_with(mock.ANY, mock.ANY, 'baz', 123, mock.ANY)
self.assertEqual('cont_id', self.plugin.registry['foo']['containerid']) self.assertEqual('cont_id', self.plugin.registry['foo']['containerid'])
@mock.patch('kuryr_kubernetes.cni.binding.base.disconnect') @mock.patch('kuryr_kubernetes.cni.binding.base.disconnect')
def test_del_present(self, m_disconnect): def test_del_present(self, m_disconnect):
self.plugin.delete(self.params) self.plugin.delete(self.params)
m_disconnect.assert_called_with(mock.ANY, mock.ANY, 'baz', 123) m_disconnect.assert_called_with(mock.ANY, mock.ANY, 'baz', 123,
mock.ANY)
@mock.patch('kuryr_kubernetes.cni.binding.base.disconnect') @mock.patch('kuryr_kubernetes.cni.binding.base.disconnect')
def test_del_wrong_container_id(self, m_disconnect): def test_del_wrong_container_id(self, m_disconnect):
registry = {'foo': {'pod': self.pod, 'vif': self.vif, registry = {'foo': {'pod': self.pod, 'vif': self.vif,
'containerid': 'different'}} 'containerid': 'different'}}
self.plugin = service.K8sCNIRegistryPlugin(registry) healthy = mock.Mock()
self.plugin = service.K8sCNIRegistryPlugin(registry, healthy)
self.plugin.delete(self.params) self.plugin.delete(self.params)
m_disconnect.assert_not_called() m_disconnect.assert_not_called()
@ -77,7 +80,7 @@ class TestK8sCNIRegistryPlugin(base.TestCase):
m_setitem.assert_called_once_with('foo', {'pod': self.pod, m_setitem.assert_called_once_with('foo', {'pod': self.pod,
'vif': self.vif, 'vif': self.vif,
'containerid': 'cont_id'}) 'containerid': 'cont_id'})
m_connect.assert_called_with(mock.ANY, mock.ANY, 'baz', 123) m_connect.assert_called_with(mock.ANY, mock.ANY, 'baz', 123, mock.ANY)
@mock.patch('time.sleep', mock.Mock()) @mock.patch('time.sleep', mock.Mock())
def test_add_not_present(self): def test_add_not_present(self):
@ -95,8 +98,10 @@ class TestK8sCNIRegistryPlugin(base.TestCase):
class TestDaemonServer(base.TestCase): class TestDaemonServer(base.TestCase):
def setUp(self): def setUp(self):
super(TestDaemonServer, self).setUp() super(TestDaemonServer, self).setUp()
self.plugin = service.K8sCNIRegistryPlugin({}) healthy = mock.Mock()
self.srv = service.DaemonServer(self.plugin) self.plugin = service.K8sCNIRegistryPlugin({}, healthy)
self.health_registry = mock.Mock()
self.srv = service.DaemonServer(self.plugin, self.health_registry)
self.srv.application.testing = True self.srv.application.testing = True
self.test_client = self.srv.application.test_client() self.test_client = self.srv.application.test_client()

View File

@ -16,6 +16,7 @@ oslo.serialization!=2.19.1,>=2.18.0 # Apache-2.0
oslo.service!=1.28.1,>=1.24.0 # Apache-2.0 oslo.service!=1.28.1,>=1.24.0 # Apache-2.0
oslo.utils>=3.33.0 # Apache-2.0 oslo.utils>=3.33.0 # Apache-2.0
os-vif!=1.8.0,>=1.7.0 # Apache-2.0 os-vif!=1.8.0,>=1.7.0 # Apache-2.0
psutil>=3.2.2 # BSD
pyroute2>=0.4.21;sys_platform!='win32' # Apache-2.0 (+ dual licensed GPL2) pyroute2>=0.4.21;sys_platform!='win32' # Apache-2.0 (+ dual licensed GPL2)
retrying!=1.3.0,>=1.2.3 # Apache-2.0 retrying!=1.3.0,>=1.2.3 # Apache-2.0
six>=1.10.0 # MIT six>=1.10.0 # MIT

View File

@ -107,4 +107,5 @@ generate_kuryr_configmap $OUTPUT_DIR $CONTROLLER_CONF_PATH $CNI_CONF_PATH
generate_kuryr_service_account $OUTPUT_DIR generate_kuryr_service_account $OUTPUT_DIR
health_server_port=${KURYR_HEALTH_SERVER_PORT:-8082} health_server_port=${KURYR_HEALTH_SERVER_PORT:-8082}
generate_controller_deployment $OUTPUT_DIR $health_server_port generate_controller_deployment $OUTPUT_DIR $health_server_port
generate_cni_daemon_set $OUTPUT_DIR cni_health_server_port=${KURYR_CNI_HEALTH_SERVER_PORT:-8090}
generate_cni_daemon_set $OUTPUT_DIR $cni_health_server_port