Oleg Bondarev 957533f685 LBaaS: update status of members according to health statistics
Added member health stats reporting to the haproxy driver.
During a pool stats update, the DB plugin checks for member stats
and updates the member statuses if any are reported.

Fixes bug 1160125

Change-Id: I77bf13615607fcf91bf877c228811ea8008b2457
2013-09-03 10:14:14 +00:00

254 lines
9.2 KiB
Python

# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright 2013 New Dream Network, LLC (DreamHost)
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Mark McClain, DreamHost
import os
import shutil
import socket
import netaddr
from neutron.agent.linux import ip_lib
from neutron.agent.linux import utils
from neutron.common import exceptions
from neutron.openstack.common import log as logging
from neutron.plugins.common import constants
from neutron.services.loadbalancer import constants as lb_const
from neutron.services.loadbalancer.drivers.haproxy import cfg as hacfg
LOG = logging.getLogger(__name__)
NS_PREFIX = 'qlbaas-'
class HaproxyNSDriver(object):
    """Runs one haproxy process per pool inside a per-pool network namespace.

    State files for a pool (``conf``, ``pid`` and ``sock``) live in
    ``<state_path>/<pool_id>/``. The namespace name is derived from the pool
    id with ``get_ns_name``.
    """

    def __init__(self, root_helper, state_path, vif_driver, vip_plug_callback):
        """Store the collaborators used by the driver.

        :param root_helper: passed to ``ip_lib`` / ``kill_pids_in_file`` for
            commands that are executed through them.
        :param state_path: base directory for the per-pool state directories.
        :param vif_driver: object providing ``get_device_name``, ``plug``,
            ``init_l3`` and ``unplug``.
        :param vip_plug_callback: callable invoked as
            ``callback('plug' | 'unplug', port)``.
        """
        self.root_helper = root_helper
        self.state_path = state_path
        self.vif_driver = vif_driver
        self.vip_plug_callback = vip_plug_callback
        # pool id -> id of the VIP port plugged for that pool
        self.pool_to_port_id = {}

    def create(self, logical_config):
        """Plug the VIP port into the pool namespace and start haproxy."""
        pool_id = logical_config['pool']['id']
        namespace = get_ns_name(pool_id)

        self._plug(namespace, logical_config['vip']['port'])
        self._spawn(logical_config)

    def update(self, logical_config):
        """Start a new haproxy for the pool, handing over from the old one.

        The PIDs found in the pool's pid file are passed after ``-sf`` so the
        new process can take over from them.
        """
        pool_id = logical_config['pool']['id']
        pid_path = self._get_state_file_path(pool_id, 'pid')

        extra_args = ['-sf']
        # Close the pid file deterministically and ignore blank lines so no
        # empty argument ends up on the haproxy command line.
        with open(pid_path, 'r') as pid_file:
            for line in pid_file:
                pid = line.strip()
                if pid:
                    extra_args.append(pid)
        self._spawn(logical_config, extra_args)

    def _spawn(self, logical_config, extra_cmd_args=()):
        """Write the haproxy config and launch haproxy in the pool namespace.

        :param logical_config: dict with at least ``pool.id`` and
            ``vip.port.id``; it is handed to ``hacfg.save_config`` as is.
        :param extra_cmd_args: additional haproxy command-line arguments.
        """
        pool_id = logical_config['pool']['id']
        namespace = get_ns_name(pool_id)
        conf_path = self._get_state_file_path(pool_id, 'conf')
        pid_path = self._get_state_file_path(pool_id, 'pid')
        sock_path = self._get_state_file_path(pool_id, 'sock')

        hacfg.save_config(conf_path, logical_config, sock_path)
        cmd = ['haproxy', '-f', conf_path, '-p', pid_path]
        cmd.extend(extra_cmd_args)

        ns = ip_lib.IPWrapper(self.root_helper, namespace)
        ns.netns.execute(cmd)

        # remember the pool<>port mapping
        self.pool_to_port_id[pool_id] = logical_config['vip']['port']['id']

    def destroy(self, pool_id):
        """Kill the pool's haproxy, unplug its port and remove its state."""
        namespace = get_ns_name(pool_id)
        ns = ip_lib.IPWrapper(self.root_helper, namespace)
        pid_path = self._get_state_file_path(pool_id, 'pid')

        # kill the process
        kill_pids_in_file(self.root_helper, pid_path)

        # unplug the ports
        if pool_id in self.pool_to_port_id:
            self._unplug(namespace, self.pool_to_port_id[pool_id])

        # remove the configuration directory
        conf_dir = os.path.dirname(self._get_state_file_path(pool_id, ''))
        if os.path.isdir(conf_dir):
            shutil.rmtree(conf_dir)
        ns.garbage_collect_namespace()

    def exists(self, pool_id):
        """Return True if the pool namespace exists and its socket accepts.

        A connection to the pool's stats socket is attempted; any
        ``socket.error`` is treated as "does not exist".
        """
        namespace = get_ns_name(pool_id)
        root_ns = ip_lib.IPWrapper(self.root_helper)

        socket_path = self._get_state_file_path(pool_id, 'sock')
        if root_ns.netns.exists(namespace) and os.path.exists(socket_path):
            try:
                s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
                try:
                    s.connect(socket_path)
                finally:
                    # The probe connection is not needed afterwards; close it
                    # so repeated calls do not leak file descriptors.
                    s.close()
                return True
            except socket.error:
                pass
        return False

    def get_stats(self, pool_id):
        """Return backend stats for the pool plus per-member stats.

        :returns: the dict built by ``_get_backend_stats`` with an extra
            ``'members'`` key holding the result of ``_get_servers_stats``,
            or ``{}`` when the stats socket file does not exist.
        """
        socket_path = self._get_state_file_path(pool_id, 'sock')
        # Bit flags for the <type> argument of the "show stat" request.
        TYPE_BACKEND_REQUEST = 2
        TYPE_SERVER_REQUEST = 4
        if os.path.exists(socket_path):
            parsed_stats = self._get_stats_from_socket(
                socket_path,
                entity_type=TYPE_BACKEND_REQUEST | TYPE_SERVER_REQUEST)
            pool_stats = self._get_backend_stats(parsed_stats)
            pool_stats['members'] = self._get_servers_stats(parsed_stats)
            return pool_stats
        else:
            LOG.warn(_('Stats socket not found for pool %s'), pool_id)
            return {}

    def _get_backend_stats(self, parsed_stats):
        """Return unified stats of the first backend row, or ``{}``.

        Rows are dicts as produced by ``_parse_stats``. The result maps each
        key of ``hacfg.STATS_MAP`` to the row value of the mapped column
        (``''`` when the column is absent).
        """
        TYPE_BACKEND_RESPONSE = '1'
        for stats in parsed_stats:
            # .get(): tolerate incomplete rows that carry no 'type' column.
            if stats.get('type') == TYPE_BACKEND_RESPONSE:
                unified_stats = dict((k, stats.get(v, ''))
                                     for k, v in hacfg.STATS_MAP.items())
                return unified_stats

        return {}

    def _get_servers_stats(self, parsed_stats):
        """Return per-server health information keyed by server name.

        For every server row the status is ``constants.INACTIVE`` when
        haproxy reports ``'DOWN'`` and ``constants.ACTIVE`` otherwise; the
        ``check_status`` and ``chkfail`` columns are passed through.
        """
        TYPE_SERVER_RESPONSE = '2'
        res = {}
        for stats in parsed_stats:
            # .get(): tolerate incomplete rows that carry no 'type' column.
            if stats.get('type') == TYPE_SERVER_RESPONSE:
                res[stats['svname']] = {
                    lb_const.STATS_STATUS: (constants.INACTIVE
                                            if stats['status'] == 'DOWN'
                                            else constants.ACTIVE),
                    lb_const.STATS_HEALTH: stats['check_status'],
                    lb_const.STATS_FAILED_CHECKS: stats['chkfail']
                }

        return res

    def _get_stats_from_socket(self, socket_path, entity_type):
        """Send ``show stat`` to the haproxy stats socket and parse the reply.

        :param socket_path: path of the haproxy UNIX stats socket.
        :param entity_type: value for the <type> field of the request.
        :returns: list of row dicts (see ``_parse_stats``); an empty dict if
            a ``socket.error`` occurs (callers only iterate the result).
        """
        try:
            s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
            try:
                s.connect(socket_path)
                request = 'show stat -1 %s -1\n' % entity_type
                # Sockets carry bytes; encoding keeps this working on both
                # Python 2 and Python 3.
                s.sendall(request.encode('utf-8'))
                chunks = []
                chunk_size = 1024
                while True:
                    chunk = s.recv(chunk_size)
                    # A short read does not mean the reply is complete on a
                    # stream socket; only an empty read signals that the
                    # peer closed the connection.
                    if not chunk:
                        break
                    chunks.append(chunk)
            finally:
                s.close()
        except socket.error as e:
            LOG.warn(_('Error while connecting to stats socket: %s'), e)
            return {}

        raw_stats = b''.join(chunks).decode('utf-8', 'replace')
        return self._parse_stats(raw_stats)

    def _parse_stats(self, raw_stats):
        """Parse haproxy CSV stats text into a list of dicts.

        The first line is the header; its names are stripped of ``'#'`` and
        spaces. Each following non-blank line becomes one dict mapping header
        names to stripped values. Fewer than two lines yields ``[]``.
        """
        stat_lines = raw_stats.splitlines()
        if len(stat_lines) < 2:
            return []
        stat_names = [name.strip('# ') for name in stat_lines[0].split(',')]
        res_stats = []
        for raw_values in stat_lines[1:]:
            # Blank lines (the output ends with one) carry no data and would
            # otherwise become a bogus single-column row.
            if not raw_values.strip():
                continue
            stat_values = [value.strip() for value in raw_values.split(',')]
            res_stats.append(dict(zip(stat_names, stat_values)))

        return res_stats

    def remove_orphans(self, known_pool_ids):
        """Not implemented by this driver."""
        raise NotImplementedError()

    def _get_state_file_path(self, pool_id, kind, ensure_state_dir=True):
        """Returns the file name for a given kind of config file.

        :param pool_id: pool id, used as the per-pool directory name.
        :param kind: file name inside the pool directory (e.g. ``'conf'``).
        :param ensure_state_dir: when true, create the pool directory (mode
            0o755) if it is missing.
        """
        confs_dir = os.path.abspath(os.path.normpath(self.state_path))
        conf_dir = os.path.join(confs_dir, pool_id)
        if ensure_state_dir:
            if not os.path.isdir(conf_dir):
                os.makedirs(conf_dir, 0o755)
        return os.path.join(conf_dir, kind)

    def _plug(self, namespace, port, reuse_existing=True):
        """Plug the port into the namespace and configure its addresses.

        :param namespace: name of the target network namespace.
        :param port: port dict with ``id``, ``network_id``, ``mac_address``
            and ``fixed_ips`` (each with ``ip_address`` and ``subnet``).
        :param reuse_existing: when false, an already existing device raises
            ``exceptions.PreexistingDeviceFailure``.
        """
        self.vip_plug_callback('plug', port)
        interface_name = self.vif_driver.get_device_name(Wrap(port))

        if ip_lib.device_exists(interface_name, self.root_helper, namespace):
            if not reuse_existing:
                raise exceptions.PreexistingDeviceFailure(
                    dev_name=interface_name
                )
        else:
            self.vif_driver.plug(
                port['network_id'],
                port['id'],
                interface_name,
                port['mac_address'],
                namespace=namespace
            )

        # "<address>/<prefix length>" for every fixed IP of the port
        cidrs = [
            '%s/%s' % (ip['ip_address'],
                       netaddr.IPNetwork(ip['subnet']['cidr']).prefixlen)
            for ip in port['fixed_ips']
        ]
        self.vif_driver.init_l3(interface_name, cidrs, namespace=namespace)

        # The default route comes from the first fixed IP's subnet; failures
        # of the route command are deliberately ignored.
        gw_ip = port['fixed_ips'][0]['subnet'].get('gateway_ip')
        if gw_ip:
            cmd = ['route', 'add', 'default', 'gw', gw_ip]
            ip_wrapper = ip_lib.IPWrapper(self.root_helper,
                                          namespace=namespace)
            ip_wrapper.netns.execute(cmd, check_exit_code=False)

    def _unplug(self, namespace, port_id):
        """Notify the unplug callback and remove the port's device.

        Only the port id is known here, so a stub dict holding just ``id``
        stands in for the port.
        """
        port_stub = {'id': port_id}
        self.vip_plug_callback('unplug', port_stub)
        interface_name = self.vif_driver.get_device_name(Wrap(port_stub))
        self.vif_driver.unplug(interface_name, namespace=namespace)
