From c0349cec33e0569668be00f23b85c1971b2d5a71 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Micha=C5=82=20Dulko?=
Date: Tue, 19 Dec 2017 15:18:58 +0100
Subject: [PATCH] Make daemon wait for VIF to become active

Currently the CNI daemon does not wait for the VIF to become active before
returning the IP to the CNI. This commit fixes that by adding a wait to the
ADD part of the code.

Change-Id: I2a4c3f3534c54ee7da886c28f73b3dda236b9c93
Closes-Bug: 1739014
---
Note: a standalone sketch of the retry-until-active pattern used in add()
follows the diff.

 devstack/plugin.sh                     |  7 +++
 devstack/settings                      |  1 +
 kuryr_kubernetes/cni/daemon/service.py | 59 ++++++++++++++-----
 kuryr_kubernetes/tests/fake.py         |  1 +
 .../tests/unit/cni/test_service.py     |  9 ++-
 5 files changed, 60 insertions(+), 17 deletions(-)

diff --git a/devstack/plugin.sh b/devstack/plugin.sh
index 908b4548d..d2498e45d 100644
--- a/devstack/plugin.sh
+++ b/devstack/plugin.sh
@@ -27,6 +27,11 @@ function create_kuryr_cache_dir {
     fi
 }
 
+function create_kuryr_lock_dir {
+    # Create lock directory
+    sudo install -d -o "$STACK_USER" "$KURYR_LOCK_DIR"
+}
+
 function get_distutils_data_path {
     cat << EOF | python -
 from __future__ import print_function
@@ -67,6 +72,8 @@ function configure_kuryr {
 
     if is_service_enabled kuryr-daemon; then
         iniset "$KURYR_CONFIG" cni_daemon daemon_enabled True
+        iniset "$KURYR_CONFIG" oslo_concurrency lock_path "$KURYR_LOCK_DIR"
+        create_kuryr_lock_dir
         KURYR_K8S_CONTAINERIZED_DEPLOYMENT=$(trueorfalse False KURYR_K8S_CONTAINERIZED_DEPLOYMENT)
         if [ "$KURYR_K8S_CONTAINERIZED_DEPLOYMENT" == "True" ]; then
             # When running kuryr-daemon in container we need to set up configs.
diff --git a/devstack/settings b/devstack/settings
index f402fb5d5..23f7aa94f 100644
--- a/devstack/settings
+++ b/devstack/settings
@@ -5,6 +5,7 @@ CNI_CONF_DIR=${CNI_CONF_DIR:-$DEST/cni/conf}
 KURYR_CONFIG_DIR=${KURYR_CONFIG_DIR:-/etc/kuryr}
 KURYR_CONFIG=${KURYR_CONFIG:-${KURYR_CONFIG_DIR}/kuryr.conf}
 KURYR_AUTH_CACHE_DIR=${KURYR_AUTH_CACHE_DIR:-/var/cache/kuryr}
+KURYR_LOCK_DIR=${KURYR_LOCK_DIR:-${DATA_DIR}/kuryr-kubernetes}
 KURYR_DOCKER_ENGINE_SOCKET_FILE=${KURYR_DOCKER_ENGINE_SOCKET_FILE:-/var/run/docker.sock}
diff --git a/kuryr_kubernetes/cni/daemon/service.py b/kuryr_kubernetes/cni/daemon/service.py
index a0f40e1c5..ab454bda9 100644
--- a/kuryr_kubernetes/cni/daemon/service.py
+++ b/kuryr_kubernetes/cni/daemon/service.py
@@ -26,6 +26,7 @@ import retrying
 import os_vif
 from os_vif import objects as obj_vif
 from os_vif.objects import base
+from oslo_concurrency import lockutils
 from oslo_config import cfg
 from oslo_log import log as logging
 from oslo_serialization import jsonutils
@@ -43,6 +44,7 @@ from kuryr_kubernetes import watcher as k_watcher
 
 LOG = logging.getLogger(__name__)
 CONF = cfg.CONF
+RETRY_DELAY = 1000  # 1 second in milliseconds
 
 # TODO(dulek): Another corner case is (and was) when pod is deleted before it's
 #              annotated by controller or even noticed by any watcher. Kubelet
@@ -62,15 +64,31 @@ class K8sCNIRegistryPlugin(api.CNIPlugin):
 
     def add(self, params):
         vif = self._do_work(params, b_base.connect)
 
-        # NOTE(dulek): Saving containerid to be able to distinguish old DEL
-        #              requests that we should ignore. We need to replace whole
-        #              object in the dict for multiprocessing.Manager to work.
         pod_name = params.args.K8S_POD_NAME
-        d = self.registry[pod_name]
-        d['containerid'] = params.CNI_CONTAINERID
-        self.registry[pod_name] = d
-        LOG.debug('Saved containerid = %s for pod %s', params.CNI_CONTAINERID,
-                  pod_name)
+        # NOTE(dulek): Saving containerid to be able to distinguish old DEL
+        #              requests that we should ignore. We need a lock to
+        #              prevent race conditions and replace whole object in the
+        #              dict for multiprocessing.Manager to notice that.
+        with lockutils.lock(pod_name, external=True):
+            d = self.registry[pod_name]
+            d['containerid'] = params.CNI_CONTAINERID
+            self.registry[pod_name] = d
+            LOG.debug('Saved containerid = %s for pod %s',
+                      params.CNI_CONTAINERID, pod_name)
+
+        # Wait for VIF to become active.
+        timeout = CONF.cni_daemon.vif_annotation_timeout
+
+        # Wait for timeout sec, 1 sec between tries, retry when vif not active.
+        @retrying.retry(stop_max_delay=timeout * 1000, wait_fixed=RETRY_DELAY,
+                        retry_on_result=lambda x: not x.active)
+        def wait_for_active(pod_name):
+            return base.VersionedObject.obj_from_primitive(
+                self.registry[pod_name]['vif'])
+
+        vif = wait_for_active(pod_name)
+        if not vif.active:
+            raise exceptions.ResourceNotReady(pod_name)
 
         return vif
@@ -96,7 +114,7 @@ class K8sCNIRegistryPlugin(api.CNIPlugin):
         timeout = CONF.cni_daemon.vif_annotation_timeout
 
         # In case of KeyError retry for `timeout` s, wait 1 s between tries.
-        @retrying.retry(stop_max_delay=(timeout * 1000), wait_fixed=1000,
+        @retrying.retry(stop_max_delay=timeout * 1000, wait_fixed=RETRY_DELAY,
                         retry_on_exception=lambda e: isinstance(e, KeyError))
         def find():
             return self.registry[pod_name]
@@ -261,12 +279,23 @@ class CNIDaemonWatcherService(cotyledon.Service):
         self.watcher.start()
 
     def on_done(self, pod, vif):
-        # Add to registry only if it isn't already there.
-        if pod['metadata']['name'] not in self.registry:
-            vif_dict = vif.obj_to_primitive()
-            self.registry[pod['metadata']['name']] = {'pod': pod,
-                                                      'vif': vif_dict,
-                                                      'containerid': None}
+        pod_name = pod['metadata']['name']
+        vif_dict = vif.obj_to_primitive()
+        # NOTE(dulek): We need a lock when modifying shared self.registry dict
+        #              to prevent race conditions with other processes/threads.
+        with lockutils.lock(pod_name, external=True):
+            if pod_name not in self.registry:
+                self.registry[pod_name] = {'pod': pod, 'vif': vif_dict,
+                                           'containerid': None}
+            else:
+                # NOTE(dulek): Only update vif if its status changed, we don't
+                #              need to care about other changes now.
+ old_vif = base.VersionedObject.obj_from_primitive( + self.registry[pod_name]['vif']) + if old_vif.active != vif.active: + pod_dict = self.registry[pod_name] + pod_dict['vif'] = vif_dict + self.registry[pod_name] = pod_dict def terminate(self): if self.watcher: diff --git a/kuryr_kubernetes/tests/fake.py b/kuryr_kubernetes/tests/fake.py index 89583eb91..5663f1c80 100644 --- a/kuryr_kubernetes/tests/fake.py +++ b/kuryr_kubernetes/tests/fake.py @@ -43,6 +43,7 @@ def _fake_vif(cls=osv_vif.VIFOpenVSwitch): subnet.ips.objects.append( osv_objects.fixed_ip.FixedIP(address='192.168.0.2')) vif.network.subnets.objects.append(subnet) + vif.active = True return vif diff --git a/kuryr_kubernetes/tests/unit/cni/test_service.py b/kuryr_kubernetes/tests/unit/cni/test_service.py index 5a69867e6..3f601d1f1 100644 --- a/kuryr_kubernetes/tests/unit/cni/test_service.py +++ b/kuryr_kubernetes/tests/unit/cni/test_service.py @@ -35,10 +35,12 @@ class TestK8sCNIRegistryPlugin(base.TestCase): CNI_IFNAME='baz', CNI_NETNS=123, CNI_CONTAINERID='cont_id') + @mock.patch('oslo_concurrency.lockutils.lock') @mock.patch('kuryr_kubernetes.cni.binding.base.connect') - def test_add_present(self, m_connect): + def test_add_present(self, m_connect, m_lock): self.plugin.add(self.params) + m_lock.assert_called_with('foo', external=True) m_connect.assert_called_with(mock.ANY, mock.ANY, 'baz', 123) self.assertEqual('cont_id', self.plugin.registry['foo']['containerid']) @@ -57,18 +59,21 @@ class TestK8sCNIRegistryPlugin(base.TestCase): m_disconnect.assert_not_called() + @mock.patch('oslo_concurrency.lockutils.lock') @mock.patch('time.sleep', mock.Mock()) @mock.patch('kuryr_kubernetes.cni.binding.base.connect') - def test_add_present_on_5_try(self, m_connect): + def test_add_present_on_5_try(self, m_connect, m_lock): se = [KeyError] * 5 se.append({'pod': self.pod, 'vif': self.vif, 'containerid': None}) se.append({'pod': self.pod, 'vif': self.vif, 'containerid': None}) + se.append({'pod': self.pod, 'vif': self.vif, 'containerid': None}) m_getitem = mock.Mock(side_effect=se) m_setitem = mock.Mock() m_registry = mock.Mock(__getitem__=m_getitem, __setitem__=m_setitem) self.plugin.registry = m_registry self.plugin.add(self.params) + m_lock.assert_called_with('foo', external=True) m_setitem.assert_called_once_with('foo', {'pod': self.pod, 'vif': self.vif, 'containerid': 'cont_id'})
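
A standalone sketch of the retry-until-active pattern introduced by this patch
follows. It mirrors the retrying parameters used in add() (stop_max_delay,
wait_fixed=RETRY_DELAY, retry_on_result), but everything else (the FakeVIF
class, the REGISTRY dict, the pod name and the 10-second timeout) is an
illustrative stand-in, not kuryr-kubernetes code.

# Standalone sketch (not part of the patch above): poll a registry entry until
# its VIF reports active=True, the way the daemon's add() now does.
import threading

import retrying

RETRY_DELAY = 1000           # milliseconds between polls, as in the patch
VIF_ANNOTATION_TIMEOUT = 10  # seconds; stands in for cni_daemon.vif_annotation_timeout


class FakeVIF(object):
    """Stand-in for an os_vif VIF object; only the 'active' flag matters here."""
    def __init__(self, active=False):
        self.active = active


# Stand-in for the multiprocessing.Manager() dict shared by the daemon processes.
REGISTRY = {'my-pod': {'vif': FakeVIF(), 'containerid': None}}


def wait_for_active(pod_name, timeout=VIF_ANNOTATION_TIMEOUT):
    """Return the pod's VIF once it is active, or raise after `timeout` seconds."""

    # retry_on_result re-runs the poll while the returned VIF is still
    # inactive; stop_max_delay bounds the total wait in milliseconds.
    @retrying.retry(stop_max_delay=timeout * 1000, wait_fixed=RETRY_DELAY,
                    retry_on_result=lambda vif: not vif.active)
    def _poll():
        return REGISTRY[pod_name]['vif']

    try:
        return _poll()
    except retrying.RetryError:
        # retrying raises RetryError when the deadline passes and the last
        # result is still rejected (VIF still inactive).
        raise RuntimeError('VIF for %s did not become active in time' % pod_name)


if __name__ == '__main__':
    # Simulate the watcher process flipping the VIF to active after 3 seconds.
    threading.Timer(
        3, lambda: REGISTRY['my-pod'].update(vif=FakeVIF(active=True))).start()
    print('VIF active: %s' % wait_for_active('my-pod').active)

The lockutils.lock(pod_name, external=True) calls in the patch are file-based
inter-process locks; they require an [oslo_concurrency] lock_path to be
configured, which is what the new KURYR_LOCK_DIR plumbing in the devstack
plugin and settings provides.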