Add lock to pools population

We have seen issues where the Neutron Server was
not able to respond in time during the creation of
Ports in bulk. In order to avoid this type of
failure and improve the time taken for Pods to be
created, this commit introduces a lock that allows
only one population to happen at a time for a
specific pool, instead of controlling all pools'
population with the frequency setting.
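
Roughly, the change boils down to the pattern below. This is a
minimal sketch in plain Python rather than the driver code itself;
populate_pool and request_ports_in_bulk are illustrative names,
while POPULATE_POOL_TIMEOUT mirrors the new module constant.

    import threading

    POPULATE_POOL_TIMEOUT = 420  # seconds, same value as the new constant

    def populate_pool(lock, pool_key, request_ports_in_bulk):
        # Only one population may run per pool at a time; the timed
        # acquire keeps callers from piling up behind a slow Neutron
        # bulk request.
        if not lock.acquire(timeout=POPULATE_POOL_TIMEOUT):
            return False
        try:
            request_ports_in_bulk(pool_key)
        finally:
            lock.release()
        return True

When the lock cannot be acquired within the timeout the caller simply
reports failure; request_vif then re-raises ResourceNotReady so the
event is retried later.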

Change-Id: I76f5d08e744b7bbc64093ab1a54084a1f97d4aa7
Maysa Macedo 2021-12-03 09:24:45 +00:00 committed by Michał Dulko
parent f8c0b736c1
commit e2bfcaca2b
3 changed files with 105 additions and 121 deletions


@ -39,12 +39,8 @@ in a bulk request upon pool population, can be modified:
Note this value should be smaller than the ports_pool_max (if the
ports_pool_max is enabled).
Finally, the interval between pools updating actions (in seconds) can be
modified, and it should be adjusted based on your specific deployment, e.g., if
the port creation actions are slow, it is desirable to raise it in order not to
have overlapping actions. As a simple rule of thumbs, the frequency should be
at least as large as the time needed to perform the bulk requests (ports
creation, including subports attachment for the nested case):
Finally, to define the frequency (in seconds) at which ports are recycled so
they can be reused by future pods, configure the following option:
.. code-block:: ini


@ -17,6 +17,7 @@ import abc
import collections
import eventlet
import os
import threading
import time
from kuryr.lib._i18n import _
@ -105,6 +106,7 @@ VIF_TYPE_TO_DRIVER_MAPPING = {
}
NODE_PORTS_CLEAN_FREQUENCY = 600 # seconds
POPULATE_POOL_TIMEOUT = 420 # seconds
class NoopVIFPool(base.VIFPoolDriver):
@ -146,6 +148,8 @@ class BaseVIFPool(base.VIFPoolDriver, metaclass=abc.ABCMeta):
_last_update is a dictionary with the timestamp of the last population
action for each pool. The keys are the pool_keys and the values are the
timestamps.
_populate_pool_lock is a dict with the pool_key as key and a lock as value.
Also, there is a _lock to control access to _populate_pool_lock dict.
The following driver configuration options exist:
- ports_pool_max: it specifies how many ports can be kept at each pool.
@ -157,7 +161,7 @@ class BaseVIFPool(base.VIFPoolDriver, metaclass=abc.ABCMeta):
- ports_pool_batch: target number of ports to be created in bulk requests
when populating pools.
- ports_pool_update_frequency: interval in seconds between ports pool
updates, both for populating pools as well as for recycling ports.
updates for recycling ports.
"""
def __init__(self):
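
For reference, the lock bookkeeping described in the docstring above
amounts to the sketch below. The PoolLocks class and its get/drop
methods are illustrative only; just the _lock and _populate_pool_lock
attributes mirror the driver.

    import collections
    import threading

    class PoolLocks(object):
        def __init__(self):
            # _lock guards the dict of per-pool locks.
            self._lock = threading.Lock()
            # defaultdict lazily creates a Lock the first time a
            # pool_key is requested.
            self._populate_pool_lock = collections.defaultdict(
                threading.Lock)

        def get(self, pool_key):
            with self._lock:
                return self._populate_pool_lock[pool_key]

        def drop(self, pool_key):
            # Called when a pool is deleted so its lock does not leak.
            with self._lock:
                self._populate_pool_lock.pop(pool_key, None)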
@ -207,24 +211,28 @@ class BaseVIFPool(base.VIFPoolDriver, metaclass=abc.ABCMeta):
pool_key = self._get_pool_key(host_addr, project_id, None, subnets)
# NOTE(maysams): It's possible that more recent Pods will retrieve
# the Ports from the pool that older Pods were waiting for. In case
# this happens, the event will be retried.
try:
return self._get_port_from_pool(pool_key, pod, subnets,
tuple(sorted(security_groups)))
except exceptions.ResourceNotReady:
LOG.debug("Ports pool does not have available ports: %s", pool_key)
# NOTE(dulek): We're passing raise_not_ready=False because this
# will be run outside of handlers thread, so raising
# it will only result in an ugly log from eventlet.
eventlet.spawn(self._populate_pool, pool_key, pod, subnets,
tuple(sorted(security_groups)),
raise_not_ready=False)
if self._populate_pool(pool_key, pod, subnets,
tuple(sorted(security_groups))):
return self._get_port_from_pool(
pool_key, pod, subnets, tuple(sorted(security_groups)))
raise
def _get_port_from_pool(self, pool_key, pod, subnets, security_groups):
raise NotImplementedError()
def _populate_pool(self, pool_key, pod, subnets, security_groups,
raise_not_ready=True):
def _get_populate_pool_lock(self, pool_key):
with self._lock:
return self._populate_pool_lock[pool_key]
def _populate_pool(self, pool_key, pod, subnets, security_groups):
# REVISIT(ltomasbo): Drop the subnets parameter and get the information
# from the pool_key, which will be required when multi-network is
# supported
@ -232,50 +240,44 @@ class BaseVIFPool(base.VIFPoolDriver, metaclass=abc.ABCMeta):
if not self._recovered_pools:
LOG.debug("Kuryr-controller not yet ready to populate pools.")
if raise_not_ready:
raise exceptions.ResourceNotReady(pod)
else:
return
now = time.time()
last_update = 0
pool_updates = self._last_update.get(pool_key)
if pool_updates:
last_update = pool_updates.get(security_groups, 0)
return False
ports_pool_min = oslo_cfg.CONF.vif_pool.ports_pool_min
lock = self._get_populate_pool_lock(pool_key)
# NOTE(maysams): Only allow one request for vifs per pool at a time,
# timing out after POPULATE_POOL_TIMEOUT (420) seconds.
if lock.acquire(timeout=POPULATE_POOL_TIMEOUT):
pool_size = self._get_pool_size(pool_key)
try:
if (now - oslo_cfg.CONF.vif_pool.ports_pool_update_frequency <
last_update):
LOG.debug("Not enough time since the last pool update")
return
except AttributeError:
LOG.debug("Kuryr-controller not yet ready to populate pools.")
return
self._last_update[pool_key] = {security_groups: now}
if pool_size < ports_pool_min:
num_ports = max(oslo_cfg.CONF.vif_pool.ports_pool_batch,
ports_pool_min - pool_size)
try:
vifs = self._drv_vif.request_vifs(
pod=pod,
project_id=pool_key[1],
subnets=subnets,
security_groups=security_groups,
num_ports=num_ports)
except os_exc.SDKException as exc:
kubernetes.add_event(
pod, 'FailToPopulateVIFPool',
f'There was an error during populating VIF pool '
f'for pod: {exc.message}', type_='Warning')
raise
pool_size = self._get_pool_size(pool_key)
if pool_size < oslo_cfg.CONF.vif_pool.ports_pool_min:
num_ports = max(oslo_cfg.CONF.vif_pool.ports_pool_batch,
oslo_cfg.CONF.vif_pool.ports_pool_min - pool_size)
try:
vifs = self._drv_vif.request_vifs(
pod=pod,
project_id=pool_key[1],
subnets=subnets,
security_groups=security_groups,
num_ports=num_ports)
except os_exc.SDKException as exc:
kubernetes.add_event(pod, 'FailToPopulateVIFPool',
f'There was an error during populating '
f'VIF pool for pod: {exc.message}',
type_='Warning')
raise
for vif in vifs:
self._existing_vifs[vif.id] = vif
self._available_ports_pools.setdefault(
pool_key, {}).setdefault(
security_groups, []).append(vif.id)
if not vifs:
self._last_update[pool_key] = {security_groups: last_update}
for vif in vifs:
self._existing_vifs[vif.id] = vif
self._available_ports_pools.setdefault(
pool_key, {}).setdefault(
security_groups, []).append(vif.id)
if vifs:
now = time.time()
self._last_update[pool_key] = {security_groups: now}
finally:
lock.release()
else:
return False
return True
def release_vif(self, pod, vif, project_id, security_groups,
host_addr=None):
@ -376,6 +378,8 @@ class BaseVIFPool(base.VIFPoolDriver, metaclass=abc.ABCMeta):
self._existing_vifs = collections.defaultdict()
self._recyclable_ports = collections.defaultdict()
self._last_update = collections.defaultdict()
self._lock = threading.Lock()
self._populate_pool_lock = collections.defaultdict(threading.Lock)
def _get_trunks_info(self):
"""Returns information about trunks and their subports.
@ -639,11 +643,8 @@ class NeutronVIFPool(BaseVIFPool):
os_net = clients.get_network_client()
os_net.update_port(port_id, name=c_utils.get_port_name(pod),
device_id=pod['metadata']['uid'])
# check if the pool needs to be populated
if (self._get_pool_size(pool_key) <
oslo_cfg.CONF.vif_pool.ports_pool_min):
eventlet.spawn(self._populate_pool, pool_key, pod, subnets,
security_groups)
eventlet.spawn(self._populate_pool, pool_key, pod, subnets,
security_groups)
# Add protection from port_id not in existing_vifs
try:
port = self._existing_vifs[port_id]
@ -805,6 +806,11 @@ class NeutronVIFPool(BaseVIFPool):
os_net.delete_port(port_id)
self._available_ports_pools[pool_key] = {}
with self._lock:
try:
del self._populate_pool_lock[pool_key]
except KeyError:
pass
class NestedVIFPool(BaseVIFPool):
@ -905,11 +911,8 @@ class NestedVIFPool(BaseVIFPool):
os_net.update_port(port_id, security_groups=list(security_groups))
if config.CONF.kubernetes.port_debug:
os_net.update_port(port_id, name=c_utils.get_port_name(pod))
# check if the pool needs to be populated
if (self._get_pool_size(pool_key) <
oslo_cfg.CONF.vif_pool.ports_pool_min):
eventlet.spawn(self._populate_pool, pool_key, pod, subnets,
security_groups)
eventlet.spawn(self._populate_pool, pool_key, pod, subnets,
security_groups)
# Add protection from port_id not in existing_vifs
try:
port = self._existing_vifs[port_id]
@ -1190,6 +1193,11 @@ class NestedVIFPool(BaseVIFPool):
os_net.delete_port(port_id)
self._available_ports_pools[pool_key] = {}
with self._lock:
try:
del self._populate_pool_lock[pool_key]
except KeyError:
pass
class MultiVIFPool(base.VIFPoolDriver):


@ -13,6 +13,7 @@
# limitations under the License.
import collections
import threading
from unittest import mock
import uuid
@ -97,8 +98,7 @@ class BaseVIFPool(test_base.TestCase):
self.assertEqual(vif, cls.request_vif(m_driver, pod, project_id,
subnets, security_groups))
@mock.patch('eventlet.spawn')
def test_request_vif_empty_pool(self, m_eventlet):
def test_request_vif_empty_pool(self):
cls = vif_pool.BaseVIFPool
m_driver = mock.MagicMock(spec=cls)
@ -123,7 +123,7 @@ class BaseVIFPool(test_base.TestCase):
self.assertRaises(exceptions.ResourceNotReady, cls.request_vif,
m_driver, pod, project_id, subnets, security_groups)
m_eventlet.assert_called_once()
m_driver._populate_pool.assert_called_once()
def test_request_vif_pod_without_host(self):
cls = vif_pool.BaseVIFPool
@ -199,9 +199,9 @@ class BaseVIFPool(test_base.TestCase):
pool_key = (mock.sentinel.host_addr, project_id)
m_driver._recovered_pools = False
self.assertRaises(exceptions.ResourceNotReady, cls._populate_pool,
self.assertFalse(cls._populate_pool(
m_driver, pool_key, pod, subnets,
tuple(security_groups))
tuple(security_groups)))
m_driver._drv_vif.request_vifs.assert_not_called()
@mock.patch('kuryr_kubernetes.clients.get_kubernetes_client')
@ -224,12 +224,15 @@ class BaseVIFPool(test_base.TestCase):
m_driver._recovered_pools = False
cls._populate_pool(m_driver, pool_key, pod, subnets,
tuple(security_groups), raise_not_ready=False)
tuple(security_groups))
m_driver._drv_vif.request_vifs.assert_not_called()
@mock.patch('kuryr_kubernetes.clients.get_kubernetes_client')
@mock.patch('time.time', return_value=0)
def test__populate_pool_no_update(self, m_time, m_get_kubernetes_client):
@ddt.data((neutron_vif.NeutronPodVIFDriver),
(nested_vlan_vif.NestedVlanPodVIFDriver))
def test__populate_pool_no_update(self, m_vif_driver, m_time,
m_get_kubernetes_client):
cls = vif_pool.BaseVIFPool
m_driver = mock.MagicMock(spec=cls)
@ -238,41 +241,17 @@ class BaseVIFPool(test_base.TestCase):
subnets = mock.sentinel.subnets
security_groups = 'test-sg'
pool_key = (mock.sentinel.host_addr, project_id)
oslo_cfg.CONF.set_override('ports_pool_update_frequency',
15,
group='vif_pool')
m_driver._last_update = {pool_key: {tuple(security_groups): 1}}
m_driver._recovered_pools = True
cls._populate_pool(m_driver, pool_key, pod, subnets,
tuple(security_groups))
m_driver._get_pool_size.assert_not_called()
@mock.patch('kuryr_kubernetes.clients.get_kubernetes_client')
@mock.patch('time.time', return_value=50)
@ddt.data((neutron_vif.NeutronPodVIFDriver),
(nested_vlan_vif.NestedVlanPodVIFDriver))
def test__populate_pool_large_pool(self, m_vif_driver, m_time,
m_get_kubernetes_client):
cls = vif_pool.BaseVIFPool
m_driver = mock.MagicMock(spec=cls)
m_driver._get_pool_size.return_value = 4
cls_vif_driver = m_vif_driver
vif_driver = mock.MagicMock(spec=cls_vif_driver)
m_driver._drv_vif = vif_driver
pod = mock.sentinel.pod
project_id = str(uuid.uuid4())
subnets = mock.sentinel.subnets
security_groups = 'test-sg'
pool_key = (mock.sentinel.host_addr, project_id)
oslo_cfg.CONF.set_override('ports_pool_update_frequency',
15,
group='vif_pool')
oslo_cfg.CONF.set_override('ports_pool_min',
5,
3,
group='vif_pool')
m_driver._last_update = {pool_key: {tuple(security_groups): 1}}
m_driver._get_pool_size.return_value = 10
@ -280,7 +259,7 @@ class BaseVIFPool(test_base.TestCase):
cls._populate_pool(m_driver, pool_key, pod, subnets,
tuple(security_groups))
m_driver._get_pool_size.assert_called_once()
m_driver._get_pool_size.assert_called()
m_driver._drv_vif.request_vifs.assert_not_called()
def test_release_vif(self):
@ -480,31 +459,22 @@ class NeutronVIFPool(test_base.TestCase):
subnets = mock.sentinel.subnets
security_groups = 'test-sg'
oslo_cfg.CONF.set_override('port_debug',
True, group='kubernetes')
pod = get_pod_obj()
m_get_port_name.return_value = get_pod_name(pod)
m_driver._available_ports_pools = {
pool_key: {tuple(security_groups): collections.deque([port_id])}}
m_driver._existing_vifs = {port_id: port}
m_get_port_name.return_value = get_pod_name(pod)
oslo_cfg.CONF.set_override('ports_pool_min',
5,
group='vif_pool')
oslo_cfg.CONF.set_override('port_debug',
True,
group='kubernetes')
oslo_cfg.CONF.set_override('port_debug',
True,
group='kubernetes')
pool_length = 5
m_driver._get_pool_size.return_value = pool_length
self.assertEqual(port, cls._get_port_from_pool(
m_driver, pool_key, pod, subnets, tuple(security_groups)))
os_net.update_port.assert_called_once_with(
port_id, name=get_pod_name(pod), device_id=pod['metadata']['uid'])
m_eventlet.assert_not_called()
m_eventlet.assert_called()
@mock.patch('kuryr_kubernetes.controller.drivers.utils.get_port_name')
@mock.patch('eventlet.spawn')
@ -583,8 +553,6 @@ class NeutronVIFPool(test_base.TestCase):
oslo_cfg.CONF.set_override('port_debug',
False,
group='kubernetes')
pool_length = 5
m_driver._get_pool_size.return_value = pool_length
m_driver._available_ports_pools = {
pool_key: {tuple(security_groups): collections.deque([]),
@ -598,7 +566,7 @@ class NeutronVIFPool(test_base.TestCase):
os_net.update_port.assert_called_once_with(
port_id, security_groups=list(security_groups))
m_eventlet.assert_not_called()
m_eventlet.assert_called()
@mock.patch('eventlet.spawn')
def test__get_port_from_pool_empty_pool_reuse_no_update_info(self,
@ -633,7 +601,7 @@ class NeutronVIFPool(test_base.TestCase):
os_net.update_port.assert_called_once_with(
port_id, security_groups=list(security_groups))
m_eventlet.assert_not_called()
m_eventlet.assert_called()
def test__get_port_from_pool_empty_pool_reuse_no_ports(self):
cls = vif_pool.NeutronVIFPool
@ -931,6 +899,9 @@ class NeutronVIFPool(test_base.TestCase):
port_id = str(uuid.uuid4())
m_driver._available_ports_pools = {pool_key: {
tuple(['security_group']): [port_id]}}
m_driver._lock = threading.Lock()
m_driver._populate_pool_lock = {
pool_key: mock.MagicMock(spec=threading.Lock())}
m_driver._existing_vifs = {port_id: mock.sentinel.vif}
m_driver._recovered_pools = True
@ -968,6 +939,9 @@ class NeutronVIFPool(test_base.TestCase):
port_id = str(uuid.uuid4())
m_driver._available_ports_pools = {pool_key: {
tuple(['security_group']): [port_id]}}
m_driver._lock = threading.Lock()
m_driver._populate_pool_lock = {
pool_key: mock.MagicMock(spec=threading.Lock())}
m_driver._existing_vifs = {}
m_driver._recovered_pools = True
@ -1053,7 +1027,7 @@ class NestedVIFPool(test_base.TestCase):
os_net.update_port.assert_called_once_with(
port_id, name=get_pod_name(pod))
m_eventlet.assert_not_called()
m_eventlet.assert_called()
@mock.patch('kuryr_kubernetes.controller.drivers.utils.get_port_name')
@mock.patch('eventlet.spawn')
@ -1147,7 +1121,7 @@ class NestedVIFPool(test_base.TestCase):
os_net.update_port.assert_called_once_with(
port_id, security_groups=list(security_groups))
m_eventlet.assert_not_called()
m_eventlet.assert_called()
@mock.patch('eventlet.spawn')
def test__get_port_from_pool_empty_pool_reuse_no_update_info(self,
@ -1182,7 +1156,7 @@ class NestedVIFPool(test_base.TestCase):
os_net.update_port.assert_called_once_with(
port_id, security_groups=list(security_groups))
m_eventlet.assert_not_called()
m_eventlet.assert_called()
def test__get_port_from_pool_empty_pool_reuse_no_ports(self):
cls = vif_pool.NestedVIFPool
@ -1836,6 +1810,9 @@ class NestedVIFPool(test_base.TestCase):
vif.vlan_id = vlan_id
m_driver._available_ports_pools = {pool_key: {
tuple(['security_group']): [port_id]}}
m_driver._lock = threading.Lock()
m_driver._populate_pool_lock = {
pool_key: mock.MagicMock(spec=threading.Lock())}
m_driver._existing_vifs = {port_id: vif}
m_driver._recovered_pools = True
@ -1925,6 +1902,9 @@ class NestedVIFPool(test_base.TestCase):
vif.vlan_id = vlan_id
m_driver._available_ports_pools = {pool_key: {
tuple(['security_group']): [port_id]}}
m_driver._lock = threading.Lock()
m_driver._populate_pool_lock = {
pool_key: mock.MagicMock(spec=threading.Lock())}
m_driver._existing_vifs = {}
m_driver._recovered_pools = True