diff --git a/cmd/buckets_upgrade.go b/cmd/buckets_upgrade.go index 61f578b6e..e48164882 100644 --- a/cmd/buckets_upgrade.go +++ b/cmd/buckets_upgrade.go @@ -30,7 +30,7 @@ func NewBucketUpgrade() *cobra.Command { }() if args[0] == "*" { - return driver.UpgradeAllBuckets(cmd.Context(), make(chan struct{})) + return driver.UpgradeAllBuckets(cmd.Context()) } return driver.UpgradeBucket(cmd.Context(), args[0]) diff --git a/cmd/root.go b/cmd/root.go index e50555cd0..ac9eed7f7 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -2,7 +2,12 @@ package cmd import ( "github.com/formancehq/go-libs/v2/bun/bunmigrate" + "github.com/formancehq/go-libs/v2/logging" "github.com/formancehq/go-libs/v2/service" + "github.com/formancehq/ledger/internal/storage/bucket" + "github.com/formancehq/ledger/internal/storage/driver" + "github.com/formancehq/ledger/internal/storage/ledger" + systemstore "github.com/formancehq/ledger/internal/storage/system" "github.com/uptrace/bun" "github.com/spf13/cobra" @@ -34,17 +39,20 @@ func NewRootCommand() *cobra.Command { root.AddCommand(serve) root.AddCommand(buckets) root.AddCommand(version) - root.AddCommand(bunmigrate.NewDefaultCommand(func(cmd *cobra.Command, _ []string, _ *bun.DB) error { - // todo: use provided db ... - driver, db, err := getDriver(cmd) - if err != nil { + root.AddCommand(bunmigrate.NewDefaultCommand(func(cmd *cobra.Command, _ []string, db *bun.DB) error { + logger := logging.NewDefaultLogger(cmd.OutOrStdout(), service.IsDebug(cmd), false, false) + cmd.SetContext(logging.ContextWithLogger(cmd.Context(), logger)) + + driver := driver.New( + ledger.NewFactory(db), + systemstore.New(db), + bucket.NewDefaultFactory(db), + ) + if err := driver.Initialize(cmd.Context()); err != nil { return err } - defer func() { - _ = db.Close() - }() - return driver.UpgradeAllBuckets(cmd.Context(), make(chan struct{})) + return driver.UpgradeAllBuckets(cmd.Context()) })) root.AddCommand(NewDocsCommand()) diff --git a/deployments/pulumi/main.go b/deployments/pulumi/main.go index 972f227bb..f7121f448 100644 --- a/deployments/pulumi/main.go +++ b/deployments/pulumi/main.go @@ -6,6 +6,7 @@ import ( "github.com/formancehq/ledger/deployments/pulumi/pkg" "github.com/pulumi/pulumi/sdk/v3/go/pulumi" "github.com/pulumi/pulumi/sdk/v3/go/pulumi/config" + "github.com/pulumi/pulumi/sdk/v3/go/pulumix" ) func main() { @@ -35,23 +36,18 @@ func deploy(ctx *pulumi.Context) error { } } - debug, _ := conf.TryBool("debug") - imagePullPolicy, _ := conf.Try("image.pullPolicy") - - replicaCount, _ := conf.TryInt("replicaCount") - experimentalFeatures, _ := conf.TryBool("experimentalFeatures") - _, err = pulumi_ledger.NewComponent(ctx, "ledger", &pulumi_ledger.ComponentArgs{ Namespace: pulumi.String(namespace), Timeout: pulumi.Int(timeout), Tag: pulumi.String(version), - ImagePullPolicy: pulumi.String(imagePullPolicy), + ImagePullPolicy: pulumi.String(conf.Get("image.pullPolicy")), Postgres: pulumi_ledger.PostgresArgs{ URI: pulumi.String(postgresURI), }, - Debug: pulumi.Bool(debug), - ReplicaCount: pulumi.Int(replicaCount), - ExperimentalFeatures: pulumi.Bool(experimentalFeatures), + Debug: pulumi.Bool(conf.GetBool("debug")), + ReplicaCount: pulumi.Int(conf.GetInt("replicaCount")), + ExperimentalFeatures: pulumi.Bool(conf.GetBool("experimentalFeatures")), + Upgrade: pulumix.Val(pulumi_ledger.UpgradeMode(config.Get(ctx, "upgrade-mode"))), }) return err diff --git a/deployments/pulumi/main_test.go b/deployments/pulumi/main_test.go index c3f21130c..5baea2450 100644 --- a/deployments/pulumi/main_test.go +++ b/deployments/pulumi/main_test.go @@ -19,38 +19,59 @@ import ( func TestProgram(t *testing.T) { - ctx := logging.TestingContext() - stackName := "ledger-tests-pulumi-" + uuid.NewString()[:8] + type testCase struct { + name string + config map[string]string + } + for _, tc := range []testCase{ + { + name: "nominal", + config: map[string]string{ + "timeout": "30", + }, + }, + { + name: "upgrade using a job", + config: map[string]string{ + "timeout": "30", + "upgrade-mode": "job", + }, + }, + } { + t.Run(tc.name, func(t *testing.T) { + ctx := logging.TestingContext() + stackName := "ledger-tests-pulumi-" + uuid.NewString()[:8] - stack, err := auto.UpsertStackInlineSource(ctx, stackName, "ledger-tests-pulumi-postgres", deployPostgres(stackName)) - require.NoError(t, err) + stack, err := auto.UpsertStackInlineSource(ctx, stackName, "ledger-tests-pulumi-postgres", deployPostgres(stackName)) + require.NoError(t, err) - t.Log("Deploy pg stack") - up, err := stack.Up(ctx, optup.ProgressStreams(os.Stdout), optup.ErrorProgressStreams(os.Stderr)) - require.NoError(t, err) + t.Log("Deploy pg stack") + up, err := stack.Up(ctx, optup.ProgressStreams(os.Stdout), optup.ErrorProgressStreams(os.Stderr)) + require.NoError(t, err) - t.Cleanup(func() { - t.Log("Destroy stack") - _, err := stack.Destroy(ctx, optdestroy.Remove(), optdestroy.ProgressStreams(os.Stdout), optdestroy.ErrorProgressStreams(os.Stderr)) - require.NoError(t, err) - }) + t.Cleanup(func() { + t.Log("Destroy stack") + _, err := stack.Destroy(ctx, optdestroy.Remove(), optdestroy.ProgressStreams(os.Stdout), optdestroy.ErrorProgressStreams(os.Stderr)) + require.NoError(t, err) + }) - postgresURI := up.Outputs["uri"].Value.(string) + postgresURI := up.Outputs["uri"].Value.(string) - t.Log("Test program") - integration.ProgramTest(t, &integration.ProgramTestOptions{ - Quick: true, - SkipRefresh: true, - Dir: ".", - Config: map[string]string{ - "namespace": stackName, - "postgres.uri": postgresURI, - "timeout": "30", - }, - Stdout: os.Stdout, - Stderr: os.Stderr, - Verbose: testing.Verbose(), - }) + tc.config["postgres.uri"] = postgresURI + tc.config["namespace"] = stackName + + t.Log("Test program") + integration.ProgramTest(t, &integration.ProgramTestOptions{ + Quick: true, + SkipRefresh: true, + Dir: ".", + Config: tc.config, + Stdout: os.Stdout, + Stderr: os.Stderr, + Verbose: testing.Verbose(), + }) + }) + } } func deployPostgres(stackName string) func(ctx *pulumi.Context) error { diff --git a/deployments/pulumi/pkg/component.go b/deployments/pulumi/pkg/component.go index 5964c6855..e8e7b7acc 100644 --- a/deployments/pulumi/pkg/component.go +++ b/deployments/pulumi/pkg/component.go @@ -15,6 +15,14 @@ import ( var ErrPostgresURIRequired = fmt.Errorf("postgresURI is required") +type UpgradeMode string + +const ( + UpgradeModeDisabled UpgradeMode = "disabled" + UpgradeModeJob UpgradeMode = "job" + UpgradeModeInApp UpgradeMode = "in-app" +) + type Component struct { pulumi.ResourceState @@ -22,7 +30,6 @@ type Component struct { ServiceNamespace pulumix.Output[string] ServicePort pulumix.Output[int] ServiceInternalURL pulumix.Output[string] - Migrations pulumix.Output[*batchv1.Job] } type PostgresArgs struct { @@ -73,13 +80,12 @@ type ComponentArgs struct { Debug pulumix.Input[bool] ReplicaCount pulumix.Input[int] GracePeriod pulumix.Input[string] - AutoUpgrade pulumix.Input[bool] - WaitUpgrade pulumix.Input[bool] BallastSizeInBytes pulumix.Input[int] NumscriptCacheMaxCount pulumix.Input[int] BulkMaxSize pulumix.Input[int] BulkParallel pulumix.Input[int] TerminationGracePeriodSeconds pulumix.Input[*int] + Upgrade pulumix.Input[UpgradeMode] ExperimentalFeatures pulumix.Input[bool] ExperimentalNumscriptInterpreter pulumix.Input[bool] @@ -129,14 +135,29 @@ func NewComponent(ctx *pulumi.Context, name string, args *ComponentArgs, opts .. } ledgerImage := pulumi.Sprintf("ghcr.io/formancehq/ledger:%s", tag) - autoUpgrade := pulumix.Val(true) - if args.AutoUpgrade != nil { - autoUpgrade = args.AutoUpgrade.ToOutput(ctx.Context()) + upgradeMode := UpgradeModeInApp + if args.Upgrade != nil { + var ( + upgradeModeChan = make(chan UpgradeMode, 1) + ) + pulumix.ApplyErr(args.Upgrade, func(upgradeMode UpgradeMode) (any, error) { + upgradeModeChan <- upgradeMode + close(upgradeModeChan) + return nil, nil + }) + + select { + case <-ctx.Context().Done(): + return nil, ctx.Context().Err() + case upgradeMode = <-upgradeModeChan: + if upgradeMode == "" { + upgradeMode = UpgradeModeInApp + } + } } - waitUpgrade := pulumix.Val(true) - if args.WaitUpgrade != nil { - waitUpgrade = args.WaitUpgrade.ToOutput(ctx.Context()) + if upgradeMode != "" && upgradeMode != UpgradeModeDisabled && upgradeMode != UpgradeModeJob && upgradeMode != UpgradeModeInApp { + return nil, fmt.Errorf("invalid upgrade mode: %s", upgradeMode) } imagePullPolicy := pulumix.Val("") @@ -351,18 +372,10 @@ func NewComponent(ctx *pulumi.Context, name string, args *ComponentArgs, opts .. }) } - if args.AutoUpgrade != nil { + if upgradeMode == UpgradeModeInApp { envVars = append(envVars, corev1.EnvVarArgs{ - Name: pulumi.String("AUTO_UPGRADE"), - Value: pulumix.Apply2Err(autoUpgrade, waitUpgrade, func(autoUpgrade, waitUpgrade bool) (string, error) { - if waitUpgrade && !autoUpgrade { - return "", fmt.Errorf("waitUpgrade requires autoUpgrade to be true") - } - if !autoUpgrade { - return "false", nil - } - return "true", nil - }).Untyped().(pulumi.StringOutput), + Name: pulumi.String("AUTO_UPGRADE"), + Value: pulumi.String("true"), }) } @@ -472,7 +485,8 @@ func NewComponent(ctx *pulumi.Context, name string, args *ComponentArgs, opts .. Port: pulumi.String("http"), }, FailureThreshold: pulumi.Int(1), - PeriodSeconds: pulumi.Int(10), + PeriodSeconds: pulumi.Int(60), + TimeoutSeconds: pulumi.IntPtr(3), }, ReadinessProbe: corev1.ProbeArgs{ HttpGet: corev1.HTTPGetActionArgs{ @@ -480,15 +494,17 @@ func NewComponent(ctx *pulumi.Context, name string, args *ComponentArgs, opts .. Port: pulumi.String("http"), }, FailureThreshold: pulumi.Int(1), - PeriodSeconds: pulumi.Int(10), + PeriodSeconds: pulumi.Int(60), + TimeoutSeconds: pulumi.IntPtr(3), }, StartupProbe: corev1.ProbeArgs{ HttpGet: corev1.HTTPGetActionArgs{ Path: pulumi.String("/_healthcheck"), Port: pulumi.String("http"), }, - FailureThreshold: pulumi.Int(60), - PeriodSeconds: pulumi.Int(5), + PeriodSeconds: pulumi.Int(5), + InitialDelaySeconds: pulumi.IntPtr(2), + TimeoutSeconds: pulumi.IntPtr(3), }, Env: envVars, }, @@ -501,11 +517,8 @@ func NewComponent(ctx *pulumi.Context, name string, args *ComponentArgs, opts .. return nil, err } - cmp.Migrations = pulumix.ApplyErr(waitUpgrade, func(waitUpgrade bool) (*batchv1.Job, error) { - if !waitUpgrade { - return nil, nil - } - return batchv1.NewJob(ctx, "wait-migration-completion", &batchv1.JobArgs{ + if upgradeMode == UpgradeModeJob { + _, err = batchv1.NewJob(ctx, "migrate", &batchv1.JobArgs{ Metadata: &metav1.ObjectMetaArgs{ Namespace: namespace.Untyped().(pulumi.StringOutput), }, @@ -515,7 +528,7 @@ func NewComponent(ctx *pulumi.Context, name string, args *ComponentArgs, opts .. RestartPolicy: pulumi.String("OnFailure"), Containers: corev1.ContainerArray{ corev1.ContainerArgs{ - Name: pulumi.String("check"), + Name: pulumi.String("migrate"), Args: pulumi.StringArray{ pulumi.String("migrate"), }, @@ -537,7 +550,10 @@ func NewComponent(ctx *pulumi.Context, name string, args *ComponentArgs, opts .. }, }, }, pulumi.Parent(cmp)) - }) + if err != nil { + return nil, err + } + } service, err := corev1.NewService(ctx, "ledger", &corev1.ServiceArgs{ Metadata: &metav1.ObjectMetaArgs{ diff --git a/docs/api/README.md b/docs/api/README.md index 52cc516f7..6dee2686e 100644 --- a/docs/api/README.md +++ b/docs/api/README.md @@ -597,7 +597,10 @@ Accept: application/json } } } - ] + ], + "errorCode": "VALIDATION", + "errorMessage": "[VALIDATION] invalid 'cursor' query param", + "details": "https://play.numscript.org/?payload=eyJlcnJvciI6ImFjY291bnQgaGFkIGluc3VmZmljaWVudCBmdW5kcyJ9" } ``` @@ -3244,7 +3247,7 @@ Authorization ( Scopes: ledger:write ) |*anonymous*|INTERPRETER_PARSE| |*anonymous*|INTERPRETER_RUNTIME| |*anonymous*|LEDGER_ALREADY_EXISTS| -|*anonymous*|BUCKET_OUTDATED| +|*anonymous*|OUTDATED_SCHEMA|

V2LedgerInfoResponse

@@ -3788,16 +3791,28 @@ and } } } - ] + ], + "errorCode": "VALIDATION", + "errorMessage": "[VALIDATION] invalid 'cursor' query param", + "details": "https://play.numscript.org/?payload=eyJlcnJvciI6ImFjY291bnQgaGFkIGluc3VmZmljaWVudCBmdW5kcyJ9" } ``` ### Properties +allOf + +|Name|Type|Required|Restrictions|Description| +|---|---|---|---|---| +|*anonymous*|object|false|none|none| +|ยป data|[[V2BulkElementResult](#schemav2bulkelementresult)]|false|none|none| + +and + |Name|Type|Required|Restrictions|Description| |---|---|---|---|---| -|data|[[V2BulkElementResult](#schemav2bulkelementresult)]|true|none|none| +|*anonymous*|[V2ErrorResponse](#schemav2errorresponse)|false|none|none|

V2BulkElementResult

diff --git a/internal/api/common/errors.go b/internal/api/common/errors.go index 0e440d6c6..ff8b86e53 100644 --- a/internal/api/common/errors.go +++ b/internal/api/common/errors.go @@ -17,7 +17,6 @@ const ( ErrMetadataOverride = "METADATA_OVERRIDE" ErrBulkSizeExceeded = "BULK_SIZE_EXCEEDED" ErrLedgerAlreadyExists = "LEDGER_ALREADY_EXISTS" - ErrBucketOutdated = "BUCKET_OUTDATED" ErrInterpreterParse = "INTERPRETER_PARSE" ErrInterpreterRuntime = "INTERPRETER_RUNTIME" diff --git a/internal/api/v2/controllers_ledgers_create.go b/internal/api/v2/controllers_ledgers_create.go index 2c59eb0b1..3b7b20898 100644 --- a/internal/api/v2/controllers_ledgers_create.go +++ b/internal/api/v2/controllers_ledgers_create.go @@ -38,7 +38,7 @@ func createLedger(systemController system.Controller) http.HandlerFunc { errors.Is(err, ledger.ErrInvalidBucketName{}): api.BadRequest(w, common.ErrValidation, err) case errors.Is(err, system.ErrBucketOutdated): - api.BadRequest(w, common.ErrBucketOutdated, err) + api.BadRequest(w, common.ErrOutdatedSchema, err) case errors.Is(err, system.ErrLedgerAlreadyExists): api.BadRequest(w, common.ErrLedgerAlreadyExists, err) default: diff --git a/internal/api/v2/controllers_ledgers_create_test.go b/internal/api/v2/controllers_ledgers_create_test.go index 60bcca31c..eb4265715 100644 --- a/internal/api/v2/controllers_ledgers_create_test.go +++ b/internal/api/v2/controllers_ledgers_create_test.go @@ -84,7 +84,7 @@ func TestLedgersCreate(t *testing.T) { expectedBackendCall: true, returnErr: system.ErrBucketOutdated, expectStatusCode: http.StatusBadRequest, - expectErrorCode: common.ErrBucketOutdated, + expectErrorCode: common.ErrOutdatedSchema, }, { name: "unexpected error", diff --git a/internal/storage/bucket/bucket.go b/internal/storage/bucket/bucket.go index ff353a179..48a6b9316 100644 --- a/internal/storage/bucket/bucket.go +++ b/internal/storage/bucket/bucket.go @@ -10,7 +10,7 @@ import ( ) type Bucket interface { - Migrate(ctx context.Context, minimalVersionReached chan struct{}, opts ...migrations.Option) error + Migrate(ctx context.Context, opts ...migrations.Option) error AddLedger(ctx context.Context, ledger ledger.Ledger) error HasMinimalVersion(ctx context.Context) (bool, error) IsUpToDate(ctx context.Context) (bool, error) diff --git a/internal/storage/bucket/default_bucket.go b/internal/storage/bucket/default_bucket.go index 0b3b89409..1b9fb8233 100644 --- a/internal/storage/bucket/default_bucket.go +++ b/internal/storage/bucket/default_bucket.go @@ -38,8 +38,8 @@ func (b *DefaultBucket) IsUpToDate(ctx context.Context) (bool, error) { return GetMigrator(b.db, b.name).IsUpToDate(ctx) } -func (b *DefaultBucket) Migrate(ctx context.Context, minimalVersionReached chan struct{}, options ...migrations.Option) error { - return migrate(ctx, b.tracer, b.db, b.name, minimalVersionReached, options...) +func (b *DefaultBucket) Migrate(ctx context.Context, options ...migrations.Option) error { + return migrate(ctx, b.tracer, b.db, b.name, options...) } func (b *DefaultBucket) HasMinimalVersion(ctx context.Context) (bool, error) { diff --git a/internal/storage/bucket/default_bucket_test.go b/internal/storage/bucket/default_bucket_test.go index 2c81a1111..0bb3b971d 100644 --- a/internal/storage/bucket/default_bucket_test.go +++ b/internal/storage/bucket/default_bucket_test.go @@ -31,5 +31,5 @@ func TestBuckets(t *testing.T) { require.NoError(t, system.Migrate(ctx, db)) b := bucket.NewDefault(db, noop.Tracer{}, name) - require.NoError(t, b.Migrate(ctx, make(chan struct{}))) + require.NoError(t, b.Migrate(ctx)) } diff --git a/internal/storage/bucket/migrations.go b/internal/storage/bucket/migrations.go index 32e02802a..d44e702a3 100644 --- a/internal/storage/bucket/migrations.go +++ b/internal/storage/bucket/migrations.go @@ -24,21 +24,11 @@ func GetMigrator(db *bun.DB, name string, options ...migrations.Option) *migrati return migrator } -func migrate(ctx context.Context, tracer trace.Tracer, db *bun.DB, name string, minimalVersionReached chan struct{}, options ...migrations.Option) error { +func migrate(ctx context.Context, tracer trace.Tracer, db *bun.DB, name string, options ...migrations.Option) error { ctx, span := tracer.Start(ctx, "Migrate bucket") defer span.End() migrator := GetMigrator(db, name, options...) - version, err := migrator.GetLastVersion(ctx) - if err != nil { - if !errors.Is(err, migrations.ErrMissingVersionTable) { - return err - } - } - - if version >= MinimalSchemaVersion { - close(minimalVersionReached) - } for { err := migrator.UpByOne(ctx) @@ -48,15 +38,5 @@ func migrate(ctx context.Context, tracer trace.Tracer, db *bun.DB, name string, } return err } - version++ - - if version >= MinimalSchemaVersion { - select { - case <-minimalVersionReached: - // already closed - default: - close(minimalVersionReached) - } - } } } diff --git a/internal/storage/bucket/migrations/11-make-stateless/up.sql b/internal/storage/bucket/migrations/11-make-stateless/up.sql index 251d1448a..33c37515b 100644 --- a/internal/storage/bucket/migrations/11-make-stateless/up.sql +++ b/internal/storage/bucket/migrations/11-make-stateless/up.sql @@ -4,7 +4,7 @@ create or replace function transaction_date() returns timestamp as $$ declare ret timestamp without time zone; begin - create temporary table if not exists transaction_date on commit drop as + create temporary table if not exists transaction_date on commit delete rows as select statement_timestamp(); select * @@ -16,7 +16,7 @@ create or replace function transaction_date() returns timestamp as $$ ret = statement_timestamp(); insert into transaction_date - select ret; + select ret at time zone 'utc'; end if; return ret at time zone 'utc'; diff --git a/internal/storage/bucket/migrations/16-create-transaction-id-index-on-moves/notes.yaml b/internal/storage/bucket/migrations/16-create-transaction-id-index-on-moves/notes.yaml new file mode 100644 index 000000000..1a8995d82 --- /dev/null +++ b/internal/storage/bucket/migrations/16-create-transaction-id-index-on-moves/notes.yaml @@ -0,0 +1 @@ +name: Create transaction id index on moves diff --git a/internal/storage/bucket/migrations/16-create-transaction-id-index-on-moves/up.sql b/internal/storage/bucket/migrations/16-create-transaction-id-index-on-moves/up.sql new file mode 100644 index 000000000..de13279d0 --- /dev/null +++ b/internal/storage/bucket/migrations/16-create-transaction-id-index-on-moves/up.sql @@ -0,0 +1 @@ +create index concurrently moves_transactions_id on "{{ .Schema }}".moves(transactions_id); \ No newline at end of file diff --git a/internal/storage/bucket/migrations/16-moves-fill-transaction-id/notes.yaml b/internal/storage/bucket/migrations/17-moves-fill-transaction-id/notes.yaml similarity index 100% rename from internal/storage/bucket/migrations/16-moves-fill-transaction-id/notes.yaml rename to internal/storage/bucket/migrations/17-moves-fill-transaction-id/notes.yaml diff --git a/internal/storage/bucket/migrations/16-moves-fill-transaction-id/up.sql b/internal/storage/bucket/migrations/17-moves-fill-transaction-id/up.sql similarity index 83% rename from internal/storage/bucket/migrations/16-moves-fill-transaction-id/up.sql rename to internal/storage/bucket/migrations/17-moves-fill-transaction-id/up.sql index 5c486cf6e..735c891c8 100644 --- a/internal/storage/bucket/migrations/16-moves-fill-transaction-id/up.sql +++ b/internal/storage/bucket/migrations/17-moves-fill-transaction-id/up.sql @@ -1,12 +1,10 @@ do $$ declare - _batch_size integer := 100; + _batch_size integer := 1000; _max integer; begin set search_path = '{{.Schema}}'; - create index moves_transactions_id on moves(transactions_id); - select count(seq) from moves where transactions_id is null @@ -38,7 +36,9 @@ do $$ end loop; alter table moves - alter column transactions_id set not null; + add constraint transactions_id_not_null + check (transactions_id is not null) + not valid; end $$ language plpgsql; \ No newline at end of file diff --git a/internal/storage/bucket/migrations/16-moves-fill-transaction-id/up_tests_after.sql b/internal/storage/bucket/migrations/17-moves-fill-transaction-id/up_tests_after.sql similarity index 100% rename from internal/storage/bucket/migrations/16-moves-fill-transaction-id/up_tests_after.sql rename to internal/storage/bucket/migrations/17-moves-fill-transaction-id/up_tests_after.sql diff --git a/internal/storage/bucket/migrations/16-moves-fill-transaction-id/up_tests_before.sql b/internal/storage/bucket/migrations/17-moves-fill-transaction-id/up_tests_before.sql similarity index 100% rename from internal/storage/bucket/migrations/16-moves-fill-transaction-id/up_tests_before.sql rename to internal/storage/bucket/migrations/17-moves-fill-transaction-id/up_tests_before.sql diff --git a/internal/storage/bucket/migrations/17-transactions-fill-inserted-at/notes.yaml b/internal/storage/bucket/migrations/18-transactions-fill-inserted-at/notes.yaml similarity index 100% rename from internal/storage/bucket/migrations/17-transactions-fill-inserted-at/notes.yaml rename to internal/storage/bucket/migrations/18-transactions-fill-inserted-at/notes.yaml diff --git a/internal/storage/bucket/migrations/17-transactions-fill-inserted-at/up.sql b/internal/storage/bucket/migrations/18-transactions-fill-inserted-at/up.sql similarity index 98% rename from internal/storage/bucket/migrations/17-transactions-fill-inserted-at/up.sql rename to internal/storage/bucket/migrations/18-transactions-fill-inserted-at/up.sql index 6adca135f..fdc93ea71 100644 --- a/internal/storage/bucket/migrations/17-transactions-fill-inserted-at/up.sql +++ b/internal/storage/bucket/migrations/18-transactions-fill-inserted-at/up.sql @@ -1,6 +1,6 @@ do $$ declare - _batch_size integer := 100; + _batch_size integer := 1000; _date timestamp without time zone; _count integer := 0; begin diff --git a/internal/storage/bucket/migrations/17-transactions-fill-inserted-at/up_tests_after.sql b/internal/storage/bucket/migrations/18-transactions-fill-inserted-at/up_tests_after.sql similarity index 100% rename from internal/storage/bucket/migrations/17-transactions-fill-inserted-at/up_tests_after.sql rename to internal/storage/bucket/migrations/18-transactions-fill-inserted-at/up_tests_after.sql diff --git a/internal/storage/bucket/migrations/17-transactions-fill-inserted-at/up_tests_before.sql b/internal/storage/bucket/migrations/18-transactions-fill-inserted-at/up_tests_before.sql similarity index 100% rename from internal/storage/bucket/migrations/17-transactions-fill-inserted-at/up_tests_before.sql rename to internal/storage/bucket/migrations/18-transactions-fill-inserted-at/up_tests_before.sql diff --git a/internal/storage/bucket/migrations/18-transactions-fill-pcv/up.sql b/internal/storage/bucket/migrations/18-transactions-fill-pcv/up.sql deleted file mode 100644 index 39dd9e9f4..000000000 --- a/internal/storage/bucket/migrations/18-transactions-fill-pcv/up.sql +++ /dev/null @@ -1,62 +0,0 @@ -do $$ - declare - _batch_size integer := 100; - _count integer; - begin - set search_path = '{{.Schema}}'; - - select count(id) - from transactions - where post_commit_volumes is null - into _count; - - perform pg_notify('migrations-{{ .Schema }}', 'init: ' || _count); - - loop - -- disable triggers - set session_replication_role = replica; - - with _outdated_transactions as ( - select id - from transactions - where post_commit_volumes is null - limit _batch_size - ) - update transactions - set post_commit_volumes = ( - select public.aggregate_objects(post_commit_volumes::jsonb) as post_commit_volumes - from ( - select accounts_address, json_build_object(accounts_address, post_commit_volumes) post_commit_volumes - from ( - select accounts_address, json_build_object(asset, post_commit_volumes) as post_commit_volumes - from ( - select distinct on (accounts_address, asset) - accounts_address, - asset, - first_value(post_commit_volumes) over ( - partition by accounts_address, asset - order by seq desc - ) as post_commit_volumes - from moves - where transactions_id = transactions.id and ledger = transactions.ledger - ) moves - ) values - ) values - ) - from _outdated_transactions - where transactions.id in (_outdated_transactions.id); - - -- enable triggers - set session_replication_role = default; - - exit when not found; - - commit; - - perform pg_notify('migrations-{{ .Schema }}', 'continue: ' || _batch_size); - end loop; - - alter table transactions - alter column post_commit_volumes set not null; - end -$$; \ No newline at end of file diff --git a/internal/storage/bucket/migrations/18-transactions-fill-pcv/notes.yaml b/internal/storage/bucket/migrations/19-transactions-fill-pcv/notes.yaml similarity index 100% rename from internal/storage/bucket/migrations/18-transactions-fill-pcv/notes.yaml rename to internal/storage/bucket/migrations/19-transactions-fill-pcv/notes.yaml diff --git a/internal/storage/bucket/migrations/19-transactions-fill-pcv/up.sql b/internal/storage/bucket/migrations/19-transactions-fill-pcv/up.sql new file mode 100644 index 000000000..d3cf97488 --- /dev/null +++ b/internal/storage/bucket/migrations/19-transactions-fill-pcv/up.sql @@ -0,0 +1,63 @@ +do $$ + declare + _offset integer := 0; + _batch_size integer := 1000; + begin + set search_path = '{{ .Schema }}'; + + drop table if exists moves_view; + + create temp table moves_view as + select transactions_id::numeric, public.aggregate_objects(json_build_object(accounts_address, json_build_object(asset, post_commit_volumes))::jsonb) as volumes + from ( + SELECT DISTINCT ON (moves.transactions_id, accounts_address, asset) moves.transactions_id, accounts_address, asset, + first_value(post_commit_volumes) OVER ( + PARTITION BY moves.transactions_id, accounts_address, asset + ORDER BY seq DESC + ) AS post_commit_volumes + FROM moves + where insertion_date < ( + select tstamp from goose_db_version where version_id = 12 + ) + ) moves + group by transactions_id; + + perform pg_notify('migrations-{{ .Schema }}', 'init: ' || (select count(*) from moves_view)); + + create index moves_view_idx on moves_view(transactions_id); + + -- disable triggers + set session_replication_role = replica; + + loop + with data as ( + select transactions_id, volumes + from moves_view + -- play better than offset/limit + where transactions_id >= _offset and transactions_id < _offset + _batch_size + ) + update transactions + set post_commit_volumes = data.volumes + from data + where transactions.id = data.transactions_id; + + exit when not found; + + _offset = _offset + _batch_size; + + perform pg_notify('migrations-{{ .Schema }}', 'continue: ' || _batch_size); + + commit; + end loop; + + -- enable triggers + set session_replication_role = default; + + drop table if exists moves_view; + + alter table transactions + add constraint post_commit_volumes_not_null + check (post_commit_volumes is not null) + not valid; + end +$$; diff --git a/internal/storage/bucket/migrations/18-transactions-fill-pcv/up_tests_after.sql b/internal/storage/bucket/migrations/19-transactions-fill-pcv/up_tests_after.sql similarity index 100% rename from internal/storage/bucket/migrations/18-transactions-fill-pcv/up_tests_after.sql rename to internal/storage/bucket/migrations/19-transactions-fill-pcv/up_tests_after.sql diff --git a/internal/storage/bucket/migrations/18-transactions-fill-pcv/up_tests_before.sql b/internal/storage/bucket/migrations/19-transactions-fill-pcv/up_tests_before.sql similarity index 100% rename from internal/storage/bucket/migrations/18-transactions-fill-pcv/up_tests_before.sql rename to internal/storage/bucket/migrations/19-transactions-fill-pcv/up_tests_before.sql diff --git a/internal/storage/bucket/migrations/19-accounts-volumes-fill-history/notes.yaml b/internal/storage/bucket/migrations/20-accounts-volumes-fill-history/notes.yaml similarity index 100% rename from internal/storage/bucket/migrations/19-accounts-volumes-fill-history/notes.yaml rename to internal/storage/bucket/migrations/20-accounts-volumes-fill-history/notes.yaml diff --git a/internal/storage/bucket/migrations/19-accounts-volumes-fill-history/up.sql b/internal/storage/bucket/migrations/20-accounts-volumes-fill-history/up.sql similarity index 97% rename from internal/storage/bucket/migrations/19-accounts-volumes-fill-history/up.sql rename to internal/storage/bucket/migrations/20-accounts-volumes-fill-history/up.sql index f77f2a0ec..56083891f 100644 --- a/internal/storage/bucket/migrations/19-accounts-volumes-fill-history/up.sql +++ b/internal/storage/bucket/migrations/20-accounts-volumes-fill-history/up.sql @@ -1,7 +1,7 @@ do $$ declare _count integer; - _batch_size integer := 100; + _batch_size integer := 1000; begin set search_path = '{{.Schema}}'; diff --git a/internal/storage/bucket/migrations/19-accounts-volumes-fill-history/up_tests_after.sql b/internal/storage/bucket/migrations/20-accounts-volumes-fill-history/up_tests_after.sql similarity index 100% rename from internal/storage/bucket/migrations/19-accounts-volumes-fill-history/up_tests_after.sql rename to internal/storage/bucket/migrations/20-accounts-volumes-fill-history/up_tests_after.sql diff --git a/internal/storage/bucket/migrations/19-accounts-volumes-fill-history/up_tests_before.sql b/internal/storage/bucket/migrations/20-accounts-volumes-fill-history/up_tests_before.sql similarity index 100% rename from internal/storage/bucket/migrations/19-accounts-volumes-fill-history/up_tests_before.sql rename to internal/storage/bucket/migrations/20-accounts-volumes-fill-history/up_tests_before.sql diff --git a/internal/storage/bucket/migrations/20-transactions-metadata-fill-transaction-id/notes.yaml b/internal/storage/bucket/migrations/21-transactions-metadata-fill-transaction-id/notes.yaml similarity index 100% rename from internal/storage/bucket/migrations/20-transactions-metadata-fill-transaction-id/notes.yaml rename to internal/storage/bucket/migrations/21-transactions-metadata-fill-transaction-id/notes.yaml diff --git a/internal/storage/bucket/migrations/20-transactions-metadata-fill-transaction-id/up.sql b/internal/storage/bucket/migrations/21-transactions-metadata-fill-transaction-id/up.sql similarity index 87% rename from internal/storage/bucket/migrations/20-transactions-metadata-fill-transaction-id/up.sql rename to internal/storage/bucket/migrations/21-transactions-metadata-fill-transaction-id/up.sql index 7823fa915..b0f7d1690 100644 --- a/internal/storage/bucket/migrations/20-transactions-metadata-fill-transaction-id/up.sql +++ b/internal/storage/bucket/migrations/21-transactions-metadata-fill-transaction-id/up.sql @@ -1,7 +1,7 @@ do $$ declare - _batch_size integer := 100; + _batch_size integer := 1000; _count integer; begin set search_path = '{{.Schema}}'; @@ -38,7 +38,9 @@ do $$ end loop; alter table transactions_metadata - alter column transactions_id set not null ; + add constraint transactions_id_not_null + check (transactions_id is not null) + not valid; end $$; diff --git a/internal/storage/bucket/migrations/20-transactions-metadata-fill-transaction-id/up_tests_after.sql b/internal/storage/bucket/migrations/21-transactions-metadata-fill-transaction-id/up_tests_after.sql similarity index 100% rename from internal/storage/bucket/migrations/20-transactions-metadata-fill-transaction-id/up_tests_after.sql rename to internal/storage/bucket/migrations/21-transactions-metadata-fill-transaction-id/up_tests_after.sql diff --git a/internal/storage/bucket/migrations/20-transactions-metadata-fill-transaction-id/up_tests_before.sql b/internal/storage/bucket/migrations/21-transactions-metadata-fill-transaction-id/up_tests_before.sql similarity index 100% rename from internal/storage/bucket/migrations/20-transactions-metadata-fill-transaction-id/up_tests_before.sql rename to internal/storage/bucket/migrations/21-transactions-metadata-fill-transaction-id/up_tests_before.sql diff --git a/internal/storage/bucket/migrations/21-accounts-metadata-fill-address/notes.yaml b/internal/storage/bucket/migrations/22-accounts-metadata-fill-address/notes.yaml similarity index 100% rename from internal/storage/bucket/migrations/21-accounts-metadata-fill-address/notes.yaml rename to internal/storage/bucket/migrations/22-accounts-metadata-fill-address/notes.yaml diff --git a/internal/storage/bucket/migrations/21-accounts-metadata-fill-address/up.sql b/internal/storage/bucket/migrations/22-accounts-metadata-fill-address/up.sql similarity index 86% rename from internal/storage/bucket/migrations/21-accounts-metadata-fill-address/up.sql rename to internal/storage/bucket/migrations/22-accounts-metadata-fill-address/up.sql index 752ef3cfd..10d2a6849 100644 --- a/internal/storage/bucket/migrations/21-accounts-metadata-fill-address/up.sql +++ b/internal/storage/bucket/migrations/22-accounts-metadata-fill-address/up.sql @@ -1,7 +1,7 @@ do $$ declare - _batch_size integer := 100; + _batch_size integer := 1000; _count integer; begin set search_path = '{{.Schema}}'; @@ -38,7 +38,9 @@ do $$ end loop; alter table accounts_metadata - alter column accounts_address set not null ; + add constraint accounts_address_not_null + check (accounts_address is not null) + not valid; end $$; diff --git a/internal/storage/bucket/migrations/21-accounts-metadata-fill-address/up_tests_after.sql b/internal/storage/bucket/migrations/22-accounts-metadata-fill-address/up_tests_after.sql similarity index 100% rename from internal/storage/bucket/migrations/21-accounts-metadata-fill-address/up_tests_after.sql rename to internal/storage/bucket/migrations/22-accounts-metadata-fill-address/up_tests_after.sql diff --git a/internal/storage/bucket/migrations/21-accounts-metadata-fill-address/up_tests_before.sql b/internal/storage/bucket/migrations/22-accounts-metadata-fill-address/up_tests_before.sql similarity index 100% rename from internal/storage/bucket/migrations/21-accounts-metadata-fill-address/up_tests_before.sql rename to internal/storage/bucket/migrations/22-accounts-metadata-fill-address/up_tests_before.sql diff --git a/internal/storage/bucket/migrations/22-logs-fill-memento/notes.yaml b/internal/storage/bucket/migrations/23-logs-fill-memento/notes.yaml similarity index 100% rename from internal/storage/bucket/migrations/22-logs-fill-memento/notes.yaml rename to internal/storage/bucket/migrations/23-logs-fill-memento/notes.yaml diff --git a/internal/storage/bucket/migrations/22-logs-fill-memento/up.sql b/internal/storage/bucket/migrations/23-logs-fill-memento/up.sql similarity index 85% rename from internal/storage/bucket/migrations/22-logs-fill-memento/up.sql rename to internal/storage/bucket/migrations/23-logs-fill-memento/up.sql index 7923084b3..3169d6872 100644 --- a/internal/storage/bucket/migrations/22-logs-fill-memento/up.sql +++ b/internal/storage/bucket/migrations/23-logs-fill-memento/up.sql @@ -1,6 +1,6 @@ do $$ declare - _batch_size integer := 100; + _batch_size integer := 1000; _count integer; begin set search_path = '{{.Schema}}'; @@ -32,7 +32,9 @@ do $$ end loop; alter table logs - alter column memento set not null; + add constraint memento_not_null + check (memento is not null) + not valid; end $$; diff --git a/internal/storage/bucket/migrations/22-logs-fill-memento/up_tests_after.sql b/internal/storage/bucket/migrations/23-logs-fill-memento/up_tests_after.sql similarity index 100% rename from internal/storage/bucket/migrations/22-logs-fill-memento/up_tests_after.sql rename to internal/storage/bucket/migrations/23-logs-fill-memento/up_tests_after.sql diff --git a/internal/storage/bucket/migrations/22-logs-fill-memento/up_tests_before.sql b/internal/storage/bucket/migrations/23-logs-fill-memento/up_tests_before.sql similarity index 100% rename from internal/storage/bucket/migrations/22-logs-fill-memento/up_tests_before.sql rename to internal/storage/bucket/migrations/23-logs-fill-memento/up_tests_before.sql diff --git a/internal/storage/bucket/migrations/23-noop-keep-for-compatibility/notes.yaml b/internal/storage/bucket/migrations/23-noop-keep-for-compatibility/notes.yaml deleted file mode 100644 index ab4b7e1a2..000000000 --- a/internal/storage/bucket/migrations/23-noop-keep-for-compatibility/notes.yaml +++ /dev/null @@ -1 +0,0 @@ -name: Noop, keep for compatibility diff --git a/internal/storage/bucket/migrations/23-noop-keep-for-compatibility/up.sql b/internal/storage/bucket/migrations/23-noop-keep-for-compatibility/up.sql deleted file mode 100644 index e69de29bb..000000000 diff --git a/internal/storage/driver/buckets_generated_test.go b/internal/storage/driver/buckets_generated_test.go index 89d168b38..b71780813 100644 --- a/internal/storage/driver/buckets_generated_test.go +++ b/internal/storage/driver/buckets_generated_test.go @@ -113,9 +113,9 @@ func (mr *MockBucketMockRecorder) IsUpToDate(ctx any) *gomock.Call { } // Migrate mocks base method. -func (m *MockBucket) Migrate(ctx context.Context, minimalVersionReached chan struct{}, opts ...migrations.Option) error { +func (m *MockBucket) Migrate(ctx context.Context, opts ...migrations.Option) error { m.ctrl.T.Helper() - varargs := []any{ctx, minimalVersionReached} + varargs := []any{ctx} for _, a := range opts { varargs = append(varargs, a) } @@ -125,9 +125,9 @@ func (m *MockBucket) Migrate(ctx context.Context, minimalVersionReached chan str } // Migrate indicates an expected call of Migrate. -func (mr *MockBucketMockRecorder) Migrate(ctx, minimalVersionReached any, opts ...any) *gomock.Call { +func (mr *MockBucketMockRecorder) Migrate(ctx any, opts ...any) *gomock.Call { mr.mock.ctrl.T.Helper() - varargs := append([]any{ctx, minimalVersionReached}, opts...) + varargs := append([]any{ctx}, opts...) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Migrate", reflect.TypeOf((*MockBucket)(nil).Migrate), varargs...) } diff --git a/internal/storage/driver/driver.go b/internal/storage/driver/driver.go index 7c8b2b337..497bb406e 100644 --- a/internal/storage/driver/driver.go +++ b/internal/storage/driver/driver.go @@ -61,7 +61,6 @@ func (d *Driver) CreateLedger(ctx context.Context, l *ledger.Ledger) (*ledgersto if err := b.Migrate( ctx, - make(chan struct{}), migrations.WithLockRetryInterval(d.migratorLockRetryInterval), ); err != nil { return nil, fmt.Errorf("migrating bucket: %w", err) @@ -82,6 +81,7 @@ func (d *Driver) CreateLedger(ctx context.Context, l *ledger.Ledger) (*ledgersto } func (d *Driver) OpenLedger(ctx context.Context, name string) (*ledgerstore.Store, *ledger.Ledger, error) { + // todo: keep the ledger in cache somewhere to avoid read the ledger at each request, maybe in the factory ret, err := d.systemStore.GetLedger(ctx, name) if err != nil { return nil, nil, err @@ -159,20 +159,17 @@ func (d *Driver) GetLedger(ctx context.Context, name string) (*ledger.Ledger, er func (d *Driver) UpgradeBucket(ctx context.Context, name string) error { return d.bucketFactory.Create(name).Migrate( ctx, - make(chan struct{}), migrations.WithLockRetryInterval(d.migratorLockRetryInterval), ) } -func (d *Driver) UpgradeAllBuckets(ctx context.Context, minimalVersionReached chan struct{}) error { +func (d *Driver) UpgradeAllBuckets(ctx context.Context) error { buckets, err := d.systemStore.GetDistinctBuckets(ctx) if err != nil { return fmt.Errorf("getting distinct buckets: %w", err) } - sem := make(chan struct{}, len(buckets)) - wp := pond.New(d.parallelBucketMigrations, len(buckets), pond.Context(ctx)) for _, bucketName := range buckets { @@ -182,18 +179,13 @@ func (d *Driver) UpgradeAllBuckets(ctx context.Context, minimalVersionReached ch }) b := d.bucketFactory.Create(bucketName) - // copy semaphore to be able to nil it - sem := sem - l: for { - minimalVersionReached := make(chan struct{}) errChan := make(chan error, 1) go func() { logger.Infof("Upgrading...") errChan <- b.Migrate( logging.ContextWithLogger(ctx, logger), - minimalVersionReached, migrations.WithLockRetryInterval(d.migratorLockRetryInterval), ) }() @@ -213,40 +205,45 @@ func (d *Driver) UpgradeAllBuckets(ctx context.Context, minimalVersionReached ch return } } - if sem != nil { - logger.Infof("Reached minimal workable version") - sem <- struct{}{} - } logger.Info("Upgrade terminated") return - case <-minimalVersionReached: - minimalVersionReached = nil - if sem != nil { - logger.Infof("Reached minimal workable version") - sem <- struct{}{} - sem = nil - } } } } }) } - for i := 0; i < len(buckets); i++ { - select { - case <-ctx.Done(): - return ctx.Err() - case <-sem: - } + wp.StopAndWait() + + return nil +} + +func (d *Driver) HasReachMinimalVersion(ctx context.Context) (bool, error) { + isUpToDate, err := d.systemStore.IsUpToDate(ctx) + if err != nil { + return false, fmt.Errorf("checking if system store is up to date: %w", err) + } + if !isUpToDate { + return false, nil } - logging.FromContext(ctx).Infof("All buckets have reached minimal workable version") - close(minimalVersionReached) + buckets, err := d.systemStore.GetDistinctBuckets(ctx) + if err != nil { + return false, fmt.Errorf("getting distinct buckets: %w", err) + } - wp.StopAndWait() + for _, b := range buckets { + hasMinimalVersion, err := d.bucketFactory.Create(b).HasMinimalVersion(ctx) + if err != nil { + return false, fmt.Errorf("checking if bucket '%s' is up to date: %w", b, err) + } + if !hasMinimalVersion { + return false, nil + } + } - return nil + return true, nil } func New( diff --git a/internal/storage/driver/driver_test.go b/internal/storage/driver/driver_test.go index 615cb475e..ea18bcafd 100644 --- a/internal/storage/driver/driver_test.go +++ b/internal/storage/driver/driver_test.go @@ -51,16 +51,15 @@ func TestUpgradeAllLedgers(t *testing.T) { Return(bucket) bucket.EXPECT(). - Migrate(gomock.Any(), gomock.Any(), gomock.Any()). - DoAndReturn(func(ctx context.Context, minimalVersionReached chan struct{}, opts ...migrations.Option) error { - close(minimalVersionReached) + Migrate(gomock.Any(), gomock.Any()). + DoAndReturn(func(ctx context.Context, opts ...migrations.Option) error { return nil }) ctx, cancel := context.WithTimeout(ctx, 2*time.Second) t.Cleanup(cancel) - require.NoError(t, d.UpgradeAllBuckets(ctx, make(chan struct{}))) + require.NoError(t, d.UpgradeAllBuckets(ctx)) }) t.Run("with concurrent buckets", func(t *testing.T) { @@ -92,9 +91,8 @@ func TestUpgradeAllLedgers(t *testing.T) { Return(bucket) bucket.EXPECT(). - Migrate(gomock.Any(), gomock.Any(), gomock.Any()). - DoAndReturn(func(ctx context.Context, minimalVersionReached chan struct{}, opts ...migrations.Option) error { - close(minimalVersionReached) + Migrate(gomock.Any(), gomock.Any()). + DoAndReturn(func(ctx context.Context, opts ...migrations.Option) error { return nil }) } @@ -106,7 +104,7 @@ func TestUpgradeAllLedgers(t *testing.T) { ctx, cancel := context.WithTimeout(ctx, 2*time.Second) t.Cleanup(cancel) - require.NoError(t, d.UpgradeAllBuckets(ctx, make(chan struct{}))) + require.NoError(t, d.UpgradeAllBuckets(ctx)) }) t.Run("and error", func(t *testing.T) { @@ -124,7 +122,6 @@ func TestUpgradeAllLedgers(t *testing.T) { bucket1 := driver.NewMockBucket(ctrl) bucket2 := driver.NewMockBucket(ctrl) bucketList := []string{"bucket1", "bucket2"} - allBucketsMinimalVersionReached := make(chan struct{}) bucketFactory.EXPECT(). Create(gomock.AnyOf( @@ -141,10 +138,9 @@ func TestUpgradeAllLedgers(t *testing.T) { bucket1MigrationStarted := make(chan struct{}) bucket1.EXPECT(). - Migrate(gomock.Any(), gomock.Any(), gomock.Any()). + Migrate(gomock.Any(), gomock.Any()). AnyTimes(). - DoAndReturn(func(ctx context.Context, minimalVersionReached chan struct{}, opts ...migrations.Option) error { - close(minimalVersionReached) + DoAndReturn(func(ctx context.Context, opts ...migrations.Option) error { close(bucket1MigrationStarted) return nil @@ -152,9 +148,9 @@ func TestUpgradeAllLedgers(t *testing.T) { firstCall := true bucket2.EXPECT(). - Migrate(gomock.Any(), gomock.Any(), gomock.Any()). + Migrate(gomock.Any(), gomock.Any()). AnyTimes(). - DoAndReturn(func(ctx context.Context, minimalVersionReached chan struct{}, opts ...migrations.Option) error { + DoAndReturn(func(ctx context.Context, opts ...migrations.Option) error { select { case <-ctx.Done(): return ctx.Err() @@ -163,7 +159,6 @@ func TestUpgradeAllLedgers(t *testing.T) { firstCall = false return errors.New("unknown error") } - close(minimalVersionReached) return nil } }) @@ -177,7 +172,7 @@ func TestUpgradeAllLedgers(t *testing.T) { t.Cleanup(cancel) bucket1MigrationStarted = make(chan struct{}) - err := d.UpgradeAllBuckets(ctx, allBucketsMinimalVersionReached) + err := d.UpgradeAllBuckets(ctx) require.NoError(t, err) }) }) @@ -214,7 +209,7 @@ func TestLedgersCreate(t *testing.T) { Return(false, nil) bucket.EXPECT(). - Migrate(gomock.Any(), gomock.Any(), gomock.Any()). + Migrate(gomock.Any(), gomock.Any()). Return(nil) bucket.EXPECT(). diff --git a/internal/storage/driver/system_generated_test.go b/internal/storage/driver/system_generated_test.go index e646d8a75..d6afce573 100644 --- a/internal/storage/driver/system_generated_test.go +++ b/internal/storage/driver/system_generated_test.go @@ -116,6 +116,21 @@ func (mr *SystemStoreMockRecorder) GetMigrator(options ...any) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetMigrator", reflect.TypeOf((*SystemStore)(nil).GetMigrator), options...) } +// IsUpToDate mocks base method. +func (m *SystemStore) IsUpToDate(ctx context.Context) (bool, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "IsUpToDate", ctx) + ret0, _ := ret[0].(bool) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// IsUpToDate indicates an expected call of IsUpToDate. +func (mr *SystemStoreMockRecorder) IsUpToDate(ctx any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsUpToDate", reflect.TypeOf((*SystemStore)(nil).IsUpToDate), ctx) +} + // ListLedgers mocks base method. func (m *SystemStore) ListLedgers(ctx context.Context, q ledger0.ListLedgersQuery) (*bunpaginate.Cursor[ledger.Ledger], error) { m.ctrl.T.Helper() diff --git a/internal/storage/ledger/legacy/main_test.go b/internal/storage/ledger/legacy/main_test.go index 364414da3..392ea3750 100644 --- a/internal/storage/ledger/legacy/main_test.go +++ b/internal/storage/ledger/legacy/main_test.go @@ -69,7 +69,7 @@ func newLedgerStore(t T) *testStore { l := ledger.MustNewWithDefault(ledgerName) b := bucket.NewDefault(db, noop.Tracer{}, ledger.DefaultBucket) - require.NoError(t, b.Migrate(ctx, make(chan struct{}))) + require.NoError(t, b.Migrate(ctx)) require.NoError(t, b.AddLedger(ctx, l)) return &testStore{ diff --git a/internal/storage/module.go b/internal/storage/module.go index 29597c847..fd1fe1a28 100644 --- a/internal/storage/module.go +++ b/internal/storage/module.go @@ -2,23 +2,43 @@ package storage import ( "context" + "errors" + "github.com/formancehq/go-libs/v2/health" "github.com/formancehq/go-libs/v2/logging" "github.com/formancehq/ledger/internal/storage/driver" "go.uber.org/fx" ) +const HealthCheckName = `storage-driver-up-to-date` + func NewFXModule(autoUpgrade bool) fx.Option { ret := []fx.Option{ driver.NewFXModule(), + health.ProvideHealthCheck(func(driver *driver.Driver) health.NamedCheck { + hasReachedMinimalVersion := false + return health.NewNamedCheck(HealthCheckName, health.CheckFn(func(ctx context.Context) error { + if hasReachedMinimalVersion { + return nil + } + var err error + hasReachedMinimalVersion, err = driver.HasReachMinimalVersion(ctx) + if err != nil { + return err + } + if !hasReachedMinimalVersion { + return errors.New("storage driver is not up to date") + } + return nil + })) + }), } if autoUpgrade { ret = append(ret, fx.Invoke(func(lc fx.Lifecycle, driver *driver.Driver) { var ( - upgradeContext context.Context - cancelContext func() - upgradeStopped = make(chan struct{}) - minimalVersionReached = make(chan struct{}) + upgradeContext context.Context + cancelContext func() + upgradeStopped = make(chan struct{}) ) lc.Append(fx.Hook{ OnStart: func(ctx context.Context) error { @@ -26,17 +46,12 @@ func NewFXModule(autoUpgrade bool) fx.Option { go func() { defer close(upgradeStopped) - if err := driver.UpgradeAllBuckets(upgradeContext, minimalVersionReached); err != nil { + if err := driver.UpgradeAllBuckets(upgradeContext); err != nil { logging.FromContext(ctx).Errorf("failed to upgrade all buckets: %v", err) } }() - select { - case <-ctx.Done(): - return ctx.Err() - case <-minimalVersionReached: - return nil - } + return nil }, OnStop: func(ctx context.Context) error { cancelContext() @@ -52,4 +67,4 @@ func NewFXModule(autoUpgrade bool) fx.Option { ) } return fx.Options(ret...) -} \ No newline at end of file +} diff --git a/internal/storage/system/store.go b/internal/storage/system/store.go index 1e32bad2c..4ebec7f79 100644 --- a/internal/storage/system/store.go +++ b/internal/storage/system/store.go @@ -24,6 +24,7 @@ type Store interface { Migrate(ctx context.Context, options ...migrations.Option) error GetMigrator(options ...migrations.Option) *migrations.Migrator + IsUpToDate(ctx context.Context) (bool, error) } const ( @@ -34,6 +35,10 @@ type DefaultStore struct { db *bun.DB } +func (d *DefaultStore) IsUpToDate(ctx context.Context) (bool, error) { + return d.GetMigrator().IsUpToDate(ctx) +} + func (d *DefaultStore) GetDistinctBuckets(ctx context.Context) ([]string, error) { var buckets []string err := d.db.NewSelect(). diff --git a/openapi.yaml b/openapi.yaml index 7e769bd79..f91ba4288 100644 --- a/openapi.yaml +++ b/openapi.yaml @@ -3492,7 +3492,7 @@ components: - INTERPRETER_PARSE - INTERPRETER_RUNTIME - LEDGER_ALREADY_EXISTS - - BUCKET_OUTDATED + - OUTDATED_SCHEMA example: VALIDATION V2LedgerInfoResponse: type: object @@ -3633,11 +3633,13 @@ components: - targetType - key V2BulkResponse: - properties: - data: - type: array - items: - $ref: '#/components/schemas/V2BulkElementResult' + allOf: + - properties: + data: + type: array + items: + $ref: '#/components/schemas/V2BulkElementResult' + - $ref: '#/components/schemas/V2ErrorResponse' type: object required: - data diff --git a/openapi/v2.yaml b/openapi/v2.yaml index 0c8c82d8d..4d879e322 100644 --- a/openapi/v2.yaml +++ b/openapi/v2.yaml @@ -1770,7 +1770,7 @@ components: - INTERPRETER_PARSE - INTERPRETER_RUNTIME - LEDGER_ALREADY_EXISTS - - BUCKET_OUTDATED + - OUTDATED_SCHEMA example: VALIDATION V2LedgerInfoResponse: type: object @@ -1911,11 +1911,13 @@ components: - targetType - key V2BulkResponse: - properties: - data: - type: array - items: - $ref: "#/components/schemas/V2BulkElementResult" + allOf: + - properties: + data: + type: array + items: + $ref: "#/components/schemas/V2BulkElementResult" + - $ref: "#/components/schemas/V2ErrorResponse" type: object required: - data diff --git a/pkg/client/.speakeasy/gen.yaml b/pkg/client/.speakeasy/gen.yaml index bfd90d79b..584f2ddbd 100644 --- a/pkg/client/.speakeasy/gen.yaml +++ b/pkg/client/.speakeasy/gen.yaml @@ -15,7 +15,7 @@ generation: auth: oAuth2ClientCredentialsEnabled: true go: - version: 0.5.0 + version: 0.5.1 additionalDependencies: {} allowUnknownFieldsInWeakUnions: false clientServerStatusCodesAsErrors: true diff --git a/pkg/client/docs/models/components/v2bulkresponse.md b/pkg/client/docs/models/components/v2bulkresponse.md index 64c495660..3d9269f2c 100644 --- a/pkg/client/docs/models/components/v2bulkresponse.md +++ b/pkg/client/docs/models/components/v2bulkresponse.md @@ -3,6 +3,9 @@ ## Fields -| Field | Type | Required | Description | -| ---------------------------------------------------------------------------------- | ---------------------------------------------------------------------------------- | ---------------------------------------------------------------------------------- | ---------------------------------------------------------------------------------- | -| `Data` | [][components.V2BulkElementResult](../../models/components/v2bulkelementresult.md) | :heavy_check_mark: | N/A | \ No newline at end of file +| Field | Type | Required | Description | Example | +| -------------------------------------------------------------------------------------------- | -------------------------------------------------------------------------------------------- | -------------------------------------------------------------------------------------------- | -------------------------------------------------------------------------------------------- | -------------------------------------------------------------------------------------------- | +| `Data` | [][components.V2BulkElementResult](../../models/components/v2bulkelementresult.md) | :heavy_check_mark: | N/A | | +| `ErrorCode` | [components.V2ErrorsEnum](../../models/components/v2errorsenum.md) | :heavy_check_mark: | N/A | VALIDATION | +| `ErrorMessage` | *string* | :heavy_check_mark: | N/A | [VALIDATION] invalid 'cursor' query param | +| `Details` | **string* | :heavy_minus_sign: | N/A | https://play.numscript.org/?payload=eyJlcnJvciI6ImFjY291bnQgaGFkIGluc3VmZmljaWVudCBmdW5kcyJ9 | \ No newline at end of file diff --git a/pkg/client/formance.go b/pkg/client/formance.go index 65656349a..869df9420 100644 --- a/pkg/client/formance.go +++ b/pkg/client/formance.go @@ -143,9 +143,9 @@ func New(opts ...SDKOption) *Formance { sdkConfiguration: sdkConfiguration{ Language: "go", OpenAPIDocVersion: "v1", - SDKVersion: "0.5.0", + SDKVersion: "0.5.1", GenVersion: "2.384.1", - UserAgent: "speakeasy-sdk/go 0.5.0 2.384.1 v1 github.com/formancehq/ledger/pkg/client", + UserAgent: "speakeasy-sdk/go 0.5.1 2.384.1 v1 github.com/formancehq/ledger/pkg/client", Hooks: hooks.New(), }, } diff --git a/pkg/client/models/components/v2bulkresponse.go b/pkg/client/models/components/v2bulkresponse.go index 06eace58f..940f8cf49 100644 --- a/pkg/client/models/components/v2bulkresponse.go +++ b/pkg/client/models/components/v2bulkresponse.go @@ -3,7 +3,10 @@ package components type V2BulkResponse struct { - Data []V2BulkElementResult `json:"data"` + Data []V2BulkElementResult `json:"data"` + ErrorCode V2ErrorsEnum `json:"errorCode"` + ErrorMessage string `json:"errorMessage"` + Details *string `json:"details,omitempty"` } func (o *V2BulkResponse) GetData() []V2BulkElementResult { @@ -12,3 +15,24 @@ func (o *V2BulkResponse) GetData() []V2BulkElementResult { } return o.Data } + +func (o *V2BulkResponse) GetErrorCode() V2ErrorsEnum { + if o == nil { + return V2ErrorsEnum("") + } + return o.ErrorCode +} + +func (o *V2BulkResponse) GetErrorMessage() string { + if o == nil { + return "" + } + return o.ErrorMessage +} + +func (o *V2BulkResponse) GetDetails() *string { + if o == nil { + return nil + } + return o.Details +} diff --git a/pkg/generate/generator.go b/pkg/generate/generator.go index 11c2c2f43..395e049ea 100644 --- a/pkg/generate/generator.go +++ b/pkg/generate/generator.go @@ -7,6 +7,7 @@ import ( "fmt" "github.com/dop251/goja" "github.com/formancehq/go-libs/v2/collectionutils" + "github.com/formancehq/go-libs/v2/pointer" ledger "github.com/formancehq/ledger/internal" "github.com/formancehq/ledger/internal/api/bulking" "github.com/formancehq/ledger/pkg/client" @@ -14,6 +15,7 @@ import ( "github.com/formancehq/ledger/pkg/client/models/operations" "github.com/google/uuid" "math/big" + "net/http" "os" "path/filepath" "time" @@ -141,11 +143,30 @@ func (r Action) Apply(ctx context.Context, client *client.V2, l string) ([]compo response, err := client.CreateBulk(ctx, operations.V2CreateBulkRequest{ Ledger: l, RequestBody: bulkElements, + Atomic: pointer.For(true), }) if err != nil { return nil, fmt.Errorf("creating transaction: %w", err) } + if response.HTTPMeta.Response.StatusCode == http.StatusBadRequest { + return nil, fmt.Errorf( + "unexpected error: %s [%s]", + response.V2BulkResponse.ErrorMessage, + response.V2BulkResponse.ErrorCode, + ) + } + + for _, data := range response.V2BulkResponse.Data { + if data.Type == components.V2BulkElementResultTypeError { + return nil, fmt.Errorf( + "unexpected error: %s [%s]", + data.V2BulkElementResultError.ErrorDescription, + data.V2BulkElementResultError.ErrorCode, + ) + } + } + return response.V2BulkResponse.Data, nil } diff --git a/pkg/testserver/server.go b/pkg/testserver/server.go index c479887a9..d6dae85ab 100644 --- a/pkg/testserver/server.go +++ b/pkg/testserver/server.go @@ -57,11 +57,13 @@ type Logger interface { type Server struct { configuration Configuration logger Logger - httpClient *ledgerclient.Formance + sdkClient *ledgerclient.Formance cancel func() ctx context.Context errorChan chan error id string + httpClient *http.Client + serverURL string } func (s *Server) Start() error { @@ -221,11 +223,14 @@ func (s *Server) Start() error { transport = httpclient.NewDebugHTTPTransport(transport) } - s.httpClient = ledgerclient.New( - ledgerclient.WithServerURL(httpserver.URL(s.ctx)), - ledgerclient.WithClient(&http.Client{ - Transport: transport, - }), + s.httpClient = &http.Client{ + Transport: transport, + } + s.serverURL = httpserver.URL(s.ctx) + + s.sdkClient = ledgerclient.New( + ledgerclient.WithServerURL(s.serverURL), + ledgerclient.WithClient(s.httpClient), ) return nil @@ -255,9 +260,17 @@ func (s *Server) Stop(ctx context.Context) error { } func (s *Server) Client() *ledgerclient.Formance { + return s.sdkClient +} + +func (s *Server) HTTPClient() *http.Client { return s.httpClient } +func (s *Server) ServerURL() string { + return s.serverURL +} + func (s *Server) Restart(ctx context.Context) error { if err := s.Stop(ctx); err != nil { return err diff --git a/test/e2e/app_lifecycle_test.go b/test/e2e/app_lifecycle_test.go index 2fdfcda81..fda433f31 100644 --- a/test/e2e/app_lifecycle_test.go +++ b/test/e2e/app_lifecycle_test.go @@ -5,12 +5,14 @@ package test_suite import ( "context" "database/sql" + "encoding/json" "github.com/formancehq/go-libs/v2/bun/bunconnect" "github.com/formancehq/go-libs/v2/logging" "github.com/formancehq/go-libs/v2/pointer" "github.com/formancehq/go-libs/v2/testing/platform/pgtesting" "github.com/formancehq/go-libs/v2/time" ledger "github.com/formancehq/ledger/internal" + "github.com/formancehq/ledger/internal/storage" "github.com/formancehq/ledger/internal/storage/bucket" "github.com/formancehq/ledger/internal/storage/system" "github.com/formancehq/ledger/pkg/client/models/components" @@ -275,4 +277,15 @@ var _ = Context("Ledger downgrade tests", func() { }) }) }) + + It("should be ok when targeting health check endpoint", func() { + ret, err := testServer.GetValue().HTTPClient().Get(testServer.GetValue().ServerURL() + "/_healthcheck") + Expect(err).To(BeNil()) + + body := make(map[string]interface{}) + Expect(json.NewDecoder(ret.Body).Decode(&body)).To(BeNil()) + Expect(body).To(Equal(map[string]any{ + storage.HealthCheckName: "OK", + })) + }) }) diff --git a/test/migrations/upgrade_test.go b/test/migrations/upgrade_test.go index d43dbf7e0..3884c9deb 100644 --- a/test/migrations/upgrade_test.go +++ b/test/migrations/upgrade_test.go @@ -73,7 +73,7 @@ func TestMigrations(t *testing.T) { driver.WithParallelBucketMigration(1), ) require.NoError(t, driver.Initialize(ctx)) - require.NoError(t, driver.UpgradeAllBuckets(ctx, make(chan struct{}))) + require.NoError(t, driver.UpgradeAllBuckets(ctx)) } func copyDatabase(t *testing.T, dockerPool *docker.Pool, source, destination string) { diff --git a/tools/generator/cmd/root.go b/tools/generator/cmd/root.go index b031f17f4..61dcfd1ea 100644 --- a/tools/generator/cmd/root.go +++ b/tools/generator/cmd/root.go @@ -19,14 +19,7 @@ import ( "strings" ) -var ( - rootCmd = &cobra.Command{ - Use: "generator ", - Short: "Generate data for a ledger. WARNING: This is an experimental tool.", - RunE: run, - Args: cobra.ExactArgs(2), - SilenceUsage: true, - } +const ( parallelFlag = "parallel" ledgerFlag = "ledger" ledgerMetadataFlag = "ledger-metadata" @@ -37,6 +30,18 @@ var ( clientSecretFlag = "client-secret" authUrlFlag = "auth-url" insecureSkipVerifyFlag = "insecure-skip-verify" + httpClientTimeoutFlag = "http-client-timeout" + debugFlag = "debug" +) + +var ( + rootCmd = &cobra.Command{ + Use: "generator ", + Short: "Generate data for a ledger. WARNING: This is an experimental tool.", + RunE: run, + Args: cobra.ExactArgs(2), + SilenceUsage: true, + } ) func init() { @@ -50,6 +55,8 @@ func init() { rootCmd.Flags().String(ledgerBucketFlag, "", "Ledger bucket") rootCmd.Flags().StringSlice(ledgerMetadataFlag, []string{}, "Ledger metadata") rootCmd.Flags().StringSlice(ledgerFeatureFlag, []string{}, "Ledger features") + rootCmd.Flags().Duration(httpClientTimeoutFlag, 0, "HTTP client timeout (default: no timeout)") + rootCmd.Flags().Bool(debugFlag, false, "Enable debug logging") rootCmd.Flags().BoolP("toggle", "t", false, "Help message for toggle") } @@ -106,7 +113,21 @@ func run(cmd *cobra.Command, args []string) error { return fmt.Errorf("failed to get insecureSkipVerify: %w", err) } + httpClientTimeout, err := cmd.Flags().GetDuration(httpClientTimeoutFlag) + if err != nil { + return fmt.Errorf("failed to get http client timeout: %w", err) + } + + debug, err := cmd.Flags().GetBool(debugFlag) + if err != nil { + return fmt.Errorf("failed to get debug: %w", err) + } + + logger := logging.NewDefaultLogger(cmd.OutOrStdout(), debug, false, false) + ctx := logging.ContextWithLogger(cmd.Context(), logger) + httpClient := &http.Client{ + Timeout: httpClientTimeout, Transport: &http.Transport{ MaxIdleConns: vus, MaxConnsPerHost: vus, @@ -138,7 +159,7 @@ func run(cmd *cobra.Command, args []string) error { TokenURL: authUrl + "/oauth/token", Scopes: []string{"ledger:read", "ledger:write"}, }). - Client(context.WithValue(cmd.Context(), oauth2.HTTPClient, httpClient)) + Client(context.WithValue(ctx, oauth2.HTTPClient, httpClient)) } client := ledgerclient.New( @@ -146,8 +167,8 @@ func run(cmd *cobra.Command, args []string) error { ledgerclient.WithClient(httpClient), ) - logging.FromContext(cmd.Context()).Infof("Creating ledger '%s' if not exists", targetedLedger) - _, err = client.Ledger.V2.GetLedger(cmd.Context(), operations.V2GetLedgerRequest{ + logging.FromContext(ctx).Infof("Creating ledger '%s' if not exists", targetedLedger) + _, err = client.Ledger.V2.GetLedger(ctx, operations.V2GetLedgerRequest{ Ledger: targetedLedger, }) if err != nil { @@ -155,7 +176,7 @@ func run(cmd *cobra.Command, args []string) error { if !errors.As(err, &sdkError) || sdkError.ErrorCode != components.V2ErrorsEnumNotFound { return fmt.Errorf("failed to get ledger: %w", err) } - _, err = client.Ledger.V2.CreateLedger(cmd.Context(), operations.V2CreateLedgerRequest{ + _, err = client.Ledger.V2.CreateLedger(ctx, operations.V2CreateLedgerRequest{ Ledger: targetedLedger, V2CreateLedgerRequest: &components.V2CreateLedgerRequest{ Bucket: &ledgerBucket, @@ -171,11 +192,11 @@ func run(cmd *cobra.Command, args []string) error { } } - logging.FromContext(cmd.Context()).Infof("Starting to generate data with %d vus", vus) + logger.Infof("Starting to generate data with %d vus", vus) return generate. NewGeneratorSet(vus, string(fileContent), targetedLedger, client, uint64(untilLogID)). - Run(cmd.Context()) + Run(ctx) } func extractSliceSliceFlag(cmd *cobra.Command, flagName string) (map[string]string, error) {