Merge "Civilize logging"

This commit is contained in:
Zuul 2020-07-14 11:31:24 +00:00 committed by Gerrit Code Review
commit 306171b415
9 changed files with 164 additions and 113 deletions

View File

@ -112,5 +112,7 @@ class VIFOpenVSwitchDriver(BaseBridgeDriver):
h_ipdb.interfaces[bridge_name]
return True
except Exception:
LOG.debug("Reporting Driver not healthy.")
LOG.error("The configured ovs_bridge=%s integration interface "
"does not exists. Reporting that driver is not healthy.",
bridge_name)
return False

View File

@ -13,15 +13,13 @@
from http import client as httplib
import os
from flask import Flask
from oslo_config import cfg
from oslo_log import log as logging
from pyroute2 import IPDB
from kuryr.lib._i18n import _
from kuryr_kubernetes import clients
from kuryr_kubernetes.cni import utils
from kuryr_kubernetes import exceptions as exc
from oslo_config import cfg
from oslo_log import log as logging
from kuryr_kubernetes import health as base_server
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
@ -88,7 +86,7 @@ def _get_memsw_usage(cgroup_mem_path):
return memsw_in_bytes / BYTES_AMOUNT
class CNIHealthServer(object):
class CNIHealthServer(base_server.BaseHealthServer):
"""Server used by readiness and liveness probe to manage CNI health checks.
Verifies presence of NET_ADMIN capabilities, IPDB in working order,
@ -98,33 +96,24 @@ class CNIHealthServer(object):
def __init__(self, components_healthy):
self.ctx = None
super().__init__('daemon-health', CONF.cni_health_server.port)
self._components_healthy = components_healthy
self.application = Flask('cni-health-daemon')
self.application.add_url_rule(
'/ready', methods=['GET'], view_func=self.readiness_status)
self.application.add_url_rule(
'/alive', methods=['GET'], view_func=self.liveness_status)
self.headers = {'Connection': 'close'}
def readiness_status(self):
data = 'ok'
k8s_conn = self.verify_k8s_connection()
if not _has_cap(CAP_NET_ADMIN, EFFECTIVE_CAPS):
error_message = 'NET_ADMIN capabilities not present.'
LOG.error(error_message)
return error_message, httplib.INTERNAL_SERVER_ERROR, self.headers
return error_message, httplib.INTERNAL_SERVER_ERROR, {}
if not k8s_conn:
error_message = 'Error when processing k8s healthz request.'
error_message = 'K8s API healtz endpoint failed.'
LOG.error(error_message)
return error_message, httplib.INTERNAL_SERVER_ERROR, self.headers
return error_message, httplib.INTERNAL_SERVER_ERROR, {}
LOG.info('CNI driver readiness verified.')
return data, httplib.OK, self.headers
return 'ok', httplib.OK, {}
def liveness_status(self):
data = 'ok'
no_limit = -1
try:
with IPDB():
@ -132,7 +121,7 @@ class CNIHealthServer(object):
except Exception:
error_message = 'IPDB not in working order.'
LOG.error(error_message)
return error_message, httplib.INTERNAL_SERVER_ERROR, self.headers
return error_message, httplib.INTERNAL_SERVER_ERROR, {}
if CONF.cni_health_server.max_memory_usage != no_limit:
mem_usage = _get_memsw_usage(_get_cni_cgroup_path())
@ -140,31 +129,12 @@ class CNIHealthServer(object):
if mem_usage > CONF.cni_health_server.max_memory_usage:
err_message = 'CNI daemon exceeded maximum memory usage.'
LOG.error(err_message)
return err_message, httplib.INTERNAL_SERVER_ERROR, self.headers
return err_message, httplib.INTERNAL_SERVER_ERROR, {}
with self._components_healthy.get_lock():
if not self._components_healthy.value:
err_message = 'Kuryr CNI components not healthy.'
LOG.error(err_message)
return err_message, httplib.INTERNAL_SERVER_ERROR, self.headers
return err_message, httplib.INTERNAL_SERVER_ERROR, {}
LOG.debug('Kuryr CNI Liveness verified.')
return data, httplib.OK, self.headers
def run(self):
address = '::'
try:
LOG.info('Starting CNI health check server.')
self.application.run(address, CONF.cni_health_server.port)
except Exception:
LOG.exception('Failed to start CNI health check server.')
raise
def verify_k8s_connection(self):
k8s = clients.get_kubernetes_client()
try:
k8s.get('/healthz', json=False, headers={'Connection': 'close'})
except exc.K8sClientException:
LOG.exception('Exception when trying to reach Kubernetes API.')
return False
return True
return 'ok', httplib.OK, {}

View File

@ -144,9 +144,11 @@ class NamespaceHandler(k8s_base.ResourceEventHandler):
raise
def is_ready(self, quota):
if not utils.has_kuryr_crd(constants.K8S_API_CRD_KURYRNETS):
if not (utils.has_kuryr_crd(constants.K8S_API_CRD_KURYRNETS) and
self._check_quota(quota)):
LOG.error('Marking NamespaceHandler as not ready.')
return False
return self._check_quota(quota)
return True
def _check_quota(self, quota):
resources = ('subnets', 'networks', 'security_groups')

View File

@ -127,9 +127,11 @@ class NetworkPolicyHandler(k8s_base.ResourceEventHandler):
self._drv_lbaas.update_lbaas_sg(svc, sgs)
def is_ready(self, quota):
if not utils.has_kuryr_crd(k_const.K8S_API_CRD_KURYRNETPOLICIES):
if not (utils.has_kuryr_crd(k_const.K8S_API_CRD_KURYRNETPOLICIES) and
self._check_quota(quota)):
LOG.error("Marking NetworkPolicyHandler as not ready.")
return False
return self._check_quota(quota)
return True
def _check_quota(self, quota):
if utils.has_limit(quota.security_groups):

View File

@ -212,8 +212,10 @@ class VIFHandler(k8s_base.ResourceEventHandler):
self._update_services(services, crd_pod_selectors, project_id)
def is_ready(self, quota):
if utils.has_limit(quota.ports):
return utils.is_available('ports', quota.ports)
if (utils.has_limit(quota.ports) and
not utils.is_available('ports', quota.ports)):
LOG.error('Marking VIFHandler as not ready.')
return False
return True
@staticmethod

View File

@ -15,7 +15,6 @@
from http import client as httplib
import os
from flask import Flask
from oslo_config import cfg
from oslo_log import log as logging
@ -24,8 +23,8 @@ from kuryr.lib import config as kuryr_config
from kuryr.lib import utils
from kuryr_kubernetes import clients
from kuryr_kubernetes import config
from kuryr_kubernetes import exceptions as exc
from kuryr_kubernetes.handlers import health as h_health
from kuryr_kubernetes import health as base_server
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
@ -39,7 +38,7 @@ health_server_opts = [
CONF.register_opts(health_server_opts, "health_server")
class HealthServer(object):
class HealthServer(base_server.BaseHealthServer):
"""Proxy server used by readiness and liveness probes to manage health checks.
Allows to verify connectivity with Kubernetes API, Keystone and Neutron.
@ -49,14 +48,8 @@ class HealthServer(object):
"""
def __init__(self):
self.ctx = None
super().__init__('controller-health', CONF.health_server.port)
self._registry = h_health.HealthRegister.get_instance().registry
self.application = Flask('health-daemon')
self.application.add_url_rule(
'/ready', methods=['GET'], view_func=self.readiness_status)
self.application.add_url_rule(
'/alive', methods=['GET'], view_func=self.liveness_status)
self.headers = {'Connection': 'close'}
def _components_ready(self):
os_net = clients.get_network_client()
@ -70,64 +63,42 @@ class HealthServer(object):
return True
def readiness_status(self):
data = 'ok'
if CONF.kubernetes.vif_pool_driver != 'noop':
if not os.path.exists('/tmp/pools_loaded'):
error_message = 'Ports not loaded into the pools.'
LOG.error(error_message)
return error_message, httplib.NOT_FOUND, self.headers
return error_message, httplib.NOT_FOUND, {}
k8s_conn = self.verify_k8s_connection()
if not k8s_conn:
error_message = 'Error when processing k8s healthz request.'
LOG.error(error_message)
return error_message, httplib.INTERNAL_SERVER_ERROR, self.headers
return error_message, httplib.INTERNAL_SERVER_ERROR, {}
try:
self.verify_keystone_connection()
except Exception as ex:
error_message = ('Error when creating a Keystone session and '
'getting a token: %s.' % ex)
LOG.exception(error_message)
return error_message, httplib.INTERNAL_SERVER_ERROR, self.headers
return error_message, httplib.INTERNAL_SERVER_ERROR, {}
try:
if not self._components_ready():
return '', httplib.INTERNAL_SERVER_ERROR, self.headers
return '', httplib.INTERNAL_SERVER_ERROR, {}
except Exception as ex:
error_message = ('Error when processing neutron request %s' % ex)
LOG.exception(error_message)
return error_message, httplib.INTERNAL_SERVER_ERROR, self.headers
return error_message, httplib.INTERNAL_SERVER_ERROR, {}
LOG.info('Kuryr Controller readiness verified.')
return data, httplib.OK, self.headers
return 'ok', httplib.OK, {}
def liveness_status(self):
data = 'ok'
for component in self._registry:
if not component.is_alive():
LOG.debug('Kuryr Controller not healthy.')
return '', httplib.INTERNAL_SERVER_ERROR, self.headers
LOG.debug('Kuryr Controller Liveness verified.')
return data, httplib.OK, self.headers
def run(self):
address = '::'
try:
LOG.info('Starting health check server.')
self.application.run(address, CONF.health_server.port)
except Exception:
LOG.exception('Failed to start health check server.')
raise
def verify_k8s_connection(self):
k8s = clients.get_kubernetes_client()
try:
k8s.get('/healthz', json=False, headers={'Connection': 'close'})
except exc.K8sClientException:
LOG.exception('Exception when trying to reach Kubernetes API.')
return False
return True
msg = 'Component %s is dead.' % component.__class__.__name__
LOG.error(msg)
return msg, httplib.INTERNAL_SERVER_ERROR, {}
return 'ok', httplib.OK, {}
def verify_keystone_connection(self):
# Obtain a new token to ensure connectivity with keystone

View File

@ -0,0 +1,76 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import abc
from flask import Flask
from oslo_config import cfg
from oslo_log import log as logging
from kuryr_kubernetes import clients
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
class BaseHealthServer(abc.ABC):
"""Base class of server used to provide readiness and liveness probes."""
def __init__(self, app_name, port):
self.app_name = app_name
self.port = port
self.ctx = None
self.application = Flask(app_name)
self.application.add_url_rule(
'/ready', methods=['GET'], view_func=self.readiness_status)
self.application.add_url_rule(
'/alive', methods=['GET'], view_func=self.liveness_status)
def apply_conn_close(response):
response.headers['Connection'] = 'close'
return response
self.application.after_request(apply_conn_close)
@abc.abstractmethod
def readiness_status(self):
raise NotImplementedError()
@abc.abstractmethod
def liveness_status(self):
raise NotImplementedError()
def run(self):
# Disable obtrusive werkzeug logs.
logging.getLogger('werkzeug').setLevel(logging.WARNING)
address = '::'
LOG.info('Starting %s health check server on %s:%d.', self.app_name,
address, self.port)
try:
self.application.run(address, self.port)
except Exception:
LOG.exception('Failed to start %s health check server.',
self.app_name)
raise
def verify_k8s_connection(self):
k8s = clients.get_kubernetes_client()
try:
k8s.get('/healthz', json=False, headers={'Connection': 'close'})
except Exception as e:
# Not LOG.exception to make sure long message from K8s API is not
# repeated.
LOG.error('Exception when trying to reach Kubernetes API: %s.', e)
return False
return True

View File

@ -16,6 +16,7 @@ import contextlib
import itertools
import os
import ssl
import time
from urllib import parse
from oslo_log import log as logging
@ -27,10 +28,15 @@ from kuryr.lib._i18n import _
from kuryr_kubernetes import config
from kuryr_kubernetes import constants
from kuryr_kubernetes import exceptions as exc
from kuryr_kubernetes import utils
CONF = config.CONF
LOG = logging.getLogger(__name__)
# Hardcoding 60 seconds as I don't see a scenario when we want to wait more
# than a minute for reconnection.
MAX_BACKOFF = 60
class K8sClient(object):
# REVISIT(ivc): replace with python-k8sclient if it could be extended
@ -267,6 +273,7 @@ class K8sClient(object):
if self.token:
header.update({'Authorization': 'Bearer %s' % self.token})
attempt = 0
while True:
try:
params = {'watch': 'true'}
@ -279,6 +286,7 @@ class K8sClient(object):
timeout=timeouts)) as response:
if not response.ok:
raise exc.K8sClientException(response.text)
attempt = 0
for line in response.iter_lines():
line = line.decode('utf-8').strip()
if line:
@ -289,19 +297,16 @@ class K8sClient(object):
m = line_dict.get('object', {}).get('metadata', {})
resource_version = m.get('resourceVersion', None)
except (requests.ReadTimeout, requests.ConnectionError,
ssl.SSLError) as e:
if isinstance(e, ssl.SSLError) and e.args != ('timed out',):
raise
LOG.warning('%ds without data received from watching %s. '
'Retrying the connection with resourceVersion=%s.',
timeouts[1], path, params.get('resourceVersion'))
except requests.exceptions.ChunkedEncodingError:
LOG.warning("Connection to %s closed when watching. This "
"mostly happens when Octavia's Amphora closes "
"connection due to lack of activity for 50s. "
"Since Rocky Octavia this is configurable and "
"should be set to at least 20m, so check timeouts "
"on Kubernetes API LB listener. Restarting "
"connection with resourceVersion=%s.", path,
params.get('resourceVersion'))
ssl.SSLError, requests.exceptions.ChunkedEncodingError):
t = utils.exponential_backoff(attempt, min_backoff=0,
max_backoff=MAX_BACKOFF)
log = LOG.debug
if attempt > 0:
# Only make it a warning if it's happening again, no need
# to inform about all the read timeouts.
log = LOG.warning
log('Connection error when watching %s. Retrying in %ds with '
'resourceVersion=%s', path, t,
params.get('resourceVersion'))
time.sleep(t)
attempt += 1

View File

@ -43,6 +43,7 @@ VALID_MULTI_POD_POOLS_OPTS = {'noop': ['neutron-vif',
}
DEFAULT_TIMEOUT = 500
DEFAULT_INTERVAL = 3
MAX_ATTEMPTS = 10
subnet_caching_opts = [
cfg.BoolOpt('caching', default=True),
@ -129,7 +130,7 @@ def exponential_sleep(deadline, attempt, interval=DEFAULT_INTERVAL):
if seconds_left <= 0:
return 0
to_sleep = random.randint(1, 2 ** attempt - 1) * interval
to_sleep = exponential_backoff(attempt, interval)
if to_sleep > seconds_left:
to_sleep = seconds_left
@ -141,6 +142,20 @@ def exponential_sleep(deadline, attempt, interval=DEFAULT_INTERVAL):
return to_sleep
def exponential_backoff(attempt, interval=DEFAULT_INTERVAL, min_backoff=1,
max_backoff=None):
if attempt >= MAX_ATTEMPTS:
# No need to calculate very long intervals
attempt = MAX_ATTEMPTS
backoff = random.randint(min_backoff, 2 ** attempt - 1) * interval
if max_backoff is not None and backoff > max_backoff:
backoff = max_backoff
return backoff
def get_node_name():
# leader-elector container based on K8s way of doing leader election is
# assuming that hostname it sees is the node id. Containers within a pod
@ -233,8 +248,12 @@ def has_limit(quota):
def is_available(resource, resource_quota):
availability = resource_quota['limit'] - resource_quota['used']
if availability <= 0:
LOG.error("Quota exceeded for resource: %s", resource)
if availability <= 3:
LOG.warning("Neutron quota low for %s. Used %d out of %d limit.",
resource, resource_quota['limit'], resource_quota['used'])
elif availability <= 0:
LOG.error("Neutron quota exceeded for %s. Used %d out of %d limit.",
resource, resource_quota['limit'], resource_quota['used'])
return False
return True
@ -243,9 +262,11 @@ def has_kuryr_crd(crd_url):
k8s = clients.get_kubernetes_client()
try:
k8s.get(crd_url, json=False, headers={'Connection': 'close'})
except exceptions.K8sResourceNotFound:
LOG.error('CRD %s does not exists.', crd_url)
except exceptions.K8sClientException:
LOG.exception("Kubernetes Client Exception fetching"
" CRD. %s" % exceptions.K8sClientException)
LOG.exception('Error fetching CRD %s, assuming it does not exist.',
crd_url)
return False
return True