diff --git a/pkg/events/processor.go b/pkg/events/processor.go index 04890e5c5..65db12212 100644 --- a/pkg/events/processor.go +++ b/pkg/events/processor.go @@ -27,6 +27,7 @@ import ( // EventProcessor use to process event channels produced by executors type EventProcessor interface { Process(<-chan Event) error + Close() } // DefaultProcessor is implementation of EventProcessor @@ -72,6 +73,11 @@ func (p *DefaultProcessor) Process(ch <-chan Event) error { return checkErrors(p.errors) } +// Close cleans up the auxiliary channels used to process events +func (p *DefaultProcessor) Close() { + close(p.applierChan) +} + func (p *DefaultProcessor) processApplierEvent(e applyevent.Event) { if e.Type == applyevent.ErrorType { log.Printf("Received error when applying errors to kubernetes %v", e.ErrorEvent.Err) diff --git a/pkg/events/processor_test.go b/pkg/events/processor_test.go index 6949f91ca..f0ee3fad9 100644 --- a/pkg/events/processor_test.go +++ b/pkg/events/processor_test.go @@ -28,6 +28,7 @@ import ( func TestDefaultProcessor(t *testing.T) { proc := events.NewDefaultProcessor(utils.Streams()) + defer proc.Close() tests := []struct { name string events []events.Event diff --git a/pkg/phase/client.go b/pkg/phase/client.go index cc7656896..95b7a87fc 100644 --- a/pkg/phase/client.go +++ b/pkg/phase/client.go @@ -115,6 +115,7 @@ func (p *phase) Executor() (ifc.Executor, error) { // Run runs the phase via executor func (p *phase) Run(ro ifc.RunOptions) error { + defer p.processor.Close() executor, err := p.Executor() if err != nil { return err