CNI: Confirm pods in cache before connecting

In a highly distributed environment like a Kubernetes installation with
Kuryr, we need to plan for network outages. If we don't, we end up with
bugs like the one this patch fixes.

If we lose a Pod delete event on kuryr-daemon, the following can happen
(see the sketch after the list):

    1. Pod A named "foo" gets created.
    2. It gets annotated normally and a CNI ADD request gives it IP X.
    3. Pod A gets deleted.
    4. Somehow the delete event gets lost on kuryr-daemon's watcher.
    5. The CRI sends a CNI DEL request and the pod gets unplugged
       successfully, but it is never removed from the daemon's registry,
       because we never got the Pod delete event from the K8s API.
    6. Pod B with the same name "foo" gets created.
    7. CNI looks up the registry by <namespace>/<pod>, finds the old VIF
       there and plugs pod B with pod A's VIF X.
    8. kuryr-controller never notices this and assigns IP X to another
       pod.
    9. We get an IP conflict.
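
The root cause is that the registry is keyed by <namespace>/<name>
alone, so two different pods sharing a name collide. A minimal sketch of
the collision in plain Python (illustrative names only, not code from
the patch):

    registry = {}

    def on_pod_created(pod, vif):
        # The key carries no uid, so a new pod reusing a deleted pod's
        # name maps onto the same entry.
        key = '%(namespace)s/%(name)s' % pod['metadata']
        registry[key] = {'pod': pod, 'vif': vif}

    pod_a = {'metadata': {'namespace': 'default', 'name': 'foo',
                          'uid': 'uid-a'}}
    on_pod_created(pod_a, 'vif-with-ip-x')

    # Pod A is deleted, but the delete event never reaches the daemon,
    # so its entry is never dropped. Pod B then reuses the name:
    pod_b = {'metadata': {'namespace': 'default', 'name': 'foo',
                          'uid': 'uid-b'}}

    # The CNI ADD lookup for pod B now returns pod A's stale VIF, even
    # though the uids differ - exactly the mismatch the patch checks for.
    assert registry['default/foo']['vif'] == 'vif-with-ip-x'
    assert (registry['default/foo']['pod']['metadata']['uid']
            != pod_b['metadata']['uid'])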

To solve the issue, this patch makes sure that when handling CNI ADD
calls we always get the pod from the K8s API first, and if the uid
returned by the API doesn't match the one in the registry, we remove
the registry entry. That way we can be sure the pod we've cached isn't
stale. This adds one K8s API call per CNI ADD request, which is a
significant load increase, but hopefully one the K8s API can handle.
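
Condensed, the confirmation step (shown in full in the _do_work() hunk
below) works like this; a sketch following the patch's data layout, with
the standalone function name being illustrative:

    from oslo_concurrency import lockutils

    def confirm_pod_fresh(k8s, registry, pod_name):
        # Re-read the pod from the K8s API and drop the registry entry
        # if it describes a different pod than the one we cached.
        with lockutils.lock(pod_name, external=True):
            if pod_name in registry:
                cached_pod = registry[pod_name]['pod']
                # The one extra API call per CNI ADD mentioned above.
                pod = k8s.get(cached_pod['metadata']['selfLink'])
                if pod['metadata']['uid'] != cached_pod['metadata']['uid']:
                    # Same name, different uid: the cached pod no longer
                    # exists. Remove it so ADD waits for the new pod's
                    # annotation instead of reusing the old VIF.
                    del registry[pod_name]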

Closes-Bug: 1854928

Change-Id: I9916fca41bd917d85be973b8625b65a61139c3b3
Michał Dulko, 2020-04-10 17:40:51 +02:00
commit 8d8b84ca13 (parent 50b5933730)
4 changed files with 37 additions and 6 deletions


@@ -220,7 +220,9 @@ class CNIDaemonWatcherService(cotyledon.Service):
         # NOTE(dulek): We need a lock when modifying shared self.registry dict
         # to prevent race conditions with other processes/threads.
         with lockutils.lock(pod_name, external=True):
-            if pod_name not in self.registry:
+            if (pod_name not in self.registry or
+                    self.registry[pod_name]['pod']['metadata']['uid']
+                    != pod['metadata']['uid']):
                 self.registry[pod_name] = {'pod': pod, 'vifs': vif_dict,
                                            'containerid': None,
                                            'vif_unplugged': False,


@@ -20,6 +20,7 @@ from oslo_concurrency import lockutils
 from oslo_config import cfg
 from oslo_log import log as logging
 
+from kuryr_kubernetes import clients
 from kuryr_kubernetes.cni.binding import base as b_base
 from kuryr_kubernetes.cni.plugins import base as base_cni
 from kuryr_kubernetes.cni import utils
@@ -42,6 +43,7 @@ class K8sCNIRegistryPlugin(base_cni.CNIPlugin):
     def __init__(self, registry, healthy):
         self.healthy = healthy
         self.registry = registry
+        self.k8s = clients.get_kubernetes_client()
 
     def _get_pod_name(self, params):
         return "%(namespace)s/%(name)s" % {
@@ -49,7 +51,7 @@ class K8sCNIRegistryPlugin(base_cni.CNIPlugin):
             'name': params.args.K8S_POD_NAME}
 
     def add(self, params):
-        vifs = self._do_work(params, b_base.connect)
+        vifs = self._do_work(params, b_base.connect, confirm=True)
 
         pod_name = self._get_pod_name(params)
@@ -124,11 +126,29 @@ class K8sCNIRegistryPlugin(base_cni.CNIPlugin):
             LOG.debug("Reporting CNI driver not healthy.")
             self.healthy.value = driver_healthy
 
-    def _do_work(self, params, fn):
+    def _do_work(self, params, fn, confirm=False):
         pod_name = self._get_pod_name(params)
 
         timeout = CONF.cni_daemon.vif_annotation_timeout
 
+        if confirm:
+            # Try to confirm if pod in the registry is not stale cache.
+            with lockutils.lock(pod_name, external=True):
+                if pod_name in self.registry:
+                    cached_pod = self.registry[pod_name]['pod']
+                    try:
+                        pod = self.k8s.get(cached_pod['metadata']['selfLink'])
+                    except Exception:
+                        LOG.exception('Error when getting pod %s', pod_name)
+                        raise exceptions.ResourceNotReady(pod_name)
+
+                    if pod['metadata']['uid'] != cached_pod['metadata']['uid']:
+                        LOG.warning('Stale pod %s detected in cache. (API '
+                                    'uid=%s, cached uid=%s). Removing it from '
+                                    'cache.', pod_name, pod['metadata']['uid'],
+                                    cached_pod['metadata']['uid'])
+                        del self.registry[pod_name]
+
         # In case of KeyError retry for `timeout` s, wait 1 s between tries.
         @retrying.retry(stop_max_delay=timeout * 1000, wait_fixed=RETRY_DELAY,
                         retry_on_exception=lambda e: isinstance(e, KeyError))


@@ -20,15 +20,17 @@ from kuryr_kubernetes.cni.plugins import k8s_cni_registry
 from kuryr_kubernetes import exceptions
 from kuryr_kubernetes.tests import base
 from kuryr_kubernetes.tests import fake
+from kuryr_kubernetes.tests.unit import kuryr_fixtures
 
 
 class TestK8sCNIRegistryPlugin(base.TestCase):
     def setUp(self):
         super(TestK8sCNIRegistryPlugin, self).setUp()
+        self.k8s_mock = self.useFixture(kuryr_fixtures.MockK8sClient()).client
         self.default_iface = 'baz'
         self.additional_iface = 'eth1'
         self.pod = {'metadata': {'name': 'foo', 'uid': 'bar',
-                                 'namespace': 'default'}}
+                                 'namespace': 'default', 'selfLink': 'baz'}}
         self.vifs = fake._fake_vifs_dict()
         registry = {'default/foo': {'pod': self.pod, 'vifs': self.vifs,
                                     'containerid': None,
@@ -44,6 +46,8 @@ class TestK8sCNIRegistryPlugin(base.TestCase):
     @mock.patch('oslo_concurrency.lockutils.lock')
     @mock.patch('kuryr_kubernetes.cni.binding.base.connect')
     def test_add_present(self, m_connect, m_lock):
+        self.k8s_mock.get.return_value = self.pod
+
         self.plugin.add(self.params)
 
         m_lock.assert_called_with('default/foo', external=True)
@@ -116,7 +120,8 @@ class TestK8sCNIRegistryPlugin(base.TestCase):
                            'vif_unplugged': False, 'del_received': False})
         m_getitem = mock.Mock(side_effect=se)
         m_setitem = mock.Mock()
-        m_registry = mock.Mock(__getitem__=m_getitem, __setitem__=m_setitem)
+        m_registry = mock.Mock(__getitem__=m_getitem, __setitem__=m_setitem,
+                               __contains__=mock.Mock(return_value=False))
         self.plugin.registry = m_registry
 
         self.plugin.add(self.params)
@@ -137,13 +142,15 @@ class TestK8sCNIRegistryPlugin(base.TestCase):
                                           container_id='cont_id')
 
     @mock.patch('time.sleep', mock.Mock())
+    @mock.patch('oslo_concurrency.lockutils.lock', mock.Mock(
+        return_value=mock.Mock(__enter__=mock.Mock(), __exit__=mock.Mock())))
     def test_add_not_present(self):
         cfg.CONF.set_override('vif_annotation_timeout', 0, group='cni_daemon')
         self.addCleanup(cfg.CONF.set_override, 'vif_annotation_timeout', 120,
                         group='cni_daemon')
 
         m_getitem = mock.Mock(side_effect=KeyError)
-        m_registry = mock.Mock(__getitem__=m_getitem)
+        m_registry = mock.Mock(__getitem__=m_getitem, __contains__=False)
         self.plugin.registry = m_registry
         self.assertRaises(exceptions.ResourceNotReady, self.plugin.add,
                           self.params)


@@ -21,12 +21,14 @@ from kuryr_kubernetes.cni.plugins import k8s_cni_registry
 from kuryr_kubernetes import exceptions
 from kuryr_kubernetes.tests import base
 from kuryr_kubernetes.tests import fake
+from kuryr_kubernetes.tests.unit import kuryr_fixtures
 
 
 class TestDaemonServer(base.TestCase):
     def setUp(self):
         super(TestDaemonServer, self).setUp()
         healthy = mock.Mock()
+        self.k8s_mock = self.useFixture(kuryr_fixtures.MockK8sClient())
         self.plugin = k8s_cni_registry.K8sCNIRegistryPlugin({}, healthy)
         self.health_registry = mock.Mock()
         self.srv = service.DaemonServer(self.plugin, self.health_registry)