Amphora Flows and Drivers for Active Standby

This patch implements the Active/Standby blueprint in
https://blueprints.launchpad.net/octavia/+spec/activepassiveamphora

The following points describe the main changes:

1. The patch introduces new flows and subflows to create M amphorae. The
controller worker parses the loadbalancer_topology configuration. If the
loadbalancer_topology value is ACTIVE_STANDBY, the controller invokes a new flow
independent from the SINGLE topology case, which is left untouched. The new
flow uses conditional taskflows to check for spare amphorae at runtime. This
removes the need for the exception workaround we earlier had. The controller
creates the amphorae in parallel using an unordered flow. A new database task
alter an amphora role as either MASTER or BACKUP and assigns a VRRP priority to
each amphora. After the amphorae are created, the controller invokes a separate
flow for post amphora configuration including plug_vip methods, vrrp
configuration upload, and keepalived service start.

2. The patch introduces new data models that include a new table for VRRP group
configuration per loadbalancer, and update the amphora, loadbalancer, and
listener tables to support the new active/standby capability. The VRRPGroup
table hides authentication data, and makes future extensions of VRRP
capabilities easy.

3. This patch updates the existing Haproxy configuration templates  to include
peer synchronization. In case of ACTIVE_STANDBY configuration, the jinja
configuration renders the peer section in the Haproxy configuration and assigns
short names to the amphorae as listener peers. As listeners implies different
Haproxy process, each listener synchronizes on a different port evaluated as
BASE_PORT (1024) + NUMBER_OF_LISTENERS accounting for ports in use.

4. This patch introduces a new Jinja configuration templater and a REST driver
for Keepalived (developed as a Mixin). By default, Keepalived runs "all" check
scripts found in a predefined directory. The keepalived driver is a Mixin that
can be plugged in other services' drivers. It is the responsibility of these
services drivers to introduce their own check scripts. In this patch a
lightweight check script for Haproxy was introduced along with changes in the
amphora agent installation script.

5. The VRRP requires enabling protocol 112 for Master/Backup advertisements,
and enabling protocol 51 for authentication header. This patch enables these
protocols as needed in the loadbalancer security group.

Note: Updates to the failover flow to support active/standby will come in
a dependent patch.
Note: The amphora-agent is pinned to this patch in this patch set.  This
is required so the scenario tests will pass.  It will be removed in a
follow up patch.

Co-Authored-By: Sherif Abdelwahab <sherif.abdelwahab@hp.com>
Co-Authored-By: Michael Johnson <johnsomor@gmail.com>
Implements: blueprint activepassiveamphora
Depends-On: Ifdf20378b26cdd13e0a3ff87cec8990fe89c0661
Change-Id: Ic4e04594e114ba682088d68d5f1af3f8f376db83
changes/52/206252/54
Sherif Abdelwahab 2015-07-27 15:49:05 -07:00 committed by Michael Johnson
parent 0a1d45f696
commit 58cda714ba
66 changed files with 2680 additions and 257 deletions

View File

