Leaks of loadbalancer

In theory with the usage of Finalizers having leaks of loadbalancers
is not possible anymore, and if the CRD is deleted it gets recreated
and also the loadbalancer is recreated.

This commit is deleting ensure_release_lbaas and _cleanup_leftover_lbaas
functions.

Change-Id: I0db62a845b23a32eef4358368332c4da2cad5460
This commit is contained in:
scavnicka 2020-08-17 11:31:46 +00:00 committed by Luis Tomas Bolivar
parent c8e94004ae
commit aa02a4b412
5 changed files with 63 additions and 80 deletions

View File

@ -108,6 +108,8 @@ spec:
type: string type: string
type: type:
type: string type: string
provider:
type: string
status: status:
type: object type: object
properties: properties:

View File

@ -47,6 +47,7 @@ _OCTAVIA_TAGGING_VERSION = 2, 5
# In order to make it simpler, we assume this is supported only from 2.13 # In order to make it simpler, we assume this is supported only from 2.13
_OCTAVIA_DL_VERSION = 2, 13 _OCTAVIA_DL_VERSION = 2, 13
_OCTAVIA_ACL_VERSION = 2, 12 _OCTAVIA_ACL_VERSION = 2, 12
_OCTAVIA_PROVIDER_VERSION = 2, 6
class LBaaSv2Driver(base.LBaaSDriver): class LBaaSv2Driver(base.LBaaSDriver):
@ -58,6 +59,7 @@ class LBaaSv2Driver(base.LBaaSDriver):
self._octavia_tags = False self._octavia_tags = False
self._octavia_acls = False self._octavia_acls = False
self._octavia_double_listeners = False self._octavia_double_listeners = False
self._octavia_providers = False
# Check if Octavia API supports tagging. # Check if Octavia API supports tagging.
# TODO(dulek): *Maybe* this can be replaced with # TODO(dulek): *Maybe* this can be replaced with
# lbaas.get_api_major_version(version=_OCTAVIA_TAGGING_VERSION) # lbaas.get_api_major_version(version=_OCTAVIA_TAGGING_VERSION)
@ -80,10 +82,15 @@ class LBaaSv2Driver(base.LBaaSDriver):
'API %s does not support resource tagging. Kuryr ' 'API %s does not support resource tagging. Kuryr '
'will put requested tags in the description field of ' 'will put requested tags in the description field of '
'Octavia resources.', v_str) 'Octavia resources.', v_str)
if v >= _OCTAVIA_PROVIDER_VERSION:
self._octavia_providers = True
def double_listeners_supported(self): def double_listeners_supported(self):
return self._octavia_double_listeners return self._octavia_double_listeners
def providers_supported(self):
return self._octavia_providers
def get_octavia_version(self): def get_octavia_version(self):
lbaas = clients.get_loadbalancer_client() lbaas = clients.get_loadbalancer_client()
region_name = getattr(CONF.neutron, 'region_name', None) region_name = getattr(CONF.neutron, 'region_name', None)

View File

@ -258,6 +258,8 @@ class EndpointsHandler(k8s_base.ResourceEventHandler):
# We need to set the requested load balancer provider # We need to set the requested load balancer provider
# according to 'endpoints_driver_octavia_provider' configuration. # according to 'endpoints_driver_octavia_provider' configuration.
self._lb_provider = None self._lb_provider = None
if self._drv_lbaas.providers_supported():
self._lb_provider = 'amphora'
if (config.CONF.kubernetes.endpoints_driver_octavia_provider if (config.CONF.kubernetes.endpoints_driver_octavia_provider
!= 'default'): != 'default'):
self._lb_provider = ( self._lb_provider = (
@ -349,6 +351,9 @@ class EndpointsHandler(k8s_base.ResourceEventHandler):
'status': status, 'status': status,
} }
if self._lb_provider:
loadbalancer_crd['spec']['provider'] = self._lb_provider
try: try:
kubernetes.post('{}/{}/kuryrloadbalancers'.format( kubernetes.post('{}/{}/kuryrloadbalancers'.format(
k_const.K8S_API_CRD_NAMESPACES, namespace), loadbalancer_crd) k_const.K8S_API_CRD_NAMESPACES, namespace), loadbalancer_crd)
@ -365,11 +370,14 @@ class EndpointsHandler(k8s_base.ResourceEventHandler):
# TODO(maysams): Remove the convertion once we start handling # TODO(maysams): Remove the convertion once we start handling
# Endpoint slices. # Endpoint slices.
epslices = self._convert_subsets_to_endpointslice(endpoints) epslices = self._convert_subsets_to_endpointslice(endpoints)
spec = {'endpointSlices': epslices}
if self._lb_provider:
spec['provider'] = self._lb_provider
try: try:
kubernetes.patch_crd( kubernetes.patch_crd(
'spec', 'spec',
loadbalancer_crd['metadata']['selfLink'], loadbalancer_crd['metadata']['selfLink'],
{'endpointSlices': epslices}) spec)
except k_exc.K8sResourceNotFound: except k_exc.K8sResourceNotFound:
LOG.debug('KuryrLoadbalancer CRD not found %s', loadbalancer_crd) LOG.debug('KuryrLoadbalancer CRD not found %s', loadbalancer_crd)
except k_exc.K8sConflict: except k_exc.K8sConflict:

View File

@ -13,7 +13,6 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import eventlet
import time import time
from oslo_log import log as logging from oslo_log import log as logging
@ -22,14 +21,14 @@ from kuryr_kubernetes import clients
from kuryr_kubernetes import config from kuryr_kubernetes import config
from kuryr_kubernetes import constants as k_const from kuryr_kubernetes import constants as k_const
from kuryr_kubernetes.controller.drivers import base as drv_base from kuryr_kubernetes.controller.drivers import base as drv_base
from kuryr_kubernetes.controller.drivers import utils as driver_utils
from kuryr_kubernetes import exceptions as k_exc from kuryr_kubernetes import exceptions as k_exc
from kuryr_kubernetes.handlers import k8s_base from kuryr_kubernetes.handlers import k8s_base
from kuryr_kubernetes.objects import lbaas as obj_lbaas
from kuryr_kubernetes import utils from kuryr_kubernetes import utils
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
OCTAVIA_DEFAULT_PROVIDERS = ['octavia', 'amphora']
class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler): class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler):
"""LoadBalancerStatusHandler handles K8s Endpoints events. """LoadBalancerStatusHandler handles K8s Endpoints events.
@ -54,12 +53,6 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler):
# Load Balancer creation flow. # Load Balancer creation flow.
# We need to set the requested load balancer provider # We need to set the requested load balancer provider
# according to 'endpoints_driver_octavia_provider' configuration. # according to 'endpoints_driver_octavia_provider' configuration.
self._lb_provider = None
if (config.CONF.kubernetes.endpoints_driver_octavia_provider
!= 'default'):
self._lb_provider = (
config.CONF.kubernetes.endpoints_driver_octavia_provider)
eventlet.spawn(self._cleanup_leftover_lbaas)
def on_present(self, loadbalancer_crd): def on_present(self, loadbalancer_crd):
if self._should_ignore(loadbalancer_crd): if self._should_ignore(loadbalancer_crd):
@ -67,6 +60,22 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler):
loadbalancer_crd['metadata']['name']) loadbalancer_crd['metadata']['name'])
return return
crd_lb = loadbalancer_crd['status'].get('loadbalancer')
if crd_lb:
lb_provider = crd_lb.get('provider')
spec_lb_provider = loadbalancer_crd['spec'].get('provider')
# amphora to ovn upgrade
if not lb_provider or lb_provider in OCTAVIA_DEFAULT_PROVIDERS:
if (spec_lb_provider and
spec_lb_provider not in OCTAVIA_DEFAULT_PROVIDERS):
self._ensure_release_lbaas(loadbalancer_crd)
# ovn to amphora downgrade
elif lb_provider and lb_provider not in OCTAVIA_DEFAULT_PROVIDERS:
if (not spec_lb_provider or
spec_lb_provider in OCTAVIA_DEFAULT_PROVIDERS):
self._ensure_release_lbaas(loadbalancer_crd)
try: try:
name = loadbalancer_crd['metadata']['name'] name = loadbalancer_crd['metadata']['name']
namespace = loadbalancer_crd['metadata']['namespace'] namespace = loadbalancer_crd['metadata']['namespace']
@ -703,7 +712,7 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler):
security_groups_ids=loadbalancer_crd['spec'].get( security_groups_ids=loadbalancer_crd['spec'].get(
'security_groups_ids'), 'security_groups_ids'),
service_type=loadbalancer_crd['spec'].get('type'), service_type=loadbalancer_crd['spec'].get('type'),
provider=self._lb_provider) provider=loadbalancer_crd['spec'].get('provider'))
loadbalancer_crd['status']['loadbalancer'] = lb loadbalancer_crd['status']['loadbalancer'] = lb
kubernetes = clients.get_kubernetes_client() kubernetes = clients.get_kubernetes_client()
@ -721,47 +730,7 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler):
return changed return changed
def _cleanup_leftover_lbaas(self): def _ensure_release_lbaas(self, loadbalancer_crd):
lbaas_client = clients.get_loadbalancer_client()
services = []
try:
services = driver_utils.get_services().get('items')
except k_exc.K8sClientException:
LOG.debug("Skipping cleanup of leftover lbaas. "
"Error retriving Kubernetes services")
return
services_cluster_ip = {service['spec']['clusterIP']: service
for service in services
if service['spec'].get('clusterIP')}
services_without_selector = set(
service['spec']['clusterIP'] for service in services
if (service['spec'].get('clusterIP') and
not service['spec'].get('selector')))
lbaas_spec = {}
self._drv_lbaas.add_tags('loadbalancer', lbaas_spec)
loadbalancers = lbaas_client.load_balancers(**lbaas_spec)
for loadbalancer in loadbalancers:
if loadbalancer.vip_address not in services_cluster_ip.keys():
lb_obj = obj_lbaas.LBaaSLoadBalancer(**loadbalancer)
eventlet.spawn(self._ensure_release_lbaas, lb_obj)
else:
# check if the provider is the right one
if (loadbalancer.vip_address not in services_without_selector
and self._lb_provider
and self._lb_provider != loadbalancer.provider):
LOG.debug("Removing loadbalancer with old provider: %s",
loadbalancer)
lb_obj = obj_lbaas.LBaaSLoadBalancer(**loadbalancer)
eventlet.spawn(
self._ensure_release_lbaas,
lb_obj,
services_cluster_ip[loadbalancer.vip_address])
# NOTE(ltomasbo): give some extra time in between lbs
# recreation actions
time.sleep(1)
def _ensure_release_lbaas(self, lb_obj, svc=None):
attempts = 0 attempts = 0
deadline = 0 deadline = 0
retry = True retry = True
@ -773,32 +742,32 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler):
if (attempts > 0 and if (attempts > 0 and
utils.exponential_sleep(deadline, attempts) == 0): utils.exponential_sleep(deadline, attempts) == 0):
LOG.error("Failed releasing lbaas '%s': deadline exceeded", LOG.error("Failed releasing lbaas '%s': deadline exceeded",
lb_obj.name) loadbalancer_crd['status']['loadbalancer'][
'name'])
return return
self._drv_lbaas.release_loadbalancer(lb_obj) self._drv_lbaas.release_loadbalancer(
loadbalancer=loadbalancer_crd['status'].get('loadbalancer')
)
retry = False retry = False
except k_exc.ResourceNotReady: except k_exc.ResourceNotReady:
LOG.debug("Attempt (%s) of loadbalancer release %s failed." LOG.debug("Attempt (%s) of loadbalancer release %s failed."
" A retry will be triggered.", attempts, " A retry will be triggered.", attempts,
lb_obj.name) loadbalancer_crd['status']['loadbalancer']['name'])
attempts += 1 attempts += 1
retry = True retry = True
if svc:
endpoints_link = utils.get_endpoints_link(svc) loadbalancer_crd['status'] = {}
k8s = clients.get_kubernetes_client() k8s = clients.get_kubernetes_client()
try: try:
endpoints = k8s.get(endpoints_link) k8s.patch_crd('status', loadbalancer_crd['metadata'][
'selfLink'], loadbalancer_crd['status'])
except k_exc.K8sResourceNotFound: except k_exc.K8sResourceNotFound:
LOG.debug("Endpoint not Found.") LOG.debug('KuryrLoadbalancer CRD not found %s',
return loadbalancer_crd)
except k_exc.K8sClientException:
lbaas = utils.get_lbaas_state(endpoints) LOG.exception('Error updating KuryrLoadbalancer CRD %s',
if lbaas: loadbalancer_crd)
lbaas.loadbalancer = None raise
lbaas.pools = []
lbaas.listeners = []
lbaas.members = []
# NOTE(ltomasbo): give some extra time to ensure the Load # NOTE(ltomasbo): give some extra time to ensure the Load
# Balancer VIP is also released # Balancer VIP is also released
time.sleep(1) time.sleep(1)
utils.set_lbaas_state(endpoints, lbaas)

