Merge "Services: Gracefully ignore exposed UDP ports"
This commit is contained in:
commit
90b3cd6750
|
@ -24,9 +24,11 @@ be implemented in the following way:
|
||||||
default configuration (bottom)
|
default configuration (bottom)
|
||||||
|
|
||||||
If you are paying attention and are familiar with the `LBaaS API`_ you probably
|
If you are paying attention and are familiar with the `LBaaS API`_ you probably
|
||||||
noticed that we have separate pools for each exposed pool in a service. This is
|
noticed that we have separate pools for each exposed port in a service. This is
|
||||||
probably not optimal and we would probably benefit from keeping a single Neutron
|
probably not optimal and we would probably benefit from keeping a single Neutron
|
||||||
pool that lists each of the per port listeners.
|
pool that lists each of the per port listeners.
|
||||||
|
Since `LBaaS API`_ doesn't support UDP load balancing, service exported UDP
|
||||||
|
ports will be ignored.
|
||||||
|
|
||||||
When installing you can decide to use the legacy Neutron HAProxy driver for
|
When installing you can decide to use the legacy Neutron HAProxy driver for
|
||||||
LBaaSv2 or install and configure OpenStack Octavia, which as of Pike implements
|
LBaaSv2 or install and configure OpenStack Octavia, which as of Pike implements
|
||||||
|
|
|
@ -31,6 +31,7 @@ from kuryr_kubernetes.objects import lbaas as obj_lbaas
|
||||||
|
|
||||||
LOG = logging.getLogger(__name__)
|
LOG = logging.getLogger(__name__)
|
||||||
_ACTIVATION_TIMEOUT = 300
|
_ACTIVATION_TIMEOUT = 300
|
||||||
|
_SUPPORTED_LISTENER_PROT = ('HTTP', 'HTTPS', 'TCP')
|
||||||
|
|
||||||
|
|
||||||
class LBaaSv2Driver(base.LBaaSDriver):
|
class LBaaSv2Driver(base.LBaaSDriver):
|
||||||
|
@ -128,6 +129,10 @@ class LBaaSv2Driver(base.LBaaSDriver):
|
||||||
'for listener %s.', listener.name)
|
'for listener %s.', listener.name)
|
||||||
|
|
||||||
def ensure_listener(self, endpoints, loadbalancer, protocol, port):
|
def ensure_listener(self, endpoints, loadbalancer, protocol, port):
|
||||||
|
if protocol not in _SUPPORTED_LISTENER_PROT:
|
||||||
|
LOG.info("Protocol: %(prot)s: is not supported by LBaaSV2", {
|
||||||
|
'prot': protocol})
|
||||||
|
return None
|
||||||
name = "%(namespace)s/%(name)s" % endpoints['metadata']
|
name = "%(namespace)s/%(name)s" % endpoints['metadata']
|
||||||
name += ":%s:%s" % (protocol, port)
|
name += ":%s:%s" % (protocol, port)
|
||||||
listener = obj_lbaas.LBaaSListener(name=name,
|
listener = obj_lbaas.LBaaSListener(name=name,
|
||||||
|
|
|
@ -310,8 +310,18 @@ class LoadBalancerHandler(k8s_base.ResourceEventHandler):
|
||||||
pool_by_lsnr_port = {(lsnr_by_id[p.listener_id].protocol,
|
pool_by_lsnr_port = {(lsnr_by_id[p.listener_id].protocol,
|
||||||
lsnr_by_id[p.listener_id].port): p
|
lsnr_by_id[p.listener_id].port): p
|
||||||
for p in lbaas_state.pools}
|
for p in lbaas_state.pools}
|
||||||
pool_by_tgt_name = {p.name: pool_by_lsnr_port[p.protocol, p.port]
|
# NOTE(yboaron): Since LBaaSv2 doesn't support UDP load balancing,
|
||||||
for p in lbaas_spec.ports}
|
# the LBaaS driver will return 'None' in case of UDP port
|
||||||
|
# listener creation.
|
||||||
|
# we should consider the case in which
|
||||||
|
# 'pool_by_lsnr_port[p.protocol, p.port]' is missing
|
||||||
|
pool_by_tgt_name = {}
|
||||||
|
for p in lbaas_spec.ports:
|
||||||
|
try:
|
||||||
|
pool_by_tgt_name[p.name] = pool_by_lsnr_port[p.protocol,
|
||||||
|
p.port]
|
||||||
|
except KeyError:
|
||||||
|
continue
|
||||||
current_targets = {(str(m.ip), m.port) for m in lbaas_state.members}
|
current_targets = {(str(m.ip), m.port) for m in lbaas_state.members}
|
||||||
|
|
||||||
for subset in endpoints.get('subsets', []):
|
for subset in endpoints.get('subsets', []):
|
||||||
|
@ -463,8 +473,9 @@ class LoadBalancerHandler(k8s_base.ResourceEventHandler):
|
||||||
loadbalancer=lbaas_state.loadbalancer,
|
loadbalancer=lbaas_state.loadbalancer,
|
||||||
protocol=protocol,
|
protocol=protocol,
|
||||||
port=port)
|
port=port)
|
||||||
lbaas_state.listeners.append(listener)
|
if listener is not None:
|
||||||
changed = True
|
lbaas_state.listeners.append(listener)
|
||||||
|
changed = True
|
||||||
return changed
|
return changed
|
||||||
|
|
||||||
def _remove_unused_listeners(self, endpoints, lbaas_state, lbaas_spec):
|
def _remove_unused_listeners(self, endpoints, lbaas_state, lbaas_spec):
|
||||||
|
|
|
@ -26,6 +26,8 @@ from kuryr_kubernetes import exceptions as k_exc
|
||||||
from kuryr_kubernetes.objects import lbaas as obj_lbaas
|
from kuryr_kubernetes.objects import lbaas as obj_lbaas
|
||||||
from kuryr_kubernetes.tests import base as test_base
|
from kuryr_kubernetes.tests import base as test_base
|
||||||
|
|
||||||
|
_SUPPORTED_LISTENER_PROT = ('HTTP', 'HTTPS', 'TCP')
|
||||||
|
|
||||||
|
|
||||||
class TestLBaaSSpecHandler(test_base.TestCase):
|
class TestLBaaSSpecHandler(test_base.TestCase):
|
||||||
|
|
||||||
|
@ -304,6 +306,23 @@ class TestLBaaSSpecHandler(test_base.TestCase):
|
||||||
m_handler._get_service_ports.assert_called_once_with(
|
m_handler._get_service_ports.assert_called_once_with(
|
||||||
mock.sentinel.service)
|
mock.sentinel.service)
|
||||||
|
|
||||||
|
def test_generate_lbaas_port_specs_udp(self):
|
||||||
|
m_handler = mock.Mock(spec=h_lbaas.LBaaSSpecHandler)
|
||||||
|
m_handler._get_service_ports.return_value = [
|
||||||
|
{'port': 1, 'name': 'X', 'protocol': 'TCP'},
|
||||||
|
{'port': 2, 'name': 'Y', 'protocol': 'UDP'}
|
||||||
|
]
|
||||||
|
expected_ports = [
|
||||||
|
obj_lbaas.LBaaSPortSpec(name='X', protocol='TCP', port=1),
|
||||||
|
obj_lbaas.LBaaSPortSpec(name='Y', protocol='UDP', port=2),
|
||||||
|
]
|
||||||
|
|
||||||
|
ret = h_lbaas.LBaaSSpecHandler._generate_lbaas_port_specs(
|
||||||
|
m_handler, mock.sentinel.service)
|
||||||
|
self.assertEqual(expected_ports, ret)
|
||||||
|
m_handler._get_service_ports.assert_called_once_with(
|
||||||
|
mock.sentinel.service)
|
||||||
|
|
||||||
def test_get_endpoints_link(self):
|
def test_get_endpoints_link(self):
|
||||||
m_handler = mock.Mock(spec=h_lbaas.LBaaSSpecHandler)
|
m_handler = mock.Mock(spec=h_lbaas.LBaaSSpecHandler)
|
||||||
service = {'metadata': {
|
service = {'metadata': {
|
||||||
|
@ -330,6 +349,7 @@ class TestLBaaSSpecHandler(test_base.TestCase):
|
||||||
|
|
||||||
|
|
||||||
class FakeLBaaSDriver(drv_base.LBaaSDriver):
|
class FakeLBaaSDriver(drv_base.LBaaSDriver):
|
||||||
|
|
||||||
def ensure_loadbalancer(self, endpoints, project_id, subnet_id, ip,
|
def ensure_loadbalancer(self, endpoints, project_id, subnet_id, ip,
|
||||||
security_groups_ids, service_type):
|
security_groups_ids, service_type):
|
||||||
name = str(ip)
|
name = str(ip)
|
||||||
|
@ -340,6 +360,9 @@ class FakeLBaaSDriver(drv_base.LBaaSDriver):
|
||||||
id=uuidutils.generate_uuid())
|
id=uuidutils.generate_uuid())
|
||||||
|
|
||||||
def ensure_listener(self, endpoints, loadbalancer, protocol, port):
|
def ensure_listener(self, endpoints, loadbalancer, protocol, port):
|
||||||
|
if protocol not in _SUPPORTED_LISTENER_PROT:
|
||||||
|
return None
|
||||||
|
|
||||||
name = "%s:%s:%s" % (loadbalancer.name, protocol, port)
|
name = "%s:%s:%s" % (loadbalancer.name, protocol, port)
|
||||||
return obj_lbaas.LBaaSListener(name=name,
|
return obj_lbaas.LBaaSListener(name=name,
|
||||||
project_id=loadbalancer.project_id,
|
project_id=loadbalancer.project_id,
|
||||||
|
@ -554,13 +577,14 @@ class TestLoadBalancerHandler(test_base.TestCase):
|
||||||
pools=list(pools.values()),
|
pools=list(pools.values()),
|
||||||
members=list(members.values()))
|
members=list(members.values()))
|
||||||
|
|
||||||
def _generate_lbaas_spec(self, vip, targets, project_id, subnet_id):
|
def _generate_lbaas_spec(self, vip, targets, project_id,
|
||||||
|
subnet_id, prot='TCP'):
|
||||||
return obj_lbaas.LBaaSServiceSpec(
|
return obj_lbaas.LBaaSServiceSpec(
|
||||||
ip=vip,
|
ip=vip,
|
||||||
project_id=project_id,
|
project_id=project_id,
|
||||||
subnet_id=subnet_id,
|
subnet_id=subnet_id,
|
||||||
ports=[obj_lbaas.LBaaSPortSpec(name=str(port),
|
ports=[obj_lbaas.LBaaSPortSpec(name=str(port),
|
||||||
protocol='TCP',
|
protocol=prot,
|
||||||
port=port)
|
port=port)
|
||||||
for port in set(t[0] for t in targets.values())])
|
for port in set(t[0] for t in targets.values())])
|
||||||
|
|
||||||
|
@ -592,6 +616,34 @@ class TestLoadBalancerHandler(test_base.TestCase):
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
|
|
||||||
|
def _sync_lbaas_members_impl(self, m_get_drv_lbaas, m_get_drv_project,
|
||||||
|
m_get_drv_subnets, subnet_id, project_id,
|
||||||
|
endpoints, state, spec):
|
||||||
|
m_drv_lbaas = mock.Mock(wraps=FakeLBaaSDriver())
|
||||||
|
m_drv_project = mock.Mock()
|
||||||
|
m_drv_project.get_project.return_value = project_id
|
||||||
|
m_drv_subnets = mock.Mock()
|
||||||
|
m_drv_subnets.get_subnets.return_value = {
|
||||||
|
subnet_id: mock.sentinel.subnet}
|
||||||
|
m_get_drv_lbaas.return_value = m_drv_lbaas
|
||||||
|
m_get_drv_project.return_value = m_drv_project
|
||||||
|
m_get_drv_subnets.return_value = m_drv_subnets
|
||||||
|
|
||||||
|
handler = h_lbaas.LoadBalancerHandler()
|
||||||
|
|
||||||
|
with mock.patch.object(handler, '_get_pod_subnet') as m_get_pod_subnet:
|
||||||
|
m_get_pod_subnet.return_value = subnet_id
|
||||||
|
handler._sync_lbaas_members(endpoints, state, spec)
|
||||||
|
|
||||||
|
lsnrs = {lsnr.id: lsnr for lsnr in state.listeners}
|
||||||
|
pools = {pool.id: pool for pool in state.pools}
|
||||||
|
observed_targets = sorted(
|
||||||
|
(str(member.ip), (
|
||||||
|
lsnrs[pools[member.pool_id].listener_id].port,
|
||||||
|
member.port))
|
||||||
|
for member in state.members)
|
||||||
|
return observed_targets
|
||||||
|
|
||||||
@mock.patch('kuryr_kubernetes.controller.drivers.base'
|
@mock.patch('kuryr_kubernetes.controller.drivers.base'
|
||||||
'.PodSubnetsDriver.get_instance')
|
'.PodSubnetsDriver.get_instance')
|
||||||
@mock.patch('kuryr_kubernetes.controller.drivers.base'
|
@mock.patch('kuryr_kubernetes.controller.drivers.base'
|
||||||
|
@ -619,32 +671,47 @@ class TestLoadBalancerHandler(test_base.TestCase):
|
||||||
spec = self._generate_lbaas_spec(expected_ip, expected_targets,
|
spec = self._generate_lbaas_spec(expected_ip, expected_targets,
|
||||||
project_id, subnet_id)
|
project_id, subnet_id)
|
||||||
|
|
||||||
m_drv_lbaas = mock.Mock(wraps=FakeLBaaSDriver())
|
observed_targets = self._sync_lbaas_members_impl(
|
||||||
m_drv_project = mock.Mock()
|
m_get_drv_lbaas, m_get_drv_project, m_get_drv_subnets,
|
||||||
m_drv_project.get_project.return_value = project_id
|
subnet_id, project_id, endpoints, state, spec)
|
||||||
m_drv_subnets = mock.Mock()
|
|
||||||
m_drv_subnets.get_subnets.return_value = {
|
|
||||||
subnet_id: mock.sentinel.subnet}
|
|
||||||
m_get_drv_lbaas.return_value = m_drv_lbaas
|
|
||||||
m_get_drv_project.return_value = m_drv_project
|
|
||||||
m_get_drv_subnets.return_value = m_drv_subnets
|
|
||||||
|
|
||||||
handler = h_lbaas.LoadBalancerHandler()
|
|
||||||
|
|
||||||
with mock.patch.object(handler, '_get_pod_subnet') as m_get_pod_subnet:
|
|
||||||
m_get_pod_subnet.return_value = subnet_id
|
|
||||||
handler._sync_lbaas_members(endpoints, state, spec)
|
|
||||||
|
|
||||||
lsnrs = {lsnr.id: lsnr for lsnr in state.listeners}
|
|
||||||
pools = {pool.id: pool for pool in state.pools}
|
|
||||||
observed_targets = sorted(
|
|
||||||
(str(member.ip), (
|
|
||||||
lsnrs[pools[member.pool_id].listener_id].port,
|
|
||||||
member.port))
|
|
||||||
for member in state.members)
|
|
||||||
self.assertEqual(sorted(expected_targets.items()), observed_targets)
|
self.assertEqual(sorted(expected_targets.items()), observed_targets)
|
||||||
self.assertEqual(expected_ip, str(state.loadbalancer.ip))
|
self.assertEqual(expected_ip, str(state.loadbalancer.ip))
|
||||||
|
|
||||||
|
@mock.patch('kuryr_kubernetes.controller.drivers.base'
|
||||||
|
'.PodSubnetsDriver.get_instance')
|
||||||
|
@mock.patch('kuryr_kubernetes.controller.drivers.base'
|
||||||
|
'.PodProjectDriver.get_instance')
|
||||||
|
@mock.patch('kuryr_kubernetes.controller.drivers.base'
|
||||||
|
'.LBaaSDriver.get_instance')
|
||||||
|
def test_sync_lbaas_members_udp(self, m_get_drv_lbaas,
|
||||||
|
m_get_drv_project, m_get_drv_subnets):
|
||||||
|
# REVISIT(ivc): test methods separately and verify ensure/release
|
||||||
|
project_id = uuidutils.generate_uuid()
|
||||||
|
subnet_id = uuidutils.generate_uuid()
|
||||||
|
current_ip = '1.1.1.1'
|
||||||
|
current_targets = {
|
||||||
|
'1.1.1.101': (1001, 10001),
|
||||||
|
'1.1.1.111': (1001, 10001),
|
||||||
|
'1.1.1.201': (2001, 20001)}
|
||||||
|
expected_ip = '2.2.2.2'
|
||||||
|
expected_targets = {
|
||||||
|
'2.2.2.101': (1201, 12001),
|
||||||
|
'2.2.2.111': (1201, 12001),
|
||||||
|
'2.2.2.201': (2201, 22001)}
|
||||||
|
endpoints = self._generate_endpoints(expected_targets)
|
||||||
|
state = self._generate_lbaas_state(
|
||||||
|
current_ip, current_targets, project_id, subnet_id)
|
||||||
|
spec = self._generate_lbaas_spec(expected_ip, expected_targets,
|
||||||
|
project_id, subnet_id, 'UDP')
|
||||||
|
|
||||||
|
observed_targets = self._sync_lbaas_members_impl(
|
||||||
|
m_get_drv_lbaas, m_get_drv_project, m_get_drv_subnets,
|
||||||
|
subnet_id, project_id, endpoints, state, spec)
|
||||||
|
|
||||||
|
self.assertEqual([], observed_targets)
|
||||||
|
self.assertEqual(expected_ip, str(state.loadbalancer.ip))
|
||||||
|
|
||||||
def test_get_lbaas_spec(self):
|
def test_get_lbaas_spec(self):
|
||||||
self.skipTest("skipping until generalised annotation handling is "
|
self.skipTest("skipping until generalised annotation handling is "
|
||||||
"implemented")
|
"implemented")
|
||||||
|
|
Loading…
Reference in New Issue