Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
acud committed Aug 13, 2024
1 parent 554a9b3 commit 4bd4e40
Show file tree
Hide file tree
Showing 8 changed files with 285 additions and 143 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package write_coalescer
package atxwriter

import (
"context"
Expand All @@ -14,55 +14,74 @@ import (
"go.uber.org/zap"
)

var sqlWriterSleep = 100 * time.Millisecond

const poolItemMinSize = 1000 // minimum size of atx batch (to save on allocation)
var pool = &sync.Pool{
New: func() any {
s := make([]atxBatchItem, 0, poolItemMinSize)
return &s
},
}
var (
writerDelay = 100 * time.Millisecond
pool = &sync.Pool{
New: func() any {
s := make(map[types.ATXID]atxBatchItem)
return &s
},
}
rmSlice = make([]types.ATXID, 0, 1000)
)

func getBatch() []atxBatchItem {
v := pool.Get().(*[]atxBatchItem)
func getBatch() map[types.ATXID]atxBatchItem {
v := pool.Get().(*map[types.ATXID]atxBatchItem)
return *v
}

func putBatch(v []atxBatchItem) {
v = v[:0]
func putBatch(v map[types.ATXID]atxBatchItem) {
for k := range v {
rmSlice = append(rmSlice, k)
}
for _, k := range rmSlice {
delete(v, k)
}

pool.Put(&v)
}

type AtxWriter struct {
db db
logger *zap.Logger

atxMu sync.Mutex
atxBatch []atxBatchItem
atxMu sync.Mutex
timer *time.Timer
running bool
deadline time.Time

atxBatch map[types.ATXID]atxBatchItem
atxBatchResult *batchResult
}

func New(db db, logger *zap.Logger) *AtxWriter {
return &AtxWriter{
// create a stopped timer so we could reuse it later on
timer := time.NewTimer(writerDelay)
if !timer.Stop() {
<-timer.C
}

writer := &AtxWriter{
db: db,
logger: logger,
timer: timer,
atxBatchResult: &batchResult{
doneC: make(chan struct{}),
},
atxBatch: getBatch(),
}
return writer
}

// Start the forever-loop that flushes the atxs to the DB
// at-least every `sqlWriterSleep`. The caller is responsible
// to call Start in a different goroutine.
func (w *AtxWriter) Start(ctx context.Context) {
t := time.NewTicker(sqlWriterSleep)
for {
select {
case <-ctx.Done():
return
case <-t.C:
case <-w.timer.C:
// copy-on-write
w.atxMu.Lock()
if len(w.atxBatch) == 0 {
Expand Down Expand Up @@ -100,14 +119,23 @@ func (w *AtxWriter) Start(ctx context.Context) {
}
putBatch(batch)
close(res.doneC)
w.atxMu.Lock()
if len(w.atxBatch) == 0 {
w.running = false
}

w.atxMu.Unlock()
}
}
}

func (w *AtxWriter) Store(atx *types.ActivationTx, watx *wire.ActivationTxV1) (<-chan struct{}, func() error) {
w.atxMu.Lock()
defer w.atxMu.Unlock()
w.atxBatch = append(w.atxBatch, atxBatchItem{atx: atx, watx: watx})
if !w.running {
w.timer.Reset(writerDelay)
}
w.atxBatch[atx.ID()] = atxBatchItem{atx: atx, watx: watx}
br := w.atxBatchResult
c := br.doneC
return c, br.Error
Expand Down
191 changes: 191 additions & 0 deletions activation/atxwriter/atxwriter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
package atxwriter_test

import (
"context"
"testing"
"time"

"github.com/spacemeshos/go-spacemesh/activation/atxwriter"
"github.com/spacemeshos/go-spacemesh/activation/wire"
"github.com/spacemeshos/go-spacemesh/common/types"
"github.com/spacemeshos/go-spacemesh/sql"
"github.com/spacemeshos/go-spacemesh/sql/atxs"
"github.com/spacemeshos/merkle-tree"
poetShared "github.com/spacemeshos/poet/shared"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
)

var (
postGenesisEpoch types.EpochID = 2
goldenATXID = types.RandomATXID()
wAtx = newInitialATXv1(goldenATXID)
atx = toAtx(wAtx)
)

func TestWriteCoalesce_One(t *testing.T) {
w, db := newTestAtxWriter(t)

ch, errfn := w.Store(atx, wAtx)
var err error
select {
case <-ch:
err = errfn()
case <-time.After(5 * time.Second):
t.Fatal("timeout")
}
require.NoError(t, err)
has, err := atxs.Has(db, atx.ID())
require.True(t, has)
require.NoError(t, err)
}

func TestWriteCoalesce_Duplicates(t *testing.T) {
w, db := newTestAtxWriter(t)

ch, errfn := w.Store(atx, wAtx)
_, _ = w.Store(atx, wAtx)
var err error
select {
case <-ch:
err = errfn()
case <-time.After(5 * time.Second):
t.Fatal("timeout")
}
require.NoError(t, err)
has, err := atxs.Has(db, atx.ID())
require.True(t, has)
require.NoError(t, err)
}

func TestWriteCoalesce_MultipleBatches(t *testing.T) {
w, db := newTestAtxWriter(t)

ch, errfn := w.Store(atx, wAtx)
var err error
select {
case <-ch:
err = errfn()
case <-time.After(5 * time.Second):
t.Fatal("timeout")
}
require.NoError(t, err)
has, err := atxs.Has(db, atx.ID())
require.True(t, has)
require.NoError(t, err)

wAtx2 := newInitialATXv1(types.RandomATXID())
atx2 := toAtx(wAtx)

ch, errfn = w.Store(atx2, wAtx2)
select {
case <-ch:
err = errfn()
case <-time.After(5 * time.Second):
t.Fatal("timeout")
}
require.NoError(t, err)
has, err = atxs.Has(db, atx2.ID())
require.True(t, has)
require.NoError(t, err)
}

func toAtx(watx *wire.ActivationTxV1) *types.ActivationTx {
atx := wire.ActivationTxFromWireV1(watx)
atx.SetReceived(time.Now())
atx.BaseTickHeight = uint64(atx.PublishEpoch)
atx.TickCount = 1
return atx
}

func newTestAtxWriter(t *testing.T) (*atxwriter.AtxWriter, *sql.Database) {
t.Helper()
db := sql.InMemoryTest(t)
log := zaptest.NewLogger(t)
w := atxwriter.New(db, log)
ctx, cancel := context.WithCancel(context.Background())
done := make(chan struct{})
t.Cleanup(func() {
cancel()
<-done
})
go func() {
defer close(done)
w.Start(ctx)
}()
return w, db
}

func newInitialATXv1(
goldenATXID types.ATXID,
opts ...func(*wire.ActivationTxV1),
) *wire.ActivationTxV1 {
nonce := uint64(999)
poetRef := types.RandomHash()
atx := &wire.ActivationTxV1{
InnerActivationTxV1: wire.InnerActivationTxV1{
NIPostChallengeV1: wire.NIPostChallengeV1{
PrevATXID: types.EmptyATXID,
PublishEpoch: postGenesisEpoch,
PositioningATXID: goldenATXID,
CommitmentATXID: &goldenATXID,
InitialPost: &wire.PostV1{},
},
NIPost: newNIPostV1WithPoet(poetRef.Bytes()),
VRFNonce: &nonce,
Coinbase: types.GenerateAddress([]byte("aaaa")),
NumUnits: 100,
},
}
for _, opt := range opts {
opt(atx)
}
return atx
}

func newMerkleProof(leafs []types.Hash32) (types.MerkleProof, types.Hash32) {
tree, err := merkle.NewTreeBuilder().
WithHashFunc(poetShared.HashMembershipTreeNode).
WithLeavesToProve(map[uint64]bool{0: true}).
Build()
if err != nil {
panic(err)
}
for _, m := range leafs {
if err := tree.AddLeaf(m[:]); err != nil {
panic(err)
}
}
root, nodes := tree.RootAndProof()
nodesH32 := make([]types.Hash32, 0, len(nodes))
for _, n := range nodes {
nodesH32 = append(nodesH32, types.BytesToHash(n))
}
return types.MerkleProof{
Nodes: nodesH32,
}, types.BytesToHash(root)
}

func newNIPostV1WithPoet(poetRef []byte) *wire.NIPostV1 {
proof, _ := newMerkleProof([]types.Hash32{
types.BytesToHash([]byte("challenge")),
types.BytesToHash([]byte("leaf2")),
types.BytesToHash([]byte("leaf3")),
types.BytesToHash([]byte("leaf4")),
})

return &wire.NIPostV1{
Membership: wire.MerkleProofV1{
Nodes: proof.Nodes,
LeafIndex: 0,
},
Post: &wire.PostV1{
Nonce: 0,
Indices: []byte{1, 2, 3},
Pow: 0,
},
PostMetadata: &wire.PostMetadataV1{
Challenge: poetRef,
},
}
}
33 changes: 33 additions & 0 deletions activation/atxwriter/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package atxwriter

import (
"github.com/spacemeshos/go-spacemesh/metrics"
)

const (
namespace = "activation_write_coalescer"
)

var BatchWriteCount = metrics.NewSimpleCounter(
namespace,
"batch_write_count",
"number of errors when writing a batch",
)

var WriteBatchErrorsCount = metrics.NewSimpleCounter(
namespace,
"write_batch_errors",
"number of errors when writing a batch",
)

var ErroredBatchCount = metrics.NewSimpleCounter(
namespace,
"errored_batch",
"number of batches that errored",
)

var FlushBatchSize = metrics.NewSimpleCounter(
namespace,
"flush_batch_size",
"size of flushed batch",
)
4 changes: 0 additions & 4 deletions activation/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,10 +170,6 @@ func (h *Handler) Register(sig *signing.EdSigner) {
h.v1.Register(sig)
}

func (h *Handler) Start(ctx context.Context) {
h.v1.flushAtxLoop(ctx)
}

// HandleSyncedAtx handles atxs received by sync.
func (h *Handler) HandleSyncedAtx(ctx context.Context, expHash types.Hash32, peer p2p.Peer, data []byte) error {
_, err := h.handleAtx(ctx, expHash, peer, data)
Expand Down
Loading

0 comments on commit 4bd4e40

Please sign in to comment.