Merge "Make daemon wait for VIF to become active"
commit 11a6baa2f4
@@ -27,6 +27,11 @@ function create_kuryr_cache_dir {
     fi
 }
 
+function create_kuryr_lock_dir {
+    # Create lock directory
+    sudo install -d -o "$STACK_USER" "$KURYR_LOCK_DIR"
+}
+
 function get_distutils_data_path {
     cat << EOF | python -
 from __future__ import print_function
@@ -67,6 +72,8 @@ function configure_kuryr {
 
     if is_service_enabled kuryr-daemon; then
         iniset "$KURYR_CONFIG" cni_daemon daemon_enabled True
+        iniset "$KURYR_CONFIG" oslo_concurrency lock_path "$KURYR_LOCK_DIR"
+        create_kuryr_lock_dir
         KURYR_K8S_CONTAINERIZED_DEPLOYMENT=$(trueorfalse False KURYR_K8S_CONTAINERIZED_DEPLOYMENT)
         if [ "$KURYR_K8S_CONTAINERIZED_DEPLOYMENT" == "True" ]; then
             # When running kuryr-daemon in container we need to set up configs.
@@ -5,6 +5,7 @@ CNI_CONF_DIR=${CNI_CONF_DIR:-$DEST/cni/conf}
 KURYR_CONFIG_DIR=${KURYR_CONFIG_DIR:-/etc/kuryr}
 KURYR_CONFIG=${KURYR_CONFIG:-${KURYR_CONFIG_DIR}/kuryr.conf}
 KURYR_AUTH_CACHE_DIR=${KURYR_AUTH_CACHE_DIR:-/var/cache/kuryr}
+KURYR_LOCK_DIR=${KURYR_LOCK_DIR:-${DATA_DIR}/kuryr-kubernetes}
 
 KURYR_DOCKER_ENGINE_SOCKET_FILE=${KURYR_DOCKER_ENGINE_SOCKET_FILE:-/var/run/docker.sock}
 
@@ -26,6 +26,7 @@ import retrying
 import os_vif
 from os_vif import objects as obj_vif
 from os_vif.objects import base
+from oslo_concurrency import lockutils
 from oslo_config import cfg
 from oslo_log import log as logging
 from oslo_serialization import jsonutils
@@ -43,6 +44,7 @@ from kuryr_kubernetes import watcher as k_watcher
 
 LOG = logging.getLogger(__name__)
 CONF = cfg.CONF
+RETRY_DELAY = 1000  # 1 second in milliseconds
 
 # TODO(dulek): Another corner case is (and was) when pod is deleted before it's
 #              annotated by controller or even noticed by any watcher. Kubelet
@@ -62,15 +64,31 @@ class K8sCNIRegistryPlugin(api.CNIPlugin):
     def add(self, params):
         vif = self._do_work(params, b_base.connect)
 
-        # NOTE(dulek): Saving containerid to be able to distinguish old DEL
-        #              requests that we should ignore. We need to replace whole
-        #              object in the dict for multiprocessing.Manager to work.
-        pod_name = params.args.K8S_POD_NAME
-        d = self.registry[pod_name]
-        d['containerid'] = params.CNI_CONTAINERID
-        self.registry[pod_name] = d
-        LOG.debug('Saved containerid = %s for pod %s', params.CNI_CONTAINERID,
-                  pod_name)
+        pod_name = params.args.K8S_POD_NAME
+        # NOTE(dulek): Saving containerid to be able to distinguish old DEL
+        #              requests that we should ignore. We need a lock to
+        #              prevent race conditions and replace whole object in the
+        #              dict for multiprocessing.Manager to notice that.
+        with lockutils.lock(pod_name, external=True):
+            d = self.registry[pod_name]
+            d['containerid'] = params.CNI_CONTAINERID
+            self.registry[pod_name] = d
+            LOG.debug('Saved containerid = %s for pod %s',
+                      params.CNI_CONTAINERID, pod_name)
+
+        # Wait for VIF to become active.
+        timeout = CONF.cni_daemon.vif_annotation_timeout
+
+        # Wait for timeout sec, 1 sec between tries, retry when vif not active.
+        @retrying.retry(stop_max_delay=timeout * 1000, wait_fixed=RETRY_DELAY,
+                        retry_on_result=lambda x: not x.active)
+        def wait_for_active(pod_name):
+            return base.VersionedObject.obj_from_primitive(
+                self.registry[pod_name]['vif'])
+
+        vif = wait_for_active(pod_name)
+        if not vif.active:
+            raise exceptions.ResourceNotReady(pod_name)
 
         return vif
 
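As background, `retrying.retry` with `retry_on_result` keeps re-invoking the decorated function while the predicate rejects the result, for at most `stop_max_delay` milliseconds with `wait_fixed` milliseconds between attempts. A minimal standalone sketch of the pattern; `FakeVIF` and `REGISTRY` are hypothetical stand-ins for the os_vif object and the daemon's shared registry, not part of the patch:

# Sketch of the wait-for-active retry loop; FakeVIF/REGISTRY are
# illustrative stand-ins for the daemon's shared state.
import retrying


class FakeVIF(object):
    def __init__(self, active=False):
        self.active = active


REGISTRY = {'pod-1': FakeVIF(active=True)}


# Retry for up to 5 s in total, sleeping 1 s between attempts, for as
# long as the looked-up VIF still reports active == False.
@retrying.retry(stop_max_delay=5 * 1000, wait_fixed=1000,
                retry_on_result=lambda vif: not vif.active)
def wait_for_active(pod_name):
    return REGISTRY[pod_name]


try:
    vif = wait_for_active('pod-1')
    print('VIF active: %s' % vif.active)
except retrying.RetryError:
    # Raised when the deadline passes while the predicate still
    # rejects the result.
    print('VIF never became active')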
@@ -96,7 +114,7 @@ class K8sCNIRegistryPlugin(api.CNIPlugin):
         timeout = CONF.cni_daemon.vif_annotation_timeout
 
         # In case of KeyError retry for `timeout` s, wait 1 s between tries.
-        @retrying.retry(stop_max_delay=(timeout * 1000), wait_fixed=1000,
+        @retrying.retry(stop_max_delay=timeout * 1000, wait_fixed=RETRY_DELAY,
                         retry_on_exception=lambda e: isinstance(e, KeyError))
         def find():
             return self.registry[pod_name]
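For reference, `retry_on_exception` is the exception-side counterpart of `retry_on_result`: the call is retried only while the predicate matches the raised exception. A standalone sketch under the same illustrative names as above:

# Sketch of retrying on KeyError until the key shows up; REGISTRY is a
# hypothetical stand-in for the daemon's shared registry.
import retrying

REGISTRY = {}


# Retry for up to 5 s total, 1 s between tries, but only on KeyError;
# any other exception propagates immediately.
@retrying.retry(stop_max_delay=5 * 1000, wait_fixed=1000,
                retry_on_exception=lambda e: isinstance(e, KeyError))
def find(pod_name):
    return REGISTRY[pod_name]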
@@ -261,12 +279,23 @@ class CNIDaemonWatcherService(cotyledon.Service):
         self.watcher.start()
 
     def on_done(self, pod, vif):
-        # Add to registry only if it isn't already there.
-        if pod['metadata']['name'] not in self.registry:
-            vif_dict = vif.obj_to_primitive()
-            self.registry[pod['metadata']['name']] = {'pod': pod,
-                                                      'vif': vif_dict,
-                                                      'containerid': None}
+        pod_name = pod['metadata']['name']
+        vif_dict = vif.obj_to_primitive()
+        # NOTE(dulek): We need a lock when modifying shared self.registry dict
+        #              to prevent race conditions with other processes/threads.
+        with lockutils.lock(pod_name, external=True):
+            if pod_name not in self.registry:
+                self.registry[pod_name] = {'pod': pod, 'vif': vif_dict,
+                                           'containerid': None}
+            else:
+                # NOTE(dulek): Only update vif if its status changed, we don't
+                #              need to care about other changes now.
+                old_vif = base.VersionedObject.obj_from_primitive(
+                    self.registry[pod_name]['vif'])
+                if old_vif.active != vif.active:
+                    pod_dict = self.registry[pod_name]
+                    pod_dict['vif'] = vif_dict
+                    self.registry[pod_name] = pod_dict
 
     def terminate(self):
         if self.watcher:
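`lockutils.lock(name, external=True)` backs the lock with a file under the configured `lock_path`, so same-named locks serialize across processes, not just threads; that is what lets the watcher process and the CNI request handlers coordinate on the shared registry. A minimal sketch; the lock path and helper name are illustrative (the daemon gets its lock_path from the `[oslo_concurrency]` option set by the DevStack plugin above):

# Sketch of oslo.concurrency external (file-based) locking.
from oslo_concurrency import lockutils

# External locks require a lock_path; this value is illustrative.
lockutils.set_defaults(lock_path='/tmp/kuryr-locks')

registry = {}


def update(pod_name, vif_dict):
    # Another process taking the same-named external lock blocks here
    # until this one is released.
    with lockutils.lock(pod_name, external=True):
        d = registry.get(pod_name, {})
        d['vif'] = vif_dict
        registry[pod_name] = d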
@@ -43,6 +43,7 @@ def _fake_vif(cls=osv_vif.VIFOpenVSwitch):
     subnet.ips.objects.append(
         osv_objects.fixed_ip.FixedIP(address='192.168.0.2'))
     vif.network.subnets.objects.append(subnet)
+    vif.active = True
     return vif
 
 
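The registry can only hold primitives because it is a `multiprocessing.Manager` dict, hence the `obj_to_primitive()`/`obj_from_primitive()` round trip seen in the hunks above. A standalone sketch of that round trip; the field values are illustrative:

# Sketch of the os_vif serialize/rehydrate round trip the registry
# relies on; field values here are illustrative.
import uuid

import os_vif
from os_vif import objects as osv_objects
from os_vif.objects import base

os_vif.initialize()  # registers the os_vif versioned objects

vif = osv_objects.vif.VIFOpenVSwitch(id=str(uuid.uuid4()), active=False)
primitive = vif.obj_to_primitive()  # plain dict, safe to share/copy
restored = base.VersionedObject.obj_from_primitive(primitive)
assert restored.active is False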
@@ -35,10 +35,12 @@ class TestK8sCNIRegistryPlugin(base.TestCase):
             CNI_IFNAME='baz', CNI_NETNS=123,
             CNI_CONTAINERID='cont_id')
 
+    @mock.patch('oslo_concurrency.lockutils.lock')
     @mock.patch('kuryr_kubernetes.cni.binding.base.connect')
-    def test_add_present(self, m_connect):
+    def test_add_present(self, m_connect, m_lock):
         self.plugin.add(self.params)
 
+        m_lock.assert_called_with('foo', external=True)
         m_connect.assert_called_with(mock.ANY, mock.ANY, 'baz', 123)
         self.assertEqual('cont_id', self.plugin.registry['foo']['containerid'])
 
@@ -57,18 +59,21 @@ class TestK8sCNIRegistryPlugin(base.TestCase):
 
         m_disconnect.assert_not_called()
 
+    @mock.patch('oslo_concurrency.lockutils.lock')
     @mock.patch('time.sleep', mock.Mock())
     @mock.patch('kuryr_kubernetes.cni.binding.base.connect')
-    def test_add_present_on_5_try(self, m_connect):
+    def test_add_present_on_5_try(self, m_connect, m_lock):
         se = [KeyError] * 5
         se.append({'pod': self.pod, 'vif': self.vif, 'containerid': None})
+        se.append({'pod': self.pod, 'vif': self.vif, 'containerid': None})
+        se.append({'pod': self.pod, 'vif': self.vif, 'containerid': None})
         m_getitem = mock.Mock(side_effect=se)
         m_setitem = mock.Mock()
         m_registry = mock.Mock(__getitem__=m_getitem, __setitem__=m_setitem)
         self.plugin.registry = m_registry
         self.plugin.add(self.params)
 
+        m_lock.assert_called_with('foo', external=True)
         m_setitem.assert_called_once_with('foo', {'pod': self.pod,
                                                   'vif': self.vif,
                                                   'containerid': 'cont_id'})
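Note the mock argument order in these tests: stacked `@mock.patch` decorators are applied bottom-up, so the patch closest to the method (`binding.base.connect`) is injected first and the newly added `lockutils.lock` patch arrives last. A standalone sketch; the patch targets are arbitrary stdlib names, not from the patch:

# Stacked mock.patch decorators inject mocks bottom-up: the decorator
# nearest the function provides the first mock argument.
import unittest
from unittest import mock


class OrderExample(unittest.TestCase):
    @mock.patch('os.rename')  # applied last  -> injected last (m_rename)
    @mock.patch('os.remove')  # applied first -> injected first (m_remove)
    def test_order(self, m_remove, m_rename):
        self.assertIsNot(m_remove, m_rename)
        self.assertIsInstance(m_remove, mock.MagicMock)


if __name__ == '__main__':
    unittest.main()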