# NOTE (markmcclain) For compliance with interface.py which expects objects
class Wrap(object):
    """Expose the entries of a dict as attributes as well as items.

    The interface library reads ports through attribute access, while this
    module works with plain dicts; instances support both forms.
    """

    def __init__(self, d):
        # Copy every entry of d straight into the instance namespace.
        vars(self).update(d)

    def __getitem__(self, key):
        # Item access is served from the same instance namespace.
        return vars(self)[key]
def get_ns_name(namespace_id):
    """Return the namespace name: ``NS_PREFIX`` followed by *namespace_id*."""
    return ''.join((NS_PREFIX, namespace_id))
def kill_pids_in_file(root_helper, pid_path):
    """Send ``kill -9`` to every PID listed in *pid_path*.

    Does nothing when the file does not exist. Each line of the file is
    treated as one PID; blank lines are skipped. A ``RuntimeError`` from the
    kill command is logged and does not stop processing of remaining PIDs.

    :param root_helper: passed through to ``utils.execute``.
    :param pid_path: path of the pid file to read.
    """
    if not os.path.exists(pid_path):
        return

    with open(pid_path, 'r') as pids:
        for pid in pids:
            pid = pid.strip()
            if not pid:
                # An empty entry is not a PID; running "kill -9 ''" would
                # only produce a spurious failure.
                continue
            try:
                utils.execute(['kill', '-9', pid], root_helper)
            except RuntimeError:
                LOG.exception(
                    _('Unable to kill haproxy process: %s'),
                    pid
                )