swift/go/client/directclient.go

641 lines
21 KiB
Go

package client
import (
"encoding/json"
"fmt"
"io"
"math"
"net"
"net/http"
"net/url"
"strconv"
"strings"
"time"
"github.com/openstack/swift/go/hummingbird"
)
func map2Headers(m map[string]string) http.Header {
if m == nil {
return nil
}
headers := make(http.Header, len(m))
for k, v := range m {
headers.Set(k, v)
}
return headers
}
func headers2Map(headers http.Header) map[string]string {
if headers == nil {
return nil
}
m := make(map[string]string, len(headers))
for k := range headers {
m[k] = headers.Get(k)
}
return m
}
func mkquery(options map[string]string) string {
query := ""
for k, v := range options {
if v != "" {
query += url.QueryEscape(k) + "=" + url.QueryEscape(v) + "&"
}
}
if query != "" {
return "?" + strings.TrimRight(query, "&")
}
return ""
}
type ProxyDirectClient struct {
client *http.Client
AccountRing hummingbird.Ring
ContainerRing hummingbird.Ring
ObjectRing hummingbird.Ring
}
func (c *ProxyDirectClient) quorumResponse(reqs ...*http.Request) int {
// this is based on swift's best_response function.
statusCodes := make(chan int)
cancel := make(chan struct{})
defer close(cancel)
for _, req := range reqs {
go func(req *http.Request) {
status := 500
if resp, err := c.client.Do(req); err == nil {
status = resp.StatusCode
resp.Body.Close()
}
select {
case statusCodes <- status:
case <-cancel:
}
}(req)
}
quorum := int(math.Ceil(float64(len(reqs)) / 2.0))
responseClasses := []int{0, 0, 0, 0, 0, 0}
for status := range statusCodes {
class := status / 100
if class <= 5 {
responseClasses[class]++
if responseClasses[class] >= quorum {
return status
}
}
}
return 503
}
func (c *ProxyDirectClient) firstResponse(reqs ...*http.Request) (resp *http.Response) {
resps := make(chan *http.Response)
cancel := make(chan struct{})
defer close(cancel)
for _, req := range reqs {
req.Cancel = cancel
go func(req *http.Request) {
resp, err := c.client.Do(req)
if err != nil {
resp = nil
}
select {
case resps <- resp:
case <-cancel:
if resp != nil {
resp.Body.Close()
}
}
}(req)
select {
case <-time.After(time.Second):
case resp = <-resps:
if resp.StatusCode/100 == 2 {
return
}
}
}
return
}
var _ ProxyClient = &ProxyDirectClient{}
func (c *ProxyDirectClient) PutAccount(account string, headers http.Header) int {
partition := c.AccountRing.GetPartition(account, "", "")
reqs := make([]*http.Request, 0)
for _, device := range c.AccountRing.GetNodes(partition) {
url := fmt.Sprintf("http://%s:%d/%s/%d/%s", device.Ip, device.Port, device.Device, partition, hummingbird.Urlencode(account))
req, _ := http.NewRequest("PUT", url, nil)
for key := range headers {
req.Header.Set(key, headers.Get(key))
}
reqs = append(reqs, req)
}
return c.quorumResponse(reqs...)
}
func (c *ProxyDirectClient) PostAccount(account string, headers http.Header) int {
partition := c.AccountRing.GetPartition(account, "", "")
reqs := make([]*http.Request, 0)
for _, device := range c.AccountRing.GetNodes(partition) {
url := fmt.Sprintf("http://%s:%d/%s/%d/%s", device.Ip, device.Port, device.Device, partition, hummingbird.Urlencode(account))
req, _ := http.NewRequest("POST", url, nil)
for key := range headers {
req.Header.Set(key, headers.Get(key))
}
reqs = append(reqs, req)
}
return c.quorumResponse(reqs...)
}
func (c *ProxyDirectClient) GetAccount(account string, options map[string]string, headers http.Header) (io.ReadCloser, http.Header, int) {
partition := c.AccountRing.GetPartition(account, "", "")
reqs := make([]*http.Request, 0)
query := mkquery(options)
for _, device := range c.AccountRing.GetNodes(partition) {
url := fmt.Sprintf("http://%s:%d/%s/%d/%s%s", device.Ip, device.Port, device.Device, partition,
hummingbird.Urlencode(account), query)
req, _ := http.NewRequest("GET", url, nil)
for key := range headers {
req.Header.Set(key, headers.Get(key))
}
reqs = append(reqs, req)
}
resp := c.firstResponse(reqs...)
if resp == nil {
return nil, nil, 404
}
return resp.Body, resp.Header, resp.StatusCode
}
func (c *ProxyDirectClient) HeadAccount(account string, headers http.Header) (http.Header, int) {
partition := c.AccountRing.GetPartition(account, "", "")
reqs := make([]*http.Request, 0)
for _, device := range c.AccountRing.GetNodes(partition) {
url := fmt.Sprintf("http://%s:%d/%s/%d/%s", device.Ip, device.Port, device.Device, partition,
hummingbird.Urlencode(account))
req, err := http.NewRequest("HEAD", url, nil)
if err != nil {
continue
}
for key := range headers {
req.Header.Set(key, headers.Get(key))
}
reqs = append(reqs, req)
}
resp := c.firstResponse(reqs...)
if resp == nil {
return nil, 404
}
resp.Body.Close()
return resp.Header, resp.StatusCode
}
func (c *ProxyDirectClient) DeleteAccount(account string, headers http.Header) int {
partition := c.AccountRing.GetPartition(account, "", "")
reqs := make([]*http.Request, 0)
for _, device := range c.AccountRing.GetNodes(partition) {
url := fmt.Sprintf("http://%s:%d/%s/%d/%s", device.Ip, device.Port, device.Device, partition, hummingbird.Urlencode(account))
req, _ := http.NewRequest("DELETE", url, nil)
for key := range headers {
req.Header.Set(key, headers.Get(key))
}
reqs = append(reqs, req)
}
return c.quorumResponse(reqs...)
}
func (c *ProxyDirectClient) PutContainer(account string, container string, headers http.Header) int {
partition := c.ContainerRing.GetPartition(account, container, "")
accountPartition := c.AccountRing.GetPartition(account, "", "")
accountDevices := c.AccountRing.GetNodes(accountPartition)
reqs := make([]*http.Request, 0)
for i, device := range c.ContainerRing.GetNodes(partition) {
url := fmt.Sprintf("http://%s:%d/%s/%d/%s/%s", device.Ip, device.Port, device.Device, partition,
hummingbird.Urlencode(account), hummingbird.Urlencode(container))
req, _ := http.NewRequest("PUT", url, nil)
for key := range headers {
req.Header.Set(key, headers.Get(key))
}
req.Header.Set("X-Account-Partition", strconv.FormatUint(accountPartition, 10))
req.Header.Set("X-Account-Host", fmt.Sprintf("%s:%d", accountDevices[i].Ip, accountDevices[i].Port))
req.Header.Set("X-Account-Device", accountDevices[i].Device)
reqs = append(reqs, req)
}
return c.quorumResponse(reqs...)
}
func (c *ProxyDirectClient) PostContainer(account string, container string, headers http.Header) int {
partition := c.ContainerRing.GetPartition(account, container, "")
reqs := make([]*http.Request, 0)
for _, device := range c.ContainerRing.GetNodes(partition) {
url := fmt.Sprintf("http://%s:%d/%s/%d/%s/%s", device.Ip, device.Port, device.Device, partition,
hummingbird.Urlencode(account), hummingbird.Urlencode(container))
req, _ := http.NewRequest("POST", url, nil)
for key := range headers {
req.Header.Set(key, headers.Get(key))
}
reqs = append(reqs, req)
}
return c.quorumResponse(reqs...)
}
func (c *ProxyDirectClient) GetContainer(account string, container string, options map[string]string, headers http.Header) (io.ReadCloser, http.Header, int) {
partition := c.ContainerRing.GetPartition(account, container, "")
reqs := make([]*http.Request, 0)
query := mkquery(options)
for _, device := range c.ContainerRing.GetNodes(partition) {
url := fmt.Sprintf("http://%s:%d/%s/%d/%s/%s%s", device.Ip, device.Port, device.Device, partition,
hummingbird.Urlencode(account), hummingbird.Urlencode(container), query)
req, _ := http.NewRequest("GET", url, nil)
for key := range headers {
req.Header.Set(key, headers.Get(key))
}
reqs = append(reqs, req)
}
resp := c.firstResponse(reqs...)
if resp == nil {
return nil, nil, 404
}
return resp.Body, resp.Header, resp.StatusCode
}
func (c *ProxyDirectClient) HeadContainer(account string, container string, headers http.Header) (http.Header, int) {
partition := c.ObjectRing.GetPartition(account, container, "")
reqs := make([]*http.Request, 0)
for _, device := range c.ContainerRing.GetNodes(partition) {
url := fmt.Sprintf("http://%s:%d/%s/%d/%s/%s", device.Ip, device.Port, device.Device, partition,
hummingbird.Urlencode(account), hummingbird.Urlencode(container))
req, err := http.NewRequest("HEAD", url, nil)
if err != nil {
continue
}
for key := range headers {
req.Header.Set(key, headers.Get(key))
}
reqs = append(reqs, req)
}
resp := c.firstResponse(reqs...)
if resp == nil {
return nil, 404
}
resp.Body.Close()
return resp.Header, resp.StatusCode
}
func (c *ProxyDirectClient) DeleteContainer(account string, container string, headers http.Header) int {
partition := c.ContainerRing.GetPartition(account, container, "")
accountPartition := c.AccountRing.GetPartition(account, "", "")
accountDevices := c.AccountRing.GetNodes(accountPartition)
reqs := make([]*http.Request, 0)
for i, device := range c.ContainerRing.GetNodes(partition) {
url := fmt.Sprintf("http://%s:%d/%s/%d/%s/%s", device.Ip, device.Port, device.Device, partition,
hummingbird.Urlencode(account), hummingbird.Urlencode(container))
req, _ := http.NewRequest("DELETE", url, nil)
for key := range headers {
req.Header.Set(key, headers.Get(key))
}
req.Header.Set("X-Account-Partition", strconv.FormatUint(accountPartition, 10))
req.Header.Set("X-Account-Host", fmt.Sprintf("%s:%d", accountDevices[i].Ip, accountDevices[i].Port))
req.Header.Set("X-Account-Device", accountDevices[i].Device)
reqs = append(reqs, req)
}
return c.quorumResponse(reqs...)
}
func (c *ProxyDirectClient) PutObject(account string, container string, obj string, headers http.Header, src io.Reader) int {
partition := c.ObjectRing.GetPartition(account, container, obj)
containerPartition := c.ContainerRing.GetPartition(account, container, "")
containerDevices := c.ContainerRing.GetNodes(containerPartition)
var writers []*io.PipeWriter
reqs := make([]*http.Request, 0)
for i, device := range c.ObjectRing.GetNodes(partition) {
url := fmt.Sprintf("http://%s:%d/%s/%d/%s/%s/%s", device.Ip, device.Port, device.Device, partition,
hummingbird.Urlencode(account), hummingbird.Urlencode(container), hummingbird.Urlencode(obj))
rp, wp := io.Pipe()
defer wp.Close()
defer rp.Close()
req, err := http.NewRequest("PUT", url, rp)
if err != nil {
continue
}
writers = append(writers, wp)
for key := range headers {
req.Header.Set(key, headers.Get(key))
}
// req.ContentLength = request.ContentLength // TODO
req.Header.Set("Content-Type", "application/octet-stream")
req.Header.Set("X-Container-Partition", strconv.FormatUint(containerPartition, 10))
req.Header.Set("X-Container-Host", fmt.Sprintf("%s:%d", containerDevices[i].Ip, containerDevices[i].Port))
req.Header.Set("X-Container-Device", containerDevices[i].Device)
req.Header.Set("Expect", "100-Continue")
reqs = append(reqs, req)
}
go func() {
mw := io.MultiWriter(writers[0], writers[1], writers[2])
io.Copy(mw, src)
for _, writer := range writers {
writer.Close()
}
}()
return c.quorumResponse(reqs...)
}
func (c *ProxyDirectClient) PostObject(account string, container string, obj string, headers http.Header) int {
partition := c.ObjectRing.GetPartition(account, container, obj)
containerPartition := c.ContainerRing.GetPartition(account, container, "")
containerDevices := c.ContainerRing.GetNodes(containerPartition)
reqs := make([]*http.Request, 0)
for i, device := range c.ObjectRing.GetNodes(partition) {
url := fmt.Sprintf("http://%s:%d/%s/%d/%s/%s/%s", device.Ip, device.Port, device.Device, partition,
hummingbird.Urlencode(account), hummingbird.Urlencode(container), hummingbird.Urlencode(obj))
req, _ := http.NewRequest("POST", url, nil)
for key := range headers {
req.Header.Set(key, headers.Get(key))
}
req.Header.Set("Content-Type", "application/octet-stream")
req.Header.Set("X-Container-Partition", strconv.FormatUint(containerPartition, 10))
req.Header.Set("X-Container-Host", fmt.Sprintf("%s:%d", containerDevices[i].Ip, containerDevices[i].Port))
req.Header.Set("X-Container-Device", containerDevices[i].Device)
reqs = append(reqs, req)
}
return c.quorumResponse(reqs...)
}
func (c *ProxyDirectClient) GetObject(account string, container string, obj string, headers http.Header) (io.ReadCloser, http.Header, int) {
partition := c.ObjectRing.GetPartition(account, container, obj)
nodes := c.ObjectRing.GetNodes(partition)
reqs := make([]*http.Request, 0, len(nodes))
for _, device := range nodes {
url := fmt.Sprintf("http://%s:%d/%s/%d/%s/%s/%s", device.Ip, device.Port, device.Device, partition,
hummingbird.Urlencode(account), hummingbird.Urlencode(container), hummingbird.Urlencode(obj))
req, err := http.NewRequest("GET", url, nil)
if err != nil {
continue
}
for key := range headers {
req.Header.Set(key, headers.Get(key))
}
reqs = append(reqs, req)
}
resp := c.firstResponse(reqs...)
if resp == nil {
return nil, nil, 404
}
return resp.Body, resp.Header, resp.StatusCode
}
func (c *ProxyDirectClient) GrepObject(account string, container string, obj string, search string) (io.ReadCloser, http.Header, int) {
partition := c.ObjectRing.GetPartition(account, container, obj)
nodes := c.ObjectRing.GetNodes(partition)
reqs := make([]*http.Request, 0, len(nodes))
for _, device := range nodes {
url := fmt.Sprintf("http://%s:%d/%s/%d/%s/%s/%s?e=%s", device.Ip, device.Port, device.Device, partition,
hummingbird.Urlencode(account), hummingbird.Urlencode(container), hummingbird.Urlencode(obj), hummingbird.Urlencode(search))
req, err := http.NewRequest("GREP", url, nil)
if err != nil {
continue
}
reqs = append(reqs, req)
}
resp := c.firstResponse(reqs...)
if resp == nil {
return nil, nil, 404
}
return resp.Body, resp.Header, resp.StatusCode
}
func (c *ProxyDirectClient) HeadObject(account string, container string, obj string, headers http.Header) (http.Header, int) {
partition := c.ObjectRing.GetPartition(account, container, obj)
nodes := c.ObjectRing.GetNodes(partition)
reqs := make([]*http.Request, 0, len(nodes))
for _, device := range nodes {
url := fmt.Sprintf("http://%s:%d/%s/%d/%s/%s/%s", device.Ip, device.Port, device.Device, partition,
hummingbird.Urlencode(account), hummingbird.Urlencode(container), hummingbird.Urlencode(obj))
req, err := http.NewRequest("HEAD", url, nil)
if err != nil {
continue
}
for key := range headers {
req.Header.Set(key, headers.Get(key))
}
reqs = append(reqs, req)
}
resp := c.firstResponse(reqs...)
if resp == nil {
return nil, 404
}
resp.Body.Close()
return resp.Header, resp.StatusCode
}
func (c *ProxyDirectClient) DeleteObject(account string, container string, obj string, headers http.Header) int {
partition := c.ObjectRing.GetPartition(account, container, obj)
containerPartition := c.ContainerRing.GetPartition(account, container, "")
containerDevices := c.ContainerRing.GetNodes(containerPartition)
reqs := make([]*http.Request, 0)
for i, device := range c.ObjectRing.GetNodes(partition) {
url := fmt.Sprintf("http://%s:%d/%s/%d/%s/%s/%s", device.Ip, device.Port, device.Device, partition,
hummingbird.Urlencode(account), hummingbird.Urlencode(container), hummingbird.Urlencode(obj))
req, _ := http.NewRequest("DELETE", url, nil)
for key := range headers {
req.Header.Set(key, headers.Get(key))
}
req.Header.Set("Content-Type", "application/octet-stream")
req.Header.Set("X-Container-Partition", strconv.FormatUint(containerPartition, 10))
req.Header.Set("X-Container-Host", fmt.Sprintf("%s:%d", containerDevices[i].Ip, containerDevices[i].Port))
req.Header.Set("X-Container-Device", containerDevices[i].Device)
reqs = append(reqs, req)
}
return c.quorumResponse(reqs...)
}
func NewProxyDirectClient() (ProxyClient, error) {
c := &ProxyDirectClient{}
hashPathPrefix, hashPathSuffix, err := hummingbird.GetHashPrefixAndSuffix()
if err != nil {
return nil, err
}
c.ObjectRing, err = hummingbird.GetRing("object", hashPathPrefix, hashPathSuffix, 0)
if err != nil {
return nil, err
}
c.ContainerRing, err = hummingbird.GetRing("container", hashPathPrefix, hashPathSuffix, 0)
if err != nil {
return nil, err
}
c.AccountRing, err = hummingbird.GetRing("account", hashPathPrefix, hashPathSuffix, 0)
if err != nil {
return nil, err
}
c.client = &http.Client{
Transport: &http.Transport{
Dial: (&net.Dialer{
Timeout: 10 * time.Second,
KeepAlive: 5 * time.Second,
}).Dial,
},
Timeout: 120 * time.Minute,
}
return c, nil
}
type directClient struct {
*ProxyDirectClient
account string
}
var _ Client = &directClient{}
func (c *directClient) PutAccount(headers map[string]string) (err error) {
if code := c.ProxyDirectClient.PutAccount(c.account, map2Headers(headers)); code/100 != 2 {
return HTTPError(code)
}
return nil
}
func (c *directClient) PostAccount(headers map[string]string) (err error) {
if code := c.ProxyDirectClient.PostAccount(c.account, map2Headers(headers)); code/100 != 2 {
return HTTPError(code)
}
return nil
}
func (c *directClient) GetAccount(marker string, endMarker string, limit int, prefix string, delimiter string, headers map[string]string) ([]ContainerRecord, map[string]string, error) {
options := map[string]string{
"format": "json",
"marker": marker,
"end_marker": endMarker,
"prefix": prefix,
"delimiter": delimiter,
}
if limit != 0 {
options["limit"] = strconv.Itoa(limit)
}
r, h, code := c.ProxyDirectClient.GetAccount(c.account, options, map2Headers(headers))
if code != 200 {
return nil, nil, HTTPError(code)
}
var accountListing []ContainerRecord
decoder := json.NewDecoder(r)
decoder.Decode(&accountListing)
return accountListing, headers2Map(h), nil
}
func (c *directClient) HeadAccount(headers map[string]string) (map[string]string, error) {
h, code := c.ProxyDirectClient.HeadAccount(c.account, map2Headers(headers))
if code/100 != 2 {
return nil, HTTPError(code)
}
return headers2Map(h), nil
}
func (c *directClient) DeleteAccount(headers map[string]string) (err error) {
if code := c.ProxyDirectClient.DeleteAccount(c.account, map2Headers(headers)); code/100 != 2 {
return HTTPError(code)
}
return nil
}
func (c *directClient) PutContainer(container string, headers map[string]string) (err error) {
if code := c.ProxyDirectClient.PutContainer(c.account, container, map2Headers(headers)); code/100 != 2 {
return HTTPError(code)
}
return nil
}
func (c *directClient) PostContainer(container string, headers map[string]string) (err error) {
if code := c.ProxyDirectClient.PostContainer(c.account, container, map2Headers(headers)); code/100 != 2 {
return HTTPError(code)
}
return nil
}
func (c *directClient) GetContainer(container string, marker string, endMarker string, limit int, prefix string, delimiter string, headers map[string]string) ([]ObjectRecord, map[string]string, error) {
options := map[string]string{
"format": "json",
"marker": marker,
"end_marker": endMarker,
"prefix": prefix,
"delimiter": delimiter,
}
if limit != 0 {
options["limit"] = strconv.Itoa(limit)
}
r, h, code := c.ProxyDirectClient.GetContainer(c.account, container, options, map2Headers(headers))
if code != 200 {
return nil, nil, HTTPError(code)
}
defer r.Close()
var containerListing []ObjectRecord
decoder := json.NewDecoder(r)
decoder.Decode(&containerListing)
return containerListing, headers2Map(h), nil
}
func (c *directClient) HeadContainer(container string, headers map[string]string) (map[string]string, error) {
h, code := c.ProxyDirectClient.HeadContainer(c.account, container, map2Headers(headers))
if code/100 != 2 {
return nil, HTTPError(code)
}
return headers2Map(h), nil
}
func (c *directClient) DeleteContainer(container string, headers map[string]string) (err error) {
if code := c.ProxyDirectClient.DeleteContainer(c.account, container, map2Headers(headers)); code/100 != 2 {
return HTTPError(code)
}
return nil
}
func (c *directClient) PutObject(container string, obj string, headers map[string]string, src io.Reader) (err error) {
if code := c.ProxyDirectClient.PutObject(c.account, container, obj, map2Headers(headers), src); code/100 != 2 {
return HTTPError(code)
}
return nil
}
func (c *directClient) PostObject(container string, obj string, headers map[string]string) (err error) {
if code := c.ProxyDirectClient.PostObject(c.account, container, obj, map2Headers(headers)); code/100 != 2 {
return HTTPError(code)
}
return nil
}
func (c *directClient) GetObject(container string, obj string, headers map[string]string) (io.ReadCloser, map[string]string, error) {
r, h, code := c.ProxyDirectClient.GetObject(c.account, container, obj, map2Headers(headers))
if code/100 != 2 {
if r != nil {
r.Close()
}
return nil, nil, HTTPError(code)
}
return r, headers2Map(h), nil
}
func (c *directClient) HeadObject(container string, obj string, headers map[string]string) (map[string]string, error) {
h, code := c.ProxyDirectClient.HeadObject(c.account, container, obj, map2Headers(headers))
if code/100 != 2 {
return nil, HTTPError(code)
}
return headers2Map(h), nil
}
func (c *directClient) DeleteObject(container string, obj string, headers map[string]string) (err error) {
if code := c.ProxyDirectClient.DeleteObject(c.account, container, obj, map2Headers(headers)); code/100 != 2 {
return HTTPError(code)
}
return nil
}
// NewDirectClient creates a new direct client with the given account name.
func NewDirectClient(account string) (Client, error) {
rdc, err := NewProxyDirectClient()
if err != nil {
return nil, err
}
return &directClient{account: account, ProxyDirectClient: rdc.(*ProxyDirectClient)}, nil
}