Skip to content

Commit

Permalink
feat: make ChainHead api local
Browse files Browse the repository at this point in the history
  • Loading branch information
hunjixin committed Feb 1, 2023
1 parent 5cd13d3 commit 4ef5149
Show file tree
Hide file tree
Showing 8 changed files with 47 additions and 48 deletions.
4 changes: 1 addition & 3 deletions api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,6 @@ type Proxy interface {
// between the two objects.
ChainStatObj(ctx context.Context, obj cid.Cid, base cid.Cid) (api.ObjStat, error) //perm:read

// ChainHead returns the current head of the chain.
ChainHead(context.Context) (*types.TipSet, error) //perm:read

// ChainGetBlock returns the block specified by the given CID.
ChainGetBlock(context.Context, cid.Cid) (*types.BlockHeader, error) //perm:read

Expand Down Expand Up @@ -385,6 +382,7 @@ type Local interface {
// ChainNotify returns channel with chain head updates.
// First message is guaranteed to be of len == 1, and type == 'current'.
ChainNotify(context.Context) (<-chan []*api.HeadChange, error)
ChainHead(context.Context) (*types.TipSet, error)
}

// UnSupport is a subset of api.FullNode
Expand Down
10 changes: 10 additions & 0 deletions co/chain_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"time"

"github.com/filecoin-project/lotus/chain/types"

"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/store"
)
Expand Down Expand Up @@ -73,3 +75,11 @@ func (c *Coordinator) ChainNotify(ctx context.Context) (<-chan []*api.HeadChange

return out, nil
}

// ChainHead impls api.FullNode.ChainNotify
func (c *Coordinator) ChainHead(in0 context.Context) (*types.TipSet, error) {
c.headMu.RLock()
defer c.headMu.RUnlock()

return c.head, nil
}
2 changes: 0 additions & 2 deletions co/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,7 @@ func (c *Coordinator) handleCandidate(hc *headCandidate) {
preAddrs := c.sel.getAddrOfPriority(CatchUpPriority)
c.sel.setPriority(DelayPriority, preAddrs...)
c.sel.setPriority(CatchUpPriority, addr)

c.tspub.Pub(headChanges, tipsetChangeTopic)

return
}

