CNI split - introducing CNI daemon

This commit implements basic CNI daemon service. The aim of this new
entity is to increase scalability of CNI operations by moving watching
for VIF to a separate process.

This commit:
* Introduces kuryr-daemon service
* Implements communication between CNI driver and CNI daemon using HTTP
* Consolidates watching for VIF on CNI side to a single Watcher that
  looks for all the pods on the node it is running on.
* Solves bug 1731485 when running with CNI daemon.
* Enables new service in DevStack plugin
* Provides unit tests for new code.

Follow up patches will include:
- Documentation.
- Support for running in containerized mode.

To test the patch add `enable_service kuryr-daemon` to your DevStack's
local.conf file.

Partial-Bug: 1731485
Co-Authored-By: Janonymous <janonymous.codevulture@gmail.com>
Implements: blueprint cni-split-exec-daemon
Change-Id: I1bd6406dacab0735a94474e146645c63d933be16
This commit is contained in:
Michał Dulko 2017-10-25 21:29:22 +02:00
parent 1c2320e11e
commit 2f65d993f3
17 changed files with 981 additions and 72 deletions

View File

@ -176,6 +176,20 @@ enable_service kubelet
# resource events and convert them to Neutron actions # resource events and convert them to Neutron actions
enable_service kuryr-kubernetes enable_service kuryr-kubernetes
# Kuryr Daemon
# ============
#
# Kuryr can run CNI plugin in daemonized way - i.e. kubelet will run kuryr CNI
# driver and the driver will pass requests to Kuryr daemon running on the node,
# instead of processing them on its own. This limits the number of Kubernetes
# API requests (as only Kuryr Daemon will watch for new pod events) and should
# increase scalability in environments that often delete and create pods.
# Please note that kuryr-daemon is not yet supported in containerized
# deployment. To enable kuryr-daemon uncomment next line.
# enable_service kuryr-daemon
# Containerized Kuryr # Containerized Kuryr
# =================== # ===================
# #

View File

@ -63,6 +63,10 @@ function configure_kuryr {
iniset "$KURYR_CONFIG" kubernetes port_debug "$KURYR_PORT_DEBUG" iniset "$KURYR_CONFIG" kubernetes port_debug "$KURYR_PORT_DEBUG"
if is_service_enabled kuryr-daemon; then
iniset "$KURYR_CONFIG" cni_daemon daemon_enabled True
fi
create_kuryr_cache_dir create_kuryr_cache_dir
# Neutron API server & Neutron plugin # Neutron API server & Neutron plugin
@ -518,10 +522,23 @@ function run_kuryr_kubernetes {
} }
function run_kuryr_daemon {
local daemon_bin=$(which kuryr-daemon)
run_process kuryr-daemon "$daemon_bin --config-file $KURYR_CONFIG" root root
}
source $DEST/kuryr-kubernetes/devstack/lib/kuryr_kubernetes source $DEST/kuryr-kubernetes/devstack/lib/kuryr_kubernetes
# main loop # main loop
if [[ "$1" == "stack" && "$2" == "install" ]]; then if [[ "$1" == "stack" && "$2" == "pre-install" ]]; then
KURYR_K8S_CONTAINERIZED_DEPLOYMENT=$(trueorfalse False KURYR_K8S_CONTAINERIZED_DEPLOYMENT)
if is_service_enabled kuryr-daemon && [[ "$KURYR_K8S_CONTAINERIZED_DEPLOYMENT" == "True" ]]; then
die $LINENO "Cannot enable kuryr-daemon with KURYR_K8S_CONTAINERIZED_DEPLOYMENT."
fi
elif [[ "$1" == "stack" && "$2" == "install" ]]; then
setup_develop "$KURYR_HOME" setup_develop "$KURYR_HOME"
if is_service_enabled kubelet; then if is_service_enabled kubelet; then
KURYR_K8S_CONTAINERIZED_DEPLOYMENT=$(trueorfalse False KURYR_K8S_CONTAINERIZED_DEPLOYMENT) KURYR_K8S_CONTAINERIZED_DEPLOYMENT=$(trueorfalse False KURYR_K8S_CONTAINERIZED_DEPLOYMENT)
@ -580,6 +597,8 @@ if [[ "$1" == "stack" && "$2" == "extra" ]]; then
run_k8s_scheduler run_k8s_scheduler
fi fi
run_kuryr_daemon
if is_service_enabled kubelet; then if is_service_enabled kubelet; then
prepare_kubelet prepare_kubelet
extract_hyperkube extract_hyperkube
@ -620,6 +639,7 @@ if [[ "$1" == "unstack" ]]; then
elif is_service_enabled kubelet; then elif is_service_enabled kubelet; then
$KURYR_HYPERKUBE_BINARY kubectl delete nodes ${HOSTNAME} $KURYR_HYPERKUBE_BINARY kubectl delete nodes ${HOSTNAME}
fi fi
stop_process kuryr-daemon
docker kill devstack-k8s-setup-files docker kill devstack-k8s-setup-files
docker rm devstack-k8s-setup-files docker rm devstack-k8s-setup-files

View File

@ -0,0 +1,22 @@
# Copyright 2017 NEC Technologies India Pvt. Ltd.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from kuryr_kubernetes.cni.daemon import service
start = service.start
if __name__ == '__main__':
start()

View File

