EnsureLoadBalancer update instead of recreate existing LBs

This commit is contained in:
Markus Suonto 2016-09-28 09:20:59 +03:00
parent e53f9586d1
commit 5fa3c1376a
1 changed files with 346 additions and 94 deletions

View File

@ -241,6 +241,161 @@ func getLoadbalancerByName(client *gophercloud.ServiceClient, name string) (*loa
return &loadbalancerList[0], nil
}
func getListenersByLoadBalancerID(client *gophercloud.ServiceClient, id string) ([]listeners.Listener, error) {
var existingListeners []listeners.Listener
err := listeners.List(client, listeners.ListOpts{LoadbalancerID: id}).EachPage(func(page pagination.Page) (bool, error) {
listenerList, err := listeners.ExtractListeners(page)
if err != nil {
return false, err
}
existingListeners = append(existingListeners, listenerList...)
return true, nil
})
if err != nil {
return nil, err
}
return existingListeners, nil
}
// get listener for a port or nil if does not exist
func getListenerForPort(existingListeners []listeners.Listener, port api.ServicePort) *listeners.Listener {
for _, l := range existingListeners {
if l.Protocol == string(port.Protocol) && l.ProtocolPort == int(port.Port) {
return &l
}
}
return nil
}
// Get pool for a listener. A listener always has exactly one pool.
func getPoolByListenerID(client *gophercloud.ServiceClient, loadbalancerID string, listenerID string) (*v2_pools.Pool, error) {
listenerPools := make([]v2_pools.Pool, 0, 1)
err := v2_pools.List(client, v2_pools.ListOpts{LoadbalancerID: loadbalancerID}).EachPage(func(page pagination.Page) (bool, error) {
poolsList, err := v2_pools.ExtractPools(page)
if err != nil {
return false, err
}
for _, p := range poolsList {
for _, l := range p.Listeners {
if l.ID == listenerID {
listenerPools = append(listenerPools, p)
}
}
}
if len(listenerPools) > 1 {
return false, ErrMultipleResults
}
return true, nil
})
if err != nil {
if isNotFound(err) {
return nil, ErrNotFound
}
return nil, err
}
if len(listenerPools) == 0 {
return nil, ErrNotFound
} else if len(listenerPools) > 1 {
return nil, ErrMultipleResults
}
return &listenerPools[0], nil
}
func getMembersByPoolID(client *gophercloud.ServiceClient, id string) ([]v2_pools.Member, error) {
var members []v2_pools.Member
err := v2_pools.ListAssociateMembers(client, id, v2_pools.MemberListOpts{}).EachPage(func(page pagination.Page) (bool, error) {
membersList, err := v2_pools.ExtractMembers(page)
if err != nil {
return false, err
}
members = append(members, membersList...)
return true, nil
})
if err != nil {
return nil, err
}
return members, nil
}
// Each pool has exactly one or zero monitors. ListOpts does not seem to filter anything.
func getMonitorByPoolID(client *gophercloud.ServiceClient, id string) (*v2_monitors.Monitor, error) {
var monitorList []v2_monitors.Monitor
err := v2_monitors.List(client, v2_monitors.ListOpts{PoolID: id}).EachPage(func(page pagination.Page) (bool, error) {
monitorsList, err := v2_monitors.ExtractMonitors(page)
if err != nil {
return false, err
}
for _, monitor := range monitorsList {
// bugfix, filter by poolid
for _, pool := range monitor.Pools {
if pool.ID == id {
monitorList = append(monitorList, monitor)
}
}
}
if len(monitorList) > 1 {
return false, ErrMultipleResults
}
return true, nil
})
if err != nil {
if isNotFound(err) {
return nil, ErrNotFound
}
return nil, err
}
if len(monitorList) == 0 {
return nil, ErrNotFound
} else if len(monitorList) > 1 {
return nil, ErrMultipleResults
}
return &monitorList[0], nil
}
// Check if a member exists for node
func memberExists(members []v2_pools.Member, addr string) bool {
for _, member := range members {
if member.Address == addr {
return true
}
}
return false
}
func popListener(existingListeners []listeners.Listener, id string) []listeners.Listener {
for i, existingListener := range existingListeners {
if existingListener.ID == id {
existingListeners[i] = existingListeners[len(existingListeners)-1]
existingListeners = existingListeners[:len(existingListeners)-1]
break
}
}
return existingListeners
}
func popMember(members []v2_pools.Member, addr string) []v2_pools.Member {
for i, member := range members {
if member.Address == addr {
members[i] = members[len(members)-1]
members = members[:len(members)-1]
}
}
return members
}
func waitLoadbalancerActiveProvisioningStatus(client *gophercloud.ServiceClient, loadbalancerID string) (string, error) {
start := time.Now().Second()
for {
@ -283,6 +438,26 @@ func waitLoadbalancerDeleted(client *gophercloud.ServiceClient, loadbalancerID s
}
}
func (lbaas *LbaasV2) createLoadBalancer(service *api.Service, name string) (*loadbalancers.LoadBalancer, error) {
createOpts := loadbalancers.CreateOpts{
Name: name,
Description: fmt.Sprintf("Kubernetes external service %s", name),
VipSubnetID: lbaas.opts.SubnetId,
}
loadBalancerIP := service.Spec.LoadBalancerIP
if loadBalancerIP != "" {
createOpts.VipAddress = loadBalancerIP
}
loadbalancer, err := loadbalancers.Create(lbaas.network, createOpts).Extract()
if err != nil {
return nil, fmt.Errorf("Error creating loadbalancer %v: %v", createOpts, err)
}
return loadbalancer, nil
}
func (lbaas *LbaasV2) GetLoadBalancer(clusterName string, service *api.Service) (*api.LoadBalancerStatus, bool, error) {
loadBalancerName := cloudprovider.GetLoadBalancerName(service)
loadbalancer, err := getLoadbalancerByName(lbaas.network, loadBalancerName)
@ -340,101 +515,126 @@ func (lbaas *LbaasV2) EnsureLoadBalancer(clusterName string, apiService *api.Ser
return nil, fmt.Errorf("Source range restrictions are not supported for openstack load balancers")
}
glog.V(2).Infof("Checking if openstack load balancer already exists: %s", cloudprovider.GetLoadBalancerName(apiService))
_, exists, err := lbaas.GetLoadBalancer(clusterName, apiService)
name := cloudprovider.GetLoadBalancerName(apiService)
loadbalancer, err := getLoadbalancerByName(lbaas.network, name)
if err != nil {
return nil, fmt.Errorf("error checking if openstack load balancer already exists: %v", err)
if err != ErrNotFound {
return nil, fmt.Errorf("Error getting loadbalancer %s: %v", name, err)
}
glog.V(2).Infof("Creating loadbalancer %s", name)
loadbalancer, err = lbaas.createLoadBalancer(apiService, name)
if err != nil {
// Unknown error, retry later
return nil, fmt.Errorf("Error creating loadbalancer %s: %v", name, err)
}
} else {
glog.V(2).Infof("LoadBalancer %s already exists", name)
}
// TODO: Implement a more efficient update strategy for common changes than delete & create
// In particular, if we implement hosts update, we can get rid of UpdateHosts
if exists {
err := lbaas.EnsureLoadBalancerDeleted(clusterName, apiService)
if err != nil {
return nil, fmt.Errorf("error deleting existing openstack load balancer: %v", err)
}
}
waitLoadbalancerActiveProvisioningStatus(lbaas.network, loadbalancer.ID)
lbmethod := v2_pools.LBMethod(lbaas.opts.LBMethod)
if lbmethod == "" {
lbmethod = v2_pools.LBMethodRoundRobin
}
name := cloudprovider.GetLoadBalancerName(apiService)
createOpts := loadbalancers.CreateOpts{
Name: name,
Description: fmt.Sprintf("Kubernetes external service %s", name),
VipSubnetID: lbaas.opts.SubnetId,
}
loadBalancerIP := apiService.Spec.LoadBalancerIP
if loadBalancerIP != "" {
createOpts.VipAddress = loadBalancerIP
}
loadbalancer, err := loadbalancers.Create(lbaas.network, createOpts).Extract()
oldListeners, err := getListenersByLoadBalancerID(lbaas.network, loadbalancer.ID)
if err != nil {
// cleanup what was created so far
_ = lbaas.EnsureLoadBalancerDeleted(clusterName, apiService)
return nil, err
return nil, fmt.Errorf("Error getting LB %s listeners: %v", name, err)
}
waitLoadbalancerActiveProvisioningStatus(lbaas.network, loadbalancer.ID)
for portIndex, port := range ports {
listener, err := listeners.Create(lbaas.network, listeners.CreateOpts{
Name: fmt.Sprintf("listener_%s_%d", name, portIndex),
Protocol: listeners.Protocol(port.Protocol),
ProtocolPort: int(port.Port),
LoadbalancerID: loadbalancer.ID,
}).Extract()
if err != nil {
// cleanup what was created so far
_ = lbaas.EnsureLoadBalancerDeleted(clusterName, apiService)
return nil, err
}
waitLoadbalancerActiveProvisioningStatus(lbaas.network, loadbalancer.ID)
pool, err := v2_pools.Create(lbaas.network, v2_pools.CreateOpts{
Name: fmt.Sprintf("pool_%s_%d", name, portIndex),
Protocol: v2_pools.Protocol(port.Protocol),
LBMethod: lbmethod,
ListenerID: listener.ID,
Persistence: persistence,
}).Extract()
if err != nil {
// cleanup what was created so far
_ = lbaas.EnsureLoadBalancerDeleted(clusterName, apiService)
return nil, err
}
waitLoadbalancerActiveProvisioningStatus(lbaas.network, loadbalancer.ID)
for _, nodeName := range nodeNames {
addr, err := getAddressByName(lbaas.compute, types.NodeName(nodeName))
if err != nil {
// cleanup what was created so far
_ = lbaas.EnsureLoadBalancerDeleted(clusterName, apiService)
return nil, err
}
_, err = v2_pools.CreateAssociateMember(lbaas.network, pool.ID, v2_pools.MemberCreateOpts{
ProtocolPort: int(port.NodePort),
Address: addr,
SubnetID: lbaas.opts.SubnetId,
listener := getListenerForPort(oldListeners, port)
if listener == nil {
glog.V(4).Infof("Creating listener for port %d", int(port.Port))
listener, err = listeners.Create(lbaas.network, listeners.CreateOpts{
Name: fmt.Sprintf("listener_%s_%d", name, portIndex),
Protocol: listeners.Protocol(port.Protocol),
ProtocolPort: int(port.Port),
LoadbalancerID: loadbalancer.ID,
}).Extract()
if err != nil {
// cleanup what was created so far
_ = lbaas.EnsureLoadBalancerDeleted(clusterName, apiService)
return nil, err
// Unknown error, retry later
return nil, fmt.Errorf("Error creating LB listener: %v", err)
}
waitLoadbalancerActiveProvisioningStatus(lbaas.network, loadbalancer.ID)
}
if lbaas.opts.CreateMonitor {
_, err = v2_monitors.Create(lbaas.network, v2_monitors.CreateOpts{
glog.V(4).Infof("Listener for %s port %d: %s", string(port.Protocol), int(port.Port), listener.ID)
// After all ports have been processed, remaining listeners are removed as obsolete.
// Pop valid listeners.
oldListeners = popListener(oldListeners, listener.ID)
pool, err := getPoolByListenerID(lbaas.network, loadbalancer.ID, listener.ID)
if err != nil && err != ErrNotFound {
// Unknown error, retry later
return nil, fmt.Errorf("Error getting pool for listener %s: %v", listener.ID, err)
}
if pool == nil {
glog.V(4).Infof("Creating pool for listener %s", listener.ID)
pool, err = v2_pools.Create(lbaas.network, v2_pools.CreateOpts{
Name: fmt.Sprintf("pool_%s_%d", name, portIndex),
Protocol: v2_pools.Protocol(port.Protocol),
LBMethod: lbmethod,
ListenerID: listener.ID,
Persistence: persistence,
}).Extract()
if err != nil {
// Unknown error, retry later
return nil, fmt.Errorf("Error creating pool for listener %s: %v", listener.ID, err)
}
waitLoadbalancerActiveProvisioningStatus(lbaas.network, loadbalancer.ID)
}
glog.V(4).Infof("Pool for listener %s: %s", listener.ID, pool.ID)
members, err := getMembersByPoolID(lbaas.network, pool.ID)
if err != nil && !isNotFound(err) {
return nil, fmt.Errorf("Error getting pool members %s: %v", pool.ID, err)
}
for _, nodeName := range nodeNames {
addr, err := getAddressByName(lbaas.compute, types.NodeName(nodeName))
if err != nil {
if err == ErrNotFound {
// Node failure, do not create member
glog.Warningf("Failed to create LB pool member for node %s: %v", nodeName, err)
continue
} else {
return nil, fmt.Errorf("Error getting address for node %s: %v", nodeName, err)
}
}
if !memberExists(members, addr) {
glog.V(4).Infof("Creating member for pool %s", pool.ID)
_, err := v2_pools.CreateAssociateMember(lbaas.network, pool.ID, v2_pools.MemberCreateOpts{
ProtocolPort: int(port.NodePort),
Address: addr,
SubnetID: lbaas.opts.SubnetId,
}).Extract()
if err != nil {
return nil, fmt.Errorf("Error creating LB pool member for node: %s, %v", nodeName, err)
}
waitLoadbalancerActiveProvisioningStatus(lbaas.network, loadbalancer.ID)
}
// After all members have been processed, remaining members are deleted as obsolete.
members = popMember(members, addr)
glog.V(4).Infof("Ensured pool %s has member for %s at %s", pool.ID, nodeName, addr)
}
// Delete obsolete members for this pool
for _, member := range members {
glog.V(4).Infof("Deleting obsolete member %s for pool %s address %s", member.ID, pool.ID, member.Address)
err := v2_pools.DeleteMember(lbaas.network, pool.ID, member.ID).ExtractErr()
if err != nil && !isNotFound(err) {
return nil, fmt.Errorf("Error deleting obsolete member %s for pool %s address %s: %v", member.ID, pool.ID, member.Address, err)
}
waitLoadbalancerActiveProvisioningStatus(lbaas.network, loadbalancer.ID)
}
monitorID := pool.MonitorID
if monitorID == "" && lbaas.opts.CreateMonitor {
glog.V(4).Infof("Creating monitor for pool %s", pool.ID)
monitor, err := v2_monitors.Create(lbaas.network, v2_monitors.CreateOpts{
PoolID: pool.ID,
Type: string(port.Protocol),
Delay: int(lbaas.opts.MonitorDelay.Duration.Seconds()),
@ -442,40 +642,92 @@ func (lbaas *LbaasV2) EnsureLoadBalancer(clusterName string, apiService *api.Ser
MaxRetries: int(lbaas.opts.MonitorMaxRetries),
}).Extract()
if err != nil {
// cleanup what was created so far
_ = lbaas.EnsureLoadBalancerDeleted(clusterName, apiService)
return nil, err
return nil, fmt.Errorf("Error creating LB pool healthmonitor: %v", err)
}
waitLoadbalancerActiveProvisioningStatus(lbaas.network, loadbalancer.ID)
monitorID = monitor.ID
}
glog.V(4).Infof("Monitor for pool %s: %s", pool.ID, monitorID)
}
// All remaining listeners are obsolete, delete
for _, listener := range oldListeners {
glog.V(4).Infof("Deleting obsolete listener %s:", listener.ID)
// get pool for listener
pool, err := getPoolByListenerID(lbaas.network, loadbalancer.ID, listener.ID)
if err != nil && err != ErrNotFound {
return nil, fmt.Errorf("Error getting pool for obsolete listener %s: %v", listener.ID, err)
}
if pool != nil {
// get and delete monitor
monitorID := pool.MonitorID
if monitorID != "" {
glog.V(4).Infof("Deleting obsolete monitor %s for pool %s", monitorID, pool.ID)
err = v2_monitors.Delete(lbaas.network, monitorID).ExtractErr()
if err != nil && !isNotFound(err) {
return nil, fmt.Errorf("Error deleting obsolete monitor %s for pool %s: %v", monitorID, pool.ID, err)
}
waitLoadbalancerActiveProvisioningStatus(lbaas.network, loadbalancer.ID)
}
// get and delete pool members
members, err := getMembersByPoolID(lbaas.network, pool.ID)
if err != nil && !isNotFound(err) {
return nil, fmt.Errorf("Error getting members for pool %s: %v", pool.ID, err)
}
if members != nil {
for _, member := range members {
glog.V(4).Infof("Deleting obsolete member %s for pool %s address %s", member.ID, pool.ID, member.Address)
err := v2_pools.DeleteMember(lbaas.network, pool.ID, member.ID).ExtractErr()
if err != nil && !isNotFound(err) {
return nil, fmt.Errorf("Error deleting obsolete member %s for pool %s address %s: %v", member.ID, pool.ID, member.Address, err)
}
waitLoadbalancerActiveProvisioningStatus(lbaas.network, loadbalancer.ID)
}
}
glog.V(4).Infof("Deleting obsolete pool %s for listener %s", pool.ID, listener.ID)
// delete pool
err = v2_pools.Delete(lbaas.network, pool.ID).ExtractErr()
if err != nil && !isNotFound(err) {
return nil, fmt.Errorf("Error deleting obsolete pool %s for listener %s: %v", pool.ID, listener.ID, err)
}
waitLoadbalancerActiveProvisioningStatus(lbaas.network, loadbalancer.ID)
}
// delete listener
err = listeners.Delete(lbaas.network, listener.ID).ExtractErr()
if err != nil && !isNotFound(err) {
return nil, fmt.Errorf("Error deleteting obsolete listener: %v", err)
}
waitLoadbalancerActiveProvisioningStatus(lbaas.network, loadbalancer.ID)
glog.V(2).Infof("Deleted obsolete listener: %s", listener.ID)
}
status := &api.LoadBalancerStatus{}
status.Ingress = []api.LoadBalancerIngress{{IP: loadbalancer.VipAddress}}
if lbaas.opts.FloatingNetworkId != "" {
portID, err := getPortIDByIP(lbaas.network, loadbalancer.VipAddress)
if err != nil {
// cleanup what was created so far
_ = lbaas.EnsureLoadBalancerDeleted(clusterName, apiService)
return nil, err
}
portID, err := getPortIDByIP(lbaas.network, loadbalancer.VipAddress)
if err != nil {
return nil, fmt.Errorf("Error getting port for LB vip %s: %v", loadbalancer.VipAddress, err)
}
floatIP, err := getFloatingIPByPortID(lbaas.network, portID)
if err != nil && err != ErrNotFound {
return nil, fmt.Errorf("Error getting floating ip for port %s: %v", portID, err)
}
if floatIP == nil && lbaas.opts.FloatingNetworkId != "" {
glog.V(4).Infof("Creating floating ip for loadbalancer %s port %s", loadbalancer.ID, portID)
floatIPOpts := floatingips.CreateOpts{
FloatingNetworkID: lbaas.opts.FloatingNetworkId,
PortID: portID,
}
floatIP, err := floatingips.Create(lbaas.network, floatIPOpts).Extract()
floatIP, err = floatingips.Create(lbaas.network, floatIPOpts).Extract()
if err != nil {
// cleanup what was created so far
_ = lbaas.EnsureLoadBalancerDeleted(clusterName, apiService)
return nil, err
return nil, fmt.Errorf("Error creating LB floatingip %+v: %v", floatIPOpts, err)
}
status.Ingress = append(status.Ingress, api.LoadBalancerIngress{IP: floatIP.FloatingIP})
}
status.Ingress = append(status.Ingress, api.LoadBalancerIngress{IP: floatIP.FloatingIP})
return status, nil
}