Add runfn implementation with timeout support

Allows to limit execution time of KRM function.

Change-Id: I7fd8d97492512c6c5033375429906a1268f3818e
Signed-off-by: Ruslan Aliev <raliev@mirantis.com>
Closes: #544
Closes: #545
Closes: #597
This commit is contained in:
Ruslan Aliev 2021-11-17 07:46:57 +00:00
parent 1aab57af13
commit d0e2f6dbbf
8 changed files with 1873 additions and 1 deletions

View File

@ -149,6 +149,12 @@ spec:
parameter is specified. Else it will write output to STDOUT. This
path relative to current site root.
type: string
timeout:
description: Timeout is the maximum amount of time (in seconds) for
container execution if not specified (0) no timeout will be set
and container could run indefinitely
format: int64
type: integer
type:
description: Supported types are "airship" and "krm"
type: string

View File

@ -95,6 +95,10 @@ type GenericContainerSpec struct {
// Mounts are the storage or directories to mount into the container
StorageMounts []StorageMount `json:"mounts,omitempty" yaml:"mounts,omitempty"`
// Timeout is the maximum amount of time (in seconds) for container execution
// if not specified (0) no timeout will be set and container could run indefinitely
Timeout uint64 `json:"timeout,omitempty"`
}
// AirshipContainerSpec airship container settings

View File

