diff --git a/pkg/events/processor.go b/pkg/events/processor.go
index 89b14e334..d8d0e874d 100644
--- a/pkg/events/processor.go
+++ b/pkg/events/processor.go
@@ -17,9 +17,8 @@ package events
 import (
 	"k8s.io/cli-runtime/pkg/genericclioptions"
 	"sigs.k8s.io/cli-utils/cmd/printers"
-	"sigs.k8s.io/cli-utils/pkg/common"
-
 	applyevent "sigs.k8s.io/cli-utils/pkg/apply/event"
+	"sigs.k8s.io/cli-utils/pkg/common"
 
 	"opendev.org/airship/airshipctl/pkg/log"
 )
diff --git a/pkg/k8s/applier/applier.go b/pkg/k8s/applier/applier.go
index 4fd8269c9..bfa5fb6f9 100644
--- a/pkg/k8s/applier/applier.go
+++ b/pkg/k8s/applier/applier.go
@@ -25,16 +25,19 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
 	cmdutil "k8s.io/kubectl/pkg/cmd/util"
+	"k8s.io/kubectl/pkg/scheme"
 	cliapply "sigs.k8s.io/cli-utils/pkg/apply"
 	applyevent "sigs.k8s.io/cli-utils/pkg/apply/event"
 	"sigs.k8s.io/cli-utils/pkg/apply/poller"
 	clicommon "sigs.k8s.io/cli-utils/pkg/common"
 	"sigs.k8s.io/cli-utils/pkg/manifestreader"
 	"sigs.k8s.io/cli-utils/pkg/provider"
+	"sigs.k8s.io/controller-runtime/pkg/client"
 	"sigs.k8s.io/yaml"
 
 	"opendev.org/airship/airshipctl/pkg/document"
 	"opendev.org/airship/airshipctl/pkg/events"
+	airpoller "opendev.org/airship/airshipctl/pkg/k8s/poller"
 	"opendev.org/airship/airshipctl/pkg/k8s/utils"
 	"opendev.org/airship/airshipctl/pkg/log"
 )
@@ -86,6 +89,7 @@ func (a *Applier) ApplyBundle(bundle document.Bundle, ao ApplyOptions) {
 			ApplierEvent: e,
 		}
 	}
+	log.Debugf("applier channel closed")
 }
 
 func (a *Applier) getObjects(
@@ -125,14 +129,31 @@ func (a *Applier) getObjects(
 	} else if err != nil {
 		return nil, err
 	}
-	if err = a.Driver.Initialize(a.Poller); err != nil {
-		return nil, err
-	}
-	restMapper, err := a.Factory.ToRESTMapper()
+
+	mapper, err := a.Factory.ToRESTMapper()
 	if err != nil {
 		return nil, err
 	}
-	return a.ManifestReaderFactory(false, bundle, restMapper).Read()
+
+	if a.Poller == nil {
+		var pErr error
+		config, pErr := a.Factory.ToRESTConfig()
+		if pErr != nil {
+			return nil, pErr
+		}
+
+		c, pErr := client.New(config, client.Options{Scheme: scheme.Scheme, Mapper: mapper})
+		if pErr != nil {
+			return nil, pErr
+		}
+		a.Poller = airpoller.NewStatusPoller(c, mapper)
+	}
+
+	if err = a.Driver.Initialize(a.Poller); err != nil {
+		return nil, err
+	}
+
+	return a.ManifestReaderFactory(false, bundle, mapper).Read()
 }
 
 func (a *Applier) ensureNamespaceExists(name string, dryRun clicommon.DryRunStrategy) error {
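A note on the applier change: with the nil check above, getObjects now lazily assembles a default status poller whenever the caller has not injected one. Below is a minimal standalone sketch of that same wiring; the helper name buildDefaultPoller is illustrative and not part of the patch.

```go
package applier

import (
	cmdutil "k8s.io/kubectl/pkg/cmd/util"
	"k8s.io/kubectl/pkg/scheme"
	"sigs.k8s.io/cli-utils/pkg/apply/poller"
	"sigs.k8s.io/controller-runtime/pkg/client"

	airpoller "opendev.org/airship/airshipctl/pkg/k8s/poller"
)

// buildDefaultPoller mirrors the fallback in getObjects: build a
// controller-runtime client from the factory's REST config and hand it,
// together with the REST mapper, to NewStatusPoller.
func buildDefaultPoller(f cmdutil.Factory) (poller.Poller, error) {
	config, err := f.ToRESTConfig()
	if err != nil {
		return nil, err
	}
	mapper, err := f.ToRESTMapper()
	if err != nil {
		return nil, err
	}
	c, err := client.New(config, client.Options{Scheme: scheme.Scheme, Mapper: mapper})
	if err != nil {
		return nil, err
	}
	return airpoller.NewStatusPoller(c, mapper), nil
}
```

Keeping Poller as an injectable field preserves testability, while ordinary callers get the controller-runtime client built for them.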
diff --git a/pkg/k8s/poller/poller.go b/pkg/k8s/poller/poller.go
index a996f9f37..01683d150 100644
--- a/pkg/k8s/poller/poller.go
+++ b/pkg/k8s/poller/poller.go
@@ -16,9 +16,12 @@ package poller
 import (
 	"context"
+	"strings"
 
 	appsv1 "k8s.io/api/apps/v1"
 	"k8s.io/apimachinery/pkg/api/meta"
+	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/runtime/schema"
 	"sigs.k8s.io/cli-utils/pkg/kstatus/polling"
 	"sigs.k8s.io/cli-utils/pkg/kstatus/polling/clusterreader"
 	"sigs.k8s.io/cli-utils/pkg/kstatus/polling/engine"
@@ -28,25 +31,25 @@ import (
 	"sigs.k8s.io/cli-utils/pkg/object"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 
-	"opendev.org/airship/airshipctl/pkg/cluster"
+	"opendev.org/airship/airshipctl/pkg/log"
 )
 
+const allowedApplyErrors = 3
+
 // NewStatusPoller creates a new StatusPoller using the given clusterreader and mapper. The StatusPoller
 // will use the client for all calls to the cluster.
-func NewStatusPoller(reader client.Reader, mapper meta.RESTMapper, statusmap *cluster.StatusMap) *StatusPoller {
+func NewStatusPoller(reader client.Reader, mapper meta.RESTMapper) *StatusPoller {
 	return &StatusPoller{
 		engine: &engine.PollerEngine{
 			Reader: reader,
 			Mapper: mapper,
 		},
-		statusmap: statusmap,
 	}
 }
 
 // StatusPoller provides functionality for polling a cluster for status for a set of resources.
 type StatusPoller struct {
-	engine    *engine.PollerEngine
-	statusmap *cluster.StatusMap
+	engine *engine.PollerEngine
 }
 
 // Poll will create a new statusPollerRunner that will poll all the resources provided and report their status
@@ -78,9 +81,6 @@ func (s *StatusPoller) createStatusReaders(reader engine.ClusterReader, mapper m
 		appsv1.SchemeGroupVersion.WithKind("StatefulSet").GroupKind(): statefulSetStatusReader,
 		appsv1.SchemeGroupVersion.WithKind("ReplicaSet").GroupKind():  replicaSetStatusReader,
 	}
-	for _, gk := range s.statusmap.GkMapping {
-		statusReaders[gk] = s.statusmap
-	}
 	return statusReaders, defaultStatusReader
 }
 
@@ -92,8 +92,53 @@ func (s *StatusPoller) createStatusReaders(reader engine.ClusterReader, mapper m
 func clusterReaderFactoryFunc(useCache bool) engine.ClusterReaderFactoryFunc {
 	return func(r client.Reader, mapper meta.RESTMapper, identifiers []object.ObjMetadata) (engine.ClusterReader, error) {
 		if useCache {
-			return clusterreader.NewCachingClusterReader(r, mapper, identifiers)
+			cr, err := clusterreader.NewCachingClusterReader(r, mapper, identifiers)
+			if err != nil {
+				return nil, err
+			}
+			return &CachingClusterReader{Cr: cr}, nil
 		}
 		return &clusterreader.DirectClusterReader{Reader: r}, nil
 	}
 }
+
+// CachingClusterReader is a wrapper for the kstatus CachingClusterReader implementation
+type CachingClusterReader struct {
+	Cr          *clusterreader.CachingClusterReader
+	applyErrors []error
+}
+
+// Get is a wrapper for the kstatus CachingClusterReader Get method
+func (c *CachingClusterReader) Get(ctx context.Context, key client.ObjectKey, obj *unstructured.Unstructured) error {
+	return c.Cr.Get(ctx, key, obj)
+}
+
+// ListNamespaceScoped is a wrapper for the kstatus CachingClusterReader ListNamespaceScoped method
+func (c *CachingClusterReader) ListNamespaceScoped(
+	ctx context.Context,
+	list *unstructured.UnstructuredList,
+	namespace string,
+	selector labels.Selector) error {
+	return c.Cr.ListNamespaceScoped(ctx, list, namespace, selector)
+}
+
+// ListClusterScoped is a wrapper for the kstatus CachingClusterReader ListClusterScoped method
+func (c *CachingClusterReader) ListClusterScoped(
+	ctx context.Context,
+	list *unstructured.UnstructuredList,
+	selector labels.Selector) error {
+	return c.Cr.ListClusterScoped(ctx, list, selector)
+}
+
+// Sync is a wrapper for the kstatus CachingClusterReader Sync method, allowing specific errors to be filtered out
+func (c *CachingClusterReader) Sync(ctx context.Context) error {
+	err := c.Cr.Sync(ctx)
+	if err != nil && strings.Contains(err.Error(), "request timed out") {
+		c.applyErrors = append(c.applyErrors, err)
+		if len(c.applyErrors) < allowedApplyErrors {
+			log.Printf("timeout error occurred during sync: '%v', skipping", err)
+			return nil
+		}
+	}
+	return err
+}
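A note on the Sync wrapper above: applyErrors is appended to but never reset, so the tolerance is cumulative over the reader's lifetime rather than per polling cycle. Only the first two "request timed out" errors are skipped; the third and every later one propagate. A toy, self-contained illustration of that counting logic (not airshipctl code):

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

const allowedApplyErrors = 3

func main() {
	var applyErrors []error
	// sync replays the filtering logic from CachingClusterReader.Sync.
	sync := func(err error) error {
		if err != nil && strings.Contains(err.Error(), "request timed out") {
			applyErrors = append(applyErrors, err)
			if len(applyErrors) < allowedApplyErrors {
				return nil // tolerated: logged and skipped in the real code
			}
		}
		return err
	}
	timeout := errors.New("etcdserver: request timed out")
	for i := 1; i <= 4; i++ {
		fmt.Printf("sync %d: %v\n", i, sync(timeout))
	}
	// sync 1 and 2 print <nil>; sync 3 and 4 return the timeout error.
}
```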
"opendev.org/airship/airshipctl/pkg/k8s/client/fake" "opendev.org/airship/airshipctl/pkg/k8s/poller" k8sutils "opendev.org/airship/airshipctl/pkg/k8s/utils" ) func TestNewStatusPoller(t *testing.T) { - airClient := fake.NewClient() - f := k8sutils.FactoryFromKubeConfig("testdata/kubeconfig.yaml", "") restConfig, err := f.ToRESTConfig() require.NoError(t, err) @@ -37,9 +33,7 @@ func TestNewStatusPoller(t *testing.T) { require.NoError(t, err) restClient, err := client.New(restConfig, client.Options{Mapper: restMapper}) require.NoError(t, err) - statusmap, err := cluster.NewStatusMap(airClient) - require.NoError(t, err) - a := poller.NewStatusPoller(restClient, restMapper, statusmap) + a := poller.NewStatusPoller(restClient, restMapper) assert.NotNil(t, a) }