Merge "go: have replicateDevice go routine loop per device" into feature/hummingbird

Authored by Jenkins on 2016-05-26 21:11:01 +00:00; committed by Gerrit Code Review.
2 changed files with 400 additions and 163 deletions

View File

@@ -27,7 +27,6 @@ import (
"os"
"path/filepath"
"runtime/debug"
"sort"
"strconv"
"strings"
"sync"
@@ -36,10 +35,9 @@ import (
"github.com/openstack/swift/go/hummingbird"
)
var ReplicationSessionTimeout = 60 * time.Second
var RunForeverInterval = 30 * time.Second
var StatsReportInterval = 300 * time.Second
var TmpEmptyTime = 24 * time.Hour
var ReplicateDeviceTimeout = 4 * time.Hour
// Encapsulates a partition for replication.
type job struct {
@@ -67,6 +65,23 @@ type PriorityRepJob struct {
ToDevices []*hummingbird.Device `json:"to_devices"`
}
type deviceProgress struct {
PartitionsDone uint64
PartitionsTotal uint64
StartDate time.Time
LastUpdate time.Time
FilesSent uint64
BytesSent uint64
PriorityRepsDone uint64
FullReplicateCount uint64
CancelCount uint64
LastPassDuration time.Duration
LastPassUpdate time.Time
dev *hummingbird.Device
}
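The deviceProgress struct doubles as both the per-device state held by the stats reporter and the increment message sent to it: worker goroutines send partially filled values over a channel, and a single goroutine owns the map and folds them in, so no mutex is needed. A minimal, self-contained sketch of that pattern (the names here are hypothetical, not the daemon's):

package main

import "fmt"

type progress struct {
    device    string
    filesSent uint64
    bytesSent uint64
}

func main() {
    incr := make(chan progress)
    done := make(chan struct{})
    totals := map[string]*progress{}
    go func() { // sole owner of the totals map
        for p := range incr {
            cur, ok := totals[p.device]
            if !ok {
                cur = &progress{device: p.device}
                totals[p.device] = cur
            }
            cur.filesSent += p.filesSent
            cur.bytesSent += p.bytesSent
        }
        close(done)
    }()
    for i := 0; i < 3; i++ { // workers report per-file increments
        incr <- progress{device: "sda", filesSent: 1, bytesSent: 1024}
    }
    close(incr)
    <-done
    fmt.Printf("sda: %d files, %d bytes\n", totals["sda"].filesSent, totals["sda"].bytesSent)
}

Because only the aggregator goroutine ever touches the map, the increments are race-free by construction.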
// Object replicator daemon object
type Replicator struct {
concurrency int
@@ -83,18 +98,19 @@ type Replicator struct {
timePerPart time.Duration
quorumDelete bool
concurrencySem chan struct{}
devices []string
partitions []string
devices map[string]bool
partitions map[string]bool
priRepChans map[int]chan PriorityRepJob
priRepM sync.Mutex
reclaimAge int64
once bool
cancelers map[string]chan struct{}
/* stats accounting */
startTime time.Time
replicationCount, jobCount, dataTransferred, filesTransferred uint64
replicationCountIncrement, jobCountIncrement, dataTransferAdd chan uint64
partitionTimes sort.Float64Slice
partitionTimesAdd chan float64
deviceProgressMap map[string]*deviceProgress
deviceProgressIncr chan deviceProgress
deviceProgressPassInit chan deviceProgress
}
func (r *Replicator) LogError(format string, args ...interface{}) {
@@ -252,7 +268,7 @@ type syncFileArg struct {
dev *hummingbird.Device
}
func (r *Replicator) syncFile(objFile string, dst []*syncFileArg) (syncs int, insync int, err error) {
func (r *Replicator) syncFile(objFile string, dst []*syncFileArg, j *job) (syncs int, insync int, err error) {
var wrs []*syncFileArg
lst := strings.Split(objFile, string(os.PathSeparator))
relPath := filepath.Join(lst[len(lst)-5:]...)
@@ -316,9 +332,13 @@ func (r *Replicator) syncFile(objFile string, dst []*syncFileArg) (syncs int, in
sfa.conn.Flush()
if sfa.conn.RecvMessage(&fur) == nil {
if fur.Success {
r.dataTransferAdd <- uint64(fileSize)
syncs++
insync++
r.deviceProgressIncr <- deviceProgress{
dev: j.dev,
FilesSent: 1,
BytesSent: uint64(fileSize),
}
}
}
}
@@ -400,7 +420,7 @@ func (r *Replicator) replicateLocal(j *job, nodes []*hummingbird.Device, moreNod
toSync = append(toSync, &syncFileArg{conn: remoteConnections[dev.Id], dev: dev})
}
}
if syncs, _, err := r.syncFile(objFile, toSync); err == nil {
if syncs, _, err := r.syncFile(objFile, toSync, j); err == nil {
syncCount += syncs
} else {
r.LogError("[syncFile] %v", err)
@@ -452,7 +472,7 @@ func (r *Replicator) replicateHandoff(j *job, nodes []*hummingbird.Device) {
toSync = append(toSync, &syncFileArg{conn: remoteConnections[dev.Id], dev: dev})
}
}
if syncs, insync, err := r.syncFile(objFile, toSync); err == nil {
if syncs, insync, err := r.syncFile(objFile, toSync, j); err == nil {
syncCount += syncs
success := insync == len(nodes)
@@ -489,66 +509,106 @@ func (r *Replicator) cleanTemp(dev *hummingbird.Device) {
}
}
func (r *Replicator) replicateDevice(dev *hummingbird.Device) {
defer r.LogPanics("PANIC REPLICATING DEVICE")
// Start or restart replication on a device and register its canceler.
func (r *Replicator) restartReplicateDevice(dev *hummingbird.Device) {
r.devGroup.Add(1)
if canceler, ok := r.cancelers[dev.Device]; ok {
close(canceler)
}
r.cancelers[dev.Device] = make(chan struct{})
go r.replicateDevice(dev, r.cancelers[dev.Device])
}
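The canceler is a plain struct{} channel: closing it broadcasts cancellation to the goroutine draining it, and a fresh channel is registered before the replacement goroutine starts, so at most one replicator runs per device. A self-contained sketch of the idiom under hypothetical names:

package main

import (
    "fmt"
    "sync"
    "time"
)

func worker(name string, cancel chan struct{}, wg *sync.WaitGroup) {
    defer wg.Done()
    for {
        select {
        case <-cancel:
            fmt.Println(name, "canceled")
            return
        default:
            time.Sleep(10 * time.Millisecond) // stand-in for one unit of work
        }
    }
}

func main() {
    var wg sync.WaitGroup
    cancelers := map[string]chan struct{}{}
    restart := func(name string) {
        if c, ok := cancelers[name]; ok {
            close(c) // stop the previous run, if any
        }
        cancelers[name] = make(chan struct{})
        wg.Add(1)
        go worker(name, cancelers[name], &wg)
    }
    restart("sda")
    restart("sda") // second call cancels the first worker
    close(cancelers["sda"])
    wg.Wait()
}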
// Run replication on the given device. For normal usage, do not call this directly; use restartReplicateDevice.
func (r *Replicator) replicateDevice(dev *hummingbird.Device, canceler chan struct{}) {
defer r.LogPanics(fmt.Sprintf("PANIC REPLICATING DEVICE: %s", dev.Device))
defer r.devGroup.Done()
var lastPassDuration time.Duration
for {
r.cleanTemp(dev)
passStartTime := time.Now()
r.cleanTemp(dev)
if mounted, err := hummingbird.IsMount(filepath.Join(r.driveRoot, dev.Device)); r.checkMounts && (err != nil || mounted != true) {
r.LogError("[replicateDevice] Drive not mounted: %s", dev.Device)
return
}
objPath := filepath.Join(r.driveRoot, dev.Device, "objects")
if fi, err := os.Stat(objPath); err != nil || !fi.Mode().IsDir() {
r.LogError("[replicateDevice] No objects found: %s", objPath)
return
}
partitionList, err := filepath.Glob(filepath.Join(objPath, "[0-9]*"))
if err != nil {
r.LogError("[replicateDevice] Error getting partition list: %s (%v)", objPath, err)
return
}
for i := len(partitionList) - 1; i > 0; i-- { // shuffle partition list
j := rand.Intn(i + 1)
partitionList[j], partitionList[i] = partitionList[i], partitionList[j]
}
r.jobCountIncrement <- uint64(len(partitionList))
for _, partition := range partitionList {
if hummingbird.Exists(filepath.Join(r.driveRoot, dev.Device, "lock_device")) {
if mounted, err := hummingbird.IsMount(filepath.Join(r.driveRoot, dev.Device)); r.checkMounts && (err != nil || mounted != true) {
r.LogError("[replicateDevice] Drive not mounted: %s", dev.Device)
break
}
r.processPriorityJobs(dev.Id)
if len(r.partitions) > 0 {
found := false
for _, p := range r.partitions {
if filepath.Base(partition) == p {
found = true
break
}
objPath := filepath.Join(r.driveRoot, dev.Device, "objects")
if fi, err := os.Stat(objPath); err != nil || !fi.Mode().IsDir() {
r.LogError("[replicateDevice] No objects found: %s", objPath)
break
}
partitionList, err := filepath.Glob(filepath.Join(objPath, "[0-9]*"))
if err != nil {
r.LogError("[replicateDevice] Error getting partition list: %s (%v)", objPath, err)
break
}
// if one of the breaks above triggers, the stats reporter will restart this device after ReplicateDeviceTimeout
for i := len(partitionList) - 1; i > 0; i-- { // shuffle partition list
j := rand.Intn(i + 1)
partitionList[j], partitionList[i] = partitionList[i], partitionList[j]
}
numPartitions := uint64(len(partitionList))
partitionsProcessed := uint64(0)
r.deviceProgressPassInit <- deviceProgress{
dev: dev,
PartitionsTotal: numPartitions,
LastPassDuration: lastPassDuration,
}
for _, partition := range partitionList {
if hummingbird.Exists(filepath.Join(r.driveRoot, dev.Device, "lock_device")) {
break
}
if !found {
select {
case <-canceler:
{
r.deviceProgressIncr <- deviceProgress{
dev: dev,
CancelCount: 1,
}
r.LogError("replicateDevice canceled for device: %s", dev.Device)
return
}
default:
}
r.processPriorityJobs(dev.Id)
if _, ok := r.partitions[filepath.Base(partition)]; len(r.partitions) > 0 && !ok {
continue
}
}
if partitioni, err := strconv.ParseUint(filepath.Base(partition), 10, 64); err == nil {
func() {
<-r.partRateTicker.C
r.concurrencySem <- struct{}{}
r.replicationCountIncrement <- 1
j := &job{objPath: objPath, partition: filepath.Base(partition), dev: dev}
nodes, handoff := r.Ring.GetJobNodes(partitioni, j.dev.Id)
partStart := time.Now()
defer func() {
<-r.concurrencySem
r.partitionTimesAdd <- float64(time.Since(partStart)) / float64(time.Second)
if partitioni, err := strconv.ParseUint(filepath.Base(partition), 10, 64); err == nil {
func() {
<-r.partRateTicker.C
r.concurrencySem <- struct{}{}
r.deviceProgressIncr <- deviceProgress{
dev: dev,
PartitionsDone: 1,
}
j := &job{objPath: objPath, partition: filepath.Base(partition), dev: dev}
nodes, handoff := r.Ring.GetJobNodes(partitioni, j.dev.Id)
defer func() {
<-r.concurrencySem
}()
partitionsProcessed += 1
if handoff {
r.replicateHandoff(j, nodes)
} else {
r.replicateLocal(j, nodes, r.Ring.GetMoreNodes(partitioni))
}
}()
if handoff {
r.replicateHandoff(j, nodes)
} else {
r.replicateLocal(j, nodes, r.Ring.GetMoreNodes(partitioni))
}
}()
}
}
if partitionsProcessed >= numPartitions {
r.deviceProgressIncr <- deviceProgress{
dev: dev,
FullReplicateCount: 1,
}
lastPassDuration = time.Since(passStartTime)
}
if r.once {
break
}
}
}
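Two throttles govern the partition loop above: partRateTicker paces how often a new partition may start, and concurrencySem, a buffered channel used as a counting semaphore, caps how many partitions replicate at once across all device goroutines. A rough standalone sketch of the combination; note that the daemon processes each partition synchronously inside its device goroutine, whereas this sketch spawns a goroutine per partition for brevity:

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    pace := time.NewTicker(5 * time.Millisecond) // one start per tick
    defer pace.Stop()
    sem := make(chan struct{}, 2) // at most 2 in flight
    var wg sync.WaitGroup
    for part := 0; part < 6; part++ {
        <-pace.C          // pace how often work may start
        sem <- struct{}{} // acquire a slot
        wg.Add(1)
        go func(p int) {
            defer wg.Done()
            defer func() { <-sem }() // release the slot
            fmt.Println("replicating partition", p)
            time.Sleep(8 * time.Millisecond)
        }(part)
    }
    wg.Wait()
}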
@@ -557,22 +617,75 @@ func (r *Replicator) replicateDevice(dev *hummingbird.Device) {
func (r *Replicator) statsReporter(c <-chan time.Time) {
for {
select {
case dt := <-r.dataTransferAdd:
r.dataTransferred += dt
r.filesTransferred += 1
case jobs := <-r.jobCountIncrement:
r.jobCount += jobs
case replicates := <-r.replicationCountIncrement:
r.replicationCount += replicates
case partitionTime := <-r.partitionTimesAdd:
r.partitionTimes = append(r.partitionTimes, partitionTime)
case now, ok := <-c:
case dp := <-r.deviceProgressPassInit:
curDp, ok := r.deviceProgressMap[dp.dev.Device]
if !ok {
curDp = &deviceProgress{
dev: dp.dev,
StartDate: time.Now(),
LastUpdate: time.Now(),
}
r.deviceProgressMap[dp.dev.Device] = curDp
}
curDp.StartDate = time.Now()
curDp.LastUpdate = time.Now()
curDp.PartitionsDone = 0
curDp.PartitionsTotal = dp.PartitionsTotal
curDp.FilesSent = 0
curDp.BytesSent = 0
curDp.PriorityRepsDone = 0
curDp.LastPassDuration = dp.LastPassDuration
if dp.LastPassDuration > 0 {
curDp.LastPassUpdate = time.Now()
}
case dp := <-r.deviceProgressIncr:
if curDp, ok := r.deviceProgressMap[dp.dev.Device]; !ok {
r.LogError("Trying to increment progress and not present: %s", dp.dev.Device)
} else {
curDp.LastUpdate = time.Now()
curDp.PartitionsDone += dp.PartitionsDone
curDp.FilesSent += dp.FilesSent
curDp.BytesSent += dp.BytesSent
curDp.PriorityRepsDone += dp.PriorityRepsDone
curDp.FullReplicateCount += dp.FullReplicateCount
curDp.CancelCount += dp.CancelCount
}
case _, ok := <-c:
if !ok {
return
}
if r.replicationCount > 0 {
elapsed := float64(now.Sub(r.startTime)) / float64(time.Second)
remaining := time.Duration(float64(now.Sub(r.startTime))/(float64(r.replicationCount)/float64(r.jobCount))) - now.Sub(r.startTime)
var totalParts uint64
var doneParts uint64
var bytesProcessed uint64
var filesProcessed uint64
var processingDuration time.Duration
allHaveCompleted := true
var totalDuration time.Duration
var maxLastPassUpdate time.Time
for _, dp := range r.deviceProgressMap {
if time.Since(dp.LastUpdate) > ReplicateDeviceTimeout {
r.restartReplicateDevice(dp.dev)
continue
}
totalParts += dp.PartitionsTotal
doneParts += dp.PartitionsDone
bytesProcessed += dp.BytesSent
filesProcessed += dp.FilesSent
processingDuration += dp.LastUpdate.Sub(dp.StartDate)
allHaveCompleted = allHaveCompleted && (dp.LastPassDuration > 0)
totalDuration += dp.LastPassDuration
if maxLastPassUpdate.Before(dp.LastPassUpdate) {
maxLastPassUpdate = dp.LastPassUpdate
}
}
if doneParts > 0 {
processingNsecs := float64(processingDuration.Nanoseconds())
partsPerNsecond := float64(doneParts) / processingNsecs
remaining := time.Duration(float64(totalParts-doneParts) / partsPerNsecond)
var remainingStr string
if remaining >= time.Hour {
remainingStr = fmt.Sprintf("%.0fh", remaining.Hours())
@@ -581,70 +694,42 @@ func (r *Replicator) statsReporter(c <-chan time.Time) {
} else {
remainingStr = fmt.Sprintf("%.0fs", remaining.Seconds())
}
r.LogInfo("%d/%d (%.2f%%) partitions replicated in %.2fs (%.2f/sec, %v remaining)",
r.replicationCount, r.jobCount, float64(100*r.replicationCount)/float64(r.jobCount),
elapsed, float64(r.replicationCount)/elapsed, remainingStr)
r.LogInfo("%d/%d (%.2f%%) partitions replicated in %.2f worker seconds (%.2f/sec, %v remaining)",
doneParts, totalParts, float64(100*doneParts)/float64(totalParts),
processingNsecs/float64(time.Second), partsPerNsecond*float64(time.Second), remainingStr)
}
if len(r.partitionTimes) > 0 {
r.partitionTimes.Sort()
r.LogInfo("Partition times: max %.4fs, min %.4fs, med %.4fs",
r.partitionTimes[len(r.partitionTimes)-1], r.partitionTimes[0],
r.partitionTimes[len(r.partitionTimes)/2])
}
if r.dataTransferred > 0 {
elapsed := float64(now.Sub(r.startTime)) / float64(time.Second)
r.LogInfo("Data synced: %d (%.2f kbps), files synced: %d",
r.dataTransferred, ((float64(r.dataTransferred)/1024.0)*8.0)/elapsed, r.filesTransferred)
if allHaveCompleted {
// TODO: this recon dump is of limited value; consider dropping it entirely
hummingbird.DumpReconCache(r.reconCachePath, "object",
map[string]interface{}{
"object_replication_time": float64(totalDuration) / float64(len(r.deviceProgressMap)) / float64(time.Second),
"object_replication_last": float64(maxLastPassUpdate.UnixNano()) / float64(time.Second),
})
}
}
}
}
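The ETA in the reporter is computed from accumulated worker time rather than wall-clock time: partsPerNsecond is partitions completed per nanosecond of summed per-device processing, and remaining divides the outstanding partition count by that rate. A worked illustration with made-up numbers:

package main

import (
    "fmt"
    "time"
)

func main() {
    var doneParts, totalParts uint64 = 250, 1000
    processingDuration := 2 * time.Hour // summed across devices
    partsPerNsecond := float64(doneParts) / float64(processingDuration.Nanoseconds())
    remaining := time.Duration(float64(totalParts-doneParts) / partsPerNsecond)
    fmt.Println(remaining) // 6h0m0s: 250 parts took 2h of worker time, so 750 take 6h
}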
// Run replication passes of the whole server until c is closed.
func (r *Replicator) run(c <-chan time.Time) {
for _ = range c {
r.partitionTimes = nil
r.jobCount = 0
r.replicationCount = 0
r.dataTransferred = 0
r.filesTransferred = 0
r.startTime = time.Now()
statsTicker := time.NewTicker(StatsReportInterval)
go r.statsReporter(statsTicker.C)
// Run replication passes for each device on the whole server.
func (r *Replicator) run() {
statsTicker := time.NewTicker(StatsReportInterval)
go r.statsReporter(statsTicker.C)
r.partRateTicker = time.NewTicker(r.timePerPart)
r.concurrencySem = make(chan struct{}, r.concurrency)
localDevices, err := r.Ring.LocalDevices(r.port)
if err != nil {
r.LogError("Error getting local devices: %v", err)
continue
}
for _, dev := range localDevices {
if len(r.devices) > 0 {
found := false
for _, d := range r.devices {
if dev.Device == d {
found = true
break
}
}
if !found {
continue
}
}
r.devGroup.Add(1)
go r.replicateDevice(dev)
}
r.devGroup.Wait()
r.partRateTicker.Stop()
statsTicker.Stop()
r.statsReporter(OneTimeChan())
hummingbird.DumpReconCache(r.reconCachePath, "object",
map[string]interface{}{
"object_replication_time": float64(time.Since(r.startTime)) / float64(time.Second),
"object_replication_last": float64(time.Now().UnixNano()) / float64(time.Second),
})
r.partRateTicker = time.NewTicker(r.timePerPart)
r.concurrencySem = make(chan struct{}, r.concurrency)
localDevices, err := r.Ring.LocalDevices(r.port)
if err != nil {
r.LogError("Error getting local devices: %v", err)
return
}
for _, dev := range localDevices {
if _, ok := r.devices[dev.Device]; ok || len(r.devices) == 0 {
r.restartReplicateDevice(dev)
}
}
r.devGroup.Wait()
}
// processPriorityJobs runs any pending priority jobs given the device's id
@@ -652,21 +737,22 @@ func (r *Replicator) processPriorityJobs(id int) {
for {
select {
case pri := <-r.getPriRepChan(id):
r.jobCountIncrement <- 1
r.deviceProgressIncr <- deviceProgress{
dev: pri.FromDevice,
PriorityRepsDone: 1,
}
func() {
<-r.partRateTicker.C
r.concurrencySem <- struct{}{}
r.replicationCountIncrement <- 1
j := &job{
dev: pri.FromDevice,
partition: strconv.FormatUint(pri.Partition, 10),
objPath: filepath.Join(r.driveRoot, pri.FromDevice.Device, "objects"),
}
_, handoff := r.Ring.GetJobNodes(pri.Partition, pri.FromDevice.Id)
partStart := time.Now()
defer func() {
<-r.concurrencySem
r.partitionTimesAdd <- float64(time.Since(partStart)) / float64(time.Second)
}()
toDevicesArr := make([]string, len(pri.ToDevices))
for i, s := range pri.ToDevices {
@@ -699,6 +785,27 @@ func (r *Replicator) getPriRepChan(id int) chan PriorityRepJob {
return r.priRepChans[id]
}
// ProgressReportHandler handles HTTP requests for current replication progress
func (r *Replicator) ProgressReportHandler(w http.ResponseWriter, req *http.Request) {
_, err := ioutil.ReadAll(req.Body)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte(err.Error()))
return
}
data, err := json.Marshal(r.deviceProgressMap)
if err != nil {
r.LogError("Error Marshaling device progress: ", err)
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte(err.Error()))
return
}
w.WriteHeader(http.StatusOK)
w.Write(data)
}
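A sketch of a client for this endpoint; the bind address is an assumption, and the partial struct below mirrors only a few of the exported fields. The response is the JSON-marshaled deviceProgressMap keyed by device name; the unexported dev field is skipped by encoding/json, so only the counters and timestamps appear.

package main

import (
    "encoding/json"
    "fmt"
    "net/http"
)

type deviceProgress struct { // partial mirror of the daemon's struct
    PartitionsDone  uint64
    PartitionsTotal uint64
    FilesSent       uint64
    BytesSent       uint64
}

func main() {
    resp, err := http.Get("http://localhost:6500/progress") // replicator bind address is an assumption
    if err != nil {
        panic(err)
    }
    defer resp.Body.Close()
    progress := map[string]deviceProgress{}
    if err := json.NewDecoder(resp.Body).Decode(&progress); err != nil {
        panic(err)
    }
    for dev, dp := range progress {
        fmt.Printf("%s: %d/%d partitions, %d files, %d bytes sent\n",
            dev, dp.PartitionsDone, dp.PartitionsTotal, dp.FilesSent, dp.BytesSent)
    }
}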
// priorityRepHandler handles HTTP requests for priority replication jobs.
func (r *Replicator) priorityRepHandler(w http.ResponseWriter, req *http.Request) {
var pri PriorityRepJob
@@ -733,6 +840,7 @@ func (r *Replicator) priorityRepHandler(w http.ResponseWriter, req *http.Request
func (r *Replicator) startWebServer() {
mux := http.NewServeMux()
mux.HandleFunc("/priorityrep", r.priorityRepHandler)
mux.HandleFunc("/progress", r.ProgressReportHandler)
mux.Handle("/debug/", http.DefaultServeMux)
for {
if sock, err := hummingbird.RetryListen(r.bindIp, r.bindPort); err != nil {
@@ -743,15 +851,16 @@ func (r *Replicator) startWebServer() {
}
}
// Run a single replication pass.
// Run a single replication pass. (NOTE: this will probably be removed in favor of priority replication)
func (r *Replicator) Run() {
r.run(OneTimeChan())
r.once = true
r.run()
}
// Run replication in a loop, forever.
func (r *Replicator) RunForever() {
go r.startWebServer()
r.run(time.Tick(RunForeverInterval))
r.run()
}
func NewReplicator(serverconf hummingbird.Config, flags *flag.FlagSet) (hummingbird.Daemon, error) {
@@ -760,11 +869,14 @@ func NewReplicator(serverconf hummingbird.Config, flags *flag.FlagSet) (hummingb
}
replicator := &Replicator{
partitionTimesAdd: make(chan float64),
replicationCountIncrement: make(chan uint64),
jobCountIncrement: make(chan uint64),
dataTransferAdd: make(chan uint64),
priRepChans: make(map[int]chan PriorityRepJob),
priRepChans: make(map[int]chan PriorityRepJob),
deviceProgressMap: make(map[string]*deviceProgress),
deviceProgressPassInit: make(chan deviceProgress),
deviceProgressIncr: make(chan deviceProgress),
devices: make(map[string]bool),
partitions: make(map[string]bool),
cancelers: make(map[string]chan struct{}),
once: false,
}
hashPathPrefix, hashPathSuffix, err := hummingbird.GetHashPrefixAndSuffix()
if err != nil {
@@ -791,13 +903,17 @@ func NewReplicator(serverconf hummingbird.Config, flags *flag.FlagSet) (hummingb
devices_flag := flags.Lookup("devices")
if devices_flag != nil {
if devices := devices_flag.Value.(flag.Getter).Get().(string); len(devices) > 0 {
replicator.devices = strings.Split(devices, ",")
for _, devName := range strings.Split(devices, ",") {
replicator.devices[strings.TrimSpace(devName)] = true
}
}
}
partitions_flag := flags.Lookup("partitions")
if partitions_flag != nil {
if partitions := partitions_flag.Value.(flag.Getter).Get().(string); len(partitions) > 0 {
replicator.partitions = strings.Split(partitions, ",")
for _, part := range strings.Split(partitions, ",") {
replicator.partitions[strings.TrimSpace(part)] = true
}
}
}
if !replicator.quorumDelete {
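For illustration, a minimal harness showing how a comma-separated -devices flag becomes the whitespace-tolerant membership map built above; the flag name and the Lookup/Getter chain match NewReplicator, while the FlagSet construction is hypothetical:

package main

import (
    "flag"
    "fmt"
    "strings"
)

func main() {
    fs := flag.NewFlagSet("object-replicator", flag.ExitOnError)
    fs.String("devices", "", "comma-separated list of devices to replicate")
    fs.String("partitions", "", "comma-separated list of partitions to replicate")
    if err := fs.Parse([]string{"-devices", "sda, sdb"}); err != nil {
        panic(err)
    }

    devices := map[string]bool{}
    if f := fs.Lookup("devices"); f != nil {
        if v := f.Value.(flag.Getter).Get().(string); len(v) > 0 {
            for _, d := range strings.Split(v, ",") {
                devices[strings.TrimSpace(d)] = true // tolerate spaces, O(1) membership
            }
        }
    }
    fmt.Println(devices) // map[sda:true sdb:true]
}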

View File

@@ -28,6 +28,7 @@ import (
"os"
"path/filepath"
"strconv"
"strings"
"testing"
"time"
@@ -146,7 +147,7 @@ func TestCleanTemp(t *testing.T) {
assert.False(t, hummingbird.Exists(filepath.Join(driveRoot, "sda", "tmp", "oldfile")))
}
func TestReplicatorReportStats(t *testing.T) {
func TestReplicatorReportStatsNotSetup(t *testing.T) {
saved := &replicationLogSaver{}
replicator := makeReplicator("devices", os.TempDir(), "ms_per_part", "1", "concurrency", "3")
replicator.logger = saved
@@ -157,25 +158,51 @@ func TestReplicatorReportStats(t *testing.T) {
replicator.statsReporter(c)
done <- true
}()
replicator.jobCountIncrement <- 100
replicator.replicationCountIncrement <- 50
replicator.partitionTimesAdd <- 10.0
replicator.partitionTimesAdd <- 20.0
replicator.partitionTimesAdd <- 15.0
replicator.deviceProgressIncr <- deviceProgress{
dev: &hummingbird.Device{Device: "sda"},
PartitionsTotal: 12}
c <- t
close(c)
<-done
return saved.logged[len(saved.logged)-1]
}
var remaining int
var elapsed, rate float64
replicator.startTime = time.Now()
reportStats(time.Now().Add(100 * time.Second))
cnt, err := fmt.Sscanf(saved.logged[0], "50/100 (50.00%%) partitions replicated in %fs (%f/sec, %dm remaining)", &elapsed, &rate, &remaining)
assert.Nil(t, err)
assert.Equal(t, 3, cnt)
assert.Equal(t, 2, remaining)
assert.Equal(t, saved.logged[1], "Partition times: max 20.0000s, min 10.0000s, med 15.0000s")
assert.Equal(t, "Trying to increment progress and not present: sda", saved.logged[0])
}
func TestReplicatorReportStats(t *testing.T) {
saved := &replicationLogSaver{}
replicator := makeReplicator("devices", os.TempDir(), "ms_per_part", "1", "concurrency", "3")
replicator.logger = saved
replicator.deviceProgressMap["sda"] = &deviceProgress{
dev: &hummingbird.Device{Device: "sda"}}
replicator.deviceProgressMap["sdb"] = &deviceProgress{
dev: &hummingbird.Device{Device: "sdb"}}
reportStats := func(t time.Time) string {
c := make(chan time.Time)
done := make(chan bool)
go func() {
replicator.statsReporter(c)
done <- true
}()
replicator.deviceProgressPassInit <- deviceProgress{
dev: &hummingbird.Device{Device: "sda"},
PartitionsTotal: 10}
replicator.deviceProgressIncr <- deviceProgress{
dev: &hummingbird.Device{Device: "sda"},
PartitionsDone: 10}
c <- t
close(c)
<-done
return saved.logged[len(saved.logged)-1]
}
reportStats(time.Now().Add(100 * time.Second))
assert.Equal(t, replicator.deviceProgressMap["sda"].PartitionsDone, uint64(10))
assert.NotEqual(t, strings.Index(saved.logged[0], "10/10 (100.00%) partitions replicated in"), -1)
}
type FakeLocalRing struct {
@@ -513,3 +540,97 @@ func TestPriorityRepHandler404(t *testing.T) {
replicator.priorityRepHandler(w, req)
require.EqualValues(t, 404, w.Code)
}
func TestRestartDevice(t *testing.T) {
ts, err := makeObjectServer()
assert.Nil(t, err)
defer ts.Close()
ts2, err := makeObjectServer()
assert.Nil(t, err)
defer ts2.Close()
req, err := http.NewRequest("PUT", fmt.Sprintf("http://%s:%d/sda/0/a/c/o", ts.host, ts.port),
bytes.NewBuffer([]byte("ABCDEFGHIJKLMNOPQRSTUVWXYZ")))
assert.Nil(t, err)
req.Header.Set("Content-Type", "application/octet-stream")
req.Header.Set("Content-Length", "26")
req.Header.Set("X-Timestamp", hummingbird.GetTimestamp())
resp, err := http.DefaultClient.Do(req)
require.Nil(t, err)
require.Equal(t, 201, resp.StatusCode)
req, err = http.NewRequest("PUT", fmt.Sprintf("http://%s:%d/sda/1/a/c/o2", ts.host, ts.port),
bytes.NewBuffer([]byte("ABCDEFGHIJKLMNOPQRSTUVWXYZ")))
assert.Nil(t, err)
req.Header.Set("Content-Type", "application/octet-stream")
req.Header.Set("Content-Length", "26")
req.Header.Set("X-Timestamp", hummingbird.GetTimestamp())
resp, err = http.DefaultClient.Do(req)
require.Nil(t, err)
require.Equal(t, 201, resp.StatusCode)
ldev := &hummingbird.Device{ReplicationIp: ts.host, ReplicationPort: ts.port, Device: "sda"}
rdev := &hummingbird.Device{ReplicationIp: ts2.host, ReplicationPort: ts2.port, Device: "sda"}
dp := &deviceProgress{
dev: ldev,
StartDate: time.Now(),
LastUpdate: time.Now(),
}
saved := &replicationLogSaver{}
repl := makeReplicator()
repl.Ring = &FakeRepRing2{ldev: ldev, rdev: rdev}
repl.logger = saved
// set stuff up
repl.driveRoot = ts.objServer.driveRoot
myTicker := make(chan time.Time)
repl.partRateTicker = time.NewTicker(repl.timePerPart)
repl.partRateTicker.C = myTicker
repl.concurrencySem = make(chan struct{}, 5)
repl.deviceProgressMap["sda"] = dp
repl.restartReplicateDevice(ldev)
cancelChan := repl.cancelers["sda"]
// precancel the run
delete(repl.cancelers, "sda")
close(cancelChan)
// start replication for-loop
statsDp := <-repl.deviceProgressPassInit
assert.Equal(t, uint64(2), statsDp.PartitionsTotal)
// but got canceled
statsDp = <-repl.deviceProgressIncr
assert.Equal(t, uint64(1), statsDp.CancelCount)
// start up everything again
repl.restartReplicateDevice(ldev)
// start replication for-loop again
<-repl.deviceProgressPassInit
// 1st partition process
myTicker <- time.Now()
statsDp = <-repl.deviceProgressIncr
assert.Equal(t, uint64(1), statsDp.PartitionsDone)
// syncing file
statsDp = <-repl.deviceProgressIncr
assert.Equal(t, uint64(1), statsDp.FilesSent)
// 2nd partition process
myTicker <- time.Now()
statsDp = <-repl.deviceProgressIncr
assert.Equal(t, uint64(1), statsDp.PartitionsDone)
statsDp = <-repl.deviceProgressIncr
assert.Equal(t, uint64(1), statsDp.FilesSent)
// 2nd partition was processed so cancel next run
cancelChan = repl.cancelers["sda"]
delete(repl.cancelers, "sda")
close(cancelChan)
// check that full replicate was tracked
statsDp = <-repl.deviceProgressIncr
assert.Equal(t, uint64(1), statsDp.FullReplicateCount)
// starting final run
statsDp = <-repl.deviceProgressPassInit
assert.Equal(t, uint64(2), statsDp.PartitionsTotal)
// but it got canceled so returning
statsDp = <-repl.deviceProgressIncr
assert.Equal(t, uint64(1), statsDp.CancelCount)
}