473 lines
19 KiB
Python
473 lines
19 KiB
Python
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
import ipaddress
|
|
import os
|
|
import re
|
|
import subprocess
|
|
|
|
from octavia_lib.common import constants as lib_consts
|
|
from oslo_log import log as logging
|
|
|
|
from octavia.amphorae.backends.agent.api_server import util
|
|
from octavia.common import constants
|
|
|
|
LOG = logging.getLogger(__name__)
|
|
KERNEL_LVS_PATH = '/proc/net/ip_vs'
|
|
KERNEL_LVS_STATS_PATH = '/proc/net/ip_vs_stats'
|
|
LVS_KEY_REGEX = re.compile(r"RemoteAddress:Port\s+(.*$)")
|
|
V4_RS_VALUE_REGEX = re.compile(r"(\w{8}:\w{4})\s+(.*$)")
|
|
V4_HEX_IP_REGEX = re.compile(r"(\w{2})(\w{2})(\w{2})(\w{2})")
|
|
V6_RS_VALUE_REGEX = re.compile(r"(\[[[\w{4}:]+\b\]:\w{4})\s+(.*$)")
|
|
|
|
NS_REGEX = re.compile(r"net_namespace\s(\w+-\w+)")
|
|
V4_VS_REGEX = re.compile(r"virtual_server\s([\d+\.]+\b)\s(\d{1,5})")
|
|
V4_RS_REGEX = re.compile(r"real_server\s([\d+\.]+\b)\s(\d{1,5})")
|
|
V6_VS_REGEX = re.compile(r"virtual_server\s([\w*:]+\b)\s(\d{1,5})")
|
|
V6_RS_REGEX = re.compile(r"real_server\s([\w*:]+\b)\s(\d{1,5})")
|
|
CONFIG_COMMENT_REGEX = re.compile(
|
|
r"#\sConfiguration\sfor\s(\w+)\s(\w{8}-\w{4}-\w{4}-\w{4}-\w{12})")
|
|
DISABLED_CONFIG_COMMENT_REGEX = re.compile(
|
|
r"#\s(\w+)\s(\w{8}-\w{4}-\w{4}-\w{4}-\w{12}) is disabled")
|
|
|
|
CHECKER_REGEX = re.compile(r"(MISC_CHECK|HTTP_GET|TCP_CHECK)")
|
|
|
|
|
|
def read_kernel_file(ns_name, file_path):
|
|
cmd = ("ip netns exec {ns} cat {lvs_stat_path}".format(
|
|
ns=ns_name, lvs_stat_path=file_path))
|
|
try:
|
|
output = subprocess.check_output(cmd.split(),
|
|
stderr=subprocess.STDOUT)
|
|
except subprocess.CalledProcessError as e:
|
|
LOG.error("Failed to get kernel lvs status in ns %(ns_name)s "
|
|
"%(kernel_lvs_path)s: %(err)s %(out)s",
|
|
{'ns_name': ns_name, 'kernel_lvs_path': file_path,
|
|
'err': e, 'out': e.output})
|
|
raise e
|
|
# py3 treat the output as bytes type.
|
|
if isinstance(output, bytes):
|
|
output = output.decode('utf-8')
|
|
return output
|
|
|
|
|
|
def get_listener_realserver_mapping(ns_name, listener_ip_port,
|
|
health_monitor_enabled):
|
|
# returned result:
|
|
# actual_member_result = {'rs_ip:listened_port': {
|
|
# 'status': 'UP',
|
|
# 'Forward': forward_type,
|
|
# 'Weight': 5,
|
|
# 'ActiveConn': 0,
|
|
# 'InActConn': 0
|
|
# }}
|
|
listener_ip, listener_port = listener_ip_port.rsplit(':', 1)
|
|
ip_obj = ipaddress.ip_address(listener_ip.strip('[]'))
|
|
output = read_kernel_file(ns_name, KERNEL_LVS_PATH).split('\n')
|
|
if ip_obj.version == 4:
|
|
ip_to_hex_format = "%.8X" % ip_obj._ip
|
|
else:
|
|
ip_to_hex_format = r'\[' + ip_obj.exploded + r'\]'
|
|
port_hex_format = "%.4X" % int(listener_port)
|
|
idex = ip_to_hex_format + ':' + port_hex_format
|
|
|
|
if health_monitor_enabled:
|
|
member_status = constants.UP
|
|
else:
|
|
member_status = constants.NO_CHECK
|
|
|
|
actual_member_result = {}
|
|
find_target_block = False
|
|
result_keys = []
|
|
for line in output:
|
|
if 'RemoteAddress:Port' in line:
|
|
result_keys = re.split(r'\s+',
|
|
LVS_KEY_REGEX.findall(line)[0].strip())
|
|
elif ((line.startswith(constants.PROTOCOL_UDP) or
|
|
line.startswith(lib_consts.PROTOCOL_SCTP)) and
|
|
find_target_block):
|
|
break
|
|
elif re.match(r'^(UDP|SCTP)\s+%s\s+\w+' % idex,
|
|
line):
|
|
find_target_block = True
|
|
elif find_target_block and line:
|
|
rs_is_ipv4 = True
|
|
all_values = V4_RS_VALUE_REGEX.findall(line)
|
|
# If can not get all_values with ipv4 regex, then this line must be
|
|
# a ipv6 real server record.
|
|
if not all_values:
|
|
all_values = V6_RS_VALUE_REGEX.findall(line)
|
|
rs_is_ipv4 = False
|
|
|
|
all_values = all_values[0]
|
|
ip_port = all_values[0]
|
|
result_values = re.split(r"\s+", all_values[1].strip())
|
|
member_ip, member_port = ip_port.rsplit(':', 1)
|
|
port_string = str(int(member_port, 16))
|
|
if rs_is_ipv4:
|
|
ip_string = ipaddress.ip_address(int(member_ip, 16)).compressed
|
|
member_ip_port_string = ip_string + ':' + port_string
|
|
else:
|
|
ip_string = ipaddress.ip_address(
|
|
member_ip.strip('[]')).compressed
|
|
member_ip_port_string = '[' + ip_string + ']:' + port_string
|
|
result_key_count = len(result_keys)
|
|
for index in range(result_key_count):
|
|
if member_ip_port_string not in actual_member_result:
|
|
actual_member_result[
|
|
member_ip_port_string] = {'status': member_status,
|
|
result_keys[index]:
|
|
result_values[index]}
|
|
else:
|
|
# The other values include the weight
|
|
actual_member_result[
|
|
member_ip_port_string][
|
|
result_keys[index]] = result_values[index]
|
|
continue
|
|
|
|
return find_target_block, actual_member_result
|
|
|
|
|
|
def get_lvs_listener_resource_ipports_nsname(listener_id):
|
|
# resource_ipport_mapping = {'Listener': {'id': listener-id,
|
|
# 'ipport': ipport},
|
|
# 'Pool': {'id': pool-id},
|
|
# 'Members': [{'id': member-id-1,
|
|
# 'ipport': ipport},
|
|
# {'id': member-id-2,
|
|
# 'ipport': ipport}],
|
|
# 'HealthMonitor': {'id': healthmonitor-id}}
|
|
resource_ipport_mapping = {}
|
|
with open(util.keepalived_lvs_cfg_path(listener_id), 'r') as f:
|
|
cfg = f.read()
|
|
ns_name = NS_REGEX.findall(cfg)[0]
|
|
listener_ip_port = V4_VS_REGEX.findall(cfg)
|
|
if not listener_ip_port:
|
|
listener_ip_port = V6_VS_REGEX.findall(cfg)
|
|
listener_ip_port = listener_ip_port[0] if listener_ip_port else []
|
|
|
|
disabled_resource_ids = DISABLED_CONFIG_COMMENT_REGEX.findall(cfg)
|
|
|
|
listener_disabled = any(True
|
|
for resource in disabled_resource_ids
|
|
if resource[0] == 'Listener')
|
|
if listener_disabled:
|
|
return None, ns_name
|
|
|
|
if not listener_ip_port:
|
|
# If not get listener_ip_port from the lvs config file,
|
|
# that means the listener's default pool have no enabled member
|
|
# yet. But at this moment, we can get listener_id and ns_name, so
|
|
# for this function, we will just return ns_name
|
|
return resource_ipport_mapping, ns_name
|
|
|
|
cfg_line = cfg.split('\n')
|
|
rs_ip_port_list = []
|
|
for line in cfg_line:
|
|
if 'real_server' in line:
|
|
res = V4_RS_REGEX.findall(line)
|
|
if not res:
|
|
res = V6_RS_REGEX.findall(line)
|
|
rs_ip_port_list.append(res[0])
|
|
|
|
resource_type_ids = CONFIG_COMMENT_REGEX.findall(cfg)
|
|
|
|
for resource_type, resource_id in resource_type_ids:
|
|
value = {'id': resource_id}
|
|
if resource_type == 'Member':
|
|
resource_type = '%ss' % resource_type
|
|
if resource_type not in resource_ipport_mapping:
|
|
value = [value]
|
|
if resource_type not in resource_ipport_mapping:
|
|
resource_ipport_mapping[resource_type] = value
|
|
elif resource_type == 'Members':
|
|
resource_ipport_mapping[resource_type].append(value)
|
|
|
|
disabled_member_ids = [
|
|
resource[1]
|
|
for resource in disabled_resource_ids
|
|
if resource[0] == 'Member'
|
|
]
|
|
|
|
resource_type = 'Members'
|
|
for member_id in disabled_member_ids:
|
|
value = {'id': member_id,
|
|
'ipport': None}
|
|
if resource_type not in resource_ipport_mapping:
|
|
resource_ipport_mapping[resource_type] = []
|
|
resource_ipport_mapping[resource_type].append(value)
|
|
|
|
if rs_ip_port_list:
|
|
rs_ip_port_count = len(rs_ip_port_list)
|
|
for index in range(rs_ip_port_count):
|
|
member_ip = ipaddress.ip_address(
|
|
rs_ip_port_list[index][0])
|
|
if member_ip.version == 6:
|
|
rs_ip_port_list[index] = (
|
|
'[' + member_ip.compressed + ']',
|
|
rs_ip_port_list[index][1])
|
|
resource_ipport_mapping['Members'][index]['ipport'] = (
|
|
rs_ip_port_list[index][0] + ':' +
|
|
rs_ip_port_list[index][1])
|
|
|
|
listener_ip = ipaddress.ip_address(listener_ip_port[0])
|
|
if listener_ip.version == 6:
|
|
listener_ip_port = (
|
|
'[' + listener_ip.compressed + ']', listener_ip_port[1])
|
|
resource_ipport_mapping['Listener']['ipport'] = (
|
|
listener_ip_port[0] + ':' + listener_ip_port[1])
|
|
|
|
return resource_ipport_mapping, ns_name
|
|
|
|
|
|
def get_lvs_listener_pool_status(listener_id):
|
|
(resource_ipport_mapping,
|
|
ns_name) = get_lvs_listener_resource_ipports_nsname(listener_id)
|
|
if 'Pool' not in resource_ipport_mapping:
|
|
return {}
|
|
if 'Members' not in resource_ipport_mapping:
|
|
return {'lvs': {
|
|
'uuid': resource_ipport_mapping['Pool']['id'],
|
|
'status': constants.UP,
|
|
'members': {}
|
|
}}
|
|
|
|
config_path = util.keepalived_lvs_cfg_path(listener_id)
|
|
pids_pathes = util.keepalived_lvs_pids_path(listener_id)
|
|
|
|
config_stat = os.stat(config_path)
|
|
check_pid_stat = os.stat(pids_pathes[2])
|
|
|
|
# Indicates that keepalived configuration has been updated but the service
|
|
# has yet to be restarted.
|
|
# NOTE: It only works if we are doing a RESTART on configuration change,
|
|
# Iaa34db6cb1dfed98e96a585c5d105e263c7efa65 forces a RESTART instead of a
|
|
# RELOAD, we need to be careful if we want to switch back to RELOAD after
|
|
# updating to a recent keepalived release.
|
|
restarting = config_stat.st_mtime > check_pid_stat.st_mtime
|
|
|
|
with open(util.keepalived_lvs_cfg_path(listener_id), 'r') as f:
|
|
cfg = f.read()
|
|
hm_enabled = len(CHECKER_REGEX.findall(cfg)) > 0
|
|
|
|
_, realserver_result = get_listener_realserver_mapping(
|
|
ns_name, resource_ipport_mapping['Listener']['ipport'],
|
|
hm_enabled)
|
|
pool_status = constants.UP
|
|
member_results = {}
|
|
if realserver_result:
|
|
member_ip_port_list = [
|
|
member['ipport'] for member in resource_ipport_mapping['Members']]
|
|
down_member_ip_port_set = set(
|
|
member_ip_port_list) - set(list(realserver_result.keys()))
|
|
|
|
for member_ip_port in member_ip_port_list:
|
|
member_id = None
|
|
for member in resource_ipport_mapping['Members']:
|
|
if member['ipport'] == member_ip_port:
|
|
member_id = member['id']
|
|
if member_ip_port is None:
|
|
status = constants.MAINT
|
|
elif member_ip_port in down_member_ip_port_set:
|
|
status = (
|
|
constants.RESTARTING if restarting else constants.DOWN)
|
|
elif int(realserver_result[member_ip_port]['Weight']) == 0:
|
|
status = constants.DRAIN
|
|
else:
|
|
status = realserver_result[member_ip_port]['status']
|
|
|
|
if member_id:
|
|
member_results[member_id] = status
|
|
else:
|
|
if hm_enabled:
|
|
pool_status = constants.DOWN
|
|
|
|
for member in resource_ipport_mapping['Members']:
|
|
if member['ipport'] is None:
|
|
member_results[member['id']] = constants.MAINT
|
|
elif hm_enabled:
|
|
member_results[member['id']] = (
|
|
constants.RESTARTING if restarting else constants.DOWN)
|
|
else:
|
|
member_results[member['id']] = constants.NO_CHECK
|
|
|
|
return {
|
|
'lvs':
|
|
{
|
|
'uuid': resource_ipport_mapping['Pool']['id'],
|
|
'status': pool_status,
|
|
'members': member_results
|
|
}
|
|
}
|
|
|
|
|
|
def get_ipvsadm_info(ns_name, is_stats_cmd=False):
|
|
cmd_list = ['ip', 'netns', 'exec', ns_name, 'ipvsadm', '-Ln']
|
|
# use --exact to ensure output is integer only
|
|
if is_stats_cmd:
|
|
cmd_list += ['--stats', '--exact']
|
|
output = subprocess.check_output(cmd_list, stderr=subprocess.STDOUT)
|
|
if isinstance(output, bytes):
|
|
output = output.decode('utf-8')
|
|
output = output.split('\n')
|
|
fields = []
|
|
# mapping = {'listeneripport': {'Linstener': vs_values,
|
|
# 'members': [rs_values1, rs_values2]}}
|
|
last_key = None
|
|
value_mapping = dict()
|
|
output_line_num = len(output)
|
|
|
|
def split_line(line):
|
|
return re.sub(r'\s+', ' ', line.strip()).split(' ')
|
|
for line_num in range(output_line_num):
|
|
# ipvsadm -Ln
|
|
if 'Flags' in output[line_num]:
|
|
fields = split_line(output[line_num])
|
|
elif fields and 'Flags' in fields and fields.index('Flags') == len(
|
|
fields) - 1:
|
|
fields.extend(split_line(output[line_num]))
|
|
# ipvsadm -Ln --stats
|
|
elif 'Prot' in output[line_num]:
|
|
fields = split_line(output[line_num])
|
|
elif 'RemoteAddress' in output[line_num]:
|
|
start = fields.index('LocalAddress:Port') + 1
|
|
temp_fields = fields[start:]
|
|
fields.extend(split_line(output[line_num]))
|
|
fields.extend(temp_fields)
|
|
# here we get the all fields
|
|
elif (constants.PROTOCOL_UDP in output[line_num] or
|
|
lib_consts.PROTOCOL_SCTP in output[line_num]):
|
|
# if UDP/TCP in this line, we can know this line is
|
|
# VS configuration.
|
|
vs_values = split_line(output[line_num])
|
|
for value in vs_values:
|
|
if ':' in value:
|
|
value_mapping[value] = {'Listener': vs_values,
|
|
'Members': []}
|
|
last_key = value
|
|
break
|
|
# here the line must be a RS which belongs to a VS
|
|
elif '->' in output[line_num] and last_key:
|
|
rs_values = split_line(output[line_num])
|
|
rs_values.remove('->')
|
|
value_mapping[last_key]['Members'].append(rs_values)
|
|
|
|
index = fields.index('->')
|
|
vs_fields = fields[:index]
|
|
if 'Flags' in vs_fields:
|
|
vs_fields.remove('Flags')
|
|
rs_fields = fields[index + 1:]
|
|
for key in list(value_mapping.keys()):
|
|
value_mapping[key]['Listener'] = list(
|
|
zip(vs_fields, value_mapping[key]['Listener']))
|
|
member_res = []
|
|
for member_value in value_mapping[key]['Members']:
|
|
member_res.append(list(zip(rs_fields, member_value)))
|
|
value_mapping[key]['Members'] = member_res
|
|
|
|
return value_mapping
|
|
|
|
|
|
def get_lvs_listeners_stats():
|
|
lvs_listener_ids = util.get_lvs_listeners()
|
|
need_check_listener_ids = [
|
|
listener_id for listener_id in lvs_listener_ids
|
|
if util.is_lvs_listener_running(listener_id)]
|
|
ipport_mapping = dict()
|
|
listener_stats_res = dict()
|
|
for check_listener_id in need_check_listener_ids:
|
|
# resource_ipport_mapping = {'Listener': {'id': listener-id,
|
|
# 'ipport': ipport},
|
|
# 'Pool': {'id': pool-id},
|
|
# 'Members': [{'id': member-id-1,
|
|
# 'ipport': ipport},
|
|
# {'id': member-id-2,
|
|
# 'ipport': ipport}],
|
|
# 'HealthMonitor': {'id': healthmonitor-id}}
|
|
resource_ipport_mapping, ns_name = (
|
|
get_lvs_listener_resource_ipports_nsname(check_listener_id))
|
|
|
|
# Listener is disabled, we don't need to send an update
|
|
if resource_ipport_mapping is None:
|
|
continue
|
|
|
|
# Since we found the keepalived running, acknowledge the listener
|
|
# in the heartbeat. If this listener has a pool and members,
|
|
# the stats will be updated later in the code flow.
|
|
listener_stats_res.update({
|
|
check_listener_id: {
|
|
'stats': {
|
|
'bout': 0,
|
|
'bin': 0,
|
|
'scur': 0,
|
|
'stot': 0,
|
|
'ereq': 0},
|
|
'status': constants.OPEN}})
|
|
|
|
# If we can not read the lvs configuration from file, that means
|
|
# the pool of this listener may own zero enabled member, but the
|
|
# keepalived process is running. So we need to skip it.
|
|
if not resource_ipport_mapping:
|
|
continue
|
|
ipport_mapping.update({check_listener_id: resource_ipport_mapping})
|
|
|
|
# So here, if we can not get any ipport_mapping,
|
|
# we do nothing, just return
|
|
if not ipport_mapping:
|
|
return listener_stats_res
|
|
|
|
# contains bout, bin, scur, stot, ereq, status
|
|
# bout(OutBytes), bin(InBytes), stot(Conns) from cmd ipvsadm -Ln --stats
|
|
# scur(ActiveConn) from cmd ipvsadm -Ln
|
|
# status, can see configuration in any cmd, treat it as OPEN
|
|
# ereq is still 0, as UDP case does not support it.
|
|
scur_res = get_ipvsadm_info(constants.AMPHORA_NAMESPACE)
|
|
stats_res = get_ipvsadm_info(constants.AMPHORA_NAMESPACE,
|
|
is_stats_cmd=True)
|
|
for listener_id, ipport in ipport_mapping.items():
|
|
listener_ipport = ipport['Listener']['ipport']
|
|
# This would be in Error, wait for the next loop to sync for the
|
|
# listener at this moment. Also this is for skip the case no enabled
|
|
# member in UDP listener, so we don't check it for failover.
|
|
if listener_ipport not in scur_res or listener_ipport not in stats_res:
|
|
continue
|
|
|
|
scur, bout, bin, stot, ereq = 0, 0, 0, 0, 0
|
|
# As all results contain this listener, so its status should be OPEN
|
|
status = constants.OPEN
|
|
# Get scur
|
|
for m in scur_res[listener_ipport]['Members']:
|
|
for item in m:
|
|
if item[0] == 'ActiveConn':
|
|
scur += int(item[1])
|
|
|
|
# Get bout, bin, stot
|
|
for item in stats_res[listener_ipport]['Listener']:
|
|
if item[0] == 'Conns':
|
|
stot = int(item[1])
|
|
elif item[0] == 'OutBytes':
|
|
bout = int(item[1])
|
|
elif item[0] == 'InBytes':
|
|
bin = int(item[1])
|
|
|
|
listener_stats_res.update({
|
|
listener_id: {
|
|
'stats': {
|
|
'bout': bout,
|
|
'bin': bin,
|
|
'scur': scur,
|
|
'stot': stot,
|
|
'ereq': ereq},
|
|
'status': status}})
|
|
|
|
return listener_stats_res
|