Update collectd cpu plugin and monitor-tools to diagnose cpu spikes

The collectd cpu plugin and monitor-tools are updated to
support diagnosing high cpu usage on shorter time scale.
This includes tools that assist SystemEngineering determine
the source where CPU time is coming from.

This collectd cpu plugin is updated to support Kubernetes services
under system.slice or k8splatform.slice.

This changes the frequency of read function sampling to 1 second.
We now see logs with instantaneous cpu spikes at the cgroup level.
This dispatch of results still occurs at the original plugin
interval of 30 seconds.  The logging of the 1 second sampling is
configurable via /etc/collectd.d/starlingx/python_plugins.conf
field 'hires = <true|false>. The hiresolution samples are always
collected and used for a histogram, but it is not always desired
to log this due to the volume of output.

This adds new logs for occupancy wait. This is similar to cpu
occupancy, but instead of realtime used, it measures the aggregate
percent of time a given cgroup is waiting to schedule. This is a
measure of CPU contention.

This adds new logs for occupancy histograms for all cgroups and
aggregated groupings based on the 1 second occupancy samples.
The histograms are displayed in hirunner order. This displays
the histogram, the mean, 95th-percentile, and max value.
The histograms are logged at 5 minute intervals.

This reduces collectd cgroup to 256 CPUShare from (1024).
This smoothes out behaviour of poorly behaved audits.

The 'schedtop' tool is updated to display 'cgroup' field. This
is the systemd cgroup name, or abbrieviated pod-name. This also
handles Kernel sched output format changes for 6.6.

New tool 'portscanner' is added to monitor-tools to diagnose
local host processes that are using specific ports. This has been
instrumental in discovering gunicorn/keystone API users.

New tool 'k8smetrics' is added to monitor-tools to display
the delay histogram and percentiles for kube-apiserver and
etdcserver. This gives a way to quantify performance as
a result of system load.

Partial-Bug: 2084714

TEST PLAN:
AIO-SX, AIO-DX, Standard, Storage, DC:
PASS: Fresh install ISO
PASS: Verify /var/log/collectd.logs for 1 second cpu/wait logs,
      and contains: etcd, kubelet, and containerd services.
PASS: Verify we are dispatching at 30 second granularity.
PASS: Verify we are displaying histograms every 5 minutes.
PASS: Verify we can enable/disable the display of hiresolution
      logs with /etc/collectd.d/starlingx/python_plugins.conf
      field 'hires = <true|false>'.
PASS: Verify schedtop contains 'cgroup' output.
PASS: Verify output from 'k8smetrics'.
      Cross check against Prometheus GUI for apiserver percentile.
PASS: Verify output from portscanner with port 5000.
      Verify 1-to-1 mapping against /var/log/keystone/keystone-all.log.

Change-Id: I82d4f414afdf1cecbcc99680b360cbad702ba140
Signed-off-by: Jim Gauld <James.Gauld@windriver.com>
This commit is contained in:
Jim Gauld 2024-10-16 18:14:13 -04:00
parent 8f404ea66c
commit 0232b8b9dc
16 changed files with 2270 additions and 270 deletions

View File

@ -12,5 +12,9 @@ ExecStart=/usr/sbin/collectd
ExecStartPost=/bin/bash -c 'echo $MAINPID > /var/run/collectd.pid'
ExecStopPost=/bin/rm -f /var/run/collectd.pid
# cgroup performance engineering
# - smooth out CPU impulse from poorly behaved plugin
CPUShares=256
[Install]
WantedBy=multi-user.target

View File

@ -1,5 +1,5 @@
#
# Copyright (c) 2018-2021 Wind River Systems, Inc.
# Copyright (c) 2018-2024 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
@ -17,6 +17,7 @@
############################################################################
import collectd
import copy
import numpy as np
import os
import plugin_common as pc
import re
@ -26,8 +27,13 @@ import tsconfig.tsconfig as tsc
from kubernetes.client.rest import ApiException
PLUGIN = 'platform cpu usage plugin'
#PLUGIN = 'platform cpu usage plugin'
PLUGIN = 'platform cpu'
PLUGIN_HISTOGRAM = 'histogram'
PLUGIN_DEBUG = 'DEBUG platform cpu'
PLUGIN_HIRES_INTERVAL = 1 # hi-resolution sample interval in secs
PLUGIN_DISPATCH_INTERVAL = 30 # dispatch interval in secs
PLUGIN_HISTOGRAM_INTERVAL = 300 # histogram interval in secs
TIMESTAMP = 'timestamp'
PLATFORM_CPU_PERCENT = 'platform-occupancy'
@ -42,25 +48,38 @@ SCHEDSTAT = '/proc/schedstat'
CPUACCT = pc.CGROUP_ROOT + '/cpuacct'
CPUACCT_USAGE = 'cpuacct.usage'
CPUACCT_USAGE_PERCPU = 'cpuacct.usage_percpu'
CPU_STAT = 'cpu.stat'
# Common regex pattern match groups
re_uid = re.compile(r'^pod(\S+)')
re_processor = re.compile(r'^[Pp]rocessor\s+:\s+(\d+)')
re_schedstat = re.compile(r'^cpu(\d+)\s+\d+\s+\d+\s+\d+\s+\d+\s+\d+\s+\d+\s+(\d+)\s+')
re_schedstat = re.compile(r'^cpu(\d+)\s+\d+\s+\d+\s+\d+\s+\d+\s+\d+\s+\d+\s+(\d+)\s+(\d+)\s+')
re_schedstat_version = re.compile(r'^version\s+(\d+)')
re_keyquoteval = re.compile(r'^\s*(\S+)\s*[=:]\s*\"(\S+)\"\s*')
re_cpu_wait_sum = re.compile(r'^wait_sum\s+(\d+)')
# hirunner minimum cpu occupancy threshold
HIRUNNER_MINIMUM_CPU_PERCENT = 0.1
# Set numpy format for printing bins
np.set_printoptions(formatter={'int': '{: 4d}'.format})
# Plugin specific control class and object.
class CPU_object(pc.PluginObject):
def __init__(self):
super(CPU_object, self).__init__(PLUGIN, '')
# CPU Plugin flags
self.dispatch = False # print occupancy and dispatch this sample
self.histogram = False # print occupancy histogram this sample
# CPU plugin configurable settings
self.debug = True
self.verbose = True
self.hires = False
# Cache Kubernetes pods data
self._cache = {}
self._k8s_client = pc.K8sClient()
self.k8s_pods = set()
@ -69,15 +88,50 @@ class CPU_object(pc.PluginObject):
self.schedstat_supported = True
self.number_platform_cpus = 0
# Platform CPU monitor
now = time.time() # epoch time in floating seconds
self._t0 = {} # cputime state info at start of sample interval
self._t0[TIMESTAMP] = now
self._t0_cpuacct = {}
self._data = {} # derived measurements at end of sample interval
self._data[PLATFORM_CPU_PERCENT] = 0.0
self.elapsed_ms = 0.0
# CPU State information at start of dispatch interval
self.d_t0 = {} # per-cpu cputime at dispatch time 0
self.d_w0 = {} # per-cpu cpuwait at dispatch time 0
self.d_t0[TIMESTAMP] = now # timestamp dispatch time 0
self.d_w0[TIMESTAMP] = now # timestamp dispatch time 0
self.d_t0_cpuacct = {} # per-cgroup cpuacct at dispatch time 0
self.d_t0_cpuwait = {} # per-cgroup cpuwait at dispatch time 0
# Derived measurements over dispatch interval
self.d_occ = {} # dispatch occupancy per cgroup or derived aggregate
self.d_occw = {} # dispatch occupancy wait per cgroup or derived aggregate
self.d_occ[PLATFORM_CPU_PERCENT] = 0.0 # dispatch platform occupancy
self.d_occw[PLATFORM_CPU_PERCENT] = 0.0 # dispatch platform occupancy wait
for g in pc.OVERALL_GROUPS:
self.d_occ[g] = 0.0
self.d_occw[g] = 0.0
self.d_elapsed_ms = 0.0 # dispatch elapsed time
# CPU State information at start of read sample interval
self._t0 = {} # per-cpu cputime at time 0
self._w0 = {} # per-cpu cpuwait at time 0
self._t0[TIMESTAMP] = now # timestamp time 0
self._w0[TIMESTAMP] = now # timestamp time 0
self._t0_cpuacct = {} # per-cgroup cpuacct at time 0
self._t0_cpuwait = {} # per-cgroup cpuwait at time 0
# Derived measurements over read sample interval
self._occ = {} # occupancy per cgroup or derived aggregate
self._occw = {} # occupancy wait per cgroup or derived aggregate
self._occ[PLATFORM_CPU_PERCENT] = 0.0 # platform occupancy
self._occw[PLATFORM_CPU_PERCENT] = 0.0 # platform occupancy wait
for g in pc.OVERALL_GROUPS:
self._occ[g] = 0.0
self._occw[g] = 0.0
self.elapsed_ms = 0.0 # elapsed time
# Derived measurements over histogram interval
self.hist_t0 = now # histogram timestamp time 0
self.hist_elapsed_ms = 0.0 # histogram elapsed time
self.hist_occ = {} # histogram bin counts per cgroup or derived aggregate
self.shared_bins = np.histogram_bin_edges(
np.array([0, 100], dtype=np.float64), bins=10, range=(0, 100))
# Instantiate the class
@ -87,13 +141,17 @@ obj = CPU_object()
def read_schedstat():
"""Read current hiresolution times per cpu from /proc/schedstats.
Return dictionary of cputimes in nanoseconds per cpu.
Return dictionary of cputimes in nanoseconds per cpu,
dictionary of cpuwaits in nanoseconds per cpu.
"""
cputime = {}
cpuwait = {}
# Obtain cumulative cputime (nanoseconds) from 7th field of
# /proc/schedstat. This is the time running tasks on this cpu.
# Obtain cumulative cputime (nanoseconds) from 7th field,
# and cumulative cpuwait (nanoseconds) from 8th field,
# from /proc/schedstat. This is the time running and waiting
# for tasks on this cpu.
try:
with open(SCHEDSTAT, 'r') as f:
for line in f:
@ -101,11 +159,13 @@ def read_schedstat():
if match:
k = int(match.group(1))
v = int(match.group(2))
w = int(match.group(3))
cputime[k] = v
cpuwait[k] = w
except Exception as err:
collectd.error('%s Cannot read schedstat, error=%s' % (PLUGIN, err))
return cputime
return cputime, cpuwait
def get_logical_cpus():
@ -202,8 +262,36 @@ def get_cgroup_cpuacct(path, cpulist=None):
return acct
def get_cgroup_cpu_wait_sum(path):
"""Get cgroup cpu.stat wait_sum usage for a specific cgroup path.
This represents the aggregate of all tasks wait time cfs_rq.
This tells us how suffering a task group is in the fight of
cpu resources.
Returns cumulative wait_sum in nanoseconds.
"""
wait_sum = 0
# Get the aggregate wait_sum for all cpus
fstat = '/'.join([path, CPU_STAT])
try:
with open(fstat, 'r') as f:
for line in f:
match = re_cpu_wait_sum.search(line)
if match:
v = int(match.group(1))
wait_sum = int(v)
except IOError:
# Silently ignore IO errors. It is likely the cgroup disappeared.
pass
return wait_sum
def get_cpuacct():
"""Get cpuacct usage based on cgroup hierarchy."""
"""Get cpuacct usage and wait_sum based on cgroup hierarchy."""
cpuacct = {}
cpuacct[pc.GROUP_OVERALL] = {}
@ -211,48 +299,86 @@ def get_cpuacct():
cpuacct[pc.GROUP_PODS] = {}
cpuacct[pc.CGROUP_SYSTEM] = {}
cpuacct[pc.CGROUP_USER] = {}
cpuacct[pc.CGROUP_INIT] = {}
cpuacct[pc.CGROUP_K8SPLATFORM] = {}
cpuwait = {}
cpuwait[pc.GROUP_OVERALL] = {}
cpuwait[pc.GROUP_FIRST] = {}
cpuwait[pc.GROUP_PODS] = {}
cpuwait[pc.CGROUP_SYSTEM] = {}
cpuwait[pc.CGROUP_USER] = {}
cpuwait[pc.CGROUP_INIT] = {}
cpuwait[pc.CGROUP_K8SPLATFORM] = {}
exclude_types = ['.mount']
# Overall cpuacct usage
acct = get_cgroup_cpuacct(CPUACCT, cpulist=obj.cpu_list)
wait = get_cgroup_cpu_wait_sum(CPUACCT)
cpuacct[pc.GROUP_OVERALL][pc.GROUP_TOTAL] = acct
cpuwait[pc.GROUP_OVERALL][pc.GROUP_TOTAL] = wait
# Initialize 'overhead' time (derived measurement). This will contain
# the remaining cputime not specifically tracked by first-level cgroups.
cpuacct[pc.GROUP_OVERALL][pc.GROUP_OVERHEAD] = acct
cpuwait[pc.GROUP_OVERALL][pc.GROUP_OVERHEAD] = wait
# Walk the first level cgroups and get cpuacct usage
# (e.g., docker, k8s-infra, user.slice, system.slice, machine.slice)
dir_list = next(os.walk(CPUACCT))[1]
for name in dir_list:
if any(name.endswith(x) for x in ['.mount', '.scope']):
if any(name.endswith(x) for x in exclude_types):
continue
cg_path = '/'.join([CPUACCT, name])
acct = get_cgroup_cpuacct(cg_path, cpulist=obj.cpu_list)
wait = get_cgroup_cpu_wait_sum(cg_path)
cpuacct[pc.GROUP_FIRST][name] = acct
cpuwait[pc.GROUP_FIRST][name] = wait
# Subtract out first-level cgroups. The remaining cputime represents
# systemd 'init' pid and kthreads on Platform cpus.
cpuacct[pc.GROUP_OVERALL][pc.GROUP_OVERHEAD] -= acct
cpuwait[pc.GROUP_OVERALL][pc.GROUP_OVERHEAD] -= wait
# Walk the system.slice cgroups and get cpuacct usage
path = '/'.join([CPUACCT, pc.CGROUP_SYSTEM])
dir_list = next(os.walk(path))[1]
for name in dir_list:
if any(name.endswith(x) for x in ['.mount', '.scope']):
if any(name.endswith(x) for x in exclude_types):
continue
cg_path = '/'.join([path, name])
acct = get_cgroup_cpuacct(cg_path, cpulist=obj.cpu_list)
wait = get_cgroup_cpu_wait_sum(cg_path)
cpuacct[pc.CGROUP_SYSTEM][name] = acct
cpuwait[pc.CGROUP_SYSTEM][name] = wait
# Walk the system.slice cgroups and get cpuacct usage
path = '/'.join([CPUACCT, pc.CGROUP_K8SPLATFORM])
if os.path.isdir(path):
dir_list = next(os.walk(path))[1]
else:
dir_list = []
for name in dir_list:
if any(name.endswith(x) for x in exclude_types):
continue
cg_path = '/'.join([path, name])
acct = get_cgroup_cpuacct(cg_path, cpulist=obj.cpu_list)
wait = get_cgroup_cpu_wait_sum(cg_path)
cpuacct[pc.CGROUP_K8SPLATFORM][name] = acct
cpuwait[pc.CGROUP_K8SPLATFORM][name] = wait
# Walk the user.slice cgroups and get cpuacct usage
path = '/'.join([CPUACCT, pc.CGROUP_USER])
dir_list = next(os.walk(path))[1]
for name in dir_list:
if any(name.endswith(x) for x in ['.mount', '.scope']):
if any(name.endswith(x) for x in exclude_types):
continue
cg_path = '/'.join([path, name])
acct = get_cgroup_cpuacct(cg_path, cpulist=obj.cpu_list)
wait = get_cgroup_cpu_wait_sum(cg_path)
cpuacct[pc.CGROUP_USER][name] = acct
cpuwait[pc.CGROUP_USER][name] = wait
# Walk the kubepods hierarchy to the pod level and get cpuacct usage.
# We can safely ignore reading this if the path does not exist.
@ -268,8 +394,357 @@ def get_cpuacct():
uid = match.group(1)
cg_path = os.path.join(root, name)
acct = get_cgroup_cpuacct(cg_path)
wait = get_cgroup_cpu_wait_sum(cg_path)
cpuacct[pc.GROUP_PODS][uid] = acct
return cpuacct
cpuwait[pc.GROUP_PODS][uid] = wait
return cpuacct, cpuwait
def calculate_occupancy(
prefix, hires, dispatch,
cache,
t0, t1,
w0, w1,
t0_cpuacct, t1_cpuacct,
t0_cpuwait, t1_cpuwait,
occ, occw,
elapsed_ms,
number_platform_cpus,
cpu_list, debug):
"""Calculate average occupancy and wait for platform cpus and cgroups.
This calculates:
- per-cpu cputime delta between time 0 and time 1 (ms)
- per-cpu cpuwait delta between time 0 and time 1 (ms)
- average platform occupancy based on cputime (%)
- average platform occupancy wait based on cpuwait (%)
- per-cgroup cpuacct delta between time 0 and time 1
- per-cgroup cpuwait delta between time 0 and time 1
- average per-cgroup occupancy based on cpuacct (%)
- average per-cgroup occupancy wait based on cpuwait (%)
- aggregate occupancy of specific cgroup groupings (%)
- aggregate occupancy wait of specific cgroup groupings (%)
This logs platform occupancy and aggregate cgroup groupings.
This logs of hirunner occupancy for base cgroups.
"""
# Aggregate cputime and cpuwait delta for platform logical cpus
cputime_ms = 0.0
cpuwait_ms = 0.0
for cpu in cpu_list:
# Paranoia check, we should never hit this.
if cpu not in t0 or cpu not in w0:
collectd.error('%s cputime initialization error' % (PLUGIN))
break
cputime_ms += float(t1[cpu] - t0[cpu])
cpuwait_ms += float(w1[cpu] - w0[cpu])
cputime_ms /= float(pc.ONE_MILLION)
cpuwait_ms /= float(pc.ONE_MILLION)
# Calculate average occupancy and wait of platform logical cpus
p_occ = 0.0
p_occw = 0.0
if number_platform_cpus > 0 and elapsed_ms > 0:
p_occ = float(pc.ONE_HUNDRED) * float(cputime_ms) \
/ float(elapsed_ms) / number_platform_cpus
p_occw = float(pc.ONE_HUNDRED) * float(cpuwait_ms) \
/ float(elapsed_ms) / number_platform_cpus
else:
p_occ = 0.0
p_occw = 0.0
if debug:
collectd.info('%s %s %s elapsed = %.1f ms, '
'cputime = %.1f ms, cpuwait = %.1f ms, '
'n_cpus = %d, '
'occupancy = %.2f %%, wait = %.2f %%'
% (PLUGIN_DEBUG,
prefix,
PLATFORM_CPU_PERCENT,
elapsed_ms,
cputime_ms, cpuwait_ms,
number_platform_cpus,
p_occ, p_occw))
occ[PLATFORM_CPU_PERCENT] = p_occ
occw[PLATFORM_CPU_PERCENT] = p_occw
# Calculate cpuacct and cpuwait delta for cgroup hierarchy, dropping transient cgroups
cpuacct = {}
for i in t1_cpuacct.keys():
cpuacct[i] = {}
for k, v in t1_cpuacct[i].items():
if i in t0_cpuacct.keys() and k in t0_cpuacct[i].keys():
cpuacct[i][k] = v - t0_cpuacct[i][k]
else:
cpuacct[i][k] = v
cpuwait = {}
for i in t1_cpuwait.keys():
cpuwait[i] = {}
for k, v in t1_cpuwait[i].items():
if i in t0_cpuwait.keys() and k in t0_cpuwait[i].keys():
cpuwait[i][k] = v - t0_cpuwait[i][k]
else:
cpuwait[i][k] = v
# Summarize cpuacct usage for various groupings we aggregate
for g in pc.GROUPS_AGGREGATED:
cpuacct[pc.GROUP_OVERALL][g] = 0.0
cpuwait[pc.GROUP_OVERALL][g] = 0.0
# Aggregate cpuacct usage by K8S pod
for uid in cpuacct[pc.GROUP_PODS]:
acct = cpuacct[pc.GROUP_PODS][uid]
wait = cpuwait[pc.GROUP_PODS][uid]
if uid in cache:
pod = cache[uid]
else:
collectd.warning('%s uid %s not found' % (PLUGIN, uid))
continue
# K8S platform system usage, i.e., essential: kube-system
# check for component label app.starlingx.io/component=platform
if pod.is_platform_resource():
cpuacct[pc.GROUP_OVERALL][pc.GROUP_K8S_SYSTEM] += acct
cpuwait[pc.GROUP_OVERALL][pc.GROUP_K8S_SYSTEM] += wait
# K8S platform addons usage, i.e., non-essential: monitor, openstack
if pod.namespace in pc.K8S_NAMESPACE_ADDON:
cpuacct[pc.GROUP_OVERALL][pc.GROUP_K8S_ADDON] += acct
cpuwait[pc.GROUP_OVERALL][pc.GROUP_K8S_ADDON] += wait
# Calculate base cpuacct usage (i.e., base tasks, exclude K8S and VMs)
# e.g., docker, system.slice, user.slice, init.scope
for name in cpuacct[pc.GROUP_FIRST].keys():
if name in pc.BASE_GROUPS:
cpuacct[pc.GROUP_OVERALL][pc.GROUP_BASE] += \
cpuacct[pc.GROUP_FIRST][name]
cpuwait[pc.GROUP_OVERALL][pc.GROUP_BASE] += \
cpuwait[pc.GROUP_FIRST][name]
elif name not in pc.BASE_GROUPS_EXCLUDE:
collectd.warning('%s could not find cgroup: %s' % (PLUGIN, name))
# Calculate system.slice container cpuacct usage
for g in pc.CONTAINERS_CGROUPS:
if g in cpuacct[pc.CGROUP_SYSTEM].keys():
cpuacct[pc.GROUP_OVERALL][pc.GROUP_CONTAINERS] += \
cpuacct[pc.CGROUP_SYSTEM][g]
cpuwait[pc.GROUP_OVERALL][pc.GROUP_CONTAINERS] += \
cpuwait[pc.CGROUP_SYSTEM][g]
if g in cpuacct[pc.CGROUP_K8SPLATFORM].keys():
cpuacct[pc.GROUP_OVERALL][pc.GROUP_CONTAINERS] += \
cpuacct[pc.CGROUP_K8SPLATFORM][g]
cpuwait[pc.GROUP_OVERALL][pc.GROUP_CONTAINERS] += \
cpuwait[pc.CGROUP_K8SPLATFORM][g]
# Calculate platform cpuacct usage (this excludes apps)
for g in pc.PLATFORM_GROUPS:
cpuacct[pc.GROUP_OVERALL][pc.GROUP_PLATFORM] += \
cpuacct[pc.GROUP_OVERALL][g]
cpuwait[pc.GROUP_OVERALL][pc.GROUP_PLATFORM] += \
cpuwait[pc.GROUP_OVERALL][g]
# Calculate cgroup based occupancy and wait for overall groupings
for g in pc.OVERALL_GROUPS:
cputime_ms = \
float(cpuacct[pc.GROUP_OVERALL][g]) / float(pc.ONE_MILLION)
g_occ = float(pc.ONE_HUNDRED) * float(cputime_ms) \
/ float(elapsed_ms) / number_platform_cpus
occ[g] = g_occ
cpuwait_ms = \
float(cpuwait[pc.GROUP_OVERALL][g]) / float(pc.ONE_MILLION)
g_occw = float(pc.ONE_HUNDRED) * float(cpuwait_ms) \
/ float(elapsed_ms) / number_platform_cpus
occw[g] = g_occw
if obj.debug:
collectd.info('%s %s %s elapsed = %.1f ms, '
'cputime = %.1f ms, cpuwait = %.1f ms, '
'n_cpus = %d, '
'occupancy = %.2f %%, wait = %.2f %%'
% (PLUGIN_DEBUG,
prefix,
g,
elapsed_ms,
cputime_ms, cpuwait_ms,
number_platform_cpus,
g_occ, g_occ))
# Store occupancy hirunners
h_occ = {}
h_occw = {}
# Calculate cgroup based occupancy for first-level groupings
for g in cpuacct[pc.GROUP_FIRST]:
cputime_ms = \
float(cpuacct[pc.GROUP_FIRST][g]) / float(pc.ONE_MILLION)
g_occ = float(pc.ONE_HUNDRED) * float(cputime_ms) \
/ float(elapsed_ms) / number_platform_cpus
occ[g] = g_occ
cpuwait_ms = \
float(cpuwait[pc.GROUP_FIRST][g]) / float(pc.ONE_MILLION)
g_occw = float(pc.ONE_HUNDRED) * float(cpuwait_ms) \
/ float(elapsed_ms) / number_platform_cpus
occw[g] = g_occw
if g != pc.CGROUP_INIT:
continue
# Keep hirunners exceeding minimum threshold.
if g_occ >= HIRUNNER_MINIMUM_CPU_PERCENT:
h_occ[g] = g_occ
if g_occw >= HIRUNNER_MINIMUM_CPU_PERCENT:
h_occw[g] = g_occw
# Calculate cgroup based occupancy for cgroups within system.slice.
for g in cpuacct[pc.CGROUP_SYSTEM]:
cputime_ms = \
float(cpuacct[pc.CGROUP_SYSTEM][g]) / float(pc.ONE_MILLION)
g_occ = float(pc.ONE_HUNDRED) * float(cputime_ms) \
/ float(elapsed_ms) / number_platform_cpus
occ[g] = g_occ
cpuwait_ms = \
float(cpuwait[pc.CGROUP_SYSTEM][g]) / float(pc.ONE_MILLION)
g_occw = float(pc.ONE_HUNDRED) * float(cpuwait_ms) \
/ float(elapsed_ms) / number_platform_cpus
occw[g] = g_occw
# Keep hirunners exceeding minimum threshold.
if g_occ >= HIRUNNER_MINIMUM_CPU_PERCENT:
h_occ[g] = g_occ
if g_occw >= HIRUNNER_MINIMUM_CPU_PERCENT:
h_occw[g] = g_occw
# Calculate cgroup based occupancy for cgroups within k8splatform.slice.
if pc.CGROUP_K8SPLATFORM in cpuacct.keys():
for g in cpuacct[pc.CGROUP_K8SPLATFORM]:
cputime_ms = \
float(cpuacct[pc.CGROUP_K8SPLATFORM][g]) / float(pc.ONE_MILLION)
g_occ = float(pc.ONE_HUNDRED) * float(cputime_ms) \
/ float(elapsed_ms) / number_platform_cpus
occ[g] = g_occ
cpuwait_ms = \
float(cpuwait[pc.CGROUP_K8SPLATFORM][g]) / float(pc.ONE_MILLION)
g_occw = float(pc.ONE_HUNDRED) * float(cpuwait_ms) \
/ float(elapsed_ms) / number_platform_cpus
occw[g] = g_occw
# Keep hirunners exceeding minimum threshold.
if g_occ >= HIRUNNER_MINIMUM_CPU_PERCENT:
h_occ[g] = g_occ
if g_occw >= HIRUNNER_MINIMUM_CPU_PERCENT:
h_occw[g] = g_occw
# Calculate cgroup based occupancy for cgroups within user.slice.
for g in cpuacct[pc.CGROUP_USER]:
cputime_ms = \
float(cpuacct[pc.CGROUP_USER][g]) / float(pc.ONE_MILLION)
g_occ = float(pc.ONE_HUNDRED) * float(cputime_ms) \
/ float(elapsed_ms) / number_platform_cpus
occ[g] = g_occ
cpuwait_ms = \
float(cpuwait[pc.CGROUP_USER][g]) / float(pc.ONE_MILLION)
g_occw = float(pc.ONE_HUNDRED) * float(cpuwait_ms) \
/ float(elapsed_ms) / number_platform_cpus
occw[g] = g_occw
# Keep hirunners exceeding minimum threshold.
if g_occ >= HIRUNNER_MINIMUM_CPU_PERCENT:
h_occ[g] = g_occ
if g_occw >= HIRUNNER_MINIMUM_CPU_PERCENT:
h_occw[g] = g_occw
if (hires and prefix == 'hires') or (dispatch and prefix == 'dispatch'):
# Print cpu occupancy usage for high-level groupings
collectd.info('%s %s Usage: %.1f%% (avg per cpu); '
'cpus: %d, Platform: %.1f%% '
'(Base: %.1f, k8s-system: %.1f), k8s-addon: %.1f, '
'%s: %.1f, %s: %.1f'
% (PLUGIN, prefix,
occ[PLATFORM_CPU_PERCENT],
number_platform_cpus,
occ[pc.GROUP_PLATFORM],
occ[pc.GROUP_BASE],
occ[pc.GROUP_K8S_SYSTEM],
occ[pc.GROUP_K8S_ADDON],
pc.GROUP_CONTAINERS,
occ[pc.GROUP_CONTAINERS],
pc.GROUP_OVERHEAD,
occ[pc.GROUP_OVERHEAD]))
# Print hirunner cpu occupancy usage for base cgroups
occs = ', '.join(
'{}: {:.1f}'.format(k.split('.', 1)[0], v) for k, v in sorted(
h_occ.items(), key=lambda t: -float(t[1]))
)
collectd.info('%s %s %s: %.1f%%; cpus: %d, (%s)'
% (PLUGIN,
prefix, 'Base usage',
occ[pc.GROUP_BASE],
number_platform_cpus,
occs))
# Print hirunner cpu wait for base cgroups
occws = ', '.join(
'{}: {:.1f}'.format(k.split('.', 1)[0], v) for k, v in sorted(
h_occw.items(), key=lambda t: -float(t[1]))
)
collectd.info('%s %s %s: %.1f%%; cpus: %d, (%s)'
% (PLUGIN,
prefix, 'Base wait',
occw[pc.GROUP_BASE],
number_platform_cpus,
occws))
def aggregate_histogram(histogram, occ, shared_bins, hist_occ, debug):
"""Aggregate occupancy histogram bins for platform cpus and cgroups.
This aggregates occupancy histogram bins for each key measurement.
When 'histogram' flag is True, this will:
- calculate mean, 95th-percentime, and max statistics, and bins
the measurements
- log histograms and statistics per measurement in hirunner order
"""
# Aggregate each key, value into histogram bins
for k, v in occ.items():
# Get abbreviated name (excludes: .service, .scope, .socket, .mount)
# eg, 'k8splatform.slice' will shorten to 'k8splatform'
key = k.split('.', 1)[0]
if key not in hist_occ:
hist_occ[key] = np.array([], dtype=np.float64)
if v is not None:
hist_occ[key] = np.append(hist_occ[key], v)
if histogram:
# Calculate histograms and statistics for each key measurement
H = {}
for k, v in hist_occ.items():
H[k] = {}
H[k]['count'] = hist_occ[k].size
if H[k]['count'] > 0:
H[k]['mean'] = np.mean(hist_occ[k])
H[k]['p95'] = np.percentile(hist_occ[k], 95)
H[k]['pmax'] = np.max(hist_occ[k])
H[k]['hist'], _ = np.histogram(hist_occ[k], bins=shared_bins)
else:
H[k]['mean'] = 0
H[k]['p95'] = 0.0
H[k]['pmax'] = 0.0
H[k]['hist'] = []
# Print out each histogram, sort by cpu occupancy hirunners
bins = ' '.join('{:4d}'.format(int(x)) for x in shared_bins[1:])
collectd.info('%s: %26.26s : bins=[%s]'
% (PLUGIN_HISTOGRAM, 'component', bins))
for k, v in sorted(H.items(), key=lambda t: -float(t[1]['mean'])):
if v['mean'] > HIRUNNER_MINIMUM_CPU_PERCENT:
collectd.info('%s: %26.26s : hist=%s : cnt: %3d, '
'mean: %5.1f %%, p95: %5.1f %%, max: %5.1f %%'
% (PLUGIN_HISTOGRAM, k, v['hist'], v['count'],
v['mean'], v['p95'], v['pmax']))
def update_cpu_data(init=False):
@ -287,23 +762,36 @@ def update_cpu_data(init=False):
# Calculate elapsed time delta since last run
obj.elapsed_ms = float(pc.ONE_THOUSAND) * (now - obj._t0[TIMESTAMP])
obj.d_elapsed_ms = float(pc.ONE_THOUSAND) * (now - obj.d_t0[TIMESTAMP])
obj.hist_elapsed_ms = float(pc.ONE_THOUSAND) * (now - obj.hist_t0)
# Prevent calling this routine too frequently (<= 1 sec)
if not init and obj.elapsed_ms <= 1000.0:
return
# Check whether this is a dispatch interval
if obj.d_elapsed_ms >= 1000.0 * PLUGIN_DISPATCH_INTERVAL:
obj.dispatch = True
# Check whether this is a histogram interval
if obj.hist_elapsed_ms >= 1000.0 * PLUGIN_HISTOGRAM_INTERVAL:
obj.histogram = True
t1 = {}
w1 = {}
t1[TIMESTAMP] = now
w1[TIMESTAMP] = now
if obj.schedstat_supported:
# Get current per-cpu cumulative cputime usage from /proc/schedstat.
cputimes = read_schedstat()
cputime, cpuwait = read_schedstat()
for cpu in obj.cpu_list:
t1[cpu] = cputimes[cpu]
t1[cpu] = cputime[cpu]
w1[cpu] = cpuwait[cpu]
else:
return
# Get current cpuacct usages based on cgroup hierarchy
t1_cpuacct = get_cpuacct()
# Get current cpuacct usages and wait_sum based on cgroup hierarchy
t1_cpuacct, t1_cpuwait = get_cpuacct()
# Refresh the k8s pod information if we have discovered new cgroups
cg_pods = set(t1_cpuacct[pc.GROUP_PODS].keys())
@ -350,154 +838,73 @@ def update_cpu_data(init=False):
del obj._cache[uid]
except ApiException:
# continue with remainder of calculations, keeping cache
collectd.warning("cpu plugin encountered kube ApiException")
collectd.warning('%s encountered kube ApiException' % (PLUGIN))
pass
# Save initial state information
if init:
obj.d_t0 = copy.deepcopy(t1)
obj.d_w0 = copy.deepcopy(w1)
obj.d_t0_cpuacct = copy.deepcopy(t1_cpuacct)
obj.d_t0_cpuwait = copy.deepcopy(t1_cpuwait)
obj._t0 = copy.deepcopy(t1)
obj._w0 = copy.deepcopy(w1)
obj._t0_cpuacct = copy.deepcopy(t1_cpuacct)
obj._t0_cpuwait = copy.deepcopy(t1_cpuwait)
return
# Aggregate cputime delta for platform logical cpus using integer math
cputime_ms = 0.0
for cpu in obj.cpu_list:
# Paranoia check, we should never hit this.
if cpu not in obj._t0:
collectd.error('%s cputime initialization error' % (PLUGIN))
break
cputime_ms += float(t1[cpu] - obj._t0[cpu])
cputime_ms /= float(pc.ONE_MILLION)
# Calculate average cpu occupancy for hi-resolution read sample
prefix = 'hires'
calculate_occupancy(
prefix, obj.hires, obj.dispatch,
obj._cache,
obj._t0, t1,
obj._w0, w1,
obj._t0_cpuacct, t1_cpuacct,
obj._t0_cpuwait, t1_cpuwait,
obj._occ, obj._occw,
obj.elapsed_ms,
obj.number_platform_cpus,
obj.cpu_list,
obj.debug)
# Calculate average occupancy of platform logical cpus
occupancy = 0.0
if obj.number_platform_cpus > 0 and obj.elapsed_ms > 0:
occupancy = float(pc.ONE_HUNDRED) * float(cputime_ms) \
/ float(obj.elapsed_ms) / obj.number_platform_cpus
else:
occupancy = 0.0
obj._data[PLATFORM_CPU_PERCENT] = occupancy
if obj.debug:
collectd.info('%s %s elapsed = %.1f ms, cputime = %.1f ms, '
'n_cpus = %d, occupancy = %.2f %%'
% (PLUGIN_DEBUG,
PLATFORM_CPU_PERCENT,
obj.elapsed_ms,
cputime_ms,
obj.number_platform_cpus,
occupancy))
# Aggregate occupancy histogram bins
aggregate_histogram(
obj.histogram, obj._occ, obj.shared_bins, obj.hist_occ, obj.debug)
# Calculate cpuacct delta for cgroup hierarchy, dropping transient cgroups
cpuacct = {}
for i in t1_cpuacct.keys():
cpuacct[i] = {}
for k, v in t1_cpuacct[i].items():
if i in obj._t0_cpuacct and k in obj._t0_cpuacct[i]:
cpuacct[i][k] = v - obj._t0_cpuacct[i][k]
else:
cpuacct[i][k] = v
# Clear histogram data for next interval
if obj.histogram:
obj.histogram = False
obj.hist_occ = {}
obj.hist_t0 = now
# Summarize cpuacct usage for various groupings we aggregate
for g in pc.GROUPS_AGGREGATED:
cpuacct[pc.GROUP_OVERALL][g] = 0.0
# Aggregate cpuacct usage by K8S pod
for uid in cpuacct[pc.GROUP_PODS]:
acct = cpuacct[pc.GROUP_PODS][uid]
if uid in obj._cache:
pod = obj._cache[uid]
else:
collectd.warning('%s uid %s not found' % (PLUGIN, uid))
continue
# K8S platform system usage, i.e., essential: kube-system
# check for component label app.starlingx.io/component=platform
if pod.is_platform_resource():
cpuacct[pc.GROUP_OVERALL][pc.GROUP_K8S_SYSTEM] += acct
# K8S platform addons usage, i.e., non-essential: monitor, openstack
if pod.namespace in pc.K8S_NAMESPACE_ADDON:
cpuacct[pc.GROUP_OVERALL][pc.GROUP_K8S_ADDON] += acct
# Calculate base cpuacct usage (i.e., base tasks, exclude K8S and VMs)
# e.g., docker, system.slice, user.slice
for name in cpuacct[pc.GROUP_FIRST]:
if name in pc.BASE_GROUPS:
cpuacct[pc.GROUP_OVERALL][pc.GROUP_BASE] += \
cpuacct[pc.GROUP_FIRST][name]
elif name not in pc.BASE_GROUPS_EXCLUDE:
collectd.warning('%s could not find cgroup: %s' % (PLUGIN, name))
# Calculate system.slice container cpuacct usage
for g in pc.CONTAINERS_CGROUPS:
if g in cpuacct[pc.CGROUP_SYSTEM]:
cpuacct[pc.GROUP_OVERALL][pc.GROUP_CONTAINERS] += \
cpuacct[pc.CGROUP_SYSTEM][g]
# Calculate platform cpuacct usage (this excludes apps)
for g in pc.PLATFORM_GROUPS:
cpuacct[pc.GROUP_OVERALL][pc.GROUP_PLATFORM] += \
cpuacct[pc.GROUP_OVERALL][g]
# Calculate cgroup based occupancy for overall groupings
for g in pc.OVERALL_GROUPS:
cputime_ms = \
float(cpuacct[pc.GROUP_OVERALL][g]) / float(pc.ONE_MILLION)
occupancy = float(pc.ONE_HUNDRED) * float(cputime_ms) \
/ float(obj.elapsed_ms) / obj.number_platform_cpus
obj._data[g] = occupancy
if obj.debug:
collectd.info('%s %s elapsed = %.1f ms, cputime = %.1f ms, '
'n_cpus = %d, occupancy = %.2f %%'
% (PLUGIN_DEBUG,
g,
obj.elapsed_ms,
cputime_ms,
obj.number_platform_cpus,
occupancy))
# Calculate cgroup based occupancy for first-level groupings
for g in cpuacct[pc.GROUP_FIRST]:
cputime_ms = \
float(cpuacct[pc.GROUP_FIRST][g]) / float(pc.ONE_MILLION)
occupancy = float(pc.ONE_HUNDRED) * float(cputime_ms) \
/ float(obj.elapsed_ms) / obj.number_platform_cpus
obj._data[g] = occupancy
# Calculate cgroup based occupancy for cgroups within
# system.slice and user.slice, keeping the hirunners
# exceeding minimum threshold.
occ = {}
for g in cpuacct[pc.CGROUP_SYSTEM]:
cputime_ms = \
float(cpuacct[pc.CGROUP_SYSTEM][g]) / float(pc.ONE_MILLION)
occupancy = float(pc.ONE_HUNDRED) * float(cputime_ms) \
/ float(obj.elapsed_ms) / obj.number_platform_cpus
obj._data[g] = occupancy
if occupancy >= HIRUNNER_MINIMUM_CPU_PERCENT:
occ[g] = occupancy
for g in cpuacct[pc.CGROUP_USER]:
cputime_ms = \
float(cpuacct[pc.CGROUP_USER][g]) / float(pc.ONE_MILLION)
occupancy = float(pc.ONE_HUNDRED) * float(cputime_ms) \
/ float(obj.elapsed_ms) / obj.number_platform_cpus
obj._data[g] = occupancy
if occupancy >= HIRUNNER_MINIMUM_CPU_PERCENT:
occ[g] = occupancy
occs = ', '.join(
'{}: {:.1f}'.format(k.split('.', 1)[0], v) for k, v in sorted(
occ.items(), key=lambda t: -float(t[1]))
)
collectd.info('%s %s: %.1f%%; cpus: %d, (%s)'
% (PLUGIN,
'Base usage',
obj._data[pc.GROUP_BASE],
obj.number_platform_cpus,
occs))
# Calculate average cpu occupancy for dispatch interval
if obj.dispatch:
prefix = 'dispatch'
calculate_occupancy(
prefix, obj.hires, obj.dispatch,
obj._cache,
obj.d_t0, t1,
obj.d_w0, w1,
obj.d_t0_cpuacct, t1_cpuacct,
obj.d_t0_cpuwait, t1_cpuwait,
obj.d_occ, obj.d_occw,
obj.d_elapsed_ms,
obj.number_platform_cpus,
obj.cpu_list,
obj.debug)
# Update t0 state for the next sample collection
obj._t0 = copy.deepcopy(t1)
obj._w0 = copy.deepcopy(w1)
obj._t0_cpuacct = copy.deepcopy(t1_cpuacct)
obj._t0_cpuwait = copy.deepcopy(t1_cpuwait)
if obj.dispatch:
obj.d_t0 = copy.deepcopy(t1)
obj.d_w0 = copy.deepcopy(w1)
obj.d_t0_cpuacct = copy.deepcopy(t1_cpuacct)
obj.d_t0_cpuwait = copy.deepcopy(t1_cpuwait)
def config_func(config):
@ -510,9 +917,11 @@ def config_func(config):
obj.debug = pc.convert2boolean(val)
elif key == 'verbose':
obj.verbose = pc.convert2boolean(val)
elif key == 'hires':
obj.hires = pc.convert2boolean(val)
collectd.info('%s debug=%s, verbose=%s'
% (PLUGIN, obj.debug, obj.verbose))
collectd.info('%s debug=%s, verbose=%s, hires=%s'
% (PLUGIN, obj.debug, obj.verbose, obj.hires))
return pc.PLUGIN_PASS
@ -598,55 +1007,41 @@ def read_func():
collectd.info('%s no cpus to monitor' % PLUGIN)
return pc.PLUGIN_PASS
# Gather current cputime state information, and calculate occupancy since
# this routine was last run.
# Gather current cputime state information, and calculate occupancy
# since this routine was last run.
update_cpu_data()
# Prevent dispatching measurements at plugin startup
if obj.elapsed_ms <= 1000.0:
if obj.elapsed_ms <= 500.0:
return pc.PLUGIN_PASS
if obj.verbose:
collectd.info('%s Usage: %.1f%% (avg per cpu); '
'cpus: %d, Platform: %.1f%% '
'(Base: %.1f, k8s-system: %.1f), k8s-addon: %.1f, '
'%s: %.1f, %s: %.1f'
% (PLUGIN, obj._data[PLATFORM_CPU_PERCENT],
obj.number_platform_cpus,
obj._data[pc.GROUP_PLATFORM],
obj._data[pc.GROUP_BASE],
obj._data[pc.GROUP_K8S_SYSTEM],
obj._data[pc.GROUP_K8S_ADDON],
pc.GROUP_CONTAINERS,
obj._data[pc.GROUP_CONTAINERS],
pc.GROUP_OVERHEAD,
obj._data[pc.GROUP_OVERHEAD]))
# Fault insertion code to assis in regression UT
#
# if os.path.exists('/var/run/fit/cpu_data'):
# with open('/var/run/fit/cpu_data', 'r') as infile:
# for line in infile:
# obj._data[PLATFORM_CPU_PERCENT] = float(line)
# obj._occ[PLATFORM_CPU_PERCENT] = float(line)
# collectd.info("%s using FIT data:%.2f" %
# (PLUGIN, obj._data[PLATFORM_CPU_PERCENT] ))
# (PLUGIN, obj._occ[PLATFORM_CPU_PERCENT] ))
# break
# Dispatch overall platform cpu usage percent value
val = collectd.Values(host=obj.hostname)
val.plugin = 'cpu'
val.type = 'percent'
val.type_instance = 'used'
val.dispatch(values=[obj._data[PLATFORM_CPU_PERCENT]])
if obj.dispatch:
# Dispatch overall platform cpu usage percent value
val = collectd.Values(host=obj.hostname)
val.plugin = 'cpu'
val.type = 'percent'
val.type_instance = 'used'
val.dispatch(values=[obj.d_occ[PLATFORM_CPU_PERCENT]])
# Dispatch grouped platform cpu usage values
val = collectd.Values(host=obj.hostname)
val.plugin = 'cpu'
val.type = 'percent'
val.type_instance = 'occupancy'
for g in pc.OVERALL_GROUPS:
val.plugin_instance = g
val.dispatch(values=[obj._data[g]])
# Dispatch grouped platform cpu usage values
val = collectd.Values(host=obj.hostname)
val.plugin = 'cpu'
val.type = 'percent'
val.type_instance = 'occupancy'
for g in pc.OVERALL_GROUPS:
val.plugin_instance = g
val.dispatch(values=[obj.d_occ[g]])
obj.dispatch = False
# Calculate overhead cost of gathering metrics
if obj.debug:
@ -661,4 +1056,4 @@ def read_func():
# Register the config, init and read functions
collectd.register_config(config_func)
collectd.register_init(init_func)
collectd.register_read(read_func)
collectd.register_read(read_func, interval=PLUGIN_HIRES_INTERVAL)

View File

@ -1,5 +1,5 @@
#
# Copyright (c) 2018-2022 Wind River Systems, Inc.
# Copyright (c) 2018-2024 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
@ -618,22 +618,23 @@ def output_top_10_pids(pid_dict, message):
"""Outputs the top 10 pids with the formatted message.
Args:
pid_dict: Dict The Dictionary of PIDs with Name and RSS
message: Formatted String, the template message to be output.
pid_dict: dictionary {pid: {'name': name, 'rss: value}
message: Formatted String, template output message
"""
# Check that pid_dict has values
if not pid_dict:
return
proc = []
# Sort the dict based on Rss value from highest to lowest.
sorted_pid_dict = sorted(pid_dict.items(), key=lambda x: x[1]['rss'],
reverse=True)
# Convert sorted_pid_dict into a list
[proc.append((i[1].get('name'), format_iec(i[1].get('rss')))) for i in
sorted_pid_dict]
# Output top 10 entries of the list
collectd.info(message % (str(proc[:10])))
# Output top 10 RSS usage entries
mems = ', '.join(
'{}: {}'.format(
v.get('name', '-'),
format_iec(v.get('rss', 0.0))) for k, v in sorted(
pid_dict.items(),
key=lambda t: -float(t[1]['rss']))[:10]
)
collectd.info(message % (mems))
def config_func(config):
@ -777,10 +778,10 @@ def read_func():
# K8S platform addons usage, i.e., non-essential: monitor, openstack
if pod.namespace in pc.K8S_NAMESPACE_ADDON:
memory[pc.GROUP_OVERALL][pc.GROUP_K8S_ADDON] += MiB
# Limit output to every 5 minutes and after 29 seconds to avoid duplication
if datetime.datetime.now().minute % 5 == 0 and datetime.datetime.now(
).second > 29:
# Get per-process and per-pod RSS memory every 5 minutes
now = datetime.datetime.now()
if now.minute % 5 == 0 and now.second > 29:
# Populate the memory per process dictionary to output results
pids = get_platform_memory_per_process()
@ -795,13 +796,21 @@ def read_func():
for uid in group_pods:
if uid in obj._cache:
pod = obj._cache[uid]
# Ensure pods outside of Kube-System and Kube-Addon are only logged every 30 min
if datetime.datetime.now().minute % 30 == 0 and datetime.datetime.now().second > 29:
collectd.info(f'The pod:{pod.name} running in namespace:{pod.namespace} '
f'has the following processes{group_pods[uid]}')
# Log detailed memory usage of all pods every 30 minutes
if now.minute % 30 == 0 and now.second > 29:
mems = ', '.join(
'{}({}): {}'.format(
v.get('name', '-'),
k,
format_iec(v.get('rss', 0.0))) for k, v in sorted(
group_pods[uid].items(),
key=lambda t: -float(t[1]['rss']))
)
collectd.info(f'memory usage: Pod: {pod.name}, '
f'Namespace: {pod.namespace}, '
f'pids: {mems}')
else:
collectd.warning('%s: uid %s for pod %s not found in namespace %s' % (
PLUGIN, uid, pod.name, pod.namespace))
collectd.warning('%s: uid %s for pod not found' % (PLUGIN, uid))
continue
# K8S platform system usage, i.e., essential: kube-system
@ -815,16 +824,16 @@ def read_func():
for key in group_pods[uid]:
k8s_addon[key] = group_pods[uid][key]
message = 'The top 10 memory rss processes for the platform are : %s'
message = 'Top 10 memory usage pids: platform: %s'
output_top_10_pids(platform, message)
message = 'The top 10 memory rss processes for the Kubernetes System are :%s'
message = 'Top 10 memory usage pids: Kubernetes System: %s'
output_top_10_pids(k8s_system, message)
message = 'The top 10 memory rss processes Kubernetes Addon are :%s'
message = 'Top 10 memory usage pids: Kubernetes Addon: %s'
output_top_10_pids(k8s_addon, message)
message = 'The top 10 memory rss processes overall are :%s'
message = 'Top 10 memory usage pids: overall: %s'
output_top_10_pids(overall, message)
# Calculate base memory usage (i.e., normal memory, exclude K8S and VMs)

