Only delete pod from CNI registry after unplugging the vif
We can get the DELETED event before kubelet calls CNI DEL on the same pod. This will lead to a case where we remove the pod from the CNI registry before unplugging the vif. In this case the CNI DEL request will fail due absence of info of the tap interfaces. This effects OVS as we are left with tap interfaces whose one end is still attached to br-int. Change-Id: I5fda411e938319a00590f7dcfd2e6f37b7e2194c Signed-off-by: Nayan Deshmukh <n.deshmukh@samsung.com> Signed-off-by: Paweł Suder <p.suder@samsung.com> Closes-Bug: 1849626
This commit is contained in:
parent
6441a6e3dd
commit
b008a59ff0
|
@ -221,7 +221,9 @@ class CNIDaemonWatcherService(cotyledon.Service):
|
|||
with lockutils.lock(pod_name, external=True):
|
||||
if pod_name not in self.registry:
|
||||
self.registry[pod_name] = {'pod': pod, 'vifs': vif_dict,
|
||||
'containerid': None}
|
||||
'containerid': None,
|
||||
'vif_unplugged': False,
|
||||
'del_received': False}
|
||||
else:
|
||||
# NOTE(dulek): Only update vif if its status changed, we don't
|
||||
# need to care about other changes now.
|
||||
|
@ -241,12 +243,20 @@ class CNIDaemonWatcherService(cotyledon.Service):
|
|||
pod_name = utils.get_pod_unique_name(pod)
|
||||
try:
|
||||
if pod_name in self.registry:
|
||||
# NOTE(dulek): del on dict is atomic as long as we use standard
|
||||
# types as keys. This is the case, so we don't
|
||||
# need to lock here.
|
||||
del self.registry[pod_name]
|
||||
# NOTE(ndesh): We need to lock here to avoid race condition
|
||||
# with the deletion code for CNI DEL so that
|
||||
# we delete the registry entry exactly once
|
||||
with lockutils.lock(pod_name, external=True):
|
||||
if self.registry[pod_name]['vif_unplugged']:
|
||||
del self.registry[pod_name]
|
||||
else:
|
||||
pod_dict = self.registry[pod_name]
|
||||
pod_dict['del_received'] = True
|
||||
self.registry[pod_name] = pod_dict
|
||||
except KeyError:
|
||||
# This means someone else removed it. It's odd but safe to ignore.
|
||||
LOG.debug('Pod %s entry already removed from registry while '
|
||||
'handling DELETED event. Ignoring.', pod_name)
|
||||
pass
|
||||
|
||||
def terminate(self):
|
||||
|
|
|
@ -100,6 +100,23 @@ class K8sCNIRegistryPlugin(base_cni.CNIPlugin):
|
|||
except KeyError:
|
||||
pass
|
||||
self._do_work(params, b_base.disconnect)
|
||||
# NOTE(ndesh): We need to lock here to avoid race condition
|
||||
# with the deletion code in the watcher to ensure that
|
||||
# we delete the registry entry exactly once
|
||||
try:
|
||||
with lockutils.lock(pod_name, external=True):
|
||||
if self.registry[pod_name]['del_received']:
|
||||
del self.registry[pod_name]
|
||||
else:
|
||||
pod_dict = self.registry[pod_name]
|
||||
pod_dict['vif_unplugged'] = True
|
||||
self.registry[pod_name] = pod_dict
|
||||
except KeyError:
|
||||
# This means the pod was removed before vif was unplugged. This
|
||||
# shouldn't happen, but we can't do anything about it now
|
||||
LOG.debug('Pod %s not found registry while handling DEL request. '
|
||||
'Ignoring.', pod_name)
|
||||
pass
|
||||
|
||||
def report_drivers_health(self, driver_healthy):
|
||||
if not driver_healthy:
|
||||
|
|
|
@ -29,7 +29,9 @@ class TestK8sCNIRegistryPlugin(base.TestCase):
|
|||
'namespace': 'default'}}
|
||||
self.vifs = fake._fake_vifs_dict()
|
||||
registry = {'default/foo': {'pod': self.pod, 'vifs': self.vifs,
|
||||
'containerid': None}}
|
||||
'containerid': None,
|
||||
'vif_unplugged': False,
|
||||
'del_received': False}}
|
||||
healthy = mock.Mock()
|
||||
self.plugin = k8s_cni_registry.K8sCNIRegistryPlugin(registry, healthy)
|
||||
self.params = mock.Mock(args=mock.Mock(K8S_POD_NAME='foo',
|
||||
|
@ -50,14 +52,32 @@ class TestK8sCNIRegistryPlugin(base.TestCase):
|
|||
self.assertEqual('cont_id',
|
||||
self.plugin.registry['default/foo']['containerid'])
|
||||
|
||||
@mock.patch('oslo_concurrency.lockutils.lock')
|
||||
@mock.patch('kuryr_kubernetes.cni.binding.base.disconnect')
|
||||
def test_del_present(self, m_disconnect):
|
||||
def test_del_present(self, m_disconnect, m_lock):
|
||||
self.plugin.delete(self.params)
|
||||
|
||||
m_lock.assert_called_with('default/foo', external=True)
|
||||
m_disconnect.assert_called_with(mock.ANY, mock.ANY, 'eth0', 123,
|
||||
report_health=mock.ANY,
|
||||
is_default_gateway=mock.ANY,
|
||||
container_id='cont_id')
|
||||
self.assertIn('default/foo', self.plugin.registry)
|
||||
self.assertEqual(True,
|
||||
self.plugin.registry['default/foo']['vif_unplugged'])
|
||||
|
||||
@mock.patch('oslo_concurrency.lockutils.lock')
|
||||
@mock.patch('kuryr_kubernetes.cni.binding.base.disconnect')
|
||||
def test_remove_pod_from_registry_after_del(self, m_disconnect, m_lock):
|
||||
self.plugin.registry['default/foo']['del_received'] = True
|
||||
self.plugin.delete(self.params)
|
||||
|
||||
m_lock.assert_called_with('default/foo', external=True)
|
||||
m_disconnect.assert_called_with(mock.ANY, mock.ANY, 'eth0', 123,
|
||||
report_health=mock.ANY,
|
||||
is_default_gateway=mock.ANY,
|
||||
container_id='cont_id')
|
||||
self.assertNotIn('default/foo', self.plugin.registry)
|
||||
|
||||
@mock.patch('kuryr_kubernetes.cni.binding.base.disconnect')
|
||||
def test_del_wrong_container_id(self, m_disconnect):
|
||||
|
@ -74,9 +94,12 @@ class TestK8sCNIRegistryPlugin(base.TestCase):
|
|||
@mock.patch('kuryr_kubernetes.cni.binding.base.connect')
|
||||
def test_add_present_on_5_try(self, m_connect, m_lock):
|
||||
se = [KeyError] * 5
|
||||
se.append({'pod': self.pod, 'vifs': self.vifs, 'containerid': None})
|
||||
se.append({'pod': self.pod, 'vifs': self.vifs, 'containerid': None})
|
||||
se.append({'pod': self.pod, 'vifs': self.vifs, 'containerid': None})
|
||||
se.append({'pod': self.pod, 'vifs': self.vifs, 'containerid': None,
|
||||
'vif_unplugged': False, 'del_received': False})
|
||||
se.append({'pod': self.pod, 'vifs': self.vifs, 'containerid': None,
|
||||
'vif_unplugged': False, 'del_received': False})
|
||||
se.append({'pod': self.pod, 'vifs': self.vifs, 'containerid': None,
|
||||
'vif_unplugged': False, 'del_received': False})
|
||||
m_getitem = mock.Mock(side_effect=se)
|
||||
m_setitem = mock.Mock()
|
||||
m_registry = mock.Mock(__getitem__=m_getitem, __setitem__=m_setitem)
|
||||
|
@ -87,7 +110,9 @@ class TestK8sCNIRegistryPlugin(base.TestCase):
|
|||
m_setitem.assert_called_once_with('default/foo',
|
||||
{'pod': self.pod,
|
||||
'vifs': self.vifs,
|
||||
'containerid': 'cont_id'})
|
||||
'containerid': 'cont_id',
|
||||
'vif_unplugged': False,
|
||||
'del_received': False})
|
||||
m_connect.assert_called_with(mock.ANY, mock.ANY, 'eth0', 123,
|
||||
report_health=mock.ANY,
|
||||
is_default_gateway=mock.ANY,
|
||||
|
|
|
@ -101,3 +101,34 @@ class TestDaemonServer(base.TestCase):
|
|||
|
||||
m_delete.assert_called_once_with(mock.ANY)
|
||||
self.assertEqual(500, resp.status_code)
|
||||
|
||||
|
||||
class TestCNIDaemonWatcherService(base.TestCase):
|
||||
def setUp(self):
|
||||
super(TestCNIDaemonWatcherService, self).setUp()
|
||||
self.registry = {}
|
||||
self.pod = {'metadata': {'namespace': 'testing',
|
||||
'name': 'default'},
|
||||
'vif_unplugged': False,
|
||||
'del_receieved': False}
|
||||
self.healthy = mock.Mock()
|
||||
self.watcher = service.CNIDaemonWatcherService(
|
||||
0, self.registry, self.healthy)
|
||||
|
||||
@mock.patch('oslo_concurrency.lockutils.lock')
|
||||
def test_on_deleted(self, m_lock):
|
||||
pod = self.pod
|
||||
pod['vif_unplugged'] = True
|
||||
pod_name = 'testing/default'
|
||||
self.registry[pod_name] = pod
|
||||
self.watcher.on_deleted(pod)
|
||||
self.assertNotIn(pod_name, self.registry)
|
||||
|
||||
@mock.patch('oslo_concurrency.lockutils.lock')
|
||||
def test_on_deleted_false(self, m_lock):
|
||||
pod = self.pod
|
||||
pod_name = 'testing/default'
|
||||
self.registry[pod_name] = pod
|
||||
self.watcher.on_deleted(pod)
|
||||
self.assertIn(pod_name, self.registry)
|
||||
self.assertIs(True, pod['del_received'])
|
||||
|
|
Loading…
Reference in New Issue