-
Notifications
You must be signed in to change notification settings - Fork 9
/
Copy pathstore.go
197 lines (159 loc) · 6.38 KB
/
store.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
package rangedb
import (
"context"
"encoding/json"
"fmt"
"strings"
)
// Version is the RangeDB version string.
const Version = "0.13.0-dev"
// Record contains event data and metadata.
//
// Every field carries a single-letter msgpack tag and a descriptive json tag.
type Record struct {
// StreamName is the name of the stream the record belongs to.
// NOTE(review): presumably in the "aggregateType!aggregateID" form produced by GetStream — confirm.
StreamName string `msgpack:"n" json:"streamName"`
// AggregateType is the aggregate type of the recorded event.
AggregateType string `msgpack:"a" json:"aggregateType"`
// AggregateID is the aggregate ID of the recorded event.
AggregateID string `msgpack:"i" json:"aggregateID"`
// GlobalSequenceNumber is the record's sequence number across the whole store.
GlobalSequenceNumber uint64 `msgpack:"g" json:"globalSequenceNumber"`
// StreamSequenceNumber is the record's sequence number within its stream.
StreamSequenceNumber uint64 `msgpack:"s" json:"streamSequenceNumber"`
// InsertTimestamp is when the record was inserted.
// NOTE(review): unit and epoch are not visible in this file — confirm.
InsertTimestamp uint64 `msgpack:"u" json:"insertTimestamp"`
// EventID is the identifier of the event.
EventID string `msgpack:"e" json:"eventID"`
// EventType is the type name of the event.
EventType string `msgpack:"t" json:"eventType"`
// Data is the event payload; its concrete type is not constrained here.
Data interface{} `msgpack:"d" json:"data"`
// Metadata is the metadata stored with the event; its concrete type is not constrained here.
Metadata interface{} `msgpack:"m" json:"metadata"`
}
// EventRecord stores the event and metadata to be used for persisting.
// It is the unit accepted by Store.Save and Store.OptimisticSave.
type EventRecord struct {
// Event is the event to persist.
Event Event
// Metadata is arbitrary metadata to persist alongside the event.
Metadata interface{}
}
// EventBinder defines how to bind events for serialization.
type EventBinder interface {
// Bind binds the given events.
// NOTE(review): presumably registers each event type so stored records can be decoded — confirm against implementations.
Bind(events ...Event)
}
// Store is the interface that stores and retrieves event records.
type Store interface {
EventBinder
// Events returns a RecordIterator containing all events in the store starting with globalSequenceNumber.
Events(ctx context.Context, globalSequenceNumber uint64) RecordIterator
// EventsByAggregateTypes returns a RecordIterator containing all events for each aggregateType(s) starting
// with globalSequenceNumber.
EventsByAggregateTypes(ctx context.Context, globalSequenceNumber uint64, aggregateTypes ...string) RecordIterator
// EventsByStream returns a RecordIterator containing all events in the stream starting with streamSequenceNumber.
EventsByStream(ctx context.Context, streamSequenceNumber uint64, streamName string) RecordIterator
// OptimisticDeleteStream removes an entire stream. If the expectedStreamSequenceNumber does not match the current
// stream sequence number, a rangedberror.UnexpectedSequenceNumber error is returned.
OptimisticDeleteStream(ctx context.Context, expectedStreamSequenceNumber uint64, streamName string) error
// OptimisticSave persists events to a single stream returning the new StreamSequenceNumber or an error. If
// the expectedStreamSequenceNumber does not match the current stream sequence number,
// a rangedberror.UnexpectedSequenceNumber error is returned.
OptimisticSave(ctx context.Context, expectedStreamSequenceNumber uint64, streamName string, eventRecords ...*EventRecord) (uint64, error)
// Save persists events to a single stream returning the new StreamSequenceNumber or an error.
Save(ctx context.Context, streamName string, eventRecords ...*EventRecord) (uint64, error)
// AllEventsSubscription returns a RecordSubscription that delivers records to subscriber.
// NOTE(review): presumably covers every event in the store, with bufferSize sizing an internal buffer — confirm against implementations.
AllEventsSubscription(ctx context.Context, bufferSize int, subscriber RecordSubscriber) RecordSubscription
// AggregateTypesSubscription returns a RecordSubscription that delivers records to subscriber.
// NOTE(review): presumably limited to events of the given aggregateTypes, with bufferSize sizing an internal buffer — confirm against implementations.
AggregateTypesSubscription(ctx context.Context, bufferSize int, subscriber RecordSubscriber, aggregateTypes ...string) RecordSubscription
// TotalEventsInStream returns the total number of events in the stream named streamName, or an error.
TotalEventsInStream(ctx context.Context, streamName string) (uint64, error)
}
// RecordSubscription defines how a subscription starts and stops.
// Values are obtained from Store.AllEventsSubscription and Store.AggregateTypesSubscription.
type RecordSubscription interface {
// Start returns immediately after subscribing only to new events in a goroutine.
Start() error
// StartFrom blocks until all previous events have been processed, then returns after subscribing to new events in a goroutine.
// NOTE(review): globalSequenceNumber is presumably the first previous event to process — confirm whether it is inclusive.
StartFrom(globalSequenceNumber uint64) error
// Stop cancels the subscription and stops.
Stop()
}
// ResultRecord combines Record and error as a result struct for event queries.
type ResultRecord struct {
// Record is the queried record.
// NOTE(review): presumably nil when Err is set — confirm with producers.
Record *Record
// Err is the error associated with the query result, if any.
Err error
}
// RecordIterator is used to traverse a stream of record events.
//
// ReadNRecords in this file shows the usage pattern: call Next in a loop,
// check Err, then read the current record with Record.
type RecordIterator interface {
// Next advances the iterator and reports whether iteration should continue.
Next() bool
// NextContext is the context-accepting variant of Next.
// NOTE(review): presumably stops early when the context is done — confirm against implementations.
NextContext(context.Context) bool
// Record returns the record at the iterator's current position.
Record() *Record
// Err returns the error, if any, at the iterator's current position.
Err() error
}
// Event is the interface that defines the required event methods.
// It extends AggregateMessage with the event's type name.
type Event interface {
AggregateMessage
// EventType returns the type name of the event.
EventType() string
}
// AggregateMessage is the interface that supports building an event stream name.
// GetEventStream combines both values into a stream name.
type AggregateMessage interface {
// AggregateID returns the ID of the aggregate the message belongs to.
AggregateID() string
// AggregateType returns the type of the aggregate the message belongs to.
AggregateType() string
}
// RecordSubscriberFunc adapts an ordinary function to the RecordSubscriber
// interface. If fn is a function with the appropriate signature,
// RecordSubscriberFunc(fn) is a RecordSubscriber whose Accept method calls fn.
type RecordSubscriberFunc func(*Record)

