Followup patch for UDP support

1. Removes the misc_dynamic setting from the UDP-CONNECT health monitor
   as our script does not use it.
2. Adds a release note for the UDP features.
3. Updates the API reference for UDP support.
4. Adds a comment to the keepalived config with the LB ID.
5. Updates the status message type to be the correct UDP protocol.
6. Fix error during deleting a listener if there are multiple amphoraes.
7. Refactors systemd service script handling.

Story: 2003306
Task: 24258
Change-Id: I09240023d066ac5a71836d01045cda6ce5678712
This commit is contained in:
Michael Johnson 2018-07-31 20:40:16 -07:00
parent 4c6846a568
commit cc97397d1c
26 changed files with 916 additions and 547 deletions

View File

@ -411,7 +411,7 @@ healthmonitor-timeout-optional:
healthmonitor-type:
description: |
The type of health monitor. One of ``HTTP``, ``HTTPS``, ``PING``, ``TCP``,
or ``TLS-HELLO``.
``TLS-HELLO``, or ``UDP-CONNECT``.
in: body
required: true
type: string
@ -838,15 +838,15 @@ project_id-optional-deprecated:
type: string
protocol:
description: |
The protocol for the resource. One of ``HTTP``, ``HTTPS``, ``TCP``, or
``TERMINATED_HTTPS``.
The protocol for the resource. One of ``HTTP``, ``HTTPS``, ``TCP``,
``TERMINATED_HTTPS``, or ``UDP``.
in: body
required: true
type: integer
protocol-pools:
description: |
The protocol for the resource. One of ``HTTP``, ``HTTPS``, ``PROXY``, or
``TCP``.
The protocol for the resource. One of ``HTTP``, ``HTTPS``, ``PROXY``,
``TCP``, or ``UDP``.
in: body
required: true
type: string
@ -995,10 +995,26 @@ session_persistence_cookie:
in: body
required: false
type: string
session_persistence_granularity:
description: |
The netmask used to determine UDP session persistence. Currently only
valid for UDP pools with session persistence of SOURCE_IP. Default netmask
is 255.255.255.255, meaning per client full IP.
in: body
required: false
type: string
session_persistence_timeout:
description: |
The timeout, in seconds, after which a UDP flow may be rescheduled to a
different member. Currently only applies to UDP pools with session
persistence of SOURCE_IP. Default is 360.
in: body
required: false
type: integer
session_persistence_type:
description: |
Session persistence type for the pool. One of ``APP_COOKIE``,
``HTTP_COOKIE``, ``SOURCE_IP``.
``HTTP_COOKIE``, or ``SOURCE_IP``.
in: body
required: true
type: string

View File

@ -115,7 +115,7 @@ At a minimum, you must specify these health monitor attributes:
times out.
- ``type`` The type of health monitor. One of ``HTTP``, ``HTTPS``, ``PING``,
``TCP``, or ``TLS-HELLO``.
``TCP``, ``TLS-HELLO``, or ``UDP-CONNECT``.
Some attributes receive default values if you omit them from the request:

View File

@ -110,6 +110,8 @@ L7 policies with ``action`` of ``REDIRECT_TO_URL`` will return a HTTP
L7 policies with ``action`` of ``REJECT`` will return a ``Forbidden (403)``
response code to the requester.
.. note:: Pools of type ``UDP`` cannot be used in L7 policies at this time.
.. rest_status_code:: success ../http-status.yaml
- 201

View File

@ -95,7 +95,7 @@ is ready for further configuration.
At a minimum, you must specify these pool attributes:
- ``protocol`` The protocol for which this pool and its members
listen. A valid value is ``HTTP``, ``HTTPS``, ``PROXY``, or ``TCP``.
listen. A valid value is ``HTTP``, ``HTTPS``, ``PROXY``, ``TCP``, or ``UDP``.
- ``lb_algorithm`` The load-balancer algorithm, such as
``ROUND_ROBIN``, ``LEAST_CONNECTIONS``, and ``SOURCE_IP``, that
@ -202,6 +202,8 @@ Pool Session Persistence Object
- type: session_persistence_type
- cookie_name: session_persistence_cookie
- persistence_timeout: session_persistence_timeout
- persistence_granularity: session_persistence_granularity
Pool Session Persistence Object Example
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

View File

@ -1,7 +1,8 @@
{
"family": {
"redhat": {
"amphora-agent": "openstack-octavia-amphora-agent"
"amphora-agent": "openstack-octavia-amphora-agent",
"netcat-openbsd": "nmap-ncat"
}
},
"default": {

View File

@ -96,17 +96,19 @@ class KeepalivedLvs(udp_listener_base.UdpListenerApiServerBase):
if init_system == consts.INIT_SYSTEMD:
template = SYSTEMD_TEMPLATE
init_enable_cmd = ("systemctl enable "
"octavia-keepalivedlvs-%s"
% str(listener_id))
# Render and install the network namespace systemd service
util.install_netns_systemd_service()
util.run_systemctl_command(
consts.ENABLE, consts.AMP_NETNS_SVC_PREFIX)
elif init_system == consts.INIT_UPSTART:
template = UPSTART_TEMPLATE
elif init_system == consts.INIT_SYSVINIT:
template = SYSVINIT_TEMPLATE
init_enable_cmd = "insserv {file}".format(file=file_path)
else:
raise util.UnknownInitError()
# Render and install the keepalivedlvs init script
if init_system == consts.INIT_SYSTEMD:
# mode 00644
mode = stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH
@ -124,12 +126,17 @@ class KeepalivedLvs(udp_listener_base.UdpListenerApiServerBase):
check_pid=check_pid,
keepalived_cmd=consts.KEEPALIVED_CMD,
keepalived_cfg=util.keepalived_lvs_cfg_path(listener_id),
amphora_nsname=consts.AMPHORA_NAMESPACE
amphora_nsname=consts.AMPHORA_NAMESPACE,
amphora_netns=consts.AMP_NETNS_SVC_PREFIX
)
text_file.write(text)
# Make sure the new service is enabled on boot
if init_system != consts.INIT_UPSTART:
# Make sure the keepalivedlvs service is enabled on boot
if init_system == consts.INIT_SYSTEMD:
util.run_systemctl_command(
consts.ENABLE, "octavia-keepalivedlvs-%s" % str(listener_id))
elif init_system == consts.INIT_SYSVINIT:
init_enable_cmd = "insserv {file}".format(file=file_path)
try:
subprocess.check_output(init_enable_cmd.split(),
stderr=subprocess.STDOUT)
@ -232,9 +239,7 @@ class KeepalivedLvs(udp_listener_base.UdpListenerApiServerBase):
def get_all_udp_listeners_status(self):
"""Gets the status of all UDP listeners
This method will not consult the stats socket
so a listener might show as ACTIVE but still be
in ERROR
Gets the status of all UDP listeners on the amphora.
"""
listeners = list()
@ -243,7 +248,7 @@ class KeepalivedLvs(udp_listener_base.UdpListenerApiServerBase):
listeners.append({
'status': status,
'uuid': udp_listener,
'type': 'lvs',
'type': 'UDP',
})
return listeners
@ -265,14 +270,14 @@ class KeepalivedLvs(udp_listener_base.UdpListenerApiServerBase):
stats = dict(
status=status,
uuid=listener_id,
type=''
type='UDP'
)
return webob.Response(json=stats)
stats = dict(
status=status,
uuid=listener_id,
type='lvs'
type='UDP'
)
try:
@ -280,7 +285,8 @@ class KeepalivedLvs(udp_listener_base.UdpListenerApiServerBase):
listener_id)
except subprocess.CalledProcessError as e:
return webob.Response(json=dict(
message="Error get kernel lvs status for udp listener",
message="Error getting kernel lvs status for udp listener "
"{}".format(listener_id),
details=e.output), status=500)
stats['pools'] = [pool]
return webob.Response(json=stats)
@ -321,15 +327,14 @@ class KeepalivedLvs(udp_listener_base.UdpListenerApiServerBase):
init_path = util.keepalived_lvs_init_path(init_system, listener_id)
if init_system == consts.INIT_SYSTEMD:
init_disable_cmd = (
"systemctl disable octavia-keepalivedlvs-"
"{list}".format(list=listener_id))
util.run_systemctl_command(
consts.DISABLE, "octavia-keepalivedlvs-%s" % str(listener_id))
elif init_system == consts.INIT_SYSVINIT:
init_disable_cmd = "insserv -r {file}".format(file=init_path)
elif init_system != consts.INIT_UPSTART:
raise util.UnknownInitError()
if init_system != consts.INIT_UPSTART:
if init_system == consts.INIT_SYSVINIT:
try:
subprocess.check_output(init_disable_cmd.split(),
stderr=subprocess.STDOUT)

View File

@ -43,7 +43,7 @@ CONF = cfg.CONF
UPSTART_CONF = 'upstart.conf.j2'
SYSVINIT_CONF = 'sysvinit.conf.j2'
SYSTEMD_CONF = 'systemd.conf.j2'
AMPHORA_NETNS = 'amphora-netns'
consts.AMP_NETNS_SVC_PREFIX = 'amphora-netns'
JINJA_ENV = jinja2.Environment(
autoescape=True,
@ -161,8 +161,10 @@ class Listener(object):
if init_system == consts.INIT_SYSTEMD:
template = SYSTEMD_TEMPLATE
init_enable_cmd = "systemctl enable haproxy-{list}".format(
list=listener_id)
# Render and install the network namespace systemd service
util.install_netns_systemd_service()
util.run_systemctl_command(
consts.ENABLE, consts.AMP_NETNS_SVC_PREFIX + '.service')
elif init_system == consts.INIT_UPSTART:
template = UPSTART_TEMPLATE
elif init_system == consts.INIT_SYSVINIT:
@ -187,20 +189,6 @@ class Listener(object):
mode = (stat.S_IRWXU | stat.S_IRGRP | stat.S_IXGRP |
stat.S_IROTH | stat.S_IXOTH)
if init_system == consts.INIT_SYSTEMD:
# TODO(bcafarel): implement this for other init systems
# netns handling depends on a separate unit file
netns_path = os.path.join(consts.SYSTEMD_DIR,
AMPHORA_NETNS + '.service')
if not os.path.exists(netns_path):
with os.fdopen(os.open(netns_path, flags, mode),
'w') as text_file:
text = JINJA_ENV.get_template(
AMPHORA_NETNS + '.systemd.j2').render(
amphora_nsname=consts.AMPHORA_NAMESPACE,
HasIFUPAll=self._osutils.has_ifup_all())
text_file.write(text)
hap_major, hap_minor = haproxy_compatibility.get_haproxy_versions()
if not os.path.exists(init_path):
with os.fdopen(os.open(init_path, flags, mode), 'w') as text_file:
@ -214,7 +202,7 @@ class Listener(object):
respawn_count=util.CONF.haproxy_amphora.respawn_count,
respawn_interval=(util.CONF.haproxy_amphora.
respawn_interval),
amphora_netns=AMPHORA_NETNS,
amphora_netns=consts.AMP_NETNS_SVC_PREFIX,
amphora_nsname=consts.AMPHORA_NAMESPACE,
HasIFUPAll=self._osutils.has_ifup_all(),
haproxy_major_version=hap_major,
@ -223,7 +211,10 @@ class Listener(object):
text_file.write(text)
# Make sure the new service is enabled on boot
if init_system != consts.INIT_UPSTART:
if init_system == consts.INIT_SYSTEMD:
util.run_systemctl_command(
consts.ENABLE, "haproxy-{list}".format(list=listener_id))
elif init_system == consts.INIT_SYSVINIT:
try:
subprocess.check_output(init_enable_cmd.split(),
stderr=subprocess.STDOUT)
@ -336,14 +327,15 @@ class Listener(object):
init_path = util.init_path(listener_id, init_system)
if init_system == consts.INIT_SYSTEMD:
init_disable_cmd = "systemctl disable haproxy-{list}".format(
list=listener_id)
util.run_systemctl_command(
consts.DISABLE, "haproxy-{list}".format(
list=listener_id))
elif init_system == consts.INIT_SYSVINIT:
init_disable_cmd = "insserv -r {file}".format(file=init_path)
elif init_system != consts.INIT_UPSTART:
raise util.UnknownInitError()
if init_system != consts.INIT_UPSTART:
if init_system == consts.INIT_SYSVINIT:
try:
subprocess.check_output(init_disable_cmd.split(),
stderr=subprocess.STDOUT)

View File

@ -26,6 +26,7 @@ from octavia.amphorae.backends.agent.api_server import listener
from octavia.amphorae.backends.agent.api_server import osutils
from octavia.amphorae.backends.agent.api_server import plug
from octavia.amphorae.backends.agent.api_server import udp_listener_base
from octavia.amphorae.backends.agent.api_server import util
PATH_PREFIX = '/' + api_server.VERSION
@ -43,17 +44,6 @@ def register_app_error_handler(app):
app.register_error_handler(code, make_json_error)
def check_and_return_request_listener_protocol(request):
try:
protocol_dict = request.get_json()
assert type(protocol_dict) is dict
assert 'protocol' in protocol_dict
except Exception:
raise exceptions.BadRequest(
description='Invalid protocol information for Listener')
return protocol_dict['protocol']
class Server(object):
def __init__(self):
self.app = flask.Flask(__name__)
@ -146,16 +136,14 @@ class Server(object):
return self._udp_listener.get_udp_listener_config(listener_id)
def start_stop_listener(self, listener_id, action):
protocol = check_and_return_request_listener_protocol(
flask.request)
protocol = util.get_listener_protocol(listener_id)
if protocol == 'UDP':
return self._udp_listener.manage_udp_listener(
listener_id, action)
return self._listener.start_stop_listener(listener_id, action)
def delete_listener(self, listener_id):
protocol = check_and_return_request_listener_protocol(
flask.request)
protocol = util.get_listener_protocol(listener_id)
if protocol == 'UDP':
return self._udp_listener.delete_udp_listener(listener_id)
return self._listener.delete_listener(listener_id)
@ -174,8 +162,7 @@ class Server(object):
other_listeners=udp_listeners)
def get_listener_status(self, listener_id):
protocol = check_and_return_request_listener_protocol(
flask.request)
protocol = util.get_listener_protocol(listener_id)
if protocol == 'UDP':
return self._udp_listener.get_udp_listener_status(listener_id)
return self._listener.get_listener_status(listener_id)

