-
Notifications
You must be signed in to change notification settings - Fork 9
/
Copy pathmerge_record_iterators.go
85 lines (67 loc) · 2.07 KB
/
merge_record_iterators.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
package rangedb
type pipe struct {
recordIterator RecordIterator
currentResultRecord ResultRecord
}
func newPipe(recordIterator RecordIterator) *pipe {
return &pipe{recordIterator: recordIterator}
}
func (p *pipe) ReadNext() bool {
p.recordIterator.Next()
p.currentResultRecord = ResultRecord{
Record: p.recordIterator.Record(),
Err: p.recordIterator.Err(),
}
return p.currentResultRecord.Record != nil
}
func (p *pipe) IsNextGlobalSequenceNumber(currentPosition uint64) bool {
return p.currentResultRecord.Record.GlobalSequenceNumber == currentPosition+1
}
// MergeRecordIteratorsInOrder combines record channels ordered by record.GlobalSequenceNumber.
func MergeRecordIteratorsInOrder(recordIterators []RecordIterator) RecordIterator {
resultRecords := make(chan ResultRecord)
go func() {
defer close(resultRecords)
var pipes []*pipe
for _, recordIterator := range recordIterators {
pipe := newPipe(recordIterator)
if pipe.ReadNext() {
pipes = append(pipes, pipe)
}
}
var currentPosition uint64
for len(pipes) > 0 {
i := getIndexWithSmallestGlobalSequenceNumber(pipes)
resultRecords <- pipes[i].currentResultRecord
currentPosition = pipes[i].currentResultRecord.Record.GlobalSequenceNumber
if !pipes[i].ReadNext() {
pipes = remove(pipes, i)
continue
}
for pipes[i].IsNextGlobalSequenceNumber(currentPosition) {
resultRecords <- pipes[i].currentResultRecord
currentPosition = pipes[i].currentResultRecord.Record.GlobalSequenceNumber
if !pipes[i].ReadNext() {
pipes = remove(pipes, i)
break
}
}
}
}()
return NewRecordIterator(resultRecords)
}
func remove(s []*pipe, i int) []*pipe {
s[i] = s[len(s)-1]
return s[:len(s)-1]
}
func getIndexWithSmallestGlobalSequenceNumber(pipes []*pipe) int {
smallestIndex := 0
min := pipes[smallestIndex].currentResultRecord.Record.GlobalSequenceNumber
for i, pipe := range pipes {
if pipe.currentResultRecord.Record.GlobalSequenceNumber < min {
smallestIndex = i
min = pipe.currentResultRecord.Record.GlobalSequenceNumber
}
}
return smallestIndex
}