diff --git a/go/client/directclient.go b/go/client/directclient.go index a1adc46302..0148ce1823 100644 --- a/go/client/directclient.go +++ b/go/client/directclient.go @@ -472,15 +472,15 @@ func NewProxyDirectClient() (ProxyClient, error) { if err != nil { return nil, err } - c.ObjectRing, err = hummingbird.GetRing("object", hashPathPrefix, hashPathSuffix) + c.ObjectRing, err = hummingbird.GetRing("object", hashPathPrefix, hashPathSuffix, 0) if err != nil { return nil, err } - c.ContainerRing, err = hummingbird.GetRing("container", hashPathPrefix, hashPathSuffix) + c.ContainerRing, err = hummingbird.GetRing("container", hashPathPrefix, hashPathSuffix, 0) if err != nil { return nil, err } - c.AccountRing, err = hummingbird.GetRing("account", hashPathPrefix, hashPathSuffix) + c.AccountRing, err = hummingbird.GetRing("account", hashPathPrefix, hashPathSuffix, 0) if err != nil { return nil, err } diff --git a/go/hummingbird/policy.go b/go/hummingbird/policy.go new file mode 100644 index 0000000000..cff9f14f02 --- /dev/null +++ b/go/hummingbird/policy.go @@ -0,0 +1,86 @@ +// Copyright (c) 2015 Rackspace +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package hummingbird + +import ( + "fmt" + "strings" +) + +type Policy struct { + Index int + Type string + Name string + Aliases []string + Default bool + Deprecated bool + Config map[string]string +} + +type PolicyList map[int]*Policy + +// LoadPolicies loads policies, probably from /etc/swift/swift.conf +func normalLoadPolicies() PolicyList { + policies := map[int]*Policy{0: &Policy{ + Index: 0, + Type: "replication", + Name: "Policy-0", + Aliases: nil, + Default: false, + Deprecated: false, + }} + for _, loc := range configLocations { + if conf, e := LoadConfig(loc); e == nil { + for key := range conf.File { + var policyIndex int + if c, err := fmt.Sscanf(key, "storage-policy:%d", &policyIndex); err == nil && c == 1 { + aliases := []string{} + aliasList := conf.GetDefault(key, "aliases", "") + for _, alias := range strings.Split(aliasList, ",") { + alias = strings.Trim(alias, " ") + if alias != "" { + aliases = append(aliases, alias) + } + } + policies[policyIndex] = &Policy{ + Index: policyIndex, + Type: conf.GetDefault(key, "policy_type", "replication"), + Name: conf.GetDefault(key, "name", fmt.Sprintf("Policy-%d", policyIndex)), + Aliases: aliases, + Deprecated: conf.GetBool(key, "deprecated", false), + Default: conf.GetBool(key, "default", false), + Config: map[string]string(conf.File[key]), + } + } + } + break + } + } + defaultFound := false + for _, policy := range policies { + if policy.Default { + defaultFound = true + } + } + if !defaultFound { + policies[0].Default = true + } + return PolicyList(policies) +} + +type loadPoliciesFunc func() PolicyList + +var LoadPolicies loadPoliciesFunc = normalLoadPolicies diff --git a/go/hummingbird/policy_test.go b/go/hummingbird/policy_test.go new file mode 100644 index 0000000000..d2afcb2e51 --- /dev/null +++ b/go/hummingbird/policy_test.go @@ -0,0 +1,63 @@ +// Copyright (c) 2015 Rackspace +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package hummingbird + +import ( + "io/ioutil" + "os" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestLoadPolicy(t *testing.T) { + tempFile, _ := ioutil.TempFile("", "INI") + tempFile.Write([]byte("[swift-hash]\nswift_hash_path_prefix = changeme\nswift_hash_path_suffix = changeme\n" + + "[storage-policy:0]\nname = gold\naliases = yellow, orange\npolicy_type = replication\ndefault = yes\n" + + "[storage-policy:1]\nname = silver\npolicy_type = replication\ndeprecated = yes\n")) + oldConfigs := configLocations + defer func() { + configLocations = oldConfigs + defer tempFile.Close() + defer os.Remove(tempFile.Name()) + }() + configLocations = []string{tempFile.Name()} + policyList := LoadPolicies() + require.Equal(t, policyList[0].Name, "gold") + require.Equal(t, policyList[0].Default, true) + require.Equal(t, policyList[0].Deprecated, false) + require.Equal(t, policyList[0].Aliases, []string{"yellow", "orange"}) + require.Equal(t, policyList[1].Name, "silver") + require.Equal(t, policyList[1].Deprecated, true) + require.Equal(t, policyList[1].Default, false) + require.Equal(t, policyList[1].Aliases, []string{}) +} + +func TestNoPolicies(t *testing.T) { + tempFile, _ := ioutil.TempFile("", "INI") + tempFile.Write([]byte("[swift-hash]\nswift_hash_path_prefix = changeme\nswift_hash_path_suffix = changeme\n")) + oldConfigs := configLocations + defer func() { + configLocations = oldConfigs + defer tempFile.Close() + defer os.Remove(tempFile.Name()) + }() + configLocations = []string{tempFile.Name()} + policyList := LoadPolicies() + require.Equal(t, policyList[0].Name, "Policy-0") + require.Equal(t, policyList[0].Default, true) + require.Equal(t, policyList[0].Deprecated, false) +} diff --git a/go/hummingbird/ring.go b/go/hummingbird/ring.go index 03f732b6c5..ce1dac3a1c 100644 --- a/go/hummingbird/ring.go +++ b/go/hummingbird/ring.go @@ -324,12 +324,16 @@ func LoadRing(path string, prefix string, suffix string) (Ring, error) { // GetRing returns the current ring given the ring_type ("account", "container", "object"), // hash path prefix, and hash path suffix. An error is raised if the requested ring does // not exist. -func GetRing(ring_type, prefix, suffix string) (Ring, error) { +func GetRing(ringType, prefix, suffix string, policy int) (Ring, error) { var ring Ring var err error - if ring, err = LoadRing(fmt.Sprintf("/etc/hummingbird/%s.ring.gz", ring_type), prefix, suffix); err != nil { - if ring, err = LoadRing(fmt.Sprintf("/etc/swift/%s.ring.gz", ring_type), prefix, suffix); err != nil { - return nil, fmt.Errorf("Error loading %s ring", ring_type) + ringFile := fmt.Sprintf("%s.ring.gz", ringType) + if policy != 0 { + ringFile = fmt.Sprintf("%s-%d.ring.gz", ringType, policy) + } + if ring, err = LoadRing(fmt.Sprintf("/etc/hummingbird/%s", ringFile), prefix, suffix); err != nil { + if ring, err = LoadRing(fmt.Sprintf("/etc/swift/%s", ringFile), prefix, suffix); err != nil { + return nil, fmt.Errorf("Error loading %s:%d ring", ringType, policy) } } return ring, nil diff --git a/go/hummingbird/router.go b/go/hummingbird/router.go index ce4c9fc5b4..bf34ab6beb 100644 --- a/go/hummingbird/router.go +++ b/go/hummingbird/router.go @@ -17,6 +17,7 @@ package hummingbird import ( "net/http" + "strconv" "strings" ) @@ -33,6 +34,7 @@ type matcher struct { catchallName string vars []variable static []variable + policy int handler http.Handler } @@ -42,6 +44,8 @@ type router struct { MethodNotAllowedHandler http.Handler } +const anyPolicy = -1 + // Split a string in twain on sep. Doing it this way over strings.Split*() saves allocating a slice. func Split2(path string, sep string) (string, string) { if nextSlash := strings.Index(path, sep); nextSlash == -1 { @@ -52,13 +56,16 @@ func Split2(path string, sep string) (string, string) { } // Given a method and path, return the handler and vars that should be used to serve them. -func (r *router) route(method, path string) (http.Handler, map[string]string) { +func (r *router) route(method, path string, policy int) (http.Handler, map[string]string) { methodFound := false path = path[1:] slashCount := strings.Count(path, "/") // This code is slightly gnarly because it avoids allocating anything until it's sure of a match. NEXTMATCH: for _, m := range r.matchers { + if m.policy != anyPolicy && m.policy != policy { + continue + } if m.method != method { continue } @@ -113,10 +120,10 @@ NEXTMATCH: return r.NotFoundHandler, nil } -// Register a handler for the given method and pattern. +// HandlePolicy registers a handler for the given method, pattern, and policy header. // The pattern is pretty much what you're used to, i.e. /static/:variable/*catchall -func (r *router) Handle(method, pattern string, handler http.Handler) { - m := &matcher{method: method} +func (r *router) HandlePolicy(method, pattern string, policy int, handler http.Handler) { + m := &matcher{method: method, policy: policy} parts := strings.Split(pattern[1:], "/") m.length = len(parts) if pattern[len(pattern)-1] == '/' { @@ -136,6 +143,12 @@ func (r *router) Handle(method, pattern string, handler http.Handler) { r.matchers = append(r.matchers, m) } +// Handle registers a handler for the given method and pattern. +// The pattern is pretty much what you're used to, i.e. /static/:variable/*catchall +func (r *router) Handle(method, pattern string, handler http.Handler) { + r.HandlePolicy(method, pattern, anyPolicy, handler) +} + func (r *router) Get(path string, handler http.Handler) { r.Handle("GET", path, handler) } @@ -165,7 +178,11 @@ func (r *router) Post(path string, handler http.Handler) { } func (r *router) ServeHTTP(writer http.ResponseWriter, request *http.Request) { - handler, vars := r.route(request.Method, request.URL.Path) + policy, err := strconv.Atoi(request.Header.Get("X-Backend-Storage-Policy-Index")) + if err != nil { + policy = 0 + } + handler, vars := r.route(request.Method, request.URL.Path, policy) SetVars(request, vars) handler.ServeHTTP(writer, request) } diff --git a/go/hummingbird/router_test.go b/go/hummingbird/router_test.go index df8611bd7f..98b660a7c8 100644 --- a/go/hummingbird/router_test.go +++ b/go/hummingbird/router_test.go @@ -19,7 +19,6 @@ import ( "net/url" "testing" - "github.com/dimfeld/httptreemux" "github.com/stretchr/testify/assert" ) @@ -79,6 +78,9 @@ func TestRouterDisambiguation(t *testing.T) { addRoute("REPLICATE", "/:device/:partition", "REPLICATE") addRoute("SYNC", "/:device/*relpath", "SYNC") addRoute("GET", "/info", "INFO") + router.HandlePolicy("GET", "/reconstruct", 10, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + handledBy = "POLICY_VERB_WORKED" + })) router.NotFoundHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { handledBy = "NOT_FOUND" vars = nil @@ -158,6 +160,16 @@ func TestRouterDisambiguation(t *testing.T) { makeRequest("GET", "/something/bad") assert.Equal(t, "NOT_FOUND", handledBy) assert.Nil(t, vars) + + req, _ := http.NewRequest("GET", "/reconstruct", nil) + req.URL, _ = url.Parse("/reconstruct") + req.RequestURI = "/reconstruct" + router.ServeHTTP(mockResponseWriter{}, req) + assert.Equal(t, "NOT_FOUND", handledBy) + + req.Header.Set("X-Backend-Storage-Policy-Index", "10") + router.ServeHTTP(mockResponseWriter{}, req) + assert.Equal(t, "POLICY_VERB_WORKED", handledBy) } func BenchmarkRouteObject(b *testing.B) { @@ -182,27 +194,3 @@ func BenchmarkRouteObject(b *testing.B) { router.ServeHTTP(w, r) } } - -func BenchmarkRouteObjectTree(b *testing.B) { - router := httptreemux.New() - // calls SetVars() to be fair. I mean, we could just pass the vars down too. - objGet := func(w http.ResponseWriter, r *http.Request, ps map[string]string) { SetVars(r, ps) } - router.GET("/healthcheck", objGet) - router.GET("/diskusage", objGet) - router.GET("/recon/:method", objGet) - router.HEAD("/:device/:partition/:account/:container/*obj", objGet) - router.PUT("/:device/:partition/:account/:container/*obj", objGet) - router.DELETE("/:device/:partition/:account/:container/*obj", objGet) - router.Handle("REPLICATE", "/:device/:partition/:suffixes", objGet) - router.Handle("REPLICATE", "/:device/:partition", objGet) - router.Handle("SYNC", "/:device/*relpath", objGet) - router.GET("/:device/:partition/:account/:container/*obj", objGet) - - r := newRequest("GET", "/sda/123/acc/cont/a/bunch/of/stuff") - w := mockResponseWriter{} - b.ReportAllocs() - b.ResetTimer() - for i := 0; i < b.N; i++ { - router.ServeHTTP(w, r) - } -} diff --git a/go/objectserver/auditor.go b/go/objectserver/auditor.go index c31bfaffd5..ee4089c5d6 100644 --- a/go/objectserver/auditor.go +++ b/go/objectserver/auditor.go @@ -197,22 +197,27 @@ func (a *Auditor) auditDevice(devPath string) { return } - objPath := filepath.Join(devPath, "objects") - partitions, err := hummingbird.ReadDirNames(objPath) - if err != nil { - a.errors++ - a.totalErrors++ - a.LogError("Error reading objects dir %s", objPath) - return - } - for _, partition := range partitions { - _, intErr := strconv.ParseInt(partition, 10, 64) - partitionDir := filepath.Join(objPath, partition) - if finfo, err := os.Stat(partitionDir); err != nil || intErr != nil || !finfo.Mode().IsDir() { - a.LogError("Skipping invalid file in objects directory: %s", partitionDir) + for _, policy := range hummingbird.LoadPolicies() { + if policy.Type != "replication" { continue } - a.auditPartition(partitionDir) + objPath := filepath.Join(devPath, PolicyDir(policy.Index)) + partitions, err := hummingbird.ReadDirNames(objPath) + if err != nil { + a.errors++ + a.totalErrors++ + a.LogError("Error reading objects dir: %s", objPath) + continue + } + for _, partition := range partitions { + _, intErr := strconv.ParseInt(partition, 10, 64) + partitionDir := filepath.Join(objPath, partition) + if finfo, err := os.Stat(partitionDir); err != nil || intErr != nil || !finfo.Mode().IsDir() { + a.LogError("Skipping invalid file in objects directory: %s", partitionDir) + continue + } + a.auditPartition(partitionDir) + } } } diff --git a/go/objectserver/backend.go b/go/objectserver/backend.go index 12a1f4a8ed..b169d45c68 100644 --- a/go/objectserver/backend.go +++ b/go/objectserver/backend.go @@ -49,6 +49,24 @@ type AtomicFileWriter interface { Preallocate(int64, int64) error } +func PolicyDir(policy int) string { + if policy == 0 { + return "objects" + } + return fmt.Sprintf("objects-%d", policy) +} + +func UnPolicyDir(dir string) (int, error) { + if dir == "objects" { + return 0, nil + } + var policy int + if n, err := fmt.Sscanf(dir, "objects-%d", &policy); n == 1 && err == nil { + return policy, nil + } + return 0, fmt.Errorf("Unable to parse policy from dir") +} + func RawReadMetadata(fileNameOrFd interface{}) ([]byte, error) { var pickledMetadata []byte offset := 0 @@ -130,14 +148,14 @@ func WriteMetadata(fd uintptr, v map[string]string) error { func QuarantineHash(hashDir string) error { // FYI- this does not invalidate the hash like swift's version. Please // do that yourself - hash := filepath.Base(hashDir) - // drive objects partition suffix hash - driveDir := filepath.Dir(filepath.Dir(filepath.Dir(filepath.Dir(hashDir)))) - // TODO: this will need to be slightly more complicated once policies - quarantineDir := filepath.Join(driveDir, "quarantined", "objects") + // objects partition suffix hash + objsDir := filepath.Dir(filepath.Dir(filepath.Dir(hashDir))) + driveDir := filepath.Dir(objsDir) + quarantineDir := filepath.Join(driveDir, "quarantined", filepath.Base(objsDir)) if err := os.MkdirAll(quarantineDir, 0755); err != nil { return err } + hash := filepath.Base(hashDir) destDir := filepath.Join(quarantineDir, hash+"-"+hummingbird.UUID()) if err := os.Rename(hashDir, destDir); err != nil { return err @@ -251,8 +269,8 @@ func RecalculateSuffixHash(suffixDir string, reclaimAge int64) (string, *humming return hex.EncodeToString(h.Sum(nil)), nil } -func GetHashes(driveRoot string, device string, partition string, recalculate []string, reclaimAge int64, logger hummingbird.LoggingContext) (map[string]string, *hummingbird.BackendError) { - partitionDir := filepath.Join(driveRoot, device, "objects", partition) +func GetHashes(driveRoot string, device string, partition string, recalculate []string, reclaimAge int64, policy int, logger hummingbird.LoggingContext) (map[string]string, *hummingbird.BackendError) { + partitionDir := filepath.Join(driveRoot, device, PolicyDir(policy), partition) pklFile := filepath.Join(partitionDir, "hashes.pkl") invalidFile := filepath.Join(partitionDir, "hashes.invalid") @@ -307,7 +325,7 @@ func GetHashes(driveRoot string, device string, partition string, recalculate [] for suffix, hash := range hashes { if hash == "" { modified = true - suffixDir := driveRoot + "/" + device + "/objects/" + partition + "/" + suffix + suffixDir := filepath.Join(partitionDir, suffix) recalc_hash, err := RecalculateSuffixHash(suffixDir, reclaimAge) if err == nil { hashes[suffix] = recalc_hash @@ -340,18 +358,18 @@ func GetHashes(driveRoot string, device string, partition string, recalculate [] } logger.LogError("Made recursive call to GetHashes: %s", partitionDir) partitionLock.Close() - return GetHashes(driveRoot, device, partition, recalculate, reclaimAge, logger) + return GetHashes(driveRoot, device, partition, recalculate, reclaimAge, policy, logger) } } return hashes, nil } -func ObjHashDir(vars map[string]string, driveRoot string, hashPathPrefix string, hashPathSuffix string) string { +func ObjHashDir(vars map[string]string, driveRoot string, hashPathPrefix string, hashPathSuffix string, policy int) string { h := md5.New() io.WriteString(h, hashPathPrefix+"/"+vars["account"]+"/"+vars["container"]+"/"+vars["obj"]+hashPathSuffix) hexHash := hex.EncodeToString(h.Sum(nil)) suffix := hexHash[29:32] - return filepath.Join(driveRoot, vars["device"], "objects", vars["partition"], suffix, hexHash) + return filepath.Join(driveRoot, vars["device"], PolicyDir(policy), vars["partition"], suffix, hexHash) } func ObjectFiles(directory string) (string, string) { diff --git a/go/objectserver/backend_test.go b/go/objectserver/backend_test.go index fbb7cedde9..584d6f6801 100644 --- a/go/objectserver/backend_test.go +++ b/go/objectserver/backend_test.go @@ -26,6 +26,7 @@ import ( "github.com/openstack/swift/go/hummingbird" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestWriteReadMetadata(t *testing.T) { @@ -62,7 +63,7 @@ func TestGetHashes(t *testing.T) { f, _ = os.Create(filepath.Join(driveRoot, "sda", "objects", "1", "abc", "00000000000000000000000000000abc", "67890.data")) f.Close() - hashes, err := GetHashes(driveRoot, "sda", "1", nil, int64(hummingbird.ONE_WEEK), nil) + hashes, err := GetHashes(driveRoot, "sda", "1", nil, int64(hummingbird.ONE_WEEK), 0, nil) assert.Nil(t, err) assert.Equal(t, "b1589029b7db9d01347caece2159d588", hashes["abc"]) @@ -71,12 +72,12 @@ func TestGetHashes(t *testing.T) { f.Close() // make sure hash for "abc" isn't recalculated yet. - hashes, err = GetHashes(driveRoot, "sda", "1", nil, int64(hummingbird.ONE_WEEK), nil) + hashes, err = GetHashes(driveRoot, "sda", "1", nil, int64(hummingbird.ONE_WEEK), 0, nil) assert.Nil(t, err) assert.Equal(t, "b1589029b7db9d01347caece2159d588", hashes["abc"]) // force recalculate of "abc" - hashes, err = GetHashes(driveRoot, "sda", "1", []string{"abc"}, int64(hummingbird.ONE_WEEK), nil) + hashes, err = GetHashes(driveRoot, "sda", "1", []string{"abc"}, int64(hummingbird.ONE_WEEK), 0, nil) assert.Nil(t, err) assert.Equal(t, "8834e84467693c2e8f670f4afbea5334", hashes["abc"]) } @@ -91,7 +92,7 @@ func TestInvalidateHash(t *testing.T) { f, _ = os.Create(filepath.Join(driveRoot, "sda", "objects", "1", "abc", "00000000000000000000000000000abc", "67890.data")) f.Close() - hashes, err := GetHashes(driveRoot, "sda", "1", nil, int64(hummingbird.ONE_WEEK), nil) + hashes, err := GetHashes(driveRoot, "sda", "1", nil, int64(hummingbird.ONE_WEEK), 0, nil) assert.Nil(t, err) assert.Equal(t, "b1589029b7db9d01347caece2159d588", hashes["abc"]) @@ -100,13 +101,44 @@ func TestInvalidateHash(t *testing.T) { f.Close() // make sure hash for "abc" isn't recalculated yet. - hashes, err = GetHashes(driveRoot, "sda", "1", nil, int64(hummingbird.ONE_WEEK), nil) + hashes, err = GetHashes(driveRoot, "sda", "1", nil, int64(hummingbird.ONE_WEEK), 0, nil) assert.Nil(t, err) assert.Equal(t, "b1589029b7db9d01347caece2159d588", hashes["abc"]) // invalidate hash of suffix "abc" InvalidateHash(filepath.Join(driveRoot, "", "sda", "objects", "1", "abc", "00000000000000000000000000000abc")) - hashes, err = GetHashes(driveRoot, "sda", "1", nil, int64(hummingbird.ONE_WEEK), nil) + hashes, err = GetHashes(driveRoot, "sda", "1", nil, int64(hummingbird.ONE_WEEK), 0, nil) assert.Nil(t, err) assert.Equal(t, "8834e84467693c2e8f670f4afbea5334", hashes["abc"]) } + +func TestPolicyDir(t *testing.T) { + policy, err := UnPolicyDir("objects") + require.Nil(t, err) + require.Equal(t, "objects", PolicyDir(policy)) + + policy, err = UnPolicyDir("objects-1") + require.Nil(t, err) + require.Equal(t, "objects-1", PolicyDir(policy)) + + policy, err = UnPolicyDir("objects-100") + require.Nil(t, err) + require.Equal(t, "objects-100", PolicyDir(policy)) +} + +func TestQuarantineHash(t *testing.T) { + driveRoot, _ := ioutil.TempDir("", "") + defer os.RemoveAll(driveRoot) + + hashDir := filepath.Join(driveRoot, "sda", "objects", "1", "abc", "fffffffffffffffffffffffffffffabc") + os.MkdirAll(hashDir, 0777) + QuarantineHash(hashDir) + require.True(t, hummingbird.Exists(filepath.Join(driveRoot, "sda", "quarantined", "objects"))) + require.False(t, hummingbird.Exists(hashDir)) + + hashDir = filepath.Join(driveRoot, "sdb", "objects-1", "1", "abc", "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa") + os.MkdirAll(hashDir, 0777) + QuarantineHash(hashDir) + require.True(t, hummingbird.Exists(filepath.Join(driveRoot, "sdb", "quarantined", "objects-1"))) + require.False(t, hummingbird.Exists(hashDir)) +} diff --git a/go/objectserver/main.go b/go/objectserver/main.go index 2450bddfe8..5e26f32b53 100644 --- a/go/objectserver/main.go +++ b/go/objectserver/main.go @@ -18,7 +18,6 @@ package objectserver import ( "crypto/md5" "encoding/hex" - "errors" "flag" "fmt" "io" @@ -36,8 +35,6 @@ import ( "github.com/openstack/swift/go/middleware" ) -var DefaultObjEngine = "swift" - type ObjectServer struct { driveRoot string hashPathPrefix string @@ -50,13 +47,25 @@ type ObjectServer struct { diskInUse *hummingbird.KeyedLimit expiringDivisor int64 updateClient *http.Client - objEngine ObjectEngine + objEngines map[int]ObjectEngine +} + +func (server *ObjectServer) newObject(req *http.Request, vars map[string]string, needData bool) (Object, error) { + policy, err := strconv.Atoi(req.Header.Get("X-Backend-Storage-Policy-Index")) + if err != nil { + policy = 0 + } + engine, ok := server.objEngines[policy] + if !ok { + return nil, fmt.Errorf("Engine for policy index %d not found.", policy) + } + return engine.New(vars, needData) } func (server *ObjectServer) ObjGetHandler(writer http.ResponseWriter, request *http.Request) { vars := hummingbird.GetVars(request) headers := writer.Header() - obj, err := server.objEngine.New(vars, request.Method == "GET") + obj, err := server.newObject(request, vars, request.Method == "GET") if err != nil { hummingbird.GetLogger(request).LogError("Unable to open object: %v", err) hummingbird.StandardResponse(writer, http.StatusInternalServerError) @@ -206,7 +215,7 @@ func (server *ObjectServer) ObjPutHandler(writer http.ResponseWriter, request *h } } - obj, err := server.objEngine.New(vars, false) + obj, err := server.newObject(request, vars, false) if err != nil { hummingbird.GetLogger(request).LogError("Error getting obj: %s", err.Error()) hummingbird.StandardResponse(writer, http.StatusInternalServerError) @@ -295,7 +304,7 @@ func (server *ObjectServer) ObjDeleteHandler(writer http.ResponseWriter, request } responseStatus := http.StatusNotFound - obj, err := server.objEngine.New(vars, false) + obj, err := server.newObject(request, vars, false) if err != nil { hummingbird.GetLogger(request).LogError("Error getting obj: %s", err.Error()) hummingbird.StandardResponse(writer, http.StatusInternalServerError) @@ -471,9 +480,11 @@ func (server *ObjectServer) GetHandler(config hummingbird.Config) http.Handler { router.NotFoundHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { http.Error(w, fmt.Sprintf("Invalid path: %s", r.URL.Path), http.StatusBadRequest) }) - server.objEngine.RegisterHandlers(func(method, path string, handler http.HandlerFunc) { - router.Handle(method, path, commonHandlers.ThenFunc(handler)) - }) + for policy, objEngine := range server.objEngines { + objEngine.RegisterHandlers(func(method, path string, handler http.HandlerFunc) { + router.HandlePolicy(method, path, policy, commonHandlers.ThenFunc(handler)) + }) + } return alice.New(middleware.GrepObject).Then(router) } @@ -490,13 +501,18 @@ func GetServer(serverconf hummingbird.Config, flags *flag.FlagSet) (bindIP strin if err != nil { return "", 0, nil, nil, err } - - objEngineName := serverconf.GetDefault("app:object-server", "object_engine", DefaultObjEngine) - if newEngineFactory, err := FindEngine(objEngineName); err != nil { - return "", 0, nil, nil, errors.New("Unable to find object engine") - } else if server.objEngine, err = newEngineFactory(serverconf, flags); err != nil { - return "", 0, nil, nil, err + server.objEngines = make(map[int]ObjectEngine) + for _, policy := range hummingbird.LoadPolicies() { + if newEngine, err := FindEngine(policy.Type); err != nil { + return "", 0, nil, nil, fmt.Errorf("Unable to find object engine type %s: %v", policy.Type, err) + } else { + server.objEngines[policy.Index], err = newEngine(serverconf, policy, flags) + if err != nil { + return "", 0, nil, nil, fmt.Errorf("Error instantiating object engine type %s: %v", policy.Type, err) + } + } } + server.driveRoot = serverconf.GetDefault("app:object-server", "devices", "/srv/node") server.checkMounts = serverconf.GetBool("app:object-server", "mount_check", true) server.checkEtags = serverconf.GetBool("app:object-server", "check_etags", false) diff --git a/go/objectserver/main_test.go b/go/objectserver/main_test.go index a1293c8a1a..5ec7e44904 100644 --- a/go/objectserver/main_test.go +++ b/go/objectserver/main_test.go @@ -52,6 +52,7 @@ func (t *TestServer) Close() { func (t *TestServer) Do(method string, path string, body io.ReadCloser) (*http.Response, error) { req, err := http.NewRequest(method, fmt.Sprintf("http://%s:%d%s", t.host, t.port, path), body) + req.Header.Set("X-Backend-Storage-Policy-Index", "0") if err != nil { return nil, err } diff --git a/go/objectserver/objengine.go b/go/objectserver/objengine.go index 213f6c3e2c..2b0a5f2db3 100644 --- a/go/objectserver/objengine.go +++ b/go/objectserver/objengine.go @@ -61,7 +61,7 @@ type ObjectEngine interface { } // ObjectEngineConstructor> is a function that, given configs and flags, returns an ObjectEngine -type ObjectEngineConstructor func(hummingbird.Config, *flag.FlagSet) (ObjectEngine, error) +type ObjectEngineConstructor func(hummingbird.Config, *hummingbird.Policy, *flag.FlagSet) (ObjectEngine, error) type engineFactoryEntry struct { name string diff --git a/go/objectserver/objengine_test.go b/go/objectserver/objengine_test.go index 8405956c70..69e100552a 100644 --- a/go/objectserver/objengine_test.go +++ b/go/objectserver/objengine_test.go @@ -26,7 +26,7 @@ import ( func TestObjectEngineRegistry(t *testing.T) { testErr := errors.New("Not implemented") - constructor := func(hummingbird.Config, *flag.FlagSet) (ObjectEngine, error) { + constructor := func(hummingbird.Config, *hummingbird.Policy, *flag.FlagSet) (ObjectEngine, error) { return nil, testErr } @@ -34,7 +34,7 @@ func TestObjectEngineRegistry(t *testing.T) { fconstructor, err := FindEngine("test") require.Nil(t, err) - eng, err := fconstructor(hummingbird.Config{}, nil) + eng, err := fconstructor(hummingbird.Config{}, nil, nil) require.Nil(t, eng) require.Equal(t, err, testErr) diff --git a/go/objectserver/priorityrep.go b/go/objectserver/priorityrep.go index a9bbc38968..f422d7a343 100644 --- a/go/objectserver/priorityrep.go +++ b/go/objectserver/priorityrep.go @@ -18,8 +18,10 @@ package objectserver import ( "bytes" "encoding/json" + "flag" "fmt" "net/http" + "os" "strconv" "strings" "sync" @@ -143,21 +145,29 @@ func getPartMoveJobs(oldRing, newRing hummingbird.Ring) []*PriorityRepJob { // MoveParts takes two object .ring.gz files as []string{oldRing, newRing} and dispatches priority replication jobs to rebalance data in line with any ring changes. func MoveParts(args []string) { - if len(args) != 1 { - fmt.Println("USAGE: hummingbird moveparts [old ringfile]") + flags := flag.NewFlagSet("moveparts", flag.ExitOnError) + policy := flags.Int("p", 0, "policy index to use") + flags.Usage = func() { + fmt.Fprintf(os.Stderr, "USAGE: hummingbird moveparts [old ringfile]") + flags.PrintDefaults() + } + flags.Parse(args) + if len(flags.Args()) != 1 { + flags.Usage() return } + hashPathPrefix, hashPathSuffix, err := hummingbird.GetHashPrefixAndSuffix() if err != nil { fmt.Println("Unable to load hash path prefix and suffix:", err) return } - oldRing, err := hummingbird.LoadRing(args[0], hashPathPrefix, hashPathSuffix) + oldRing, err := hummingbird.LoadRing(flags.Arg(0), hashPathPrefix, hashPathSuffix) if err != nil { fmt.Println("Unable to load old ring:", err) return } - curRing, err := hummingbird.GetRing("object", hashPathPrefix, hashPathSuffix) + curRing, err := hummingbird.GetRing("object", hashPathPrefix, hashPathSuffix, *policy) if err != nil { fmt.Println("Unable to load current ring:", err) return @@ -193,22 +203,30 @@ func getRestoreDeviceJobs(ring hummingbird.Ring, ip string, devName string) []*P // RestoreDevice takes an IP address and device name such as []string{"172.24.0.1", "sda1"} and attempts to restores its data from peers. func RestoreDevice(args []string) { - if len(args) != 2 { - fmt.Println("USAGE: hummingbird restoredevice [ip] [device]") + flags := flag.NewFlagSet("restoredevice", flag.ExitOnError) + policy := flags.Int("p", 0, "policy index to use") + flags.Usage = func() { + fmt.Fprintf(os.Stderr, "USAGE: hummingbird restoredevice [ip] [device]\n") + flags.PrintDefaults() + } + flags.Parse(args) + if len(flags.Args()) != 2 { + flags.Usage() return } + hashPathPrefix, hashPathSuffix, err := hummingbird.GetHashPrefixAndSuffix() if err != nil { fmt.Println("Unable to load hash path prefix and suffix:", err) return } - objRing, err := hummingbird.GetRing("object", hashPathPrefix, hashPathSuffix) + objRing, err := hummingbird.GetRing("object", hashPathPrefix, hashPathSuffix, *policy) if err != nil { fmt.Println("Unable to load ring:", err) return } client := &http.Client{Timeout: time.Hour} - jobs := getRestoreDeviceJobs(objRing, args[0], args[1]) + jobs := getRestoreDeviceJobs(objRing, flags.Arg(0), flags.Arg(1)) fmt.Println("Job count:", len(jobs)) doPriRepJobs(jobs, 2, client) fmt.Println("Done sending jobs.") @@ -231,21 +249,29 @@ func getRescuePartsJobs(objRing hummingbird.Ring, partitions []uint64) []*Priori } func RescueParts(args []string) { - if len(args) != 1 { - fmt.Println("USAGE: hummingbird rescueparts partnum1,partnum2,...") + flags := flag.NewFlagSet("rescueparts", flag.ExitOnError) + policy := flags.Int("p", 0, "policy index to use") + flags.Usage = func() { + fmt.Fprintf(os.Stderr, "USAGE: hummingbird rescueparts partnum1,partnum2,...\n") + flags.PrintDefaults() + } + flags.Parse(args) + if len(flags.Args()) != 1 { + flags.Usage() return } + hashPathPrefix, hashPathSuffix, err := hummingbird.GetHashPrefixAndSuffix() if err != nil { fmt.Println("Unable to load hash path prefix and suffix:", err) return } - objRing, err := hummingbird.GetRing("object", hashPathPrefix, hashPathSuffix) + objRing, err := hummingbird.GetRing("object", hashPathPrefix, hashPathSuffix, *policy) if err != nil { fmt.Println("Unable to load ring:", err) return } - partsStr := strings.Split(args[0], ",") + partsStr := strings.Split(flags.Arg(0), ",") partsInt := make([]uint64, len(partsStr)) for i, p := range partsStr { partsInt[i], err = strconv.ParseUint(p, 10, 64) diff --git a/go/objectserver/repconn.go b/go/objectserver/repconn.go index 96e1d78e81..8f06b724d2 100644 --- a/go/objectserver/repconn.go +++ b/go/objectserver/repconn.go @@ -24,6 +24,7 @@ import ( "net" "net/http" "net/http/httputil" + "strconv" "time" "github.com/openstack/swift/go/hummingbird" @@ -140,9 +141,10 @@ func (r *RepConn) Close() { r.c.Close() } -func NewRepConn(dev *hummingbird.Device, partition string) (*RepConn, error) { +func NewRepConn(dev *hummingbird.Device, partition string, policy int) (*RepConn, error) { url := fmt.Sprintf("http://%s:%d/%s/%s", dev.ReplicationIp, dev.ReplicationPort, dev.Device, partition) req, err := http.NewRequest("REPCONN", url, nil) + req.Header.Set("X-Backend-Storage-Policy-Index", strconv.Itoa(policy)) if err != nil { return nil, err } diff --git a/go/objectserver/replicator.go b/go/objectserver/replicator.go index 6b8d2421c7..3ce02ce6f2 100644 --- a/go/objectserver/replicator.go +++ b/go/objectserver/replicator.go @@ -44,6 +44,7 @@ type job struct { dev *hummingbird.Device partition string objPath string + policy int } type ReplicationData struct { @@ -63,6 +64,7 @@ type PriorityRepJob struct { Partition uint64 `json:"partition"` FromDevice *hummingbird.Device `json:"from_device"` ToDevices []*hummingbird.Device `json:"to_devices"` + Policy int `json:"policy"` } type deviceProgress struct { @@ -92,7 +94,6 @@ type Replicator struct { bindIp string logger hummingbird.SysLogLike port int - Ring hummingbird.Ring devGroup sync.WaitGroup partRateTicker *time.Ticker timePerPart time.Duration @@ -104,6 +105,7 @@ type Replicator struct { priRepChans map[int]chan PriorityRepJob priRepM sync.Mutex reclaimAge int64 + Rings map[int]hummingbird.Ring once bool cancelers map[string]chan struct{} @@ -204,8 +206,8 @@ func (r *Replicator) getFile(filePath string) (fp *os.File, xattrs []byte, size return fp, rawxattr, finfo.Size(), nil } -func (r *Replicator) beginReplication(dev *hummingbird.Device, partition string, hashes bool, rChan chan ReplicationData) { - rc, err := NewRepConn(dev, partition) +func (r *Replicator) beginReplication(dev *hummingbird.Device, partition string, hashes bool, policy int, rChan chan ReplicationData) { + rc, err := NewRepConn(dev, partition, policy) if err != nil { r.LogError("[beginReplication] error creating new request: %v", err) rChan <- ReplicationData{dev: dev, conn: nil, hashes: nil, err: err} @@ -365,7 +367,7 @@ func (r *Replicator) replicateLocal(j *job, nodes []*hummingbird.Device, moreNod startGetHashesRemote := time.Now() rChan := make(chan ReplicationData) for i := 0; i < len(nodes); i++ { - go r.beginReplication(nodes[i], j.partition, true, rChan) + go r.beginReplication(nodes[i], j.partition, true, j.policy, rChan) } for i := 0; i < len(nodes); i++ { rData := <-rChan @@ -375,7 +377,7 @@ func (r *Replicator) replicateLocal(j *job, nodes []*hummingbird.Device, moreNod remoteConnections[rData.dev.Id] = rData.conn } else if rData.err == RepUnmountedError { if nextNode := moreNodes.Next(); nextNode != nil { - go r.beginReplication(nextNode, j.partition, true, rChan) + go r.beginReplication(nextNode, j.partition, true, j.policy, rChan) nodes = append(nodes, nextNode) } } @@ -388,7 +390,7 @@ func (r *Replicator) replicateLocal(j *job, nodes []*hummingbird.Device, moreNod startGetHashesLocal := time.Now() recalc := []string{} - hashes, herr := GetHashes(r.driveRoot, j.dev.Device, j.partition, recalc, r.reclaimAge, r) + hashes, herr := GetHashes(r.driveRoot, j.dev.Device, j.partition, recalc, r.reclaimAge, j.policy, r) if herr != nil { r.LogError("[replicateLocal] error getting local hashes: %v", herr) return @@ -401,7 +403,7 @@ func (r *Replicator) replicateLocal(j *job, nodes []*hummingbird.Device, moreNod } } } - hashes, herr = GetHashes(r.driveRoot, j.dev.Device, j.partition, recalc, r.reclaimAge, r) + hashes, herr = GetHashes(r.driveRoot, j.dev.Device, j.partition, recalc, r.reclaimAge, j.policy, r) if herr != nil { r.LogError("[replicateLocal] error recalculating local hashes: %v", herr) return @@ -456,7 +458,7 @@ func (r *Replicator) replicateHandoff(j *job, nodes []*hummingbird.Device) { rChan := make(chan ReplicationData) nodesNeeded := len(nodes) for i := 0; i < nodesNeeded; i++ { - go r.beginReplication(nodes[i], j.partition, false, rChan) + go r.beginReplication(nodes[i], j.partition, false, j.policy, rChan) } for i := 0; i < nodesNeeded; i++ { rData := <-rChan @@ -538,25 +540,43 @@ func (r *Replicator) replicateDevice(dev *hummingbird.Device, canceler chan stru r.LogError("[replicateDevice] Drive not mounted: %s", dev.Device) break } + if hummingbird.Exists(filepath.Join(r.driveRoot, dev.Device, "lock_device")) { + break + } r.cleanTemp(dev) - objPath := filepath.Join(r.driveRoot, dev.Device, "objects") - if fi, err := os.Stat(objPath); err != nil || !fi.Mode().IsDir() { - r.LogError("[replicateDevice] No objects found: %s", objPath) - break - } - partitionList, err := filepath.Glob(filepath.Join(objPath, "[0-9]*")) - if err != nil { - r.LogError("[replicateDevice] Error getting partition list: %s (%v)", objPath, err) - break - } - // if one of the breaks above triggers, the monitorer should retry after ReplicateDeviceTimeout - for i := len(partitionList) - 1; i > 0; i-- { // shuffle partition list - j := rand.Intn(i + 1) - partitionList[j], partitionList[i] = partitionList[i], partitionList[j] + jobList := make([]job, 0) + + for policy := range r.Rings { + objPath := filepath.Join(r.driveRoot, dev.Device, PolicyDir(policy)) + if fi, err := os.Stat(objPath); err != nil || !fi.Mode().IsDir() { + continue + } + policyPartitions, err := filepath.Glob(filepath.Join(objPath, "[0-9]*")) + if err != nil { + r.LogError("[replicateDevice] Error getting partition list: %s (%v)", objPath, err) + continue + } + for _, partition := range policyPartitions { + partition = filepath.Base(partition) + if _, ok := r.partitions[partition]; len(r.partitions) > 0 && !ok { + continue + } + if _, err := strconv.ParseUint(partition, 10, 64); err == nil { + jobList = append(jobList, + job{objPath: objPath, + partition: partition, + dev: dev, + policy: policy}) + } + } + } + + numPartitions := uint64(len(jobList)) + if numPartitions == 0 { + r.LogError("[replicateDevice] No objects found: %s", filepath.Join(r.driveRoot, dev.Device)) } - numPartitions := uint64(len(partitionList)) partitionsProcessed := uint64(0) r.deviceProgressPassInit <- deviceProgress{ @@ -565,10 +585,12 @@ func (r *Replicator) replicateDevice(dev *hummingbird.Device, canceler chan stru LastPassDuration: lastPassDuration, } - for _, partition := range partitionList { - if hummingbird.Exists(filepath.Join(r.driveRoot, dev.Device, "lock_device")) { - break - } + for i := len(jobList) - 1; i > 0; i-- { // shuffle job list + j := rand.Intn(i + 1) + jobList[j], jobList[i] = jobList[i], jobList[j] + } + + for _, j := range jobList { select { case <-canceler: { @@ -582,31 +604,29 @@ func (r *Replicator) replicateDevice(dev *hummingbird.Device, canceler chan stru default: } r.processPriorityJobs(dev.Id) - if _, ok := r.partitions[filepath.Base(partition)]; len(r.partitions) > 0 && !ok { - continue - } - if partitioni, err := strconv.ParseUint(filepath.Base(partition), 10, 64); err == nil { - func() { - <-r.partRateTicker.C - r.concurrencySem <- struct{}{} - r.deviceProgressIncr <- deviceProgress{ - dev: dev, - PartitionsDone: 1, - } - j := &job{objPath: objPath, partition: filepath.Base(partition), dev: dev} - nodes, handoff := r.Ring.GetJobNodes(partitioni, j.dev.Id) - defer func() { - <-r.concurrencySem - }() - partitionsProcessed += 1 - if handoff { - r.replicateHandoff(j, nodes) - } else { - r.replicateLocal(j, nodes, r.Ring.GetMoreNodes(partitioni)) - } + func() { + <-r.partRateTicker.C + r.concurrencySem <- struct{}{} + defer func() { + <-r.concurrencySem }() - } + r.deviceProgressIncr <- deviceProgress{ + dev: dev, + PartitionsDone: 1, + } + partitioni, err := strconv.ParseUint(j.partition, 10, 64) + if err != nil { + return + } + nodes, handoff := r.Rings[j.policy].GetJobNodes(partitioni, j.dev.Id) + partitionsProcessed += 1 + if handoff { + r.replicateHandoff(&j, nodes) + } else { + r.replicateLocal(&j, nodes, r.Rings[j.policy].GetMoreNodes(partitioni)) + } + }() } if partitionsProcessed >= numPartitions { r.deviceProgressIncr <- deviceProgress{ @@ -624,16 +644,22 @@ func (r *Replicator) replicateDevice(dev *hummingbird.Device, canceler chan stru // restartDevices should be called periodically from the statsReporter thread to make sure devices that should be replicating are. func (r *Replicator) restartDevices() { shouldBeRunning := make(map[string]bool) - if localDevices, err := r.Ring.LocalDevices(r.port); err != nil { - r.LogError("Error getting local devices: %v", err) - return - } else { - // launch replication for any new devices - for _, dev := range localDevices { - shouldBeRunning[dev.Device] = true - if _, ok := r.deviceProgressMap[dev.Device]; !ok { - r.restartReplicateDevice(dev) - } + deviceMap := map[string]*hummingbird.Device{} + for _, ring := range r.Rings { + devices, err := ring.LocalDevices(r.port) + if err != nil { + r.LogError("Error getting local devices: %v", err) + return + } + for _, dev := range devices { + deviceMap[dev.Device] = dev + } + } + // launch replication for any new devices + for _, dev := range deviceMap { + shouldBeRunning[dev.Device] = true + if _, ok := r.deviceProgressMap[dev.Device]; !ok { + r.restartReplicateDevice(dev) } } // kill any replicators that are no longer in the ring @@ -756,12 +782,18 @@ func (r *Replicator) run() { r.partRateTicker = time.NewTicker(r.timePerPart) r.concurrencySem = make(chan struct{}, r.concurrency) - localDevices, err := r.Ring.LocalDevices(r.port) - if err != nil { - r.LogError("Error getting local devices: %v", err) - return + deviceMap := map[string]*hummingbird.Device{} + for _, ring := range r.Rings { + devices, err := ring.LocalDevices(r.port) + if err != nil { + r.LogError("Error getting local devices: %v", err) + return + } + for _, dev := range devices { + deviceMap[dev.Device] = dev + } } - for _, dev := range localDevices { + for _, dev := range deviceMap { if _, ok := r.devices[dev.Device]; ok || len(r.devices) == 0 { r.restartReplicateDevice(dev) } @@ -786,8 +818,9 @@ func (r *Replicator) processPriorityJobs(id int) { dev: pri.FromDevice, partition: strconv.FormatUint(pri.Partition, 10), objPath: filepath.Join(r.driveRoot, pri.FromDevice.Device, "objects"), + policy: pri.Policy, } - _, handoff := r.Ring.GetJobNodes(pri.Partition, pri.FromDevice.Id) + _, handoff := r.Rings[pri.Policy].GetJobNodes(pri.Partition, pri.FromDevice.Id) defer func() { <-r.concurrencySem }() @@ -920,6 +953,15 @@ func NewReplicator(serverconf hummingbird.Config, flags *flag.FlagSet) (hummingb if err != nil { return nil, fmt.Errorf("Unable to get hash prefix and suffix") } + replicator.Rings = make(map[int]hummingbird.Ring) + for _, policy := range hummingbird.LoadPolicies() { + if policy.Type != "replication" { + continue + } + if replicator.Rings[policy.Index], err = hummingbird.GetRing("object", hashPathPrefix, hashPathSuffix, policy.Index); err != nil { + return nil, fmt.Errorf("Unable to load ring.") + } + } replicator.reconCachePath = serverconf.GetDefault("object-auditor", "recon_cache_path", "/var/cache/swift") replicator.checkMounts = serverconf.GetBool("object-replicator", "mount_check", true) replicator.driveRoot = serverconf.GetDefault("object-replicator", "devices", "/srv/node") @@ -935,9 +977,6 @@ func NewReplicator(serverconf hummingbird.Config, flags *flag.FlagSet) (hummingb replicator.timePerPart = time.Duration(serverconf.GetInt("object-replicator", "ms_per_part", 750)) * time.Millisecond } replicator.concurrency = int(serverconf.GetInt("object-replicator", "concurrency", 1)) - if replicator.Ring, err = hummingbird.GetRing("object", hashPathPrefix, hashPathSuffix); err != nil { - return nil, fmt.Errorf("Unable to load ring.") - } devices_flag := flags.Lookup("devices") if devices_flag != nil { if devices := devices_flag.Value.(flag.Getter).Get().(string); len(devices) > 0 { diff --git a/go/objectserver/replicator_test.go b/go/objectserver/replicator_test.go index 4a1b76db15..6077197b60 100644 --- a/go/objectserver/replicator_test.go +++ b/go/objectserver/replicator_test.go @@ -373,7 +373,7 @@ func TestReplicationLocal(t *testing.T) { replicator, err := makeReplicator("bind_port", fmt.Sprintf("%d", ts.port)) require.Nil(t, err) replicator.driveRoot = ts.objServer.driveRoot - replicator.Ring = &FakeRepRing1{ldev: ldev, rdev: rdev} + replicator.Rings[0] = &FakeRepRing1{ldev: ldev, rdev: rdev} replicator.Run() req, err = http.NewRequest("HEAD", fmt.Sprintf("http://%s:%d/sda/0/a/c/o", ts2.host, ts2.port), nil) @@ -426,7 +426,7 @@ func TestReplicationHandoff(t *testing.T) { replicator, err := makeReplicator("bind_port", fmt.Sprintf("%d", ts.port)) require.Nil(t, err) replicator.driveRoot = ts.objServer.driveRoot - replicator.Ring = &FakeRepRing2{ldev: ldev, rdev: rdev} + replicator.Rings[0] = &FakeRepRing2{ldev: ldev, rdev: rdev} replicator.Run() req, err = http.NewRequest("HEAD", fmt.Sprintf("http://%s:%d/sda/0/a/c/o", ts2.host, ts2.port), nil) @@ -473,7 +473,7 @@ func TestReplicationHandoffQuorumDelete(t *testing.T) { replicator, _ = makeReplicatorWithFlags([]string{"bind_port", fmt.Sprintf("%d", ts.port)}, flags) require.True(t, replicator.quorumDelete) replicator.driveRoot = ts.objServer.driveRoot - replicator.Ring = &FakeRepRing2{ldev: ldev, rdev: rdev} + replicator.Rings[0] = &FakeRepRing2{ldev: ldev, rdev: rdev} replicator.Run() req, err = http.NewRequest("HEAD", fmt.Sprintf("http://%s:%d/sda/0/a/c/o", ts2.host, ts2.port), nil) @@ -616,7 +616,7 @@ func TestRestartDevice(t *testing.T) { saved := &replicationLogSaver{} repl, _ := makeReplicator() - repl.Ring = &FakeRepRing2{ldev: ldev, rdev: rdev} + repl.Rings[0] = &FakeRepRing2{ldev: ldev, rdev: rdev} repl.logger = saved // set stuff up @@ -680,7 +680,7 @@ func TestRestartDevices(t *testing.T) { ldev := &hummingbird.Device{ReplicationIp: "127.0.0.1", ReplicationPort: 6001, Device: "sda"} rdev := &hummingbird.Device{ReplicationIp: "127.0.0.2", ReplicationPort: 6001, Device: "sdb"} ring := &FakeRepRing2{ldev: ldev, rdev: rdev} - replicator.Ring = ring + replicator.Rings[0] = ring replicator.restartDevices() _, oka := replicator.cancelers["sda"] _, okb := replicator.cancelers["sdb"] @@ -704,16 +704,17 @@ func TestSyncFileQuarantine(t *testing.T) { hashDir := filepath.Join(driveRoot, "sda", "objects", "1", "abc", "d41d8cd98f00b204e9800998ecf8427e") objFile := filepath.Join(hashDir, "12345.data") + j := &job{policy: 0} require.Nil(t, os.MkdirAll(objFile, 0777)) // not a regular file - replicator.syncFile(objFile, nil, nil) + replicator.syncFile(objFile, nil, j) assert.False(t, hummingbird.Exists(hashDir)) os.MkdirAll(hashDir, 0777) // error reading metadata fp, err := os.Create(objFile) require.Nil(t, err) defer fp.Close() - replicator.syncFile(objFile, nil, nil) + replicator.syncFile(objFile, nil, j) assert.False(t, hummingbird.Exists(hashDir)) os.MkdirAll(hashDir, 0777) // unparseable pickle @@ -721,7 +722,7 @@ func TestSyncFileQuarantine(t *testing.T) { defer fp.Close() require.Nil(t, err) RawWriteMetadata(fp.Fd(), []byte("NOT A VALID PICKLE")) - replicator.syncFile(objFile, nil, nil) + replicator.syncFile(objFile, nil, j) assert.False(t, hummingbird.Exists(hashDir)) os.MkdirAll(hashDir, 0777) // wrong metadata type @@ -729,7 +730,7 @@ func TestSyncFileQuarantine(t *testing.T) { defer fp.Close() require.Nil(t, err) RawWriteMetadata(fp.Fd(), hummingbird.PickleDumps("hi")) - replicator.syncFile(objFile, nil, nil) + replicator.syncFile(objFile, nil, j) assert.False(t, hummingbird.Exists(hashDir)) os.MkdirAll(hashDir, 0777) // unparseable content-length @@ -740,7 +741,7 @@ func TestSyncFileQuarantine(t *testing.T) { "Content-Type": "text/plain", "name": "/a/c/o", "ETag": "d41d8cd98f00b204e9800998ecf8427e", "X-Timestamp": "12345.12345", "Content-Length": "X"} RawWriteMetadata(fp.Fd(), hummingbird.PickleDumps(badContentLengthMetdata)) - replicator.syncFile(objFile, nil, nil) + replicator.syncFile(objFile, nil, j) assert.False(t, hummingbird.Exists(hashDir)) os.MkdirAll(hashDir, 0777) // content-length doesn't match file size @@ -751,6 +752,6 @@ func TestSyncFileQuarantine(t *testing.T) { "Content-Type": "text/plain", "name": "/a/c/o", "ETag": "d41d8cd98f00b204e9800998ecf8427e", "X-Timestamp": "12345.12345", "Content-Length": "50000"} RawWriteMetadata(fp.Fd(), hummingbird.PickleDumps(wrongContentLengthMetdata)) - replicator.syncFile(objFile, nil, nil) + replicator.syncFile(objFile, nil, j) assert.False(t, hummingbird.Exists(hashDir)) } diff --git a/go/objectserver/swiftobjeng.go b/go/objectserver/swiftobjeng.go index a92fe288fb..d5d7f61b7a 100644 --- a/go/objectserver/swiftobjeng.go +++ b/go/objectserver/swiftobjeng.go @@ -175,13 +175,14 @@ type SwiftObjectFactory struct { replicationMan *ReplicationManager replicateTimeout time.Duration reclaimAge int64 + policy int } // New returns an instance of SwiftObject with the given parameters. Metadata is read in and if needData is true, the file is opened. func (f *SwiftObjectFactory) New(vars map[string]string, needData bool) (Object, error) { var err error sor := &SwiftObject{reclaimAge: f.reclaimAge, reserve: f.reserve} - sor.hashDir = ObjHashDir(vars, f.driveRoot, f.hashPathPrefix, f.hashPathSuffix) + sor.hashDir = ObjHashDir(vars, f.driveRoot, f.hashPathPrefix, f.hashPathSuffix, f.policy) sor.tempDir = TempDirPath(f.driveRoot, vars["device"]) sor.dataFile, sor.metaFile = ObjectFiles(sor.hashDir) if sor.Exists() { @@ -226,8 +227,8 @@ func (f *SwiftObjectFactory) objReplicateHandler(writer http.ResponseWriter, req if len(vars["suffixes"]) > 0 { recalculate = strings.Split(vars["suffixes"], "-") } - hashes, err := GetHashes(f.driveRoot, vars["device"], vars["partition"], recalculate, f.reclaimAge, hummingbird.GetLogger(request)) - if err != nil { + hashes, herr := GetHashes(f.driveRoot, vars["device"], vars["partition"], recalculate, f.reclaimAge, f.policy, hummingbird.GetLogger(request)) + if herr != nil { hummingbird.GetLogger(request).LogError("Unable to get hashes for %s/%s", vars["device"], vars["partition"]) hummingbird.StandardResponse(writer, http.StatusInternalServerError) return @@ -273,7 +274,7 @@ func (f *SwiftObjectFactory) objRepConnHandler(writer http.ResponseWriter, reque var hashes map[string]string if brr.NeedHashes { var herr *hummingbird.BackendError - hashes, herr = GetHashes(f.driveRoot, brr.Device, brr.Partition, nil, f.reclaimAge, hummingbird.GetLogger(request)) + hashes, herr = GetHashes(f.driveRoot, brr.Device, brr.Partition, nil, f.reclaimAge, f.policy, hummingbird.GetLogger(request)) if herr != nil { hummingbird.GetLogger(request).LogError("[ObjRepConnHandler] Error getting hashes: %v", herr) writer.WriteHeader(http.StatusInternalServerError) @@ -355,7 +356,7 @@ func (f *SwiftObjectFactory) RegisterHandlers(addRoute func(method, path string, } // SwiftEngineConstructor creates a SwiftObjectFactory given the object server configs. -func SwiftEngineConstructor(config hummingbird.Config, flags *flag.FlagSet) (ObjectEngine, error) { +func SwiftEngineConstructor(config hummingbird.Config, policy *hummingbird.Policy, flags *flag.FlagSet) (ObjectEngine, error) { driveRoot := config.GetDefault("app:object-server", "devices", "/srv/node") reserve := config.GetInt("app:object-server", "fallocate_reserve", 0) hashPathPrefix, hashPathSuffix, err := hummingbird.GetHashPrefixAndSuffix() @@ -372,11 +373,12 @@ func SwiftEngineConstructor(config hummingbird.Config, flags *flag.FlagSet) (Obj reserve: reserve, replicationMan: replicationMan, replicateTimeout: replicateTimeout, - reclaimAge: reclaimAge}, nil + reclaimAge: reclaimAge, + policy: policy.Index}, nil } func init() { - RegisterObjectEngine("swift", SwiftEngineConstructor) + RegisterObjectEngine("replication", SwiftEngineConstructor) } // make sure these things satisfy interfaces at compile time diff --git a/go/objectserver/update.go b/go/objectserver/update.go index 9a713a9e7d..ee89f182f2 100644 --- a/go/objectserver/update.go +++ b/go/objectserver/update.go @@ -116,10 +116,11 @@ func (server *ObjectServer) updateContainer(metadata map[string]string, request return } requestHeaders := http.Header{ - "Referer": {hummingbird.GetDefault(request.Header, "Referer", "-")}, - "User-Agent": {hummingbird.GetDefault(request.Header, "User-Agent", "-")}, - "X-Trans-Id": {hummingbird.GetDefault(request.Header, "X-Trans-Id", "-")}, - "X-Timestamp": {request.Header.Get("X-Timestamp")}, + "X-Backend-Storage-Policy-Index": {hummingbird.GetDefault(request.Header, "X-Backend-Storage-Policy-Index", "0")}, + "Referer": {hummingbird.GetDefault(request.Header, "Referer", "-")}, + "User-Agent": {hummingbird.GetDefault(request.Header, "User-Agent", "-")}, + "X-Trans-Id": {hummingbird.GetDefault(request.Header, "X-Trans-Id", "-")}, + "X-Timestamp": {request.Header.Get("X-Timestamp")}, } if request.Method != "DELETE" { requestHeaders.Add("X-Content-Type", metadata["Content-Type"]) @@ -152,10 +153,11 @@ func (server *ObjectServer) updateDeleteAt(request *http.Request, deleteAtStr st hosts := splitHeader(request.Header.Get("X-Delete-At-Host")) devices := splitHeader(request.Header.Get("X-Delete-At-Device")) requestHeaders := http.Header{ - "Referer": {hummingbird.GetDefault(request.Header, "Referer", "-")}, - "User-Agent": {hummingbird.GetDefault(request.Header, "User-Agent", "-")}, - "X-Trans-Id": {hummingbird.GetDefault(request.Header, "X-Trans-Id", "-")}, - "X-Timestamp": {request.Header.Get("X-Timestamp")}, + "X-Backend-Storage-Policy-Index": {hummingbird.GetDefault(request.Header, "X-Backend-Storage-Policy-Index", "0")}, + "Referer": {hummingbird.GetDefault(request.Header, "Referer", "-")}, + "User-Agent": {hummingbird.GetDefault(request.Header, "User-Agent", "-")}, + "X-Trans-Id": {hummingbird.GetDefault(request.Header, "X-Trans-Id", "-")}, + "X-Timestamp": {request.Header.Get("X-Timestamp")}, } if request.Method != "DELETE" { requestHeaders.Add("X-Content-Type", "text/plain") diff --git a/go/probe/auditor_test.go b/go/probe/auditor_test.go index 8f7222f086..74ded62b00 100644 --- a/go/probe/auditor_test.go +++ b/go/probe/auditor_test.go @@ -31,9 +31,9 @@ func TestAuditorMd5(t *testing.T) { // put a file timestamp := hummingbird.GetTimestamp() - e.PutObject(0, timestamp, "X") + e.PutObject(0, timestamp, "X", 0) - locations := e.FileLocations("a", "c", "o") + locations := e.FileLocations("a", "c", "o", 0) path := filepath.Join(locations[0], timestamp+".data") // make sure the file is still there after an audit pass @@ -56,9 +56,9 @@ func TestAuditorContentLength(t *testing.T) { // put a file timestamp := hummingbird.GetTimestamp() - e.PutObject(0, timestamp, "X") + e.PutObject(0, timestamp, "X", 0) - locations := e.FileLocations("a", "c", "o") + locations := e.FileLocations("a", "c", "o", 0) path := filepath.Join(locations[0], timestamp+".data") // make sure the file is still there after an audit pass diff --git a/go/probe/base.go b/go/probe/base.go index e507fe83cc..8363bb0812 100644 --- a/go/probe/base.go +++ b/go/probe/base.go @@ -112,18 +112,18 @@ func (e *Environment) Close() { } // FileLocations returns a list of file paths for the object's hash directory on all three underlying object servers. -func (e *Environment) FileLocations(account, container, obj string) (paths []string) { +func (e *Environment) FileLocations(account, container, obj string, policy int) (paths []string) { partition := e.ring.GetPartition(account, container, obj) vars := map[string]string{"account": account, "container": container, "obj": obj, "partition": strconv.Itoa(int(partition)), "device": "sda"} for i := 0; i < 4; i++ { - path := objectserver.ObjHashDir(vars, e.driveRoots[i], e.hashPrefix, e.hashSuffix) + path := objectserver.ObjHashDir(vars, e.driveRoots[i], e.hashPrefix, e.hashSuffix, policy) paths = append(paths, path) } return } // PutObject uploads an object "/a/c/o" to the indicated server with X-Timestamp set to timestamp and body set to data. -func (e *Environment) PutObject(server int, timestamp string, data string) bool { +func (e *Environment) PutObject(server int, timestamp string, data string, policy int) bool { body := bytes.NewBuffer([]byte(data)) req, err := http.NewRequest("PUT", fmt.Sprintf("http://%s:%d/sda/0/a/c/o", e.hosts[server], e.ports[server]), body) if err != nil { @@ -132,16 +132,30 @@ func (e *Environment) PutObject(server int, timestamp string, data string) bool req.Header.Set("Content-Type", "application/octet-stream") req.Header.Set("Content-Length", strconv.Itoa(len(data))) req.Header.Set("X-Timestamp", timestamp) + req.Header.Set("X-Backend-Storage-Policy-Index", strconv.Itoa(policy)) resp, err := http.DefaultClient.Do(req) return err == nil && resp.StatusCode == 201 } +// DeleteObject deletes the object. +func (e *Environment) DeleteObject(server int, timestamp string, policy int) bool { + req, err := http.NewRequest("DELETE", fmt.Sprintf("http://%s:%d/sda/0/a/c/o", e.hosts[server], e.ports[server]), nil) + if err != nil { + return false + } + req.Header.Set("X-Timestamp", timestamp) + req.Header.Set("X-Backend-Storage-Policy-Index", strconv.Itoa(policy)) + resp, err := http.DefaultClient.Do(req) + return err == nil && resp.StatusCode == 204 +} + // ObjExists returns a boolean indicating that it can fetch the named object and that its X-Timestamp matches the timestamp argument. -func (e *Environment) ObjExists(server int, timestamp string) bool { +func (e *Environment) ObjExists(server int, timestamp string, policy int) bool { req, err := http.NewRequest("HEAD", fmt.Sprintf("http://%s:%d/sda/0/a/c/o", e.hosts[server], e.ports[server]), nil) if err != nil { return false } + req.Header.Set("X-Backend-Storage-Policy-Index", strconv.Itoa(policy)) resp, err := http.DefaultClient.Do(req) if err != nil || resp.StatusCode != 200 { return false @@ -151,6 +165,30 @@ func (e *Environment) ObjExists(server int, timestamp string) bool { // NewEnvironment creates a new environment. Arguments should be a series of key, value pairs that are added to the object server configuration file. func NewEnvironment(settings ...string) *Environment { + oldLoadPolicies := hummingbird.LoadPolicies + hummingbird.LoadPolicies = func() hummingbird.PolicyList { + return hummingbird.PolicyList(map[int]*hummingbird.Policy{ + 0: &hummingbird.Policy{ + Index: 0, + Type: "replication", + Name: "Policy-0", + Aliases: nil, + Default: false, + Deprecated: false, + }, + 1: &hummingbird.Policy{ + Index: 1, + Type: "replication", + Name: "Policy-1", + Aliases: nil, + Default: false, + Deprecated: false, + }, + }) + } + defer func() { + hummingbird.LoadPolicies = oldLoadPolicies + }() env := &Environment{ring: &FakeRing{devices: nil}} env.hashPrefix, env.hashSuffix, _ = hummingbird.GetHashPrefixAndSuffix() for i := 0; i < 4; i++ { @@ -174,7 +212,8 @@ func NewEnvironment(settings ...string) *Environment { ts.Config.Handler = server.GetHandler(conf) replicator, _ := objectserver.NewReplicator(conf, &flag.FlagSet{}) auditor, _ := objectserver.NewAuditor(conf, &flag.FlagSet{}) - replicator.(*objectserver.Replicator).Ring = env.ring + replicator.(*objectserver.Replicator).Rings[0] = env.ring + replicator.(*objectserver.Replicator).Rings[1] = env.ring env.ring.(*FakeRing).devices = append(env.ring.(*FakeRing).devices, &hummingbird.Device{ Id: i, Device: "sda", Ip: host, Port: port, Region: 0, ReplicationIp: host, ReplicationPort: port, Weight: 1, Zone: i, }) diff --git a/go/probe/combo_test.go b/go/probe/combo_test.go index c7a9bf49de..32c80efd7e 100644 --- a/go/probe/combo_test.go +++ b/go/probe/combo_test.go @@ -32,11 +32,11 @@ func TestAuditReplicate(t *testing.T) { // put a file timestamp := hummingbird.GetTimestamp() for i := 0; i < 3; i++ { - e.PutObject(i, timestamp, "X") + e.PutObject(i, timestamp, "X", 0) } // simulate bit-rot of the file contents - locations := e.FileLocations("a", "c", "o") + locations := e.FileLocations("a", "c", "o", 0) path := filepath.Join(locations[0], timestamp+".data") f, _ := os.OpenFile(path, os.O_RDWR, 0777) f.Write([]byte("!")) @@ -44,9 +44,9 @@ func TestAuditReplicate(t *testing.T) { // make sure the file is gone after an audit pass e.auditors[0].Run() - assert.False(t, e.ObjExists(0, timestamp)) + assert.False(t, e.ObjExists(0, timestamp, 0)) // make sure the file is replaced after another server's replicator runs e.replicators[1].Run() - assert.True(t, e.ObjExists(0, timestamp)) + assert.True(t, e.ObjExists(0, timestamp, 0)) } diff --git a/go/probe/policy_test.go b/go/probe/policy_test.go new file mode 100644 index 0000000000..3a91849fc4 --- /dev/null +++ b/go/probe/policy_test.go @@ -0,0 +1,95 @@ +// Copyright (c) 2015 Rackspace +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package probe + +import ( + "os" + "path/filepath" + "testing" + + "github.com/openstack/swift/go/hummingbird" + "github.com/stretchr/testify/require" +) + +func TestPolicySeparate(t *testing.T) { + e := NewEnvironment() + defer e.Close() + + timestamp := hummingbird.GetTimestamp() + require.True(t, e.PutObject(0, timestamp, "X", 0)) + + require.True(t, e.ObjExists(0, timestamp, 0)) + require.False(t, e.ObjExists(0, timestamp, 1)) + + timestamp = hummingbird.GetTimestamp() + require.True(t, e.DeleteObject(0, timestamp, 0)) + require.True(t, e.PutObject(0, timestamp, "X", 1)) + + require.False(t, e.ObjExists(0, timestamp, 0)) + require.True(t, e.ObjExists(0, timestamp, 1)) +} + +func TestBasicPolicyReplication(t *testing.T) { + e := NewEnvironment() + defer e.Close() + + timestamp := hummingbird.GetTimestamp() + require.True(t, e.PutObject(0, timestamp, "X", 1)) + + e.replicators[0].Run() + + require.False(t, e.ObjExists(0, timestamp, 0)) + require.False(t, e.ObjExists(1, timestamp, 0)) + require.True(t, e.ObjExists(0, timestamp, 1)) + require.True(t, e.ObjExists(2, timestamp, 1)) +} + +func TestOtherPolicyAuditReplicate(t *testing.T) { + e := NewEnvironment() + defer e.Close() + + timestamp := hummingbird.GetTimestamp() + e.PutObject(0, timestamp, "X", 1) + e.PutObject(1, timestamp, "X", 1) + + locations := e.FileLocations("a", "c", "o", 1) + path := filepath.Join(locations[0], timestamp+".data") + + e.auditors[0].Run() + e.auditors[1].Run() + + require.True(t, e.ObjExists(0, timestamp, 1)) + require.True(t, e.ObjExists(1, timestamp, 1)) + + f, _ := os.OpenFile(path, os.O_RDWR, 0777) + f.Write([]byte("!")) + f.Close() + + e.auditors[0].Run() + e.auditors[1].Run() + + require.True(t, e.ObjExists(1, timestamp, 1)) + require.False(t, e.ObjExists(0, timestamp, 1)) + + e.replicators[0].Run() + e.replicators[1].Run() + + require.True(t, e.ObjExists(0, timestamp, 1)) + require.True(t, e.ObjExists(1, timestamp, 1)) + + require.False(t, e.ObjExists(0, timestamp, 0)) + require.False(t, e.ObjExists(1, timestamp, 0)) +} diff --git a/go/probe/replicator_test.go b/go/probe/replicator_test.go index e37674e1ff..11be650cff 100644 --- a/go/probe/replicator_test.go +++ b/go/probe/replicator_test.go @@ -30,7 +30,7 @@ func TestReplicationHandoff(t *testing.T) { // put a file timestamp := hummingbird.GetTimestamp() - assert.True(t, e.PutObject(0, timestamp, "X")) + assert.True(t, e.PutObject(0, timestamp, "X", 0)) // make a drive look unmounted with a handler that always 507s origHandler := e.servers[1].Config.Handler @@ -42,28 +42,28 @@ func TestReplicationHandoff(t *testing.T) { e.replicators[0].Run() // so it's on the primary nodes that are up - assert.True(t, e.ObjExists(0, timestamp)) - assert.True(t, e.ObjExists(2, timestamp)) + assert.True(t, e.ObjExists(0, timestamp, 0)) + assert.True(t, e.ObjExists(2, timestamp, 0)) // and now it's on the handoff node - assert.True(t, e.ObjExists(3, timestamp)) + assert.True(t, e.ObjExists(3, timestamp, 0)) // fix the "unmounted" drive e.servers[1].Config.Handler = origHandler // make sure it's not on the newly fixed node yet - assert.False(t, e.ObjExists(1, timestamp)) + assert.False(t, e.ObjExists(1, timestamp, 0)) // run the handoff node's replicator e.replicators[3].Run() // it's no longer on the handoff node - assert.False(t, e.ObjExists(3, timestamp)) + assert.False(t, e.ObjExists(3, timestamp, 0)) // make sure it's on all the primary nodes - assert.True(t, e.ObjExists(0, timestamp)) - assert.True(t, e.ObjExists(1, timestamp)) - assert.True(t, e.ObjExists(2, timestamp)) + assert.True(t, e.ObjExists(0, timestamp, 0)) + assert.True(t, e.ObjExists(1, timestamp, 0)) + assert.True(t, e.ObjExists(2, timestamp, 0)) } func TestReplicationUnlinkOld(t *testing.T) { @@ -72,18 +72,18 @@ func TestReplicationUnlinkOld(t *testing.T) { // put a file to a primary node timestamp := hummingbird.GetTimestamp() - assert.True(t, e.PutObject(0, timestamp, "X")) + assert.True(t, e.PutObject(0, timestamp, "X", 0)) // put a newer file to another primary node timestamp2 := hummingbird.GetTimestamp() - assert.True(t, e.PutObject(1, timestamp2, "X")) + assert.True(t, e.PutObject(1, timestamp2, "X", 0)) - assert.True(t, e.ObjExists(0, timestamp)) - assert.True(t, e.ObjExists(1, timestamp2)) + assert.True(t, e.ObjExists(0, timestamp, 0)) + assert.True(t, e.ObjExists(1, timestamp2, 0)) // run the replicator on the server with the old file e.replicators[0].Run() // verify the old file was removed by the replicator - assert.False(t, e.ObjExists(0, timestamp)) + assert.False(t, e.ObjExists(0, timestamp, 0)) }