Add SCTP support in Amphora

Add SCTP support in the Amphora (with keepalived).
Add amphora-health-checker script for customized SCTP health checks
(INIT/INIT-ACK/ABORT).

Change-Id: I30997ae6cc6b8ec724f0e9dcfdfe49356b320ff4
Story: 2007884
Task: 40932
This commit is contained in:
Gregory Thiemonge 2020-06-26 10:24:51 +02:00
parent 673a5691c1
commit 2888f44e7b
44 changed files with 1319 additions and 326 deletions

View File

@ -164,7 +164,7 @@ title=type - SCTP
status=optional
notes=Use SCTP for the health monitor.
cli=openstack loadbalancer healthmonitor create --type SCTP <pool>
driver.amphora=missing
driver.amphora=complete
driver.ovn=missing
[operation.url_path]

View File

@ -235,7 +235,7 @@ title=protocol - SCTP
status=optional
notes=SCTP protocol support for the listener.
cli=openstack loadbalancer listener create --protocol SCTP <loadbalancer>
driver.amphora=missing
driver.amphora=complete
driver.ovn=missing
[operation.protocol_port]

View File

@ -151,7 +151,7 @@ title=protocol - SCTP
status=optional
notes=SCTP protocol support for the pool.
cli=openstack loadbalancer pool create --protocol SCTP --listener <listener>
driver.amphora=missing
driver.amphora=complete
driver.ovn=missing
[operation.session_persistence.APP_COOKIE]

View File

@ -24,6 +24,9 @@ ln -s $AMP_VENV/bin/amphora-agent /usr/local/bin/amphora-agent || true
# Also link out the vrrp check script(s) so they're in PATH for keepalived
ln -s $AMP_VENV/bin/haproxy-vrrp-* /usr/local/bin/ || true
# Link heath checker script
ln -s $AMP_VENV/bin/amphora-health-checker /usr/local/bin/amphora-health-checker || true
mkdir /etc/octavia
# we assume certs, etc will come in through the config drive
mkdir /etc/octavia/certs

View File

@ -36,10 +36,14 @@ if [ "$1" == "add" ]; then
nft add table ip octavia-ipv4
nft add chain ip octavia-ipv4 ip-udp-masq { type nat hook postrouting priority 100\;}
nft add rule ip octavia-ipv4 ip-udp-masq oifname "$3" meta l4proto udp masquerade
nft add chain ip octavia-ipv4 ip-sctp-masq { type nat hook postrouting priority 100\;}
nft add rule ip octavia-ipv4 ip-sctp-masq oifname "$3" meta l4proto sctp masquerade
elif [ "$2" == "ipv6" ]; then
nft add table ip6 octavia-ipv6
nft add chain ip6 octavia-ipv6 ip6-udp-masq { type nat hook postrouting priority 100\;}
nft add rule ip6 octavia-ipv6 ip6-udp-masq oifname "$3" meta l4proto udp masquerade
nft add chain ip6 octavia-ipv6 ip6-sctp-masq { type nat hook postrouting priority 100\;}
nft add rule ip6 octavia-ipv6 ip6-sctp-masq oifname "$3" meta l4proto sctp masquerade
else
usage
fi
@ -47,8 +51,10 @@ if [ "$1" == "add" ]; then
else # nft not found, fall back to iptables
if [ "$2" == "ipv4" ]; then
/sbin/iptables -t nat -A POSTROUTING -p udp -o $3 -j MASQUERADE
/sbin/iptables -t nat -A POSTROUTING -p sctp -o $3 -j MASQUERADE
elif [ "$2" == "ipv6" ]; then
/sbin/ip6tables -t nat -A POSTROUTING -p udp -o $3 -j MASQUERADE
/sbin/ip6tables -t nat -A POSTROUTING -p sctp -o $3 -j MASQUERADE
else
usage
fi
@ -60,9 +66,13 @@ elif [ "$1" == "delete" ]; then
if [ "$2" == "ipv4" ]; then
nft flush chain ip octavia-ipv4 ip-udp-masq
nft delete chain ip octavia-ipv4 ip-udp-masq
nft flush chain ip octavia-ipv4 ip-sctp-masq
nft delete chain ip octavia-ipv4 ip-sctp-masq
elif [ "$2" == "ipv6" ]; then
nft flush chain ip6 octavia-ipv6 ip-udp-masq
nft delete chain ip6 octavia-ipv6 ip-udp-masq
nft flush chain ip6 octavia-ipv6 ip-sctp-masq
nft delete chain ip6 octavia-ipv6 ip-sctp-masq
else
usage
fi
@ -70,8 +80,10 @@ elif [ "$1" == "delete" ]; then
else # nft not found, fall back to iptables
if [ "$2" == "ipv4" ]; then
/sbin/iptables -t nat -D POSTROUTING -p udp -o $3 -j MASQUERADE
/sbin/iptables -t nat -D POSTROUTING -p sctp -o $3 -j MASQUERADE
elif [ "$2" == "ipv6" ]; then
/sbin/ip6tables -t nat -D POSTROUTING -p udp -o $3 -j MASQUERADE
/sbin/ip6tables -t nat -D POSTROUTING -p sctp -o $3 -j MASQUERADE
else
usage
fi

View File

@ -457,7 +457,7 @@
# Minimum TLS protocol, eg: TLS, TLSv1.1, TLSv1.2, TLSv1.3 (if available)
# agent_tls_protocol = TLSv1.2
# Amphora default UDP driver is keepalived_lvs
# This setting is deprecated. Amphora default UDP driver is keepalived_lvs
#
# amphora_udp_driver = keepalived_lvs

View File

@ -31,11 +31,11 @@ class AmphoraInfo(object):
def __init__(self, osutils):
self._osutils = osutils
def compile_amphora_info(self, extend_udp_driver=None):
def compile_amphora_info(self, extend_lvs_driver=None):
extend_body = {}
if extend_udp_driver:
extend_body = self._get_extend_body_from_udp_driver(
extend_udp_driver)
if extend_lvs_driver:
extend_body = self._get_extend_body_from_lvs_driver(
extend_lvs_driver)
body = {'hostname': socket.gethostname(),
'haproxy_version':
self._get_version_of_installed_package('haproxy'),
@ -44,17 +44,18 @@ class AmphoraInfo(object):
body.update(extend_body)
return webob.Response(json=body)
def compile_amphora_details(self, extend_udp_driver=None):
def compile_amphora_details(self, extend_lvs_driver=None):
haproxy_listener_list = sorted(util.get_listeners())
extend_body = {}
udp_listener_list = []
if extend_udp_driver:
udp_listener_list = util.get_udp_listeners()
extend_data = self._get_extend_body_from_udp_driver(
extend_udp_driver)
udp_count = self._count_udp_listener_processes(extend_udp_driver,
udp_listener_list)
extend_body['udp_listener_process_count'] = udp_count
lvs_listener_list = []
if extend_lvs_driver:
lvs_listener_list = util.get_lvs_listeners()
extend_data = self._get_extend_body_from_lvs_driver(
extend_lvs_driver)
lvs_count = self._count_lvs_listener_processes(
extend_lvs_driver,
lvs_listener_list)
extend_body['lvs_listener_process_count'] = lvs_count
extend_body.update(extend_data)
meminfo = self._get_meminfo()
cpu = self._cpu()
@ -87,8 +88,8 @@ class AmphoraInfo(object):
'topology': consts.TOPOLOGY_SINGLE,
'topology_status': consts.TOPOLOGY_STATUS_OK,
'listeners': sorted(list(
set(haproxy_listener_list + udp_listener_list)))
if udp_listener_list else haproxy_listener_list,
set(haproxy_listener_list + lvs_listener_list)))
if lvs_listener_list else haproxy_listener_list,
'packages': {}}
if extend_body:
body.update(extend_body)
@ -108,16 +109,16 @@ class AmphoraInfo(object):
num += 1
return num
def _count_udp_listener_processes(self, udp_driver, listener_list):
def _count_lvs_listener_processes(self, lvs_driver, listener_list):
num = 0
for listener_id in listener_list:
if udp_driver.is_listener_running(listener_id):
if lvs_driver.is_listener_running(listener_id):
# optional check if it's still running
num += 1
return num
def _get_extend_body_from_udp_driver(self, extend_udp_driver):
extend_info = extend_udp_driver.get_subscribed_amp_compile_info()
def _get_extend_body_from_lvs_driver(self, extend_lvs_driver):
extend_info = extend_lvs_driver.get_subscribed_amp_compile_info()
extend_data = {}
for extend in extend_info:
package_version = self._get_version_of_installed_package(extend)

View File

@ -26,7 +26,7 @@ import webob
from werkzeug import exceptions
from octavia.amphorae.backends.agent.api_server import loadbalancer
from octavia.amphorae.backends.agent.api_server import udp_listener_base
from octavia.amphorae.backends.agent.api_server import lvs_listener_base
from octavia.amphorae.backends.agent.api_server import util
from octavia.common import constants as consts
@ -45,11 +45,11 @@ check_script_file_template = j2_env.get_template(
consts.KEEPALIVED_CHECK_SCRIPT)
class KeepalivedLvs(udp_listener_base.UdpListenerApiServerBase):
class KeepalivedLvs(lvs_listener_base.LvsListenerApiServerBase):
_SUBSCRIBED_AMP_COMPILE = ['keepalived', 'ipvsadm']
def upload_udp_listener_config(self, listener_id):
def upload_lvs_listener_config(self, listener_id):
stream = loadbalancer.Wrapped(flask.request.stream)
NEED_CHECK = True
@ -175,7 +175,7 @@ class KeepalivedLvs(udp_listener_base.UdpListenerApiServerBase):
res.headers['ETag'] = stream.get_md5()
return res
def _check_udp_listener_exists(self, listener_id):
def _check_lvs_listener_exists(self, listener_id):
if not os.path.exists(util.keepalived_lvs_cfg_path(listener_id)):
raise exceptions.HTTPException(
response=webob.Response(json=dict(
@ -183,18 +183,18 @@ class KeepalivedLvs(udp_listener_base.UdpListenerApiServerBase):
details="No UDP listener with UUID: {0}".format(
listener_id)), status=404))
def get_udp_listener_config(self, listener_id):
def get_lvs_listener_config(self, listener_id):
"""Gets the keepalivedlvs config
:param listener_id: the id of the listener
"""
self._check_udp_listener_exists(listener_id)
self._check_lvs_listener_exists(listener_id)
with open(util.keepalived_lvs_cfg_path(listener_id), 'r') as file:
cfg = file.read()
resp = webob.Response(cfg, content_type='text/plain')
return resp
def manage_udp_listener(self, listener_id, action):
def manage_lvs_listener(self, listener_id, action):
action = action.lower()
if action not in [consts.AMP_ACTION_START,
consts.AMP_ACTION_STOP,
@ -210,9 +210,9 @@ class KeepalivedLvs(udp_listener_base.UdpListenerApiServerBase):
if action == consts.AMP_ACTION_RELOAD:
action = consts.AMP_ACTION_RESTART
self._check_udp_listener_exists(listener_id)
self._check_lvs_listener_exists(listener_id)
if action == consts.AMP_ACTION_RELOAD:
if consts.OFFLINE == self._check_udp_listener_status(listener_id):
if consts.OFFLINE == self._check_lvs_listener_status(listener_id):
action = consts.AMP_ACTION_START
cmd = ("/usr/sbin/service "
@ -236,7 +236,7 @@ class KeepalivedLvs(udp_listener_base.UdpListenerApiServerBase):
action=action)),
status=202)
def _check_udp_listener_status(self, listener_id):
def _check_lvs_listener_status(self, listener_id):
if os.path.exists(util.keepalived_lvs_pids_path(listener_id)[0]):
if os.path.exists(os.path.join(
'/proc', util.get_keepalivedlvs_pid(listener_id))):
@ -251,25 +251,25 @@ class KeepalivedLvs(udp_listener_base.UdpListenerApiServerBase):
return consts.ERROR
return consts.OFFLINE
def get_all_udp_listeners_status(self):
def get_all_lvs_listeners_status(self):
"""Gets the status of all UDP listeners
Gets the status of all UDP listeners on the amphora.
"""
listeners = list()
for udp_listener in util.get_udp_listeners():
status = self._check_udp_listener_status(udp_listener)
for lvs_listener in util.get_lvs_listeners():
status = self._check_lvs_listener_status(lvs_listener)
listeners.append({
'status': status,
'uuid': udp_listener,
'uuid': lvs_listener,
'type': 'UDP',
})
return listeners
def delete_udp_listener(self, listener_id):
def delete_lvs_listener(self, listener_id):
try:
self._check_udp_listener_exists(listener_id)
self._check_lvs_listener_exists(listener_id)
except exceptions.HTTPException:
return webob.Response(json={'message': 'OK'})

View File

@ -16,38 +16,25 @@
import abc
from oslo_config import cfg
from stevedore import driver as stevedore_driver
CONF = cfg.CONF
UDP_SERVER_NAMESPACE = 'octavia.amphora.udp_api_server'
class UdpListenerApiServerBase(object, metaclass=abc.ABCMeta):
"""Base UDP Listener Server API
class LvsListenerApiServerBase(object, metaclass=abc.ABCMeta):
"""Base LVS Listener Server API
"""
_SUBSCRIBED_AMP_COMPILE = []
SERVER_INSTANCE = None
@classmethod
def get_server_driver(cls):
if not cls.SERVER_INSTANCE:
cls.SERVER_INSTANCE = stevedore_driver.DriverManager(
namespace=UDP_SERVER_NAMESPACE,
name=CONF.amphora_agent.amphora_udp_driver,
invoke_on_load=True,
).driver
return cls.SERVER_INSTANCE
def get_subscribed_amp_compile_info(self):
return self._SUBSCRIBED_AMP_COMPILE
@abc.abstractmethod
def upload_udp_listener_config(self, listener_id):
"""Upload the configuration for UDP.
def upload_lvs_listener_config(self, listener_id):
"""Upload the configuration for LVS.
:param listener_id: The id of a UDP Listener
:param listener_id: The id of a LVS Listener
:returns: HTTP response with status code.
:raises Exception: If any file / directory is not found or
@ -56,10 +43,10 @@ class UdpListenerApiServerBase(object, metaclass=abc.ABCMeta):
"""
@abc.abstractmethod
def get_udp_listener_config(self, listener_id):
"""Gets the UDP Listener configuration details
def get_lvs_listener_config(self, listener_id):
"""Gets the LVS Listener configuration details
:param listener_id: the id of the UDP Listener
:param listener_id: the id of the LVS Listener
:returns: HTTP response with status code.
:raises Exception: If the listener is failed to find.
@ -67,10 +54,10 @@ class UdpListenerApiServerBase(object, metaclass=abc.ABCMeta):
"""
@abc.abstractmethod
def manage_udp_listener(self, listener_id, action):
"""Gets the UDP Listener configuration details
def manage_lvs_listener(self, listener_id, action):
"""Gets the LVS Listener configuration details
:param listener_id: the id of the UDP Listener
:param listener_id: the id of the LVS Listener
:param action: the operation type.
:returns: HTTP response with status code.
@ -79,21 +66,21 @@ class UdpListenerApiServerBase(object, metaclass=abc.ABCMeta):
"""
@abc.abstractmethod
def get_all_udp_listeners_status(self):
"""Gets the status of all UDP Listeners
def get_all_lvs_listeners_status(self):
"""Gets the status of all LVS Listeners
This method will not consult the stats socket
so a listener might show as ACTIVE but still be
in ERROR
:returns: a list of UDP Listener status
:returns: a list of LVS Listener status
:raises Exception: If the listener pid located directory is not exist
"""
@abc.abstractmethod
def delete_udp_listener(self, listener_id):
"""Delete a UDP Listener from a amphora
def delete_lvs_listener(self, listener_id):
"""Delete a LVS Listener from a amphora
:param listener_id: The id of the listener

View File

@ -25,11 +25,13 @@ from octavia.amphorae.backends.agent import api_server
from octavia.amphorae.backends.agent.api_server import amphora_info
from octavia.amphorae.backends.agent.api_server import certificate_update
from octavia.amphorae.backends.agent.api_server import keepalived
from octavia.amphorae.backends.agent.api_server import keepalivedlvs
from octavia.amphorae.backends.agent.api_server import loadbalancer
from octavia.amphorae.backends.agent.api_server import osutils
from octavia.amphorae.backends.agent.api_server import plug
from octavia.amphorae.backends.agent.api_server import udp_listener_base
from octavia.amphorae.backends.agent.api_server import util
from octavia.common import constants as consts
BUFFER = 1024
CONF = cfg.CONF
@ -56,8 +58,7 @@ class Server(object):
self._osutils = osutils.BaseOS.get_os_util()
self._keepalived = keepalived.Keepalived()
self._loadbalancer = loadbalancer.Loadbalancer()
self._udp_listener = (udp_listener_base.UdpListenerApiServerBase.
get_server_driver())
self._lvs_listener = keepalivedlvs.KeepalivedLvs()
self._plug = plug.Plug(self._osutils)
self._amphora_info = amphora_info.AmphoraInfo(self._osutils)
@ -69,18 +70,22 @@ class Server(object):
'/loadbalancer/<amphora_id>/<lb_id>/haproxy',
view_func=self.upload_haproxy_config,
methods=['PUT'])
# TODO(gthiemonge) rename 'udp_listener' endpoint to 'lvs_listener'
# when api_version is bumped
self.app.add_url_rule(rule=PATH_PREFIX +
'/listeners/<amphora_id>/<listener_id>'
'/udp_listener',
view_func=self.upload_udp_listener_config,
view_func=self.upload_lvs_listener_config,
methods=['PUT'])
self.app.add_url_rule(rule=PATH_PREFIX +
'/loadbalancer/<lb_id>/haproxy',
view_func=self.get_haproxy_config,
methods=['GET'])
# TODO(gthiemonge) rename 'udp_listener' endpoint to 'lvs_listener'
# when api_version is bumped
self.app.add_url_rule(rule=PATH_PREFIX +
'/listeners/<listener_id>/udp_listener',
view_func=self.get_udp_listener_config,
view_func=self.get_lvs_listener_config,
methods=['GET'])
self.app.add_url_rule(rule=PATH_PREFIX +
'/loadbalancer/<object_id>/<action>',
@ -134,40 +139,40 @@ class Server(object):
def upload_haproxy_config(self, amphora_id, lb_id):
return self._loadbalancer.upload_haproxy_config(amphora_id, lb_id)
def upload_udp_listener_config(self, amphora_id, listener_id):
return self._udp_listener.upload_udp_listener_config(listener_id)
def upload_lvs_listener_config(self, amphora_id, listener_id):
return self._lvs_listener.upload_lvs_listener_config(listener_id)
def get_haproxy_config(self, lb_id):
return self._loadbalancer.get_haproxy_config(lb_id)
def get_udp_listener_config(self, listener_id):
return self._udp_listener.get_udp_listener_config(listener_id)
def get_lvs_listener_config(self, listener_id):
return self._lvs_listener.get_lvs_listener_config(listener_id)
def start_stop_lb_object(self, object_id, action):
protocol = util.get_protocol_for_lb_object(object_id)
if protocol == 'UDP':
return self._udp_listener.manage_udp_listener(
backend = util.get_backend_for_lb_object(object_id)
if backend == consts.LVS_BACKEND:
return self._lvs_listener.manage_lvs_listener(
listener_id=object_id, action=action)
return self._loadbalancer.start_stop_lb(lb_id=object_id, action=action)
def delete_lb_object(self, object_id):
protocol = util.get_protocol_for_lb_object(object_id)
if protocol == 'UDP':
return self._udp_listener.delete_udp_listener(object_id)
backend = util.get_backend_for_lb_object(object_id)
if backend == consts.LVS_BACKEND:
return self._lvs_listener.delete_lvs_listener(object_id)
return self._loadbalancer.delete_lb(object_id)
def get_details(self):
return self._amphora_info.compile_amphora_details(
extend_udp_driver=self._udp_listener)
extend_lvs_driver=self._lvs_listener)
def get_info(self):
return self._amphora_info.compile_amphora_info(
extend_udp_driver=self._udp_listener)
extend_lvs_driver=self._lvs_listener)
def get_all_listeners_status(self):
udp_listeners = self._udp_listener.get_all_udp_listeners_status()
lvs_listeners = self._lvs_listener.get_all_lvs_listeners_status()
return self._loadbalancer.get_all_listeners_status(
other_listeners=udp_listeners)
other_listeners=lvs_listeners)
def upload_certificate(self, lb_id, filename):
return self._loadbalancer.upload_certificate(lb_id, filename)

View File

@ -33,8 +33,8 @@ up route add -net {{ hr.network }} gw {{ hr.gw }} dev {{ interface }}
down route del -net {{ hr.network }} gw {{ hr.gw }} dev {{ interface }}
{%- endif %}
{%- endfor %}
post-up /usr/local/bin/udp-masquerade.sh add {{ 'ipv6' if ipv6 else 'ipv4' }} {{ interface }}
post-down /usr/local/bin/udp-masquerade.sh delete {{ 'ipv6' if ipv6 else 'ipv4' }} {{ interface }}
post-up /usr/local/bin/lvs-masquerade.sh add {{ 'ipv6' if ipv6 else 'ipv4' }} {{ interface }}
post-down /usr/local/bin/lvs-masquerade.sh delete {{ 'ipv6' if ipv6 else 'ipv4' }} {{ interface }}
{%- else %}
iface {{ interface }} inet dhcp
auto {{ interface }}:0

View File

@ -78,5 +78,5 @@ post-up /sbin/ip {{ '-6 ' if vip_ipv6 }}rule add from {{ vip }}/{{ '128' if vip_
post-down /sbin/ip {{ '-6 ' if vip_ipv6 }}rule del from {{ vip }}/{{ '128' if vip_ipv6 else '32' }} table 1 priority 100
{%- endif %}
post-up /usr/local/bin/udp-masquerade.sh add {{ 'ipv6' if vip_ipv6 else 'ipv4' }} {{ interface }}
post-down /usr/local/bin/udp-masquerade.sh delete {{ 'ipv6' if vip_ipv6 else 'ipv4' }} {{ interface }}
post-up /usr/local/bin/lvs-masquerade.sh add {{ 'ipv6' if vip_ipv6 else 'ipv4' }} {{ interface }}
post-down /usr/local/bin/lvs-masquerade.sh delete {{ 'ipv6' if vip_ipv6 else 'ipv4' }} {{ interface }}

View File

@ -14,6 +14,6 @@
#!/bin/bash
if [[ "$1" != "lo" ]]
then
/usr/local/bin/udp-masquerade.sh delete ipv4 $1
/usr/local/bin/udp-masquerade.sh delete ipv6 $1
/usr/local/bin/lvs-masquerade.sh delete ipv4 $1
/usr/local/bin/lvs-masquerade.sh delete ipv6 $1
fi

View File

@ -14,6 +14,6 @@
#!/bin/bash
if [[ "$1" != "lo" ]]
then
/usr/local/bin/udp-masquerade.sh add ipv4 $1
/usr/local/bin/udp-masquerade.sh add ipv6 $1
/usr/local/bin/lvs-masquerade.sh add ipv4 $1
/usr/local/bin/lvs-masquerade.sh add ipv6 $1
fi

View File

@ -204,7 +204,7 @@ def is_lb_running(lb_id):
os.path.join('/proc', get_haproxy_pid(lb_id)))
def get_udp_listeners():
def get_lvs_listeners():
result = []
if os.path.exists(keepalived_lvs_dir()):
for f in os.listdir(keepalived_lvs_dir()):
@ -216,7 +216,7 @@ def get_udp_listeners():
return result
def is_udp_listener_running(listener_id):
def is_lvs_listener_running(listener_id):
pid_file = keepalived_lvs_pids_path(listener_id)[0]
return os.path.exists(pid_file) and os.path.exists(
os.path.join('/proc', get_keepalivedlvs_pid(listener_id)))
@ -275,20 +275,20 @@ def run_systemctl_command(command, service):
'err': e, 'out': e.output})
def get_protocol_for_lb_object(object_id):
"""Returns the L4 protocol for a listener.
def get_backend_for_lb_object(object_id):
"""Returns the backend for a listener.
If the listener is a TCP based listener (haproxy) return TCP.
If the listener is a UDP based listener (lvs) return UDP.
If the listener is a TCP based listener return 'HAPROXY'.
If the listener is a UDP or SCTP based listener return 'LVS'
If the listener is not identifiable, return None.
:param listener_id: The ID of the listener to identify.
:returns: TCP, UDP, or None
:returns: HAPROXY_BACKEND, LVS_BACKEND or None
"""
if os.path.exists(config_path(object_id)):
return consts.PROTOCOL_TCP
return consts.HAPROXY_BACKEND
if os.path.exists(keepalived_lvs_cfg_path(object_id)):
return consts.PROTOCOL_UDP
return consts.LVS_BACKEND
return None
@ -341,10 +341,10 @@ def vrrp_check_script_update(lb_id, action):
os.makedirs(keepalived_check_scripts_dir(), exist_ok=True)
lb_ids = get_loadbalancers()
udp_ids = get_udp_listeners()
lvs_ids = get_lvs_listeners()
# If no LBs are found, so make sure keepalived thinks haproxy is down.
if not lb_ids:
if not udp_ids:
if not lvs_ids:
with open(haproxy_check_script_path(), 'w') as text_file:
text_file.write('exit 1')
else:

View File

@ -241,18 +241,19 @@ def build_stats_message():
"members": pool['members']}
# UDP listener part
udp_listener_ids = util.get_udp_listeners()
if udp_listener_ids:
listeners_stats = keepalivedlvs_query.get_udp_listeners_stats()
lvs_listener_ids = util.get_lvs_listeners()
if lvs_listener_ids:
listeners_stats = keepalivedlvs_query.get_lvs_listeners_stats()
if listeners_stats:
for listener_id, listener_stats in listeners_stats.items():
delta_values = calculate_stats_deltas(
listener_id, listener_stats['stats'])
pool_status = keepalivedlvs_query.get_udp_listener_pool_status(
listener_id)
udp_listener_dict = dict()
udp_listener_dict['status'] = listener_stats['status']
udp_listener_dict['stats'] = {
pool_status = (
keepalivedlvs_query.get_lvs_listener_pool_status(
listener_id))
lvs_listener_dict = dict()
lvs_listener_dict['status'] = listener_stats['status']
lvs_listener_dict['stats'] = {
'tx': delta_values['bout'],
'rx': delta_values['bin'],
'conns': listener_stats['stats']['scur'],
@ -265,6 +266,6 @@ def build_stats_message():
"status": pool_status['lvs']['status'],
"members": pool_status['lvs']['members']
}
msg['listeners'][listener_id] = udp_listener_dict
msg['listeners'][listener_id] = lvs_listener_dict
persist_counters()
return msg

View File

@ -15,6 +15,7 @@ import os
import re
import subprocess
from octavia_lib.common import constants as lib_consts
from oslo_log import log as logging
from octavia.amphorae.backends.agent.api_server import util
@ -91,10 +92,12 @@ def get_listener_realserver_mapping(ns_name, listener_ip_port,
if 'RemoteAddress:Port' in line:
result_keys = re.split(r'\s+',
LVS_KEY_REGEX.findall(line)[0].strip())
elif line.startswith('UDP') and find_target_block:
elif ((line.startswith(constants.PROTOCOL_UDP) or
line.startswith(lib_consts.PROTOCOL_SCTP)) and
find_target_block):
break
elif line.startswith('UDP') and re.match(r'^UDP\s+%s\s+\w+' % idex,
line):
elif re.match(r'^(UDP|SCTP)\s+%s\s+\w+' % idex,
line):
find_target_block = True
elif find_target_block and line:
rs_is_ipv4 = True
@ -134,7 +137,7 @@ def get_listener_realserver_mapping(ns_name, listener_ip_port,
return find_target_block, actual_member_result
def get_udp_listener_resource_ipports_nsname(listener_id):
def get_lvs_listener_resource_ipports_nsname(listener_id):
# resource_ipport_mapping = {'Listener': {'id': listener-id,
# 'ipport': ipport},
# 'Pool': {'id': pool-id},
@ -162,7 +165,7 @@ def get_udp_listener_resource_ipports_nsname(listener_id):
if not listener_ip_port:
# If not get listener_ip_port from the lvs config file,
# that means the udp listener's default pool have no enabled member
# that means the listener's default pool have no enabled member
# yet. But at this moment, we can get listener_id and ns_name, so
# for this function, we will just return ns_name
return resource_ipport_mapping, ns_name
@ -225,9 +228,9 @@ def get_udp_listener_resource_ipports_nsname(listener_id):
return resource_ipport_mapping, ns_name
def get_udp_listener_pool_status(listener_id):
def get_lvs_listener_pool_status(listener_id):
(resource_ipport_mapping,
ns_name) = get_udp_listener_resource_ipports_nsname(listener_id)
ns_name) = get_lvs_listener_resource_ipports_nsname(listener_id)
if 'Pool' not in resource_ipport_mapping:
return {}
if 'Members' not in resource_ipport_mapping:
@ -340,7 +343,8 @@ def get_ipvsadm_info(ns_name, is_stats_cmd=False):
fields.extend(split_line(output[line_num]))
fields.extend(temp_fields)
# here we get the all fields
elif constants.PROTOCOL_UDP in output[line_num]:
elif (constants.PROTOCOL_UDP in output[line_num] or
lib_consts.PROTOCOL_SCTP in output[line_num]):
# if UDP/TCP in this line, we can know this line is
# VS configuration.
vs_values = split_line(output[line_num])
@ -372,11 +376,11 @@ def get_ipvsadm_info(ns_name, is_stats_cmd=False):
return value_mapping
def get_udp_listeners_stats():
udp_listener_ids = util.get_udp_listeners()
def get_lvs_listeners_stats():
lvs_listener_ids = util.get_lvs_listeners()
need_check_listener_ids = [
listener_id for listener_id in udp_listener_ids
if util.is_udp_listener_running(listener_id)]
listener_id for listener_id in lvs_listener_ids
if util.is_lvs_listener_running(listener_id)]
ipport_mapping = dict()
listener_stats_res = dict()
for check_listener_id in need_check_listener_ids:
@ -388,8 +392,8 @@ def get_udp_listeners_stats():
# {'id': member-id-2,
# 'ipport': ipport}],
# 'HealthMonitor': {'id': healthmonitor-id}}
(resource_ipport_mapping,
ns_name) = get_udp_listener_resource_ipports_nsname(check_listener_id)
resource_ipport_mapping, ns_name = (
get_lvs_listener_resource_ipports_nsname(check_listener_id))
# Listener is disabled, we don't need to send an update
if resource_ipport_mapping is None:

View File

@ -75,7 +75,7 @@ class HaproxyAmphoraLoadBalancerDriver(
base_crt_dir=CONF.haproxy_amphora.base_cert_dir,
haproxy_template=CONF.haproxy_amphora.haproxy_template,
connection_logging=CONF.haproxy_amphora.connection_logging)
self.udp_jinja = jinja_udp_cfg.LvsJinjaTemplater()
self.lvs_jinja = jinja_udp_cfg.LvsJinjaTemplater()
def _get_haproxy_versions(self, amphora):
"""Get major and minor version number from haproxy
@ -154,9 +154,9 @@ class HaproxyAmphoraLoadBalancerDriver(
for listener in loadbalancer.listeners:
LOG.debug("%s updating listener %s on amphora %s",
self.__class__.__name__, listener.id, amphora.id)
if listener.protocol == 'UDP':
if listener.protocol in consts.LVS_PROTOCOLS:
# Generate Keepalived LVS configuration from listener object
config = self.udp_jinja.build_config(listener=listener)
config = self.lvs_jinja.build_config(listener=listener)
self.clients[amphora.api_version].upload_udp_config(
amphora, listener.id, config, timeout_dict=timeout_dict)
self.clients[amphora.api_version].reload_listener(
@ -240,7 +240,7 @@ class HaproxyAmphoraLoadBalancerDriver(
if amp.status != consts.DELETED:
# Generate Keepalived LVS configuration from listener object
self._populate_amphora_api_version(amp)
config = self.udp_jinja.build_config(listener=listener)
config = self.lvs_jinja.build_config(listener=listener)
self.clients[amp.api_version].upload_udp_config(
amp, listener.id, config)
self.clients[amp.api_version].reload_listener(
@ -282,7 +282,7 @@ class HaproxyAmphoraLoadBalancerDriver(
'process mode.', amp.id, loadbalancer.id)
has_tcp = False
for listener in loadbalancer.listeners:
if listener.protocol == consts.PROTOCOL_UDP:
if listener.protocol in consts.LVS_PROTOCOLS:
getattr(self.clients[amp.api_version], func_name)(
amp, listener.id, *args)
else:
@ -298,10 +298,10 @@ class HaproxyAmphoraLoadBalancerDriver(
self._apply('start_listener', loadbalancer, amphora, timeout_dict)
def delete(self, listener):
# Delete any UDP listeners the old way (we didn't update the way they
# are configured)
# Delete any UDP/SCTP listeners the old way (we didn't update the way
# they are configured)
loadbalancer = listener.load_balancer
if listener.protocol == consts.PROTOCOL_UDP:
if listener.protocol in consts.LVS_PROTOCOLS:
for amp in loadbalancer.amphorae:
if amp.status != consts.DELETED:
self._populate_amphora_api_version(amp)
@ -309,7 +309,7 @@ class HaproxyAmphoraLoadBalancerDriver(
amp, listener.id)
return
# In case the listener is not UDP, things get more complicated.
# In case the listener is not UDP or SCTP, things get more complicated.
# We need to do this individually for each amphora in case some are
# using split config and others are using combined config.
for amp in loadbalancer.amphorae:
@ -353,11 +353,11 @@ class HaproxyAmphoraLoadBalancerDriver(
amphora, listener.load_balancer.id,
'{id}.pem'.format(id=cert_id))
# See how many non-UDP listeners we have left
non_udp_listener_count = len([
# See how many non-UDP/SCTP listeners we have left
non_lvs_listener_count = len([
1 for li in listener.load_balancer.listeners
if li.protocol != consts.PROTOCOL_UDP])
if non_udp_listener_count > 0:
if li.protocol not in consts.LVS_PROTOCOLS])
if non_lvs_listener_count > 0:
# We have other listeners, so just update is fine.
# TODO(rm_work): This is a little inefficient since this duplicates
# a lot of the detection logic that has already been done, but it
@ -980,6 +980,7 @@ class AmphoraAPIClient1_0(AmphoraAPIClientBase):
timeout_dict=timeout_dict)
return exc.check_exception(r, log_error=log_error).json()
# The function is used for all LVS-supported protocol listener (UDP, SCTP)
def upload_udp_config(self, amp, listener_id, config, timeout_dict=None):
r = self.put(
amp,

View File

@ -52,6 +52,7 @@ AMPHORA_SUPPORTED_PROTOCOLS = [
lib_consts.PROTOCOL_PROXY,
lib_consts.PROTOCOL_PROXYV2,
lib_consts.PROTOCOL_UDP,
lib_consts.PROTOCOL_SCTP,
]
@ -289,8 +290,8 @@ class AmphoraProviderDriver(driver_base.ProviderDriver):
LOG.info("Member batch update is a noop, returning early.")
def _validate_members(self, db_pool, members):
if db_pool.protocol == consts.PROTOCOL_UDP:
# For UDP LBs, check that we are not mixing IPv4 and IPv6
if db_pool.protocol in consts.LVS_PROTOCOLS:
# For SCTP/UDP LBs, check that we are not mixing IPv4 and IPv6
for member in members:
member_is_ipv6 = utils.is_ipv6(member.address)
@ -300,8 +301,8 @@ class AmphoraProviderDriver(driver_base.ProviderDriver):
if member_is_ipv6 != vip_is_ipv6:
msg = ("This provider doesn't support mixing IPv4 and "
"IPv6 addresses for its VIP and members in UDP "
"load balancers.")
"IPv6 addresses for its VIP and members in {} "
"load balancers.".format(db_pool.protocol))
raise exceptions.UnsupportedOptionError(
user_fault_string=msg,
operator_fault_string=msg)

View File

@ -53,6 +53,7 @@ AMPHORA_SUPPORTED_PROTOCOLS = [
lib_consts.PROTOCOL_PROXY,
lib_consts.PROTOCOL_PROXYV2,
lib_consts.PROTOCOL_UDP,
lib_consts.PROTOCOL_SCTP,
]
@ -313,8 +314,8 @@ class AmphoraProviderDriver(driver_base.ProviderDriver):
LOG.info("Member batch update is a noop, returning early.")
def _validate_members(self, db_pool, members):
if db_pool.protocol == consts.PROTOCOL_UDP:
# For UDP LBs, check that we are not mixing IPv4 and IPv6
if db_pool.protocol in consts.LVS_PROTOCOLS:
# For SCTP/UDP LBs, check that we are not mixing IPv4 and IPv6
for member in members:
member_is_ipv6 = utils.is_ipv6(member.address)
@ -324,8 +325,8 @@ class AmphoraProviderDriver(driver_base.ProviderDriver):
if member_is_ipv6 != vip_is_ipv6:
msg = ("This provider doesn't support mixing IPv4 and "
"IPv6 addresses for its VIP and members in UDP "
"load balancers.")
"IPv6 addresses for its VIP and members in {} "
"load balancers.".format(db_pool.protocol))
raise exceptions.UnsupportedOptionError(
user_fault_string=msg,
operator_fault_string=msg)

View File

@ -0,0 +1,268 @@
# Copyright 2020 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import socket
import struct
import sys
import time
import random
# Adapted from https://opendev.org/openstack/os-ken/src/branch/
# master/os_ken/lib/packet/sctp.py
def crc32c(data):
# from RFC 3309
crc_c = [
0x00000000, 0xF26B8303, 0xE13B70F7, 0x1350F3F4,
0xC79A971F, 0x35F1141C, 0x26A1E7E8, 0xD4CA64EB,
0x8AD958CF, 0x78B2DBCC, 0x6BE22838, 0x9989AB3B,
0x4D43CFD0, 0xBF284CD3, 0xAC78BF27, 0x5E133C24,
0x105EC76F, 0xE235446C, 0xF165B798, 0x030E349B,
0xD7C45070, 0x25AFD373, 0x36FF2087, 0xC494A384,
0x9A879FA0, 0x68EC1CA3, 0x7BBCEF57, 0x89D76C54,
0x5D1D08BF, 0xAF768BBC, 0xBC267848, 0x4E4DFB4B,
0x20BD8EDE, 0xD2D60DDD, 0xC186FE29, 0x33ED7D2A,
0xE72719C1, 0x154C9AC2, 0x061C6936, 0xF477EA35,
0xAA64D611, 0x580F5512, 0x4B5FA6E6, 0xB93425E5,
0x6DFE410E, 0x9F95C20D, 0x8CC531F9, 0x7EAEB2FA,
0x30E349B1, 0xC288CAB2, 0xD1D83946, 0x23B3BA45,
0xF779DEAE, 0x05125DAD, 0x1642AE59, 0xE4292D5A,
0xBA3A117E, 0x4851927D, 0x5B016189, 0xA96AE28A,
0x7DA08661, 0x8FCB0562, 0x9C9BF696, 0x6EF07595,
0x417B1DBC, 0xB3109EBF, 0xA0406D4B, 0x522BEE48,
0x86E18AA3, 0x748A09A0, 0x67DAFA54, 0x95B17957,
0xCBA24573, 0x39C9C670, 0x2A993584, 0xD8F2B687,
0x0C38D26C, 0xFE53516F, 0xED03A29B, 0x1F682198,
0x5125DAD3, 0xA34E59D0, 0xB01EAA24, 0x42752927,
0x96BF4DCC, 0x64D4CECF, 0x77843D3B, 0x85EFBE38,
0xDBFC821C, 0x2997011F, 0x3AC7F2EB, 0xC8AC71E8,
0x1C661503, 0xEE0D9600, 0xFD5D65F4, 0x0F36E6F7,
0x61C69362, 0x93AD1061, 0x80FDE395, 0x72966096,
0xA65C047D, 0x5437877E, 0x4767748A, 0xB50CF789,
0xEB1FCBAD, 0x197448AE, 0x0A24BB5A, 0xF84F3859,
0x2C855CB2, 0xDEEEDFB1, 0xCDBE2C45, 0x3FD5AF46,
0x7198540D, 0x83F3D70E, 0x90A324FA, 0x62C8A7F9,
0xB602C312, 0x44694011, 0x5739B3E5, 0xA55230E6,
0xFB410CC2, 0x092A8FC1, 0x1A7A7C35, 0xE811FF36,
0x3CDB9BDD, 0xCEB018DE, 0xDDE0EB2A, 0x2F8B6829,
0x82F63B78, 0x709DB87B, 0x63CD4B8F, 0x91A6C88C,
0x456CAC67, 0xB7072F64, 0xA457DC90, 0x563C5F93,
0x082F63B7, 0xFA44E0B4, 0xE9141340, 0x1B7F9043,
0xCFB5F4A8, 0x3DDE77AB, 0x2E8E845F, 0xDCE5075C,
0x92A8FC17, 0x60C37F14, 0x73938CE0, 0x81F80FE3,
0x55326B08, 0xA759E80B, 0xB4091BFF, 0x466298FC,
0x1871A4D8, 0xEA1A27DB, 0xF94AD42F, 0x0B21572C,
0xDFEB33C7, 0x2D80B0C4, 0x3ED04330, 0xCCBBC033,
0xA24BB5A6, 0x502036A5, 0x4370C551, 0xB11B4652,
0x65D122B9, 0x97BAA1BA, 0x84EA524E, 0x7681D14D,
0x2892ED69, 0xDAF96E6A, 0xC9A99D9E, 0x3BC21E9D,
0xEF087A76, 0x1D63F975, 0x0E330A81, 0xFC588982,
0xB21572C9, 0x407EF1CA, 0x532E023E, 0xA145813D,
0x758FE5D6, 0x87E466D5, 0x94B49521, 0x66DF1622,
0x38CC2A06, 0xCAA7A905, 0xD9F75AF1, 0x2B9CD9F2,
0xFF56BD19, 0x0D3D3E1A, 0x1E6DCDEE, 0xEC064EED,
0xC38D26C4, 0x31E6A5C7, 0x22B65633, 0xD0DDD530,
0x0417B1DB, 0xF67C32D8, 0xE52CC12C, 0x1747422F,
0x49547E0B, 0xBB3FFD08, 0xA86F0EFC, 0x5A048DFF,
0x8ECEE914, 0x7CA56A17, 0x6FF599E3, 0x9D9E1AE0,
0xD3D3E1AB, 0x21B862A8, 0x32E8915C, 0xC083125F,
0x144976B4, 0xE622F5B7, 0xF5720643, 0x07198540,
0x590AB964, 0xAB613A67, 0xB831C993, 0x4A5A4A90,
0x9E902E7B, 0x6CFBAD78, 0x7FAB5E8C, 0x8DC0DD8F,
0xE330A81A, 0x115B2B19, 0x020BD8ED, 0xF0605BEE,
0x24AA3F05, 0xD6C1BC06, 0xC5914FF2, 0x37FACCF1,
0x69E9F0D5, 0x9B8273D6, 0x88D28022, 0x7AB90321,
0xAE7367CA, 0x5C18E4C9, 0x4F48173D, 0xBD23943E,
0xF36E6F75, 0x0105EC76, 0x12551F82, 0xE03E9C81,
0x34F4F86A, 0xC69F7B69, 0xD5CF889D, 0x27A40B9E,
0x79B737BA, 0x8BDCB4B9, 0x988C474D, 0x6AE7C44E,
0xBE2DA0A5, 0x4C4623A6, 0x5F16D052, 0xAD7D5351,
]
crc32 = 0xffffffff
for c in data:
crc32 = (crc32 >> 8) ^ crc_c[(crc32 ^ (c)) & 0xFF]
crc32 = (~crc32) & 0xffffffff
return struct.unpack(">I", struct.pack("<I", crc32))[0]
def _sctp_build_init_packet(src_port, dest_port, tag):
sctp_hdr = [
# HEADER
struct.pack('!H', src_port), # Source port number
struct.pack('!H', dest_port), # Destination port number
struct.pack('!L', 0), # Verification tag
struct.pack('!L', 0), # checksum
# INIT Chunk
struct.pack('!B', 1), # Type
struct.pack('!B', 0), # Chunk flag
struct.pack('!H', 0), # Chunk length
struct.pack('!L', tag), # Tag
struct.pack('!L', 106496), # a_rwnd
struct.pack('!H', 10), # Number of outbound stream
struct.pack('!H', 65535), # Number of inbound stream
struct.pack('!L',
random.randint(
1, 4294967295)), # Initial TSN
]
data = bytearray(b''.join(sctp_hdr))
chunk_len = len(data) - 12
struct.pack_into('!H', data, 14, chunk_len)
checksum = crc32c(data)
struct.pack_into('!L', data, 8, checksum)
return data
def _sctp_build_abort_packet(src_port, dest_port, verification_tag):
sctp_hdr = [
# HEADER
struct.pack('!H', src_port), # Source port number
struct.pack('!H', dest_port), # Destination port number
struct.pack('!L', verification_tag), # Verification tag
struct.pack('!L', 0), # checksum
# ABORT Chunk
struct.pack('!B', 6), # Type
struct.pack('!B', 1), # Chunk flag
struct.pack('!H', 4), # Chunk length
]
data = bytearray(b''.join(sctp_hdr))
checksum = crc32c(data)
struct.pack_into('!L', data, 8, checksum)
return data
def _sctp_decode_packet(data, family, expected_tag):
# AF_INET packets contain ipv4 header
if family == socket.AF_INET:
hdr_offset = (data[0] & 0xf) << 2
else:
hdr_offset = 0
if len(data) - hdr_offset < 16:
return False
# Check if the packet is a reply to our INIT packet
verification_tag = struct.unpack_from('!L', data, hdr_offset + 4)[0]
if verification_tag != expected_tag:
return False
response_type = data[hdr_offset + 12]
return response_type
def sctp_health_check(ip_address, port, timeout=2):
family = socket.AF_INET6 if ':' in ip_address else socket.AF_INET
s = socket.socket(family, socket.SOCK_RAW, socket.IPPROTO_SCTP)
s.settimeout(.2)
s.connect((ip_address, port))
tag = random.randint(1, 4294967295)
src_port = random.randint(1024, 65535)
data = _sctp_build_init_packet(src_port, port, tag)
print("Sending INIT packet to {}:{}".format(ip_address, port))
s.send(data)
start = time.time()
send_abort = False
while time.time() - start < timeout:
try:
buf, addr = s.recvfrom(1500)
except socket.timeout:
continue
response_type = _sctp_decode_packet(buf, family, tag)
if response_type is False:
continue
if response_type == 2: # INIT ACK
print("Received INIT ACK")
send_abort = True
ret = 0
elif response_type == 6: # ABORT
print("Received ABORT")
ret = 1
else: # Others: unknown error
print("Received {} Type chunk".format(response_type))
send_abort = True
ret = 3
break
else:
print("Timeout after {} seconds.".format(timeout))
# Timeout
ret = 2
if send_abort:
has_sctp_support = False
with open("/proc/net/protocols") as fp:
for line in fp:
if line.startswith('SCTP'):
has_sctp_support = True
# if SCTP support is not included in the kernel, closing the socket
# won't automatically send a ABORT packet, we need to craft it.
if not has_sctp_support:
data = _sctp_build_abort_packet(src_port, port, tag)
print("Sending ABORT packet")
s.send(data)
s.close()
return ret
def main():
default_timeout = 3
args = sys.argv[1:]
protocol = args.pop(0)
if args[0] == '-t':
args.pop(0)
default_timeout = int(args.pop(0))
if protocol.lower() == 'sctp':
destination = args[0]
port = int(args[1])
ret = sctp_health_check(destination, port, timeout=default_timeout)
sys.exit(ret)
else:
print("Unsupported protocol '{}'".format(protocol))
sys.exit(1)
if __name__ == '__main__':
main()

View File

@ -210,7 +210,11 @@ amphora_agent_opts = [
cfg.StrOpt('amphora_id', help=_("The amphora ID.")),
cfg.StrOpt('amphora_udp_driver',
default='keepalived_lvs',
help='The UDP API backend for amphora agent.'),
help='The UDP API backend for amphora agent.',
deprecated_for_removal=True,
deprecated_reason=_('amphora-agent will not support any other '
'backend than keepalived_lvs.'),
deprecated_since='Wallaby'),
]
compute_opts = [

View File

@ -807,6 +807,12 @@ TOPIC_AMPHORA_V2 = 'octavia_provisioning_v2'
HAPROXY_HTTP_PROTOCOLS = [lib_consts.PROTOCOL_HTTP,
lib_consts.PROTOCOL_TERMINATED_HTTPS]
LVS_PROTOCOLS = [PROTOCOL_UDP,
lib_consts.PROTOCOL_SCTP]
HAPROXY_BACKEND = 'HAPROXY'
LVS_BACKEND = 'LVS'
# Map each supported protocol to its L4 protocol
L4_PROTOCOL_MAP = {
PROTOCOL_TCP: PROTOCOL_TCP,

View File

@ -179,7 +179,7 @@ class JinjaTemplater(object):
"""
listener_transforms = []
for listener in listeners:
if listener.protocol == constants.PROTOCOL_UDP:
if listener.protocol in constants.LVS_PROTOCOLS:
continue
listener_transforms.append(self._transform_listener(
listener, tls_certs, feature_compatibility, loadbalancer))
@ -199,7 +199,7 @@ class JinjaTemplater(object):
# listeners' connection limits.
connection_limit_sum = 0
for listener in listeners:
if listener.protocol == constants.PROTOCOL_UDP:
if listener.protocol in constants.LVS_PROTOCOLS:
continue
if listener.connection_limit and listener.connection_limit > -1:
connection_limit_sum += listener.connection_limit

View File

@ -15,6 +15,7 @@
import os
import jinja2
from octavia_lib.common import constants as lib_consts
from octavia.common.config import cfg
from octavia.common import constants
@ -24,7 +25,8 @@ from octavia.common import utils as octavia_utils
CONF = cfg.CONF
PROTOCOL_MAP = {
constants.PROTOCOL_UDP: 'udp'
constants.PROTOCOL_UDP: 'udp',
lib_consts.PROTOCOL_SCTP: 'sctp'
}
BALANCE_MAP = {
@ -92,7 +94,8 @@ class LvsJinjaTemplater(object):
listener)
return self._get_template().render(
{'loadbalancer': loadbalancer},
constants=constants)
constants=constants,
lib_consts=lib_consts)
def _transform_loadbalancer(self, loadbalancer, listener):
"""Transforms a load balancer into an object that will
@ -186,9 +189,13 @@ class LvsJinjaTemplater(object):
'monitor_port': member.monitor_port
}
def _get_default_lvs_check_script_path(self):
return (CONF.haproxy_amphora.base_path +
'/lvs/check/' + CHECK_SCRIPT_NAME)
def _get_default_lvs_check_script_path(self, monitor_type):
if monitor_type == constants.HEALTH_MONITOR_UDP_CONNECT:
return (CONF.haproxy_amphora.base_path +
'/lvs/check/' + CHECK_SCRIPT_NAME)
if monitor_type == lib_consts.HEALTH_MONITOR_SCTP:
return "amphora-health-checker sctp"
return None
def _transform_health_monitor(self, monitor):
"""Transforms a health monitor into an object that will
@ -202,10 +209,8 @@ class LvsJinjaTemplater(object):
'timeout': monitor.timeout,
'enabled': monitor.enabled,
'fall_threshold': monitor.fall_threshold,
'check_script_path': (self._get_default_lvs_check_script_path()
if monitor.type ==
constants.HEALTH_MONITOR_UDP_CONNECT else
None)
'check_script_path': (
self._get_default_lvs_check_script_path(monitor.type))
}
if monitor.type == constants.HEALTH_MONITOR_HTTP:
return_val.update({

View File

@ -21,7 +21,8 @@ net_namespace {{ constants.AMPHORA_NAMESPACE }}
{% endblock global_definitions %}
{% block proxies %}
{% if loadbalancer.enabled and loadbalancer.listener.enabled %}
{{- virtualserver_macro(constants, loadbalancer.listener,
{{- virtualserver_macro(constants, lib_consts,
loadbalancer.listener,
loadbalancer.vip_address,
loadbalancer.listener.get('default_pool', None)) }}
{% endif %}

View File

@ -29,6 +29,13 @@ MISC_CHECK {
}
{%- endmacro -%}
{%- macro sctp_check_macro(pool, member, health_monitor) -%}
MISC_CHECK {
misc_path "amphora-health-checker sctp -t {{ pool.health_monitor.timeout }} {{ member.monitor_address|default(member.address, true) }} {{ member.monitor_port|default(member.protocol_port, true) }}"
misc_timeout {{ (pool.health_monitor.timeout + 1) }}
}
{%- endmacro -%}
{%- macro http_url_macro(health_monitor, health_monitor_status_code) %}
url {
path {{ health_monitor.url_path }}
@ -55,10 +62,12 @@ TCP_CHECK {
}
{%- endmacro -%}
{% macro health_monitor_rs_macro(constants, pool, member) %}
{% macro health_monitor_rs_macro(constants, lib_consts, pool, member) %}
{% if pool.health_monitor and pool.health_monitor.enabled %}
{% if pool.health_monitor.type == constants.HEALTH_MONITOR_UDP_CONNECT %}
{{ misc_check_macro(pool, member, pool.health_monitor) -}}
{% elif pool.health_monitor.type == lib_consts.HEALTH_MONITOR_SCTP %}
{{ sctp_check_macro(pool, member, pool.health_monitor) -}}
{% elif pool.health_monitor.type == constants.HEALTH_MONITOR_HTTP and pool.health_monitor.http_method == constants.HEALTH_MONITOR_HTTP_METHOD_GET %}
{{ http_get_macro(pool, member, pool.health_monitor) -}}
{% elif pool.health_monitor.type == constants.HEALTH_MONITOR_TCP %}
@ -67,7 +76,7 @@ TCP_CHECK {
{% endif %}
{% endmacro %}
{% macro realserver_macro(constants, pool, member, listener) %}
{% macro realserver_macro(constants, lib_consts, pool, member, listener) %}
{% if member.enabled %}
# Configuration for Member {{ member.id }}
real_server {{ member.address }} {{ member.protocol_port }} {
@ -75,7 +84,7 @@ TCP_CHECK {
{% if listener.connection_limit %}
uthreshold {{ listener.connection_limit }}
{% endif %}
{{- health_monitor_rs_macro(constants, pool, member) }}
{{- health_monitor_rs_macro(constants, lib_consts, pool, member) }}
}
{% else %}
# Member {{ member.id }} is disabled
@ -92,7 +101,7 @@ TCP_CHECK {
{% endif %}
{% endmacro %}
{% macro virtualserver_macro(constants, listener, lb_vip_address, default_pool) %}
{% macro virtualserver_macro(constants, lib_consts, listener, lb_vip_address, default_pool) %}
{% if default_pool %}
virtual_server {{ lb_vip_address }} {{ listener.protocol_port }} {
{{ lb_algo_macro(default_pool) }}
@ -114,7 +123,7 @@ virtual_server {{ lb_vip_address }} {{ listener.protocol_port }} {
{% endif %}
{{ health_monitor_vs_macro(default_pool) }}
{% if default_pool.protocol.lower() == "udp" %}
{% if default_pool.protocol.upper() in constants.LVS_PROTOCOLS %}
{% if default_pool.enabled %}
# Configuration for Pool {{ default_pool.id }}
{% else %}
@ -124,7 +133,7 @@ virtual_server {{ lb_vip_address }} {{ listener.protocol_port }} {
# Configuration for HealthMonitor {{ default_pool.health_monitor.id }}
{% endif %}
{% for member in default_pool.members %}
{{- realserver_macro(constants, default_pool, member, listener) }}
{{- realserver_macro(constants, lib_consts, default_pool, member, listener) }}
{% endfor %}
{% endif %}
}

View File

@ -197,7 +197,9 @@ class AllowedAddressPairsDriver(neutron_base.BaseNeutronDriver):
del_ports = set(old_ports) - set(updated_ports)
for rule in rules.get('security_group_rules', []):
if (rule.get('protocol', '') and
rule.get('protocol', '').lower() in ['tcp', 'udp'] and
rule.get('protocol', '').upper() in
[constants.PROTOCOL_TCP, constants.PROTOCOL_UDP,
lib_consts.PROTOCOL_SCTP] and
(rule.get('port_range_max'), rule.get('protocol'),
rule.get('remote_ip_prefix')) in del_ports):
rule_id = rule.get(constants.ID)

View File

@ -106,7 +106,7 @@ class KeepalivedLvsTestCase(base.TestCase):
self.app.add_url_rule(
rule=self.TEST_URL % ('<amphora_id>', '<listener_id>'),
view_func=(lambda amphora_id, listener_id:
self.test_keepalivedlvs.upload_udp_listener_config(
self.test_keepalivedlvs.upload_lvs_listener_config(
listener_id)),
methods=['PUT'])
@ -123,7 +123,7 @@ class KeepalivedLvsTestCase(base.TestCase):
@mock.patch('os.makedirs')
@mock.patch('os.remove')
@mock.patch('subprocess.check_output')
def test_upload_udp_listener_config_no_vrrp_check_dir(
def test_upload_lvs_listener_config_no_vrrp_check_dir(
self, m_check_output, m_os_rm, m_os_mkdir, m_exists, m_os_chmod,
m_os_sysinit, m_copy2, mock_netns, mock_install_netns,
mock_systemctl):
@ -172,7 +172,7 @@ class KeepalivedLvsTestCase(base.TestCase):
self.assertEqual(200, res.status_code)
@mock.patch('octavia.amphorae.backends.agent.api_server.util.'
'get_udp_listeners')
'get_lvs_listeners')
@mock.patch('octavia.amphorae.backends.agent.api_server.util.'
'get_loadbalancers')
@mock.patch('octavia.amphorae.backends.agent.api_server.util.'
@ -188,13 +188,13 @@ class KeepalivedLvsTestCase(base.TestCase):
@mock.patch('os.makedirs')
@mock.patch('os.remove')
@mock.patch('subprocess.check_output')
def test_upload_udp_listener_config_with_vrrp_check_dir(
def test_upload_lvs_listener_config_with_vrrp_check_dir(
self, m_check_output, m_os_rm, m_os_mkdir, m_exists, m_os_chmod,
m_os_sysinit, m_copy2, mock_netns, mock_install_netns,
mock_systemctl, mock_get_lbs, mock_get_udp_listeners):
mock_systemctl, mock_get_lbs, mock_get_lvs_listeners):
m_exists.side_effect = [False, False, True, True, False, False, False]
mock_get_lbs.return_value = []
mock_get_udp_listeners.return_value = [self.FAKE_ID]
mock_get_lvs_listeners.return_value = [self.FAKE_ID]
cfg_path = util.keepalived_lvs_cfg_path(self.FAKE_ID)
m = self.useFixture(test_utils.OpenFixture(cfg_path)).mock_open
@ -259,7 +259,7 @@ class KeepalivedLvsTestCase(base.TestCase):
@mock.patch('os.makedirs')
@mock.patch('os.remove')
@mock.patch('subprocess.check_output')
def test_upload_udp_listener_config_start_service_failure(
def test_upload_lvs_listener_config_start_service_failure(
self, m_check_output, m_os_rm, m_os_mkdir, m_exists, m_os_chmod,
m_os_sysinit, m_copy2, mock_install_netns, mock_systemctl):
m_exists.side_effect = [False, False, True, True, False]
@ -307,9 +307,9 @@ class KeepalivedLvsTestCase(base.TestCase):
@mock.patch('subprocess.check_output')
@mock.patch('octavia.amphorae.backends.agent.api_server.'
'keepalivedlvs.KeepalivedLvs.'
'_check_udp_listener_exists')
def test_manage_udp_listener(self, mock_udp_exist, mock_check_output):
res = self.test_keepalivedlvs.manage_udp_listener(self.FAKE_ID,
'_check_lvs_listener_exists')
def test_manage_lvs_listener(self, mock_lvs_exist, mock_check_output):
res = self.test_keepalivedlvs.manage_lvs_listener(self.FAKE_ID,
'start')
cmd = ("/usr/sbin/service octavia-keepalivedlvs-{listener_id}"
" {action}".format(listener_id=self.FAKE_ID, action='start'))
@ -317,19 +317,19 @@ class KeepalivedLvsTestCase(base.TestCase):
stderr=subprocess.STDOUT)
self.assertEqual(202, res.status_code)
res = self.test_keepalivedlvs.manage_udp_listener(self.FAKE_ID,
res = self.test_keepalivedlvs.manage_lvs_listener(self.FAKE_ID,
'restart')
self.assertEqual(400, res.status_code)
mock_check_output.side_effect = subprocess.CalledProcessError(1,
'blah!')
res = self.test_keepalivedlvs.manage_udp_listener(self.FAKE_ID,
res = self.test_keepalivedlvs.manage_lvs_listener(self.FAKE_ID,
'start')
self.assertEqual(500, res.status_code)
@mock.patch('octavia.amphorae.backends.agent.api_server.util.'
'get_udp_listeners', return_value=[LISTENER_ID])
'get_lvs_listeners', return_value=[LISTENER_ID])
@mock.patch('octavia.amphorae.backends.agent.api_server.util.'
'get_os_init_system', return_value=consts.INIT_SYSTEMD)
@mock.patch('octavia.amphorae.backends.agent.api_server.util.'
@ -337,10 +337,10 @@ class KeepalivedLvsTestCase(base.TestCase):
@mock.patch('subprocess.check_output')
@mock.patch('os.remove')
@mock.patch('os.path.exists')
def test_delete_udp_listener(self, m_exist, m_remove, m_check_output,
mget_pid, m_init_sys, mget_udp_listeners):
def test_delete_lvs_listener(self, m_exist, m_remove, m_check_output,
mget_pid, m_init_sys, mget_lvs_listeners):
m_exist.return_value = True
res = self.test_keepalivedlvs.delete_udp_listener(self.FAKE_ID)
res = self.test_keepalivedlvs.delete_lvs_listener(self.FAKE_ID)
cmd1 = ("/usr/sbin/service "
"octavia-keepalivedlvs-{0} stop".format(self.FAKE_ID))
@ -355,9 +355,9 @@ class KeepalivedLvsTestCase(base.TestCase):
@mock.patch.object(keepalivedlvs, "webob")
@mock.patch('os.path.exists')
def test_delete_udp_listener_not_exist(self, m_exist, m_webob):
def test_delete_lvs_listener_not_exist(self, m_exist, m_webob):
m_exist.return_value = False
self.test_keepalivedlvs.delete_udp_listener(self.FAKE_ID)
self.test_keepalivedlvs.delete_lvs_listener(self.FAKE_ID)
calls = [
mock.call(
json=dict(message='UDP Listener Not Found',
@ -371,12 +371,12 @@ class KeepalivedLvsTestCase(base.TestCase):
'get_keepalivedlvs_pid', return_value="12345")
@mock.patch('subprocess.check_output')
@mock.patch('os.path.exists')
def test_delete_udp_listener_stop_service_fail(self, m_exist,
def test_delete_lvs_listener_stop_service_fail(self, m_exist,
m_check_output, mget_pid):
m_exist.return_value = True
m_check_output.side_effect = subprocess.CalledProcessError(1,
'Woops!')
res = self.test_keepalivedlvs.delete_udp_listener(self.FAKE_ID)
res = self.test_keepalivedlvs.delete_lvs_listener(self.FAKE_ID)
self.assertEqual(500, res.status_code)
self.assertEqual({'message': 'Error stopping keepalivedlvs',
'details': None}, res.json)
@ -388,14 +388,15 @@ class KeepalivedLvsTestCase(base.TestCase):
@mock.patch('subprocess.check_output')
@mock.patch('os.remove')
@mock.patch('os.path.exists')
def test_delete_udp_listener_disable_service_fail(self, m_exist, m_remove,
m_check_output, mget_pid,
def test_delete_lvs_listener_disable_service_fail(self, m_exist, m_remove,
m_check_output,
mget_pid,
m_init_sys):
m_exist.return_value = True
m_check_output.side_effect = [True,
subprocess.CalledProcessError(
1, 'Woops!')]
res = self.test_keepalivedlvs.delete_udp_listener(self.FAKE_ID)
res = self.test_keepalivedlvs.delete_lvs_listener(self.FAKE_ID)
self.assertEqual(500, res.status_code)
self.assertEqual({
'message': 'Error disabling '
@ -409,10 +410,11 @@ class KeepalivedLvsTestCase(base.TestCase):
@mock.patch('subprocess.check_output')
@mock.patch('os.remove')
@mock.patch('os.path.exists')
def test_delete_udp_listener_unsupported_sysinit(self, m_exist, m_remove,
def test_delete_lvs_listener_unsupported_sysinit(self, m_exist, m_remove,
m_check_output, mget_pid,
m_init_sys):
m_exist.return_value = True
self.assertRaises(
util.UnknownInitError, self.test_keepalivedlvs.delete_udp_listener,
util.UnknownInitError,
self.test_keepalivedlvs.delete_lvs_listener,
self.FAKE_ID)

View File

@ -399,7 +399,7 @@ class TestServerTestCase(base.TestCase):
self._test_info(consts.CENTOS)
@mock.patch('octavia.amphorae.backends.agent.api_server.amphora_info.'
'AmphoraInfo._get_extend_body_from_udp_driver',
'AmphoraInfo._get_extend_body_from_lvs_driver',
return_value={})
@mock.patch('socket.gethostname')
@mock.patch('subprocess.check_output')
@ -422,7 +422,7 @@ class TestServerTestCase(base.TestCase):
jsonutils.loads(rv.data.decode('utf-8')))
@mock.patch('octavia.amphorae.backends.agent.api_server.util.'
'get_protocol_for_lb_object', return_value='TCP')
'get_backend_for_lb_object', return_value='HAPROXY')
@mock.patch('octavia.amphorae.backends.agent.api_server.util.'
'get_os_init_system', return_value=consts.INIT_SYSTEMD)
def test_delete_ubuntu_listener_systemd(self, mock_init_system,
@ -431,7 +431,7 @@ class TestServerTestCase(base.TestCase):
mock_init_system)
@mock.patch('octavia.amphorae.backends.agent.api_server.util.'
'get_protocol_for_lb_object', return_value='TCP')
'get_backend_for_lb_object', return_value='HAPROXY')
@mock.patch('octavia.amphorae.backends.agent.api_server.util.'
'get_os_init_system', return_value=consts.INIT_SYSTEMD)
def test_delete_centos_listener_systemd(self, mock_init_system,
@ -440,7 +440,7 @@ class TestServerTestCase(base.TestCase):
mock_init_system)
@mock.patch('octavia.amphorae.backends.agent.api_server.util.'
'get_protocol_for_lb_object', return_value='TCP')
'get_backend_for_lb_object', return_value='HAPROXY')
@mock.patch('octavia.amphorae.backends.agent.api_server.util.'
'get_os_init_system', return_value=consts.INIT_SYSVINIT)
def test_delete_ubuntu_listener_sysvinit(self, mock_init_system,
@ -449,7 +449,7 @@ class TestServerTestCase(base.TestCase):
mock_init_system)
@mock.patch('octavia.amphorae.backends.agent.api_server.util.'
'get_protocol_for_lb_object', return_value='TCP')
'get_backend_for_lb_object', return_value='HAPROXY')
@mock.patch('octavia.amphorae.backends.agent.api_server.util.'
'get_os_init_system', return_value=consts.INIT_UPSTART)
def test_delete_ubuntu_listener_upstart(self, mock_init_system,
@ -1178,9 +1178,9 @@ class TestServerTestCase(base.TestCase):
'address 10.0.0.5\nbroadcast 10.0.0.255\n'
'netmask 255.255.255.0\n'
'mtu 1450\n'
'post-up /usr/local/bin/udp-masquerade.sh add ipv4 '
'post-up /usr/local/bin/lvs-masquerade.sh add ipv4 '
'eth{int}\n'
'post-down /usr/local/bin/udp-masquerade.sh delete ipv4 '
'post-down /usr/local/bin/lvs-masquerade.sh delete ipv4 '
'eth{int}\n'.format(int=test_int_num))
elif distro == consts.CENTOS:
handle.write.assert_any_call(
@ -1253,9 +1253,9 @@ class TestServerTestCase(base.TestCase):
'address 2001:0db8:0000:0000:0000:0000:0000:0002\n'
'broadcast 2001:0db8:ffff:ffff:ffff:ffff:ffff:ffff\n'
'netmask 32\nmtu 1450\n'
'post-up /usr/local/bin/udp-masquerade.sh add ipv6 '
'post-up /usr/local/bin/lvs-masquerade.sh add ipv6 '
'eth{int}\n'
'post-down /usr/local/bin/udp-masquerade.sh delete ipv6 '
'post-down /usr/local/bin/lvs-masquerade.sh delete ipv6 '
'eth{int}\n'.format(int=test_int_num))
elif distro == consts.CENTOS:
handle.write.assert_any_call(
@ -1441,9 +1441,9 @@ class TestServerTestCase(base.TestCase):
' dev ' + consts.NETNS_PRIMARY_INTERFACE + '\n'
'down route del -host ' + DEST2 + ' gw ' + NEXTHOP +
' dev ' + consts.NETNS_PRIMARY_INTERFACE + '\n' +
'post-up /usr/local/bin/udp-masquerade.sh add ipv4 ' +
'post-up /usr/local/bin/lvs-masquerade.sh add ipv4 ' +
consts.NETNS_PRIMARY_INTERFACE + '\n' +
'post-down /usr/local/bin/udp-masquerade.sh delete ipv4 ' +
'post-down /usr/local/bin/lvs-masquerade.sh delete ipv4 ' +
consts.NETNS_PRIMARY_INTERFACE + '\n')
elif distro == consts.CENTOS:
handle.write.assert_any_call(
@ -1701,8 +1701,8 @@ class TestServerTestCase(base.TestCase):
'priority 100\n'
'post-down /sbin/ip rule del from 203.0.113.2/32 table 1 '
'priority 100\n\n'
'post-up /usr/local/bin/udp-masquerade.sh add ipv4 eth1\n'
'post-down /usr/local/bin/udp-masquerade.sh delete ipv4 '
'post-up /usr/local/bin/lvs-masquerade.sh add ipv4 eth1\n'
'post-down /usr/local/bin/lvs-masquerade.sh delete ipv4 '
'eth1'.format(netns_int=consts.NETNS_PRIMARY_INTERFACE))
elif distro == consts.CENTOS:
handle.write.assert_any_call(
@ -1806,8 +1806,8 @@ class TestServerTestCase(base.TestCase):
'priority 100\n'
'post-down /sbin/ip rule del from 203.0.113.2/32 table 1 '
'priority 100\n\n'
'post-up /usr/local/bin/udp-masquerade.sh add ipv4 eth1\n'
'post-down /usr/local/bin/udp-masquerade.sh delete ipv4 '
'post-up /usr/local/bin/lvs-masquerade.sh add ipv4 eth1\n'
'post-down /usr/local/bin/lvs-masquerade.sh delete ipv4 '
'eth1'.format(netns_int=consts.NETNS_PRIMARY_INTERFACE))
elif distro == consts.CENTOS:
handle.write.assert_any_call(
@ -2056,8 +2056,8 @@ class TestServerTestCase(base.TestCase):
'post-down /sbin/ip -6 rule del from '
'2001:0db8:0000:0000:0000:0000:0000:0002/128 table 1 '
'priority 100\n\n'
'post-up /usr/local/bin/udp-masquerade.sh add ipv6 eth1\n'
'post-down /usr/local/bin/udp-masquerade.sh delete ipv6 '
'post-up /usr/local/bin/lvs-masquerade.sh add ipv6 eth1\n'
'post-down /usr/local/bin/lvs-masquerade.sh delete ipv6 '
'eth1'.format(netns_int=consts.NETNS_PRIMARY_INTERFACE))
elif distro == consts.CENTOS:
handle.write.assert_any_call(
@ -2161,8 +2161,8 @@ class TestServerTestCase(base.TestCase):
'post-down /sbin/ip -6 rule del from '
'2001:0db8:0000:0000:0000:0000:0000:0002/128 table 1 '
'priority 100\n\n'
'post-up /usr/local/bin/udp-masquerade.sh add ipv6 eth1\n'
'post-down /usr/local/bin/udp-masquerade.sh delete ipv6 '
'post-up /usr/local/bin/lvs-masquerade.sh add ipv6 eth1\n'
'post-down /usr/local/bin/lvs-masquerade.sh delete ipv6 '
'eth1'.format(netns_int=consts.NETNS_PRIMARY_INTERFACE))
elif distro == consts.CENTOS:
handle.write.assert_any_call(
@ -2413,18 +2413,18 @@ class TestServerTestCase(base.TestCase):
self._test_details(consts.CENTOS)
@mock.patch('octavia.amphorae.backends.agent.api_server.util.'
'get_udp_listeners',
'get_lvs_listeners',
return_value=[])
@mock.patch('octavia.amphorae.backends.agent.api_server.'
'amphora_info.AmphoraInfo.'
'_get_extend_body_from_udp_driver',
'_get_extend_body_from_lvs_driver',
return_value={
"keepalived_version": '1.1.11-1',
"ipvsadm_version": '2.2.22-2'
})
@mock.patch('octavia.amphorae.backends.agent.api_server.'
'amphora_info.AmphoraInfo.'
'_count_udp_listener_processes', return_value=0)
'_count_lvs_listener_processes', return_value=0)
@mock.patch('octavia.amphorae.backends.agent.api_server.amphora_info.'
'AmphoraInfo._count_haproxy_processes')
@mock.patch('octavia.amphorae.backends.agent.api_server.amphora_info.'
@ -2443,8 +2443,8 @@ class TestServerTestCase(base.TestCase):
def _test_details(self, distro, mock_subbprocess, mock_hostname,
mock_get_listeners, mock_get_mem, mock_cpu,
mock_statvfs, mock_load, mock_get_nets,
mock_count_haproxy, mock_count_udp_listeners,
mock_get_ext_from_udp_driver, mock_get_udp_listeners):
mock_count_haproxy, mock_count_lvs_listeners,
mock_get_ext_from_lvs_driver, mock_get_lvs_listeners):
self.assertIn(distro, [consts.UBUNTU, consts.CENTOS])
@ -2559,7 +2559,7 @@ class TestServerTestCase(base.TestCase):
'packages': {},
'topology': consts.TOPOLOGY_SINGLE,
'topology_status': consts.TOPOLOGY_STATUS_OK,
'udp_listener_process_count': 0}
'lvs_listener_process_count': 0}
if distro == consts.UBUNTU:
rv = self.ubuntu_app.get('/' + api_server.VERSION + '/details')

View File

@ -43,7 +43,7 @@ class TestAmphoraInfo(base.TestCase):
super().setUp()
self.osutils_mock = mock.MagicMock()
self.amp_info = amphora_info.AmphoraInfo(self.osutils_mock)
self.udp_driver = mock.MagicMock()
self.lvs_driver = mock.MagicMock()
# setup a fake haproxy config file
templater = jinja_cfg.JinjaTemplater(
@ -95,7 +95,7 @@ class TestAmphoraInfo(base.TestCase):
mock_pkg_version, mock_webob):
mock_pkg_version.side_effect = self._return_version
self.udp_driver.get_subscribed_amp_compile_info.side_effect = [
self.lvs_driver.get_subscribed_amp_compile_info.side_effect = [
['keepalived', 'ipvsadm']]
original_version = api_server.VERSION
api_server.VERSION = self.API_VERSION
@ -105,7 +105,7 @@ class TestAmphoraInfo(base.TestCase):
'keepalived_version': self.KEEPALIVED_VERSION,
'ipvsadm_version': self.IPVSADM_VERSION
}
self.amp_info.compile_amphora_info(extend_udp_driver=self.udp_driver)
self.amp_info.compile_amphora_info(extend_lvs_driver=self.lvs_driver)
mock_webob.Response.assert_called_once_with(json=expected_dict)
api_server.VERSION = original_version
@ -183,7 +183,7 @@ class TestAmphoraInfo(base.TestCase):
api_server.VERSION = original_version
@mock.patch('octavia.amphorae.backends.agent.api_server.util.'
'get_udp_listeners',
'get_lvs_listeners',
return_value=[FAKE_LISTENER_ID_3, FAKE_LISTENER_ID_4])
@mock.patch('octavia.amphorae.backends.agent.api_server.util.'
'get_loadbalancers')
@ -201,10 +201,11 @@ class TestAmphoraInfo(base.TestCase):
@mock.patch('octavia.amphorae.backends.agent.api_server.'
'amphora_info.AmphoraInfo._count_haproxy_processes')
@mock.patch('socket.gethostname', return_value='FAKE_HOST')
def test_compile_amphora_details_for_udp(self, mhostname, m_count,
m_pkg_version, m_load, m_get_nets,
m_os, m_cpu, mget_mem,
mock_get_lb, mget_udp_listener):
def test_compile_amphora_details_for_ipvs(self, mhostname, m_count,
m_pkg_version, m_load,
m_get_nets,
m_os, m_cpu, mget_mem,
mock_get_lb, mget_lvs_listener):
mget_mem.return_value = {'SwapCached': 0, 'Buffers': 344792,
'MemTotal': 21692784, 'Cached': 4271856,
'Slab': 534384, 'MemFree': 12685624,
@ -224,9 +225,9 @@ class TestAmphoraInfo(base.TestCase):
'network_tx': 578}}
m_load.return_value = ['0.09', '0.11', '0.10']
m_count.return_value = 5
self.udp_driver.get_subscribed_amp_compile_info.return_value = [
self.lvs_driver.get_subscribed_amp_compile_info.return_value = [
'keepalived', 'ipvsadm']
self.udp_driver.is_listener_running.side_effect = [True, False]
self.lvs_driver.is_listener_running.side_effect = [True, False]
mock_get_lb.return_value = [self.LB_ID_1]
original_version = api_server.VERSION
api_server.VERSION = self.API_VERSION
@ -242,7 +243,7 @@ class TestAmphoraInfo(base.TestCase):
u'haproxy_version': self.HAPROXY_VERSION,
u'keepalived_version': self.KEEPALIVED_VERSION,
u'ipvsadm_version': self.IPVSADM_VERSION,
u'udp_listener_process_count': 1,
u'lvs_listener_process_count': 1,
u'hostname': u'FAKE_HOST',
u'listeners': sorted(list(set(
[self.FAKE_LISTENER_ID_3,
@ -263,7 +264,7 @@ class TestAmphoraInfo(base.TestCase):
u'packages': {},
u'topology': u'SINGLE',
u'topology_status': u'OK'}
actual = self.amp_info.compile_amphora_details(self.udp_driver)
actual = self.amp_info.compile_amphora_details(self.lvs_driver)
self.assertEqual(expected_dict, actual.json)
api_server.VERSION = original_version
@ -281,27 +282,27 @@ class TestAmphoraInfo(base.TestCase):
[uuidutils.generate_uuid(), uuidutils.generate_uuid()])
self.assertEqual(1, result)
def test__count_udp_listener_processes(self):
self.udp_driver.is_listener_running.side_effect = [True, False, True]
def test__count_lvs_listener_processes(self):
self.lvs_driver.is_listener_running.side_effect = [True, False, True]
expected = 2
actual = self.amp_info._count_udp_listener_processes(
self.udp_driver, [self.FAKE_LISTENER_ID_1,
actual = self.amp_info._count_lvs_listener_processes(
self.lvs_driver, [self.FAKE_LISTENER_ID_1,
self.FAKE_LISTENER_ID_2,
self.FAKE_LISTENER_ID_3])
self.assertEqual(expected, actual)
@mock.patch('octavia.amphorae.backends.agent.api_server.'
'amphora_info.AmphoraInfo._get_version_of_installed_package')
def test__get_extend_body_from_udp_driver(self, m_get_version):
self.udp_driver.get_subscribed_amp_compile_info.return_value = [
def test__get_extend_body_from_lvs_driver(self, m_get_version):
self.lvs_driver.get_subscribed_amp_compile_info.return_value = [
'keepalived', 'ipvsadm']
m_get_version.side_effect = self._return_version
expected = {
"keepalived_version": self.KEEPALIVED_VERSION,
"ipvsadm_version": self.IPVSADM_VERSION
}
actual = self.amp_info._get_extend_body_from_udp_driver(
self.udp_driver)
actual = self.amp_info._get_extend_body_from_lvs_driver(
self.lvs_driver)
self.assertEqual(expected, actual)
def test__get_meminfo(self):

View File

@ -29,9 +29,9 @@ class KeepalivedLvsTestCase(base.TestCase):
@mock.patch.object(keepalivedlvs, "webob")
@mock.patch('os.path.exists')
def test_delete_udp_listener_not_exist(self, m_exist, m_webob):
def test_delete_lvs_listener_not_exist(self, m_exist, m_webob):
m_exist.return_value = False
self.test_keepalivedlvs.delete_udp_listener(self.FAKE_ID)
self.test_keepalivedlvs.delete_lvs_listener(self.FAKE_ID)
calls = [
mock.call(
json=dict(message='UDP Listener Not Found',
@ -48,11 +48,12 @@ class KeepalivedLvsTestCase(base.TestCase):
@mock.patch('subprocess.check_output')
@mock.patch('os.remove')
@mock.patch('os.path.exists')
def test_delete_udp_listener_unsupported_sysinit(self, m_exist, m_remove,
def test_delete_lvs_listener_unsupported_sysinit(self, m_exist, m_remove,
m_check_output, mget_pid,
m_init_sys):
m_exist.return_value = True
mget_pid.return_value = '0'
self.assertRaises(
util.UnknownInitError, self.test_keepalivedlvs.delete_udp_listener,
util.UnknownInitError,
self.test_keepalivedlvs.delete_lvs_listener,
self.FAKE_ID)

View File

@ -241,8 +241,8 @@ class TestPlugNetwork(base.TestCase):
'down route del -net {dest1} gw {nexthop} dev {netns_interface}\n'
'up route add -net {dest2} gw {nexthop} dev {netns_interface}\n'
'down route del -net {dest2} gw {nexthop} dev {netns_interface}\n'
'post-up /usr/local/bin/udp-masquerade.sh add ipv4 eth1234\n'
'post-down /usr/local/bin/udp-masquerade.sh delete ipv4 eth1234\n')
'post-up /usr/local/bin/lvs-masquerade.sh add ipv4 eth1234\n'
'post-down /usr/local/bin/lvs-masquerade.sh delete ipv4 eth1234\n')
template_port = osutils.j2_env.get_template('plug_port_ethX.conf.j2')
text = self.test_plug._osutils._generate_network_file_text(
@ -293,9 +293,9 @@ class TestPlugNetwork(base.TestCase):
'down route del -net {dest1} gw {nexthop} dev {netns_interface}\n'
'up route add -net {dest2} gw {nexthop} dev {netns_interface}\n'
'down route del -net {dest2} gw {nexthop} dev {netns_interface}\n'
'post-up /usr/local/bin/udp-masquerade.sh add ipv4 '
'post-up /usr/local/bin/lvs-masquerade.sh add ipv4 '
'{netns_interface}\n'
'post-down /usr/local/bin/udp-masquerade.sh delete ipv4 '
'post-down /usr/local/bin/lvs-masquerade.sh delete ipv4 '
'{netns_interface}\n'
'\n\n# Generated by Octavia agent\n'
'auto {netns_interface}\n'
@ -304,9 +304,9 @@ class TestPlugNetwork(base.TestCase):
'broadcast {broadcast_ipv6}\n'
'netmask {netmask_ipv6}\n'
'mtu {mtu}\n'
'post-up /usr/local/bin/udp-masquerade.sh add ipv6 '
'post-up /usr/local/bin/lvs-masquerade.sh add ipv6 '
'{netns_interface}\n'
'post-down /usr/local/bin/udp-masquerade.sh delete ipv6 '
'post-down /usr/local/bin/lvs-masquerade.sh delete ipv6 '
'{netns_interface}\n')
template_port = osutils.j2_env.get_template('plug_port_ethX.conf.j2')

View File

@ -178,25 +178,25 @@ class TestUtil(base.TestCase):
mock_cfg_path.return_value = '/there'
mock_path_exists.side_effect = [True, False, True, False, False]
result = util.get_protocol_for_lb_object('1')
result = util.get_backend_for_lb_object('1')
mock_cfg_path.assert_called_once_with('1')
mock_path_exists.assert_called_once_with('/there')
self.assertFalse(mock_lvs_path.called)
self.assertEqual(consts.PROTOCOL_TCP, result)
self.assertEqual(consts.HAPROXY_BACKEND, result)
mock_cfg_path.reset_mock()
result = util.get_protocol_for_lb_object('2')
result = util.get_backend_for_lb_object('2')
mock_cfg_path.assert_called_once_with('2')
mock_lvs_path.assert_called_once_with('2')
self.assertEqual(consts.PROTOCOL_UDP, result)
self.assertEqual(consts.LVS_BACKEND, result)
mock_cfg_path.reset_mock()
mock_lvs_path.reset_mock()
result = util.get_protocol_for_lb_object('3')
result = util.get_backend_for_lb_object('3')
mock_cfg_path.assert_called_once_with('3')
mock_lvs_path.assert_called_once_with('3')
@ -281,7 +281,7 @@ class TestUtil(base.TestCase):
LISTENER_ID1)
@mock.patch('octavia.amphorae.backends.agent.api_server.util.'
'get_udp_listeners')
'get_lvs_listeners')
@mock.patch('os.makedirs')
@mock.patch('os.path.exists')
@mock.patch('os.listdir')

View File

@ -379,13 +379,13 @@ class TestHealthDaemon(base.TestCase):
self.assertEqual(0, mock_fdopen().read.call_count)
@mock.patch("octavia.amphorae.backends.utils.keepalivedlvs_query."
"get_udp_listener_pool_status")
"get_lvs_listener_pool_status")
@mock.patch("octavia.amphorae.backends.utils.keepalivedlvs_query."
"get_udp_listeners_stats")
"get_lvs_listeners_stats")
@mock.patch("octavia.amphorae.backends.agent.api_server.util."
"get_udp_listeners")
def test_build_stats_message_with_udp_listener(
self, mock_get_udp_listeners,
"get_lvs_listeners")
def test_build_stats_message_with_lvs_listener(
self, mock_get_lvs_listeners,
mock_get_listener_stats, mock_get_pool_status):
health_daemon.COUNTERS = None
health_daemon.COUNTERS_FILE = None
@ -395,7 +395,7 @@ class TestHealthDaemon(base.TestCase):
pool_id = uuidutils.generate_uuid()
member_id1 = uuidutils.generate_uuid()
member_id2 = uuidutils.generate_uuid()
mock_get_udp_listeners.return_value = [udp_listener_id1,
mock_get_lvs_listeners.return_value = [udp_listener_id1,
udp_listener_id2,
udp_listener_id3]

View File

@ -258,9 +258,9 @@ class LvsQueryTestCase(base.TestCase):
health_monitor_enabled=True)
self.assertEqual((False, {}), result)
def test_get_udp_listener_resource_ipports_nsname(self):
def test_get_lvs_listener_resource_ipports_nsname(self):
# ipv4
res = lvs_query.get_udp_listener_resource_ipports_nsname(
res = lvs_query.get_lvs_listener_resource_ipports_nsname(
self.listener_id_v4)
expected = {'Listener': {'id': self.listener_id_v4,
'ipport': '10.0.0.37:7777'},
@ -276,7 +276,7 @@ class LvsQueryTestCase(base.TestCase):
self.assertEqual((expected, constants.AMPHORA_NAMESPACE), res)
# ipv6
res = lvs_query.get_udp_listener_resource_ipports_nsname(
res = lvs_query.get_lvs_listener_resource_ipports_nsname(
self.listener_id_v6)
expected = {'Listener': {
'id': self.listener_id_v6,
@ -294,13 +294,13 @@ class LvsQueryTestCase(base.TestCase):
self.assertEqual((expected, constants.AMPHORA_NAMESPACE), res)
# disabled
res = lvs_query.get_udp_listener_resource_ipports_nsname(
res = lvs_query.get_lvs_listener_resource_ipports_nsname(
self.disabled_listener_id)
self.assertEqual((None, constants.AMPHORA_NAMESPACE), res)
@mock.patch('os.stat')
@mock.patch('subprocess.check_output')
def test_get_udp_listener_pool_status(self, mock_check_output,
def test_get_lvs_listener_pool_status(self, mock_check_output,
mock_os_stat):
mock_os_stat.side_effect = (
mock.Mock(st_mtime=1234),
@ -309,7 +309,7 @@ class LvsQueryTestCase(base.TestCase):
# test with ipv4 and ipv6
mock_check_output.return_value = KERNAL_FILE_SAMPLE_V4
res = lvs_query.get_udp_listener_pool_status(self.listener_id_v4)
res = lvs_query.get_lvs_listener_pool_status(self.listener_id_v4)
expected = {
'lvs':
{'uuid': self.pool_id_v4,
@ -326,7 +326,7 @@ class LvsQueryTestCase(base.TestCase):
)
mock_check_output.return_value = KERNAL_FILE_SAMPLE_V6
res = lvs_query.get_udp_listener_pool_status(self.listener_id_v6)
res = lvs_query.get_lvs_listener_pool_status(self.listener_id_v6)
expected = {
'lvs':
{'uuid': self.pool_id_v6,
@ -339,7 +339,7 @@ class LvsQueryTestCase(base.TestCase):
@mock.patch('os.stat')
@mock.patch('subprocess.check_output')
def test_get_udp_listener_pool_status_restarting(self, mock_check_output,
def test_get_lvs_listener_pool_status_restarting(self, mock_check_output,
mock_os_stat):
mock_os_stat.side_effect = (
mock.Mock(st_mtime=1234), # config file
@ -348,7 +348,7 @@ class LvsQueryTestCase(base.TestCase):
# test with ipv4 and ipv6
mock_check_output.return_value = KERNAL_FILE_SAMPLE_V4
res = lvs_query.get_udp_listener_pool_status(self.listener_id_v4)
res = lvs_query.get_lvs_listener_pool_status(self.listener_id_v4)
expected = {
'lvs':
{'uuid': self.pool_id_v4,
@ -360,8 +360,8 @@ class LvsQueryTestCase(base.TestCase):
self.assertEqual(expected, res)
@mock.patch('octavia.amphorae.backends.utils.keepalivedlvs_query.'
'get_udp_listener_resource_ipports_nsname')
def test_get_udp_listener_pool_status_when_no_pool(
'get_lvs_listener_resource_ipports_nsname')
def test_get_lvs_listener_pool_status_when_no_pool(
self, mock_get_resource_ipports):
# Just test with ipv4, ipv6 tests is same.
# the returned resource_ipport_mapping doesn't contains the 'Pool'
@ -374,12 +374,12 @@ class LvsQueryTestCase(base.TestCase):
'id': self.listener_id_v4,
'ipport': '10.0.0.37:7777'}},
constants.AMPHORA_NAMESPACE)
res = lvs_query.get_udp_listener_pool_status(self.listener_id_v4)
res = lvs_query.get_lvs_listener_pool_status(self.listener_id_v4)
self.assertEqual({}, res)
@mock.patch('octavia.amphorae.backends.utils.keepalivedlvs_query.'
'get_udp_listener_resource_ipports_nsname')
def test_get_udp_listener_pool_status_when_no_members(
'get_lvs_listener_resource_ipports_nsname')
def test_get_lvs_listener_pool_status_when_no_members(
self, mock_get_resource_ipports):
# Just test with ipv4, ipv6 tests is same.
# the returned resource_ipport_mapping doesn't contains the 'Members'
@ -392,7 +392,7 @@ class LvsQueryTestCase(base.TestCase):
'ipport': '10.0.0.37:7777'},
'Pool': {'id': self.pool_id_v4}},
constants.AMPHORA_NAMESPACE)
res = lvs_query.get_udp_listener_pool_status(self.listener_id_v4)
res = lvs_query.get_lvs_listener_pool_status(self.listener_id_v4)
expected = {'lvs': {
'uuid': self.pool_id_v4,
'status': constants.UP,
@ -403,7 +403,7 @@ class LvsQueryTestCase(base.TestCase):
@mock.patch('os.stat')
@mock.patch('octavia.amphorae.backends.utils.keepalivedlvs_query.'
'get_listener_realserver_mapping')
def test_get_udp_listener_pool_status_when_not_get_realserver_result(
def test_get_lvs_listener_pool_status_when_not_get_realserver_result(
self, mock_get_mapping, mock_os_stat):
# This will hit if the kernel lvs file (/proc/net/ip_vs)
# lose its content. So at this moment, eventhough we configure the
@ -414,7 +414,7 @@ class LvsQueryTestCase(base.TestCase):
mock.Mock(st_mtime=1234),
)
mock_get_mapping.return_value = (False, {})
res = lvs_query.get_udp_listener_pool_status(self.listener_id_v4)
res = lvs_query.get_lvs_listener_pool_status(self.listener_id_v4)
expected = {
'lvs':
{'uuid': self.pool_id_v4,
@ -484,10 +484,10 @@ class LvsQueryTestCase(base.TestCase):
@mock.patch('subprocess.check_output')
@mock.patch("octavia.amphorae.backends.agent.api_server.util."
"is_udp_listener_running", return_value=True)
"is_lvs_listener_running", return_value=True)
@mock.patch("octavia.amphorae.backends.agent.api_server.util."
"get_udp_listeners")
def test_get_udp_listeners_stats(
"get_lvs_listeners")
def test_get_lvs_listeners_stats(
self, mock_get_listener, mock_is_running, mock_check_output):
# The ipv6 test is same with ipv4, so just test ipv4 here
mock_get_listener.return_value = [self.listener_id_v4]
@ -501,7 +501,7 @@ class LvsQueryTestCase(base.TestCase):
"member1_ipport": "10.0.0.25:2222",
"member2_ipport": "10.0.0.35:3333"})
mock_check_output.side_effect = output_list
res = lvs_query.get_udp_listeners_stats()
res = lvs_query.get_lvs_listeners_stats()
# We can check the expected result reference the stats sample,
# that means this func can compute the stats info of single listener.
expected = {self.listener_id_v4: {
@ -513,16 +513,16 @@ class LvsQueryTestCase(base.TestCase):
# if no udp listener need to be collected.
# Then this function will return nothing.
mock_is_running.return_value = False
res = lvs_query.get_udp_listeners_stats()
res = lvs_query.get_lvs_listeners_stats()
self.assertEqual({}, res)
@mock.patch('subprocess.check_output')
@mock.patch("octavia.amphorae.backends.agent.api_server.util."
"is_udp_listener_running", return_value=True)
"is_lvs_listener_running", return_value=True)
@mock.patch("octavia.amphorae.backends.agent.api_server.util."
"get_udp_listeners")
def test_get_udp_listeners_stats_disabled_listener(
"get_lvs_listeners")
def test_get_lvs_listeners_stats_disabled_listener(
self, mock_get_listener, mock_is_running, mock_check_output):
mock_get_listener.return_value = [self.disabled_listener_id]
res = lvs_query.get_udp_listeners_stats()
res = lvs_query.get_lvs_listeners_stats()
self.assertEqual({}, res)

View File

@ -71,7 +71,7 @@ class TestHaproxyAmphoraLoadBalancerDriverTest(base.TestCase):
'haproxy_version': u'1.6.3-1ubuntu0.1',
'api_version': API_VERSION}
self.driver.jinja_split = mock.MagicMock()
self.driver.udp_jinja = mock.MagicMock()
self.driver.lvs_jinja = mock.MagicMock()
# Build sample Listener and VIP configs
self.sl = sample_configs_split.sample_listener_tuple(
@ -252,7 +252,7 @@ class TestHaproxyAmphoraLoadBalancerDriverTest(base.TestCase):
mock_secret.assert_has_calls(secret_calls)
def test_udp_update(self):
self.driver.udp_jinja.build_config.side_effect = ['fake_udp_config']
self.driver.lvs_jinja.build_config.side_effect = ['fake_udp_config']
# Execute driver method
self.driver.update(self.lb_udp)

View File

@ -71,7 +71,7 @@ class TestHaproxyAmphoraLoadBalancerDriverTest(base.TestCase):
'haproxy_version': u'1.6.3-1ubuntu0.1',
'api_version': API_VERSION}
self.driver.jinja_combo = mock.MagicMock()
self.driver.udp_jinja = mock.MagicMock()
self.driver.lvs_jinja = mock.MagicMock()
# Build sample Listener and VIP configs
self.sl = sample_configs_combined.sample_listener_tuple(
@ -253,7 +253,7 @@ class TestHaproxyAmphoraLoadBalancerDriverTest(base.TestCase):
mock_secret.assert_has_calls(secret_calls)
def test_udp_update(self):
self.driver.udp_jinja.build_config.side_effect = ['fake_udp_config']
self.driver.lvs_jinja.build_config.side_effect = ['fake_udp_config']
# Execute driver method
self.driver.update(self.lb_udp)

View File

@ -22,6 +22,7 @@ from octavia.common import constants as consts
from octavia.network import base as network_base
from octavia.tests.common import sample_data_models
from octavia.tests.unit import base
from octavia_lib.common import constants as lib_consts
class TestAmphoraDriver(base.TestRpc):
@ -349,6 +350,50 @@ class TestAmphoraDriver(base.TestRpc):
self.amp_driver.member_create,
provider_member)
@mock.patch('octavia.db.api.get_session')
@mock.patch('octavia.db.repositories.PoolRepository.get')
@mock.patch('oslo_messaging.RPCClient.cast')
def test_member_create_sctp_ipv4(self, mock_cast, mock_pool_get,
mock_session):
mock_lb = mock.MagicMock()
mock_lb.vip = mock.MagicMock()
mock_lb.vip.ip_address = "192.0.1.1"
mock_listener = mock.MagicMock()
mock_listener.load_balancer = mock_lb
mock_pool = mock.MagicMock()
mock_pool.protocol = lib_consts.PROTOCOL_SCTP
mock_pool.listeners = [mock_listener]
mock_pool_get.return_value = mock_pool
provider_member = driver_dm.Member(
member_id=self.sample_data.member1_id,
address="192.0.2.1")
self.amp_driver.member_create(provider_member)
payload = {consts.MEMBER: provider_member.to_dict()}
mock_cast.assert_called_with({}, 'create_member', **payload)
@mock.patch('octavia.db.api.get_session')
@mock.patch('octavia.db.repositories.PoolRepository.get')
@mock.patch('oslo_messaging.RPCClient.cast')
def test_member_create_sctp_ipv4_ipv6(self, mock_cast, mock_pool_get,
mock_session):
mock_lb = mock.MagicMock()
mock_lb.vip = mock.MagicMock()
mock_lb.vip.ip_address = "fe80::1"
mock_listener = mock.MagicMock()
mock_listener.load_balancer = mock_lb
mock_pool = mock.MagicMock()
mock_pool.protocol = lib_consts.PROTOCOL_SCTP
mock_pool.listeners = [mock_listener]
mock_pool_get.return_value = mock_pool
provider_member = driver_dm.Member(
member_id=self.sample_data.member1_id,
address="192.0.2.1")
self.assertRaises(exceptions.UnsupportedOptionError,
self.amp_driver.member_create,
provider_member)
@mock.patch('oslo_messaging.RPCClient.cast')
def test_member_delete(self, mock_cast):
provider_member = driver_dm.Member(

View File

@ -0,0 +1,152 @@
# Copyright 2020 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import socket
import struct
from unittest import mock
from oslo_config import cfg
from oslo_config import fixture as oslo_fixture
from octavia.cmd import health_checker
from octavia.tests.unit import base
CONF = cfg.CONF
class TestHealthCheckerCMD(base.TestCase):
def setUp(self):
super(TestHealthCheckerCMD, self).setUp()
self.CONF = self.useFixture(oslo_fixture.Config(cfg.CONF))
def test_crc32c(self):
data = b'STRING1234'
result = health_checker.crc32c(data)
self.assertEqual(result, 0x30e0e107)
@mock.patch('random.randint', return_value=42424242)
def test__sctp_build_init_packet(self, mock_randint):
expected_packet = bytearray(
b'\x04\xd2\x16.\x00\x00\x00\x00\x1d9\x96\r\x01\x00\x00\x14:\xde'
b'h\xb1\x00\x01\xa0\x00\x00\n\xff\xff\x02\x87W\xb2')
src_port = 1234
dest_port = 5678
tag = 987654321
pkt = health_checker._sctp_build_init_packet(
src_port, dest_port, tag)
self.assertEqual(pkt, expected_packet)
decoded_src_port = struct.unpack_from('!H', pkt, 0)[0]
decoded_dest_port = struct.unpack_from('!H', pkt, 2)[0]
self.assertEqual(src_port, decoded_src_port)
self.assertEqual(dest_port, decoded_dest_port)
decoded_tag = struct.unpack_from('!L', pkt, 16)[0]
self.assertEqual(tag, decoded_tag)
decoded_checksum = struct.unpack_from('!L', pkt, 8)[0]
# Reset and re-compute checksum
pkt[8] = pkt[9] = pkt[10] = pkt[11] = 0
checksum = health_checker.crc32c(pkt)
self.assertEqual(checksum, decoded_checksum)
def test__sctp_build_abort_packet(self):
expected_packet = bytearray(
b'\x04\xd2\x16.\x02\x93wM3\x83\xbbN\x06\x01\x00\x04')
src_port = 1234
dest_port = 5678
verification_tag = 43218765
pkt = health_checker._sctp_build_abort_packet(
src_port, dest_port, verification_tag)
self.assertEqual(pkt, expected_packet)
decoded_src_port = struct.unpack_from('!H', pkt, 0)[0]
decoded_dest_port = struct.unpack_from('!H', pkt, 2)[0]
self.assertEqual(src_port, decoded_src_port)
self.assertEqual(dest_port, decoded_dest_port)
decoded_tag = struct.unpack_from('!L', pkt, 4)[0]
self.assertEqual(verification_tag, decoded_tag)
decoded_checksum = struct.unpack_from('!L', pkt, 8)[0]
# Reset and re-compute checksum
pkt[8] = pkt[9] = pkt[10] = pkt[11] = 0
checksum = health_checker.crc32c(pkt)
self.assertEqual(checksum, decoded_checksum)
def test__sctp_decode_packet(self):
# IPv4 INIT ACK packet
data = (b'\x45\x00\x00\x00\x00\x01\x01\x01'
b'\x00\x00\xff\x06\x7f\x00\x00\x00'
b'\x7f\x00\x00\x02\x16.\x04\xd2'
b'\x02\x93\x77\x4d\x00\x00\x00\x32'
b'\x02\x00\x00\x16')
family = socket.AF_INET
expected_tag = 43218765
ret = health_checker._sctp_decode_packet(data, family, expected_tag)
self.assertEqual(ret, 2) # INIT ACK
# IPv6 ABORT packet
data = (b'\x16.\x04\xd2\x02\x93\x77\x4d\x00\x00\x00\x32'
b'\x06\x00\x00\x16')
family = socket.AF_INET6
expected_tag = 43218765
ret = health_checker._sctp_decode_packet(data, family, expected_tag)
self.assertEqual(ret, 6) # ABORT
def test__sctp_decode_packet_too_short(self):
# IPv4 packet with different verification tag
data = (b'\x45\x00\x00\x00\x00\x01')
family = socket.AF_INET
expected_tag = 43218765
ret = health_checker._sctp_decode_packet(data, family, expected_tag)
self.assertFalse(ret)
def test__sctp_decode_packet_unexpected(self):
# IPv4 packet with different verification tag
data = (b'\x45\x00\x00\x00\x00\x01\x01\x01'
b'\x00\x00\xff\x06\x7f\x00\x00\x00'
b'\x7f\x00\x00\x02\x16.\x04\xd2'
b'\x02\x91\x17\x4d\x00\x00\x00\x32'
b'\x02\x00\x00\x16')
family = socket.AF_INET
expected_tag = 43218765
ret = health_checker._sctp_decode_packet(data, family, expected_tag)
self.assertFalse(ret)

View File

@ -13,6 +13,8 @@
# License for the specific language governing permissions and limitations
# under the License.
from octavia_lib.common import constants as lib_consts
from octavia.common import constants
from octavia.common.jinja.lvs import jinja_cfg
from octavia.tests.unit import base
@ -27,12 +29,12 @@ BASE_PATH = '/var/lib/octavia'
class TestLvsCfg(base.TestCase):
def setUp(self):
super().setUp()
self.udp_jinja_cfg = jinja_cfg.LvsJinjaTemplater()
self.lvs_jinja_cfg = jinja_cfg.LvsJinjaTemplater()
conf = oslo_fixture.Config(cfg.CONF)
conf.config(group="haproxy_amphora", base_path=BASE_PATH)
def test_udp_get_template(self):
template = self.udp_jinja_cfg._get_template()
template = self.lvs_jinja_cfg._get_template()
self.assertEqual('keepalivedlvs.cfg.j2', template.name)
def test_render_template_udp_source_ip(self):
@ -71,7 +73,7 @@ class TestLvsCfg(base.TestCase):
" }\n"
" }\n\n"
"}\n\n")
rendered_obj = self.udp_jinja_cfg.render_loadbalancer_obj(
rendered_obj = self.lvs_jinja_cfg.render_loadbalancer_obj(
sample_configs_combined.sample_listener_tuple(
proto=constants.PROTOCOL_UDP,
persistence_type=constants.SESSION_PERSISTENCE_SOURCE_IP,
@ -121,7 +123,7 @@ class TestLvsCfg(base.TestCase):
monitor_proto=constants.HEALTH_MONITOR_UDP_CONNECT,
connection_limit=98,
persistence=False)
rendered_obj = self.udp_jinja_cfg.render_loadbalancer_obj(listener)
rendered_obj = self.lvs_jinja_cfg.render_loadbalancer_obj(listener)
self.assertEqual(exp, rendered_obj)
def test_render_template_udp_with_health_monitor(self):
@ -159,7 +161,7 @@ class TestLvsCfg(base.TestCase):
" }\n\n"
"}\n\n")
rendered_obj = self.udp_jinja_cfg.render_loadbalancer_obj(
rendered_obj = self.lvs_jinja_cfg.render_loadbalancer_obj(
sample_configs_combined.sample_listener_tuple(
proto=constants.PROTOCOL_UDP,
monitor_proto=constants.HEALTH_MONITOR_UDP_CONNECT,
@ -202,7 +204,7 @@ class TestLvsCfg(base.TestCase):
" }\n\n"
"}\n\n")
rendered_obj = self.udp_jinja_cfg.render_loadbalancer_obj(
rendered_obj = self.lvs_jinja_cfg.render_loadbalancer_obj(
sample_configs_combined.sample_listener_tuple(
proto=constants.PROTOCOL_UDP,
monitor_ip_port=True,
@ -216,7 +218,7 @@ class TestLvsCfg(base.TestCase):
"# Configuration for Listener sample_listener_id_1\n\n"
"net_namespace amphora-haproxy\n\n\n")
rendered_obj = self.udp_jinja_cfg.render_loadbalancer_obj(
rendered_obj = self.lvs_jinja_cfg.render_loadbalancer_obj(
sample_configs_combined.sample_listener_tuple(
proto=constants.PROTOCOL_UDP, monitor=False,
persistence=False, alloc_default_pool=False))
@ -233,7 +235,7 @@ class TestLvsCfg(base.TestCase):
" # Configuration for Pool sample_pool_id_0\n"
"}\n\n")
rendered_obj = self.udp_jinja_cfg.render_loadbalancer_obj(
rendered_obj = self.lvs_jinja_cfg.render_loadbalancer_obj(
sample_configs_combined.sample_listener_tuple(
proto=constants.PROTOCOL_UDP, monitor=False,
persistence=False, alloc_default_pool=True,
@ -259,7 +261,7 @@ class TestLvsCfg(base.TestCase):
" }\n\n"
"}\n\n")
rendered_obj = self.udp_jinja_cfg.render_loadbalancer_obj(
rendered_obj = self.lvs_jinja_cfg.render_loadbalancer_obj(
sample_configs_combined.sample_listener_tuple(
proto=constants.PROTOCOL_UDP, monitor=False,
persistence=False, alloc_default_pool=True,
@ -275,7 +277,7 @@ class TestLvsCfg(base.TestCase):
persistence_granularity='255.0.0.0'
))
exp = sample_configs_combined.UDP_SOURCE_IP_BODY
ret = self.udp_jinja_cfg._transform_session_persistence(
ret = self.lvs_jinja_cfg._transform_session_persistence(
persistence_src_ip)
self.assertEqual(exp, ret)
@ -283,20 +285,20 @@ class TestLvsCfg(base.TestCase):
in_hm = sample_configs_combined.sample_health_monitor_tuple(
proto=constants.HEALTH_MONITOR_UDP_CONNECT
)
ret = self.udp_jinja_cfg._transform_health_monitor(in_hm)
ret = self.lvs_jinja_cfg._transform_health_monitor(in_hm)
self.assertEqual(sample_configs_combined.RET_UDP_HEALTH_MONITOR, ret)
def test_udp_transform_member(self):
in_member = sample_configs_combined.sample_member_tuple(
'member_id_1', '192.0.2.10')
ret = self.udp_jinja_cfg._transform_member(in_member)
ret = self.lvs_jinja_cfg._transform_member(in_member)
self.assertEqual(sample_configs_combined.RET_UDP_MEMBER, ret)
in_member = sample_configs_combined.sample_member_tuple(
'member_id_1',
'192.0.2.10',
monitor_ip_port=True)
ret = self.udp_jinja_cfg._transform_member(in_member)
ret = self.lvs_jinja_cfg._transform_member(in_member)
self.assertEqual(
sample_configs_combined.RET_UDP_MEMBER_MONITOR_IP_PORT, ret)
@ -306,7 +308,7 @@ class TestLvsCfg(base.TestCase):
persistence_type=constants.SESSION_PERSISTENCE_SOURCE_IP,
persistence_timeout=33, persistence_granularity='255.0.0.0',
)
ret = self.udp_jinja_cfg._transform_pool(in_pool)
ret = self.lvs_jinja_cfg._transform_pool(in_pool)
self.assertEqual(sample_configs_combined.RET_UDP_POOL, ret)
in_pool = sample_configs_combined.sample_pool_tuple(
@ -315,7 +317,7 @@ class TestLvsCfg(base.TestCase):
persistence_timeout=33, persistence_granularity='255.0.0.0',
lb_algorithm=None,
)
ret = self.udp_jinja_cfg._transform_pool(in_pool)
ret = self.lvs_jinja_cfg._transform_pool(in_pool)
self.assertEqual(sample_configs_combined.RET_UDP_POOL, ret)
in_pool = sample_configs_combined.sample_pool_tuple(
@ -324,7 +326,7 @@ class TestLvsCfg(base.TestCase):
persistence_timeout=33, persistence_granularity='255.0.0.0',
monitor=False)
sample_configs_combined.RET_UDP_POOL['health_monitor'] = ''
ret = self.udp_jinja_cfg._transform_pool(in_pool)
ret = self.lvs_jinja_cfg._transform_pool(in_pool)
self.assertEqual(sample_configs_combined.RET_UDP_POOL, ret)
def test_udp_transform_listener(self):
@ -336,7 +338,7 @@ class TestLvsCfg(base.TestCase):
monitor_proto=constants.HEALTH_MONITOR_UDP_CONNECT,
connection_limit=98
)
ret = self.udp_jinja_cfg._transform_listener(in_listener)
ret = self.lvs_jinja_cfg._transform_listener(in_listener)
self.assertEqual(sample_configs_combined.RET_UDP_LISTENER, ret)
in_listener = sample_configs_combined.sample_listener_tuple(
@ -347,7 +349,7 @@ class TestLvsCfg(base.TestCase):
monitor_proto=constants.HEALTH_MONITOR_UDP_CONNECT,
connection_limit=-1)
ret = self.udp_jinja_cfg._transform_listener(in_listener)
ret = self.lvs_jinja_cfg._transform_listener(in_listener)
sample_configs_combined.RET_UDP_LISTENER.pop('connection_limit')
self.assertEqual(sample_configs_combined.RET_UDP_LISTENER, ret)
@ -409,7 +411,7 @@ class TestLvsCfg(base.TestCase):
persistence=False,
monitor_expected_codes='200-201')
rendered_obj = self.udp_jinja_cfg.render_loadbalancer_obj(listener)
rendered_obj = self.lvs_jinja_cfg.render_loadbalancer_obj(listener)
self.assertEqual(exp, rendered_obj)
def test_render_template_udp_listener_with_tcp_health_monitor(self):
@ -452,14 +454,14 @@ class TestLvsCfg(base.TestCase):
connection_limit=98,
persistence=False)
rendered_obj = self.udp_jinja_cfg.render_loadbalancer_obj(listener)
rendered_obj = self.lvs_jinja_cfg.render_loadbalancer_obj(listener)
self.assertEqual(exp, rendered_obj)
def test_render_template_disabled_udp_listener(self):
exp = ("# Configuration for Loadbalancer sample_loadbalancer_id_1\n"
"# Listener sample_listener_id_1 is disabled\n\n"
"net_namespace amphora-haproxy\n\n")
rendered_obj = self.udp_jinja_cfg.render_loadbalancer_obj(
rendered_obj = self.lvs_jinja_cfg.render_loadbalancer_obj(
sample_configs_combined.sample_listener_tuple(
enabled=False,
proto=constants.PROTOCOL_UDP,
@ -469,3 +471,394 @@ class TestLvsCfg(base.TestCase):
monitor_proto=constants.HEALTH_MONITOR_UDP_CONNECT,
connection_limit=98))
self.assertEqual(exp, rendered_obj)
def test_render_template_sctp_source_ip(self):
exp = ("# Configuration for Loadbalancer sample_loadbalancer_id_1\n"
"# Configuration for Listener sample_listener_id_1\n\n"
"net_namespace amphora-haproxy\n\n"
"virtual_server 10.0.0.2 80 {\n"
" lb_algo rr\n"
" lb_kind NAT\n"
" protocol SCTP\n"
" persistence_timeout 33\n"
" persistence_granularity 255.255.0.0\n"
" delay_loop 30\n"
" delay_before_retry 30\n"
" retry 3\n\n\n"
" # Configuration for Pool sample_pool_id_1\n"
" # Configuration for HealthMonitor sample_monitor_id_1\n"
" # Configuration for Member sample_member_id_1\n"
" real_server 10.0.0.99 82 {\n"
" weight 13\n"
" uthreshold 98\n"
" MISC_CHECK {\n"
" misc_path \"amphora-health-checker sctp -t 31 "
"10.0.0.99 82\"\n"
" misc_timeout 32\n"
" }\n"
" }\n\n"
" # Configuration for Member sample_member_id_2\n"
" real_server 10.0.0.98 82 {\n"
" weight 13\n"
" uthreshold 98\n"
" MISC_CHECK {\n"
" misc_path \"amphora-health-checker sctp -t 31 "
"10.0.0.98 82\"\n"
" misc_timeout 32\n"
" }\n"
" }\n\n"
"}\n\n")
rendered_obj = self.lvs_jinja_cfg.render_loadbalancer_obj(
sample_configs_combined.sample_listener_tuple(
proto=lib_consts.PROTOCOL_SCTP,
persistence_type=constants.SESSION_PERSISTENCE_SOURCE_IP,
persistence_timeout=33,
persistence_granularity='255.255.0.0',
monitor_proto=lib_consts.HEALTH_MONITOR_SCTP,
connection_limit=98))
self.assertEqual(exp, rendered_obj)
def test_render_template_sctp_one_packet(self):
exp = ("# Configuration for Loadbalancer sample_loadbalancer_id_1\n"
"# Configuration for Listener sample_listener_id_1\n\n"
"net_namespace amphora-haproxy\n\n"
"virtual_server 10.0.0.2 80 {\n"
" lb_algo rr\n"
" lb_kind NAT\n"
" protocol SCTP\n"
" delay_loop 30\n"
" delay_before_retry 30\n"
" retry 3\n\n\n"
" # Configuration for Pool sample_pool_id_1\n"
" # Configuration for HealthMonitor sample_monitor_id_1\n"
" # Configuration for Member sample_member_id_1\n"
" real_server 10.0.0.99 82 {\n"
" weight 13\n"
" uthreshold 98\n"
" MISC_CHECK {\n"
" misc_path \"amphora-health-checker sctp -t 31 "
"10.0.0.99 82\"\n"
" misc_timeout 32\n"
" }\n"
" }\n\n"
" # Configuration for Member sample_member_id_2\n"
" real_server 10.0.0.98 82 {\n"
" weight 13\n"
" uthreshold 98\n"
" MISC_CHECK {\n"
" misc_path \"amphora-health-checker sctp -t 31 "
"10.0.0.98 82\"\n"
" misc_timeout 32\n"
" }\n"
" }\n\n"
"}\n\n")
listener = sample_configs_combined.sample_listener_tuple(
proto=lib_consts.PROTOCOL_SCTP,
monitor_proto=lib_consts.HEALTH_MONITOR_SCTP,
connection_limit=98,
persistence=False)
rendered_obj = self.lvs_jinja_cfg.render_loadbalancer_obj(listener)
self.assertEqual(exp, rendered_obj)
def test_render_template_sctp_with_health_monitor(self):
exp = ("# Configuration for Loadbalancer sample_loadbalancer_id_1\n"
"# Configuration for Listener sample_listener_id_1\n\n"
"net_namespace amphora-haproxy\n\n"
"virtual_server 10.0.0.2 80 {\n"
" lb_algo rr\n"
" lb_kind NAT\n"
" protocol SCTP\n"
" delay_loop 30\n"
" delay_before_retry 30\n"
" retry 3\n\n\n"
" # Configuration for Pool sample_pool_id_1\n"
" # Configuration for HealthMonitor sample_monitor_id_1\n"
" # Configuration for Member sample_member_id_1\n"
" real_server 10.0.0.99 82 {\n"
" weight 13\n"
" uthreshold 98\n"
" MISC_CHECK {\n"
" misc_path \"amphora-health-checker sctp -t 31 "
"10.0.0.99 82\"\n"
" misc_timeout 32\n"
" }\n"
" }\n\n"
" # Configuration for Member sample_member_id_2\n"
" real_server 10.0.0.98 82 {\n"
" weight 13\n"
" uthreshold 98\n"
" MISC_CHECK {\n"
" misc_path \"amphora-health-checker sctp -t 31 "
"10.0.0.98 82\"\n"
" misc_timeout 32\n"
" }\n"
" }\n\n"
"}\n\n")
rendered_obj = self.lvs_jinja_cfg.render_loadbalancer_obj(
sample_configs_combined.sample_listener_tuple(
proto=lib_consts.PROTOCOL_SCTP,
monitor_proto=lib_consts.HEALTH_MONITOR_SCTP,
persistence=False,
connection_limit=98))
self.assertEqual(exp, rendered_obj)
def test_render_template_sctp_with_health_monitor_ip_port(self):
exp = ("# Configuration for Loadbalancer sample_loadbalancer_id_1\n"
"# Configuration for Listener sample_listener_id_1\n\n"
"net_namespace amphora-haproxy\n\n"
"virtual_server 10.0.0.2 80 {\n"
" lb_algo rr\n"
" lb_kind NAT\n"
" protocol SCTP\n"
" delay_loop 30\n"
" delay_before_retry 30\n"
" retry 3\n\n\n"
" # Configuration for Pool sample_pool_id_1\n"
" # Configuration for HealthMonitor sample_monitor_id_1\n"
" # Configuration for Member sample_member_id_1\n"
" real_server 10.0.0.99 82 {\n"
" weight 13\n"
" uthreshold 98\n"
" MISC_CHECK {\n"
" misc_path \"amphora-health-checker sctp -t 31 "
"192.168.1.1 9000\"\n"
" misc_timeout 32\n"
" }\n"
" }\n\n"
" # Configuration for Member sample_member_id_2\n"
" real_server 10.0.0.98 82 {\n"
" weight 13\n"
" uthreshold 98\n"
" MISC_CHECK {\n"
" misc_path \"amphora-health-checker sctp -t 31 "
"192.168.1.1 9000\"\n"
" misc_timeout 32\n"
" }\n"
" }\n\n"
"}\n\n")
rendered_obj = self.lvs_jinja_cfg.render_loadbalancer_obj(
sample_configs_combined.sample_listener_tuple(
proto=lib_consts.PROTOCOL_SCTP,
monitor_ip_port=True,
monitor_proto=lib_consts.HEALTH_MONITOR_SCTP,
persistence=False,
connection_limit=98))
self.assertEqual(exp, rendered_obj)
def test_render_template_sctp_no_other_resources(self):
exp = ("# Configuration for Loadbalancer sample_loadbalancer_id_1\n"
"# Configuration for Listener sample_listener_id_1\n\n"
"net_namespace amphora-haproxy\n\n\n")
rendered_obj = self.lvs_jinja_cfg.render_loadbalancer_obj(
sample_configs_combined.sample_listener_tuple(
proto=lib_consts.PROTOCOL_SCTP, monitor=False,
persistence=False, alloc_default_pool=False))
self.assertEqual(exp, rendered_obj)
def test_sctp_transform_session_persistence(self):
persistence_src_ip = (
sample_configs_combined.sample_session_persistence_tuple(
persistence_type=constants.SESSION_PERSISTENCE_SOURCE_IP,
persistence_cookie=None,
persistence_timeout=33,
persistence_granularity='255.0.0.0'
))
exp = sample_configs_combined.SCTP_SOURCE_IP_BODY
ret = self.lvs_jinja_cfg._transform_session_persistence(
persistence_src_ip)
self.assertEqual(exp, ret)
def test_sctp_transform_health_monitor(self):
in_hm = sample_configs_combined.sample_health_monitor_tuple(
proto=lib_consts.HEALTH_MONITOR_SCTP
)
ret = self.lvs_jinja_cfg._transform_health_monitor(in_hm)
self.assertEqual(sample_configs_combined.RET_SCTP_HEALTH_MONITOR, ret)
def test_sctp_transform_member(self):
in_member = sample_configs_combined.sample_member_tuple(
'member_id_1', '192.0.2.10')
ret = self.lvs_jinja_cfg._transform_member(in_member)
self.assertEqual(sample_configs_combined.RET_SCTP_MEMBER, ret)
in_member = sample_configs_combined.sample_member_tuple(
'member_id_1',
'192.0.2.10',
monitor_ip_port=True)
ret = self.lvs_jinja_cfg._transform_member(in_member)
self.assertEqual(
sample_configs_combined.RET_SCTP_MEMBER_MONITOR_IP_PORT, ret)
def test_sctp_transform_pool(self):
in_pool = sample_configs_combined.sample_pool_tuple(
proto=lib_consts.PROTOCOL_SCTP,
persistence_type=constants.SESSION_PERSISTENCE_SOURCE_IP,
persistence_timeout=33, persistence_granularity='255.0.0.0',
)
ret = self.lvs_jinja_cfg._transform_pool(in_pool)
self.assertEqual(sample_configs_combined.RET_SCTP_POOL, ret)
in_pool = sample_configs_combined.sample_pool_tuple(
proto=lib_consts.PROTOCOL_SCTP,
persistence_type=constants.SESSION_PERSISTENCE_SOURCE_IP,
persistence_timeout=33, persistence_granularity='255.0.0.0',
lb_algorithm=None,
)
ret = self.lvs_jinja_cfg._transform_pool(in_pool)
self.assertEqual(sample_configs_combined.RET_SCTP_POOL, ret)
in_pool = sample_configs_combined.sample_pool_tuple(
proto=lib_consts.PROTOCOL_SCTP,
persistence_type=constants.SESSION_PERSISTENCE_SOURCE_IP,
persistence_timeout=33, persistence_granularity='255.0.0.0',
monitor=False)
sample_configs_combined.RET_SCTP_POOL['health_monitor'] = ''
ret = self.lvs_jinja_cfg._transform_pool(in_pool)
self.assertEqual(sample_configs_combined.RET_SCTP_POOL, ret)
def test_sctp_transform_listener(self):
in_listener = sample_configs_combined.sample_listener_tuple(
proto=lib_consts.PROTOCOL_SCTP,
persistence_type=constants.SESSION_PERSISTENCE_SOURCE_IP,
persistence_timeout=33,
persistence_granularity='255.0.0.0',
monitor_proto=lib_consts.HEALTH_MONITOR_SCTP,
connection_limit=98
)
ret = self.lvs_jinja_cfg._transform_listener(in_listener)
self.assertEqual(sample_configs_combined.RET_SCTP_LISTENER, ret)
in_listener = sample_configs_combined.sample_listener_tuple(
proto=lib_consts.PROTOCOL_SCTP,
persistence_type=constants.SESSION_PERSISTENCE_SOURCE_IP,
persistence_timeout=33,
persistence_granularity='255.0.0.0',
monitor_proto=lib_consts.HEALTH_MONITOR_SCTP,
connection_limit=-1)
ret = self.lvs_jinja_cfg._transform_listener(in_listener)
sample_configs_combined.RET_SCTP_LISTENER.pop('connection_limit')
self.assertEqual(sample_configs_combined.RET_SCTP_LISTENER, ret)
def test_render_template_sctp_listener_with_http_health_monitor(self):
exp = ("# Configuration for Loadbalancer sample_loadbalancer_id_1\n"
"# Configuration for Listener sample_listener_id_1\n\n"
"net_namespace amphora-haproxy\n\n"
"virtual_server 10.0.0.2 80 {\n"
" lb_algo rr\n"
" lb_kind NAT\n"
" protocol SCTP\n"
" delay_loop 30\n"
" delay_before_retry 30\n"
" retry 3\n\n\n"
" # Configuration for Pool sample_pool_id_1\n"
" # Configuration for HealthMonitor sample_monitor_id_1\n"
" # Configuration for Member sample_member_id_1\n"
" real_server 10.0.0.99 82 {\n"
" weight 13\n"
" uthreshold 98\n"
" HTTP_GET {\n"
" url {\n"
" path /index.html\n"
" status_code 200\n"
" }\n"
" url {\n"
" path /index.html\n"
" status_code 201\n"
" }\n"
" connect_ip 10.0.0.99\n"
" connect_port 82\n"
" connect_timeout 31\n"
" }\n"
" }\n\n"
" # Configuration for Member sample_member_id_2\n"
" real_server 10.0.0.98 82 {\n"
" weight 13\n"
" uthreshold 98\n"
" HTTP_GET {\n"
" url {\n"
" path /index.html\n"
" status_code 200\n"
" }\n"
" url {\n"
" path /index.html\n"
" status_code 201\n"
" }\n"
" connect_ip 10.0.0.98\n"
" connect_port 82\n"
" connect_timeout 31\n"
" }\n"
" }\n\n"
"}\n\n")
listener = sample_configs_combined.sample_listener_tuple(
proto=lib_consts.PROTOCOL_SCTP,
monitor_proto=constants.HEALTH_MONITOR_HTTP,
connection_limit=98,
persistence=False,
monitor_expected_codes='200-201')
rendered_obj = self.lvs_jinja_cfg.render_loadbalancer_obj(listener)
self.assertEqual(exp, rendered_obj)
def test_render_template_sctp_listener_with_tcp_health_monitor(self):
exp = ("# Configuration for Loadbalancer sample_loadbalancer_id_1\n"
"# Configuration for Listener sample_listener_id_1\n\n"
"net_namespace amphora-haproxy\n\n"
"virtual_server 10.0.0.2 80 {\n"
" lb_algo rr\n"
" lb_kind NAT\n"
" protocol SCTP\n"
" delay_loop 30\n"
" delay_before_retry 30\n"
" retry 3\n\n\n"
" # Configuration for Pool sample_pool_id_1\n"
" # Configuration for HealthMonitor sample_monitor_id_1\n"
" # Configuration for Member sample_member_id_1\n"
" real_server 10.0.0.99 82 {\n"
" weight 13\n"
" uthreshold 98\n"
" TCP_CHECK {\n"
" connect_ip 10.0.0.99\n"
" connect_port 82\n"
" connect_timeout 31\n"
" }\n"
" }\n\n"
" # Configuration for Member sample_member_id_2\n"
" real_server 10.0.0.98 82 {\n"
" weight 13\n"
" uthreshold 98\n"
" TCP_CHECK {\n"
" connect_ip 10.0.0.98\n"
" connect_port 82\n"
" connect_timeout 31\n"
" }\n"
" }\n\n"
"}\n\n")
listener = sample_configs_combined.sample_listener_tuple(
proto=lib_consts.PROTOCOL_SCTP,
monitor_proto=constants.HEALTH_MONITOR_TCP,
connection_limit=98,
persistence=False)
rendered_obj = self.lvs_jinja_cfg.render_loadbalancer_obj(listener)
self.assertEqual(exp, rendered_obj)
def test_render_template_disabled_sctp_listener(self):
exp = ("# Configuration for Loadbalancer sample_loadbalancer_id_1\n"
"# Listener sample_listener_id_1 is disabled\n\n"
"net_namespace amphora-haproxy\n\n")
rendered_obj = self.lvs_jinja_cfg.render_loadbalancer_obj(
sample_configs_combined.sample_listener_tuple(
enabled=False,
proto=lib_consts.PROTOCOL_SCTP,
persistence_type=constants.SESSION_PERSISTENCE_SOURCE_IP,
persistence_timeout=33,
persistence_granularity='255.255.0.0',
monitor_proto=lib_consts.HEALTH_MONITOR_SCTP,
connection_limit=98))
self.assertEqual(exp, rendered_obj)

View File

@ -15,6 +15,7 @@
import collections
from octavia_lib.common import constants as lib_consts
from oslo_config import cfg
from octavia.common import constants
@ -416,16 +417,6 @@ RET_UDP_HEALTH_MONITOR = {
'/lvs/check/udp_check.sh')
}
UDP_HEALTH_MONITOR_NO_SCRIPT = {
'id': 'sample_monitor_id_1',
'check_script_path': None,
'delay': 30,
'enabled': True,
'fall_threshold': 3,
'timeout': 31,
'type': 'UDP'
}
RET_UDP_MEMBER = {
'id': 'member_id_1',
'address': '192.0.2.10',
@ -469,7 +460,7 @@ UDP_MEMBER_2 = {
RET_UDP_POOL = {
'id': 'sample_pool_id_1',
'enabled': True,
'health_monitor': UDP_HEALTH_MONITOR_NO_SCRIPT,
'health_monitor': RET_UDP_HEALTH_MONITOR,
'lb_algorithm': 'rr',
'members': [UDP_MEMBER_1, UDP_MEMBER_2],
'protocol': 'udp',
@ -493,6 +484,89 @@ RET_UDP_LISTENER = {
'protocol_port': '80'
}
SCTP_SOURCE_IP_BODY = {
'type': constants.SESSION_PERSISTENCE_SOURCE_IP,
'persistence_timeout': 33,
'persistence_granularity': '255.0.0.0'
}
RET_SCTP_HEALTH_MONITOR = {
'id': 'sample_monitor_id_1',
'type': lib_consts.HEALTH_MONITOR_SCTP,
'delay': 30,
'timeout': 31,
'enabled': True,
'fall_threshold': 3,
'check_script_path': 'amphora-health-checker sctp'
}
RET_SCTP_MEMBER = {
'id': 'member_id_1',
'address': '192.0.2.10',
'protocol_port': 82,
'weight': 13,
'enabled': True,
'monitor_address': None,
'monitor_port': None
}
RET_SCTP_MEMBER_MONITOR_IP_PORT = {
'id': 'member_id_1',
'address': '192.0.2.10',
'protocol_port': 82,
'weight': 13,
'enabled': True,
'monitor_address': '192.168.1.1',
'monitor_port': 9000
}
SCTP_MEMBER_1 = {
'id': 'sample_member_id_1',
'address': '10.0.0.99',
'enabled': True,
'protocol_port': 82,
'weight': 13,
'monitor_address': None,
'monitor_port': None,
}
SCTP_MEMBER_2 = {
'id': 'sample_member_id_2',
'address': '10.0.0.98',
'enabled': True,
'protocol_port': 82,
'weight': 13,
'monitor_address': None,
'monitor_port': None
}
RET_SCTP_POOL = {
'id': 'sample_pool_id_1',
'enabled': True,
'health_monitor': RET_SCTP_HEALTH_MONITOR,
'lb_algorithm': 'rr',
'members': [SCTP_MEMBER_1, SCTP_MEMBER_2],
'protocol': 'sctp',
'session_persistence': SCTP_SOURCE_IP_BODY
}
RET_SCTP_LISTENER = {
'connection_limit': 98,
'default_pool': {
'id': 'sample_pool_id_1',
'enabled': True,
'health_monitor': RET_SCTP_HEALTH_MONITOR,
'lb_algorithm': 'rr',
'members': [SCTP_MEMBER_1, SCTP_MEMBER_2],
'protocol': 'sctp',
'session_persistence': SCTP_SOURCE_IP_BODY
},
'enabled': True,
'id': 'sample_listener_id_1',
'protocol_mode': 'sctp',
'protocol_port': '80'
}
def sample_listener_loadbalancer_tuple(
topology=None, enabled=True, pools=None):
@ -812,7 +886,11 @@ def sample_pool_tuple(listener_id=None, proto=None, monitor=True,
tls_ciphers = None
tls_versions = None
alpn_protocols = None
monitor_proto = proto if monitor_proto is None else monitor_proto
if monitor_proto is None:
if proto == constants.PROTOCOL_UDP:
monitor_proto = constants.HEALTH_MONITOR_UDP_CONNECT
else:
monitor_proto = proto
in_pool = collections.namedtuple(
'pool', 'id, protocol, lb_algorithm, members, health_monitor, '
'session_persistence, enabled, operating_status, '
@ -820,7 +898,7 @@ def sample_pool_tuple(listener_id=None, proto=None, monitor=True,
'crl_container_id, tls_enabled, tls_ciphers, '
'tls_versions, provisioning_status, alpn_protocols, ' +
constants.HTTP_REUSE)
if (proto == constants.PROTOCOL_UDP and
if (proto in constants.LVS_PROTOCOLS and
persistence_type == constants.SESSION_PERSISTENCE_SOURCE_IP):
kwargs = {'persistence_type': persistence_type,
'persistence_timeout': persistence_timeout,
@ -956,6 +1034,8 @@ def sample_health_monitor_tuple(proto='HTTP', sample_hm=1,
if proto == constants.HEALTH_MONITOR_UDP_CONNECT:
kwargs['check_script_path'] = (CONF.haproxy_amphora.base_path +
'lvs/check/' + 'udp_check.sh')
elif proto == lib_consts.HEALTH_MONITOR_SCTP:
kwargs['check_script_path'] = 'amphora-health-checker sctp'
else:
kwargs['check_script_path'] = None
return monitor(**kwargs)

View File

@ -0,0 +1,7 @@
---
features:
- |
Add support for the SCTP protocol in the Amphora driver. Support for SCTP
listeners and pools is implemented using keepalived in the amphora. Support
for SCTP health monitors is provided by the amphora-health-checker script
and relies on an INIT/INIT-ACK/ABORT sequence of packets.

View File

@ -52,6 +52,7 @@ console_scripts =
amphora-agent = octavia.cmd.agent:main
haproxy-vrrp-check = octavia.cmd.haproxy_vrrp_check:main
octavia-status = octavia.cmd.status:main
amphora-health-checker = octavia.cmd.health_checker:main
octavia.api.drivers =
noop_driver = octavia.api.drivers.noop_driver.driver:NoopProviderDriver
noop_driver-alt = octavia.api.drivers.noop_driver.driver:NoopProviderDriver