From aa02a4b412ec6a47827ec8a10ed9231254670ee2 Mon Sep 17 00:00:00 2001
From: scavnicka
Date: Mon, 17 Aug 2020 11:31:46 +0000
Subject: [PATCH] Leaks of loadbalancer

In theory, with the usage of finalizers, leaking load balancers is no
longer possible: if the KuryrLoadBalancer CRD is deleted, it gets
recreated, and the load balancer is recreated along with it. This
commit therefore removes the leftover-cleanup logic: the
_cleanup_leftover_lbaas function is deleted and _ensure_release_lbaas
is reworked to operate on the KuryrLoadBalancer CRD, so that the load
balancer is released and recreated when the requested Octavia provider
changes (e.g. between amphora and ovn).

Change-Id: I0db62a845b23a32eef4358368332c4da2cad5460
---
 .../kuryr_crds/kuryrloadbalancer.yaml         |   2 +
 .../controller/drivers/lbaasv2.py             |   7 ++
 kuryr_kubernetes/controller/handlers/lbaas.py |  18 ++-
 .../controller/handlers/loadbalancer.py       | 109 +++++++-----------
 .../controller/handlers/test_loadbalancer.py  |   7 +-
 5 files changed, 63 insertions(+), 80 deletions(-)

diff --git a/kubernetes_crds/kuryr_crds/kuryrloadbalancer.yaml b/kubernetes_crds/kuryr_crds/kuryrloadbalancer.yaml
index 03e0edcf2..65fa5fed7 100644
--- a/kubernetes_crds/kuryr_crds/kuryrloadbalancer.yaml
+++ b/kubernetes_crds/kuryr_crds/kuryrloadbalancer.yaml
@@ -108,6 +108,8 @@ spec:
                 type: string
               type:
                 type: string
+              provider:
+                type: string
       status:
         type: object
         properties:
diff --git a/kuryr_kubernetes/controller/drivers/lbaasv2.py b/kuryr_kubernetes/controller/drivers/lbaasv2.py
index daed7946a..0fd739696 100644
--- a/kuryr_kubernetes/controller/drivers/lbaasv2.py
+++ b/kuryr_kubernetes/controller/drivers/lbaasv2.py
@@ -47,6 +47,7 @@ _OCTAVIA_TAGGING_VERSION = 2, 5
 # In order to make it simpler, we assume this is supported only from 2.13
 _OCTAVIA_DL_VERSION = 2, 13
 _OCTAVIA_ACL_VERSION = 2, 12
+_OCTAVIA_PROVIDER_VERSION = 2, 6


 class LBaaSv2Driver(base.LBaaSDriver):
@@ -58,6 +59,7 @@ class LBaaSv2Driver(base.LBaaSDriver):
         self._octavia_tags = False
         self._octavia_acls = False
         self._octavia_double_listeners = False
+        self._octavia_providers = False
         # Check if Octavia API supports tagging.
         # TODO(dulek): *Maybe* this can be replaced with
         #              lbaas.get_api_major_version(version=_OCTAVIA_TAGGING_VERSION)
@@ -80,10 +82,15 @@
                 'API %s does not support resource tagging. Kuryr '
                 'will put requested tags in the description field of '
                 'Octavia resources.', v_str)
+        if v >= _OCTAVIA_PROVIDER_VERSION:
+            self._octavia_providers = True

     def double_listeners_supported(self):
         return self._octavia_double_listeners

+    def providers_supported(self):
+        return self._octavia_providers
+
     def get_octavia_version(self):
         lbaas = clients.get_loadbalancer_client()
         region_name = getattr(CONF.neutron, 'region_name', None)
diff --git a/kuryr_kubernetes/controller/handlers/lbaas.py b/kuryr_kubernetes/controller/handlers/lbaas.py
index 45f6dd62d..0e1d4e86d 100644
--- a/kuryr_kubernetes/controller/handlers/lbaas.py
+++ b/kuryr_kubernetes/controller/handlers/lbaas.py
@@ -258,10 +258,12 @@
         # We need to set the requested load balancer provider
         # according to 'endpoints_driver_octavia_provider' configuration.
         self._lb_provider = None
-        if (config.CONF.kubernetes.endpoints_driver_octavia_provider
-                != 'default'):
-            self._lb_provider = (
-                config.CONF.kubernetes.endpoints_driver_octavia_provider)
+        if self._drv_lbaas.providers_supported():
+            self._lb_provider = 'amphora'
+            if (config.CONF.kubernetes.endpoints_driver_octavia_provider
+                    != 'default'):
+                self._lb_provider = (
+                    config.CONF.kubernetes.endpoints_driver_octavia_provider)

     def on_present(self, endpoints):
         if self._move_annotations_to_crd(endpoints):
@@ -349,6 +351,9 @@
             'status': status,
         }

+        if self._lb_provider:
+            loadbalancer_crd['spec']['provider'] = self._lb_provider
+
         try:
             kubernetes.post('{}/{}/kuryrloadbalancers'.format(
                 k_const.K8S_API_CRD_NAMESPACES, namespace), loadbalancer_crd)
@@ -365,11 +370,14 @@
         # TODO(maysams): Remove the convertion once we start handling
         # Endpoint slices.
         epslices = self._convert_subsets_to_endpointslice(endpoints)
+        spec = {'endpointSlices': epslices}
+        if self._lb_provider:
+            spec['provider'] = self._lb_provider
         try:
             kubernetes.patch_crd(
                 'spec',
                 loadbalancer_crd['metadata']['selfLink'],
-                {'endpointSlices': epslices})
+                spec)
         except k_exc.K8sResourceNotFound:
             LOG.debug('KuryrLoadbalancer CRD not found %s', loadbalancer_crd)
         except k_exc.K8sConflict:
diff --git a/kuryr_kubernetes/controller/handlers/loadbalancer.py b/kuryr_kubernetes/controller/handlers/loadbalancer.py
index 0646e99ef..8770dbe45 100644
--- a/kuryr_kubernetes/controller/handlers/loadbalancer.py
+++ b/kuryr_kubernetes/controller/handlers/loadbalancer.py
@@ -13,7 +13,6 @@
 # License for the specific language governing permissions and limitations
 # under the License.

-import eventlet
 import time

 from oslo_log import log as logging
@@ -22,14 +21,14 @@ from kuryr_kubernetes import clients
 from kuryr_kubernetes import config
 from kuryr_kubernetes import constants as k_const
 from kuryr_kubernetes.controller.drivers import base as drv_base
-from kuryr_kubernetes.controller.drivers import utils as driver_utils
 from kuryr_kubernetes import exceptions as k_exc
 from kuryr_kubernetes.handlers import k8s_base
-from kuryr_kubernetes.objects import lbaas as obj_lbaas
 from kuryr_kubernetes import utils

 LOG = logging.getLogger(__name__)