View File

@ -1,7 +1,8 @@
[Unit]
Description=Keepalive Daemon (LVS and VRRP)
After=network-online.target
After=network-online.target {{ amphora_netns }}.service
Wants=network-online.target
Requires={{ amphora_netns }}.service
[Service]
# Force context as we start keepalived under "ip netns exec"

View File

@ -15,13 +15,18 @@
import os
import re
import stat
import subprocess
import jinja2
from oslo_config import cfg
from oslo_log import log as logging
from octavia.amphorae.backends.agent.api_server import osutils
from octavia.common import constants as consts
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
class UnknownInitError(Exception):
@ -88,10 +93,6 @@ def keepalived_lvs_cfg_path(listener_id):
str(listener_id))
def keepalived_lvs_iptables_dir():
return os.path.join(CONF.haproxy_amphora.base_path, 'lvs/iptables/')
def haproxy_dir(listener_id):
return os.path.join(CONF.haproxy_amphora.base_path, listener_id)
@ -110,7 +111,7 @@ def get_haproxy_pid(listener_id):
def get_keepalivedlvs_pid(listener_id):
pid_file, _, _ = keepalived_lvs_pids_path(listener_id)
pid_file = keepalived_lvs_pids_path(listener_id)[0]
with open(pid_file, 'r') as f:
return f.readline().rstrip()
@ -195,7 +196,7 @@ def get_udp_listeners():
def is_udp_listener_running(listener_id):
pid_file, _, _ = keepalived_lvs_pids_path(listener_id)
pid_file = keepalived_lvs_pids_path(listener_id)[0]
return os.path.exists(pid_file) and os.path.exists(
os.path.join('/proc', get_keepalivedlvs_pid(listener_id)))
@ -215,3 +216,56 @@ def get_os_init_system():
return consts.INIT_UPSTART
return consts.INIT_SYSVINIT
return consts.INIT_UNKOWN
def install_netns_systemd_service():
os_utils = osutils.BaseOS.get_os_util()
flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC
# mode 00644
mode = (stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH)
# TODO(bcafarel): implement this for other init systems
# netns handling depends on a separate unit file
netns_path = os.path.join(consts.SYSTEMD_DIR,
consts.AMP_NETNS_SVC_PREFIX + '.service')
jinja_env = jinja2.Environment(
autoescape=True, loader=jinja2.FileSystemLoader(os.path.dirname(
os.path.realpath(__file__)
) + consts.AGENT_API_TEMPLATES))
if not os.path.exists(netns_path):
with os.fdopen(os.open(netns_path, flags, mode), 'w') as text_file:
text = jinja_env.get_template(
consts.AMP_NETNS_SVC_PREFIX + '.systemd.j2').render(
amphora_nsname=consts.AMPHORA_NAMESPACE,
HasIFUPAll=os_utils.has_ifup_all())
text_file.write(text)
def run_systemctl_command(command, service):
cmd = "systemctl {cmd} {srvc}".format(cmd=command, srvc=service)
try:
subprocess.check_output(cmd.split(), stderr=subprocess.STDOUT)
except subprocess.CalledProcessError as e:
LOG.error("Failed to %(cmd)s %(srvc)s service: "
"%(err)s %(out)s", {'cmd': command, 'srvc': service,
'err': e, 'out': e.output})
def get_listener_protocol(listener_id):
"""Returns the L4 protocol for a listener.
If the listener is a TCP based listener (haproxy) return TCP.
If the listener is a UDP based listener (lvs) return UDP.
If the listener is not identifiable, return None.
:param listener_id: The ID of the listener to identify.
:returns: TCP, UDP, or None
"""
if os.path.exists(config_path(listener_id)):
return consts.PROTOCOL_TCP
elif os.path.exists(keepalived_lvs_cfg_path(listener_id)):
return consts.PROTOCOL_UDP
return None

View File

@ -95,7 +95,6 @@ class HaproxyAmphoraLoadBalancerDriver(
self.client.upload_udp_config(amp, listener.id, config,
timeout_dict=timeout_dict)
self.client.reload_listener(amp, listener.id,
listener.protocol,
timeout_dict=timeout_dict)
else:
certs = self._process_tls_certificates(listener)
@ -121,8 +120,7 @@ class HaproxyAmphoraLoadBalancerDriver(
# Generate Keepalived LVS configuration from listener object
config = self.udp_jinja.build_config(listener=listener)
self.client.upload_udp_config(amp, listener.id, config)
self.client.reload_listener(amp, listener.id,
listener.protocol)
self.client.reload_listener(amp, listener.id)
def update(self, listener, vip):
if listener.protocol == 'UDP':
@ -145,8 +143,7 @@ class HaproxyAmphoraLoadBalancerDriver(
tls_cert=certs['tls_cert'],
user_group=CONF.haproxy_amphora.user_group)
self.client.upload_config(amp, listener.id, config)
self.client.reload_listener(amp, listener.id,
listener.protocol)
self.client.reload_listener(amp, listener.id)
def upload_cert_amp(self, amp, pem):
LOG.debug("Amphora %s updating cert in REST driver "
@ -154,37 +151,13 @@ class HaproxyAmphoraLoadBalancerDriver(
self.__class__.__name__, amp.id)
self.client.update_cert_for_rotation(amp, pem)
def _check_if_need_add_listener_protocol(self, func):
# as _apply func will be called by create/update/delete and some cert
# related function. But there is only a port of them can accept
# listener protocol parameter, including:
# start/stop functions call _apply by functools.
# delete function call _apply directly.
# escape cert operation based on function name.
# So add this check for verify if the target function need a protocol
# parameter.
called_by_functools = (not hasattr(func, '__name__') and
hasattr(func, 'func') and
func.func.__name__.find('cert') < 0)
called_directly = (hasattr(func, '__name__') and
func.__name__.find('cert') < 0)
return called_directly or called_by_functools
def _apply(self, func, listener=None, amphora=None, *args):
if amphora is None:
for amp in listener.load_balancer.amphorae:
if amp.status != consts.DELETED:
if self._check_if_need_add_listener_protocol(func):
_list = list(args)
_list.append(listener.protocol)
args = _list
func(amp, listener.id, *args)
else:
if amphora.status != consts.DELETED:
if self._check_if_need_add_listener_protocol(func):
_list = list(args)
_list.append(listener.protocol)
args = _list
func(amphora, listener.id, *args)
def stop(self, listener, vip):
@ -428,21 +401,17 @@ class AmphoraAPIClient(object):
data=config)
return exc.check_exception(r)
def get_listener_status(self, amp, listener_id, protocol=None):
protocol_dict = {'protocol': protocol}
def get_listener_status(self, amp, listener_id):
r = self.get(
amp,
'listeners/{listener_id}'.format(listener_id=listener_id),
json=protocol_dict)
'listeners/{listener_id}'.format(listener_id=listener_id))
if exc.check_exception(r):
return r.json()
return None
def _action(self, action, amp, listener_id, protocol, timeout_dict=None):
protocol_dict = {'protocol': protocol}
def _action(self, action, amp, listener_id, timeout_dict=None):
r = self.put(amp, 'listeners/{listener_id}/{action}'.format(
listener_id=listener_id, action=action), timeout_dict=timeout_dict,
json=protocol_dict)
listener_id=listener_id, action=action), timeout_dict=timeout_dict)
return exc.check_exception(r)
def upload_cert_pem(self, amp, listener_id, pem_filename, pem_file):
@ -465,12 +434,9 @@ class AmphoraAPIClient(object):
return r.json().get("md5sum")
return None
def delete_listener(self, amp, listener_id, protocol):
protocol_dict = {'protocol': protocol}
def delete_listener(self, amp, listener_id):
r = self.delete(
amp, 'listeners/{listener_id}'.format(listener_id=listener_id),
json=protocol_dict)
amp, 'listeners/{listener_id}'.format(listener_id=listener_id))
return exc.check_exception(r, (404,))
def get_info(self, amp):

View File

