Implement enhancement to help triage memory alarms

Implement the instrumentation/logging of the
top 10 memory intensive processes.
The goal is to have additional info in the collect
logs to help determine why a memory alarm is raised.

In this change includes 3 new logs lines to the collectd log file.
These lines show every 5 min, logging the top 10 memory rss intensive
processes.

To do this we are walking the the cgroups folders and capturing the pids.
We then query each pid to locate the rss memory consumption.

The first is shows top 10 memory rss processes for the platform
 showing the top 10 of each pid under cgroups,
This excludes the cgroups/ks8-infra folder, so it will only show process
 considered to be platform related.

The second log line contains the top 10 pids under the cgroups/ks8-infra
 folder. This will contain all of the pods and k8s related pids.
We are validating the namespace of each pod as to exclude k8s-addon
 related pids, so it will only show process for k8s-system.

The third line we are querying all of the pids system wide and
 surface the top 10 memory intensive process.

Seeing duplicates of process names is expected as we are looking at each
 individual pid rather than the process name itself.

For Reference.
K8S_NAMESPACE_SYSTEM = [
'kube-system', 'armada', 'cert-manager', 'portieris',
'vault', 'notification', 'platform-deployment-manager',
'flux-helm', 'metrics-server']

K8S_NAMESPACE_ADDON = ['monitor', 'openstack']

Test Plan:

PASS: built image successfully.
PASS: Installed image successfully.
PASS: bootstrap and unlock successful
PASS: build updated latest image.
PASS: Check collectd service is loaded and active.
PASS: Verify collectd CPU and Memory Consumption Growth
PASS: Tested On Debian using the latest master Branch.

Example Output:
2022-06-27T17:38:50.953 controller-0 collectd[2607872]: info The top 10 memory rss processes for the platform are :
[('ceph-mon', '950.58 MiB'), ('ceph-osd', '317.41 MiB'),
 ('sysinv-conducto', '215.15 MiB'), ('ceph-mgr', '198.13 MiB'),
 ('sysinv-api', '182.20 MiB'), ('kubelet', '158.93 MiB'),
 ('etcd', '154.39 MiB'), ('python', '131.34 MiB'),
 ('python', '131.33 MiB'), ('sysinv-api', '113.25 MiB')]
2022-06-27T17:38:50.954 controller-0 collectd[2607872]: info The top 10 memory rss processes for the Kubernetes System are :
[('java', '4.52 GiB'), ('java', '1.32 GiB'),
 ('java', '829.71 MiB'), ('java', '796.45 MiB'),
 ('kube-apiserver', '613.39 MiB'), ('java', '564.62 MiB'),
 ('node', '544.01 MiB'), ('helm-controller', '352.71 MiB'),
 ('metricbeat', '187.61 MiB'), ('metricbeat', '165.78 MiB')]
2022-06-27T17:38:50.954 controller-0 collectd[2607872]: info The top 10 memory rss processes Kubernetes Addon are :
[('java', '4.52 GiB'), ('java', '1.32 GiB'),
 ('java', '829.71 MiB'), ('java', '796.45 MiB'),
 ('java', '564.62 MiB'), ('node', '544.01 MiB'),
 ('metricbeat', '187.61 MiB'), ('metricbeat', '165.78 MiB'),
 ('filebeat', '123.93 MiB'), ('java', '87.34 MiB')]
2022-06-27T17:38:50.956 controller-0 collectd[2607872]: info The top 10 memory rss processes overall are :
[('java', '4.52 GiB'), ('java', '1.32 GiB'),
 ('ceph-mon', '950.58 MiB'), ('java', '829.71 MiB'),
 ('java', '796.45 MiB'), ('kube-apiserver', '613.39 MiB'),
 ('java', '564.62 MiB'), ('node', '544.01 MiB'),
 ('helm-controller', '352.71 MiB'), ('ceph-osd', '317.41 MiB')]

Closes-Bug: 1973815

Signed-off-by: Cesar Bombonate <cesar.pompeudebarrosbombonate@windriver.com>
Change-Id: I70c7222434fc3d2f0fee339dfe779a49d89e3294
This commit is contained in:
Cesar Bombonate
2022-05-04 15:48:58 -04:00
committed by Cesar Bombonate
parent 5175b23379
commit 2d6ba853b7
2 changed files with 308 additions and 6 deletions

