Merge "Remove events module"
This commit is contained in:
commit
d718d08c20
@ -1,29 +0,0 @@
|
||||
/*
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
https://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package events
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
)
|
||||
|
||||
// ErrEventReceived returned for not implemented features
|
||||
type ErrEventReceived struct {
|
||||
Errors []error
|
||||
}
|
||||
|
||||
func (e ErrEventReceived) Error() string {
|
||||
// TODO make printing more readable here
|
||||
return fmt.Sprintf("Error events received on channel, errors are:\n%v", e.Errors)
|
||||
}
|
@ -1,263 +0,0 @@
|
||||
/*
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
https://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package events
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Type indicates type of the event
|
||||
type Type int
|
||||
|
||||
const (
|
||||
// ApplierType is event that are produced by applier
|
||||
ApplierType Type = iota
|
||||
// ErrorType means that even is of error type
|
||||
ErrorType
|
||||
// StatusPollerType event produced by status poller
|
||||
StatusPollerType
|
||||
// WaitType is event emitted when airshipctl is waiting for something
|
||||
WaitType
|
||||
// ClusterctlType event emitted by Clusterctl executor
|
||||
ClusterctlType
|
||||
// BootstrapType event emitted by Bootstrap executor
|
||||
BootstrapType
|
||||
// GenericContainerType event emitted by GenericContainer
|
||||
GenericContainerType
|
||||
// BaremetalManagerEventType event emitted by BaremetalManager
|
||||
BaremetalManagerEventType
|
||||
)
|
||||
|
||||
// Event holds all possible events that can be produced by airship
|
||||
type Event struct {
|
||||
Type Type
|
||||
Timestamp time.Time
|
||||
ApplierEvent ApplierEvent
|
||||
ErrorEvent ErrorEvent
|
||||
ClusterctlEvent ClusterctlEvent
|
||||
BootstrapEvent BootstrapEvent
|
||||
GenericContainerEvent GenericContainerEvent
|
||||
BaremetalManagerEvent BaremetalManagerEvent
|
||||
}
|
||||
|
||||
// ApplierEvent is produced by k8s_apply executor
|
||||
type ApplierEvent struct {
|
||||
Operation string
|
||||
Message string
|
||||
}
|
||||
|
||||
//GenericEvent generalized type for custom events
|
||||
type GenericEvent struct {
|
||||
Type string
|
||||
Operation string
|
||||
Message string
|
||||
Timestamp time.Time
|
||||
}
|
||||
|
||||
var mapTypeToEvent = map[Type]string{
|
||||
ClusterctlType: "ClusterctlEvent",
|
||||
BootstrapType: "BootstrapEvent",
|
||||
GenericContainerType: "GenericContainerEvent",
|
||||
}
|
||||
|
||||
var unknownEventType = map[Type]string{
|
||||
ApplierType: "ApplierType",
|
||||
ErrorType: "ErrorType",
|
||||
StatusPollerType: "StatusPollerType",
|
||||
WaitType: "WaitType",
|
||||
}
|
||||
|
||||
var clusterctlOperationToString = map[ClusterctlOperation]string{
|
||||
ClusterctlInitStart: "ClusterctlInitStart",
|
||||
ClusterctlInitEnd: "ClusterctlInitEnd",
|
||||
ClusterctlMoveStart: "ClusterctlMoveStart",
|
||||
ClusterctlMoveEnd: "ClusterctlMoveEnd",
|
||||
}
|
||||
|
||||
var bootstrapOperationToString = map[BootstrapOperation]string{
|
||||
BootstrapStart: "BootstrapStart",
|
||||
BootstrapDryRun: "BootstrapDryRun",
|
||||
BootstrapValidation: "BootstrapValidation",
|
||||
BootstrapRun: "BootstrapRun",
|
||||
BootstrapEnd: "BootstrapEnd",
|
||||
}
|
||||
|
||||
var genericContainerOperationToString = map[GenericContainerOperation]string{
|
||||
GenericContainerStart: "GenericContainerStart",
|
||||
GenericContainerStop: "GenericContainerStop",
|
||||
}
|
||||
|
||||
var baremetalInventoryOperationToString = map[BaremetalManagerStep]string{
|
||||
BaremetalManagerStart: "BaremetalOperationStart",
|
||||
BaremetalManagerComplete: "BaremetalOperationComplete",
|
||||
}
|
||||
|
||||
//Normalize cast Event to GenericEvent type
|
||||
func Normalize(e Event) GenericEvent {
|
||||
var eventType string
|
||||
if t, exists := mapTypeToEvent[e.Type]; exists {
|
||||
eventType = t
|
||||
} else {
|
||||
eventType = fmt.Sprintf("Unknown event type: %v", unknownEventType[e.Type])
|
||||
}
|
||||
|
||||
var operation, message string
|
||||
switch e.Type {
|
||||
case ClusterctlType:
|
||||
operation = clusterctlOperationToString[e.ClusterctlEvent.Operation]
|
||||
message = e.ClusterctlEvent.Message
|
||||
case BootstrapType:
|
||||
operation = bootstrapOperationToString[e.BootstrapEvent.Operation]
|
||||
message = e.BootstrapEvent.Message
|
||||
case GenericContainerType:
|
||||
operation = genericContainerOperationToString[e.GenericContainerEvent.Operation]
|
||||
message = e.GenericContainerEvent.Message
|
||||
case BaremetalManagerEventType:
|
||||
operation = baremetalInventoryOperationToString[e.BaremetalManagerEvent.Step]
|
||||
message = e.BaremetalManagerEvent.Message
|
||||
}
|
||||
|
||||
return GenericEvent{
|
||||
Type: eventType,
|
||||
Operation: operation,
|
||||
Message: message,
|
||||
Timestamp: e.Timestamp,
|
||||
}
|
||||
}
|
||||
|
||||
// NewEvent create new event with timestamp
|
||||
func NewEvent() Event {
|
||||
return Event{
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
}
|
||||
|
||||
// ErrorEvent is produced when error is encountered
|
||||
type ErrorEvent struct {
|
||||
Error error
|
||||
}
|
||||
|
||||
// WithErrorEvent sets type and actual error event
|
||||
func (e Event) WithErrorEvent(event ErrorEvent) Event {
|
||||
e.Type = ErrorType
|
||||
e.ErrorEvent = event
|
||||
return e
|
||||
}
|
||||
|
||||
// ClusterctlOperation type
|
||||
type ClusterctlOperation int
|
||||
|
||||
const (
|
||||
// ClusterctlInitStart operation
|
||||
ClusterctlInitStart ClusterctlOperation = iota
|
||||
// ClusterctlInitEnd operation
|
||||
ClusterctlInitEnd
|
||||
// ClusterctlMoveStart operation
|
||||
ClusterctlMoveStart
|
||||
// ClusterctlMoveEnd operation
|
||||
ClusterctlMoveEnd
|
||||
)
|
||||
|
||||
// ClusterctlEvent is produced by clusterctl executor
|
||||
type ClusterctlEvent struct {
|
||||
Operation ClusterctlOperation
|
||||
Message string
|
||||
}
|
||||
|
||||
// WithClusterctlEvent sets type and actual clusterctl event
|
||||
func (e Event) WithClusterctlEvent(concreteEvent ClusterctlEvent) Event {
|
||||
e.Type = ClusterctlType
|
||||
e.ClusterctlEvent = concreteEvent
|
||||
return e
|
||||
}
|
||||
|
||||
// BootstrapOperation type
|
||||
type BootstrapOperation int
|
||||
|
||||
const (
|
||||
// BootstrapStart operation
|
||||
BootstrapStart BootstrapOperation = iota
|
||||
// BootstrapDryRun operation
|
||||
BootstrapDryRun
|
||||
// BootstrapValidation operation
|
||||
BootstrapValidation
|
||||
// BootstrapRun operation
|
||||
BootstrapRun
|
||||
// BootstrapEnd operation
|
||||
BootstrapEnd
|
||||
)
|
||||
|
||||
// BootstrapEvent needs to to track events in bootstrap executor
|
||||
type BootstrapEvent struct {
|
||||
Operation BootstrapOperation
|
||||
Message string
|
||||
}
|
||||
|
||||
// WithBootstrapEvent sets type and actual bootstrap event
|
||||
func (e Event) WithBootstrapEvent(concreteEvent BootstrapEvent) Event {
|
||||
e.Type = BootstrapType
|
||||
e.BootstrapEvent = concreteEvent
|
||||
return e
|
||||
}
|
||||
|
||||
// GenericContainerOperation type
|
||||
type GenericContainerOperation int
|
||||
|
||||
const (
|
||||
// GenericContainerStart operation
|
||||
GenericContainerStart GenericContainerOperation = iota
|
||||
// GenericContainerStop operation
|
||||
GenericContainerStop
|
||||
)
|
||||
|
||||
// GenericContainerEvent needs to to track events in GenericContainer executor
|
||||
type GenericContainerEvent struct {
|
||||
Operation GenericContainerOperation
|
||||
Message string
|
||||
}
|
||||
|
||||
// WithGenericContainerEvent sets type and actual GenericContainer event
|
||||
func (e Event) WithGenericContainerEvent(concreteEvent GenericContainerEvent) Event {
|
||||
e.Type = GenericContainerType
|
||||
e.GenericContainerEvent = concreteEvent
|
||||
return e
|
||||
}
|
||||
|
||||
// BaremetalManagerStep indicates what operation baremetal manager is currently peforming
|
||||
// Note that this is not baremetal
|
||||
type BaremetalManagerStep int
|
||||
|
||||
const (
|
||||
// BaremetalManagerStart operation
|
||||
BaremetalManagerStart BaremetalManagerStep = iota
|
||||
// BaremetalManagerComplete operation
|
||||
BaremetalManagerComplete
|
||||
)
|
||||
|
||||
// BaremetalManagerEvent event emitted by BaremetalManager
|
||||
type BaremetalManagerEvent struct {
|
||||
Step BaremetalManagerStep
|
||||
// HostOperation indicates which operation is performed against BMH Host
|
||||
HostOperation string
|
||||
Message string
|
||||
}
|
||||
|
||||
// WithBaremetalManagerEvent sets type and actual bootstrap event
|
||||
func (e Event) WithBaremetalManagerEvent(concreteEvent BaremetalManagerEvent) Event {
|
||||
e.Type = BaremetalManagerEventType
|
||||
e.BaremetalManagerEvent = concreteEvent
|
||||
return e
|
||||
}
|
@ -1,61 +0,0 @@
|
||||
/*
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
https://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package events_test
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
"opendev.org/airship/airshipctl/pkg/events"
|
||||
)
|
||||
|
||||
func TestNormalize(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
sourceEvent events.Event
|
||||
expectedEvent events.GenericEvent
|
||||
}{
|
||||
{
|
||||
name: "Unknow event type",
|
||||
sourceEvent: events.NewEvent().WithErrorEvent(events.ErrorEvent{}),
|
||||
expectedEvent: events.GenericEvent{
|
||||
Type: "Unknown event type: ErrorType",
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Clusterctl event type",
|
||||
sourceEvent: events.NewEvent().WithClusterctlEvent(events.ClusterctlEvent{
|
||||
Operation: events.ClusterctlInitStart,
|
||||
Message: "Clusterctl init start",
|
||||
}),
|
||||
expectedEvent: events.GenericEvent{
|
||||
Type: "ClusterctlEvent",
|
||||
Message: "Clusterctl init start",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
tt := tt
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
ge := events.Normalize(tt.sourceEvent)
|
||||
assert.Equal(t, tt.expectedEvent.Type, ge.Type)
|
||||
if tt.expectedEvent.Type != "Unknown event type: ErrorType" {
|
||||
assert.Equal(t, tt.expectedEvent.Message, ge.Message)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
@ -1,70 +0,0 @@
|
||||
/*
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
https://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package events
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"io"
|
||||
|
||||
"sigs.k8s.io/yaml"
|
||||
|
||||
"opendev.org/airship/airshipctl/pkg/log"
|
||||
)
|
||||
|
||||
const (
|
||||
// YAMLPrinter format for event printer output
|
||||
YAMLPrinter = "yaml"
|
||||
// JSONPrinter format for event printer output
|
||||
JSONPrinter = "json"
|
||||
)
|
||||
|
||||
// NewGenericPrinter returns event printer
|
||||
func NewGenericPrinter(writer io.Writer, formatterType string) GenericPrinter {
|
||||
var formatter func(o interface{}) ([]byte, error)
|
||||
switch formatterType {
|
||||
case YAMLPrinter:
|
||||
formatter = yaml.Marshal
|
||||
case JSONPrinter:
|
||||
formatter = json.Marshal
|
||||
default:
|
||||
log.Fatal("Event printer received wrong type of event formatter")
|
||||
}
|
||||
return GenericPrinter{
|
||||
formatter: formatter,
|
||||
writer: writer,
|
||||
}
|
||||
}
|
||||
|
||||
// GenericPrinter object represents event printer
|
||||
type GenericPrinter struct {
|
||||
formatter func(interface{}) ([]byte, error)
|
||||
writer io.Writer
|
||||
}
|
||||
|
||||
// PrintEvent write event details
|
||||
func (p GenericPrinter) PrintEvent(ge GenericEvent) error {
|
||||
data, err := p.formatter(map[string]interface{}{
|
||||
"Type": ge.Type,
|
||||
"Operation": ge.Operation,
|
||||
"Message": ge.Message,
|
||||
"Timestamp": ge.Timestamp,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
data = append(data, '\n')
|
||||
_, err = p.writer.Write(data)
|
||||
return err
|
||||
}
|
@ -1,69 +0,0 @@
|
||||
/*
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
https://www.apache.org/licenses/LICENSE-2.0
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package events_test
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"opendev.org/airship/airshipctl/pkg/events"
|
||||
)
|
||||
|
||||
type fakeWriter struct {
|
||||
writeErr error
|
||||
}
|
||||
|
||||
var _ io.Writer = fakeWriter{}
|
||||
|
||||
func (f fakeWriter) Write(p []byte) (n int, err error) {
|
||||
return 0, f.writeErr
|
||||
}
|
||||
|
||||
func TestPrintEvent(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
formatterType string
|
||||
errString string
|
||||
writer io.Writer
|
||||
}{
|
||||
{
|
||||
name: "Fail on formatter type",
|
||||
formatterType: events.YAMLPrinter,
|
||||
errString: "error on write",
|
||||
writer: fakeWriter{
|
||||
writeErr: fmt.Errorf("error on write"),
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
tt := tt
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
p := events.NewGenericPrinter(tt.writer, tt.formatterType)
|
||||
e := events.NewEvent().WithGenericContainerEvent(events.GenericContainerEvent{
|
||||
Operation: events.GenericContainerStart,
|
||||
Message: "starting generic container generation",
|
||||
})
|
||||
ge := events.Normalize(e)
|
||||
err := p.PrintEvent(ge)
|
||||
if tt.errString != "" {
|
||||
require.Error(t, err)
|
||||
assert.Contains(t, err.Error(), tt.errString)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
@ -1,78 +0,0 @@
|
||||
/*
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
https://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package events
|
||||
|
||||
import (
|
||||
"opendev.org/airship/airshipctl/pkg/log"
|
||||
)
|
||||
|
||||
// EventProcessor use to process event channels produced by executors
|
||||
type EventProcessor interface {
|
||||
Process(<-chan Event) error
|
||||
Close()
|
||||
}
|
||||
|
||||
// DefaultProcessor is implementation of EventProcessor
|
||||
type DefaultProcessor struct {
|
||||
errors []error
|
||||
genericPrinter GenericPrinter
|
||||
}
|
||||
|
||||
// NewDefaultProcessor returns instance of DefaultProcessor as interface Implementation
|
||||
func NewDefaultProcessor() EventProcessor {
|
||||
// printer for custom airshipctl events
|
||||
genericPrinter := NewGenericPrinter(log.Writer(), JSONPrinter)
|
||||
return &DefaultProcessor{
|
||||
errors: []error{},
|
||||
genericPrinter: genericPrinter,
|
||||
}
|
||||
}
|
||||
|
||||
// Process is implementation of EventProcessor
|
||||
func (p *DefaultProcessor) Process(ch <-chan Event) error {
|
||||
for e := range ch {
|
||||
switch e.Type {
|
||||
case ErrorType:
|
||||
log.Printf("Received error on event channel %v", e.ErrorEvent)
|
||||
p.errors = append(p.errors, e.ErrorEvent.Error)
|
||||
case StatusPollerType:
|
||||
log.Fatalf("Processing for status poller events are not yet implemented")
|
||||
case WaitType:
|
||||
log.Fatalf("Processing for wait events are not yet implemented")
|
||||
default:
|
||||
ge := Normalize(e)
|
||||
err := p.genericPrinter.PrintEvent(ge)
|
||||
if err != nil {
|
||||
p.errors = append(p.errors, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
return checkErrors(p.errors)
|
||||
}
|
||||
|
||||
// Close cleans up the auxiliary channels used to process events
|
||||
func (p *DefaultProcessor) Close() {
|
||||
}
|
||||
|
||||
// Check list of errors, and verify that these errors we are able to tolerate
|
||||
// currently we simply check if the list is empty or not
|
||||
func checkErrors(errs []error) error {
|
||||
if len(errs) != 0 {
|
||||
return ErrEventReceived{
|
||||
Errors: errs,
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
@ -1,104 +0,0 @@
|
||||
/*
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
https://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package events_test
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"opendev.org/airship/airshipctl/pkg/events"
|
||||
k8stest "opendev.org/airship/airshipctl/testutil/k8sutils"
|
||||
)
|
||||
|
||||
func TestDefaultProcessor(t *testing.T) {
|
||||
proc := events.NewDefaultProcessor()
|
||||
defer proc.Close()
|
||||
tests := []struct {
|
||||
name string
|
||||
events []events.Event
|
||||
errString string
|
||||
}{
|
||||
{
|
||||
name: "success",
|
||||
events: successEvents(),
|
||||
},
|
||||
{
|
||||
name: "error event",
|
||||
events: errEvents(),
|
||||
errString: "somerror",
|
||||
},
|
||||
{
|
||||
name: "error event",
|
||||
events: errApplyEvents(),
|
||||
errString: "somerror",
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
tt := tt
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
ch := make(chan events.Event, len(tt.events))
|
||||
for _, e := range tt.events {
|
||||
ch <- e
|
||||
}
|
||||
close(ch)
|
||||
err := proc.Process(ch)
|
||||
if tt.errString != "" {
|
||||
require.Error(t, err)
|
||||
assert.Contains(t, err.Error(), tt.errString)
|
||||
} else {
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func successEvents() []events.Event {
|
||||
applyEvents := k8stest.SuccessEvents()
|
||||
airEvents := []events.Event{}
|
||||
for _, e := range applyEvents {
|
||||
airEvents = append(airEvents, events.Event{
|
||||
Type: events.ApplierType,
|
||||
ApplierEvent: e,
|
||||
})
|
||||
}
|
||||
return airEvents
|
||||
}
|
||||
|
||||
func errEvents() []events.Event {
|
||||
return []events.Event{
|
||||
{
|
||||
Type: events.ErrorType,
|
||||
ErrorEvent: events.ErrorEvent{
|
||||
Error: fmt.Errorf("somerror"),
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func errApplyEvents() []events.Event {
|
||||
errApplyEvents := k8stest.ErrorEvents()
|
||||
airEvents := []events.Event{}
|
||||
for _, e := range errApplyEvents {
|
||||
airEvents = append(airEvents, events.Event{
|
||||
Type: events.ApplierType,
|
||||
ApplierEvent: e,
|
||||
})
|
||||
}
|
||||
return airEvents
|
||||
}
|
@ -26,7 +26,6 @@ import (
|
||||
"opendev.org/airship/airshipctl/pkg/api/v1alpha1"
|
||||
"opendev.org/airship/airshipctl/pkg/container"
|
||||
"opendev.org/airship/airshipctl/pkg/document"
|
||||
"opendev.org/airship/airshipctl/pkg/events"
|
||||
"opendev.org/airship/airshipctl/pkg/k8s/kubeconfig"
|
||||
"opendev.org/airship/airshipctl/pkg/log"
|
||||
"opendev.org/airship/airshipctl/pkg/phase/errors"
|
||||
@ -56,10 +55,9 @@ var _ ifc.Phase = &phase{}
|
||||
|
||||
// Phase implements phase interface
|
||||
type phase struct {
|
||||
helper ifc.Helper
|
||||
apiObj *v1alpha1.Phase
|
||||
registry ExecutorRegistry
|
||||
processor events.EventProcessor
|
||||
helper ifc.Helper
|
||||
apiObj *v1alpha1.Phase
|
||||
registry ExecutorRegistry
|
||||
}
|
||||
|
||||
// Executor returns executor interface associated with the phase
|
||||
@ -111,17 +109,12 @@ func (p *phase) Executor() (ifc.Executor, error) {
|
||||
|
||||
// Run runs the phase via executor
|
||||
func (p *phase) Run(ro ifc.RunOptions) error {
|
||||
defer p.processor.Close()
|
||||
executor, err := p.Executor()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
ch := make(chan events.Event)
|
||||
|
||||
go func() {
|
||||
executor.Run(ch, ro)
|
||||
}()
|
||||
return p.processor.Process(ch)
|
||||
return executor.Run(ro)
|
||||
}
|
||||
|
||||
// Validate makes sure that phase is properly configured
|
||||
@ -304,13 +297,9 @@ var _ ifc.Client = &client{}
|
||||
type client struct {
|
||||
ifc.Helper
|
||||
|
||||
registry ExecutorRegistry
|
||||
processorFunc ProcessorFunc
|
||||
registry ExecutorRegistry
|
||||
}
|
||||
|
||||
// ProcessorFunc that returns processor interface
|
||||
type ProcessorFunc func() events.EventProcessor
|
||||
|
||||
// Option allows to add various options to a phase
|
||||
type Option func(*client)
|
||||
|
||||
@ -330,9 +319,6 @@ func NewClient(helper ifc.Helper, opts ...Option) ifc.Client {
|
||||
if c.registry == nil {
|
||||
c.registry = DefaultExecutorRegistry
|
||||
}
|
||||
if c.processorFunc == nil {
|
||||
c.processorFunc = defaultProcessor
|
||||
}
|
||||
return c
|
||||
}
|
||||
|
||||
@ -343,10 +329,9 @@ func (c *client) PhaseByID(id ifc.ID) (ifc.Phase, error) {
|
||||
}
|
||||
|
||||
phase := &phase{
|
||||
apiObj: phaseObj,
|
||||
helper: c.Helper,
|
||||
processor: c.processorFunc(),
|
||||
registry: c.registry,
|
||||
apiObj: phaseObj,
|
||||
helper: c.Helper,
|
||||
registry: c.registry,
|
||||
}
|
||||
return phase, nil
|
||||
}
|
||||
@ -365,14 +350,9 @@ func (c *client) PlanByID(id ifc.ID) (ifc.Plan, error) {
|
||||
|
||||
func (c *client) PhaseByAPIObj(phaseObj *v1alpha1.Phase) (ifc.Phase, error) {
|
||||
phase := &phase{
|
||||
apiObj: phaseObj,
|
||||
helper: c.Helper,
|
||||
processor: c.processorFunc(),
|
||||
registry: c.registry,
|
||||
apiObj: phaseObj,
|
||||
helper: c.Helper,
|
||||
registry: c.registry,
|
||||
}
|
||||
return phase, nil
|
||||
}
|
||||
|
||||
func defaultProcessor() events.EventProcessor {
|
||||
return events.NewDefaultProcessor()
|
||||
}
|
||||
|
@ -25,7 +25,6 @@ import (
|
||||
|
||||
"opendev.org/airship/airshipctl/pkg/api/v1alpha1"
|
||||
"opendev.org/airship/airshipctl/pkg/config"
|
||||
"opendev.org/airship/airshipctl/pkg/events"
|
||||
"opendev.org/airship/airshipctl/pkg/phase"
|
||||
"opendev.org/airship/airshipctl/pkg/phase/errors"
|
||||
"opendev.org/airship/airshipctl/pkg/phase/ifc"
|
||||
@ -514,8 +513,8 @@ func (e fakeExecutor) Render(_ io.Writer, _ ifc.RenderOptions) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e fakeExecutor) Run(ch chan events.Event, _ ifc.RunOptions) {
|
||||
defer close(ch)
|
||||
func (e fakeExecutor) Run(_ ifc.RunOptions) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e fakeExecutor) Validate() error {
|
||||
|
@ -22,9 +22,9 @@ import (
|
||||
"opendev.org/airship/airshipctl/pkg/api/v1alpha1"
|
||||
airshipv1 "opendev.org/airship/airshipctl/pkg/api/v1alpha1"
|
||||
commonerrors "opendev.org/airship/airshipctl/pkg/errors"
|
||||
"opendev.org/airship/airshipctl/pkg/events"
|
||||
"opendev.org/airship/airshipctl/pkg/inventory"
|
||||
inventoryifc "opendev.org/airship/airshipctl/pkg/inventory/ifc"
|
||||
"opendev.org/airship/airshipctl/pkg/log"
|
||||
"opendev.org/airship/airshipctl/pkg/phase/executors/errors"
|
||||
"opendev.org/airship/airshipctl/pkg/phase/ifc"
|
||||
)
|
||||
@ -48,21 +48,14 @@ func NewBaremetalExecutor(cfg ifc.ExecutorConfig) (ifc.Executor, error) {
|
||||
}
|
||||
|
||||
// Run runs baremetal operations as executor
|
||||
func (e *BaremetalManagerExecutor) Run(evtCh chan events.Event, opts ifc.RunOptions) {
|
||||
defer close(evtCh)
|
||||
func (e *BaremetalManagerExecutor) Run(opts ifc.RunOptions) error {
|
||||
commandOptions := toCommandOptions(e.inventory, e.options.Spec, opts)
|
||||
|
||||
evtCh <- events.NewEvent().WithBaremetalManagerEvent(events.BaremetalManagerEvent{
|
||||
Step: events.BaremetalManagerStart,
|
||||
HostOperation: string(e.options.Spec.Operation),
|
||||
Message: fmt.Sprintf("Starting remote operation '%s', selector to be to filter hosts %v",
|
||||
e.options.Spec.Operation, e.options.Spec.HostSelector),
|
||||
})
|
||||
|
||||
log.Print(fmt.Sprintf("Starting remote operation '%s', selector to be to filter hosts %v",
|
||||
e.options.Spec.Operation, e.options.Spec.HostSelector))
|
||||
op, err := e.validate()
|
||||
if err != nil {
|
||||
handleError(evtCh, err)
|
||||
return
|
||||
return err
|
||||
}
|
||||
if !opts.DryRun {
|
||||
switch e.options.Spec.Operation {
|
||||
@ -75,16 +68,12 @@ func (e *BaremetalManagerExecutor) Run(evtCh chan events.Event, opts ifc.RunOpti
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
handleError(evtCh, err)
|
||||
return
|
||||
return err
|
||||
}
|
||||
|
||||
evtCh <- events.NewEvent().WithBaremetalManagerEvent(events.BaremetalManagerEvent{
|
||||
Step: events.BaremetalManagerComplete,
|
||||
HostOperation: string(e.options.Spec.Operation),
|
||||
Message: fmt.Sprintf("Successfully completed operation against host selected by selector %v",
|
||||
e.options.Spec.HostSelector),
|
||||
})
|
||||
log.Print(fmt.Sprintf("Successfully completed operation against host selected by selector %v",
|
||||
e.options.Spec.HostSelector))
|
||||
return nil
|
||||
}
|
||||
|
||||
// Validate executor configuration and documents
|
||||
|
@ -26,7 +26,6 @@ import (
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"opendev.org/airship/airshipctl/pkg/document"
|
||||
"opendev.org/airship/airshipctl/pkg/events"
|
||||
inventoryifc "opendev.org/airship/airshipctl/pkg/inventory/ifc"
|
||||
"opendev.org/airship/airshipctl/pkg/phase/executors"
|
||||
"opendev.org/airship/airshipctl/pkg/phase/ifc"
|
||||
@ -146,13 +145,7 @@ func TestBMHExecutorRun(t *testing.T) {
|
||||
})
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, executor)
|
||||
ch := make(chan events.Event)
|
||||
go func() {
|
||||
executor.Run(ch, tt.runOptions)
|
||||
}()
|
||||
processor := events.NewDefaultProcessor()
|
||||
defer processor.Close()
|
||||
err = processor.Process(ch)
|
||||
err = executor.Run(tt.runOptions)
|
||||
if tt.expectedErr != "" {
|
||||
require.Error(t, err)
|
||||
assert.Contains(t, err.Error(), tt.expectedErr)
|
||||
|
@ -30,7 +30,6 @@ import (
|
||||
"opendev.org/airship/airshipctl/pkg/container"
|
||||
"opendev.org/airship/airshipctl/pkg/document"
|
||||
airerrors "opendev.org/airship/airshipctl/pkg/errors"
|
||||
"opendev.org/airship/airshipctl/pkg/events"
|
||||
"opendev.org/airship/airshipctl/pkg/k8s/kubeconfig"
|
||||
"opendev.org/airship/airshipctl/pkg/log"
|
||||
phaseerrors "opendev.org/airship/airshipctl/pkg/phase/errors"
|
||||
@ -152,9 +151,7 @@ func initRepoData(c *airshipv1.Clusterctl, o *airshipv1.ClusterctlOptions, targe
|
||||
}
|
||||
|
||||
// Run clusterctl init as a phase runner
|
||||
func (c *ClusterctlExecutor) Run(evtCh chan events.Event, opts ifc.RunOptions) {
|
||||
defer close(evtCh)
|
||||
|
||||
func (c *ClusterctlExecutor) Run(opts ifc.RunOptions) error {
|
||||
if log.DebugEnabled() {
|
||||
c.cctlOpts.CmdOptions = append(c.cctlOpts.CmdOptions, "-v5")
|
||||
}
|
||||
@ -170,16 +167,16 @@ func (c *ClusterctlExecutor) Run(evtCh chan events.Event, opts ifc.RunOptions) {
|
||||
var err error
|
||||
c.cctlOpts.Config, err = yaml.Marshal(cctlConfig)
|
||||
if err != nil {
|
||||
handleError(evtCh, err)
|
||||
return err
|
||||
}
|
||||
|
||||
switch c.options.Action {
|
||||
case airshipv1.Init:
|
||||
c.init(evtCh)
|
||||
return c.init()
|
||||
case airshipv1.Move:
|
||||
c.move(opts.DryRun, evtCh)
|
||||
return c.move(opts.DryRun)
|
||||
default:
|
||||
handleError(evtCh, errors.ErrUnknownExecutorAction{Action: string(c.options.Action), ExecutorName: "clusterctl"})
|
||||
return errors.ErrUnknownExecutorAction{Action: string(c.options.Action), ExecutorName: "clusterctl"}
|
||||
}
|
||||
}
|
||||
|
||||
@ -213,16 +210,12 @@ func (c *ClusterctlExecutor) getKubeconfig() (string, string, func(), error) {
|
||||
return kubeConfigFile, context, cleanup, nil
|
||||
}
|
||||
|
||||
func (c *ClusterctlExecutor) init(evtCh chan events.Event) {
|
||||
evtCh <- events.NewEvent().WithClusterctlEvent(events.ClusterctlEvent{
|
||||
Operation: events.ClusterctlInitStart,
|
||||
Message: "starting clusterctl init executor",
|
||||
})
|
||||
func (c *ClusterctlExecutor) init() error {
|
||||
log.Print("starting clusterctl init executor")
|
||||
|
||||
kubecfg, context, cleanup, err := c.getKubeconfig()
|
||||
if err != nil {
|
||||
handleError(evtCh, err)
|
||||
return
|
||||
return err
|
||||
}
|
||||
defer cleanup()
|
||||
|
||||
@ -245,38 +238,29 @@ func (c *ClusterctlExecutor) init(evtCh chan events.Event) {
|
||||
}
|
||||
|
||||
if err = c.run(); err != nil {
|
||||
handleError(evtCh, err)
|
||||
return
|
||||
return err
|
||||
}
|
||||
|
||||
eventMsg := "clusterctl init completed successfully"
|
||||
evtCh <- events.NewEvent().WithClusterctlEvent(events.ClusterctlEvent{
|
||||
Operation: events.ClusterctlInitEnd,
|
||||
Message: eventMsg,
|
||||
})
|
||||
log.Print("clusterctl init completed successfully")
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *ClusterctlExecutor) move(dryRun bool, evtCh chan events.Event) {
|
||||
evtCh <- events.NewEvent().WithClusterctlEvent(events.ClusterctlEvent{
|
||||
Operation: events.ClusterctlMoveStart,
|
||||
Message: "starting clusterctl move executor",
|
||||
})
|
||||
func (c *ClusterctlExecutor) move(dryRun bool) error {
|
||||
log.Print("starting clusterctl move executor")
|
||||
|
||||
kubecfg, context, cleanup, err := c.getKubeconfig()
|
||||
if err != nil {
|
||||
handleError(evtCh, err)
|
||||
return
|
||||
return err
|
||||
}
|
||||
defer cleanup()
|
||||
|
||||
fromCluster, err := c.clusterMap.ParentCluster(c.clusterName)
|
||||
if err != nil {
|
||||
handleError(evtCh, err)
|
||||
return
|
||||
return err
|
||||
}
|
||||
fromContext, err := c.clusterMap.ClusterKubeconfigContext(fromCluster)
|
||||
if err != nil {
|
||||
handleError(evtCh, err)
|
||||
return
|
||||
return err
|
||||
}
|
||||
|
||||
c.cctlOpts.CmdOptions = append(
|
||||
@ -297,15 +281,11 @@ func (c *ClusterctlExecutor) move(dryRun bool, evtCh chan events.Event) {
|
||||
}
|
||||
|
||||
if err = c.run(); err != nil {
|
||||
handleError(evtCh, err)
|
||||
return
|
||||
return err
|
||||
}
|
||||
|
||||
eventMsg := "clusterctl move completed successfully"
|
||||
evtCh <- events.NewEvent().WithClusterctlEvent(events.ClusterctlEvent{
|
||||
Operation: events.ClusterctlMoveEnd,
|
||||
Message: eventMsg,
|
||||
})
|
||||
log.Print("clusterctl move completed successfully")
|
||||
return nil
|
||||
}
|
||||
|
||||
// Validate executor configuration and documents
|
||||
|
@ -20,7 +20,6 @@ import (
|
||||
"fmt"
|
||||
"io"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
@ -29,7 +28,6 @@ import (
|
||||
"opendev.org/airship/airshipctl/pkg/cluster/clustermap"
|
||||
"opendev.org/airship/airshipctl/pkg/container"
|
||||
"opendev.org/airship/airshipctl/pkg/document"
|
||||
"opendev.org/airship/airshipctl/pkg/events"
|
||||
"opendev.org/airship/airshipctl/pkg/k8s/kubeconfig"
|
||||
"opendev.org/airship/airshipctl/pkg/phase/executors"
|
||||
"opendev.org/airship/airshipctl/pkg/phase/executors/errors"
|
||||
@ -248,17 +246,15 @@ func TestClusterctlExecutorRun(t *testing.T) {
|
||||
name string
|
||||
cfgDoc document.Document
|
||||
kubecfg kubeconfig.Interface
|
||||
expectedEvt []events.Event
|
||||
expectedErr error
|
||||
clusterMap clustermap.ClusterMap
|
||||
clientFunc container.ClientV1Alpha1FactoryFunc
|
||||
}{
|
||||
{
|
||||
name: "Error unknown action",
|
||||
cfgDoc: executorDoc(t, fmt.Sprintf(executorConfigTmplGood, "someAction")),
|
||||
expectedEvt: []events.Event{
|
||||
wrapError(errors.ErrUnknownExecutorAction{Action: "someAction", ExecutorName: "clusterctl"}),
|
||||
},
|
||||
clusterMap: clustermap.NewClusterMap(v1alpha1.DefaultClusterMap()),
|
||||
name: "Error unknown action",
|
||||
cfgDoc: executorDoc(t, fmt.Sprintf(executorConfigTmplGood, "someAction")),
|
||||
expectedErr: errors.ErrUnknownExecutorAction{Action: "someAction", ExecutorName: "clusterctl"},
|
||||
clusterMap: clustermap.NewClusterMap(v1alpha1.DefaultClusterMap()),
|
||||
},
|
||||
{
|
||||
name: "Failed get kubeconfig file - init",
|
||||
@ -266,13 +262,8 @@ func TestClusterctlExecutorRun(t *testing.T) {
|
||||
kubecfg: fakeKubeConfig{getFile: func() (string, kubeconfig.Cleanup, error) {
|
||||
return "", nil, errTmpFile
|
||||
}},
|
||||
expectedEvt: []events.Event{
|
||||
events.NewEvent().WithClusterctlEvent(events.ClusterctlEvent{
|
||||
Operation: events.ClusterctlInitStart,
|
||||
}),
|
||||
wrapError(errTmpFile),
|
||||
},
|
||||
clusterMap: clustermap.NewClusterMap(v1alpha1.DefaultClusterMap()),
|
||||
expectedErr: errTmpFile,
|
||||
clusterMap: clustermap.NewClusterMap(v1alpha1.DefaultClusterMap()),
|
||||
},
|
||||
{
|
||||
name: "Failed get kubeconfig file - move",
|
||||
@ -280,13 +271,8 @@ func TestClusterctlExecutorRun(t *testing.T) {
|
||||
kubecfg: fakeKubeConfig{getFile: func() (string, kubeconfig.Cleanup, error) {
|
||||
return "", nil, errTmpFile
|
||||
}},
|
||||
expectedEvt: []events.Event{
|
||||
events.NewEvent().WithClusterctlEvent(events.ClusterctlEvent{
|
||||
Operation: events.ClusterctlMoveStart,
|
||||
}),
|
||||
wrapError(errTmpFile),
|
||||
},
|
||||
clusterMap: clustermap.NewClusterMap(v1alpha1.DefaultClusterMap()),
|
||||
expectedErr: errTmpFile,
|
||||
clusterMap: clustermap.NewClusterMap(v1alpha1.DefaultClusterMap()),
|
||||
},
|
||||
{
|
||||
name: "Failed get kubeconfig context - init",
|
||||
@ -294,12 +280,7 @@ func TestClusterctlExecutorRun(t *testing.T) {
|
||||
kubecfg: fakeKubeConfig{getFile: func() (string, kubeconfig.Cleanup, error) {
|
||||
return "", func() {}, nil
|
||||
}},
|
||||
expectedEvt: []events.Event{
|
||||
events.NewEvent().WithClusterctlEvent(events.ClusterctlEvent{
|
||||
Operation: events.ClusterctlInitStart,
|
||||
}),
|
||||
wrapError(errCtx),
|
||||
},
|
||||
expectedErr: errCtx,
|
||||
clusterMap: ClusterMapMockInterface{MockClusterKubeconfigContext: func(s string) (string, error) {
|
||||
return "", errCtx
|
||||
}},
|
||||
@ -310,12 +291,7 @@ func TestClusterctlExecutorRun(t *testing.T) {
|
||||
kubecfg: fakeKubeConfig{getFile: func() (string, kubeconfig.Cleanup, error) {
|
||||
return "", func() {}, nil
|
||||
}},
|
||||
expectedEvt: []events.Event{
|
||||
events.NewEvent().WithClusterctlEvent(events.ClusterctlEvent{
|
||||
Operation: events.ClusterctlMoveStart,
|
||||
}),
|
||||
wrapError(errCtx),
|
||||
},
|
||||
expectedErr: errCtx,
|
||||
clusterMap: ClusterMapMockInterface{MockClusterKubeconfigContext: func(s string) (string, error) {
|
||||
return "", errCtx
|
||||
}},
|
||||
@ -326,12 +302,7 @@ func TestClusterctlExecutorRun(t *testing.T) {
|
||||
kubecfg: fakeKubeConfig{getFile: func() (string, kubeconfig.Cleanup, error) {
|
||||
return "", func() {}, nil
|
||||
}},
|
||||
expectedEvt: []events.Event{
|
||||
events.NewEvent().WithClusterctlEvent(events.ClusterctlEvent{
|
||||
Operation: events.ClusterctlMoveStart,
|
||||
}),
|
||||
wrapError(errParent),
|
||||
},
|
||||
expectedErr: errParent,
|
||||
clusterMap: ClusterMapMockInterface{MockClusterKubeconfigContext: func(s string) (string, error) {
|
||||
return "ctx", nil
|
||||
},
|
||||
@ -354,14 +325,6 @@ func TestClusterctlExecutorRun(t *testing.T) {
|
||||
return nil
|
||||
}}
|
||||
},
|
||||
expectedEvt: []events.Event{
|
||||
events.NewEvent().WithClusterctlEvent(events.ClusterctlEvent{
|
||||
Operation: events.ClusterctlInitStart,
|
||||
}),
|
||||
events.NewEvent().WithClusterctlEvent(events.ClusterctlEvent{
|
||||
Operation: events.ClusterctlInitEnd,
|
||||
}),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Regular Run move",
|
||||
@ -381,14 +344,6 @@ func TestClusterctlExecutorRun(t *testing.T) {
|
||||
return nil
|
||||
}}
|
||||
},
|
||||
expectedEvt: []events.Event{
|
||||
events.NewEvent().WithClusterctlEvent(events.ClusterctlEvent{
|
||||
Operation: events.ClusterctlMoveStart,
|
||||
}),
|
||||
events.NewEvent().WithClusterctlEvent(events.ClusterctlEvent{
|
||||
Operation: events.ClusterctlMoveEnd,
|
||||
}),
|
||||
},
|
||||
},
|
||||
}
|
||||
for _, test := range testCases {
|
||||
@ -404,23 +359,8 @@ func TestClusterctlExecutorRun(t *testing.T) {
|
||||
ContainerFunc: tt.clientFunc,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
ch := make(chan events.Event)
|
||||
go executor.Run(ch, ifc.RunOptions{DryRun: true})
|
||||
var actualEvt []events.Event
|
||||
for evt := range ch {
|
||||
// Skip timestamp for comparison
|
||||
evt.Timestamp = time.Time{}
|
||||
if evt.Type == events.ClusterctlType {
|
||||
// Set message to empty string, so it's not compared
|
||||
evt.ClusterctlEvent.Message = ""
|
||||
}
|
||||
actualEvt = append(actualEvt, evt)
|
||||
}
|
||||
for i := range tt.expectedEvt {
|
||||
// Skip timestamp for comparison
|
||||
tt.expectedEvt[i].Timestamp = time.Time{}
|
||||
}
|
||||
assert.Equal(t, tt.expectedEvt, actualEvt)
|
||||
err = executor.Run(ifc.RunOptions{DryRun: true})
|
||||
assert.Equal(t, tt.expectedErr, err)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
@ -18,7 +18,6 @@ import (
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
|
||||
airshipv1 "opendev.org/airship/airshipctl/pkg/api/v1alpha1"
|
||||
"opendev.org/airship/airshipctl/pkg/events"
|
||||
"opendev.org/airship/airshipctl/pkg/phase/executors/errors"
|
||||
"opendev.org/airship/airshipctl/pkg/phase/ifc"
|
||||
)
|
||||
@ -64,9 +63,3 @@ func RegisterExecutor(executorName string, registry map[schema.GroupVersionKind]
|
||||
registry[gvks[0]] = execObj
|
||||
return nil
|
||||
}
|
||||
|
||||
func handleError(ch chan<- events.Event, err error) {
|
||||
ch <- events.NewEvent().WithErrorEvent(events.ErrorEvent{
|
||||
Error: err,
|
||||
})
|
||||
}
|
||||
|
@ -25,7 +25,6 @@ import (
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
|
||||
"opendev.org/airship/airshipctl/pkg/document"
|
||||
"opendev.org/airship/airshipctl/pkg/events"
|
||||
"opendev.org/airship/airshipctl/pkg/phase/executors"
|
||||
"opendev.org/airship/airshipctl/pkg/phase/ifc"
|
||||
)
|
||||
@ -107,12 +106,6 @@ func executorBundle(t *testing.T, s string) document.Bundle {
|
||||
return b
|
||||
}
|
||||
|
||||
func wrapError(err error) events.Event {
|
||||
return events.NewEvent().WithErrorEvent(events.ErrorEvent{
|
||||
Error: err,
|
||||
})
|
||||
}
|
||||
|
||||
func testClusterMap(t *testing.T) clustermap.ClusterMap {
|
||||
doc, err := document.NewDocumentFromBytes([]byte(singleExecutorClusterMap))
|
||||
require.NoError(t, err)
|
||||
|
@ -25,7 +25,6 @@ import (
|
||||
"opendev.org/airship/airshipctl/pkg/container"
|
||||
"opendev.org/airship/airshipctl/pkg/document"
|
||||
commonerrors "opendev.org/airship/airshipctl/pkg/errors"
|
||||
"opendev.org/airship/airshipctl/pkg/events"
|
||||
"opendev.org/airship/airshipctl/pkg/k8s/kubeconfig"
|
||||
"opendev.org/airship/airshipctl/pkg/log"
|
||||
"opendev.org/airship/airshipctl/pkg/phase/errors"
|
||||
@ -82,27 +81,20 @@ func NewContainerExecutor(cfg ifc.ExecutorConfig) (ifc.Executor, error) {
|
||||
}
|
||||
|
||||
// Run generic container as a phase runner
|
||||
func (c *ContainerExecutor) Run(evtCh chan events.Event, opts ifc.RunOptions) {
|
||||
defer close(evtCh)
|
||||
|
||||
evtCh <- events.NewEvent().WithGenericContainerEvent(events.GenericContainerEvent{
|
||||
Operation: events.GenericContainerStart,
|
||||
Message: "starting generic container",
|
||||
})
|
||||
func (c *ContainerExecutor) Run(opts ifc.RunOptions) error {
|
||||
log.Print("starting generic container")
|
||||
|
||||
if c.Options.ClusterName != "" {
|
||||
cleanup, err := c.SetKubeConfig()
|
||||
if err != nil {
|
||||
handleError(evtCh, err)
|
||||
return
|
||||
return err
|
||||
}
|
||||
defer cleanup()
|
||||
}
|
||||
|
||||
input, err := bundleReader(c.ExecutorBundle)
|
||||
if err != nil {
|
||||
handleError(evtCh, err)
|
||||
return
|
||||
return err
|
||||
}
|
||||
|
||||
// TODO this logic is redundant in executor package, move it to pkg/container
|
||||
@ -112,29 +104,22 @@ func (c *ContainerExecutor) Run(evtCh chan events.Event, opts ifc.RunOptions) {
|
||||
output = os.Stdout
|
||||
}
|
||||
if err = c.setConfig(); err != nil {
|
||||
handleError(evtCh, err)
|
||||
return
|
||||
return err
|
||||
}
|
||||
|
||||
// TODO check the executor type when dryrun is set
|
||||
if opts.DryRun {
|
||||
evtCh <- events.NewEvent().WithGenericContainerEvent(events.GenericContainerEvent{
|
||||
Operation: events.GenericContainerStop,
|
||||
Message: "DryRun execution finished",
|
||||
})
|
||||
return
|
||||
log.Print("DryRun execution finished")
|
||||
return nil
|
||||
}
|
||||
|
||||
err = c.ClientFunc(c.ResultsDir, input, output, c.Container, c.MountBasePath).Run()
|
||||
if err != nil {
|
||||
handleError(evtCh, err)
|
||||
return
|
||||
return err
|
||||
}
|
||||
|
||||
evtCh <- events.NewEvent().WithGenericContainerEvent(events.GenericContainerEvent{
|
||||
Operation: events.GenericContainerStop,
|
||||
Message: "execution of the generic container finished",
|
||||
})
|
||||
log.Print("execution of the generic container finished")
|
||||
return nil
|
||||
}
|
||||
|
||||
// SetKubeConfig adds env variable and mounts kubeconfig to container
|
||||
|
@ -27,7 +27,6 @@ import (
|
||||
"opendev.org/airship/airshipctl/pkg/api/v1alpha1"
|
||||
"opendev.org/airship/airshipctl/pkg/container"
|
||||
"opendev.org/airship/airshipctl/pkg/document"
|
||||
"opendev.org/airship/airshipctl/pkg/events"
|
||||
"opendev.org/airship/airshipctl/pkg/k8s/kubeconfig"
|
||||
"opendev.org/airship/airshipctl/pkg/phase/errors"
|
||||
"opendev.org/airship/airshipctl/pkg/phase/executors"
|
||||
@ -252,24 +251,12 @@ func TestGenericContainer(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
ch := make(chan events.Event)
|
||||
go containerExecutor.Run(ch, tt.runOptions)
|
||||
|
||||
actualEvt := make([]events.Event, 0)
|
||||
for evt := range ch {
|
||||
actualEvt = append(actualEvt, evt)
|
||||
}
|
||||
require.Greater(t, len(actualEvt), 0)
|
||||
|
||||
err := containerExecutor.Run(tt.runOptions)
|
||||
if tt.expectedErr != "" {
|
||||
e := actualEvt[len(actualEvt)-1]
|
||||
require.Error(t, e.ErrorEvent.Error)
|
||||
assert.Contains(t, e.ErrorEvent.Error.Error(), tt.expectedErr)
|
||||
require.Error(t, err)
|
||||
assert.Contains(t, err.Error(), tt.expectedErr)
|
||||
} else {
|
||||
e := actualEvt[len(actualEvt)-1]
|
||||
assert.NoError(t, e.ErrorEvent.Error)
|
||||
assert.Equal(t, e.Type, events.GenericContainerType)
|
||||
assert.Equal(t, e.GenericContainerEvent.Operation, events.GenericContainerStop)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, tt.resultConfig, containerExecutor.Container.Config)
|
||||
}
|
||||
})
|
||||
|
@ -24,7 +24,6 @@ import (
|
||||
"opendev.org/airship/airshipctl/pkg/container"
|
||||
"opendev.org/airship/airshipctl/pkg/document"
|
||||
"opendev.org/airship/airshipctl/pkg/errors"
|
||||
"opendev.org/airship/airshipctl/pkg/events"
|
||||
"opendev.org/airship/airshipctl/pkg/log"
|
||||
"opendev.org/airship/airshipctl/pkg/phase/ifc"
|
||||
)
|
||||
@ -56,20 +55,12 @@ func NewEphemeralExecutor(cfg ifc.ExecutorConfig) (ifc.Executor, error) {
|
||||
}
|
||||
|
||||
// Run ephemeral as a phase runner
|
||||
func (c *EphemeralExecutor) Run(evtCh chan events.Event, opts ifc.RunOptions) {
|
||||
defer close(evtCh)
|
||||
|
||||
evtCh <- events.NewEvent().WithBootstrapEvent(events.BootstrapEvent{
|
||||
Operation: events.BootstrapStart,
|
||||
Message: "Processing Ephemeral cluster operation ...",
|
||||
})
|
||||
func (c *EphemeralExecutor) Run(opts ifc.RunOptions) error {
|
||||
log.Print("Processing Ephemeral cluster operation ...")
|
||||
|
||||
if opts.DryRun {
|
||||
log.Print("Dryrun: bootstrap container command will be skipped")
|
||||
evtCh <- events.NewEvent().WithBootstrapEvent(events.BootstrapEvent{
|
||||
Operation: events.BootstrapDryRun,
|
||||
})
|
||||
return
|
||||
return nil
|
||||
}
|
||||
|
||||
if c.Container == nil {
|
||||
@ -79,8 +70,7 @@ func (c *EphemeralExecutor) Run(evtCh chan events.Event, opts ifc.RunOptions) {
|
||||
c.BootConf.BootstrapContainer.ContainerRuntime,
|
||||
c.BootConf.BootstrapContainer.Image)
|
||||
if err != nil {
|
||||
handleError(evtCh, err)
|
||||
return
|
||||
return err
|
||||
}
|
||||
c.Container = builder
|
||||
}
|
||||
@ -91,43 +81,29 @@ func (c *EphemeralExecutor) Run(evtCh chan events.Event, opts ifc.RunOptions) {
|
||||
Sleep: time.Sleep,
|
||||
}
|
||||
|
||||
evtCh <- events.NewEvent().WithBootstrapEvent(events.BootstrapEvent{
|
||||
Operation: events.BootstrapValidation,
|
||||
Message: "Verifying executor manifest document ...",
|
||||
})
|
||||
log.Print("Verifying executor manifest document ...")
|
||||
|
||||
err := bootstrapOpts.VerifyInputs()
|
||||
if err != nil {
|
||||
handleError(evtCh, err)
|
||||
return
|
||||
return err
|
||||
}
|
||||
|
||||
evtCh <- events.NewEvent().WithBootstrapEvent(events.BootstrapEvent{
|
||||
Operation: events.BootstrapRun,
|
||||
Message: "Creating and starting the Bootstrap Container ...",
|
||||
})
|
||||
log.Print("Creating and starting the Bootstrap Container ...")
|
||||
|
||||
err = bootstrapOpts.CreateBootstrapContainer()
|
||||
if err != nil {
|
||||
handleError(evtCh, err)
|
||||
return
|
||||
return err
|
||||
}
|
||||
|
||||
evtCh <- events.NewEvent().WithBootstrapEvent(events.BootstrapEvent{
|
||||
Operation: events.BootstrapValidation,
|
||||
Message: "Verifying generation of kubeconfig file ...",
|
||||
})
|
||||
log.Print("Verifying generation of kubeconfig file ...")
|
||||
|
||||
err = bootstrapOpts.VerifyArtifacts()
|
||||
if err != nil {
|
||||
handleError(evtCh, err)
|
||||
return
|
||||
return err
|
||||
}
|
||||
|
||||
evtCh <- events.NewEvent().WithBootstrapEvent(events.BootstrapEvent{
|
||||
Operation: events.BootstrapEnd,
|
||||
Message: "Ephemeral cluster operation has completed successfully and artifacts verified",
|
||||
})
|
||||
log.Print("Ephemeral cluster operation has completed successfully and artifacts verified")
|
||||
return nil
|
||||
}
|
||||
|
||||
// Validate executor configuration and documents
|
||||
|
@ -19,7 +19,6 @@ import (
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
@ -29,7 +28,6 @@ import (
|
||||
"opendev.org/airship/airshipctl/pkg/container"
|
||||
"opendev.org/airship/airshipctl/pkg/document"
|
||||
"opendev.org/airship/airshipctl/pkg/errors"
|
||||
"opendev.org/airship/airshipctl/pkg/events"
|
||||
"opendev.org/airship/airshipctl/pkg/phase/executors"
|
||||
"opendev.org/airship/airshipctl/pkg/phase/ifc"
|
||||
"opendev.org/airship/airshipctl/testutil"
|
||||
@ -92,7 +90,7 @@ func TestExecutorEphemeralRun(t *testing.T) {
|
||||
testCases := []struct {
|
||||
name string
|
||||
container *testcontainer.MockContainer
|
||||
expectedEvt []events.Event
|
||||
expectedErr error
|
||||
}{
|
||||
{
|
||||
name: "Run bootstrap container successfully",
|
||||
@ -106,44 +104,14 @@ func TestExecutorEphemeralRun(t *testing.T) {
|
||||
return state, nil
|
||||
},
|
||||
},
|
||||
expectedEvt: []events.Event{
|
||||
events.NewEvent().WithBootstrapEvent(events.BootstrapEvent{
|
||||
Operation: events.BootstrapStart,
|
||||
}),
|
||||
events.NewEvent().WithBootstrapEvent(events.BootstrapEvent{
|
||||
Operation: events.BootstrapValidation,
|
||||
}),
|
||||
events.NewEvent().WithBootstrapEvent(events.BootstrapEvent{
|
||||
Operation: events.BootstrapRun,
|
||||
}),
|
||||
events.NewEvent().WithBootstrapEvent(events.BootstrapEvent{
|
||||
Operation: events.BootstrapValidation,
|
||||
}),
|
||||
events.NewEvent().WithBootstrapEvent(events.BootstrapEvent{
|
||||
Operation: events.BootstrapEnd,
|
||||
}),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Run bootstrap container with Unknow Error",
|
||||
name: "Run bootstrap container with Unknown Error",
|
||||
container: &testcontainer.MockContainer{
|
||||
MockRunCommand: func() error { return ephemeral.ErrBootstrapContainerRun{} },
|
||||
MockRmContainer: func() error { return nil },
|
||||
},
|
||||
expectedEvt: []events.Event{
|
||||
events.NewEvent().WithBootstrapEvent(events.BootstrapEvent{
|
||||
Operation: events.BootstrapStart,
|
||||
}),
|
||||
events.NewEvent().WithBootstrapEvent(events.BootstrapEvent{
|
||||
Operation: events.BootstrapValidation,
|
||||
}),
|
||||
events.NewEvent().WithBootstrapEvent(events.BootstrapEvent{
|
||||
Operation: events.BootstrapRun,
|
||||
}),
|
||||
events.NewEvent().WithErrorEvent(events.ErrorEvent{
|
||||
Error: ephemeral.ErrBootstrapContainerRun{},
|
||||
}),
|
||||
},
|
||||
expectedErr: ephemeral.ErrBootstrapContainerRun{},
|
||||
},
|
||||
}
|
||||
for _, test := range testCases {
|
||||
@ -153,23 +121,8 @@ func TestExecutorEphemeralRun(t *testing.T) {
|
||||
BootConf: testCfg,
|
||||
Container: tt.container,
|
||||
}
|
||||
ch := make(chan events.Event)
|
||||
go executor.Run(ch, ifc.RunOptions{})
|
||||
var actualEvt []events.Event
|
||||
for evt := range ch {
|
||||
if evt.Type == events.BootstrapType {
|
||||
// Set message to empty string, so it's not compared
|
||||
evt.BootstrapEvent.Message = ""
|
||||
}
|
||||
actualEvt = append(actualEvt, evt)
|
||||
}
|
||||
for i := range tt.expectedEvt {
|
||||
// Fix timestamps for comparison
|
||||
timeStamp := time.Time{}
|
||||
tt.expectedEvt[i].Timestamp = timeStamp
|
||||
actualEvt[i].Timestamp = timeStamp
|
||||
}
|
||||
assert.Equal(t, tt.expectedEvt, actualEvt)
|
||||
err := executor.Run(ifc.RunOptions{})
|
||||
assert.Equal(t, tt.expectedErr, err)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
@ -26,7 +26,6 @@ import (
|
||||
"opendev.org/airship/airshipctl/pkg/container"
|
||||
"opendev.org/airship/airshipctl/pkg/document"
|
||||
airerrors "opendev.org/airship/airshipctl/pkg/errors"
|
||||
"opendev.org/airship/airshipctl/pkg/events"
|
||||
"opendev.org/airship/airshipctl/pkg/k8s/kubeconfig"
|
||||
"opendev.org/airship/airshipctl/pkg/log"
|
||||
"opendev.org/airship/airshipctl/pkg/phase/errors"
|
||||
@ -93,17 +92,14 @@ func NewKubeApplierExecutor(cfg ifc.ExecutorConfig) (ifc.Executor, error) {
|
||||
}
|
||||
|
||||
// Run executor, should be performed in separate go routine
|
||||
func (e *KubeApplierExecutor) Run(ch chan events.Event, runOpts ifc.RunOptions) {
|
||||
defer close(ch)
|
||||
|
||||
func (e *KubeApplierExecutor) Run(runOpts ifc.RunOptions) error {
|
||||
e.apiObject.Config.Debug = log.DebugEnabled()
|
||||
e.apiObject.Config.PhaseName = e.BundleName
|
||||
|
||||
if e.apiObject.Config.Kubeconfig == "" {
|
||||
kcfg, ctx, cleanup, err := e.getKubeconfig()
|
||||
if err != nil {
|
||||
handleError(ch, err)
|
||||
return
|
||||
return err
|
||||
}
|
||||
defer cleanup()
|
||||
e.apiObject.Config.Kubeconfig, e.apiObject.Config.Context = kcfg, ctx
|
||||
@ -123,22 +119,16 @@ func (e *KubeApplierExecutor) Run(ch chan events.Event, runOpts ifc.RunOptions)
|
||||
|
||||
reader, err := e.prepareDocuments()
|
||||
if err != nil {
|
||||
handleError(ch, err)
|
||||
return
|
||||
return err
|
||||
}
|
||||
|
||||
opts, err := yaml.Marshal(&e.apiObject.Config)
|
||||
if err != nil {
|
||||
handleError(ch, err)
|
||||
return
|
||||
return err
|
||||
}
|
||||
|
||||
e.execObj.Config = string(opts)
|
||||
err = e.clientFunc("", reader, os.Stdout, e.execObj, e.targetPath).Run()
|
||||
if err != nil {
|
||||
handleError(ch, err)
|
||||
return
|
||||
}
|
||||
return e.clientFunc("", reader, os.Stdout, e.execObj, e.targetPath).Run()
|
||||
}
|
||||
|
||||
func (e *KubeApplierExecutor) getKubeconfig() (string, string, func(), error) {
|
||||
|
@ -28,7 +28,6 @@ import (
|
||||
"opendev.org/airship/airshipctl/pkg/cluster/clustermap"
|
||||
"opendev.org/airship/airshipctl/pkg/container"
|
||||
"opendev.org/airship/airshipctl/pkg/document"
|
||||
"opendev.org/airship/airshipctl/pkg/events"
|
||||
"opendev.org/airship/airshipctl/pkg/fs"
|
||||
"opendev.org/airship/airshipctl/pkg/k8s/kubeconfig"
|
||||
"opendev.org/airship/airshipctl/pkg/phase/executors"
|
||||
@ -337,10 +336,7 @@ func TestKubeApplierExecutorRun(t *testing.T) {
|
||||
})
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, exec)
|
||||
ch := make(chan events.Event)
|
||||
go exec.Run(ch, ifc.RunOptions{})
|
||||
processor := events.NewDefaultProcessor()
|
||||
err = processor.Process(ch)
|
||||
err = exec.Run(ifc.RunOptions{})
|
||||
if tt.containsErr != "" {
|
||||
require.Error(t, err)
|
||||
assert.Contains(t, err.Error(), tt.containsErr)
|
||||
|
@ -21,14 +21,13 @@ import (
|
||||
"opendev.org/airship/airshipctl/pkg/cluster/clustermap"
|
||||
"opendev.org/airship/airshipctl/pkg/container"
|
||||
"opendev.org/airship/airshipctl/pkg/document"
|
||||
"opendev.org/airship/airshipctl/pkg/events"
|
||||
inventoryifc "opendev.org/airship/airshipctl/pkg/inventory/ifc"
|
||||
"opendev.org/airship/airshipctl/pkg/k8s/kubeconfig"
|
||||
)
|
||||
|
||||
// Executor interface should be implemented by each runner
|
||||
type Executor interface {
|
||||
Run(chan events.Event, RunOptions)
|
||||
Run(RunOptions) error
|
||||
Render(io.Writer, RenderOptions) error
|
||||
Validate() error
|
||||
Status() (ExecutorStatus, error)
|
||||
|
@ -1,47 +0,0 @@
|
||||
/*
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
https://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package k8sutils
|
||||
|
||||
import (
|
||||
"opendev.org/airship/airshipctl/pkg/events"
|
||||
)
|
||||
|
||||
// SuccessEvents returns list of events that constitute a successful cli utils apply
|
||||
func SuccessEvents() []events.ApplierEvent {
|
||||
return []events.ApplierEvent{
|
||||
{
|
||||
Operation: "init",
|
||||
Message: "success",
|
||||
},
|
||||
{
|
||||
Operation: "configured",
|
||||
Message: "success",
|
||||
},
|
||||
{
|
||||
Operation: "completed",
|
||||
Message: "success",
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// ErrorEvents return a list of events with error
|
||||
func ErrorEvents() []events.ApplierEvent {
|
||||
return []events.ApplierEvent{
|
||||
{
|
||||
Operation: "error",
|
||||
Message: "apply-error",
|
||||
},
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user