Experimental CNI & VIFBridge binding
This patch provides an experimental CNI driver. It's primary purpose is to enable development of other components (e.g. functional tests, service/LBaaSv2 support). It is expected to be replaced with daemon to configure VIF and connect it to the pods and a small lightweight client to serve as CNI driver called by Kubernetes. NOTE: unit tests are not provided as part of this patch as it is yet unclear what parts of it will be reused in daemon-based implementation. Change-Id: Iacc8439dd3aee910d542e48ed013d6d3f354786e Partially-Implements: blueprint kuryr-k8s-integration
This commit is contained in:
parent
1b1e9eb5e8
commit
fa03953aff
@ -73,6 +73,9 @@ function configure_kuryr {
|
||||
# "$(get_distutils_data_path)/libexec/kuryr"
|
||||
|
||||
iniset "$KURYR_CONFIG" kubernetes api_root "$KURYR_K8S_API_URL"
|
||||
# REVISIT(ivc): 'use_stderr' is required for current CNI driver. Once a
|
||||
# daemon-based CNI driver is implemented, this could be removed.
|
||||
iniset "$KURYR_CONFIG" DEFAULT use_stderr true
|
||||
|
||||
create_kuryr_cache_dir
|
||||
|
||||
@ -83,6 +86,12 @@ function configure_kuryr {
|
||||
fi
|
||||
}
|
||||
|
||||
function install_kuryr_cni {
|
||||
local kuryr_cni_bin=$(which kuryr-cni)
|
||||
sudo install -o "$STACK_USER" -m 0555 -D \
|
||||
"$kuryr_cni_bin" "${CNI_BIN_DIR}/kuryr-cni"
|
||||
}
|
||||
|
||||
function configure_neutron_defaults {
|
||||
local project_id=$(get_or_create_project \
|
||||
"$KURYR_NEUTRON_DEFAULT_PROJECT" default)
|
||||
@ -349,6 +358,9 @@ function run_k8s_kubelet {
|
||||
if is_service_enabled kuryr-kubernetes; then
|
||||
if [[ "$1" == "stack" && "$2" == "install" ]]; then
|
||||
setup_develop "$KURYR_HOME"
|
||||
if is_service_enabled kubelet; then
|
||||
install_kuryr_cni
|
||||
fi
|
||||
|
||||
elif [[ "$1" == "stack" && "$2" == "post-config" ]]; then
|
||||
create_kuryr_account
|
||||
|
@ -1,5 +1,7 @@
|
||||
{
|
||||
"cniVersion": "0.3.0",
|
||||
"name": "kuryr",
|
||||
"type": "kuryr"
|
||||
"type": "kuryr-cni",
|
||||
"kuryr_conf": "/etc/kuryr/kuryr.conf",
|
||||
"debug": true
|
||||
}
|
||||
|
@ -32,6 +32,14 @@ def get_kubernetes_client():
|
||||
|
||||
|
||||
def setup_clients():
|
||||
setup_neutron_client()
|
||||
setup_kubernetes_client()
|
||||
|
||||
|
||||
def setup_neutron_client():
|
||||
_clients[_NEUTRON_CLIENT] = utils.get_neutron_client()
|
||||
|
||||
|
||||
def setup_kubernetes_client():
|
||||
_clients[_KUBERNETES_CLIENT] = k8s_client.K8sClient(
|
||||
config.CONF.kubernetes.api_root)
|
||||
|
22
kuryr_kubernetes/cmd/cni.py
Normal file
22
kuryr_kubernetes/cmd/cni.py
Normal file
@ -0,0 +1,22 @@
|
||||
# Copyright (c) 2016 Mirantis, Inc.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from kuryr_kubernetes.cni import main
|
||||
|
||||
|
||||
run = main.run
|
||||
|
||||
if __name__ == '__main__':
|
||||
run()
|
0
kuryr_kubernetes/cni/__init__.py
Normal file
0
kuryr_kubernetes/cni/__init__.py
Normal file
136
kuryr_kubernetes/cni/api.py
Normal file
136
kuryr_kubernetes/cni/api.py
Normal file
@ -0,0 +1,136 @@
|
||||
# Copyright (c) 2016 Mirantis, Inc.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import abc
|
||||
import six
|
||||
import traceback
|
||||
|
||||
from kuryr.lib._i18n import _LE
|
||||
from oslo_log import log as logging
|
||||
from oslo_serialization import jsonutils
|
||||
|
||||
from kuryr_kubernetes import constants as k_const
|
||||
from kuryr_kubernetes import exceptions as k_exc
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
_CNI_TIMEOUT = 60
|
||||
|
||||
|
||||
class CNIConfig(dict):
|
||||
def __init__(self, cfg):
|
||||
super(CNIConfig, self).__init__(cfg)
|
||||
|
||||
for k, v in six.iteritems(self):
|
||||
if not k.startswith('_'):
|
||||
setattr(self, k, v)
|
||||
|
||||
|
||||
class CNIArgs(object):
|
||||
def __init__(self, value):
|
||||
for item in value.split(';'):
|
||||
k, v = item.split('=', 1)
|
||||
if not k.startswith('_'):
|
||||
setattr(self, k, v)
|
||||
|
||||
|
||||
class CNIParameters(object):
|
||||
def __init__(self, env, cfg):
|
||||
for k, v in six.iteritems(env):
|
||||
if k.startswith('CNI_'):
|
||||
setattr(self, k, v)
|
||||
self.config = CNIConfig(cfg)
|
||||
self.args = CNIArgs(self.CNI_ARGS)
|
||||
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class CNIPlugin(object):
|
||||
|
||||
@abc.abstractmethod
|
||||
def add(self, params):
|
||||
raise NotImplementedError()
|
||||
|
||||
@abc.abstractmethod
|
||||
def delete(self, params):
|
||||
raise NotImplementedError()
|
||||
|
||||
|
||||
class CNIRunner(object):
|
||||
|
||||
# TODO(ivc): extend SUPPORTED_VERSIONS and format output based on
|
||||
# requested params.CNI_VERSION and/or params.config.cniVersion
|
||||
VERSION = '0.3.0'
|
||||
SUPPORTED_VERSIONS = ['0.3.0']
|
||||
|
||||
def __init__(self, plugin):
|
||||
self._plugin = plugin
|
||||
|
||||
def run(self, env, fin, fout):
|
||||
try:
|
||||
params = CNIParameters(env, jsonutils.load(fin))
|
||||
|
||||
if params.CNI_COMMAND == 'ADD':
|
||||
vif = self._plugin.add(params)
|
||||
self._write_vif(fout, vif)
|
||||
elif params.CNI_COMMAND == 'DEL':
|
||||
self._plugin.delete(params)
|
||||
elif params.CNI_COMMAND == 'VERSION':
|
||||
self._write_version(fout)
|
||||
else:
|
||||
raise k_exc.CNIError(_LE("unknown CNI_COMMAND: %s")
|
||||
% params.CNI_COMMAND)
|
||||
except Exception as ex:
|
||||
# LOG.exception
|
||||
self._write_exception(fout, str(ex))
|
||||
return 1
|
||||
|
||||
def _write_dict(self, fout, dct):
|
||||
output = {'cniVersion': self.VERSION}
|
||||
output.update(dct)
|
||||
LOG.debug("CNI output: %s", output)
|
||||
jsonutils.dump(output, fout, sort_keys=True)
|
||||
|
||||
def _write_exception(self, fout, msg):
|
||||
self._write_dict(fout, {
|
||||
'msg': msg,
|
||||
'code': k_const.CNI_EXCEPTION_CODE,
|
||||
'details': traceback.format_exc(),
|
||||
})
|
||||
|
||||
def _write_version(self, fout):
|
||||
self._write_dict(fout, {'supportedVersions': self.SUPPORTED_VERSIONS})
|
||||
|
||||
def _write_vif(self, fout, vif):
|
||||
result = {}
|
||||
nameservers = []
|
||||
|
||||
for subnet in vif.network.subnets.objects:
|
||||
nameservers.extend(subnet.dns)
|
||||
|
||||
ip = subnet.ips.objects[0].address
|
||||
cni_ip = result.setdefault("ip%s" % ip.version, {})
|
||||
cni_ip['ip'] = "%s/%s" % (ip, subnet.cidr.prefixlen)
|
||||
|
||||
if subnet.gateway:
|
||||
cni_ip['gateway'] = str(subnet.gateway)
|
||||
|
||||
if subnet.routes.objects:
|
||||
cni_ip['routes'] = [
|
||||
{'dst': str(route.cidr), 'gw': str(route.gateway)}
|
||||
for route in subnet.routes.objects]
|
||||
|
||||
if nameservers:
|
||||
result['dns']['nameservers'] = nameservers
|
||||
|
||||
self._write_dict(fout, result)
|
0
kuryr_kubernetes/cni/binding/__init__.py
Normal file
0
kuryr_kubernetes/cni/binding/__init__.py
Normal file
72
kuryr_kubernetes/cni/binding/base.py
Normal file
72
kuryr_kubernetes/cni/binding/base.py
Normal file
@ -0,0 +1,72 @@
|
||||
# Copyright (c) 2016 Mirantis, Inc.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import os_vif
|
||||
from oslo_log import log as logging
|
||||
import pyroute2
|
||||
from stevedore import driver as stv_driver
|
||||
|
||||
_BINDING_NAMESPACE = 'kuryr_kubernetes.cni.binding'
|
||||
_IPDB = {}
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _get_binding_driver(vif):
|
||||
mgr = stv_driver.DriverManager(namespace=_BINDING_NAMESPACE,
|
||||
name=type(vif).__name__,
|
||||
invoke_on_load=True)
|
||||
return mgr.driver
|
||||
|
||||
|
||||
def get_ipdb(netns=None):
|
||||
try:
|
||||
return _IPDB[netns]
|
||||
except KeyError:
|
||||
if netns:
|
||||
ipdb = pyroute2.IPDB(nl=pyroute2.NetNS(netns))
|
||||
else:
|
||||
ipdb = pyroute2.IPDB()
|
||||
_IPDB[netns] = ipdb
|
||||
return ipdb
|
||||
|
||||
|
||||
def _configure_l3(vif, ifname, netns):
|
||||
with get_ipdb(netns).interfaces[ifname] as iface:
|
||||
for subnet in vif.network.subnets.objects:
|
||||
for fip in subnet.ips.objects:
|
||||
iface.add_ip(str(fip.address), mask=str(subnet.cidr.netmask))
|
||||
|
||||
routes = get_ipdb(netns).routes
|
||||
for subnet in vif.network.subnets.objects:
|
||||
for route in subnet.routes.objects:
|
||||
routes.add(gateway=str(route.gateway),
|
||||
dst=str(route.cidr)).commit()
|
||||
if subnet.gateway:
|
||||
routes.add(gateway=str(subnet.gateway),
|
||||
dst='default').commit()
|
||||
|
||||
|
||||
def connect(vif, instance_info, ifname, netns=None):
|
||||
driver = _get_binding_driver(vif)
|
||||
os_vif.plug(vif, instance_info)
|
||||
driver.connect(vif, ifname, netns)
|
||||
_configure_l3(vif, ifname, netns)
|
||||
|
||||
|
||||
def disconnect(vif, instance_info, ifname, netns=None):
|
||||
driver = _get_binding_driver(vif)
|
||||
driver.disconnect(vif, ifname, netns)
|
||||
os_vif.unplug(vif, instance_info)
|
49
kuryr_kubernetes/cni/binding/bridge.py
Normal file
49
kuryr_kubernetes/cni/binding/bridge.py
Normal file
@ -0,0 +1,49 @@
|
||||
# Copyright (c) 2016 Mirantis, Inc.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import os
|
||||
|
||||
from kuryr_kubernetes.cni.binding import base as b_base
|
||||
|
||||
|
||||
class BridgeDriver(object):
|
||||
def connect(self, vif, ifname, netns):
|
||||
host_ifname = vif.vif_name
|
||||
bridge_name = vif.bridge_name
|
||||
|
||||
c_ipdb = b_base.get_ipdb(netns)
|
||||
h_ipdb = b_base.get_ipdb()
|
||||
|
||||
with c_ipdb.create(ifname=ifname, peer=host_ifname,
|
||||
kind='veth') as c_iface:
|
||||
c_iface.mtu = vif.network.mtu
|
||||
c_iface.address = str(vif.address)
|
||||
c_iface.up()
|
||||
|
||||
if netns:
|
||||
with c_ipdb.interfaces[host_ifname] as h_iface:
|
||||
h_iface.net_ns_pid = os.getpid()
|
||||
|
||||
with h_ipdb.interfaces[host_ifname] as h_iface:
|
||||
h_iface.mtu = vif.network.mtu
|
||||
h_iface.up()
|
||||
|
||||
with h_ipdb.interfaces[bridge_name] as h_br:
|
||||
h_br.add_port(host_ifname)
|
||||
|
||||
def disconnect(self, vif, ifname, netns):
|
||||
# NOTE(ivc): veth pair is destroyed automatically along with the
|
||||
# container namespace
|
||||
pass
|
98
kuryr_kubernetes/cni/handlers.py
Normal file
98
kuryr_kubernetes/cni/handlers.py
Normal file
@ -0,0 +1,98 @@
|
||||
# Copyright (c) 2016 Mirantis, Inc.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import abc
|
||||
import six
|
||||
|
||||
from os_vif import objects as obj_vif
|
||||
from oslo_log import log as logging
|
||||
from oslo_serialization import jsonutils
|
||||
|
||||
from kuryr_kubernetes.cni.binding import base as b_base
|
||||
from kuryr_kubernetes import constants as k_const
|
||||
from kuryr_kubernetes.handlers import dispatch as k_dis
|
||||
from kuryr_kubernetes.handlers import k8s_base
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class CNIHandlerBase(k8s_base.ResourceEventHandler):
|
||||
OBJECT_KIND = k_const.K8S_OBJ_POD
|
||||
|
||||
def __init__(self, cni, on_done):
|
||||
self._cni = cni
|
||||
self._callback = on_done
|
||||
self._vif = None
|
||||
|
||||
def on_present(self, pod):
|
||||
vif = self._get_vif(pod)
|
||||
|
||||
if vif:
|
||||
self.on_vif(pod, vif)
|
||||
|
||||
@abc.abstractmethod
|
||||
def on_vif(self, pod, vif):
|
||||
raise NotImplementedError()
|
||||
|
||||
def _get_vif(self, pod):
|
||||
# TODO(ivc): same as VIFHandler._get_vif
|
||||
try:
|
||||
annotations = pod['metadata']['annotations']
|
||||
vif_annotation = annotations[k_const.K8S_ANNOTATION_VIF]
|
||||
except KeyError:
|
||||
return None
|
||||
vif_dict = jsonutils.loads(vif_annotation)
|
||||
vif = obj_vif.vif.VIFBase.obj_from_primitive(vif_dict)
|
||||
LOG.debug("Got VIF from annotation: %r", vif)
|
||||
return vif
|
||||
|
||||
def _get_inst(self, pod):
|
||||
return obj_vif.instance_info.InstanceInfo(
|
||||
uuid=pod['metadata']['uid'], name=pod['metadata']['name'])
|
||||
|
||||
|
||||
class AddHandler(CNIHandlerBase):
|
||||
|
||||
def __init__(self, cni, on_done):
|
||||
super(AddHandler, self).__init__(cni, on_done)
|
||||
self._vif = None
|
||||
|
||||
def on_vif(self, pod, vif):
|
||||
if not self._vif:
|
||||
self._vif = vif.obj_clone()
|
||||
self._vif.active = True
|
||||
b_base.connect(self._vif, self._get_inst(pod),
|
||||
self._cni.CNI_IFNAME, self._cni.CNI_NETNS)
|
||||
|
||||
if vif.active:
|
||||
self._callback(vif)
|
||||
|
||||
|
||||
class DelHandler(CNIHandlerBase):
|
||||
|
||||
def on_vif(self, pod, vif):
|
||||
b_base.disconnect(vif, self._get_inst(pod),
|
||||
self._cni.CNI_IFNAME, self._cni.CNI_NETNS)
|
||||
self._callback(vif)
|
||||
|
||||
|
||||
class CNIPipeline(k_dis.EventPipeline):
|
||||
|
||||
def _wrap_dispatcher(self, dispatcher):
|
||||
return dispatcher
|
||||
|
||||
def _wrap_consumer(self, consumer):
|
||||
return consumer
|
93
kuryr_kubernetes/cni/main.py
Normal file
93
kuryr_kubernetes/cni/main.py
Normal file
@ -0,0 +1,93 @@
|
||||
# Copyright (c) 2016 Mirantis, Inc.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import os
|
||||
import signal
|
||||
import sys
|
||||
|
||||
import os_vif
|
||||
from oslo_log import log as logging
|
||||
|
||||
from kuryr_kubernetes import clients
|
||||
from kuryr_kubernetes.cni import api as cni_api
|
||||
from kuryr_kubernetes.cni import handlers as h_cni
|
||||
from kuryr_kubernetes import config
|
||||
from kuryr_kubernetes import constants as k_const
|
||||
from kuryr_kubernetes import watcher as k_watcher
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
_CNI_TIMEOUT = 180
|
||||
|
||||
|
||||
class K8sCNIPlugin(cni_api.CNIPlugin):
|
||||
|
||||
def add(self, params):
|
||||
self._setup(params)
|
||||
self._pipeline.register(h_cni.AddHandler(params, self._done))
|
||||
self._watcher.start()
|
||||
return self._vif
|
||||
|
||||
def delete(self, params):
|
||||
self._setup(params)
|
||||
self._pipeline.register(h_cni.DelHandler(params, self._done))
|
||||
self._watcher.start()
|
||||
|
||||
def _done(self, vif):
|
||||
self._vif = vif
|
||||
self._watcher.stop()
|
||||
|
||||
def _setup(self, params):
|
||||
args = ['--config-file', params.config.kuryr_conf]
|
||||
|
||||
try:
|
||||
if params.config.debug:
|
||||
args.append('-d')
|
||||
except AttributeError:
|
||||
pass
|
||||
|
||||
config.init(args)
|
||||
config.setup_logging()
|
||||
os_vif.initialize()
|
||||
clients.setup_kubernetes_client()
|
||||
self._pipeline = h_cni.CNIPipeline()
|
||||
self._watcher = k_watcher.Watcher(self._pipeline)
|
||||
self._watcher.add(
|
||||
"%(base)s/namespaces/%(namespace)s/pods"
|
||||
"?fieldSelector=metadata.name=%(pod)s" % {
|
||||
'base': k_const.K8S_API_BASE,
|
||||
'namespace': params.args.K8S_POD_NAMESPACE,
|
||||
'pod': params.args.K8S_POD_NAME})
|
||||
|
||||
|
||||
def run():
|
||||
# REVISIT(ivc): current CNI implementation provided by this package is
|
||||
# experimental and its primary purpose is to enable development of other
|
||||
# components (e.g. functional tests, service/LBaaSv2 support)
|
||||
runner = cni_api.CNIRunner(K8sCNIPlugin())
|
||||
|
||||
def _timeout(signum, frame):
|
||||
runner._write_dict(sys.stdout, {
|
||||
'msg': 'timeout',
|
||||
'code': k_const.CNI_TIMEOUT_CODE,
|
||||
})
|
||||
LOG.debug('timed out')
|
||||
sys.exit(1)
|
||||
|
||||
signal.signal(signal.SIGALRM, _timeout)
|
||||
signal.alarm(_CNI_TIMEOUT)
|
||||
status = runner.run(os.environ, sys.stdin, sys.stdout)
|
||||
LOG.debug("Exiting with status %s", status)
|
||||
if status:
|
||||
sys.exit(status)
|
@ -25,3 +25,6 @@ K8S_POD_STATUS_PENDING = 'Pending'
|
||||
|
||||
K8S_ANNOTATION_PREFIX = 'openstack.org/kuryr'
|
||||
K8S_ANNOTATION_VIF = K8S_ANNOTATION_PREFIX + '-vif'
|
||||
|
||||
CNI_EXCEPTION_CODE = 100
|
||||
CNI_TIMEOUT_CODE = 200
|
||||
|
@ -30,5 +30,9 @@ class ResourceNotReady(Exception):
|
||||
% resource)
|
||||
|
||||
|
||||
class CNIError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
def format_msg(exception):
|
||||
return "%s: %s" % (exception.__class__.__name__, exception)
|
||||
|
@ -19,8 +19,6 @@ import six
|
||||
from oslo_log import log as logging
|
||||
|
||||
from kuryr_kubernetes.handlers import base as h_base
|
||||
from kuryr_kubernetes.handlers import logging as h_log
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
@ -84,6 +82,7 @@ class EventConsumer(h_base.EventHandler):
|
||||
raise NotImplementedError()
|
||||
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class EventPipeline(h_base.EventHandler):
|
||||
"""Serves as an entry-point for event handling.
|
||||
|
||||
@ -110,8 +109,10 @@ class EventPipeline(h_base.EventHandler):
|
||||
def __call__(self, event):
|
||||
self._handler(event)
|
||||
|
||||
@abc.abstractmethod
|
||||
def _wrap_dispatcher(self, dispatcher):
|
||||
return h_log.LogExceptions(dispatcher)
|
||||
raise NotImplementedError()
|
||||
|
||||
@abc.abstractmethod
|
||||
def _wrap_consumer(self, consumer):
|
||||
return h_log.LogExceptions(consumer)
|
||||
raise NotImplementedError()
|
||||
|
@ -60,22 +60,30 @@ class TestDispatch(test_base.TestCase):
|
||||
handler.assert_called_once_with(events[key])
|
||||
|
||||
|
||||
class _TestEventPipeline(h_dis.EventPipeline):
|
||||
def _wrap_dispatcher(self, dispatcher):
|
||||
pass
|
||||
|
||||
def _wrap_consumer(self, consumer):
|
||||
pass
|
||||
|
||||
|
||||
class TestEventPipeline(test_base.TestCase):
|
||||
@mock.patch.object(h_dis.EventPipeline, '_wrap_dispatcher')
|
||||
@mock.patch.object(_TestEventPipeline, '_wrap_dispatcher')
|
||||
@mock.patch('kuryr_kubernetes.handlers.dispatch.Dispatcher')
|
||||
def test_init(self, m_dispatcher_type, m_wrapper):
|
||||
m_dispatcher_type.return_value = mock.sentinel.dispatcher
|
||||
m_wrapper.return_value = mock.sentinel.handler
|
||||
|
||||
pipeline = h_dis.EventPipeline()
|
||||
pipeline = _TestEventPipeline()
|
||||
|
||||
m_dispatcher_type.assert_called_once()
|
||||
m_wrapper.assert_called_once_with(mock.sentinel.dispatcher)
|
||||
self.assertEqual(mock.sentinel.dispatcher, pipeline._dispatcher)
|
||||
self.assertEqual(mock.sentinel.handler, pipeline._handler)
|
||||
|
||||
@mock.patch.object(h_dis.EventPipeline, '_wrap_consumer')
|
||||
@mock.patch.object(h_dis.EventPipeline, '__init__')
|
||||
@mock.patch.object(_TestEventPipeline, '_wrap_consumer')
|
||||
@mock.patch.object(_TestEventPipeline, '__init__')
|
||||
def test_register(self, m_init, m_wrap_consumer):
|
||||
consumes = {mock.sentinel.key_fn1: mock.sentinel.key1,
|
||||
mock.sentinel.key_fn2: mock.sentinel.key2,
|
||||
@ -85,7 +93,7 @@ class TestEventPipeline(test_base.TestCase):
|
||||
m_consumer.consumes = consumes
|
||||
m_wrap_consumer.return_value = mock.sentinel.handler
|
||||
m_init.return_value = None
|
||||
pipeline = h_dis.EventPipeline()
|
||||
pipeline = _TestEventPipeline()
|
||||
pipeline._dispatcher = m_dispatcher
|
||||
|
||||
pipeline.register(m_consumer)
|
||||
@ -95,11 +103,11 @@ class TestEventPipeline(test_base.TestCase):
|
||||
mock.call(key_fn, key, mock.sentinel.handler)
|
||||
for key_fn, key in consumes.items()], any_order=True)
|
||||
|
||||
@mock.patch.object(h_dis.EventPipeline, '__init__')
|
||||
@mock.patch.object(_TestEventPipeline, '__init__')
|
||||
def test_call(self, m_init):
|
||||
m_init.return_value = None
|
||||
m_handler = mock.Mock()
|
||||
pipeline = h_dis.EventPipeline()
|
||||
pipeline = _TestEventPipeline()
|
||||
pipeline._handler = m_handler
|
||||
|
||||
pipeline(mock.sentinel.event)
|
||||
|
@ -2,7 +2,7 @@
|
||||
# of appearance. Changing the order has an impact on the overall integration
|
||||
# process, which may cause wedges in the gate later.
|
||||
|
||||
-e git://github.com/openstack/kuryr.git@master#egg=kuryr_lib
|
||||
kuryr-lib>=0.1.0 # Apache-2.0
|
||||
pbr>=1.6 # Apache-2.0
|
||||
requests>=2.10.0 # Apache-2.0
|
||||
eventlet!=0.18.3,>=0.18.2 # MIT
|
||||
|
@ -25,10 +25,14 @@ oslo.config.opts =
|
||||
|
||||
console_scripts =
|
||||
kuryr-k8s-controller = kuryr_kubernetes.cmd.eventlet.controller:start
|
||||
kuryr-cni = kuryr_kubernetes.cmd.cni:run
|
||||
|
||||
kuryr_kubernetes.vif_translators =
|
||||
ovs = kuryr_kubernetes.os_vif_util:neutron_to_osvif_vif_ovs
|
||||
|
||||
kuryr_kubernetes.cni.binding =
|
||||
VIFBridge = kuryr_kubernetes.cni.binding.bridge:BridgeDriver
|
||||
|
||||
kuryr_kubernetes.controller.drivers.pod_project =
|
||||
default = kuryr_kubernetes.controller.drivers.default_project:DefaultPodProjectDriver
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user