From 8d8b84ca138a6ddb048b779c45a3e117b96736dc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Dulko?= Date: Fri, 10 Apr 2020 17:40:51 +0200 Subject: [PATCH] CNI: Confirm pods in cache before connecting In highly distributed environment like Kubernetes installation with Kuryr, we need to plan for network outages in any case. If we don't, we end up with bugs like one this patch tries to fix. If we'd lose a Pod delete event on kuryr-daemon following can happen: 1. Pod A of name "foo" gets created. 2. It gets annotated normally and CNI ADD request gives it an IP X. 3. Pod A gets deleted. 4. Somehow the delete event gets lost on kuryr-daemon's watcher. 5. CRI sends CNI DEL request and pod gets unplugged successfully. It never gets deleted from the daemon's registry, because we never got the Pod delete event from K8s API. 6. Pod B of the same name "foo" gets created. 7. CNI looks up registry by /, finds old VIF there and plugs pod B with pod A's VIF X. 8. kuryr-controller never notices that and assigns IP X to another pod. 9. We get an IP conflict. To solve the issue this patch makes sure that when handling ADD CNI calls, we always get the pod from K8s API first, and if uid of the API one doesn't match the one in the registry, we remove the registry entry. That way we can make sure the pod we've cached isn't stale. This adds one K8s API call per CNI ADD request, which is a significant load increase, but hopefully the K8s API can handle it. Closes-Bug: 1854928 Change-Id: I9916fca41bd917d85be973b8625b65a61139c3b3 --- kuryr_kubernetes/cni/daemon/service.py | 4 +++- .../cni/plugins/k8s_cni_registry.py | 24 +++++++++++++++++-- .../unit/cni/plugins/test_k8s_cni_registry.py | 13 +++++++--- .../tests/unit/cni/test_service.py | 2 ++ 4 files changed, 37 insertions(+), 6 deletions(-) diff --git a/kuryr_kubernetes/cni/daemon/service.py b/kuryr_kubernetes/cni/daemon/service.py index d36e766a7..e01a5b8d9 100644 --- a/kuryr_kubernetes/cni/daemon/service.py +++ b/kuryr_kubernetes/cni/daemon/service.py @@ -220,7 +220,9 @@ class CNIDaemonWatcherService(cotyledon.Service): # NOTE(dulek): We need a lock when modifying shared self.registry dict # to prevent race conditions with other processes/threads. with lockutils.lock(pod_name, external=True): - if pod_name not in self.registry: + if (pod_name not in self.registry or + self.registry[pod_name]['pod']['metadata']['uid'] + != pod['metadata']['uid']): self.registry[pod_name] = {'pod': pod, 'vifs': vif_dict, 'containerid': None, 'vif_unplugged': False, diff --git a/kuryr_kubernetes/cni/plugins/k8s_cni_registry.py b/kuryr_kubernetes/cni/plugins/k8s_cni_registry.py index 96f21cd24..e4da7efcb 100644 --- a/kuryr_kubernetes/cni/plugins/k8s_cni_registry.py +++ b/kuryr_kubernetes/cni/plugins/k8s_cni_registry.py @@ -20,6 +20,7 @@ from oslo_concurrency import lockutils from oslo_config import cfg from oslo_log import log as logging +from kuryr_kubernetes import clients from kuryr_kubernetes.cni.binding import base as b_base from kuryr_kubernetes.cni.plugins import base as base_cni from kuryr_kubernetes.cni import utils @@ -42,6 +43,7 @@ class K8sCNIRegistryPlugin(base_cni.CNIPlugin): def __init__(self, registry, healthy): self.healthy = healthy self.registry = registry + self.k8s = clients.get_kubernetes_client() def _get_pod_name(self, params): return "%(namespace)s/%(name)s" % { @@ -49,7 +51,7 @@ class K8sCNIRegistryPlugin(base_cni.CNIPlugin): 'name': params.args.K8S_POD_NAME} def add(self, params): - vifs = self._do_work(params, b_base.connect) + vifs = self._do_work(params, b_base.connect, confirm=True) pod_name = self._get_pod_name(params) @@ -124,11 +126,29 @@ class K8sCNIRegistryPlugin(base_cni.CNIPlugin): LOG.debug("Reporting CNI driver not healthy.") self.healthy.value = driver_healthy - def _do_work(self, params, fn): + def _do_work(self, params, fn, confirm=False): pod_name = self._get_pod_name(params) timeout = CONF.cni_daemon.vif_annotation_timeout + if confirm: + # Try to confirm if pod in the registry is not stale cache. + with lockutils.lock(pod_name, external=True): + if pod_name in self.registry: + cached_pod = self.registry[pod_name]['pod'] + try: + pod = self.k8s.get(cached_pod['metadata']['selfLink']) + except Exception: + LOG.exception('Error when getting pod %s', pod_name) + raise exceptions.ResourceNotReady(pod_name) + + if pod['metadata']['uid'] != cached_pod['metadata']['uid']: + LOG.warning('Stale pod %s detected in cache. (API ' + 'uid=%s, cached uid=%s). Removing it from ' + 'cache.', pod_name, pod['metadata']['uid'], + cached_pod['metadata']['uid']) + del self.registry[pod_name] + # In case of KeyError retry for `timeout` s, wait 1 s between tries. @retrying.retry(stop_max_delay=timeout * 1000, wait_fixed=RETRY_DELAY, retry_on_exception=lambda e: isinstance(e, KeyError)) diff --git a/kuryr_kubernetes/tests/unit/cni/plugins/test_k8s_cni_registry.py b/kuryr_kubernetes/tests/unit/cni/plugins/test_k8s_cni_registry.py index c1db1dbc9..a4e32d617 100644 --- a/kuryr_kubernetes/tests/unit/cni/plugins/test_k8s_cni_registry.py +++ b/kuryr_kubernetes/tests/unit/cni/plugins/test_k8s_cni_registry.py @@ -20,15 +20,17 @@ from kuryr_kubernetes.cni.plugins import k8s_cni_registry from kuryr_kubernetes import exceptions from kuryr_kubernetes.tests import base from kuryr_kubernetes.tests import fake +from kuryr_kubernetes.tests.unit import kuryr_fixtures class TestK8sCNIRegistryPlugin(base.TestCase): def setUp(self): super(TestK8sCNIRegistryPlugin, self).setUp() + self.k8s_mock = self.useFixture(kuryr_fixtures.MockK8sClient()).client self.default_iface = 'baz' self.additional_iface = 'eth1' self.pod = {'metadata': {'name': 'foo', 'uid': 'bar', - 'namespace': 'default'}} + 'namespace': 'default', 'selfLink': 'baz'}} self.vifs = fake._fake_vifs_dict() registry = {'default/foo': {'pod': self.pod, 'vifs': self.vifs, 'containerid': None, @@ -44,6 +46,8 @@ class TestK8sCNIRegistryPlugin(base.TestCase): @mock.patch('oslo_concurrency.lockutils.lock') @mock.patch('kuryr_kubernetes.cni.binding.base.connect') def test_add_present(self, m_connect, m_lock): + self.k8s_mock.get.return_value = self.pod + self.plugin.add(self.params) m_lock.assert_called_with('default/foo', external=True) @@ -116,7 +120,8 @@ class TestK8sCNIRegistryPlugin(base.TestCase): 'vif_unplugged': False, 'del_received': False}) m_getitem = mock.Mock(side_effect=se) m_setitem = mock.Mock() - m_registry = mock.Mock(__getitem__=m_getitem, __setitem__=m_setitem) + m_registry = mock.Mock(__getitem__=m_getitem, __setitem__=m_setitem, + __contains__=mock.Mock(return_value=False)) self.plugin.registry = m_registry self.plugin.add(self.params) @@ -137,13 +142,15 @@ class TestK8sCNIRegistryPlugin(base.TestCase): container_id='cont_id') @mock.patch('time.sleep', mock.Mock()) + @mock.patch('oslo_concurrency.lockutils.lock', mock.Mock( + return_value=mock.Mock(__enter__=mock.Mock(), __exit__=mock.Mock()))) def test_add_not_present(self): cfg.CONF.set_override('vif_annotation_timeout', 0, group='cni_daemon') self.addCleanup(cfg.CONF.set_override, 'vif_annotation_timeout', 120, group='cni_daemon') m_getitem = mock.Mock(side_effect=KeyError) - m_registry = mock.Mock(__getitem__=m_getitem) + m_registry = mock.Mock(__getitem__=m_getitem, __contains__=False) self.plugin.registry = m_registry self.assertRaises(exceptions.ResourceNotReady, self.plugin.add, self.params) diff --git a/kuryr_kubernetes/tests/unit/cni/test_service.py b/kuryr_kubernetes/tests/unit/cni/test_service.py index 677103794..16d9143d2 100644 --- a/kuryr_kubernetes/tests/unit/cni/test_service.py +++ b/kuryr_kubernetes/tests/unit/cni/test_service.py @@ -21,12 +21,14 @@ from kuryr_kubernetes.cni.plugins import k8s_cni_registry from kuryr_kubernetes import exceptions from kuryr_kubernetes.tests import base from kuryr_kubernetes.tests import fake +from kuryr_kubernetes.tests.unit import kuryr_fixtures class TestDaemonServer(base.TestCase): def setUp(self): super(TestDaemonServer, self).setUp() healthy = mock.Mock() + self.k8s_mock = self.useFixture(kuryr_fixtures.MockK8sClient()) self.plugin = k8s_cni_registry.K8sCNIRegistryPlugin({}, healthy) self.health_registry = mock.Mock() self.srv = service.DaemonServer(self.plugin, self.health_registry)