// Accept passes record straight through to the wrapped function.
func (fn RecordSubscriberFunc) Accept(record *Record) {
	fn(record)
}
// RecordSubscriber is the interface that defines how a projection receives Records.
// Ordinary functions can satisfy it through the RecordSubscriberFunc adapter.
type RecordSubscriber interface {
// Accept receives a single record.
Accept(record *Record)
}
// GetEventStream returns the stream name for an event (or any other
// AggregateMessage), built from its aggregate type and aggregate ID via
// GetStream.
func GetEventStream(message AggregateMessage) string {
	aggregateType := message.AggregateType()
	aggregateID := message.AggregateID()

	return GetStream(aggregateType, aggregateID)
}
// GetStream builds the stream name for an aggregateType and aggregateID by
// joining them with a "!" separator; for example GetStream("thing", "abc")
// yields "thing!abc".
func GetStream(aggregateType, aggregateID string) string {
	const streamNameFormat = "%s!%s"

	streamName := fmt.Sprintf(streamNameFormat, aggregateType, aggregateID)
	return streamName
}
// ParseStream returns the aggregateType and aggregateID for a stream name.
//
// The stream name is split at the first "!" only, so ParseStream is the
// inverse of GetStream even when the aggregateID itself contains "!":
// ParseStream("thing!a!b") returns ("thing", "a!b").
//
// When streamName contains no "!" the whole name is returned as the
// aggregateType and the aggregateID is empty, rather than panicking.
func ParseStream(streamName string) (aggregateType, aggregateID string) {
	// SplitN with a non-empty separator always yields at least one piece,
	// so pieces[0] is safe even for an empty streamName.
	pieces := strings.SplitN(streamName, "!", 2)
	if len(pieces) < 2 {
		return pieces[0], ""
	}

	return pieces[0], pieces[1]
}
// ReadNRecords reads up to totalEvents records from the RecordIterator
// returned by f and collects them into a slice.
//
// Reading stops as soon as the iterator is exhausted, the iterator reports an
// error (the error itself is not returned), or totalEvents records have been
// collected. The cancel function returned by f is then invoked and the
// iterator is drained until Next reports false. The result is nil when no
// records were collected.
func ReadNRecords(totalEvents uint64, f func() (RecordIterator, context.CancelFunc)) []*Record {
	var collected []*Record

	iter, cancel := f()

	// seen is the 1-based position of the record currently under the iterator;
	// one record past the limit is consumed before the loop stops.
	for seen := uint64(1); iter.Next(); seen++ {
		if iter.Err() != nil || seen > totalEvents {
			break
		}

		collected = append(collected, iter.Record())
	}

	cancel()

	// Consume whatever the iterator still yields after cancellation.
	// NOTE(review): presumably this lets the producer side finish instead of
	// staying blocked — confirm against the iterator implementation.
	for iter.Next() {
	}

	return collected
}
// rawEvent is an Event whose aggregate information, event type, and payload
// are supplied directly as values instead of coming from a concrete event
// struct.
type rawEvent struct {
	aggregateType string
	aggregateID   string
	eventType     string
	data          interface{}
}

// NewRawEvent constructs a new raw event when an event struct is unavailable or unknown.
// The returned Event reports the given aggregateType, aggregateID, and
// eventType, and marshals to JSON as data.
func NewRawEvent(aggregateType, aggregateID, eventType string, data interface{}) Event {
	event := new(rawEvent)
	event.aggregateType = aggregateType
	event.aggregateID = aggregateID
	event.eventType = eventType
	event.data = data

	return event
}

// AggregateID returns the aggregate ID the raw event was constructed with.
func (r rawEvent) AggregateID() string {
	return r.aggregateID
}

// AggregateType returns the aggregate type the raw event was constructed with.
func (r rawEvent) AggregateType() string {
	return r.aggregateType
}

// EventType returns the event type the raw event was constructed with.
func (r rawEvent) EventType() string {
	return r.eventType
}

// MarshalJSON encodes only the event's data payload; the aggregate and event
// type fields are not part of the JSON output.
func (r rawEvent) MarshalJSON() ([]byte, error) {
	return json.Marshal(r.data)
}
// ErrStreamNotFound is the sentinel error for a stream that could not be found.
var ErrStreamNotFound = fmt.Errorf("stream not found")