diff --git a/kubernetes_crds/kuryr_crds/kuryrloadbalancer.yaml b/kubernetes_crds/kuryr_crds/kuryrloadbalancer.yaml index 03e0edcf2..65fa5fed7 100644 --- a/kubernetes_crds/kuryr_crds/kuryrloadbalancer.yaml +++ b/kubernetes_crds/kuryr_crds/kuryrloadbalancer.yaml @@ -108,6 +108,8 @@ spec: type: string type: type: string + provider: + type: string status: type: object properties: diff --git a/kuryr_kubernetes/controller/drivers/lbaasv2.py b/kuryr_kubernetes/controller/drivers/lbaasv2.py index daed7946a..0fd739696 100644 --- a/kuryr_kubernetes/controller/drivers/lbaasv2.py +++ b/kuryr_kubernetes/controller/drivers/lbaasv2.py @@ -47,6 +47,7 @@ _OCTAVIA_TAGGING_VERSION = 2, 5 # In order to make it simpler, we assume this is supported only from 2.13 _OCTAVIA_DL_VERSION = 2, 13 _OCTAVIA_ACL_VERSION = 2, 12 +_OCTAVIA_PROVIDER_VERSION = 2, 6 class LBaaSv2Driver(base.LBaaSDriver): @@ -58,6 +59,7 @@ class LBaaSv2Driver(base.LBaaSDriver): self._octavia_tags = False self._octavia_acls = False self._octavia_double_listeners = False + self._octavia_providers = False # Check if Octavia API supports tagging. # TODO(dulek): *Maybe* this can be replaced with # lbaas.get_api_major_version(version=_OCTAVIA_TAGGING_VERSION) @@ -80,10 +82,15 @@ class LBaaSv2Driver(base.LBaaSDriver): 'API %s does not support resource tagging. Kuryr ' 'will put requested tags in the description field of ' 'Octavia resources.', v_str) + if v >= _OCTAVIA_PROVIDER_VERSION: + self._octavia_providers = True def double_listeners_supported(self): return self._octavia_double_listeners + def providers_supported(self): + return self._octavia_providers + def get_octavia_version(self): lbaas = clients.get_loadbalancer_client() region_name = getattr(CONF.neutron, 'region_name', None) diff --git a/kuryr_kubernetes/controller/handlers/lbaas.py b/kuryr_kubernetes/controller/handlers/lbaas.py index 45f6dd62d..0e1d4e86d 100644 --- a/kuryr_kubernetes/controller/handlers/lbaas.py +++ b/kuryr_kubernetes/controller/handlers/lbaas.py @@ -258,10 +258,12 @@ class EndpointsHandler(k8s_base.ResourceEventHandler): # We need to set the requested load balancer provider # according to 'endpoints_driver_octavia_provider' configuration. self._lb_provider = None - if (config.CONF.kubernetes.endpoints_driver_octavia_provider - != 'default'): - self._lb_provider = ( - config.CONF.kubernetes.endpoints_driver_octavia_provider) + if self._drv_lbaas.providers_supported(): + self._lb_provider = 'amphora' + if (config.CONF.kubernetes.endpoints_driver_octavia_provider + != 'default'): + self._lb_provider = ( + config.CONF.kubernetes.endpoints_driver_octavia_provider) def on_present(self, endpoints): if self._move_annotations_to_crd(endpoints): @@ -349,6 +351,9 @@ class EndpointsHandler(k8s_base.ResourceEventHandler): 'status': status, } + if self._lb_provider: + loadbalancer_crd['spec']['provider'] = self._lb_provider + try: kubernetes.post('{}/{}/kuryrloadbalancers'.format( k_const.K8S_API_CRD_NAMESPACES, namespace), loadbalancer_crd) @@ -365,11 +370,14 @@ class EndpointsHandler(k8s_base.ResourceEventHandler): # TODO(maysams): Remove the convertion once we start handling # Endpoint slices. epslices = self._convert_subsets_to_endpointslice(endpoints) + spec = {'endpointSlices': epslices} + if self._lb_provider: + spec['provider'] = self._lb_provider try: kubernetes.patch_crd( 'spec', loadbalancer_crd['metadata']['selfLink'], - {'endpointSlices': epslices}) + spec) except k_exc.K8sResourceNotFound: LOG.debug('KuryrLoadbalancer CRD not found %s', loadbalancer_crd) except k_exc.K8sConflict: diff --git a/kuryr_kubernetes/controller/handlers/loadbalancer.py b/kuryr_kubernetes/controller/handlers/loadbalancer.py index 0646e99ef..8770dbe45 100644 --- a/kuryr_kubernetes/controller/handlers/loadbalancer.py +++ b/kuryr_kubernetes/controller/handlers/loadbalancer.py @@ -13,7 +13,6 @@ # License for the specific language governing permissions and limitations # under the License. -import eventlet import time from oslo_log import log as logging @@ -22,14 +21,14 @@ from kuryr_kubernetes import clients from kuryr_kubernetes import config from kuryr_kubernetes import constants as k_const from kuryr_kubernetes.controller.drivers import base as drv_base -from kuryr_kubernetes.controller.drivers import utils as driver_utils from kuryr_kubernetes import exceptions as k_exc from kuryr_kubernetes.handlers import k8s_base -from kuryr_kubernetes.objects import lbaas as obj_lbaas from kuryr_kubernetes import utils LOG = logging.getLogger(__name__) +OCTAVIA_DEFAULT_PROVIDERS = ['octavia', 'amphora'] + class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler): """LoadBalancerStatusHandler handles K8s Endpoints events. @@ -54,12 +53,6 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler): # Load Balancer creation flow. # We need to set the requested load balancer provider # according to 'endpoints_driver_octavia_provider' configuration. - self._lb_provider = None - if (config.CONF.kubernetes.endpoints_driver_octavia_provider - != 'default'): - self._lb_provider = ( - config.CONF.kubernetes.endpoints_driver_octavia_provider) - eventlet.spawn(self._cleanup_leftover_lbaas) def on_present(self, loadbalancer_crd): if self._should_ignore(loadbalancer_crd): @@ -67,6 +60,22 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler): loadbalancer_crd['metadata']['name']) return + crd_lb = loadbalancer_crd['status'].get('loadbalancer') + if crd_lb: + lb_provider = crd_lb.get('provider') + spec_lb_provider = loadbalancer_crd['spec'].get('provider') + # amphora to ovn upgrade + if not lb_provider or lb_provider in OCTAVIA_DEFAULT_PROVIDERS: + if (spec_lb_provider and + spec_lb_provider not in OCTAVIA_DEFAULT_PROVIDERS): + self._ensure_release_lbaas(loadbalancer_crd) + + # ovn to amphora downgrade + elif lb_provider and lb_provider not in OCTAVIA_DEFAULT_PROVIDERS: + if (not spec_lb_provider or + spec_lb_provider in OCTAVIA_DEFAULT_PROVIDERS): + self._ensure_release_lbaas(loadbalancer_crd) + try: name = loadbalancer_crd['metadata']['name'] namespace = loadbalancer_crd['metadata']['namespace'] @@ -703,7 +712,7 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler): security_groups_ids=loadbalancer_crd['spec'].get( 'security_groups_ids'), service_type=loadbalancer_crd['spec'].get('type'), - provider=self._lb_provider) + provider=loadbalancer_crd['spec'].get('provider')) loadbalancer_crd['status']['loadbalancer'] = lb kubernetes = clients.get_kubernetes_client() @@ -721,47 +730,7 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler): return changed - def _cleanup_leftover_lbaas(self): - lbaas_client = clients.get_loadbalancer_client() - services = [] - try: - services = driver_utils.get_services().get('items') - except k_exc.K8sClientException: - LOG.debug("Skipping cleanup of leftover lbaas. " - "Error retriving Kubernetes services") - return - services_cluster_ip = {service['spec']['clusterIP']: service - for service in services - if service['spec'].get('clusterIP')} - - services_without_selector = set( - service['spec']['clusterIP'] for service in services - if (service['spec'].get('clusterIP') and - not service['spec'].get('selector'))) - lbaas_spec = {} - self._drv_lbaas.add_tags('loadbalancer', lbaas_spec) - loadbalancers = lbaas_client.load_balancers(**lbaas_spec) - for loadbalancer in loadbalancers: - if loadbalancer.vip_address not in services_cluster_ip.keys(): - lb_obj = obj_lbaas.LBaaSLoadBalancer(**loadbalancer) - eventlet.spawn(self._ensure_release_lbaas, lb_obj) - else: - # check if the provider is the right one - if (loadbalancer.vip_address not in services_without_selector - and self._lb_provider - and self._lb_provider != loadbalancer.provider): - LOG.debug("Removing loadbalancer with old provider: %s", - loadbalancer) - lb_obj = obj_lbaas.LBaaSLoadBalancer(**loadbalancer) - eventlet.spawn( - self._ensure_release_lbaas, - lb_obj, - services_cluster_ip[loadbalancer.vip_address]) - # NOTE(ltomasbo): give some extra time in between lbs - # recreation actions - time.sleep(1) - - def _ensure_release_lbaas(self, lb_obj, svc=None): + def _ensure_release_lbaas(self, loadbalancer_crd): attempts = 0 deadline = 0 retry = True @@ -773,32 +742,32 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler): if (attempts > 0 and utils.exponential_sleep(deadline, attempts) == 0): LOG.error("Failed releasing lbaas '%s': deadline exceeded", - lb_obj.name) + loadbalancer_crd['status']['loadbalancer'][ + 'name']) return - self._drv_lbaas.release_loadbalancer(lb_obj) + self._drv_lbaas.release_loadbalancer( + loadbalancer=loadbalancer_crd['status'].get('loadbalancer') + ) retry = False except k_exc.ResourceNotReady: LOG.debug("Attempt (%s) of loadbalancer release %s failed." " A retry will be triggered.", attempts, - lb_obj.name) + loadbalancer_crd['status']['loadbalancer']['name']) attempts += 1 retry = True - if svc: - endpoints_link = utils.get_endpoints_link(svc) + + loadbalancer_crd['status'] = {} k8s = clients.get_kubernetes_client() try: - endpoints = k8s.get(endpoints_link) + k8s.patch_crd('status', loadbalancer_crd['metadata'][ + 'selfLink'], loadbalancer_crd['status']) except k_exc.K8sResourceNotFound: - LOG.debug("Endpoint not Found.") - return - - lbaas = utils.get_lbaas_state(endpoints) - if lbaas: - lbaas.loadbalancer = None - lbaas.pools = [] - lbaas.listeners = [] - lbaas.members = [] - # NOTE(ltomasbo): give some extra time to ensure the Load - # Balancer VIP is also released - time.sleep(1) - utils.set_lbaas_state(endpoints, lbaas) + LOG.debug('KuryrLoadbalancer CRD not found %s', + loadbalancer_crd) + except k_exc.K8sClientException: + LOG.exception('Error updating KuryrLoadbalancer CRD %s', + loadbalancer_crd) + raise + # NOTE(ltomasbo): give some extra time to ensure the Load + # Balancer VIP is also released + time.sleep(1) diff --git a/kuryr_kubernetes/tests/unit/controller/handlers/test_loadbalancer.py b/kuryr_kubernetes/tests/unit/controller/handlers/test_loadbalancer.py index d520e5d26..9a36637d3 100644 --- a/kuryr_kubernetes/tests/unit/controller/handlers/test_loadbalancer.py +++ b/kuryr_kubernetes/tests/unit/controller/handlers/test_loadbalancer.py @@ -79,7 +79,8 @@ def get_lb_crd(): ] } ], - "type": "LoadBalancer" + "type": "LoadBalancer", + "provider": "ovn" }, "status": { "listeners": [ @@ -213,14 +214,12 @@ class TestKuryrLoadBalancerHandler(test_base.TestCase): m_get_drv_service_pub_ip.return_value = mock.sentinel.drv_lb_ip m_get_svc_drv_project.return_value = mock.sentinel.drv_svc_project m_get_svc_sg_drv.return_value = mock.sentinel.drv_sg - m_cfg.kubernetes.endpoints_driver_octavia_provider = 'default' handler = h_lb.KuryrLoadBalancerHandler() self.assertEqual(mock.sentinel.drv_lbaas, handler._drv_lbaas) self.assertEqual(mock.sentinel.drv_project, handler._drv_pod_project) self.assertEqual(mock.sentinel.drv_subnets, handler._drv_pod_subnets) self.assertEqual(mock.sentinel.drv_lb_ip, handler._drv_service_pub_ip) - self.assertIsNone(handler._lb_provider) @mock.patch('kuryr_kubernetes.controller.drivers.base.' 'ServiceProjectDriver.get_instance') @@ -245,14 +244,12 @@ class TestKuryrLoadBalancerHandler(test_base.TestCase): m_get_drv_service_pub_ip.return_value = mock.sentinel.drv_lb_ip m_get_svc_drv_project.return_value = mock.sentinel.drv_svc_project m_get_svc_sg_drv.return_value = mock.sentinel.drv_sg - m_cfg.kubernetes.endpoints_driver_octavia_provider = 'ovn' handler = h_lb .KuryrLoadBalancerHandler() self.assertEqual(mock.sentinel.drv_lbaas, handler._drv_lbaas) self.assertEqual(mock.sentinel.drv_project, handler._drv_pod_project) self.assertEqual(mock.sentinel.drv_subnets, handler._drv_pod_subnets) self.assertEqual(mock.sentinel.drv_lb_ip, handler._drv_service_pub_ip) - self.assertEqual('ovn', handler._lb_provider) def test_on_present(self): m_drv_service_pub_ip = mock.Mock()