
Ensure NP changes are applied to services

When a Network Policy is changed, the affected services must also be
updated, deleting the rules that no longer match the NP and creating
the needed ones.

Closes-Bug: #1811242

Partially Implements: blueprint k8s-network-policies

Change-Id: I800477d08fd1f46c2a94d3653496f8f1188a3844
tags/1.0.0
Maysa Macedo, 5 months ago
parent commit 70692f86a4
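
In short: each Network Policy event now triggers a reconciliation of the affected services' loadbalancer security groups, deleting stale pod-derived rules, creating missing ones, and restoring the default listener rule when no NP applies. The core comparison, in miniature (a hedged sketch; the rule tuples are illustrative, not taken from the patch):

# Miniature of the reconcile idea: compare desired vs. current ingress
# rules, keyed by (protocol, port, remote_ip_prefix). Values are made up.
current = {('tcp', 80, '10.0.0.0/24'), ('tcp', 80, '10.0.1.0/24')}
desired = {('tcp', 80, '10.0.0.0/24'), ('tcp', 80, '10.0.2.0/24')}

print(sorted(current - desired))  # stale rules to delete
print(sorted(desired - current))  # missing rules to create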

kuryr_kubernetes/controller/drivers/base.py (+10 -1)

@@ -690,7 +690,7 @@ class LBaaSDriver(DriverBase):
         raise NotImplementedError()
 
     @abc.abstractmethod
-    def is_pool_used_by_other_l7policies(l7policy, pool):
+    def is_pool_used_by_other_l7policies(self, l7policy, pool):
         """Checks if pool used by other L7policy.
 
         :param l7policy: `LBaaSL7Policy` object
@@ -699,6 +699,15 @@ class LBaaSDriver(DriverBase):
         """
         raise NotImplementedError()
 
+    @abc.abstractmethod
+    def update_lbaas_sg(self, service, sgs):
+        """Update security group rules associated to the loadbalancer
+
+        :param service: k8s service object
+        :param sgs: list of security group ids to use for updating the rules
+        """
+        raise NotImplementedError()
+
 
 @six.add_metaclass(abc.ABCMeta)
 class VIFPoolDriver(PodVIFDriver):
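
Note that the first hunk also fixes a latent bug: the abstract method is_pool_used_by_other_l7policies was declared without self. Since abc only verifies that a subclass defines the name, not that signatures match, the slip would only surface at call time. A self-contained sketch of the pattern the driver interface relies on (class names are illustrative; six is required, as kuryr-kubernetes still supports Python 2):

import abc

import six


@six.add_metaclass(abc.ABCMeta)
class ExampleDriver(object):
    @abc.abstractmethod
    def update_lbaas_sg(self, service, sgs):
        """Update security group rules associated to the loadbalancer."""
        raise NotImplementedError()


class ConcreteDriver(ExampleDriver):
    def update_lbaas_sg(self, service, sgs):
        # abc refuses to instantiate ConcreteDriver unless this override
        # exists, but it never validates the argument list itself.
        return service['metadata']['name'], list(sgs)


print(ConcreteDriver().update_lbaas_sg({'metadata': {'name': 'svc'}},
                                       ['sg-1']))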

kuryr_kubernetes/controller/drivers/lbaasv2.py (+90 -9)

@@ -27,6 +27,7 @@ from oslo_log import log as logging
 from oslo_utils import timeutils
 
 from kuryr_kubernetes import clients
+from kuryr_kubernetes import config
 from kuryr_kubernetes.controller.drivers import base
 from kuryr_kubernetes import exceptions as k_exc
 from kuryr_kubernetes.objects import lbaas as obj_lbaas
@@ -121,7 +122,7 @@ class LBaaSv2Driver(base.LBaaSDriver):
             neutron.update_port(
                 vip_port.get('id'),
                 {'port': {
-                    'security_groups': loadbalancer.security_groups}})
+                    'security_groups': [sg_id]}})
 
         try:
             neutron.create_security_group_rule({
@@ -140,19 +141,37 @@ class LBaaSv2Driver(base.LBaaSDriver):
                               'for listener %s.', listener.name)
 
     def _apply_members_security_groups(self, loadbalancer, port, target_port,
-                                       protocol, sg_rule_name):
+                                       protocol, sg_rule_name, new_sgs=None):
+        LOG.debug("Applying members security groups.")
         neutron = clients.get_neutron_client()
         if CONF.octavia_defaults.sg_mode == 'create':
-            sg_id = self._find_listeners_sg(loadbalancer)
+            if new_sgs:
+                lb_name = sg_rule_name.split(":")[0]
+                lb_sg = self._find_listeners_sg(loadbalancer, lb_name=lb_name)
+            else:
+                lb_sg = self._find_listeners_sg(loadbalancer)
         else:
-            sg_id = self._get_vip_port(loadbalancer).get('security_groups')[0]
+            lb_sg = self._get_vip_port(loadbalancer).get('security_groups')[0]
 
         lbaas_sg_rules = neutron.list_security_group_rules(
-            security_group_id=sg_id)
+            security_group_id=lb_sg)
         all_pod_rules = []
+        add_default_rules = False
+
+        if new_sgs:
+            sgs = new_sgs
+        else:
+            sgs = loadbalancer.security_groups
+
         # Check if Network Policy allows listener on the pods
-        for sg in loadbalancer.security_groups:
-            if sg != sg_id:
+        for sg in sgs:
+            if sg != lb_sg:
+                if sg in config.CONF.neutron_defaults.pod_security_groups:
+                    # If default sg is set, this means there is no NP
+                    # associated to the service, thus falling back to the
+                    # default listener rules
+                    add_default_rules = True
+                    break
                 rules = neutron.list_security_group_rules(
                     security_group_id=sg)
                 for rule in rules['security_group_rules']:
@@ -172,6 +191,8 @@ class LBaaSv2Driver(base.LBaaSDriver):
                             continue
                         all_pod_rules.append(rule)
                         try:
+                            LOG.debug("Creating LBaaS sg rule for sg: %r",
+                                      lb_sg)
                             neutron.create_security_group_rule({
                                 'security_group_rule': {
                                     'direction': 'ingress',
@@ -180,7 +201,7 @@ class LBaaSv2Driver(base.LBaaSDriver):
                                     'protocol': protocol,
                                     'remote_ip_prefix': rule[
                                         'remote_ip_prefix'],
-                                    'security_group_id': sg_id,
+                                    'security_group_id': lb_sg,
                                     'description': sg_rule_name,
                                 },
                             })
@@ -190,13 +211,38 @@ class LBaaSv2Driver(base.LBaaSDriver):
                                               'group rule for listener %s.',
                                               sg_rule_name)
 
+        # Delete LBaaS sg rules that do not match NP
        for rule in lbaas_sg_rules['security_group_rules']:
            if (rule.get('protocol') != protocol.lower() or
                    rule.get('port_range_min') != port or
+                    rule.get('direction') != 'ingress' or
                    not rule.get('remote_ip_prefix')):
+                if all_pod_rules and self._is_default_rule(rule):
+                    LOG.debug("Removing default LBaaS sg rule for sg: %r",
+                              lb_sg)
+                    neutron.delete_security_group_rule(rule['id'])
                continue
            self._delete_rule_if_no_match(rule, all_pod_rules)
 
+        if add_default_rules:
+            try:
+                LOG.debug("Restoring default LBaaS sg rule for sg: %r", lb_sg)
+                neutron.create_security_group_rule({
+                    'security_group_rule': {
+                        'direction': 'ingress',
+                        'port_range_min': port,
+                        'port_range_max': port,
+                        'protocol': protocol,
+                        'security_group_id': lb_sg,
+                        'description': sg_rule_name,
+                    },
+                })
+            except n_exc.NeutronClientException as ex:
+                if ex.status_code != requests.codes.conflict:
+                    LOG.exception('Failed when creating security '
+                                  'group rule for listener %s.',
+                                  sg_rule_name)
+
     def _delete_rule_if_no_match(self, rule, all_pod_rules):
        for pod_rule in all_pod_rules:
            if pod_rule['remote_ip_prefix'] == rule['remote_ip_prefix']:
@@ -205,6 +251,12 @@ class LBaaSv2Driver(base.LBaaSDriver):
        LOG.debug("Deleting sg rule: %r", rule['id'])
        neutron.delete_security_group_rule(rule['id'])
 
+    def _is_default_rule(self, rule):
+        if (rule.get('direction') == 'ingress' and
+                not rule.get('remote_ip_prefix')):
+            return True
+        return False
+
     def _remove_default_octavia_rules(self, sg_id, listener):
        neutron = clients.get_neutron_client()
        for remaining in self._provisioning_timer(
@@ -678,8 +730,15 @@ class LBaaSv2Driver(base.LBaaSDriver):
                if interval:
                    time.sleep(interval)
 
-    def _find_listeners_sg(self, loadbalancer):
+    def _find_listeners_sg(self, loadbalancer, lb_name=None):
        neutron = clients.get_neutron_client()
+        if lb_name:
+            sgs = neutron.list_security_groups(
+                name=lb_name, project_id=loadbalancer.project_id)
+            # NOTE(ltomasbo): lb_name parameter is only passed when sg_mode
+            # is 'create' and in that case there is only one sg associated
+            # to the loadbalancer
+            return sgs['security_groups'][0]['id']
        try:
            sgs = neutron.list_security_groups(
                name=loadbalancer.name, project_id=loadbalancer.project_id)
@@ -837,3 +896,25 @@ class LBaaSv2Driver(base.LBaaSDriver):
                    entry['id'] != l7policy.id):
                return True
        return False
+
+    def update_lbaas_sg(self, service, sgs):
+        LOG.debug('Setting SG for LBaaS VIP port')
+
+        svc_namespace = service['metadata']['namespace']
+        svc_name = service['metadata']['name']
+        svc_ports = service['spec']['ports']
+
+        lbaas_name = "%s/%s" % (svc_namespace, svc_name)
+        lbaas = utils.get_lbaas_spec(service)
+        if not lbaas:
+            return
+
+        for port in svc_ports:
+            port_protocol = port['protocol']
+            lbaas_port = port['port']
+            target_port = port['targetPort']
+            sg_rule_name = "%s:%s:%s" % (lbaas_name, port_protocol, lbaas_port)
+
+            self._apply_members_security_groups(lbaas, lbaas_port,
+                                                target_port, port_protocol,
+                                                sg_rule_name, sgs)
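
The reconciliation above hinges on the sg_rule_name convention built in update_lbaas_sg, "namespace/name:PROTOCOL:port": _apply_members_security_groups recovers the loadbalancer name by splitting on the first colon whenever new_sgs is passed. A standalone sketch of that round trip (the dict is a trimmed stand-in for a real Kubernetes Service object):

# Hedged sketch mirroring the naming logic above.
service = {
    'metadata': {'namespace': 'default', 'name': 'frontend'},
    'spec': {'ports': [{'protocol': 'TCP', 'port': 80,
                        'targetPort': 8080}]},
}

lbaas_name = "%s/%s" % (service['metadata']['namespace'],
                        service['metadata']['name'])
for port in service['spec']['ports']:
    sg_rule_name = "%s:%s:%s" % (lbaas_name, port['protocol'], port['port'])
    # _apply_members_security_groups recovers the lb name like this:
    lb_name = sg_rule_name.split(":")[0]
    assert lb_name == 'default/frontend'
    print(sg_rule_name)  # default/frontend:TCP:80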

kuryr_kubernetes/controller/handlers/policy.py (+34 -0)

@@ -20,6 +20,7 @@ from kuryr_kubernetes import clients
 from kuryr_kubernetes import constants as k_const
 from kuryr_kubernetes.controller.drivers import base as drivers
 from kuryr_kubernetes.controller.drivers import utils as driver_utils
+from kuryr_kubernetes import exceptions
 from kuryr_kubernetes.handlers import k8s_base
 from kuryr_kubernetes import utils
 
@@ -56,6 +57,7 @@ class NetworkPolicyHandler(k8s_base.ResourceEventHandler):
         self._drv_vif_pool.set_vif_driver()
         self._drv_pod_sg = drivers.PodSecurityGroupsDriver.get_instance()
         self._drv_svc_sg = drivers.ServiceSecurityGroupsDriver.get_instance()
+        self._drv_lbaas = drivers.LBaaSDriver.get_instance()
 
     def on_present(self, policy):
         LOG.debug("Created or updated: %s", policy)
@@ -76,6 +78,19 @@ class NetworkPolicyHandler(k8s_base.ResourceEventHandler):
             pod_sgs = self._drv_pod_sg.get_security_groups(pod, project_id)
             self._drv_vif_pool.update_vif_sgs(pod, pod_sgs)
 
+        if pods_to_update:
+            # NOTE(ltomasbo): only need to change services if the pods that
+            # they point to are updated
+            services = self._get_services(policy['metadata']['namespace'])
+            for service in services.get('items'):
+                # TODO(ltomasbo): Skip other services that are not affected
+                # by the policy
+                if service['metadata']['name'] == 'kubernetes':
+                    continue
+                sgs = self._drv_svc_sg.get_security_groups(service,
+                                                           project_id)
+                self._drv_lbaas.update_lbaas_sg(service, sgs)
+
     def on_deleted(self, policy):
         LOG.debug("Deleted network policy: %s", policy)
         project_id = self._drv_project.get_project(policy)
@@ -98,6 +113,13 @@ class NetworkPolicyHandler(k8s_base.ResourceEventHandler):
 
         self._drv_policy.release_network_policy(netpolicy_crd)
 
+        services = self._get_services(policy['metadata']['namespace'])
+        for service in services.get('items'):
+            if service['metadata']['name'] == 'kubernetes':
+                continue
+            sgs = self._drv_svc_sg.get_security_groups(service, project_id)
+            self._drv_lbaas.update_lbaas_sg(service, sgs)
+
     def is_ready(self, quota):
         if not utils.has_kuryr_crd(k_const.K8S_API_CRD_KURYRNETPOLICIES):
             return False
@@ -111,3 +133,15 @@ class NetworkPolicyHandler(k8s_base.ResourceEventHandler):
         if utils.has_limit(sg_quota):
             return utils.is_available('security_groups', sg_quota, sg_func)
         return True
+
+    def _get_services(self, namespace):
+        kubernetes = clients.get_kubernetes_client()
+        services = {"items": []}
+        try:
+            services = kubernetes.get(
+                '{}/namespaces/{}/services'.format(k_const.K8S_API_BASE,
+                                                   namespace))
+        except exceptions.K8sClientException:
+            LOG.exception("Kubernetes Client Exception.")
+            raise
+        return services
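
Both on_present and on_deleted now end in the same loop: list the namespace's services via _get_services, skip the apiserver's own kubernetes service, recompute each service's security groups, and hand them to the LBaaS driver. A condensed rendering with unittest.mock stand-ins (the '/api/v1' prefix is an assumption for k_const.K8S_API_BASE):

from unittest import mock

# Mocks stand in for the Kubernetes client and the two drivers, so this
# sketch runs without a cluster.
k8s = mock.Mock()
k8s.get.return_value = {'items': [
    {'metadata': {'name': 'kubernetes', 'namespace': 'default'}},
    {'metadata': {'name': 'frontend', 'namespace': 'default'}},
]}
drv_svc_sg, drv_lbaas = mock.Mock(), mock.Mock()
drv_svc_sg.get_security_groups.return_value = ['sg-np-frontend']

namespace, project_id = 'default', 'project-1'
services = k8s.get('/api/v1/namespaces/{}/services'.format(namespace))
for service in services.get('items'):
    if service['metadata']['name'] == 'kubernetes':
        continue  # the apiserver's own service is never policy-managed
    sgs = drv_svc_sg.get_security_groups(service, project_id)
    drv_lbaas.update_lbaas_sg(service, sgs)

drv_lbaas.update_lbaas_sg.assert_called_once()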

kuryr_kubernetes/tests/unit/controller/handlers/test_policy.py (+47 -1)

@@ -64,6 +64,8 @@ class TestPolicyHandler(test_base.TestCase):
             spec=drivers.ServiceSecurityGroupsDriver)
         self._handler._drv_vif_pool = mock.MagicMock(
             spec=drivers.VIFPoolDriver)
+        self._handler._drv_lbaas = mock.Mock(
+            spec=drivers.LBaaSDriver)
 
         self._get_project = self._handler._drv_project.get_project
         self._get_project.return_value = self._project_id
@@ -74,6 +76,8 @@ class TestPolicyHandler(test_base.TestCase):
             spec=drivers.PodVIFDriver)
         self._update_vif_sgs = self._handler._drv_vif_pool.update_vif_sgs
         self._update_vif_sgs.return_value = None
+        self._update_lbaas_sg = self._handler._drv_lbaas.update_lbaas_sg
+        self._update_lbaas_sg.return_value = None
 
     def _get_knp_obj(self):
         knp_obj = {
@@ -89,13 +93,15 @@ class TestPolicyHandler(test_base.TestCase):
             }}
         return knp_obj
 
+    @mock.patch.object(drivers.LBaaSDriver, 'get_instance')
     @mock.patch.object(drivers.ServiceSecurityGroupsDriver, 'get_instance')
     @mock.patch.object(drivers.PodSecurityGroupsDriver, 'get_instance')
     @mock.patch.object(drivers.VIFPoolDriver, 'get_instance')
     @mock.patch.object(drivers.NetworkPolicyDriver, 'get_instance')
     @mock.patch.object(drivers.NetworkPolicyProjectDriver, 'get_instance')
     def test_init(self, m_get_project_driver, m_get_policy_driver,
-                  m_get_vif_driver, m_get_pod_sg_driver, m_get_svc_sg_driver):
+                  m_get_vif_driver, m_get_pod_sg_driver, m_get_svc_sg_driver,
+                  m_get_lbaas_driver):
         handler = policy.NetworkPolicyHandler()
 
         m_get_project_driver.assert_called_once()
@@ -103,6 +109,7 @@ class TestPolicyHandler(test_base.TestCase):
         m_get_vif_driver.assert_called_once()
         m_get_pod_sg_driver.assert_called_once()
         m_get_svc_sg_driver.assert_called_once()
+        m_get_lbaas_driver.assert_called_once()
 
         self.assertEqual(m_get_project_driver.return_value,
                          handler._drv_project)
@@ -124,6 +131,7 @@ class TestPolicyHandler(test_base.TestCase):
         sg1 = [mock.sentinel.sg1]
         sg2 = [mock.sentinel.sg2]
         self._get_security_groups.side_effect = [sg1, sg2]
+        self._handler._get_services.return_value = {'items': []}
 
         policy.NetworkPolicyHandler.on_present(self._handler, self._policy)
         namespaced_pods.assert_not_called()
@@ -137,6 +145,7 @@ class TestPolicyHandler(test_base.TestCase):
 
         calls = [mock.call(modified_pod, sg1), mock.call(match_pod, sg2)]
         self._update_vif_sgs.assert_has_calls(calls)
+        self._update_lbaas_sg.assert_not_called()
 
     @mock.patch('kuryr_kubernetes.controller.drivers.utils.is_host_network')
     def test_on_present_without_knps_on_namespace(self, m_host_network):
@@ -151,6 +160,7 @@ class TestPolicyHandler(test_base.TestCase):
         sg2 = [mock.sentinel.sg2]
         sg3 = [mock.sentinel.sg3]
         self._get_security_groups.side_effect = [sg2, sg3]
+        self._handler._get_services.return_value = {'items': []}
 
         policy.NetworkPolicyHandler.on_present(self._handler, self._policy)
         ensure_nw_policy.assert_called_once_with(self._policy,
@@ -164,6 +174,40 @@ class TestPolicyHandler(test_base.TestCase):
         calls = [mock.call(modified_pod, sg2),
                  mock.call(match_pod, sg3)]
         self._update_vif_sgs.assert_has_calls(calls)
+        self._update_lbaas_sg.assert_not_called()
+
+    @mock.patch('kuryr_kubernetes.controller.drivers.utils.is_host_network')
+    def test_on_present_with_services(self, m_host_network):
+        modified_pod = mock.sentinel.modified_pod
+        match_pod = mock.sentinel.match_pod
+        m_host_network.return_value = False
+
+        knp_on_ns = self._handler._drv_policy.knps_on_namespace
+        knp_on_ns.return_value = True
+        namespaced_pods = self._handler._drv_policy.namespaced_pods
+        ensure_nw_policy = self._handler._drv_policy.ensure_network_policy
+        ensure_nw_policy.return_value = [modified_pod]
+        affected_pods = self._handler._drv_policy.affected_pods
+        affected_pods.return_value = [match_pod]
+        sg1 = [mock.sentinel.sg1]
+        sg2 = [mock.sentinel.sg2]
+        self._get_security_groups.side_effect = [sg1, sg2]
+        service = {'metadata': {'name': 'service-test'}}
+        self._handler._get_services.return_value = {'items': [service]}
+
+        policy.NetworkPolicyHandler.on_present(self._handler, self._policy)
+        namespaced_pods.assert_not_called()
+        ensure_nw_policy.assert_called_once_with(self._policy,
+                                                 self._project_id)
+        affected_pods.assert_called_once_with(self._policy)
+
+        calls = [mock.call(modified_pod, self._project_id),
+                 mock.call(match_pod, self._project_id)]
+        self._get_security_groups.assert_has_calls(calls)
+
+        calls = [mock.call(modified_pod, sg1), mock.call(match_pod, sg2)]
+        self._update_vif_sgs.assert_has_calls(calls)
+        self._update_lbaas_sg.assert_called_once()
 
     @mock.patch('kuryr_kubernetes.controller.drivers.utils.is_host_network')
     def test_on_deleted(self, m_host_network):
@@ -178,6 +222,7 @@ class TestPolicyHandler(test_base.TestCase):
         sg1 = [mock.sentinel.sg1]
         sg2 = [mock.sentinel.sg2]
         self._get_security_groups.side_effect = [sg1, sg2]
+        self._handler._get_services.return_value = {'items': []}
         release_nw_policy = self._handler._drv_policy.release_network_policy
         knp_on_ns = self._handler._drv_policy.knps_on_namespace
         knp_on_ns.return_value = False
@@ -189,3 +234,4 @@ class TestPolicyHandler(test_base.TestCase):
         self._get_security_groups.assert_called_once_with(match_pod,
                                                           self._project_id)
         self._update_vif_sgs.assert_called_once_with(match_pod, sg1)
+        self._update_lbaas_sg.assert_not_called()

kuryr_kubernetes/utils.py (+14 -1)

@@ -24,11 +24,12 @@ from oslo_log import log
 from oslo_serialization import jsonutils
 
 from kuryr_kubernetes import clients
+from kuryr_kubernetes import constants
 from kuryr_kubernetes import exceptions
+from kuryr_kubernetes.objects import lbaas as obj_lbaas
 from kuryr_kubernetes.objects import vif
 from kuryr_kubernetes import os_vif_util
 
-
 CONF = cfg.CONF
 LOG = log.getLogger(__name__)
 
@@ -210,3 +211,15 @@ def has_kuryr_crd(crd_url):
                       " CRD. %s" % exceptions.K8sClientException)
         return False
     return True
+
+
+def get_lbaas_spec(service):
+    try:
+        annotations = service['metadata']['annotations']
+        annotation = annotations[constants.K8S_ANNOTATION_LBAAS_SPEC]
+    except KeyError:
+        return None
+    obj_dict = jsonutils.loads(annotation)
+    obj = obj_lbaas.LBaaSServiceSpec.obj_from_primitive(obj_dict)
+    LOG.debug("Got LBaaSServiceSpec from annotation: %r", obj)
+    return obj
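
get_lbaas_spec is deliberately forgiving: a service Kuryr has not annotated yet yields None, which is why update_lbaas_sg returns early for such services. The same annotation round trip, sketched standalone (json and plain dicts replace oslo jsonutils and the versioned LBaaSServiceSpec; the annotation key is assumed to match constants.K8S_ANNOTATION_LBAAS_SPEC):

import json

LBAAS_SPEC_ANNOTATION = 'openstack.org/kuryr-lbaas-spec'  # assumed value


def get_lbaas_spec(service):
    try:
        annotation = service['metadata']['annotations'][
            LBAAS_SPEC_ANNOTATION]
    except KeyError:
        return None  # service not (yet) handled by Kuryr
    return json.loads(annotation)


annotated = {'metadata': {'annotations': {
    LBAAS_SPEC_ANNOTATION: '{"ip": "10.0.0.10", "ports": []}'}}}
print(get_lbaas_spec(annotated))         # {'ip': '10.0.0.10', 'ports': []}
print(get_lbaas_spec({'metadata': {}}))  # None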
