From c46a3367f66d6162c45ab1dd8310b006a1d481ca Mon Sep 17 00:00:00 2001 From: Arjun Sunil Kumar Date: Tue, 9 Jan 2024 02:29:20 -0800 Subject: [PATCH] Minor renames --- pkg/txn/c_scheduler.go | 30 ++++++++--------- pkg/txn/d_ts_waiter.go | 32 +++++++++---------- .../{d_ts_waiter_heap.go => d_txn_tracker.go} | 20 ++++++------ 3 files changed, 41 insertions(+), 41 deletions(-) rename pkg/txn/{d_ts_waiter_heap.go => d_txn_tracker.go} (78%) diff --git a/pkg/txn/c_scheduler.go b/pkg/txn/c_scheduler.go index ecb6e92..ada8399 100644 --- a/pkg/txn/c_scheduler.go +++ b/pkg/txn/c_scheduler.go @@ -9,27 +9,27 @@ type Scheduler struct { sync.Mutex nextTs uint64 - readWaiter *TsWaiter - commitWaiter *TsWaiter + readVisibilityWaiter *TsWaiter + commitVisibilityWaiter *TsWaiter readyToCommitTxns []ReadyToCommitTxn } func NewScheduler() *Scheduler { scheduler := &Scheduler{ - nextTs: 1, - readWaiter: NewTsWaiter(), - commitWaiter: NewTsWaiter(), + nextTs: 1, + readVisibilityWaiter: NewTsWaiter(), + commitVisibilityWaiter: NewTsWaiter(), } - scheduler.readWaiter.Done(scheduler.nextTs - 1) - scheduler.commitWaiter.Done(scheduler.nextTs - 1) + scheduler.readVisibilityWaiter.Done(scheduler.nextTs - 1) + scheduler.commitVisibilityWaiter.Done(scheduler.nextTs - 1) return scheduler } func (o *Scheduler) Stop() { - o.readWaiter.Stop() - o.commitWaiter.Stop() + o.readVisibilityWaiter.Stop() + o.commitVisibilityWaiter.Stop() } func (o *Scheduler) NewReadTs() uint64 { @@ -37,9 +37,9 @@ func (o *Scheduler) NewReadTs() uint64 { defer o.Unlock() beginTimestamp := o.nextTs - 1 - o.readWaiter.Begin(beginTimestamp) + o.readVisibilityWaiter.Begin(beginTimestamp) - err := o.commitWaiter.WaitFor(context.Background(), beginTimestamp) + err := o.commitVisibilityWaiter.WaitFor(context.Background(), beginTimestamp) if err != nil { panic(err) } @@ -68,16 +68,16 @@ func (o *Scheduler) NewCommitTs(transaction *Txn) (uint64, error) { o.nextTs = o.nextTs + 1 o.addReadyToCommitTxn(transaction, commitTs) - o.commitWaiter.Begin(commitTs) + o.commitVisibilityWaiter.Begin(commitTs) return commitTs, nil } func (o *Scheduler) DoneRead(transaction *Txn) { - o.readWaiter.Done(transaction.snapshot.ts) + o.readVisibilityWaiter.Done(transaction.snapshot.ts) } func (o *Scheduler) DoneCommit(commitTs uint64) { - o.commitWaiter.Done(commitTs) + o.commitVisibilityWaiter.Done(commitTs) } func (o *Scheduler) hasConflictFor(txn *Txn) bool { @@ -98,7 +98,7 @@ func (o *Scheduler) hasConflictFor(txn *Txn) bool { func (o *Scheduler) gcOldReadyToCommitTxns() { updatedReadyToCommitTxns := o.readyToCommitTxns[:0] - lastCommittedTxnTs := o.readWaiter.DoneTill() + lastCommittedTxnTs := o.readVisibilityWaiter.DoneTill() for _, readyToCommitTxn := range o.readyToCommitTxns { if readyToCommitTxn.commitTs <= lastCommittedTxnTs { diff --git a/pkg/txn/d_ts_waiter.go b/pkg/txn/d_ts_waiter.go index dcf9562..ebc6b2d 100644 --- a/pkg/txn/d_ts_waiter.go +++ b/pkg/txn/d_ts_waiter.go @@ -20,16 +20,16 @@ type Event struct { } type TsWaiter struct { - eventCh chan Event - stopCh chan struct{} - txnHeap *TransactionHeap + eventCh chan Event + stopCh chan struct{} + txnTracker *TransactionTracker } func NewTsWaiter() *TsWaiter { waiter := &TsWaiter{ - eventCh: make(chan Event), - stopCh: make(chan struct{}), - txnHeap: NewTransactionHeap(), + eventCh: make(chan Event), + stopCh: make(chan struct{}), + txnTracker: NewTransactionTracker(), } go waiter.Run() return waiter @@ -64,7 +64,7 @@ func (w *TsWaiter) Stop() { } func (w *TsWaiter) DoneTill() uint64 { - return w.txnHeap.GlobalDoneTill() + return w.txnTracker.GlobalDoneTill() } func (w *TsWaiter) Run() { @@ -73,13 +73,13 @@ func (w *TsWaiter) Run() { case event := <-w.eventCh: switch event.typ { case BeginEvent: - w.txnHeap.AddBeginEvent(event.ts) - globalDoneTill := w.txnHeap.RecalculateGlobalDoneTill() - w.txnHeap.CloseWaitersUntil(globalDoneTill) + w.txnTracker.AddBeginEvent(event.ts) + globalDoneTill := w.txnTracker.RecalculateGlobalDoneTill() + w.txnTracker.CloseWaitersUntil(globalDoneTill) case DoneEvent: - w.txnHeap.AddDoneEvent(event.ts) - globalDoneTill := w.txnHeap.RecalculateGlobalDoneTill() - w.txnHeap.CloseWaitersUntil(globalDoneTill) + w.txnTracker.AddDoneEvent(event.ts) + globalDoneTill := w.txnTracker.RecalculateGlobalDoneTill() + w.txnTracker.CloseWaitersUntil(globalDoneTill) case WaitForEvent: w.processWaitEvent(event) default: @@ -96,7 +96,7 @@ func (w *TsWaiter) processWaitEvent(event Event) { if doneTill >= event.ts { close(event.waitCh) } else { - w.txnHeap.AddWaiter(event.ts, event.waitCh) + w.txnTracker.AddWaiter(event.ts, event.waitCh) } } @@ -104,10 +104,10 @@ func (w *TsWaiter) processClose() { close(w.eventCh) close(w.stopCh) - for timestamp, waiter := range w.txnHeap.waiters { + for timestamp, waiter := range w.txnTracker.waiters { for _, channel := range waiter { close(channel) } - delete(w.txnHeap.waiters, timestamp) + delete(w.txnTracker.waiters, timestamp) } } diff --git a/pkg/txn/d_ts_waiter_heap.go b/pkg/txn/d_txn_tracker.go similarity index 78% rename from pkg/txn/d_ts_waiter_heap.go rename to pkg/txn/d_txn_tracker.go index 8e78d48..32f28f8 100644 --- a/pkg/txn/d_ts_waiter_heap.go +++ b/pkg/txn/d_txn_tracker.go @@ -19,18 +19,18 @@ func (h *TsHeap) Pop() any { return x } -type TransactionHeap struct { +type TransactionTracker struct { doneTillTs atomic.Uint64 tsHeap TsHeap // min tsHeap of txn timestamps pendingTxnCounts map[uint64]int // ts -> txn count waiters map[uint64][]chan struct{} // ts -> waitChs } -func NewTransactionHeap() *TransactionHeap { +func NewTransactionTracker() *TransactionTracker { var tsHeap TsHeap heap.Init(&tsHeap) - res := TransactionHeap{ + res := TransactionTracker{ tsHeap: tsHeap, pendingTxnCounts: make(map[uint64]int), waiters: make(map[uint64][]chan struct{}), @@ -39,21 +39,21 @@ func NewTransactionHeap() *TransactionHeap { return &res } -func (h *TransactionHeap) AddBeginEvent(ts uint64) { +func (h *TransactionTracker) AddBeginEvent(ts uint64) { if _, ok := h.pendingTxnCounts[ts]; !ok { heap.Push(&h.tsHeap, ts) } h.pendingTxnCounts[ts] += 1 } -func (h *TransactionHeap) AddDoneEvent(ts uint64) { +func (h *TransactionTracker) AddDoneEvent(ts uint64) { if _, ok := h.pendingTxnCounts[ts]; !ok { heap.Push(&h.tsHeap, ts) } h.pendingTxnCounts[ts] += -1 } -func (h *TransactionHeap) AddWaiter(ts uint64, ch chan struct{}) { +func (h *TransactionTracker) AddWaiter(ts uint64, ch chan struct{}) { if _, ok := h.waiters[ts]; !ok { h.waiters[ts] = []chan struct{}{ch} } else { @@ -61,7 +61,7 @@ func (h *TransactionHeap) AddWaiter(ts uint64, ch chan struct{}) { } } -func (h *TransactionHeap) CloseWaitersUntil(utilTs uint64) { +func (h *TransactionTracker) CloseWaitersUntil(utilTs uint64) { for ts, waiter := range h.waiters { if ts <= utilTs { for _, channel := range waiter { @@ -72,11 +72,11 @@ func (h *TransactionHeap) CloseWaitersUntil(utilTs uint64) { } } -func (h *TransactionHeap) GlobalDoneTill() uint64 { +func (h *TransactionTracker) GlobalDoneTill() uint64 { return h.doneTillTs.Load() } -func (h *TransactionHeap) RecalculateGlobalDoneTill() uint64 { +func (h *TransactionTracker) RecalculateGlobalDoneTill() uint64 { doneTill := h.GlobalDoneTill() globalDoneTill := doneTill for len(h.tsHeap) > 0 { @@ -85,7 +85,7 @@ func (h *TransactionHeap) RecalculateGlobalDoneTill() uint64 { break } - // update txnHeap & pendingTxnCounts + // update txnTracker & pendingTxnCounts heap.Pop(&h.tsHeap) delete(h.pendingTxnCounts, localDoneTill)