Skip to content

Commit

Permalink
Minor renames
Browse files Browse the repository at this point in the history
  • Loading branch information
arjunsk committed Jan 9, 2024
1 parent 8938893 commit c46a336
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 41 deletions.
30 changes: 15 additions & 15 deletions pkg/txn/c_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,37 +9,37 @@ type Scheduler struct {
sync.Mutex
nextTs uint64

readWaiter *TsWaiter
commitWaiter *TsWaiter
readVisibilityWaiter *TsWaiter
commitVisibilityWaiter *TsWaiter

readyToCommitTxns []ReadyToCommitTxn
}

func NewScheduler() *Scheduler {
scheduler := &Scheduler{
nextTs: 1,
readWaiter: NewTsWaiter(),
commitWaiter: NewTsWaiter(),
nextTs: 1,
readVisibilityWaiter: NewTsWaiter(),
commitVisibilityWaiter: NewTsWaiter(),
}

scheduler.readWaiter.Done(scheduler.nextTs - 1)
scheduler.commitWaiter.Done(scheduler.nextTs - 1)
scheduler.readVisibilityWaiter.Done(scheduler.nextTs - 1)
scheduler.commitVisibilityWaiter.Done(scheduler.nextTs - 1)
return scheduler
}

func (o *Scheduler) Stop() {
o.readWaiter.Stop()
o.commitWaiter.Stop()
o.readVisibilityWaiter.Stop()
o.commitVisibilityWaiter.Stop()
}

