-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathwriter_with_indexer.go
88 lines (71 loc) · 2.01 KB
/
writer_with_indexer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
package ethwal
import (
"context"
"fmt"
"log"
"github.com/0xsequence/ethwal/storage"
)
type writerWithIndexer[T any] struct {
writer Writer[T]
indexer *Indexer[T]
}
var _ Writer[any] = (*writerWithIndexer[any])(nil)
func NewWriterWithIndexer[T any](writer Writer[T], indexer *Indexer[T]) (Writer[T], error) {
if writer.BlockNum() > indexer.BlockNum() {
// todo: implement a way to catch up indexer with writer
// this should never happen if the writer with indexer is used
return nil, fmt.Errorf("writer is ahead of indexer, can't catch up")
}
opts := writer.Options()
wrappedPolicy := NewWrappedRollPolicy(opts.FileRollPolicy, func(ctx context.Context) {
err := indexer.Flush(ctx)
if err != nil {
log.Default().Println("failed to flush index", "err", err)
}
})
opts.FileRollPolicy = wrappedPolicy
writer.SetOptions(opts)
return &writerWithIndexer[T]{indexer: indexer, writer: writer}, nil
}
func (c *writerWithIndexer[T]) FileSystem() storage.FS {
return c.writer.FileSystem()
}
func (c *writerWithIndexer[T]) Write(ctx context.Context, block Block[T]) error {
// update indexes first (idempotent)
err := c.index(ctx, block)
if err != nil {
return err
}
// write block, noop if block already written
err = c.writer.Write(ctx, block)
if err != nil {
return err
}
return nil
}
func (c *writerWithIndexer[T]) Close(ctx context.Context) error {
err := c.indexer.Close(ctx)
if err != nil {
return err
}
return c.writer.Close(ctx)
}
func (c *writerWithIndexer[T]) BlockNum() uint64 {
return min(c.writer.BlockNum(), c.indexer.BlockNum())
}
func (c *writerWithIndexer[T]) RollFile(ctx context.Context) error {
err := c.indexer.Flush(ctx)
if err != nil {
return err
}
return c.writer.RollFile(ctx)
}
func (c *writerWithIndexer[T]) Options() Options {
return c.writer.Options()
}
func (c *writerWithIndexer[T]) SetOptions(options Options) {
c.writer.SetOptions(options)
}
func (c *writerWithIndexer[T]) index(ctx context.Context, block Block[T]) error {
return c.indexer.Index(ctx, block)
}