Enable tolerance to apply timeout errors

This commit allows to tolerate up to three timeout errors that
came from poll requests before interruption of apply process.

Change-Id: I6cb95eba908e62ee44be3338c263b20c7dffc34b
Signed-off-by: Ruslan Aliev <raliev@mirantis.com>
Relates-To: #579
Closes: #579
changes/87/794087/12
Ruslan Aliev 2 years ago
parent 75cc1f84a9
commit 5fc39a8b54
  1. 3
      pkg/events/processor.go
  2. 29
      pkg/k8s/applier/applier.go
  3. 63
      pkg/k8s/poller/poller.go
  4. 8
      pkg/k8s/poller/poller_test.go

@ -17,9 +17,8 @@ package events
import (
"k8s.io/cli-runtime/pkg/genericclioptions"
"sigs.k8s.io/cli-utils/cmd/printers"
"sigs.k8s.io/cli-utils/pkg/common"
applyevent "sigs.k8s.io/cli-utils/pkg/apply/event"
"sigs.k8s.io/cli-utils/pkg/common"
"opendev.org/airship/airshipctl/pkg/log"
)

@ -25,16 +25,19 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
cmdutil "k8s.io/kubectl/pkg/cmd/util"
"k8s.io/kubectl/pkg/scheme"
cliapply "sigs.k8s.io/cli-utils/pkg/apply"
applyevent "sigs.k8s.io/cli-utils/pkg/apply/event"
"sigs.k8s.io/cli-utils/pkg/apply/poller"
clicommon "sigs.k8s.io/cli-utils/pkg/common"
"sigs.k8s.io/cli-utils/pkg/manifestreader"
"sigs.k8s.io/cli-utils/pkg/provider"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/yaml"
"opendev.org/airship/airshipctl/pkg/document"
"opendev.org/airship/airshipctl/pkg/events"
airpoller "opendev.org/airship/airshipctl/pkg/k8s/poller"
"opendev.org/airship/airshipctl/pkg/k8s/utils"
"opendev.org/airship/airshipctl/pkg/log"
)
@ -86,6 +89,7 @@ func (a *Applier) ApplyBundle(bundle document.Bundle, ao ApplyOptions) {
ApplierEvent: e,
}
}
log.Debugf("applier channel closed")
}
func (a *Applier) getObjects(
@ -125,14 +129,31 @@ func (a *Applier) getObjects(
} else if err != nil {
return nil, err
}
if err = a.Driver.Initialize(a.Poller); err != nil {
mapper, err := a.Factory.ToRESTMapper()
if err != nil {
return nil, err
}
restMapper, err := a.Factory.ToRESTMapper()
if err != nil {
if a.Poller == nil {
var pErr error
config, pErr := a.Factory.ToRESTConfig()
if pErr != nil {
return nil, pErr
}
c, pErr := client.New(config, client.Options{Scheme: scheme.Scheme, Mapper: mapper})
if pErr != nil {
return nil, pErr
}
a.Poller = airpoller.NewStatusPoller(c, mapper)
}
if err = a.Driver.Initialize(a.Poller); err != nil {
return nil, err
}
return a.ManifestReaderFactory(false, bundle, restMapper).Read()
return a.ManifestReaderFactory(false, bundle, mapper).Read()
}
func (a *Applier) ensureNamespaceExists(name string, dryRun clicommon.DryRunStrategy) error {

@ -16,9 +16,12 @@ package poller
import (
"context"
"strings"
appsv1 "k8s.io/api/apps/v1"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
"sigs.k8s.io/cli-utils/pkg/kstatus/polling"
"sigs.k8s.io/cli-utils/pkg/kstatus/polling/clusterreader"
@ -28,25 +31,25 @@ import (
"sigs.k8s.io/cli-utils/pkg/object"
"sigs.k8s.io/controller-runtime/pkg/client"
"opendev.org/airship/airshipctl/pkg/cluster"
"opendev.org/airship/airshipctl/pkg/log"
)
const allowedApplyErrors = 3
// NewStatusPoller creates a new StatusPoller using the given clusterreader and mapper. The StatusPoller
// will use the client for all calls to the cluster.
func NewStatusPoller(reader client.Reader, mapper meta.RESTMapper, statusmap *cluster.StatusMap) *StatusPoller {
func NewStatusPoller(reader client.Reader, mapper meta.RESTMapper) *StatusPoller {
return &StatusPoller{
engine: &engine.PollerEngine{
Reader: reader,
Mapper: mapper,
},
statusmap: statusmap,
}
}
// StatusPoller provides functionality for polling a cluster for status for a set of resources.
type StatusPoller struct {
engine *engine.PollerEngine
statusmap *cluster.StatusMap
engine *engine.PollerEngine
}
// Poll will create a new statusPollerRunner that will poll all the resources provided and report their status
@ -78,9 +81,6 @@ func (s *StatusPoller) createStatusReaders(reader engine.ClusterReader, mapper m
appsv1.SchemeGroupVersion.WithKind("StatefulSet").GroupKind(): statefulSetStatusReader,
appsv1.SchemeGroupVersion.WithKind("ReplicaSet").GroupKind(): replicaSetStatusReader,
}
for _, gk := range s.statusmap.GkMapping {
statusReaders[gk] = s.statusmap
}
return statusReaders, defaultStatusReader
}
@ -92,8 +92,53 @@ func (s *StatusPoller) createStatusReaders(reader engine.ClusterReader, mapper m
func clusterReaderFactoryFunc(useCache bool) engine.ClusterReaderFactoryFunc {
return func(r client.Reader, mapper meta.RESTMapper, identifiers []object.ObjMetadata) (engine.ClusterReader, error) {
if useCache {
return clusterreader.NewCachingClusterReader(r, mapper, identifiers)
cr, err := clusterreader.NewCachingClusterReader(r, mapper, identifiers)
if err != nil {
return nil, err
}
return &CachingClusterReader{Cr: cr}, nil
}
return &clusterreader.DirectClusterReader{Reader: r}, nil
}
}
// CachingClusterReader is wrapper for kstatus.CachingClusterReader implementation
type CachingClusterReader struct {
Cr *clusterreader.CachingClusterReader
applyErrors []error
}
// Get is a wrapper for kstatus.CachingClusterReader Get method
func (c *CachingClusterReader) Get(ctx context.Context, key client.ObjectKey, obj *unstructured.Unstructured) error {
return c.Cr.Get(ctx, key, obj)
}
// ListNamespaceScoped is a wrapper for kstatus.CachingClusterReader ListNamespaceScoped method
func (c *CachingClusterReader) ListNamespaceScoped(
ctx context.Context,
list *unstructured.UnstructuredList,
namespace string,
selector labels.Selector) error {
return c.Cr.ListNamespaceScoped(ctx, list, namespace, selector)
}
// ListClusterScoped is a wrapper for kstatus.CachingClusterReader ListClusterScoped method
func (c *CachingClusterReader) ListClusterScoped(
ctx context.Context,
list *unstructured.UnstructuredList,
selector labels.Selector) error {
return c.Cr.ListClusterScoped(ctx, list, selector)
}
// Sync is a wrapper for kstatus.CachingClusterReader Sync method, allows to filter specific errors
func (c *CachingClusterReader) Sync(ctx context.Context) error {
err := c.Cr.Sync(ctx)
if err != nil && strings.Contains(err.Error(), "request timed out") {
c.applyErrors = append(c.applyErrors, err)
if len(c.applyErrors) < allowedApplyErrors {
log.Printf("timeout error occurred during sync: '%v', skipping", err)
return nil
}
}
return err
}

@ -21,15 +21,11 @@ import (
"github.com/stretchr/testify/require"
"sigs.k8s.io/controller-runtime/pkg/client"
"opendev.org/airship/airshipctl/pkg/cluster"
"opendev.org/airship/airshipctl/pkg/k8s/client/fake"
"opendev.org/airship/airshipctl/pkg/k8s/poller"
k8sutils "opendev.org/airship/airshipctl/pkg/k8s/utils"
)
func TestNewStatusPoller(t *testing.T) {
airClient := fake.NewClient()
f := k8sutils.FactoryFromKubeConfig("testdata/kubeconfig.yaml", "")
restConfig, err := f.ToRESTConfig()
require.NoError(t, err)
@ -37,9 +33,7 @@ func TestNewStatusPoller(t *testing.T) {
require.NoError(t, err)
restClient, err := client.New(restConfig, client.Options{Mapper: restMapper})
require.NoError(t, err)
statusmap, err := cluster.NewStatusMap(airClient)
require.NoError(t, err)
a := poller.NewStatusPoller(restClient, restMapper, statusmap)
a := poller.NewStatusPoller(restClient, restMapper)
assert.NotNil(t, a)
}

Loading…
Cancel
Save