Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

YDB Support #3

Merged
merged 25 commits into from
Jan 6, 2025
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
SOURCE ?= file go_bindata github github_ee bitbucket aws_s3 google_cloud_storage godoc_vfs gitlab
DATABASE ?= postgres mysql redshift cassandra spanner cockroachdb yugabytedb clickhouse mongodb sqlserver firebird neo4j pgx pgx5 rqlite
DATABASE ?= postgres mysql redshift cassandra spanner cockroachdb yugabytedb clickhouse mongodb sqlserver firebird neo4j pgx pgx5 rqlite ydb
DATABASE_TEST ?= $(DATABASE) sqlite sqlite3 sqlcipher
VERSION ?= $(shell git describe --tags 2>/dev/null | cut -c 2-)
TEST_FLAGS ?=
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ Database drivers run migrations. [Add a new database?](database/driver.go)
* [Firebird](database/firebird)
* [MS SQL Server](database/sqlserver)
* [rqlite](database/rqlite)
* [YDB](database/ydb)

### Database URLs

Expand Down
30 changes: 30 additions & 0 deletions database/ydb/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# [YDB](https://ydb.tech/docs/)

`ydb://[user:password@]host:port/database?QUERY_PARAMS`
asmyasnikov marked this conversation as resolved.
Show resolved Hide resolved

| URL Query | Description |
|:----------:|:---------------------------------------:|
| `user` | The user to sign in as. |
| `password` | The user's password. |
| `host` | The host to connect to. |
| `port` | The port to bind to. | |
| `database` | The name of the database to connect to. |

| URL Query Params | Description |
|:--------------------:|:-----------------------------------------------------------:|
| `x-auth-token` | Authentication token. |
| `x-migrations-table` | Name of the migrations table (default `schema_migrations`). |
| `x-use-grpcs` | Enables gRPCS protocol for YDB connections (default grpc). |
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not required. Protocol defined in connection string directly

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

golang-migrate uses scheme to determine db driver --
https://github.com/golang-migrate/migrate/blob/master/database/driver.go#L86


### Authentication

