Skip to content

Commit

Permalink
feat: check paths completed for all retrieval types
Browse files Browse the repository at this point in the history
  • Loading branch information
rvagg committed Aug 23, 2023
1 parent c40fe59 commit 88ccb73
Show file tree
Hide file tree
Showing 17 changed files with 388 additions and 241 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
github.com/benbjohnson/clock v1.3.5
github.com/cespare/xxhash/v2 v2.2.0
github.com/dustin/go-humanize v1.0.1
github.com/filecoin-project/go-data-transfer/v2 v2.0.0-rc7
github.com/filecoin-project/go-data-transfer/v2 v2.0.0-rc7.0.20230818040822-432a10ec7e4a
github.com/filecoin-project/go-retrieval-types v1.2.0
github.com/filecoin-project/go-state-types v0.10.0
github.com/google/uuid v1.3.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,8 @@ github.com/filecoin-project/go-commp-utils v0.1.3/go.mod h1:3ENlD1pZySaUout0p9AN
github.com/filecoin-project/go-commp-utils/nonffi v0.0.0-20220905160352-62059082a837/go.mod h1:e2YBjSblNVoBckkbv3PPqsq71q98oFkFqL7s1etViGo=
github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03 h1:2pMXdBnCiXjfCYx/hLqFxccPoqsSveQFxVLvNxy9bus=
github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03/go.mod h1:+viYnvGtUTgJRdy6oaeF4MTFKAfatX071MPDPBL11EQ=
github.com/filecoin-project/go-data-transfer/v2 v2.0.0-rc7 h1:v+zJS5B6pA3ptWZS4t8tbt1Hz9qENnN4nVr1w99aSWc=
github.com/filecoin-project/go-data-transfer/v2 v2.0.0-rc7/go.mod h1:V3Y4KbttaCwyg1gwkP7iai8CbQx4mZUGjd3h9GZWLKE=
github.com/filecoin-project/go-data-transfer/v2 v2.0.0-rc7.0.20230818040822-432a10ec7e4a h1:v6ZyQK5U965p+6QHjbA2QFt9SE4kgQiA+qR2uSCVXI4=
github.com/filecoin-project/go-data-transfer/v2 v2.0.0-rc7.0.20230818040822-432a10ec7e4a/go.mod h1:P0/wHIz9WdKSrLg+D9Ue9bc09iAF8lDW8EXDYRmUP00=
github.com/filecoin-project/go-ds-versioning v0.1.2 h1:to4pTadv3IeV1wvgbCbN6Vqd+fu+7tveXgv/rCEZy6w=
github.com/filecoin-project/go-ds-versioning v0.1.2/go.mod h1:C9/l9PnB1+mwPa26BBVpCjG/XQCB0yj/q5CK2J8X1I4=
github.com/filecoin-project/go-fil-commcid v0.0.0-20200716160307-8f644712406f/go.mod h1:Eaox7Hvus1JgPrL5+M3+h7aSPHc0cVqpSxA+TxIEpZQ=
Expand Down
4 changes: 3 additions & 1 deletion pkg/internal/itest/client_retrieval_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,12 @@ import (
retrievaltypes "github.com/filecoin-project/go-retrieval-types"
"github.com/filecoin-project/go-state-types/big"
"github.com/filecoin-project/lassie/pkg/internal/itest/mocknet"
"github.com/filecoin-project/lassie/pkg/net/client"
"github.com/filecoin-project/lassie/pkg/retriever/graphsync/client"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
dss "github.com/ipfs/go-datastore/sync"
"github.com/ipfs/go-graphsync"
"github.com/ipfs/go-graphsync/storeutil"
bstore "github.com/ipfs/go-ipfs-blockstore"
"github.com/ipfs/go-unixfsnode"
Expand Down Expand Up @@ -129,6 +130,7 @@ func runRetrieval(t *testing.T, ctx context.Context, mrn *mocknet.MockRetrievalN
proposal,
selectorparse.CommonSelector_ExploreAllRecursively,
0,
func(rp graphsync.ResponseProgress) {},
subscriberLocal,
shutdown,
)
Expand Down
89 changes: 67 additions & 22 deletions pkg/internal/itest/http_fetch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,21 +64,20 @@ func TestHttpFetch(t *testing.T) {
wrapPath := "/want2/want1/want0"

testCases := []struct {
name string
graphsyncRemotes int
bitswapRemotes int
httpRemotes int
disableGraphsync bool
expectFail bool
expectUncleanEnd bool
expectUnauthorized bool
modifyHttpConfig func(httpserver.HttpServerConfig) httpserver.HttpServerConfig
generate func(*testing.T, io.Reader, []testpeer.TestPeer) []unixfs.DirEntry
paths []string
setHeader headerSetter
modifyQueries []queryModifier
validateBodies []bodyValidator
lassieOpts lassieOptsGen
name string
graphsyncRemotes int
bitswapRemotes int
httpRemotes int
disableGraphsync bool
expectStatusCode int
expectUncleanEnd bool
modifyHttpConfig func(httpserver.HttpServerConfig) httpserver.HttpServerConfig
generate func(*testing.T, io.Reader, []testpeer.TestPeer) []unixfs.DirEntry
paths []string
setHeader headerSetter
modifyQueries []queryModifier
validateBodies []bodyValidator
lassieOpts lassieOptsGen
}{
{
name: "graphsync large sharded file",
Expand Down Expand Up @@ -690,6 +689,54 @@ func TestHttpFetch(t *testing.T) {
unixfs.CompareDirEntries(t, srcData.Children[1].Children[1].Children[1], gotDir)
}},
},
{
name: "graphsync nested file, with path plus extra, unclean end (path unfulfilled)",
graphsyncRemotes: 1,
generate: func(t *testing.T, rndReader io.Reader, remotes []testpeer.TestPeer) []unixfs.DirEntry {
lsys := remotes[0].LinkSystem
return []unixfs.DirEntry{unixfs.WrapContent(t, rndReader, lsys, unixfs.GenerateFile(t, remotes[0].LinkSystem, rndReader, 1024), wrapPath, false)}
},
paths: []string{wrapPath + "/more/not/here"},
modifyQueries: []queryModifier{entityQuery},
validateBodies: []bodyValidator{func(t *testing.T, srcData unixfs.DirEntry, body []byte) {
// no validation, Go's body parser will fail on the unclean end and we're unlikely
// to have enough content quick enough to parse before it encounters the unclean end
// and returns nothing
}},
expectUncleanEnd: true,
},
{
name: "bitswap nested file, with path plus extra, unclean end (path unfulfilled)",
bitswapRemotes: 1,
generate: func(t *testing.T, rndReader io.Reader, remotes []testpeer.TestPeer) []unixfs.DirEntry {
lsys := remotes[0].LinkSystem
return []unixfs.DirEntry{unixfs.WrapContent(t, rndReader, lsys, unixfs.GenerateFile(t, remotes[0].LinkSystem, rndReader, 1024), wrapPath, false)}
},
paths: []string{wrapPath + "/more/not/here"},
modifyQueries: []queryModifier{entityQuery},
validateBodies: []bodyValidator{func(t *testing.T, srcData unixfs.DirEntry, body []byte) {
// no validation, Go's body parser will fail on the unclean end and we're unlikely
// to have enough content quick enough to parse before it encounters the unclean end
// and returns nothing
}},
expectUncleanEnd: true,
},
{
name: "http nested file, with path plus extra, unclean end (path unfulfilled)",
httpRemotes: 1,
generate: func(t *testing.T, rndReader io.Reader, remotes []testpeer.TestPeer) []unixfs.DirEntry {
lsys := remotes[0].LinkSystem
return []unixfs.DirEntry{unixfs.WrapContent(t, rndReader, lsys, unixfs.GenerateFile(t, remotes[0].LinkSystem, rndReader, 1024), wrapPath, false)}
},
paths: []string{wrapPath + "/more/not/here"},
modifyQueries: []queryModifier{entityQuery},
validateBodies: []bodyValidator{func(t *testing.T, srcData unixfs.DirEntry, body []byte) {
// no validation, Go's body parser will fail on the unclean end and we're unlikely
// to have enough content quick enough to parse before it encounters the unclean end
// and returns nothing
}},
expectUncleanEnd: true,
},
{
// A very contrived example - we spread the content generated for this test across 4 peers,
// then we also make sure the root is in all of them, so the CandidateFinder will return them
Expand Down Expand Up @@ -765,7 +812,7 @@ func TestHttpFetch(t *testing.T) {
name: "two separate, parallel graphsync retrievals, with graphsync disabled",
graphsyncRemotes: 2,
disableGraphsync: true,
expectFail: true,
expectStatusCode: http.StatusGatewayTimeout,
generate: func(t *testing.T, rndReader io.Reader, remotes []testpeer.TestPeer) []unixfs.DirEntry {
return []unixfs.DirEntry{
unixfs.GenerateFile(t, remotes[0].LinkSystem, rndReader, 4<<20),
Expand Down Expand Up @@ -929,7 +976,7 @@ func TestHttpFetch(t *testing.T) {
cfg.AccessToken = "super-secret"
return cfg
},
expectUnauthorized: true,
expectStatusCode: http.StatusUnauthorized,
},
{
name: "with access token - allows requests with authorization header",
Expand All @@ -945,7 +992,7 @@ func TestHttpFetch(t *testing.T) {
header.Set("Authorization", "Bearer super-secret")
header.Add("Accept", "application/vnd.ipld.car")
},
expectUnauthorized: false,
expectStatusCode: http.StatusOK, // i.e. not StatusUnauthorized
},
}

Expand Down Expand Up @@ -1059,10 +1106,8 @@ func TestHttpFetch(t *testing.T) {
}

for i, resp := range responses {
if testCase.expectFail {
req.Equal(http.StatusGatewayTimeout, resp.StatusCode)
} else if testCase.expectUnauthorized {
req.Equal(http.StatusUnauthorized, resp.StatusCode)
if testCase.expectStatusCode != 0 && testCase.expectStatusCode != http.StatusOK {
req.Equal(testCase.expectStatusCode, resp.StatusCode)
} else {
if resp.StatusCode != http.StatusOK {
body, err := io.ReadAll(resp.Body)
Expand Down
14 changes: 11 additions & 3 deletions pkg/internal/testutil/mockclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
retrievaltypes "github.com/filecoin-project/go-retrieval-types"
"github.com/filecoin-project/lassie/pkg/types"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-graphsync"
"github.com/ipld/go-ipld-prime"
"github.com/ipld/go-ipld-prime/datamodel"
"github.com/libp2p/go-libp2p/core/peer"
Expand All @@ -24,9 +25,10 @@ type DelayedConnectReturn struct {
}

type DelayedClientReturn struct {
ResultStats *types.RetrievalStats
ResultErr error
Delay time.Duration
ResultStats *types.RetrievalStats
ProgressPaths []string
ResultErr error
Delay time.Duration
}

type ClientRetrievalRequest struct {
Expand Down Expand Up @@ -182,6 +184,7 @@ func (mc *MockClient) RetrieveFromPeer(
proposal *retrievaltypes.DealProposal,
selector ipld.Node,
maxBlocks uint64,
progressCallback func(graphsync.ResponseProgress),
eventsCallback datatransfer.Subscriber,
gracefulShutdownRequested <-chan struct{},
) (*types.RetrievalStats, error) {
Expand Down Expand Up @@ -210,6 +213,11 @@ func (mc *MockClient) RetrieveFromPeer(
return nil, context.Canceled
case <-timer.C:
}
for _, path := range drr.ProgressPaths {
progressCallback(graphsync.ResponseProgress{
Path: datamodel.ParsePath(path),
})
}
eventsCallback(datatransfer.Event{Code: datatransfer.Open}, nil)
if drr.ResultStats != nil {
acceptedResponse := &retrievaltypes.DealResponse{
Expand Down
2 changes: 1 addition & 1 deletion pkg/lassie/lassie.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ import (
"time"

"github.com/filecoin-project/lassie/pkg/indexerlookup"
"github.com/filecoin-project/lassie/pkg/net/client"
"github.com/filecoin-project/lassie/pkg/net/host"
"github.com/filecoin-project/lassie/pkg/retriever"
"github.com/filecoin-project/lassie/pkg/retriever/graphsync/client"
"github.com/filecoin-project/lassie/pkg/session"
"github.com/filecoin-project/lassie/pkg/types"
"github.com/ipfs/go-datastore"
Expand Down
90 changes: 8 additions & 82 deletions pkg/retriever/bitswapretriever.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"errors"
"fmt"
"io"
"math"
"sync/atomic"
"time"

Expand All @@ -15,19 +14,15 @@ import (
"github.com/filecoin-project/lassie/pkg/events"
"github.com/filecoin-project/lassie/pkg/retriever/bitswaphelpers"
"github.com/filecoin-project/lassie/pkg/types"
"github.com/filecoin-project/lassie/pkg/verifiedcar"
"github.com/ipfs/boxo/bitswap/client"
"github.com/ipfs/boxo/bitswap/network"
"github.com/ipfs/boxo/blockservice"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-unixfsnode"
dagpb "github.com/ipld/go-codec-dagpb"
"github.com/ipld/go-ipld-prime/datamodel"
"github.com/ipld/go-ipld-prime/linking"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/ipld/go-ipld-prime/linking/preload"
"github.com/ipld/go-ipld-prime/node/basicnode"
"github.com/ipld/go-ipld-prime/traversal"
"github.com/ipld/go-ipld-prime/traversal/selector"
"github.com/ipni/go-libipni/metadata"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
Expand Down Expand Up @@ -226,15 +221,13 @@ func (br *bitswapRetrieval) RetrieveFromAsyncCandidates(ayncCandidates types.Inb
traversalLinkSys.StorageReadOpener = loader
}

// run the retrieval
err = easyTraverse(
ctx,
cidlink.Link{Cid: br.request.Cid},
selector,
traversalLinkSys,
preloader,
br.request.MaxBlocks,
)
err = verifiedcar.Config{
Root: br.request.Cid,
Selector: selector,
ExpectPath: datamodel.ParsePath(br.request.Path),
MaxBlocks: br.request.MaxBlocks,
}.Traverse(ctx, traversalLinkSys, preloader)

if storage != nil {
storage.Stop()
}
Expand Down Expand Up @@ -302,70 +295,3 @@ func loaderForSession(retrievalID types.RetrievalID, inProgressCids InProgressCi
return bytes.NewReader(blk.RawData()), nil
}
}

func easyTraverse(
ctx context.Context,
root datamodel.Link,
traverseSelector datamodel.Node,
lsys linking.LinkSystem,
preloader preload.Loader,
maxBlocks uint64,
) error {

lsys, ecr := newErrorCapturingReader(lsys)
protoChooser := dagpb.AddSupportToChooser(basicnode.Chooser)

// retrieve first node
prototype, err := protoChooser(root, linking.LinkContext{Ctx: ctx})
if err != nil {
return err
}
node, err := lsys.Load(linking.LinkContext{Ctx: ctx}, root, prototype)
if err != nil {
return err
}

progress := traversal.Progress{
Cfg: &traversal.Config{
Ctx: ctx,
LinkSystem: lsys,
LinkTargetNodePrototypeChooser: protoChooser,
Preloader: preloader,
},
}
if maxBlocks > 0 {
progress.Budget = &traversal.Budget{
LinkBudget: int64(maxBlocks) - 1, // first block is already loaded
NodeBudget: math.MaxInt64,
}
}
progress.LastBlock.Link = root
compiledSelector, err := selector.ParseSelector(traverseSelector)
if err != nil {
return err
}

if err := progress.WalkMatching(node, compiledSelector, unixfsnode.BytesConsumingMatcher); err != nil {
return err
}
return ecr.Error
}

type errorCapturingReader struct {
sro linking.BlockReadOpener
Error error
}

func newErrorCapturingReader(lsys linking.LinkSystem) (linking.LinkSystem, *errorCapturingReader) {
ecr := &errorCapturingReader{sro: lsys.StorageReadOpener}
lsys.StorageReadOpener = ecr.StorageReadOpener
return lsys, ecr
}

func (ecr *errorCapturingReader) StorageReadOpener(lc linking.LinkContext, l datamodel.Link) (io.Reader, error) {
r, err := ecr.sro(lc, l)
if err != nil {
ecr.Error = err
}
return r, err
}
Loading

0 comments on commit 88ccb73

Please sign in to comment.