diff --git a/activation/atxwriter/atxwriter.go b/activation/atxwriter/atxwriter.go index 569687c801..4614cfa567 100644 --- a/activation/atxwriter/atxwriter.go +++ b/activation/atxwriter/atxwriter.go @@ -99,13 +99,13 @@ func (w *AtxWriter) Start(ctx context.Context) { if err := w.db.WithTx(ctx, func(tx *sql.Tx) error { var err error for _, item := range batch { - err = atxs.Add(tx, item.atx, item.watx.Blob()) + err = atxs.Add(tx, item.Atx, item.Watx.Blob()) if err != nil && !errors.Is(err, sql.ErrObjectExists) { WriteBatchErrorsCount.Inc() return fmt.Errorf("add atx to db: %w", err) } - err = atxs.SetPost(tx, item.atx.ID(), item.watx.PrevATXID, 0, - item.atx.SmesherID, item.watx.NumUnits) + err = atxs.SetPost(tx, item.Atx.ID(), item.Watx.PrevATXID, 0, + item.Atx.SmesherID, item.Watx.NumUnits) if err != nil && !errors.Is(err, sql.ErrObjectExists) { WriteBatchErrorsCount.Inc() return fmt.Errorf("set atx units: %w", err) @@ -135,7 +135,7 @@ func (w *AtxWriter) Store(atx *types.ActivationTx, watx *wire.ActivationTxV1) (< if !w.running { w.timer.Reset(writerDelay) } - w.atxBatch[atx.ID()] = atxBatchItem{atx: atx, watx: watx} + w.atxBatch[atx.ID()] = atxBatchItem{Atx: atx, Watx: watx} br := w.atxBatchResult c := br.doneC return c, br.Error @@ -151,8 +151,8 @@ func (b *batchResult) Error() error { } type atxBatchItem struct { - atx *types.ActivationTx - watx *wire.ActivationTxV1 + Atx *types.ActivationTx + Watx *wire.ActivationTxV1 } type db interface { diff --git a/activation/atxwriter/atxwriter_test.go b/activation/atxwriter/atxwriter_test.go index 58c3175166..49c82207b9 100644 --- a/activation/atxwriter/atxwriter_test.go +++ b/activation/atxwriter/atxwriter_test.go @@ -2,6 +2,7 @@ package atxwriter_test import ( "context" + "path/filepath" "testing" "time" @@ -10,9 +11,11 @@ import ( "github.com/spacemeshos/go-spacemesh/common/types" "github.com/spacemeshos/go-spacemesh/sql" "github.com/spacemeshos/go-spacemesh/sql/atxs" + "github.com/spacemeshos/go-spacemesh/sql/migrations" "github.com/spacemeshos/merkle-tree" poetShared "github.com/spacemeshos/poet/shared" "github.com/stretchr/testify/require" + "go.uber.org/zap" "go.uber.org/zap/zaptest" ) @@ -90,6 +93,90 @@ func TestWriteCoalesce_MultipleBatches(t *testing.T) { require.NoError(t, err) } +func BenchmarkWriteCoalesing(b *testing.B) { + a := make([]atxwriter.AtxBatchItem, 100000) + for i := 0; i < len(a); i++ { + goldenATXID := types.RandomATXID() + wAtx := newInitialATXv1(goldenATXID) + atx := toAtx(wAtx) + a[i] = atxwriter.AtxBatchItem{Atx: atx, Watx: wAtx} + } + + b.ResetTimer() + b.Run("No Coalesing", func(b *testing.B) { + db := newDiskSqlite(b) + b.ResetTimer() + for i := 0; i < b.N; i++ { + if err := db.WithTx(context.Background(), func(tx *sql.Tx) error { + var err error + err = atxs.Add(tx, a[i].Atx, a[i].Watx.Blob()) + if err != nil { + b.Fatal(err) + } + err = atxs.SetPost(tx, a[i].Atx.ID(), a[i].Watx.PrevATXID, 0, + a[i].Atx.SmesherID, a[i].Watx.NumUnits) + if err != nil { + b.Fatal(err) + } + return nil + }); err != nil { + b.Fatal(err) + } + } + }) + + // with the coalesing tests, one must take the "ns/op" metrics and divide it + // by the number of entries written together to see how many items we're doing + // per time unit. + b.Run("Coalesing 1000 entries", func(b *testing.B) { + db := newDiskSqlite(b) + b.ResetTimer() + for j := 0; j < b.N; j++ { + if err := db.WithTx(context.Background(), func(tx *sql.Tx) error { + var err error + for i := (j * 1000); i < (j*1000)+1000; i++ { + err = atxs.Add(tx, a[i].Atx, a[i].Watx.Blob()) + if err != nil { + b.Fatal(err) + } + err = atxs.SetPost(tx, a[i].Atx.ID(), a[i].Watx.PrevATXID, 0, + a[i].Atx.SmesherID, a[i].Watx.NumUnits) + if err != nil { + b.Fatal(err) + } + } + return nil + }); err != nil { + b.Fatal(err) + } + } + }) + + b.Run("Coalesing 5000 entries", func(b *testing.B) { + db := newDiskSqlite(b) + b.ResetTimer() + for j := 0; j < b.N; j++ { + if err := db.WithTx(context.Background(), func(tx *sql.Tx) error { + var err error + for i := (j * 5000); i < (j*5000)+5000; i++ { + err = atxs.Add(tx, a[i].Atx, a[i].Watx.Blob()) + if err != nil { + b.Fatal(err) + } + err = atxs.SetPost(tx, a[i].Atx.ID(), a[i].Watx.PrevATXID, 0, + a[i].Atx.SmesherID, a[i].Watx.NumUnits) + if err != nil { + b.Fatal(err) + } + } + return nil + }); err != nil { + b.Fatal(err) + } + } + }) +} + func toAtx(watx *wire.ActivationTxV1) *types.ActivationTx { atx := wire.ActivationTxFromWireV1(watx) atx.SetReceived(time.Now()) @@ -98,7 +185,7 @@ func toAtx(watx *wire.ActivationTxV1) *types.ActivationTx { return atx } -func newTestAtxWriter(t *testing.T) (*atxwriter.AtxWriter, *sql.Database) { +func newTestAtxWriter(t testing.TB) (*atxwriter.AtxWriter, *sql.Database) { t.Helper() db := sql.InMemoryTest(t) log := zaptest.NewLogger(t) @@ -116,6 +203,26 @@ func newTestAtxWriter(t *testing.T) (*atxwriter.AtxWriter, *sql.Database) { return w, db } +func newDiskSqlite(tb testing.TB) *sql.Database { + tb.Helper() + m21 := migrations.New0021Migration(zap.NewNop(), 1_000_000) + migrations, err := sql.StateMigrations() + if err != nil { + tb.Fatal(err) + } + dbopts := []sql.Opt{ + sql.WithMigrations(migrations), + sql.WithMigration(m21), + } + dir := tb.TempDir() + sqlDB, err := sql.Open("file:"+filepath.Join(dir, "sql.sql"), dbopts...) + if err != nil { + tb.Fatal(err) + } + tb.Cleanup(func() { sqlDB.Close() }) + return sqlDB +} + func newInitialATXv1( goldenATXID types.ATXID, opts ...func(*wire.ActivationTxV1), diff --git a/activation/atxwriter/export_test.go b/activation/atxwriter/export_test.go new file mode 100644 index 0000000000..ab4f9f70c3 --- /dev/null +++ b/activation/atxwriter/export_test.go @@ -0,0 +1,3 @@ +package atxwriter + +type AtxBatchItem = atxBatchItem