From 0cbff2ca10475c7c78d01a92c37c4f68b8555e24 Mon Sep 17 00:00:00 2001 From: Aaron Son Date: Wed, 5 Feb 2025 11:58:24 -0800 Subject: [PATCH] go: sqle,doltdb: Move commit hooks from doltdb to sqle. --- go/cmd/dolt/commands/sql_test.go | 2 +- go/cmd/dolt/commands/sqlserver/server_test.go | 5 +- .../doltcore/{doltdb => sqle}/commit_hooks.go | 37 +++-- .../{doltdb => sqle}/commit_hooks_test.go | 130 ++++++++++++++---- go/libraries/doltcore/sqle/replication.go | 6 +- .../doltcore/sqle/replication_test.go | 2 +- 6 files changed, 130 insertions(+), 52 deletions(-) rename go/libraries/doltcore/{doltdb => sqle}/commit_hooks.go (87%) rename go/libraries/doltcore/{doltdb => sqle}/commit_hooks_test.go (64%) diff --git a/go/cmd/dolt/commands/sql_test.go b/go/cmd/dolt/commands/sql_test.go index 925fcc289a..310264346a 100644 --- a/go/cmd/dolt/commands/sql_test.go +++ b/go/cmd/dolt/commands/sql_test.go @@ -620,7 +620,7 @@ func TestCommitHooksNoErrors(t *testing.T) { t.Error("failed to produce noop hook") } else { switch h := hooks[0].(type) { - case *doltdb.LogHook: + case *sqle.LogHook: default: t.Errorf("expected LogHook, found: %s", h) } diff --git a/go/cmd/dolt/commands/sqlserver/server_test.go b/go/cmd/dolt/commands/sqlserver/server_test.go index 301f0bdf04..3c9bdf83cd 100644 --- a/go/cmd/dolt/commands/sqlserver/server_test.go +++ b/go/cmd/dolt/commands/sqlserver/server_test.go @@ -93,13 +93,14 @@ func TestServerArgs(t *testing.T) { } func TestDeprecatedUserPasswordServerArgs(t *testing.T) { + ctx := context.Background() controller := svcs.NewController() dEnv, err := sqle.CreateEnvWithSeedData() require.NoError(t, err) defer func() { - assert.NoError(t, dEnv.DoltDB.Close()) + assert.NoError(t, dEnv.DoltDB(ctx).Close()) }() - err = StartServer(context.Background(), "0.0.0", "dolt sql-server", []string{ + err = StartServer(ctx, "0.0.0", "dolt sql-server", []string{ "-H", "localhost", "-P", "15200", "-u", "username", diff --git a/go/libraries/doltcore/doltdb/commit_hooks.go b/go/libraries/doltcore/sqle/commit_hooks.go similarity index 87% rename from go/libraries/doltcore/doltdb/commit_hooks.go rename to go/libraries/doltcore/sqle/commit_hooks.go index 2854eff677..60de8b0382 100644 --- a/go/libraries/doltcore/doltdb/commit_hooks.go +++ b/go/libraries/doltcore/sqle/commit_hooks.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package doltdb +package sqle import ( "context" @@ -23,44 +23,39 @@ import ( "github.com/dolthub/go-mysql-server/sql" + "github.com/dolthub/dolt/go/libraries/doltcore/doltdb" "github.com/dolthub/dolt/go/libraries/doltcore/ref" "github.com/dolthub/dolt/go/store/datas" "github.com/dolthub/dolt/go/store/hash" - "github.com/dolthub/dolt/go/store/types" ) type PushOnWriteHook struct { - destDB *DoltDB + destDB *doltdb.DoltDB tmpDir string out io.Writer - fmt *types.NomsBinFormat } -var _ CommitHook = (*PushOnWriteHook)(nil) +var _ doltdb.CommitHook = (*PushOnWriteHook)(nil) // NewPushOnWriteHook creates a ReplicateHook, parameterizaed by the backup database // and a local tempfile for pushing -func NewPushOnWriteHook(destDB *DoltDB, tmpDir string) *PushOnWriteHook { +func NewPushOnWriteHook(destDB *doltdb.DoltDB, tmpDir string) *PushOnWriteHook { return &PushOnWriteHook{ destDB: destDB, tmpDir: tmpDir, - fmt: destDB.Format(), } } // Execute implements CommitHook, replicates head updates to the destDb field -func (ph *PushOnWriteHook) Execute(ctx context.Context, ds datas.Dataset, db *DoltDB) (func(context.Context) error, error) { +func (ph *PushOnWriteHook) Execute(ctx context.Context, ds datas.Dataset, db *doltdb.DoltDB) (func(context.Context) error, error) { return nil, pushDataset(ctx, ph.destDB, db, ds, ph.tmpDir) } -func pushDataset(ctx context.Context, destDB, srcDB *DoltDB, ds datas.Dataset, tmpDir string) error { +func pushDataset(ctx context.Context, destDB, srcDB *doltdb.DoltDB, ds datas.Dataset, tmpDir string) error { addr, ok := ds.MaybeHeadAddr() if !ok { - rf, err := ref.Parse(ds.ID()) - if err != nil { - return err - } - err = destDB.DeleteBranch(ctx, rf, nil) + // TODO: fix up hack usage. + _, err := doltdb.HackDatasDatabaseFromDoltDB(destDB).Delete(ctx, ds, "") return err } @@ -100,7 +95,7 @@ func (ph *PushOnWriteHook) SetLogger(ctx context.Context, wr io.Writer) error { type PushArg struct { ds datas.Dataset - db *DoltDB + db *doltdb.DoltDB hash hash.Hash } @@ -116,10 +111,10 @@ const ( asyncPushSyncReplica = "async_push_sync_replica" ) -var _ CommitHook = (*AsyncPushOnWriteHook)(nil) +var _ doltdb.CommitHook = (*AsyncPushOnWriteHook)(nil) // NewAsyncPushOnWriteHook creates a AsyncReplicateHook -func NewAsyncPushOnWriteHook(bThreads *sql.BackgroundThreads, destDB *DoltDB, tmpDir string, logger io.Writer) (*AsyncPushOnWriteHook, error) { +func NewAsyncPushOnWriteHook(bThreads *sql.BackgroundThreads, destDB *doltdb.DoltDB, tmpDir string, logger io.Writer) (*AsyncPushOnWriteHook, error) { ch := make(chan PushArg, asyncPushBufferSize) err := RunAsyncReplicationThreads(bThreads, ch, destDB, tmpDir, logger) if err != nil { @@ -133,7 +128,7 @@ func (*AsyncPushOnWriteHook) ExecuteForWorkingSets() bool { } // Execute implements CommitHook, replicates head updates to the destDb field -func (ah *AsyncPushOnWriteHook) Execute(ctx context.Context, ds datas.Dataset, db *DoltDB) (func(context.Context) error, error) { +func (ah *AsyncPushOnWriteHook) Execute(ctx context.Context, ds datas.Dataset, db *doltdb.DoltDB) (func(context.Context) error, error) { addr, _ := ds.MaybeHeadAddr() // TODO: Unconditional push here seems dangerous. ah.ch <- PushArg{ds: ds, db: db, hash: addr} @@ -159,7 +154,7 @@ type LogHook struct { out io.Writer } -var _ CommitHook = (*LogHook)(nil) +var _ doltdb.CommitHook = (*LogHook)(nil) // NewLogHook is a noop that logs to a writer when invoked func NewLogHook(msg []byte) *LogHook { @@ -167,7 +162,7 @@ func NewLogHook(msg []byte) *LogHook { } // Execute implements CommitHook, writes message to log channel -func (lh *LogHook) Execute(ctx context.Context, ds datas.Dataset, db *DoltDB) (func(context.Context) error, error) { +func (lh *LogHook) Execute(ctx context.Context, ds datas.Dataset, db *doltdb.DoltDB) (func(context.Context) error, error) { if lh.out != nil { _, err := lh.out.Write(lh.msg) return nil, err @@ -193,7 +188,7 @@ func (*LogHook) ExecuteForWorkingSets() bool { return false } -func RunAsyncReplicationThreads(bThreads *sql.BackgroundThreads, ch chan PushArg, destDB *DoltDB, tmpDir string, logger io.Writer) error { +func RunAsyncReplicationThreads(bThreads *sql.BackgroundThreads, ch chan PushArg, destDB *doltdb.DoltDB, tmpDir string, logger io.Writer) error { mu := &sync.Mutex{} var newHeads = make(map[string]PushArg, asyncPushBufferSize) diff --git a/go/libraries/doltcore/doltdb/commit_hooks_test.go b/go/libraries/doltcore/sqle/commit_hooks_test.go similarity index 64% rename from go/libraries/doltcore/doltdb/commit_hooks_test.go rename to go/libraries/doltcore/sqle/commit_hooks_test.go index 1eafe367c9..904cc75006 100644 --- a/go/libraries/doltcore/doltdb/commit_hooks_test.go +++ b/go/libraries/doltcore/sqle/commit_hooks_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package doltdb +package sqle import ( "bytes" @@ -24,16 +24,21 @@ import ( "time" "github.com/dolthub/go-mysql-server/sql" + "github.com/google/uuid" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/zap/buffer" "github.com/dolthub/dolt/go/libraries/doltcore/dbfactory" + "github.com/dolthub/dolt/go/libraries/doltcore/doltdb" "github.com/dolthub/dolt/go/libraries/doltcore/doltdb/durable" "github.com/dolthub/dolt/go/libraries/doltcore/ref" + "github.com/dolthub/dolt/go/libraries/doltcore/row" + "github.com/dolthub/dolt/go/libraries/doltcore/schema" "github.com/dolthub/dolt/go/libraries/utils/filesys" "github.com/dolthub/dolt/go/libraries/utils/test" "github.com/dolthub/dolt/go/store/datas" + "github.com/dolthub/dolt/go/store/prolly/tree" "github.com/dolthub/dolt/go/store/types" ) @@ -59,7 +64,7 @@ func TestPushOnWriteHook(t *testing.T) { t.Fatal("Failed to create noms directory") } - destDB, _ := LoadDoltDB(context.Background(), types.Format_Default, LocalDirDoltDB, filesys.LocalFS) + destDB, _ := doltdb.LoadDoltDB(context.Background(), types.Format_Default, doltdb.LocalDirDoltDB, filesys.LocalFS) // source repo testDir, err = test.ChangeToTestDir("TestReplicationSource") @@ -75,7 +80,7 @@ func TestPushOnWriteHook(t *testing.T) { t.Fatal("Failed to create noms directory") } - ddb, _ := LoadDoltDB(context.Background(), types.Format_Default, LocalDirDoltDB, filesys.LocalFS) + ddb, _ := doltdb.LoadDoltDB(context.Background(), types.Format_Default, doltdb.LocalDirDoltDB, filesys.LocalFS) err = ddb.WriteEmptyRepo(context.Background(), "main", committerName, committerEmail) if err != nil { @@ -83,7 +88,7 @@ func TestPushOnWriteHook(t *testing.T) { } // prepare a commit in the source repo - cs, _ := NewCommitSpec("main") + cs, _ := doltdb.NewCommitSpec("main") optCmt, err := ddb.Resolve(context.Background(), cs, nil) if err != nil { t.Fatal("Couldn't find commit") @@ -95,28 +100,28 @@ func TestPushOnWriteHook(t *testing.T) { assert.NoError(t, err) if meta.Name != committerName || meta.Email != committerEmail { - t.Error("Unexpected metadata") + t.Fatal("Unexpected metadata") } root, err := commit.GetRootValue(context.Background()) assert.NoError(t, err) - names, err := root.GetTableNames(context.Background(), DefaultSchemaName) + names, err := root.GetTableNames(context.Background(), doltdb.DefaultSchemaName) assert.NoError(t, err) if len(names) != 0 { t.Fatal("There should be no tables in empty db") } tSchema := createTestSchema(t) - rowData := createTestRowData(t, ddb.vrw, ddb.ns, tSchema) - tbl, err := CreateTestTable(ddb.vrw, ddb.ns, tSchema, rowData) + rowData := createTestRowData(t, ddb.ValueReadWriter(), ddb.NodeStore(), tSchema) + tbl, err := createHooksTestTable(ddb.ValueReadWriter(), ddb.NodeStore(), tSchema, rowData) if err != nil { t.Fatal("Failed to create test table with data") } - root, err = root.PutTable(context.Background(), TableName{Name: "test"}, tbl) + root, err = root.PutTable(context.Background(), doltdb.TableName{Name: "test"}, tbl) assert.NoError(t, err) r, valHash, err := ddb.WriteRootValue(context.Background(), root) @@ -130,19 +135,19 @@ func TestPushOnWriteHook(t *testing.T) { // setup hook hook := NewPushOnWriteHook(destDB, tmpDir) - ddb.SetCommitHooks(ctx, []CommitHook{hook}) + ddb.SetCommitHooks(ctx, []doltdb.CommitHook{hook}) t.Run("replicate to remote", func(t *testing.T) { srcCommit, err := ddb.Commit(context.Background(), valHash, ref.NewBranchRef(defaultBranch), meta) require.NoError(t, err) - ds, err := ddb.db.GetDataset(ctx, "refs/heads/main") + ds, err := doltdb.HackDatasDatabaseFromDoltDB(ddb).GetDataset(ctx, "refs/heads/main") require.NoError(t, err) _, err = hook.Execute(ctx, ds, ddb) require.NoError(t, err) - cs, _ = NewCommitSpec(defaultBranch) + cs, _ = doltdb.NewCommitSpec(defaultBranch) optCmt, err := destDB.Resolve(context.Background(), cs, nil) require.NoError(t, err) destCommit, ok := optCmt.ToCommit() @@ -199,7 +204,7 @@ func TestAsyncPushOnWrite(t *testing.T) { t.Fatal("Failed to create noms directory") } - destDB, _ := LoadDoltDB(context.Background(), types.Format_Default, LocalDirDoltDB, filesys.LocalFS) + destDB, _ := doltdb.LoadDoltDB(context.Background(), types.Format_Default, doltdb.LocalDirDoltDB, filesys.LocalFS) // source repo testDir, err = test.ChangeToTestDir("TestReplicationSource") @@ -215,7 +220,7 @@ func TestAsyncPushOnWrite(t *testing.T) { t.Fatal("Failed to create noms directory") } - ddb, _ := LoadDoltDB(context.Background(), types.Format_Default, LocalDirDoltDB, filesys.LocalFS) + ddb, _ := doltdb.LoadDoltDB(context.Background(), types.Format_Default, doltdb.LocalDirDoltDB, filesys.LocalFS) err = ddb.WriteEmptyRepo(context.Background(), "main", committerName, committerEmail) if err != nil { @@ -231,7 +236,7 @@ func TestAsyncPushOnWrite(t *testing.T) { } for i := 0; i < 200; i++ { - cs, _ := NewCommitSpec("main") + cs, _ := doltdb.NewCommitSpec("main") optCmt, err := ddb.Resolve(context.Background(), cs, nil) if err != nil { t.Fatal("Couldn't find commit") @@ -251,16 +256,16 @@ func TestAsyncPushOnWrite(t *testing.T) { assert.NoError(t, err) tSchema := createTestSchema(t) - rowData, err := durable.NewEmptyPrimaryIndex(ctx, ddb.vrw, ddb.ns, tSchema) + rowData, err := durable.NewEmptyPrimaryIndex(ctx, ddb.ValueReadWriter(), ddb.NodeStore(), tSchema) require.NoError(t, err) - tbl, err := CreateTestTable(ddb.vrw, ddb.ns, tSchema, rowData) + tbl, err := createHooksTestTable(ddb.ValueReadWriter(), ddb.NodeStore(), tSchema, rowData) require.NoError(t, err) if err != nil { t.Fatal("Failed to create test table with data") } - root, err = root.PutTable(context.Background(), TableName{Name: "test"}, tbl) + root, err = root.PutTable(context.Background(), doltdb.TableName{Name: "test"}, tbl) assert.NoError(t, err) r, valHash, err := ddb.WriteRootValue(context.Background(), root) @@ -274,7 +279,7 @@ func TestAsyncPushOnWrite(t *testing.T) { _, err = ddb.Commit(context.Background(), valHash, ref.NewBranchRef(defaultBranch), meta) require.NoError(t, err) - ds, err := ddb.db.GetDataset(ctx, "refs/heads/main") + ds, err := doltdb.HackDatasDatabaseFromDoltDB(ddb).GetDataset(ctx, "refs/heads/main") require.NoError(t, err) _, err = hook.Execute(ctx, ds, ddb) require.NoError(t, err) @@ -293,20 +298,20 @@ func TestAsyncPushOnWrite(t *testing.T) { // same as the call which is made after a branch delete. counts := &countingCommitHook{make(map[string]int)} - destDB.SetCommitHooks(context.Background(), []CommitHook{counts}) + destDB.SetCommitHooks(context.Background(), []doltdb.CommitHook{counts}) bThreads := sql.NewBackgroundThreads() hook, err := NewAsyncPushOnWriteHook(bThreads, destDB, tmpDir, &buffer.Buffer{}) require.NoError(t, err, "create push on write hook without an error") // Pretend we replicate a HEAD which does exist. - ds, err := ddb.db.GetDataset(ctx, "refs/heads/main") + ds, err := doltdb.HackDatasDatabaseFromDoltDB(ddb).GetDataset(ctx, "refs/heads/main") require.NoError(t, err) _, err = hook.Execute(ctx, ds, ddb) require.NoError(t, err) // Pretend we replicate a HEAD which does not exist, i.e., a branch delete. - ds, err = ddb.db.GetDataset(ctx, "refs/heads/does_not_exist") + ds, err = doltdb.HackDatasDatabaseFromDoltDB(ddb).GetDataset(ctx, "refs/heads/does_not_exist") require.NoError(t, err) _, err = hook.Execute(ctx, ds, ddb) require.NoError(t, err) @@ -326,14 +331,14 @@ func TestAsyncPushOnWrite(t *testing.T) { }) } -var _ CommitHook = (*countingCommitHook)(nil) +var _ doltdb.CommitHook = (*countingCommitHook)(nil) type countingCommitHook struct { // The number of times Execute() got called for given dataset. counts map[string]int } -func (c *countingCommitHook) Execute(ctx context.Context, ds datas.Dataset, db *DoltDB) (func(context.Context) error, error) { +func (c *countingCommitHook) Execute(ctx context.Context, ds datas.Dataset, db *doltdb.DoltDB) (func(context.Context) error, error) { c.counts[ds.ID()] += 1 return nil, nil } @@ -349,3 +354,80 @@ func (c *countingCommitHook) SetLogger(ctx context.Context, wr io.Writer) error func (c *countingCommitHook) ExecuteForWorkingSets() bool { return false } + +const ( + idTag = 0 + firstTag = 1 + lastTag = 2 + isMarriedTag = 3 + ageTag = 4 + emptySchTag = 5 +) +const testSchemaIndexName = "idx_name" +const testSchemaIndexAge = "idx_age" + +var id0, _ = uuid.NewRandom() +var id1, _ = uuid.NewRandom() +var id2, _ = uuid.NewRandom() +var id3, _ = uuid.NewRandom() + +func createTestSchema(t *testing.T) schema.Schema { + colColl := schema.NewColCollection( + schema.NewColumn("id", idTag, types.UUIDKind, true, schema.NotNullConstraint{}), + schema.NewColumn("first", firstTag, types.StringKind, false, schema.NotNullConstraint{}), + schema.NewColumn("last", lastTag, types.StringKind, false, schema.NotNullConstraint{}), + schema.NewColumn("is_married", isMarriedTag, types.BoolKind, false), + schema.NewColumn("age", ageTag, types.UintKind, false), + schema.NewColumn("empty", emptySchTag, types.IntKind, false), + ) + sch, err := schema.SchemaFromCols(colColl) + require.NoError(t, err) + _, err = sch.Indexes().AddIndexByColTags(testSchemaIndexName, []uint64{firstTag, lastTag}, nil, schema.IndexProperties{IsUnique: false, Comment: ""}) + require.NoError(t, err) + _, err = sch.Indexes().AddIndexByColTags(testSchemaIndexAge, []uint64{ageTag}, nil, schema.IndexProperties{IsUnique: false, Comment: ""}) + require.NoError(t, err) + return sch +} + +func createTestRowData(t *testing.T, vrw types.ValueReadWriter, ns tree.NodeStore, sch schema.Schema) durable.Index { + if types.Format_Default == types.Format_DOLT { + idx, err := durable.NewEmptyPrimaryIndex(context.Background(), vrw, ns, sch) + require.NoError(t, err) + return idx + } + + vals := []row.TaggedValues{ + {idTag: types.UUID(id0), firstTag: types.String("bill"), lastTag: types.String("billerson"), ageTag: types.Uint(53)}, + {idTag: types.UUID(id1), firstTag: types.String("eric"), lastTag: types.String("ericson"), isMarriedTag: types.Bool(true), ageTag: types.Uint(21)}, + {idTag: types.UUID(id2), firstTag: types.String("john"), lastTag: types.String("johnson"), isMarriedTag: types.Bool(false), ageTag: types.Uint(53)}, + {idTag: types.UUID(id3), firstTag: types.String("robert"), lastTag: types.String("robertson"), ageTag: types.Uint(36)}, + } + + var err error + rows := make([]row.Row, len(vals)) + + m, err := types.NewMap(context.Background(), vrw) + assert.NoError(t, err) + ed := m.Edit() + + for i, val := range vals { + r, err := row.New(vrw.Format(), sch, val) + require.NoError(t, err) + rows[i] = r + ed = ed.Set(r.NomsMapKey(sch), r.NomsMapValue(sch)) + } + + m, err = ed.Map(context.Background()) + assert.NoError(t, err) + return durable.IndexFromNomsMap(m, vrw, ns) +} + +func createHooksTestTable(vrw types.ValueReadWriter, ns tree.NodeStore, tSchema schema.Schema, rowData durable.Index) (*doltdb.Table, error) { + tbl, err := doltdb.NewTable(context.Background(), vrw, ns, tSchema, rowData, nil, nil) + + if err != nil { + return nil, err + } + + return tbl, nil +} diff --git a/go/libraries/doltcore/sqle/replication.go b/go/libraries/doltcore/sqle/replication.go index 0a6319ba93..dfc9abd644 100644 --- a/go/libraries/doltcore/sqle/replication.go +++ b/go/libraries/doltcore/sqle/replication.go @@ -62,10 +62,10 @@ func getPushOnWriteHook(ctx context.Context, bThreads *sql.BackgroundThreads, dE return nil, err } if _, val, ok = sql.SystemVariables.GetGlobal(dsess.AsyncReplication); ok && val == dsess.SysVarTrue { - return doltdb.NewAsyncPushOnWriteHook(bThreads, ddb, tmpDir, logger) + return NewAsyncPushOnWriteHook(bThreads, ddb, tmpDir, logger) } - return doltdb.NewPushOnWriteHook(ddb, tmpDir), nil + return NewPushOnWriteHook(ddb, tmpDir), nil } // GetCommitHooks creates a list of hooks to execute on database commit. Hooks that cannot be created because of an @@ -77,7 +77,7 @@ func GetCommitHooks(ctx context.Context, bThreads *sql.BackgroundThreads, dEnv * if err != nil { path, _ := dEnv.FS.Abs(".") logrus.Errorf("error loading replication for database at %s, replication disabled: %v", path, err) - postCommitHooks = append(postCommitHooks, doltdb.NewLogHook([]byte(err.Error()+"\n"))) + postCommitHooks = append(postCommitHooks, NewLogHook([]byte(err.Error()+"\n"))) } else if hook != nil { postCommitHooks = append(postCommitHooks, hook) } diff --git a/go/libraries/doltcore/sqle/replication_test.go b/go/libraries/doltcore/sqle/replication_test.go index d73c1966f1..14fa20650a 100644 --- a/go/libraries/doltcore/sqle/replication_test.go +++ b/go/libraries/doltcore/sqle/replication_test.go @@ -43,7 +43,7 @@ func TestCommitHooksNoErrors(t *testing.T) { t.Error("failed to produce noop hook") } else { switch h := hooks[0].(type) { - case *doltdb.LogHook: + case *LogHook: default: t.Errorf("expected LogHook, found: %s", h) }