diff --git a/cmd/phase/run.go b/cmd/phase/run.go index bac2b5f5d..01f20793c 100644 --- a/cmd/phase/run.go +++ b/cmd/phase/run.go @@ -58,6 +58,5 @@ func NewRunCommand(rootSettings *environment.AirshipCTLSettings) *cobra.Command "dry-run", false, "simulate phase execution") - // TODO add kubeconfig flags when https://review.opendev.org/#/c/744382 is merged return runCmd } diff --git a/pkg/phase/ifc/executor.go b/pkg/phase/ifc/executor.go index 3b946b11e..01bb99e31 100644 --- a/pkg/phase/ifc/executor.go +++ b/pkg/phase/ifc/executor.go @@ -26,10 +26,9 @@ import ( // Executor interface should be implemented by each runner type Executor interface { - Run(RunOptions) <-chan events.Event + Run(<-chan events.Event, RunOptions) Render(io.Writer, RenderOptions) error Validate() error - Wait(WaitOptions) <-chan events.Event } // RunOptions holds options for run method @@ -49,14 +48,15 @@ type WaitOptions struct { } // ExecutorFactory for executor instantiation -// First argument is document object which represents executor -// configuration. -// Second argument is document bundle used by executor. -// Third argument airship configuration settings since each phase -// has to be aware of execution context and global settings -type ExecutorFactory func( - document.Document, - document.Bundle, - *environment.AirshipCTLSettings, - kubeconfig.Interface, -) (Executor, error) +type ExecutorFactory func(config ExecutorConfig) (Executor, error) + +// ExecutorConfig container to store all executor options +type ExecutorConfig struct { + PhaseName string + ClusterName string + + ExecutorDocument document.Document + ExecutorBundle document.Bundle + AirshipSettings *environment.AirshipCTLSettings + KubeConfig kubeconfig.Interface +} diff --git a/pkg/phase/phase.go b/pkg/phase/phase.go index edf83bf4c..3659d26d9 100644 --- a/pkg/phase/phase.go +++ b/pkg/phase/phase.go @@ -21,11 +21,15 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" airshipv1 "opendev.org/airship/airshipctl/pkg/api/v1alpha1" + "opendev.org/airship/airshipctl/pkg/config" "opendev.org/airship/airshipctl/pkg/document" "opendev.org/airship/airshipctl/pkg/environment" "opendev.org/airship/airshipctl/pkg/events" + "opendev.org/airship/airshipctl/pkg/k8s/kubeconfig" + k8sutils "opendev.org/airship/airshipctl/pkg/k8s/utils" "opendev.org/airship/airshipctl/pkg/log" "opendev.org/airship/airshipctl/pkg/phase/ifc" + "opendev.org/airship/airshipctl/pkg/util" ) // ExecutorRegistry returns map with executor factories @@ -109,7 +113,7 @@ func (p *Cmd) GetExecutor(phase *airshipv1.Phase) (ifc.Executor, error) { ByGvk(refGVK.Group, refGVK.Version, refGVK.Kind). ByName(phaseConfig.ExecutorRef.Name). ByNamespace(phaseConfig.ExecutorRef.Namespace) - doc, err := bundle.SelectOne(selector) + executorDoc, err := bundle.SelectOne(selector) if err != nil { return nil, err } @@ -131,21 +135,43 @@ func (p *Cmd) GetExecutor(phase *airshipv1.Phase) (ifc.Executor, error) { if !found { return nil, ErrExecutorNotFound{GVK: refGVK} } - // When https://review.opendev.org/#/c/744382 add provider from there. - return executorFactory(doc, executorDocBundle, p.AirshipCTLSettings, nil) + + kubeConfPath := p.AirshipCTLSettings.Config.KubeConfigPath() + homeDir := util.UserHomeDir() + workDir := filepath.Join(homeDir, config.AirshipConfigDir) + fs := document.NewDocumentFs() + source := kubeconfig.FromFile(kubeConfPath, fs) + fileOption := kubeconfig.InjectFilePath(kubeConfPath, fs) + tempRootOption := kubeconfig.InjectTempRoot(workDir) + kubeConfig := kubeconfig.NewKubeConfig(source, fileOption, tempRootOption) + + // TODO add function to decide on how to build kubeconfig instead of hardcoding it here, + // when more kubeconfigs sources are available. + return executorFactory(ifc.ExecutorConfig{ + ExecutorBundle: executorDocBundle, + PhaseName: phase.Name, + ExecutorDocument: executorDoc, + AirshipSettings: p.AirshipCTLSettings, + KubeConfig: kubeConfig, + }) } -// Exec particular phase +// Exec starts executor goroutine and processes the events func (p *Cmd) Exec(name string) error { - executor, err := p.getPhaseExecutor(name) - if err != nil { - return err - } - ch := executor.Run(ifc.RunOptions{ - Debug: p.AirshipCTLSettings.Debug, - DryRun: p.DryRun, - }) - return p.Processor.Process(ch) + runCh := make(chan events.Event) + processor := events.NewDefaultProcessor(k8sutils.Streams()) + go func() { + executor, err := p.getPhaseExecutor(name) + if err != nil { + handleError(err, runCh) + return + } + executor.Run(runCh, ifc.RunOptions{ + Debug: p.Debug, + DryRun: p.DryRun, + }) + }() + return processor.Process(runCh) } // Plan shows available phase names @@ -178,3 +204,13 @@ func (p *Cmd) Plan() (map[string][]string, error) { } return result, nil } + +func handleError(err error, ch chan events.Event) { + ch <- events.Event{ + Type: events.ErrorType, + ErrorEvent: events.ErrorEvent{ + Error: err, + }, + } + close(ch) +}