func (o *Scheduler) NewReadTs() uint64 {
o.Lock()
defer o.Unlock()

beginTimestamp := o.nextTs - 1
o.readWaiter.Begin(beginTimestamp)
o.readVisibilityWaiter.Begin(beginTimestamp)

err := o.commitWaiter.WaitFor(context.Background(), beginTimestamp)
err := o.commitVisibilityWaiter.WaitFor(context.Background(), beginTimestamp)
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -68,16 +68,16 @@ func (o *Scheduler) NewCommitTs(transaction *Txn) (uint64, error) {
o.nextTs = o.nextTs + 1

o.addReadyToCommitTxn(transaction, commitTs)
o.commitWaiter.Begin(commitTs)
o.commitVisibilityWaiter.Begin(commitTs)
return commitTs, nil
}

func (o *Scheduler) DoneRead(transaction *Txn) {
o.readWaiter.Done(transaction.snapshot.ts)
o.readVisibilityWaiter.Done(transaction.snapshot.ts)
}

func (o *Scheduler) DoneCommit(commitTs uint64) {
o.commitWaiter.Done(commitTs)
o.commitVisibilityWaiter.Done(commitTs)
}

func (o *Scheduler) hasConflictFor(txn *Txn) bool {
Expand All @@ -98,7 +98,7 @@ func (o *Scheduler) hasConflictFor(txn *Txn) bool {

func (o *Scheduler) gcOldReadyToCommitTxns() {
updatedReadyToCommitTxns := o.readyToCommitTxns[:0]
lastCommittedTxnTs := o.readWaiter.DoneTill()
lastCommittedTxnTs := o.readVisibilityWaiter.DoneTill()

for _, readyToCommitTxn := range o.readyToCommitTxns {
if readyToCommitTxn.commitTs <= lastCommittedTxnTs {
Expand Down
32 changes: 16 additions & 16 deletions pkg/txn/d_ts_waiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,16 @@ type Event struct {
}

type TsWaiter struct {
eventCh chan Event
stopCh chan struct{}
txnHeap *TransactionHeap
eventCh chan Event
stopCh chan struct{}
txnTracker *TransactionTracker
}

func NewTsWaiter() *TsWaiter {
waiter := &TsWaiter{
eventCh: make(chan Event),
stopCh: make(chan struct{}),
txnHeap: NewTransactionHeap(),
eventCh: make(chan Event),
stopCh: make(chan struct{}),
txnTracker: NewTransactionTracker(),
}
go waiter.Run()
return waiter
Expand Down Expand Up @@ -64,7 +64,7 @@ func (w *TsWaiter) Stop() {
}

func (w *TsWaiter) DoneTill() uint64 {
return w.txnHeap.GlobalDoneTill()
return w.txnTracker.GlobalDoneTill()
}

func (w *TsWaiter) Run() {
Expand All @@ -73,13 +73,13 @@ func (w *TsWaiter) Run() {
case event := <-w.eventCh:
switch event.typ {
case BeginEvent:
w.txnHeap.AddBeginEvent(event.ts)
globalDoneTill := w.txnHeap.RecalculateGlobalDoneTill()
w.txnHeap.CloseWaitersUntil(globalDoneTill)
w.txnTracker.AddBeginEvent(event.ts)
globalDoneTill := w.txnTracker.RecalculateGlobalDoneTill()
w.txnTracker.CloseWaitersUntil(globalDoneTill)
case DoneEvent:
w.txnHeap.AddDoneEvent(event.ts)
globalDoneTill := w.txnHeap.RecalculateGlobalDoneTill()
w.txnHeap.CloseWaitersUntil(globalDoneTill)
w.txnTracker.AddDoneEvent(event.ts)
globalDoneTill := w.txnTracker.RecalculateGlobalDoneTill()
w.txnTracker.CloseWaitersUntil(globalDoneTill)
case WaitForEvent:
w.processWaitEvent(event)
default:
Expand All @@ -96,18 +96,18 @@ func (w *TsWaiter) processWaitEvent(event Event) {
if doneTill >= event.ts {
close(event.waitCh)
} else {
w.txnHeap.AddWaiter(event.ts, event.waitCh)
w.txnTracker.AddWaiter(event.ts, event.waitCh)
}
}

func (w *TsWaiter) processClose() {
close(w.eventCh)
close(w.stopCh)

for timestamp, waiter := range w.txnHeap.waiters {
for timestamp, waiter := range w.txnTracker.waiters {
for _, channel := range waiter {
close(channel)
}
delete(w.txnHeap.waiters, timestamp)
delete(w.txnTracker.waiters, timestamp)
}
}
20 changes: 10 additions & 10 deletions pkg/txn/d_ts_waiter_heap.go → pkg/txn/d_txn_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,18 @@ func (h *TsHeap) Pop() any {
return x
}

type TransactionHeap struct {
type TransactionTracker struct {
doneTillTs atomic.Uint64
tsHeap TsHeap // min tsHeap of txn timestamps
pendingTxnCounts map[uint64]int // ts -> txn count
waiters map[uint64][]chan struct{} // ts -> waitChs
}

func NewTransactionHeap() *TransactionHeap {
func NewTransactionTracker() *TransactionTracker {
var tsHeap TsHeap
heap.Init(&tsHeap)

res := TransactionHeap{
res := TransactionTracker{
tsHeap: tsHeap,
pendingTxnCounts: make(map[uint64]int),
waiters: make(map[uint64][]chan struct{}),
Expand All @@ -39,29 +39,29 @@ func NewTransactionHeap() *TransactionHeap {
return &res
}

func (h *TransactionHeap) AddBeginEvent(ts uint64) {
func (h *TransactionTracker) AddBeginEvent(ts uint64) {
if _, ok := h.pendingTxnCounts[ts]; !ok {
heap.Push(&h.tsHeap, ts)
}
h.pendingTxnCounts[ts] += 1
}

func (h *TransactionHeap) AddDoneEvent(ts uint64) {
func (h *TransactionTracker) AddDoneEvent(ts uint64) {
if _, ok := h.pendingTxnCounts[ts]; !ok {
heap.Push(&h.tsHeap, ts)
}
h.pendingTxnCounts[ts] += -1
}

func (h *TransactionHeap) AddWaiter(ts uint64, ch chan struct{}) {
func (h *TransactionTracker) AddWaiter(ts uint64, ch chan struct{}) {
if _, ok := h.waiters[ts]; !ok {
h.waiters[ts] = []chan struct{}{ch}
} else {
h.waiters[ts] = append(h.waiters[ts], ch)
}
}

func (h *TransactionHeap) CloseWaitersUntil(utilTs uint64) {
func (h *TransactionTracker) CloseWaitersUntil(utilTs uint64) {
for ts, waiter := range h.waiters {
if ts <= utilTs {
for _, channel := range waiter {
Expand All @@ -72,11 +72,11 @@ func (h *TransactionHeap) CloseWaitersUntil(utilTs uint64) {
}
}

func (h *TransactionHeap) GlobalDoneTill() uint64 {
func (h *TransactionTracker) GlobalDoneTill() uint64 {
return h.doneTillTs.Load()
}

func (h *TransactionHeap) RecalculateGlobalDoneTill() uint64 {
func (h *TransactionTracker) RecalculateGlobalDoneTill() uint64 {
doneTill := h.GlobalDoneTill()
globalDoneTill := doneTill
for len(h.tsHeap) > 0 {
Expand All @@ -85,7 +85,7 @@ func (h *TransactionHeap) RecalculateGlobalDoneTill() uint64 {
break
}

// update txnHeap & pendingTxnCounts
// update txnTracker & pendingTxnCounts
heap.Pop(&h.tsHeap)
delete(h.pendingTxnCounts, localDoneTill)

Expand Down

0 comments on commit c46a336

Please sign in to comment.