Implement UDP heartbeat sender and receiver

Used binary compressed encoding of json dumped object. To reduce
the size needed to send heart beats incase some stats objects
start getting sent later on. Also used sha256 instead of sha1
with hmac.

Co-Authored-By: Michael Johnson <johnsomor@gmail.com>
Co-Authored-By: German Eichberger <german.eichbeger@hp.com>
Co-Authored-By: Carlos Garza <carlos.garza@rackspace.com>
Partially implements: health-manager
Change-Id: I932c693101b94c9132e1741291610508876eab43
This commit is contained in:
Carlos D. Garza 2015-07-14 20:12:42 -05:00 committed by Carlos Garza
parent f849f55e5e
commit ccd7865350
52 changed files with 1988 additions and 519 deletions

View File

@ -13,7 +13,7 @@
#
#Example for client use:
#
#curl -k -v --key client.key --cacert ca_01.pem --cert client.pem https://0.0.0.0:8443/
#curl -k -v --key client.key --cacert ca_01.pem --cert client.pem https://0.0.0.0:9443/
#
#
#Notes:

View File

@ -67,6 +67,8 @@ function octavia_configure {
iniset $OCTAVIA_CONF controller_worker compute_driver compute_nova_driver
iniset $OCTAVIA_CONF controller_worker network_driver allowed_address_pairs_driver
iniset $OCTAVIA_CONF health_manager heartbeat_key ${OCTAVIA_HEALTH_KEY}
iniset $OCTAVIA_CONF DEFAULT api_handler queue_producer
iniset $OCTAVIA_CONF oslo_messaging_rabbit rabbit_port 5672
@ -118,7 +120,7 @@ function build_mgmt_network {
neutron security-group-create lb-mgmt-sec-grp
neutron security-group-rule-create --protocol icmp lb-mgmt-sec-grp
neutron security-group-rule-create --protocol tcp --port-range-min 22 --port-range-max 22 lb-mgmt-sec-grp
neutron security-group-rule-create --protocol tcp --port-range-min 8443 --port-range-max 8443 lb-mgmt-sec-grp
neutron security-group-rule-create --protocol tcp --port-range-min 9443 --port-range-max 9443 lb-mgmt-sec-grp
OCTAVIA_MGMT_SEC_GRP_ID=$(nova secgroup-list | awk ' / lb-mgmt-sec-grp / {print $2}')
iniset ${OCTAVIA_CONF} controller_worker amp_secgroup_list ${OCTAVIA_MGMT_SEC_GRP_ID}

View File

@ -34,6 +34,8 @@ OCTAVIA_AMP_ACTIVE_RETRIES=${OCTAVIA_AMP_ACTIVE_RETRIES:-"500"}
OCTAVIA_AMP_IMAGE_NAME=${OCTAVIA_AMP_IMAGE_NAME:-"amphora-x64-haproxy"}
OCTAVIA_AMP_IMAGE_FILE=${OCTAVIA_AMP_IMAGE_FILE:-${OCTAVIA_DIR}/diskimage-create/${OCTAVIA_AMP_IMAGE_NAME}.qcow2}
OCTAVIA_HEALTH_KEY=${OCTAVIA_HEALTH_KEY:-"insecure"}
OCTAVIA_API_BINARY=${OCTAVIA_API_BINARY:-${OCTAVIA_BIN_DIR}/octavia-api}
OCTAVIA_CONSUMER_BINARY=${OCTAVIA_CONSUMER_BINARY:-${OCTAVIA_BIN_DIR}/octavia-worker}
OCTAVIA_HOUSEKEEPER_BINARY=${OCTAVIA_HOUSEKEEPER_BINARY:-${OCTAVIA_BIN_DIR}/octavia-housekeeping}

View File

@ -5,9 +5,8 @@ install-packages libffi-dev libssl-dev
cd /opt/amphora-agent/
pip install -r requirements.txt
python setup.py install
cp etc/init/octavia-agent.conf /etc/init/
cp etc/init/amphora-agent.conf /etc/init/
mkdir /etc/octavia
cp etc/octavia.conf /etc/octavia
# we assume certs, etc will come in through the config drive
mkdir /etc/octavia/certs
mkdir /var/lib/octavia

View File

@ -5,4 +5,4 @@ start on startup
respawn
respawn limit 2 2
exec amphora-agent --config-file /etc/octavia/octavia.conf
exec amphora-agent --config-file /etc/octavia/amphora-agent.conf

View File

@ -34,9 +34,17 @@
# configuration file.
[health_manager]
# bind_ip = 0.0.0.0
# bind_port = 5555
# controller_ip_port_list example: 127.0.0.1:5555, 127.0.0.1:5555
# controller_ip_port_list =
# failover_threads = 10
# interval = 3
# heartbeat_timeout = 10
# status_update_threads = 50
# heartbeat_interval = 10
# heartbeat_key =
# heartbeat_timeout = 60
# health_check_interval = 3
# sock_rlimit = 0
[keystone_authtoken]
# auth_uri = https://localhost:5000/v3
@ -89,8 +97,6 @@
# respawn_interval = 2
# Change for production to a ram drive
# haproxy_cert_dir = /tmp
# agent_server_cert = /etc/octavia/certs/server.pem
# agent_server_ca = /etc/octavia/certs/client_ca.pem
[controller_worker]
# amp_active_retries = 10

View File

@ -0,0 +1,57 @@
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import jinja2
from octavia.common.config import cfg
from octavia.common import constants
CONF = cfg.CONF
CONF.import_group('amphora_agent', 'octavia.common.config')
CONF.import_group('haproxy_amphora', 'octavia.common.config')
CONF.import_group('health_manager', 'octavia.common.config')
TEMPLATES_DIR = (os.path.dirname(os.path.realpath(__file__)) +
constants.AGENT_API_TEMPLATES + '/')
class AgentJinjaTemplater(object):
def __init__(self):
template_loader = jinja2.FileSystemLoader(searchpath=os.path.dirname(
TEMPLATES_DIR))
jinja_env = jinja2.Environment(loader=template_loader)
self.agent_template = jinja_env.get_template(
constants.AGENT_CONF_TEMPLATE)
def build_agent_config(self, amphora_id):
return self.agent_template.render(
{'agent_server_ca': CONF.amphora_agent.agent_server_ca,
'agent_server_cert': CONF.amphora_agent.agent_server_cert,
'agent_server_network_dir':
CONF.amphora_agent.agent_server_network_dir,
'amphora_id': amphora_id,
'base_cert_dir': CONF.haproxy_amphora.base_cert_dir,
'base_path': CONF.haproxy_amphora.base_path,
'bind_host': CONF.haproxy_amphora.bind_host,
'bind_port': CONF.haproxy_amphora.bind_port,
'controller_list': CONF.health_manager.controller_ip_port_list,
'debug': CONF.debug,
'haproxy_cmd': CONF.haproxy_amphora.haproxy_cmd,
'heartbeat_interval': CONF.health_manager.heartbeat_interval,
'heartbeat_key': CONF.health_manager.heartbeat_key,
'respawn_count': CONF.haproxy_amphora.respawn_count,
'respawn_interval': CONF.haproxy_amphora.respawn_interval})

View File

@ -22,12 +22,12 @@ MODE_OWNER = 0o600
BUFFER = 1024
CONF = cfg.CONF
CONF.import_group('haproxy_amphora', 'octavia.common.config')
CONF.import_group('amphora_agent', 'octavia.common.config')
def upload_server_cert():
stream = flask.request.stream
with open(CONF.haproxy_amphora.agent_server_cert, 'w') as crt_file:
with open(CONF.amphora_agent.agent_server_cert, 'w') as crt_file:
b = stream.read(BUFFER)
while b:
crt_file.write(b)

View File

@ -43,7 +43,7 @@ for code in six.iterkeys(exceptions.default_exceptions):
# Tested with curl -k -XPUT --data-binary @/tmp/test.txt
# https://127.0.0.1:8443/0.5/listeners/123/haproxy
# https://127.0.0.1:9443/0.5/listeners/123/haproxy
@app.route('/' + api_server.VERSION + '/listeners/<listener_id>/haproxy',
methods=['PUT'])
def upload_haproxy_config(listener_id):

View File

@ -18,6 +18,7 @@ import os
from oslo_config import cfg
CONF = cfg.CONF
CONF.import_group('amphora_agent', 'octavia.common.config')
CONF.import_group('haproxy_amphora', 'octavia.common.config')
UPSTART_DIR = '/etc/init'
@ -31,7 +32,7 @@ def haproxy_dir(listener_id):
def pid_path(listener_id):
return os.path.join(haproxy_dir(listener_id), 'haproxy.pid')
return os.path.join(haproxy_dir(listener_id), listener_id + '.pid')
def config_path(listener_id):
@ -63,5 +64,5 @@ def is_listener_running(listener_id):
def get_network_interface_file(interface):
return os.path.join(CONF.haproxy_amphora.agent_server_network_dir,
return os.path.join(CONF.amphora_agent.agent_server_network_dir,
interface + '.cfg')

View File

@ -0,0 +1,36 @@
{# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#}
[DEFAULT]
debug = {{ debug }}
[haproxy_amphora]
base_cert_dir = {{ base_cert_dir }}
base_path = {{ base_path }}
bind_host = {{ bind_host }}
bind_port = {{ bind_port }}
haproxy_cmd = {{ haproxy_cmd }}
respawn_count = {{ respawn_count }}
respawn_interval = {{ respawn_interval }}
[health_manager]
controller_ip_port_list = {{ controller_list|join(', ') }}
heartbeat_interval = {{ heartbeat_interval }}
heartbeat_key = {{ heartbeat_key }}
[amphora_agent]
agent_server_ca = {{ agent_server_ca }}
agent_server_cert = {{ agent_server_cert }}
agent_server_network_dir = {{ agent_server_network_dir }}
amphora_id = {{ amphora_id }}

View File

@ -1,76 +0,0 @@
# Copyright 2014 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
import json
import octavia.amphorae.backends.health_daemon.singleton as singleton
@singleton.singleton
class JSONFileConfig(collections.Mapping):
def __init__(self):
self.filename = None
self.conf = {}
self.observers = set()
""" Set the config filename and perform the first read
:param filename: a JSON file that contains the config
"""
def set_filename(self, filename):
self.filename = filename
self.read_config()
def __iter__(self):
return iter(self.conf)
def __getitem__(self, k):
return self.conf[k]
def __len__(self):
return len(self.conf)
""" Add a callable to be notified of config changes
:param obs: a callable to receive change events
"""
def add_observer(self, obs):
self.observers.add(obs)
""" Remove a callable to be notified of config changes
By design if the callable passed doesn't exist then just return
:param obs: a callable to attempt to remove
"""
def remove_observer(self, obs):
self.observers.discard(obs)
""" Force a reread of the config file and inform all observers
"""
def check_update(self):
self.read_config()
self.confirm_update()
def confirm_update(self):
for observer in self.observers:
observer()
def read_config(self):
if self.filename is None:
return
self.cfile = open(self.filename, 'r')
self.conf = json.load(self.cfile)

121
octavia/amphorae/backends/health_daemon/health_daemon.py Executable file → Normal file
View File

@ -14,52 +14,93 @@
# License for the specific language governing permissions and limitations
# under the License.
import argparse
import sys
import os
import time
import config
import health_sender
from oslo_config import cfg
from oslo_log import log as logging
import six
from octavia.amphorae.backends.agent.api_server import util
from octavia.amphorae.backends.health_daemon import health_sender
from octavia.amphorae.backends.utils import haproxy_query
from octavia.i18n import _LI
if six.PY2:
import Queue as queue
else:
import queue
CONF = cfg.CONF
CONF.import_group('amphora_agent', 'octavia.common.config')
CONF.import_group('haproxy_amphora', 'octavia.common.config')
CONF.import_group('health_manager', 'octavia.common.config')
LOG = logging.getLogger(__name__)
SEQ = 0
def run_sender():
def list_sock_stat_files(hadir=None):
stat_sock_files = {}
if hadir is None:
hadir = CONF.haproxy_amphora.base_path
listener_ids = util.get_listeners()
for listener_id in listener_ids:
sock_file = listener_id + ".sock"
stat_sock_files[listener_id] = os.path.join(hadir, sock_file)
return stat_sock_files
def run_sender(cmd_queue):
LOG.info(_LI('Health Manager Sender starting.'))
sender = health_sender.UDPStatusSender()
cfg = config.JSONFileConfig()
sighup_received = False
seq = 0
while True:
if sighup_received:
print('re-reading config file')
sighup_received = False
cfg.check_update()
message = {'not the answer': 43,
'id': cfg['id'],
'seq': seq}
seq = seq + 1
message = build_stats_message()
sender.dosend(message)
time.sleep(cfg['delay'])
def parse_args():
parser = argparse.ArgumentParser(description='Health Sender Daemon')
parser.add_argument('-c', '--config', type=str, required=False,
help='config file path',
default='/etc/amphora/status_sender.json')
args = parser.parse_args()
return vars(args)
if __name__ == '__main__':
args = parse_args()
cfg = config.JSONFileConfig()
try:
cfg.set_filename(args['config'])
except IOError as exception:
print(exception)
sys.exit(1)
cmd = cmd_queue.get_nowait()
if cmd is 'reload':
LOG.info(_LI('Reloading configuration'))
CONF.reload_config_files()
elif cmd is 'shutdown':
LOG.info(_LI('Health Manager Sender shutting down.'))
break
except queue.Empty:
pass
time.sleep(CONF.health_manager.heartbeat_interval)
# Now start up the sender loop
run_sender()
sys.exit(0)
def get_stats(stat_sock_file):
stats_query = haproxy_query.HAProxyQuery(stat_sock_file)
stats = stats_query.show_stat()
pool_status = stats_query.get_pool_status()
return stats, pool_status
def build_stats_message():
global SEQ
msg = {'id': CONF.amphora_agent.amphora_id,
'seq': SEQ, "listeners": {}}
SEQ += 1
stat_sock_files = list_sock_stat_files()
for listener_id, stat_sock_file in six.iteritems(stat_sock_files):
listener_dict = {'pools': {}, 'status': 'DOWN',
'stats': {'tx': 0, 'rx': 0,
'conns': 0, 'totconns': 0}}
msg['listeners'][listener_id] = listener_dict
if util.is_listener_running(listener_id):
(stats, pool_status) = get_stats(stat_sock_file)
listener_dict = msg['listeners'][listener_id]
for row in stats:
if row['svname'] == 'FRONTEND':
listener_dict['stats']['tx'] = int(row['bout'])
listener_dict['stats']['rx'] = int(row['bin'])
listener_dict['stats']['conns'] = int(row['scur'])
listener_dict['stats']['totconns'] = int(row['stot'])
listener_dict['status'] = row['status']
for oid, pool in six.iteritems(pool_status):
if oid != listener_id:
pool_id = oid
pools = listener_dict['pools']
pools[pool_id] = {"status": pool['status'],
"members": pool['members']}
return msg

View File

@ -12,43 +12,63 @@
# License for the specific language governing permissions and limitations
# under the License.
# TODO(barclaac) Need to decide how this hooks into rest of system,
# e.g. daemon, subprocess, thread etc.
import socket
import config
import status_message
from oslo_config import cfg
from oslo_log import log as logging
from octavia.amphorae.backends.health_daemon import status_message
from octavia.i18n import _LE
CONF = cfg.CONF
CONF.import_group('health_manager', 'octavia.common.config')
LOG = logging.getLogger(__name__)
class UDPStatusSender:
def round_robin_addr(addrinfo_list):
if len(addrinfo_list) <= 0:
return None
addrinfo = addrinfo_list.pop(0)
addrinfo_list.append(addrinfo)
return addrinfo
class UDPStatusSender(object):
def __init__(self):
self.cfg = config.JSONFileConfig()
self.dests = {}
self.update(self.cfg['destination'], self.cfg['port'])
self.dests = []
for ipport in CONF.health_manager.controller_ip_port_list:
parts = ipport.split(':')
self.update(parts[0], parts[1])
self.v4sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.v6sock = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
self.key = str(self.cfg['key'])
self.cfg.add_observer(self.config_change)
self.key = str(CONF.health_manager.heartbeat_key)
# TODO(barclaac) Still need to reread the address list if it gets changed
def config_change(self):
pass
def update(self, dest_list, port):
for dest in dest_list:
def update(self, dest, port):
addrlist = socket.getaddrinfo(dest, port, 0, socket.SOCK_DGRAM)
# addrlist = [(family, socktype, proto, canonname, sockaddr) ...]
# e.g. 4 = sockaddr - what we actually need
for addr in addrlist:
self.dests[addr[4]] = addr
self.dests.append(addr) # Just grab the first match
break
def dosend(self, envelope):
envelope_str = status_message.encode(envelope, self.key)
for dest in self.dests.itervalues():
# addrlist = [(family, socktype, proto, canonname, sockaddr) ...]
def dosend(self, obj):
envelope_str = status_message.wrap_envelope(obj, self.key)
addrinfo = round_robin_addr(self.dests)
# dest = (family, socktype, proto, canonname, sockaddr)
# e.g. 0 = sock family, 4 = sockaddr - what we actually need
if dest[0] == socket.AF_INET:
self.v4sock.sendto(envelope_str, dest[4])
elif dest[0] == socket.AF_INET6:
self.v6sock.sendto(envelope_str, dest[4])
if addrinfo is None:
LOG.error(_LE('No controller address found. '
'Unable to send heartbeat.'))
return
try:
if addrinfo[0] == socket.AF_INET:
self.v4sock.sendto(envelope_str, addrinfo[4])
elif addrinfo[0] == socket.AF_INET6:
self.v6sock.sendto(envelope_str, addrinfo[4])
except socket.error:
# Pass here as on amp boot it will get one or more
# error: [Errno 101] Network is unreachable
# while the networks are coming up
# No harm in trying to send as it will still failover
# if the message isn't received
pass

View File

@ -1,7 +0,0 @@
{
"key": "asamplekey",
"delay": 2.5,
"destination": [ "::1", "127.1" ],
"port": 12345,
"id": "0dc47eda-872b-11e4-920b-000c294b76ae"
}

View File

@ -1,26 +0,0 @@
# Copyright 2014 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# TODO(barclaac) Someone needs to move this to be a library function for
# all of Octavia (Oslo even?)
def singleton(cls):
instances = {}
def getinstance():
if cls not in instances:
instances[cls] = cls()
return instances[cls]
return getinstance

View File

@ -12,22 +12,61 @@
# License for the specific language governing permissions and limitations
# under the License.
import binascii
import hashlib
import hmac
import json
import zlib
from oslo_log import log as logging
from octavia.common import exceptions
from octavia.i18n import _LW
LOG = logging.getLogger(__name__)
hash_algo = hashlib.sha256
hash_len = 32
def encode(msg, key):
result = {}
src = json.dumps(msg)
hmc = hmac.new(key.encode('ascii'), src.encode('ascii'), hashlib.sha1)
result['msg'] = msg
result['hmac'] = hmc.hexdigest()
return json.dumps(result)
def to_hex(byte_array):
return binascii.hexlify(byte_array).decode()
def checkhmac(envelope_str, key):
envelope = json.loads(envelope_str)
src = json.dumps(envelope['msg'])
hmc = hmac.new(key.encode('ascii'), src.encode('ascii'), hashlib.sha1)
return hmc.hexdigest() == envelope['hmac']
def encode_obj(obj):
json_bytes = json.dumps(obj).encode('utf-8')
binary_array = zlib.compress(json_bytes, 9)
return binary_array
def decode_obj(binary_array):
json_str = zlib.decompress(binary_array).decode('utf-8')
obj = json.loads(json_str)
return obj
def wrap_envelope(obj, key):
payload = encode_obj(obj)
hmc = get_hmac(payload, key)
envelope = payload + hmc
return envelope
def unwrap_envelope(envelope, key):
payload = envelope[:-hash_len]
expected_hmc = envelope[-hash_len:]
calculated_hmc = get_hmac(payload, key)
if expected_hmc != calculated_hmc:
LOG.warn(_LW('calculated hmac: %(s1)s not equal to msg hmac: '
'%(s2)s dropping packet'), {'s1': to_hex(calculated_hmc),
's2': to_hex(expected_hmc)})
fmt = 'calculated hmac: {0} not equal to msg hmac: {1} dropping packet'
raise exceptions.InvalidHMACException(fmt.format(
to_hex(calculated_hmc), to_hex(expected_hmc)))
obj = decode_obj(payload)
return obj
def get_hmac(payload, key):
hmc = hmac.new(key.encode("utf-8"), payload, hashlib.sha256)
return hmc.digest()

View File

@ -90,8 +90,7 @@ class HAProxyQuery(object):
"""
results = self._query(
'show stat {proxy_iid} {object_type}'
+ '{server_id}'.format(
'show stat {proxy_iid} {object_type} {server_id}'.format(
proxy_iid=proxy_iid,
object_type=object_type,
server_id=server_id))
@ -118,16 +117,16 @@ class HAProxyQuery(object):
# pxname: pool, svname: server_name, status: status
# All the way up is UP, otherwise call it DOWN
if line['status'] != consts.AMPHORA_UP:
line['status'] = consts.AMPHORA_DOWN
if line['status'] != consts.UP:
line['status'] = consts.DOWN
if line['pxname'] not in final_results:
final_results[line['pxname']] = dict(members=[])
final_results[line['pxname']] = dict(members={})
if line['svname'] == 'BACKEND':
final_results[line['pxname']]['uuid'] = line['pxname']
final_results[line['pxname']]['status'] = line['status']
else:
final_results[line['pxname']]['members'].append(
{line['svname']: line['status']})
final_results[line['pxname']]['members'][line['svname']] = (
line['status'])
return final_results

View File

@ -22,6 +22,7 @@ import requests
import six
from stevedore import driver as stevedore_driver
from octavia.amphorae.driver_exceptions import exceptions as driver_except
from octavia.amphorae.drivers import driver_base as driver_base
from octavia.amphorae.drivers.haproxy import exceptions as exc
from octavia.amphorae.drivers.haproxy.jinja import jinja_cfg
@ -244,10 +245,10 @@ class AmphoraAPIClient(object):
LOG.warn(_LW("Could not talk to instance"))
time.sleep(CONF.haproxy_amphora.connection_retry_interval)
if a >= CONF.haproxy_amphora.connection_max_retries:
raise exc.TimeOutException()
raise driver_except.TimeOutException()
else:
return r
raise exc.UnavailableException()
raise driver_except.UnavailableException()
def upload_config(self, amp, listener_id, config):
r = self.put(

View File

@ -0,0 +1,181 @@
# Copyright 2014 Rackspace
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import socket
from concurrent import futures
from oslo_config import cfg
from oslo_log import log as logging
from octavia.amphorae.backends.health_daemon import status_message
from octavia.common import exceptions
from octavia.db import repositories
from octavia.i18n import _LI
UDP_MAX_SIZE = 64 * 1024
LOG = logging.getLogger(__name__)
class UDPStatusGetter(object):
"""This class defines methods that will gather heatbeats
The heartbeats are transmitted via UDP and this class will bind to a port
and absorb them
"""
def __init__(self, health_update, stats_update):
self.stats_update = stats_update
self.health_update = health_update
self.key = cfg.CONF.health_manager.heartbeat_key
self.ip = cfg.CONF.health_manager.bind_ip
self.port = cfg.CONF.health_manager.bind_port
self.sockaddr = None
LOG.info(_LI(
'attempting to listen on {0} port {1}').format(
self.ip, self.port))
self.sock = None
self.update(self.key, self.ip, self.port)
self.executor = futures.ThreadPoolExecutor(
max_workers=cfg.CONF.health_manager.status_update_threads)
self.repo = repositories.Repositories().amphorahealth
def update(self, key, ip, port):
"""Update the running config for the udp socket server
:param key: The hmac key used to verify the UDP packets. String
:param ip: The ip address the UDP server will read from
:param port: The port the UDP server will read from
:return: None
"""
self.key = key
for addrinfo in socket.getaddrinfo(ip, port, 0, socket.SOCK_DGRAM):
ai_family = addrinfo[0]
self.sockaddr = addrinfo[4]
if self.sock is not None:
self.sock.close()
self.sock = socket.socket(ai_family, socket.SOCK_DGRAM)
self.sock.bind((ip, port))
if cfg.CONF.health_manager.sock_rlimit > 0:
rlimit = cfg.CONF.health_manager.sock_rlimit
LOG.info(_LI("setting sock rlimit to {0}").format(rlimit))
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF,
rlimit)
break # just used the first addr getaddrinfo finds
if self.sock is None:
raise exceptions.NetworkConfig("unable to find suitable socket")
def dorecv(self, *args, **kw):
"""Waits for a UDP heart beat to be sent.
:return: Returns the unwrapped payload and addr that sent the \
heart beat. The format of the obj from the UDP sender is of
the form. Not that listener_1 has not pools and listener_4
has no nodes.
{"listeners": {
"listener_uuid_1": {
"pools": {},
"status": "OPEN",
"stats": {
"conns": 0,
"rx": 0,
"tx": 0
}
},
"listener_uuid_2": {
"pools": {
"pool_uuid_1": {
"members": [
{
"member_uuid_1": "DOWN"
},
{
"member_uuid_2": "DOWN"
},
{
"member_uuid_3": "DOWN"
},
{
"member_uuid_4": "DOWN"
}
]
}
},
"status": "OPEN",
"stats": {
"conns": 0,
"rx": 0,
"tx": 0
}
},
"listener_uuid_3": {
"pools": {
"pool_uuid_2": {
"members": [
{
"member_uuid_5": "DOWN"
},
{
"member_uuid_6": "DOWN"
},
{
"member_uuid_7": "DOWN"
},
{
"member_uuid_8": "DOWN"
}
]
}
},
"status": "OPEN",
"stats": {
"conns": 0,
"rx": 0,
"tx": 0
}
},
"listener_uuid_4": {
"pools": {
"pool_uuid_3": {
"members": []
}
},
"status": "OPEN",
"stats": {
"conns": 0,
"rx": 0,
"tx": 0
}
}
},
"id": "amphora_uuid",
"seq": 1033
}
"""
(data, srcaddr) = self.sock.recvfrom(UDP_MAX_SIZE)
obj = status_message.unwrap_envelope(data, self.key)
return obj, srcaddr
def check(self):
try:
(obj, srcaddr) = self.dorecv()
if self.health_update:
self.executor.submit(self.health_update.update_health, obj)
if self.stats_update:
self.executor.submit(self.stats_update.update_stats, obj)
except exceptions.InvalidHMACException:
# Pass here as the packet was dropped and logged already
pass

View File

@ -17,6 +17,8 @@
# make sure PYTHONPATH includes the home directory if you didn't install
import logging
import multiprocessing as multiproc
import os
import ssl
import sys
@ -24,11 +26,14 @@ from oslo_config import cfg
from werkzeug import serving
from octavia.amphorae.backends.agent.api_server import server
from octavia.amphorae.backends.health_daemon import health_daemon
from octavia.common import service
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
CONF.import_group('amphora_agent', 'octavia.common.config')
CONF.import_group('haproxy_amphora', 'octavia.common.config')
HM_SENDER_CMD_QUEUE = multiproc.Queue()
# Hack: Use werkzeugs context
@ -59,12 +64,21 @@ def main():
# comment out to improve logging
service.prepare_service(sys.argv)
# Workaround for an issue with the auto-reload used below in werkzeug
# Without it multiple health senders get started when werkzeug reloads
if not os.environ.get('WERKZEUG_RUN_MAIN'):
health_sender_proc = multiproc.Process(name='HM_sender',
target=health_daemon.run_sender,
args=(HM_SENDER_CMD_QUEUE,))
health_sender_proc.daemon = True
health_sender_proc.start()
# We will only enforce that the client cert is from the good authority
# todo(german): Watch this space for security improvements
ctx = OctaviaSSLContext(ssl.PROTOCOL_SSLv23)
ctx.load_cert_chain(CONF.haproxy_amphora.agent_server_cert,
ca=CONF.haproxy_amphora.agent_server_ca)
ctx.load_cert_chain(CONF.amphora_agent.agent_server_cert,
ca=CONF.amphora_agent.agent_server_ca)
# This will trigger a reload if any files change and
# in particular the certificate file
@ -74,4 +88,4 @@ def main():
use_debugger=CONF.debug,
ssl_context=ctx,
use_reloader=True,
extra_files=[CONF.haproxy_amphora.agent_server_cert])
extra_files=[CONF.amphora_agent.agent_server_cert])

View File

@ -14,13 +14,15 @@
#
import multiprocessing
import sys
import time
from oslo_config import cfg
from oslo_log import log as logging
from octavia.amphorae.drivers.health import heartbeat_udp
from octavia.common import service
from octavia.controller.healthmanager import health_manager
from octavia.controller.healthmanager import update_health_mixin
from octavia.controller.healthmanager import update_stats_mixin
from octavia.i18n import _LI
CONF = cfg.CONF
@ -28,16 +30,18 @@ LOG = logging.getLogger(__name__)
CONF.import_group('health_manager', 'octavia.common.config')
def HM_listener():
def hm_listener():
# TODO(german): steved'or load those drivers
udp_getter = heartbeat_udp.UDPStatusGetter(
update_health_mixin.UpdateHealthMixin(),
update_stats_mixin.UpdateStatsMixin())
while True:
time.sleep(5)
# to do by Carlos
udp_getter.check()
def HM_health_check():
while True:
time.sleep(CONF.health_manager.interval)
def hm_health_check():
hm = health_manager.HealthManager()
while True:
hm.health_check()
@ -45,21 +49,21 @@ def main():
service.prepare_service(sys.argv)
processes = []
HM_listener_proc = multiprocessing.Process(name='HM_listener',
target=HM_listener)
processes.append(HM_listener_proc)
HM_health_check_proc = multiprocessing.Process(name='HM_health_check',
target=HM_health_check)
processes.append(HM_health_check_proc)
hm_listener_proc = multiprocessing.Process(name='HM_listener',
target=hm_listener)
processes.append(hm_listener_proc)
hm_health_check_proc = multiprocessing.Process(name='HM_health_check',
target=hm_health_check)
processes.append(hm_health_check_proc)
LOG.info(_LI("Health Manager listener process starts:"))
HM_listener_proc.start()
hm_listener_proc.start()
LOG.info(_LI("Health manager check process starts:"))
HM_health_check_proc.start()
hm_health_check_proc.start()
try:
for process in processes:
process.join()
except KeyboardInterrupt:
LOG.info(_LI("Health Manager existing due to signal"))
HM_listener_proc.terminate()
HM_health_check_proc.terminate()
hm_listener_proc.terminate()
hm_health_check_proc.terminate()

View File

@ -80,21 +80,59 @@ core_opts = [
help=_('Name of the controller plugin to use'))
]
# Options only used by the amphora agent
amphora_agent_opts = [
cfg.StrOpt('agent_server_ca', default='/etc/octavia/certs/client_ca.pem',
help=_("The ca which signed the client certificates")),
cfg.StrOpt('agent_server_cert', default='/etc/octavia/certs/server.pem',
help=_("The server certificate for the agent.py server "
"to use")),
cfg.StrOpt('agent_server_network_dir',
default='/etc/network/interfaces.d/',
help=_("The directory where new network interfaces "
"are located")),
# Do not specify in octavia.conf, loaded at runtime
cfg.StrOpt('amphora_id', help=_("The amphora ID.")),
]
networking_opts = [
cfg.StrOpt('lb_network_name', help=_('Name of amphora internal network')),
]
healthmanager_opts = [
cfg.StrOpt('bind_ip', default='0.0.0.0',
help=_('IP address the controller will listen on for '
'heart beats')),
cfg.IntOpt('bind_port', default=5555,
help=_('Port number the controller will listen on'
'for heart beats')),
cfg.IntOpt('failover_threads',
default=10,
help=_('Number of threads performing amphora failovers.')),
cfg.IntOpt('interval',
default=3,
help=_('Sleep time between health checks in seconds.')),
cfg.IntOpt('status_update_threads',
default=50,
help=_('Number of threads performing amphora failovers.')),
cfg.StrOpt('heartbeat_key',
help=_('key used to validate amphora sending'
'the message'), secret=True),
cfg.IntOpt('heartbeat_timeout',
default=10,
default=60,
help=_('Interval, in seconds, to wait before failing over an '
'amphora.')),
cfg.IntOpt('health_check_interval',
default=3,
help=_('Sleep time between health checks in seconds.')),
cfg.IntOpt('sock_rlimit', default=0,
help=_(' sets the value of the heartbeat recv buffer')),
# Used by the health manager on the amphora
cfg.ListOpt('controller_ip_port_list',
help=_('List of controller ip and port pairs for the '
'heartbeat receivers. Example [\'127.0.0.1:5555\', '
'\'127.0.0.1:5555\']')),
cfg.IntOpt('heartbeat_interval',
default=10,
help=_('Sleep time between sending hearthbeats.'))
]
oslo_messaging_opts = [
@ -133,7 +171,7 @@ haproxy_amphora_opts = [
# REST server
cfg.StrOpt('bind_host', default='0.0.0.0',
help=_("The host IP to bind to")),
cfg.IntOpt('bind_port', default=8443,
cfg.IntOpt('bind_port', default=9443,
help=_("The port to bind to")),
cfg.StrOpt('haproxy_cmd', default='/usr/sbin/haproxy',
help=_("The full path to haproxy")),
@ -143,15 +181,6 @@ haproxy_amphora_opts = [
help=_("The respawn interval for haproxy's upstart script")),
cfg.StrOpt('haproxy_cert_dir', default='/tmp/',
help=_("The directory to store haproxy cert files in")),
cfg.StrOpt('agent_server_cert', default='/etc/octavia/certs/server.pem',
help=_("The server certificate for the agent.py server "
"to use")),
cfg.StrOpt('agent_server_ca', default='/etc/octavia/certs/client_ca.pem',
help=_("The ca which signed the client certificates")),
cfg.StrOpt('agent_server_network_dir',
default='/etc/network/interfaces.d/',
help=_("The directory where new network interfaces "
"are located")),
# REST client
cfg.StrOpt('client_cert', default='/etc/octavia/certs/client.pem',
help=_("The client certificate to talk to the agent")),
@ -236,6 +265,7 @@ house_keeping_opts = [
# Register the configuration options
cfg.CONF.register_opts(core_opts)
cfg.CONF.register_opts(amphora_agent_opts, group='amphora_agent')
cfg.CONF.register_opts(networking_opts, group='networking')
cfg.CONF.register_opts(oslo_messaging_opts, group='oslo_messaging')
cfg.CONF.register_opts(haproxy_amphora_opts, group='haproxy_amphora')

View File

@ -42,12 +42,6 @@ SUPPORTED_PROTOCOLS = (PROTOCOL_TCP, PROTOCOL_HTTPS, PROTOCOL_HTTP,
# the provisioning_status table
# Amphora has been allocated to a load balancer
AMPHORA_ALLOCATED = 'ALLOCATED'
# Amphora healthy with listener(s) deployed
# TODO(johnsom) This doesn't exist
AMPHORA_UP = 'UP'
# Amphora unhealthy with listener(s) deployed
# TODO(johnsom) This doesn't exist
AMPHORA_DOWN = 'DOWN'
# Amphora is being built
AMPHORA_BOOTING = 'BOOTING'
# Amphora is ready to be allocated to a load balancer
@ -65,9 +59,8 @@ SUPPORTED_PROVISIONING_STATUSES = (ACTIVE, AMPHORA_ALLOCATED,
PENDING_UPDATE, DELETED, ERROR)
MUTABLE_STATUSES = (ACTIVE,)
SUPPORTED_AMPHORA_STATUSES = (AMPHORA_ALLOCATED, AMPHORA_UP, AMPHORA_DOWN,
AMPHORA_BOOTING, AMPHORA_READY, DELETED,
PENDING_DELETE)
SUPPORTED_AMPHORA_STATUSES = (AMPHORA_ALLOCATED, AMPHORA_BOOTING,
AMPHORA_READY, DELETED, PENDING_DELETE)
ONLINE = 'ONLINE'
OFFLINE = 'OFFLINE'
@ -150,3 +143,18 @@ SUPPORTED_LB_TOPOLOGIES = (TOPOLOGY_ACTIVE_STANDBY, TOPOLOGY_SINGLE)
SUPPORTED_AMPHORA_ROLES = (ROLE_BACKUP, ROLE_MASTER, ROLE_STANDALONE)
AGENT_API_TEMPLATES = '/templates'
AGENT_CONF_TEMPLATE = 'amphora_agent_conf.template'
OPEN = 'OPEN'
FULL = 'FULL'
# OPEN = HAProxy listener status nbconn < maxconn
# FULL = HAProxy listener status not nbconn < maxconn
HAPROXY_LISTENER_STATUSES = (OPEN, FULL)
UP = 'UP'
DOWN = 'DOWN'
# UP = HAProxy backend has working or no servers
# DOWN = HAProxy backend has no working servers
HAPROXY_BACKEND_STATUSES = (UP, DOWN)

View File

@ -80,10 +80,18 @@ class InvalidOption(APIException):
code = 400
class InvalidHMACException(OctaviaException):
message = _("HMAC hashes didn't match")
class MissingArguments(OctaviaException):
message = _("Missing arguments.")
class NetworkConfig(OctaviaException):
message = _("Unable to allocate network resource from config")
class NeedsPassphrase(OctaviaException):
message = _("Passphrase needed to decrypt key but client "
"did not provide one.")

View File

@ -106,11 +106,11 @@ class VirtualMachineManager(compute_base.ComputeBase):
try:
amphora = self.get_amphora(amphora_id=amphora_id)
if amphora and amphora.status == 'ACTIVE':
return constants.AMPHORA_UP
return constants.UP
except Exception:
LOG.exception(_LE("Error retrieving nova virtual machine status."))
raise exceptions.ComputeStatusException()
return constants.AMPHORA_DOWN
return constants.DOWN
def get_amphora(self, amphora_id):
'''Retrieve the information in nova of a virtual machine.

View File

@ -39,7 +39,7 @@ class HealthManager(object):
with futures.ThreadPoolExecutor(max_workers=self.threads) as executor:
try:
while True:
time.sleep(CONF.health_manager.interval)
time.sleep(CONF.health_manager.health_check_interval)
session = db_api.get_session()
LOG.debug("Starting amphora health check")
failover_count = 0

View File

@ -12,14 +12,16 @@
# License for the specific language governing permissions and limitations
# under the License.
import datetime
from oslo_log import log as logging
import sqlalchemy
from sqlalchemy.sql import func
from octavia.amphorae.drivers import driver_base as driver_base
from octavia.common import constants
from octavia.db import api as db_api
from octavia.db import repositories as repo
from octavia.i18n import _LE, _LW
import six
@ -31,8 +33,10 @@ class UpdateHealthMixin(driver_base.HealthMixin):
def __init__(self):
super(UpdateHealthMixin, self).__init__()
# first setup repo for amphora, listener,member(nodes),pool repo
self.amphora_repo = repo.AmphoraRepository()
self.amphora_health_repo = repo.AmphoraHealthRepository()
self.listener_repo = repo.ListenerRepository()
self.loadbalancer_repo = repo.LoadBalancerRepository()
self.member_repo = repo.MemberRepository()
self.pool_repo = repo.PoolRepository()
@ -43,76 +47,131 @@ class UpdateHealthMixin(driver_base.HealthMixin):
:type map: string
:returns: null
This function has the following 3 goals:
1)Update the health_manager table based on amphora status is up/down
2)Update related DB status to be ERROR/DOWN when amphora is down
3)Update related DB status to be ACTIVATE/UP when amphora is up
4)Track the status of the members
The input health data structure is shown as below:
health = {
"amphora-status": "AMPHORA_UP",
"amphora-id": FAKE_UUID_1,
"id": self.FAKE_UUID_1,
"listeners": {
"listener-id-1": {"listener-status": "ONLINE",
"members": {
"member-id-1": "ONLINE",
"member-id-2": "ONLINE"
}
},
"listener-id-2": {"listener-status": "ONLINE",
"members": {
"member-id-3": "ERROR",
"member-id-4": "ERROR",
"member-id-5": "ONLINE"
"listener-id-1": {"status": constants.OPEN, "pools": {
"pool-id-1": {"status": constants.UP,
"members": {"member-id-1": constants.ONLINE}
}
}
}
}
}
"""
session = db_api.get_session()
# We need to see if all of the listeners are reporting in
expected_listener_count = 0
lbs_on_amp = self.amphora_repo.get_all_lbs_on_amphora(session,
health['id'])
for lb in lbs_on_amp:
listener_count = self.listener_repo.count(session,
load_balancer_id=lb.id)
expected_listener_count += listener_count
listeners = health['listeners']
# Do not update ampohra health if the reporting listener count
# does not match the expected listener count
if len(listeners) == expected_listener_count:
# if the input amphora is healthy, we update its db info
# before update db, we need to check if the db has been created,
# if not, we need to create first
if health["amphora-status"] == constants.AMPHORA_UP:
amphora_id = health["amphora-id"]
amphora = self.amphora_health_repo.get(
session, amphora_id=amphora_id)
if amphora is None:
self.amphora_health_repo.create(session,
amphora_id=amphora_id,
last_update=func.now())
self.amphora_health_repo.replace(session, health['id'],
last_update=(datetime.
datetime.utcnow()))
else:
self.amphora_health_repo.update(session, amphora_id,
last_update=func.now())
LOG.warn(_LW('Amphora %(id)s health message reports %(found)i '
'listeners when %(expected)i expected'),
{'id': health['id'],
'found': len(listeners),
'expected': expected_listener_count})
# We got a heartbeat so lb is healthy until proven otherwise
lb_status = constants.ONLINE
# update listener and nodes db information
listeners = health['listeners']
for listener_id, listener in six.iteritems(listeners):
if listener.get("listener-status") == constants.ONLINE:
listener_model = self.listener_repo.get(session, id=listener_id)
lb_id = listener_model.load_balancer_id
listener_status = None
# OPEN = HAProxy listener status nbconn < maxconn
if listener.get('status') == constants.OPEN:
listener_status = constants.ONLINE
# FULL = HAProxy listener status not nbconn < maxconn
elif listener.get('status') == constants.FULL:
listener_status = constants.DEGRADED
if lb_status == constants.ONLINE:
lb_status = constants.DEGRADED
else:
LOG.warn(_LW('Listener %(list)s reported status of '
'%(status)s'), {'list': listener_id,
'status': listener.get('status')})
try:
if listener_status is not None:
self.listener_repo.update(
session, listener_id,
operating_status=constants.ONLINE)
operating_status=listener_status)
except sqlalchemy.orm.exc.NoResultFound:
LOG.debug("Listener %s is not in DB", listener_id)
LOG.error(_LE("Listener %s is not in DB"), listener_id)
pools = listener['pools']
for pool_id, pool in six.iteritems(pools):
pool_status = None
# UP = HAProxy backend has working or no servers
if pool.get('status') == constants.UP:
pool_status = constants.ONLINE
# DOWN = HAProxy backend has no working servers
elif pool.get('status') == constants.DOWN:
pool_status = constants.ERROR
lb_status = constants.ERROR
else:
LOG.warn(_LW('Pool %(pool)s reported status of '
'%(status)s'), {'pool': pool_id,
'status': pool.get('status')})
members = pool['members']
for member_id, status in six.iteritems(members):
member_status = None
if status == constants.UP:
member_status = constants.ONLINE
elif status == constants.DOWN:
member_status = constants.ERROR
if pool_status == constants.ONLINE:
pool_status = constants.DEGRADED
if lb_status == constants.ONLINE:
lb_status = constants.DEGRADED
else:
LOG.warn(_LW('Member %(mem)s reported status of '
'%(status)s'), {'mem': member_id,
'status': status})
elif listener.get("listener-status") == constants.ERROR:
try:
self.listener_repo.update(session, listener_id,
operating_status=constants.ERROR)
if member_status is not None:
self.member_repo.update(session, id=member_id,
operating_status=(
member_status))
except sqlalchemy.orm.exc.NoResultFound:
LOG.debug("Listener %s is not in DB", listener_id)
members = listener['members']
for member_id, member in six.iteritems(members):
if member in constants.SUPPORTED_OPERATING_STATUSES:
LOG.error(_LE("Member %s is not able to update "
"in DB"), member_id)
try:
self.member_repo.update(
session, id=member_id,
operating_status=member)
if pool_status is not None:
self.pool_repo.update(session, pool_id,
operating_status=pool_status)
except sqlalchemy.orm.exc.NoResultFound:
LOG.DEBUG("Member %s is not able to update in DB",
member_id)
LOG.error(_LE("Pool %s is not in DB"), pool_id)
try:
self.loadbalancer_repo.update(session, lb_id,
operating_status=lb_status)
except sqlalchemy.orm.exc.NoResultFound:
LOG.error(_LE("Load balancer %s is not in DB"), lb_id)

View File

@ -0,0 +1,69 @@
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import log as logging
from octavia.amphorae.drivers import driver_base as driver_base
from octavia.db import api as db_api
from octavia.db import repositories as repo
import six
LOG = logging.getLogger(__name__)
class UpdateStatsMixin(driver_base.StatsMixin):
def __init__(self):
super(UpdateStatsMixin, self).__init__()
self.listener_stats_repo = repo.ListenerStatisticsRepository()
def update_stats(self, health_message):
"""This function is to update the db with listener stats
:param health_message: The health message containing the listener stats
:type map: string
:returns: null
health = {
"id": self.FAKE_UUID_1,
"listeners": {
"listener-id-1": {"status": constants.OPEN,
'stats': {'conns': 0,
'totconns': 0,
'rx': 0,
'tx': 0},
"pools": {
"pool-id-1": {"status": constants.UP,
"members": {"member-id-1": constants.ONLINE}
}
}
}
}
}
"""
session = db_api.get_session()
listeners = health_message['listeners']
for listener_id, listener in six.iteritems(listeners):
stats = listener.get('stats')
self.listener_stats_repo.replace(session, listener_id,
bytes_in=stats['rx'],
bytes_out=stats['tx'],
active_connections=stats['conns'],
total_connections=(
stats['totconns']))

View File

@ -21,6 +21,7 @@ from stevedore import driver as stevedore_driver
from taskflow import task
from taskflow.types import failure
from octavia.amphorae.backends.agent import agent_jinja_cfg
from octavia.common import constants
from octavia.common import exceptions
from octavia.i18n import _LE, _LW
@ -46,7 +47,7 @@ class BaseComputeTask(task.Task):
class ComputeCreate(BaseComputeTask):
"""Create the compute instance for a new amphora."""
def execute(self, amphora_id, ports=None, config_drive_files=None):
def execute(self, amphora_id, ports=None, config_drive_files={}):
"""Create an amphora
:returns: an amphora
@ -56,6 +57,9 @@ class ComputeCreate(BaseComputeTask):
% amphora_id)
try:
agent_cfg = agent_jinja_cfg.AgentJinjaTemplater()
config_drive_files['/etc/octavia/amphora-agent.conf'] = (
agent_cfg.build_agent_config(amphora_id))
compute_id = self.compute.build(
name="amphora-" + amphora_id,
amphora_flavor=CONF.controller_worker.amp_flavor_id,

View File

@ -35,6 +35,15 @@ CONF.import_group('health_manager', 'octavia.common.config')
class BaseRepository(object):
model_class = None
def count(self, session, **filters):
"""Retrieves a count of entities from the database.
:param session: A Sql Alchemy database session.
:param filters: Filters to decide which entities should be retrieved.
:returns: int
"""
return session.query(self.model_class).filter_by(**filters).count()
def create(self, session, **model_kwargs):
"""Base create method for a database entity.
@ -296,6 +305,19 @@ class ListenerRepository(BaseRepository):
class ListenerStatisticsRepository(BaseRepository):
model_class = models.ListenerStatistics
def replace(self, session, listener_id, **model_kwargs):
"""replace or insert listener into database."""
with session.begin(subtransactions=True):
count = session.query(self.model_class).filter_by(
listener_id=listener_id).count()
if count:
session.query(self.model_class).filter_by(
listener_id=listener_id).update(model_kwargs,
synchronize_session=False)
else:
model_kwargs['listener_id'] = listener_id
self.create(session, **model_kwargs)
def update(self, session, listener_id, **model_kwargs):
"""Updates a listener's statistics by a listener's id."""
with session.begin(subtransactions=True):
@ -396,6 +418,19 @@ class AmphoraHealthRepository(BaseRepository):
session.query(self.model_class).filter_by(
amphora_id=amphora_id).update(model_kwargs)
def replace(self, session, amphora_id, **model_kwargs):
"""replace or insert amphora into database."""
with session.begin(subtransactions=True):
count = session.query(self.model_class).filter_by(
amphora_id=amphora_id).count()
if count:
session.query(self.model_class).filter_by(
amphora_id=amphora_id).update(model_kwargs,
synchronize_session=False)
else:
model_kwargs['amphora_id'] = amphora_id
self.create(session, **model_kwargs)
def check_amphora_expired(self, session, amphora_id, exp_age=None):
"""check if a specific amphora is expired
@ -420,10 +455,9 @@ class AmphoraHealthRepository(BaseRepository):
:returns: [octavia.common.data_model]
"""
timestamp = CONF.health_manager.heartbeat_timeout
timeout = CONF.health_manager.heartbeat_timeout
expired_time = datetime.datetime.utcnow() - datetime.timedelta(
seconds=timestamp)
seconds=timeout)
with session.begin(subtransactions=True):
amp = session.query(self.model_class).with_for_update().filter_by(

View File

@ -19,6 +19,7 @@ def list_opts():
return [
('DEFAULT',
itertools.chain(octavia.common.config.core_opts)),
('amphora_agent', octavia.common.config.amphora_agent_opts),
('networking', octavia.common.config.networking_opts),
('oslo_messaging', octavia.common.config.oslo_messaging_opts),
('keystone_authtoken_v3',

View File

@ -203,7 +203,7 @@ class ServerTestCase(base.TestCase):
json.loads(rv.data.decode('utf-8')))
mock_rmtree.assert_called_with('/var/lib/octavia/123')
mock_exists.assert_called_with('/etc/init/haproxy-123.conf')
mock_exists.assert_any_call('/var/lib/octavia/123/haproxy.pid')
mock_exists.assert_any_call('/var/lib/octavia/123/123.pid')
# service is stopped + upstart script
mock_exists.side_effect = [True, False, True]
@ -339,7 +339,7 @@ class ServerTestCase(base.TestCase):
type='test',
uuid='123',
pools=[dict(
status=consts.AMPHORA_DOWN,
status=consts.DOWN,
uuid='tcp-servers',
members=[
{u'id-34833': u'DOWN'},

View File

@ -13,6 +13,7 @@
# under the License.
import datetime
import random
from oslo_utils import uuidutils
@ -696,6 +697,47 @@ class ListenerStatisticsRepositoryTest(BaseRepositoryTest):
self.assertIsNotNone(new_listener)
self.assertIsNone(new_listener.stats)
def test_replace(self):
# Test the create path
bytes_in = random.randrange(1000000000)
bytes_out = random.randrange(1000000000)
active_conns = random.randrange(1000000000)
total_conns = random.randrange(1000000000)
self.assertIsNone(self.listener_stats_repo.get(
self.session, listener_id=self.listener.id))
self.listener_stats_repo.replace(self.session, self.listener.id,
bytes_in=bytes_in,
bytes_out=bytes_out,
active_connections=active_conns,
total_connections=total_conns)
obj = self.listener_stats_repo.get(self.session,
listener_id=self.listener.id)
self.assertIsNotNone(obj)
self.assertEqual(self.listener.id, obj.listener_id)
self.assertEqual(bytes_in, obj.bytes_in)
self.assertEqual(bytes_out, obj.bytes_out)
self.assertEqual(active_conns, obj.active_connections)
self.assertEqual(total_conns, obj.total_connections)
# Test the update path
bytes_in_2 = random.randrange(1000000000)
bytes_out_2 = random.randrange(1000000000)
active_conns_2 = random.randrange(1000000000)
total_conns_2 = random.randrange(1000000000)
self.listener_stats_repo.replace(self.session, self.listener.id,
bytes_in=bytes_in_2,
bytes_out=bytes_out_2,
active_connections=active_conns_2,
total_connections=total_conns_2)
obj = self.listener_stats_repo.get(self.session,
listener_id=self.listener.id)
self.assertIsNotNone(obj)
self.assertEqual(self.listener.id, obj.listener_id)
self.assertEqual(bytes_in_2, obj.bytes_in)
self.assertEqual(bytes_out_2, obj.bytes_out)
self.assertEqual(active_conns_2, obj.active_connections)
self.assertEqual(total_conns_2, obj.total_connections)
class HealthMonitorRepositoryTest(BaseRepositoryTest):
@ -1081,6 +1123,11 @@ class AmphoraRepositoryTest(BaseRepositoryTest):
self.assertIsInstance(new_amphora, models.Amphora)
self.assertEqual(amphora, new_amphora)
def test_count(self):
amphora = self.create_amphora(self.FAKE_UUID_1)
amp_count = self.amphora_repo.count(self.session, id=amphora.id)
self.assertEqual(amp_count, 1)
def test_create(self):
amphora = self.create_amphora(self.FAKE_UUID_1)
self.assertEqual(self.FAKE_UUID_1, amphora.id)
@ -1088,6 +1135,16 @@ class AmphoraRepositoryTest(BaseRepositoryTest):
self.assertEqual(constants.ACTIVE, amphora.status)
self.assertEqual(constants.ROLE_MASTER, amphora.role)
def test_exists_true(self):
amphora = self.create_amphora(self.FAKE_UUID_1)
exist = self.amphora_repo.exists(self.session, id=amphora.id)
self.assertTrue(exist)
def test_exists_false(self):
self.create_amphora(self.FAKE_UUID_1)
exist = self.amphora_repo.exists(self.session, id='test')
self.assertFalse(exist)
def test_update(self):
status_change = constants.PENDING_UPDATE
amphora = self.create_amphora(self.FAKE_UUID_1)
@ -1170,6 +1227,26 @@ class AmphoraHealthRepositoryTest(BaseRepositoryTest):
busy=False)
return amphora_health
def test_replace(self):
amphora_id = uuidutils.generate_uuid()
now = datetime.datetime.utcnow()
self.assertIsNone(self.amphora_health_repo.get(
self.session, amphora_id=amphora_id))
self.amphora_health_repo.replace(self.session, amphora_id,
last_update=now)
obj = self.amphora_health_repo.get(self.session, amphora_id=amphora_id)
self.assertIsNotNone(obj)
self.assertEqual(amphora_id, obj.amphora_id)
self.assertEqual(now, obj.last_update)
now += datetime.timedelta(seconds=69)
self.amphora_health_repo.replace(self.session, amphora_id,
last_update=now)
obj = self.amphora_health_repo.get(self.session, amphora_id=amphora_id)
self.assertIsNotNone(obj)
self.assertEqual(amphora_id, obj.amphora_id)
self.assertEqual(now, obj.last_update)
def test_get(self):
amphora_health = self.create_amphora_health(self.amphora.id)
new_amphora_health = self.amphora_health_repo.get(
@ -1194,7 +1271,11 @@ class AmphoraHealthRepositoryTest(BaseRepositoryTest):
self.session, self.amphora.id, exp_age)
self.assertTrue(checkres)
def test_get_stale_amphorae(self):
def test_get_stale_amphora(self):
stale_amphora = self.amphora_health_repo.get_stale_amphora(
self.session)
self.assertIsNone(stale_amphora)
self.create_amphora_health(self.amphora.id)
stale_amphora = self.amphora_health_repo.get_stale_amphora(
self.session)

View File

@ -0,0 +1,76 @@
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_config import cfg
from oslo_config import fixture as oslo_fixture
from oslo_utils import uuidutils
from octavia.amphorae.backends.agent import agent_jinja_cfg
import octavia.tests.unit.base as base
AMP_ID = uuidutils.generate_uuid()
class AgentJinjaTestCase(base.TestCase):
def setUp(self):
super(AgentJinjaTestCase, self).setUp()
conf = oslo_fixture.Config(cfg.CONF)
conf.config(debug=False)
conf.config(group="amphora_agent",
agent_server_ca='/etc/octavia/certs/client_ca.pem')
conf.config(group="amphora_agent",
agent_server_cert='/etc/octavia/certs/server.pem')
conf.config(group="amphora_agent",
agent_server_network_dir='/etc/network/interfaces.d/')
conf.config(group="haproxy_amphora",
base_cert_dir='/var/lib/octavia/certs')
conf.config(group="haproxy_amphora", base_path='/var/lib/octavia')
conf.config(group="haproxy_amphora", bind_host='0.0.0.0')
conf.config(group="haproxy_amphora", bind_port=9443)
conf.config(group="haproxy_amphora", haproxy_cmd='/usr/sbin/haproxy')
conf.config(group="haproxy_amphora", respawn_count=2)
conf.config(group="haproxy_amphora", respawn_interval=2)
conf.config(group="health_manager",
controller_ip_port_list=['192.0.2.10:5555'])
conf.config(group="health_manager", heartbeat_interval=10)
conf.config(group="health_manager", heartbeat_key='TEST')
self.expected_config = ('\n[DEFAULT]\n'
'debug = False\n\n'
'[haproxy_amphora]\n'
'base_cert_dir = /var/lib/octavia/certs\n'
'base_path = /var/lib/octavia\n'
'bind_host = 0.0.0.0\n'
'bind_port = 9443\n'
'haproxy_cmd = /usr/sbin/haproxy\n'
'respawn_count = 2\n'
'respawn_interval = 2\n\n'
'[health_manager]\n'
'controller_ip_port_list = 192.0.2.10:5555\n'
'heartbeat_interval = 10\n'
'heartbeat_key = TEST\n\n'
'[amphora_agent]\n'
'agent_server_ca = '
'/etc/octavia/certs/client_ca.pem\n'
'agent_server_cert = '
'/etc/octavia/certs/server.pem\n'
'agent_server_network_dir = '
'/etc/network/interfaces.d/\n'
'amphora_id = ' + AMP_ID)
def test_build_agent_config(self):
ajc = agent_jinja_cfg.AgentJinjaTemplater()
agent_cfg = ajc.build_agent_config(AMP_ID)
self.assertEqual(self.expected_config, agent_cfg)

View File

@ -1,95 +0,0 @@
# Copyright 2014 Hewlett-Packard Development-Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
import os
import tempfile
from testtools import matchers
from octavia.amphorae.backends.health_daemon import config
from octavia.tests.unit import base
class TestConfig(base.TestCase):
def setUp(self):
super(TestConfig, self).setUp()
self.setup_config_file()
self.addCleanup(self.remove_config_file)
def test_noconfig(self):
cfg = config.JSONFileConfig()
self.assertThat(lambda: cfg.set_filename('/doesnotexist'),
matchers.raises(IOError))
def test_config(self):
def check_update():
self.assertFalse(self.update_called)
self.update_called = True
cfg = config.JSONFileConfig()
cfg.set_filename(self.sampleconfig[1])
# Check the singleton decorator
self.assertIs(cfg, config.JSONFileConfig())
cfg.add_observer(check_update)
self.update_called = False
cfg.check_update()
self.assertTrue(self.update_called)
self.assertIs(cfg['delay'], 10)
# First test - change the existing file - same file, no change
with open(self.sampleconfig[1], 'w+') as f:
cdata = {'delay': 5}
json.dump(cdata, f)
self.update_called = False
cfg.check_update()
self.assertTrue(self.update_called)
self.assertIs(cfg['delay'], 5)
# Check for removing an observer - Thanks Stephen
cfg.remove_observer(check_update)
self.update_called = False
cfg.check_update()
self.assertFalse(self.update_called)
# Better add it back for the next test
cfg.add_observer(check_update)
# Next, replace the file (new inode)
self.remove_config_file()
with open(self.sampleconfig[1], 'w+') as f:
cdata = {'delay': 3}
json.dump(cdata, f)
self.update_called = False
cfg.check_update()
self.assertTrue(self.update_called)
self.assertIs(cfg['delay'], 3)
def setup_config_file(self):
self.sampleconfig = tempfile.mkstemp()
conffile = os.fdopen(self.sampleconfig[0], 'w+')
cdata = {'delay': 10}
json.dump(cdata, conffile)
conffile.close()
def remove_config_file(self):
os.unlink(self.sampleconfig[1])

View File

@ -15,6 +15,7 @@
import uuid
from octavia.amphorae.backends.health_daemon import status_message
from octavia.common import exceptions
from octavia.tests.unit import base
@ -23,11 +24,16 @@ class TestEnvelope(base.TestCase):
super(TestEnvelope, self).setUp()
def test_message_hmac(self):
self.skipTest("This test is broken and will be fixed in CR# 201882")
statusMsg = {'seq': 42,
seq = 42
for i in range(0, 16):
statusMsg = {'seq': seq,
'status': 'OK',
'id': str(uuid.uuid4())}
sme = status_message.encode(statusMsg, 'samplekey1')
self.assertTrue(status_message.checkhmac(sme, 'samplekey1'))
self.assertFalse(status_message.checkhmac(sme, 'samplekey2'))
envelope = status_message.wrap_envelope(statusMsg, 'samplekey1')
obj = status_message.unwrap_envelope(envelope, 'samplekey1')
self.assertEqual(obj['status'], 'OK')
self.assertEqual(obj['seq'], seq)
seq += 1
args = (envelope, 'samplekey?')
self.assertRaises(exceptions.InvalidHMACException,
status_message.unwrap_envelope, *args)

View File

@ -0,0 +1,259 @@
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
from oslo_config import cfg
from oslo_config import fixture as oslo_fixture
from oslo_utils import uuidutils
import six
from octavia.amphorae.backends.health_daemon import health_daemon
import octavia.tests.unit.base as base
if six.PY2:
import Queue as queue
import mock
else:
import queue
import unittest.mock as mock
LISTENER_ID1 = uuidutils.generate_uuid()
LISTENER_ID2 = uuidutils.generate_uuid()
LISTENER_IDS = [LISTENER_ID1, LISTENER_ID2]
BASE_PATH = '/tmp/test'
SAMPLE_POOL_STATUS = {'432fc8b3-d446-48d4-bb64-13beb90e22bc': {
'status': 'UP',
'uuid': '432fc8b3-d446-48d4-bb64-13beb90e22bc',
'members': {
'302e33d9-dee1-4de9-98d5-36329a06fb58': 'DOWN'}}}
SAMPLE_BOGUS_POOL_STATUS = {LISTENER_ID1: {
'status': 'UP',
'uuid': LISTENER_ID1,
'members': {
'302e33d9-dee1-4de9-98d5-36329a06fb58':
'DOWN'}}}
SAMPLE_STATS = ({'': '', 'status': 'OPEN', 'lastchg': '',
'weight': '', 'slim': '2000', 'pid': '1', 'comp_byp': '0',
'lastsess': '', 'rate_lim': '0', 'check_duration': '',
'rate': '0', 'req_rate': '0', 'check_status': '',
'econ': '', 'comp_out': '0', 'wredis': '', 'dresp': '0',
'ereq': '0', 'tracked': '', 'comp_in': '0',
'pxname': '490b6ae7-21aa-43f1-b82a-68ddcd2ca2fb',
'dreq': '0', 'hrsp_5xx': '0', 'last_chk': '',
'check_code': '', 'sid': '0', 'bout': '0', 'hrsp_1xx': '0',
'qlimit': '', 'hrsp_other': '0', 'bin': '0', 'rtime': '',
'smax': '0', 'req_tot': '0', 'lbtot': '', 'stot': '0',
'wretr': '', 'req_rate_max': '0', 'ttime': '', 'iid': '2',
'hrsp_4xx': '0', 'chkfail': '', 'hanafail': '',
'downtime': '', 'qcur': '', 'eresp': '', 'comp_rsp': '0',
'cli_abrt': '', 'ctime': '', 'qtime': '', 'srv_abrt': '',
'throttle': '', 'last_agt': '', 'scur': '0', 'type': '0',
'bck': '', 'qmax': '', 'rate_max': '0', 'hrsp_2xx': '0',
'act': '', 'chkdown': '', 'svname': 'FRONTEND',
'hrsp_3xx': '0'},
{'': '', 'status': 'no check', 'lastchg': '', 'weight': '1',
'slim': '', 'pid': '1', 'comp_byp': '', 'lastsess': '-1',
'rate_lim': '', 'check_duration': '', 'rate': '0',
'req_rate': '', 'check_status': '', 'econ': '0',
'comp_out': '', 'wredis': '0', 'dresp': '0', 'ereq': '',
'tracked': '', 'comp_in': '',
'pxname': '432fc8b3-d446-48d4-bb64-13beb90e22bc',
'dreq': '', 'hrsp_5xx': '0', 'last_chk': '',
'check_code': '', 'sid': '1', 'bout': '0', 'hrsp_1xx': '0',
'qlimit': '', 'hrsp_other': '0', 'bin': '0', 'rtime': '0',
'smax': '0', 'req_tot': '', 'lbtot': '0', 'stot': '0',
'wretr': '0', 'req_rate_max': '', 'ttime': '0', 'iid': '3',
'hrsp_4xx': '0', 'chkfail': '', 'hanafail': '0',
'downtime': '', 'qcur': '0', 'eresp': '0', 'comp_rsp': '',
'cli_abrt': '0', 'ctime': '0', 'qtime': '0', 'srv_abrt': '0',
'throttle': '', 'last_agt': '', 'scur': '0', 'type': '2',
'bck': '0', 'qmax': '0', 'rate_max': '0', 'hrsp_2xx': '0',
'act': '1', 'chkdown': '',
'svname': '302e33d9-dee1-4de9-98d5-36329a06fb58',
'hrsp_3xx': '0'},
{'': '', 'status': 'UP', 'lastchg': '122', 'weight': '1',
'slim': '200', 'pid': '1', 'comp_byp': '0', 'lastsess': '-1',
'rate_lim': '', 'check_duration': '', 'rate': '0',
'req_rate': '', 'check_status': '', 'econ': '0',
'comp_out': '0', 'wredis': '0', 'dresp': '0', 'ereq': '',
'tracked': '', 'comp_in': '0',
'pxname': '432fc8b3-d446-48d4-bb64-13beb90e22bc', 'dreq': '0',
'hrsp_5xx': '0', 'last_chk': '', 'check_code': '', 'sid': '0',
'bout': '0', 'hrsp_1xx': '0', 'qlimit': '', 'hrsp_other': '0',
'bin': '0', 'rtime': '0', 'smax': '0', 'req_tot': '',
'lbtot': '0', 'stot': '0', 'wretr': '0', 'req_rate_max': '',
'ttime': '0', 'iid': '3', 'hrsp_4xx': '0', 'chkfail': '',
'hanafail': '', 'downtime': '0', 'qcur': '0', 'eresp': '0',
'comp_rsp': '0', 'cli_abrt': '0', 'ctime': '0', 'qtime': '0',
'srv_abrt': '0', 'throttle': '', 'last_agt': '', 'scur': '0',
'type': '1', 'bck': '0', 'qmax': '0', 'rate_max': '0',
'hrsp_2xx': '0', 'act': '1', 'chkdown': '0',
'svname': 'BACKEND', 'hrsp_3xx': '0'})
SAMPLE_STATS_MSG = {'listeners': {
LISTENER_ID1: {
'pools': {'432fc8b3-d446-48d4-bb64-13beb90e22bc': {
'members': {
'302e33d9-dee1-4de9-98d5-36329a06fb58':
'DOWN'},
'status': 'UP'}}, 'stats': {
'totconns': 0, 'conns': 0, 'tx': 0, 'rx': 0},
'status': 'OPEN'},
LISTENER_ID2: {
'pools': {'432fc8b3-d446-48d4-bb64-13beb90e22bc': {
'members': {
'302e33d9-dee1-4de9-98d5-36329a06fb58':
'DOWN'},
'status': 'UP'}}, 'stats': {
'totconns': 0, 'conns': 0, 'tx': 0, 'rx': 0},
'status': 'OPEN'}}, 'id': None, 'seq': 0}
class TestHealthDaemon(base.TestCase):
def setUp(self):
super(TestHealthDaemon, self).setUp()
conf = oslo_fixture.Config(cfg.CONF)
conf.config(group="haproxy_amphora", base_path=BASE_PATH)
@mock.patch('octavia.amphorae.backends.agent.'
'api_server.util.get_listeners')
def test_list_sock_stat_files(self, mock_get_listener):
mock_get_listener.return_value = LISTENER_IDS
health_daemon.list_sock_stat_files()
files = health_daemon.list_sock_stat_files(BASE_PATH)
expected_files = {LISTENER_ID1: BASE_PATH + '/' +
LISTENER_ID1 + '.sock',
LISTENER_ID2: BASE_PATH + '/' +
LISTENER_ID2 + '.sock'}
self.assertEqual(files, expected_files)
@mock.patch('oslo_config.cfg.CONF.reload_config_files')
@mock.patch('octavia.amphorae.backends.health_daemon.'
'health_daemon.build_stats_message')
@mock.patch('octavia.amphorae.backends.health_daemon.'
'health_sender.UDPStatusSender')
def test_run_sender(self, mock_UDPStatusSender, mock_build_msg,
mock_reload_cfg):
sender_mock = mock.MagicMock()
dosend_mock = mock.MagicMock()
sender_mock.dosend = dosend_mock
mock_UDPStatusSender.return_value = sender_mock
mock_build_msg.side_effect = ['TEST', Exception('break')]
test_queue = queue.Queue()
self.assertRaisesRegexp(Exception, 'break',
health_daemon.run_sender, test_queue)
sender_mock.dosend.assert_called_once_with('TEST')
# Test a reload event
mock_build_msg.reset_mock()
mock_build_msg.side_effect = ['TEST', Exception('break')]
test_queue.put('reload')
self.assertRaisesRegexp(Exception, 'break',
health_daemon.run_sender, test_queue)
mock_reload_cfg.assert_called_once_with()
# Test the shutdown path
sender_mock.reset_mock()
dosend_mock.reset_mock()
mock_build_msg.reset_mock()
mock_build_msg.side_effect = ['TEST', 'TEST']
test_queue.put('shutdown')
health_daemon.run_sender(test_queue)
sender_mock.dosend.assert_called_once_with('TEST')
# Test an unkown command
mock_build_msg.reset_mock()
mock_build_msg.side_effect = ['TEST', Exception('break')]
test_queue.put('bogus')
self.assertRaisesRegexp(Exception, 'break',
health_daemon.run_sender, test_queue)
@mock.patch('octavia.amphorae.backends.utils.haproxy_query.HAProxyQuery')
def test_get_stats(self, mock_query):
stats_query_mock = mock.MagicMock()
mock_query.return_value = stats_query_mock
health_daemon.get_stats('TEST')
stats_query_mock.show_stat.assert_called_once_with()
stats_query_mock.get_pool_status.assert_called_once_with()
@mock.patch('octavia.amphorae.backends.agent.api_server.'
'util.is_listener_running')
@mock.patch('octavia.amphorae.backends.health_daemon.'
'health_daemon.get_stats')
@mock.patch('octavia.amphorae.backends.health_daemon.'
'health_daemon.list_sock_stat_files')
def test_build_stats_message(self, mock_list_files,
mock_get_stats, mock_is_running):
mock_list_files.return_value = {LISTENER_ID1: 'TEST',
LISTENER_ID2: 'TEST2'}
mock_is_running.return_value = True
mock_get_stats.return_value = SAMPLE_STATS, SAMPLE_POOL_STATUS
msg = health_daemon.build_stats_message()
self.assertEqual(msg, SAMPLE_STATS_MSG)
mock_get_stats.assert_any_call('TEST')
mock_get_stats.assert_any_call('TEST2')
@mock.patch('octavia.amphorae.backends.agent.api_server.'
'util.is_listener_running')
@mock.patch('octavia.amphorae.backends.health_daemon.'
'health_daemon.get_stats')
@mock.patch('octavia.amphorae.backends.health_daemon.'
'health_daemon.list_sock_stat_files')
def test_build_stats_message_no_listener(self, mock_list_files,
mock_get_stats,
mock_is_running):
mock_list_files.return_value = {LISTENER_ID1: 'TEST',
LISTENER_ID2: 'TEST2'}
mock_is_running.side_effect = [True, False]
mock_get_stats.return_value = SAMPLE_STATS, SAMPLE_POOL_STATUS
health_daemon.build_stats_message()
self.assertEqual(mock_get_stats.call_count, 1)
@mock.patch('octavia.amphorae.backends.agent.api_server.'
'util.is_listener_running')
@mock.patch('octavia.amphorae.backends.health_daemon.'
'health_daemon.get_stats')
@mock.patch('octavia.amphorae.backends.health_daemon.'
'health_daemon.list_sock_stat_files')
def test_build_stats_message_mismatch_pool(self, mock_list_files,
mock_get_stats,
mock_is_running):
mock_list_files.return_value = {LISTENER_ID1: 'TEST',
LISTENER_ID2: 'TEST2'}
mock_is_running.return_value = True
mock_get_stats.return_value = SAMPLE_STATS, SAMPLE_BOGUS_POOL_STATUS
msg = health_daemon.build_stats_message()
self.assertEqual(msg['listeners'][LISTENER_ID1]['pools'], {})

View File

@ -0,0 +1,124 @@
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import binascii
import random
import socket
from oslo_config import cfg
from oslo_config import fixture as oslo_fixture
import six
from octavia.amphorae.backends.health_daemon import health_sender
from octavia.tests.unit import base
if six.PY2:
import mock
else:
import unittest.mock as mock
IP = '192.0.2.15'
IP_PORT = '192.0.2.10:5555', '192.0.2.10:5555'
KEY = 'TEST'
PORT = random.randrange(1, 9000)
SAMPLE_MSG = {'testkey': 'TEST'}
SAMPLE_MSG_BIN = binascii.unhexlify('78daab562a492d2ec94ead54b252500a710d0e5'
'1aa050041b506245806e5c1971e79951818394e'
'a6e71ad989ff950945f9573f4ab6f83e25db8ed7')
class TestHealthSender(base.TestCase):
def setUp(self):
super(TestHealthSender, self).setUp()
self.conf = oslo_fixture.Config(cfg.CONF)
self.conf.config(group="health_manager",
controller_ip_port_list=IP_PORT)
self.conf.config(group="health_manager",
heartbeat_key=KEY)
@mock.patch('socket.getaddrinfo')
@mock.patch('socket.socket')
def test_sender(self, mock_socket, mock_getaddrinfo):
socket_mock = mock.MagicMock()
mock_socket.return_value = socket_mock
sendto_mock = mock.MagicMock()
socket_mock.sendto = sendto_mock
# Test when no addresses are returned
mock_getaddrinfo.return_value = []
sender = health_sender.UDPStatusSender()
sender.dosend(SAMPLE_MSG)
# Test IPv4 path
mock_getaddrinfo.return_value = [(socket.AF_INET,
socket.SOCK_DGRAM,
socket.IPPROTO_UDP,
'',
('192.0.2.20', 80))]
sendto_mock.reset_mock()
sender = health_sender.UDPStatusSender()
sender.dosend(SAMPLE_MSG)
sendto_mock.assert_called_once_with(SAMPLE_MSG_BIN,
('192.0.2.20', 80))
sendto_mock.reset_mock()
# Test IPv6 path
mock_getaddrinfo.return_value = [(socket.AF_INET6,
socket.SOCK_DGRAM,
socket.IPPROTO_UDP,
'',
('2001:0DB8::F00D', 80))]
sender = health_sender.UDPStatusSender()
sender.dosend(SAMPLE_MSG)
sendto_mock.assert_called_once_with(SAMPLE_MSG_BIN,
('2001:0DB8::F00D', 80))
sendto_mock.reset_mock()
# Test invalid address family
mock_getaddrinfo.return_value = [(socket.AF_UNIX,
socket.SOCK_DGRAM,
socket.IPPROTO_UDP,
'',
('2001:0DB8::F00D', 80))]
sender = health_sender.UDPStatusSender()
sender.dosend(SAMPLE_MSG)
self.assertFalse(sendto_mock.called)
sendto_mock.reset_mock()
# Test socket error
socket_mock.sendto.side_effect = socket.error
mock_getaddrinfo.return_value = [(socket.AF_INET6,
socket.SOCK_DGRAM,
socket.IPPROTO_UDP,
'',
('2001:0DB8::F00D', 80))]
sender = health_sender.UDPStatusSender()
# Should not raise an exception
sender.dosend(SAMPLE_MSG)

View File

@ -1,42 +0,0 @@
# Copyright 2014 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
import os
import tempfile
from octavia.tests.unit import base
class TestSender(base.TestCase):
def setUp(self):
super(TestSender, self).setUp()
self.setupConfigFile()
self.addCleanup(self.removeConfigFile)
def setupConfigFile(self):
self.sampleconfig = tempfile.mkstemp()
conffile = os.fdopen(self.sampleconfig[0], 'w+')
cdata = {'delay': 10,
'target': ['127.0.0.1', '::1'],
'psk': 'fubar',
'dport': 12345}
json.dump(cdata, conffile)
conffile.close()
def removeConfigFile(self):
os.unlink(self.sampleconfig[1])
def test_message_output(self):
pass

View File

@ -12,6 +12,8 @@
# License for the specific language governing permissions and limitations
# under the License.
import socket
import mock
from octavia.amphorae.backends.utils import haproxy_query as query
@ -31,11 +33,11 @@ STATS_SOCKET_SAMPLE = (
",1,3,2,,0,,2,0,,0,L4TOUT,,30001,0,0,0,0,0,0,0,,,,0,0,,,,,-1,,,0,0,0,0,\n"
"http-servers,BACKEND,0,0,0,0,200,0,0,0,0,0,,0,0,0,0,DOWN,0,0,0,,1,567,567"
",,1,3,0,,0,,1,0,,0,,,,0,0,0,0,0,0,,,,,0,0,0,0,0,0,-1,,,0,0,0,0,\n"
"tcp-servers,id-34833,0,0,0,0,,0,0,0,,0,,0,0,0,0,DOWN,1,1,0,1,1,560,560,,"
"tcp-servers,id-34833,0,0,0,0,,0,0,0,,0,,0,0,0,0,UP,1,1,0,1,1,560,560,,"
"1,5,1,,0,,2,0,,0,L4TOUT,,30000,,,,,,,0,,,,0,0,,,,,-1,,,0,0,0,0,\n"
"tcp-servers,id-34836,0,0,0,0,,0,0,0,,0,,0,0,0,0,DOWN,1,1,0,1,1,552,552,,"
"tcp-servers,id-34836,0,0,0,0,,0,0,0,,0,,0,0,0,0,UP,1,1,0,1,1,552,552,,"
"1,5,2,,0,,2,0,,0,L4TOUT,,30001,,,,,,,0,,,,0,0,,,,,-1,,,0,0,0,0,\n"
"tcp-servers,BACKEND,0,0,0,0,200,0,0,0,0,0,,0,0,0,0,DOWN,0,0,0,,1,552,552"
"tcp-servers,BACKEND,0,0,0,0,200,0,0,0,0,0,,0,0,0,0,UP,0,0,0,,1,552,552"
",,1,5,0,,0,,1,0,,0,,,,,,,,,,,,,,0,0,0,0,0,0,-1,,,0,0,0,0,"
)
@ -62,23 +64,42 @@ class QueryTestCase(base.TestCase):
self.q = query.HAProxyQuery('')
super(QueryTestCase, self).setUp()
@mock.patch('socket.socket')
def test_query(self, mock_socket):
sock = mock.MagicMock()
sock.connect.side_effect = [None, socket.error]
sock.recv.side_effect = ['testdata', None]
mock_socket.return_value = sock
self.q._query('test')
sock.connect.assert_called_once_with('')
sock.send.assert_called_once_with('test' + '\n')
sock.recv.assert_called_with(1024)
self.assertTrue(sock.close.called)
self.assertRaisesRegexp(Exception,
'HAProxy \'test\' query failed.',
self.q._query, 'test')
def test_get_pool_status(self):
query_mock = mock.Mock()
self.q._query = query_mock
query_mock.return_value = STATS_SOCKET_SAMPLE
self.assertEqual(
{'tcp-servers': {
'status': 'DOWN',
'status': 'UP',
'uuid': 'tcp-servers',
'members': [
{'id-34833': 'DOWN'},
{'id-34836': 'DOWN'}]},
'members':
{'id-34833': 'UP',
'id-34836': 'UP'}},
'http-servers': {
'status': 'DOWN',
'uuid': 'http-servers',
'members': [
{'id-34821': 'DOWN'},
{'id-34824': 'DOWN'}]}},
'members':
{'id-34821': 'DOWN',
'id-34824': 'DOWN'}}},
self.q.get_pool_status()
)

View File

@ -138,7 +138,7 @@ class AmphoraAPIClientTest(base.TestCase):
def setUp(self):
super(AmphoraAPIClientTest, self).setUp()
self.driver = driver.AmphoraAPIClient()
self.base_url = "https://127.0.0.1:8443/0.5"
self.base_url = "https://127.0.0.1:9443/0.5"
self.amp = models.Amphora(lb_network_ip='127.0.0.1', compute_id='123')
self.port_info = dict(mac_address='123')

View File

@ -0,0 +1,151 @@
# Copyright 2015 Hewlett-Packard Development Company, L.P.
# Copyright (c) 2015 Rackspace
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import binascii
import random
import socket
from oslo_config import cfg
from oslo_config import fixture as oslo_fixture
import six
from octavia.amphorae.drivers.health import heartbeat_udp
from octavia.common import exceptions
from octavia.tests.unit import base
if six.PY2:
import mock
else:
import unittest.mock as mock
FAKE_ID = 1
KEY = 'TEST'
IP = '192.0.2.10'
PORT = random.randrange(1, 9000)
RLIMIT = random.randrange(1, 100)
class TestHeartbeatUDP(base.TestCase):
@mock.patch('socket.getaddrinfo')
@mock.patch('socket.socket')
def setUp(self, mock_socket, mock_getaddrinfo):
super(TestHeartbeatUDP, self).setUp()
self.mock_socket = mock_socket
self.mock_getaddrinfo = mock_getaddrinfo
self.mock_getaddrinfo.return_value = [range(1, 6)]
self.health_update = mock.Mock()
self.stats_update = mock.Mock()
self.conf = oslo_fixture.Config(cfg.CONF)
self.conf.config(group="health_manager", heartbeat_key=KEY)
self.conf.config(group="health_manager", bind_ip=IP)
self.conf.config(group="health_manager", bind_port=PORT)
self.conf.config(group="health_manager", sock_rlimit=0)
self.udp_status_getter = heartbeat_udp.UDPStatusGetter(
self.health_update, self.stats_update)
@mock.patch('socket.getaddrinfo')
@mock.patch('socket.socket')
def test_update(self, mock_socket, mock_getaddrinfo):
socket_mock = mock.MagicMock()
mock_socket.return_value = socket_mock
mock_getaddrinfo.return_value = [range(1, 6)]
bind_mock = mock.MagicMock()
socket_mock.bind = bind_mock
getter = heartbeat_udp.UDPStatusGetter(
None, None)
self.mock_getaddrinfo.assert_called_with(IP, PORT, 0, 2)
self.assertEqual(self.udp_status_getter.sockaddr, 5)
self.mock_socket.assert_called_with(1, socket.SOCK_DGRAM)
bind_mock.assert_called_once_with((IP, PORT))
self.conf.config(group="health_manager", sock_rlimit=RLIMIT)
mock_getaddrinfo.return_value = [range(1, 6), range(1, 6)]
getter.update(KEY, IP, PORT)
@mock.patch('socket.getaddrinfo')
@mock.patch('socket.socket')
def test_dorecv(self, mock_socket, mock_getaddrinfo):
socket_mock = mock.MagicMock()
mock_socket.return_value = socket_mock
mock_getaddrinfo.return_value = [range(1, 6)]
recvfrom = mock.MagicMock()
socket_mock.recvfrom = recvfrom
getter = heartbeat_udp.UDPStatusGetter(
None, None)
# key = 'TEST' msg = {"testkey": "TEST"}
sample_msg = ('78daab562a492d2ec94ead54b252500a710d0e5'
'1aa050041b506245806e5c1971e79951818394e'
'a6e71ad989ff950945f9573f4ab6f83e25db8ed7')
bin_msg = binascii.unhexlify(sample_msg)
recvfrom.return_value = bin_msg, 2
(obj, srcaddr) = getter.dorecv()
self.assertEqual(srcaddr, 2)
self.assertEqual(obj, {"testkey": "TEST"})
def test_check(self):
mock_dorecv = mock.Mock()
self.udp_status_getter.dorecv = mock_dorecv
mock_dorecv.side_effect = [(dict(id=FAKE_ID), 2)]
self.udp_status_getter.check()
self.health_update.update_health.assert_called_once_with({'id': 1})
self.stats_update.update_stats.assert_called_once_with({'id': 1})
@mock.patch('socket.getaddrinfo')
@mock.patch('socket.socket')
def test_check_no_mixins(self, mock_socket, mock_getaddrinfo):
self.mock_socket = mock_socket
self.mock_getaddrinfo = mock_getaddrinfo
self.mock_getaddrinfo.return_value = [range(1, 6)]
mock_dorecv = mock.Mock()
self.udp_status_getter = heartbeat_udp.UDPStatusGetter(
None, None)
self.udp_status_getter.dorecv = mock_dorecv
mock_dorecv.side_effect = [(dict(id=FAKE_ID), 2)]
self.udp_status_getter.check()
@mock.patch('socket.getaddrinfo')
@mock.patch('socket.socket')
def test_socket_except(self, mock_socket, mock_getaddrinfo):
self.assertRaises(exceptions.NetworkConfig,
heartbeat_udp.UDPStatusGetter, None, None)
@mock.patch('concurrent.futures.ThreadPoolExecutor.submit')
@mock.patch('socket.getaddrinfo')
@mock.patch('socket.socket')
def test_check_exception(self, mock_socket, mock_getaddrinfo, mock_submit):
self.mock_socket = mock_socket
self.mock_getaddrinfo = mock_getaddrinfo
self.mock_getaddrinfo.return_value = [range(1, 6)]
mock_dorecv = mock.Mock()
self.udp_status_getter = heartbeat_udp.UDPStatusGetter(
None, None)
self.udp_status_getter.dorecv = mock_dorecv
mock_dorecv.side_effect = exceptions.InvalidHMACException
self.udp_status_getter.check()
self.assertFalse(mock_submit.called)

View File

View File

@ -0,0 +1,92 @@
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import six
from octavia.cmd import health_manager
from octavia.tests.unit import base
if six.PY2:
import mock
else:
import unittest.mock as mock
class TestHealthManagerCMD(base.TestCase):
def setUp(self):
super(TestHealthManagerCMD, self).setUp()
@mock.patch('octavia.controller.healthmanager.'
'update_stats_mixin.UpdateStatsMixin')
@mock.patch('octavia.controller.healthmanager.'
'update_health_mixin.UpdateHealthMixin')
@mock.patch('octavia.amphorae.drivers.health.'
'heartbeat_udp.UDPStatusGetter')
def test_hm_listener(self, mock_getter, mock_health, mock_stats):
getter_mock = mock.MagicMock()
check_mock = mock.MagicMock()
getter_mock.check = check_mock
getter_mock.check.side_effect = [None, Exception('break')]
mock_getter.return_value = getter_mock
self.assertRaisesRegexp(Exception, 'break',
health_manager.hm_listener)
mock_getter.assert_called_once_with(mock_health(), mock_stats())
self.assertEqual(getter_mock.check.call_count, 2)
@mock.patch('octavia.controller.healthmanager.'
'health_manager.HealthManager')
def test_hm_health_check(self, mock_health):
hm_mock = mock.MagicMock()
health_check_mock = mock.MagicMock()
hm_mock.health_check = health_check_mock
hm_mock.health_check.side_effect = [None, Exception('break')]
mock_health.return_value = hm_mock
self.assertRaisesRegexp(Exception, 'break',
health_manager.hm_health_check)
mock_health.assert_called_once_with()
self.assertEqual(hm_mock.health_check.call_count, 2)
@mock.patch('multiprocessing.Process')
@mock.patch('octavia.common.service.prepare_service')
def test_main(self, mock_service, mock_process):
mock_listener_proc = mock.MagicMock()
mock_health_proc = mock.MagicMock()
mock_process.side_effect = [mock_listener_proc, mock_health_proc]
health_manager.main()
mock_listener_proc.start.assert_called_once_with()
mock_health_proc.start.assert_called_once_with()
mock_listener_proc.join.assert_called_once_with()
mock_health_proc.join.assert_called_once_with()
@mock.patch('multiprocessing.Process')
@mock.patch('octavia.common.service.prepare_service')
def test_main_keyboard_interrupt(self, mock_service, mock_process):
mock_listener_proc = mock.MagicMock()
mock_health_proc = mock.MagicMock()
mock_join = mock.MagicMock()
mock_join.side_effect = KeyboardInterrupt
mock_listener_proc.join = mock_join
mock_process.side_effect = [mock_listener_proc, mock_health_proc]
health_manager.main()
mock_listener_proc.start.assert_called_once_with()
mock_health_proc.start.assert_called_once_with()
mock_listener_proc.join.assert_called_once_with()
self.assertFalse(mock_health_proc.join.called)

View File

@ -105,7 +105,7 @@ class TestNovaClient(base.TestCase):
def test_status(self):
status = self.manager.status(self.amphora.id)
self.assertEqual(constants.AMPHORA_UP, status)
self.assertEqual(constants.UP, status)
def test_bad_status(self):
self.manager.manager.get.side_effect = Exception

View File

@ -14,6 +14,7 @@
from oslo_utils import uuidutils
import six
import sqlalchemy
from octavia.common import constants
from octavia.controller.healthmanager import update_health_mixin as healthmixin
@ -32,36 +33,42 @@ class TestUpdateHealthMixin(base.TestCase):
super(TestUpdateHealthMixin, self).setUp()
self.hm = healthmixin.UpdateHealthMixin()
self.amphora_repo = mock.MagicMock()
self.amphora_health_repo = mock.MagicMock()
self.listener_repo = mock.MagicMock()
self.loadbalancer_repo = mock.MagicMock()
self.member_repo = mock.MagicMock()
self.pool_repo = mock.MagicMock()
self.hm.amphora_repo = self.amphora_repo
fake_lb = mock.MagicMock()
self.hm.amphora_repo.get_all_lbs_on_amphora.return_value = [fake_lb]
self.hm.amphora_health_repo = self.amphora_health_repo
self.hm.listener_repo = self.listener_repo
self.hm.listener_repo.count.return_value = 1
self.hm.loadbalancer_repo = self.loadbalancer_repo
self.hm.member_repo = self.member_repo
self.hm.pool_repo = self.pool_repo
@mock.patch('octavia.db.api.get_session')
@mock.patch('sqlalchemy.sql.func.now')
def test_update_health_Online(self, lastupdate, session):
def test_update_health_Online(self, session):
health = {
"amphora-status": constants.AMPHORA_UP,
"amphora-id": self.FAKE_UUID_1,
"id": self.FAKE_UUID_1,
"listeners": {
"listener-id-1": {"listener-status": constants.ONLINE,
"members": {"member-id-1": constants.ONLINE}
"listener-id-1": {"status": constants.OPEN, "pools": {
"pool-id-1": {"status": constants.UP,
"members": {"member-id-1": constants.UP}
}
}
}
}
}
session.return_value = 'blah'
lastupdate.return_value = '2014-02-12'
self.hm.update_health(health)
self.assertTrue(self.amphora_health_repo.update.called)
self.assertTrue(self.amphora_health_repo.replace.called)
# test listener, member
for listener_id, listener in six.iteritems(
@ -70,42 +77,241 @@ class TestUpdateHealthMixin(base.TestCase):
self.listener_repo.update.assert_any_call(
'blah', listener_id, operating_status=constants.ONLINE)
for pool_id, pool in six.iteritems(listener.get('pools', {})):
self.hm.pool_repo.update.assert_any_call(
'blah', pool_id, operating_status=constants.ONLINE)
for member_id, member in six.iteritems(
listener.get('members', {})):
pool.get('members', {})):
self.member_repo.update.assert_any_call(
'blah', id=member_id, operating_status=constants.ONLINE)
'blah', id=member_id,
operating_status=constants.ONLINE)
self.hm.listener_repo.count.return_value = 2
self.hm.update_health(health)
@mock.patch('octavia.db.api.get_session')
@mock.patch('sqlalchemy.sql.func.now')
def test_update_health_Error(self, lastupdate, session):
def test_update_health_member_down(self, session):
health = {
"amphora-status": constants.AMPHORA_DOWN,
"amphora-id": self.FAKE_UUID_1,
"id": self.FAKE_UUID_1,
"listeners": {
"listener-id-1": {"listener-status": constants.ERROR,
"members": {"member-id-1": constants.ERROR}
},
"listener-id-2": {"listener-status": constants.ERROR,
"members": {"member-id-2": constants.ERROR}
"listener-id-1": {"status": constants.OPEN, "pools": {
"pool-id-1": {"status": constants.UP,
"members": {"member-id-1": constants.DOWN}
}
}
}
}
}
session.return_value = 'blah'
lastupdate.return_value = '2014-02-12'
self.hm.update_health(health)
self.assertFalse(self.amphora_health_repo.update.called)
self.assertTrue(self.amphora_health_repo.replace.called)
# test listener, member
for listener_id, listener in six.iteritems(
health.get('listeners', {})):
self.listener_repo.update.assert_any_call(
'blah', listener_id, operating_status=constants.ERROR)
'blah', listener_id, operating_status=constants.ONLINE)
for pool_id, pool in six.iteritems(listener.get('pools', {})):
self.hm.pool_repo.update.assert_any_call(
'blah', pool_id, operating_status=constants.DEGRADED)
for member_id, member in six.iteritems(
listener.get('members', {})):
pool.get('members', {})):
self.member_repo.update.assert_any_call(
'blah', id=member_id,
operating_status=constants.ERROR)
self.hm.listener_repo.count.return_value = 2
self.hm.update_health(health)
@mock.patch('octavia.db.api.get_session')
def test_update_health_list_full_member_down(self, session):
health = {
"id": self.FAKE_UUID_1,
"listeners": {
"listener-id-1": {"status": constants.FULL, "pools": {
"pool-id-1": {"status": constants.UP,
"members": {"member-id-1": constants.DOWN}
}
}
}
}
}
session.return_value = 'blah'
self.hm.update_health(health)
self.assertTrue(self.amphora_health_repo.replace.called)
# test listener, member
for listener_id, listener in six.iteritems(
health.get('listeners', {})):
self.listener_repo.update.assert_any_call(
'blah', listener_id, operating_status=constants.DEGRADED)
for pool_id, pool in six.iteritems(listener.get('pools', {})):
self.hm.pool_repo.update.assert_any_call(
'blah', pool_id, operating_status=constants.DEGRADED)
for member_id, member in six.iteritems(
pool.get('members', {})):
self.member_repo.update.assert_any_call(
'blah', id=member_id,
operating_status=constants.ERROR)
self.hm.listener_repo.count.return_value = 2
self.hm.update_health(health)
@mock.patch('octavia.db.api.get_session')
def test_update_health_Error(self, session):
health = {
"id": self.FAKE_UUID_1,
"listeners": {
"listener-id-1": {"status": constants.OPEN, "pools": {
"pool-id-1": {"status": constants.DOWN,
"members": {"member-id-1": constants.DOWN}
}
}
}
}
}
session.return_value = 'blah'
self.hm.update_health(health)
self.assertTrue(self.amphora_health_repo.replace.called)
# test listener, member
for listener_id, listener in six.iteritems(
health.get('listeners', {})):
self.listener_repo.update.assert_any_call(
'blah', listener_id, operating_status=constants.ONLINE)
for pool_id, pool in six.iteritems(listener.get('pools', {})):
self.hm.pool_repo.update.assert_any_call(
'blah', pool_id, operating_status=constants.ERROR)
for member_id, member in six.iteritems(
pool.get('members', {})):
self.member_repo.update.assert_any_call(
'blah', id=member_id, operating_status=constants.ERROR)
# Test the logic code paths
@mock.patch('octavia.db.api.get_session')
def test_update_health_Full(self, session):
health = {
"id": self.FAKE_UUID_1,
"listeners": {
"listener-id-1": {"status": constants.FULL, "pools": {
"pool-id-1": {"status": constants.DOWN,
"members": {"member-id-1": constants.DOWN}
}
}
},
"listener-id-2": {"status": constants.FULL, "pools": {
"pool-id-2": {"status": constants.UP,
"members": {"member-id-2": constants.UP}
}
}
},
"listener-id-3": {"status": constants.OPEN, "pools": {
"pool-id-3": {"status": constants.UP,
"members": {"member-id-3": constants.UP,
"member-id-31": constants.DOWN}
}
}
},
"listener-id-4": {"status": "bogus", "pools": {
"pool-id-4": {"status": "bogus",
"members": {"member-id-4": "bogus"}
}
}
}
}
}
session.return_value = 'blah'
self.hm.update_health(health)
# test listener
self.listener_repo.update.assert_any_call(
'blah', "listener-id-1", operating_status=constants.DEGRADED)
self.listener_repo.update.assert_any_call(
'blah', "listener-id-2", operating_status=constants.DEGRADED)
self.pool_repo.update.assert_any_call(
'blah', "pool-id-1", operating_status=constants.ERROR)
self.pool_repo.update.assert_any_call(
'blah', "pool-id-2", operating_status=constants.ONLINE)
self.pool_repo.update.assert_any_call(
'blah', "pool-id-3", operating_status=constants.DEGRADED)
# Test code paths where objects are not found in the database
@mock.patch('octavia.db.api.get_session')
def test_update_health_Not_Found(self, session):
health = {
"id": self.FAKE_UUID_1,
"listeners": {
"listener-id-1": {"status": constants.OPEN, "pools": {
"pool-id-1": {"status": constants.UP,
"members": {"member-id-1": constants.UP}
}
}
}
}
}
session.return_value = 'blah'
self.hm.listener_repo.update.side_effect = (
[sqlalchemy.orm.exc.NoResultFound])
self.hm.member_repo.update.side_effect = (
[sqlalchemy.orm.exc.NoResultFound])
self.hm.pool_repo.update.side_effect = (
[sqlalchemy.orm.exc.NoResultFound])
self.hm.loadbalancer_repo.update.side_effect = (
[sqlalchemy.orm.exc.NoResultFound])
self.hm.update_health(health)
self.assertTrue(self.amphora_health_repo.replace.called)
# test listener, member
for listener_id, listener in six.iteritems(
health.get('listeners', {})):
self.listener_repo.update.assert_any_call(
'blah', listener_id, operating_status=constants.ONLINE)
for pool_id, pool in six.iteritems(listener.get('pools', {})):
self.hm.pool_repo.update.assert_any_call(
'blah', pool_id, operating_status=constants.ONLINE)
for member_id, member in six.iteritems(
pool.get('members', {})):
self.member_repo.update.assert_any_call(
'blah', id=member_id,
operating_status=constants.ONLINE)

View File

@ -0,0 +1,72 @@
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import random
from oslo_utils import uuidutils
import six
from octavia.common import constants
from octavia.controller.healthmanager import update_stats_mixin as statsmixin
import octavia.tests.unit.base as base
if six.PY2:
import mock
else:
import unittest.mock as mock
class TestUpdateStatsMixin(base.TestCase):
def setUp(self):
super(TestUpdateStatsMixin, self).setUp()
self.sm = statsmixin.UpdateStatsMixin()
self.listener_stats_repo = mock.MagicMock()
self.sm.listener_stats_repo = self.listener_stats_repo
self.bytes_in = random.randrange(1000000000)
self.bytes_out = random.randrange(1000000000)
self.active_conns = random.randrange(1000000000)
self.total_conns = random.randrange(1000000000)
self.loadbalancer_id = uuidutils.generate_uuid()
self.listener_id = uuidutils.generate_uuid()
@mock.patch('octavia.db.api.get_session')
def test_update_stats(self, session):
health = {
"id": self.loadbalancer_id,
"listeners": {
self.listener_id: {"status": constants.OPEN,
"stats": {"conns": self.active_conns,
"totconns": self.total_conns,
"rx": self.bytes_in,
"tx": self.bytes_out},
"pools": {"pool-id-1":
{"status": constants.UP,
"members":
{"member-id-1": constants.ONLINE}
}
}
}}}
session.return_value = 'blah'
self.sm.update_stats(health)
self.listener_stats_repo.replace.assert_called_once_with(
'blah', self.listener_id, bytes_in=self.bytes_in,
bytes_out=self.bytes_out, active_connections=self.active_conns,
total_connections=self.total_conns)

View File

@ -83,12 +83,16 @@ class TestComputeTasks(base.TestCase):
super(TestComputeTasks, self).setUp()
@mock.patch('jinja2.Environment.get_template')
@mock.patch('octavia.amphorae.backends.agent.'
'agent_jinja_cfg.AgentJinjaTemplater.'
'build_agent_config', return_value='test_conf')
@mock.patch('stevedore.driver.DriverManager.driver')
def test_compute_create(self, mock_driver):
def test_compute_create(self, mock_driver, mock_conf, mock_jinja):
createcompute = compute_tasks.ComputeCreate()
mock_driver.build.side_effect = [COMPUTE_ID, TestException('test')]
mock_driver.build.return_value = COMPUTE_ID
# Test execute()
compute_id = createcompute.execute(_amphora_mock.id, ports=[_port])
@ -101,14 +105,16 @@ class TestComputeTasks(base.TestCase):
sec_groups=AMP_SEC_GROUPS,
network_ids=[AMP_NET],
port_ids=[PORT_ID],
config_drive_files=None)
config_drive_files={'/etc/octavia/'
'amphora-agent.conf': 'test_conf'})
# Make sure it returns the expected compute_id
assert(compute_id == COMPUTE_ID)
# Test that a build exception is raised
createcompute = compute_tasks.ComputeCreate()
self.assertRaises(TestException,
self.assertRaises(TypeError,
createcompute.execute,
_amphora_mock, config_drive_files='test_cert')
@ -127,12 +133,16 @@ class TestComputeTasks(base.TestCase):
createcompute.revert(COMPUTE_ID, _amphora_mock.id)
@mock.patch('jinja2.Environment.get_template')
@mock.patch('octavia.amphorae.backends.agent.'
'agent_jinja_cfg.AgentJinjaTemplater.'
'build_agent_config', return_value='test_conf')
@mock.patch('stevedore.driver.DriverManager.driver')
def test_compute_create_cert(self, mock_driver):
def test_compute_create_cert(self, mock_driver, mock_conf, mock_jinja):
createcompute = compute_tasks.CertComputeCreate()
mock_driver.build.side_effect = [COMPUTE_ID, TestException('test')]
mock_driver.build.return_value = COMPUTE_ID
m = mock.mock_open(read_data='test')
with mock.patch('%s.open' % BUILTINS, m, create=True):
# Test execute()
@ -150,7 +160,8 @@ class TestComputeTasks(base.TestCase):
port_ids=[],
config_drive_files={
'/etc/octavia/certs/server.pem': 'test_cert',
'/etc/octavia/certs/client_ca.pem': m.return_value})
'/etc/octavia/certs/client_ca.pem': m.return_value,
'/etc/octavia/amphora-agent.conf': 'test_conf'})
# Make sure it returns the expected compute_id
assert(compute_id == COMPUTE_ID)
@ -158,9 +169,10 @@ class TestComputeTasks(base.TestCase):
# Test that a build exception is raised
with mock.patch('%s.open' % BUILTINS, m, create=True):
createcompute = compute_tasks.ComputeCreate()
self.assertRaises(TestException,
self.assertRaises(TypeError,
createcompute.execute,
_amphora_mock, config_drive_files='test_cert')
_amphora_mock,
config_drive_files='test_cert')
# Test revert()