
Merge "Pod annotations to KuryrPort CRD."

changes/54/720454/18
Zuul committed 5 days ago (via Gerrit Code Review), parent commit cecd86b282
26 changed files with 1548 additions and 586 deletions
  1. +3 -3 .zuul.d/octavia.yaml
  2. +2 -2 .zuul.d/sdn.yaml
  3. +1 -0 devstack/lib/kuryr_kubernetes
  4. +1 -0 devstack/plugin.sh
  5. +1 -1 devstack/settings
  6. +48 -0 kubernetes_crds/kuryr_crds/kuryrport.yaml
  7. +33 -31 kuryr_kubernetes/cni/binding/dpdk.py
  8. +34 -44 kuryr_kubernetes/cni/daemon/service.py
  9. +24 -19 kuryr_kubernetes/cni/handlers.py
  10. +62 -65 kuryr_kubernetes/cni/plugins/k8s_cni_registry.py
  11. +10 -1 kuryr_kubernetes/constants.py
  12. +3 -3 kuryr_kubernetes/controller/drivers/neutron_vif.py
  13. +26 -20 kuryr_kubernetes/controller/drivers/utils.py
  14. +3 -11 kuryr_kubernetes/controller/drivers/vif_pool.py
  15. +267 -0 kuryr_kubernetes/controller/handlers/kuryrport.py
  16. +11 -5 kuryr_kubernetes/controller/handlers/pod_label.py
  17. +84 -166 kuryr_kubernetes/controller/handlers/vif.py
  18. +1 -1 kuryr_kubernetes/k8s_client.py
  19. +11 -10 kuryr_kubernetes/tests/unit/cni/plugins/test_k8s_cni_registry.py
  20. +3 -21 kuryr_kubernetes/tests/unit/controller/drivers/test_vif_pool.py
  21. +751 -0 kuryr_kubernetes/tests/unit/controller/handlers/test_kuryrport.py
  22. +9 -9 kuryr_kubernetes/tests/unit/controller/handlers/test_pod_label.py
  23. +144 -169 kuryr_kubernetes/tests/unit/controller/handlers/test_vif.py
  24. +14 -5 kuryr_kubernetes/utils.py
  25. +1 -0 setup.cfg
  26. +1 -0 tools/gate/copy_k8s_logs.sh

+3 -3 .zuul.d/octavia.yaml

@@ -99,7 +99,7 @@
vars:
devstack_localrc:
DOCKER_CGROUP_DRIVER: "systemd"
- KURYR_ENABLED_HANDLERS: vif,lb,lbaasspec,namespace,pod_label,policy,kuryrnetpolicy,kuryrnetwork
+ KURYR_ENABLED_HANDLERS: vif,lb,lbaasspec,namespace,pod_label,policy,kuryrnetpolicy,kuryrnetwork,kuryrport
KURYR_SG_DRIVER: policy
KURYR_SUBNET_DRIVER: namespace
devstack_services:
@@ -120,7 +120,7 @@
vars:
devstack_localrc:
KURYR_SUBNET_DRIVER: namespace
- KURYR_ENABLED_HANDLERS: vif,lb,lbaasspec,namespace,pod_label,policy,kuryrnetpolicy,kuryrnetwork
+ KURYR_ENABLED_HANDLERS: vif,lb,lbaasspec,namespace,pod_label,policy,kuryrnetpolicy,kuryrnetwork,kuryrport
KURYR_SG_DRIVER: policy
KURYR_USE_PORT_POOLS: true
KURYR_POD_VIF_DRIVER: neutron-vif
@@ -134,7 +134,7 @@
parent: kuryr-kubernetes-tempest-containerized
vars:
devstack_localrc:
- KURYR_ENABLED_HANDLERS: vif,lb,lbaasspec,namespace,pod_label,policy,kuryrnetpolicy,kuryrnetwork
+ KURYR_ENABLED_HANDLERS: vif,lb,lbaasspec,namespace,pod_label,policy,kuryrnetpolicy,kuryrnetwork,kuryrport
KURYR_SG_DRIVER: policy
KURYR_SUBNET_DRIVER: namespace



+2 -2 .zuul.d/sdn.yaml

@@ -98,7 +98,7 @@
KURYR_LB_ALGORITHM: SOURCE_IP_PORT
KURYR_SUBNET_DRIVER: namespace
KURYR_SG_DRIVER: policy
- KURYR_ENABLED_HANDLERS: vif,lb,lbaasspec,namespace,pod_label,policy,kuryrnetpolicy,kuryrnetwork
+ KURYR_ENABLED_HANDLERS: vif,lb,lbaasspec,namespace,pod_label,policy,kuryrnetpolicy,kuryrnetwork,kuryrport
voting: false

- job:
@@ -144,7 +144,7 @@
KURYR_ENFORCE_SG_RULES: false
KURYR_LB_ALGORITHM: SOURCE_IP_PORT
KURYR_HYPERKUBE_VERSION: v1.16.0
- KURYR_ENABLED_HANDLERS: vif,lb,lbaasspec,namespace,pod_label,policy,kuryrnetpolicy,kuryrnetwork
+ KURYR_ENABLED_HANDLERS: vif,lb,lbaasspec,namespace,pod_label,policy,kuryrnetpolicy,kuryrnetwork,kuryrport
KURYR_SG_DRIVER: policy
KURYR_SUBNET_DRIVER: namespace
KURYR_K8S_CONTAINERIZED_DEPLOYMENT: true


+1 -0 devstack/lib/kuryr_kubernetes

@@ -453,6 +453,7 @@ rules:
- kuryrnetworks
- kuryrnetpolicies
- kuryrloadbalancers
- kuryrports
- apiGroups: ["networking.k8s.io"]
resources:
- networkpolicies


+1 -0 devstack/plugin.sh

@@ -973,6 +973,7 @@ function update_tempest_conf_file {
fi
iniset $TEMPEST_CONFIG kuryr_kubernetes validate_crd True
iniset $TEMPEST_CONFIG kuryr_kubernetes kuryrnetworks True
iniset $TEMPEST_CONFIG kuryr_kubernetes kuryrports True
}

source $DEST/kuryr-kubernetes/devstack/lib/kuryr_kubernetes


+1 -1 devstack/settings

@@ -43,7 +43,7 @@ KURYR_K8S_API_LB_PORT=${KURYR_K8S_API_LB_PORT:-443}
KURYR_PORT_DEBUG=${KURYR_PORT_DEBUG:-True}
KURYR_SUBNET_DRIVER=${KURYR_SUBNET_DRIVER:-default}
KURYR_SG_DRIVER=${KURYR_SG_DRIVER:-default}
- KURYR_ENABLED_HANDLERS=${KURYR_ENABLED_HANDLERS:-vif,lb,lbaasspec}
+ KURYR_ENABLED_HANDLERS=${KURYR_ENABLED_HANDLERS:-vif,lb,lbaasspec,kuryrport}

# OpenShift
OPENSHIFT_BINARY_VERSION=${OPENSHIFT_BINARY_VERSION:-v3.11.0}


+48 -0 kubernetes_crds/kuryr_crds/kuryrport.yaml

@@ -0,0 +1,48 @@
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
name: kuryrports.openstack.org
spec:
group: openstack.org
scope: Namespaced
names:
plural: kuryrports
singular: kuryrport
kind: KuryrPort
shortNames:
- kp
versions:
- name: v1
served: true
storage: true
schema:
openAPIV3Schema:
type: object
properties:
spec:
type: object
required:
- podUid
- podNodeName
- vifs
properties:
podUid:
type: string
podNodeName:
type: string
vifs:
type: object
x-kubernetes-preserve-unknown-fields: true
additionalPrinterColumns:
- name: PodUID
type: string
description: Pod UID
jsonPath: .spec.podUid
- name: Nodename
type: string
description: Name of the node the corresponding pod lives in
jsonPath: .spec.podNodeName
- name: labels
type: string
description: Labels for the CRD
jsonPath: .metadata.labels

+33 -31 kuryr_kubernetes/cni/binding/dpdk.py

@@ -15,6 +15,7 @@

import os

from os_vif import objects
from oslo_config import cfg
from oslo_log import log as logging
from oslo_serialization import jsonutils
@@ -23,7 +24,6 @@ from kuryr_kubernetes import clients
from kuryr_kubernetes.cni.binding import base as b_base
from kuryr_kubernetes import constants
from kuryr_kubernetes.handlers import health
from kuryr_kubernetes import utils

from kuryr.lib._i18n import _

@@ -143,42 +143,46 @@ class DpdkDriver(health.HealthHandler, b_base.BaseBindingDriver):

def _set_vif(self, vif):
# TODO(ivc): extract annotation interactions
state, labels, resource_version = self._get_pod_details(
vifs, labels, resource_version, kp_link = self._get_pod_details(
vif.port_profile.selflink)
for ifname, vif_ex in state.vifs.items():
if vif.id == vif_ex.id:
state.vifs[ifname] = vif
for ifname, data in vifs.items():
if vif.id == data['vif'].id:
vifs[ifname]['vif'] = vif
break
self._set_pod_details(state, vif.port_profile.selflink, labels,
resource_version)
self._set_pod_details(vifs, vif.port_profile.selflink, labels,
resource_version, kp_link)

def _get_pod_details(self, selflink):
k8s = clients.get_kubernetes_client()
pod = k8s.get(selflink)
annotations = pod['metadata']['annotations']
resource_version = pod['metadata']['resourceVersion']
labels = pod['metadata'].get('labels')
kp = k8s.get(f'{constants.K8S_API_CRD_NAMESPACES}/'
f'{pod["metadata"]["namespace"]}/kuryrports/'
f'{pod["metadata"]["name"]}')

try:
annotations = annotations[constants.K8S_ANNOTATION_VIF]
state_annotation = jsonutils.loads(annotations)
state = utils.extract_pod_annotation(state_annotation)
except KeyError:
LOG.exception("No annotations %s", constants.K8S_ANNOTATION_VIF)
vifs = {k: {'default': v['default'],
'vif': objects.base.VersionedObject
.obj_from_primitive(v['vif'])}
for k, v in kp['spec']['vifs'].items()}
except (KeyError, AttributeError):
LOG.exception(f"No vifs found on KuryrPort: {kp}")
raise
except ValueError:
LOG.exception("Unable encode annotations")
raise
LOG.info("Got VIFs from annotation: %s", state.vifs)
return state, labels, resource_version
LOG.info(f"Got VIFs from Kuryrport: {vifs}")

def _set_pod_details(self, state, selflink, labels, resource_version):
if not state:
LOG.info("Removing VIFs annotation: %r", state)
annotation = None
else:
state_dict = state.obj_to_primitive()
annotation = jsonutils.dumps(state_dict, sort_keys=True)
LOG.info("Setting VIFs annotation: %r", annotation)
resource_version = pod['metadata']['resourceVersion']
labels = pod['metadata'].get('labels')
return vifs, labels, resource_version, kp['metadata']['selfLink']

def _set_pod_details(self, vifs, selflink, labels, resource_version,
kp_link):
k8s = clients.get_kubernetes_client()
if vifs:
spec = {k: {'default': v['default'],
'vif': v['vif'].obj_to_primitive()}
for k, v in vifs.items()}

LOG.info("Setting VIFs in KuryrPort %r", spec)
k8s.patch_crd('spec', kp_link, {'vifs': spec})

if not labels:
LOG.info("Removing Label annotation: %r", labels)
@@ -187,8 +191,6 @@ class DpdkDriver(health.HealthHandler, b_base.BaseBindingDriver):
labels_annotation = jsonutils.dumps(labels, sort_keys=True)
LOG.info("Setting Labels annotation: %r", labels_annotation)

k8s = clients.get_kubernetes_client()
k8s.annotate(selflink,
- {constants.K8S_ANNOTATION_VIF: annotation,
-  constants.K8S_ANNOTATION_LABEL: labels_annotation},
+ {constants.K8S_ANNOTATION_LABEL: labels_annotation},
resource_version=resource_version)
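
Both _get_pod_details and _set_pod_details above move VIFs in and out of the KuryrPort spec via os_vif's VersionedObject serialization. A minimal round-trip sketch; the VIF class and field values are illustrative, not taken from the change:

from os_vif import objects

objects.register_all()  # make the VIF classes known to the registry

vif = objects.vif.VIFOpenVSwitch(
    id='1bfbe57a-6e06-4b5e-8b4e-0d5b1e1c2d3f', active=False)
primitive = vif.obj_to_primitive()  # JSON-safe dict, what ends up under
                                    # spec.vifs[ifname]['vif']
restored = objects.base.VersionedObject.obj_from_primitive(primitive)
assert restored.id == vif.id and not restored.active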

+34 -44 kuryr_kubernetes/cni/daemon/service.py

@@ -20,14 +20,14 @@ import socket
import sys
import threading
import time
import urllib.parse
import urllib3

import cotyledon
import flask
from pyroute2.ipdb import transactional
import urllib3

import os_vif
from os_vif.objects import base
from oslo_concurrency import lockutils
from oslo_config import cfg
from oslo_log import log as logging
@@ -193,10 +193,12 @@ class CNIDaemonWatcherService(cotyledon.Service):
self.pipeline.register(h_cni.CallbackHandler(self.on_done,
self.on_deleted))
self.watcher = k_watcher.Watcher(self.pipeline)
self.watcher.add(
"%(base)s/pods?fieldSelector=spec.nodeName=%(node_name)s" % {
'base': k_const.K8S_API_BASE,
'node_name': self._get_nodename()})
query_label = urllib.parse.quote_plus(f'{k_const.KURYRPORT_LABEL}='
f'{self._get_nodename()}')

self.watcher.add(f'{k_const.K8S_API_CRD_KURYRPORTS}'
f'?labelSelector={query_label}')

self.is_running = True
self.health_thread = threading.Thread(
target=self._start_watcher_health_checker)
@@ -211,55 +213,43 @@ class CNIDaemonWatcherService(cotyledon.Service):
self.healthy.value = False
time.sleep(HEALTH_CHECKER_DELAY)

def on_done(self, pod, vifs):
pod_name = utils.get_pod_unique_name(pod)
vif_dict = {
ifname: vif.obj_to_primitive() for
ifname, vif in vifs.items()
}
# NOTE(dulek): We need a lock when modifying shared self.registry dict
# to prevent race conditions with other processes/threads.
with lockutils.lock(pod_name, external=True):
if (pod_name not in self.registry or
self.registry[pod_name]['pod']['metadata']['uid']
!= pod['metadata']['uid']):
self.registry[pod_name] = {'pod': pod, 'vifs': vif_dict,
'containerid': None,
'vif_unplugged': False,
'del_received': False}
def on_done(self, kuryrport, vifs):
kp_name = utils.get_res_unique_name(kuryrport)
with lockutils.lock(kp_name, external=True):
if (kp_name not in self.registry or
self.registry[kp_name]['kp']['metadata']['uid']
!= kuryrport['metadata']['uid']):
self.registry[kp_name] = {'kp': kuryrport,
'vifs': vifs,
'containerid': None,
'vif_unplugged': False,
'del_received': False}
else:
# NOTE(dulek): Only update vif if its status changed, we don't
# need to care about other changes now.
old_vifs = {
ifname:
base.VersionedObject.obj_from_primitive(vif_obj) for
ifname, vif_obj in (
self.registry[pod_name]['vifs'].items())
}
old_vifs = self.registry[kp_name]['vifs']
for iface in vifs:
if old_vifs[iface].active != vifs[iface].active:
pod_dict = self.registry[pod_name]
pod_dict['vifs'] = vif_dict
self.registry[pod_name] = pod_dict
kp_dict = self.registry[kp_name]
kp_dict['vifs'] = vifs
self.registry[kp_name] = kp_dict

def on_deleted(self, pod):
pod_name = utils.get_pod_unique_name(pod)
def on_deleted(self, kp):
kp_name = utils.get_res_unique_name(kp)
try:
if pod_name in self.registry:
if kp_name in self.registry:
# NOTE(ndesh): We need to lock here to avoid race condition
# with the deletion code for CNI DEL so that
# we delete the registry entry exactly once
with lockutils.lock(pod_name, external=True):
if self.registry[pod_name]['vif_unplugged']:
del self.registry[pod_name]
with lockutils.lock(kp_name, external=True):
if self.registry[kp_name]['vif_unplugged']:
del self.registry[kp_name]
else:
pod_dict = self.registry[pod_name]
pod_dict['del_received'] = True
self.registry[pod_name] = pod_dict
kp_dict = self.registry[kp_name]
kp_dict['del_received'] = True
self.registry[kp_name] = kp_dict
except KeyError:
# This means someone else removed it. It's odd but safe to ignore.
LOG.debug('Pod %s entry already removed from registry while '
'handling DELETED event. Ignoring.', pod_name)
LOG.debug('KuryrPort %s entry already removed from registry while '
'handling DELETED event. Ignoring.', kp_name)
pass

def terminate(self):
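
The watcher above switches from listing pods by spec.nodeName to watching KuryrPort objects labelled with the node name. A sketch of how the watch path is composed (the node name is a placeholder):

import urllib.parse

KURYRPORT_LABEL = 'kuryr.openstack.org/nodeName'  # from constants.py
node_name = 'worker-0'                            # hypothetical node

query_label = urllib.parse.quote_plus(f'{KURYRPORT_LABEL}={node_name}')
path = f'/apis/openstack.org/v1/kuryrports?labelSelector={query_label}'
# path == '/apis/openstack.org/v1/kuryrports'
#         '?labelSelector=kuryr.openstack.org%2FnodeName%3Dworker-0'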


+24 -19 kuryr_kubernetes/cni/handlers.py

@@ -17,18 +17,20 @@ import abc

from os_vif import objects as obj_vif
from oslo_log import log as logging
from oslo_serialization import jsonutils

from kuryr_kubernetes import clients
from kuryr_kubernetes import constants as k_const
from kuryr_kubernetes import exceptions as k_exc
from kuryr_kubernetes.handlers import dispatch as k_dis
from kuryr_kubernetes.handlers import k8s_base
from kuryr_kubernetes import utils


LOG = logging.getLogger(__name__)


class CNIHandlerBase(k8s_base.ResourceEventHandler, metaclass=abc.ABCMeta):
OBJECT_KIND = k_const.K8S_OBJ_POD
OBJECT_KIND = k_const.K8S_OBJ_KURYRPORT

def __init__(self, cni, on_done):
self._cni = cni
@@ -59,16 +61,18 @@ class CNIHandlerBase(k8s_base.ResourceEventHandler, metaclass=abc.ABCMeta):
raise NotImplementedError()

def _get_vifs(self, pod):
# TODO(ivc): same as VIFHandler._get_vif
k8s = clients.get_kubernetes_client()
try:
annotations = pod['metadata']['annotations']
state_annotation = annotations[k_const.K8S_ANNOTATION_VIF]
except KeyError:
kuryrport_crd = k8s.get(f'{k_const.K8S_API_CRD_NAMESPACES}/'
f'{pod["metadata"]["namespace"]}/'
f'kuryrports/{pod["metadata"]["name"]}')
LOG.debug("Got CRD: %r", kuryrport_crd)
except k_exc.K8sClientException:
return {}
state_annotation = jsonutils.loads(state_annotation)
state = utils.extract_pod_annotation(state_annotation)
vifs_dict = state.vifs
LOG.debug("Got VIFs from annotation: %r", vifs_dict)
vifs_dict = utils.get_vifs_from_crd(kuryrport_crd)
LOG.debug("Got vifs: %r", vifs_dict)
return vifs_dict

def _get_inst(self, pod):
@@ -81,31 +85,32 @@ class CallbackHandler(CNIHandlerBase):
def __init__(self, on_vif, on_del=None):
super(CallbackHandler, self).__init__(None, on_vif)
self._del_callback = on_del
self._pod = None
self._kuryrport = None
self._callback_vifs = None

def should_callback(self, pod, vifs):
def should_callback(self, kuryrport, vifs):
"""Called after all vifs have been processed

Calls callback if there was at least one vif in the Pod
Calls callback if there was at least one vif in the CRD

:param pod: dict containing Kubernetes Pod object
:param kuryrport: dict containing Kubernetes KuryrPort CRD object
:param vifs: dict containing os_vif VIF objects and ifnames
:returns True/False
"""
self._pod = pod
self._kuryrport = kuryrport
self._callback_vifs = vifs
if vifs:
return True
return False

def callback(self):
self._callback(self._pod, self._callback_vifs)
self._callback(self._kuryrport, self._callback_vifs)

def on_deleted(self, pod):
LOG.debug("Got pod %s deletion event.", pod['metadata']['name'])
def on_deleted(self, kuryrport):
LOG.debug("Got kuryrport %s deletion event.",
kuryrport['metadata']['name'])
if self._del_callback:
self._del_callback(pod)
self._del_callback(kuryrport)


class CNIPipeline(k_dis.EventPipeline):


+62 -65 kuryr_kubernetes/cni/plugins/k8s_cni_registry.py

@@ -15,7 +15,6 @@
import retrying

from os_vif import objects as obj_vif
from os_vif.objects import base
from oslo_concurrency import lockutils
from oslo_config import cfg
from oslo_log import log as logging
@@ -31,12 +30,14 @@ LOG = logging.getLogger(__name__)
CONF = cfg.CONF
RETRY_DELAY = 1000 # 1 second in milliseconds

# TODO(dulek): Another corner case is (and was) when pod is deleted before it's
# annotated by controller or even noticed by any watcher. Kubelet
# will try to delete such vif, but we will have no data about it.
# This is currently worked around by returning successfully in
# case of timing out in delete. To solve this properly we need
# to watch for pod deletes as well.
# TODO(dulek, gryf): Another corner case is (and was) when a pod is deleted
# before its corresponding CRD is created and populated with VIFs by the
# controller, or even noticed by any watcher. Kubelet will try to delete such
# a vif, but we will have no data about it. This is currently worked around
# by returning successfully in case of timing out in delete. To solve this
# properly we need to watch for pod deletes as well, or perhaps create a
# finalizer for the pod as soon as we know that the KuryrPort CRD will be
# created.


class K8sCNIRegistryPlugin(base_cni.CNIPlugin):
@@ -45,32 +46,32 @@ class K8sCNIRegistryPlugin(base_cni.CNIPlugin):
self.registry = registry
self.k8s = clients.get_kubernetes_client()

def _get_pod_name(self, params):
def _get_obj_name(self, params):
return "%(namespace)s/%(name)s" % {
'namespace': params.args.K8S_POD_NAMESPACE,
'name': params.args.K8S_POD_NAME}

def add(self, params):
pod_name = self._get_pod_name(params)
kp_name = self._get_obj_name(params)
timeout = CONF.cni_daemon.vif_annotation_timeout

# Try to confirm if pod in the registry is not stale cache. If it is,
# Try to confirm if CRD in the registry is not stale cache. If it is,
# remove it.
with lockutils.lock(pod_name, external=True):
if pod_name in self.registry:
cached_pod = self.registry[pod_name]['pod']
with lockutils.lock(kp_name, external=True):
if kp_name in self.registry:
cached_kp = self.registry[kp_name]['kp']
try:
pod = self.k8s.get(cached_pod['metadata']['selfLink'])
kp = self.k8s.get(cached_kp['metadata']['selfLink'])
except Exception:
LOG.exception('Error when getting pod %s', pod_name)
raise exceptions.ResourceNotReady(pod_name)
LOG.exception('Error when getting KuryrPort %s', kp_name)
raise exceptions.ResourceNotReady(kp_name)

if pod['metadata']['uid'] != cached_pod['metadata']['uid']:
LOG.warning('Stale pod %s detected in cache. (API '
if kp['metadata']['uid'] != cached_kp['metadata']['uid']:
LOG.warning('Stale KuryrPort %s detected in cache. (API '
'uid=%s, cached uid=%s). Removing it from '
'cache.', pod_name, pod['metadata']['uid'],
cached_pod['metadata']['uid'])
del self.registry[pod_name]
'cache.', kp_name, kp['metadata']['uid'],
cached_kp['metadata']['uid'])
del self.registry[kp_name]

vifs = self._do_work(params, b_base.connect, timeout)

@@ -78,70 +79,68 @@ class K8sCNIRegistryPlugin(base_cni.CNIPlugin):
# requests that we should ignore. We need a lock to
# prevent race conditions and replace whole object in the
# dict for multiprocessing.Manager to notice that.
with lockutils.lock(pod_name, external=True):
d = self.registry[pod_name]
with lockutils.lock(kp_name, external=True):
d = self.registry[kp_name]
d['containerid'] = params.CNI_CONTAINERID
self.registry[pod_name] = d
LOG.debug('Saved containerid = %s for pod %s',
params.CNI_CONTAINERID, pod_name)
self.registry[kp_name] = d
LOG.debug('Saved containerid = %s for CRD %s',
params.CNI_CONTAINERID, kp_name)

# Wait for timeout sec, 1 sec between tries, retry when even one
# vif is not active.
@retrying.retry(stop_max_delay=timeout * 1000, wait_fixed=RETRY_DELAY,
retry_on_result=utils.any_vif_inactive)
def wait_for_active(pod_name):
return {
ifname: base.VersionedObject.obj_from_primitive(vif_obj) for
ifname, vif_obj in self.registry[pod_name]['vifs'].items()
}
def wait_for_active(kp_name):
return self.registry[kp_name]['vifs']

vifs = wait_for_active(pod_name)
vifs = wait_for_active(kp_name)
for vif in vifs.values():
if not vif.active:
LOG.error("Timed out waiting for vifs to become active")
raise exceptions.ResourceNotReady(pod_name)
raise exceptions.ResourceNotReady(kp_name)

return vifs[k_const.DEFAULT_IFNAME]

def delete(self, params):
pod_name = self._get_pod_name(params)
kp_name = self._get_obj_name(params)
try:
reg_ci = self.registry[pod_name]['containerid']
LOG.debug('Read containerid = %s for pod %s', reg_ci, pod_name)
reg_ci = self.registry[kp_name]['containerid']
LOG.debug('Read containerid = %s for KuryrPort %s', reg_ci,
kp_name)
if reg_ci and reg_ci != params.CNI_CONTAINERID:
# NOTE(dulek): This is a DEL request for some older (probably
# failed) ADD call. We should ignore it or we'll
# unplug a running pod.
LOG.warning('Received DEL request for unknown ADD call for '
'pod %s (CNI_CONTAINERID=%s). Ignoring.', pod_name,
params.CNI_CONTAINERID)
'KuryrPort %s (CNI_CONTAINERID=%s). Ignoring.',
kp_name, params.CNI_CONTAINERID)
return
except KeyError:
pass

# Passing arbitrary 5 seconds as timeout, as it does not make any sense
# to wait on CNI DEL. If pod got deleted from API - VIF info is gone.
# If pod got the annotation removed - it is now gone too. The number's
# not 0, because we need to anticipate for restarts and delay before
# registry is populated by watcher.
# to wait on CNI DEL. If the KuryrPort got deleted from the API, the VIF
# info is gone. If the KuryrPort had its VIF info removed, it is gone too.
# The number's not 0 because we need to account for restarts and the delay
# before the registry is populated by the watcher.
self._do_work(params, b_base.disconnect, 5)

# NOTE(ndesh): We need to lock here to avoid race condition
# with the deletion code in the watcher to ensure that
# we delete the registry entry exactly once
try:
with lockutils.lock(pod_name, external=True):
if self.registry[pod_name]['del_received']:
del self.registry[pod_name]
with lockutils.lock(kp_name, external=True):
if self.registry[kp_name]['del_received']:
del self.registry[kp_name]
else:
pod_dict = self.registry[pod_name]
pod_dict['vif_unplugged'] = True
self.registry[pod_name] = pod_dict
kp_dict = self.registry[kp_name]
kp_dict['vif_unplugged'] = True
self.registry[kp_name] = kp_dict
except KeyError:
# This means the pod was removed before vif was unplugged. This
# shouldn't happen, but we can't do anything about it now
LOG.debug('Pod %s not found registry while handling DEL request. '
'Ignoring.', pod_name)
# This means the KuryrPort was removed before the vif was unplugged.
# This shouldn't happen, but we can't do anything about it now
LOG.debug('KuryrPort %s not found in registry while handling DEL '
'request. Ignoring.', kp_name)
pass

def report_drivers_health(self, driver_healthy):
@@ -151,25 +150,22 @@ class K8sCNIRegistryPlugin(base_cni.CNIPlugin):
self.healthy.value = driver_healthy

def _do_work(self, params, fn, timeout):
pod_name = self._get_pod_name(params)
kp_name = self._get_obj_name(params)

# In case of KeyError retry for `timeout` s, wait 1 s between tries.
@retrying.retry(stop_max_delay=timeout * 1000, wait_fixed=RETRY_DELAY,
retry_on_exception=lambda e: isinstance(e, KeyError))
def find():
return self.registry[pod_name]
return self.registry[kp_name]

try:
d = find()
pod = d['pod']
vifs = {
ifname: base.VersionedObject.obj_from_primitive(vif_obj) for
ifname, vif_obj in d['vifs'].items()
}
kp = d['kp']
vifs = d['vifs']
except KeyError:
LOG.error("Timed out waiting for requested pod to appear in "
LOG.error("Timed out waiting for requested KuryrPort to appear in "
"registry")
raise exceptions.ResourceNotReady(pod_name)
raise exceptions.ResourceNotReady(kp_name)

for ifname, vif in vifs.items():
is_default_gateway = (ifname == k_const.DEFAULT_IFNAME)
@@ -178,12 +174,13 @@ class K8sCNIRegistryPlugin(base_cni.CNIPlugin):
# use the ifname supplied in the CNI ADD request
ifname = params.CNI_IFNAME

fn(vif, self._get_inst(pod), ifname, params.CNI_NETNS,
fn(vif, self._get_inst(kp), ifname, params.CNI_NETNS,
report_health=self.report_drivers_health,
is_default_gateway=is_default_gateway,
container_id=params.CNI_CONTAINERID)
return vifs

def _get_inst(self, pod):
return obj_vif.instance_info.InstanceInfo(
uuid=pod['metadata']['uid'], name=pod['metadata']['name'])
def _get_inst(self, kp):
return (obj_vif.instance_info
.InstanceInfo(uuid=kp['spec']['podUid'],
name=kp['metadata']['name']))

+10 -1 kuryr_kubernetes/constants.py

@@ -13,14 +13,18 @@
# License for the specific language governing permissions and limitations
# under the License.

KURYR_FQDN = 'kuryr.openstack.org'

K8S_API_BASE = '/api/v1'
K8S_API_NAMESPACES = K8S_API_BASE + '/namespaces'
K8S_API_CRD = '/apis/openstack.org/v1'
K8S_API_CRD_VERSION = 'openstack.org/v1'
K8S_API_CRD = '/apis/' + K8S_API_CRD_VERSION
K8S_API_CRD_NAMESPACES = K8S_API_CRD + '/namespaces'
K8S_API_CRD_KURYRNETS = K8S_API_CRD + '/kuryrnets'
K8S_API_CRD_KURYRNETWORKS = K8S_API_CRD + '/kuryrnetworks'
K8S_API_CRD_KURYRNETPOLICIES = K8S_API_CRD + '/kuryrnetpolicies'
K8S_API_CRD_KURYRLOADBALANCERS = K8S_API_CRD + '/kuryrloadbalancers'
K8S_API_CRD_KURYRPORTS = K8S_API_CRD + '/kuryrports'
K8S_API_POLICIES = '/apis/networking.k8s.io/v1/networkpolicies'

K8S_API_NPWG_CRD = '/apis/k8s.cni.cncf.io/v1'
@@ -34,6 +38,7 @@ K8S_OBJ_KURYRNET = 'KuryrNet'
K8S_OBJ_KURYRNETWORK = 'KuryrNetwork'
K8S_OBJ_KURYRNETPOLICY = 'KuryrNetPolicy'
K8S_OBJ_KURYRLOADBALANCER = 'KuryrLoadBalancer'
K8S_OBJ_KURYRPORT = 'KuryrPort'

K8S_POD_STATUS_PENDING = 'Pending'
K8S_POD_STATUS_SUCCEEDED = 'Succeeded'
@@ -59,8 +64,12 @@ K8S_ANNOTATION_OLD_DRIVER = 'old_driver'
K8S_ANNOTATION_CURRENT_DRIVER = 'current_driver'
K8S_ANNOTATION_NEUTRON_PORT = 'neutron_id'

POD_FINALIZER = KURYR_FQDN + '/pod-finalizer'
KURYRNETWORK_FINALIZER = 'kuryrnetwork.finalizers.kuryr.openstack.org'

KURYRPORT_FINALIZER = KURYR_FQDN + '/kuryrport-finalizer'
KURYRPORT_LABEL = KURYR_FQDN + '/nodeName'

K8S_OS_VIF_NOOP_PLUGIN = "noop"

CNI_EXCEPTION_CODE = 100
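
Composed, the new constants yield the request paths used throughout this change. A small sketch (namespace and name are placeholders):

K8S_API_CRD_VERSION = 'openstack.org/v1'
K8S_API_CRD = '/apis/' + K8S_API_CRD_VERSION
K8S_API_CRD_KURYRPORTS = K8S_API_CRD + '/kuryrports'  # cluster-wide watch

# Per-object path, as KURYRPORT_URI builds it in the handlers:
ns, name = 'default', 'my-pod'
kp_uri = f'{K8S_API_CRD}/namespaces/{ns}/kuryrports/{name}'
# -> '/apis/openstack.org/v1/namespaces/default/kuryrports/my-pod'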


+3 -3 kuryr_kubernetes/controller/drivers/neutron_vif.py

@@ -95,10 +95,10 @@ class NeutronPodVIFDriver(base.PodVIFDriver):

def update_vif_sgs(self, pod, security_groups):
os_net = clients.get_network_client()
pod_state = utils.get_pod_state(pod)
if pod_state:
vifs = utils.get_vifs(pod)
if vifs:
# NOTE(ltomasbo): It just updates the default_vif security group
port_id = pod_state.vifs[constants.DEFAULT_IFNAME].id
port_id = vifs[constants.DEFAULT_IFNAME].id
os_net.update_port(port_id, security_groups=list(security_groups))

def _get_port_request(self, pod, project_id, subnets, security_groups,


+26 -20 kuryr_kubernetes/controller/drivers/utils.py

@@ -17,6 +17,7 @@ import urllib

import netaddr
from openstack import exceptions as os_exc
from os_vif import objects
from oslo_config import cfg
from oslo_log import log
from oslo_serialization import jsonutils
@@ -24,7 +25,6 @@ from oslo_serialization import jsonutils
from kuryr_kubernetes import clients
from kuryr_kubernetes import constants
from kuryr_kubernetes import exceptions as k_exc
from kuryr_kubernetes import utils


OPERATORS_WITH_VALUES = [constants.K8S_OPERATOR_IN,
@@ -59,15 +59,23 @@ def get_host_id(pod):
return pod['spec']['nodeName']


def get_pod_state(pod):
def get_kuryrport(pod):
k8s = clients.get_kubernetes_client()
try:
annotations = pod['metadata']['annotations']
state_annotation = annotations[constants.K8S_ANNOTATION_VIF]
except KeyError:
return k8s.get(f'{constants.K8S_API_CRD_NAMESPACES}/'
f'{pod["metadata"]["namespace"]}/kuryrports/'
f'{pod["metadata"]["name"]}')
except k_exc.K8sResourceNotFound:
return None
state_annotation = jsonutils.loads(state_annotation)
state = utils.extract_pod_annotation(state_annotation)
return state


def get_vifs(pod):
kp = get_kuryrport(pod)
try:
return {k: objects.base.VersionedObject.obj_from_primitive(v['vif'])
for k, v in kp['spec']['vifs'].items()}
except (KeyError, AttributeError, TypeError):
return {}


def is_host_network(pod):
@@ -274,19 +282,17 @@ def create_security_group_rule_body(

def get_pod_ip(pod):
try:
pod_metadata = pod['metadata']['annotations']
vif = pod_metadata[constants.K8S_ANNOTATION_VIF]
except KeyError:
kp = get_kuryrport(pod)
vif = [x['vif'] for x in kp['spec']['vifs'].values()
if x['default']][0]
except (KeyError, TypeError, IndexError):
return None
vif = jsonutils.loads(vif)
vif = vif['versioned_object.data']['default_vif']
network = (vif['versioned_object.data']['network']
['versioned_object.data'])
first_subnet = (network['subnets']['versioned_object.data']
['objects'][0]['versioned_object.data'])
first_subnet_ip = (first_subnet['ips']['versioned_object.data']
['objects'][0]['versioned_object.data']['address'])
return first_subnet_ip
return (vif['versioned_object.data']['network']
['versioned_object.data']['subnets']
['versioned_object.data']['objects'][0]
['versioned_object.data']['ips']
['versioned_object.data']['objects'][0]
['versioned_object.data']['address'])


def get_annotations(resource, annotation):


+3 -11 kuryr_kubernetes/controller/drivers/vif_pool.py

@@ -27,7 +27,6 @@ from oslo_concurrency import lockutils
from oslo_config import cfg as oslo_cfg
from oslo_log import log as logging
from oslo_log import versionutils
from oslo_serialization import jsonutils

from kuryr_kubernetes import clients
from kuryr_kubernetes import config
@@ -280,16 +279,9 @@ class BaseVIFPool(base.VIFPoolDriver, metaclass=abc.ABCMeta):
in_use_ports = []
running_pods = kubernetes.get(constants.K8S_API_BASE + '/pods')
for pod in running_pods['items']:
try:
annotations = jsonutils.loads(pod['metadata']['annotations'][
constants.K8S_ANNOTATION_VIF])
pod_state = utils.extract_pod_annotation(annotations)
except KeyError:
LOG.debug("Skipping pod without kuryr VIF annotation: %s",
pod)
else:
for vif in pod_state.vifs.values():
in_use_ports.append(vif.id)
vifs = c_utils.get_vifs(pod)
for data in vifs.values():
in_use_ports.append(data.id)
return in_use_ports

def list_pools(self):


+267 -0 kuryr_kubernetes/controller/handlers/kuryrport.py

@@ -0,0 +1,267 @@
# Copyright 2020 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from openstack import exceptions as os_exc
from os_vif import objects
from oslo_config import cfg as oslo_cfg
from oslo_log import log as logging

from kuryr_kubernetes import clients
from kuryr_kubernetes import constants
from kuryr_kubernetes.controller.drivers import base as drivers
from kuryr_kubernetes.controller.drivers import utils as driver_utils
from kuryr_kubernetes import exceptions as k_exc
from kuryr_kubernetes.handlers import k8s_base

LOG = logging.getLogger(__name__)
KURYRPORT_URI = constants.K8S_API_CRD_NAMESPACES + '/{ns}/kuryrports/{crd}'


class KuryrPortHandler(k8s_base.ResourceEventHandler):
"""Controller side of KuryrPort process for Kubernetes pods.

`KuryrPortHandler` runs on the Kuryr-Kubernetes controller and is
responsible for creating/removing the OpenStack resources associated with
newly created pods, namely ports, and for updating the KuryrPort CRD data.
"""
OBJECT_KIND = constants.K8S_OBJ_KURYRPORT
OBJECT_WATCH_PATH = constants.K8S_API_CRD_KURYRPORTS

def __init__(self):
super(KuryrPortHandler, self).__init__()
self._drv_project = drivers.PodProjectDriver.get_instance()
self._drv_subnets = drivers.PodSubnetsDriver.get_instance()
self._drv_sg = drivers.PodSecurityGroupsDriver.get_instance()
# REVISIT(ltomasbo): The VIF Handler should not be aware of the pool
# directly. Due to the lack of a mechanism to load and set the
# VIFHandler driver, for now it is aware of the pool driver, but this
# will be reverted as soon as a mechanism is in place.
self._drv_vif_pool = drivers.VIFPoolDriver.get_instance(
specific_driver='multi_pool')
self._drv_vif_pool.set_vif_driver()
self._drv_multi_vif = drivers.MultiVIFDriver.get_enabled_drivers()
if self._is_network_policy_enabled():
self._drv_lbaas = drivers.LBaaSDriver.get_instance()
self._drv_svc_sg = (drivers.ServiceSecurityGroupsDriver
.get_instance())
self.k8s = clients.get_kubernetes_client()

def on_present(self, kuryrport_crd):
if not kuryrport_crd['spec']['vifs']:
# Get vifs
if not self.get_vifs(kuryrport_crd):
# Ignore this event; the reason is one of the cases logged in the
# get_vifs method.
return

vifs = {ifname: {'default': data['default'],
'vif': objects.base.VersionedObject
.obj_from_primitive(data['vif'])}
for ifname, data in kuryrport_crd['spec']['vifs'].items()}

if all([v['vif'].active for v in vifs.values()]):
return

changed = False

try:
for ifname, data in vifs.items():
if (data['vif'].plugin == constants.KURYR_VIF_TYPE_SRIOV and
oslo_cfg.CONF.sriov.enable_node_annotations):
pod_node = kuryrport_crd['spec']['podNodeName']
# TODO(gryf): This will probably need adapting, so that it adds
# the information to the CRD instead of the pod.
driver_utils.update_port_pci_info(pod_node, data['vif'])
if not data['vif'].active:
try:
self._drv_vif_pool.activate_vif(data['vif'])
changed = True
except os_exc.ResourceNotFound:
LOG.debug("Port not found, possibly already deleted. "
"No need to activate it")
finally:
if changed:
try:
name = kuryrport_crd['metadata']['name']
namespace = kuryrport_crd['metadata']['namespace']
pod = self.k8s.get(f"{constants.K8S_API_NAMESPACES}"
f"/{namespace}/pods/{name}")
except k_exc.K8sResourceNotFound as ex:
LOG.exception("Failed to get pod: %s", ex)
raise

project_id = self._drv_project.get_project(pod)

try:
self._update_kuryrport_crd(kuryrport_crd, vifs)
except k_exc.K8sResourceNotFound as ex:
LOG.exception("Failed to update KuryrPort CRD: %s", ex)
security_groups = self._drv_sg.get_security_groups(
pod, project_id)
for ifname, data in vifs.items():
self._drv_vif_pool.release_vif(pod, data['vif'],
project_id,
security_groups)
except k_exc.K8sClientException:
raise k_exc.ResourceNotReady(pod['metadata']['name'])

if self._is_network_policy_enabled():
crd_pod_selectors = self._drv_sg.create_sg_rules(pod)
if oslo_cfg.CONF.octavia_defaults.enforce_sg_rules:
services = driver_utils.get_services()
self._update_services(services, crd_pod_selectors,
project_id)

def on_finalize(self, kuryrport_crd):
name = kuryrport_crd['metadata']['name']
namespace = kuryrport_crd['metadata']['namespace']
try:
pod = self.k8s.get(f"{constants.K8S_API_NAMESPACES}"
f"/{namespace}/pods/{name}")
except k_exc.K8sResourceNotFound as ex:
LOG.exception("Failed to get pod: %s", ex)
# TODO(gryf): Free resources
self.k8s.remove_finalizer(kuryrport_crd, constants.POD_FINALIZER)
raise

if (driver_utils.is_host_network(pod) or
not pod['spec'].get('nodeName')):
return

project_id = self._drv_project.get_project(pod)
try:
crd_pod_selectors = self._drv_sg.delete_sg_rules(pod)
except k_exc.ResourceNotReady:
# NOTE(ltomasbo): If the pod is being deleted before
# kuryr-controller annotated any information about the port
# associated, there is no need for deleting sg rules associated to
# it. So this exception could be safely ignored for the current
# sg drivers. Only the NP driver associates rules to the pods ips,
# and that waits for annotations to start.
#
# NOTE(gryf): perhaps we don't need to handle this case, since
# during CRD creation all the things, including security groups
# rules would be created too.
LOG.debug("Skipping SG rules deletion associated to the pod %s",
pod)
crd_pod_selectors = []
try:
security_groups = self._drv_sg.get_security_groups(pod, project_id)
except k_exc.ResourceNotReady:
# NOTE(ltomasbo): If the namespace object gets deleted first the
# namespace security group driver will raise a ResourceNotReady
# exception as it cannot access anymore the kuryrnetwork CRD
# annotated on the namespace object. In such case we set security
# groups to empty list so that if pools are enabled they will be
# properly released.
security_groups = []

for data in kuryrport_crd['spec']['vifs'].values():
vif = objects.base.VersionedObject.obj_from_primitive(data['vif'])
self._drv_vif_pool.release_vif(pod, vif, project_id,
security_groups)
if (self._is_network_policy_enabled() and crd_pod_selectors and
oslo_cfg.CONF.octavia_defaults.enforce_sg_rules):
services = driver_utils.get_services()
self._update_services(services, crd_pod_selectors, project_id)

# Remove finalizer out of pod.
self.k8s.remove_finalizer(pod, constants.POD_FINALIZER)

# Finally, remove finalizer from KuryrPort CRD
self.k8s.remove_finalizer(kuryrport_crd, constants.KURYRPORT_FINALIZER)

def get_vifs(self, kuryrport_crd):
try:
pod = self.k8s.get(f"{constants.K8S_API_NAMESPACES}"
f"/{kuryrport_crd['metadata']['namespace']}"
f"/pods"
f"/{kuryrport_crd['metadata']['name']}")
except k_exc.K8sResourceNotFound as ex:
LOG.exception("Failed to get pod: %s", ex)
# TODO(gryf): Release resources
self.k8s.remove_finalizer(kuryrport_crd,
constants.KURYRPORT_FINALIZER)
raise

project_id = self._drv_project.get_project(pod)
security_groups = self._drv_sg.get_security_groups(pod, project_id)
try:
subnets = self._drv_subnets.get_subnets(pod, project_id)
except (os_exc.ResourceNotFound, k_exc.K8sResourceNotFound):
LOG.warning("Subnet does not exists. If namespace driver is "
"used, probably the namespace for the pod is "
"already deleted. So this pod does not need to "
"get a port as it will be deleted too. If the "
"default subnet driver is used, then you must "
"select an existing subnet to be used by Kuryr.")
return False

# Request the default interface of pod
main_vif = self._drv_vif_pool.request_vif(
pod, project_id, subnets, security_groups)

if not main_vif:
pod_name = pod['metadata']['name']
LOG.warning("Ignoring event due to pod %s not being "
"scheduled yet.", pod_name)
return False

vifs = {constants.DEFAULT_IFNAME: {'default': True, 'vif': main_vif}}

# Request the additional interfaces from multiple drivers
index = 0
for driver in self._drv_multi_vif:
additional_vifs = driver.request_additional_vifs(pod, project_id,
security_groups)
for index, vif in enumerate(additional_vifs, start=index+1):
ifname = (oslo_cfg.CONF.kubernetes.additional_ifname_prefix +
str(index))
vifs[ifname] = {'default': False, 'vif': vif}

try:
self._update_kuryrport_crd(kuryrport_crd, vifs)
except k_exc.K8sClientException as ex:
LOG.exception("Kubernetes Client Exception creating "
"KuryrPort CRD: %s", ex)
for ifname, data in vifs.items():
self._drv_vif_pool.release_vif(pod, data['vif'],
project_id,
security_groups)
return True

def _update_kuryrport_crd(self, kuryrport_crd, vifs):
LOG.debug('Updating CRD %s', kuryrport_crd["metadata"]["name"])
spec = {}
for ifname, data in vifs.items():
data['vif'].obj_reset_changes(recursive=True)
spec[ifname] = {'default': data['default'],
'vif': data['vif'].obj_to_primitive()}

self.k8s.patch_crd('spec', kuryrport_crd['metadata']['selfLink'],
{'vifs': spec})

def _is_network_policy_enabled(self):
enabled_handlers = oslo_cfg.CONF.kubernetes.enabled_handlers
svc_sg_driver = oslo_cfg.CONF.kubernetes.service_security_groups_driver
return ('policy' in enabled_handlers and svc_sg_driver == 'policy')

def _update_services(self, services, crd_pod_selectors, project_id):
for service in services.get('items'):
if not driver_utils.service_matches_affected_pods(
service, crd_pod_selectors):
continue
sgs = self._drv_svc_sg.get_security_groups(service,
project_id)
self._drv_lbaas.update_lbaas_sg(service, sgs)

+11 -5 kuryr_kubernetes/controller/handlers/pod_label.py

@@ -47,11 +47,17 @@ class PodLabelHandler(k8s_base.ResourceEventHandler):
self._drv_lbaas = drivers.LBaaSDriver.get_instance()

def on_present(self, pod):
if driver_utils.is_host_network(pod) or not self._has_pod_state(pod):
if driver_utils.is_host_network(pod) or not self._has_vifs(pod):
# NOTE(ltomasbo): The event will be retried once the vif handler
# annotates the pod with the pod state.
return

if (constants.K8S_ANNOTATION_VIF in
pod['metadata'].get('annotations', {})):
# NOTE(dulek): This might happen on upgrade, we need to wait for
# annotation to be moved to KuryrPort CRD.
return

current_pod_labels = pod['metadata'].get('labels')
previous_pod_labels = self._get_pod_labels(pod)
LOG.debug("Got previous pod labels from annotation: %r",
@@ -97,11 +103,11 @@ class PodLabelHandler(k8s_base.ResourceEventHandler):
{constants.K8S_ANNOTATION_LABEL: annotation},
resource_version=pod['metadata']['resourceVersion'])

def _has_pod_state(self, pod):
def _has_vifs(self, pod):
try:
pod_state = pod['metadata']['annotations'][
constants.K8S_ANNOTATION_VIF]
LOG.debug("Pod state is: %s", pod_state)
kp = driver_utils.get_kuryrport(pod)
vifs = kp['spec']['vifs']
LOG.debug("Pod has an associated KuryrPort with vifs: %s", vifs)
except (KeyError, TypeError):
return False
return True


+84 -166 kuryr_kubernetes/controller/handlers/vif.py

@@ -13,7 +13,7 @@
# License for the specific language governing permissions and limitations
# under the License.

from openstack import exceptions as os_exc
from os_vif import objects
from oslo_config import cfg as oslo_cfg
from oslo_log import log as logging
from oslo_serialization import jsonutils
@@ -24,10 +24,10 @@ from kuryr_kubernetes.controller.drivers import base as drivers
from kuryr_kubernetes.controller.drivers import utils as driver_utils
from kuryr_kubernetes import exceptions as k_exc
from kuryr_kubernetes.handlers import k8s_base
from kuryr_kubernetes import objects
from kuryr_kubernetes import utils

LOG = logging.getLogger(__name__)
KURYRPORT_URI = constants.K8S_API_CRD_NAMESPACES + '/{ns}/kuryrports/{crd}'


class VIFHandler(k8s_base.ResourceEventHandler):
@@ -63,20 +63,16 @@ class VIFHandler(k8s_base.ResourceEventHandler):
drivers.ServiceSecurityGroupsDriver.get_instance())

def on_present(self, pod):
state = driver_utils.get_pod_state(pod)
if (self._is_pod_completed(pod)):
if state:
if self._move_annotations_to_crd(pod):
return

kp = driver_utils.get_kuryrport(pod)
if self._is_pod_completed(pod):
if kp:
LOG.debug("Pod has completed execution, removing the vifs")
self.on_deleted(pod)
try:
self._set_pod_state(pod, None)
except k_exc.K8sClientException:
LOG.exception("Could not clear pod annotation")
raise k_exc.ResourceNotReady(pod['metadata']['name'])
except k_exc.K8sResourceNotFound:
pass
self.on_finalize(pod)
else:
LOG.debug("Pod has completed execution, no annotation found."
LOG.debug("Pod has completed execution, no KuryrPort found."
" Skipping")
return

@@ -87,129 +83,31 @@ class VIFHandler(k8s_base.ResourceEventHandler):
# where certain pods/namespaces/nodes can be managed by other
# networking solutions/CNI drivers.
return
LOG.debug("Got VIFs from annotation: %r", state)
project_id = self._drv_project.get_project(pod)
security_groups = self._drv_sg.get_security_groups(pod, project_id)
if not state:
try:
subnets = self._drv_subnets.get_subnets(pod, project_id)
except (os_exc.ResourceNotFound, k_exc.K8sResourceNotFound):
LOG.warning("Subnet does not exists. If namespace driver is "
"used, probably the namespace for the pod is "
"already deleted. So this pod does not need to "
"get a port as it will be deleted too. If the "
"default subnet driver is used, then you must "
"select an existing subnet to be used by Kuryr.")
return
# Request the default interface of pod
main_vif = self._drv_vif_pool.request_vif(
pod, project_id, subnets, security_groups)

if not main_vif:
pod_name = pod['metadata']['name']
LOG.warning("Ignoring event due to pod %s not being "
"scheduled yet.", pod_name)
return

state = objects.vif.PodState(default_vif=main_vif)

# Request the additional interfaces from multiple dirvers
additional_vifs = []
for driver in self._drv_multi_vif:
additional_vifs.extend(
driver.request_additional_vifs(
pod, project_id, security_groups))
if additional_vifs:
state.additional_vifs = {}
for i, vif in enumerate(additional_vifs, start=1):
k = (oslo_cfg.CONF.kubernetes.additional_ifname_prefix
+ str(i))
state.additional_vifs[k] = vif

LOG.debug("Got KuryrPort: %r", kp)
if not kp:
try:
self._set_pod_state(pod, state)
self._add_kuryrport_crd(pod)
except k_exc.K8sClientException as ex:
LOG.debug("Failed to set annotation: %s", ex)
# FIXME(ivc): improve granularity of K8sClient exceptions:
# only resourceVersion conflict should be ignored
for ifname, vif in state.vifs.items():
self._drv_vif_pool.release_vif(pod, vif,
project_id,
security_groups)
else:
changed = False
try:
for ifname, vif in state.vifs.items():
if (vif.plugin == constants.KURYR_VIF_TYPE_SRIOV and
oslo_cfg.CONF.sriov.enable_node_annotations):
driver_utils.update_port_pci_info(pod, vif)
if not vif.active:
try:
self._drv_vif_pool.activate_vif(vif)
changed = True
except os_exc.ResourceNotFound:
LOG.debug("Port not found, possibly already "
"deleted. No need to activate it")
finally:
if changed:
try:
self._set_pod_state(pod, state)
except k_exc.K8sResourceNotFound as ex:
LOG.exception("Failed to set annotation: %s", ex)
for ifname, vif in state.vifs.items():
self._drv_vif_pool.release_vif(
pod, vif, project_id,
security_groups)
except k_exc.K8sClientException:
pod_name = pod['metadata']['name']
raise k_exc.ResourceNotReady(pod_name)
if self._is_network_policy_enabled():
crd_pod_selectors = self._drv_sg.create_sg_rules(pod)
if oslo_cfg.CONF.octavia_defaults.enforce_sg_rules:
services = driver_utils.get_services()
self._update_services(
services, crd_pod_selectors, project_id)
LOG.exception("Kubernetes Client Exception creating "
"KuryrPort CRD: %s", ex)
raise k_exc.ResourceNotReady(pod)

def on_deleted(self, pod):
if (driver_utils.is_host_network(pod) or
not pod['spec'].get('nodeName')):
return
k8s = clients.get_kubernetes_client()
k8s.add_finalizer(pod, constants.POD_FINALIZER)

project_id = self._drv_project.get_project(pod)
try:
crd_pod_selectors = self._drv_sg.delete_sg_rules(pod)
except k_exc.ResourceNotReady:
# NOTE(ltomasbo): If the pod is being deleted before
# kuryr-controller annotated any information about the port
# associated, there is no need for deleting sg rules associated to
# it. So this exception could be safetly ignored for the current
# sg drivers. Only the NP driver associates rules to the pods ips,
# and that waits for annotations to start.
LOG.debug("Pod was not yet annotated by Kuryr-controller. "
"Skipping SG rules deletion associated to the pod %s",
pod)
crd_pod_selectors = []
def on_finalize(self, pod):
k8s = clients.get_kubernetes_client()
try:
security_groups = self._drv_sg.get_security_groups(pod, project_id)
except k_exc.ResourceNotReady:
# NOTE(ltomasbo): If the namespace object gets deleted first the
# namespace security group driver will raise a ResourceNotReady
# exception as it cannot access anymore the kuryrnetwork CRD
# annotated on the namespace object. In such case we set security
# groups to empty list so that if pools are enabled they will be
# properly released.
security_groups = []
k8s.delete(KURYRPORT_URI.format(ns=pod["metadata"]["namespace"],
crd=pod["metadata"]["name"]))
except k_exc.K8sResourceNotFound:
k8s.remove_finalizer(pod, constants.POD_FINALIZER)

state = driver_utils.get_pod_state(pod)
LOG.debug("Got VIFs from annotation: %r", state)
if state:
for ifname, vif in state.vifs.items():
self._drv_vif_pool.release_vif(pod, vif, project_id,
security_groups)
if (self._is_network_policy_enabled() and crd_pod_selectors and
oslo_cfg.CONF.octavia_defaults.enforce_sg_rules):
services = driver_utils.get_services()
self._update_services(services, crd_pod_selectors, project_id)
except k_exc.K8sClientException:
LOG.exception("Could not remove KuryrPort CRD for pod %s.",
pod['metadata']['name'])
raise k_exc.ResourceNotReady(pod['metadata']['name'])

def is_ready(self, quota):
if (utils.has_limit(quota.ports) and
@@ -236,42 +134,6 @@ class VIFHandler(k8s_base.ResourceEventHandler):
except KeyError:
return False

def _set_pod_state(self, pod, state):
# TODO(ivc): extract annotation interactions
if not state:
old_annotation = pod['metadata'].get('annotations', {})
LOG.debug("Removing VIFs annotation: %r for pod %s/%s (uid: %s)",
old_annotation.get(constants.K8S_ANNOTATION_VIF),
pod['metadata']['namespace'], pod['metadata']['name'],
pod['metadata']['uid'])
annotation = None
else:
state_dict = state.obj_to_primitive()
annotation = jsonutils.dumps(state_dict, sort_keys=True)
LOG.debug("Setting VIFs annotation: %r for pod %s/%s (uid: %s)",
annotation, pod['metadata']['namespace'],
pod['metadata']['name'], pod['metadata']['uid'])

labels = pod['metadata'].get('labels')
if not labels:
LOG.debug("Removing Label annotation: %r", labels)
labels_annotation = None
else:
labels_annotation = jsonutils.dumps(labels, sort_keys=True)
LOG.debug("Setting Labels annotation: %r", labels_annotation)

# NOTE(dulek): We don't care about compatibility with Queens format
# here, as eventually all Kuryr services will be upgraded
# and cluster will start working normally. Meanwhile
# we just ignore issue of old services being unable to
# read new annotations.

k8s = clients.get_kubernetes_client()
k8s.annotate(pod['metadata']['selfLink'],
{constants.K8S_ANNOTATION_VIF: annotation,
constants.K8S_ANNOTATION_LABEL: labels_annotation},
resource_version=pod['metadata']['resourceVersion'])

def _update_services(self, services, crd_pod_selectors, project_id):
for service in services.get('items'):
if not driver_utils.service_matches_affected_pods(
@@ -285,3 +147,59 @@ class VIFHandler(k8s_base.ResourceEventHandler):
enabled_handlers = oslo_cfg.CONF.kubernetes.enabled_handlers
svc_sg_driver = oslo_cfg.CONF.kubernetes.service_security_groups_driver
return ('policy' in enabled_handlers and svc_sg_driver == 'policy')

def _add_kuryrport_crd(self, pod, vifs=None):
LOG.debug('Adding CRD %s', pod["metadata"]["name"])

if not vifs:
vifs = {}

kuryr_port = {
'apiVersion': constants.K8S_API_CRD_VERSION,
'kind': constants.K8S_OBJ_KURYRPORT,
'metadata': {
'name': pod['metadata']['name'],
'finalizers': [constants.KURYRPORT_FINALIZER],
'labels': {
constants.KURYRPORT_LABEL: pod['spec']['nodeName']
}
},
'spec': {
'podUid': pod['metadata']['uid'],
'podNodeName': pod['spec']['nodeName'],
'vifs': vifs
}
}

k8s = clients.get_kubernetes_client()
k8s.post(KURYRPORT_URI.format(ns=pod["metadata"]["namespace"],
crd=''), kuryr_port)

def _move_annotations_to_crd(self, pod):
"""Support upgrade from annotations to KuryrPort CRD."""
try:
state = (pod['metadata']['annotations']
[constants.K8S_ANNOTATION_VIF])
except KeyError:
return False

_dict = jsonutils.loads(state)
state = objects.base.VersionedObject.obj_from_primitive(_dict)

vifs = {ifname: {'default': state.default_vif == vif,
'vif': objects.base.VersionedObject
.obj_to_primitive(vif)}
for ifname, vif in state.vifs.items()}

try:
self._add_kuryrport_crd(pod, vifs)
except k_exc.K8sClientException as ex:
LOG.exception("Kubernetes Client Exception recreating "
"KuryrPort CRD from annotation: %s", ex)
raise k_exc.ResourceNotReady(pod)

k8s = clients.get_kubernetes_client()
k8s.remove_annotations(pod['metadata']['selfLink'],
constants.K8S_ANNOTATION_VIF)

return True

+1 -1 kuryr_kubernetes/k8s_client.py

@@ -316,7 +316,7 @@ class K8sClient(object):
headers=header, cert=self.cert,
verify=self.verify_server)
if response.ok:
- return response.json()['metadata']['annotations']
+ return response.json()['metadata'].get('annotations', {})
if response.status_code == requests.codes.conflict:
resource = self.get(path)
new_version = resource['metadata']['resourceVersion']


+11 -10 kuryr_kubernetes/tests/unit/cni/plugins/test_k8s_cni_registry.py

@@ -29,10 +29,11 @@ class TestK8sCNIRegistryPlugin(base.TestCase):
self.k8s_mock = self.useFixture(kuryr_fixtures.MockK8sClient()).client
self.default_iface = 'baz'
self.additional_iface = 'eth1'
self.pod = {'metadata': {'name': 'foo', 'uid': 'bar',
'namespace': 'default', 'selfLink': 'baz'}}
self.vifs = fake._fake_vifs_dict()
registry = {'default/foo': {'pod': self.pod, 'vifs': self.vifs,
self.kp = {'metadata': {'name': 'foo', 'uid': 'bar',
'namespace': 'default', 'selfLink': 'baz'},
'spec': {'podUid': 'bar'}}
self.vifs = fake._fake_vifs()
registry = {'default/foo': {'kp': self.kp, 'vifs': self.vifs,
'containerid': None,
'vif_unplugged': False,
'del_received': False}}
@@ -46,7 +47,7 @@ class TestK8sCNIRegistryPlugin(base.TestCase):
@mock.patch('oslo_concurrency.lockutils.lock')
@mock.patch('kuryr_kubernetes.cni.binding.base.connect')
def test_add_present(self, m_connect, m_lock):
self.k8s_mock.get.return_value = self.pod
self.k8s_mock.get.return_value = self.kp

self.plugin.add(self.params)

@@ -99,7 +100,7 @@ class TestK8sCNIRegistryPlugin(base.TestCase):

@mock.patch('kuryr_kubernetes.cni.binding.base.disconnect')
def test_del_wrong_container_id(self, m_disconnect):
registry = {'default/foo': {'pod': self.pod, 'vifs': self.vifs,
registry = {'default/foo': {'kp': self.kp, 'vifs': self.vifs,
'containerid': 'different'}}
healthy = mock.Mock()
self.plugin = k8s_cni_registry.K8sCNIRegistryPlugin(registry, healthy)
@@ -112,11 +113,11 @@ class TestK8sCNIRegistryPlugin(base.TestCase):
@mock.patch('kuryr_kubernetes.cni.binding.base.connect')
def test_add_present_on_5_try(self, m_connect, m_lock):
se = [KeyError] * 5
se.append({'pod': self.pod, 'vifs': self.vifs, 'containerid': None,
se.append({'kp': self.kp, 'vifs': self.vifs, 'containerid': None,
'vif_unplugged': False, 'del_received': False})
se.append({'pod': self.pod, 'vifs': self.vifs, 'containerid': None,
se.append({'kp': self.kp, 'vifs': self.vifs, 'containerid': None,
'vif_unplugged': False, 'del_received': False})
se.append({'pod': self.pod, 'vifs': self.vifs, 'containerid': None,
se.append({'kp': self.kp, 'vifs': self.vifs, 'containerid': None,
'vif_unplugged': False, 'del_received': False})
m_getitem = mock.Mock(side_effect=se)
m_setitem = mock.Mock()
@@ -127,7 +128,7 @@ class TestK8sCNIRegistryPlugin(base.TestCase):

m_lock.assert_called_with('default/foo', external=True)
m_setitem.assert_called_once_with('default/foo',
{'pod': self.pod,
{'kp': self.kp,
'vifs': self.vifs,
'containerid': 'cont_id',
'vif_unplugged': False,


+3 -21 kuryr_kubernetes/tests/unit/controller/drivers/test_vif_pool.py

@@ -20,7 +20,6 @@ import ddt
import munch
from openstack import exceptions as os_exc
from oslo_config import cfg as oslo_cfg
from oslo_serialization import jsonutils

from os_vif.objects import vif as osv_vif

@@ -29,7 +28,6 @@ from kuryr_kubernetes.controller.drivers import nested_vlan_vif
from kuryr_kubernetes.controller.drivers import neutron_vif
from kuryr_kubernetes.controller.drivers import vif_pool
from kuryr_kubernetes import exceptions
from kuryr_kubernetes.objects import vif
from kuryr_kubernetes import os_vif_util as ovu
from kuryr_kubernetes.tests import base as test_base
from kuryr_kubernetes.tests import fake
@@ -276,7 +274,8 @@ class BaseVIFPool(test_base.TestCase):

m_driver._return_ports_to_pool.assert_not_called()

def test__get_in_use_ports(self):
@mock.patch('kuryr_kubernetes.controller.drivers.utils.get_vifs')
def test__get_in_use_ports(self, get_vifs):
cls = vif_pool.BaseVIFPool
m_driver = mock.MagicMock(spec=cls)

@@ -284,10 +283,7 @@ class BaseVIFPool(test_base.TestCase):
pod = get_pod_obj()
port_id = str(uuid.uuid4())
pod_vif = osv_vif.VIFBase(id=port_id)
pod_state = vif.PodState(default_vif=pod_vif)

pod['metadata']['annotations'][constants.K8S_ANNOTATION_VIF] = (
jsonutils.dumps(pod_state.obj_to_primitive()))
get_vifs.return_value = {'eth0': pod_vif}
items = [pod]
kubernetes.get.return_value = {'items': items}

@@ -295,20 +291,6 @@ class BaseVIFPool(test_base.TestCase):

self.assertEqual(resp, [port_id])

def test__get_in_use_ports_exception(self):
cls = vif_pool.BaseVIFPool
m_driver = mock.MagicMock(spec=cls)

kubernetes = self.useFixture(k_fix.MockK8sClient()).client
pod = get_pod_obj()
del pod['metadata']['annotations'][constants.K8S_ANNOTATION_VIF]
items = [pod]
kubernetes.get.return_value = {'items': items}

resp = cls._get_in_use_ports(m_driver)

self.assertEqual(resp, [])

def test__get_in_use_ports_empty(self):
cls = vif_pool.BaseVIFPool
m_driver = mock.MagicMock(spec=cls)


+751 -0 kuryr_kubernetes/tests/unit/controller/handlers/test_kuryrport.py

@@ -0,0 +1,751 @@
# Copyright (c) 2020 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

from unittest import mock

from openstack import exceptions as os_exc
from os_vif import objects as os_obj
from oslo_config import cfg

from kuryr_kubernetes import constants
from kuryr_kubernetes.controller.drivers import multi_vif
from kuryr_kubernetes.controller.handlers import kuryrport
from kuryr_kubernetes import exceptions as k_exc
from kuryr_kubernetes.tests import base as test_base


CONF = cfg.CONF


class TestKuryrPortHandler(test_base.TestCase):

def setUp(self):
super().setUp()
self._project_id = mock.sentinel.project_id
self._subnets = mock.sentinel.subnets
self._security_groups = mock.sentinel.security_groups
self._host = mock.sentinel.hostname
self._pod_version = mock.sentinel.pod_version
self._pod_link = mock.sentinel.pod_link
self._kp_version = mock.sentinel.kp_version
self._kp_link = mock.sentinel.kp_link
self._kp_namespace = mock.sentinel.namespace
self._kp_uid = mock.sentinel.kp_uid
self._kp_name = 'pod1'

self._pod = {'metadata': {'resourceVersion': self._pod_version,
'selfLink': self._pod_link,
'name': self._kp_name,
'namespace': self._kp_namespace},
'spec': {'nodeName': self._host}}

self._kp = {
'metadata': {
'resourceVersion': self._kp_version,
'selfLink': self._kp_link,
'name': self._kp_name,
'namespace': self._kp_namespace,
'labels': {
constants.KURYRPORT_LABEL: self._host
}
},
'spec': {
'podUid': 'deadbeef',
'podNodeName': self._host,
'vifs': {}
}
}
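# Minimal KuryrPort CRD fixture: spec['vifs'] starts empty and is
# populated by the handler under test.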
self._vif1 = os_obj.vif.VIFBase()
self._vif2 = os_obj.vif.VIFBase()
self._vif1.active = False
self._vif2.active = False
self._vif1.plugin = 'object'
self._vif2.plugin = 'object'
self._vif1_primitive = self._vif1.obj_to_primitive()
self._vif2_primitive = self._vif2.obj_to_primitive()
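# The CRD spec stores VIFs serialized with obj_to_primitive(); self._vifs
# below mirrors the same mapping with live os-vif objects.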
self._vifs_primitive = {'eth0': {'default': True,
'vif': self._vif1_primitive},
'eth1': {'default': False,
'vif': self._vif2_primitive}}
self._vifs = {'eth0': {'default': True,
'vif': self._vif1},
'eth1': {'default': False,
'vif': self._vif2}}
self._pod_uri = (f"{constants.K8S_API_NAMESPACES}"
f"/{self._kp['metadata']['namespace']}/pods/"
f"{self._kp['metadata']['name']}")
self._driver = multi_vif.NoopMultiVIFDriver()

@mock.patch('kuryr_kubernetes.controller.handlers.kuryrport.'
'KuryrPortHandler.get_vifs')
@mock.patch('kuryr_kubernetes.clients.get_kubernetes_client')
@mock.patch('kuryr_kubernetes.controller.drivers.base.MultiVIFDriver.'
'get_enabled_drivers')
def test_on_present_no_vifs_create(self, ged, get_k8s_client, get_vifs):
ged.return_value = [self._driver]
kp = kuryrport.KuryrPortHandler()
get_vifs.return_value = True

kp.on_present(self._kp)

get_vifs.assert_called_once_with(self._kp)

@mock.patch('kuryr_kubernetes.controller.handlers.kuryrport.'
'KuryrPortHandler.get_vifs')
@mock.patch('kuryr_kubernetes.clients.get_kubernetes_client')
@mock.patch('kuryr_kubernetes.controller.drivers.base.MultiVIFDriver.'
'get_enabled_drivers')
def test_on_present_getting_vifs_failed(self, ged, get_k8s_client,
get_vifs):
ged.return_value = [self._driver]
kp = kuryrport.KuryrPortHandler()
get_vifs.return_value = False

self.assertFalse(kp.on_present(self._kp))

get_vifs.assert_called_once_with(self._kp)

@mock.patch('kuryr_kubernetes.controller.drivers.default_project.'
'DefaultPodProjectDriver.get_project')
@mock.patch('kuryr_kubernetes.controller.handlers.kuryrport.'
'KuryrPortHandler._update_kuryrport_crd')
@mock.patch('kuryr_kubernetes.controller.drivers.vif_pool.MultiVIFPool.'
'activate_vif')
@mock.patch('kuryr_kubernetes.clients.get_kubernetes_client')
@mock.patch('kuryr_kubernetes.controller.drivers.base.MultiVIFDriver.'
'get_enabled_drivers')
def test_on_present(self, ged, get_k8s_client, activate_vif,
update_crd, get_project):
ged.return_value = [mock.MagicMock()]
kp = kuryrport.KuryrPortHandler()
self._kp['spec']['vifs'] = self._vifs_primitive
get_project.return_value = self._project_id

with mock.patch.object(kp, 'k8s') as k8s:
k8s.get.return_value = self._pod

kp.on_present(self._kp)

k8s.get.assert_called_once_with(self._pod_uri)

activate_vif.assert_has_calls([mock.call(self._vif1),
mock.call(self._vif2)])
update_crd.assert_called_once_with(self._kp, self._vifs)

@mock.patch('kuryr_kubernetes.clients.get_kubernetes_client')
@mock.patch('kuryr_kubernetes.controller.drivers.base.MultiVIFDriver.'
'get_enabled_drivers')
def test_on_present_active(self, ged, get_k8s_client):
ged.return_value = [self._driver]
kp = kuryrport.KuryrPortHandler()
self._vif1.active = True
self._vif2.active = True
self._kp['spec']['vifs'] = {
'eth0': {'default': True,
'vif': self._vif1.obj_to_primitive()},
'eth1': {'default': False,
'vif': self._vif2.obj_to_primitive()}}

kp.on_present(self._kp)

@mock.patch('kuryr_kubernetes.controller.handlers.kuryrport.'
'KuryrPortHandler._update_kuryrport_crd')
@mock.patch('kuryr_kubernetes.controller.drivers.vif_pool.MultiVIFPool.'
'activate_vif')
@mock.patch('kuryr_kubernetes.clients.get_kubernetes_client')
@mock.patch('kuryr_kubernetes.controller.drivers.base.MultiVIFDriver.'
'get_enabled_drivers')
def test_on_present_port_not_found(self, ged, get_k8s_client, activate_vif,
update_crd):
ged.return_value = [self._driver]
kp = kuryrport.KuryrPortHandler()
self._kp['spec']['vifs'] = self._vifs_primitive
activate_vif.side_effect = os_exc.ResourceNotFound()
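# A vanished Neutron port should not break the handler: on_present() is
# expected to swallow the ResourceNotFound raised by activate_vif().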

kp.on_present(self._kp)

activate_vif.assert_has_calls([mock.call(self._vif1),
mock.call(self._vif2)])

@mock.patch('kuryr_kubernetes.controller.drivers.vif_pool.MultiVIFPool.'
'activate_vif')
@mock.patch('kuryr_kubernetes.clients.get_kubernetes_client')
@mock.patch('kuryr_kubernetes.controller.drivers.base.MultiVIFDriver.'
'get_enabled_drivers')
def test_on_present_pod_not_found(self, ged, get_k8s_client, activate_vif):
ged.return_value = [self._driver]
kp = kuryrport.KuryrPortHandler()
self._kp['spec']['vifs'] = self._vifs_primitive

with mock.patch.object(kp, 'k8s') as k8s:
k8s.get.side_effect = k_exc.K8sResourceNotFound(self._pod)

self.assertRaises(k_exc.K8sResourceNotFound, kp.on_present,
self._kp)

k8s.get.assert_called_once_with(self._pod_uri)

@mock.patch('kuryr_kubernetes.controller.drivers.vif_pool.MultiVIFPool.'
'release_vif')
@mock.patch('kuryr_kubernetes.controller.drivers.default_security_groups.'
'DefaultPodSecurityGroupsDriver.get_security_groups')
@mock.patch('kuryr_kubernetes.controller.drivers.default_project.'
'DefaultPodProjectDriver.get_project')
@mock.patch('kuryr_kubernetes.controller.handlers.kuryrport.'
'KuryrPortHandler._update_kuryrport_crd')
@mock.patch('kuryr_kubernetes.controller.drivers.vif_pool.MultiVIFPool.'
'activate_vif')
@mock.patch('kuryr_kubernetes.clients.get_kubernetes_client')
@mock.patch('kuryr_kubernetes.controller.drivers.base.MultiVIFDriver.'
'get_enabled_drivers')
def test_on_present_fail_update_crd(self, ged, get_k8s_client,
activate_vif, update_crd, get_project,
get_sg, release_vif):
ged.return_value = [self._driver]
kp = kuryrport.KuryrPortHandler()
self._kp['spec']['vifs'] = self._vifs_primitive
update_crd.side_effect = k_exc.K8sResourceNotFound(self._kp)
get_project.return_value = self._project_id
get_sg.return_value = self._security_groups

with mock.patch.object(kp, 'k8s') as k8s:
k8s.get.return_value = self._pod

kp.on_present(self._kp)

k8s.get.assert_called_once_with(self._pod_uri)

@mock.patch('kuryr_kubernetes.controller.drivers.vif_pool.MultiVIFPool.'
'release_vif')
@mock.patch('kuryr_kubernetes.controller.drivers.default_security_groups.'
'DefaultPodSecurityGroupsDriver.get_security_groups')
@mock.patch('kuryr_kubernetes.controller.drivers.default_project.'
'DefaultPodProjectDriver.get_project')
@mock.patch('kuryr_kubernetes.controller.handlers.kuryrport.'
'KuryrPortHandler._update_kuryrport_crd')
@mock.patch('kuryr_kubernetes.controller.drivers.vif_pool.MultiVIFPool.'
'activate_vif')
@mock.patch('kuryr_kubernetes.clients.get_kubernetes_client')
@mock.patch('kuryr_kubernetes.controller.drivers.base.MultiVIFDriver.'
'get_enabled_drivers')
def test_on_present_exception_during_update_crd(self, ged, get_k8s_client,
activate_vif,
update_crd, get_project,
get_sg, release_vif):
ged.return_value = [self._driver]
kp = kuryrport.KuryrPortHandler()
self._kp['spec']['vifs'] = self._vifs_primitive
update_crd.side_effect = k_exc.K8sClientException()
get_project.return_value = self._project_id
get_sg.return_value = self._security_groups

with mock.patch.object(kp, 'k8s') as k8s:
k8s.get.return_value = self._pod

self.assertRaises(k_exc.ResourceNotReady, kp.on_present, self._kp)

k8s.get.assert_called_once_with(self._pod_uri)

update_crd.assert_called_once_with(self._kp, self._vifs)

@mock.patch('kuryr_kubernetes.controller.drivers.vif_pool.MultiVIFPool.'
'activate_vif')
@mock.patch('kuryr_kubernetes.controller.drivers.utils.'
'update_port_pci_info')
@mock.patch('kuryr_kubernetes.clients.get_kubernetes_client')
@mock.patch('kuryr_kubernetes.controller.drivers.base.MultiVIFDriver.'
'get_enabled_drivers')
def test_on_present_sriov(self, ged, get_k8s_client, update_port_pci_info,
activate_vif):
ged.return_value = [self._driver]
kp = kuryrport.KuryrPortHandler()
self._vif2.plugin = constants.KURYR_VIF_TYPE_SRIOV
self._vif2.active = True
self._kp['spec']['vifs'] = {
'eth0': {'default': True,
'vif': self._vif2.obj_to_primitive()},
'eth1': {'default': False,
'vif': self._vif1.obj_to_primitive()}}
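# The SR-IOV VIF (self._vif2) is placed on eth0 so the handler must
# refresh its PCI info; node annotations are enabled below to allow that.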
CONF.set_override('enable_node_annotations', True, group='sriov')
self.addCleanup(CONF.clear_override, 'enable_node_annotations',
group='sriov')
activate_vif.side_effect = os_exc.ResourceNotFound()

kp.on_present(self._kp)

update_port_pci_info.assert_called_once_with(self._host, self._vif2)

@mock.patch('kuryr_kubernetes.controller.drivers.default_project.'
'DefaultPodProjectDriver.get_project')
@mock.patch('kuryr_kubernetes.controller.drivers.utils.get_services')
@mock.patch('kuryr_kubernetes.controller.handlers.kuryrport.'
'KuryrPortHandler._update_services')
@mock.patch('kuryr_kubernetes.controller.drivers.default_security_groups.'
'DefaultPodSecurityGroupsDriver.create_sg_rules')
@mock.patch('kuryr_kubernetes.controller.drivers.base.'
'ServiceSecurityGroupsDriver.get_instance')
@mock.patch('kuryr_kubernetes.controller.drivers.base.LBaaSDriver.'
'get_instance')
@mock.patch('kuryr_kubernetes.controller.handlers.kuryrport.'
'KuryrPortHandler._update_kuryrport_crd')
@mock.patch('kuryr_kubernetes.controller.drivers.vif_pool.MultiVIFPool.'
'activate_vif')
@mock.patch('kuryr_kubernetes.clients.get_kubernetes_client')
@mock.patch('kuryr_kubernetes.controller.handlers.kuryrport.'
'KuryrPortHandler._is_network_policy_enabled')
@mock.patch('kuryr_kubernetes.controller.drivers.base.MultiVIFDriver.'
'get_enabled_drivers')
def test_on_present_np(self, ged, is_np_enabled, get_k8s_client,
activate_vif, update_crd, get_lb_instance,
get_sg_instance, create_sgr, update_services,
get_services, get_project):
ged.return_value = [self._driver]
kp = kuryrport.KuryrPortHandler()
self._kp['spec']['vifs'] = self._vifs_primitive

with mock.patch.object(kp, 'k8s') as k8s:
k8s.get.return_value = self._pod

kp.on_present(self._kp)

k8s.get.assert_called_once_with(self._pod_uri)

activate_vif.assert_has_calls([mock.call(self._vif1),
mock.call(self._vif2)])
update_crd.assert_called_once_with(self._kp, self._vifs)
create_sgr.assert_called_once_with(self._pod)

@mock.patch('kuryr_kubernetes.clients.get_kubernetes_client')
@mock.patch('kuryr_kubernetes.controller.drivers.base.MultiVIFDriver.'
'get_enabled_drivers')
def test_on_finalize_exception_on_pod(self, ged, get_k8s_client):
ged.return_value = [self._driver]
kp = kuryrport.KuryrPortHandler()
self._kp['spec']['vifs'] = self._vifs_primitive

with mock.patch.object(kp, 'k8s') as k8s:
k8s.get.side_effect = k_exc.K8sResourceNotFound(self._pod)

self.assertRaises(k_exc.K8sResourceNotFound, kp.on_finalize,
self._kp)

k8s.get.assert_called_once_with(self._pod_uri)
k8s.remove_finalizer.assert_called_once_with(
self._kp, constants.POD_FINALIZER)

@mock.patch('kuryr_kubernetes.controller.drivers.utils.is_host_network')
@mock.patch('kuryr_kubernetes.clients.get_kubernetes_client')
@mock.patch('kuryr_kubernetes.controller.drivers.base.MultiVIFDriver.'
'get_enabled_drivers')
def test_on_finalize_host_net_or_no_nodename(self, ged, get_k8s_client,
is_host_network):
ged.return_value = [self._driver]
kp = kuryrport.KuryrPortHandler()
self._kp['spec']['vifs'] = self._vifs_primitive
is_host_network.return_value = False
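# NOTE: dict() makes only a shallow copy, so deleting 'nodeName' below also
# mutates self._pod['spec']; the assertion against self._pod relies on that.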
_pod = dict(self._pod)
del _pod['spec']['nodeName']
with mock.patch.object(kp, 'k8s') as k8s:
k8s.get.return_value = _pod

kp.on_finalize(self._kp)

k8s.get.assert_called_once_with(self._pod_uri)

is_host_network.assert_called_once_with(self._pod)
is_host_network.reset_mock()