Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

YDB Reimplement Lock Mechanics #4

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion database/ydb/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
|:----------------------------:|:--------------------------------------------------------------------------------------------:|
| `x-auth-token` | Authentication token. |
| `x-migrations-table` | Name of the migrations table (default `schema_migrations`). |
| `x-lock-table` | Name of the table which maintains the migration lock (default `schema_lock`). |
| `x-use-grpcs` | Enables gRPCS protocol for YDB connections (default grpc). |
| `x-tls-ca` | The location of the CA (certificate authority) file. |
| `x-tls-insecure-skip-verify` | Controls whether a client verifies the server's certificate chain and host name. |
Expand All @@ -37,4 +38,11 @@ Through the url query, you can change the default behavior:
`ydb://user:password@host:port/database`
- To connect to YDB using [token](https://ydb.tech/docs/en/recipes/ydb-sdk/auth-access-token) you need to specify token
as query parameter:
`ydb://host:port/database?x-auth-token=<YDB_TOKEN>`
`ydb://host:port/database?x-auth-token=<YDB_TOKEN>`

### Locks

If golang-migrate fails to acquire the lock and no migrations are currently running.
This may indicate that one of the migrations did not complete successfully.
In this case, you need to analyze the previous migrations, rollback if necessary, and manually remove the lock from the
`x-lock-table`.
95 changes: 84 additions & 11 deletions database/ydb/ydb.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ import (
"fmt"
"io"
"net/url"
"sync/atomic"

"github.com/hashicorp/go-multierror"
"github.com/ydb-platform/ydb-go-sdk/v3"
"github.com/ydb-platform/ydb-go-sdk/v3/balancers"
"github.com/ydb-platform/ydb-go-sdk/v3/retry"
"go.uber.org/atomic"

"github.com/golang-migrate/migrate/v4"
"github.com/golang-migrate/migrate/v4/database"
Expand All @@ -22,9 +24,11 @@ func init() {

const (
defaultMigrationsTable = "schema_migrations"
defaultLockTable = "schema_lock"

queryParamAuthToken = "x-auth-token"
queryParamMigrationsTable = "x-migrations-table"
queryParamLockTable = "x-lock-table"
queryParamUseGRPCS = "x-use-grpcs"
queryParamTLSCertificateAuthorities = "x-tls-ca"
queryParamTLSInsecureSkipVerify = "x-tls-insecure-skip-verify"
Expand All @@ -39,6 +43,8 @@ var (

type Config struct {
MigrationsTable string
LockTable string
DatabaseName string
}

type YDB struct {
Expand All @@ -63,6 +69,10 @@ func WithInstance(instance *sql.DB, config *Config) (database.Driver, error) {
config.MigrationsTable = defaultMigrationsTable
}

if len(config.LockTable) == 0 {
config.LockTable = defaultLockTable
}

conn, err := instance.Conn(context.TODO())
if err != nil {
return nil, err
Expand All @@ -73,6 +83,9 @@ func WithInstance(instance *sql.DB, config *Config) (database.Driver, error) {
db: instance,
config: config,
}
if err = db.ensureLockTable(); err != nil {
return nil, err
}
if err = db.ensureVersionTable(); err != nil {
return nil, err
}
Expand Down Expand Up @@ -109,7 +122,11 @@ func (y *YDB) Open(dsn string) (database.Driver, error) {
return nil, err
}

nativeDriver, err := ydb.Open(context.TODO(), purl.String(), append(tlsOptions, credentials)...)
nativeDriver, err := ydb.Open(
context.TODO(),
purl.String(),
append(tlsOptions, credentials, ydb.WithBalancer(balancers.SingleConn()))...,
)
if err != nil {
return nil, err
}
Expand All @@ -123,6 +140,8 @@ func (y *YDB) Open(dsn string) (database.Driver, error) {

db, err := WithInstance(sql.OpenDB(connector), &Config{
MigrationsTable: pquery.Get(queryParamMigrationsTable),
LockTable: pquery.Get(queryParamLockTable),
DatabaseName: purl.Path,
})
if err != nil {
return nil, err
Expand Down Expand Up @@ -188,7 +207,7 @@ func (y *YDB) Run(migration io.Reader) error {
return err
}

if _, err = y.conn.ExecContext(ydb.WithQueryMode(context.TODO(), ydb.SchemeQueryMode), string(rawMigrations)); err != nil {
if _, err = y.conn.ExecContext(context.Background(), string(rawMigrations)); err != nil {
return database.Error{OrigErr: err, Err: "migration failed", Query: rawMigrations}
}
return nil
Expand Down Expand Up @@ -278,23 +297,77 @@ func (y *YDB) Drop() (err error) {

for _, path := range paths {
dropQuery := fmt.Sprintf("DROP TABLE IF EXISTS `%s`", path)
if _, err = y.conn.ExecContext(ydb.WithQueryMode(context.TODO(), ydb.SchemeQueryMode), dropQuery); err != nil {
if _, err = y.conn.ExecContext(context.Background(), dropQuery); err != nil {
return &database.Error{OrigErr: err, Query: []byte(dropQuery)}
}
}
return nil
}

func (y *YDB) Lock() error {
if !y.isLocked.CompareAndSwap(false, true) {
return database.ErrLocked
}
return nil
return database.CasRestoreOnErr(&y.isLocked, false, true, database.ErrLocked, func() (err error) {
return retry.DoTx(context.TODO(), y.db, func(ctx context.Context, tx *sql.Tx) (err error) {
aid, err := database.GenerateAdvisoryLockId(y.config.DatabaseName)
if err != nil {
return err
}

getLockQuery := fmt.Sprintf("SELECT * FROM %s WHERE lock_id = '%s'", y.config.LockTable, aid)
rows, err := tx.Query(getLockQuery, aid)
if err != nil {
return database.Error{OrigErr: err, Err: "failed to fetch migration lock", Query: []byte(getLockQuery)}
}
defer func() {
if errClose := rows.Close(); errClose != nil {
err = multierror.Append(err, errClose)
}
}()

// If row exists at all, lock is present
locked := rows.Next()
if locked {
return database.ErrLocked
}

setLockQuery := fmt.Sprintf("INSERT INTO %s (lock_id) VALUES ('%s')", y.config.LockTable, aid)
if _, err = tx.Exec(setLockQuery); err != nil {
return database.Error{OrigErr: err, Err: "failed to set migration lock", Query: []byte(setLockQuery)}
}
return nil
}, retry.WithTxOptions(&sql.TxOptions{Isolation: sql.LevelSerializable}))
})
}

func (y *YDB) Unlock() error {
if !y.isLocked.CompareAndSwap(true, false) {
return database.ErrNotLocked
return database.CasRestoreOnErr(&y.isLocked, true, false, database.ErrNotLocked, func() (err error) {
aid, err := database.GenerateAdvisoryLockId(y.config.DatabaseName)
if err != nil {
return err
}

releaseLockQuery := fmt.Sprintf("DELETE FROM %s WHERE lock_id = '%s'", y.config.LockTable, aid)
if _, err = y.conn.ExecContext(context.TODO(), releaseLockQuery); err != nil {
// On drops, the lock table is fully removed; This is fine, and is a valid "unlocked" state for the schema.
if ydb.IsOperationErrorSchemeError(err) {
return nil
}
return database.Error{OrigErr: err, Err: "failed to release migration lock", Query: []byte(releaseLockQuery)}
}

return nil
})
}

// ensureLockTable checks if lock table exists and, if not, creates it.
func (y *YDB) ensureLockTable() (err error) {
createLockTableQuery := fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS %s (
lock_id String NOT NULL,
PRIMARY KEY(lock_id)
)
`, y.config.LockTable)
if _, err = y.conn.ExecContext(context.Background(), createLockTableQuery); err != nil {
return &database.Error{OrigErr: err, Query: []byte(createLockTableQuery)}
}
return nil
}
Expand Down Expand Up @@ -323,7 +396,7 @@ func (y *YDB) ensureVersionTable() (err error) {
PRIMARY KEY(version)
)
`, y.config.MigrationsTable)
if _, err = y.conn.ExecContext(ydb.WithQueryMode(context.TODO(), ydb.SchemeQueryMode), createVersionTableQuery); err != nil {
if _, err = y.conn.ExecContext(context.Background(), createVersionTableQuery); err != nil {
return &database.Error{OrigErr: err, Query: []byte(createVersionTableQuery)}
}
return nil
Expand Down
75 changes: 50 additions & 25 deletions database/ydb/ydb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,55 +3,70 @@ package ydb
import (
"context"
"fmt"
"log"
"strings"
"testing"
"time"

"github.com/dhui/dktest"
"github.com/docker/go-connections/nat"
"github.com/ydb-platform/ydb-go-sdk/v3"
"github.com/ydb-platform/ydb-go-sdk/v3/balancers"

"github.com/golang-migrate/migrate/v4"
dt "github.com/golang-migrate/migrate/v4/database/testing"
"github.com/golang-migrate/migrate/v4/dktesting"
_ "github.com/golang-migrate/migrate/v4/source/file"
)

const (
image = "ydbplatform/local-ydb:latest"
host = "localhost"
port = "2136"
defaultPort = 2136
databaseName = "local"
)

var (
opts = dktest.Options{
Env: map[string]string{
"GRPC_TLS_PORT": "2135",
"GRPC_PORT": "2136",
"MON_PORT": "8765",
},

PortBindings: nat.PortMap{
nat.Port("2136/tcp"): []nat.PortBinding{
{
HostIP: "0.0.0.0",
HostPort: port,
getOptions = func(port string) dktest.Options {
return dktest.Options{
Env: map[string]string{
"GRPC_TLS_PORT": "2135",
"GRPC_PORT": "2136",
"MON_PORT": "8765",
},
PortBindings: nat.PortMap{
nat.Port(fmt.Sprintf("%d/tcp", defaultPort)): []nat.PortBinding{
{
HostIP: "0.0.0.0",
HostPort: port,
},
},
},
},
PortRequired: true,
Hostname: "127.0.0.1",
ReadyTimeout: 15 * time.Second,
ReadyFunc: isReady,
}
}

Hostname: host,
ReadyTimeout: 15 * time.Second,
ReadyFunc: isReady,
// Released version: https://ydb.tech/docs/downloads/#ydb-server
specs = []dktesting.ContainerSpec{
{ImageName: "ydbplatform/local-ydb:latest", Options: getOptions("22000")},
{ImageName: "ydbplatform/local-ydb:24.3", Options: getOptions("22001")},
{ImageName: "ydbplatform/local-ydb:24.2", Options: getOptions("22002")},
}
)

func connectionString(options ...string) string {
func connectionString(host, port string, options ...string) string {
return fmt.Sprintf("ydb://%s:%s/%s?%s", host, port, databaseName, strings.Join(options, "&"))
}

func isReady(ctx context.Context, c dktest.ContainerInfo) bool {
d, err := ydb.Open(ctx, fmt.Sprintf("grpc://%s:%s/%s", host, port, databaseName))
ip, port, err := c.Port(defaultPort)
if err != nil {
log.Println("port error:", err)
return false
}

d, err := ydb.Open(ctx, fmt.Sprintf("grpc://%s:%s/%s", ip, port, databaseName), ydb.WithBalancer(balancers.SingleConn()))
if err != nil {
return false
}
Expand All @@ -67,9 +82,14 @@ func isReady(ctx context.Context, c dktest.ContainerInfo) bool {
}

func Test(t *testing.T) {
dktest.Run(t, image, opts, func(t *testing.T, c dktest.ContainerInfo) {
dktesting.ParallelTest(t, specs, func(t *testing.T, c dktest.ContainerInfo) {
ip, port, err := c.Port(defaultPort)
if err != nil {
t.Fatal(err)
}

db := &YDB{}
d, err := db.Open(connectionString())
d, err := db.Open(connectionString(ip, port))
if err != nil {
t.Fatal(err)
}
Expand All @@ -86,9 +106,14 @@ func Test(t *testing.T) {
}

func TestMigrate(t *testing.T) {
dktest.Run(t, image, opts, func(t *testing.T, c dktest.ContainerInfo) {
dktesting.ParallelTest(t, specs, func(t *testing.T, c dktest.ContainerInfo) {
ip, port, err := c.Port(defaultPort)
if err != nil {
t.Fatal(err)
}

db := &YDB{}
d, err := db.Open(connectionString())
d, err := db.Open(connectionString(ip, port))
if err != nil {
t.Fatal(err)
}
Expand Down
Loading