From dd460046924cad928cd49d62990e57ad510cb61c Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Wed, 6 Sep 2023 17:24:00 -0600 Subject: [PATCH] Add event stream (#401) --- cmd/litefs/mount_test.go | 117 ++++++++++++++++++ db.go | 57 ++++++++- go.sum | 2 - http/server.go | 36 +++++- store.go | 248 ++++++++++++++++++++++++++++++++------- 5 files changed, 415 insertions(+), 45 deletions(-) diff --git a/cmd/litefs/mount_test.go b/cmd/litefs/mount_test.go index 59ed1cb..d3008a8 100644 --- a/cmd/litefs/mount_test.go +++ b/cmd/litefs/mount_test.go @@ -8,6 +8,7 @@ import ( "database/sql" _ "embed" "encoding/hex" + "encoding/json" "fmt" "io" "log" @@ -2302,6 +2303,122 @@ func TestMultiNode_WriteSnapshot_LockingProtocol(t *testing.T) { runMountCommand(t, cmd1) } +func TestEventStream(t *testing.T) { + t.Run("Tx/Primary", func(t *testing.T) { + cmd0 := runMountCommand(t, newMountCommand(t, t.TempDir(), nil)) + db := testingutil.OpenSQLDB(t, filepath.Join(cmd0.Config.FUSE.Dir, "db")) + + resp, err := http.Get(cmd0.HTTPServer.URL() + "/events") + if err != nil { + t.Fatal(err) + } + defer func() { _ = resp.Body.Close() }() + + dec := json.NewDecoder(resp.Body) + + var offset ltx.TXID + if testingutil.IsWALMode() { + offset = 1 + } + + var event litefs.Event + if err := dec.Decode(&event); err != nil { + t.Fatal(err) + } else if got, want := event.Type, "init"; got != want { + t.Fatalf("type=%s, want %s", got, want) + } + + if _, err := db.Exec(`CREATE TABLE t (x)`); err != nil { + t.Fatal(err) + } + if err := dec.Decode(&event); err != nil { + t.Fatal(err) + } else if got, want := event.Type, "tx"; got != want { + t.Fatalf("type=%s, want %s", got, want) + } else if got, want := event.DB, "db"; got != want { + t.Fatalf("db=%s, want %s", got, want) + } else if got, want := event.Data.(*litefs.TxEventData).TXID, ltx.TXID(offset+1); got != want { + t.Fatalf("data.txid=%s, want %s", got, want) + } + + if _, err := db.Exec(`INSERT INTO t VALUES (100)`); err != nil { + t.Fatal(err) + } + if err := dec.Decode(&event); err != nil { + t.Fatal(err) + } else if got, want := event.Type, "tx"; got != want { + t.Fatalf("type=%s, want %s", got, want) + } else if got, want := event.DB, "db"; got != want { + t.Fatalf("db=%s, want %s", got, want) + } else if got, want := event.Data.(*litefs.TxEventData).TXID, ltx.TXID(offset+2); got != want { + t.Fatalf("data.txid=%s, want %s", got, want) + } + }) + + t.Run("Tx/Replica", func(t *testing.T) { + cmd0 := runMountCommand(t, newMountCommand(t, t.TempDir(), nil)) + waitForPrimary(t, cmd0) + cmd1 := runMountCommand(t, newMountCommand(t, t.TempDir(), cmd0)) + db := testingutil.OpenSQLDB(t, filepath.Join(cmd0.Config.FUSE.Dir, "db")) + + resp, err := http.Get(cmd1.HTTPServer.URL() + "/events") + if err != nil { + t.Fatal(err) + } + defer func() { _ = resp.Body.Close() }() + + dec := json.NewDecoder(resp.Body) + + var event litefs.Event + if err := dec.Decode(&event); err != nil { + t.Fatal(err) + } else if got, want := event.Type, "init"; got != want { + t.Fatalf("type=%s, want %s", got, want) + } + + var offset ltx.TXID + if testingutil.IsWALMode() { + offset = 1 + + if err := dec.Decode(&event); err != nil { + t.Fatal(err) + } else if got, want := event.Type, "tx"; got != want { + t.Fatalf("type=%s, want %s", got, want) + } else if got, want := event.DB, "db"; got != want { + t.Fatalf("db=%s, want %s", got, want) + } else if got, want := event.Data.(*litefs.TxEventData).TXID, ltx.TXID(1); got != want { + t.Fatalf("data.txid=%s, want %s", got, want) + } + } + + if _, err := db.Exec(`CREATE TABLE t (x)`); err != nil { + t.Fatal(err) + } + if err := dec.Decode(&event); err != nil { + t.Fatal(err) + } else if got, want := event.Type, "tx"; got != want { + t.Fatalf("type=%s, want %s", got, want) + } else if got, want := event.DB, "db"; got != want { + t.Fatalf("db=%s, want %s", got, want) + } else if got, want := event.Data.(*litefs.TxEventData).TXID, ltx.TXID(offset+1); got != want { + t.Fatalf("data.txid=%s, want %s", got, want) + } + + if _, err := db.Exec(`INSERT INTO t VALUES (100)`); err != nil { + t.Fatal(err) + } + if err := dec.Decode(&event); err != nil { + t.Fatal(err) + } else if got, want := event.Type, "tx"; got != want { + t.Fatalf("type=%s, want %s", got, want) + } else if got, want := event.DB, "db"; got != want { + t.Fatalf("db=%s, want %s", got, want) + } else if got, want := event.Data.(*litefs.TxEventData).TXID, ltx.TXID(offset+2); got != want { + t.Fatalf("data.txid=%s, want %s", got, want) + } + }) +} + // Ensure multiple nodes can run in a cluster for an extended period of time. func TestFunctional_OK(t *testing.T) { if *funTime <= 0 { diff --git a/db.go b/db.go index 36e6181..a14dd72 100644 --- a/db.go +++ b/db.go @@ -1735,6 +1735,19 @@ func (db *DB) CommitWAL(ctx context.Context) (err error) { // Notify store of database change. db.store.MarkDirty(db.name) + // Notify event stream subscribers of new transaction. + db.store.NotifyEvent(Event{ + Type: EventTypeTx, + DB: db.name, + Data: TxEventData{ + TXID: pos.TXID, + PostApplyChecksum: pos.PostApplyChecksum, + PageSize: enc.Header().PageSize, + Commit: enc.Header().Commit, + Timestamp: time.UnixMilli(enc.Header().Timestamp).UTC(), + }, + }) + // Perform full checksum verification, if set. For testing only. if db.store.StrictVerify { if chksum, err := db.onDiskChecksum(dbFile, walFile); err != nil { @@ -2109,6 +2122,19 @@ func (db *DB) CommitJournal(ctx context.Context, mode JournalMode) (err error) { // Notify store of database change. db.store.MarkDirty(db.name) + // Notify event stream subscribers of new transaction. + db.store.NotifyEvent(Event{ + Type: EventTypeTx, + DB: db.name, + Data: TxEventData{ + TXID: pos.TXID, + PostApplyChecksum: pos.PostApplyChecksum, + PageSize: enc.Header().PageSize, + Commit: enc.Header().Commit, + Timestamp: time.UnixMilli(enc.Header().Timestamp).UTC(), + }, + }) + // Calculate checksum for entire database. if db.store.StrictVerify { if chksum, err := db.onDiskChecksum(dbFile, nil); err != nil { @@ -2237,6 +2263,19 @@ func (db *DB) Drop(ctx context.Context) (err error) { // Notify store of database change. db.store.MarkDirty(db.name) + // Notify event stream subscribers of new transaction. + db.store.NotifyEvent(Event{ + Type: EventTypeTx, + DB: db.name, + Data: TxEventData{ + TXID: pos.TXID, + PostApplyChecksum: pos.PostApplyChecksum, + PageSize: enc.Header().PageSize, + Commit: enc.Header().Commit, + Timestamp: time.UnixMilli(enc.Header().Timestamp).UTC(), + }, + }) + return nil } @@ -2521,10 +2560,11 @@ func (db *DB) ApplyLTXNoLock(ctx context.Context, path string) error { } // Update transaction for database. - if err := db.setPos(ltx.Pos{ + pos := ltx.Pos{ TXID: dec.Header().MaxTXID, PostApplyChecksum: dec.Trailer().PostApplyChecksum, - }, dec.Header().Timestamp); err != nil { + } + if err := db.setPos(pos, dec.Header().Timestamp); err != nil { return fmt.Errorf("set pos: %w", err) } @@ -2543,6 +2583,19 @@ func (db *DB) ApplyLTXNoLock(ctx context.Context, path string) error { // Notify store of database change. db.store.MarkDirty(db.name) + // Notify event stream subscribers of new transaction. + db.store.NotifyEvent(Event{ + Type: EventTypeTx, + DB: db.name, + Data: TxEventData{ + TXID: pos.TXID, + PostApplyChecksum: pos.PostApplyChecksum, + PageSize: dec.Header().PageSize, + Commit: dec.Header().Commit, + Timestamp: time.UnixMilli(dec.Header().Timestamp).UTC(), + }, + }) + // Calculate latency since LTX file was written. latency := float64(time.Now().UnixMilli()-dec.Header().Timestamp) / 1000 dbLatencySecondsMetricVec.WithLabelValues(db.name).Set(latency) diff --git a/go.sum b/go.sum index fcfa40b..60f46c2 100644 --- a/go.sum +++ b/go.sum @@ -308,8 +308,6 @@ github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5Cc github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/superfly/litefs-go v0.0.0-20230227231337-34ea5dcf1e0b h1:+WuhtZFB8fNdPeaMUtuB/U8aknXBXdDW/mBm/HTYJNg= github.com/superfly/litefs-go v0.0.0-20230227231337-34ea5dcf1e0b/go.mod h1:h+GUx1V2s0C5nY73ZN82760eWEJrpMaiDweF31VmJKk= -github.com/superfly/ltx v0.3.12 h1:Z7z1sc4g34/jUi3XO84+zBlIsbaoh2RJ3b4zTQpBK/M= -github.com/superfly/ltx v0.3.12/go.mod h1:ly+Dq7UVacQVEI5/b0r6j+PSNy9ibwx1yikcWAaSkhE= github.com/superfly/ltx v0.3.13 h1:IbuocKJ6sY2jYvZbpUGMYmTkvaLSGUderEZwmaIUmJ0= github.com/superfly/ltx v0.3.13/go.mod h1:ly+Dq7UVacQVEI5/b0r6j+PSNy9ibwx1yikcWAaSkhE= github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM= diff --git a/http/server.go b/http/server.go index 598299a..9cbd473 100644 --- a/http/server.go +++ b/http/server.go @@ -233,6 +233,14 @@ func (s *Server) serveHTTP(w http.ResponseWriter, r *http.Request) { Error(w, r, fmt.Errorf("method not allowed"), http.StatusMethodNotAllowed) } + case "/events": + switch r.Method { + case http.MethodGet: + s.handleGetEvents(w, r) + default: + Error(w, r, fmt.Errorf("method not allowed"), http.StatusMethodNotAllowed) + } + default: http.NotFound(w, r) } @@ -506,7 +514,7 @@ func (s *Server) handlePostStream(w http.ResponseWriter, r *http.Request) { defer serverStreamCountMetric.Dec() // Subscribe to store changes - subscription := s.store.Subscribe(id) + subscription := s.store.SubscribeChangeSet(id) defer func() { _ = subscription.Close() }() // Read in pos map. @@ -745,6 +753,32 @@ func (s *Server) streamLTXSnapshot(ctx context.Context, w http.ResponseWriter, d return ltx.Pos{TXID: header.MaxTXID, PostApplyChecksum: trailer.PostApplyChecksum}, nil } +func (s *Server) handleGetEvents(w http.ResponseWriter, r *http.Request) { + subscription := s.store.SubscribeEvents() + defer func() { subscription.Stop() }() + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + + enc := json.NewEncoder(w) + for { + select { + case <-r.Context().Done(): + return + case event, ok := <-subscription.C(): + if !ok { + log.Printf("http: event stream buffer exceeded, disconnecting") + return + } + if err := enc.Encode(event); err != nil { + log.Printf("http: event stream error: %s", err) + return + } + w.(http.Flusher).Flush() + } + } +} + func Error(w http.ResponseWriter, r *http.Request, err error, code int) { log.Printf("http: %s %s: error: %s", r.Method, r.URL.Path, err) http.Error(w, err.Error(), code) diff --git a/store.go b/store.go index 2ed7766..ff8e8e3 100644 --- a/store.go +++ b/store.go @@ -58,11 +58,12 @@ type Store struct { mu sync.Mutex path string - id uint64 // unique node id - clusterID atomic.Value - dbs map[string]*DB - subscribers map[*Subscriber]struct{} - primaryTimestamp atomic.Int64 // ms since epoch of last update from primary. -1 if primary + id uint64 // unique node id + clusterID atomic.Value + dbs map[string]*DB + changeSetSubscribers map[*ChangeSetSubscriber]struct{} + eventSubscribers map[*EventSubscriber]struct{} + primaryTimestamp atomic.Int64 // ms since epoch of last update from primary. -1 if primary lease Lease // if not nil, store is current primary primaryCh chan struct{} // closed when primary loses leadership @@ -140,11 +141,13 @@ func NewStore(path string, candidate bool) *Store { dbs: make(map[string]*DB), - subscribers: make(map[*Subscriber]struct{}), - candidate: candidate, - primaryCh: primaryCh, - readyCh: make(chan struct{}), - demoteCh: make(chan struct{}), + changeSetSubscribers: make(map[*ChangeSetSubscriber]struct{}), + eventSubscribers: make(map[*EventSubscriber]struct{}), + + candidate: candidate, + primaryCh: primaryCh, + readyCh: make(chan struct{}), + demoteCh: make(chan struct{}), ReconnectDelay: DefaultReconnectDelay, DemoteDelay: DefaultDemoteDelay, @@ -401,7 +404,7 @@ func (s *Store) Handoff(ctx context.Context, nodeID uint64) error { } // Find connected subscriber by node ID. - sub := s.subscriberByNodeID(nodeID) + sub := s.changeSetSubscriberByNodeID(nodeID) if sub == nil { return fmt.Errorf("target node is not currently connected") } @@ -446,6 +449,8 @@ func (s *Store) setLease(lease Lease) { } else { storeIsPrimaryMetric.Set(0) } + + s.notifyPrimaryChange() } // PrimaryCtx wraps ctx with another context that will cancel when no longer primary. @@ -466,6 +471,11 @@ func (s *Store) PrimaryInfo() (isPrimary bool, info *PrimaryInfo) { return s.isPrimary(), s.primaryInfo.Clone() } +func (s *Store) setPrimaryInfo(info *PrimaryInfo) { + s.primaryInfo = info + s.notifyPrimaryChange() +} + // Candidate returns true if store is eligible to be the primary. func (s *Store) Candidate() bool { return s.candidate @@ -590,37 +600,37 @@ func (s *Store) PosMap() map[string]ltx.Pos { return m } -// Subscribe creates a new subscriber for store changes. -func (s *Store) Subscribe(nodeID uint64) *Subscriber { +// SubscribeChangeSet creates a new subscriber for store changes. +func (s *Store) SubscribeChangeSet(nodeID uint64) *ChangeSetSubscriber { s.mu.Lock() defer s.mu.Unlock() - sub := newSubscriber(s, nodeID) - s.subscribers[sub] = struct{}{} + sub := newChangeSetSubscriber(s, nodeID) + s.changeSetSubscribers[sub] = struct{}{} - storeSubscriberCountMetric.Set(float64(len(s.subscribers))) + storeSubscriberCountMetric.Set(float64(len(s.changeSetSubscribers))) return sub } -// Unsubscribe removes a subscriber from the store. -func (s *Store) Unsubscribe(sub *Subscriber) { +// UnsubscribeChangeSet removes a subscriber from the store. +func (s *Store) UnsubscribeChangeSet(sub *ChangeSetSubscriber) { s.mu.Lock() defer s.mu.Unlock() - delete(s.subscribers, sub) - storeSubscriberCountMetric.Set(float64(len(s.subscribers))) + delete(s.changeSetSubscribers, sub) + storeSubscriberCountMetric.Set(float64(len(s.changeSetSubscribers))) } // SubscriberByNodeID returns a subscriber by node ID. // Returns nil if the node is not currently subscribed to the store. -func (s *Store) SubscriberByNodeID(nodeID uint64) *Subscriber { +func (s *Store) SubscriberByNodeID(nodeID uint64) *ChangeSetSubscriber { s.mu.Lock() defer s.mu.Unlock() - return s.subscriberByNodeID(nodeID) + return s.changeSetSubscriberByNodeID(nodeID) } -func (s *Store) subscriberByNodeID(nodeID uint64) *Subscriber { - for sub := range s.subscribers { +func (s *Store) changeSetSubscriberByNodeID(nodeID uint64) *ChangeSetSubscriber { + for sub := range s.changeSetSubscribers { if sub.NodeID() == nodeID { return sub } @@ -636,11 +646,82 @@ func (s *Store) MarkDirty(name string) { } func (s *Store) markDirty(name string) { - for sub := range s.subscribers { + for sub := range s.changeSetSubscribers { sub.MarkDirty(name) } } +// SubscribeEvents creates a new subscriber for store events. +func (s *Store) SubscribeEvents() *EventSubscriber { + s.mu.Lock() + defer s.mu.Unlock() + + var hostname string + if s.primaryInfo != nil { + hostname = s.primaryInfo.Hostname + } + + sub := newEventSubscriber(s) + sub.ch <- Event{ + Type: EventTypeInit, + Data: InitEventData{ + IsPrimary: s.isPrimary(), + Hostname: hostname, + }, + } + + s.eventSubscribers[sub] = struct{}{} + + return sub +} + +// UnsubscribeEvents removes an event subscriber from the store. +func (s *Store) UnsubscribeEvents(sub *EventSubscriber) { + s.mu.Lock() + defer s.mu.Unlock() + s.unsubscribeEvents(sub) +} + +func (s *Store) unsubscribeEvents(sub *EventSubscriber) { + if _, ok := s.eventSubscribers[sub]; ok { + delete(s.eventSubscribers, sub) + close(sub.ch) + } +} + +// NotifyEvent sends event to all event subscribers. +// If a subscriber has no additional buffer space available then it is closed. +func (s *Store) NotifyEvent(event Event) { + s.mu.Lock() + defer s.mu.Unlock() + s.notifyEvent(event) +} + +func (s *Store) notifyEvent(event Event) { + for sub := range s.eventSubscribers { + select { + case sub.ch <- event: + default: + s.unsubscribeEvents(sub) + } + } +} + +func (s *Store) notifyPrimaryChange() { + var hostname string + if s.primaryInfo != nil { + hostname = s.primaryInfo.Hostname + } + + s.notifyEvent(Event{ + Type: EventTypePrimaryChange, + Data: PrimaryChangeEventData{ + IsPrimary: s.isPrimary(), + Hostname: hostname, + }, + }) +} + // monitorLease continuously handles either the leader lease or replicates from the primary. func (s *Store) monitorLease(ctx context.Context) (err error) { // Initialize environment to indicate this node is not a primary. @@ -925,7 +1006,7 @@ func (s *Store) SyncBackup(ctx context.Context) error { // streamBackup connects to a backup server and continuously streams LTX files. func (s *Store) streamBackup(ctx context.Context, oneTime bool) (err error) { // Start subscription immediately so we can collect any changes. - subscription := s.Subscribe(0) + subscription := s.SubscribeChangeSet(0) defer func() { _ = subscription.Close() }() slog.Info("begin streaming backup", slog.Duration("full-sync-interval", s.BackupFullSyncInterval)) @@ -1246,14 +1327,14 @@ func (s *Store) monitorLeaseAsReplica(ctx context.Context, info PrimaryInfo) (ha // Store the URL of the primary while we're in this function. s.mu.Lock() - s.primaryInfo = &info + s.setPrimaryInfo(&info) s.mu.Unlock() // Clear the primary URL once we leave this function since we can no longer connect. defer func() { s.mu.Lock() defer s.mu.Unlock() - s.primaryInfo = nil + s.setPrimaryInfo(nil) }() posMap := s.PosMap() @@ -1587,13 +1668,13 @@ type storeVarJSON struct { DBs map[string]*dbVarJSON `json:"dbs"` } -// Subscriber subscribes to changes to databases in the store. +// ChangeSetSubscriber subscribes to changes to databases in the store. // // It implements a set of "dirty" databases instead of a channel of all events // as clients can be slow and we don't want to cause channels to back up. It // is the responsibility of the caller to determine the state changes which is // usually just checking the position of the client versus the store's database. -type Subscriber struct { +type ChangeSetSubscriber struct { store *Store nodeID uint64 @@ -1603,9 +1684,9 @@ type Subscriber struct { handoffCh chan string } -// newSubscriber returns a new instance of Subscriber associated with a store. -func newSubscriber(store *Store, nodeID uint64) *Subscriber { - s := &Subscriber{ +// newChangeSetSubscriber returns a new instance of Subscriber associated with a store. +func newChangeSetSubscriber(store *Store, nodeID uint64) *ChangeSetSubscriber { + s := &ChangeSetSubscriber{ store: store, nodeID: nodeID, notifyCh: make(chan struct{}, 1), @@ -1616,22 +1697,22 @@ func newSubscriber(store *Store, nodeID uint64) *Subscriber { } // Close removes the subscriber from the store. -func (s *Subscriber) Close() error { - s.store.Unsubscribe(s) +func (s *ChangeSetSubscriber) Close() error { + s.store.UnsubscribeChangeSet(s) return nil } // NodeID returns the ID of the subscribed node. -func (s *Subscriber) NodeID() uint64 { return s.nodeID } +func (s *ChangeSetSubscriber) NodeID() uint64 { return s.nodeID } // NotifyCh returns a channel that receives a value when the dirty set has changed. -func (s *Subscriber) NotifyCh() <-chan struct{} { return s.notifyCh } +func (s *ChangeSetSubscriber) NotifyCh() <-chan struct{} { return s.notifyCh } // HandoffCh returns a channel that returns a lease ID on handoff. -func (s *Subscriber) HandoffCh() chan string { return s.handoffCh } +func (s *ChangeSetSubscriber) HandoffCh() chan string { return s.handoffCh } // MarkDirty marks a database ID as dirty. -func (s *Subscriber) MarkDirty(name string) { +func (s *ChangeSetSubscriber) MarkDirty(name string) { s.mu.Lock() defer s.mu.Unlock() s.dirtySet[name] = struct{}{} @@ -1644,7 +1725,7 @@ func (s *Subscriber) MarkDirty(name string) { // DirtySet returns a set of database IDs that have changed since the last call // to DirtySet(). This call clears the set. -func (s *Subscriber) DirtySet() map[string]struct{} { +func (s *ChangeSetSubscriber) DirtySet() map[string]struct{} { s.mu.Lock() defer s.mu.Unlock() @@ -1653,6 +1734,93 @@ func (s *Subscriber) DirtySet() map[string]struct{} { return dirtySet } +const EventChannelBufferSize = 1024 + +// EventSubscriber subscribes to generic store events. +type EventSubscriber struct { + store *Store + ch chan Event +} + +// newEventSubscriber returns a new instance of Subscriber associated with a store. +func newEventSubscriber(store *Store) *EventSubscriber { + return &EventSubscriber{ + store: store, + ch: make(chan Event, EventChannelBufferSize), + } +} + +// Stop closes the subscriber and removes it from the store. +func (s *EventSubscriber) Stop() { + s.store.UnsubscribeEvents(s) +} + +// C returns a channel that receives event notifications. +// If caller cannot read events fast enough then channel will be closed. +func (s *EventSubscriber) C() <-chan Event { return s.ch } + +const ( + EventTypeInit = "init" + EventTypeTx = "tx" + EventTypePrimaryChange = "primaryChange" +) + +// Event represents a generic event. +type Event struct { + Type string `json:"type"` + DB string `json:"db,omitempty"` + Data any `json:"data,omitempty"` +} + +func (e *Event) UnmarshalJSON(data []byte) error { + var v eventJSON + if err := json.Unmarshal(data, &v); err != nil { + return err + } + + e.Type = v.Type + e.DB = v.DB + + switch v.Type { + case EventTypeInit: + e.Data = &InitEventData{} + case EventTypeTx: + e.Data = &TxEventData{} + case EventTypePrimaryChange: + e.Data = &PrimaryChangeEventData{} + default: + e.Data = nil + } + if err := json.Unmarshal(v.Data, &e.Data); err != nil { + return err + } + return nil +} + +type eventJSON struct { + Type string `json:"type"` + DB string `json:"db,omitempty"` + Data json.RawMessage `json:"data,omitempty"` +} + +type InitEventData struct { + IsPrimary bool `json:"isPrimary"` + Hostname string `json:"hostname,omitempty"` +} + +type TxEventData struct { + TXID ltx.TXID `json:"txID"` + PostApplyChecksum ltx.Checksum `json:"postApplyChecksum"` + PageSize uint32 `json:"pageSize"` + Commit uint32 `json:"commit"` + Timestamp time.Time `json:"timestamp"` +} + +type PrimaryChangeEventData struct { + IsPrimary bool `json:"isPrimary"` + Hostname string `json:"hostname,omitempty"` +} + var _ context.Context = (*primaryCtx)(nil) // primaryCtx represents a context that is marked done when the node loses its primary status.