From 2f1b520ab29efae4a4886c069bb405c44bc9f1dc Mon Sep 17 00:00:00 2001 From: Kostiantyn Kalynovskyi Date: Mon, 17 Aug 2020 23:34:35 -0500 Subject: [PATCH] Change executor interface to accept channels Now Run method will accept channels that they will write events to, this will allow us to pass channels we want to them, instead of execting them to return us a channel. Wait method is removed from interface for now, as it needs more design. Change-Id: Ibd47dfe49172f537b79bcc8f83f7c76aece8e862 --- cmd/phase/run.go | 1 - pkg/phase/ifc/executor.go | 26 ++++++++-------- pkg/phase/phase.go | 62 +++++++++++++++++++++++++++++++-------- 3 files changed, 62 insertions(+), 27 deletions(-) diff --git a/cmd/phase/run.go b/cmd/phase/run.go index bac2b5f5d..01f20793c 100644 --- a/cmd/phase/run.go +++ b/cmd/phase/run.go @@ -58,6 +58,5 @@ func NewRunCommand(rootSettings *environment.AirshipCTLSettings) *cobra.Command "dry-run", false, "simulate phase execution") - // TODO add kubeconfig flags when https://review.opendev.org/#/c/744382 is merged return runCmd } diff --git a/pkg/phase/ifc/executor.go b/pkg/phase/ifc/executor.go index 3b946b11e..01bb99e31 100644 --- a/pkg/phase/ifc/executor.go +++ b/pkg/phase/ifc/executor.go @@ -26,10 +26,9 @@ import ( // Executor interface should be implemented by each runner type Executor interface { - Run(RunOptions) <-chan events.Event + Run(<-chan events.Event, RunOptions) Render(io.Writer, RenderOptions) error Validate() error - Wait(WaitOptions) <-chan events.Event } // RunOptions holds options for run method @@ -49,14 +48,15 @@ type WaitOptions struct { } // ExecutorFactory for executor instantiation -// First argument is document object which represents executor -// configuration. -// Second argument is document bundle used by executor. -// Third argument airship configuration settings since each phase -// has to be aware of execution context and global settings -type ExecutorFactory func( - document.Document, - document.Bundle, - *environment.AirshipCTLSettings, - kubeconfig.Interface, -) (Executor, error) +type ExecutorFactory func(config ExecutorConfig) (Executor, error) + +// ExecutorConfig container to store all executor options +type ExecutorConfig struct { + PhaseName string + ClusterName string + + ExecutorDocument document.Document + ExecutorBundle document.Bundle + AirshipSettings *environment.AirshipCTLSettings + KubeConfig kubeconfig.Interface +} diff --git a/pkg/phase/phase.go b/pkg/phase/phase.go index edf83bf4c..3659d26d9 100644 --- a/pkg/phase/phase.go +++ b/pkg/phase/phase.go @@ -21,11 +21,15 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" airshipv1 "opendev.org/airship/airshipctl/pkg/api/v1alpha1" + "opendev.org/airship/airshipctl/pkg/config" "opendev.org/airship/airshipctl/pkg/document" "opendev.org/airship/airshipctl/pkg/environment" "opendev.org/airship/airshipctl/pkg/events" + "opendev.org/airship/airshipctl/pkg/k8s/kubeconfig" + k8sutils "opendev.org/airship/airshipctl/pkg/k8s/utils" "opendev.org/airship/airshipctl/pkg/log" "opendev.org/airship/airshipctl/pkg/phase/ifc" + "opendev.org/airship/airshipctl/pkg/util" ) // ExecutorRegistry returns map with executor factories @@ -109,7 +113,7 @@ func (p *Cmd) GetExecutor(phase *airshipv1.Phase) (ifc.Executor, error) { ByGvk(refGVK.Group, refGVK.Version, refGVK.Kind). ByName(phaseConfig.ExecutorRef.Name). ByNamespace(phaseConfig.ExecutorRef.Namespace) - doc, err := bundle.SelectOne(selector) + executorDoc, err := bundle.SelectOne(selector) if err != nil { return nil, err } @@ -131,21 +135,43 @@ func (p *Cmd) GetExecutor(phase *airshipv1.Phase) (ifc.Executor, error) { if !found { return nil, ErrExecutorNotFound{GVK: refGVK} } - // When https://review.opendev.org/#/c/744382 add provider from there. - return executorFactory(doc, executorDocBundle, p.AirshipCTLSettings, nil) + + kubeConfPath := p.AirshipCTLSettings.Config.KubeConfigPath() + homeDir := util.UserHomeDir() + workDir := filepath.Join(homeDir, config.AirshipConfigDir) + fs := document.NewDocumentFs() + source := kubeconfig.FromFile(kubeConfPath, fs) + fileOption := kubeconfig.InjectFilePath(kubeConfPath, fs) + tempRootOption := kubeconfig.InjectTempRoot(workDir) + kubeConfig := kubeconfig.NewKubeConfig(source, fileOption, tempRootOption) + + // TODO add function to decide on how to build kubeconfig instead of hardcoding it here, + // when more kubeconfigs sources are available. + return executorFactory(ifc.ExecutorConfig{ + ExecutorBundle: executorDocBundle, + PhaseName: phase.Name, + ExecutorDocument: executorDoc, + AirshipSettings: p.AirshipCTLSettings, + KubeConfig: kubeConfig, + }) } -// Exec particular phase +// Exec starts executor goroutine and processes the events func (p *Cmd) Exec(name string) error { - executor, err := p.getPhaseExecutor(name) - if err != nil { - return err - } - ch := executor.Run(ifc.RunOptions{ - Debug: p.AirshipCTLSettings.Debug, - DryRun: p.DryRun, - }) - return p.Processor.Process(ch) + runCh := make(chan events.Event) + processor := events.NewDefaultProcessor(k8sutils.Streams()) + go func() { + executor, err := p.getPhaseExecutor(name) + if err != nil { + handleError(err, runCh) + return + } + executor.Run(runCh, ifc.RunOptions{ + Debug: p.Debug, + DryRun: p.DryRun, + }) + }() + return processor.Process(runCh) } // Plan shows available phase names @@ -178,3 +204,13 @@ func (p *Cmd) Plan() (map[string][]string, error) { } return result, nil } + +func handleError(err error, ch chan events.Event) { + ch <- events.Event{ + Type: events.ErrorType, + ErrorEvent: events.ErrorEvent{ + Error: err, + }, + } + close(ch) +}