From e790508842493d7d32f13c94ddda76d973f2f619 Mon Sep 17 00:00:00 2001 From: Sunday Mgbogu Date: Wed, 30 Jun 2021 08:37:20 +0000 Subject: [PATCH] wip: Loadbalancers reconciliation The project aims to handle the reconcilation of LoadBalancers on Octavia. When the loadbalancers are deleted on OpenStack, the reconciliations loop which runs every 10mins detects the missing loadbalancers andd triggers their recreation Implements: blueprint https://blueprints.launchpad.net/kuryr-kubernetes/+spec/reconcile-openstack-resources-with-k8s Change-Id: I0cad1c45615309587a2add1921029f5ce08be446 --- kuryr_kubernetes/controller/drivers/utils.py | 37 ++++--- .../controller/handlers/loadbalancer.py | 62 ++++++++++- .../controller/handlers/test_loadbalancer.py | 100 ++++++++++++++++++ 3 files changed, 185 insertions(+), 14 deletions(-) diff --git a/kuryr_kubernetes/controller/drivers/utils.py b/kuryr_kubernetes/controller/drivers/utils.py index b320c33cc..1d5f013ef 100644 --- a/kuryr_kubernetes/controller/drivers/utils.py +++ b/kuryr_kubernetes/controller/drivers/utils.py @@ -323,23 +323,34 @@ def get_annotated_labels(resource, annotation_labels): def get_kuryrnetworkpolicy_crds(namespace=None): - kubernetes = clients.get_kubernetes_client() - - try: - if namespace: - knp_path = '{}/{}/kuryrnetworkpolicies'.format( + if namespace: + knp_path = '{}/{}/kuryrnetworkpolicies'.format( constants.K8S_API_CRD_NAMESPACES, namespace) - else: - knp_path = constants.K8S_API_CRD_KURYRNETWORKPOLICIES - knps = kubernetes.get(knp_path) - LOG.debug("Returning KuryrNetworkPolicies %s", knps) + else: + knp_path = constants.K8S_API_CRD_KURYRNETWORKPOLICIES + return get_k8s_resource(knp_path) + + +def get_kuryrloadbalancer_crds(namespace=None): + if namespace: + klb_path = '{}/{}/kuryrloadbalancers'.format( + constants.K8S_API_CRD_KURYRLOADBALANCERS, namespace) + else: + klb_path = constants.K8S_API_CRD_KURYRLOADBALANCERS + return get_k8s_resource(klb_path) + + +def get_k8s_resource(resource_path): + kubernetes = clients.get_kubernetes_client() + k8s_resource = {} + try: + k8s_resource = kubernetes.get(resource_path) except k_exc.K8sResourceNotFound: - LOG.exception("KuryrNetworkPolicy CRD not found") + LOG.exception('Kubernetes CRD not found') return [] except k_exc.K8sClientException: - LOG.exception("Exception during fetch KuryrNetworkPolicies. Retrying.") - raise k_exc.ResourceNotReady(knp_path) - return knps.get('items', []) + LOG.exception("Exception during Kubernetes recource") + return k8s_resource.get('items', []) def get_networkpolicies(namespace=None): diff --git a/kuryr_kubernetes/controller/handlers/loadbalancer.py b/kuryr_kubernetes/controller/handlers/loadbalancer.py index 10fc8dfbe..25561d5c8 100644 --- a/kuryr_kubernetes/controller/handlers/loadbalancer.py +++ b/kuryr_kubernetes/controller/handlers/loadbalancer.py @@ -13,6 +13,8 @@ # License for the specific language governing permissions and limitations # under the License. +import eventlet + import time from oslo_log import log as logging @@ -21,6 +23,7 @@ from kuryr_kubernetes import clients from kuryr_kubernetes import config from kuryr_kubernetes import constants as k_const from kuryr_kubernetes.controller.drivers import base as drv_base +from kuryr_kubernetes.controller.drivers import utils as driver_utils from kuryr_kubernetes import exceptions as k_exc from kuryr_kubernetes.handlers import k8s_base from kuryr_kubernetes import utils @@ -29,6 +32,7 @@ LOG = logging.getLogger(__name__) CONF = config.CONF OCTAVIA_DEFAULT_PROVIDERS = ['octavia', 'amphora'] +CRD_RECONCILIATION_FREQUENCY = 600 # seconds class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler): @@ -51,6 +55,7 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler): self._drv_svc_project = drv_base.ServiceProjectDriver.get_instance() self._drv_sg = drv_base.ServiceSecurityGroupsDriver.get_instance() self._drv_nodes_subnets = drv_base.NodesSubnetsDriver.get_instance() + eventlet.spawn(self._reconcile_loadbalancers) def _get_nodes_subnets(self): return utils.get_subnets_id_cidrs( @@ -58,7 +63,6 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler): def on_present(self, loadbalancer_crd, *args, **kwargs): if loadbalancer_crd.get('status', None) is None: - kubernetes = clients.get_kubernetes_client() try: kubernetes.patch_crd('status', @@ -113,6 +117,62 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler): self._update_lb_status(loadbalancer_crd) self._patch_status(loadbalancer_crd) + def _reconcile_loadbalancers(self): + while True: + eventlet.sleep(CRD_RECONCILIATION_FREQUENCY) + loadbalancer_crds = [] + try: + loadbalancer_crds = driver_utils.get_kuryrloadbalancer_crds() + except k_exc.K8sClientException: + LOG.debug("Error retriving KuryrLoadBalanders CRDs") + return + try: + self._trigger_loadbalancer_reconciliation(loadbalancer_crds) + except Exception: + LOG.exception('Error while running loadbalancers ' + 'reconciliation. It will be retried in %s', + CRD_RECONCILIATION_FREQUENCY) + + def _trigger_loadbalancer_reconciliation(self, loadbalancer_crds): + LOG.debug("Reconciling the loadbalancer CRDs") + # get the loadbalancers id in the CRD status + crd_loadbalancer_ids = [{'id': loadbalancer_crd.get('status', {}).get( + 'loadbalancer', {}).get('id', {}), 'selflink': + utils.get_res_link(loadbalancer_crd)} for + loadbalancer_crd in loadbalancer_crds] + lbaas = clients.get_loadbalancer_client() + lbaas_spec = {} + self._drv_lbaas.add_tags('loadbalancer', lbaas_spec) + loadbalancers = lbaas.load_balancers(**lbaas_spec) + # get the Loadbalaancer IDs from Openstack + loadbalancers_id = [loadbalancer['id'] + for loadbalancer in loadbalancers] + # for each loadbalancer id in the CRD status, check if exists in + # OpenStack + crds_to_reconcile_selflink = [crd_lb['selflink'] for crd_lb in + crd_loadbalancer_ids if + crd_lb['id'] not in loadbalancers_id] + if not crds_to_reconcile_selflink: + LOG.debug("KuryrLoadBalancer CRDs already in sync with OpenStack") + return + LOG.debug("Reconciling the following KuryrLoadBalancer CRDs: %r", + crds_to_reconcile_selflink) + self._reconcile_lbaas(crds_to_reconcile_selflink) + + def _reconcile_lbaas(self, crds_to_reconcile_selflink): + kubernetes = clients.get_kubernetes_client() + for selflink in crds_to_reconcile_selflink: + try: + kubernetes.patch_crd('status', selflink, {}) + except k_exc.K8sResourceNotFound: + LOG.debug('Unable to reconcile the KuryLoadBalancer CRD %s', + selflink) + return + except k_exc.K8sClientException: + LOG.debug('Unable fetch the KuryLoadBalancer CRD %s', + selflink) + return + def _should_ignore(self, loadbalancer_crd): return (not(self._has_endpoints(loadbalancer_crd) or loadbalancer_crd.get('status')) or not diff --git a/kuryr_kubernetes/tests/unit/controller/handlers/test_loadbalancer.py b/kuryr_kubernetes/tests/unit/controller/handlers/test_loadbalancer.py index de8420023..20846befc 100644 --- a/kuryr_kubernetes/tests/unit/controller/handlers/test_loadbalancer.py +++ b/kuryr_kubernetes/tests/unit/controller/handlers/test_loadbalancer.py @@ -137,6 +137,71 @@ def get_lb_crd(): } +def get_lb_crds(): + return [ + { + 'apiVersion': 'openstack.org/v1', + 'kind': 'KuryrLoadBalancer', + "metadata": { + "creationTimestamp": "2020-07-28T13:13:30Z", + "finalizers": [ + "" + ], + "generation": 6, + "name": "test", + "namespace": "default", + "resourceVersion": "111871", + "uid": "584fe3ea-04dd-43f7-be2f-713e861694ec" + }, + "status": { + "loadbalancer": { + "id": "01234567890", + "ip": "1.2.3.4", + "name": "default/test", + "port_id": "1023456789120", + "project_id": "12345678912", + "provider": "amphora", + "security_groups": [ + "1d134e68-5653-4192-bda2-4214319af799", + "31d7b8c2-75f1-4125-9565-8c15c5cf046c" + ], + "subnet_id": "123456789120" + }, + } + }, + { + 'apiVersion': 'openstack.org/v1', + 'kind': 'KuryrLoadBalancer', + "metadata": { + "creationTimestamp": "2020-07-28T13:13:30Z", + "finalizers": [ + "" + ], + "generation": 6, + "name": "demo", + "namespace": "default", + "resourceVersion": "111871", + "uid": "584fe3ea-04dd-43f7-be2f-713e861694ec" + }, + "status": { + "loadbalancer": { + "id": "01234567890", + "ip": "1.2.3.4", + "name": "default/demo", + "port_id": "1023456789120", + "project_id": "12345678912", + "provider": "amphora", + "security_groups": [ + "1d134e68-5653-4192-bda2-4214319af799", + "31d7b8c2-75f1-4125-9565-8c15c5cf046c" + ], + "subnet_id": "123456789120" + }, + } + } + ] + + class FakeLBaaSDriver(drv_base.LBaaSDriver): def ensure_loadbalancer(self, name, project_id, subnet_id, ip, @@ -592,3 +657,38 @@ class TestKuryrLoadBalancerHandler(test_base.TestCase): self.assertEqual(member_added, False) m_drv_lbaas.ensure_member.assert_not_called() + + @mock.patch('kuryr_kubernetes.clients.get_kubernetes_client') + @mock.patch('kuryr_kubernetes.controller.drivers.base' + '.LBaaSDriver.get_instance') + def test_reconcile_loadbalancers(self, m_get_drv_lbaas, m_k8s): + loadbalancer_crds = get_lb_crds() + + m_handler = mock.MagicMock(spec=h_lb.KuryrLoadBalancerHandler) + m_handler._drv_lbaas = m_get_drv_lbaas + lbaas = self.useFixture(k_fix.MockLBaaSClient()).client + lbaas.load_balancers.return_value = [] + + selflink = ['/apis/openstack.org/v1/namespaces/default/' + 'kuryrloadbalancers/test', + '/apis/openstack.org/v1/namespaces/default/' + 'kuryrloadbalancers/demo'] + h_lb.KuryrLoadBalancerHandler._trigger_loadbalancer_reconciliation( + m_handler, loadbalancer_crds) + m_handler._reconcile_lbaas.assert_called_with(selflink) + + @mock.patch('kuryr_kubernetes.clients.get_kubernetes_client') + @mock.patch('kuryr_kubernetes.controller.drivers.base' + '.LBaaSDriver.get_instance') + def test_reconcile_loadbalancers_in_sync(self, m_get_drv_lbaas, m_k8s): + loadbalancer_crds = get_lb_crds() + + m_handler = mock.MagicMock(spec=h_lb.KuryrLoadBalancerHandler) + m_handler._drv_lbaas = m_get_drv_lbaas + lbaas = self.useFixture(k_fix.MockLBaaSClient()).client + loadbalancers = [{'id': '01234567890'}, {'id': '01234567891'}] + lbaas.load_balancers.return_value = loadbalancers + + h_lb.KuryrLoadBalancerHandler._trigger_loadbalancer_reconciliation( + m_handler, loadbalancer_crds) + m_handler._reconcile_lbaas.assert_not_called()