View File

@@ -1,5 +1,5 @@
#
# Copyright (c) 2018-2021 Wind River Systems, Inc.
# Copyright (c) 2018-2022 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
@@ -18,15 +18,19 @@
# SELECT * FROM memory_value WHERE type='absolute' AND type_instance='used'
#
############################################################################
import datetime
import itertools
import os
import collectd
import plugin_common as pc
import re
import socket
import time
import collectd
from kubernetes.client.rest import ApiException
import plugin_common as pc
PLUGIN = 'platform memory usage'
PLUGIN_NORM = '4K memory usage'
PLUGIN_NUMA = '4K numa memory usage'
@@ -35,6 +39,7 @@ PLUGIN_DEBUG = 'DEBUG memory'
# Memory cgroup controller
MEMCONT = pc.CGROUP_ROOT + '/memory'
MEMORY_STAT = 'memory.stat'
MEMORY_PIDS = 'cgroup.procs'
# Linux memory
MEMINFO = '/proc/meminfo'
@@ -43,8 +48,10 @@ OVERCOMMIT = '/proc/sys/vm/overcommit_memory'
# Common regex pattern match groups
re_dict = re.compile(r'^(\w+)\s+(\d+)')
re_pid = re.compile(r'^\d')
re_word = re.compile(r'^(\w+)')
re_uid = re.compile(r'^pod(\S+)')
re_path_uid = re.compile(r'\/pod(\S+)\/')
re_blank = re.compile(r'^\s*$')
re_comment = re.compile(r'^\s*[#!]')
re_nonword = re.compile(r'^\s*\W')
@@ -350,6 +357,256 @@ def calc_normal_memory_nodes():
return normal_nodes
def get_cgroups_procs_paths(directory):
"""Loops through all the directories and returns a list of paths
Args:
directory ():
Returns:
paths []
"""
paths = []
if os.path.isdir(directory):
current_path, directories, files = next(os.walk(directory))
paths.append(current_path)
if directories is not None and not "":
for new_directory in directories:
current_path = os.path.join(directory, new_directory)
paths.extend(get_cgroups_procs_paths(current_path))
return paths
def get_cgroup_pid(path):
"""Get Pid for a specific cgroup path
This represents the aggregate usage for child cgroups.
Returns an array containing entries: 'PIDS'.
Args:
path ():
Returns:
pids: []
"""
pids = []
proc = '/'.join([path, MEMORY_PIDS])
try:
with open(proc, 'r') as fd:
for line in fd:
match = re_pid.search(line)
if match:
pids.append(match.string)
except IOError:
# Silently ignore IO errors. It is likely the cgroup disappeared.
pass
return pids
def format_iec(size, precision=2):
"""Converts to IEC standard size units with 2 decimal of precision.
KiB, MiB, GiB, TiB.
Args:
size ():
precision: Number of decimal places
Returns:
Size: Format String
"""
# Values from VmRSS are already in KB
mi = 1024.0
gi = 1024.0 * 1024.0
tb = 1024.0 * 1024.0 * 1024.0
decimal = "." + str(precision) + "f"
if size >= tb:
return '%s %s' % (format(size / tb, decimal), "TiB")
if size >= gi:
return '%s %s' % (format(size / gi, decimal), "GiB")
elif size >= mi:
return '%s %s' % (format(size / mi, decimal), "MiB")
else:
return '%s %s' % (format(size, decimal), "KiB")
def get_pid_name(pid):
"""Returns Pid name from /proc/{}/comm
Args:
pid: Int
Returns:
name: Str
"""
name = ""
try:
pid_path = '/proc/{}/comm'.format(str(pid).strip('\n'))
with open(pid_path, 'r') as comm:
name = comm.readline()
except IOError:
# Silently ignore IO errors. It is likely the cgroup disappeared.
pass
return name.strip('\n')
def get_pid_rss(pid):
"""Get memory usage per pid based /proc/pid/status
Args:
pid: the pid to retrive the value from.
Returns:
Memory or None
"""
m = {}
memory = 0.0
re_dict = re.compile(r'^(\w+:)\s+(\d+)')
try:
pid_path = '/proc/{}/status'.format(str(pid).strip('\n'))
with open(pid_path, 'r') as status:
for line in status:
match = re_dict.search(line)
if match:
k = match.group(1).strip(":")
v = match.group(2)
m[k] = v
memory = \
float(m.get('VmRSS', 0))
except IOError:
# Silently ignore IO errors. It is likely the cgroup disappeared.
pass
return memory
def get_platform_memory_per_process():
"""Get memory usage per pid based on cgroup hierarchy.
Returns:
memory: dict
"""
platform_pids = []
k8s_system_pids = []
pod_pids = []
memory = {
pc.GROUP_PROCESSES: {
pc.GROUP_PODS: {},
pc.GROUP_PLATFORM: {},
pc.GROUP_K8S_SYSTEM: {},
pc.GROUP_K8S_ADDON: {},
pc.GROUP_OVERALL: {}
}
}
platform = memory[pc.GROUP_PROCESSES][pc.GROUP_PLATFORM]
k8s_system = memory[pc.GROUP_PROCESSES][pc.GROUP_K8S_SYSTEM]
pod_group = memory[pc.GROUP_PROCESSES][pc.GROUP_PODS]
# Overall memory usage
pids = get_cgroup_pid(MEMCONT)
for pid in pids:
name = str(get_pid_name(pid))
rss = get_pid_rss(pid)
if rss > 0 and not None:
platform[int(pid)] = {'rss': float(rss),
'name': str(name)}
# Walk the first level cgroups and get the pids
# (e.g., docker, k8s-infra, user.slice, system.slice, machine.slice)
if os.path.exists(os.path.join(MEMCONT)):
starting_dir = next(os.walk(MEMCONT))[1]
for directory in starting_dir:
if directory != str(pc.K8S_ROOT):
cg_path = '/'.join([MEMCONT, directory])
paths = get_cgroups_procs_paths(cg_path)
for path in paths:
platform_pids.extend(get_cgroup_pid(path))
# Walk the kubepods hierarchy to the pod level and get the pids.
# We can safely ignore reading this if the path does not exist.
# The path won't exist on non-K8S nodes. The path is created as part of
# kubernetes configuration.
path = '/'.join([MEMCONT, pc.K8S_ROOT])
if os.path.exists(path):
starting_dir = next(os.walk(path))[1]
for directory in starting_dir:
cg_path = '/'.join([path, directory])
paths = get_cgroups_procs_paths(cg_path)
for path in paths:
if '/pod' in path:
match = re_path_uid.search(path)
if match:
uid = match.group(1)
pod_pids.extend(get_cgroup_pid(path))
for pid in pod_pids:
name = str(get_pid_name(pid))
rss = get_pid_rss(pid)
if rss > 0 and not None:
if uid not in pod_group:
pod_group[uid] = {}
if pid not in pod_group[uid]:
pod_group[uid][int(pid)] = {}
pod_group[uid][int(pid)] = {
'rss': float(rss),
'name': str(name)}
else:
k8s_system_pids.extend(get_cgroup_pid(path))
for pid in platform_pids:
name = str(get_pid_name(pid))
rss = get_pid_rss(pid)
if rss > 0 and not None:
if name not in platform:
platform[int(pid)] = {}
platform[int(pid)] = {'rss': float(rss),
'name': str(name)}
for pid in k8s_system_pids:
name = str(get_pid_name(pid))
rss = get_pid_rss(pid)
if rss > 0 and not None:
if pid not in k8s_system:
k8s_system[int(pid)] = {}
k8s_system[int(pid)] = {'rss': float(rss),
'name': str(name)}
# This returns the system overall process and stores it into a dict.
memory[pc.GROUP_PROCESSES][pc.GROUP_OVERALL] = dict(itertools.chain(
platform.items(), k8s_system.items(), pod_group[uid].items()))
return memory
def output_top_10_pids(pid_dict, message):
"""Outputs the top 10 pids with the formatted message.
Args:
pid_dict: Dict The Dictionary of PIDs with Name and RSS
message: Formatted String, the template message to be output.
"""
# Check that pid_dict has values
if not pid_dict:
return
proc = []
# Sort the dict based on Rss value from highest to lowest.
sorted_pid_dict = sorted(pid_dict.items(), key=lambda x: x[1]['rss'],
reverse=True)
# Convert sorted_pid_dict into a list
[proc.append((i[1].get('name'), format_iec(i[1].get('rss')))) for i in
sorted_pid_dict]
# Output top 10 entries of the list
collectd.info(message % (str(proc[:10])))
def config_func(config):
"""Configure the memory usage plugin."""
@@ -486,6 +743,50 @@ def read_func():
# K8S platform addons usage, i.e., non-essential: monitor, openstack
if pod.namespace in pc.K8S_NAMESPACE_ADDON:
memory[pc.GROUP_OVERALL][pc.GROUP_K8S_ADDON] += MiB
# Limit output to every 5 minutes and after 29 seconds to avoid duplication
if datetime.datetime.now().minute % 5 == 0 and datetime.datetime.now(
).second > 29:
# Populate the memory per process dictionary to output results
pids = get_platform_memory_per_process()
platform = pids[pc.GROUP_PROCESSES][pc.GROUP_PLATFORM]
group_pods = pids[pc.GROUP_PROCESSES][pc.GROUP_PODS]
k8s_system = pids[pc.GROUP_PROCESSES][pc.GROUP_K8S_SYSTEM]
k8s_addon = pids[pc.GROUP_PROCESSES][pc.GROUP_K8S_ADDON]
overall = pids[pc.GROUP_PROCESSES][pc.GROUP_OVERALL]
# Using pod UID to determine output label memory usage by K8S pod
# per process
for uid in group_pods:
if uid in obj._cache:
pod = obj._cache[uid]
else:
collectd.warning('%s: uid %s not found' % (PLUGIN, uid))
continue
# K8S platform system usage i.e.,kube-system, armada, etc.
if pod.namespace in pc.K8S_NAMESPACE_SYSTEM:
for key in group_pods[uid]:
k8s_system[key] = group_pods[uid][key]
# K8S platform addons usage, i.e., non-essential: monitor, openstack
if pod.namespace in pc.K8S_NAMESPACE_ADDON:
for key in group_pods[uid]:
k8s_addon[key] = group_pods[uid][key]
message = 'The top 10 memory rss processes for the platform are : %s'
output_top_10_pids(platform, message)
message = 'The top 10 memory rss processes for the Kubernetes System are :%s'
output_top_10_pids(k8s_system, message)
message = 'The top 10 memory rss processes Kubernetes Addon are :%s'
output_top_10_pids(k8s_addon, message)
message = 'The top 10 memory rss processes overall are :%s'
output_top_10_pids(overall, message)
# Calculate base memory usage (i.e., normal memory, exclude K8S and VMs)
# e.g., docker, system.slice, user.slice

