diff --git a/bin/create_certificates.sh b/bin/create_certificates.sh index 19fd98e0af..bd153dfc68 100644 --- a/bin/create_certificates.sh +++ b/bin/create_certificates.sh @@ -13,7 +13,7 @@ # #Example for client use: # -#curl -k -v --key client.key --cacert ca_01.pem --cert client.pem https://0.0.0.0:8443/ +#curl -k -v --key client.key --cacert ca_01.pem --cert client.pem https://0.0.0.0:9443/ # # #Notes: diff --git a/devstack/plugin.sh b/devstack/plugin.sh index 6b6aeff439..74f7d5566f 100644 --- a/devstack/plugin.sh +++ b/devstack/plugin.sh @@ -67,6 +67,8 @@ function octavia_configure { iniset $OCTAVIA_CONF controller_worker compute_driver compute_nova_driver iniset $OCTAVIA_CONF controller_worker network_driver allowed_address_pairs_driver + iniset $OCTAVIA_CONF health_manager heartbeat_key ${OCTAVIA_HEALTH_KEY} + iniset $OCTAVIA_CONF DEFAULT api_handler queue_producer iniset $OCTAVIA_CONF oslo_messaging_rabbit rabbit_port 5672 @@ -118,7 +120,7 @@ function build_mgmt_network { neutron security-group-create lb-mgmt-sec-grp neutron security-group-rule-create --protocol icmp lb-mgmt-sec-grp neutron security-group-rule-create --protocol tcp --port-range-min 22 --port-range-max 22 lb-mgmt-sec-grp - neutron security-group-rule-create --protocol tcp --port-range-min 8443 --port-range-max 8443 lb-mgmt-sec-grp + neutron security-group-rule-create --protocol tcp --port-range-min 9443 --port-range-max 9443 lb-mgmt-sec-grp OCTAVIA_MGMT_SEC_GRP_ID=$(nova secgroup-list | awk ' / lb-mgmt-sec-grp / {print $2}') iniset ${OCTAVIA_CONF} controller_worker amp_secgroup_list ${OCTAVIA_MGMT_SEC_GRP_ID} diff --git a/devstack/settings b/devstack/settings index 7e15fa555c..dbfe164b7b 100644 --- a/devstack/settings +++ b/devstack/settings @@ -34,6 +34,8 @@ OCTAVIA_AMP_ACTIVE_RETRIES=${OCTAVIA_AMP_ACTIVE_RETRIES:-"500"} OCTAVIA_AMP_IMAGE_NAME=${OCTAVIA_AMP_IMAGE_NAME:-"amphora-x64-haproxy"} OCTAVIA_AMP_IMAGE_FILE=${OCTAVIA_AMP_IMAGE_FILE:-${OCTAVIA_DIR}/diskimage-create/${OCTAVIA_AMP_IMAGE_NAME}.qcow2} +OCTAVIA_HEALTH_KEY=${OCTAVIA_HEALTH_KEY:-"insecure"} + OCTAVIA_API_BINARY=${OCTAVIA_API_BINARY:-${OCTAVIA_BIN_DIR}/octavia-api} OCTAVIA_CONSUMER_BINARY=${OCTAVIA_CONSUMER_BINARY:-${OCTAVIA_BIN_DIR}/octavia-worker} OCTAVIA_HOUSEKEEPER_BINARY=${OCTAVIA_HOUSEKEEPER_BINARY:-${OCTAVIA_BIN_DIR}/octavia-housekeeping} diff --git a/elements/amphora-agent/install.d/75-run_setup_install b/elements/amphora-agent/install.d/75-run_setup_install index 55f46d3c5c..c02ab7dc57 100755 --- a/elements/amphora-agent/install.d/75-run_setup_install +++ b/elements/amphora-agent/install.d/75-run_setup_install @@ -5,9 +5,8 @@ install-packages libffi-dev libssl-dev cd /opt/amphora-agent/ pip install -r requirements.txt python setup.py install -cp etc/init/octavia-agent.conf /etc/init/ +cp etc/init/amphora-agent.conf /etc/init/ mkdir /etc/octavia -cp etc/octavia.conf /etc/octavia # we assume certs, etc will come in through the config drive mkdir /etc/octavia/certs mkdir /var/lib/octavia diff --git a/etc/init/octavia-agent.conf b/etc/init/amphora-agent.conf similarity index 59% rename from etc/init/octavia-agent.conf rename to etc/init/amphora-agent.conf index 8be2272037..176d37a3da 100644 --- a/etc/init/octavia-agent.conf +++ b/etc/init/amphora-agent.conf @@ -5,4 +5,4 @@ start on startup respawn respawn limit 2 2 -exec amphora-agent --config-file /etc/octavia/octavia.conf +exec amphora-agent --config-file /etc/octavia/amphora-agent.conf diff --git a/etc/octavia.conf b/etc/octavia.conf index be840372c5..68dfa0ee46 100644 --- a/etc/octavia.conf +++ b/etc/octavia.conf @@ -34,9 +34,17 @@ # configuration file. [health_manager] +# bind_ip = 0.0.0.0 +# bind_port = 5555 +# controller_ip_port_list example: 127.0.0.1:5555, 127.0.0.1:5555 +# controller_ip_port_list = # failover_threads = 10 -# interval = 3 -# heartbeat_timeout = 10 +# status_update_threads = 50 +# heartbeat_interval = 10 +# heartbeat_key = +# heartbeat_timeout = 60 +# health_check_interval = 3 +# sock_rlimit = 0 [keystone_authtoken] # auth_uri = https://localhost:5000/v3 @@ -89,8 +97,6 @@ # respawn_interval = 2 # Change for production to a ram drive # haproxy_cert_dir = /tmp -# agent_server_cert = /etc/octavia/certs/server.pem -# agent_server_ca = /etc/octavia/certs/client_ca.pem [controller_worker] # amp_active_retries = 10 diff --git a/octavia/amphorae/backends/agent/agent_jinja_cfg.py b/octavia/amphorae/backends/agent/agent_jinja_cfg.py new file mode 100644 index 0000000000..998d0dcd73 --- /dev/null +++ b/octavia/amphorae/backends/agent/agent_jinja_cfg.py @@ -0,0 +1,57 @@ +# Copyright 2015 Hewlett-Packard Development Company, L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import os + +import jinja2 + +from octavia.common.config import cfg +from octavia.common import constants + +CONF = cfg.CONF +CONF.import_group('amphora_agent', 'octavia.common.config') +CONF.import_group('haproxy_amphora', 'octavia.common.config') +CONF.import_group('health_manager', 'octavia.common.config') + +TEMPLATES_DIR = (os.path.dirname(os.path.realpath(__file__)) + + constants.AGENT_API_TEMPLATES + '/') + + +class AgentJinjaTemplater(object): + + def __init__(self): + template_loader = jinja2.FileSystemLoader(searchpath=os.path.dirname( + TEMPLATES_DIR)) + jinja_env = jinja2.Environment(loader=template_loader) + self.agent_template = jinja_env.get_template( + constants.AGENT_CONF_TEMPLATE) + + def build_agent_config(self, amphora_id): + return self.agent_template.render( + {'agent_server_ca': CONF.amphora_agent.agent_server_ca, + 'agent_server_cert': CONF.amphora_agent.agent_server_cert, + 'agent_server_network_dir': + CONF.amphora_agent.agent_server_network_dir, + 'amphora_id': amphora_id, + 'base_cert_dir': CONF.haproxy_amphora.base_cert_dir, + 'base_path': CONF.haproxy_amphora.base_path, + 'bind_host': CONF.haproxy_amphora.bind_host, + 'bind_port': CONF.haproxy_amphora.bind_port, + 'controller_list': CONF.health_manager.controller_ip_port_list, + 'debug': CONF.debug, + 'haproxy_cmd': CONF.haproxy_amphora.haproxy_cmd, + 'heartbeat_interval': CONF.health_manager.heartbeat_interval, + 'heartbeat_key': CONF.health_manager.heartbeat_key, + 'respawn_count': CONF.haproxy_amphora.respawn_count, + 'respawn_interval': CONF.haproxy_amphora.respawn_interval}) diff --git a/octavia/amphorae/backends/agent/api_server/certificate_update.py b/octavia/amphorae/backends/agent/api_server/certificate_update.py index c34e94f247..0e92a8e35d 100644 --- a/octavia/amphorae/backends/agent/api_server/certificate_update.py +++ b/octavia/amphorae/backends/agent/api_server/certificate_update.py @@ -22,17 +22,17 @@ MODE_OWNER = 0o600 BUFFER = 1024 CONF = cfg.CONF -CONF.import_group('haproxy_amphora', 'octavia.common.config') +CONF.import_group('amphora_agent', 'octavia.common.config') def upload_server_cert(): stream = flask.request.stream - with open(CONF.haproxy_amphora.agent_server_cert, 'w') as crt_file: + with open(CONF.amphora_agent.agent_server_cert, 'w') as crt_file: b = stream.read(BUFFER) while b: crt_file.write(b) b = stream.read(BUFFER) os.fchmod(crt_file.fileno(), MODE_OWNER) # only accessible by owner - return flask.make_response(flask.jsonify({ - 'message': 'OK'}), 202) \ No newline at end of file + return flask.make_response(flask.jsonify({ + 'message': 'OK'}), 202) diff --git a/octavia/amphorae/backends/agent/api_server/server.py b/octavia/amphorae/backends/agent/api_server/server.py index a43383359d..8abcdb2c69 100644 --- a/octavia/amphorae/backends/agent/api_server/server.py +++ b/octavia/amphorae/backends/agent/api_server/server.py @@ -43,7 +43,7 @@ for code in six.iterkeys(exceptions.default_exceptions): # Tested with curl -k -XPUT --data-binary @/tmp/test.txt -# https://127.0.0.1:8443/0.5/listeners/123/haproxy +# https://127.0.0.1:9443/0.5/listeners/123/haproxy @app.route('/' + api_server.VERSION + '/listeners//haproxy', methods=['PUT']) def upload_haproxy_config(listener_id): diff --git a/octavia/amphorae/backends/agent/api_server/util.py b/octavia/amphorae/backends/agent/api_server/util.py index 80ad8a2e9d..04a5c5c067 100644 --- a/octavia/amphorae/backends/agent/api_server/util.py +++ b/octavia/amphorae/backends/agent/api_server/util.py @@ -18,6 +18,7 @@ import os from oslo_config import cfg CONF = cfg.CONF +CONF.import_group('amphora_agent', 'octavia.common.config') CONF.import_group('haproxy_amphora', 'octavia.common.config') UPSTART_DIR = '/etc/init' @@ -31,7 +32,7 @@ def haproxy_dir(listener_id): def pid_path(listener_id): - return os.path.join(haproxy_dir(listener_id), 'haproxy.pid') + return os.path.join(haproxy_dir(listener_id), listener_id + '.pid') def config_path(listener_id): @@ -63,5 +64,5 @@ def is_listener_running(listener_id): def get_network_interface_file(interface): - return os.path.join(CONF.haproxy_amphora.agent_server_network_dir, - interface + '.cfg') \ No newline at end of file + return os.path.join(CONF.amphora_agent.agent_server_network_dir, + interface + '.cfg') diff --git a/octavia/amphorae/backends/agent/templates/__init__.py b/octavia/amphorae/backends/agent/templates/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/octavia/amphorae/backends/agent/templates/amphora_agent_conf.template b/octavia/amphorae/backends/agent/templates/amphora_agent_conf.template new file mode 100644 index 0000000000..c67ebd5730 --- /dev/null +++ b/octavia/amphorae/backends/agent/templates/amphora_agent_conf.template @@ -0,0 +1,36 @@ +{# Copyright 2015 Hewlett-Packard Development Company, L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +#} +[DEFAULT] +debug = {{ debug }} + +[haproxy_amphora] +base_cert_dir = {{ base_cert_dir }} +base_path = {{ base_path }} +bind_host = {{ bind_host }} +bind_port = {{ bind_port }} +haproxy_cmd = {{ haproxy_cmd }} +respawn_count = {{ respawn_count }} +respawn_interval = {{ respawn_interval }} + +[health_manager] +controller_ip_port_list = {{ controller_list|join(', ') }} +heartbeat_interval = {{ heartbeat_interval }} +heartbeat_key = {{ heartbeat_key }} + +[amphora_agent] +agent_server_ca = {{ agent_server_ca }} +agent_server_cert = {{ agent_server_cert }} +agent_server_network_dir = {{ agent_server_network_dir }} +amphora_id = {{ amphora_id }} diff --git a/octavia/amphorae/backends/health_daemon/config.py b/octavia/amphorae/backends/health_daemon/config.py deleted file mode 100644 index e4370da421..0000000000 --- a/octavia/amphorae/backends/health_daemon/config.py +++ /dev/null @@ -1,76 +0,0 @@ -# Copyright 2014 Hewlett-Packard Development Company, L.P. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import collections -import json - -import octavia.amphorae.backends.health_daemon.singleton as singleton - - -@singleton.singleton -class JSONFileConfig(collections.Mapping): - def __init__(self): - self.filename = None - self.conf = {} - self.observers = set() - - """ Set the config filename and perform the first read - - :param filename: a JSON file that contains the config - """ - def set_filename(self, filename): - self.filename = filename - self.read_config() - - def __iter__(self): - return iter(self.conf) - - def __getitem__(self, k): - return self.conf[k] - - def __len__(self): - return len(self.conf) - - """ Add a callable to be notified of config changes - - :param obs: a callable to receive change events - """ - def add_observer(self, obs): - self.observers.add(obs) - - """ Remove a callable to be notified of config changes - - By design if the callable passed doesn't exist then just return - - :param obs: a callable to attempt to remove - """ - def remove_observer(self, obs): - self.observers.discard(obs) - - """ Force a reread of the config file and inform all observers - """ - def check_update(self): - self.read_config() - self.confirm_update() - - def confirm_update(self): - for observer in self.observers: - observer() - - def read_config(self): - if self.filename is None: - return - - self.cfile = open(self.filename, 'r') - self.conf = json.load(self.cfile) diff --git a/octavia/amphorae/backends/health_daemon/health_daemon.py b/octavia/amphorae/backends/health_daemon/health_daemon.py old mode 100755 new mode 100644 index bb03b07f47..068460b642 --- a/octavia/amphorae/backends/health_daemon/health_daemon.py +++ b/octavia/amphorae/backends/health_daemon/health_daemon.py @@ -14,52 +14,93 @@ # License for the specific language governing permissions and limitations # under the License. -import argparse -import sys +import os import time -import config -import health_sender +from oslo_config import cfg +from oslo_log import log as logging +import six + +from octavia.amphorae.backends.agent.api_server import util +from octavia.amphorae.backends.health_daemon import health_sender +from octavia.amphorae.backends.utils import haproxy_query +from octavia.i18n import _LI + +if six.PY2: + import Queue as queue +else: + import queue + +CONF = cfg.CONF +CONF.import_group('amphora_agent', 'octavia.common.config') +CONF.import_group('haproxy_amphora', 'octavia.common.config') +CONF.import_group('health_manager', 'octavia.common.config') +LOG = logging.getLogger(__name__) +SEQ = 0 -def run_sender(): +def list_sock_stat_files(hadir=None): + stat_sock_files = {} + if hadir is None: + hadir = CONF.haproxy_amphora.base_path + listener_ids = util.get_listeners() + for listener_id in listener_ids: + sock_file = listener_id + ".sock" + stat_sock_files[listener_id] = os.path.join(hadir, sock_file) + return stat_sock_files + + +def run_sender(cmd_queue): + LOG.info(_LI('Health Manager Sender starting.')) sender = health_sender.UDPStatusSender() - cfg = config.JSONFileConfig() - - sighup_received = False - seq = 0 while True: - if sighup_received: - print('re-reading config file') - sighup_received = False - cfg.check_update() - - message = {'not the answer': 43, - 'id': cfg['id'], - 'seq': seq} - seq = seq + 1 + message = build_stats_message() sender.dosend(message) - time.sleep(cfg['delay']) + try: + cmd = cmd_queue.get_nowait() + if cmd is 'reload': + LOG.info(_LI('Reloading configuration')) + CONF.reload_config_files() + elif cmd is 'shutdown': + LOG.info(_LI('Health Manager Sender shutting down.')) + break + except queue.Empty: + pass + time.sleep(CONF.health_manager.heartbeat_interval) -def parse_args(): - parser = argparse.ArgumentParser(description='Health Sender Daemon') - parser.add_argument('-c', '--config', type=str, required=False, - help='config file path', - default='/etc/amphora/status_sender.json') - args = parser.parse_args() - return vars(args) +def get_stats(stat_sock_file): + stats_query = haproxy_query.HAProxyQuery(stat_sock_file) + stats = stats_query.show_stat() + pool_status = stats_query.get_pool_status() + return stats, pool_status -if __name__ == '__main__': - args = parse_args() - cfg = config.JSONFileConfig() - try: - cfg.set_filename(args['config']) - except IOError as exception: - print(exception) - sys.exit(1) - - # Now start up the sender loop - run_sender() - sys.exit(0) +def build_stats_message(): + global SEQ + msg = {'id': CONF.amphora_agent.amphora_id, + 'seq': SEQ, "listeners": {}} + SEQ += 1 + stat_sock_files = list_sock_stat_files() + for listener_id, stat_sock_file in six.iteritems(stat_sock_files): + listener_dict = {'pools': {}, 'status': 'DOWN', + 'stats': {'tx': 0, 'rx': 0, + 'conns': 0, 'totconns': 0}} + msg['listeners'][listener_id] = listener_dict + if util.is_listener_running(listener_id): + (stats, pool_status) = get_stats(stat_sock_file) + listener_dict = msg['listeners'][listener_id] + for row in stats: + if row['svname'] == 'FRONTEND': + listener_dict['stats']['tx'] = int(row['bout']) + listener_dict['stats']['rx'] = int(row['bin']) + listener_dict['stats']['conns'] = int(row['scur']) + listener_dict['stats']['totconns'] = int(row['stot']) + listener_dict['status'] = row['status'] + for oid, pool in six.iteritems(pool_status): + if oid != listener_id: + pool_id = oid + pools = listener_dict['pools'] + pools[pool_id] = {"status": pool['status'], + "members": pool['members']} + return msg diff --git a/octavia/amphorae/backends/health_daemon/health_sender.py b/octavia/amphorae/backends/health_daemon/health_sender.py index f75b363d78..934a9bd463 100644 --- a/octavia/amphorae/backends/health_daemon/health_sender.py +++ b/octavia/amphorae/backends/health_daemon/health_sender.py @@ -12,43 +12,63 @@ # License for the specific language governing permissions and limitations # under the License. -# TODO(barclaac) Need to decide how this hooks into rest of system, -# e.g. daemon, subprocess, thread etc. - import socket -import config -import status_message +from oslo_config import cfg +from oslo_log import log as logging + +from octavia.amphorae.backends.health_daemon import status_message +from octavia.i18n import _LE + +CONF = cfg.CONF +CONF.import_group('health_manager', 'octavia.common.config') +LOG = logging.getLogger(__name__) -class UDPStatusSender: +def round_robin_addr(addrinfo_list): + if len(addrinfo_list) <= 0: + return None + addrinfo = addrinfo_list.pop(0) + addrinfo_list.append(addrinfo) + return addrinfo + + +class UDPStatusSender(object): def __init__(self): - self.cfg = config.JSONFileConfig() - self.dests = {} - self.update(self.cfg['destination'], self.cfg['port']) + self.dests = [] + for ipport in CONF.health_manager.controller_ip_port_list: + parts = ipport.split(':') + self.update(parts[0], parts[1]) self.v4sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self.v6sock = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM) - self.key = str(self.cfg['key']) - self.cfg.add_observer(self.config_change) + self.key = str(CONF.health_manager.heartbeat_key) - # TODO(barclaac) Still need to reread the address list if it gets changed - def config_change(self): - pass + def update(self, dest, port): + addrlist = socket.getaddrinfo(dest, port, 0, socket.SOCK_DGRAM) + # addrlist = [(family, socktype, proto, canonname, sockaddr) ...] + # e.g. 4 = sockaddr - what we actually need + for addr in addrlist: + self.dests.append(addr) # Just grab the first match + break - def update(self, dest_list, port): - for dest in dest_list: - addrlist = socket.getaddrinfo(dest, port, 0, socket.SOCK_DGRAM) - # addrlist = [(family, socktype, proto, canonname, sockaddr) ...] - # e.g. 4 = sockaddr - what we actually need - for addr in addrlist: - self.dests[addr[4]] = addr - - def dosend(self, envelope): - envelope_str = status_message.encode(envelope, self.key) - for dest in self.dests.itervalues(): - # addrlist = [(family, socktype, proto, canonname, sockaddr) ...] - # e.g. 0 = sock family, 4 = sockaddr - what we actually need - if dest[0] == socket.AF_INET: - self.v4sock.sendto(envelope_str, dest[4]) - elif dest[0] == socket.AF_INET6: - self.v6sock.sendto(envelope_str, dest[4]) + def dosend(self, obj): + envelope_str = status_message.wrap_envelope(obj, self.key) + addrinfo = round_robin_addr(self.dests) + # dest = (family, socktype, proto, canonname, sockaddr) + # e.g. 0 = sock family, 4 = sockaddr - what we actually need + if addrinfo is None: + LOG.error(_LE('No controller address found. ' + 'Unable to send heartbeat.')) + return + try: + if addrinfo[0] == socket.AF_INET: + self.v4sock.sendto(envelope_str, addrinfo[4]) + elif addrinfo[0] == socket.AF_INET6: + self.v6sock.sendto(envelope_str, addrinfo[4]) + except socket.error: + # Pass here as on amp boot it will get one or more + # error: [Errno 101] Network is unreachable + # while the networks are coming up + # No harm in trying to send as it will still failover + # if the message isn't received + pass diff --git a/octavia/amphorae/backends/health_daemon/sample_config.json b/octavia/amphorae/backends/health_daemon/sample_config.json deleted file mode 100644 index f740e9ae9d..0000000000 --- a/octavia/amphorae/backends/health_daemon/sample_config.json +++ /dev/null @@ -1,7 +0,0 @@ -{ - "key": "asamplekey", - "delay": 2.5, - "destination": [ "::1", "127.1" ], - "port": 12345, - "id": "0dc47eda-872b-11e4-920b-000c294b76ae" -} diff --git a/octavia/amphorae/backends/health_daemon/singleton.py b/octavia/amphorae/backends/health_daemon/singleton.py deleted file mode 100644 index d81c705d0e..0000000000 --- a/octavia/amphorae/backends/health_daemon/singleton.py +++ /dev/null @@ -1,26 +0,0 @@ -# Copyright 2014 Hewlett-Packard Development Company, L.P. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -# TODO(barclaac) Someone needs to move this to be a library function for -# all of Octavia (Oslo even?) - - -def singleton(cls): - instances = {} - - def getinstance(): - if cls not in instances: - instances[cls] = cls() - return instances[cls] - return getinstance diff --git a/octavia/amphorae/backends/health_daemon/status_message.py b/octavia/amphorae/backends/health_daemon/status_message.py index 3c111b5d5f..ae13a4b9ca 100644 --- a/octavia/amphorae/backends/health_daemon/status_message.py +++ b/octavia/amphorae/backends/health_daemon/status_message.py @@ -1,4 +1,4 @@ -# Copyright 2014 Hewlett-Packard Development Company, L.P. +# Copyright 2014 Hewlett-Packard Development Company, L.P. # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain @@ -12,22 +12,61 @@ # License for the specific language governing permissions and limitations # under the License. +import binascii import hashlib import hmac import json +import zlib + +from oslo_log import log as logging + +from octavia.common import exceptions +from octavia.i18n import _LW + +LOG = logging.getLogger(__name__) + +hash_algo = hashlib.sha256 +hash_len = 32 -def encode(msg, key): - result = {} - src = json.dumps(msg) - hmc = hmac.new(key.encode('ascii'), src.encode('ascii'), hashlib.sha1) - result['msg'] = msg - result['hmac'] = hmc.hexdigest() - return json.dumps(result) +def to_hex(byte_array): + return binascii.hexlify(byte_array).decode() -def checkhmac(envelope_str, key): - envelope = json.loads(envelope_str) - src = json.dumps(envelope['msg']) - hmc = hmac.new(key.encode('ascii'), src.encode('ascii'), hashlib.sha1) - return hmc.hexdigest() == envelope['hmac'] +def encode_obj(obj): + json_bytes = json.dumps(obj).encode('utf-8') + binary_array = zlib.compress(json_bytes, 9) + return binary_array + + +def decode_obj(binary_array): + json_str = zlib.decompress(binary_array).decode('utf-8') + obj = json.loads(json_str) + return obj + + +def wrap_envelope(obj, key): + payload = encode_obj(obj) + hmc = get_hmac(payload, key) + envelope = payload + hmc + return envelope + + +def unwrap_envelope(envelope, key): + payload = envelope[:-hash_len] + expected_hmc = envelope[-hash_len:] + calculated_hmc = get_hmac(payload, key) + if expected_hmc != calculated_hmc: + LOG.warn(_LW('calculated hmac: %(s1)s not equal to msg hmac: ' + '%(s2)s dropping packet'), {'s1': to_hex(calculated_hmc), + 's2': to_hex(expected_hmc)}) + fmt = 'calculated hmac: {0} not equal to msg hmac: {1} dropping packet' + raise exceptions.InvalidHMACException(fmt.format( + to_hex(calculated_hmc), to_hex(expected_hmc))) + obj = decode_obj(payload) + return obj + + +def get_hmac(payload, key): + hmc = hmac.new(key.encode("utf-8"), payload, hashlib.sha256) + return hmc.digest() diff --git a/octavia/amphorae/backends/utils/haproxy_query.py b/octavia/amphorae/backends/utils/haproxy_query.py index 857714e57f..e227edc3b6 100644 --- a/octavia/amphorae/backends/utils/haproxy_query.py +++ b/octavia/amphorae/backends/utils/haproxy_query.py @@ -90,8 +90,7 @@ class HAProxyQuery(object): """ results = self._query( - 'show stat {proxy_iid} {object_type}' - + '{server_id}'.format( + 'show stat {proxy_iid} {object_type} {server_id}'.format( proxy_iid=proxy_iid, object_type=object_type, server_id=server_id)) @@ -118,16 +117,16 @@ class HAProxyQuery(object): # pxname: pool, svname: server_name, status: status # All the way up is UP, otherwise call it DOWN - if line['status'] != consts.AMPHORA_UP: - line['status'] = consts.AMPHORA_DOWN + if line['status'] != consts.UP: + line['status'] = consts.DOWN if line['pxname'] not in final_results: - final_results[line['pxname']] = dict(members=[]) + final_results[line['pxname']] = dict(members={}) if line['svname'] == 'BACKEND': final_results[line['pxname']]['uuid'] = line['pxname'] final_results[line['pxname']]['status'] = line['status'] else: - final_results[line['pxname']]['members'].append( - {line['svname']: line['status']}) + final_results[line['pxname']]['members'][line['svname']] = ( + line['status']) return final_results diff --git a/octavia/amphorae/drivers/haproxy/rest_api_driver.py b/octavia/amphorae/drivers/haproxy/rest_api_driver.py index 3c50990570..559059abab 100644 --- a/octavia/amphorae/drivers/haproxy/rest_api_driver.py +++ b/octavia/amphorae/drivers/haproxy/rest_api_driver.py @@ -22,6 +22,7 @@ import requests import six from stevedore import driver as stevedore_driver +from octavia.amphorae.driver_exceptions import exceptions as driver_except from octavia.amphorae.drivers import driver_base as driver_base from octavia.amphorae.drivers.haproxy import exceptions as exc from octavia.amphorae.drivers.haproxy.jinja import jinja_cfg @@ -244,10 +245,10 @@ class AmphoraAPIClient(object): LOG.warn(_LW("Could not talk to instance")) time.sleep(CONF.haproxy_amphora.connection_retry_interval) if a >= CONF.haproxy_amphora.connection_max_retries: - raise exc.TimeOutException() + raise driver_except.TimeOutException() else: return r - raise exc.UnavailableException() + raise driver_except.UnavailableException() def upload_config(self, amp, listener_id, config): r = self.put( diff --git a/octavia/amphorae/drivers/health/__init__.py b/octavia/amphorae/drivers/health/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/octavia/amphorae/drivers/health/heartbeat_udp.py b/octavia/amphorae/drivers/health/heartbeat_udp.py new file mode 100644 index 0000000000..6c268a548b --- /dev/null +++ b/octavia/amphorae/drivers/health/heartbeat_udp.py @@ -0,0 +1,181 @@ +# Copyright 2014 Rackspace +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import socket + +from concurrent import futures +from oslo_config import cfg +from oslo_log import log as logging + +from octavia.amphorae.backends.health_daemon import status_message +from octavia.common import exceptions +from octavia.db import repositories +from octavia.i18n import _LI + + +UDP_MAX_SIZE = 64 * 1024 +LOG = logging.getLogger(__name__) + + +class UDPStatusGetter(object): + """This class defines methods that will gather heatbeats + + The heartbeats are transmitted via UDP and this class will bind to a port + and absorb them + """ + def __init__(self, health_update, stats_update): + self.stats_update = stats_update + self.health_update = health_update + self.key = cfg.CONF.health_manager.heartbeat_key + self.ip = cfg.CONF.health_manager.bind_ip + self.port = cfg.CONF.health_manager.bind_port + self.sockaddr = None + LOG.info(_LI( + 'attempting to listen on {0} port {1}').format( + self.ip, self.port)) + self.sock = None + self.update(self.key, self.ip, self.port) + + self.executor = futures.ThreadPoolExecutor( + max_workers=cfg.CONF.health_manager.status_update_threads) + self.repo = repositories.Repositories().amphorahealth + + def update(self, key, ip, port): + """Update the running config for the udp socket server + + :param key: The hmac key used to verify the UDP packets. String + :param ip: The ip address the UDP server will read from + :param port: The port the UDP server will read from + :return: None + """ + self.key = key + for addrinfo in socket.getaddrinfo(ip, port, 0, socket.SOCK_DGRAM): + ai_family = addrinfo[0] + self.sockaddr = addrinfo[4] + if self.sock is not None: + self.sock.close() + self.sock = socket.socket(ai_family, socket.SOCK_DGRAM) + self.sock.bind((ip, port)) + if cfg.CONF.health_manager.sock_rlimit > 0: + rlimit = cfg.CONF.health_manager.sock_rlimit + LOG.info(_LI("setting sock rlimit to {0}").format(rlimit)) + self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, + rlimit) + break # just used the first addr getaddrinfo finds + if self.sock is None: + raise exceptions.NetworkConfig("unable to find suitable socket") + + def dorecv(self, *args, **kw): + """Waits for a UDP heart beat to be sent. + + :return: Returns the unwrapped payload and addr that sent the \ + heart beat. The format of the obj from the UDP sender is of + the form. Not that listener_1 has not pools and listener_4 + has no nodes. + + {"listeners": { + "listener_uuid_1": { + "pools": {}, + "status": "OPEN", + "stats": { + "conns": 0, + "rx": 0, + "tx": 0 + } + }, + "listener_uuid_2": { + "pools": { + "pool_uuid_1": { + "members": [ + { + "member_uuid_1": "DOWN" + }, + { + "member_uuid_2": "DOWN" + }, + { + "member_uuid_3": "DOWN" + }, + { + "member_uuid_4": "DOWN" + } + ] + } + }, + "status": "OPEN", + "stats": { + "conns": 0, + "rx": 0, + "tx": 0 + } + }, + "listener_uuid_3": { + "pools": { + "pool_uuid_2": { + "members": [ + { + "member_uuid_5": "DOWN" + }, + { + "member_uuid_6": "DOWN" + }, + { + "member_uuid_7": "DOWN" + }, + { + "member_uuid_8": "DOWN" + } + ] + } + }, + "status": "OPEN", + "stats": { + "conns": 0, + "rx": 0, + "tx": 0 + } + }, + "listener_uuid_4": { + "pools": { + "pool_uuid_3": { + "members": [] + } + }, + "status": "OPEN", + "stats": { + "conns": 0, + "rx": 0, + "tx": 0 + } + } + }, + "id": "amphora_uuid", + "seq": 1033 +} + + """ + (data, srcaddr) = self.sock.recvfrom(UDP_MAX_SIZE) + obj = status_message.unwrap_envelope(data, self.key) + return obj, srcaddr + + def check(self): + try: + (obj, srcaddr) = self.dorecv() + if self.health_update: + self.executor.submit(self.health_update.update_health, obj) + if self.stats_update: + self.executor.submit(self.stats_update.update_stats, obj) + except exceptions.InvalidHMACException: + # Pass here as the packet was dropped and logged already + pass diff --git a/octavia/cmd/agent.py b/octavia/cmd/agent.py index 4efd9f9850..96b3151437 100755 --- a/octavia/cmd/agent.py +++ b/octavia/cmd/agent.py @@ -17,6 +17,8 @@ # make sure PYTHONPATH includes the home directory if you didn't install import logging +import multiprocessing as multiproc +import os import ssl import sys @@ -24,11 +26,14 @@ from oslo_config import cfg from werkzeug import serving from octavia.amphorae.backends.agent.api_server import server +from octavia.amphorae.backends.health_daemon import health_daemon from octavia.common import service LOG = logging.getLogger(__name__) CONF = cfg.CONF +CONF.import_group('amphora_agent', 'octavia.common.config') CONF.import_group('haproxy_amphora', 'octavia.common.config') +HM_SENDER_CMD_QUEUE = multiproc.Queue() # Hack: Use werkzeugs context @@ -59,12 +64,21 @@ def main(): # comment out to improve logging service.prepare_service(sys.argv) + # Workaround for an issue with the auto-reload used below in werkzeug + # Without it multiple health senders get started when werkzeug reloads + if not os.environ.get('WERKZEUG_RUN_MAIN'): + health_sender_proc = multiproc.Process(name='HM_sender', + target=health_daemon.run_sender, + args=(HM_SENDER_CMD_QUEUE,)) + health_sender_proc.daemon = True + health_sender_proc.start() + # We will only enforce that the client cert is from the good authority # todo(german): Watch this space for security improvements ctx = OctaviaSSLContext(ssl.PROTOCOL_SSLv23) - ctx.load_cert_chain(CONF.haproxy_amphora.agent_server_cert, - ca=CONF.haproxy_amphora.agent_server_ca) + ctx.load_cert_chain(CONF.amphora_agent.agent_server_cert, + ca=CONF.amphora_agent.agent_server_ca) # This will trigger a reload if any files change and # in particular the certificate file @@ -74,4 +88,4 @@ def main(): use_debugger=CONF.debug, ssl_context=ctx, use_reloader=True, - extra_files=[CONF.haproxy_amphora.agent_server_cert]) + extra_files=[CONF.amphora_agent.agent_server_cert]) diff --git a/octavia/cmd/health_manager.py b/octavia/cmd/health_manager.py index ccda3225f6..295779532e 100755 --- a/octavia/cmd/health_manager.py +++ b/octavia/cmd/health_manager.py @@ -14,13 +14,15 @@ # import multiprocessing import sys -import time from oslo_config import cfg from oslo_log import log as logging +from octavia.amphorae.drivers.health import heartbeat_udp from octavia.common import service from octavia.controller.healthmanager import health_manager +from octavia.controller.healthmanager import update_health_mixin +from octavia.controller.healthmanager import update_stats_mixin from octavia.i18n import _LI CONF = cfg.CONF @@ -28,16 +30,18 @@ LOG = logging.getLogger(__name__) CONF.import_group('health_manager', 'octavia.common.config') -def HM_listener(): +def hm_listener(): + # TODO(german): steved'or load those drivers + udp_getter = heartbeat_udp.UDPStatusGetter( + update_health_mixin.UpdateHealthMixin(), + update_stats_mixin.UpdateStatsMixin()) while True: - time.sleep(5) - # to do by Carlos + udp_getter.check() -def HM_health_check(): +def hm_health_check(): + hm = health_manager.HealthManager() while True: - time.sleep(CONF.health_manager.interval) - hm = health_manager.HealthManager() hm.health_check() @@ -45,21 +49,21 @@ def main(): service.prepare_service(sys.argv) processes = [] - HM_listener_proc = multiprocessing.Process(name='HM_listener', - target=HM_listener) - processes.append(HM_listener_proc) - HM_health_check_proc = multiprocessing.Process(name='HM_health_check', - target=HM_health_check) - processes.append(HM_health_check_proc) + hm_listener_proc = multiprocessing.Process(name='HM_listener', + target=hm_listener) + processes.append(hm_listener_proc) + hm_health_check_proc = multiprocessing.Process(name='HM_health_check', + target=hm_health_check) + processes.append(hm_health_check_proc) LOG.info(_LI("Health Manager listener process starts:")) - HM_listener_proc.start() + hm_listener_proc.start() LOG.info(_LI("Health manager check process starts:")) - HM_health_check_proc.start() + hm_health_check_proc.start() try: for process in processes: process.join() except KeyboardInterrupt: LOG.info(_LI("Health Manager existing due to signal")) - HM_listener_proc.terminate() - HM_health_check_proc.terminate() + hm_listener_proc.terminate() + hm_health_check_proc.terminate() diff --git a/octavia/common/config.py b/octavia/common/config.py index b2b64869c6..5c1a57b948 100644 --- a/octavia/common/config.py +++ b/octavia/common/config.py @@ -80,21 +80,59 @@ core_opts = [ help=_('Name of the controller plugin to use')) ] +# Options only used by the amphora agent +amphora_agent_opts = [ + cfg.StrOpt('agent_server_ca', default='/etc/octavia/certs/client_ca.pem', + help=_("The ca which signed the client certificates")), + cfg.StrOpt('agent_server_cert', default='/etc/octavia/certs/server.pem', + help=_("The server certificate for the agent.py server " + "to use")), + cfg.StrOpt('agent_server_network_dir', + default='/etc/network/interfaces.d/', + help=_("The directory where new network interfaces " + "are located")), + # Do not specify in octavia.conf, loaded at runtime + cfg.StrOpt('amphora_id', help=_("The amphora ID.")), +] + networking_opts = [ cfg.StrOpt('lb_network_name', help=_('Name of amphora internal network')), ] healthmanager_opts = [ + cfg.StrOpt('bind_ip', default='0.0.0.0', + help=_('IP address the controller will listen on for ' + 'heart beats')), + cfg.IntOpt('bind_port', default=5555, + help=_('Port number the controller will listen on' + 'for heart beats')), cfg.IntOpt('failover_threads', default=10, help=_('Number of threads performing amphora failovers.')), - cfg.IntOpt('interval', - default=3, - help=_('Sleep time between health checks in seconds.')), + cfg.IntOpt('status_update_threads', + default=50, + help=_('Number of threads performing amphora failovers.')), + cfg.StrOpt('heartbeat_key', + help=_('key used to validate amphora sending' + 'the message'), secret=True), cfg.IntOpt('heartbeat_timeout', - default=10, + default=60, help=_('Interval, in seconds, to wait before failing over an ' 'amphora.')), + cfg.IntOpt('health_check_interval', + default=3, + help=_('Sleep time between health checks in seconds.')), + cfg.IntOpt('sock_rlimit', default=0, + help=_(' sets the value of the heartbeat recv buffer')), + + # Used by the health manager on the amphora + cfg.ListOpt('controller_ip_port_list', + help=_('List of controller ip and port pairs for the ' + 'heartbeat receivers. Example [\'127.0.0.1:5555\', ' + '\'127.0.0.1:5555\']')), + cfg.IntOpt('heartbeat_interval', + default=10, + help=_('Sleep time between sending hearthbeats.')) ] oslo_messaging_opts = [ @@ -133,7 +171,7 @@ haproxy_amphora_opts = [ # REST server cfg.StrOpt('bind_host', default='0.0.0.0', help=_("The host IP to bind to")), - cfg.IntOpt('bind_port', default=8443, + cfg.IntOpt('bind_port', default=9443, help=_("The port to bind to")), cfg.StrOpt('haproxy_cmd', default='/usr/sbin/haproxy', help=_("The full path to haproxy")), @@ -143,15 +181,6 @@ haproxy_amphora_opts = [ help=_("The respawn interval for haproxy's upstart script")), cfg.StrOpt('haproxy_cert_dir', default='/tmp/', help=_("The directory to store haproxy cert files in")), - cfg.StrOpt('agent_server_cert', default='/etc/octavia/certs/server.pem', - help=_("The server certificate for the agent.py server " - "to use")), - cfg.StrOpt('agent_server_ca', default='/etc/octavia/certs/client_ca.pem', - help=_("The ca which signed the client certificates")), - cfg.StrOpt('agent_server_network_dir', - default='/etc/network/interfaces.d/', - help=_("The directory where new network interfaces " - "are located")), # REST client cfg.StrOpt('client_cert', default='/etc/octavia/certs/client.pem', help=_("The client certificate to talk to the agent")), @@ -236,6 +265,7 @@ house_keeping_opts = [ # Register the configuration options cfg.CONF.register_opts(core_opts) +cfg.CONF.register_opts(amphora_agent_opts, group='amphora_agent') cfg.CONF.register_opts(networking_opts, group='networking') cfg.CONF.register_opts(oslo_messaging_opts, group='oslo_messaging') cfg.CONF.register_opts(haproxy_amphora_opts, group='haproxy_amphora') diff --git a/octavia/common/constants.py b/octavia/common/constants.py index 40f2541ca6..3dba6a82ff 100644 --- a/octavia/common/constants.py +++ b/octavia/common/constants.py @@ -42,12 +42,6 @@ SUPPORTED_PROTOCOLS = (PROTOCOL_TCP, PROTOCOL_HTTPS, PROTOCOL_HTTP, # the provisioning_status table # Amphora has been allocated to a load balancer AMPHORA_ALLOCATED = 'ALLOCATED' -# Amphora healthy with listener(s) deployed -# TODO(johnsom) This doesn't exist -AMPHORA_UP = 'UP' -# Amphora unhealthy with listener(s) deployed -# TODO(johnsom) This doesn't exist -AMPHORA_DOWN = 'DOWN' # Amphora is being built AMPHORA_BOOTING = 'BOOTING' # Amphora is ready to be allocated to a load balancer @@ -65,9 +59,8 @@ SUPPORTED_PROVISIONING_STATUSES = (ACTIVE, AMPHORA_ALLOCATED, PENDING_UPDATE, DELETED, ERROR) MUTABLE_STATUSES = (ACTIVE,) -SUPPORTED_AMPHORA_STATUSES = (AMPHORA_ALLOCATED, AMPHORA_UP, AMPHORA_DOWN, - AMPHORA_BOOTING, AMPHORA_READY, DELETED, - PENDING_DELETE) +SUPPORTED_AMPHORA_STATUSES = (AMPHORA_ALLOCATED, AMPHORA_BOOTING, + AMPHORA_READY, DELETED, PENDING_DELETE) ONLINE = 'ONLINE' OFFLINE = 'OFFLINE' @@ -150,3 +143,18 @@ SUPPORTED_LB_TOPOLOGIES = (TOPOLOGY_ACTIVE_STANDBY, TOPOLOGY_SINGLE) SUPPORTED_AMPHORA_ROLES = (ROLE_BACKUP, ROLE_MASTER, ROLE_STANDALONE) AGENT_API_TEMPLATES = '/templates' +AGENT_CONF_TEMPLATE = 'amphora_agent_conf.template' + +OPEN = 'OPEN' +FULL = 'FULL' + +# OPEN = HAProxy listener status nbconn < maxconn +# FULL = HAProxy listener status not nbconn < maxconn +HAPROXY_LISTENER_STATUSES = (OPEN, FULL) + +UP = 'UP' +DOWN = 'DOWN' + +# UP = HAProxy backend has working or no servers +# DOWN = HAProxy backend has no working servers +HAPROXY_BACKEND_STATUSES = (UP, DOWN) diff --git a/octavia/common/exceptions.py b/octavia/common/exceptions.py index bafe0a11a6..a9d6963805 100644 --- a/octavia/common/exceptions.py +++ b/octavia/common/exceptions.py @@ -80,10 +80,18 @@ class InvalidOption(APIException): code = 400 +class InvalidHMACException(OctaviaException): + message = _("HMAC hashes didn't match") + + class MissingArguments(OctaviaException): message = _("Missing arguments.") +class NetworkConfig(OctaviaException): + message = _("Unable to allocate network resource from config") + + class NeedsPassphrase(OctaviaException): message = _("Passphrase needed to decrypt key but client " "did not provide one.") diff --git a/octavia/compute/drivers/nova_driver.py b/octavia/compute/drivers/nova_driver.py index 25c3b2880b..ec760b231f 100644 --- a/octavia/compute/drivers/nova_driver.py +++ b/octavia/compute/drivers/nova_driver.py @@ -106,11 +106,11 @@ class VirtualMachineManager(compute_base.ComputeBase): try: amphora = self.get_amphora(amphora_id=amphora_id) if amphora and amphora.status == 'ACTIVE': - return constants.AMPHORA_UP + return constants.UP except Exception: LOG.exception(_LE("Error retrieving nova virtual machine status.")) raise exceptions.ComputeStatusException() - return constants.AMPHORA_DOWN + return constants.DOWN def get_amphora(self, amphora_id): '''Retrieve the information in nova of a virtual machine. diff --git a/octavia/controller/healthmanager/health_manager.py b/octavia/controller/healthmanager/health_manager.py index 7cfdbc839c..83d61de1c0 100644 --- a/octavia/controller/healthmanager/health_manager.py +++ b/octavia/controller/healthmanager/health_manager.py @@ -39,7 +39,7 @@ class HealthManager(object): with futures.ThreadPoolExecutor(max_workers=self.threads) as executor: try: while True: - time.sleep(CONF.health_manager.interval) + time.sleep(CONF.health_manager.health_check_interval) session = db_api.get_session() LOG.debug("Starting amphora health check") failover_count = 0 diff --git a/octavia/controller/healthmanager/update_health_mixin.py b/octavia/controller/healthmanager/update_health_mixin.py index 4d8130f069..05d372e6e8 100644 --- a/octavia/controller/healthmanager/update_health_mixin.py +++ b/octavia/controller/healthmanager/update_health_mixin.py @@ -12,14 +12,16 @@ # License for the specific language governing permissions and limitations # under the License. +import datetime + from oslo_log import log as logging import sqlalchemy -from sqlalchemy.sql import func from octavia.amphorae.drivers import driver_base as driver_base from octavia.common import constants from octavia.db import api as db_api from octavia.db import repositories as repo +from octavia.i18n import _LE, _LW import six @@ -31,8 +33,10 @@ class UpdateHealthMixin(driver_base.HealthMixin): def __init__(self): super(UpdateHealthMixin, self).__init__() # first setup repo for amphora, listener,member(nodes),pool repo + self.amphora_repo = repo.AmphoraRepository() self.amphora_health_repo = repo.AmphoraHealthRepository() self.listener_repo = repo.ListenerRepository() + self.loadbalancer_repo = repo.LoadBalancerRepository() self.member_repo = repo.MemberRepository() self.pool_repo = repo.PoolRepository() @@ -43,76 +47,131 @@ class UpdateHealthMixin(driver_base.HealthMixin): :type map: string :returns: null - This function has the following 3 goals: - 1)Update the health_manager table based on amphora status is up/down - 2)Update related DB status to be ERROR/DOWN when amphora is down - 3)Update related DB status to be ACTIVATE/UP when amphora is up - 4)Track the status of the members - The input health data structure is shown as below: health = { - "amphora-status": "AMPHORA_UP", - "amphora-id": FAKE_UUID_1, + "id": self.FAKE_UUID_1, "listeners": { - "listener-id-1": {"listener-status": "ONLINE", - "members": { - "member-id-1": "ONLINE", - "member-id-2": "ONLINE" - } - }, - "listener-id-2": {"listener-status": "ONLINE", - "members": { - "member-id-3": "ERROR", - "member-id-4": "ERROR", - "member-id-5": "ONLINE" + "listener-id-1": {"status": constants.OPEN, "pools": { + "pool-id-1": {"status": constants.UP, + "members": {"member-id-1": constants.ONLINE} } } + } } } + """ session = db_api.get_session() - # if the input amphora is healthy, we update its db info - # before update db, we need to check if the db has been created, - # if not, we need to create first - if health["amphora-status"] == constants.AMPHORA_UP: - amphora_id = health["amphora-id"] - amphora = self.amphora_health_repo.get( - session, amphora_id=amphora_id) - if amphora is None: - self.amphora_health_repo.create(session, - amphora_id=amphora_id, - last_update=func.now()) - else: - self.amphora_health_repo.update(session, amphora_id, - last_update=func.now()) + # We need to see if all of the listeners are reporting in + expected_listener_count = 0 + lbs_on_amp = self.amphora_repo.get_all_lbs_on_amphora(session, + health['id']) + for lb in lbs_on_amp: + listener_count = self.listener_repo.count(session, + load_balancer_id=lb.id) + expected_listener_count += listener_count + + listeners = health['listeners'] + + # Do not update ampohra health if the reporting listener count + # does not match the expected listener count + if len(listeners) == expected_listener_count: + + # if the input amphora is healthy, we update its db info + self.amphora_health_repo.replace(session, health['id'], + last_update=(datetime. + datetime.utcnow())) + else: + LOG.warn(_LW('Amphora %(id)s health message reports %(found)i ' + 'listeners when %(expected)i expected'), + {'id': health['id'], + 'found': len(listeners), + 'expected': expected_listener_count}) + + # We got a heartbeat so lb is healthy until proven otherwise + lb_status = constants.ONLINE # update listener and nodes db information - listeners = health['listeners'] for listener_id, listener in six.iteritems(listeners): - if listener.get("listener-status") == constants.ONLINE: - try: + listener_model = self.listener_repo.get(session, id=listener_id) + lb_id = listener_model.load_balancer_id + + listener_status = None + # OPEN = HAProxy listener status nbconn < maxconn + if listener.get('status') == constants.OPEN: + listener_status = constants.ONLINE + # FULL = HAProxy listener status not nbconn < maxconn + elif listener.get('status') == constants.FULL: + listener_status = constants.DEGRADED + if lb_status == constants.ONLINE: + lb_status = constants.DEGRADED + else: + LOG.warn(_LW('Listener %(list)s reported status of ' + '%(status)s'), {'list': listener_id, + 'status': listener.get('status')}) + + try: + if listener_status is not None: self.listener_repo.update( session, listener_id, - operating_status=constants.ONLINE) - except sqlalchemy.orm.exc.NoResultFound: - LOG.debug("Listener %s is not in DB", listener_id) + operating_status=listener_status) + except sqlalchemy.orm.exc.NoResultFound: + LOG.error(_LE("Listener %s is not in DB"), listener_id) + + pools = listener['pools'] + for pool_id, pool in six.iteritems(pools): + + pool_status = None + # UP = HAProxy backend has working or no servers + if pool.get('status') == constants.UP: + pool_status = constants.ONLINE + # DOWN = HAProxy backend has no working servers + elif pool.get('status') == constants.DOWN: + pool_status = constants.ERROR + lb_status = constants.ERROR + else: + LOG.warn(_LW('Pool %(pool)s reported status of ' + '%(status)s'), {'pool': pool_id, + 'status': pool.get('status')}) + + members = pool['members'] + for member_id, status in six.iteritems(members): + + member_status = None + if status == constants.UP: + member_status = constants.ONLINE + elif status == constants.DOWN: + member_status = constants.ERROR + if pool_status == constants.ONLINE: + pool_status = constants.DEGRADED + if lb_status == constants.ONLINE: + lb_status = constants.DEGRADED + else: + LOG.warn(_LW('Member %(mem)s reported status of ' + '%(status)s'), {'mem': member_id, + 'status': status}) + + try: + if member_status is not None: + self.member_repo.update(session, id=member_id, + operating_status=( + member_status)) + except sqlalchemy.orm.exc.NoResultFound: + LOG.error(_LE("Member %s is not able to update " + "in DB"), member_id) - elif listener.get("listener-status") == constants.ERROR: try: - self.listener_repo.update(session, listener_id, - operating_status=constants.ERROR) + if pool_status is not None: + self.pool_repo.update(session, pool_id, + operating_status=pool_status) except sqlalchemy.orm.exc.NoResultFound: - LOG.debug("Listener %s is not in DB", listener_id) - members = listener['members'] - for member_id, member in six.iteritems(members): - if member in constants.SUPPORTED_OPERATING_STATUSES: - try: - self.member_repo.update( - session, id=member_id, - operating_status=member) - except sqlalchemy.orm.exc.NoResultFound: - LOG.DEBUG("Member %s is not able to update in DB", - member_id) + LOG.error(_LE("Pool %s is not in DB"), pool_id) + + try: + self.loadbalancer_repo.update(session, lb_id, + operating_status=lb_status) + except sqlalchemy.orm.exc.NoResultFound: + LOG.error(_LE("Load balancer %s is not in DB"), lb_id) diff --git a/octavia/controller/healthmanager/update_stats_mixin.py b/octavia/controller/healthmanager/update_stats_mixin.py new file mode 100644 index 0000000000..fcb4061cc0 --- /dev/null +++ b/octavia/controller/healthmanager/update_stats_mixin.py @@ -0,0 +1,69 @@ +# Copyright 2015 Hewlett-Packard Development Company, L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from oslo_log import log as logging + +from octavia.amphorae.drivers import driver_base as driver_base +from octavia.db import api as db_api +from octavia.db import repositories as repo + +import six + +LOG = logging.getLogger(__name__) + + +class UpdateStatsMixin(driver_base.StatsMixin): + + def __init__(self): + super(UpdateStatsMixin, self).__init__() + self.listener_stats_repo = repo.ListenerStatisticsRepository() + + def update_stats(self, health_message): + """This function is to update the db with listener stats + + :param health_message: The health message containing the listener stats + :type map: string + :returns: null + + health = { + "id": self.FAKE_UUID_1, + "listeners": { + "listener-id-1": {"status": constants.OPEN, + 'stats': {'conns': 0, + 'totconns': 0, + 'rx': 0, + 'tx': 0}, + "pools": { + "pool-id-1": {"status": constants.UP, + "members": {"member-id-1": constants.ONLINE} + } + } + } + } + } + + """ + session = db_api.get_session() + + listeners = health_message['listeners'] + for listener_id, listener in six.iteritems(listeners): + + stats = listener.get('stats') + + self.listener_stats_repo.replace(session, listener_id, + bytes_in=stats['rx'], + bytes_out=stats['tx'], + active_connections=stats['conns'], + total_connections=( + stats['totconns'])) diff --git a/octavia/controller/worker/tasks/compute_tasks.py b/octavia/controller/worker/tasks/compute_tasks.py index da6cec3631..0e6c708d19 100644 --- a/octavia/controller/worker/tasks/compute_tasks.py +++ b/octavia/controller/worker/tasks/compute_tasks.py @@ -21,6 +21,7 @@ from stevedore import driver as stevedore_driver from taskflow import task from taskflow.types import failure +from octavia.amphorae.backends.agent import agent_jinja_cfg from octavia.common import constants from octavia.common import exceptions from octavia.i18n import _LE, _LW @@ -46,7 +47,7 @@ class BaseComputeTask(task.Task): class ComputeCreate(BaseComputeTask): """Create the compute instance for a new amphora.""" - def execute(self, amphora_id, ports=None, config_drive_files=None): + def execute(self, amphora_id, ports=None, config_drive_files={}): """Create an amphora :returns: an amphora @@ -56,6 +57,9 @@ class ComputeCreate(BaseComputeTask): % amphora_id) try: + agent_cfg = agent_jinja_cfg.AgentJinjaTemplater() + config_drive_files['/etc/octavia/amphora-agent.conf'] = ( + agent_cfg.build_agent_config(amphora_id)) compute_id = self.compute.build( name="amphora-" + amphora_id, amphora_flavor=CONF.controller_worker.amp_flavor_id, diff --git a/octavia/db/repositories.py b/octavia/db/repositories.py index 6ef6995828..abc778b946 100644 --- a/octavia/db/repositories.py +++ b/octavia/db/repositories.py @@ -35,6 +35,15 @@ CONF.import_group('health_manager', 'octavia.common.config') class BaseRepository(object): model_class = None + def count(self, session, **filters): + """Retrieves a count of entities from the database. + + :param session: A Sql Alchemy database session. + :param filters: Filters to decide which entities should be retrieved. + :returns: int + """ + return session.query(self.model_class).filter_by(**filters).count() + def create(self, session, **model_kwargs): """Base create method for a database entity. @@ -296,6 +305,19 @@ class ListenerRepository(BaseRepository): class ListenerStatisticsRepository(BaseRepository): model_class = models.ListenerStatistics + def replace(self, session, listener_id, **model_kwargs): + """replace or insert listener into database.""" + with session.begin(subtransactions=True): + count = session.query(self.model_class).filter_by( + listener_id=listener_id).count() + if count: + session.query(self.model_class).filter_by( + listener_id=listener_id).update(model_kwargs, + synchronize_session=False) + else: + model_kwargs['listener_id'] = listener_id + self.create(session, **model_kwargs) + def update(self, session, listener_id, **model_kwargs): """Updates a listener's statistics by a listener's id.""" with session.begin(subtransactions=True): @@ -396,6 +418,19 @@ class AmphoraHealthRepository(BaseRepository): session.query(self.model_class).filter_by( amphora_id=amphora_id).update(model_kwargs) + def replace(self, session, amphora_id, **model_kwargs): + """replace or insert amphora into database.""" + with session.begin(subtransactions=True): + count = session.query(self.model_class).filter_by( + amphora_id=amphora_id).count() + if count: + session.query(self.model_class).filter_by( + amphora_id=amphora_id).update(model_kwargs, + synchronize_session=False) + else: + model_kwargs['amphora_id'] = amphora_id + self.create(session, **model_kwargs) + def check_amphora_expired(self, session, amphora_id, exp_age=None): """check if a specific amphora is expired @@ -420,10 +455,9 @@ class AmphoraHealthRepository(BaseRepository): :returns: [octavia.common.data_model] """ - timestamp = CONF.health_manager.heartbeat_timeout - + timeout = CONF.health_manager.heartbeat_timeout expired_time = datetime.datetime.utcnow() - datetime.timedelta( - seconds=timestamp) + seconds=timeout) with session.begin(subtransactions=True): amp = session.query(self.model_class).with_for_update().filter_by( diff --git a/octavia/opts.py b/octavia/opts.py index 44b0b9fddc..0b35c18c2a 100644 --- a/octavia/opts.py +++ b/octavia/opts.py @@ -19,6 +19,7 @@ def list_opts(): return [ ('DEFAULT', itertools.chain(octavia.common.config.core_opts)), + ('amphora_agent', octavia.common.config.amphora_agent_opts), ('networking', octavia.common.config.networking_opts), ('oslo_messaging', octavia.common.config.oslo_messaging_opts), ('keystone_authtoken_v3', diff --git a/octavia/tests/functional/amphorae/backend/agent/api_server/test_server.py b/octavia/tests/functional/amphorae/backend/agent/api_server/test_server.py index 76727fc0cd..9978f01d00 100644 --- a/octavia/tests/functional/amphorae/backend/agent/api_server/test_server.py +++ b/octavia/tests/functional/amphorae/backend/agent/api_server/test_server.py @@ -203,7 +203,7 @@ class ServerTestCase(base.TestCase): json.loads(rv.data.decode('utf-8'))) mock_rmtree.assert_called_with('/var/lib/octavia/123') mock_exists.assert_called_with('/etc/init/haproxy-123.conf') - mock_exists.assert_any_call('/var/lib/octavia/123/haproxy.pid') + mock_exists.assert_any_call('/var/lib/octavia/123/123.pid') # service is stopped + upstart script mock_exists.side_effect = [True, False, True] @@ -339,7 +339,7 @@ class ServerTestCase(base.TestCase): type='test', uuid='123', pools=[dict( - status=consts.AMPHORA_DOWN, + status=consts.DOWN, uuid='tcp-servers', members=[ {u'id-34833': u'DOWN'}, diff --git a/octavia/tests/functional/db/test_repositories.py b/octavia/tests/functional/db/test_repositories.py index 4b2d56f7c1..dc5d7588e1 100644 --- a/octavia/tests/functional/db/test_repositories.py +++ b/octavia/tests/functional/db/test_repositories.py @@ -13,6 +13,7 @@ # under the License. import datetime +import random from oslo_utils import uuidutils @@ -696,6 +697,47 @@ class ListenerStatisticsRepositoryTest(BaseRepositoryTest): self.assertIsNotNone(new_listener) self.assertIsNone(new_listener.stats) + def test_replace(self): + # Test the create path + bytes_in = random.randrange(1000000000) + bytes_out = random.randrange(1000000000) + active_conns = random.randrange(1000000000) + total_conns = random.randrange(1000000000) + self.assertIsNone(self.listener_stats_repo.get( + self.session, listener_id=self.listener.id)) + self.listener_stats_repo.replace(self.session, self.listener.id, + bytes_in=bytes_in, + bytes_out=bytes_out, + active_connections=active_conns, + total_connections=total_conns) + obj = self.listener_stats_repo.get(self.session, + listener_id=self.listener.id) + self.assertIsNotNone(obj) + self.assertEqual(self.listener.id, obj.listener_id) + self.assertEqual(bytes_in, obj.bytes_in) + self.assertEqual(bytes_out, obj.bytes_out) + self.assertEqual(active_conns, obj.active_connections) + self.assertEqual(total_conns, obj.total_connections) + + # Test the update path + bytes_in_2 = random.randrange(1000000000) + bytes_out_2 = random.randrange(1000000000) + active_conns_2 = random.randrange(1000000000) + total_conns_2 = random.randrange(1000000000) + self.listener_stats_repo.replace(self.session, self.listener.id, + bytes_in=bytes_in_2, + bytes_out=bytes_out_2, + active_connections=active_conns_2, + total_connections=total_conns_2) + obj = self.listener_stats_repo.get(self.session, + listener_id=self.listener.id) + self.assertIsNotNone(obj) + self.assertEqual(self.listener.id, obj.listener_id) + self.assertEqual(bytes_in_2, obj.bytes_in) + self.assertEqual(bytes_out_2, obj.bytes_out) + self.assertEqual(active_conns_2, obj.active_connections) + self.assertEqual(total_conns_2, obj.total_connections) + class HealthMonitorRepositoryTest(BaseRepositoryTest): @@ -1081,6 +1123,11 @@ class AmphoraRepositoryTest(BaseRepositoryTest): self.assertIsInstance(new_amphora, models.Amphora) self.assertEqual(amphora, new_amphora) + def test_count(self): + amphora = self.create_amphora(self.FAKE_UUID_1) + amp_count = self.amphora_repo.count(self.session, id=amphora.id) + self.assertEqual(amp_count, 1) + def test_create(self): amphora = self.create_amphora(self.FAKE_UUID_1) self.assertEqual(self.FAKE_UUID_1, amphora.id) @@ -1088,6 +1135,16 @@ class AmphoraRepositoryTest(BaseRepositoryTest): self.assertEqual(constants.ACTIVE, amphora.status) self.assertEqual(constants.ROLE_MASTER, amphora.role) + def test_exists_true(self): + amphora = self.create_amphora(self.FAKE_UUID_1) + exist = self.amphora_repo.exists(self.session, id=amphora.id) + self.assertTrue(exist) + + def test_exists_false(self): + self.create_amphora(self.FAKE_UUID_1) + exist = self.amphora_repo.exists(self.session, id='test') + self.assertFalse(exist) + def test_update(self): status_change = constants.PENDING_UPDATE amphora = self.create_amphora(self.FAKE_UUID_1) @@ -1170,6 +1227,26 @@ class AmphoraHealthRepositoryTest(BaseRepositoryTest): busy=False) return amphora_health + def test_replace(self): + amphora_id = uuidutils.generate_uuid() + now = datetime.datetime.utcnow() + self.assertIsNone(self.amphora_health_repo.get( + self.session, amphora_id=amphora_id)) + self.amphora_health_repo.replace(self.session, amphora_id, + last_update=now) + obj = self.amphora_health_repo.get(self.session, amphora_id=amphora_id) + self.assertIsNotNone(obj) + self.assertEqual(amphora_id, obj.amphora_id) + self.assertEqual(now, obj.last_update) + + now += datetime.timedelta(seconds=69) + self.amphora_health_repo.replace(self.session, amphora_id, + last_update=now) + obj = self.amphora_health_repo.get(self.session, amphora_id=amphora_id) + self.assertIsNotNone(obj) + self.assertEqual(amphora_id, obj.amphora_id) + self.assertEqual(now, obj.last_update) + def test_get(self): amphora_health = self.create_amphora_health(self.amphora.id) new_amphora_health = self.amphora_health_repo.get( @@ -1194,7 +1271,11 @@ class AmphoraHealthRepositoryTest(BaseRepositoryTest): self.session, self.amphora.id, exp_age) self.assertTrue(checkres) - def test_get_stale_amphorae(self): + def test_get_stale_amphora(self): + stale_amphora = self.amphora_health_repo.get_stale_amphora( + self.session) + self.assertIsNone(stale_amphora) + self.create_amphora_health(self.amphora.id) stale_amphora = self.amphora_health_repo.get_stale_amphora( self.session) diff --git a/octavia/tests/unit/amphorae/backends/agent/test_agent_jinja_cfg.py b/octavia/tests/unit/amphorae/backends/agent/test_agent_jinja_cfg.py new file mode 100644 index 0000000000..517ea0cc99 --- /dev/null +++ b/octavia/tests/unit/amphorae/backends/agent/test_agent_jinja_cfg.py @@ -0,0 +1,76 @@ +# Copyright 2015 Hewlett-Packard Development Company, L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from oslo_config import cfg +from oslo_config import fixture as oslo_fixture +from oslo_utils import uuidutils + +from octavia.amphorae.backends.agent import agent_jinja_cfg +import octavia.tests.unit.base as base + +AMP_ID = uuidutils.generate_uuid() + + +class AgentJinjaTestCase(base.TestCase): + def setUp(self): + super(AgentJinjaTestCase, self).setUp() + + conf = oslo_fixture.Config(cfg.CONF) + conf.config(debug=False) + conf.config(group="amphora_agent", + agent_server_ca='/etc/octavia/certs/client_ca.pem') + conf.config(group="amphora_agent", + agent_server_cert='/etc/octavia/certs/server.pem') + conf.config(group="amphora_agent", + agent_server_network_dir='/etc/network/interfaces.d/') + conf.config(group="haproxy_amphora", + base_cert_dir='/var/lib/octavia/certs') + conf.config(group="haproxy_amphora", base_path='/var/lib/octavia') + conf.config(group="haproxy_amphora", bind_host='0.0.0.0') + conf.config(group="haproxy_amphora", bind_port=9443) + conf.config(group="haproxy_amphora", haproxy_cmd='/usr/sbin/haproxy') + conf.config(group="haproxy_amphora", respawn_count=2) + conf.config(group="haproxy_amphora", respawn_interval=2) + conf.config(group="health_manager", + controller_ip_port_list=['192.0.2.10:5555']) + conf.config(group="health_manager", heartbeat_interval=10) + conf.config(group="health_manager", heartbeat_key='TEST') + + self.expected_config = ('\n[DEFAULT]\n' + 'debug = False\n\n' + '[haproxy_amphora]\n' + 'base_cert_dir = /var/lib/octavia/certs\n' + 'base_path = /var/lib/octavia\n' + 'bind_host = 0.0.0.0\n' + 'bind_port = 9443\n' + 'haproxy_cmd = /usr/sbin/haproxy\n' + 'respawn_count = 2\n' + 'respawn_interval = 2\n\n' + '[health_manager]\n' + 'controller_ip_port_list = 192.0.2.10:5555\n' + 'heartbeat_interval = 10\n' + 'heartbeat_key = TEST\n\n' + '[amphora_agent]\n' + 'agent_server_ca = ' + '/etc/octavia/certs/client_ca.pem\n' + 'agent_server_cert = ' + '/etc/octavia/certs/server.pem\n' + 'agent_server_network_dir = ' + '/etc/network/interfaces.d/\n' + 'amphora_id = ' + AMP_ID) + + def test_build_agent_config(self): + ajc = agent_jinja_cfg.AgentJinjaTemplater() + agent_cfg = ajc.build_agent_config(AMP_ID) + self.assertEqual(self.expected_config, agent_cfg) diff --git a/octavia/tests/unit/amphorae/backends/health_daemon/test_config.py b/octavia/tests/unit/amphorae/backends/health_daemon/test_config.py deleted file mode 100644 index 5d9470dfdb..0000000000 --- a/octavia/tests/unit/amphorae/backends/health_daemon/test_config.py +++ /dev/null @@ -1,95 +0,0 @@ -# Copyright 2014 Hewlett-Packard Development-Company, L.P. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import json -import os -import tempfile - -from testtools import matchers - -from octavia.amphorae.backends.health_daemon import config -from octavia.tests.unit import base - - -class TestConfig(base.TestCase): - def setUp(self): - super(TestConfig, self).setUp() - self.setup_config_file() - self.addCleanup(self.remove_config_file) - - def test_noconfig(self): - cfg = config.JSONFileConfig() - self.assertThat(lambda: cfg.set_filename('/doesnotexist'), - matchers.raises(IOError)) - - def test_config(self): - def check_update(): - self.assertFalse(self.update_called) - self.update_called = True - - cfg = config.JSONFileConfig() - cfg.set_filename(self.sampleconfig[1]) - - # Check the singleton decorator - self.assertIs(cfg, config.JSONFileConfig()) - - cfg.add_observer(check_update) - - self.update_called = False - cfg.check_update() - self.assertTrue(self.update_called) - - self.assertIs(cfg['delay'], 10) - - # First test - change the existing file - same file, no change - with open(self.sampleconfig[1], 'w+') as f: - cdata = {'delay': 5} - json.dump(cdata, f) - - self.update_called = False - cfg.check_update() - self.assertTrue(self.update_called) - - self.assertIs(cfg['delay'], 5) - - # Check for removing an observer - Thanks Stephen - cfg.remove_observer(check_update) - self.update_called = False - cfg.check_update() - self.assertFalse(self.update_called) - - # Better add it back for the next test - cfg.add_observer(check_update) - - # Next, replace the file (new inode) - self.remove_config_file() - with open(self.sampleconfig[1], 'w+') as f: - cdata = {'delay': 3} - json.dump(cdata, f) - - self.update_called = False - cfg.check_update() - self.assertTrue(self.update_called) - - self.assertIs(cfg['delay'], 3) - - def setup_config_file(self): - self.sampleconfig = tempfile.mkstemp() - conffile = os.fdopen(self.sampleconfig[0], 'w+') - cdata = {'delay': 10} - json.dump(cdata, conffile) - conffile.close() - - def remove_config_file(self): - os.unlink(self.sampleconfig[1]) diff --git a/octavia/tests/unit/amphorae/backends/health_daemon/test_envelope.py b/octavia/tests/unit/amphorae/backends/health_daemon/test_envelope.py index 5b70e6ee0a..5836198e92 100644 --- a/octavia/tests/unit/amphorae/backends/health_daemon/test_envelope.py +++ b/octavia/tests/unit/amphorae/backends/health_daemon/test_envelope.py @@ -15,6 +15,7 @@ import uuid from octavia.amphorae.backends.health_daemon import status_message +from octavia.common import exceptions from octavia.tests.unit import base @@ -23,11 +24,16 @@ class TestEnvelope(base.TestCase): super(TestEnvelope, self).setUp() def test_message_hmac(self): - self.skipTest("This test is broken and will be fixed in CR# 201882") - statusMsg = {'seq': 42, - 'status': 'OK', - 'id': str(uuid.uuid4())} - sme = status_message.encode(statusMsg, 'samplekey1') - - self.assertTrue(status_message.checkhmac(sme, 'samplekey1')) - self.assertFalse(status_message.checkhmac(sme, 'samplekey2')) + seq = 42 + for i in range(0, 16): + statusMsg = {'seq': seq, + 'status': 'OK', + 'id': str(uuid.uuid4())} + envelope = status_message.wrap_envelope(statusMsg, 'samplekey1') + obj = status_message.unwrap_envelope(envelope, 'samplekey1') + self.assertEqual(obj['status'], 'OK') + self.assertEqual(obj['seq'], seq) + seq += 1 + args = (envelope, 'samplekey?') + self.assertRaises(exceptions.InvalidHMACException, + status_message.unwrap_envelope, *args) diff --git a/octavia/tests/unit/amphorae/backends/health_daemon/test_health_daemon.py b/octavia/tests/unit/amphorae/backends/health_daemon/test_health_daemon.py new file mode 100644 index 0000000000..37e14c40fb --- /dev/null +++ b/octavia/tests/unit/amphorae/backends/health_daemon/test_health_daemon.py @@ -0,0 +1,259 @@ +# Copyright 2015 Hewlett-Packard Development Company, L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +from oslo_config import cfg +from oslo_config import fixture as oslo_fixture +from oslo_utils import uuidutils +import six + +from octavia.amphorae.backends.health_daemon import health_daemon +import octavia.tests.unit.base as base + +if six.PY2: + import Queue as queue + + import mock +else: + import queue + + import unittest.mock as mock + +LISTENER_ID1 = uuidutils.generate_uuid() +LISTENER_ID2 = uuidutils.generate_uuid() +LISTENER_IDS = [LISTENER_ID1, LISTENER_ID2] +BASE_PATH = '/tmp/test' +SAMPLE_POOL_STATUS = {'432fc8b3-d446-48d4-bb64-13beb90e22bc': { + 'status': 'UP', + 'uuid': '432fc8b3-d446-48d4-bb64-13beb90e22bc', + 'members': { + '302e33d9-dee1-4de9-98d5-36329a06fb58': 'DOWN'}}} + +SAMPLE_BOGUS_POOL_STATUS = {LISTENER_ID1: { + 'status': 'UP', + 'uuid': LISTENER_ID1, + 'members': { + '302e33d9-dee1-4de9-98d5-36329a06fb58': + 'DOWN'}}} + +SAMPLE_STATS = ({'': '', 'status': 'OPEN', 'lastchg': '', + 'weight': '', 'slim': '2000', 'pid': '1', 'comp_byp': '0', + 'lastsess': '', 'rate_lim': '0', 'check_duration': '', + 'rate': '0', 'req_rate': '0', 'check_status': '', + 'econ': '', 'comp_out': '0', 'wredis': '', 'dresp': '0', + 'ereq': '0', 'tracked': '', 'comp_in': '0', + 'pxname': '490b6ae7-21aa-43f1-b82a-68ddcd2ca2fb', + 'dreq': '0', 'hrsp_5xx': '0', 'last_chk': '', + 'check_code': '', 'sid': '0', 'bout': '0', 'hrsp_1xx': '0', + 'qlimit': '', 'hrsp_other': '0', 'bin': '0', 'rtime': '', + 'smax': '0', 'req_tot': '0', 'lbtot': '', 'stot': '0', + 'wretr': '', 'req_rate_max': '0', 'ttime': '', 'iid': '2', + 'hrsp_4xx': '0', 'chkfail': '', 'hanafail': '', + 'downtime': '', 'qcur': '', 'eresp': '', 'comp_rsp': '0', + 'cli_abrt': '', 'ctime': '', 'qtime': '', 'srv_abrt': '', + 'throttle': '', 'last_agt': '', 'scur': '0', 'type': '0', + 'bck': '', 'qmax': '', 'rate_max': '0', 'hrsp_2xx': '0', + 'act': '', 'chkdown': '', 'svname': 'FRONTEND', + 'hrsp_3xx': '0'}, + {'': '', 'status': 'no check', 'lastchg': '', 'weight': '1', + 'slim': '', 'pid': '1', 'comp_byp': '', 'lastsess': '-1', + 'rate_lim': '', 'check_duration': '', 'rate': '0', + 'req_rate': '', 'check_status': '', 'econ': '0', + 'comp_out': '', 'wredis': '0', 'dresp': '0', 'ereq': '', + 'tracked': '', 'comp_in': '', + 'pxname': '432fc8b3-d446-48d4-bb64-13beb90e22bc', + 'dreq': '', 'hrsp_5xx': '0', 'last_chk': '', + 'check_code': '', 'sid': '1', 'bout': '0', 'hrsp_1xx': '0', + 'qlimit': '', 'hrsp_other': '0', 'bin': '0', 'rtime': '0', + 'smax': '0', 'req_tot': '', 'lbtot': '0', 'stot': '0', + 'wretr': '0', 'req_rate_max': '', 'ttime': '0', 'iid': '3', + 'hrsp_4xx': '0', 'chkfail': '', 'hanafail': '0', + 'downtime': '', 'qcur': '0', 'eresp': '0', 'comp_rsp': '', + 'cli_abrt': '0', 'ctime': '0', 'qtime': '0', 'srv_abrt': '0', + 'throttle': '', 'last_agt': '', 'scur': '0', 'type': '2', + 'bck': '0', 'qmax': '0', 'rate_max': '0', 'hrsp_2xx': '0', + 'act': '1', 'chkdown': '', + 'svname': '302e33d9-dee1-4de9-98d5-36329a06fb58', + 'hrsp_3xx': '0'}, + {'': '', 'status': 'UP', 'lastchg': '122', 'weight': '1', + 'slim': '200', 'pid': '1', 'comp_byp': '0', 'lastsess': '-1', + 'rate_lim': '', 'check_duration': '', 'rate': '0', + 'req_rate': '', 'check_status': '', 'econ': '0', + 'comp_out': '0', 'wredis': '0', 'dresp': '0', 'ereq': '', + 'tracked': '', 'comp_in': '0', + 'pxname': '432fc8b3-d446-48d4-bb64-13beb90e22bc', 'dreq': '0', + 'hrsp_5xx': '0', 'last_chk': '', 'check_code': '', 'sid': '0', + 'bout': '0', 'hrsp_1xx': '0', 'qlimit': '', 'hrsp_other': '0', + 'bin': '0', 'rtime': '0', 'smax': '0', 'req_tot': '', + 'lbtot': '0', 'stot': '0', 'wretr': '0', 'req_rate_max': '', + 'ttime': '0', 'iid': '3', 'hrsp_4xx': '0', 'chkfail': '', + 'hanafail': '', 'downtime': '0', 'qcur': '0', 'eresp': '0', + 'comp_rsp': '0', 'cli_abrt': '0', 'ctime': '0', 'qtime': '0', + 'srv_abrt': '0', 'throttle': '', 'last_agt': '', 'scur': '0', + 'type': '1', 'bck': '0', 'qmax': '0', 'rate_max': '0', + 'hrsp_2xx': '0', 'act': '1', 'chkdown': '0', + 'svname': 'BACKEND', 'hrsp_3xx': '0'}) + +SAMPLE_STATS_MSG = {'listeners': { + LISTENER_ID1: { + 'pools': {'432fc8b3-d446-48d4-bb64-13beb90e22bc': { + 'members': { + '302e33d9-dee1-4de9-98d5-36329a06fb58': + 'DOWN'}, + 'status': 'UP'}}, 'stats': { + 'totconns': 0, 'conns': 0, 'tx': 0, 'rx': 0}, + 'status': 'OPEN'}, + LISTENER_ID2: { + 'pools': {'432fc8b3-d446-48d4-bb64-13beb90e22bc': { + 'members': { + '302e33d9-dee1-4de9-98d5-36329a06fb58': + 'DOWN'}, + 'status': 'UP'}}, 'stats': { + 'totconns': 0, 'conns': 0, 'tx': 0, 'rx': 0}, + 'status': 'OPEN'}}, 'id': None, 'seq': 0} + + +class TestHealthDaemon(base.TestCase): + + def setUp(self): + super(TestHealthDaemon, self).setUp() + conf = oslo_fixture.Config(cfg.CONF) + conf.config(group="haproxy_amphora", base_path=BASE_PATH) + + @mock.patch('octavia.amphorae.backends.agent.' + 'api_server.util.get_listeners') + def test_list_sock_stat_files(self, mock_get_listener): + mock_get_listener.return_value = LISTENER_IDS + + health_daemon.list_sock_stat_files() + + files = health_daemon.list_sock_stat_files(BASE_PATH) + + expected_files = {LISTENER_ID1: BASE_PATH + '/' + + LISTENER_ID1 + '.sock', + LISTENER_ID2: BASE_PATH + '/' + + LISTENER_ID2 + '.sock'} + self.assertEqual(files, expected_files) + + @mock.patch('oslo_config.cfg.CONF.reload_config_files') + @mock.patch('octavia.amphorae.backends.health_daemon.' + 'health_daemon.build_stats_message') + @mock.patch('octavia.amphorae.backends.health_daemon.' + 'health_sender.UDPStatusSender') + def test_run_sender(self, mock_UDPStatusSender, mock_build_msg, + mock_reload_cfg): + sender_mock = mock.MagicMock() + dosend_mock = mock.MagicMock() + sender_mock.dosend = dosend_mock + mock_UDPStatusSender.return_value = sender_mock + mock_build_msg.side_effect = ['TEST', Exception('break')] + + test_queue = queue.Queue() + self.assertRaisesRegexp(Exception, 'break', + health_daemon.run_sender, test_queue) + + sender_mock.dosend.assert_called_once_with('TEST') + + # Test a reload event + mock_build_msg.reset_mock() + mock_build_msg.side_effect = ['TEST', Exception('break')] + test_queue.put('reload') + self.assertRaisesRegexp(Exception, 'break', + health_daemon.run_sender, test_queue) + mock_reload_cfg.assert_called_once_with() + + # Test the shutdown path + sender_mock.reset_mock() + dosend_mock.reset_mock() + mock_build_msg.reset_mock() + mock_build_msg.side_effect = ['TEST', 'TEST'] + test_queue.put('shutdown') + health_daemon.run_sender(test_queue) + sender_mock.dosend.assert_called_once_with('TEST') + + # Test an unkown command + mock_build_msg.reset_mock() + mock_build_msg.side_effect = ['TEST', Exception('break')] + test_queue.put('bogus') + self.assertRaisesRegexp(Exception, 'break', + health_daemon.run_sender, test_queue) + + @mock.patch('octavia.amphorae.backends.utils.haproxy_query.HAProxyQuery') + def test_get_stats(self, mock_query): + stats_query_mock = mock.MagicMock() + mock_query.return_value = stats_query_mock + + health_daemon.get_stats('TEST') + + stats_query_mock.show_stat.assert_called_once_with() + stats_query_mock.get_pool_status.assert_called_once_with() + + @mock.patch('octavia.amphorae.backends.agent.api_server.' + 'util.is_listener_running') + @mock.patch('octavia.amphorae.backends.health_daemon.' + 'health_daemon.get_stats') + @mock.patch('octavia.amphorae.backends.health_daemon.' + 'health_daemon.list_sock_stat_files') + def test_build_stats_message(self, mock_list_files, + mock_get_stats, mock_is_running): + mock_list_files.return_value = {LISTENER_ID1: 'TEST', + LISTENER_ID2: 'TEST2'} + + mock_is_running.return_value = True + mock_get_stats.return_value = SAMPLE_STATS, SAMPLE_POOL_STATUS + + msg = health_daemon.build_stats_message() + + self.assertEqual(msg, SAMPLE_STATS_MSG) + + mock_get_stats.assert_any_call('TEST') + mock_get_stats.assert_any_call('TEST2') + + @mock.patch('octavia.amphorae.backends.agent.api_server.' + 'util.is_listener_running') + @mock.patch('octavia.amphorae.backends.health_daemon.' + 'health_daemon.get_stats') + @mock.patch('octavia.amphorae.backends.health_daemon.' + 'health_daemon.list_sock_stat_files') + def test_build_stats_message_no_listener(self, mock_list_files, + mock_get_stats, + mock_is_running): + mock_list_files.return_value = {LISTENER_ID1: 'TEST', + LISTENER_ID2: 'TEST2'} + + mock_is_running.side_effect = [True, False] + mock_get_stats.return_value = SAMPLE_STATS, SAMPLE_POOL_STATUS + + health_daemon.build_stats_message() + + self.assertEqual(mock_get_stats.call_count, 1) + + @mock.patch('octavia.amphorae.backends.agent.api_server.' + 'util.is_listener_running') + @mock.patch('octavia.amphorae.backends.health_daemon.' + 'health_daemon.get_stats') + @mock.patch('octavia.amphorae.backends.health_daemon.' + 'health_daemon.list_sock_stat_files') + def test_build_stats_message_mismatch_pool(self, mock_list_files, + mock_get_stats, + mock_is_running): + mock_list_files.return_value = {LISTENER_ID1: 'TEST', + LISTENER_ID2: 'TEST2'} + + mock_is_running.return_value = True + mock_get_stats.return_value = SAMPLE_STATS, SAMPLE_BOGUS_POOL_STATUS + + msg = health_daemon.build_stats_message() + + self.assertEqual(msg['listeners'][LISTENER_ID1]['pools'], {}) diff --git a/octavia/tests/unit/amphorae/backends/health_daemon/test_health_sender.py b/octavia/tests/unit/amphorae/backends/health_daemon/test_health_sender.py new file mode 100644 index 0000000000..d81d25f964 --- /dev/null +++ b/octavia/tests/unit/amphorae/backends/health_daemon/test_health_sender.py @@ -0,0 +1,124 @@ +# Copyright 2015 Hewlett-Packard Development Company, L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import binascii +import random +import socket + +from oslo_config import cfg +from oslo_config import fixture as oslo_fixture +import six + +from octavia.amphorae.backends.health_daemon import health_sender +from octavia.tests.unit import base + +if six.PY2: + import mock +else: + import unittest.mock as mock + +IP = '192.0.2.15' +IP_PORT = '192.0.2.10:5555', '192.0.2.10:5555' +KEY = 'TEST' +PORT = random.randrange(1, 9000) +SAMPLE_MSG = {'testkey': 'TEST'} +SAMPLE_MSG_BIN = binascii.unhexlify('78daab562a492d2ec94ead54b252500a710d0e5' + '1aa050041b506245806e5c1971e79951818394e' + 'a6e71ad989ff950945f9573f4ab6f83e25db8ed7') + + +class TestHealthSender(base.TestCase): + + def setUp(self): + super(TestHealthSender, self).setUp() + self.conf = oslo_fixture.Config(cfg.CONF) + self.conf.config(group="health_manager", + controller_ip_port_list=IP_PORT) + self.conf.config(group="health_manager", + heartbeat_key=KEY) + + @mock.patch('socket.getaddrinfo') + @mock.patch('socket.socket') + def test_sender(self, mock_socket, mock_getaddrinfo): + socket_mock = mock.MagicMock() + mock_socket.return_value = socket_mock + sendto_mock = mock.MagicMock() + socket_mock.sendto = sendto_mock + + # Test when no addresses are returned + mock_getaddrinfo.return_value = [] + sender = health_sender.UDPStatusSender() + sender.dosend(SAMPLE_MSG) + + # Test IPv4 path + mock_getaddrinfo.return_value = [(socket.AF_INET, + socket.SOCK_DGRAM, + socket.IPPROTO_UDP, + '', + ('192.0.2.20', 80))] + sendto_mock.reset_mock() + + sender = health_sender.UDPStatusSender() + sender.dosend(SAMPLE_MSG) + + sendto_mock.assert_called_once_with(SAMPLE_MSG_BIN, + ('192.0.2.20', 80)) + + sendto_mock.reset_mock() + + # Test IPv6 path + mock_getaddrinfo.return_value = [(socket.AF_INET6, + socket.SOCK_DGRAM, + socket.IPPROTO_UDP, + '', + ('2001:0DB8::F00D', 80))] + + sender = health_sender.UDPStatusSender() + + sender.dosend(SAMPLE_MSG) + + sendto_mock.assert_called_once_with(SAMPLE_MSG_BIN, + ('2001:0DB8::F00D', 80)) + + sendto_mock.reset_mock() + + # Test invalid address family + + mock_getaddrinfo.return_value = [(socket.AF_UNIX, + socket.SOCK_DGRAM, + socket.IPPROTO_UDP, + '', + ('2001:0DB8::F00D', 80))] + + sender = health_sender.UDPStatusSender() + + sender.dosend(SAMPLE_MSG) + + self.assertFalse(sendto_mock.called) + + sendto_mock.reset_mock() + + # Test socket error + socket_mock.sendto.side_effect = socket.error + + mock_getaddrinfo.return_value = [(socket.AF_INET6, + socket.SOCK_DGRAM, + socket.IPPROTO_UDP, + '', + ('2001:0DB8::F00D', 80))] + + sender = health_sender.UDPStatusSender() + + # Should not raise an exception + sender.dosend(SAMPLE_MSG) diff --git a/octavia/tests/unit/amphorae/backends/health_daemon/test_sender.py b/octavia/tests/unit/amphorae/backends/health_daemon/test_sender.py deleted file mode 100644 index c13bb763fe..0000000000 --- a/octavia/tests/unit/amphorae/backends/health_daemon/test_sender.py +++ /dev/null @@ -1,42 +0,0 @@ -# Copyright 2014 Hewlett-Packard Development Company, L.P. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import json -import os -import tempfile - -from octavia.tests.unit import base - - -class TestSender(base.TestCase): - def setUp(self): - super(TestSender, self).setUp() - self.setupConfigFile() - self.addCleanup(self.removeConfigFile) - - def setupConfigFile(self): - self.sampleconfig = tempfile.mkstemp() - conffile = os.fdopen(self.sampleconfig[0], 'w+') - cdata = {'delay': 10, - 'target': ['127.0.0.1', '::1'], - 'psk': 'fubar', - 'dport': 12345} - json.dump(cdata, conffile) - conffile.close() - - def removeConfigFile(self): - os.unlink(self.sampleconfig[1]) - - def test_message_output(self): - pass diff --git a/octavia/tests/unit/amphorae/backends/utils/test_haproxy_query.py b/octavia/tests/unit/amphorae/backends/utils/test_haproxy_query.py index 0ba9cca362..bfc0235812 100644 --- a/octavia/tests/unit/amphorae/backends/utils/test_haproxy_query.py +++ b/octavia/tests/unit/amphorae/backends/utils/test_haproxy_query.py @@ -12,6 +12,8 @@ # License for the specific language governing permissions and limitations # under the License. +import socket + import mock from octavia.amphorae.backends.utils import haproxy_query as query @@ -31,11 +33,11 @@ STATS_SOCKET_SAMPLE = ( ",1,3,2,,0,,2,0,,0,L4TOUT,,30001,0,0,0,0,0,0,0,,,,0,0,,,,,-1,,,0,0,0,0,\n" "http-servers,BACKEND,0,0,0,0,200,0,0,0,0,0,,0,0,0,0,DOWN,0,0,0,,1,567,567" ",,1,3,0,,0,,1,0,,0,,,,0,0,0,0,0,0,,,,,0,0,0,0,0,0,-1,,,0,0,0,0,\n" - "tcp-servers,id-34833,0,0,0,0,,0,0,0,,0,,0,0,0,0,DOWN,1,1,0,1,1,560,560,," + "tcp-servers,id-34833,0,0,0,0,,0,0,0,,0,,0,0,0,0,UP,1,1,0,1,1,560,560,," "1,5,1,,0,,2,0,,0,L4TOUT,,30000,,,,,,,0,,,,0,0,,,,,-1,,,0,0,0,0,\n" - "tcp-servers,id-34836,0,0,0,0,,0,0,0,,0,,0,0,0,0,DOWN,1,1,0,1,1,552,552,," + "tcp-servers,id-34836,0,0,0,0,,0,0,0,,0,,0,0,0,0,UP,1,1,0,1,1,552,552,," "1,5,2,,0,,2,0,,0,L4TOUT,,30001,,,,,,,0,,,,0,0,,,,,-1,,,0,0,0,0,\n" - "tcp-servers,BACKEND,0,0,0,0,200,0,0,0,0,0,,0,0,0,0,DOWN,0,0,0,,1,552,552" + "tcp-servers,BACKEND,0,0,0,0,200,0,0,0,0,0,,0,0,0,0,UP,0,0,0,,1,552,552" ",,1,5,0,,0,,1,0,,0,,,,,,,,,,,,,,0,0,0,0,0,0,-1,,,0,0,0,0," ) @@ -62,23 +64,42 @@ class QueryTestCase(base.TestCase): self.q = query.HAProxyQuery('') super(QueryTestCase, self).setUp() + @mock.patch('socket.socket') + def test_query(self, mock_socket): + + sock = mock.MagicMock() + sock.connect.side_effect = [None, socket.error] + sock.recv.side_effect = ['testdata', None] + mock_socket.return_value = sock + + self.q._query('test') + + sock.connect.assert_called_once_with('') + sock.send.assert_called_once_with('test' + '\n') + sock.recv.assert_called_with(1024) + self.assertTrue(sock.close.called) + + self.assertRaisesRegexp(Exception, + 'HAProxy \'test\' query failed.', + self.q._query, 'test') + def test_get_pool_status(self): query_mock = mock.Mock() self.q._query = query_mock query_mock.return_value = STATS_SOCKET_SAMPLE self.assertEqual( {'tcp-servers': { - 'status': 'DOWN', + 'status': 'UP', 'uuid': 'tcp-servers', - 'members': [ - {'id-34833': 'DOWN'}, - {'id-34836': 'DOWN'}]}, + 'members': + {'id-34833': 'UP', + 'id-34836': 'UP'}}, 'http-servers': { 'status': 'DOWN', 'uuid': 'http-servers', - 'members': [ - {'id-34821': 'DOWN'}, - {'id-34824': 'DOWN'}]}}, + 'members': + {'id-34821': 'DOWN', + 'id-34824': 'DOWN'}}}, self.q.get_pool_status() ) diff --git a/octavia/tests/unit/amphorae/drivers/haproxy/test_rest_api_driver.py b/octavia/tests/unit/amphorae/drivers/haproxy/test_rest_api_driver.py index 368288792f..274926bf53 100644 --- a/octavia/tests/unit/amphorae/drivers/haproxy/test_rest_api_driver.py +++ b/octavia/tests/unit/amphorae/drivers/haproxy/test_rest_api_driver.py @@ -138,7 +138,7 @@ class AmphoraAPIClientTest(base.TestCase): def setUp(self): super(AmphoraAPIClientTest, self).setUp() self.driver = driver.AmphoraAPIClient() - self.base_url = "https://127.0.0.1:8443/0.5" + self.base_url = "https://127.0.0.1:9443/0.5" self.amp = models.Amphora(lb_network_ip='127.0.0.1', compute_id='123') self.port_info = dict(mac_address='123') diff --git a/octavia/tests/unit/amphorae/drivers/health/__init__.py b/octavia/tests/unit/amphorae/drivers/health/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/octavia/tests/unit/amphorae/drivers/health/test_heartbeat_udp.py b/octavia/tests/unit/amphorae/drivers/health/test_heartbeat_udp.py new file mode 100644 index 0000000000..d43a1690b2 --- /dev/null +++ b/octavia/tests/unit/amphorae/drivers/health/test_heartbeat_udp.py @@ -0,0 +1,151 @@ +# Copyright 2015 Hewlett-Packard Development Company, L.P. +# Copyright (c) 2015 Rackspace +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import binascii +import random +import socket + +from oslo_config import cfg +from oslo_config import fixture as oslo_fixture +import six + +from octavia.amphorae.drivers.health import heartbeat_udp +from octavia.common import exceptions +from octavia.tests.unit import base + +if six.PY2: + import mock +else: + import unittest.mock as mock + +FAKE_ID = 1 +KEY = 'TEST' +IP = '192.0.2.10' +PORT = random.randrange(1, 9000) +RLIMIT = random.randrange(1, 100) + + +class TestHeartbeatUDP(base.TestCase): + + @mock.patch('socket.getaddrinfo') + @mock.patch('socket.socket') + def setUp(self, mock_socket, mock_getaddrinfo): + super(TestHeartbeatUDP, self).setUp() + self.mock_socket = mock_socket + self.mock_getaddrinfo = mock_getaddrinfo + self.mock_getaddrinfo.return_value = [range(1, 6)] + self.health_update = mock.Mock() + self.stats_update = mock.Mock() + + self.conf = oslo_fixture.Config(cfg.CONF) + self.conf.config(group="health_manager", heartbeat_key=KEY) + self.conf.config(group="health_manager", bind_ip=IP) + self.conf.config(group="health_manager", bind_port=PORT) + self.conf.config(group="health_manager", sock_rlimit=0) + + self.udp_status_getter = heartbeat_udp.UDPStatusGetter( + self.health_update, self.stats_update) + + @mock.patch('socket.getaddrinfo') + @mock.patch('socket.socket') + def test_update(self, mock_socket, mock_getaddrinfo): + socket_mock = mock.MagicMock() + mock_socket.return_value = socket_mock + mock_getaddrinfo.return_value = [range(1, 6)] + bind_mock = mock.MagicMock() + socket_mock.bind = bind_mock + + getter = heartbeat_udp.UDPStatusGetter( + None, None) + + self.mock_getaddrinfo.assert_called_with(IP, PORT, 0, 2) + self.assertEqual(self.udp_status_getter.sockaddr, 5) + self.mock_socket.assert_called_with(1, socket.SOCK_DGRAM) + bind_mock.assert_called_once_with((IP, PORT)) + + self.conf.config(group="health_manager", sock_rlimit=RLIMIT) + mock_getaddrinfo.return_value = [range(1, 6), range(1, 6)] + getter.update(KEY, IP, PORT) + + @mock.patch('socket.getaddrinfo') + @mock.patch('socket.socket') + def test_dorecv(self, mock_socket, mock_getaddrinfo): + socket_mock = mock.MagicMock() + mock_socket.return_value = socket_mock + mock_getaddrinfo.return_value = [range(1, 6)] + recvfrom = mock.MagicMock() + socket_mock.recvfrom = recvfrom + + getter = heartbeat_udp.UDPStatusGetter( + None, None) + + # key = 'TEST' msg = {"testkey": "TEST"} + sample_msg = ('78daab562a492d2ec94ead54b252500a710d0e5' + '1aa050041b506245806e5c1971e79951818394e' + 'a6e71ad989ff950945f9573f4ab6f83e25db8ed7') + bin_msg = binascii.unhexlify(sample_msg) + recvfrom.return_value = bin_msg, 2 + (obj, srcaddr) = getter.dorecv() + self.assertEqual(srcaddr, 2) + self.assertEqual(obj, {"testkey": "TEST"}) + + def test_check(self): + mock_dorecv = mock.Mock() + self.udp_status_getter.dorecv = mock_dorecv + mock_dorecv.side_effect = [(dict(id=FAKE_ID), 2)] + + self.udp_status_getter.check() + self.health_update.update_health.assert_called_once_with({'id': 1}) + self.stats_update.update_stats.assert_called_once_with({'id': 1}) + + @mock.patch('socket.getaddrinfo') + @mock.patch('socket.socket') + def test_check_no_mixins(self, mock_socket, mock_getaddrinfo): + self.mock_socket = mock_socket + self.mock_getaddrinfo = mock_getaddrinfo + self.mock_getaddrinfo.return_value = [range(1, 6)] + + mock_dorecv = mock.Mock() + self.udp_status_getter = heartbeat_udp.UDPStatusGetter( + None, None) + + self.udp_status_getter.dorecv = mock_dorecv + mock_dorecv.side_effect = [(dict(id=FAKE_ID), 2)] + + self.udp_status_getter.check() + + @mock.patch('socket.getaddrinfo') + @mock.patch('socket.socket') + def test_socket_except(self, mock_socket, mock_getaddrinfo): + self.assertRaises(exceptions.NetworkConfig, + heartbeat_udp.UDPStatusGetter, None, None) + + @mock.patch('concurrent.futures.ThreadPoolExecutor.submit') + @mock.patch('socket.getaddrinfo') + @mock.patch('socket.socket') + def test_check_exception(self, mock_socket, mock_getaddrinfo, mock_submit): + self.mock_socket = mock_socket + self.mock_getaddrinfo = mock_getaddrinfo + self.mock_getaddrinfo.return_value = [range(1, 6)] + + mock_dorecv = mock.Mock() + self.udp_status_getter = heartbeat_udp.UDPStatusGetter( + None, None) + + self.udp_status_getter.dorecv = mock_dorecv + mock_dorecv.side_effect = exceptions.InvalidHMACException + + self.udp_status_getter.check() + self.assertFalse(mock_submit.called) diff --git a/octavia/tests/unit/cmd/__init__.py b/octavia/tests/unit/cmd/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/octavia/tests/unit/cmd/test_health_manager.py b/octavia/tests/unit/cmd/test_health_manager.py new file mode 100644 index 0000000000..e165ee6b64 --- /dev/null +++ b/octavia/tests/unit/cmd/test_health_manager.py @@ -0,0 +1,92 @@ +# Copyright 2015 Hewlett-Packard Development Company, L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import six + +from octavia.cmd import health_manager +from octavia.tests.unit import base + +if six.PY2: + import mock +else: + import unittest.mock as mock + + +class TestHealthManagerCMD(base.TestCase): + + def setUp(self): + super(TestHealthManagerCMD, self).setUp() + + @mock.patch('octavia.controller.healthmanager.' + 'update_stats_mixin.UpdateStatsMixin') + @mock.patch('octavia.controller.healthmanager.' + 'update_health_mixin.UpdateHealthMixin') + @mock.patch('octavia.amphorae.drivers.health.' + 'heartbeat_udp.UDPStatusGetter') + def test_hm_listener(self, mock_getter, mock_health, mock_stats): + getter_mock = mock.MagicMock() + check_mock = mock.MagicMock() + getter_mock.check = check_mock + getter_mock.check.side_effect = [None, Exception('break')] + mock_getter.return_value = getter_mock + self.assertRaisesRegexp(Exception, 'break', + health_manager.hm_listener) + mock_getter.assert_called_once_with(mock_health(), mock_stats()) + self.assertEqual(getter_mock.check.call_count, 2) + + @mock.patch('octavia.controller.healthmanager.' + 'health_manager.HealthManager') + def test_hm_health_check(self, mock_health): + hm_mock = mock.MagicMock() + health_check_mock = mock.MagicMock() + hm_mock.health_check = health_check_mock + hm_mock.health_check.side_effect = [None, Exception('break')] + mock_health.return_value = hm_mock + self.assertRaisesRegexp(Exception, 'break', + health_manager.hm_health_check) + mock_health.assert_called_once_with() + self.assertEqual(hm_mock.health_check.call_count, 2) + + @mock.patch('multiprocessing.Process') + @mock.patch('octavia.common.service.prepare_service') + def test_main(self, mock_service, mock_process): + mock_listener_proc = mock.MagicMock() + mock_health_proc = mock.MagicMock() + + mock_process.side_effect = [mock_listener_proc, mock_health_proc] + + health_manager.main() + + mock_listener_proc.start.assert_called_once_with() + mock_health_proc.start.assert_called_once_with() + mock_listener_proc.join.assert_called_once_with() + mock_health_proc.join.assert_called_once_with() + + @mock.patch('multiprocessing.Process') + @mock.patch('octavia.common.service.prepare_service') + def test_main_keyboard_interrupt(self, mock_service, mock_process): + mock_listener_proc = mock.MagicMock() + mock_health_proc = mock.MagicMock() + mock_join = mock.MagicMock() + mock_join.side_effect = KeyboardInterrupt + mock_listener_proc.join = mock_join + + mock_process.side_effect = [mock_listener_proc, mock_health_proc] + + health_manager.main() + + mock_listener_proc.start.assert_called_once_with() + mock_health_proc.start.assert_called_once_with() + mock_listener_proc.join.assert_called_once_with() + self.assertFalse(mock_health_proc.join.called) diff --git a/octavia/tests/unit/compute/drivers/test_nova_driver.py b/octavia/tests/unit/compute/drivers/test_nova_driver.py index 251a0e0f50..17d25e1d9a 100644 --- a/octavia/tests/unit/compute/drivers/test_nova_driver.py +++ b/octavia/tests/unit/compute/drivers/test_nova_driver.py @@ -105,7 +105,7 @@ class TestNovaClient(base.TestCase): def test_status(self): status = self.manager.status(self.amphora.id) - self.assertEqual(constants.AMPHORA_UP, status) + self.assertEqual(constants.UP, status) def test_bad_status(self): self.manager.manager.get.side_effect = Exception diff --git a/octavia/tests/unit/controller/healthmanager/test_health_mixin.py b/octavia/tests/unit/controller/healthmanager/test_health_mixin.py index dd7e669608..cb735ea40c 100644 --- a/octavia/tests/unit/controller/healthmanager/test_health_mixin.py +++ b/octavia/tests/unit/controller/healthmanager/test_health_mixin.py @@ -14,6 +14,7 @@ from oslo_utils import uuidutils import six +import sqlalchemy from octavia.common import constants from octavia.controller.healthmanager import update_health_mixin as healthmixin @@ -32,36 +33,42 @@ class TestUpdateHealthMixin(base.TestCase): super(TestUpdateHealthMixin, self).setUp() self.hm = healthmixin.UpdateHealthMixin() + self.amphora_repo = mock.MagicMock() self.amphora_health_repo = mock.MagicMock() self.listener_repo = mock.MagicMock() + self.loadbalancer_repo = mock.MagicMock() self.member_repo = mock.MagicMock() self.pool_repo = mock.MagicMock() + self.hm.amphora_repo = self.amphora_repo + fake_lb = mock.MagicMock() + self.hm.amphora_repo.get_all_lbs_on_amphora.return_value = [fake_lb] self.hm.amphora_health_repo = self.amphora_health_repo self.hm.listener_repo = self.listener_repo + self.hm.listener_repo.count.return_value = 1 + self.hm.loadbalancer_repo = self.loadbalancer_repo self.hm.member_repo = self.member_repo self.hm.pool_repo = self.pool_repo @mock.patch('octavia.db.api.get_session') - @mock.patch('sqlalchemy.sql.func.now') - def test_update_health_Online(self, lastupdate, session): + def test_update_health_Online(self, session): health = { - "amphora-status": constants.AMPHORA_UP, - "amphora-id": self.FAKE_UUID_1, + "id": self.FAKE_UUID_1, "listeners": { - "listener-id-1": {"listener-status": constants.ONLINE, - "members": {"member-id-1": constants.ONLINE} + "listener-id-1": {"status": constants.OPEN, "pools": { + "pool-id-1": {"status": constants.UP, + "members": {"member-id-1": constants.UP} } - + } + } } } session.return_value = 'blah' - lastupdate.return_value = '2014-02-12' self.hm.update_health(health) - self.assertTrue(self.amphora_health_repo.update.called) + self.assertTrue(self.amphora_health_repo.replace.called) # test listener, member for listener_id, listener in six.iteritems( @@ -70,42 +77,241 @@ class TestUpdateHealthMixin(base.TestCase): self.listener_repo.update.assert_any_call( 'blah', listener_id, operating_status=constants.ONLINE) - for member_id, member in six.iteritems( - listener.get('members', {})): - self.member_repo.update.assert_any_call( - 'blah', id=member_id, operating_status=constants.ONLINE) + for pool_id, pool in six.iteritems(listener.get('pools', {})): + + self.hm.pool_repo.update.assert_any_call( + 'blah', pool_id, operating_status=constants.ONLINE) + + for member_id, member in six.iteritems( + pool.get('members', {})): + self.member_repo.update.assert_any_call( + 'blah', id=member_id, + operating_status=constants.ONLINE) + + self.hm.listener_repo.count.return_value = 2 + + self.hm.update_health(health) @mock.patch('octavia.db.api.get_session') - @mock.patch('sqlalchemy.sql.func.now') - def test_update_health_Error(self, lastupdate, session): + def test_update_health_member_down(self, session): health = { - "amphora-status": constants.AMPHORA_DOWN, - "amphora-id": self.FAKE_UUID_1, + "id": self.FAKE_UUID_1, "listeners": { - "listener-id-1": {"listener-status": constants.ERROR, - "members": {"member-id-1": constants.ERROR} - }, - "listener-id-2": {"listener-status": constants.ERROR, - "members": {"member-id-2": constants.ERROR} + "listener-id-1": {"status": constants.OPEN, "pools": { + "pool-id-1": {"status": constants.UP, + "members": {"member-id-1": constants.DOWN} } + } + } } } session.return_value = 'blah' - lastupdate.return_value = '2014-02-12' self.hm.update_health(health) - self.assertFalse(self.amphora_health_repo.update.called) + self.assertTrue(self.amphora_health_repo.replace.called) # test listener, member for listener_id, listener in six.iteritems( health.get('listeners', {})): self.listener_repo.update.assert_any_call( - 'blah', listener_id, operating_status=constants.ERROR) + 'blah', listener_id, operating_status=constants.ONLINE) - for member_id, member in six.iteritems( - listener.get('members', {})): - self.member_repo.update.assert_any_call( - 'blah', id=member_id, operating_status=constants.ERROR) + for pool_id, pool in six.iteritems(listener.get('pools', {})): + + self.hm.pool_repo.update.assert_any_call( + 'blah', pool_id, operating_status=constants.DEGRADED) + + for member_id, member in six.iteritems( + pool.get('members', {})): + + self.member_repo.update.assert_any_call( + 'blah', id=member_id, + operating_status=constants.ERROR) + + self.hm.listener_repo.count.return_value = 2 + + self.hm.update_health(health) + + @mock.patch('octavia.db.api.get_session') + def test_update_health_list_full_member_down(self, session): + + health = { + "id": self.FAKE_UUID_1, + "listeners": { + "listener-id-1": {"status": constants.FULL, "pools": { + "pool-id-1": {"status": constants.UP, + "members": {"member-id-1": constants.DOWN} + } + } + } + } + } + + session.return_value = 'blah' + + self.hm.update_health(health) + self.assertTrue(self.amphora_health_repo.replace.called) + + # test listener, member + for listener_id, listener in six.iteritems( + health.get('listeners', {})): + + self.listener_repo.update.assert_any_call( + 'blah', listener_id, operating_status=constants.DEGRADED) + + for pool_id, pool in six.iteritems(listener.get('pools', {})): + + self.hm.pool_repo.update.assert_any_call( + 'blah', pool_id, operating_status=constants.DEGRADED) + + for member_id, member in six.iteritems( + pool.get('members', {})): + + self.member_repo.update.assert_any_call( + 'blah', id=member_id, + operating_status=constants.ERROR) + + self.hm.listener_repo.count.return_value = 2 + + self.hm.update_health(health) + + @mock.patch('octavia.db.api.get_session') + def test_update_health_Error(self, session): + + health = { + "id": self.FAKE_UUID_1, + "listeners": { + "listener-id-1": {"status": constants.OPEN, "pools": { + "pool-id-1": {"status": constants.DOWN, + "members": {"member-id-1": constants.DOWN} + } + } + } + } + } + + session.return_value = 'blah' + + self.hm.update_health(health) + self.assertTrue(self.amphora_health_repo.replace.called) + + # test listener, member + for listener_id, listener in six.iteritems( + health.get('listeners', {})): + + self.listener_repo.update.assert_any_call( + 'blah', listener_id, operating_status=constants.ONLINE) + + for pool_id, pool in six.iteritems(listener.get('pools', {})): + + self.hm.pool_repo.update.assert_any_call( + 'blah', pool_id, operating_status=constants.ERROR) + + for member_id, member in six.iteritems( + pool.get('members', {})): + + self.member_repo.update.assert_any_call( + 'blah', id=member_id, operating_status=constants.ERROR) + + # Test the logic code paths + @mock.patch('octavia.db.api.get_session') + def test_update_health_Full(self, session): + + health = { + "id": self.FAKE_UUID_1, + "listeners": { + "listener-id-1": {"status": constants.FULL, "pools": { + "pool-id-1": {"status": constants.DOWN, + "members": {"member-id-1": constants.DOWN} + } + } + }, + "listener-id-2": {"status": constants.FULL, "pools": { + "pool-id-2": {"status": constants.UP, + "members": {"member-id-2": constants.UP} + } + } + }, + "listener-id-3": {"status": constants.OPEN, "pools": { + "pool-id-3": {"status": constants.UP, + "members": {"member-id-3": constants.UP, + "member-id-31": constants.DOWN} + } + } + }, + "listener-id-4": {"status": "bogus", "pools": { + "pool-id-4": {"status": "bogus", + "members": {"member-id-4": "bogus"} + } + } + } + } + } + + session.return_value = 'blah' + + self.hm.update_health(health) + + # test listener + self.listener_repo.update.assert_any_call( + 'blah', "listener-id-1", operating_status=constants.DEGRADED) + self.listener_repo.update.assert_any_call( + 'blah', "listener-id-2", operating_status=constants.DEGRADED) + self.pool_repo.update.assert_any_call( + 'blah', "pool-id-1", operating_status=constants.ERROR) + self.pool_repo.update.assert_any_call( + 'blah', "pool-id-2", operating_status=constants.ONLINE) + self.pool_repo.update.assert_any_call( + 'blah', "pool-id-3", operating_status=constants.DEGRADED) + + # Test code paths where objects are not found in the database + @mock.patch('octavia.db.api.get_session') + def test_update_health_Not_Found(self, session): + + health = { + "id": self.FAKE_UUID_1, + "listeners": { + "listener-id-1": {"status": constants.OPEN, "pools": { + "pool-id-1": {"status": constants.UP, + "members": {"member-id-1": constants.UP} + } + } + } + } + } + + session.return_value = 'blah' + + self.hm.listener_repo.update.side_effect = ( + [sqlalchemy.orm.exc.NoResultFound]) + self.hm.member_repo.update.side_effect = ( + [sqlalchemy.orm.exc.NoResultFound]) + self.hm.pool_repo.update.side_effect = ( + [sqlalchemy.orm.exc.NoResultFound]) + self.hm.loadbalancer_repo.update.side_effect = ( + [sqlalchemy.orm.exc.NoResultFound]) + + self.hm.update_health(health) + self.assertTrue(self.amphora_health_repo.replace.called) + + # test listener, member + for listener_id, listener in six.iteritems( + health.get('listeners', {})): + + self.listener_repo.update.assert_any_call( + 'blah', listener_id, operating_status=constants.ONLINE) + + for pool_id, pool in six.iteritems(listener.get('pools', {})): + + self.hm.pool_repo.update.assert_any_call( + 'blah', pool_id, operating_status=constants.ONLINE) + + for member_id, member in six.iteritems( + pool.get('members', {})): + + self.member_repo.update.assert_any_call( + 'blah', id=member_id, + operating_status=constants.ONLINE) diff --git a/octavia/tests/unit/controller/healthmanager/test_update_stats_mixin.py b/octavia/tests/unit/controller/healthmanager/test_update_stats_mixin.py new file mode 100644 index 0000000000..01f253486e --- /dev/null +++ b/octavia/tests/unit/controller/healthmanager/test_update_stats_mixin.py @@ -0,0 +1,72 @@ +# Copyright 2015 Hewlett-Packard Development Company, L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import random + +from oslo_utils import uuidutils +import six + +from octavia.common import constants +from octavia.controller.healthmanager import update_stats_mixin as statsmixin +import octavia.tests.unit.base as base + +if six.PY2: + import mock +else: + import unittest.mock as mock + + +class TestUpdateStatsMixin(base.TestCase): + + def setUp(self): + super(TestUpdateStatsMixin, self).setUp() + + self.sm = statsmixin.UpdateStatsMixin() + self.listener_stats_repo = mock.MagicMock() + self.sm.listener_stats_repo = self.listener_stats_repo + + self.bytes_in = random.randrange(1000000000) + self.bytes_out = random.randrange(1000000000) + self.active_conns = random.randrange(1000000000) + self.total_conns = random.randrange(1000000000) + self.loadbalancer_id = uuidutils.generate_uuid() + self.listener_id = uuidutils.generate_uuid() + + @mock.patch('octavia.db.api.get_session') + def test_update_stats(self, session): + + health = { + "id": self.loadbalancer_id, + "listeners": { + self.listener_id: {"status": constants.OPEN, + "stats": {"conns": self.active_conns, + "totconns": self.total_conns, + "rx": self.bytes_in, + "tx": self.bytes_out}, + "pools": {"pool-id-1": + {"status": constants.UP, + "members": + {"member-id-1": constants.ONLINE} + } + } + }}} + + session.return_value = 'blah' + + self.sm.update_stats(health) + + self.listener_stats_repo.replace.assert_called_once_with( + 'blah', self.listener_id, bytes_in=self.bytes_in, + bytes_out=self.bytes_out, active_connections=self.active_conns, + total_connections=self.total_conns) diff --git a/octavia/tests/unit/controller/worker/tasks/test_compute_tasks.py b/octavia/tests/unit/controller/worker/tasks/test_compute_tasks.py index 52bd0d6c60..7371178b1b 100644 --- a/octavia/tests/unit/controller/worker/tasks/test_compute_tasks.py +++ b/octavia/tests/unit/controller/worker/tasks/test_compute_tasks.py @@ -83,12 +83,16 @@ class TestComputeTasks(base.TestCase): super(TestComputeTasks, self).setUp() + @mock.patch('jinja2.Environment.get_template') + @mock.patch('octavia.amphorae.backends.agent.' + 'agent_jinja_cfg.AgentJinjaTemplater.' + 'build_agent_config', return_value='test_conf') @mock.patch('stevedore.driver.DriverManager.driver') - def test_compute_create(self, mock_driver): + def test_compute_create(self, mock_driver, mock_conf, mock_jinja): createcompute = compute_tasks.ComputeCreate() - mock_driver.build.side_effect = [COMPUTE_ID, TestException('test')] + mock_driver.build.return_value = COMPUTE_ID # Test execute() compute_id = createcompute.execute(_amphora_mock.id, ports=[_port]) @@ -101,14 +105,16 @@ class TestComputeTasks(base.TestCase): sec_groups=AMP_SEC_GROUPS, network_ids=[AMP_NET], port_ids=[PORT_ID], - config_drive_files=None) + config_drive_files={'/etc/octavia/' + 'amphora-agent.conf': 'test_conf'}) # Make sure it returns the expected compute_id assert(compute_id == COMPUTE_ID) # Test that a build exception is raised createcompute = compute_tasks.ComputeCreate() - self.assertRaises(TestException, + + self.assertRaises(TypeError, createcompute.execute, _amphora_mock, config_drive_files='test_cert') @@ -127,12 +133,16 @@ class TestComputeTasks(base.TestCase): createcompute.revert(COMPUTE_ID, _amphora_mock.id) + @mock.patch('jinja2.Environment.get_template') + @mock.patch('octavia.amphorae.backends.agent.' + 'agent_jinja_cfg.AgentJinjaTemplater.' + 'build_agent_config', return_value='test_conf') @mock.patch('stevedore.driver.DriverManager.driver') - def test_compute_create_cert(self, mock_driver): + def test_compute_create_cert(self, mock_driver, mock_conf, mock_jinja): createcompute = compute_tasks.CertComputeCreate() - mock_driver.build.side_effect = [COMPUTE_ID, TestException('test')] + mock_driver.build.return_value = COMPUTE_ID m = mock.mock_open(read_data='test') with mock.patch('%s.open' % BUILTINS, m, create=True): # Test execute() @@ -150,7 +160,8 @@ class TestComputeTasks(base.TestCase): port_ids=[], config_drive_files={ '/etc/octavia/certs/server.pem': 'test_cert', - '/etc/octavia/certs/client_ca.pem': m.return_value}) + '/etc/octavia/certs/client_ca.pem': m.return_value, + '/etc/octavia/amphora-agent.conf': 'test_conf'}) # Make sure it returns the expected compute_id assert(compute_id == COMPUTE_ID) @@ -158,9 +169,10 @@ class TestComputeTasks(base.TestCase): # Test that a build exception is raised with mock.patch('%s.open' % BUILTINS, m, create=True): createcompute = compute_tasks.ComputeCreate() - self.assertRaises(TestException, + self.assertRaises(TypeError, createcompute.execute, - _amphora_mock, config_drive_files='test_cert') + _amphora_mock, + config_drive_files='test_cert') # Test revert()