From 3232f7db405859d1466b1b6389323e275a82c1f4 Mon Sep 17 00:00:00 2001 From: Gaius Date: Wed, 26 Feb 2025 15:45:37 +0800 Subject: [PATCH] feat: support to set piece length for preheat (#3848) Signed-off-by: Gaius --- .github/workflows/compatibility-e2e-v2.yml | 8 +- client-rs | 2 +- go.mod | 2 +- go.sum | 4 +- internal/job/types.go | 1 + manager/job/preheat.go | 2 + manager/job/task.go | 4 +- manager/types/job.go | 18 +++++ pkg/idgen/task_id.go | 10 ++- pkg/idgen/task_id_test.go | 25 +++++- scheduler/job/job.go | 7 +- scheduler/resource/standard/task.go | 10 --- scheduler/resource/standard/task_test.go | 25 +++--- .../evaluator/evaluator_base_test.go | 76 +++++++++---------- scheduler/scheduling/scheduling.go | 1 - scheduler/scheduling/scheduling_test.go | 47 ++++++------ scheduler/service/service_v1.go | 2 + scheduler/service/service_v1_test.go | 71 +++++++++-------- scheduler/service/service_v2.go | 4 +- scheduler/service/service_v2_test.go | 38 +++++----- test/e2e/v2/util/file.go | 2 +- test/e2e/v2/util/task.go | 9 +++ .../testdata/charts/config-v2-rate-limit.yaml | 4 + test/testdata/charts/config-v2.yaml | 4 + test/testdata/charts/config.yaml | 5 +- 25 files changed, 218 insertions(+), 163 deletions(-) diff --git a/.github/workflows/compatibility-e2e-v2.yml b/.github/workflows/compatibility-e2e-v2.yml index 4b7afbdd7ea..8c884ccedb4 100644 --- a/.github/workflows/compatibility-e2e-v2.yml +++ b/.github/workflows/compatibility-e2e-v2.yml @@ -31,22 +31,22 @@ jobs: include: - module: manager image: manager - image-tag: v2.2.1-rc.0 + image-tag: v2.2.1-rc.2 chart-name: manager skip: "Rate Limit" - module: scheduler image: scheduler - image-tag: v2.2.1-rc.0 + image-tag: v2.2.1-rc.2 chart-name: scheduler skip: "Rate Limit" - module: client image: client - image-tag: v0.2.11 + image-tag: v0.2.14 chart-name: client skip: "Rate Limit" - module: seed-client image: client - image-tag: v0.2.11 + image-tag: v0.2.14 chart-name: seed-client skip: "Rate Limit" diff --git a/client-rs b/client-rs index 3339d5ca2fe..5e659a210b5 160000 --- a/client-rs +++ b/client-rs @@ -1 +1 @@ -Subproject commit 3339d5ca2feac41a63f0472811cb38205e5fbaf2 +Subproject commit 5e659a210b5a2e33bca46858bf1bb528a3f45546 diff --git a/go.mod b/go.mod index 5cca3a6152c..45f32b240bd 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.23.0 require ( cloud.google.com/go/storage v1.50.0 - d7y.io/api/v2 v2.1.27 + d7y.io/api/v2 v2.1.30 github.com/MysteriousPotato/go-lockable v1.0.0 github.com/RichardKnop/machinery v1.10.8 github.com/Showmax/go-fqdn v1.0.0 diff --git a/go.sum b/go.sum index 8f9529fe1df..6da8d0602e7 100644 --- a/go.sum +++ b/go.sum @@ -63,8 +63,8 @@ cloud.google.com/go/storage v1.50.0 h1:3TbVkzTooBvnZsk7WaAQfOsNrdoM8QHusXA1cpk6Q cloud.google.com/go/storage v1.50.0/go.mod h1:l7XeiD//vx5lfqE3RavfmU9yvk5Pp0Zhcv482poyafY= cloud.google.com/go/trace v1.11.2 h1:4ZmaBdL8Ng/ajrgKqY5jfvzqMXbrDcBsUGXOT9aqTtI= cloud.google.com/go/trace v1.11.2/go.mod h1:bn7OwXd4pd5rFuAnTrzBuoZ4ax2XQeG3qNgYmfCy0Io= -d7y.io/api/v2 v2.1.27 h1:bEiAnNNs944DrV1rZP4m1BR9vh8KkBKLwlyVv+kOQoQ= -d7y.io/api/v2 v2.1.27/go.mod h1:RFsSiRgEBfMrSHlrZWbkJ3xil8fnbbGSj0UMwbUUC5I= +d7y.io/api/v2 v2.1.30 h1:KCHcg2oWX8jSFwJXuxgmNCPlPx29/2tcMXHT7wVKqWo= +d7y.io/api/v2 v2.1.30/go.mod h1:RFsSiRgEBfMrSHlrZWbkJ3xil8fnbbGSj0UMwbUUC5I= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= github.com/Azure/azure-sdk-for-go v16.2.1+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc= github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U= diff --git a/internal/job/types.go b/internal/job/types.go index d54f9a20eb5..d1111f882c1 100644 --- a/internal/job/types.go +++ b/internal/job/types.go @@ -23,6 +23,7 @@ import ( // PreheatRequest defines the request parameters for preheating. type PreheatRequest struct { URL string `json:"url" validate:"required,url"` + PieceLength *uint64 `json:"pieceLength" binding:"omitempty,gte=4194304"` Tag string `json:"tag" validate:"omitempty"` FilteredQueryParams string `json:"filtered_query_params" validate:"omitempty"` Headers map[string]string `json:"headers" validate:"omitempty"` diff --git a/manager/job/preheat.go b/manager/job/preheat.go index 50c61cfd241..e5d08ef9793 100644 --- a/manager/job/preheat.go +++ b/manager/job/preheat.go @@ -128,6 +128,7 @@ func (p *preheat) CreatePreheat(ctx context.Context, schedulers []models.Schedul files = []internaljob.PreheatRequest{ { URL: json.URL, + PieceLength: json.PieceLength, Tag: json.Tag, FilteredQueryParams: json.FilteredQueryParams, Headers: json.Headers, @@ -349,6 +350,7 @@ func (p *preheat) parseLayers(manifests []distribution.Manifest, args types.Preh header.Set("Accept", v.MediaType) layer := internaljob.PreheatRequest{ URL: image.blobsURL(v.Digest.String()), + PieceLength: args.PieceLength, Tag: args.Tag, FilteredQueryParams: args.FilteredQueryParams, Headers: nethttp.HeaderToMap(header), diff --git a/manager/job/task.go b/manager/job/task.go index de078f50511..2974893f49a 100644 --- a/manager/job/task.go +++ b/manager/job/task.go @@ -63,7 +63,7 @@ func (t *task) CreateGetTask(ctx context.Context, schedulers []models.Scheduler, taskID := json.TaskID if json.URL != "" { - taskID = idgen.TaskIDV2(json.URL, json.Tag, json.Application, idgen.ParseFilteredQueryParams(json.FilteredQueryParams)) + taskID = idgen.TaskIDV2(json.URL, json.PieceLength, json.Tag, json.Application, idgen.ParseFilteredQueryParams(json.FilteredQueryParams)) } args, err := internaljob.MarshalRequest(internaljob.GetTaskRequest{ @@ -121,7 +121,7 @@ func (t *task) CreateDeleteTask(ctx context.Context, schedulers []models.Schedul taskID := json.TaskID if json.URL != "" { - taskID = idgen.TaskIDV2(json.URL, json.Tag, json.Application, idgen.ParseFilteredQueryParams(json.FilteredQueryParams)) + taskID = idgen.TaskIDV2(json.URL, json.PieceLength, json.Tag, json.Application, idgen.ParseFilteredQueryParams(json.FilteredQueryParams)) } args, err := internaljob.MarshalRequest(internaljob.DeleteTaskRequest{ diff --git a/manager/types/job.go b/manager/types/job.go index f9421196feb..fa982b738d3 100644 --- a/manager/types/job.go +++ b/manager/types/job.go @@ -111,6 +111,12 @@ type PreheatArgs struct { // URL is the image url for preheating. URL string `json:"url" binding:"required"` + // PieceLength is the piece length(bytes) for downloading file. The value needs to + // be greater than or equal to 4194304, for example: 4194304(4mib), 8388608(8mib). + // If the piece length is not specified, the piece length will be calculated + // according to the file size. + PieceLength *uint64 `json:"piece_length" binding:"omitempty,gte=4194304"` + // Tag is the tag for preheating. Tag string `json:"tag" binding:"omitempty"` @@ -177,6 +183,12 @@ type GetTaskArgs struct { // URL is the download url of the task. URL string `json:"url" binding:"omitempty"` + // PieceLength is the piece length(bytes) for downloading file. The value needs to + // be greater than or equal to 4194304, for example: 4194304(4mib), 8388608(8mib). + // If the piece length is not specified, the piece length will be calculated + // according to the file size. + PieceLength *uint64 `json:"piece_length" binding:"omitempty,gte=4194304"` + // Tag is the tag of the task. Tag string `json:"tag" binding:"omitempty"` @@ -211,6 +223,12 @@ type DeleteTaskArgs struct { // URL is the download url of the task. URL string `json:"url" binding:"omitempty"` + // PieceLength is the piece length(bytes) for downloading file. The value needs to + // be greater than or equal to 4194304, for example: 4194304(4mib), 8388608(8mib). + // If the piece length is not specified, the piece length will be calculated + // according to the file size. + PieceLength *uint64 `json:"piece_length" binding:"omitempty,gte=4194304"` + // Tag is the tag of the task. Tag string `json:"tag" binding:"omitempty"` diff --git a/pkg/idgen/task_id.go b/pkg/idgen/task_id.go index 44f82847296..39e6a48999a 100644 --- a/pkg/idgen/task_id.go +++ b/pkg/idgen/task_id.go @@ -17,6 +17,7 @@ package idgen import ( + "strconv" "strings" commonv1 "d7y.io/api/v2/pkg/apis/common/v1" @@ -91,11 +92,16 @@ func ParseFilteredQueryParams(rawFilteredQueryParams string) []string { } // TaskIDV2 generates v2 version of task id. -func TaskIDV2(url, tag, application string, filteredQueryParams []string) string { +func TaskIDV2(url string, pieceLength *uint64, tag, application string, filteredQueryParams []string) string { url, err := neturl.FilterQueryParams(url, filteredQueryParams) if err != nil { url = "" } - return pkgdigest.SHA256FromStrings(url, tag, application) + params := []string{url, tag, application} + if pieceLength != nil { + params = append(params, strconv.FormatUint(*pieceLength, 10)) + } + + return pkgdigest.SHA256FromStrings(params...) } diff --git a/pkg/idgen/task_id_test.go b/pkg/idgen/task_id_test.go index 6b7e95f9a26..bef59ee4afb 100644 --- a/pkg/idgen/task_id_test.go +++ b/pkg/idgen/task_id_test.go @@ -107,9 +107,12 @@ func TestTaskIDV1(t *testing.T) { } func TestTaskIDV2(t *testing.T) { + pieceLength := uint64(1024) + tests := []struct { name string url string + pieceLength *uint64 tag string application string filters []string @@ -118,9 +121,20 @@ func TestTaskIDV2(t *testing.T) { { name: "generate taskID", url: "https://example.com", + pieceLength: &pieceLength, tag: "foo", application: "bar", filters: []string{}, + expect: func(t *testing.T, d any) { + assert := assert.New(t) + assert.Equal(d, "99a47b38e9d3321aebebd715bea0483c1400cef2f767f84d97458f9dcedff221") + }, + }, + { + name: "generate taskID with tag and application", + url: "https://example.com", + tag: "foo", + application: "bar", expect: func(t *testing.T, d any) { assert := assert.New(t) assert.Equal(d, "160fa7f001d9d2e893130894fbb60a5fb006e1d61bff82955f2946582bc9de1d") @@ -144,6 +158,15 @@ func TestTaskIDV2(t *testing.T) { assert.Equal(d, "63dee2822037636b0109876b58e95692233840753a882afa69b9b5ee82a6c57d") }, }, + { + name: "generate taskID with pieceLength", + url: "https://example.com", + pieceLength: &pieceLength, + expect: func(t *testing.T, d any) { + assert := assert.New(t) + assert.Equal(d, "40c21de3ad2f1470ca1a19a2ad2577803a1829851f6cf862ffa2d4577ae51d38") + }, + }, { name: "generate taskID with filters", url: "https://example.com?foo=foo&bar=bar", @@ -157,7 +180,7 @@ func TestTaskIDV2(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - tc.expect(t, TaskIDV2(tc.url, tc.tag, tc.application, tc.filters)) + tc.expect(t, TaskIDV2(tc.url, tc.pieceLength, tc.tag, tc.application, tc.filters)) }) } } diff --git a/scheduler/job/job.go b/scheduler/job/job.go index b6e370a9231..55c67260cf3 100644 --- a/scheduler/job/job.go +++ b/scheduler/job/job.go @@ -170,9 +170,9 @@ func (j *job) preheat(ctx context.Context, data string) (string, error) { return "", err } - taskID := idgen.TaskIDV2(req.URL, req.Tag, req.Application, strings.Split(req.FilteredQueryParams, idgen.FilteredQueryParamsSeparator)) + taskID := idgen.TaskIDV2(req.URL, req.PieceLength, req.Tag, req.Application, strings.Split(req.FilteredQueryParams, idgen.FilteredQueryParamsSeparator)) log := logger.WithTask(taskID, req.URL) - log.Infof("preheat %s request: %#v", req.URL, req) + log.Infof("preheat %s %d request: %#v", req.URL, req.PieceLength, req) ctx, cancel := context.WithTimeout(ctx, req.Timeout) defer cancel() @@ -299,6 +299,7 @@ func (j *job) preheatAllSeedPeers(ctx context.Context, taskID string, req *inter taskID, &dfdaemonv2.DownloadTaskRequest{Download: &commonv2.Download{ Url: req.URL, + PieceLength: req.PieceLength, Type: commonv2.TaskType_STANDARD, Tag: &req.Tag, Application: &req.Application, @@ -440,6 +441,7 @@ func (j *job) preheatAllPeers(ctx context.Context, taskID string, req *internalj taskID, &dfdaemonv2.DownloadTaskRequest{Download: &commonv2.Download{ Url: req.URL, + PieceLength: req.PieceLength, Type: commonv2.TaskType_STANDARD, Tag: &req.Tag, Application: &req.Application, @@ -583,6 +585,7 @@ func (j *job) preheatV2(ctx context.Context, taskID string, req *internaljob.Pre stream, err := j.resource.SeedPeer().Client().DownloadTask(ctx, taskID, &dfdaemonv2.DownloadTaskRequest{ Download: &commonv2.Download{ Url: req.URL, + PieceLength: req.PieceLength, Type: commonv2.TaskType_STANDARD, Tag: &req.Tag, Application: &req.Application, diff --git a/scheduler/resource/standard/task.go b/scheduler/resource/standard/task.go index 7ccdcf2e8ca..0ce3e719fc2 100644 --- a/scheduler/resource/standard/task.go +++ b/scheduler/resource/standard/task.go @@ -87,13 +87,6 @@ const ( // TaskOption is a functional option for task. type TaskOption func(task *Task) -// WithPieceLength set PieceLength for task. -func WithPieceLength(pieceLength int32) TaskOption { - return func(t *Task) { - t.PieceLength = pieceLength - } -} - // WithDigest set Digest for task. func WithDigest(d *digest.Digest) TaskOption { return func(t *Task) { @@ -127,9 +120,6 @@ type Task struct { // Task request headers. Header map[string]string - // Task piece length. - PieceLength int32 - // DirectPiece is tiny piece data. DirectPiece []byte diff --git a/scheduler/resource/standard/task_test.go b/scheduler/resource/standard/task_test.go index 0547d5c435a..d2a44f93c26 100644 --- a/scheduler/resource/standard/task_test.go +++ b/scheduler/resource/standard/task_test.go @@ -47,16 +47,16 @@ var ( CreatedAt: time.Now(), } - mockTaskBackToSourceLimit int32 = 200 - mockTaskURL = "http://example.com/foo" - mockTaskID = idgen.TaskIDV2(mockTaskURL, mockTaskTag, mockTaskApplication, mockTaskFilteredQueryParams) - mockTaskDigest = digest.New(digest.AlgorithmSHA256, "c71d239df91726fc519c6eb72d318ec65820627232b2f796219e87dcf35d0ab4") - mockTaskTag = "d7y" - mockTaskApplication = "foo" - mockTaskFilteredQueryParams = []string{"bar"} - mockTaskHeader = map[string]string{"content-length": "100"} - mockTaskPieceLength int32 = 2048 - mockPieceDigest = digest.New(digest.AlgorithmMD5, "ad83a945518a4ef007d8b2db2ef165b3") + mockTaskBackToSourceLimit int32 = 200 + mockTaskURL = "http://example.com/foo" + mockTaskPieceLength uint64 = 2048 + mockTaskID = idgen.TaskIDV2(mockTaskURL, &mockTaskPieceLength, mockTaskTag, mockTaskApplication, mockTaskFilteredQueryParams) + mockTaskDigest = digest.New(digest.AlgorithmSHA256, "c71d239df91726fc519c6eb72d318ec65820627232b2f796219e87dcf35d0ab4") + mockTaskTag = "d7y" + mockTaskApplication = "foo" + mockTaskFilteredQueryParams = []string{"bar"} + mockTaskHeader = map[string]string{"content-length": "100"} + mockPieceDigest = digest.New(digest.AlgorithmMD5, "ad83a945518a4ef007d8b2db2ef165b3") ) func TestTask_NewTask(t *testing.T) { @@ -78,7 +78,6 @@ func TestTask_NewTask(t *testing.T) { assert.Equal(task.Application, mockTaskApplication) assert.EqualValues(task.FilteredQueryParams, mockTaskFilteredQueryParams) assert.EqualValues(task.Header, mockTaskHeader) - assert.Equal(task.PieceLength, int32(0)) assert.Empty(task.DirectPiece) assert.Equal(task.ContentLength.Load(), int64(-1)) assert.Equal(task.TotalPieceCount.Load(), int32(0)) @@ -94,7 +93,7 @@ func TestTask_NewTask(t *testing.T) { }, { name: "new task with piece length", - options: []TaskOption{WithPieceLength(mockTaskPieceLength)}, + options: []TaskOption{}, expect: func(t *testing.T, task *Task) { assert := assert.New(t) assert.Equal(task.ID, mockTaskID) @@ -105,7 +104,6 @@ func TestTask_NewTask(t *testing.T) { assert.Equal(task.Application, mockTaskApplication) assert.EqualValues(task.FilteredQueryParams, mockTaskFilteredQueryParams) assert.EqualValues(task.Header, mockTaskHeader) - assert.Equal(task.PieceLength, mockTaskPieceLength) assert.Empty(task.DirectPiece) assert.Equal(task.ContentLength.Load(), int64(-1)) assert.Equal(task.TotalPieceCount.Load(), int32(0)) @@ -132,7 +130,6 @@ func TestTask_NewTask(t *testing.T) { assert.Equal(task.Application, mockTaskApplication) assert.EqualValues(task.FilteredQueryParams, mockTaskFilteredQueryParams) assert.EqualValues(task.Header, mockTaskHeader) - assert.Equal(task.PieceLength, int32(0)) assert.Empty(task.DirectPiece) assert.Equal(task.ContentLength.Load(), int64(-1)) assert.Equal(task.TotalPieceCount.Load(), int32(0)) diff --git a/scheduler/scheduling/evaluator/evaluator_base_test.go b/scheduler/scheduling/evaluator/evaluator_base_test.go index 6a0b868770b..77a37342ffb 100644 --- a/scheduler/scheduling/evaluator/evaluator_base_test.go +++ b/scheduler/scheduling/evaluator/evaluator_base_test.go @@ -134,20 +134,20 @@ var ( Platform: "darwin", } - mockTaskBackToSourceLimit int32 = 200 - mockTaskURL = "http://example.com/foo" - mockTaskID = idgen.TaskIDV2(mockTaskURL, mockTaskTag, mockTaskApplication, mockTaskFilteredQueryParams) - mockTaskDigest = digest.New(digest.AlgorithmSHA256, "c71d239df91726fc519c6eb72d318ec65820627232b2f796219e87dcf35d0ab4") - mockTaskTag = "d7y" - mockTaskApplication = "foo" - mockTaskFilteredQueryParams = []string{"bar"} - mockTaskHeader = map[string]string{"content-length": "100"} - mockTaskPieceLength int32 = 2048 - mockHostID = idgen.HostIDV2("127.0.0.1", "foo", false) - mockSeedHostID = idgen.HostIDV2("127.0.0.1", "bar", true) - mockHostLocation = "bas" - mockHostIDC = "baz" - mockPeerID = idgen.PeerIDV2() + mockTaskBackToSourceLimit int32 = 200 + mockTaskURL = "http://example.com/foo" + mockTaskPieceLength uint64 = 2048 + mockTaskID = idgen.TaskIDV2(mockTaskURL, &mockTaskPieceLength, mockTaskTag, mockTaskApplication, mockTaskFilteredQueryParams) + mockTaskDigest = digest.New(digest.AlgorithmSHA256, "c71d239df91726fc519c6eb72d318ec65820627232b2f796219e87dcf35d0ab4") + mockTaskTag = "d7y" + mockTaskApplication = "foo" + mockTaskFilteredQueryParams = []string{"bar"} + mockTaskHeader = map[string]string{"content-length": "100"} + mockHostID = idgen.HostIDV2("127.0.0.1", "foo", false) + mockSeedHostID = idgen.HostIDV2("127.0.0.1", "bar", true) + mockHostLocation = "bas" + mockHostIDC = "baz" + mockPeerID = idgen.PeerIDV2() ) func TestEvaluatorBase_newEvaluatorBase(t *testing.T) { @@ -184,7 +184,7 @@ func TestEvaluatorBase_EvaluateParents(t *testing.T) { name: "parents is empty", parents: []*standard.Peer{}, child: standard.NewPeer(idgen.PeerIDV1("127.0.0.1"), - standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)), + standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest)), standard.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)), @@ -201,13 +201,13 @@ func TestEvaluatorBase_EvaluateParents(t *testing.T) { name: "evaluate single parent", parents: []*standard.Peer{ standard.NewPeer(idgen.PeerIDV1("127.0.0.1"), - standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)), + standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest)), standard.NewHost( mockRawSeedHost.ID, mockRawSeedHost.IP, mockRawSeedHost.Hostname, mockRawSeedHost.Port, mockRawSeedHost.DownloadPort, mockRawSeedHost.Type)), }, child: standard.NewPeer(idgen.PeerIDV1("127.0.0.1"), - standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)), + standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest)), standard.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)), @@ -226,33 +226,33 @@ func TestEvaluatorBase_EvaluateParents(t *testing.T) { name: "evaluate parents with free upload count", parents: []*standard.Peer{ standard.NewPeer(idgen.PeerIDV1("127.0.0.1"), - standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)), + standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest)), standard.NewHost( mockRawSeedHost.ID, mockRawSeedHost.IP, mockRawSeedHost.Hostname, mockRawSeedHost.Port, mockRawSeedHost.DownloadPort, mockRawSeedHost.Type)), standard.NewPeer(idgen.PeerIDV1("127.0.0.1"), - standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)), + standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest)), standard.NewHost( "bar", mockRawSeedHost.IP, mockRawSeedHost.Hostname, mockRawSeedHost.Port, mockRawSeedHost.DownloadPort, mockRawSeedHost.Type)), standard.NewPeer(idgen.PeerIDV1("127.0.0.1"), - standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)), + standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest)), standard.NewHost( "baz", mockRawSeedHost.IP, mockRawSeedHost.Hostname, mockRawSeedHost.Port, mockRawSeedHost.DownloadPort, mockRawSeedHost.Type)), standard.NewPeer(idgen.PeerIDV1("127.0.0.1"), - standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)), + standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest)), standard.NewHost( "bac", mockRawSeedHost.IP, mockRawSeedHost.Hostname, mockRawSeedHost.Port, mockRawSeedHost.DownloadPort, mockRawSeedHost.Type)), standard.NewPeer(idgen.PeerIDV1("127.0.0.1"), - standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)), + standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest)), standard.NewHost( "bae", mockRawSeedHost.IP, mockRawSeedHost.Hostname, mockRawSeedHost.Port, mockRawSeedHost.DownloadPort, mockRawSeedHost.Type)), }, child: standard.NewPeer(idgen.PeerIDV1("127.0.0.1"), - standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)), + standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest)), standard.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)), @@ -277,33 +277,33 @@ func TestEvaluatorBase_EvaluateParents(t *testing.T) { name: "evaluate parents with pieces", parents: []*standard.Peer{ standard.NewPeer(idgen.PeerIDV1("127.0.0.1"), - standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)), + standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest)), standard.NewHost( mockRawSeedHost.ID, mockRawSeedHost.IP, mockRawSeedHost.Hostname, mockRawSeedHost.Port, mockRawSeedHost.DownloadPort, mockRawSeedHost.Type)), standard.NewPeer(idgen.PeerIDV1("127.0.0.1"), - standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)), + standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest)), standard.NewHost( "bar", mockRawSeedHost.IP, mockRawSeedHost.Hostname, mockRawSeedHost.Port, mockRawSeedHost.DownloadPort, mockRawSeedHost.Type)), standard.NewPeer(idgen.PeerIDV1("127.0.0.1"), - standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)), + standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest)), standard.NewHost( "baz", mockRawSeedHost.IP, mockRawSeedHost.Hostname, mockRawSeedHost.Port, mockRawSeedHost.DownloadPort, mockRawSeedHost.Type)), standard.NewPeer(idgen.PeerIDV1("127.0.0.1"), - standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)), + standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest)), standard.NewHost( "bac", mockRawSeedHost.IP, mockRawSeedHost.Hostname, mockRawSeedHost.Port, mockRawSeedHost.DownloadPort, mockRawSeedHost.Type)), standard.NewPeer(idgen.PeerIDV1("127.0.0.1"), - standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)), + standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest)), standard.NewHost( "bae", mockRawSeedHost.IP, mockRawSeedHost.Hostname, mockRawSeedHost.Port, mockRawSeedHost.DownloadPort, mockRawSeedHost.Type)), }, child: standard.NewPeer(idgen.PeerIDV1("127.0.0.1"), - standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)), + standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest)), standard.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)), @@ -347,12 +347,12 @@ func TestEvaluatorBase_evaluate(t *testing.T) { { name: "evaluate parent", parent: standard.NewPeer(idgen.PeerIDV1("127.0.0.1"), - standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)), + standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest)), standard.NewHost( mockRawSeedHost.ID, mockRawSeedHost.IP, mockRawSeedHost.Hostname, mockRawSeedHost.Port, mockRawSeedHost.DownloadPort, mockRawSeedHost.Type)), child: standard.NewPeer(idgen.PeerIDV1("127.0.0.1"), - standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)), + standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest)), standard.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)), @@ -367,12 +367,12 @@ func TestEvaluatorBase_evaluate(t *testing.T) { { name: "evaluate parent with pieces", parent: standard.NewPeer(idgen.PeerIDV1("127.0.0.1"), - standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)), + standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest)), standard.NewHost( mockRawSeedHost.ID, mockRawSeedHost.IP, mockRawSeedHost.Hostname, mockRawSeedHost.Port, mockRawSeedHost.DownloadPort, mockRawSeedHost.Type)), child: standard.NewPeer(idgen.PeerIDV1("127.0.0.1"), - standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)), + standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest)), standard.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)), @@ -400,7 +400,7 @@ func TestEvaluatorBase_calculatePieceScore(t *testing.T) { mockHost := standard.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)) + mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest)) tests := []struct { name string @@ -562,7 +562,7 @@ func TestEvaluatorBase_calculatehostUploadSuccessScore(t *testing.T) { host := standard.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)) + mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest)) mockPeer := standard.NewPeer(mockPeerID, mockTask, host) e := newEvaluatorBase() tc.mock(host) @@ -612,7 +612,7 @@ func TestEvaluatorBase_calculateFreeUploadScore(t *testing.T) { host := standard.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)) + mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest)) mockPeer := standard.NewPeer(mockPeerID, mockTask, host) e := newEvaluatorBase() tc.mock(host, mockPeer) @@ -664,7 +664,7 @@ func TestEvaluatorBase_calculateHostTypeScore(t *testing.T) { mockHost := standard.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)) + mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest)) peer := standard.NewPeer(mockPeerID, mockTask, mockHost) e := newEvaluatorBase() tc.mock(peer) @@ -878,7 +878,7 @@ func TestEvaluatorBase_IsBadParent(t *testing.T) { mockHost := standard.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)) + mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest)) tests := []struct { name string diff --git a/scheduler/scheduling/scheduling.go b/scheduler/scheduling/scheduling.go index afb3a9155cd..ba23c4803d2 100644 --- a/scheduler/scheduling/scheduling.go +++ b/scheduler/scheduling/scheduling.go @@ -847,7 +847,6 @@ func constructSuccessNormalTaskResponse(candidateParents []*standard.Peer) *sche Application: &candidateParent.Task.Application, FilteredQueryParams: candidateParent.Task.FilteredQueryParams, RequestHeader: candidateParent.Task.Header, - PieceLength: uint64(candidateParent.Task.PieceLength), ContentLength: uint64(candidateParent.Task.ContentLength.Load()), PieceCount: uint32(candidateParent.Task.TotalPieceCount.Load()), SizeScope: candidateParent.Task.SizeScope(), diff --git a/scheduler/scheduling/scheduling_test.go b/scheduler/scheduling/scheduling_test.go index 5f97b01ad5a..2c9b9b34d85 100644 --- a/scheduler/scheduling/scheduling_test.go +++ b/scheduler/scheduling/scheduling_test.go @@ -162,22 +162,22 @@ var ( Platform: "darwin", } - mockTaskBackToSourceLimit int32 = 200 - mockTaskURL = "http://example.com/foo" - mockTaskID = idgen.TaskIDV2(mockTaskURL, mockTaskTag, mockTaskApplication, mockTaskFilteredQueryParams) - mockTaskDigest = digest.New(digest.AlgorithmSHA256, "c71d239df91726fc519c6eb72d318ec65820627232b2f796219e87dcf35d0ab4") - mockTaskTag = "d7y" - mockTaskApplication = "foo" - mockTaskFilteredQueryParams = []string{"bar"} - mockTaskHeader = map[string]string{"content-length": "100"} - mockTaskPieceLength int32 = 2048 - mockHostID = idgen.HostIDV2("127.0.0.1", "foo", false) - mockSeedHostID = idgen.HostIDV2("127.0.0.1", "bar", true) - mockHostLocation = "baz" - mockHostIDC = "bas" - mockPeerID = idgen.PeerIDV2() - mockSeedPeerID = idgen.PeerIDV2() - mockPiece = standard.Piece{ + mockTaskBackToSourceLimit int32 = 200 + mockTaskURL = "http://example.com/foo" + mockTaskPieceLength uint64 = 2048 + mockTaskID = idgen.TaskIDV2(mockTaskURL, &mockTaskPieceLength, mockTaskTag, mockTaskApplication, mockTaskFilteredQueryParams) + mockTaskDigest = digest.New(digest.AlgorithmSHA256, "c71d239df91726fc519c6eb72d318ec65820627232b2f796219e87dcf35d0ab4") + mockTaskTag = "d7y" + mockTaskApplication = "foo" + mockTaskFilteredQueryParams = []string{"bar"} + mockTaskHeader = map[string]string{"content-length": "100"} + mockHostID = idgen.HostIDV2("127.0.0.1", "foo", false) + mockSeedHostID = idgen.HostIDV2("127.0.0.1", "bar", true) + mockHostLocation = "baz" + mockHostIDC = "bas" + mockPeerID = idgen.PeerIDV2() + mockSeedPeerID = idgen.PeerIDV2() + mockPiece = standard.Piece{ Number: 1, ParentID: "foo", Offset: 2, @@ -439,7 +439,7 @@ func TestScheduling_ScheduleCandidateParents(t *testing.T) { mockHost := standard.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)) + mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest)) peer := standard.NewPeer(mockPeerID, mockTask, mockHost) mockSeedHost := standard.NewHost( mockRawSeedHost.ID, mockRawSeedHost.IP, mockRawSeedHost.Hostname, @@ -710,7 +710,7 @@ func TestScheduling_ScheduleParentAndCandidateParents(t *testing.T) { mockHost := standard.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)) + mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest)) peer := standard.NewPeer(mockPeerID, mockTask, mockHost) mockSeedHost := standard.NewHost( mockRawSeedHost.ID, mockRawSeedHost.IP, mockRawSeedHost.Hostname, @@ -1039,7 +1039,7 @@ func TestScheduling_FindCandidateParents(t *testing.T) { mockHost := standard.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)) + mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest)) peer := standard.NewPeer(mockPeerID, mockTask, mockHost) var mockPeers []*standard.Peer @@ -1357,7 +1357,7 @@ func TestScheduling_FindParentAndCandidateParents(t *testing.T) { mockHost := standard.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)) + mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest)) peer := standard.NewPeer(mockPeerID, mockTask, mockHost) var mockPeers []*standard.Peer @@ -1619,7 +1619,7 @@ func TestScheduling_FindSuccessParent(t *testing.T) { mockHost := standard.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)) + mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest)) peer := standard.NewPeer(mockPeerID, mockTask, mockHost) var mockPeers []*standard.Peer @@ -1672,7 +1672,6 @@ func TestScheduling_constructSuccessNormalTaskResponse(t *testing.T) { Application: &candidateParents[0].Task.Application, FilteredQueryParams: candidateParents[0].Task.FilteredQueryParams, RequestHeader: candidateParents[0].Task.Header, - PieceLength: uint64(candidateParents[0].Task.PieceLength), ContentLength: uint64(candidateParents[0].Task.ContentLength.Load()), PieceCount: uint32(candidateParents[0].Task.TotalPieceCount.Load()), SizeScope: candidateParents[0].Task.SizeScope(), @@ -1764,7 +1763,7 @@ func TestScheduling_constructSuccessNormalTaskResponse(t *testing.T) { mockHost := standard.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)) + mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest)) candidateParents := []*standard.Peer{standard.NewPeer(idgen.PeerIDV1("127.0.0.1"), mockTask, mockHost, standard.WithRange(nethttp.Range{ Start: 1, Length: 10, @@ -1814,7 +1813,7 @@ func TestScheduling_constructSuccessPeerPacket(t *testing.T) { mockHost := standard.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)) + mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest)) peer := standard.NewPeer(mockPeerID, mockTask, mockHost) parent := standard.NewPeer(idgen.PeerIDV1("127.0.0.1"), mockTask, mockHost) diff --git a/scheduler/service/service_v1.go b/scheduler/service/service_v1.go index 50c9590e361..57317577b43 100644 --- a/scheduler/service/service_v1.go +++ b/scheduler/service/service_v1.go @@ -340,6 +340,7 @@ func (v *V1) AnnounceTask(ctx context.Context, req *schedulerv1.AnnounceTaskRequ options = append(options, resource.WithDigest(d)) } + // Piece length is not supported in Protocol V1, use default value 0. task := resource.NewTask(taskID, req.GetUrl(), req.UrlMeta.GetTag(), req.UrlMeta.GetApplication(), types.TaskTypeV1ToV2(req.GetTaskType()), strings.Split(req.UrlMeta.GetFilter(), idgen.FilteredQueryParamsSeparator), req.UrlMeta.GetHeader(), int32(v.config.Scheduler.BackToSourceCount), options...) task, _ = v.resource.TaskManager().LoadOrStore(task) @@ -806,6 +807,7 @@ func (v *V1) storeTask(_ context.Context, req *schedulerv1.PeerTaskRequest, typ options = append(options, resource.WithDigest(d)) } + // Piece length is not supported in Protocol V1, use default value 0. task := resource.NewTask(req.GetTaskId(), req.GetUrl(), req.UrlMeta.GetTag(), req.UrlMeta.GetApplication(), typ, filteredQueryParams, req.UrlMeta.GetHeader(), int32(v.config.Scheduler.BackToSourceCount), options...) v.resource.TaskManager().Store(task) diff --git a/scheduler/service/service_v1_test.go b/scheduler/service/service_v1_test.go index ec051f5a57c..80b186b0c98 100644 --- a/scheduler/service/service_v1_test.go +++ b/scheduler/service/service_v1_test.go @@ -82,22 +82,22 @@ var ( Idc: mockHostIDC, } - mockTaskBackToSourceLimit int32 = 200 - mockTaskURL = "http://example.com/foo" - mockTaskID = idgen.TaskIDV2(mockTaskURL, mockTaskTag, mockTaskApplication, mockTaskFilteredQueryParams) - mockTaskDigest = digest.New(digest.AlgorithmSHA256, "c71d239df91726fc519c6eb72d318ec65820627232b2f796219e87dcf35d0ab4") - mockTaskTag = "d7y" - mockTaskApplication = "foo" - mockTaskFilteredQueryParams = []string{"bar"} - mockTaskHeader = map[string]string{"Content-Length": "100", "Range": "bytes=0-99"} - mockTaskPieceLength int32 = 2048 - mockHostID = idgen.HostIDV2("127.0.0.1", "foo", false) - mockSeedHostID = idgen.HostIDV2("127.0.0.1", "bar", true) - mockHostLocation = "bas" - mockHostIDC = "baz" - mockPeerID = idgen.PeerIDV2() - mockSeedPeerID = idgen.PeerIDV2() - mockPeerRange = nethttp.Range{ + mockTaskBackToSourceLimit int32 = 200 + mockTaskURL = "http://example.com/foo" + mockTaskPieceLength uint64 = 2048 + mockTaskID = idgen.TaskIDV2(mockTaskURL, &mockTaskPieceLength, mockTaskTag, mockTaskApplication, mockTaskFilteredQueryParams) + mockTaskDigest = digest.New(digest.AlgorithmSHA256, "c71d239df91726fc519c6eb72d318ec65820627232b2f796219e87dcf35d0ab4") + mockTaskTag = "d7y" + mockTaskApplication = "foo" + mockTaskFilteredQueryParams = []string{"bar"} + mockTaskHeader = map[string]string{"Content-Length": "100", "Range": "bytes=0-99"} + mockHostID = idgen.HostIDV2("127.0.0.1", "foo", false) + mockSeedHostID = idgen.HostIDV2("127.0.0.1", "bar", true) + mockHostLocation = "bas" + mockHostIDC = "baz" + mockPeerID = idgen.PeerIDV2() + mockSeedPeerID = idgen.PeerIDV2() + mockPeerRange = nethttp.Range{ Start: 0, Length: 10, } @@ -806,7 +806,7 @@ func TestServiceV1_RegisterPeerTask(t *testing.T) { mockHost := resource.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) + mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest)) mockPeer := resource.NewPeer(mockPeerID, mockTask, mockHost) mockSeedHost := resource.NewHost( mockRawSeedHost.ID, mockRawSeedHost.IP, mockRawSeedHost.Hostname, @@ -1069,7 +1069,7 @@ func TestServiceV1_ReportPieceResult(t *testing.T) { mockHost := resource.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) + mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest)) mockPeer := resource.NewPeer(mockPeerID, mockTask, mockHost) tc.mock(mockPeer, res, peerManager, res.EXPECT(), peerManager.EXPECT(), stream.EXPECT()) tc.expect(t, mockPeer, svc.ReportPieceResult(stream)) @@ -1218,7 +1218,7 @@ func TestServiceV1_ReportPeerResult(t *testing.T) { mockHost := resource.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) + mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest)) mockPeer := resource.NewPeer(mockPeerID, mockTask, mockHost) tc.run(t, mockPeer, tc.req, svc, mockPeer, res, peerManager, res.EXPECT(), peerManager.EXPECT(), dynconfig.EXPECT()) }) @@ -1277,7 +1277,7 @@ func TestServiceV1_StatTask(t *testing.T) { dynconfig := configmocks.NewMockDynconfigInterface(ctl) taskManager := resource.NewMockTaskManager(ctl) svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig) - mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) + mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest)) tc.mock(mockTask, taskManager, res.EXPECT(), taskManager.EXPECT()) task, err := svc.StatTask(context.Background(), &schedulerv1.StatTaskRequest{TaskId: mockTaskID}) @@ -1575,7 +1575,7 @@ func TestServiceV1_AnnounceTask(t *testing.T) { mockHost := resource.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) + mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest)) mockPeer := resource.NewPeer(mockPeerID, mockTask, mockHost) tc.mock(mockHost, mockTask, mockPeer, hostManager, taskManager, peerManager, res.EXPECT(), hostManager.EXPECT(), taskManager.EXPECT(), peerManager.EXPECT()) @@ -1771,7 +1771,7 @@ func TestServiceV1_LeaveTask(t *testing.T) { mockHost := resource.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) + mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest)) peer := resource.NewPeer(mockSeedPeerID, mockTask, mockHost) svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig) @@ -2433,7 +2433,7 @@ func TestServiceV1_LeaveHost(t *testing.T) { host := resource.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) + mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest)) mockPeer := resource.NewPeer(mockSeedPeerID, mockTask, host) svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig) @@ -2537,7 +2537,7 @@ func TestServiceV1_prefetchTask(t *testing.T) { mockHost := resource.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - task := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) + task := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest)) peer := resource.NewPeer(mockPeerID, task, mockHost) svc := NewV1(tc.config, res, scheduling, dynconfig) taskManager := resource.NewMockTaskManager(ctl) @@ -3006,7 +3006,7 @@ func TestServiceV1_triggerTask(t *testing.T) { mockSeedHost := resource.NewHost( mockRawSeedHost.ID, mockRawSeedHost.IP, mockRawSeedHost.Hostname, mockRawSeedHost.Port, mockRawSeedHost.DownloadPort, mockRawSeedHost.Type) - mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) + mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest)) mockPeer := resource.NewPeer(mockPeerID, mockTask, mockHost) mockSeedPeer := resource.NewPeer(mockSeedPeerID, mockTask, mockSeedHost) seedPeer := resource.NewMockSeedPeer(ctl) @@ -3078,7 +3078,6 @@ func TestServiceV1_storeTask(t *testing.T) { assert.Equal(task.Application, mockTaskApplication) assert.EqualValues(task.FilteredQueryParams, mockTaskFilteredQueryParams) assert.EqualValues(task.Header, mockTaskHeader) - assert.Equal(task.PieceLength, int32(0)) assert.Empty(task.DirectPiece) assert.Equal(task.ContentLength.Load(), int64(-1)) assert.Equal(task.TotalPieceCount.Load(), int32(0)) @@ -3202,7 +3201,7 @@ func TestServiceV1_storePeer(t *testing.T) { mockHost := resource.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) + mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest)) mockPeer := resource.NewPeer(mockPeerID, mockTask, mockHost) gomock.InOrder( @@ -3222,7 +3221,7 @@ func TestServiceV1_storePeer(t *testing.T) { mockHost := resource.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) + mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest)) gomock.InOrder( mr.PeerManager().Return(peerManager).Times(1), mp.Load(gomock.Eq(mockPeerID)).Return(nil, false).Times(1), @@ -3323,7 +3322,7 @@ func TestServiceV1_triggerSeedPeerTask(t *testing.T) { mockHost := resource.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - task := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) + task := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest)) peer := resource.NewPeer(mockPeerID, task, mockHost) svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, SeedPeer: mockSeedPeerConfig}, res, scheduling, dynconfig) @@ -3403,7 +3402,7 @@ func TestServiceV1_handleBeginOfPiece(t *testing.T) { mockHost := resource.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) + mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest)) peer := resource.NewPeer(mockPeerID, mockTask, mockHost) svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig) @@ -3418,7 +3417,7 @@ func TestServiceV1_handlePieceSuccess(t *testing.T) { mockHost := resource.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) + mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest)) tests := []struct { name string @@ -3730,7 +3729,7 @@ func TestServiceV1_handlePieceFail(t *testing.T) { mockHost := resource.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) + mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest)) peer := resource.NewPeer(mockPeerID, mockTask, mockHost) parent := resource.NewPeer(mockSeedPeerID, mockTask, mockHost) seedPeer := resource.NewMockSeedPeer(ctl) @@ -3854,7 +3853,7 @@ func TestServiceV1_handlePeerSuccess(t *testing.T) { mockHost := resource.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) + mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest)) peer := resource.NewPeer(mockPeerID, mockTask, mockHost) svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig) @@ -3934,7 +3933,7 @@ func TestServiceV1_handlePeerFail(t *testing.T) { mockHost := resource.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) + mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest)) peer := resource.NewPeer(mockSeedPeerID, mockTask, mockHost) child := resource.NewPeer(mockPeerID, mockTask, mockHost) @@ -4016,7 +4015,7 @@ func TestServiceV1_handleTaskSuccess(t *testing.T) { res := resource.NewMockResource(ctl) dynconfig := configmocks.NewMockDynconfigInterface(ctl) svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig) - task := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) + task := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest)) tc.mock(task) svc.handleTaskSuccess(context.Background(), task, tc.result) @@ -4154,7 +4153,7 @@ func TestServiceV1_handleTaskFail(t *testing.T) { res := resource.NewMockResource(ctl) dynconfig := configmocks.NewMockDynconfigInterface(ctl) svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig) - task := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) + task := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest)) tc.mock(task) svc.handleTaskFailure(context.Background(), task, tc.backToSourceErr, tc.seedPeerErr) diff --git a/scheduler/service/service_v2.go b/scheduler/service/service_v2.go index 8514c8f36f8..11909e7f5a5 100644 --- a/scheduler/service/service_v2.go +++ b/scheduler/service/service_v2.go @@ -302,7 +302,6 @@ func (v *V2) StatPeer(ctx context.Context, req *schedulerv2.StatPeerRequest) (*c Application: &peer.Task.Application, FilteredQueryParams: peer.Task.FilteredQueryParams, RequestHeader: peer.Task.Header, - PieceLength: uint64(peer.Task.PieceLength), ContentLength: uint64(peer.Task.ContentLength.Load()), PieceCount: uint32(peer.Task.TotalPieceCount.Load()), SizeScope: peer.Task.SizeScope(), @@ -460,7 +459,6 @@ func (v *V2) StatTask(ctx context.Context, req *schedulerv2.StatTaskRequest) (*c Application: &task.Application, FilteredQueryParams: task.FilteredQueryParams, RequestHeader: task.Header, - PieceLength: uint64(task.PieceLength), ContentLength: uint64(task.ContentLength.Load()), PieceCount: uint32(task.TotalPieceCount.Load()), SizeScope: task.SizeScope(), @@ -1537,7 +1535,7 @@ func (v *V2) handleResource(_ context.Context, stream schedulerv2.Scheduler_Anno // Store new task or update task. task, loaded := v.resource.TaskManager().Load(taskID) if !loaded { - options := []standard.TaskOption{standard.WithPieceLength(int32(download.GetPieceLength()))} + options := []standard.TaskOption{} if download.GetDigest() != "" { d, err := digest.Parse(download.GetDigest()) if err != nil { diff --git a/scheduler/service/service_v2_test.go b/scheduler/service/service_v2_test.go index 675ca1049ad..81ec1f5ba04 100644 --- a/scheduler/service/service_v2_test.go +++ b/scheduler/service/service_v2_test.go @@ -320,7 +320,6 @@ func TestServiceV2_StatPeer(t *testing.T) { Application: &peer.Task.Application, FilteredQueryParams: peer.Task.FilteredQueryParams, RequestHeader: peer.Task.Header, - PieceLength: uint64(peer.Task.PieceLength), ContentLength: uint64(peer.Task.ContentLength.Load()), PieceCount: uint32(peer.Task.TotalPieceCount.Load()), SizeScope: peer.Task.SizeScope(), @@ -427,7 +426,7 @@ func TestServiceV2_StatPeer(t *testing.T) { mockHost := standard.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)) + mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest)) peer := standard.NewPeer(mockSeedPeerID, mockTask, mockHost, standard.WithRange(mockPeerRange)) svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, resource, persistentCacheResource, scheduling, dynconfig) @@ -498,7 +497,7 @@ func TestServiceV2_DeletePeer(t *testing.T) { mockHost := standard.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)) + mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest)) peer := standard.NewPeer(mockSeedPeerID, mockTask, mockHost, standard.WithRange(mockPeerRange)) svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, resource, persistentCacheResource, scheduling, dynconfig) @@ -549,7 +548,6 @@ func TestServiceV2_StatTask(t *testing.T) { Application: &task.Application, FilteredQueryParams: task.FilteredQueryParams, RequestHeader: task.Header, - PieceLength: uint64(task.PieceLength), ContentLength: uint64(task.ContentLength.Load()), PieceCount: uint32(task.TotalPieceCount.Load()), SizeScope: task.SizeScope(), @@ -584,7 +582,7 @@ func TestServiceV2_StatTask(t *testing.T) { dynconfig := configmocks.NewMockDynconfigInterface(ctl) taskManager := standard.NewMockTaskManager(ctl) - task := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)) + task := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest)) svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, resource, persistentCacheResource, scheduling, dynconfig) tc.mock(task, taskManager, resource.EXPECT(), taskManager.EXPECT()) @@ -1608,7 +1606,7 @@ func TestServiceV2_DeleteHost(t *testing.T) { host := standard.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)) + mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest)) mockPeer := standard.NewPeer(mockSeedPeerID, mockTask, host) svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, resource, nil, scheduling, dynconfig) @@ -1910,7 +1908,7 @@ func TestServiceV2_handleRegisterPeerRequest(t *testing.T) { mockHost := standard.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)) + mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest)) peer := standard.NewPeer(mockPeerID, mockTask, mockHost) seedPeer := standard.NewPeer(mockSeedPeerID, mockTask, mockHost) svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, resource, persistentCacheResource, scheduling, dynconfig) @@ -2004,7 +2002,7 @@ func TestServiceV2_handleDownloadPeerStartedRequest(t *testing.T) { mockHost := standard.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)) + mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest)) peer := standard.NewPeer(mockPeerID, mockTask, mockHost) svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, resource, persistentCacheResource, scheduling, dynconfig) @@ -2097,7 +2095,7 @@ func TestServiceV2_handleDownloadPeerBackToSourceStartedRequest(t *testing.T) { mockHost := standard.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)) + mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest)) peer := standard.NewPeer(mockPeerID, mockTask, mockHost) svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, resource, persistentCacheResource, scheduling, dynconfig) @@ -2169,7 +2167,7 @@ func TestServiceV2_handleRescheduleRequest(t *testing.T) { mockHost := standard.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)) + mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest)) peer := standard.NewPeer(mockPeerID, mockTask, mockHost) svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, resource, persistentCacheResource, scheduling, dynconfig) @@ -2243,7 +2241,7 @@ func TestServiceV2_handleDownloadPeerFinishedRequest(t *testing.T) { mockHost := standard.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)) + mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest)) peer := standard.NewPeer(mockPeerID, mockTask, mockHost) svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, resource, persistentCacheResource, scheduling, dynconfig) @@ -2443,7 +2441,7 @@ func TestServiceV2_handleDownloadPeerBackToSourceFinishedRequest(t *testing.T) { mockHost.IP = ip mockHost.DownloadPort = int32(port) - mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)) + mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest)) peer := standard.NewPeer(mockPeerID, mockTask, mockHost) svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, resource, persistentCacheResource, scheduling, dynconfig) @@ -2516,7 +2514,7 @@ func TestServiceV2_handleDownloadPeerFailedRequest(t *testing.T) { mockHost := standard.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)) + mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest)) peer := standard.NewPeer(mockPeerID, mockTask, mockHost) svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, resource, persistentCacheResource, scheduling, dynconfig) @@ -2636,7 +2634,7 @@ func TestServiceV2_handleDownloadPeerBackToSourceFailedRequest(t *testing.T) { mockHost := standard.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)) + mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest)) peer := standard.NewPeer(mockPeerID, mockTask, mockHost) svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, resource, persistentCacheResource, scheduling, dynconfig) @@ -2795,7 +2793,7 @@ func TestServiceV2_handleDownloadPieceFinishedRequest(t *testing.T) { mockHost := standard.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)) + mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest)) peer := standard.NewPeer(mockPeerID, mockTask, mockHost) svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, resource, persistentCacheResource, scheduling, dynconfig) @@ -2921,7 +2919,7 @@ func TestServiceV2_handleDownloadPieceBackToSourceFinishedRequest(t *testing.T) mockHost := standard.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)) + mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest)) peer := standard.NewPeer(mockPeerID, mockTask, mockHost) svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, resource, persistentCacheResource, scheduling, dynconfig) @@ -3032,7 +3030,7 @@ func TestServiceV2_handleDownloadPieceFailedRequest(t *testing.T) { mockHost := standard.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)) + mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest)) peer := standard.NewPeer(mockPeerID, mockTask, mockHost) svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, resource, persistentCacheResource, scheduling, dynconfig) @@ -3098,7 +3096,7 @@ func TestServiceV2_handleDownloadPieceBackToSourceFailedRequest(t *testing.T) { mockHost := standard.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)) + mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest)) peer := standard.NewPeer(mockPeerID, mockTask, mockHost) svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, resource, persistentCacheResource, scheduling, dynconfig) @@ -3312,7 +3310,7 @@ func TestServiceV2_handleResource(t *testing.T) { mockHost := standard.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)) + mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest)) mockPeer := standard.NewPeer(mockPeerID, mockTask, mockHost) svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, resource, persistentCacheResource, scheduling, dynconfig) @@ -3589,7 +3587,7 @@ func TestServiceV2_downloadTaskBySeedPeer(t *testing.T) { mockHost := standard.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)) + mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest)) peer := standard.NewPeer(mockPeerID, mockTask, mockHost) svc := NewV2(&tc.config, resource, persistentCacheResource, scheduling, dynconfig) diff --git a/test/e2e/v2/util/file.go b/test/e2e/v2/util/file.go index c529594ea84..56c65d47fc1 100644 --- a/test/e2e/v2/util/file.go +++ b/test/e2e/v2/util/file.go @@ -117,7 +117,7 @@ func (f *File) GetTaskID(opts ...TaskIDOption) string { opt(taskIDOptions) } - return idgen.TaskIDV2(taskIDOptions.url, taskIDOptions.tag, taskIDOptions.application, taskIDOptions.filteredQueryParams) + return idgen.TaskIDV2(taskIDOptions.url, taskIDOptions.pieceLength, taskIDOptions.tag, taskIDOptions.application, taskIDOptions.filteredQueryParams) } // GetOutputPath returns the output path of the file. diff --git a/test/e2e/v2/util/task.go b/test/e2e/v2/util/task.go index 0793b0c59b3..71990e1446c 100644 --- a/test/e2e/v2/util/task.go +++ b/test/e2e/v2/util/task.go @@ -132,6 +132,8 @@ func CalculateSha256ByOutput(pods []*PodExec, output string) (string, error) { type taskID struct { // url is the url of the download task. url string + // pieceLength is the piece length of the download task. + pieceLength *uint64 // tag is the tag of the download task. tag string // appliccation is the application of the download task. @@ -150,6 +152,13 @@ func WithTaskIDURL(url string) TaskIDOption { } } +// WithTaskIDPieceLength sets the piece length of the download task. +func WithTaskIDPieceLength(pieceLength uint64) TaskIDOption { + return func(o *taskID) { + o.pieceLength = &pieceLength + } +} + // WithTaskIDTag sets the tag of the download task. func WithTaskIDTag(tag string) TaskIDOption { return func(o *taskID) { diff --git a/test/testdata/charts/config-v2-rate-limit.yaml b/test/testdata/charts/config-v2-rate-limit.yaml index f2239a2a17a..5a81afb73ce 100644 --- a/test/testdata/charts/config-v2-rate-limit.yaml +++ b/test/testdata/charts/config-v2-rate-limit.yaml @@ -65,6 +65,10 @@ scheduler: console: false verbose: true scheduler: + algorithm: default + retryBackToSourceLimit: 5 + retryLimit: 7 + retryInterval: 500ms gc: hostGCInterval: 2m diff --git a/test/testdata/charts/config-v2.yaml b/test/testdata/charts/config-v2.yaml index 09d5a0a10d9..ac7ed4d5c3c 100644 --- a/test/testdata/charts/config-v2.yaml +++ b/test/testdata/charts/config-v2.yaml @@ -65,6 +65,10 @@ scheduler: console: false verbose: true scheduler: + algorithm: default + retryBackToSourceLimit: 5 + retryLimit: 7 + retryInterval: 500ms gc: hostGCInterval: 2m diff --git a/test/testdata/charts/config.yaml b/test/testdata/charts/config.yaml index 03a5eef2de1..80ddd22e8ff 100644 --- a/test/testdata/charts/config.yaml +++ b/test/testdata/charts/config.yaml @@ -27,7 +27,10 @@ scheduler: config: verbose: true scheduler: - algorithm: nt + algorithm: default + retryBackToSourceLimit: 5 + retryLimit: 7 + retryInterval: 500ms seedPeer: image: