Allow passing multiple VIFs to CNI

This commit alters the format of the annotation set and read by the
controller/CNI. Instead of a single VIF object it now holds a
dictionary that maps interface names to VIF objects. The controller
currently only sets 'eth0' as the default VIF. The CNI is altered
accordingly to read VIFs from the mapping and add all of them to the
Pod's namespace.

This commit does not include any mechanism to actually request multiple
IPs/ports for a Pod, but rather lays the foundation for future commits
that will allow that.

Related bp: kuryr-kubernetes-sriov-support
Targets bp: multi-vif-pods

Change-Id: Iaef928e7ab9dc0fce8b7e8fffb7b5a1f6b5ccb17
This commit is contained in:
Kirill Zaitsev 2017-06-05 18:10:03 +03:00 committed by Danil Golov
parent 3ebbe5faba
commit 14f7c3fcc3
6 changed files with 173 additions and 81 deletions

View File

@ -127,7 +127,7 @@ class CNIRunner(object):
cni_ip = result.setdefault("ip%s" % ip.version, {})
cni_ip['ip'] = "%s/%s" % (ip, subnet.cidr.prefixlen)
if subnet.gateway:
if hasattr(subnet, 'gateway'):
cni_ip['gateway'] = str(subnet.gateway)
if subnet.routes.objects:

View File

@ -55,7 +55,7 @@ def _enable_ipv6(netns):
pyroute2.netns.setns(self_ns_fd)
def _configure_l3(vif, ifname, netns):
def _configure_l3(vif, ifname, netns, is_default_gateway):
with get_ipdb(netns).interfaces[ifname] as iface:
for subnet in vif.network.subnets.objects:
if subnet.cidr.version == 6:
@ -68,16 +68,16 @@ def _configure_l3(vif, ifname, netns):
for route in subnet.routes.objects:
routes.add(gateway=str(route.gateway),
dst=str(route.cidr)).commit()
if subnet.gateway:
if is_default_gateway and hasattr(subnet, 'gateway'):
routes.add(gateway=str(subnet.gateway),
dst='default').commit()
def connect(vif, instance_info, ifname, netns=None):
def connect(vif, instance_info, ifname, netns=None, is_default_gateway=True):
driver = _get_binding_driver(vif)
os_vif.plug(vif, instance_info)
driver.connect(vif, ifname, netns)
_configure_l3(vif, ifname, netns)
_configure_l3(vif, ifname, netns, is_default_gateway)
def disconnect(vif, instance_info, ifname, netns=None):

View File

