Add support for amphora to ovn-octavia upgrade
Upon kuryr-controller restart, if kuryr configuration regarding octavia provider has changed, the cleanup process not only removes the leftovers loadbalancers but also: - remove the lbs with the previous octavia provider driver - modifies endpoints annotations (provider information) to force the recreation of the loadbalancer with the new provider Change-Id: I78fd6bdd1f53ea8eb2ba85395fd2e3c651487c60
This commit is contained in:
parent
3795b84f59
commit
3042fb6cb8
@ -143,7 +143,6 @@ class LBaaSv2Driver(base.LBaaSDriver):
|
||||
return response
|
||||
|
||||
def release_loadbalancer(self, loadbalancer):
|
||||
os_net = clients.get_network_client()
|
||||
lbaas = clients.get_loadbalancer_client()
|
||||
self._release(
|
||||
loadbalancer,
|
||||
@ -152,15 +151,7 @@ class LBaaSv2Driver(base.LBaaSDriver):
|
||||
loadbalancer.id,
|
||||
cascade=True)
|
||||
|
||||
sg_id = self._find_listeners_sg(loadbalancer)
|
||||
if sg_id:
|
||||
# Note: reusing activation timeout as deletion timeout
|
||||
self._wait_for_deletion(loadbalancer, _ACTIVATION_TIMEOUT)
|
||||
try:
|
||||
os_net.delete_security_group(sg_id)
|
||||
except os_exc.SDKException:
|
||||
LOG.exception('Error when deleting loadbalancer security '
|
||||
'group. Leaving it orphaned.')
|
||||
self._wait_for_deletion(loadbalancer, _ACTIVATION_TIMEOUT)
|
||||
|
||||
def _create_listeners_acls(self, loadbalancer, port, target_port,
|
||||
protocol, lb_sg, new_sgs, listener_id):
|
||||
@ -735,26 +726,6 @@ class LBaaSv2Driver(base.LBaaSDriver):
|
||||
if interval:
|
||||
time.sleep(interval)
|
||||
|
||||
def _find_listeners_sg(self, loadbalancer):
|
||||
os_net = clients.get_network_client()
|
||||
try:
|
||||
sgs = os_net.security_groups(name=loadbalancer.name,
|
||||
project_id=loadbalancer.project_id)
|
||||
for sg in sgs:
|
||||
try:
|
||||
if sg.id in loadbalancer.security_groups:
|
||||
return sg.id
|
||||
except TypeError:
|
||||
LOG.exception('Loadbalancer %s does not have '
|
||||
'security_groups defined.',
|
||||
loadbalancer.name)
|
||||
raise
|
||||
except os_exc.SDKException:
|
||||
LOG.exception('Cannot list security groups for loadbalancer %s.',
|
||||
loadbalancer.name)
|
||||
|
||||
return None
|
||||
|
||||
def update_lbaas_sg(self, service, sgs):
|
||||
LOG.debug('Setting SG for LBaaS VIP port')
|
||||
|
||||
|
@ -17,6 +17,7 @@ import eventlet
|
||||
import time
|
||||
|
||||
from kuryr.lib._i18n import _
|
||||
from openstack import exceptions as os_exc
|
||||
from oslo_log import log as logging
|
||||
|
||||
from kuryr_kubernetes import clients
|
||||
@ -186,6 +187,22 @@ class LoadBalancerHandler(k8s_base.ResourceEventHandler):
|
||||
lbaas_state = utils.get_lbaas_state(endpoints)
|
||||
if not lbaas_state:
|
||||
lbaas_state = obj_lbaas.LBaaSState()
|
||||
elif (lbaas_state.loadbalancer and self._lb_provider and
|
||||
self._lb_provider != lbaas_state.loadbalancer.provider):
|
||||
LOG.info("LoadBalancer associated to the service does not match "
|
||||
"the current provider: %s", lbaas_state.loadbalancer.id)
|
||||
lb_client = clients.get_loadbalancer_client()
|
||||
try:
|
||||
lb_client.get_load_balancer(lbaas_state.loadbalancer.id)
|
||||
except os_exc.NotFoundException:
|
||||
# NOTE(ltomasbo): If the loadbalancer is gone, remove the
|
||||
# annotations to ensure it is reprocessed
|
||||
lbaas_state.loadbalancer = None
|
||||
lbaas_state.pools = []
|
||||
lbaas_state.listeners = []
|
||||
lbaas_state.members = []
|
||||
utils.set_lbaas_state(endpoints, lbaas_state)
|
||||
return
|
||||
|
||||
if self._sync_lbaas_members(endpoints, lbaas_state, lbaas_spec):
|
||||
# Note(yboaron) For LoadBalancer services, we should allocate FIP,
|
||||
@ -649,18 +666,38 @@ class LoadBalancerHandler(k8s_base.ResourceEventHandler):
|
||||
LOG.debug("Skipping cleanup of leftover lbaas. "
|
||||
"Error retriving Kubernetes services")
|
||||
return
|
||||
services_cluster_ip = set(service['spec']['clusterIP']
|
||||
for service in services
|
||||
if service['spec'].get('clusterIP'))
|
||||
services_cluster_ip = {service['spec']['clusterIP']: service
|
||||
for service in services
|
||||
if service['spec'].get('clusterIP')}
|
||||
|
||||
services_without_selector = set(
|
||||
service['spec']['clusterIP'] for service in services
|
||||
if (service['spec'].get('clusterIP') and
|
||||
not service['spec'].get('selector')))
|
||||
lbaas_spec = {}
|
||||
self._drv_lbaas.add_tags('loadbalancer', lbaas_spec)
|
||||
loadbalancers = lbaas_client.load_balancers(**lbaas_spec)
|
||||
for loadbalancer in loadbalancers:
|
||||
if loadbalancer.vip_address not in services_cluster_ip:
|
||||
if loadbalancer.vip_address not in services_cluster_ip.keys():
|
||||
lb_obj = obj_lbaas.LBaaSLoadBalancer(**loadbalancer)
|
||||
eventlet.spawn(self._ensure_release_lbaas, lb_obj)
|
||||
else:
|
||||
# check if the provider is the right one
|
||||
if (loadbalancer.vip_address not in services_without_selector
|
||||
and self._lb_provider
|
||||
and self._lb_provider != loadbalancer.provider):
|
||||
LOG.debug("Removing loadbalancer with old provider: %s",
|
||||
loadbalancer)
|
||||
lb_obj = obj_lbaas.LBaaSLoadBalancer(**loadbalancer)
|
||||
eventlet.spawn(
|
||||
self._ensure_release_lbaas,
|
||||
lb_obj,
|
||||
services_cluster_ip[loadbalancer.vip_address])
|
||||
# NOTE(ltomasbo): give some extra time in between lbs
|
||||
# recreation actions
|
||||
time.sleep(1)
|
||||
|
||||
def _ensure_release_lbaas(self, lb_obj):
|
||||
def _ensure_release_lbaas(self, lb_obj, svc=None):
|
||||
attempts = 0
|
||||
deadline = 0
|
||||
retry = True
|
||||
@ -678,6 +715,26 @@ class LoadBalancerHandler(k8s_base.ResourceEventHandler):
|
||||
retry = False
|
||||
except k_exc.ResourceNotReady:
|
||||
LOG.debug("Attempt (%s) of loadbalancer release %s failed."
|
||||
" A retry will be triggered.", attempts, lb_obj.name)
|
||||
" A retry will be triggered.", attempts,
|
||||
lb_obj.name)
|
||||
attempts += 1
|
||||
retry = True
|
||||
if svc:
|
||||
endpoints_link = utils.get_endpoints_link(svc)
|
||||
k8s = clients.get_kubernetes_client()
|
||||
try:
|
||||
endpoints = k8s.get(endpoints_link)
|
||||
except k_exc.K8sResourceNotFound:
|
||||
LOG.debug("Endpoint not Found.")
|
||||
return
|
||||
|
||||
lbaas = utils.get_lbaas_state(endpoints)
|
||||
if lbaas:
|
||||
lbaas.loadbalancer = None
|
||||
lbaas.pools = []
|
||||
lbaas.listeners = []
|
||||
lbaas.members = []
|
||||
# NOTE(ltomasbo): give some extra time to ensure the Load
|
||||
# Balancer VIP is also released
|
||||
time.sleep(1)
|
||||
utils.set_lbaas_state(endpoints, lbaas)
|
||||
|
@ -458,6 +458,7 @@ class TestLoadBalancerHandler(test_base.TestCase):
|
||||
m_get_lbaas_state.return_value = lbaas_state
|
||||
m_handler._sync_lbaas_members.return_value = True
|
||||
m_handler._drv_service_pub_ip = m_drv_service_pub_ip
|
||||
m_handler._lb_provider = None
|
||||
|
||||
h_lbaas.LoadBalancerHandler.on_present(m_handler, endpoints)
|
||||
|
||||
@ -489,6 +490,9 @@ class TestLoadBalancerHandler(test_base.TestCase):
|
||||
|
||||
lbaas_state = mock.sentinel.lbaas_state
|
||||
lbaas_state.service_pub_ip_info = None
|
||||
loadbalancer = mock.Mock()
|
||||
loadbalancer.port_id = 12345678
|
||||
lbaas_state.loadbalancer = loadbalancer
|
||||
endpoints = mock.sentinel.endpoints
|
||||
|
||||
floating_ip = {'floating_ip_address': '1.2.3.5',
|
||||
@ -509,6 +513,7 @@ class TestLoadBalancerHandler(test_base.TestCase):
|
||||
m_get_lbaas_state.return_value = lbaas_state
|
||||
m_handler._sync_lbaas_members = self._fake_sync_lbaas_members
|
||||
m_handler._drv_service_pub_ip = m_drv_service_pub_ip
|
||||
m_handler._lb_provider = None
|
||||
|
||||
h_lbaas.LoadBalancerHandler.on_present(m_handler, endpoints)
|
||||
|
||||
@ -547,6 +552,7 @@ class TestLoadBalancerHandler(test_base.TestCase):
|
||||
m_set_lbaas_state.side_effect = (
|
||||
k_exc.K8sResourceNotFound('ep'))
|
||||
m_handler._drv_service_pub_ip = m_drv_service_pub_ip
|
||||
m_handler._lb_provider = None
|
||||
h_lbaas.LoadBalancerHandler.on_present(m_handler, endpoints)
|
||||
|
||||
m_get_lbaas_spec.assert_called_once_with(endpoints)
|
||||
|
Loading…
Reference in New Issue
Block a user