diff --git a/kuryr_kubernetes/controller/drivers/utils.py b/kuryr_kubernetes/controller/drivers/utils.py index 1d5f013ef..b5e156efc 100644 --- a/kuryr_kubernetes/controller/drivers/utils.py +++ b/kuryr_kubernetes/controller/drivers/utils.py @@ -323,21 +323,29 @@ def get_annotated_labels(resource, annotation_labels): def get_kuryrnetworkpolicy_crds(namespace=None): - if namespace: - knp_path = '{}/{}/kuryrnetworkpolicies'.format( + + try: + if namespace: + knp_path = '{}/{}/kuryrnetworkpolicies'.format( constants.K8S_API_CRD_NAMESPACES, namespace) - else: - knp_path = constants.K8S_API_CRD_KURYRNETWORKPOLICIES - return get_k8s_resource(knp_path) + else: + knp_path = constants.K8S_API_CRD_KURYRNETWORKPOLICIES + knps = get_k8s_resource(knp_path) + LOG.debug("Returning KuryrNetworkPolicies %s", knps) + except k_exc.K8sClientException: + LOG.exception("Exception during fetch KuryrNetworkPolicies. Retrying.") + raise k_exc.ResourceNotReady(knp_path) + return knps def get_kuryrloadbalancer_crds(namespace=None): if namespace: klb_path = '{}/{}/kuryrloadbalancers'.format( - constants.K8S_API_CRD_KURYRLOADBALANCERS, namespace) + constants.K8S_API_CRD_KURYRLOADBALANCERS, namespace) else: klb_path = constants.K8S_API_CRD_KURYRLOADBALANCERS - return get_k8s_resource(klb_path) + klbs = get_k8s_resource(klb_path) + return klbs def get_k8s_resource(resource_path): @@ -348,8 +356,6 @@ def get_k8s_resource(resource_path): except k_exc.K8sResourceNotFound: LOG.exception('Kubernetes CRD not found') return [] - except k_exc.K8sClientException: - LOG.exception("Exception during Kubernetes recource") return k8s_resource.get('items', []) diff --git a/kuryr_kubernetes/controller/handlers/loadbalancer.py b/kuryr_kubernetes/controller/handlers/loadbalancer.py index 25561d5c8..6daa67bed 100644 --- a/kuryr_kubernetes/controller/handlers/loadbalancer.py +++ b/kuryr_kubernetes/controller/handlers/loadbalancer.py @@ -13,8 +13,6 @@ # License for the specific language governing permissions and limitations # under the License. -import eventlet - import time from oslo_log import log as logging @@ -55,7 +53,6 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler): self._drv_svc_project = drv_base.ServiceProjectDriver.get_instance() self._drv_sg = drv_base.ServiceSecurityGroupsDriver.get_instance() self._drv_nodes_subnets = drv_base.NodesSubnetsDriver.get_instance() - eventlet.spawn(self._reconcile_loadbalancers) def _get_nodes_subnets(self): return utils.get_subnets_id_cidrs( @@ -117,21 +114,16 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler): self._update_lb_status(loadbalancer_crd) self._patch_status(loadbalancer_crd) - def _reconcile_loadbalancers(self): - while True: - eventlet.sleep(CRD_RECONCILIATION_FREQUENCY) - loadbalancer_crds = [] - try: - loadbalancer_crds = driver_utils.get_kuryrloadbalancer_crds() - except k_exc.K8sClientException: - LOG.debug("Error retriving KuryrLoadBalanders CRDs") - return - try: - self._trigger_loadbalancer_reconciliation(loadbalancer_crds) - except Exception: - LOG.exception('Error while running loadbalancers ' - 'reconciliation. It will be retried in %s', - CRD_RECONCILIATION_FREQUENCY) + def reconcile(self): + loadbalancer_crds = [] + try: + loadbalancer_crds = driver_utils.get_kuryrloadbalancer_crds() + except k_exc.K8sClientException: + LOG.warning("Error retriving KuryrLoadBalanders CRDs") + try: + self._trigger_loadbalancer_reconciliation(loadbalancer_crds) + except Exception: + LOG.exception('Error while running loadbalancers reconciliation.') def _trigger_loadbalancer_reconciliation(self, loadbalancer_crds): LOG.debug("Reconciling the loadbalancer CRDs") @@ -144,7 +136,6 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler): lbaas_spec = {} self._drv_lbaas.add_tags('loadbalancer', lbaas_spec) loadbalancers = lbaas.load_balancers(**lbaas_spec) - # get the Loadbalaancer IDs from Openstack loadbalancers_id = [loadbalancer['id'] for loadbalancer in loadbalancers] # for each loadbalancer id in the CRD status, check if exists in @@ -167,11 +158,11 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler): except k_exc.K8sResourceNotFound: LOG.debug('Unable to reconcile the KuryLoadBalancer CRD %s', selflink) - return + continue except k_exc.K8sClientException: - LOG.debug('Unable fetch the KuryLoadBalancer CRD %s', - selflink) - return + LOG.warning('Unable to patch the KuryLoadBalancer CRD %s', + selflink) + continue def _should_ignore(self, loadbalancer_crd): return (not(self._has_endpoints(loadbalancer_crd) or diff --git a/kuryr_kubernetes/controller/service.py b/kuryr_kubernetes/controller/service.py index b49e4aeb9..66c43ce54 100644 --- a/kuryr_kubernetes/controller/service.py +++ b/kuryr_kubernetes/controller/service.py @@ -88,8 +88,8 @@ class KuryrK8sService(service.Service, periodic_task.PeriodicTasks, self.current_leader = None self.node_name = utils.get_node_name() - handlers = _load_kuryr_ctrlr_handlers() - for handler in handlers: + self.handlers = _load_kuryr_ctrlr_handlers() + for handler in self.handlers: self.watcher.add(handler.get_watch_path()) pipeline.register(handler) self.pool_driver = drivers.VIFPoolDriver.get_instance( @@ -109,14 +109,16 @@ class KuryrK8sService(service.Service, periodic_task.PeriodicTasks, self.pool_driver.sync_pools() else: LOG.info('Running in HA mode, watcher will be started later.') - f = functools.partial(self.run_periodic_tasks, None) - self.tg.add_timer(1, f) + f = functools.partial(self.run_periodic_tasks, None) + self.tg.add_timer(1, f) self.health_manager.run() LOG.info("Service '%s' started", self.__class__.__name__) @periodic_task.periodic_task(spacing=5, run_immediately=True) def monitor_leader(self, context): + if not CONF.kubernetes.controller_ha: + return leader = utils.get_leader_name() if leader is None: # Error when fetching current leader. We're paranoid, so just to @@ -162,6 +164,12 @@ class KuryrK8sService(service.Service, periodic_task.PeriodicTasks, self.watcher.stop() super(KuryrK8sService, self).stop(graceful) + @periodic_task.periodic_task(spacing=600, run_immediately=False) + def reconcile_loadbalancers(self, context): + LOG.debug("Checking for Kubernetes resources reconciliations") + for handler in self.handlers: + handler.reconcile() + def start(): urllib3.disable_warnings() diff --git a/kuryr_kubernetes/handlers/k8s_base.py b/kuryr_kubernetes/handlers/k8s_base.py index 8f37c180e..7b291d887 100755 --- a/kuryr_kubernetes/handlers/k8s_base.py +++ b/kuryr_kubernetes/handlers/k8s_base.py @@ -105,3 +105,6 @@ class ResourceEventHandler(dispatch.EventConsumer, health.HealthHandler): def on_finalize(self, obj, *args, **kwargs): pass + + def reconcile(self): + pass