By default, golang-migrate connects to YDB
using [anonymous credentials](https://ydb.tech/docs/en/recipes/ydb-sdk/auth-anonymous). \
Through the url query, you can change the default behavior:

- To connect to YDB using [static credentials](https://ydb.tech/docs/en/recipes/ydb-sdk/auth-static) you need to specify
username and password:
`ydb://user:password@host:port/database`
asmyasnikov marked this conversation as resolved.
Show resolved Hide resolved
- To connect to YDB using [token](https://ydb.tech/docs/en/recipes/ydb-sdk/auth-access-token) you need to specify token
as query parameter:
`ydb://host:port/database?x-auth-token=<YDB_TOKEN>`
asmyasnikov marked this conversation as resolved.
Show resolved Hide resolved
1 change: 1 addition & 0 deletions database/ydb/examples/migrations/001_create_users.down.yql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DROP TABLE `test/users`;
6 changes: 6 additions & 0 deletions database/ydb/examples/migrations/001_create_users.up.yql
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
CREATE TABLE `test/users` (
id Uint64,
name String,
email String,
PRIMARY KEY (id)
);
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
DROP TABLE `test/cities`;

ALTER TABLE `test/users`
DROP COLUMN city;
8 changes: 8 additions & 0 deletions database/ydb/examples/migrations/002_add_city_to_users.up.yql
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
CREATE TABLE `test/cities` (
id Uint64,
name String,
PRIMARY KEY (id)
);

ALTER TABLE `test/users`
ADD COLUMN city Uint64;
1 change: 1 addition & 0 deletions database/ydb/examples/migrations/003_create_topic.down.yql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DROP TOPIC `test/topic`;
1 change: 1 addition & 0 deletions database/ydb/examples/migrations/003_create_topic.up.yql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
CREATE TOPIC `test/topic`;
276 changes: 276 additions & 0 deletions database/ydb/ydb.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,276 @@
package ydb

import (
"context"
"errors"
"fmt"
"io"
"net/url"
"sync/atomic"

"github.com/hashicorp/go-multierror"
"github.com/ydb-platform/ydb-go-sdk/v3"
"github.com/ydb-platform/ydb-go-sdk/v3/query"

"github.com/golang-migrate/migrate/v4"
"github.com/golang-migrate/migrate/v4/database"
)

func init() {
db := YDB{}
database.Register("ydb", &db)
}

const (
defaultMigrationsTable = "schema_migrations"

queryParamAuthToken = "x-auth-token"
queryParamMigrationsTable = "x-migrations-table"
queryParamUseGRPCS = "x-use-grpcs"
)

var (
ErrNilConfig = fmt.Errorf("no config")
ErrNoDatabaseName = fmt.Errorf("no database name")
)

type Config struct {
MigrationsTable string
}

type YDB struct {
driver *ydb.Driver
config *Config

isLocked atomic.Bool
}

func WithInstance(driver *ydb.Driver, config *Config) (database.Driver, error) {
if config == nil {
return nil, ErrNilConfig
}

if len(config.MigrationsTable) == 0 {
config.MigrationsTable = defaultMigrationsTable
}

db := &YDB{
driver: driver,
config: config,
}
if err := db.ensureVersionTable(); err != nil {
return nil, err
}
return db, nil
}

func (db *YDB) Open(dsn string) (database.Driver, error) {
purl, err := url.Parse(dsn)
if err != nil {
return nil, err
}

if len(purl.Path) == 0 {
return nil, ErrNoDatabaseName
}

pquery, err := url.ParseQuery(purl.RawQuery)
if err != nil {
return nil, err
}

switch {
case pquery.Has(queryParamUseGRPCS):
purl.Scheme = "grpcs"
default:
purl.Scheme = "grpc"
}

purl = migrate.FilterCustomQuery(purl)
credentials := db.parseCredentials(purl, pquery)
driver, err := ydb.Open(context.TODO(), purl.String(), credentials)
if err != nil {
return nil, err
}

px, err := WithInstance(driver, &Config{
MigrationsTable: pquery.Get(queryParamMigrationsTable),
})
if err != nil {
return nil, err
}
return px, nil
}

func (db *YDB) parseCredentials(url *url.URL, query url.Values) (credentials ydb.Option) {
switch {
case query.Has(queryParamAuthToken):
credentials = ydb.WithAccessTokenCredentials(query.Get(queryParamAuthToken))
case url.User != nil:
user := url.User.Username()
password, _ := url.User.Password()
credentials = ydb.WithStaticCredentials(user, password)
default:
credentials = ydb.WithAnonymousCredentials()
}
url.User = nil
return credentials
}

func (db *YDB) Close() error {
return db.driver.Close(context.TODO())
}

func (db *YDB) Run(migration io.Reader) error {
rawMigrations, err := io.ReadAll(migration)
if err != nil {
return err
}

res, err := db.driver.Scripting().Execute(context.TODO(), string(rawMigrations), nil)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In other places you use db.driver.Query(), here we should also use it

if err != nil {
return err
}
return res.Close()
}

func (db *YDB) SetVersion(version int, dirty bool) error {
deleteVersionQuery := fmt.Sprintf(`
DELETE FROM %s
`, db.config.MigrationsTable)

insertVersionQuery := fmt.Sprintf(`
INSERT INTO %s (version, dirty, created) VALUES (%d, %t, CurrentUtcTimestamp())
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can use a query with parameters here to eliminate compilation time from migration progress. I am not sure if it is meaningful, though, it is okay to do it with a Sprintf.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess the difference would be in µs and this is a script which will be run manually, so I think Sprintf would be ok:)

`, db.config.MigrationsTable, version, dirty)

ctx := context.TODO()
err := db.driver.Query().DoTx(ctx, func(ctx context.Context, tx query.TxActor) error {
if err := tx.Exec(ctx, deleteVersionQuery); err != nil {
return err
}
// Also re-write the schema version for nil dirty versions to prevent
// empty schema version for failed down migration on the first migration
// See: https://github.com/golang-migrate/migrate/issues/330
if version >= 0 || (version == database.NilVersion && dirty) {
if err := tx.Exec(ctx, insertVersionQuery); err != nil {
return err
}
}
return nil
}, query.WithTxSettings(query.TxSettings(query.WithSerializableReadWrite())))
return err
}

func (db *YDB) Version() (version int, dirty bool, err error) {
ctx := context.TODO()

getVersionQuery := fmt.Sprintf(`
SELECT version, dirty FROM %s ORDER BY version DESC LIMIT 1
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You clear the table in SetVersion(), then maybe we do not need ORDER BY DESC LIMIT 1 here?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ORDER BY DESC is not needed, and actually LIMIT 1 also not needed
but all other adapters use LIMIT 1, so I left this for unification

`, db.config.MigrationsTable)

rs, err := db.driver.Query().QueryResultSet(ctx, getVersionQuery)
if err != nil {
return 0, false, &database.Error{OrigErr: err, Query: []byte(getVersionQuery)}
}
defer func() {
if closeErr := rs.Close(ctx); closeErr != nil {
err = multierror.Append(err, closeErr)
}
}()

row, err := rs.NextRow(ctx)
if err != nil {
if errors.Is(err, io.EOF) {
return database.NilVersion, false, nil
}
return 0, false, err
}

var v uint64
if err = row.Scan(&v, &dirty); err != nil {
return 0, false, &database.Error{OrigErr: err, Query: []byte(getVersionQuery)}
}
return int(v), dirty, err
}

func (db *YDB) Drop() (err error) {
ctx := context.TODO()

listQuery := "SELECT DISTINCT Path FROM `.sys/partition_stats` WHERE Path NOT LIKE '%/.sys%'"
rs, err := db.driver.Query().QueryResultSet(context.TODO(), listQuery)
if err != nil {
return &database.Error{OrigErr: err, Query: []byte(listQuery)}
}
defer func() {
if closeErr := rs.Close(ctx); closeErr != nil {
err = multierror.Append(err, closeErr)
}
}()

for {
var row query.Row
if row, err = rs.NextRow(ctx); err != nil {
if errors.Is(err, io.EOF) {
err = nil
break
}
return err
}

var table string
if err = row.Scan(&table); err != nil {
return err
}

dropQuery := fmt.Sprintf("DROP TABLE `%s`", table)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Other db adapters in the repo use DROP TABLE IF EXISTS, I am not really sure why, but maybe we should keep it consistent and add IF EXISTS here

if err = db.driver.Query().Exec(ctx, dropQuery); err != nil {
return &database.Error{OrigErr: err, Query: []byte(dropQuery)}
}
}

return err
}
func (db *YDB) Lock() error {
if !db.isLocked.CompareAndSwap(false, true) {
return database.ErrLocked
}
return nil
}

func (db *YDB) Unlock() error {
if !db.isLocked.CompareAndSwap(true, false) {
return database.ErrNotLocked
}
return nil
}

// ensureVersionTable checks if versions table exists and, if not, creates it.
func (db *YDB) ensureVersionTable() (err error) {
if err = db.Lock(); err != nil {
return err
}

defer func() {
if unlockErr := db.Unlock(); unlockErr != nil {
if err == nil {
err = unlockErr
} else {
err = multierror.Append(err, unlockErr)
}
}
}()

createVersionTableQuery := fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS %s (
version Uint64,
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add NOT NULL to scheme

dirty Bool,
created Timestamp,
PRIMARY KEY(version)
)
`, db.config.MigrationsTable)
err = db.driver.Query().Exec(context.TODO(), createVersionTableQuery)
if err != nil {
return err
}
return err
}
Loading