diff --git a/go/objectserver/replicator.go b/go/objectserver/replicator.go
index d47cd74fa0..acc21fd593 100644
--- a/go/objectserver/replicator.go
+++ b/go/objectserver/replicator.go
@@ -27,7 +27,6 @@ import (
 	"os"
 	"path/filepath"
 	"runtime/debug"
-	"sort"
 	"strconv"
 	"strings"
 	"sync"
@@ -36,10 +35,9 @@ import (
 	"github.com/openstack/swift/go/hummingbird"
 )
 
-var ReplicationSessionTimeout = 60 * time.Second
-var RunForeverInterval = 30 * time.Second
 var StatsReportInterval = 300 * time.Second
 var TmpEmptyTime = 24 * time.Hour
+var ReplicateDeviceTimeout = 4 * time.Hour
 
 // Encapsulates a partition for replication.
 type job struct {
@@ -67,6 +65,23 @@ type PriorityRepJob struct {
 	ToDevices []*hummingbird.Device `json:"to_devices"`
 }
 
+type deviceProgress struct {
+	PartitionsDone   uint64
+	PartitionsTotal  uint64
+	StartDate        time.Time
+	LastUpdate       time.Time
+	FilesSent        uint64
+	BytesSent        uint64
+	PriorityRepsDone uint64
+
+	FullReplicateCount uint64
+	CancelCount        uint64
+	LastPassDuration   time.Duration
+	LastPassUpdate     time.Time
+
+	dev *hummingbird.Device
+}
+
 // Object replicator daemon object
 type Replicator struct {
 	concurrency int
@@ -83,18 +98,19 @@ type Replicator struct {
 	timePerPart    time.Duration
 	quorumDelete   bool
 	concurrencySem chan struct{}
-	devices        []string
-	partitions     []string
+	devices        map[string]bool
+	partitions     map[string]bool
 	priRepChans    map[int]chan PriorityRepJob
 	priRepM        sync.Mutex
 	reclaimAge     int64
+	once           bool
+	cancelers      map[string]chan struct{}
+
 	/* stats accounting */
-	startTime                                                      time.Time
-	replicationCount, jobCount, dataTransferred, filesTransferred uint64
-	replicationCountIncrement, jobCountIncrement, dataTransferAdd chan uint64
-	partitionTimes    sort.Float64Slice
-	partitionTimesAdd chan float64
+	deviceProgressMap      map[string]*deviceProgress
+	deviceProgressIncr     chan deviceProgress
+	deviceProgressPassInit chan deviceProgress
}
 
 func (r *Replicator) LogError(format string, args ...interface{}) {
@@ -252,7 +268,7 @@ type syncFileArg struct {
 	dev  *hummingbird.Device
 }
 
-func (r *Replicator) syncFile(objFile string, dst []*syncFileArg) (syncs int, insync int, err error) {
+func (r *Replicator) syncFile(objFile string, dst []*syncFileArg, j *job) (syncs int, insync int, err error) {
 	var wrs []*syncFileArg
 	lst := strings.Split(objFile, string(os.PathSeparator))
 	relPath := filepath.Join(lst[len(lst)-5:]...)
@@ -316,9 +332,13 @@ func (r *Replicator) syncFile(objFile string, dst []*syncFileArg) (syncs int, in
 		sfa.conn.Flush()
 		if sfa.conn.RecvMessage(&fur) == nil {
 			if fur.Success {
-				r.dataTransferAdd <- uint64(fileSize)
 				syncs++
 				insync++
+				r.deviceProgressIncr <- deviceProgress{
+					dev:       j.dev,
+					FilesSent: 1,
+					BytesSent: uint64(fileSize),
+				}
 			}
 		}
 	}
@@ -400,7 +420,7 @@ func (r *Replicator) replicateLocal(j *job, nodes []*hummingbird.Device, moreNod
 			toSync = append(toSync, &syncFileArg{conn: remoteConnections[dev.Id], dev: dev})
 		}
 	}
-	if syncs, _, err := r.syncFile(objFile, toSync); err == nil {
+	if syncs, _, err := r.syncFile(objFile, toSync, j); err == nil {
 		syncCount += syncs
 	} else {
 		r.LogError("[syncFile] %v", err)
@@ -452,7 +472,7 @@ func (r *Replicator) replicateHandoff(j *job, nodes []*hummingbird.Device) {
 			toSync = append(toSync, &syncFileArg{conn: remoteConnections[dev.Id], dev: dev})
 		}
 	}
-	if syncs, insync, err := r.syncFile(objFile, toSync); err == nil {
+	if syncs, insync, err := r.syncFile(objFile, toSync, j); err == nil {
 		syncCount += syncs
 		success := insync == len(nodes)
@@ -489,66 +509,106 @@ func (r *Replicator) cleanTemp(dev *hummingbird.Device) {
 	}
 }
 
-func (r *Replicator) replicateDevice(dev *hummingbird.Device) {
-	defer r.LogPanics("PANIC REPLICATING DEVICE")
+// start or restart replication on a device and set up canceler
+func (r *Replicator) restartReplicateDevice(dev *hummingbird.Device) {
+	r.devGroup.Add(1)
+	if canceler, ok := r.cancelers[dev.Device]; ok {
+		close(canceler)
+	}
+	r.cancelers[dev.Device] = make(chan struct{})
+	go r.replicateDevice(dev, r.cancelers[dev.Device])
+}
+
+// Run replication on given device. For normal usage do not call directly, use "restartReplicateDevice"
+func (r *Replicator) replicateDevice(dev *hummingbird.Device, canceler chan struct{}) {
+	defer r.LogPanics(fmt.Sprintf("PANIC REPLICATING DEVICE: %s", dev.Device))
 	defer r.devGroup.Done()
+	var lastPassDuration time.Duration
+	for {
+		r.cleanTemp(dev)
+		passStartTime := time.Now()
 
-	r.cleanTemp(dev)
-
-	if mounted, err := hummingbird.IsMount(filepath.Join(r.driveRoot, dev.Device)); r.checkMounts && (err != nil || mounted != true) {
-		r.LogError("[replicateDevice] Drive not mounted: %s", dev.Device)
-		return
-	}
-	objPath := filepath.Join(r.driveRoot, dev.Device, "objects")
-	if fi, err := os.Stat(objPath); err != nil || !fi.Mode().IsDir() {
-		r.LogError("[replicateDevice] No objects found: %s", objPath)
-		return
-	}
-	partitionList, err := filepath.Glob(filepath.Join(objPath, "[0-9]*"))
-	if err != nil {
-		r.LogError("[replicateDevice] Error getting partition list: %s (%v)", objPath, err)
-		return
-	}
-	for i := len(partitionList) - 1; i > 0; i-- { // shuffle partition list
-		j := rand.Intn(i + 1)
-		partitionList[j], partitionList[i] = partitionList[i], partitionList[j]
-	}
-	r.jobCountIncrement <- uint64(len(partitionList))
-	for _, partition := range partitionList {
-		if hummingbird.Exists(filepath.Join(r.driveRoot, dev.Device, "lock_device")) {
+		if mounted, err := hummingbird.IsMount(filepath.Join(r.driveRoot, dev.Device)); r.checkMounts && (err != nil || mounted != true) {
+			r.LogError("[replicateDevice] Drive not mounted: %s", dev.Device)
 			break
 		}
-		r.processPriorityJobs(dev.Id)
-		if len(r.partitions) > 0 {
-			found := false
-			for _, p := range r.partitions {
-				if filepath.Base(partition) == p {
-					found = true
-					break
-				}
+		objPath := filepath.Join(r.driveRoot, dev.Device, "objects")
+		if fi, err := os.Stat(objPath); err != nil || !fi.Mode().IsDir() {
r.LogError("[replicateDevice] No objects found: %s", objPath) + break + } + partitionList, err := filepath.Glob(filepath.Join(objPath, "[0-9]*")) + if err != nil { + r.LogError("[replicateDevice] Error getting partition list: %s (%v)", objPath, err) + break + } + // if one of the breaks above triggers, the monitorer should retry after ReplicateDeviceTimeout + for i := len(partitionList) - 1; i > 0; i-- { // shuffle partition list + j := rand.Intn(i + 1) + partitionList[j], partitionList[i] = partitionList[i], partitionList[j] + } + + numPartitions := uint64(len(partitionList)) + partitionsProcessed := uint64(0) + + r.deviceProgressPassInit <- deviceProgress{ + dev: dev, + PartitionsTotal: numPartitions, + LastPassDuration: lastPassDuration, + } + + for _, partition := range partitionList { + if hummingbird.Exists(filepath.Join(r.driveRoot, dev.Device, "lock_device")) { + break } - if !found { + select { + case <-canceler: + { + r.deviceProgressIncr <- deviceProgress{ + dev: dev, + CancelCount: 1, + } + r.LogError("replicateDevice canceled for device: %s", dev.Device) + return + } + default: + } + r.processPriorityJobs(dev.Id) + if _, ok := r.partitions[filepath.Base(partition)]; len(r.partitions) > 0 && !ok { continue } - } - if partitioni, err := strconv.ParseUint(filepath.Base(partition), 10, 64); err == nil { - func() { - <-r.partRateTicker.C - r.concurrencySem <- struct{}{} - r.replicationCountIncrement <- 1 - j := &job{objPath: objPath, partition: filepath.Base(partition), dev: dev} - nodes, handoff := r.Ring.GetJobNodes(partitioni, j.dev.Id) - partStart := time.Now() - defer func() { - <-r.concurrencySem - r.partitionTimesAdd <- float64(time.Since(partStart)) / float64(time.Second) + + if partitioni, err := strconv.ParseUint(filepath.Base(partition), 10, 64); err == nil { + func() { + <-r.partRateTicker.C + r.concurrencySem <- struct{}{} + r.deviceProgressIncr <- deviceProgress{ + dev: dev, + PartitionsDone: 1, + } + j := &job{objPath: objPath, partition: filepath.Base(partition), dev: dev} + nodes, handoff := r.Ring.GetJobNodes(partitioni, j.dev.Id) + defer func() { + <-r.concurrencySem + }() + partitionsProcessed += 1 + if handoff { + r.replicateHandoff(j, nodes) + } else { + r.replicateLocal(j, nodes, r.Ring.GetMoreNodes(partitioni)) + } }() - if handoff { - r.replicateHandoff(j, nodes) - } else { - r.replicateLocal(j, nodes, r.Ring.GetMoreNodes(partitioni)) - } - }() + } + } + if partitionsProcessed >= numPartitions { + r.deviceProgressIncr <- deviceProgress{ + dev: dev, + FullReplicateCount: 1, + } + lastPassDuration = time.Since(passStartTime) + } + if r.once { + break } } } @@ -557,22 +617,75 @@ func (r *Replicator) replicateDevice(dev *hummingbird.Device) { func (r *Replicator) statsReporter(c <-chan time.Time) { for { select { - case dt := <-r.dataTransferAdd: - r.dataTransferred += dt - r.filesTransferred += 1 - case jobs := <-r.jobCountIncrement: - r.jobCount += jobs - case replicates := <-r.replicationCountIncrement: - r.replicationCount += replicates - case partitionTime := <-r.partitionTimesAdd: - r.partitionTimes = append(r.partitionTimes, partitionTime) - case now, ok := <-c: + case dp := <-r.deviceProgressPassInit: + curDp, ok := r.deviceProgressMap[dp.dev.Device] + if !ok { + curDp = &deviceProgress{ + dev: dp.dev, + StartDate: time.Now(), + LastUpdate: time.Now(), + } + r.deviceProgressMap[dp.dev.Device] = curDp + } + + curDp.StartDate = time.Now() + curDp.LastUpdate = time.Now() + curDp.PartitionsDone = 0 + curDp.PartitionsTotal = dp.PartitionsTotal + 
+			curDp.FilesSent = 0
+			curDp.BytesSent = 0
+			curDp.PriorityRepsDone = 0
+			curDp.LastPassDuration = dp.LastPassDuration
+			if dp.LastPassDuration > 0 {
+				curDp.LastPassUpdate = time.Now()
+			}
+		case dp := <-r.deviceProgressIncr:
+			if curDp, ok := r.deviceProgressMap[dp.dev.Device]; !ok {
+				r.LogError("Trying to increment progress and not present: %s", dp.dev.Device)
+			} else {
+				curDp.LastUpdate = time.Now()
+				curDp.PartitionsDone += dp.PartitionsDone
+				curDp.FilesSent += dp.FilesSent
+				curDp.BytesSent += dp.BytesSent
+				curDp.PriorityRepsDone += dp.PriorityRepsDone
+				curDp.FullReplicateCount += dp.FullReplicateCount
+				curDp.CancelCount += dp.CancelCount
+			}
+		case _, ok := <-c:
 			if !ok {
 				return
 			}
-			if r.replicationCount > 0 {
-				elapsed := float64(now.Sub(r.startTime)) / float64(time.Second)
-				remaining := time.Duration(float64(now.Sub(r.startTime))/(float64(r.replicationCount)/float64(r.jobCount))) - now.Sub(r.startTime)
+			var totalParts uint64
+			var doneParts uint64
+			var bytesProcessed uint64
+			var filesProcessed uint64
+			var processingDuration time.Duration
+			allHaveCompleted := true
+			var totalDuration time.Duration
+			var maxLastPassUpdate time.Time
+
+			for _, dp := range r.deviceProgressMap {
+				if time.Since(dp.LastUpdate) > ReplicateDeviceTimeout {
+					r.restartReplicateDevice(dp.dev)
+					continue
+				}
+				totalParts += dp.PartitionsTotal
+				doneParts += dp.PartitionsDone
+				bytesProcessed += dp.BytesSent
+				filesProcessed += dp.FilesSent
+				processingDuration += dp.LastUpdate.Sub(dp.StartDate)
+
+				allHaveCompleted = allHaveCompleted && (dp.LastPassDuration > 0)
+				totalDuration += dp.LastUpdate.Sub(dp.StartDate) // add this device's time, not the running sum
+				if maxLastPassUpdate.Before(dp.LastPassUpdate) {
+					maxLastPassUpdate = dp.LastPassUpdate
+				}
+			}
+
+			if doneParts > 0 {
+				processingNsecs := float64(processingDuration.Nanoseconds())
+				partsPerNsecond := float64(doneParts) / processingNsecs
+				remaining := time.Duration(float64(totalParts-doneParts) / partsPerNsecond)
 				var remainingStr string
 				if remaining >= time.Hour {
 					remainingStr = fmt.Sprintf("%.0fh", remaining.Hours())
@@ -581,70 +694,42 @@ func (r *Replicator) statsReporter(c <-chan time.Time) {
 				} else {
 					remainingStr = fmt.Sprintf("%.0fs", remaining.Seconds())
 				}
-				r.LogInfo("%d/%d (%.2f%%) partitions replicated in %.2fs (%.2f/sec, %v remaining)",
-					r.replicationCount, r.jobCount, float64(100*r.replicationCount)/float64(r.jobCount),
-					elapsed, float64(r.replicationCount)/elapsed, remainingStr)
+				r.LogInfo("%d/%d (%.2f%%) partitions replicated in %.2f worker seconds (%.2f/sec, %v remaining)",
+					doneParts, totalParts, float64(100*doneParts)/float64(totalParts),
+					processingNsecs/float64(time.Second), partsPerNsecond*float64(time.Second), remainingStr)
 			}
-			if len(r.partitionTimes) > 0 {
-				r.partitionTimes.Sort()
-				r.LogInfo("Partition times: max %.4fs, min %.4fs, med %.4fs",
-					r.partitionTimes[len(r.partitionTimes)-1], r.partitionTimes[0],
-					r.partitionTimes[len(r.partitionTimes)/2])
-			}
-			if r.dataTransferred > 0 {
-				elapsed := float64(now.Sub(r.startTime)) / float64(time.Second)
-				r.LogInfo("Data synced: %d (%.2f kbps), files synced: %d",
-					r.dataTransferred, ((float64(r.dataTransferred)/1024.0)*8.0)/elapsed, r.filesTransferred)
+
+			if allHaveCompleted {
+				// this is a little lame - I'd rather just drop this completely
+				hummingbird.DumpReconCache(r.reconCachePath, "object",
+					map[string]interface{}{
+						"object_replication_time": float64(totalDuration) / float64(len(r.deviceProgressMap)) / float64(time.Second),
+						"object_replication_last": float64(maxLastPassUpdate.UnixNano()) / float64(time.Second),
+					})
+			}
 		}
 	}
 }
 
-// Run replication passes of the whole server until c is closed.
-func (r *Replicator) run(c <-chan time.Time) {
-	for _ = range c {
-		r.partitionTimes = nil
-		r.jobCount = 0
-		r.replicationCount = 0
-		r.dataTransferred = 0
-		r.filesTransferred = 0
-		r.startTime = time.Now()
-		statsTicker := time.NewTicker(StatsReportInterval)
-		go r.statsReporter(statsTicker.C)
+// Run replication passes for each device on the whole server.
+func (r *Replicator) run() {
+	statsTicker := time.NewTicker(StatsReportInterval)
+	go r.statsReporter(statsTicker.C)
 
-		r.partRateTicker = time.NewTicker(r.timePerPart)
-		r.concurrencySem = make(chan struct{}, r.concurrency)
-		localDevices, err := r.Ring.LocalDevices(r.port)
-		if err != nil {
-			r.LogError("Error getting local devices: %v", err)
-			continue
-		}
-		for _, dev := range localDevices {
-			if len(r.devices) > 0 {
-				found := false
-				for _, d := range r.devices {
-					if dev.Device == d {
-						found = true
-						break
-					}
-				}
-				if !found {
-					continue
-				}
-			}
-			r.devGroup.Add(1)
-			go r.replicateDevice(dev)
-		}
-		r.devGroup.Wait()
-		r.partRateTicker.Stop()
-		statsTicker.Stop()
-		r.statsReporter(OneTimeChan())
-		hummingbird.DumpReconCache(r.reconCachePath, "object",
-			map[string]interface{}{
-				"object_replication_time": float64(time.Since(r.startTime)) / float64(time.Second),
-				"object_replication_last": float64(time.Now().UnixNano()) / float64(time.Second),
-			})
+	r.partRateTicker = time.NewTicker(r.timePerPart)
+	r.concurrencySem = make(chan struct{}, r.concurrency)
+	localDevices, err := r.Ring.LocalDevices(r.port)
+	if err != nil {
+		r.LogError("Error getting local devices: %v", err)
+		return
 	}
+	for _, dev := range localDevices {
+		if _, ok := r.devices[dev.Device]; ok || len(r.devices) == 0 {
+			r.restartReplicateDevice(dev)
+		}
+	}
+	r.devGroup.Wait()
 }
 
 // processPriorityJobs runs any pending priority jobs given the device's id
@@ -652,21 +737,22 @@ func (r *Replicator) processPriorityJobs(id int) {
 	for {
 		select {
 		case pri := <-r.getPriRepChan(id):
-			r.jobCountIncrement <- 1
+			r.deviceProgressIncr <- deviceProgress{
+				dev:              pri.FromDevice,
+				PriorityRepsDone: 1,
+			}
+
 			func() {
 				<-r.partRateTicker.C
 				r.concurrencySem <- struct{}{}
-				r.replicationCountIncrement <- 1
 				j := &job{
 					dev:       pri.FromDevice,
 					partition: strconv.FormatUint(pri.Partition, 10),
 					objPath:   filepath.Join(r.driveRoot, pri.FromDevice.Device, "objects"),
 				}
 				_, handoff := r.Ring.GetJobNodes(pri.Partition, pri.FromDevice.Id)
-				partStart := time.Now()
 				defer func() {
 					<-r.concurrencySem
-					r.partitionTimesAdd <- float64(time.Since(partStart)) / float64(time.Second)
 				}()
 				toDevicesArr := make([]string, len(pri.ToDevices))
 				for i, s := range pri.ToDevices {
@@ -699,6 +785,25 @@ func (r *Replicator) getPriRepChan(id int) chan PriorityRepJob {
 	return r.priRepChans[id]
 }
 
+// ProgressReportHandler handles HTTP requests for current replication progress
+func (r *Replicator) ProgressReportHandler(w http.ResponseWriter, req *http.Request) {
+	_, err := ioutil.ReadAll(req.Body)
+	if err != nil {
+		w.WriteHeader(http.StatusInternalServerError)
+		w.Write([]byte(err.Error()))
+		return
+	}
+	data, err := json.Marshal(r.deviceProgressMap)
+	if err != nil {
+		r.LogError("Error Marshaling device progress: %v", err)
+		w.WriteHeader(http.StatusInternalServerError)
+		w.Write([]byte(err.Error()))
+		return
+	}
+	w.WriteHeader(http.StatusOK)
+	w.Write(data)
+}
+
 // priorityRepHandler handles HTTP requests for priority replication jobs.
 func (r *Replicator) priorityRepHandler(w http.ResponseWriter, req *http.Request) {
 	var pri PriorityRepJob
@@ -733,6 +840,7 @@ func (r *Replicator) priorityRepHandler(w http.ResponseWriter, req *http.Request
 func (r *Replicator) startWebServer() {
 	mux := http.NewServeMux()
 	mux.HandleFunc("/priorityrep", r.priorityRepHandler)
+	mux.HandleFunc("/progress", r.ProgressReportHandler)
 	mux.Handle("/debug/", http.DefaultServeMux)
 	for {
 		if sock, err := hummingbird.RetryListen(r.bindIp, r.bindPort); err != nil {
@@ -743,15 +851,16 @@ func (r *Replicator) startWebServer() {
 	}
 }
 
-// Run a single replication pass.
+// Run a single replication pass. (NOTE: we will probably get rid of this because of priorityRepl)
 func (r *Replicator) Run() {
-	r.run(OneTimeChan())
+	r.once = true
+	r.run()
 }
 
 // Run replication passes in a loop forever.
 func (r *Replicator) RunForever() {
 	go r.startWebServer()
-	r.run(time.Tick(RunForeverInterval))
+	r.run()
 }
 
 func NewReplicator(serverconf hummingbird.Config, flags *flag.FlagSet) (hummingbird.Daemon, error) {
@@ -760,11 +869,14 @@ func NewReplicator(serverconf hummingbird.Config, flags *flag.FlagSet) (hummingb
 	}
 
 	replicator := &Replicator{
-		partitionTimesAdd:         make(chan float64),
-		replicationCountIncrement: make(chan uint64),
-		jobCountIncrement:         make(chan uint64),
-		dataTransferAdd:           make(chan uint64),
-		priRepChans:               make(map[int]chan PriorityRepJob),
+		priRepChans:            make(map[int]chan PriorityRepJob),
+		deviceProgressMap:      make(map[string]*deviceProgress),
+		deviceProgressPassInit: make(chan deviceProgress),
+		deviceProgressIncr:     make(chan deviceProgress),
+		devices:                make(map[string]bool),
+		partitions:             make(map[string]bool),
+		cancelers:              make(map[string]chan struct{}),
+		once:                   false,
 	}
 	hashPathPrefix, hashPathSuffix, err := hummingbird.GetHashPrefixAndSuffix()
 	if err != nil {
@@ -791,13 +903,17 @@ func NewReplicator(serverconf hummingbird.Config, flags *flag.FlagSet) (hummingb
 	devices_flag := flags.Lookup("devices")
 	if devices_flag != nil {
 		if devices := devices_flag.Value.(flag.Getter).Get().(string); len(devices) > 0 {
-			replicator.devices = strings.Split(devices, ",")
+			for _, devName := range strings.Split(devices, ",") {
+				replicator.devices[strings.TrimSpace(devName)] = true
+			}
 		}
 	}
 	partitions_flag := flags.Lookup("partitions")
 	if partitions_flag != nil {
 		if partitions := partitions_flag.Value.(flag.Getter).Get().(string); len(partitions) > 0 {
-			replicator.partitions = strings.Split(partitions, ",")
+			for _, part := range strings.Split(partitions, ",") {
+				replicator.partitions[strings.TrimSpace(part)] = true
+			}
 		}
 	}
 	if !replicator.quorumDelete {
diff --git a/go/objectserver/replicator_test.go b/go/objectserver/replicator_test.go
index 574806701d..3bb7abd650 100644
--- a/go/objectserver/replicator_test.go
+++ b/go/objectserver/replicator_test.go
@@ -28,6 +28,7 @@ import (
 	"os"
 	"path/filepath"
 	"strconv"
+	"strings"
 	"testing"
 	"time"
 
@@ -146,7 +147,7 @@ func TestCleanTemp(t *testing.T) {
 	assert.False(t, hummingbird.Exists(filepath.Join(driveRoot, "sda", "tmp", "oldfile")))
 }
 
-func TestReplicatorReportStats(t *testing.T) {
+func TestReplicatorReportStatsNotSetup(t *testing.T) {
 	saved := &replicationLogSaver{}
 	replicator := makeReplicator("devices", os.TempDir(), "ms_per_part", "1", "concurrency", "3")
 	replicator.logger = saved
@@ -157,25 +158,51 @@ func TestReplicatorReportStats(t *testing.T) {
 		replicator.statsReporter(c)
 		done <- true
 	}()
-	replicator.jobCountIncrement <- 100
-	replicator.replicationCountIncrement <- 50
-	replicator.partitionTimesAdd <- 10.0
-	replicator.partitionTimesAdd <- 20.0
-	replicator.partitionTimesAdd <- 15.0
+	replicator.deviceProgressIncr <- deviceProgress{
+		dev:             &hummingbird.Device{Device: "sda"},
+		PartitionsTotal: 12}
+
 	c <- t
 	close(c)
 	<-done
 	return saved.logged[len(saved.logged)-1]
 }
-	var remaining int
-	var elapsed, rate float64
-	replicator.startTime = time.Now()
 	reportStats(time.Now().Add(100 * time.Second))
-	cnt, err := fmt.Sscanf(saved.logged[0], "50/100 (50.00%%) partitions replicated in %fs (%f/sec, %dm remaining)", &elapsed, &rate, &remaining)
-	assert.Nil(t, err)
-	assert.Equal(t, 3, cnt)
-	assert.Equal(t, 2, remaining)
-	assert.Equal(t, saved.logged[1], "Partition times: max 20.0000s, min 10.0000s, med 15.0000s")
+	assert.Equal(t, "Trying to increment progress and not present: sda", saved.logged[0])
+}
+
+func TestReplicatorReportStats(t *testing.T) {
+	saved := &replicationLogSaver{}
+	replicator := makeReplicator("devices", os.TempDir(), "ms_per_part", "1", "concurrency", "3")
+	replicator.logger = saved
+
+	replicator.deviceProgressMap["sda"] = &deviceProgress{
+		dev: &hummingbird.Device{Device: "sda"}}
+	replicator.deviceProgressMap["sdb"] = &deviceProgress{
+		dev: &hummingbird.Device{Device: "sdb"}}
+
+	reportStats := func(t time.Time) string {
+		c := make(chan time.Time)
+		done := make(chan bool)
+		go func() {
+			replicator.statsReporter(c)
+			done <- true
+		}()
+		replicator.deviceProgressPassInit <- deviceProgress{
+			dev:             &hummingbird.Device{Device: "sda"},
+			PartitionsTotal: 10}
+		replicator.deviceProgressIncr <- deviceProgress{
+			dev:            &hummingbird.Device{Device: "sda"},
+			PartitionsDone: 10}
+
+		c <- t
+		close(c)
+		<-done
+		return saved.logged[len(saved.logged)-1]
+	}
+	reportStats(time.Now().Add(100 * time.Second))
+	assert.Equal(t, replicator.deviceProgressMap["sda"].PartitionsDone, uint64(10))
+	assert.NotEqual(t, strings.Index(saved.logged[0], "10/10 (100.00%) partitions replicated in"), -1)
 }
 
 type FakeLocalRing struct {
@@ -513,3 +540,97 @@ func TestPriorityRepHandler404(t *testing.T) {
 	replicator.priorityRepHandler(w, req)
 	require.EqualValues(t, 404, w.Code)
 }
+
+func TestRestartDevice(t *testing.T) {
+	ts, err := makeObjectServer()
+	assert.Nil(t, err)
+	defer ts.Close()
+
+	ts2, err := makeObjectServer()
+	assert.Nil(t, err)
+	defer ts2.Close()
+
+	req, err := http.NewRequest("PUT", fmt.Sprintf("http://%s:%d/sda/0/a/c/o", ts.host, ts.port),
+		bytes.NewBuffer([]byte("ABCDEFGHIJKLMNOPQRSTUVWXYZ")))
+	assert.Nil(t, err)
+	req.Header.Set("Content-Type", "application/octet-stream")
+	req.Header.Set("Content-Length", "26")
+	req.Header.Set("X-Timestamp", hummingbird.GetTimestamp())
+	resp, err := http.DefaultClient.Do(req)
+	require.Nil(t, err)
+	require.Equal(t, 201, resp.StatusCode)
+
+	req, err = http.NewRequest("PUT", fmt.Sprintf("http://%s:%d/sda/1/a/c/o2", ts.host, ts.port),
+		bytes.NewBuffer([]byte("ABCDEFGHIJKLMNOPQRSTUVWXYZ")))
+	assert.Nil(t, err)
+	req.Header.Set("Content-Type", "application/octet-stream")
+	req.Header.Set("Content-Length", "26")
+	req.Header.Set("X-Timestamp", hummingbird.GetTimestamp())
+	resp, err = http.DefaultClient.Do(req)
+	require.Nil(t, err)
+	require.Equal(t, 201, resp.StatusCode)
+
+	ldev := &hummingbird.Device{ReplicationIp: ts.host, ReplicationPort: ts.port, Device: "sda"}
+	rdev := &hummingbird.Device{ReplicationIp: ts2.host, ReplicationPort: ts2.port, Device: "sda"}
+	dp := &deviceProgress{
+		dev:        ldev,
+		StartDate:  time.Now(),
+		LastUpdate: time.Now(),
+	}
+
+	saved := &replicationLogSaver{}
+	repl := makeReplicator()
+	repl.Ring = &FakeRepRing2{ldev: ldev, rdev: rdev}
&FakeRepRing2{ldev: ldev, rdev: rdev} + repl.logger = saved + + // set stuff up + repl.driveRoot = ts.objServer.driveRoot + myTicker := make(chan time.Time) + repl.partRateTicker = time.NewTicker(repl.timePerPart) + repl.partRateTicker.C = myTicker + repl.concurrencySem = make(chan struct{}, 5) + repl.deviceProgressMap["sda"] = dp + + repl.restartReplicateDevice(ldev) + cancelChan := repl.cancelers["sda"] + // precancel the run + delete(repl.cancelers, "sda") + close(cancelChan) + //start replication for loop + statsDp := <-repl.deviceProgressPassInit + assert.Equal(t, uint64(2), statsDp.PartitionsTotal) + // but got canceled + statsDp = <-repl.deviceProgressIncr + assert.Equal(t, uint64(1), statsDp.CancelCount) + // start up everything again + repl.restartReplicateDevice(ldev) + //start replication for loop again + <-repl.deviceProgressPassInit + // 1st partition process + myTicker <- time.Now() + statsDp = <-repl.deviceProgressIncr + assert.Equal(t, uint64(1), statsDp.PartitionsDone) + // syncing file + statsDp = <-repl.deviceProgressIncr + assert.Equal(t, uint64(1), statsDp.FilesSent) + + // 2nd partition process + myTicker <- time.Now() + statsDp = <-repl.deviceProgressIncr + assert.Equal(t, uint64(1), statsDp.PartitionsDone) + statsDp = <-repl.deviceProgressIncr + assert.Equal(t, uint64(1), statsDp.FilesSent) + // 2nd partition was processed so cancel next run + cancelChan = repl.cancelers["sda"] + delete(repl.cancelers, "sda") + close(cancelChan) + // check that full replicate was tracked + statsDp = <-repl.deviceProgressIncr + assert.Equal(t, uint64(1), statsDp.FullReplicateCount) + // starting final run + statsDp = <-repl.deviceProgressPassInit + assert.Equal(t, uint64(2), statsDp.PartitionsTotal) + // but it got canceled so returning + statsDp = <-repl.deviceProgressIncr + assert.Equal(t, uint64(1), statsDp.CancelCount) +}