Skip to content

Commit

Permalink
[CF-557] Changelog updates should wait for transaction commit
Browse files Browse the repository at this point in the history
  • Loading branch information
erikdw committed Jan 11, 2024
1 parent 3652065 commit b8c6173
Show file tree
Hide file tree
Showing 6 changed files with 191 additions and 6 deletions.
2 changes: 1 addition & 1 deletion pkg/cmd/ctlstore/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ type supervisorCliConfig struct {
type ledgerHealthConfig struct {
Disable bool `conf:"disable" help:"disable ledger latency health attributing (DEPRECATED: use disable-ecs-behavior instead)"`
DisableECSBehavior bool `conf:"disable-ecs-behavior" help:"disable ledger latency health attributing"`
MaxHealthyLatency time.Duration `conf:"max-healty-latency" help:"Max latency considered healthy"`
MaxHealthyLatency time.Duration `conf:"max-healthy-latency" help:"Max latency considered healthy"`
AttributeName string `conf:"attribute-name" help:"The name of the attribute"`
HealthyAttributeValue string `conf:"healthy-attribute-value" help:"The value of the attribute if healthy"`
UnhealthyAttributeValue string `conf:"unhealth-attribute-value" help:"The value of the attribute if unhealthy"`
Expand Down
2 changes: 1 addition & 1 deletion pkg/executive/db_executive.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ func (e *dbExecutive) AddFields(familyName string, tableName string, fieldNames
}

// We first write the column modification to the DML ledger within the transaction.
// It's important that this is done befored the DDL is applied to the ctldb, as
// It's important that this is done before the DDL is applied to the ctldb, as
// the DDL is not able to be rolled back. In this way, if the DDL fails, the DML
// can be rolled back.
dlw := dmlLedgerWriter{Tx: tx, TableName: dmlLedgerTableName}
Expand Down
4 changes: 4 additions & 0 deletions pkg/ldb/ldbs.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,3 +121,7 @@ func FetchSeqFromLdb(ctx context.Context, db *sql.DB) (schema.DMLSequence, error
}
return schema.DMLSequence(seq), err
}

func IsInternalTable(name string) bool {
return name == LDBSeqTableName || name == LDBLastUpdateTableName
}
35 changes: 32 additions & 3 deletions pkg/ldbwriter/ldb_callback_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,52 @@ import (
"github.com/segmentio/ctlstore/pkg/schema"
"github.com/segmentio/ctlstore/pkg/sqlite"
"github.com/segmentio/events/v2"
"github.com/segmentio/stats/v4"
)

// CallbackWriter is an LDBWriter that delegates to another
// writer and then, upon a successful write, executes N callbacks.
type CallbackWriter struct {
DB *sql.DB
Delegate LDBWriter
Callbacks []LDBWriteCallback
DB *sql.DB
Delegate LDBWriter
Callbacks []LDBWriteCallback
// Buffer between SQLite Callback and our code
ChangeBuffer *sqlite.SQLChangeBuffer
// Accumulated changes across multiple ApplyDMLStatement calls
transactionChanges []sqlite.SQLiteWatchChange
}

func (w *CallbackWriter) ApplyDMLStatement(ctx context.Context, statement schema.DMLStatement) error {
err := w.Delegate.ApplyDMLStatement(ctx, statement)
if err != nil {
return err
}

// If beginning a transaction then start accumulating changes
if statement.Statement == schema.DMLTxBeginKey {
w.transactionChanges = make([]sqlite.SQLiteWatchChange, 0)
stats.Set("ldb_changes_accumulated", len(w.transactionChanges))
return nil
}

changes := w.ChangeBuffer.Pop()

// Are we in a transaction?
if w.transactionChanges != nil {
if statement.Statement == schema.DMLTxEndKey {
// Transaction done! Send out the accumulated changes
changes = append(w.transactionChanges, changes...)
stats.Set("ldb_changes_accumulated", len(changes))
w.transactionChanges = nil
} else {
// Transaction isn't over yet, save the latest changes
w.transactionChanges = append(w.transactionChanges, changes...)
stats.Set("ldb_changes_accumulated", len(w.transactionChanges))
return nil
}
}

stats.Observe("ldb_changes_written", len(changes))
for _, callback := range w.Callbacks {
events.Debug("Writing DML callback for %{cb}T", callback)
callback.LDBWritten(ctx, LDBWriteMetadata{
Expand Down
147 changes: 147 additions & 0 deletions pkg/ldbwriter/ldb_callback_writer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
package ldbwriter

import (
"context"
"database/sql"
"github.com/segmentio/ctlstore/pkg/ldb"
"github.com/segmentio/ctlstore/pkg/schema"
"github.com/segmentio/ctlstore/pkg/sqlite"
"github.com/stretchr/testify/assert"
"testing"
)

/*
* Simple LDBWriteCallback handler that just stores the changes it gets.
*/
type TestUpdateCallbackHandler struct {
Changes []sqlite.SQLiteWatchChange
}

func (u *TestUpdateCallbackHandler) LDBWritten(ctx context.Context, data LDBWriteMetadata) {
// The [:0] slice operation will reuse the underlying array of u.Changes if it's large enough
// to hold all elements of data.Changes, otherwise it will allocate a new one.
u.Changes = append(u.Changes[:0], data.Changes...)
}

func (u *TestUpdateCallbackHandler) UpdateCount() int {
return len(u.Changes)
}

func (u *TestUpdateCallbackHandler) Reset() {
u.Changes = u.Changes[:0]
return
}

/*
* Test strategy:
* Check how many times we get callbacks while applying DML statements,
* and how many updates we get per callback.
*/
func TestCallbackWriter_ApplyDMLStatement(t *testing.T) {
// Begin boilerplate
var err error
ctx := context.Background()
var changeBuffer sqlite.SQLChangeBuffer
dbName := "test_ldb_callback_writer"
_ = sqlite.RegisterSQLiteWatch(dbName, &changeBuffer)

db, err := sql.Open(dbName, ":memory:")
if err != nil {
t.Fatalf("Unexpected error: %+v", err)
}
defer db.Close()

err = ldb.EnsureLdbInitialized(ctx, db)
if err != nil {
t.Fatalf("Couldn't initialize SQLite db, error %v", err)
}
// End boilerplate

// Set up the callback writer with our test callback handler
ldbWriteCallback := &TestUpdateCallbackHandler{}

writer := CallbackWriter{
DB: db,
Delegate: &SqlLdbWriter{Db: db},
Callbacks: []LDBWriteCallback{ldbWriteCallback},
ChangeBuffer: &changeBuffer,
}

err = writer.ApplyDMLStatement(ctx, schema.NewTestDMLStatement("CREATE TABLE foo (bar VARCHAR);"))
if err != nil {
t.Fatalf("Could not issue CREATE TABLE statements, error %v", err)
}

type args struct {
ctx context.Context
statements []schema.DMLStatement
}
tests := []struct {
name string
args args
expectedCallbacks int
expectedUpdatesPerCallback int
wantErr bool
}{
{
name: "Test 1",
args: args{
ctx: ctx,
statements: []schema.DMLStatement{schema.NewTestDMLStatement("INSERT INTO foo VALUES('dummy');")},
},
expectedCallbacks: 1,
expectedUpdatesPerCallback: 1,
wantErr: false,
},
{
name: "Test 2",
args: args{
ctx: ctx,
statements: []schema.DMLStatement{
schema.NewTestDMLStatement("INSERT INTO foo VALUES('boston');"),
schema.NewTestDMLStatement("INSERT INTO foo VALUES('detroit');"),
schema.NewTestDMLStatement("INSERT INTO foo VALUES('chicago');"),
},
},
// bare statements outside of a transaction should get a callback each time
expectedCallbacks: 3,
expectedUpdatesPerCallback: 1,
wantErr: false,
},
{
name: "Test 3",
args: args{
ctx: ctx,
statements: []schema.DMLStatement{
schema.NewTestDMLStatement(schema.DMLTxBeginKey),
schema.NewTestDMLStatement("INSERT INTO foo VALUES('asdf');"),
schema.NewTestDMLStatement("INSERT INTO foo VALUES('foo');"),
schema.NewTestDMLStatement("INSERT INTO foo VALUES('bar');"),
schema.NewTestDMLStatement(schema.DMLTxEndKey),
},
},
// since it's a transaction, we expect only one callback, and it should have all 3 updates
expectedCallbacks: 1,
expectedUpdatesPerCallback: 3,
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
callbackCount := 0
for _, statement := range tt.args.statements {
if err := writer.ApplyDMLStatement(tt.args.ctx, statement); (err != nil) != tt.wantErr {
t.Errorf("ApplyDMLStatement() error = %v, wantErr %v", err, tt.wantErr)
}
// did we get a callback from that statement being applied?
if ldbWriteCallback.UpdateCount() > 0 {
callbackCount++
assert.Equal(t, tt.expectedUpdatesPerCallback, ldbWriteCallback.UpdateCount())
// delete previous callback's update entries since we "handled" the callback
ldbWriteCallback.Reset()
}
}
assert.Equal(t, tt.expectedCallbacks, callbackCount)
})
}
}
7 changes: 6 additions & 1 deletion pkg/sqlite/sqlite_watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ package sqlite
import (
"context"
"database/sql"

"github.com/pkg/errors"
"github.com/segmentio/ctlstore/pkg/ldb"
"github.com/segmentio/ctlstore/pkg/scanfunc"
"github.com/segmentio/ctlstore/pkg/schema"
"github.com/segmentio/go-sqlite3"
Expand Down Expand Up @@ -40,6 +40,11 @@ func RegisterSQLiteWatch(dbName string, buffer *SQLChangeBuffer) error {
var newRow []interface{}
var oldRow []interface{}

// Don't bother propagating updates of our internal bookkeeping tables
if ldb.IsInternalTable(pud.TableName) {
return
}

if pud.Op == sqlite3.SQLITE_UPDATE || pud.Op == sqlite3.SQLITE_DELETE {
oldRow = make([]interface{}, cnt)
err := pud.Old(oldRow...)
Expand Down

0 comments on commit b8c6173

Please sign in to comment.