K8S-services: add support for UDP ports

With the support of UDP load balancing in Octavia [1],
Kuryr can handle also K8S-services with UDP ports.
This patch adds support for UDP listener's  protocol in
LBaaS driver, and in addition, handles the case in which
listener's protocol is not supported by Octavia.

Todos:
 * Update the UDP Services tests in kury-tempest
 * 'Howto' doc for UDP services verification

[1] https://developer.openstack.org/api-ref/load-balancer/v2/index.html#create-listener

Change-Id: I93a27af10e531250e07cf6abaa3c060d7090116f
Closes-Bug: 1792768
This commit is contained in:
Yossi Boaron 2018-10-04 14:54:16 +03:00 committed by Michał Dulko
parent d84ae550c6
commit 8f871ee234
3 changed files with 94 additions and 11 deletions

View File

@ -339,6 +339,14 @@ function configure_neutron_defaults {
--description "k8s service subnet allowed" \
--remote-ip "$service_cidr" --ethertype IPv4 --protocol tcp \
"$service_pod_access_sg_id"
# Since Octavia supports also UDP load balancing, we need to allow
# also udp traffic
openstack --os-cloud devstack-admin --os-region "$REGION_NAME" \
security group rule create --project "$project_id" \
--description "k8s service subnet UDP allowed" \
--remote-ip "$service_cidr" --ethertype IPv4 --protocol udp \
"$service_pod_access_sg_id"
if [[ "$use_octavia" == "True" && \
"$KURYR_K8S_OCTAVIA_MEMBER_MODE" == "L3" ]]; then
if [ -n "$sg_ids" ]; then
@ -368,6 +376,13 @@ function configure_neutron_defaults {
--description "k8s pod subnet allowed from k8s-pod-subnet" \
--remote-ip "$pod_cidr" --ethertype IPv4 --protocol tcp \
"$octavia_pod_access_sg_id"
# Since Octavia supports also UDP load balancing, we need to allow
# also udp traffic
openstack --os-cloud devstack-admin --os-region "$REGION_NAME" \
security group rule create --project "$project_id" \
--description "k8s pod subnet allowed from k8s-pod-subnet" \
--remote-ip "$pod_cidr" --ethertype IPv4 --protocol udp \
"$octavia_pod_access_sg_id"
if [ -n "$sg_ids" ]; then
sg_ids+=",${octavia_pod_access_sg_id}"
else

View File

@ -36,7 +36,12 @@ CONF = cfg.CONF
LOG = logging.getLogger(__name__)
_ACTIVATION_TIMEOUT = CONF.neutron_defaults.lbaas_activation_timeout
_SUPPORTED_LISTENER_PROT = ('HTTP', 'HTTPS', 'TCP')
_PROVIDER_SUPPORTED_LISTENER_PROT = {
'amphora': ['HTTP', 'HTTPS', 'TCP', 'UDP'],
'ovn': ['TCP', 'UDP'],
'haproxy': ['HTTP', 'HTTPS', 'TCP']}
_L7_POLICY_ACT_REDIRECT_TO_POOL = 'REDIRECT_TO_POOL'
# NOTE(yboaron):Prior to sending create request to Octavia, LBaaS driver
# verifies that LB is in a stable state by polling LB's provisioning_status
@ -254,20 +259,39 @@ class LBaaSv2Driver(base.LBaaSDriver):
def ensure_listener(self, loadbalancer, protocol, port,
service_type='ClusterIP'):
if protocol not in _SUPPORTED_LISTENER_PROT:
LOG.info("Protocol: %(prot)s: is not supported by LBaaSV2", {
'prot': protocol})
# NOTE(yboaron): Since retrieving Octavia capabilities/version is not
# supported via the OpenstackSdk, the list of allowed listener's
# protocols will be defined statically.
# Kuryr still need to handle the case in which listener's protocol
# (e.g: UDP) is not supported by Octavia.
provider = loadbalancer.provider or 'amphora'
try:
if protocol not in _PROVIDER_SUPPORTED_LISTENER_PROT[provider]:
LOG.info("Protocol: %(prot)s: is not supported by "
"%(provider)s",
{'prot': protocol, 'provider': provider})
return None
except KeyError:
LOG.info("Provider %(provider)s doesnt exist in "
"_PROVIDER_SUPPORTED_LISTENER_PROT",
{'provider': provider})
return None
name = "%s:%s:%s" % (loadbalancer.name, protocol, port)
listener = obj_lbaas.LBaaSListener(name=name,
project_id=loadbalancer.project_id,
loadbalancer_id=loadbalancer.id,
protocol=protocol,
port=port)
result = self._ensure_provisioned(loadbalancer, listener,
self._create_listener,
self._find_listener,
_LB_STS_POLL_SLOW_INTERVAL)
try:
result = self._ensure_provisioned(
loadbalancer, listener, self._create_listener,
self._find_listener, _LB_STS_POLL_SLOW_INTERVAL)
except n_exc.BadRequest:
LOG.info("Listener creation failed, most probably because "
"protocol %(prot)s is not supported", {'prot': protocol})
return None
self._ensure_security_group_rules(loadbalancer, result, service_type)

View File

@ -98,7 +98,25 @@ class TestLBaaSv2Driver(test_base.TestCase):
lbaas.lbaas_loadbalancer_path % loadbalancer.id,
params={'cascade': True})
def test_ensure_listener(self):
def test_ensure_listener_tcp(self):
self._test_ensure_listener('TCP')
def test_ensure_listener_udp(self):
self._test_ensure_listener('UDP')
def test_ensure_listener_unsupported_protocol(self):
self._test_ensure_listener('NOT_SUPPORTED')
def test_ensure_listener_ovn_tcp(self):
self._test_ensure_listener('TCP', 'ovn')
def test_ensure_listener_ovn_udp(self):
self._test_ensure_listener('UDP', 'ovn')
def test_ensure_listener_ovn_unsupported_protocol(self):
self._test_ensure_listener('HTTP', 'ovn')
def _test_ensure_listener(self, protocol, provider=None):
cls = d_lbaasv2.LBaaSv2Driver
m_driver = mock.Mock(spec=d_lbaasv2.LBaaSv2Driver)
expected_resp = mock.sentinel.expected_resp
@ -107,21 +125,27 @@ class TestLBaaSv2Driver(test_base.TestCase):
subnet_id = 'D3FA400A-F543-4B91-9CD3-047AF0CE42D1'
ip = '1.2.3.4'
loadbalancer_id = '00EE9E11-91C2-41CF-8FD4-7970579E5C4C'
protocol = 'TCP'
port = 1234
loadbalancer = obj_lbaas.LBaaSLoadBalancer(
id=loadbalancer_id, name=name, project_id=project_id,
subnet_id=subnet_id, ip=ip)
subnet_id=subnet_id, ip=ip, provider=provider)
# TODO(ivc): handle security groups
m_driver._ensure_provisioned.return_value = expected_resp
resp = cls.ensure_listener(m_driver, loadbalancer,
protocol, port)
provider = loadbalancer.provider or 'amphora'
if (protocol not in
d_lbaasv2._PROVIDER_SUPPORTED_LISTENER_PROT[provider]):
self.assertIsNone(resp)
return
m_driver._ensure_provisioned.assert_called_once_with(
loadbalancer, mock.ANY, m_driver._create_listener,
m_driver._find_listener, d_lbaasv2._LB_STS_POLL_SLOW_INTERVAL)
listener = m_driver._ensure_provisioned.call_args[0][1]
self.assertEqual("%s:%s:%s" % (loadbalancer.name, protocol, port),
listener.name)
self.assertEqual(project_id, listener.project_id)
@ -130,6 +154,26 @@ class TestLBaaSv2Driver(test_base.TestCase):
self.assertEqual(port, listener.port)
self.assertEqual(expected_resp, resp)
def test_ensure_listener_bad_request_exception(self):
cls = d_lbaasv2.LBaaSv2Driver
m_driver = mock.Mock(spec=d_lbaasv2.LBaaSv2Driver)
name = 'TEST_NAME'
project_id = 'TEST_PROJECT'
subnet_id = 'D3FA400A-F543-4B91-9CD3-047AF0CE42D1'
ip = '1.2.3.4'
loadbalancer_id = '00EE9E11-91C2-41CF-8FD4-7970579E5C4C'
port = 1234
protocol = 'TCP'
provider = 'amphora'
loadbalancer = obj_lbaas.LBaaSLoadBalancer(
id=loadbalancer_id, name=name, project_id=project_id,
subnet_id=subnet_id, ip=ip, provider=provider)
m_driver._ensure_provisioned.side_effect = n_exc.BadRequest
resp = cls.ensure_listener(m_driver, loadbalancer,
protocol, port)
self.assertIsNone(resp)
def test_release_listener(self):
neutron = self.useFixture(k_fix.MockNeutronClient()).client
lbaas = self.useFixture(k_fix.MockLBaaSClient()).client