diff --git a/devstack/plugin.sh b/devstack/plugin.sh index afdc9db55..99abd107b 100644 --- a/devstack/plugin.sh +++ b/devstack/plugin.sh @@ -27,6 +27,11 @@ function create_kuryr_cache_dir { fi } +function create_kuryr_lock_dir { + # Create lock directory + sudo install -d -o "$STACK_USER" "$KURYR_LOCK_DIR" +} + function get_distutils_data_path { cat << EOF | python - from __future__ import print_function @@ -67,6 +72,8 @@ function configure_kuryr { if is_service_enabled kuryr-daemon; then iniset "$KURYR_CONFIG" cni_daemon daemon_enabled True + iniset "$KURYR_CONFIG" oslo_concurrency lock_path "$KURYR_LOCK_DIR" + create_kuryr_lock_dir KURYR_K8S_CONTAINERIZED_DEPLOYMENT=$(trueorfalse False KURYR_K8S_CONTAINERIZED_DEPLOYMENT) if [ "$KURYR_K8S_CONTAINERIZED_DEPLOYMENT" == "True" ]; then # When running kuryr-daemon in container we need to set up configs. diff --git a/devstack/settings b/devstack/settings index 7555511c7..3d4e56468 100644 --- a/devstack/settings +++ b/devstack/settings @@ -5,6 +5,7 @@ CNI_CONF_DIR=${CNI_CONF_DIR:-$DEST/cni/conf} KURYR_CONFIG_DIR=${KURYR_CONFIG_DIR:-/etc/kuryr} KURYR_CONFIG=${KURYR_CONFIG:-${KURYR_CONFIG_DIR}/kuryr.conf} KURYR_AUTH_CACHE_DIR=${KURYR_AUTH_CACHE_DIR:-/var/cache/kuryr} +KURYR_LOCK_DIR=${KURYR_LOCK_DIR:-${DATA_DIR}/kuryr-kubernetes} KURYR_DOCKER_ENGINE_SOCKET_FILE=${KURYR_DOCKER_ENGINE_SOCKET_FILE:-/var/run/docker.sock} diff --git a/kuryr_kubernetes/cni/daemon/service.py b/kuryr_kubernetes/cni/daemon/service.py index a0f40e1c5..ab454bda9 100644 --- a/kuryr_kubernetes/cni/daemon/service.py +++ b/kuryr_kubernetes/cni/daemon/service.py @@ -26,6 +26,7 @@ import retrying import os_vif from os_vif import objects as obj_vif from os_vif.objects import base +from oslo_concurrency import lockutils from oslo_config import cfg from oslo_log import log as logging from oslo_serialization import jsonutils @@ -43,6 +44,7 @@ from kuryr_kubernetes import watcher as k_watcher LOG = logging.getLogger(__name__) CONF = cfg.CONF +RETRY_DELAY = 1000 # 1 second in milliseconds # TODO(dulek): Another corner case is (and was) when pod is deleted before it's # annotated by controller or even noticed by any watcher. Kubelet @@ -62,15 +64,31 @@ class K8sCNIRegistryPlugin(api.CNIPlugin): def add(self, params): vif = self._do_work(params, b_base.connect) - # NOTE(dulek): Saving containerid to be able to distinguish old DEL - # requests that we should ignore. We need to replace whole - # object in the dict for multiprocessing.Manager to work. pod_name = params.args.K8S_POD_NAME - d = self.registry[pod_name] - d['containerid'] = params.CNI_CONTAINERID - self.registry[pod_name] = d - LOG.debug('Saved containerid = %s for pod %s', params.CNI_CONTAINERID, - pod_name) + # NOTE(dulek): Saving containerid to be able to distinguish old DEL + # requests that we should ignore. We need a lock to + # prevent race conditions and replace whole object in the + # dict for multiprocessing.Manager to notice that. + with lockutils.lock(pod_name, external=True): + d = self.registry[pod_name] + d['containerid'] = params.CNI_CONTAINERID + self.registry[pod_name] = d + LOG.debug('Saved containerid = %s for pod %s', + params.CNI_CONTAINERID, pod_name) + + # Wait for VIF to become active. + timeout = CONF.cni_daemon.vif_annotation_timeout + + # Wait for timeout sec, 1 sec between tries, retry when vif not active. + @retrying.retry(stop_max_delay=timeout * 1000, wait_fixed=RETRY_DELAY, + retry_on_result=lambda x: not x.active) + def wait_for_active(pod_name): + return base.VersionedObject.obj_from_primitive( + self.registry[pod_name]['vif']) + + vif = wait_for_active(pod_name) + if not vif.active: + raise exceptions.ResourceNotReady(pod_name) return vif @@ -96,7 +114,7 @@ class K8sCNIRegistryPlugin(api.CNIPlugin): timeout = CONF.cni_daemon.vif_annotation_timeout # In case of KeyError retry for `timeout` s, wait 1 s between tries. - @retrying.retry(stop_max_delay=(timeout * 1000), wait_fixed=1000, + @retrying.retry(stop_max_delay=timeout * 1000, wait_fixed=RETRY_DELAY, retry_on_exception=lambda e: isinstance(e, KeyError)) def find(): return self.registry[pod_name] @@ -261,12 +279,23 @@ class CNIDaemonWatcherService(cotyledon.Service): self.watcher.start() def on_done(self, pod, vif): - # Add to registry only if it isn't already there. - if pod['metadata']['name'] not in self.registry: - vif_dict = vif.obj_to_primitive() - self.registry[pod['metadata']['name']] = {'pod': pod, - 'vif': vif_dict, - 'containerid': None} + pod_name = pod['metadata']['name'] + vif_dict = vif.obj_to_primitive() + # NOTE(dulek): We need a lock when modifying shared self.registry dict + # to prevent race conditions with other processes/threads. + with lockutils.lock(pod_name, external=True): + if pod_name not in self.registry: + self.registry[pod_name] = {'pod': pod, 'vif': vif_dict, + 'containerid': None} + else: + # NOTE(dulek): Only update vif if its status changed, we don't + # need to care about other changes now. + old_vif = base.VersionedObject.obj_from_primitive( + self.registry[pod_name]['vif']) + if old_vif.active != vif.active: + pod_dict = self.registry[pod_name] + pod_dict['vif'] = vif_dict + self.registry[pod_name] = pod_dict def terminate(self): if self.watcher: diff --git a/kuryr_kubernetes/tests/fake.py b/kuryr_kubernetes/tests/fake.py index 89583eb91..5663f1c80 100644 --- a/kuryr_kubernetes/tests/fake.py +++ b/kuryr_kubernetes/tests/fake.py @@ -43,6 +43,7 @@ def _fake_vif(cls=osv_vif.VIFOpenVSwitch): subnet.ips.objects.append( osv_objects.fixed_ip.FixedIP(address='192.168.0.2')) vif.network.subnets.objects.append(subnet) + vif.active = True return vif diff --git a/kuryr_kubernetes/tests/unit/cni/test_service.py b/kuryr_kubernetes/tests/unit/cni/test_service.py index 5a69867e6..3f601d1f1 100644 --- a/kuryr_kubernetes/tests/unit/cni/test_service.py +++ b/kuryr_kubernetes/tests/unit/cni/test_service.py @@ -35,10 +35,12 @@ class TestK8sCNIRegistryPlugin(base.TestCase): CNI_IFNAME='baz', CNI_NETNS=123, CNI_CONTAINERID='cont_id') + @mock.patch('oslo_concurrency.lockutils.lock') @mock.patch('kuryr_kubernetes.cni.binding.base.connect') - def test_add_present(self, m_connect): + def test_add_present(self, m_connect, m_lock): self.plugin.add(self.params) + m_lock.assert_called_with('foo', external=True) m_connect.assert_called_with(mock.ANY, mock.ANY, 'baz', 123) self.assertEqual('cont_id', self.plugin.registry['foo']['containerid']) @@ -57,18 +59,21 @@ class TestK8sCNIRegistryPlugin(base.TestCase): m_disconnect.assert_not_called() + @mock.patch('oslo_concurrency.lockutils.lock') @mock.patch('time.sleep', mock.Mock()) @mock.patch('kuryr_kubernetes.cni.binding.base.connect') - def test_add_present_on_5_try(self, m_connect): + def test_add_present_on_5_try(self, m_connect, m_lock): se = [KeyError] * 5 se.append({'pod': self.pod, 'vif': self.vif, 'containerid': None}) se.append({'pod': self.pod, 'vif': self.vif, 'containerid': None}) + se.append({'pod': self.pod, 'vif': self.vif, 'containerid': None}) m_getitem = mock.Mock(side_effect=se) m_setitem = mock.Mock() m_registry = mock.Mock(__getitem__=m_getitem, __setitem__=m_setitem) self.plugin.registry = m_registry self.plugin.add(self.params) + m_lock.assert_called_with('foo', external=True) m_setitem.assert_called_once_with('foo', {'pod': self.pod, 'vif': self.vif, 'containerid': 'cont_id'})