Use interface

Update openstack client and iptables to use interface, so we
can fake openstack client and iptables for unit tests.

Change-Id: Id97eef2b9b36002b0c70a069a3718328d111ffd3
Implements: blueprint enhance-unit-testing
Signed-off-by: mozhuli <21621232@zju.edu.cn>
This commit is contained in:
mozhulee 2017-08-20 19:38:56 +08:00 committed by mozhuli
parent 5ffb3de826
commit 30669fdc5b
9 changed files with 225 additions and 76 deletions

View File

@ -38,8 +38,6 @@ import (
cniSpecVersion "github.com/containernetworking/cni/pkg/version"
"github.com/containernetworking/plugins/pkg/ns"
"github.com/golang/glog"
"github.com/gophercloud/gophercloud/openstack/networking/v2/extensions/portsbinding"
"github.com/gophercloud/gophercloud/openstack/networking/v2/ports"
// import plugins
_ "git.openstack.org/openstack/stackube/pkg/kubestack/plugins/openvswitch"
@ -54,7 +52,7 @@ var (
// OpenStack describes openstack client and its plugins.
type OpenStack struct {
Client openstack.Client
Client openstack.Interface
Plugin plugins.PluginInterface
}
@ -124,15 +122,15 @@ func initOpenstack(stdinData []byte) (OpenStack, string, error) {
}
os := OpenStack{
Client: *openStackClient,
Client: openStackClient,
}
// Init plugin
pluginName := os.Client.PluginName
pluginName := os.Client.GetPluginName()
if pluginName == "" {
pluginName = "ovs"
}
integrationBridge := os.Client.IntegrationBridge
integrationBridge := os.Client.GetIntegrationBridge()
if integrationBridge == "" {
integrationBridge = "br-int"
}
@ -194,16 +192,11 @@ func cmdAdd(args *skel.CmdArgs) error {
deviceOwner := fmt.Sprintf("compute:%s", getHostName())
if port.DeviceOwner != deviceOwner {
// Update hostname in order to make sure it is correct
updateOpts := portsbinding.UpdateOpts{
HostID: getHostName(),
UpdateOptsBuilder: ports.UpdateOpts{
DeviceOwner: deviceOwner,
},
}
_, err = portsbinding.Update(os.Client.Network, port.ID, updateOpts).Extract()
err := os.Client.UpdatePortsBinding(port.ID, deviceOwner)
if err != nil {
ports.Delete(os.Client.Network, port.ID)
if os.Client.DeletePortByID(port.ID) != nil {
glog.Warningf("Delete port %s failed", port.ID)
}
glog.Errorf("Update port %s failed: %v", portName, err)
return err
}
@ -214,7 +207,7 @@ func cmdAdd(args *skel.CmdArgs) error {
subnet, err := os.Client.GetProviderSubnet(port.FixedIPs[0].SubnetID)
if err != nil {
glog.Errorf("Get info of subnet %s failed: %v", port.FixedIPs[0].SubnetID, err)
if nil != ports.Delete(os.Client.Network, port.ID).ExtractErr() {
if os.Client.DeletePortByID(port.ID) != nil {
glog.Warningf("Delete port %s failed", port.ID)
}
return err
@ -247,7 +240,7 @@ func cmdAdd(args *skel.CmdArgs) error {
subnet.Gateway, args.IfName, netnsName)
if err != nil {
glog.Errorf("SetupInterface failed: %v", err)
if nil != ports.Delete(os.Client.Network, port.ID).ExtractErr() {
if os.Client.DeletePortByID(port.ID) != nil {
glog.Warningf("Delete port %s failed", port.ID)
}
return err
@ -316,9 +309,9 @@ func cmdDel(args *skel.CmdArgs) error {
}
// Delete port from openstack
err = osClient.Client.DeletePort(portName)
err = osClient.Client.DeletePortByName(portName)
if err != nil {
glog.Errorf("DeletePort %s failed: %v", portName, err)
glog.Errorf("Delete port %s failed: %v", portName, err)
return err
}

View File

@ -48,7 +48,7 @@ var (
)
func startControllers(kubeClient *kubernetes.Clientset,
osClient *openstack.Client, kubeExtClient *extclientset.Clientset) error {
osClient openstack.Interface, kubeExtClient *extclientset.Clientset) error {
// Creates a new Tenant controller
tenantController, err := tenant.NewTenantController(kubeClient, osClient, kubeExtClient)
if err != nil {
@ -62,7 +62,7 @@ func startControllers(kubeClient *kubernetes.Clientset,
}
// Creates a new RBAC controller
rbacController, err := rbacmanager.NewRBACController(kubeClient, osClient.CRDClient, *userCIDR, *userGateway)
rbacController, err := rbacmanager.NewRBACController(kubeClient, osClient.GetCRDClient(), *userCIDR, *userGateway)
if err != nil {
return err
}
@ -104,7 +104,7 @@ func startControllers(kubeClient *kubernetes.Clientset,
return nil
}
func initClients() (*kubernetes.Clientset, *openstack.Client, *extclientset.Clientset, error) {
func initClients() (*kubernetes.Clientset, openstack.Interface, *extclientset.Clientset, error) {
// Create kubernetes client config. Use kubeconfig if given, otherwise assume in-cluster.
config, err := util.NewClusterConfig(*kubeconfig)
if err != nil {

View File

@ -38,12 +38,12 @@ import (
type TenantController struct {
k8sClient *kubernetes.Clientset
kubeCRDClient *crdClient.CRDClient
openstackClient *openstack.Client
openstackClient openstack.Interface
}
// NewTenantController creates a new tenant controller.
func NewTenantController(kubeClient *kubernetes.Clientset,
osClient *openstack.Client,
osClient openstack.Interface,
kubeExtClient *apiextensionsclient.Clientset) (*TenantController, error) {
// initialize CRD if it does not exist
_, err := crdClient.CreateTenantCRD(kubeExtClient)
@ -52,7 +52,7 @@ func NewTenantController(kubeClient *kubernetes.Clientset,
}
c := &TenantController{
kubeCRDClient: osClient.CRDClient,
kubeCRDClient: osClient.GetCRDClient(),
k8sClient: kubeClient,
openstackClient: osClient,
}

View File

@ -52,7 +52,7 @@ const (
type NetworkController struct {
k8sclient *kubernetes.Clientset
kubeCRDClient *kubecrd.CRDClient
driver *openstack.Client
driver openstack.Interface
networkInformer cache.Controller
}
@ -69,7 +69,7 @@ func (c *NetworkController) Run(stopCh <-chan struct{}) error {
return nil
}
func NewNetworkController(kubeClient *kubernetes.Clientset, osClient *openstack.Client, kubeExtClient *apiextensionsclient.Clientset) (*NetworkController, error) {
func NewNetworkController(kubeClient *kubernetes.Clientset, osClient openstack.Interface, kubeExtClient *apiextensionsclient.Clientset) (*NetworkController, error) {
// initialize CRD if it does not exist
_, err := kubecrd.CreateNetworkCRD(kubeExtClient)
if err != nil && !apierrors.IsAlreadyExists(err) {
@ -77,13 +77,13 @@ func NewNetworkController(kubeClient *kubernetes.Clientset, osClient *openstack.
}
source := cache.NewListWatchFromClient(
osClient.CRDClient.Client,
osClient.GetCRDClient().Client,
crv1.NetworkResourcePlural,
apiv1.NamespaceAll,
fields.Everything())
networkController := &NetworkController{
k8sclient: kubeClient,
kubeCRDClient: osClient.CRDClient,
kubeCRDClient: osClient.GetCRDClient(),
driver: osClient,
}
_, networkInformer := cache.NewInformer(

View File

@ -63,6 +63,56 @@ var (
ErrMultipleResults = errors.New("MultipleResults")
)
// Interface should be implemented by a openstack client.
type Interface interface {
// CreateTenant creates tenant by tenantname.
CreateTenant(tenantName string) (string, error)
// DeleteTenant deletes tenant by tenantName.
DeleteTenant(tenantName string) error
// GetTenantIDFromName gets tenantID by tenantName.
GetTenantIDFromName(tenantName string) (string, error)
// CheckTenantID checks tenant exist or not.
CheckTenantID(tenantID string) (bool, error)
// CreateUser creates user with username, password in the tenant.
CreateUser(username, password, tenantID string) error
// DeleteAllUsersOnTenant deletes all users on the tenant.
DeleteAllUsersOnTenant(tenantName string) error
// CreateNetwork creates network.
CreateNetwork(network *drivertypes.Network) error
// GetNetworkByID gets network by networkID.
GetNetworkByID(networkID string) (*drivertypes.Network, error)
// GetNetwork gets networks by networkName.
GetNetwork(networkName string) (*drivertypes.Network, error)
// DeleteNetwork deletes network by networkName.
DeleteNetwork(networkName string) error
// GetProviderSubnet gets provider subnet by id
GetProviderSubnet(osSubnetID string) (*drivertypes.Subnet, error)
// CreatePort creates port by neworkID, tenantID and portName.
CreatePort(networkID, tenantID, portName string) (*portsbinding.Port, error)
// GetPort gets port by portName.
GetPort(name string) (*ports.Port, error)
// ListPorts lists ports by networkID and deviceOwner.
ListPorts(networkID, deviceOwner string) ([]ports.Port, error)
// DeletePortByName deletes port by portName.
DeletePortByName(portName string) error
// DeletePortByID deletes port by portID.
DeletePortByID(portID string) error
// UpdatePortsBinding updates port binding.
UpdatePortsBinding(portID, deviceOwner string) error
// LoadBalancerExist returns whether a load balancer has already been exist.
LoadBalancerExist(name string) (bool, error)
// EnsureLoadBalancer ensures a load balancer is created.
EnsureLoadBalancer(lb *LoadBalancer) (*LoadBalancerStatus, error)
// EnsureLoadBalancerDeleted ensures a load balancer is deleted.
EnsureLoadBalancerDeleted(name string) error
// GetCRDClient returns the CRDClient.
GetCRDClient() *crdClient.CRDClient
// GetPluginName returns the plugin name.
GetPluginName() string
// GetIntegrationBridge returns the integration bridge name.
GetIntegrationBridge() string
}
type Client struct {
Identity *gophercloud.ServiceClient
Provider *gophercloud.ProviderClient
@ -101,7 +151,7 @@ func toAuthOptions(cfg Config) gophercloud.AuthOptions {
}
}
func NewClient(config string, kubeConfig string) (*Client, error) {
func NewClient(config string, kubeConfig string) (Interface, error) {
var opts gophercloud.AuthOptions
cfg, err := readConfig(config)
if err != nil {
@ -170,7 +220,22 @@ func readConfig(config string) (Config, error) {
return cfg, nil
}
func (c *Client) GetTenantIDFromName(tenantName string) (string, error) {
// GetCRDClient returns the CRDClient.
func (os *Client) GetCRDClient() *crdClient.CRDClient {
return os.CRDClient
}
// GetPluginName returns the plugin name.
func (os *Client) GetPluginName() string {
return os.PluginName
}
// GetIntegrationBridge returns the integration bridge name.
func (os *Client) GetIntegrationBridge() string {
return os.IntegrationBridge
}
func (os *Client) GetTenantIDFromName(tenantName string) (string, error) {
if util.IsSystemNamespace(tenantName) {
tenantName = util.SystemTenant
}
@ -180,7 +245,7 @@ func (c *Client) GetTenantIDFromName(tenantName string) (string, error) {
tenant *crv1.Tenant
err error
)
if tenant, err = c.CRDClient.GetTenant(tenantName); err != nil {
if tenant, err = os.CRDClient.GetTenant(tenantName); err != nil {
return "", err
}
if tenant.Spec.TenantID != "" {
@ -189,7 +254,7 @@ func (c *Client) GetTenantIDFromName(tenantName string) (string, error) {
// Otherwise, fetch tenantID from OpenStack
var tenantID string
err = tenants.List(c.Identity, nil).EachPage(func(page pagination.Page) (bool, error) {
err = tenants.List(os.Identity, nil).EachPage(func(page pagination.Page) (bool, error) {
tenantList, err1 := tenants.ExtractTenants(page)
if err1 != nil {
return false, err1
@ -211,35 +276,35 @@ func (c *Client) GetTenantIDFromName(tenantName string) (string, error) {
return tenantID, nil
}
func (c *Client) CreateTenant(tenantName string) (string, error) {
func (os *Client) CreateTenant(tenantName string) (string, error) {
createOpts := tenants.CreateOpts{
Name: tenantName,
Description: "stackube",
Enabled: gophercloud.Enabled,
}
_, err := tenants.Create(c.Identity, createOpts).Extract()
_, err := tenants.Create(os.Identity, createOpts).Extract()
if err != nil && !IsAlreadyExists(err) {
glog.Errorf("Failed to create tenant %s: %v", tenantName, err)
return "", err
}
glog.V(4).Infof("Tenant %s created", tenantName)
tenantID, err := c.GetTenantIDFromName(tenantName)
tenantID, err := os.GetTenantIDFromName(tenantName)
if err != nil {
return "", err
}
return tenantID, nil
}
func (c *Client) DeleteTenant(tenantName string) error {
return tenants.List(c.Identity, nil).EachPage(func(page pagination.Page) (bool, error) {
func (os *Client) DeleteTenant(tenantName string) error {
return tenants.List(os.Identity, nil).EachPage(func(page pagination.Page) (bool, error) {
tenantList, err := tenants.ExtractTenants(page)
if err != nil {
return false, err
}
for _, t := range tenantList {
if t.Name == tenantName {
err := tenants.Delete(c.Identity, t.ID).ExtractErr()
err := tenants.Delete(os.Identity, t.ID).ExtractErr()
if err != nil {
glog.Errorf("Delete openstack tenant %s error: %v", tenantName, err)
return false, err
@ -252,14 +317,14 @@ func (c *Client) DeleteTenant(tenantName string) error {
})
}
func (c *Client) CreateUser(username, password, tenantID string) error {
func (os *Client) CreateUser(username, password, tenantID string) error {
opts := users.CreateOpts{
Name: username,
TenantID: tenantID,
Enabled: gophercloud.Enabled,
Password: password,
}
_, err := users.Create(c.Identity, opts).Extract()
_, err := users.Create(os.Identity, opts).Extract()
if err != nil && !IsAlreadyExists(err) {
glog.Errorf("Failed to create user %s: %v", username, err)
return err
@ -268,18 +333,18 @@ func (c *Client) CreateUser(username, password, tenantID string) error {
return nil
}
func (c *Client) DeleteAllUsersOnTenant(tenantName string) error {
tenantID, err := c.GetTenantIDFromName(tenantName)
func (os *Client) DeleteAllUsersOnTenant(tenantName string) error {
tenantID, err := os.GetTenantIDFromName(tenantName)
if err != nil {
return nil
}
return users.ListUsers(c.Identity, tenantID).EachPage(func(page pagination.Page) (bool, error) {
return users.ListUsers(os.Identity, tenantID).EachPage(func(page pagination.Page) (bool, error) {
usersList, err := users.ExtractUsers(page)
if err != nil {
return false, err
}
for _, u := range usersList {
res := users.Delete(c.Identity, u.ID)
res := users.Delete(os.Identity, u.ID)
if res.Err != nil {
glog.Errorf("Delete openstack user %s error: %v", u.Name, err)
return false, err
@ -824,8 +889,8 @@ func (os *Client) ListPorts(networkID, deviceOwner string) ([]ports.Port, error)
return results, nil
}
// Delete port by portName
func (os *Client) DeletePort(portName string) error {
// DeletePortByName deletes port by portName
func (os *Client) DeletePortByName(portName string) error {
port, err := os.GetPort(portName)
if err == util.ErrNotFound {
glog.V(4).Infof("Port %s already deleted", portName)
@ -845,3 +910,27 @@ func (os *Client) DeletePort(portName string) error {
return nil
}
// DeletePortByID deletes port by portID.
func (os *Client) DeletePortByID(portID string) error {
err := ports.Delete(os.Network, portID).ExtractErr()
if err != nil {
glog.Errorf("Delete openstack port portID %s failed: %v", portID, err)
return err
}
return nil
}
// UpdatePortsBinding updates port binding.
func (os *Client) UpdatePortsBinding(portID, deviceOwner string) error {
// Update hostname in order to make sure it is correct
updateOpts := portsbinding.UpdateOpts{
HostID: getHostName(),
UpdateOptsBuilder: ports.UpdateOpts{
DeviceOwner: deviceOwner,
},
}
_, err := portsbinding.Update(os.Network, portID, updateOpts).Extract()
return err
}

View File

@ -19,7 +19,6 @@ package proxy
import (
"bytes"
"fmt"
"os/exec"
"github.com/golang/glog"
utilexec "k8s.io/utils/exec"
@ -38,18 +37,35 @@ const (
opDeleteRule = "-D"
)
// iptablesInterface is an injectable interface for running iptables commands.
type iptablesInterface interface {
// ensureChain ensures chain STACKUBE-PREROUTING is created.
ensureChain() error
// ensureRule links STACKUBE-PREROUTING chain.
ensureRule(op, chain string, args []string) error
// restoreAll runs `iptables-restore` passing data through []byte.
restoreAll(data []byte) error
// netnsExist checks netns exist or not.
netnsExist() bool
// setNetns populates namespace of iptables.
setNetns(netns string)
}
type Iptables struct {
exec utilexec.Interface
namespace string
}
func NewIptables(exec utilexec.Interface, namespace string) *Iptables {
func NewIptables(exec utilexec.Interface) iptablesInterface {
return &Iptables{
exec: exec,
namespace: namespace,
exec: exec,
}
}
func (r *Iptables) setNetns(netns string) {
r.namespace = netns
}
// runInNat executes iptables command in nat table.
func (r *Iptables) runInNat(op, chain string, args []string) ([]byte, error) {
fullArgs := []string{"netns", "exec", r.namespace, "iptables", "-t", TableNAT, op, chain}
@ -132,11 +148,11 @@ func writeLine(buf *bytes.Buffer, words ...string) {
}
}
func netnsExist(netns string) bool {
args := []string{"netns", "pids", netns}
out, err := exec.Command("ip", args...).CombinedOutput()
func (r *Iptables) netnsExist() bool {
args := []string{"netns", "pids", r.namespace}
out, err := r.exec.Command("ip", args...).CombinedOutput()
if err != nil {
glog.V(5).Infof("Checking netns %q failed: %s: %v", netns, out, err)
glog.V(5).Infof("Checking netns %q failed: %s: %v", r.namespace, out, err)
return false
}

View File

@ -43,7 +43,8 @@ func TestEnsureChain(t *testing.T) {
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
},
}
ipt := NewIptables(&fexec, "FOO")
ipt := NewIptables(&fexec)
ipt.setNetns("FOO")
// Success.
err := ipt.ensureChain()
fmt.Println(err)
@ -80,7 +81,9 @@ func TestEnsureRuleAlreadyExists(t *testing.T) {
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
},
}
ipt := NewIptables(&fexec, "FOO")
ipt := NewIptables(&fexec)
ipt.setNetns("FOO")
err := ipt.ensureRule(opAddpendRule, ChainPrerouting, []string{"abc", "123"})
if err != nil {
t.Errorf("expected success, got %v", err)
@ -108,7 +111,9 @@ func TestEnsureRuleNew(t *testing.T) {
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
},
}
ipt := NewIptables(&fexec, "FOO")
ipt := NewIptables(&fexec)
ipt.setNetns("FOO")
err := ipt.ensureRule(opAddpendRule, ChainPrerouting, []string{"abc", "123"})
if err != nil {
t.Errorf("expected success, got %v", err)
@ -133,7 +138,9 @@ func TestEnsureRuleErrorChecking(t *testing.T) {
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
},
}
ipt := NewIptables(&fexec, "FOO")
ipt := NewIptables(&fexec)
ipt.setNetns("FOO")
err := ipt.ensureRule(opAddpendRule, ChainPrerouting, []string{"abc", "123"})
if err == nil {
t.Errorf("expected failure")
@ -158,7 +165,9 @@ func TestEnsureRuleErrorCreating(t *testing.T) {
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
},
}
ipt := NewIptables(&fexec, "FOO")
ipt := NewIptables(&fexec)
ipt.setNetns("FOO")
err := ipt.ensureRule(opAddpendRule, ChainPrerouting, []string{"abc", "123"})
if err == nil {
t.Errorf("expected failure")
@ -179,7 +188,9 @@ func TestRestoreAll(t *testing.T) {
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
},
}
ipt := NewIptables(&fexec, "FOO")
ipt := NewIptables(&fexec)
ipt.setNetns("FOO")
err := ipt.restoreAll([]byte{})
if err != nil {
t.Errorf("expected success, got %v", err)
@ -193,3 +204,31 @@ func TestRestoreAll(t *testing.T) {
t.Errorf("wrong CombinedOutput() log, got %s", fcmd.CombinedOutputLog[0])
}
}
func TestNetnsExist(t *testing.T) {
fcmd := fakeexec.FakeCmd{
CombinedOutputScript: []fakeexec.FakeCombinedOutputAction{
func() ([]byte, error) { return []byte{}, nil },
},
}
fexec := fakeexec.FakeExec{
CommandScript: []fakeexec.FakeCommandAction{
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
},
}
ipt := NewIptables(&fexec)
ipt.setNetns("FOO")
ok := ipt.netnsExist()
if !ok {
t.Errorf("expected true, got %v", ok)
}
if fcmd.CombinedOutputCalls != 1 {
t.Errorf("expected 1 CombinedOutput() calls, got %d", fcmd.CombinedOutputCalls)
}
if !sets.NewString(fcmd.CombinedOutputLog[0]...).HasAll("ip", "netns", "pids", "FOO") {
t.Errorf("wrong CombinedOutput() log, got %s", fcmd.CombinedOutputLog[0])
}
}

View File

@ -50,11 +50,13 @@ const (
burstSyncs = 2
)
// Proxier is an iptables based proxy for connections between a localhost:port
// and services that provide the actual backends in each network.
type Proxier struct {
clusterDNS string
exec utilexec.Interface
kubeClientset *kubernetes.Clientset
osClient *openstack.Client
osClient openstack.Interface
iptables iptablesInterface
factory informers.SharedInformerFactory
namespaceInformer informersV1.NamespaceInformer
serviceInformer informersV1.ServiceInformer
@ -72,7 +74,7 @@ type Proxier struct {
initialized int32
// endpointsSynced and servicesSynced are set to true when corresponding
// objects are synced after startup. This is used to avoid updating iptables
// with some partial data after kube-proxy restart.
// with some partial data after stackube-proxy restart.
endpointsSynced bool
servicesSynced bool
namespaceSynced bool
@ -109,13 +111,15 @@ func NewProxier(kubeConfig, openstackConfig string) (*Proxier, error) {
}
factory := informers.NewSharedInformerFactory(clientset, defaultResyncPeriod)
execer := utilexec.New()
proxier := &Proxier{
kubeClientset: clientset,
osClient: osClient,
iptables: NewIptables(execer),
factory: factory,
clusterDNS: clusterDNS,
exec: execer,
endpointsChanges: newEndpointsChangeMap(""),
serviceChanges: newServiceChangeMap(),
namespaceChanges: newNamespaceChangeMap(),
@ -129,17 +133,17 @@ func NewProxier(kubeConfig, openstackConfig string) (*Proxier, error) {
return proxier, nil
}
func (proxier *Proxier) setInitialized(value bool) {
func (p *Proxier) setInitialized(value bool) {
var initialized int32
if value {
initialized = 1
}
atomic.StoreInt32(&proxier.initialized, initialized)
atomic.StoreInt32(&p.initialized, initialized)
}
func (proxier *Proxier) isInitialized() bool {
return atomic.LoadInt32(&proxier.initialized) > 0
func (p *Proxier) isInitialized() bool {
return atomic.LoadInt32(&p.initialized) > 0
}
func (p *Proxier) onServiceAdded(obj interface{}) {
@ -317,6 +321,7 @@ func (p *Proxier) onNamespaceDeleted(obj interface{}) {
}
}
// RegisterInformers registers informers which informer on namespacesservices and endpoints change.
func (p *Proxier) RegisterInformers() {
p.namespaceInformer = p.factory.Core().V1().Namespaces()
p.namespaceInformer.Informer().AddEventHandlerWithResyncPeriod(
@ -343,6 +348,7 @@ func (p *Proxier) RegisterInformers() {
}, time.Minute)
}
// StartNamespaceInformer starts namespace informer.
func (p *Proxier) StartNamespaceInformer(stopCh <-chan struct{}) error {
if !cache.WaitForCacheSync(stopCh, p.namespaceInformer.Informer().HasSynced) {
return fmt.Errorf("failed to cache namespaces")
@ -362,6 +368,7 @@ func (p *Proxier) StartNamespaceInformer(stopCh <-chan struct{}) error {
return nil
}
// StartServiceInformer starts service informer.
func (p *Proxier) StartServiceInformer(stopCh <-chan struct{}) error {
if !cache.WaitForCacheSync(stopCh, p.serviceInformer.Informer().HasSynced) {
return fmt.Errorf("failed to cache services")
@ -381,6 +388,7 @@ func (p *Proxier) StartServiceInformer(stopCh <-chan struct{}) error {
return nil
}
// StartEndpointInformer starts endpoint informer.
func (p *Proxier) StartEndpointInformer(stopCh <-chan struct{}) error {
if !cache.WaitForCacheSync(stopCh, p.endpointInformer.Informer().HasSynced) {
return fmt.Errorf("failed to cache endpoints")
@ -399,11 +407,13 @@ func (p *Proxier) StartEndpointInformer(stopCh <-chan struct{}) error {
return nil
}
// StartInformerFactory starts informer factory.
func (p *Proxier) StartInformerFactory(stopCh <-chan struct{}) error {
p.factory.Start(stopCh)
return nil
}
// SyncLoop runs periodic work. This is expected to run as a goroutine or as the main loop of the app. It does not return.
func (p *Proxier) SyncLoop() error {
p.syncRunner.Loop(wait.NeverStop)
return nil
@ -517,20 +527,22 @@ func (p *Proxier) syncProxyRules() {
// Step 3: compose iptables chain.
netns := getRouterNetns(nsInfo.router)
if !netnsExist(netns) {
// populates netns to iptables.
p.iptables.setNetns(netns)
if !p.iptables.netnsExist() {
glog.V(3).Infof("Netns %q doesn't exist, omit the services in namespace %q", netns, namespace)
continue
}
ipt := NewIptables(p.exec, netns)
// ensure chain STACKUBE-PREROUTING created.
err := ipt.ensureChain()
err := p.iptables.ensureChain()
if err != nil {
glog.Errorf("EnsureChain %q in netns %q failed: %v", ChainSKPrerouting, netns, err)
continue
}
// link STACKUBE-PREROUTING chain.
err = ipt.ensureRule(opAddpendRule, ChainPrerouting, []string{
err = p.iptables.ensureRule(opAddpendRule, ChainPrerouting, []string{
"-m", "comment", "--comment", "stackube service portals", "-j", ChainSKPrerouting,
})
if err != nil {
@ -596,7 +608,7 @@ func (p *Proxier) syncProxyRules() {
writeLine(iptablesData, []string{"COMMIT"}...)
// Step 6: execute iptables-restore.
err = ipt.restoreAll(iptablesData.Bytes())
err = p.iptables.restoreAll(iptablesData.Bytes())
if err != nil {
glog.Errorf("Failed to execute iptables-restore: %v", err)
continue

View File

@ -73,7 +73,7 @@ type ServiceController struct {
cache *serviceCache
kubeClientset *kubernetes.Clientset
osClient *openstack.Client
osClient openstack.Interface
factory informers.SharedInformerFactory
serviceInformer informersV1.ServiceInformer
endpointInformer informersV1.EndpointsInformer
@ -85,7 +85,7 @@ type ServiceController struct {
// NewServiceController returns a new service controller to keep openstack lbaas resources
// (load balancers) in sync with the registry.
func NewServiceController(kubeClient *kubernetes.Clientset,
osClient *openstack.Client) (*ServiceController, error) {
osClient openstack.Interface) (*ServiceController, error) {
factory := informers.NewSharedInformerFactory(kubeClient, resyncPeriod)
s := &ServiceController{
osClient: osClient,