Skip to content

Commit

Permalink
feat(ledger): improve queries performances (#920)
Browse files Browse the repository at this point in the history
  • Loading branch information
gfyrag authored and flemzord committed Dec 4, 2023
1 parent 84c34ec commit c53593d
Show file tree
Hide file tree
Showing 9 changed files with 501 additions and 381 deletions.
2 changes: 1 addition & 1 deletion Earthfile
Original file line number Diff line number Diff line change
Expand Up @@ -115,4 +115,4 @@ benchstat:
ARG compareAgainstRevision=main
COPY --pass-args github.com/formancehq/stack/components/ledger:$compareAgainstRevision+bench/results.txt /tmp/main.txt
COPY --pass-args +bench/results.txt /tmp/branch.txt
RUN benchstat /tmp/main.txt /tmp/branch.txt
RUN --no-cache benchstat /tmp/main.txt /tmp/branch.txt
28 changes: 18 additions & 10 deletions internal/storage/ledgerstore/accounts.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,21 @@ import (
)

func (store *Store) buildAccountQuery(q PITFilterWithVolumes, query *bun.SelectQuery) *bun.SelectQuery {

query = query.
DistinctOn("accounts.address").
Column("accounts.address").
ColumnExpr("coalesce(metadata, '{}'::jsonb) as metadata").
Table("accounts").
Apply(filterPIT(q.PIT, "insertion_date")).
Order("accounts.address", "revision desc")
Order("accounts.address")

if q.PIT == nil {
query = query.Join("left join accounts_metadata on accounts_metadata.address = accounts.address")
if q.PIT != nil && !q.PIT.IsZero() {
query = query.
Column("accounts.address").
ColumnExpr("accounts_metadata.metadata").
Join("left join accounts_metadata on accounts_metadata.address = accounts.address and accounts_metadata.date < ?", q.PIT).
Order("revision desc")
} else {
query = query.Join("left join accounts_metadata on accounts_metadata.address = accounts.address and accounts_metadata.date < ?", q.PIT)
query = query.Column("metadata")
}

if q.ExpandVolumes {
Expand All @@ -45,7 +48,7 @@ func (store *Store) buildAccountQuery(q PITFilterWithVolumes, query *bun.SelectQ
return query
}

func (store *Store) accountQueryContext(qb query.Builder) (string, []any, error) {
func (store *Store) accountQueryContext(qb query.Builder, q GetAccountsQuery) (string, []any, error) {
metadataRegex := regexp.MustCompile("metadata\\[(.+)\\]")
balanceRegex := regexp.MustCompile("balance\\[(.*)\\]")

Expand All @@ -68,7 +71,12 @@ func (store *Store) accountQueryContext(qb query.Builder) (string, []any, error)
}
match := metadataRegex.FindAllStringSubmatch(key, 3)

return "metadata @> ?", []any{map[string]any{
key := "metadata"
if q.Options.Options.PIT != nil && !q.Options.Options.PIT.IsZero() {
key = "accounts_metadata.metadata"
}

return key + " @> ?", []any{map[string]any{
match[0][1]: value,
}}, nil
case balanceRegex.Match([]byte(key)):
Expand Down Expand Up @@ -113,7 +121,7 @@ func (store *Store) GetAccountsWithVolumes(ctx context.Context, q GetAccountsQue
err error
)
if q.Options.QueryBuilder != nil {
where, args, err = store.accountQueryContext(q.Options.QueryBuilder)
where, args, err = store.accountQueryContext(q.Options.QueryBuilder, q)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -171,7 +179,7 @@ func (store *Store) CountAccounts(ctx context.Context, q GetAccountsQuery) (int,
err error
)
if q.Options.QueryBuilder != nil {
where, args, err = store.accountQueryContext(q.Options.QueryBuilder)
where, args, err = store.accountQueryContext(q.Options.QueryBuilder, q)
if err != nil {
return 0, err
}
Expand Down
45 changes: 39 additions & 6 deletions internal/storage/ledgerstore/accounts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,8 +197,9 @@ func TestGetAccount(t *testing.T) {
t.Parallel()
store := newLedgerStore(t)
now := ledger.Now()
ctx := logging.TestingContext()

require.NoError(t, store.InsertLogs(context.Background(),
require.NoError(t, store.InsertLogs(ctx,
ledger.ChainLogs(
ledger.NewTransactionLog(ledger.NewTransaction().WithPostings(
ledger.NewPosting("world", "multi", "USD/2", big.NewInt(100)),
Expand All @@ -215,7 +216,7 @@ func TestGetAccount(t *testing.T) {

t.Run("find account", func(t *testing.T) {
t.Parallel()
account, err := store.GetAccountWithVolumes(context.Background(), NewGetAccountQuery("multi"))
account, err := store.GetAccountWithVolumes(ctx, NewGetAccountQuery("multi"))
require.NoError(t, err)
require.Equal(t, ledger.ExpandedAccount{
Account: ledger.Account{
Expand All @@ -225,11 +226,43 @@ func TestGetAccount(t *testing.T) {
},
},
}, *account)

account, err = store.GetAccountWithVolumes(ctx, NewGetAccountQuery("world"))
require.NoError(t, err)
require.Equal(t, ledger.ExpandedAccount{
Account: ledger.Account{
Address: "world",
Metadata: metadata.Metadata{},
},
}, *account)
})

t.Run("find account in past", func(t *testing.T) {
t.Parallel()
account, err := store.GetAccountWithVolumes(ctx, NewGetAccountQuery("multi"))
require.NoError(t, err)
require.Equal(t, ledger.ExpandedAccount{
Account: ledger.Account{
Address: "multi",
Metadata: metadata.Metadata{
"category": "gold",
},
},
}, *account)

account, err = store.GetAccountWithVolumes(ctx, NewGetAccountQuery("world").WithPIT(ledger.Now()))
require.NoError(t, err)
require.Equal(t, ledger.ExpandedAccount{
Account: ledger.Account{
Address: "world",
Metadata: metadata.Metadata{},
},
}, *account)
})

t.Run("find account with volumes", func(t *testing.T) {
t.Parallel()
account, err := store.GetAccountWithVolumes(context.Background(), NewGetAccountQuery("multi").
account, err := store.GetAccountWithVolumes(ctx, NewGetAccountQuery("multi").
WithExpandVolumes())
require.NoError(t, err)
require.Equal(t, ledger.ExpandedAccount{
Expand All @@ -247,7 +280,7 @@ func TestGetAccount(t *testing.T) {

t.Run("find account with effective volumes", func(t *testing.T) {
t.Parallel()
account, err := store.GetAccountWithVolumes(context.Background(), NewGetAccountQuery("multi").
account, err := store.GetAccountWithVolumes(ctx, NewGetAccountQuery("multi").
WithExpandEffectiveVolumes())
require.NoError(t, err)
require.Equal(t, ledger.ExpandedAccount{
Expand All @@ -265,7 +298,7 @@ func TestGetAccount(t *testing.T) {

t.Run("find account using pit", func(t *testing.T) {
t.Parallel()
account, err := store.GetAccountWithVolumes(context.Background(), NewGetAccountQuery("multi").WithPIT(now))
account, err := store.GetAccountWithVolumes(ctx, NewGetAccountQuery("multi").WithPIT(now))
require.NoError(t, err)
require.Equal(t, ledger.ExpandedAccount{
Account: ledger.Account{
Expand All @@ -278,7 +311,7 @@ func TestGetAccount(t *testing.T) {

t.Run("not existent account", func(t *testing.T) {
t.Parallel()
account, err := store.GetAccountWithVolumes(context.Background(), NewGetAccountQuery("account_not_existing"))
account, err := store.GetAccountWithVolumes(ctx, NewGetAccountQuery("account_not_existing"))
require.NoError(t, err)
require.NotNil(t, account)
})
Expand Down
40 changes: 26 additions & 14 deletions internal/storage/ledgerstore/balances.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ import (
func (store *Store) GetAggregatedBalances(ctx context.Context, q GetAggregatedBalanceQuery) (ledger.BalancesByAssets, error) {

var (
needJoinMetadata bool
subQuery string
args []any
err error
needMetadata bool
subQuery string
args []any
err error
)
if q.Options.QueryBuilder != nil {
subQuery, args, err = q.Options.QueryBuilder.Build(query.ContextFn(func(key, operator string, value any) (string, []any, error) {
Expand All @@ -42,9 +42,13 @@ func (store *Store) GetAggregatedBalances(ctx context.Context, q GetAggregatedBa
return "", nil, newErrInvalidQuery("'metadata' column can only be used with $match")
}
match := metadataRegex.FindAllStringSubmatch(key, 3)
needJoinMetadata = true
needMetadata = true
key := "accounts.metadata"
if q.Options.Options.PIT != nil {
key = "am.metadata"
}

return "am.metadata @> ?", []any{map[string]any{
return key + " @> ?", []any{map[string]any{
match[0][1]: value,
}}, nil
default:
Expand All @@ -68,14 +72,22 @@ func (store *Store) GetAggregatedBalances(ctx context.Context, q GetAggregatedBa
Order("account_address", "asset", "moves.seq desc").
Apply(filterPIT(q.Options.Options.PIT, "insertion_date")) // todo(gfyrag): expose capability to use effective_date

if needJoinMetadata {
moves = moves.Join(`left join lateral (
select metadata
from accounts_metadata am
where am.address = moves.account_address and (? is null or date <= ?)
order by revision desc
limit 1
) am on true`, q.Options.Options.PIT, q.Options.Options.PIT)
if needMetadata {
if q.Options.Options.PIT != nil {
moves = moves.Join(`join lateral (
select metadata
from accounts_metadata am
where am.address = moves.account_address and (? is null or date <= ?)
order by revision desc
limit 1
) am on true`, q.Options.Options.PIT, q.Options.Options.PIT)
} else {
moves = moves.Join(`join lateral (
select metadata
from accounts a
where a.address = moves.account_address
) accounts on true`)
}
}
if subQuery != "" {
moves = moves.Where(subQuery, args...)
Expand Down
Loading

0 comments on commit c53593d

Please sign in to comment.