Modify executor interface to return channels

Change executors interface to return event channels and allow to
to set event processors. In further development this will allow better
visibility in UI or in command line, also opens the room for concurency.

Also add kubeconfig provider interface that will return kubeconfig.File
object to provide uniform way of accessing kubeconfig

Change-Id: I47bf9409a9b4286905cd9fc4ce172ee33e16dfa6
This commit is contained in:
Kostiantyn Kalynovskyi 2020-08-03 23:14:42 -05:00
parent 7c6f727783
commit 696b287552
5 changed files with 97 additions and 23 deletions

View File

@ -29,6 +29,8 @@ const (
ErrorType
// StatusPollerType event produced by status poller
StatusPollerType
// WaitType is event emitted when airshipctl is waiting for something
WaitType
)
// Event holds all possible events that can be produced by airship

View File

@ -53,6 +53,8 @@ func (p *DefaultProcessor) Process(ch <-chan Event) error {
p.errors = append(p.errors, e.ErrorEvent.Error)
case StatusPollerType:
log.Fatalf("Processing for status poller events are not yet implemented")
case WaitType:
log.Fatalf("Processing for wait events are not yet implemented")
default:
log.Fatalf("Unknown event type received: %d", e.Type)
}

View File

@ -0,0 +1,31 @@
/*
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kubeconfig
// File determines where kubeconfig is located on file system and which context to use
type File struct {
Path string
Context string
}
// Provider interface allows to get kubeconfig file path and context based on cluster type
type Provider interface {
// If clusterType is an empty string it means that caller is not aware then default cluster type will be used
// default cluster type maybe different for different provider implementations, for example if we are providing
// kubeconfig file for a phase then phase may be bound to ephemeral or target cluster type then defaults will be
// ephemeral or target respectively.
Get(clusterType string) (File, error)
Cleanup() error
}

View File

@ -16,17 +16,36 @@ package ifc
import (
"io"
"time"
"opendev.org/airship/airshipctl/pkg/document"
"opendev.org/airship/airshipctl/pkg/environment"
"opendev.org/airship/airshipctl/pkg/events"
"opendev.org/airship/airshipctl/pkg/k8s/kubeconfig"
)
// Executor interface should be implemented by each runner
type Executor interface {
Run(dryrun, debug, wait bool) error
Render(io.Writer) error
Run(RunOptions) <-chan events.Event
Render(io.Writer, RenderOptions) error
Validate() error
Wait() error
Wait(WaitOptions) <-chan events.Event
}
// RunOptions holds options for run method
type RunOptions struct {
Debug bool
DryRun bool
Timeout time.Duration
}
// RenderOptions is empty for now, but may hold things like format in future
type RenderOptions struct{}
// WaitOptions holds only timeout now, but may be extended in the future
type WaitOptions struct {
Timeout time.Duration
}
// ExecutorFactory for executor instantiation
@ -39,4 +58,5 @@ type ExecutorFactory func(
document.Document,
document.Bundle,
*environment.AirshipCTLSettings,
kubeconfig.Provider,
) (Executor, error)

View File

@ -15,7 +15,6 @@
package phase
import (
"fmt"
"path/filepath"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -24,31 +23,50 @@ import (
airshipv1 "opendev.org/airship/airshipctl/pkg/api/v1alpha1"
"opendev.org/airship/airshipctl/pkg/document"
"opendev.org/airship/airshipctl/pkg/environment"
"opendev.org/airship/airshipctl/pkg/events"
"opendev.org/airship/airshipctl/pkg/log"
"opendev.org/airship/airshipctl/pkg/phase/ifc"
)
var (
// ExecutorRegistry contins registered runner factories
ExecutorRegistry = make(map[schema.GroupVersionKind]ifc.ExecutorFactory)
)
// ExecutorRegistry returns map with executor factories
type ExecutorRegistry func() map[schema.GroupVersionKind]ifc.ExecutorFactory
// DefaultExecutorRegistry returns map with executor factories
func DefaultExecutorRegistry() map[schema.GroupVersionKind]ifc.ExecutorFactory {
execMap := make(map[schema.GroupVersionKind]ifc.ExecutorFactory)
// add executors here
return execMap
}
// Cmd object to work with phase api
type Cmd struct {
*environment.AirshipCTLSettings
DryRun bool
Registry ExecutorRegistry
// Will be used to get processor based on executor action
Processor events.EventProcessor
*environment.AirshipCTLSettings
}
func (p *Cmd) getBundle() (document.Bundle, error) {
ccm, err := p.Config.CurrentContextManifest()
tp, err := p.AirshipCTLSettings.Config.CurrentContextTargetPath()
if err != nil {
return nil, err
}
fmt.Printf("Target path is: %s", filepath.Join(ccm.TargetPath))
meta, err := p.Config.CurrentContextManifestMetadata()
if err != nil {
return nil, err
}
return document.NewBundleByPath(filepath.Join(ccm.TargetPath, meta.PhaseMeta.Path))
log.Debugf("Building phase bundle from path %s", tp)
return document.NewBundleByPath(filepath.Join(tp, meta.PhaseMeta.Path))
}
func (p *Cmd) getPhaseExecutor(name string) (ifc.Executor, error) {
phaseConfig, err := p.GetPhase(name)
if err != nil {
return nil, err
}
return p.GetExecutor(phaseConfig)
}
// GetPhase returns particular phase object identified by name
@ -105,28 +123,29 @@ func (p *Cmd) GetExecutor(phase *airshipv1.Phase) (ifc.Executor, error) {
if err != nil {
return nil, err
}
if p.Registry == nil {
p.Registry = DefaultExecutorRegistry
}
// Look for executor factory defined in registry
executorFactory, found := ExecutorRegistry[refGVK]
executorFactory, found := p.Registry()[refGVK]
if !found {
return nil, ErrExecutorNotFound{GVK: refGVK}
}
return executorFactory(doc, executorDocBundle, p.AirshipCTLSettings)
// When https://review.opendev.org/#/c/744382 add provider from there.
return executorFactory(doc, executorDocBundle, p.AirshipCTLSettings, nil)
}
// Exec particular phase
func (p *Cmd) Exec(name string) error {
phaseConfig, err := p.GetPhase(name)
executor, err := p.getPhaseExecutor(name)
if err != nil {
return err
}
executor, err := p.GetExecutor(phaseConfig)
if err != nil {
return err
}
return executor.Run(p.DryRun, p.Debug, true)
ch := executor.Run(ifc.RunOptions{
Debug: p.AirshipCTLSettings.Debug,
DryRun: p.DryRun,
})
return p.Processor.Process(ch)
}
// Plan shows available phase names