View File

@@ -39,7 +39,7 @@ K8S_MODULE_MAJOR_VERSION = int(K8S_MODULE_VERSION.split('.')[0])
KUBELET_CONF = '/etc/kubernetes/kubelet.conf'
SSL_TLS_SUPPRESS = True
# Standard units conversion parameters (mebi, kibi)
# Standard units' conversion parameters (mebi, kibi)
# Reference: https://en.wikipedia.org/wiki/Binary_prefix
Mi = 1048576
Ki = 1024
@@ -65,6 +65,7 @@ GROUP_PLATFORM = 'platform'
GROUP_BASE = 'base'
GROUP_K8S_SYSTEM = 'kube-system'
GROUP_K8S_ADDON = 'kube-addon'
GROUP_PROCESSES = 'cgroup-processes'
# Groups included in platform - this excludes apps
PLATFORM_GROUPS = [GROUP_BASE, GROUP_K8S_SYSTEM]
@@ -220,7 +221,7 @@ class PluginObject(object):
# Name : node_ready
#
# Description: Test for node ready condition.
# Currently that's just a thresholded count
# Currently, that's just a threshold count
#
# Parameters : plugin name
#
@@ -586,7 +587,7 @@ class PluginObject(object):
#
# Name : make_http_request
#
# Description: Issue an http request to the specified URL.
# Description: Issue a http request to the specified URL.
# Load and return the response
# Handling execution errors
#