diff --git a/cdc/sink/dmlsink/txn/event.go b/cdc/sink/dmlsink/txn/event.go
index 39bb764d3f4..0fa40471b0d 100644
--- a/cdc/sink/dmlsink/txn/event.go
+++ b/cdc/sink/dmlsink/txn/event.go
@@ -16,7 +16,6 @@ package txn
 import (
 	"encoding/binary"
 	"hash/fnv"
-	"sort"
 	"strings"
 	"time"
 
@@ -41,51 +40,17 @@ func (e *txnEvent) OnConflictResolved() {
 	e.conflictResolved = time.Now()
 }
 
-// GenSortedDedupKeysHash implements causality.txnEvent interface.
-func (e *txnEvent) GenSortedDedupKeysHash(numSlots uint64) []uint64 {
-	hashes := genTxnKeys(e.TxnCallbackableEvent.Event)
-
-	// Sort and dedup hashes.
-	// Sort hashes by `hash % numSlots` to avoid deadlock, and then dedup
-	// hashes, so the same txn will not check confict with the same hash twice to
-	// prevent potential cyclic self dependency in the causality dependency
-	// graph.
-	return sortAndDedupHashes(hashes, numSlots)
-}
-
-func sortAndDedupHashes(hashes []uint64, numSlots uint64) []uint64 {
-	if len(hashes) == 0 {
-		return nil
-	}
-
-	// Sort hashes by `hash % numSlots` to avoid deadlock.
-	sort.Slice(hashes, func(i, j int) bool { return hashes[i]%numSlots < hashes[j]%numSlots })
-
-	// Dedup hashes
-	last := hashes[0]
-	j := 1
-	for i, hash := range hashes {
-		if i == 0 {
-			// skip first one, start checking duplication from 2nd one
-			continue
-		}
-		if hash == last {
-			continue
-		}
-		last = hash
-		hashes[j] = hash
-		j++
-	}
-	hashes = hashes[:j]
-
-	return hashes
+// ConflictKeys implements causality.txnEvent interface.
+func (e *txnEvent) ConflictKeys() []uint64 {
+	return genTxnKeys(e.TxnCallbackableEvent.Event)
 }
 
-// genTxnKeys returns hash keys for `txn`.
+// genTxnKeys returns deduplicated hash keys of a transaction.
 func genTxnKeys(txn *model.SingleTableTxn) []uint64 {
 	if len(txn.Rows) == 0 {
 		return nil
 	}
+
 	hashRes := make(map[uint64]struct{}, len(txn.Rows))
 	hasher := fnv.New32a()
 	for _, row := range txn.Rows {
diff --git a/cdc/sink/dmlsink/txn/event_test.go b/cdc/sink/dmlsink/txn/event_test.go
index 1f6a029c9ff..bdbb5cd7db9 100644
--- a/cdc/sink/dmlsink/txn/event_test.go
+++ b/cdc/sink/dmlsink/txn/event_test.go
@@ -199,30 +199,3 @@ func TestGenKeys(t *testing.T) {
 		require.Equal(t, tc.expected, keys)
 	}
 }
-
-func TestSortAndDedupHash(t *testing.T) {
-	// If a transaction contains multiple rows, these rows may generate the same hash
-	// in some rare cases. We should dedup these hashes to avoid unnecessary self cyclic
-	// dependency in the causality dependency graph.
-	t.Parallel()
-	testCases := []struct {
-		hashes   []uint64
-		expected []uint64
-	}{{
-		// No duplicate hashes
-		hashes:   []uint64{1, 2, 3, 4, 5},
-		expected: []uint64{1, 2, 3, 4, 5},
-	}, {
-		// Duplicate hashes
-		hashes:   []uint64{1, 2, 3, 4, 5, 1, 2, 3, 4, 5},
-		expected: []uint64{1, 2, 3, 4, 5},
-	}, {
-		// Has hash value larger than slots count, should sort by `hash % numSlots` first.
-		hashes:   []uint64{4, 9, 9, 3},
-		expected: []uint64{9, 3, 4},
-	}}
-
-	for _, tc := range testCases {
-		require.Equal(t, tc.expected, sortAndDedupHashes(tc.hashes, 8))
-	}
-}
diff --git a/pkg/causality/conflict_detector.go b/pkg/causality/conflict_detector.go
index 79f14944027..417fda4f219 100644
--- a/pkg/causality/conflict_detector.go
+++ b/pkg/causality/conflict_detector.go
@@ -34,15 +34,14 @@ type ConflictDetector[Txn txnEvent] struct {
 	// slots are used to find all unfinished transactions
 	// conflicting with an incoming transactions.
-	slots    *internal.Slots[*internal.Node]
+	slots    *internal.Slots
 	numSlots uint64
 
-	// nextWorkerID is used to dispatch transactions round-robin.
-	nextWorkerID atomic.Int64
+	// nextCacheID is used to dispatch transactions round-robin.
+	nextCacheID atomic.Int64
 
 	// Used to run a background goroutine to GC or notify nodes.
 	notifiedNodes *chann.DrainableChann[func()]
-	garbageNodes  *chann.DrainableChann[*internal.Node]
 
 	wg      sync.WaitGroup
 	closeCh chan struct{}
 }
@@ -53,10 +52,9 @@ func NewConflictDetector[Txn txnEvent](
 ) *ConflictDetector[Txn] {
 	ret := &ConflictDetector[Txn]{
 		resolvedTxnCaches: make([]txnCache[Txn], opt.Count),
-		slots:             internal.NewSlots[*internal.Node](numSlots),
+		slots:             internal.NewSlots(numSlots),
 		numSlots:          numSlots,
 		notifiedNodes:     chann.NewAutoDrainChann[func()](),
-		garbageNodes:      chann.NewAutoDrainChann[*internal.Node](),
 		closeCh:           make(chan struct{}),
 	}
 	for i := 0; i < opt.Count; i++ {
@@ -75,28 +73,24 @@ func NewConflictDetector[Txn txnEvent](
 // Add pushes a transaction to the ConflictDetector.
 //
 // NOTE: if multiple threads access this concurrently,
-// Txn.GenSortedDedupKeysHash must be sorted by the slot index.
+// Txn.ConflictKeys must be sorted by the slot index.
 func (d *ConflictDetector[Txn]) Add(txn Txn) {
-	sortedKeysHash := txn.GenSortedDedupKeysHash(d.numSlots)
-	node := internal.NewNode(sortedKeysHash)
+	hashes := txn.ConflictKeys()
+	node := d.slots.AllocNode(hashes)
 	txnWithNotifier := TxnWithNotifier[Txn]{
 		TxnEvent: txn,
 		PostTxnExecuted: func() {
 			// After this transaction is executed, we can remove the node from the graph,
 			// and resolve related dependencies for these transacitons which depend on this
 			// executed transaction.
-			node.Remove()
-
-			// Send this node to garbageNodes to GC it from the slots if this node is still
-			// occupied related slots.
-			d.garbageNodes.In() <- node
+			d.slots.Remove(node)
 		},
 	}
 	node.TrySendToTxnCache = func(cacheID int64) bool {
 		// Try sending this txn to related cache as soon as all dependencies are resolved.
 		return d.sendToCache(txnWithNotifier, cacheID)
 	}
-	node.RandCacheID = func() int64 { return d.nextWorkerID.Add(1) % int64(len(d.resolvedTxnCaches)) }
+	node.RandCacheID = func() int64 { return d.nextCacheID.Add(1) % int64(len(d.resolvedTxnCaches)) }
 	node.OnNotified = func(callback func()) { d.notifiedNodes.In() <- callback }
 	d.slots.Add(node)
 }
@@ -110,7 +104,6 @@ func (d *ConflictDetector[Txn]) Close() {
 func (d *ConflictDetector[Txn]) runBackgroundTasks() {
 	defer func() {
 		d.notifiedNodes.CloseAndDrain()
-		d.garbageNodes.CloseAndDrain()
 	}()
 	for {
 		select {
@@ -120,10 +113,6 @@ func (d *ConflictDetector[Txn]) runBackgroundTasks() {
 			if notifyCallback != nil {
 				notifyCallback()
 			}
-		case node := <-d.garbageNodes.Out():
-			if node != nil {
-				d.slots.Free(node)
-			}
 		}
 	}
 }
diff --git a/pkg/causality/internal/main_test.go b/pkg/causality/internal/main_test.go
new file mode 100644
index 00000000000..97ba7649c89
--- /dev/null
+++ b/pkg/causality/internal/main_test.go
@@ -0,0 +1,24 @@
+// Copyright 2024 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package internal
+
+import (
+	"testing"
+
+	"github.com/pingcap/tiflow/pkg/leakutil"
+)
+
+func TestMain(m *testing.M) {
+	leakutil.SetUpLeakTest(m)
+}
diff --git a/pkg/causality/internal/node.go b/pkg/causality/internal/node.go
index a9ec62e6055..cb2c6a79730 100644
--- a/pkg/causality/internal/node.go
+++ b/pkg/causality/internal/node.go
@@ -29,8 +29,6 @@ type (
 const (
 	unassigned    = cacheID(-2)
 	assignedToAny = cacheID(-1)
-
-	invalidNodeID = int64(-1)
 )
 
 var (
@@ -60,8 +58,8 @@ type Node struct {
 
 	// Following fields are used for notifying a node's dependers lock-free.
 	totalDependencies    int32
-	resolvedDependencies int32
 	removedDependencies  int32
+	resolvedDependencies int32
 	resolvedList         []int64
 
 	// Following fields are protected by `mu`.
@@ -84,52 +82,14 @@ type Node struct {
 	dependers *btree.BTreeG[*Node]
 }
 
-// NewNode creates a new node.
-func NewNode(sortedDedupKeysHash []uint64) (ret *Node) {
-	defer func() {
-		ret.id = genNextNodeID()
-		ret.sortedDedupKeysHash = sortedDedupKeysHash
-		ret.TrySendToTxnCache = nil
-		ret.RandCacheID = nil
-		ret.totalDependencies = 0
-		ret.resolvedDependencies = 0
-		ret.removedDependencies = 0
-		ret.resolvedList = nil
-		ret.assignedTo = unassigned
-		ret.removed = false
-	}()
-
-	ret = new(Node)
-	return
-}
-
-// NodeID implements interface internal.SlotNode.
-func (n *Node) NodeID() int64 {
+func (n *Node) nodeID() int64 {
 	return n.id
 }
 
-// Hashes implements interface internal.SlotNode.
-func (n *Node) Hashes() []uint64 {
-	return n.sortedDedupKeysHash
-}
-
-// DependOn implements interface internal.SlotNode.
-func (n *Node) DependOn(dependencyNodes map[int64]*Node, noDependencyKeyCnt int) {
+func (n *Node) dependOn(dependencyNodes map[int64]*Node) {
 	resolvedDependencies := int32(0)
 
 	depend := func(target *Node) {
-		if target == nil {
-			// For a given Node, every dependency corresponds to a target.
-			// If target is nil it means the dependency doesn't conflict
-			// with any other nodes. However it's still necessary to track
-			// it because Node.tryResolve needs to counting the number of
-			// resolved dependencies.
-			resolvedDependencies = atomic.AddInt32(&n.resolvedDependencies, 1)
-			atomic.StoreInt64(&n.resolvedList[resolvedDependencies-1], assignedToAny)
-			atomic.AddInt32(&n.removedDependencies, 1)
-			return
-		}
-
 		if target.id == n.id {
 			log.Panic("node cannot depend on itself")
 		}
@@ -157,13 +117,8 @@ func (n *Node) DependOn(dependencyNodes map[int64]*Node, noDependencyKeyCnt int)
 		}
 	}
 
-	// Re-allocate ID in `DependOn` instead of creating the node, because the node can be
-	// pending in slots after it's created.
-	// ?: why gen new ID here?
-	n.id = genNextNodeID()
-
 	// `totalDependencies` and `resolvedList` must be initialized before depending on any targets.
-	n.totalDependencies = int32(len(dependencyNodes) + noDependencyKeyCnt)
+	n.totalDependencies = int32(len(dependencyNodes))
 	n.resolvedList = make([]int64, 0, n.totalDependencies)
 	for i := 0; i < int(n.totalDependencies); i++ {
 		n.resolvedList = append(n.resolvedList, unassigned)
@@ -172,15 +127,11 @@ func (n *Node) DependOn(dependencyNodes map[int64]*Node, noDependencyKeyCnt int)
 	for _, node := range dependencyNodes {
 		depend(node)
 	}
-	for i := 0; i < noDependencyKeyCnt; i++ {
-		depend(nil)
-	}
 
 	n.maybeResolve()
 }
 
-// Remove implements interface internal.SlotNode.
-func (n *Node) Remove() {
+func (n *Node) remove() {
 	n.mu.Lock()
 	defer n.mu.Unlock()
 
@@ -197,25 +148,6 @@ func (n *Node) Remove() {
 	}
 }
 
-// Free implements interface internal.SlotNode.
-// It must be called if a node is no longer used.
-// We are using sync.Pool to lessen the burden of GC.
-func (n *Node) Free() {
-	n.mu.Lock()
-	defer n.mu.Unlock()
-	if n.id == invalidNodeID {
-		log.Panic("double free")
-	}
-
-	n.id = invalidNodeID
-	n.TrySendToTxnCache = nil
-
-	// TODO: reuse node if necessary. Currently it's impossible if async-notify is used.
-	// The reason is a node can step functions `assignTo`, `Remove`, `Free`, then `assignTo`.
-	// again. In the last `assignTo`, it can never know whether the node has been reused
-	// or not.
-}
-
 // tryAssignTo assigns a node to a cache. Returns `true` on success.
 func (n *Node) tryAssignTo(cacheID int64) bool {
 	n.mu.Lock()
@@ -254,7 +186,7 @@ func (n *Node) maybeResolve() {
 		log.Panic("invalid cache ID", zap.Uint64("cacheID", uint64(cacheID)))
 	}
 
-	if cacheID >= 0 {
+	if cacheID != assignedToAny {
 		n.tryAssignTo(cacheID)
 		return
 	}
@@ -289,9 +221,6 @@ func (n *Node) tryResolve() (int64, bool) {
 	hasDiffDep := false
 	for i := 1; i < int(n.totalDependencies); i++ {
 		curr := atomic.LoadInt64(&n.resolvedList[i])
-		// Todo: simplify assign to logic, only resolve dependencies nodes after
-		// corresponding transactions are executed.
-		//
 		// In DependOn, depend(nil) set resolvedList[i] to assignedToAny
 		// for these no dependecy keys.
 		if curr == assignedToAny {
diff --git a/pkg/causality/internal/node_test.go b/pkg/causality/internal/node_test.go
index 85dfe8b7b27..10703ab37cf 100644
--- a/pkg/causality/internal/node_test.go
+++ b/pkg/causality/internal/node_test.go
@@ -19,32 +19,17 @@ import (
 	"github.com/stretchr/testify/require"
 )
 
-var _ SlotNode[*Node] = &Node{} // Asserts that *Node implements SlotNode[*Node].
-
-func newNodeForTest() *Node {
-	node := NewNode(nil)
-	node.OnNotified = func(callback func()) {
-		// run the callback immediately
-		callback()
+func newNodeForTest(hashes ...uint64) *Node {
+	return &Node{
+		id:                  genNextNodeID(),
+		sortedDedupKeysHash: sortAndDedupHashes(hashes, 8),
+		assignedTo:          unassigned,
+		RandCacheID:         func() cacheID { return 100 },
+		OnNotified: func(callback func()) {
+			// run the callback immediately
+			callback()
+		},
 	}
-	return node
-}
-
-func TestNodeFree(t *testing.T) {
-	// This case should not be run parallel to
-	// others, for fear that the use-after-free
-	// will race with newNodeForTest() in other cases.
-
-	nodeA := newNodeForTest()
-	nodeA.Free()
-
-	nodeA = newNodeForTest()
-	nodeA.Free()
-
-	// Double freeing should panic.
-	require.Panics(t, func() {
-		nodeA.Free()
-	})
 }
 
 func TestNodeEquals(t *testing.T) {
@@ -52,8 +37,8 @@ func TestNodeEquals(t *testing.T) {
 
 	nodeA := newNodeForTest()
 	nodeB := newNodeForTest()
-	require.False(t, nodeA.NodeID() == nodeB.NodeID())
-	require.True(t, nodeA.NodeID() == nodeA.NodeID())
+	require.False(t, nodeA.nodeID() == nodeB.nodeID())
+	require.True(t, nodeA.nodeID() == nodeA.nodeID())
 }
 
 func TestNodeDependOn(t *testing.T) {
@@ -63,7 +48,7 @@ func TestNodeDependOn(t *testing.T) {
 	nodeA := newNodeForTest()
 	nodeB := newNodeForTest()
 
-	nodeA.DependOn(map[int64]*Node{nodeB.NodeID(): nodeB}, 999)
+	nodeA.dependOn(map[int64]*Node{nodeB.nodeID(): nodeB})
 	require.Equal(t, nodeA.dependerCount(), 0)
 	require.Equal(t, nodeB.dependerCount(), 1)
 }
@@ -71,24 +56,22 @@ func TestNodeDependOn(t *testing.T) {
 func TestNodeSingleDependency(t *testing.T) {
 	t.Parallel()
 
-	// Node B depends on A, without any other resolved dependencies.
+	// Node B depends on A
 	nodeA := newNodeForTest()
 	nodeB := newNodeForTest()
-	nodeB.RandCacheID = func() cacheID { return 100 }
-	nodeB.DependOn(map[int64]*Node{nodeA.NodeID(): nodeA}, 0)
+	nodeB.dependOn(map[int64]*Node{nodeA.nodeID(): nodeA})
 	require.True(t, nodeA.tryAssignTo(1))
 	require.Equal(t, cacheID(1), nodeA.assignedWorkerID())
 	require.Equal(t, cacheID(1), nodeB.assignedWorkerID())
 
-	// Node D depends on C, with some other resolved dependencies.
+	// Node D depends on C
 	nodeC := newNodeForTest()
 	nodeD := newNodeForTest()
-	nodeD.RandCacheID = func() cacheID { return 100 }
-	nodeD.DependOn(map[int64]*Node{nodeA.NodeID(): nodeC}, 999)
+	nodeD.dependOn(map[int64]*Node{nodeA.nodeID(): nodeC})
 	require.True(t, nodeC.tryAssignTo(2))
 	require.Equal(t, cacheID(2), nodeC.assignedWorkerID())
-	nodeC.Remove()
-	require.Equal(t, cacheID(100), nodeD.assignedWorkerID())
+	nodeC.remove()
+	require.Equal(t, cacheID(2), nodeD.assignedWorkerID())
 }
 
 func TestNodeMultipleDependencies(t *testing.T) {
@@ -103,16 +86,15 @@ func TestNodeMultipleDependencies(t *testing.T) {
 	nodeB := newNodeForTest()
 	nodeC := newNodeForTest()
 
-	nodeC.DependOn(map[int64]*Node{nodeA.NodeID(): nodeA, nodeB.NodeID(): nodeB}, 999)
-	nodeC.RandCacheID = func() cacheID { return 100 }
+	nodeC.dependOn(map[int64]*Node{nodeA.nodeID(): nodeA, nodeB.nodeID(): nodeB})
 
 	require.True(t, nodeA.tryAssignTo(1))
 	require.True(t, nodeB.tryAssignTo(2))
 
 	require.Equal(t, unassigned, nodeC.assignedWorkerID())
 
-	nodeA.Remove()
-	nodeB.Remove()
+	nodeA.remove()
+	nodeB.remove()
 
 	require.Equal(t, int64(100), nodeC.assignedWorkerID())
 }
@@ -121,8 +103,7 @@ func TestNodeResolveImmediately(t *testing.T) {
 	t.Parallel()
 	// Node A depends on 0 unresolved dependencies and some resolved dependencies.
 	nodeA := newNodeForTest()
-	nodeA.RandCacheID = func() cacheID { return cacheID(100) }
-	nodeA.DependOn(nil, 999)
+	nodeA.dependOn(nil)
 	require.Equal(t, cacheID(100), nodeA.assignedWorkerID())
 
 	// Node D depends on B and C, all of them are assigned to 1.
 	nodeB := newNodeForTest()
 	require.True(t, nodeB.tryAssignTo(1))
 	nodeC := newNodeForTest()
 	require.True(t, nodeC.tryAssignTo(1))
 	nodeD := newNodeForTest()
-	nodeD.RandCacheID = func() cacheID { return cacheID(100) }
-	nodeD.DependOn(map[int64]*Node{nodeB.NodeID(): nodeB, nodeC.NodeID(): nodeC}, 0)
+	nodeD.dependOn(map[int64]*Node{nodeB.nodeID(): nodeB, nodeC.nodeID(): nodeC})
 	require.Equal(t, cacheID(1), nodeD.assignedWorkerID())
 
 	// Node E depends on B and C and some other resolved dependencies.
-	nodeB.Remove()
-	nodeC.Remove()
+	nodeB.remove()
+	nodeC.remove()
 	nodeE := newNodeForTest()
-	nodeE.RandCacheID = func() cacheID { return cacheID(100) }
-	nodeE.DependOn(map[int64]*Node{nodeB.NodeID(): nodeB, nodeC.NodeID(): nodeC}, 999)
+	nodeE.dependOn(map[int64]*Node{nodeB.nodeID(): nodeB, nodeC.nodeID(): nodeC})
 	require.Equal(t, cacheID(100), nodeE.assignedWorkerID())
 }
@@ -149,16 +128,18 @@ func TestNodeDependOnSelf(t *testing.T) {
 	t.Parallel()
 
 	nodeA := newNodeForTest()
 	require.Panics(t, func() {
-		nodeA.DependOn(map[int64]*Node{nodeA.NodeID(): nodeA}, 999)
+		nodeA.dependOn(map[int64]*Node{nodeA.nodeID(): nodeA})
 	})
 }
 
 func TestNodeDoubleAssigning(t *testing.T) {
 	t.Parallel()
 
-	// nodeA := newNodeForTest()
-	// require.True(t, nodeA.tryAssignTo(1))
-	// require.False(t, nodeA.tryAssignTo(2))
-
-	require.True(t, -1 == assignedToAny)
+	nodeA := newNodeForTest()
+	nodeA.TrySendToTxnCache = func(id cacheID) bool {
+		return id == 2
+	}
+	require.False(t, nodeA.tryAssignTo(1))
+	require.True(t, nodeA.tryAssignTo(2))
+	require.True(t, nodeA.tryAssignTo(2))
 }
diff --git a/pkg/causality/internal/slots.go b/pkg/causality/internal/slots.go
index 23ec22bdcf2..99e32ca4290 100644
--- a/pkg/causality/internal/slots.go
+++ b/pkg/causality/internal/slots.go
@@ -15,53 +15,52 @@ package internal
 
 import (
 	"math"
+	"sort"
 	"sync"
 )
 
-type slot[E SlotNode[E]] struct {
-	nodes map[uint64]E
+type slot struct {
+	nodes map[uint64]*Node
 	mu    sync.Mutex
 }
 
-// SlotNode describes objects that can be compared for equality.
-type SlotNode[T any] interface {
-	// NodeID tells the node's ID.
-	NodeID() int64
-	// Hashs returns the sorted and deduped hashes of the node.
-	Hashes() []uint64
-	// Construct a dependency on `others`.
-	DependOn(dependencyNodes map[int64]T, noDependencyKeyCnt int)
-	// Remove the node itself and notify all dependers.
-	Remove()
-	// Free the node itself and remove it from the graph.
-	Free()
-}
-
 // Slots implements slot-based conflict detection.
-// It holds references to E, which can be used to build
+// It holds references to Node, which can be used to build
 // a DAG of dependency.
-type Slots[E SlotNode[E]] struct {
-	slots    []slot[E]
+type Slots struct {
+	slots    []slot
 	numSlots uint64
 }
 
 // NewSlots creates a new Slots.
-func NewSlots[E SlotNode[E]](numSlots uint64) *Slots[E] {
-	slots := make([]slot[E], numSlots)
+func NewSlots(numSlots uint64) *Slots {
+	slots := make([]slot, numSlots)
 	for i := uint64(0); i < numSlots; i++ {
-		slots[i].nodes = make(map[uint64]E, 8)
+		slots[i].nodes = make(map[uint64]*Node, 8)
 	}
-	return &Slots[E]{
+	return &Slots{
 		slots:    slots,
 		numSlots: numSlots,
 	}
 }
 
+// AllocNode allocates a new node and initializes it with the given hashes.
+// TODO: reuse node if necessary. Currently it's impossible if async-notify is used.
+// The reason is a node can step functions `assignTo`, `Remove`, `free`, then `assignTo`
+// again. In the last `assignTo`, it can never know whether the node has been reused
+// or not.
+func (s *Slots) AllocNode(hashes []uint64) *Node {
+	return &Node{
+		id:                  genNextNodeID(),
+		sortedDedupKeysHash: sortAndDedupHashes(hashes, s.numSlots),
+		assignedTo:          unassigned,
+	}
+}
+
 // Add adds an elem to the slots and calls DependOn for elem.
-func (s *Slots[E]) Add(elem E) {
-	hashes := elem.Hashes()
-	dependencyNodes := make(map[int64]E, len(hashes))
-	noDependecyCnt := 0
+func (s *Slots) Add(elem *Node) {
+	hashes := elem.sortedDedupKeysHash
+	dependencyNodes := make(map[int64]*Node, len(hashes))
 
 	var lastSlot uint64 = math.MaxUint64
 	for _, hash := range hashes {
@@ -75,12 +74,10 @@ func (s *Slots[E]) Add(elem E) {
 		// If there is a node occpuied the same hash slot, we may have conflict with it.
 		// Add the conflict node to the dependencyNodes.
 		if prevNode, ok := s.slots[slotIdx].nodes[hash]; ok {
-			prevID := prevNode.NodeID()
+			prevID := prevNode.nodeID()
 			// If there are multiple hashes conflicts with the same node, we only need to
 			// depend on the node once.
 			dependencyNodes[prevID] = prevNode
-		} else {
-			noDependecyCnt += 1
 		}
 		// Add this node to the slot, make sure new coming nodes with the same hash should
 		// depend on this node.
@@ -89,7 +86,7 @@ func (s *Slots[E]) Add(elem E) {
 
 	// Construct the dependency graph based on collected `dependencyNodes` and with corresponding
 	// slots locked.
-	elem.DependOn(dependencyNodes, noDependecyCnt)
+	elem.dependOn(dependencyNodes)
 
 	// Lock those slots one by one and then unlock them one by one, so that
 	// we can avoid 2 transactions get executed interleaved.
@@ -103,23 +100,56 @@ func (s *Slots[E]) Add(elem E) {
 	}
 }
 
-// Free removes an element from the Slots.
-func (s *Slots[E]) Free(elem E) {
-	hashes := elem.Hashes()
+// Remove removes an element from the Slots.
+func (s *Slots) Remove(elem *Node) {
+	elem.remove()
+	hashes := elem.sortedDedupKeysHash
 	for _, hash := range hashes {
 		slotIdx := getSlot(hash, s.numSlots)
 		s.slots[slotIdx].mu.Lock()
 		// Remove the node from the slot.
 		// If the node is not in the slot, it means the node has been replaced by new node with the same hash,
 		// in this case we don't need to remove it from the slot.
-		if tail, ok := s.slots[slotIdx].nodes[hash]; ok && tail.NodeID() == elem.NodeID() {
+		if tail, ok := s.slots[slotIdx].nodes[hash]; ok && tail.nodeID() == elem.nodeID() {
			delete(s.slots[slotIdx].nodes, hash)
 		}
 		s.slots[slotIdx].mu.Unlock()
 	}
-	elem.Free()
 }
 
 func getSlot(hash, numSlots uint64) uint64 {
 	return hash % numSlots
 }
+
+// Sort and dedup hashes.
+// Sort hashes by `hash % numSlots` to avoid deadlock, and then dedup
+// hashes, so the same node will not check conflict with the same hash
+// twice to prevent potential cyclic self dependency in the causality
+// dependency graph.
+func sortAndDedupHashes(hashes []uint64, numSlots uint64) []uint64 {
+	if len(hashes) == 0 {
+		return nil
+	}
+
+	// Sort hashes by `hash % numSlots` to avoid deadlock.
+	sort.Slice(hashes, func(i, j int) bool { return hashes[i]%numSlots < hashes[j]%numSlots })
+
+	// Dedup hashes
+	last := hashes[0]
+	j := 1
+	for i, hash := range hashes {
+		if i == 0 {
+			// skip first one, start checking duplication from 2nd one
+			continue
+		}
+		if hash == last {
+			continue
+		}
+		last = hash
+		hashes[j] = hash
+		j++
+	}
+	hashes = hashes[:j]
+
+	return hashes
+}
diff --git a/pkg/causality/internal/slots_test.go b/pkg/causality/internal/slots_test.go
index 71c41deac0f..312523355a9 100644
--- a/pkg/causality/internal/slots_test.go
+++ b/pkg/causality/internal/slots_test.go
@@ -26,18 +26,17 @@ func TestSlotsTrivial(t *testing.T) {
 	t.Parallel()
 
 	const count = 1000
-	slots := NewSlots[*Node](8)
+	slots := NewSlots(8)
 	nodes := make([]*Node, 0, 1000)
 
 	for i := 0; i < count; i++ {
-		node := NewNode([]uint64{1, 2, 3, 4, 5})
-		node.RandCacheID = func() cacheID { return 100 }
+		node := newNodeForTest(1, 2, 3, 4, 5)
 		slots.Add(node)
 		nodes = append(nodes, node)
 	}
 
 	for i := 0; i < count; i++ {
-		slots.Free(nodes[i])
+		slots.Remove(nodes[i])
 	}
 
 	require.Equal(t, 0, len(slots.slots[1].nodes))
@@ -51,16 +50,11 @@ func TestSlotsConcurrentOps(t *testing.T) {
 	t.Parallel()
 
 	const N = 256
-	slots := NewSlots[*Node](8)
+	slots := NewSlots(8)
 	freeNodeChan := make(chan *Node, N)
 	inuseNodeChan := make(chan *Node, N)
-	newNode := func() *Node {
-		node := NewNode([]uint64{1, 9, 17, 25, 33})
-		node.RandCacheID = func() cacheID { return 100 }
-		return node
-	}
 	for i := 0; i < N; i++ {
-		freeNodeChan <- newNode()
+		freeNodeChan <- newNodeForTest(1, 9, 17, 25, 33)
 	}
 
 	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
@@ -92,11 +86,38 @@ func TestSlotsConcurrentOps(t *testing.T) {
 				return
 			case node := <-inuseNodeChan:
 				// keys belong to the same slot after hash, since slot num is 8
-				slots.Free(node)
-				freeNodeChan <- newNode()
+				slots.Remove(node)
+				freeNodeChan <- newNodeForTest(1, 9, 17, 25, 33)
 			}
 		}
 	}()
 
 	wg.Wait()
 }
+
+func TestSortAndDedupHash(t *testing.T) {
+	// If a transaction contains multiple rows, these rows may generate the same hash
+	// in some rare cases. We should dedup these hashes to avoid unnecessary self cyclic
+	// dependency in the causality dependency graph.
+	t.Parallel()
+	testCases := []struct {
+		hashes   []uint64
+		expected []uint64
+	}{{
+		// No duplicate hashes
+		hashes:   []uint64{1, 2, 3, 4, 5},
+		expected: []uint64{1, 2, 3, 4, 5},
+	}, {
+		// Duplicate hashes
+		hashes:   []uint64{1, 2, 3, 4, 5, 1, 2, 3, 4, 5},
+		expected: []uint64{1, 2, 3, 4, 5},
+	}, {
+		// Has hash value larger than slots count, should sort by `hash % numSlots` first.
+		hashes:   []uint64{4, 9, 9, 3},
+		expected: []uint64{9, 3, 4},
+	}}
+
+	for _, tc := range testCases {
+		require.Equal(t, tc.expected, sortAndDedupHashes(tc.hashes, 8))
+	}
+}
diff --git a/pkg/causality/tests/integration_test.go b/pkg/causality/tests/integration_test.go
index 35de5533d1b..99fe40f5cef 100644
--- a/pkg/causality/tests/integration_test.go
+++ b/pkg/causality/tests/integration_test.go
@@ -40,7 +40,7 @@ func TestConflictBasics(t *testing.T) {
 		numWorkers, numSlots, newUniformGenerator(workingSetSize, batchSize, numSlots),
 	).WithExecFunc(
 		func(txn *txnForTest) error {
-			for _, key := range txn.GenSortedDedupKeysHash(numSlots) {
+			for _, key := range txn.ConflictKeys() {
 				// Access a position in the array without synchronization,
 				// so that if causality check is buggy, the Go race detection would fail.
 				conflictArray[key]++
diff --git a/pkg/causality/tests/worker.go b/pkg/causality/tests/worker.go
index 50f50319262..363fce75267 100644
--- a/pkg/causality/tests/worker.go
+++ b/pkg/causality/tests/worker.go
@@ -28,7 +28,7 @@ type txnForTest struct {
 
 func (t *txnForTest) OnConflictResolved() {}
 
-func (t *txnForTest) GenSortedDedupKeysHash(numSlots uint64) []uint64 {
+func (t *txnForTest) ConflictKeys() []uint64 {
 	return t.keys
 }
 
diff --git a/pkg/causality/txn_cache.go b/pkg/causality/txn_cache.go
index be7ec1ab4d6..86d064f0100 100644
--- a/pkg/causality/txn_cache.go
+++ b/pkg/causality/txn_cache.go
@@ -1,4 +1,4 @@
-// Copyright 2022 PingCAP, Inc.
+// Copyright 2024 PingCAP, Inc.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -20,15 +20,22 @@ import (
 	"go.uber.org/zap"
 )
 
+const (
+	// BlockStrategyWaitAvailable means the cache will block until there is an available slot.
+	BlockStrategyWaitAvailable BlockStrategy = "waitAvailable"
+	// BlockStrategyWaitEmpty means the cache will block until all cached txns are consumed.
+	BlockStrategyWaitEmpty = "waitEmpty"
+	// TODO: maybe we can implement a strategy that can automatically adapt to different scenarios
+)
+
+// BlockStrategy is the strategy to handle the situation when the cache is full.
+type BlockStrategy string
+
 type txnEvent interface {
 	// OnConflictResolved is called when the event leaves ConflictDetector.
 	OnConflictResolved()
-
-	// Hashes are in range [0, math.MaxUint64) and must be deduped.
-	//
-	// NOTE: if the conflict detector is accessed by multiple threads concurrently,
-	// GenSortedDedupKeysHash must also be sorted based on `key % numSlots`.
-	GenSortedDedupKeysHash(numSlots uint64) []uint64
+	// ConflictKeys returns the keys that the event conflicts with.
+	ConflictKeys() []uint64
 }
 
 // TxnWithNotifier is a wrapper of txnEvent with a PostTxnExecuted.
@@ -129,14 +136,3 @@ func (w *boundedTxnCacheWithBlock[Txn]) add(txn TxnWithNotifier[Txn]) bool {
 func (w *boundedTxnCacheWithBlock[Txn]) out() <-chan TxnWithNotifier[Txn] {
 	return w.ch
 }
-
-// BlockStrategy is the strategy to handle the situation when the cache is full.
-type BlockStrategy string
-
-const (
-	// BlockStrategyWaitAvailable means the cache will block until there is an available slot.
-	BlockStrategyWaitAvailable BlockStrategy = "waitAvailable"
-	// BlockStrategyWaitEmpty means the cache will block until all cached txns are consumed.
-	BlockStrategyWaitEmpty = "waitAll"
-	// TODO: maybe we can implement a strategy that can automatically adapt to different scenarios
-)
diff --git a/pkg/causality/txn_cache_test.go b/pkg/causality/txn_cache_test.go
index 3ddfe4e518e..d83a937c5c2 100644
--- a/pkg/causality/txn_cache_test.go
+++ b/pkg/causality/txn_cache_test.go
@@ -1,4 +1,4 @@
-// Copyright 2022 PingCAP, Inc.
+// Copyright 2024 PingCAP, Inc.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -24,7 +24,7 @@ type mockTxnEvent struct{}
 
 func (m mockTxnEvent) OnConflictResolved() {
 }
 
-func (m mockTxnEvent) GenSortedDedupKeysHash(numSlots uint64) []uint64 {
+func (m mockTxnEvent) ConflictKeys() []uint64 {
 	return nil
 }
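
Reviewer note (illustration only, not part of the patch): the behaviour that moves from `cdc/sink/dmlsink/txn/event.go` into `pkg/causality/internal/slots.go` is easiest to see in isolation. The standalone sketch below mirrors the `sortAndDedupHashes` logic under the same assumption as the new unit test (8 slots); the `main` wrapper and the in-place dedup variant are mine, added purely for demonstration.

```go
// Illustration only: mirrors sortAndDedupHashes from
// pkg/causality/internal/slots.go so the slot ordering and deduplication
// can be exercised in isolation.
package main

import (
	"fmt"
	"sort"
)

// sortAndDedupHashes orders hashes by `hash % numSlots`, so every transaction
// acquires its slots in the same order (avoiding deadlock), and drops
// duplicated hashes, so a transaction never records a dependency on itself
// through a repeated key.
func sortAndDedupHashes(hashes []uint64, numSlots uint64) []uint64 {
	if len(hashes) == 0 {
		return nil
	}
	sort.Slice(hashes, func(i, j int) bool { return hashes[i]%numSlots < hashes[j]%numSlots })

	// Equal hashes are adjacent after sorting, so keep only the first of each run.
	deduped := hashes[:1]
	for _, hash := range hashes[1:] {
		if hash != deduped[len(deduped)-1] {
			deduped = append(deduped, hash)
		}
	}
	return deduped
}

func main() {
	// Matches the new TestSortAndDedupHash case: with 8 slots, 9 maps to
	// slot 1 and therefore sorts ahead of 3 and 4; the duplicate 9 is dropped.
	fmt.Println(sortAndDedupHashes([]uint64{4, 9, 9, 3}, 8)) // [9 3 4]
}
```

Sorting by slot index rather than by raw hash value is what lets `Slots.Add` lock the affected slots in a deterministic order, which is the deadlock-avoidance property the moved comment refers to.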