go: storage policies

This implements storage policies (as best I understand them).
It turned into a pretty big patch and changes a bunch of stuff.

There's a lot of plumbing of policies into functions that choose an
"objects" directory.

The replicator and auditor also handle any "replication" policies.
Those may get abstracted later on so you can register auditor and
replicator equivalents for non-replication object engines.

Request routing can take policies into account.  This is so that
something like "REPLICATE" calls can be routed to the correct object
engine, since they're instantiated per-policy.

They're instantiated per-policy so they can get the configs for their
policy, which swift doesn't really do, but which I think could be useful.

Change-Id: Ia90664f9b178a29e1061bb03d0dd442ba6636c73
This commit is contained in:
Michael Barton 2016-07-02 04:57:15 +00:00
parent ffe0901240
commit 257b0d8041
24 changed files with 658 additions and 222 deletions

View File

@ -472,15 +472,15 @@ func NewProxyDirectClient() (ProxyClient, error) {
if err != nil {
return nil, err
}
c.ObjectRing, err = hummingbird.GetRing("object", hashPathPrefix, hashPathSuffix)
c.ObjectRing, err = hummingbird.GetRing("object", hashPathPrefix, hashPathSuffix, 0)
if err != nil {
return nil, err
}
c.ContainerRing, err = hummingbird.GetRing("container", hashPathPrefix, hashPathSuffix)
c.ContainerRing, err = hummingbird.GetRing("container", hashPathPrefix, hashPathSuffix, 0)
if err != nil {
return nil, err
}
c.AccountRing, err = hummingbird.GetRing("account", hashPathPrefix, hashPathSuffix)
c.AccountRing, err = hummingbird.GetRing("account", hashPathPrefix, hashPathSuffix, 0)
if err != nil {
return nil, err
}

86
go/hummingbird/policy.go Normal file
View File

@ -0,0 +1,86 @@
// Copyright (c) 2015 Rackspace
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package hummingbird
import (
"fmt"
"strings"
)
// Policy holds the settings of a single storage policy, as parsed from a
// [storage-policy:N] section of the swift config.
type Policy struct {
	Index      int               // policy index, taken from the section name
	Type       string            // object engine type, e.g. "replication"
	Name       string            // display name for the policy
	Aliases    []string          // alternate names for the policy
	Default    bool              // true if this is the cluster's default policy
	Deprecated bool              // true if the policy is deprecated
	Config     map[string]string // raw key/value pairs from the config section
}

// PolicyList maps policy indexes to their Policy definitions.
type PolicyList map[int]*Policy
// LoadPolicies loads policies, probably from /etc/swift/swift.conf
func normalLoadPolicies() PolicyList {
	// Policy 0 always exists, even when no config file defines it.
	policies := map[int]*Policy{0: {
		Index:      0,
		Type:       "replication",
		Name:       "Policy-0",
		Aliases:    nil,
		Default:    false,
		Deprecated: false,
	}}
	for _, loc := range configLocations {
		conf, e := LoadConfig(loc)
		if e != nil {
			continue
		}
		for key := range conf.File {
			var policyIndex int
			c, err := fmt.Sscanf(key, "storage-policy:%d", &policyIndex)
			if err != nil || c != 1 {
				continue
			}
			aliases := []string{}
			for _, alias := range strings.Split(conf.GetDefault(key, "aliases", ""), ",") {
				if alias = strings.Trim(alias, " "); alias != "" {
					aliases = append(aliases, alias)
				}
			}
			policies[policyIndex] = &Policy{
				Index:      policyIndex,
				Type:       conf.GetDefault(key, "policy_type", "replication"),
				Name:       conf.GetDefault(key, "name", fmt.Sprintf("Policy-%d", policyIndex)),
				Aliases:    aliases,
				Deprecated: conf.GetBool(key, "deprecated", false),
				Default:    conf.GetBool(key, "default", false),
				Config:     map[string]string(conf.File[key]),
			}
		}
		// Only the first config that loads successfully is consulted.
		break
	}
	// If no configured policy claimed the default, policy 0 gets it.
	defaultFound := false
	for _, policy := range policies {
		defaultFound = defaultFound || policy.Default
	}
	if !defaultFound {
		policies[0].Default = true
	}
	return PolicyList(policies)
}

type loadPoliciesFunc func() PolicyList

// LoadPolicies is a package-level indirection so tests can substitute their
// own policy loader.
var LoadPolicies loadPoliciesFunc = normalLoadPolicies

View File

@ -0,0 +1,63 @@
// Copyright (c) 2015 Rackspace
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package hummingbird
import (
"io/ioutil"
"os"
"testing"
"github.com/stretchr/testify/require"
)
// TestLoadPolicy verifies that policies defined in a swift.conf-style file are
// parsed with their names, aliases, default and deprecated flags.
func TestLoadPolicy(t *testing.T) {
	tempFile, _ := ioutil.TempFile("", "INI")
	tempFile.Write([]byte("[swift-hash]\nswift_hash_path_prefix = changeme\nswift_hash_path_suffix = changeme\n" +
		"[storage-policy:0]\nname = gold\naliases = yellow, orange\npolicy_type = replication\ndefault = yes\n" +
		"[storage-policy:1]\nname = silver\npolicy_type = replication\ndeprecated = yes\n"))
	oldConfigs := configLocations
	defer func() {
		// restore global config locations and clean up the temp file
		configLocations = oldConfigs
		os.Remove(tempFile.Name())
		tempFile.Close()
	}()
	configLocations = []string{tempFile.Name()}
	policies := LoadPolicies()
	gold := policies[0]
	require.Equal(t, gold.Name, "gold")
	require.Equal(t, gold.Default, true)
	require.Equal(t, gold.Deprecated, false)
	require.Equal(t, gold.Aliases, []string{"yellow", "orange"})
	silver := policies[1]
	require.Equal(t, silver.Name, "silver")
	require.Equal(t, silver.Deprecated, true)
	require.Equal(t, silver.Default, false)
	require.Equal(t, silver.Aliases, []string{})
}
// TestNoPolicies verifies that a config with no [storage-policy:N] sections
// still yields a default policy 0 named "Policy-0".
func TestNoPolicies(t *testing.T) {
	tempFile, _ := ioutil.TempFile("", "INI")
	tempFile.Write([]byte("[swift-hash]\nswift_hash_path_prefix = changeme\nswift_hash_path_suffix = changeme\n"))
	oldConfigs := configLocations
	defer func() {
		// restore global config locations and clean up the temp file
		configLocations = oldConfigs
		os.Remove(tempFile.Name())
		tempFile.Close()
	}()
	configLocations = []string{tempFile.Name()}
	defaultPolicy := LoadPolicies()[0]
	require.Equal(t, defaultPolicy.Name, "Policy-0")
	require.Equal(t, defaultPolicy.Default, true)
	require.Equal(t, defaultPolicy.Deprecated, false)
}

View File

@ -324,12 +324,16 @@ func LoadRing(path string, prefix string, suffix string) (Ring, error) {
// GetRing returns the current ring given the ringType ("account", "container", "object"),
// hash path prefix, hash path suffix, and policy index. An error is raised if the
// requested ring does not exist.
func GetRing(ring_type, prefix, suffix string) (Ring, error) {
func GetRing(ringType, prefix, suffix string, policy int) (Ring, error) {
var ring Ring
var err error
if ring, err = LoadRing(fmt.Sprintf("/etc/hummingbird/%s.ring.gz", ring_type), prefix, suffix); err != nil {
if ring, err = LoadRing(fmt.Sprintf("/etc/swift/%s.ring.gz", ring_type), prefix, suffix); err != nil {
return nil, fmt.Errorf("Error loading %s ring", ring_type)
ringFile := fmt.Sprintf("%s.ring.gz", ringType)
if policy != 0 {
ringFile = fmt.Sprintf("%s-%d.ring.gz", ringType, policy)
}
if ring, err = LoadRing(fmt.Sprintf("/etc/hummingbird/%s", ringFile), prefix, suffix); err != nil {
if ring, err = LoadRing(fmt.Sprintf("/etc/swift/%s", ringFile), prefix, suffix); err != nil {
return nil, fmt.Errorf("Error loading %s:%d ring", ringType, policy)
}
}
return ring, nil

View File

@ -17,6 +17,7 @@ package hummingbird
import (
"net/http"
"strconv"
"strings"
)
@ -33,6 +34,7 @@ type matcher struct {
catchallName string
vars []variable
static []variable
policy int
handler http.Handler
}
@ -42,6 +44,8 @@ type router struct {
MethodNotAllowedHandler http.Handler
}
// anyPolicy is the wildcard policy index used by Handle: matchers registered
// with it match requests regardless of their X-Backend-Storage-Policy-Index.
const anyPolicy = -1
// Split a string in twain on sep. Doing it this way over strings.Split*() saves allocating a slice.
func Split2(path string, sep string) (string, string) {
if nextSlash := strings.Index(path, sep); nextSlash == -1 {
@ -52,13 +56,16 @@ func Split2(path string, sep string) (string, string) {
}
// Given a method and path, return the handler and vars that should be used to serve them.
func (r *router) route(method, path string) (http.Handler, map[string]string) {
func (r *router) route(method, path string, policy int) (http.Handler, map[string]string) {
methodFound := false
path = path[1:]
slashCount := strings.Count(path, "/")
// This code is slightly gnarly because it avoids allocating anything until it's sure of a match.
NEXTMATCH:
for _, m := range r.matchers {
if m.policy != anyPolicy && m.policy != policy {
continue
}
if m.method != method {
continue
}
@ -113,10 +120,10 @@ NEXTMATCH:
return r.NotFoundHandler, nil
}
// Register a handler for the given method and pattern.
// HandlePolicy registers a handler for the given method, pattern, and policy header.
// The pattern is pretty much what you're used to, i.e. /static/:variable/*catchall
func (r *router) Handle(method, pattern string, handler http.Handler) {
m := &matcher{method: method}
func (r *router) HandlePolicy(method, pattern string, policy int, handler http.Handler) {
m := &matcher{method: method, policy: policy}
parts := strings.Split(pattern[1:], "/")
m.length = len(parts)
if pattern[len(pattern)-1] == '/' {
@ -136,6 +143,12 @@ func (r *router) Handle(method, pattern string, handler http.Handler) {
r.matchers = append(r.matchers, m)
}
// Handle registers a handler for the given method and pattern, matching any policy.
// The pattern is pretty much what you're used to, i.e. /static/:variable/*catchall
func (r *router) Handle(method, pattern string, handler http.Handler) {
	r.HandlePolicy(method, pattern, anyPolicy, handler)
}
// Get registers a handler for GET requests on the given path, for any policy.
func (r *router) Get(path string, handler http.Handler) {
	r.Handle("GET", path, handler)
}
@ -165,7 +178,11 @@ func (r *router) Post(path string, handler http.Handler) {
}
// ServeHTTP routes the request to a registered handler, taking the request's
// X-Backend-Storage-Policy-Index header into account. A missing or unparsable
// header is treated as policy 0.
func (r *router) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
	policyHeader := request.Header.Get("X-Backend-Storage-Policy-Index")
	policy, err := strconv.Atoi(policyHeader)
	if err != nil {
		policy = 0
	}
	handler, vars := r.route(request.Method, request.URL.Path, policy)
	SetVars(request, vars)
	handler.ServeHTTP(writer, request)
}

View File

@ -19,7 +19,6 @@ import (
"net/url"
"testing"
"github.com/dimfeld/httptreemux"
"github.com/stretchr/testify/assert"
)
@ -79,6 +78,9 @@ func TestRouterDisambiguation(t *testing.T) {
addRoute("REPLICATE", "/:device/:partition", "REPLICATE")
addRoute("SYNC", "/:device/*relpath", "SYNC")
addRoute("GET", "/info", "INFO")
router.HandlePolicy("GET", "/reconstruct", 10, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
handledBy = "POLICY_VERB_WORKED"
}))
router.NotFoundHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
handledBy = "NOT_FOUND"
vars = nil
@ -158,6 +160,16 @@ func TestRouterDisambiguation(t *testing.T) {
makeRequest("GET", "/something/bad")
assert.Equal(t, "NOT_FOUND", handledBy)
assert.Nil(t, vars)
req, _ := http.NewRequest("GET", "/reconstruct", nil)
req.URL, _ = url.Parse("/reconstruct")
req.RequestURI = "/reconstruct"
router.ServeHTTP(mockResponseWriter{}, req)
assert.Equal(t, "NOT_FOUND", handledBy)
req.Header.Set("X-Backend-Storage-Policy-Index", "10")
router.ServeHTTP(mockResponseWriter{}, req)
assert.Equal(t, "POLICY_VERB_WORKED", handledBy)
}
func BenchmarkRouteObject(b *testing.B) {
@ -182,27 +194,3 @@ func BenchmarkRouteObject(b *testing.B) {
router.ServeHTTP(w, r)
}
}
// BenchmarkRouteObjectTree benchmarks routing a typical object GET through
// httptreemux, for comparison against this package's router benchmark above.
func BenchmarkRouteObjectTree(b *testing.B) {
	router := httptreemux.New()
	// calls SetVars() to be fair. I mean, we could just pass the vars down too.
	objGet := func(w http.ResponseWriter, r *http.Request, ps map[string]string) { SetVars(r, ps) }
	router.GET("/healthcheck", objGet)
	router.GET("/diskusage", objGet)
	router.GET("/recon/:method", objGet)
	router.HEAD("/:device/:partition/:account/:container/*obj", objGet)
	router.PUT("/:device/:partition/:account/:container/*obj", objGet)
	router.DELETE("/:device/:partition/:account/:container/*obj", objGet)
	router.Handle("REPLICATE", "/:device/:partition/:suffixes", objGet)
	router.Handle("REPLICATE", "/:device/:partition", objGet)
	router.Handle("SYNC", "/:device/*relpath", objGet)
	router.GET("/:device/:partition/:account/:container/*obj", objGet)
	r := newRequest("GET", "/sda/123/acc/cont/a/bunch/of/stuff")
	w := mockResponseWriter{}
	b.ReportAllocs()
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		router.ServeHTTP(w, r)
	}
}

View File

@ -197,22 +197,27 @@ func (a *Auditor) auditDevice(devPath string) {
return
}
objPath := filepath.Join(devPath, "objects")
partitions, err := hummingbird.ReadDirNames(objPath)
if err != nil {
a.errors++
a.totalErrors++
a.LogError("Error reading objects dir %s", objPath)
return
}
for _, partition := range partitions {
_, intErr := strconv.ParseInt(partition, 10, 64)
partitionDir := filepath.Join(objPath, partition)
if finfo, err := os.Stat(partitionDir); err != nil || intErr != nil || !finfo.Mode().IsDir() {
a.LogError("Skipping invalid file in objects directory: %s", partitionDir)
for _, policy := range hummingbird.LoadPolicies() {
if policy.Type != "replication" {
continue
}
a.auditPartition(partitionDir)
objPath := filepath.Join(devPath, PolicyDir(policy.Index))
partitions, err := hummingbird.ReadDirNames(objPath)
if err != nil {
a.errors++
a.totalErrors++
a.LogError("Error reading objects dir: %s", objPath)
continue
}
for _, partition := range partitions {
_, intErr := strconv.ParseInt(partition, 10, 64)
partitionDir := filepath.Join(objPath, partition)
if finfo, err := os.Stat(partitionDir); err != nil || intErr != nil || !finfo.Mode().IsDir() {
a.LogError("Skipping invalid file in objects directory: %s", partitionDir)
continue
}
a.auditPartition(partitionDir)
}
}
}

View File

@ -49,6 +49,24 @@ type AtomicFileWriter interface {
Preallocate(int64, int64) error
}
// PolicyDir returns the name of the objects directory for a policy index:
// "objects" for policy 0, "objects-N" for any other policy N.
func PolicyDir(policy int) string {
	if policy != 0 {
		return fmt.Sprintf("objects-%d", policy)
	}
	return "objects"
}
// UnPolicyDir is the inverse of PolicyDir: it parses the policy index out of
// an objects directory name. "objects" is policy 0 and "objects-N" (N > 0) is
// policy N. Names that PolicyDir could not have produced are rejected with an
// error.
func UnPolicyDir(dir string) (int, error) {
	if dir == "objects" {
		return 0, nil
	}
	var policy int
	n, err := fmt.Sscanf(dir, "objects-%d", &policy)
	// Sscanf ignores unconsumed input, so "objects-1x" or "objects-007" would
	// otherwise parse "successfully"; round-trip the index back through the
	// directory-name format to reject anything that isn't canonical.
	if n == 1 && err == nil && policy > 0 && fmt.Sprintf("objects-%d", policy) == dir {
		return policy, nil
	}
	return 0, fmt.Errorf("Unable to parse policy from dir")
}
func RawReadMetadata(fileNameOrFd interface{}) ([]byte, error) {
var pickledMetadata []byte
offset := 0
@ -130,14 +148,14 @@ func WriteMetadata(fd uintptr, v map[string]string) error {
func QuarantineHash(hashDir string) error {
// FYI- this does not invalidate the hash like swift's version. Please
// do that yourself
hash := filepath.Base(hashDir)
// drive objects partition suffix hash
driveDir := filepath.Dir(filepath.Dir(filepath.Dir(filepath.Dir(hashDir))))
// TODO: this will need to be slightly more complicated once policies
quarantineDir := filepath.Join(driveDir, "quarantined", "objects")
// objects partition suffix hash
objsDir := filepath.Dir(filepath.Dir(filepath.Dir(hashDir)))
driveDir := filepath.Dir(objsDir)
quarantineDir := filepath.Join(driveDir, "quarantined", filepath.Base(objsDir))
if err := os.MkdirAll(quarantineDir, 0755); err != nil {
return err
}
hash := filepath.Base(hashDir)
destDir := filepath.Join(quarantineDir, hash+"-"+hummingbird.UUID())
if err := os.Rename(hashDir, destDir); err != nil {
return err
@ -251,8 +269,8 @@ func RecalculateSuffixHash(suffixDir string, reclaimAge int64) (string, *humming
return hex.EncodeToString(h.Sum(nil)), nil
}
func GetHashes(driveRoot string, device string, partition string, recalculate []string, reclaimAge int64, logger hummingbird.LoggingContext) (map[string]string, *hummingbird.BackendError) {
partitionDir := filepath.Join(driveRoot, device, "objects", partition)
func GetHashes(driveRoot string, device string, partition string, recalculate []string, reclaimAge int64, policy int, logger hummingbird.LoggingContext) (map[string]string, *hummingbird.BackendError) {
partitionDir := filepath.Join(driveRoot, device, PolicyDir(policy), partition)
pklFile := filepath.Join(partitionDir, "hashes.pkl")
invalidFile := filepath.Join(partitionDir, "hashes.invalid")
@ -307,7 +325,7 @@ func GetHashes(driveRoot string, device string, partition string, recalculate []
for suffix, hash := range hashes {
if hash == "" {
modified = true
suffixDir := driveRoot + "/" + device + "/objects/" + partition + "/" + suffix
suffixDir := filepath.Join(partitionDir, suffix)
recalc_hash, err := RecalculateSuffixHash(suffixDir, reclaimAge)
if err == nil {
hashes[suffix] = recalc_hash
@ -340,18 +358,18 @@ func GetHashes(driveRoot string, device string, partition string, recalculate []
}
logger.LogError("Made recursive call to GetHashes: %s", partitionDir)
partitionLock.Close()
return GetHashes(driveRoot, device, partition, recalculate, reclaimAge, logger)
return GetHashes(driveRoot, device, partition, recalculate, reclaimAge, policy, logger)
}
}
return hashes, nil
}
func ObjHashDir(vars map[string]string, driveRoot string, hashPathPrefix string, hashPathSuffix string) string {
func ObjHashDir(vars map[string]string, driveRoot string, hashPathPrefix string, hashPathSuffix string, policy int) string {
h := md5.New()
io.WriteString(h, hashPathPrefix+"/"+vars["account"]+"/"+vars["container"]+"/"+vars["obj"]+hashPathSuffix)
hexHash := hex.EncodeToString(h.Sum(nil))
suffix := hexHash[29:32]
return filepath.Join(driveRoot, vars["device"], "objects", vars["partition"], suffix, hexHash)
return filepath.Join(driveRoot, vars["device"], PolicyDir(policy), vars["partition"], suffix, hexHash)
}
func ObjectFiles(directory string) (string, string) {

View File

@ -26,6 +26,7 @@ import (
"github.com/openstack/swift/go/hummingbird"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestWriteReadMetadata(t *testing.T) {
@ -62,7 +63,7 @@ func TestGetHashes(t *testing.T) {
f, _ = os.Create(filepath.Join(driveRoot, "sda", "objects", "1", "abc", "00000000000000000000000000000abc", "67890.data"))
f.Close()
hashes, err := GetHashes(driveRoot, "sda", "1", nil, int64(hummingbird.ONE_WEEK), nil)
hashes, err := GetHashes(driveRoot, "sda", "1", nil, int64(hummingbird.ONE_WEEK), 0, nil)
assert.Nil(t, err)
assert.Equal(t, "b1589029b7db9d01347caece2159d588", hashes["abc"])
@ -71,12 +72,12 @@ func TestGetHashes(t *testing.T) {
f.Close()
// make sure hash for "abc" isn't recalculated yet.
hashes, err = GetHashes(driveRoot, "sda", "1", nil, int64(hummingbird.ONE_WEEK), nil)
hashes, err = GetHashes(driveRoot, "sda", "1", nil, int64(hummingbird.ONE_WEEK), 0, nil)
assert.Nil(t, err)
assert.Equal(t, "b1589029b7db9d01347caece2159d588", hashes["abc"])
// force recalculate of "abc"
hashes, err = GetHashes(driveRoot, "sda", "1", []string{"abc"}, int64(hummingbird.ONE_WEEK), nil)
hashes, err = GetHashes(driveRoot, "sda", "1", []string{"abc"}, int64(hummingbird.ONE_WEEK), 0, nil)
assert.Nil(t, err)
assert.Equal(t, "8834e84467693c2e8f670f4afbea5334", hashes["abc"])
}
@ -91,7 +92,7 @@ func TestInvalidateHash(t *testing.T) {
f, _ = os.Create(filepath.Join(driveRoot, "sda", "objects", "1", "abc", "00000000000000000000000000000abc", "67890.data"))
f.Close()
hashes, err := GetHashes(driveRoot, "sda", "1", nil, int64(hummingbird.ONE_WEEK), nil)
hashes, err := GetHashes(driveRoot, "sda", "1", nil, int64(hummingbird.ONE_WEEK), 0, nil)
assert.Nil(t, err)
assert.Equal(t, "b1589029b7db9d01347caece2159d588", hashes["abc"])
@ -100,13 +101,44 @@ func TestInvalidateHash(t *testing.T) {
f.Close()
// make sure hash for "abc" isn't recalculated yet.
hashes, err = GetHashes(driveRoot, "sda", "1", nil, int64(hummingbird.ONE_WEEK), nil)
hashes, err = GetHashes(driveRoot, "sda", "1", nil, int64(hummingbird.ONE_WEEK), 0, nil)
assert.Nil(t, err)
assert.Equal(t, "b1589029b7db9d01347caece2159d588", hashes["abc"])
// invalidate hash of suffix "abc"
InvalidateHash(filepath.Join(driveRoot, "", "sda", "objects", "1", "abc", "00000000000000000000000000000abc"))
hashes, err = GetHashes(driveRoot, "sda", "1", nil, int64(hummingbird.ONE_WEEK), nil)
hashes, err = GetHashes(driveRoot, "sda", "1", nil, int64(hummingbird.ONE_WEEK), 0, nil)
assert.Nil(t, err)
assert.Equal(t, "8834e84467693c2e8f670f4afbea5334", hashes["abc"])
}
// TestPolicyDir verifies that UnPolicyDir and PolicyDir are inverses for
// representative directory names.
func TestPolicyDir(t *testing.T) {
	for _, dir := range []string{"objects", "objects-1", "objects-100"} {
		policy, err := UnPolicyDir(dir)
		require.Nil(t, err)
		require.Equal(t, dir, PolicyDir(policy))
	}
}
// TestQuarantineHash verifies that hash dirs are moved into the drive's
// quarantined area under the matching objects/objects-N directory name.
func TestQuarantineHash(t *testing.T) {
	driveRoot, _ := ioutil.TempDir("", "")
	defer os.RemoveAll(driveRoot)
	for _, tc := range []struct {
		drive, objDir, hash string
	}{
		{"sda", "objects", "fffffffffffffffffffffffffffffabc"},
		{"sdb", "objects-1", "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"},
	} {
		hashDir := filepath.Join(driveRoot, tc.drive, tc.objDir, "1", "abc", tc.hash)
		os.MkdirAll(hashDir, 0777)
		QuarantineHash(hashDir)
		require.True(t, hummingbird.Exists(filepath.Join(driveRoot, tc.drive, "quarantined", tc.objDir)))
		require.False(t, hummingbird.Exists(hashDir))
	}
}

View File

@ -18,7 +18,6 @@ package objectserver
import (
"crypto/md5"
"encoding/hex"
"errors"
"flag"
"fmt"
"io"
@ -36,8 +35,6 @@ import (
"github.com/openstack/swift/go/middleware"
)
var DefaultObjEngine = "swift"
type ObjectServer struct {
driveRoot string
hashPathPrefix string
@ -50,13 +47,25 @@ type ObjectServer struct {
diskInUse *hummingbird.KeyedLimit
expiringDivisor int64
updateClient *http.Client
objEngine ObjectEngine
objEngines map[int]ObjectEngine
}
// newObject constructs an Object through the engine registered for the
// request's storage policy (X-Backend-Storage-Policy-Index header; a missing
// or unparsable header means policy 0).
func (server *ObjectServer) newObject(req *http.Request, vars map[string]string, needData bool) (Object, error) {
	policy, err := strconv.Atoi(req.Header.Get("X-Backend-Storage-Policy-Index"))
	if err != nil {
		policy = 0
	}
	if engine, ok := server.objEngines[policy]; ok {
		return engine.New(vars, needData)
	}
	return nil, fmt.Errorf("Engine for policy index %d not found.", policy)
}
func (server *ObjectServer) ObjGetHandler(writer http.ResponseWriter, request *http.Request) {
vars := hummingbird.GetVars(request)
headers := writer.Header()
obj, err := server.objEngine.New(vars, request.Method == "GET")
obj, err := server.newObject(request, vars, request.Method == "GET")
if err != nil {
hummingbird.GetLogger(request).LogError("Unable to open object: %v", err)
hummingbird.StandardResponse(writer, http.StatusInternalServerError)
@ -206,7 +215,7 @@ func (server *ObjectServer) ObjPutHandler(writer http.ResponseWriter, request *h
}
}
obj, err := server.objEngine.New(vars, false)
obj, err := server.newObject(request, vars, false)
if err != nil {
hummingbird.GetLogger(request).LogError("Error getting obj: %s", err.Error())
hummingbird.StandardResponse(writer, http.StatusInternalServerError)
@ -295,7 +304,7 @@ func (server *ObjectServer) ObjDeleteHandler(writer http.ResponseWriter, request
}
responseStatus := http.StatusNotFound
obj, err := server.objEngine.New(vars, false)
obj, err := server.newObject(request, vars, false)
if err != nil {
hummingbird.GetLogger(request).LogError("Error getting obj: %s", err.Error())
hummingbird.StandardResponse(writer, http.StatusInternalServerError)
@ -471,9 +480,11 @@ func (server *ObjectServer) GetHandler(config hummingbird.Config) http.Handler {
router.NotFoundHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
http.Error(w, fmt.Sprintf("Invalid path: %s", r.URL.Path), http.StatusBadRequest)
})
server.objEngine.RegisterHandlers(func(method, path string, handler http.HandlerFunc) {
router.Handle(method, path, commonHandlers.ThenFunc(handler))
})
for policy, objEngine := range server.objEngines {
objEngine.RegisterHandlers(func(method, path string, handler http.HandlerFunc) {
router.HandlePolicy(method, path, policy, commonHandlers.ThenFunc(handler))
})
}
return alice.New(middleware.GrepObject).Then(router)
}
@ -490,13 +501,18 @@ func GetServer(serverconf hummingbird.Config, flags *flag.FlagSet) (bindIP strin
if err != nil {
return "", 0, nil, nil, err
}
objEngineName := serverconf.GetDefault("app:object-server", "object_engine", DefaultObjEngine)
if newEngineFactory, err := FindEngine(objEngineName); err != nil {
return "", 0, nil, nil, errors.New("Unable to find object engine")
} else if server.objEngine, err = newEngineFactory(serverconf, flags); err != nil {
return "", 0, nil, nil, err
server.objEngines = make(map[int]ObjectEngine)
for _, policy := range hummingbird.LoadPolicies() {
if newEngine, err := FindEngine(policy.Type); err != nil {
return "", 0, nil, nil, fmt.Errorf("Unable to find object engine type %s: %v", policy.Type, err)
} else {
server.objEngines[policy.Index], err = newEngine(serverconf, policy, flags)
if err != nil {
return "", 0, nil, nil, fmt.Errorf("Error instantiating object engine type %s: %v", policy.Type, err)
}
}
}
server.driveRoot = serverconf.GetDefault("app:object-server", "devices", "/srv/node")
server.checkMounts = serverconf.GetBool("app:object-server", "mount_check", true)
server.checkEtags = serverconf.GetBool("app:object-server", "check_etags", false)

View File

@ -52,6 +52,7 @@ func (t *TestServer) Close() {
func (t *TestServer) Do(method string, path string, body io.ReadCloser) (*http.Response, error) {
req, err := http.NewRequest(method, fmt.Sprintf("http://%s:%d%s", t.host, t.port, path), body)
req.Header.Set("X-Backend-Storage-Policy-Index", "0")
if err != nil {
return nil, err
}

View File

@ -61,7 +61,7 @@ type ObjectEngine interface {
}
// ObjectEngineConstructor is a function that, given configs, a policy, and flags, returns an ObjectEngine
type ObjectEngineConstructor func(hummingbird.Config, *flag.FlagSet) (ObjectEngine, error)
type ObjectEngineConstructor func(hummingbird.Config, *hummingbird.Policy, *flag.FlagSet) (ObjectEngine, error)
type engineFactoryEntry struct {
name string

View File

@ -26,7 +26,7 @@ import (
func TestObjectEngineRegistry(t *testing.T) {
testErr := errors.New("Not implemented")
constructor := func(hummingbird.Config, *flag.FlagSet) (ObjectEngine, error) {
constructor := func(hummingbird.Config, *hummingbird.Policy, *flag.FlagSet) (ObjectEngine, error) {
return nil, testErr
}
@ -34,7 +34,7 @@ func TestObjectEngineRegistry(t *testing.T) {
fconstructor, err := FindEngine("test")
require.Nil(t, err)
eng, err := fconstructor(hummingbird.Config{}, nil)
eng, err := fconstructor(hummingbird.Config{}, nil, nil)
require.Nil(t, eng)
require.Equal(t, err, testErr)

View File

@ -18,8 +18,10 @@ package objectserver
import (
"bytes"
"encoding/json"
"flag"
"fmt"
"net/http"
"os"
"strconv"
"strings"
"sync"
@ -143,21 +145,29 @@ func getPartMoveJobs(oldRing, newRing hummingbird.Ring) []*PriorityRepJob {
// MoveParts takes two object .ring.gz files as []string{oldRing, newRing} and dispatches priority replication jobs to rebalance data in line with any ring changes.
func MoveParts(args []string) {
if len(args) != 1 {
fmt.Println("USAGE: hummingbird moveparts [old ringfile]")
flags := flag.NewFlagSet("moveparts", flag.ExitOnError)
policy := flags.Int("p", 0, "policy index to use")
flags.Usage = func() {
fmt.Fprintf(os.Stderr, "USAGE: hummingbird moveparts [old ringfile]")
flags.PrintDefaults()
}
flags.Parse(args)
if len(flags.Args()) != 1 {
flags.Usage()
return
}
hashPathPrefix, hashPathSuffix, err := hummingbird.GetHashPrefixAndSuffix()
if err != nil {
fmt.Println("Unable to load hash path prefix and suffix:", err)
return
}
oldRing, err := hummingbird.LoadRing(args[0], hashPathPrefix, hashPathSuffix)
oldRing, err := hummingbird.LoadRing(flags.Arg(0), hashPathPrefix, hashPathSuffix)
if err != nil {
fmt.Println("Unable to load old ring:", err)
return
}
curRing, err := hummingbird.GetRing("object", hashPathPrefix, hashPathSuffix)
curRing, err := hummingbird.GetRing("object", hashPathPrefix, hashPathSuffix, *policy)
if err != nil {
fmt.Println("Unable to load current ring:", err)
return
@ -193,22 +203,30 @@ func getRestoreDeviceJobs(ring hummingbird.Ring, ip string, devName string) []*P
// RestoreDevice takes an IP address and device name such as []string{"172.24.0.1", "sda1"} and attempts to restores its data from peers.
func RestoreDevice(args []string) {
if len(args) != 2 {
fmt.Println("USAGE: hummingbird restoredevice [ip] [device]")
flags := flag.NewFlagSet("restoredevice", flag.ExitOnError)
policy := flags.Int("p", 0, "policy index to use")
flags.Usage = func() {
fmt.Fprintf(os.Stderr, "USAGE: hummingbird restoredevice [ip] [device]\n")
flags.PrintDefaults()
}
flags.Parse(args)
if len(flags.Args()) != 2 {
flags.Usage()
return
}
hashPathPrefix, hashPathSuffix, err := hummingbird.GetHashPrefixAndSuffix()
if err != nil {
fmt.Println("Unable to load hash path prefix and suffix:", err)
return
}
objRing, err := hummingbird.GetRing("object", hashPathPrefix, hashPathSuffix)
objRing, err := hummingbird.GetRing("object", hashPathPrefix, hashPathSuffix, *policy)
if err != nil {
fmt.Println("Unable to load ring:", err)
return
}
client := &http.Client{Timeout: time.Hour}
jobs := getRestoreDeviceJobs(objRing, args[0], args[1])
jobs := getRestoreDeviceJobs(objRing, flags.Arg(0), flags.Arg(1))
fmt.Println("Job count:", len(jobs))
doPriRepJobs(jobs, 2, client)
fmt.Println("Done sending jobs.")
@ -231,21 +249,29 @@ func getRescuePartsJobs(objRing hummingbird.Ring, partitions []uint64) []*Priori
}
func RescueParts(args []string) {
if len(args) != 1 {
fmt.Println("USAGE: hummingbird rescueparts partnum1,partnum2,...")
flags := flag.NewFlagSet("rescueparts", flag.ExitOnError)
policy := flags.Int("p", 0, "policy index to use")
flags.Usage = func() {
fmt.Fprintf(os.Stderr, "USAGE: hummingbird rescueparts partnum1,partnum2,...\n")
flags.PrintDefaults()
}
flags.Parse(args)
if len(flags.Args()) != 1 {
flags.Usage()
return
}
hashPathPrefix, hashPathSuffix, err := hummingbird.GetHashPrefixAndSuffix()
if err != nil {
fmt.Println("Unable to load hash path prefix and suffix:", err)
return
}
objRing, err := hummingbird.GetRing("object", hashPathPrefix, hashPathSuffix)
objRing, err := hummingbird.GetRing("object", hashPathPrefix, hashPathSuffix, *policy)
if err != nil {
fmt.Println("Unable to load ring:", err)
return
}
partsStr := strings.Split(args[0], ",")
partsStr := strings.Split(flags.Arg(0), ",")
partsInt := make([]uint64, len(partsStr))
for i, p := range partsStr {
partsInt[i], err = strconv.ParseUint(p, 10, 64)

View File

@ -24,6 +24,7 @@ import (
"net"
"net/http"
"net/http/httputil"
"strconv"
"time"
"github.com/openstack/swift/go/hummingbird"
@ -140,9 +141,10 @@ func (r *RepConn) Close() {
r.c.Close()
}
func NewRepConn(dev *hummingbird.Device, partition string) (*RepConn, error) {
func NewRepConn(dev *hummingbird.Device, partition string, policy int) (*RepConn, error) {
url := fmt.Sprintf("http://%s:%d/%s/%s", dev.ReplicationIp, dev.ReplicationPort, dev.Device, partition)
req, err := http.NewRequest("REPCONN", url, nil)
req.Header.Set("X-Backend-Storage-Policy-Index", strconv.Itoa(policy))
if err != nil {
return nil, err
}

View File

@ -44,6 +44,7 @@ type job struct {
dev *hummingbird.Device
partition string
objPath string
policy int
}
type ReplicationData struct {
@ -63,6 +64,7 @@ type PriorityRepJob struct {
Partition uint64 `json:"partition"`
FromDevice *hummingbird.Device `json:"from_device"`
ToDevices []*hummingbird.Device `json:"to_devices"`
Policy int `json:"policy"`
}
type deviceProgress struct {
@ -92,7 +94,6 @@ type Replicator struct {
bindIp string
logger hummingbird.SysLogLike
port int
Ring hummingbird.Ring
devGroup sync.WaitGroup
partRateTicker *time.Ticker
timePerPart time.Duration
@ -104,6 +105,7 @@ type Replicator struct {
priRepChans map[int]chan PriorityRepJob
priRepM sync.Mutex
reclaimAge int64
Rings map[int]hummingbird.Ring
once bool
cancelers map[string]chan struct{}
@ -204,8 +206,8 @@ func (r *Replicator) getFile(filePath string) (fp *os.File, xattrs []byte, size
return fp, rawxattr, finfo.Size(), nil
}
func (r *Replicator) beginReplication(dev *hummingbird.Device, partition string, hashes bool, rChan chan ReplicationData) {
rc, err := NewRepConn(dev, partition)
func (r *Replicator) beginReplication(dev *hummingbird.Device, partition string, hashes bool, policy int, rChan chan ReplicationData) {
rc, err := NewRepConn(dev, partition, policy)
if err != nil {
r.LogError("[beginReplication] error creating new request: %v", err)
rChan <- ReplicationData{dev: dev, conn: nil, hashes: nil, err: err}
@ -365,7 +367,7 @@ func (r *Replicator) replicateLocal(j *job, nodes []*hummingbird.Device, moreNod
startGetHashesRemote := time.Now()
rChan := make(chan ReplicationData)
for i := 0; i < len(nodes); i++ {
go r.beginReplication(nodes[i], j.partition, true, rChan)
go r.beginReplication(nodes[i], j.partition, true, j.policy, rChan)
}
for i := 0; i < len(nodes); i++ {
rData := <-rChan
@ -375,7 +377,7 @@ func (r *Replicator) replicateLocal(j *job, nodes []*hummingbird.Device, moreNod
remoteConnections[rData.dev.Id] = rData.conn
} else if rData.err == RepUnmountedError {
if nextNode := moreNodes.Next(); nextNode != nil {
go r.beginReplication(nextNode, j.partition, true, rChan)
go r.beginReplication(nextNode, j.partition, true, j.policy, rChan)
nodes = append(nodes, nextNode)
}
}
@ -388,7 +390,7 @@ func (r *Replicator) replicateLocal(j *job, nodes []*hummingbird.Device, moreNod
startGetHashesLocal := time.Now()
recalc := []string{}
hashes, herr := GetHashes(r.driveRoot, j.dev.Device, j.partition, recalc, r.reclaimAge, r)
hashes, herr := GetHashes(r.driveRoot, j.dev.Device, j.partition, recalc, r.reclaimAge, j.policy, r)
if herr != nil {
r.LogError("[replicateLocal] error getting local hashes: %v", herr)
return
@ -401,7 +403,7 @@ func (r *Replicator) replicateLocal(j *job, nodes []*hummingbird.Device, moreNod
}
}
}
hashes, herr = GetHashes(r.driveRoot, j.dev.Device, j.partition, recalc, r.reclaimAge, r)
hashes, herr = GetHashes(r.driveRoot, j.dev.Device, j.partition, recalc, r.reclaimAge, j.policy, r)
if herr != nil {
r.LogError("[replicateLocal] error recalculating local hashes: %v", herr)
return
@ -456,7 +458,7 @@ func (r *Replicator) replicateHandoff(j *job, nodes []*hummingbird.Device) {
rChan := make(chan ReplicationData)
nodesNeeded := len(nodes)
for i := 0; i < nodesNeeded; i++ {
go r.beginReplication(nodes[i], j.partition, false, rChan)
go r.beginReplication(nodes[i], j.partition, false, j.policy, rChan)
}
for i := 0; i < nodesNeeded; i++ {
rData := <-rChan
@ -538,25 +540,43 @@ func (r *Replicator) replicateDevice(dev *hummingbird.Device, canceler chan stru
r.LogError("[replicateDevice] Drive not mounted: %s", dev.Device)
break
}
if hummingbird.Exists(filepath.Join(r.driveRoot, dev.Device, "lock_device")) {
break
}
r.cleanTemp(dev)
objPath := filepath.Join(r.driveRoot, dev.Device, "objects")
if fi, err := os.Stat(objPath); err != nil || !fi.Mode().IsDir() {
r.LogError("[replicateDevice] No objects found: %s", objPath)
break
}
partitionList, err := filepath.Glob(filepath.Join(objPath, "[0-9]*"))
if err != nil {
r.LogError("[replicateDevice] Error getting partition list: %s (%v)", objPath, err)
break
}
// if one of the breaks above triggers, the monitorer should retry after ReplicateDeviceTimeout
for i := len(partitionList) - 1; i > 0; i-- { // shuffle partition list
j := rand.Intn(i + 1)
partitionList[j], partitionList[i] = partitionList[i], partitionList[j]
jobList := make([]job, 0)
for policy := range r.Rings {
objPath := filepath.Join(r.driveRoot, dev.Device, PolicyDir(policy))
if fi, err := os.Stat(objPath); err != nil || !fi.Mode().IsDir() {
continue
}
policyPartitions, err := filepath.Glob(filepath.Join(objPath, "[0-9]*"))
if err != nil {
r.LogError("[replicateDevice] Error getting partition list: %s (%v)", objPath, err)
continue
}
for _, partition := range policyPartitions {
partition = filepath.Base(partition)
if _, ok := r.partitions[partition]; len(r.partitions) > 0 && !ok {
continue
}
if _, err := strconv.ParseUint(partition, 10, 64); err == nil {
jobList = append(jobList,
job{objPath: objPath,
partition: partition,
dev: dev,
policy: policy})
}
}
}
numPartitions := uint64(len(jobList))
if numPartitions == 0 {
r.LogError("[replicateDevice] No objects found: %s", filepath.Join(r.driveRoot, dev.Device))
}
numPartitions := uint64(len(partitionList))
partitionsProcessed := uint64(0)
r.deviceProgressPassInit <- deviceProgress{
@ -565,10 +585,12 @@ func (r *Replicator) replicateDevice(dev *hummingbird.Device, canceler chan stru
LastPassDuration: lastPassDuration,
}
for _, partition := range partitionList {
if hummingbird.Exists(filepath.Join(r.driveRoot, dev.Device, "lock_device")) {
break
}
for i := len(jobList) - 1; i > 0; i-- { // shuffle job list
j := rand.Intn(i + 1)
jobList[j], jobList[i] = jobList[i], jobList[j]
}
for _, j := range jobList {
select {
case <-canceler:
{
@ -582,31 +604,29 @@ func (r *Replicator) replicateDevice(dev *hummingbird.Device, canceler chan stru
default:
}
r.processPriorityJobs(dev.Id)
if _, ok := r.partitions[filepath.Base(partition)]; len(r.partitions) > 0 && !ok {
continue
}
if partitioni, err := strconv.ParseUint(filepath.Base(partition), 10, 64); err == nil {
func() {
<-r.partRateTicker.C
r.concurrencySem <- struct{}{}
r.deviceProgressIncr <- deviceProgress{
dev: dev,
PartitionsDone: 1,
}
j := &job{objPath: objPath, partition: filepath.Base(partition), dev: dev}
nodes, handoff := r.Ring.GetJobNodes(partitioni, j.dev.Id)
defer func() {
<-r.concurrencySem
}()
partitionsProcessed += 1
if handoff {
r.replicateHandoff(j, nodes)
} else {
r.replicateLocal(j, nodes, r.Ring.GetMoreNodes(partitioni))
}
func() {
<-r.partRateTicker.C
r.concurrencySem <- struct{}{}
defer func() {
<-r.concurrencySem
}()
}
r.deviceProgressIncr <- deviceProgress{
dev: dev,
PartitionsDone: 1,
}
partitioni, err := strconv.ParseUint(j.partition, 10, 64)
if err != nil {
return
}
nodes, handoff := r.Rings[j.policy].GetJobNodes(partitioni, j.dev.Id)
partitionsProcessed += 1
if handoff {
r.replicateHandoff(&j, nodes)
} else {
r.replicateLocal(&j, nodes, r.Rings[j.policy].GetMoreNodes(partitioni))
}
}()
}
if partitionsProcessed >= numPartitions {
r.deviceProgressIncr <- deviceProgress{
@ -624,16 +644,22 @@ func (r *Replicator) replicateDevice(dev *hummingbird.Device, canceler chan stru
// restartDevices should be called periodically from the statsReporter thread to make sure devices that should be replicating are.
func (r *Replicator) restartDevices() {
shouldBeRunning := make(map[string]bool)
if localDevices, err := r.Ring.LocalDevices(r.port); err != nil {
r.LogError("Error getting local devices: %v", err)
return
} else {
// launch replication for any new devices
for _, dev := range localDevices {
shouldBeRunning[dev.Device] = true
if _, ok := r.deviceProgressMap[dev.Device]; !ok {
r.restartReplicateDevice(dev)
}
deviceMap := map[string]*hummingbird.Device{}
for _, ring := range r.Rings {
devices, err := ring.LocalDevices(r.port)
if err != nil {
r.LogError("Error getting local devices: %v", err)
return
}
for _, dev := range devices {
deviceMap[dev.Device] = dev
}
}
// launch replication for any new devices
for _, dev := range deviceMap {
shouldBeRunning[dev.Device] = true
if _, ok := r.deviceProgressMap[dev.Device]; !ok {
r.restartReplicateDevice(dev)
}
}
// kill any replicators that are no longer in the ring
@ -756,12 +782,18 @@ func (r *Replicator) run() {
r.partRateTicker = time.NewTicker(r.timePerPart)
r.concurrencySem = make(chan struct{}, r.concurrency)
localDevices, err := r.Ring.LocalDevices(r.port)
if err != nil {
r.LogError("Error getting local devices: %v", err)
return
deviceMap := map[string]*hummingbird.Device{}
for _, ring := range r.Rings {
devices, err := ring.LocalDevices(r.port)
if err != nil {
r.LogError("Error getting local devices: %v", err)
return
}
for _, dev := range devices {
deviceMap[dev.Device] = dev
}
}
for _, dev := range localDevices {
for _, dev := range deviceMap {
if _, ok := r.devices[dev.Device]; ok || len(r.devices) == 0 {
r.restartReplicateDevice(dev)
}
@ -786,8 +818,9 @@ func (r *Replicator) processPriorityJobs(id int) {
dev: pri.FromDevice,
partition: strconv.FormatUint(pri.Partition, 10),
objPath: filepath.Join(r.driveRoot, pri.FromDevice.Device, "objects"),
policy: pri.Policy,
}
_, handoff := r.Ring.GetJobNodes(pri.Partition, pri.FromDevice.Id)
_, handoff := r.Rings[pri.Policy].GetJobNodes(pri.Partition, pri.FromDevice.Id)
defer func() {
<-r.concurrencySem
}()
@ -920,6 +953,15 @@ func NewReplicator(serverconf hummingbird.Config, flags *flag.FlagSet) (hummingb
if err != nil {
return nil, fmt.Errorf("Unable to get hash prefix and suffix")
}
replicator.Rings = make(map[int]hummingbird.Ring)
for _, policy := range hummingbird.LoadPolicies() {
if policy.Type != "replication" {
continue
}
if replicator.Rings[policy.Index], err = hummingbird.GetRing("object", hashPathPrefix, hashPathSuffix, policy.Index); err != nil {
return nil, fmt.Errorf("Unable to load ring.")
}
}
replicator.reconCachePath = serverconf.GetDefault("object-auditor", "recon_cache_path", "/var/cache/swift")
replicator.checkMounts = serverconf.GetBool("object-replicator", "mount_check", true)
replicator.driveRoot = serverconf.GetDefault("object-replicator", "devices", "/srv/node")
@ -935,9 +977,6 @@ func NewReplicator(serverconf hummingbird.Config, flags *flag.FlagSet) (hummingb
replicator.timePerPart = time.Duration(serverconf.GetInt("object-replicator", "ms_per_part", 750)) * time.Millisecond
}
replicator.concurrency = int(serverconf.GetInt("object-replicator", "concurrency", 1))
if replicator.Ring, err = hummingbird.GetRing("object", hashPathPrefix, hashPathSuffix); err != nil {
return nil, fmt.Errorf("Unable to load ring.")
}
devices_flag := flags.Lookup("devices")
if devices_flag != nil {
if devices := devices_flag.Value.(flag.Getter).Get().(string); len(devices) > 0 {

View File

@ -373,7 +373,7 @@ func TestReplicationLocal(t *testing.T) {
replicator, err := makeReplicator("bind_port", fmt.Sprintf("%d", ts.port))
require.Nil(t, err)
replicator.driveRoot = ts.objServer.driveRoot
replicator.Ring = &FakeRepRing1{ldev: ldev, rdev: rdev}
replicator.Rings[0] = &FakeRepRing1{ldev: ldev, rdev: rdev}
replicator.Run()
req, err = http.NewRequest("HEAD", fmt.Sprintf("http://%s:%d/sda/0/a/c/o", ts2.host, ts2.port), nil)
@ -426,7 +426,7 @@ func TestReplicationHandoff(t *testing.T) {
replicator, err := makeReplicator("bind_port", fmt.Sprintf("%d", ts.port))
require.Nil(t, err)
replicator.driveRoot = ts.objServer.driveRoot
replicator.Ring = &FakeRepRing2{ldev: ldev, rdev: rdev}
replicator.Rings[0] = &FakeRepRing2{ldev: ldev, rdev: rdev}
replicator.Run()
req, err = http.NewRequest("HEAD", fmt.Sprintf("http://%s:%d/sda/0/a/c/o", ts2.host, ts2.port), nil)
@ -473,7 +473,7 @@ func TestReplicationHandoffQuorumDelete(t *testing.T) {
replicator, _ = makeReplicatorWithFlags([]string{"bind_port", fmt.Sprintf("%d", ts.port)}, flags)
require.True(t, replicator.quorumDelete)
replicator.driveRoot = ts.objServer.driveRoot
replicator.Ring = &FakeRepRing2{ldev: ldev, rdev: rdev}
replicator.Rings[0] = &FakeRepRing2{ldev: ldev, rdev: rdev}
replicator.Run()
req, err = http.NewRequest("HEAD", fmt.Sprintf("http://%s:%d/sda/0/a/c/o", ts2.host, ts2.port), nil)
@ -616,7 +616,7 @@ func TestRestartDevice(t *testing.T) {
saved := &replicationLogSaver{}
repl, _ := makeReplicator()
repl.Ring = &FakeRepRing2{ldev: ldev, rdev: rdev}
repl.Rings[0] = &FakeRepRing2{ldev: ldev, rdev: rdev}
repl.logger = saved
// set stuff up
@ -680,7 +680,7 @@ func TestRestartDevices(t *testing.T) {
ldev := &hummingbird.Device{ReplicationIp: "127.0.0.1", ReplicationPort: 6001, Device: "sda"}
rdev := &hummingbird.Device{ReplicationIp: "127.0.0.2", ReplicationPort: 6001, Device: "sdb"}
ring := &FakeRepRing2{ldev: ldev, rdev: rdev}
replicator.Ring = ring
replicator.Rings[0] = ring
replicator.restartDevices()
_, oka := replicator.cancelers["sda"]
_, okb := replicator.cancelers["sdb"]
@ -704,16 +704,17 @@ func TestSyncFileQuarantine(t *testing.T) {
hashDir := filepath.Join(driveRoot, "sda", "objects", "1", "abc", "d41d8cd98f00b204e9800998ecf8427e")
objFile := filepath.Join(hashDir, "12345.data")
j := &job{policy: 0}
require.Nil(t, os.MkdirAll(objFile, 0777)) // not a regular file
replicator.syncFile(objFile, nil, nil)
replicator.syncFile(objFile, nil, j)
assert.False(t, hummingbird.Exists(hashDir))
os.MkdirAll(hashDir, 0777) // error reading metadata
fp, err := os.Create(objFile)
require.Nil(t, err)
defer fp.Close()
replicator.syncFile(objFile, nil, nil)
replicator.syncFile(objFile, nil, j)
assert.False(t, hummingbird.Exists(hashDir))
os.MkdirAll(hashDir, 0777) // unparseable pickle
@ -721,7 +722,7 @@ func TestSyncFileQuarantine(t *testing.T) {
defer fp.Close()
require.Nil(t, err)
RawWriteMetadata(fp.Fd(), []byte("NOT A VALID PICKLE"))
replicator.syncFile(objFile, nil, nil)
replicator.syncFile(objFile, nil, j)
assert.False(t, hummingbird.Exists(hashDir))
os.MkdirAll(hashDir, 0777) // wrong metadata type
@ -729,7 +730,7 @@ func TestSyncFileQuarantine(t *testing.T) {
defer fp.Close()
require.Nil(t, err)
RawWriteMetadata(fp.Fd(), hummingbird.PickleDumps("hi"))
replicator.syncFile(objFile, nil, nil)
replicator.syncFile(objFile, nil, j)
assert.False(t, hummingbird.Exists(hashDir))
os.MkdirAll(hashDir, 0777) // unparseable content-length
@ -740,7 +741,7 @@ func TestSyncFileQuarantine(t *testing.T) {
"Content-Type": "text/plain", "name": "/a/c/o", "ETag": "d41d8cd98f00b204e9800998ecf8427e",
"X-Timestamp": "12345.12345", "Content-Length": "X"}
RawWriteMetadata(fp.Fd(), hummingbird.PickleDumps(badContentLengthMetdata))
replicator.syncFile(objFile, nil, nil)
replicator.syncFile(objFile, nil, j)
assert.False(t, hummingbird.Exists(hashDir))
os.MkdirAll(hashDir, 0777) // content-length doesn't match file size
@ -751,6 +752,6 @@ func TestSyncFileQuarantine(t *testing.T) {
"Content-Type": "text/plain", "name": "/a/c/o", "ETag": "d41d8cd98f00b204e9800998ecf8427e",
"X-Timestamp": "12345.12345", "Content-Length": "50000"}
RawWriteMetadata(fp.Fd(), hummingbird.PickleDumps(wrongContentLengthMetdata))
replicator.syncFile(objFile, nil, nil)
replicator.syncFile(objFile, nil, j)
assert.False(t, hummingbird.Exists(hashDir))
}

View File

@ -175,13 +175,14 @@ type SwiftObjectFactory struct {
replicationMan *ReplicationManager
replicateTimeout time.Duration
reclaimAge int64
policy int
}
// New returns an instance of SwiftObject with the given parameters. Metadata is read in and if needData is true, the file is opened.
func (f *SwiftObjectFactory) New(vars map[string]string, needData bool) (Object, error) {
var err error
sor := &SwiftObject{reclaimAge: f.reclaimAge, reserve: f.reserve}
sor.hashDir = ObjHashDir(vars, f.driveRoot, f.hashPathPrefix, f.hashPathSuffix)
sor.hashDir = ObjHashDir(vars, f.driveRoot, f.hashPathPrefix, f.hashPathSuffix, f.policy)
sor.tempDir = TempDirPath(f.driveRoot, vars["device"])
sor.dataFile, sor.metaFile = ObjectFiles(sor.hashDir)
if sor.Exists() {
@ -226,8 +227,8 @@ func (f *SwiftObjectFactory) objReplicateHandler(writer http.ResponseWriter, req
if len(vars["suffixes"]) > 0 {
recalculate = strings.Split(vars["suffixes"], "-")
}
hashes, err := GetHashes(f.driveRoot, vars["device"], vars["partition"], recalculate, f.reclaimAge, hummingbird.GetLogger(request))
if err != nil {
hashes, herr := GetHashes(f.driveRoot, vars["device"], vars["partition"], recalculate, f.reclaimAge, f.policy, hummingbird.GetLogger(request))
if herr != nil {
hummingbird.GetLogger(request).LogError("Unable to get hashes for %s/%s", vars["device"], vars["partition"])
hummingbird.StandardResponse(writer, http.StatusInternalServerError)
return
@ -273,7 +274,7 @@ func (f *SwiftObjectFactory) objRepConnHandler(writer http.ResponseWriter, reque
var hashes map[string]string
if brr.NeedHashes {
var herr *hummingbird.BackendError
hashes, herr = GetHashes(f.driveRoot, brr.Device, brr.Partition, nil, f.reclaimAge, hummingbird.GetLogger(request))
hashes, herr = GetHashes(f.driveRoot, brr.Device, brr.Partition, nil, f.reclaimAge, f.policy, hummingbird.GetLogger(request))
if herr != nil {
hummingbird.GetLogger(request).LogError("[ObjRepConnHandler] Error getting hashes: %v", herr)
writer.WriteHeader(http.StatusInternalServerError)
@ -355,7 +356,7 @@ func (f *SwiftObjectFactory) RegisterHandlers(addRoute func(method, path string,
}
// SwiftEngineConstructor creates a SwiftObjectFactory given the object server configs.
func SwiftEngineConstructor(config hummingbird.Config, flags *flag.FlagSet) (ObjectEngine, error) {
func SwiftEngineConstructor(config hummingbird.Config, policy *hummingbird.Policy, flags *flag.FlagSet) (ObjectEngine, error) {
driveRoot := config.GetDefault("app:object-server", "devices", "/srv/node")
reserve := config.GetInt("app:object-server", "fallocate_reserve", 0)
hashPathPrefix, hashPathSuffix, err := hummingbird.GetHashPrefixAndSuffix()
@ -372,11 +373,12 @@ func SwiftEngineConstructor(config hummingbird.Config, flags *flag.FlagSet) (Obj
reserve: reserve,
replicationMan: replicationMan,
replicateTimeout: replicateTimeout,
reclaimAge: reclaimAge}, nil
reclaimAge: reclaimAge,
policy: policy.Index}, nil
}
func init() {
RegisterObjectEngine("swift", SwiftEngineConstructor)
RegisterObjectEngine("replication", SwiftEngineConstructor)
}
// make sure these things satisfy interfaces at compile time

View File

@ -116,10 +116,11 @@ func (server *ObjectServer) updateContainer(metadata map[string]string, request
return
}
requestHeaders := http.Header{
"Referer": {hummingbird.GetDefault(request.Header, "Referer", "-")},
"User-Agent": {hummingbird.GetDefault(request.Header, "User-Agent", "-")},
"X-Trans-Id": {hummingbird.GetDefault(request.Header, "X-Trans-Id", "-")},
"X-Timestamp": {request.Header.Get("X-Timestamp")},
"X-Backend-Storage-Policy-Index": {hummingbird.GetDefault(request.Header, "X-Backend-Storage-Policy-Index", "0")},
"Referer": {hummingbird.GetDefault(request.Header, "Referer", "-")},
"User-Agent": {hummingbird.GetDefault(request.Header, "User-Agent", "-")},
"X-Trans-Id": {hummingbird.GetDefault(request.Header, "X-Trans-Id", "-")},
"X-Timestamp": {request.Header.Get("X-Timestamp")},
}
if request.Method != "DELETE" {
requestHeaders.Add("X-Content-Type", metadata["Content-Type"])
@ -152,10 +153,11 @@ func (server *ObjectServer) updateDeleteAt(request *http.Request, deleteAtStr st
hosts := splitHeader(request.Header.Get("X-Delete-At-Host"))
devices := splitHeader(request.Header.Get("X-Delete-At-Device"))
requestHeaders := http.Header{
"Referer": {hummingbird.GetDefault(request.Header, "Referer", "-")},
"User-Agent": {hummingbird.GetDefault(request.Header, "User-Agent", "-")},
"X-Trans-Id": {hummingbird.GetDefault(request.Header, "X-Trans-Id", "-")},
"X-Timestamp": {request.Header.Get("X-Timestamp")},
"X-Backend-Storage-Policy-Index": {hummingbird.GetDefault(request.Header, "X-Backend-Storage-Policy-Index", "0")},
"Referer": {hummingbird.GetDefault(request.Header, "Referer", "-")},
"User-Agent": {hummingbird.GetDefault(request.Header, "User-Agent", "-")},
"X-Trans-Id": {hummingbird.GetDefault(request.Header, "X-Trans-Id", "-")},
"X-Timestamp": {request.Header.Get("X-Timestamp")},
}
if request.Method != "DELETE" {
requestHeaders.Add("X-Content-Type", "text/plain")

View File

@ -31,9 +31,9 @@ func TestAuditorMd5(t *testing.T) {
// put a file
timestamp := hummingbird.GetTimestamp()
e.PutObject(0, timestamp, "X")
e.PutObject(0, timestamp, "X", 0)
locations := e.FileLocations("a", "c", "o")
locations := e.FileLocations("a", "c", "o", 0)
path := filepath.Join(locations[0], timestamp+".data")
// make sure the file is still there after an audit pass
@ -56,9 +56,9 @@ func TestAuditorContentLength(t *testing.T) {
// put a file
timestamp := hummingbird.GetTimestamp()
e.PutObject(0, timestamp, "X")
e.PutObject(0, timestamp, "X", 0)
locations := e.FileLocations("a", "c", "o")
locations := e.FileLocations("a", "c", "o", 0)
path := filepath.Join(locations[0], timestamp+".data")
// make sure the file is still there after an audit pass

View File

@ -112,18 +112,18 @@ func (e *Environment) Close() {
}
// FileLocations returns a list of file paths for the object's hash directory on all three underlying object servers.
func (e *Environment) FileLocations(account, container, obj string) (paths []string) {
func (e *Environment) FileLocations(account, container, obj string, policy int) (paths []string) {
partition := e.ring.GetPartition(account, container, obj)
vars := map[string]string{"account": account, "container": container, "obj": obj, "partition": strconv.Itoa(int(partition)), "device": "sda"}
for i := 0; i < 4; i++ {
path := objectserver.ObjHashDir(vars, e.driveRoots[i], e.hashPrefix, e.hashSuffix)
path := objectserver.ObjHashDir(vars, e.driveRoots[i], e.hashPrefix, e.hashSuffix, policy)
paths = append(paths, path)
}
return
}
// PutObject uploads an object "/a/c/o" to the indicated server with X-Timestamp set to timestamp and body set to data.
func (e *Environment) PutObject(server int, timestamp string, data string) bool {
func (e *Environment) PutObject(server int, timestamp string, data string, policy int) bool {
body := bytes.NewBuffer([]byte(data))
req, err := http.NewRequest("PUT", fmt.Sprintf("http://%s:%d/sda/0/a/c/o", e.hosts[server], e.ports[server]), body)
if err != nil {
@ -132,16 +132,30 @@ func (e *Environment) PutObject(server int, timestamp string, data string) bool
req.Header.Set("Content-Type", "application/octet-stream")
req.Header.Set("Content-Length", strconv.Itoa(len(data)))
req.Header.Set("X-Timestamp", timestamp)
req.Header.Set("X-Backend-Storage-Policy-Index", strconv.Itoa(policy))
resp, err := http.DefaultClient.Do(req)
return err == nil && resp.StatusCode == 201
}
// DeleteObject deletes the object.
func (e *Environment) DeleteObject(server int, timestamp string, policy int) bool {
req, err := http.NewRequest("DELETE", fmt.Sprintf("http://%s:%d/sda/0/a/c/o", e.hosts[server], e.ports[server]), nil)
if err != nil {
return false
}
req.Header.Set("X-Timestamp", timestamp)
req.Header.Set("X-Backend-Storage-Policy-Index", strconv.Itoa(policy))
resp, err := http.DefaultClient.Do(req)
return err == nil && resp.StatusCode == 204
}
// ObjExists returns a boolean indicating that it can fetch the named object and that its X-Timestamp matches the timestamp argument.
func (e *Environment) ObjExists(server int, timestamp string) bool {
func (e *Environment) ObjExists(server int, timestamp string, policy int) bool {
req, err := http.NewRequest("HEAD", fmt.Sprintf("http://%s:%d/sda/0/a/c/o", e.hosts[server], e.ports[server]), nil)
if err != nil {
return false
}
req.Header.Set("X-Backend-Storage-Policy-Index", strconv.Itoa(policy))
resp, err := http.DefaultClient.Do(req)
if err != nil || resp.StatusCode != 200 {
return false
@ -151,6 +165,30 @@ func (e *Environment) ObjExists(server int, timestamp string) bool {
// NewEnvironment creates a new environment. Arguments should be a series of key, value pairs that are added to the object server configuration file.
func NewEnvironment(settings ...string) *Environment {
oldLoadPolicies := hummingbird.LoadPolicies
hummingbird.LoadPolicies = func() hummingbird.PolicyList {
return hummingbird.PolicyList(map[int]*hummingbird.Policy{
0: &hummingbird.Policy{
Index: 0,
Type: "replication",
Name: "Policy-0",
Aliases: nil,
Default: false,
Deprecated: false,
},
1: &hummingbird.Policy{
Index: 1,
Type: "replication",
Name: "Policy-1",
Aliases: nil,
Default: false,
Deprecated: false,
},
})
}
defer func() {
hummingbird.LoadPolicies = oldLoadPolicies
}()
env := &Environment{ring: &FakeRing{devices: nil}}
env.hashPrefix, env.hashSuffix, _ = hummingbird.GetHashPrefixAndSuffix()
for i := 0; i < 4; i++ {
@ -174,7 +212,8 @@ func NewEnvironment(settings ...string) *Environment {
ts.Config.Handler = server.GetHandler(conf)
replicator, _ := objectserver.NewReplicator(conf, &flag.FlagSet{})
auditor, _ := objectserver.NewAuditor(conf, &flag.FlagSet{})
replicator.(*objectserver.Replicator).Ring = env.ring
replicator.(*objectserver.Replicator).Rings[0] = env.ring
replicator.(*objectserver.Replicator).Rings[1] = env.ring
env.ring.(*FakeRing).devices = append(env.ring.(*FakeRing).devices, &hummingbird.Device{
Id: i, Device: "sda", Ip: host, Port: port, Region: 0, ReplicationIp: host, ReplicationPort: port, Weight: 1, Zone: i,
})

View File

@ -32,11 +32,11 @@ func TestAuditReplicate(t *testing.T) {
// put a file
timestamp := hummingbird.GetTimestamp()
for i := 0; i < 3; i++ {
e.PutObject(i, timestamp, "X")
e.PutObject(i, timestamp, "X", 0)
}
// simulate bit-rot of the file contents
locations := e.FileLocations("a", "c", "o")
locations := e.FileLocations("a", "c", "o", 0)
path := filepath.Join(locations[0], timestamp+".data")
f, _ := os.OpenFile(path, os.O_RDWR, 0777)
f.Write([]byte("!"))
@ -44,9 +44,9 @@ func TestAuditReplicate(t *testing.T) {
// make sure the file is gone after an audit pass
e.auditors[0].Run()
assert.False(t, e.ObjExists(0, timestamp))
assert.False(t, e.ObjExists(0, timestamp, 0))
// make sure the file is replaced after another server's replicator runs
e.replicators[1].Run()
assert.True(t, e.ObjExists(0, timestamp))
assert.True(t, e.ObjExists(0, timestamp, 0))
}

95
go/probe/policy_test.go Normal file
View File

@ -0,0 +1,95 @@
// Copyright (c) 2015 Rackspace
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package probe
import (
"os"
"path/filepath"
"testing"
"github.com/openstack/swift/go/hummingbird"
"github.com/stretchr/testify/require"
)
func TestPolicySeparate(t *testing.T) {
e := NewEnvironment()
defer e.Close()
timestamp := hummingbird.GetTimestamp()
require.True(t, e.PutObject(0, timestamp, "X", 0))
require.True(t, e.ObjExists(0, timestamp, 0))
require.False(t, e.ObjExists(0, timestamp, 1))
timestamp = hummingbird.GetTimestamp()
require.True(t, e.DeleteObject(0, timestamp, 0))
require.True(t, e.PutObject(0, timestamp, "X", 1))
require.False(t, e.ObjExists(0, timestamp, 0))
require.True(t, e.ObjExists(0, timestamp, 1))
}
func TestBasicPolicyReplication(t *testing.T) {
e := NewEnvironment()
defer e.Close()
timestamp := hummingbird.GetTimestamp()
require.True(t, e.PutObject(0, timestamp, "X", 1))
e.replicators[0].Run()
require.False(t, e.ObjExists(0, timestamp, 0))
require.False(t, e.ObjExists(1, timestamp, 0))
require.True(t, e.ObjExists(0, timestamp, 1))
require.True(t, e.ObjExists(2, timestamp, 1))
}
func TestOtherPolicyAuditReplicate(t *testing.T) {
e := NewEnvironment()
defer e.Close()
timestamp := hummingbird.GetTimestamp()
e.PutObject(0, timestamp, "X", 1)
e.PutObject(1, timestamp, "X", 1)
locations := e.FileLocations("a", "c", "o", 1)
path := filepath.Join(locations[0], timestamp+".data")
e.auditors[0].Run()
e.auditors[1].Run()
require.True(t, e.ObjExists(0, timestamp, 1))
require.True(t, e.ObjExists(1, timestamp, 1))
f, _ := os.OpenFile(path, os.O_RDWR, 0777)
f.Write([]byte("!"))
f.Close()
e.auditors[0].Run()
e.auditors[1].Run()
require.True(t, e.ObjExists(1, timestamp, 1))
require.False(t, e.ObjExists(0, timestamp, 1))
e.replicators[0].Run()
e.replicators[1].Run()
require.True(t, e.ObjExists(0, timestamp, 1))
require.True(t, e.ObjExists(1, timestamp, 1))
require.False(t, e.ObjExists(0, timestamp, 0))
require.False(t, e.ObjExists(1, timestamp, 0))
}

View File

@ -30,7 +30,7 @@ func TestReplicationHandoff(t *testing.T) {
// put a file
timestamp := hummingbird.GetTimestamp()
assert.True(t, e.PutObject(0, timestamp, "X"))
assert.True(t, e.PutObject(0, timestamp, "X", 0))
// make a drive look unmounted with a handler that always 507s
origHandler := e.servers[1].Config.Handler
@ -42,28 +42,28 @@ func TestReplicationHandoff(t *testing.T) {
e.replicators[0].Run()
// so it's on the primary nodes that are up
assert.True(t, e.ObjExists(0, timestamp))
assert.True(t, e.ObjExists(2, timestamp))
assert.True(t, e.ObjExists(0, timestamp, 0))
assert.True(t, e.ObjExists(2, timestamp, 0))
// and now it's on the handoff node
assert.True(t, e.ObjExists(3, timestamp))
assert.True(t, e.ObjExists(3, timestamp, 0))
// fix the "unmounted" drive
e.servers[1].Config.Handler = origHandler
// make sure it's not on the newly fixed node yet
assert.False(t, e.ObjExists(1, timestamp))
assert.False(t, e.ObjExists(1, timestamp, 0))
// run the handoff node's replicator
e.replicators[3].Run()
// it's no longer on the handoff node
assert.False(t, e.ObjExists(3, timestamp))
assert.False(t, e.ObjExists(3, timestamp, 0))
// make sure it's on all the primary nodes
assert.True(t, e.ObjExists(0, timestamp))
assert.True(t, e.ObjExists(1, timestamp))
assert.True(t, e.ObjExists(2, timestamp))
assert.True(t, e.ObjExists(0, timestamp, 0))
assert.True(t, e.ObjExists(1, timestamp, 0))
assert.True(t, e.ObjExists(2, timestamp, 0))
}
func TestReplicationUnlinkOld(t *testing.T) {
@ -72,18 +72,18 @@ func TestReplicationUnlinkOld(t *testing.T) {
// put a file to a primary node
timestamp := hummingbird.GetTimestamp()
assert.True(t, e.PutObject(0, timestamp, "X"))
assert.True(t, e.PutObject(0, timestamp, "X", 0))
// put a newer file to another primary node
timestamp2 := hummingbird.GetTimestamp()
assert.True(t, e.PutObject(1, timestamp2, "X"))
assert.True(t, e.PutObject(1, timestamp2, "X", 0))
assert.True(t, e.ObjExists(0, timestamp))
assert.True(t, e.ObjExists(1, timestamp2))
assert.True(t, e.ObjExists(0, timestamp, 0))
assert.True(t, e.ObjExists(1, timestamp2, 0))
// run the replicator on the server with the old file
e.replicators[0].Run()
// verify the old file was removed by the replicator
assert.False(t, e.ObjExists(0, timestamp))
assert.False(t, e.ObjExists(0, timestamp, 0))
}