Expand Down
22 changes: 15 additions & 7 deletions co/ctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,8 @@ import (

// NewCtx constructs a Ctx instance
func NewCtx(mctx helpers.MetricsCtx, lc fx.Lifecycle, nodeOpt NodeOption) (*Ctx, error) {
bcache, err := newBlockHeaderCache(1 << 20)
if err != nil {
return nil, err
}

return &Ctx{
lc: helpers.LifecycleCtx(mctx, lc),
bcache: bcache,
headCh: make(chan *headCandidate, 256),
errNodeCh: make(chan string, 256),
nodeOpt: nodeOpt,
Expand All @@ -31,7 +25,6 @@ func NewCtx(mctx helpers.MetricsCtx, lc fx.Lifecycle, nodeOpt NodeOption) (*Ctx,
// Ctx contains the shared components between different modules
type Ctx struct {
lc context.Context
bcache *blockHeaderCache
headCh chan *headCandidate
errNodeCh chan string

Expand Down Expand Up @@ -77,3 +70,18 @@ func (bc *blockHeaderCache) load(c cid.Cid) (*types.BlockHeader, bool) {
blk, ok := val.(*types.BlockHeader)
return blk, ok
}

func (bc *blockHeaderCache) has(c cid.Cid) bool {
_, ok := bc.cache.Peek(c)
return ok
}

func (bc *blockHeaderCache) hasKey(key types.TipSetKey) bool {
for _, blkCid := range key.Cids() {
has := bc.has(blkCid)
if !has {
return false
}
}
return true
}
34 changes: 9 additions & 25 deletions co/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ import (
"sync"
"time"

lru "github.com/hashicorp/golang-lru"

"github.com/filecoin-project/lotus/api/v1api"
"github.com/ipfs-force-community/venus-common-utils/apiinfo"

Expand Down Expand Up @@ -73,8 +71,9 @@ type Node struct {
closer jsonrpc.ClientCloser
}

blkCache *lru.ARCCache
log *zap.SugaredLogger
blkCache *blockHeaderCache

log *zap.SugaredLogger
}

func NewNode(cctx *Ctx, info NodeInfo) (*Node, error) {
Expand All @@ -83,11 +82,12 @@ func NewNode(cctx *Ctx, info NodeInfo) (*Node, error) {
return nil, err
}
ctx, cancel := context.WithCancel(cctx.lc)
blkCache, err := lru.NewARC(100)
blkCache, err := newBlockHeaderCache(1 << 20)
if err != nil {
cancel()
return nil, err
}

return &Node{
reListenInterval: cctx.nodeOpt.ReListenMinInterval,
opt: cctx.nodeOpt,
Expand Down Expand Up @@ -211,7 +211,7 @@ func (n *Node) reListen() (<-chan []*api.HeadChange, error) {
}

func (n *Node) applyChanges(lifeCtx context.Context, changes []*api.HeadChange) {
n.sctx.bcache.add(changes)
n.blkCache.add(changes)

idx := -1
for i := range changes {
Expand All @@ -220,7 +220,6 @@ func (n *Node) applyChanges(lifeCtx context.Context, changes []*api.HeadChange)
idx = i
case store.HCApply:
idx = i
n.storeKey(changes[i].Val.Key())
}
}

Expand Down Expand Up @@ -292,31 +291,16 @@ func (n *Node) loadTipSet(ctx context.Context, tsk types.TipSetKey) (*types.TipS
}

func (n *Node) loadBlockHeader(ctx context.Context, c cid.Cid) (*types.BlockHeader, error) {
if blk, ok := n.sctx.bcache.load(c); ok {
if blk, ok := n.blkCache.load(c); ok {
return blk, nil
}

blk, err := n.upstream.full.ChainGetBlock(ctx, c)
return blk, err
}

func (n *Node) hasKey(key types.TipSetKey) bool {
for _, blkCid := range key.Cids() {
_, has := n.blkCache.Peek(blkCid.String())
if !has {
return false
}
}
return true
}

func (n *Node) storeKey(key types.TipSetKey) {
for _, blkCid := range key.Cids() {
_, has := n.blkCache.Peek(blkCid.String())
if !has {
n.blkCache.Add(blkCid.String(), nil)
}
}
func (n *Node) hasTipset(key types.TipSetKey) bool {
return n.blkCache.hasKey(key)
}

const (
Expand Down
5 changes: 3 additions & 2 deletions co/selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,9 @@ func (s *Selector) Select(tsk types.TipSetKey) (*Node, error) {

for addr, p := range s.priority {
node := s.nodeProvider.GetNode(addr)
if !node.hasKey(tsk) {
log.Warnf("node %s not contains key %s", addr, tsk)
if !tsk.IsEmpty() && node.hasTipset(tsk) && p != ErrPriority {
log.Debugf("node %s has tipset %s, change to catchup node", addr, tsk.Cids())
catchUpQue[addr] = s.weight[addr]
continue
}
if p == CatchUpPriority {
Expand Down
9 changes: 9 additions & 0 deletions proxy/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,15 @@ type Local struct {
}

// impl api.Local
func (p *Local) ChainHead(in0 context.Context) (out0 *types.TipSet, err error) {
cli, err := p.Select(types.EmptyTSK)
if err != nil {
err = fmt.Errorf("api ChainHead %v", err)
return
}
return cli.ChainHead(in0)
}

func (p *Local) ChainNotify(in0 context.Context) (out0 <-chan []*api1.HeadChange, err error) {
cli, err := p.Select(types.EmptyTSK)
if err != nil {
Expand Down
9 changes: 0 additions & 9 deletions proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,15 +140,6 @@ func (p *Proxy) ChainHasObj(in0 context.Context, in1 cid.Cid) (out0 bool, err er
return cli.ChainHasObj(in0, in1)
}

func (p *Proxy) ChainHead(in0 context.Context) (out0 *types.TipSet, err error) {
cli, err := p.Select(types.EmptyTSK)
if err != nil {
err = fmt.Errorf("api ChainHead %v", err)
return
}
return cli.ChainHead(in0)
}

func (p *Proxy) ChainReadObj(in0 context.Context, in1 cid.Cid) (out0 []uint8, err error) {
cli, err := p.Select(types.EmptyTSK)
if err != nil {
Expand Down

0 comments on commit 4ef5149

Please sign in to comment.