Merge pull request #25987 from hpcloud/openstack-lbaas-v2
Automatic merge from submit-queue LBaaS v2 Support for Openstack Cloud Provider Plugin Resolves #19774. This work is based on Gophercloud support for LBaaS v2 currently in review (this will have to merge first): https://github.com/rackspace/gophercloud/pull/575 These changes includes the addition of a new loadbalancer configuration option: **LBVersion**. If this configuration attribute is missing or anything other than "v2", lbaas v1 implementation will be used.
This commit is contained in:
commit
8681baa950
|
@ -28,24 +28,19 @@ import (
|
|||
"strings"
|
||||
"time"
|
||||
|
||||
"gopkg.in/gcfg.v1"
|
||||
|
||||
"github.com/rackspace/gophercloud"
|
||||
"github.com/rackspace/gophercloud/openstack"
|
||||
"github.com/rackspace/gophercloud/openstack/blockstorage/v1/volumes"
|
||||
"github.com/rackspace/gophercloud/openstack/compute/v2/extensions/volumeattach"
|
||||
"github.com/rackspace/gophercloud/openstack/compute/v2/flavors"
|
||||
"github.com/rackspace/gophercloud/openstack/compute/v2/servers"
|
||||
"github.com/rackspace/gophercloud/openstack/networking/v2/extensions/layer3/floatingips"
|
||||
"github.com/rackspace/gophercloud/openstack/networking/v2/extensions/lbaas/members"
|
||||
"github.com/rackspace/gophercloud/openstack/networking/v2/extensions/lbaas/monitors"
|
||||
"github.com/rackspace/gophercloud/openstack/networking/v2/extensions/lbaas/pools"
|
||||
"github.com/rackspace/gophercloud/openstack/networking/v2/extensions/lbaas/vips"
|
||||
"github.com/rackspace/gophercloud/pagination"
|
||||
"gopkg.in/gcfg.v1"
|
||||
|
||||
"github.com/golang/glog"
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/api/resource"
|
||||
"k8s.io/kubernetes/pkg/api/service"
|
||||
"k8s.io/kubernetes/pkg/cloudprovider"
|
||||
)
|
||||
|
||||
|
@ -81,8 +76,15 @@ func (d *MyDuration) UnmarshalText(text []byte) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
type LoadBalancer struct {
|
||||
network *gophercloud.ServiceClient
|
||||
compute *gophercloud.ServiceClient
|
||||
opts LoadBalancerOpts
|
||||
}
|
||||
|
||||
type LoadBalancerOpts struct {
|
||||
SubnetId string `gcfg:"subnet-id"` // required
|
||||
LBVersion string `gcfg:"lb-version"` // v1 or v2
|
||||
SubnetId string `gcfg:"subnet-id"` // required
|
||||
FloatingNetworkId string `gcfg:"floating-network-id"`
|
||||
LBMethod string `gcfg:"lb-method"`
|
||||
CreateMonitor bool `gcfg:"create-monitor"`
|
||||
|
@ -504,12 +506,6 @@ func (os *OpenStack) ScrubDNS(nameservers, searches []string) (nsOut, srchOut []
|
|||
return nameservers, searches
|
||||
}
|
||||
|
||||
type LoadBalancer struct {
|
||||
network *gophercloud.ServiceClient
|
||||
compute *gophercloud.ServiceClient
|
||||
opts LoadBalancerOpts
|
||||
}
|
||||
|
||||
func (os *OpenStack) LoadBalancer() (cloudprovider.LoadBalancer, bool) {
|
||||
glog.V(4).Info("openstack.LoadBalancer() called")
|
||||
|
||||
|
@ -532,7 +528,12 @@ func (os *OpenStack) LoadBalancer() (cloudprovider.LoadBalancer, bool) {
|
|||
|
||||
glog.V(1).Info("Claiming to support LoadBalancer")
|
||||
|
||||
return &LoadBalancer{network, compute, os.lbOpts}, true
|
||||
if os.lbOpts.LBVersion == "v2" {
|
||||
return &LbaasV2{LoadBalancer{network, compute, os.lbOpts}}, true
|
||||
} else {
|
||||
|
||||
return &LbaasV1{LoadBalancer{network, compute, os.lbOpts}}, true
|
||||
}
|
||||
}
|
||||
|
||||
func isNotFound(err error) bool {
|
||||
|
@ -540,412 +541,6 @@ func isNotFound(err error) bool {
|
|||
return ok && e.Actual == http.StatusNotFound
|
||||
}
|
||||
|
||||
func getPoolByName(client *gophercloud.ServiceClient, name string) (*pools.Pool, error) {
|
||||
opts := pools.ListOpts{
|
||||
Name: name,
|
||||
}
|
||||
pager := pools.List(client, opts)
|
||||
|
||||
poolList := make([]pools.Pool, 0, 1)
|
||||
|
||||
err := pager.EachPage(func(page pagination.Page) (bool, error) {
|
||||
p, err := pools.ExtractPools(page)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
poolList = append(poolList, p...)
|
||||
if len(poolList) > 1 {
|
||||
return false, ErrMultipleResults
|
||||
}
|
||||
return true, nil
|
||||
})
|
||||
if err != nil {
|
||||
if isNotFound(err) {
|
||||
return nil, ErrNotFound
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if len(poolList) == 0 {
|
||||
return nil, ErrNotFound
|
||||
} else if len(poolList) > 1 {
|
||||
return nil, ErrMultipleResults
|
||||
}
|
||||
|
||||
return &poolList[0], nil
|
||||
}
|
||||
|
||||
func getVipByName(client *gophercloud.ServiceClient, name string) (*vips.VirtualIP, error) {
|
||||
opts := vips.ListOpts{
|
||||
Name: name,
|
||||
}
|
||||
pager := vips.List(client, opts)
|
||||
|
||||
vipList := make([]vips.VirtualIP, 0, 1)
|
||||
|
||||
err := pager.EachPage(func(page pagination.Page) (bool, error) {
|
||||
v, err := vips.ExtractVIPs(page)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
vipList = append(vipList, v...)
|
||||
if len(vipList) > 1 {
|
||||
return false, ErrMultipleResults
|
||||
}
|
||||
return true, nil
|
||||
})
|
||||
if err != nil {
|
||||
if isNotFound(err) {
|
||||
return nil, ErrNotFound
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if len(vipList) == 0 {
|
||||
return nil, ErrNotFound
|
||||
} else if len(vipList) > 1 {
|
||||
return nil, ErrMultipleResults
|
||||
}
|
||||
|
||||
return &vipList[0], nil
|
||||
}
|
||||
|
||||
func getFloatingIPByPortID(client *gophercloud.ServiceClient, portID string) (*floatingips.FloatingIP, error) {
|
||||
opts := floatingips.ListOpts{
|
||||
PortID: portID,
|
||||
}
|
||||
pager := floatingips.List(client, opts)
|
||||
|
||||
floatingIPList := make([]floatingips.FloatingIP, 0, 1)
|
||||
|
||||
err := pager.EachPage(func(page pagination.Page) (bool, error) {
|
||||
f, err := floatingips.ExtractFloatingIPs(page)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
floatingIPList = append(floatingIPList, f...)
|
||||
if len(floatingIPList) > 1 {
|
||||
return false, ErrMultipleResults
|
||||
}
|
||||
return true, nil
|
||||
})
|
||||
if err != nil {
|
||||
if isNotFound(err) {
|
||||
return nil, ErrNotFound
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if len(floatingIPList) == 0 {
|
||||
return nil, ErrNotFound
|
||||
} else if len(floatingIPList) > 1 {
|
||||
return nil, ErrMultipleResults
|
||||
}
|
||||
|
||||
return &floatingIPList[0], nil
|
||||
}
|
||||
|
||||
func (lb *LoadBalancer) GetLoadBalancer(service *api.Service) (*api.LoadBalancerStatus, bool, error) {
|
||||
loadBalancerName := cloudprovider.GetLoadBalancerName(service)
|
||||
vip, err := getVipByName(lb.network, loadBalancerName)
|
||||
if err == ErrNotFound {
|
||||
return nil, false, nil
|
||||
}
|
||||
if vip == nil {
|
||||
return nil, false, err
|
||||
}
|
||||
|
||||
status := &api.LoadBalancerStatus{}
|
||||
status.Ingress = []api.LoadBalancerIngress{{IP: vip.Address}}
|
||||
|
||||
return status, true, err
|
||||
}
|
||||
|
||||
// TODO: This code currently ignores 'region' and always creates a
|
||||
// loadbalancer in only the current OpenStack region. We should take
|
||||
// a list of regions (from config) and query/create loadbalancers in
|
||||
// each region.
|
||||
|
||||
func (lb *LoadBalancer) EnsureLoadBalancer(apiService *api.Service, hosts []string) (*api.LoadBalancerStatus, error) {
|
||||
glog.V(4).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v, %v)", apiService.Namespace, apiService.Name, apiService.Spec.LoadBalancerIP, apiService.Spec.Ports, hosts, apiService.Annotations)
|
||||
|
||||
ports := apiService.Spec.Ports
|
||||
if len(ports) > 1 {
|
||||
return nil, fmt.Errorf("multiple ports are not yet supported in openstack load balancers")
|
||||
} else if len(ports) == 0 {
|
||||
return nil, fmt.Errorf("no ports provided to openstack load balancer")
|
||||
}
|
||||
|
||||
// The service controller verified all the protocols match on the ports, just check and use the first one
|
||||
// TODO: Convert all error messages to use an event recorder
|
||||
if ports[0].Protocol != api.ProtocolTCP {
|
||||
return nil, fmt.Errorf("Only TCP LoadBalancer is supported for openstack load balancers")
|
||||
}
|
||||
|
||||
affinity := apiService.Spec.SessionAffinity
|
||||
var persistence *vips.SessionPersistence
|
||||
switch affinity {
|
||||
case api.ServiceAffinityNone:
|
||||
persistence = nil
|
||||
case api.ServiceAffinityClientIP:
|
||||
persistence = &vips.SessionPersistence{Type: "SOURCE_IP"}
|
||||
default:
|
||||
return nil, fmt.Errorf("unsupported load balancer affinity: %v", affinity)
|
||||
}
|
||||
|
||||
sourceRanges, err := service.GetLoadBalancerSourceRanges(apiService)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if !service.IsAllowAll(sourceRanges) {
|
||||
return nil, fmt.Errorf("Source range restrictions are not supported for openstack load balancers")
|
||||
}
|
||||
|
||||
glog.V(2).Infof("Checking if openstack load balancer already exists: %s", cloudprovider.GetLoadBalancerName(apiService))
|
||||
_, exists, err := lb.GetLoadBalancer(apiService)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error checking if openstack load balancer already exists: %v", err)
|
||||
}
|
||||
|
||||
// TODO: Implement a more efficient update strategy for common changes than delete & create
|
||||
// In particular, if we implement hosts update, we can get rid of UpdateHosts
|
||||
if exists {
|
||||
err := lb.EnsureLoadBalancerDeleted(apiService)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error deleting existing openstack load balancer: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
lbmethod := lb.opts.LBMethod
|
||||
if lbmethod == "" {
|
||||
lbmethod = pools.LBMethodRoundRobin
|
||||
}
|
||||
name := cloudprovider.GetLoadBalancerName(apiService)
|
||||
pool, err := pools.Create(lb.network, pools.CreateOpts{
|
||||
Name: name,
|
||||
Protocol: pools.ProtocolTCP,
|
||||
SubnetID: lb.opts.SubnetId,
|
||||
LBMethod: lbmethod,
|
||||
}).Extract()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for _, host := range hosts {
|
||||
addr, err := getAddressByName(lb.compute, host)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
_, err = members.Create(lb.network, members.CreateOpts{
|
||||
PoolID: pool.ID,
|
||||
ProtocolPort: int(ports[0].NodePort), //TODO: need to handle multi-port
|
||||
Address: addr,
|
||||
}).Extract()
|
||||
if err != nil {
|
||||
pools.Delete(lb.network, pool.ID)
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
var mon *monitors.Monitor
|
||||
if lb.opts.CreateMonitor {
|
||||
mon, err = monitors.Create(lb.network, monitors.CreateOpts{
|
||||
Type: monitors.TypeTCP,
|
||||
Delay: int(lb.opts.MonitorDelay.Duration.Seconds()),
|
||||
Timeout: int(lb.opts.MonitorTimeout.Duration.Seconds()),
|
||||
MaxRetries: int(lb.opts.MonitorMaxRetries),
|
||||
}).Extract()
|
||||
if err != nil {
|
||||
pools.Delete(lb.network, pool.ID)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
_, err = pools.AssociateMonitor(lb.network, pool.ID, mon.ID).Extract()
|
||||
if err != nil {
|
||||
monitors.Delete(lb.network, mon.ID)
|
||||
pools.Delete(lb.network, pool.ID)
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
createOpts := vips.CreateOpts{
|
||||
Name: name,
|
||||
Description: fmt.Sprintf("Kubernetes external service %s", name),
|
||||
Protocol: "TCP",
|
||||
ProtocolPort: int(ports[0].Port), //TODO: need to handle multi-port
|
||||
PoolID: pool.ID,
|
||||
SubnetID: lb.opts.SubnetId,
|
||||
Persistence: persistence,
|
||||
}
|
||||
|
||||
loadBalancerIP := apiService.Spec.LoadBalancerIP
|
||||
if loadBalancerIP != "" {
|
||||
createOpts.Address = loadBalancerIP
|
||||
}
|
||||
|
||||
vip, err := vips.Create(lb.network, createOpts).Extract()
|
||||
if err != nil {
|
||||
if mon != nil {
|
||||
monitors.Delete(lb.network, mon.ID)
|
||||
}
|
||||
pools.Delete(lb.network, pool.ID)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
status := &api.LoadBalancerStatus{}
|
||||
|
||||
status.Ingress = []api.LoadBalancerIngress{{IP: vip.Address}}
|
||||
|
||||
if lb.opts.FloatingNetworkId != "" {
|
||||
floatIPOpts := floatingips.CreateOpts{
|
||||
FloatingNetworkID: lb.opts.FloatingNetworkId,
|
||||
PortID: vip.PortID,
|
||||
}
|
||||
floatIP, err := floatingips.Create(lb.network, floatIPOpts).Extract()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
status.Ingress = append(status.Ingress, api.LoadBalancerIngress{IP: floatIP.FloatingIP})
|
||||
}
|
||||
|
||||
return status, nil
|
||||
|
||||
}
|
||||
|
||||
func (lb *LoadBalancer) UpdateLoadBalancer(service *api.Service, hosts []string) error {
|
||||
loadBalancerName := cloudprovider.GetLoadBalancerName(service)
|
||||
glog.V(4).Infof("UpdateLoadBalancer(%v, %v)", loadBalancerName, hosts)
|
||||
|
||||
vip, err := getVipByName(lb.network, loadBalancerName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Set of member (addresses) that _should_ exist
|
||||
addrs := map[string]bool{}
|
||||
for _, host := range hosts {
|
||||
addr, err := getAddressByName(lb.compute, host)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
addrs[addr] = true
|
||||
}
|
||||
|
||||
// Iterate over members that _do_ exist
|
||||
pager := members.List(lb.network, members.ListOpts{PoolID: vip.PoolID})
|
||||
err = pager.EachPage(func(page pagination.Page) (bool, error) {
|
||||
memList, err := members.ExtractMembers(page)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
for _, member := range memList {
|
||||
if _, found := addrs[member.Address]; found {
|
||||
// Member already exists
|
||||
delete(addrs, member.Address)
|
||||
} else {
|
||||
// Member needs to be deleted
|
||||
err = members.Delete(lb.network, member.ID).ExtractErr()
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return true, nil
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Anything left in addrs is a new member that needs to be added
|
||||
for addr := range addrs {
|
||||
_, err := members.Create(lb.network, members.CreateOpts{
|
||||
PoolID: vip.PoolID,
|
||||
Address: addr,
|
||||
ProtocolPort: vip.ProtocolPort,
|
||||
}).Extract()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (lb *LoadBalancer) EnsureLoadBalancerDeleted(service *api.Service) error {
|
||||
loadBalancerName := cloudprovider.GetLoadBalancerName(service)
|
||||
glog.V(4).Infof("EnsureLoadBalancerDeleted(%v)", loadBalancerName)
|
||||
|
||||
vip, err := getVipByName(lb.network, loadBalancerName)
|
||||
if err != nil && err != ErrNotFound {
|
||||
return err
|
||||
}
|
||||
|
||||
if lb.opts.FloatingNetworkId != "" && vip != nil {
|
||||
floatingIP, err := getFloatingIPByPortID(lb.network, vip.PortID)
|
||||
if err != nil && !isNotFound(err) {
|
||||
return err
|
||||
}
|
||||
if floatingIP != nil {
|
||||
err = floatingips.Delete(lb.network, floatingIP.ID).ExtractErr()
|
||||
if err != nil && !isNotFound(err) {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// We have to delete the VIP before the pool can be deleted,
|
||||
// so no point continuing if this fails.
|
||||
if vip != nil {
|
||||
err := vips.Delete(lb.network, vip.ID).ExtractErr()
|
||||
if err != nil && !isNotFound(err) {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
var pool *pools.Pool
|
||||
if vip != nil {
|
||||
pool, err = pools.Get(lb.network, vip.PoolID).Extract()
|
||||
if err != nil && !isNotFound(err) {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
// The VIP is gone, but it is conceivable that a Pool
|
||||
// still exists that we failed to delete on some
|
||||
// previous occasion. Make a best effort attempt to
|
||||
// cleanup any pools with the same name as the VIP.
|
||||
pool, err = getPoolByName(lb.network, service.Name)
|
||||
if err != nil && err != ErrNotFound {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if pool != nil {
|
||||
for _, monId := range pool.MonitorIDs {
|
||||
_, err = pools.DisassociateMonitor(lb.network, pool.ID, monId).Extract()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = monitors.Delete(lb.network, monId).ExtractErr()
|
||||
if err != nil && !isNotFound(err) {
|
||||
return err
|
||||
}
|
||||
}
|
||||
err = pools.Delete(lb.network, pool.ID).ExtractErr()
|
||||
if err != nil && !isNotFound(err) {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (os *OpenStack) Zones() (cloudprovider.Zones, bool) {
|
||||
glog.V(1).Info("Claiming to support Zones")
|
||||
|
||||
|
|
|
@ -0,0 +1,993 @@
|
|||
/*
|
||||
Copyright 2016 The Kubernetes Authors All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package openstack
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/rackspace/gophercloud"
|
||||
"github.com/rackspace/gophercloud/openstack/networking/v2/extensions/layer3/floatingips"
|
||||
"github.com/rackspace/gophercloud/openstack/networking/v2/extensions/lbaas/members"
|
||||
"github.com/rackspace/gophercloud/openstack/networking/v2/extensions/lbaas/monitors"
|
||||
"github.com/rackspace/gophercloud/openstack/networking/v2/extensions/lbaas/pools"
|
||||
"github.com/rackspace/gophercloud/openstack/networking/v2/extensions/lbaas/vips"
|
||||
"github.com/rackspace/gophercloud/openstack/networking/v2/extensions/lbaas_v2/listeners"
|
||||
"github.com/rackspace/gophercloud/openstack/networking/v2/extensions/lbaas_v2/loadbalancers"
|
||||
v2_monitors "github.com/rackspace/gophercloud/openstack/networking/v2/extensions/lbaas_v2/monitors"
|
||||
v2_pools "github.com/rackspace/gophercloud/openstack/networking/v2/extensions/lbaas_v2/pools"
|
||||
neutron_ports "github.com/rackspace/gophercloud/openstack/networking/v2/ports"
|
||||
"github.com/rackspace/gophercloud/pagination"
|
||||
|
||||
"fmt"
|
||||
|
||||
"github.com/golang/glog"
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/api/service"
|
||||
"k8s.io/kubernetes/pkg/cloudprovider"
|
||||
)
|
||||
|
||||
// Note: when creating a new Loadbalancer (VM), it can take some time before it is ready for use,
|
||||
// this timeout is used for waiting until the Loadbalancer provisioning status goes to ACTIVE state.
|
||||
const loadbalancerActiveTimeoutSeconds = 120
|
||||
const loadbalancerDeleteTimeoutSeconds = 30
|
||||
|
||||
// LoadBalancer implementation for LBaaS v1
|
||||
type LbaasV1 struct {
|
||||
LoadBalancer
|
||||
}
|
||||
|
||||
// LoadBalancer implementation for LBaaS v2
|
||||
type LbaasV2 struct {
|
||||
LoadBalancer
|
||||
}
|
||||
|
||||
func getPortIDByIP(client *gophercloud.ServiceClient, ipAddress string) (string, error) {
|
||||
var portID string
|
||||
|
||||
err := neutron_ports.List(client, neutron_ports.ListOpts{}).EachPage(func(page pagination.Page) (bool, error) {
|
||||
portList, err := neutron_ports.ExtractPorts(page)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
for _, port := range portList {
|
||||
for _, ip := range port.FixedIPs {
|
||||
if ip.IPAddress == ipAddress {
|
||||
portID = port.ID
|
||||
return false, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return true, nil
|
||||
})
|
||||
|
||||
return portID, err
|
||||
}
|
||||
|
||||
func getFloatingIPByPortID(client *gophercloud.ServiceClient, portID string) (*floatingips.FloatingIP, error) {
|
||||
opts := floatingips.ListOpts{
|
||||
PortID: portID,
|
||||
}
|
||||
pager := floatingips.List(client, opts)
|
||||
|
||||
floatingIPList := make([]floatingips.FloatingIP, 0, 1)
|
||||
|
||||
err := pager.EachPage(func(page pagination.Page) (bool, error) {
|
||||
f, err := floatingips.ExtractFloatingIPs(page)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
floatingIPList = append(floatingIPList, f...)
|
||||
if len(floatingIPList) > 1 {
|
||||
return false, ErrMultipleResults
|
||||
}
|
||||
return true, nil
|
||||
})
|
||||
if err != nil {
|
||||
if isNotFound(err) {
|
||||
return nil, ErrNotFound
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if len(floatingIPList) == 0 {
|
||||
return nil, ErrNotFound
|
||||
} else if len(floatingIPList) > 1 {
|
||||
return nil, ErrMultipleResults
|
||||
}
|
||||
|
||||
return &floatingIPList[0], nil
|
||||
}
|
||||
|
||||
func getPoolByName(client *gophercloud.ServiceClient, name string) (*pools.Pool, error) {
|
||||
opts := pools.ListOpts{
|
||||
Name: name,
|
||||
}
|
||||
pager := pools.List(client, opts)
|
||||
|
||||
poolList := make([]pools.Pool, 0, 1)
|
||||
|
||||
err := pager.EachPage(func(page pagination.Page) (bool, error) {
|
||||
p, err := pools.ExtractPools(page)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
poolList = append(poolList, p...)
|
||||
if len(poolList) > 1 {
|
||||
return false, ErrMultipleResults
|
||||
}
|
||||
return true, nil
|
||||
})
|
||||
if err != nil {
|
||||
if isNotFound(err) {
|
||||
return nil, ErrNotFound
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if len(poolList) == 0 {
|
||||
return nil, ErrNotFound
|
||||
} else if len(poolList) > 1 {
|
||||
return nil, ErrMultipleResults
|
||||
}
|
||||
|
||||
return &poolList[0], nil
|
||||
}
|
||||
|
||||
func getVipByName(client *gophercloud.ServiceClient, name string) (*vips.VirtualIP, error) {
|
||||
opts := vips.ListOpts{
|
||||
Name: name,
|
||||
}
|
||||
pager := vips.List(client, opts)
|
||||
|
||||
vipList := make([]vips.VirtualIP, 0, 1)
|
||||
|
||||
err := pager.EachPage(func(page pagination.Page) (bool, error) {
|
||||
v, err := vips.ExtractVIPs(page)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
vipList = append(vipList, v...)
|
||||
if len(vipList) > 1 {
|
||||
return false, ErrMultipleResults
|
||||
}
|
||||
return true, nil
|
||||
})
|
||||
if err != nil {
|
||||
if isNotFound(err) {
|
||||
return nil, ErrNotFound
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if len(vipList) == 0 {
|
||||
return nil, ErrNotFound
|
||||
} else if len(vipList) > 1 {
|
||||
return nil, ErrMultipleResults
|
||||
}
|
||||
|
||||
return &vipList[0], nil
|
||||
}
|
||||
|
||||
func getLoadbalancerByName(client *gophercloud.ServiceClient, name string) (*loadbalancers.LoadBalancer, error) {
|
||||
opts := loadbalancers.ListOpts{
|
||||
Name: name,
|
||||
}
|
||||
pager := loadbalancers.List(client, opts)
|
||||
|
||||
loadbalancerList := make([]loadbalancers.LoadBalancer, 0, 1)
|
||||
|
||||
err := pager.EachPage(func(page pagination.Page) (bool, error) {
|
||||
v, err := loadbalancers.ExtractLoadbalancers(page)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
loadbalancerList = append(loadbalancerList, v...)
|
||||
if len(loadbalancerList) > 1 {
|
||||
return false, ErrMultipleResults
|
||||
}
|
||||
return true, nil
|
||||
})
|
||||
if err != nil {
|
||||
if isNotFound(err) {
|
||||
return nil, ErrNotFound
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if len(loadbalancerList) == 0 {
|
||||
return nil, ErrNotFound
|
||||
} else if len(loadbalancerList) > 1 {
|
||||
return nil, ErrMultipleResults
|
||||
}
|
||||
|
||||
return &loadbalancerList[0], nil
|
||||
}
|
||||
|
||||
func waitLoadbalancerActiveProvisioningStatus(client *gophercloud.ServiceClient, loadbalancerID string) error {
|
||||
start := time.Now().Second()
|
||||
for {
|
||||
loadbalancer, err := loadbalancers.Get(client, loadbalancerID).Extract()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if loadbalancer.ProvisioningStatus == "ACTIVE" {
|
||||
return nil
|
||||
}
|
||||
|
||||
time.Sleep(1 * time.Second)
|
||||
|
||||
if time.Now().Second()-start >= loadbalancerActiveTimeoutSeconds {
|
||||
return fmt.Errorf("Loadbalancer failed to go into ACTIVE provisioning status within alloted time")
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func waitLoadbalancerDeleted(client *gophercloud.ServiceClient, loadbalancerID string) error {
|
||||
start := time.Now().Second()
|
||||
for {
|
||||
_, err := loadbalancers.Get(client, loadbalancerID).Extract()
|
||||
if err != nil {
|
||||
if err == ErrNotFound {
|
||||
return nil
|
||||
} else {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
time.Sleep(1 * time.Second)
|
||||
|
||||
if time.Now().Second()-start >= loadbalancerDeleteTimeoutSeconds {
|
||||
return fmt.Errorf("Loadbalancer failed to delete within the alloted time")
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
func (lbaas *LbaasV2) GetLoadBalancer(service *api.Service) (*api.LoadBalancerStatus, bool, error) {
|
||||
loadBalancerName := cloudprovider.GetLoadBalancerName(service)
|
||||
loadbalancer, err := getLoadbalancerByName(lbaas.network, loadBalancerName)
|
||||
if err == ErrNotFound {
|
||||
return nil, false, nil
|
||||
}
|
||||
if loadbalancer == nil {
|
||||
return nil, false, err
|
||||
}
|
||||
|
||||
status := &api.LoadBalancerStatus{}
|
||||
status.Ingress = []api.LoadBalancerIngress{{IP: loadbalancer.VipAddress}}
|
||||
|
||||
return status, true, err
|
||||
}
|
||||
|
||||
// TODO: This code currently ignores 'region' and always creates a
|
||||
// loadbalancer in only the current OpenStack region. We should take
|
||||
// a list of regions (from config) and query/create loadbalancers in
|
||||
// each region.
|
||||
|
||||
func (lbaas *LbaasV2) EnsureLoadBalancer(apiService *api.Service, hosts []string) (*api.LoadBalancerStatus, error) {
|
||||
glog.V(4).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v, %v)", apiService.Namespace, apiService.Name, apiService.Spec.LoadBalancerIP, apiService.Spec.Ports, hosts, apiService.Annotations)
|
||||
|
||||
ports := apiService.Spec.Ports
|
||||
if len(ports) > 1 {
|
||||
return nil, fmt.Errorf("multiple ports are not yet supported in openstack load balancers")
|
||||
} else if len(ports) == 0 {
|
||||
return nil, fmt.Errorf("no ports provided to openstack load balancer")
|
||||
}
|
||||
|
||||
// The service controller verified all the protocols match on the ports, just check and use the first one
|
||||
// TODO: Convert all error messages to use an event recorder
|
||||
if ports[0].Protocol != api.ProtocolTCP {
|
||||
return nil, fmt.Errorf("Only TCP LoadBalancer is supported for openstack load balancers")
|
||||
}
|
||||
|
||||
affinity := api.ServiceAffinityNone //apiService.Spec.SessionAffinity
|
||||
var persistence *v2_pools.SessionPersistence
|
||||
switch affinity {
|
||||
case api.ServiceAffinityNone:
|
||||
persistence = nil
|
||||
case api.ServiceAffinityClientIP:
|
||||
persistence = &v2_pools.SessionPersistence{Type: "SOURCE_IP"}
|
||||
default:
|
||||
return nil, fmt.Errorf("unsupported load balancer affinity: %v", affinity)
|
||||
}
|
||||
|
||||
sourceRanges, err := service.GetLoadBalancerSourceRanges(apiService)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if !service.IsAllowAll(sourceRanges) {
|
||||
return nil, fmt.Errorf("Source range restrictions are not supported for openstack load balancers")
|
||||
}
|
||||
|
||||
glog.V(2).Infof("Checking if openstack load balancer already exists: %s", cloudprovider.GetLoadBalancerName(apiService))
|
||||
_, exists, err := lbaas.GetLoadBalancer(apiService)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error checking if openstack load balancer already exists: %v", err)
|
||||
}
|
||||
|
||||
// TODO: Implement a more efficient update strategy for common changes than delete & create
|
||||
// In particular, if we implement hosts update, we can get rid of UpdateHosts
|
||||
if exists {
|
||||
err := lbaas.EnsureLoadBalancerDeleted(apiService)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error deleting existing openstack load balancer: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
lbmethod := v2_pools.LBMethod(lbaas.opts.LBMethod)
|
||||
if lbmethod == "" {
|
||||
lbmethod = v2_pools.LBMethodRoundRobin
|
||||
}
|
||||
name := cloudprovider.GetLoadBalancerName(apiService)
|
||||
|
||||
createOpts := loadbalancers.CreateOpts{
|
||||
Name: name,
|
||||
Description: fmt.Sprintf("Kubernetes external service %s", name),
|
||||
VipSubnetID: lbaas.opts.SubnetId,
|
||||
}
|
||||
|
||||
loadBalancerIP := apiService.Spec.LoadBalancerIP
|
||||
if loadBalancerIP != "" {
|
||||
createOpts.VipAddress = loadBalancerIP
|
||||
}
|
||||
|
||||
loadbalancer, err := loadbalancers.Create(lbaas.network, createOpts).Extract()
|
||||
if err != nil {
|
||||
// cleanup what was created so far
|
||||
_ = lbaas.EnsureLoadBalancerDeleted(apiService)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
waitLoadbalancerActiveProvisioningStatus(lbaas.network, loadbalancer.ID)
|
||||
|
||||
listener, err := listeners.Create(lbaas.network, listeners.CreateOpts{
|
||||
Name: name,
|
||||
Protocol: listeners.ProtocolTCP,
|
||||
ProtocolPort: (int)(ports[0].Port), //TODO: need to handle multi-port
|
||||
LoadbalancerID: loadbalancer.ID,
|
||||
}).Extract()
|
||||
if err != nil {
|
||||
// cleanup what was created so far
|
||||
_ = lbaas.EnsureLoadBalancerDeleted(apiService)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
waitLoadbalancerActiveProvisioningStatus(lbaas.network, loadbalancer.ID)
|
||||
|
||||
pool, err := v2_pools.Create(lbaas.network, v2_pools.CreateOpts{
|
||||
Name: name,
|
||||
Protocol: v2_pools.ProtocolTCP,
|
||||
LBMethod: lbmethod,
|
||||
ListenerID: listener.ID,
|
||||
Persistence: persistence,
|
||||
}).Extract()
|
||||
if err != nil {
|
||||
// cleanup what was created so far
|
||||
_ = lbaas.EnsureLoadBalancerDeleted(apiService)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for _, host := range hosts {
|
||||
addr, err := getAddressByName(lbaas.compute, host)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
waitLoadbalancerActiveProvisioningStatus(lbaas.network, loadbalancer.ID)
|
||||
|
||||
_, err = v2_pools.CreateAssociateMember(lbaas.network, pool.ID, v2_pools.MemberCreateOpts{
|
||||
Name: name,
|
||||
ProtocolPort: int(ports[0].NodePort), //TODO: need to handle multi-port
|
||||
Address: addr,
|
||||
SubnetID: lbaas.opts.SubnetId,
|
||||
}).Extract()
|
||||
if err != nil {
|
||||
// cleanup what was created so far
|
||||
_ = lbaas.EnsureLoadBalancerDeleted(apiService)
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
if lbaas.opts.CreateMonitor {
|
||||
waitLoadbalancerActiveProvisioningStatus(lbaas.network, loadbalancer.ID)
|
||||
|
||||
_, err = v2_monitors.Create(lbaas.network, v2_monitors.CreateOpts{
|
||||
PoolID: pool.ID,
|
||||
Type: monitors.TypeTCP,
|
||||
Delay: int(lbaas.opts.MonitorDelay.Duration.Seconds()),
|
||||
Timeout: int(lbaas.opts.MonitorTimeout.Duration.Seconds()),
|
||||
MaxRetries: int(lbaas.opts.MonitorMaxRetries),
|
||||
}).Extract()
|
||||
if err != nil {
|
||||
// cleanup what was created so far
|
||||
_ = lbaas.EnsureLoadBalancerDeleted(apiService)
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
status := &api.LoadBalancerStatus{}
|
||||
|
||||
status.Ingress = []api.LoadBalancerIngress{{IP: loadbalancer.VipAddress}}
|
||||
|
||||
if lbaas.opts.FloatingNetworkId != "" {
|
||||
portID, err := getPortIDByIP(lbaas.network, loadbalancer.VipAddress)
|
||||
if err != nil {
|
||||
// cleanup what was created so far
|
||||
_ = lbaas.EnsureLoadBalancerDeleted(apiService)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
floatIPOpts := floatingips.CreateOpts{
|
||||
FloatingNetworkID: lbaas.opts.FloatingNetworkId,
|
||||
PortID: portID,
|
||||
}
|
||||
floatIP, err := floatingips.Create(lbaas.network, floatIPOpts).Extract()
|
||||
if err != nil {
|
||||
// cleanup what was created so far
|
||||
_ = lbaas.EnsureLoadBalancerDeleted(apiService)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
status.Ingress = append(status.Ingress, api.LoadBalancerIngress{IP: floatIP.FloatingIP})
|
||||
}
|
||||
|
||||
return status, nil
|
||||
}
|
||||
|
||||
func (lbaas *LbaasV2) UpdateLoadBalancer(service *api.Service, hosts []string) error {
|
||||
loadBalancerName := cloudprovider.GetLoadBalancerName(service)
|
||||
glog.V(4).Infof("UpdateLoadBalancer(%v, %v)", loadBalancerName, hosts)
|
||||
|
||||
ports := service.Spec.Ports
|
||||
if len(ports) > 1 {
|
||||
return fmt.Errorf("multiple ports are not yet supported in openstack load balancers")
|
||||
} else if len(ports) == 0 {
|
||||
return fmt.Errorf("no ports provided to openstack load balancer")
|
||||
}
|
||||
|
||||
loadbalancer, err := getLoadbalancerByName(lbaas.network, loadBalancerName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if loadbalancer == nil {
|
||||
return fmt.Errorf("Loadbalancer %s does not exist", loadBalancerName)
|
||||
}
|
||||
|
||||
// Set of member (addresses) that _should_ exist
|
||||
addrs := map[string]bool{}
|
||||
for _, host := range hosts {
|
||||
addr, err := getAddressByName(lbaas.compute, host)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
addrs[addr] = true
|
||||
}
|
||||
|
||||
// Iterate over members in each pool that _do_ exist
|
||||
var poolID string
|
||||
err = v2_pools.List(lbaas.network, v2_pools.ListOpts{LoadbalancerID: loadbalancer.ID}).EachPage(func(page pagination.Page) (bool, error) {
|
||||
poolsList, err := v2_pools.ExtractPools(page)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
for _, pool := range poolsList {
|
||||
poolID = pool.ID
|
||||
err := v2_pools.ListAssociateMembers(lbaas.network, poolID, v2_pools.MemberListOpts{}).EachPage(func(page pagination.Page) (bool, error) {
|
||||
membersList, err := v2_pools.ExtractMembers(page)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
for _, member := range membersList {
|
||||
if _, found := addrs[member.Address]; found {
|
||||
// Member already exists, remove from update list
|
||||
delete(addrs, member.Address)
|
||||
} else {
|
||||
// Member needs to be deleted
|
||||
err = v2_pools.DeleteMember(lbaas.network, poolID, member.ID).ExtractErr()
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
waitLoadbalancerActiveProvisioningStatus(lbaas.network, loadbalancer.ID)
|
||||
}
|
||||
}
|
||||
|
||||
return true, nil
|
||||
})
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
}
|
||||
return true, nil
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Anything left in addrs is a new member that needs to be added to a pool
|
||||
for addr := range addrs {
|
||||
_, err := v2_pools.CreateAssociateMember(lbaas.network, poolID, v2_pools.MemberCreateOpts{
|
||||
Address: addr,
|
||||
ProtocolPort: int(ports[0].NodePort),
|
||||
SubnetID: lbaas.opts.SubnetId,
|
||||
}).Extract()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
waitLoadbalancerActiveProvisioningStatus(lbaas.network, loadbalancer.ID)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (lbaas *LbaasV2) EnsureLoadBalancerDeleted(service *api.Service) error {
|
||||
loadBalancerName := cloudprovider.GetLoadBalancerName(service)
|
||||
glog.V(4).Infof("EnsureLoadBalancerDeleted(%v)", loadBalancerName)
|
||||
|
||||
loadbalancer, err := getLoadbalancerByName(lbaas.network, loadBalancerName)
|
||||
if err != nil && err != ErrNotFound {
|
||||
return err
|
||||
}
|
||||
if loadbalancer == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
if lbaas.opts.FloatingNetworkId != "" && loadbalancer != nil {
|
||||
portID, err := getPortIDByIP(lbaas.network, loadbalancer.VipAddress)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
floatingIP, err := getFloatingIPByPortID(lbaas.network, portID)
|
||||
if err != nil && err != ErrNotFound {
|
||||
return err
|
||||
}
|
||||
if floatingIP != nil {
|
||||
err = floatingips.Delete(lbaas.network, floatingIP.ID).ExtractErr()
|
||||
if err != nil && !isNotFound(err) {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// get all listeners associated with this loadbalancer
|
||||
var listenerIDs []string
|
||||
err = listeners.List(lbaas.network, listeners.ListOpts{LoadbalancerID: loadbalancer.ID}).EachPage(func(page pagination.Page) (bool, error) {
|
||||
listenerList, err := listeners.ExtractListeners(page)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
for _, listener := range listenerList {
|
||||
listenerIDs = append(listenerIDs, listener.ID)
|
||||
}
|
||||
|
||||
return true, nil
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// get all pools associated with this loadbalancer
|
||||
var poolIDs []string
|
||||
err = v2_pools.List(lbaas.network, v2_pools.ListOpts{LoadbalancerID: loadbalancer.ID}).EachPage(func(page pagination.Page) (bool, error) {
|
||||
poolsList, err := v2_pools.ExtractPools(page)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
for _, pool := range poolsList {
|
||||
poolIDs = append(poolIDs, pool.ID)
|
||||
}
|
||||
|
||||
return true, nil
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// get all members associated with each poolIDs
|
||||
var memberIDs []string
|
||||
for _, poolID := range poolIDs {
|
||||
err := v2_pools.ListAssociateMembers(lbaas.network, poolID, v2_pools.MemberListOpts{}).EachPage(func(page pagination.Page) (bool, error) {
|
||||
membersList, err := v2_pools.ExtractMembers(page)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
for _, member := range membersList {
|
||||
memberIDs = append(memberIDs, member.ID)
|
||||
}
|
||||
|
||||
return true, nil
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// get all monitors associated with each poolIDs
|
||||
var monitorIDs []string
|
||||
for _, poolID := range poolIDs {
|
||||
err = v2_monitors.List(lbaas.network, v2_monitors.ListOpts{PoolID: poolID}).EachPage(func(page pagination.Page) (bool, error) {
|
||||
monitorsList, err := v2_monitors.ExtractMonitors(page)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
for _, monitor := range monitorsList {
|
||||
monitorIDs = append(monitorIDs, monitor.ID)
|
||||
}
|
||||
|
||||
return true, nil
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// delete all monitors
|
||||
for _, monitorID := range monitorIDs {
|
||||
err := v2_monitors.Delete(lbaas.network, monitorID).ExtractErr()
|
||||
if err != nil && !isNotFound(err) {
|
||||
return err
|
||||
}
|
||||
waitLoadbalancerActiveProvisioningStatus(lbaas.network, loadbalancer.ID)
|
||||
}
|
||||
|
||||
// delete all members and pools
|
||||
for _, poolID := range poolIDs {
|
||||
// delete all members for this pool
|
||||
for _, memberID := range memberIDs {
|
||||
err := v2_pools.DeleteMember(lbaas.network, poolID, memberID).ExtractErr()
|
||||
if err != nil && !isNotFound(err) {
|
||||
return err
|
||||
}
|
||||
waitLoadbalancerActiveProvisioningStatus(lbaas.network, loadbalancer.ID)
|
||||
}
|
||||
|
||||
// delete pool
|
||||
err := v2_pools.Delete(lbaas.network, poolID).ExtractErr()
|
||||
if err != nil && !isNotFound(err) {
|
||||
return err
|
||||
}
|
||||
waitLoadbalancerActiveProvisioningStatus(lbaas.network, loadbalancer.ID)
|
||||
}
|
||||
|
||||
// delete all listeners
|
||||
for _, listenerID := range listenerIDs {
|
||||
err := listeners.Delete(lbaas.network, listenerID).ExtractErr()
|
||||
if err != nil && !isNotFound(err) {
|
||||
return err
|
||||
}
|
||||
waitLoadbalancerActiveProvisioningStatus(lbaas.network, loadbalancer.ID)
|
||||
}
|
||||
|
||||
// delete loadbalancer
|
||||
err = loadbalancers.Delete(lbaas.network, loadbalancer.ID).ExtractErr()
|
||||
if err != nil && !isNotFound(err) {
|
||||
return err
|
||||
}
|
||||
waitLoadbalancerDeleted(lbaas.network, loadbalancer.ID)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (lb *LbaasV1) GetLoadBalancer(service *api.Service) (*api.LoadBalancerStatus, bool, error) {
|
||||
loadBalancerName := cloudprovider.GetLoadBalancerName(service)
|
||||
vip, err := getVipByName(lb.network, loadBalancerName)
|
||||
if err == ErrNotFound {
|
||||
return nil, false, nil
|
||||
}
|
||||
if vip == nil {
|
||||
return nil, false, err
|
||||
}
|
||||
|
||||
status := &api.LoadBalancerStatus{}
|
||||
status.Ingress = []api.LoadBalancerIngress{{IP: vip.Address}}
|
||||
|
||||
return status, true, err
|
||||
}
|
||||
|
||||
// TODO: This code currently ignores 'region' and always creates a
|
||||
// loadbalancer in only the current OpenStack region. We should take
|
||||
// a list of regions (from config) and query/create loadbalancers in
|
||||
// each region.
|
||||
|
||||
func (lb *LbaasV1) EnsureLoadBalancer(apiService *api.Service, hosts []string) (*api.LoadBalancerStatus, error) {
|
||||
glog.V(4).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v, %v)", apiService.Namespace, apiService.Name, apiService.Spec.LoadBalancerIP, apiService.Spec.Ports, hosts, apiService.Annotations)
|
||||
|
||||
ports := apiService.Spec.Ports
|
||||
if len(ports) > 1 {
|
||||
return nil, fmt.Errorf("multiple ports are not yet supported in openstack load balancers")
|
||||
} else if len(ports) == 0 {
|
||||
return nil, fmt.Errorf("no ports provided to openstack load balancer")
|
||||
}
|
||||
|
||||
// The service controller verified all the protocols match on the ports, just check and use the first one
|
||||
// TODO: Convert all error messages to use an event recorder
|
||||
if ports[0].Protocol != api.ProtocolTCP {
|
||||
return nil, fmt.Errorf("Only TCP LoadBalancer is supported for openstack load balancers")
|
||||
}
|
||||
|
||||
affinity := apiService.Spec.SessionAffinity
|
||||
var persistence *vips.SessionPersistence
|
||||
switch affinity {
|
||||
case api.ServiceAffinityNone:
|
||||
persistence = nil
|
||||
case api.ServiceAffinityClientIP:
|
||||
persistence = &vips.SessionPersistence{Type: "SOURCE_IP"}
|
||||
default:
|
||||
return nil, fmt.Errorf("unsupported load balancer affinity: %v", affinity)
|
||||
}
|
||||
|
||||
sourceRanges, err := service.GetLoadBalancerSourceRanges(apiService)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if !service.IsAllowAll(sourceRanges) {
|
||||
return nil, fmt.Errorf("Source range restrictions are not supported for openstack load balancers")
|
||||
}
|
||||
|
||||
glog.V(2).Infof("Checking if openstack load balancer already exists: %s", cloudprovider.GetLoadBalancerName(apiService))
|
||||
_, exists, err := lb.GetLoadBalancer(apiService)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error checking if openstack load balancer already exists: %v", err)
|
||||
}
|
||||
|
||||
// TODO: Implement a more efficient update strategy for common changes than delete & create
|
||||
// In particular, if we implement hosts update, we can get rid of UpdateHosts
|
||||
if exists {
|
||||
err := lb.EnsureLoadBalancerDeleted(apiService)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error deleting existing openstack load balancer: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
lbmethod := lb.opts.LBMethod
|
||||
if lbmethod == "" {
|
||||
lbmethod = pools.LBMethodRoundRobin
|
||||
}
|
||||
name := cloudprovider.GetLoadBalancerName(apiService)
|
||||
pool, err := pools.Create(lb.network, pools.CreateOpts{
|
||||
Name: name,
|
||||
Protocol: pools.ProtocolTCP,
|
||||
SubnetID: lb.opts.SubnetId,
|
||||
LBMethod: lbmethod,
|
||||
}).Extract()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for _, host := range hosts {
|
||||
addr, err := getAddressByName(lb.compute, host)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
_, err = members.Create(lb.network, members.CreateOpts{
|
||||
PoolID: pool.ID,
|
||||
ProtocolPort: int(ports[0].NodePort), //TODO: need to handle multi-port
|
||||
Address: addr,
|
||||
}).Extract()
|
||||
if err != nil {
|
||||
pools.Delete(lb.network, pool.ID)
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
var mon *monitors.Monitor
|
||||
if lb.opts.CreateMonitor {
|
||||
mon, err = monitors.Create(lb.network, monitors.CreateOpts{
|
||||
Type: monitors.TypeTCP,
|
||||
Delay: int(lb.opts.MonitorDelay.Duration.Seconds()),
|
||||
Timeout: int(lb.opts.MonitorTimeout.Duration.Seconds()),
|
||||
MaxRetries: int(lb.opts.MonitorMaxRetries),
|
||||
}).Extract()
|
||||
if err != nil {
|
||||
pools.Delete(lb.network, pool.ID)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
_, err = pools.AssociateMonitor(lb.network, pool.ID, mon.ID).Extract()
|
||||
if err != nil {
|
||||
monitors.Delete(lb.network, mon.ID)
|
||||
pools.Delete(lb.network, pool.ID)
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
createOpts := vips.CreateOpts{
|
||||
Name: name,
|
||||
Description: fmt.Sprintf("Kubernetes external service %s", name),
|
||||
Protocol: "TCP",
|
||||
ProtocolPort: int(ports[0].Port), //TODO: need to handle multi-port
|
||||
PoolID: pool.ID,
|
||||
SubnetID: lb.opts.SubnetId,
|
||||
Persistence: persistence,
|
||||
}
|
||||
|
||||
loadBalancerIP := apiService.Spec.LoadBalancerIP
|
||||
if loadBalancerIP != "" {
|
||||
createOpts.Address = loadBalancerIP
|
||||
}
|
||||
|
||||
vip, err := vips.Create(lb.network, createOpts).Extract()
|
||||
if err != nil {
|
||||
if mon != nil {
|
||||
monitors.Delete(lb.network, mon.ID)
|
||||
}
|
||||
pools.Delete(lb.network, pool.ID)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
status := &api.LoadBalancerStatus{}
|
||||
|
||||
status.Ingress = []api.LoadBalancerIngress{{IP: vip.Address}}
|
||||
|
||||
if lb.opts.FloatingNetworkId != "" {
|
||||
floatIPOpts := floatingips.CreateOpts{
|
||||
FloatingNetworkID: lb.opts.FloatingNetworkId,
|
||||
PortID: vip.PortID,
|
||||
}
|
||||
floatIP, err := floatingips.Create(lb.network, floatIPOpts).Extract()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
status.Ingress = append(status.Ingress, api.LoadBalancerIngress{IP: floatIP.FloatingIP})
|
||||
}
|
||||
|
||||
return status, nil
|
||||
|
||||
}
|
||||
|
||||
func (lb *LbaasV1) UpdateLoadBalancer(service *api.Service, hosts []string) error {
|
||||
loadBalancerName := cloudprovider.GetLoadBalancerName(service)
|
||||
glog.V(4).Infof("UpdateLoadBalancer(%v, %v)", loadBalancerName, hosts)
|
||||
|
||||
vip, err := getVipByName(lb.network, loadBalancerName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Set of member (addresses) that _should_ exist
|
||||
addrs := map[string]bool{}
|
||||
for _, host := range hosts {
|
||||
addr, err := getAddressByName(lb.compute, host)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
addrs[addr] = true
|
||||
}
|
||||
|
||||
// Iterate over members that _do_ exist
|
||||
pager := members.List(lb.network, members.ListOpts{PoolID: vip.PoolID})
|
||||
err = pager.EachPage(func(page pagination.Page) (bool, error) {
|
||||
memList, err := members.ExtractMembers(page)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
for _, member := range memList {
|
||||
if _, found := addrs[member.Address]; found {
|
||||
// Member already exists
|
||||
delete(addrs, member.Address)
|
||||
} else {
|
||||
// Member needs to be deleted
|
||||
err = members.Delete(lb.network, member.ID).ExtractErr()
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return true, nil
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Anything left in addrs is a new member that needs to be added
|
||||
for addr := range addrs {
|
||||
_, err := members.Create(lb.network, members.CreateOpts{
|
||||
PoolID: vip.PoolID,
|
||||
Address: addr,
|
||||
ProtocolPort: vip.ProtocolPort,
|
||||
}).Extract()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (lb *LbaasV1) EnsureLoadBalancerDeleted(service *api.Service) error {
|
||||
loadBalancerName := cloudprovider.GetLoadBalancerName(service)
|
||||
glog.V(4).Infof("EnsureLoadBalancerDeleted(%v)", loadBalancerName)
|
||||
|
||||
vip, err := getVipByName(lb.network, loadBalancerName)
|
||||
if err != nil && err != ErrNotFound {
|
||||
return err
|
||||
}
|
||||
|
||||
if lb.opts.FloatingNetworkId != "" && vip != nil {
|
||||
floatingIP, err := getFloatingIPByPortID(lb.network, vip.PortID)
|
||||
if err != nil && !isNotFound(err) {
|
||||
return err
|
||||
}
|
||||
if floatingIP != nil {
|
||||
err = floatingips.Delete(lb.network, floatingIP.ID).ExtractErr()
|
||||
if err != nil && !isNotFound(err) {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// We have to delete the VIP before the pool can be deleted,
|
||||
// so no point continuing if this fails.
|
||||
if vip != nil {
|
||||
err := vips.Delete(lb.network, vip.ID).ExtractErr()
|
||||
if err != nil && !isNotFound(err) {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
var pool *pools.Pool
|
||||
if vip != nil {
|
||||
pool, err = pools.Get(lb.network, vip.PoolID).Extract()
|
||||
if err != nil && !isNotFound(err) {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
// The VIP is gone, but it is conceivable that a Pool
|
||||
// still exists that we failed to delete on some
|
||||
// previous occasion. Make a best effort attempt to
|
||||
// cleanup any pools with the same name as the VIP.
|
||||
pool, err = getPoolByName(lb.network, service.Name)
|
||||
if err != nil && err != ErrNotFound {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if pool != nil {
|
||||
for _, monId := range pool.MonitorIDs {
|
||||
_, err = pools.DisassociateMonitor(lb.network, pool.ID, monId).Extract()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = monitors.Delete(lb.network, monId).ExtractErr()
|
||||
if err != nil && !isNotFound(err) {
|
||||
return err
|
||||
}
|
||||
}
|
||||
err = pools.Delete(lb.network, pool.ID).ExtractErr()
|
||||
if err != nil && !isNotFound(err) {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
|
@ -67,15 +67,15 @@ func TestReadConfig(t *testing.T) {
|
|||
}
|
||||
|
||||
cfg, err := readConfig(strings.NewReader(`
|
||||
[Global]
|
||||
auth-url = http://auth.url
|
||||
username = user
|
||||
[LoadBalancer]
|
||||
create-monitor = yes
|
||||
monitor-delay = 1m
|
||||
monitor-timeout = 30s
|
||||
monitor-max-retries = 3
|
||||
`))
|
||||
[Global]
|
||||
auth-url = http://auth.url
|
||||
username = user
|
||||
[LoadBalancer]
|
||||
create-monitor = yes
|
||||
monitor-delay = 1m
|
||||
monitor-timeout = 30s
|
||||
monitor-max-retries = 3
|
||||
`))
|
||||
if err != nil {
|
||||
t.Fatalf("Should succeed when a valid config is provided: %s", err)
|
||||
}
|
||||
|
@ -204,6 +204,8 @@ func TestLoadBalancer(t *testing.T) {
|
|||
t.Skipf("No config found in environment")
|
||||
}
|
||||
|
||||
cfg.LoadBalancer.LBVersion = "v2"
|
||||
|
||||
os, err := newOpenStack(cfg)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to construct/authenticate OpenStack: %s", err)
|
||||
|
@ -223,6 +225,32 @@ func TestLoadBalancer(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestLoadBalancerV2(t *testing.T) {
|
||||
cfg, ok := configFromEnv()
|
||||
if !ok {
|
||||
t.Skipf("No config found in environment")
|
||||
}
|
||||
cfg.LoadBalancer.LBVersion = "v2"
|
||||
|
||||
os, err := newOpenStack(cfg)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to construct/authenticate OpenStack: %s", err)
|
||||
}
|
||||
|
||||
lbaas, ok := os.LoadBalancer()
|
||||
if !ok {
|
||||
t.Fatalf("LoadBalancer() returned false - perhaps your stack doesn't support Neutron?")
|
||||
}
|
||||
|
||||
_, exists, err := lbaas.GetLoadBalancer(&api.Service{ObjectMeta: api.ObjectMeta{Name: "noexist"}})
|
||||
if err != nil {
|
||||
t.Fatalf("GetLoadBalancer(\"noexist\") returned error: %s", err)
|
||||
}
|
||||
if exists {
|
||||
t.Fatalf("GetLoadBalancer(\"noexist\") returned exists")
|
||||
}
|
||||
}
|
||||
|
||||
func TestZones(t *testing.T) {
|
||||
os := OpenStack{
|
||||
provider: &gophercloud.ProviderClient{
|
||||
|
|
Loading…
Reference in New Issue