Civilize logging

Our logs are awful and this commit attempts to fix some issues with
them:
* Make sure we always indicate why a readiness or liveness probe
  fails.
* Suppress INFO logs from werkzeug (so that we don't see every probe
  call at INFO level).
* Remove logging of successful probe checks.
* Make watcher restart logs less scary and include more cases.
* Add backoff to watcher restarts so that we don't spam logs when K8s
  API is briefly unavailable.
* Add warnings for low quotas.
* Suppress some long logs on K8s healthz failures - we don't need the
  full message from K8s printed twice.

I also refactored the CNI and controller health probe servers so that
they don't duplicate code.

Change-Id: Ia3db4863af8f28cfbaf2317042c8631cc63d9745
Michał Dulko 2020-05-22 16:17:02 +02:00
parent b99f6a85f1
commit d8892d2e72
9 changed files with 164 additions and 113 deletions

View File

@@ -112,5 +112,7 @@ class VIFOpenVSwitchDriver(BaseBridgeDriver):
                 h_ipdb.interfaces[bridge_name]
             return True
         except Exception:
-            LOG.debug("Reporting Driver not healthy.")
+            LOG.error("The configured ovs_bridge=%s integration interface "
+                      "does not exists. Reporting that driver is not healthy.",
+                      bridge_name)
             return False

View File

@@ -13,15 +13,13 @@
 from http import client as httplib
 import os

-from flask import Flask
+from oslo_config import cfg
+from oslo_log import log as logging
 from pyroute2 import IPDB

 from kuryr.lib._i18n import _
-from kuryr_kubernetes import clients
 from kuryr_kubernetes.cni import utils
-from kuryr_kubernetes import exceptions as exc
-from oslo_config import cfg
-from oslo_log import log as logging
+from kuryr_kubernetes import health as base_server

 LOG = logging.getLogger(__name__)
 CONF = cfg.CONF
@@ -88,7 +86,7 @@ def _get_memsw_usage(cgroup_mem_path):
     return memsw_in_bytes / BYTES_AMOUNT


-class CNIHealthServer(object):
+class CNIHealthServer(base_server.BaseHealthServer):
     """Server used by readiness and liveness probe to manage CNI health checks.

     Verifies presence of NET_ADMIN capabilities, IPDB in working order,
@@ -98,33 +96,24 @@ class CNIHealthServer(object):
     """

     def __init__(self, components_healthy):
-        self.ctx = None
+        super().__init__('daemon-health', CONF.cni_health_server.port)
         self._components_healthy = components_healthy
-        self.application = Flask('cni-health-daemon')
-        self.application.add_url_rule(
-            '/ready', methods=['GET'], view_func=self.readiness_status)
-        self.application.add_url_rule(
-            '/alive', methods=['GET'], view_func=self.liveness_status)
-        self.headers = {'Connection': 'close'}

     def readiness_status(self):
-        data = 'ok'
         k8s_conn = self.verify_k8s_connection()
         if not _has_cap(CAP_NET_ADMIN, EFFECTIVE_CAPS):
             error_message = 'NET_ADMIN capabilities not present.'
             LOG.error(error_message)
-            return error_message, httplib.INTERNAL_SERVER_ERROR, self.headers
+            return error_message, httplib.INTERNAL_SERVER_ERROR, {}
         if not k8s_conn:
-            error_message = 'Error when processing k8s healthz request.'
+            error_message = 'K8s API healtz endpoint failed.'
             LOG.error(error_message)
-            return error_message, httplib.INTERNAL_SERVER_ERROR, self.headers
-        LOG.info('CNI driver readiness verified.')
-        return data, httplib.OK, self.headers
+            return error_message, httplib.INTERNAL_SERVER_ERROR, {}
+        return 'ok', httplib.OK, {}

     def liveness_status(self):
-        data = 'ok'
         no_limit = -1
         try:
             with IPDB():
@@ -132,7 +121,7 @@ class CNIHealthServer(object):
         except Exception:
             error_message = 'IPDB not in working order.'
             LOG.error(error_message)
-            return error_message, httplib.INTERNAL_SERVER_ERROR, self.headers
+            return error_message, httplib.INTERNAL_SERVER_ERROR, {}

         if CONF.cni_health_server.max_memory_usage != no_limit:
             mem_usage = _get_memsw_usage(_get_cni_cgroup_path())
@@ -140,31 +129,12 @@ class CNIHealthServer(object):
             if mem_usage > CONF.cni_health_server.max_memory_usage:
                 err_message = 'CNI daemon exceeded maximum memory usage.'
                 LOG.error(err_message)
-                return err_message, httplib.INTERNAL_SERVER_ERROR, self.headers
+                return err_message, httplib.INTERNAL_SERVER_ERROR, {}

         with self._components_healthy.get_lock():
             if not self._components_healthy.value:
                 err_message = 'Kuryr CNI components not healthy.'
                 LOG.error(err_message)
-                return err_message, httplib.INTERNAL_SERVER_ERROR, self.headers
-        LOG.debug('Kuryr CNI Liveness verified.')
-        return data, httplib.OK, self.headers
-
-    def run(self):
-        address = '::'
-        try:
-            LOG.info('Starting CNI health check server.')
-            self.application.run(address, CONF.cni_health_server.port)
-        except Exception:
-            LOG.exception('Failed to start CNI health check server.')
-            raise
-
-    def verify_k8s_connection(self):
-        k8s = clients.get_kubernetes_client()
-        try:
-            k8s.get('/healthz', json=False, headers={'Connection': 'close'})
-        except exc.K8sClientException:
-            LOG.exception('Exception when trying to reach Kubernetes API.')
-            return False
-        return True
+                return err_message, httplib.INTERNAL_SERVER_ERROR, {}
+        return 'ok', httplib.OK, {}

View File

@@ -144,9 +144,11 @@ class NamespaceHandler(k8s_base.ResourceEventHandler):
             raise

     def is_ready(self, quota):
-        if not utils.has_kuryr_crd(constants.K8S_API_CRD_KURYRNETS):
+        if not (utils.has_kuryr_crd(constants.K8S_API_CRD_KURYRNETS) and
+                self._check_quota(quota)):
+            LOG.error('Marking NamespaceHandler as not ready.')
             return False
-        return self._check_quota(quota)
+        return True

     def _check_quota(self, quota):
         resources = ('subnets', 'networks', 'security_groups')

View File

@@ -127,9 +127,11 @@ class NetworkPolicyHandler(k8s_base.ResourceEventHandler):
         self._drv_lbaas.update_lbaas_sg(svc, sgs)

     def is_ready(self, quota):
-        if not utils.has_kuryr_crd(k_const.K8S_API_CRD_KURYRNETPOLICIES):
+        if not (utils.has_kuryr_crd(k_const.K8S_API_CRD_KURYRNETPOLICIES) and
+                self._check_quota(quota)):
+            LOG.error("Marking NetworkPolicyHandler as not ready.")
             return False
-        return self._check_quota(quota)
+        return True

     def _check_quota(self, quota):
         if utils.has_limit(quota.security_groups):

View File

@@ -212,8 +212,10 @@ class VIFHandler(k8s_base.ResourceEventHandler):
         self._update_services(services, crd_pod_selectors, project_id)

     def is_ready(self, quota):
-        if utils.has_limit(quota.ports):
-            return utils.is_available('ports', quota.ports)
+        if (utils.has_limit(quota.ports) and
+                not utils.is_available('ports', quota.ports)):
+            LOG.error('Marking VIFHandler as not ready.')
+            return False
         return True

     @staticmethod

View File

@@ -15,7 +15,6 @@
 from http import client as httplib
 import os

-from flask import Flask
 from oslo_config import cfg
 from oslo_log import log as logging

@@ -24,8 +23,8 @@ from kuryr.lib import config as kuryr_config
 from kuryr.lib import utils
 from kuryr_kubernetes import clients
 from kuryr_kubernetes import config
-from kuryr_kubernetes import exceptions as exc
 from kuryr_kubernetes.handlers import health as h_health
+from kuryr_kubernetes import health as base_server

 LOG = logging.getLogger(__name__)
 CONF = cfg.CONF
@@ -39,7 +38,7 @@ health_server_opts = [
 CONF.register_opts(health_server_opts, "health_server")


-class HealthServer(object):
+class HealthServer(base_server.BaseHealthServer):
     """Proxy server used by readiness and liveness probes to manage health checks.

     Allows to verify connectivity with Kubernetes API, Keystone and Neutron.
@@ -49,14 +48,8 @@ class HealthServer(object):
     """

     def __init__(self):
-        self.ctx = None
+        super().__init__('controller-health', CONF.health_server.port)
         self._registry = h_health.HealthRegister.get_instance().registry
-        self.application = Flask('health-daemon')
-        self.application.add_url_rule(
-            '/ready', methods=['GET'], view_func=self.readiness_status)
-        self.application.add_url_rule(
-            '/alive', methods=['GET'], view_func=self.liveness_status)
-        self.headers = {'Connection': 'close'}

     def _components_ready(self):
         os_net = clients.get_network_client()
@@ -70,64 +63,42 @@ class HealthServer(object):
         return True

     def readiness_status(self):
-        data = 'ok'
         if CONF.kubernetes.vif_pool_driver != 'noop':
             if not os.path.exists('/tmp/pools_loaded'):
                 error_message = 'Ports not loaded into the pools.'
                 LOG.error(error_message)
-                return error_message, httplib.NOT_FOUND, self.headers
+                return error_message, httplib.NOT_FOUND, {}

         k8s_conn = self.verify_k8s_connection()
         if not k8s_conn:
             error_message = 'Error when processing k8s healthz request.'
             LOG.error(error_message)
-            return error_message, httplib.INTERNAL_SERVER_ERROR, self.headers
+            return error_message, httplib.INTERNAL_SERVER_ERROR, {}
         try:
             self.verify_keystone_connection()
         except Exception as ex:
             error_message = ('Error when creating a Keystone session and '
                              'getting a token: %s.' % ex)
             LOG.exception(error_message)
-            return error_message, httplib.INTERNAL_SERVER_ERROR, self.headers
+            return error_message, httplib.INTERNAL_SERVER_ERROR, {}

         try:
             if not self._components_ready():
-                return '', httplib.INTERNAL_SERVER_ERROR, self.headers
+                return '', httplib.INTERNAL_SERVER_ERROR, {}
         except Exception as ex:
             error_message = ('Error when processing neutron request %s' % ex)
             LOG.exception(error_message)
-            return error_message, httplib.INTERNAL_SERVER_ERROR, self.headers
+            return error_message, httplib.INTERNAL_SERVER_ERROR, {}

-        LOG.info('Kuryr Controller readiness verified.')
-        return data, httplib.OK, self.headers
+        return 'ok', httplib.OK, {}

     def liveness_status(self):
-        data = 'ok'
         for component in self._registry:
             if not component.is_alive():
-                LOG.debug('Kuryr Controller not healthy.')
-                return '', httplib.INTERNAL_SERVER_ERROR, self.headers
-        LOG.debug('Kuryr Controller Liveness verified.')
-        return data, httplib.OK, self.headers
-
-    def run(self):
-        address = '::'
-        try:
-            LOG.info('Starting health check server.')
-            self.application.run(address, CONF.health_server.port)
-        except Exception:
-            LOG.exception('Failed to start health check server.')
-            raise
-
-    def verify_k8s_connection(self):
-        k8s = clients.get_kubernetes_client()
-        try:
-            k8s.get('/healthz', json=False, headers={'Connection': 'close'})
-        except exc.K8sClientException:
-            LOG.exception('Exception when trying to reach Kubernetes API.')
-            return False
-        return True
+                msg = 'Component %s is dead.' % component.__class__.__name__
+                LOG.error(msg)
+                return msg, httplib.INTERNAL_SERVER_ERROR, {}
+        return 'ok', httplib.OK, {}

     def verify_keystone_connection(self):
         # Obtain a new token to ensure connectivity with keystone

View File

@@ -0,0 +1,76 @@
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import abc
+
+from flask import Flask
+from oslo_config import cfg
+from oslo_log import log as logging
+
+from kuryr_kubernetes import clients
+
+LOG = logging.getLogger(__name__)
+CONF = cfg.CONF
+
+
+class BaseHealthServer(abc.ABC):
+    """Base class of server used to provide readiness and liveness probes."""
+
+    def __init__(self, app_name, port):
+        self.app_name = app_name
+        self.port = port
+        self.ctx = None
+        self.application = Flask(app_name)
+        self.application.add_url_rule(
+            '/ready', methods=['GET'], view_func=self.readiness_status)
+        self.application.add_url_rule(
+            '/alive', methods=['GET'], view_func=self.liveness_status)
+
+        def apply_conn_close(response):
+            response.headers['Connection'] = 'close'
+            return response
+
+        self.application.after_request(apply_conn_close)
+
+    @abc.abstractmethod
+    def readiness_status(self):
+        raise NotImplementedError()
+
+    @abc.abstractmethod
+    def liveness_status(self):
+        raise NotImplementedError()
+
+    def run(self):
+        # Disable obtrusive werkzeug logs.
+        logging.getLogger('werkzeug').setLevel(logging.WARNING)
+
+        address = '::'
+        LOG.info('Starting %s health check server on %s:%d.', self.app_name,
+                 address, self.port)
+        try:
+            self.application.run(address, self.port)
+        except Exception:
+            LOG.exception('Failed to start %s health check server.',
+                          self.app_name)
+            raise
+
+    def verify_k8s_connection(self):
+        k8s = clients.get_kubernetes_client()
+        try:
+            k8s.get('/healthz', json=False, headers={'Connection': 'close'})
+        except Exception as e:
+            # Not LOG.exception to make sure long message from K8s API is not
+            # repeated.
+            LOG.error('Exception when trying to reach Kubernetes API: %s.', e)
+            return False
+
+        return True
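
To illustrate how the new base class is meant to be consumed (the CNI and controller servers above do exactly this), here is a minimal, hypothetical subclass; the class name and port below are made up for this sketch and are not part of the change:

from http import client as httplib

from kuryr_kubernetes import health as base_server


class DemoHealthServer(base_server.BaseHealthServer):
    """Hypothetical probe server reusing the shared Flask wiring."""

    def __init__(self):
        # 8090 is an arbitrary example port, not a kuryr-kubernetes default.
        super().__init__('demo-health', 8090)

    def readiness_status(self):
        if not self.verify_k8s_connection():
            return ('K8s API healthz endpoint failed.',
                    httplib.INTERNAL_SERVER_ERROR, {})
        return 'ok', httplib.OK, {}

    def liveness_status(self):
        return 'ok', httplib.OK, {}


# DemoHealthServer().run() would serve /ready and /alive on port 8090, with
# werkzeug request logs silenced and 'Connection: close' set on responses.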

View File

@@ -16,6 +16,7 @@ import contextlib
 import itertools
 import os
 import ssl
+import time
 from urllib import parse

 from oslo_log import log as logging
@@ -27,10 +28,15 @@ from kuryr.lib._i18n import _
 from kuryr_kubernetes import config
 from kuryr_kubernetes import constants
 from kuryr_kubernetes import exceptions as exc
+from kuryr_kubernetes import utils

 CONF = config.CONF
 LOG = logging.getLogger(__name__)

+# Hardcoding 60 seconds as I don't see a scenario when we want to wait more
+# than a minute for reconnection.
+MAX_BACKOFF = 60
+

 class K8sClient(object):
     # REVISIT(ivc): replace with python-k8sclient if it could be extended
@@ -267,6 +273,7 @@ class K8sClient(object):
         if self.token:
             header.update({'Authorization': 'Bearer %s' % self.token})

+        attempt = 0
         while True:
             try:
                 params = {'watch': 'true'}
@@ -279,6 +286,7 @@ class K8sClient(object):
                                        timeout=timeouts)) as response:
                     if not response.ok:
                         raise exc.K8sClientException(response.text)
+                    attempt = 0
                     for line in response.iter_lines():
                         line = line.decode('utf-8').strip()
                         if line:
@@ -289,19 +297,16 @@ class K8sClient(object):
                             m = line_dict.get('object', {}).get('metadata', {})
                             resource_version = m.get('resourceVersion', None)
             except (requests.ReadTimeout, requests.ConnectionError,
-                    ssl.SSLError) as e:
-                if isinstance(e, ssl.SSLError) and e.args != ('timed out',):
-                    raise
-
-                LOG.warning('%ds without data received from watching %s. '
-                            'Retrying the connection with resourceVersion=%s.',
-                            timeouts[1], path, params.get('resourceVersion'))
-            except requests.exceptions.ChunkedEncodingError:
-                LOG.warning("Connection to %s closed when watching. This "
-                            "mostly happens when Octavia's Amphora closes "
-                            "connection due to lack of activity for 50s. "
-                            "Since Rocky Octavia this is configurable and "
-                            "should be set to at least 20m, so check timeouts "
-                            "on Kubernetes API LB listener. Restarting "
-                            "connection with resourceVersion=%s.", path,
-                            params.get('resourceVersion'))
+                    ssl.SSLError, requests.exceptions.ChunkedEncodingError):
+                t = utils.exponential_backoff(attempt, min_backoff=0,
+                                              max_backoff=MAX_BACKOFF)
+                log = LOG.debug
+                if attempt > 0:
+                    # Only make it a warning if it's happening again, no need
+                    # to inform about all the read timeouts.
+                    log = LOG.warning
+                log('Connection error when watching %s. Retrying in %ds with '
+                    'resourceVersion=%s', path, t,
+                    params.get('resourceVersion'))
+                time.sleep(t)
+                attempt += 1
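
The rewritten except branch above boils down to "retry the watch with a capped, exponentially growing pause, and only warn once the same watch keeps failing". A self-contained sketch of that idea, with made-up names and detached from the real client code:

import logging
import random
import time

LOG = logging.getLogger(__name__)
MAX_BACKOFF = 60  # seconds, same cap as introduced above


def _backoff(attempt, interval=3, max_backoff=MAX_BACKOFF):
    # With a minimum of 0 the first retry can be immediate: randint(0, 0) == 0.
    return min(random.randint(0, 2 ** attempt - 1) * interval, max_backoff)


def watch_with_backoff(connect):
    """Yield events from connect(), reconnecting on errors with backoff."""
    attempt = 0
    while True:
        try:
            for event in connect():
                attempt = 0  # data arrived, so reset the backoff counter
                yield event
        except ConnectionError:
            t = _backoff(attempt)
            # The first failure is logged quietly; repeats become warnings.
            log = LOG.warning if attempt > 0 else LOG.debug
            log('Watch connection failed, retrying in %ds.', t)
            time.sleep(t)
            attempt += 1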

View File

@@ -43,6 +43,7 @@ VALID_MULTI_POD_POOLS_OPTS = {'noop': ['neutron-vif',
                               }
 DEFAULT_TIMEOUT = 500
 DEFAULT_INTERVAL = 3
+MAX_ATTEMPTS = 10

 subnet_caching_opts = [
     cfg.BoolOpt('caching', default=True),
@@ -129,7 +130,7 @@ def exponential_sleep(deadline, attempt, interval=DEFAULT_INTERVAL):
     if seconds_left <= 0:
         return 0

-    to_sleep = random.randint(1, 2 ** attempt - 1) * interval
+    to_sleep = exponential_backoff(attempt, interval)

     if to_sleep > seconds_left:
         to_sleep = seconds_left
@@ -141,6 +142,20 @@ def exponential_sleep(deadline, attempt, interval=DEFAULT_INTERVAL):
     return to_sleep


+def exponential_backoff(attempt, interval=DEFAULT_INTERVAL, min_backoff=1,
+                        max_backoff=None):
+    if attempt >= MAX_ATTEMPTS:
+        # No need to calculate very long intervals
+        attempt = MAX_ATTEMPTS
+
+    backoff = random.randint(min_backoff, 2 ** attempt - 1) * interval
+
+    if max_backoff is not None and backoff > max_backoff:
+        backoff = max_backoff
+
+    return backoff
+
+
 def get_node_name():
     # leader-elector container based on K8s way of doing leader election is
     # assuming that hostname it sees is the node id. Containers within a pod
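
For a rough sense of what the helper above returns with the defaults shown (interval=3, min_backoff=1, MAX_ATTEMPTS=10); the print loop is only an illustration, not code from this change:

from kuryr_kubernetes import utils

# attempt 1 -> always 3s, attempt 2 -> 3-9s, attempt 3 -> 3-21s, and from
# attempt 10 (MAX_ATTEMPTS) the random range stops growing at 3-3069s.
for attempt in range(1, 5):
    print(attempt, utils.exponential_backoff(attempt))

# The watcher calls it with min_backoff=0 and max_backoff=60, so the first
# retry may be immediate and no retry ever waits longer than a minute.
print(utils.exponential_backoff(12, min_backoff=0, max_backoff=60))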
@@ -233,8 +248,12 @@ def has_limit(quota):

 def is_available(resource, resource_quota):
     availability = resource_quota['limit'] - resource_quota['used']
-    if availability <= 0:
-        LOG.error("Quota exceeded for resource: %s", resource)
+    if availability <= 3:
+        LOG.warning("Neutron quota low for %s. Used %d out of %d limit.",
+                    resource, resource_quota['limit'], resource_quota['used'])
+    elif availability <= 0:
+        LOG.error("Neutron quota exceeded for %s. Used %d out of %d limit.",
+                  resource, resource_quota['limit'], resource_quota['used'])
         return False
     return True

@@ -243,9 +262,11 @@ def has_kuryr_crd(crd_url):
     k8s = clients.get_kubernetes_client()
     try:
         k8s.get(crd_url, json=False, headers={'Connection': 'close'})
+    except exceptions.K8sResourceNotFound:
+        LOG.error('CRD %s does not exists.', crd_url)
     except exceptions.K8sClientException:
-        LOG.exception("Kubernetes Client Exception fetching"
-                      " CRD. %s" % exceptions.K8sClientException)
+        LOG.exception('Error fetching CRD %s, assuming it does not exist.',
+                      crd_url)
         return False
     return True