diff --git a/collectd-extensions/src/memory.py b/collectd-extensions/src/memory.py
index 607c707..525b0cf 100755
--- a/collectd-extensions/src/memory.py
+++ b/collectd-extensions/src/memory.py
@@ -1,5 +1,5 @@
 #
-# Copyright (c) 2018-2021 Wind River Systems, Inc.
+# Copyright (c) 2018-2022 Wind River Systems, Inc.
 #
 # SPDX-License-Identifier: Apache-2.0
 #
@@ -18,15 +18,19 @@
 # SELECT * FROM memory_value WHERE type='absolute' AND type_instance='used'
 #
 ############################################################################
+
+import datetime
+import itertools
 import os
-import collectd
-import plugin_common as pc
 import re
 import socket
 import time
 
+import collectd
 from kubernetes.client.rest import ApiException
 
+import plugin_common as pc
+
 PLUGIN = 'platform memory usage'
 PLUGIN_NORM = '4K memory usage'
 PLUGIN_NUMA = '4K numa memory usage'
@@ -35,6 +39,7 @@ PLUGIN_DEBUG = 'DEBUG memory'
 # Memory cgroup controller
 MEMCONT = pc.CGROUP_ROOT + '/memory'
 MEMORY_STAT = 'memory.stat'
+MEMORY_PIDS = 'cgroup.procs'
 
 # Linux memory
 MEMINFO = '/proc/meminfo'
@@ -43,8 +48,10 @@ OVERCOMMIT = '/proc/sys/vm/overcommit_memory'
 
 # Common regex pattern match groups
 re_dict = re.compile(r'^(\w+)\s+(\d+)')
+re_pid = re.compile(r'^\d')
 re_word = re.compile(r'^(\w+)')
 re_uid = re.compile(r'^pod(\S+)')
+re_path_uid = re.compile(r'/pod([^/]+)/')
 re_blank = re.compile(r'^\s*$')
 re_comment = re.compile(r'^\s*[#!]')
 re_nonword = re.compile(r'^\s*\W')
@@ -350,6 +357,242 @@ def calc_normal_memory_nodes():
     return normal_nodes
 
 
