go: end-user client

Add an end-user client.  It maps very closely to swift functionality.
Also, rewrite "hummingbird bench" to use said aforementioned client.

Change-Id: I48b69254bae375defa16ec46c3c451f52f386bf9
This commit is contained in:
Michael Barton 2016-04-01 02:30:52 +00:00
parent fcd89b1ca2
commit 31c1ea783e
3 changed files with 505 additions and 119 deletions

View File

@ -42,7 +42,7 @@ func (obj *DirectObject) Put() bool {
req.Header.Set("X-Timestamp", hummingbird.GetTimestamp())
req.Header.Set("Content-Type", "application/octet-stream")
req.ContentLength = int64(len(obj.Data))
resp, err := client.Do(req)
resp, err := http.DefaultClient.Do(req)
if resp != nil {
resp.Body.Close()
}
@ -54,7 +54,7 @@ func (obj *DirectObject) Put() bool {
func (obj *DirectObject) Get() bool {
req, _ := http.NewRequest("GET", obj.Url, nil)
resp, err := client.Do(req)
resp, err := http.DefaultClient.Do(req)
if resp != nil {
io.Copy(ioutil.Discard, resp.Body)
}
@ -66,7 +66,7 @@ func (obj *DirectObject) Get() bool {
func (obj *DirectObject) Replicate() bool {
req, _ := http.NewRequest("REPLICATE", obj.Url, nil)
resp, err := client.Do(req)
resp, err := http.DefaultClient.Do(req)
if resp != nil {
io.Copy(ioutil.Discard, resp.Body)
}
@ -76,7 +76,7 @@ func (obj *DirectObject) Replicate() bool {
func (obj *DirectObject) Delete() bool {
req, _ := http.NewRequest("DELETE", obj.Url, nil)
req.Header.Set("X-Timestamp", hummingbird.GetTimestamp())
resp, err := client.Do(req)
resp, err := http.DefaultClient.Do(req)
if resp != nil {
resp.Body.Close()
}
@ -89,7 +89,7 @@ func (obj *DirectObject) Delete() bool {
func GetDevices(address string, checkMounted bool) []string {
deviceUrl := fmt.Sprintf("%srecon/diskusage", address)
req, err := http.NewRequest("GET", deviceUrl, nil)
resp, err := client.Do(req)
resp, err := http.DefaultClient.Do(req)
if err != nil {
fmt.Println(fmt.Sprintf("ERROR GETTING DEVICES: %s", err))
os.Exit(1)
@ -153,14 +153,15 @@ func RunDBench(args []string) {
}
data := make([]byte, objectSize)
objects := make([]DirectObject, numObjects)
objects := make([]*DirectObject, numObjects)
deviceParts := make(map[string]bool)
for i, _ := range objects {
device := strings.Trim(deviceList[i%len(deviceList)], " ")
part := rand.Int63()%numPartitions + minPartition
objects[i].Url = fmt.Sprintf("%s%s/%d/%s/%s/%d", address, device, part, "a", "c", rand.Int63())
objects[i].Data = data
objects[i] = &DirectObject{
Url: fmt.Sprintf("%s%s/%d/%s/%s/%d", address, device, part, "a", "c", rand.Int63()),
Data: data,
}
deviceParts[fmt.Sprintf("%s/%d", device, part)] = true
}
@ -198,5 +199,4 @@ func RunDBench(args []string) {
if doReplicates {
DoJobs("REPLICATE", replWork, concurrency)
}
}

View File

@ -17,132 +17,91 @@ package bench
import (
"bytes"
"crypto/tls"
"fmt"
"io"
"io/ioutil"
"math"
"math/rand"
"net/http"
"os"
"sort"
"strconv"
"sync"
"sync/atomic"
"time"
"github.com/openstack/swift/go/client"
"github.com/openstack/swift/go/hummingbird"
)
var client = &http.Client{}
var storageURL = ""
var authToken = ""
func Auth(endpoint string, user string, key string, allowInsecureAuthCert bool) (string, string) {
req, err := http.NewRequest("GET", endpoint, nil)
req.Header.Set("X-Auth-User", user)
req.Header.Set("X-Auth-Key", key)
authClient := &http.Client{}
if allowInsecureAuthCert {
tr := &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
}
authClient = &http.Client{Transport: tr}
}
resp, err := authClient.Do(req)
if err != nil {
fmt.Println("ERROR MAKING AUTH REQUEST", err)
os.Exit(1)
}
resp.Body.Close()
return resp.Header.Get("X-Storage-Url"), resp.Header.Get("X-Auth-Token")
}
func PutContainers(storageURL string, authToken string, count int, salt string) {
for i := 0; i < count; i++ {
url := fmt.Sprintf("%s/%d-%s", storageURL, i, salt)
req, _ := http.NewRequest("PUT", url, nil)
req.Header.Set("X-Auth-Token", authToken)
resp, err := client.Do(req)
if err != nil || resp.StatusCode/100 != 2 {
fmt.Println("ERROR CREATING CONTAINERS", resp.StatusCode)
os.Exit(1)
}
resp.Body.Close()
}
}
type Object struct {
Url string
Data []byte
Id int
State int
c client.Client
state int
container string
name string
data []byte
}
func (obj *Object) Put() bool {
req, _ := http.NewRequest("PUT", obj.Url, bytes.NewReader(obj.Data))
req.Header.Set("X-Auth-Token", authToken)
req.Header.Set("Content-Length", strconv.FormatInt(int64(len(obj.Data)), 10))
req.ContentLength = int64(len(obj.Data))
resp, err := client.Do(req)
if resp != nil {
resp.Body.Close()
}
return err == nil && resp.StatusCode/100 == 2
return obj.c.PutObject(obj.container, obj.name, nil, bytes.NewReader(obj.data)) == nil
}
func (obj *Object) Get() bool {
req, _ := http.NewRequest("GET", obj.Url, nil)
req.Header.Set("X-Auth-Token", authToken)
resp, err := client.Do(req)
if resp != nil {
io.Copy(ioutil.Discard, resp.Body)
if r, _, err := obj.c.GetObject(obj.container, obj.name, nil); err != nil {
return false
} else {
io.Copy(ioutil.Discard, r)
r.Close()
return true
}
return err == nil && resp.StatusCode/100 == 2
}
func (obj *Object) Delete() bool {
req, _ := http.NewRequest("DELETE", obj.Url, nil)
req.Header.Set("X-Auth-Token", authToken)
resp, err := client.Do(req)
if resp != nil {
resp.Body.Close()
}
return err == nil && resp.StatusCode/100 == 2
return obj.c.DeleteObject(obj.container, obj.name, nil) == nil
}
func DoJobs(name string, work []func() bool, concurrency int) {
wg := sync.WaitGroup{}
starterPistol := make(chan int)
jobId := int32(-1)
errors := 0
wg.Add(concurrency)
jobTimes := make([]float64, len(work))
cwg := sync.WaitGroup{}
errorCount := 0
jobTimes := make([]float64, 0, len(work))
times := make(chan float64)
errors := make(chan int)
jobqueue := make(chan func() bool)
cwg.Add(2)
go func() {
for n := range errors {
errorCount += n
}
cwg.Done()
}()
go func() {
for t := range times {
jobTimes = append(jobTimes, t)
}
sort.Float64s(jobTimes)
cwg.Done()
}()
for i := 0; i < concurrency; i++ {
wg.Add(1)
go func() {
_, _ = <-starterPistol
for {
job := int(atomic.AddInt32(&jobId, 1))
if job >= len(work) {
wg.Done()
return
}
for job := range jobqueue {
startJob := time.Now()
result := work[job]()
jobTimes[job] = float64(time.Now().Sub(startJob)) / float64(time.Second)
if !result {
errors += 1
if !job() {
errors <- 1
}
times <- float64(time.Now().Sub(startJob)) / float64(time.Second)
}
wg.Done()
}()
}
start := time.Now()
close(starterPistol)
for _, job := range work {
jobqueue <- job
}
close(jobqueue)
wg.Wait()
totalTime := float64(time.Now().Sub(start)) / float64(time.Second)
sort.Float64s(jobTimes)
close(errors)
close(times)
cwg.Wait()
sum := 0.0
for _, val := range jobTimes {
sum += val
@ -153,7 +112,7 @@ func DoJobs(name string, work []func() bool, concurrency int) {
diffsum += math.Pow(val-avg, 2.0)
}
fmt.Printf("%ss: %d @ %.2f/s\n", name, len(work), float64(len(work))/totalTime)
fmt.Println(" Failures:", errors)
fmt.Println(" Failures:", errorCount)
fmt.Printf(" Mean: %.5fs (%.1f%% RSD)\n", avg, math.Sqrt(diffsum/float64(len(work)))*100.0/avg)
fmt.Printf(" Median: %.5fs\n", jobTimes[int(float64(len(jobTimes))*0.5)])
fmt.Printf(" 85%%: %.5fs\n", jobTimes[int(float64(len(jobTimes))*0.85)])
@ -197,16 +156,34 @@ func RunBench(args []string) {
allowInsecureAuthCert := benchconf.GetBool("bench", "allow_insecure_auth_cert", false)
salt := fmt.Sprintf("%d", rand.Int63())
storageURL, authToken = Auth(authURL, authUser, authKey, allowInsecureAuthCert)
var cli client.Client
if allowInsecureAuthCert {
cli, err = client.NewInsecureClient("", authUser, "", authKey, "", authURL, false)
} else {
cli, err = client.NewClient("", authUser, "", authKey, "", authURL, false)
}
if err != nil {
fmt.Println("Error creating client:", err)
os.Exit(1)
}
PutContainers(storageURL, authToken, concurrency, salt)
for i := 0; i < concurrency; i++ {
if err := cli.PutContainer(fmt.Sprintf("%d-%s", i, salt), nil); err != nil {
fmt.Println("Error putting container:", err)
os.Exit(1)
}
}
data := make([]byte, objectSize)
objects := make([]Object, numObjects)
objects := make([]*Object, numObjects)
for i, _ := range objects {
objects[i].Url = fmt.Sprintf("%s/%d-%s/%d", storageURL, i%concurrency, salt, rand.Int63())
objects[i].Data = data
objects[i].Id = i
objects[i] = &Object{
state: 0,
container: fmt.Sprintf("%d-%s", i%concurrency, salt),
name: fmt.Sprintf("%x", rand.Int63()),
data: data,
c: cli,
}
}
work := make([]func() bool, len(objects))
@ -264,21 +241,36 @@ func RunThrash(args []string) {
numObjects := thrashconf.GetInt("thrash", "num_objects", 5000)
numGets := int(thrashconf.GetInt("thrash", "gets_per_object", 5))
allowInsecureAuthCert := thrashconf.GetBool("bench", "allow_insecure_auth_cert", false)
storageURL, authToken = Auth(authURL, authUser, authKey, allowInsecureAuthCert)
salt := fmt.Sprintf("%d", rand.Int63())
PutContainers(storageURL, authToken, concurrency, salt)
var cli client.Client
if allowInsecureAuthCert {
cli, err = client.NewInsecureClient("", authUser, "", authKey, "", authURL, false)
} else {
cli, err = client.NewClient("", authUser, "", authKey, "", authURL, false)
}
if err != nil {
fmt.Println("Error creating client:", err)
os.Exit(1)
}
for i := 0; i < concurrency; i++ {
if err := cli.PutContainer(fmt.Sprintf("%d-%s", i, salt), nil); err != nil {
fmt.Println("Error putting container:", err)
os.Exit(1)
}
}
data := make([]byte, objectSize)
objects := make([]*Object, numObjects)
for i, _ := range objects {
objects[i] = &Object{}
objects[i].Url = fmt.Sprintf("%s/%d-%s/%d", storageURL, i%concurrency, salt, rand.Int63())
objects[i].Data = data
objects[i].Id = i
objects[i].State = 1
objects[i] = &Object{
state: 0,
container: fmt.Sprintf("%d-%s", i%concurrency, salt),
name: fmt.Sprintf("%x", rand.Int63()),
data: data,
c: cli,
}
}
workch := make(chan func() bool)
@ -293,15 +285,20 @@ func RunThrash(args []string) {
for {
i := int(rand.Int63() % int64(len(objects)))
if objects[i].State == 1 {
if objects[i].state == 1 {
workch <- objects[i].Put
} else if objects[i].State < numGets+2 {
} else if objects[i].state < numGets+2 {
workch <- objects[i].Get
} else if objects[i].State >= numGets+2 {
} else if objects[i].state >= numGets+2 {
workch <- objects[i].Delete
objects[i] = &Object{Url: fmt.Sprintf("%s/%d-%s/%d", storageURL, i%concurrency, salt, rand.Int63()), Data: data, Id: i, State: 1}
objects[i] = &Object{
container: fmt.Sprintf("%d-%s", i%concurrency, salt),
name: fmt.Sprintf("%x", rand.Int63()),
data: data,
c: cli,
}
continue
}
objects[i].State += 1
objects[i].state += 1
}
}

389
go/client/userclient.go Normal file
View File

@ -0,0 +1,389 @@
package client
import (
"bytes"
"crypto/tls"
"encoding/json"
"errors"
"io"
"io/ioutil"
"net/http"
"strconv"
"strings"
"time"
)
// userClient is a Client to be used by end-users. It knows how to authenticate with auth v1 and v2.
type userClient struct {
client *http.Client
ServiceURL string
AuthToken string
tenant, username, password, apikey, region, authurl string
private bool
}
var _ Client = &userClient{}
func (c *userClient) authedRequest(method string, path string, body io.Reader, headers map[string]string) (*http.Request, error) {
req, err := http.NewRequest(method, c.ServiceURL+path, body)
if err != nil {
return nil, err
}
req.Header.Set("X-Auth-Token", c.AuthToken)
req.Header.Set("User-Agent", "Hummingbird Client")
for k, v := range headers {
req.Header.Set(k, v)
}
return req, nil
}
func (c *userClient) do(req *http.Request) (*http.Response, error) {
resp, err := c.client.Do(req)
if err != nil {
return nil, err
}
if resp.StatusCode == 401 {
resp.Body.Close()
if c.authenticate() != nil {
return nil, errors.New("Authentication failed.")
}
resp, err = c.client.Do(req)
if err != nil {
return nil, err
}
}
if resp.StatusCode/100 != 2 {
resp.Body.Close()
return nil, HTTPError(resp.StatusCode)
}
return resp, nil
}
func (c *userClient) doRequest(method string, path string, body io.Reader, headers map[string]string) error {
req, err := c.authedRequest(method, path, body, headers)
if err != nil {
return err
}
resp, err := c.do(req)
if err != nil {
return err
}
resp.Body.Close()
return nil
}
func (c *userClient) PutAccount(headers map[string]string) (err error) {
return c.doRequest("PUT", "", nil, headers)
}
func (c *userClient) PostAccount(headers map[string]string) (err error) {
return c.doRequest("POST", "", nil, headers)
}
func (c *userClient) GetAccount(marker string, endMarker string, limit int, prefix string, delimiter string, headers map[string]string) ([]ContainerRecord, map[string]string, error) {
limitStr := ""
if limit > 0 {
limitStr = strconv.Itoa(limit)
}
path := mkquery(map[string]string{"marker": marker, "end_marker": endMarker, "prefix": prefix, "delimiter": delimiter, "limit": limitStr})
req, err := c.authedRequest("GET", path, nil, headers)
if err != nil {
return nil, nil, err
}
req.Header.Set("Accept", "application/json")
resp, err := c.do(req)
if err != nil {
return nil, nil, err
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, nil, err
}
var accountListing []ContainerRecord
if err := json.Unmarshal(body, &accountListing); err != nil {
return nil, nil, err
}
return accountListing, headers2Map(resp.Header), nil
}
func (c *userClient) HeadAccount(headers map[string]string) (map[string]string, error) {
req, err := c.authedRequest("HEAD", "", nil, headers)
if err != nil {
return nil, err
}
resp, err := c.do(req)
if err != nil {
return nil, err
}
resp.Body.Close()
return headers2Map(resp.Header), nil
}
func (c *userClient) DeleteAccount(headers map[string]string) (err error) {
return c.doRequest("DELETE", "", nil, nil)
}
func (c *userClient) PutContainer(container string, headers map[string]string) (err error) {
return c.doRequest("PUT", "/"+container, nil, headers)
}
func (c *userClient) PostContainer(container string, headers map[string]string) (err error) {
return c.doRequest("POST", "/"+container, nil, headers)
}
func (c *userClient) GetContainer(container string, marker string, endMarker string, limit int, prefix string, delimiter string, headers map[string]string) ([]ObjectRecord, map[string]string, error) {
limitStr := ""
if limit > 0 {
limitStr = strconv.Itoa(limit)
}
path := "/" + container + mkquery(map[string]string{"marker": marker, "end_marker": endMarker, "prefix": prefix, "delimiter": delimiter, "limit": limitStr})
req, err := c.authedRequest("GET", path, nil, headers)
if err != nil {
return nil, nil, err
}
req.Header.Set("Accept", "application/json")
resp, err := c.do(req)
if err != nil {
return nil, nil, err
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, nil, err
}
var containerListing []ObjectRecord
if err := json.Unmarshal(body, &containerListing); err != nil {
return nil, nil, err
}
return containerListing, headers2Map(resp.Header), nil
}
func (c *userClient) HeadContainer(container string, headers map[string]string) (map[string]string, error) {
req, err := c.authedRequest("HEAD", "/"+container, nil, headers)
if err != nil {
return nil, err
}
resp, err := c.do(req)
if err != nil {
return nil, err
}
resp.Body.Close()
return headers2Map(resp.Header), nil
}
func (c *userClient) DeleteContainer(container string, headers map[string]string) (err error) {
return c.doRequest("DELETE", "/"+container, nil, headers)
}
func (c *userClient) PutObject(container string, obj string, headers map[string]string, src io.Reader) (err error) {
return c.doRequest("PUT", "/"+container+"/"+obj, src, headers)
}
func (c *userClient) PostObject(container string, obj string, headers map[string]string) (err error) {
return c.doRequest("POST", "/"+container+"/"+obj, nil, headers)
}
func (c *userClient) GetObject(container string, obj string, headers map[string]string) (io.ReadCloser, map[string]string, error) {
req, err := c.authedRequest("GET", "/"+container+"/"+obj, nil, headers)
if err != nil {
return nil, nil, err
}
resp, err := c.do(req)
if err != nil {
return nil, nil, err
}
return resp.Body, headers2Map(resp.Header), nil
}
func (c *userClient) HeadObject(container string, obj string, headers map[string]string) (map[string]string, error) {
req, err := c.authedRequest("HEAD", "/"+container+"/"+obj, nil, headers)
if err != nil {
return nil, err
}
resp, err := c.do(req)
if err != nil {
return nil, err
}
resp.Body.Close()
return headers2Map(resp.Header), nil
}
func (c *userClient) DeleteObject(container string, obj string, headers map[string]string) (err error) {
return c.doRequest("DELETE", "/"+container+"/"+obj, nil, headers)
}
func (c *userClient) authenticatev1() error {
req, err := http.NewRequest("GET", c.authurl, nil)
req.Header.Set("X-Auth-User", c.username)
req.Header.Set("X-Auth-Key", c.apikey)
if err != nil {
return err
}
resp, err := c.client.Do(req)
if err != nil {
return err
}
resp.Body.Close()
if resp.StatusCode/100 != 2 {
return HTTPError(resp.StatusCode)
}
c.ServiceURL = resp.Header.Get("X-Storage-Url")
c.AuthToken = resp.Header.Get("X-Auth-Token")
if c.ServiceURL != "" && c.AuthToken != "" {
return nil
}
return errors.New("Failed to authenticate.")
}
type KeystoneRequestV2 struct {
Auth interface{} `json:"auth"`
}
type KeystonePasswordAuthV2 struct {
TenantName string `json:"tenantName"`
PasswordCredentials struct {
Username string `json:"username"`
Password string `json:"password"`
} `json:"passwordCredentials"`
}
type RaxAPIKeyAuthV2 struct {
APIKeyCredentials struct {
Username string `json:"username"`
APIKey string `json:"apiKey"`
} `json:"RAX-KSKEY:apiKeyCredentials"`
}
type KeystoneResponseV2 struct {
Access struct {
Token struct {
ID string `json:"id"`
Tenant struct {
Name string `json:"name"`
ID string `json:"id"`
} `json:"tenant"`
} `json:"token"`
ServiceCatalog []struct {
Endpoints []struct {
PublicURL string `json:"publicURL"`
InternalURL string `json:"internalURL"`
Region string `json:"region"`
} `json:"endpoints"`
Type string `json:"type"`
} `json:"serviceCatalog"`
User struct {
RaxDefaultRegion string `json:"RAX-AUTH:defaultRegion"`
} `json:"user"`
} `json:"access"`
}
func (c *userClient) authenticatev2() (err error) {
if !strings.HasSuffix(c.authurl, "tokens") {
if c.authurl[len(c.authurl)-1] == '/' {
c.authurl = c.authurl + "tokens"
} else {
c.authurl = c.authurl + "/tokens"
}
}
var authReq []byte
if c.password != "" {
creds := &KeystonePasswordAuthV2{TenantName: c.tenant}
creds.PasswordCredentials.Username = c.username
creds.PasswordCredentials.Password = c.password
authReq, err = json.Marshal(&KeystoneRequestV2{Auth: creds})
} else if c.apikey != "" {
creds := &RaxAPIKeyAuthV2{}
creds.APIKeyCredentials.Username = c.username
creds.APIKeyCredentials.APIKey = c.apikey
authReq, err = json.Marshal(&KeystoneRequestV2{Auth: creds})
} else {
return errors.New("Couldn't figure out what credentials to use.")
}
if err != nil {
return err
}
resp, err := c.client.Post(c.authurl, "application/json", bytes.NewBuffer(authReq))
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode/100 != 2 {
return HTTPError(resp.StatusCode)
}
var authResponse KeystoneResponseV2
if body, err := ioutil.ReadAll(resp.Body); err != nil {
return err
} else if err = json.Unmarshal(body, &authResponse); err != nil {
return err
}
c.AuthToken = authResponse.Access.Token.ID
region := c.region
if region == "" {
region = authResponse.Access.User.RaxDefaultRegion
}
for _, s := range authResponse.Access.ServiceCatalog {
if s.Type == "object-store" {
for _, e := range s.Endpoints {
if e.Region == region || region == "" || len(s.Endpoints) == 1 {
if c.private {
c.ServiceURL = e.InternalURL
} else {
c.ServiceURL = e.PublicURL
}
return nil
}
}
}
}
return errors.New("Didn't find endpoint")
}
func (c *userClient) authenticate() error {
if strings.Contains(c.authurl, "/v2") {
return c.authenticatev2()
} else {
return c.authenticatev1()
}
}
// NewClient creates a new end-user client. It authenticates immediately, and returns an error if unable to.
func NewClient(tenant string, username string, password string, apikey string, region string, authurl string, private bool) (Client, error) {
c := &userClient{
client: &http.Client{Timeout: 30 * time.Minute},
tenant: tenant,
username: username,
password: password,
apikey: apikey,
region: region,
authurl: authurl,
private: private,
}
if err := c.authenticate(); err != nil {
return nil, err
}
return c, nil
}
// NewInsecureClient creates a new end-user client with SSL verification turned off. It authenticates immediately, and returns an error if unable to.
func NewInsecureClient(tenant string, username string, password string, apikey string, region string, authurl string, private bool) (Client, error) {
c := &userClient{
client: &http.Client{
Timeout: 30 * time.Minute,
Transport: &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
},
},
tenant: tenant,
username: username,
password: password,
apikey: apikey,
region: region,
authurl: authurl,
private: private,
}
if err := c.authenticate(); err != nil {
return nil, err
}
return c, nil
}