View File

@ -1,7 +1,7 @@
#
# SPDX-License-Identifier: Apache-2.0
#
# Copyright (C) 2019 Intel Corporation
# Copyright (C) 2019-2024 Intel Corporation
#
############################################################################
#
@ -741,7 +741,7 @@ def parse_ovs_appctl_bond_list(buf):
buf = buf.strip().split("\n")
result = {}
for idx, line in enumerate(buf):
if idx is 0:
if idx == 0:
continue
line = line.strip()
@ -837,7 +837,7 @@ def compare_interfaces(interfaces1, interfaces2):
len1 = len(set1 - set2)
len2 = len(set2 - set1)
if len1 is 0 and len2 is 0:
if len1 == 0 and len2 == 0:
return True
else:
return False

View File

@ -1,5 +1,5 @@
#
# Copyright (c) 2019-2022 Wind River Systems, Inc.
# Copyright (c) 2019-2024 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
@ -40,6 +40,7 @@ MIN_AUDITS_B4_FIRST_QUERY = 2
K8S_MODULE_MAJOR_VERSION = int(K8S_MODULE_VERSION.split('.')[0])
KUBELET_CONF = '/etc/kubernetes/kubelet.conf'
SSL_TLS_SUPPRESS = True
K8S_TIMEOUT = 2
# Standard units' conversion parameters (mebi, kibi)
# Reference: https://en.wikipedia.org/wiki/Binary_prefix
@ -83,9 +84,11 @@ GROUPS_AGGREGATED = [GROUP_PLATFORM, GROUP_BASE, GROUP_K8S_SYSTEM,
GROUP_K8S_ADDON, GROUP_CONTAINERS]
# First level cgroups -- these are the groups we know about
CGROUP_INIT = 'init.scope'
CGROUP_SYSTEM = 'system.slice'
CGROUP_USER = 'user.slice'
CGROUP_MACHINE = 'machine.slice'
CGROUP_K8SPLATFORM = 'k8splatform.slice'
CGROUP_DOCKER = 'docker'
CGROUP_K8S = K8S_ROOT
@ -98,7 +101,8 @@ CONTAINERS_CGROUPS = [CGROUP_SYSTEM_CONTAINERD, CGROUP_SYSTEM_DOCKER,
CGROUP_SYSTEM_KUBELET, CGROUP_SYSTEM_ETCD]
# Groupings by first level cgroup
BASE_GROUPS = [CGROUP_DOCKER, CGROUP_SYSTEM, CGROUP_USER]
BASE_GROUPS = [CGROUP_INIT, CGROUP_DOCKER, CGROUP_SYSTEM, CGROUP_USER,
CGROUP_K8SPLATFORM]
BASE_GROUPS_EXCLUDE = [CGROUP_K8S, CGROUP_MACHINE]
# Groupings of pods by kubernetes namespace
@ -750,18 +754,28 @@ class K8sClient(object):
# Debian
# kubectl --kubeconfig KUBELET_CONF get pods --all-namespaces \
# --selector spec.nodeName=the_host -o json
kube_results = subprocess.check_output(
['kubectl', '--kubeconfig', KUBELET_CONF,
'--field-selector', field_selector,
'get', 'pods', '--all-namespaces',
'-o', 'json'
]).decode()
json_results = json.loads(kube_results)
try:
kube_results = subprocess.check_output(
['kubectl', '--kubeconfig', KUBELET_CONF,
'--field-selector', field_selector,
'get', 'pods', '--all-namespaces',
'-o', 'json',
], timeout=K8S_TIMEOUT).decode()
json_results = json.loads(kube_results)
except subprocess.TimeoutExpired:
collectd.error('kube_get_local_pods: Timeout')
return []
except json.JSONDecodeError as e:
collectd.error('kube_get_local_pods: Could not parse json output, error=%s' % (str(e)))
return []
except subprocess.CalledProcessError as e:
collectd.error('kube_get_local_pods: Could not get pods, error=%s' % (str(e)))
return []
# convert the items to: kubernetes.client.V1Pod
api_items = [self._as_kube_pod(x) for x in json_results['items']]
return api_items
except Exception as err:
collectd.error("kube_get_local_pods: %s" % (err))
collectd.error("kube_get_local_pods: error=%s" % (str(err)))
raise
@ -783,7 +797,8 @@ class POD_object:
"""Check whether pod contains platform namespace or platform label"""
if (self.namespace in K8S_NAMESPACE_SYSTEM
or self.labels.get(PLATFORM_LABEL_KEY) == GROUP_PLATFORM):
or (self.labels is not None and
self.labels.get(PLATFORM_LABEL_KEY) == GROUP_PLATFORM)):
return True
return False

View File

@ -5,6 +5,7 @@ LoadPlugin python
<Module "cpu">
debug = false
verbose = true
hires = false
</Module>
Import "memory"
<Module "memory">
@ -21,5 +22,4 @@ LoadPlugin python
Import "remotels"
Import "service_res"
LogTraces = true
Encoding "utf-8"
</Plugin>

View File

@ -1,3 +1,10 @@
monitor-tools (1.0-2) unstable; urgency=medium
* Update schedtop to display cgroups from systemd services and Kubernetes pods
* Add watchpids to find created processes, typically short-lived
-- Jim Gauld <James.Gauld@windriver.com> Thu, 12 Sep 2024 09:54:55 -0400
monitor-tools (1.0-1) unstable; urgency=medium
* Initial release.

View File

@ -13,4 +13,5 @@ Description: Monitor tools package
This package contains data collection tools to monitor host performance.
Tools are general purpose engineering and debugging related.
Includes overall memory, cpu occupancy, per-task cpu,
per-task scheduling, per-task io.
per-task scheduling, per-task io, newly created short-lived-processes,
local port scanning.

View File