@ -28,11 +28,11 @@ import (
"github.com/ahmetb/dlog"
"sigs.k8s.io/kustomize/kyaml/fn/runtime/runtimeutil"
"sigs.k8s.io/kustomize/kyaml/kio"
"sigs.k8s.io/kustomize/kyaml/runfn"
kyaml "sigs.k8s.io/kustomize/kyaml/yaml"
"sigs.k8s.io/yaml"
"opendev.org/airship/airshipctl/pkg/api/v1alpha1"
"opendev.org/airship/airshipctl/pkg/container/runfn"
"opendev.org/airship/airshipctl/pkg/log"
"opendev.org/airship/airshipctl/pkg/util"
)
@ -228,6 +228,7 @@ func (c *V1Alpha1) runKRM() error {
Output: c.output,
StorageMounts: mounts,
ContinueOnEmptyResult: true,
Timeout: c.conf.Spec.Timeout,
}
function, err := kyaml.Parse(c.conf.Config)
if err != nil {

View File

@ -0,0 +1,532 @@
/*
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package runfn
import (
"fmt"
"io"
"os"
"os/user"
"path"
"path/filepath"
"sort"
"strconv"
"strings"
"sync/atomic"
"sigs.k8s.io/kustomize/kyaml/errors"
"sigs.k8s.io/kustomize/kyaml/fn/runtime/container"
"sigs.k8s.io/kustomize/kyaml/fn/runtime/runtimeutil"
"sigs.k8s.io/kustomize/kyaml/kio"
"sigs.k8s.io/kustomize/kyaml/kio/kioutil"
"sigs.k8s.io/kustomize/kyaml/yaml"
)
// RunFns runs the set of configuration functions in a local directory against
// the Resources in that directory
type RunFns struct {
StorageMounts []runtimeutil.StorageMount
// Path is the path to the directory containing functions
Path string
// FunctionPaths Paths allows functions to be specified outside the configuration
// directory.
// Functions provided on FunctionPaths are globally scoped.
// If FunctionPaths length is > 0, then NoFunctionsFromInput defaults to true
FunctionPaths []string
// Functions is an explicit list of functions to run against the input.
// Functions provided on Functions are globally scoped.
// If Functions length is > 0, then NoFunctionsFromInput defaults to true
Functions []*yaml.RNode
// GlobalScope if true, functions read from input will be scoped globally rather
// than only to Resources under their subdirs.
GlobalScope bool
// Input can be set to read the Resources from Input rather than from a directory
Input io.Reader
// Network enables network access for functions that declare it
Network bool
// Output can be set to write the result to Output rather than back to the directory
Output io.Writer
// NoFunctionsFromInput if set to true will not read any functions from the input,
// and only use explicit sources
NoFunctionsFromInput *bool
// ResultsDir is where to write each functions results
ResultsDir string
// LogSteps enables logging the function that is running.
LogSteps bool
// LogWriter can be set to write the logs to LogWriter rather than stderr if LogSteps is enabled.
LogWriter io.Writer
// resultsCount is used to generate the results filename for each container
resultsCount uint32
// functionFilterProvider provides a filter to perform the function.
// this is a variable so it can be mocked in tests
functionFilterProvider func(
filter runtimeutil.FunctionSpec, api *yaml.RNode, currentUser currentUserFunc) (kio.Filter, error)
// noCmdSet is used in unit test to bypass setting cmd and args for docker, false by default
noCmdSet bool
// AsCurrentUser is a boolean to indicate whether docker container should use
// the uid and gid that run the command
AsCurrentUser bool
// Env contains environment variables that will be exported to container
Env []string
// ContinueOnEmptyResult configures what happens when the underlying pipeline
// returns an empty result.
// If it is false (default), subsequent functions will be skipped and the
// result will be returned immediately.
// If it is true, the empty result will be provided as input to the next
// function in the list.
ContinueOnEmptyResult bool
// Timeout is the maximum amount of time (in seconds) for KRM function execution
Timeout uint64
}
// Execute runs the command
func (r RunFns) Execute() error {
// make the path absolute so it works on mac
var err error
r.Path, err = filepath.Abs(r.Path)
if err != nil {
return errors.Wrap(err)
}
// default the containerFilterProvider if it hasn't been override. Split out for testing.
(&r).init()
nodes, fltrs, output, err := r.getNodesAndFilters()
if err != nil {
return err
}
return r.runFunctions(nodes, output, fltrs)
}
func (r RunFns) getNodesAndFilters() (
*kio.PackageBuffer, []kio.Filter, *kio.LocalPackageReadWriter, error) {
// Read Resources from Directory or Input
buff := &kio.PackageBuffer{}
p := kio.Pipeline{Outputs: []kio.Writer{buff}}
// save the output dir because we will need it to write back
// the same one for reading must be used for writing if deleting Resources
var outputPkg *kio.LocalPackageReadWriter
if r.Path != "" {
outputPkg = &kio.LocalPackageReadWriter{PackagePath: r.Path, MatchFilesGlob: kio.MatchAll}
}
if r.Input == nil {
p.Inputs = []kio.Reader{outputPkg}
} else {
p.Inputs = []kio.Reader{&kio.ByteReader{Reader: r.Input}}
}
if err := p.Execute(); err != nil {
return nil, nil, outputPkg, err
}
fltrs, err := r.getFilters(buff.Nodes)
if err != nil {
return nil, nil, outputPkg, err
}
return buff, fltrs, outputPkg, nil
}
func (r RunFns) getFilters(nodes []*yaml.RNode) ([]kio.Filter, error) {
var fltrs []kio.Filter
// fns from annotations on the input resources
f, err := r.getFunctionsFromInput(nodes)
if err != nil {
return nil, err
}
fltrs = append(fltrs, f...)
// fns from directories specified on the struct
f, err = r.getFunctionsFromFunctionPaths()
if err != nil {
return nil, err
}
fltrs = append(fltrs, f...)
// explicit fns specified on the struct
f, err = r.getFunctionsFromFunctions()
if err != nil {
return nil, err
}
fltrs = append(fltrs, f...)
return fltrs, nil
}
// runFunctions runs the fltrs against the input and writes to either r.Output or output
func (r RunFns) runFunctions(
input kio.Reader, output kio.Writer, fltrs []kio.Filter) error {
// use the previously read Resources as input
var outputs []kio.Writer
if r.Output == nil {
// write back to the package
outputs = append(outputs, output)
} else {
// write to the output instead of the directory if r.Output is specified or
// the output is nil (reading from Input)
outputs = append(outputs, kio.ByteWriter{Writer: r.Output})
}
var err error
pipeline := kio.Pipeline{
Inputs: []kio.Reader{input},
Filters: fltrs,
Outputs: outputs,
ContinueOnEmptyResult: r.ContinueOnEmptyResult,
}
if r.LogSteps {
err = pipeline.ExecuteWithCallback(func(op kio.Filter) {
var identifier string
switch filter := op.(type) {
case *container.Filter:
identifier = filter.Image
default:
identifier = "unknown-type function"
}
_, _ = fmt.Fprintf(r.LogWriter, "Running %s\n", identifier)
})
} else {
err = pipeline.Execute()
}
if err != nil {
return err
}
// check for deferred function errors
var errs []string
for i := range fltrs {
cf, ok := fltrs[i].(runtimeutil.DeferFailureFunction)
if !ok {
continue
}
if cf.GetExit() != nil {
errs = append(errs, cf.GetExit().Error())
}
}
if len(errs) > 0 {
return fmt.Errorf(strings.Join(errs, "\n---\n"))
}
return nil
}
// getFunctionsFromInput scans the input for functions and runs them
func (r RunFns) getFunctionsFromInput(nodes []*yaml.RNode) ([]kio.Filter, error) {
if *r.NoFunctionsFromInput {
return nil, nil
}
buff := &kio.PackageBuffer{}
err := kio.Pipeline{
Inputs: []kio.Reader{&kio.PackageBuffer{Nodes: nodes}},
Filters: []kio.Filter{&runtimeutil.IsReconcilerFilter{}},
Outputs: []kio.Writer{buff},
}.Execute()
if err != nil {
return nil, err
}
err = sortFns(buff)
if err != nil {
return nil, err
}
return r.getFunctionFilters(false, buff.Nodes...)
}
// getFunctionsFromFunctionPaths returns the set of functions read from r.FunctionPaths
// as a slice of Filters
func (r RunFns) getFunctionsFromFunctionPaths() ([]kio.Filter, error) {
buff := &kio.PackageBuffer{}
for i := range r.FunctionPaths {
err := kio.Pipeline{
Inputs: []kio.Reader{
kio.LocalPackageReader{PackagePath: r.FunctionPaths[i]},
},
Outputs: []kio.Writer{buff},
}.Execute()
if err != nil {
return nil, err
}
}
return r.getFunctionFilters(true, buff.Nodes...)
}
// getFunctionsFromFunctions returns the set of explicitly provided functions as
// Filters
func (r RunFns) getFunctionsFromFunctions() ([]kio.Filter, error) {
return r.getFunctionFilters(true, r.Functions...)
}
// mergeContainerEnv will merge the envs specified by command line (imperative) and config
// file (declarative). If they have same key, the imperative value will be respected.
func (r RunFns) mergeContainerEnv(envs []string) []string {
imperative := runtimeutil.NewContainerEnvFromStringSlice(r.Env)
declarative := runtimeutil.NewContainerEnvFromStringSlice(envs)
for key, value := range imperative.EnvVars {
declarative.AddKeyValue(key, value)
}
for _, key := range imperative.VarsToExport {
declarative.AddKey(key)
}
return declarative.Raw()
}
func (r RunFns) getFunctionFilters(global bool, fns ...*yaml.RNode) (
[]kio.Filter, error) {
fltrs := make([]kio.Filter, 0)
for i := range fns {
api := fns[i]
spec := runtimeutil.GetFunctionSpec(api)
if spec == nil {
// resource doesn't have function spec
continue
}
if spec.Container.Network && !r.Network {
// TODO(eddiezane): Provide error info about which function needs the network
return fltrs, errors.Errorf("network required but not enabled with --network")
}
// merge envs from imperative and declarative
spec.Container.Env = r.mergeContainerEnv(spec.Container.Env)
c, err := r.functionFilterProvider(*spec, api, user.Current)
if err != nil {
return nil, err
}
if c == nil {
continue
}
cf, ok := c.(*container.Filter)
if global && ok {
cf.Exec.GlobalScope = true
}
fltrs = append(fltrs, c)
}
return fltrs, nil
}
// sortFns sorts functions so that functions with the longest paths come first
func sortFns(buff *kio.PackageBuffer) error {
var outerErr error
// sort the nodes so that we traverse them depth first
// functions deeper in the file system tree should be run first
sort.Slice(buff.Nodes, func(i, j int) bool {
mi, err := buff.Nodes[i].GetMeta()
if err != nil {
outerErr = err
return false
}
pi := filepath.ToSlash(mi.Annotations[kioutil.PathAnnotation])
mj, err := buff.Nodes[j].GetMeta()
if err != nil {
outerErr = err
return false
}
pj := filepath.ToSlash(mj.Annotations[kioutil.PathAnnotation])
// If the path is the same, we decide the ordering based on the
// index annotation.
if pi == pj {
iIndex, err := strconv.Atoi(mi.Annotations[kioutil.IndexAnnotation])
if err != nil {
outerErr = err
return false
}
jIndex, err := strconv.Atoi(mj.Annotations[kioutil.IndexAnnotation])
if err != nil {
outerErr = err
return false
}
return iIndex < jIndex
}
if filepath.Base(path.Dir(pi)) == "functions" {
// don't count the functions dir, the functions are scoped 1 level above
pi = filepath.Dir(path.Dir(pi))
} else {
pi = filepath.Dir(pi)
}
if filepath.Base(path.Dir(pj)) == "functions" {
// don't count the functions dir, the functions are scoped 1 level above
pj = filepath.Dir(path.Dir(pj))
} else {
pj = filepath.Dir(pj)
}
// i is "less" than j (comes earlier) if its depth is greater -- e.g. run
// i before j if it is deeper in the directory structure
li := len(strings.Split(pi, "/"))
if pi == "." {
// local dir should have 0 path elements instead of 1
li = 0
}
lj := len(strings.Split(pj, "/"))
if pj == "." {
// local dir should have 0 path elements instead of 1
lj = 0
}
if li != lj {
// use greater-than because we want to sort with the longest
// paths FIRST rather than last
return li > lj
}
// sort by path names if depths are equal
return pi < pj
})
return outerErr
}
// init initializes the RunFns with a containerFilterProvider.
func (r *RunFns) init() {
if r.NoFunctionsFromInput == nil {
// default no functions from input if any function sources are explicitly provided
nfn := len(r.FunctionPaths) > 0 || len(r.Functions) > 0
r.NoFunctionsFromInput = &nfn
}
// if no path is specified, default reading from stdin and writing to stdout
if r.Path == "" {
if r.Output == nil {
r.Output = os.Stdout
}
if r.Input == nil {
r.Input = os.Stdin
}
}
// functionFilterProvider set the filter provider
if r.functionFilterProvider == nil {
r.functionFilterProvider = r.ffp
}
// if LogSteps is enabled and LogWriter is not specified, use stderr
if r.LogSteps && r.LogWriter == nil {
r.LogWriter = os.Stderr
}
}
type currentUserFunc func() (*user.User, error)
// getUIDGID will return "nobody" if asCurrentUser is false. Otherwise
// return "uid:gid" according to the return from currentUser function.
func getUIDGID(asCurrentUser bool, currentUser currentUserFunc) (string, error) {
if !asCurrentUser {
return "nobody", nil
}
u, err := currentUser()
if err != nil {
return "", err
}
return fmt.Sprintf("%s:%s", u.Uid, u.Gid), nil
}
// ffp provides function filters
func (r *RunFns) ffp(spec runtimeutil.FunctionSpec, api *yaml.RNode, currentUser currentUserFunc) (kio.Filter, error) {
var resultsFile string
if r.ResultsDir != "" {
resultsFile = filepath.Join(r.ResultsDir, fmt.Sprintf(
"results-%v.yaml", r.resultsCount))
atomic.AddUint32(&r.resultsCount, 1)
}
if spec.Container.Image != "" {
// TODO: Add a test for this behavior
uidgid, err := getUIDGID(r.AsCurrentUser, currentUser)
if err != nil {
return nil, err
}
c := container.NewContainer(
runtimeutil.ContainerSpec{
Image: spec.Container.Image,
Network: spec.Container.Network,
StorageMounts: r.StorageMounts,
Env: spec.Container.Env,
},
uidgid,
)
cf := &c
cf.Exec.FunctionConfig = api
cf.Exec.GlobalScope = r.GlobalScope
cf.Exec.ResultsFile = resultsFile
cf.Exec.DeferFailure = spec.DeferFailure
if !r.noCmdSet {
cmd, args := r.getCommand(c)
cf.Exec.Path = cmd
cf.Exec.Args = args
}
return cf, nil
}
return nil, nil
}
// getArgs returns the command + args to run to spawn the container
func (r *RunFns) getCommand(c container.Filter) (string, []string) {
network := runtimeutil.NetworkNameNone
if c.ContainerSpec.Network {
network = runtimeutil.NetworkNameHost
}
// run the container using docker. this is simpler than using the docker
// libraries, and ensures things like auth work the same as if the container
// was run from the cli.
args := []string{"run",
"--rm", // delete the container afterward
"-i", "-a", "STDIN", "-a", "STDOUT", "-a", "STDERR", // attach stdin, stdout, stderr
"--network", string(network),
// added security options
"--user", c.UIDGID,
"--security-opt=no-new-privileges", // don't allow the user to escalate privileges
// note: don't make fs readonly because things like heredoc rely on writing tmp files
}
// TODO(joncwong): Allow StorageMount fields to have default values.
for _, storageMount := range c.StorageMounts {
args = append(args, "--mount", storageMount.String())
}
args = append(args, runtimeutil.NewContainerEnvFromStringSlice(c.Env).GetDockerFlags()...)
a := append(args, c.Image)
if r.Timeout > 0 {
a = append([]string{"-v", "-s9", fmt.Sprintf("%d", r.Timeout), "docker"}, a...)
return "timeout", a
}
return "docker", a
}

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,10 @@
# Copyright 2019 The Kubernetes Authors.
# SPDX-License-Identifier: Apache-2.0
apiVersion: v1
kind: ConfigMap
metadata:
name: app-config
labels:
app.kubernetes.io/component: undefined
app.kubernetes.io/instance: undefined
data: {}

View File

@ -0,0 +1,36 @@
# Copyright 2019 The Kubernetes Authors.
# SPDX-License-Identifier: Apache-2.0
apiVersion: apps/v1
kind: Deployment
metadata:
name: app
labels:
app: java
spec:
selector:
matchLabels:
app: java
template:
metadata:
labels:
app: java
spec:
restartPolicy: Always
containers:
- name: app
image: gcr.io/project/app:version
command:
- java
- -jar
- /app.jar
ports:
- containerPort: 8080
envFrom:
- configMapRef:
name: app-config
env:
- name: JAVA_OPTS
value: -XX:+UnlockExperimentalVMOptions -XX:+UseCGroupMemoryLimitForHeap
-Djava.security.egd=file:/dev/./urandom
imagePullPolicy: Always
minReadySeconds: 5

View File

@ -0,0 +1,15 @@
# Copyright 2019 The Kubernetes Authors.
# SPDX-License-Identifier: Apache-2.0
apiVersion: v1
kind: Service
metadata:
name: app
labels:
app: java
spec:
selector:
app: java
ports:
- name: "8080"
port: 8080
targetPort: 8080