# Copyright 2017 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from ctypes import c_bool
import errno
from http import client as httplib
import multiprocessing
import os
import socket
import sys
import threading
import time
import urllib.parse

import urllib3

import cotyledon
import flask
import pyroute2
from pyroute2.ipdb import transactional

import os_vif
from oslo_concurrency import lockutils
from oslo_config import cfg
from oslo_log import log as logging
from oslo_serialization import jsonutils

from kuryr_kubernetes import clients
from kuryr_kubernetes.cni import handlers as h_cni
from kuryr_kubernetes.cni import health
from kuryr_kubernetes.cni.plugins import k8s_cni_registry
from kuryr_kubernetes.cni import utils as cni_utils
from kuryr_kubernetes import config
from kuryr_kubernetes import constants as k_const
from kuryr_kubernetes import exceptions
from kuryr_kubernetes import objects
from kuryr_kubernetes import utils
from kuryr_kubernetes import watcher as k_watcher

LOG = logging.getLogger(__name__)
CONF = cfg.CONF

HEALTH_CHECKER_DELAY = 5

# Error codes returned to the container runtime. 4 and 11 are defined by
# the CNI spec; 999 is kuryr-specific.
ErrInvalidEnvironmentVariables = 4
ErrTryAgainLater = 11
ErrInternal = 999


class DaemonServer(object):
    def __init__(self, plugin, healthy):
        self.ctx = None
        self.plugin = plugin
        self.healthy = healthy
        self.failure_count = multiprocessing.Value('i', 0)
        self.application = flask.Flask('kuryr-daemon')
        self.application.add_url_rule(
            '/addNetwork', methods=['POST'], view_func=self.add)
        self.application.add_url_rule(
            '/delNetwork', methods=['POST'], view_func=self.delete)
        # 'Content-Type' is the valid HTTP header name ('ContentType' is
        # not parsed as such by clients).
        self.headers = {'Content-Type': 'application/json',
                        'Connection': 'close'}

    def _prepare_request(self):
        params = cni_utils.CNIParameters(flask.request.get_json())
        LOG.debug('Received %s request. CNI Params: %s',
                  params.CNI_COMMAND, params)
        return params

    def _error(self, error_code, message, details=""):
        template = {
            "code": error_code,
            "msg": message,
            "details": details,
        }
        return jsonutils.dumps(template)

    def add(self):
        try:
            params = self._prepare_request()
        except Exception:
            self._check_failure()
            LOG.exception('Exception when reading CNI params.')
            error = self._error(ErrInvalidEnvironmentVariables,
                                "Required CNI params missing.")
            return error, httplib.BAD_REQUEST, self.headers

        try:
            vif = self.plugin.add(params)
            data = jsonutils.dumps(vif.obj_to_primitive())
        except exceptions.ResourceNotReady as e:
            self._check_failure()
            LOG.error('Error when processing addNetwork request')
            error = self._error(ErrTryAgainLater, f"{e}. Try Again Later.")
            return error, httplib.GATEWAY_TIMEOUT, self.headers
        except pyroute2.NetlinkError as e:
            if e.code == errno.EEXIST:
                self._check_failure()
                # NOTE: 'vif' is never bound when self.plugin.add() raises,
                # so the VLAN ID of the conflicting interface cannot be
                # referenced here.
                LOG.warning(
                    'Creation of pod interface failed due to VLAN ID '
                    'conflict. Probably the CRI had not cleaned up the '
                    'network namespace of deleted pods. Attempting to '
                    'retry.')
                error = self._error(
                    ErrTryAgainLater,
                    "Creation of pod interface failed due to vlan_id. "
                    "Try Again Later.")
                return error, httplib.GATEWAY_TIMEOUT, self.headers
            raise
        except Exception:
            if not self.healthy.value:
                error = self._error(ErrInternal,
                                    "Maximum CNI ADD Failures Reached.",
                                    "Error when processing addNetwork "
                                    "request. CNI Params: {}".format(params))
            else:
                self._check_failure()
                error = self._error(ErrInternal,
                                    "Error processing request",
                                    "Failure processing addNetwork request. "
                                    "CNI Params: {}".format(params))
            LOG.exception('Error when processing addNetwork request. CNI '
                          'Params: %s', params)
            return error, httplib.INTERNAL_SERVER_ERROR, self.headers
        return data, httplib.ACCEPTED, self.headers

    def delete(self):
        try:
            params = self._prepare_request()
        except Exception:
            LOG.exception('Exception when reading CNI params.')
            error = self._error(ErrInvalidEnvironmentVariables,
                                "Required CNI params missing.")
            return error, httplib.BAD_REQUEST, self.headers

        try:
            self.plugin.delete(params)
        except exceptions.ResourceNotReady:
            # NOTE(dulek): It's better to ignore this error - most of the
            #              time it will happen when pod is long gone and CRI
            #              overzealously tries to delete it from the network.
            #              We cannot really do anything without VIF
            #              annotation, so let's just tell CRI to move along.
            LOG.warning('Error when processing delNetwork request. '
                        'Ignoring this error, pod is most likely gone')
            return '', httplib.NO_CONTENT, self.headers
        except Exception:
            if not self.healthy.value:
                error = self._error(ErrInternal,
                                    "Maximum CNI DEL Failures Reached.",
                                    "Error processing delNetwork request. "
                                    "CNI Params: {}".format(params))
            else:
                self._check_failure()
                error = self._error(ErrInternal,
                                    "Error processing request",
                                    "Failure processing delNetwork request. "
                                    "CNI Params: {}".format(params))
            LOG.exception('Error when processing delNetwork request. CNI '
                          'Params: %s.', params)
            return error, httplib.INTERNAL_SERVER_ERROR, self.headers
        return '', httplib.NO_CONTENT, self.headers

    def run(self):
        server_pair = CONF.cni_daemon.bind_address
        LOG.info('Starting server on %s.', server_pair)
        try:
            address, port = server_pair.split(':')
            port = int(port)
        except ValueError:
            LOG.exception('Cannot start server on %s.', server_pair)
            raise

        try:
            self.application.run(address, port, threaded=False,
                                 processes=CONF.cni_daemon.worker_num)
        except Exception:
            LOG.exception('Failed to start kuryr-daemon.')
            raise

    def _check_failure(self):
        # Count consecutive ADD/DEL failures; once the configured threshold
        # is reached, flip the shared 'healthy' flag so the health server
        # starts reporting the daemon as unhealthy.
        with self.failure_count.get_lock():
            if self.failure_count.value < CONF.cni_daemon.cni_failures_count:
                self.failure_count.value += 1
            else:
                with self.healthy.get_lock():
                    LOG.debug("Reporting maximum CNI ADD/DEL failures "
                              "reached.")
                    self.healthy.value = False


class CNIDaemonServerService(cotyledon.Service):
    name = "server"

    def __init__(self, worker_id, registry, healthy):
        super(CNIDaemonServerService, self).__init__(worker_id)
        self.registry = registry
        self.healthy = healthy
        self.plugin = k8s_cni_registry.K8sCNIRegistryPlugin(registry,
                                                            self.healthy)
        self.server = DaemonServer(self.plugin, self.healthy)

    def run(self):
        # NOTE(dulek): We might do a *lot* of pyroute2 operations, let's
        #              make the pyroute2 timeout configurable to make sure
        #              kernel will have chance to catch up.
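        # (transactional.SYNC_TIMEOUT is pyroute2's module-level default,
        # in seconds, for how long IPDB transactions wait for the kernel
        # to confirm a commit before timing out.)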
        transactional.SYNC_TIMEOUT = CONF.cni_daemon.pyroute2_timeout

        # Run the HTTP server.
        self.server.run()


class CNIDaemonWatcherService(cotyledon.Service):
    name = "watcher"

    def __init__(self, worker_id, registry, healthy):
        super(CNIDaemonWatcherService, self).__init__(worker_id)
        self.pipeline = None
        self.watcher = None
        self.health_thread = None
        self.registry = registry
        self.healthy = healthy
        # Initialized here so that terminate() is safe even if it runs
        # before run() had a chance to set it.
        self.is_running = False

    def _get_nodename(self):
        # NOTE(dulek): At first try to get it using environment variable,
        # otherwise assume hostname is the nodename.
        try:
            nodename = os.environ['KUBERNETES_NODE_NAME']
        except KeyError:
            # NOTE(dulek): By default K8s nodeName is lowercased hostname.
            nodename = socket.gethostname().lower()
        return nodename

    def run(self):
        self.pipeline = h_cni.CNIPipeline()
        self.pipeline.register(h_cni.CallbackHandler(self.on_done,
                                                     self.on_deleted))
        self.watcher = k_watcher.Watcher(self.pipeline)
        query_label = urllib.parse.quote_plus(f'{k_const.KURYRPORT_LABEL}='
                                              f'{self._get_nodename()}')
        self.watcher.add(f'{k_const.K8S_API_CRD_KURYRPORTS}'
                         f'?labelSelector={query_label}')

        self.is_running = True
        self.health_thread = threading.Thread(
            target=self._start_watcher_health_checker)
        self.health_thread.start()
        self.watcher.start()

    def _start_watcher_health_checker(self):
        while self.is_running:
            if not self.watcher.is_alive():
                LOG.debug("Reporting watcher not healthy.")
                with self.healthy.get_lock():
                    self.healthy.value = False
            time.sleep(HEALTH_CHECKER_DELAY)

    def on_done(self, kuryrport, vifs):
        kp_name = utils.get_res_unique_name(kuryrport)
        with lockutils.lock(kp_name, external=True):
            if (kp_name not in self.registry or
                    self.registry[kp_name]['kp']['metadata']['uid'] !=
                    kuryrport['metadata']['uid']):
                self.registry[kp_name] = {'kp': kuryrport,
                                          'vifs': vifs,
                                          'containerid': None,
                                          'vif_unplugged': False,
                                          'del_received': False}
            else:
                old_vifs = self.registry[kp_name]['vifs']
                for iface in vifs:
                    if old_vifs[iface].active != vifs[iface].active:
                        kp_dict = self.registry[kp_name]
                        kp_dict['vifs'] = vifs
                        # Reassignment is needed for the Manager-proxied
                        # dict to notice the nested change.
                        self.registry[kp_name] = kp_dict

    def on_deleted(self, kp):
        kp_name = utils.get_res_unique_name(kp)
        try:
            if kp_name in self.registry:
                # NOTE(ndesh): We need to lock here to avoid race condition
                #              with the deletion code for CNI DEL so that
                #              we delete the registry entry exactly once
                with lockutils.lock(kp_name, external=True):
                    if self.registry[kp_name]['vif_unplugged']:
                        del self.registry[kp_name]
                    else:
                        kp_dict = self.registry[kp_name]
                        kp_dict['del_received'] = True
                        self.registry[kp_name] = kp_dict
        except KeyError:
            # This means someone else removed it. It's odd, but safe to
            # ignore.
            LOG.debug('KuryrPort %s entry already removed from registry '
                      'while handling DELETED event. Ignoring.', kp_name)

    def terminate(self):
        self.is_running = False
        if self.health_thread:
            self.health_thread.join()
        if self.watcher:
            self.watcher.stop()


class CNIDaemonHealthServerService(cotyledon.Service):
    name = "health"

    def __init__(self, worker_id, healthy):
        super(CNIDaemonHealthServerService, self).__init__(worker_id)
        self.health_server = health.CNIHealthServer(healthy)

    def run(self):
        self.health_server.run()


class CNIDaemonServiceManager(cotyledon.ServiceManager):
    def __init__(self):
        super(CNIDaemonServiceManager, self).__init__()
        # TODO(dulek): Use cotyledon.oslo_config_glue to support conf reload.

        # TODO(vikasc): Should be done using dynamically loadable OVO types
        #               plugin.
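        # Register kuryr's locally defined VIF object types so that VIFs
        # serialized into pod annotations can be deserialized back into
        # os-vif objects by this daemon.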
        objects.register_locally_defined_vifs()
        os_vif.initialize()
        clients.setup_kubernetes_client()
        if CONF.sriov.enable_pod_resource_service:
            clients.setup_pod_resources_client()

        self.manager = multiprocessing.Manager()
        registry = self.manager.dict()  # For Watcher->Server communication.
        healthy = multiprocessing.Value(c_bool, True)
        self.add(CNIDaemonWatcherService, workers=1,
                 args=(registry, healthy,))
        self.add(CNIDaemonServerService, workers=1,
                 args=(registry, healthy,))
        self.add(CNIDaemonHealthServerService, workers=1, args=(healthy,))
        self.register_hooks(on_terminate=self.terminate)

    def run(self):
        # FIXME(darshna): Silence pyroute2 IPDB deprecation warnings;
        #                 remove once we stop using pyroute2.IPDB.
        logging.getLogger('pyroute2').setLevel(logging.ERROR)
        reaper_thread = threading.Thread(target=self._zombie_reaper,
                                         daemon=True)
        self._terminate_called = threading.Event()
        reaper_thread.start()
        super(CNIDaemonServiceManager, self).run()

    def _zombie_reaper(self):
        while True:
            try:
                res = os.waitpid(-1, os.WNOHANG)
                # Don't sleep or stop if a zombie process was found,
                # as there could be more.
                if res != (0, 0):
                    continue
            except ChildProcessError:
                # There are no child processes yet (or they have been
                # killed).
                pass
            except OSError:
                LOG.exception("Got OS error while reaping zombie processes")
            if self._terminate_called.is_set():
                break
            time.sleep(1)

    def terminate(self):
        self._terminate_called.set()
        self.manager.shutdown()


def start():
    urllib3.disable_warnings()
    config.init(sys.argv[1:])
    config.setup_logging()

    CNIDaemonServiceManager().run()
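

# A minimal sketch of a direct entry point, assuming this module can be run
# as a script for local debugging; in the packaged daemon, start() is
# normally invoked through a console_scripts entry point instead.
if __name__ == '__main__':
    start()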