CNI: Watch for deleted pods

It can happen that we get the CNI request, but the pod gets deleted
before kuryr-controller is able to create a KuryrPort for it. If
kuryr-daemon only watches for KuryrPort events, it will not notice
that and will wait until the timeout, which doesn't play well with
some K8s tests.

This commit adds a separate Service that watches Pod events. If a Pod
gets deleted, we put a sentinel value (None) into the registry so that
the thread waiting for the KuryrPort to appear there knows it has to
stop and raise an error.

Closes-Bug: 1963678
Change-Id: I52fc1805ec47f24c1da88fd13e79e928a3693419
Michał Dulko 2022-01-27 18:06:08 +01:00
parent d5f5db7005
commit 0176b4a98c
9 changed files with 269 additions and 208 deletions
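
To illustrate the mechanism, below is a minimal, self-contained sketch of the sentinel idea, assuming a plain dict for the registry and hypothetical names (on_pod_deleted, wait_for_kuryrport, PodGone) rather than the actual kuryr-kubernetes APIs: the Pod watcher stores None for a Pod that never got a KuryrPort, and the thread serving the CNI ADD request treats that sentinel as a signal to fail fast instead of waiting for the full timeout.

import time

DELETED_POD_SENTINEL = None  # value the Pod watcher stores on deletion


class PodGone(Exception):
    """Raised when the Pod was deleted before its KuryrPort appeared."""


registry = {}  # shared dict: pod name -> KuryrPort data or the sentinel


def on_pod_deleted(pod_name):
    # Pod watcher side: if no KuryrPort entry exists yet, leave the sentinel
    # so the waiting thread can give up immediately instead of timing out.
    if pod_name not in registry:
        registry[pod_name] = DELETED_POD_SENTINEL


def wait_for_kuryrport(pod_name, timeout=5, interval=0.1):
    # CNI ADD side: poll the registry until the KuryrPort shows up, the
    # sentinel appears, or the timeout is hit.
    deadline = time.time() + timeout
    while time.time() < deadline:
        if pod_name in registry:
            entry = registry[pod_name]
            if entry is DELETED_POD_SENTINEL:
                raise PodGone(f'Pod {pod_name} got deleted while waiting')
            return entry
        time.sleep(interval)
    raise TimeoutError(f'KuryrPort for {pod_name} did not appear')


if __name__ == '__main__':
    on_pod_deleted('default/foo')  # Pod deleted before any KuryrPort showed up
    try:
        wait_for_kuryrport('default/foo', timeout=1)
    except PodGone as exc:
        print(exc)  # fails fast instead of waiting out the timeout

In the actual change the registry is a multiprocessing manager dict shared between the watcher and server processes, and the waiting side also validates the Pod UID, but the sentinel contract is the same.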


@@ -21,7 +1,6 @@ import queue
import sys
import threading
import time
import urllib.parse
import urllib3
import cotyledon
@@ -31,27 +30,23 @@ from pyroute2.ipdb import transactional
from werkzeug import serving
import os_vif
from oslo_concurrency import lockutils
from oslo_config import cfg
from oslo_log import log as logging
from oslo_serialization import jsonutils
from kuryr_kubernetes import clients
from kuryr_kubernetes.cni import handlers as h_cni
from kuryr_kubernetes.cni.daemon import watcher_service
from kuryr_kubernetes.cni import health
from kuryr_kubernetes.cni.plugins import k8s_cni_registry
from kuryr_kubernetes.cni import prometheus_exporter
from kuryr_kubernetes.cni import utils as cni_utils
from kuryr_kubernetes import config
from kuryr_kubernetes import constants as k_const
from kuryr_kubernetes import exceptions
from kuryr_kubernetes import objects
from kuryr_kubernetes import utils
from kuryr_kubernetes import watcher as k_watcher
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
HEALTH_CHECKER_DELAY = 5
ErrContainerUnknown = 3
ErrInvalidEnvironmentVariables = 4
ErrTryAgainLater = 11
ErrInternal = 999
@@ -107,6 +102,10 @@ class DaemonServer(object):
try:
vif = self.plugin.add(params)
data = jsonutils.dumps(vif.obj_to_primitive())
except (exceptions.CNIPodGone, exceptions.CNIPodUidMismatch) as e:
LOG.warning('Pod deleted while processing ADD request')
error = self._error(ErrContainerUnknown, str(e))
return error, httplib.GONE, self.headers
except exceptions.CNITimeout as e:
LOG.exception('Timeout on ADD request')
error = self._error(ErrTryAgainLater, f"{e}. Try Again Later.")
@@ -247,89 +246,6 @@ class CNIDaemonServerService(cotyledon.Service):
self.server.stop()
class CNIDaemonWatcherService(cotyledon.Service):
name = "watcher"
def __init__(self, worker_id, registry, healthy):
super(CNIDaemonWatcherService, self).__init__(worker_id)
self.pipeline = None
self.watcher = None
self.health_thread = None
self.registry = registry
self.healthy = healthy
def run(self):
self.pipeline = h_cni.CNIPipeline()
self.pipeline.register(h_cni.CallbackHandler(self.on_done,
self.on_deleted))
self.watcher = k_watcher.Watcher(self.pipeline)
query_label = urllib.parse.quote_plus(f'{k_const.KURYRPORT_LABEL}='
f'{utils.get_nodename()}')
self.watcher.add(f'{k_const.K8S_API_CRD_KURYRPORTS}'
f'?labelSelector={query_label}')
self.is_running = True
self.health_thread = threading.Thread(
target=self._start_watcher_health_checker)
self.health_thread.start()
self.watcher.start()
def _start_watcher_health_checker(self):
while self.is_running:
if not self.watcher.is_alive():
LOG.debug("Reporting watcher not healthy.")
with self.healthy.get_lock():
self.healthy.value = False
time.sleep(HEALTH_CHECKER_DELAY)
def on_done(self, kuryrport, vifs):
kp_name = utils.get_res_unique_name(kuryrport)
with lockutils.lock(kp_name, external=True):
if (kp_name not in self.registry or
self.registry[kp_name]['kp']['metadata']['uid']
!= kuryrport['metadata']['uid']):
self.registry[kp_name] = {'kp': kuryrport,
'vifs': vifs,
'containerid': None,
'vif_unplugged': False,
'del_received': False}
else:
old_vifs = self.registry[kp_name]['vifs']
for iface in vifs:
if old_vifs[iface].active != vifs[iface].active:
kp_dict = self.registry[kp_name]
kp_dict['vifs'] = vifs
self.registry[kp_name] = kp_dict
def on_deleted(self, kp):
kp_name = utils.get_res_unique_name(kp)
try:
if kp_name in self.registry:
# NOTE(ndesh): We need to lock here to avoid race condition
# with the deletion code for CNI DEL so that
# we delete the registry entry exactly once
with lockutils.lock(kp_name, external=True):
if self.registry[kp_name]['vif_unplugged']:
del self.registry[kp_name]
else:
kp_dict = self.registry[kp_name]
kp_dict['del_received'] = True
self.registry[kp_name] = kp_dict
except KeyError:
# This means someone else removed it. It's odd but safe to ignore.
LOG.debug('KuryrPort %s entry already removed from registry while '
'handling DELETED event. Ignoring.', kp_name)
pass
def terminate(self):
self.is_running = False
if self.health_thread:
self.health_thread.join()
if self.watcher:
self.watcher.stop()
class CNIDaemonHealthServerService(cotyledon.Service):
name = "health"
@@ -393,7 +309,10 @@ class CNIDaemonServiceManager(cotyledon.ServiceManager):
registry = self.manager.dict() # For Watcher->Server communication.
healthy = multiprocessing.Value(c_bool, True)
metrics = self.manager.Queue()
self.add(CNIDaemonWatcherService, workers=1, args=(registry, healthy,))
self.add(watcher_service.KuryrPortWatcherService, workers=1,
args=(registry, healthy,))
self.add(watcher_service.PodWatcherService, workers=1,
args=(registry, healthy,))
self._server_service = self.add(CNIDaemonServerService, workers=1,
args=(registry, healthy, metrics,))
self.add(CNIDaemonHealthServerService, workers=1, args=(healthy,))


@@ -0,0 +1,94 @@
# Copyright 2022 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import threading
import time
import urllib.parse
import cotyledon
from oslo_config import cfg
from oslo_log import log as logging
from kuryr_kubernetes.cni import handlers
from kuryr_kubernetes import constants as k_const
from kuryr_kubernetes import utils
from kuryr_kubernetes import watcher as k_watcher
HEALTH_CHECKER_DELAY = 5
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
class BaseCNIDaemonWatcherService(cotyledon.Service):
name = "watcher"
def __init__(self, worker_id, handler, path, registry, healthy):
super().__init__(worker_id)
self.pipeline = None
self.watcher = None
self.health_thread = None
self.handler = handler
self.registry = registry
self.healthy = healthy
self.path = path
self.is_running = False
def run(self):
self.pipeline = handlers.CNIPipeline()
self.pipeline.register(self.handler)
self.watcher = k_watcher.Watcher(self.pipeline)
self.watcher.add(self.path)
self.is_running = True
self.health_thread = threading.Thread(
target=self._start_watcher_health_checker)
self.health_thread.start()
self.watcher.start()
def _start_watcher_health_checker(self):
while self.is_running:
if not self.watcher.is_alive():
LOG.warning(f"Reporting watcher {self.__class__.__name__} is "
f"not healthy because it's not running anymore.")
with self.healthy.get_lock():
self.healthy.value = False
time.sleep(HEALTH_CHECKER_DELAY)
def terminate(self):
self.is_running = False
if self.health_thread:
self.health_thread.join()
if self.watcher:
self.watcher.stop()
class KuryrPortWatcherService(BaseCNIDaemonWatcherService):
def __init__(self, worker_id, registry, healthy):
query_label = urllib.parse.quote_plus(f'{k_const.KURYRPORT_LABEL}='
f'{utils.get_nodename()}')
path = f'{k_const.K8S_API_CRD_KURYRPORTS}?labelSelector={query_label}'
handler = handlers.CNIKuryrPortHandler(registry)
super().__init__(worker_id, handler, path, registry, healthy)
class PodWatcherService(BaseCNIDaemonWatcherService):
def __init__(self, worker_id, registry, healthy):
query_label = urllib.parse.quote_plus(f'spec.nodeName='
f'{utils.get_nodename()}')
path = f'{k_const.K8S_API_PODS}?fieldSelector={query_label}'
handler = handlers.CNIPodHandler(registry)
super().__init__(worker_id, handler, path, registry, healthy)


@@ -13,105 +13,98 @@
# License for the specific language governing permissions and limitations
# under the License.
import abc
from os_vif import objects as obj_vif
from oslo_concurrency import lockutils
from oslo_log import log as logging
from kuryr_kubernetes import clients
from kuryr_kubernetes import constants as k_const
from kuryr_kubernetes import exceptions as k_exc
from kuryr_kubernetes.handlers import dispatch as k_dis
from kuryr_kubernetes.handlers import k8s_base
from kuryr_kubernetes import utils
LOG = logging.getLogger(__name__)
class CNIHandlerBase(k8s_base.ResourceEventHandler, metaclass=abc.ABCMeta):
class CNIKuryrPortHandler(k8s_base.ResourceEventHandler):
OBJECT_KIND = k_const.K8S_OBJ_KURYRPORT
def __init__(self, cni, on_done):
self._cni = cni
self._callback = on_done
self._vifs = {}
def __init__(self, registry):
super().__init__()
self.registry = registry
def on_present(self, pod, *args, **kwargs):
vifs = self._get_vifs(pod)
def on_vif(self, kuryrport, vifs):
kp_name = utils.get_res_unique_name(kuryrport)
with lockutils.lock(kp_name, external=True):
if (kp_name not in self.registry or
self.registry[kp_name] == k_const.CNI_DELETED_POD_SENTINEL
or self.registry[kp_name]['kp']['metadata']['uid'] !=
kuryrport['metadata']['uid']):
self.registry[kp_name] = {'kp': kuryrport,
'vifs': vifs,
'containerid': None,
'vif_unplugged': False,
'del_received': False}
else:
old_vifs = self.registry[kp_name]['vifs']
for iface in vifs:
if old_vifs[iface].active != vifs[iface].active:
kp_dict = self.registry[kp_name]
kp_dict['vifs'] = vifs
self.registry[kp_name] = kp_dict
if self.should_callback(pod, vifs):
self.callback()
@abc.abstractmethod
def should_callback(self, pod, vifs):
"""Called after all vifs have been processed
Should determine if the CNI is ready to call the callback
:param pod: dict containing Kubernetes Pod object
:param vifs: dict containing os_vif VIF objects and ifnames
:returns True/False
"""
raise NotImplementedError()
@abc.abstractmethod
def callback(self):
"""Called if should_callback returns True"""
raise NotImplementedError()
def _get_vifs(self, pod):
k8s = clients.get_kubernetes_client()
def on_deleted(self, kuryrport, *args, **kwargs):
kp_name = utils.get_res_unique_name(kuryrport)
try:
kuryrport_crd = k8s.get(f'{k_const.K8S_API_CRD_NAMESPACES}/'
f'{pod["metadata"]["namespace"]}/'
f'kuryrports/{pod["metadata"]["name"]}')
LOG.debug("Got CRD: %r", kuryrport_crd)
except k_exc.K8sClientException:
return {}
if (kp_name in self.registry and self.registry[kp_name]
!= k_const.CNI_DELETED_POD_SENTINEL):
# NOTE(ndesh): We need to lock here to avoid race condition
# with the deletion code for CNI DEL so that
# we delete the registry entry exactly once
with lockutils.lock(kp_name, external=True):
if self.registry[kp_name]['vif_unplugged']:
del self.registry[kp_name]
else:
kp_dict = self.registry[kp_name]
kp_dict['del_received'] = True
self.registry[kp_name] = kp_dict
except KeyError:
# This means someone else removed it. It's odd but safe to ignore.
LOG.debug('KuryrPort %s entry already removed from registry while '
'handling DELETED event. Ignoring.', kp_name)
pass
vifs_dict = {k: obj_vif.base.VersionedObject
.obj_from_primitive(v['vif'])
for k, v in kuryrport_crd['status']['vifs'].items()}
def on_present(self, kuryrport, *args, **kwargs):
LOG.debug('MODIFIED event for KuryrPort %s',
utils.get_res_unique_name(kuryrport))
vifs = self._get_vifs(kuryrport)
if vifs:
self.on_vif(kuryrport, vifs)
def _get_vifs(self, kuryrport):
vifs_dict = {
k: obj_vif.base.VersionedObject.obj_from_primitive(v['vif'])
for k, v in kuryrport['status']['vifs'].items()}
LOG.debug("Got vifs: %r", vifs_dict)
return vifs_dict
def _get_inst(self, pod):
return obj_vif.instance_info.InstanceInfo(
uuid=pod['metadata']['uid'], name=pod['metadata']['name'])
class CNIPodHandler(k8s_base.ResourceEventHandler):
OBJECT_KIND = k_const.K8S_OBJ_POD
class CallbackHandler(CNIHandlerBase):
def __init__(self, registry):
super().__init__()
self.registry = registry
def __init__(self, on_vif, on_del=None):
super(CallbackHandler, self).__init__(None, on_vif)
self._del_callback = on_del
self._kuryrport = None
self._callback_vifs = None
def should_callback(self, kuryrport, vifs):
"""Called after all vifs have been processed
Calls callback if there was at least one vif in the CRD
:param kuryrport: dict containing Kubernetes KuryrPort CRD object
:param vifs: dict containing os_vif VIF objects and ifnames
:returns True/False
"""
self._kuryrport = kuryrport
self._callback_vifs = vifs
if vifs:
return True
return False
def callback(self):
self._callback(self._kuryrport, self._callback_vifs)
def on_deleted(self, kuryrport, *args, **kwargs):
LOG.debug("Got kuryrport %s deletion event.",
kuryrport['metadata']['name'])
if self._del_callback:
self._del_callback(kuryrport)
def on_finalize(self, pod, *args, **kwargs):
# TODO(dulek): Verify if this is the handler for such case.
kp_name = utils.get_res_unique_name(pod)
with lockutils.lock(kp_name, external=True):
# If there was no KP and the Pod got deleted, we need to inform the
# thread waiting for it about that. We'll insert a sentinel value.
if kp_name not in self.registry:
self.registry[kp_name] = k_const.CNI_DELETED_POD_SENTINEL
class CNIPipeline(k_dis.EventPipeline):


@@ -31,15 +31,6 @@ LOG = logging.getLogger(__name__)
CONF = cfg.CONF
RETRY_DELAY = 1000 # 1 second in milliseconds
# TODO(dulek, gryf): Another corner case is (and was) when pod is deleted
# before it's corresponding CRD was created and populated by vifs by
# controller or even noticed by any watcher. Kubelet will try to delete such
# vif, but we will have no data about it. This is currently worked around by
# returning successfully in case of timing out in delete. To solve this
# properly we need to watch for pod deletes as well, or perhaps create
# finalizer for the pod as soon, as we know, that kuryrport CRD will be
# created.
class K8sCNIRegistryPlugin(base_cni.CNIPlugin):
def __init__(self, registry, healthy):
@@ -57,6 +48,8 @@ class K8sCNIRegistryPlugin(base_cni.CNIPlugin):
try:
return self.k8s.get(
f'{k_const.K8S_API_NAMESPACES}/{namespace}/pods/{name}')
except exceptions.K8sResourceNotFound:
return None
except exceptions.K8sClientException:
uniq_name = self._get_obj_name(params)
LOG.exception('Error when getting Pod %s', uniq_name)
@@ -72,6 +65,8 @@ class K8sCNIRegistryPlugin(base_cni.CNIPlugin):
if 'K8S_POD_UID' not in params.args:
# CRI doesn't pass K8S_POD_UID, get it from the API.
pod = self._get_pod(params)
if not pod:
raise exceptions.CNIPodGone(kp_name)
params.args.K8S_POD_UID = pod['metadata']['uid']
vifs = self._do_work(params, b_base.connect, timeout)
@@ -117,6 +112,14 @@ class K8sCNIRegistryPlugin(base_cni.CNIPlugin):
def delete(self, params):
kp_name = self._get_obj_name(params)
try:
with lockutils.lock(kp_name, external=True):
kp = self.registry[kp_name]
if kp == k_const.CNI_DELETED_POD_SENTINEL:
LOG.warning(
'Received DEL request for deleted Pod %s without a '
'KuryrPort. Ignoring.', kp_name)
del self.registry[kp_name]
return
reg_ci = self.registry[kp_name]['containerid']
LOG.debug('Read containerid = %s for KuryrPort %s', reg_ci,
kp_name)
@@ -179,6 +182,10 @@ class K8sCNIRegistryPlugin(base_cni.CNIPlugin):
e, (KeyError, exceptions.CNIPodUidMismatch)))
def find():
d = self.registry[kp_name]
if d == k_const.CNI_DELETED_POD_SENTINEL:
# Pod got deleted meanwhile
raise exceptions.CNIPodGone(kp_name)
static = d['kp']['spec'].get('podStatic', None)
uid = d['kp']['spec']['podUid']
# FIXME(dulek): This is weirdly structured for upgrades support.


@@ -16,6 +16,7 @@
KURYR_FQDN = 'kuryr.openstack.org'
K8S_API_BASE = '/api/v1'
K8S_API_PODS = K8S_API_BASE + '/pods'
K8S_API_NAMESPACES = K8S_API_BASE + '/namespaces'
K8S_API_CRD_VERSION = 'openstack.org/v1'
K8S_API_CRD = '/apis/' + K8S_API_CRD_VERSION
@@ -91,6 +92,7 @@ K8S_OS_VIF_NOOP_PLUGIN = "noop"
CNI_EXCEPTION_CODE = 100
CNI_TIMEOUT_CODE = 200
CNI_DELETED_POD_SENTINEL = None
KURYR_PORT_NAME = 'kuryr-pool-port'
KURYR_VIF_TYPE_SRIOV = 'sriov'


@@ -170,7 +170,7 @@ class CNIBindingFailure(Exception):
super(CNIBindingFailure, self).__init__(message)
class CNIPodUidMismatch(CNITimeout):
class CNIPodUidMismatch(Exception):
"""Excepton raised on a mismatch of CNI request's pod UID and KuryrPort"""
def __init__(self, name, expected, observed):
super().__init__(
@@ -179,6 +179,13 @@ class CNIPodUidMismatch(CNITimeout):
f' race conditions.')
class CNIPodGone(Exception):
"""Excepton raised when Pod got deleted while processing a CNI request"""
def __init__(self, name):
super().__init__(
f'Pod {name} got deleted while processing the CNI ADD request.')
class UnreachableOctavia(Exception):
"""Exception indicates Octavia API failure and can not be reached


@@ -197,8 +197,9 @@ class TestK8sCNIRegistryPlugin(base.TestCase):
is_default_gateway=False,
container_id='cont_id')
@mock.patch('oslo_concurrency.lockutils.lock')
@mock.patch('kuryr_kubernetes.cni.binding.base.disconnect')
def test_del_wrong_container_id(self, m_disconnect):
def test_del_wrong_container_id(self, m_disconnect, m_lock):
registry = {'default/foo': {'kp': self.kp, 'vifs': self.vifs,
'containerid': 'different'}}
healthy = mock.Mock()
@@ -206,6 +207,7 @@ class TestK8sCNIRegistryPlugin(base.TestCase):
self.plugin.delete(self.params)
m_disconnect.assert_not_called()
m_lock.assert_called_with('default/foo', external=True)
@mock.patch('oslo_concurrency.lockutils.lock')
@mock.patch('time.sleep', mock.Mock())


@@ -0,0 +1,68 @@
# Copyright 2021 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from unittest import mock
from kuryr_kubernetes.cni import handlers
from kuryr_kubernetes.tests import base
class TestCNIDaemonHandlers(base.TestCase):
def setUp(self):
super().setUp()
self.registry = {}
self.pod = {'metadata': {'namespace': 'testing',
'name': 'default'},
'vif_unplugged': False,
'del_received': False}
self.healthy = mock.Mock()
self.port_handler = handlers.CNIKuryrPortHandler(self.registry)
self.pod_handler = handlers.CNIPodHandler(self.registry)
@mock.patch('oslo_concurrency.lockutils.lock')
def test_kp_on_deleted(self, m_lock):
pod = self.pod
pod['vif_unplugged'] = True
pod_name = 'testing/default'
self.registry[pod_name] = pod
self.port_handler.on_deleted(pod)
self.assertNotIn(pod_name, self.registry)
@mock.patch('oslo_concurrency.lockutils.lock')
def test_kp_on_deleted_false(self, m_lock):
pod = self.pod
pod_name = 'testing/default'
self.registry[pod_name] = pod
self.port_handler.on_deleted(pod)
self.assertIn(pod_name, self.registry)
self.assertIs(True, pod['del_received'])
@mock.patch('oslo_concurrency.lockutils.lock')
def test_pod_on_finalize(self, m_lock):
pod = self.pod
pod_name = 'testing/default'
self.pod_handler.on_finalize(pod)
self.assertIn(pod_name, self.registry)
self.assertIsNone(self.registry[pod_name])
m_lock.assert_called_once_with(pod_name, external=True)
@mock.patch('oslo_concurrency.lockutils.lock')
def test_pod_on_finalize_exists(self, m_lock):
pod = self.pod
pod_name = 'testing/default'
self.registry[pod_name] = pod
self.pod_handler.on_finalize(pod)
self.assertIn(pod_name, self.registry)
self.assertIsNotNone(self.registry[pod_name])
m_lock.assert_called_once_with(pod_name, external=True)


@@ -107,34 +107,3 @@ class TestDaemonServer(base.TestCase):
m_delete.assert_called_once_with(mock.ANY)
self.assertEqual(500, resp.status_code)
class TestCNIDaemonWatcherService(base.TestCase):
def setUp(self):
super(TestCNIDaemonWatcherService, self).setUp()
self.registry = {}
self.pod = {'metadata': {'namespace': 'testing',
'name': 'default'},
'vif_unplugged': False,
'del_receieved': False}
self.healthy = mock.Mock()
self.watcher = service.CNIDaemonWatcherService(
0, self.registry, self.healthy)
@mock.patch('oslo_concurrency.lockutils.lock')
def test_on_deleted(self, m_lock):
pod = self.pod
pod['vif_unplugged'] = True
pod_name = 'testing/default'
self.registry[pod_name] = pod
self.watcher.on_deleted(pod)
self.assertNotIn(pod_name, self.registry)
@mock.patch('oslo_concurrency.lockutils.lock')
def test_on_deleted_false(self, m_lock):
pod = self.pod
pod_name = 'testing/default'
self.registry[pod_name] = pod
self.watcher.on_deleted(pod)
self.assertIn(pod_name, self.registry)
self.assertIs(True, pod['del_received'])