Add support for svc with text targetPorts
This patch adds support for services that define the targetPort with text (port name), pointing to the same port number as the defined exposed port on the svc. Closes-Bug: 1818969 Change-Id: I7f957d292f7c4a43b759292e5bd04c4db704c4c4
This commit is contained in:
parent
a0197124c0
commit
dfa9a392f1
|
@ -118,33 +118,7 @@ class LBaaSSpecHandler(k8s_base.ResourceEventHandler):
|
|||
|
||||
def _has_lbaas_spec_changes(self, service, lbaas_spec):
|
||||
return (self._has_ip_changes(service, lbaas_spec) or
|
||||
self._has_port_changes(service, lbaas_spec))
|
||||
|
||||
def _get_service_ports(self, service):
|
||||
return [{'name': port.get('name'),
|
||||
'protocol': port.get('protocol', 'TCP'),
|
||||
'port': port['port'],
|
||||
'targetPort': port['targetPort']}
|
||||
for port in service['spec']['ports']]
|
||||
|
||||
def _has_port_changes(self, service, lbaas_spec):
|
||||
link = service['metadata']['selfLink']
|
||||
|
||||
fields = obj_lbaas.LBaaSPortSpec.fields
|
||||
svc_port_set = {tuple(port[attr] for attr in fields)
|
||||
for port in self._get_service_ports(service)}
|
||||
|
||||
spec_port_set = {tuple(getattr(port, attr)
|
||||
for attr in fields
|
||||
if port.obj_attr_is_set(attr))
|
||||
for port in lbaas_spec.ports}
|
||||
|
||||
if svc_port_set != spec_port_set:
|
||||
LOG.debug("LBaaS spec ports %(spec_ports)s != %(svc_ports)s "
|
||||
"for %(link)s" % {'spec_ports': spec_port_set,
|
||||
'svc_ports': svc_port_set,
|
||||
'link': link})
|
||||
return svc_port_set != spec_port_set
|
||||
utils.has_port_changes(service, lbaas_spec))
|
||||
|
||||
def _has_ip_changes(self, service, lbaas_spec):
|
||||
link = service['metadata']['selfLink']
|
||||
|
@ -166,7 +140,7 @@ class LBaaSSpecHandler(k8s_base.ResourceEventHandler):
|
|||
|
||||
def _generate_lbaas_port_specs(self, service):
|
||||
return [obj_lbaas.LBaaSPortSpec(**port)
|
||||
for port in self._get_service_ports(service)]
|
||||
for port in utils.get_service_ports(service)]
|
||||
|
||||
|
||||
class LoadBalancerHandler(k8s_base.ResourceEventHandler):
|
||||
|
@ -262,24 +236,25 @@ class LoadBalancerHandler(k8s_base.ResourceEventHandler):
|
|||
obj_lbaas.LBaaSServiceSpec())
|
||||
|
||||
def _should_ignore(self, endpoints, lbaas_spec):
|
||||
# NOTE(ltomasbo): we must wait until service handler has annotated the
|
||||
# endpoints to process them. Thus, if annotations are not updated to
|
||||
# match the endpoints information, we should skip the event
|
||||
return not(lbaas_spec and
|
||||
self._has_pods(endpoints) and
|
||||
self._is_lbaas_spec_in_sync(endpoints, lbaas_spec))
|
||||
self._svc_handler_annotations_updated(endpoints,
|
||||
lbaas_spec))
|
||||
|
||||
def _is_lbaas_spec_in_sync(self, endpoints, lbaas_spec):
|
||||
ports = lbaas_spec.ports
|
||||
ep_ports = list(set((port.get('name'), port.get('port'))
|
||||
if ports[0].obj_attr_is_set('targetPort')
|
||||
else port.get('name')
|
||||
for subset in endpoints.get('subsets', [])
|
||||
for port in subset.get('ports', [])))
|
||||
|
||||
spec_ports = [(port.name, port.targetPort)
|
||||
if port.obj_attr_is_set('targetPort')
|
||||
else port.name
|
||||
for port in ports]
|
||||
|
||||
return sorted(ep_ports) == sorted(spec_ports)
|
||||
def _svc_handler_annotations_updated(self, endpoints, lbaas_spec):
|
||||
svc_link = self._get_service_link(endpoints)
|
||||
k8s = clients.get_kubernetes_client()
|
||||
service = k8s.get(svc_link)
|
||||
if utils.has_port_changes(service, lbaas_spec):
|
||||
# NOTE(ltomasbo): Ensuring lbaas_spec annotated on the endpoints
|
||||
# is in sync with the service status, i.e., upon a service
|
||||
# modification it will ensure endpoint modifications are not
|
||||
# handled until the service handler has performed its annotations
|
||||
return False
|
||||
return True
|
||||
|
||||
def _has_pods(self, endpoints):
|
||||
ep_subsets = endpoints.get('subsets', [])
|
||||
|
|
|
@ -128,7 +128,7 @@ class LBaaSPortSpec(k_obj.KuryrK8sObjectBase):
|
|||
'name': obj_fields.StringField(nullable=True),
|
||||
'protocol': obj_fields.StringField(),
|
||||
'port': obj_fields.IntegerField(),
|
||||
'targetPort': obj_fields.IntegerField(),
|
||||
'targetPort': obj_fields.StringField(),
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -193,7 +193,8 @@ class TestLBaaSSpecHandler(test_base.TestCase):
|
|||
m_drv_sg.get_security_groups.assert_called_once_with(
|
||||
service, project_id)
|
||||
|
||||
def test_has_lbaas_spec_changes(self):
|
||||
@mock.patch('kuryr_kubernetes.utils.has_port_changes')
|
||||
def test_has_lbaas_spec_changes(self, m_port_changes):
|
||||
m_handler = mock.Mock(spec=h_lbaas.LBaaSSpecHandler)
|
||||
service = mock.sentinel.service
|
||||
lbaas_spec = mock.sentinel.lbaas_spec
|
||||
|
@ -201,65 +202,11 @@ class TestLBaaSSpecHandler(test_base.TestCase):
|
|||
for has_ip_changes in (True, False):
|
||||
for has_port_changes in (True, False):
|
||||
m_handler._has_ip_changes.return_value = has_ip_changes
|
||||
m_handler._has_port_changes.return_value = has_port_changes
|
||||
m_port_changes.return_value = has_port_changes
|
||||
ret = h_lbaas.LBaaSSpecHandler._has_lbaas_spec_changes(
|
||||
m_handler, service, lbaas_spec)
|
||||
self.assertEqual(has_ip_changes or has_port_changes, ret)
|
||||
|
||||
def test_get_service_ports(self):
|
||||
m_handler = mock.Mock(spec=h_lbaas.LBaaSSpecHandler)
|
||||
service = {'spec': {'ports': [
|
||||
{'port': 1, 'targetPort': 1},
|
||||
{'port': 2, 'name': 'X', 'protocol': 'UDP', 'targetPort': 2}
|
||||
]}}
|
||||
expected_ret = [
|
||||
{'port': 1, 'name': None, 'protocol': 'TCP', 'targetPort': 1},
|
||||
{'port': 2, 'name': 'X', 'protocol': 'UDP', 'targetPort': 2}]
|
||||
|
||||
ret = h_lbaas.LBaaSSpecHandler._get_service_ports(m_handler, service)
|
||||
self.assertEqual(expected_ret, ret)
|
||||
|
||||
def test_has_port_changes(self):
|
||||
m_handler = mock.Mock(spec=h_lbaas.LBaaSSpecHandler)
|
||||
m_service = mock.MagicMock()
|
||||
m_handler._get_service_ports.return_value = [
|
||||
{'port': 1, 'name': 'X', 'protocol': 'TCP', 'targetPort': 1},
|
||||
]
|
||||
|
||||
m_lbaas_spec = mock.MagicMock()
|
||||
m_lbaas_spec.ports = [
|
||||
obj_lbaas.LBaaSPortSpec(name='X', protocol='TCP', port=1,
|
||||
targetPort=1),
|
||||
obj_lbaas.LBaaSPortSpec(name='Y', protocol='TCP', port=2,
|
||||
targetPort=2),
|
||||
]
|
||||
|
||||
ret = h_lbaas.LBaaSSpecHandler._has_port_changes(
|
||||
m_handler, m_service, m_lbaas_spec)
|
||||
|
||||
self.assertTrue(ret)
|
||||
|
||||
def test_has_port_changes__no_changes(self):
|
||||
m_handler = mock.Mock(spec=h_lbaas.LBaaSSpecHandler)
|
||||
m_service = mock.MagicMock()
|
||||
m_handler._get_service_ports.return_value = [
|
||||
{'port': 1, 'name': 'X', 'protocol': 'TCP', 'targetPort': 1},
|
||||
{'port': 2, 'name': 'Y', 'protocol': 'TCP', 'targetPort': 2}
|
||||
]
|
||||
|
||||
m_lbaas_spec = mock.MagicMock()
|
||||
m_lbaas_spec.ports = [
|
||||
obj_lbaas.LBaaSPortSpec(name='X', protocol='TCP', port=1,
|
||||
targetPort=1),
|
||||
obj_lbaas.LBaaSPortSpec(name='Y', protocol='TCP', port=2,
|
||||
targetPort=2),
|
||||
]
|
||||
|
||||
ret = h_lbaas.LBaaSSpecHandler._has_port_changes(
|
||||
m_handler, m_service, m_lbaas_spec)
|
||||
|
||||
self.assertFalse(ret)
|
||||
|
||||
def test_has_ip_changes(self):
|
||||
m_handler = mock.Mock(spec=h_lbaas.LBaaSSpecHandler)
|
||||
m_service = mock.MagicMock()
|
||||
|
@ -302,9 +249,10 @@ class TestLBaaSSpecHandler(test_base.TestCase):
|
|||
m_handler, m_service, m_lbaas_spec)
|
||||
self.assertFalse(ret)
|
||||
|
||||
def test_generate_lbaas_port_specs(self):
|
||||
@mock.patch('kuryr_kubernetes.utils.get_service_ports')
|
||||
def test_generate_lbaas_port_specs(self, m_get_service_ports):
|
||||
m_handler = mock.Mock(spec=h_lbaas.LBaaSSpecHandler)
|
||||
m_handler._get_service_ports.return_value = [
|
||||
m_get_service_ports.return_value = [
|
||||
{'port': 1, 'name': 'X', 'protocol': 'TCP'},
|
||||
{'port': 2, 'name': 'Y', 'protocol': 'TCP'}
|
||||
]
|
||||
|
@ -316,12 +264,13 @@ class TestLBaaSSpecHandler(test_base.TestCase):
|
|||
ret = h_lbaas.LBaaSSpecHandler._generate_lbaas_port_specs(
|
||||
m_handler, mock.sentinel.service)
|
||||
self.assertEqual(expected_ports, ret)
|
||||
m_handler._get_service_ports.assert_called_once_with(
|
||||
m_get_service_ports.assert_called_once_with(
|
||||
mock.sentinel.service)
|
||||
|
||||
def test_generate_lbaas_port_specs_udp(self):
|
||||
@mock.patch('kuryr_kubernetes.utils.get_service_ports')
|
||||
def test_generate_lbaas_port_specs_udp(self, m_get_service_ports):
|
||||
m_handler = mock.Mock(spec=h_lbaas.LBaaSSpecHandler)
|
||||
m_handler._get_service_ports.return_value = [
|
||||
m_get_service_ports.return_value = [
|
||||
{'port': 1, 'name': 'X', 'protocol': 'TCP'},
|
||||
{'port': 2, 'name': 'Y', 'protocol': 'UDP'}
|
||||
]
|
||||
|
@ -333,7 +282,7 @@ class TestLBaaSSpecHandler(test_base.TestCase):
|
|||
ret = h_lbaas.LBaaSSpecHandler._generate_lbaas_port_specs(
|
||||
m_handler, mock.sentinel.service)
|
||||
self.assertEqual(expected_ports, ret)
|
||||
m_handler._get_service_ports.assert_called_once_with(
|
||||
m_get_service_ports.assert_called_once_with(
|
||||
mock.sentinel.service)
|
||||
|
||||
def test_set_lbaas_spec(self):
|
||||
|
@ -657,30 +606,16 @@ class TestLoadBalancerHandler(test_base.TestCase):
|
|||
# REVISIT(ivc): ddt?
|
||||
m_handler = mock.Mock(spec=h_lbaas.LoadBalancerHandler)
|
||||
m_handler._has_pods.return_value = True
|
||||
m_handler._is_lbaas_spec_in_sync.return_value = True
|
||||
m_handler._svc_handler_annotations_updated.return_value = True
|
||||
|
||||
ret = h_lbaas.LoadBalancerHandler._should_ignore(
|
||||
m_handler, endpoints, lbaas_spec)
|
||||
self.assertEqual(False, ret)
|
||||
|
||||
m_handler._has_pods.assert_called_once_with(endpoints)
|
||||
m_handler._is_lbaas_spec_in_sync.assert_called_once_with(
|
||||
m_handler._svc_handler_annotations_updated.assert_called_once_with(
|
||||
endpoints, lbaas_spec)
|
||||
|
||||
def test_is_lbaas_spec_in_sync(self):
|
||||
names = ['a', 'b', 'c']
|
||||
endpoints = {'subsets': [{'ports': [{'name': n, 'port': 1}
|
||||
for n in names]}]}
|
||||
lbaas_spec = obj_lbaas.LBaaSServiceSpec(ports=[
|
||||
obj_lbaas.LBaaSPortSpec(name=n, targetPort=1)
|
||||
for n in reversed(names)])
|
||||
|
||||
m_handler = mock.Mock(spec=h_lbaas.LoadBalancerHandler)
|
||||
ret = h_lbaas.LoadBalancerHandler._is_lbaas_spec_in_sync(
|
||||
m_handler, endpoints, lbaas_spec)
|
||||
|
||||
self.assertEqual(True, ret)
|
||||
|
||||
def test_has_pods(self):
|
||||
# REVISIT(ivc): ddt?
|
||||
endpoints = {'subsets': [
|
||||
|
|
|
@ -28,7 +28,7 @@ object_data = {
|
|||
'LBaaSLoadBalancer': '1.3-8bc0a9bdbd160da67572aa38784378d1',
|
||||
'LBaaSMember': '1.0-a770c6884c27d6d8c21186b27d0e2ccb',
|
||||
'LBaaSPool': '1.1-6e77370d7632a902445444249eb77b01',
|
||||
'LBaaSPortSpec': '1.1-fcfa2fd07f4bc5619b96fa41bcdf6e23',
|
||||
'LBaaSPortSpec': '1.1-1b307f34630617086c7af70f2cb8b215',
|
||||
'LBaaSPubIp': '1.0-83992edec2c60fb4ab8998ea42a4ff74',
|
||||
'LBaaSRouteNotifEntry': '1.0-dd2f2be956f68814b1f47cb13483a885',
|
||||
'LBaaSRouteNotifier': '1.0-f0bfd8e772434abe7557930d7e0180c1',
|
||||
|
|
|
@ -18,6 +18,7 @@ from oslo_config import cfg
|
|||
|
||||
from kuryr_kubernetes import constants as k_const
|
||||
from kuryr_kubernetes import exceptions as k_exc
|
||||
from kuryr_kubernetes.objects import lbaas as obj_lbaas
|
||||
from kuryr_kubernetes.objects import vif
|
||||
from kuryr_kubernetes.tests import base as test_base
|
||||
from kuryr_kubernetes.tests.unit import kuryr_fixtures as k_fix
|
||||
|
@ -164,3 +165,53 @@ class TestUtils(test_base.TestCase):
|
|||
ret = utils.get_endpoints_link(service)
|
||||
expected_link = "/api/v1/namespaces/default/endpoints/test"
|
||||
self.assertEqual(expected_link, ret)
|
||||
|
||||
def test_get_service_ports(self):
|
||||
service = {'spec': {'ports': [
|
||||
{'port': 1, 'targetPort': 1},
|
||||
{'port': 2, 'name': 'X', 'protocol': 'UDP', 'targetPort': 2}
|
||||
]}}
|
||||
expected_ret = [
|
||||
{'port': 1, 'name': None, 'protocol': 'TCP', 'targetPort': '1'},
|
||||
{'port': 2, 'name': 'X', 'protocol': 'UDP', 'targetPort': '2'}]
|
||||
|
||||
ret = utils.get_service_ports(service)
|
||||
self.assertEqual(expected_ret, ret)
|
||||
|
||||
@mock.patch('kuryr_kubernetes.utils.get_service_ports')
|
||||
def test_has_port_changes(self, m_get_service_ports):
|
||||
service = mock.MagicMock()
|
||||
m_get_service_ports.return_value = [
|
||||
{'port': 1, 'name': 'X', 'protocol': 'TCP', 'targetPort': 1},
|
||||
]
|
||||
|
||||
lbaas_spec = mock.MagicMock()
|
||||
lbaas_spec.ports = [
|
||||
obj_lbaas.LBaaSPortSpec(name='X', protocol='TCP', port=1,
|
||||
targetPort=1),
|
||||
obj_lbaas.LBaaSPortSpec(name='Y', protocol='TCP', port=2,
|
||||
targetPort=2),
|
||||
]
|
||||
|
||||
ret = utils.has_port_changes(service, lbaas_spec)
|
||||
self.assertTrue(ret)
|
||||
|
||||
@mock.patch('kuryr_kubernetes.utils.get_service_ports')
|
||||
def test_has_port_changes__no_changes(self, m_get_service_ports):
|
||||
service = mock.MagicMock()
|
||||
m_get_service_ports.return_value = [
|
||||
{'port': 1, 'name': 'X', 'protocol': 'TCP', 'targetPort': '1'},
|
||||
{'port': 2, 'name': 'Y', 'protocol': 'TCP', 'targetPort': '2'}
|
||||
]
|
||||
|
||||
lbaas_spec = mock.MagicMock()
|
||||
lbaas_spec.ports = [
|
||||
obj_lbaas.LBaaSPortSpec(name='X', protocol='TCP', port=1,
|
||||
targetPort=1),
|
||||
obj_lbaas.LBaaSPortSpec(name='Y', protocol='TCP', port=2,
|
||||
targetPort=2),
|
||||
]
|
||||
|
||||
ret = utils.has_port_changes(service, lbaas_spec)
|
||||
|
||||
self.assertFalse(ret)
|
||||
|
|
|
@ -263,3 +263,31 @@ def get_endpoints_link(service):
|
|||
link_parts[-2] = 'endpoints'
|
||||
|
||||
return "/".join(link_parts)
|
||||
|
||||
|
||||
def has_port_changes(service, lbaas_spec):
|
||||
link = service['metadata']['selfLink']
|
||||
|
||||
fields = obj_lbaas.LBaaSPortSpec.fields
|
||||
svc_port_set = {tuple(port[attr] for attr in fields)
|
||||
for port in get_service_ports(service)}
|
||||
|
||||
spec_port_set = {tuple(getattr(port, attr)
|
||||
for attr in fields
|
||||
if port.obj_attr_is_set(attr))
|
||||
for port in lbaas_spec.ports}
|
||||
|
||||
if svc_port_set != spec_port_set:
|
||||
LOG.debug("LBaaS spec ports %(spec_ports)s != %(svc_ports)s "
|
||||
"for %(link)s" % {'spec_ports': spec_port_set,
|
||||
'svc_ports': svc_port_set,
|
||||
'link': link})
|
||||
return svc_port_set != spec_port_set
|
||||
|
||||
|
||||
def get_service_ports(service):
|
||||
return [{'name': port.get('name'),
|
||||
'protocol': port.get('protocol', 'TCP'),
|
||||
'port': port['port'],
|
||||
'targetPort': str(port['targetPort'])}
|
||||
for port in service['spec']['ports']]
|
||||
|
|
Loading…
Reference in New Issue