Merge "Ensure LB sg is in sync with backend Pods"
This commit is contained in:
commit
807d203ac3
|
@ -50,6 +50,8 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler):
|
||||||
self._drv_pod_project = drv_base.PodProjectDriver.get_instance()
|
self._drv_pod_project = drv_base.PodProjectDriver.get_instance()
|
||||||
self._drv_pod_subnets = drv_base.PodSubnetsDriver.get_instance()
|
self._drv_pod_subnets = drv_base.PodSubnetsDriver.get_instance()
|
||||||
self._drv_service_pub_ip = drv_base.ServicePubIpDriver.get_instance()
|
self._drv_service_pub_ip = drv_base.ServicePubIpDriver.get_instance()
|
||||||
|
self._drv_svc_project = drv_base.ServiceProjectDriver.get_instance()
|
||||||
|
self._drv_sg = drv_base.ServiceSecurityGroupsDriver.get_instance()
|
||||||
# Note(yboaron) LBaaS driver supports 'provider' parameter in
|
# Note(yboaron) LBaaS driver supports 'provider' parameter in
|
||||||
# Load Balancer creation flow.
|
# Load Balancer creation flow.
|
||||||
# We need to set the requested load balancer provider
|
# We need to set the requested load balancer provider
|
||||||
|
@ -204,44 +206,43 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler):
|
||||||
|
|
||||||
return changed
|
return changed
|
||||||
|
|
||||||
def _sync_lbaas_sgs(self, loadbalancer_crd):
|
def _sync_lbaas_sgs(self, klb_crd):
|
||||||
# NOTE (maysams) Need to retrieve the LBaaS Spec again due to
|
lb = klb_crd['status'].get('loadbalancer')
|
||||||
# the possibility of it being updated after the LBaaS creation
|
svc_name = klb_crd['metadata']['name']
|
||||||
# process has started.
|
svc_namespace = klb_crd['metadata']['namespace']
|
||||||
lbaas_spec = loadbalancer_crd.get('spec')
|
|
||||||
|
|
||||||
lb = loadbalancer_crd['status'].get('loadbalancer')
|
|
||||||
if not lb:
|
if not lb:
|
||||||
|
LOG.debug("No LB created. Skipping lb %s sg sync.",
|
||||||
|
svc_name)
|
||||||
return
|
return
|
||||||
|
k8s = clients.get_kubernetes_client()
|
||||||
|
try:
|
||||||
|
service = k8s.get(
|
||||||
|
f'{k_const.K8S_API_NAMESPACES}/{svc_namespace}/'
|
||||||
|
f'services/{svc_name}')
|
||||||
|
except k_exc.K8sResourceNotFound:
|
||||||
|
LOG.debug('Service %s not found.', svc_name)
|
||||||
|
return
|
||||||
|
except k_exc.K8sClientException:
|
||||||
|
LOG.exception('Error retrieving Service %s.', svc_name)
|
||||||
|
raise
|
||||||
|
|
||||||
default_sgs = config.CONF.neutron_defaults.pod_security_groups
|
project_id = self._drv_svc_project.get_project(service)
|
||||||
# NOTE(maysams) As the endpoint and svc are annotated with the
|
lb_sgs = self._drv_sg.get_security_groups(service, project_id)
|
||||||
# 'lbaas_spec' in two separate k8s calls, it's possible that
|
try:
|
||||||
# the endpoint got annotated and the svc haven't due to controller
|
k8s.patch_crd('status/loadbalancer',
|
||||||
# restarts. For this case, a resourceNotReady exception is raised
|
klb_crd['metadata']['selfLink'],
|
||||||
# till the svc gets annotated with a 'lbaas_spec'.
|
{'security_groups': lb_sgs})
|
||||||
if lbaas_spec:
|
except k_exc.K8sResourceNotFound:
|
||||||
lbaas_spec_sgs = loadbalancer_crd['spec'].get(
|
LOG.debug('KuryrLoadBalancer %s not found', svc_name)
|
||||||
'security_groups_ids', [])
|
except k_exc.K8sClientException:
|
||||||
else:
|
LOG.exception('Error syncing KuryrLoadBalancer'
|
||||||
raise k_exc.ResourceNotReady(lbaas_spec_sgs)
|
' %s', svc_name)
|
||||||
if (lb.get('security_groups') and
|
raise
|
||||||
lb.get('security_groups') != lbaas_spec_sgs):
|
|
||||||
sgs = [lb_sg for lb_sg in lb['security_groups']
|
|
||||||
if lb_sg not in default_sgs]
|
|
||||||
if lbaas_spec_sgs != default_sgs:
|
|
||||||
sgs.extend(lbaas_spec_sgs)
|
|
||||||
|
|
||||||
# Check if this should update the CRD
|
|
||||||
lb['security_groups'] = sgs
|
|
||||||
|
|
||||||
def _add_new_members(self, loadbalancer_crd):
|
def _add_new_members(self, loadbalancer_crd):
|
||||||
changed = False
|
changed = False
|
||||||
try:
|
|
||||||
self._sync_lbaas_sgs(loadbalancer_crd)
|
self._sync_lbaas_sgs(loadbalancer_crd)
|
||||||
except k_exc.K8sResourceNotFound:
|
|
||||||
LOG.debug("The svc has been deleted while processing the endpoints"
|
|
||||||
" update. No need to add new members.")
|
|
||||||
|
|
||||||
lsnr_by_id = {l['id']: l for l in loadbalancer_crd['status'].get(
|
lsnr_by_id = {l['id']: l for l in loadbalancer_crd['status'].get(
|
||||||
'listeners', [])}
|
'listeners', [])}
|
||||||
|
|
|
@ -412,6 +412,11 @@ class TestKuryrLoadBalancerHandler(test_base.TestCase):
|
||||||
for member in crd['status']['members'])
|
for member in crd['status']['members'])
|
||||||
return observed_targets
|
return observed_targets
|
||||||
|
|
||||||
|
@mock.patch('kuryr_kubernetes.controller.drivers.base.'
|
||||||
|
'ServiceSecurityGroupsDriver.get_instance')
|
||||||
|
@mock.patch('kuryr_kubernetes.controller.drivers.base.'
|
||||||
|
'ServiceProjectDriver.get_instance')
|
||||||
|
@mock.patch('kuryr_kubernetes.clients.get_kubernetes_client')
|
||||||
@mock.patch('kuryr_kubernetes.controller.drivers.base'
|
@mock.patch('kuryr_kubernetes.controller.drivers.base'
|
||||||
'.PodSubnetsDriver.get_instance')
|
'.PodSubnetsDriver.get_instance')
|
||||||
@mock.patch('kuryr_kubernetes.controller.drivers.base'
|
@mock.patch('kuryr_kubernetes.controller.drivers.base'
|
||||||
|
@ -419,7 +424,8 @@ class TestKuryrLoadBalancerHandler(test_base.TestCase):
|
||||||
@mock.patch('kuryr_kubernetes.controller.drivers.base'
|
@mock.patch('kuryr_kubernetes.controller.drivers.base'
|
||||||
'.LBaaSDriver.get_instance')
|
'.LBaaSDriver.get_instance')
|
||||||
def test_sync_lbaas_members(self, m_get_drv_lbaas, m_get_drv_project,
|
def test_sync_lbaas_members(self, m_get_drv_lbaas, m_get_drv_project,
|
||||||
m_get_drv_subnets):
|
m_get_drv_subnets, m_k8s, m_svc_project_drv,
|
||||||
|
m_svc_sg_drv):
|
||||||
# REVISIT(ivc): test methods separately and verify ensure/release
|
# REVISIT(ivc): test methods separately and verify ensure/release
|
||||||
project_id = str(uuid.uuid4())
|
project_id = str(uuid.uuid4())
|
||||||
subnet_id = str(uuid.uuid4())
|
subnet_id = str(uuid.uuid4())
|
||||||
|
@ -437,6 +443,11 @@ class TestKuryrLoadBalancerHandler(test_base.TestCase):
|
||||||
self.assertEqual(sorted(expected_targets.items()), observed_targets)
|
self.assertEqual(sorted(expected_targets.items()), observed_targets)
|
||||||
self.assertEqual(expected_ip, str(crd['status']['loadbalancer']['ip']))
|
self.assertEqual(expected_ip, str(crd['status']['loadbalancer']['ip']))
|
||||||
|
|
||||||
|
@mock.patch('kuryr_kubernetes.controller.drivers.base.'
|
||||||
|
'ServiceSecurityGroupsDriver.get_instance')
|
||||||
|
@mock.patch('kuryr_kubernetes.controller.drivers.base.'
|
||||||
|
'ServiceProjectDriver.get_instance')
|
||||||
|
@mock.patch('kuryr_kubernetes.clients.get_kubernetes_client')
|
||||||
@mock.patch('kuryr_kubernetes.controller.drivers.base'
|
@mock.patch('kuryr_kubernetes.controller.drivers.base'
|
||||||
'.PodSubnetsDriver.get_instance')
|
'.PodSubnetsDriver.get_instance')
|
||||||
@mock.patch('kuryr_kubernetes.controller.drivers.base'
|
@mock.patch('kuryr_kubernetes.controller.drivers.base'
|
||||||
|
@ -444,7 +455,8 @@ class TestKuryrLoadBalancerHandler(test_base.TestCase):
|
||||||
@mock.patch('kuryr_kubernetes.controller.drivers.base'
|
@mock.patch('kuryr_kubernetes.controller.drivers.base'
|
||||||
'.LBaaSDriver.get_instance')
|
'.LBaaSDriver.get_instance')
|
||||||
def test_sync_lbaas_members_udp(self, m_get_drv_lbaas,
|
def test_sync_lbaas_members_udp(self, m_get_drv_lbaas,
|
||||||
m_get_drv_project, m_get_drv_subnets):
|
m_get_drv_project, m_get_drv_subnets,
|
||||||
|
m_k8s, m_svc_project_drv, m_svc_sg_drv):
|
||||||
# REVISIT(ivc): test methods separately and verify ensure/release
|
# REVISIT(ivc): test methods separately and verify ensure/release
|
||||||
project_id = str(uuid.uuid4())
|
project_id = str(uuid.uuid4())
|
||||||
subnet_id = str(uuid.uuid4())
|
subnet_id = str(uuid.uuid4())
|
||||||
|
@ -463,6 +475,11 @@ class TestKuryrLoadBalancerHandler(test_base.TestCase):
|
||||||
self.assertEqual(sorted(expected_targets.items()), observed_targets)
|
self.assertEqual(sorted(expected_targets.items()), observed_targets)
|
||||||
self.assertEqual(expected_ip, str(crd['status']['loadbalancer']['ip']))
|
self.assertEqual(expected_ip, str(crd['status']['loadbalancer']['ip']))
|
||||||
|
|
||||||
|
@mock.patch('kuryr_kubernetes.controller.drivers.base.'
|
||||||
|
'ServiceSecurityGroupsDriver.get_instance')
|
||||||
|
@mock.patch('kuryr_kubernetes.controller.drivers.base.'
|
||||||
|
'ServiceProjectDriver.get_instance')
|
||||||
|
@mock.patch('kuryr_kubernetes.clients.get_kubernetes_client')
|
||||||
@mock.patch('kuryr_kubernetes.controller.drivers.base'
|
@mock.patch('kuryr_kubernetes.controller.drivers.base'
|
||||||
'.PodSubnetsDriver.get_instance')
|
'.PodSubnetsDriver.get_instance')
|
||||||
@mock.patch('kuryr_kubernetes.controller.drivers.base'
|
@mock.patch('kuryr_kubernetes.controller.drivers.base'
|
||||||
|
@ -470,7 +487,8 @@ class TestKuryrLoadBalancerHandler(test_base.TestCase):
|
||||||
@mock.patch('kuryr_kubernetes.controller.drivers.base'
|
@mock.patch('kuryr_kubernetes.controller.drivers.base'
|
||||||
'.LBaaSDriver.get_instance')
|
'.LBaaSDriver.get_instance')
|
||||||
def test_sync_lbaas_members_svc_listener_port_edit(
|
def test_sync_lbaas_members_svc_listener_port_edit(
|
||||||
self, m_get_drv_lbaas, m_get_drv_project, m_get_drv_subnets):
|
self, m_get_drv_lbaas, m_get_drv_project, m_get_drv_subnets,
|
||||||
|
m_k8s, m_svc_project_drv, m_svc_sg_drv):
|
||||||
# REVISIT(ivc): test methods separately and verify ensure/release
|
# REVISIT(ivc): test methods separately and verify ensure/release
|
||||||
project_id = str(uuid.uuid4())
|
project_id = str(uuid.uuid4())
|
||||||
subnet_id = str(uuid.uuid4())
|
subnet_id = str(uuid.uuid4())
|
||||||
|
@ -495,6 +513,11 @@ class TestKuryrLoadBalancerHandler(test_base.TestCase):
|
||||||
|
|
||||||
self.assertEqual(expected_ip, str(crd['status']['loadbalancer']['ip']))
|
self.assertEqual(expected_ip, str(crd['status']['loadbalancer']['ip']))
|
||||||
|
|
||||||
|
@mock.patch('kuryr_kubernetes.controller.drivers.base.'
|
||||||
|
'ServiceSecurityGroupsDriver.get_instance')
|
||||||
|
@mock.patch('kuryr_kubernetes.controller.drivers.base.'
|
||||||
|
'ServiceProjectDriver.get_instance')
|
||||||
|
@mock.patch('kuryr_kubernetes.clients.get_kubernetes_client')
|
||||||
@mock.patch('kuryr_kubernetes.controller.drivers.base'
|
@mock.patch('kuryr_kubernetes.controller.drivers.base'
|
||||||
'.PodSubnetsDriver.get_instance')
|
'.PodSubnetsDriver.get_instance')
|
||||||
@mock.patch('kuryr_kubernetes.controller.drivers.base'
|
@mock.patch('kuryr_kubernetes.controller.drivers.base'
|
||||||
|
@ -502,7 +525,9 @@ class TestKuryrLoadBalancerHandler(test_base.TestCase):
|
||||||
@mock.patch('kuryr_kubernetes.controller.drivers.base'
|
@mock.patch('kuryr_kubernetes.controller.drivers.base'
|
||||||
'.LBaaSDriver.get_instance')
|
'.LBaaSDriver.get_instance')
|
||||||
def test_add_new_members_udp(self, m_get_drv_lbaas,
|
def test_add_new_members_udp(self, m_get_drv_lbaas,
|
||||||
m_get_drv_project, m_get_drv_subnets):
|
m_get_drv_project, m_get_drv_subnets,
|
||||||
|
m_k8s, m_svc_project_drv,
|
||||||
|
m_svc_sg_drv):
|
||||||
project_id = str(uuid.uuid4())
|
project_id = str(uuid.uuid4())
|
||||||
subnet_id = str(uuid.uuid4())
|
subnet_id = str(uuid.uuid4())
|
||||||
crd = get_lb_crd()
|
crd = get_lb_crd()
|
||||||
|
|
Loading…
Reference in New Issue