+def get_cgroups_procs_paths(directory):
+    """Recursively list a cgroup directory and all of its sub-directories.
+
+    Args:
+        directory: path of the directory to start from.
+
+    Returns:
+        List of paths starting with 'directory' itself, followed by every
+        directory below it. Empty if 'directory' is not a directory.
+    """
+
+    paths = []
+    if os.path.isdir(directory):
+        # os.walk() yields nothing if the directory vanished after the
+        # isdir() check, so give next() a default instead of raising.
+        entry = next(os.walk(directory), None)
+        if entry is None:
+            return paths
+        current_path, directories, _ = entry
+        paths.append(current_path)
+        for new_directory in directories:
+            child_path = os.path.join(directory, new_directory)
+            paths.extend(get_cgroups_procs_paths(child_path))
+    return paths
+
+
+def get_cgroup_pid(path):
+    """Get the PIDs listed in the cgroup.procs file of a cgroup path.
+
+    Args:
+        path: path of the cgroup directory.
+
+    Returns:
+        List of PIDs as strings without trailing newline. Empty if the
+        file cannot be read.
+    """
+
+    pids = []
+
+    proc = '/'.join([path, MEMORY_PIDS])
+    try:
+        with open(proc, 'r') as fd:
+            for line in fd:
+                if re_pid.search(line):
+                    pids.append(line.strip())
+    except IOError:
+        # Silently ignore IO errors. It is likely the cgroup disappeared.
+        pass
+    return pids
+
+
+def format_iec(size, precision=2):
+    """Converts a size in KiB to a string using IEC standard size units.
+
+    KiB, MiB, GiB, TiB.
+
+    Args:
+        size: size in KiB
+        precision: Number of decimal places
+
+    Returns:
+        Size: Format String, e.g. '1.50 MiB'
+    """
+    # Values from VmRSS are already in KiB, so the thresholds are too
+    mi = 1024.0
+    gi = 1024.0 * 1024.0
+    ti = 1024.0 * 1024.0 * 1024.0
+
+    decimal = "." + str(precision) + "f"
+
+    if size >= ti:
+        return '%s %s' % (format(size / ti, decimal), "TiB")
+    elif size >= gi:
+        return '%s %s' % (format(size / gi, decimal), "GiB")
+    elif size >= mi:
+        return '%s %s' % (format(size / mi, decimal), "MiB")
+    else:
+        return '%s %s' % (format(size, decimal), "KiB")
+
+
+def get_pid_name(pid):
+    """Returns the process name from /proc/{pid}/comm
+
+    Args:
+        pid: Int or Str
+
+    Returns:
+        name: Str, empty if the file cannot be read
+    """
+
+    name = ""
+    try:
+        pid_path = '/proc/{}/comm'.format(str(pid).strip('\n'))
+        with open(pid_path, 'r') as comm:
+            name = comm.readline()
+    except IOError:
+        # Silently ignore IO errors. It is likely the process exited.
+        pass
+
+    return name.strip('\n')
+
+
+def get_pid_rss(pid):
+    """Get memory usage per pid based on /proc/{pid}/status
+
+    Args:
+        pid: the pid to retrieve the value from.
+
+    Returns:
+        VmRSS value as a float (kB as reported in the status file), or 0.0
+        if there is no VmRSS entry or the file cannot be read.
+    """
+
+    memory = 0.0
+    try:
+        pid_path = '/proc/{}/status'.format(str(pid).strip('\n'))
+        with open(pid_path, 'r') as status:
+            for line in status:
+                # e.g. "VmRSS:      1234 kB"
+                if line.startswith('VmRSS:'):
+                    memory = float(line.split()[1])
+                    break
+    except (IOError, IndexError, ValueError):
+        # Silently ignore errors. It is likely the process exited.
+        pass
+    return memory
+
+
+def _get_pids_usage(pids):
+    """Get the name and rss of every pid that has resident memory.
+
+    Args:
+        pids: list of pids, as returned by get_cgroup_pid()
+
+    Returns:
+        usage: dict {pid (int): {'rss': float, 'name': str}}
+    """
+
+    usage = {}
+    for pid in pids:
+        rss = get_pid_rss(pid)
+        if rss > 0:
+            usage[int(pid)] = {'rss': float(rss),
+                               'name': str(get_pid_name(pid))}
+    return usage
+
+
+def get_platform_memory_per_process():
+    """Get memory usage per pid based on cgroup hierarchy.
+
+    Returns:
+        memory: dict with a single pc.GROUP_PROCESSES entry that holds one
+            dict per group. Platform, k8s-system and overall map a pid to
+            {'rss', 'name'}; pods map a pod uid to such a per-pid dict.
+            The k8s-addon group is returned empty.
+    """
+
+    platform_pids = []
+    k8s_system_pids = []
+    memory = {
+        pc.GROUP_PROCESSES: {
+            pc.GROUP_PODS: {},
+            pc.GROUP_PLATFORM: {},
+            pc.GROUP_K8S_SYSTEM: {},
+            pc.GROUP_K8S_ADDON: {},
+            pc.GROUP_OVERALL: {}
+        }
+    }
+
+    platform = memory[pc.GROUP_PROCESSES][pc.GROUP_PLATFORM]
+    k8s_system = memory[pc.GROUP_PROCESSES][pc.GROUP_K8S_SYSTEM]
+    pod_group = memory[pc.GROUP_PROCESSES][pc.GROUP_PODS]
+
+    # Pids attached directly to the root memory cgroup
+    platform_pids.extend(get_cgroup_pid(MEMCONT))
+
+    # Walk the first level cgroups and get the pids
+    # (e.g., docker, k8s-infra, user.slice, system.slice, machine.slice)
+    # os.walk() yields nothing for a missing path, hence the default.
+    for directory in next(os.walk(MEMCONT), (None, [], []))[1]:
+        if directory != str(pc.K8S_ROOT):
+            cg_path = '/'.join([MEMCONT, directory])
+            for cgroup in get_cgroups_procs_paths(cg_path):
+                platform_pids.extend(get_cgroup_pid(cgroup))
+
+    # Walk the kubepods hierarchy to the pod level and get the pids.
+    # We can safely ignore reading this if the path does not exist.
+    # The path won't exist on non-K8S nodes. The path is created as part of
+    # kubernetes configuration.
+    k8s_path = '/'.join([MEMCONT, pc.K8S_ROOT])
+    for directory in next(os.walk(k8s_path), (None, [], []))[1]:
+        cg_path = '/'.join([k8s_path, directory])
+        for cgroup in get_cgroups_procs_paths(cg_path):
+            if '/pod' in cgroup:
+                match = re_path_uid.search(cgroup)
+                if match:
+                    # Only the pids of this cgroup belong to this pod uid
+                    usage = _get_pids_usage(get_cgroup_pid(cgroup))
+                    if usage:
+                        uid = match.group(1)
+                        pod_group.setdefault(uid, {}).update(usage)
+            else:
+                k8s_system_pids.extend(get_cgroup_pid(cgroup))
+
+    platform.update(_get_pids_usage(platform_pids))
+    k8s_system.update(_get_pids_usage(k8s_system_pids))
+
+    # This returns the system overall process and stores it into a dict.
+    # Every pod is included, and it works when no pod was found at all.
+    memory[pc.GROUP_PROCESSES][pc.GROUP_OVERALL] = dict(itertools.chain(
+        platform.items(), k8s_system.items(),
+        *[pod.items() for pod in pod_group.values()]))
+
+    return memory
+
+
+def output_top_10_pids(pid_dict, message):
+    """Outputs the top 10 pids with the formatted message.
+
+    Args:
+        pid_dict: Dict The Dictionary of PIDs with Name and RSS
+        message: Formatted String, the template message to be output.
+    """
+
+    # Check that pid_dict has values
+    if not pid_dict:
+        return
+    # Sort the entries based on Rss value from highest to lowest.
+    sorted_pids = sorted(pid_dict.values(), key=lambda x: x['rss'],
+                         reverse=True)
+    # Keep the top 10 entries as (name, formatted rss) tuples
+    proc = [(entry.get('name'), format_iec(entry.get('rss')))
+            for entry in sorted_pids[:10]]
+    collectd.info(message % (str(proc)))
+
+
 def config_func(config):
     """Configure the memory usage plugin."""
 
