Skip to content

Commit

Permalink
feat: optimize ledgerv1 (#453)
Browse files Browse the repository at this point in the history
* feat: keep last log in memory

* feat: keep last tx in memory

* feat: add cuckoo filter

* feat: cache volumes

* feat: comment all useless actions

* feat: use cache

* feat: reenable CI

* feat: review
  • Loading branch information
gfyrag authored Aug 18, 2023
1 parent d55966b commit aa3a91f
Show file tree
Hide file tree
Showing 16 changed files with 328 additions and 142 deletions.
4 changes: 3 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ ARG SEGMENT_WRITE_KEY
WORKDIR /go/src/github.com/numary/ledger
# get deps first so it's cached
COPY . .
RUN CGO_ENABLED=1 GOOS=linux GOARCH=$TARGETARCH \
RUN --mount=type=cache,id=gobuild,target=/root/.cache/go-build \
--mount=type=cache,id=gomodcache,target=/go/pkg/mod \
CGO_ENABLED=1 GOOS=linux GOARCH=$TARGETARCH \
CC=$TARGETARCH-linux-gnu-gcc \
go build -o numary -tags json1,netgo \
-ldflags="-X github.com/numary/ledger/cmd.Version=${VERSION} \
Expand Down
6 changes: 6 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,14 @@ require (
github.com/ThreeDotsLabs/watermill-kafka/v2 v2.2.2 // indirect
github.com/ThreeDotsLabs/watermill-nats/v2 v2.0.0 // indirect
github.com/ajg/form v1.5.1 // indirect
github.com/bits-and-blooms/bloom v2.0.3+incompatible // indirect
github.com/bytedance/sonic v1.8.1 // indirect
github.com/cenkalti/backoff/v4 v4.2.0 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 // indirect
github.com/containerd/continuity v0.3.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dgryski/go-metro v0.0.0-20200812162917-85c65e2d0165 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/docker/cli v20.10.17+incompatible // indirect
github.com/docker/docker v20.10.17+incompatible // indirect
Expand Down Expand Up @@ -115,11 +117,14 @@ require (
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.0.2 // indirect
github.com/opencontainers/runc v1.1.3 // indirect
github.com/patrickmn/go-cache v2.1.0+incompatible // indirect
github.com/pelletier/go-toml/v2 v2.0.6 // indirect
github.com/pierrec/lz4/v4 v4.1.17 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect
github.com/segmentio/backo-go v1.0.1 // indirect
github.com/seiflotfy/cuckoofilter v0.0.0-20220411075957-e3b120b3f5fb // indirect
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72 // indirect
github.com/spf13/afero v1.9.3 // indirect
github.com/spf13/cast v1.5.0 // indirect
github.com/spf13/jwalterweatherman v1.1.0 // indirect
Expand All @@ -129,6 +134,7 @@ require (
github.com/ugorji/go/codec v1.2.10 // indirect
github.com/uptrace/opentelemetry-go-extra/otellogrus v0.1.21 // indirect
github.com/uptrace/opentelemetry-go-extra/otelutil v0.1.21 // indirect
github.com/willf/bitset v1.1.11 // indirect
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
github.com/xdg-go/scram v1.1.2 // indirect
github.com/xdg-go/stringprep v1.0.4 // indirect
Expand Down
12 changes: 12 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ github.com/antlr/antlr4/runtime/Go/antlr v1.4.10/go.mod h1:F7bn7fEU90QkQ3tnmaTx3
github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
github.com/bits-and-blooms/bloom v2.0.3+incompatible h1:3ONZFjJoMyfHDil5iCcNkcPJ//PNNo+55RHvPrfUGnY=
github.com/bits-and-blooms/bloom v2.0.3+incompatible/go.mod h1:nEmPH2pqJb3sCXfd7cyDSKC4iPfCAt312JHgNrtnnDE=
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4YnC6+E63dPcxHo2sUxDIu8g3QgEJdRY=
github.com/bool64/shared v0.1.5 h1:fp3eUhBsrSjNCQPcSdQqZxxh9bBwrYiZ+zOKFkM0/2E=
github.com/buger/jsonparser v1.1.1 h1:2PnMjfWD7wBILjqQbt530v576A/cAbQvEW9gGIpYMUs=
Expand Down Expand Up @@ -121,6 +123,8 @@ github.com/dgraph-io/ristretto v0.1.1 h1:6CWw5tJNgpegArSHpNHJKldNeq03FQCwYvfMVWa
github.com/dgraph-io/ristretto v0.1.1/go.mod h1:S1GPSBCYCIhmVNfcth17y2zZtQT6wzkzgwUve0VDWWA=
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13 h1:fAjc9m62+UWV/WAFKLNi6ZS0675eEUC9y3AlwSbQu1Y=
github.com/dgryski/go-metro v0.0.0-20200812162917-85c65e2d0165 h1:BS21ZUJ/B5X2UVUbczfmdWH7GapPWAhxcMsDnjJTU1E=
github.com/dgryski/go-metro v0.0.0-20200812162917-85c65e2d0165/go.mod h1:c9O8+fpSOX1DM8cPNSkX/qsBWdkD4yd2dpciOWQjpBw=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/docker/cli v20.10.17+incompatible h1:eO2KS7ZFeov5UJeaDmIs1NFEDRf32PaqRpvoEkKBy5M=
Expand Down Expand Up @@ -425,6 +429,8 @@ github.com/opencontainers/runtime-spec v1.0.3-0.20210326190908-1c3f411f0417/go.m
github.com/opencontainers/selinux v1.10.0/go.mod h1:2i0OySw99QjzBBQByd1Gr9gSjvuho1lHsJxIJ3gGbJI=
github.com/ory/dockertest/v3 v3.9.1 h1:v4dkG+dlu76goxMiTT2j8zV7s4oPPEppKT8K8p2f1kY=
github.com/ory/dockertest/v3 v3.9.1/go.mod h1:42Ir9hmvaAPm0Mgibk6mBPi7SFvTXxEcnztDYOJ//uM=
github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc=
github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 h1:onHthvaw9LFnH4t2DcNVpwGmV9E1BkGknEliJkfwQj0=
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58/go.mod h1:DXv8WO4yhMYhSNPKjeNKa5WY9YCIEBRbNzFFPJbWO6Y=
github.com/pborman/uuid v1.2.1 h1:+ZZIw58t/ozdjRaXh/3awHfmWRbzYxJoAdNJxe/3pvw=
Expand Down Expand Up @@ -464,12 +470,15 @@ github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQD
github.com/seccomp/libseccomp-golang v0.9.2-0.20220502022130-f33da4d89646/go.mod h1:JA8cRccbGaA1s33RQf7Y1+q9gHmZX1yB/z9WDN1C6fg=
github.com/segmentio/backo-go v1.0.1 h1:68RQccglxZeyURy93ASB/2kc9QudzgIDexJ927N++y4=
github.com/segmentio/backo-go v1.0.1/go.mod h1:9/Rh6yILuLysoQnZ2oNooD2g7aBnvM7r/fNVxRNWfBc=
github.com/seiflotfy/cuckoofilter v0.0.0-20220411075957-e3b120b3f5fb h1:XfLJSPIOUX+osiMraVgIrMR27uMXnRJWGm1+GL8/63U=
github.com/seiflotfy/cuckoofilter v0.0.0-20220411075957-e3b120b3f5fb/go.mod h1:bR6DqgcAl1zTcOX8/pE2Qkj9XO00eCNqmKb7lXP8EAg=
github.com/sergi/go-diff v1.3.1 h1:xkr+Oxo4BOQKmkn/B9eMK0g5Kg/983T9DqqPHwYqD+8=
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
github.com/sirupsen/logrus v1.9.0 h1:trlNQbNUG3OdDrDil03MCb1H2o9nJ1x4/5LYw7byDE0=
github.com/sirupsen/logrus v1.9.0/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72 h1:qLC7fQah7D6K1B0ujays3HV9gkFtllcxhzImRR7ArPQ=
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/spf13/afero v1.9.3 h1:41FoI0fD7OR7mGcKE/aOiLkGreyf8ifIOQmJANWogMk=
github.com/spf13/afero v1.9.3/go.mod h1:iUV7ddyEEZPO5gA3zD4fJt6iStLlL+Lg4m2cihcDf8Y=
Expand Down Expand Up @@ -516,6 +525,8 @@ github.com/uptrace/opentelemetry-go-extra/otelutil v0.1.21/go.mod h1:2MNqrUmDrt5
github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
github.com/vishvananda/netlink v1.1.0/go.mod h1:cTgwzPIzzgDAYoQrMm0EdrjRUBkTqKYppBueQtXaqoE=
github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df/go.mod h1:JP3t17pCcGlemwknint6hfoeCVQrEMVwxRLRjXpq+BU=
github.com/willf/bitset v1.1.11 h1:N7Z7E9UvjW+sGsEl7k/SJrvY2reP1A07MrGuCjIOjRE=
github.com/willf/bitset v1.1.11/go.mod h1:83CECat5yLh5zVOf4P1ErAgKA5UDvKtgyUABdr3+MjI=
github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c=
github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI=
github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY=
Expand Down Expand Up @@ -975,6 +986,7 @@ gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20200605160147-a5ece683394c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
Expand Down
10 changes: 5 additions & 5 deletions pkg/api/controllers/transaction_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1199,12 +1199,12 @@ func TestTransactions(t *testing.T) {

require.Len(t, cursor.Data, 1)
require.Equal(t, cursor.Data[0].ID, tx3.ID)
rsp = internal.CountTransactions(api, url.Values{
"metadata[priority]": []string{"high"},
rsp = internal.CountTransactions(api, url.Values{
"metadata[priority]": []string{"high"},
})
require.Equal(t, http.StatusOK, rsp.Result().StatusCode)
require.Equal(t, "1", rsp.Header().Get("Count"))
})
require.Equal(t, http.StatusOK, rsp.Result().StatusCode)
require.Equal(t, "1", rsp.Header().Get("Count"))
})

t.Run("after", func(t *testing.T) {
rsp := internal.GetTransactions(api, url.Values{
Expand Down
25 changes: 25 additions & 0 deletions pkg/core/account.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package core

import (
"encoding/json"
"fmt"
"regexp"
)
Expand All @@ -14,12 +15,36 @@ type Account struct {
Metadata Metadata `json:"metadata" swaggertype:"object"`
}

func (v Account) Copy() *Account {
data, err := json.Marshal(v)
if err != nil {
panic(err)
}
ret := &Account{}
if err := json.Unmarshal(data, ret); err != nil {
panic(err)
}
return ret
}

type AccountWithVolumes struct {
Account
Volumes AssetsVolumes `json:"volumes"`
Balances AssetsBalances `json:"balances" example:"COIN:100"`
}

func (v AccountWithVolumes) Copy() *AccountWithVolumes {
data, err := json.Marshal(v)
if err != nil {
panic(err)
}
ret := &AccountWithVolumes{}
if err := json.Unmarshal(data, ret); err != nil {
panic(err)
}
return ret
}

const accountPattern = "^[a-zA-Z_]+[a-zA-Z0-9_:]*$"

var accountRegexp = regexp.MustCompile(accountPattern)
Expand Down
6 changes: 3 additions & 3 deletions pkg/ledger/ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,9 @@ func NewLedger(store Store, monitor Monitor, cache *ristretto.Cache, options ...
}

func (l *Ledger) Close(ctx context.Context) error {
if err := l.store.Close(ctx); err != nil {
return errors.Wrap(err, "closing store")
}
//if err := l.store.Close(ctx); err != nil {
// return errors.Wrap(err, "closing store")
//}
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/ledger/ledger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -640,7 +640,7 @@ func TestRevertTransaction(t *testing.T) {

newBal := world.Balances["COIN"]
expectedBal := originalBal.Add(revertAmt)
require.Equalf(t, expectedBal, newBal,
require.Equalf(t, expectedBal.Uint64(), newBal.Uint64(),
"COIN world balances expected %d, got %d", expectedBal, newBal)
})
}
Expand Down
17 changes: 17 additions & 0 deletions pkg/storage/sqlstorage/accounts.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,12 @@ func (s *Store) GetAccounts(ctx context.Context, q ledger.AccountsQuery) (api.Cu
}

func (s *Store) GetAccount(ctx context.Context, addr string) (*core.Account, error) {

entry, ok := s.cache.Get(addr)
if ok {
return entry.(*core.AccountWithVolumes).Account.Copy(), nil
}

sb := sqlbuilder.NewSelectBuilder()
sb.Select("address", "metadata").
From(s.schema.Table("accounts")).
Expand Down Expand Up @@ -265,11 +271,20 @@ func (s *Store) ensureAccountExists(ctx context.Context, account string) error {
return err
}

s.bloom.Add([]byte(account))

_, err = executor.ExecContext(ctx, sqlq, args...)
return s.error(err)
}

func (s *Store) UpdateAccountMetadata(ctx context.Context, address string, metadata core.Metadata, at time.Time) error {

entry, ok := s.cache.Get(address)
if ok {
account := entry.(*core.AccountWithVolumes)
account.Metadata = account.Metadata.Merge(metadata)
}

ib := sqlbuilder.NewInsertBuilder()

metadataData, err := json.Marshal(metadata)
Expand Down Expand Up @@ -311,6 +326,8 @@ func (s *Store) UpdateAccountMetadata(ctx context.Context, address string, metad
return errors.Wrap(err, "reading last log")
}

s.bloom.Add([]byte(address))

return s.appendLog(ctx, core.NewSetMetadataLog(lastLog, at, core.SetMetadata{
TargetType: core.MetaTargetTypeAccount,
TargetID: address,
Expand Down
100 changes: 55 additions & 45 deletions pkg/storage/sqlstorage/aggregations.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,72 +8,80 @@ import (
"github.com/huandu/go-sqlbuilder"
"github.com/numary/ledger/pkg/core"
"github.com/numary/ledger/pkg/ledger"
"github.com/patrickmn/go-cache"
)

func (s *Store) GetAccountWithVolumes(ctx context.Context, account string) (*core.AccountWithVolumes, error) {
sb := sqlbuilder.NewSelectBuilder()
sb.Select("accounts.metadata", "volumes.asset", "volumes.input", "volumes.output")
sb.From(s.schema.Table("accounts"))
sb.JoinWithOption(sqlbuilder.LeftOuterJoin,
s.schema.Table("volumes"),
"accounts.address = volumes.account")
sb.Where(sb.E("accounts.address", account))

executor, err := s.executorProvider(ctx)
if err != nil {
return nil, err
}

q, args := sb.BuildWithFlavor(s.schema.Flavor())
rows, err := executor.QueryContext(ctx, q, args...)
if err != nil {
return nil, s.error(err)
func (s *Store) GetAccountWithVolumes(ctx context.Context, address string) (*core.AccountWithVolumes, error) {
account, ok := s.cache.Get(address)
if ok {
return account.(*core.AccountWithVolumes).Copy(), nil
}
defer rows.Close()

acc := core.Account{
Address: core.AccountAddress(account),
Address: core.AccountAddress(address),
Metadata: core.Metadata{},
}
assetsVolumes := core.AssetsVolumes{}

for rows.Next() {
var asset, inputStr, outputStr sql.NullString
if err := rows.Scan(&acc.Metadata, &asset, &inputStr, &outputStr); err != nil {
if s.bloom.Test([]byte(address)) {
sb := sqlbuilder.NewSelectBuilder()
sb.Select("accounts.metadata", "volumes.asset", "volumes.input", "volumes.output")
sb.From(s.schema.Table("accounts"))
sb.JoinWithOption(sqlbuilder.LeftOuterJoin,
s.schema.Table("volumes"),
"accounts.address = volumes.account")
sb.Where(sb.E("accounts.address", address))

executor, err := s.executorProvider(ctx)
if err != nil {
return nil, err
}

q, args := sb.BuildWithFlavor(s.schema.Flavor())
rows, err := executor.QueryContext(ctx, q, args...)
if err != nil {
return nil, s.error(err)
}
defer rows.Close()

if asset.Valid {
assetsVolumes[asset.String] = core.Volumes{
Input: core.NewMonetaryInt(0),
Output: core.NewMonetaryInt(0),
for rows.Next() {
var asset, inputStr, outputStr sql.NullString
if err := rows.Scan(&acc.Metadata, &asset, &inputStr, &outputStr); err != nil {
return nil, s.error(err)
}

if inputStr.Valid {
input, err := core.ParseMonetaryInt(inputStr.String)
if err != nil {
return nil, s.error(err)
}
if asset.Valid {
assetsVolumes[asset.String] = core.Volumes{
Input: input,
Output: assetsVolumes[asset.String].Output,
Input: core.NewMonetaryInt(0),
Output: core.NewMonetaryInt(0),
}
}

if outputStr.Valid {
output, err := core.ParseMonetaryInt(outputStr.String)
if err != nil {
return nil, s.error(err)
if inputStr.Valid {
input, err := core.ParseMonetaryInt(inputStr.String)
if err != nil {
return nil, s.error(err)
}
assetsVolumes[asset.String] = core.Volumes{
Input: input,
Output: assetsVolumes[asset.String].Output,
}
}
assetsVolumes[asset.String] = core.Volumes{
Input: assetsVolumes[asset.String].Input,
Output: output,

if outputStr.Valid {
output, err := core.ParseMonetaryInt(outputStr.String)
if err != nil {
return nil, s.error(err)
}
assetsVolumes[asset.String] = core.Volumes{
Input: assetsVolumes[asset.String].Input,
Output: output,
}
}
}
}
}
if err := rows.Err(); err != nil {
return nil, s.error(err)
if err := rows.Err(); err != nil {
return nil, s.error(err)
}
}

res := &core.AccountWithVolumes{
Expand All @@ -82,6 +90,8 @@ func (s *Store) GetAccountWithVolumes(ctx context.Context, account string) (*cor
}
res.Balances = res.Volumes.Balances()

s.cache.Set(address, res.Copy(), cache.NoExpiration)

return res, nil
}

Expand Down
Loading

0 comments on commit aa3a91f

Please sign in to comment.