Move kubernetes-apply phase executor to a separate package

Having an executor within applier package creates
potential import cycling. This patch moves it to a separate
package which can be used to conveniently store all the executors
at one place.

Change-Id: I68d909489b691e4fb7129446ef9a3fb085f8683c
Signed-off-by: Ruslan Aliev <raliev@mirantis.com>
Relates-To: #432
This commit is contained in:
Ruslan Aliev
2020-12-10 09:27:55 -06:00
parent 36303006af
commit b85f8fa8f9
3 changed files with 32 additions and 38 deletions

View File

@@ -1,164 +0,0 @@
/*
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package applier
import (
"io"
"time"
"k8s.io/apimachinery/pkg/runtime/schema"
"sigs.k8s.io/cli-utils/pkg/common"
airshipv1 "opendev.org/airship/airshipctl/pkg/api/v1alpha1"
"opendev.org/airship/airshipctl/pkg/cluster/clustermap"
"opendev.org/airship/airshipctl/pkg/document"
"opendev.org/airship/airshipctl/pkg/errors"
"opendev.org/airship/airshipctl/pkg/events"
"opendev.org/airship/airshipctl/pkg/k8s/kubeconfig"
"opendev.org/airship/airshipctl/pkg/k8s/utils"
"opendev.org/airship/airshipctl/pkg/log"
"opendev.org/airship/airshipctl/pkg/phase/ifc"
)
// ExecutorOptions provide a way to configure executor
type ExecutorOptions struct {
BundleName string
ClusterName string
ExecutorDocument document.Document
BundleFactory document.BundleFactoryFunc
Kubeconfig kubeconfig.Interface
Helper ifc.Helper
ClusterMap clustermap.ClusterMap
}
var _ ifc.Executor = &Executor{}
// RegisterExecutor adds executor to phase executor registry
func RegisterExecutor(registry map[schema.GroupVersionKind]ifc.ExecutorFactory) error {
obj := &airshipv1.KubernetesApply{}
gvks, _, err := airshipv1.Scheme.ObjectKinds(obj)
if err != nil {
return err
}
registry[gvks[0]] = registerExecutor
return nil
}
// registerExecutor is here so that executor in theory can be used outside phases
func registerExecutor(cfg ifc.ExecutorConfig) (ifc.Executor, error) {
return NewExecutor(ExecutorOptions{
ClusterName: cfg.ClusterName,
BundleName: cfg.PhaseName,
Helper: cfg.Helper,
ExecutorDocument: cfg.ExecutorDocument,
BundleFactory: cfg.BundleFactory,
Kubeconfig: cfg.KubeConfig,
ClusterMap: cfg.ClusterMap,
})
}
// Executor applies resources to kubernetes
type Executor struct {
Options ExecutorOptions
ExecutorBundle document.Bundle
apiObject *airshipv1.KubernetesApply
cleanup kubeconfig.Cleanup
}
// NewExecutor returns instance of executor
func NewExecutor(opts ExecutorOptions) (*Executor, error) {
apiObj := &airshipv1.KubernetesApply{}
err := opts.ExecutorDocument.ToAPIObject(apiObj, airshipv1.Scheme)
if err != nil {
return nil, err
}
bundle, err := opts.BundleFactory()
if err != nil {
return nil, err
}
return &Executor{
ExecutorBundle: bundle,
Options: opts,
apiObject: apiObj,
}, nil
}
// Run executor, should be performed in separate go routine
func (e *Executor) Run(ch chan events.Event, runOpts ifc.RunOptions) {
applier, filteredBundle, err := e.prepareApplier(ch)
if err != nil {
handleError(ch, err)
close(ch)
return
}
defer e.cleanup()
dryRunStrategy := common.DryRunNone
if runOpts.DryRun {
dryRunStrategy = common.DryRunClient
}
timeout := time.Second * time.Duration(e.apiObject.Config.WaitOptions.Timeout)
if int64(runOpts.Timeout/time.Second) != 0 {
timeout = runOpts.Timeout
}
log.Debugf("WaitTimeout: %v", timeout)
applyOptions := ApplyOptions{
DryRunStrategy: dryRunStrategy,
Prune: e.apiObject.Config.PruneOptions.Prune,
BundleName: e.Options.BundleName,
WaitTimeout: timeout,
}
applier.ApplyBundle(filteredBundle, applyOptions)
}
func (e *Executor) prepareApplier(ch chan events.Event) (*Applier, document.Bundle, error) {
log.Debug("Getting kubeconfig context name from cluster map")
context, err := e.Options.ClusterMap.ClusterKubeconfigContext(e.Options.ClusterName)
if err != nil {
return nil, nil, err
}
log.Debug("Getting kubeconfig file information from kubeconfig provider")
path, cleanup, err := e.Options.Kubeconfig.GetFile()
if err != nil {
return nil, nil, err
}
log.Debug("Filtering out documents that shouldn't be applied to kubernetes from document bundle")
bundle, err := e.ExecutorBundle.SelectBundle(document.NewDeployToK8sSelector())
if err != nil {
cleanup()
return nil, nil, err
}
// set up cleanup only if all calls up to here were successful
e.cleanup = cleanup
log.Debugf("Using kubeconfig at '%s' and context '%s'", path, context)
factory := utils.FactoryFromKubeConfig(path, context)
return NewApplier(ch, factory), bundle, nil
}
// Validate document set
func (e *Executor) Validate() error {
return errors.ErrNotImplemented{}
}
// Render document set
func (e *Executor) Render(w io.Writer, o ifc.RenderOptions) error {
bundle, err := e.ExecutorBundle.SelectBundle(o.FilterSelector)
if err != nil {
return err
}
return bundle.Write(w)
}

View File

@@ -1,300 +0,0 @@
/*
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package applier_test
import (
"bytes"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"opendev.org/airship/airshipctl/pkg/api/v1alpha1"
"opendev.org/airship/airshipctl/pkg/cluster/clustermap"
"opendev.org/airship/airshipctl/pkg/config"
"opendev.org/airship/airshipctl/pkg/document"
"opendev.org/airship/airshipctl/pkg/events"
"opendev.org/airship/airshipctl/pkg/fs"
"opendev.org/airship/airshipctl/pkg/k8s/applier"
"opendev.org/airship/airshipctl/pkg/k8s/kubeconfig"
"opendev.org/airship/airshipctl/pkg/k8s/utils"
"opendev.org/airship/airshipctl/pkg/phase"
"opendev.org/airship/airshipctl/pkg/phase/ifc"
testfs "opendev.org/airship/airshipctl/testutil/fs"
)
const (
ValidExecutorDoc = `apiVersion: airshipit.org/v1alpha1
kind: KubernetesApply
metadata:
labels:
airshipit.org/deploy-k8s: "false"
name: kubernetes-apply
config:
waitOptions:
timeout: 600
pruneOptions:
prune: false
`
ValidExecutorDocNamespaced = `apiVersion: airshipit.org/v1alpha1
kind: KubernetesApply
metadata:
labels:
airshipit.org/deploy-k8s: "false"
name: kubernetes-apply-namespaced
namespace: bundle
config:
waitOptions:
timeout: 600
pruneOptions:
prune: false
`
testValidKubeconfig = `apiVersion: v1
clusters:
- cluster:
certificate-authority-data: ca-data
server: https://10.0.1.7:6443
name: kubernetes_target
contexts:
- context:
cluster: kubernetes_target
user: kubernetes-admin
name: kubernetes-admin@kubernetes
current-context: ""
kind: Config
preferences: {}
users:
- name: kubernetes-admin
user:
client-certificate-data: cert-data
client-key-data: client-keydata
`
)
func TestNewExecutor(t *testing.T) {
tests := []struct {
name string
cfgDoc string
expectedErr string
helper ifc.Helper
kubeconf kubeconfig.Interface
bundleFactory document.BundleFactoryFunc
}{
{
name: "valid executor",
cfgDoc: ValidExecutorDoc,
kubeconf: testKubeconfig(testValidKubeconfig),
helper: makeDefaultHelper(t),
bundleFactory: testBundleFactory("testdata/source_bundle"),
},
{
name: "wrong config document",
cfgDoc: `apiVersion: v1
kind: ConfigMap
metadata:
name: first-map
namespace: default
labels:
cli-utils.sigs.k8s.io/inventory-id: "some id"`,
expectedErr: "wrong config document",
helper: makeDefaultHelper(t),
bundleFactory: testBundleFactory("testdata/source_bundle"),
},
{
name: "path to bundle does not exist",
cfgDoc: ValidExecutorDoc,
expectedErr: "no such file or directory",
kubeconf: testKubeconfig(testValidKubeconfig),
helper: makeDefaultHelper(t),
bundleFactory: testBundleFactory("does not exist"),
},
}
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
doc, err := document.NewDocumentFromBytes([]byte(tt.cfgDoc))
require.NoError(t, err)
require.NotNil(t, doc)
exec, err := applier.NewExecutor(
applier.ExecutorOptions{
ExecutorDocument: doc,
BundleFactory: tt.bundleFactory,
Kubeconfig: tt.kubeconf,
Helper: tt.helper,
})
if tt.expectedErr != "" {
require.Error(t, err)
assert.Contains(t, err.Error(), "")
assert.Nil(t, exec)
} else {
require.NoError(t, err)
require.NotNil(t, exec)
}
})
}
}
// TODO We need valid test that checks that actual bundle has arrived to applier
// for that we need a way to inject fake applier, which is not doable with `black box` test currently
// since we tests are in different package from executor
func TestExecutorRun(t *testing.T) {
tests := []struct {
name string
containsErr string
clusterName string
kubeconf kubeconfig.Interface
execDoc document.Document
bundleFactory document.BundleFactoryFunc
helper ifc.Helper
clusterMap clustermap.ClusterMap
}{
{
name: "cant read kubeconfig error",
containsErr: "no such file or directory",
helper: makeDefaultHelper(t),
bundleFactory: testBundleFactory("testdata/source_bundle"),
kubeconf: testKubeconfig(`invalid kubeconfig`),
execDoc: toKubernetesApply(t, ValidExecutorDocNamespaced),
clusterName: "ephemeral-cluster",
clusterMap: clustermap.NewClusterMap(&v1alpha1.ClusterMap{
Map: map[string]*v1alpha1.Cluster{
"ephemeral-cluster": {},
},
}),
},
{
name: "error cluster not defined",
containsErr: "cluster is not defined in in cluster map",
helper: makeDefaultHelper(t),
bundleFactory: testBundleFactory("testdata/source_bundle"),
kubeconf: testKubeconfig(testValidKubeconfig),
execDoc: toKubernetesApply(t, ValidExecutorDocNamespaced),
clusterMap: clustermap.NewClusterMap(v1alpha1.DefaultClusterMap()),
},
}
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
exec, err := applier.NewExecutor(
applier.ExecutorOptions{
ExecutorDocument: tt.execDoc,
Helper: tt.helper,
BundleFactory: tt.bundleFactory,
Kubeconfig: tt.kubeconf,
ClusterMap: tt.clusterMap,
ClusterName: tt.clusterName,
})
if tt.name == "Nil bundle provided" {
require.Error(t, err)
assert.Contains(t, err.Error(), tt.containsErr)
} else {
require.NoError(t, err)
require.NotNil(t, exec)
ch := make(chan events.Event)
go exec.Run(ch, ifc.RunOptions{})
processor := events.NewDefaultProcessor(utils.Streams())
err = processor.Process(ch)
if tt.containsErr != "" {
require.Error(t, err)
assert.Contains(t, err.Error(), tt.containsErr)
} else {
assert.NoError(t, err)
}
}
})
}
}
func TestRender(t *testing.T) {
execDoc, err := document.NewDocumentFromBytes([]byte(ValidExecutorDoc))
require.NoError(t, err)
require.NotNil(t, execDoc)
exec, err := applier.NewExecutor(applier.ExecutorOptions{
BundleFactory: testBundleFactory("testdata/source_bundle"),
ExecutorDocument: execDoc,
})
require.NoError(t, err)
require.NotNil(t, exec)
writerReader := bytes.NewBuffer([]byte{})
err = exec.Render(writerReader, ifc.RenderOptions{})
require.NoError(t, err)
result := writerReader.String()
assert.Contains(t, result, "ReplicationController")
}
func makeDefaultHelper(t *testing.T) ifc.Helper {
t.Helper()
conf := &config.Config{
CurrentContext: "default",
Contexts: map[string]*config.Context{
"default": {
Manifest: "default-manifest",
},
},
Manifests: map[string]*config.Manifest{
"default-manifest": {
MetadataPath: "metadata.yaml",
TargetPath: "testdata",
PhaseRepositoryName: config.DefaultTestPhaseRepo,
Repositories: map[string]*config.Repository{
config.DefaultTestPhaseRepo: {
URLString: "",
},
},
},
},
}
helper, err := phase.NewHelper(conf)
require.NoError(t, err)
require.NotNil(t, helper)
return helper
}
// toKubernetesApply converts string to document object
func toKubernetesApply(t *testing.T, s string) document.Document {
doc, err := document.NewDocumentFromBytes([]byte(s))
require.NoError(t, err)
require.NotNil(t, doc)
return doc
}
func testKubeconfig(stringData string) kubeconfig.Interface {
return kubeconfig.NewKubeConfig(
kubeconfig.FromByte([]byte(stringData)),
kubeconfig.InjectFileSystem(
testfs.MockFileSystem{
MockTempFile: func(root, pattern string) (fs.File, error) {
return testfs.TestFile{
MockName: func() string { return "kubeconfig-142398" },
MockWrite: func() (int, error) { return 0, nil },
MockClose: func() error { return nil },
}, nil
},
MockRemoveAll: func() error { return nil },
},
))
}
func testBundleFactory(path string) document.BundleFactoryFunc {
return func() (document.Bundle, error) {
return document.NewBundleByPath(path)
}
}