Skip to content

Commit

Permalink
Add event stream (#401)
Browse files Browse the repository at this point in the history
  • Loading branch information
benbjohnson authored Sep 6, 2023
1 parent 4eaa0cb commit dd46004
Show file tree
Hide file tree
Showing 5 changed files with 415 additions and 45 deletions.
117 changes: 117 additions & 0 deletions cmd/litefs/mount_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"database/sql"
_ "embed"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"log"
Expand Down Expand Up @@ -2302,6 +2303,122 @@ func TestMultiNode_WriteSnapshot_LockingProtocol(t *testing.T) {
runMountCommand(t, cmd1)
}

func TestEventStream(t *testing.T) {
t.Run("Tx/Primary", func(t *testing.T) {
cmd0 := runMountCommand(t, newMountCommand(t, t.TempDir(), nil))
db := testingutil.OpenSQLDB(t, filepath.Join(cmd0.Config.FUSE.Dir, "db"))

resp, err := http.Get(cmd0.HTTPServer.URL() + "/events")
if err != nil {
t.Fatal(err)
}
defer func() { _ = resp.Body.Close() }()

dec := json.NewDecoder(resp.Body)

var offset ltx.TXID
if testingutil.IsWALMode() {
offset = 1
}

var event litefs.Event
if err := dec.Decode(&event); err != nil {
t.Fatal(err)
} else if got, want := event.Type, "init"; got != want {
t.Fatalf("type=%s, want %s", got, want)
}

if _, err := db.Exec(`CREATE TABLE t (x)`); err != nil {
t.Fatal(err)
}
if err := dec.Decode(&event); err != nil {
t.Fatal(err)
} else if got, want := event.Type, "tx"; got != want {
t.Fatalf("type=%s, want %s", got, want)
} else if got, want := event.DB, "db"; got != want {
t.Fatalf("db=%s, want %s", got, want)
} else if got, want := event.Data.(*litefs.TxEventData).TXID, ltx.TXID(offset+1); got != want {
t.Fatalf("data.txid=%s, want %s", got, want)
}

if _, err := db.Exec(`INSERT INTO t VALUES (100)`); err != nil {
t.Fatal(err)
}
if err := dec.Decode(&event); err != nil {
t.Fatal(err)
} else if got, want := event.Type, "tx"; got != want {
t.Fatalf("type=%s, want %s", got, want)
} else if got, want := event.DB, "db"; got != want {
t.Fatalf("db=%s, want %s", got, want)
} else if got, want := event.Data.(*litefs.TxEventData).TXID, ltx.TXID(offset+2); got != want {
t.Fatalf("data.txid=%s, want %s", got, want)
}
})

t.Run("Tx/Replica", func(t *testing.T) {
cmd0 := runMountCommand(t, newMountCommand(t, t.TempDir(), nil))
waitForPrimary(t, cmd0)
cmd1 := runMountCommand(t, newMountCommand(t, t.TempDir(), cmd0))
db := testingutil.OpenSQLDB(t, filepath.Join(cmd0.Config.FUSE.Dir, "db"))

resp, err := http.Get(cmd1.HTTPServer.URL() + "/events")
if err != nil {
t.Fatal(err)
}
defer func() { _ = resp.Body.Close() }()

dec := json.NewDecoder(resp.Body)

var event litefs.Event
if err := dec.Decode(&event); err != nil {
t.Fatal(err)
} else if got, want := event.Type, "init"; got != want {
t.Fatalf("type=%s, want %s", got, want)
}

var offset ltx.TXID
if testingutil.IsWALMode() {
offset = 1

if err := dec.Decode(&event); err != nil {
t.Fatal(err)
} else if got, want := event.Type, "tx"; got != want {
t.Fatalf("type=%s, want %s", got, want)
} else if got, want := event.DB, "db"; got != want {
t.Fatalf("db=%s, want %s", got, want)
} else if got, want := event.Data.(*litefs.TxEventData).TXID, ltx.TXID(1); got != want {
t.Fatalf("data.txid=%s, want %s", got, want)
}
}

if _, err := db.Exec(`CREATE TABLE t (x)`); err != nil {
t.Fatal(err)
}
if err := dec.Decode(&event); err != nil {
t.Fatal(err)
} else if got, want := event.Type, "tx"; got != want {
t.Fatalf("type=%s, want %s", got, want)
} else if got, want := event.DB, "db"; got != want {
t.Fatalf("db=%s, want %s", got, want)
} else if got, want := event.Data.(*litefs.TxEventData).TXID, ltx.TXID(offset+1); got != want {
t.Fatalf("data.txid=%s, want %s", got, want)
}

if _, err := db.Exec(`INSERT INTO t VALUES (100)`); err != nil {
t.Fatal(err)
}
if err := dec.Decode(&event); err != nil {
t.Fatal(err)
} else if got, want := event.Type, "tx"; got != want {
t.Fatalf("type=%s, want %s", got, want)
} else if got, want := event.DB, "db"; got != want {
t.Fatalf("db=%s, want %s", got, want)
} else if got, want := event.Data.(*litefs.TxEventData).TXID, ltx.TXID(offset+2); got != want {
t.Fatalf("data.txid=%s, want %s", got, want)
}
})
}

// Ensure multiple nodes can run in a cluster for an extended period of time.
func TestFunctional_OK(t *testing.T) {
if *funTime <= 0 {
Expand Down
57 changes: 55 additions & 2 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -1735,6 +1735,19 @@ func (db *DB) CommitWAL(ctx context.Context) (err error) {
// Notify store of database change.
db.store.MarkDirty(db.name)

// Notify event stream subscribers of new transaction.
db.store.NotifyEvent(Event{
Type: EventTypeTx,
DB: db.name,
Data: TxEventData{
TXID: pos.TXID,
PostApplyChecksum: pos.PostApplyChecksum,
PageSize: enc.Header().PageSize,
Commit: enc.Header().Commit,
Timestamp: time.UnixMilli(enc.Header().Timestamp).UTC(),
},
})

// Perform full checksum verification, if set. For testing only.
if db.store.StrictVerify {
if chksum, err := db.onDiskChecksum(dbFile, walFile); err != nil {
Expand Down Expand Up @@ -2109,6 +2122,19 @@ func (db *DB) CommitJournal(ctx context.Context, mode JournalMode) (err error) {
// Notify store of database change.
db.store.MarkDirty(db.name)

// Notify event stream subscribers of new transaction.
db.store.NotifyEvent(Event{
Type: EventTypeTx,
DB: db.name,
Data: TxEventData{
TXID: pos.TXID,
PostApplyChecksum: pos.PostApplyChecksum,
PageSize: enc.Header().PageSize,
Commit: enc.Header().Commit,
Timestamp: time.UnixMilli(enc.Header().Timestamp).UTC(),
},
})

// Calculate checksum for entire database.
if db.store.StrictVerify {
if chksum, err := db.onDiskChecksum(dbFile, nil); err != nil {
Expand Down Expand Up @@ -2237,6 +2263,19 @@ func (db *DB) Drop(ctx context.Context) (err error) {
// Notify store of database change.
db.store.MarkDirty(db.name)

// Notify event stream subscribers of new transaction.
db.store.NotifyEvent(Event{
Type: EventTypeTx,
DB: db.name,
Data: TxEventData{
TXID: pos.TXID,
PostApplyChecksum: pos.PostApplyChecksum,
PageSize: enc.Header().PageSize,
Commit: enc.Header().Commit,
Timestamp: time.UnixMilli(enc.Header().Timestamp).UTC(),
},
})

return nil
}

Expand Down Expand Up @@ -2521,10 +2560,11 @@ func (db *DB) ApplyLTXNoLock(ctx context.Context, path string) error {
}

// Update transaction for database.
if err := db.setPos(ltx.Pos{
pos := ltx.Pos{
TXID: dec.Header().MaxTXID,
PostApplyChecksum: dec.Trailer().PostApplyChecksum,
}, dec.Header().Timestamp); err != nil {
}
if err := db.setPos(pos, dec.Header().Timestamp); err != nil {
return fmt.Errorf("set pos: %w", err)
}

Expand All @@ -2543,6 +2583,19 @@ func (db *DB) ApplyLTXNoLock(ctx context.Context, path string) error {
// Notify store of database change.
db.store.MarkDirty(db.name)

// Notify event stream subscribers of new transaction.
db.store.NotifyEvent(Event{
Type: EventTypeTx,
DB: db.name,
Data: TxEventData{
TXID: pos.TXID,
PostApplyChecksum: pos.PostApplyChecksum,
PageSize: dec.Header().PageSize,
Commit: dec.Header().Commit,
Timestamp: time.UnixMilli(dec.Header().Timestamp).UTC(),
},
})

// Calculate latency since LTX file was written.
latency := float64(time.Now().UnixMilli()-dec.Header().Timestamp) / 1000
dbLatencySecondsMetricVec.WithLabelValues(db.name).Set(latency)
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -308,8 +308,6 @@ github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5Cc
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/superfly/litefs-go v0.0.0-20230227231337-34ea5dcf1e0b h1:+WuhtZFB8fNdPeaMUtuB/U8aknXBXdDW/mBm/HTYJNg=
github.com/superfly/litefs-go v0.0.0-20230227231337-34ea5dcf1e0b/go.mod h1:h+GUx1V2s0C5nY73ZN82760eWEJrpMaiDweF31VmJKk=
github.com/superfly/ltx v0.3.12 h1:Z7z1sc4g34/jUi3XO84+zBlIsbaoh2RJ3b4zTQpBK/M=
github.com/superfly/ltx v0.3.12/go.mod h1:ly+Dq7UVacQVEI5/b0r6j+PSNy9ibwx1yikcWAaSkhE=
github.com/superfly/ltx v0.3.13 h1:IbuocKJ6sY2jYvZbpUGMYmTkvaLSGUderEZwmaIUmJ0=
github.com/superfly/ltx v0.3.13/go.mod h1:ly+Dq7UVacQVEI5/b0r6j+PSNy9ibwx1yikcWAaSkhE=
github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM=
Expand Down
36 changes: 35 additions & 1 deletion http/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,14 @@ func (s *Server) serveHTTP(w http.ResponseWriter, r *http.Request) {
Error(w, r, fmt.Errorf("method not allowed"), http.StatusMethodNotAllowed)
}

case "/events":
switch r.Method {
case http.MethodGet:
s.handleGetEvents(w, r)
default:
Error(w, r, fmt.Errorf("method not allowed"), http.StatusMethodNotAllowed)
}

default:
http.NotFound(w, r)
}
Expand Down Expand Up @@ -506,7 +514,7 @@ func (s *Server) handlePostStream(w http.ResponseWriter, r *http.Request) {
defer serverStreamCountMetric.Dec()

// Subscribe to store changes
subscription := s.store.Subscribe(id)
subscription := s.store.SubscribeChangeSet(id)
defer func() { _ = subscription.Close() }()

// Read in pos map.
Expand Down Expand Up @@ -745,6 +753,32 @@ func (s *Server) streamLTXSnapshot(ctx context.Context, w http.ResponseWriter, d
return ltx.Pos{TXID: header.MaxTXID, PostApplyChecksum: trailer.PostApplyChecksum}, nil
}

func (s *Server) handleGetEvents(w http.ResponseWriter, r *http.Request) {
subscription := s.store.SubscribeEvents()
defer func() { subscription.Stop() }()

w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)

enc := json.NewEncoder(w)
for {
select {
case <-r.Context().Done():
return
case event, ok := <-subscription.C():
if !ok {
log.Printf("http: event stream buffer exceeded, disconnecting")
return
}
if err := enc.Encode(event); err != nil {
log.Printf("http: event stream error: %s", err)
return
}
w.(http.Flusher).Flush()
}
}
}

func Error(w http.ResponseWriter, r *http.Request, err error, code int) {
log.Printf("http: %s %s: error: %s", r.Method, r.URL.Path, err)
http.Error(w, err.Error(), code)
Expand Down
Loading

0 comments on commit dd46004

Please sign in to comment.