From c53593d95c098f4e2c61742d39ee3a9bf78829ff Mon Sep 17 00:00:00 2001 From: Ragot Geoffrey Date: Tue, 28 Nov 2023 10:16:59 +0100 Subject: [PATCH] feat(ledger): improve queries performances (#920) --- Earthfile | 2 +- internal/storage/ledgerstore/accounts.go | 28 +- internal/storage/ledgerstore/accounts_test.go | 45 +- internal/storage/ledgerstore/balances.go | 40 +- .../ledgerstore/migrations/0-init-schema.sql | 730 ++++++++++-------- .../ledgerstore/store_benchmarks_test.go | 1 + internal/storage/ledgerstore/transactions.go | 27 +- .../storage/ledgerstore/transactions_test.go | 1 + internal/storage/ledgerstore/utils.go | 8 +- 9 files changed, 501 insertions(+), 381 deletions(-) diff --git a/Earthfile b/Earthfile index 867009c8b..d6b6fe12b 100644 --- a/Earthfile +++ b/Earthfile @@ -115,4 +115,4 @@ benchstat: ARG compareAgainstRevision=main COPY --pass-args github.com/formancehq/stack/components/ledger:$compareAgainstRevision+bench/results.txt /tmp/main.txt COPY --pass-args +bench/results.txt /tmp/branch.txt - RUN benchstat /tmp/main.txt /tmp/branch.txt + RUN --no-cache benchstat /tmp/main.txt /tmp/branch.txt diff --git a/internal/storage/ledgerstore/accounts.go b/internal/storage/ledgerstore/accounts.go index 4d118fe4b..a688849c8 100644 --- a/internal/storage/ledgerstore/accounts.go +++ b/internal/storage/ledgerstore/accounts.go @@ -16,18 +16,21 @@ import ( ) func (store *Store) buildAccountQuery(q PITFilterWithVolumes, query *bun.SelectQuery) *bun.SelectQuery { + query = query. - DistinctOn("accounts.address"). Column("accounts.address"). - ColumnExpr("coalesce(metadata, '{}'::jsonb) as metadata"). Table("accounts"). Apply(filterPIT(q.PIT, "insertion_date")). - Order("accounts.address", "revision desc") + Order("accounts.address") - if q.PIT == nil { - query = query.Join("left join accounts_metadata on accounts_metadata.address = accounts.address") + if q.PIT != nil && !q.PIT.IsZero() { + query = query. + Column("accounts.address"). + ColumnExpr("accounts_metadata.metadata"). + Join("left join accounts_metadata on accounts_metadata.address = accounts.address and accounts_metadata.date < ?", q.PIT). 
+ Order("revision desc") } else { - query = query.Join("left join accounts_metadata on accounts_metadata.address = accounts.address and accounts_metadata.date < ?", q.PIT) + query = query.Column("metadata") } if q.ExpandVolumes { @@ -45,7 +48,7 @@ func (store *Store) buildAccountQuery(q PITFilterWithVolumes, query *bun.SelectQ return query } -func (store *Store) accountQueryContext(qb query.Builder) (string, []any, error) { +func (store *Store) accountQueryContext(qb query.Builder, q GetAccountsQuery) (string, []any, error) { metadataRegex := regexp.MustCompile("metadata\\[(.+)\\]") balanceRegex := regexp.MustCompile("balance\\[(.*)\\]") @@ -68,7 +71,12 @@ func (store *Store) accountQueryContext(qb query.Builder) (string, []any, error) } match := metadataRegex.FindAllStringSubmatch(key, 3) - return "metadata @> ?", []any{map[string]any{ + key := "metadata" + if q.Options.Options.PIT != nil && !q.Options.Options.PIT.IsZero() { + key = "accounts_metadata.metadata" + } + + return key + " @> ?", []any{map[string]any{ match[0][1]: value, }}, nil case balanceRegex.Match([]byte(key)): @@ -113,7 +121,7 @@ func (store *Store) GetAccountsWithVolumes(ctx context.Context, q GetAccountsQue err error ) if q.Options.QueryBuilder != nil { - where, args, err = store.accountQueryContext(q.Options.QueryBuilder) + where, args, err = store.accountQueryContext(q.Options.QueryBuilder, q) if err != nil { return nil, err } @@ -171,7 +179,7 @@ func (store *Store) CountAccounts(ctx context.Context, q GetAccountsQuery) (int, err error ) if q.Options.QueryBuilder != nil { - where, args, err = store.accountQueryContext(q.Options.QueryBuilder) + where, args, err = store.accountQueryContext(q.Options.QueryBuilder, q) if err != nil { return 0, err } diff --git a/internal/storage/ledgerstore/accounts_test.go b/internal/storage/ledgerstore/accounts_test.go index 39d0ec0fc..0fd707bcc 100644 --- a/internal/storage/ledgerstore/accounts_test.go +++ b/internal/storage/ledgerstore/accounts_test.go @@ -197,8 +197,9 @@ func TestGetAccount(t *testing.T) { t.Parallel() store := newLedgerStore(t) now := ledger.Now() + ctx := logging.TestingContext() - require.NoError(t, store.InsertLogs(context.Background(), + require.NoError(t, store.InsertLogs(ctx, ledger.ChainLogs( ledger.NewTransactionLog(ledger.NewTransaction().WithPostings( ledger.NewPosting("world", "multi", "USD/2", big.NewInt(100)), @@ -215,7 +216,7 @@ func TestGetAccount(t *testing.T) { t.Run("find account", func(t *testing.T) { t.Parallel() - account, err := store.GetAccountWithVolumes(context.Background(), NewGetAccountQuery("multi")) + account, err := store.GetAccountWithVolumes(ctx, NewGetAccountQuery("multi")) require.NoError(t, err) require.Equal(t, ledger.ExpandedAccount{ Account: ledger.Account{ @@ -225,11 +226,43 @@ func TestGetAccount(t *testing.T) { }, }, }, *account) + + account, err = store.GetAccountWithVolumes(ctx, NewGetAccountQuery("world")) + require.NoError(t, err) + require.Equal(t, ledger.ExpandedAccount{ + Account: ledger.Account{ + Address: "world", + Metadata: metadata.Metadata{}, + }, + }, *account) + }) + + t.Run("find account in past", func(t *testing.T) { + t.Parallel() + account, err := store.GetAccountWithVolumes(ctx, NewGetAccountQuery("multi")) + require.NoError(t, err) + require.Equal(t, ledger.ExpandedAccount{ + Account: ledger.Account{ + Address: "multi", + Metadata: metadata.Metadata{ + "category": "gold", + }, + }, + }, *account) + + account, err = store.GetAccountWithVolumes(ctx, NewGetAccountQuery("world").WithPIT(ledger.Now())) + 
require.NoError(t, err) + require.Equal(t, ledger.ExpandedAccount{ + Account: ledger.Account{ + Address: "world", + Metadata: metadata.Metadata{}, + }, + }, *account) }) t.Run("find account with volumes", func(t *testing.T) { t.Parallel() - account, err := store.GetAccountWithVolumes(context.Background(), NewGetAccountQuery("multi"). + account, err := store.GetAccountWithVolumes(ctx, NewGetAccountQuery("multi"). WithExpandVolumes()) require.NoError(t, err) require.Equal(t, ledger.ExpandedAccount{ @@ -247,7 +280,7 @@ func TestGetAccount(t *testing.T) { t.Run("find account with effective volumes", func(t *testing.T) { t.Parallel() - account, err := store.GetAccountWithVolumes(context.Background(), NewGetAccountQuery("multi"). + account, err := store.GetAccountWithVolumes(ctx, NewGetAccountQuery("multi"). WithExpandEffectiveVolumes()) require.NoError(t, err) require.Equal(t, ledger.ExpandedAccount{ @@ -265,7 +298,7 @@ func TestGetAccount(t *testing.T) { t.Run("find account using pit", func(t *testing.T) { t.Parallel() - account, err := store.GetAccountWithVolumes(context.Background(), NewGetAccountQuery("multi").WithPIT(now)) + account, err := store.GetAccountWithVolumes(ctx, NewGetAccountQuery("multi").WithPIT(now)) require.NoError(t, err) require.Equal(t, ledger.ExpandedAccount{ Account: ledger.Account{ @@ -278,7 +311,7 @@ func TestGetAccount(t *testing.T) { t.Run("not existent account", func(t *testing.T) { t.Parallel() - account, err := store.GetAccountWithVolumes(context.Background(), NewGetAccountQuery("account_not_existing")) + account, err := store.GetAccountWithVolumes(ctx, NewGetAccountQuery("account_not_existing")) require.NoError(t, err) require.NotNil(t, account) }) diff --git a/internal/storage/ledgerstore/balances.go b/internal/storage/ledgerstore/balances.go index 3a45bc9e0..ed93b803b 100644 --- a/internal/storage/ledgerstore/balances.go +++ b/internal/storage/ledgerstore/balances.go @@ -17,10 +17,10 @@ import ( func (store *Store) GetAggregatedBalances(ctx context.Context, q GetAggregatedBalanceQuery) (ledger.BalancesByAssets, error) { var ( - needJoinMetadata bool - subQuery string - args []any - err error + needMetadata bool + subQuery string + args []any + err error ) if q.Options.QueryBuilder != nil { subQuery, args, err = q.Options.QueryBuilder.Build(query.ContextFn(func(key, operator string, value any) (string, []any, error) { @@ -42,9 +42,13 @@ func (store *Store) GetAggregatedBalances(ctx context.Context, q GetAggregatedBa return "", nil, newErrInvalidQuery("'metadata' column can only be used with $match") } match := metadataRegex.FindAllStringSubmatch(key, 3) - needJoinMetadata = true + needMetadata = true + key := "accounts.metadata" + if q.Options.Options.PIT != nil { + key = "am.metadata" + } - return "am.metadata @> ?", []any{map[string]any{ + return key + " @> ?", []any{map[string]any{ match[0][1]: value, }}, nil default: @@ -68,14 +72,22 @@ func (store *Store) GetAggregatedBalances(ctx context.Context, q GetAggregatedBa Order("account_address", "asset", "moves.seq desc"). Apply(filterPIT(q.Options.Options.PIT, "insertion_date")) // todo(gfyrag): expose capability to use effective_date - if needJoinMetadata { - moves = moves.Join(`left join lateral ( - select metadata - from accounts_metadata am - where am.address = moves.account_address and (? is null or date <= ?) 
- order by revision desc - limit 1 - ) am on true`, q.Options.Options.PIT, q.Options.Options.PIT) + if needMetadata { + if q.Options.Options.PIT != nil { + moves = moves.Join(`join lateral ( + select metadata + from accounts_metadata am + where am.address = moves.account_address and (? is null or date <= ?) + order by revision desc + limit 1 + ) am on true`, q.Options.Options.PIT, q.Options.Options.PIT) + } else { + moves = moves.Join(`join lateral ( + select metadata + from accounts a + where a.address = moves.account_address + ) accounts on true`) + } } if subQuery != "" { moves = moves.Where(subQuery, args...) diff --git a/internal/storage/ledgerstore/migrations/0-init-schema.sql b/internal/storage/ledgerstore/migrations/0-init-schema.sql index ba96b42a9..ea3e62d4e 100644 --- a/internal/storage/ledgerstore/migrations/0-init-schema.sql +++ b/internal/storage/ledgerstore/migrations/0-init-schema.sql @@ -2,10 +2,10 @@ Some utils */ create aggregate aggregate_objects(jsonb) ( - sfunc = jsonb_concat, - stype = jsonb, - initcond = '{}' -); + sfunc = jsonb_concat, + stype = jsonb, + initcond = '{}' + ); create function first_agg (anyelement, anyelement) returns anyelement @@ -14,55 +14,62 @@ create function first_agg (anyelement, anyelement) strict parallel safe as $$ - select $1 +select $1 $$; create aggregate first (anyelement) ( sfunc = first_agg, stype = anyelement, parallel = safe -); + ); create function array_distinct(anyarray) returns anyarray language sql immutable as $$ - select array_agg(distinct x) - from unnest($1) t(x); +select array_agg(distinct x) +from unnest($1) t(x); $$; /** Define types **/ -create type account_with_volumes as ( +create type account_with_volumes as +( address varchar, metadata jsonb, volumes jsonb ); -create type volumes as ( +create type volumes as +( inputs numeric, outputs numeric ); -create type volumes_with_asset as ( +create type volumes_with_asset as +( asset varchar, volumes volumes ); /** Define tables **/ -create table transactions ( +create table transactions +( id numeric not null primary key, timestamp timestamp without time zone not null, reference varchar, reverted_at timestamp without time zone, + updated_at timestamp without time zone, postings varchar not null, sources jsonb, destinations jsonb, sources_arrays jsonb, - destinations_arrays jsonb + destinations_arrays jsonb, + metadata jsonb not null default '{}'::jsonb ); -create table transactions_metadata ( +create table transactions_metadata +( transaction_id numeric not null references transactions(id), revision numeric default 0 not null, date timestamp not null, @@ -71,20 +78,25 @@ create table transactions_metadata ( primary key (transaction_id, revision) ); -create table accounts ( +create table accounts +( address varchar primary key, address_array jsonb not null, - insertion_date timestamp not null + insertion_date timestamp not null, + updated_at timestamp not null, + metadata jsonb not null default '{}'::jsonb ); -create table accounts_metadata ( +create table accounts_metadata +( address varchar references accounts(address), - metadata jsonb default '{}'::jsonb, + metadata jsonb not null default '{}'::jsonb, revision numeric default 0, date timestamp ); -create table moves ( +create table moves +( seq serial not null primary key , transaction_id numeric not null references transactions(id), account_address varchar not null, @@ -98,14 +110,16 @@ create table moves ( is_source boolean not null ); -create type log_type as enum ( - 'NEW_TRANSACTION', - 'REVERTED_TRANSACTION', - 
'SET_METADATA', - 'DELETE_METADATA' -); +create type log_type as enum + ( + 'NEW_TRANSACTION', + 'REVERTED_TRANSACTION', + 'SET_METADATA', + 'DELETE_METADATA' + ); -create table logs ( +create table logs +( id numeric not null primary key, type log_type not null, hash bytea not null, @@ -121,7 +135,7 @@ create function balance_from_volumes(v volumes) language sql immutable as $$ - select v.inputs - v.outputs +select v.inputs - v.outputs $$; /** Index required for write part */ @@ -129,7 +143,10 @@ create index moves_range_dates on moves (account_address, asset, effective_date) /** Index requires for read */ create index transactions_date on transactions (timestamp); -create index transactions_metadata_metadata on transactions_metadata using gin (metadata); +create index transactions_metadata_index on transactions using gin (metadata jsonb_path_ops); +create index transactions_metadata_metadata on transactions_metadata using gin (metadata jsonb_path_ops); +create unique index transactions_metadata_revisions on transactions_metadata(transaction_id asc, revision desc) include (metadata, date); + --create unique index transactions_revisions on transactions_metadata(id desc, revision desc); create index transactions_sources on transactions using gin (sources jsonb_path_ops); create index transactions_destinations on transactions using gin (destinations jsonb_path_ops); @@ -148,41 +165,28 @@ create index moves_transactions_id on moves (transaction_id); create index accounts_address_array on accounts using gin (address_array jsonb_ops); create index accounts_address_array_length on accounts (jsonb_array_length(address_array)); +create index accounts_metadata_index on accounts (address) include (metadata); +create index accounts_metadata_metadata on accounts_metadata using gin (metadata jsonb_path_ops); -create unique index accounts_metadata_revisions on accounts_metadata(address asc, revision desc); +create unique index accounts_metadata_revisions on accounts_metadata(address asc, revision desc) include (metadata, date); /** Define write functions **/ -create function insert_new_account(_address varchar, _date timestamp) - returns bool - language plpgsql -as $$ - declare - _account accounts; - begin - insert into accounts(address, address_array, insertion_date) - values (_address, to_json(string_to_array(_address, ':')), _date) - on conflict do nothing - returning * into _account; - - return _account is not null; - end; -$$; -- given the input : "a:b:c", the function will produce : '{"0": "a", "1": "b", "2": "c", "3": null}' create function explode_address(_address varchar) - returns jsonb - language sql - immutable + returns jsonb + language sql + immutable as $$ - select aggregate_objects(jsonb_build_object(data.number - 1, data.value)) - from ( - select row_number() over () as number, v.value - from ( - select unnest(string_to_array(_address, ':')) as value - union all - select null - ) v - ) data +select aggregate_objects(jsonb_build_object(data.number - 1, data.value)) +from ( + select row_number() over () as number, v.value + from ( + select unnest(string_to_array(_address, ':')) as value + union all + select null + ) v + ) data $$; create function get_account(_account_address varchar, _before timestamp default null) @@ -190,12 +194,12 @@ create function get_account(_account_address varchar, _before timestamp default language sql stable as $$ - select distinct on (address) * - from accounts_metadata t - where (_before is null or t.date <= _before) - and t.address = _account_address - order 
by address, revision desc - limit 1; +select distinct on (address) * +from accounts_metadata t +where (_before is null or t.date <= _before) + and t.address = _account_address +order by address, revision desc +limit 1; $$; create function get_transaction(_id numeric, _before timestamp default null) @@ -203,11 +207,11 @@ create function get_transaction(_id numeric, _before timestamp default null) language sql stable as $$ - select * - from transactions t - where (_before is null or t.timestamp <= _before) and t.id = _id - order by id desc - limit 1; +select * +from transactions t +where (_before is null or t.timestamp <= _before) and t.id = _id +order by id desc +limit 1; $$; -- a simple 'select distinct asset from moves' would be more simple @@ -218,21 +222,22 @@ create function get_all_assets() returns setof varchar language sql as $$ - with recursive t as ( - select min(asset) as asset - from moves - union all - select ( +with recursive t as ( + select min(asset) as asset + from moves + union all + select + ( select min(asset) from moves where asset > t.asset ) - from t - where t.asset is not null - ) - select asset from t where asset is not null - union all - select null where exists(select 1 from moves where asset is null) + from t + where t.asset is not null +) +select asset from t where asset is not null +union all +select null where exists(select 1 from moves where asset is null) $$; create function get_latest_move_for_account_and_asset(_account_address varchar, _asset varchar, _before timestamp default null) @@ -240,258 +245,245 @@ create function get_latest_move_for_account_and_asset(_account_address varchar, language sql stable as $$ - select * - from moves s - where (_before is null or s.effective_date <= _before) and s.account_address = _account_address and s.asset = _asset - order by effective_date desc, seq desc - limit 1; +select * +from moves s +where (_before is null or s.effective_date <= _before) and s.account_address = _account_address and s.asset = _asset +order by effective_date desc, seq desc +limit 1; $$; -create function update_account_metadata(_address varchar, _metadata jsonb, _date timestamp) - returns void - language sql +create function upsert_account(_address varchar, _metadata jsonb, _date timestamp) + returns bool + language plpgsql as $$ - select insert_new_account(_address, _date); - - insert into accounts_metadata (address, metadata, date, revision) - ( - select _address, accounts_metadata.metadata || _metadata, _date, accounts_metadata.revision + 1 - from accounts_metadata - where address = _address - order by revision desc - limit 1 - ) - union all -- if no metdata - select _address, _metadata, _date, 0 - limit 1; +declare + exists bool = false; +begin + select true from accounts where address = _address into exists; + + insert into accounts(address, address_array, insertion_date, metadata, updated_at) + values (_address, to_json(string_to_array(_address, ':')), _date, coalesce(_metadata, '{}'::jsonb), _date) + on conflict (address) do update + set metadata = accounts.metadata || coalesce(_metadata, '{}'::jsonb), + updated_at = _date + where not accounts.metadata @> coalesce(_metadata, '{}'::jsonb); + + return exists is null; +end; $$; create function delete_account_metadata(_address varchar, _key varchar, _date timestamp) returns void - language sql + language plpgsql as $$ - insert into accounts_metadata (address, metadata, date, revision) - select _address, accounts_metadata.metadata - _key, _date, accounts_metadata.revision + 1 - from 
accounts_metadata - where address = _address - order by revision desc - limit 1 +begin + update accounts + set metadata = metadata - _key, updated_at = _date + where address = _address; +end $$; create function update_transaction_metadata(_id numeric, _metadata jsonb, _date timestamp) returns void - language sql + language plpgsql as $$ - insert into transactions_metadata (transaction_id, metadata, date, revision) - ( - select originalTX.transaction_id, - originalTX.metadata || _metadata, - _date, - originalTX.revision + 1 - from transactions_metadata originalTX - where transaction_id = _id - order by revision desc - limit 1 - ) - union all ( - select _id, '{}'::jsonb, null, -1 - ) - limit 1 +begin + update transactions + set metadata = metadata || _metadata, updated_at = _date + where id = _id; -- todo: add fill factor on transactions table ? +end; $$; create function delete_transaction_metadata(_id numeric, _key varchar, _date timestamp) returns void - language sql + language plpgsql as $$ - insert into transactions_metadata (transaction_id, metadata, date, revision) - select originalTX.transaction_id, - originalTX.metadata - _key, - _date, - originalTX.revision + 1 - from transactions_metadata originalTX - where transaction_id = _id - order by revision desc - limit 1; +begin + update transactions + set metadata = metadata - _key, updated_at = _date + where id = _id; +end; $$; create function revert_transaction(_id numeric, _date timestamp) returns void language sql as $$ - update transactions - set reverted_at = _date - where id = _id; +update transactions +set reverted_at = _date +where id = _id; $$; + create or replace function insert_move(_transaction_id numeric, _insertion_date timestamp without time zone, - _effective_date timestamp without time zone, _account_address varchar, _asset varchar, _amount numeric, _is_source bool, _new_account bool) + _effective_date timestamp without time zone, _account_address varchar, _asset varchar, _amount numeric, _is_source bool, _new_account bool) returns void language plpgsql as $$ - declare - _post_commit_volumes volumes = (0, 0)::volumes; - _effective_post_commit_volumes volumes = (0, 0)::volumes; - _seq numeric; - begin - - -- todo: lock if we enable parallelism - -- perform * - -- from accounts - -- where address = _account_address - -- for update; - - if not _new_account then - select (post_commit_volumes).inputs, (post_commit_volumes).outputs into _post_commit_volumes +declare + _post_commit_volumes volumes = (0, 0)::volumes; + _effective_post_commit_volumes volumes = (0, 0)::volumes; + _seq numeric; +begin + + -- todo: lock if we enable parallelism + -- perform * + -- from accounts + -- where address = _account_address + -- for update; + + if not _new_account then + select (post_commit_volumes).inputs, (post_commit_volumes).outputs into _post_commit_volumes + from moves + where account_address = _account_address + and asset = _asset + order by seq desc + limit 1; + + if not found then + _post_commit_volumes = (0, 0)::volumes; + _effective_post_commit_volumes = (0, 0)::volumes; + else + select (post_commit_effective_volumes).inputs, (post_commit_effective_volumes).outputs into _effective_post_commit_volumes from moves where account_address = _account_address - and asset = _asset - order by seq desc + and asset = _asset and effective_date <= _effective_date + order by effective_date desc, seq desc limit 1; - - if not found then - _post_commit_volumes = (0, 0)::volumes; - _effective_post_commit_volumes = (0, 0)::volumes; - else - select 
(post_commit_effective_volumes).inputs, (post_commit_effective_volumes).outputs into _effective_post_commit_volumes - from moves - where account_address = _account_address - and asset = _asset and effective_date <= _effective_date - order by effective_date desc, seq desc - limit 1; - end if; end if; + end if; - if _is_source then - _post_commit_volumes.outputs = _post_commit_volumes.outputs + _amount; - _effective_post_commit_volumes.outputs = _effective_post_commit_volumes.outputs + _amount; - else - _post_commit_volumes.inputs = _post_commit_volumes.inputs + _amount; - _effective_post_commit_volumes.inputs = _effective_post_commit_volumes.inputs + _amount; - end if; + if _is_source then + _post_commit_volumes.outputs = _post_commit_volumes.outputs + _amount; + _effective_post_commit_volumes.outputs = _effective_post_commit_volumes.outputs + _amount; + else + _post_commit_volumes.inputs = _post_commit_volumes.inputs + _amount; + _effective_post_commit_volumes.inputs = _effective_post_commit_volumes.inputs + _amount; + end if; - insert into moves ( - insertion_date, - effective_date, - account_address, - asset, - transaction_id, - amount, - is_source, - account_address_array, - post_commit_volumes, - post_commit_effective_volumes - ) values (_insertion_date, _effective_date, _account_address, _asset, _transaction_id, - _amount, _is_source, (select to_json(string_to_array(_account_address, ':'))), - _post_commit_volumes, _effective_post_commit_volumes) - returning seq into _seq; - - if not _new_account then - update moves - set post_commit_effective_volumes = ( - (post_commit_effective_volumes).inputs + case when _is_source then 0 else _amount end, - (post_commit_effective_volumes).outputs + case when _is_source then _amount else 0 end - ) - where account_address = _account_address and asset = _asset and effective_date > _effective_date; - - update moves - set post_commit_effective_volumes = ( - (post_commit_effective_volumes).inputs + case when _is_source then 0 else _amount end, - (post_commit_effective_volumes).outputs + case when _is_source then _amount else 0 end - ) - where account_address = _account_address and asset = _asset and effective_date = _effective_date and seq > _seq; - end if; - end; + insert into moves ( + insertion_date, + effective_date, + account_address, + asset, + transaction_id, + amount, + is_source, + account_address_array, + post_commit_volumes, + post_commit_effective_volumes + ) values (_insertion_date, _effective_date, _account_address, _asset, _transaction_id, + _amount, _is_source, (select to_json(string_to_array(_account_address, ':'))), + _post_commit_volumes, _effective_post_commit_volumes) + returning seq into _seq; + + if not _new_account then + update moves + set post_commit_effective_volumes = + ( + (post_commit_effective_volumes).inputs + case when _is_source then 0 else _amount end, + (post_commit_effective_volumes).outputs + case when _is_source then _amount else 0 end + ) + where account_address = _account_address and asset = _asset and effective_date > _effective_date; + + update moves + set post_commit_effective_volumes = + ( + (post_commit_effective_volumes).inputs + case when _is_source then 0 else _amount end, + (post_commit_effective_volumes).outputs + case when _is_source then _amount else 0 end + ) + where account_address = _account_address and asset = _asset and effective_date = _effective_date and seq > _seq; + end if; +end; $$; -create function insert_posting(_transaction_id numeric, _insertion_date timestamp without time zone, 
_effective_date timestamp without time zone, posting jsonb) +create function insert_posting(_transaction_id numeric, _insertion_date timestamp without time zone, + _effective_date timestamp without time zone, posting jsonb, _account_metadata jsonb) returns void language plpgsql as $$ - declare - source_created bool; - destination_created bool; - begin - select insert_new_account(posting->>'source', _insertion_date) into source_created; - select insert_new_account(posting->>'destination', _insertion_date) into destination_created; - - -- todo: sometimes the balance is known at commit time (for sources != world), we need to forward the value to populate the pre_commit_aggregated_input and output - perform insert_move(_transaction_id, _insertion_date, _effective_date, - posting->>'source', posting->>'asset', (posting->>'amount')::numeric, true, source_created); - perform insert_move(_transaction_id, _insertion_date, _effective_date, - posting->>'destination', posting->>'asset', (posting->>'amount')::numeric, false, destination_created); - end; +declare + source_created bool; + destination_created bool; +begin + select upsert_account(posting->>'source', _account_metadata->(posting->>'source'), _insertion_date) into source_created; + select upsert_account(posting->>'destination', _account_metadata->(posting->>'destination'), _insertion_date) into destination_created; + + -- todo: sometimes the balance is known at commit time (for sources != world), we need to forward the value to populate the pre_commit_aggregated_input and output + perform insert_move(_transaction_id, _insertion_date, _effective_date, + posting->>'source', posting->>'asset', (posting->>'amount')::numeric, true, source_created); + perform insert_move(_transaction_id, _insertion_date, _effective_date, + posting->>'destination', posting->>'asset', (posting->>'amount')::numeric, false, destination_created); +end; $$; -- todo: maybe we could avoid plpgsql functions -create function insert_transaction(data jsonb, _date timestamp without time zone) +create function insert_transaction(data jsonb, _date timestamp without time zone, _account_metadata jsonb) returns void language plpgsql as $$ - declare - posting jsonb; - begin - insert into transactions (id, timestamp, reference, postings, sources, destinations, sources_arrays, destinations_arrays) - values ((data->>'id')::numeric, - (data->>'timestamp')::timestamp without time zone, - data->>'reference', - jsonb_pretty(data->'postings'), - ( - select to_jsonb(array_agg(v->>'source')) as value - from jsonb_array_elements(data->'postings') v - ), - ( - select to_jsonb(array_agg(v->>'destination')) as value - from jsonb_array_elements(data->'postings') v - ), - ( - select to_jsonb(array_agg(explode_address(v->>'source'))) as value - from jsonb_array_elements(data->'postings') v - ), - ( - select to_jsonb(array_agg(explode_address(v->>'destination'))) as value - from jsonb_array_elements(data->'postings') v - ) - ); - - for posting in (select jsonb_array_elements(data->'postings')) loop +declare + posting jsonb; +begin + insert into transactions (id, timestamp, updated_at, reference, postings, sources, + destinations, sources_arrays, destinations_arrays, metadata) + values ((data->>'id')::numeric, + (data->>'timestamp')::timestamp without time zone, + (data->>'timestamp')::timestamp without time zone, + data->>'reference', + jsonb_pretty(data->'postings'), + ( + select to_jsonb(array_agg(v->>'source')) as value + from jsonb_array_elements(data->'postings') v + ), + ( + select 
to_jsonb(array_agg(v->>'destination')) as value + from jsonb_array_elements(data->'postings') v + ), + ( + select to_jsonb(array_agg(explode_address(v->>'source'))) as value + from jsonb_array_elements(data->'postings') v + ), + ( + select to_jsonb(array_agg(explode_address(v->>'destination'))) as value + from jsonb_array_elements(data->'postings') v + ), + coalesce(data->'metadata', '{}'::jsonb) + ); + + for posting in (select jsonb_array_elements(data->'postings')) loop -- todo: sometimes the balance is known at commit time (for sources != world), we need to forward the value to populate the pre_commit_aggregated_input and output - perform insert_posting((data->>'id')::numeric, _date, (data->>'timestamp')::timestamp without time zone, posting); + perform insert_posting((data->>'id')::numeric, _date, (data->>'timestamp')::timestamp without time zone, posting, _account_metadata); end loop; - if data->'metadata' is not null and data->>'metadata' <> '()' then - insert into transactions_metadata (transaction_id, revision, date, metadata) values ( + if data->'metadata' is not null and data->>'metadata' <> '()' then + insert into transactions_metadata (transaction_id, revision, date, metadata) values + ( (data->>'id')::numeric, 0, - (data->>'timestamp')::timestamp without time zone, - coalesce(data->'metadata', '{}'::jsonb) + (data->>'timestamp')::timestamp without time zone, coalesce(data->'metadata', '{}'::jsonb) ); - end if; - end + end if; +end $$; create function handle_log() returns trigger - security definer - language plpgsql + security definer + language plpgsql as $$ - declare - _key varchar; - _value jsonb; - begin +begin if new.type = 'NEW_TRANSACTION' then - perform insert_transaction(new.data->'transaction', new.date); - for _key, _value in (select * from jsonb_each_text(new.data->'accountMetadata')) loop - perform update_account_metadata(_key, _value, (new.data->'transaction'->>'timestamp')::timestamp); - end loop; + perform insert_transaction(new.data->'transaction', new.date, new.data->'accountMetadata'); end if; if new.type = 'REVERTED_TRANSACTION' then - perform insert_transaction(new.data->'transaction', new.date); + perform insert_transaction(new.data->'transaction', new.date, '{}'::jsonb); perform revert_transaction((new.data->>'revertedTransactionID')::numeric, (new.data->'transaction'->>'timestamp')::timestamp); end if; if new.type = 'SET_METADATA' then if new.data->>'targetType' = 'TRANSACTION' then perform update_transaction_metadata((new.data->>'targetId')::numeric, new.data->'metadata', new.date); else - perform update_account_metadata((new.data->>'targetId')::varchar, new.data ->'metadata', new.date); + perform upsert_account((new.data->>'targetId')::varchar, new.data -> 'metadata', new.date); end if; end if; if new.type = 'DELETE_METADATA' then @@ -503,36 +495,103 @@ as $$ end if; return new; - end; +end; $$; -/** Define the trigger which populate table in response to new logs **/ create trigger insert_log after insert on logs for each row execute procedure handle_log(); +create function update_account_metadata_history() returns trigger + security definer + language plpgsql +as $$ +begin + insert into accounts_metadata (address, revision, date, metadata) values (new.address, ( + select revision + 1 + from accounts_metadata + where accounts_metadata.address = new.address + order by revision desc + limit 1 + ), new.updated_at, new.metadata); + + return new; +end; +$$; + +create trigger update_account after update on accounts + for each row execute procedure 
update_account_metadata_history(); + +create function insert_account_metadata_history() returns trigger + security definer + language plpgsql +as $$ +begin + insert into accounts_metadata (address, revision, date, metadata) values (new.address, 1, new.insertion_date, new.metadata); + + return new; +end; +$$; + +create trigger insert_account after insert on accounts + for each row execute procedure insert_account_metadata_history(); + +create function update_transaction_metadata_history() returns trigger + security definer + language plpgsql +as $$ +begin + insert into transactions_metadata (transaction_id, revision, date, metadata) values (new.id, ( + select revision + 1 + from transactions_metadata + where transactions_metadata.transaction_id = new.id + order by revision desc + limit 1 + ), new.updated_at, new.metadata); + + return new; +end; +$$; + +create trigger update_transaction after update on transactions + for each row execute procedure update_transaction_metadata_history(); + +create function insert_transaction_metadata_history() returns trigger + security definer + language plpgsql +as $$ +begin + insert into transactions_metadata (transaction_id, revision, date, metadata) values (new.id, 1, new.timestamp, new.metadata); + + return new; +end; +$$; + +create trigger insert_transaction after insert on transactions + for each row execute procedure insert_transaction_metadata_history(); + create or replace function get_all_account_effective_volumes(_account varchar, _before timestamp default null) returns setof volumes_with_asset language sql stable as $$ - with - all_assets as ( - select v.v as asset - from get_all_assets() v - ), - moves as ( - select m.* - from all_assets assets - join lateral ( - select * - from moves s - where (_before is null or s.effective_date <= _before) and s.account_address = _account and s.asset = assets.asset - order by effective_date desc, seq desc - limit 1 +with + all_assets as ( + select v.v as asset + from get_all_assets() v + ), + moves as ( + select m.* + from all_assets assets + join lateral ( + select * + from moves s + where (_before is null or s.effective_date <= _before) and s.account_address = _account and s.asset = assets.asset + order by effective_date desc, seq desc + limit 1 ) m on true - ) - select moves.asset, moves.post_commit_effective_volumes - from moves + ) +select moves.asset, moves.post_commit_effective_volumes +from moves $$; create or replace function get_all_account_volumes(_account varchar, _before timestamp default null) @@ -540,24 +599,23 @@ create or replace function get_all_account_volumes(_account varchar, _before tim language sql stable as $$ - with - all_assets as ( - select v.v as asset - from get_all_assets() v - ), - moves as ( - select m.* - from all_assets assets - join lateral ( - select * - from moves s - where (_before is null or s.insertion_date <= _before) and s.account_address = _account and s.asset = assets.asset - order by seq desc - limit 1 +with + all_assets as ( + select v.v as asset + from get_all_assets() v + ), + moves as ( + select m.* + from all_assets assets join lateral ( + select * + from moves s + where (_before is null or s.insertion_date <= _before) and s.account_address = _account and s.asset = assets.asset + order by seq desc + limit 1 ) m on true - ) - select moves.asset, moves.post_commit_volumes - from moves + ) +select moves.asset, moves.post_commit_volumes +from moves $$; create function volumes_to_jsonb(v volumes_with_asset) @@ -565,7 +623,7 @@ create function volumes_to_jsonb(v 
volumes_with_asset) language sql immutable as $$ - select ('{"' || v.asset || '": {"input": ' || (v.volumes).inputs || ', "output": ' || (v.volumes).outputs || '}}')::jsonb +select ('{"' || v.asset || '": {"input": ' || (v.volumes).inputs || ', "output": ' || (v.volumes).outputs || '}}')::jsonb $$; create function get_account_aggregated_effective_volumes(_account_address varchar, _before timestamp default null) @@ -573,8 +631,8 @@ create function get_account_aggregated_effective_volumes(_account_address varcha language sql stable as $$ - select aggregate_objects(volumes_to_jsonb(volumes_with_asset)) - from get_all_account_effective_volumes(_account_address, _before := _before) volumes_with_asset +select aggregate_objects(volumes_to_jsonb(volumes_with_asset)) +from get_all_account_effective_volumes(_account_address, _before := _before) volumes_with_asset $$; create function get_account_aggregated_volumes(_account_address varchar, _before timestamp default null) @@ -583,8 +641,8 @@ create function get_account_aggregated_volumes(_account_address varchar, _before stable parallel safe as $$ - select aggregate_objects(volumes_to_jsonb(volumes_with_asset)) - from get_all_account_volumes(_account_address, _before := _before) volumes_with_asset +select aggregate_objects(volumes_to_jsonb(volumes_with_asset)) +from get_all_account_volumes(_account_address, _before := _before) volumes_with_asset $$; create function get_account_balance(_account varchar, _asset varchar, _before timestamp default null) @@ -592,11 +650,11 @@ create function get_account_balance(_account varchar, _asset varchar, _before ti language sql stable as $$ - select (post_commit_volumes).inputs - (post_commit_volumes).outputs - from moves s - where (_before is null or s.effective_date <= _before) and s.account_address = _account and s.asset = _asset - order by seq desc - limit 1 +select (post_commit_volumes).inputs - (post_commit_volumes).outputs +from moves s +where (_before is null or s.effective_date <= _before) and s.account_address = _account and s.asset = _asset +order by seq desc +limit 1 $$; create function aggregate_ledger_volumes( @@ -608,46 +666,46 @@ create function aggregate_ledger_volumes( language sql stable as $$ - with - moves as ( - select distinct on (m.account_address, m.asset) m.* - from moves m - where (_before is null or m.effective_date <= _before) and - (_accounts is null or account_address = any(_accounts)) and - (_assets is null or asset = any(_assets)) - order by account_address, asset, m.seq desc - ) - select v.asset, (sum((v.post_commit_effective_volumes).inputs), sum((v.post_commit_effective_volumes).outputs)) - from moves v - group by v.asset +with + moves as ( + select distinct on (m.account_address, m.asset) m.* + from moves m + where (_before is null or m.effective_date <= _before) and + (_accounts is null or account_address = any(_accounts)) and + (_assets is null or asset = any(_assets)) + order by account_address, asset, m.seq desc + ) +select v.asset, (sum((v.post_commit_effective_volumes).inputs), sum((v.post_commit_effective_volumes).outputs)) +from moves v +group by v.asset $$; -create function get_aggregated_effective_volumes_for_transaction(tx transactions) returns jsonb +create function get_aggregated_effective_volumes_for_transaction(tx numeric) returns jsonb stable language sql as $$ select aggregate_objects(jsonb_build_object(data.account_address, data.aggregated)) from ( - select distinct on (move.account_address, move.asset) move.account_address, - volumes_to_jsonb((move.asset, 
first(move.post_commit_effective_volumes))) as aggregated - from moves move - where move.transaction_id = tx.id - group by move.account_address, move.asset -) data + select distinct on (move.account_address, move.asset) move.account_address, + volumes_to_jsonb((move.asset, first(move.post_commit_effective_volumes))) as aggregated + from moves move + where move.transaction_id = tx + group by move.account_address, move.asset + ) data $$; -create function get_aggregated_volumes_for_transaction(tx transactions) returns jsonb +create function get_aggregated_volumes_for_transaction(tx numeric) returns jsonb stable language sql as $$ select aggregate_objects(jsonb_build_object(data.account_address, data.aggregated)) from ( - select distinct on (move.account_address, move.asset) move.account_address, - volumes_to_jsonb((move.asset, first(move.post_commit_volumes))) as aggregated - from moves move - where move.transaction_id = tx.id - group by move.account_address, move.asset -) data + select distinct on (move.account_address, move.asset) move.account_address, + volumes_to_jsonb((move.asset, first(move.post_commit_volumes))) as aggregated + from moves move + where move.transaction_id = tx + group by move.account_address, move.asset + ) data $$; diff --git a/internal/storage/ledgerstore/store_benchmarks_test.go b/internal/storage/ledgerstore/store_benchmarks_test.go index 4924286d5..0ce316603 100644 --- a/internal/storage/ledgerstore/store_benchmarks_test.go +++ b/internal/storage/ledgerstore/store_benchmarks_test.go @@ -410,6 +410,7 @@ func benchmarksReadAccounts(b *testing.B, ctx context.Context, store *Store, pit if !t.allowEmptyResponse && len(ret.Data) == 0 { require.Fail(b, "response should not be empty") } + } explainRequest(ctx, b, func(ctx context.Context) { diff --git a/internal/storage/ledgerstore/transactions.go b/internal/storage/ledgerstore/transactions.go index 7ef698112..7bb05de33 100644 --- a/internal/storage/ledgerstore/transactions.go +++ b/internal/storage/ledgerstore/transactions.go @@ -153,27 +153,29 @@ func (store *Store) buildTransactionQuery(p PITFilterWithVolumes, query *bun.Sel selectMetadata = selectMetadata.Where("date <= ?", p.PIT) } - query = query. - Table("transactions"). - ColumnExpr("distinct on(transactions.id) transactions.*, transactions_metadata.metadata"). - Join(fmt.Sprintf(`left join lateral (%s) as transactions_metadata on true`, selectMetadata.String())) + query = query.Table("transactions") if p.PIT != nil && !p.PIT.IsZero() { query = query. Where("timestamp <= ?", p.PIT). + ColumnExpr("distinct on(transactions.id) transactions.*"). + Column("transactions_metadata.metadata"). + Join(fmt.Sprintf(`left join lateral (%s) as transactions_metadata on true`, selectMetadata.String())). 
ColumnExpr(fmt.Sprintf("case when reverted_at is not null and reverted_at > '%s' then null else reverted_at end", p.PIT.Format(ledger.DateFormat))) + } else { + query = query.Column("transactions.metadata", "transactions.*") } if p.ExpandEffectiveVolumes { - query = query.ColumnExpr("get_aggregated_effective_volumes_for_transaction(transactions) as post_commit_effective_volumes") + query = query.ColumnExpr("get_aggregated_effective_volumes_for_transaction(transactions.id) as post_commit_effective_volumes") } if p.ExpandVolumes { - query = query.ColumnExpr("get_aggregated_volumes_for_transaction(transactions) as post_commit_volumes") + query = query.ColumnExpr("get_aggregated_volumes_for_transaction(transactions.id) as post_commit_volumes") } return query } -func (store *Store) transactionQueryContext(qb query.Builder) (string, []any, error) { +func (store *Store) transactionQueryContext(qb query.Builder, q GetTransactionsQuery) (string, []any, error) { return qb.Build(query.ContextFn(func(key, operator string, value any) (string, []any, error) { switch { @@ -218,7 +220,12 @@ func (store *Store) transactionQueryContext(qb query.Builder) (string, []any, er } match := metadataRegex.FindAllStringSubmatch(key, 3) - return "metadata @> ?", []any{map[string]any{ + key := "metadata" + if q.Options.Options.PIT != nil && !q.Options.Options.PIT.IsZero() { + key = "transactions_metadata.metadata" + } + + return key + " @> ?", []any{map[string]any{ match[0][1]: value, }}, nil default: @@ -246,7 +253,7 @@ func (store *Store) GetTransactions(ctx context.Context, q GetTransactionsQuery) ) if q.Options.QueryBuilder != nil { - where, args, err = store.transactionQueryContext(q.Options.QueryBuilder) + where, args, err = store.transactionQueryContext(q.Options.QueryBuilder, q) if err != nil { return nil, err } @@ -276,7 +283,7 @@ func (store *Store) CountTransactions(ctx context.Context, q GetTransactionsQuer ) if q.Options.QueryBuilder != nil { - where, args, err = store.transactionQueryContext(q.Options.QueryBuilder) + where, args, err = store.transactionQueryContext(q.Options.QueryBuilder, q) if err != nil { return 0, err } diff --git a/internal/storage/ledgerstore/transactions_test.go b/internal/storage/ledgerstore/transactions_test.go index 532001d4e..a32725f1f 100644 --- a/internal/storage/ledgerstore/transactions_test.go +++ b/internal/storage/ledgerstore/transactions_test.go @@ -378,6 +378,7 @@ func TestInsertTransactions(t *testing.T) { tx, err := store.GetTransactionWithVolumes(context.Background(), NewGetTransactionQuery(big.NewInt(0)). 
WithExpandVolumes()) + require.NoError(t, err) internaltesting.RequireEqual(t, tx1, *tx) }) diff --git a/internal/storage/ledgerstore/utils.go b/internal/storage/ledgerstore/utils.go index e7d09a9cd..794cc7250 100644 --- a/internal/storage/ledgerstore/utils.go +++ b/internal/storage/ledgerstore/utils.go @@ -36,14 +36,14 @@ func fetch[T any](s *Store, ctx context.Context, builders ...func(query *bun.Sel func paginateWithOffset[FILTERS any, RETURN any](s *Store, ctx context.Context, q *paginate.OffsetPaginatedQuery[FILTERS], builders ...func(query *bun.SelectQuery) *bun.SelectQuery) (*api.Cursor[RETURN], error) { - var ret RETURN + //var ret RETURN query := s.db.NewSelect() for _, builder := range builders { query = query.Apply(builder) } - if query.GetModel() == nil && query.GetTableName() == "" { - query = query.Model(ret) - } + //if query.GetModel() == nil && query.GetTableName() == "" { + // query = query.Model(ret) + //} return paginate.UsingOffset[FILTERS, RETURN](ctx, query, *q) }
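
The migration above replaces the insert-only write path to the `*_metadata` history tables with denormalized `metadata` columns on `accounts` and `transactions`, kept current on write and mirrored into `accounts_metadata` / `transactions_metadata` by the new `insert_account`/`update_account` triggers (and their transaction equivalents), one row per revision. A minimal sketch of that flow, using only functions and tables defined in the migration — the address, metadata values, and timestamps are made up, and in the ledger these functions are reached through the `handle_log` trigger rather than called directly:

    -- Illustrative only: exercise the new write path.
    -- upsert_account writes the merged metadata onto accounts; the
    -- insert_account/update_account triggers append one row per revision
    -- to accounts_metadata, which point-in-time queries replay.
    select upsert_account('users:001', '{"category": "gold"}'::jsonb, now()::timestamp);
    select upsert_account('users:001', '{"status": "active"}'::jsonb, now()::timestamp);

    -- Latest state comes straight from the denormalized column, no join:
    select metadata from accounts where address = 'users:001';
    -- => {"category": "gold", "status": "active"}

    -- Point-in-time state replays the history table instead:
    select metadata
    from accounts_metadata
    where address = 'users:001' and date <= now()::timestamp
    order by revision desc
    limit 1;

Note the guard in `upsert_account` (`where not accounts.metadata @> coalesce(_metadata, ...)`): an upsert carrying no new metadata performs no update, so the `update_account` trigger does not fire and no spurious revision row is written.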
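
On the read side, `buildAccountQuery` and `accountQueryContext` now join `accounts_metadata` only when a point-in-time (PIT) filter is set, and otherwise filter the denormalized `metadata` column in place — which is what lets the new gin indexes (`accounts_metadata_index`, `transactions_metadata_index`) serve metadata matches without a lateral join. Roughly the two query shapes the Go builder aims for, as hand-written SQL under that reading of the code, not the exact output of the bun query builder; the cutoff timestamp is a placeholder:

    -- No PIT: the denormalized column on accounts is enough.
    select accounts.address, metadata
    from accounts
    where metadata @> '{"category": "gold"}';

    -- With PIT: replay history as of the cutoff, newest revision first,
    -- filtering on accounts_metadata.metadata as accountQueryContext does.
    select accounts.address, accounts_metadata.metadata
    from accounts
    left join accounts_metadata
        on accounts_metadata.address = accounts.address
        and accounts_metadata.date < '2023-11-28T00:00:00'
    where accounts.insertion_date <= '2023-11-28T00:00:00'
        and accounts_metadata.metadata @> '{"category": "gold"}'
    order by accounts.address, revision desc;

`buildTransactionQuery` follows the same pattern for `transactions_metadata`, and the volume helpers (`get_aggregated_volumes_for_transaction`, `get_aggregated_effective_volumes_for_transaction`) now take the transaction id as `numeric` rather than a full `transactions` row, matching the `transactions.id` argument the Go code passes.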