@ -5,7 +5,7 @@ Source: https://opendev.org/starlingx/utilities
Files: *
Copyright:
(c) 2013-2021 Wind River Systems, Inc
(c) 2013-2024 Wind River Systems, Inc
(c) Others (See individual files for more details)
License: Apache-2
Licensed under the Apache License, Version 2.0 (the "License");
@ -26,7 +26,7 @@ License: Apache-2
# If you want to use GPL v2 or later for the /debian/* files use
# the following clauses, or change it to suit. Delete these two lines
Files: debian/*
Copyright: 2021 Wind River Systems, Inc
Copyright: 2024 Wind River Systems, Inc
License: Apache-2
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.

View File

@ -10,5 +10,8 @@ override_dh_install:
install -p memtop $(ROOT)/usr/bin
install -p schedtop $(ROOT)/usr/bin
install -p occtop $(ROOT)/usr/bin
install -p k8smetrics $(ROOT)/usr/bin
install -p portscanner $(ROOT)/usr/bin
install -p watchpids $(ROOT)/usr/bin
dh_install

View File

@ -1,6 +1,6 @@
---
debname: monitor-tools
debver: 1.0-1
debver: 1.0-2
src_path: scripts
revision:
dist: $STX_DIST

292
monitor-tools/scripts/k8smetrics Executable file
View File

@ -0,0 +1,292 @@
#!/usr/bin/env python
########################################################################
#
# Copyright (c) 2024 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
########################################################################
#
# Calculate Kubernetes latency percentile metrics (50%, 95, and 99%) for
# etcdserver and kube-apiserver. This is based on Prometheus format raw
# metrics histograms within kube-apiserver.
#
# This obtains current Kubernetes raw metrics cumulative counters,
# (e.g., kubectl get --raw /metrics). The counters represent cumulative
# frequency of delays <= value. This calculates the delta from previous,
# and does percentile calculation.
#
# Example:
# kubectl get --raw /metrics
#
# To see API calls:
# kubectl get --raw /metrics -v 6
#
# This does minimal parsing and aggregation to yield equivalent of the
# following Prometheus PromQL queries using data over a time-window:
# histogram_quantile(0.95, sum(rate(etcd_request_duration_seconds_bucket[5m])) by (le))
# histogram_quantile(0.95, sum(rate(apiserver_request_duration_seconds_bucket{verb!~"CONNECT|WATCH|WATCH|PROXY"}[5m])) by (le))
# histogram_quantile(0.95, sum(rate(workqueue_queue_duration_seconds_bucket[5m])) by (le))
# histogram_quantile(0.95, sum(rate(rest_client_request_duration_seconds[5m])) by (le))
#
# Specific verbs are excluded to eliminate tooling anomolies, otherwise
# histogram are polluted with >>40second delays.
#
# TODO(jgauld): Migrate code to use prometheus_client API;
# prometheus_clientthat is not currently installed.
#
import argparse
from copy import deepcopy
from datetime import datetime, timedelta
import logging
import logging.handlers
import os
import pprint
import re
import subprocess
import sys
import tempfile
import time
LOG = logging.getLogger(__name__)
KUBECONFIG = '/etc/kubernetes/admin.conf'
re_bucket = re.compile(r'^([a-zA-Z0-9:_]+)_bucket{(.*)}\s+(\d+)')
def get_raw_metrics(rawfile=None):
if rawfile is None:
fd, rawfile = tempfile.mkstemp(dir='/tmp', prefix='k8s-prom-raw-', suffix='.log')
with os.fdopen(fd, 'w') as f:
cmd = ['kubectl', '--kubeconfig={}'.format(KUBECONFIG),
'get', '--raw', '/metrics']
try:
subprocess.check_call(cmd, stdout=f, timeout=5)
except subprocess.TimeoutExpired as e:
LOG.error('get_raw_metrics: error=%s' % (str(e)))
except subprocess.CalledProcessError as e:
LOG.error('get_raw_metrics: error=%s' % (str(e)))
except Exception as e:
LOG.error('get_raw_metrics: error=%s' % (str(e)))
return rawfile
def read_raw_metrics(rawfile=None):
patterns = {
'apiserver_request_duration_seconds': {'exclude_verbs': ['CONNECT', 'WATCH', 'WATCHLIST', 'PROXY']},
'etcd_request_duration_seconds': {},
'workqueue_queue_duration_seconds': {},
'rest_client_request_duration_seconds': {},
}
names = patterns.keys()
# Store aggregate bucket values metric[name][le]
metrics = {}
for name in names:
metrics[name] = {}
cleanup = False
if rawfile is None:
cleanup = True
rawfile = get_raw_metrics()
with open(rawfile) as f:
for l in f:
if l.startswith(tuple(names)):
# THIS IS TOO VERBOSE FOR TYPICAL DEBUG
#LOG.debug(l.rstrip())
match = re_bucket.search(l)
if match:
name = match.group(1)
tags = match.group(2)
count = int(match.group(3))
D = {}
for key_value in tags.split(','):
key, value = key_value.split('=')
value = value.replace('"', '')
D.update({key: value})
# make sure we have a valid "le" bucket
bucket = D.get('le')
if bucket is None:
continue
# filter out specific verbs
exclude_verbs = patterns[name].get('exclude_verbs', {})
if 'verb' in D and D['verb'] in exclude_verbs:
continue
# Aggregate metric for matching name and "le" bucket
if bucket not in metrics[name]:
metrics[name][bucket] = 0
metrics[name][bucket] += count
if cleanup:
os.unlink(rawfile)
return metrics
def percentile(hist, q=0.95):
# Input: dictionary hist[le_bin] = freq
# these are sorted
le_bins = sorted(list(hist.keys()), key=float)
# Calculate number of binned samples
count = 0
for x in le_bins:
count += hist[x]
p0 = 0.0
x0 = 0.0
for x in le_bins:
x1 = float(x)
p = float(hist[x]) / float(count)
p1 = p0 + p
if p1 >= q:
percentile = x0 + (x1 - x0) * (q - p0) / (p1 - p0)
break
p0 = p1
percentile = x1
return percentile
def k8smetrics(args=None):
# Read prometheus raw metrics snapshot at time t1
now = datetime.now()
tstamp1 = now
t1 = read_raw_metrics()
if args.debug:
LOG.debug("t1:")
pprint.pprint(t1, indent=1)
start_time = now
while now - start_time < timedelta(minutes=args.period_min):
# Copy all state information for time t0
t0 = deepcopy(t1)
tstamp0 = tstamp1
time.sleep(args.interval_min*60)
# Read prometheus raw metrics snapshot at time t1
now = datetime.now()
tstamp1 = now
t1 = read_raw_metrics()
if args.debug:
LOG.debug("t1:")
pprint.pprint(t1, indent=1)
# Print tool header for this interval
duration = tstamp1 - tstamp0
LOG.info('Samples from: %s - %s, duration: %s'
% (tstamp0, tstamp1, duration))
# Calculate delta between cumulative snapshots
delta = {}
for name in t1.keys():
delta[name] = {}
for bucket in t1[name]:
v0 = t0[name].get(bucket, 0)
delta[name][bucket] = t1[name][bucket] - v0
# NOTE: le="+Inf" is identical to value of x_count
# le="y" is upper-bound of the bucket
hist = {}
for name in delta.keys():
hist[name] = {}
inf = delta[name].pop('+Inf', None)
if inf is None:
continue
buckets = sorted(list(delta[name].keys()), key=float)
# Calculate frequency distribution from cumulative frequency
maxbin = 0.0
v0 = 0
for x in buckets:
v = delta[name][x]
d = v - v0
# in the case of anomolous value (yeah, we going crazy)
if d < 0:
if args.debug:
LOG.debug('d<0: x=%s, v0=%s, v=%s, d=%s, inf=%s' % (x, v0, v, d, inf))
d = 0
if d > 0:
maxbin = float(x)
v0 = v
hist[name][x] = d
index = name.rfind('_seconds')
text = name[:index]
percentile_50 = 1000.0*percentile(hist[name], q=0.50)
percentile_95 = 1000.0*percentile(hist[name], q=0.95)
percentile_99 = 1000.0*percentile(hist[name], q=0.99)
# Print histogram summary and percentiles for each metric
print("{} : count: {}, p50: {:.0f} ms, p95: {:.0f} ms, p99: {:.0f} ms, maxbin: {:.0f} ms".format(
text, inf, percentile_50, percentile_95, percentile_99, 1000.0*maxbin))
print('bins:', end=' ')
[print('{0:5g}'.format(1000.0*float(x)), end=' ') for x in buckets]
print()
print(' <=:', end=' ')
[print('{0:5.0f}'.format(delta[name][x]), end=' ') for x in buckets]
print()
print('hist:', end=' ')
[print('{0:5.0f}'.format(hist[name][x]), end=' ') for x in buckets]
print()
# blank line between metrics
print()
return 0
def main():
# Instantiate the parser
parser = argparse.ArgumentParser(description='Kubernetes latency percentile metrics')
# Optional argument
parser.add_argument('--period_min', type=int, default=1,
help='sampling period in minutes')
parser.add_argument('--interval_min', type=int, default=1,
help='sampling interval in minutes')
parser.add_argument('--debug', action='store_true',
help='enable tool debug')
args = parser.parse_args()
# Configure logging
if args.debug:
level = logging.DEBUG
else:
level = logging.INFO
out_hdlr = logging.StreamHandler(sys.stdout)
formatter = logging.Formatter(
'%(asctime)s %(process)s %(levelname)s %(module)s: %(message)s')
out_hdlr.setFormatter(formatter)
out_hdlr.setLevel(level)
LOG.addHandler(out_hdlr)
LOG.setLevel(level)
LOG.info("Kubernetes latency percentiles: period:%s mins, interval=%s mins",
args.period_min, args.interval_min)
try:
ret = k8smetrics(args=args)
sys.exit(ret)
except KeyboardInterrupt as e:
LOG.info('caught: %r, shutting down', e)
sys.exit(0)
except IOError:
sys.exit(0)
except Exception as e:
LOG.error('exception: %r', e, exc_info=1)
sys.exit(-4)
if __name__ == '__main__':
main()

153
monitor-tools/scripts/portscanner Executable file
View File

@ -0,0 +1,153 @@
#!/usr/bin/env python
########################################################################
#
# Copyright (c) 2024 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
########################################################################
import argparse
import psutil
from psutil._common import addr
from datetime import datetime, timedelta
import logging
import logging.handlers
import time
import os
import sys
import time
LOG = logging.getLogger(__name__)
# Global variables
seen_connections = dict()
seen_local_ports = dict()
def connections_summary():
# Print overall connections summary
sorted_dict = sorted(seen_connections.items(), key=lambda item: item[1])
sorted_dict = reversed(sorted_dict)
print('\nSUMMARY: Total connections')
for key, value in sorted_dict:
print("%7d %s" % (value, key))
def portscan(args=None):
match_status = ['ESTABLISHED']
LOG.info("Scanning for connections on port:%s, matching status:%s, for %d minutes.",
match_status, args.port, args.duration_min)
now = datetime.now()
start_time = now
while now - start_time < timedelta(minutes=args.duration_min):
now = datetime.now()
try:
connections = psutil.net_connections(kind='tcp')
except psutil.Error as error:
LOG.error("Error: %s", str(error))
connections = []
time.sleep(1)
matches = []
for conn in connections:
if (isinstance(conn.raddr, addr) and
(conn.raddr.port == args.port) and
(any(s == conn.status for s in match_status)) and
str(conn.laddr.port) + str(conn.pid) not in seen_local_ports):
local_port_pid = str(conn.laddr.port) + str(conn.pid)
seen_local_ports[local_port_pid] = seen_local_ports.get(local_port_pid, 0) + 1
matches.append(conn)
if matches:
tstamp = now.strftime("%Y-%m-%d %H:%M:%S")
for conn in matches:
try:
p = psutil.Process(pid=conn.pid)
except psutil.Error as error:
LOG.debug("Error: %s", str(error))
continue
d = p.as_dict()
pid = conn.pid
r_ip = conn.raddr.ip
new_match = False
summary_key = '{} {} {} {}'.format(
r_ip, pid, d['name'],' '.join(d['cmdline']))
if summary_key not in seen_connections:
new_match = True
# Increment connection counts based on unique key
seen_connections[summary_key] = seen_connections.get(summary_key, 0) + 1
# d['environ'] -- too verbose
if new_match:
print("{} Local:{}:{} Remote: {}:{} status:{} ppid:{}, pid:{}, threads:{}, user:{}, name:{}, cmdline:{}".format(
tstamp,
conn.laddr.ip, conn.laddr.port,
conn.raddr.ip, conn.raddr.port,
conn.status,
d['ppid'], d['pid'], d['num_threads'],
d['username'], d['name'],' '.join(d['cmdline'])))
time.sleep(args.delay)
def main():
"""Main program."""
# Instantiate the parser
parser = argparse.ArgumentParser(
description='Scan processes matching net_connection port')
# Optional argument
parser.add_argument('--duration_min', type=int, default=5,
help='duration to collect in minutes')
parser.add_argument('--port', type=int, default=5000,
help='specific port to scan')
parser.add_argument('--delay', type=float, default=0.2,
help='scanning delay in seconds')
parser.add_argument('--debug', action='store_true',
help='enable tool debug')
args = parser.parse_args()
# Configure logging
if args.debug:
level = logging.DEBUG
else:
level = logging.INFO
out_hdlr = logging.StreamHandler(sys.stdout)
formatter = logging.Formatter(
'%(asctime)s %(process)s %(levelname)s %(module)s: %(message)s')
out_hdlr.setFormatter(formatter)
out_hdlr.setLevel(level)
LOG.addHandler(out_hdlr)
LOG.setLevel(level)
# Limit access of this tool.
if os.geteuid() != 0:
LOG.error('Require sudo/root.')
sys.exit(1)
try:
ret = portscan(args=args)
connections_summary()
sys.exit(ret)
except KeyboardInterrupt as e:
LOG.info('caught: %r, shutting down', e)
connections_summary()
sys.exit(0)
except IOError:
sys.exit(0)
except Exception as e:
LOG.error('exception: %r', e, exc_info=1)
sys.exit(-4)
if __name__ == '__main__':
main()

View File

@ -1,7 +1,7 @@
#!/usr/bin/perl
########################################################################
#
# Copyright (c) 2015-2021 Wind River Systems, Inc.
# Copyright (c) 2015-2024 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
@ -16,7 +16,8 @@
# Usage: schedtop OPTIONS
# [--delay=<seconds>] [--repeat=<num>] [--period=<seconds>]
# [--reset-hwm] [--idle] [--sort=<cpu|io>]
# [--watch-cmd=tid1,cmd1,cmd2,...] [--watch-only] [--watch-quiet]
# [--watch-cmd=tid1,cmd1,cmd2,...] [--watch-cgroup=cgroup1,...]
# [--watch-only] [--watch-quiet]
# [--trig-delay=time]
# [--help]
@ -28,6 +29,7 @@ use Time::HiRes qw(clock_gettime usleep CLOCK_MONOTONIC CLOCK_REALTIME);
use Benchmark ':hireswallclock';
use Carp qw(croak carp);
use Math::BigInt;
use File::Find ();
# Define toolname
our $TOOLNAME = "schedtop";
@ -73,6 +75,10 @@ our $USER_HZ = 100; # no easy way to get this
our $CLOCK_NS = SI_G / $USER_HZ;
our $print_host = 1;
our @cgroup_procs_paths = ();
our @cgroup_procs_match = ();
our @cgroup_tids = ();
# Print options
our ($P_none, $P_lite, $P_brief, $P_full) = (0, 1, 2, 3);
our ($P_ps, $P_cpu, $P_del, $P_io, $P_id, $P_cmd) = (0, 1, 2, 3, 4, 5);
@ -88,6 +94,7 @@ our ($arg_debug,
$arg_sort,
$arg_print,
@arg_watch_cmd,
@arg_watch_cgroup,
$arg_watch_only,
$arg_watch_quiet,
$arg_trig_delay,
@ -120,7 +127,7 @@ my @delta_list = (
my @state_list = (
'exec_max', 'wait_max', 'block_max',
'pid', 'ppid', 'state', 'comm', 'cmdline', 'wchan', 'affinity',
'pid', 'ppid', 'state', 'cgroup', 'comm', 'cmdline', 'wchan', 'affinity',
'VmSize', 'VmRSS', 'start_time',
'nice', 'policy', 'priority', 'rt_priority', 'task_cpu'
);
@ -142,6 +149,7 @@ $| = 1;
\$::arg_sort,
\$::arg_print,
\@::arg_watch_cmd,
\@::arg_watch_cgroup,
\$::arg_watch_only,
\$::arg_watch_quiet,
\$::arg_trig_delay,
@ -198,8 +206,9 @@ printf "selected options: ".
$::arg_sort, $::arg_print;
if (@::arg_watch_cmd) {
printf "selected watch/trigger options: ".
"watch-cmd=%s, only=%s, quiet=%s, delay=%d ms\n",
"watch-cmd=%s, watch-cgroup=%s, only=%s, quiet=%s, delay=%d ms\n",
join(',', @::arg_watch_cmd),
join(',', @::arg_watch_cgroup),
(defined $::arg_watch_only ? 'true' : 'false'),
(defined $::arg_watch_quiet ? 'true' : 'false'),
$::arg_trig_delay;
@ -218,6 +227,12 @@ for (my $i=0; $i < $::num_cpus; $i++) {
}
$w_aff = &max(length 'AFF', length $::affinity_mask->as_hex());
# Find cgroup.proc paths matching specified cgroup patterns
&find_matching_cgroup_procs(\@::cgroup_procs_match, \@::arg_watch_cgroup);
for my $file (@::cgroup_procs_match) {
print "matched cgroup:", $file, "\n";
}
# Reset scheduling hi-water marks
if (defined $::arg_reset_hwm) {
&get_tids(\%::tids_1);
@ -246,7 +261,7 @@ if ($is_schedstat) {
# Get current scheduling and io info for all tids
&read_sched(\%::tids_1, \%::task_1);
# Track watched tids for monitoring
&track_watched_tids(\%::tids_1, \%::tids_w, \%::task_1, \@::arg_watch_cmd);
&track_watched_tids(\%::tids_1, \%::tids_w, \%::task_1, \@::arg_watch_cmd, \@::arg_watch_cgroup);
# determine column sort order
my $s_keyw = 'watched';
@ -295,11 +310,46 @@ REPEAT_LOOP: for (my $repeat=1; $repeat <= $::arg_repeat; $repeat++) {
&read_stat(\%::percpu_1);
}
if (defined $::arg_watch_only) {
# Get list of pids and tids from watched commands;
# this reduces cpu impact dramatically
foreach my $tid (keys %::tids_w) {
$::tids_1{$tid} = $::tids_w{$tid};
# This determines a subset of pids and tids
# based on previous watched tids and matching cgroups.
# This should reduce cpu impact dramatically.
# Get list of pids and tids
&get_tids(\%::tids_1);
# Get array of tids corresponding to matching cgroups
&read_cgroup_procs(\@::cgroup_tids, \@::cgroup_procs_match);
my %cgroup_tids_h = map { $_ => 1 } @::cgroup_tids;
# Keep previous watched tids and find new matches from cgroup.procs
my @del_tids = ();
foreach my $tid (keys %::tids_1) {
my $pid = $::tids_1{$tid};
next if (exists $::tids_w{$tid});
if (exists $cgroup_tids_h{$tid}) {
$::tids_w{$tid} = $pid;
printf "ADD watching: tid=%7d\n", $tid;
next;
}
push(@del_tids, $tid);
}
# Prune tids not actually being watched
foreach my $tid (@del_tids) {
delete $::tids_1{$tid};
}
# Prune watched tids that not longer exist
my @del_tids_w = ();
foreach my $tid (keys %::tids_w) {
next if (exists $::tids_1{$tid});
push(@del_tids_w, $tid);
}
foreach my $tid (@del_tids_w) {
printf "REM watching: tid=%7d\n", $tid;
delete $::tids_w{$tid};
}
} else {
# Get list of pids and tids
&get_tids(\%::tids_1);
@ -462,7 +512,7 @@ REPEAT_LOOP: for (my $repeat=1; $repeat <= $::arg_repeat; $repeat++) {
if ($::opt_P{$::P_cmd} == $::P_brief) {
$L .= sprintf "%s", "cmdline";
} elsif ($::opt_P{$::P_cmd} == $::P_full) {
$L .= sprintf "%-15s %s", "comm", "cmdline";
$L .= sprintf "%-16s %-15s %s", "cgroup", "comm", "cmdline";
}
print $L, "\n";
@ -526,7 +576,8 @@ REPEAT_LOOP: for (my $repeat=1; $repeat <= $::arg_repeat; $repeat++) {
if ($::opt_P{$::P_cmd} == $::P_brief) {
$L .= sprintf "%s", $::D_task{$tid}{'cmdline'};
} elsif ($::opt_P{$::P_cmd} == $::P_full) {
$L .= sprintf "%-15s %s",
$L .= sprintf "%-16s %-15s %s",
substr($::D_task{$tid}{'cgroup'}, 0, 16),
substr($::D_task{$tid}{'comm'}, 0, 15),
$::D_task{$tid}{'cmdline'};
}
@ -625,17 +676,36 @@ sub get_tids
}
# Reset scheduling hi-water-marks
# NOTE: Reset by write 0 to sched is finicky; use brute force
sub reset_sched_hwm
{
(local *::tids) = @_;
# reset scheduling hi-water-marks by writing '0' to each task
my (%pids_) = ();
foreach my $tid (keys %::tids) {
my $pid = $::tids{$tid};
$pids_{$pid} = 1;
}
foreach my $pid (keys %pids_) {
my $file = '/proc/' . $pid . '/sched';
open(my $fh, "> $file") || next;
print $fh "0\n";
close($fh);
}
foreach my $tid (keys %::tids) {
my $file = '/proc/' . $tid . '/sched';
open(my $fh, "> $file") || next;
print $fh "0\n";
close($fh);
}
foreach my $tid (keys %::tids) {
my $pid = $::tids{$tid};
my $file = '/proc/' . $pid . '/task/' . $tid . '/sched';
open(my $fh, "> $file") || next;
print $fh "0\n";
close($fh);
}
}
# Trigger a crash dump via sysrq, result in /var/crash .
@ -658,22 +728,81 @@ sub sysrq_trigger_crash
# Track watched tids for monitoring
sub track_watched_tids
{
(local *::tids, local *::tids_w, local *::task, local *::arg_watch_cmd) = @_;
(local *::tids, local *::tids_w, local *::task, local *::arg_watch_cmd, local *::arg_watch_cgroup) = @_;
foreach my $tid (keys %::tids) {
my $pid = $::tids{$tid};
my $comm = $::task{$tid}{'comm'};
my $cgroup = $::task{$tid}{'cgroup'};
my $cmdline = $::task{$tid}{'cmdline'};
my $watched = 0;
next if (exists $::tids_w{$tid});
foreach my $cmd (@::arg_watch_cmd) {
if (($cmd =~ /^\d+$/) && ($tid == $cmd)) {
if (($cmd =~ /^\d+$/) && (($tid == $cmd) || ($pid == $cmd))) {
$::tids_w{$tid} = $pid;
printf "watching: tid=%7d, comm=%s\n", $tid, $comm;
printf "watching: tid=%7d, cgroup=%s, comm=%s, cmdline=%.40s\n", $tid, $cgroup, $comm, $cmdline;
}
if ((defined $comm) && ($comm =~ /^\Q$cmd\E/)) {
$::tids_w{$tid} = $pid;
printf "watching: tid=%7d, comm=%s\n", $tid, $comm;
printf "watching: tid=%7d, cgroup=%s, comm=%s, cmdline=%.40s\n", $tid, $cgroup, $comm, $cmdline;
}
}
foreach my $cg (@::arg_watch_cgroup) {
if ((defined $cgroup) && ($cgroup =~ /^\Q$cg\E/)) {
$::tids_w{$tid} = $pid;
printf "watching: tid=%7d, cgroup=%s, comm=%s, cmdline=%.40s\n", $tid, $cgroup, $comm, $cmdline;
}
}
}
}
# Find module difficult, storing result in global variable
sub wanted_cgroup_procs {
my $F = $File::Find::name;
if ($_ eq 'cgroup.procs') {
push @::cgroup_procs_paths, $F;
}
}
# Find cgroup.proc paths matching specified cgroup patterns
sub find_matching_cgroup_procs
{
(local *::cgroup_procs_match, local *::arg_watch_cgroup) = @_;
# Find all cgroup.procs paths for the pids cgroup controller
File::Find::find(\&wanted_cgroup_procs, '/sys/fs/cgroup/pids');
foreach my $file (@::cgroup_procs_paths) {
foreach my $cg (@::arg_watch_cgroup) {
if ($file =~ /\Q$cg\E(\.service|\.scope)/) {
push(@::cgroup_procs_match, $file);
} elsif ($file =~ /kubepods\/\w+\/\Q$cg\E/) {
push(@::cgroup_procs_match, $file);
}
}
}
}
# Get array of tids corresponding to matching cgroups
sub read_cgroup_procs
{
(local *::tids, local *::cgroup_procs_match) = @_;
my $tid = ();
# reset scheduling hi-water-marks by writing '0' to each task
foreach my $cgroup_procs (@::cgroup_procs_match) {
open(my $fh, $cgroup_procs) || goto SKIP_PROCS;
while (<$fh>) {
if (/^(\d+)$/) {
$tid = $1;
push @::tids, $tid;
}
}
close($fh);
SKIP_PROCS:;
}
}
@ -703,7 +832,7 @@ sub read_sched
$gtime, $cgtime,
$start_data, $end_data, $start_brk, $arg_start, $arg_end,
$env_start, $env_end, $exit_code) = ();
my ($cgroup) = ();
my ($nr_switches, $nr_migrations) = (0,0);
my ($exec_runtime, $exec_max) = (0.0, 0.0);
my ($wait_max, $wait_sum, $wait_count) = (0.0, 0.0, 0);
@ -716,7 +845,7 @@ sub read_sched
$cancelled_write_bytes) = (0,0,0,0,0,0,0);
my ($sched_valid, $io_valid, $status_valid, $cmdline_valid,
$wchan_valid, $stat_valid) = ();
$wchan_valid, $stat_valid, $cgroup_valid) = ();
$pid = $::tids{$tid};
@ -765,6 +894,67 @@ sub read_sched
#prio : 120
#clock-delta : 28
# Changes for 6.6.0 kernel
#cat /proc/1/sched
#systemd (1, #threads: 1)
#-------------------------------------------------------------------
#se.exec_start : 251536392.418317
#se.vruntime : 542073.435409
#se.sum_exec_runtime : 1097697.572750
#se.nr_migrations : 35039
#sum_sleep_runtime : 249925608.224346
#sum_block_runtime : 234992.983051
#wait_start : 0.000000
#sleep_start : 251536392.418317
#block_start : 0.000000
#sleep_max : 11967.794377
#block_max : 1230.041276
#exec_max : 147.808142
#slice_max : 78.070544
#wait_max : 180.271599
#wait_sum : 440802.706697
#wait_count : 1022180
#iowait_sum : 81.179285
#iowait_count : 63
#nr_migrations_cold : 0
#nr_failed_migrations_affine : 145872
#nr_failed_migrations_running : 67209
#nr_failed_migrations_hot : 82715
#nr_forced_migrations : 12
#nr_wakeups : 264124
#nr_wakeups_sync : 41
#nr_wakeups_migrate : 205
#nr_wakeups_local : 146458
#nr_wakeups_remote : 117666
#nr_wakeups_affine : 204
#nr_wakeups_affine_attempts : 409
#nr_wakeups_passive : 0
#nr_wakeups_idle : 0
#avg_atom : 1.072258
#avg_per_cpu : 31.327879
#nr_switches : 1023725
#nr_voluntary_switches : 264916
#nr_involuntary_switches : 758809
#se.load.weight : 1048576
#se.avg.load_sum : 1490
#se.avg.runnable_sum : 1526937
#se.avg.util_sum : 365568
#se.avg.load_avg : 32
#se.avg.runnable_avg : 32
#se.avg.util_avg : 7
#se.avg.last_update_time : 251536392418304
#se.avg.util_est.ewma : 163
#se.avg.util_est.enqueued : 7
#policy : 0
#prio : 120
#clock-delta : 112
#mm->numa_scan_seq : 0
#numa_pages_migrated : 0
#numa_preferred_nid : -1
#total_numa_faults : 0
#current_node=0, numa_group_id=0
#numa_faults node=0 task_private=0 task_shared=0 group_private=0 group_shared=0
# parse /proc/<pid>/task/<tid>/sched
$file = '/proc/' . $pid . '/task/' . $tid . '/sched';
open($fh, $file) || goto SKIP_SCHED;
@ -774,19 +964,19 @@ sub read_sched
}
my ($k, $v, $c0);
LOOP_SCHED: while (<$fh>) {
if (/^se\.statistics.{1,2}wait_max\s+:\s+(\S+)/) {
if (/^wait_max\s+:\s+(\S+)/ || /^se\.statistics.{1,2}wait_max\s+:\s+(\S+)/) {
$wait_max = $1;
} elsif (/^se\.statistics.{1,2}block_max\s+:\s+(\S+)/) {
} elsif (/^block_max\s+:\s+(\S+)/ || /^se\.statistics.{1,2}block_max\s+:\s+(\S+)/) {
$block_max = $1;
} elsif (/^se\.statistics.{1,2}wait_sum\s+:\s+(\S+)/) {
} elsif (/^wait_sum\s+:\s+(\S+)/ || /^se\.statistics.{1,2}wait_sum\s+:\s+(\S+)/) {
$wait_sum = $1;
} elsif (/^se\.statistics.{1,2}wait_count\s+:\s+(\S+)/) {
} elsif (/^wait_count\s+:\s+(\S+)/ || /^se\.statistics.{1,2}wait_count\s+:\s+(\S+)/) {
$wait_count = $1;
} elsif (/^se\.statistics.{1,2}exec_max\s+:\s+(\S+)/) {
} elsif (/^exec_max\s+:\s+(\S+)/ || /^se\.statistics.{1,2}exec_max\s+:\s+(\S+)/) {
$exec_max = $1;
} elsif (/^se\.statistics.{1,2}iowait_sum\s+:\s+(\S+)/) {
} elsif (/^iowait_sum\s+:\s+(\S+)/ || /^se\.statistics.{1,2}iowait_sum\s+:\s+(\S+)/) {
$iowait_sum = $1;
} elsif (/^se\.statistics.{1,2}iowait_count\s+:\s+(\S+)/) {
} elsif (/^iowait_count\s+:\s+(\S+)/ || /^se\.statistics.{1,2}iowait_count\s+:\s+(\S+)/) {
$iowait_count = $1;
} elsif (/^se\.sum_exec_runtime\s+:\s+(\S+)/) {
$exec_runtime = $1;
@ -967,6 +1157,46 @@ sub read_sched
$stat_valid = 1;
close($fh);
#cat /proc/1/task/1/cgroup
#12:cpu,cpuacct:/init.scope
#11:pids:/init.scope
#10:hugetlb:/
#9:memory:/init.scope
#8:rdma:/
#7:cpuset:/
#6:net_cls,net_prio:/
#5:devices:/init.scope
#4:blkio:/init.scope
#3:freezer:/
#2:perf_event:/
#1:name=systemd:/init.scope
#0::/init.scope
# Extract the pod id:
# /k8s-infra/kubepods/burstable/pode84531c2-0bb1-45f8-b27f-e779b858552d/fdeaea0e577a525a3d9e41655ee05dd9b4edf17ce4b1bf95803cae1518f43ca2
# Extract *.service or *.scope name:
# /system.slice/acpid.service
# /system.slice/system-ceph.slice/ceph-mds.scope
# parse /proc/<pid>/task/<tid>/cgroup
$file = '/proc/' . $pid . '/task/' . $tid . '/cgroup';
open($fh, $file) || next;
LOOP_CGROUP: while (<$fh>) {
if (/^\d+:(pids|cpu,cpuacct):(.*)/) {
$_ = $2;
if (/kubepods\/\w+\/(pod[a-z0-9-]+)\/\w+$/) {
$cgroup = $1;
} elsif (/\/([a-zA-Z0-9_-@:]+)\.\w+$/) {
$cgroup = $1;
} else {
$cgroup = '-'; # '-' looks prettier than '/'
}
$cgroup_valid = 1;
last LOOP_CGROUP;
}
}
close($fh);
# sched
if (defined $sched_valid) {
$::task{$tid}{'exec_runtime'} = $exec_runtime;
@ -1060,6 +1290,14 @@ sub read_sched
$::task{$tid}{'start_time'} = '';
$::task{$tid}{'task_cpu'} = 0;
}
# cgroup
if (defined $cgroup_valid) {
$::task{$tid}{'cgroup'} = $cgroup;
} else {
$::task{$tid}{'cgroup'} = '-';
}
}
}
@ -1327,6 +1565,7 @@ sub parse_schedtop_args {
local *::arg_sort,
local *::arg_print,
local *::arg_watch_cmd,
local *::arg_watch_cgroup,
local *::arg_watch_only,
local *::arg_watch_quiet,
local *::arg_trig_delay,
@ -1356,6 +1595,7 @@ sub parse_schedtop_args {
"sort=s", \$::arg_sort,
"print=s", \$::arg_print,
"watch-cmd=s@", \@::arg_watch_cmd,
"watch-cgroup=s@", \@::arg_watch_cgroup,
"watch-only", \$::arg_watch_only,
"watch-quiet", \$::arg_watch_quiet,
"trig-delay=i", \$::arg_trig_delay,
@ -1383,17 +1623,17 @@ sub parse_schedtop_args {
$fail = 1;
warn "$::TOOLNAME: Input error: --print=$::arg_print invalid; valid options are: brief, full\n";
}
if ((defined $::arg_watch_only) && !(@::arg_watch_cmd)) {
if ((defined $::arg_watch_only) && !(@::arg_watch_cmd || @::arg_watch_cgroup)) {
$fail = 1;
warn "$::TOOLNAME: Input error: --watch-only requires --watch-cmd option.\n";
warn "$::TOOLNAME: Input error: --watch-only requires --watch-cmd or --watch-cgroup option.\n";
}
if ((defined $::arg_watch_quiet) && !(@::arg_watch_cmd)) {
if ((defined $::arg_watch_quiet) && !(@::arg_watch_cmd || @::arg_watch_cgroup)) {
$fail = 1;
warn "$::TOOLNAME: Input error: --watch-quiet requires --watch-cmd option.\n";
warn "$::TOOLNAME: Input error: --watch-quiet requires --watch-cmd or --watch-cgroup option.\n";
}
if ((defined $::arg_trig_delay) && !(@::arg_watch_cmd)) {
if ((defined $::arg_trig_delay) && !(@::arg_watch_cmd || @::arg_watch_cgroup)) {
$fail = 1;
warn "$::TOOLNAME: Input error: --trig-delay requires --watch-cmd option.\n";
warn "$::TOOLNAME: Input error: --trig-delay requires --watch-cmd or --watch-cgroup option.\n";
}
if ((defined $::arg_trig_delay) && ($::arg_trig_delay < 1)) {
$fail = 1;
@ -1407,6 +1647,14 @@ sub parse_schedtop_args {
push(@::arg_watch_cmd, split(',', $cmd));
}
}
if (@::arg_watch_cgroup) {
my @cgroups = @::arg_watch_cgroup;
@::arg_watch_cgroup = ();
for my $cgroup (@cgroups) {
push(@::arg_watch_cgroup, split(',', $cgroup));
}
}
if (@::ARGV) {
$fail = 1;
warn "$::TOOLNAME: Input error: not expecting these options: '@::ARGV'.\n";
@ -1443,7 +1691,8 @@ sub Usage {
printf "Usage: $::TOOLNAME OPTIONS\n";
printf " [--delay=<seconds>] [--repeat=<num>] [--period=<seconds>]\n";
printf " [--reset-hwm] [--idle] [--sort=<cpu|io>] [--print=<brief|full>]\n";
printf " [--watch-cmd=tid1,cmd1,cmd2,...] [--watch-only] [--watch-quiet]\n";
printf " [--watch-cmd=tid1,cmd1,cmd2,...] [--watch-cgroup=cgroup1,...]\n";
printf " [--watch-only] [--watch-quiet]\n";
printf " [--trig-delay=time]\n";
printf " [--help]\n";
@ -1465,6 +1714,8 @@ sub ListHelp {
printf("Watch specific tasks or commands:\n");
printf(" --watch-cmd=tid1,cmd1,... : watch specific tids or 'comm' names\n");
printf(" (matches from beginning of comm with partial name, eg, --watch-cmd=sirq)\n");
printf(" --watch-cgroup=cgroup1,... : watch specific cgroup names\n");
printf(" (matches from beginning of cgroup with partial name, eg, --watch-cgroup=sm)\n");
printf(" --watch-only : display only watched tasks (reduces impact of tool)\n");
printf(" --watch-quiet : suppress output after watch starts\n");
printf("Trigger crash dump via sysrq:\n");

861
monitor-tools/scripts/watchpids Executable file
View File

@ -0,0 +1,861 @@
#!/usr/bin/perl
########################################################################
#
# Copyright (c) 2015-2024 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
#
########################################################################
#
# Description:
# This displays process calling tree information for newly created
# processess.
#
# Usage: watchpids OPTIONS
# [--delay=<seconds>] [--repeat=<num>] [--period=<seconds>]
# [--help]
## TODO: JGAULD
# --tids
# --tree | --forest
# --simple
# --exclude=name
# --filter=affinity mask (TBD.. eg, 0x1000001 )
use strict;
use warnings;
use Data::Dumper;
use POSIX qw(uname strftime);
use Time::HiRes qw(clock_gettime usleep CLOCK_MONOTONIC CLOCK_REALTIME);
use Benchmark ':hireswallclock';
use Carp qw(croak carp);
use Math::BigInt;
use File::Find ();
# Define toolname
our $TOOLNAME = "watchpids";
our $VERSION = "0.1";
# Constants
use constant SI_k => 1.0E3;
use constant SI_M => 1.0E6;
use constant SI_G => 1.0E9;
use constant Ki => 1024.0;
use constant Mi => 1024.0*1024.0;
use constant Gi => 1024.0*1024.0*1024.0;
# Globals
our %opt_V = ();
our %opt_P = ();
our %percpu_0 = ();
our %percpu_1 = ();
our %task_0 = ();
our %task_1 = ();
our %task_n = ();
our %tids_0 = ();
our %tids_1 = ();
our %tids_w = ();
our %loadavg = ();
our $tm_0 = ();
our $tm_1 = ();
our $tr_0 = ();
our $tr_1 = ();
our $tm_elapsed = ();
our $tr_elapsed = ();
our $tm_final = ();
our $uptime = ();
our $num_cpus = 1;
our $affinity_mask = Math::BigInt->new('0');
our $w_aff = 10;
our $num_tasks = 0;
our $num_blk = 0;
our $num_state_D = 0;
our $USER_HZ = 100; # no easy way to get this
our $CLOCK_NS = SI_G / $USER_HZ;
our $print_host = 1;
our @cgroup_procs_paths = ();
our @cgroup_procs_match = ();
our @cgroup_tids = ();
# Argument list parameters
our ($arg_debug,
$arg_delay,
$arg_repeat,
$arg_period,
$arg_tids,
) = ();
#-------------------------------------------------------------------------------
# MAIN Program
#-------------------------------------------------------------------------------
my $ONE_BILLION = 1.0E9;
my $MIN_DELAY = 0.001;
my $MAX_DELAY = 0.001;
# benchmark variables
my ($bd, $b0, $b1);
my @policies = ('OT', 'FF', 'RR', 'BA', 'ID', 'UN', 'UN');
my @state_list = (
'pid', 'ppid', 'state', 'cgroup', 'comm', 'cmdline', 'affinity',
'VmSize', 'VmRSS', 'start_time',
'nice', 'policy', 'priority', 'rt_priority', 'task_cpu'
);
# Autoflush output
select(STDERR);
$| = 1;
select(STDOUT); # default
$| = 1;
# Parse input arguments and print tool usage if necessary
&parse_watchpids_args(
\$::arg_debug,
\$::arg_delay,
\$::arg_repeat,
\$::arg_period,
\$::arg_tids,
);
# Check for root user
if ($>) {
warn "$::TOOLNAME: requires root/sudo.\n";
exit 1;
}
# Print out some debugging information
if (defined $::arg_debug) {
$Data::Dumper::Indent = 1;
}
# Print out selected options
printf "selected options: delay = %.3fs, repeat = %d, tids = %s\n",
$::arg_delay, $::arg_repeat, $::arg_tids ? 'true' : 'false';
# Capture timestamp
$b0 = new Benchmark;
# Get number of logical cpus
&get_num_logical_cpus(\$::num_cpus);
$::affinity_mask = Math::BigInt->new('0');
for (my $i=0; $i < $::num_cpus; $i++) {
my $y = Math::BigInt->new('1');
$y->blsft($i);
$::affinity_mask->bior($y);
}
$w_aff = &max(length 'AFF', length $::affinity_mask->as_hex());
# Find cgroup.proc paths matching specified cgroup patterns
&find_matching_cgroup_procs(\@::cgroup_procs_match, \@::arg_watch_cgroup);
for my $file (@::cgroup_procs_match) {
print "matched cgroup:", $file, "\n";
}
# Get current hires epoc timestamp
$::tm_1 = clock_gettime(CLOCK_MONOTONIC);
$::tr_1 = clock_gettime(CLOCK_REALTIME);
$::tm_final = $::tm_1 + $::arg_delay*$::arg_repeat;
# Set initial delay
$::tm_elapsed = $::arg_delay;
$MAX_DELAY = $::arg_delay + $MIN_DELAY;
# Get list of pids and tids
&get_tids(\%::tids_1, \$::arg_tids);
# Get current scheduling info for all tids if new or requiring refresh
&read_sched(\%::tids_1, \%::task_0, \%::task_1, \%::task_n);
# Get current uptime
&get_uptime(\$::uptime);
# determine column sort order
my ($s_key1, $s_key2, $s_key3) = ();
($s_key1, $s_key2, $s_key3) = ('ppid', 'pid', 'tid');
# Main loop
REPEAT_LOOP: for (my $repeat=1; $repeat <= $::arg_repeat; $repeat++) {
# copy all state variables
$::tm_0 = (); $::tr_0 = (); %::tids_0 = (); %::task_0 = ();
$::tm_0 = $::tm_1; $::tr_0 = $::tr_1;
foreach my $tid (keys %::tids_1) { $::tids_0{$tid} = $::tids_1{$tid}; }
foreach my $tid (keys %::task_1) {
# TODO: JGAULD -- problem with undefined affinity in task_0
## TODO: BEGIN if ($::task_1{$tid}{'age'} == 0) {
foreach my $var (keys %{$::task_1{$tid}}) {
$::task_0{$tid}{$var} = $::task_1{$tid}{$var};
}
## TODO: END }
}
# estimate sleep delay to achieve desired interarrival by subtracting out
# the measured cpu runtime of the tool.
my $delay = $::arg_delay;
$delay = $MIN_DELAY if ($delay < $MIN_DELAY);
$delay = $MAX_DELAY if ($delay > $MAX_DELAY);
usleep( SI_M*$delay );
# Collect current state
$::tm_1 = (); $::tr_1 = (); %::percpu_1 = (); %::tids_1 = (); %::task_1 = (); %::task_n = ();
# Get current hires epoc timestamp
$::tm_1 = clock_gettime(CLOCK_MONOTONIC);
$::tr_1 = clock_gettime(CLOCK_REALTIME);
# Get list of pids and tids
&get_tids(\%::tids_1, \$::arg_tids);
# JGAULD: PRUNE LOGIC TBD
# Get current scheduling info for all tids if new or requiring refresh
&read_sched(\%::tids_1, \%::task_0, \%::task_1, \%::task_n);
my $num_new = keys %::task_n;
# Get current uptime
&get_uptime(\$::uptime);
# Delta calculation
$::tm_elapsed = $::tm_1 - $::tm_0;
$::tr_elapsed = $::tr_1 - $::tr_0;
# Print heading and column titles only if we have new pids
if ($num_new) {
# Print summary
&watchpids_header(
\$::tr_1,
\$::tm_elapsed,
\$::tr_elapsed,
\$::uptime,
\$::print_host
);
print "\n";
# Build up output line by specific area
my $L = ();
$L = '';
if ($::arg_tids) {
$L .= sprintf "%7s %7s %7s ", "PPID", "PID", "TID";
} else {
$L .= sprintf "%7s %7s ", "PPID", "PID";
}
$L .= sprintf "%1s %2s %*s %2s %3s %4s ",
"S", "P", $w_aff, "AFF", "PO", "NI", "PR";
$L .= sprintf "%-16s %-15s %s", "cgroup", "comm", "cmdline";
print $L, "\n";
}
# TODO: JGAULD : Use of uninitialized value in numeric comparison (<=>) at ./watchpids line 237.
#foreach my $tid (sort {($::task_1{$b}{$s_key1} <=> $::task_1{$a}{$s_key1}) or
# ($::task_1{$b}{$s_key2} <=> $::task_1{$a}{$s_key2}) or
# ($::task_1{$b}{$s_key3} <=> $::task_1{$a}{$s_key3})} keys %::task_n) {
# TODO: JGAULD make this configurable maybe? --long
my $COMMAND_LEN = 120;
# TODO: JGAULD -- add option for parent calling forest tree
foreach my $tid (keys %::task_n) {
# TODO: JGAULD : UNDEFINDED AFFINITY
my $aff = Math::BigInt->new('0')->as_hex();
if (defined $::task_1{$tid}{'affinity'}) {
$aff = $::task_1{$tid}{'affinity'}->as_hex();
} else {
# TODO: JGAULD -- DEBUG -- only field is 'age', no other keys
if (defined $::arg_debug) {
print Data::Dumper->Dump([\%::task_1{$tid}], [qw(task_1)]);
}
next;
}
# Build up output line by specific area
my $L = ();
$L = '';
if ($::arg_tids) {
$L .= sprintf "%7d %7d %7d ",
$::task_1{$tid}{'ppid'}, $::task_1{$tid}{'pid'}, $tid;
} else {
$L .= sprintf "%7d %7d ",
$::task_1{$tid}{'ppid'}, $::task_1{$tid}{'pid'};
}
$L .= sprintf "%1s %2d %*s %2s %3d %4d ",
$::task_1{$tid}{'state'}, $::task_1{$tid}{'task_cpu'}, $w_aff, $aff,
$policies[$::task_1{$tid}{'policy'}], $::task_1{$tid}{'nice'},
$::task_1{$tid}{'priority'};
$L .= sprintf "%-16s %-15s %s",
substr($::task_1{$tid}{'cgroup'}, 0, 16),
substr($::task_1{$tid}{'comm'}, 0, 15),
substr($::task_1{$tid}{'cmdline'}, 0, $COMMAND_LEN);
# JGAULD: SHORTEN: $::task_1{$tid}{'cmdline'};
print $L, "\n";
}
if ($num_new) {
print "\n";
}
# exit repeat loop if we have exceeded overall time
last if ($::tm_1 > $::tm_final);
} # REPEAT LOOP
# Print that tool has finished
print "done\n";
# Capture timestamp and report delta
$b1 = new Benchmark; $bd = Benchmark::timediff($b1, $b0);
printf "processing time: %s\n", timestr($bd);
exit 0;
#-------------------------------------------------------------------------------
# Convert a number to SI unit xxx.yyyG
sub format_SI
{
(my $value) = @_;
if ($value >= SI_G) {
return sprintf("%.3fG", $value/SI_G);
} elsif ($value >= SI_M) {
return sprintf("%.3fM", $value/SI_M);
} elsif ($value >= SI_k) {
return sprintf("%.3fk", $value/SI_k);
} else {
return sprintf("%.0f", $value);
}
}
# Convert to IEC binary unit xxx.yyyGi
# Since underlying memory units are in pages, don't need decimals for Ki
sub format_IEC
{
(my $value) = @_;
if ($value >= Gi) {
return sprintf("%.3fGi", $value/Gi);
} elsif ($value >= Mi) {
return sprintf("%.3fMi", $value/Mi);
} elsif ($value >= Ki) {
return sprintf("%.0fKi", $value/Ki);
} else {
return sprintf("%.0f", $value);
}
}
# Determine max of array
sub max {
my ($max, @vars) = @_;
for (@vars) {
$max = $_ if $_ > $max;
}
return $max;
}
# Determine tids and pid mapping by walking /proc/<pid>/task/<tid>
sub get_tids
{
(local *::tids, *::arg_tids) = @_;
my (@pids_, @tids_) = ();
my ($dh, $pid, $tid);
# get pid list
my $dir = '/proc';
opendir($dh, $dir) || croak "Cannot open directory: $dir ($!)";
@pids_ = grep { /^\d+$/ && -d "$dir/$_" } readdir($dh);
closedir $dh;
if ($::arg_tids) {
# get tid list
foreach $pid (@pids_) {
$dir = '/proc/' . $pid . '/task';
opendir(my $dh, $dir) || next;
@tids_ = grep { /^\d+$/ && -d "$dir/$_" } readdir($dh);
closedir $dh;
foreach $tid (@tids_) { $::tids{$tid} = $pid; }
}
} else {
foreach $pid (@pids_) { $::tids{$pid} = $pid; }
}
}
# Find module difficult, storing result in global variable
sub wanted_cgroup_procs {
my $F = $File::Find::name;
if ($_ eq 'cgroup.procs') {
push @::cgroup_procs_paths, $F;
}
}
# Find cgroup.proc paths matching specified cgroup patterns
sub find_matching_cgroup_procs
{
(local *::cgroup_procs_match, local *::arg_watch_cgroup) = @_;
# Find all cgroup.procs paths for the pids cgroup controller
File::Find::find(\&wanted_cgroup_procs, '/sys/fs/cgroup/pids');
}
# Get array of tids corresponding to matching cgroups
sub read_cgroup_procs
{
(local *::tids, local *::cgroup_procs_match) = @_;
my $tid = ();
# reset scheduling hi-water-marks by writing '0' to each task
foreach my $cgroup_procs (@::cgroup_procs_match) {
open(my $fh, $cgroup_procs) || goto SKIP_PROCS;
while (<$fh>) {
if (/^(\d+)$/) {
$tid = $1;
push @::tids, $tid;
}
}
close($fh);
SKIP_PROCS:;
}
}
# Parse cpu and scheduling info for each tid
# - ignore the specific tid if there is incomplete data,
# (i.e., cannot obtain info because task has died,
# eg. missing ./stat, ./status, ./cmdline, ./wchan)
#
sub read_sched
{
(local *::tids, local *::task_0, local *::task, local *::task_n) = @_;
# TODO: JGAULD -- consider changing this to global;
# maybe it has to be input option; very unlikely folks
# dynamically changing scheduling attributes
my $TASK_REFRESH_INTERVAL = 100;
%::task = ();
%::task_n = ();
foreach my $tid (keys %::tids) {
my ($fh, $file, $pid, $comm, $cmdline, $wchan, $id) = ();
my ($tpid, $tcomm, $state, $ppid, $pgrp, $sid,
$tty_nr, $tty_pgrp, $flags,
$min_flt, $cmin_flt, $maj_flt, $cmaj_flt,
$utime, $stime, $cutime, $cstime,
$priority, $nice, $num_threads,
$it_real_value, $start_time,
$vsize, $rss, $rsslim,
$start_code, $end_code, $start_stack, $esp, $eip,
$pending, $blocked, $sigign, $sigcatch, $wchan_addr,
$dum1, $dum2, $exit_signal, $task_cpu,
$rt_priority, $policy, $blkio_ticks,
$gtime, $cgtime,
$start_data, $end_data, $start_brk, $arg_start, $arg_end,
$env_start, $env_end, $exit_code) = ();
my ($cgroup) = ();
my ($VmSize, $VmRSS) = ();
my $Cpus_allowed = Math::BigInt->new('0');
my $affinity = Math::BigInt->new('0');
my ($status_valid, $cmdline_valid, $stat_valid, $cgroup_valid) = ();
$pid = $::tids{$tid};
# JGAULD: read stuff if new, else skip
my $bypass_refresh = 1;
if (exists $::task_0{$tid}) {
# Copy previous values.
foreach my $var (keys %{$::task_0{$tid}}) {
$::task{$tid}{$var} = $::task_0{$tid}{$var};
}
$::task{$tid}{'age'} = $::task_0{$tid}{'age'} + 1;
if ($::task{$tid}{'age'} == $TASK_REFRESH_INTERVAL) {
$::task{$tid}{'age'} = 0;
$bypass_refresh = 0;
}
} else {
$::task_n{$tid} = 1;
$::task{$tid}{'age'} = 0;
$bypass_refresh = 0;
}
next if ($bypass_refresh);
# parse /proc/<pid>/task/<tid>/status
$file = '/proc/' . $pid . '/task/' . $tid . '/status';
open($fh, $file) || next;
LOOP_STATUS: while (<$fh>) {
if (/^Name:\s+(.*)/) {
$comm = $1;
} elsif (/^State:\s+(\S+)/) {
$state = $1;
} elsif (/^PPid:\s+(\S+)/) {
$ppid = $1;
} elsif (/^VmSize:\s+(\S+)/) {
$VmSize = $1;
} elsif (/^VmRSS:\s+(\S+)/) {
$VmRSS = $1;
} elsif (/^Cpus_allowed:\s+([0]+,)*(\S+)/) {
my $h = $2; $h =~ tr/,/_/;
$Cpus_allowed = Math::BigInt->from_hex($h);
$affinity = $Cpus_allowed->band($::affinity_mask);
$status_valid = 1;
last LOOP_STATUS;
}
}
close($fh);
# parse /proc/<pid>/task/<tid>/cmdline
$file = '/proc/' . $pid . '/task/' . $tid . '/cmdline';
open($fh, $file) || next;
LOOP_CMDLINE: while (<$fh>) {
if (/^(.*)$/) {
$cmdline = $1;
$cmdline =~ s/\000/ /g;
$cmdline_valid = 1;
last LOOP_CMDLINE;
}
}
if (!$cmdline_valid) {
$cmdline_valid = 1;
$cmdline = $comm;
}
close($fh);
#Table 1-4: Contents of the stat files (as of 2.6.30-rc7)
#..............................................................................
# Field Content
# tpid process id (or tid, if /proc/<pid>/task/<tid>/stat)
# tcomm filename of the executable
# state state (R is running, S is sleeping, D is sleeping in an
# uninterruptible wait, Z is zombie, T is traced or stopped)
# ppid process id of the parent process
# pgrp pgrp of the process
# sid session id
# tty_nr tty the process uses
# tty_pgrp pgrp of the tty
# flags task flags
# min_flt number of minor faults
# cmin_flt number of minor faults with child's
# maj_flt number of major faults
# cmaj_flt number of major faults with child's
# utime user mode jiffies
# stime kernel mode jiffies
# cutime user mode jiffies with child's
# cstime kernel mode jiffies with child's
# priority priority level
# nice nice level
# num_threads number of threads
# it_real_value (obsolete, always 0)
# start_time time the process started after system boot
# vsize virtual memory size
# rss resident set memory size
# rsslim current limit in bytes on the rss
# start_code address above which program text can run
# end_code address below which program text can run
# start_stack address of the start of the main process stack
# esp current value of ESP
# eip current value of EIP
# pending bitmap of pending signals
# blocked bitmap of blocked signals
# sigign bitmap of ignored signals
# sigcatch bitmap of catched signals
# wchan address where process went to sleep
# 0 (place holder)
# 0 (place holder)
# exit_signal signal to send to parent thread on exit
# task_cpu which CPU the task is scheduled on
# rt_priority realtime priority
# policy scheduling policy (man sched_setscheduler)
# blkio_ticks time spent waiting for block IO
# gtime guest time of the task in jiffies
# cgtime guest time of the task children in jiffies
# start_data address above which program data+bss is placed
# end_data address below which program data+bss is placed
# start_brk address above which program heap can be expanded with brk()
# arg_start address above which program command line is placed
# arg_end address below which program command line is placed
# env_start address above which program environment is placed
# env_end address below which program environment is placed
# exit_code the thread's exit_code in the form reported by the waitpid system call
# parse /proc/<pid>/task/<tid>/stat
$file = '/proc/' . $pid . '/task/' . $tid . '/stat';
my $dummy;
open($fh, $file) || next;
$_ = <$fh>;
($tpid, $tcomm, $dummy) = /^(\d+)\s+\((.*)\)\s+(.*)/;
($state, $ppid, $pgrp, $sid,
$tty_nr, $tty_pgrp, $flags,
$min_flt, $cmin_flt, $maj_flt, $cmaj_flt,
$utime, $stime, $cutime, $cstime,
$priority, $nice, $num_threads,
$it_real_value, $start_time,
$vsize, $rss, $rsslim,
$start_code, $end_code, $start_stack, $esp, $eip,
$pending, $blocked, $sigign, $sigcatch, $wchan_addr,
$dum1, $dum2, $exit_signal, $task_cpu,
$rt_priority, $policy, $blkio_ticks, $gtime, $cgtime,
$start_data, $end_data, $start_brk, $arg_start, $arg_end,
$env_start, $env_end, $exit_code) = split(/\s+/, $dummy);
$stat_valid = 1;
close($fh);
#cat /proc/1/task/1/cgroup
#12:cpu,cpuacct:/init.scope
#11:pids:/init.scope
#10:hugetlb:/
#9:memory:/init.scope
#8:rdma:/
#7:cpuset:/
#6:net_cls,net_prio:/
#5:devices:/init.scope
#4:blkio:/init.scope
#3:freezer:/
#2:perf_event:/
#1:name=systemd:/init.scope
#0::/init.scope
# Extract the pod id:
# /k8s-infra/kubepods/burstable/pode84531c2-0bb1-45f8-b27f-e779b858552d/fdeaea0e577a525a3d9e41655ee05dd9b4edf17ce4b1bf95803cae1518f43ca2
# Extract *.service or *.scope name:
# /system.slice/acpid.service
# /system.slice/system-ceph.slice/ceph-mds.scope
# parse /proc/<pid>/task/<tid>/cgroup
$file = '/proc/' . $pid . '/task/' . $tid . '/cgroup';
open($fh, $file) || next;
LOOP_CGROUP: while (<$fh>) {
if (/^\d+:(pids|cpu,cpuacct):(.*)/) {
$_ = $2;
if (/kubepods\/\w+\/(pod[a-z0-9-]+)\/\w+$/) {
$cgroup = $1;
} elsif (/\/([a-zA-Z0-9_-@:]+)\.\w+$/) {
$cgroup = $1;
} else {
$cgroup = '-'; # '-' looks prettier than '/'
}
$cgroup_valid = 1;
last LOOP_CGROUP;
}
}
close($fh);
# status
if (defined $status_valid) {
$::task{$tid}{'pid'} = $pid;
$::task{$tid}{'comm'} = $comm;
$::task{$tid}{'state'} = $state;
$::task{$tid}{'ppid'} = $ppid;
$::task{$tid}{'VmSize'} = $VmSize;
$::task{$tid}{'VmRSS'} = $VmRSS;
$::task{$tid}{'affinity'} = $affinity;
} else {
$::task{$tid}{'pid'} = 0;
$::task{$tid}{'comm'} = '-';
$::task{$tid}{'state'} = '-';
$::task{$tid}{'ppid'} = 0;
$::task{$tid}{'VmSize'} = 0;
$::task{$tid}{'VmRSS'} = 0;
$::task{$tid}{'affinity'} = Math::BigInt->new('0');
}
# cmdline
if (defined $cmdline_valid) {
$::task{$tid}{'cmdline'} = $cmdline;
} else {
$::task{$tid}{'cmdline'} = $comm;
}
# stat
if (defined $stat_valid) {
$::task{$tid}{'nice'} = $nice;
$::task{$tid}{'policy'} = $policy;
$::task{$tid}{'priority'} = $priority;
$::task{$tid}{'rt_priority'} = $rt_priority;
$::task{$tid}{'start_time'} = $start_time;
$::task{$tid}{'task_cpu'} = $task_cpu;
} else {
$::task{$tid}{'nice'} = 0;
$::task{$tid}{'policy'} = '-';
$::task{$tid}{'priority'} = 0;
$::task{$tid}{'rt_priority'} = 0;
$::task{$tid}{'start_time'} = '';
$::task{$tid}{'task_cpu'} = 0;
}
# cgroup
if (defined $cgroup_valid) {
$::task{$tid}{'cgroup'} = $cgroup;
} else {
$::task{$tid}{'cgroup'} = '-';
}
}
}
# Parse uptime from /proc/uptime
sub get_uptime
{
(local *::uptime) = @_;
$::uptime = 0.0;
my $file = '/proc/uptime';
open(my $fh, $file) || croak "Cannot open file: $file ($!)";
$_ = <$fh>;
if (/^(\S+)\s+\S+/) {
$::uptime = $1;
}
close($fh);
}
# Get number of online logical cpus
sub get_num_logical_cpus {
(local *::num_cpus) = @_;
$::num_cpus = 0;
my $file = "/proc/cpuinfo";
open(my $fh, $file) || croak "Cannot open file: $file ($!)";
LOOP_CPUINFO: while (<$fh>) {
if (/^[Pp]rocessor\s+:\s\d+/) {
$::num_cpus++;
}
}
close($fh);
}
# Print header
sub watchpids_header {
(local *::tr_1,
local *::tm_elapsed,
local *::tr_elapsed,
local *::uptime,
) = @_;
# process epoch to get current timestamp
my $mm_in_s = 60;
my $hh_in_s = 60*60;
my $dd_in_s = 24*60*60;
my ($sec,$min,$hour,$mday,$mon,$year,$wday,$yday,$isdst);
($sec,$min,$hour,$mday,$mon,$year,$wday,$yday,$isdst) = localtime($::tr_1);
my $msec = 1000.0*($::tr_1 - int($::tr_1));
# convert uptime to elapsed <d>:<hh>:<mm>:<ss>
my ($up, $up_dd, $up_hh, $up_mm, $up_ss);
$up = int($::uptime);
$up_dd = int($up/$dd_in_s);
$up -= $dd_in_s*$up_dd;
$up_hh = int($up/$hh_in_s);
$up -= $hh_in_s*$up_hh;
$up_mm = int($up/$mm_in_s);
$up -= $mm_in_s*$up_mm;
$up_ss = $up;
#watchpids -- 2014/03/03 02:00:21.357 dt:2050.003 ms up:6:13:00:56
printf "%s %s -- ".
"%4d-%02d-%02d %02d:%02d:%02d.%03d ".
"dt:%.3f ms ".
"up:%d:%02d:%02d:%02d\n",
$::TOOLNAME, $::VERSION,
1900+$year, 1+$mon, $mday, $hour, $min, $sec, $msec,
$::tm_elapsed*1000.0,
$up_dd, $up_hh, $up_mm, $up_ss;
}
# Parse and validate command line arguments
sub parse_watchpids_args {
(local *::arg_debug,
local *::arg_delay,
local *::arg_repeat,
local *::arg_period,
local *::arg_tids,
) = @_;
# Local variables
my ($fail, $arg_help);
# Use the Argument processing module
use Getopt::Long;
# Print usage if no arguments
if (!@::ARGV) {
&Usage();
exit 0;
}
# Process input arguments
$fail = 0;
GetOptions(
"debug:i", \$::arg_debug,
"delay=f", \$::arg_delay,
"period=i", \$::arg_period,
"repeat=i", \$::arg_repeat,
"tids", \$::arg_tids,
"help|h", \$arg_help
) || GetOptionsMessage();
# Print help documentation if user has selected --help
&ListHelp() if (defined $arg_help);
# Validate options
if ((defined $::arg_repeat) && (defined $::arg_period)) {
$fail = 1;
warn "$::TOOLNAME: Input error: cannot specify both --repeat and --period options.\n";
}
if ((defined $::arg_delay) && ($::arg_delay < 0.01)) {
$fail = 1;
warn "$::TOOLNAME: Input error: --delay %f is less than 0.01.\n",
$::arg_delay;
}
$::arg_tids = (defined $::arg_tids) ? 1 : 0;
if (@::ARGV) {
$fail = 1;
warn "$::TOOLNAME: Input error: not expecting these options: '@::ARGV'.\n";
}
# Set reasonable defaults
$::arg_delay ||= 1.0;
$::arg_repeat ||= 1;
if ($::arg_period) {
$::arg_repeat = $::arg_period / $::arg_delay;
} else {
$::arg_period = $::arg_delay * $::arg_repeat;
}
# Upon missing or invalid options, print usage
if ($fail == 1) {
&Usage();
exit 1;
}
}
# Print out a warning message and usage
sub GetOptionsMessage {
warn "$::TOOLNAME: Error processing input arguments.\n";
&Usage();
exit 1;
}
# Print out program usage
sub Usage {
printf "Usage: $::TOOLNAME OPTIONS\n";
printf " [--delay=<seconds>] [--repeat=<num>] [--period=<seconds>]\n";
printf " [--help]\n";
printf "\n";
}
# Print tool help
sub ListHelp {
printf "$::TOOLNAME -- display per-task scheduling occupancy\n";
&Usage();
printf "Options: miscellaneous\n";
printf " --delay=<seconds> : output interval (seconds): default: 1.0\n";
printf " --repeat=<num> : number of repeat samples: default: 1\n";
printf " --period=<seconds> : overall tool duration (seconds): default: --\n";
printf " --help : this help\n";
exit 0;
}
1;

11
tox.ini
View File

@ -71,8 +71,17 @@ basepython = python3
description = Dummy environment to allow pylint to be run in subdir tox
# deps = -r{toxinidir}/test-requirements.txt
[bandit]
# The following bandit tests are being skipped:
# B602: Test for use of popen with shell equals true
#
# Note: 'skips' entry cannot be split across multiple lines
#
skips = B602
exclude = tests
[testenv:bandit]
basepython = python3
description = Bandit code scan for *.py files under config folder
deps = -r{toxinidir}/test-requirements.txt
commands = bandit -r {toxinidir}/ -x '**/.tox/**,**/.eggs/**' -lll
commands = bandit --ini tox.ini -n 5 -r {toxinidir}/ -x '**/.tox/**,**/.eggs/**' -lll