Merge "CNI: Use K8S_POD_UID passed from CRI"

This commit is contained in:
Zuul 2022-03-30 10:41:25 +00:00 committed by Gerrit Code Review
commit c24002fe78
10 changed files with 193 additions and 39 deletions

View File

@ -29,6 +29,8 @@ spec:
type: string
podNodeName:
type: string
podStatic:
type: boolean
status:
type: object
required:

View File

@ -176,7 +176,12 @@ def disconnect(vif, instance_info, ifname, netns=None, report_health=None,
@cni_utils.log_ipdb
def cleanup(ifname, netns):
try:
with get_ipdb(netns) as c_ipdb:
if ifname in c_ipdb.interfaces:
with c_ipdb.interfaces[ifname] as iface:
iface.remove()
except Exception:
# Just ignore cleanup errors, there's not much we can do anyway.
LOG.warning('Error occured when attempting to clean up netns %s. '
'Ignoring.', netns)

View File

@ -153,9 +153,9 @@ class DaemonServer(object):
try:
self.plugin.delete(params)
except exceptions.CNIKuryrPortTimeout:
# NOTE(dulek): It's better to ignore this error - most of the time
# it will happen when pod is long gone and CRI
except (exceptions.CNIKuryrPortTimeout, exceptions.CNIPodUidMismatch):
# NOTE(dulek): It's better to ignore these errors - most of the
# time it will happen when pod is long gone and CRI
# overzealously tries to delete it from the network.
# We cannot really do anything without VIF annotation,
# so let's just tell CRI to move along.

View File

@ -48,31 +48,31 @@ class K8sCNIRegistryPlugin(base_cni.CNIPlugin):
self.k8s = clients.get_kubernetes_client()
def _get_obj_name(self, params):
return "%(namespace)s/%(name)s" % {
'namespace': params.args.K8S_POD_NAMESPACE,
'name': params.args.K8S_POD_NAME}
return f'{params.args.K8S_POD_NAMESPACE}/{params.args.K8S_POD_NAME}'
def _get_pod(self, params):
namespace = params.args.K8S_POD_NAMESPACE
name = params.args.K8S_POD_NAME
try:
return self.k8s.get(
f'{k_const.K8S_API_NAMESPACES}/{namespace}/pods/{name}')
except exceptions.K8sClientException:
uniq_name = self._get_obj_name(params)
LOG.exception('Error when getting Pod %s', uniq_name)
raise
def add(self, params):
kp_name = self._get_obj_name(params)
timeout = CONF.cni_daemon.vif_annotation_timeout
# Try to confirm if CRD in the registry is not stale cache. If it is,
# remove it.
with lockutils.lock(kp_name, external=True):
if kp_name in self.registry:
cached_kp = self.registry[kp_name]['kp']
try:
kp = self.k8s.get(k_utils.get_res_link(cached_kp))
except Exception:
LOG.exception('Error when getting KuryrPort %s', kp_name)
raise exceptions.ResourceNotReady(kp_name)
if kp['metadata']['uid'] != cached_kp['metadata']['uid']:
LOG.warning('Stale KuryrPort %s detected in cache. (API '
'uid=%s, cached uid=%s). Removing it from '
'cache.', kp_name, kp['metadata']['uid'],
cached_kp['metadata']['uid'])
del self.registry[kp_name]
# In order to fight race conditions when pods get recreated with the
# same name (think StatefulSet), we're trying to get pod UID either
# from the request or the API in order to use it as the ID to compare.
if 'K8S_POD_UID' not in params.args:
# CRI doesn't pass K8S_POD_UID, get it from the API.
pod = self._get_pod(params)
params.args.K8S_POD_UID = pod['metadata']['uid']
vifs = self._do_work(params, b_base.connect, timeout)
@ -138,7 +138,7 @@ class K8sCNIRegistryPlugin(base_cni.CNIPlugin):
# delay before registry is populated by watcher.
try:
self._do_work(params, b_base.disconnect, 5)
except exceptions.CNIKuryrPortTimeout:
except (exceptions.CNIKuryrPortTimeout, exceptions.CNIPodUidMismatch):
# So the VIF info seems to be lost at this point, we don't even
# know what binding driver was used to plug it. Let's at least
# try to remove the interface we created from the netns to prevent
@ -170,19 +170,42 @@ class K8sCNIRegistryPlugin(base_cni.CNIPlugin):
LOG.debug("Reporting CNI driver not healthy.")
self.healthy.value = driver_healthy
def _do_work(self, params, fn, timeout):
def _get_vifs_from_registry(self, params, timeout):
kp_name = self._get_obj_name(params)
# In case of KeyError retry for `timeout` s, wait 1 s between tries.
@retrying.retry(stop_max_delay=timeout * 1000, wait_fixed=RETRY_DELAY,
retry_on_exception=lambda e: isinstance(e, KeyError))
retry_on_exception=lambda e: isinstance(
e, (KeyError, exceptions.CNIPodUidMismatch)))
def find():
return self.registry[kp_name]
d = self.registry[kp_name]
static = d['kp']['spec'].get('podStatic', None)
uid = d['kp']['spec']['podUid']
# FIXME(dulek): This is weirdly structured for upgrades support.
# If podStatic is not set (KuryrPort created by old
# Kuryr version), then on uid mismatch we're fetching
# pod from API and check if it's static here. Pods
# are quite ephemeral, so will gradually get replaced
# after the upgrade and in a while all should have
# the field set and the performance penalty should
# be resolved. Remove in the future.
if 'K8S_POD_UID' in params.args and uid != params.args.K8S_POD_UID:
if static is None:
pod = self._get_pod(params)
static = k_utils.is_pod_static(pod)
# Static pods have mirror pod UID in API, so it's always
# mismatched. We don't raise in that case. See [1] for more.
# [1] https://github.com/k8snetworkplumbingwg/multus-cni/
# issues/773
if not static:
raise exceptions.CNIPodUidMismatch(
kp_name, params.args.K8S_POD_UID, uid)
return d
try:
d = find()
kp = d['kp']
vifs = d['vifs']
return d['kp'], d['vifs']
except KeyError:
data = {'metadata': {'name': params.args.K8S_POD_NAME,
'namespace': params.args.K8S_POD_NAMESPACE}}
@ -194,6 +217,9 @@ class K8sCNIRegistryPlugin(base_cni.CNIPlugin):
'kuryr-daemon')
raise exceptions.CNIKuryrPortTimeout(kp_name)
def _do_work(self, params, fn, timeout):
kp, vifs = self._get_vifs_from_registry(params, timeout)
for ifname, vif in vifs.items():
is_default_gateway = (ifname == k_const.DEFAULT_IFNAME)
if is_default_gateway:

