Add exponential backoff to openstack loadbalancer functions
Using exponential backoff to lower openstack load and reduce API call throttling

Change-Id: Iba806a12184d5e499a9ab8dc9f8dcc2e8303ef5b
parent e29b3c6405 · commit 5363380d88
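For context on the pattern the patch adopts: k8s.io/apimachinery/pkg/util/wait provides a wait.Backoff struct describing the retry schedule and wait.ExponentialBackoff, which keeps calling a condition function until it reports done, returns an error, or the configured Steps are exhausted (in which case wait.ErrWaitTimeout is returned). A minimal standalone sketch of that pattern, not taken from this commit (resourceReady and its literal values are illustrative only):

package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

// resourceReady is a stand-in for a real status check, e.g. asking the
// OpenStack API whether a load balancer has reached ACTIVE.
func resourceReady() (bool, error) {
	// Returning (false, nil) means "not ready yet, try again after the next
	// backoff interval"; (true, nil) stops the polling successfully; a
	// non-nil error aborts it immediately.
	return true, nil
}

func main() {
	backoff := wait.Backoff{
		Duration: 1 * time.Second, // first wait interval
		Factor:   1.2,             // each interval is 1.2x the previous one
		Steps:    19,              // give up after this many condition checks
	}
	if err := wait.ExponentialBackoff(backoff, resourceReady); err != nil {
		// wait.ErrWaitTimeout is returned when Steps is exhausted.
		fmt.Printf("resource never became ready: %v\n", err)
	}
}

Compared with a fixed time.Sleep(1 * time.Second) polling loop, the growing interval spreads a similar overall timeout over far fewer API calls, which is the load and throttling reduction the commit message refers to.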
@@ -58,6 +58,7 @@ go_library(
         "//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/util/net:go_default_library",
+        "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
         "//vendor/k8s.io/client-go/util/cert:go_default_library",
     ],
 )

@@ -80,6 +81,7 @@ go_test(
         "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/util/rand:go_default_library",
+        "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
     ],
 )

@@ -39,6 +39,7 @@ import (
     neutronports "github.com/gophercloud/gophercloud/openstack/networking/v2/ports"
     "github.com/gophercloud/gophercloud/pagination"

+    "k8s.io/apimachinery/pkg/util/wait"
     "k8s.io/kubernetes/pkg/api/v1"
     "k8s.io/kubernetes/pkg/api/v1/service"
     "k8s.io/kubernetes/pkg/cloudprovider"

@@ -46,8 +47,26 @@ import (

 // Note: when creating a new Loadbalancer (VM), it can take some time before it is ready for use,
 // this timeout is used for waiting until the Loadbalancer provisioning status goes to ACTIVE state.
-const loadbalancerActiveTimeoutSeconds = 120
-const loadbalancerDeleteTimeoutSeconds = 30
+const (
+    // loadbalancerActive* is configuration of exponential backoff for
+    // going into ACTIVE loadbalancer provisioning status. Starting with 1
+    // seconds, multiplying by 1.2 with each step and taking 19 steps at maximum
+    // it will time out after 128s, which roughly corresponds to 120s
+    loadbalancerActiveInitDealy = 1 * time.Second
+    loadbalancerActiveFactor    = 1.2
+    loadbalancerActiveSteps     = 19
+
+    // loadbalancerDelete* is configuration of exponential backoff for
+    // waiting for delete operation to complete. Starting with 1
+    // seconds, multiplying by 1.2 with each step and taking 13 steps at maximum
+    // it will time out after 32s, which roughly corresponds to 30s
+    loadbalancerDeleteInitDealy = 1 * time.Second
+    loadbalancerDeleteFactor    = 1.2
+    loadbalancerDeleteSteps     = 13
+
+    activeStatus = "ACTIVE"
+    errorStatus  = "ERROR"
+)

 // LoadBalancer implementation for LBaaS v1
 type LbaasV1 struct {

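As a rough sanity check on the figures quoted in those comments: assuming the vendored wait.ExponentialBackoff of this era sleeps only between condition checks (so at most Steps-1 sleeps) and no Jitter is configured, the worst-case wall-clock wait is the geometric series Duration * (Factor^(Steps-1) - 1) / (Factor - 1). A throwaway calculation along those lines (maxBackoffWait is a hypothetical helper, not part of the patch or the wait package):

package main

import (
	"fmt"
	"time"
)

// maxBackoffWait returns the worst-case total sleep time of an exponential
// backoff that checks its condition `steps` times and sleeps between checks
// (i.e. steps-1 sleeps), assuming no jitter.
func maxBackoffWait(initial time.Duration, factor float64, steps int) time.Duration {
	total := time.Duration(0)
	interval := initial
	for i := 0; i < steps-1; i++ {
		total += interval
		interval = time.Duration(float64(interval) * factor)
	}
	return total
}

func main() {
	// loadbalancerActive* values: 1s initial delay, factor 1.2, 19 steps.
	fmt.Println(maxBackoffWait(1*time.Second, 1.2, 19))
}

Under those assumptions the loadbalancerActive* configuration works out to roughly 128 seconds, which matches the "128s" figure in the comment above.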
@@ -337,44 +356,6 @@ func getMembersByPoolID(client *gophercloud.ServiceClient, id string) ([]v2pools
     return members, nil
 }

-// Each pool has exactly one or zero monitors. ListOpts does not seem to filter anything.
-func getMonitorByPoolID(client *gophercloud.ServiceClient, id string) (*v2monitors.Monitor, error) {
-    var monitorList []v2monitors.Monitor
-    err := v2monitors.List(client, v2monitors.ListOpts{PoolID: id}).EachPage(func(page pagination.Page) (bool, error) {
-        monitorsList, err := v2monitors.ExtractMonitors(page)
-        if err != nil {
-            return false, err
-        }
-
-        for _, monitor := range monitorsList {
-            // bugfix, filter by poolid
-            for _, pool := range monitor.Pools {
-                if pool.ID == id {
-                    monitorList = append(monitorList, monitor)
-                }
-            }
-        }
-        if len(monitorList) > 1 {
-            return false, ErrMultipleResults
-        }
-        return true, nil
-    })
-    if err != nil {
-        if isNotFound(err) {
-            return nil, ErrNotFound
-        }
-        return nil, err
-    }
-
-    if len(monitorList) == 0 {
-        return nil, ErrNotFound
-    } else if len(monitorList) > 1 {
-        return nil, ErrMultipleResults
-    }
-
-    return &monitorList[0], nil
-}
-
 // Check if a member exists for node
 func memberExists(members []v2pools.Member, addr string, port int) bool {
     for _, member := range members {

@@ -436,45 +417,59 @@ func getSecurityGroupRules(client *gophercloud.ServiceClient, opts rules.ListOpt
 }

 func waitLoadbalancerActiveProvisioningStatus(client *gophercloud.ServiceClient, loadbalancerID string) (string, error) {
-    start := time.Now().Second()
-    for {
+    backoff := wait.Backoff{
+        Duration: loadbalancerActiveInitDealy,
+        Factor:   loadbalancerActiveFactor,
+        Steps:    loadbalancerActiveSteps,
+    }
+
+    var provisioningStatus string
+    err := wait.ExponentialBackoff(backoff, func() (bool, error) {
         loadbalancer, err := loadbalancers.Get(client, loadbalancerID).Extract()
         if err != nil {
-            return "", err
+            return false, err
         }
-        if loadbalancer.ProvisioningStatus == "ACTIVE" {
-            return "ACTIVE", nil
-        } else if loadbalancer.ProvisioningStatus == "ERROR" {
-            return "ERROR", fmt.Errorf("Loadbalancer has gone into ERROR state")
+        provisioningStatus = loadbalancer.ProvisioningStatus
+        if loadbalancer.ProvisioningStatus == activeStatus {
+            return true, nil
+        } else if loadbalancer.ProvisioningStatus == errorStatus {
+            return true, fmt.Errorf("Loadbalancer has gone into ERROR state")
+        } else {
+            return false, nil
         }
-
-        time.Sleep(1 * time.Second)
+    })

-        if time.Now().Second()-start >= loadbalancerActiveTimeoutSeconds {
-            return loadbalancer.ProvisioningStatus, fmt.Errorf("Loadbalancer failed to go into ACTIVE provisioning status within alloted time")
-        }
+    if err == wait.ErrWaitTimeout {
+        err = fmt.Errorf("Loadbalancer failed to go into ACTIVE provisioning status within alloted time")
     }
+    return provisioningStatus, err
 }

 func waitLoadbalancerDeleted(client *gophercloud.ServiceClient, loadbalancerID string) error {
-    start := time.Now().Second()
-    for {
+    backoff := wait.Backoff{
+        Duration: loadbalancerDeleteInitDealy,
+        Factor:   loadbalancerDeleteFactor,
+        Steps:    loadbalancerDeleteSteps,
+    }
+    err := wait.ExponentialBackoff(backoff, func() (bool, error) {
         _, err := loadbalancers.Get(client, loadbalancerID).Extract()
         if err != nil {
             if err == ErrNotFound {
-                return nil
+                return true, nil
             } else {
-                return err
+                return false, err
             }
+        } else {
+            return false, nil
         }
+    })

-        time.Sleep(1 * time.Second)
-
-        if time.Now().Second()-start >= loadbalancerDeleteTimeoutSeconds {
-            return fmt.Errorf("Loadbalancer failed to delete within the alloted time")
-        }
+    if err == wait.ErrWaitTimeout {
+        err = fmt.Errorf("Loadbalancer failed to delete within the alloted time")
     }
+
+    return err
 }

 func toRuleProtocol(protocol v1.Protocol) rules.RuleProtocol {

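One subtlety in the rewritten wait helpers above: wait.ExponentialBackoff stops retrying as soon as the condition function returns done == true or a non-nil error, and wait.ErrWaitTimeout is produced only when all Steps are used up. That is why the ERROR provisioning state returns (true, fmt.Errorf(...)) to abort immediately, while intermediate states return (false, nil) to keep polling. A small standalone illustration of the three outcomes (the status sequences and the pollStatuses helper are invented for the example, not part of this commit):

package main

import (
	"errors"
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

// pollStatuses consumes one status per attempt from a canned sequence,
// mimicking repeated GET calls against a cloud API.
func pollStatuses(statuses []string) wait.ConditionFunc {
	i := 0
	return func() (bool, error) {
		status := statuses[i]
		i++
		switch status {
		case "ACTIVE":
			return true, nil // done: stop retrying, no error
		case "ERROR":
			return true, errors.New("resource went into ERROR state") // stop with error
		default:
			return false, nil // not done yet: sleep and retry
		}
	}
}

func main() {
	backoff := wait.Backoff{Duration: 10 * time.Millisecond, Factor: 1.2, Steps: 5}

	// Succeeds on the third attempt: prints <nil>.
	fmt.Println(wait.ExponentialBackoff(backoff, pollStatuses([]string{"PENDING_CREATE", "PENDING_CREATE", "ACTIVE"})))

	// Aborts immediately on ERROR, well before Steps is exhausted.
	fmt.Println(wait.ExponentialBackoff(backoff, pollStatuses([]string{"PENDING_CREATE", "ERROR", "ACTIVE"})))

	// Never becomes ACTIVE within 5 steps: prints the wait.ErrWaitTimeout error.
	fmt.Println(wait.ExponentialBackoff(backoff, pollStatuses([]string{"PENDING", "PENDING", "PENDING", "PENDING", "PENDING"})))
}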
@@ -30,39 +30,55 @@ import (

     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/apimachinery/pkg/util/rand"
+    "k8s.io/apimachinery/pkg/util/wait"
     "k8s.io/kubernetes/pkg/api/v1"
 )

-const volumeAvailableStatus = "available"
-const volumeInUseStatus = "in-use"
-const volumeCreateTimeoutSeconds = 30
-const testClusterName = "testCluster"
+const (
+    volumeAvailableStatus = "available"
+    volumeInUseStatus     = "in-use"
+    testClusterName       = "testCluster"

-func WaitForVolumeStatus(t *testing.T, os *OpenStack, volumeName string, status string, timeoutSeconds int) {
-    timeout := timeoutSeconds
-    start := time.Now().Second()
-    for {
-        time.Sleep(1 * time.Second)
-
-        if timeout >= 0 && time.Now().Second()-start >= timeout {
-            t.Logf("Volume (%s) status did not change to %s after %v seconds\n",
-                volumeName,
-                status,
-                timeout)
-            return
-        }
+    volumeStatusTimeoutSeconds = 30
+    // volumeStatus* is configuration of exponential backoff for
+    // waiting for specified volume status. Starting with 1
+    // seconds, multiplying by 1.2 with each step and taking 13 steps at maximum
+    // it will time out after 32s, which roughly corresponds to 30s
+    volumeStatusInitDealy = 1 * time.Second
+    volumeStatusFactor    = 1.2
+    volumeStatusSteps     = 13
+)
+
+func WaitForVolumeStatus(t *testing.T, os *OpenStack, volumeName string, status string) {
+    backoff := wait.Backoff{
+        Duration: volumeStatusInitDealy,
+        Factor:   volumeStatusFactor,
+        Steps:    volumeStatusSteps,
+    }
+    err := wait.ExponentialBackoff(backoff, func() (bool, error) {
         getVol, err := os.getVolume(volumeName)
         if err != nil {
-            t.Fatalf("Cannot get existing Cinder volume (%s): %v", volumeName, err)
+            return false, err
         }
         if getVol.Status == status {
             t.Logf("Volume (%s) status changed to %s after %v seconds\n",
                 volumeName,
                 status,
-                timeout)
-            return
+                volumeStatusTimeoutSeconds)
+            return true, nil
+        } else {
+            return false, nil
         }
+    })
+    if err == wait.ErrWaitTimeout {
+        t.Logf("Volume (%s) status did not change to %s after %v seconds\n",
+            volumeName,
+            status,
+            volumeStatusTimeoutSeconds)
+        return
     }
+    if err != nil {
+        t.Fatalf("Cannot get existing Cinder volume (%s): %v", volumeName, err)
+    }
 }

@@ -360,7 +376,7 @@ func TestVolumes(t *testing.T) {
     }
     t.Logf("Volume (%s) created\n", vol)

-    WaitForVolumeStatus(t, os, vol, volumeAvailableStatus, volumeCreateTimeoutSeconds)
+    WaitForVolumeStatus(t, os, vol, volumeAvailableStatus)

     diskId, err := os.AttachDisk(os.localInstanceID, vol)
     if err != nil {

@@ -368,7 +384,7 @@ func TestVolumes(t *testing.T) {
     }
     t.Logf("Volume (%s) attached, disk ID: %s\n", vol, diskId)

-    WaitForVolumeStatus(t, os, vol, volumeInUseStatus, volumeCreateTimeoutSeconds)
+    WaitForVolumeStatus(t, os, vol, volumeInUseStatus)

     devicePath := os.GetDevicePath(diskId)
     if !strings.HasPrefix(devicePath, "/dev/disk/by-id/") {

@@ -382,7 +398,7 @@ func TestVolumes(t *testing.T) {
     }
     t.Logf("Volume (%s) detached\n", vol)

-    WaitForVolumeStatus(t, os, vol, volumeAvailableStatus, volumeCreateTimeoutSeconds)
+    WaitForVolumeStatus(t, os, vol, volumeAvailableStatus)

     err = os.DeleteVolume(vol)
     if err != nil {