Merge "Loadbalancers reconciliation"
This commit is contained in:
commit
d5c5f99377
@ -323,23 +323,34 @@ def get_annotated_labels(resource, annotation_labels):
|
||||
|
||||
|
||||
def get_kuryrnetworkpolicy_crds(namespace=None):
|
||||
kubernetes = clients.get_kubernetes_client()
|
||||
|
||||
try:
|
||||
if namespace:
|
||||
knp_path = '{}/{}/kuryrnetworkpolicies'.format(
|
||||
constants.K8S_API_CRD_NAMESPACES, namespace)
|
||||
else:
|
||||
knp_path = constants.K8S_API_CRD_KURYRNETWORKPOLICIES
|
||||
knps = kubernetes.get(knp_path)
|
||||
LOG.debug("Returning KuryrNetworkPolicies %s", knps)
|
||||
return get_k8s_resource(knp_path)
|
||||
|
||||
|
||||
def get_kuryrloadbalancer_crds(namespace=None):
|
||||
if namespace:
|
||||
klb_path = '{}/{}/kuryrloadbalancers'.format(
|
||||
constants.K8S_API_CRD_KURYRLOADBALANCERS, namespace)
|
||||
else:
|
||||
klb_path = constants.K8S_API_CRD_KURYRLOADBALANCERS
|
||||
return get_k8s_resource(klb_path)
|
||||
|
||||
|
||||
def get_k8s_resource(resource_path):
|
||||
kubernetes = clients.get_kubernetes_client()
|
||||
k8s_resource = {}
|
||||
try:
|
||||
k8s_resource = kubernetes.get(resource_path)
|
||||
except k_exc.K8sResourceNotFound:
|
||||
LOG.exception("KuryrNetworkPolicy CRD not found")
|
||||
LOG.exception('Kubernetes CRD not found')
|
||||
return []
|
||||
except k_exc.K8sClientException:
|
||||
LOG.exception("Exception during fetch KuryrNetworkPolicies. Retrying.")
|
||||
raise k_exc.ResourceNotReady(knp_path)
|
||||
return knps.get('items', [])
|
||||
LOG.exception("Exception during Kubernetes recource")
|
||||
return k8s_resource.get('items', [])
|
||||
|
||||
|
||||
def get_networkpolicies(namespace=None):
|
||||
|
@ -13,6 +13,8 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import eventlet
|
||||
|
||||
import time
|
||||
|
||||
from oslo_log import log as logging
|
||||
@ -21,6 +23,7 @@ from kuryr_kubernetes import clients
|
||||
from kuryr_kubernetes import config
|
||||
from kuryr_kubernetes import constants as k_const
|
||||
from kuryr_kubernetes.controller.drivers import base as drv_base
|
||||
from kuryr_kubernetes.controller.drivers import utils as driver_utils
|
||||
from kuryr_kubernetes import exceptions as k_exc
|
||||
from kuryr_kubernetes.handlers import k8s_base
|
||||
from kuryr_kubernetes import utils
|
||||
@ -29,6 +32,7 @@ LOG = logging.getLogger(__name__)
|
||||
CONF = config.CONF
|
||||
|
||||
OCTAVIA_DEFAULT_PROVIDERS = ['octavia', 'amphora']
|
||||
CRD_RECONCILIATION_FREQUENCY = 600 # seconds
|
||||
|
||||
|
||||
class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler):
|
||||
@ -51,6 +55,7 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler):
|
||||
self._drv_svc_project = drv_base.ServiceProjectDriver.get_instance()
|
||||
self._drv_sg = drv_base.ServiceSecurityGroupsDriver.get_instance()
|
||||
self._drv_nodes_subnets = drv_base.NodesSubnetsDriver.get_instance()
|
||||
eventlet.spawn(self._reconcile_loadbalancers)
|
||||
|
||||
def _get_nodes_subnets(self):
|
||||
return utils.get_subnets_id_cidrs(
|
||||
@ -58,7 +63,6 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler):
|
||||
|
||||
def on_present(self, loadbalancer_crd, *args, **kwargs):
|
||||
if loadbalancer_crd.get('status', None) is None:
|
||||
|
||||
kubernetes = clients.get_kubernetes_client()
|
||||
try:
|
||||
kubernetes.patch_crd('status',
|
||||
@ -113,6 +117,62 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler):
|
||||
self._update_lb_status(loadbalancer_crd)
|
||||
self._patch_status(loadbalancer_crd)
|
||||
|
||||
def _reconcile_loadbalancers(self):
|
||||
while True:
|
||||
eventlet.sleep(CRD_RECONCILIATION_FREQUENCY)
|
||||
loadbalancer_crds = []
|
||||
try:
|
||||
loadbalancer_crds = driver_utils.get_kuryrloadbalancer_crds()
|
||||
except k_exc.K8sClientException:
|
||||
LOG.debug("Error retriving KuryrLoadBalanders CRDs")
|
||||
return
|
||||
try:
|
||||
self._trigger_loadbalancer_reconciliation(loadbalancer_crds)
|
||||
except Exception:
|
||||
LOG.exception('Error while running loadbalancers '
|
||||
'reconciliation. It will be retried in %s',
|
||||
CRD_RECONCILIATION_FREQUENCY)
|
||||
|
||||
def _trigger_loadbalancer_reconciliation(self, loadbalancer_crds):
|
||||
LOG.debug("Reconciling the loadbalancer CRDs")
|
||||
# get the loadbalancers id in the CRD status
|
||||
crd_loadbalancer_ids = [{'id': loadbalancer_crd.get('status', {}).get(
|
||||
'loadbalancer', {}).get('id', {}), 'selflink':
|
||||
utils.get_res_link(loadbalancer_crd)} for
|
||||
loadbalancer_crd in loadbalancer_crds]
|
||||
lbaas = clients.get_loadbalancer_client()
|
||||
lbaas_spec = {}
|
||||
self._drv_lbaas.add_tags('loadbalancer', lbaas_spec)
|
||||
loadbalancers = lbaas.load_balancers(**lbaas_spec)
|
||||
# get the Loadbalaancer IDs from Openstack
|
||||
loadbalancers_id = [loadbalancer['id']
|
||||
for loadbalancer in loadbalancers]
|
||||
# for each loadbalancer id in the CRD status, check if exists in
|
||||
# OpenStack
|
||||
crds_to_reconcile_selflink = [crd_lb['selflink'] for crd_lb in
|
||||
crd_loadbalancer_ids if
|
||||
crd_lb['id'] not in loadbalancers_id]
|
||||
if not crds_to_reconcile_selflink:
|
||||
LOG.debug("KuryrLoadBalancer CRDs already in sync with OpenStack")
|
||||
return
|
||||
LOG.debug("Reconciling the following KuryrLoadBalancer CRDs: %r",
|
||||
crds_to_reconcile_selflink)
|
||||
self._reconcile_lbaas(crds_to_reconcile_selflink)
|
||||
|
||||
def _reconcile_lbaas(self, crds_to_reconcile_selflink):
|
||||
kubernetes = clients.get_kubernetes_client()
|
||||
for selflink in crds_to_reconcile_selflink:
|
||||
try:
|
||||
kubernetes.patch_crd('status', selflink, {})
|
||||
except k_exc.K8sResourceNotFound:
|
||||
LOG.debug('Unable to reconcile the KuryLoadBalancer CRD %s',
|
||||
selflink)
|
||||
return
|
||||
except k_exc.K8sClientException:
|
||||
LOG.debug('Unable fetch the KuryLoadBalancer CRD %s',
|
||||
selflink)
|
||||
return
|
||||
|
||||
def _should_ignore(self, loadbalancer_crd):
|
||||
return (not(self._has_endpoints(loadbalancer_crd) or
|
||||
loadbalancer_crd.get('status')) or not
|
||||
|
@ -137,6 +137,71 @@ def get_lb_crd():
|
||||
}
|
||||
|
||||
|
||||
def get_lb_crds():
|
||||
return [
|
||||
{
|
||||
'apiVersion': 'openstack.org/v1',
|
||||
'kind': 'KuryrLoadBalancer',
|
||||
"metadata": {
|
||||
"creationTimestamp": "2020-07-28T13:13:30Z",
|
||||
"finalizers": [
|
||||
""
|
||||
],
|
||||
"generation": 6,
|
||||
"name": "test",
|
||||
"namespace": "default",
|
||||
"resourceVersion": "111871",
|
||||
"uid": "584fe3ea-04dd-43f7-be2f-713e861694ec"
|
||||
},
|
||||
"status": {
|
||||
"loadbalancer": {
|
||||
"id": "01234567890",
|
||||
"ip": "1.2.3.4",
|
||||
"name": "default/test",
|
||||
"port_id": "1023456789120",
|
||||
"project_id": "12345678912",
|
||||
"provider": "amphora",
|
||||
"security_groups": [
|
||||
"1d134e68-5653-4192-bda2-4214319af799",
|
||||
"31d7b8c2-75f1-4125-9565-8c15c5cf046c"
|
||||
],
|
||||
"subnet_id": "123456789120"
|
||||
},
|
||||
}
|
||||
},
|
||||
{
|
||||
'apiVersion': 'openstack.org/v1',
|
||||
'kind': 'KuryrLoadBalancer',
|
||||
"metadata": {
|
||||
"creationTimestamp": "2020-07-28T13:13:30Z",
|
||||
"finalizers": [
|
||||
""
|
||||
],
|
||||
"generation": 6,
|
||||
"name": "demo",
|
||||
"namespace": "default",
|
||||
"resourceVersion": "111871",
|
||||
"uid": "584fe3ea-04dd-43f7-be2f-713e861694ec"
|
||||
},
|
||||
"status": {
|
||||
"loadbalancer": {
|
||||
"id": "01234567891",
|
||||
"ip": "1.2.3.4",
|
||||
"name": "default/demo",
|
||||
"port_id": "1023456789120",
|
||||
"project_id": "12345678912",
|
||||
"provider": "amphora",
|
||||
"security_groups": [
|
||||
"1d134e68-5653-4192-bda2-4214319af799",
|
||||
"31d7b8c2-75f1-4125-9565-8c15c5cf046c"
|
||||
],
|
||||
"subnet_id": "123456789120"
|
||||
},
|
||||
}
|
||||
}
|
||||
]
|
||||
|
||||
|
||||
class FakeLBaaSDriver(drv_base.LBaaSDriver):
|
||||
|
||||
def ensure_loadbalancer(self, name, project_id, subnet_id, ip,
|
||||
@ -592,3 +657,41 @@ class TestKuryrLoadBalancerHandler(test_base.TestCase):
|
||||
|
||||
self.assertEqual(member_added, False)
|
||||
m_drv_lbaas.ensure_member.assert_not_called()
|
||||
|
||||
@mock.patch('kuryr_kubernetes.clients.get_kubernetes_client')
|
||||
@mock.patch('kuryr_kubernetes.controller.drivers.base'
|
||||
'.LBaaSDriver.get_instance')
|
||||
def test_reconcile_loadbalancers(self, m_get_drv_lbaas, m_k8s):
|
||||
loadbalancer_crds = get_lb_crds()
|
||||
m_handler = mock.MagicMock(spec=h_lb.KuryrLoadBalancerHandler)
|
||||
m_handler._drv_lbaas = m_get_drv_lbaas
|
||||
lbaas = self.useFixture(k_fix.MockLBaaSClient()).client
|
||||
lbaas_spec = {}
|
||||
lbaas.load_balancers.return_value = []
|
||||
|
||||
selflink = ['/apis/openstack.org/v1/namespaces/default/'
|
||||
'kuryrloadbalancers/test',
|
||||
'/apis/openstack.org/v1/namespaces/default/'
|
||||
'kuryrloadbalancers/demo']
|
||||
h_lb.KuryrLoadBalancerHandler._trigger_loadbalancer_reconciliation(
|
||||
m_handler, loadbalancer_crds)
|
||||
lbaas.load_balancers.assert_called_once_with(**lbaas_spec)
|
||||
m_handler._reconcile_lbaas.assert_called_with(selflink)
|
||||
|
||||
@mock.patch('kuryr_kubernetes.clients.get_kubernetes_client')
|
||||
@mock.patch('kuryr_kubernetes.controller.drivers.base'
|
||||
'.LBaaSDriver.get_instance')
|
||||
def test_reconcile_loadbalancers_in_sync(self, m_get_drv_lbaas, m_k8s):
|
||||
loadbalancer_crds = get_lb_crds()
|
||||
|
||||
m_handler = mock.MagicMock(spec=h_lb.KuryrLoadBalancerHandler)
|
||||
m_handler._drv_lbaas = m_get_drv_lbaas
|
||||
lbaas = self.useFixture(k_fix.MockLBaaSClient()).client
|
||||
loadbalancers = [{'id': '01234567890'}, {'id': '01234567891'}]
|
||||
lbaas_spec = {}
|
||||
lbaas.load_balancers.return_value = loadbalancers
|
||||
|
||||
h_lb.KuryrLoadBalancerHandler._trigger_loadbalancer_reconciliation(
|
||||
m_handler, loadbalancer_crds)
|
||||
lbaas.load_balancers.assert_called_once_with(**lbaas_spec)
|
||||
m_handler._reconcile_lbaas.assert_not_called()
|
||||
|
Loading…
Reference in New Issue
Block a user