@@ -486,6 +729,46 @@ def read_func():
         # K8S platform addons usage, i.e., non-essential: monitor, openstack
         if pod.namespace in pc.K8S_NAMESPACE_ADDON:
             memory[pc.GROUP_OVERALL][pc.GROUP_K8S_ADDON] += MiB
+    # Limit output to every 5 minutes and after 29 seconds to avoid duplication
+    now = datetime.datetime.now()
+    if now.minute % 5 == 0 and now.second > 29:
+        # Populate the memory per process dictionary to output results
+        pids = get_platform_memory_per_process()
+
+        platform = pids[pc.GROUP_PROCESSES][pc.GROUP_PLATFORM]
+        group_pods = pids[pc.GROUP_PROCESSES][pc.GROUP_PODS]
+        k8s_system = pids[pc.GROUP_PROCESSES][pc.GROUP_K8S_SYSTEM]
+        k8s_addon = pids[pc.GROUP_PROCESSES][pc.GROUP_K8S_ADDON]
+        overall = pids[pc.GROUP_PROCESSES][pc.GROUP_OVERALL]
+
+        # Using pod UID to determine output label memory usage by K8S pod
+        # per process
+        for uid in group_pods:
+            if uid in obj._cache:
+                pod = obj._cache[uid]
+            else:
+                collectd.warning('%s: uid %s not found' % (PLUGIN, uid))
+                continue
+
+            # K8S platform system usage i.e.,kube-system, armada, etc.
+            if pod.namespace in pc.K8S_NAMESPACE_SYSTEM:
+                k8s_system.update(group_pods[uid])
+
+            # K8S platform addons usage, i.e., non-essential: monitor, openstack
+            if pod.namespace in pc.K8S_NAMESPACE_ADDON:
+                k8s_addon.update(group_pods[uid])
+
+        message = 'The top 10 memory rss processes for the platform are : %s'
+        output_top_10_pids(platform, message)
+
+        message = 'The top 10 memory rss processes for the Kubernetes System are :%s'
+        output_top_10_pids(k8s_system, message)
+
+        message = 'The top 10 memory rss processes Kubernetes Addon are :%s'
+        output_top_10_pids(k8s_addon, message)
+
+        message = 'The top 10 memory rss processes overall are :%s'
+        output_top_10_pids(overall, message)
 
     # Calculate base memory usage (i.e., normal memory, exclude K8S and VMs)
     # e.g., docker, system.slice, user.slice
diff --git a/collectd-extensions/src/plugin_common.py b/collectd-extensions/src/plugin_common.py
index 5c727b9..f98693c 100644
--- a/collectd-extensions/src/plugin_common.py
+++ b/collectd-extensions/src/plugin_common.py
@@ -39,7 +39,7 @@ K8S_MODULE_MAJOR_VERSION = int(K8S_MODULE_VERSION.split('.')[0])
 KUBELET_CONF = '/etc/kubernetes/kubelet.conf'
 SSL_TLS_SUPPRESS = True
 
-# Standard units conversion parameters (mebi, kibi)
+# Standard units' conversion parameters (mebi, kibi)
 # Reference: https://en.wikipedia.org/wiki/Binary_prefix
 Mi = 1048576
 Ki = 1024
@@ -65,6 +65,7 @@ GROUP_PLATFORM = 'platform'
 GROUP_BASE = 'base'
 GROUP_K8S_SYSTEM = 'kube-system'
 GROUP_K8S_ADDON = 'kube-addon'
+GROUP_PROCESSES = 'cgroup-processes'
 
 # Groups included in platform - this excludes apps
 PLATFORM_GROUPS = [GROUP_BASE, GROUP_K8S_SYSTEM]
@@ -220,7 +221,7 @@ class PluginObject(object):
     # Name       : node_ready
     #
     # Description: Test for node ready condition.
-    #              Currently that's just a thresholded count
+    #              Currently, that's just a threshold count
     #
     # Parameters : plugin name
     #
@@ -586,7 +587,7 @@ class PluginObject(object):
     #
     # Name       : make_http_request
     #
-    # Description: Issue an http request to the specified URL.
+    # Description: Issue an HTTP request to the specified URL.
     #              Load and return the response
     #              Handling execution errors
     #