wip: Loadbalancers reconciliation

The project aims to handle the reconcilation of LoadBalancers on Octavia.
When the loadbalancers are deleted on OpenStack, the reconciliations loop
which runs every 10mins detects the missing loadbalancers andd triggers
their recreation

Implements: blueprint https://blueprints.launchpad.net/kuryr-kubernetes/+spec/reconcile-openstack-resources-with-k8s
Change-Id: I0cad1c45615309587a2add1921029f5ce08be446
This commit is contained in:
Sunday Mgbogu 2021-06-30 08:37:20 +00:00
parent 862deaa455
commit e790508842
3 changed files with 185 additions and 14 deletions

View File

@ -323,23 +323,34 @@ def get_annotated_labels(resource, annotation_labels):
def get_kuryrnetworkpolicy_crds(namespace=None):
kubernetes = clients.get_kubernetes_client()
try:
if namespace:
knp_path = '{}/{}/kuryrnetworkpolicies'.format(
if namespace:
knp_path = '{}/{}/kuryrnetworkpolicies'.format(
constants.K8S_API_CRD_NAMESPACES, namespace)
else:
knp_path = constants.K8S_API_CRD_KURYRNETWORKPOLICIES
knps = kubernetes.get(knp_path)
LOG.debug("Returning KuryrNetworkPolicies %s", knps)
else:
knp_path = constants.K8S_API_CRD_KURYRNETWORKPOLICIES
return get_k8s_resource(knp_path)
def get_kuryrloadbalancer_crds(namespace=None):
if namespace:
klb_path = '{}/{}/kuryrloadbalancers'.format(
constants.K8S_API_CRD_KURYRLOADBALANCERS, namespace)
else:
klb_path = constants.K8S_API_CRD_KURYRLOADBALANCERS
return get_k8s_resource(klb_path)
def get_k8s_resource(resource_path):
kubernetes = clients.get_kubernetes_client()
k8s_resource = {}
try:
k8s_resource = kubernetes.get(resource_path)
except k_exc.K8sResourceNotFound:
LOG.exception("KuryrNetworkPolicy CRD not found")
LOG.exception('Kubernetes CRD not found')
return []
except k_exc.K8sClientException:
LOG.exception("Exception during fetch KuryrNetworkPolicies. Retrying.")
raise k_exc.ResourceNotReady(knp_path)
return knps.get('items', [])
LOG.exception("Exception during Kubernetes recource")
return k8s_resource.get('items', [])
def get_networkpolicies(namespace=None):

View File

@ -13,6 +13,8 @@
# License for the specific language governing permissions and limitations
# under the License.
import eventlet
import time
from oslo_log import log as logging
@ -21,6 +23,7 @@ from kuryr_kubernetes import clients
from kuryr_kubernetes import config
from kuryr_kubernetes import constants as k_const
from kuryr_kubernetes.controller.drivers import base as drv_base
from kuryr_kubernetes.controller.drivers import utils as driver_utils
from kuryr_kubernetes import exceptions as k_exc
from kuryr_kubernetes.handlers import k8s_base
from kuryr_kubernetes import utils
@ -29,6 +32,7 @@ LOG = logging.getLogger(__name__)
CONF = config.CONF
OCTAVIA_DEFAULT_PROVIDERS = ['octavia', 'amphora']
CRD_RECONCILIATION_FREQUENCY = 600 # seconds
class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler):
@ -51,6 +55,7 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler):
self._drv_svc_project = drv_base.ServiceProjectDriver.get_instance()
self._drv_sg = drv_base.ServiceSecurityGroupsDriver.get_instance()
self._drv_nodes_subnets = drv_base.NodesSubnetsDriver.get_instance()
eventlet.spawn(self._reconcile_loadbalancers)
def _get_nodes_subnets(self):
return utils.get_subnets_id_cidrs(
@ -58,7 +63,6 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler):
def on_present(self, loadbalancer_crd, *args, **kwargs):
if loadbalancer_crd.get('status', None) is None:
kubernetes = clients.get_kubernetes_client()
try:
kubernetes.patch_crd('status',
@ -113,6 +117,62 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler):
self._update_lb_status(loadbalancer_crd)
self._patch_status(loadbalancer_crd)
def _reconcile_loadbalancers(self):
while True:
eventlet.sleep(CRD_RECONCILIATION_FREQUENCY)
loadbalancer_crds = []
try:
loadbalancer_crds = driver_utils.get_kuryrloadbalancer_crds()
except k_exc.K8sClientException:
LOG.debug("Error retriving KuryrLoadBalanders CRDs")
return
try:
self._trigger_loadbalancer_reconciliation(loadbalancer_crds)
except Exception:
LOG.exception('Error while running loadbalancers '
'reconciliation. It will be retried in %s',
CRD_RECONCILIATION_FREQUENCY)
def _trigger_loadbalancer_reconciliation(self, loadbalancer_crds):
LOG.debug("Reconciling the loadbalancer CRDs")
# get the loadbalancers id in the CRD status
crd_loadbalancer_ids = [{'id': loadbalancer_crd.get('status', {}).get(
'loadbalancer', {}).get('id', {}), 'selflink':
utils.get_res_link(loadbalancer_crd)} for
loadbalancer_crd in loadbalancer_crds]
lbaas = clients.get_loadbalancer_client()
lbaas_spec = {}
self._drv_lbaas.add_tags('loadbalancer', lbaas_spec)
loadbalancers = lbaas.load_balancers(**lbaas_spec)
# get the Loadbalaancer IDs from Openstack
loadbalancers_id = [loadbalancer['id']
for loadbalancer in loadbalancers]
# for each loadbalancer id in the CRD status, check if exists in
# OpenStack
crds_to_reconcile_selflink = [crd_lb['selflink'] for crd_lb in
crd_loadbalancer_ids if
crd_lb['id'] not in loadbalancers_id]
if not crds_to_reconcile_selflink:
LOG.debug("KuryrLoadBalancer CRDs already in sync with OpenStack")
return
LOG.debug("Reconciling the following KuryrLoadBalancer CRDs: %r",
crds_to_reconcile_selflink)
self._reconcile_lbaas(crds_to_reconcile_selflink)
def _reconcile_lbaas(self, crds_to_reconcile_selflink):
kubernetes = clients.get_kubernetes_client()
for selflink in crds_to_reconcile_selflink:
try:
kubernetes.patch_crd('status', selflink, {})
except k_exc.K8sResourceNotFound:
LOG.debug('Unable to reconcile the KuryLoadBalancer CRD %s',
selflink)
return
except k_exc.K8sClientException:
LOG.debug('Unable fetch the KuryLoadBalancer CRD %s',
selflink)
return
def _should_ignore(self, loadbalancer_crd):
return (not(self._has_endpoints(loadbalancer_crd) or
loadbalancer_crd.get('status')) or not

View File

@ -137,6 +137,71 @@ def get_lb_crd():
}
def get_lb_crds():
return [
{
'apiVersion': 'openstack.org/v1',
'kind': 'KuryrLoadBalancer',
"metadata": {
"creationTimestamp": "2020-07-28T13:13:30Z",
"finalizers": [
""
],
"generation": 6,
"name": "test",
"namespace": "default",
"resourceVersion": "111871",
"uid": "584fe3ea-04dd-43f7-be2f-713e861694ec"
},
"status": {
"loadbalancer": {
"id": "01234567890",
"ip": "1.2.3.4",
"name": "default/test",
"port_id": "1023456789120",
"project_id": "12345678912",
"provider": "amphora",
"security_groups": [
"1d134e68-5653-4192-bda2-4214319af799",
"31d7b8c2-75f1-4125-9565-8c15c5cf046c"
],
"subnet_id": "123456789120"
},
}
},
{
'apiVersion': 'openstack.org/v1',
'kind': 'KuryrLoadBalancer',
"metadata": {
"creationTimestamp": "2020-07-28T13:13:30Z",
"finalizers": [
""
],
"generation": 6,
"name": "demo",
"namespace": "default",
"resourceVersion": "111871",
"uid": "584fe3ea-04dd-43f7-be2f-713e861694ec"
},
"status": {
"loadbalancer": {
"id": "01234567890",
"ip": "1.2.3.4",
"name": "default/demo",
"port_id": "1023456789120",
"project_id": "12345678912",
"provider": "amphora",
"security_groups": [
"1d134e68-5653-4192-bda2-4214319af799",
"31d7b8c2-75f1-4125-9565-8c15c5cf046c"
],
"subnet_id": "123456789120"
},
}
}
]
class FakeLBaaSDriver(drv_base.LBaaSDriver):
def ensure_loadbalancer(self, name, project_id, subnet_id, ip,
@ -592,3 +657,38 @@ class TestKuryrLoadBalancerHandler(test_base.TestCase):
self.assertEqual(member_added, False)
m_drv_lbaas.ensure_member.assert_not_called()
@mock.patch('kuryr_kubernetes.clients.get_kubernetes_client')
@mock.patch('kuryr_kubernetes.controller.drivers.base'
'.LBaaSDriver.get_instance')
def test_reconcile_loadbalancers(self, m_get_drv_lbaas, m_k8s):
loadbalancer_crds = get_lb_crds()
m_handler = mock.MagicMock(spec=h_lb.KuryrLoadBalancerHandler)
m_handler._drv_lbaas = m_get_drv_lbaas
lbaas = self.useFixture(k_fix.MockLBaaSClient()).client
lbaas.load_balancers.return_value = []
selflink = ['/apis/openstack.org/v1/namespaces/default/'
'kuryrloadbalancers/test',
'/apis/openstack.org/v1/namespaces/default/'
'kuryrloadbalancers/demo']
h_lb.KuryrLoadBalancerHandler._trigger_loadbalancer_reconciliation(
m_handler, loadbalancer_crds)
m_handler._reconcile_lbaas.assert_called_with(selflink)
@mock.patch('kuryr_kubernetes.clients.get_kubernetes_client')
@mock.patch('kuryr_kubernetes.controller.drivers.base'
'.LBaaSDriver.get_instance')
def test_reconcile_loadbalancers_in_sync(self, m_get_drv_lbaas, m_k8s):
loadbalancer_crds = get_lb_crds()
m_handler = mock.MagicMock(spec=h_lb.KuryrLoadBalancerHandler)
m_handler._drv_lbaas = m_get_drv_lbaas
lbaas = self.useFixture(k_fix.MockLBaaSClient()).client
loadbalancers = [{'id': '01234567890'}, {'id': '01234567891'}]
lbaas.load_balancers.return_value = loadbalancers
h_lb.KuryrLoadBalancerHandler._trigger_loadbalancer_reconciliation(
m_handler, loadbalancer_crds)
m_handler._reconcile_lbaas.assert_not_called()