diff --git a/kuryr_kubernetes/controller/drivers/utils.py b/kuryr_kubernetes/controller/drivers/utils.py index b320c33cc..1d5f013ef 100644 --- a/kuryr_kubernetes/controller/drivers/utils.py +++ b/kuryr_kubernetes/controller/drivers/utils.py @@ -323,23 +323,34 @@ def get_annotated_labels(resource, annotation_labels): def get_kuryrnetworkpolicy_crds(namespace=None): - kubernetes = clients.get_kubernetes_client() - - try: - if namespace: - knp_path = '{}/{}/kuryrnetworkpolicies'.format( + if namespace: + knp_path = '{}/{}/kuryrnetworkpolicies'.format( constants.K8S_API_CRD_NAMESPACES, namespace) - else: - knp_path = constants.K8S_API_CRD_KURYRNETWORKPOLICIES - knps = kubernetes.get(knp_path) - LOG.debug("Returning KuryrNetworkPolicies %s", knps) + else: + knp_path = constants.K8S_API_CRD_KURYRNETWORKPOLICIES + return get_k8s_resource(knp_path) + + +def get_kuryrloadbalancer_crds(namespace=None): + if namespace: + klb_path = '{}/{}/kuryrloadbalancers'.format( + constants.K8S_API_CRD_KURYRLOADBALANCERS, namespace) + else: + klb_path = constants.K8S_API_CRD_KURYRLOADBALANCERS + return get_k8s_resource(klb_path) + + +def get_k8s_resource(resource_path): + kubernetes = clients.get_kubernetes_client() + k8s_resource = {} + try: + k8s_resource = kubernetes.get(resource_path) except k_exc.K8sResourceNotFound: - LOG.exception("KuryrNetworkPolicy CRD not found") + LOG.exception('Kubernetes CRD not found') return [] except k_exc.K8sClientException: - LOG.exception("Exception during fetch KuryrNetworkPolicies. Retrying.") - raise k_exc.ResourceNotReady(knp_path) - return knps.get('items', []) + LOG.exception("Exception during Kubernetes recource") + return k8s_resource.get('items', []) def get_networkpolicies(namespace=None): diff --git a/kuryr_kubernetes/controller/handlers/loadbalancer.py b/kuryr_kubernetes/controller/handlers/loadbalancer.py index 10fc8dfbe..25561d5c8 100644 --- a/kuryr_kubernetes/controller/handlers/loadbalancer.py +++ b/kuryr_kubernetes/controller/handlers/loadbalancer.py @@ -13,6 +13,8 @@ # License for the specific language governing permissions and limitations # under the License. +import eventlet + import time from oslo_log import log as logging @@ -21,6 +23,7 @@ from kuryr_kubernetes import clients from kuryr_kubernetes import config from kuryr_kubernetes import constants as k_const from kuryr_kubernetes.controller.drivers import base as drv_base +from kuryr_kubernetes.controller.drivers import utils as driver_utils from kuryr_kubernetes import exceptions as k_exc from kuryr_kubernetes.handlers import k8s_base from kuryr_kubernetes import utils @@ -29,6 +32,7 @@ LOG = logging.getLogger(__name__) CONF = config.CONF OCTAVIA_DEFAULT_PROVIDERS = ['octavia', 'amphora'] +CRD_RECONCILIATION_FREQUENCY = 600 # seconds class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler): @@ -51,6 +55,7 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler): self._drv_svc_project = drv_base.ServiceProjectDriver.get_instance() self._drv_sg = drv_base.ServiceSecurityGroupsDriver.get_instance() self._drv_nodes_subnets = drv_base.NodesSubnetsDriver.get_instance() + eventlet.spawn(self._reconcile_loadbalancers) def _get_nodes_subnets(self): return utils.get_subnets_id_cidrs( @@ -58,7 +63,6 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler): def on_present(self, loadbalancer_crd, *args, **kwargs): if loadbalancer_crd.get('status', None) is None: - kubernetes = clients.get_kubernetes_client() try: kubernetes.patch_crd('status', @@ -113,6 +117,62 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler): self._update_lb_status(loadbalancer_crd) self._patch_status(loadbalancer_crd) + def _reconcile_loadbalancers(self): + while True: + eventlet.sleep(CRD_RECONCILIATION_FREQUENCY) + loadbalancer_crds = [] + try: + loadbalancer_crds = driver_utils.get_kuryrloadbalancer_crds() + except k_exc.K8sClientException: + LOG.debug("Error retriving KuryrLoadBalanders CRDs") + return + try: + self._trigger_loadbalancer_reconciliation(loadbalancer_crds) + except Exception: + LOG.exception('Error while running loadbalancers ' + 'reconciliation. It will be retried in %s', + CRD_RECONCILIATION_FREQUENCY) + + def _trigger_loadbalancer_reconciliation(self, loadbalancer_crds): + LOG.debug("Reconciling the loadbalancer CRDs") + # get the loadbalancers id in the CRD status + crd_loadbalancer_ids = [{'id': loadbalancer_crd.get('status', {}).get( + 'loadbalancer', {}).get('id', {}), 'selflink': + utils.get_res_link(loadbalancer_crd)} for + loadbalancer_crd in loadbalancer_crds] + lbaas = clients.get_loadbalancer_client() + lbaas_spec = {} + self._drv_lbaas.add_tags('loadbalancer', lbaas_spec) + loadbalancers = lbaas.load_balancers(**lbaas_spec) + # get the Loadbalaancer IDs from Openstack + loadbalancers_id = [loadbalancer['id'] + for loadbalancer in loadbalancers] + # for each loadbalancer id in the CRD status, check if exists in + # OpenStack + crds_to_reconcile_selflink = [crd_lb['selflink'] for crd_lb in + crd_loadbalancer_ids if + crd_lb['id'] not in loadbalancers_id] + if not crds_to_reconcile_selflink: + LOG.debug("KuryrLoadBalancer CRDs already in sync with OpenStack") + return + LOG.debug("Reconciling the following KuryrLoadBalancer CRDs: %r", + crds_to_reconcile_selflink) + self._reconcile_lbaas(crds_to_reconcile_selflink) + + def _reconcile_lbaas(self, crds_to_reconcile_selflink): + kubernetes = clients.get_kubernetes_client() + for selflink in crds_to_reconcile_selflink: + try: + kubernetes.patch_crd('status', selflink, {}) + except k_exc.K8sResourceNotFound: + LOG.debug('Unable to reconcile the KuryLoadBalancer CRD %s', + selflink) + return + except k_exc.K8sClientException: + LOG.debug('Unable fetch the KuryLoadBalancer CRD %s', + selflink) + return + def _should_ignore(self, loadbalancer_crd): return (not(self._has_endpoints(loadbalancer_crd) or loadbalancer_crd.get('status')) or not diff --git a/kuryr_kubernetes/tests/unit/controller/handlers/test_loadbalancer.py b/kuryr_kubernetes/tests/unit/controller/handlers/test_loadbalancer.py index de8420023..20846befc 100644 --- a/kuryr_kubernetes/tests/unit/controller/handlers/test_loadbalancer.py +++ b/kuryr_kubernetes/tests/unit/controller/handlers/test_loadbalancer.py @@ -137,6 +137,71 @@ def get_lb_crd(): } +def get_lb_crds(): + return [ + { + 'apiVersion': 'openstack.org/v1', + 'kind': 'KuryrLoadBalancer', + "metadata": { + "creationTimestamp": "2020-07-28T13:13:30Z", + "finalizers": [ + "" + ], + "generation": 6, + "name": "test", + "namespace": "default", + "resourceVersion": "111871", + "uid": "584fe3ea-04dd-43f7-be2f-713e861694ec" + }, + "status": { + "loadbalancer": { + "id": "01234567890", + "ip": "1.2.3.4", + "name": "default/test", + "port_id": "1023456789120", + "project_id": "12345678912", + "provider": "amphora", + "security_groups": [ + "1d134e68-5653-4192-bda2-4214319af799", + "31d7b8c2-75f1-4125-9565-8c15c5cf046c" + ], + "subnet_id": "123456789120" + }, + } + }, + { + 'apiVersion': 'openstack.org/v1', + 'kind': 'KuryrLoadBalancer', + "metadata": { + "creationTimestamp": "2020-07-28T13:13:30Z", + "finalizers": [ + "" + ], + "generation": 6, + "name": "demo", + "namespace": "default", + "resourceVersion": "111871", + "uid": "584fe3ea-04dd-43f7-be2f-713e861694ec" + }, + "status": { + "loadbalancer": { + "id": "01234567890", + "ip": "1.2.3.4", + "name": "default/demo", + "port_id": "1023456789120", + "project_id": "12345678912", + "provider": "amphora", + "security_groups": [ + "1d134e68-5653-4192-bda2-4214319af799", + "31d7b8c2-75f1-4125-9565-8c15c5cf046c" + ], + "subnet_id": "123456789120" + }, + } + } + ] + + class FakeLBaaSDriver(drv_base.LBaaSDriver): def ensure_loadbalancer(self, name, project_id, subnet_id, ip, @@ -592,3 +657,38 @@ class TestKuryrLoadBalancerHandler(test_base.TestCase): self.assertEqual(member_added, False) m_drv_lbaas.ensure_member.assert_not_called() + + @mock.patch('kuryr_kubernetes.clients.get_kubernetes_client') + @mock.patch('kuryr_kubernetes.controller.drivers.base' + '.LBaaSDriver.get_instance') + def test_reconcile_loadbalancers(self, m_get_drv_lbaas, m_k8s): + loadbalancer_crds = get_lb_crds() + + m_handler = mock.MagicMock(spec=h_lb.KuryrLoadBalancerHandler) + m_handler._drv_lbaas = m_get_drv_lbaas + lbaas = self.useFixture(k_fix.MockLBaaSClient()).client + lbaas.load_balancers.return_value = [] + + selflink = ['/apis/openstack.org/v1/namespaces/default/' + 'kuryrloadbalancers/test', + '/apis/openstack.org/v1/namespaces/default/' + 'kuryrloadbalancers/demo'] + h_lb.KuryrLoadBalancerHandler._trigger_loadbalancer_reconciliation( + m_handler, loadbalancer_crds) + m_handler._reconcile_lbaas.assert_called_with(selflink) + + @mock.patch('kuryr_kubernetes.clients.get_kubernetes_client') + @mock.patch('kuryr_kubernetes.controller.drivers.base' + '.LBaaSDriver.get_instance') + def test_reconcile_loadbalancers_in_sync(self, m_get_drv_lbaas, m_k8s): + loadbalancer_crds = get_lb_crds() + + m_handler = mock.MagicMock(spec=h_lb.KuryrLoadBalancerHandler) + m_handler._drv_lbaas = m_get_drv_lbaas + lbaas = self.useFixture(k_fix.MockLBaaSClient()).client + loadbalancers = [{'id': '01234567890'}, {'id': '01234567891'}] + lbaas.load_balancers.return_value = loadbalancers + + h_lb.KuryrLoadBalancerHandler._trigger_loadbalancer_reconciliation( + m_handler, loadbalancer_crds) + m_handler._reconcile_lbaas.assert_not_called()