+OCTAVIA_DEFAULT_PROVIDERS = ['octavia', 'amphora']
+

 class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler):
     """LoadBalancerStatusHandler handles K8s Endpoints events.
@@ -54,12 +53,6 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler):
         # Load Balancer creation flow.
         # We need to set the requested load balancer provider
         # according to 'endpoints_driver_octavia_provider' configuration.
-        self._lb_provider = None
-        if (config.CONF.kubernetes.endpoints_driver_octavia_provider
-                != 'default'):
-            self._lb_provider = (
-                config.CONF.kubernetes.endpoints_driver_octavia_provider)
-        eventlet.spawn(self._cleanup_leftover_lbaas)

     def on_present(self, loadbalancer_crd):
         if self._should_ignore(loadbalancer_crd):
@@ -67,6 +60,22 @@
                       loadbalancer_crd['metadata']['name'])
             return

+        crd_lb = loadbalancer_crd['status'].get('loadbalancer')
+        if crd_lb:
+            lb_provider = crd_lb.get('provider')
+            spec_lb_provider = loadbalancer_crd['spec'].get('provider')
+            # amphora to ovn upgrade
+            if not lb_provider or lb_provider in OCTAVIA_DEFAULT_PROVIDERS:
+                if (spec_lb_provider and
+                        spec_lb_provider not in OCTAVIA_DEFAULT_PROVIDERS):
+                    self._ensure_release_lbaas(loadbalancer_crd)
+
+            # ovn to amphora downgrade
+            elif lb_provider and lb_provider not in OCTAVIA_DEFAULT_PROVIDERS:
+                if (not spec_lb_provider or
+                        spec_lb_provider in OCTAVIA_DEFAULT_PROVIDERS):
+                    self._ensure_release_lbaas(loadbalancer_crd)
+
         try:
             name = loadbalancer_crd['metadata']['name']
             namespace = loadbalancer_crd['metadata']['namespace']
@@ -703,7 +712,7 @@
                 security_groups_ids=loadbalancer_crd['spec'].get(
                     'security_groups_ids'),
                 service_type=loadbalancer_crd['spec'].get('type'),
-                provider=self._lb_provider)
+                provider=loadbalancer_crd['spec'].get('provider'))
             loadbalancer_crd['status']['loadbalancer'] = lb

             kubernetes = clients.get_kubernetes_client()
@@ -721,47 +730,7 @@

         return changed

-    def _cleanup_leftover_lbaas(self):
-        lbaas_client = clients.get_loadbalancer_client()
-        services = []
-        try:
-            services = driver_utils.get_services().get('items')
-        except k_exc.K8sClientException:
-            LOG.debug("Skipping cleanup of leftover lbaas. "
" - "Error retriving Kubernetes services") - return - services_cluster_ip = {service['spec']['clusterIP']: service - for service in services - if service['spec'].get('clusterIP')} - - services_without_selector = set( - service['spec']['clusterIP'] for service in services - if (service['spec'].get('clusterIP') and - not service['spec'].get('selector'))) - lbaas_spec = {} - self._drv_lbaas.add_tags('loadbalancer', lbaas_spec) - loadbalancers = lbaas_client.load_balancers(**lbaas_spec) - for loadbalancer in loadbalancers: - if loadbalancer.vip_address not in services_cluster_ip.keys(): - lb_obj = obj_lbaas.LBaaSLoadBalancer(**loadbalancer) - eventlet.spawn(self._ensure_release_lbaas, lb_obj) - else: - # check if the provider is the right one - if (loadbalancer.vip_address not in services_without_selector - and self._lb_provider - and self._lb_provider != loadbalancer.provider): - LOG.debug("Removing loadbalancer with old provider: %s", - loadbalancer) - lb_obj = obj_lbaas.LBaaSLoadBalancer(**loadbalancer) - eventlet.spawn( - self._ensure_release_lbaas, - lb_obj, - services_cluster_ip[loadbalancer.vip_address]) - # NOTE(ltomasbo): give some extra time in between lbs - # recreation actions - time.sleep(1) - - def _ensure_release_lbaas(self, lb_obj, svc=None): + def _ensure_release_lbaas(self, loadbalancer_crd): attempts = 0 deadline = 0 retry = True @@ -773,32 +742,32 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler): if (attempts > 0 and utils.exponential_sleep(deadline, attempts) == 0): LOG.error("Failed releasing lbaas '%s': deadline exceeded", - lb_obj.name) + loadbalancer_crd['status']['loadbalancer'][ + 'name']) return - self._drv_lbaas.release_loadbalancer(lb_obj) + self._drv_lbaas.release_loadbalancer( + loadbalancer=loadbalancer_crd['status'].get('loadbalancer') + ) retry = False except k_exc.ResourceNotReady: LOG.debug("Attempt (%s) of loadbalancer release %s failed." 
" A retry will be triggered.", attempts, - lb_obj.name) + loadbalancer_crd['status']['loadbalancer']['name']) attempts += 1 retry = True - if svc: - endpoints_link = utils.get_endpoints_link(svc) + + loadbalancer_crd['status'] = {} k8s = clients.get_kubernetes_client() try: - endpoints = k8s.get(endpoints_link) + k8s.patch_crd('status', loadbalancer_crd['metadata'][ + 'selfLink'], loadbalancer_crd['status']) except k_exc.K8sResourceNotFound: - LOG.debug("Endpoint not Found.") - return - - lbaas = utils.get_lbaas_state(endpoints) - if lbaas: - lbaas.loadbalancer = None - lbaas.pools = [] - lbaas.listeners = [] - lbaas.members = [] - # NOTE(ltomasbo): give some extra time to ensure the Load - # Balancer VIP is also released - time.sleep(1) - utils.set_lbaas_state(endpoints, lbaas) + LOG.debug('KuryrLoadbalancer CRD not found %s', + loadbalancer_crd) + except k_exc.K8sClientException: + LOG.exception('Error updating KuryrLoadbalancer CRD %s', + loadbalancer_crd) + raise + # NOTE(ltomasbo): give some extra time to ensure the Load + # Balancer VIP is also released + time.sleep(1) diff --git a/kuryr_kubernetes/tests/unit/controller/handlers/test_loadbalancer.py b/kuryr_kubernetes/tests/unit/controller/handlers/test_loadbalancer.py index d520e5d26..9a36637d3 100644 --- a/kuryr_kubernetes/tests/unit/controller/handlers/test_loadbalancer.py +++ b/kuryr_kubernetes/tests/unit/controller/handlers/test_loadbalancer.py @@ -79,7 +79,8 @@ def get_lb_crd(): ] } ], - "type": "LoadBalancer" + "type": "LoadBalancer", + "provider": "ovn" }, "status": { "listeners": [ @@ -213,14 +214,12 @@ class TestKuryrLoadBalancerHandler(test_base.TestCase): m_get_drv_service_pub_ip.return_value = mock.sentinel.drv_lb_ip m_get_svc_drv_project.return_value = mock.sentinel.drv_svc_project m_get_svc_sg_drv.return_value = mock.sentinel.drv_sg - m_cfg.kubernetes.endpoints_driver_octavia_provider = 'default' handler = h_lb.KuryrLoadBalancerHandler() self.assertEqual(mock.sentinel.drv_lbaas, handler._drv_lbaas) self.assertEqual(mock.sentinel.drv_project, handler._drv_pod_project) self.assertEqual(mock.sentinel.drv_subnets, handler._drv_pod_subnets) self.assertEqual(mock.sentinel.drv_lb_ip, handler._drv_service_pub_ip) - self.assertIsNone(handler._lb_provider) @mock.patch('kuryr_kubernetes.controller.drivers.base.' 'ServiceProjectDriver.get_instance') @@ -245,14 +244,12 @@ class TestKuryrLoadBalancerHandler(test_base.TestCase): m_get_drv_service_pub_ip.return_value = mock.sentinel.drv_lb_ip m_get_svc_drv_project.return_value = mock.sentinel.drv_svc_project m_get_svc_sg_drv.return_value = mock.sentinel.drv_sg - m_cfg.kubernetes.endpoints_driver_octavia_provider = 'ovn' handler = h_lb .KuryrLoadBalancerHandler() self.assertEqual(mock.sentinel.drv_lbaas, handler._drv_lbaas) self.assertEqual(mock.sentinel.drv_project, handler._drv_pod_project) self.assertEqual(mock.sentinel.drv_subnets, handler._drv_pod_subnets) self.assertEqual(mock.sentinel.drv_lb_ip, handler._drv_service_pub_ip) - self.assertEqual('ovn', handler._lb_provider) def test_on_present(self): m_drv_service_pub_ip = mock.Mock()