@ -369,6 +369,80 @@ health of the amphora, currently-configured topology and role, etc.
],
}
Get interface
-------------
* **URL:** /*:version*/interface/*:ip*
* **Method:** GET
* **URL params:**
* *:ip* = the ip address to find the interface name
* **Data params:** none
* **Success Response:**
* Code: 200
* Content: OK
* Content: JSON formatted interface
* **Error Response:**
* Code: 400
* Content: Bad IP address version
* Code: 404
* Content: Error interface not found for IP address
* **Response:**
| OK
| eth1
**Examples:**
* Success code 200:
::
GET URL:
https://octavia-haproxy-img-00328.local/v0.1/interface/10.0.0.1
JSON Response:
{
'message': 'OK',
'interface': 'eth1'
}
* Error code 404:
::
GET URL:
https://octavia-haproxy-img-00328.local/v0.1/interface/10.5.0.1
JSON Response:
{
'message': 'Error interface not found for IP address',
}
* Error code 404:
::
GET URL:
https://octavia-haproxy-img-00328.local/v0.1/interface/10.6.0.1.1
JSON Response:
{
'message': 'Bad IP address version',
}
Get all listeners' statuses
---------------------------
@ -1336,3 +1410,116 @@ not be available for soem time.
}
Upload keepalived configuration
-------------------------------
* **URL:** /*:version*/vrrp/upload
* **Method:** PUT
* **URL params:** none
* **Data params:** none
* **Success Response:**
* Code: 200
* Content: OK
* **Error Response:**
* Code: 500
* Content: Failed to upload keepalived configuration.
* **Response:**
OK
**Examples:**
* Success code 200:
::
PUT URI:
https://octavia-haproxy-img-00328.local/v0.1/vrrp/upload
JSON Response:
{
'message': 'OK'
}
Start, Stop, or Reload keepalived
---------------------------------
* **URL:** /*:version*/vrrp/*:action*
* **Method:** PUT
* **URL params:**
* *:action* = One of: start, stop, reload
* **Data params:** none
* **Success Response:**
* Code: 202
* Content: OK
* **Error Response:**
* Code: 400
* Content: Invalid Request
* Code: 500
* Content: Failed to start / stop / reload keepalived service:
* *(Also contains error output from attempt to start / stop / \
reload keepalived)*
* **Response:**
| OK
| keepalived started
**Examples:**
* Success code 202:
::
PUT URL:
https://octavia-haproxy-img-00328.local/v0.1/vrrp/start
JSON Response:
{
'message': 'OK',
'details': 'keepalived started',
}
* Error code: 400
::
PUT URL:
https://octavia-haproxy-img-00328.local/v0.1/vrrp/BAD_TEST_DATA
JSON Response:
{
'message': 'Invalid Request',
'details': 'Unknown action: BAD_TEST_DATA',
}
* Error code: 500
::
PUT URL:
https://octavia-haproxy-img-00328.local/v0.1/vrrp/stop
JSON Response:
{
'message': 'Failed to stop keepalived service: keeepalived process with PID 3352 not found',
'details': 'keeepalived process with PID 3352 not found',
}

View File

@ -1,2 +1,2 @@
# This is temporary until we have a pip package
amphora-agent git /opt/amphora-agent https://review.openstack.org/openstack/octavia
amphora-agent git /opt/amphora-agent https://review.openstack.org/openstack/octavia refs/changes/52/206252/52

View File

@ -1,9 +1,11 @@
#!/bin/bash
set -eux
#Checks out keepalived version 1.2.13, compiles and installs binaries.
# install keepalived dependances.
apt-get --assume-yes install `apt-cache depends keepalived | awk '/Depends:/{print$2}'`
# Checks out keepalived version 1.2.19, compiles and installs binaries.
cd /opt/vrrp-octavia/
git checkout v1.2.13
git checkout v1.2.19
./configure
make
make install

View File

@ -1,2 +1,3 @@
# Clone source for keepalived version 1.2.13. Correct version is in the installation script
# Clone source for keepalived version 1.2.19. Correct version is in
# the installation script
vrrp-octavia git /opt/vrrp-octavia https://github.com/acassen/keepalived

View File

@ -107,6 +107,10 @@
# Change for production to a ram drive
# haproxy_cert_dir = /tmp
# Maximum number of entries that can fit in the stick table.
# The size supports "k", "m", "g" suffixes.
# haproxy_stick_table_size = 10k
[controller_worker]
# amp_active_retries = 10
# amp_active_wait_sec = 10
@ -138,6 +142,9 @@
# Certificate Generator options are local_cert_generator
# barbican_cert_generator
# cert_generator = local_cert_generator
#
# Load balancer topology options are SINGLE, ACTIVE_STANDBY
# loadbalancer_topology = SINGLE
[task_flow]
# engine = serial
@ -175,3 +182,16 @@
# agent_server_cert = /etc/octavia/certs/server.pem
# agent_server_network_dir = /etc/network/interfaces.d/
# agent_server_network_file =
[keepalived_vrrp]
# Amphora Role/Priority advertisement interval in seconds
# vrrp_advert_int = 1
# Service health check interval and success/fail count
# vrrp_check_interval = 5
# vrpp_fail_count = 2
# vrrp_success_count = 2
# Amphora MASTER gratuitous ARP refresh settings
# vrrp_garp_refresh_interval = 5
# vrrp_garp_refresh_count = 2

View File

@ -19,7 +19,9 @@ import socket
import subprocess
import flask
import ipaddress
import netifaces
import six
from octavia.amphorae.backends.agent import api_server
from octavia.amphorae.backends.agent.api_server import util
@ -138,3 +140,27 @@ def _get_networks():
network_tx=_get_network_bytes(interface, 'tx'),
network_rx=_get_network_bytes(interface, 'rx'))
return networks
def get_interface(ip_addr):
if six.PY2:
ip_version = ipaddress.ip_address(unicode(ip_addr)).version
else:
ip_version = ipaddress.ip_address(ip_addr).version
if ip_version == 4:
address_format = netifaces.AF_INET
elif ip_version == 6:
address_format = netifaces.AF_INET6
else:
return flask.make_response(
flask.jsonify(dict(message="Bad IP address version")), 400)
for interface in netifaces.interfaces():
for i in netifaces.ifaddresses(interface)[address_format]:
if i['addr'] == ip_addr:
return flask.make_response(
flask.jsonify(dict(message='OK', interface=interface)),
200)
return flask.make_response(
flask.jsonify(dict(message="Error interface not found "
"for IP address")), 404)

View File

@ -0,0 +1,115 @@
# Copyright 2015 Hewlett Packard Enterprise Development Company LP
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import os
import subprocess
import flask
import jinja2
from octavia.amphorae.backends.agent.api_server import listener
from octavia.amphorae.backends.agent.api_server import util
from octavia.common import constants as consts
BUFFER = 100
LOG = logging.getLogger(__name__)
j2_env = jinja2.Environment(loader=jinja2.FileSystemLoader(
os.path.dirname(os.path.realpath(__file__)) + consts.AGENT_API_TEMPLATES))
template = j2_env.get_template(consts.KEEPALIVED_CONF)
check_script_template = j2_env.get_template(consts.CHECK_SCRIPT_CONF)
def upload_keepalived_config():
stream = listener.Wrapped(flask.request.stream)
if not os.path.exists(util.keepalived_dir()):
os.makedirs(util.keepalived_dir())
os.makedirs(util.keepalived_check_scripts_dir())
conf_file = util.keepalived_cfg_path()
with open(conf_file, 'w') as f:
b = stream.read(BUFFER)
while b:
f.write(b)
b = stream.read(BUFFER)
if not os.path.exists(util.keepalived_init_path()):
with open(util.keepalived_init_path(), 'w') as text_file:
text = template.render(
keepalived_pid=util.keepalived_pid_path(),
keepalived_cmd=consts.KEEPALIVED_CMD,
keepalived_cfg=util.keepalived_cfg_path(),
keepalived_log=util.keepalived_log_path()
)
text_file.write(text)
cmd = "chmod +x {file}".format(file=util.keepalived_init_path())
try:
subprocess.check_output(cmd.split(), stderr=subprocess.STDOUT)
except subprocess.CalledProcessError as e:
LOG.debug("Failed to upload keepalived configuration. "
"Unable to chmod init script.")
return flask.make_response(flask.jsonify(dict(
message="Failed to upload keepalived configuration. "
"Unable to chmod init script.",
details=e.output)), 500)
# Renders the Keepalived check script
with open(util.keepalived_check_script_path(), 'w') as text_file:
text = check_script_template.render(
check_scripts_dir=util.keepalived_check_scripts_dir()
)
text_file.write(text)
cmd = ("chmod +x {file}".format(
file=util.keepalived_check_script_path()))
try:
subprocess.check_output(cmd.split(), stderr=subprocess.STDOUT)
except subprocess.CalledProcessError as e:
LOG.debug("Failed to upload keepalived configuration. "
"Unable to chmod check script.")
return flask.make_response(flask.jsonify(dict(
message="Failed to upload keepalived configuration. "
"Unable to chmod check script.",
details=e.output)), 500)
res = flask.make_response(flask.jsonify({
'message': 'OK'}), 200)
res.headers['ETag'] = stream.get_md5()
return res
def manager_keepalived_service(action):
action = action.lower()
if action not in ['start', 'stop', 'reload']:
return flask.make_response(flask.jsonify(dict(
message='Invalid Request',
details="Unknown action: {0}".format(action))), 400)
cmd = ("/usr/sbin/service octavia-keepalived {action}".format(
action=action))
try:
subprocess.check_output(cmd.split(), stderr=subprocess.STDOUT)
except subprocess.CalledProcessError as e:
LOG.debug("Failed to {0} keepalived service: {1}".format(action, e))
return flask.make_response(flask.jsonify(dict(
message="Failed to {0} keepalived service".format(action),
details=e.output)), 500)
return flask.make_response(flask.jsonify(
dict(message='OK',
details='keepalived {action}ed'.format(action=action))), 202)

View File

@ -28,6 +28,7 @@ from werkzeug import exceptions
from octavia.amphorae.backends.agent.api_server import util
from octavia.amphorae.backends.utils import haproxy_query as query
from octavia.common import constants as consts
from octavia.common import utils as octavia_utils
LOG = logging.getLogger(__name__)
BUFFER = 100
@ -78,13 +79,16 @@ def get_haproxy_config(listener_id):
"""Upload the haproxy config
:param amphora_id: The id of the amphora to update
:param listener_id: The id of the listener
"""
def upload_haproxy_config(listener_id):
def upload_haproxy_config(amphora_id, listener_id):
stream = Wrapped(flask.request.stream)
# We have to hash here because HAProxy has a string length limitation
# in the configuration file "peer <peername>" lines
peer_name = octavia_utils.base64_sha1_string(amphora_id).rstrip('=')
if not os.path.exists(util.haproxy_dir(listener_id)):
os.makedirs(util.haproxy_dir(listener_id))
@ -96,7 +100,8 @@ def upload_haproxy_config(listener_id):
b = stream.read(BUFFER)
# use haproxy to check the config
cmd = "haproxy -c -f {config_file}".format(config_file=name)
cmd = "haproxy -c -L {peer} -f {config_file}".format(config_file=name,
peer=peer_name)
try:
subprocess.check_output(cmd.split(), stderr=subprocess.STDOUT)
@ -113,6 +118,7 @@ def upload_haproxy_config(listener_id):
if not os.path.exists(util.upstart_path(listener_id)):
with open(util.upstart_path(listener_id), 'w') as text_file:
text = template.render(
peer_name=peer_name,
haproxy_pid=util.pid_path(listener_id),
haproxy_cmd=util.CONF.haproxy_amphora.haproxy_cmd,
haproxy_cfg=util.config_path(listener_id),
@ -136,17 +142,24 @@ def start_stop_listener(listener_id, action):
_check_listener_exists(listener_id)
# Since this script should be created at LB create time
# we can check for this path to see if VRRP is enabled
# on this amphora and not write the file if VRRP is not in use
if os.path.exists(util.keepalived_check_script_path()):
vrrp_check_script_update(listener_id, action)
cmd = ("/usr/sbin/service haproxy-{listener_id} {action}".format(
listener_id=listener_id, action=action))
try:
subprocess.check_output(cmd.split(), stderr=subprocess.STDOUT)
except subprocess.CalledProcessError as e:
LOG.debug("Failed to %(action)s HAProxy service: %(err)s",
{'action': action, 'err': e})
return flask.make_response(flask.jsonify(dict(
message="Error {0}ing haproxy".format(action),
details=e.output)), 500)
if 'Job is already running' not in e.output:
LOG.debug("Failed to %(action)s HAProxy service: %(err)s",
{'action': action, 'err': e})
return flask.make_response(flask.jsonify(dict(
message="Error {0}ing haproxy".format(action),
details=e.output)), 500)
if action in ['stop', 'reload']:
return flask.make_response(flask.jsonify(
dict(message='OK',
@ -369,3 +382,20 @@ def _cert_dir(listener_id):
def _cert_file_path(listener_id, filename):
return os.path.join(_cert_dir(listener_id), filename)
def vrrp_check_script_update(listener_id, action):
listener_ids = util.get_listeners()
if action == 'stop':
listener_ids.remove(listener_id)
args = []
for listener_id in listener_ids:
args.append(util.haproxy_sock_path(listener_id))
if not os.path.exists(util.keepalived_dir()):
os.makedirs(util.keepalived_dir())
os.makedirs(util.keepalived_check_scripts_dir())
cmd = 'haproxy-vrrp-check {args}; exit $?'.format(args=' '.join(args))
with open(util.haproxy_check_script_path(), 'w') as text_file:
text_file.write(cmd)

View File

@ -21,6 +21,7 @@ from werkzeug import exceptions
from octavia.amphorae.backends.agent import api_server
from octavia.amphorae.backends.agent.api_server import amphora_info
from octavia.amphorae.backends.agent.api_server import certificate_update
from octavia.amphorae.backends.agent.api_server import keepalived
from octavia.amphorae.backends.agent.api_server import listener
from octavia.amphorae.backends.agent.api_server import plug
@ -42,12 +43,11 @@ for code in six.iterkeys(exceptions.default_exceptions):
app.error_handler_spec[None][code] = make_json_error
# Tested with curl -k -XPUT --data-binary @/tmp/test.txt
# https://127.0.0.1:9443/0.5/listeners/123/haproxy
@app.route('/' + api_server.VERSION + '/listeners/<listener_id>/haproxy',
@app.route('/' + api_server.VERSION +
'/listeners/<amphora_id>/<listener_id>/haproxy',
methods=['PUT'])
def upload_haproxy_config(listener_id):
return listener.upload_haproxy_config(listener_id)
def upload_haproxy_config(amphora_id, listener_id):
return listener.upload_haproxy_config(amphora_id, listener_id)
@app.route('/' + api_server.VERSION + '/listeners/<listener_id>/haproxy',
@ -142,3 +142,18 @@ def plug_network():
@app.route('/' + api_server.VERSION + '/certificate', methods=['PUT'])
def upload_cert():
return certificate_update.upload_server_cert()
@app.route('/' + api_server.VERSION + '/vrrp/upload', methods=['PUT'])
def upload_vrrp_config():
return keepalived.upload_keepalived_config()
@app.route('/' + api_server.VERSION + '/vrrp/<action>', methods=['PUT'])
def manage_service_vrrp(action):
return keepalived.manager_keepalived_service(action)
@app.route('/' + api_server.VERSION + '/interface/<ip_addr>', methods=['GET'])
def get_interface(ip_addr):
return amphora_info.get_interface(ip_addr)

View File

@ -23,6 +23,7 @@ start on startup
env PID_PATH={{ haproxy_pid }}
env BIN_PATH={{ haproxy_cmd }}
env CONF_PATH={{ haproxy_cfg }}
env PEER_NAME={{ peer_name }}
respawn
respawn limit {{ respawn_count }} {{respawn_interval}}
@ -34,9 +35,10 @@ end script
script
exec /bin/bash <<EOF
echo \$(date) Starting HAProxy
$BIN_PATH -f $CONF_PATH -D -p $PID_PATH
# The -L trick fixes the HAProxy limitation to have long peer names
$BIN_PATH -f $CONF_PATH -L $PEER_NAME -D -p $PID_PATH
trap "$BIN_PATH -f $CONF_PATH -p $PID_PATH -sf \\\$(cat $PID_PATH)" SIGHUP
trap "$BIN_PATH -f $CONF_PATH -L $PEER_NAME -p $PID_PATH -sf \\\$(cat $PID_PATH)" SIGHUP
trap "kill -TERM \\\$(cat $PID_PATH) && rm $PID_PATH;echo \\\$(date) Exiting HAProxy; exit 0" SIGTERM SIGINT
while true; do # Iterate to keep job running.

View File

@ -0,0 +1,67 @@
{#
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
#}
#!/bin/sh
RETVAL=0
prog="octavia-keepalived"
start() {
echo -n $"Starting $prog"
{{ keepalived_cmd }} -D -d -f {{ keepalived_cfg }}
RETVAL=$?
echo
[ $RETVAL -eq 0 ] && touch {{ keepalived_pid }}
}
stop() {
echo -n $"Stopping $prog"
kill -9 `pidof keepalived`
RETVAL=$?
echo
[ $RETVAL -eq 0 ] && rm -f {{ keepalived_pid }}
}
status() {
kill -0 `pidof keepalived`
RETVAL=$?
[ $RETVAL -eq 0 ] && echo -n $"$prog is running"
[ $RETVAL -eq 1 ] && echo -n $"$prog is not found"
echo
}
# See how we were called.
case "$1" in
start)
start
;;
stop)
stop
;;
reload)
stop
start
;;
status)
status
;;
*)
echo "Usage: $0 {start|stop|reload|status}"
RETVAL=1
esac
exit $RETVAL

View File

@ -0,0 +1,26 @@
{#
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
#}
#!/bin/sh
status=0
for file in {{ check_scripts_dir }}/*
do
echo "Running check script: " $file
sh $file
status=$(( $status + $? ))
done
exit $status

View File

@ -21,6 +21,7 @@ CONF = cfg.CONF
CONF.import_group('amphora_agent', 'octavia.common.config')
CONF.import_group('haproxy_amphora', 'octavia.common.config')
UPSTART_DIR = '/etc/init'
KEEPALIVED_INIT_DIR = '/etc/init.d'
def upstart_path(listener_id):
@ -44,6 +45,48 @@ def get_haproxy_pid(listener_id):
return f.readline().rstrip()
def haproxy_sock_path(listener_id):
return os.path.join(CONF.haproxy_amphora.base_path, listener_id + '.sock')
def haproxy_check_script_path():
return os.path.join(keepalived_check_scripts_dir(),
'haproxy_check_script.sh')
def keepalived_dir():
return os.path.join(CONF.haproxy_amphora.base_path, 'vrrp')
def keepalived_init_path():
return os.path.join(KEEPALIVED_INIT_DIR, 'octavia-keepalived')
def keepalived_pid_path():
return os.path.join(CONF.haproxy_amphora.base_path,
'vrrp/octavia-keepalived.pid')
def keepalived_cfg_path():
return os.path.join(CONF.haproxy_amphora.base_path,
'vrrp/octavia-keepalived.conf')
def keepalived_log_path():
return os.path.join(CONF.haproxy_amphora.base_path,
'vrrp/octavia-keepalived.log')
def keepalived_check_scripts_dir():
return os.path.join(CONF.haproxy_amphora.base_path,
'vrrp/check_scripts')
def keepalived_check_script_path():
return os.path.join(CONF.haproxy_amphora.base_path,
'vrrp/check_script.sh')
"""Get Listeners
:returns An array with the ids of all listeners, e.g. ['123', '456', ...]

View File

@ -1,4 +1,5 @@
# Copyright 2011-2014 OpenStack Foundation,author: Min Wang,German Eichberger
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@ -242,3 +243,44 @@ class StatsMixin(object):
awesome update code and code to send to ceilometer
"""
pass
@six.add_metaclass(abc.ABCMeta)
class VRRPDriverMixin(object):
"""Abstract mixin class for VRRP support in loadbalancer amphorae
Usage: To plug VRRP support in another service driver XYZ, use:
@plug_mixin(XYZ)
class XYZ: ...
"""
@abc.abstractmethod
def update_vrrp_conf(self, loadbalancer):
"""Update amphorae of the loadbalancer with a new VRRP configuration
:param loadbalancer: loadbalancer object
"""
pass
@abc.abstractmethod
def stop_vrrp_service(self, loadbalancer):
"""Stop the vrrp services running on the loadbalancer's amphorae
:param loadbalancer: loadbalancer object
"""
pass
@abc.abstractmethod
def start_vrrp_service(self, loadbalancer):
"""Start the VRRP services of all amphorae of the loadbalancer
:param loadbalancer: loadbalancer object
"""
pass
@abc.abstractmethod
def reload_vrrp_service(self, loadbalancer):
"""Reload the VRRP services of all amphorae of the loadbalancer
:param loadbalancer: loadbalancer object
"""
pass

View File

@ -19,8 +19,9 @@ from webob import exc
def check_exception(response):
status_code = response.status_code
responses = {
400: InvalidRequest,
401: Unauthorized,
403: InvalidRequest,
403: Forbidden,
404: NotFound,
405: InvalidRequest,
409: Conflict,
@ -42,13 +43,18 @@ class APIException(exc.HTTPClientError):
super(APIException, self).__init__(detail=self.msg)
class InvalidRequest(APIException):
msg = "Invalid request"
code = 400
class Unauthorized(APIException):
msg = "Unauthorized"
code = 401
class InvalidRequest(APIException):
msg = "Invalid request"
class Forbidden(APIException):
msg = "Forbidden"
code = 403
@ -69,4 +75,4 @@ class InternalServerError(APIException):
class ServiceUnavailable(APIException):
msg = "Service Unavailable"
code = 503
code = 503

View File

@ -17,7 +17,9 @@ import os
import jinja2
import six
from octavia.common.config import cfg
from octavia.common import constants
from octavia.common import utils as octavia_utils
PROTOCOL_MAP = {
constants.PROTOCOL_TCP: 'tcp',
@ -42,6 +44,9 @@ HAPROXY_TEMPLATE = os.path.abspath(
os.path.join(os.path.dirname(__file__),
'templates/haproxy_listener.template'))
CONF = cfg.CONF
CONF.import_group('haproxy_amphora', 'octavia.common.config')
JINJA_ENV = None
@ -104,6 +109,7 @@ class JinjaTemplater(object):
loader=template_loader,
trim_blocks=True,
lstrip_blocks=True)
JINJA_ENV.filters['hash_amp_id'] = octavia_utils.base64_sha1_string
return JINJA_ENV.get_template(os.path.basename(self.haproxy_template))
def render_loadbalancer_obj(self, listener,
@ -144,7 +150,8 @@ class JinjaTemplater(object):
return {
'name': loadbalancer.name,
'vip_address': loadbalancer.vip.ip_address,
'listener': listener
'listener': listener,
'topology': loadbalancer.topology
}
def _transform_listener(self, listener, tls_cert):
@ -156,7 +163,10 @@ class JinjaTemplater(object):
'id': listener.id,
'protocol_port': listener.protocol_port,
'protocol_mode': PROTOCOL_MAP[listener.protocol],
'protocol': listener.protocol
'protocol': listener.protocol,
'peer_port': listener.peer_port,
'topology': listener.load_balancer.topology,
'amphorae': listener.load_balancer.amphorae
}
if listener.connection_limit and listener.connection_limit > -1:
ret_value['connection_limit'] = listener.connection_limit
@ -185,7 +195,8 @@ class JinjaTemplater(object):
'health_monitor': '',
'session_persistence': '',
'enabled': pool.enabled,
'operating_status': pool.operating_status
'operating_status': pool.operating_status,
'stick_size': CONF.haproxy_amphora.haproxy_stick_size
}
members = [self._transform_member(x) for x in pool.members]
ret_value['members'] = members

View File

@ -30,4 +30,6 @@ defaults
timeout client {{ timeout_client | default('50000', true) }}
timeout server {{ timeout_server | default('50000', true) }}
{% block peers %}{% endblock peers %}
{% block proxies %}{% endblock proxies %}

View File

@ -18,6 +18,11 @@
{% set usergroup = user_group %}
{% set sock_path = stats_sock %}
{% block peers %}
{% from 'haproxy_proxies.template' import peers_macro%}
{{ peers_macro(constants, loadbalancer.listener) }}
{% endblock peers %}
{% block proxies %}
{% from 'haproxy_proxies.template' import frontend_macro as frontend_macro, backend_macro%}
{{ frontend_macro(constants, loadbalancer.listener, loadbalancer.vip_address) }}

View File

@ -18,6 +18,11 @@
{% set usergroup = user_group %}
{% set sock_path = stats_sock %}
{% block peers %}
{% from 'haproxy_proxies.template' import peers_macro%}
{{ peers_macro(constants, loadbalancer.listener) }}
{% endblock peers %}
{% block proxies %}
{% from 'haproxy_proxies.template' import frontend_macro as frontend_macro, backend_macro%}
{% for listener in loadbalancer.listeners %}

View File

@ -15,6 +15,16 @@
#}
{% extends 'haproxy_base.template' %}
{% macro peers_macro(constants,listener) %}
{% if listener.topology == constants.TOPOLOGY_ACTIVE_STANDBY %}
peers {{ "%s_peers"|format(listener.id.replace("-", ""))|trim() }}
{% for amp in listener.amphorae %}
{# HAProxy has peer name limitations, thus the hash filter #}
peer {{ amp.id|hash_amp_id|replace('=', '') }} {{ amp.vrrp_ip }}:{{ listener.peer_port }}
{% endfor %}
{% endif %}
{% endmacro %}
{% macro bind_macro(constants, listener, lb_vip_address) %}
{% if listener.default_tls_path %}
{% set def_crt_opt = "ssl crt %s"|format(listener.default_tls_path)|trim() %}
@ -60,7 +70,11 @@ backend {{ pool.id }}
{% endif %}
{% if pool.session_persistence %}
{% if pool.session_persistence.type == constants.SESSION_PERSISTENCE_SOURCE_IP %}
stick-table type ip size 10k
{% if listener.topology == constants.TOPOLOGY_ACTIVE_STANDBY %}
stick-table type ip size {{ pool.stick_size }} peers {{ "%s_peers"|format(listener.id.replace("-", ""))|trim() }}
{% else %}
stick-table type ip size {{ pool.stick_size }}
{% endif %}
stick on src
{% elif pool.session_persistence.type == constants.SESSION_PERSISTENCE_HTTP_COOKIE %}
cookie SRV insert indirect nocache

View File

@ -26,13 +26,14 @@ from octavia.amphorae.driver_exceptions import exceptions as driver_except
from octavia.amphorae.drivers import driver_base as driver_base
from octavia.amphorae.drivers.haproxy import exceptions as exc
from octavia.amphorae.drivers.haproxy.jinja import jinja_cfg
from octavia.amphorae.drivers.keepalived import vrrp_rest_driver
from octavia.common.config import cfg
from octavia.common import constants
from octavia.common.tls_utils import cert_parser
from octavia.i18n import _LW
LOG = logging.getLogger(__name__)
API_VERSION = '0.5'
API_VERSION = constants.API_VERSION
OCTAVIA_API_CLIENT = (
"Octavia HaProxy Rest Client/{version} "
"(https://wiki.openstack.org/wiki/Octavia)").format(version=API_VERSION)
@ -40,7 +41,10 @@ CONF = cfg.CONF
CONF.import_group('haproxy_amphora', 'octavia.common.config')
class HaproxyAmphoraLoadBalancerDriver(driver_base.AmphoraLoadBalancerDriver):
class HaproxyAmphoraLoadBalancerDriver(
driver_base.AmphoraLoadBalancerDriver,
vrrp_rest_driver.KeepalivedAmphoraDriverMixin):
def __init__(self):
super(HaproxyAmphoraLoadBalancerDriver, self).__init__()
self.client = AmphoraAPIClient()
@ -61,7 +65,6 @@ class HaproxyAmphoraLoadBalancerDriver(driver_base.AmphoraLoadBalancerDriver):
# Process listener certificate info
certs = self._process_tls_certificates(listener)
# Generate HaProxy configuration from listener object
config = self.jinja.build_config(listener, certs['tls_cert'],
certs['sni_certs'])
@ -130,6 +133,9 @@ class HaproxyAmphoraLoadBalancerDriver(driver_base.AmphoraLoadBalancerDriver):
port_info = {'mac_address': port.mac_address}
self.client.plug_network(amphora, port_info)
def get_vrrp_interface(self, amphora):
return self.client.get_interface(amphora, amphora.vrrp_ip)['interface']
def _process_tls_certificates(self, listener):
"""Processes TLS data from the listener.
@ -192,6 +198,10 @@ class AmphoraAPIClient(object):
self.stop_listener = functools.partial(self._action, 'stop')
self.reload_listener = functools.partial(self._action, 'reload')
self.start_vrrp = functools.partial(self._vrrp_action, 'start')
self.stop_vrrp = functools.partial(self._vrrp_action, 'stop')
self.reload_vrrp = functools.partial(self._vrrp_action, 'reload')
self.session = requests.Session()
self.session.cert = CONF.haproxy_amphora.client_cert
self.ssl_adapter = CustomHostNameCheckingAdapter()
@ -207,7 +217,7 @@ class AmphoraAPIClient(object):
LOG.debug("request url %s", path)
_request = getattr(self.session, method.lower())
_url = self._base_url(amp.lb_network_ip) + path
LOG.debug("request url " + _url)
reqargs = {
'verify': CONF.haproxy_amphora.server_ca,
'url': _url, }
@ -232,7 +242,8 @@ class AmphoraAPIClient(object):
def upload_config(self, amp, listener_id, config):
r = self.put(
amp,
'listeners/{listener_id}/haproxy'.format(listener_id=listener_id),
'listeners/{amphora_id}/{listener_id}/haproxy'.format(
amphora_id=amp.id, listener_id=listener_id),
data=config)
return exc.check_exception(r)
@ -304,3 +315,16 @@ class AmphoraAPIClient(object):
'plug/vip/{vip}'.format(vip=vip),
json=net_info)
return exc.check_exception(r)
def upload_vrrp_config(self, amp, config):
r = self.put(amp, 'vrrp/upload', data=config)
return exc.check_exception(r)
def _vrrp_action(self, action, amp):
r = self.put(amp, 'vrrp/{action}'.format(action=action))
return exc.check_exception(r)
def get_interface(self, amp, ip_addr):
r = self.get(amp, 'interface/{ip_addr}'.format(ip_addr=ip_addr))
if exc.check_exception(r):
return r.json()

View File

@ -0,0 +1,94 @@
# Copyright 2015 Hewlett Packard Enterprise Development Company LP
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import jinja2
from oslo_config import cfg
from octavia.amphorae.backends.agent.api_server import util
from octavia.common import constants
KEEPALIVED_TEMPLATE = os.path.abspath(
os.path.join(os.path.dirname(__file__),
'templates/keepalived_base.template'))
CONF = cfg.CONF
CONF.import_group('keepalived_vrrp', 'octavia.common.config')
class KeepalivedJinjaTemplater(object):
def __init__(self, keepalived_template=None):
"""Keepalived configuration generation
:param keepalived_template: Absolute path to keepalived Jinja template
"""
super(KeepalivedJinjaTemplater, self).__init__()
self.keepalived_template = (keepalived_template if
keepalived_template else
KEEPALIVED_TEMPLATE)
self._jinja_env = None
def get_template(self, template_file):
"""Returns the specified Jinja configuration template."""
if not self._jinja_env:
template_loader = jinja2.FileSystemLoader(
searchpath=os.path.dirname(template_file))
self._jinja_env = jinja2.Environment(
loader=template_loader,
trim_blocks=True,
lstrip_blocks=True)
return self._jinja_env.get_template(os.path.basename(template_file))
def build_keepalived_config(self, loadbalancer, amphora):
"""Renders the loadblanacer keepalived configuration for Active/Standby
:param loadbalancer: A lodabalancer object
:param amp: An amphora object
"""
# Note on keepalived configuration: The current base configuration
# enforced Master election whenever a high priority VRRP instance
# start advertising its presence. Accordingly, the fallback behavior
# - which I described in the blueprint - is the default behavior.
# Although this is a stable behavior, this can be undesirable for
# several backend services. To disable the fallback behavior, we need
# to add the "nopreempt" flag in the backup instance section.
peers_ips = []
for amp in loadbalancer.amphorae:
if amp.vrrp_ip != amphora.vrrp_ip:
peers_ips.append(amp.vrrp_ip)
return self.get_template(self.keepalived_template).render(
{'vrrp_group_name': loadbalancer.vrrp_group.vrrp_group_name,
'amp_role': amphora.role,
'amp_intf': amphora.vrrp_interface,
'amp_vrrp_id': amphora.vrrp_id,
'amp_priority': amphora.vrrp_priority,
'vrrp_garp_refresh':
CONF.keepalived_vrrp.vrrp_garp_refresh_interval,
'vrrp_garp_refresh_repeat':
CONF.keepalived_vrrp.vrrp_garp_refresh_count,
'vrrp_auth_type': loadbalancer.vrrp_group.vrrp_auth_type,
'vrrp_auth_pass': loadbalancer.vrrp_group.vrrp_auth_pass,
'amp_vrrp_ip': amphora.vrrp_ip,
'peers_vrrp_ips': peers_ips,
'vip_ip_address': loadbalancer.vip.ip_address,
'advert_int': loadbalancer.vrrp_group.advert_int,
'check_script_path': util.keepalived_check_script_path(),
'vrrp_check_interval':
CONF.keepalived_vrrp.vrrp_check_interval,
'vrrp_fail_count': CONF.keepalived_vrrp.vrrp_fail_count,
'vrrp_success_count':
CONF.keepalived_vrrp.vrrp_success_count},
constants=constants)

View File

@ -0,0 +1,55 @@
{#
# Copyright 2015 Hewlett Packard Enterprise Development Company LP
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#}
{% macro unicast_peer_macro(peers_vrrp_ips) %}
{% for amp_vrrp_ip in peers_vrrp_ips %}
{{ amp_vrrp_ip }}
{% endfor %}
{% endmacro %}
vrrp_script check_script {
script {{ check_script_path }}
interval {{ vrrp_check_interval }}
fall {{ vrrp_fail_count }}
rise {{ vrrp_success_count }}
}
vrrp_instance {{ vrrp_group_name }} {
state {{ amp_role }}
interface {{ amp_intf }}
virtual_router_id {{ amp_vrrp_id }}
priority {{ amp_priority }}
garp_master_refresh {{ vrrp_garp_refresh }}
garp_master_refresh_repeat {{ vrrp_garp_refresh_repeat }}
advert_int {{ advert_int }}
authentication {
auth_type {{ vrrp_auth_type }}
auth_pass {{ vrrp_auth_pass }}
}
unicast_src_ip {{ amp_vrrp_ip }}
unicast_peer {
{{ unicast_peer_macro(peers_vrrp_ips) }}
}
virtual_ipaddress {
{{ vip_ip_address }}
}
track_script {
check_script
}
}

View File

@ -0,0 +1,79 @@
# Copyright 2015 Hewlett Packard Enterprise Development Company LP
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import log as logging
from octavia.amphorae.drivers import driver_base as driver_base
from octavia.amphorae.drivers.keepalived.jinja import jinja_cfg
from octavia.common.config import cfg
from octavia.common import constants
from octavia.i18n import _LI
LOG = logging.getLogger(__name__)
API_VERSION = constants.API_VERSION
CONF = cfg.CONF
class KeepalivedAmphoraDriverMixin(driver_base.VRRPDriverMixin):
def __init__(self):
super(KeepalivedAmphoraDriverMixin, self).__init__()
# The Mixed class must define a self.client object for the
# AmphoraApiClient
def update_vrrp_conf(self, loadbalancer):
"""Update amphorae of the loadbalancer with a new VRRP configuration
:param loadbalancer: loadbalancer object
"""
templater = jinja_cfg.KeepalivedJinjaTemplater()
LOG.debug("Update loadbalancer %s amphora VRRP configuration.",
loadbalancer.id)
for amp in loadbalancer.amphorae:
# Generate Keepalived configuration from loadbalancer object
config = templater.build_keepalived_config(loadbalancer, amp)
self.client.upload_vrrp_config(amp, config)
def stop_vrrp_service(self, loadbalancer):
"""Stop the vrrp services running on the loadbalancer's amphorae
:param loadbalancer: loadbalancer object
"""
LOG.info(_LI("Stop loadbalancer %s amphora VRRP Service."),
loadbalancer.id)
for amp in loadbalancer.amphorae:
self.client.stop_vrrp(amp)
def start_vrrp_service(self, loadbalancer):
"""Start the VRRP services of all amphorae of the loadbalancer
:param loadbalancer: loadbalancer object
"""
LOG.info(_LI("Start loadbalancer %s amphora VRRP Service."),
loadbalancer.id)
for amp in loadbalancer.amphorae:
LOG.debug("Start VRRP Service on amphora %s .", amp.lb_network_ip)
self.client.start_vrrp(amp)
def reload_vrrp_service(self, loadbalancer):
"""Reload the VRRP services of all amphorae of the loadbalancer
:param loadbalancer: loadbalancer object
"""
LOG.info(_LI("Reload loadbalancer %s amphora VRRP Service."),
loadbalancer.id)
for amp in loadbalancer.amphorae:
self.client.reload_vrrp(amp)

View File

@ -0,0 +1,64 @@
# Copyright 2015 Hewlett Packard Enterprise Development Company LP
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import socket
import sys
SOCKET_TIMEOUT = 5
def get_status(sock_address):
"""Query haproxy stat socket
Only VRRP fail over if the stats socket is not responding.
:param sock_address: unix socket file
:return: 0 if haproxy responded
"""
s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
s.settimeout(SOCKET_TIMEOUT)
s.connect(sock_address)
s.send('show stat -1 -1 -1\n')
data = ''
while True:
x = s.recv(1024)
if not x:
break
data += x
s.close()
return 0
def health_check(sock_addresses):
"""Invoke queries for all defined listeners
:param sock_addresses:
:return:
"""
status = 0
for address in sock_addresses:
status += get_status(address)
return status
def main():
# usage python haproxy_vrrp_check.py <list_of_stat_sockets>
# Note: for performance, this script loads minimal number of module.
# Loading octavia modules or any other complex construct MUST be avoided.
listeners_sockets = sys.argv[1:]
try:
status = health_check(listeners_sockets)
except Exception:
sys.exit(1)
sys.exit(status)

View File

@ -22,7 +22,7 @@ from oslo_db import options as db_options
from oslo_log import log as logging
import oslo_messaging as messaging
from octavia.common import constants
from octavia.common import utils
from octavia.i18n import _LI
from octavia import version
@ -178,6 +178,9 @@ haproxy_amphora_opts = [
cfg.IntOpt('connection_retry_interval',
default=5,
help=_('Retry timeout between attempts in seconds.')),
cfg.StrOpt('haproxy_stick_size', default='10k',
help=_('Size of the HAProxy stick table. Accepts k, m, g '
'suffixes. Example: 10k')),
# REST server
cfg.IPOpt('bind_host', default='0.0.0.0',
@ -239,7 +242,13 @@ controller_worker_opts = [
help=_('Name of the network driver to use')),
cfg.StrOpt('cert_generator',