From d5f5db7005028f9ba88ed9dcc0b5d09d0bbd8bda Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Dulko?= Date: Wed, 19 Jan 2022 18:16:37 +0100 Subject: [PATCH] CNI: Use K8S_POD_UID passed from CRI Recent versions of cri-o and containerd pass K8S_POD_UID as a CNI argument, alongside K8S_POD_NAMESPACE and K8S_POD_NAME. As the latter two variables cannot be used to safely identify a pod in the API (StatefulSet recreates pods with the same name), we were prone to race conditions in the CNI code that we could only work around. The end effect was mostly IP conflicts. Now that the UID argument is passed, we're able to compare the UID from the request with the one in the API to make sure we're wiring the correct pod. This commit implements that by moving the check to the code that actually waits for the pod to appear in the registry. If K8S_POD_UID is missing from the CNI request, an API call to retrieve the Pod is used as a fallback. We also know that this check doesn't work for static pods, so the CRD and controller needed to be updated to record on the KuryrPort spec whether the pod is static, so that we can skip the check for static pods without the need to fetch the Pod from the API. 
Closes-Bug: 1963677 Change-Id: I5ef6a8212c535e90dee049a579c1483644d56db8 --- kubernetes_crds/kuryr_crds/kuryrport.yaml | 2 + kuryr_kubernetes/cni/binding/base.py | 13 ++- kuryr_kubernetes/cni/daemon/service.py | 6 +- .../cni/plugins/k8s_cni_registry.py | 78 ++++++++----- kuryr_kubernetes/cni/utils.py | 3 + kuryr_kubernetes/constants.py | 1 + kuryr_kubernetes/controller/handlers/vif.py | 3 +- kuryr_kubernetes/exceptions.py | 9 ++ .../unit/cni/plugins/test_k8s_cni_registry.py | 107 +++++++++++++++++- kuryr_kubernetes/utils.py | 10 ++ 10 files changed, 193 insertions(+), 39 deletions(-) diff --git a/kubernetes_crds/kuryr_crds/kuryrport.yaml b/kubernetes_crds/kuryr_crds/kuryrport.yaml index 6c3a6685a..e320e048a 100644 --- a/kubernetes_crds/kuryr_crds/kuryrport.yaml +++ b/kubernetes_crds/kuryr_crds/kuryrport.yaml @@ -29,6 +29,8 @@ spec: type: string podNodeName: type: string + podStatic: + type: boolean status: type: object required: diff --git a/kuryr_kubernetes/cni/binding/base.py b/kuryr_kubernetes/cni/binding/base.py index 481b0f573..5eb56af13 100644 --- a/kuryr_kubernetes/cni/binding/base.py +++ b/kuryr_kubernetes/cni/binding/base.py @@ -176,7 +176,12 @@ def disconnect(vif, instance_info, ifname, netns=None, report_health=None, @cni_utils.log_ipdb def cleanup(ifname, netns): - with get_ipdb(netns) as c_ipdb: - if ifname in c_ipdb.interfaces: - with c_ipdb.interfaces[ifname] as iface: - iface.remove() + try: + with get_ipdb(netns) as c_ipdb: + if ifname in c_ipdb.interfaces: + with c_ipdb.interfaces[ifname] as iface: + iface.remove() + except Exception: + # Just ignore cleanup errors, there's not much we can do anyway. + LOG.warning('Error occured when attempting to clean up netns %s. 
' + 'Ignoring.', netns) diff --git a/kuryr_kubernetes/cni/daemon/service.py b/kuryr_kubernetes/cni/daemon/service.py index 1dcd06067..2cc722f97 100644 --- a/kuryr_kubernetes/cni/daemon/service.py +++ b/kuryr_kubernetes/cni/daemon/service.py @@ -153,9 +153,9 @@ class DaemonServer(object): try: self.plugin.delete(params) - except exceptions.CNIKuryrPortTimeout: - # NOTE(dulek): It's better to ignore this error - most of the time - # it will happen when pod is long gone and CRI + except (exceptions.CNIKuryrPortTimeout, exceptions.CNIPodUidMismatch): + # NOTE(dulek): It's better to ignore these errors - most of the + # time it will happen when pod is long gone and CRI # overzealously tries to delete it from the network. # We cannot really do anything without VIF annotation, # so let's just tell CRI to move along. diff --git a/kuryr_kubernetes/cni/plugins/k8s_cni_registry.py b/kuryr_kubernetes/cni/plugins/k8s_cni_registry.py index 0a2a69115..42441c762 100644 --- a/kuryr_kubernetes/cni/plugins/k8s_cni_registry.py +++ b/kuryr_kubernetes/cni/plugins/k8s_cni_registry.py @@ -48,31 +48,31 @@ class K8sCNIRegistryPlugin(base_cni.CNIPlugin): self.k8s = clients.get_kubernetes_client() def _get_obj_name(self, params): - return "%(namespace)s/%(name)s" % { - 'namespace': params.args.K8S_POD_NAMESPACE, - 'name': params.args.K8S_POD_NAME} + return f'{params.args.K8S_POD_NAMESPACE}/{params.args.K8S_POD_NAME}' + + def _get_pod(self, params): + namespace = params.args.K8S_POD_NAMESPACE + name = params.args.K8S_POD_NAME + + try: + return self.k8s.get( + f'{k_const.K8S_API_NAMESPACES}/{namespace}/pods/{name}') + except exceptions.K8sClientException: + uniq_name = self._get_obj_name(params) + LOG.exception('Error when getting Pod %s', uniq_name) + raise def add(self, params): kp_name = self._get_obj_name(params) timeout = CONF.cni_daemon.vif_annotation_timeout - # Try to confirm if CRD in the registry is not stale cache. If it is, - # remove it. 
- with lockutils.lock(kp_name, external=True): - if kp_name in self.registry: - cached_kp = self.registry[kp_name]['kp'] - try: - kp = self.k8s.get(k_utils.get_res_link(cached_kp)) - except Exception: - LOG.exception('Error when getting KuryrPort %s', kp_name) - raise exceptions.ResourceNotReady(kp_name) - - if kp['metadata']['uid'] != cached_kp['metadata']['uid']: - LOG.warning('Stale KuryrPort %s detected in cache. (API ' - 'uid=%s, cached uid=%s). Removing it from ' - 'cache.', kp_name, kp['metadata']['uid'], - cached_kp['metadata']['uid']) - del self.registry[kp_name] + # In order to fight race conditions when pods get recreated with the + # same name (think StatefulSet), we're trying to get pod UID either + # from the request or the API in order to use it as the ID to compare. + if 'K8S_POD_UID' not in params.args: + # CRI doesn't pass K8S_POD_UID, get it from the API. + pod = self._get_pod(params) + params.args.K8S_POD_UID = pod['metadata']['uid'] vifs = self._do_work(params, b_base.connect, timeout) @@ -138,7 +138,7 @@ class K8sCNIRegistryPlugin(base_cni.CNIPlugin): # delay before registry is populated by watcher. try: self._do_work(params, b_base.disconnect, 5) - except exceptions.CNIKuryrPortTimeout: + except (exceptions.CNIKuryrPortTimeout, exceptions.CNIPodUidMismatch): # So the VIF info seems to be lost at this point, we don't even # know what binding driver was used to plug it. Let's at least # try to remove the interface we created from the netns to prevent @@ -170,19 +170,42 @@ class K8sCNIRegistryPlugin(base_cni.CNIPlugin): LOG.debug("Reporting CNI driver not healthy.") self.healthy.value = driver_healthy - def _do_work(self, params, fn, timeout): + def _get_vifs_from_registry(self, params, timeout): kp_name = self._get_obj_name(params) # In case of KeyError retry for `timeout` s, wait 1 s between tries. 
@retrying.retry(stop_max_delay=timeout * 1000, wait_fixed=RETRY_DELAY, - retry_on_exception=lambda e: isinstance(e, KeyError)) + retry_on_exception=lambda e: isinstance( + e, (KeyError, exceptions.CNIPodUidMismatch))) def find(): - return self.registry[kp_name] + d = self.registry[kp_name] + static = d['kp']['spec'].get('podStatic', None) + uid = d['kp']['spec']['podUid'] + # FIXME(dulek): This is weirdly structured for upgrades support. + # If podStatic is not set (KuryrPort created by old + # Kuryr version), then on uid mismatch we're fetching + # pod from API and check if it's static here. Pods + # are quite ephemeral, so will gradually get replaced + # after the upgrade and in a while all should have + # the field set and the performance penalty should + # be resolved. Remove in the future. + if 'K8S_POD_UID' in params.args and uid != params.args.K8S_POD_UID: + if static is None: + pod = self._get_pod(params) + static = k_utils.is_pod_static(pod) + + # Static pods have mirror pod UID in API, so it's always + # mismatched. We don't raise in that case. See [1] for more. 
+ # [1] https://github.com/k8snetworkplumbingwg/multus-cni/ + # issues/773 + if not static: + raise exceptions.CNIPodUidMismatch( + kp_name, params.args.K8S_POD_UID, uid) + return d try: d = find() - kp = d['kp'] - vifs = d['vifs'] + return d['kp'], d['vifs'] except KeyError: data = {'metadata': {'name': params.args.K8S_POD_NAME, 'namespace': params.args.K8S_POD_NAMESPACE}} @@ -194,6 +217,9 @@ class K8sCNIRegistryPlugin(base_cni.CNIPlugin): 'kuryr-daemon') raise exceptions.CNIKuryrPortTimeout(kp_name) + def _do_work(self, params, fn, timeout): + kp, vifs = self._get_vifs_from_registry(params, timeout) + for ifname, vif in vifs.items(): is_default_gateway = (ifname == k_const.DEFAULT_IFNAME) if is_default_gateway: diff --git a/kuryr_kubernetes/cni/utils.py b/kuryr_kubernetes/cni/utils.py index ce8dfade8..2e2f796cb 100644 --- a/kuryr_kubernetes/cni/utils.py +++ b/kuryr_kubernetes/cni/utils.py @@ -59,6 +59,9 @@ class CNIArgs(object): if not k.startswith('_'): setattr(self, k, v) + def __contains__(self, key): + return hasattr(self, key) + class CNIParameters(object): def __init__(self, env, cfg=None): diff --git a/kuryr_kubernetes/constants.py b/kuryr_kubernetes/constants.py index 91ac616bb..29a195c06 100644 --- a/kuryr_kubernetes/constants.py +++ b/kuryr_kubernetes/constants.py @@ -76,6 +76,7 @@ K8S_ANNOTATION_CURRENT_DRIVER = 'current_driver' K8S_ANNOTATION_NEUTRON_PORT = 'neutron_id' K8S_ANNOTATION_HEADLESS_SERVICE = 'service.kubernetes.io/headless' +K8S_ANNOTATION_CONFIG_SOURCE = 'kubernetes.io/config.source' POD_FINALIZER = KURYR_FQDN + '/pod-finalizer' KURYRNETWORK_FINALIZER = 'kuryrnetwork.finalizers.kuryr.openstack.org' diff --git a/kuryr_kubernetes/controller/handlers/vif.py b/kuryr_kubernetes/controller/handlers/vif.py index 5401d4b56..6c53b657c 100644 --- a/kuryr_kubernetes/controller/handlers/vif.py +++ b/kuryr_kubernetes/controller/handlers/vif.py @@ -207,7 +207,8 @@ class VIFHandler(k8s_base.ResourceEventHandler): }, 'spec': { 'podUid': 
pod['metadata']['uid'], - 'podNodeName': pod['spec']['nodeName'] + 'podNodeName': pod['spec']['nodeName'], + 'podStatic': utils.is_pod_static(pod) }, 'status': { 'vifs': vifs diff --git a/kuryr_kubernetes/exceptions.py b/kuryr_kubernetes/exceptions.py index 58715bb8f..283ff6cfb 100644 --- a/kuryr_kubernetes/exceptions.py +++ b/kuryr_kubernetes/exceptions.py @@ -170,6 +170,15 @@ class CNIBindingFailure(Exception): super(CNIBindingFailure, self).__init__(message) +class CNIPodUidMismatch(CNITimeout): + """Excepton raised on a mismatch of CNI request's pod UID and KuryrPort""" + def __init__(self, name, expected, observed): + super().__init__( + f'uid {observed} of the pod {name} does not match the uid ' + f'{expected} requested by the CNI. Dropping CNI request to prevent' + f' race conditions.') + + class UnreachableOctavia(Exception): """Exception indicates Octavia API failure and can not be reached diff --git a/kuryr_kubernetes/tests/unit/cni/plugins/test_k8s_cni_registry.py b/kuryr_kubernetes/tests/unit/cni/plugins/test_k8s_cni_registry.py index b977abe7b..307d0083a 100644 --- a/kuryr_kubernetes/tests/unit/cni/plugins/test_k8s_cni_registry.py +++ b/kuryr_kubernetes/tests/unit/cni/plugins/test_k8s_cni_registry.py @@ -17,6 +17,7 @@ from unittest import mock from oslo_config import cfg from kuryr_kubernetes.cni.plugins import k8s_cni_registry +from kuryr_kubernetes.cni import utils from kuryr_kubernetes import exceptions from kuryr_kubernetes.tests import base from kuryr_kubernetes.tests import fake @@ -33,7 +34,7 @@ class TestK8sCNIRegistryPlugin(base.TestCase): 'kind': 'KuryrPort', 'metadata': {'name': 'foo', 'uid': 'bar', 'namespace': 'default'}, - 'spec': {'podUid': 'bar'}} + 'spec': {'podUid': 'bar', 'podStatic': False}} self.vifs = fake._fake_vifs() registry = {'default/foo': {'kp': self.kp, 'vifs': self.vifs, 'containerid': None, @@ -41,10 +42,11 @@ class TestK8sCNIRegistryPlugin(base.TestCase): 'del_received': False}} healthy = mock.Mock() self.plugin = 
k8s_cni_registry.K8sCNIRegistryPlugin(registry, healthy) - self.params = mock.Mock(args=mock.Mock(K8S_POD_NAME='foo', - K8S_POD_NAMESPACE='default'), - CNI_IFNAME=self.default_iface, CNI_NETNS=123, - CNI_CONTAINERID='cont_id') + self.params = mock.Mock( + args=utils.CNIArgs('K8S_POD_NAME=foo;K8S_POD_NAMESPACE=default;' + 'K8S_POD_UID=bar'), + CNI_IFNAME=self.default_iface, CNI_NETNS=123, + CNI_CONTAINERID='cont_id') @mock.patch('oslo_concurrency.lockutils.lock') @mock.patch('kuryr_kubernetes.cni.binding.base.connect') @@ -65,6 +67,101 @@ class TestK8sCNIRegistryPlugin(base.TestCase): self.assertEqual('cont_id', self.plugin.registry['default/foo']['containerid']) + @mock.patch('oslo_concurrency.lockutils.lock') + @mock.patch('kuryr_kubernetes.cni.binding.base.connect') + def test_add_no_uid(self, m_connect, m_lock): + self.k8s_mock.get.return_value = self.kp + + self.params.args = utils.CNIArgs( + 'K8S_POD_NAME=foo;K8S_POD_NAMESPACE=default') + self.plugin.add(self.params) + + m_lock.assert_called_with('default/foo', external=True) + m_connect.assert_any_call(mock.ANY, mock.ANY, self.default_iface, + 123, report_health=mock.ANY, + is_default_gateway=True, + container_id='cont_id') + m_connect.assert_any_call(mock.ANY, mock.ANY, self.additional_iface, + 123, report_health=mock.ANY, + is_default_gateway=False, + container_id='cont_id') + self.k8s_mock.get.assert_any_call( + '/api/v1/namespaces/default/pods/foo') + self.assertEqual('cont_id', + self.plugin.registry['default/foo']['containerid']) + + @mock.patch('kuryr_kubernetes.cni.binding.base.connect') + def test_add_wrong_uid(self, m_connect): + cfg.CONF.set_override('vif_annotation_timeout', 0, group='cni_daemon') + self.addCleanup(cfg.CONF.set_override, 'vif_annotation_timeout', 120, + group='cni_daemon') + self.k8s_mock.get.return_value = self.kp + + self.params.args = utils.CNIArgs( + 'K8S_POD_NAME=foo;K8S_POD_NAMESPACE=default;K8S_POD_UID=blob') + self.assertRaises(exceptions.CNIPodUidMismatch, 
self.plugin.add, + self.params) + + m_connect.assert_not_called() + self.k8s_mock.get.assert_not_called() + + @mock.patch('oslo_concurrency.lockutils.lock') + @mock.patch('kuryr_kubernetes.cni.binding.base.connect') + def test_add_wrong_uid_static(self, m_connect, m_lock): + cfg.CONF.set_override('vif_annotation_timeout', 0, group='cni_daemon') + self.addCleanup(cfg.CONF.set_override, 'vif_annotation_timeout', 120, + group='cni_daemon') + self.k8s_mock.get.return_value = self.kp + + self.params.args = utils.CNIArgs( + 'K8S_POD_NAME=foo;K8S_POD_NAMESPACE=default;K8S_POD_UID=blob') + self.kp['spec']['podStatic'] = True + self.plugin.add(self.params) + + m_lock.assert_called_with('default/foo', external=True) + m_connect.assert_any_call(mock.ANY, mock.ANY, self.default_iface, + 123, report_health=mock.ANY, + is_default_gateway=True, + container_id='cont_id') + m_connect.assert_any_call(mock.ANY, mock.ANY, self.additional_iface, + 123, report_health=mock.ANY, + is_default_gateway=False, + container_id='cont_id') + self.k8s_mock.get.assert_any_call( + '/api/v1/namespaces/default/pods/foo') + self.assertEqual('cont_id', + self.plugin.registry['default/foo']['containerid']) + + @mock.patch('oslo_concurrency.lockutils.lock') + @mock.patch('kuryr_kubernetes.cni.binding.base.connect') + def test_add_wrong_uid_none_static(self, m_connect, m_lock): + cfg.CONF.set_override('vif_annotation_timeout', 0, group='cni_daemon') + self.addCleanup(cfg.CONF.set_override, 'vif_annotation_timeout', 120, + group='cni_daemon') + self.k8s_mock.get.side_effect = [ + {'metadata': { + 'annotations': {'kubernetes.io/config.source': 'file'}}}, + self.kp] + + self.params.args = utils.CNIArgs( + 'K8S_POD_NAME=foo;K8S_POD_NAMESPACE=default;K8S_POD_UID=blob') + del self.kp['spec']['podStatic'] + self.plugin.add(self.params) + + m_lock.assert_called_with('default/foo', external=True) + m_connect.assert_any_call(mock.ANY, mock.ANY, self.default_iface, + 123, report_health=mock.ANY, + 
is_default_gateway=True, + container_id='cont_id') + m_connect.assert_any_call(mock.ANY, mock.ANY, self.additional_iface, + 123, report_health=mock.ANY, + is_default_gateway=False, + container_id='cont_id') + self.k8s_mock.get.assert_any_call( + '/api/v1/namespaces/default/pods/foo') + self.assertEqual('cont_id', + self.plugin.registry['default/foo']['containerid']) + @mock.patch('oslo_concurrency.lockutils.lock') @mock.patch('kuryr_kubernetes.cni.binding.base.disconnect') def test_del_present(self, m_disconnect, m_lock): diff --git a/kuryr_kubernetes/utils.py b/kuryr_kubernetes/utils.py index 33f7511e9..3d6f9e113 100644 --- a/kuryr_kubernetes/utils.py +++ b/kuryr_kubernetes/utils.py @@ -665,6 +665,16 @@ def is_host_network(pod): return pod['spec'].get('hostNetwork', False) +def is_pod_static(pod): + """Checks if Pod is static by comparing annotations.""" + try: + annotations = pod['metadata']['annotations'] + config_source = annotations[constants.K8S_ANNOTATION_CONFIG_SOURCE] + return config_source != 'api' + except KeyError: + return False + + def get_nodename(): # NOTE(dulek): At first try to get it using environment variable, # otherwise assume hostname is the nodename.