From dfa9a392f1d3b777a68adda9f5e06d36575bf5a1 Mon Sep 17 00:00:00 2001 From: Luis Tomas Bolivar Date: Thu, 7 Mar 2019 09:29:06 +0100 Subject: [PATCH] Add support for svc with text targetPorts This patch adds support for services that define the targetPort with text (port name), pointing to the same port number as the defined exposed port on the svc. Closes-Bug: 1818969 Change-Id: I7f957d292f7c4a43b759292e5bd04c4db704c4c4 --- kuryr_kubernetes/controller/handlers/lbaas.py | 61 ++++--------- kuryr_kubernetes/objects/lbaas.py | 2 +- .../unit/controller/handlers/test_lbaas.py | 91 +++---------------- kuryr_kubernetes/tests/unit/test_object.py | 2 +- kuryr_kubernetes/tests/unit/test_utils.py | 51 +++++++++++ kuryr_kubernetes/utils.py | 28 ++++++ 6 files changed, 112 insertions(+), 123 deletions(-) diff --git a/kuryr_kubernetes/controller/handlers/lbaas.py b/kuryr_kubernetes/controller/handlers/lbaas.py index 97a505867..e6330ef78 100644 --- a/kuryr_kubernetes/controller/handlers/lbaas.py +++ b/kuryr_kubernetes/controller/handlers/lbaas.py @@ -118,33 +118,7 @@ class LBaaSSpecHandler(k8s_base.ResourceEventHandler): def _has_lbaas_spec_changes(self, service, lbaas_spec): return (self._has_ip_changes(service, lbaas_spec) or - self._has_port_changes(service, lbaas_spec)) - - def _get_service_ports(self, service): - return [{'name': port.get('name'), - 'protocol': port.get('protocol', 'TCP'), - 'port': port['port'], - 'targetPort': port['targetPort']} - for port in service['spec']['ports']] - - def _has_port_changes(self, service, lbaas_spec): - link = service['metadata']['selfLink'] - - fields = obj_lbaas.LBaaSPortSpec.fields - svc_port_set = {tuple(port[attr] for attr in fields) - for port in self._get_service_ports(service)} - - spec_port_set = {tuple(getattr(port, attr) - for attr in fields - if port.obj_attr_is_set(attr)) - for port in lbaas_spec.ports} - - if svc_port_set != spec_port_set: - LOG.debug("LBaaS spec ports %(spec_ports)s != %(svc_ports)s " - "for %(link)s" % {'spec_ports': spec_port_set, - 'svc_ports': svc_port_set, - 'link': link}) - return svc_port_set != spec_port_set + utils.has_port_changes(service, lbaas_spec)) def _has_ip_changes(self, service, lbaas_spec): link = service['metadata']['selfLink'] @@ -166,7 +140,7 @@ class LBaaSSpecHandler(k8s_base.ResourceEventHandler): def _generate_lbaas_port_specs(self, service): return [obj_lbaas.LBaaSPortSpec(**port) - for port in self._get_service_ports(service)] + for port in utils.get_service_ports(service)] class LoadBalancerHandler(k8s_base.ResourceEventHandler): @@ -262,24 +236,25 @@ class LoadBalancerHandler(k8s_base.ResourceEventHandler): obj_lbaas.LBaaSServiceSpec()) def _should_ignore(self, endpoints, lbaas_spec): + # NOTE(ltomasbo): we must wait until service handler has annotated the + # endpoints to process them. Thus, if annotations are not updated to + # match the endpoints information, we should skip the event return not(lbaas_spec and self._has_pods(endpoints) and - self._is_lbaas_spec_in_sync(endpoints, lbaas_spec)) + self._svc_handler_annotations_updated(endpoints, + lbaas_spec)) - def _is_lbaas_spec_in_sync(self, endpoints, lbaas_spec): - ports = lbaas_spec.ports - ep_ports = list(set((port.get('name'), port.get('port')) - if ports[0].obj_attr_is_set('targetPort') - else port.get('name') - for subset in endpoints.get('subsets', []) - for port in subset.get('ports', []))) - - spec_ports = [(port.name, port.targetPort) - if port.obj_attr_is_set('targetPort') - else port.name - for port in ports] - - return sorted(ep_ports) == sorted(spec_ports) + def _svc_handler_annotations_updated(self, endpoints, lbaas_spec): + svc_link = self._get_service_link(endpoints) + k8s = clients.get_kubernetes_client() + service = k8s.get(svc_link) + if utils.has_port_changes(service, lbaas_spec): + # NOTE(ltomasbo): Ensuring lbaas_spec annotated on the endpoints + # is in sync with the service status, i.e., upon a service + # modification it will ensure endpoint modifications are not + # handled until the service handler has performed its annotations + return False + return True def _has_pods(self, endpoints): ep_subsets = endpoints.get('subsets', []) diff --git a/kuryr_kubernetes/objects/lbaas.py b/kuryr_kubernetes/objects/lbaas.py index 87352ea2e..b0c971327 100644 --- a/kuryr_kubernetes/objects/lbaas.py +++ b/kuryr_kubernetes/objects/lbaas.py @@ -128,7 +128,7 @@ class LBaaSPortSpec(k_obj.KuryrK8sObjectBase): 'name': obj_fields.StringField(nullable=True), 'protocol': obj_fields.StringField(), 'port': obj_fields.IntegerField(), - 'targetPort': obj_fields.IntegerField(), + 'targetPort': obj_fields.StringField(), } diff --git a/kuryr_kubernetes/tests/unit/controller/handlers/test_lbaas.py b/kuryr_kubernetes/tests/unit/controller/handlers/test_lbaas.py index 81ac42b63..014142406 100644 --- a/kuryr_kubernetes/tests/unit/controller/handlers/test_lbaas.py +++ b/kuryr_kubernetes/tests/unit/controller/handlers/test_lbaas.py @@ -193,7 +193,8 @@ class TestLBaaSSpecHandler(test_base.TestCase): m_drv_sg.get_security_groups.assert_called_once_with( service, project_id) - def test_has_lbaas_spec_changes(self): + @mock.patch('kuryr_kubernetes.utils.has_port_changes') + def test_has_lbaas_spec_changes(self, m_port_changes): m_handler = mock.Mock(spec=h_lbaas.LBaaSSpecHandler) service = mock.sentinel.service lbaas_spec = mock.sentinel.lbaas_spec @@ -201,65 +202,11 @@ class TestLBaaSSpecHandler(test_base.TestCase): for has_ip_changes in (True, False): for has_port_changes in (True, False): m_handler._has_ip_changes.return_value = has_ip_changes - m_handler._has_port_changes.return_value = has_port_changes + m_port_changes.return_value = has_port_changes ret = h_lbaas.LBaaSSpecHandler._has_lbaas_spec_changes( m_handler, service, lbaas_spec) self.assertEqual(has_ip_changes or has_port_changes, ret) - def test_get_service_ports(self): - m_handler = mock.Mock(spec=h_lbaas.LBaaSSpecHandler) - service = {'spec': {'ports': [ - {'port': 1, 'targetPort': 1}, - {'port': 2, 'name': 'X', 'protocol': 'UDP', 'targetPort': 2} - ]}} - expected_ret = [ - {'port': 1, 'name': None, 'protocol': 'TCP', 'targetPort': 1}, - {'port': 2, 'name': 'X', 'protocol': 'UDP', 'targetPort': 2}] - - ret = h_lbaas.LBaaSSpecHandler._get_service_ports(m_handler, service) - self.assertEqual(expected_ret, ret) - - def test_has_port_changes(self): - m_handler = mock.Mock(spec=h_lbaas.LBaaSSpecHandler) - m_service = mock.MagicMock() - m_handler._get_service_ports.return_value = [ - {'port': 1, 'name': 'X', 'protocol': 'TCP', 'targetPort': 1}, - ] - - m_lbaas_spec = mock.MagicMock() - m_lbaas_spec.ports = [ - obj_lbaas.LBaaSPortSpec(name='X', protocol='TCP', port=1, - targetPort=1), - obj_lbaas.LBaaSPortSpec(name='Y', protocol='TCP', port=2, - targetPort=2), - ] - - ret = h_lbaas.LBaaSSpecHandler._has_port_changes( - m_handler, m_service, m_lbaas_spec) - - self.assertTrue(ret) - - def test_has_port_changes__no_changes(self): - m_handler = mock.Mock(spec=h_lbaas.LBaaSSpecHandler) - m_service = mock.MagicMock() - m_handler._get_service_ports.return_value = [ - {'port': 1, 'name': 'X', 'protocol': 'TCP', 'targetPort': 1}, - {'port': 2, 'name': 'Y', 'protocol': 'TCP', 'targetPort': 2} - ] - - m_lbaas_spec = mock.MagicMock() - m_lbaas_spec.ports = [ - obj_lbaas.LBaaSPortSpec(name='X', protocol='TCP', port=1, - targetPort=1), - obj_lbaas.LBaaSPortSpec(name='Y', protocol='TCP', port=2, - targetPort=2), - ] - - ret = h_lbaas.LBaaSSpecHandler._has_port_changes( - m_handler, m_service, m_lbaas_spec) - - self.assertFalse(ret) - def test_has_ip_changes(self): m_handler = mock.Mock(spec=h_lbaas.LBaaSSpecHandler) m_service = mock.MagicMock() @@ -302,9 +249,10 @@ class TestLBaaSSpecHandler(test_base.TestCase): m_handler, m_service, m_lbaas_spec) self.assertFalse(ret) - def test_generate_lbaas_port_specs(self): + @mock.patch('kuryr_kubernetes.utils.get_service_ports') + def test_generate_lbaas_port_specs(self, m_get_service_ports): m_handler = mock.Mock(spec=h_lbaas.LBaaSSpecHandler) - m_handler._get_service_ports.return_value = [ + m_get_service_ports.return_value = [ {'port': 1, 'name': 'X', 'protocol': 'TCP'}, {'port': 2, 'name': 'Y', 'protocol': 'TCP'} ] @@ -316,12 +264,13 @@ class TestLBaaSSpecHandler(test_base.TestCase): ret = h_lbaas.LBaaSSpecHandler._generate_lbaas_port_specs( m_handler, mock.sentinel.service) self.assertEqual(expected_ports, ret) - m_handler._get_service_ports.assert_called_once_with( + m_get_service_ports.assert_called_once_with( mock.sentinel.service) - def test_generate_lbaas_port_specs_udp(self): + @mock.patch('kuryr_kubernetes.utils.get_service_ports') + def test_generate_lbaas_port_specs_udp(self, m_get_service_ports): m_handler = mock.Mock(spec=h_lbaas.LBaaSSpecHandler) - m_handler._get_service_ports.return_value = [ + m_get_service_ports.return_value = [ {'port': 1, 'name': 'X', 'protocol': 'TCP'}, {'port': 2, 'name': 'Y', 'protocol': 'UDP'} ] @@ -333,7 +282,7 @@ class TestLBaaSSpecHandler(test_base.TestCase): ret = h_lbaas.LBaaSSpecHandler._generate_lbaas_port_specs( m_handler, mock.sentinel.service) self.assertEqual(expected_ports, ret) - m_handler._get_service_ports.assert_called_once_with( + m_get_service_ports.assert_called_once_with( mock.sentinel.service) def test_set_lbaas_spec(self): @@ -657,30 +606,16 @@ class TestLoadBalancerHandler(test_base.TestCase): # REVISIT(ivc): ddt? m_handler = mock.Mock(spec=h_lbaas.LoadBalancerHandler) m_handler._has_pods.return_value = True - m_handler._is_lbaas_spec_in_sync.return_value = True + m_handler._svc_handler_annotations_updated.return_value = True ret = h_lbaas.LoadBalancerHandler._should_ignore( m_handler, endpoints, lbaas_spec) self.assertEqual(False, ret) m_handler._has_pods.assert_called_once_with(endpoints) - m_handler._is_lbaas_spec_in_sync.assert_called_once_with( + m_handler._svc_handler_annotations_updated.assert_called_once_with( endpoints, lbaas_spec) - def test_is_lbaas_spec_in_sync(self): - names = ['a', 'b', 'c'] - endpoints = {'subsets': [{'ports': [{'name': n, 'port': 1} - for n in names]}]} - lbaas_spec = obj_lbaas.LBaaSServiceSpec(ports=[ - obj_lbaas.LBaaSPortSpec(name=n, targetPort=1) - for n in reversed(names)]) - - m_handler = mock.Mock(spec=h_lbaas.LoadBalancerHandler) - ret = h_lbaas.LoadBalancerHandler._is_lbaas_spec_in_sync( - m_handler, endpoints, lbaas_spec) - - self.assertEqual(True, ret) - def test_has_pods(self): # REVISIT(ivc): ddt? endpoints = {'subsets': [ diff --git a/kuryr_kubernetes/tests/unit/test_object.py b/kuryr_kubernetes/tests/unit/test_object.py index b6d95a558..1ec788800 100644 --- a/kuryr_kubernetes/tests/unit/test_object.py +++ b/kuryr_kubernetes/tests/unit/test_object.py @@ -28,7 +28,7 @@ object_data = { 'LBaaSLoadBalancer': '1.3-8bc0a9bdbd160da67572aa38784378d1', 'LBaaSMember': '1.0-a770c6884c27d6d8c21186b27d0e2ccb', 'LBaaSPool': '1.1-6e77370d7632a902445444249eb77b01', - 'LBaaSPortSpec': '1.1-fcfa2fd07f4bc5619b96fa41bcdf6e23', + 'LBaaSPortSpec': '1.1-1b307f34630617086c7af70f2cb8b215', 'LBaaSPubIp': '1.0-83992edec2c60fb4ab8998ea42a4ff74', 'LBaaSRouteNotifEntry': '1.0-dd2f2be956f68814b1f47cb13483a885', 'LBaaSRouteNotifier': '1.0-f0bfd8e772434abe7557930d7e0180c1', diff --git a/kuryr_kubernetes/tests/unit/test_utils.py b/kuryr_kubernetes/tests/unit/test_utils.py index 74f26d3d7..f493f14b6 100644 --- a/kuryr_kubernetes/tests/unit/test_utils.py +++ b/kuryr_kubernetes/tests/unit/test_utils.py @@ -18,6 +18,7 @@ from oslo_config import cfg from kuryr_kubernetes import constants as k_const from kuryr_kubernetes import exceptions as k_exc +from kuryr_kubernetes.objects import lbaas as obj_lbaas from kuryr_kubernetes.objects import vif from kuryr_kubernetes.tests import base as test_base from kuryr_kubernetes.tests.unit import kuryr_fixtures as k_fix @@ -164,3 +165,53 @@ class TestUtils(test_base.TestCase): ret = utils.get_endpoints_link(service) expected_link = "/api/v1/namespaces/default/endpoints/test" self.assertEqual(expected_link, ret) + + def test_get_service_ports(self): + service = {'spec': {'ports': [ + {'port': 1, 'targetPort': 1}, + {'port': 2, 'name': 'X', 'protocol': 'UDP', 'targetPort': 2} + ]}} + expected_ret = [ + {'port': 1, 'name': None, 'protocol': 'TCP', 'targetPort': '1'}, + {'port': 2, 'name': 'X', 'protocol': 'UDP', 'targetPort': '2'}] + + ret = utils.get_service_ports(service) + self.assertEqual(expected_ret, ret) + + @mock.patch('kuryr_kubernetes.utils.get_service_ports') + def test_has_port_changes(self, m_get_service_ports): + service = mock.MagicMock() + m_get_service_ports.return_value = [ + {'port': 1, 'name': 'X', 'protocol': 'TCP', 'targetPort': 1}, + ] + + lbaas_spec = mock.MagicMock() + lbaas_spec.ports = [ + obj_lbaas.LBaaSPortSpec(name='X', protocol='TCP', port=1, + targetPort=1), + obj_lbaas.LBaaSPortSpec(name='Y', protocol='TCP', port=2, + targetPort=2), + ] + + ret = utils.has_port_changes(service, lbaas_spec) + self.assertTrue(ret) + + @mock.patch('kuryr_kubernetes.utils.get_service_ports') + def test_has_port_changes__no_changes(self, m_get_service_ports): + service = mock.MagicMock() + m_get_service_ports.return_value = [ + {'port': 1, 'name': 'X', 'protocol': 'TCP', 'targetPort': '1'}, + {'port': 2, 'name': 'Y', 'protocol': 'TCP', 'targetPort': '2'} + ] + + lbaas_spec = mock.MagicMock() + lbaas_spec.ports = [ + obj_lbaas.LBaaSPortSpec(name='X', protocol='TCP', port=1, + targetPort=1), + obj_lbaas.LBaaSPortSpec(name='Y', protocol='TCP', port=2, + targetPort=2), + ] + + ret = utils.has_port_changes(service, lbaas_spec) + + self.assertFalse(ret) diff --git a/kuryr_kubernetes/utils.py b/kuryr_kubernetes/utils.py index aa560e051..87c3cb1e1 100644 --- a/kuryr_kubernetes/utils.py +++ b/kuryr_kubernetes/utils.py @@ -263,3 +263,31 @@ def get_endpoints_link(service): link_parts[-2] = 'endpoints' return "/".join(link_parts) + + +def has_port_changes(service, lbaas_spec): + link = service['metadata']['selfLink'] + + fields = obj_lbaas.LBaaSPortSpec.fields + svc_port_set = {tuple(port[attr] for attr in fields) + for port in get_service_ports(service)} + + spec_port_set = {tuple(getattr(port, attr) + for attr in fields + if port.obj_attr_is_set(attr)) + for port in lbaas_spec.ports} + + if svc_port_set != spec_port_set: + LOG.debug("LBaaS spec ports %(spec_ports)s != %(svc_ports)s " + "for %(link)s" % {'spec_ports': spec_port_set, + 'svc_ports': svc_port_set, + 'link': link}) + return svc_port_set != spec_port_set + + +def get_service_ports(service): + return [{'name': port.get('name'), + 'protocol': port.get('protocol', 'TCP'), + 'port': port['port'], + 'targetPort': str(port['targetPort'])} + for port in service['spec']['ports']]