Skip to content

Commit

Permalink
pkg/causality(cdc): refactor and simplify conflict detector (#10954)
Browse files Browse the repository at this point in the history
close #10973
  • Loading branch information
CharlesCheung96 authored May 15, 2024
1 parent 7531086 commit aed7768
Show file tree
Hide file tree
Showing 12 changed files with 197 additions and 289 deletions.
45 changes: 5 additions & 40 deletions cdc/sink/dmlsink/txn/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package txn
import (
"encoding/binary"
"hash/fnv"
"sort"
"strings"
"time"

Expand All @@ -41,51 +40,17 @@ func (e *txnEvent) OnConflictResolved() {
e.conflictResolved = time.Now()
}

// GenSortedDedupKeysHash implements causality.txnEvent interface.
func (e *txnEvent) GenSortedDedupKeysHash(numSlots uint64) []uint64 {
hashes := genTxnKeys(e.TxnCallbackableEvent.Event)

// Sort and dedup hashes.
// Sort hashes by `hash % numSlots` to avoid deadlock, and then dedup
// hashes, so the same txn will not check confict with the same hash twice to
// prevent potential cyclic self dependency in the causality dependency
// graph.
return sortAndDedupHashes(hashes, numSlots)
}

func sortAndDedupHashes(hashes []uint64, numSlots uint64) []uint64 {
if len(hashes) == 0 {
return nil
}

// Sort hashes by `hash % numSlots` to avoid deadlock.
sort.Slice(hashes, func(i, j int) bool { return hashes[i]%numSlots < hashes[j]%numSlots })

// Dedup hashes
last := hashes[0]
j := 1
for i, hash := range hashes {
if i == 0 {
// skip first one, start checking duplication from 2nd one
continue
}
if hash == last {
continue
}
last = hash
hashes[j] = hash
j++
}
hashes = hashes[:j]

return hashes
// ConflictKeys implements causality.txnEvent interface.
func (e *txnEvent) ConflictKeys() []uint64 {
return genTxnKeys(e.TxnCallbackableEvent.Event)
}

// genTxnKeys returns hash keys for `txn`.
// genTxnKeys returns deduplicated hash keys of a transaction.
func genTxnKeys(txn *model.SingleTableTxn) []uint64 {
if len(txn.Rows) == 0 {
return nil
}

hashRes := make(map[uint64]struct{}, len(txn.Rows))
hasher := fnv.New32a()
for _, row := range txn.Rows {
Expand Down
27 changes: 0 additions & 27 deletions cdc/sink/dmlsink/txn/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,30 +199,3 @@ func TestGenKeys(t *testing.T) {
require.Equal(t, tc.expected, keys)
}
}

func TestSortAndDedupHash(t *testing.T) {
// If a transaction contains multiple rows, these rows may generate the same hash
// in some rare cases. We should dedup these hashes to avoid unnecessary self cyclic
// dependency in the causality dependency graph.
t.Parallel()
testCases := []struct {
hashes []uint64
expected []uint64
}{{
// No duplicate hashes
hashes: []uint64{1, 2, 3, 4, 5},
expected: []uint64{1, 2, 3, 4, 5},
}, {
// Duplicate hashes
hashes: []uint64{1, 2, 3, 4, 5, 1, 2, 3, 4, 5},
expected: []uint64{1, 2, 3, 4, 5},
}, {
// Has hash value larger than slots count, should sort by `hash % numSlots` first.
hashes: []uint64{4, 9, 9, 3},
expected: []uint64{9, 3, 4},
}}

for _, tc := range testCases {
require.Equal(t, tc.expected, sortAndDedupHashes(tc.hashes, 8))
}
}
29 changes: 9 additions & 20 deletions pkg/causality/conflict_detector.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,14 @@ type ConflictDetector[Txn txnEvent] struct {

// slots are used to find all unfinished transactions
// conflicting with an incoming transactions.
slots *internal.Slots[*internal.Node]
slots *internal.Slots
numSlots uint64

// nextWorkerID is used to dispatch transactions round-robin.
nextWorkerID atomic.Int64
// nextCacheID is used to dispatch transactions round-robin.
nextCacheID atomic.Int64

// Used to run a background goroutine to GC or notify nodes.
notifiedNodes *chann.DrainableChann[func()]
garbageNodes *chann.DrainableChann[*internal.Node]
wg sync.WaitGroup
closeCh chan struct{}
}
Expand All @@ -53,10 +52,9 @@ func NewConflictDetector[Txn txnEvent](
) *ConflictDetector[Txn] {
ret := &ConflictDetector[Txn]{
resolvedTxnCaches: make([]txnCache[Txn], opt.Count),
slots: internal.NewSlots[*internal.Node](numSlots),
slots: internal.NewSlots(numSlots),
numSlots: numSlots,
notifiedNodes: chann.NewAutoDrainChann[func()](),
garbageNodes: chann.NewAutoDrainChann[*internal.Node](),
closeCh: make(chan struct{}),
}
for i := 0; i < opt.Count; i++ {
Expand All @@ -75,28 +73,24 @@ func NewConflictDetector[Txn txnEvent](
// Add pushes a transaction to the ConflictDetector.
//
// NOTE: if multiple threads access this concurrently,
// Txn.GenSortedDedupKeysHash must be sorted by the slot index.
// Txn.ConflictKeys must be sorted by the slot index.
func (d *ConflictDetector[Txn]) Add(txn Txn) {
sortedKeysHash := txn.GenSortedDedupKeysHash(d.numSlots)
node := internal.NewNode(sortedKeysHash)
hashes := txn.ConflictKeys()
node := d.slots.AllocNode(hashes)
txnWithNotifier := TxnWithNotifier[Txn]{
TxnEvent: txn,
PostTxnExecuted: func() {
// After this transaction is executed, we can remove the node from the graph,
// and resolve related dependencies for these transacitons which depend on this
// executed transaction.
node.Remove()

// Send this node to garbageNodes to GC it from the slots if this node is still
// occupied related slots.
d.garbageNodes.In() <- node
d.slots.Remove(node)
},
}
node.TrySendToTxnCache = func(cacheID int64) bool {
// Try sending this txn to related cache as soon as all dependencies are resolved.
return d.sendToCache(txnWithNotifier, cacheID)
}
node.RandCacheID = func() int64 { return d.nextWorkerID.Add(1) % int64(len(d.resolvedTxnCaches)) }
node.RandCacheID = func() int64 { return d.nextCacheID.Add(1) % int64(len(d.resolvedTxnCaches)) }
node.OnNotified = func(callback func()) { d.notifiedNodes.In() <- callback }
d.slots.Add(node)
}
Expand All @@ -110,7 +104,6 @@ func (d *ConflictDetector[Txn]) Close() {
func (d *ConflictDetector[Txn]) runBackgroundTasks() {
defer func() {
d.notifiedNodes.CloseAndDrain()
d.garbageNodes.CloseAndDrain()
}()
for {
select {
Expand All @@ -120,10 +113,6 @@ func (d *ConflictDetector[Txn]) runBackgroundTasks() {
if notifyCallback != nil {
notifyCallback()
}
case node := <-d.garbageNodes.Out():
if node != nil {
d.slots.Free(node)
}
}
}
}
Expand Down
24 changes: 24 additions & 0 deletions pkg/causality/internal/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package internal

import (
"testing"

"github.com/pingcap/tiflow/pkg/leakutil"
)

func TestMain(m *testing.M) {
leakutil.SetUpLeakTest(m)
}
83 changes: 6 additions & 77 deletions pkg/causality/internal/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ type (
const (
unassigned = cacheID(-2)
assignedToAny = cacheID(-1)

invalidNodeID = int64(-1)
)

var (
Expand Down Expand Up @@ -60,8 +58,8 @@ type Node struct {

// Following fields are used for notifying a node's dependers lock-free.
totalDependencies int32
resolvedDependencies int32
removedDependencies int32
resolvedDependencies int32
resolvedList []int64

// Following fields are protected by `mu`.
Expand All @@ -84,52 +82,14 @@ type Node struct {
dependers *btree.BTreeG[*Node]
}

// NewNode creates a new node.
func NewNode(sortedDedupKeysHash []uint64) (ret *Node) {
defer func() {
ret.id = genNextNodeID()
ret.sortedDedupKeysHash = sortedDedupKeysHash
ret.TrySendToTxnCache = nil
ret.RandCacheID = nil
ret.totalDependencies = 0
ret.resolvedDependencies = 0
ret.removedDependencies = 0
ret.resolvedList = nil
ret.assignedTo = unassigned
ret.removed = false
}()

ret = new(Node)
return
}

// NodeID implements interface internal.SlotNode.
func (n *Node) NodeID() int64 {
func (n *Node) nodeID() int64 {
return n.id
}

// Hashes implements interface internal.SlotNode.
func (n *Node) Hashes() []uint64 {
return n.sortedDedupKeysHash
}

// DependOn implements interface internal.SlotNode.
func (n *Node) DependOn(dependencyNodes map[int64]*Node, noDependencyKeyCnt int) {
func (n *Node) dependOn(dependencyNodes map[int64]*Node) {
resolvedDependencies := int32(0)

depend := func(target *Node) {
if target == nil {
// For a given Node, every dependency corresponds to a target.
// If target is nil it means the dependency doesn't conflict
// with any other nodes. However it's still necessary to track
// it because Node.tryResolve needs to counting the number of
// resolved dependencies.
resolvedDependencies = atomic.AddInt32(&n.resolvedDependencies, 1)
atomic.StoreInt64(&n.resolvedList[resolvedDependencies-1], assignedToAny)
atomic.AddInt32(&n.removedDependencies, 1)
return
}

if target.id == n.id {
log.Panic("node cannot depend on itself")
}
Expand Down Expand Up @@ -157,13 +117,8 @@ func (n *Node) DependOn(dependencyNodes map[int64]*Node, noDependencyKeyCnt int)
}
}

// Re-allocate ID in `DependOn` instead of creating the node, because the node can be
// pending in slots after it's created.
// ?: why gen new ID here?
n.id = genNextNodeID()

// `totalDependencies` and `resolvedList` must be initialized before depending on any targets.
n.totalDependencies = int32(len(dependencyNodes) + noDependencyKeyCnt)
n.totalDependencies = int32(len(dependencyNodes))
n.resolvedList = make([]int64, 0, n.totalDependencies)
for i := 0; i < int(n.totalDependencies); i++ {
n.resolvedList = append(n.resolvedList, unassigned)
Expand All @@ -172,15 +127,11 @@ func (n *Node) DependOn(dependencyNodes map[int64]*Node, noDependencyKeyCnt int)
for _, node := range dependencyNodes {
depend(node)
}
for i := 0; i < noDependencyKeyCnt; i++ {
depend(nil)
}

n.maybeResolve()
}

// Remove implements interface internal.SlotNode.
func (n *Node) Remove() {
func (n *Node) remove() {
n.mu.Lock()
defer n.mu.Unlock()

Expand All @@ -197,25 +148,6 @@ func (n *Node) Remove() {
}
}

// Free implements interface internal.SlotNode.
// It must be called if a node is no longer used.
// We are using sync.Pool to lessen the burden of GC.
func (n *Node) Free() {
n.mu.Lock()
defer n.mu.Unlock()
if n.id == invalidNodeID {
log.Panic("double free")
}

n.id = invalidNodeID
n.TrySendToTxnCache = nil

// TODO: reuse node if necessary. Currently it's impossible if async-notify is used.
// The reason is a node can step functions `assignTo`, `Remove`, `Free`, then `assignTo`.
// again. In the last `assignTo`, it can never know whether the node has been reused
// or not.
}

// tryAssignTo assigns a node to a cache. Returns `true` on success.
func (n *Node) tryAssignTo(cacheID int64) bool {
n.mu.Lock()
Expand Down Expand Up @@ -254,7 +186,7 @@ func (n *Node) maybeResolve() {
log.Panic("invalid cache ID", zap.Uint64("cacheID", uint64(cacheID)))
}

if cacheID >= 0 {
if cacheID != assignedToAny {
n.tryAssignTo(cacheID)
return
}
Expand Down Expand Up @@ -289,9 +221,6 @@ func (n *Node) tryResolve() (int64, bool) {
hasDiffDep := false
for i := 1; i < int(n.totalDependencies); i++ {
curr := atomic.LoadInt64(&n.resolvedList[i])
// Todo: simplify assign to logic, only resolve dependencies nodes after
// corresponding transactions are executed.
//
// In DependOn, depend(nil) set resolvedList[i] to assignedToAny
// for these no dependecy keys.
if curr == assignedToAny {
Expand Down
Loading

0 comments on commit aed7768

Please sign in to comment.