-
Notifications
You must be signed in to change notification settings - Fork 9
/
Copy pathworker.go
88 lines (84 loc) · 2.17 KB
/
worker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
// Copyright Kuei-chun Chen, 2022-present. All rights reserved.
package hummingbird
import (
"fmt"
"time"
"github.com/simagix/gox"
"github.com/simagix/keyhole/mdb"
"golang.org/x/text/language"
"golang.org/x/text/message"
)
// Worker runs the copy loop for the worker identified by id.
//
// It collects the replica set names from the migrator instance, then loops
// until the instance reports exit. Each iteration moves on to the next
// replica set name, asks the workspace for the next task of that set, and
// copies the task's source collection into the target collection. A task
// that cannot be processed (client creation or copy failure) has its status
// set back to TaskAdded and is handed back to the workspace; a task that is
// copied successfully is marked TaskCompleted with its EndTime set.
//
// Worker returns an error when the migrator instance has no replica sets,
// and nil once the instance reports exit.
func Worker(id string) error {
	inst := GetMigratorInstance()
	replicas := inst.Replicas()
	setNames := make([]string, 0, len(replicas))
	for setName := range replicas {
		setNames = append(setNames, setName)
	}
	logger := gox.GetLogger()
	workerID := fmt.Sprintf("proc %v", id)
	// Without this guard the modulo below would panic with an integer
	// divide by zero on the first iteration.
	if len(setNames) == 0 {
		return fmt.Errorf("[%v] no replica sets to process", workerID)
	}
	status := fmt.Sprintf(`[%v] joined`, workerID)
	ws := inst.Workspace()
	logger.Info(status)
	ws.Log(status)

	index := 0
	// rev flips between 1 and -1 on every iteration; presumably a direction
	// hint for the task lookup -- confirm against FindNextTaskAndUpdate.
	rev := -1
	processed := 0
	printer := message.NewPrinter(language.English)
	btime := time.Now()
	for !inst.IsExit() {
		rev = -rev
		// Keep the counter wrapped so it always is a valid index.
		index = (index + 1) % len(setNames)
		// Emit a progress line at most once every 10 minutes.
		if time.Since(btime) > 10*time.Minute {
			logger.Info(printer.Sprintf("[%v] has processed %d tasks", workerID, processed))
			btime = time.Now()
		}

		task, err := ws.FindNextTaskAndUpdate(setNames[index], workerID, rev)
		if err != nil || task == nil {
			// Hand back a task that was returned together with an error.
			if task != nil {
				task.Status = TaskAdded
				ws.UpdateTask(task)
			}
			time.Sleep(10 * time.Second)
			continue
		}

		dbName, collName := mdb.SplitNamespace(task.Namespace)
		// The target namespace defaults to the source namespace unless the
		// task's include rule names a different destination.
		dbNameTo, collNameTo := dbName, collName
		if task.Include.To != "" {
			dbNameTo, collNameTo = mdb.SplitNamespace(task.Include.To)
		}

		src, err := GetMongoClient(inst.Replicas()[task.SetName])
		if err != nil {
			task.Status = TaskAdded
			ws.UpdateTask(task)
			time.Sleep(1 * time.Second)
			continue
		}
		tgt, err := GetMongoClient(inst.Target)
		if err != nil {
			task.Status = TaskAdded
			ws.UpdateTask(task)
			time.Sleep(1 * time.Second)
			continue
		}

		err = task.CopyData(src.Database(dbName).Collection(collName),
			tgt.Database(dbNameTo).Collection(collNameTo))
		if err != nil {
			// Record why the task is being re-queued instead of dropping
			// the error silently.
			logger.Infof("[%v] failed to copy %v: %v", workerID, task.Namespace, err)
			task.Status = TaskAdded
		} else {
			task.Status = TaskCompleted
			task.EndTime = time.Now()
		}
		task.UpdatedBy = workerID
		ws.UpdateTask(task)

		processed++
		// Log on the 1st, 101st, 201st, ... handled task.
		if processed%100 == 1 {
			logger.Info(printer.Sprintf("[%v] has processed %d tasks", workerID, processed))
		}
		time.Sleep(100 * time.Millisecond)
	}
	logger.Infof(`[%v] exits`, workerID)
	return nil
}