Update the LoadBalancers Reconciliation Loop
The patch handles the KuryrLoadBalancers reconciliation using a periodic task as the event scheduler instead of eventlet. Implements: blueprint reconcile-openstack-resources-with-k8s Change-Id: I85d2d1cf00b2543f41a875b32ae64fbbe885652c
This commit is contained in:
parent
7f2c925a51
commit
f23419d0ee
@ -323,21 +323,29 @@ def get_annotated_labels(resource, annotation_labels):
|
||||
|
||||
|
||||
def get_kuryrnetworkpolicy_crds(namespace=None):
|
||||
if namespace:
|
||||
knp_path = '{}/{}/kuryrnetworkpolicies'.format(
|
||||
|
||||
try:
|
||||
if namespace:
|
||||
knp_path = '{}/{}/kuryrnetworkpolicies'.format(
|
||||
constants.K8S_API_CRD_NAMESPACES, namespace)
|
||||
else:
|
||||
knp_path = constants.K8S_API_CRD_KURYRNETWORKPOLICIES
|
||||
return get_k8s_resource(knp_path)
|
||||
else:
|
||||
knp_path = constants.K8S_API_CRD_KURYRNETWORKPOLICIES
|
||||
knps = get_k8s_resource(knp_path)
|
||||
LOG.debug("Returning KuryrNetworkPolicies %s", knps)
|
||||
except k_exc.K8sClientException:
|
||||
LOG.exception("Exception during fetch KuryrNetworkPolicies. Retrying.")
|
||||
raise k_exc.ResourceNotReady(knp_path)
|
||||
return knps
|
||||
|
||||
|
||||
def get_kuryrloadbalancer_crds(namespace=None):
|
||||
if namespace:
|
||||
klb_path = '{}/{}/kuryrloadbalancers'.format(
|
||||
constants.K8S_API_CRD_KURYRLOADBALANCERS, namespace)
|
||||
constants.K8S_API_CRD_KURYRLOADBALANCERS, namespace)
|
||||
else:
|
||||
klb_path = constants.K8S_API_CRD_KURYRLOADBALANCERS
|
||||
return get_k8s_resource(klb_path)
|
||||
klbs = get_k8s_resource(klb_path)
|
||||
return klbs
|
||||
|
||||
|
||||
def get_k8s_resource(resource_path):
|
||||
@ -348,8 +356,6 @@ def get_k8s_resource(resource_path):
|
||||
except k_exc.K8sResourceNotFound:
|
||||
LOG.exception('Kubernetes CRD not found')
|
||||
return []
|
||||
except k_exc.K8sClientException:
|
||||
LOG.exception("Exception during Kubernetes recource")
|
||||
return k8s_resource.get('items', [])
|
||||
|
||||
|
||||
|
@ -13,8 +13,6 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import eventlet
|
||||
|
||||
import time
|
||||
|
||||
from oslo_log import log as logging
|
||||
@ -55,7 +53,6 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler):
|
||||
self._drv_svc_project = drv_base.ServiceProjectDriver.get_instance()
|
||||
self._drv_sg = drv_base.ServiceSecurityGroupsDriver.get_instance()
|
||||
self._drv_nodes_subnets = drv_base.NodesSubnetsDriver.get_instance()
|
||||
eventlet.spawn(self._reconcile_loadbalancers)
|
||||
|
||||
def _get_nodes_subnets(self):
|
||||
return utils.get_subnets_id_cidrs(
|
||||
@ -117,21 +114,16 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler):
|
||||
self._update_lb_status(loadbalancer_crd)
|
||||
self._patch_status(loadbalancer_crd)
|
||||
|
||||
def _reconcile_loadbalancers(self):
|
||||
while True:
|
||||
eventlet.sleep(CRD_RECONCILIATION_FREQUENCY)
|
||||
loadbalancer_crds = []
|
||||
try:
|
||||
loadbalancer_crds = driver_utils.get_kuryrloadbalancer_crds()
|
||||
except k_exc.K8sClientException:
|
||||
LOG.debug("Error retriving KuryrLoadBalanders CRDs")
|
||||
return
|
||||
try:
|
||||
self._trigger_loadbalancer_reconciliation(loadbalancer_crds)
|
||||
except Exception:
|
||||
LOG.exception('Error while running loadbalancers '
|
||||
'reconciliation. It will be retried in %s',
|
||||
CRD_RECONCILIATION_FREQUENCY)
|
||||
def reconcile(self):
|
||||
loadbalancer_crds = []
|
||||
try:
|
||||
loadbalancer_crds = driver_utils.get_kuryrloadbalancer_crds()
|
||||
except k_exc.K8sClientException:
|
||||
LOG.warning("Error retriving KuryrLoadBalanders CRDs")
|
||||
try:
|
||||
self._trigger_loadbalancer_reconciliation(loadbalancer_crds)
|
||||
except Exception:
|
||||
LOG.exception('Error while running loadbalancers reconciliation.')
|
||||
|
||||
def _trigger_loadbalancer_reconciliation(self, loadbalancer_crds):
|
||||
LOG.debug("Reconciling the loadbalancer CRDs")
|
||||
@ -144,7 +136,6 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler):
|
||||
lbaas_spec = {}
|
||||
self._drv_lbaas.add_tags('loadbalancer', lbaas_spec)
|
||||
loadbalancers = lbaas.load_balancers(**lbaas_spec)
|
||||
# get the Loadbalaancer IDs from Openstack
|
||||
loadbalancers_id = [loadbalancer['id']
|
||||
for loadbalancer in loadbalancers]
|
||||
# for each loadbalancer id in the CRD status, check if exists in
|
||||
@ -167,11 +158,11 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler):
|
||||
except k_exc.K8sResourceNotFound:
|
||||
LOG.debug('Unable to reconcile the KuryLoadBalancer CRD %s',
|
||||
selflink)
|
||||
return
|
||||
continue
|
||||
except k_exc.K8sClientException:
|
||||
LOG.debug('Unable fetch the KuryLoadBalancer CRD %s',
|
||||
selflink)
|
||||
return
|
||||
LOG.warning('Unable to patch the KuryLoadBalancer CRD %s',
|
||||
selflink)
|
||||
continue
|
||||
|
||||
def _should_ignore(self, loadbalancer_crd):
|
||||
return (not(self._has_endpoints(loadbalancer_crd) or
|
||||
|
@ -88,8 +88,8 @@ class KuryrK8sService(service.Service, periodic_task.PeriodicTasks,
|
||||
self.current_leader = None
|
||||
self.node_name = utils.get_node_name()
|
||||
|
||||
handlers = _load_kuryr_ctrlr_handlers()
|
||||
for handler in handlers:
|
||||
self.handlers = _load_kuryr_ctrlr_handlers()
|
||||
for handler in self.handlers:
|
||||
self.watcher.add(handler.get_watch_path())
|
||||
pipeline.register(handler)
|
||||
self.pool_driver = drivers.VIFPoolDriver.get_instance(
|
||||
@ -109,14 +109,16 @@ class KuryrK8sService(service.Service, periodic_task.PeriodicTasks,
|
||||
self.pool_driver.sync_pools()
|
||||
else:
|
||||
LOG.info('Running in HA mode, watcher will be started later.')
|
||||
f = functools.partial(self.run_periodic_tasks, None)
|
||||
self.tg.add_timer(1, f)
|
||||
f = functools.partial(self.run_periodic_tasks, None)
|
||||
self.tg.add_timer(1, f)
|
||||
|
||||
self.health_manager.run()
|
||||
LOG.info("Service '%s' started", self.__class__.__name__)
|
||||
|
||||
@periodic_task.periodic_task(spacing=5, run_immediately=True)
|
||||
def monitor_leader(self, context):
|
||||
if not CONF.kubernetes.controller_ha:
|
||||
return
|
||||
leader = utils.get_leader_name()
|
||||
if leader is None:
|
||||
# Error when fetching current leader. We're paranoid, so just to
|
||||
@ -162,6 +164,12 @@ class KuryrK8sService(service.Service, periodic_task.PeriodicTasks,
|
||||
self.watcher.stop()
|
||||
super(KuryrK8sService, self).stop(graceful)
|
||||
|
||||
@periodic_task.periodic_task(spacing=600, run_immediately=False)
|
||||
def reconcile_loadbalancers(self, context):
|
||||
LOG.debug("Checking for Kubernetes resources reconciliations")
|
||||
for handler in self.handlers:
|
||||
handler.reconcile()
|
||||
|
||||
|
||||
def start():
|
||||
urllib3.disable_warnings()
|
||||
|
@ -105,3 +105,6 @@ class ResourceEventHandler(dispatch.EventConsumer, health.HealthHandler):
|
||||
|
||||
def on_finalize(self, obj, *args, **kwargs):
|
||||
pass
|
||||
|
||||
def reconcile(self):
|
||||
pass
|
||||
|
Loading…
x
Reference in New Issue
Block a user