diff --git a/cmd/kubestack/kubestack.go b/cmd/kubestack/kubestack.go index 79a76bd..779d73d 100644 --- a/cmd/kubestack/kubestack.go +++ b/cmd/kubestack/kubestack.go @@ -38,8 +38,6 @@ import ( cniSpecVersion "github.com/containernetworking/cni/pkg/version" "github.com/containernetworking/plugins/pkg/ns" "github.com/golang/glog" - "github.com/gophercloud/gophercloud/openstack/networking/v2/extensions/portsbinding" - "github.com/gophercloud/gophercloud/openstack/networking/v2/ports" // import plugins _ "git.openstack.org/openstack/stackube/pkg/kubestack/plugins/openvswitch" @@ -54,7 +52,7 @@ var ( // OpenStack describes openstack client and its plugins. type OpenStack struct { - Client openstack.Client + Client openstack.Interface Plugin plugins.PluginInterface } @@ -124,15 +122,15 @@ func initOpenstack(stdinData []byte) (OpenStack, string, error) { } os := OpenStack{ - Client: *openStackClient, + Client: openStackClient, } // Init plugin - pluginName := os.Client.PluginName + pluginName := os.Client.GetPluginName() if pluginName == "" { pluginName = "ovs" } - integrationBridge := os.Client.IntegrationBridge + integrationBridge := os.Client.GetIntegrationBridge() if integrationBridge == "" { integrationBridge = "br-int" } @@ -194,16 +192,11 @@ func cmdAdd(args *skel.CmdArgs) error { deviceOwner := fmt.Sprintf("compute:%s", getHostName()) if port.DeviceOwner != deviceOwner { - // Update hostname in order to make sure it is correct - updateOpts := portsbinding.UpdateOpts{ - HostID: getHostName(), - UpdateOptsBuilder: ports.UpdateOpts{ - DeviceOwner: deviceOwner, - }, - } - _, err = portsbinding.Update(os.Client.Network, port.ID, updateOpts).Extract() + err := os.Client.UpdatePortsBinding(port.ID, deviceOwner) if err != nil { - ports.Delete(os.Client.Network, port.ID) + if os.Client.DeletePortByID(port.ID) != nil { + glog.Warningf("Delete port %s failed", port.ID) + } glog.Errorf("Update port %s failed: %v", portName, err) return err } @@ -214,7 +207,7 @@ func cmdAdd(args *skel.CmdArgs) error { subnet, err := os.Client.GetProviderSubnet(port.FixedIPs[0].SubnetID) if err != nil { glog.Errorf("Get info of subnet %s failed: %v", port.FixedIPs[0].SubnetID, err) - if nil != ports.Delete(os.Client.Network, port.ID).ExtractErr() { + if os.Client.DeletePortByID(port.ID) != nil { glog.Warningf("Delete port %s failed", port.ID) } return err @@ -247,7 +240,7 @@ func cmdAdd(args *skel.CmdArgs) error { subnet.Gateway, args.IfName, netnsName) if err != nil { glog.Errorf("SetupInterface failed: %v", err) - if nil != ports.Delete(os.Client.Network, port.ID).ExtractErr() { + if os.Client.DeletePortByID(port.ID) != nil { glog.Warningf("Delete port %s failed", port.ID) } return err @@ -316,9 +309,9 @@ func cmdDel(args *skel.CmdArgs) error { } // Delete port from openstack - err = osClient.Client.DeletePort(portName) + err = osClient.Client.DeletePortByName(portName) if err != nil { - glog.Errorf("DeletePort %s failed: %v", portName, err) + glog.Errorf("Delete port %s failed: %v", portName, err) return err } diff --git a/cmd/stackube-controller/stackube-controller.go b/cmd/stackube-controller/stackube-controller.go index 78dfb17..f8f448c 100644 --- a/cmd/stackube-controller/stackube-controller.go +++ b/cmd/stackube-controller/stackube-controller.go @@ -48,7 +48,7 @@ var ( ) func startControllers(kubeClient *kubernetes.Clientset, - osClient *openstack.Client, kubeExtClient *extclientset.Clientset) error { + osClient openstack.Interface, kubeExtClient *extclientset.Clientset) error { // Creates a new Tenant controller tenantController, err := tenant.NewTenantController(kubeClient, osClient, kubeExtClient) if err != nil { @@ -62,7 +62,7 @@ func startControllers(kubeClient *kubernetes.Clientset, } // Creates a new RBAC controller - rbacController, err := rbacmanager.NewRBACController(kubeClient, osClient.CRDClient, *userCIDR, *userGateway) + rbacController, err := rbacmanager.NewRBACController(kubeClient, osClient.GetCRDClient(), *userCIDR, *userGateway) if err != nil { return err } @@ -104,7 +104,7 @@ func startControllers(kubeClient *kubernetes.Clientset, return nil } -func initClients() (*kubernetes.Clientset, *openstack.Client, *extclientset.Clientset, error) { +func initClients() (*kubernetes.Clientset, openstack.Interface, *extclientset.Clientset, error) { // Create kubernetes client config. Use kubeconfig if given, otherwise assume in-cluster. config, err := util.NewClusterConfig(*kubeconfig) if err != nil { diff --git a/pkg/auth-controller/tenant/tenant_controller.go b/pkg/auth-controller/tenant/tenant_controller.go index fede491..9602d00 100644 --- a/pkg/auth-controller/tenant/tenant_controller.go +++ b/pkg/auth-controller/tenant/tenant_controller.go @@ -38,12 +38,12 @@ import ( type TenantController struct { k8sClient *kubernetes.Clientset kubeCRDClient *crdClient.CRDClient - openstackClient *openstack.Client + openstackClient openstack.Interface } // NewTenantController creates a new tenant controller. func NewTenantController(kubeClient *kubernetes.Clientset, - osClient *openstack.Client, + osClient openstack.Interface, kubeExtClient *apiextensionsclient.Clientset) (*TenantController, error) { // initialize CRD if it does not exist _, err := crdClient.CreateTenantCRD(kubeExtClient) @@ -52,7 +52,7 @@ func NewTenantController(kubeClient *kubernetes.Clientset, } c := &TenantController{ - kubeCRDClient: osClient.CRDClient, + kubeCRDClient: osClient.GetCRDClient(), k8sClient: kubeClient, openstackClient: osClient, } diff --git a/pkg/network-controller/network_controller.go b/pkg/network-controller/network_controller.go index 5c220b4..85d2a4c 100644 --- a/pkg/network-controller/network_controller.go +++ b/pkg/network-controller/network_controller.go @@ -52,7 +52,7 @@ const ( type NetworkController struct { k8sclient *kubernetes.Clientset kubeCRDClient *kubecrd.CRDClient - driver *openstack.Client + driver openstack.Interface networkInformer cache.Controller } @@ -69,7 +69,7 @@ func (c *NetworkController) Run(stopCh <-chan struct{}) error { return nil } -func NewNetworkController(kubeClient *kubernetes.Clientset, osClient *openstack.Client, kubeExtClient *apiextensionsclient.Clientset) (*NetworkController, error) { +func NewNetworkController(kubeClient *kubernetes.Clientset, osClient openstack.Interface, kubeExtClient *apiextensionsclient.Clientset) (*NetworkController, error) { // initialize CRD if it does not exist _, err := kubecrd.CreateNetworkCRD(kubeExtClient) if err != nil && !apierrors.IsAlreadyExists(err) { @@ -77,13 +77,13 @@ func NewNetworkController(kubeClient *kubernetes.Clientset, osClient *openstack. } source := cache.NewListWatchFromClient( - osClient.CRDClient.Client, + osClient.GetCRDClient().Client, crv1.NetworkResourcePlural, apiv1.NamespaceAll, fields.Everything()) networkController := &NetworkController{ k8sclient: kubeClient, - kubeCRDClient: osClient.CRDClient, + kubeCRDClient: osClient.GetCRDClient(), driver: osClient, } _, networkInformer := cache.NewInformer( diff --git a/pkg/openstack/client.go b/pkg/openstack/client.go index 2f5e46f..05c2697 100644 --- a/pkg/openstack/client.go +++ b/pkg/openstack/client.go @@ -63,6 +63,56 @@ var ( ErrMultipleResults = errors.New("MultipleResults") ) +// Interface should be implemented by a openstack client. +type Interface interface { + // CreateTenant creates tenant by tenantname. + CreateTenant(tenantName string) (string, error) + // DeleteTenant deletes tenant by tenantName. + DeleteTenant(tenantName string) error + // GetTenantIDFromName gets tenantID by tenantName. + GetTenantIDFromName(tenantName string) (string, error) + // CheckTenantID checks tenant exist or not. + CheckTenantID(tenantID string) (bool, error) + // CreateUser creates user with username, password in the tenant. + CreateUser(username, password, tenantID string) error + // DeleteAllUsersOnTenant deletes all users on the tenant. + DeleteAllUsersOnTenant(tenantName string) error + // CreateNetwork creates network. + CreateNetwork(network *drivertypes.Network) error + // GetNetworkByID gets network by networkID. + GetNetworkByID(networkID string) (*drivertypes.Network, error) + // GetNetwork gets networks by networkName. + GetNetwork(networkName string) (*drivertypes.Network, error) + // DeleteNetwork deletes network by networkName. + DeleteNetwork(networkName string) error + // GetProviderSubnet gets provider subnet by id + GetProviderSubnet(osSubnetID string) (*drivertypes.Subnet, error) + // CreatePort creates port by neworkID, tenantID and portName. + CreatePort(networkID, tenantID, portName string) (*portsbinding.Port, error) + // GetPort gets port by portName. + GetPort(name string) (*ports.Port, error) + // ListPorts lists ports by networkID and deviceOwner. + ListPorts(networkID, deviceOwner string) ([]ports.Port, error) + // DeletePortByName deletes port by portName. + DeletePortByName(portName string) error + // DeletePortByID deletes port by portID. + DeletePortByID(portID string) error + // UpdatePortsBinding updates port binding. + UpdatePortsBinding(portID, deviceOwner string) error + // LoadBalancerExist returns whether a load balancer has already been exist. + LoadBalancerExist(name string) (bool, error) + // EnsureLoadBalancer ensures a load balancer is created. + EnsureLoadBalancer(lb *LoadBalancer) (*LoadBalancerStatus, error) + // EnsureLoadBalancerDeleted ensures a load balancer is deleted. + EnsureLoadBalancerDeleted(name string) error + // GetCRDClient returns the CRDClient. + GetCRDClient() *crdClient.CRDClient + // GetPluginName returns the plugin name. + GetPluginName() string + // GetIntegrationBridge returns the integration bridge name. + GetIntegrationBridge() string +} + type Client struct { Identity *gophercloud.ServiceClient Provider *gophercloud.ProviderClient @@ -101,7 +151,7 @@ func toAuthOptions(cfg Config) gophercloud.AuthOptions { } } -func NewClient(config string, kubeConfig string) (*Client, error) { +func NewClient(config string, kubeConfig string) (Interface, error) { var opts gophercloud.AuthOptions cfg, err := readConfig(config) if err != nil { @@ -170,7 +220,22 @@ func readConfig(config string) (Config, error) { return cfg, nil } -func (c *Client) GetTenantIDFromName(tenantName string) (string, error) { +// GetCRDClient returns the CRDClient. +func (os *Client) GetCRDClient() *crdClient.CRDClient { + return os.CRDClient +} + +// GetPluginName returns the plugin name. +func (os *Client) GetPluginName() string { + return os.PluginName +} + +// GetIntegrationBridge returns the integration bridge name. +func (os *Client) GetIntegrationBridge() string { + return os.IntegrationBridge +} + +func (os *Client) GetTenantIDFromName(tenantName string) (string, error) { if util.IsSystemNamespace(tenantName) { tenantName = util.SystemTenant } @@ -180,7 +245,7 @@ func (c *Client) GetTenantIDFromName(tenantName string) (string, error) { tenant *crv1.Tenant err error ) - if tenant, err = c.CRDClient.GetTenant(tenantName); err != nil { + if tenant, err = os.CRDClient.GetTenant(tenantName); err != nil { return "", err } if tenant.Spec.TenantID != "" { @@ -189,7 +254,7 @@ func (c *Client) GetTenantIDFromName(tenantName string) (string, error) { // Otherwise, fetch tenantID from OpenStack var tenantID string - err = tenants.List(c.Identity, nil).EachPage(func(page pagination.Page) (bool, error) { + err = tenants.List(os.Identity, nil).EachPage(func(page pagination.Page) (bool, error) { tenantList, err1 := tenants.ExtractTenants(page) if err1 != nil { return false, err1 @@ -211,35 +276,35 @@ func (c *Client) GetTenantIDFromName(tenantName string) (string, error) { return tenantID, nil } -func (c *Client) CreateTenant(tenantName string) (string, error) { +func (os *Client) CreateTenant(tenantName string) (string, error) { createOpts := tenants.CreateOpts{ Name: tenantName, Description: "stackube", Enabled: gophercloud.Enabled, } - _, err := tenants.Create(c.Identity, createOpts).Extract() + _, err := tenants.Create(os.Identity, createOpts).Extract() if err != nil && !IsAlreadyExists(err) { glog.Errorf("Failed to create tenant %s: %v", tenantName, err) return "", err } glog.V(4).Infof("Tenant %s created", tenantName) - tenantID, err := c.GetTenantIDFromName(tenantName) + tenantID, err := os.GetTenantIDFromName(tenantName) if err != nil { return "", err } return tenantID, nil } -func (c *Client) DeleteTenant(tenantName string) error { - return tenants.List(c.Identity, nil).EachPage(func(page pagination.Page) (bool, error) { +func (os *Client) DeleteTenant(tenantName string) error { + return tenants.List(os.Identity, nil).EachPage(func(page pagination.Page) (bool, error) { tenantList, err := tenants.ExtractTenants(page) if err != nil { return false, err } for _, t := range tenantList { if t.Name == tenantName { - err := tenants.Delete(c.Identity, t.ID).ExtractErr() + err := tenants.Delete(os.Identity, t.ID).ExtractErr() if err != nil { glog.Errorf("Delete openstack tenant %s error: %v", tenantName, err) return false, err @@ -252,14 +317,14 @@ func (c *Client) DeleteTenant(tenantName string) error { }) } -func (c *Client) CreateUser(username, password, tenantID string) error { +func (os *Client) CreateUser(username, password, tenantID string) error { opts := users.CreateOpts{ Name: username, TenantID: tenantID, Enabled: gophercloud.Enabled, Password: password, } - _, err := users.Create(c.Identity, opts).Extract() + _, err := users.Create(os.Identity, opts).Extract() if err != nil && !IsAlreadyExists(err) { glog.Errorf("Failed to create user %s: %v", username, err) return err @@ -268,18 +333,18 @@ func (c *Client) CreateUser(username, password, tenantID string) error { return nil } -func (c *Client) DeleteAllUsersOnTenant(tenantName string) error { - tenantID, err := c.GetTenantIDFromName(tenantName) +func (os *Client) DeleteAllUsersOnTenant(tenantName string) error { + tenantID, err := os.GetTenantIDFromName(tenantName) if err != nil { return nil } - return users.ListUsers(c.Identity, tenantID).EachPage(func(page pagination.Page) (bool, error) { + return users.ListUsers(os.Identity, tenantID).EachPage(func(page pagination.Page) (bool, error) { usersList, err := users.ExtractUsers(page) if err != nil { return false, err } for _, u := range usersList { - res := users.Delete(c.Identity, u.ID) + res := users.Delete(os.Identity, u.ID) if res.Err != nil { glog.Errorf("Delete openstack user %s error: %v", u.Name, err) return false, err @@ -824,8 +889,8 @@ func (os *Client) ListPorts(networkID, deviceOwner string) ([]ports.Port, error) return results, nil } -// Delete port by portName -func (os *Client) DeletePort(portName string) error { +// DeletePortByName deletes port by portName +func (os *Client) DeletePortByName(portName string) error { port, err := os.GetPort(portName) if err == util.ErrNotFound { glog.V(4).Infof("Port %s already deleted", portName) @@ -845,3 +910,27 @@ func (os *Client) DeletePort(portName string) error { return nil } + +// DeletePortByID deletes port by portID. +func (os *Client) DeletePortByID(portID string) error { + err := ports.Delete(os.Network, portID).ExtractErr() + if err != nil { + glog.Errorf("Delete openstack port portID %s failed: %v", portID, err) + return err + } + + return nil +} + +// UpdatePortsBinding updates port binding. +func (os *Client) UpdatePortsBinding(portID, deviceOwner string) error { + // Update hostname in order to make sure it is correct + updateOpts := portsbinding.UpdateOpts{ + HostID: getHostName(), + UpdateOptsBuilder: ports.UpdateOpts{ + DeviceOwner: deviceOwner, + }, + } + _, err := portsbinding.Update(os.Network, portID, updateOpts).Extract() + return err +} diff --git a/pkg/proxy/iptables.go b/pkg/proxy/iptables.go index 4431225..92de9aa 100644 --- a/pkg/proxy/iptables.go +++ b/pkg/proxy/iptables.go @@ -19,7 +19,6 @@ package proxy import ( "bytes" "fmt" - "os/exec" "github.com/golang/glog" utilexec "k8s.io/utils/exec" @@ -38,18 +37,35 @@ const ( opDeleteRule = "-D" ) +// iptablesInterface is an injectable interface for running iptables commands. +type iptablesInterface interface { + // ensureChain ensures chain STACKUBE-PREROUTING is created. + ensureChain() error + // ensureRule links STACKUBE-PREROUTING chain. + ensureRule(op, chain string, args []string) error + // restoreAll runs `iptables-restore` passing data through []byte. + restoreAll(data []byte) error + // netnsExist checks netns exist or not. + netnsExist() bool + // setNetns populates namespace of iptables. + setNetns(netns string) +} + type Iptables struct { exec utilexec.Interface namespace string } -func NewIptables(exec utilexec.Interface, namespace string) *Iptables { +func NewIptables(exec utilexec.Interface) iptablesInterface { return &Iptables{ - exec: exec, - namespace: namespace, + exec: exec, } } +func (r *Iptables) setNetns(netns string) { + r.namespace = netns +} + // runInNat executes iptables command in nat table. func (r *Iptables) runInNat(op, chain string, args []string) ([]byte, error) { fullArgs := []string{"netns", "exec", r.namespace, "iptables", "-t", TableNAT, op, chain} @@ -132,11 +148,11 @@ func writeLine(buf *bytes.Buffer, words ...string) { } } -func netnsExist(netns string) bool { - args := []string{"netns", "pids", netns} - out, err := exec.Command("ip", args...).CombinedOutput() +func (r *Iptables) netnsExist() bool { + args := []string{"netns", "pids", r.namespace} + out, err := r.exec.Command("ip", args...).CombinedOutput() if err != nil { - glog.V(5).Infof("Checking netns %q failed: %s: %v", netns, out, err) + glog.V(5).Infof("Checking netns %q failed: %s: %v", r.namespace, out, err) return false } diff --git a/pkg/proxy/iptables_test.go b/pkg/proxy/iptables_test.go index d7bf01b..c3cb210 100644 --- a/pkg/proxy/iptables_test.go +++ b/pkg/proxy/iptables_test.go @@ -43,7 +43,8 @@ func TestEnsureChain(t *testing.T) { func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, }, } - ipt := NewIptables(&fexec, "FOO") + ipt := NewIptables(&fexec) + ipt.setNetns("FOO") // Success. err := ipt.ensureChain() fmt.Println(err) @@ -80,7 +81,9 @@ func TestEnsureRuleAlreadyExists(t *testing.T) { func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, }, } - ipt := NewIptables(&fexec, "FOO") + ipt := NewIptables(&fexec) + ipt.setNetns("FOO") + err := ipt.ensureRule(opAddpendRule, ChainPrerouting, []string{"abc", "123"}) if err != nil { t.Errorf("expected success, got %v", err) @@ -108,7 +111,9 @@ func TestEnsureRuleNew(t *testing.T) { func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, }, } - ipt := NewIptables(&fexec, "FOO") + ipt := NewIptables(&fexec) + ipt.setNetns("FOO") + err := ipt.ensureRule(opAddpendRule, ChainPrerouting, []string{"abc", "123"}) if err != nil { t.Errorf("expected success, got %v", err) @@ -133,7 +138,9 @@ func TestEnsureRuleErrorChecking(t *testing.T) { func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, }, } - ipt := NewIptables(&fexec, "FOO") + ipt := NewIptables(&fexec) + ipt.setNetns("FOO") + err := ipt.ensureRule(opAddpendRule, ChainPrerouting, []string{"abc", "123"}) if err == nil { t.Errorf("expected failure") @@ -158,7 +165,9 @@ func TestEnsureRuleErrorCreating(t *testing.T) { func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, }, } - ipt := NewIptables(&fexec, "FOO") + ipt := NewIptables(&fexec) + ipt.setNetns("FOO") + err := ipt.ensureRule(opAddpendRule, ChainPrerouting, []string{"abc", "123"}) if err == nil { t.Errorf("expected failure") @@ -179,7 +188,9 @@ func TestRestoreAll(t *testing.T) { func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, }, } - ipt := NewIptables(&fexec, "FOO") + ipt := NewIptables(&fexec) + ipt.setNetns("FOO") + err := ipt.restoreAll([]byte{}) if err != nil { t.Errorf("expected success, got %v", err) @@ -193,3 +204,31 @@ func TestRestoreAll(t *testing.T) { t.Errorf("wrong CombinedOutput() log, got %s", fcmd.CombinedOutputLog[0]) } } + +func TestNetnsExist(t *testing.T) { + fcmd := fakeexec.FakeCmd{ + CombinedOutputScript: []fakeexec.FakeCombinedOutputAction{ + func() ([]byte, error) { return []byte{}, nil }, + }, + } + fexec := fakeexec.FakeExec{ + CommandScript: []fakeexec.FakeCommandAction{ + func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, + }, + } + ipt := NewIptables(&fexec) + ipt.setNetns("FOO") + + ok := ipt.netnsExist() + if !ok { + t.Errorf("expected true, got %v", ok) + } + + if fcmd.CombinedOutputCalls != 1 { + t.Errorf("expected 1 CombinedOutput() calls, got %d", fcmd.CombinedOutputCalls) + } + + if !sets.NewString(fcmd.CombinedOutputLog[0]...).HasAll("ip", "netns", "pids", "FOO") { + t.Errorf("wrong CombinedOutput() log, got %s", fcmd.CombinedOutputLog[0]) + } +} diff --git a/pkg/proxy/proxier.go b/pkg/proxy/proxier.go index 024c39f..ab7cc80 100644 --- a/pkg/proxy/proxier.go +++ b/pkg/proxy/proxier.go @@ -50,11 +50,13 @@ const ( burstSyncs = 2 ) +// Proxier is an iptables based proxy for connections between a localhost:port +// and services that provide the actual backends in each network. type Proxier struct { clusterDNS string - exec utilexec.Interface kubeClientset *kubernetes.Clientset - osClient *openstack.Client + osClient openstack.Interface + iptables iptablesInterface factory informers.SharedInformerFactory namespaceInformer informersV1.NamespaceInformer serviceInformer informersV1.ServiceInformer @@ -72,7 +74,7 @@ type Proxier struct { initialized int32 // endpointsSynced and servicesSynced are set to true when corresponding // objects are synced after startup. This is used to avoid updating iptables - // with some partial data after kube-proxy restart. + // with some partial data after stackube-proxy restart. endpointsSynced bool servicesSynced bool namespaceSynced bool @@ -109,13 +111,15 @@ func NewProxier(kubeConfig, openstackConfig string) (*Proxier, error) { } factory := informers.NewSharedInformerFactory(clientset, defaultResyncPeriod) + execer := utilexec.New() + proxier := &Proxier{ kubeClientset: clientset, osClient: osClient, + iptables: NewIptables(execer), factory: factory, clusterDNS: clusterDNS, - exec: execer, endpointsChanges: newEndpointsChangeMap(""), serviceChanges: newServiceChangeMap(), namespaceChanges: newNamespaceChangeMap(), @@ -129,17 +133,17 @@ func NewProxier(kubeConfig, openstackConfig string) (*Proxier, error) { return proxier, nil } -func (proxier *Proxier) setInitialized(value bool) { +func (p *Proxier) setInitialized(value bool) { var initialized int32 if value { initialized = 1 } - atomic.StoreInt32(&proxier.initialized, initialized) + atomic.StoreInt32(&p.initialized, initialized) } -func (proxier *Proxier) isInitialized() bool { - return atomic.LoadInt32(&proxier.initialized) > 0 +func (p *Proxier) isInitialized() bool { + return atomic.LoadInt32(&p.initialized) > 0 } func (p *Proxier) onServiceAdded(obj interface{}) { @@ -317,6 +321,7 @@ func (p *Proxier) onNamespaceDeleted(obj interface{}) { } } +// RegisterInformers registers informers which informer on namespaces,services and endpoints change. func (p *Proxier) RegisterInformers() { p.namespaceInformer = p.factory.Core().V1().Namespaces() p.namespaceInformer.Informer().AddEventHandlerWithResyncPeriod( @@ -343,6 +348,7 @@ func (p *Proxier) RegisterInformers() { }, time.Minute) } +// StartNamespaceInformer starts namespace informer. func (p *Proxier) StartNamespaceInformer(stopCh <-chan struct{}) error { if !cache.WaitForCacheSync(stopCh, p.namespaceInformer.Informer().HasSynced) { return fmt.Errorf("failed to cache namespaces") @@ -362,6 +368,7 @@ func (p *Proxier) StartNamespaceInformer(stopCh <-chan struct{}) error { return nil } +// StartServiceInformer starts service informer. func (p *Proxier) StartServiceInformer(stopCh <-chan struct{}) error { if !cache.WaitForCacheSync(stopCh, p.serviceInformer.Informer().HasSynced) { return fmt.Errorf("failed to cache services") @@ -381,6 +388,7 @@ func (p *Proxier) StartServiceInformer(stopCh <-chan struct{}) error { return nil } +// StartEndpointInformer starts endpoint informer. func (p *Proxier) StartEndpointInformer(stopCh <-chan struct{}) error { if !cache.WaitForCacheSync(stopCh, p.endpointInformer.Informer().HasSynced) { return fmt.Errorf("failed to cache endpoints") @@ -399,11 +407,13 @@ func (p *Proxier) StartEndpointInformer(stopCh <-chan struct{}) error { return nil } +// StartInformerFactory starts informer factory. func (p *Proxier) StartInformerFactory(stopCh <-chan struct{}) error { p.factory.Start(stopCh) return nil } +// SyncLoop runs periodic work. This is expected to run as a goroutine or as the main loop of the app. It does not return. func (p *Proxier) SyncLoop() error { p.syncRunner.Loop(wait.NeverStop) return nil @@ -517,20 +527,22 @@ func (p *Proxier) syncProxyRules() { // Step 3: compose iptables chain. netns := getRouterNetns(nsInfo.router) - if !netnsExist(netns) { + + // populates netns to iptables. + p.iptables.setNetns(netns) + if !p.iptables.netnsExist() { glog.V(3).Infof("Netns %q doesn't exist, omit the services in namespace %q", netns, namespace) continue } - ipt := NewIptables(p.exec, netns) // ensure chain STACKUBE-PREROUTING created. - err := ipt.ensureChain() + err := p.iptables.ensureChain() if err != nil { glog.Errorf("EnsureChain %q in netns %q failed: %v", ChainSKPrerouting, netns, err) continue } // link STACKUBE-PREROUTING chain. - err = ipt.ensureRule(opAddpendRule, ChainPrerouting, []string{ + err = p.iptables.ensureRule(opAddpendRule, ChainPrerouting, []string{ "-m", "comment", "--comment", "stackube service portals", "-j", ChainSKPrerouting, }) if err != nil { @@ -596,7 +608,7 @@ func (p *Proxier) syncProxyRules() { writeLine(iptablesData, []string{"COMMIT"}...) // Step 6: execute iptables-restore. - err = ipt.restoreAll(iptablesData.Bytes()) + err = p.iptables.restoreAll(iptablesData.Bytes()) if err != nil { glog.Errorf("Failed to execute iptables-restore: %v", err) continue diff --git a/pkg/service-controller/service_controller.go b/pkg/service-controller/service_controller.go index ce9a796..a364034 100644 --- a/pkg/service-controller/service_controller.go +++ b/pkg/service-controller/service_controller.go @@ -73,7 +73,7 @@ type ServiceController struct { cache *serviceCache kubeClientset *kubernetes.Clientset - osClient *openstack.Client + osClient openstack.Interface factory informers.SharedInformerFactory serviceInformer informersV1.ServiceInformer endpointInformer informersV1.EndpointsInformer @@ -85,7 +85,7 @@ type ServiceController struct { // NewServiceController returns a new service controller to keep openstack lbaas resources // (load balancers) in sync with the registry. func NewServiceController(kubeClient *kubernetes.Clientset, - osClient *openstack.Client) (*ServiceController, error) { + osClient openstack.Interface) (*ServiceController, error) { factory := informers.NewSharedInformerFactory(kubeClient, resyncPeriod) s := &ServiceController{ osClient: osClient,