Merge "Leaks of loadbalancer"
This commit is contained in:
commit
1d897a18e9
|
@ -108,6 +108,8 @@ spec:
|
|||
type: string
|
||||
type:
|
||||
type: string
|
||||
provider:
|
||||
type: string
|
||||
status:
|
||||
type: object
|
||||
properties:
|
||||
|
|
|
@ -47,6 +47,7 @@ _OCTAVIA_TAGGING_VERSION = 2, 5
|
|||
# In order to make it simpler, we assume this is supported only from 2.13
|
||||
_OCTAVIA_DL_VERSION = 2, 13
|
||||
_OCTAVIA_ACL_VERSION = 2, 12
|
||||
_OCTAVIA_PROVIDER_VERSION = 2, 6
|
||||
|
||||
|
||||
class LBaaSv2Driver(base.LBaaSDriver):
|
||||
|
@ -58,6 +59,7 @@ class LBaaSv2Driver(base.LBaaSDriver):
|
|||
self._octavia_tags = False
|
||||
self._octavia_acls = False
|
||||
self._octavia_double_listeners = False
|
||||
self._octavia_providers = False
|
||||
# Check if Octavia API supports tagging.
|
||||
# TODO(dulek): *Maybe* this can be replaced with
|
||||
# lbaas.get_api_major_version(version=_OCTAVIA_TAGGING_VERSION)
|
||||
|
@ -80,10 +82,15 @@ class LBaaSv2Driver(base.LBaaSDriver):
|
|||
'API %s does not support resource tagging. Kuryr '
|
||||
'will put requested tags in the description field of '
|
||||
'Octavia resources.', v_str)
|
||||
if v >= _OCTAVIA_PROVIDER_VERSION:
|
||||
self._octavia_providers = True
|
||||
|
||||
def double_listeners_supported(self):
|
||||
return self._octavia_double_listeners
|
||||
|
||||
def providers_supported(self):
|
||||
return self._octavia_providers
|
||||
|
||||
def get_octavia_version(self):
|
||||
lbaas = clients.get_loadbalancer_client()
|
||||
region_name = getattr(CONF.neutron, 'region_name', None)
|
||||
|
|
|
@ -258,10 +258,12 @@ class EndpointsHandler(k8s_base.ResourceEventHandler):
|
|||
# We need to set the requested load balancer provider
|
||||
# according to 'endpoints_driver_octavia_provider' configuration.
|
||||
self._lb_provider = None
|
||||
if (config.CONF.kubernetes.endpoints_driver_octavia_provider
|
||||
!= 'default'):
|
||||
self._lb_provider = (
|
||||
config.CONF.kubernetes.endpoints_driver_octavia_provider)
|
||||
if self._drv_lbaas.providers_supported():
|
||||
self._lb_provider = 'amphora'
|
||||
if (config.CONF.kubernetes.endpoints_driver_octavia_provider
|
||||
!= 'default'):
|
||||
self._lb_provider = (
|
||||
config.CONF.kubernetes.endpoints_driver_octavia_provider)
|
||||
|
||||
def on_present(self, endpoints):
|
||||
if self._move_annotations_to_crd(endpoints):
|
||||
|
@ -349,6 +351,9 @@ class EndpointsHandler(k8s_base.ResourceEventHandler):
|
|||
'status': status,
|
||||
}
|
||||
|
||||
if self._lb_provider:
|
||||
loadbalancer_crd['spec']['provider'] = self._lb_provider
|
||||
|
||||
try:
|
||||
kubernetes.post('{}/{}/kuryrloadbalancers'.format(
|
||||
k_const.K8S_API_CRD_NAMESPACES, namespace), loadbalancer_crd)
|
||||
|
@ -365,11 +370,14 @@ class EndpointsHandler(k8s_base.ResourceEventHandler):
|
|||
# TODO(maysams): Remove the convertion once we start handling
|
||||
# Endpoint slices.
|
||||
epslices = self._convert_subsets_to_endpointslice(endpoints)
|
||||
spec = {'endpointSlices': epslices}
|
||||
if self._lb_provider:
|
||||
spec['provider'] = self._lb_provider
|
||||
try:
|
||||
kubernetes.patch_crd(
|
||||
'spec',
|
||||
loadbalancer_crd['metadata']['selfLink'],
|
||||
{'endpointSlices': epslices})
|
||||
spec)
|
||||
except k_exc.K8sResourceNotFound:
|
||||
LOG.debug('KuryrLoadbalancer CRD not found %s', loadbalancer_crd)
|
||||
except k_exc.K8sConflict:
|
||||
|
|
|
@ -13,7 +13,6 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import eventlet
|
||||
import time
|
||||
|
||||
from oslo_log import log as logging
|
||||
|
@ -22,14 +21,14 @@ from kuryr_kubernetes import clients
|
|||
from kuryr_kubernetes import config
|
||||
from kuryr_kubernetes import constants as k_const
|
||||
from kuryr_kubernetes.controller.drivers import base as drv_base
|
||||
from kuryr_kubernetes.controller.drivers import utils as driver_utils
|
||||
from kuryr_kubernetes import exceptions as k_exc
|
||||
from kuryr_kubernetes.handlers import k8s_base
|
||||
from kuryr_kubernetes.objects import lbaas as obj_lbaas
|
||||
from kuryr_kubernetes import utils
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
OCTAVIA_DEFAULT_PROVIDERS = ['octavia', 'amphora']
|
||||
|
||||
|
||||
class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler):
|
||||
"""LoadBalancerStatusHandler handles K8s Endpoints events.
|
||||
|
@ -54,12 +53,6 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler):
|
|||
# Load Balancer creation flow.
|
||||
# We need to set the requested load balancer provider
|
||||
# according to 'endpoints_driver_octavia_provider' configuration.
|
||||
self._lb_provider = None
|
||||
if (config.CONF.kubernetes.endpoints_driver_octavia_provider
|
||||
!= 'default'):
|
||||
self._lb_provider = (
|
||||
config.CONF.kubernetes.endpoints_driver_octavia_provider)
|
||||
eventlet.spawn(self._cleanup_leftover_lbaas)
|
||||
|
||||
def on_present(self, loadbalancer_crd):
|
||||
if self._should_ignore(loadbalancer_crd):
|
||||
|
@ -67,6 +60,22 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler):
|
|||
loadbalancer_crd['metadata']['name'])
|
||||
return
|
||||
|
||||
crd_lb = loadbalancer_crd['status'].get('loadbalancer')
|
||||
if crd_lb:
|
||||
lb_provider = crd_lb.get('provider')
|
||||
spec_lb_provider = loadbalancer_crd['spec'].get('provider')
|
||||
# amphora to ovn upgrade
|
||||
if not lb_provider or lb_provider in OCTAVIA_DEFAULT_PROVIDERS:
|
||||
if (spec_lb_provider and
|
||||
spec_lb_provider not in OCTAVIA_DEFAULT_PROVIDERS):
|
||||
self._ensure_release_lbaas(loadbalancer_crd)
|
||||
|
||||
# ovn to amphora downgrade
|
||||
elif lb_provider and lb_provider not in OCTAVIA_DEFAULT_PROVIDERS:
|
||||
if (not spec_lb_provider or
|
||||
spec_lb_provider in OCTAVIA_DEFAULT_PROVIDERS):
|
||||
self._ensure_release_lbaas(loadbalancer_crd)
|
||||
|
||||
try:
|
||||
name = loadbalancer_crd['metadata']['name']
|
||||
namespace = loadbalancer_crd['metadata']['namespace']
|
||||
|
@ -703,7 +712,7 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler):
|
|||
security_groups_ids=loadbalancer_crd['spec'].get(
|
||||
'security_groups_ids'),
|
||||
service_type=loadbalancer_crd['spec'].get('type'),
|
||||
provider=self._lb_provider)
|
||||
provider=loadbalancer_crd['spec'].get('provider'))
|
||||
loadbalancer_crd['status']['loadbalancer'] = lb
|
||||
|
||||
kubernetes = clients.get_kubernetes_client()
|
||||
|
@ -721,47 +730,7 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler):
|
|||
|
||||
return changed
|
||||
|
||||
def _cleanup_leftover_lbaas(self):
|
||||
lbaas_client = clients.get_loadbalancer_client()
|
||||
services = []
|
||||
try:
|
||||
services = driver_utils.get_services().get('items')
|
||||
except k_exc.K8sClientException:
|
||||
LOG.debug("Skipping cleanup of leftover lbaas. "
|
||||
"Error retriving Kubernetes services")
|
||||
return
|
||||
services_cluster_ip = {service['spec']['clusterIP']: service
|
||||
for service in services
|
||||
if service['spec'].get('clusterIP')}
|
||||
|
||||
services_without_selector = set(
|
||||
service['spec']['clusterIP'] for service in services
|
||||
if (service['spec'].get('clusterIP') and
|
||||
not service['spec'].get('selector')))
|
||||
lbaas_spec = {}
|
||||
self._drv_lbaas.add_tags('loadbalancer', lbaas_spec)
|
||||
loadbalancers = lbaas_client.load_balancers(**lbaas_spec)
|
||||
for loadbalancer in loadbalancers:
|
||||
if loadbalancer.vip_address not in services_cluster_ip.keys():
|
||||
lb_obj = obj_lbaas.LBaaSLoadBalancer(**loadbalancer)
|
||||
eventlet.spawn(self._ensure_release_lbaas, lb_obj)
|
||||
else:
|
||||
# check if the provider is the right one
|
||||
if (loadbalancer.vip_address not in services_without_selector
|
||||
and self._lb_provider
|
||||
and self._lb_provider != loadbalancer.provider):
|
||||
LOG.debug("Removing loadbalancer with old provider: %s",
|
||||
loadbalancer)
|
||||
lb_obj = obj_lbaas.LBaaSLoadBalancer(**loadbalancer)
|
||||
eventlet.spawn(
|
||||
self._ensure_release_lbaas,
|
||||
lb_obj,
|
||||
services_cluster_ip[loadbalancer.vip_address])
|
||||
# NOTE(ltomasbo): give some extra time in between lbs
|
||||
# recreation actions
|
||||
time.sleep(1)
|
||||
|
||||
def _ensure_release_lbaas(self, lb_obj, svc=None):
|
||||
def _ensure_release_lbaas(self, loadbalancer_crd):
|
||||
attempts = 0
|
||||
deadline = 0
|
||||
retry = True
|
||||
|
@ -773,32 +742,32 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler):
|
|||
if (attempts > 0 and
|
||||
utils.exponential_sleep(deadline, attempts) == 0):
|
||||
LOG.error("Failed releasing lbaas '%s': deadline exceeded",
|
||||
lb_obj.name)
|
||||
loadbalancer_crd['status']['loadbalancer'][
|
||||
'name'])
|
||||
return
|
||||
self._drv_lbaas.release_loadbalancer(lb_obj)
|
||||
self._drv_lbaas.release_loadbalancer(
|
||||
loadbalancer=loadbalancer_crd['status'].get('loadbalancer')
|
||||
)
|
||||
retry = False
|
||||
except k_exc.ResourceNotReady:
|
||||
LOG.debug("Attempt (%s) of loadbalancer release %s failed."
|
||||
" A retry will be triggered.", attempts,
|
||||
lb_obj.name)
|
||||
loadbalancer_crd['status']['loadbalancer']['name'])
|
||||
attempts += 1
|
||||
retry = True
|
||||
if svc:
|
||||
endpoints_link = utils.get_endpoints_link(svc)
|
||||
|
||||
loadbalancer_crd['status'] = {}
|
||||
k8s = clients.get_kubernetes_client()
|
||||
try:
|
||||
endpoints = k8s.get(endpoints_link)
|
||||
k8s.patch_crd('status', loadbalancer_crd['metadata'][
|
||||
'selfLink'], loadbalancer_crd['status'])
|
||||
except k_exc.K8sResourceNotFound:
|
||||
LOG.debug("Endpoint not Found.")
|
||||
return
|
||||
|
||||
lbaas = utils.get_lbaas_state(endpoints)
|
||||
if lbaas:
|
||||
lbaas.loadbalancer = None
|
||||
lbaas.pools = []
|
||||
lbaas.listeners = []
|
||||
lbaas.members = []
|
||||
# NOTE(ltomasbo): give some extra time to ensure the Load
|
||||
# Balancer VIP is also released
|
||||
time.sleep(1)
|
||||
utils.set_lbaas_state(endpoints, lbaas)
|
||||
LOG.debug('KuryrLoadbalancer CRD not found %s',
|
||||
loadbalancer_crd)
|
||||
except k_exc.K8sClientException:
|
||||
LOG.exception('Error updating KuryrLoadbalancer CRD %s',
|
||||
loadbalancer_crd)
|
||||
raise
|
||||
# NOTE(ltomasbo): give some extra time to ensure the Load
|
||||
# Balancer VIP is also released
|
||||
time.sleep(1)
|
||||
|
|
|
@ -79,7 +79,8 @@ def get_lb_crd():
|
|||
]
|
||||
}
|
||||
],
|
||||
"type": "LoadBalancer"
|
||||
"type": "LoadBalancer",
|
||||
"provider": "ovn"
|
||||
},
|
||||
"status": {
|
||||
"listeners": [
|
||||
|
@ -213,14 +214,12 @@ class TestKuryrLoadBalancerHandler(test_base.TestCase):
|
|||
m_get_drv_service_pub_ip.return_value = mock.sentinel.drv_lb_ip
|
||||
m_get_svc_drv_project.return_value = mock.sentinel.drv_svc_project
|
||||
m_get_svc_sg_drv.return_value = mock.sentinel.drv_sg
|
||||
m_cfg.kubernetes.endpoints_driver_octavia_provider = 'default'
|
||||
handler = h_lb.KuryrLoadBalancerHandler()
|
||||
|
||||
self.assertEqual(mock.sentinel.drv_lbaas, handler._drv_lbaas)
|
||||
self.assertEqual(mock.sentinel.drv_project, handler._drv_pod_project)
|
||||
self.assertEqual(mock.sentinel.drv_subnets, handler._drv_pod_subnets)
|
||||
self.assertEqual(mock.sentinel.drv_lb_ip, handler._drv_service_pub_ip)
|
||||
self.assertIsNone(handler._lb_provider)
|
||||
|
||||
@mock.patch('kuryr_kubernetes.controller.drivers.base.'
|
||||
'ServiceProjectDriver.get_instance')
|
||||
|
@ -245,14 +244,12 @@ class TestKuryrLoadBalancerHandler(test_base.TestCase):
|
|||
m_get_drv_service_pub_ip.return_value = mock.sentinel.drv_lb_ip
|
||||
m_get_svc_drv_project.return_value = mock.sentinel.drv_svc_project
|
||||
m_get_svc_sg_drv.return_value = mock.sentinel.drv_sg
|
||||
m_cfg.kubernetes.endpoints_driver_octavia_provider = 'ovn'
|
||||
handler = h_lb .KuryrLoadBalancerHandler()
|
||||
|
||||
self.assertEqual(mock.sentinel.drv_lbaas, handler._drv_lbaas)
|
||||
self.assertEqual(mock.sentinel.drv_project, handler._drv_pod_project)
|
||||
self.assertEqual(mock.sentinel.drv_subnets, handler._drv_pod_subnets)
|
||||
self.assertEqual(mock.sentinel.drv_lb_ip, handler._drv_service_pub_ip)
|
||||
self.assertEqual('ovn', handler._lb_provider)
|
||||
|
||||
def test_on_present(self):
|
||||
m_drv_service_pub_ip = mock.Mock()
|
||||
|
|
Loading…
Reference in New Issue