Skip to content

Commit

Permalink
add benchmarks
Browse files Browse the repository at this point in the history
  • Loading branch information
acud committed Aug 14, 2024
1 parent 4bd4e40 commit 369eec0
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 7 deletions.
12 changes: 6 additions & 6 deletions activation/atxwriter/atxwriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,13 +99,13 @@ func (w *AtxWriter) Start(ctx context.Context) {
if err := w.db.WithTx(ctx, func(tx *sql.Tx) error {
var err error
for _, item := range batch {
err = atxs.Add(tx, item.atx, item.watx.Blob())
err = atxs.Add(tx, item.Atx, item.Watx.Blob())
if err != nil && !errors.Is(err, sql.ErrObjectExists) {
WriteBatchErrorsCount.Inc()
return fmt.Errorf("add atx to db: %w", err)
}
err = atxs.SetPost(tx, item.atx.ID(), item.watx.PrevATXID, 0,
item.atx.SmesherID, item.watx.NumUnits)
err = atxs.SetPost(tx, item.Atx.ID(), item.Watx.PrevATXID, 0,
item.Atx.SmesherID, item.Watx.NumUnits)
if err != nil && !errors.Is(err, sql.ErrObjectExists) {
WriteBatchErrorsCount.Inc()
return fmt.Errorf("set atx units: %w", err)
Expand Down Expand Up @@ -135,7 +135,7 @@ func (w *AtxWriter) Store(atx *types.ActivationTx, watx *wire.ActivationTxV1) (<
if !w.running {
w.timer.Reset(writerDelay)
}
w.atxBatch[atx.ID()] = atxBatchItem{atx: atx, watx: watx}
w.atxBatch[atx.ID()] = atxBatchItem{Atx: atx, Watx: watx}
br := w.atxBatchResult
c := br.doneC
return c, br.Error
Expand All @@ -151,8 +151,8 @@ func (b *batchResult) Error() error {
}

type atxBatchItem struct {
atx *types.ActivationTx
watx *wire.ActivationTxV1
Atx *types.ActivationTx
Watx *wire.ActivationTxV1
}

type db interface {
Expand Down
109 changes: 108 additions & 1 deletion activation/atxwriter/atxwriter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package atxwriter_test

import (
"context"
"path/filepath"
"testing"
"time"

Expand All @@ -10,9 +11,11 @@ import (
"github.com/spacemeshos/go-spacemesh/common/types"
"github.com/spacemeshos/go-spacemesh/sql"
"github.com/spacemeshos/go-spacemesh/sql/atxs"
"github.com/spacemeshos/go-spacemesh/sql/migrations"
"github.com/spacemeshos/merkle-tree"
poetShared "github.com/spacemeshos/poet/shared"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"
)

Expand Down Expand Up @@ -90,6 +93,90 @@ func TestWriteCoalesce_MultipleBatches(t *testing.T) {
require.NoError(t, err)
}

func BenchmarkWriteCoalesing(b *testing.B) {
a := make([]atxwriter.AtxBatchItem, 100000)
for i := 0; i < len(a); i++ {
goldenATXID := types.RandomATXID()
wAtx := newInitialATXv1(goldenATXID)
atx := toAtx(wAtx)
a[i] = atxwriter.AtxBatchItem{Atx: atx, Watx: wAtx}
}

b.ResetTimer()
b.Run("No Coalesing", func(b *testing.B) {
db := newDiskSqlite(b)
b.ResetTimer()
for i := 0; i < b.N; i++ {
if err := db.WithTx(context.Background(), func(tx *sql.Tx) error {
var err error
err = atxs.Add(tx, a[i].Atx, a[i].Watx.Blob())
if err != nil {
b.Fatal(err)
}
err = atxs.SetPost(tx, a[i].Atx.ID(), a[i].Watx.PrevATXID, 0,
a[i].Atx.SmesherID, a[i].Watx.NumUnits)
if err != nil {
b.Fatal(err)
}
return nil
}); err != nil {
b.Fatal(err)
}
}
})

// with the coalesing tests, one must take the "ns/op" metrics and divide it
// by the number of entries written together to see how many items we're doing
// per time unit.
b.Run("Coalesing 1000 entries", func(b *testing.B) {
db := newDiskSqlite(b)
b.ResetTimer()
for j := 0; j < b.N; j++ {
if err := db.WithTx(context.Background(), func(tx *sql.Tx) error {
var err error
for i := (j * 1000); i < (j*1000)+1000; i++ {
err = atxs.Add(tx, a[i].Atx, a[i].Watx.Blob())
if err != nil {
b.Fatal(err)
}
err = atxs.SetPost(tx, a[i].Atx.ID(), a[i].Watx.PrevATXID, 0,
a[i].Atx.SmesherID, a[i].Watx.NumUnits)
if err != nil {
b.Fatal(err)
}
}
return nil
}); err != nil {
b.Fatal(err)
}
}
})

b.Run("Coalesing 5000 entries", func(b *testing.B) {
db := newDiskSqlite(b)
b.ResetTimer()
for j := 0; j < b.N; j++ {
if err := db.WithTx(context.Background(), func(tx *sql.Tx) error {
var err error
for i := (j * 5000); i < (j*5000)+5000; i++ {
err = atxs.Add(tx, a[i].Atx, a[i].Watx.Blob())
if err != nil {
b.Fatal(err)
}
err = atxs.SetPost(tx, a[i].Atx.ID(), a[i].Watx.PrevATXID, 0,
a[i].Atx.SmesherID, a[i].Watx.NumUnits)
if err != nil {
b.Fatal(err)
}
}
return nil
}); err != nil {
b.Fatal(err)
}
}
})
}

func toAtx(watx *wire.ActivationTxV1) *types.ActivationTx {
atx := wire.ActivationTxFromWireV1(watx)
atx.SetReceived(time.Now())
Expand All @@ -98,7 +185,7 @@ func toAtx(watx *wire.ActivationTxV1) *types.ActivationTx {
return atx
}

func newTestAtxWriter(t *testing.T) (*atxwriter.AtxWriter, *sql.Database) {
func newTestAtxWriter(t testing.TB) (*atxwriter.AtxWriter, *sql.Database) {
t.Helper()
db := sql.InMemoryTest(t)
log := zaptest.NewLogger(t)
Expand All @@ -116,6 +203,26 @@ func newTestAtxWriter(t *testing.T) (*atxwriter.AtxWriter, *sql.Database) {
return w, db
}

func newDiskSqlite(tb testing.TB) *sql.Database {
tb.Helper()
m21 := migrations.New0021Migration(zap.NewNop(), 1_000_000)
migrations, err := sql.StateMigrations()
if err != nil {
tb.Fatal(err)
}
dbopts := []sql.Opt{
sql.WithMigrations(migrations),
sql.WithMigration(m21),
}
dir := tb.TempDir()
sqlDB, err := sql.Open("file:"+filepath.Join(dir, "sql.sql"), dbopts...)
if err != nil {
tb.Fatal(err)
}
tb.Cleanup(func() { sqlDB.Close() })
return sqlDB
}

func newInitialATXv1(
goldenATXID types.ATXID,
opts ...func(*wire.ActivationTxV1),
Expand Down
3 changes: 3 additions & 0 deletions activation/atxwriter/export_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package atxwriter

type AtxBatchItem = atxBatchItem

0 comments on commit 369eec0

Please sign in to comment.