From b008a59ff0a25ca22b3f0c5bcc6a8d8bf92c4f82 Mon Sep 17 00:00:00 2001 From: Nayan Deshmukh Date: Tue, 5 Nov 2019 16:08:48 +0900 Subject: [PATCH] Only delete pod from CNI registry after unplugging the vif MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit We can get the DELETED event before kubelet calls CNI DEL on the same pod. This will lead to a case where we remove the pod from the CNI registry before unplugging the vif. In this case the CNI DEL request will fail due to the absence of info about the tap interfaces. This affects OVS as we are left with tap interfaces whose one end is still attached to br-int. Change-Id: I5fda411e938319a00590f7dcfd2e6f37b7e2194c Signed-off-by: Nayan Deshmukh Signed-off-by: Paweł Suder Closes-Bug: 1849626 --- kuryr_kubernetes/cni/daemon/service.py | 20 +++++++--- .../cni/plugins/k8s_cni_registry.py | 17 +++++++++ .../unit/cni/plugins/test_k8s_cni_registry.py | 37 ++++++++++++++++--- .../tests/unit/cni/test_service.py | 31 ++++++++++++++++ 4 files changed, 94 insertions(+), 11 deletions(-) diff --git a/kuryr_kubernetes/cni/daemon/service.py b/kuryr_kubernetes/cni/daemon/service.py index 13d53aa16..4559f105a 100644 --- a/kuryr_kubernetes/cni/daemon/service.py +++ b/kuryr_kubernetes/cni/daemon/service.py @@ -221,7 +221,9 @@ class CNIDaemonWatcherService(cotyledon.Service): with lockutils.lock(pod_name, external=True): if pod_name not in self.registry: self.registry[pod_name] = {'pod': pod, 'vifs': vif_dict, - 'containerid': None} + 'containerid': None, + 'vif_unplugged': False, + 'del_received': False} else: # NOTE(dulek): Only update vif if its status changed, we don't # need to care about other changes now. @@ -241,12 +243,20 @@ class CNIDaemonWatcherService(cotyledon.Service): pod_name = utils.get_pod_unique_name(pod) try: if pod_name in self.registry: - # NOTE(dulek): del on dict is atomic as long as we use standard - # types as keys. This is the case, so we don't - # need to lock here. 
- del self.registry[pod_name] + # NOTE(ndesh): We need to lock here to avoid race condition + # with the deletion code for CNI DEL so that + # we delete the registry entry exactly once + with lockutils.lock(pod_name, external=True): + if self.registry[pod_name]['vif_unplugged']: + del self.registry[pod_name] + else: + pod_dict = self.registry[pod_name] + pod_dict['del_received'] = True + self.registry[pod_name] = pod_dict except KeyError: # This means someone else removed it. It's odd but safe to ignore. + LOG.debug('Pod %s entry already removed from registry while ' + 'handling DELETED event. Ignoring.', pod_name) pass def terminate(self): diff --git a/kuryr_kubernetes/cni/plugins/k8s_cni_registry.py b/kuryr_kubernetes/cni/plugins/k8s_cni_registry.py index 3bc605de4..e8700ba5e 100644 --- a/kuryr_kubernetes/cni/plugins/k8s_cni_registry.py +++ b/kuryr_kubernetes/cni/plugins/k8s_cni_registry.py @@ -100,6 +100,23 @@ class K8sCNIRegistryPlugin(base_cni.CNIPlugin): except KeyError: pass self._do_work(params, b_base.disconnect) + # NOTE(ndesh): We need to lock here to avoid race condition + # with the deletion code in the watcher to ensure that + # we delete the registry entry exactly once + try: + with lockutils.lock(pod_name, external=True): + if self.registry[pod_name]['del_received']: + del self.registry[pod_name] + else: + pod_dict = self.registry[pod_name] + pod_dict['vif_unplugged'] = True + self.registry[pod_name] = pod_dict + except KeyError: + # This means the pod was removed before vif was unplugged. This + # shouldn't happen, but we can't do anything about it now + LOG.debug('Pod %s not found registry while handling DEL request. 
' + 'Ignoring.', pod_name) + pass def report_drivers_health(self, driver_healthy): if not driver_healthy: diff --git a/kuryr_kubernetes/tests/unit/cni/plugins/test_k8s_cni_registry.py b/kuryr_kubernetes/tests/unit/cni/plugins/test_k8s_cni_registry.py index 78d8254b0..eddbad721 100644 --- a/kuryr_kubernetes/tests/unit/cni/plugins/test_k8s_cni_registry.py +++ b/kuryr_kubernetes/tests/unit/cni/plugins/test_k8s_cni_registry.py @@ -29,7 +29,9 @@ class TestK8sCNIRegistryPlugin(base.TestCase): 'namespace': 'default'}} self.vifs = fake._fake_vifs_dict() registry = {'default/foo': {'pod': self.pod, 'vifs': self.vifs, - 'containerid': None}} + 'containerid': None, + 'vif_unplugged': False, + 'del_received': False}} healthy = mock.Mock() self.plugin = k8s_cni_registry.K8sCNIRegistryPlugin(registry, healthy) self.params = mock.Mock(args=mock.Mock(K8S_POD_NAME='foo', @@ -50,14 +52,32 @@ class TestK8sCNIRegistryPlugin(base.TestCase): self.assertEqual('cont_id', self.plugin.registry['default/foo']['containerid']) + @mock.patch('oslo_concurrency.lockutils.lock') @mock.patch('kuryr_kubernetes.cni.binding.base.disconnect') - def test_del_present(self, m_disconnect): + def test_del_present(self, m_disconnect, m_lock): self.plugin.delete(self.params) + m_lock.assert_called_with('default/foo', external=True) m_disconnect.assert_called_with(mock.ANY, mock.ANY, 'eth0', 123, report_health=mock.ANY, is_default_gateway=mock.ANY, container_id='cont_id') + self.assertIn('default/foo', self.plugin.registry) + self.assertEqual(True, + self.plugin.registry['default/foo']['vif_unplugged']) + + @mock.patch('oslo_concurrency.lockutils.lock') + @mock.patch('kuryr_kubernetes.cni.binding.base.disconnect') + def test_remove_pod_from_registry_after_del(self, m_disconnect, m_lock): + self.plugin.registry['default/foo']['del_received'] = True + self.plugin.delete(self.params) + + m_lock.assert_called_with('default/foo', external=True) + m_disconnect.assert_called_with(mock.ANY, mock.ANY, 'eth0', 123, + 
report_health=mock.ANY, + is_default_gateway=mock.ANY, + container_id='cont_id') + self.assertNotIn('default/foo', self.plugin.registry) @mock.patch('kuryr_kubernetes.cni.binding.base.disconnect') def test_del_wrong_container_id(self, m_disconnect): @@ -74,9 +94,12 @@ class TestK8sCNIRegistryPlugin(base.TestCase): @mock.patch('kuryr_kubernetes.cni.binding.base.connect') def test_add_present_on_5_try(self, m_connect, m_lock): se = [KeyError] * 5 - se.append({'pod': self.pod, 'vifs': self.vifs, 'containerid': None}) - se.append({'pod': self.pod, 'vifs': self.vifs, 'containerid': None}) - se.append({'pod': self.pod, 'vifs': self.vifs, 'containerid': None}) + se.append({'pod': self.pod, 'vifs': self.vifs, 'containerid': None, + 'vif_unplugged': False, 'del_received': False}) + se.append({'pod': self.pod, 'vifs': self.vifs, 'containerid': None, + 'vif_unplugged': False, 'del_received': False}) + se.append({'pod': self.pod, 'vifs': self.vifs, 'containerid': None, + 'vif_unplugged': False, 'del_received': False}) m_getitem = mock.Mock(side_effect=se) m_setitem = mock.Mock() m_registry = mock.Mock(__getitem__=m_getitem, __setitem__=m_setitem) @@ -87,7 +110,9 @@ class TestK8sCNIRegistryPlugin(base.TestCase): m_setitem.assert_called_once_with('default/foo', {'pod': self.pod, 'vifs': self.vifs, - 'containerid': 'cont_id'}) + 'containerid': 'cont_id', + 'vif_unplugged': False, + 'del_received': False}) m_connect.assert_called_with(mock.ANY, mock.ANY, 'eth0', 123, report_health=mock.ANY, is_default_gateway=mock.ANY, diff --git a/kuryr_kubernetes/tests/unit/cni/test_service.py b/kuryr_kubernetes/tests/unit/cni/test_service.py index 94d4bc9ec..9f6366447 100644 --- a/kuryr_kubernetes/tests/unit/cni/test_service.py +++ b/kuryr_kubernetes/tests/unit/cni/test_service.py @@ -101,3 +101,34 @@ class TestDaemonServer(base.TestCase): m_delete.assert_called_once_with(mock.ANY) self.assertEqual(500, resp.status_code) + + +class TestCNIDaemonWatcherService(base.TestCase): + def setUp(self): 
+ super(TestCNIDaemonWatcherService, self).setUp() + self.registry = {} + self.pod = {'metadata': {'namespace': 'testing', + 'name': 'default'}, + 'vif_unplugged': False, + 'del_receieved': False} + self.healthy = mock.Mock() + self.watcher = service.CNIDaemonWatcherService( + 0, self.registry, self.healthy) + + @mock.patch('oslo_concurrency.lockutils.lock') + def test_on_deleted(self, m_lock): + pod = self.pod + pod['vif_unplugged'] = True + pod_name = 'testing/default' + self.registry[pod_name] = pod + self.watcher.on_deleted(pod) + self.assertNotIn(pod_name, self.registry) + + @mock.patch('oslo_concurrency.lockutils.lock') + def test_on_deleted_false(self, m_lock): + pod = self.pod + pod_name = 'testing/default' + self.registry[pod_name] = pod + self.watcher.on_deleted(pod) + self.assertIn(pod_name, self.registry) + self.assertIs(True, pod['del_received'])