diff --git a/kuryr_kubernetes/controller/drivers/lbaasv2.py b/kuryr_kubernetes/controller/drivers/lbaasv2.py index e4b264c9e..832a7c00b 100644 --- a/kuryr_kubernetes/controller/drivers/lbaasv2.py +++ b/kuryr_kubernetes/controller/drivers/lbaasv2.py @@ -148,6 +148,7 @@ class LBaaSv2Driver(base.LBaaSDriver): protocol, sg_rule_name, new_sgs=None): LOG.debug("Applying members security groups.") neutron = clients.get_neutron_client() + lb_sg = None if CONF.octavia_defaults.sg_mode == 'create': if new_sgs: lb_name = sg_rule_name.split(":")[0] @@ -155,7 +156,15 @@ class LBaaSv2Driver(base.LBaaSDriver): else: lb_sg = self._find_listeners_sg(loadbalancer) else: - lb_sg = self._get_vip_port(loadbalancer).get('security_groups')[0] + vip_port = self._get_vip_port(loadbalancer) + if vip_port: + lb_sg = vip_port.get('security_groups')[0] + + # NOTE (maysams) It might happen that the update of LBaaS SG + # has been triggered and the LBaaS SG was not created yet. + # This update is skiped, until the LBaaS members are created. + if not lb_sg: + return lbaas_sg_rules = neutron.list_security_group_rules( security_group_id=lb_sg) @@ -747,7 +756,12 @@ class LBaaSv2Driver(base.LBaaSDriver): # NOTE(ltomasbo): lb_name parameter is only passed when sg_mode # is 'create' and in that case there is only one sg associated # to the loadbalancer - return sgs['security_groups'][0]['id'] + try: + sg_id = sgs['security_groups'][0]['id'] + except IndexError: + sg_id = None + LOG.debug("Security Group not created yet for LBaaS.") + return sg_id try: sgs = neutron.list_security_groups( name=loadbalancer.name, project_id=loadbalancer.project_id) @@ -917,6 +931,9 @@ class LBaaSv2Driver(base.LBaaSDriver): if not lbaas: return + lbaas.security_groups_ids = sgs + utils.set_lbaas_spec(service, lbaas) + for port in svc_ports: port_protocol = port['protocol'] lbaas_port = port['port'] diff --git a/kuryr_kubernetes/controller/drivers/utils.py b/kuryr_kubernetes/controller/drivers/utils.py index 57033ea0d..4aba787ea 100644 --- a/kuryr_kubernetes/controller/drivers/utils.py +++ b/kuryr_kubernetes/controller/drivers/utils.py @@ -392,3 +392,16 @@ def tag_neutron_resources(resource, res_ids): LOG.warning("Failed to tag %s %s with %s. Ignoring, but this " "is still unexpected.", resource, res_id, tags, exc_info=True) + + +def get_services(namespace): + kubernetes = clients.get_kubernetes_client() + try: + services = kubernetes.get( + '{}/namespaces/{}/services'.format(constants.K8S_API_BASE, + namespace)) + except k_exc.K8sClientException: + LOG.exception('Exception when getting K8s services in ' + 'namespace %s', namespace) + raise + return services diff --git a/kuryr_kubernetes/controller/handlers/lbaas.py b/kuryr_kubernetes/controller/handlers/lbaas.py index 74469a89c..97a505867 100644 --- a/kuryr_kubernetes/controller/handlers/lbaas.py +++ b/kuryr_kubernetes/controller/handlers/lbaas.py @@ -24,6 +24,7 @@ from kuryr_kubernetes.controller.drivers import base as drv_base from kuryr_kubernetes import exceptions as k_exc from kuryr_kubernetes.handlers import k8s_base from kuryr_kubernetes.objects import lbaas as obj_lbaas +from kuryr_kubernetes import utils LOG = logging.getLogger(__name__) @@ -47,7 +48,7 @@ class LBaaSSpecHandler(k8s_base.ResourceEventHandler): self._drv_sg = drv_base.ServiceSecurityGroupsDriver.get_instance() def on_present(self, service): - lbaas_spec = self._get_lbaas_spec(service) + lbaas_spec = utils.get_lbaas_spec(service) if self._should_ignore(service): LOG.debug("Skipping Kubernetes service %s of an unsupported kind " @@ -58,7 +59,7 @@ class LBaaSSpecHandler(k8s_base.ResourceEventHandler): if self._has_lbaas_spec_changes(service, lbaas_spec): lbaas_spec = self._generate_lbaas_spec(service) - self._set_lbaas_spec(service, lbaas_spec) + utils.set_lbaas_spec(service, lbaas_spec) def _is_supported_type(self, service): spec = service['spec'] @@ -167,55 +168,6 @@ class LBaaSSpecHandler(k8s_base.ResourceEventHandler): return [obj_lbaas.LBaaSPortSpec(**port) for port in self._get_service_ports(service)] - def _get_endpoints_link(self, service): - svc_link = service['metadata']['selfLink'] - link_parts = svc_link.split('/') - - if link_parts[-2] != 'services': - raise k_exc.IntegrityError(_( - "Unsupported service link: %(link)s") % { - 'link': svc_link}) - link_parts[-2] = 'endpoints' - - return "/".join(link_parts) - - def _set_lbaas_spec(self, service, lbaas_spec): - # TODO(ivc): extract annotation interactions - if lbaas_spec is None: - LOG.debug("Removing LBaaSServiceSpec annotation: %r", lbaas_spec) - annotation = None - else: - lbaas_spec.obj_reset_changes(recursive=True) - LOG.debug("Setting LBaaSServiceSpec annotation: %r", lbaas_spec) - annotation = jsonutils.dumps(lbaas_spec.obj_to_primitive(), - sort_keys=True) - svc_link = service['metadata']['selfLink'] - ep_link = self._get_endpoints_link(service) - k8s = clients.get_kubernetes_client() - - try: - k8s.annotate(ep_link, - {k_const.K8S_ANNOTATION_LBAAS_SPEC: annotation}) - except k_exc.K8sClientException: - # REVISIT(ivc): only raise ResourceNotReady for NotFound - raise k_exc.ResourceNotReady(ep_link) - - k8s.annotate(svc_link, - {k_const.K8S_ANNOTATION_LBAAS_SPEC: annotation}, - resource_version=service['metadata']['resourceVersion']) - - def _get_lbaas_spec(self, service): - # TODO(ivc): same as '_set_lbaas_spec' - try: - annotations = service['metadata']['annotations'] - annotation = annotations[k_const.K8S_ANNOTATION_LBAAS_SPEC] - except KeyError: - return None - obj_dict = jsonutils.loads(annotation) - obj = obj_lbaas.LBaaSServiceSpec.obj_from_primitive(obj_dict) - LOG.debug("Got LBaaSServiceSpec from annotation: %r", obj) - return obj - class LoadBalancerHandler(k8s_base.ResourceEventHandler): """LoadBalancerHandler handles K8s Endpoints events. @@ -355,9 +307,30 @@ class LoadBalancerHandler(k8s_base.ResourceEventHandler): return changed + def _sync_lbaas_sgs(self, endpoints, lbaas_state, lbaas_spec): + # NOTE (maysams) Need to retrieve the LBaaS Spec again due to + # the possibility of it being updated after the LBaaS creation + # process has started. + svc_link = self._get_service_link(endpoints) + k8s = clients.get_kubernetes_client() + service = k8s.get(svc_link) + lbaas_spec = utils.get_lbaas_spec(service) + + lb = lbaas_state.loadbalancer + default_sgs = config.CONF.neutron_defaults.pod_security_groups + lbaas_spec_sgs = lbaas_spec.security_groups_ids + if lb.security_groups and lb.security_groups != lbaas_spec_sgs: + sgs = [lb_sg for lb_sg in lb.security_groups + if lb_sg not in default_sgs] + if lbaas_spec_sgs != default_sgs: + sgs.extend(lbaas_spec_sgs) + lb.security_groups = sgs + def _add_new_members(self, endpoints, lbaas_state, lbaas_spec): changed = False + self._sync_lbaas_sgs(endpoints, lbaas_state, lbaas_spec) + lsnr_by_id = {l.id: l for l in lbaas_state.listeners} pool_by_lsnr_port = {(lsnr_by_id[p.listener_id].protocol, lsnr_by_id[p.listener_id].port): p @@ -649,15 +622,6 @@ class LoadBalancerHandler(k8s_base.ResourceEventHandler): lbaas_state.service_pub_ip_info = None changed = True - default_sgs = config.CONF.neutron_defaults.pod_security_groups - lbaas_spec_sgs = lbaas_spec.security_groups_ids - if lb.security_groups and lb.security_groups != lbaas_spec_sgs: - sgs = [lb_sg for lb_sg in lb.security_groups - if lb_sg not in default_sgs] - if lbaas_spec_sgs != default_sgs: - sgs.extend(lbaas_spec_sgs) - lb.security_groups = sgs - lbaas_state.loadbalancer = lb return changed diff --git a/kuryr_kubernetes/controller/handlers/policy.py b/kuryr_kubernetes/controller/handlers/policy.py index 2e97782fc..68bc21ae7 100644 --- a/kuryr_kubernetes/controller/handlers/policy.py +++ b/kuryr_kubernetes/controller/handlers/policy.py @@ -20,7 +20,6 @@ from kuryr_kubernetes import clients from kuryr_kubernetes import constants as k_const from kuryr_kubernetes.controller.drivers import base as drivers from kuryr_kubernetes.controller.drivers import utils as driver_utils -from kuryr_kubernetes import exceptions from kuryr_kubernetes.handlers import k8s_base from kuryr_kubernetes import utils @@ -81,7 +80,8 @@ class NetworkPolicyHandler(k8s_base.ResourceEventHandler): if pods_to_update: # NOTE(ltomasbo): only need to change services if the pods that # they point to are updated - services = self._get_services(policy['metadata']['namespace']) + services = driver_utils.get_services( + policy['metadata']['namespace']) for service in services.get('items'): # TODO(ltomasbo): Skip other services that are not affected # by the policy @@ -116,7 +116,8 @@ class NetworkPolicyHandler(k8s_base.ResourceEventHandler): self._drv_policy.release_network_policy(netpolicy_crd) - services = self._get_services(policy['metadata']['namespace']) + services = driver_utils.get_services( + policy['metadata']['namespace']) for service in services.get('items'): if service['metadata']['name'] == 'kubernetes': continue @@ -137,15 +138,3 @@ class NetworkPolicyHandler(k8s_base.ResourceEventHandler): if utils.has_limit(sg_quota): return utils.is_available('security_groups', sg_quota, sg_func) return True - - def _get_services(self, namespace): - kubernetes = clients.get_kubernetes_client() - services = {"items": []} - try: - services = kubernetes.get( - '{}/namespaces/{}/services'.format(k_const.K8S_API_BASE, - namespace)) - except exceptions.K8sClientException: - LOG.exception("Kubernetes Client Exception.") - raise - return services diff --git a/kuryr_kubernetes/controller/handlers/vif.py b/kuryr_kubernetes/controller/handlers/vif.py index 4ba15542d..931f23f5e 100644 --- a/kuryr_kubernetes/controller/handlers/vif.py +++ b/kuryr_kubernetes/controller/handlers/vif.py @@ -73,6 +73,8 @@ class VIFHandler(k8s_base.ResourceEventHandler): specific_driver='multi_pool') self._drv_vif_pool.set_vif_driver() self._drv_multi_vif = drivers.MultiVIFDriver.get_enabled_drivers() + self._drv_lbaas = drivers.LBaaSDriver.get_instance() + self._drv_svc_sg = drivers.ServiceSecurityGroupsDriver.get_instance() def on_present(self, pod): if driver_utils.is_host_network(pod) or not self._is_pending_node(pod): @@ -83,9 +85,8 @@ class VIFHandler(k8s_base.ResourceEventHandler): return state = driver_utils.get_pod_state(pod) LOG.debug("Got VIFs from annotation: %r", state) - + project_id = self._drv_project.get_project(pod) if not state: - project_id = self._drv_project.get_project(pod) security_groups = self._drv_sg.get_security_groups(pod, project_id) subnets = self._drv_subnets.get_subnets(pod, project_id) @@ -127,10 +128,16 @@ class VIFHandler(k8s_base.ResourceEventHandler): self._set_pod_state(pod, state) self._drv_sg.create_sg_rules(pod) + if self._is_network_policy_enabled(): + services = driver_utils.get_services( + pod['metadata']['namespace']) + self._update_services(services, project_id) + def on_deleted(self, pod): if driver_utils.is_host_network(pod): return + services = driver_utils.get_services(pod['metadata']['namespace']) project_id = self._drv_project.get_project(pod) self._drv_sg.delete_sg_rules(pod) try: @@ -150,6 +157,8 @@ class VIFHandler(k8s_base.ResourceEventHandler): for ifname, vif in state.vifs.items(): self._drv_vif_pool.release_vif(pod, vif, project_id, security_groups) + if self._is_network_policy_enabled(): + self._update_services(services, project_id) @MEMOIZE def is_ready(self, quota): @@ -198,3 +207,16 @@ class VIFHandler(k8s_base.ResourceEventHandler): {constants.K8S_ANNOTATION_VIF: annotation, constants.K8S_ANNOTATION_LABEL: labels_annotation}, resource_version=pod['metadata']['resourceVersion']) + + def _update_services(self, services, project_id): + for service in services.get('items'): + if service['metadata']['name'] == 'kubernetes': + continue + sgs = self._drv_svc_sg.get_security_groups(service, + project_id) + self._drv_lbaas.update_lbaas_sg(service, sgs) + + def _is_network_policy_enabled(self): + enabled_handlers = oslo_cfg.CONF.kubernetes.enabled_handlers + svc_sg_driver = oslo_cfg.CONF.kubernetes.service_security_groups_driver + return ('policy' in enabled_handlers and svc_sg_driver == 'policy') diff --git a/kuryr_kubernetes/tests/unit/controller/handlers/test_lbaas.py b/kuryr_kubernetes/tests/unit/controller/handlers/test_lbaas.py index bec577b34..81ac42b63 100644 --- a/kuryr_kubernetes/tests/unit/controller/handlers/test_lbaas.py +++ b/kuryr_kubernetes/tests/unit/controller/handlers/test_lbaas.py @@ -48,7 +48,9 @@ class TestLBaaSSpecHandler(test_base.TestCase): self.assertEqual(mock.sentinel.drv_subnets, handler._drv_subnets) self.assertEqual(mock.sentinel.drv_sg, handler._drv_sg) - def test_on_present(self): + @mock.patch('kuryr_kubernetes.utils.set_lbaas_spec') + @mock.patch('kuryr_kubernetes.utils.get_lbaas_spec') + def test_on_present(self, m_get_lbaas_spec, m_set_lbaas_spec): svc_event = mock.sentinel.svc_event old_spec = mock.sentinel.old_spec new_spec = mock.sentinel.new_spec @@ -58,7 +60,7 @@ class TestLBaaSSpecHandler(test_base.TestCase): m_drv_project.get_project.return_value = project_id m_handler = mock.Mock(spec=h_lbaas.LBaaSSpecHandler) - m_handler._get_lbaas_spec.return_value = old_spec + m_get_lbaas_spec.return_value = old_spec m_handler._has_lbaas_spec_changes.return_value = True m_handler._generate_lbaas_spec.return_value = new_spec m_handler._should_ignore.return_value = False @@ -66,43 +68,49 @@ class TestLBaaSSpecHandler(test_base.TestCase): h_lbaas.LBaaSSpecHandler.on_present(m_handler, svc_event) - m_handler._get_lbaas_spec.assert_called_once_with(svc_event) + m_get_lbaas_spec.assert_called_once_with(svc_event) m_handler._has_lbaas_spec_changes.assert_called_once_with(svc_event, old_spec) m_handler._generate_lbaas_spec.assert_called_once_with(svc_event) - m_handler._set_lbaas_spec.assert_called_once_with(svc_event, new_spec) + m_set_lbaas_spec.assert_called_once_with(svc_event, new_spec) - def test_on_present_no_changes(self): + @mock.patch('kuryr_kubernetes.utils.set_lbaas_spec') + @mock.patch('kuryr_kubernetes.utils.get_lbaas_spec') + def test_on_present_no_changes(self, m_get_lbaas_spec, + m_set_lbaas_spec): svc_event = mock.sentinel.svc_event old_spec = mock.sentinel.old_spec m_handler = mock.Mock(spec=h_lbaas.LBaaSSpecHandler) - m_handler._get_lbaas_spec.return_value = old_spec + m_get_lbaas_spec.return_value = old_spec m_handler._has_lbaas_spec_changes.return_value = False m_handler._should_ignore.return_value = False h_lbaas.LBaaSSpecHandler.on_present(m_handler, svc_event) - m_handler._get_lbaas_spec.assert_called_once_with(svc_event) + m_get_lbaas_spec.assert_called_once_with(svc_event) m_handler._has_lbaas_spec_changes.assert_called_once_with(svc_event, old_spec) m_handler._generate_lbaas_spec.assert_not_called() - m_handler._set_lbaas_spec.assert_not_called() + m_set_lbaas_spec.assert_not_called() - def test_on_present_no_selector(self): + @mock.patch('kuryr_kubernetes.utils.set_lbaas_spec') + @mock.patch('kuryr_kubernetes.utils.get_lbaas_spec') + def test_on_present_no_selector(self, m_get_lbaas_spec, + m_set_lbaas_spec): svc_event = {'metadata': {'name': 'dummy_name'}} old_spec = mock.sentinel.old_spec m_handler = mock.Mock(spec=h_lbaas.LBaaSSpecHandler) - m_handler._get_lbaas_spec.return_value = old_spec + m_get_lbaas_spec.return_value = old_spec m_handler._should_ignore.return_value = True h_lbaas.LBaaSSpecHandler.on_present(m_handler, svc_event) - m_handler._get_lbaas_spec.assert_called_once_with(svc_event) + m_get_lbaas_spec.assert_called_once_with(svc_event) m_handler._has_lbaas_spec_changes.assert_not_called() m_handler._generate_lbaas_spec.assert_not_called() - m_handler._set_lbaas_spec.assert_not_called() + m_set_lbaas_spec.assert_not_called() def test_get_service_ip(self): svc_body = {'spec': {'type': 'ClusterIP', @@ -328,22 +336,6 @@ class TestLBaaSSpecHandler(test_base.TestCase): m_handler._get_service_ports.assert_called_once_with( mock.sentinel.service) - def test_get_endpoints_link(self): - m_handler = mock.Mock(spec=h_lbaas.LBaaSSpecHandler) - service = {'metadata': { - 'selfLink': "/api/v1/namespaces/default/services/test"}} - ret = h_lbaas.LBaaSSpecHandler._get_endpoints_link(m_handler, service) - expected_link = "/api/v1/namespaces/default/endpoints/test" - self.assertEqual(expected_link, ret) - - def test_get_endpoints_link__integrity_error(self): - m_handler = mock.Mock(spec=h_lbaas.LBaaSSpecHandler) - service = {'metadata': { - 'selfLink': "/api/v1/namespaces/default/not-services/test"}} - self.assertRaises(k_exc.IntegrityError, - h_lbaas.LBaaSSpecHandler._get_endpoints_link, - m_handler, service) - def test_set_lbaas_spec(self): self.skipTest("skipping until generalised annotation handling is " "implemented") @@ -821,6 +813,8 @@ class TestLoadBalancerHandler(test_base.TestCase): for member in state.members) return observed_targets + @mock.patch('kuryr_kubernetes.controller.handlers.lbaas.' + 'LoadBalancerHandler._sync_lbaas_sgs') @mock.patch('kuryr_kubernetes.controller.drivers.base' '.PodSubnetsDriver.get_instance') @mock.patch('kuryr_kubernetes.controller.drivers.base' @@ -828,7 +822,7 @@ class TestLoadBalancerHandler(test_base.TestCase): @mock.patch('kuryr_kubernetes.controller.drivers.base' '.LBaaSDriver.get_instance') def test_sync_lbaas_members(self, m_get_drv_lbaas, m_get_drv_project, - m_get_drv_subnets): + m_get_drv_subnets, m_sync_lbaas_sgs): # REVISIT(ivc): test methods separately and verify ensure/release project_id = str(uuid.uuid4()) subnet_id = str(uuid.uuid4()) @@ -855,6 +849,8 @@ class TestLoadBalancerHandler(test_base.TestCase): self.assertEqual(sorted(expected_targets.items()), observed_targets) self.assertEqual(expected_ip, str(state.loadbalancer.ip)) + @mock.patch('kuryr_kubernetes.controller.handlers.lbaas.' + 'LoadBalancerHandler._sync_lbaas_sgs') @mock.patch('kuryr_kubernetes.controller.drivers.base' '.PodSubnetsDriver.get_instance') @mock.patch('kuryr_kubernetes.controller.drivers.base' @@ -862,7 +858,8 @@ class TestLoadBalancerHandler(test_base.TestCase): @mock.patch('kuryr_kubernetes.controller.drivers.base' '.LBaaSDriver.get_instance') def test_sync_lbaas_members_udp(self, m_get_drv_lbaas, - m_get_drv_project, m_get_drv_subnets): + m_get_drv_project, m_get_drv_subnets, + m_sync_lbaas_sgs): # REVISIT(ivc): test methods separately and verify ensure/release project_id = str(uuid.uuid4()) subnet_id = str(uuid.uuid4()) @@ -889,6 +886,8 @@ class TestLoadBalancerHandler(test_base.TestCase): self.assertEqual([], observed_targets) self.assertEqual(expected_ip, str(state.loadbalancer.ip)) + @mock.patch('kuryr_kubernetes.controller.handlers.lbaas.' + 'LoadBalancerHandler._sync_lbaas_sgs') @mock.patch('kuryr_kubernetes.controller.drivers.base' '.PodSubnetsDriver.get_instance') @mock.patch('kuryr_kubernetes.controller.drivers.base' @@ -896,7 +895,8 @@ class TestLoadBalancerHandler(test_base.TestCase): @mock.patch('kuryr_kubernetes.controller.drivers.base' '.LBaaSDriver.get_instance') def test_sync_lbaas_members_svc_listener_port_edit( - self, m_get_drv_lbaas, m_get_drv_project, m_get_drv_subnets): + self, m_get_drv_lbaas, m_get_drv_project, m_get_drv_subnets, + m_sync_lbaas_sgs): # REVISIT(ivc): test methods separately and verify ensure/release project_id = str(uuid.uuid4()) subnet_id = str(uuid.uuid4()) @@ -943,6 +943,8 @@ class TestLoadBalancerHandler(test_base.TestCase): self.skipTest("skipping until generalised annotation handling is " "implemented") + @mock.patch('kuryr_kubernetes.controller.handlers.lbaas.' + 'LoadBalancerHandler._sync_lbaas_sgs') @mock.patch('kuryr_kubernetes.controller.drivers.base' '.PodSubnetsDriver.get_instance') @mock.patch('kuryr_kubernetes.controller.drivers.base' @@ -950,7 +952,8 @@ class TestLoadBalancerHandler(test_base.TestCase): @mock.patch('kuryr_kubernetes.controller.drivers.base' '.LBaaSDriver.get_instance') def test_add_new_members_udp(self, m_get_drv_lbaas, - m_get_drv_project, m_get_drv_subnets): + m_get_drv_project, m_get_drv_subnets, + m_sync_lbaas_sgs): project_id = str(uuid.uuid4()) subnet_id = str(uuid.uuid4()) current_ip = '1.1.1.1' diff --git a/kuryr_kubernetes/tests/unit/controller/handlers/test_policy.py b/kuryr_kubernetes/tests/unit/controller/handlers/test_policy.py index 38a0a778b..9b066c007 100644 --- a/kuryr_kubernetes/tests/unit/controller/handlers/test_policy.py +++ b/kuryr_kubernetes/tests/unit/controller/handlers/test_policy.py @@ -115,8 +115,9 @@ class TestPolicyHandler(test_base.TestCase): handler._drv_project) self.assertEqual(m_get_policy_driver.return_value, handler._drv_policy) + @mock.patch('kuryr_kubernetes.controller.drivers.utils.get_services') @mock.patch('kuryr_kubernetes.controller.drivers.utils.is_host_network') - def test_on_present(self, m_host_network): + def test_on_present(self, m_host_network, m_get_services): modified_pod = mock.sentinel.modified_pod match_pod = mock.sentinel.match_pod m_host_network.return_value = False @@ -131,7 +132,7 @@ class TestPolicyHandler(test_base.TestCase): sg1 = [mock.sentinel.sg1] sg2 = [mock.sentinel.sg2] self._get_security_groups.side_effect = [sg1, sg2] - self._handler._get_services.return_value = {'items': []} + m_get_services.return_value = {'items': []} policy.NetworkPolicyHandler.on_present(self._handler, self._policy) namespaced_pods.assert_not_called() @@ -147,8 +148,10 @@ class TestPolicyHandler(test_base.TestCase): self._update_vif_sgs.assert_has_calls(calls) self._update_lbaas_sg.assert_not_called() + @mock.patch('kuryr_kubernetes.controller.drivers.utils.get_services') @mock.patch('kuryr_kubernetes.controller.drivers.utils.is_host_network') - def test_on_present_without_knps_on_namespace(self, m_host_network): + def test_on_present_without_knps_on_namespace(self, m_host_network, + m_get_services): modified_pod = mock.sentinel.modified_pod match_pod = mock.sentinel.match_pod m_host_network.return_value = False @@ -160,7 +163,7 @@ class TestPolicyHandler(test_base.TestCase): sg2 = [mock.sentinel.sg2] sg3 = [mock.sentinel.sg3] self._get_security_groups.side_effect = [sg2, sg3] - self._handler._get_services.return_value = {'items': []} + m_get_services.return_value = {'items': []} policy.NetworkPolicyHandler.on_present(self._handler, self._policy) ensure_nw_policy.assert_called_once_with(self._policy, @@ -176,8 +179,9 @@ class TestPolicyHandler(test_base.TestCase): self._update_vif_sgs.assert_has_calls(calls) self._update_lbaas_sg.assert_not_called() + @mock.patch('kuryr_kubernetes.controller.drivers.utils.get_services') @mock.patch('kuryr_kubernetes.controller.drivers.utils.is_host_network') - def test_on_present_with_services(self, m_host_network): + def test_on_present_with_services(self, m_host_network, m_get_services): modified_pod = mock.sentinel.modified_pod match_pod = mock.sentinel.match_pod m_host_network.return_value = False @@ -193,7 +197,7 @@ class TestPolicyHandler(test_base.TestCase): sg2 = [mock.sentinel.sg2] self._get_security_groups.side_effect = [sg1, sg2] service = {'metadata': {'name': 'service-test'}} - self._handler._get_services.return_value = {'items': [service]} + m_get_services.return_value = {'items': [service]} policy.NetworkPolicyHandler.on_present(self._handler, self._policy) namespaced_pods.assert_not_called() @@ -209,8 +213,9 @@ class TestPolicyHandler(test_base.TestCase): self._update_vif_sgs.assert_has_calls(calls) self._update_lbaas_sg.assert_called_once() + @mock.patch('kuryr_kubernetes.controller.drivers.utils.get_services') @mock.patch('kuryr_kubernetes.controller.drivers.utils.is_host_network') - def test_on_deleted(self, m_host_network): + def test_on_deleted(self, m_host_network, m_get_services): namespace_pod = mock.sentinel.namespace_pod match_pod = mock.sentinel.match_pod m_host_network.return_value = False @@ -222,7 +227,7 @@ class TestPolicyHandler(test_base.TestCase): sg1 = [mock.sentinel.sg1] sg2 = [mock.sentinel.sg2] self._get_security_groups.side_effect = [sg1, sg2] - self._handler._get_services.return_value = {'items': []} + m_get_services.return_value = {'items': []} release_nw_policy = self._handler._drv_policy.release_network_policy knp_on_ns = self._handler._drv_policy.knps_on_namespace knp_on_ns.return_value = False diff --git a/kuryr_kubernetes/tests/unit/controller/handlers/test_vif.py b/kuryr_kubernetes/tests/unit/controller/handlers/test_vif.py index 609bdd71b..26f9abb44 100644 --- a/kuryr_kubernetes/tests/unit/controller/handlers/test_vif.py +++ b/kuryr_kubernetes/tests/unit/controller/handlers/test_vif.py @@ -42,9 +42,11 @@ class TestVIFHandler(test_base.TestCase): self._pod_version = mock.sentinel.pod_version self._pod_link = mock.sentinel.pod_link + self._pod_namespace = mock.sentinel.namespace self._pod = { 'metadata': {'resourceVersion': self._pod_version, - 'selfLink': self._pod_link}, + 'selfLink': self._pod_link, + 'namespace': self._pod_namespace}, 'status': {'phase': k_const.K8S_POD_STATUS_PENDING}, 'spec': {'hostNetwork': False, 'nodeName': 'hostname'} @@ -168,11 +170,14 @@ class TestVIFHandler(test_base.TestCase): self._activate_vif.assert_not_called() self._set_pod_state.assert_not_called() + @mock.patch('kuryr_kubernetes.controller.drivers.utils.get_services') @mock.patch('kuryr_kubernetes.controller.drivers.utils.is_host_network') @mock.patch('kuryr_kubernetes.controller.drivers.utils.get_pod_state') - def test_on_present_activate(self, m_get_pod_state, m_host_network): + def test_on_present_activate(self, m_get_pod_state, m_host_network, + m_get_services): m_get_pod_state.return_value = self._state m_host_network.return_value = False + m_get_services.return_value = {"items": []} self._vif.active = False h_vif.VIFHandler.on_present(self._handler, self._pod) @@ -239,11 +244,13 @@ class TestVIFHandler(test_base.TestCase): self._security_groups) self._activate_vif.assert_not_called() + @mock.patch('kuryr_kubernetes.controller.drivers.utils.get_services') @mock.patch('kuryr_kubernetes.controller.drivers.utils.is_host_network') @mock.patch('kuryr_kubernetes.controller.drivers.utils.get_pod_state') - def test_on_deleted(self, m_get_pod_state, m_host_network): + def test_on_deleted(self, m_get_pod_state, m_host_network, m_get_services): m_get_pod_state.return_value = self._state m_host_network.return_value = False + m_get_services.return_value = {"items": []} h_vif.VIFHandler.on_deleted(self._handler, self._pod) m_get_pod_state.assert_called_once_with(self._pod) @@ -251,14 +258,16 @@ class TestVIFHandler(test_base.TestCase): self._project_id, self._security_groups) + @mock.patch('kuryr_kubernetes.controller.drivers.utils.get_services') @mock.patch('kuryr_kubernetes.controller.drivers.utils.is_host_network') @mock.patch('kuryr_kubernetes.controller.drivers.utils.get_pod_state') def test_on_deleted_with_additional_vifs(self, m_get_pod_state, - m_host_network): + m_host_network, m_get_services): additional_vif = os_obj.vif.VIFBase() self._state.additional_vifs = {'eth1': additional_vif} m_get_pod_state.return_value = self._state m_host_network.return_value = False + m_get_services.return_value = {"items": []} h_vif.VIFHandler.on_deleted(self._handler, self._pod) @@ -280,11 +289,14 @@ class TestVIFHandler(test_base.TestCase): m_get_pod_state.assert_not_called() self._release_vif.assert_not_called() + @mock.patch('kuryr_kubernetes.controller.drivers.utils.get_services') @mock.patch('kuryr_kubernetes.controller.drivers.utils.is_host_network') @mock.patch('kuryr_kubernetes.controller.drivers.utils.get_pod_state') - def test_on_deleted_no_annotation(self, m_get_pod_state, m_host_network): + def test_on_deleted_no_annotation(self, m_get_pod_state, m_host_network, + m_get_services): m_get_pod_state.return_value = None m_host_network.return_value = False + m_get_services.return_value = {"items": []} h_vif.VIFHandler.on_deleted(self._handler, self._pod) diff --git a/kuryr_kubernetes/tests/unit/test_utils.py b/kuryr_kubernetes/tests/unit/test_utils.py index 5c55b521b..74f26d3d7 100644 --- a/kuryr_kubernetes/tests/unit/test_utils.py +++ b/kuryr_kubernetes/tests/unit/test_utils.py @@ -157,3 +157,10 @@ class TestUtils(test_base.TestCase): self.assertEqual(resp, False) kubernetes.get.assert_called_once() + + def test_get_endpoints_link(self): + service = {'metadata': { + 'selfLink': "/api/v1/namespaces/default/services/test"}} + ret = utils.get_endpoints_link(service) + expected_link = "/api/v1/namespaces/default/endpoints/test" + self.assertEqual(expected_link, ret) diff --git a/kuryr_kubernetes/utils.py b/kuryr_kubernetes/utils.py index 9aa317705..aa560e051 100644 --- a/kuryr_kubernetes/utils.py +++ b/kuryr_kubernetes/utils.py @@ -223,3 +223,43 @@ def get_lbaas_spec(service): obj = obj_lbaas.LBaaSServiceSpec.obj_from_primitive(obj_dict) LOG.debug("Got LBaaSServiceSpec from annotation: %r", obj) return obj + + +def set_lbaas_spec(service, lbaas_spec): + # TODO(ivc): extract annotation interactions + if lbaas_spec is None: + LOG.debug("Removing LBaaSServiceSpec annotation: %r", lbaas_spec) + annotation = None + else: + lbaas_spec.obj_reset_changes(recursive=True) + LOG.debug("Setting LBaaSServiceSpec annotation: %r", lbaas_spec) + annotation = jsonutils.dumps(lbaas_spec.obj_to_primitive(), + sort_keys=True) + svc_link = service['metadata']['selfLink'] + ep_link = get_endpoints_link(service) + k8s = clients.get_kubernetes_client() + + try: + k8s.annotate(ep_link, + {constants.K8S_ANNOTATION_LBAAS_SPEC: annotation}) + except exceptions.K8sResourceNotFound: + raise exceptions.ResourceNotReady(ep_link) + except exceptions.K8sClientException: + LOG.debug("Failed to annotate endpoint %r", ep_link) + raise + k8s.annotate(svc_link, + {constants.K8S_ANNOTATION_LBAAS_SPEC: annotation}, + resource_version=service['metadata']['resourceVersion']) + + +def get_endpoints_link(service): + svc_link = service['metadata']['selfLink'] + link_parts = svc_link.split('/') + + if link_parts[-2] != 'services': + raise exceptions.IntegrityError(_( + "Unsupported service link: %(link)s") % { + 'link': svc_link}) + link_parts[-2] = 'endpoints' + + return "/".join(link_parts)