@ -73,6 +73,8 @@ class RootController(rest.RestController):
if CONF.api_settings.api_v2_enabled:
self._add_a_version(versions, 'v2.0', 'v2', 'SUPPORTED',
'2016-12-11T00:00:00Z', host_url)
self._add_a_version(versions, 'v2.1', 'v2', 'CURRENT',
self._add_a_version(versions, 'v2.1', 'v2', 'SUPPORTED',
'2018-04-20T00:00:00Z', host_url)
self._add_a_version(versions, 'v2.2', 'v2', 'CURRENT',
'2018-07-31T00:00:00Z', host_url)
return {'versions': versions}

View File

@ -130,20 +130,17 @@ class PoolsController(base.BaseController):
def _is_only_specified_in_request(self, request, **kwargs):
request_attrs = []
check_attrs = kwargs['check_exist_attrs']
excaped_attrs = ['from_data_model',
escaped_attrs = ['from_data_model',
'translate_dict_keys_to_data_model', 'to_dict']
for attr in dir(request):
if attr.startswith('_') or attr in excaped_attrs:
if attr.startswith('_') or attr in escaped_attrs:
continue
else:
request_attrs.append(attr)
for req_attr in request_attrs:
if (getattr(
request, req_attr) and req_attr not in check_attrs) or (
not getattr(
request, req_attr) and req_attr in check_attrs):
if (getattr(request, req_attr) and req_attr not in check_attrs):
return False
return True

View File

@ -32,7 +32,6 @@ HEALTH_MONITOR_HTTP = 'HTTP'
HEALTH_MONITOR_HTTPS = 'HTTPS'
HEALTH_MONITOR_TLS_HELLO = 'TLS-HELLO'
HEALTH_MONITOR_UDP_CONNECT = 'UDP-CONNECT'
UDP_CONNECT_SCRIPT_MIN_INTERVAL = 3
SUPPORTED_HEALTH_MONITOR_TYPES = (HEALTH_MONITOR_HTTP, HEALTH_MONITOR_HTTPS,
HEALTH_MONITOR_PING, HEALTH_MONITOR_TCP,
HEALTH_MONITOR_TLS_HELLO,
@ -547,3 +546,10 @@ OCTAVIA = 'octavia'
# FLAVORS
# TODO(johnsom) When flavors are implemented, this should be removed.
SUPPORTED_FLAVORS = ()
# systemctl commands
DISABLE = 'disable'
ENABLE = 'enable'
# systemd amphora netns service prefix
AMP_NETNS_SVC_PREFIX = 'amphora-netns'

View File

@ -100,6 +100,7 @@ class LvsJinjaTemplater(object):
"""
t_listener = self._transform_listener(listener)
ret_value = {
'id': loadbalancer.id,
'vip_address': loadbalancer.vip.ip_address,
'listener': t_listener,
'enabled': loadbalancer.enabled

View File

@ -13,6 +13,7 @@
# under the License.
#
#}
# Configuration for Loadbalancer {{ loadbalancer.id }}
# Configuration for Listener {{ udp_listener_id }}
{% block global_definitions %}{% endblock global_definitions %}

View File

@ -17,7 +17,7 @@
{% from 'macros.j2' import virtualserver_macro %}
{% set udp_listener_id = loadbalancer.listener.id %}
{% block global_definitions %}
net_namespace amphora-haproxy
net_namespace {{ constants.AMPHORA_NAMESPACE }}
{% endblock global_definitions %}
{% block proxies %}
{% if loadbalancer.enabled and loadbalancer.listener.enabled %}

View File

@ -26,7 +26,6 @@ misc_path "{{ health_monitor.check_script_path }} {{ member.address }} {{ member
MISC_CHECK {
{{ misc_path_macro(member, health_monitor) }}
misc_timeout {{ pool.health_monitor.delay }}
misc_dynamic
}
{%- endmacro -%}
@ -52,10 +51,6 @@ MISC_CHECK {
{% if listener.connection_limit %}
uthreshold {{ listener.connection_limit }}
{% endif %}
{% if pool.session_persistence and pool.session_persistence.type == constants.SESSION_PERSISTENCE_SOURCE_IP %}
persistence_timeout {{ pool.session_persistence.persistence_timeout }}
persistence_granularity {{ pool.session_persistence.persistence_granularity }}
{% endif %}
{{- health_monitor_rs_macro(constants, pool, member) }}
}
{% endmacro %}
@ -84,11 +79,22 @@ MISC_CHECK {
{% if need_render|length > 0 %}
virtual_server {{ lb_vip_address }} {{ listener.protocol_port }} {
{{ lb_algo_macro(default_pool) }}
{% if not default_pool.session_persistence %}
ops
{% endif %}
lb_kind NAT
protocol {{ listener.protocol_mode }}
protocol {{ listener.protocol_mode.upper() }}
{% if default_pool.session_persistence and default_pool.session_persistence.type == constants.SESSION_PERSISTENCE_SOURCE_IP %}
{# set our defined defaults as I saw this not be consistent #}
{# in testing #}
{% if default_pool.session_persistence.persistence_timeout %}
persistence_timeout {{ default_pool.session_persistence.persistence_timeout }}
{% else %}
persistence_timeout 360
{% endif %}
{% if default_pool.session_persistence.persistence_granularity %}
persistence_granularity {{ default_pool.session_persistence.persistence_granularity }}
{% else %}
persistence_granularity 255.255.255.255
{% endif %}
{% endif %}
{{ health_monitor_vs_macro(default_pool) }}
{% if default_pool.protocol.lower() == "udp" %}

View File

@ -0,0 +1,466 @@
# Copyright 2015 Hewlett Packard Enterprise Development Company LP
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import stat
import subprocess
import flask
import mock
from werkzeug import exceptions
from oslo_utils import uuidutils
from octavia.amphorae.backends.agent.api_server import keepalivedlvs
from octavia.amphorae.backends.agent.api_server import server
from octavia.amphorae.backends.agent.api_server import util
from octavia.common import constants as consts
from octavia.tests.common import utils as test_utils
from octavia.tests.unit import base
class KeepalivedLvsTestCase(base.TestCase):
FAKE_ID = uuidutils.generate_uuid()
LISTENER_ID = 'listener-1111-1111-1111-listenerid00'
POOL_ID = 'poolpool-1111-1111-1111-poolid000000'
MEMBER_ID1 = 'memberid-1111-1111-1111-memberid1111'
MEMBER_ID2 = 'memberid-2222-2222-2222-memberid2222'
HEALTHMONITOR_ID = 'hmidhmid-1111-1111-1111-healthmonito'
NORMAL_CFG_CONTENT = (
"# Configuration for Listener %(listener_id)s\n\n"
"net_namespace haproxy-amphora\n\n"
"virtual_server 10.0.0.2 80 {\n"
" lb_algo rr\n"
" lb_kind NAT\n"
" protocol udp\n"
" delay_loop 30\n"
" delay_before_retry 31\n"
" retry 3\n\n\n"
" # Configuration for Pool %(pool_id)s\n"
" # Configuration for HealthMonitor %(hm_id)s\n"
" # Configuration for Member %(member1_id)s\n"
" real_server 10.0.0.99 82 {\n"
" weight 13\n"
" inhibit_on_failure\n"
" uthreshold 98\n"
" persistence_timeout 33\n"
" persistence_granularity 255.255.0.0\n"
" delay_before_retry 31\n"
" retry 3\n"
" MISC_CHECK {\n"
" misc_path \"/var/lib/octavia/lvs/check/"
"udp_check.sh 10.0.0.99 82\"\n"
" misc_timeout 30\n"
" misc_dynamic\n"
" }\n"
" }\n\n"
" # Configuration for Member %(member2_id)s\n"
" real_server 10.0.0.98 82 {\n"
" weight 13\n"
" inhibit_on_failure\n"
" uthreshold 98\n"
" persistence_timeout 33\n"
" persistence_granularity 255.255.0.0\n"
" delay_before_retry 31\n"
" retry 3\n"
" MISC_CHECK {\n"
" misc_path \"/var/lib/octavia/lvs/check/"
"udp_check.sh 10.0.0.98 82\"\n"
" misc_timeout 30\n"
" misc_dynamic\n"
" }\n"
" }\n\n"
"}\n\n") % {'listener_id': LISTENER_ID, 'pool_id': POOL_ID,
'hm_id': HEALTHMONITOR_ID, 'member1_id': MEMBER_ID1,
'member2_id': MEMBER_ID2}
PROC_CONTENT = (
"IP Virtual Server version 1.2.1 (size=4096)\n"
"Prot LocalAddress:Port Scheduler Flags\n"
" -> RemoteAddress:Port Forward Weight ActiveConn InActConn\n"
"UDP 0A000002:0050 sh\n"
" -> 0A000063:0052 Masq 13 1 0\n"
" -> 0A000062:0052 Masq 13 1 0\n"
)
NORMAL_PID_CONTENT = "1988"
TEST_URL = server.PATH_PREFIX + '/listeners/%s/%s/udp_listener'
def setUp(self):
super(KeepalivedLvsTestCase, self).setUp()
self.app = flask.Flask(__name__)
self.client = self.app.test_client()
self._ctx = self.app.test_request_context()
self._ctx.push()
self.test_keepalivedlvs = keepalivedlvs.KeepalivedLvs()
self.app.add_url_rule(
rule=self.TEST_URL % ('<amphora_id>', '<listener_id>'),
view_func=(lambda amphora_id, listener_id:
self.test_keepalivedlvs.upload_udp_listener_config(
listener_id)),
methods=['PUT'])
@mock.patch('octavia.amphorae.backends.agent.api_server.util.'
'run_systemctl_command')
@mock.patch('octavia.amphorae.backends.agent.api_server.util.'
'install_netns_systemd_service')
@mock.patch('pyroute2.NetNS')
@mock.patch('shutil.copy2')
@mock.patch('octavia.amphorae.backends.agent.api_server.util.'
'get_os_init_system', return_value=consts.INIT_SYSTEMD)
@mock.patch('os.chmod')
@mock.patch('os.path.exists')
@mock.patch('os.makedirs')
@mock.patch('os.remove')
@mock.patch('subprocess.check_output')
def test_upload_udp_listener_config_no_vrrp_check_dir(
self, m_check_output, m_os_rm, m_os_mkdir, m_exists, m_os_chmod,
m_os_sysinit, m_copy2, mock_netns, mock_install_netns,
mock_systemctl):
m_exists.side_effect = [False, False, True, True, False, False]
cfg_path = util.keepalived_lvs_cfg_path(self.FAKE_ID)
m = self.useFixture(test_utils.OpenFixture(cfg_path)).mock_open
with mock.patch('os.open') as m_open, mock.patch.object(os,
'fdopen',
m) as m_fdopen:
m_open.side_effect = ['TEST-WRITE-CFG',
'TEST-WRITE-SYSINIT']
res = self.client.put(self.TEST_URL % ('123', self.FAKE_ID),
data=self.NORMAL_CFG_CONTENT)
mock_install_netns.assert_called_once()
systemctl_calls = [
mock.call(consts.ENABLE,
consts.AMP_NETNS_SVC_PREFIX),
mock.call(consts.ENABLE,
'octavia-keepalivedlvs-%s' % str(self.FAKE_ID)),
]
mock_systemctl.assert_has_calls(systemctl_calls)
os_mkdir_calls = [
mock.call(util.keepalived_lvs_dir()),
mock.call(util.keepalived_backend_check_script_dir())
]
m_os_mkdir.assert_has_calls(os_mkdir_calls)
m_os_chmod.assert_called_with(
util.keepalived_backend_check_script_path(), stat.S_IEXEC)
flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC
mode = stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH
systemd_cfg_path = util.keepalived_lvs_init_path(
consts.INIT_SYSTEMD, self.FAKE_ID)
m_open_calls = [
mock.call(cfg_path, flags, mode),
mock.call(systemd_cfg_path, flags, mode)
]
m_open.assert_has_calls(m_open_calls)
m_fdopen.assert_any_call('TEST-WRITE-CFG', 'wb')
m_fdopen.assert_any_call('TEST-WRITE-SYSINIT', 'w')
self.assertEqual(200, res.status_code)
@mock.patch('octavia.amphorae.backends.agent.api_server.util.'
'run_systemctl_command')
@mock.patch('octavia.amphorae.backends.agent.api_server.util.'
'install_netns_systemd_service')
@mock.patch('pyroute2.NetNS')
@mock.patch('shutil.copy2')
@mock.patch('octavia.amphorae.backends.agent.api_server.util.'
'get_os_init_system', return_value=consts.INIT_SYSTEMD)
@mock.patch('os.chmod')
@mock.patch('os.path.exists')
@mock.patch('os.makedirs')
@mock.patch('os.remove')
@mock.patch('subprocess.check_output')
def test_upload_udp_listener_config_with_vrrp_check_dir(
self, m_check_output, m_os_rm, m_os_mkdir, m_exists, m_os_chmod,
m_os_sysinit, m_copy2, mock_netns, mock_install_netns,
mock_systemctl):
m_exists.side_effect = [False, False, True, True, True, False, False]
cfg_path = util.keepalived_lvs_cfg_path(self.FAKE_ID)
m = self.useFixture(test_utils.OpenFixture(cfg_path)).mock_open
with mock.patch('os.open') as m_open, mock.patch.object(os,
'fdopen',
m) as m_fdopen:
m_open.side_effect = ['TEST-WRITE-CFG',
'TEST-WRITE-SYSINIT',
'TEST-WRITE-UDP-VRRP-CHECK']
res = self.client.put(self.TEST_URL % ('123', self.FAKE_ID),
data=self.NORMAL_CFG_CONTENT)
os_mkdir_calls = [
mock.call(util.keepalived_lvs_dir()),
mock.call(util.keepalived_backend_check_script_dir())
]
m_os_mkdir.assert_has_calls(os_mkdir_calls)
mock_install_netns.assert_called_once()
systemctl_calls = [
mock.call(consts.ENABLE,
consts.AMP_NETNS_SVC_PREFIX),
mock.call(consts.ENABLE,
'octavia-keepalivedlvs-%s' % str(self.FAKE_ID)),
]
mock_systemctl.assert_has_calls(systemctl_calls)
m_os_chmod.assert_called_with(
util.keepalived_backend_check_script_path(), stat.S_IEXEC)
flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC
mode = stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH
systemd_cfg_path = util.keepalived_lvs_init_path(
consts.INIT_SYSTEMD, self.FAKE_ID)
script_path = os.path.join(
util.keepalived_check_scripts_dir(),
keepalivedlvs.KEEPALIVED_CHECK_SCRIPT_NAME)
m_open_calls = [
mock.call(cfg_path, flags, mode),
mock.call(systemd_cfg_path, flags, mode),
mock.call(script_path, flags, stat.S_IEXEC)
]
m_open.assert_has_calls(m_open_calls)
m_fdopen.assert_any_call('TEST-WRITE-CFG', 'wb')
m_fdopen.assert_any_call('TEST-WRITE-SYSINIT', 'w')
m_fdopen.assert_any_call('TEST-WRITE-UDP-VRRP-CHECK', 'w')
self.assertEqual(200, res.status_code)
@mock.patch('octavia.amphorae.backends.agent.api_server.util.'
'run_systemctl_command')
@mock.patch('octavia.amphorae.backends.agent.api_server.util.'
'install_netns_systemd_service')
@mock.patch('shutil.copy2')
@mock.patch('octavia.amphorae.backends.agent.api_server.util.'
'get_os_init_system', return_value=consts.INIT_SYSTEMD)
@mock.patch('os.chmod')
@mock.patch('os.path.exists')
@mock.patch('os.makedirs')
@mock.patch('os.remove')
@mock.patch('subprocess.check_output')
def test_upload_udp_listener_config_start_service_failure(
self, m_check_output, m_os_rm, m_os_mkdir, m_exists, m_os_chmod,
m_os_sysinit, m_copy2, mock_install_netns, mock_systemctl):
m_exists.side_effect = [False, False, True, True, True, False]
m_check_output.side_effect = subprocess.CalledProcessError(1, 'blah!')
cfg_path = util.keepalived_lvs_cfg_path(self.FAKE_ID)
m = self.useFixture(test_utils.OpenFixture(cfg_path)).mock_open
with mock.patch('os.open') as m_open, mock.patch.object(os,
'fdopen',
m) as m_fdopen:
m_open.side_effect = ['TEST-WRITE-CFG',
'TEST-WRITE-SYSINIT']
res = self.client.put(self.TEST_URL % ('123', self.FAKE_ID),
data=self.NORMAL_CFG_CONTENT)
os_mkdir_calls = [
mock.call(util.keepalived_lvs_dir()),
mock.call(util.keepalived_backend_check_script_dir())
]
m_os_mkdir.assert_has_calls(os_mkdir_calls)
mock_install_netns.assert_called_once()
systemctl_calls = [
mock.call(consts.ENABLE,
consts.AMP_NETNS_SVC_PREFIX),
mock.call(consts.ENABLE,
'octavia-keepalivedlvs-%s' % str(self.FAKE_ID)),
]
mock_systemctl.assert_has_calls(systemctl_calls)
m_os_chmod.assert_called_with(
util.keepalived_backend_check_script_path(), stat.S_IEXEC)
flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC
mode = stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH
systemd_cfg_path = util.keepalived_lvs_init_path(
consts.INIT_SYSTEMD, self.FAKE_ID)
m_open_calls = [
mock.call(cfg_path, flags, mode),
mock.call(systemd_cfg_path, flags, mode)
]
m_open.assert_has_calls(m_open_calls)
m_fdopen.assert_any_call('TEST-WRITE-CFG', 'wb')
m_fdopen.assert_any_call('TEST-WRITE-SYSINIT', 'w')
self.assertEqual(500, res.status_code)
@mock.patch('subprocess.check_output')
@mock.patch('octavia.amphorae.backends.agent.api_server.'
'keepalivedlvs.KeepalivedLvs.'
'_check_udp_listener_exists')
def test_manage_udp_listener(self, mock_udp_exist, mock_check_output):
res = self.test_keepalivedlvs.manage_udp_listener(self.FAKE_ID,
'start')
cmd = ("/usr/sbin/service octavia-keepalivedlvs-{listener_id}"
" {action}".format(listener_id=self.FAKE_ID, action='start'))
mock_check_output.assert_called_once_with(cmd.split(),
stderr=subprocess.STDOUT)
self.assertEqual(202, res.status_code)
res = self.test_keepalivedlvs.manage_udp_listener(self.FAKE_ID,
'restart')
self.assertEqual(400, res.status_code)
mock_check_output.side_effect = subprocess.CalledProcessError(1,
'blah!')
res = self.test_keepalivedlvs.manage_udp_listener(self.FAKE_ID,
'start')
self.assertEqual(500, res.status_code)
@mock.patch('octavia.amphorae.backends.utils.keepalivedlvs_query.'
'get_listener_realserver_mapping')
@mock.patch('subprocess.check_output', return_value=PROC_CONTENT)
@mock.patch('os.path.exists')
def test_get_udp_listener_status(self, m_exist, m_check_output,
mget_mapping):
mget_mapping.return_value = (
True, {'10.0.0.99:82': {'status': 'UP',
'Weight': '13',
'InActConn': '0',
'ActiveConn': '0'},
'10.0.0.98:82': {'status': 'UP',
'Weight': '13',
'InActConn': '0',
'ActiveConn': '0'}})
pid_path = ('/var/lib/octavia/lvs/octavia-'
'keepalivedlvs-%s.pid' % self.FAKE_ID)
self.useFixture(test_utils.OpenFixture(pid_path,
self.NORMAL_PID_CONTENT))
cfg_path = ('/var/lib/octavia/lvs/octavia-'
'keepalivedlvs-%s.conf' % self.FAKE_ID)
self.useFixture(test_utils.OpenFixture(cfg_path,
self.NORMAL_CFG_CONTENT))
m_exist.return_value = True
expected = {'status': 'ACTIVE',
'pools': [{'lvs': {
'members': {self.MEMBER_ID1: 'UP',
self.MEMBER_ID2: 'UP'},
'status': 'UP',
'uuid': self.POOL_ID}}],
'type': 'UDP', 'uuid': self.FAKE_ID}
res = self.test_keepalivedlvs.get_udp_listener_status(self.FAKE_ID)
self.assertEqual(200, res.status_code)
self.assertEqual(expected, res.json)
@mock.patch('os.path.exists')
def test_get_udp_listener_status_no_exists(self, m_exist):
m_exist.return_value = False
self.assertRaises(exceptions.HTTPException,
self.test_keepalivedlvs.get_udp_listener_status,
self.FAKE_ID)
@mock.patch('os.path.exists')
def test_get_udp_listener_status_offline_status(self, m_exist):
m_exist.return_value = True
pid_path = ('/var/lib/octavia/lvs/octavia-'
'keepalivedlvs-%s.pid' % self.FAKE_ID)
self.useFixture(test_utils.OpenFixture(pid_path,
self.NORMAL_PID_CONTENT))
cfg_path = ('/var/lib/octavia/lvs/octavia-'
'keepalivedlvs-%s.conf' % self.FAKE_ID)
self.useFixture(test_utils.OpenFixture(cfg_path, 'NO VS CONFIG'))
expected = {'status': 'OFFLINE',
'type': 'UDP',
'uuid': self.FAKE_ID}
res = self.test_keepalivedlvs.get_udp_listener_status(self.FAKE_ID)
self.assertEqual(200, res.status_code)
self.assertEqual(expected, res.json)
@mock.patch('octavia.amphorae.backends.agent.api_server.util.'
'get_udp_listeners', return_value=[LISTENER_ID])
@mock.patch('octavia.amphorae.backends.agent.api_server.util.'
'get_os_init_system', return_value=consts.INIT_SYSTEMD)
@mock.patch('octavia.amphorae.backends.agent.api_server.util.'
'get_keepalivedlvs_pid')
@mock.patch('subprocess.check_output')
@mock.patch('os.remove')
@mock.patch('os.path.exists')
def test_delete_udp_listener(self, m_exist, m_remove, m_check_output,
mget_pid, m_init_sys, mget_udp_listeners):
m_exist.return_value = True
res = self.test_keepalivedlvs.delete_udp_listener(self.FAKE_ID)
cmd1 = ("/usr/sbin/service "
"octavia-keepalivedlvs-{0} stop".format(self.FAKE_ID))
cmd2 = ("systemctl disable "
"octavia-keepalivedlvs-{list}".format(list=self.FAKE_ID))
calls = [
mock.call(cmd1.split(), stderr=subprocess.STDOUT),
mock.call(cmd2.split(), stderr=subprocess.STDOUT)
]
m_check_output.assert_has_calls(calls)
self.assertEqual(200, res.status_code)
@mock.patch.object(keepalivedlvs, "webob")
@mock.patch('os.path.exists')
def test_delete_udp_listener_not_exist(self, m_exist, m_webob):
m_exist.return_value = False
self.test_keepalivedlvs.delete_udp_listener(self.FAKE_ID)
calls = [
mock.call(
json=dict(message='UDP Listener Not Found',
details="No UDP listener with UUID: "
"{0}".format(self.FAKE_ID)), status=404),
mock.call(json={'message': 'OK'})
]
m_webob.Response.assert_has_calls(calls)
@mock.patch('octavia.amphorae.backends.agent.api_server.util.'
'get_keepalivedlvs_pid')
@mock.patch('subprocess.check_output')
@mock.patch('os.path.exists')
def test_delete_udp_listener_stop_service_fail(self, m_exist,
m_check_output, mget_pid):
m_exist.return_value = True
m_check_output.side_effect = subprocess.CalledProcessError(1,
'Woops!')
res = self.test_keepalivedlvs.delete_udp_listener(self.FAKE_ID)
self.assertEqual(500, res.status_code)
self.assertEqual({'message': 'Error stopping keepalivedlvs',
'details': None}, res.json)
@mock.patch('octavia.amphorae.backends.agent.api_server.util.'
'get_os_init_system', return_value=consts.INIT_SYSVINIT)
@mock.patch('octavia.amphorae.backends.agent.api_server.util.'
'get_keepalivedlvs_pid')
@mock.patch('subprocess.check_output')
@mock.patch('os.remove')
@mock.patch('os.path.exists')
def test_delete_udp_listener_disable_service_fail(self, m_exist, m_remove,
m_check_output, mget_pid,
m_init_sys):
m_exist.return_value = True
m_check_output.side_effect = [True,
subprocess.CalledProcessError(
1, 'Woops!')]
res = self.test_keepalivedlvs.delete_udp_listener(self.FAKE_ID)
self.assertEqual(500, res.status_code)
self.assertEqual({
'message': 'Error disabling '
'octavia-keepalivedlvs-%s service' % self.FAKE_ID,
'details': None}, res.json)
@mock.patch('octavia.amphorae.backends.agent.api_server.util.'
'get_os_init_system')
@mock.patch('octavia.amphorae.backends.agent.api_server.util.'
'get_keepalivedlvs_pid')
@mock.patch('subprocess.check_output')
@mock.patch('os.remove')
@mock.patch('os.path.exists')
def test_delete_udp_listener_unsupported_sysinit(self, m_exist, m_remove,
m_check_output, mget_pid,
m_init_sys):
m_exist.return_value = True
self.assertRaises(
util.UnknownInitError, self.test_keepalivedlvs.delete_udp_listener,
self.FAKE_ID)

View File

@ -57,9 +57,6 @@ class TestServerTestCase(base.TestCase):
self.conf = self.useFixture(oslo_fixture.Config(config.cfg.CONF))
self.conf.config(group="haproxy_amphora", base_path='/var/lib/octavia')
mock.patch('octavia.amphorae.backends.agent.api_server.server.'
'check_and_return_request_listener_protocol',
return_value='TCP').start()
@mock.patch('octavia.amphorae.backends.agent.api_server.util.'
'get_os_init_system', return_value=consts.INIT_SYSTEMD)
@ -400,27 +397,39 @@ class TestServerTestCase(base.TestCase):
hostname='test-host'),
json.loads(rv.data.decode('utf-8')))
@mock.patch('octavia.amphorae.backends.agent.api_server.util.'
'get_listener_protocol', return_value='TCP')
@mock.patch('octavia.amphorae.backends.agent.api_server.util.'
'get_os_init_system', return_value=consts.INIT_SYSTEMD)
def test_delete_ubuntu_listener_systemd(self, mock_init_system):
def test_delete_ubuntu_listener_systemd(self, mock_init_system,
mock_get_proto):
self._test_delete_listener(consts.INIT_SYSTEMD, consts.UBUNTU,
mock_init_system)
@mock.patch('octavia.amphorae.backends.agent.api_server.util.'
'get_listener_protocol', return_value='TCP')
@mock.patch('octavia.amphorae.backends.agent.api_server.util.'
'get_os_init_system', return_value=consts.INIT_SYSTEMD)
def test_delete_centos_listener_systemd(self, mock_init_system):
def test_delete_centos_listener_systemd(self, mock_init_system,
mock_get_proto):
self._test_delete_listener(consts.INIT_SYSTEMD, consts.CENTOS,
mock_init_system)
@mock.patch('octavia.amphorae.backends.agent.api_server.util.'
'get_listener_protocol', return_value='TCP')
@mock.patch('octavia.amphorae.backends.agent.api_server.util.'
'get_os_init_system', return_value=consts.INIT_SYSVINIT)
def test_delete_ubuntu_listener_sysvinit(self, mock_init_system):
def test_delete_ubuntu_listener_sysvinit(self, mock_init_system,
mock_get_proto):
self._test_delete_listener(consts.INIT_SYSVINIT, consts.UBUNTU,
mock_init_system)
@mock.patch('octavia.amphorae.backends.agent.api_server.util.'
'get_listener_protocol', return_value='TCP')
@mock.patch('octavia.amphorae.backends.agent.api_server.util.'
'get_os_init_system', return_value=consts.INIT_UPSTART)
def test_delete_ubuntu_listener_upstart(self, mock_init_system):
def test_delete_ubuntu_listener_upstart(self, mock_init_system,
mock_get_proto):
self._test_delete_listener(consts.INIT_UPSTART, consts.UBUNTU,
mock_init_system)
@ -725,6 +734,8 @@ class TestServerTestCase(base.TestCase):
def test_centos_get_listener(self):
self._test_get_listener(consts.CENTOS)
@mock.patch('octavia.amphorae.backends.agent.api_server.util.'
'get_listener_protocol', return_value='TCP')
@mock.patch('octavia.amphorae.backends.agent.api_server.listener.Listener.'
'_check_listener_status')
@mock.patch('octavia.amphorae.backends.agent.api_server.listener.Listener.'
@ -732,7 +743,7 @@ class TestServerTestCase(base.TestCase):
@mock.patch('octavia.amphorae.backends.utils.haproxy_query.HAProxyQuery')
@mock.patch('os.path.exists')
def _test_get_listener(self, distro, mock_exists, mock_query, mock_parse,
mock_status):
mock_status, mock_get_proto):
self.assertIn(distro, [consts.UBUNTU, consts.CENTOS])
# Listener not found
mock_exists.side_effect = [False]

View File

@ -46,10 +46,11 @@ class TestRootController(base_db_test.OctaviaDBTestBase):
versions = self._get_versions_with_config(
api_v1_enabled=True, api_v2_enabled=True)
version_ids = tuple(v.get('id') for v in versions)
self.assertEqual(3, len(version_ids))
self.assertEqual(4, len(version_ids))
self.assertIn('v1', version_ids)
self.assertIn('v2.0', version_ids)
self.assertIn('v2.1', version_ids)
self.assertIn('v2.2', version_ids)
# Each version should have a 'self' 'href' to the API version URL
# [{u'rel': u'self', u'href': u'http://localhost/v2'}]
@ -69,9 +70,10 @@ class TestRootController(base_db_test.OctaviaDBTestBase):
def test_api_v1_disabled(self):
versions = self._get_versions_with_config(
api_v1_enabled=False, api_v2_enabled=True)
self.assertEqual(2, len(versions))
self.assertEqual(3, len(versions))
self.assertEqual('v2.0', versions[0].get('id'))
self.assertEqual('v2.1', versions[1].get('id'))
self.assertEqual('v2.2', versions[2].get('id'))
def test_api_v2_disabled(self):
versions = self._get_versions_with_config(

View File

@ -12,301 +12,22 @@
# License for the specific language governing permissions and limitations
# under the License.
import os
import stat
import subprocess
import flask
import mock
from werkzeug import exceptions
from oslo_utils import uuidutils
from octavia.amphorae.backends.agent.api_server import keepalivedlvs
from octavia.amphorae.backends.agent.api_server import server
from octavia.amphorae.backends.agent.api_server import util
from octavia.common import constants as consts
from octavia.tests.common import utils as test_utils
from octavia.tests.unit import base
class KeepalivedLvsTestCase(base.TestCase):
FAKE_ID = uuidutils.generate_uuid()
LISTENER_ID = 'listener-1111-1111-1111-listenerid00'
POOL_ID = 'poolpool-1111-1111-1111-poolid000000'
MEMBER_ID1 = 'memberid-1111-1111-1111-memberid1111'
MEMBER_ID2 = 'memberid-2222-2222-2222-memberid2222'
HEALTHMONITOR_ID = 'hmidhmid-1111-1111-1111-healthmonito'
NORMAL_CFG_CONTENT = (
"# Configuration for Listener %(listener_id)s\n\n"
"net_namespace haproxy-amphora\n\n"
"virtual_server 10.0.0.2 80 {\n"
" lb_algo rr\n"
" lb_kind NAT\n"
" protocol udp\n"
" delay_loop 30\n"
" delay_before_retry 31\n"
" retry 3\n\n\n"
" # Configuration for Pool %(pool_id)s\n"
" # Configuration for HealthMonitor %(hm_id)s\n"
" # Configuration for Member %(member1_id)s\n"
" real_server 10.0.0.99 82 {\n"
" weight 13\n"
" inhibit_on_failure\n"
" uthreshold 98\n"
" persistence_timeout 33\n"
" persistence_granularity 255.255.0.0\n"
" delay_before_retry 31\n"
" retry 3\n"
" MISC_CHECK {\n"
" misc_path \"/var/lib/octavia/lvs/check/"
"udp_check.sh 10.0.0.99 82\"\n"
" misc_timeout 30\n"
" misc_dynamic\n"
" }\n"
" }\n\n"
" # Configuration for Member %(member2_id)s\n"
" real_server 10.0.0.98 82 {\n"
" weight 13\n"
" inhibit_on_failure\n"
" uthreshold 98\n"
" persistence_timeout 33\n"
" persistence_granularity 255.255.0.0\n"
" delay_before_retry 31\n"
" retry 3\n"
" MISC_CHECK {\n"
" misc_path \"/var/lib/octavia/lvs/check/"
"udp_check.sh 10.0.0.98 82\"\n"
" misc_timeout 30\n"
" misc_dynamic\n"
" }\n"
" }\n\n"
"}\n\n") % {'listener_id': LISTENER_ID, 'pool_id': POOL_ID,
'hm_id': HEALTHMONITOR_ID, 'member1_id': MEMBER_ID1,
'member2_id': MEMBER_ID2}
PROC_CONTENT = (
"IP Virtual Server version 1.2.1 (size=4096)\n"
"Prot LocalAddress:Port Scheduler Flags\n"
" -> RemoteAddress:Port Forward Weight ActiveConn InActConn\n"
"UDP 0A000002:0050 sh\n"
" -> 0A000063:0052 Masq 13 1 0\n"
" -> 0A000062:0052 Masq 13 1 0\n"
)
NORMAL_PID_CONTENT = "1988"
TEST_URL = server.PATH_PREFIX + '/listeners/%s/%s/udp_listener'
def setUp(self):
super(KeepalivedLvsTestCase, self).setUp()
self.app = flask.Flask(__name__)
self.client = self.app.test_client()
self._ctx = self.app.test_request_context()
self._ctx.push()
self.test_keepalivedlvs = keepalivedlvs.KeepalivedLvs()
self.app.add_url_rule(
rule=self.TEST_URL % ('<amphora_id>', '<listener_id>'),
view_func=(lambda amphora_id, listener_id:
self.test_keepalivedlvs.upload_udp_listener_config(
listener_id)),
methods=['PUT'])
@mock.patch('pyroute2.NetNS')
@mock.patch('shutil.copy2')
@mock.patch('octavia.amphorae.backends.agent.api_server.util.'
'get_os_init_system', return_value=consts.INIT_SYSTEMD)
@mock.patch('os.chmod')
@mock.patch('os.path.exists')
@mock.patch('os.makedirs')
@mock.patch('os.remove')
@mock.patch('subprocess.check_output')
def test_upload_udp_listener_config_no_vrrp_check_dir(
self, m_check_output, m_os_rm, m_os_mkdir, m_exists, m_os_chmod,
m_os_sysinit, m_copy2, mock_netns):
m_exists.side_effect = [False, False, True, True, False, False]
cfg_path = util.keepalived_lvs_cfg_path(self.FAKE_ID)
m = self.useFixture(test_utils.OpenFixture(cfg_path)).mock_open
with mock.patch('os.open') as m_open, mock.patch.object(os,
'fdopen',
m) as m_fdopen:
m_open.side_effect = ['TEST-WRITE-CFG',
'TEST-WRITE-SYSINIT']
res = self.client.put(self.TEST_URL % ('123', self.FAKE_ID),
data=self.NORMAL_CFG_CONTENT)
os_mkdir_calls = [
mock.call(util.keepalived_lvs_dir()),
mock.call(util.keepalived_backend_check_script_dir())
]
m_os_mkdir.assert_has_calls(os_mkdir_calls)
m_os_chmod.assert_called_with(
util.keepalived_backend_check_script_path(), stat.S_IEXEC)
flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC
mode = stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH
systemd_cfg_path = util.keepalived_lvs_init_path(
consts.INIT_SYSTEMD, self.FAKE_ID)
m_open_calls = [
mock.call(cfg_path, flags, mode),
mock.call(systemd_cfg_path, flags, mode)
]
m_open.assert_has_calls(m_open_calls)
m_fdopen.assert_any_call('TEST-WRITE-CFG', 'wb')
m_fdopen.assert_any_call('TEST-WRITE-SYSINIT', 'w')
self.assertEqual(200, res.status_code)
@mock.patch('pyroute2.NetNS')
@mock.patch('shutil.copy2')
@mock.patch('octavia.amphorae.backends.agent.api_server.util.'
'get_os_init_system', return_value=consts.INIT_SYSTEMD)
@mock.patch('os.chmod')
@mock.patch('os.path.exists')
@mock.patch('os.makedirs')
@mock.patch('os.remove')
@mock.patch('subprocess.check_output')
def test_upload_udp_listener_config_with_vrrp_check_dir(
self, m_check_output, m_os_rm, m_os_mkdir, m_exists, m_os_chmod,
m_os_sysinit, m_copy2, mock_netns):
m_exists.side_effect = [False, False, True, True, True, False, False]
cfg_path = util.keepalived_lvs_cfg_path(self.FAKE_ID)
m = self.useFixture(test_utils.OpenFixture(cfg_path)).mock_open
with mock.patch('os.open') as m_open, mock.patch.object(os,
'fdopen',
m) as m_fdopen:
m_open.side_effect = ['TEST-WRITE-CFG',
'TEST-WRITE-SYSINIT',
'TEST-WRITE-UDP-VRRP-CHECK']
res = self.client.put(self.TEST_URL % ('123', self.FAKE_ID),
data=self.NORMAL_CFG_CONTENT)
os_mkdir_calls = [
mock.call(util.keepalived_lvs_dir()),
mock.call(util.keepalived_backend_check_script_dir())
]
m_os_mkdir.assert_has_calls(os_mkdir_calls)
m_os_chmod.assert_called_with(
util.keepalived_backend_check_script_path(), stat.S_IEXEC)
flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC
mode = stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH
systemd_cfg_path = util.keepalived_lvs_init_path(
consts.INIT_SYSTEMD, self.FAKE_ID)
script_path = os.path.join(
util.keepalived_check_scripts_dir(),
keepalivedlvs.KEEPALIVED_CHECK_SCRIPT_NAME)
m_open_calls = [
mock.call(cfg_path, flags, mode),
mock.call(systemd_cfg_path, flags, mode),
mock.call(script_path, flags, stat.S_IEXEC)
]
m_open.assert_has_calls(m_open_calls)
m_fdopen.assert_any_call('TEST-WRITE-CFG', 'wb')
m_fdopen.assert_any_call('TEST-WRITE-SYSINIT', 'w')
m_fdopen.assert_any_call('TEST-WRITE-UDP-VRRP-CHECK', 'w')
self.assertEqual(200, res.status_code)
@mock.patch('shutil.copy2')
@mock.patch('octavia.amphorae.backends.agent.api_server.util.'
'get_os_init_system', return_value=consts.INIT_SYSTEMD)
@mock.patch('os.chmod')
@mock.patch('os.path.exists')
@mock.patch('os.makedirs')
@mock.patch('os.remove')
@mock.patch('subprocess.check_output')
def test_upload_udp_listener_config_start_service_failure(
self, m_check_output, m_os_rm, m_os_mkdir, m_exists, m_os_chmod,
m_os_sysinit, m_copy2):
m_exists.side_effect = [False, False, True, True, True, False]
m_check_output.side_effect = subprocess.CalledProcessError(1, 'blah!')
cfg_path = util.keepalived_lvs_cfg_path(self.FAKE_ID)
m = self.useFixture(test_utils.OpenFixture(cfg_path)).mock_open
with mock.patch('os.open') as m_open, mock.patch.object(os,
'fdopen',
m) as m_fdopen:
m_open.side_effect = ['TEST-WRITE-CFG',
'TEST-WRITE-SYSINIT']
res = self.client.put(self.TEST_URL % ('123', self.FAKE_ID),
data=self.NORMAL_CFG_CONTENT)
os_mkdir_calls = [
mock.call(util.keepalived_lvs_dir()),
mock.call(util.keepalived_backend_check_script_dir())
]
m_os_mkdir.assert_has_calls(os_mkdir_calls)
m_os_chmod.assert_called_with(
util.keepalived_backend_check_script_path(), stat.S_IEXEC)
flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC
mode = stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH
systemd_cfg_path = util.keepalived_lvs_init_path(
consts.INIT_SYSTEMD, self.FAKE_ID)
m_open_calls = [
mock.call(cfg_path, flags, mode),
mock.call(systemd_cfg_path, flags, mode)
]
m_open.assert_has_calls(m_open_calls)
m_fdopen.assert_any_call('TEST-WRITE-CFG', 'wb')
m_fdopen.assert_any_call('TEST-WRITE-SYSINIT', 'w')
self.assertEqual(500, res.status_code)
@mock.patch('subprocess.check_output')
@mock.patch('octavia.amphorae.backends.agent.api_server.'
'keepalivedlvs.KeepalivedLvs.'
'_check_udp_listener_exists')
def test_manage_udp_listener(self, mock_udp_exist, mock_check_output):
res = self.test_keepalivedlvs.manage_udp_listener(self.FAKE_ID,
'start')
cmd = ("/usr/sbin/service octavia-keepalivedlvs-{listener_id}"
" {action}".format(listener_id=self.FAKE_ID, action='start'))
mock_check_output.assert_called_once_with(cmd.split(),
stderr=subprocess.STDOUT)
self.assertEqual(202, res.status_code)
res = self.test_keepalivedlvs.manage_udp_listener(self.FAKE_ID,
'restart')
self.assertEqual(400, res.status_code)
mock_check_output.side_effect = subprocess.CalledProcessError(1,
'blah!')
res = self.test_keepalivedlvs.manage_udp_listener(self.FAKE_ID,
'start')
self.assertEqual(500, res.status_code)
@mock.patch('octavia.amphorae.backends.utils.keepalivedlvs_query.'
'get_listener_realserver_mapping')
@mock.patch('subprocess.check_output', return_value=PROC_CONTENT)
@mock.patch('os.path.exists')
def test_get_udp_listener_status(self, m_exist, m_check_output,
mget_mapping):
mget_mapping.return_value = (
True, {'10.0.0.99:82': {'status': 'UP',
'Weight': '13',
'InActConn': '0',
'ActiveConn': '0'},
'10.0.0.98:82': {'status': 'UP',
'Weight': '13',
'InActConn': '0',
'ActiveConn': '0'}})
pid_path = ('/var/lib/octavia/lvs/octavia-'
'keepalivedlvs-%s.pid' % self.FAKE_ID)
self.useFixture(test_utils.OpenFixture(pid_path,
self.NORMAL_PID_CONTENT))
cfg_path = ('/var/lib/octavia/lvs/octavia-'
'keepalivedlvs-%s.conf' % self.FAKE_ID)
self.useFixture(test_utils.OpenFixture(cfg_path,
self.NORMAL_CFG_CONTENT))
m_exist.return_value = True
expected = {'status': 'ACTIVE',
'pools': [{'lvs': {
'members': {self.MEMBER_ID1: 'UP',
self.MEMBER_ID2: 'UP'},
'status': 'UP',
'uuid': self.POOL_ID}}],
'type': 'lvs', 'uuid': self.FAKE_ID}
res = self.test_keepalivedlvs.get_udp_listener_status(self.FAKE_ID)
self.assertEqual(200, res.status_code)
self.assertEqual(expected, res.json)
@mock.patch('os.path.exists')
def test_get_udp_listener_status_no_exists(self, m_exist):
@ -315,48 +36,6 @@ class KeepalivedLvsTestCase(base.TestCase):
self.test_keepalivedlvs.get_udp_listener_status,
self.FAKE_ID)
@mock.patch('os.path.exists')
def test_get_udp_listener_status_offline_status(self, m_exist):
m_exist.return_value = True
pid_path = ('/var/lib/octavia/lvs/octavia-'
'keepalivedlvs-%s.pid' % self.FAKE_ID)
self.useFixture(test_utils.OpenFixture(pid_path,
self.NORMAL_PID_CONTENT))
cfg_path = ('/var/lib/octavia/lvs/octavia-'
'keepalivedlvs-%s.conf' % self.FAKE_ID)
self.useFixture(test_utils.OpenFixture(cfg_path, 'NO VS CONFIG'))
expected = {'status': 'OFFLINE',
'type': '',
'uuid': self.FAKE_ID}
res = self.test_keepalivedlvs.get_udp_listener_status(self.FAKE_ID)
self.assertEqual(200, res.status_code)
self.assertEqual(expected, res.json)
@mock.patch('octavia.amphorae.backends.agent.api_server.util.'
'get_udp_listeners', return_value=[LISTENER_ID])
@mock.patch('octavia.amphorae.backends.agent.api_server.util.'
'get_os_init_system', return_value=consts.INIT_SYSTEMD)
@mock.patch('octavia.amphorae.backends.agent.api_server.util.'
'get_keepalivedlvs_pid')
@mock.patch('subprocess.check_output')
@mock.patch('os.remove')
@mock.patch('os.path.exists')
def test_delete_udp_listener(self, m_exist, m_remove, m_check_output,
mget_pid, m_init_sys, mget_udp_listeners):
m_exist.return_value = True
res = self.test_keepalivedlvs.delete_udp_listener(self.FAKE_ID)
cmd1 = ("/usr/sbin/service "
"octavia-keepalivedlvs-{0} stop".format(self.FAKE_ID))
cmd2 = ("systemctl disable "
"octavia-keepalivedlvs-{list}".format(list=self.FAKE_ID))
calls = [
mock.call(cmd1.split(), stderr=subprocess.STDOUT),
mock.call(cmd2.split(), stderr=subprocess.STDOUT)
]
m_check_output.assert_has_calls(calls)
self.assertEqual(200, res.status_code)
@mock.patch.object(keepalivedlvs, "webob")
@mock.patch('os.path.exists')
def test_delete_udp_listener_not_exist(self, m_exist, m_webob):
@ -371,41 +50,6 @@ class KeepalivedLvsTestCase(base.TestCase):
]
m_webob.Response.assert_has_calls(calls)
@mock.patch('octavia.amphorae.backends.agent.api_server.util.'
'get_keepalivedlvs_pid')
@mock.patch('subprocess.check_output')
@mock.patch('os.path.exists')
def test_delete_udp_listener_stop_service_fail(self, m_exist,
m_check_output, mget_pid):
m_exist.return_value = True
m_check_output.side_effect = subprocess.CalledProcessError(1,
'Woops!')
res = self.test_keepalivedlvs.delete_udp_listener(self.FAKE_ID)
self.assertEqual(500, res.status_code)
self.assertEqual({'message': 'Error stopping keepalivedlvs',
'details': None}, res.json)
@mock.patch('octavia.amphorae.backends.agent.api_server.util.'
'get_os_init_system', return_value=consts.INIT_SYSVINIT)
@mock.patch('octavia.amphorae.backends.agent.api_server.util.'
'get_keepalivedlvs_pid')
@mock.patch('subprocess.check_output')
@mock.patch('os.remove')
@mock.patch('os.path.exists')
def test_delete_udp_listener_disable_service_fail(self, m_exist, m_remove,
m_check_output, mget_pid,
m_init_sys):
m_exist.return_value = True
m_check_output.side_effect = [True,
subprocess.CalledProcessError(
1, 'Woops!')]
res = self.test_keepalivedlvs.delete_udp_listener(self.FAKE_ID)
self.assertEqual(500, res.status_code)
self.assertEqual({
'message': 'Error disabling '
'octavia-keepalivedlvs-%s service' % self.FAKE_ID,
'details': None}, res.json)
@mock.patch('octavia.amphorae.backends.agent.api_server.util.'
'get_os_init_system')
@mock.patch('octavia.amphorae.backends.agent.api_server.util.'

View File

@ -0,0 +1,196 @@
# Copyright 2018 Rackspace, US Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import subprocess
import mock
from oslo_config import cfg
from oslo_config import fixture as oslo_fixture
from oslo_utils import uuidutils
from octavia.amphorae.backends.agent.api_server import util
from octavia.common import constants as consts
from octavia.tests.common import utils as test_utils
import octavia.tests.unit.base as base
CONF = cfg.CONF
class TestUtil(base.TestCase):
def setUp(self):
super(TestUtil, self).setUp()
self.CONF = self.useFixture(oslo_fixture.Config(cfg.CONF))
self.listener_id = uuidutils.generate_uuid()
def test_keepalived_lvs_dir(self):
fake_path = '/fake/path'
self.CONF.config(group="haproxy_amphora", base_path=fake_path)
result = util.keepalived_lvs_dir()
fake_path = fake_path + '/lvs'
self.assertEqual(fake_path, result)
def test_keepalived_lvs_init_path(self):
# Test systemd
ref_path = (consts.SYSTEMD_DIR + '/' +
consts.KEEPALIVED_SYSTEMD_PREFIX % str(self.listener_id))
result = util.keepalived_lvs_init_path(consts.INIT_SYSTEMD,
self.listener_id)
self.assertEqual(ref_path, result)
# Test upstart
ref_path = (consts.UPSTART_DIR + '/' +
consts.KEEPALIVED_UPSTART_PREFIX % str(self.listener_id))
result = util.keepalived_lvs_init_path(consts.INIT_UPSTART,
self.listener_id)
self.assertEqual(ref_path, result)
# Test sysvinit
ref_path = (consts.SYSVINIT_DIR + '/' +
consts.KEEPALIVED_SYSVINIT_PREFIX % str(self.listener_id))
result = util.keepalived_lvs_init_path(consts.INIT_SYSVINIT,
self.listener_id)
self.assertEqual(ref_path, result)
# Test bad init system
self.assertRaises(util.UnknownInitError, util.keepalived_lvs_init_path,
'bogus_init', self.listener_id)
def test_keepalived_lvs_pids_path(self):
fake_path = '/fake/path'
self.CONF.config(group="haproxy_amphora", base_path=fake_path)
pid_path = (fake_path + '/' + 'lvs/octavia-keepalivedlvs-' +
self.listener_id + '.' + 'pid')
vrrp_pid_path = (fake_path + '/' + 'lvs/octavia-keepalivedlvs-' +
self.listener_id + '.' + 'vrrp.pid')
check_pid_path = (fake_path + '/' + 'lvs/octavia-keepalivedlvs-' +
self.listener_id + '.' + 'check.pid')
result1, result2, result3 = util.keepalived_lvs_pids_path(
self.listener_id)
self.assertEqual(pid_path, result1)
self.assertEqual(vrrp_pid_path, result2)
self.assertEqual(check_pid_path, result3)
def test_keepalived_lvs_cfg_path(self):
fake_path = '/fake/path'
self.CONF.config(group="haproxy_amphora", base_path=fake_path)
ref_path = (fake_path + '/lvs/octavia-keepalivedlvs-' +
self.listener_id + '.conf')
result = util.keepalived_lvs_cfg_path(self.listener_id)
self.assertEqual(ref_path, result)
@mock.patch('octavia.amphorae.backends.agent.api_server.util.'
'keepalived_lvs_pids_path')
def test_get_keepalivedlvs_pid(self, mock_path):
fake_path = '/fake/path'
mock_path.return_value = [fake_path]
self.useFixture(test_utils.OpenFixture(
fake_path, ' space data ')).mock_open
result = util.get_keepalivedlvs_pid(self.listener_id)
self.assertEqual(' space data', result)
@mock.patch('jinja2.FileSystemLoader')
@mock.patch('jinja2.Environment')
@mock.patch('os.path')
@mock.patch('octavia.amphorae.backends.agent.api_server.osutils.'
'BaseOS.get_os_util')
def test_install_netns_systemd_service(self, mock_get_os_util,
mock_os_path, mock_jinja2_env,
mock_fsloader):
mock_os_util = mock.MagicMock()
mock_os_util.has_ifup_all.return_value = True
mock_get_os_util.return_value = mock_os_util
mock_os_path.realpath.return_value = '/dir/file'
mock_os_path.dirname.return_value = '/dir/'
mock_os_path.exists.return_value = False
mock_fsloader.return_value = 'fake_loader'
mock_jinja_env = mock.MagicMock()
mock_jinja2_env.return_value = mock_jinja_env
mock_template = mock.MagicMock()
mock_template.render.return_value = 'script'
mock_jinja_env.get_template.return_value = mock_template
m = mock.mock_open()
with mock.patch('os.open'), mock.patch.object(os, 'fdopen', m):
util.install_netns_systemd_service()
mock_jinja2_env.assert_called_with(autoescape=True,
loader='fake_loader')
mock_jinja_env.get_template.assert_called_once_with(
consts.AMP_NETNS_SVC_PREFIX + '.systemd.j2')
mock_template.render.assert_called_once_with(
amphora_nsname=consts.AMPHORA_NAMESPACE, HasIFUPAll=True)
handle = m()
handle.write.assert_called_with('script')
# Test file exists path we don't over write
mock_jinja_env.get_template.reset_mock()
mock_os_path.exists.return_value = True
util.install_netns_systemd_service()
self.assertFalse(mock_jinja_env.get_template.called)
@mock.patch('subprocess.check_output')
def test_run_systemctl_command(self, mock_check_output):
util.run_systemctl_command('test', 'world')
mock_check_output.assert_called_once_with(
['systemctl', 'test', 'world'], stderr=subprocess.STDOUT)
mock_check_output.side_effect = subprocess.CalledProcessError(1,
'boom')
util.run_systemctl_command('test', 'world')
@mock.patch('octavia.amphorae.backends.agent.api_server.util.config_path')
@mock.patch('octavia.amphorae.backends.agent.api_server.util.'
'keepalived_lvs_cfg_path')
@mock.patch('os.path.exists')
def test_get_listener_protocol(self, mock_path_exists, mock_lvs_path,
mock_cfg_path):
mock_lvs_path.return_value = '/here'
mock_cfg_path.return_value = '/there'
mock_path_exists.side_effect = [True, False, True, False, False]
result = util.get_listener_protocol('1')
mock_cfg_path.assert_called_once_with('1')
mock_path_exists.assert_called_once_with('/there')
self.assertFalse(mock_lvs_path.called)
self.assertEqual(consts.PROTOCOL_TCP, result)
mock_cfg_path.reset_mock()
result = util.get_listener_protocol('2')
mock_cfg_path.assert_called_once_with('2')
mock_lvs_path.assert_called_once_with('2')
self.assertEqual(consts.PROTOCOL_UDP, result)
mock_cfg_path.reset_mock()
mock_lvs_path.reset_mock()
result = util.get_listener_protocol('3')
mock_cfg_path.assert_called_once_with('3')
mock_lvs_path.assert_called_once_with('3')
self.assertIsNone(result)

View File

@ -43,7 +43,6 @@ FAKE_MAC_ADDRESS = '123'
FAKE_MTU = 1450
FAKE_MEMBER_IP_PORT_NAME_1 = "10.0.0.10:1003"
FAKE_MEMBER_IP_PORT_NAME_2 = "10.0.0.11:1004"
FAKE_PROTOCOL = 'test-protocol'
class TestHaproxyAmphoraLoadBalancerDriverTest(base.TestCase):
@ -202,7 +201,7 @@ class TestHaproxyAmphoraLoadBalancerDriverTest(base.TestCase):
self.amp, self.sl.id, 'fake_config')
# start should be called once
self.driver.client.reload_listener.assert_called_once_with(
self.amp, self.sl.id, self.sl.protocol)
self.amp, self.sl.id)
def test_udp_update(self):
self.driver.udp_jinja.build_config.side_effect = ['fake_udp_config']
@ -216,7 +215,7 @@ class TestHaproxyAmphoraLoadBalancerDriverTest(base.TestCase):
# start should be called once
self.driver.client.reload_listener.assert_called_once_with(
self.amp, self.sl_udp.id, self.sl_udp.protocol)
self.amp, self.sl_udp.id)
def test_upload_cert_amp(self):
self.driver.upload_cert_amp(self.amp, six.b('test'))
@ -228,14 +227,14 @@ class TestHaproxyAmphoraLoadBalancerDriverTest(base.TestCase):
# Execute driver method
self.driver.stop(self.sl, self.sv)
self.driver.client.stop_listener.assert_called_once_with(
self.amp, self.sl.id, self.sl.protocol)
self.amp, self.sl.id)
def test_udp_stop(self):
self.driver.client.stop_listener.__name__ = 'stop_listener'
# Execute driver method - UDP case
self.driver.stop(self.sl_udp, self.sv)
self.driver.client.stop_listener.assert_called_once_with(
self.amp, self.sl_udp.id, self.sl_udp.protocol)
self.amp, self.sl_udp.id)
def test_start(self):
amp1 = mock.MagicMock()
@ -249,7 +248,7 @@ class TestHaproxyAmphoraLoadBalancerDriverTest(base.TestCase):
# Execute driver method
self.driver.start(listener, self.sv)
self.driver.client.start_listener.assert_called_once_with(
amp1, listener.id, 'listener_protocol')
amp1, listener.id)
def test_start_with_amphora(self):
# Execute driver method
@ -257,7 +256,7 @@ class TestHaproxyAmphoraLoadBalancerDriverTest(base.TestCase):
self.driver.client.start_listener.__name__ = 'start_listener'
self.driver.start(self.sl, self.sv, self.amp)
self.driver.client.start_listener.assert_called_once_with(
self.amp, self.sl.id, self.sl.protocol)
self.amp, self.sl.id)
self.driver.client.start_listener.reset_mock()
amp.status = constants.DELETED
@ -269,21 +268,21 @@ class TestHaproxyAmphoraLoadBalancerDriverTest(base.TestCase):
# Execute driver method
self.driver.start(self.sl_udp, self.sv)
self.driver.client.start_listener.assert_called_once_with(
self.amp, self.sl_udp.id, self.sl_udp.protocol)
self.amp, self.sl_udp.id)
def test_delete(self):
self.driver.client.delete_listener.__name__ = 'delete_listener'
# Execute driver method
self.driver.delete(self.sl, self.sv)
self.driver.client.delete_listener.assert_called_once_with(
self.amp, self.sl.id, self.sl.protocol)
self.amp, self.sl.id)
def test_udp_delete(self):
self.driver.client.delete_listener.__name__ = 'delete_listener'
# Execute driver method
self.driver.delete(self.sl_udp, self.sv)
self.driver.client.delete_listener.assert_called_once_with(
self.amp, self.sl_udp.id, self.sl_udp.protocol)
self.amp, self.sl_udp.id)
def test_get_info(self):
self.driver.client.get_info.return_value = 'FAKE_INFO'
@ -532,8 +531,7 @@ class TestAmphoraAPIClientTest(base.TestCase):
m.get("{base}/listeners/{listener_id}".format(
base=self.base_url, listener_id=FAKE_UUID_1),
json=listener)
status = self.driver.get_listener_status(self.amp, FAKE_UUID_1,
protocol='TCP')
status = self.driver.get_listener_status(self.amp, FAKE_UUID_1)
self.assertEqual(listener, status)
@requests_mock.mock()
@ -553,8 +551,7 @@ class TestAmphoraAPIClientTest(base.TestCase):
m.get("{base}/listeners/{listener_id}".format(
base=self.base_url, listener_id=FAKE_UUID_1),
json=udp_listener)
status = self.driver.get_listener_status(self.amp, FAKE_UUID_1,
protocol='UDP')
status = self.driver.get_listener_status(self.amp, FAKE_UUID_1)
self.assertEqual(udp_listener, status)
@requests_mock.mock()
@ -598,7 +595,7 @@ class TestAmphoraAPIClientTest(base.TestCase):
def test_start_listener(self, m):
m.put("{base}/listeners/{listener_id}/start".format(
base=self.base_url, listener_id=FAKE_UUID_1))
self.driver.start_listener(self.amp, FAKE_UUID_1, FAKE_PROTOCOL)
self.driver.start_listener(self.amp, FAKE_UUID_1)
self.assertTrue(m.called)
@requests_mock.mock()
@ -608,7 +605,7 @@ class TestAmphoraAPIClientTest(base.TestCase):
status_code=404,
headers={'content-type': 'application/json'})
self.assertRaises(exc.NotFound, self.driver.start_listener,
self.amp, FAKE_UUID_1, FAKE_PROTOCOL)
self.amp, FAKE_UUID_1)
@requests_mock.mock()
def test_start_listener_unauthorized(self, m):
@ -616,7 +613,7 @@ class TestAmphoraAPIClientTest(base.TestCase):
base=self.base_url, listener_id=FAKE_UUID_1),
status_code=401)
self.assertRaises(exc.Unauthorized, self.driver.start_listener,
self.amp, FAKE_UUID_1, FAKE_PROTOCOL)
self.amp, FAKE_UUID_1)
@requests_mock.mock()
def test_start_listener_server_error(self, m):
@ -624,7 +621,7 @@ class TestAmphoraAPIClientTest(base.TestCase):
base=self.base_url, listener_id=FAKE_UUID_1),
status_code=500)
self.assertRaises(exc.InternalServerError, self.driver.start_listener,
self.amp, FAKE_UUID_1, FAKE_PROTOCOL)
self.amp, FAKE_UUID_1)
@requests_mock.mock()
def test_start_listener_service_unavailable(self, m):
@ -632,13 +629,13 @@ class TestAmphoraAPIClientTest(base.TestCase):
base=self.base_url, listener_id=FAKE_UUID_1),
status_code=503)
self.assertRaises(exc.ServiceUnavailable, self.driver.start_listener,
self.amp, FAKE_UUID_1, FAKE_PROTOCOL)
self.amp, FAKE_UUID_1)
@requests_mock.mock()
def test_stop_listener(self, m):
m.put("{base}/listeners/{listener_id}/stop".format(
base=self.base_url, listener_id=FAKE_UUID_1))
self.driver.stop_listener(self.amp, FAKE_UUID_1, FAKE_PROTOCOL)
self.driver.stop_listener(self.amp, FAKE_UUID_1)
self.assertTrue(m.called)
@requests_mock.mock()
@ -648,7 +645,7 @@ class TestAmphoraAPIClientTest(base.TestCase):
status_code=404,
headers={'content-type': 'application/json'})
self.assertRaises(exc.NotFound, self.driver.stop_listener,
self.amp, FAKE_UUID_1, FAKE_PROTOCOL)
self.amp, FAKE_UUID_1)
@requests_mock.mock()
def test_stop_listener_unauthorized(self, m):
@ -656,7 +653,7 @@ class TestAmphoraAPIClientTest(base.TestCase):
base=self.base_url, listener_id=FAKE_UUID_1),
status_code=401)
self.assertRaises(exc.Unauthorized, self.driver.stop_listener,
self.amp, FAKE_UUID_1, FAKE_PROTOCOL)
self.amp, FAKE_UUID_1)
@requests_mock.mock()
def test_stop_listener_server_error(self, m):
@ -664,7 +661,7 @@ class TestAmphoraAPIClientTest(base.TestCase):
base=self.base_url, listener_id=FAKE_UUID_1),
status_code=500)
self.assertRaises(exc.InternalServerError, self.driver.stop_listener,
self.amp, FAKE_UUID_1, FAKE_PROTOCOL)
self.amp, FAKE_UUID_1)
@requests_mock.mock()
def test_stop_listener_service_unavailable(self, m):
@ -672,13 +669,13 @@ class TestAmphoraAPIClientTest(base.TestCase):
base=self.base_url, listener_id=FAKE_UUID_1),
status_code=503)
self.assertRaises(exc.ServiceUnavailable, self.driver.stop_listener,
self.amp, FAKE_UUID_1, FAKE_PROTOCOL)
self.amp, FAKE_UUID_1)
@requests_mock.mock()
def test_delete_listener(self, m):
m.delete("{base}/listeners/{listener_id}".format(
base=self.base_url, listener_id=FAKE_UUID_1), json={})
self.driver.delete_listener(self.amp, FAKE_UUID_1, FAKE_PROTOCOL)
self.driver.delete_listener(self.amp, FAKE_UUID_1)
self.assertTrue(m.called)
@requests_mock.mock()
@ -687,7 +684,7 @@ class TestAmphoraAPIClientTest(base.TestCase):
base=self.base_url, listener_id=FAKE_UUID_1),
status_code=404,
headers={'content-type': 'application/json'})
self.driver.delete_listener(self.amp, FAKE_UUID_1, FAKE_PROTOCOL)
self.driver.delete_listener(self.amp, FAKE_UUID_1)
self.assertTrue(m.called)
@requests_mock.mock()
@ -696,7 +693,7 @@ class TestAmphoraAPIClientTest(base.TestCase):
base=self.base_url, listener_id=FAKE_UUID_1),
status_code=401)
self.assertRaises(exc.Unauthorized, self.driver.delete_listener,
self.amp, FAKE_UUID_1, FAKE_PROTOCOL)
self.amp, FAKE_UUID_1)
@requests_mock.mock()
def test_delete_listener_server_error(self, m):
@ -704,7 +701,7 @@ class TestAmphoraAPIClientTest(base.TestCase):
base=self.base_url, listener_id=FAKE_UUID_1),
status_code=500)
self.assertRaises(exc.InternalServerError, self.driver.delete_listener,
self.amp, FAKE_UUID_1, FAKE_PROTOCOL)
self.amp, FAKE_UUID_1)
@requests_mock.mock()
def test_delete_listener_service_unavailable(self, m):
@ -712,7 +709,7 @@ class TestAmphoraAPIClientTest(base.TestCase):
base=self.base_url, listener_id=FAKE_UUID_1),
status_code=503)
self.assertRaises(exc.ServiceUnavailable, self.driver.delete_listener,
self.amp, FAKE_UUID_1, FAKE_PROTOCOL)
self.amp, FAKE_UUID_1)
@requests_mock.mock()
def test_upload_cert_pem(self, m):

View File

@ -36,12 +36,15 @@ class TestLvsCfg(base.TestCase):
self.assertEqual('keepalivedlvs.cfg.j2', template.name)
def test_render_template_udp_source_ip(self):
exp = ("# Configuration for Listener sample_listener_id_1\n\n"
exp = ("# Configuration for Loadbalancer sample_loadbalancer_id_1\n"
"# Configuration for Listener sample_listener_id_1\n\n"
"net_namespace amphora-haproxy\n\n"
"virtual_server 10.0.0.2 80 {\n"
" lb_algo rr\n"
" lb_kind NAT\n"
" protocol udp\n"
" protocol UDP\n"
" persistence_timeout 33\n"
" persistence_granularity 255.255.0.0\n"
" delay_loop 30\n"
" delay_before_retry 31\n"
" retry 3\n\n\n"
@ -52,15 +55,12 @@ class TestLvsCfg(base.TestCase):
" weight 13\n"
" inhibit_on_failure\n"
" uthreshold 98\n"
" persistence_timeout 33\n"
" persistence_granularity 255.255.0.0\n"
" delay_before_retry 31\n"
" retry 3\n"
" MISC_CHECK {\n"
" misc_path \"/var/lib/octavia/lvs/check/"
"udp_check.sh 10.0.0.99 82\"\n"
" misc_timeout 30\n"
" misc_dynamic\n"
" }\n"
" }\n\n"
" # Configuration for Member sample_member_id_2\n"
@ -68,15 +68,12 @@ class TestLvsCfg(base.TestCase):
" weight 13\n"
" inhibit_on_failure\n"
" uthreshold 98\n"
" persistence_timeout 33\n"
" persistence_granularity 255.255.0.0\n"
" delay_before_retry 31\n"
" retry 3\n"
" MISC_CHECK {\n"
" misc_path \"/var/lib/octavia/lvs/check/"
"udp_check.sh 10.0.0.98 82\"\n"
" misc_timeout 30\n"
" misc_dynamic\n"
" }\n"
" }\n\n"
"}\n\n")
@ -91,13 +88,13 @@ class TestLvsCfg(base.TestCase):
self.assertEqual(exp, rendered_obj)
def test_render_template_udp_one_packet(self):
exp = ("# Configuration for Listener sample_listener_id_1\n\n"
exp = ("# Configuration for Loadbalancer sample_loadbalancer_id_1\n"
"# Configuration for Listener sample_listener_id_1\n\n"
"net_namespace amphora-haproxy\n\n"
"virtual_server 10.0.0.2 80 {\n"
" lb_algo rr\n"
" ops\n"
" lb_kind NAT\n"
" protocol udp\n"
" protocol UDP\n"
" delay_loop 30\n"
" delay_before_retry 31\n"
" retry 3\n\n\n"
@ -114,7 +111,6 @@ class TestLvsCfg(base.TestCase):
" misc_path \"/var/lib/octavia/lvs/check/"
"udp_check.sh 10.0.0.99 82\"\n"
" misc_timeout 30\n"
" misc_dynamic\n"
" }\n"
" }\n\n"
" # Configuration for Member sample_member_id_2\n"
@ -128,7 +124,6 @@ class TestLvsCfg(base.TestCase):
" misc_path \"/var/lib/octavia/lvs/check/"
"udp_check.sh 10.0.0.98 82\"\n"
" misc_timeout 30\n"
" misc_dynamic\n"
" }\n"
" }\n\n"
"}\n\n")
@ -142,13 +137,13 @@ class TestLvsCfg(base.TestCase):
self.assertEqual(exp, rendered_obj)
def test_render_template_udp_with_health_monitor(self):
exp = ("# Configuration for Listener sample_listener_id_1\n\n"
exp = ("# Configuration for Loadbalancer sample_loadbalancer_id_1\n"
"# Configuration for Listener sample_listener_id_1\n\n"
"net_namespace amphora-haproxy\n\n"
"virtual_server 10.0.0.2 80 {\n"
" lb_algo rr\n"
" ops\n"
" lb_kind NAT\n"
" protocol udp\n"
" protocol UDP\n"
" delay_loop 30\n"
" delay_before_retry 31\n"
" retry 3\n\n\n"
@ -165,7 +160,6 @@ class TestLvsCfg(base.TestCase):
" misc_path \"/var/lib/octavia/lvs/check/"
"udp_check.sh 10.0.0.99 82\"\n"
" misc_timeout 30\n"
" misc_dynamic\n"
" }\n"
" }\n\n"
" # Configuration for Member sample_member_id_2\n"
@ -179,7 +173,6 @@ class TestLvsCfg(base.TestCase):
" misc_path \"/var/lib/octavia/lvs/check/"
"udp_check.sh 10.0.0.98 82\"\n"
" misc_timeout 30\n"
" misc_dynamic\n"
" }\n"
" }\n\n"
"}\n\n")
@ -193,7 +186,8 @@ class TestLvsCfg(base.TestCase):
self.assertEqual(exp, rendered_obj)
def test_render_template_udp_no_other_resources(self):
exp = ("# Configuration for Listener sample_listener_id_1\n\n"
exp = ("# Configuration for Loadbalancer sample_loadbalancer_id_1\n"
"# Configuration for Listener sample_listener_id_1\n\n"
"net_namespace amphora-haproxy\n\n\n")
rendered_obj = self.udp_jinja_cfg.render_loadbalancer_obj(

View File

@ -0,0 +1,20 @@
---
features:
- Added UDP protocol support to listeners and pools.
- Adds a health monitor type of UDP-CONNECT that does a basic UDP port
connect.
issues:
- You cannot mix IPv4 UDP listeners with IPv6 members at this time. This is
being tracked with this story
https://storyboard.openstack.org/#!/story/2003329
- CentOS based amphora images do not yet support UDP load balancing.
upgrade:
- |
UDP protocol support requires an update to the amphora image to support
UDP protocol statistics reporting and UDP-CONNECT health monitoring.
other:
- |
Health monitors of type UDP-CONNECT may not work correctly if ICMP
unreachable is not enabled on the member server or is blocked by a security
rule. A member server may be marked as operating status ONLINE when it is
actually down.