diff --git a/kuryr_kubernetes/cni/binding/bridge.py b/kuryr_kubernetes/cni/binding/bridge.py
index a96838656..ad9660163 100644
--- a/kuryr_kubernetes/cni/binding/bridge.py
+++ b/kuryr_kubernetes/cni/binding/bridge.py
@@ -112,5 +112,7 @@ class VIFOpenVSwitchDriver(BaseBridgeDriver):
                 h_ipdb.interfaces[bridge_name]
             return True
         except Exception:
-            LOG.debug("Reporting Driver not healthy.")
+            LOG.error("The configured ovs_bridge=%s integration interface "
+                      "does not exist. Reporting that driver is not healthy.",
+                      bridge_name)
             return False
diff --git a/kuryr_kubernetes/cni/health.py b/kuryr_kubernetes/cni/health.py
index 285048650..0a206c7d5 100644
--- a/kuryr_kubernetes/cni/health.py
+++ b/kuryr_kubernetes/cni/health.py
@@ -13,15 +13,13 @@
 from http import client as httplib
 import os
 
-from flask import Flask
+from oslo_config import cfg
+from oslo_log import log as logging
 from pyroute2 import IPDB
 
 from kuryr.lib._i18n import _
-from kuryr_kubernetes import clients
 from kuryr_kubernetes.cni import utils
-from kuryr_kubernetes import exceptions as exc
-from oslo_config import cfg
-from oslo_log import log as logging
+from kuryr_kubernetes import health as base_server
 
 LOG = logging.getLogger(__name__)
 CONF = cfg.CONF
@@ -88,7 +86,7 @@ def _get_memsw_usage(cgroup_mem_path):
     return memsw_in_bytes / BYTES_AMOUNT
 
 
-class CNIHealthServer(object):
+class CNIHealthServer(base_server.BaseHealthServer):
     """Server used by readiness and liveness probe to manage CNI health checks.
 
     Verifies presence of NET_ADMIN capabilities, IPDB in working order,
@@ -98,33 +96,26 @@ class CNIHealthServer(object):
 
     def __init__(self, components_healthy):
-        self.ctx = None
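+        # The Flask app, the /ready and /alive routes and the
+        # 'Connection: close' handling now live in BaseHealthServer.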
+        super().__init__('daemon-health', CONF.cni_health_server.port)
         self._components_healthy = components_healthy
-        self.application = Flask('cni-health-daemon')
-        self.application.add_url_rule(
-            '/ready', methods=['GET'], view_func=self.readiness_status)
-        self.application.add_url_rule(
-            '/alive', methods=['GET'], view_func=self.liveness_status)
-        self.headers = {'Connection': 'close'}
 
     def readiness_status(self):
-        data = 'ok'
         k8s_conn = self.verify_k8s_connection()
 
         if not _has_cap(CAP_NET_ADMIN, EFFECTIVE_CAPS):
             error_message = 'NET_ADMIN capabilities not present.'
             LOG.error(error_message)
-            return error_message, httplib.INTERNAL_SERVER_ERROR, self.headers
+            return error_message, httplib.INTERNAL_SERVER_ERROR, {}
 
         if not k8s_conn:
-            error_message = 'Error when processing k8s healthz request.'
+            error_message = 'K8s API healthz endpoint failed.'
             LOG.error(error_message)
-            return error_message, httplib.INTERNAL_SERVER_ERROR, self.headers
+            return error_message, httplib.INTERNAL_SERVER_ERROR, {}
 
-        LOG.info('CNI driver readiness verified.')
-        return data, httplib.OK, self.headers
+        return 'ok', httplib.OK, {}
 
     def liveness_status(self):
-        data = 'ok'
         no_limit = -1
         try:
             with IPDB():
@@ -132,7 +121,7 @@ class CNIHealthServer(object):
         except Exception:
             error_message = 'IPDB not in working order.'
             LOG.error(error_message)
-            return error_message, httplib.INTERNAL_SERVER_ERROR, self.headers
+            return error_message, httplib.INTERNAL_SERVER_ERROR, {}
 
         if CONF.cni_health_server.max_memory_usage != no_limit:
             mem_usage = _get_memsw_usage(_get_cni_cgroup_path())
@@ -140,31 +129,12 @@ class CNIHealthServer(object):
             if mem_usage > CONF.cni_health_server.max_memory_usage:
                 err_message = 'CNI daemon exceeded maximum memory usage.'
                 LOG.error(err_message)
-                return err_message, httplib.INTERNAL_SERVER_ERROR, self.headers
+                return err_message, httplib.INTERNAL_SERVER_ERROR, {}
 
         with self._components_healthy.get_lock():
             if not self._components_healthy.value:
                 err_message = 'Kuryr CNI components not healthy.'
                 LOG.error(err_message)
-                return err_message, httplib.INTERNAL_SERVER_ERROR, self.headers
+                return err_message, httplib.INTERNAL_SERVER_ERROR, {}
 
-        LOG.debug('Kuryr CNI Liveness verified.')
-        return data, httplib.OK, self.headers
-
-    def run(self):
-        address = '::'
-        try:
-            LOG.info('Starting CNI health check server.')
-            self.application.run(address, CONF.cni_health_server.port)
-        except Exception:
-            LOG.exception('Failed to start CNI health check server.')
-            raise
-
-    def verify_k8s_connection(self):
-        k8s = clients.get_kubernetes_client()
-        try:
-            k8s.get('/healthz', json=False, headers={'Connection': 'close'})
-        except exc.K8sClientException:
-            LOG.exception('Exception when trying to reach Kubernetes API.')
-            return False
-        return True
+        return 'ok', httplib.OK, {}
diff --git a/kuryr_kubernetes/controller/handlers/namespace.py b/kuryr_kubernetes/controller/handlers/namespace.py
index 6268eaf09..ef2b9a675 100644
--- a/kuryr_kubernetes/controller/handlers/namespace.py
+++ b/kuryr_kubernetes/controller/handlers/namespace.py
@@ -144,9 +144,11 @@ class NamespaceHandler(k8s_base.ResourceEventHandler):
             raise
 
     def is_ready(self, quota):
-        if not utils.has_kuryr_crd(constants.K8S_API_CRD_KURYRNETS):
+        if not (utils.has_kuryr_crd(constants.K8S_API_CRD_KURYRNETS) and
+                self._check_quota(quota)):
+            LOG.error('Marking NamespaceHandler as not ready.')
             return False
-        return self._check_quota(quota)
+        return True
 
     def _check_quota(self, quota):
         resources = ('subnets', 'networks', 'security_groups')
diff --git a/kuryr_kubernetes/controller/handlers/policy.py b/kuryr_kubernetes/controller/handlers/policy.py
index e7caa53d1..b916e3210 100644
--- a/kuryr_kubernetes/controller/handlers/policy.py
+++ b/kuryr_kubernetes/controller/handlers/policy.py
@@ -127,9 +127,11 @@ class NetworkPolicyHandler(k8s_base.ResourceEventHandler):
         self._drv_lbaas.update_lbaas_sg(svc, sgs)
 
     def is_ready(self, quota):
-        if not utils.has_kuryr_crd(k_const.K8S_API_CRD_KURYRNETPOLICIES):
+        if not (utils.has_kuryr_crd(k_const.K8S_API_CRD_KURYRNETPOLICIES) and
+                self._check_quota(quota)):
+            LOG.error("Marking NetworkPolicyHandler as not ready.")
             return False
-        return self._check_quota(quota)
+        return True
 
     def _check_quota(self, quota):
         if utils.has_limit(quota.security_groups):
diff --git a/kuryr_kubernetes/controller/handlers/vif.py b/kuryr_kubernetes/controller/handlers/vif.py
index 9ea5cd117..10f96b7fe 100644
--- a/kuryr_kubernetes/controller/handlers/vif.py
+++ b/kuryr_kubernetes/controller/handlers/vif.py
@@ -212,8 +212,10 @@ class VIFHandler(k8s_base.ResourceEventHandler):
         self._update_services(services, crd_pod_selectors, project_id)
 
     def is_ready(self, quota):
-        if utils.has_limit(quota.ports):
-            return utils.is_available('ports', quota.ports)
+        if (utils.has_limit(quota.ports) and
+                not utils.is_available('ports', quota.ports)):
+            LOG.error('Marking VIFHandler as not ready.')
+            return False
         return True
 
     @staticmethod
diff --git a/kuryr_kubernetes/controller/managers/health.py b/kuryr_kubernetes/controller/managers/health.py
index d8c5559be..1d9fc0d39 100644
--- a/kuryr_kubernetes/controller/managers/health.py
+++ b/kuryr_kubernetes/controller/managers/health.py
@@ -15,7 +15,6 @@
 from http import client as httplib
 import os
 
-from flask import Flask
 from oslo_config import cfg
 from oslo_log import log as logging
 
@@ -24,8 +23,8 @@ from kuryr.lib import config as kuryr_config
 from kuryr.lib import utils
 from kuryr_kubernetes import clients
 from kuryr_kubernetes import config
-from kuryr_kubernetes import exceptions as exc
 from kuryr_kubernetes.handlers import health as h_health
+from kuryr_kubernetes import health as base_server
 
 LOG = logging.getLogger(__name__)
 CONF = cfg.CONF
@@ -39,7 +38,7 @@ health_server_opts = [
 CONF.register_opts(health_server_opts, "health_server")
 
 
-class HealthServer(object):
+class HealthServer(base_server.BaseHealthServer):
     """Proxy server used by readiness and liveness probes to manage health checks.
 
     Allows to verify connectivity with Kubernetes API, Keystone and Neutron.
@@ -49,14 +48,10 @@ class HealthServer(object):
     """
 
     def __init__(self):
-        self.ctx = None
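+        # Flask wiring is shared with the CNI health server through
+        # BaseHealthServer; only the handler registry is controller-specific.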
+        super().__init__('controller-health', CONF.health_server.port)
         self._registry = h_health.HealthRegister.get_instance().registry
-        self.application = Flask('health-daemon')
-        self.application.add_url_rule(
-            '/ready', methods=['GET'], view_func=self.readiness_status)
-        self.application.add_url_rule(
-            '/alive', methods=['GET'], view_func=self.liveness_status)
-        self.headers = {'Connection': 'close'}
 
     def _components_ready(self):
         os_net = clients.get_network_client()
@@ -70,64 +63,42 @@ class HealthServer(object):
         return True
 
     def readiness_status(self):
-        data = 'ok'
-
         if CONF.kubernetes.vif_pool_driver != 'noop':
             if not os.path.exists('/tmp/pools_loaded'):
                 error_message = 'Ports not loaded into the pools.'
                 LOG.error(error_message)
-                return error_message, httplib.NOT_FOUND, self.headers
+                return error_message, httplib.NOT_FOUND, {}
 
         k8s_conn = self.verify_k8s_connection()
         if not k8s_conn:
             error_message = 'Error when processing k8s healthz request.'
             LOG.error(error_message)
-            return error_message, httplib.INTERNAL_SERVER_ERROR, self.headers
+            return error_message, httplib.INTERNAL_SERVER_ERROR, {}
 
         try:
             self.verify_keystone_connection()
         except Exception as ex:
             error_message = ('Error when creating a Keystone session and '
                              'getting a token: %s.' % ex)
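+        # Attach 'Connection: close' to every response, replacing the
+        # self.headers dict each subclass used to pass around before.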
             LOG.exception(error_message)
-            return error_message, httplib.INTERNAL_SERVER_ERROR, self.headers
+            return error_message, httplib.INTERNAL_SERVER_ERROR, {}
 
         try:
             if not self._components_ready():
-                return '', httplib.INTERNAL_SERVER_ERROR, self.headers
+                return '', httplib.INTERNAL_SERVER_ERROR, {}
         except Exception as ex:
             error_message = ('Error when processing neutron request %s' % ex)
             LOG.exception(error_message)
-            return error_message, httplib.INTERNAL_SERVER_ERROR, self.headers
+            return error_message, httplib.INTERNAL_SERVER_ERROR, {}
 
-        LOG.info('Kuryr Controller readiness verified.')
-        return data, httplib.OK, self.headers
+        return 'ok', httplib.OK, {}
 
     def liveness_status(self):
-        data = 'ok'
         for component in self._registry:
             if not component.is_alive():
-                LOG.debug('Kuryr Controller not healthy.')
-                return '', httplib.INTERNAL_SERVER_ERROR, self.headers
-        LOG.debug('Kuryr Controller Liveness verified.')
-        return data, httplib.OK, self.headers
-
-    def run(self):
-        address = '::'
-        try:
-            LOG.info('Starting health check server.')
-            self.application.run(address, CONF.health_server.port)
-        except Exception:
-            LOG.exception('Failed to start health check server.')
-            raise
-
-    def verify_k8s_connection(self):
-        k8s = clients.get_kubernetes_client()
-        try:
-            k8s.get('/healthz', json=False, headers={'Connection': 'close'})
-        except exc.K8sClientException:
-            LOG.exception('Exception when trying to reach Kubernetes API.')
-            return False
-        return True
+                msg = 'Component %s is dead.' % component.__class__.__name__
+                LOG.error(msg)
+                return msg, httplib.INTERNAL_SERVER_ERROR, {}
+        return 'ok', httplib.OK, {}
 
     def verify_keystone_connection(self):
         # Obtain a new token to ensure connectivity with keystone
diff --git a/kuryr_kubernetes/health.py b/kuryr_kubernetes/health.py
new file mode 100644
index 000000000..d45d80017
--- /dev/null
+++ b/kuryr_kubernetes/health.py
@@ -0,0 +1,78 @@
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import abc
+
+from flask import Flask
+from oslo_config import cfg
+from oslo_log import log as logging
+
+from kuryr_kubernetes import clients
+
+LOG = logging.getLogger(__name__)
+CONF = cfg.CONF
+
+
+class BaseHealthServer(abc.ABC):
+    """Base class of server used to provide readiness and liveness probes."""
+
+    def __init__(self, app_name, port):
+        self.app_name = app_name
+        self.port = port
+        self.ctx = None
+        self.application = Flask(app_name)
+        self.application.add_url_rule(
+            '/ready', methods=['GET'], view_func=self.readiness_status)
+        self.application.add_url_rule(
+            '/alive', methods=['GET'], view_func=self.liveness_status)
+
+        def apply_conn_close(response):
+            response.headers['Connection'] = 'close'
+            return response
+
+        self.application.after_request(apply_conn_close)
+
+    @abc.abstractmethod
+    def readiness_status(self):
+        raise NotImplementedError()
+
+    @abc.abstractmethod
+    def liveness_status(self):
+        raise NotImplementedError()
+
+    def run(self):
+        # Disable obtrusive werkzeug logs.
+        logging.getLogger('werkzeug').setLevel('WARNING')
+
+        address = '::'
+        LOG.info('Starting %s health check server on %s:%d.', self.app_name,
+                 address, self.port)
+        try:
+            self.application.run(address, self.port)
+        except Exception:
+            LOG.exception('Failed to start %s health check server.',
+                          self.app_name)
+            raise
+
+    def verify_k8s_connection(self):
+        k8s = clients.get_kubernetes_client()
+        try:
+            k8s.get('/healthz', json=False, headers={'Connection': 'close'})
+        except Exception as e:
+            # Not LOG.exception to make sure long message from K8s API is not
+            # repeated.
+            LOG.error('Exception when trying to reach Kubernetes API: %s.', e)
+            return False
+
+        return True
diff --git a/kuryr_kubernetes/k8s_client.py b/kuryr_kubernetes/k8s_client.py
index 7311f756a..aed047d0d 100644
--- a/kuryr_kubernetes/k8s_client.py
+++ b/kuryr_kubernetes/k8s_client.py
@@ -16,6 +16,7 @@ import contextlib
 import itertools
 import os
 import ssl
+import time
 from urllib import parse
 
 from oslo_log import log as logging
@@ -27,10 +28,15 @@ from kuryr.lib._i18n import _
 from kuryr_kubernetes import config
 from kuryr_kubernetes import constants
 from kuryr_kubernetes import exceptions as exc
+from kuryr_kubernetes import utils
 
 CONF = config.CONF
 LOG = logging.getLogger(__name__)
 
+# Hardcoding 60 seconds as I don't see a scenario when we want to wait more
+# than a minute for reconnection.
+MAX_BACKOFF = 60
+
 
 class K8sClient(object):
     # REVISIT(ivc): replace with python-k8sclient if it could be extended
@@ -267,6 +273,7 @@ class K8sClient(object):
         if self.token:
             header.update({'Authorization': 'Bearer %s' % self.token})
 
+        attempt = 0
         while True:
             try:
                 params = {'watch': 'true'}
@@ -279,6 +286,8 @@ class K8sClient(object):
                         timeout=timeouts)) as response:
                     if not response.ok:
                         raise exc.K8sClientException(response.text)
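+                    # Got a healthy response, so reset the backoff counter.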
+                    attempt = 0
                     for line in response.iter_lines():
                         line = line.decode('utf-8').strip()
                         if line:
@@ -289,19 +297,16 @@ class K8sClient(object):
                             m = line_dict.get('object', {}).get('metadata', {})
                             resource_version = m.get('resourceVersion', None)
             except (requests.ReadTimeout, requests.ConnectionError,
-                    ssl.SSLError) as e:
-                if isinstance(e, ssl.SSLError) and e.args != ('timed out',):
-                    raise
-
-                LOG.warning('%ds without data received from watching %s. '
-                            'Retrying the connection with resourceVersion=%s.',
-                            timeouts[1], path, params.get('resourceVersion'))
-            except requests.exceptions.ChunkedEncodingError:
-                LOG.warning("Connection to %s closed when watching. This "
-                            "mostly happens when Octavia's Amphora closes "
-                            "connection due to lack of activity for 50s. "
-                            "Since Rocky Octavia this is configurable and "
-                            "should be set to at least 20m, so check timeouts "
-                            "on Kubernetes API LB listener. Restarting "
-                            "connection with resourceVersion=%s.", path,
-                            params.get('resourceVersion'))
+                    ssl.SSLError, requests.exceptions.ChunkedEncodingError):
+                t = utils.exponential_backoff(attempt, min_backoff=0,
+                                              max_backoff=MAX_BACKOFF)
+                log = LOG.debug
+                if attempt > 0:
+                    # Only make it a warning if it's happening again, no need
+                    # to inform about all the read timeouts.
+                    log = LOG.warning
+                log('Connection error when watching %s. Retrying in %ds with '
+                    'resourceVersion=%s', path, t,
+                    params.get('resourceVersion'))
+                time.sleep(t)
+                attempt += 1
diff --git a/kuryr_kubernetes/utils.py b/kuryr_kubernetes/utils.py
index 06cd968df..ab5b912f8 100644
--- a/kuryr_kubernetes/utils.py
+++ b/kuryr_kubernetes/utils.py
@@ -43,6 +43,7 @@ VALID_MULTI_POD_POOLS_OPTS = {'noop': ['neutron-vif',
                               }
 DEFAULT_TIMEOUT = 500
 DEFAULT_INTERVAL = 3
+MAX_ATTEMPTS = 10
 
 subnet_caching_opts = [
     cfg.BoolOpt('caching', default=True),
@@ -129,7 +130,7 @@ def exponential_sleep(deadline, attempt, interval=DEFAULT_INTERVAL):
     if seconds_left <= 0:
         return 0
 
-    to_sleep = random.randint(1, 2 ** attempt - 1) * interval
+    to_sleep = exponential_backoff(attempt, interval)
 
     if to_sleep > seconds_left:
         to_sleep = seconds_left
@@ -141,6 +142,22 @@ def exponential_sleep(deadline, attempt, interval=DEFAULT_INTERVAL):
     return to_sleep
 
 
+def exponential_backoff(attempt, interval=DEFAULT_INTERVAL, min_backoff=1,
+                        max_backoff=None):
+    if attempt >= MAX_ATTEMPTS:
+        # No need to calculate very long intervals
+        attempt = MAX_ATTEMPTS
+
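+    # The backoff is a random multiple of interval drawn from
+    # [min_backoff, 2 ** attempt - 1], adding jitter between retries.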
+    backoff = random.randint(min_backoff, 2 ** attempt - 1) * interval
+
+    if max_backoff is not None and backoff > max_backoff:
+        backoff = max_backoff
+
+    return backoff
+
+
 def get_node_name():
     # leader-elector container based on K8s way of doing leader election is
     # assuming that hostname it sees is the node id. Containers within a pod
@@ -233,8 +248,12 @@ def has_limit(quota):
 
 
 def is_available(resource, resource_quota):
     availability = resource_quota['limit'] - resource_quota['used']
-    if availability <= 0:
-        LOG.error("Quota exceeded for resource: %s", resource)
+    if availability <= 3:
+        LOG.warning("Neutron quota low for %s. Used %d out of %d limit.",
+                    resource, resource_quota['used'], resource_quota['limit'])
+    if availability <= 0:
+        LOG.error("Neutron quota exceeded for %s. Used %d out of %d limit.",
+                  resource, resource_quota['used'], resource_quota['limit'])
         return False
     return True
 
@@ -243,9 +262,12 @@
 def has_kuryr_crd(crd_url):
     k8s = clients.get_kubernetes_client()
     try:
         k8s.get(crd_url, json=False, headers={'Connection': 'close'})
+    except exceptions.K8sResourceNotFound:
+        LOG.error('CRD %s does not exist.', crd_url)
+        return False
     except exceptions.K8sClientException:
-        LOG.exception("Kubernetes Client Exception fetching"
-                      " CRD. %s" % exceptions.K8sClientException)
+        LOG.exception('Error fetching CRD %s, assuming it does not exist.',
+                      crd_url)
         return False
     return True
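
Reviewer note, not part of the patch: a quick sketch of the reconnection
backoff the watcher gets from utils.exponential_backoff(attempt,
min_backoff=0, max_backoff=MAX_BACKOFF), with DEFAULT_INTERVAL = 3 and
MAX_BACKOFF = 60. The actual sleep is drawn randomly from a window, so only
the window's upper bound is deterministic:

    # Upper bound of the sleep window for consecutive failed attempts.
    for attempt in range(7):
        high = min((2 ** attempt - 1) * 3, 60)
        print('attempt %d: sleep in [0, %d]s' % (attempt, high))

    # attempt 0: [0, 0]s  (first reconnect is immediate)
    # attempt 1: [0, 3]s
    # attempt 2: [0, 9]s
    # attempt 3: [0, 21]s
    # attempt 4: [0, 45]s
    # attempt 5 and up: capped at [0, 60]s by MAX_BACKOFF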