View File

@ -79,7 +79,8 @@ def get_lb_crd():
] ]
} }
], ],
"type": "LoadBalancer" "type": "LoadBalancer",
"provider": "ovn"
}, },
"status": { "status": {
"listeners": [ "listeners": [
@ -213,14 +214,12 @@ class TestKuryrLoadBalancerHandler(test_base.TestCase):
m_get_drv_service_pub_ip.return_value = mock.sentinel.drv_lb_ip m_get_drv_service_pub_ip.return_value = mock.sentinel.drv_lb_ip
m_get_svc_drv_project.return_value = mock.sentinel.drv_svc_project m_get_svc_drv_project.return_value = mock.sentinel.drv_svc_project
m_get_svc_sg_drv.return_value = mock.sentinel.drv_sg m_get_svc_sg_drv.return_value = mock.sentinel.drv_sg
m_cfg.kubernetes.endpoints_driver_octavia_provider = 'default'
handler = h_lb.KuryrLoadBalancerHandler() handler = h_lb.KuryrLoadBalancerHandler()
self.assertEqual(mock.sentinel.drv_lbaas, handler._drv_lbaas) self.assertEqual(mock.sentinel.drv_lbaas, handler._drv_lbaas)
self.assertEqual(mock.sentinel.drv_project, handler._drv_pod_project) self.assertEqual(mock.sentinel.drv_project, handler._drv_pod_project)
self.assertEqual(mock.sentinel.drv_subnets, handler._drv_pod_subnets) self.assertEqual(mock.sentinel.drv_subnets, handler._drv_pod_subnets)
self.assertEqual(mock.sentinel.drv_lb_ip, handler._drv_service_pub_ip) self.assertEqual(mock.sentinel.drv_lb_ip, handler._drv_service_pub_ip)
self.assertIsNone(handler._lb_provider)
@mock.patch('kuryr_kubernetes.controller.drivers.base.' @mock.patch('kuryr_kubernetes.controller.drivers.base.'
'ServiceProjectDriver.get_instance') 'ServiceProjectDriver.get_instance')
@ -245,14 +244,12 @@ class TestKuryrLoadBalancerHandler(test_base.TestCase):
m_get_drv_service_pub_ip.return_value = mock.sentinel.drv_lb_ip m_get_drv_service_pub_ip.return_value = mock.sentinel.drv_lb_ip
m_get_svc_drv_project.return_value = mock.sentinel.drv_svc_project m_get_svc_drv_project.return_value = mock.sentinel.drv_svc_project
m_get_svc_sg_drv.return_value = mock.sentinel.drv_sg m_get_svc_sg_drv.return_value = mock.sentinel.drv_sg
m_cfg.kubernetes.endpoints_driver_octavia_provider = 'ovn'
handler = h_lb .KuryrLoadBalancerHandler() handler = h_lb .KuryrLoadBalancerHandler()
self.assertEqual(mock.sentinel.drv_lbaas, handler._drv_lbaas) self.assertEqual(mock.sentinel.drv_lbaas, handler._drv_lbaas)
self.assertEqual(mock.sentinel.drv_project, handler._drv_pod_project) self.assertEqual(mock.sentinel.drv_project, handler._drv_pod_project)
self.assertEqual(mock.sentinel.drv_subnets, handler._drv_pod_subnets) self.assertEqual(mock.sentinel.drv_subnets, handler._drv_pod_subnets)
self.assertEqual(mock.sentinel.drv_lb_ip, handler._drv_service_pub_ip) self.assertEqual(mock.sentinel.drv_lb_ip, handler._drv_service_pub_ip)
self.assertEqual('ovn', handler._lb_provider)
def test_on_present(self): def test_on_present(self):
m_drv_service_pub_ip = mock.Mock() m_drv_service_pub_ip = mock.Mock()