diff --git a/README_ydb.md b/README_ydb.md new file mode 100644 index 00000000..6d301762 --- /dev/null +++ b/README_ydb.md @@ -0,0 +1,20 @@ +# YDB-specific configuration for stroppy tests + +To support the authentication methods specific to YDB Managed Service, stroppy uses the additional environment variables when running in *client* mode only: +* `YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS` - the path to the service account key file. When configured, the key file is used to authenticate the connection. +* `YDB_METADATA_CREDENTIALS` - when set to `1`, the service account key associated with the Cloud compute instance is used to authenticate the connection. +* `YDB_ACCESS_TOKEN_CREDENTIALS` - YDB access token. When configured, the access token is passed as is to authenticate the connection. +* `YDB_TLS_CERTIFICATES_FILE` - PEM-encoded file with custom TLS certificate(s) to be used for GRPCS connections. + +In addition, there are the following YDB-specific environment variables: +* `YDB_STROPPY_PARTITIONS_COUNT` - [`AUTO_PARTITIONING_MIN_PARTITIONS_COUNT`](https://ydb.tech/en/docs/concepts/datamodel/table#auto_partitioning_partition_size_mb) setting value for `account` and `transfer` tables. This setting only affects the `pop` operation mode. +* `YDB_STROPPY_PARTITIONS_SIZE` - [`AUTO_PARTITIONING_PARTITION_SIZE_MB`](https://ydb.tech/en/docs/concepts/datamodel/table#auto_partitioning_min_partitions_count) setting value for `account` and `transfer` tables. This setting only affects the `pop` operation mode. +* `YDB_STROPPY_HASH_TRANSFER_ID` - when set to `1`, the actual value of `transfer_id` field in the `transfer` table is replaced with its SHA-1 hash code (Base-64 encoded). This setting only affects the `pay` operation mode. + +Typical "client" operation modes command examples: + +```bash +export YDB_DB='grpc://stroppy:passw0rd@ycydb-d1:2136?database=/Root/testdb' +./stroppy pop --dbtype ydb --url "$YDB_DB" -n 1000000 -w 8000 --run-type client +./stroppy pay --dbtype ydb --url "$YDB_DB" -n 10000000 -w 8000 --run-type client +``` \ No newline at end of file diff --git a/go.mod b/go.mod index 7469a3e6..484b194f 100644 --- a/go.mod +++ b/go.mod @@ -27,7 +27,7 @@ require ( github.com/tidwall/gjson v1.8.1 github.com/ydb-platform/ydb-go-genproto v0.0.0-20220922065549-66df47a830ba github.com/ydb-platform/ydb-go-sdk-auth-environ v0.1.2 - github.com/ydb-platform/ydb-go-sdk/v3 v3.38.2 + github.com/ydb-platform/ydb-go-sdk/v3 v3.40.1 github.com/ydb-platform/ydb-kubernetes-operator v0.0.0-20220913070254-59c84ccfc365 github.com/zalando/postgres-operator v1.6.3 github.com/zclconf/go-cty v1.9.0 diff --git a/go.sum b/go.sum index bc5f5510..f37db290 100644 --- a/go.sum +++ b/go.sum @@ -1039,8 +1039,8 @@ github.com/ydb-platform/ydb-go-genproto v0.0.0-20220922065549-66df47a830ba/go.mo github.com/ydb-platform/ydb-go-sdk-auth-environ v0.1.2 h1:EYSI1kulnHb0H0zt3yOw4cRj4ABMSMGwNe43D+fX7e4= github.com/ydb-platform/ydb-go-sdk-auth-environ v0.1.2/go.mod h1:Xfjce+VMU9yJVr1lj60yK2fFPWjB4jr/4cp3K7cjzi4= github.com/ydb-platform/ydb-go-sdk/v3 v3.25.3/go.mod h1:PFizF/vJsdAgEwjK3DVSBD52kdmRkWfSIS2q2pA+e88= -github.com/ydb-platform/ydb-go-sdk/v3 v3.38.2 h1:SbrZWu21pteBjZBLDQwJz1LgcJCC6D3kcQurAmpFTF8= -github.com/ydb-platform/ydb-go-sdk/v3 v3.38.2/go.mod h1:SOpOg3nIQ7kT337LvWGNjfdpTQmFEeHXDqY/huSu+Xc= +github.com/ydb-platform/ydb-go-sdk/v3 v3.40.1 h1:6a/yMcXT+ZpybYsGvEpFSsOdQlbqtArycUqY7U1Yd1w= +github.com/ydb-platform/ydb-go-sdk/v3 v3.40.1/go.mod h1:hJqWSE2NZ2o2c9geHtRJee+xwiHgEfQX9koBZPLTfHY= github.com/ydb-platform/ydb-go-yc v0.8.3 h1:92UUUMsfvtMl6mho8eQ9lbkiPrF3a9CT+RrVRAKNRwo= github.com/ydb-platform/ydb-go-yc v0.8.3/go.mod h1:zUolAFGzJ5XG8uwiseTLr9Lapm7L7hdVdZgLSuv9FXE= github.com/ydb-platform/ydb-go-yc-metadata v0.5.2 h1:nMtixUijP0Z7iHJNT9fOL+dbmEzZxqU6Xk87ll7hqXg= diff --git a/pkg/database/cluster/yandex_constans.go b/pkg/database/cluster/yandex_constans.go deleted file mode 100644 index 6eef594b..00000000 --- a/pkg/database/cluster/yandex_constans.go +++ /dev/null @@ -1,82 +0,0 @@ -package cluster - -const ( - yqlInsertAccount = ` -DECLARE $bic AS String; DECLARE $ban AS String; DECLARE $balance AS Int64; -INSERT INTO "&{stroppyDir}/account" (bic, ban, balance) VALUES ($bic, $ban, $balance); -` - - yqlUpsertTransfer = ` -DECLARE $transfer_id AS String; -DECLARE $src_bic AS String; -DECLARE $src_ban AS String; -DECLARE $dst_bic AS String; -DECLARE $dst_ban AS String; -DECLARE $amount AS Int64; -DECLARE $state AS String; -UPSERT INTO "&{stroppyDir}/transfer" ( - transfer_id, - src_bic, - src_ban, - dst_bic, - dst_ban, - amount, - state -) -VALUES ( - $transfer_id, - $src_bic, - $src_ban, - $dst_bic, - $dst_ban, - $amount, - $state -);` - - yqlSelectSrcDstAccount = ` -DECLARE $src_bic AS String; -DECLARE $src_ban AS String; -DECLARE $dst_bic AS String; -DECLARE $dst_ban AS String; -SELECT 1 AS srcdst, balance -FROM "&{stroppyDir}/account" -WHERE bic = $src_bic AND ban = $src_ban -UNION ALL -SELECT 2 AS srcdst, balance -FROM "&{stroppyDir}/account" -WHERE bic = $dst_bic AND ban = $dst_ban; -` - - yqlUpsertSrcDstAccount = ` -DECLARE $src_bic AS String; -DECLARE $src_ban AS String; -DECLARE $dst_bic AS String; -DECLARE $dst_ban AS String; -DECLARE $amount AS Int64; -$shared_select = ( - SELECT - bic, - ban, - balance - $amount AS balance - FROM "&{stroppyDir}/account" - WHERE bic = $src_bic AND ban = $src_ban - UNION ALL - SELECT - bic, - ban, - balance + $amount AS balance - FROM "&{stroppyDir}/account" - WHERE bic = $dst_bic AND ban = $dst_ban -); - -UPDATE "&{stroppyDir}/account" ON -SELECT * FROM $shared_select; -` - - yqlSelectBalanceAccount = ` -DECLARE $bic AS String; DECLARE $ban AS String; -SELECT balance, CAST(0 AS Int64) AS pending -FROM "&{stroppyDir}/account" -WHERE bic = $bic AND ban = $ban -` -) diff --git a/pkg/database/cluster/yandex.go b/pkg/database/cluster/ydb.go similarity index 67% rename from pkg/database/cluster/yandex.go rename to pkg/database/cluster/ydb.go index 9e1e0da7..b6934536 100644 --- a/pkg/database/cluster/yandex.go +++ b/pkg/database/cluster/ydb.go @@ -2,15 +2,18 @@ package cluster import ( "context" + "crypto/sha1" + _ "embed" + "encoding/base64" "fmt" "os" "path" "strconv" "strings" "sync" + "text/template" "time" - "github.com/ansel1/merry/v2" "github.com/google/uuid" "github.com/pkg/errors" llog "github.com/sirupsen/logrus" @@ -22,8 +25,9 @@ import ( "github.com/ydb-platform/ydb-go-sdk/v3/table/result" "github.com/ydb-platform/ydb-go-sdk/v3/table/result/named" "github.com/ydb-platform/ydb-go-sdk/v3/table/types" - "gitlab.com/picodata/stroppy/internal/model" "gopkg.in/inf.v0" + + "gitlab.com/picodata/stroppy/internal/model" ) const ( @@ -32,23 +36,29 @@ const ( stroppyAgent string = "stroppy 1.0" // default operation timeout. defaultTimeout = time.Second * 10 - // partitioning settings for accounts and transfers tables. - partitionsMinCount = 100 - partitionsMaxMbytes = 12 - poolSizeOverhead = 10 + // extra connections in the pool. + poolSizeOverhead = 10 ) -var errIllegalNilOutput = errors.New( - "Illegal nil output value of balance column for srcdst account statement", +var ( + //go:embed ydb_insert_account.yql + yqlInsertAccount string + + //go:embed ydb_transfer.yql + yqlTransfer string + + //go:embed ydb_select_balance_account.yql + yqlSelectBalanceAccount string ) -type YandexDBCluster struct { +type YdbCluster struct { ydbConnection ydb.Connection yqlInsertAccount string - yqlUpsertTransfer string - yqlSelectSrcDstAcc string - yqlUpsertSrcDstAcc string yqlSelectBalanceAcc string + yqlTransferSingleOp string + transferIdHashing bool + partitionsMaxSize int + partitionsMinCount int } func envExists(key string) bool { @@ -65,11 +75,57 @@ func envConfigured() bool { envExists("YDB_ACCESS_TOKEN_CREDENTIALS")) } -func NewYandexDBCluster( +func envTransferIdHashing() bool { + if value, ok := os.LookupEnv("YDB_STROPPY_HASH_TRANSFER_ID"); ok { + if (value == "1") || (value == "Y") { + llog.Infoln("YDB transfer id hashing is ENABLED") + return true + } + } + return false +} + +func envPartitionsMinCount() int { + ret := 300 + if value, ok := os.LookupEnv("YDB_STROPPY_PARTITIONS_COUNT"); ok { + x, err := strconv.Atoi(value) + if err != nil || x <= 0 || x > 10000 { + llog.Warningln("Illegal value [", value, "] passed in YDB_STROPPY_PARTITIONS_COUNT, ignored") + } else { + ret = x + } + } + llog.Infoln("Using YDB minimal partition count ", ret) + return ret +} + +func envPartitionsMaxSize() int { + ret := 512 + if value, ok := os.LookupEnv("YDB_STROPPY_PARTITIONS_SIZE"); ok { + x, err := strconv.Atoi(value) + if err != nil || x <= 0 || x > 10000 { + llog.Warningln("Illegal value [", value, "] passed in YDB_STROPPY_PARTITIONS_SIZE, ignored") + } else { + ret = x + } + } + llog.Infoln("Using YDB maximal partition size ", ret) + return ret +} + +func envTlsCertificateFile() string { + if value, ok := os.LookupEnv("YDB_TLS_CERTIFICATES_FILE"); ok { + return value + } + return "" +} + +func NewYdbCluster( ydbContext context.Context, dbURL string, poolSize uint64, -) (*YandexDBCluster, error) { +) (*YdbCluster, error) { + llog.Infof("YDB Go SDK version %s", ydb.Version) llog.Infof("Establishing connection to YDB on %s with poolSize %d", dbURL, poolSize) var ( @@ -77,54 +133,52 @@ func NewYandexDBCluster( err error ) + options := []ydb.Option{ + ydb.WithUserAgent(stroppyAgent), + ydb.WithSessionPoolSizeLimit(int(poolSize + poolSizeOverhead)), + ydb.WithSessionPoolIdleThreshold(defaultTimeout), + ydb.WithDiscoveryInterval(defaultTimeout), + } if envConfigured() { - llog.Infoln("NOTE: YDB connection credentials are configured through the environment") - - database, err = ydb.Open(ydbContext, dbURL, - ydb.WithUserAgent(stroppyAgent), - ydb.WithSessionPoolSizeLimit(int(poolSize+poolSizeOverhead)), - ydb.WithSessionPoolIdleThreshold(defaultTimeout), - ydb.WithDiscoveryInterval(defaultTimeout), - environ.WithEnvironCredentials(ydbContext), - ) - } else { - database, err = ydb.Open(ydbContext, dbURL, - ydb.WithUserAgent(stroppyAgent), - ydb.WithSessionPoolSizeLimit(int(poolSize+poolSizeOverhead)), - ydb.WithSessionPoolIdleThreshold(defaultTimeout), - ydb.WithDiscoveryInterval(defaultTimeout), - ) + llog.Infoln("YDB connection credentials are configured through the environment") + options = append(options, environ.WithEnvironCredentials(ydbContext)) + } + if tlsCertFile := envTlsCertificateFile(); len(tlsCertFile) > 0 { + llog.Infoln("YDB custom TLS certificate file: ", tlsCertFile) + options = append(options, ydb.WithCertificatesFromFile(tlsCertFile)) } + database, err = ydb.Open(ydbContext, dbURL, options...) if err != nil { - return nil, errors.Wrap(err, "Error creating YDB connection holder") + return nil, errors.Wrap(err, "Failed to create YDB connection") } - return &YandexDBCluster{ + return &YdbCluster{ ydbConnection: database, - yqlUpsertTransfer: expandYql(yqlUpsertTransfer), - yqlSelectSrcDstAcc: expandYql(yqlSelectSrcDstAccount), - yqlUpsertSrcDstAcc: expandYql(yqlUpsertSrcDstAccount), yqlInsertAccount: expandYql(yqlInsertAccount), yqlSelectBalanceAcc: expandYql(yqlSelectBalanceAccount), + yqlTransferSingleOp: expandYql(yqlTransfer), + transferIdHashing: envTransferIdHashing(), + partitionsMaxSize: envPartitionsMaxSize(), + partitionsMinCount: envPartitionsMinCount(), }, nil } -func (*YandexDBCluster) GetClusterType() DBClusterType { +func (*YdbCluster) GetClusterType() DBClusterType { return YandexDBClusterType } var ( - globalYandexDBClusterSettings *Settings - globalYandexDBClusterSettingsMtx sync.Mutex + globalYdbClusterSettings *Settings + globalYdbClusterSettingsMtx sync.Mutex ) -func (ydbCluster *YandexDBCluster) FetchSettings() (Settings, error) { - globalYandexDBClusterSettingsMtx.Lock() - defer globalYandexDBClusterSettingsMtx.Unlock() +func (ydbCluster *YdbCluster) FetchSettings() (Settings, error) { + globalYdbClusterSettingsMtx.Lock() + defer globalYdbClusterSettingsMtx.Unlock() - if globalYandexDBClusterSettings != nil { - return *globalYandexDBClusterSettings, nil + if globalYdbClusterSettings != nil { + return *globalYdbClusterSettings, nil } var ( @@ -134,45 +188,40 @@ func (ydbCluster *YandexDBCluster) FetchSettings() (Settings, error) { defer func() { if err == nil { - globalYandexDBClusterSettings = &clusterSettings + globalYdbClusterSettings = &clusterSettings } }() ydbContext, ctxCloseFn := context.WithCancel(context.Background()) defer ctxCloseFn() - tableFullPath := path.Join(ydbCluster.ydbConnection.Name(), stroppyDir, "settings") + selectStmnt := fmt.Sprintf("SELECT key, value FROM `%s/settings`", stroppyDir) if err = ydbCluster.ydbConnection.Table().Do( ydbContext, func(ydbContext context.Context, ydbSession table.Session) error { - var queryResult result.StreamResult - if queryResult, err = ydbSession.StreamReadTable( - ydbContext, - tableFullPath, - ); err != nil { - return errors.Wrap(err, "failed to reading table in stream") + var rows result.Result + _, rows, err = ydbSession.Execute(ydbContext, table.DefaultTxControl(), selectStmnt, nil) + if err != nil { + return err } - - llog.Traceln("Settings successfully fetched from ydb") - defer func() { - _ = queryResult.Close() + _ = rows.Close() }() var ( key string value string ) - - for queryResult.NextResultSet(ydbContext) { - for queryResult.NextRow() { - if err = queryResult.ScanNamed( + for rows.NextResultSet(ydbContext) { + for rows.NextRow() { + if err = rows.ScanNamed( named.OptionalWithDefault("key", &key), named.OptionalWithDefault("value", &value), ); err != nil { - return errors.Wrap(err, "failed ot scan parameters") + return err } + llog.Tracef("Settings{ key: %s, value: %s }", key, value) switch key { case "count": if clusterSettings.Count, err = strconv.Atoi(value); err != nil { @@ -183,18 +232,9 @@ func (ydbCluster *YandexDBCluster) FetchSettings() (Settings, error) { return errors.Wrap(err, "failed to convert seed into integer") } } - llog.Tracef( - "Settings{ key: %s, value: %s }", - key, - value, - ) } } - if err = queryResult.Err(); err != nil { - return errors.Wrap(err, "failed retrieve query result") - } - return nil }, table.WithIdempotent(), @@ -205,25 +245,35 @@ func (ydbCluster *YandexDBCluster) FetchSettings() (Settings, error) { return clusterSettings, nil } -func (ydbCluster *YandexDBCluster) MakeAtomicTransfer( +func convertTransferId(useHash bool, transferId *model.TransferId) string { + if useHash { + hasher := sha1.New() + hasher.Write(transferId[:]) + return base64.URLEncoding.EncodeToString(hasher.Sum(nil)) + } else { + return transferId.String() + } +} + +func (ydbCluster *YdbCluster) MakeAtomicTransfer( transfer *model.Transfer, //nolint clientID uuid.UUID, -) error { - var err error - - ydbContext, ctxCloseFn := context.WithCancel(context.Background()) - defer ctxCloseFn() +) (err error) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() amount := transfer.Amount.UnscaledBig().Int64() - - if err = ydbCluster.ydbConnection.Table().DoTx( - ydbContext, - func(ctx context.Context, tx table.TransactionActor) error { - // Select from account table - var query result.Result - query, err = tx.Execute( - ctx, ydbCluster.yqlSelectSrcDstAcc, + transferId := convertTransferId(ydbCluster.transferIdHashing, &transfer.Id) + + // Execute the single-statement transfer transaction + return ydbCluster.ydbConnection.Table().Do(ctx, + func(ctx context.Context, s table.Session) error { + _, _, err = s.Execute(ctx, + table.SerializableReadWriteTxControl(table.CommitTx()), + ydbCluster.yqlTransferSingleOp, table.NewQueryParameters( + table.ValueParam("transfer_id", + types.BytesValueFromString(transferId)), table.ValueParam("src_bic", types.BytesValueFromString(transfer.Acs[0].Bic)), table.ValueParam("src_ban", @@ -232,113 +282,42 @@ func (ydbCluster *YandexDBCluster) MakeAtomicTransfer( types.BytesValueFromString(transfer.Acs[1].Bic)), table.ValueParam("dst_ban", types.BytesValueFromString(transfer.Acs[1].Ban)), + table.ValueParam("amount", + types.Int64Value(amount)), + table.ValueParam("state", + types.BytesValueFromString("complete")), ), options.WithKeepInCache(true), ) if err != nil { - return errors.Wrap(err, "failed to execute transaction") - } - defer func() { - _ = query.Close() - }() - - for query.NextResultSet(ctx) { - // Expect to have 2 rows - source and destination accounts. - // In case of 0 or 1 rows something is missing. - if query.CurrentResultSet().RowCount() != 2 { //nolint:gomnd // not magic number + // TODO: find a better way to grab the specific errors + text := err.Error() + if strings.Contains(text, "MISSING_ACCOUNTS") { llog.Tracef( "missing transfer: src_bic: %s, src_ban: %s dst_bic: %s, dst_ban: %s", transfer.Acs[0].Bic, transfer.Acs[0].Ban, transfer.Acs[1].Bic, transfer.Acs[1].Ban, ) - return ErrNoRows } - for query.NextRow() { - var srcdst int32 - var balance *int64 - if err = query.Scan(&srcdst, &balance); err != nil { - return errors.Wrap(err, "failed to scan account balance") - } - if balance == nil { - return errIllegalNilOutput - } - switch srcdst { - case 1: // need to check the source account balance - if *balance < amount { - return ErrInsufficientFunds - } - case 2: //nolint:gomnd // nothing to do on the destination account - default: // something strange to be reported - return merry.Errorf( - "Illegal srcdst value %d for srcdst account statement", - srcdst, - ) - } + if strings.Contains(text, "INSUFFICIENT_FUNDS") { + llog.Tracef( + "insufficient funds: src_bic: %s, src_ban: %s dst_bic: %s, dst_ban: %s", + transfer.Acs[0].Bic, transfer.Acs[0].Ban, + transfer.Acs[1].Bic, transfer.Acs[1].Ban, + ) + return ErrInsufficientFunds } + return errors.Wrap(err, "failed to execute the transfer") } - if err = query.Err(); err != nil { - return errors.Wrap(err, "failed to retrieve query status") - } - - // Upsert the new row to the transfer table - _, err = tx.Execute( - ctx, ydbCluster.yqlUpsertTransfer, - table.NewQueryParameters( - table.ValueParam("transfer_id", - types.BytesValueFromString(transfer.Id.String())), - table.ValueParam("src_bic", - types.BytesValueFromString(transfer.Acs[0].Bic)), - table.ValueParam("src_ban", - types.BytesValueFromString(transfer.Acs[0].Ban)), - table.ValueParam("dst_bic", - types.BytesValueFromString(transfer.Acs[1].Bic)), - table.ValueParam("dst_ban", - types.BytesValueFromString(transfer.Acs[1].Ban)), - table.ValueParam("amount", - types.Int64Value(amount)), - table.ValueParam("state", - types.BytesValueFromString("complete")), - ), - options.WithKeepInCache(true), - ) - if err != nil { - return errors.Wrap(err, "failed to execute transaction") - } - - // Update two balances in the account table. - _, err = tx.Execute( - ctx, ydbCluster.yqlUpsertSrcDstAcc, - table.NewQueryParameters( - table.ValueParam("src_bic", - types.BytesValueFromString(transfer.Acs[0].Bic)), - table.ValueParam("src_ban", - types.BytesValueFromString(transfer.Acs[0].Ban)), - table.ValueParam("dst_bic", - types.BytesValueFromString(transfer.Acs[1].Bic)), - table.ValueParam("dst_ban", - types.BytesValueFromString(transfer.Acs[1].Ban)), - table.ValueParam("amount", - types.Int64Value(transfer.Amount.UnscaledBig().Int64())), - ), - options.WithKeepInCache(true), - ) - if err != nil { - return errors.Wrap(err, "failed to execute transaction") - } - return nil }, - // Mark the transaction idempotent to allow retries. + // Mark the operation idempotent to allow retries. table.WithIdempotent(), - ); err != nil { - return errors.Wrap(err, "failed to execute 'Do' procedure") - } - - return nil + ) } -func (ydbCluster *YandexDBCluster) FetchAccounts() ([]model.Account, error) { +func (ydbCluster *YdbCluster) FetchAccounts() ([]model.Account, error) { var err error ydbContext, ctxCloseFn := context.WithCancel(context.Background()) @@ -357,7 +336,7 @@ func (ydbCluster *YandexDBCluster) FetchAccounts() ([]model.Account, error) { rows, err = sess.StreamExecuteScanQuery(ctx, selectStmnt, nil) if err != nil { - return errors.Wrap(err, "failed to execute scan query") + return errors.Wrap(err, "failed to execute scan query on account table") } defer func() { _ = rows.Close() @@ -385,7 +364,7 @@ func (ydbCluster *YandexDBCluster) FetchAccounts() ([]model.Account, error) { return accs, nil } -func (ydbCluster *YandexDBCluster) FetchBalance( +func (ydbCluster *YdbCluster) FetchBalance( bic string, ban string, ) (*inf.Dec, *inf.Dec, error) { @@ -438,7 +417,7 @@ func (ydbCluster *YandexDBCluster) FetchBalance( return nil, nil, errors.Errorf("No amount for bic %s and ban %s", bic, ban) } -func (ydbCluster *YandexDBCluster) FetchTotal() (*inf.Dec, error) { +func (ydbCluster *YdbCluster) FetchTotal() (*inf.Dec, error) { var ( err error queryResult result.Result @@ -487,7 +466,7 @@ func (ydbCluster *YandexDBCluster) FetchTotal() (*inf.Dec, error) { return inf.NewDec(amount, 0), nil } -func (ydbCluster *YandexDBCluster) CheckBalance() (*inf.Dec, error) { +func (ydbCluster *YdbCluster) CheckBalance() (*inf.Dec, error) { var ( err error queryResult result.Result @@ -534,7 +513,7 @@ func (ydbCluster *YandexDBCluster) CheckBalance() (*inf.Dec, error) { return inf.NewDec(totalBalance, 0), nil } -func (ydbCluster *YandexDBCluster) PersistTotal(total inf.Dec) error { +func (ydbCluster *YdbCluster) PersistTotal(total inf.Dec) error { var err error ydbContext, ctxCloseFn := context.WithCancel(context.Background()) @@ -569,7 +548,7 @@ func (ydbCluster *YandexDBCluster) PersistTotal(total inf.Dec) error { return nil } -func (ydbCluster *YandexDBCluster) BootstrapDB(count uint64, seed int) error { +func (ydbCluster *YdbCluster) BootstrapDB(count uint64, seed int) error { var err error llog.Infof("Creating the folders and tables...") @@ -587,35 +566,16 @@ func (ydbCluster *YandexDBCluster) BootstrapDB(count uint64, seed int) error { return err } - if err = createSettingsTable( - ydbContext, - ydbCluster.ydbConnection.Table(), - prefix, - ); err != nil { + if err = ydbCluster.createSettingsTable(ydbContext, prefix); err != nil { return err } - - if err = createAccountTable( - ydbContext, - ydbCluster.ydbConnection.Table(), - prefix, - ); err != nil { + if err = ydbCluster.createAccountTable(ydbContext, prefix); err != nil { return err } - - if err = createTransferTable( - ydbContext, - ydbCluster.ydbConnection.Table(), - prefix, - ); err != nil { + if err = ydbCluster.createTransferTable(ydbContext, prefix); err != nil { return err } - - if err = createChecksumTable( - ydbContext, - ydbCluster.ydbConnection.Table(), - prefix, - ); err != nil { + if err = ydbCluster.createChecksumTable(ydbContext, prefix); err != nil { return err } @@ -631,15 +591,15 @@ func (ydbCluster *YandexDBCluster) BootstrapDB(count uint64, seed int) error { return nil } -func createSettingsTable( //nolint:dupl // because it golang +func (ydbCluster *YdbCluster) createSettingsTable( //nolint:dupl // because it golang ydbContext context.Context, - ydbClient table.Client, prefix string, + prefix string, ) error { var err error tabname := path.Join(prefix, "settings") if err = recreateTable( - ydbContext, ydbClient, tabname, + ydbContext, ydbCluster.ydbConnection.Table(), tabname, func(ctx context.Context, session table.Session) error { if err = session.CreateTable( ctx, tabname, @@ -647,27 +607,34 @@ func createSettingsTable( //nolint:dupl // because it golang options.WithColumn("value", types.Optional(types.TypeString)), options.WithPrimaryKeyColumn("key"), ); err != nil { - return errors.Wrap(err, "failed to create table") + return err } - return nil }, ); err != nil { - return errors.Wrap(err, "failed to recreate settings table") + return errors.Wrap(err, "failed to (re)create settings table") } return nil } -func createAccountTable( +func (ydbCluster *YdbCluster) createAccountTable( ydbContext context.Context, - ydbClient table.Client, prefix string, + prefix string, ) error { var err error + partitionsMinCount := ydbCluster.partitionsMinCount + if partitionsMinCount < 10 { + partitionsMinCount = 10 + } else if partitionsMinCount > 10000 { + partitionsMinCount = 10000 + } + partitionsMaxCount := partitionsMinCount + 10 + (ydbCluster.partitionsMinCount / 10) + tabname := path.Join(prefix, "account") if err = recreateTable( - ydbContext, ydbClient, tabname, + ydbContext, ydbCluster.ydbConnection.Table(), tabname, func(ctx context.Context, session table.Session) error { if err = session.CreateTable( ctx, tabname, @@ -678,31 +645,39 @@ func createAccountTable( options.WithPartitioningSettings( options.WithPartitioningByLoad(options.FeatureEnabled), options.WithPartitioningBySize(options.FeatureEnabled), - options.WithMinPartitionsCount(partitionsMinCount), - options.WithPartitionSizeMb(partitionsMaxMbytes), + options.WithMinPartitionsCount(uint64(partitionsMinCount)), + options.WithMaxPartitionsCount(uint64(partitionsMaxCount)), + options.WithPartitionSizeMb(uint64(ydbCluster.partitionsMaxSize)), ), ); err != nil { - return errors.Wrap(err, "failed to create table") + return err } - return nil }, ); err != nil { - return errors.Wrap(err, "failed to recreate account table") + return errors.Wrap(err, "failed to (re)create account table") } return nil } -func createTransferTable( +func (ydbCluster *YdbCluster) createTransferTable( ydbContext context.Context, - ydbClient table.Client, prefix string, + prefix string, ) error { var err error + partitionsMinCount := ydbCluster.partitionsMinCount + if partitionsMinCount < 10 { + partitionsMinCount = 10 + } else if partitionsMinCount > 10000 { + partitionsMinCount = 10000 + } + partitionsMaxCount := partitionsMinCount + 10 + (ydbCluster.partitionsMinCount / 10) + tabname := path.Join(prefix, "transfer") if err = recreateTable( - ydbContext, ydbClient, tabname, + ydbContext, ydbCluster.ydbConnection.Table(), tabname, func(ctx context.Context, session table.Session) error { if err = session.CreateTable( ctx, tabname, @@ -719,31 +694,31 @@ func createTransferTable( options.WithPartitioningSettings( options.WithPartitioningByLoad(options.FeatureEnabled), options.WithPartitioningBySize(options.FeatureEnabled), - options.WithMinPartitionsCount(partitionsMinCount), - options.WithPartitionSizeMb(partitionsMaxMbytes), + options.WithMinPartitionsCount(uint64(partitionsMinCount)), + options.WithMaxPartitionsCount(uint64(partitionsMaxCount)), + options.WithPartitionSizeMb(uint64(ydbCluster.partitionsMaxSize)), ), ); err != nil { - return errors.Wrap(err, "failed to create table") + return err } - return nil }, ); err != nil { - return errors.Wrap(err, "failed to recreate account table") + return errors.Wrap(err, "failed to (re)create transfer table") } return nil } -func createChecksumTable( //nolint:dupl // because it golang +func (ydbCluster *YdbCluster) createChecksumTable( //nolint:dupl // because it golang ydbContext context.Context, - ydbClient table.Client, prefix string, + prefix string, ) error { var err error tabname := path.Join(prefix, "checksum") if err = recreateTable( - ydbContext, ydbClient, tabname, + ydbContext, ydbCluster.ydbConnection.Table(), tabname, func(ctx context.Context, session table.Session) error { if err = session.CreateTable( ctx, tabname, @@ -751,13 +726,13 @@ func createChecksumTable( //nolint:dupl // because it golang options.WithColumn("amount", types.Optional(types.TypeInt64)), options.WithPrimaryKeyColumn("name"), ); err != nil { - return errors.Wrap(err, "failed to create table") + return err } return nil }, ); err != nil { - return errors.Wrap(err, "failed to recreate checksum table") + return errors.Wrap(err, "failed to (re)create checksum table") } return nil @@ -869,7 +844,7 @@ func upsertSettings( return nil } -func (ydbCluster *YandexDBCluster) InsertAccount(acc model.Account) error { +func (ydbCluster *YdbCluster) InsertAccount(acc model.Account) error { var err error ydbContext, ctxCloseFn := context.WithCancel(context.Background()) @@ -904,38 +879,38 @@ func (ydbCluster *YandexDBCluster) InsertAccount(acc model.Account) error { return nil } -func (ydbCluster *YandexDBCluster) InsertTransfer(transfer *model.Transfer) error { +func (ydbCluster *YdbCluster) InsertTransfer(transfer *model.Transfer) error { panic("unimplemented!") } -func (ydbCluster *YandexDBCluster) DeleteTransfer( +func (ydbCluster *YdbCluster) DeleteTransfer( transferID model.TransferId, clientID uuid.UUID, ) error { panic("unimplemented!") } -func (ydbCluster *YandexDBCluster) SetTransferClient( +func (ydbCluster *YdbCluster) SetTransferClient( clientID uuid.UUID, transferID model.TransferId, ) error { panic("unimplemented!") } -func (ydbCluster *YandexDBCluster) FetchTransferClient( +func (ydbCluster *YdbCluster) FetchTransferClient( transferID model.TransferId, ) (*uuid.UUID, error) { panic("unimplemented!") } -func (ydbCluster *YandexDBCluster) ClearTransferClient( +func (ydbCluster *YdbCluster) ClearTransferClient( transferID model.TransferId, clientID uuid.UUID, ) error { panic("unimplemented!") } -func (ydbCluster *YandexDBCluster) SetTransferState( +func (ydbCluster *YdbCluster) SetTransferState( state string, transferID model.TransferId, clientID uuid.UUID, @@ -943,17 +918,17 @@ func (ydbCluster *YandexDBCluster) SetTransferState( panic("unimplemented!") } -func (ydbCluster *YandexDBCluster) FetchTransfer( +func (ydbCluster *YdbCluster) FetchTransfer( transferID model.TransferId, ) (*model.Transfer, error) { panic("unimplemented!") } -func (ydbCluster *YandexDBCluster) FetchDeadTransfers() ([]model.TransferId, error) { +func (ydbCluster *YdbCluster) FetchDeadTransfers() ([]model.TransferId, error) { panic("unimplemented!") } -func (ydbCluster *YandexDBCluster) UpdateBalance( +func (ydbCluster *YdbCluster) UpdateBalance( balance *inf.Dec, bic string, ban string, @@ -962,7 +937,7 @@ func (ydbCluster *YandexDBCluster) UpdateBalance( panic("unimplemented!") } -func (ydbCluster *YandexDBCluster) LockAccount( +func (ydbCluster *YdbCluster) LockAccount( transferID model.TransferId, pendingAmount *inf.Dec, bic string, @@ -971,7 +946,7 @@ func (ydbCluster *YandexDBCluster) LockAccount( panic("unimplemented!") } -func (ydbCluster *YandexDBCluster) UnlockAccount( +func (ydbCluster *YdbCluster) UnlockAccount( bic string, ban string, transferID model.TransferId, @@ -980,17 +955,27 @@ func (ydbCluster *YandexDBCluster) UnlockAccount( } // TODO: check possibility of collecting statistics for YDB. -func (ydbCluster *YandexDBCluster) StartStatisticsCollect(_ time.Duration) error { +func (ydbCluster *YdbCluster) StartStatisticsCollect(_ time.Duration) error { llog.Debugln("statistic for YDB not implemeted yet, watch grafana metrics, please") return nil } +var ( + // Template for generating YQL queries + ydbYqlTemplate = template.New("").Funcs(template.FuncMap{ + "stroppyDir": func() string { + return stroppyDir + }, + }) +) + // Substitute directory path into the YQL template, // replacing the double quote characters with backticks. func expandYql(query string) string { - retval := strings.ReplaceAll(query, "&{stroppyDir}", stroppyDir) - retval = strings.ReplaceAll(retval, `"`, "`") - - return retval + var buffer strings.Builder + if err := template.Must(ydbYqlTemplate.Parse(query)).Execute(&buffer, nil); err != nil { + panic(err) + } + return buffer.String() } diff --git a/pkg/database/cluster/ydb_insert_account.yql b/pkg/database/cluster/ydb_insert_account.yql new file mode 100644 index 00000000..be344aa4 --- /dev/null +++ b/pkg/database/cluster/ydb_insert_account.yql @@ -0,0 +1,2 @@ +DECLARE $bic AS String; DECLARE $ban AS String; DECLARE $balance AS Int64; +INSERT INTO `{{ stroppyDir }}/account` (bic, ban, balance) VALUES ($bic, $ban, $balance); diff --git a/pkg/database/cluster/ydb_select_balance_account.yql b/pkg/database/cluster/ydb_select_balance_account.yql new file mode 100644 index 00000000..ece392c8 --- /dev/null +++ b/pkg/database/cluster/ydb_select_balance_account.yql @@ -0,0 +1,5 @@ +DECLARE $bic AS String; DECLARE $ban AS String; + +SELECT balance, CAST(0 AS Int64) AS pending +FROM `{{ stroppyDir }}/account` +WHERE bic = $bic AND ban = $ban diff --git a/pkg/database/cluster/ydb_test.go b/pkg/database/cluster/ydb_test.go new file mode 100644 index 00000000..1ed95233 --- /dev/null +++ b/pkg/database/cluster/ydb_test.go @@ -0,0 +1,34 @@ +package cluster + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func Test_expandYql(t *testing.T) { + for _, tt := range []struct { + in string + out string + }{ + { + in: yqlInsertAccount, + //nolint:lll + out: "DECLARE $bic AS String; DECLARE $ban AS String; DECLARE $balance AS Int64;\nINSERT INTO `stroppy/account` (bic, ban, balance) VALUES ($bic, $ban, $balance);\n", + }, + { + in: yqlTransfer, + //nolint:lll + out: "DECLARE $transfer_id AS String;\nDECLARE $src_bic AS String;\nDECLARE $src_ban AS String;\nDECLARE $dst_bic AS String;\nDECLARE $dst_ban AS String;\nDECLARE $amount AS Int64;\nDECLARE $state AS String;\n\n$shared_select = (\n SELECT\n bic,\n ban,\n Ensure(balance - $amount, balance >= $amount, 'INSUFFICIENT_FUNDS') AS balance\n FROM `stroppy/account`\n WHERE bic = $src_bic AND ban = $src_ban\n UNION ALL\n SELECT\n bic,\n ban,\n balance + $amount AS balance\n FROM `stroppy/account`\n WHERE bic = $dst_bic AND ban = $dst_ban\n);\n\nDISCARD SELECT Ensure(2, cnt=2, 'MISSING_ACCOUNTS')\nFROM (SELECT COUNT(*) AS cnt FROM $shared_select);\n\nUPSERT INTO `stroppy/account`\nSELECT * FROM $shared_select;\n\nUPSERT INTO `stroppy/transfer` (transfer_id, src_bic, src_ban, dst_bic, dst_ban, amount, state)\nVALUES ($transfer_id, $src_bic, $src_ban, $dst_bic, $dst_ban, $amount, $state);\n", + }, + { + in: yqlSelectBalanceAccount, + //nolint:lll + out: "DECLARE $bic AS String; DECLARE $ban AS String;\n\nSELECT balance, CAST(0 AS Int64) AS pending\nFROM `stroppy/account`\nWHERE bic = $bic AND ban = $ban\n", + }, + } { + t.Run("", func(t *testing.T) { + require.Equal(t, tt.out, expandYql(tt.in)) + }) + } +} diff --git a/pkg/database/cluster/ydb_transfer.yql b/pkg/database/cluster/ydb_transfer.yql new file mode 100644 index 00000000..36103e89 --- /dev/null +++ b/pkg/database/cluster/ydb_transfer.yql @@ -0,0 +1,32 @@ +DECLARE $transfer_id AS String; +DECLARE $src_bic AS String; +DECLARE $src_ban AS String; +DECLARE $dst_bic AS String; +DECLARE $dst_ban AS String; +DECLARE $amount AS Int64; +DECLARE $state AS String; + +$shared_select = ( + SELECT + bic, + ban, + Ensure(balance - $amount, balance >= $amount, 'INSUFFICIENT_FUNDS') AS balance + FROM `{{ stroppyDir }}/account` + WHERE bic = $src_bic AND ban = $src_ban + UNION ALL + SELECT + bic, + ban, + balance + $amount AS balance + FROM `{{ stroppyDir }}/account` + WHERE bic = $dst_bic AND ban = $dst_ban +); + +DISCARD SELECT Ensure(2, cnt=2, 'MISSING_ACCOUNTS') +FROM (SELECT COUNT(*) AS cnt FROM $shared_select); + +UPSERT INTO `{{ stroppyDir }}/account` +SELECT * FROM $shared_select; + +UPSERT INTO `{{ stroppyDir }}/transfer` (transfer_id, src_bic, src_ban, dst_bic, dst_ban, amount, state) +VALUES ($transfer_id, $src_bic, $src_ban, $dst_bic, $dst_ban, $amount, $state); diff --git a/pkg/engine/db/yandex.go b/pkg/engine/db/yandex.go index afec015f..15633972 100644 --- a/pkg/engine/db/yandex.go +++ b/pkg/engine/db/yandex.go @@ -173,7 +173,7 @@ func (yc *yandexCluster) deployYandexDBOperator(shellState *state.State) error { // Connect to freshly deployed cluster. func (yc *yandexCluster) Connect() (interface{}, error) { var ( - connection *cluster.YandexDBCluster + connection *cluster.YdbCluster err error ) @@ -186,7 +186,7 @@ func (yc *yandexCluster) Connect() (interface{}, error) { ydbContext, ctxCloseFn := context.WithTimeout(context.Background(), time.Second) defer ctxCloseFn() - if connection, err = cluster.NewYandexDBCluster( + if connection, err = cluster.NewYdbCluster( ydbContext, yc.commonCluster.DBUrl, yc.commonCluster.connectionPoolSize,