@ -35,29 +35,46 @@ class CNIHandlerBase(k8s_base.ResourceEventHandler):
def __init__(self, cni, on_done):
self._cni = cni
self._callback = on_done
self._vif = None
self._vifs = {}
def on_present(self, pod):
vif = self._get_vif(pod)
vifs = self._get_vifs(pod)
if vif:
self.on_vif(pod, vif)
for ifname, vif in vifs.items():
self.on_vif(pod, vif, ifname)
self.maybe_callback(pod, vifs)
@abc.abstractmethod
def on_vif(self, pod, vif):
def maybe_callback(self, pod, vifs):
"""Called after all vifs have been processed
Should determine if the CNI is ready to call the callback
:param pod: dict containing Kubernetes Pod object
:param vifs: dict containing os_vif VIF objects and ifnames
"""
raise NotImplementedError()
# NOTE: all returns True for empty iterable
@abc.abstractmethod
def on_vif(self, pod, vif, ifname):
raise NotImplementedError()
def _get_vif(self, pod):
def _get_vifs(self, pod):
# TODO(ivc): same as VIFHandler._get_vif
try:
annotations = pod['metadata']['annotations']
vif_annotation = annotations[k_const.K8S_ANNOTATION_VIF]
except KeyError:
return None
vif_dict = jsonutils.loads(vif_annotation)
vif = obj_vif.vif.VIFBase.obj_from_primitive(vif_dict)
LOG.debug("Got VIF from annotation: %r", vif)
return vif
return {}
vifs_dict = jsonutils.loads(vif_annotation)
vifs_dict = {
ifname: obj_vif.vif.VIFBase.obj_from_primitive(vif)
for ifname, vif in vifs_dict.items()
}
LOG.debug("Got VIFs from annotation: %r", vifs_dict)
return vifs_dict
def _get_inst(self, pod):
return obj_vif.instance_info.InstanceInfo(
@ -69,25 +86,71 @@ class AddHandler(CNIHandlerBase):
def __init__(self, cni, on_done):
LOG.debug("AddHandler called with CNI env: %r", cni)
super(AddHandler, self).__init__(cni, on_done)
self._vif = None
def on_vif(self, pod, vif):
if not self._vif:
self._vif = vif.obj_clone()
self._vif.active = True
b_base.connect(self._vif, self._get_inst(pod),
self._cni.CNI_IFNAME, self._cni.CNI_NETNS)
def on_vif(self, pod, vif, ifname):
"""Called once for every vif of a Pod on every event.
if vif.active:
self._callback(vif)
If it is the first time we see this vif, plug it in. Otherwise check
activeness. If at least one vif in a Pod is not ready we assume the
whole Pod is not ready and wait for more Pod events.
:param pod: dict containing Kubernetes Pod object
:param vif: os_vif VIF object
:param ifname: string, name of the interfaces inside container
"""
if ifname not in self._vifs:
self._vifs[ifname] = vif
_vif = vif.obj_clone()
_vif.active = True
# set eth0's gateway as default
is_default_gateway = (ifname == self._cni.CNI_IFNAME)
b_base.connect(_vif, self._get_inst(pod),
ifname, self._cni.CNI_NETNS,
is_default_gateway)
def maybe_callback(self, pod, vifs):
    """Call the callback once every VIF of the Pod is active.

    Determines if CNI is ready to call the callback and stop watching
    for more events. For AddHandler the callback should be called if
    there is at least one VIF in the annotation and all the VIFs
    received are marked active.

    :param pod: dict containing Kubernetes Pod object
    :param vifs: dict containing os_vif VIF objects and ifnames
    """
    # bool(...) first so an empty mapping short-circuits to False
    # instead of leaking the dict itself as the "truth" value.
    all_vifs_active = bool(vifs) and all(vif.active
                                         for vif in vifs.values())
    if all_vifs_active:
        if self._cni.CNI_IFNAME in self._vifs:
            callback_vif = self._vifs[self._cni.CNI_IFNAME]
        else:
            # NOTE: dict.values() is a view and is not subscriptable
            # on Python 3, so values()[0] would raise TypeError;
            # next(iter(...)) picks an arbitrary VIF instead.
            callback_vif = next(iter(self._vifs.values()))
        LOG.debug("All VIFs are active, exiting. Will return %s",
                  callback_vif)
        self._callback(callback_vif)
    else:
        LOG.debug("Waiting for all vifs to become active")
class DelHandler(CNIHandlerBase):
def on_vif(self, pod, vif):
def on_vif(self, pod, vif, ifname):
b_base.disconnect(vif, self._get_inst(pod),
self._cni.CNI_IFNAME, self._cni.CNI_NETNS)
self._callback(vif)
def maybe_callback(self, pod, vifs):
    """Invoke the callback when the Pod carried at least one VIF.

    :param pod: dict containing Kubernetes Pod object
    :param vifs: dict containing os_vif VIF objects and ifnames
    """
    if not vifs:
        return
    self._callback(None)
class CNIPipeline(k_dis.EventPipeline):

View File

@ -42,3 +42,5 @@ VIF_POOL_POPULATE = '/populatePool'
VIF_POOL_FREE = '/freePool'
VIF_POOL_LIST = '/listPools'
VIF_POOL_SHOW = '/showPool'
DEFAULT_IFNAME = 'eth0'

View File

@ -33,7 +33,7 @@ class VIFHandler(k8s_base.ResourceEventHandler):
the CNI driver (that runs on 'kubelet' nodes) is responsible for providing
networking to Kubernetes pods. `VIFHandler` relies on a set of drivers
(which are responsible for managing Neutron resources) to define the VIF
object and pass it to the CNI driver in form of the Kubernetes pod
objects and pass them to the CNI driver in form of the Kubernetes pod
annotation.
"""
@ -58,38 +58,54 @@ class VIFHandler(k8s_base.ResourceEventHandler):
# where certain pods/namespaces/nodes can be managed by other
# networking solutions/CNI drivers.
return
vifs = self._get_vifs(pod)
vif = self._get_vif(pod)
if not vifs:
vifs = {}
if not vif:
project_id = self._drv_project.get_project(pod)
security_groups = self._drv_sg.get_security_groups(pod, project_id)
subnets = self._drv_subnets.get_subnets(pod, project_id)
vif = self._drv_vif_pool.request_vif(pod, project_id, subnets,
security_groups)
# NOTE(kzaitsev): There is currently no way to actually request
# multiple VIFs. However we're packing the main_vif 'eth0' in a
# dict here to facilitate future work in this area
main_vif = self._drv_vif_pool.request_vif(
pod, project_id, subnets, security_groups)
vifs[constants.DEFAULT_IFNAME] = main_vif
try:
self._set_vif(pod, vif)
self._set_vifs(pod, vifs)
except k_exc.K8sClientException as ex:
LOG.debug("Failed to set annotation: %s", ex)
# FIXME(ivc): improve granularity of K8sClient exceptions:
# only resourceVersion conflict should be ignored
self._drv_vif_pool.release_vif(pod, vif, project_id,
security_groups)
elif not vif.active:
self._drv_vif_pool.activate_vif(pod, vif)
self._set_vif(pod, vif)
for ifname, vif in vifs.items():
self._drv_for_vif(vif).release_vif(pod, vif, project_id,
security_groups)
else:
changed = False
for ifname, vif in vifs.items():
if not vif.active:
self._drv_for_vif(vif).activate_vif(pod, vif)
changed = True
if changed:
self._set_vifs(pod, vifs)
def on_deleted(self, pod):
if self._is_host_network(pod):
return
project_id = self._drv_project.get_project(pod)
security_groups = self._drv_sg.get_security_groups(pod, project_id)
vif = self._get_vif(pod)
vifs = self._get_vifs(pod)
for ifname, vif in vifs.items():
self._drv_for_vif(vif).release_vif(pod, vif, project_id,
security_groups)
if vif:
project_id = self._drv_project.get_project(pod)
security_groups = self._drv_sg.get_security_groups(pod, project_id)
self._drv_vif_pool.release_vif(pod, vif, project_id,
security_groups)
def _drv_for_vif(self, vif):
# TODO(kzaitsev): a better polymorphism is required here
return self._drv_vif_pool
@staticmethod
def _is_host_network(pod):
@ -104,29 +120,36 @@ class VIFHandler(k8s_base.ResourceEventHandler):
except KeyError:
return False
def _set_vif(self, pod, vif):
def _set_vifs(self, pod, vifs):
# TODO(ivc): extract annotation interactions
if vif is None:
LOG.debug("Removing VIF annotation: %r", vif)
if not vifs:
LOG.debug("Removing VIFs annotation: %r", vifs)
annotation = None
else:
vif.obj_reset_changes(recursive=True)
LOG.debug("Setting VIF annotation: %r", vif)
annotation = jsonutils.dumps(vif.obj_to_primitive(),
vifs_dict = {}
for ifname, vif in vifs.items():
vif.obj_reset_changes(recursive=True)
vifs_dict[ifname] = vif.obj_to_primitive()
annotation = jsonutils.dumps(vifs_dict,
sort_keys=True)
LOG.debug("Setting VIFs annotation: %r", annotation)
k8s = clients.get_kubernetes_client()
k8s.annotate(pod['metadata']['selfLink'],
{constants.K8S_ANNOTATION_VIF: annotation},
resource_version=pod['metadata']['resourceVersion'])
def _get_vif(self, pod):
def _get_vifs(self, pod):
# TODO(ivc): same as '_set_vif'
try:
annotations = pod['metadata']['annotations']
vif_annotation = annotations[constants.K8S_ANNOTATION_VIF]
except KeyError:
return None
vif_dict = jsonutils.loads(vif_annotation)
vif = obj_vif.vif.VIFBase.obj_from_primitive(vif_dict)
LOG.debug("Got VIF from annotation: %r", vif)
return vif
return {}
vif_annotation = jsonutils.loads(vif_annotation)
vifs = {
ifname: obj_vif.vif.VIFBase.obj_from_primitive(vif_obj) for
ifname, vif_obj in vif_annotation.items()
}
LOG.debug("Got VIFs from annotation: %r", vifs)
return vifs

View File

@ -33,6 +33,7 @@ class TestVIFHandler(test_base.TestCase):
self._vif = mock.Mock()
self._vif.active = True
self._vif_serialized = mock.sentinel.vif_serialized
self._vifs = {k_const.DEFAULT_IFNAME: self._vif}
self._pod_version = mock.sentinel.pod_version
self._pod_link = mock.sentinel.pod_link
@ -55,25 +56,28 @@ class TestVIFHandler(test_base.TestCase):
self._get_project = self._handler._drv_project.get_project
self._get_subnets = self._handler._drv_subnets.get_subnets
self._get_security_groups = self._handler._drv_sg.get_security_groups
self._set_vif_driver = self._handler._drv_vif_pool.set_vif_driver
self._set_vifs_driver = self._handler._drv_vif_pool.set_vif_driver
self._request_vif = self._handler._drv_vif_pool.request_vif
self._release_vif = self._handler._drv_vif_pool.release_vif
self._activate_vif = self._handler._drv_vif_pool.activate_vif
self._get_vif = self._handler._get_vif
self._set_vif = self._handler._set_vif
self._get_vifs = self._handler._get_vifs
self._set_vifs = self._handler._set_vifs
self._is_host_network = self._handler._is_host_network
self._is_pending_node = self._handler._is_pending_node
self._request_vif.return_value = self._vif
self._get_vif.return_value = self._vif
self._get_vifs.return_value = self._vifs
self._is_host_network.return_value = False
self._is_pending_node.return_value = True
self._get_project.return_value = self._project_id
self._get_subnets.return_value = self._subnets
self._get_security_groups.return_value = self._security_groups
self._set_vif_driver.return_value = mock.Mock(
self._set_vifs_driver.return_value = mock.Mock(
spec=drivers.PodVIFDriver)
self._handler._drv_for_vif = h_vif.VIFHandler._drv_for_vif.__get__(
self._handler)
@mock.patch.object(drivers.VIFPoolDriver, 'set_vif_driver')
@mock.patch.object(drivers.VIFPoolDriver, 'get_instance')
@mock.patch.object(drivers.PodVIFDriver, 'get_instance')
@ -82,7 +86,7 @@ class TestVIFHandler(test_base.TestCase):
@mock.patch.object(drivers.PodProjectDriver, 'get_instance')
def test_init(self, m_get_project_driver, m_get_subnets_driver,
m_get_sg_driver, m_get_vif_driver, m_get_vif_pool_driver,
m_set_vif_driver):
m_set_vifs_driver):
project_driver = mock.sentinel.project_driver
subnets_driver = mock.sentinel.subnets_driver
sg_driver = mock.sentinel.sg_driver
@ -131,62 +135,62 @@ class TestVIFHandler(test_base.TestCase):
def test_on_present(self):
h_vif.VIFHandler.on_present(self._handler, self._pod)
self._get_vif.assert_called_once_with(self._pod)
self._get_vifs.assert_called_once_with(self._pod)
self._request_vif.assert_not_called()
self._activate_vif.assert_not_called()
self._set_vif.assert_not_called()
self._set_vifs.assert_not_called()
def test_on_present_host_network(self):
self._is_host_network.return_value = True
h_vif.VIFHandler.on_present(self._handler, self._pod)
self._get_vif.assert_not_called()
self._get_vifs.assert_not_called()
self._request_vif.assert_not_called()
self._activate_vif.assert_not_called()
self._set_vif.assert_not_called()
self._set_vifs.assert_not_called()
def test_on_present_not_pending(self):
self._is_pending_node.return_value = False
h_vif.VIFHandler.on_present(self._handler, self._pod)
self._get_vif.assert_not_called()
self._get_vifs.assert_not_called()
self._request_vif.assert_not_called()
self._activate_vif.assert_not_called()
self._set_vif.assert_not_called()
self._set_vifs.assert_not_called()
def test_on_present_activate(self):
self._vif.active = False
h_vif.VIFHandler.on_present(self._handler, self._pod)
self._get_vif.assert_called_once_with(self._pod)
self._get_vifs.assert_called_once_with(self._pod)
self._activate_vif.assert_called_once_with(self._pod, self._vif)
self._set_vif.assert_called_once_with(self._pod, self._vif)
self._set_vifs.assert_called_once_with(self._pod, self._vifs)
self._request_vif.assert_not_called()
def test_on_present_create(self):
self._get_vif.return_value = None
self._get_vifs.return_value = {}
h_vif.VIFHandler.on_present(self._handler, self._pod)
self._get_vif.assert_called_once_with(self._pod)
self._get_vifs.assert_called_once_with(self._pod)
self._request_vif.assert_called_once_with(
self._pod, self._project_id, self._subnets, self._security_groups)
self._set_vif.assert_called_once_with(self._pod, self._vif)
self._set_vifs.assert_called_once_with(self._pod, self._vifs)
self._activate_vif.assert_not_called()
def test_on_present_rollback(self):
self._get_vif.return_value = None
self._set_vif.side_effect = k_exc.K8sClientException
self._get_vifs.return_value = {}
self._set_vifs.side_effect = k_exc.K8sClientException
h_vif.VIFHandler.on_present(self._handler, self._pod)
self._get_vif.assert_called_once_with(self._pod)
self._get_vifs.assert_called_once_with(self._pod)
self._request_vif.assert_called_once_with(
self._pod, self._project_id, self._subnets, self._security_groups)
self._set_vif.assert_called_once_with(self._pod, self._vif)
self._set_vifs.assert_called_once_with(self._pod, self._vifs)
self._release_vif.assert_called_once_with(self._pod, self._vif,
self._project_id,
self._security_groups)
@ -195,7 +199,7 @@ class TestVIFHandler(test_base.TestCase):
def test_on_deleted(self):
h_vif.VIFHandler.on_deleted(self._handler, self._pod)
self._get_vif.assert_called_once_with(self._pod)
self._get_vifs.assert_called_once_with(self._pod)
self._release_vif.assert_called_once_with(self._pod, self._vif,
self._project_id,
self._security_groups)
@ -205,13 +209,13 @@ class TestVIFHandler(test_base.TestCase):
h_vif.VIFHandler.on_deleted(self._handler, self._pod)
self._get_vif.assert_not_called()
self._get_vifs.assert_not_called()
self._release_vif.assert_not_called()
def test_on_deleted_no_annotation(self):
self._get_vif.return_value = None
self._get_vifs.return_value = {}
h_vif.VIFHandler.on_deleted(self._handler, self._pod)
self._get_vif.assert_called_once_with(self._pod)
self._get_vifs.assert_called_once_with(self._pod)
self._release_vif.assert_not_called()