-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathsse.go
181 lines (148 loc) · 3.62 KB
/
sse.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
package sse_parser
import (
"fmt"
"io"
"log/slog"
"strings"
)
type Parser struct {
buffer strings.Builder
dataCompleteFn func(dataBytes string) bool
}
func NewParser(completeFn func(dataBytes string) bool) *Parser {
return &Parser{
dataCompleteFn: completeFn,
}
}
type Message struct {
Event string
Data string
}
func (p *Parser) Reset() {
p.buffer.Reset()
}
func (p *Parser) doParseSingle(all string) (Message, bool) {
lineInPart := strings.Split(all, "\n")
if len(lineInPart) == 0 {
return Message{}, false
}
firstLine := lineInPart[0]
event := ""
data := ""
if strings.HasPrefix(firstLine, "event:") {
// normal
if len(lineInPart) < 2 {
return Message{}, false
}
dataLines := strings.Join(lineInPart[1:], "\n")
// Can either start with "event:" or "data:"
if !strings.HasPrefix(dataLines, "data:") {
return Message{}, false
}
event = strings.TrimPrefix(firstLine, "event:")
data = strings.TrimPrefix(dataLines, "data:")
} else if strings.HasPrefix(firstLine, "data:") {
// just data (damn google "sse")
dataLines := strings.Join(lineInPart[0:], "\n")
data = strings.TrimPrefix(dataLines, "data:")
} else {
return Message{}, false
}
if p.dataCompleteFn == nil || p.dataCompleteFn(data) {
return Message{
Event: event,
Data: data,
}, true
} else {
return Message{}, false
}
}
func (p *Parser) doParseAll(isFinish bool) []Message {
allInBuffer := p.buffer.String()
parts := strings.Split(allInBuffer, "\n\n")
p.buffer.Reset()
// if all lines are empty, we can just return
stringsAllEmpty := func(lines []string) bool {
for _, line := range lines {
if strings.TrimSpace(line) != "" {
return false
}
}
return true
}
if stringsAllEmpty(parts) {
return []Message{}
}
// All parts except the last must be a valid message.
// The last part may or may not yet be complete
messages := []Message{}
for i, part := range parts {
if i+1 == len(parts) {
// we are at the last part, and we are not forcing the last line
// so we can skip this part
continue
}
message, ok := p.doParseSingle(part)
if !ok {
slog.Error(fmt.Sprintf("Invalid message: %s, skipping", part))
continue
}
messages = append(messages, message)
}
lastPart := parts[len(parts)-1]
if stringsAllEmpty([]string{lastPart}) {
return messages
}
if isFinish || p.dataCompleteFn != nil {
lastMessage, ok := p.doParseSingle(lastPart)
if !ok {
if isFinish {
slog.Error(fmt.Sprintf("Invalid last message piece: %s, skipping", lastPart))
} else {
p.buffer.WriteString(lastPart) // put it back to parse later when we have more data
}
return messages
}
return append(messages, lastMessage)
}
return messages
}
func (p *Parser) Add(data string) []Message {
// replace all \r\n with \n
data = strings.ReplaceAll(data, "\r\n", "\n")
p.buffer.WriteString(data)
return p.doParseAll(false)
}
func (p *Parser) Finish() []Message {
return p.doParseAll(true)
}
type Writer struct {
parser *Parser
ch chan Message
}
var _ io.Writer = &Writer{}
func (w *Writer) Write(p []byte) (n int, err error) {
messages := w.parser.Add(string(p))
for _, message := range messages {
w.ch <- message
}
return len(p), nil
}
func (w *Writer) Finish() {
for _, message := range w.parser.Finish() {
w.ch <- message
}
}
func (p *Parser) Stream(reader io.Reader, msgBufSize int) <-chan Message {
ch := make(chan Message, msgBufSize)
writer := &Writer{parser: p, ch: ch}
go func() {
defer close(ch)
defer writer.Finish()
_, err := io.Copy(writer, reader)
if err != nil {
slog.Error(fmt.Sprintf("Error reading from stream: %v", err))
}
}()
return ch
}