Avoid race between Retries and Deletion actions
This patch set increases the timeout to wait for resources to be created/deleted. This is needed to better support spikes without restarting the kuryr-controller. This patch also ensures that future retry events do not affect the kuryr-controller if they are retried after the related resources have already been deleted, i.e., when the on_delete event was executed before one of the retries. Closes-Bug: 1847753 Change-Id: I725ba22f0babf496af219a37e42e1c33b247308a
This commit is contained in:
parent
f363077401
commit
998be3bbda
|
@ -56,6 +56,9 @@ class NamespacePodSubnetDriver(default_subnet.DefaultPodSubnetDriver):
|
||||||
try:
|
try:
|
||||||
ns = kubernetes.get('%s/namespaces/%s' % (constants.K8S_API_BASE,
|
ns = kubernetes.get('%s/namespaces/%s' % (constants.K8S_API_BASE,
|
||||||
namespace))
|
namespace))
|
||||||
|
except exceptions.K8sResourceNotFound:
|
||||||
|
LOG.warning("Namespace %s not found", namespace)
|
||||||
|
raise
|
||||||
except exceptions.K8sClientException:
|
except exceptions.K8sClientException:
|
||||||
LOG.exception("Kubernetes Client Exception.")
|
LOG.exception("Kubernetes Client Exception.")
|
||||||
raise exceptions.ResourceNotReady(namespace)
|
raise exceptions.ResourceNotReady(namespace)
|
||||||
|
|
|
@ -532,12 +532,9 @@ class NetworkPolicySecurityGroupsDriver(base.PodSecurityGroupsDriver):
|
||||||
return crd_selectors
|
return crd_selectors
|
||||||
|
|
||||||
def create_namespace_sg_rules(self, namespace):
|
def create_namespace_sg_rules(self, namespace):
|
||||||
kubernetes = clients.get_kubernetes_client()
|
|
||||||
ns_name = namespace['metadata']['name']
|
ns_name = namespace['metadata']['name']
|
||||||
LOG.debug("Creating sg rule for namespace: %s", ns_name)
|
LOG.debug("Creating sg rule for namespace: %s", ns_name)
|
||||||
crd_selectors = []
|
crd_selectors = []
|
||||||
namespace = kubernetes.get(
|
|
||||||
'{}/namespaces/{}'.format(constants.K8S_API_BASE, ns_name))
|
|
||||||
knp_crds = driver_utils.get_kuryrnetpolicy_crds()
|
knp_crds = driver_utils.get_kuryrnetpolicy_crds()
|
||||||
for crd in knp_crds.get('items'):
|
for crd in knp_crds.get('items'):
|
||||||
crd_selector = crd['spec'].get('podSelector')
|
crd_selector = crd['spec'].get('podSelector')
|
||||||
|
|
|
@ -306,7 +306,11 @@ class LoadBalancerHandler(k8s_base.ResourceEventHandler):
|
||||||
def _add_new_members(self, endpoints, lbaas_state, lbaas_spec):
|
def _add_new_members(self, endpoints, lbaas_state, lbaas_spec):
|
||||||
changed = False
|
changed = False
|
||||||
|
|
||||||
self._sync_lbaas_sgs(endpoints, lbaas_state, lbaas_spec)
|
try:
|
||||||
|
self._sync_lbaas_sgs(endpoints, lbaas_state, lbaas_spec)
|
||||||
|
except k_exc.K8sResourceNotFound:
|
||||||
|
LOG.debug("The svc has been deleted while processing the endpoints"
|
||||||
|
" update. No need to add new members.")
|
||||||
|
|
||||||
lsnr_by_id = {l.id: l for l in lbaas_state.listeners}
|
lsnr_by_id = {l.id: l for l in lbaas_state.listeners}
|
||||||
pool_by_lsnr_port = {(lsnr_by_id[p.listener_id].protocol,
|
pool_by_lsnr_port = {(lsnr_by_id[p.listener_id].protocol,
|
||||||
|
|
|
@ -120,8 +120,9 @@ class NamespaceHandler(k8s_base.ResourceEventHandler):
|
||||||
net_crd = self._add_kuryrnet_crd(ns_name, net_crd_spec)
|
net_crd = self._add_kuryrnet_crd(ns_name, net_crd_spec)
|
||||||
self._set_net_crd(namespace, net_crd)
|
self._set_net_crd(namespace, net_crd)
|
||||||
self._drv_sg.create_namespace_sg_rules(namespace)
|
self._drv_sg.create_namespace_sg_rules(namespace)
|
||||||
except exceptions.K8sClientException:
|
except (exceptions.K8sClientException,
|
||||||
LOG.exception("Kubernetes client exception. Rolling back "
|
exceptions.K8sResourceNotFound):
|
||||||
|
LOG.exception("Kuryrnet CRD creation failed. Rolling back "
|
||||||
"resources created for the namespace.")
|
"resources created for the namespace.")
|
||||||
self._drv_subnets.rollback_network_resources(net_crd_spec, ns_name)
|
self._drv_subnets.rollback_network_resources(net_crd_spec, ns_name)
|
||||||
if net_crd_sg.get('sgId'):
|
if net_crd_sg.get('sgId'):
|
||||||
|
|
|
@ -20,6 +20,7 @@ from kuryr_kubernetes import clients
|
||||||
from kuryr_kubernetes import constants
|
from kuryr_kubernetes import constants
|
||||||
from kuryr_kubernetes.controller.drivers import base as drivers
|
from kuryr_kubernetes.controller.drivers import base as drivers
|
||||||
from kuryr_kubernetes.controller.drivers import utils as driver_utils
|
from kuryr_kubernetes.controller.drivers import utils as driver_utils
|
||||||
|
from kuryr_kubernetes import exceptions as k_exc
|
||||||
from kuryr_kubernetes.handlers import k8s_base
|
from kuryr_kubernetes.handlers import k8s_base
|
||||||
|
|
||||||
LOG = logging.getLogger(__name__)
|
LOG = logging.getLogger(__name__)
|
||||||
|
@ -64,7 +65,11 @@ class PodLabelHandler(k8s_base.ResourceEventHandler):
|
||||||
project_id = self._drv_project.get_project(pod)
|
project_id = self._drv_project.get_project(pod)
|
||||||
security_groups = self._drv_sg.get_security_groups(pod, project_id)
|
security_groups = self._drv_sg.get_security_groups(pod, project_id)
|
||||||
self._drv_vif_pool.update_vif_sgs(pod, security_groups)
|
self._drv_vif_pool.update_vif_sgs(pod, security_groups)
|
||||||
self._set_pod_labels(pod, current_pod_labels)
|
try:
|
||||||
|
self._set_pod_labels(pod, current_pod_labels)
|
||||||
|
except k_exc.K8sResourceNotFound:
|
||||||
|
LOG.debug("Pod already deleted, no need to retry.")
|
||||||
|
return
|
||||||
|
|
||||||
if oslo_cfg.CONF.octavia_defaults.enforce_sg_rules:
|
if oslo_cfg.CONF.octavia_defaults.enforce_sg_rules:
|
||||||
services = driver_utils.get_services()
|
services = driver_utils.get_services()
|
||||||
|
|
|
@ -94,7 +94,7 @@ class VIFHandler(k8s_base.ResourceEventHandler):
|
||||||
if not state:
|
if not state:
|
||||||
try:
|
try:
|
||||||
subnets = self._drv_subnets.get_subnets(pod, project_id)
|
subnets = self._drv_subnets.get_subnets(pod, project_id)
|
||||||
except n_exc.NotFound:
|
except (n_exc.NotFound, k_exc.K8sResourceNotFound):
|
||||||
LOG.warning("Subnet does not exists. If namespace driver is "
|
LOG.warning("Subnet does not exists. If namespace driver is "
|
||||||
"used, probably the namespace for the pod is "
|
"used, probably the namespace for the pod is "
|
||||||
"already deleted. So this pod does not need to "
|
"already deleted. So this pod does not need to "
|
||||||
|
|
|
@ -20,6 +20,7 @@ from neutronclient.common import exceptions as n_exc
|
||||||
from oslo_log import log as logging
|
from oslo_log import log as logging
|
||||||
from oslo_utils import excutils
|
from oslo_utils import excutils
|
||||||
|
|
||||||
|
from kuryr_kubernetes import clients
|
||||||
from kuryr_kubernetes import exceptions
|
from kuryr_kubernetes import exceptions
|
||||||
from kuryr_kubernetes.handlers import base
|
from kuryr_kubernetes.handlers import base
|
||||||
from kuryr_kubernetes import utils
|
from kuryr_kubernetes import utils
|
||||||
|
@ -48,10 +49,31 @@ class Retry(base.EventHandler):
|
||||||
self._exceptions = exceptions
|
self._exceptions = exceptions
|
||||||
self._timeout = timeout
|
self._timeout = timeout
|
||||||
self._interval = interval
|
self._interval = interval
|
||||||
|
self._k8s = clients.get_kubernetes_client()
|
||||||
|
|
||||||
def __call__(self, event):
|
def __call__(self, event):
|
||||||
deadline = time.time() + self._timeout
|
deadline = time.time() + self._timeout
|
||||||
for attempt in itertools.count(1):
|
for attempt in itertools.count(1):
|
||||||
|
if event.get('type') in ['MODIFIED', 'ADDED']:
|
||||||
|
obj = event.get('object')
|
||||||
|
if obj:
|
||||||
|
try:
|
||||||
|
obj_link = obj['metadata']['selfLink']
|
||||||
|
except KeyError:
|
||||||
|
LOG.debug("Skipping object check as it does not have "
|
||||||
|
"selfLink: %s", obj)
|
||||||
|
else:
|
||||||
|
try:
|
||||||
|
self._k8s.get(obj_link)
|
||||||
|
except exceptions.K8sResourceNotFound:
|
||||||
|
LOG.debug("There is no need to process the "
|
||||||
|
"retry as the object %s has already "
|
||||||
|
"been deleted.", obj_link)
|
||||||
|
return
|
||||||
|
except exceptions.K8sClientException:
|
||||||
|
LOG.debug("Kubernetes client error getting the "
|
||||||
|
"object. Continuing with handler "
|
||||||
|
"execution.")
|
||||||
try:
|
try:
|
||||||
self._handler(event)
|
self._handler(event)
|
||||||
break
|
break
|
||||||
|
|
|
@ -17,8 +17,10 @@ import fixtures
|
||||||
import mock
|
import mock
|
||||||
import time
|
import time
|
||||||
|
|
||||||
|
from kuryr_kubernetes import exceptions
|
||||||
from kuryr_kubernetes.handlers import retry as h_retry
|
from kuryr_kubernetes.handlers import retry as h_retry
|
||||||
from kuryr_kubernetes.tests import base as test_base
|
from kuryr_kubernetes.tests import base as test_base
|
||||||
|
from kuryr_kubernetes.tests.unit import kuryr_fixtures as k_fix
|
||||||
|
|
||||||
|
|
||||||
class _EX1(Exception):
|
class _EX1(Exception):
|
||||||
|
@ -42,6 +44,11 @@ class TestRetryHandler(test_base.TestCase):
|
||||||
f_time = self.useFixture(fixtures.MockPatch('time.time'))
|
f_time = self.useFixture(fixtures.MockPatch('time.time'))
|
||||||
f_time.mock.return_value = self.now
|
f_time.mock.return_value = self.now
|
||||||
|
|
||||||
|
self.k8s = self.useFixture(k_fix.MockK8sClient()).client
|
||||||
|
f_k8s = self.useFixture(fixtures.MockPatch(
|
||||||
|
'kuryr_kubernetes.clients.get_kubernetes_client'))
|
||||||
|
f_k8s.mock.return_value = self.k8s
|
||||||
|
|
||||||
@mock.patch('random.randint')
|
@mock.patch('random.randint')
|
||||||
@mock.patch('time.sleep')
|
@mock.patch('time.sleep')
|
||||||
def test_should_not_sleep(self, m_sleep, m_randint):
|
def test_should_not_sleep(self, m_sleep, m_randint):
|
||||||
|
@ -87,10 +94,27 @@ class TestRetryHandler(test_base.TestCase):
|
||||||
m_handler = mock.Mock()
|
m_handler = mock.Mock()
|
||||||
m_count.return_value = list(range(1, 5))
|
m_count.return_value = list(range(1, 5))
|
||||||
retry = h_retry.Retry(m_handler)
|
retry = h_retry.Retry(m_handler)
|
||||||
|
event = {'type': 'DELETED'}
|
||||||
|
|
||||||
retry(mock.sentinel.event)
|
retry(event)
|
||||||
|
|
||||||
m_handler.assert_called_once_with(mock.sentinel.event)
|
m_handler.assert_called_once_with(event)
|
||||||
|
m_sleep.assert_not_called()
|
||||||
|
|
||||||
|
@mock.patch('itertools.count')
|
||||||
|
@mock.patch.object(h_retry.Retry, '_sleep')
|
||||||
|
def test_call_outdated_event(self, m_sleep, m_count):
|
||||||
|
m_handler = mock.Mock()
|
||||||
|
m_count.return_value = list(range(1, 5))
|
||||||
|
obj = {'metadata': {'selfLink': mock.sentinel.selflink}}
|
||||||
|
event = {'type': 'MODIFIED', 'object': obj}
|
||||||
|
self.k8s.get.side_effect = exceptions.K8sResourceNotFound(obj)
|
||||||
|
|
||||||
|
retry = h_retry.Retry(m_handler)
|
||||||
|
retry(event)
|
||||||
|
|
||||||
|
self.k8s.get.assert_called_once_with(obj['metadata']['selfLink'])
|
||||||
|
m_handler.assert_not_called()
|
||||||
m_sleep.assert_not_called()
|
m_sleep.assert_not_called()
|
||||||
|
|
||||||
@mock.patch('itertools.count')
|
@mock.patch('itertools.count')
|
||||||
|
@ -100,7 +124,7 @@ class TestRetryHandler(test_base.TestCase):
|
||||||
timeout = 10
|
timeout = 10
|
||||||
deadline = self.now + timeout
|
deadline = self.now + timeout
|
||||||
failures = [_EX1()] * (attempts - 1)
|
failures = [_EX1()] * (attempts - 1)
|
||||||
event = mock.sentinel.event
|
event = {'type': 'DELETED'}
|
||||||
m_handler = mock.Mock()
|
m_handler = mock.Mock()
|
||||||
m_handler.side_effect = failures + [None]
|
m_handler.side_effect = failures + [None]
|
||||||
m_sleep.return_value = 1
|
m_sleep.return_value = 1
|
||||||
|
@ -121,7 +145,7 @@ class TestRetryHandler(test_base.TestCase):
|
||||||
timeout = 10
|
timeout = 10
|
||||||
deadline = self.now + timeout
|
deadline = self.now + timeout
|
||||||
failures = [_EX1(), _EX1(), _EX11()]
|
failures = [_EX1(), _EX1(), _EX11()]
|
||||||
event = mock.sentinel.event
|
event = {'type': 'DELETED'}
|
||||||
m_handler = mock.Mock()
|
m_handler = mock.Mock()
|
||||||
m_handler.side_effect = failures
|
m_handler.side_effect = failures
|
||||||
m_sleep.side_effect = [1] * (attempts - 1) + [0]
|
m_sleep.side_effect = [1] * (attempts - 1) + [0]
|
||||||
|
|
|
@ -40,7 +40,7 @@ VALID_MULTI_POD_POOLS_OPTS = {'noop': ['neutron-vif',
|
||||||
'neutron': ['neutron-vif'],
|
'neutron': ['neutron-vif'],
|
||||||
'nested': ['nested-vlan'],
|
'nested': ['nested-vlan'],
|
||||||
}
|
}
|
||||||
DEFAULT_TIMEOUT = 180
|
DEFAULT_TIMEOUT = 500
|
||||||
DEFAULT_INTERVAL = 3
|
DEFAULT_INTERVAL = 3
|
||||||
|
|
||||||
subnet_caching_opts = [
|
subnet_caching_opts = [
|
||||||
|
|
Loading…
Reference in New Issue