@ -13,49 +13,25 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import abc import abc
import six import six
from six.moves import http_client as httplib
import traceback import traceback
import requests
from kuryr.lib._i18n import _ from kuryr.lib._i18n import _
from os_vif.objects import base
from oslo_log import log as logging from oslo_log import log as logging
from oslo_serialization import jsonutils from oslo_serialization import jsonutils
from kuryr_kubernetes.cni import utils
from kuryr_kubernetes import config
from kuryr_kubernetes import constants as k_const from kuryr_kubernetes import constants as k_const
from kuryr_kubernetes import exceptions as k_exc from kuryr_kubernetes import exceptions as k_exc
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
_CNI_TIMEOUT = 60
class CNIConfig(dict):
def __init__(self, cfg):
super(CNIConfig, self).__init__(cfg)
for k, v in self.items():
if not k.startswith('_'):
setattr(self, k, v)
class CNIArgs(object):
def __init__(self, value):
for item in value.split(';'):
k, v = item.split('=', 1)
if not k.startswith('_'):
setattr(self, k, v)
class CNIParameters(object):
def __init__(self, env, cfg):
for k, v in env.items():
if k.startswith('CNI_'):
setattr(self, k, v)
self.config = CNIConfig(cfg)
self.args = CNIArgs(self.CNI_ARGS)
def __repr__(self):
return repr({key: value for key, value in self.__dict__.items() if
key.startswith('CNI_')})
@six.add_metaclass(abc.ABCMeta) @six.add_metaclass(abc.ABCMeta)
@ -70,35 +46,20 @@ class CNIPlugin(object):
raise NotImplementedError() raise NotImplementedError()
@six.add_metaclass(abc.ABCMeta)
class CNIRunner(object): class CNIRunner(object):
# TODO(ivc): extend SUPPORTED_VERSIONS and format output based on # TODO(ivc): extend SUPPORTED_VERSIONS and format output based on
# requested params.CNI_VERSION and/or params.config.cniVersion # requested params.CNI_VERSION and/or params.config.cniVersion
VERSION = '0.3.0' VERSION = '0.3.0'
SUPPORTED_VERSIONS = ['0.3.0'] SUPPORTED_VERSIONS = ['0.3.0']
def __init__(self, plugin): @abc.abstractmethod
self._plugin = plugin def _add(self, params):
raise NotImplementedError()
def run(self, env, fin, fout): @abc.abstractmethod
try: def _delete(self, params):
params = CNIParameters(env, jsonutils.load(fin)) raise NotImplementedError()
if params.CNI_COMMAND == 'ADD':
vif = self._plugin.add(params)
self._write_vif(fout, vif)
elif params.CNI_COMMAND == 'DEL':
self._plugin.delete(params)
elif params.CNI_COMMAND == 'VERSION':
self._write_version(fout)
else:
raise k_exc.CNIError(_("unknown CNI_COMMAND: %s")
% params.CNI_COMMAND)
return 0
except Exception as ex:
# LOG.exception
self._write_exception(fout, str(ex))
return 1
def _write_dict(self, fout, dct): def _write_dict(self, fout, dct):
output = {'cniVersion': self.VERSION} output = {'cniVersion': self.VERSION}
@ -116,7 +77,31 @@ class CNIRunner(object):
def _write_version(self, fout): def _write_version(self, fout):
self._write_dict(fout, {'supportedVersions': self.SUPPORTED_VERSIONS}) self._write_dict(fout, {'supportedVersions': self.SUPPORTED_VERSIONS})
def _write_vif(self, fout, vif): @abc.abstractmethod
def prepare_env(self, env, stdin):
raise NotImplementedError()
def run(self, env, fin, fout):
try:
# Prepare params according to calling Object
params = self.prepare_env(env, fin)
if env.get('CNI_COMMAND') == 'ADD':
vif = self._add(params)
self._write_dict(fout, vif)
elif env.get('CNI_COMMAND') == 'DEL':
self._delete(params)
elif env.get('CNI_COMMAND') == 'VERSION':
self._write_version(fout)
else:
raise k_exc.CNIError(_("unknown CNI_COMMAND: %s")
% env['CNI_COMMAND'])
return 0
except Exception as ex:
# LOG.exception
self._write_exception(fout, str(ex))
return 1
def _vif_data(self, vif):
result = {} result = {}
nameservers = [] nameservers = []
@ -137,5 +122,61 @@ class CNIRunner(object):
if nameservers: if nameservers:
result['dns'] = {'nameservers': nameservers} result['dns'] = {'nameservers': nameservers}
return result
self._write_dict(fout, result)
class CNIStandaloneRunner(CNIRunner):
def __init__(self, plugin):
self._plugin = plugin
def _add(self, params):
vif = self._plugin.add(params)
return self._vif_data(vif)
def _delete(self, params):
self._plugin.delete(params)
def prepare_env(self, env, stdin):
return utils.CNIParameters(env, stdin)
class CNIDaemonizedRunner(CNIRunner):
def _add(self, params):
resp = self._make_request('addNetwork', params, httplib.ACCEPTED)
vif = base.VersionedObject.obj_from_primitive(resp.json())
return self._vif_data(vif)
def _delete(self, params):
self._make_request('delNetwork', params, httplib.NO_CONTENT)
def prepare_env(self, env, stdin):
cni_envs = {}
cni_envs.update(
{k: v for k, v in env.items() if k.startswith('CNI_')})
cni_envs['config_kuryr'] = dict(stdin)
return cni_envs
def _make_request(self, path, cni_envs, expected_status=None):
method = 'POST'
address = config.CONF.cni_daemon.bind_address
url = 'http://%s/%s' % (address, path)
try:
LOG.debug('Making request to CNI Daemon. %(method)s %(path)s\n'
'%(body)s',
{'method': method, 'path': url, 'body': cni_envs})
resp = requests.post(url, json=cni_envs,
headers={'Connection': 'close'})
except requests.ConnectionError:
LOG.exception('Looks like %s cannot be reached. Is kuryr-daemon '
'running?', address)
raise
LOG.debug('CNI Daemon returned "%(status)d %(reason)s".',
{'status': resp.status_code, 'reason': resp.reason})
if expected_status and resp.status_code != expected_status:
LOG.error('CNI daemon returned error "%(status)d %(reason)s".',
{'status': resp.status_code, 'reason': resp.reason})
raise k_exc.CNIError('Got invalid status code from CNI daemon.')
return resp

