# Copyright 2017 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import multiprocessing
import socket
import sys

import cotyledon
import flask
from pyroute2.ipdb import transactional
import retrying
from six.moves import http_client as httplib

import os_vif
from os_vif import objects as obj_vif
from os_vif.objects import base
from oslo_config import cfg
from oslo_log import log as logging
from oslo_serialization import jsonutils

from kuryr_kubernetes import clients
from kuryr_kubernetes.cni import api
from kuryr_kubernetes.cni.binding import base as b_base
from kuryr_kubernetes.cni import handlers as h_cni
from kuryr_kubernetes.cni import utils
from kuryr_kubernetes import config
from kuryr_kubernetes import constants as k_const
from kuryr_kubernetes import exceptions
from kuryr_kubernetes import objects
from kuryr_kubernetes import watcher as k_watcher

LOG = logging.getLogger(__name__)
CONF = cfg.CONF

# TODO(dulek): Another corner case is (and was) when pod is deleted before
#              it's annotated by controller or even noticed by any watcher.
#              Kubelet will try to delete such vif, but we will have no data
#              about it. This is currently worked around by returning
#              successfully in case of timing out in delete. To solve this
#              properly we need to watch for pod deletes as well.


class K8sCNIRegistryPlugin(api.CNIPlugin):
    def __init__(self, registry):
        self.registry = registry

    def _get_name(self, pod):
        return pod['metadata']['name']

    def add(self, params):
        vif = self._do_work(params, b_base.connect)

        # NOTE(dulek): Saving containerid to be able to distinguish old DEL
        #              requests that we should ignore. We need to replace
        #              whole object in the dict for multiprocessing.Manager
        #              to work.
        pod_name = params.args.K8S_POD_NAME
        d = self.registry[pod_name]
        d['containerid'] = params.CNI_CONTAINERID
        self.registry[pod_name] = d
        LOG.debug('Saved containerid = %s for pod %s',
                  params.CNI_CONTAINERID, pod_name)
        return vif

    def delete(self, params):
        pod_name = params.args.K8S_POD_NAME
        try:
            reg_ci = self.registry[pod_name]['containerid']
            LOG.debug('Read containerid = %s for pod %s', reg_ci, pod_name)
            if reg_ci and reg_ci != params.CNI_CONTAINERID:
                # NOTE(dulek): This is a DEL request for some older (probably
                #              failed) ADD call. We should ignore it or we'll
                #              unplug a running pod.
                LOG.warning('Received DEL request for unknown ADD call. '
                            'Ignoring.')
                return
        except KeyError:
            pass
        self._do_work(params, b_base.disconnect)

    def _do_work(self, params, fn):
        pod_name = params.args.K8S_POD_NAME

        timeout = CONF.cni_daemon.vif_annotation_timeout

        # In case of KeyError retry for `timeout` s, wait 1 s between tries.
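        # The `retrying` decorator below re-invokes `find` on every KeyError
        # (i.e. while the watcher process has not yet published this pod's
        # VIF into the shared registry), waiting 1 s between attempts and
        # giving up once `timeout` seconds have elapsed; any other exception
        # propagates immediately.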
        @retrying.retry(stop_max_delay=(timeout * 1000), wait_fixed=1000,
                        retry_on_exception=lambda e: isinstance(e, KeyError))
        def find():
            return self.registry[pod_name]

        try:
            d = find()
            pod = d['pod']
            vif = base.VersionedObject.obj_from_primitive(d['vif'])
        except KeyError:
            raise exceptions.ResourceNotReady(pod_name)

        fn(vif, self._get_inst(pod), params.CNI_IFNAME, params.CNI_NETNS)
        return vif

    def _get_inst(self, pod):
        return obj_vif.instance_info.InstanceInfo(
            uuid=pod['metadata']['uid'], name=pod['metadata']['name'])


class DaemonServer(object):
    def __init__(self, plugin):
        self.ctx = None
        self.plugin = plugin
        self.application = flask.Flask('kuryr-daemon')
        self.application.add_url_rule(
            '/addNetwork', methods=['POST'], view_func=self.add)
        self.application.add_url_rule(
            '/delNetwork', methods=['POST'], view_func=self.delete)
        self.headers = {'Content-Type': 'application/json',
                        'Connection': 'close'}

    def add(self):
        params = None
        try:
            params = utils.CNIParameters(flask.request.get_json())
            LOG.debug('Received addNetwork request. CNI Params: %s', params)
            vif = self.plugin.add(params)
            data = jsonutils.dumps(vif.obj_to_primitive())
        except exceptions.ResourceNotReady as e:
            LOG.error("Timed out waiting for requested pod to appear in "
                      "registry: %s.", e)
            return '', httplib.GATEWAY_TIMEOUT, self.headers
        except Exception:
            LOG.exception('Error when processing addNetwork request. CNI '
                          'Params: %s', params)
            return '', httplib.INTERNAL_SERVER_ERROR, self.headers
        return data, httplib.ACCEPTED, self.headers

    def delete(self):
        params = None
        try:
            params = utils.CNIParameters(flask.request.get_json())
            LOG.debug('Received delNetwork request. CNI Params: %s', params)
            self.plugin.delete(params)
        except exceptions.ResourceNotReady as e:
            # NOTE(dulek): It's better to ignore this error - most of the
            #              time it will happen when pod is long gone and
            #              kubelet overzealously tries to delete it from the
            #              network. We cannot really do anything without VIF
            #              annotation, so let's just tell kubelet to move
            #              along.
            LOG.warning("Timed out waiting for requested pod to appear in "
                        "registry: %s. Ignoring.", e)
            return '', httplib.NO_CONTENT, self.headers
        except Exception:
            LOG.exception('Error when processing delNetwork request. CNI '
                          'Params: %s.', params)
            return '', httplib.INTERNAL_SERVER_ERROR, self.headers
        return '', httplib.NO_CONTENT, self.headers

    def run(self):
        server_pair = CONF.cni_daemon.bind_address
        LOG.info('Starting server on %s.', server_pair)
        try:
            address, port = server_pair.split(':')
            port = int(port)
        except ValueError:
            LOG.exception('Cannot start server on %s.', server_pair)
            raise

        try:
            self.application.run(address, port,
                                 processes=CONF.cni_daemon.worker_num)
        except Exception:
            LOG.exception('Failed to start kuryr-daemon.')
            raise


class CNIDaemonServerService(cotyledon.Service):
    name = "server"

    def __init__(self, worker_id, registry):
        super(CNIDaemonServerService, self).__init__(worker_id)
        self.run_queue_reading = False
        self.registry = registry
        self.plugin = K8sCNIRegistryPlugin(registry)
        self.server = DaemonServer(self.plugin)

    def run(self):
        # NOTE(dulek): We might do a *lot* of pyroute2 operations, let's
        #              make the pyroute2 timeout configurable to make sure
        #              kernel will have chance to catch up.
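        # `SYNC_TIMEOUT` is the module-level default (in seconds) that
        # pyroute2's IPDB transactional code uses when waiting for the
        # kernel to confirm a commit; overriding it here affects every
        # subsequent interface operation performed by this process.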
        transactional.SYNC_TIMEOUT = CONF.cni_daemon.pyroute2_timeout

        # Run HTTP server
        self.server.run()


class CNIDaemonWatcherService(cotyledon.Service):
    name = "watcher"

    def __init__(self, worker_id, registry):
        super(CNIDaemonWatcherService, self).__init__(worker_id)
        self.pipeline = None
        self.watcher = None
        self.registry = registry

    def run(self):
        self.pipeline = h_cni.CNIPipeline()
        self.pipeline.register(h_cni.CallbackHandler(self.on_done))
        self.watcher = k_watcher.Watcher(self.pipeline)
        self.watcher.add(
            "%(base)s/pods?fieldSelector=spec.nodeName=%(node_name)s" % {
                'base': k_const.K8S_API_BASE,
                'node_name': socket.gethostname()})
        self.watcher.start()

    def on_done(self, pod, vif):
        # Add to registry only if it isn't already there.
        if pod['metadata']['name'] not in self.registry:
            vif_dict = vif.obj_to_primitive()
            self.registry[pod['metadata']['name']] = {'pod': pod,
                                                      'vif': vif_dict,
                                                      'containerid': None}

    def terminate(self):
        if self.watcher:
            self.watcher.stop()


class CNIDaemonServiceManager(cotyledon.ServiceManager):
    def __init__(self):
        super(CNIDaemonServiceManager, self).__init__()
        # TODO(dulek): Use cotyledon.oslo_config_glue to support conf reload.

        # TODO(vikasc): Should be done using dynamically loadable OVO types
        #               plugin.
        objects.register_locally_defined_vifs()

        os_vif.initialize()
        clients.setup_kubernetes_client()

        self.manager = multiprocessing.Manager()
        registry = self.manager.dict()  # For Watcher->Server communication.
        self.add(CNIDaemonWatcherService, workers=1, args=(registry,))
        self.add(CNIDaemonServerService, workers=1, args=(registry,))

        self.register_hooks(on_terminate=self.terminate)

    def run(self):
        super(CNIDaemonServiceManager, self).run()

    def terminate(self):
        self.manager.shutdown()


def start():
    config.init(sys.argv[1:])
    config.setup_logging()
    CNIDaemonServiceManager().run()
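
# Illustrative client-side sketch (not part of this module): the CNI
# executable is expected to forward its CNI_* environment and the network
# config as a JSON body. The address below is an assumption standing in for
# whatever cni_daemon.bind_address is configured to:
#
#     from six.moves import http_client as httplib
#
#     conn = httplib.HTTPConnection('127.0.0.1', 50036)
#     conn.request('POST', '/addNetwork', body=params_json,
#                  headers={'Content-Type': 'application/json'})
#     resp = conn.getresponse()
#
# where `params_json` is the serialized CNI parameters. A 202 (ACCEPTED)
# response carries the serialized VIF, 504 (GATEWAY_TIMEOUT) means the VIF
# annotation never appeared within vif_annotation_timeout seconds, and 500
# signals any other processing error (mirroring DaemonServer.add above).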