Remove events module
This module actually ins't used anymore. Change-Id: I2aa4edb17843e6b88f657e09f1a338302b92b7ff Signed-off-by: Ruslan Aliev <raliev@mirantis.com>
This commit is contained in:
parent
c80e03457b
commit
ef55861139
@ -1,29 +0,0 @@
|
|||||||
/*
|
|
||||||
Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
you may not use this file except in compliance with the License.
|
|
||||||
You may obtain a copy of the License at
|
|
||||||
|
|
||||||
https://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
|
|
||||||
Unless required by applicable law or agreed to in writing, software
|
|
||||||
distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
See the License for the specific language governing permissions and
|
|
||||||
limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package events
|
|
||||||
|
|
||||||
import (
|
|
||||||
"fmt"
|
|
||||||
)
|
|
||||||
|
|
||||||
// ErrEventReceived returned for not implemented features
|
|
||||||
type ErrEventReceived struct {
|
|
||||||
Errors []error
|
|
||||||
}
|
|
||||||
|
|
||||||
func (e ErrEventReceived) Error() string {
|
|
||||||
// TODO make printing more readable here
|
|
||||||
return fmt.Sprintf("Error events received on channel, errors are:\n%v", e.Errors)
|
|
||||||
}
|
|
@ -1,263 +0,0 @@
|
|||||||
/*
|
|
||||||
Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
you may not use this file except in compliance with the License.
|
|
||||||
You may obtain a copy of the License at
|
|
||||||
|
|
||||||
https://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
|
|
||||||
Unless required by applicable law or agreed to in writing, software
|
|
||||||
distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
See the License for the specific language governing permissions and
|
|
||||||
limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package events
|
|
||||||
|
|
||||||
import (
|
|
||||||
"fmt"
|
|
||||||
"time"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Type indicates type of the event
|
|
||||||
type Type int
|
|
||||||
|
|
||||||
const (
|
|
||||||
// ApplierType is event that are produced by applier
|
|
||||||
ApplierType Type = iota
|
|
||||||
// ErrorType means that even is of error type
|
|
||||||
ErrorType
|
|
||||||
// StatusPollerType event produced by status poller
|
|
||||||
StatusPollerType
|
|
||||||
// WaitType is event emitted when airshipctl is waiting for something
|
|
||||||
WaitType
|
|
||||||
// ClusterctlType event emitted by Clusterctl executor
|
|
||||||
ClusterctlType
|
|
||||||
// BootstrapType event emitted by Bootstrap executor
|
|
||||||
BootstrapType
|
|
||||||
// GenericContainerType event emitted by GenericContainer
|
|
||||||
GenericContainerType
|
|
||||||
// BaremetalManagerEventType event emitted by BaremetalManager
|
|
||||||
BaremetalManagerEventType
|
|
||||||
)
|
|
||||||
|
|
||||||
// Event holds all possible events that can be produced by airship
|
|
||||||
type Event struct {
|
|
||||||
Type Type
|
|
||||||
Timestamp time.Time
|
|
||||||
ApplierEvent ApplierEvent
|
|
||||||
ErrorEvent ErrorEvent
|
|
||||||
ClusterctlEvent ClusterctlEvent
|
|
||||||
BootstrapEvent BootstrapEvent
|
|
||||||
GenericContainerEvent GenericContainerEvent
|
|
||||||
BaremetalManagerEvent BaremetalManagerEvent
|
|
||||||
}
|
|
||||||
|
|
||||||
// ApplierEvent is produced by k8s_apply executor
|
|
||||||
type ApplierEvent struct {
|
|
||||||
Operation string
|
|
||||||
Message string
|
|
||||||
}
|
|
||||||
|
|
||||||
//GenericEvent generalized type for custom events
|
|
||||||
type GenericEvent struct {
|
|
||||||
Type string
|
|
||||||
Operation string
|
|
||||||
Message string
|
|
||||||
Timestamp time.Time
|
|
||||||
}
|
|
||||||
|
|
||||||
var mapTypeToEvent = map[Type]string{
|
|
||||||
ClusterctlType: "ClusterctlEvent",
|
|
||||||
BootstrapType: "BootstrapEvent",
|
|
||||||
GenericContainerType: "GenericContainerEvent",
|
|
||||||
}
|
|
||||||
|
|
||||||
var unknownEventType = map[Type]string{
|
|
||||||
ApplierType: "ApplierType",
|
|
||||||
ErrorType: "ErrorType",
|
|
||||||
StatusPollerType: "StatusPollerType",
|
|
||||||
WaitType: "WaitType",
|
|
||||||
}
|
|
||||||
|
|
||||||
var clusterctlOperationToString = map[ClusterctlOperation]string{
|
|
||||||
ClusterctlInitStart: "ClusterctlInitStart",
|
|
||||||
ClusterctlInitEnd: "ClusterctlInitEnd",
|
|
||||||
ClusterctlMoveStart: "ClusterctlMoveStart",
|
|
||||||
ClusterctlMoveEnd: "ClusterctlMoveEnd",
|
|
||||||
}
|
|
||||||
|
|
||||||
var bootstrapOperationToString = map[BootstrapOperation]string{
|
|
||||||
BootstrapStart: "BootstrapStart",
|
|
||||||
BootstrapDryRun: "BootstrapDryRun",
|
|
||||||
BootstrapValidation: "BootstrapValidation",
|
|
||||||
BootstrapRun: "BootstrapRun",
|
|
||||||
BootstrapEnd: "BootstrapEnd",
|
|
||||||
}
|
|
||||||
|
|
||||||
var genericContainerOperationToString = map[GenericContainerOperation]string{
|
|
||||||
GenericContainerStart: "GenericContainerStart",
|
|
||||||
GenericContainerStop: "GenericContainerStop",
|
|
||||||
}
|
|
||||||
|
|
||||||
var baremetalInventoryOperationToString = map[BaremetalManagerStep]string{
|
|
||||||
BaremetalManagerStart: "BaremetalOperationStart",
|
|
||||||
BaremetalManagerComplete: "BaremetalOperationComplete",
|
|
||||||
}
|
|
||||||
|
|
||||||
//Normalize cast Event to GenericEvent type
|
|
||||||
func Normalize(e Event) GenericEvent {
|
|
||||||
var eventType string
|
|
||||||
if t, exists := mapTypeToEvent[e.Type]; exists {
|
|
||||||
eventType = t
|
|
||||||
} else {
|
|
||||||
eventType = fmt.Sprintf("Unknown event type: %v", unknownEventType[e.Type])
|
|
||||||
}
|
|
||||||
|
|
||||||
var operation, message string
|
|
||||||
switch e.Type {
|
|
||||||
case ClusterctlType:
|
|
||||||
operation = clusterctlOperationToString[e.ClusterctlEvent.Operation]
|
|
||||||
message = e.ClusterctlEvent.Message
|
|
||||||
case BootstrapType:
|
|
||||||
operation = bootstrapOperationToString[e.BootstrapEvent.Operation]
|
|
||||||
message = e.BootstrapEvent.Message
|
|
||||||
case GenericContainerType:
|
|
||||||
operation = genericContainerOperationToString[e.GenericContainerEvent.Operation]
|
|
||||||
message = e.GenericContainerEvent.Message
|
|
||||||
case BaremetalManagerEventType:
|
|
||||||
operation = baremetalInventoryOperationToString[e.BaremetalManagerEvent.Step]
|
|
||||||
message = e.BaremetalManagerEvent.Message
|
|
||||||
}
|
|
||||||
|
|
||||||
return GenericEvent{
|
|
||||||
Type: eventType,
|
|
||||||
Operation: operation,
|
|
||||||
Message: message,
|
|
||||||
Timestamp: e.Timestamp,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewEvent create new event with timestamp
|
|
||||||
func NewEvent() Event {
|
|
||||||
return Event{
|
|
||||||
Timestamp: time.Now(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// ErrorEvent is produced when error is encountered
|
|
||||||
type ErrorEvent struct {
|
|
||||||
Error error
|
|
||||||
}
|
|
||||||
|
|
||||||
// WithErrorEvent sets type and actual error event
|
|
||||||
func (e Event) WithErrorEvent(event ErrorEvent) Event {
|
|
||||||
e.Type = ErrorType
|
|
||||||
e.ErrorEvent = event
|
|
||||||
return e
|
|
||||||
}
|
|
||||||
|
|
||||||
// ClusterctlOperation type
|
|
||||||
type ClusterctlOperation int
|
|
||||||
|
|
||||||
const (
|
|
||||||
// ClusterctlInitStart operation
|
|
||||||
ClusterctlInitStart ClusterctlOperation = iota
|
|
||||||
// ClusterctlInitEnd operation
|
|
||||||
ClusterctlInitEnd
|
|
||||||
// ClusterctlMoveStart operation
|
|
||||||
ClusterctlMoveStart
|
|
||||||
// ClusterctlMoveEnd operation
|
|
||||||
ClusterctlMoveEnd
|
|
||||||
)
|
|
||||||
|
|
||||||
// ClusterctlEvent is produced by clusterctl executor
|
|
||||||
type ClusterctlEvent struct {
|
|
||||||
Operation ClusterctlOperation
|
|
||||||
Message string
|
|
||||||
}
|
|
||||||
|
|
||||||
// WithClusterctlEvent sets type and actual clusterctl event
|
|
||||||
func (e Event) WithClusterctlEvent(concreteEvent ClusterctlEvent) Event {
|
|
||||||
e.Type = ClusterctlType
|
|
||||||
e.ClusterctlEvent = concreteEvent
|
|
||||||
return e
|
|
||||||
}
|
|
||||||
|
|
||||||
// BootstrapOperation type
|
|
||||||
type BootstrapOperation int
|
|
||||||
|
|
||||||
const (
|
|
||||||
// BootstrapStart operation
|
|
||||||
BootstrapStart BootstrapOperation = iota
|
|
||||||
// BootstrapDryRun operation
|
|
||||||
BootstrapDryRun
|
|
||||||
// BootstrapValidation operation
|
|
||||||
BootstrapValidation
|
|
||||||
// BootstrapRun operation
|
|
||||||
BootstrapRun
|
|
||||||
// BootstrapEnd operation
|
|
||||||
BootstrapEnd
|
|
||||||
)
|
|
||||||
|
|
||||||
// BootstrapEvent needs to to track events in bootstrap executor
|
|
||||||
type BootstrapEvent struct {
|
|
||||||
Operation BootstrapOperation
|
|
||||||
Message string
|
|
||||||
}
|
|
||||||
|
|
||||||
// WithBootstrapEvent sets type and actual bootstrap event
|
|
||||||
func (e Event) WithBootstrapEvent(concreteEvent BootstrapEvent) Event {
|
|
||||||
e.Type = BootstrapType
|
|
||||||
e.BootstrapEvent = concreteEvent
|
|
||||||
return e
|
|
||||||
}
|
|
||||||
|
|
||||||
// GenericContainerOperation type
|
|
||||||
type GenericContainerOperation int
|
|
||||||
|
|
||||||
const (
|
|
||||||
// GenericContainerStart operation
|
|
||||||
GenericContainerStart GenericContainerOperation = iota
|
|
||||||
// GenericContainerStop operation
|
|
||||||
GenericContainerStop
|
|
||||||
)
|
|
||||||
|
|
||||||
// GenericContainerEvent needs to to track events in GenericContainer executor
|
|
||||||
type GenericContainerEvent struct {
|
|
||||||
Operation GenericContainerOperation
|
|
||||||
Message string
|
|
||||||
}
|
|
||||||
|
|
||||||
// WithGenericContainerEvent sets type and actual GenericContainer event
|
|
||||||
func (e Event) WithGenericContainerEvent(concreteEvent GenericContainerEvent) Event {
|
|
||||||
e.Type = GenericContainerType
|
|
||||||
e.GenericContainerEvent = concreteEvent
|
|
||||||
return e
|
|
||||||
}
|
|
||||||
|
|
||||||
// BaremetalManagerStep indicates what operation baremetal manager is currently peforming
|
|
||||||
// Note that this is not baremetal
|
|
||||||
type BaremetalManagerStep int
|
|
||||||
|
|
||||||
const (
|
|
||||||
// BaremetalManagerStart operation
|
|
||||||
BaremetalManagerStart BaremetalManagerStep = iota
|
|
||||||
// BaremetalManagerComplete operation
|
|
||||||
BaremetalManagerComplete
|
|
||||||
)
|
|
||||||
|
|
||||||
// BaremetalManagerEvent event emitted by BaremetalManager
|
|
||||||
type BaremetalManagerEvent struct {
|
|
||||||
Step BaremetalManagerStep
|
|
||||||
// HostOperation indicates which operation is performed against BMH Host
|
|
||||||
HostOperation string
|
|
||||||
Message string
|
|
||||||
}
|
|
||||||
|
|
||||||
// WithBaremetalManagerEvent sets type and actual bootstrap event
|
|
||||||
func (e Event) WithBaremetalManagerEvent(concreteEvent BaremetalManagerEvent) Event {
|
|
||||||
e.Type = BaremetalManagerEventType
|
|
||||||
e.BaremetalManagerEvent = concreteEvent
|
|
||||||
return e
|
|
||||||
}
|
|
@ -1,61 +0,0 @@
|
|||||||
/*
|
|
||||||
Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
you may not use this file except in compliance with the License.
|
|
||||||
You may obtain a copy of the License at
|
|
||||||
|
|
||||||
https://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
|
|
||||||
Unless required by applicable law or agreed to in writing, software
|
|
||||||
distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
See the License for the specific language governing permissions and
|
|
||||||
limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package events_test
|
|
||||||
|
|
||||||
import (
|
|
||||||
"testing"
|
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
|
||||||
|
|
||||||
"opendev.org/airship/airshipctl/pkg/events"
|
|
||||||
)
|
|
||||||
|
|
||||||
func TestNormalize(t *testing.T) {
|
|
||||||
tests := []struct {
|
|
||||||
name string
|
|
||||||
sourceEvent events.Event
|
|
||||||
expectedEvent events.GenericEvent
|
|
||||||
}{
|
|
||||||
{
|
|
||||||
name: "Unknow event type",
|
|
||||||
sourceEvent: events.NewEvent().WithErrorEvent(events.ErrorEvent{}),
|
|
||||||
expectedEvent: events.GenericEvent{
|
|
||||||
Type: "Unknown event type: ErrorType",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "Clusterctl event type",
|
|
||||||
sourceEvent: events.NewEvent().WithClusterctlEvent(events.ClusterctlEvent{
|
|
||||||
Operation: events.ClusterctlInitStart,
|
|
||||||
Message: "Clusterctl init start",
|
|
||||||
}),
|
|
||||||
expectedEvent: events.GenericEvent{
|
|
||||||
Type: "ClusterctlEvent",
|
|
||||||
Message: "Clusterctl init start",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, tt := range tests {
|
|
||||||
tt := tt
|
|
||||||
t.Run(tt.name, func(t *testing.T) {
|
|
||||||
ge := events.Normalize(tt.sourceEvent)
|
|
||||||
assert.Equal(t, tt.expectedEvent.Type, ge.Type)
|
|
||||||
if tt.expectedEvent.Type != "Unknown event type: ErrorType" {
|
|
||||||
assert.Equal(t, tt.expectedEvent.Message, ge.Message)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,70 +0,0 @@
|
|||||||
/*
|
|
||||||
Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
you may not use this file except in compliance with the License.
|
|
||||||
You may obtain a copy of the License at
|
|
||||||
|
|
||||||
https://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
|
|
||||||
Unless required by applicable law or agreed to in writing, software
|
|
||||||
distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
See the License for the specific language governing permissions and
|
|
||||||
limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package events
|
|
||||||
|
|
||||||
import (
|
|
||||||
"encoding/json"
|
|
||||||
"io"
|
|
||||||
|
|
||||||
"sigs.k8s.io/yaml"
|
|
||||||
|
|
||||||
"opendev.org/airship/airshipctl/pkg/log"
|
|
||||||
)
|
|
||||||
|
|
||||||
const (
|
|
||||||
// YAMLPrinter format for event printer output
|
|
||||||
YAMLPrinter = "yaml"
|
|
||||||
// JSONPrinter format for event printer output
|
|
||||||
JSONPrinter = "json"
|
|
||||||
)
|
|
||||||
|
|
||||||
// NewGenericPrinter returns event printer
|
|
||||||
func NewGenericPrinter(writer io.Writer, formatterType string) GenericPrinter {
|
|
||||||
var formatter func(o interface{}) ([]byte, error)
|
|
||||||
switch formatterType {
|
|
||||||
case YAMLPrinter:
|
|
||||||
formatter = yaml.Marshal
|
|
||||||
case JSONPrinter:
|
|
||||||
formatter = json.Marshal
|
|
||||||
default:
|
|
||||||
log.Fatal("Event printer received wrong type of event formatter")
|
|
||||||
}
|
|
||||||
return GenericPrinter{
|
|
||||||
formatter: formatter,
|
|
||||||
writer: writer,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// GenericPrinter object represents event printer
|
|
||||||
type GenericPrinter struct {
|
|
||||||
formatter func(interface{}) ([]byte, error)
|
|
||||||
writer io.Writer
|
|
||||||
}
|
|
||||||
|
|
||||||
// PrintEvent write event details
|
|
||||||
func (p GenericPrinter) PrintEvent(ge GenericEvent) error {
|
|
||||||
data, err := p.formatter(map[string]interface{}{
|
|
||||||
"Type": ge.Type,
|
|
||||||
"Operation": ge.Operation,
|
|
||||||
"Message": ge.Message,
|
|
||||||
"Timestamp": ge.Timestamp,
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
data = append(data, '\n')
|
|
||||||
_, err = p.writer.Write(data)
|
|
||||||
return err
|
|
||||||
}
|
|
@ -1,69 +0,0 @@
|
|||||||
/*
|
|
||||||
Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
you may not use this file except in compliance with the License.
|
|
||||||
You may obtain a copy of the License at
|
|
||||||
https://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
Unless required by applicable law or agreed to in writing, software
|
|
||||||
distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
See the License for the specific language governing permissions and
|
|
||||||
limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package events_test
|
|
||||||
|
|
||||||
import (
|
|
||||||
"fmt"
|
|
||||||
"io"
|
|
||||||
"testing"
|
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
|
||||||
"github.com/stretchr/testify/require"
|
|
||||||
|
|
||||||
"opendev.org/airship/airshipctl/pkg/events"
|
|
||||||
)
|
|
||||||
|
|
||||||
type fakeWriter struct {
|
|
||||||
writeErr error
|
|
||||||
}
|
|
||||||
|
|
||||||
var _ io.Writer = fakeWriter{}
|
|
||||||
|
|
||||||
func (f fakeWriter) Write(p []byte) (n int, err error) {
|
|
||||||
return 0, f.writeErr
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestPrintEvent(t *testing.T) {
|
|
||||||
tests := []struct {
|
|
||||||
name string
|
|
||||||
formatterType string
|
|
||||||
errString string
|
|
||||||
writer io.Writer
|
|
||||||
}{
|
|
||||||
{
|
|
||||||
name: "Fail on formatter type",
|
|
||||||
formatterType: events.YAMLPrinter,
|
|
||||||
errString: "error on write",
|
|
||||||
writer: fakeWriter{
|
|
||||||
writeErr: fmt.Errorf("error on write"),
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, tt := range tests {
|
|
||||||
tt := tt
|
|
||||||
t.Run(tt.name, func(t *testing.T) {
|
|
||||||
p := events.NewGenericPrinter(tt.writer, tt.formatterType)
|
|
||||||
e := events.NewEvent().WithGenericContainerEvent(events.GenericContainerEvent{
|
|
||||||
Operation: events.GenericContainerStart,
|
|
||||||
Message: "starting generic container generation",
|
|
||||||
})
|
|
||||||
ge := events.Normalize(e)
|
|
||||||
err := p.PrintEvent(ge)
|
|
||||||
if tt.errString != "" {
|
|
||||||
require.Error(t, err)
|
|
||||||
assert.Contains(t, err.Error(), tt.errString)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,78 +0,0 @@
|
|||||||
/*
|
|
||||||
Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
you may not use this file except in compliance with the License.
|
|
||||||
You may obtain a copy of the License at
|
|
||||||
|
|
||||||
https://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
|
|
||||||
Unless required by applicable law or agreed to in writing, software
|
|
||||||
distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
See the License for the specific language governing permissions and
|
|
||||||
limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package events
|
|
||||||
|
|
||||||
import (
|
|
||||||
"opendev.org/airship/airshipctl/pkg/log"
|
|
||||||
)
|
|
||||||
|
|
||||||
// EventProcessor use to process event channels produced by executors
|
|
||||||
type EventProcessor interface {
|
|
||||||
Process(<-chan Event) error
|
|
||||||
Close()
|
|
||||||
}
|
|
||||||
|
|
||||||
// DefaultProcessor is implementation of EventProcessor
|
|
||||||
type DefaultProcessor struct {
|
|
||||||
errors []error
|
|
||||||
genericPrinter GenericPrinter
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewDefaultProcessor returns instance of DefaultProcessor as interface Implementation
|
|
||||||
func NewDefaultProcessor() EventProcessor {
|
|
||||||
// printer for custom airshipctl events
|
|
||||||
genericPrinter := NewGenericPrinter(log.Writer(), JSONPrinter)
|
|
||||||
return &DefaultProcessor{
|
|
||||||
errors: []error{},
|
|
||||||
genericPrinter: genericPrinter,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Process is implementation of EventProcessor
|
|
||||||
func (p *DefaultProcessor) Process(ch <-chan Event) error {
|
|
||||||
for e := range ch {
|
|
||||||
switch e.Type {
|
|
||||||
case ErrorType:
|
|
||||||
log.Printf("Received error on event channel %v", e.ErrorEvent)
|
|
||||||
p.errors = append(p.errors, e.ErrorEvent.Error)
|
|
||||||
case StatusPollerType:
|
|
||||||
log.Fatalf("Processing for status poller events are not yet implemented")
|
|
||||||
case WaitType:
|
|
||||||
log.Fatalf("Processing for wait events are not yet implemented")
|
|
||||||
default:
|
|
||||||
ge := Normalize(e)
|
|
||||||
err := p.genericPrinter.PrintEvent(ge)
|
|
||||||
if err != nil {
|
|
||||||
p.errors = append(p.errors, err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return checkErrors(p.errors)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Close cleans up the auxiliary channels used to process events
|
|
||||||
func (p *DefaultProcessor) Close() {
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check list of errors, and verify that these errors we are able to tolerate
|
|
||||||
// currently we simply check if the list is empty or not
|
|
||||||
func checkErrors(errs []error) error {
|
|
||||||
if len(errs) != 0 {
|
|
||||||
return ErrEventReceived{
|
|
||||||
Errors: errs,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
@ -1,104 +0,0 @@
|
|||||||
/*
|
|
||||||
Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
you may not use this file except in compliance with the License.
|
|
||||||
You may obtain a copy of the License at
|
|
||||||
|
|
||||||
https://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
|
|
||||||
Unless required by applicable law or agreed to in writing, software
|
|
||||||
distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
See the License for the specific language governing permissions and
|
|
||||||
limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package events_test
|
|
||||||
|
|
||||||
import (
|
|
||||||
"fmt"
|
|
||||||
"testing"
|
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
|
||||||
"github.com/stretchr/testify/require"
|
|
||||||
|
|
||||||
"opendev.org/airship/airshipctl/pkg/events"
|
|
||||||
k8stest "opendev.org/airship/airshipctl/testutil/k8sutils"
|
|
||||||
)
|
|
||||||
|
|
||||||
func TestDefaultProcessor(t *testing.T) {
|
|
||||||
proc := events.NewDefaultProcessor()
|
|
||||||
defer proc.Close()
|
|
||||||
tests := []struct {
|
|
||||||
name string
|
|
||||||
events []events.Event
|
|
||||||
errString string
|
|
||||||
}{
|
|
||||||
{
|
|
||||||
name: "success",
|
|
||||||
events: successEvents(),
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "error event",
|
|
||||||
events: errEvents(),
|
|
||||||
errString: "somerror",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "error event",
|
|
||||||
events: errApplyEvents(),
|
|
||||||
errString: "somerror",
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, tt := range tests {
|
|
||||||
tt := tt
|
|
||||||
t.Run(tt.name, func(t *testing.T) {
|
|
||||||
ch := make(chan events.Event, len(tt.events))
|
|
||||||
for _, e := range tt.events {
|
|
||||||
ch <- e
|
|
||||||
}
|
|
||||||
close(ch)
|
|
||||||
err := proc.Process(ch)
|
|
||||||
if tt.errString != "" {
|
|
||||||
require.Error(t, err)
|
|
||||||
assert.Contains(t, err.Error(), tt.errString)
|
|
||||||
} else {
|
|
||||||
assert.NoError(t, err)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func successEvents() []events.Event {
|
|
||||||
applyEvents := k8stest.SuccessEvents()
|
|
||||||
airEvents := []events.Event{}
|
|
||||||
for _, e := range applyEvents {
|
|
||||||
airEvents = append(airEvents, events.Event{
|
|
||||||
Type: events.ApplierType,
|
|
||||||
ApplierEvent: e,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
return airEvents
|
|
||||||
}
|
|
||||||
|
|
||||||
func errEvents() []events.Event {
|
|
||||||
return []events.Event{
|
|
||||||
{
|
|
||||||
Type: events.ErrorType,
|
|
||||||
ErrorEvent: events.ErrorEvent{
|
|
||||||
Error: fmt.Errorf("somerror"),
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func errApplyEvents() []events.Event {
|
|
||||||
errApplyEvents := k8stest.ErrorEvents()
|
|
||||||
airEvents := []events.Event{}
|
|
||||||
for _, e := range errApplyEvents {
|
|
||||||
airEvents = append(airEvents, events.Event{
|
|
||||||
Type: events.ApplierType,
|
|
||||||
ApplierEvent: e,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
return airEvents
|
|
||||||
}
|
|
@ -26,7 +26,6 @@ import (
|
|||||||
"opendev.org/airship/airshipctl/pkg/api/v1alpha1"
|
"opendev.org/airship/airshipctl/pkg/api/v1alpha1"
|
||||||
"opendev.org/airship/airshipctl/pkg/container"
|
"opendev.org/airship/airshipctl/pkg/container"
|
||||||
"opendev.org/airship/airshipctl/pkg/document"
|
"opendev.org/airship/airshipctl/pkg/document"
|
||||||
"opendev.org/airship/airshipctl/pkg/events"
|
|
||||||
"opendev.org/airship/airshipctl/pkg/k8s/kubeconfig"
|
"opendev.org/airship/airshipctl/pkg/k8s/kubeconfig"
|
||||||
"opendev.org/airship/airshipctl/pkg/log"
|
"opendev.org/airship/airshipctl/pkg/log"
|
||||||
"opendev.org/airship/airshipctl/pkg/phase/errors"
|
"opendev.org/airship/airshipctl/pkg/phase/errors"
|
||||||
@ -56,10 +55,9 @@ var _ ifc.Phase = &phase{}
|
|||||||
|
|
||||||
// Phase implements phase interface
|
// Phase implements phase interface
|
||||||
type phase struct {
|
type phase struct {
|
||||||
helper ifc.Helper
|
helper ifc.Helper
|
||||||
apiObj *v1alpha1.Phase
|
apiObj *v1alpha1.Phase
|
||||||
registry ExecutorRegistry
|
registry ExecutorRegistry
|
||||||
processor events.EventProcessor
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Executor returns executor interface associated with the phase
|
// Executor returns executor interface associated with the phase
|
||||||
@ -111,17 +109,12 @@ func (p *phase) Executor() (ifc.Executor, error) {
|
|||||||
|
|
||||||
// Run runs the phase via executor
|
// Run runs the phase via executor
|
||||||
func (p *phase) Run(ro ifc.RunOptions) error {
|
func (p *phase) Run(ro ifc.RunOptions) error {
|
||||||
defer p.processor.Close()
|
|
||||||
executor, err := p.Executor()
|
executor, err := p.Executor()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
ch := make(chan events.Event)
|
|
||||||
|
|
||||||
go func() {
|
return executor.Run(ro)
|
||||||
executor.Run(ch, ro)
|
|
||||||
}()
|
|
||||||
return p.processor.Process(ch)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Validate makes sure that phase is properly configured
|
// Validate makes sure that phase is properly configured
|
||||||
@ -304,13 +297,9 @@ var _ ifc.Client = &client{}
|
|||||||
type client struct {
|
type client struct {
|
||||||
ifc.Helper
|
ifc.Helper
|
||||||
|
|
||||||
registry ExecutorRegistry
|
registry ExecutorRegistry
|
||||||
processorFunc ProcessorFunc
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// ProcessorFunc that returns processor interface
|
|
||||||
type ProcessorFunc func() events.EventProcessor
|
|
||||||
|
|
||||||
// Option allows to add various options to a phase
|
// Option allows to add various options to a phase
|
||||||
type Option func(*client)
|
type Option func(*client)
|
||||||
|
|
||||||
@ -330,9 +319,6 @@ func NewClient(helper ifc.Helper, opts ...Option) ifc.Client {
|
|||||||
if c.registry == nil {
|
if c.registry == nil {
|
||||||
c.registry = DefaultExecutorRegistry
|
c.registry = DefaultExecutorRegistry
|
||||||
}
|
}
|
||||||
if c.processorFunc == nil {
|
|
||||||
c.processorFunc = defaultProcessor
|
|
||||||
}
|
|
||||||
return c
|
return c
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -343,10 +329,9 @@ func (c *client) PhaseByID(id ifc.ID) (ifc.Phase, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
phase := &phase{
|
phase := &phase{
|
||||||
apiObj: phaseObj,
|
apiObj: phaseObj,
|
||||||
helper: c.Helper,
|
helper: c.Helper,
|
||||||
processor: c.processorFunc(),
|
registry: c.registry,
|
||||||
registry: c.registry,
|
|
||||||
}
|
}
|
||||||
return phase, nil
|
return phase, nil
|
||||||
}
|
}
|
||||||
@ -365,14 +350,9 @@ func (c *client) PlanByID(id ifc.ID) (ifc.Plan, error) {
|
|||||||
|
|
||||||
func (c *client) PhaseByAPIObj(phaseObj *v1alpha1.Phase) (ifc.Phase, error) {
|
func (c *client) PhaseByAPIObj(phaseObj *v1alpha1.Phase) (ifc.Phase, error) {
|
||||||
phase := &phase{
|
phase := &phase{
|
||||||
apiObj: phaseObj,
|
apiObj: phaseObj,
|
||||||
helper: c.Helper,
|
helper: c.Helper,
|
||||||
processor: c.processorFunc(),
|
registry: c.registry,
|
||||||
registry: c.registry,
|
|
||||||
}
|
}
|
||||||
return phase, nil
|
return phase, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func defaultProcessor() events.EventProcessor {
|
|
||||||
return events.NewDefaultProcessor()
|
|
||||||
}
|
|
||||||
|
@ -25,7 +25,6 @@ import (
|
|||||||
|
|
||||||
"opendev.org/airship/airshipctl/pkg/api/v1alpha1"
|
"opendev.org/airship/airshipctl/pkg/api/v1alpha1"
|
||||||
"opendev.org/airship/airshipctl/pkg/config"
|
"opendev.org/airship/airshipctl/pkg/config"
|
||||||
"opendev.org/airship/airshipctl/pkg/events"
|
|
||||||
"opendev.org/airship/airshipctl/pkg/phase"
|
"opendev.org/airship/airshipctl/pkg/phase"
|
||||||
"opendev.org/airship/airshipctl/pkg/phase/errors"
|
"opendev.org/airship/airshipctl/pkg/phase/errors"
|
||||||
"opendev.org/airship/airshipctl/pkg/phase/ifc"
|
"opendev.org/airship/airshipctl/pkg/phase/ifc"
|
||||||
@ -514,8 +513,8 @@ func (e fakeExecutor) Render(_ io.Writer, _ ifc.RenderOptions) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e fakeExecutor) Run(ch chan events.Event, _ ifc.RunOptions) {
|
func (e fakeExecutor) Run(_ ifc.RunOptions) error {
|
||||||
defer close(ch)
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e fakeExecutor) Validate() error {
|
func (e fakeExecutor) Validate() error {
|
||||||
|
@ -22,9 +22,9 @@ import (
|
|||||||
"opendev.org/airship/airshipctl/pkg/api/v1alpha1"
|
"opendev.org/airship/airshipctl/pkg/api/v1alpha1"
|
||||||
airshipv1 "opendev.org/airship/airshipctl/pkg/api/v1alpha1"
|
airshipv1 "opendev.org/airship/airshipctl/pkg/api/v1alpha1"
|
||||||
commonerrors "opendev.org/airship/airshipctl/pkg/errors"
|
commonerrors "opendev.org/airship/airshipctl/pkg/errors"
|
||||||
"opendev.org/airship/airshipctl/pkg/events"
|
|
||||||
"opendev.org/airship/airshipctl/pkg/inventory"
|
"opendev.org/airship/airshipctl/pkg/inventory"
|
||||||
inventoryifc "opendev.org/airship/airshipctl/pkg/inventory/ifc"
|
inventoryifc "opendev.org/airship/airshipctl/pkg/inventory/ifc"
|
||||||
|
"opendev.org/airship/airshipctl/pkg/log"
|
||||||
"opendev.org/airship/airshipctl/pkg/phase/executors/errors"
|
"opendev.org/airship/airshipctl/pkg/phase/executors/errors"
|
||||||
"opendev.org/airship/airshipctl/pkg/phase/ifc"
|
"opendev.org/airship/airshipctl/pkg/phase/ifc"
|
||||||
)
|
)
|
||||||
@ -48,21 +48,14 @@ func NewBaremetalExecutor(cfg ifc.ExecutorConfig) (ifc.Executor, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Run runs baremetal operations as executor
|
// Run runs baremetal operations as executor
|
||||||
func (e *BaremetalManagerExecutor) Run(evtCh chan events.Event, opts ifc.RunOptions) {
|
func (e *BaremetalManagerExecutor) Run(opts ifc.RunOptions) error {
|
||||||
defer close(evtCh)
|
|
||||||
commandOptions := toCommandOptions(e.inventory, e.options.Spec, opts)
|
commandOptions := toCommandOptions(e.inventory, e.options.Spec, opts)
|
||||||
|
|
||||||
evtCh <- events.NewEvent().WithBaremetalManagerEvent(events.BaremetalManagerEvent{
|
log.Print(fmt.Sprintf("Starting remote operation '%s', selector to be to filter hosts %v",
|
||||||
Step: events.BaremetalManagerStart,
|
e.options.Spec.Operation, e.options.Spec.HostSelector))
|
||||||
HostOperation: string(e.options.Spec.Operation),
|
|
||||||
Message: fmt.Sprintf("Starting remote operation '%s', selector to be to filter hosts %v",
|
|
||||||
e.options.Spec.Operation, e.options.Spec.HostSelector),
|
|
||||||
})
|
|
||||||
|
|
||||||
op, err := e.validate()
|
op, err := e.validate()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
handleError(evtCh, err)
|
return err
|
||||||
return
|
|
||||||
}
|
}
|
||||||
if !opts.DryRun {
|
if !opts.DryRun {
|
||||||
switch e.options.Spec.Operation {
|
switch e.options.Spec.Operation {
|
||||||
@ -75,16 +68,12 @@ func (e *BaremetalManagerExecutor) Run(evtCh chan events.Event, opts ifc.RunOpti
|
|||||||
}
|
}
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
handleError(evtCh, err)
|
return err
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
|
||||||
evtCh <- events.NewEvent().WithBaremetalManagerEvent(events.BaremetalManagerEvent{
|
log.Print(fmt.Sprintf("Successfully completed operation against host selected by selector %v",
|
||||||
Step: events.BaremetalManagerComplete,
|
e.options.Spec.HostSelector))
|
||||||
HostOperation: string(e.options.Spec.Operation),
|
return nil
|
||||||
Message: fmt.Sprintf("Successfully completed operation against host selected by selector %v",
|
|
||||||
e.options.Spec.HostSelector),
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Validate executor configuration and documents
|
// Validate executor configuration and documents
|
||||||
|
@ -26,7 +26,6 @@ import (
|
|||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
"opendev.org/airship/airshipctl/pkg/document"
|
"opendev.org/airship/airshipctl/pkg/document"
|
||||||
"opendev.org/airship/airshipctl/pkg/events"
|
|
||||||
inventoryifc "opendev.org/airship/airshipctl/pkg/inventory/ifc"
|
inventoryifc "opendev.org/airship/airshipctl/pkg/inventory/ifc"
|
||||||
"opendev.org/airship/airshipctl/pkg/phase/executors"
|
"opendev.org/airship/airshipctl/pkg/phase/executors"
|
||||||
"opendev.org/airship/airshipctl/pkg/phase/ifc"
|
"opendev.org/airship/airshipctl/pkg/phase/ifc"
|
||||||
@ -146,13 +145,7 @@ func TestBMHExecutorRun(t *testing.T) {
|
|||||||
})
|
})
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.NotNil(t, executor)
|
require.NotNil(t, executor)
|
||||||
ch := make(chan events.Event)
|
err = executor.Run(tt.runOptions)
|
||||||
go func() {
|
|
||||||
executor.Run(ch, tt.runOptions)
|
|
||||||
}()
|
|
||||||
processor := events.NewDefaultProcessor()
|
|
||||||
defer processor.Close()
|
|
||||||
err = processor.Process(ch)
|
|
||||||
if tt.expectedErr != "" {
|
if tt.expectedErr != "" {
|
||||||
require.Error(t, err)
|
require.Error(t, err)
|
||||||
assert.Contains(t, err.Error(), tt.expectedErr)
|
assert.Contains(t, err.Error(), tt.expectedErr)
|
||||||
|
@ -30,7 +30,6 @@ import (
|
|||||||
"opendev.org/airship/airshipctl/pkg/container"
|
"opendev.org/airship/airshipctl/pkg/container"
|
||||||
"opendev.org/airship/airshipctl/pkg/document"
|
"opendev.org/airship/airshipctl/pkg/document"
|
||||||
airerrors "opendev.org/airship/airshipctl/pkg/errors"
|
airerrors "opendev.org/airship/airshipctl/pkg/errors"
|
||||||
"opendev.org/airship/airshipctl/pkg/events"
|
|
||||||
"opendev.org/airship/airshipctl/pkg/k8s/kubeconfig"
|
"opendev.org/airship/airshipctl/pkg/k8s/kubeconfig"
|
||||||
"opendev.org/airship/airshipctl/pkg/log"
|
"opendev.org/airship/airshipctl/pkg/log"
|
||||||
phaseerrors "opendev.org/airship/airshipctl/pkg/phase/errors"
|
phaseerrors "opendev.org/airship/airshipctl/pkg/phase/errors"
|
||||||
@ -152,9 +151,7 @@ func initRepoData(c *airshipv1.Clusterctl, o *airshipv1.ClusterctlOptions, targe
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Run clusterctl init as a phase runner
|
// Run clusterctl init as a phase runner
|
||||||
func (c *ClusterctlExecutor) Run(evtCh chan events.Event, opts ifc.RunOptions) {
|
func (c *ClusterctlExecutor) Run(opts ifc.RunOptions) error {
|
||||||
defer close(evtCh)
|
|
||||||
|
|
||||||
if log.DebugEnabled() {
|
if log.DebugEnabled() {
|
||||||
c.cctlOpts.CmdOptions = append(c.cctlOpts.CmdOptions, "-v5")
|
c.cctlOpts.CmdOptions = append(c.cctlOpts.CmdOptions, "-v5")
|
||||||
}
|
}
|
||||||
@ -170,16 +167,16 @@ func (c *ClusterctlExecutor) Run(evtCh chan events.Event, opts ifc.RunOptions) {
|
|||||||
var err error
|
var err error
|
||||||
c.cctlOpts.Config, err = yaml.Marshal(cctlConfig)
|
c.cctlOpts.Config, err = yaml.Marshal(cctlConfig)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
handleError(evtCh, err)
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
switch c.options.Action {
|
switch c.options.Action {
|
||||||
case airshipv1.Init:
|
case airshipv1.Init:
|
||||||
c.init(evtCh)
|
return c.init()
|
||||||
case airshipv1.Move:
|
case airshipv1.Move:
|
||||||
c.move(opts.DryRun, evtCh)
|
return c.move(opts.DryRun)
|
||||||
default:
|
default:
|
||||||
handleError(evtCh, errors.ErrUnknownExecutorAction{Action: string(c.options.Action), ExecutorName: "clusterctl"})
|
return errors.ErrUnknownExecutorAction{Action: string(c.options.Action), ExecutorName: "clusterctl"}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -213,16 +210,12 @@ func (c *ClusterctlExecutor) getKubeconfig() (string, string, func(), error) {
|
|||||||
return kubeConfigFile, context, cleanup, nil
|
return kubeConfigFile, context, cleanup, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *ClusterctlExecutor) init(evtCh chan events.Event) {
|
func (c *ClusterctlExecutor) init() error {
|
||||||
evtCh <- events.NewEvent().WithClusterctlEvent(events.ClusterctlEvent{
|
log.Print("starting clusterctl init executor")
|
||||||
Operation: events.ClusterctlInitStart,
|
|
||||||
Message: "starting clusterctl init executor",
|
|
||||||
})
|
|
||||||
|
|
||||||
kubecfg, context, cleanup, err := c.getKubeconfig()
|
kubecfg, context, cleanup, err := c.getKubeconfig()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
handleError(evtCh, err)
|
return err
|
||||||
return
|
|
||||||
}
|
}
|
||||||
defer cleanup()
|
defer cleanup()
|
||||||
|
|
||||||
@ -245,38 +238,29 @@ func (c *ClusterctlExecutor) init(evtCh chan events.Event) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if err = c.run(); err != nil {
|
if err = c.run(); err != nil {
|
||||||
handleError(evtCh, err)
|
return err
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
|
||||||
eventMsg := "clusterctl init completed successfully"
|
log.Print("clusterctl init completed successfully")
|
||||||
evtCh <- events.NewEvent().WithClusterctlEvent(events.ClusterctlEvent{
|
return nil
|
||||||
Operation: events.ClusterctlInitEnd,
|
|
||||||
Message: eventMsg,
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *ClusterctlExecutor) move(dryRun bool, evtCh chan events.Event) {
|
func (c *ClusterctlExecutor) move(dryRun bool) error {
|
||||||
evtCh <- events.NewEvent().WithClusterctlEvent(events.ClusterctlEvent{
|
log.Print("starting clusterctl move executor")
|
||||||
Operation: events.ClusterctlMoveStart,
|
|
||||||
Message: "starting clusterctl move executor",
|
|
||||||
})
|
|
||||||
kubecfg, context, cleanup, err := c.getKubeconfig()
|
kubecfg, context, cleanup, err := c.getKubeconfig()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
handleError(evtCh, err)
|
return err
|
||||||
return
|
|
||||||
}
|
}
|
||||||
defer cleanup()
|
defer cleanup()
|
||||||
|
|
||||||
fromCluster, err := c.clusterMap.ParentCluster(c.clusterName)
|
fromCluster, err := c.clusterMap.ParentCluster(c.clusterName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
handleError(evtCh, err)
|
return err
|
||||||
return
|
|
||||||
}
|
}
|
||||||
fromContext, err := c.clusterMap.ClusterKubeconfigContext(fromCluster)
|
fromContext, err := c.clusterMap.ClusterKubeconfigContext(fromCluster)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
handleError(evtCh, err)
|
return err
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
|
||||||
c.cctlOpts.CmdOptions = append(
|
c.cctlOpts.CmdOptions = append(
|
||||||
@ -297,15 +281,11 @@ func (c *ClusterctlExecutor) move(dryRun bool, evtCh chan events.Event) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if err = c.run(); err != nil {
|
if err = c.run(); err != nil {
|
||||||
handleError(evtCh, err)
|
return err
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
|
||||||
eventMsg := "clusterctl move completed successfully"
|
log.Print("clusterctl move completed successfully")
|
||||||
evtCh <- events.NewEvent().WithClusterctlEvent(events.ClusterctlEvent{
|
return nil
|
||||||
Operation: events.ClusterctlMoveEnd,
|
|
||||||
Message: eventMsg,
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Validate executor configuration and documents
|
// Validate executor configuration and documents
|
||||||
|
@ -20,7 +20,6 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
@ -29,7 +28,6 @@ import (
|
|||||||
"opendev.org/airship/airshipctl/pkg/cluster/clustermap"
|
"opendev.org/airship/airshipctl/pkg/cluster/clustermap"
|
||||||
"opendev.org/airship/airshipctl/pkg/container"
|
"opendev.org/airship/airshipctl/pkg/container"
|
||||||
"opendev.org/airship/airshipctl/pkg/document"
|
"opendev.org/airship/airshipctl/pkg/document"
|
||||||
"opendev.org/airship/airshipctl/pkg/events"
|
|
||||||
"opendev.org/airship/airshipctl/pkg/k8s/kubeconfig"
|
"opendev.org/airship/airshipctl/pkg/k8s/kubeconfig"
|
||||||
"opendev.org/airship/airshipctl/pkg/phase/executors"
|
"opendev.org/airship/airshipctl/pkg/phase/executors"
|
||||||
"opendev.org/airship/airshipctl/pkg/phase/executors/errors"
|
"opendev.org/airship/airshipctl/pkg/phase/executors/errors"
|
||||||
@ -248,17 +246,15 @@ func TestClusterctlExecutorRun(t *testing.T) {
|
|||||||
name string
|
name string
|
||||||
cfgDoc document.Document
|
cfgDoc document.Document
|
||||||
kubecfg kubeconfig.Interface
|
kubecfg kubeconfig.Interface
|
||||||
expectedEvt []events.Event
|
expectedErr error
|
||||||
clusterMap clustermap.ClusterMap
|
clusterMap clustermap.ClusterMap
|
||||||
clientFunc container.ClientV1Alpha1FactoryFunc
|
clientFunc container.ClientV1Alpha1FactoryFunc
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
name: "Error unknown action",
|
name: "Error unknown action",
|
||||||
cfgDoc: executorDoc(t, fmt.Sprintf(executorConfigTmplGood, "someAction")),
|
cfgDoc: executorDoc(t, fmt.Sprintf(executorConfigTmplGood, "someAction")),
|
||||||
expectedEvt: []events.Event{
|
expectedErr: errors.ErrUnknownExecutorAction{Action: "someAction", ExecutorName: "clusterctl"},
|
||||||
wrapError(errors.ErrUnknownExecutorAction{Action: "someAction", ExecutorName: "clusterctl"}),
|
clusterMap: clustermap.NewClusterMap(v1alpha1.DefaultClusterMap()),
|
||||||
},
|
|
||||||
clusterMap: clustermap.NewClusterMap(v1alpha1.DefaultClusterMap()),
|
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "Failed get kubeconfig file - init",
|
name: "Failed get kubeconfig file - init",
|
||||||
@ -266,13 +262,8 @@ func TestClusterctlExecutorRun(t *testing.T) {
|
|||||||
kubecfg: fakeKubeConfig{getFile: func() (string, kubeconfig.Cleanup, error) {
|
kubecfg: fakeKubeConfig{getFile: func() (string, kubeconfig.Cleanup, error) {
|
||||||
return "", nil, errTmpFile
|
return "", nil, errTmpFile
|
||||||
}},
|
}},
|
||||||
expectedEvt: []events.Event{
|
expectedErr: errTmpFile,
|
||||||
events.NewEvent().WithClusterctlEvent(events.ClusterctlEvent{
|
clusterMap: clustermap.NewClusterMap(v1alpha1.DefaultClusterMap()),
|
||||||
Operation: events.ClusterctlInitStart,
|
|
||||||
}),
|
|
||||||
wrapError(errTmpFile),
|
|
||||||
},
|
|
||||||
clusterMap: clustermap.NewClusterMap(v1alpha1.DefaultClusterMap()),
|
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "Failed get kubeconfig file - move",
|
name: "Failed get kubeconfig file - move",
|
||||||
@ -280,13 +271,8 @@ func TestClusterctlExecutorRun(t *testing.T) {
|
|||||||
kubecfg: fakeKubeConfig{getFile: func() (string, kubeconfig.Cleanup, error) {
|
kubecfg: fakeKubeConfig{getFile: func() (string, kubeconfig.Cleanup, error) {
|
||||||
return "", nil, errTmpFile
|
return "", nil, errTmpFile
|
||||||
}},
|
}},
|
||||||
expectedEvt: []events.Event{
|
expectedErr: errTmpFile,
|
||||||
events.NewEvent().WithClusterctlEvent(events.ClusterctlEvent{
|
clusterMap: clustermap.NewClusterMap(v1alpha1.DefaultClusterMap()),
|
||||||
Operation: events.ClusterctlMoveStart,
|
|
||||||
}),
|
|
||||||
wrapError(errTmpFile),
|
|
||||||
},
|
|
||||||
clusterMap: clustermap.NewClusterMap(v1alpha1.DefaultClusterMap()),
|
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "Failed get kubeconfig context - init",
|
name: "Failed get kubeconfig context - init",
|
||||||
@ -294,12 +280,7 @@ func TestClusterctlExecutorRun(t *testing.T) {
|
|||||||
kubecfg: fakeKubeConfig{getFile: func() (string, kubeconfig.Cleanup, error) {
|
kubecfg: fakeKubeConfig{getFile: func() (string, kubeconfig.Cleanup, error) {
|
||||||
return "", func() {}, nil
|
return "", func() {}, nil
|
||||||
}},
|
}},
|
||||||
expectedEvt: []events.Event{
|
expectedErr: errCtx,
|
||||||
events.NewEvent().WithClusterctlEvent(events.ClusterctlEvent{
|
|
||||||
Operation: events.ClusterctlInitStart,
|
|
||||||
}),
|
|
||||||
wrapError(errCtx),
|
|
||||||
},
|
|
||||||
clusterMap: ClusterMapMockInterface{MockClusterKubeconfigContext: func(s string) (string, error) {
|
clusterMap: ClusterMapMockInterface{MockClusterKubeconfigContext: func(s string) (string, error) {
|
||||||
return "", errCtx
|
return "", errCtx
|
||||||
}},
|
}},
|
||||||
@ -310,12 +291,7 @@ func TestClusterctlExecutorRun(t *testing.T) {
|
|||||||
kubecfg: fakeKubeConfig{getFile: func() (string, kubeconfig.Cleanup, error) {
|
kubecfg: fakeKubeConfig{getFile: func() (string, kubeconfig.Cleanup, error) {
|
||||||
return "", func() {}, nil
|
return "", func() {}, nil
|
||||||
}},
|
}},
|
||||||
expectedEvt: []events.Event{
|
expectedErr: errCtx,
|
||||||
events.NewEvent().WithClusterctlEvent(events.ClusterctlEvent{
|
|
||||||
Operation: events.ClusterctlMoveStart,
|
|
||||||
}),
|
|
||||||
wrapError(errCtx),
|
|
||||||
},
|
|
||||||
clusterMap: ClusterMapMockInterface{MockClusterKubeconfigContext: func(s string) (string, error) {
|
clusterMap: ClusterMapMockInterface{MockClusterKubeconfigContext: func(s string) (string, error) {
|
||||||
return "", errCtx
|
return "", errCtx
|
||||||
}},
|
}},
|
||||||
@ -326,12 +302,7 @@ func TestClusterctlExecutorRun(t *testing.T) {
|
|||||||
kubecfg: fakeKubeConfig{getFile: func() (string, kubeconfig.Cleanup, error) {
|
kubecfg: fakeKubeConfig{getFile: func() (string, kubeconfig.Cleanup, error) {
|
||||||
return "", func() {}, nil
|
return "", func() {}, nil
|
||||||
}},
|
}},
|
||||||
expectedEvt: []events.Event{
|
expectedErr: errParent,
|
||||||
events.NewEvent().WithClusterctlEvent(events.ClusterctlEvent{
|
|
||||||
Operation: events.ClusterctlMoveStart,
|
|
||||||
}),
|
|
||||||
wrapError(errParent),
|
|
||||||
},
|
|
||||||
clusterMap: ClusterMapMockInterface{MockClusterKubeconfigContext: func(s string) (string, error) {
|
clusterMap: ClusterMapMockInterface{MockClusterKubeconfigContext: func(s string) (string, error) {
|
||||||
return "ctx", nil
|
return "ctx", nil
|
||||||
},
|
},
|
||||||
@ -354,14 +325,6 @@ func TestClusterctlExecutorRun(t *testing.T) {
|
|||||||
return nil
|
return nil
|
||||||
}}
|
}}
|
||||||
},
|
},
|
||||||
expectedEvt: []events.Event{
|
|
||||||
events.NewEvent().WithClusterctlEvent(events.ClusterctlEvent{
|
|
||||||
Operation: events.ClusterctlInitStart,
|
|
||||||
}),
|
|
||||||
events.NewEvent().WithClusterctlEvent(events.ClusterctlEvent{
|
|
||||||
Operation: events.ClusterctlInitEnd,
|
|
||||||
}),
|
|
||||||
},
|
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "Regular Run move",
|
name: "Regular Run move",
|
||||||
@ -381,14 +344,6 @@ func TestClusterctlExecutorRun(t *testing.T) {
|
|||||||
return nil
|
return nil
|
||||||
}}
|
}}
|
||||||
},
|
},
|
||||||
expectedEvt: []events.Event{
|
|
||||||
events.NewEvent().WithClusterctlEvent(events.ClusterctlEvent{
|
|
||||||
Operation: events.ClusterctlMoveStart,
|
|
||||||
}),
|
|
||||||
events.NewEvent().WithClusterctlEvent(events.ClusterctlEvent{
|
|
||||||
Operation: events.ClusterctlMoveEnd,
|
|
||||||
}),
|
|
||||||
},
|
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
for _, test := range testCases {
|
for _, test := range testCases {
|
||||||
@ -404,23 +359,8 @@ func TestClusterctlExecutorRun(t *testing.T) {
|
|||||||
ContainerFunc: tt.clientFunc,
|
ContainerFunc: tt.clientFunc,
|
||||||
})
|
})
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
ch := make(chan events.Event)
|
err = executor.Run(ifc.RunOptions{DryRun: true})
|
||||||
go executor.Run(ch, ifc.RunOptions{DryRun: true})
|
assert.Equal(t, tt.expectedErr, err)
|
||||||
var actualEvt []events.Event
|
|
||||||
for evt := range ch {
|
|
||||||
// Skip timestamp for comparison
|
|
||||||
evt.Timestamp = time.Time{}
|
|
||||||
if evt.Type == events.ClusterctlType {
|
|
||||||
// Set message to empty string, so it's not compared
|
|
||||||
evt.ClusterctlEvent.Message = ""
|
|
||||||
}
|
|
||||||
actualEvt = append(actualEvt, evt)
|
|
||||||
}
|
|
||||||
for i := range tt.expectedEvt {
|
|
||||||
// Skip timestamp for comparison
|
|
||||||
tt.expectedEvt[i].Timestamp = time.Time{}
|
|
||||||
}
|
|
||||||
assert.Equal(t, tt.expectedEvt, actualEvt)
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -18,7 +18,6 @@ import (
|
|||||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||||
|
|
||||||
airshipv1 "opendev.org/airship/airshipctl/pkg/api/v1alpha1"
|
airshipv1 "opendev.org/airship/airshipctl/pkg/api/v1alpha1"
|
||||||
"opendev.org/airship/airshipctl/pkg/events"
|
|
||||||
"opendev.org/airship/airshipctl/pkg/phase/executors/errors"
|
"opendev.org/airship/airshipctl/pkg/phase/executors/errors"
|
||||||
"opendev.org/airship/airshipctl/pkg/phase/ifc"
|
"opendev.org/airship/airshipctl/pkg/phase/ifc"
|
||||||
)
|
)
|
||||||
@ -64,9 +63,3 @@ func RegisterExecutor(executorName string, registry map[schema.GroupVersionKind]
|
|||||||
registry[gvks[0]] = execObj
|
registry[gvks[0]] = execObj
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func handleError(ch chan<- events.Event, err error) {
|
|
||||||
ch <- events.NewEvent().WithErrorEvent(events.ErrorEvent{
|
|
||||||
Error: err,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
@ -25,7 +25,6 @@ import (
|
|||||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||||
|
|
||||||
"opendev.org/airship/airshipctl/pkg/document"
|
"opendev.org/airship/airshipctl/pkg/document"
|
||||||
"opendev.org/airship/airshipctl/pkg/events"
|
|
||||||
"opendev.org/airship/airshipctl/pkg/phase/executors"
|
"opendev.org/airship/airshipctl/pkg/phase/executors"
|
||||||
"opendev.org/airship/airshipctl/pkg/phase/ifc"
|
"opendev.org/airship/airshipctl/pkg/phase/ifc"
|
||||||
)
|
)
|
||||||
@ -107,12 +106,6 @@ func executorBundle(t *testing.T, s string) document.Bundle {
|
|||||||
return b
|
return b
|
||||||
}
|
}
|
||||||
|
|
||||||
func wrapError(err error) events.Event {
|
|
||||||
return events.NewEvent().WithErrorEvent(events.ErrorEvent{
|
|
||||||
Error: err,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
func testClusterMap(t *testing.T) clustermap.ClusterMap {
|
func testClusterMap(t *testing.T) clustermap.ClusterMap {
|
||||||
doc, err := document.NewDocumentFromBytes([]byte(singleExecutorClusterMap))
|
doc, err := document.NewDocumentFromBytes([]byte(singleExecutorClusterMap))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
@ -25,7 +25,6 @@ import (
|
|||||||
"opendev.org/airship/airshipctl/pkg/container"
|
"opendev.org/airship/airshipctl/pkg/container"
|
||||||
"opendev.org/airship/airshipctl/pkg/document"
|
"opendev.org/airship/airshipctl/pkg/document"
|
||||||
commonerrors "opendev.org/airship/airshipctl/pkg/errors"
|
commonerrors "opendev.org/airship/airshipctl/pkg/errors"
|
||||||
"opendev.org/airship/airshipctl/pkg/events"
|
|
||||||
"opendev.org/airship/airshipctl/pkg/k8s/kubeconfig"
|
"opendev.org/airship/airshipctl/pkg/k8s/kubeconfig"
|
||||||
"opendev.org/airship/airshipctl/pkg/log"
|
"opendev.org/airship/airshipctl/pkg/log"
|
||||||
"opendev.org/airship/airshipctl/pkg/phase/errors"
|
"opendev.org/airship/airshipctl/pkg/phase/errors"
|
||||||
@ -82,27 +81,20 @@ func NewContainerExecutor(cfg ifc.ExecutorConfig) (ifc.Executor, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Run generic container as a phase runner
|
// Run generic container as a phase runner
|
||||||
func (c *ContainerExecutor) Run(evtCh chan events.Event, opts ifc.RunOptions) {
|
func (c *ContainerExecutor) Run(opts ifc.RunOptions) error {
|
||||||
defer close(evtCh)
|
log.Print("starting generic container")
|
||||||
|
|
||||||
evtCh <- events.NewEvent().WithGenericContainerEvent(events.GenericContainerEvent{
|
|
||||||
Operation: events.GenericContainerStart,
|
|
||||||
Message: "starting generic container",
|
|
||||||
})
|
|
||||||
|
|
||||||
if c.Options.ClusterName != "" {
|
if c.Options.ClusterName != "" {
|
||||||
cleanup, err := c.SetKubeConfig()
|
cleanup, err := c.SetKubeConfig()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
handleError(evtCh, err)
|
return err
|
||||||
return
|
|
||||||
}
|
}
|
||||||
defer cleanup()
|
defer cleanup()
|
||||||
}
|
}
|
||||||
|
|
||||||
input, err := bundleReader(c.ExecutorBundle)
|
input, err := bundleReader(c.ExecutorBundle)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
handleError(evtCh, err)
|
return err
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO this logic is redundant in executor package, move it to pkg/container
|
// TODO this logic is redundant in executor package, move it to pkg/container
|
||||||
@ -112,29 +104,22 @@ func (c *ContainerExecutor) Run(evtCh chan events.Event, opts ifc.RunOptions) {
|
|||||||
output = os.Stdout
|
output = os.Stdout
|
||||||
}
|
}
|
||||||
if err = c.setConfig(); err != nil {
|
if err = c.setConfig(); err != nil {
|
||||||
handleError(evtCh, err)
|
return err
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO check the executor type when dryrun is set
|
// TODO check the executor type when dryrun is set
|
||||||
if opts.DryRun {
|
if opts.DryRun {
|
||||||
evtCh <- events.NewEvent().WithGenericContainerEvent(events.GenericContainerEvent{
|
log.Print("DryRun execution finished")
|
||||||
Operation: events.GenericContainerStop,
|
return nil
|
||||||
Message: "DryRun execution finished",
|
|
||||||
})
|
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
|
||||||
err = c.ClientFunc(c.ResultsDir, input, output, c.Container, c.MountBasePath).Run()
|
err = c.ClientFunc(c.ResultsDir, input, output, c.Container, c.MountBasePath).Run()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
handleError(evtCh, err)
|
return err
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
|
||||||
evtCh <- events.NewEvent().WithGenericContainerEvent(events.GenericContainerEvent{
|
log.Print("execution of the generic container finished")
|
||||||
Operation: events.GenericContainerStop,
|
return nil
|
||||||
Message: "execution of the generic container finished",
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetKubeConfig adds env variable and mounts kubeconfig to container
|
// SetKubeConfig adds env variable and mounts kubeconfig to container
|
||||||
|
@ -27,7 +27,6 @@ import (
|
|||||||
"opendev.org/airship/airshipctl/pkg/api/v1alpha1"
|
"opendev.org/airship/airshipctl/pkg/api/v1alpha1"
|
||||||
"opendev.org/airship/airshipctl/pkg/container"
|
"opendev.org/airship/airshipctl/pkg/container"
|
||||||
"opendev.org/airship/airshipctl/pkg/document"
|
"opendev.org/airship/airshipctl/pkg/document"
|
||||||
"opendev.org/airship/airshipctl/pkg/events"
|
|
||||||
"opendev.org/airship/airshipctl/pkg/k8s/kubeconfig"
|
"opendev.org/airship/airshipctl/pkg/k8s/kubeconfig"
|
||||||
"opendev.org/airship/airshipctl/pkg/phase/errors"
|
"opendev.org/airship/airshipctl/pkg/phase/errors"
|
||||||
"opendev.org/airship/airshipctl/pkg/phase/executors"
|
"opendev.org/airship/airshipctl/pkg/phase/executors"
|
||||||
@ -252,24 +251,12 @@ func TestGenericContainer(t *testing.T) {
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
ch := make(chan events.Event)
|
err := containerExecutor.Run(tt.runOptions)
|
||||||
go containerExecutor.Run(ch, tt.runOptions)
|
|
||||||
|
|
||||||
actualEvt := make([]events.Event, 0)
|
|
||||||
for evt := range ch {
|
|
||||||
actualEvt = append(actualEvt, evt)
|
|
||||||
}
|
|
||||||
require.Greater(t, len(actualEvt), 0)
|
|
||||||
|
|
||||||
if tt.expectedErr != "" {
|
if tt.expectedErr != "" {
|
||||||
e := actualEvt[len(actualEvt)-1]
|
require.Error(t, err)
|
||||||
require.Error(t, e.ErrorEvent.Error)
|
assert.Contains(t, err.Error(), tt.expectedErr)
|
||||||
assert.Contains(t, e.ErrorEvent.Error.Error(), tt.expectedErr)
|
|
||||||
} else {
|
} else {
|
||||||
e := actualEvt[len(actualEvt)-1]
|
assert.NoError(t, err)
|
||||||
assert.NoError(t, e.ErrorEvent.Error)
|
|
||||||
assert.Equal(t, e.Type, events.GenericContainerType)
|
|
||||||
assert.Equal(t, e.GenericContainerEvent.Operation, events.GenericContainerStop)
|
|
||||||
assert.Equal(t, tt.resultConfig, containerExecutor.Container.Config)
|
assert.Equal(t, tt.resultConfig, containerExecutor.Container.Config)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
@ -24,7 +24,6 @@ import (
|
|||||||
"opendev.org/airship/airshipctl/pkg/container"
|
"opendev.org/airship/airshipctl/pkg/container"
|
||||||
"opendev.org/airship/airshipctl/pkg/document"
|
"opendev.org/airship/airshipctl/pkg/document"
|
||||||
"opendev.org/airship/airshipctl/pkg/errors"
|
"opendev.org/airship/airshipctl/pkg/errors"
|
||||||
"opendev.org/airship/airshipctl/pkg/events"
|
|
||||||
"opendev.org/airship/airshipctl/pkg/log"
|
"opendev.org/airship/airshipctl/pkg/log"
|
||||||
"opendev.org/airship/airshipctl/pkg/phase/ifc"
|
"opendev.org/airship/airshipctl/pkg/phase/ifc"
|
||||||
)
|
)
|
||||||
@ -56,20 +55,12 @@ func NewEphemeralExecutor(cfg ifc.ExecutorConfig) (ifc.Executor, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Run ephemeral as a phase runner
|
// Run ephemeral as a phase runner
|
||||||
func (c *EphemeralExecutor) Run(evtCh chan events.Event, opts ifc.RunOptions) {
|
func (c *EphemeralExecutor) Run(opts ifc.RunOptions) error {
|
||||||
defer close(evtCh)
|
log.Print("Processing Ephemeral cluster operation ...")
|
||||||
|
|
||||||
evtCh <- events.NewEvent().WithBootstrapEvent(events.BootstrapEvent{
|
|
||||||
Operation: events.BootstrapStart,
|
|
||||||
Message: "Processing Ephemeral cluster operation ...",
|
|
||||||
})
|
|
||||||
|
|
||||||
if opts.DryRun {
|
if opts.DryRun {
|
||||||
log.Print("Dryrun: bootstrap container command will be skipped")
|
log.Print("Dryrun: bootstrap container command will be skipped")
|
||||||
evtCh <- events.NewEvent().WithBootstrapEvent(events.BootstrapEvent{
|
return nil
|
||||||
Operation: events.BootstrapDryRun,
|
|
||||||
})
|
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if c.Container == nil {
|
if c.Container == nil {
|
||||||
@ -79,8 +70,7 @@ func (c *EphemeralExecutor) Run(evtCh chan events.Event, opts ifc.RunOptions) {
|
|||||||
c.BootConf.BootstrapContainer.ContainerRuntime,
|
c.BootConf.BootstrapContainer.ContainerRuntime,
|
||||||
c.BootConf.BootstrapContainer.Image)
|
c.BootConf.BootstrapContainer.Image)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
handleError(evtCh, err)
|
return err
|
||||||
return
|
|
||||||
}
|
}
|
||||||
c.Container = builder
|
c.Container = builder
|
||||||
}
|
}
|
||||||
@ -91,43 +81,29 @@ func (c *EphemeralExecutor) Run(evtCh chan events.Event, opts ifc.RunOptions) {
|
|||||||
Sleep: time.Sleep,
|
Sleep: time.Sleep,
|
||||||
}
|
}
|
||||||
|
|
||||||
evtCh <- events.NewEvent().WithBootstrapEvent(events.BootstrapEvent{
|
log.Print("Verifying executor manifest document ...")
|
||||||
Operation: events.BootstrapValidation,
|
|
||||||
Message: "Verifying executor manifest document ...",
|
|
||||||
})
|
|
||||||
|
|
||||||
err := bootstrapOpts.VerifyInputs()
|
err := bootstrapOpts.VerifyInputs()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
handleError(evtCh, err)
|
return err
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
|
||||||
evtCh <- events.NewEvent().WithBootstrapEvent(events.BootstrapEvent{
|
log.Print("Creating and starting the Bootstrap Container ...")
|
||||||
Operation: events.BootstrapRun,
|
|
||||||
Message: "Creating and starting the Bootstrap Container ...",
|
|
||||||
})
|
|
||||||
|
|
||||||
err = bootstrapOpts.CreateBootstrapContainer()
|
err = bootstrapOpts.CreateBootstrapContainer()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
handleError(evtCh, err)
|
return err
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
|
||||||
evtCh <- events.NewEvent().WithBootstrapEvent(events.BootstrapEvent{
|
log.Print("Verifying generation of kubeconfig file ...")
|
||||||
Operation: events.BootstrapValidation,
|
|
||||||
Message: "Verifying generation of kubeconfig file ...",
|
|
||||||
})
|
|
||||||
|
|
||||||
err = bootstrapOpts.VerifyArtifacts()
|
err = bootstrapOpts.VerifyArtifacts()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
handleError(evtCh, err)
|
return err
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
|
||||||
evtCh <- events.NewEvent().WithBootstrapEvent(events.BootstrapEvent{
|
log.Print("Ephemeral cluster operation has completed successfully and artifacts verified")
|
||||||
Operation: events.BootstrapEnd,
|
return nil
|
||||||
Message: "Ephemeral cluster operation has completed successfully and artifacts verified",
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Validate executor configuration and documents
|
// Validate executor configuration and documents
|
||||||
|
@ -19,7 +19,6 @@ import (
|
|||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
@ -29,7 +28,6 @@ import (
|
|||||||
"opendev.org/airship/airshipctl/pkg/container"
|
"opendev.org/airship/airshipctl/pkg/container"
|
||||||
"opendev.org/airship/airshipctl/pkg/document"
|
"opendev.org/airship/airshipctl/pkg/document"
|
||||||
"opendev.org/airship/airshipctl/pkg/errors"
|
"opendev.org/airship/airshipctl/pkg/errors"
|
||||||
"opendev.org/airship/airshipctl/pkg/events"
|
|
||||||
"opendev.org/airship/airshipctl/pkg/phase/executors"
|
"opendev.org/airship/airshipctl/pkg/phase/executors"
|
||||||
"opendev.org/airship/airshipctl/pkg/phase/ifc"
|
"opendev.org/airship/airshipctl/pkg/phase/ifc"
|
||||||
"opendev.org/airship/airshipctl/testutil"
|
"opendev.org/airship/airshipctl/testutil"
|
||||||
@ -92,7 +90,7 @@ func TestExecutorEphemeralRun(t *testing.T) {
|
|||||||
testCases := []struct {
|
testCases := []struct {
|
||||||
name string
|
name string
|
||||||
container *testcontainer.MockContainer
|
container *testcontainer.MockContainer
|
||||||
expectedEvt []events.Event
|
expectedErr error
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
name: "Run bootstrap container successfully",
|
name: "Run bootstrap container successfully",
|
||||||
@ -106,44 +104,14 @@ func TestExecutorEphemeralRun(t *testing.T) {
|
|||||||
return state, nil
|
return state, nil
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
expectedEvt: []events.Event{
|
|
||||||
events.NewEvent().WithBootstrapEvent(events.BootstrapEvent{
|
|
||||||
Operation: events.BootstrapStart,
|
|
||||||
}),
|
|
||||||
events.NewEvent().WithBootstrapEvent(events.BootstrapEvent{
|
|
||||||
Operation: events.BootstrapValidation,
|
|
||||||
}),
|
|
||||||
events.NewEvent().WithBootstrapEvent(events.BootstrapEvent{
|
|
||||||
Operation: events.BootstrapRun,
|
|
||||||
}),
|
|
||||||
events.NewEvent().WithBootstrapEvent(events.BootstrapEvent{
|
|
||||||
Operation: events.BootstrapValidation,
|
|
||||||
}),
|
|
||||||
events.NewEvent().WithBootstrapEvent(events.BootstrapEvent{
|
|
||||||
Operation: events.BootstrapEnd,
|
|
||||||
}),
|
|
||||||
},
|
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "Run bootstrap container with Unknow Error",
|
name: "Run bootstrap container with Unknown Error",
|
||||||
container: &testcontainer.MockContainer{
|
container: &testcontainer.MockContainer{
|
||||||
MockRunCommand: func() error { return ephemeral.ErrBootstrapContainerRun{} },
|
MockRunCommand: func() error { return ephemeral.ErrBootstrapContainerRun{} },
|
||||||
MockRmContainer: func() error { return nil },
|
MockRmContainer: func() error { return nil },
|
||||||
},
|
},
|
||||||
expectedEvt: []events.Event{
|
expectedErr: ephemeral.ErrBootstrapContainerRun{},
|
||||||
events.NewEvent().WithBootstrapEvent(events.BootstrapEvent{
|
|
||||||
Operation: events.BootstrapStart,
|
|
||||||
}),
|
|
||||||
events.NewEvent().WithBootstrapEvent(events.BootstrapEvent{
|
|
||||||
Operation: events.BootstrapValidation,
|
|
||||||
}),
|
|
||||||
events.NewEvent().WithBootstrapEvent(events.BootstrapEvent{
|
|
||||||
Operation: events.BootstrapRun,
|
|
||||||
}),
|
|
||||||
events.NewEvent().WithErrorEvent(events.ErrorEvent{
|
|
||||||
Error: ephemeral.ErrBootstrapContainerRun{},
|
|
||||||
}),
|
|
||||||
},
|
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
for _, test := range testCases {
|
for _, test := range testCases {
|
||||||
@ -153,23 +121,8 @@ func TestExecutorEphemeralRun(t *testing.T) {
|
|||||||
BootConf: testCfg,
|
BootConf: testCfg,
|
||||||
Container: tt.container,
|
Container: tt.container,
|
||||||
}
|
}
|
||||||
ch := make(chan events.Event)
|
err := executor.Run(ifc.RunOptions{})
|
||||||
go executor.Run(ch, ifc.RunOptions{})
|
assert.Equal(t, tt.expectedErr, err)
|
||||||
var actualEvt []events.Event
|
|
||||||
for evt := range ch {
|
|
||||||
if evt.Type == events.BootstrapType {
|
|
||||||
// Set message to empty string, so it's not compared
|
|
||||||
evt.BootstrapEvent.Message = ""
|
|
||||||
}
|
|
||||||
actualEvt = append(actualEvt, evt)
|
|
||||||
}
|
|
||||||
for i := range tt.expectedEvt {
|
|
||||||
// Fix timestamps for comparison
|
|
||||||
timeStamp := time.Time{}
|
|
||||||
tt.expectedEvt[i].Timestamp = timeStamp
|
|
||||||
actualEvt[i].Timestamp = timeStamp
|
|
||||||
}
|
|
||||||
assert.Equal(t, tt.expectedEvt, actualEvt)
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -26,7 +26,6 @@ import (
|
|||||||
"opendev.org/airship/airshipctl/pkg/container"
|
"opendev.org/airship/airshipctl/pkg/container"
|
||||||
"opendev.org/airship/airshipctl/pkg/document"
|
"opendev.org/airship/airshipctl/pkg/document"
|
||||||
airerrors "opendev.org/airship/airshipctl/pkg/errors"
|
airerrors "opendev.org/airship/airshipctl/pkg/errors"
|
||||||
"opendev.org/airship/airshipctl/pkg/events"
|
|
||||||
"opendev.org/airship/airshipctl/pkg/k8s/kubeconfig"
|
"opendev.org/airship/airshipctl/pkg/k8s/kubeconfig"
|
||||||
"opendev.org/airship/airshipctl/pkg/log"
|
"opendev.org/airship/airshipctl/pkg/log"
|
||||||
"opendev.org/airship/airshipctl/pkg/phase/errors"
|
"opendev.org/airship/airshipctl/pkg/phase/errors"
|
||||||
@ -93,17 +92,14 @@ func NewKubeApplierExecutor(cfg ifc.ExecutorConfig) (ifc.Executor, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Run executor, should be performed in separate go routine
|
// Run executor, should be performed in separate go routine
|
||||||
func (e *KubeApplierExecutor) Run(ch chan events.Event, runOpts ifc.RunOptions) {
|
func (e *KubeApplierExecutor) Run(runOpts ifc.RunOptions) error {
|
||||||
defer close(ch)
|
|
||||||
|
|
||||||
e.apiObject.Config.Debug = log.DebugEnabled()
|
e.apiObject.Config.Debug = log.DebugEnabled()
|
||||||
e.apiObject.Config.PhaseName = e.BundleName
|
e.apiObject.Config.PhaseName = e.BundleName
|
||||||
|
|
||||||
if e.apiObject.Config.Kubeconfig == "" {
|
if e.apiObject.Config.Kubeconfig == "" {
|
||||||
kcfg, ctx, cleanup, err := e.getKubeconfig()
|
kcfg, ctx, cleanup, err := e.getKubeconfig()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
handleError(ch, err)
|
return err
|
||||||
return
|
|
||||||
}
|
}
|
||||||
defer cleanup()
|
defer cleanup()
|
||||||
e.apiObject.Config.Kubeconfig, e.apiObject.Config.Context = kcfg, ctx
|
e.apiObject.Config.Kubeconfig, e.apiObject.Config.Context = kcfg, ctx
|
||||||
@ -123,22 +119,16 @@ func (e *KubeApplierExecutor) Run(ch chan events.Event, runOpts ifc.RunOptions)
|
|||||||
|
|
||||||
reader, err := e.prepareDocuments()
|
reader, err := e.prepareDocuments()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
handleError(ch, err)
|
return err
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
|
||||||
opts, err := yaml.Marshal(&e.apiObject.Config)
|
opts, err := yaml.Marshal(&e.apiObject.Config)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
handleError(ch, err)
|
return err
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
|
||||||
e.execObj.Config = string(opts)
|
e.execObj.Config = string(opts)
|
||||||
err = e.clientFunc("", reader, os.Stdout, e.execObj, e.targetPath).Run()
|
return e.clientFunc("", reader, os.Stdout, e.execObj, e.targetPath).Run()
|
||||||
if err != nil {
|
|
||||||
handleError(ch, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *KubeApplierExecutor) getKubeconfig() (string, string, func(), error) {
|
func (e *KubeApplierExecutor) getKubeconfig() (string, string, func(), error) {
|
||||||
|
@ -28,7 +28,6 @@ import (
|
|||||||
"opendev.org/airship/airshipctl/pkg/cluster/clustermap"
|
"opendev.org/airship/airshipctl/pkg/cluster/clustermap"
|
||||||
"opendev.org/airship/airshipctl/pkg/container"
|
"opendev.org/airship/airshipctl/pkg/container"
|
||||||
"opendev.org/airship/airshipctl/pkg/document"
|
"opendev.org/airship/airshipctl/pkg/document"
|
||||||
"opendev.org/airship/airshipctl/pkg/events"
|
|
||||||
"opendev.org/airship/airshipctl/pkg/fs"
|
"opendev.org/airship/airshipctl/pkg/fs"
|
||||||
"opendev.org/airship/airshipctl/pkg/k8s/kubeconfig"
|
"opendev.org/airship/airshipctl/pkg/k8s/kubeconfig"
|
||||||
"opendev.org/airship/airshipctl/pkg/phase/executors"
|
"opendev.org/airship/airshipctl/pkg/phase/executors"
|
||||||
@ -337,10 +336,7 @@ func TestKubeApplierExecutorRun(t *testing.T) {
|
|||||||
})
|
})
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.NotNil(t, exec)
|
require.NotNil(t, exec)
|
||||||
ch := make(chan events.Event)
|
err = exec.Run(ifc.RunOptions{})
|
||||||
go exec.Run(ch, ifc.RunOptions{})
|
|
||||||
processor := events.NewDefaultProcessor()
|
|
||||||
err = processor.Process(ch)
|
|
||||||
if tt.containsErr != "" {
|
if tt.containsErr != "" {
|
||||||
require.Error(t, err)
|
require.Error(t, err)
|
||||||
assert.Contains(t, err.Error(), tt.containsErr)
|
assert.Contains(t, err.Error(), tt.containsErr)
|
||||||
|
@ -21,14 +21,13 @@ import (
|
|||||||
"opendev.org/airship/airshipctl/pkg/cluster/clustermap"
|
"opendev.org/airship/airshipctl/pkg/cluster/clustermap"
|
||||||
"opendev.org/airship/airshipctl/pkg/container"
|
"opendev.org/airship/airshipctl/pkg/container"
|
||||||
"opendev.org/airship/airshipctl/pkg/document"
|
"opendev.org/airship/airshipctl/pkg/document"
|
||||||
"opendev.org/airship/airshipctl/pkg/events"
|
|
||||||
inventoryifc "opendev.org/airship/airshipctl/pkg/inventory/ifc"
|
inventoryifc "opendev.org/airship/airshipctl/pkg/inventory/ifc"
|
||||||
"opendev.org/airship/airshipctl/pkg/k8s/kubeconfig"
|
"opendev.org/airship/airshipctl/pkg/k8s/kubeconfig"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Executor interface should be implemented by each runner
|
// Executor interface should be implemented by each runner
|
||||||
type Executor interface {
|
type Executor interface {
|
||||||
Run(chan events.Event, RunOptions)
|
Run(RunOptions) error
|
||||||
Render(io.Writer, RenderOptions) error
|
Render(io.Writer, RenderOptions) error
|
||||||
Validate() error
|
Validate() error
|
||||||
Status() (ExecutorStatus, error)
|
Status() (ExecutorStatus, error)
|
||||||
|
@ -1,47 +0,0 @@
|
|||||||
/*
|
|
||||||
Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
you may not use this file except in compliance with the License.
|
|
||||||
You may obtain a copy of the License at
|
|
||||||
|
|
||||||
https://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
|
|
||||||
Unless required by applicable law or agreed to in writing, software
|
|
||||||
distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
See the License for the specific language governing permissions and
|
|
||||||
limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package k8sutils
|
|
||||||
|
|
||||||
import (
|
|
||||||
"opendev.org/airship/airshipctl/pkg/events"
|
|
||||||
)
|
|
||||||
|
|
||||||
// SuccessEvents returns list of events that constitute a successful cli utils apply
|
|
||||||
func SuccessEvents() []events.ApplierEvent {
|
|
||||||
return []events.ApplierEvent{
|
|
||||||
{
|
|
||||||
Operation: "init",
|
|
||||||
Message: "success",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
Operation: "configured",
|
|
||||||
Message: "success",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
Operation: "completed",
|
|
||||||
Message: "success",
|
|
||||||
},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// ErrorEvents return a list of events with error
|
|
||||||
func ErrorEvents() []events.ApplierEvent {
|
|
||||||
return []events.ApplierEvent{
|
|
||||||
{
|
|
||||||
Operation: "error",
|
|
||||||
Message: "apply-error",
|
|
||||||
},
|
|
||||||
}
|
|
||||||
}
|
|
Loading…
Reference in New Issue
Block a user