View File

View File

@ -0,0 +1,267 @@
# Copyright 2017 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import multiprocessing
from six.moves import http_client as httplib
import socket
import sys
import cotyledon
import flask
from pyroute2.ipdb import transactional
import retrying
import os_vif
from os_vif import objects as obj_vif
from os_vif.objects import base
from oslo_config import cfg
from oslo_log import log as logging
from oslo_serialization import jsonutils
from kuryr_kubernetes import clients
from kuryr_kubernetes.cni import api
from kuryr_kubernetes.cni.binding import base as b_base
from kuryr_kubernetes.cni import handlers as h_cni
from kuryr_kubernetes.cni import utils
from kuryr_kubernetes import config
from kuryr_kubernetes import constants as k_const
from kuryr_kubernetes import exceptions
from kuryr_kubernetes import objects
from kuryr_kubernetes import watcher as k_watcher
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
# TODO(dulek): Another corner case is (and was) when pod is deleted before it's
# annotated by controller or even noticed by any watcher. Kubelet
# will try to delete such vif, but we will have no data about it.
# This is currently worked around by returning succesfully in case
# of timing out in delete. To solve this properly we need to watch
# for pod deletes as well.
class K8sCNIRegistryPlugin(api.CNIPlugin):
def __init__(self, registry):
self.registry = registry
def _get_name(self, pod):
return pod['metadata']['name']
def add(self, params):
vif = self._do_work(params, b_base.connect)
# NOTE(dulek): Saving containerid to be able to distinguish old DEL
# requests that we should ignore. We need to replace whole
# object in the dict for multiprocessing.Manager to work.
pod_name = params.args.K8S_POD_NAME
d = self.registry[pod_name]
d['containerid'] = params.CNI_CONTAINERID
self.registry[pod_name] = d
LOG.debug('Saved containerid = %s for pod %s', params.CNI_CONTAINERID,
pod_name)
return vif
def delete(self, params):
pod_name = params.args.K8S_POD_NAME
try:
reg_ci = self.registry[pod_name]['containerid']
LOG.debug('Read containerid = %s for pod %s', reg_ci, pod_name)
if reg_ci and reg_ci != params.CNI_CONTAINERID:
# NOTE(dulek): This is a DEL request for some older (probably
# failed) ADD call. We should ignore it or we'll
# unplug a running pod.
LOG.warning('Received DEL request for unknown ADD call. '
'Ignoring.')
return
except KeyError:
pass
self._do_work(params, b_base.disconnect)
def _do_work(self, params, fn):
pod_name = params.args.K8S_POD_NAME
timeout = CONF.cni_daemon.vif_annotation_timeout
# In case of KeyError retry for `timeout` s, wait 1 s between tries.
@retrying.retry(stop_max_delay=(timeout * 1000), wait_fixed=1000,
retry_on_exception=lambda e: isinstance(e, KeyError))
def find():
return self.registry[pod_name]
try:
d = find()
pod = d['pod']
vif = base.VersionedObject.obj_from_primitive(d['vif'])
except KeyError:
raise exceptions.ResourceNotReady(pod_name)
fn(vif, self._get_inst(pod), params.CNI_IFNAME, params.CNI_NETNS)
return vif
def _get_inst(self, pod):
return obj_vif.instance_info.InstanceInfo(
uuid=pod['metadata']['uid'], name=pod['metadata']['name'])
class DaemonServer(object):
def __init__(self, plugin):
self.ctx = None
self.plugin = plugin
self.application = flask.Flask('kuryr-daemon')
self.application.add_url_rule(
'/addNetwork', methods=['POST'], view_func=self.add)
self.application.add_url_rule(
'/delNetwork', methods=['POST'], view_func=self.delete)
self.headers = {'ContentType': 'application/json',
'Connection': 'close'}
def add(self):
params = None
try:
params = utils.CNIParameters(flask.request.get_json())
LOG.debug('Received addNetwork request. CNI Params: %s', params)
vif = self.plugin.add(params)
data = jsonutils.dumps(vif.obj_to_primitive())
except exceptions.ResourceNotReady as e:
LOG.error("Timed out waiting for requested pod to appear in "
"registry: %s.", e)
return '', httplib.GATEWAY_TIMEOUT, self.headers
except Exception:
LOG.exception('Error when processing addNetwork request. CNI '
'Params: %s', params)
return '', httplib.INTERNAL_SERVER_ERROR, self.headers
return data, httplib.ACCEPTED, self.headers
def delete(self):
params = None
try:
params = utils.CNIParameters(flask.request.get_json())
LOG.debug('Received delNetwork request. CNI Params: %s', params)
self.plugin.delete(params)
except exceptions.ResourceNotReady as e:
# NOTE(dulek): It's better to ignore this error - most of the time
# it will happen when pod is long gone and kubelet
# overzealously tries to delete it from the network.
# We cannot really do anything without VIF annotation,
# so let's just tell kubelet to move along.
LOG.warning("Timed out waiting for requested pod to appear in "
"registry: %s. Ignoring.", e)
return '', httplib.NO_CONTENT, self.headers
except Exception:
LOG.exception('Error when processing delNetwork request. CNI '
'Params: %s.', params)
return '', httplib.INTERNAL_SERVER_ERROR, self.headers
return '', httplib.NO_CONTENT, self.headers
def run(self):
server_pair = CONF.cni_daemon.bind_address
LOG.info('Starting server on %s.', server_pair)
try:
address, port = server_pair.split(':')
except ValueError:
LOG.exception('Cannot start server on %s.', server_pair)
raise
try:
self.application.run(address, port,
processes=CONF.cni_daemon.worker_num)
except Exception:
LOG.exception('Failed to start kuryr-daemon.')
raise
class CNIDaemonServerService(cotyledon.Service):
name = "server"
def __init__(self, worker_id, registry):
super(CNIDaemonServerService, self).__init__(worker_id)
self.run_queue_reading = False
self.registry = registry
self.plugin = K8sCNIRegistryPlugin(registry)
self.server = DaemonServer(self.plugin)
def run(self):
# NOTE(dulek): We might do a *lot* of pyroute2 operations, let's
# make the pyroute2 timeout configurable to make sure
# kernel will have chance to catch up.
transactional.SYNC_TIMEOUT = CONF.cni_daemon.pyroute2_timeout
# Run HTTP server
self.server.run()
class CNIDaemonWatcherService(cotyledon.Service):
name = "watcher"
def __init__(self, worker_id, registry):
super(CNIDaemonWatcherService, self).__init__(worker_id)
self.pipeline = None
self.watcher = None
self.registry = registry
def run(self):
self.pipeline = h_cni.CNIPipeline()
self.pipeline.register(h_cni.CallbackHandler(self.on_done))
self.watcher = k_watcher.Watcher(self.pipeline)
self.watcher.add(
"%(base)s/pods?fieldSelector=spec.nodeName=%(node_name)s" % {
'base': k_const.K8S_API_BASE,
'node_name': socket.gethostname()})
self.watcher.start()
def on_done(self, pod, vif):
# Add to registry only if it isn't already there.
if pod['metadata']['name'] not in self.registry:
vif_dict = vif.obj_to_primitive()
self.registry[pod['metadata']['name']] = {'pod': pod,
'vif': vif_dict,
'containerid': None}
def terminate(self):
if self.watcher:
self.watcher.stop()
class CNIDaemonServiceManager(cotyledon.ServiceManager):
def __init__(self):
super(CNIDaemonServiceManager, self).__init__()
# TODO(dulek): Use cotyledon.oslo_config_glue to support conf reload.
# TODO(vikasc): Should be done using dynamically loadable OVO types
# plugin.
objects.register_locally_defined_vifs()
os_vif.initialize()
clients.setup_kubernetes_client()
self.manager = multiprocessing.Manager()
registry = self.manager.dict() # For Watcher->Server communication.
self.add(CNIDaemonWatcherService, workers=1, args=(registry,))
self.add(CNIDaemonServerService, workers=1, args=(registry,))
self.register_hooks(on_terminate=self.terminate)
def run(self):
super(CNIDaemonServiceManager, self).run()
def terminate(self):
self.manager.shutdown()
def start():
config.init(sys.argv[1:])
config.setup_logging()
CNIDaemonServiceManager().run()

