Merge "Ensure only affected services are updated on Pod/NetworkPolicy events"
This commit is contained in:
commit
8ca54e2592
|
@ -251,6 +251,8 @@ class PodSecurityGroupsDriver(DriverBase):
|
||||||
"""Create security group rules for a pod.
|
"""Create security group rules for a pod.
|
||||||
|
|
||||||
:param pod: dict containing Kubernetes Pod object
|
:param pod: dict containing Kubernetes Pod object
|
||||||
|
:return: a list containing podSelectors of CRDs
|
||||||
|
that had security group rules created
|
||||||
"""
|
"""
|
||||||
raise NotImplementedError()
|
raise NotImplementedError()
|
||||||
|
|
||||||
|
@ -258,6 +260,8 @@ class PodSecurityGroupsDriver(DriverBase):
|
||||||
"""Delete security group rules for a pod
|
"""Delete security group rules for a pod
|
||||||
|
|
||||||
:param pod: dict containing Kubernetes Pod object
|
:param pod: dict containing Kubernetes Pod object
|
||||||
|
:return: a list containing podSelectors of CRDs
|
||||||
|
that had security group rules deleted
|
||||||
"""
|
"""
|
||||||
raise NotImplementedError()
|
raise NotImplementedError()
|
||||||
|
|
||||||
|
@ -265,6 +269,8 @@ class PodSecurityGroupsDriver(DriverBase):
|
||||||
"""Update security group rules for a pod
|
"""Update security group rules for a pod
|
||||||
|
|
||||||
:param pod: dict containing Kubernetes Pod object
|
:param pod: dict containing Kubernetes Pod object
|
||||||
|
:return: a list containing podSelectors of CRDs
|
||||||
|
that had security group rules updated
|
||||||
"""
|
"""
|
||||||
raise NotImplementedError()
|
raise NotImplementedError()
|
||||||
|
|
||||||
|
|
|
@ -215,6 +215,7 @@ class NetworkPolicySecurityGroupsDriver(base.PodSecurityGroupsDriver):
|
||||||
|
|
||||||
def create_sg_rules(self, pod):
|
def create_sg_rules(self, pod):
|
||||||
LOG.debug("Creating sg rule for pod: %s", pod['metadata']['name'])
|
LOG.debug("Creating sg rule for pod: %s", pod['metadata']['name'])
|
||||||
|
crd_pod_selectors = []
|
||||||
knp_crds = driver_utils.get_kuryrnetpolicy_crds()
|
knp_crds = driver_utils.get_kuryrnetpolicy_crds()
|
||||||
for crd in knp_crds.get('items'):
|
for crd in knp_crds.get('items'):
|
||||||
crd_selector = crd['spec'].get('podSelector')
|
crd_selector = crd['spec'].get('podSelector')
|
||||||
|
@ -225,11 +226,13 @@ class NetworkPolicySecurityGroupsDriver(base.PodSecurityGroupsDriver):
|
||||||
if i_matched or e_matched:
|
if i_matched or e_matched:
|
||||||
driver_utils.patch_kuryr_crd(crd, i_rules,
|
driver_utils.patch_kuryr_crd(crd, i_rules,
|
||||||
e_rules, crd_selector)
|
e_rules, crd_selector)
|
||||||
|
crd_pod_selectors.append(crd_selector)
|
||||||
|
return crd_pod_selectors
|
||||||
|
|
||||||
def delete_sg_rules(self, pod):
|
def delete_sg_rules(self, pod):
|
||||||
LOG.debug("Deleting sg rule for pod: %s", pod['metadata']['name'])
|
LOG.debug("Deleting sg rule for pod: %s", pod['metadata']['name'])
|
||||||
pod_ip = driver_utils.get_pod_ip(pod)
|
pod_ip = driver_utils.get_pod_ip(pod)
|
||||||
|
crd_pod_selectors = []
|
||||||
knp_crds = driver_utils.get_kuryrnetpolicy_crds()
|
knp_crds = driver_utils.get_kuryrnetpolicy_crds()
|
||||||
for crd in knp_crds.get('items'):
|
for crd in knp_crds.get('items'):
|
||||||
crd_selector = crd['spec'].get('podSelector')
|
crd_selector = crd['spec'].get('podSelector')
|
||||||
|
@ -264,11 +267,15 @@ class NetworkPolicySecurityGroupsDriver(base.PodSecurityGroupsDriver):
|
||||||
if matched:
|
if matched:
|
||||||
driver_utils.patch_kuryr_crd(crd, i_rules, e_rules,
|
driver_utils.patch_kuryr_crd(crd, i_rules, e_rules,
|
||||||
crd_selector)
|
crd_selector)
|
||||||
|
crd_pod_selectors.append(crd_selector)
|
||||||
|
return crd_pod_selectors
|
||||||
|
|
||||||
def update_sg_rules(self, pod):
|
def update_sg_rules(self, pod):
|
||||||
LOG.debug("Updating sg rule for pod: %s", pod['metadata']['name'])
|
LOG.debug("Updating sg rule for pod: %s", pod['metadata']['name'])
|
||||||
self.delete_sg_rules(pod)
|
crd_pod_selectors = []
|
||||||
self.create_sg_rules(pod)
|
crd_pod_selectors.extend(self.delete_sg_rules(pod))
|
||||||
|
crd_pod_selectors.extend(self.create_sg_rules(pod))
|
||||||
|
return crd_pod_selectors
|
||||||
|
|
||||||
def delete_namespace_sg_rules(self, namespace):
|
def delete_namespace_sg_rules(self, namespace):
|
||||||
ns_name = namespace['metadata']['name']
|
ns_name = namespace['metadata']['name']
|
||||||
|
|
|
@ -405,3 +405,21 @@ def get_services(namespace):
|
||||||
'namespace %s', namespace)
|
'namespace %s', namespace)
|
||||||
raise
|
raise
|
||||||
return services
|
return services
|
||||||
|
|
||||||
|
|
||||||
|
def service_matches_affected_pods(service, pod_selectors):
|
||||||
|
"""Returns if the service is affected by the pod selectors
|
||||||
|
|
||||||
|
Checks if the service selector matches the labelSelectors of
|
||||||
|
NetworkPolicies.
|
||||||
|
|
||||||
|
param service: k8s service
|
||||||
|
param pod_selectors: a list of kubernetes labelSelectors
|
||||||
|
return: True if the service is selected by any of the labelSelectors
|
||||||
|
and False otherwise.
|
||||||
|
"""
|
||||||
|
svc_selector = service['spec'].get('selector')
|
||||||
|
for selector in pod_selectors:
|
||||||
|
if match_selector(selector, svc_selector):
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|
|
@ -39,9 +39,11 @@ class PodLabelHandler(k8s_base.ResourceEventHandler):
|
||||||
super(PodLabelHandler, self).__init__()
|
super(PodLabelHandler, self).__init__()
|
||||||
self._drv_project = drivers.PodProjectDriver.get_instance()
|
self._drv_project = drivers.PodProjectDriver.get_instance()
|
||||||
self._drv_sg = drivers.PodSecurityGroupsDriver.get_instance()
|
self._drv_sg = drivers.PodSecurityGroupsDriver.get_instance()
|
||||||
|
self._drv_svc_sg = drivers.ServiceSecurityGroupsDriver.get_instance()
|
||||||
self._drv_vif_pool = drivers.VIFPoolDriver.get_instance(
|
self._drv_vif_pool = drivers.VIFPoolDriver.get_instance(
|
||||||
specific_driver='multi_pool')
|
specific_driver='multi_pool')
|
||||||
self._drv_vif_pool.set_vif_driver()
|
self._drv_vif_pool.set_vif_driver()
|
||||||
|
self._drv_lbaas = drivers.LBaaSDriver.get_instance()
|
||||||
|
|
||||||
def on_present(self, pod):
|
def on_present(self, pod):
|
||||||
if driver_utils.is_host_network(pod) or not self._has_pod_state(pod):
|
if driver_utils.is_host_network(pod) or not self._has_pod_state(pod):
|
||||||
|
@ -57,13 +59,16 @@ class PodLabelHandler(k8s_base.ResourceEventHandler):
|
||||||
if current_pod_labels == previous_pod_labels:
|
if current_pod_labels == previous_pod_labels:
|
||||||
return
|
return
|
||||||
|
|
||||||
self._drv_sg.update_sg_rules(pod)
|
crd_pod_selectors = self._drv_sg.update_sg_rules(pod)
|
||||||
|
|
||||||
project_id = self._drv_project.get_project(pod)
|
project_id = self._drv_project.get_project(pod)
|
||||||
security_groups = self._drv_sg.get_security_groups(pod, project_id)
|
security_groups = self._drv_sg.get_security_groups(pod, project_id)
|
||||||
self._drv_vif_pool.update_vif_sgs(pod, security_groups)
|
self._drv_vif_pool.update_vif_sgs(pod, security_groups)
|
||||||
self._set_pod_labels(pod, current_pod_labels)
|
self._set_pod_labels(pod, current_pod_labels)
|
||||||
|
|
||||||
|
services = driver_utils.get_services(pod['metadata']['namespace'])
|
||||||
|
self._update_services(services, crd_pod_selectors, project_id)
|
||||||
|
|
||||||
def _get_pod_labels(self, pod):
|
def _get_pod_labels(self, pod):
|
||||||
try:
|
try:
|
||||||
annotations = pod['metadata']['annotations']
|
annotations = pod['metadata']['annotations']
|
||||||
|
@ -94,3 +99,13 @@ class PodLabelHandler(k8s_base.ResourceEventHandler):
|
||||||
except KeyError:
|
except KeyError:
|
||||||
return False
|
return False
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
def _update_services(self, services, crd_pod_selectors, project_id):
|
||||||
|
for service in services.get('items'):
|
||||||
|
if (service['metadata']['name'] == 'kubernetes' or not
|
||||||
|
driver_utils.service_matches_affected_pods(
|
||||||
|
service, crd_pod_selectors)):
|
||||||
|
continue
|
||||||
|
sgs = self._drv_svc_sg.get_security_groups(service,
|
||||||
|
project_id)
|
||||||
|
self._drv_lbaas.update_lbaas_sg(service, sgs)
|
||||||
|
|
|
@ -85,7 +85,8 @@ class NetworkPolicyHandler(k8s_base.ResourceEventHandler):
|
||||||
for service in services.get('items'):
|
for service in services.get('items'):
|
||||||
# TODO(ltomasbo): Skip other services that are not affected
|
# TODO(ltomasbo): Skip other services that are not affected
|
||||||
# by the policy
|
# by the policy
|
||||||
if service['metadata']['name'] == 'kubernetes':
|
if (service['metadata']['name'] == 'kubernetes' or not
|
||||||
|
self._is_service_affected(service, pods_to_update)):
|
||||||
continue
|
continue
|
||||||
sgs = self._drv_svc_sg.get_security_groups(service,
|
sgs = self._drv_svc_sg.get_security_groups(service,
|
||||||
project_id)
|
project_id)
|
||||||
|
@ -119,7 +120,8 @@ class NetworkPolicyHandler(k8s_base.ResourceEventHandler):
|
||||||
services = driver_utils.get_services(
|
services = driver_utils.get_services(
|
||||||
policy['metadata']['namespace'])
|
policy['metadata']['namespace'])
|
||||||
for service in services.get('items'):
|
for service in services.get('items'):
|
||||||
if service['metadata']['name'] == 'kubernetes':
|
if (service['metadata']['name'] == 'kubernetes' or not
|
||||||
|
self._is_service_affected(service, pods_to_update)):
|
||||||
continue
|
continue
|
||||||
sgs = self._drv_svc_sg.get_security_groups(service,
|
sgs = self._drv_svc_sg.get_security_groups(service,
|
||||||
project_id)
|
project_id)
|
||||||
|
@ -138,3 +140,10 @@ class NetworkPolicyHandler(k8s_base.ResourceEventHandler):
|
||||||
if utils.has_limit(sg_quota):
|
if utils.has_limit(sg_quota):
|
||||||
return utils.is_available('security_groups', sg_quota, sg_func)
|
return utils.is_available('security_groups', sg_quota, sg_func)
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
def _is_service_affected(self, service, affected_pods):
|
||||||
|
svc_namespace = service['metadata']['namespace']
|
||||||
|
svc_selector = service['spec'].get('selector')
|
||||||
|
svc_pods = driver_utils.get_pods({'selector': svc_selector},
|
||||||
|
svc_namespace).get('items')
|
||||||
|
return any(pod in svc_pods for pod in affected_pods)
|
||||||
|
|
|
@ -126,12 +126,13 @@ class VIFHandler(k8s_base.ResourceEventHandler):
|
||||||
changed = True
|
changed = True
|
||||||
if changed:
|
if changed:
|
||||||
self._set_pod_state(pod, state)
|
self._set_pod_state(pod, state)
|
||||||
self._drv_sg.create_sg_rules(pod)
|
crd_pod_selectors = self._drv_sg.create_sg_rules(pod)
|
||||||
|
|
||||||
if self._is_network_policy_enabled():
|
if self._is_network_policy_enabled():
|
||||||
services = driver_utils.get_services(
|
services = driver_utils.get_services(
|
||||||
pod['metadata']['namespace'])
|
pod['metadata']['namespace'])
|
||||||
self._update_services(services, project_id)
|
self._update_services(
|
||||||
|
services, crd_pod_selectors, project_id)
|
||||||
|
|
||||||
def on_deleted(self, pod):
|
def on_deleted(self, pod):
|
||||||
if driver_utils.is_host_network(pod):
|
if driver_utils.is_host_network(pod):
|
||||||
|
@ -139,7 +140,7 @@ class VIFHandler(k8s_base.ResourceEventHandler):
|
||||||
|
|
||||||
services = driver_utils.get_services(pod['metadata']['namespace'])
|
services = driver_utils.get_services(pod['metadata']['namespace'])
|
||||||
project_id = self._drv_project.get_project(pod)
|
project_id = self._drv_project.get_project(pod)
|
||||||
self._drv_sg.delete_sg_rules(pod)
|
crd_pod_selectors = self._drv_sg.delete_sg_rules(pod)
|
||||||
try:
|
try:
|
||||||
security_groups = self._drv_sg.get_security_groups(pod, project_id)
|
security_groups = self._drv_sg.get_security_groups(pod, project_id)
|
||||||
except k_exc.ResourceNotReady:
|
except k_exc.ResourceNotReady:
|
||||||
|
@ -158,7 +159,7 @@ class VIFHandler(k8s_base.ResourceEventHandler):
|
||||||
self._drv_vif_pool.release_vif(pod, vif, project_id,
|
self._drv_vif_pool.release_vif(pod, vif, project_id,
|
||||||
security_groups)
|
security_groups)
|
||||||
if self._is_network_policy_enabled():
|
if self._is_network_policy_enabled():
|
||||||
self._update_services(services, project_id)
|
self._update_services(services, crd_pod_selectors, project_id)
|
||||||
|
|
||||||
@MEMOIZE
|
@MEMOIZE
|
||||||
def is_ready(self, quota):
|
def is_ready(self, quota):
|
||||||
|
@ -208,9 +209,11 @@ class VIFHandler(k8s_base.ResourceEventHandler):
|
||||||
constants.K8S_ANNOTATION_LABEL: labels_annotation},
|
constants.K8S_ANNOTATION_LABEL: labels_annotation},
|
||||||
resource_version=pod['metadata']['resourceVersion'])
|
resource_version=pod['metadata']['resourceVersion'])
|
||||||
|
|
||||||
def _update_services(self, services, project_id):
|
def _update_services(self, services, crd_pod_selectors, project_id):
|
||||||
for service in services.get('items'):
|
for service in services.get('items'):
|
||||||
if service['metadata']['name'] == 'kubernetes':
|
if (service['metadata']['name'] == 'kubernetes' or not
|
||||||
|
driver_utils.service_matches_affected_pods(
|
||||||
|
service, crd_pod_selectors)):
|
||||||
continue
|
continue
|
||||||
sgs = self._drv_svc_sg.get_security_groups(service,
|
sgs = self._drv_svc_sg.get_security_groups(service,
|
||||||
project_id)
|
project_id)
|
||||||
|
|
|
@ -32,7 +32,8 @@ class TestPodLabelHandler(test_base.TestCase):
|
||||||
self._pod_link = mock.sentinel.pod_link
|
self._pod_link = mock.sentinel.pod_link
|
||||||
self._pod = {
|
self._pod = {
|
||||||
'metadata': {'resourceVersion': self._pod_version,
|
'metadata': {'resourceVersion': self._pod_version,
|
||||||
'selfLink': self._pod_link},
|
'selfLink': self._pod_link,
|
||||||
|
'namespace': 'default'},
|
||||||
'status': {'phase': k_const.K8S_POD_STATUS_PENDING},
|
'status': {'phase': k_const.K8S_POD_STATUS_PENDING},
|
||||||
'spec': {'hostNetwork': False,
|
'spec': {'hostNetwork': False,
|
||||||
'nodeName': 'hostname'}
|
'nodeName': 'hostname'}
|
||||||
|
@ -72,7 +73,9 @@ class TestPodLabelHandler(test_base.TestCase):
|
||||||
self.assertEqual(sg_driver, handler._drv_sg)
|
self.assertEqual(sg_driver, handler._drv_sg)
|
||||||
self.assertEqual(vif_pool_driver, handler._drv_vif_pool)
|
self.assertEqual(vif_pool_driver, handler._drv_vif_pool)
|
||||||
|
|
||||||
def test_on_present(self):
|
@mock.patch('kuryr_kubernetes.controller.drivers.utils.get_services')
|
||||||
|
def test_on_present(self, m_get_services):
|
||||||
|
m_get_services.return_value = {"items": []}
|
||||||
self._has_pod_state.return_value = True
|
self._has_pod_state.return_value = True
|
||||||
self._get_pod_labels.return_value = {'test1': 'test'}
|
self._get_pod_labels.return_value = {'test1': 'test'}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue