-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathreader.go
130 lines (119 loc) · 3.63 KB
/
reader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package main
import (
"errors"
"flag"
"io"
"log"
"math/rand"
"mime/multipart"
)
// Helper struct to abstract away the complexity of single vs multi part
// range requests.
//
// Exposes simple Reset()/Read()/Close()/etc methods while transparently
// handling using single/multi part range requests to serve them.
type Reader struct {
// Fields that don't change over lifecycle of reader
Size, ChunkSize int64
NumWorkers int
SupportsMultipart bool
Downloader Downloader
// Can change over reader lifecycle
Start, CurChunkStart, CurPos int64
Chunk io.ReadCloser
MultipartReader *multipart.Reader
MultipartChunk *multipart.Part
}
func NewReader(size, start, chunkSize int64, numWorkers int, supportsMultipart bool, downloader Downloader) *Reader {
r := &Reader{
Start: start,
CurChunkStart: start,
CurPos: start,
Size: size,
ChunkSize: chunkSize,
NumWorkers: numWorkers,
SupportsMultipart: supportsMultipart,
Downloader: downloader,
}
r.Reset(start)
return r
}
// Reset all volatile state to start over reading contents from the current position.
// Useful if we abort a chunk midway and need to restart our progress from that chunk.
func (r *Reader) Reset(curPos int64) {
r.Start = r.CurChunkStart
r.CurPos = curPos
if r.UseMultipart() {
r.MultipartReader = getMultipartReader(r.Downloader, curPos, r.CurChunkStart, r.ChunkSize, r.Size, r.NumWorkers)
}
}
func (r *Reader) AdvanceNextChunk() {
r.CurChunkStart += (r.ChunkSize * int64(r.NumWorkers))
r.CurPos = r.CurChunkStart
}
func (r *Reader) RequestChunk() {
if r.UseMultipart() {
if multiChunk, err := r.MultipartReader.NextPart(); err == nil {
r.MultipartChunk = multiChunk
} else {
log.Fatal("Error getting next multipart chunk:", err.Error())
}
} else {
r.Chunk = r.Downloader.GetRange(r.CurPos, min(r.CurChunkStart+r.ChunkSize, r.Size))
}
}
func (r *Reader) Read(d []byte) (int, error) {
if flag.Lookup("test.v") != nil && rand.Intn(100) < 95 {
// We're running as part of a unit test, randomly fail read calls 95% of the time
return 0, errors.New("forced read fail for testing")
}
if r.UseMultipart() {
return r.MultipartChunk.Read(d)
} else {
return r.Chunk.Read(d)
}
}
func (r *Reader) Close() error {
if r.UseMultipart() && r.MultipartChunk != nil {
return r.MultipartChunk.Close()
} else if !r.UseMultipart() && r.Chunk != nil {
return r.Chunk.Close()
} else {
log.Println("Attempting to close nil chunk, silently succeeding")
return nil
}
}
// Don't use multipart if the current request is too small to need it.
// Otherwise the response won't contain multipart boundary tokens
// and the reader object will break.
func (r *Reader) UseMultipart() bool {
return r.SupportsMultipart && r.Start+r.ChunkSize*int64(r.NumWorkers) < r.Size
}
func getMultipartReader(
downloader Downloader,
curPos int64,
curChunkStart int64,
chunkSize int64,
size int64,
numWorkers int) *multipart.Reader {
var ranges = [][]int64{}
// First chunk might start at curPos if we're resetting the reader midway through a chunk
ranges = append(ranges, []int64{curPos, min(curChunkStart+chunkSize, size)})
curChunkStart += (chunkSize * int64(numWorkers))
for curChunkStart < size {
ranges = append(ranges, []int64{curChunkStart, min(curChunkStart+chunkSize, size)})
curChunkStart += (chunkSize * int64(numWorkers))
}
var reader, err = (downloader).GetRanges(ranges)
if err != nil {
log.Fatal("Failed to get ranges from file:", err.Error())
}
return reader
}
func min(a, b int64) int64 {
if a < b {
return a
} else {
return b
}
}