View File

@ -90,6 +90,15 @@ class DelHandler(CNIHandlerBase):
self._callback(vif) self._callback(vif)
class CallbackHandler(CNIHandlerBase):
def __init__(self, on_vif):
super(CallbackHandler, self).__init__(None, on_vif)
def on_vif(self, pod, vif):
self._callback(pod, vif)
class CNIPipeline(k_dis.EventPipeline): class CNIPipeline(k_dis.EventPipeline):
def _wrap_dispatcher(self, dispatcher): def _wrap_dispatcher(self, dispatcher):

View File

@ -18,16 +18,20 @@ import signal
import sys import sys
import os_vif import os_vif
from oslo_config import cfg
from oslo_log import log as logging from oslo_log import log as logging
from oslo_serialization import jsonutils
from kuryr_kubernetes import clients from kuryr_kubernetes import clients
from kuryr_kubernetes.cni import api as cni_api from kuryr_kubernetes.cni import api as cni_api
from kuryr_kubernetes.cni import handlers as h_cni from kuryr_kubernetes.cni import handlers as h_cni
from kuryr_kubernetes.cni import utils
from kuryr_kubernetes import config from kuryr_kubernetes import config
from kuryr_kubernetes import constants as k_const from kuryr_kubernetes import constants as k_const
from kuryr_kubernetes import objects from kuryr_kubernetes import objects as k_objects
from kuryr_kubernetes import watcher as k_watcher from kuryr_kubernetes import watcher as k_watcher
CONF = cfg.CONF
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
_CNI_TIMEOUT = 180 _CNI_TIMEOUT = 180
@ -50,17 +54,6 @@ class K8sCNIPlugin(cni_api.CNIPlugin):
self._watcher.stop() self._watcher.stop()
def _setup(self, params): def _setup(self, params):
args = ['--config-file', params.config.kuryr_conf]
try:
if params.config.debug:
args.append('-d')
except AttributeError:
pass
config.init(args)
config.setup_logging()
os_vif.initialize()
clients.setup_kubernetes_client() clients.setup_kubernetes_client()
self._pipeline = h_cni.CNIPipeline() self._pipeline = h_cni.CNIPipeline()
self._watcher = k_watcher.Watcher(self._pipeline) self._watcher = k_watcher.Watcher(self._pipeline)
@ -76,11 +69,26 @@ def run():
# REVISIT(ivc): current CNI implementation provided by this package is # REVISIT(ivc): current CNI implementation provided by this package is
# experimental and its primary purpose is to enable development of other # experimental and its primary purpose is to enable development of other
# components (e.g. functional tests, service/LBaaSv2 support) # components (e.g. functional tests, service/LBaaSv2 support)
cni_conf = utils.CNIConfig(jsonutils.load(sys.stdin))
args = ['--config-file', cni_conf.kuryr_conf]
# TODO(vikasc): Should be done using dynamically loadable OVO types plugin. try:
objects.register_locally_defined_vifs() if cni_conf.debug:
args.append('-d')
except AttributeError:
pass
config.init(args)
config.setup_logging()
runner = cni_api.CNIRunner(K8sCNIPlugin()) # Initialize o.vo registry.
k_objects.register_locally_defined_vifs()
os_vif.initialize()
if CONF.cni_daemon.daemon_enabled:
runner = cni_api.CNIDaemonizedRunner()
else:
runner = cni_api.CNIStandaloneRunner(K8sCNIPlugin())
LOG.info("Using '%s' ", runner.__class__.__name__)
def _timeout(signum, frame): def _timeout(signum, frame):
runner._write_dict(sys.stdout, { runner._write_dict(sys.stdout, {
@ -92,7 +100,7 @@ def run():
signal.signal(signal.SIGALRM, _timeout) signal.signal(signal.SIGALRM, _timeout)
signal.alarm(_CNI_TIMEOUT) signal.alarm(_CNI_TIMEOUT)
status = runner.run(os.environ, sys.stdin, sys.stdout) status = runner.run(os.environ, cni_conf, sys.stdout)
LOG.debug("Exiting with status %s", status) LOG.debug("Exiting with status %s", status)
if status: if status:
sys.exit(status) sys.exit(status)

View File

@ -0,0 +1,47 @@
# Copyright (c) 2017 NEC Technologies India Pvt Ltd.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
class CNIConfig(dict):
def __init__(self, cfg):
super(CNIConfig, self).__init__(cfg)
for k, v in self.items():
if not k.startswith('_'):
setattr(self, k, v)
class CNIArgs(object):
def __init__(self, value):
for item in value.split(';'):
k, v = item.split('=', 1)
if not k.startswith('_'):
setattr(self, k, v)
class CNIParameters(object):
def __init__(self, env, cfg=None):
for k, v in env.items():
if k.startswith('CNI_'):
setattr(self, k, v)
if cfg is None:
self.config = CNIConfig(env['config_kuryr'])
else:
self.config = cfg
self.args = CNIArgs(self.CNI_ARGS)
def __repr__(self):
return repr({key: value for key, value in self.__dict__.items() if
key.startswith('CNI_')})

View File

@ -30,6 +30,32 @@ kuryr_k8s_opts = [
'../../'))), '../../'))),
] ]
daemon_opts = [
cfg.BoolOpt('daemon_enabled',
help=_('Enable CNI Daemon configuration.'),
default=False),
cfg.StrOpt('bind_address',
help=_('Bind address for CNI daemon HTTP server. It is '
'recommened to allow only local connections.'),
default='127.0.0.1:50036'),
cfg.IntOpt('worker_num',
help=_('Maximum number of processes that will be spawned to '
'process requests from CNI driver.'),
default=30),
cfg.IntOpt('vif_annotation_timeout',
help=_('Time (in seconds) the CNI daemon will wait for VIF '
'annotation to appear in pod metadata before failing '
'the CNI request.'),
default=60),
cfg.IntOpt('pyroute2_timeout',
help=_('Kuryr uses pyroute2 library to manipulate networking '
'interfaces. When processing a high number of Kuryr '
'requests in parallel, it may take kernel more time to '
'process all networking stack changes. This option '
'allows to tune internal pyroute2 timeout.'),
default=10),
]
k8s_opts = [ k8s_opts = [
cfg.StrOpt('api_root', cfg.StrOpt('api_root',
help=_("The root URL of the Kubernetes API"), help=_("The root URL of the Kubernetes API"),
@ -125,6 +151,7 @@ octavia_defaults = [
CONF = cfg.CONF CONF = cfg.CONF
CONF.register_opts(kuryr_k8s_opts) CONF.register_opts(kuryr_k8s_opts)
CONF.register_opts(daemon_opts, group='cni_daemon')
CONF.register_opts(k8s_opts, group='kubernetes') CONF.register_opts(k8s_opts, group='kubernetes')
CONF.register_opts(neutron_defaults, group='neutron_defaults') CONF.register_opts(neutron_defaults, group='neutron_defaults')
CONF.register_opts(octavia_defaults, group='octavia_defaults') CONF.register_opts(octavia_defaults, group='octavia_defaults')

View File

@ -27,6 +27,7 @@ _kuryr_k8s_opts = [
('vif_pool', vif_pool.vif_pool_driver_opts), ('vif_pool', vif_pool.vif_pool_driver_opts),
('octavia_defaults', config.octavia_defaults), ('octavia_defaults', config.octavia_defaults),
('pool_manager', pool.pool_manager_opts), ('pool_manager', pool.pool_manager_opts),
('cni_daemon', config.daemon_opts),
] ]

View File

@ -0,0 +1,27 @@
# Copyright (c) 2017 NEC Corporation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from kuryr_kubernetes.tests import base as test_base
class TestDaemonCmd(test_base.TestCase):
@mock.patch('kuryr_kubernetes.cni.daemon.service.start')
def test_start(self, m_start):
from kuryr_kubernetes.cmd import daemon # To make it import a mock.
daemon.start()
m_start.assert_called()

View File

@ -0,0 +1,142 @@
# Copyright (c) 2017 NEC Corporation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from six import StringIO
import requests
from oslo_config import cfg
from oslo_serialization import jsonutils
from kuryr_kubernetes.cni import api
from kuryr_kubernetes.cni import main
from kuryr_kubernetes.tests import base as test_base
from kuryr_kubernetes.tests import fake
CONF = cfg.CONF
class TestCNIRunnerMixin(object):
def test_run_invalid(self, *args):
m_fin = StringIO()
m_fout = StringIO()
code = self.runner.run(
{'CNI_COMMAND': 'INVALID', 'CNI_ARGS': 'foo=bar'}, m_fin, m_fout)
self.assertEqual(1, code)
def test_run_write_version(self, *args):
m_fin = StringIO()
m_fout = StringIO()
code = self.runner.run(
{'CNI_COMMAND': 'VERSION', 'CNI_ARGS': 'foo=bar'}, m_fin, m_fout)
result = jsonutils.loads(m_fout.getvalue())
self.assertEqual(0, code)
self.assertEqual(api.CNIRunner.SUPPORTED_VERSIONS,
result['supportedVersions'])
self.assertEqual(api.CNIRunner.VERSION, result['cniVersion'])
class TestCNIStandaloneRunner(test_base.TestCase, TestCNIRunnerMixin):
def setUp(self):
super(TestCNIStandaloneRunner, self).setUp()
self.runner = api.CNIStandaloneRunner(main.K8sCNIPlugin())
@mock.patch('kuryr_kubernetes.cni.main.K8sCNIPlugin.add')
def test_run_add(self, m_k8s_add):
vif = fake._fake_vif()
m_k8s_add.return_value = vif
m_fin = StringIO()
m_fout = StringIO()
env = {
'CNI_COMMAND': 'ADD',
'CNI_ARGS': 'foo=bar',
}
self.runner.run(env, m_fin, m_fout)
self.assertTrue(m_k8s_add.called)
self.assertEqual('foo=bar', m_k8s_add.call_args[0][0].CNI_ARGS)
result = jsonutils.loads(m_fout.getvalue())
self.assertDictEqual(
{"cniVersion": "0.3.0",
"dns": {"nameservers": ["192.168.0.1"]},
"ip4": {"gateway": "192.168.0.1", "ip": "192.168.0.2/24"}},
result)
@mock.patch('kuryr_kubernetes.cni.main.K8sCNIPlugin.delete')
def test_run_del(self, m_k8s_delete):
vif = fake._fake_vif()
m_k8s_delete.return_value = vif
m_fin = StringIO()
m_fout = StringIO()
env = {
'CNI_COMMAND': 'DEL',
'CNI_ARGS': 'foo=bar',
}
self.runner.run(env, m_fin, m_fout)
self.assertTrue(m_k8s_delete.called)
self.assertEqual('foo=bar', m_k8s_delete.call_args[0][0].CNI_ARGS)
@mock.patch('requests.post')
class TestCNIDaemonizedRunner(test_base.TestCase, TestCNIRunnerMixin):
def setUp(self):
super(TestCNIDaemonizedRunner, self).setUp()
self.runner = api.CNIDaemonizedRunner()
self.port = int(CONF.cni_daemon.bind_address.split(':')[1])
def _test_run(self, cni_cmd, path, m_post):
m_fin = StringIO()
m_fout = StringIO()
env = {
'CNI_COMMAND': cni_cmd,
'CNI_ARGS': 'foo=bar',
}
result = self.runner.run(env, m_fin, m_fout)
m_post.assert_called_with(
'http://127.0.0.1:%d/%s' % (self.port, path),
json=mock.ANY, headers={'Connection': 'close'})
return result
def test_run_add(self, m_post):
m_response = mock.Mock(status_code=202)
m_response.json = mock.Mock(return_value=fake._fake_vif_dict())
m_post.return_value = m_response
result = self._test_run('ADD', 'addNetwork', m_post)
self.assertEqual(0, result)
def test_run_add_invalid(self, m_post):
m_response = mock.Mock(status_code=400)
m_response.json = mock.Mock()
m_post.return_value = m_response
result = self._test_run('ADD', 'addNetwork', m_post)
self.assertEqual(1, result)
m_response.json.assert_not_called()
def test_run_del(self, m_post):
m_post.return_value = mock.Mock(status_code=204)
result = self._test_run('DEL', 'delNetwork', m_post)
self.assertEqual(0, result)
def test_run_del_invalid(self, m_post):
m_post.return_value = mock.Mock(status_code=400)
result = self._test_run('DEL', 'delNetwork', m_post)
self.assertEqual(1, result)
def test_run_socket_error(self, m_post):
m_post.side_effect = requests.ConnectionError
result = self._test_run('DEL', 'delNetwork', m_post)
self.assertEqual(1, result)

View File

@ -0,0 +1,119 @@
# Copyright (c) 2017 NEC Corporation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from oslo_config import cfg
from kuryr_kubernetes.cni import main
from kuryr_kubernetes import constants
from kuryr_kubernetes.tests import base as test_base
class TestCNIMain(test_base.TestCase):
@mock.patch('kuryr_kubernetes.cni.main.jsonutils.load')
@mock.patch('sys.exit')
@mock.patch('sys.stdin')
@mock.patch('kuryr_kubernetes.cni.utils.CNIConfig')
@mock.patch('kuryr_kubernetes.cni.api')
@mock.patch('kuryr_kubernetes.config.init')
@mock.patch('kuryr_kubernetes.config.setup_logging')
@mock.patch('kuryr_kubernetes.cni.api.CNIDaemonizedRunner')
def test_daemonized_run(self, m_cni_dr, m_setup_logging, m_config_init,
m_api, m_conf, m_sys, m_sysexit, m_json):
m_conf.debug = mock.Mock()
m_conf.debug.return_value = True
m_cni_dr.return_value = mock.MagicMock()
m_cni_daemon = m_cni_dr.return_value
cfg.CONF.set_override('daemon_enabled', True, group='cni_daemon')
main.run()
m_config_init.assert_called()
m_setup_logging.assert_called()
m_cni_daemon.run.assert_called()
m_sysexit.assert_called()
@mock.patch('kuryr_kubernetes.cni.main.jsonutils.load')
@mock.patch('sys.exit')
@mock.patch('sys.stdin')
@mock.patch('kuryr_kubernetes.cni.utils.CNIConfig')
@mock.patch('kuryr_kubernetes.cni.api')
@mock.patch('kuryr_kubernetes.config.init')
@mock.patch('kuryr_kubernetes.config.setup_logging')
@mock.patch('kuryr_kubernetes.cni.api.CNIStandaloneRunner')
def test_standalone_run(self, m_cni_sr, m_setup_logging, m_config_init,
m_api, m_conf, m_sys, m_sysexit, m_json):
m_conf.debug = mock.Mock()
m_conf.debug.return_value = True
m_cni_sr.return_value = mock.MagicMock()
m_cni_daemon = m_cni_sr.return_value
cfg.CONF.set_override('daemon_enabled', False, group='cni_daemon')
main.run()
m_config_init.assert_called()
m_setup_logging.assert_called()
m_cni_daemon.run.assert_called()
m_sysexit.assert_called()
class TestK8sCNIPlugin(test_base.TestCase):
@mock.patch('kuryr_kubernetes.watcher.Watcher')
@mock.patch('kuryr_kubernetes.cni.handlers.CNIPipeline')
@mock.patch('kuryr_kubernetes.cni.handlers.DelHandler')
@mock.patch('kuryr_kubernetes.cni.handlers.AddHandler')
def _test_method(self, method, m_add_handler, m_del_handler, m_cni_pipe,
m_watcher_class):
self.passed_handler = None
def _save_handler(params, handler):
self.passed_handler = handler
def _call_handler(*args):
self.passed_handler(mock.sentinel.vif)
m_add_handler.side_effect = _save_handler
m_del_handler.side_effect = _save_handler
m_watcher = mock.MagicMock(
add=mock.MagicMock(),
start=mock.MagicMock(side_effect=_call_handler))
m_watcher_class.return_value = m_watcher
m_params = mock.MagicMock()
m_params.args.K8S_POD_NAMESPACE = 'k8s_pod_namespace'
m_params.args.K8S_POD_NAME = 'k8s_pod'
cni_plugin = main.K8sCNIPlugin()
result = getattr(cni_plugin, method)(m_params)
self.assertEqual(mock.sentinel.vif, cni_plugin._vif)
m_watcher.add.assert_called_with(
"%(base)s/namespaces/%(namespace)s/pods"
"?fieldSelector=metadata.name=%(pod)s" % {
'base': constants.K8S_API_BASE,
'namespace': m_params.args.K8S_POD_NAMESPACE,
'pod': m_params.args.K8S_POD_NAME})
return result
def test_add(self):
result = self._test_method('add')
self.assertEqual(result, mock.sentinel.vif)
def test_delete(self):
self._test_method('delete')

View File

@ -0,0 +1,161 @@
# Copyright 2017 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
from oslo_config import cfg
from oslo_serialization import jsonutils
from kuryr_kubernetes.cni.daemon import service
from kuryr_kubernetes import exceptions
from kuryr_kubernetes.tests import base
from kuryr_kubernetes.tests import fake
class TestK8sCNIRegistryPlugin(base.TestCase):
def setUp(self):
super(TestK8sCNIRegistryPlugin, self).setUp()
self.pod = {'metadata': {'name': 'foo', 'uid': 'bar'}}
self.vif = fake._fake_vif_dict()
registry = {'foo': {'pod': self.pod, 'vif': self.vif,
'containerid': None}}
self.plugin = service.K8sCNIRegistryPlugin(registry)
self.params = mock.Mock(args=mock.Mock(K8S_POD_NAME='foo'),
CNI_IFNAME='baz', CNI_NETNS=123,
CNI_CONTAINERID='cont_id')
@mock.patch('kuryr_kubernetes.cni.binding.base.connect')
def test_add_present(self, m_connect):
self.plugin.add(self.params)
m_connect.assert_called_with(mock.ANY, mock.ANY, 'baz', 123)
self.assertEqual('cont_id', self.plugin.registry['foo']['containerid'])
@mock.patch('kuryr_kubernetes.cni.binding.base.disconnect')
def test_del_present(self, m_disconnect):
self.plugin.delete(self.params)
m_disconnect.assert_called_with(mock.ANY, mock.ANY, 'baz', 123)
@mock.patch('kuryr_kubernetes.cni.binding.base.disconnect')
def test_del_wrong_container_id(self, m_disconnect):
registry = {'foo': {'pod': self.pod, 'vif': self.vif,
'containerid': 'different'}}
self.plugin = service.K8sCNIRegistryPlugin(registry)
self.plugin.delete(self.params)
m_disconnect.assert_not_called()
@mock.patch('time.sleep', mock.Mock())
@mock.patch('kuryr_kubernetes.cni.binding.base.connect')
def test_add_present_on_5_try(self, m_connect):
se = [KeyError] * 5
se.append({'pod': self.pod, 'vif': self.vif, 'containerid': None})
se.append({'pod': self.pod, 'vif': self.vif, 'containerid': None})
m_getitem = mock.Mock(side_effect=se)
m_setitem = mock.Mock()
m_registry = mock.Mock(__getitem__=m_getitem, __setitem__=m_setitem)
self.plugin.registry = m_registry
self.plugin.add(self.params)
m_setitem.assert_called_once_with('foo', {'pod': self.pod,
'vif': self.vif,
'containerid': 'cont_id'})
m_connect.assert_called_with(mock.ANY, mock.ANY, 'baz', 123)
@mock.patch('time.sleep', mock.Mock())
def test_add_not_present(self):
cfg.CONF.set_override('vif_annotation_timeout', 0, group='cni_daemon')
self.addCleanup(cfg.CONF.set_override, 'vif_annotation_timeout', 120,
group='cni_daemon')
m_getitem = mock.Mock(side_effect=KeyError)
m_registry = mock.Mock(__getitem__=m_getitem)
self.plugin.registry = m_registry
self.assertRaises(exceptions.ResourceNotReady, self.plugin.add,
self.params)
class TestDaemonServer(base.TestCase):
def setUp(self):
super(TestDaemonServer, self).setUp()
self.plugin = service.K8sCNIRegistryPlugin({})
self.srv = service.DaemonServer(self.plugin)
self.srv.application.testing = True
self.test_client = self.srv.application.test_client()
params = {'config_kuryr': {}, 'CNI_ARGS': 'foo=bar'}
self.params_str = jsonutils.dumps(params)
@mock.patch('kuryr_kubernetes.cni.daemon.service.K8sCNIRegistryPlugin.add')
def test_add(self, m_add):
vif = fake._fake_vif()
m_add.return_value = vif
resp = self.test_client.post('/addNetwork', data=self.params_str,
content_type='application/json')
m_add.assert_called_once_with(mock.ANY)
self.assertEqual(
fake._fake_vif_string(vif.obj_to_primitive()).encode(), resp.data)
self.assertEqual(202, resp.status_code)
@mock.patch('kuryr_kubernetes.cni.daemon.service.K8sCNIRegistryPlugin.add')
def test_add_timeout(self, m_add):
m_add.side_effect = exceptions.ResourceNotReady(mock.Mock())
resp = self.test_client.post('/addNetwork', data=self.params_str,
content_type='application/json')
m_add.assert_called_once_with(mock.ANY)
self.assertEqual(504, resp.status_code)
@mock.patch('kuryr_kubernetes.cni.daemon.service.K8sCNIRegistryPlugin.add')
def test_add_error(self, m_add):
m_add.side_effect = Exception
resp = self.test_client.post('/addNetwork', data=self.params_str,
content_type='application/json')
m_add.assert_called_once_with(mock.ANY)
self.assertEqual(500, resp.status_code)
@mock.patch('kuryr_kubernetes.cni.daemon.service.'
'K8sCNIRegistryPlugin.delete')
def test_delete(self, m_delete):
resp = self.test_client.post('/delNetwork', data=self.params_str,
content_type='application/json')
m_delete.assert_called_once_with(mock.ANY)
self.assertEqual(204, resp.status_code)
@mock.patch('kuryr_kubernetes.cni.daemon.service.'
'K8sCNIRegistryPlugin.delete')
def test_delete_timeout(self, m_delete):
m_delete.side_effect = exceptions.ResourceNotReady(mock.Mock())
resp = self.test_client.post('/delNetwork', data=self.params_str,
content_type='application/json')
m_delete.assert_called_once_with(mock.ANY)
self.assertEqual(204, resp.status_code)
@mock.patch('kuryr_kubernetes.cni.daemon.service.'
'K8sCNIRegistryPlugin.delete')
def test_delete_error(self, m_delete):
m_delete.side_effect = Exception
resp = self.test_client.post('/delNetwork', data=self.params_str,
content_type='application/json')
m_delete.assert_called_once_with(mock.ANY)
self.assertEqual(500, resp.status_code)

View File

@ -2,6 +2,8 @@
# of appearance. Changing the order has an impact on the overall integration # of appearance. Changing the order has an impact on the overall integration
# process, which may cause wedges in the gate later. # process, which may cause wedges in the gate later.
cotyledon>=1.3.0 # Apache-2.0
Flask>=0.10,!=0.11,<1.0 # BSD
kuryr-lib>=0.5.0 # Apache-2.0 kuryr-lib>=0.5.0 # Apache-2.0
pbr!=2.1.0,>=2.0.0 # Apache-2.0 pbr!=2.1.0,>=2.0.0 # Apache-2.0
requests>=2.14.2 # Apache-2.0 requests>=2.14.2 # Apache-2.0
@ -14,5 +16,6 @@ oslo.service>=1.24.0 # Apache-2.0
oslo.utils>=3.28.0 # Apache-2.0 oslo.utils>=3.28.0 # Apache-2.0
os-vif>=1.7.0 # Apache-2.0 os-vif>=1.7.0 # Apache-2.0
pyroute2>=0.4.21 # Apache-2.0 (+ dual licensed GPL2) pyroute2>=0.4.21 # Apache-2.0 (+ dual licensed GPL2)
retrying>=1.2.3,!=1.3.0 # Apache-2.0
six>=1.9.0 # MIT six>=1.9.0 # MIT
stevedore>=1.20.0 # Apache-2.0 stevedore>=1.20.0 # Apache-2.0

View File

@ -28,6 +28,7 @@ os_vif =
console_scripts = console_scripts =
kuryr-k8s-controller = kuryr_kubernetes.cmd.eventlet.controller:start kuryr-k8s-controller = kuryr_kubernetes.cmd.eventlet.controller:start
kuryr-daemon = kuryr_kubernetes.cmd.daemon:start
kuryr-cni = kuryr_kubernetes.cmd.cni:run kuryr-cni = kuryr_kubernetes.cmd.cni:run
kuryr_kubernetes.vif_translators = kuryr_kubernetes.vif_translators =