go: have replicateDevice goroutine loop per device

This prevents a slow disk from slowing down replication
on the rest of the server.
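
A minimal sketch of the per-device pattern this change introduces: each device gets its own goroutine plus a cancel channel, so one slow disk only delays itself. The types and names below are simplified stand-ins for the real ones in the diff (restartReplicateDevice/replicateDevice), not the actual implementation.

package main

import (
	"fmt"
	"sync"
	"time"
)

type Device struct{ Name string }

type Replicator struct {
	devGroup  sync.WaitGroup
	cancelers map[string]chan struct{}
}

// restartReplicateDevice closes any existing canceler for the device and
// starts a fresh goroutine, mirroring the pattern used in the diff.
func (r *Replicator) restartReplicateDevice(dev Device) {
	if c, ok := r.cancelers[dev.Name]; ok {
		close(c)
	}
	r.cancelers[dev.Name] = make(chan struct{})
	r.devGroup.Add(1)
	go r.replicateDevice(dev, r.cancelers[dev.Name])
}

func (r *Replicator) replicateDevice(dev Device, canceler chan struct{}) {
	defer r.devGroup.Done()
	for {
		select {
		case <-canceler:
			fmt.Println("canceled:", dev.Name)
			return
		default:
		}
		// ... replicate this device's partitions; a slow disk only delays itself ...
		fmt.Println("pass complete on", dev.Name)
		time.Sleep(10 * time.Millisecond) // stand-in for the pass interval
		return                            // a real loop would keep running passes
	}
}

func main() {
	r := &Replicator{cancelers: map[string]chan struct{}{}}
	for _, d := range []Device{{"sda"}, {"sdb"}} {
		r.restartReplicateDevice(d)
	}
	r.devGroup.Wait()
}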

This also changes the meaning of the replicator recon report. Since
we no longer wait for the replicator to complete a pass over the whole
server, object_replication_time becomes the average of the
replication times of all the devices. It won't be set until
there has been a complete pass on every device. This is a
little strange, but I'm leaving it there for some backwards
compatibility.
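
A rough sketch of how that averaged object_replication_time can be derived from per-device pass durations, reported only once every device has completed at least one full pass. The helper name and map layout are assumptions for illustration; the real computation lives in statsReporter in the diff.

package main

import (
	"fmt"
	"time"
)

// averageReplicationTime returns the mean of the per-device pass durations in
// seconds, and false if any device has not yet completed a full pass.
func averageReplicationTime(lastPass map[string]time.Duration) (float64, bool) {
	if len(lastPass) == 0 {
		return 0, false
	}
	var total time.Duration
	for _, d := range lastPass {
		if d <= 0 { // a device with no completed pass blocks the report
			return 0, false
		}
		total += d
	}
	return float64(total) / float64(len(lastPass)) / float64(time.Second), true
}

func main() {
	passes := map[string]time.Duration{
		"sda": 90 * time.Minute,
		"sdb": 30 * time.Minute,
	}
	if avg, ok := averageReplicationTime(passes); ok {
		fmt.Printf("object_replication_time: %.2f\n", avg) // 3600.00
	}
}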

The previous whole-server replication times were a little misleading when
most of the drives were done and replication was still running on a few slow drives.
The new model tracks the replication stats per drive and, instead of
reporting them in recon, exposes a new API call on the replicator's
HTTP server at /progress. I could also add these per-drive stats
to recon, but I'll leave that as a further patch if somebody
wants it. I'm going to try to use the /progress call to get what I need.
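
A hedged sketch of polling the new /progress endpoint from a client. The struct below lists only a few of the exported deviceProgress fields from the diff, and the host/port in the URL are placeholders for the replicator's bind address.

package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"time"
)

type deviceProgress struct {
	PartitionsDone   uint64
	PartitionsTotal  uint64
	LastUpdate       time.Time
	LastPassDuration time.Duration
}

func main() {
	// replace with the replicator's actual bind ip/port
	resp, err := http.Get("http://localhost:6500/progress")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	progress := map[string]deviceProgress{}
	if err := json.NewDecoder(resp.Body).Decode(&progress); err != nil {
		panic(err)
	}
	for dev, dp := range progress {
		fmt.Printf("%s: %d/%d partitions, last update %s ago\n",
			dev, dp.PartitionsDone, dp.PartitionsTotal,
			time.Since(dp.LastUpdate).Round(time.Second))
	}
}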

The object replicator itself will monitor the progress of the
per-device replicators and restart their goroutines if one of them
has not reported progress for 4 hours.
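
A simplified sketch of that watchdog behavior: on each stats tick, any device whose progress has not been updated within ReplicateDeviceTimeout gets its replication goroutine restarted. Field and function names here mirror the diff loosely; the surrounding types are stubs for illustration.

package main

import (
	"fmt"
	"time"
)

var ReplicateDeviceTimeout = 4 * time.Hour

type deviceProgress struct {
	Device     string
	LastUpdate time.Time
}

// checkStalledDevices restarts replication for any device that has gone
// silent for longer than ReplicateDeviceTimeout.
func checkStalledDevices(progress map[string]*deviceProgress, restart func(string)) {
	for name, dp := range progress {
		if time.Since(dp.LastUpdate) > ReplicateDeviceTimeout {
			restart(name) // re-launch the per-device goroutine
		}
	}
}

func main() {
	progress := map[string]*deviceProgress{
		"sda": {Device: "sda", LastUpdate: time.Now()},
		"sdb": {Device: "sdb", LastUpdate: time.Now().Add(-5 * time.Hour)},
	}
	checkStalledDevices(progress, func(dev string) {
		fmt.Println("restarting replicateDevice for", dev) // sdb only
	})
}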

Change-Id: Ifa5defbe684329979a533b558a57c38ac029ada1
David Goetz
2016-04-12 13:33:33 -05:00
parent 20418501b6
commit 9be450cfba
2 changed files with 400 additions and 163 deletions

View File

@@ -27,7 +27,6 @@ import (
"os"
"path/filepath"
"runtime/debug"
"sort"
"strconv"
"strings"
"sync"
@@ -36,10 +35,9 @@ import (
"github.com/openstack/swift/go/hummingbird"
)
var ReplicationSessionTimeout = 60 * time.Second
var RunForeverInterval = 30 * time.Second
var StatsReportInterval = 300 * time.Second
var TmpEmptyTime = 24 * time.Hour
var ReplicateDeviceTimeout = 4 * time.Hour
// Encapsulates a partition for replication.
type job struct {
@@ -67,6 +65,23 @@ type PriorityRepJob struct {
ToDevices []*hummingbird.Device `json:"to_devices"`
}
type deviceProgress struct {
PartitionsDone uint64
PartitionsTotal uint64
StartDate time.Time
LastUpdate time.Time
FilesSent uint64
BytesSent uint64
PriorityRepsDone uint64
FullReplicateCount uint64
CancelCount uint64
LastPassDuration time.Duration
LastPassUpdate time.Time
dev *hummingbird.Device
}
// Object replicator daemon object
type Replicator struct {
concurrency int
@@ -83,18 +98,19 @@ type Replicator struct {
timePerPart time.Duration
quorumDelete bool
concurrencySem chan struct{}
devices []string
partitions []string
devices map[string]bool
partitions map[string]bool
priRepChans map[int]chan PriorityRepJob
priRepM sync.Mutex
reclaimAge int64
once bool
cancelers map[string]chan struct{}
/* stats accounting */
startTime time.Time
replicationCount, jobCount, dataTransferred, filesTransferred uint64
replicationCountIncrement, jobCountIncrement, dataTransferAdd chan uint64
partitionTimes sort.Float64Slice
partitionTimesAdd chan float64
deviceProgressMap map[string]*deviceProgress
deviceProgressIncr chan deviceProgress
deviceProgressPassInit chan deviceProgress
}
func (r *Replicator) LogError(format string, args ...interface{}) {
@@ -248,7 +264,7 @@ type syncFileArg struct {
dev *hummingbird.Device
}
func (r *Replicator) syncFile(objFile string, dst []*syncFileArg) (syncs int, insync int, err error) {
func (r *Replicator) syncFile(objFile string, dst []*syncFileArg, j *job) (syncs int, insync int, err error) {
var wrs []*syncFileArg
lst := strings.Split(objFile, string(os.PathSeparator))
relPath := filepath.Join(lst[len(lst)-5:]...)
@@ -312,9 +328,13 @@ func (r *Replicator) syncFile(objFile string, dst []*syncFileArg) (syncs int, in
sfa.conn.Flush()
if sfa.conn.RecvMessage(&fur) == nil {
if fur.Success {
r.dataTransferAdd <- uint64(fileSize)
syncs++
insync++
r.deviceProgressIncr <- deviceProgress{
dev: j.dev,
FilesSent: 1,
BytesSent: uint64(fileSize),
}
}
}
}
@@ -396,7 +416,7 @@ func (r *Replicator) replicateLocal(j *job, nodes []*hummingbird.Device, moreNod
toSync = append(toSync, &syncFileArg{conn: remoteConnections[dev.Id], dev: dev})
}
}
if syncs, _, err := r.syncFile(objFile, toSync); err == nil {
if syncs, _, err := r.syncFile(objFile, toSync, j); err == nil {
syncCount += syncs
} else {
r.LogError("[syncFile] %v", err)
@@ -448,7 +468,7 @@ func (r *Replicator) replicateHandoff(j *job, nodes []*hummingbird.Device) {
toSync = append(toSync, &syncFileArg{conn: remoteConnections[dev.Id], dev: dev})
}
}
if syncs, insync, err := r.syncFile(objFile, toSync); err == nil {
if syncs, insync, err := r.syncFile(objFile, toSync, j); err == nil {
syncCount += syncs
success := insync == len(nodes)
@@ -485,66 +505,106 @@ func (r *Replicator) cleanTemp(dev *hummingbird.Device) {
}
}
func (r *Replicator) replicateDevice(dev *hummingbird.Device) {
defer r.LogPanics("PANIC REPLICATING DEVICE")
// start or restart replication on a device and set up canceler
func (r *Replicator) restartReplicateDevice(dev *hummingbird.Device) {
r.devGroup.Add(1)
if canceler, ok := r.cancelers[dev.Device]; ok {
close(canceler)
}
r.cancelers[dev.Device] = make(chan struct{})
go r.replicateDevice(dev, r.cancelers[dev.Device])
}
// Run replication on given device. For normal usage do not call directly, use "restartReplicateDevice"
func (r *Replicator) replicateDevice(dev *hummingbird.Device, canceler chan struct{}) {
defer r.LogPanics(fmt.Sprintf("PANIC REPLICATING DEVICE: %s", dev.Device))
defer r.devGroup.Done()
var lastPassDuration time.Duration
for {
r.cleanTemp(dev)
passStartTime := time.Now()
r.cleanTemp(dev)
if mounted, err := hummingbird.IsMount(filepath.Join(r.driveRoot, dev.Device)); r.checkMounts && (err != nil || mounted != true) {
r.LogError("[replicateDevice] Drive not mounted: %s", dev.Device)
return
}
objPath := filepath.Join(r.driveRoot, dev.Device, "objects")
if fi, err := os.Stat(objPath); err != nil || !fi.Mode().IsDir() {
r.LogError("[replicateDevice] No objects found: %s", objPath)
return
}
partitionList, err := filepath.Glob(filepath.Join(objPath, "[0-9]*"))
if err != nil {
r.LogError("[replicateDevice] Error getting partition list: %s (%v)", objPath, err)
return
}
for i := len(partitionList) - 1; i > 0; i-- { // shuffle partition list
j := rand.Intn(i + 1)
partitionList[j], partitionList[i] = partitionList[i], partitionList[j]
}
r.jobCountIncrement <- uint64(len(partitionList))
for _, partition := range partitionList {
if hummingbird.Exists(filepath.Join(r.driveRoot, dev.Device, "lock_device")) {
if mounted, err := hummingbird.IsMount(filepath.Join(r.driveRoot, dev.Device)); r.checkMounts && (err != nil || mounted != true) {
r.LogError("[replicateDevice] Drive not mounted: %s", dev.Device)
break
}
r.processPriorityJobs(dev.Id)
if len(r.partitions) > 0 {
found := false
for _, p := range r.partitions {
if filepath.Base(partition) == p {
found = true
break
}
objPath := filepath.Join(r.driveRoot, dev.Device, "objects")
if fi, err := os.Stat(objPath); err != nil || !fi.Mode().IsDir() {
r.LogError("[replicateDevice] No objects found: %s", objPath)
break
}
partitionList, err := filepath.Glob(filepath.Join(objPath, "[0-9]*"))
if err != nil {
r.LogError("[replicateDevice] Error getting partition list: %s (%v)", objPath, err)
break
}
// if one of the breaks above triggers, the monitorer should retry after ReplicateDeviceTimeout
for i := len(partitionList) - 1; i > 0; i-- { // shuffle partition list
j := rand.Intn(i + 1)
partitionList[j], partitionList[i] = partitionList[i], partitionList[j]
}
numPartitions := uint64(len(partitionList))
partitionsProcessed := uint64(0)
r.deviceProgressPassInit <- deviceProgress{
dev: dev,
PartitionsTotal: numPartitions,
LastPassDuration: lastPassDuration,
}
for _, partition := range partitionList {
if hummingbird.Exists(filepath.Join(r.driveRoot, dev.Device, "lock_device")) {
break
}
if !found {
select {
case <-canceler:
{
r.deviceProgressIncr <- deviceProgress{
dev: dev,
CancelCount: 1,
}
r.LogError("replicateDevice canceled for device: %s", dev.Device)
return
}
default:
}
r.processPriorityJobs(dev.Id)
if _, ok := r.partitions[filepath.Base(partition)]; len(r.partitions) > 0 && !ok {
continue
}
}
if partitioni, err := strconv.ParseUint(filepath.Base(partition), 10, 64); err == nil {
func() {
<-r.partRateTicker.C
r.concurrencySem <- struct{}{}
r.replicationCountIncrement <- 1
j := &job{objPath: objPath, partition: filepath.Base(partition), dev: dev}
nodes, handoff := r.Ring.GetJobNodes(partitioni, j.dev.Id)
partStart := time.Now()
defer func() {
<-r.concurrencySem
r.partitionTimesAdd <- float64(time.Since(partStart)) / float64(time.Second)
if partitioni, err := strconv.ParseUint(filepath.Base(partition), 10, 64); err == nil {
func() {
<-r.partRateTicker.C
r.concurrencySem <- struct{}{}
r.deviceProgressIncr <- deviceProgress{
dev: dev,
PartitionsDone: 1,
}
j := &job{objPath: objPath, partition: filepath.Base(partition), dev: dev}
nodes, handoff := r.Ring.GetJobNodes(partitioni, j.dev.Id)
defer func() {
<-r.concurrencySem
}()
partitionsProcessed += 1
if handoff {
r.replicateHandoff(j, nodes)
} else {
r.replicateLocal(j, nodes, r.Ring.GetMoreNodes(partitioni))
}
}()
if handoff {
r.replicateHandoff(j, nodes)
} else {
r.replicateLocal(j, nodes, r.Ring.GetMoreNodes(partitioni))
}
}()
}
}
if partitionsProcessed >= numPartitions {
r.deviceProgressIncr <- deviceProgress{
dev: dev,
FullReplicateCount: 1,
}
lastPassDuration = time.Since(passStartTime)
}
if r.once {
break
}
}
}
@@ -553,22 +613,75 @@ func (r *Replicator) replicateDevice(dev *hummingbird.Device) {
func (r *Replicator) statsReporter(c <-chan time.Time) {
for {
select {
case dt := <-r.dataTransferAdd:
r.dataTransferred += dt
r.filesTransferred += 1
case jobs := <-r.jobCountIncrement:
r.jobCount += jobs
case replicates := <-r.replicationCountIncrement:
r.replicationCount += replicates
case partitionTime := <-r.partitionTimesAdd:
r.partitionTimes = append(r.partitionTimes, partitionTime)
case now, ok := <-c:
case dp := <-r.deviceProgressPassInit:
curDp, ok := r.deviceProgressMap[dp.dev.Device]
if !ok {
curDp = &deviceProgress{
dev: dp.dev,
StartDate: time.Now(),
LastUpdate: time.Now(),
}
r.deviceProgressMap[dp.dev.Device] = curDp
}
curDp.StartDate = time.Now()
curDp.LastUpdate = time.Now()
curDp.PartitionsDone = 0
curDp.PartitionsTotal = dp.PartitionsTotal
curDp.FilesSent = 0
curDp.BytesSent = 0
curDp.PriorityRepsDone = 0
curDp.LastPassDuration = dp.LastPassDuration
if dp.LastPassDuration > 0 {
curDp.LastPassUpdate = time.Now()
}
case dp := <-r.deviceProgressIncr:
if curDp, ok := r.deviceProgressMap[dp.dev.Device]; !ok {
r.LogError("Trying to increment progress and not present: %s", dp.dev.Device)
} else {
curDp.LastUpdate = time.Now()
curDp.PartitionsDone += dp.PartitionsDone
curDp.FilesSent += dp.FilesSent
curDp.BytesSent += dp.BytesSent
curDp.PriorityRepsDone += dp.PriorityRepsDone
curDp.FullReplicateCount += dp.FullReplicateCount
curDp.CancelCount += dp.CancelCount
}
case _, ok := <-c:
if !ok {
return
}
if r.replicationCount > 0 {
elapsed := float64(now.Sub(r.startTime)) / float64(time.Second)
remaining := time.Duration(float64(now.Sub(r.startTime))/(float64(r.replicationCount)/float64(r.jobCount))) - now.Sub(r.startTime)
var totalParts uint64
var doneParts uint64
var bytesProcessed uint64
var filesProcessed uint64
var processingDuration time.Duration
allHaveCompleted := true
var totalDuration time.Duration
var maxLastPassUpdate time.Time
for _, dp := range r.deviceProgressMap {
if time.Since(dp.LastUpdate) > ReplicateDeviceTimeout {
r.restartReplicateDevice(dp.dev)
continue
}
totalParts += dp.PartitionsTotal
doneParts += dp.PartitionsDone
bytesProcessed += dp.BytesSent
filesProcessed += dp.FilesSent
processingDuration += dp.LastUpdate.Sub(dp.StartDate)
allHaveCompleted = allHaveCompleted && (dp.LastPassDuration > 0)
totalDuration += processingDuration
if maxLastPassUpdate.Before(dp.LastPassUpdate) {
maxLastPassUpdate = dp.LastPassUpdate
}
}
if doneParts > 0 {
processingNsecs := float64(processingDuration.Nanoseconds())
partsPerNsecond := float64(doneParts) / processingNsecs
remaining := time.Duration(float64(totalParts-doneParts) / partsPerNsecond)
var remainingStr string
if remaining >= time.Hour {
remainingStr = fmt.Sprintf("%.0fh", remaining.Hours())
@@ -577,70 +690,42 @@ func (r *Replicator) statsReporter(c <-chan time.Time) {
} else {
remainingStr = fmt.Sprintf("%.0fs", remaining.Seconds())
}
r.LogInfo("%d/%d (%.2f%%) partitions replicated in %.2fs (%.2f/sec, %v remaining)",
r.replicationCount, r.jobCount, float64(100*r.replicationCount)/float64(r.jobCount),
elapsed, float64(r.replicationCount)/elapsed, remainingStr)
r.LogInfo("%d/%d (%.2f%%) partitions replicated in %.2f worker seconds (%.2f/sec, %v remaining)",
doneParts, totalParts, float64(100*doneParts)/float64(totalParts),
processingNsecs/float64(time.Second), partsPerNsecond*float64(time.Second), remainingStr)
}
if len(r.partitionTimes) > 0 {
r.partitionTimes.Sort()
r.LogInfo("Partition times: max %.4fs, min %.4fs, med %.4fs",
r.partitionTimes[len(r.partitionTimes)-1], r.partitionTimes[0],
r.partitionTimes[len(r.partitionTimes)/2])
}
if r.dataTransferred > 0 {
elapsed := float64(now.Sub(r.startTime)) / float64(time.Second)
r.LogInfo("Data synced: %d (%.2f kbps), files synced: %d",
r.dataTransferred, ((float64(r.dataTransferred)/1024.0)*8.0)/elapsed, r.filesTransferred)
if allHaveCompleted {
// this is a little lame- i'd rather just drop this completely
hummingbird.DumpReconCache(r.reconCachePath, "object",
map[string]interface{}{
"object_replication_time": float64(totalDuration) / float64(len(r.deviceProgressMap)) / float64(time.Second),
"object_replication_last": float64(maxLastPassUpdate.UnixNano()) / float64(time.Second),
})
}
}
}
}
// Run replication passes of the whole server until c is closed.
func (r *Replicator) run(c <-chan time.Time) {
for _ = range c {
r.partitionTimes = nil
r.jobCount = 0
r.replicationCount = 0
r.dataTransferred = 0
r.filesTransferred = 0
r.startTime = time.Now()
statsTicker := time.NewTicker(StatsReportInterval)
go r.statsReporter(statsTicker.C)
// Run replication passes for each device on the whole server.
func (r *Replicator) run() {
statsTicker := time.NewTicker(StatsReportInterval)
go r.statsReporter(statsTicker.C)
r.partRateTicker = time.NewTicker(r.timePerPart)
r.concurrencySem = make(chan struct{}, r.concurrency)
localDevices, err := r.Ring.LocalDevices(r.port)
if err != nil {
r.LogError("Error getting local devices: %v", err)
continue
}
for _, dev := range localDevices {
if len(r.devices) > 0 {
found := false
for _, d := range r.devices {
if dev.Device == d {
found = true
break
}
}
if !found {
continue
}
}
r.devGroup.Add(1)
go r.replicateDevice(dev)
}
r.devGroup.Wait()
r.partRateTicker.Stop()
statsTicker.Stop()
r.statsReporter(OneTimeChan())
hummingbird.DumpReconCache(r.reconCachePath, "object",
map[string]interface{}{
"object_replication_time": float64(time.Since(r.startTime)) / float64(time.Second),
"object_replication_last": float64(time.Now().UnixNano()) / float64(time.Second),
})
r.partRateTicker = time.NewTicker(r.timePerPart)
r.concurrencySem = make(chan struct{}, r.concurrency)
localDevices, err := r.Ring.LocalDevices(r.port)
if err != nil {
r.LogError("Error getting local devices: %v", err)
return
}
for _, dev := range localDevices {
if _, ok := r.devices[dev.Device]; ok || len(r.devices) == 0 {
r.restartReplicateDevice(dev)
}
}
r.devGroup.Wait()
}
// processPriorityJobs runs any pending priority jobs given the device's id
@@ -648,21 +733,22 @@ func (r *Replicator) processPriorityJobs(id int) {
for {
select {
case pri := <-r.getPriRepChan(id):
r.jobCountIncrement <- 1
r.deviceProgressIncr <- deviceProgress{
dev: pri.FromDevice,
PriorityRepsDone: 1,
}
func() {
<-r.partRateTicker.C
r.concurrencySem <- struct{}{}
r.replicationCountIncrement <- 1
j := &job{
dev: pri.FromDevice,
partition: strconv.FormatUint(pri.Partition, 10),
objPath: filepath.Join(r.driveRoot, pri.FromDevice.Device, "objects"),
}
_, handoff := r.Ring.GetJobNodes(pri.Partition, pri.FromDevice.Id)
partStart := time.Now()
defer func() {
<-r.concurrencySem
r.partitionTimesAdd <- float64(time.Since(partStart)) / float64(time.Second)
}()
toDevicesArr := make([]string, len(pri.ToDevices))
for i, s := range pri.ToDevices {
@@ -695,6 +781,27 @@ func (r *Replicator) getPriRepChan(id int) chan PriorityRepJob {
return r.priRepChans[id]
}
// ProgressReportHandler handles HTTP requests for current replication progress
func (r *Replicator) ProgressReportHandler(w http.ResponseWriter, req *http.Request) {
_, err := ioutil.ReadAll(req.Body)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte(err.Error()))
return
}
data, err := json.Marshal(r.deviceProgressMap)
if err != nil {
r.LogError("Error Marshaling device progress: ", err)
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte(err.Error()))
return
}
w.WriteHeader(http.StatusOK)
w.Write(data)
return
}
// priorityRepHandler handles HTTP requests for priority replications jobs.
func (r *Replicator) priorityRepHandler(w http.ResponseWriter, req *http.Request) {
var pri PriorityRepJob
@@ -729,6 +836,7 @@ func (r *Replicator) priorityRepHandler(w http.ResponseWriter, req *http.Request
func (r *Replicator) startWebServer() {
mux := http.NewServeMux()
mux.HandleFunc("/priorityrep", r.priorityRepHandler)
mux.HandleFunc("/progress", r.ProgressReportHandler)
mux.Handle("/debug/", http.DefaultServeMux)
for {
if sock, err := hummingbird.RetryListen(r.bindIp, r.bindPort); err != nil {
@@ -739,24 +847,28 @@ func (r *Replicator) startWebServer() {
}
}
// Run a single replication pass.
// Run a single replication pass. (NOTE: we will prob get rid of this because of priorityRepl)
func (r *Replicator) Run() {
r.run(OneTimeChan())
r.once = true
r.run()
}
// Run replication passes in a loop until forever.
func (r *Replicator) RunForever() {
go r.startWebServer()
r.run(time.Tick(RunForeverInterval))
r.run()
}
func NewReplicator(conf string, flags *flag.FlagSet) (hummingbird.Daemon, error) {
replicator := &Replicator{
partitionTimesAdd: make(chan float64),
replicationCountIncrement: make(chan uint64),
jobCountIncrement: make(chan uint64),
dataTransferAdd: make(chan uint64),
priRepChans: make(map[int]chan PriorityRepJob),
priRepChans: make(map[int]chan PriorityRepJob),
deviceProgressMap: make(map[string]*deviceProgress),
deviceProgressPassInit: make(chan deviceProgress),
deviceProgressIncr: make(chan deviceProgress),
devices: make(map[string]bool),
partitions: make(map[string]bool),
cancelers: make(map[string]chan struct{}),
once: false,
}
hashPathPrefix, hashPathSuffix, err := hummingbird.GetHashPrefixAndSuffix()
if err != nil {
@@ -787,13 +899,17 @@ func NewReplicator(conf string, flags *flag.FlagSet) (hummingbird.Daemon, error)
devices_flag := flags.Lookup("devices")
if devices_flag != nil {
if devices := devices_flag.Value.(flag.Getter).Get().(string); len(devices) > 0 {
replicator.devices = strings.Split(devices, ",")
for _, devName := range strings.Split(devices, ",") {
replicator.devices[strings.TrimSpace(devName)] = true
}
}
}
partitions_flag := flags.Lookup("partitions")
if partitions_flag != nil {
if partitions := partitions_flag.Value.(flag.Getter).Get().(string); len(partitions) > 0 {
replicator.partitions = strings.Split(partitions, ",")
for _, part := range strings.Split(partitions, ",") {
replicator.partitions[strings.TrimSpace(part)] = true
}
}
}
if !replicator.quorumDelete {

View File

@@ -28,6 +28,7 @@ import (
"os"
"path/filepath"
"strconv"
"strings"
"testing"
"time"
@@ -148,7 +149,7 @@ func TestCleanTemp(t *testing.T) {
assert.False(t, hummingbird.Exists(filepath.Join(driveRoot, "sda", "tmp", "oldfile")))
}
func TestReplicatorReportStats(t *testing.T) {
func TestReplicatorReportStatsNotSetup(t *testing.T) {
saved := &replicationLogSaver{}
replicator := makeReplicator("devices", os.TempDir(), "ms_per_part", "1", "concurrency", "3")
replicator.logger = saved
@@ -159,25 +160,51 @@ func TestReplicatorReportStats(t *testing.T) {
replicator.statsReporter(c)
done <- true
}()
replicator.jobCountIncrement <- 100
replicator.replicationCountIncrement <- 50
replicator.partitionTimesAdd <- 10.0
replicator.partitionTimesAdd <- 20.0
replicator.partitionTimesAdd <- 15.0
replicator.deviceProgressIncr <- deviceProgress{
dev: &hummingbird.Device{Device: "sda"},
PartitionsTotal: 12}
c <- t
close(c)
<-done
return saved.logged[len(saved.logged)-1]
}
var remaining int
var elapsed, rate float64
replicator.startTime = time.Now()
reportStats(time.Now().Add(100 * time.Second))
cnt, err := fmt.Sscanf(saved.logged[0], "50/100 (50.00%%) partitions replicated in %fs (%f/sec, %dm remaining)", &elapsed, &rate, &remaining)
assert.Nil(t, err)
assert.Equal(t, 3, cnt)
assert.Equal(t, 2, remaining)
assert.Equal(t, saved.logged[1], "Partition times: max 20.0000s, min 10.0000s, med 15.0000s")
assert.Equal(t, "Trying to increment progress and not present: sda", saved.logged[0])
}
func TestReplicatorReportStats(t *testing.T) {
saved := &replicationLogSaver{}
replicator := makeReplicator("devices", os.TempDir(), "ms_per_part", "1", "concurrency", "3")
replicator.logger = saved
replicator.deviceProgressMap["sda"] = &deviceProgress{
dev: &hummingbird.Device{Device: "sda"}}
replicator.deviceProgressMap["sdb"] = &deviceProgress{
dev: &hummingbird.Device{Device: "sdb"}}
reportStats := func(t time.Time) string {
c := make(chan time.Time)
done := make(chan bool)
go func() {
replicator.statsReporter(c)
done <- true
}()
replicator.deviceProgressPassInit <- deviceProgress{
dev: &hummingbird.Device{Device: "sda"},
PartitionsTotal: 10}
replicator.deviceProgressIncr <- deviceProgress{
dev: &hummingbird.Device{Device: "sda"},
PartitionsDone: 10}
c <- t
close(c)
<-done
return saved.logged[len(saved.logged)-1]
}
reportStats(time.Now().Add(100 * time.Second))
assert.Equal(t, replicator.deviceProgressMap["sda"].PartitionsDone, uint64(10))
assert.NotEqual(t, strings.Index(saved.logged[0], "10/10 (100.00%) partitions replicated in"), -1)
}
type FakeLocalRing struct {
@@ -521,3 +548,97 @@ func TestPriorityRepHandler404(t *testing.T) {
replicator.priorityRepHandler(w, req)
require.EqualValues(t, 404, w.Code)
}
func TestRestartDevice(t *testing.T) {
ts, err := makeObjectServer()
assert.Nil(t, err)
defer ts.Close()
ts2, err := makeObjectServer()
assert.Nil(t, err)
defer ts2.Close()
req, err := http.NewRequest("PUT", fmt.Sprintf("http://%s:%d/sda/0/a/c/o", ts.host, ts.port),
bytes.NewBuffer([]byte("ABCDEFGHIJKLMNOPQRSTUVWXYZ")))
assert.Nil(t, err)
req.Header.Set("Content-Type", "application/octet-stream")
req.Header.Set("Content-Length", "26")
req.Header.Set("X-Timestamp", hummingbird.GetTimestamp())
resp, err := http.DefaultClient.Do(req)
require.Nil(t, err)
require.Equal(t, 201, resp.StatusCode)
req, err = http.NewRequest("PUT", fmt.Sprintf("http://%s:%d/sda/1/a/c/o2", ts.host, ts.port),
bytes.NewBuffer([]byte("ABCDEFGHIJKLMNOPQRSTUVWXYZ")))
assert.Nil(t, err)
req.Header.Set("Content-Type", "application/octet-stream")
req.Header.Set("Content-Length", "26")
req.Header.Set("X-Timestamp", hummingbird.GetTimestamp())
resp, err = http.DefaultClient.Do(req)
require.Nil(t, err)
require.Equal(t, 201, resp.StatusCode)
ldev := &hummingbird.Device{ReplicationIp: ts.host, ReplicationPort: ts.port, Device: "sda"}
rdev := &hummingbird.Device{ReplicationIp: ts2.host, ReplicationPort: ts2.port, Device: "sda"}
dp := &deviceProgress{
dev: ldev,
StartDate: time.Now(),
LastUpdate: time.Now(),
}
saved := &replicationLogSaver{}
repl := makeReplicator()
repl.Ring = &FakeRepRing2{ldev: ldev, rdev: rdev}
repl.logger = saved
// set stuff up
repl.driveRoot = ts.objServer.driveRoot
myTicker := make(chan time.Time)
repl.partRateTicker = time.NewTicker(repl.timePerPart)
repl.partRateTicker.C = myTicker
repl.concurrencySem = make(chan struct{}, 5)
repl.deviceProgressMap["sda"] = dp
repl.restartReplicateDevice(ldev)
cancelChan := repl.cancelers["sda"]
// precancel the run
delete(repl.cancelers, "sda")
close(cancelChan)
//start replication for loop
statsDp := <-repl.deviceProgressPassInit
assert.Equal(t, uint64(2), statsDp.PartitionsTotal)
// but got canceled
statsDp = <-repl.deviceProgressIncr
assert.Equal(t, uint64(1), statsDp.CancelCount)
// start up everything again
repl.restartReplicateDevice(ldev)
//start replication for loop again
<-repl.deviceProgressPassInit
// 1st partition process
myTicker <- time.Now()
statsDp = <-repl.deviceProgressIncr
assert.Equal(t, uint64(1), statsDp.PartitionsDone)
// syncing file
statsDp = <-repl.deviceProgressIncr
assert.Equal(t, uint64(1), statsDp.FilesSent)
// 2nd partition process
myTicker <- time.Now()
statsDp = <-repl.deviceProgressIncr
assert.Equal(t, uint64(1), statsDp.PartitionsDone)
statsDp = <-repl.deviceProgressIncr
assert.Equal(t, uint64(1), statsDp.FilesSent)
// 2nd partition was processed so cancel next run
cancelChan = repl.cancelers["sda"]
delete(repl.cancelers, "sda")
close(cancelChan)
// check that full replicate was tracked
statsDp = <-repl.deviceProgressIncr
assert.Equal(t, uint64(1), statsDp.FullReplicateCount)
// starting final run
statsDp = <-repl.deviceProgressPassInit
assert.Equal(t, uint64(2), statsDp.PartitionsTotal)
// but it got canceled so returning
statsDp = <-repl.deviceProgressIncr
assert.Equal(t, uint64(1), statsDp.CancelCount)
}