Extend poller capabilities

This patch allows to wait for certain state when applying
particular resources.

Change-Id: I064cb49c8971f1edee870bc6c3c3dd1e428c73f0
Signed-off-by: Ruslan Aliev <raliev@mirantis.com>
Closes: #624
This commit is contained in:
Ruslan Aliev 2021-06-28 15:49:12 -05:00
parent ff3d6f84a2
commit 3dfb02eb14
43 changed files with 1123 additions and 1587 deletions

4
go.mod
View File

@ -24,8 +24,8 @@ require (
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.6.1
golang.org/x/crypto v0.0.0-20210220033148-5ea612d1eb83
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4 // indirect
golang.org/x/sys v0.0.0-20210510120138-977fb7262007 // indirect
golang.org/x/net v0.0.0-20210805182204-aaa1db679c0d // indirect
golang.org/x/sys v0.0.0-20210809222454-d867a43fc93e // indirect
k8s.io/api v0.21.1
k8s.io/apiextensions-apiserver v0.21.1
k8s.io/apimachinery v0.21.1

13
go.sum
View File

@ -722,8 +722,8 @@ golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81R
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20210224082022-3d97a244fca7/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4 h1:4nGaVu0QrbjT/AK2PRLuQfQuh6DJve+pELhqTdAj3x0=
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
golang.org/x/net v0.0.0-20210805182204-aaa1db679c0d h1:20cMwl2fHAzkJMEA+8J4JgqBQcQGzbisXo31MIeenXI=
golang.org/x/net v0.0.0-20210805182204-aaa1db679c0d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
@ -786,9 +786,9 @@ golang.org/x/sys v0.0.0-20201112073958-5cba982894dd/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210225134936-a50acf3fe073/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210510120138-977fb7262007 h1:gG67DSER+11cZvqIMb8S8bt0vZtiN6xWYARwirrOSfE=
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210809222454-d867a43fc93e h1:WUoyKPm6nCo1BnNUvPGnFG3T5DUVem42yDJZZ4CNxMA=
golang.org/x/sys v0.0.0-20210809222454-d867a43fc93e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210220032956-6a3ed077a48d h1:SZxvLBoTP5yHO3Frd4z4vrF+DBX9vMVanchswa69toE=
@ -799,8 +799,9 @@ golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.4 h1:0YWbFKbhXG/wIiuHDSKpS0Iy7FSA+u45VtBMfQcFTTc=
golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=

View File

@ -4,3 +4,8 @@ metadata:
name: default
spec:
registry: "quay.io"
---
apiVersion: operator.tigera.io/v1
kind: TigeraStatus
metadata:
name: calico

View File

@ -1,7 +1,6 @@
resources:
- wait_node
- get_pods
- wait_tigera
- wait_deploy
- get_node
- wait_pods_ready

View File

@ -1,29 +0,0 @@
#!/bin/sh
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
set -xe
export TIMEOUT=${TIMEOUT:-1000}
echo "Wait $TIMEOUT seconds for tigera status to be in Available state." 1>&2
end=$(($(date +%s) + $TIMEOUT))
until [ "$(kubectl --kubeconfig $KUBECONFIG --context $KCTL_CONTEXT wait --for=condition=Available --all tigerastatus 2>/dev/null)" ]; do
now=$(date +%s)
if [ $now -gt $end ]; then
echo "Tigera status is not ready before TIMEOUT=$TIMEOUT" 1>&2
exit 1
fi
sleep 10
done

View File

@ -1,6 +0,0 @@
configMapGenerator:
- name: kubectl-wait-tigera
options:
disableNameSuffixHash: true
files:
- script=kubectl_wait_tigera.sh

View File

@ -8,6 +8,11 @@ metadata:
config:
waitOptions:
timeout: 2600
conditions:
- apiVersion: metal3.io/v1alpha1
kind: BareMetalHost
jsonPath: "{.status.provisioning.state}"
value: "provisioned"
pruneOptions:
prune: false
---
@ -21,6 +26,11 @@ config:
waitOptions:
timeout: 5000
pollInterval: 30
conditions:
- apiVersion: metal3.io/v1alpha1
kind: BareMetalHost
jsonPath: "{.status.provisioning.state}"
value: "provisioned"
pruneOptions:
prune: false
---
@ -33,6 +43,14 @@ metadata:
config:
waitOptions:
timeout: 1000
conditions:
- apiVersion: operator.tigera.io/v1
kind: Installation
jsonPath: "{.status.computed}"
- apiVersion: operator.tigera.io/v1
kind: TigeraStatus
jsonPath: "{.status.conditions[?(@.type=='Available')].status}"
value: "True"
pruneOptions:
prune: false
---

View File

@ -12,7 +12,6 @@ phases:
- name: kubectl-get-node-target
- name: kubectl-get-pods-target
- name: initinfra-networking-target
- name: kubectl-wait-tigera-target
- name: kubectl-get-pods-target
- name: clusterctl-init-target
- name: kubectl-wait-pods-any-ephemeral

View File

@ -7,4 +7,3 @@ metadata:
name: host-generation-catalogue
hosts:
m3:
- node02

View File

@ -1,7 +1,7 @@
apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization
resources:
# TODO (dukov) It's recocommended to upload BareMetalHost objects separately
# TODO (dukov) It's recommended to upload BareMetalHost objects separately
# otherwise nodes will hang in 'registering' state for quite a long time
- ../../../../type/gating/target/controlplane
- ../catalogues

View File

@ -18,11 +18,6 @@ phases:
- name: kubectl-wait-pods-any-ephemeral
# Deploy calico using tigera operator
- name: initinfra-networking-ephemeral
# Wait for Calico to be deployed using tigera
# Scripts for this phase placed in manifests/function/phase-helpers/wait_tigera/
# To get ConfigMap for this phase, execute `airshipctl phase render --source config -k ConfigMap`
# and find ConfigMap with name kubectl-wait_tigera
- name: kubectl-wait-tigera-ephemeral
# Deploy metal3.io components to ephemeral node
- name: initinfra-ephemeral
# Getting pods as debug information"
@ -66,11 +61,6 @@ phases:
- name: kubectl-get-pods-target
# Deploy calico using tigera operator
- name: initinfra-networking-target
# Wait for Calico to be deployed using tigera
# Scripts for this phase placed in manifests/function/phase-helpers/wait_tigera/
# To get ConfigMap for this phase, execute `airshipctl phase render --source config -k ConfigMap`
# and find ConfigMap with name kubectl-wait-tigera
- name: kubectl-wait-tigera-target
# Deploy infra to cluster
- name: initinfra-target
# List all pods

View File

@ -40,9 +40,18 @@ type ApplyWaitOptions struct {
Timeout int `json:"timeout,omitempty"`
// PollInterval in seconds
PollInterval int `json:"pollInterval,omitempty"`
Conditions []Condition `json:"conditions,omitempty"`
}
// ApplyPruneOptions provides instructions how to prune for kubernetes resources
type ApplyPruneOptions struct {
Prune bool `json:"prune,omitempty"`
}
// Condition is a jsonpath for particular TypeMeta which indicates what state to wait
type Condition struct {
metav1.TypeMeta `json:",inline"`
JSONPath string `json:"jsonPath,omitempty"`
// Value is desired state to wait for, if no value specified - just existence of provided jsonPath will be checked
Value string `json:"value,omitempty"`
}

View File

@ -120,7 +120,7 @@ func (in AirshipctlFunctionImageRepoMap) DeepCopy() AirshipctlFunctionImageRepoM
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *ApplyConfig) DeepCopyInto(out *ApplyConfig) {
*out = *in
out.WaitOptions = in.WaitOptions
in.WaitOptions.DeepCopyInto(&out.WaitOptions)
out.PruneOptions = in.PruneOptions
}
@ -152,6 +152,11 @@ func (in *ApplyPruneOptions) DeepCopy() *ApplyPruneOptions {
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *ApplyWaitOptions) DeepCopyInto(out *ApplyWaitOptions) {
*out = *in
if in.Conditions != nil {
in, out := &in.Conditions, &out.Conditions
*out = make([]Condition, len(*in))
copy(*out, *in)
}
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ApplyWaitOptions.
@ -531,6 +536,22 @@ func (in *ClusterctlOptions) DeepCopy() *ClusterctlOptions {
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *Condition) DeepCopyInto(out *Condition) {
*out = *in
out.TypeMeta = in.TypeMeta
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Condition.
func (in *Condition) DeepCopy() *Condition {
if in == nil {
return nil
}
out := new(Condition)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *EndPointSpec) DeepCopyInto(out *EndPointSpec) {
*out = *in
@ -1083,7 +1104,7 @@ func (in *KubernetesApply) DeepCopyInto(out *KubernetesApply) {
*out = *in
out.TypeMeta = in.TypeMeta
in.ObjectMeta.DeepCopyInto(&out.ObjectMeta)
out.Config = in.Config
in.Config.DeepCopyInto(&out.Config)
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new KubernetesApply.

View File

@ -15,45 +15,13 @@
package cluster
import (
"fmt"
"io"
"opendev.org/airship/airshipctl/pkg/config"
"opendev.org/airship/airshipctl/pkg/k8s/kubeconfig"
"opendev.org/airship/airshipctl/pkg/log"
"opendev.org/airship/airshipctl/pkg/phase"
"opendev.org/airship/airshipctl/pkg/util"
)
// StatusRunner runs internal logic of cluster status command
func StatusRunner(o StatusOptions, w io.Writer) error {
statusMap, docs, err := o.GetStatusMapDocs()
if err != nil {
return err
}
var errors []error
tw := util.NewTabWriter(w)
fmt.Fprintf(tw, "Kind\tName\tStatus\n")
for _, doc := range docs {
status, err := statusMap.GetStatusForResource(doc)
if err != nil {
errors = append(errors, err)
} else {
fmt.Fprintf(tw, "%s\t%s\t%s\n", doc.GetKind(), doc.GetName(), status)
}
}
tw.Flush()
if len(errors) > 0 {
log.Debug("The following errors occurred while requesting the status:")
for _, statusErr := range errors {
log.Debug(statusErr)
}
}
return nil
}
// GetKubeconfigCommand holds options for get kubeconfig command
type GetKubeconfigCommand struct {
ClusterNames []string

View File

@ -1,88 +0,0 @@
/*
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cluster_test
import (
"bytes"
"fmt"
"regexp"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"opendev.org/airship/airshipctl/pkg/cluster"
"opendev.org/airship/airshipctl/pkg/document"
"opendev.org/airship/airshipctl/pkg/k8s/client/fake"
testdoc "opendev.org/airship/airshipctl/testutil/document"
)
type mockStatusOptions struct{}
func getAllDocCfgs() []string {
return []string{
`apiVersion: "example.com/v1"
kind: Resource
metadata:
name: stable-resource
namespace: target-infra
`,
}
}
func testFakeDocBundle() document.Bundle {
bundle := &testdoc.MockBundle{}
docCfgs := getAllDocCfgs()
allDocs := make([]document.Document, len(docCfgs))
for i, cfg := range docCfgs {
doc, err := document.NewDocumentFromBytes([]byte(cfg))
if err != nil {
return bundle
}
allDocs[i] = doc
}
bundle.On("GetAllDocuments").Return(allDocs, nil)
return bundle
}
func (o mockStatusOptions) GetStatusMapDocs() (*cluster.StatusMap, []document.Document, error) {
fakeClient := fake.NewClient(
fake.WithCRDs(makeResourceCRD(annotationValidStatusCheck())),
fake.WithDynamicObjects(makeResource("stable-resource", "stable")))
fakeSM, err := cluster.NewStatusMap(fakeClient)
if err != nil {
return nil, nil, err
}
fakeDocBundle := testFakeDocBundle()
fakeDocs, err := fakeDocBundle.GetAllDocuments()
if err != nil {
return nil, nil, err
}
return fakeSM, fakeDocs, nil
}
func TestStatusRunner(t *testing.T) {
statusOptions := mockStatusOptions{}
b := bytes.NewBuffer(nil)
err := cluster.StatusRunner(statusOptions, b)
require.NoError(t, err)
expectedOutput := fmt.Sprintf("Kind Name Status Resource stable-resource Stable ")
space := regexp.MustCompile(`\s+`)
str := space.ReplaceAllString(b.String(), " ")
assert.Equal(t, expectedOutput, str)
}

View File

@ -1,37 +0,0 @@
/*
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cluster
import "fmt"
// ErrInvalidStatusCheck denotes that something went wrong while handling a
// status-check annotation.
type ErrInvalidStatusCheck struct {
What string
}
func (err ErrInvalidStatusCheck) Error() string {
return fmt.Sprintf("invalid status-check: %s", err.What)
}
// ErrResourceNotFound is used when a resource is requested from a StatusMap,
// but that resource can't be found
type ErrResourceNotFound struct {
Resource string
}
func (err ErrResourceNotFound) Error() string {
return fmt.Sprintf("could not find a status for resource %q", err.Resource)
}

View File

@ -1,102 +0,0 @@
/*
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cluster_test
import (
"testing"
"github.com/stretchr/testify/assert"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"opendev.org/airship/airshipctl/pkg/cluster"
)
func TestMatch(t *testing.T) {
tests := map[string]struct {
expression cluster.Expression
object *unstructured.Unstructured
expected bool
expectedErr error
}{
"healthy-object-matches-healthy": {
expression: cluster.Expression{
Condition: `@.status.health=="ok"`,
},
object: &unstructured.Unstructured{
Object: map[string]interface{}{
"apiVersion": "testversion/v1",
"kind": "TestObject",
"metadata": map[string]interface{}{
"name": "test-object",
},
"status": map[string]interface{}{
"health": "ok",
},
},
},
expected: true,
},
"unhealthy-object-matches-unhealthy": {
expression: cluster.Expression{
Condition: `@.status.health=="ok"`,
},
object: &unstructured.Unstructured{
Object: map[string]interface{}{
"apiVersion": "testversion/v1",
"kind": "TestObject",
"metadata": map[string]interface{}{
"name": "test-object",
},
"status": map[string]interface{}{
"health": "not-ok",
},
},
},
expected: false,
},
"invalid-json-path-returns-error": {
expression: cluster.Expression{
Condition: `invalid JSON Path]`,
},
object: &unstructured.Unstructured{},
expectedErr: cluster.ErrInvalidStatusCheck{
What: `unable to parse jsonpath "invalid JSON Path]": ` +
`unrecognized character in action: U+005D ']'`,
},
},
"malformed-object-returns-error": {
expression: cluster.Expression{
Condition: `@.status.health=="ok"`,
},
object: &unstructured.Unstructured{},
expectedErr: cluster.ErrInvalidStatusCheck{
What: `failed to execute condition "@.status.health==\"ok\"" ` +
`on object &{map[]}: status is not found`,
},
},
}
for testName, tt := range tests {
tt := tt
t.Run(testName, func(t *testing.T) {
result, err := tt.expression.Match(tt.object)
assert.Equal(t, tt.expectedErr, err)
if tt.expectedErr == nil {
assert.Equal(t, tt.expected, result)
}
})
}
}

View File

@ -1,336 +0,0 @@
/*
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cluster
import (
"context"
"encoding/json"
"fmt"
apiextensions "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"sigs.k8s.io/cli-utils/pkg/kstatus/polling/event"
"sigs.k8s.io/cli-utils/pkg/kstatus/status"
"sigs.k8s.io/cli-utils/pkg/object"
"opendev.org/airship/airshipctl/pkg/config"
"opendev.org/airship/airshipctl/pkg/document"
"opendev.org/airship/airshipctl/pkg/k8s/client"
)
// A Status represents a kubernetes resource's state.
type Status string
// StatusOptions provides a way to get status map within all the documents in the bundle
type StatusOptions interface {
GetStatusMapDocs() (*StatusMap, []document.Document, error)
}
type statusOptions struct {
CfgFactory config.Factory
ClientFactory client.Factory
Kubeconfig string
}
// NewStatusOptions constructs a new StatusOptions interface based on inner struct
func NewStatusOptions(cfgFactory config.Factory, clientFactory client.Factory, kubeconfig string) StatusOptions {
return &statusOptions{CfgFactory: cfgFactory, ClientFactory: clientFactory, Kubeconfig: kubeconfig}
}
// GetStatusMapDocs returns status map within all the documents in the bundle
func (o *statusOptions) GetStatusMapDocs() (*StatusMap, []document.Document, error) {
conf, err := o.CfgFactory()
if err != nil {
return nil, nil, err
}
manifest, err := conf.CurrentContextManifest()
if err != nil {
return nil, nil, err
}
docBundle, err := document.NewBundleByPath(manifest.GetTargetPath())
if err != nil {
return nil, nil, err
}
docs, err := docBundle.GetAllDocuments()
if err != nil {
return nil, nil, err
}
client, err := o.ClientFactory(conf.LoadedConfigPath(), o.Kubeconfig)
if err != nil {
return nil, nil, err
}
statusMap, err := NewStatusMap(client)
if err != nil {
return nil, nil, err
}
return statusMap, docs, nil
}
// StatusMap holds a mapping of schema.GroupVersionResource to various statuses
// a resource may be in, as well as the Expression used to check for that
// status.
type StatusMap struct {
client client.Interface
GkMapping []schema.GroupKind
mapping map[schema.GroupVersionResource]map[status.Status]Expression
restMapper *meta.DefaultRESTMapper
}
// NewStatusMap creates a cluster-wide StatusMap. It iterates over all
// CustomResourceDefinitions in the cluster that are annotated with the
// airshipit.org/status-check annotation and creates a mapping from the
// GroupVersionResource to the various statuses and their associated
// expressions.
func NewStatusMap(client client.Interface) (*StatusMap, error) {
statusMap := &StatusMap{
client: client,
mapping: make(map[schema.GroupVersionResource]map[status.Status]Expression),
restMapper: meta.NewDefaultRESTMapper([]schema.GroupVersion{}),
}
client.ApiextensionsClientSet()
crds, err := statusMap.client.ApiextensionsClientSet().
ApiextensionsV1().
CustomResourceDefinitions().
List(context.Background(), metav1.ListOptions{})
if err != nil {
return nil, err
}
for _, crd := range crds.Items {
if err = statusMap.addCRD(crd); err != nil {
return nil, err
}
}
return statusMap, nil
}
// ReadStatus returns object status
func (sm *StatusMap) ReadStatus(ctx context.Context, resource object.ObjMetadata) *event.ResourceStatus {
gk := resource.GroupKind
gvr, err := sm.restMapper.RESTMapping(gk, "v1")
if err != nil {
return handleResourceStatusError(resource, err)
}
options := metav1.GetOptions{}
object, err := sm.client.DynamicClient().Resource(gvr.Resource).
Namespace(resource.Namespace).Get(context.Background(), resource.Name, options)
if err != nil {
return handleResourceStatusError(resource, err)
}
return sm.ReadStatusForObject(ctx, object)
}
// ReadStatusForObject returns resource status for object.
// Current status will be returned only if expression matched.
func (sm *StatusMap) ReadStatusForObject(
ctx context.Context, resource *unstructured.Unstructured) *event.ResourceStatus {
identifier := object.ObjMetadata{
GroupKind: resource.GroupVersionKind().GroupKind(),
Name: resource.GetName(),
Namespace: resource.GetNamespace(),
}
gvk := resource.GroupVersionKind()
restMapping, err := sm.restMapper.RESTMapping(gvk.GroupKind(), gvk.Version)
if err != nil {
return &event.ResourceStatus{
Identifier: identifier,
Status: status.UnknownStatus,
Error: err,
}
}
gvr := restMapping.Resource
obj, err := sm.client.DynamicClient().Resource(gvr).Namespace(resource.GetNamespace()).
Get(context.Background(), resource.GetName(), metav1.GetOptions{})
if err != nil {
return &event.ResourceStatus{
Identifier: identifier,
Status: status.UnknownStatus,
Error: err,
}
}
// No need to check for existence - if there isn't a mapping for this
// resource, the following for loop won't run anyway
for currentstatus, expression := range sm.mapping[gvr] {
var matched bool
matched, err = expression.Match(obj)
if err != nil {
return &event.ResourceStatus{
Identifier: identifier,
Status: status.UnknownStatus,
Error: err,
}
}
if matched {
return &event.ResourceStatus{
Identifier: identifier,
Status: currentstatus,
Resource: resource,
Message: fmt.Sprintf("%s is %s", resource.GroupVersionKind().Kind, currentstatus.String()),
}
}
}
return &event.ResourceStatus{
Identifier: identifier,
Status: status.UnknownStatus,
Error: nil,
}
}
// GetStatusForResource iterates over all of the stored conditions for the
// resource and returns the first status whose conditions are met.
func (sm *StatusMap) GetStatusForResource(resource document.Document) (status.Status, error) {
gvk := getGVK(resource)
restMapping, err := sm.restMapper.RESTMapping(gvk.GroupKind(), gvk.Version)
if err != nil {
return "", ErrResourceNotFound{resource.GetName()}
}
gvr := restMapping.Resource
obj, err := sm.client.DynamicClient().Resource(gvr).Namespace(resource.GetNamespace()).
Get(context.Background(), resource.GetName(), metav1.GetOptions{})
if err != nil {
return "", err
}
// No need to check for existence - if there isn't a mapping for this
// resource, the following for loop won't run anyway
expressionMap := sm.mapping[gvr]
for status, expression := range expressionMap {
matched, err := expression.Match(obj)
if err != nil {
return "", err
}
if matched {
return status, nil
}
}
return status.UnknownStatus, nil
}
// addCRD adds the mappings from the CRD to its associated statuses
func (sm *StatusMap) addCRD(crd apiextensions.CustomResourceDefinition) error {
annotations := crd.GetAnnotations()
rawStatusChecks, ok := annotations["airshipit.org/status-check"]
if !ok {
// This crd doesn't have a status-check
// annotation, so we should skip it.
return nil
}
statusChecks, err := parseStatusChecks(rawStatusChecks)
if err != nil {
return err
}
gvrs := getGVRs(crd)
for _, gvr := range gvrs {
sm.GkMapping = append(sm.GkMapping, crd.GroupVersionKind().GroupKind())
gvk := gvr.GroupVersion().WithKind(crd.Spec.Names.Kind)
gvrSingular := gvr.GroupVersion().WithResource(crd.Spec.Names.Singular)
sm.mapping[gvr] = statusChecks
sm.restMapper.AddSpecific(gvk, gvr, gvrSingular, meta.RESTScopeNamespace)
}
return nil
}
// getGVRs constructs a slice of schema.GroupVersionResource for
// CustomResources defined by the CustomResourceDefinition.
func getGVRs(crd apiextensions.CustomResourceDefinition) []schema.GroupVersionResource {
gvrs := make([]schema.GroupVersionResource, 0, len(crd.Spec.Versions))
for _, version := range crd.Spec.Versions {
gvr := schema.GroupVersionResource{
Group: crd.Spec.Group,
Version: version.Name,
Resource: crd.Spec.Names.Plural,
}
gvrs = append(gvrs, gvr)
}
return gvrs
}
// getGVK constructs a schema.GroupVersionKind for a document
func getGVK(doc document.Document) schema.GroupVersionKind {
toSchemaGvk := schema.GroupVersionKind{
Group: doc.GetGroup(),
Version: doc.GetVersion(),
Kind: doc.GetKind(),
}
return toSchemaGvk
}
// parseStatusChecks takes a string containing a map of status names (e.g.
// Healthy) to the JSONPath filters associated with the statuses, and returns
// the Go object equivalent.
func parseStatusChecks(raw string) (map[status.Status]Expression, error) {
type statusCheckType struct {
Status string `json:"status"`
Condition string `json:"condition"`
}
var mappings []statusCheckType
if err := json.Unmarshal([]byte(raw), &mappings); err != nil {
return nil, ErrInvalidStatusCheck{
What: fmt.Sprintf("unable to parse jsonpath: %q: %v", raw, err.Error()),
}
}
expressionMap := make(map[status.Status]Expression)
for _, mapping := range mappings {
if mapping.Status == "" {
return nil, ErrInvalidStatusCheck{What: "missing status field"}
}
if mapping.Condition == "" {
return nil, ErrInvalidStatusCheck{What: "missing condition field"}
}
expressionMap[status.Status(mapping.Status)] = Expression{Condition: mapping.Condition}
}
return expressionMap, nil
}
// handleResourceStatusError construct the appropriate ResourceStatus
// object based on the type of error.
func handleResourceStatusError(identifier object.ObjMetadata, err error) *event.ResourceStatus {
if errors.IsNotFound(err) {
return &event.ResourceStatus{
Identifier: identifier,
Status: status.NotFoundStatus,
Message: "Resource not found",
}
}
return &event.ResourceStatus{
Identifier: identifier,
Status: status.UnknownStatus,
Error: err,
}
}

View File

@ -1,319 +0,0 @@
/*
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cluster_test
import (
"context"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"sigs.k8s.io/cli-utils/pkg/kstatus/status"
"sigs.k8s.io/cli-utils/pkg/object"
"opendev.org/airship/airshipctl/pkg/cluster"
"opendev.org/airship/airshipctl/pkg/config"
"opendev.org/airship/airshipctl/pkg/document"
"opendev.org/airship/airshipctl/pkg/k8s/client"
"opendev.org/airship/airshipctl/pkg/k8s/client/fake"
"opendev.org/airship/airshipctl/testutil"
)
func TestGetStatusMapDocs(t *testing.T) {
tests := []struct {
name string
resources []runtime.Object
CRDs []runtime.Object
}{
{
name: "get-status-map-docs-no-resources",
},
{
name: "get-status-map-docs-with-resources",
resources: []runtime.Object{
makeResource("stable-resource", "stable"),
makeResource("pending-resource", "pending"),
},
CRDs: []runtime.Object{
makeResourceCRD(annotationValidStatusCheck()),
},
},
}
for _, tt := range tests {
tt := tt
settings := clusterStatusTestSettings()
fakeClient := fake.NewClient(
fake.WithDynamicObjects(tt.resources...),
fake.WithCRDs(tt.CRDs...))
clientFactory := func(_ string, _ string) (client.Interface, error) {
return fakeClient, nil
}
statusOptions := cluster.NewStatusOptions(func() (*config.Config, error) {
return settings, nil
}, clientFactory, "")
expectedSM, err := cluster.NewStatusMap(fakeClient)
require.NoError(t, err)
docBundle, err := document.NewBundleByPath(settings.Manifests["testManifest"].TargetPath)
require.NoError(t, err)
expectedDocs, err := docBundle.GetAllDocuments()
require.NoError(t, err)
sm, docs, err := statusOptions.GetStatusMapDocs()
require.NoError(t, err)
assert.Equal(t, expectedSM, sm)
assert.Equal(t, expectedDocs, docs)
}
}
func clusterStatusTestSettings() *config.Config {
return &config.Config{
Contexts: map[string]*config.Context{
"testContext": {Manifest: "testManifest"},
},
Manifests: map[string]*config.Manifest{
"testManifest": {TargetPath: "testdata/statusmap"},
},
CurrentContext: "testContext",
}
}
func TestNewStatusMap(t *testing.T) {
tests := []struct {
name string
client *fake.Client
err error
}{
{
name: "no-failure-on-valid-status-check-annotation",
client: fake.NewClient(fake.WithCRDs(makeResourceCRD(annotationValidStatusCheck()))),
err: nil,
},
{
name: "no-failure-when-missing-status-check-annotation",
client: fake.NewClient(fake.WithCRDs(makeResourceCRD(nil))),
err: nil,
},
{
name: "missing-status",
client: fake.NewClient(fake.WithCRDs(makeResourceCRD(annotationMissingStatus()))),
err: cluster.ErrInvalidStatusCheck{What: "missing status field"},
},
{
name: "missing-condition",
client: fake.NewClient(fake.WithCRDs(makeResourceCRD(annotationMissingCondition()))),
err: cluster.ErrInvalidStatusCheck{What: "missing condition field"},
},
{
name: "malformed-status-check",
client: fake.NewClient(fake.WithCRDs(makeResourceCRD(annotationMalformedStatusCheck()))),
err: cluster.ErrInvalidStatusCheck{What: `unable to parse jsonpath: ` +
`"{invalid json": invalid character 'i' looking for beginning of object key string`},
},
}
for _, tt := range tests {
tt := tt
_, err := cluster.NewStatusMap(tt.client)
assert.Equal(t, tt.err, err)
}
}
func TestGetStatusForResource(t *testing.T) {
tests := []struct {
name string
selector document.Selector
client *fake.Client
expectedStatus status.Status
err error
}{
{
name: "stable-resource-is-stable",
selector: document.NewSelector().
ByGvk("example.com", "v1", "Resource").
ByName("stable-resource"),
client: fake.NewClient(
fake.WithCRDs(makeResourceCRD(annotationValidStatusCheck())),
fake.WithDynamicObjects(makeResource("stable-resource", "stable")),
),
expectedStatus: status.Status("Stable"),
},
{
name: "pending-resource-is-pending",
selector: document.NewSelector().
ByGvk("example.com", "v1", "Resource").
ByName("pending-resource"),
client: fake.NewClient(
fake.WithCRDs(makeResourceCRD(annotationValidStatusCheck())),
fake.WithDynamicObjects(makeResource("pending-resource", "pending")),
),
expectedStatus: status.Status("Pending"),
},
{
name: "unknown-resource-is-unknown",
selector: document.NewSelector().
ByGvk("example.com", "v1", "Resource").
ByName("unknown"),
client: fake.NewClient(
fake.WithCRDs(makeResourceCRD(annotationValidStatusCheck())),
fake.WithDynamicObjects(makeResource("unknown", "unknown")),
),
expectedStatus: status.UnknownStatus,
},
{
name: "missing-resource-returns-error",
selector: document.NewSelector().
ByGvk("example.com", "v1", "Missing").
ByName("missing-resource"),
client: fake.NewClient(),
err: cluster.ErrResourceNotFound{Resource: "missing-resource"},
},
}
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
bundle := testutil.NewTestBundle(t, "testdata/statusmap")
testStatusMap, err := cluster.NewStatusMap(tt.client)
require.NoError(t, err)
doc, err := bundle.SelectOne(tt.selector)
require.NoError(t, err)
actualStatus, err := testStatusMap.GetStatusForResource(doc)
if tt.err != nil {
assert.EqualError(t, err, tt.err.Error())
// We expected an error - no need to check anything else
return
}
require.NoError(t, err)
assert.Equal(t, tt.expectedStatus, actualStatus)
})
}
}
func TestReadStatus(t *testing.T) {
c := fake.NewClient(fake.WithCRDs(makeResourceCRD(annotationValidStatusCheck())),
fake.WithDynamicObjects(makeResource("pending-resource", "pending")))
statusMap, err := cluster.NewStatusMap(c)
require.NoError(t, err)
ctx := context.Background()
resource := object.ObjMetadata{Namespace: "target-infra",
Name: "pending-resource", GroupKind: schema.GroupKind{Group: "example.com", Kind: "Resource"}}
result := statusMap.ReadStatus(ctx, resource)
assert.Equal(t, "Pending", result.Status.String())
}
func makeResource(name, state string) *unstructured.Unstructured {
return &unstructured.Unstructured{
Object: map[string]interface{}{
"apiVersion": "example.com/v1",
"kind": "Resource",
"metadata": map[string]interface{}{
"name": name,
"namespace": "target-infra",
},
"status": map[string]interface{}{
"state": state,
},
},
}
}
func makeResourceCRD(annotations map[string]string) *apiextensionsv1.CustomResourceDefinition {
return &apiextensionsv1.CustomResourceDefinition{
TypeMeta: metav1.TypeMeta{
Kind: "CustomResourceDefinition",
APIVersion: "apiextensions.k8s.io/v1",
},
ObjectMeta: metav1.ObjectMeta{
Name: "resources.example.com",
Annotations: annotations,
},
Spec: apiextensionsv1.CustomResourceDefinitionSpec{
Group: "example.com",
Versions: []apiextensionsv1.CustomResourceDefinitionVersion{
{
Name: "v1",
Served: true,
Storage: true,
},
},
// omitting the openAPIV3Schema for brevity
Scope: "Namespaced",
Names: apiextensionsv1.CustomResourceDefinitionNames{
Kind: "Resource",
Plural: "resources",
Singular: "resource",
},
},
}
}
func annotationValidStatusCheck() map[string]string {
return map[string]string{
"airshipit.org/status-check": `
[
{
"status": "Stable",
"condition": "@.status.state==\"stable\""
},
{
"status": "Pending",
"condition": "@.status.state==\"pending\""
}
]`,
}
}
func annotationMissingStatus() map[string]string {
return map[string]string{
"airshipit.org/status-check": `
[
{
"condition": "@.status.state==\"stable\""
},
{
"condition": "@.status.state==\"pending\""
}
]`,
}
}
func annotationMissingCondition() map[string]string {
return map[string]string{
"airshipit.org/status-check": `
[
{
"status": "Stable"
},
{
"status": "Pending"
}
]`,
}
}
func annotationMalformedStatusCheck() map[string]string {
return map[string]string{"airshipit.org/status-check": "{invalid json"}
}

View File

@ -1,40 +0,0 @@
# this CRD defines a type whose status can be checked using the condition in
# the annotations
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
name: resources.example.com
annotations:
airshipit.org/status-check: |
[
{
"status": "Stable",
"condition": "@.status.state==\"stable\""
},
{
"status": "Pending",
"condition": "@.status.state==\"pending\""
}
]
spec:
group: example.com
versions:
- name: v1
served: true
storage: true
schema:
openAPIV3Schema:
type: object
properties:
status:
type: object
properties:
state:
type: string
scope: Namespaced
names:
plural: resources
singular: resource
kind: Resource
shortNames:
- rsc

View File

@ -1,8 +0,0 @@
resources:
- crd.yaml
- stable-resource.yaml
- pending-resource.yaml
- missing.yaml
- unknown.yaml
- legacy-crd.yaml
- legacy-resource.yaml

View File

@ -1,42 +0,0 @@
# this is a legacy CRD which defines a type whose status can be checked using
# the condition in the annotations
# It is included in tests to assure backward compatibility
apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
metadata:
name: legacies.example.com
annotations:
airshipit.org/status-check: |
[
{
"status": "Stable",
"condition": "@.status.state==\"stable\""
},
{
"status": "Pending",
"condition": "@.status.state==\"pending\""
}
]
spec:
group: example.com
versions:
- name: v1
served: true
storage: true
scope: Namespaced
names:
plural: legacies
singular: legacy
kind: Legacy
shortNames:
- lgc
preserveUnknownFields: false
validation:
openAPIV3Schema:
type: object
properties:
status:
type: object
properties:
state:
type: string

View File

@ -1,7 +0,0 @@
# this legacy-resource is stable because the fake version in the cluster will
# have .status.state == "stable"
apiVersion: "example.com/v1"
kind: Legacy
metadata:
name: stable-legacy
namespace: target-infra

View File

@ -1,7 +0,0 @@
# This resource doesn't have a status-check defined by its CRD (which is also
# missing for brevity). Requesting its status is an error
apiVersion: "example.com/v1"
kind: Missing
metadata:
name: missing-resource
namespace: target-infra

View File

@ -1,7 +0,0 @@
# this resource is pending because the fake version in the cluster will
# have .status.state == "pending"
apiVersion: "example.com/v1"
kind: Resource
metadata:
name: pending-resource
namespace: target-infra

View File

@ -1,7 +0,0 @@
# this resource is stable because the fake version in the cluster will have
# .status.state == "stable"
apiVersion: "example.com/v1"
kind: Resource
metadata:
name: stable-resource
namespace: target-infra

View File

@ -1,8 +0,0 @@
# this resource is in an unknown state because the fake version in the cluster
# will have .status.state == "unknown", which does not correlate to any of the
# status checks in the CRD.
apiVersion: "example.com/v1"
kind: Resource
metadata:
name: unknown
namespace: target-infra

View File

@ -25,7 +25,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
cmdutil "k8s.io/kubectl/pkg/cmd/util"
"k8s.io/kubectl/pkg/scheme"
cliapply "sigs.k8s.io/cli-utils/pkg/apply"
applyevent "sigs.k8s.io/cli-utils/pkg/apply/event"
"sigs.k8s.io/cli-utils/pkg/apply/poller"
@ -33,9 +32,9 @@ import (
"sigs.k8s.io/cli-utils/pkg/inventory"
"sigs.k8s.io/cli-utils/pkg/manifestreader"
"sigs.k8s.io/cli-utils/pkg/provider"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/yaml"
"opendev.org/airship/airshipctl/pkg/api/v1alpha1"
"opendev.org/airship/airshipctl/pkg/document"
"opendev.org/airship/airshipctl/pkg/events"
airpoller "opendev.org/airship/airshipctl/pkg/k8s/poller"
@ -55,13 +54,14 @@ type Applier struct {
Poller poller.Poller
ManifestReaderFactory utils.ManifestReaderFactory
eventChannel chan events.Event
conditions []v1alpha1.Condition
}
// ReaderFactory function that returns reader factory interface
type ReaderFactory func(validate bool, bundle document.Bundle, factory cmdutil.Factory) manifestreader.ManifestReader
// NewApplier returns instance of Applier
func NewApplier(eventCh chan events.Event, f cmdutil.Factory) *Applier {
func NewApplier(eventCh chan events.Event, f cmdutil.Factory, conditions []v1alpha1.Condition) *Applier {
cf := provider.NewProvider(f)
return &Applier{
Factory: f,
@ -70,6 +70,7 @@ func NewApplier(eventCh chan events.Event, f cmdutil.Factory) *Applier {
CliUtilsApplier: cliapply.NewApplier(cf),
},
eventChannel: eventCh,
conditions: conditions,
}
}
@ -137,17 +138,10 @@ func (a *Applier) getObjects(
}
if a.Poller == nil {
var pErr error
config, pErr := a.Factory.ToRESTConfig()
if pErr != nil {
return nil, pErr
a.Poller, err = airpoller.NewStatusPoller(a.Factory, a.conditions...)
if err != nil {
return nil, err
}
c, pErr := client.New(config, client.Options{Scheme: scheme.Scheme, Mapper: mapper})
if pErr != nil {
return nil, pErr
}
a.Poller = airpoller.NewStatusPoller(c, mapper)
}
if err = a.Driver.Initialize(a.Poller); err != nil {
@ -198,6 +192,7 @@ func cliApplyOptions(ao ApplyOptions) cliapply.Options {
ReconcileTimeout: ao.WaitTimeout,
NoPrune: !ao.Prune,
DryRunStrategy: ao.DryRunStrategy,
PollInterval: ao.PollInterval,
}
}

View File

@ -112,7 +112,7 @@ func TestApplierRun(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
// create default applier
eventChan := make(chan events.Event)
a := applier.NewApplier(eventChan, f)
a := applier.NewApplier(eventChan, f, nil)
opts := applier.ApplyOptions{
WaitTimeout: time.Second * 5,
BundleName: "test-bundle",

View File

@ -1,135 +0,0 @@
/*
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package client
import (
"path/filepath"
apix "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
"opendev.org/airship/airshipctl/pkg/k8s/kubectl"
k8sutils "opendev.org/airship/airshipctl/pkg/k8s/utils"
)
// Interface provides an abstraction layer to interactions with kubernetes
// clusters by providing the following:
// * A ClientSet which includes all kubernetes core objects with standard operations
// * A DynamicClient which provides interactions with loosely typed kubernetes resources
// * An ApiextensionsClientSet which provides interactions with CustomResourceDefinitions
// * A Kubectl interface that is built on top of kubectl libraries and
// implements such kubectl subcommands as kubectl apply (more will be added)
type Interface interface {
ClientSet() kubernetes.Interface
DynamicClient() dynamic.Interface
ApiextensionsClientSet() apix.Interface
Kubectl() kubectl.Interface
}
// Client is an implementation of Interface
type Client struct {
clientSet kubernetes.Interface
dynamicClient dynamic.Interface
apixClient apix.Interface
kubectl kubectl.Interface
}
// Client implements Interface
var _ Interface = &Client{}
// Factory is a function which creates Interfaces
type Factory func(airshipConfigPath string, kubeconfig string) (Interface, error)
// DefaultClient is a factory which generates a default client
var DefaultClient Factory = NewClient
// NewClient creates a Client initialized from the passed in settings
func NewClient(airshipConfigPath string, kubeconfig string) (Interface, error) {
client := new(Client)
var err error
// TODO add support for kubeconfig context, for now use current context
f := k8sutils.FactoryFromKubeConfig(kubeconfig, "")
pathToBufferDir := filepath.Dir(airshipConfigPath)
client.kubectl = kubectl.NewKubectl(f).WithBufferDir(pathToBufferDir)
client.clientSet, err = f.KubernetesClientSet()
if err != nil {
return nil, err
}
client.dynamicClient, err = f.DynamicClient()
if err != nil {
return nil, err
}
// kubectl factories can't create CRD clients...
kubeConfig, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
if err != nil {
return nil, err
}
client.apixClient, err = apix.NewForConfig(kubeConfig)
if err != nil {
return nil, err
}
return client, nil
}
// ClientSet returns the ClientSet interface
func (c *Client) ClientSet() kubernetes.Interface {
return c.clientSet
}
// SetClientSet sets the ClientSet interface
func (c *Client) SetClientSet(clientSet kubernetes.Interface) {
c.clientSet = clientSet
}
// DynamicClient returns the DynamicClient interface
func (c *Client) DynamicClient() dynamic.Interface {
return c.dynamicClient
}
// SetDynamicClient sets the DynamicClient interface
func (c *Client) SetDynamicClient(dynamicClient dynamic.Interface) {
c.dynamicClient = dynamicClient
}
// ApiextensionsClientSet returns the Apiextensions interface
func (c *Client) ApiextensionsClientSet() apix.Interface {
return c.apixClient
}
// SetApiextensionsClientSet sets the ApiextensionsClientSet interface
func (c *Client) SetApiextensionsClientSet(apixClient apix.Interface) {
c.apixClient = apixClient
}
// Kubectl returns the Kubectl interface
func (c *Client) Kubectl() kubectl.Interface {
return c.kubectl
}
// SetKubectl sets the Kubectl interface
func (c *Client) SetKubectl(kctl kubectl.Interface) {
c.kubectl = kctl
}

View File

@ -1,52 +0,0 @@
/*
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package client_test
import (
"path/filepath"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"opendev.org/airship/airshipctl/pkg/k8s/client"
"opendev.org/airship/airshipctl/testutil"
)
const (
kubeconfigPath = "testdata/kubeconfig.yaml"
airshipConfigDir = "testdata"
)
func TestNewClient(t *testing.T) {
conf, cleanup := testutil.InitConfig(t)
defer cleanup(t)
akp, err := filepath.Abs(kubeconfigPath)
require.NoError(t, err)
adir, err := filepath.Abs(airshipConfigDir)
require.NoError(t, err)
conf.SetLoadedConfigPath(adir)
client, err := client.NewClient(conf.LoadedConfigPath(), akp)
assert.NoError(t, err)
assert.NotNil(t, client)
assert.NotNil(t, client.ClientSet())
assert.NotNil(t, client.DynamicClient())
assert.NotNil(t, client.ApiextensionsClientSet())
assert.NotNil(t, client.Kubectl())
}

View File

@ -1,146 +0,0 @@
/*
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package fake
import (
apix "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
apixFake "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/fake"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/dynamic"
dynamicFake "k8s.io/client-go/dynamic/fake"
"k8s.io/client-go/kubernetes"
kubernetesFake "k8s.io/client-go/kubernetes/fake"
"opendev.org/airship/airshipctl/pkg/k8s/client"
"opendev.org/airship/airshipctl/pkg/k8s/kubectl"
"opendev.org/airship/airshipctl/testutil/k8sutils"
)
// Client is an implementation of client.Interface meant for testing purposes.
type Client struct {
mockClientSet func() kubernetes.Interface
mockDynamicClient func() dynamic.Interface
mockApiextensionsClientSet func() apix.Interface
mockKubectl func() kubectl.Interface
}
var _ client.Interface = &Client{}
// ClientSet is used to get a mocked implementation of a kubernetes clientset.
// To initialize the mocked clientset to be returned, use the WithTypedObjects
// ResourceAccumulator
func (c *Client) ClientSet() kubernetes.Interface {
return c.mockClientSet()
}
// DynamicClient is used to get a mocked implementation of a dynamic client.
// To initialize the mocked client to be returned, use the WithDynamicObjects
// ResourceAccumulator.
func (c *Client) DynamicClient() dynamic.Interface {
return c.mockDynamicClient()
}
// ApiextensionsClientSet is used to get a mocked implementation of an
// Apiextensions clientset. To initialize the mocked client to be returned,
// use the WithCRDs ResourceAccumulator
func (c *Client) ApiextensionsClientSet() apix.Interface {
return c.mockApiextensionsClientSet()
}
// Kubectl is used to get a mocked implementation of a Kubectl client.
// To initialize the mocked client to be returned, use the WithKubectl ResourceAccumulator
func (c *Client) Kubectl() kubectl.Interface {
return c.mockKubectl()
}
// A ResourceAccumulator is an option meant to be passed to NewClient.
// ResourceAccumulators can be mixed and matched to create a collection of
// mocked clients, each having their own fake objects.
type ResourceAccumulator func(*Client)
// NewClient creates an instance of a Client. If no arguments are passed, the
// returned Client will have fresh mocked kubernetes clients which will have no
// prior knowledge of any resources.
//
// If prior knowledge of resources is desirable, NewClient should receive an
// appropriate ResourceAccumulator initialized with the desired resources.
func NewClient(resourceAccumulators ...ResourceAccumulator) *Client {
fakeClient := new(Client)
for _, accumulator := range resourceAccumulators {
accumulator(fakeClient)
}
if fakeClient.mockClientSet == nil {
fakeClient.mockClientSet = func() kubernetes.Interface {
return kubernetesFake.NewSimpleClientset()
}
}
if fakeClient.mockDynamicClient == nil {
fakeClient.mockDynamicClient = func() dynamic.Interface {
return dynamicFake.NewSimpleDynamicClient(runtime.NewScheme())
}
}
if fakeClient.mockApiextensionsClientSet == nil {
fakeClient.mockApiextensionsClientSet = func() apix.Interface {
return apixFake.NewSimpleClientset()
}
}
if fakeClient.mockKubectl == nil {
fakeClient.mockKubectl = func() kubectl.Interface {
return kubectl.NewKubectl(k8sutils.NewMockKubectlFactory())
}
}
return fakeClient
}
// WithTypedObjects returns a ResourceAccumulator with resources which would
// normally be accessible through a kubernetes ClientSet (e.g. Pods,
// Deployments, etc...).
func WithTypedObjects(objs ...runtime.Object) ResourceAccumulator {
return func(c *Client) {
c.mockClientSet = func() kubernetes.Interface {
return kubernetesFake.NewSimpleClientset(objs...)
}
}
}
// WithCRDs returns a ResourceAccumulator with resources which would
// normally be accessible through a kubernetes ApiextensionsClientSet (e.g. CRDs).
func WithCRDs(objs ...runtime.Object) ResourceAccumulator {
return func(c *Client) {
c.mockApiextensionsClientSet = func() apix.Interface {
return apixFake.NewSimpleClientset(objs...)
}
}
}
// WithDynamicObjects returns a ResourceAccumulator with resources which would
// normally be accessible through a kubernetes DynamicClient (e.g. unstructured.Unstructured).
func WithDynamicObjects(objs ...runtime.Object) ResourceAccumulator {
return func(c *Client) {
c.mockDynamicClient = func() dynamic.Interface {
return dynamicFake.NewSimpleDynamicClient(runtime.NewScheme(), objs...)
}
}
}
// WithKubectl returns a ResourceAccumulator with an instance of a kubectl.Interface.
func WithKubectl(kubectlInstance *kubectl.Kubectl) ResourceAccumulator {
return func(c *Client) {
c.mockKubectl = func() kubectl.Interface {
return kubectlInstance
}
}
}

View File

@ -1,19 +0,0 @@
apiVersion: v1
clusters:
- cluster:
certificate-authority-data: LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUN5RENDQWJDZ0F3SUJBZ0lCQURBTkJna3Foa2lHOXcwQkFRc0ZBREFWTVJNd0VRWURWUVFERXdwcmRXSmwKY201bGRHVnpNQjRYRFRFNU1Ea3lPVEUzTURNd09Wb1hEVEk1TURreU5qRTNNRE13T1Zvd0ZURVRNQkVHQTFVRQpBeE1LYTNWaVpYSnVaWFJsY3pDQ0FTSXdEUVlKS29aSWh2Y05BUUVCQlFBRGdnRVBBRENDQVFvQ2dnRUJBTUZyCkdxM0kyb2dZci81Y01Udy9Na1pORTNWQURzdEdyU240WjU2TDhPUGhMcUhDN2t1dno2dVpES3dCSGtGeTBNK2MKRXIzd2piUGE1aTV5NmkyMGtxSHBVMjdPZTA0dzBXV2s4N0RSZVlWaGNoZVJHRXoraWt3SndIcGRmMjJVemZNKwpkSDBzaUhuMVd6UnovYk4za3hMUzJlMnZ2U1Y3bmNubk1YRUd4OXV0MUY0NThHeWxxdmxXTUlWMzg5Q2didXFDCkcwcFdiMTBLM0RVZWdiT25Xa1FmSm5sTWRRVVZDUVdZZEZaaklrcWtkWi9hVTRobkNEV01oZXNWRnFNaDN3VVAKczhQay9BNWh1ZFFPbnFRNDVIWXZLdjZ5RjJWcDUyWExBRUx3NDJ4aVRKZlh0V1h4eHR6cU4wY1lyL2VxeS9XMQp1YVVGSW5xQjFVM0JFL1oxbmFrQ0F3RUFBYU1qTUNFd0RnWURWUjBQQVFIL0JBUURBZ0trTUE4R0ExVWRFd0VCCi93UUZNQU1CQWY4d0RRWUpLb1pJaHZjTkFRRUxCUUFEZ2dFQkFKUUVKQVBLSkFjVDVuK3dsWGJsdU9mS0J3c2gKZTI4R1c5R2QwM0N0NGF3RzhzMXE1ZHNua2tpZmVTUENHVFZ1SXF6UTZDNmJaSk9SMDMvVEl5ejh6NDJnaitDVApjWUZXZkltM2RKTnpRL08xWkdySXZZNWdtcWJtWDlpV0JaU24rRytEOGxubzd2aGMvY0tBRFR5OTMvVU92MThuCkdhMnIrRGJJcHcyTWVBVEl2elpxRS9RWlVSQ25DMmdjUFhTVzFqN2h4R3o1a3ZNcGVDZTdQYVUvdVFvblVHSWsKZ2t6ZzI4NHQvREhUUzc4N1V1SUg5cXBaV09yTFNMOGFBeUxQUHhWSXBteGZmbWRETE9TS2VUemRlTmxoSitUMwowQlBVaHBQTlJBNTNJN0hRQjhVUDR2elNONTkzZ1VFbVlFQ2Jic2RYSzB6ZVR6SDdWWHR2Zmd5WTVWWT0KLS0tLS1FTkQgQ0VSVElGSUNBVEUtLS0tLQo=
server: https://127.0.0.1:6443
name: dummycluster_ephemeral
contexts:
- context:
cluster: dummycluster_ephemeral
user: kubernetes-admin
name: dummy_cluster
current-context: dummy_cluster
kind: Config
preferences: {}
users:
- name: kubernetes-admin
user:
client-certificate-data: LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUM4akNDQWRxZ0F3SUJBZ0lJQXhEdzk2RUY4SXN3RFFZSktvWklodmNOQVFFTEJRQXdGVEVUTUJFR0ExVUUKQXhNS2EzVmlaWEp1WlhSbGN6QWVGdzB4T1RBNU1qa3hOekF6TURsYUZ3MHlNREE1TWpneE56QXpNVEphTURReApGekFWQmdOVkJBb1REbk41YzNSbGJUcHRZWE4wWlhKek1Sa3dGd1lEVlFRREV4QnJkV0psY201bGRHVnpMV0ZrCmJXbHVNSUlCSWpBTkJna3Foa2lHOXcwQkFRRUZBQU9DQVE4QU1JSUJDZ0tDQVFFQXV6R0pZdlBaNkRvaTQyMUQKSzhXSmFaQ25OQWQycXo1cC8wNDJvRnpRUGJyQWd6RTJxWVZrek9MOHhBVmVSN1NONXdXb1RXRXlGOEVWN3JyLwo0K0hoSEdpcTVQbXF1SUZ5enpuNi9JWmM4alU5eEVmenZpa2NpckxmVTR2UlhKUXdWd2dBU05sMkFXQUloMmRECmRUcmpCQ2ZpS1dNSHlqMFJiSGFsc0J6T3BnVC9IVHYzR1F6blVRekZLdjJkajVWMU5rUy9ESGp5UlJKK0VMNlEKQlltR3NlZzVQNE5iQzllYnVpcG1NVEFxL0p1bU9vb2QrRmpMMm5acUw2Zkk2ZkJ0RjVPR2xwQ0IxWUo4ZnpDdApHUVFaN0hUSWJkYjJ0cDQzRlZPaHlRYlZjSHFUQTA0UEoxNSswV0F5bVVKVXo4WEE1NDRyL2J2NzRKY0pVUkZoCmFyWmlRd0lEQVFBQm95Y3dKVEFPQmdOVkhROEJBZjhFQkFNQ0JhQXdFd1lEVlIwbEJBd3dDZ1lJS3dZQkJRVUgKQXdJd0RRWUpLb1pJaHZjTkFRRUxCUUFEZ2dFQkFMMmhIUmVibEl2VHJTMFNmUVg1RG9ueVVhNy84aTg1endVWApSd3dqdzFuS0U0NDJKbWZWRGZ5b0hRYUM4Ti9MQkxyUXM0U0lqU1JYdmFHU1dSQnRnT1RRV21Db1laMXdSbjdwCndDTXZQTERJdHNWWm90SEZpUFl2b1lHWFFUSXA3YlROMmg1OEJaaEZ3d25nWUovT04zeG1rd29IN1IxYmVxWEYKWHF1TTluekhESk41VlZub1lQR09yRHMwWlg1RnNxNGtWVU0wVExNQm9qN1ZIRDhmU0E5RjRYNU4yMldsZnNPMAo4aksrRFJDWTAyaHBrYTZQQ0pQS0lNOEJaMUFSMG9ZakZxT0plcXpPTjBqcnpYWHh4S2pHVFVUb1BldVA5dCtCCjJOMVA1TnI4a2oxM0lrend5Q1NZclFVN09ZM3ltZmJobHkrcXZxaFVFa014MlQ1SkpmQT0KLS0tLS1FTkQgQ0VSVElGSUNBVEUtLS0tLQo=
client-key-data: LS0tLS1CRUdJTiBSU0EgUFJJVkFURSBLRVktLS0tLQpNSUlFcFFJQkFBS0NBUUVBdXpHSll2UFo2RG9pNDIxREs4V0phWkNuTkFkMnF6NXAvMDQyb0Z6UVBickFnekUyCnFZVmt6T0w4eEFWZVI3U041d1dvVFdFeUY4RVY3cnIvNCtIaEhHaXE1UG1xdUlGeXp6bjYvSVpjOGpVOXhFZnoKdmlrY2lyTGZVNHZSWEpRd1Z3Z0FTTmwyQVdBSWgyZERkVHJqQkNmaUtXTUh5ajBSYkhhbHNCek9wZ1QvSFR2MwpHUXpuVVF6Rkt2MmRqNVYxTmtTL0RIanlSUkorRUw2UUJZbUdzZWc1UDROYkM5ZWJ1aXBtTVRBcS9KdW1Pb29kCitGakwyblpxTDZmSTZmQnRGNU9HbHBDQjFZSjhmekN0R1FRWjdIVEliZGIydHA0M0ZWT2h5UWJWY0hxVEEwNFAKSjE1KzBXQXltVUpVejhYQTU0NHIvYnY3NEpjSlVSRmhhclppUXdJREFRQUJBb0lCQVFDU0pycjlaeVpiQ2dqegpSL3VKMFZEWCt2aVF4c01BTUZyUjJsOE1GV3NBeHk1SFA4Vk4xYmc5djN0YUVGYnI1U3hsa3lVMFJRNjNQU25DCm1uM3ZqZ3dVQWlScllnTEl5MGk0UXF5VFBOU1V4cnpTNHRxTFBjM3EvSDBnM2FrNGZ2cSsrS0JBUUlqQnloamUKbnVFc1JpMjRzT3NESlM2UDE5NGlzUC9yNEpIM1M5bFZGbkVuOGxUR2c0M1kvMFZoMXl0cnkvdDljWjR5ZUNpNwpjMHFEaTZZcXJZaFZhSW9RRW1VQjdsbHRFZkZzb3l4VDR6RTE5U3pVbkRoMmxjYTF1TzhqcmI4d2xHTzBoQ2JyClB1R1l2WFFQa3Q0VlNmalhvdGJ3d2lBNFRCVERCRzU1bHp6MmNKeS9zSS8zSHlYbEMxcTdXUmRuQVhhZ1F0VzkKOE9DZGRkb0JBb0dCQU5NcUNtSW94REtyckhZZFRxT1M1ZFN4cVMxL0NUN3ZYZ0pScXBqd2Y4WHA2WHo0KzIvTAozVXFaVDBEL3dGTkZkc1Z4eFYxMnNYMUdwMHFWZVlKRld5OVlCaHVSWGpTZ0ZEWldSY1Z1Y01sNVpPTmJsbmZGCjVKQ0xnNXFMZ1g5VTNSRnJrR3A0R241UDQxamg4TnhKVlhzZG5xWE9xNTFUK1RRT1UzdkpGQjc1QW9HQkFPTHcKalp1cnZtVkZyTHdaVGgvRDNpWll5SVV0ZUljZ2NKLzlzbTh6L0pPRmRIbFd4dGRHUFVzYVd1MnBTNEhvckFtbgpqTm4vSTluUXd3enZ3MWUzVVFPbUhMRjVBczk4VU5hbk5TQ0xNMW1yaXZHRXJ1VHFnTDM1bU41eFZPdTUxQU5JCm4yNkFtODBJT2JDeEtLa0R0ZXJSaFhHd3g5c1pONVJCbG9VRThZNGJBb0dBQ3ZsdVhMZWRxcng5VkE0bDNoNXUKVDJXRVUxYjgxZ1orcmtRc1I1S0lNWEw4cllBTElUNUpHKzFuendyN3BkaEFXZmFWdVV2SDRhamdYT0h6MUs5aQpFODNSVTNGMG9ldUg0V01PY1RwU0prWm0xZUlXcWRiaEVCb1FGdUlWTXRib1BsV0d4ZUhFRHJoOEtreGp4aThSCmdEcUQyajRwY1IzQ0g5QjJ5a0lqQjVFQ2dZRUExc0xXLys2enE1c1lNSm14K1JXZThhTXJmL3pjQnVTSU1LQWgKY0dNK0wwMG9RSHdDaUU4TVNqcVN1ajV3R214YUFuanhMb3ZwSFlRV1VmUEVaUW95UE1YQ2VhRVBLOU4xbk8xMwp0V2lHRytIZkIxaU5PazFCc0lhNFNDbndOM1FRVTFzeXBaeEgxT3hueS9LYmkvYmEvWEZ5VzNqMGFUK2YvVWxrCmJGV1ZVdWtDZ1lFQTBaMmRTTFlmTjV5eFNtYk5xMWVqZXdWd1BjRzQxR2hQclNUZEJxdHFac1doWGE3aDdLTWEKeHdvamh5SXpnTXNyK2tXODdlajhDQ2h0d21sQ1p5QU92QmdOZytncnJ1cEZLM3FOSkpKeU9YREdHckdpbzZmTQp5aXB3Q2tZVGVxRThpZ1J6UkI5QkdFUGY4eVpjMUtwdmZhUDVhM0lRZmxiV0czbGpUemNNZVZjPQotLS0tLUVORCBSU0EgUFJJVkFURSBLRVktLS0tLQo=

View File

@ -0,0 +1,70 @@
/*
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package poller
import (
"context"
"strings"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"sigs.k8s.io/cli-utils/pkg/kstatus/polling/clusterreader"
"sigs.k8s.io/controller-runtime/pkg/client"
"opendev.org/airship/airshipctl/pkg/log"
)
const allowedApplyErrors = 3
// CachingClusterReader is wrapper for kstatus.CachingClusterReader implementation
type CachingClusterReader struct {
Cr *clusterreader.CachingClusterReader
applyErrors []error
}
// Get is a wrapper for kstatus.CachingClusterReader Get method
func (c *CachingClusterReader) Get(ctx context.Context, key client.ObjectKey, obj *unstructured.Unstructured) error {
return c.Cr.Get(ctx, key, obj)
}
// ListNamespaceScoped is a wrapper for kstatus.CachingClusterReader ListNamespaceScoped method
func (c *CachingClusterReader) ListNamespaceScoped(
ctx context.Context,
list *unstructured.UnstructuredList,
namespace string,
selector labels.Selector) error {
return c.Cr.ListNamespaceScoped(ctx, list, namespace, selector)
}
// ListClusterScoped is a wrapper for kstatus.CachingClusterReader ListClusterScoped method
func (c *CachingClusterReader) ListClusterScoped(
ctx context.Context,
list *unstructured.UnstructuredList,
selector labels.Selector) error {
return c.Cr.ListClusterScoped(ctx, list, selector)
}
// Sync is a wrapper for kstatus.CachingClusterReader Sync method, allows to filter specific errors
func (c *CachingClusterReader) Sync(ctx context.Context) error {
err := c.Cr.Sync(ctx)
if err != nil && strings.Contains(err.Error(), "request timed out") {
c.applyErrors = append(c.applyErrors, err)
if len(c.applyErrors) <= allowedApplyErrors {
log.Printf("timeout error occurred during sync: '%v', skipping", err)
return nil
}
}
return err
}

View File

@ -0,0 +1,246 @@
/*
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package poller_test
import (
"context"
"fmt"
"sort"
"testing"
"github.com/stretchr/testify/require"
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
apiextv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime/schema"
"sigs.k8s.io/cli-utils/pkg/kstatus/polling/clusterreader"
"sigs.k8s.io/cli-utils/pkg/object"
"sigs.k8s.io/cli-utils/pkg/testutil"
"sigs.k8s.io/controller-runtime/pkg/client"
"opendev.org/airship/airshipctl/pkg/k8s/poller"
)
var (
deploymentGVK = appsv1.SchemeGroupVersion.WithKind("Deployment")
rsGVK = appsv1.SchemeGroupVersion.WithKind("ReplicaSet")
podGVK = v1.SchemeGroupVersion.WithKind("Pod")
)
// gkNamespace contains information about a GroupVersionKind and a namespace.
type gkNamespace struct {
GroupKind schema.GroupKind
Namespace string
}
func TestSync(t *testing.T) {
testCases := map[string]struct {
identifiers []object.ObjMetadata
expectedSynced []gkNamespace
}{
"no identifiers": {
identifiers: []object.ObjMetadata{},
},
"same GVK in multiple namespaces": {
identifiers: []object.ObjMetadata{
{
GroupKind: deploymentGVK.GroupKind(),
Name: "deployment",
Namespace: "Foo",
},
{
GroupKind: deploymentGVK.GroupKind(),
Name: "deployment",
Namespace: "Bar",
},
},
expectedSynced: []gkNamespace{
{
GroupKind: deploymentGVK.GroupKind(),
Namespace: "Foo",
},
{
GroupKind: rsGVK.GroupKind(),
Namespace: "Foo",
},
{
GroupKind: podGVK.GroupKind(),
Namespace: "Foo",
},
{
GroupKind: deploymentGVK.GroupKind(),
Namespace: "Bar",
},
{
GroupKind: rsGVK.GroupKind(),
Namespace: "Bar",
},
{
GroupKind: podGVK.GroupKind(),
Namespace: "Bar",
},
},
},
}
fakeMapper := testutil.NewFakeRESTMapper(
appsv1.SchemeGroupVersion.WithKind("Deployment"),
appsv1.SchemeGroupVersion.WithKind("ReplicaSet"),
v1.SchemeGroupVersion.WithKind("Pod"),
)
for tn, tc := range testCases {
t.Run(tn, func(t *testing.T) {
fakeReader := &fakeReader{}
cr, err := clusterreader.NewCachingClusterReader(fakeReader, fakeMapper, tc.identifiers)
require.NoError(t, err)
clusterReader := &poller.CachingClusterReader{Cr: cr}
err = clusterReader.Sync(context.Background())
require.NoError(t, err)
synced := fakeReader.syncedGVKNamespaces
sortGVKNamespaces(synced)
expectedSynced := tc.expectedSynced
sortGVKNamespaces(expectedSynced)
require.Equal(t, expectedSynced, synced)
})
}
}
func TestSync_Errors(t *testing.T) {
testCases := map[string]struct {
mapper meta.RESTMapper
readerError error
expectSyncError bool
cacheError bool
cacheErrorText string
}{
"mapping and reader are successful": {
mapper: testutil.NewFakeRESTMapper(
apiextv1.SchemeGroupVersion.WithKind("CustomResourceDefinition"),
),
readerError: nil,
expectSyncError: false,
cacheError: false,
},
"reader returns NotFound error": {
mapper: testutil.NewFakeRESTMapper(
apiextv1.SchemeGroupVersion.WithKind("CustomResourceDefinition"),
),
readerError: errors.NewNotFound(schema.GroupResource{
Group: "apiextensions.k8s.io",
Resource: "customresourcedefinitions",
}, "my-crd"),
expectSyncError: false,
cacheError: true,
cacheErrorText: "not found",
},
"reader returns request timed out other error": {
mapper: testutil.NewFakeRESTMapper(
apiextv1.SchemeGroupVersion.WithKind("CustomResourceDefinition"),
),
readerError: errors.NewInternalError(fmt.Errorf("request timed out")),
expectSyncError: false,
cacheError: true,
cacheErrorText: "not found",
},
"reader returns other error": {
mapper: testutil.NewFakeRESTMapper(
apiextv1.SchemeGroupVersion.WithKind("CustomResourceDefinition"),
),
readerError: errors.NewInternalError(fmt.Errorf("testing")),
expectSyncError: true,
cacheError: true,
cacheErrorText: "not found",
},
"mapping not found": {
mapper: testutil.NewFakeRESTMapper(),
expectSyncError: false,
cacheError: true,
cacheErrorText: "no matches for kind",
},
}
for tn, tc := range testCases {
t.Run(tn, func(t *testing.T) {
identifiers := []object.ObjMetadata{
{
Name: "my-crd",
GroupKind: schema.GroupKind{
Group: "apiextensions.k8s.io",
Kind: "CustomResourceDefinition",
},
},
}
fakeReader := &fakeReader{
err: tc.readerError,
}
cr, err := clusterreader.NewCachingClusterReader(fakeReader, tc.mapper, identifiers)
require.NoError(t, err)
clusterReader := poller.CachingClusterReader{Cr: cr}
err = clusterReader.Sync(context.Background())
if tc.expectSyncError {
require.Equal(t, tc.readerError, err)
return
}
require.NoError(t, err)
})
}
}
func sortGVKNamespaces(gvkNamespaces []gkNamespace) {
sort.Slice(gvkNamespaces, func(i, j int) bool {
if gvkNamespaces[i].GroupKind.String() != gvkNamespaces[j].GroupKind.String() {
return gvkNamespaces[i].GroupKind.String() < gvkNamespaces[j].GroupKind.String()
}
return gvkNamespaces[i].Namespace < gvkNamespaces[j].Namespace
})
}
type fakeReader struct {
syncedGVKNamespaces []gkNamespace
err error
}
func (f *fakeReader) Get(_ context.Context, _ client.ObjectKey, _ client.Object) error {
return nil
}
//nolint:gocritic
func (f *fakeReader) List(_ context.Context, list client.ObjectList, opts ...client.ListOption) error {
var namespace string
for _, opt := range opts {
switch opt := opt.(type) {
case client.InNamespace:
namespace = string(opt)
}
}
gvk := list.GetObjectKind().GroupVersionKind()
f.syncedGVKNamespaces = append(f.syncedGVKNamespaces, gkNamespace{
GroupKind: gvk.GroupKind(),
Namespace: namespace,
})
return f.err
}

View File

@ -12,12 +12,11 @@
limitations under the License.
*/
package cluster
package poller
import (
"fmt"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/util/jsonpath"
)
@ -26,7 +25,8 @@ import (
type Expression struct {
// A Condition describes a JSONPath filter which is matched against an
// array containing a single resource.
Condition string `json:"condition"`
Condition string
Value string
// jsonPath is used for the actual act of filtering on resources. It is
// stored within the Expression as a means of memoization.
@ -36,36 +36,26 @@ type Expression struct {
// Match returns true if the given object matches the parsed jsonpath object.
// An error is returned if the Expression's condition is not a valid JSONPath
// as defined here: https://goessner.net/articles/JsonPath.
func (e *Expression) Match(obj runtime.Unstructured) (bool, error) {
// NOTE(howell): JSONPath filters only work on lists. This means that
// in order to check if a certain condition is met for obj, we need to
// put obj into an list, then see if the filter catches obj.
const listName = "items"
func (e *Expression) Match(obj map[string]interface{}) (bool, error) {
// Parse lazily
if e.jsonPath == nil {
jp := jsonpath.New("status-check")
// The condition must be a filter on a list
itemAsArray := fmt.Sprintf("{$.%s[?(%s)]}", listName, e.Condition)
err := jp.Parse(itemAsArray)
err := jp.Parse(e.Condition)
if err != nil {
return false, ErrInvalidStatusCheck{
What: fmt.Sprintf("unable to parse jsonpath %q: %v", e.Condition, err.Error()),
}
return false, err
}
e.jsonPath = jp
}
// Filters only work on lists
list := map[string]interface{}{
listName: []interface{}{obj.UnstructuredContent()},
}
results, err := e.jsonPath.FindResults(list)
results, err := e.jsonPath.FindResults(obj)
if err != nil {
return false, ErrInvalidStatusCheck{
What: fmt.Sprintf("failed to execute condition %q on object %v: %v", e.Condition, obj, err),
return false, err
}
if e.Value != "" {
return len(results[0]) == 1 && fmt.Sprintf("%s", results[0][0].Interface()) == e.Value, nil
}
return len(results[0]) == 1, nil
}

View File

@ -0,0 +1,77 @@
/*
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package poller_test
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"opendev.org/airship/airshipctl/pkg/k8s/poller"
)
func TestNewExpression(t *testing.T) {
testCases := []struct {
name string
condition string
value string
obj map[string]interface{}
expectedResult bool
expectedErrString string
}{
{
name: "Success - value matched",
condition: "{.status}",
obj: map[string]interface{}{"status": "provisioned"},
value: "provisioned",
expectedResult: true,
},
{
name: "Success - empty value",
condition: "{.status}",
obj: map[string]interface{}{"status": "provisioned"},
expectedResult: true,
},
{
name: "Failed - invalid condition",
condition: "{*%.status}",
expectedErrString: "unrecognized character in action",
},
{
name: "Failed - path not found in object",
condition: "{.status}",
expectedErrString: "status is not found",
},
}
for _, test := range testCases {
tt := test
t.Run(tt.name, func(t *testing.T) {
exp := poller.Expression{
Condition: tt.condition,
Value: tt.value,
}
res, err := exp.Match(tt.obj)
assert.Equal(t, tt.expectedResult, res)
if test.expectedErrString != "" {
require.Error(t, err)
assert.Contains(t, err.Error(), test.expectedErrString)
} else {
require.NoError(t, err)
}
})
}
}

View File

@ -16,13 +16,12 @@ package poller
import (
"context"
"strings"
appsv1 "k8s.io/api/apps/v1"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
cmdutil "k8s.io/kubectl/pkg/cmd/util"
"k8s.io/kubectl/pkg/scheme"
"sigs.k8s.io/cli-utils/pkg/kstatus/polling"
"sigs.k8s.io/cli-utils/pkg/kstatus/polling/clusterreader"
"sigs.k8s.io/cli-utils/pkg/kstatus/polling/engine"
@ -31,25 +30,43 @@ import (
"sigs.k8s.io/cli-utils/pkg/object"
"sigs.k8s.io/controller-runtime/pkg/client"
"opendev.org/airship/airshipctl/pkg/log"
"opendev.org/airship/airshipctl/pkg/api/v1alpha1"
)
const allowedApplyErrors = 3
// NewStatusPoller creates a new StatusPoller using the given clusterreader and mapper. The StatusPoller
// will use the client for all calls to the cluster.
func NewStatusPoller(reader client.Reader, mapper meta.RESTMapper) *StatusPoller {
func NewStatusPoller(f cmdutil.Factory, conditions ...v1alpha1.Condition) (*StatusPoller, error) {
config, err := f.ToRESTConfig()
if err != nil {
return nil, err
}
mapper, err := f.ToRESTMapper()
if err != nil {
return nil, err
}
c, err := client.New(config, client.Options{Scheme: scheme.Scheme, Mapper: mapper})
if err != nil {
return nil, err
}
return &StatusPoller{
engine: &engine.PollerEngine{
Reader: reader,
Engine: &engine.PollerEngine{
Reader: c,
Mapper: mapper,
},
}
conditions: conditions,
}, nil
}
// StatusPoller provides functionality for polling a cluster for status for a set of resources.
type StatusPoller struct {
engine *engine.PollerEngine
ClusterReaderFactoryFunc engine.ClusterReaderFactoryFunc
StatusReadersFactoryFunc engine.StatusReadersFactoryFunc
Engine *engine.PollerEngine
conditions []v1alpha1.Condition
}
// Poll will create a new statusPollerRunner that will poll all the resources provided and report their status
@ -57,10 +74,12 @@ type StatusPoller struct {
// context passed in.
func (s *StatusPoller) Poll(
ctx context.Context, identifiers []object.ObjMetadata, options polling.Options) <-chan event.Event {
return s.engine.Poll(ctx, identifiers, engine.Options{
return s.Engine.Poll(ctx, identifiers, engine.Options{
PollInterval: options.PollInterval,
ClusterReaderFactoryFunc: clusterReaderFactoryFunc(options.UseCache),
StatusReadersFactoryFunc: s.createStatusReaders,
ClusterReaderFactoryFunc: map[bool]engine.ClusterReaderFactoryFunc{true: clusterReaderFactoryFunc(options.UseCache),
false: s.ClusterReaderFactoryFunc}[s.ClusterReaderFactoryFunc == nil],
StatusReadersFactoryFunc: map[bool]engine.StatusReadersFactoryFunc{true: s.createStatusReadersFactory(),
false: s.StatusReadersFactoryFunc}[s.StatusReadersFactoryFunc == nil],
})
}
@ -69,7 +88,8 @@ func (s *StatusPoller) Poll(
// a specific statusreaders.
// TODO: We should consider making the registration more automatic instead of having to create each of them
// here. Also, it might be worth creating them on demand.
func (s *StatusPoller) createStatusReaders(reader engine.ClusterReader, mapper meta.RESTMapper) (
func (s *StatusPoller) createStatusReadersFactory() engine.StatusReadersFactoryFunc {
return func(reader engine.ClusterReader, mapper meta.RESTMapper) (
map[schema.GroupKind]engine.StatusReader, engine.StatusReader) {
defaultStatusReader := statusreaders.NewGenericStatusReader(reader, mapper)
replicaSetStatusReader := statusreaders.NewReplicaSetStatusReader(reader, mapper, defaultStatusReader)
@ -81,7 +101,16 @@ func (s *StatusPoller) createStatusReaders(reader engine.ClusterReader, mapper m
appsv1.SchemeGroupVersion.WithKind("StatefulSet").GroupKind(): statefulSetStatusReader,
appsv1.SchemeGroupVersion.WithKind("ReplicaSet").GroupKind(): replicaSetStatusReader,
}
if len(s.conditions) > 0 {
cr := NewCustomResourceReader(reader, mapper, s.conditions...)
for _, tm := range s.conditions {
statusReaders[tm.GroupVersionKind().GroupKind()] = cr
}
}
return statusReaders, defaultStatusReader
}
}
// clusterReaderFactoryFunc returns a factory function for creating an instance of a ClusterReader.
@ -101,44 +130,3 @@ func clusterReaderFactoryFunc(useCache bool) engine.ClusterReaderFactoryFunc {
return &clusterreader.DirectClusterReader{Reader: r}, nil
}
}
// CachingClusterReader is wrapper for kstatus.CachingClusterReader implementation
type CachingClusterReader struct {
Cr *clusterreader.CachingClusterReader
applyErrors []error
}
// Get is a wrapper for kstatus.CachingClusterReader Get method
func (c *CachingClusterReader) Get(ctx context.Context, key client.ObjectKey, obj *unstructured.Unstructured) error {
return c.Cr.Get(ctx, key, obj)
}
// ListNamespaceScoped is a wrapper for kstatus.CachingClusterReader ListNamespaceScoped method
func (c *CachingClusterReader) ListNamespaceScoped(
ctx context.Context,
list *unstructured.UnstructuredList,
namespace string,
selector labels.Selector) error {
return c.Cr.ListNamespaceScoped(ctx, list, namespace, selector)
}
// ListClusterScoped is a wrapper for kstatus.CachingClusterReader ListClusterScoped method
func (c *CachingClusterReader) ListClusterScoped(
ctx context.Context,
list *unstructured.UnstructuredList,
selector labels.Selector) error {
return c.Cr.ListClusterScoped(ctx, list, selector)
}
// Sync is a wrapper for kstatus.CachingClusterReader Sync method, allows to filter specific errors
func (c *CachingClusterReader) Sync(ctx context.Context) error {
err := c.Cr.Sync(ctx)
if err != nil && strings.Contains(err.Error(), "request timed out") {
c.applyErrors = append(c.applyErrors, err)
if len(c.applyErrors) < allowedApplyErrors {
log.Printf("timeout error occurred during sync: '%v', skipping", err)
return nil
}
}
return err
}

View File

@ -15,25 +15,304 @@
package poller_test
import (
"context"
"errors"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/cli-runtime/pkg/resource"
"k8s.io/client-go/discovery"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
cmdutil "k8s.io/kubectl/pkg/cmd/util"
"k8s.io/kubectl/pkg/util/openapi"
"k8s.io/kubectl/pkg/validation"
"sigs.k8s.io/cli-utils/pkg/kstatus/polling"
"sigs.k8s.io/cli-utils/pkg/kstatus/polling/engine"
"sigs.k8s.io/cli-utils/pkg/kstatus/polling/event"
kstatustestutil "sigs.k8s.io/cli-utils/pkg/kstatus/polling/testutil"
"sigs.k8s.io/cli-utils/pkg/kstatus/status"
"sigs.k8s.io/cli-utils/pkg/object"
"sigs.k8s.io/cli-utils/pkg/testutil"
"sigs.k8s.io/controller-runtime/pkg/client"
"opendev.org/airship/airshipctl/pkg/k8s/poller"
k8sutils "opendev.org/airship/airshipctl/pkg/k8s/utils"
)
func TestNewStatusPoller(t *testing.T) {
f := k8sutils.FactoryFromKubeConfig("testdata/kubeconfig.yaml", "")
restConfig, err := f.ToRESTConfig()
require.NoError(t, err)
restMapper, err := f.ToRESTMapper()
require.NoError(t, err)
restClient, err := client.New(restConfig, client.Options{Mapper: restMapper})
require.NoError(t, err)
testCases := map[string]struct {
factory cmdutil.Factory
expectedError bool
}{
"failed rest config": {
factory: &MockCmdUtilFactory{MockToRESTConfig: func() (*rest.Config, error) {
return nil, errors.New("rest config error")
}},
expectedError: true,
},
"failed rest mapper": {
factory: &MockCmdUtilFactory{MockToRESTConfig: func() (*rest.Config, error) {
return nil, nil
},
MockToRESTMapper: func() (meta.RESTMapper, error) {
return nil, errors.New("rest mapper error")
}},
expectedError: true,
},
"failed new client": {
factory: &MockCmdUtilFactory{MockToRESTConfig: func() (*rest.Config, error) {
return nil, nil
},
MockToRESTMapper: func() (meta.RESTMapper, error) {
return nil, nil
}},
expectedError: true,
},
"success new poller": {
factory: &MockCmdUtilFactory{MockToRESTConfig: func() (*rest.Config, error) {
return &rest.Config{}, nil
},
MockToRESTMapper: func() (meta.RESTMapper, error) {
return testutil.NewFakeRESTMapper(), nil
}},
expectedError: false,
},
}
a := poller.NewStatusPoller(restClient, restMapper)
assert.NotNil(t, a)
for tn, tc := range testCases {
t.Run(tn, func(t *testing.T) {
p, err := poller.NewStatusPoller(tc.factory)
if tc.expectedError {
require.Error(t, err)
} else {
require.NoError(t, err)
require.NotNil(t, p)
}
})
}
}
func TestStatusPollerRun(t *testing.T) {
testCases := map[string]struct {
identifiers []object.ObjMetadata
ClusterReaderFactoryFunc engine.ClusterReaderFactoryFunc
StatusReadersFactoryFunc engine.StatusReadersFactoryFunc
defaultStatusReader engine.StatusReader
expectedEventTypes []event.EventType
}{
"single resource": {
identifiers: []object.ObjMetadata{
{
GroupKind: schema.GroupKind{
Group: "apps",
Kind: "Deployment",
},
Name: "foo",
Namespace: "bar",
},
},
defaultStatusReader: &fakeStatusReader{
resourceStatuses: map[schema.GroupKind][]status.Status{
schema.GroupKind{Group: "apps", Kind: "Deployment"}: { //nolint:gofmt
status.InProgressStatus,
status.CurrentStatus,
},
},
resourceStatusCount: make(map[schema.GroupKind]int),
},
expectedEventTypes: []event.EventType{
event.ResourceUpdateEvent,
event.ResourceUpdateEvent,
},
ClusterReaderFactoryFunc: func(_ client.Reader, _ meta.RESTMapper, _ []object.ObjMetadata) (
engine.ClusterReader, error) {
return kstatustestutil.NewNoopClusterReader(), nil
},
StatusReadersFactoryFunc: func(_ engine.ClusterReader, _ meta.RESTMapper) (
statusReaders map[schema.GroupKind]engine.StatusReader, defaultStatusReader engine.StatusReader) {
return make(map[schema.GroupKind]engine.StatusReader), &fakeStatusReader{
resourceStatuses: map[schema.GroupKind][]status.Status{
schema.GroupKind{Group: "apps", Kind: "Deployment"}: { //nolint:gofmt
status.InProgressStatus,
status.CurrentStatus,
},
},
resourceStatusCount: make(map[schema.GroupKind]int),
}
},
},
"multiple resources": {
identifiers: []object.ObjMetadata{
{
GroupKind: schema.GroupKind{
Group: "apps",
Kind: "Deployment",
},
Name: "foo",
Namespace: "default",
},
{
GroupKind: schema.GroupKind{
Group: "",
Kind: "Service",
},
Name: "bar",
Namespace: "default",
},
},
ClusterReaderFactoryFunc: func(_ client.Reader, _ meta.RESTMapper, _ []object.ObjMetadata) (
engine.ClusterReader, error) {
return kstatustestutil.NewNoopClusterReader(), nil
},
StatusReadersFactoryFunc: func(_ engine.ClusterReader, _ meta.RESTMapper) (
statusReaders map[schema.GroupKind]engine.StatusReader, defaultStatusReader engine.StatusReader) {
return make(map[schema.GroupKind]engine.StatusReader), &fakeStatusReader{
resourceStatuses: map[schema.GroupKind][]status.Status{
schema.GroupKind{Group: "apps", Kind: "Deployment"}: { //nolint:gofmt
status.InProgressStatus,
status.CurrentStatus,
},
schema.GroupKind{Group: "", Kind: "Service"}: { //nolint:gofmt
status.InProgressStatus,
status.InProgressStatus,
status.CurrentStatus,
},
},
resourceStatusCount: make(map[schema.GroupKind]int),
}
},
expectedEventTypes: []event.EventType{
event.ResourceUpdateEvent,
event.ResourceUpdateEvent,
event.ResourceUpdateEvent,
event.ResourceUpdateEvent,
},
},
}
for tn, tc := range testCases {
t.Run(tn, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
identifiers := tc.identifiers
fakeMapper := testutil.NewFakeRESTMapper(
appsv1.SchemeGroupVersion.WithKind("Deployment"),
v1.SchemeGroupVersion.WithKind("Service"),
)
e := poller.StatusPoller{
ClusterReaderFactoryFunc: tc.ClusterReaderFactoryFunc,
StatusReadersFactoryFunc: tc.StatusReadersFactoryFunc,
Engine: &engine.PollerEngine{Mapper: fakeMapper},
}
options := polling.Options{PollInterval: time.Second, UseCache: true}
eventChannel := e.Poll(ctx, identifiers, options)
var eventTypes []event.EventType
for ch := range eventChannel {
eventTypes = append(eventTypes, ch.EventType)
if len(eventTypes) == len(tc.expectedEventTypes) {
cancel()
}
}
require.Equal(t, tc.expectedEventTypes, eventTypes)
})
}
}
type fakeStatusReader struct {
resourceStatuses map[schema.GroupKind][]status.Status
resourceStatusCount map[schema.GroupKind]int
}
func (f *fakeStatusReader) ReadStatus(_ context.Context, identifier object.ObjMetadata) *event.ResourceStatus {
count := f.resourceStatusCount[identifier.GroupKind]
resourceStatusSlice := f.resourceStatuses[identifier.GroupKind]
var resourceStatus status.Status
if len(resourceStatusSlice) > count {
resourceStatus = resourceStatusSlice[count]
} else {
resourceStatus = resourceStatusSlice[len(resourceStatusSlice)-1]
}
f.resourceStatusCount[identifier.GroupKind] = count + 1
return &event.ResourceStatus{
Identifier: identifier,
Status: resourceStatus,
}
}
func (f *fakeStatusReader) ReadStatusForObject(_ context.Context, _ *unstructured.Unstructured) *event.ResourceStatus {
return nil
}
var _ cmdutil.Factory = &MockCmdUtilFactory{}
type MockCmdUtilFactory struct {
MockToRESTConfig func() (*rest.Config, error)
MockToRESTMapper func() (meta.RESTMapper, error)
}
func (n *MockCmdUtilFactory) ToRESTConfig() (*rest.Config, error) {
return n.MockToRESTConfig()
}
func (n *MockCmdUtilFactory) ToDiscoveryClient() (discovery.CachedDiscoveryInterface, error) {
return nil, nil
}
func (n *MockCmdUtilFactory) ToRESTMapper() (meta.RESTMapper, error) {
return n.MockToRESTMapper()
}
func (n *MockCmdUtilFactory) ToRawKubeConfigLoader() clientcmd.ClientConfig {
return nil
}
func (n *MockCmdUtilFactory) DynamicClient() (dynamic.Interface, error) {
return nil, nil
}
func (n *MockCmdUtilFactory) KubernetesClientSet() (*kubernetes.Clientset, error) {
return nil, nil
}
func (n *MockCmdUtilFactory) RESTClient() (*rest.RESTClient, error) {
return nil, nil
}
func (n *MockCmdUtilFactory) NewBuilder() *resource.Builder {
return nil
}
func (n *MockCmdUtilFactory) ClientForMapping(_ *meta.RESTMapping) (resource.RESTClient, error) {
return nil, nil
}
func (n *MockCmdUtilFactory) UnstructuredClientForMapping(_ *meta.RESTMapping) (resource.RESTClient, error) {
return nil, nil
}
func (n *MockCmdUtilFactory) Validator(_ bool) (validation.Schema, error) {
return nil, nil
}
func (n *MockCmdUtilFactory) OpenAPISchema() (openapi.Resources, error) {
return nil, nil
}
func (n *MockCmdUtilFactory) OpenAPIGetter() discovery.OpenAPISchemaInterface {
return nil
}

182
pkg/k8s/poller/status.go Executable file
View File

@ -0,0 +1,182 @@
/*
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package poller
import (
"context"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/cli-utils/pkg/kstatus/polling/engine"
"sigs.k8s.io/cli-utils/pkg/kstatus/polling/event"
"sigs.k8s.io/cli-utils/pkg/kstatus/status"
"sigs.k8s.io/cli-utils/pkg/object"
"opendev.org/airship/airshipctl/pkg/api/v1alpha1"
)
// CustomResourceReader is a wrapper for clu-utils genericClusterReader struct
type CustomResourceReader struct {
// Reader is an implementation of the ClusterReader interface. It provides a
// way for the StatusReader to fetch resources from the cluster.
Reader engine.ClusterReader
// Mapper provides a way to look up the resource types that are available
// in the cluster.
Mapper meta.RESTMapper
// StatusFunc is a function for computing status of object
StatusFunc func(u *unstructured.Unstructured) (*status.Result, error)
// CondMap is a map with stored jsonpath expressions per GK to compute custom status
CondMap map[schema.GroupKind]Expression
}
var _ engine.StatusReader = &CustomResourceReader{}
// NewCustomResourceReader implements custom logic to retrieve resource's status
func NewCustomResourceReader(reader engine.ClusterReader, mapper meta.RESTMapper,
conditions ...v1alpha1.Condition) engine.StatusReader {
condMap := make(map[schema.GroupKind]Expression)
for _, cond := range conditions {
condMap[cond.GroupVersionKind().GroupKind()] = Expression{
Condition: cond.JSONPath,
Value: cond.Value,
}
}
return &CustomResourceReader{
Reader: reader,
Mapper: mapper,
StatusFunc: status.Compute,
CondMap: condMap,
}
}
// ReadStatus will fetch the resource identified by the given identifier
// from the cluster and return an ResourceStatus that will contain
// information about the latest state of the resource, its computed status
// and information about any generated resources.
func (c *CustomResourceReader) ReadStatus(ctx context.Context, identifier object.ObjMetadata) *event.ResourceStatus {
obj, err := c.lookupResource(ctx, identifier)
if err != nil {
return handleResourceStatusError(identifier, err)
}
return c.ReadStatusForObject(ctx, obj)
}
// ReadStatusForObject is similar to ReadStatus, but instead of looking up the
// resource based on an identifier, it will use the passed-in resource.
func (c *CustomResourceReader) ReadStatusForObject(_ context.Context,
obj *unstructured.Unstructured) *event.ResourceStatus {
res, err := c.StatusFunc(obj)
if err != nil {
return &event.ResourceStatus{
Identifier: toIdentifier(obj),
Status: status.UnknownStatus,
Error: err,
}
}
if val, ok := c.CondMap[obj.GroupVersionKind().GroupKind()]; ok && res.Status == status.CurrentStatus {
b, err := val.Match(obj.UnstructuredContent())
if err != nil {
return &event.ResourceStatus{
Identifier: toIdentifier(obj),
Status: status.UnknownStatus,
Error: err,
}
}
if b {
return &event.ResourceStatus{
Identifier: toIdentifier(obj),
Status: res.Status,
Resource: obj,
Message: res.Message,
}
}
return &event.ResourceStatus{
Identifier: toIdentifier(obj),
Status: status.InProgressStatus,
}
}
return &event.ResourceStatus{
Identifier: toIdentifier(obj),
Status: res.Status,
Resource: obj,
Message: res.Message,
}
}
// lookupResource looks up a resource with the given identifier. It will use the rest mapper to resolve
// the version of the GroupKind given in the identifier.
// If the resource is found, it is returned. If it is not found or something
// went wrong, the function will return an error.
func (c *CustomResourceReader) lookupResource(ctx context.Context,
identifier object.ObjMetadata) (*unstructured.Unstructured, error) {
groupVersionKind, err := gvk(identifier.GroupKind, c.Mapper)
if err != nil {
return nil, err
}
var u unstructured.Unstructured
u.SetGroupVersionKind(groupVersionKind)
key := types.NamespacedName{
Name: identifier.Name,
Namespace: identifier.Namespace,
}
err = c.Reader.Get(ctx, key, &u)
if err != nil {
return nil, err
}
return &u, nil
}
// gvk looks up the GVK from a GroupKind using the rest mapper.
func gvk(gk schema.GroupKind, mapper meta.RESTMapper) (schema.GroupVersionKind, error) {
mapping, err := mapper.RESTMapping(gk)
if err != nil {
return schema.GroupVersionKind{}, err
}
return mapping.GroupVersionKind, nil
}
func toIdentifier(u *unstructured.Unstructured) object.ObjMetadata {
return object.ObjMetadata{
GroupKind: u.GroupVersionKind().GroupKind(),
Name: u.GetName(),
Namespace: u.GetNamespace(),
}
}
// handleResourceStatusError construct the appropriate ResourceStatus
// object based on the type of error.
func handleResourceStatusError(identifier object.ObjMetadata, err error) *event.ResourceStatus {
if errors.IsNotFound(err) {
return &event.ResourceStatus{
Identifier: identifier,
Status: status.NotFoundStatus,
Message: "Resource not found",
}
}
return &event.ResourceStatus{
Identifier: identifier,
Status: status.UnknownStatus,
Error: err,
}
}

113
pkg/k8s/poller/status_test.go Executable file
View File

@ -0,0 +1,113 @@
/*
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package poller_test
import (
"context"
"fmt"
"testing"
"github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"sigs.k8s.io/cli-utils/pkg/kstatus/polling/testutil"
"sigs.k8s.io/cli-utils/pkg/kstatus/status"
"sigs.k8s.io/cli-utils/pkg/object"
fakemapper "sigs.k8s.io/cli-utils/pkg/testutil"
"opendev.org/airship/airshipctl/pkg/k8s/poller"
)
var (
customGVK = schema.GroupVersionKind{
Group: "custom.io",
Version: "v1beta1",
Kind: "Custom",
}
name = "Foo"
namespace = "default"
)
func TestGenericStatusReader(t *testing.T) {
testCases := map[string]struct {
result *status.Result
err error
expectedIdentifier object.ObjMetadata
expectedStatus status.Status
condMap map[schema.GroupKind]poller.Expression
}{
"successfully computes status": {
result: &status.Result{
Status: status.InProgressStatus,
Message: "this is a test",
},
expectedIdentifier: object.ObjMetadata{
GroupKind: customGVK.GroupKind(),
Name: name,
Namespace: namespace,
},
expectedStatus: status.InProgressStatus,
},
"successfully computes custom status": {
result: &status.Result{
Status: status.CurrentStatus,
Message: "this is a test",
},
expectedIdentifier: object.ObjMetadata{
GroupKind: customGVK.GroupKind(),
Name: name,
Namespace: namespace,
},
condMap: map[schema.GroupKind]poller.Expression{
customGVK.GroupKind(): {Condition: "{.metadata.name}", Value: "Bar"}},
expectedStatus: status.InProgressStatus,
},
"computing status fails": {
err: fmt.Errorf("this error is a test"),
expectedIdentifier: object.ObjMetadata{
GroupKind: customGVK.GroupKind(),
Name: name,
Namespace: namespace,
},
expectedStatus: status.UnknownStatus,
},
}
for tn, tc := range testCases {
t.Run(tn, func(t *testing.T) {
fakeReader := testutil.NewNoopClusterReader()
fakeMapper := fakemapper.NewFakeRESTMapper()
resourceStatusReader := &poller.CustomResourceReader{
Reader: fakeReader,
Mapper: fakeMapper,
StatusFunc: func(u *unstructured.Unstructured) (*status.Result, error) {
return tc.result, tc.err
},
CondMap: tc.condMap,
}
o := &unstructured.Unstructured{}
o.SetGroupVersionKind(customGVK)
o.SetName(name)
o.SetNamespace(namespace)
resourceStatus := resourceStatusReader.ReadStatusForObject(context.Background(), o)
require.Equal(t, tc.expectedIdentifier, resourceStatus.Identifier)
require.Equal(t, tc.expectedStatus, resourceStatus.Status)
})
}
}

View File

@ -1,19 +0,0 @@
apiVersion: v1
clusters:
- cluster:
certificate-authority-data: LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUN5RENDQWJDZ0F3SUJBZ0lCQURBTkJna3Foa2lHOXcwQkFRc0ZBREFWTVJNd0VRWURWUVFERXdwcmRXSmwKY201bGRHVnpNQjRYRFRFNU1Ea3lPVEUzTURNd09Wb1hEVEk1TURreU5qRTNNRE13T1Zvd0ZURVRNQkVHQTFVRQpBeE1LYTNWaVpYSnVaWFJsY3pDQ0FTSXdEUVlKS29aSWh2Y05BUUVCQlFBRGdnRVBBRENDQVFvQ2dnRUJBTUZyCkdxM0kyb2dZci81Y01Udy9Na1pORTNWQURzdEdyU240WjU2TDhPUGhMcUhDN2t1dno2dVpES3dCSGtGeTBNK2MKRXIzd2piUGE1aTV5NmkyMGtxSHBVMjdPZTA0dzBXV2s4N0RSZVlWaGNoZVJHRXoraWt3SndIcGRmMjJVemZNKwpkSDBzaUhuMVd6UnovYk4za3hMUzJlMnZ2U1Y3bmNubk1YRUd4OXV0MUY0NThHeWxxdmxXTUlWMzg5Q2didXFDCkcwcFdiMTBLM0RVZWdiT25Xa1FmSm5sTWRRVVZDUVdZZEZaaklrcWtkWi9hVTRobkNEV01oZXNWRnFNaDN3VVAKczhQay9BNWh1ZFFPbnFRNDVIWXZLdjZ5RjJWcDUyWExBRUx3NDJ4aVRKZlh0V1h4eHR6cU4wY1lyL2VxeS9XMQp1YVVGSW5xQjFVM0JFL1oxbmFrQ0F3RUFBYU1qTUNFd0RnWURWUjBQQVFIL0JBUURBZ0trTUE4R0ExVWRFd0VCCi93UUZNQU1CQWY4d0RRWUpLb1pJaHZjTkFRRUxCUUFEZ2dFQkFKUUVKQVBLSkFjVDVuK3dsWGJsdU9mS0J3c2gKZTI4R1c5R2QwM0N0NGF3RzhzMXE1ZHNua2tpZmVTUENHVFZ1SXF6UTZDNmJaSk9SMDMvVEl5ejh6NDJnaitDVApjWUZXZkltM2RKTnpRL08xWkdySXZZNWdtcWJtWDlpV0JaU24rRytEOGxubzd2aGMvY0tBRFR5OTMvVU92MThuCkdhMnIrRGJJcHcyTWVBVEl2elpxRS9RWlVSQ25DMmdjUFhTVzFqN2h4R3o1a3ZNcGVDZTdQYVUvdVFvblVHSWsKZ2t6ZzI4NHQvREhUUzc4N1V1SUg5cXBaV09yTFNMOGFBeUxQUHhWSXBteGZmbWRETE9TS2VUemRlTmxoSitUMwowQlBVaHBQTlJBNTNJN0hRQjhVUDR2elNONTkzZ1VFbVlFQ2Jic2RYSzB6ZVR6SDdWWHR2Zmd5WTVWWT0KLS0tLS1FTkQgQ0VSVElGSUNBVEUtLS0tLQo=
server: https://10.0.1.7:6443
name: kubernetes_target
contexts:
- context:
cluster: kubernetes_target
user: kubernetes-admin
name: kubernetes-admin@kubernetes
current-context: ""
kind: Config
preferences: {}
users:
- name: kubernetes-admin
user:
client-certificate-data: LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUM4akNDQWRxZ0F3SUJBZ0lJQXhEdzk2RUY4SXN3RFFZSktvWklodmNOQVFFTEJRQXdGVEVUTUJFR0ExVUUKQXhNS2EzVmlaWEp1WlhSbGN6QWVGdzB4T1RBNU1qa3hOekF6TURsYUZ3MHlNREE1TWpneE56QXpNVEphTURReApGekFWQmdOVkJBb1REbk41YzNSbGJUcHRZWE4wWlhKek1Sa3dGd1lEVlFRREV4QnJkV0psY201bGRHVnpMV0ZrCmJXbHVNSUlCSWpBTkJna3Foa2lHOXcwQkFRRUZBQU9DQVE4QU1JSUJDZ0tDQVFFQXV6R0pZdlBaNkRvaTQyMUQKSzhXSmFaQ25OQWQycXo1cC8wNDJvRnpRUGJyQWd6RTJxWVZrek9MOHhBVmVSN1NONXdXb1RXRXlGOEVWN3JyLwo0K0hoSEdpcTVQbXF1SUZ5enpuNi9JWmM4alU5eEVmenZpa2NpckxmVTR2UlhKUXdWd2dBU05sMkFXQUloMmRECmRUcmpCQ2ZpS1dNSHlqMFJiSGFsc0J6T3BnVC9IVHYzR1F6blVRekZLdjJkajVWMU5rUy9ESGp5UlJKK0VMNlEKQlltR3NlZzVQNE5iQzllYnVpcG1NVEFxL0p1bU9vb2QrRmpMMm5acUw2Zkk2ZkJ0RjVPR2xwQ0IxWUo4ZnpDdApHUVFaN0hUSWJkYjJ0cDQzRlZPaHlRYlZjSHFUQTA0UEoxNSswV0F5bVVKVXo4WEE1NDRyL2J2NzRKY0pVUkZoCmFyWmlRd0lEQVFBQm95Y3dKVEFPQmdOVkhROEJBZjhFQkFNQ0JhQXdFd1lEVlIwbEJBd3dDZ1lJS3dZQkJRVUgKQXdJd0RRWUpLb1pJaHZjTkFRRUxCUUFEZ2dFQkFMMmhIUmVibEl2VHJTMFNmUVg1RG9ueVVhNy84aTg1endVWApSd3dqdzFuS0U0NDJKbWZWRGZ5b0hRYUM4Ti9MQkxyUXM0U0lqU1JYdmFHU1dSQnRnT1RRV21Db1laMXdSbjdwCndDTXZQTERJdHNWWm90SEZpUFl2b1lHWFFUSXA3YlROMmg1OEJaaEZ3d25nWUovT04zeG1rd29IN1IxYmVxWEYKWHF1TTluekhESk41VlZub1lQR09yRHMwWlg1RnNxNGtWVU0wVExNQm9qN1ZIRDhmU0E5RjRYNU4yMldsZnNPMAo4aksrRFJDWTAyaHBrYTZQQ0pQS0lNOEJaMUFSMG9ZakZxT0plcXpPTjBqcnpYWHh4S2pHVFVUb1BldVA5dCtCCjJOMVA1TnI4a2oxM0lrend5Q1NZclFVN09ZM3ltZmJobHkrcXZxaFVFa014MlQ1SkpmQT0KLS0tLS1FTkQgQ0VSVElGSUNBVEUtLS0tLQo=
client-key-data: LS0tLS1CRUdJTiBSU0EgUFJJVkFURSBLRVktLS0tLQpNSUlFcFFJQkFBS0NBUUVBdXpHSll2UFo2RG9pNDIxREs4V0phWkNuTkFkMnF6NXAvMDQyb0Z6UVBickFnekUyCnFZVmt6T0w4eEFWZVI3U041d1dvVFdFeUY4RVY3cnIvNCtIaEhHaXE1UG1xdUlGeXp6bjYvSVpjOGpVOXhFZnoKdmlrY2lyTGZVNHZSWEpRd1Z3Z0FTTmwyQVdBSWgyZERkVHJqQkNmaUtXTUh5ajBSYkhhbHNCek9wZ1QvSFR2MwpHUXpuVVF6Rkt2MmRqNVYxTmtTL0RIanlSUkorRUw2UUJZbUdzZWc1UDROYkM5ZWJ1aXBtTVRBcS9KdW1Pb29kCitGakwyblpxTDZmSTZmQnRGNU9HbHBDQjFZSjhmekN0R1FRWjdIVEliZGIydHA0M0ZWT2h5UWJWY0hxVEEwNFAKSjE1KzBXQXltVUpVejhYQTU0NHIvYnY3NEpjSlVSRmhhclppUXdJREFRQUJBb0lCQVFDU0pycjlaeVpiQ2dqegpSL3VKMFZEWCt2aVF4c01BTUZyUjJsOE1GV3NBeHk1SFA4Vk4xYmc5djN0YUVGYnI1U3hsa3lVMFJRNjNQU25DCm1uM3ZqZ3dVQWlScllnTEl5MGk0UXF5VFBOU1V4cnpTNHRxTFBjM3EvSDBnM2FrNGZ2cSsrS0JBUUlqQnloamUKbnVFc1JpMjRzT3NESlM2UDE5NGlzUC9yNEpIM1M5bFZGbkVuOGxUR2c0M1kvMFZoMXl0cnkvdDljWjR5ZUNpNwpjMHFEaTZZcXJZaFZhSW9RRW1VQjdsbHRFZkZzb3l4VDR6RTE5U3pVbkRoMmxjYTF1TzhqcmI4d2xHTzBoQ2JyClB1R1l2WFFQa3Q0VlNmalhvdGJ3d2lBNFRCVERCRzU1bHp6MmNKeS9zSS8zSHlYbEMxcTdXUmRuQVhhZ1F0VzkKOE9DZGRkb0JBb0dCQU5NcUNtSW94REtyckhZZFRxT1M1ZFN4cVMxL0NUN3ZYZ0pScXBqd2Y4WHA2WHo0KzIvTAozVXFaVDBEL3dGTkZkc1Z4eFYxMnNYMUdwMHFWZVlKRld5OVlCaHVSWGpTZ0ZEWldSY1Z1Y01sNVpPTmJsbmZGCjVKQ0xnNXFMZ1g5VTNSRnJrR3A0R241UDQxamg4TnhKVlhzZG5xWE9xNTFUK1RRT1UzdkpGQjc1QW9HQkFPTHcKalp1cnZtVkZyTHdaVGgvRDNpWll5SVV0ZUljZ2NKLzlzbTh6L0pPRmRIbFd4dGRHUFVzYVd1MnBTNEhvckFtbgpqTm4vSTluUXd3enZ3MWUzVVFPbUhMRjVBczk4VU5hbk5TQ0xNMW1yaXZHRXJ1VHFnTDM1bU41eFZPdTUxQU5JCm4yNkFtODBJT2JDeEtLa0R0ZXJSaFhHd3g5c1pONVJCbG9VRThZNGJBb0dBQ3ZsdVhMZWRxcng5VkE0bDNoNXUKVDJXRVUxYjgxZ1orcmtRc1I1S0lNWEw4cllBTElUNUpHKzFuendyN3BkaEFXZmFWdVV2SDRhamdYT0h6MUs5aQpFODNSVTNGMG9ldUg0V01PY1RwU0prWm0xZUlXcWRiaEVCb1FGdUlWTXRib1BsV0d4ZUhFRHJoOEtreGp4aThSCmdEcUQyajRwY1IzQ0g5QjJ5a0lqQjVFQ2dZRUExc0xXLys2enE1c1lNSm14K1JXZThhTXJmL3pjQnVTSU1LQWgKY0dNK0wwMG9RSHdDaUU4TVNqcVN1ajV3R214YUFuanhMb3ZwSFlRV1VmUEVaUW95UE1YQ2VhRVBLOU4xbk8xMwp0V2lHRytIZkIxaU5PazFCc0lhNFNDbndOM1FRVTFzeXBaeEgxT3hueS9LYmkvYmEvWEZ5VzNqMGFUK2YvVWxrCmJGV1ZVdWtDZ1lFQTBaMmRTTFlmTjV5eFNtYk5xMWVqZXdWd1BjRzQxR2hQclNUZEJxdHFac1doWGE3aDdLTWEKeHdvamh5SXpnTXNyK2tXODdlajhDQ2h0d21sQ1p5QU92QmdOZytncnJ1cEZLM3FOSkpKeU9YREdHckdpbzZmTQp5aXB3Q2tZVGVxRThpZ1J6UkI5QkdFUGY4eVpjMUtwdmZhUDVhM0lRZmxiV0czbGpUemNNZVZjPQotLS0tLUVORCBSU0EgUFJJVkFURSBLRVktLS0tLQo=

View File

@ -62,6 +62,7 @@ func NewKubeApplierExecutor(cfg ifc.ExecutorConfig) (ifc.Executor, error) {
if err != nil {
return nil, err
}
return &KubeApplierExecutor{
ExecutorBundle: bundle,
BundleName: cfg.PhaseName,
@ -128,7 +129,7 @@ func (e *KubeApplierExecutor) prepareApplier(ch chan events.Event) (*k8sapplier.
e.cleanup = cleanup
log.Printf("Using kubeconfig at '%s' and context '%s'", path, context)
factory := utils.FactoryFromKubeConfig(path, context)
return k8sapplier.NewApplier(ch, factory), bundle, nil
return k8sapplier.NewApplier(ch, factory, e.apiObject.Config.WaitOptions.Conditions), bundle, nil
}
// Validate document set