View File

@ -59,6 +59,9 @@ class CNIArgs(object):
if not k.startswith('_'):
setattr(self, k, v)
def __contains__(self, key):
return hasattr(self, key)
class CNIParameters(object):
def __init__(self, env, cfg=None):

View File

@ -76,6 +76,7 @@ K8S_ANNOTATION_CURRENT_DRIVER = 'current_driver'
K8S_ANNOTATION_NEUTRON_PORT = 'neutron_id'
K8S_ANNOTATION_HEADLESS_SERVICE = 'service.kubernetes.io/headless'
K8S_ANNOTATION_CONFIG_SOURCE = 'kubernetes.io/config.source'
POD_FINALIZER = KURYR_FQDN + '/pod-finalizer'
KURYRNETWORK_FINALIZER = 'kuryrnetwork.finalizers.kuryr.openstack.org'

View File

@ -207,7 +207,8 @@ class VIFHandler(k8s_base.ResourceEventHandler):
},
'spec': {
'podUid': pod['metadata']['uid'],
'podNodeName': pod['spec']['nodeName']
'podNodeName': pod['spec']['nodeName'],
'podStatic': utils.is_pod_static(pod)
},
'status': {
'vifs': vifs

View File

@ -170,6 +170,15 @@ class CNIBindingFailure(Exception):
super(CNIBindingFailure, self).__init__(message)
class CNIPodUidMismatch(CNITimeout):
"""Excepton raised on a mismatch of CNI request's pod UID and KuryrPort"""
def __init__(self, name, expected, observed):
super().__init__(
f'uid {observed} of the pod {name} does not match the uid '
f'{expected} requested by the CNI. Dropping CNI request to prevent'
f' race conditions.')
class UnreachableOctavia(Exception):
"""Exception indicates Octavia API failure and can not be reached

View File

@ -17,6 +17,7 @@ from unittest import mock
from oslo_config import cfg
from kuryr_kubernetes.cni.plugins import k8s_cni_registry
from kuryr_kubernetes.cni import utils
from kuryr_kubernetes import exceptions
from kuryr_kubernetes.tests import base
from kuryr_kubernetes.tests import fake
@ -33,7 +34,7 @@ class TestK8sCNIRegistryPlugin(base.TestCase):
'kind': 'KuryrPort',
'metadata': {'name': 'foo', 'uid': 'bar',
'namespace': 'default'},
'spec': {'podUid': 'bar'}}
'spec': {'podUid': 'bar', 'podStatic': False}}
self.vifs = fake._fake_vifs()
registry = {'default/foo': {'kp': self.kp, 'vifs': self.vifs,
'containerid': None,
@ -41,8 +42,9 @@ class TestK8sCNIRegistryPlugin(base.TestCase):
'del_received': False}}
healthy = mock.Mock()
self.plugin = k8s_cni_registry.K8sCNIRegistryPlugin(registry, healthy)
self.params = mock.Mock(args=mock.Mock(K8S_POD_NAME='foo',
K8S_POD_NAMESPACE='default'),
self.params = mock.Mock(
args=utils.CNIArgs('K8S_POD_NAME=foo;K8S_POD_NAMESPACE=default;'
'K8S_POD_UID=bar'),
CNI_IFNAME=self.default_iface, CNI_NETNS=123,
CNI_CONTAINERID='cont_id')
@ -65,6 +67,101 @@ class TestK8sCNIRegistryPlugin(base.TestCase):
self.assertEqual('cont_id',
self.plugin.registry['default/foo']['containerid'])
@mock.patch('oslo_concurrency.lockutils.lock')
@mock.patch('kuryr_kubernetes.cni.binding.base.connect')
def test_add_no_uid(self, m_connect, m_lock):
self.k8s_mock.get.return_value = self.kp
self.params.args = utils.CNIArgs(
'K8S_POD_NAME=foo;K8S_POD_NAMESPACE=default')
self.plugin.add(self.params)
m_lock.assert_called_with('default/foo', external=True)
m_connect.assert_any_call(mock.ANY, mock.ANY, self.default_iface,
123, report_health=mock.ANY,
is_default_gateway=True,
container_id='cont_id')
m_connect.assert_any_call(mock.ANY, mock.ANY, self.additional_iface,
123, report_health=mock.ANY,
is_default_gateway=False,
container_id='cont_id')
self.k8s_mock.get.assert_any_call(
'/api/v1/namespaces/default/pods/foo')
self.assertEqual('cont_id',
self.plugin.registry['default/foo']['containerid'])
@mock.patch('kuryr_kubernetes.cni.binding.base.connect')
def test_add_wrong_uid(self, m_connect):
cfg.CONF.set_override('vif_annotation_timeout', 0, group='cni_daemon')
self.addCleanup(cfg.CONF.set_override, 'vif_annotation_timeout', 120,
group='cni_daemon')
self.k8s_mock.get.return_value = self.kp
self.params.args = utils.CNIArgs(
'K8S_POD_NAME=foo;K8S_POD_NAMESPACE=default;K8S_POD_UID=blob')
self.assertRaises(exceptions.CNIPodUidMismatch, self.plugin.add,
self.params)
m_connect.assert_not_called()
self.k8s_mock.get.assert_not_called()
@mock.patch('oslo_concurrency.lockutils.lock')
@mock.patch('kuryr_kubernetes.cni.binding.base.connect')
def test_add_wrong_uid_static(self, m_connect, m_lock):
cfg.CONF.set_override('vif_annotation_timeout', 0, group='cni_daemon')
self.addCleanup(cfg.CONF.set_override, 'vif_annotation_timeout', 120,
group='cni_daemon')
self.k8s_mock.get.return_value = self.kp
self.params.args = utils.CNIArgs(
'K8S_POD_NAME=foo;K8S_POD_NAMESPACE=default;K8S_POD_UID=blob')
self.kp['spec']['podStatic'] = True
self.plugin.add(self.params)
m_lock.assert_called_with('default/foo', external=True)
m_connect.assert_any_call(mock.ANY, mock.ANY, self.default_iface,
123, report_health=mock.ANY,
is_default_gateway=True,
container_id='cont_id')
m_connect.assert_any_call(mock.ANY, mock.ANY, self.additional_iface,
123, report_health=mock.ANY,
is_default_gateway=False,
container_id='cont_id')
self.k8s_mock.get.assert_any_call(
'/api/v1/namespaces/default/pods/foo')
self.assertEqual('cont_id',
self.plugin.registry['default/foo']['containerid'])
@mock.patch('oslo_concurrency.lockutils.lock')
@mock.patch('kuryr_kubernetes.cni.binding.base.connect')
def test_add_wrong_uid_none_static(self, m_connect, m_lock):
cfg.CONF.set_override('vif_annotation_timeout', 0, group='cni_daemon')
self.addCleanup(cfg.CONF.set_override, 'vif_annotation_timeout', 120,
group='cni_daemon')
self.k8s_mock.get.side_effect = [
{'metadata': {
'annotations': {'kubernetes.io/config.source': 'file'}}},
self.kp]
self.params.args = utils.CNIArgs(
'K8S_POD_NAME=foo;K8S_POD_NAMESPACE=default;K8S_POD_UID=blob')
del self.kp['spec']['podStatic']
self.plugin.add(self.params)
m_lock.assert_called_with('default/foo', external=True)
m_connect.assert_any_call(mock.ANY, mock.ANY, self.default_iface,
123, report_health=mock.ANY,
is_default_gateway=True,
container_id='cont_id')
m_connect.assert_any_call(mock.ANY, mock.ANY, self.additional_iface,
123, report_health=mock.ANY,
is_default_gateway=False,
container_id='cont_id')
self.k8s_mock.get.assert_any_call(
'/api/v1/namespaces/default/pods/foo')
self.assertEqual('cont_id',
self.plugin.registry['default/foo']['containerid'])
@mock.patch('oslo_concurrency.lockutils.lock')
@mock.patch('kuryr_kubernetes.cni.binding.base.disconnect')
def test_del_present(self, m_disconnect, m_lock):

View File

@ -665,6 +665,16 @@ def is_host_network(pod):
return pod['spec'].get('hostNetwork', False)
def is_pod_static(pod):
"""Checks if Pod is static by comparing annotations."""
try:
annotations = pod['metadata']['annotations']
config_source = annotations[constants.K8S_ANNOTATION_CONFIG_SOURCE]
return config_source != 'api'
except KeyError:
return False
def get_nodename():
# NOTE(dulek): At first try to get it using environment variable,
# otherwise assume hostname is the nodename.