Add kubernetes 1.33.0 patches

This change ports the following kubernetes 1.33.0 patches.

The following two patches were refactored and the rest applied
cleanly:

kubelet-cpumanager-platform-pods-on-reserved-cpus.patch
kubelet-cpumanager-introduce-concept-of-isolated-CPU.patch

kubeadm-platform-pods-zero-cpu-resources-readiness-p.patch
kubelet-isolcpus-SMT-aware-sorted-allocation.patch
kubelet-cpumanager-keep-normal-containers-off-reserv.patch
kubeadm-reduce-UpgradeManifestTimeout.patch
Revert-kubeadm-use-new-etcd-livez-and-readyz-endpoint.patch

Note: The upstream logspam patch is not required in K8S 1.34:
      kubelet-reduce-logspam-calculating-sandbox-resources.patch

      The change to disable CFS quota throttling at the container
      level for Guaranteed QoS pods has been integrated in
      Kubernetes v1.33. Therefore, the patch
      kubelet-disable-CFS-quota-throttling-non-integer-cpu.patch
      is no longer needed and is being dropped.

Test Plan:
PASS: Kubernetes package 1.33.0 builds properly.
PASS: Tested Kubernetes patches on a running system.
PASS: Ran the following make tests successfully:
      make test WHAT=./pkg/kubelet/cm GOFLAGS="-v"
      make test WHAT=./pkg/kubelet/cm/cpumanager GOFLAGS="-v"
      make test WHAT=./pkg/kubelet/cm/cpumanager/state GOFLAGS="-v"
      make test WHAT=./pkg/kubelet/cm/cpumanager/topology GOFLAGS="-v"
      make test WHAT=./pkg/kubelet/cm/topologymanager GOFLAGS="-v"
      make test WHAT=./pkg/kubelet/cm/devicemanager GOFLAGS="-v"
      make test WHAT=./pkg/kubelet/cm/memorymanager GOFLAGS="-v"
      make test WHAT=./pkg/kubelet/kuberuntime GOFLAGS="-v"
      make test WHAT=./cmd/kubeadm/app/constants GOFLAGS="-v"
      make test WHAT=./cmd/kubeadm/app/phases/controlplane GOFLAGS="-v"
      make test WHAT=./cmd/kubeadm/app/phases/addons/dns/ GOFLAGS="-v"

Story: 2011340
Task: 52296

Change-Id: Idcfd93340d470efca4cfb86fc94325ab9304e6d0
Signed-off-by: Saba Touheed Mujawar <sabatouheed.mujawar@windriver.com>

From 0d447af5f76da73035fd290764b3a70741392692 Mon Sep 17 00:00:00 2001
From: Saba Touheed Mujawar <sabatouheed.mujawar@windriver.com>
Date: Tue, 3 Jun 2025 11:45:23 -0400
Subject: [PATCH] Revert "kubeadm: use new etcd /livez and /readyz endpoints"

This reverts commit eeac2dda7091c1f1ea100e7637291bba2345bb0b and
partially reverts commit a04cc020f1b3992fcf185ef2b1bf12c55815fb5a
to install K8s 1.33.0 with the existing etcd version 3.4. This patch
needs to be removed after upgrading the etcd version to >= 3.5.11.

Signed-off-by: Saba Touheed Mujawar <sabatouheed.mujawar@windriver.com>
---
cmd/kubeadm/app/constants/constants.go | 2 +-
cmd/kubeadm/app/constants/constants_test.go | 12 ++++--------
cmd/kubeadm/app/phases/etcd/local.go | 9 +++------
cmd/kubeadm/app/phases/etcd/local_test.go | 13 ++-----------
4 files changed, 10 insertions(+), 26 deletions(-)
diff --git a/cmd/kubeadm/app/constants/constants.go b/cmd/kubeadm/app/constants/constants.go
index 695bc6e3fe6..b00156f1c0d 100644
--- a/cmd/kubeadm/app/constants/constants.go
+++ b/cmd/kubeadm/app/constants/constants.go
@@ -323,7 +323,7 @@ const (
KubeletHealthzPort = 10248
// MinExternalEtcdVersion indicates minimum external etcd version which kubeadm supports
- MinExternalEtcdVersion = "3.5.11-0"
+ MinExternalEtcdVersion = "3.4.13-4"
// DefaultEtcdVersion indicates the default etcd version that kubeadm uses
DefaultEtcdVersion = "3.5.21-0"
diff --git a/cmd/kubeadm/app/constants/constants_test.go b/cmd/kubeadm/app/constants/constants_test.go
index 779ec5fda9a..5a46ee20b0c 100644
--- a/cmd/kubeadm/app/constants/constants_test.go
+++ b/cmd/kubeadm/app/constants/constants_test.go
@@ -98,15 +98,11 @@ func TestGetStaticPodFilepath(t *testing.T) {
}
}
-func TestEtcdSupportedVersionLength(t *testing.T) {
- const max = 4
- if len(SupportedEtcdVersion) > max {
- t.Fatalf("SupportedEtcdVersion must not include more than %d versions", max)
- }
-}
-
func TestEtcdSupportedVersion(t *testing.T) {
var supportedEtcdVersion = map[uint8]string{
+ 13: "3.2.24",
+ 14: "3.3.10",
+ 15: "3.3.10",
16: "3.3.17-0",
17: "3.4.3-0",
18: "3.4.3-0",
@@ -125,7 +121,7 @@ func TestEtcdSupportedVersion(t *testing.T) {
},
{
kubernetesVersion: "1.10.1",
- expectedVersion: version.MustParseSemantic("3.3.17-0"),
+ expectedVersion: version.MustParseSemantic("3.2.24"),
expectedWarning: true,
expectedError: false,
},
diff --git a/cmd/kubeadm/app/phases/etcd/local.go b/cmd/kubeadm/app/phases/etcd/local.go
index b7cb697a843..d37a95ee33e 100644
--- a/cmd/kubeadm/app/phases/etcd/local.go
+++ b/cmd/kubeadm/app/phases/etcd/local.go
@@ -220,12 +220,9 @@ func GetEtcdPodSpec(cfg *kubeadmapi.ClusterConfiguration, endpoint *kubeadmapi.A
v1.ResourceMemory: resource.MustParse("100Mi"),
},
},
- // The etcd probe endpoints are explained here:
- // https://github.com/kubernetes/kubeadm/issues/3039
- LivenessProbe: staticpodutil.LivenessProbe(probeHostname, "/livez", probePort, probeScheme),
- ReadinessProbe: staticpodutil.ReadinessProbe(probeHostname, "/readyz", probePort, probeScheme),
- StartupProbe: staticpodutil.StartupProbe(probeHostname, "/readyz", probePort, probeScheme, componentHealthCheckTimeout),
- Env: kubeadmutil.MergeKubeadmEnvVars(cfg.Etcd.Local.ExtraEnvs),
+ LivenessProbe: staticpodutil.LivenessProbe(probeHostname, "/health?exclude=NOSPACE&serializable=true", probePort, probeScheme),
+ StartupProbe: staticpodutil.StartupProbe(probeHostname, "/health?serializable=false", probePort, probeScheme, componentHealthCheckTimeout),
+ Env: kubeadmutil.MergeKubeadmEnvVars(cfg.Etcd.Local.ExtraEnvs),
},
etcdMounts,
// etcd will listen on the advertise address of the API server, in a different port (2379)
diff --git a/cmd/kubeadm/app/phases/etcd/local_test.go b/cmd/kubeadm/app/phases/etcd/local_test.go
index 50cd5a96c6c..e04c8655eb5 100644
--- a/cmd/kubeadm/app/phases/etcd/local_test.go
+++ b/cmd/kubeadm/app/phases/etcd/local_test.go
@@ -129,22 +129,13 @@ spec:
failureThreshold: 8
httpGet:
host: 127.0.0.1
- path: /livez
+ path: /health?exclude=NOSPACE&serializable=true
port: 2381
scheme: HTTP
initialDelaySeconds: 10
periodSeconds: 10
timeoutSeconds: 15
name: etcd
- readinessProbe:
- failureThreshold: 3
- httpGet:
- host: 127.0.0.1
- path: /readyz
- port: 2381
- scheme: HTTP
- periodSeconds: 1
- timeoutSeconds: 15
resources:
requests:
cpu: 100m
@@ -153,7 +144,7 @@ spec:
failureThreshold: 24
httpGet:
host: 127.0.0.1
- path: /readyz
+ path: /health?serializable=false
port: 2381
scheme: HTTP
initialDelaySeconds: 10
--
2.34.1
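
The reverted probes above point the etcd static pod at etcd's /health endpoint
on the metrics port instead of the /livez and /readyz endpoints. Below is a
minimal standalone sketch (not part of any patch) of the same liveness-style
check, assuming etcd serves its health handler on 127.0.0.1:2381 as in the
local_test.go manifest above; the path and query string are taken from the patch.

// health_check.go: a standalone sketch (not shipped by these patches) that
// performs the same HTTP request as the reverted etcd liveness probe.
package main

import (
	"fmt"
	"io"
	"net/http"
	"time"
)

func main() {
	// 127.0.0.1:2381 and the /health path match the probe settings in the
	// test manifest above; adjust the host/port for a real deployment.
	url := "http://127.0.0.1:2381/health?exclude=NOSPACE&serializable=true"
	client := &http.Client{Timeout: 15 * time.Second} // probe timeoutSeconds
	resp, err := client.Get(url)
	if err != nil {
		fmt.Println("probe failed:", err)
		return
	}
	defer resp.Body.Close()
	body, _ := io.ReadAll(resp.Body)
	// A healthy etcd 3.4 member answers 200 with {"health":"true"}.
	fmt.Println(resp.StatusCode, string(body))
}

The startup probe differs only in its query string (/health?serializable=false,
a quorum read), while exclude=NOSPACE keeps the liveness check from restarting a
member that has only raised the NOSPACE alarm.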

From f635c173aa2e969f8932a2463aa9bec4084fc2b9 Mon Sep 17 00:00:00 2001
From: Saba Touheed Mujawar <sabatouheed.mujawar@windriver.com>
Date: Fri, 25 Apr 2025 09:12:47 -0400
Subject: [PATCH] kubeadm: platform pods zero cpu resources readiness probe
 timeout

This specifies zero CPU resources when creating the manifests
for the static platform pods, as a workaround for the lack of
separate resource tracking for platform resources.

This specifies zero CPU and memory resources, and a readiness probe
timeout of 5 seconds, for the coredns deployment. manifests.go is
the main source file for this; it is unclear whether the coredns.yaml
files are used, but they are updated to be consistent.

This specifies a CPU limit of 4 for the kube-apiserver pod so that it
is treated as burstable QoS. This gives a boost of cgroup CPUShares,
since the burstable cgroup parent has significantly more CPUShares
than best-effort on typical systems. This improves kube-apiserver
API responsiveness.

This increases the kube-apiserver readiness probe periodSeconds to 10
based on the WRS/SS joint recommendation for minimum probe settings.
This reduces the likelihood of kube-apiserver probe failure and
subsequent pod restart under severe load. It also reduces CPU demand.

Signed-off-by: Daniel Safta <daniel.safta@windriver.com>
Signed-off-by: Boovan Rajendran <boovan.rajendran@windriver.com>
Signed-off-by: Jim Gauld <James.Gauld@windriver.com>
Signed-off-by: Saba Touheed Mujawar <sabatouheed.mujawar@windriver.com>
---
cluster/addons/dns/coredns/coredns.yaml.base | 5 +++--
cluster/addons/dns/coredns/coredns.yaml.in | 5 +++--
cluster/addons/dns/coredns/coredns.yaml.sed | 5 +++--
cmd/kubeadm/app/phases/addons/dns/dns_test.go | 10 ++++++----
cmd/kubeadm/app/phases/addons/dns/manifests.go | 5 +++--
.../app/phases/controlplane/manifests.go | 10 ++++++----
cmd/kubeadm/app/util/staticpod/utils.go | 17 ++++++++++++++++-
7 files changed, 40 insertions(+), 17 deletions(-)
diff --git a/cluster/addons/dns/coredns/coredns.yaml.base b/cluster/addons/dns/coredns/coredns.yaml.base
index 5baf13c994f..394d4f6b38b 100644
--- a/cluster/addons/dns/coredns/coredns.yaml.base
+++ b/cluster/addons/dns/coredns/coredns.yaml.base
@@ -139,8 +139,8 @@ spec:
limits:
memory: __DNS__MEMORY__LIMIT__
requests:
- cpu: 100m
- memory: 70Mi
+ cpu: 0
+ memory: 0
args: [ "-conf", "/etc/coredns/Corefile" ]
volumeMounts:
- name: config-volume
@@ -170,6 +170,7 @@ spec:
path: /ready
port: 8181
scheme: HTTP
+ timeoutSeconds: 5
securityContext:
allowPrivilegeEscalation: false
capabilities:
diff --git a/cluster/addons/dns/coredns/coredns.yaml.in b/cluster/addons/dns/coredns/coredns.yaml.in
index 692f72a06df..8f11b069cc5 100644
--- a/cluster/addons/dns/coredns/coredns.yaml.in
+++ b/cluster/addons/dns/coredns/coredns.yaml.in
@@ -139,8 +139,8 @@ spec:
limits:
memory: 'dns_memory_limit'
requests:
- cpu: 100m
- memory: 70Mi
+ cpu: 0
+ memory: 0
args: [ "-conf", "/etc/coredns/Corefile" ]
volumeMounts:
- name: config-volume
@@ -170,6 +170,7 @@ spec:
path: /ready
port: 8181
scheme: HTTP
+ timeoutSeconds: 5
securityContext:
allowPrivilegeEscalation: false
capabilities:
diff --git a/cluster/addons/dns/coredns/coredns.yaml.sed b/cluster/addons/dns/coredns/coredns.yaml.sed
index 48ed09c8212..ce850412d33 100644
--- a/cluster/addons/dns/coredns/coredns.yaml.sed
+++ b/cluster/addons/dns/coredns/coredns.yaml.sed
@@ -139,8 +139,8 @@ spec:
limits:
memory: $DNS_MEMORY_LIMIT
requests:
- cpu: 100m
- memory: 70Mi
+ cpu: 0
+ memory: 0
args: [ "-conf", "/etc/coredns/Corefile" ]
volumeMounts:
- name: config-volume
@@ -170,6 +170,7 @@ spec:
path: /ready
port: 8181
scheme: HTTP
+ timeoutSeconds: 5
securityContext:
allowPrivilegeEscalation: false
capabilities:
diff --git a/cmd/kubeadm/app/phases/addons/dns/dns_test.go b/cmd/kubeadm/app/phases/addons/dns/dns_test.go
index 4e57f3b017a..31b1a29da66 100644
--- a/cmd/kubeadm/app/phases/addons/dns/dns_test.go
+++ b/cmd/kubeadm/app/phases/addons/dns/dns_test.go
@@ -716,8 +716,8 @@ spec:
limits:
memory: 170Mi
requests:
- cpu: 100m
- memory: 70Mi
+ cpu: 0
+ memory: 0
args: [ "-conf", "/etc/coredns/Corefile" ]
volumeMounts:
- name: config-volume
@@ -747,6 +747,7 @@ spec:
path: /ready
port: 8181
scheme: HTTP
+ timeoutSeconds: 5
securityContext:
allowPrivilegeEscalation: false
capabilities:
@@ -1000,8 +1001,8 @@ spec:
limits:
memory: 170Mi
requests:
- cpu: 100m
- memory: 70Mi
+ cpu: 0
+ memory: 0
args: [ "-conf", "/etc/coredns/Corefile" ]
volumeMounts:
- name: config-volume
@@ -1031,6 +1032,7 @@ spec:
path: /ready
port: 8181
scheme: HTTP
+ timeoutSeconds: 5
securityContext:
allowPrivilegeEscalation: false
capabilities:
diff --git a/cmd/kubeadm/app/phases/addons/dns/manifests.go b/cmd/kubeadm/app/phases/addons/dns/manifests.go
index 1358733b5be..8bc4ea49b9d 100644
--- a/cmd/kubeadm/app/phases/addons/dns/manifests.go
+++ b/cmd/kubeadm/app/phases/addons/dns/manifests.go
@@ -104,8 +104,8 @@ spec:
limits:
memory: 170Mi
requests:
- cpu: 100m
- memory: 70Mi
+ cpu: 0
+ memory: 0
args: [ "-conf", "/etc/coredns/Corefile" ]
volumeMounts:
- name: config-volume
@@ -135,6 +135,7 @@ spec:
path: /ready
port: 8181
scheme: HTTP
+ timeoutSeconds: 5
securityContext:
allowPrivilegeEscalation: false
capabilities:
diff --git a/cmd/kubeadm/app/phases/controlplane/manifests.go b/cmd/kubeadm/app/phases/controlplane/manifests.go
index e0abc479b22..fe7215454f3 100644
--- a/cmd/kubeadm/app/phases/controlplane/manifests.go
+++ b/cmd/kubeadm/app/phases/controlplane/manifests.go
@@ -67,8 +67,10 @@ func GetStaticPodSpecs(cfg *kubeadmapi.ClusterConfiguration, endpoint *kubeadmap
LivenessProbe: staticpodutil.LivenessProbe(staticpodutil.GetAPIServerProbeAddress(endpoint), "/livez", endpoint.BindPort, v1.URISchemeHTTPS),
ReadinessProbe: staticpodutil.ReadinessProbe(staticpodutil.GetAPIServerProbeAddress(endpoint), "/readyz", endpoint.BindPort, v1.URISchemeHTTPS),
StartupProbe: staticpodutil.StartupProbe(staticpodutil.GetAPIServerProbeAddress(endpoint), "/livez", endpoint.BindPort, v1.URISchemeHTTPS, componentHealthCheckTimeout),
- Resources: staticpodutil.ComponentResources("250m"),
- Env: kubeadmutil.MergeKubeadmEnvVars(proxyEnvs, cfg.APIServer.ExtraEnvs),
+ // WRS: Increase kube-apiserver cgroup CPUShares to improve API responsiveness;
+ // achieved by setting CPU Limits to make it burstable QoS.
+ Resources: staticpodutil.ComponentLimitResources("0", "4"),
+ Env: kubeadmutil.MergeKubeadmEnvVars(proxyEnvs, cfg.APIServer.ExtraEnvs),
}, mounts.GetVolumes(kubeadmconstants.KubeAPIServer),
map[string]string{kubeadmconstants.KubeAPIServerAdvertiseAddressEndpointAnnotationKey: endpoint.String()}),
kubeadmconstants.KubeControllerManager: staticpodutil.ComponentPod(v1.Container{
@@ -79,7 +81,7 @@ func GetStaticPodSpecs(cfg *kubeadmapi.ClusterConfiguration, endpoint *kubeadmap
VolumeMounts: staticpodutil.VolumeMountMapToSlice(mounts.GetVolumeMounts(kubeadmconstants.KubeControllerManager)),
LivenessProbe: staticpodutil.LivenessProbe(staticpodutil.GetControllerManagerProbeAddress(cfg), "/healthz", kubeadmconstants.KubeControllerManagerPort, v1.URISchemeHTTPS),
StartupProbe: staticpodutil.StartupProbe(staticpodutil.GetControllerManagerProbeAddress(cfg), "/healthz", kubeadmconstants.KubeControllerManagerPort, v1.URISchemeHTTPS, componentHealthCheckTimeout),
- Resources: staticpodutil.ComponentResources("200m"),
+ Resources: staticpodutil.ComponentResources("0"),
Env: kubeadmutil.MergeKubeadmEnvVars(proxyEnvs, cfg.ControllerManager.ExtraEnvs),
}, mounts.GetVolumes(kubeadmconstants.KubeControllerManager), nil),
kubeadmconstants.KubeScheduler: staticpodutil.ComponentPod(v1.Container{
@@ -91,7 +93,7 @@ func GetStaticPodSpecs(cfg *kubeadmapi.ClusterConfiguration, endpoint *kubeadmap
LivenessProbe: staticpodutil.LivenessProbe(staticpodutil.GetSchedulerProbeAddress(cfg), "/livez", kubeadmconstants.KubeSchedulerPort, v1.URISchemeHTTPS),
ReadinessProbe: staticpodutil.ReadinessProbe(staticpodutil.GetSchedulerProbeAddress(cfg), "/readyz", kubeadmconstants.KubeSchedulerPort, v1.URISchemeHTTPS),
StartupProbe: staticpodutil.StartupProbe(staticpodutil.GetSchedulerProbeAddress(cfg), "/livez", kubeadmconstants.KubeSchedulerPort, v1.URISchemeHTTPS, componentHealthCheckTimeout),
- Resources: staticpodutil.ComponentResources("100m"),
+ Resources: staticpodutil.ComponentResources("0"),
Env: kubeadmutil.MergeKubeadmEnvVars(proxyEnvs, cfg.Scheduler.ExtraEnvs),
}, mounts.GetVolumes(kubeadmconstants.KubeScheduler), nil),
}
diff --git a/cmd/kubeadm/app/util/staticpod/utils.go b/cmd/kubeadm/app/util/staticpod/utils.go
index 98e6ae21951..56f8e731a35 100644
--- a/cmd/kubeadm/app/util/staticpod/utils.go
+++ b/cmd/kubeadm/app/util/staticpod/utils.go
@@ -100,6 +100,18 @@ func ComponentResources(cpu string) v1.ResourceRequirements {
}
}
+// ComponentLimitResources returns the v1.ResourceRequirements object needed for allocating a specified amount of the CPU with Limits
+func ComponentLimitResources(cpu string, lcpu string) v1.ResourceRequirements {
+ return v1.ResourceRequirements{
+ Requests: v1.ResourceList{
+ v1.ResourceCPU: resource.MustParse(cpu),
+ },
+ Limits: v1.ResourceList{
+ v1.ResourceCPU: resource.MustParse(lcpu),
+ },
+ }
+}
+
// NewVolume creates a v1.Volume with a hostPath mount to the specified location
func NewVolume(name, path string, pathType *v1.HostPathType) v1.Volume {
return v1.Volume{
@@ -262,7 +274,10 @@ func LivenessProbe(host, path string, port int32, scheme v1.URIScheme) *v1.Probe
func ReadinessProbe(host, path string, port int32, scheme v1.URIScheme) *v1.Probe {
// sets initialDelaySeconds as '0' because we don't want to delay user infrastructure checks
// looking for "ready" status on kubeadm static Pods
- return createHTTPProbe(host, path, port, scheme, 0, 15, 3, 1)
+ // WRS/SS joint recommendation: All pods probes should have following minimum probe
+ // settings unless required by the service (initialDelaySecond 0, periodSeconds 10,
+ // timeoutSeconds 5, successThreshold 1, failureThreshold 3)
+ return createHTTPProbe(host, path, port, scheme, 0, 15, 3, 10)
}
// StartupProbe creates a Probe object with a HTTPGet handler
--
2.34.1
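
The kube-apiserver change above relies on standard Kubernetes QoS rules: a
container with a CPU limit but no positive CPU request is classified as
Burstable rather than BestEffort or Guaranteed. The standalone sketch below
(illustrative only) mirrors the ComponentLimitResources helper added in the
patch, using the same "0"/"4" values, and prints the resulting requests/limits.

// A standalone sketch mirroring the ComponentLimitResources helper added to
// cmd/kubeadm/app/util/staticpod/utils.go: a zero CPU request plus a CPU
// limit of 4, which places the static pod in the Burstable QoS class.
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
)

// componentLimitResources builds requests/limits the same way the patched
// kubeadm helper does.
func componentLimitResources(cpu, lcpu string) v1.ResourceRequirements {
	return v1.ResourceRequirements{
		Requests: v1.ResourceList{
			v1.ResourceCPU: resource.MustParse(cpu),
		},
		Limits: v1.ResourceList{
			v1.ResourceCPU: resource.MustParse(lcpu),
		},
	}
}

func main() {
	res := componentLimitResources("0", "4")
	fmt.Printf("requests: %s, limits: %s\n",
		res.Requests.Cpu().String(), res.Limits.Cpu().String())
	// Expected output: requests: 0, limits: 4
}

Because the zero request is ignored by the QoS computation while the limit is
not, the pod lands under the burstable cgroup parent, which is how the patch
obtains the CPUShares boost its description refers to.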

From 5f9da0e944a16194baf8d332a0d3a7e2fa0146d5 Mon Sep 17 00:00:00 2001
From: Saba Touheed Mujawar <sabatouheed.mujawar@windriver.com>
Date: Tue, 29 Apr 2025 10:09:16 -0400
Subject: [PATCH] kubeadm: reduce UpgradeManifestTimeout

This modifies the kubeadm UpgradeManifestTimeout from the default of
5 minutes to 3 minutes to reduce the unnecessary delay in retries
during kubeadm-upgrade-apply failures.

The typical control-plane upgrade of static pods takes 75 to 85
seconds, so 3 minutes gives an adequate buffer to complete the
operation.

Signed-off-by: Saba Touheed Mujawar <sabatouheed.mujawar@windriver.com>
---
cmd/kubeadm/app/constants/constants.go | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/cmd/kubeadm/app/constants/constants.go b/cmd/kubeadm/app/constants/constants.go
index 6aa6f41bddd..695bc6e3fe6 100644
--- a/cmd/kubeadm/app/constants/constants.go
+++ b/cmd/kubeadm/app/constants/constants.go
@@ -237,7 +237,7 @@ const (
KubeletHealthCheckTimeout = 4 * time.Minute
// UpgradeManifestsTimeout specifies the default timeout for upgrading static Pod manifests
- UpgradeManifestsTimeout = 5 * time.Minute
+ UpgradeManifestsTimeout = 3 * time.Minute
// PullImageRetry specifies how many times ContainerRuntime retries when pulling image failed
PullImageRetry = 5
--
2.34.1

From e7313c1fcdadbf74aa3665c406ba1ca6946a8c1d Mon Sep 17 00:00:00 2001
From: Saba Touheed Mujawar <sabatouheed.mujawar@windriver.com>
Date: Tue, 3 Jun 2025 11:28:24 -0400
Subject: [PATCH] kubelet cpumanager introduce concept of isolated CPUs

This introduces the concept of "isolated CPUs", which are CPUs that
have been isolated at the kernel level via the "isolcpus" kernel boot
parameter.

When starting the kubelet process, two separate sets of reserved CPUs
may be specified. With this change CPUs reserved via
'--system-reserved=cpu' will be used for infrastructure pods while the
isolated CPUs should be reserved via '--kube-reserved=cpu' to cause
kubelet to skip over them for "normal" CPU resource tracking. The
kubelet code will double-check that the specified isolated CPUs match
what the kernel exposes in "/sys/devices/system/cpu/isolated".

A plugin (outside the scope of this commit) will expose the isolated
CPUs to kubelet via the device plugin API.

If a pod specifies some number of "isolcpus" resources, the device
manager will allocate them. In this code we check whether such
resources have been allocated, and if so we set the container cpuset to
the isolated CPUs. This does mean that it really only makes sense to
specify "isolcpus" resources for best-effort or burstable pods, not for
guaranteed ones since that would throw off the accounting code. In
order to ensure the accounting still works as designed, if "isolcpus"
are specified for guaranteed pods, the affinity will be set to the
non-isolated CPUs.

Co-authored-by: Jim Gauld <james.gauld@windriver.com>
Co-authored-by: Chris Friesen <chris.friesen@windriver.com>
Signed-off-by: Gleb Aronsky <gleb.aronsky@windriver.com>
Signed-off-by: Ramesh Kumar Sivanandam <rameshkumar.sivanandam@windriver.com>
Signed-off-by: Sachin Gopala Krishna <saching.krishna@windriver.com>
Signed-off-by: Boovan Rajendran <boovan.rajendran@windriver.com>
Signed-off-by: Saba Touheed Mujawar <sabatouheed.mujawar@windriver.com>
---
pkg/kubelet/cm/container_manager_linux.go | 1 +
pkg/kubelet/cm/cpumanager/cpu_manager.go | 35 ++++-
pkg/kubelet/cm/cpumanager/cpu_manager_test.go | 23 +++-
pkg/kubelet/cm/cpumanager/policy_static.go | 105 ++++++++++++++-
.../cm/cpumanager/policy_static_test.go | 79 +++++++++--
pkg/kubelet/cm/devicemanager/manager_stub.go | 127 ++++++++++++++++++
6 files changed, 346 insertions(+), 24 deletions(-)
create mode 100644 pkg/kubelet/cm/devicemanager/manager_stub.go
diff --git a/pkg/kubelet/cm/container_manager_linux.go b/pkg/kubelet/cm/container_manager_linux.go
index 51536010663..08f53d6f334 100644
--- a/pkg/kubelet/cm/container_manager_linux.go
+++ b/pkg/kubelet/cm/container_manager_linux.go
@@ -327,6 +327,7 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I
cm.GetNodeAllocatableReservation(),
nodeConfig.KubeletRootDir,
cm.topologyManager,
+ cm.deviceManager,
)
if err != nil {
klog.ErrorS(err, "Failed to initialize cpu manager")
diff --git a/pkg/kubelet/cm/cpumanager/cpu_manager.go b/pkg/kubelet/cm/cpumanager/cpu_manager.go
index 2e292902c52..df4f0df8d17 100644
--- a/pkg/kubelet/cm/cpumanager/cpu_manager.go
+++ b/pkg/kubelet/cm/cpumanager/cpu_manager.go
@@ -20,6 +20,8 @@ import (
"context"
"fmt"
"math"
+ "os"
+ "strings"
"sync"
"time"
@@ -32,6 +34,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/cm/containermap"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology"
+ "k8s.io/kubernetes/pkg/kubelet/cm/devicemanager"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
"k8s.io/kubernetes/pkg/kubelet/config"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
@@ -51,6 +54,25 @@ type policyName string
// cpuManagerStateFileName is the file name where cpu manager stores its state
const cpuManagerStateFileName = "cpu_manager_state"
+// get the system-level isolated CPUs
+func getIsolcpus() cpuset.CPUSet {
+ dat, err := os.ReadFile("/sys/devices/system/cpu/isolated")
+ if err != nil {
+ klog.Errorf("[cpumanager] unable to read sysfs isolcpus subdir")
+ return cpuset.New()
+ }
+
+ // The isolated cpus string ends in a newline
+ cpustring := strings.TrimSuffix(string(dat), "\n")
+ cset, err := cpuset.Parse(cpustring)
+ if err != nil {
+ klog.Errorf("[cpumanager] unable to parse sysfs isolcpus string to cpuset")
+ return cpuset.New()
+ }
+
+ return cset
+}
+
// Manager interface provides methods for Kubelet to manage pod cpus.
type Manager interface {
// Start is called during Kubelet initialization.
@@ -159,7 +181,8 @@ func (s *sourcesReadyStub) AddSource(source string) {}
func (s *sourcesReadyStub) AllReady() bool { return true }
// NewManager creates new cpu manager based on provided policy
-func NewManager(cpuPolicyName string, cpuPolicyOptions map[string]string, reconcilePeriod time.Duration, machineInfo *cadvisorapi.MachineInfo, specificCPUs cpuset.CPUSet, nodeAllocatableReservation v1.ResourceList, stateFileDirectory string, affinity topologymanager.Store) (Manager, error) {
+func NewManager(cpuPolicyName string, cpuPolicyOptions map[string]string, reconcilePeriod time.Duration, machineInfo *cadvisorapi.MachineInfo, specificCPUs cpuset.CPUSet, nodeAllocatableReservation v1.ResourceList, stateFileDirectory string, affinity topologymanager.Store, deviceManager devicemanager.Manager) (Manager, error) {
+
var topo *topology.CPUTopology
var policy Policy
var err error
@@ -201,7 +224,15 @@ func NewManager(cpuPolicyName string, cpuPolicyOptions map[string]string, reconc
// NOTE: Set excludeReserved unconditionally to exclude reserved CPUs from default cpuset.
// This variable is primarily to make testing easier.
excludeReserved := true
- policy, err = NewStaticPolicy(topo, numReservedCPUs, specificCPUs, affinity, cpuPolicyOptions, excludeReserved)
+
+ // isolCPUs is the set of kernel-isolated CPUs. They should be a subset of specificCPUs or
+ // of the CPUs that NewStaticPolicy() will pick if numReservedCPUs is set. It's only in the
+ // argument list here for ease of testing, it's really internal to the policy.
+ isolCPUs := getIsolcpus()
+ policy, err = NewStaticPolicy(topo, numReservedCPUs, specificCPUs, isolCPUs, affinity, cpuPolicyOptions, deviceManager, excludeReserved)
+ if err != nil {
+ return nil, fmt.Errorf("new static policy error: %v", err)
+ }
if err != nil {
return nil, fmt.Errorf("new static policy error: %w", err)
diff --git a/pkg/kubelet/cm/cpumanager/cpu_manager_test.go b/pkg/kubelet/cm/cpumanager/cpu_manager_test.go
index e029f6c11a9..c73cd418c49 100644
--- a/pkg/kubelet/cm/cpumanager/cpu_manager_test.go
+++ b/pkg/kubelet/cm/cpumanager/cpu_manager_test.go
@@ -41,6 +41,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/cm/containermap"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology"
+ "k8s.io/kubernetes/pkg/kubelet/cm/devicemanager"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
"k8s.io/utils/cpuset"
)
@@ -272,6 +273,7 @@ func TestCPUManagerAdd(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WindowsCPUAndMemoryAffinity, true)
}
+ testDM, _ := devicemanager.NewManagerStub()
testExcl := false
testPolicy, _ := NewStaticPolicy(
&topology.CPUTopology{
@@ -287,8 +289,10 @@ func TestCPUManagerAdd(t *testing.T) {
},
0,
cpuset.New(),
+ cpuset.New(),
topologymanager.NewFakeManager(),
nil,
+ testDM,
testExcl)
testCases := []struct {
description string
@@ -543,8 +547,9 @@ func TestCPUManagerAddWithInitContainers(t *testing.T) {
}
testExcl := false
+ testDM, _ := devicemanager.NewManagerStub()
for _, testCase := range testCases {
- policy, _ := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.New(), topologymanager.NewFakeManager(), nil, testExcl)
+ policy, _ := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.New(), cpuset.New(), topologymanager.NewFakeManager(), nil, testDM, testExcl)
mockState := &mockState{
assignments: testCase.stAssignments,
@@ -707,7 +712,9 @@ func TestCPUManagerGenerate(t *testing.T) {
}
defer os.RemoveAll(sDir)
- mgr, err := NewManager(testCase.cpuPolicyName, nil, 5*time.Second, machineInfo, cpuset.New(), testCase.nodeAllocatableReservation, sDir, topologymanager.NewFakeManager())
+ testDM, err := devicemanager.NewManagerStub()
+ mgr, err := NewManager(testCase.cpuPolicyName, nil, 5*time.Second, machineInfo, cpuset.New(), testCase.nodeAllocatableReservation, sDir, topologymanager.NewFakeManager(), testDM)
+
if testCase.expectedError != nil {
if !strings.Contains(err.Error(), testCase.expectedError.Error()) {
t.Errorf("Unexpected error message. Have: %s wants %s", err.Error(), testCase.expectedError.Error())
@@ -779,6 +786,7 @@ func TestReconcileState(t *testing.T) {
}
testExcl := false
+ testDM, _ := devicemanager.NewManagerStub()
testPolicy, _ := NewStaticPolicy(
&topology.CPUTopology{
NumCPUs: 8,
@@ -797,8 +805,10 @@ func TestReconcileState(t *testing.T) {
},
0,
cpuset.New(),
+ cpuset.New(),
topologymanager.NewFakeManager(),
nil,
+ testDM,
testExcl)
testCases := []struct {
@@ -1308,6 +1318,7 @@ func TestCPUManagerAddWithResvList(t *testing.T) {
}
testExcl := false
+ testDM, _ := devicemanager.NewManagerStub()
testPolicy, _ := NewStaticPolicy(
&topology.CPUTopology{
NumCPUs: 4,
@@ -1322,8 +1333,10 @@ func TestCPUManagerAddWithResvList(t *testing.T) {
},
1,
cpuset.New(0),
+ cpuset.New(),
topologymanager.NewFakeManager(),
nil,
+ testDM,
testExcl)
testCases := []struct {
description string
@@ -1440,7 +1453,8 @@ func TestCPUManagerHandlePolicyOptions(t *testing.T) {
}
defer os.RemoveAll(sDir)
- _, err = NewManager(testCase.cpuPolicyName, testCase.cpuPolicyOptions, 5*time.Second, machineInfo, cpuset.New(), nodeAllocatableReservation, sDir, topologymanager.NewFakeManager())
+ testDM, err := devicemanager.NewManagerStub()
+ _, err = NewManager(testCase.cpuPolicyName, testCase.cpuPolicyOptions, 5*time.Second, machineInfo, cpuset.New(), nodeAllocatableReservation, sDir, topologymanager.NewFakeManager(), testDM)
if err == nil {
t.Errorf("Expected error, but NewManager succeeded")
}
@@ -1458,6 +1472,7 @@ func TestCPUManagerGetAllocatableCPUs(t *testing.T) {
}
testExcl := false
+ testDm, _ := devicemanager.NewManagerStub()
nonePolicy, _ := NewNonePolicy(nil)
staticPolicy, _ := NewStaticPolicy(
&topology.CPUTopology{
@@ -1473,8 +1488,10 @@ func TestCPUManagerGetAllocatableCPUs(t *testing.T) {
},
1,
cpuset.New(0),
+ cpuset.New(),
topologymanager.NewFakeManager(),
nil,
+ testDm,
testExcl)
testCases := []struct {
diff --git a/pkg/kubelet/cm/cpumanager/policy_static.go b/pkg/kubelet/cm/cpumanager/policy_static.go
index a9db41b85eb..d2cab721205 100644
--- a/pkg/kubelet/cm/cpumanager/policy_static.go
+++ b/pkg/kubelet/cm/cpumanager/policy_static.go
@@ -34,6 +34,7 @@ import (
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology"
+ "k8s.io/kubernetes/pkg/kubelet/cm/devicemanager"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager/bitmask"
"k8s.io/kubernetes/pkg/kubelet/metrics"
@@ -129,6 +130,10 @@ type staticPolicy struct {
topology *topology.CPUTopology
// set of CPUs that is not available for exclusive assignment
reservedCPUs cpuset.CPUSet
+ // subset of reserved CPUs with isolcpus attribute
+ isolcpus cpuset.CPUSet
+ // parent containerManager, used to get device list
+ deviceManager devicemanager.Manager
// If true, default CPUSet should exclude reserved CPUs
excludeReserved bool
// Superset of reservedCPUs. It includes not just the reservedCPUs themselves,
@@ -154,7 +159,8 @@ var _ Policy = &staticPolicy{}
// NewStaticPolicy returns a CPU manager policy that does not change CPU
// assignments for exclusively pinned guaranteed containers after the main
// container process starts.
-func NewStaticPolicy(topology *topology.CPUTopology, numReservedCPUs int, reservedCPUs cpuset.CPUSet, affinity topologymanager.Store, cpuPolicyOptions map[string]string, excludeReserved bool) (Policy, error) {
+func NewStaticPolicy(topology *topology.CPUTopology, numReservedCPUs int, reservedCPUs cpuset.CPUSet, isolCPUs cpuset.CPUSet, affinity topologymanager.Store, cpuPolicyOptions map[string]string, deviceManager devicemanager.Manager, excludeReserved bool) (Policy, error) {
+
opts, err := NewStaticPolicyOptions(cpuPolicyOptions)
if err != nil {
return nil, err
@@ -170,6 +176,8 @@ func NewStaticPolicy(topology *topology.CPUTopology, numReservedCPUs int, reserv
policy := &staticPolicy{
topology: topology,
affinity: affinity,
+ isolcpus: isolCPUs,
+ deviceManager: deviceManager,
excludeReserved: excludeReserved,
cpusToReuse: make(map[string]cpuset.CPUSet),
options: opts,
@@ -207,6 +215,12 @@ func NewStaticPolicy(topology *topology.CPUTopology, numReservedCPUs int, reserv
policy.reservedCPUs = reserved
policy.reservedPhysicalCPUs = reservedPhysicalCPUs
+ if !isolCPUs.IsSubsetOf(reserved) {
+ klog.Errorf("[cpumanager] isolCPUs %v is not a subset of reserved %v", isolCPUs, reserved)
+ reserved = reserved.Union(isolCPUs)
+ klog.Warningf("[cpumanager] mismatch isolCPUs %v, force reserved %v", isolCPUs, reserved)
+ }
+
return policy, nil
}
@@ -246,9 +260,10 @@ func (p *staticPolicy) validateState(s state.State) error {
s.SetDefaultCPUSet(allCPUs)
}
klog.Infof(
- "[cpumanager] static policy: CPUSet: allCPUs:%v, reserved:%v, default:%v, CPUsPerCore:%v\n",
+ "[cpumanager] static policy: CPUSet: allCPUs:%v, reserved:%v, isolcpus:%v, default:%v, CPUsPerCore:%v\n",
allCPUs,
p.reservedCPUs,
+ p.isolcpus,
s.GetDefaultCPUSet(),
p.topology.CPUsPerCore(),
)
@@ -372,10 +387,11 @@ func (p *staticPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Contai
return nil
}
- cpuset := p.reservedCPUs
+ cpuset := p.reservedCPUs.Clone().Difference(p.isolcpus)
if cpuset.IsEmpty() {
// If this happens then someone messed up.
- return fmt.Errorf("[cpumanager] static policy: reserved container unable to allocate cpus (namespace: %s, pod UID: %s, pod: %s, container: %s); cpuset=%v, reserved:%v", pod.Namespace, string(pod.UID), pod.Name, container.Name, cpuset, p.reservedCPUs)
+ return fmt.Errorf("[cpumanager] static policy: reserved container unable to allocate cpus (namespace: %s, pod UID: %s, pod: %s, container: %s); cpuset=%v, reserved:%v, isolcpus:%v", pod.Namespace, string(pod.UID), pod.Name, container.Name, cpuset, p.reservedCPUs, p.isolcpus)
+
}
s.SetCPUSet(string(pod.UID), container.Name, cpuset)
klog.Infof(
@@ -389,6 +405,47 @@ func (p *staticPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Contai
return nil
}
+ // This corrects kubelet cpumanager static cpuset tracking for isolcpus,
+ // since version 1.26.1 . This ensures that pods specified with
+ // isolcpus + guaranteed QoS + integer cpu requests, are affined to
+ // exclusive cpuset and tracked as non-isolated cpus.
+ cpuQuantity := container.Resources.Requests[v1.ResourceCPU]
+ fractionalCpuQuantity := cpuQuantity.MilliValue() % 1000
+ if isolcpus := p.podIsolCPUs(pod, container); isolcpus.Size() > 0 &&
+ v1qos.GetPodQOS(pod) != v1.PodQOSGuaranteed && fractionalCpuQuantity == 0 {
+ // container has requested isolated CPUs
+ if set, ok := s.GetCPUSet(string(pod.UID), container.Name); ok {
+ if set.Equals(isolcpus) {
+ klog.Infof(
+ "[cpumanager] isolcpus container already present in state, skipping (namespace: %s, pod UID: %s, pod: %s, container: %s)",
+ pod.Namespace,
+ string(pod.UID),
+ pod.Name,
+ container.Name,
+ )
+ return nil
+ } else {
+ klog.Infof(
+ "[cpumanager] isolcpus container state has cpus %v, should be %v (namespace: %s, pod UID: %s, pod: %s, container: %s)",
+ isolcpus,
+ set,
+ pod.Namespace,
+ string(pod.UID),
+ pod.Name,
+ container.Name,
+ )
+ }
+ }
+ // Note that we do not do anything about init containers here.
+ // It looks like devices are allocated per-pod based on effective requests/limits
+ // and extra devices from initContainers are not freed up when the regular containers start.
+ // TODO: confirm this is still true for 1.20
+ s.SetCPUSet(string(pod.UID), container.Name, isolcpus)
+ klog.Infof("[cpumanager] isolcpus: AddContainer (namespace: %s, pod UID: %s, pod: %s, container: %s); cpuset=%v",
+ pod.Namespace, string(pod.UID), pod.Name, container.Name, isolcpus)
+ return nil
+ }
+
numCPUs := p.guaranteedCPUs(pod, container)
if numCPUs == 0 {
// container belongs in the shared pool (nothing to do; use default cpuset)
@@ -466,7 +523,15 @@ func (p *staticPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Contai
s.SetCPUSet(string(pod.UID), container.Name, cpuAllocation.CPUs)
p.updateCPUsToReuse(pod, container, cpuAllocation.CPUs)
p.updateMetricsOnAllocate(s, cpuAllocation)
-
+ klog.Infof(
+ "[cpumanager] guaranteed: AddContainer (namespace: %s, pod UID: %s, pod: %s, container: %s); numCPUS=%d, cpuset=%v",
+ pod.Namespace,
+ string(pod.UID),
+ pod.Name,
+ container.Name,
+ numCPUs,
+ cpuAllocation.CPUs,
+ )
klog.V(4).InfoS("Allocated exclusive CPUs", "pod", klog.KObj(pod), "containerName", container.Name, "cpuset", cpuAllocation.CPUs.String())
return nil
}
@@ -865,6 +930,36 @@ func isKubeInfra(pod *v1.Pod) bool {
}
+// get the isolated CPUs (if any) from the devices associated with a specific container
+func (p *staticPolicy) podIsolCPUs(pod *v1.Pod, container *v1.Container) cpuset.CPUSet {
+ // NOTE: This is required for TestStaticPolicyAdd() since makePod() does
+ // not create UID. We also need a way to properly stub devicemanager.
+ if len(string(pod.UID)) == 0 {
+ return cpuset.New()
+ }
+ resContDevices := p.deviceManager.GetDevices(string(pod.UID), container.Name)
+ cpuSet := cpuset.New()
+ for resourceName, resourceDevs := range resContDevices {
+ // this resource name needs to match the isolcpus device plugin
+ if resourceName == "windriver.com/isolcpus" {
+ for devID, _ := range resourceDevs {
+ cpuStrList := []string{devID}
+ if len(cpuStrList) > 0 {
+ // loop over the list of strings, convert each one to int, add to cpuset
+ for _, cpuStr := range cpuStrList {
+ cpu, err := strconv.Atoi(cpuStr)
+ if err != nil {
+ panic(err)
+ }
+ cpuSet = cpuSet.Union(cpuset.New(cpu))
+ }
+ }
+ }
+ }
+ }
+ return cpuSet
+}
+
// isHintSocketAligned function return true if numa nodes in hint are socket aligned.
func (p *staticPolicy) isHintSocketAligned(hint topologymanager.TopologyHint, minAffinitySize int) bool {
numaNodesBitMask := hint.NUMANodeAffinity.GetBits()
diff --git a/pkg/kubelet/cm/cpumanager/policy_static_test.go b/pkg/kubelet/cm/cpumanager/policy_static_test.go
index 23e33a743f6..2b2e80ae8a4 100644
--- a/pkg/kubelet/cm/cpumanager/policy_static_test.go
+++ b/pkg/kubelet/cm/cpumanager/policy_static_test.go
@@ -31,6 +31,7 @@ import (
pkgfeatures "k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology"
+ "k8s.io/kubernetes/pkg/kubelet/cm/devicemanager"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager/bitmask"
"k8s.io/utils/cpuset"
@@ -74,8 +75,9 @@ func (spt staticPolicyTest) PseudoClone() staticPolicyTest {
}
func TestStaticPolicyName(t *testing.T) {
+ testDM, _ := devicemanager.NewManagerStub()
testExcl := false
- policy, err := NewStaticPolicy(topoSingleSocketHT, 1, cpuset.New(), topologymanager.NewFakeManager(), nil, testExcl)
+ policy, err := NewStaticPolicy(topoSingleSocketHT, 1, cpuset.New(), cpuset.New(), topologymanager.NewFakeManager(), nil, testDM, testExcl)
if err != nil {
t.Fatalf("NewStaticPolicy() failed: %v", err)
}
@@ -88,6 +90,7 @@ func TestStaticPolicyName(t *testing.T) {
}
func TestStaticPolicyStart(t *testing.T) {
+ testDM, _ := devicemanager.NewManagerStub()
testCases := []staticPolicyTest{
{
description: "non-corrupted state",
@@ -184,7 +187,7 @@ func TestStaticPolicyStart(t *testing.T) {
}
for _, testCase := range testCases {
t.Run(testCase.description, func(t *testing.T) {
- p, err := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.New(), topologymanager.NewFakeManager(), testCase.options, testCase.excludeReserved)
+ p, err := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.New(), cpuset.New(), topologymanager.NewFakeManager(), testCase.options, testDM, testCase.excludeReserved)
if err != nil {
t.Fatalf("NewStaticPolicy() failed: %v", err)
}
@@ -234,7 +237,6 @@ func TestStaticPolicyAdd(t *testing.T) {
largeTopoCPUSet := cpuset.New(largeTopoCPUids...)
largeTopoSock0CPUSet := cpuset.New(largeTopoSock0CPUids...)
largeTopoSock1CPUSet := cpuset.New(largeTopoSock1CPUids...)
-
// these are the cases which must behave the same regardless the policy options.
// So we will permutate the options to ensure this holds true.
@@ -657,7 +659,8 @@ func runStaticPolicyTestCase(t *testing.T, testCase staticPolicyTest) {
cpus = testCase.reservedCPUs.Clone()
}
testExcl := false
- policy, err := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpus, tm, testCase.options, testExcl)
+ testDM, _ := devicemanager.NewManagerStub()
+ policy, err := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpus, cpus, tm, testCase.options, testDM, testExcl)
if err != nil {
t.Fatalf("NewStaticPolicy() failed: %v", err)
}
@@ -707,6 +710,7 @@ func runStaticPolicyTestCaseWithFeatureGate(t *testing.T, testCase staticPolicyT
}
func TestStaticPolicyReuseCPUs(t *testing.T) {
+ testDM, _ := devicemanager.NewManagerStub()
testCases := []struct {
staticPolicyTest
expCSetAfterAlloc cpuset.CPUSet
@@ -731,7 +735,7 @@ func TestStaticPolicyReuseCPUs(t *testing.T) {
}
for _, testCase := range testCases {
- policy, err := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.New(), topologymanager.NewFakeManager(), nil, testCase.excludeReserved)
+ policy, err := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.New(), cpuset.New(), topologymanager.NewFakeManager(), nil, testDM, testCase.excludeReserved)
if err != nil {
t.Fatalf("NewStaticPolicy() failed: %v", err)
}
@@ -766,6 +770,7 @@ func TestStaticPolicyReuseCPUs(t *testing.T) {
}
func TestStaticPolicyDoNotReuseCPUs(t *testing.T) {
+ testDM, _ := devicemanager.NewManagerStub()
testCases := []struct {
staticPolicyTest
expCSetAfterAlloc cpuset.CPUSet
@@ -787,7 +792,7 @@ func TestStaticPolicyDoNotReuseCPUs(t *testing.T) {
}
for _, testCase := range testCases {
- policy, err := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.New(), topologymanager.NewFakeManager(), nil, testCase.excludeReserved)
+ policy, err := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.New(), cpuset.New(), topologymanager.NewFakeManager(), nil, testDM, testCase.excludeReserved)
if err != nil {
t.Fatalf("NewStaticPolicy() failed: %v", err)
}
@@ -815,6 +820,7 @@ func TestStaticPolicyDoNotReuseCPUs(t *testing.T) {
func TestStaticPolicyRemove(t *testing.T) {
excludeReserved := false
+ testDM, _ := devicemanager.NewManagerStub()
testCases := []staticPolicyTest{
{
description: "SingleSocketHT, DeAllocOneContainer",
@@ -873,7 +879,7 @@ func TestStaticPolicyRemove(t *testing.T) {
}
for _, testCase := range testCases {
- policy, err := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.New(), topologymanager.NewFakeManager(), nil, excludeReserved)
+ policy, err := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.New(), cpuset.New(), topologymanager.NewFakeManager(), nil, testDM, excludeReserved)
if err != nil {
t.Fatalf("NewStaticPolicy() failed: %v", err)
}
@@ -899,6 +905,7 @@ func TestStaticPolicyRemove(t *testing.T) {
func TestTopologyAwareAllocateCPUs(t *testing.T) {
excludeReserved := false
+ testDM, _ := devicemanager.NewManagerStub()
testCases := []struct {
description string
topo *topology.CPUTopology
@@ -967,7 +974,7 @@ func TestTopologyAwareAllocateCPUs(t *testing.T) {
},
}
for _, tc := range testCases {
- p, err := NewStaticPolicy(tc.topo, 0, cpuset.New(), topologymanager.NewFakeManager(), nil, excludeReserved)
+ p, err := NewStaticPolicy(tc.topo, 0, cpuset.New(), cpuset.New(), topologymanager.NewFakeManager(), nil, testDM, excludeReserved)
if err != nil {
t.Fatalf("NewStaticPolicy() failed: %v", err)
}
@@ -1003,6 +1010,7 @@ type staticPolicyTestWithResvList struct {
topo *topology.CPUTopology
numReservedCPUs int
reserved cpuset.CPUSet
+ isolcpus cpuset.CPUSet
cpuPolicyOptions map[string]string
stAssignments state.ContainerCPUAssignments
stDefaultCPUSet cpuset.CPUSet
@@ -1016,6 +1024,8 @@ type staticPolicyTestWithResvList struct {
}
func TestStaticPolicyStartWithResvList(t *testing.T) {
+ testDM, _ := devicemanager.NewManagerStub()
+ testExcl := false
testCases := []staticPolicyTestWithResvList{
{
description: "empty cpuset",
@@ -1065,10 +1075,10 @@ func TestStaticPolicyStartWithResvList(t *testing.T) {
expNewErr: fmt.Errorf("[cpumanager] unable to reserve the required amount of CPUs (size of 0-1 did not equal 1)"),
},
}
- testExcl := false
+
for _, testCase := range testCases {
t.Run(testCase.description, func(t *testing.T) {
- p, err := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, testCase.reserved, topologymanager.NewFakeManager(), testCase.cpuPolicyOptions, testExcl)
+ p, err := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, testCase.reserved, cpuset.New(), topologymanager.NewFakeManager(), testCase.cpuPolicyOptions, testDM, testExcl)
if !reflect.DeepEqual(err, testCase.expNewErr) {
t.Errorf("StaticPolicy Start() error (%v). expected error: %v but got: %v",
@@ -1118,6 +1128,7 @@ func TestStaticPolicyAddWithResvList(t *testing.T) {
topo: topoSingleSocketHT,
numReservedCPUs: 1,
reserved: cpuset.New(0),
+ isolcpus: cpuset.New(),
stAssignments: state.ContainerCPUAssignments{},
stDefaultCPUSet: cpuset.New(1, 2, 3, 4, 5, 6, 7),
pod: makePod("fakePod", "fakeContainer2", "8000m", "8000m"),
@@ -1131,6 +1142,7 @@ func TestStaticPolicyAddWithResvList(t *testing.T) {
topo: topoSingleSocketHT,
numReservedCPUs: 2,
reserved: cpuset.New(0, 1),
+ isolcpus: cpuset.New(),
stAssignments: state.ContainerCPUAssignments{},
stDefaultCPUSet: cpuset.New(2, 3, 4, 5, 6, 7),
pod: makePod("fakePod", "fakeContainer2", "1000m", "1000m"),
@@ -1144,6 +1156,7 @@ func TestStaticPolicyAddWithResvList(t *testing.T) {
topo: topoSingleSocketHT,
numReservedCPUs: 2,
reserved: cpuset.New(0, 1),
+ isolcpus: cpuset.New(),
stAssignments: state.ContainerCPUAssignments{
"fakePod": map[string]cpuset.CPUSet{
"fakeContainer100": cpuset.New(2, 3, 6, 7),
@@ -1156,11 +1169,30 @@ func TestStaticPolicyAddWithResvList(t *testing.T) {
expCPUAlloc: true,
expCSet: cpuset.New(0, 1),
},
+ {
+ description: "InfraPod, SingleSocketHT, Isolcpus, ExpectAllocReserved",
+ topo: topoSingleSocketHT,
+ numReservedCPUs: 2,
+ reserved: cpuset.New(0, 1),
+ isolcpus: cpuset.New(1),
+ stAssignments: state.ContainerCPUAssignments{
+ "fakePod": map[string]cpuset.CPUSet{
+ "fakeContainer100": cpuset.New(2, 3, 6, 7),
+ },
+ },
+ stDefaultCPUSet: cpuset.New(4, 5),
+ pod: infraPod,
+ isKubeInfraPod: fakeIsKubeInfraTrue,
+ expErr: nil,
+ expCPUAlloc: true,
+ expCSet: cpuset.New(0),
+ },
}
testExcl := true
+ testDM, _ := devicemanager.NewManagerStub()
for _, testCase := range testCases {
- policy, err := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, testCase.reserved, topologymanager.NewFakeManager(), nil, testExcl)
+ policy, err := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, testCase.reserved, testCase.isolcpus, topologymanager.NewFakeManager(), nil, testDM, testExcl)
if err != nil {
t.Fatalf("NewStaticPolicy() failed: %v", err)
}
@@ -1221,6 +1253,7 @@ func TestStaticPolicyAddWithUncoreAlignment(t *testing.T) {
topo: topoDualSocketSingleNumaPerSocketSMTUncore,
numReservedCPUs: 2,
reserved: cpuset.New(8, 9),
+ isolcpus: cpuset.New(),
cpuPolicyOptions: map[string]string{
FullPCPUsOnlyOption: "true",
PreferAlignByUnCoreCacheOption: "true",
@@ -1250,6 +1283,7 @@ func TestStaticPolicyAddWithUncoreAlignment(t *testing.T) {
topo: topoDualSocketSingleNumaPerSocketSMTUncore,
numReservedCPUs: 2,
reserved: cpuset.New(8, 9),
+ isolcpus: cpuset.New(),
cpuPolicyOptions: map[string]string{
FullPCPUsOnlyOption: "true",
PreferAlignByUnCoreCacheOption: "true",
@@ -1280,6 +1314,7 @@ func TestStaticPolicyAddWithUncoreAlignment(t *testing.T) {
topo: topoDualSocketSingleNumaPerSocketSMTUncore,
numReservedCPUs: 2,
reserved: cpuset.New(8, 9),
+ isolcpus: cpuset.New(),
cpuPolicyOptions: map[string]string{
FullPCPUsOnlyOption: "true",
PreferAlignByUnCoreCacheOption: "true",
@@ -1310,6 +1345,7 @@ func TestStaticPolicyAddWithUncoreAlignment(t *testing.T) {
topo: topoDualSocketSingleNumaPerSocketSMTUncore,
numReservedCPUs: 2,
reserved: cpuset.New(8, 9),
+ isolcpus: cpuset.New(),
cpuPolicyOptions: map[string]string{
FullPCPUsOnlyOption: "true",
PreferAlignByUnCoreCacheOption: "true",
@@ -1342,6 +1378,7 @@ func TestStaticPolicyAddWithUncoreAlignment(t *testing.T) {
topo: topoDualSocketSingleNumaPerSocketSMTUncore,
numReservedCPUs: 4,
reserved: cpuset.New(0, 1, 8, 9),
+ isolcpus: cpuset.New(),
cpuPolicyOptions: map[string]string{
FullPCPUsOnlyOption: "true",
PreferAlignByUnCoreCacheOption: "true",
@@ -1365,6 +1402,7 @@ func TestStaticPolicyAddWithUncoreAlignment(t *testing.T) {
topo: topoSingleSocketSingleNumaPerSocketSMTSmallUncore,
numReservedCPUs: 4,
reserved: cpuset.New(8, 9, 10, 11),
+ isolcpus: cpuset.New(),
cpuPolicyOptions: map[string]string{
FullPCPUsOnlyOption: "true",
PreferAlignByUnCoreCacheOption: "true",
@@ -1387,7 +1425,8 @@ func TestStaticPolicyAddWithUncoreAlignment(t *testing.T) {
description: "GuPodSingleContainer, SingleSocketSMTSmallUncore, ExpectAllocTwoUncore",
topo: topoSingleSocketSingleNumaPerSocketSMTSmallUncore, // 8 cpus per uncore
numReservedCPUs: 4,
- reserved: cpuset.New(0, 1, 64, 65), // note 4 cpus taken from uncore 0
+ reserved: cpuset.New(0, 1, 64, 65),
+ isolcpus: cpuset.New(), // note 4 cpus taken from uncore 0
cpuPolicyOptions: map[string]string{
FullPCPUsOnlyOption: "true",
PreferAlignByUnCoreCacheOption: "true",
@@ -1414,6 +1453,7 @@ func TestStaticPolicyAddWithUncoreAlignment(t *testing.T) {
topo: topoSingleSocketSingleNumaPerSocketNoSMTSmallUncore, // 4 cpus per uncore
numReservedCPUs: 4,
reserved: cpuset.New(0, 1, 2, 3), // note 4 cpus taken from uncore 0
+ isolcpus: cpuset.New(),
cpuPolicyOptions: map[string]string{
FullPCPUsOnlyOption: "true",
PreferAlignByUnCoreCacheOption: "true",
@@ -1446,6 +1486,7 @@ func TestStaticPolicyAddWithUncoreAlignment(t *testing.T) {
topo: topoSmallDualSocketSingleNumaPerSocketNoSMTUncore, // 8 cpus per uncore
numReservedCPUs: 4,
reserved: cpuset.New(0, 1, 2, 3),
+ isolcpus: cpuset.New(),
cpuPolicyOptions: map[string]string{
FullPCPUsOnlyOption: "true",
PreferAlignByUnCoreCacheOption: "true",
@@ -1473,6 +1514,7 @@ func TestStaticPolicyAddWithUncoreAlignment(t *testing.T) {
topo: topoSmallDualSocketSingleNumaPerSocketNoSMTUncore, // 8 cpus per uncore
numReservedCPUs: 4,
reserved: cpuset.New(0, 1, 32, 33), // note 2 cpus taken from uncore 0, 2 from uncore 4
+ isolcpus: cpuset.New(),
cpuPolicyOptions: map[string]string{
FullPCPUsOnlyOption: "true",
PreferAlignByUnCoreCacheOption: "true",
@@ -1500,6 +1542,7 @@ func TestStaticPolicyAddWithUncoreAlignment(t *testing.T) {
topo: topoLargeSingleSocketSingleNumaPerSocketSMTUncore, // 16 cpus per uncore
numReservedCPUs: 4,
reserved: cpuset.New(0, 1, 128, 129), // note 4 cpus taken from uncore 0
+ isolcpus: cpuset.New(),
cpuPolicyOptions: map[string]string{
FullPCPUsOnlyOption: "true",
PreferAlignByUnCoreCacheOption: "true",
@@ -1527,6 +1570,7 @@ func TestStaticPolicyAddWithUncoreAlignment(t *testing.T) {
topo: topoLargeSingleSocketSingleNumaPerSocketSMTUncore, // 16 cpus per uncore
numReservedCPUs: 4,
reserved: cpuset.New(0, 1, 128, 129), // note 4 cpus taken from uncore 0
+ isolcpus: cpuset.New(),
cpuPolicyOptions: map[string]string{
FullPCPUsOnlyOption: "true",
PreferAlignByUnCoreCacheOption: "true",
@@ -1554,6 +1598,7 @@ func TestStaticPolicyAddWithUncoreAlignment(t *testing.T) {
topo: topoDualSocketMultiNumaPerSocketUncore, // 8 cpus per uncore
numReservedCPUs: 4,
reserved: cpuset.New(0, 1, 2, 3), // note 4 cpus taken from uncore 0
+ isolcpus: cpuset.New(),
cpuPolicyOptions: map[string]string{
FullPCPUsOnlyOption: "true",
PreferAlignByUnCoreCacheOption: "true",
@@ -1577,6 +1622,7 @@ func TestStaticPolicyAddWithUncoreAlignment(t *testing.T) {
topo: topoDualSocketSubNumaPerSocketHTMonolithicUncore, // Uncore cache CPUs = Socket CPUs
numReservedCPUs: 4,
reserved: cpuset.New(0, 1, 120, 121), // note 4 cpus taken from first 2 cores
+ isolcpus: cpuset.New(),
cpuPolicyOptions: map[string]string{
FullPCPUsOnlyOption: "true",
PreferAlignByUnCoreCacheOption: "true",
@@ -1602,6 +1648,7 @@ func TestStaticPolicyAddWithUncoreAlignment(t *testing.T) {
topo: topoSingleSocketSingleNumaPerSocketPCoreHTMonolithicUncore, // Uncore cache CPUs = Socket CPUs
numReservedCPUs: 2,
reserved: cpuset.New(0, 1),
+ isolcpus: cpuset.New(),
cpuPolicyOptions: map[string]string{
FullPCPUsOnlyOption: "true",
PreferAlignByUnCoreCacheOption: "true",
@@ -1629,7 +1676,8 @@ func TestStaticPolicyAddWithUncoreAlignment(t *testing.T) {
description: "GuPodSingleContainer, LargeSingleSocketUncore, ExpectAllocOneUncore",
topo: topoLargeSingleSocketSingleNumaPerSocketUncore, // 8 cpus per uncore
numReservedCPUs: 2,
- reserved: cpuset.New(8, 9), //
+ reserved: cpuset.New(8, 9),
+ isolcpus: cpuset.New(),
cpuPolicyOptions: map[string]string{
FullPCPUsOnlyOption: "true",
PreferAlignByUnCoreCacheOption: "true",
@@ -1653,6 +1701,7 @@ func TestStaticPolicyAddWithUncoreAlignment(t *testing.T) {
topo: topoSingleSocketSingleNumaPerSocketUncore, // Uncore cache CPUs = Socket CPUs
numReservedCPUs: 2,
reserved: cpuset.New(0, 1),
+ isolcpus: cpuset.New(),
cpuPolicyOptions: map[string]string{
FullPCPUsOnlyOption: "true",
PreferAlignByUnCoreCacheOption: "true",
@@ -1688,6 +1737,7 @@ func TestStaticPolicyAddWithUncoreAlignment(t *testing.T) {
topo: topoSmallSingleSocketSingleNumaPerSocketNoSMTUncore, // 8 cpus per uncore
numReservedCPUs: 4,
reserved: cpuset.New(0, 1, 2, 3), // note 4 cpus taken from uncore 0
+ isolcpus: cpuset.New(),
cpuPolicyOptions: map[string]string{
FullPCPUsOnlyOption: "true",
PreferAlignByUnCoreCacheOption: "true",
@@ -1717,9 +1767,10 @@ func TestStaticPolicyAddWithUncoreAlignment(t *testing.T) {
}
testExcl := false
+ testDM, _ := devicemanager.NewManagerStub()
for _, testCase := range testCases {
t.Run(testCase.description, func(t *testing.T) {
- policy, err := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, testCase.reserved, topologymanager.NewFakeManager(), testCase.cpuPolicyOptions, testExcl)
+ policy, err := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, testCase.reserved, testCase.isolcpus, topologymanager.NewFakeManager(), testCase.cpuPolicyOptions, testDM, testExcl)
if err != nil {
t.Fatalf("NewStaticPolicy() failed with %v", err)
}
diff --git a/pkg/kubelet/cm/devicemanager/manager_stub.go b/pkg/kubelet/cm/devicemanager/manager_stub.go
new file mode 100644
index 00000000000..6e9e03a25f1
--- /dev/null
+++ b/pkg/kubelet/cm/devicemanager/manager_stub.go
@@ -0,0 +1,127 @@
+/*
+Copyright 2017 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package devicemanager
+
+import (
+ v1 "k8s.io/api/core/v1"
+ "k8s.io/apimachinery/pkg/util/sets"
+ "k8s.io/apiserver/pkg/server/healthz"
+ "k8s.io/kubernetes/pkg/kubelet/cm/containermap"
+ "k8s.io/kubernetes/pkg/kubelet/cm/resourceupdates"
+ "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
+ "k8s.io/kubernetes/pkg/kubelet/config"
+ "k8s.io/kubernetes/pkg/kubelet/lifecycle"
+ "k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache"
+ schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
+)
+
+// ManagerStub provides a simple stub implementation for the Device Manager.
+type ManagerStub struct {
+ // containerMap provides a mapping from (pod, container) -> containerID
+ // for all containers in a pod. Used to detect pods running across a restart
+ containerMap containermap.ContainerMap
+
+ // containerRunningSet identifies which container among those present in `containerMap`
+ // was reported running by the container runtime when `containerMap` was computed.
+ // Used to detect pods running across a restart
+ containerRunningSet sets.Set[string]
+}
+
+// GetHealthChecker implements Manager.
+func (h *ManagerStub) GetHealthChecker() healthz.HealthChecker {
+ return nil
+}
+
+// UpdateAllocatedResourcesStatus implements Manager.
+func (h *ManagerStub) UpdateAllocatedResourcesStatus(pod *v1.Pod, status *v1.PodStatus) {
+ return
+}
+
+// Updates implements Manager.
+func (h *ManagerStub) Updates() <-chan resourceupdates.Update {
+ return nil
+}
+
+// NewManagerStub creates a ManagerStub.
+func NewManagerStub() (*ManagerStub, error) {
+ return &ManagerStub{}, nil
+}
+
+// Start simply returns nil.
+func (h *ManagerStub) Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady, initialContainers containermap.ContainerMap, initialContainerRunningSet sets.Set[string]) error {
+ return nil
+}
+
+// Stop simply returns nil.
+func (h *ManagerStub) Stop() error {
+ return nil
+}
+
+// Allocate simply returns nil.
+func (h *ManagerStub) Allocate(pod *v1.Pod, container *v1.Container) error {
+ return nil
+}
+
+// UpdatePluginResources simply returns nil.
+func (h *ManagerStub) UpdatePluginResources(node *schedulerframework.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error {
+ return nil
+}
+
+// GetDeviceRunContainerOptions simply returns nil.
+func (h *ManagerStub) GetDeviceRunContainerOptions(pod *v1.Pod, container *v1.Container) (*DeviceRunContainerOptions, error) {
+ return nil, nil
+}
+
+// GetCapacity simply returns nil capacity and empty removed resource list.
+func (h *ManagerStub) GetCapacity() (v1.ResourceList, v1.ResourceList, []string) {
+ return nil, nil, []string{}
+}
+
+// GetWatcherHandler returns plugin watcher interface
+func (h *ManagerStub) GetWatcherHandler() cache.PluginHandler {
+ return nil
+}
+
+// GetTopologyHints returns an empty TopologyHint map
+func (h *ManagerStub) GetTopologyHints(pod *v1.Pod, container *v1.Container) map[string][]topologymanager.TopologyHint {
+ return map[string][]topologymanager.TopologyHint{}
+}
+
+// GetPodTopologyHints returns an empty TopologyHint map
+func (h *ManagerStub) GetPodTopologyHints(pod *v1.Pod) map[string][]topologymanager.TopologyHint {
+ return map[string][]topologymanager.TopologyHint{}
+}
+
+// GetDevices returns nil
+func (h *ManagerStub) GetDevices(_, _ string) ResourceDeviceInstances {
+ return nil
+}
+
+// GetAllocatableDevices returns nothing
+func (h *ManagerStub) GetAllocatableDevices() ResourceDeviceInstances {
+ return nil
+}
+
+// ShouldResetExtendedResourceCapacity returns false
+func (h *ManagerStub) ShouldResetExtendedResourceCapacity() bool {
+ return false
+}
+
+// UpdateAllocatedDevices returns nothing
+func (h *ManagerStub) UpdateAllocatedDevices() {
+ return
+}
--
2.34.1

View File

@ -0,0 +1,397 @@
From bbe0d21ce075022a2739bbacdcc19e353fddb77b Mon Sep 17 00:00:00 2001
From: Saba Touheed Mujawar <sabatouheed.mujawar@windriver.com>
Date: Wed, 21 May 2025 07:09:16 -0400
Subject: [PATCH] kubelet cpumanager keep normal containers off reserved CPUs
When starting the kubelet process, two separate sets of reserved CPUs
may be specified. With this change, CPUs reserved via
'--system-reserved=cpu' or '--kube-reserved=cpu' will be ignored by
Kubernetes itself. This explicitly excludes the reserved CPUs from the
DefaultCPUset so that pods cannot run there.
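As a rough, hedged sketch of the intended effect (not the patch code itself; the
real change is the excludeReserved handling in policy_static.go below, and the
cpuset helpers from k8s.io/utils/cpuset are assumed):

package main

import (
	"fmt"

	"k8s.io/utils/cpuset"
)

func main() {
	// All online CPUs reported by the topology.
	allCPUs := cpuset.New(0, 1, 2, 3, 4, 5, 6, 7)
	// CPUs reserved via --system-reserved=cpu / --kube-reserved=cpu.
	reserved := cpuset.New(0, 1)

	// With excludeReserved=true the default (shared) pool no longer
	// contains the reserved CPUs, so normal containers cannot run there.
	defaultSet := allCPUs.Difference(reserved)
	fmt.Println(defaultSet) // prints "2-7"
}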
Co-authored-by: Jim Gauld <james.gauld@windriver.com>
Signed-off-by: Sachin Gopala Krishna <saching.krishna@windriver.com>
Signed-off-by: Ramesh Kumar Sivanandam <rameshkumar.sivanandam@windriver.com>
Signed-off-by: Boovan Rajendran <boovan.rajendran@windriver.com>
Signed-off-by: Saba Touheed Mujawar <sabatouheed.mujawar@windriver.com>
---
pkg/kubelet/cm/cpumanager/cpu_manager.go | 6 ++-
pkg/kubelet/cm/cpumanager/cpu_manager_test.go | 19 ++++++--
pkg/kubelet/cm/cpumanager/policy_static.go | 48 ++++++++++++++-----
.../cm/cpumanager/policy_static_test.go | 42 +++++++++++-----
4 files changed, 85 insertions(+), 30 deletions(-)
diff --git a/pkg/kubelet/cm/cpumanager/cpu_manager.go b/pkg/kubelet/cm/cpumanager/cpu_manager.go
index 015a49f6a35..2e292902c52 100644
--- a/pkg/kubelet/cm/cpumanager/cpu_manager.go
+++ b/pkg/kubelet/cm/cpumanager/cpu_manager.go
@@ -198,7 +198,11 @@ func NewManager(cpuPolicyName string, cpuPolicyOptions map[string]string, reconc
// exclusively allocated.
reservedCPUsFloat := float64(reservedCPUs.MilliValue()) / 1000
numReservedCPUs := int(math.Ceil(reservedCPUsFloat))
- policy, err = NewStaticPolicy(topo, numReservedCPUs, specificCPUs, affinity, cpuPolicyOptions)
+ // NOTE: Set excludeReserved unconditionally to exclude reserved CPUs from default cpuset.
+ // This variable is primarily to make testing easier.
+ excludeReserved := true
+ policy, err = NewStaticPolicy(topo, numReservedCPUs, specificCPUs, affinity, cpuPolicyOptions, excludeReserved)
+
if err != nil {
return nil, fmt.Errorf("new static policy error: %w", err)
}
diff --git a/pkg/kubelet/cm/cpumanager/cpu_manager_test.go b/pkg/kubelet/cm/cpumanager/cpu_manager_test.go
index 6c3af2dc3f3..e029f6c11a9 100644
--- a/pkg/kubelet/cm/cpumanager/cpu_manager_test.go
+++ b/pkg/kubelet/cm/cpumanager/cpu_manager_test.go
@@ -272,6 +272,7 @@ func TestCPUManagerAdd(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WindowsCPUAndMemoryAffinity, true)
}
+ testExcl := false
testPolicy, _ := NewStaticPolicy(
&topology.CPUTopology{
NumCPUs: 4,
@@ -287,7 +288,8 @@ func TestCPUManagerAdd(t *testing.T) {
0,
cpuset.New(),
topologymanager.NewFakeManager(),
- nil)
+ nil,
+ testExcl)
testCases := []struct {
description string
updateErr error
@@ -540,8 +542,9 @@ func TestCPUManagerAddWithInitContainers(t *testing.T) {
},
}
+ testExcl := false
for _, testCase := range testCases {
- policy, _ := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.New(), topologymanager.NewFakeManager(), nil)
+ policy, _ := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.New(), topologymanager.NewFakeManager(), nil, testExcl)
mockState := &mockState{
assignments: testCase.stAssignments,
@@ -775,6 +778,7 @@ func TestReconcileState(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WindowsCPUAndMemoryAffinity, true)
}
+ testExcl := false
testPolicy, _ := NewStaticPolicy(
&topology.CPUTopology{
NumCPUs: 8,
@@ -794,7 +798,8 @@ func TestReconcileState(t *testing.T) {
0,
cpuset.New(),
topologymanager.NewFakeManager(),
- nil)
+ nil,
+ testExcl)
testCases := []struct {
description string
@@ -1302,6 +1307,7 @@ func TestCPUManagerAddWithResvList(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WindowsCPUAndMemoryAffinity, true)
}
+ testExcl := false
testPolicy, _ := NewStaticPolicy(
&topology.CPUTopology{
NumCPUs: 4,
@@ -1317,7 +1323,8 @@ func TestCPUManagerAddWithResvList(t *testing.T) {
1,
cpuset.New(0),
topologymanager.NewFakeManager(),
- nil)
+ nil,
+ testExcl)
testCases := []struct {
description string
updateErr error
@@ -1450,6 +1457,7 @@ func TestCPUManagerGetAllocatableCPUs(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WindowsCPUAndMemoryAffinity, true)
}
+ testExcl := false
nonePolicy, _ := NewNonePolicy(nil)
staticPolicy, _ := NewStaticPolicy(
&topology.CPUTopology{
@@ -1466,7 +1474,8 @@ func TestCPUManagerGetAllocatableCPUs(t *testing.T) {
1,
cpuset.New(0),
topologymanager.NewFakeManager(),
- nil)
+ nil,
+ testExcl)
testCases := []struct {
description string
diff --git a/pkg/kubelet/cm/cpumanager/policy_static.go b/pkg/kubelet/cm/cpumanager/policy_static.go
index 28591c5baf1..b67d04849c4 100644
--- a/pkg/kubelet/cm/cpumanager/policy_static.go
+++ b/pkg/kubelet/cm/cpumanager/policy_static.go
@@ -107,6 +107,8 @@ type staticPolicy struct {
topology *topology.CPUTopology
// set of CPUs that is not available for exclusive assignment
reservedCPUs cpuset.CPUSet
+ // If true, default CPUSet should exclude reserved CPUs
+ excludeReserved bool
// Superset of reservedCPUs. It includes not just the reservedCPUs themselves,
// but also any siblings of those reservedCPUs on the same physical die.
// NOTE: If the reserved set includes full physical CPUs from the beginning
@@ -130,7 +132,7 @@ var _ Policy = &staticPolicy{}
// NewStaticPolicy returns a CPU manager policy that does not change CPU
// assignments for exclusively pinned guaranteed containers after the main
// container process starts.
-func NewStaticPolicy(topology *topology.CPUTopology, numReservedCPUs int, reservedCPUs cpuset.CPUSet, affinity topologymanager.Store, cpuPolicyOptions map[string]string) (Policy, error) {
+func NewStaticPolicy(topology *topology.CPUTopology, numReservedCPUs int, reservedCPUs cpuset.CPUSet, affinity topologymanager.Store, cpuPolicyOptions map[string]string, excludeReserved bool) (Policy, error) {
opts, err := NewStaticPolicyOptions(cpuPolicyOptions)
if err != nil {
return nil, err
@@ -141,14 +143,15 @@ func NewStaticPolicy(topology *topology.CPUTopology, numReservedCPUs int, reserv
}
cpuGroupSize := topology.CPUsPerCore()
- klog.InfoS("Static policy created with configuration", "options", opts, "cpuGroupSize", cpuGroupSize)
+ klog.V(4).InfoS("Static policy created with configuration", "options", opts, "cpuGroupSize", cpuGroupSize)
policy := &staticPolicy{
- topology: topology,
- affinity: affinity,
- cpusToReuse: make(map[string]cpuset.CPUSet),
- options: opts,
- cpuGroupSize: cpuGroupSize,
+ topology: topology,
+ affinity: affinity,
+ excludeReserved: excludeReserved,
+ cpusToReuse: make(map[string]cpuset.CPUSet),
+ options: opts,
+ cpuGroupSize: cpuGroupSize,
}
allCPUs := topology.CPUDetails.CPUs()
@@ -213,8 +216,21 @@ func (p *staticPolicy) validateState(s state.State) error {
return fmt.Errorf("default cpuset cannot be empty")
}
// state is empty initialize
- s.SetDefaultCPUSet(allCPUs)
- klog.InfoS("Static policy initialized", "defaultCPUSet", allCPUs)
+ if p.excludeReserved {
+ // Exclude reserved CPUs from the default CPUSet to keep containers off them
+ // unless explicitly affined.
+ s.SetDefaultCPUSet(allCPUs.Difference(p.reservedCPUs))
+ } else {
+ s.SetDefaultCPUSet(allCPUs)
+ }
+ klog.Infof(
+ "[cpumanager] static policy: CPUSet: allCPUs:%v, reserved:%v, default:%v, CPUsPerCore:%v\n",
+ allCPUs,
+ p.reservedCPUs,
+ s.GetDefaultCPUSet(),
+ p.topology.CPUsPerCore(),
+ )
+ klog.V(4).InfoS("Static policy initialized", "defaultCPUSet", allCPUs)
return nil
}
@@ -228,9 +244,11 @@ func (p *staticPolicy) validateState(s state.State) error {
p.reservedCPUs.Intersection(tmpDefaultCPUset).String(), tmpDefaultCPUset.String())
}
} else {
- if !p.reservedCPUs.Intersection(tmpDefaultCPUset).Equals(p.reservedCPUs) {
- return fmt.Errorf("not all reserved cpus: \"%s\" are present in defaultCpuSet: \"%s\"",
- p.reservedCPUs.String(), tmpDefaultCPUset.String())
+ if !p.excludeReserved {
+ if !p.reservedCPUs.Intersection(tmpDefaultCPUset).Equals(p.reservedCPUs) {
+ return fmt.Errorf("not all reserved cpus: \"%s\" are present in defaultCpuSet: \"%s\"",
+ p.reservedCPUs.String(), tmpDefaultCPUset.String())
+ }
}
}
@@ -260,6 +278,9 @@ func (p *staticPolicy) validateState(s state.State) error {
}
}
totalKnownCPUs = totalKnownCPUs.Union(tmpCPUSets...)
+ if p.excludeReserved {
+ totalKnownCPUs = totalKnownCPUs.Union(p.reservedCPUs)
+ }
if !totalKnownCPUs.Equals(allCPUs) {
return fmt.Errorf("current set of available CPUs \"%s\" doesn't match with CPUs in state \"%s\"",
allCPUs.String(), totalKnownCPUs.String())
@@ -414,6 +435,9 @@ func (p *staticPolicy) RemoveContainer(s state.State, podUID string, containerNa
cpusInUse := getAssignedCPUsOfSiblings(s, podUID, containerName)
if toRelease, ok := s.GetCPUSet(podUID, containerName); ok {
s.Delete(podUID, containerName)
+ if p.excludeReserved {
+ toRelease = toRelease.Difference(p.reservedCPUs)
+ }
// Mutate the shared pool, adding released cpus.
toRelease = toRelease.Difference(cpusInUse)
s.SetDefaultCPUSet(s.GetDefaultCPUSet().Union(toRelease))
diff --git a/pkg/kubelet/cm/cpumanager/policy_static_test.go b/pkg/kubelet/cm/cpumanager/policy_static_test.go
index db3a3649b56..28fea9c8aa7 100644
--- a/pkg/kubelet/cm/cpumanager/policy_static_test.go
+++ b/pkg/kubelet/cm/cpumanager/policy_static_test.go
@@ -37,6 +37,7 @@ type staticPolicyTest struct {
description string
topo *topology.CPUTopology
numReservedCPUs int
+ excludeReserved bool
reservedCPUs *cpuset.CPUSet
podUID string
options map[string]string
@@ -70,7 +71,8 @@ func (spt staticPolicyTest) PseudoClone() staticPolicyTest {
}
func TestStaticPolicyName(t *testing.T) {
- policy, err := NewStaticPolicy(topoSingleSocketHT, 1, cpuset.New(), topologymanager.NewFakeManager(), nil)
+ testExcl := false
+ policy, err := NewStaticPolicy(topoSingleSocketHT, 1, cpuset.New(), topologymanager.NewFakeManager(), nil, testExcl)
if err != nil {
t.Fatalf("NewStaticPolicy() failed: %v", err)
}
@@ -103,6 +105,15 @@ func TestStaticPolicyStart(t *testing.T) {
stDefaultCPUSet: cpuset.New(),
expCSet: cpuset.New(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11),
},
+ {
+ description: "empty cpuset exclude reserved",
+ topo: topoDualSocketHT,
+ numReservedCPUs: 2,
+ excludeReserved: true,
+ stAssignments: state.ContainerCPUAssignments{},
+ stDefaultCPUSet: cpuset.New(),
+ expCSet: cpuset.New(1, 2, 3, 4, 5, 7, 8, 9, 10, 11),
+ },
{
description: "reserved cores 0 & 6 are not present in available cpuset",
topo: topoDualSocketHT,
@@ -170,7 +181,7 @@ func TestStaticPolicyStart(t *testing.T) {
}
for _, testCase := range testCases {
t.Run(testCase.description, func(t *testing.T) {
- p, err := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.New(), topologymanager.NewFakeManager(), testCase.options)
+ p, err := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.New(), topologymanager.NewFakeManager(), testCase.options, testCase.excludeReserved)
if err != nil {
t.Fatalf("NewStaticPolicy() failed: %v", err)
}
@@ -642,7 +653,8 @@ func runStaticPolicyTestCase(t *testing.T, testCase staticPolicyTest) {
if testCase.reservedCPUs != nil {
cpus = testCase.reservedCPUs.Clone()
}
- policy, err := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpus, tm, testCase.options)
+ testExcl := false
+ policy, err := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpus, tm, testCase.options, testExcl)
if err != nil {
t.Fatalf("NewStaticPolicy() failed: %v", err)
}
@@ -716,7 +728,7 @@ func TestStaticPolicyReuseCPUs(t *testing.T) {
}
for _, testCase := range testCases {
- policy, err := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.New(), topologymanager.NewFakeManager(), nil)
+ policy, err := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.New(), topologymanager.NewFakeManager(), nil, testCase.excludeReserved)
if err != nil {
t.Fatalf("NewStaticPolicy() failed: %v", err)
}
@@ -772,7 +784,7 @@ func TestStaticPolicyDoNotReuseCPUs(t *testing.T) {
}
for _, testCase := range testCases {
- policy, err := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.New(), topologymanager.NewFakeManager(), nil)
+ policy, err := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.New(), topologymanager.NewFakeManager(), nil, testCase.excludeReserved)
if err != nil {
t.Fatalf("NewStaticPolicy() failed: %v", err)
}
@@ -799,6 +811,7 @@ func TestStaticPolicyDoNotReuseCPUs(t *testing.T) {
}
func TestStaticPolicyRemove(t *testing.T) {
+ excludeReserved := false
testCases := []staticPolicyTest{
{
description: "SingleSocketHT, DeAllocOneContainer",
@@ -857,7 +870,7 @@ func TestStaticPolicyRemove(t *testing.T) {
}
for _, testCase := range testCases {
- policy, err := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.New(), topologymanager.NewFakeManager(), nil)
+ policy, err := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.New(), topologymanager.NewFakeManager(), nil, excludeReserved)
if err != nil {
t.Fatalf("NewStaticPolicy() failed: %v", err)
}
@@ -882,6 +895,7 @@ func TestStaticPolicyRemove(t *testing.T) {
}
func TestTopologyAwareAllocateCPUs(t *testing.T) {
+ excludeReserved := false
testCases := []struct {
description string
topo *topology.CPUTopology
@@ -950,7 +964,7 @@ func TestTopologyAwareAllocateCPUs(t *testing.T) {
},
}
for _, tc := range testCases {
- p, err := NewStaticPolicy(tc.topo, 0, cpuset.New(), topologymanager.NewFakeManager(), nil)
+ p, err := NewStaticPolicy(tc.topo, 0, cpuset.New(), topologymanager.NewFakeManager(), nil, excludeReserved)
if err != nil {
t.Fatalf("NewStaticPolicy() failed: %v", err)
}
@@ -1047,9 +1061,11 @@ func TestStaticPolicyStartWithResvList(t *testing.T) {
expNewErr: fmt.Errorf("[cpumanager] unable to reserve the required amount of CPUs (size of 0-1 did not equal 1)"),
},
}
+ testExcl := false
for _, testCase := range testCases {
t.Run(testCase.description, func(t *testing.T) {
- p, err := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, testCase.reserved, topologymanager.NewFakeManager(), testCase.cpuPolicyOptions)
+ p, err := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, testCase.reserved, topologymanager.NewFakeManager(), testCase.cpuPolicyOptions, testExcl)
+
if !reflect.DeepEqual(err, testCase.expNewErr) {
t.Errorf("StaticPolicy Start() error (%v). expected error: %v but got: %v",
testCase.description, testCase.expNewErr, err)
@@ -1089,7 +1105,7 @@ func TestStaticPolicyAddWithResvList(t *testing.T) {
numReservedCPUs: 1,
reserved: cpuset.New(0),
stAssignments: state.ContainerCPUAssignments{},
- stDefaultCPUSet: cpuset.New(0, 1, 2, 3, 4, 5, 6, 7),
+ stDefaultCPUSet: cpuset.New(1, 2, 3, 4, 5, 6, 7),
pod: makePod("fakePod", "fakeContainer2", "8000m", "8000m"),
expErr: fmt.Errorf("not enough cpus available to satisfy request: requested=8, available=7"),
expCPUAlloc: false,
@@ -1101,7 +1117,7 @@ func TestStaticPolicyAddWithResvList(t *testing.T) {
numReservedCPUs: 2,
reserved: cpuset.New(0, 1),
stAssignments: state.ContainerCPUAssignments{},
- stDefaultCPUSet: cpuset.New(0, 1, 2, 3, 4, 5, 6, 7),
+ stDefaultCPUSet: cpuset.New(2, 3, 4, 5, 6, 7),
pod: makePod("fakePod", "fakeContainer2", "1000m", "1000m"),
expErr: nil,
expCPUAlloc: true,
@@ -1125,8 +1141,9 @@ func TestStaticPolicyAddWithResvList(t *testing.T) {
},
}
+ testExcl := true
for _, testCase := range testCases {
- policy, err := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, testCase.reserved, topologymanager.NewFakeManager(), nil)
+ policy, err := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, testCase.reserved, topologymanager.NewFakeManager(), nil, testExcl)
if err != nil {
t.Fatalf("NewStaticPolicy() failed: %v", err)
}
@@ -1681,9 +1698,10 @@ func TestStaticPolicyAddWithUncoreAlignment(t *testing.T) {
},
}
+ testExcl := false
for _, testCase := range testCases {
t.Run(testCase.description, func(t *testing.T) {
- policy, err := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, testCase.reserved, topologymanager.NewFakeManager(), testCase.cpuPolicyOptions)
+ policy, err := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, testCase.reserved, topologymanager.NewFakeManager(), testCase.cpuPolicyOptions, testExcl)
if err != nil {
t.Fatalf("NewStaticPolicy() failed with %v", err)
}
--
2.34.1

View File

@ -0,0 +1,649 @@
From 8a9330b4fe8c0024d34a53151979d566ce52f441 Mon Sep 17 00:00:00 2001
From: Saba Touheed Mujawar <sabatouheed.mujawar@windriver.com>
Date: Tue, 3 Jun 2025 10:18:14 -0400
Subject: [PATCH] kubelet cpumanager platform pods on reserved cpus
This change assigns system infrastructure pods to the "reserved"
cpuset to isolate them from the shared pool of CPUs. Kubernetes
infrastructure pods are identified based on the 'kube-system'
namespace. Platform pods are identified based on the 'kube-system'
namespace or on the label 'app.starlingx.io/component=platform'.
The platform and infrastructure pods are given an isolated CPU
affinity cpuset when the CPU manager is configured with the 'static'
policy.
This implementation assumes that the "reserved" cpuset is large
enough to handle all infrastructure and platform pods' CPU
allocations, and it prevents the platform pods from running on
application/isolated CPUs regardless of what QoS class they are in.
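As a simplified, hedged sketch of the classification described above (the
authoritative logic is the isKubeInfra function added below, which additionally
consults the pod's namespace labels via the API server; looksLikePlatformPod is
a hypothetical helper used only for illustration):

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// looksLikePlatformPod is an illustrative helper, not part of the patch.
// It mirrors the first two checks of isKubeInfra: the 'kube-system'
// namespace or the StarlingX platform label on the pod itself.
func looksLikePlatformPod(pod *v1.Pod) bool {
	if pod.Namespace == "kube-system" {
		return true
	}
	return pod.Labels["app.starlingx.io/component"] == "platform"
}

func main() {
	pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{
		Name:      "example",
		Namespace: "default",
		Labels:    map[string]string{"app.starlingx.io/component": "platform"},
	}}
	fmt.Println(looksLikePlatformPod(pod)) // prints "true"
}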
Co-authored-by: Jim Gauld <james.gauld@windriver.com>
Signed-off-by: Gleb Aronsky <gleb.aronsky@windriver.com>
Signed-off-by: Thiago Miranda <ThiagoOliveira.Miranda@windriver.com>
Signed-off-by: Kaustubh Dhokte <kaustubh.dhokte@windriver.com>
Signed-off-by: Ramesh Kumar Sivanandam <rameshkumar.sivanandam@windriver.com>
Signed-off-by: Sachin Gopala Krishna <saching.krishna@windriver.com>
Signed-off-by: Boovan Rajendran <boovan.rajendran@windriver.com>
Signed-off-by: Saba Touheed Mujawar <sabatouheed.mujawar@windriver.com>
---
pkg/kubelet/cm/cpumanager/policy_static.go | 122 +++++++++++
.../cm/cpumanager/policy_static_test.go | 203 +++++++++++++++---
.../cm/cpumanager/topology_hints_test.go | 4 +
3 files changed, 300 insertions(+), 29 deletions(-)
diff --git a/pkg/kubelet/cm/cpumanager/policy_static.go b/pkg/kubelet/cm/cpumanager/policy_static.go
index b67d04849c4..a9db41b85eb 100644
--- a/pkg/kubelet/cm/cpumanager/policy_static.go
+++ b/pkg/kubelet/cm/cpumanager/policy_static.go
@@ -17,12 +17,18 @@ limitations under the License.
package cpumanager
import (
+ "context"
"fmt"
"strconv"
v1 "k8s.io/api/core/v1"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilfeature "k8s.io/apiserver/pkg/util/feature"
+ k8sclient "k8s.io/client-go/kubernetes"
+ restclient "k8s.io/client-go/rest"
+ "k8s.io/client-go/tools/clientcmd"
"k8s.io/klog/v2"
+ "k8s.io/kubernetes/cmd/kubeadm/app/constants"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
v1qos "k8s.io/kubernetes/pkg/apis/core/v1/helper/qos"
"k8s.io/kubernetes/pkg/features"
@@ -44,6 +50,22 @@ const (
ErrorSMTAlignment = "SMTAlignmentError"
)
+// Declared as variables so that they can more easily be
+// overridden during testing.
+type getPodNamespace func(string) (*v1.Namespace, error)
+type buildFromConfigFlag func(masterUrl string, kubeconfigPath string) (*restclient.Config, error)
+type isKubeInfraFunc func(pod *v1.Pod) bool
+
+var varGetNamespaceObject getPodNamespace
+var varBuildConfigFromFlags buildFromConfigFlag
+var varIsKubeInfra isKubeInfraFunc
+
+func init() {
+ varIsKubeInfra = isKubeInfra
+ varGetNamespaceObject = getPodNamespaceObject
+ varBuildConfigFromFlags = clientcmd.BuildConfigFromFlags
+}
+
// SMTAlignmentError represents an error due to SMT alignment
type SMTAlignmentError struct {
RequestedCPUs int
@@ -335,6 +357,38 @@ func (p *staticPolicy) updateCPUsToReuse(pod *v1.Pod, container *v1.Container, c
}
func (p *staticPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Container) (rerr error) {
+ // Process infra pods before guaranteed pods
+ if varIsKubeInfra(pod) {
+ // Container belongs in reserved pool.
+ // We don't want to fall through to the p.guaranteedCPUs() clause below so return either nil or error.
+ if _, ok := s.GetCPUSet(string(pod.UID), container.Name); ok {
+ klog.Infof(
+ "[cpumanager] static policy: reserved container already present in state, skipping (namespace: %s, pod UID: %s, pod: %s, container: %s)",
+ pod.Namespace,
+ string(pod.UID),
+ pod.Name,
+ container.Name,
+ )
+ return nil
+ }
+
+ cpuset := p.reservedCPUs
+ if cpuset.IsEmpty() {
+ // If this happens then someone messed up.
+ return fmt.Errorf("[cpumanager] static policy: reserved container unable to allocate cpus (namespace: %s, pod UID: %s, pod: %s, container: %s); cpuset=%v, reserved:%v", pod.Namespace, string(pod.UID), pod.Name, container.Name, cpuset, p.reservedCPUs)
+ }
+ s.SetCPUSet(string(pod.UID), container.Name, cpuset)
+ klog.Infof(
+ "[cpumanager] static policy: reserved: AddContainer (namespace: %s, pod UID: %s, pod: %s, container: %s); cpuset=%v",
+ pod.Namespace,
+ string(pod.UID),
+ pod.Name,
+ container.Name,
+ cpuset,
+ )
+ return nil
+ }
+
numCPUs := p.guaranteedCPUs(pod, container)
if numCPUs == 0 {
// container belongs in the shared pool (nothing to do; use default cpuset)
@@ -512,6 +566,10 @@ func (p *staticPolicy) guaranteedCPUs(pod *v1.Pod, container *v1.Container) int
klog.V(5).InfoS("Exclusive CPU allocation skipped, pod requested non-integral CPUs", "pod", klog.KObj(pod), "containerName", container.Name, "cpu", cpuValue)
return 0
}
+ // Infrastructure pods use reserved CPUs even if they're in the Guaranteed QoS class
+ if varIsKubeInfra(pod) {
+ return 0
+ }
// Safe downcast to do for all systems with < 2.1 billion CPUs.
// Per the language spec, `int` is guaranteed to be at least 32 bits wide.
// https://golang.org/ref/spec#Numeric_types
@@ -743,6 +801,70 @@ func (p *staticPolicy) generateCPUTopologyHints(availableCPUs cpuset.CPUSet, reu
return hints
}
+func getPodNamespaceObject(podNamespaceName string) (*v1.Namespace, error) {
+
+ kubeConfigPath := constants.GetKubeletKubeConfigPath()
+ cfg, err := varBuildConfigFromFlags("", kubeConfigPath)
+ if err != nil {
+ klog.Error("Failed to build client config from ", kubeConfigPath, err.Error())
+ return nil, err
+ }
+
+ clientset, err := k8sclient.NewForConfig(cfg)
+ if err != nil {
+ klog.Error("Failed to get clientset for KUBECONFIG ", kubeConfigPath, err.Error())
+ return nil, err
+ }
+
+ namespaceObj, err := clientset.CoreV1().Namespaces().Get(context.TODO(), podNamespaceName, metav1.GetOptions{})
+ if err != nil {
+ klog.Error("Error getting namespace object:", err.Error())
+ return nil, err
+ }
+
+ return namespaceObj, nil
+
+}
+
+// check if a given pod is labelled as platform pod or
+// is in a namespace labelled as a platform namespace
+func isKubeInfra(pod *v1.Pod) bool {
+
+ podName := pod.GetName()
+ podNamespaceName := pod.GetNamespace()
+
+ if podNamespaceName == "kube-system" {
+ klog.Infof("Pod %s has %s namespace. Treating as platform pod.", podName, podNamespaceName)
+ return true
+ }
+
+ klog.InfoS("Checking pod ", podName, " for label 'app.starlingx.io/component=platform'.")
+ podLabels := pod.GetLabels()
+ val, ok := podLabels["app.starlingx.io/component"]
+ if ok && val == "platform" {
+ klog.InfoS("Pod ", podName, " has 'app.starlingx.io/component=platform' label. Treating as platform pod.")
+ return true
+ }
+
+ klog.V(4).InfoS("Pod ", pod.GetName(), " does not have 'app.starlingx.io/component=platform' label. Checking its namespace information...")
+
+ namespaceObj, err := varGetNamespaceObject(podNamespaceName)
+ if err != nil {
+ return false
+ }
+
+ namespaceLabels := namespaceObj.GetLabels()
+ val, ok = namespaceLabels["app.starlingx.io/component"]
+ if ok && val == "platform" {
+ klog.InfoS("For pod: ", podName, ", its Namespace ", podNamespaceName, " has 'app.starlingx.io/component=platform' label. Treating as platform pod.")
+ return true
+ }
+
+ klog.InfoS("Neither pod ", podName, " nor its namespace ", podNamespaceName, " has 'app.starlingx.io/component=platform' label. Not treating as platform pod.")
+ return false
+
+}
+
// isHintSocketAligned function return true if numa nodes in hint are socket aligned.
func (p *staticPolicy) isHintSocketAligned(hint topologymanager.TopologyHint, minAffinitySize int) bool {
numaNodesBitMask := hint.NUMANodeAffinity.GetBits()
diff --git a/pkg/kubelet/cm/cpumanager/policy_static_test.go b/pkg/kubelet/cm/cpumanager/policy_static_test.go
index 28fea9c8aa7..23e33a743f6 100644
--- a/pkg/kubelet/cm/cpumanager/policy_static_test.go
+++ b/pkg/kubelet/cm/cpumanager/policy_static_test.go
@@ -17,13 +17,16 @@ limitations under the License.
package cpumanager
import (
+ "errors"
"fmt"
"reflect"
"testing"
v1 "k8s.io/api/core/v1"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
utilfeature "k8s.io/apiserver/pkg/util/feature"
+ restclient "k8s.io/client-go/rest"
featuregatetesting "k8s.io/component-base/featuregate/testing"
pkgfeatures "k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state"
@@ -1004,6 +1007,7 @@ type staticPolicyTestWithResvList struct {
stAssignments state.ContainerCPUAssignments
stDefaultCPUSet cpuset.CPUSet
pod *v1.Pod
+ isKubeInfraPod isKubeInfraFunc
expErr error
expNewErr error
expCPUAlloc bool
@@ -1096,7 +1100,17 @@ func TestStaticPolicyStartWithResvList(t *testing.T) {
}
}
+func fakeIsKubeInfraTrue(pod *v1.Pod) bool {
+ return true
+}
+
+func fakeIsKubeInfraFalse(pod *v1.Pod) bool {
+ return false
+}
+
func TestStaticPolicyAddWithResvList(t *testing.T) {
+ infraPod := makePod("fakePod", "fakeContainer2", "200m", "200m")
+ infraPod.Namespace = "kube-system"
testCases := []staticPolicyTestWithResvList{
{
@@ -1107,6 +1121,7 @@ func TestStaticPolicyAddWithResvList(t *testing.T) {
stAssignments: state.ContainerCPUAssignments{},
stDefaultCPUSet: cpuset.New(1, 2, 3, 4, 5, 6, 7),
pod: makePod("fakePod", "fakeContainer2", "8000m", "8000m"),
+ isKubeInfraPod: fakeIsKubeInfraFalse,
expErr: fmt.Errorf("not enough cpus available to satisfy request: requested=8, available=7"),
expCPUAlloc: false,
expCSet: cpuset.New(),
@@ -1119,12 +1134,13 @@ func TestStaticPolicyAddWithResvList(t *testing.T) {
stAssignments: state.ContainerCPUAssignments{},
stDefaultCPUSet: cpuset.New(2, 3, 4, 5, 6, 7),
pod: makePod("fakePod", "fakeContainer2", "1000m", "1000m"),
+ isKubeInfraPod: fakeIsKubeInfraFalse,
expErr: nil,
expCPUAlloc: true,
expCSet: cpuset.New(4), // expect sibling of partial core
},
{
- description: "GuPodMultipleCores, SingleSocketHT, ExpectAllocOneCore",
+ description: "InfraPod, SingleSocketHT, ExpectAllocReserved",
topo: topoSingleSocketHT,
numReservedCPUs: 2,
reserved: cpuset.New(0, 1),
@@ -1133,11 +1149,12 @@ func TestStaticPolicyAddWithResvList(t *testing.T) {
"fakeContainer100": cpuset.New(2, 3, 6, 7),
},
},
- stDefaultCPUSet: cpuset.New(0, 1, 4, 5),
- pod: makePod("fakePod", "fakeContainer3", "2000m", "2000m"),
+ stDefaultCPUSet: cpuset.New(4, 5),
+ pod: infraPod,
+ isKubeInfraPod: fakeIsKubeInfraTrue,
expErr: nil,
expCPUAlloc: true,
- expCSet: cpuset.New(4, 5),
+ expCSet: cpuset.New(0, 1),
},
}
@@ -1153,6 +1170,7 @@ func TestStaticPolicyAddWithResvList(t *testing.T) {
defaultCPUSet: testCase.stDefaultCPUSet,
}
+ varIsKubeInfra = testCase.isKubeInfraPod
container := &testCase.pod.Spec.Containers[0]
err = policy.Allocate(st, testCase.pod, container)
if !reflect.DeepEqual(err, testCase.expErr) {
@@ -1201,8 +1219,8 @@ func TestStaticPolicyAddWithUncoreAlignment(t *testing.T) {
{
description: "GuPodSingleContainerSaturating, DualSocketHTUncore, ExpectAllocOneUncore, FullUncoreAvail",
topo: topoDualSocketSingleNumaPerSocketSMTUncore,
- numReservedCPUs: 8,
- reserved: cpuset.New(0, 1, 96, 97, 192, 193, 288, 289), // note 4 cpus taken from uncore 0, 4 from uncore 12
+ numReservedCPUs: 2,
+ reserved: cpuset.New(8, 9),
cpuPolicyOptions: map[string]string{
FullPCPUsOnlyOption: "true",
PreferAlignByUnCoreCacheOption: "true",
@@ -1230,8 +1248,8 @@ func TestStaticPolicyAddWithUncoreAlignment(t *testing.T) {
{
description: "GuPodMainAndSidecarContainer, DualSocketHTUncore, ExpectAllocOneUncore, FullUncoreAvail",
topo: topoDualSocketSingleNumaPerSocketSMTUncore,
- numReservedCPUs: 8,
- reserved: cpuset.New(0, 1, 96, 97, 192, 193, 288, 289), // note 4 cpus taken from uncore 0, 4 from uncore 12
+ numReservedCPUs: 2,
+ reserved: cpuset.New(8, 9),
cpuPolicyOptions: map[string]string{
FullPCPUsOnlyOption: "true",
PreferAlignByUnCoreCacheOption: "true",
@@ -1260,8 +1278,8 @@ func TestStaticPolicyAddWithUncoreAlignment(t *testing.T) {
{
description: "GuPodSidecarAndMainContainer, DualSocketHTUncore, ExpectAllocOneUncore, FullUncoreAvail",
topo: topoDualSocketSingleNumaPerSocketSMTUncore,
- numReservedCPUs: 8,
- reserved: cpuset.New(0, 1, 96, 97, 192, 193, 288, 289), // note 4 cpus taken from uncore 0, 4 from uncore 12
+ numReservedCPUs: 2,
+ reserved: cpuset.New(8, 9),
cpuPolicyOptions: map[string]string{
FullPCPUsOnlyOption: "true",
PreferAlignByUnCoreCacheOption: "true",
@@ -1290,8 +1308,8 @@ func TestStaticPolicyAddWithUncoreAlignment(t *testing.T) {
{
description: "GuPodMainAndManySidecarContainer, DualSocketHTUncore, ExpectAllocOneUncore, FullUncoreAvail",
topo: topoDualSocketSingleNumaPerSocketSMTUncore,
- numReservedCPUs: 8,
- reserved: cpuset.New(0, 1, 96, 97, 192, 193, 288, 289), // note 4 cpus taken from uncore 0, 4 from uncore 12
+ numReservedCPUs: 2,
+ reserved: cpuset.New(8, 9),
cpuPolicyOptions: map[string]string{
FullPCPUsOnlyOption: "true",
PreferAlignByUnCoreCacheOption: "true",
@@ -1322,8 +1340,8 @@ func TestStaticPolicyAddWithUncoreAlignment(t *testing.T) {
{
description: "GuPodMainAndSidecarContainer, DualSocketHTUncore, ExpectAllocTwoUncore",
topo: topoDualSocketSingleNumaPerSocketSMTUncore,
- numReservedCPUs: 8,
- reserved: cpuset.New(0, 1, 96, 97, 192, 193, 288, 289), // note 4 cpus taken from uncore 0, 4 from uncore 12
+ numReservedCPUs: 4,
+ reserved: cpuset.New(0, 1, 8, 9),
cpuPolicyOptions: map[string]string{
FullPCPUsOnlyOption: "true",
PreferAlignByUnCoreCacheOption: "true",
@@ -1346,7 +1364,7 @@ func TestStaticPolicyAddWithUncoreAlignment(t *testing.T) {
description: "GuPodSingleContainer, SingleSocketSMTSmallUncore, ExpectAllocOneUncore",
topo: topoSingleSocketSingleNumaPerSocketSMTSmallUncore,
numReservedCPUs: 4,
- reserved: cpuset.New(0, 1, 64, 65), // note 4 cpus taken from uncore 0
+ reserved: cpuset.New(8, 9, 10, 11),
cpuPolicyOptions: map[string]string{
FullPCPUsOnlyOption: "true",
PreferAlignByUnCoreCacheOption: "true",
@@ -1362,7 +1380,7 @@ func TestStaticPolicyAddWithUncoreAlignment(t *testing.T) {
),
"with-app-container-saturating",
),
- expUncoreCache: cpuset.New(1),
+ expUncoreCache: cpuset.New(2),
},
{
// Best-effort policy allows larger containers to be scheduled using a packed method
@@ -1388,7 +1406,7 @@ func TestStaticPolicyAddWithUncoreAlignment(t *testing.T) {
),
"with-app-container-saturating",
),
- expUncoreCache: cpuset.New(0, 2),
+ expUncoreCache: cpuset.New(0),
},
{
// Best-effort policy allows larger containers to be scheduled using a packed method
@@ -1420,14 +1438,14 @@ func TestStaticPolicyAddWithUncoreAlignment(t *testing.T) {
),
"with-app-container-saturating",
),
- expUncoreCache: cpuset.New(1, 4, 6),
+ expUncoreCache: cpuset.New(0),
},
{
// Uncore cache alignment following a packed methodology
description: "GuPodMultiContainer, DualSocketSMTUncore, FragmentedUncore, ExpectAllocOneUncore",
topo: topoSmallDualSocketSingleNumaPerSocketNoSMTUncore, // 8 cpus per uncore
numReservedCPUs: 4,
- reserved: cpuset.New(0, 1, 32, 33), // note 2 cpus taken from uncore 0, 2 from uncore 4
+ reserved: cpuset.New(0, 1, 2, 3),
cpuPolicyOptions: map[string]string{
FullPCPUsOnlyOption: "true",
PreferAlignByUnCoreCacheOption: "true",
@@ -1474,7 +1492,7 @@ func TestStaticPolicyAddWithUncoreAlignment(t *testing.T) {
),
"with-multiple-container",
),
- expUncoreCache: cpuset.New(0, 2),
+ expUncoreCache: cpuset.New(0, 4),
},
{
// CPU assignments able to fit on partially available uncore cache
@@ -1501,7 +1519,7 @@ func TestStaticPolicyAddWithUncoreAlignment(t *testing.T) {
),
"with-multiple-container",
),
- expUncoreCache: cpuset.New(0, 1),
+ expUncoreCache: cpuset.New(0),
},
{
// CPU assignments unable to fit on partially available uncore cache
@@ -1528,7 +1546,7 @@ func TestStaticPolicyAddWithUncoreAlignment(t *testing.T) {
),
"with-multiple-container",
),
- expUncoreCache: cpuset.New(2, 3),
+ expUncoreCache: cpuset.New(0),
},
{
// Full NUMA allocation on split-cache architecture with NPS=2
@@ -1551,7 +1569,7 @@ func TestStaticPolicyAddWithUncoreAlignment(t *testing.T) {
),
"with-large-single-container",
),
- expUncoreCache: cpuset.New(6, 7, 8, 9, 10, 11), // uncore caches of NUMA Node 1
+ expUncoreCache: cpuset.New(0), // uncore cache 0
},
{
// PreferAlignByUnCoreCacheOption will not impact monolithic x86 architectures
@@ -1575,7 +1593,7 @@ func TestStaticPolicyAddWithUncoreAlignment(t *testing.T) {
"with-single-container",
),
expCPUAlloc: true,
- expCSet: cpuset.New(2, 3, 4, 122, 123, 124),
+ expCSet: cpuset.New(0, 1, 120, 121),
expUncoreCache: cpuset.New(0),
},
{
@@ -1603,15 +1621,15 @@ func TestStaticPolicyAddWithUncoreAlignment(t *testing.T) {
"with-single-container",
),
expCPUAlloc: true,
- expCSet: cpuset.New(2, 3, 8, 9, 16, 17), // identical to default packed assignment
+ expCSet: cpuset.New(0, 1), // identical to default packed assignment
expUncoreCache: cpuset.New(0),
},
{
// Compatibility with ARM-based split cache architectures
description: "GuPodSingleContainer, LargeSingleSocketUncore, ExpectAllocOneUncore",
topo: topoLargeSingleSocketSingleNumaPerSocketUncore, // 8 cpus per uncore
- numReservedCPUs: 4,
- reserved: cpuset.New(0, 1, 2, 3), // note 4 cpus taken from uncore 0
+ numReservedCPUs: 2,
+ reserved: cpuset.New(8, 9),
cpuPolicyOptions: map[string]string{
FullPCPUsOnlyOption: "true",
PreferAlignByUnCoreCacheOption: "true",
@@ -1660,7 +1678,7 @@ func TestStaticPolicyAddWithUncoreAlignment(t *testing.T) {
"with-single-container",
),
expCPUAlloc: true,
- expCSet: cpuset.New(2, 3, 4, 5, 10, 11, 16, 17, 20, 21, 22, 23), // identical to default packed assignment
+ expCSet: cpuset.New(0, 1), // identical to default packed assignment
expUncoreCache: cpuset.New(0),
},
{
@@ -1694,7 +1712,7 @@ func TestStaticPolicyAddWithUncoreAlignment(t *testing.T) {
),
"with-single-container",
),
- expUncoreCache: cpuset.New(0, 1), // best-effort across uncore cache 0 and 1
+ expUncoreCache: cpuset.New(0), // best-effort allocation now fits within uncore cache 0
},
}
@@ -1832,6 +1850,133 @@ func TestStaticPolicyOptions(t *testing.T) {
}
}
+func makePodWithLabels(podLabels map[string]string) *v1.Pod {
+ return &v1.Pod{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: "test-pod",
+ Namespace: "test-namespace",
+ Labels: podLabels,
+ },
+ }
+}
+
+func fakeBuildConfigFromFlags(masterUrl string, kubeconfigPath string) (*restclient.Config, error) {
+
+ return &restclient.Config{}, nil
+}
+
+func fakeBuildConfigFromFlagsError(masterUrl string, kubeconfigPath string) (*restclient.Config, error) {
+
+ errString := fmt.Sprintf("%s file not found", kubeconfigPath)
+ return nil, errors.New(errString)
+
+}
+
+func getFakeInfraPodNamespace(_ string) (*v1.Namespace, error) {
+
+ return &v1.Namespace{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: "test-namespace",
+ Labels: map[string]string{
+ "app.starlingx.io/component": "platform",
+ },
+ }}, nil
+}
+
+func getFakeNonInfraPodNamespace(_ string) (*v1.Namespace, error) {
+
+ return &v1.Namespace{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: "test-namespace",
+ Labels: map[string]string{
+ "fake": "label",
+ }}}, nil
+
+}
+
+type kubeInfraPodTestCase struct {
+ description string
+ pod *v1.Pod
+ namespaceFunc getPodNamespace
+ expectedValue bool
+}
+
+func TestKubeInfraPod(t *testing.T) {
+ testCases := []kubeInfraPodTestCase{
+ {
+ description: "Pod with platform label and namespace with platform label",
+ pod: makePodWithLabels(map[string]string{
+ "app.starlingx.io/component": "platform",
+ }),
+ namespaceFunc: getFakeInfraPodNamespace,
+ expectedValue: true,
+ },
+ {
+ description: "Pod with platform label and namespace without platform label",
+ pod: makePodWithLabels(map[string]string{
+ "app.starlingx.io/component": "platform",
+ }),
+ namespaceFunc: getFakeNonInfraPodNamespace,
+ expectedValue: true,
+ },
+ {
+ description: "Pod without platform label and namespace with platform label",
+ pod: makePodWithLabels(map[string]string{
+ "test": "label",
+ }),
+ namespaceFunc: getFakeInfraPodNamespace,
+ expectedValue: true,
+ },
+ {
+ description: "Pod without platform label and namespace without platform label",
+ pod: makePodWithLabels(map[string]string{
+ "test": "namespace",
+ }),
+ namespaceFunc: getFakeNonInfraPodNamespace,
+ expectedValue: false,
+ },
+ }
+
+ for _, testCase := range testCases {
+ t.Run(testCase.description, func(t *testing.T) {
+
+ varGetNamespaceObject = testCase.namespaceFunc
+ varBuildConfigFromFlags = fakeBuildConfigFromFlags
+ gotValue := isKubeInfra(testCase.pod)
+
+ if gotValue != testCase.expectedValue {
+ t.Errorf("StaticPolicy isKubeInfraPod() error %v. expected value %v actual value %v",
+ testCase.description, testCase.expectedValue, gotValue)
+ } else {
+ fmt.Printf("StaticPolicy isKubeInfraPod() test successful. : %v ", testCase.description)
+ }
+
+ })
+ }
+
+ test := kubeInfraPodTestCase{
+ description: "Failure reading kubeconfig file",
+ pod: makePodWithLabels(map[string]string{
+ "test": "namespace",
+ }),
+ namespaceFunc: getFakeNonInfraPodNamespace,
+ expectedValue: false,
+ }
+
+ varGetNamespaceObject = getPodNamespaceObject
+ varBuildConfigFromFlags = fakeBuildConfigFromFlagsError
+
+ gotValue := isKubeInfra(test.pod)
+
+ if gotValue != test.expectedValue {
+ t.Errorf("StaticPolicy isKubeInfraPod() error %v. expected value %v actual value %v",
+ test.description, test.expectedValue, gotValue)
+ } else {
+ fmt.Printf("StaticPolicy isKubeInfraPod() test successful. : %v ", test.description)
+ }
+
+}
+
func TestSMTAlignmentErrorText(t *testing.T) {
type smtErrTestCase struct {
name string
diff --git a/pkg/kubelet/cm/cpumanager/topology_hints_test.go b/pkg/kubelet/cm/cpumanager/topology_hints_test.go
index f6c230bc32f..ebb314eb434 100644
--- a/pkg/kubelet/cm/cpumanager/topology_hints_test.go
+++ b/pkg/kubelet/cm/cpumanager/topology_hints_test.go
@@ -197,6 +197,7 @@ func TestPodGuaranteedCPUs(t *testing.T) {
expectedCPU: 210,
},
}
+ varIsKubeInfra = fakeIsKubeInfraFalse
for _, tc := range tcases {
t.Run(tc.name, func(t *testing.T) {
requestedCPU := p.podGuaranteedCPUs(tc.pod)
@@ -241,6 +242,7 @@ func TestGetTopologyHints(t *testing.T) {
sourcesReady: &sourcesReadyStub{},
}
+ varIsKubeInfra = fakeIsKubeInfraFalse
hints := m.GetTopologyHints(&tc.pod, &tc.container)[string(v1.ResourceCPU)]
if len(tc.expectedHints) == 0 && len(hints) == 0 {
continue
@@ -289,6 +291,7 @@ func TestGetPodTopologyHints(t *testing.T) {
sourcesReady: &sourcesReadyStub{},
}
+ varIsKubeInfra = fakeIsKubeInfraFalse
podHints := m.GetPodTopologyHints(&tc.pod)[string(v1.ResourceCPU)]
if len(tc.expectedHints) == 0 && len(podHints) == 0 {
continue
@@ -471,6 +474,7 @@ func TestGetPodTopologyHintsWithPolicyOptions(t *testing.T) {
sourcesReady: &sourcesReadyStub{},
}
+ varIsKubeInfra = fakeIsKubeInfraFalse
podHints := m.GetPodTopologyHints(&testCase.pod)[string(v1.ResourceCPU)]
sort.SliceStable(podHints, func(i, j int) bool {
return podHints[i].LessThan(podHints[j])
--
2.34.1

View File

@ -0,0 +1,168 @@
From d11918cde1a0ab7e87af70bf69fad0352e0a4184 Mon Sep 17 00:00:00 2001
From: Saba Touheed Mujawar <sabatouheed.mujawar@windriver.com>
Date: Mon, 28 Apr 2025 05:11:52 -0400
Subject: [PATCH] kubelet: isolcpus SMT aware sorted allocation
Enhance isolcpus support in Kubernetes to allocate isolated SMT
siblings to the same container when SMT/HT is enabled on the host.
As it stands, the device manager code in Kubernetes is not SMT-aware
(since normally it doesn't deal with CPUs). However, StarlingX
exposes isolated CPUs as devices, and if possible we want to allocate
all SMT siblings of a CPU core to the same container in order to
minimize cross-container interference due to resource contention
within the CPU core. The existing device manager code returns CPUs
as devices in unsorted order. This change numerically sorts isolcpus
allocations when SMT/HT is enabled on the host, and logs the SMT
pairs, singletons, and resulting allocation order to make the
algorithm easy to follow.
The solution is basically to take the list of isolated CPUs and
re-order it so that the SMT siblings are next to each other. That
way the existing resource selection code will allocate the siblings
together. As an optimization, if it is known that an odd number
of isolated CPUs is desired, a singleton SMT sibling will be
placed at the front of the list to avoid breaking up sibling pairs.
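A hedged, self-contained sketch of the reordering idea follows; unlike the real
code below, which reads thread_siblings_list from sysfs, the sibling topology
here is passed in as an assumption, and orderBySibling is an illustrative model
rather than the patch code.

package main

import (
	"fmt"
	"sort"
)

// orderBySibling models the reordering described above: SMT sibling
// pairs are emitted adjacently, and when an odd number of CPUs is
// needed a singleton leads the list so that no pair gets split.
// Illustrative only; the real patch derives siblings from
// /sys/devices/system/cpu/cpuN/topology/thread_siblings_list.
func orderBySibling(isolated map[int]bool, sibling map[int]int, needed int) []int {
	ids := make([]int, 0, len(isolated))
	for id := range isolated {
		ids = append(ids, id)
	}
	sort.Ints(ids)

	seen := make(map[int]bool)
	var pairs, singles []int
	for _, id := range ids {
		if seen[id] {
			continue
		}
		if sib, ok := sibling[id]; ok && isolated[sib] && sib != id {
			pairs = append(pairs, id, sib)
			seen[sib] = true
		} else {
			singles = append(singles, id)
		}
		seen[id] = true
	}
	if needed%2 != 0 && len(singles) > 0 {
		// Lead with one singleton so an odd-sized request does not break a pair.
		return append(append([]int{singles[0]}, pairs...), singles[1:]...)
	}
	return append(pairs, singles...)
}

func main() {
	isolated := map[int]bool{2: true, 3: true, 5: true, 10: true, 11: true}
	siblings := map[int]int{2: 10, 10: 2, 3: 11, 11: 3, 5: 13, 13: 5}
	fmt.Println(orderBySibling(isolated, siblings, 3)) // prints "[5 2 10 3 11]"
}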
Signed-off-by: Tao Wang <tao.wang@windriver.com>
Signed-off-by: Ramesh Kumar Sivanandam <rameshkumar.sivanandam@windriver.com>
Signed-off-by: Boovan Rajendran <boovan.rajendran@windriver.com>
Signed-off-by: Saba Touheed Mujawar <sabatouheed.mujawar@windriver.com>
---
pkg/kubelet/cm/devicemanager/manager.go | 99 ++++++++++++++++++++++++-
1 file changed, 98 insertions(+), 1 deletion(-)
diff --git a/pkg/kubelet/cm/devicemanager/manager.go b/pkg/kubelet/cm/devicemanager/manager.go
index 9ed9c132293..d16b6627251 100644
--- a/pkg/kubelet/cm/devicemanager/manager.go
+++ b/pkg/kubelet/cm/devicemanager/manager.go
@@ -24,6 +24,8 @@ import (
"path/filepath"
"runtime"
"sort"
+ "strconv"
+ "strings"
"sync"
"time"
@@ -51,6 +53,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/metrics"
"k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache"
schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
+ "k8s.io/utils/cpuset"
)
const nodeWithoutTopology = -1
@@ -566,6 +569,91 @@ func (m *ManagerImpl) UpdateAllocatedDevices() {
m.allocatedDevices = m.podDevices.devices()
}
+// Given a list of isolated CPUs in 'devices', and the number of desired CPUs in 'needed',
+// return an ordered list of isolated CPUs such that the first 'needed' CPUs in the list
+// contain as many hyperthread sibling pairs as possible.
+func order_devices_by_sibling(devices sets.Set[string], needed int) ([]string, error) {
+ var dev_lst []string
+ var single_lst []string
+ sibling_lst := make([]string, 0, int(devices.Len()))
+ _iterated_cpu := make(map[string]string)
+ get_sibling := func(cpu string, cpu_lst []string) string {
+ if cpu_lst[0] == cpu {
+ return cpu_lst[1]
+ } else {
+ return cpu_lst[0]
+ }
+ }
+ // Make post-analysis of the selection algorithm obvious by numerically
+ // sorting the available isolated cpu_ids.
+ cpu_ids := make([]int, 0, int(devices.Len()))
+ for cpu_id := range devices {
+ cpu_id_, _ := strconv.Atoi(cpu_id)
+ cpu_ids = append(cpu_ids, cpu_id_)
+ }
+ sort.Ints(cpu_ids)
+ for _, _cpu_id := range cpu_ids {
+ cpu_id := strconv.Itoa(_cpu_id)
+ // If we've already found cpu_id as a sibling, skip it.
+ if _, ok := _iterated_cpu[cpu_id]; ok {
+ continue
+ }
+ devPath := fmt.Sprintf("/sys/devices/system/cpu/cpu%s/topology/thread_siblings_list", cpu_id)
+ dat, err := os.ReadFile(devPath)
+ if err != nil {
+ return dev_lst, fmt.Errorf("Can't read cpu[%s] thread_siblings_list", cpu_id)
+ }
+ cpustring := strings.TrimSuffix(string(dat), "\n")
+ cpu_pair_set, err := cpuset.Parse(cpustring)
+ if err != nil {
+ return dev_lst, fmt.Errorf("Unable to parse thread_siblings_list[%s] string to cpuset", cpustring)
+ }
+ var cpu_pair_lst []string
+ for _, v := range cpu_pair_set.List() {
+ cpu_pair_lst = append(cpu_pair_lst, strconv.Itoa(v))
+ }
+ sibling_cpu_id := get_sibling(cpu_id, cpu_pair_lst)
+ if _, ok := devices[sibling_cpu_id]; ok {
+ sibling_lst = append(sibling_lst, cpu_id, sibling_cpu_id)
+ _iterated_cpu[sibling_cpu_id] = ""
+ } else {
+ single_lst = append(single_lst, cpu_id)
+ }
+ _iterated_cpu[cpu_id] = ""
+ }
+ if needed%2 == 0 {
+ dev_lst = append(sibling_lst, single_lst...)
+ } else {
+ if len(single_lst) > 1 {
+ _tmp_list := append(sibling_lst, single_lst[1:]...)
+ dev_lst = append(single_lst[0:1], _tmp_list...)
+ } else {
+ if len(single_lst) == 0 {
+ dev_lst = sibling_lst
+ } else {
+ dev_lst = append(single_lst, sibling_lst...)
+ }
+ }
+ }
+ //This algorithm will get some attention. Show minimal details.
+ klog.Infof(
+ "order_devices_by_sibling: needed=%d, smtpairs=%v, singletons=%v, order=%v",
+ needed,
+ sibling_lst,
+ single_lst,
+ dev_lst,
+ )
+ return dev_lst, nil
+}
+func smt_enabled() bool {
+ dat, _ := os.ReadFile("/sys/devices/system/cpu/smt/active")
+ state := strings.TrimSuffix(string(dat), "\n")
+ if state == "0" {
+ return false
+ }
+ return true
+}
+
// Returns list of device Ids we need to allocate with Allocate rpc call.
// Returns empty list in case we don't need to issue the Allocate rpc call.
func (m *ManagerImpl) devicesToAllocate(podUID, contName, resource string, required int, reusableDevices sets.Set[string]) (sets.Set[string], error) {
@@ -646,7 +734,16 @@ func (m *ManagerImpl) devicesToAllocate(podUID, contName, resource string, requi
if m.allocatedDevices[resource] == nil {
m.allocatedDevices[resource] = sets.New[string]()
}
- for device := range devices.Difference(allocated) {
+ availableDevices := sets.List[string](devices.Difference(allocated))
+ // If we're dealing with isolcpus and SMT is enabled, reorder to group SMT siblings together.
+ if resource == "windriver.com/isolcpus" && len(devices) > 0 && smt_enabled() {
+ var err error
+ availableDevices, err = order_devices_by_sibling(devices.Difference(allocated), needed)
+ if err != nil {
+ klog.Errorf("error in order_devices_by_sibling: %v", err)
+ }
+ }
+ for _, device := range availableDevices {
m.allocatedDevices[resource].Insert(device)
allocated.Insert(device)
needed--
--
2.34.1

View File

@ -0,0 +1,48 @@
From c5d43fd96dca833475af61d3027594310ea30674 Mon Sep 17 00:00:00 2001
From: Saba Touheed Mujawar <sabatouheed.mujawar@windriver.com>
Date: Wed, 18 Jun 2025 09:26:33 -0400
Subject: [PATCH] kubelet reduce logspam calculating sandbox resources
On v1.33 nodes, the kubelet logs contain many lines like:
  kuberuntime_sandbox_linux.go:62] "Enforcing CFS quota"
  pod="kube-system/pod_name" unlimited=false
This change lowers the verbosity of the CFS quota enforcement
log from V(2) to V(5).
Please see: https://github.com/kubernetes/kubernetes/pull/132168/files
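For context, a klog.V(n) message is emitted only when the process runs with
verbosity >= n, so after this change the line appears only at -v=5 or higher.
A minimal illustration, assuming k8s.io/klog/v2 (this is not kubelet code,
just a sketch of the verbosity gating):

package main

import (
	"flag"

	"k8s.io/klog/v2"
)

func main() {
	klog.InitFlags(nil)
	_ = flag.Set("v", "2") // typical kubelet verbosity
	flag.Parse()

	klog.V(2).InfoS("Enforcing CFS quota", "unlimited", false) // printed at -v=2
	klog.V(5).InfoS("Enforcing CFS quota", "unlimited", false) // suppressed until -v=5
	klog.Flush()
}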
Signed-off-by: Saba Touheed Mujawar <sabatouheed.mujawar@windriver.com>
---
pkg/kubelet/kuberuntime/kuberuntime_container_linux.go | 2 +-
pkg/kubelet/kuberuntime/kuberuntime_sandbox_linux.go | 2 +-
2 files changed, 2 insertions(+), 2 deletions(-)
diff --git a/pkg/kubelet/kuberuntime/kuberuntime_container_linux.go b/pkg/kubelet/kuberuntime/kuberuntime_container_linux.go
index 92321bd9bd4..c449a3df35d 100644
--- a/pkg/kubelet/kuberuntime/kuberuntime_container_linux.go
+++ b/pkg/kubelet/kuberuntime/kuberuntime_container_linux.go
@@ -137,7 +137,7 @@ func (m *kubeGenericRuntimeManager) generateLinuxContainerResources(pod *v1.Pod,
// If pod has exclusive cpu and the container in question has integer cpu requests
// the cfs quota will not be enforced
disableCPUQuota := utilfeature.DefaultFeatureGate.Enabled(kubefeatures.DisableCPUQuotaWithExclusiveCPUs) && m.containerManager.ContainerHasExclusiveCPUs(pod, container)
- klog.V(2).InfoS("Enforcing CFS quota", "pod", klog.KObj(pod), "unlimited", disableCPUQuota)
+ klog.V(5).InfoS("Enforcing CFS quota", "pod", klog.KObj(pod), "unlimited", disableCPUQuota)
lcr := m.calculateLinuxResources(cpuRequest, cpuLimit, memoryLimit, disableCPUQuota)
lcr.OomScoreAdj = int64(qos.GetContainerOOMScoreAdjust(pod, container,
diff --git a/pkg/kubelet/kuberuntime/kuberuntime_sandbox_linux.go b/pkg/kubelet/kuberuntime/kuberuntime_sandbox_linux.go
index ebf8d4e6204..851386f0fd3 100644
--- a/pkg/kubelet/kuberuntime/kuberuntime_sandbox_linux.go
+++ b/pkg/kubelet/kuberuntime/kuberuntime_sandbox_linux.go
@@ -59,7 +59,7 @@ func (m *kubeGenericRuntimeManager) calculateSandboxResources(pod *v1.Pod) *runt
// If pod has exclusive cpu the sandbox will not have cfs quote enforced
disableCPUQuota := utilfeature.DefaultFeatureGate.Enabled(features.DisableCPUQuotaWithExclusiveCPUs) && m.containerManager.PodHasExclusiveCPUs(pod)
- klog.V(2).InfoS("Enforcing CFS quota", "pod", klog.KObj(pod), "unlimited", disableCPUQuota)
+ klog.V(5).InfoS("Enforcing CFS quota", "pod", klog.KObj(pod), "unlimited", disableCPUQuota)
return m.calculateLinuxResources(cpuRequest, lim.Cpu(), lim.Memory(), disableCPUQuota)
}
--
2.34.1

View File

@ -0,0 +1,8 @@
kubeadm-platform-pods-zero-cpu-resources-readiness-p.patch
kubelet-isolcpus-SMT-aware-sorted-allocation.patch
kubelet-cpumanager-keep-normal-containers-off-reserv.patch
kubelet-cpumanager-platform-pods-on-reserved-cpus.patch
kubelet-cpumanager-introduce-concept-of-isolated-CPU.patch
kubeadm-reduce-UpgradeManifestTimeout.patch
Revert-kubeadm-use-new-etcd-livez-and-readyz-endpoint.patch
kubelet-reduce-logspam-calculating-sandbox-resources.patch