Skip to content

Commit

Permalink
fix(accounts): fix bloom filter empty (#456)
Browse files Browse the repository at this point in the history
When rebooting the ledger, bloom filter will be
empty and we will never fetch the metadata and
the balances
  • Loading branch information
paul-nicolas authored Aug 31, 2023
1 parent 78a58db commit ddeb74b
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 79 deletions.
4 changes: 0 additions & 4 deletions pkg/storage/sqlstorage/accounts.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,8 +271,6 @@ func (s *Store) ensureAccountExists(ctx context.Context, account string) error {
return err
}

s.bloom.Add([]byte(account))

_, err = executor.ExecContext(ctx, sqlq, args...)
return s.error(err)
}
Expand Down Expand Up @@ -326,8 +324,6 @@ func (s *Store) UpdateAccountMetadata(ctx context.Context, address string, metad
return errors.Wrap(err, "reading last log")
}

s.bloom.Add([]byte(address))

return s.appendLog(ctx, core.NewSetMetadataLog(lastLog, at, core.SetMetadata{
TargetType: core.MetaTargetTypeAccount,
TargetID: address,
Expand Down
90 changes: 44 additions & 46 deletions pkg/storage/sqlstorage/aggregations.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,65 +23,63 @@ func (s *Store) GetAccountWithVolumes(ctx context.Context, address string) (*cor
}
assetsVolumes := core.AssetsVolumes{}

if s.bloom.Test([]byte(address)) {
sb := sqlbuilder.NewSelectBuilder()
sb.Select("accounts.metadata", "volumes.asset", "volumes.input", "volumes.output")
sb.From(s.schema.Table("accounts"))
sb.JoinWithOption(sqlbuilder.LeftOuterJoin,
s.schema.Table("volumes"),
"accounts.address = volumes.account")
sb.Where(sb.E("accounts.address", address))

executor, err := s.executorProvider(ctx)
if err != nil {
return nil, err
}
sb := sqlbuilder.NewSelectBuilder()
sb.Select("accounts.metadata", "volumes.asset", "volumes.input", "volumes.output")
sb.From(s.schema.Table("accounts"))
sb.JoinWithOption(sqlbuilder.LeftOuterJoin,
s.schema.Table("volumes"),
"accounts.address = volumes.account")
sb.Where(sb.E("accounts.address", address))

q, args := sb.BuildWithFlavor(s.schema.Flavor())
rows, err := executor.QueryContext(ctx, q, args...)
if err != nil {
executor, err := s.executorProvider(ctx)
if err != nil {
return nil, err
}

q, args := sb.BuildWithFlavor(s.schema.Flavor())
rows, err := executor.QueryContext(ctx, q, args...)
if err != nil {
return nil, s.error(err)
}
defer rows.Close()

for rows.Next() {
var asset, inputStr, outputStr sql.NullString
if err := rows.Scan(&acc.Metadata, &asset, &inputStr, &outputStr); err != nil {
return nil, s.error(err)
}
defer rows.Close()

for rows.Next() {
var asset, inputStr, outputStr sql.NullString
if err := rows.Scan(&acc.Metadata, &asset, &inputStr, &outputStr); err != nil {
return nil, s.error(err)
if asset.Valid {
assetsVolumes[asset.String] = core.Volumes{
Input: core.NewMonetaryInt(0),
Output: core.NewMonetaryInt(0),
}

if asset.Valid {
if inputStr.Valid {
input, err := core.ParseMonetaryInt(inputStr.String)
if err != nil {
return nil, s.error(err)
}
assetsVolumes[asset.String] = core.Volumes{
Input: core.NewMonetaryInt(0),
Output: core.NewMonetaryInt(0),
Input: input,
Output: assetsVolumes[asset.String].Output,
}
}

if inputStr.Valid {
input, err := core.ParseMonetaryInt(inputStr.String)
if err != nil {
return nil, s.error(err)
}
assetsVolumes[asset.String] = core.Volumes{
Input: input,
Output: assetsVolumes[asset.String].Output,
}
if outputStr.Valid {
output, err := core.ParseMonetaryInt(outputStr.String)
if err != nil {
return nil, s.error(err)
}

if outputStr.Valid {
output, err := core.ParseMonetaryInt(outputStr.String)
if err != nil {
return nil, s.error(err)
}
assetsVolumes[asset.String] = core.Volumes{
Input: assetsVolumes[asset.String].Input,
Output: output,
}
assetsVolumes[asset.String] = core.Volumes{
Input: assetsVolumes[asset.String].Input,
Output: output,
}
}
}
if err := rows.Err(); err != nil {
return nil, s.error(err)
}
}
if err := rows.Err(); err != nil {
return nil, s.error(err)
}

res := &core.AccountWithVolumes{
Expand Down
29 changes: 0 additions & 29 deletions pkg/storage/sqlstorage/store_ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,8 @@ package sqlstorage

import (
"context"
"fmt"
"os"
"strconv"
"time"

"github.com/bits-and-blooms/bloom"
"github.com/formancehq/stack/libs/go-libs/logging"
_ "github.com/jackc/pgx/v5/stdlib"
"github.com/numary/ledger/pkg/core"
Expand All @@ -27,7 +23,6 @@ type Store struct {
onDelete func(ctx context.Context) error
lastLog *core.Log
lastTx *core.ExpandedTransaction
bloom *bloom.BloomFilter
cache *cache.Cache
}

Expand Down Expand Up @@ -71,35 +66,11 @@ func (s *Store) Close(ctx context.Context) error {
func NewStore(schema Schema, executorProvider func(ctx context.Context) (executor, error),
onClose, onDelete func(ctx context.Context) error) *Store {

const (
bloomFilterSizeEnvVar = "NUMARY_BLOOM_FILTER_SIZE"
bloomFilterErrorRateEnvVar = "NUMARY_BLOOM_FILTER_ERROR_RATE"
)

var (
bloomSize uint64 = 100000
bloomErrorRate = 0.01
err error
)
if bloomSizeFromEnv := os.Getenv(bloomFilterSizeEnvVar); bloomSizeFromEnv != "" {
bloomSize, err = strconv.ParseUint(bloomSizeFromEnv, 10, 64)
if err != nil {
panic(errors.Wrap(err, fmt.Sprint("Parsing", bloomFilterSizeEnvVar, "env var")))
}
}
if bloomErrorRateFromEnv := os.Getenv(bloomFilterErrorRateEnvVar); bloomErrorRateFromEnv != "" {
bloomErrorRate, err = strconv.ParseFloat(bloomErrorRateFromEnv, 64)
if err != nil {
panic(errors.Wrap(err, fmt.Sprint("Parsing", bloomFilterErrorRateEnvVar, "env var")))
}
}

return &Store{
executorProvider: executorProvider,
schema: schema,
onClose: onClose,
onDelete: onDelete,
bloom: bloom.NewWithEstimates(uint(bloomSize), bloomErrorRate),
cache: cache.New(5*time.Minute, 10*time.Minute),
}
}
Expand Down

0 comments on commit ddeb74b

Please sign in to comment.