Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce claimtrie temp allocs. #51

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions blockchain/claimtrie.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func (b *BlockChain) ParseClaimScripts(block *btcutil.Block, bn *blockNode, view
ht := block.Height()

for _, tx := range block.Transactions() {
h := handler{ht, tx, view, map[string][]byte{}}
h := handler{ht, tx, view, map[change.ClaimID][]byte{}}
if err := h.handleTxIns(b.claimTrie); err != nil {
return err
}
Expand Down Expand Up @@ -67,7 +67,7 @@ type handler struct {
ht int32
tx *btcutil.Tx
view *UtxoViewpoint
spent map[string][]byte
spent map[change.ClaimID][]byte
}

func (h *handler) handleTxIns(ct *claimtrie.ClaimTrie) error {
Expand Down Expand Up @@ -171,6 +171,9 @@ func (b *BlockChain) GetClaimsForName(height int32, name string) (string, *node.

n, err := b.claimTrie.NodeAt(height, normalizedName)
if err != nil {
if n != nil {
n.Close()
}
return string(normalizedName), nil, err
}

Expand Down
4 changes: 2 additions & 2 deletions claimtrie/change/claimid.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ func NewIDFromString(s string) (id ClaimID, err error) {
}

// Key is for in-memory maps
func (id ClaimID) Key() string {
return string(id[:])
func (id ClaimID) Key() ClaimID {
return id
}

// String is for anything written to a DB
Expand Down
3 changes: 3 additions & 0 deletions claimtrie/claimtrie_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ func TestNormalizationFork(t *testing.T) {
r.NoError(err)
r.NotNil(n.BestClaim)
r.Equal(int32(1), n.TakenOverAt)
n.Close()

o8 := wire.OutPoint{Hash: hash, Index: 8}
err = ct.AddClaim([]byte("aÑEJO"), o8, change.NewClaimID(o8), 8)
Expand All @@ -150,6 +151,7 @@ func TestNormalizationFork(t *testing.T) {
n, err = ct.nodeManager.NodeAt(ct.nodeManager.Height(), []byte("test"))
r.NoError(err)
r.Equal(int64(18), n.BestClaim.Amount+n.SupportSums[n.BestClaim.ClaimID.Key()])
n.Close()
}

func TestActivationsOnNormalizationFork(t *testing.T) {
Expand Down Expand Up @@ -229,6 +231,7 @@ func verifyBestIndex(t *testing.T, ct *ClaimTrie, name string, idx uint32, claim
if claims > 0 {
r.Equal(idx, n.BestClaim.OutPoint.Index)
}
n.Close()
}

func TestRebuild(t *testing.T) {
Expand Down
4 changes: 3 additions & 1 deletion claimtrie/cmd/cmd/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,11 @@ func NewNodeDumpCommand() *cobra.Command {
}
defer repo.Close()

changes, err := repo.LoadChanges([]byte(name))
changes, closer, err := repo.LoadChanges([]byte(name))
if err != nil {
return errors.Wrapf(err, "load commands")
}
defer closer()

for _, chg := range changes {
if chg.Height > height {
Expand Down Expand Up @@ -107,6 +108,7 @@ func NewNodeReplayCommand() *cobra.Command {
}

showNode(n)
n.Close()
return nil
},
}
Expand Down
14 changes: 13 additions & 1 deletion claimtrie/node/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package node
import (
"container/list"
"sync"
"sync/atomic"

"github.com/lbryio/lbcd/claimtrie/change"
)
Expand All @@ -27,8 +28,11 @@ func (nc *Cache) insert(name []byte, n *Node, height int32) {
nc.mtx.Lock()
defer nc.mtx.Unlock()

atomic.AddInt32(&n.refcnt, 1)

existing := nc.nodes[key]
if existing != nil {
existing.node.Close()
existing.node = n
existing.height = height
existing.changes = nil
Expand All @@ -38,8 +42,11 @@ func (nc *Cache) insert(name []byte, n *Node, height int32) {

for nc.order.Len() >= nc.limit {
// TODO: maybe ensure that we don't remove nodes that have a lot of changes?
delete(nc.nodes, nc.order.Back().Value.(string))
exp := nc.order.Back().Value.(string)
expired := nc.nodes[exp]
delete(nc.nodes, exp)
nc.order.Remove(nc.order.Back())
expired.node.Close()
}

element := nc.order.PushFront(key)
Expand All @@ -55,6 +62,7 @@ func (nc *Cache) fetch(name []byte, height int32) (*Node, []change.Change, int32
existing := nc.nodes[key]
if existing != nil && existing.height <= height {
nc.order.MoveToFront(existing.element)
atomic.AddInt32(&existing.node.refcnt, 1)
return existing.node, existing.changes, existing.height
}
return nil, nil, -1
Expand Down Expand Up @@ -84,13 +92,17 @@ func (nc *Cache) drop(names [][]byte) {
// we can't roll it backwards because we don't know its previous height value; just toast it
delete(nc.nodes, key)
nc.order.Remove(existing.element)
existing.node.Close()
}
}
}

func (nc *Cache) clear() {
nc.mtx.Lock()
defer nc.mtx.Unlock()
for _, existing := range nc.nodes {
existing.node.Close()
}
nc.nodes = map[string]*cacheLeaf{}
nc.order = list.New()
// we'll let the GC sort out the remains...
Expand Down
7 changes: 7 additions & 0 deletions claimtrie/node/claim.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"strconv"
"strings"
"sync"

"github.com/lbryio/lbcd/chaincfg/chainhash"
"github.com/lbryio/lbcd/claimtrie/change"
Expand Down Expand Up @@ -32,6 +33,12 @@ type Claim struct {
Sequence int32 `msgpack:",omitempty"`
}

func newClaim() interface{} {
return &Claim{}
}

var claimPool = sync.Pool{New: newClaim}

func (c *Claim) setOutPoint(op wire.OutPoint) *Claim {
c.OutPoint = op
return c
Expand Down
1 change: 1 addition & 0 deletions claimtrie/node/hashfork_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ func (nm *HashV2Manager) computeClaimHashes(name []byte) (*chainhash.Hash, int32
if err != nil || n == nil {
return nil, 0
}
defer n.Close()

n.SortClaimsByBid()
claimHashes := make([]*chainhash.Hash, 0, len(n.Claims))
Expand Down
24 changes: 19 additions & 5 deletions claimtrie/node/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,17 +53,21 @@ func (nm *BaseManager) NodeAt(height int32, name []byte) (*Node, error) {

n, changes, oldHeight := nm.cache.fetch(name, height)
if n == nil {
changes, err := nm.repo.LoadChanges(name)
changes, closer, err := nm.repo.LoadChanges(name)
if err != nil {
return nil, errors.Wrap(err, "in load changes")
}
defer closer()

if nm.tempChanges != nil { // making an assumption that we only ever have tempChanges for a single block
changes = append(changes, nm.tempChanges[string(name)]...)
}

n, err = nm.newNodeFromChanges(changes, height)
if err != nil {
if n != nil {
n.Close()
}
return nil, errors.Wrap(err, "in new node")
}
// TODO: how can we tell what needs to be cached?
Expand All @@ -74,9 +78,14 @@ func (nm *BaseManager) NodeAt(height int32, name []byte) (*Node, error) {
if nm.tempChanges != nil { // making an assumption that we only ever have tempChanges for a single block
changes = append(changes, nm.tempChanges[string(name)]...)
}
old := n
n = n.Clone()
old.Close()
updated, err := nm.updateFromChanges(n, changes, height)
if err != nil {
if n != nil {
n.Close()
}
return nil, errors.Wrap(err, "in update from changes")
}
if !updated {
Expand Down Expand Up @@ -121,13 +130,15 @@ func (nm *BaseManager) updateFromChanges(n *Node, changes []change.Change, heigh
delay := nm.getDelayForName(n, chg)
err := n.ApplyChange(chg, delay)
if err != nil {
n.Close()
return false, errors.Wrap(err, "in apply change")
}
}

if count <= 0 {
// we applied no changes, which means we shouldn't exist if we had all the changes
// or might mean nothing significant if we are applying a partial changeset
n.Close()
return false, nil
}
lastChange := changes[count-1]
Expand Down Expand Up @@ -417,10 +428,13 @@ func (nm *BaseManager) hasChildren(name []byte, height int32, spentChildren map[
return true // children that are spent in the same block cannot count as active children
}
n, _ := nm.newNodeFromChanges(changes, height)
if n != nil && n.HasActiveBestClaim() {
c[changes[0].Name[len(name)]] = true
if len(c) >= required {
return false
if n != nil {
defer n.Close()
if n.HasActiveBestClaim() {
c[changes[0].Name[len(name)]] = true
if len(c) >= required {
return false
}
}
}
return true
Expand Down
2 changes: 2 additions & 0 deletions claimtrie/node/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ func TestNodeSort(t *testing.T) {
r.True(OutPointLess(*out1, *out3))

n := New()
defer n.Close()
n.Claims = append(n.Claims, &Claim{OutPoint: *out1, AcceptedAt: 3, Amount: 3, ClaimID: change.ClaimID{1}})
n.Claims = append(n.Claims, &Claim{OutPoint: *out2, AcceptedAt: 3, Amount: 3, ClaimID: change.ClaimID{2}})
n.handleExpiredAndActivated(3)
Expand All @@ -167,6 +168,7 @@ func TestClaimSort(t *testing.T) {
param.ActiveParams.ExtendedClaimExpirationTime = 1000

n := New()
defer n.Close()
n.Claims = append(n.Claims, &Claim{OutPoint: *out2, AcceptedAt: 3, Amount: 3, ClaimID: change.ClaimID{2}, Status: Activated})
n.Claims = append(n.Claims, &Claim{OutPoint: *out3, AcceptedAt: 3, Amount: 2, ClaimID: change.ClaimID{3}, Status: Activated})
n.Claims = append(n.Claims, &Claim{OutPoint: *out3, AcceptedAt: 4, Amount: 2, ClaimID: change.ClaimID{4}, Status: Activated})
Expand Down
Loading