go: replication prioritizer
This adds two new commands:

hummingbird moveparts [old ring.gz] [new ring.gz]
    After a rebalance, moveparts will diff the two rings and instruct the
    object replicators (via HTTP call) to execute jobs that bulk-move any
    re-assigned partitions.

hummingbird restoredevice [ip] [device-name]
    restoredevice will instruct peers of the selected device to sync over
    their copies of any partitions that should be on that drive. It can be
    used to speed up restoring a drive after it's been swapped.

Both commands run until all of the jobs have been taken up by the replicator,
so a large job might take a while.

This means the replicator now has a semi-real HTTP server, not just a debug
port. It binds to the object replication server's port + 500, which is totally
arbitrary and open to bikeshedding.

Change-Id: I4bbb2719657eb84c5191e053a1654e986511bb95
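
For orientation, below is a minimal, illustrative sketch (not part of the commit) of the kind of request these commands send: a job serialized to JSON and POSTed to the replicator's /priorityrep endpoint on the replication port + 500. The job fields mirror the PriorityRepJob struct added in this change; the local "device" struct, its JSON tags, and the concrete IPs/ports/partition values are assumptions made up for the example — the real code uses hummingbird.Device and the client in go/objectserver/priorityrep.go.

package main

import (
    "bytes"
    "encoding/json"
    "fmt"
    "net/http"
)

// Assumed stand-in for hummingbird.Device, only for this sketch.
type device struct {
    Id     int    `json:"id"`
    Device string `json:"device"`
    Ip     string `json:"ip"`
    Port   int    `json:"port"`
}

// Mirrors the PriorityRepJob struct added in this commit.
type priorityRepJob struct {
    JobType    string    `json:"job_type"`
    Partition  uint64    `json:"partition"`
    FromDevice *device   `json:"from_device"`
    ToDevices  []*device `json:"to_devices"`
}

func main() {
    job := priorityRepJob{
        JobType:    "handoff",
        Partition:  42,
        FromDevice: &device{Id: 1, Device: "sda", Ip: "127.0.0.1", Port: 6000},
        ToDevices:  []*device{{Id: 2, Device: "sdb", Ip: "127.0.0.2", Port: 6000}},
    }
    body, err := json.Marshal(job)
    if err != nil {
        panic(err)
    }
    // The replicator's priority-rep server listens on the replication port + 500.
    url := fmt.Sprintf("http://%s:%d/priorityrep", job.FromDevice.Ip, job.FromDevice.Port+500)
    resp, err := http.Post(url, "application/json", bytes.NewBuffer(body))
    if err != nil {
        fmt.Println("request failed:", err)
        return
    }
    defer resp.Body.Close()
    fmt.Println("status:", resp.StatusCode) // 200 once the replicator accepts the job

}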
@@ -279,6 +279,10 @@ func main() {
    fmt.Fprintf(os.Stderr, "\n")
    proxyFlags.Usage()
    fmt.Fprintf(os.Stderr, "\n")
    fmt.Fprintf(os.Stderr, "hummingbird moveparts [old ring.gz] [new ring.gz]\n")
    fmt.Fprintf(os.Stderr, " Prioritize replication for moving partitions after a ring change\n\n")
    fmt.Fprintf(os.Stderr, "hummingbird restoredevice [ip] [device-name]\n")
    fmt.Fprintf(os.Stderr, " Reconstruct a device from its peers\n\n")
    fmt.Fprintf(os.Stderr, "hummingbird bench CONFIG\n")
    fmt.Fprintf(os.Stderr, " Run bench tool\n\n")
    fmt.Fprintf(os.Stderr, "hummingbird dbench CONFIG\n")

@@ -325,6 +329,10 @@ func main() {
        bench.RunDBench(flag.Args()[1:])
    case "thrash":
        bench.RunThrash(flag.Args()[1:])
    case "moveparts":
        objectserver.MoveParts(flag.Args()[1:])
    case "restoredevice":
        objectserver.RestoreDevice(flag.Args()[1:])
    default:
        flag.Usage()
    }

@@ -311,7 +311,7 @@ func RunDaemon(GetDaemon func(string, *flag.FlagSet) (Daemon, error), flags *fla
    configFile := flags.Lookup("c").Value.(flag.Getter).Get().(string)
    configFiles, err := filepath.Glob(filepath.Join(configFile, "*.conf"))
    if err != nil || len(configFiles) <= 0 {
        configFiles = []string{flags.Arg(0)}
        configFiles = []string{configFile}
    }

    once := flags.Lookup("once").Value.(flag.Getter).Get() == true

go/objectserver/priorityrep.go (new file, 215 lines)
@@ -0,0 +1,215 @@
// Copyright (c) 2015 Rackspace
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package objectserver

import (
    "bytes"
    "encoding/json"
    "fmt"
    "net/http"
    "sync"
    "time"

    "github.com/openstack/swift/go/hummingbird"
)

type devLimiter struct {
    inUse             map[int]int
    m                 sync.Mutex
    max               int
    somethingFinished chan struct{}
}

func (d *devLimiter) start(j *PriorityRepJob) bool {
    d.m.Lock()
    doable := d.inUse[j.FromDevice.Id] < d.max
    for _, dev := range j.ToDevices {
        doable = doable && d.inUse[dev.Id] < d.max
    }
    if doable {
        d.inUse[j.FromDevice.Id] += 1
        for _, dev := range j.ToDevices {
            d.inUse[dev.Id] += 1
        }
    }
    d.m.Unlock()
    return doable
}

func (d *devLimiter) finished(j *PriorityRepJob) {
    d.m.Lock()
    d.inUse[j.FromDevice.Id] -= 1
    for _, dev := range j.ToDevices {
        d.inUse[dev.Id] -= 1
    }
    d.m.Unlock()
    select {
    case d.somethingFinished <- struct{}{}:
    default:
    }
}

func (d *devLimiter) waitForSomethingToFinish() {
    <-d.somethingFinished
}

// doPriRepJobs executes a list of PriorityRepJobs, limiting concurrent jobs per device to deviceMax.
func doPriRepJobs(jobs []*PriorityRepJob, deviceMax int, client *http.Client) {
    limiter := &devLimiter{inUse: make(map[int]int), max: deviceMax, somethingFinished: make(chan struct{}, 1)}
    wg := sync.WaitGroup{}
    for len(jobs) > 0 {
        foundDoable := false
        for i := range jobs {
            if !limiter.start(jobs[i]) {
                continue
            }
            foundDoable = true
            wg.Add(1)
            go func(job *PriorityRepJob) {
                defer wg.Done()
                defer limiter.finished(job)
                url := fmt.Sprintf("http://%s:%d/priorityrep", job.FromDevice.ReplicationIp, job.FromDevice.ReplicationPort+500)
                jsonned, err := json.Marshal(job)
                if err != nil {
                    fmt.Println("Failed to serialize job for some reason:", err)
                    return
                }
                req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonned))
                if err != nil {
                    fmt.Println("Failed to create request for some reason:", err)
                    return
                }
                req.ContentLength = int64(len(jsonned))
                req.Header.Set("Content-Type", "application/json")
                resp, err := client.Do(req)
                if err != nil {
                    fmt.Printf("Error moving partition %d: %v\n", job.Partition, err)
                    return
                }
                resp.Body.Close()
                if resp.StatusCode/100 != 2 {
                    fmt.Printf("Bad status code moving partition %d: %d\n", job.Partition, resp.StatusCode)
                } else {
                    fmt.Printf("Replicating partition %d from %s/%s\n", job.Partition, job.FromDevice.Ip, job.FromDevice.Device)
                }
            }(jobs[i])
            jobs = append(jobs[:i], jobs[i+1:]...)
            break
        }
        if !foundDoable {
            limiter.waitForSomethingToFinish()
        }
    }
    wg.Wait()
}

// getPartMoveJobs takes two rings and creates a list of jobs for any partition moves between them.
func getPartMoveJobs(oldRing, newRing hummingbird.Ring) []*PriorityRepJob {
    jobs := make([]*PriorityRepJob, 0)
    for partition := uint64(0); true; partition++ {
        olddevs := oldRing.GetNodes(partition)
        newdevs := newRing.GetNodes(partition)
        if olddevs == nil || newdevs == nil {
            break
        }
        for i := range olddevs {
            if olddevs[i].Id != newdevs[i].Id {
                // TODO: handle if a node just changes positions, which doesn't happen, but isn't against the contract.
                jobs = append(jobs, &PriorityRepJob{
                    JobType:    "handoff",
                    Partition:  partition,
                    FromDevice: olddevs[i],
                    ToDevices:  []*hummingbird.Device{newdevs[i]},
                })
            }
        }
    }
    return jobs
}

// MoveParts takes two object .ring.gz files as []string{oldRing, newRing} and dispatches priority replication jobs to rebalance data in line with any ring changes.
func MoveParts(args []string) {
    if len(args) != 2 {
        fmt.Println("USAGE: hummingbird moveparts [old ringfile] [new ringfile]")
        return
    }
    hashPathPrefix, hashPathSuffix, err := hummingbird.GetHashPrefixAndSuffix()
    if err != nil {
        fmt.Println("Unable to load hash path prefix and suffix:", err)
        return
    }
    oldRing, err := hummingbird.LoadRing(args[0], hashPathPrefix, hashPathSuffix)
    if err != nil {
        fmt.Println("Unable to load old ring:", err)
        return
    }
    newRing, err := hummingbird.LoadRing(args[1], hashPathPrefix, hashPathSuffix)
    if err != nil {
        fmt.Println("Unable to load new ring:", err)
        return
    }
    client := &http.Client{Timeout: time.Hour}
    jobs := getPartMoveJobs(oldRing, newRing)
    fmt.Println("Job count:", len(jobs))
    doPriRepJobs(jobs, 2, client)
    fmt.Println("Done sending jobs.")
}

// getRestoreDeviceJobs takes an ip address and device name, and creates a list of jobs to restore that device's data from peers.
func getRestoreDeviceJobs(ring hummingbird.Ring, ip string, devName string) []*PriorityRepJob {
    jobs := make([]*PriorityRepJob, 0)
    for partition := uint64(0); true; partition++ {
        devs := ring.GetNodes(partition)
        if devs == nil {
            break
        }
        for i, dev := range devs {
            if dev.Device == devName && (dev.Ip == ip || dev.ReplicationIp == ip) {
                src := devs[(i+1)%len(devs)]
                jobs = append(jobs, &PriorityRepJob{
                    JobType:    "local",
                    Partition:  partition,
                    FromDevice: src,
                    ToDevices:  []*hummingbird.Device{dev},
                })
            }
        }
    }
    return jobs
}

// RestoreDevice takes an IP address and device name such as []string{"172.24.0.1", "sda1"} and attempts to restore its data from peers.
func RestoreDevice(args []string) {
    if len(args) != 2 {
        fmt.Println("USAGE: hummingbird restoredevice [ip] [device]")
        return
    }
    hashPathPrefix, hashPathSuffix, err := hummingbird.GetHashPrefixAndSuffix()
    if err != nil {
        fmt.Println("Unable to load hash path prefix and suffix:", err)
        return
    }
    objRing, err := hummingbird.GetRing("object", hashPathPrefix, hashPathSuffix)
    if err != nil {
        fmt.Println("Unable to load ring:", err)
        return
    }
    client := &http.Client{Timeout: time.Hour}
    jobs := getRestoreDeviceJobs(objRing, args[0], args[1])
    fmt.Println("Job count:", len(jobs))
    doPriRepJobs(jobs, 2, client)
    fmt.Println("Done sending jobs.")
}

go/objectserver/priorityrep_test.go (new file, 160 lines)
@@ -0,0 +1,160 @@
// Copyright (c) 2015 Rackspace
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package objectserver

import (
    "encoding/json"
    "fmt"
    "io/ioutil"
    "net"
    "net/http"
    "net/http/httptest"
    "net/url"
    "strconv"
    "testing"

    "github.com/openstack/swift/go/hummingbird"

    "github.com/stretchr/testify/require"
)

type priFakeRing struct {
    mapping map[uint64][]int
}

func (p *priFakeRing) GetJobNodes(partition uint64, localDevice int) (response []*hummingbird.Device, handoff bool) {
    return nil, false
}

func (p *priFakeRing) GetPartition(account string, container string, object string) uint64 { return 0 }

func (p *priFakeRing) LocalDevices(localPort int) (devs []*hummingbird.Device, err error) {
    return nil, nil
}

func (p *priFakeRing) GetMoreNodes(partition uint64) hummingbird.MoreNodes { return nil }

func (p *priFakeRing) GetNodes(partition uint64) (response []*hummingbird.Device) {
    for _, p := range p.mapping[partition] {
        response = append(response, &hummingbird.Device{Id: p, Device: fmt.Sprintf("drive%d", p), Ip: "127.0.0.1", Port: p})
    }
    return
}

func TestGetPartMoveJobs(t *testing.T) {
    t.Parallel()
    oldRing := &priFakeRing{
        mapping: map[uint64][]int{
            0: {1, 2, 3, 4, 5},
            1: {6, 7, 8, 9, 10},
        },
    }
    newRing := &priFakeRing{
        mapping: map[uint64][]int{
            0: {6, 2, 3, 4, 5},
            1: {6, 7, 8, 9, 11},
        },
    }
    jobs := getPartMoveJobs(oldRing, newRing)
    require.EqualValues(t, 2, len(jobs))
    require.EqualValues(t, 0, jobs[0].Partition)
    require.EqualValues(t, 1, jobs[0].FromDevice.Id)
    require.EqualValues(t, 6, jobs[0].ToDevices[0].Id)
    require.EqualValues(t, 1, jobs[1].Partition)
    require.EqualValues(t, 10, jobs[1].FromDevice.Id)
    require.EqualValues(t, 11, jobs[1].ToDevices[0].Id)
}

func TestGetRestoreDeviceJobs(t *testing.T) {
    t.Parallel()
    ring := &priFakeRing{
        mapping: map[uint64][]int{
            0: {1, 2},
            1: {1, 3},
        },
    }
    jobs := getRestoreDeviceJobs(ring, "127.0.0.1", "drive1")
    require.EqualValues(t, 2, len(jobs))
    require.EqualValues(t, 0, jobs[0].Partition)
    require.EqualValues(t, 2, jobs[0].FromDevice.Id)
    require.EqualValues(t, 1, jobs[0].ToDevices[0].Id)
    require.EqualValues(t, 1, jobs[1].Partition)
    require.EqualValues(t, 3, jobs[1].FromDevice.Id)
    require.EqualValues(t, 1, jobs[1].ToDevices[0].Id)
}

func TestPriRepJobs(t *testing.T) {
    t.Parallel()
    handlerRan := false
    ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        handlerRan = true
        require.Equal(t, "/priorityrep", r.URL.Path)
        var pri PriorityRepJob
        data, err := ioutil.ReadAll(r.Body)
        if err != nil {
            w.WriteHeader(500)
            return
        }
        if err := json.Unmarshal(data, &pri); err != nil {
            w.WriteHeader(400)
            return
        }
        require.Equal(t, "handoff", pri.JobType)
        require.EqualValues(t, 0, pri.Partition)
        require.EqualValues(t, "sda", pri.FromDevice.Device)
        require.EqualValues(t, 1, len(pri.ToDevices))
        require.EqualValues(t, "sdb", pri.ToDevices[0].Device)
    }))

    defer ts.Close()
    u, _ := url.Parse(ts.URL)
    host, ports, _ := net.SplitHostPort(u.Host)
    port, _ := strconv.Atoi(ports)
    jobs := []*PriorityRepJob{
        &PriorityRepJob{
            JobType:    "handoff",
            Partition:  0,
            FromDevice: &hummingbird.Device{Device: "sda", Ip: host, Port: port - 500, ReplicationIp: host, ReplicationPort: port - 500},
            ToDevices: []*hummingbird.Device{
                &hummingbird.Device{Device: "sdb"},
            },
        },
    }
    doPriRepJobs(jobs, 2, http.DefaultClient)
    require.Equal(t, true, handlerRan)
}

func TestDevLimiter(t *testing.T) {
    t.Parallel()
    job1 := &PriorityRepJob{
        FromDevice: &hummingbird.Device{Id: 0},
        ToDevices:  []*hummingbird.Device{&hummingbird.Device{Id: 1, Device: "sdb"}},
    }
    job2 := &PriorityRepJob{
        FromDevice: &hummingbird.Device{Id: 1},
        ToDevices:  []*hummingbird.Device{&hummingbird.Device{Id: 2, Device: "sdb"}},
    }
    job3 := &PriorityRepJob{
        FromDevice: &hummingbird.Device{Id: 1},
        ToDevices:  []*hummingbird.Device{&hummingbird.Device{Id: 0, Device: "sdb"}},
    }
    limiter := &devLimiter{inUse: make(map[int]int), max: 2, somethingFinished: make(chan struct{}, 1)}
    require.True(t, limiter.start(job1))
    require.True(t, limiter.start(job2))
    require.False(t, limiter.start(job3))
    limiter.finished(job1)
    require.True(t, limiter.start(job3))
}

@@ -1,3 +1,18 @@
// Copyright (c) 2015 Rackspace
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package objectserver

import (

@@ -17,6 +17,7 @@ package objectserver

import (
    "encoding/hex"
    "encoding/json"
    "flag"
    "fmt"
    "io/ioutil"

@@ -54,13 +55,27 @@ type ReplicationData struct {
    err error
}

type NoMoreNodes struct{}

func (n *NoMoreNodes) Next() *hummingbird.Device {
    return nil
}

type PriorityRepJob struct {
    JobType    string                `json:"job_type"`
    Partition  uint64                `json:"partition"`
    FromDevice *hummingbird.Device   `json:"from_device"`
    ToDevices  []*hummingbird.Device `json:"to_devices"`
}

// Object replicator daemon object
type Replicator struct {
    concurrency    int
    checkMounts    bool
    driveRoot      string
    reconCachePath string
    debugAddress   string
    bindPort       int
    bindIp         string
    logger         hummingbird.SysLogLike
    port           int
    Ring           hummingbird.Ring

@@ -70,6 +85,8 @@ type Replicator struct {
    concurrencySem chan struct{}
    devices        []string
    partitions     []string
    priRepChans    map[int]chan PriorityRepJob
    priRepM        sync.Mutex

    /* stats accounting */
    startTime time.Time

@@ -190,6 +207,8 @@ func listObjFiles(partdir string, needSuffix func(string) bool) ([]string, error
        return nil, err
    }
    if len(suffixDirs) == 0 {
        os.Remove(filepath.Join(partdir, ".lock"))
        os.Remove(filepath.Join(partdir, "hashes.pkl"))
        os.Remove(partdir)
        return nil, nil
    }

@@ -251,7 +270,7 @@ func (r *Replicator) syncFile(objFile string, dst []*syncFileArg) (syncs int, in
            wrs = append(wrs, sfa)
        } else if sfr.NewerExists {
            insync++
            if os.RemoveAll(objFile) == nil {
            if os.Remove(objFile) == nil {
                InvalidateHash(filepath.Dir(objFile))
            }
        } else if sfr.Exists {

@@ -421,7 +440,7 @@ func (r *Replicator) replicateHandoff(j *job, nodes []*hummingbird.Device) {
        if syncs, insync, err := r.syncFile(objFile, toSync); err == nil {
            syncCount += syncs
            if insync == len(nodes) {
                os.RemoveAll(objFile)
                os.Remove(objFile)
                os.Remove(filepath.Dir(objFile))
            }
        } else {

@@ -476,10 +495,10 @@ func (r *Replicator) replicateDevice(dev *hummingbird.Device) {
    }
    r.jobCountIncrement <- uint64(len(partitionList))
    for _, partition := range partitionList {
        lockPath := filepath.Join(r.driveRoot, dev.Device, "lock_device")
        if hummingbird.Exists(lockPath) {
        if hummingbird.Exists(filepath.Join(r.driveRoot, dev.Device, "lock_device")) {
            break
        }
        r.processPriorityJobs(dev.Id)
        if len(r.partitions) > 0 {
            found := false
            for _, p := range r.partitions {

@@ -493,30 +512,23 @@ func (r *Replicator) replicateDevice(dev *hummingbird.Device) {
            }
        }
        if partitioni, err := strconv.ParseUint(filepath.Base(partition), 10, 64); err == nil {
            var partStart time.Time
            <-r.partRateTicker.C
            r.replicationCountIncrement <- 1
            j := &job{objPath: objPath, partition: filepath.Base(partition), dev: dev}
            if nodes, handoff := r.Ring.GetJobNodes(partitioni, j.dev.Id); handoff {
                func() {
                    r.concurrencySem <- struct{}{}
                    defer func() {
                        <-r.concurrencySem
                    }()
                    partStart = time.Now()
            func() {
                <-r.partRateTicker.C
                r.concurrencySem <- struct{}{}
                r.replicationCountIncrement <- 1
                j := &job{objPath: objPath, partition: filepath.Base(partition), dev: dev}
                nodes, handoff := r.Ring.GetJobNodes(partitioni, j.dev.Id)
                partStart := time.Now()
                defer func() {
                    <-r.concurrencySem
                    r.partitionTimesAdd <- float64(time.Since(partStart)) / float64(time.Second)
                }()
                if handoff {
                    r.replicateHandoff(j, nodes)
                }()
            } else {
                func() {
                    r.concurrencySem <- struct{}{}
                    defer func() {
                        <-r.concurrencySem
                    }()
                    partStart = time.Now()
                } else {
                    r.replicateLocal(j, nodes, r.Ring.GetMoreNodes(partitioni))
                }()
                }
            r.partitionTimesAdd <- float64(time.Since(partStart)) / float64(time.Second)
            }
            }()
        }
    }
}

@@ -560,8 +572,9 @@ func (r *Replicator) statsReporter(c <-chan time.Time) {
                r.partitionTimes[len(r.partitionTimes)/2])
        }
        if r.dataTransferred > 0 {
            elapsed := float64(now.Sub(r.startTime)) / float64(time.Second)
            r.LogInfo("Data synced: %d (%.2f kbps), files synced: %d",
                r.dataTransferred, (float64(r.dataTransferred)/1024.0)*8.0, r.filesTransferred)
                r.dataTransferred, ((float64(r.dataTransferred)/1024.0)*8.0)/elapsed, r.filesTransferred)
        }
    }
}

@@ -614,15 +627,100 @@ func (r *Replicator) run(c <-chan time.Time) {
    }
}

// processPriorityJobs runs any pending priority jobs given the device's id
func (r *Replicator) processPriorityJobs(id int) {
    for {
        select {
        case pri := <-r.getPriRepChan(id):
            r.jobCountIncrement <- 1
            func() {
                <-r.partRateTicker.C
                r.concurrencySem <- struct{}{}
                r.replicationCountIncrement <- 1
                j := &job{
                    dev: pri.FromDevice,
                    partition: strconv.FormatUint(pri.Partition, 10),
                    objPath: filepath.Join(r.driveRoot, pri.FromDevice.Device, "objects"),
                }
                partStart := time.Now()
                defer func() {
                    <-r.concurrencySem
                    r.partitionTimesAdd <- float64(time.Since(partStart)) / float64(time.Second)
                }()
                if pri.JobType == "handoff" {
                    r.replicateHandoff(j, pri.ToDevices)
                } else if pri.JobType == "local" {
                    r.replicateLocal(j, pri.ToDevices, &NoMoreNodes{})
                }
            }()
        default:
            return
        }
    }
}

// getPriRepChan synchronizes access to the r.priRepChans mapping
func (r *Replicator) getPriRepChan(id int) chan PriorityRepJob {
    r.priRepM.Lock()
    defer r.priRepM.Unlock()
    if _, ok := r.priRepChans[id]; !ok {
        r.priRepChans[id] = make(chan PriorityRepJob)
    }
    return r.priRepChans[id]
}

// priorityRepHandler handles HTTP requests for priority replications jobs.
func (r *Replicator) priorityRepHandler(w http.ResponseWriter, req *http.Request) {
    var pri PriorityRepJob
    data, err := ioutil.ReadAll(req.Body)
    if err != nil {
        w.WriteHeader(500)
        return
    }
    if err := json.Unmarshal(data, &pri); err != nil || pri.JobType == "" {
        w.WriteHeader(400)
        return
    }
    if r.checkMounts {
        if mounted, err := hummingbird.IsMount(filepath.Join(r.driveRoot, pri.FromDevice.Device)); err != nil || mounted == false {
            w.WriteHeader(507)
            return
        }
    }
    if !hummingbird.Exists(filepath.Join(r.driveRoot, pri.FromDevice.Device, "objects", strconv.FormatUint(pri.Partition, 10))) {
        w.WriteHeader(404)
        return
    }
    timeout := time.NewTimer(time.Hour)
    defer timeout.Stop()
    select {
    case r.getPriRepChan(pri.FromDevice.Id) <- pri:
    case <-timeout.C:
    }
    w.WriteHeader(200)
}

func (r *Replicator) startWebServer() {
    mux := http.NewServeMux()
    mux.HandleFunc("/priorityrep", r.priorityRepHandler)
    mux.Handle("/debug/", http.DefaultServeMux)
    for {
        if sock, err := hummingbird.RetryListen(r.bindIp, r.bindPort); err != nil {
            r.LogError("Listen failed: %v", err)
        } else {
            http.Serve(sock, mux)
        }
    }
}

// Run a single replication pass.
func (r *Replicator) Run() {
    go http.ListenAndServe(r.debugAddress, nil)
    r.run(OneTimeChan())
}

// Run replication passes in a loop until forever.
func (r *Replicator) RunForever() {
    go http.ListenAndServe(r.debugAddress, nil)
    go r.startWebServer()
    r.run(time.Tick(RunForeverInterval))
}

@@ -632,6 +730,7 @@ func NewReplicator(conf string, flags *flag.FlagSet) (hummingbird.Daemon, error)
        replicationCountIncrement: make(chan uint64),
        jobCountIncrement: make(chan uint64),
        dataTransferAdd: make(chan uint64),
        priRepChans: make(map[int]chan PriorityRepJob),
    }
    hashPathPrefix, hashPathSuffix, err := hummingbird.GetHashPrefixAndSuffix()
    if err != nil {

@@ -645,7 +744,8 @@ func NewReplicator(conf string, flags *flag.FlagSet) (hummingbird.Daemon, error)
    replicator.checkMounts = serverconf.GetBool("object-replicator", "mount_check", true)
    replicator.driveRoot = serverconf.GetDefault("object-replicator", "devices", "/srv/node")
    replicator.port = int(serverconf.GetInt("object-replicator", "bind_port", 6000))
    replicator.debugAddress = serverconf.GetDefault("object-replicator", "go_debug_address", "127.0.0.1:7070")
    replicator.bindIp = serverconf.GetDefault("object-replicator", "bind_ip", "0.0.0.0")
    replicator.bindPort = int(serverconf.GetInt("object-replicator", "replicator_bind_port", int64(replicator.port+500)))
    replicator.logger = hummingbird.SetupLogger(serverconf.GetDefault("object-replicator", "log_facility", "LOG_LOCAL0"), "object-replicator", "")
    if serverconf.GetBool("object-replicator", "vm_test_mode", false) {
        replicator.timePerPart = time.Duration(serverconf.GetInt("object-replicator", "ms_per_part", 2000)) * time.Millisecond

@@ -17,6 +17,7 @@ package objectserver

import (
    "bytes"
    "encoding/json"
    "flag"
    "fmt"
    "io/ioutil"

@@ -415,3 +416,49 @@ func TestListObjFiles(t *testing.T) {
    require.False(t, hummingbird.Exists(filepath.Join(dir, "objects", "1")))
    require.True(t, hummingbird.Exists(filepath.Join(dir, "objects")))
}

func TestPriorityRepHandler(t *testing.T) {
    t.Parallel()
    driveRoot := setupDirectory()
    defer os.RemoveAll(driveRoot)
    replicator := makeReplicator("bind_port", "1234", "check_mounts", "no")
    replicator.driveRoot = driveRoot
    w := httptest.NewRecorder()
    job := &PriorityRepJob{
        JobType:    "handoff",
        Partition:  1,
        FromDevice: &hummingbird.Device{Id: 1, Device: "sda", Ip: "127.0.0.1", Port: 5000, ReplicationIp: "127.0.0.1", ReplicationPort: 5000},
        ToDevices: []*hummingbird.Device{
            &hummingbird.Device{Id: 2, Device: "sdb"},
        },
    }
    jsonned, _ := json.Marshal(job)
    req, _ := http.NewRequest("POST", "/priorityrep", bytes.NewBuffer(jsonned))
    go func() {
        replicator.priorityRepHandler(w, req)
        require.EqualValues(t, 200, w.Code)
    }()
    pri := <-replicator.getPriRepChan(1)
    require.Equal(t, "handoff", pri.JobType)
}

func TestPriorityRepHandler404(t *testing.T) {
    t.Parallel()
    driveRoot := setupDirectory()
    defer os.RemoveAll(driveRoot)
    replicator := makeReplicator("bind_port", "1234", "check_mounts", "no")
    replicator.driveRoot = driveRoot
    w := httptest.NewRecorder()
    job := &PriorityRepJob{
        JobType:    "handoff",
        Partition:  0,
        FromDevice: &hummingbird.Device{Id: 1, Device: "sda", Ip: "127.0.0.1", Port: 5000, ReplicationIp: "127.0.0.1", ReplicationPort: 5000},
        ToDevices: []*hummingbird.Device{
            &hummingbird.Device{Id: 2, Device: "sdb"},
        },
    }
    jsonned, _ := json.Marshal(job)
    req, _ := http.NewRequest("POST", "/priorityrep", bytes.NewBuffer(jsonned))
    replicator.priorityRepHandler(w, req)
    require.EqualValues(t, 404, w.Code)
}