diff --git a/kubernetes_crds/kuryr_crds/kuryrport.yaml b/kubernetes_crds/kuryr_crds/kuryrport.yaml index 6c3a6685a..e320e048a 100644 --- a/kubernetes_crds/kuryr_crds/kuryrport.yaml +++ b/kubernetes_crds/kuryr_crds/kuryrport.yaml @@ -29,6 +29,8 @@ spec: type: string podNodeName: type: string + podStatic: + type: boolean status: type: object required: diff --git a/kuryr_kubernetes/cni/binding/base.py b/kuryr_kubernetes/cni/binding/base.py index 481b0f573..5eb56af13 100644 --- a/kuryr_kubernetes/cni/binding/base.py +++ b/kuryr_kubernetes/cni/binding/base.py @@ -176,7 +176,12 @@ def disconnect(vif, instance_info, ifname, netns=None, report_health=None, @cni_utils.log_ipdb def cleanup(ifname, netns): - with get_ipdb(netns) as c_ipdb: - if ifname in c_ipdb.interfaces: - with c_ipdb.interfaces[ifname] as iface: - iface.remove() + try: + with get_ipdb(netns) as c_ipdb: + if ifname in c_ipdb.interfaces: + with c_ipdb.interfaces[ifname] as iface: + iface.remove() + except Exception: + # Just ignore cleanup errors, there's not much we can do anyway. + LOG.warning('Error occured when attempting to clean up netns %s. ' + 'Ignoring.', netns) diff --git a/kuryr_kubernetes/cni/daemon/service.py b/kuryr_kubernetes/cni/daemon/service.py index 1dcd06067..2cc722f97 100644 --- a/kuryr_kubernetes/cni/daemon/service.py +++ b/kuryr_kubernetes/cni/daemon/service.py @@ -153,9 +153,9 @@ class DaemonServer(object): try: self.plugin.delete(params) - except exceptions.CNIKuryrPortTimeout: - # NOTE(dulek): It's better to ignore this error - most of the time - # it will happen when pod is long gone and CRI + except (exceptions.CNIKuryrPortTimeout, exceptions.CNIPodUidMismatch): + # NOTE(dulek): It's better to ignore these errors - most of the + # time it will happen when pod is long gone and CRI # overzealously tries to delete it from the network. # We cannot really do anything without VIF annotation, # so let's just tell CRI to move along. diff --git a/kuryr_kubernetes/cni/plugins/k8s_cni_registry.py b/kuryr_kubernetes/cni/plugins/k8s_cni_registry.py index 0a2a69115..42441c762 100644 --- a/kuryr_kubernetes/cni/plugins/k8s_cni_registry.py +++ b/kuryr_kubernetes/cni/plugins/k8s_cni_registry.py @@ -48,31 +48,31 @@ class K8sCNIRegistryPlugin(base_cni.CNIPlugin): self.k8s = clients.get_kubernetes_client() def _get_obj_name(self, params): - return "%(namespace)s/%(name)s" % { - 'namespace': params.args.K8S_POD_NAMESPACE, - 'name': params.args.K8S_POD_NAME} + return f'{params.args.K8S_POD_NAMESPACE}/{params.args.K8S_POD_NAME}' + + def _get_pod(self, params): + namespace = params.args.K8S_POD_NAMESPACE + name = params.args.K8S_POD_NAME + + try: + return self.k8s.get( + f'{k_const.K8S_API_NAMESPACES}/{namespace}/pods/{name}') + except exceptions.K8sClientException: + uniq_name = self._get_obj_name(params) + LOG.exception('Error when getting Pod %s', uniq_name) + raise def add(self, params): kp_name = self._get_obj_name(params) timeout = CONF.cni_daemon.vif_annotation_timeout - # Try to confirm if CRD in the registry is not stale cache. If it is, - # remove it. - with lockutils.lock(kp_name, external=True): - if kp_name in self.registry: - cached_kp = self.registry[kp_name]['kp'] - try: - kp = self.k8s.get(k_utils.get_res_link(cached_kp)) - except Exception: - LOG.exception('Error when getting KuryrPort %s', kp_name) - raise exceptions.ResourceNotReady(kp_name) - - if kp['metadata']['uid'] != cached_kp['metadata']['uid']: - LOG.warning('Stale KuryrPort %s detected in cache. (API ' - 'uid=%s, cached uid=%s). Removing it from ' - 'cache.', kp_name, kp['metadata']['uid'], - cached_kp['metadata']['uid']) - del self.registry[kp_name] + # In order to fight race conditions when pods get recreated with the + # same name (think StatefulSet), we're trying to get pod UID either + # from the request or the API in order to use it as the ID to compare. + if 'K8S_POD_UID' not in params.args: + # CRI doesn't pass K8S_POD_UID, get it from the API. + pod = self._get_pod(params) + params.args.K8S_POD_UID = pod['metadata']['uid'] vifs = self._do_work(params, b_base.connect, timeout) @@ -138,7 +138,7 @@ class K8sCNIRegistryPlugin(base_cni.CNIPlugin): # delay before registry is populated by watcher. try: self._do_work(params, b_base.disconnect, 5) - except exceptions.CNIKuryrPortTimeout: + except (exceptions.CNIKuryrPortTimeout, exceptions.CNIPodUidMismatch): # So the VIF info seems to be lost at this point, we don't even # know what binding driver was used to plug it. Let's at least # try to remove the interface we created from the netns to prevent @@ -170,19 +170,42 @@ class K8sCNIRegistryPlugin(base_cni.CNIPlugin): LOG.debug("Reporting CNI driver not healthy.") self.healthy.value = driver_healthy - def _do_work(self, params, fn, timeout): + def _get_vifs_from_registry(self, params, timeout): kp_name = self._get_obj_name(params) # In case of KeyError retry for `timeout` s, wait 1 s between tries. @retrying.retry(stop_max_delay=timeout * 1000, wait_fixed=RETRY_DELAY, - retry_on_exception=lambda e: isinstance(e, KeyError)) + retry_on_exception=lambda e: isinstance( + e, (KeyError, exceptions.CNIPodUidMismatch))) def find(): - return self.registry[kp_name] + d = self.registry[kp_name] + static = d['kp']['spec'].get('podStatic', None) + uid = d['kp']['spec']['podUid'] + # FIXME(dulek): This is weirdly structured for upgrades support. + # If podStatic is not set (KuryrPort created by old + # Kuryr version), then on uid mismatch we're fetching + # pod from API and check if it's static here. Pods + # are quite ephemeral, so will gradually get replaced + # after the upgrade and in a while all should have + # the field set and the performance penalty should + # be resolved. Remove in the future. + if 'K8S_POD_UID' in params.args and uid != params.args.K8S_POD_UID: + if static is None: + pod = self._get_pod(params) + static = k_utils.is_pod_static(pod) + + # Static pods have mirror pod UID in API, so it's always + # mismatched. We don't raise in that case. See [1] for more. + # [1] https://github.com/k8snetworkplumbingwg/multus-cni/ + # issues/773 + if not static: + raise exceptions.CNIPodUidMismatch( + kp_name, params.args.K8S_POD_UID, uid) + return d try: d = find() - kp = d['kp'] - vifs = d['vifs'] + return d['kp'], d['vifs'] except KeyError: data = {'metadata': {'name': params.args.K8S_POD_NAME, 'namespace': params.args.K8S_POD_NAMESPACE}} @@ -194,6 +217,9 @@ class K8sCNIRegistryPlugin(base_cni.CNIPlugin): 'kuryr-daemon') raise exceptions.CNIKuryrPortTimeout(kp_name) + def _do_work(self, params, fn, timeout): + kp, vifs = self._get_vifs_from_registry(params, timeout) + for ifname, vif in vifs.items(): is_default_gateway = (ifname == k_const.DEFAULT_IFNAME) if is_default_gateway: diff --git a/kuryr_kubernetes/cni/utils.py b/kuryr_kubernetes/cni/utils.py index ce8dfade8..2e2f796cb 100644 --- a/kuryr_kubernetes/cni/utils.py +++ b/kuryr_kubernetes/cni/utils.py @@ -59,6 +59,9 @@ class CNIArgs(object): if not k.startswith('_'): setattr(self, k, v) + def __contains__(self, key): + return hasattr(self, key) + class CNIParameters(object): def __init__(self, env, cfg=None): diff --git a/kuryr_kubernetes/constants.py b/kuryr_kubernetes/constants.py index 91ac616bb..29a195c06 100644 --- a/kuryr_kubernetes/constants.py +++ b/kuryr_kubernetes/constants.py @@ -76,6 +76,7 @@ K8S_ANNOTATION_CURRENT_DRIVER = 'current_driver' K8S_ANNOTATION_NEUTRON_PORT = 'neutron_id' K8S_ANNOTATION_HEADLESS_SERVICE = 'service.kubernetes.io/headless' +K8S_ANNOTATION_CONFIG_SOURCE = 'kubernetes.io/config.source' POD_FINALIZER = KURYR_FQDN + '/pod-finalizer' KURYRNETWORK_FINALIZER = 'kuryrnetwork.finalizers.kuryr.openstack.org' diff --git a/kuryr_kubernetes/controller/handlers/vif.py b/kuryr_kubernetes/controller/handlers/vif.py index 5401d4b56..6c53b657c 100644 --- a/kuryr_kubernetes/controller/handlers/vif.py +++ b/kuryr_kubernetes/controller/handlers/vif.py @@ -207,7 +207,8 @@ class VIFHandler(k8s_base.ResourceEventHandler): }, 'spec': { 'podUid': pod['metadata']['uid'], - 'podNodeName': pod['spec']['nodeName'] + 'podNodeName': pod['spec']['nodeName'], + 'podStatic': utils.is_pod_static(pod) }, 'status': { 'vifs': vifs diff --git a/kuryr_kubernetes/exceptions.py b/kuryr_kubernetes/exceptions.py index 58715bb8f..283ff6cfb 100644 --- a/kuryr_kubernetes/exceptions.py +++ b/kuryr_kubernetes/exceptions.py @@ -170,6 +170,15 @@ class CNIBindingFailure(Exception): super(CNIBindingFailure, self).__init__(message) +class CNIPodUidMismatch(CNITimeout): + """Excepton raised on a mismatch of CNI request's pod UID and KuryrPort""" + def __init__(self, name, expected, observed): + super().__init__( + f'uid {observed} of the pod {name} does not match the uid ' + f'{expected} requested by the CNI. Dropping CNI request to prevent' + f' race conditions.') + + class UnreachableOctavia(Exception): """Exception indicates Octavia API failure and can not be reached diff --git a/kuryr_kubernetes/tests/unit/cni/plugins/test_k8s_cni_registry.py b/kuryr_kubernetes/tests/unit/cni/plugins/test_k8s_cni_registry.py index b977abe7b..307d0083a 100644 --- a/kuryr_kubernetes/tests/unit/cni/plugins/test_k8s_cni_registry.py +++ b/kuryr_kubernetes/tests/unit/cni/plugins/test_k8s_cni_registry.py @@ -17,6 +17,7 @@ from unittest import mock from oslo_config import cfg from kuryr_kubernetes.cni.plugins import k8s_cni_registry +from kuryr_kubernetes.cni import utils from kuryr_kubernetes import exceptions from kuryr_kubernetes.tests import base from kuryr_kubernetes.tests import fake @@ -33,7 +34,7 @@ class TestK8sCNIRegistryPlugin(base.TestCase): 'kind': 'KuryrPort', 'metadata': {'name': 'foo', 'uid': 'bar', 'namespace': 'default'}, - 'spec': {'podUid': 'bar'}} + 'spec': {'podUid': 'bar', 'podStatic': False}} self.vifs = fake._fake_vifs() registry = {'default/foo': {'kp': self.kp, 'vifs': self.vifs, 'containerid': None, @@ -41,10 +42,11 @@ class TestK8sCNIRegistryPlugin(base.TestCase): 'del_received': False}} healthy = mock.Mock() self.plugin = k8s_cni_registry.K8sCNIRegistryPlugin(registry, healthy) - self.params = mock.Mock(args=mock.Mock(K8S_POD_NAME='foo', - K8S_POD_NAMESPACE='default'), - CNI_IFNAME=self.default_iface, CNI_NETNS=123, - CNI_CONTAINERID='cont_id') + self.params = mock.Mock( + args=utils.CNIArgs('K8S_POD_NAME=foo;K8S_POD_NAMESPACE=default;' + 'K8S_POD_UID=bar'), + CNI_IFNAME=self.default_iface, CNI_NETNS=123, + CNI_CONTAINERID='cont_id') @mock.patch('oslo_concurrency.lockutils.lock') @mock.patch('kuryr_kubernetes.cni.binding.base.connect') @@ -65,6 +67,101 @@ class TestK8sCNIRegistryPlugin(base.TestCase): self.assertEqual('cont_id', self.plugin.registry['default/foo']['containerid']) + @mock.patch('oslo_concurrency.lockutils.lock') + @mock.patch('kuryr_kubernetes.cni.binding.base.connect') + def test_add_no_uid(self, m_connect, m_lock): + self.k8s_mock.get.return_value = self.kp + + self.params.args = utils.CNIArgs( + 'K8S_POD_NAME=foo;K8S_POD_NAMESPACE=default') + self.plugin.add(self.params) + + m_lock.assert_called_with('default/foo', external=True) + m_connect.assert_any_call(mock.ANY, mock.ANY, self.default_iface, + 123, report_health=mock.ANY, + is_default_gateway=True, + container_id='cont_id') + m_connect.assert_any_call(mock.ANY, mock.ANY, self.additional_iface, + 123, report_health=mock.ANY, + is_default_gateway=False, + container_id='cont_id') + self.k8s_mock.get.assert_any_call( + '/api/v1/namespaces/default/pods/foo') + self.assertEqual('cont_id', + self.plugin.registry['default/foo']['containerid']) + + @mock.patch('kuryr_kubernetes.cni.binding.base.connect') + def test_add_wrong_uid(self, m_connect): + cfg.CONF.set_override('vif_annotation_timeout', 0, group='cni_daemon') + self.addCleanup(cfg.CONF.set_override, 'vif_annotation_timeout', 120, + group='cni_daemon') + self.k8s_mock.get.return_value = self.kp + + self.params.args = utils.CNIArgs( + 'K8S_POD_NAME=foo;K8S_POD_NAMESPACE=default;K8S_POD_UID=blob') + self.assertRaises(exceptions.CNIPodUidMismatch, self.plugin.add, + self.params) + + m_connect.assert_not_called() + self.k8s_mock.get.assert_not_called() + + @mock.patch('oslo_concurrency.lockutils.lock') + @mock.patch('kuryr_kubernetes.cni.binding.base.connect') + def test_add_wrong_uid_static(self, m_connect, m_lock): + cfg.CONF.set_override('vif_annotation_timeout', 0, group='cni_daemon') + self.addCleanup(cfg.CONF.set_override, 'vif_annotation_timeout', 120, + group='cni_daemon') + self.k8s_mock.get.return_value = self.kp + + self.params.args = utils.CNIArgs( + 'K8S_POD_NAME=foo;K8S_POD_NAMESPACE=default;K8S_POD_UID=blob') + self.kp['spec']['podStatic'] = True + self.plugin.add(self.params) + + m_lock.assert_called_with('default/foo', external=True) + m_connect.assert_any_call(mock.ANY, mock.ANY, self.default_iface, + 123, report_health=mock.ANY, + is_default_gateway=True, + container_id='cont_id') + m_connect.assert_any_call(mock.ANY, mock.ANY, self.additional_iface, + 123, report_health=mock.ANY, + is_default_gateway=False, + container_id='cont_id') + self.k8s_mock.get.assert_any_call( + '/api/v1/namespaces/default/pods/foo') + self.assertEqual('cont_id', + self.plugin.registry['default/foo']['containerid']) + + @mock.patch('oslo_concurrency.lockutils.lock') + @mock.patch('kuryr_kubernetes.cni.binding.base.connect') + def test_add_wrong_uid_none_static(self, m_connect, m_lock): + cfg.CONF.set_override('vif_annotation_timeout', 0, group='cni_daemon') + self.addCleanup(cfg.CONF.set_override, 'vif_annotation_timeout', 120, + group='cni_daemon') + self.k8s_mock.get.side_effect = [ + {'metadata': { + 'annotations': {'kubernetes.io/config.source': 'file'}}}, + self.kp] + + self.params.args = utils.CNIArgs( + 'K8S_POD_NAME=foo;K8S_POD_NAMESPACE=default;K8S_POD_UID=blob') + del self.kp['spec']['podStatic'] + self.plugin.add(self.params) + + m_lock.assert_called_with('default/foo', external=True) + m_connect.assert_any_call(mock.ANY, mock.ANY, self.default_iface, + 123, report_health=mock.ANY, + is_default_gateway=True, + container_id='cont_id') + m_connect.assert_any_call(mock.ANY, mock.ANY, self.additional_iface, + 123, report_health=mock.ANY, + is_default_gateway=False, + container_id='cont_id') + self.k8s_mock.get.assert_any_call( + '/api/v1/namespaces/default/pods/foo') + self.assertEqual('cont_id', + self.plugin.registry['default/foo']['containerid']) + @mock.patch('oslo_concurrency.lockutils.lock') @mock.patch('kuryr_kubernetes.cni.binding.base.disconnect') def test_del_present(self, m_disconnect, m_lock): diff --git a/kuryr_kubernetes/utils.py b/kuryr_kubernetes/utils.py index 33f7511e9..3d6f9e113 100644 --- a/kuryr_kubernetes/utils.py +++ b/kuryr_kubernetes/utils.py @@ -665,6 +665,16 @@ def is_host_network(pod): return pod['spec'].get('hostNetwork', False) +def is_pod_static(pod): + """Checks if Pod is static by comparing annotations.""" + try: + annotations = pod['metadata']['annotations'] + config_source = annotations[constants.K8S_ANNOTATION_CONFIG_SOURCE] + return config_source != 'api' + except KeyError: + return False + + def get_nodename(): # NOTE(dulek): At first try to get it using environment variable, # otherwise assume hostname is the nodename.