Skip to content

Commit

Permalink
feat: support to set piece length for preheat (#3848)
Browse files Browse the repository at this point in the history
Signed-off-by: Gaius <[email protected]>
  • Loading branch information
gaius-qi authored Feb 26, 2025
1 parent 3d2eee6 commit 3232f7d
Show file tree
Hide file tree
Showing 25 changed files with 218 additions and 163 deletions.
8 changes: 4 additions & 4 deletions .github/workflows/compatibility-e2e-v2.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,22 +31,22 @@ jobs:
include:
- module: manager
image: manager
image-tag: v2.2.1-rc.0
image-tag: v2.2.1-rc.2
chart-name: manager
skip: "Rate Limit"
- module: scheduler
image: scheduler
image-tag: v2.2.1-rc.0
image-tag: v2.2.1-rc.2
chart-name: scheduler
skip: "Rate Limit"
- module: client
image: client
image-tag: v0.2.11
image-tag: v0.2.14
chart-name: client
skip: "Rate Limit"
- module: seed-client
image: client
image-tag: v0.2.11
image-tag: v0.2.14
chart-name: seed-client
skip: "Rate Limit"

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.23.0

require (
cloud.google.com/go/storage v1.50.0
d7y.io/api/v2 v2.1.27
d7y.io/api/v2 v2.1.30
github.com/MysteriousPotato/go-lockable v1.0.0
github.com/RichardKnop/machinery v1.10.8
github.com/Showmax/go-fqdn v1.0.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ cloud.google.com/go/storage v1.50.0 h1:3TbVkzTooBvnZsk7WaAQfOsNrdoM8QHusXA1cpk6Q
cloud.google.com/go/storage v1.50.0/go.mod h1:l7XeiD//vx5lfqE3RavfmU9yvk5Pp0Zhcv482poyafY=
cloud.google.com/go/trace v1.11.2 h1:4ZmaBdL8Ng/ajrgKqY5jfvzqMXbrDcBsUGXOT9aqTtI=
cloud.google.com/go/trace v1.11.2/go.mod h1:bn7OwXd4pd5rFuAnTrzBuoZ4ax2XQeG3qNgYmfCy0Io=
d7y.io/api/v2 v2.1.27 h1:bEiAnNNs944DrV1rZP4m1BR9vh8KkBKLwlyVv+kOQoQ=
d7y.io/api/v2 v2.1.27/go.mod h1:RFsSiRgEBfMrSHlrZWbkJ3xil8fnbbGSj0UMwbUUC5I=
d7y.io/api/v2 v2.1.30 h1:KCHcg2oWX8jSFwJXuxgmNCPlPx29/2tcMXHT7wVKqWo=
d7y.io/api/v2 v2.1.30/go.mod h1:RFsSiRgEBfMrSHlrZWbkJ3xil8fnbbGSj0UMwbUUC5I=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
github.com/Azure/azure-sdk-for-go v16.2.1+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U=
Expand Down
1 change: 1 addition & 0 deletions internal/job/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
// PreheatRequest defines the request parameters for preheating.
type PreheatRequest struct {
URL string `json:"url" validate:"required,url"`
PieceLength *uint64 `json:"pieceLength" binding:"omitempty,gte=4194304"`
Tag string `json:"tag" validate:"omitempty"`
FilteredQueryParams string `json:"filtered_query_params" validate:"omitempty"`
Headers map[string]string `json:"headers" validate:"omitempty"`
Expand Down
2 changes: 2 additions & 0 deletions manager/job/preheat.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ func (p *preheat) CreatePreheat(ctx context.Context, schedulers []models.Schedul
files = []internaljob.PreheatRequest{
{
URL: json.URL,
PieceLength: json.PieceLength,
Tag: json.Tag,
FilteredQueryParams: json.FilteredQueryParams,
Headers: json.Headers,
Expand Down Expand Up @@ -349,6 +350,7 @@ func (p *preheat) parseLayers(manifests []distribution.Manifest, args types.Preh
header.Set("Accept", v.MediaType)
layer := internaljob.PreheatRequest{
URL: image.blobsURL(v.Digest.String()),
PieceLength: args.PieceLength,
Tag: args.Tag,
FilteredQueryParams: args.FilteredQueryParams,
Headers: nethttp.HeaderToMap(header),
Expand Down
4 changes: 2 additions & 2 deletions manager/job/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (t *task) CreateGetTask(ctx context.Context, schedulers []models.Scheduler,

taskID := json.TaskID
if json.URL != "" {
taskID = idgen.TaskIDV2(json.URL, json.Tag, json.Application, idgen.ParseFilteredQueryParams(json.FilteredQueryParams))
taskID = idgen.TaskIDV2(json.URL, json.PieceLength, json.Tag, json.Application, idgen.ParseFilteredQueryParams(json.FilteredQueryParams))
}

args, err := internaljob.MarshalRequest(internaljob.GetTaskRequest{
Expand Down Expand Up @@ -121,7 +121,7 @@ func (t *task) CreateDeleteTask(ctx context.Context, schedulers []models.Schedul

taskID := json.TaskID
if json.URL != "" {
taskID = idgen.TaskIDV2(json.URL, json.Tag, json.Application, idgen.ParseFilteredQueryParams(json.FilteredQueryParams))
taskID = idgen.TaskIDV2(json.URL, json.PieceLength, json.Tag, json.Application, idgen.ParseFilteredQueryParams(json.FilteredQueryParams))
}

args, err := internaljob.MarshalRequest(internaljob.DeleteTaskRequest{
Expand Down
18 changes: 18 additions & 0 deletions manager/types/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,12 @@ type PreheatArgs struct {
// URL is the image url for preheating.
URL string `json:"url" binding:"required"`

// PieceLength is the piece length(bytes) for downloading file. The value needs to
// be greater than or equal to 4194304, for example: 4194304(4mib), 8388608(8mib).
// If the piece length is not specified, the piece length will be calculated
// according to the file size.
PieceLength *uint64 `json:"piece_length" binding:"omitempty,gte=4194304"`

// Tag is the tag for preheating.
Tag string `json:"tag" binding:"omitempty"`

Expand Down Expand Up @@ -177,6 +183,12 @@ type GetTaskArgs struct {
// URL is the download url of the task.
URL string `json:"url" binding:"omitempty"`

// PieceLength is the piece length(bytes) for downloading file. The value needs to
// be greater than or equal to 4194304, for example: 4194304(4mib), 8388608(8mib).
// If the piece length is not specified, the piece length will be calculated
// according to the file size.
PieceLength *uint64 `json:"piece_length" binding:"omitempty,gte=4194304"`

// Tag is the tag of the task.
Tag string `json:"tag" binding:"omitempty"`

Expand Down Expand Up @@ -211,6 +223,12 @@ type DeleteTaskArgs struct {
// URL is the download url of the task.
URL string `json:"url" binding:"omitempty"`

// PieceLength is the piece length(bytes) for downloading file. The value needs to
// be greater than or equal to 4194304, for example: 4194304(4mib), 8388608(8mib).
// If the piece length is not specified, the piece length will be calculated
// according to the file size.
PieceLength *uint64 `json:"piece_length" binding:"omitempty,gte=4194304"`

// Tag is the tag of the task.
Tag string `json:"tag" binding:"omitempty"`

Expand Down
10 changes: 8 additions & 2 deletions pkg/idgen/task_id.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package idgen

import (
"strconv"
"strings"

commonv1 "d7y.io/api/v2/pkg/apis/common/v1"
Expand Down Expand Up @@ -91,11 +92,16 @@ func ParseFilteredQueryParams(rawFilteredQueryParams string) []string {
}

// TaskIDV2 generates v2 version of task id.
func TaskIDV2(url, tag, application string, filteredQueryParams []string) string {
func TaskIDV2(url string, pieceLength *uint64, tag, application string, filteredQueryParams []string) string {
url, err := neturl.FilterQueryParams(url, filteredQueryParams)
if err != nil {
url = ""
}

return pkgdigest.SHA256FromStrings(url, tag, application)
params := []string{url, tag, application}
if pieceLength != nil {
params = append(params, strconv.FormatUint(*pieceLength, 10))
}

return pkgdigest.SHA256FromStrings(params...)
}
25 changes: 24 additions & 1 deletion pkg/idgen/task_id_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,12 @@ func TestTaskIDV1(t *testing.T) {
}

func TestTaskIDV2(t *testing.T) {
pieceLength := uint64(1024)

tests := []struct {
name string
url string
pieceLength *uint64
tag string
application string
filters []string
Expand All @@ -118,9 +121,20 @@ func TestTaskIDV2(t *testing.T) {
{
name: "generate taskID",
url: "https://example.com",
pieceLength: &pieceLength,
tag: "foo",
application: "bar",
filters: []string{},
expect: func(t *testing.T, d any) {
assert := assert.New(t)
assert.Equal(d, "99a47b38e9d3321aebebd715bea0483c1400cef2f767f84d97458f9dcedff221")
},
},
{
name: "generate taskID with tag and application",
url: "https://example.com",
tag: "foo",
application: "bar",
expect: func(t *testing.T, d any) {
assert := assert.New(t)
assert.Equal(d, "160fa7f001d9d2e893130894fbb60a5fb006e1d61bff82955f2946582bc9de1d")
Expand All @@ -144,6 +158,15 @@ func TestTaskIDV2(t *testing.T) {
assert.Equal(d, "63dee2822037636b0109876b58e95692233840753a882afa69b9b5ee82a6c57d")
},
},
{
name: "generate taskID with pieceLength",
url: "https://example.com",
pieceLength: &pieceLength,
expect: func(t *testing.T, d any) {
assert := assert.New(t)
assert.Equal(d, "40c21de3ad2f1470ca1a19a2ad2577803a1829851f6cf862ffa2d4577ae51d38")
},
},
{
name: "generate taskID with filters",
url: "https://example.com?foo=foo&bar=bar",
Expand All @@ -157,7 +180,7 @@ func TestTaskIDV2(t *testing.T) {

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
tc.expect(t, TaskIDV2(tc.url, tc.tag, tc.application, tc.filters))
tc.expect(t, TaskIDV2(tc.url, tc.pieceLength, tc.tag, tc.application, tc.filters))
})
}
}
7 changes: 5 additions & 2 deletions scheduler/job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,9 +170,9 @@ func (j *job) preheat(ctx context.Context, data string) (string, error) {
return "", err
}

taskID := idgen.TaskIDV2(req.URL, req.Tag, req.Application, strings.Split(req.FilteredQueryParams, idgen.FilteredQueryParamsSeparator))
taskID := idgen.TaskIDV2(req.URL, req.PieceLength, req.Tag, req.Application, strings.Split(req.FilteredQueryParams, idgen.FilteredQueryParamsSeparator))
log := logger.WithTask(taskID, req.URL)
log.Infof("preheat %s request: %#v", req.URL, req)
log.Infof("preheat %s %d request: %#v", req.URL, req.PieceLength, req)

ctx, cancel := context.WithTimeout(ctx, req.Timeout)
defer cancel()
Expand Down Expand Up @@ -299,6 +299,7 @@ func (j *job) preheatAllSeedPeers(ctx context.Context, taskID string, req *inter
taskID,
&dfdaemonv2.DownloadTaskRequest{Download: &commonv2.Download{
Url: req.URL,
PieceLength: req.PieceLength,
Type: commonv2.TaskType_STANDARD,
Tag: &req.Tag,
Application: &req.Application,
Expand Down Expand Up @@ -440,6 +441,7 @@ func (j *job) preheatAllPeers(ctx context.Context, taskID string, req *internalj
taskID,
&dfdaemonv2.DownloadTaskRequest{Download: &commonv2.Download{
Url: req.URL,
PieceLength: req.PieceLength,
Type: commonv2.TaskType_STANDARD,
Tag: &req.Tag,
Application: &req.Application,
Expand Down Expand Up @@ -583,6 +585,7 @@ func (j *job) preheatV2(ctx context.Context, taskID string, req *internaljob.Pre
stream, err := j.resource.SeedPeer().Client().DownloadTask(ctx, taskID, &dfdaemonv2.DownloadTaskRequest{
Download: &commonv2.Download{
Url: req.URL,
PieceLength: req.PieceLength,
Type: commonv2.TaskType_STANDARD,
Tag: &req.Tag,
Application: &req.Application,
Expand Down
10 changes: 0 additions & 10 deletions scheduler/resource/standard/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,13 +87,6 @@ const (
// TaskOption is a functional option for task.
type TaskOption func(task *Task)

// WithPieceLength set PieceLength for task.
func WithPieceLength(pieceLength int32) TaskOption {
return func(t *Task) {
t.PieceLength = pieceLength
}
}

// WithDigest set Digest for task.
func WithDigest(d *digest.Digest) TaskOption {
return func(t *Task) {
Expand Down Expand Up @@ -127,9 +120,6 @@ type Task struct {
// Task request headers.
Header map[string]string

// Task piece length.
PieceLength int32

// DirectPiece is tiny piece data.
DirectPiece []byte

Expand Down
25 changes: 11 additions & 14 deletions scheduler/resource/standard/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,16 +47,16 @@ var (
CreatedAt: time.Now(),
}

mockTaskBackToSourceLimit int32 = 200
mockTaskURL = "http://example.com/foo"
mockTaskID = idgen.TaskIDV2(mockTaskURL, mockTaskTag, mockTaskApplication, mockTaskFilteredQueryParams)
mockTaskDigest = digest.New(digest.AlgorithmSHA256, "c71d239df91726fc519c6eb72d318ec65820627232b2f796219e87dcf35d0ab4")
mockTaskTag = "d7y"
mockTaskApplication = "foo"
mockTaskFilteredQueryParams = []string{"bar"}
mockTaskHeader = map[string]string{"content-length": "100"}
mockTaskPieceLength int32 = 2048
mockPieceDigest = digest.New(digest.AlgorithmMD5, "ad83a945518a4ef007d8b2db2ef165b3")
mockTaskBackToSourceLimit int32 = 200
mockTaskURL = "http://example.com/foo"
mockTaskPieceLength uint64 = 2048
mockTaskID = idgen.TaskIDV2(mockTaskURL, &mockTaskPieceLength, mockTaskTag, mockTaskApplication, mockTaskFilteredQueryParams)
mockTaskDigest = digest.New(digest.AlgorithmSHA256, "c71d239df91726fc519c6eb72d318ec65820627232b2f796219e87dcf35d0ab4")
mockTaskTag = "d7y"
mockTaskApplication = "foo"
mockTaskFilteredQueryParams = []string{"bar"}
mockTaskHeader = map[string]string{"content-length": "100"}
mockPieceDigest = digest.New(digest.AlgorithmMD5, "ad83a945518a4ef007d8b2db2ef165b3")
)

func TestTask_NewTask(t *testing.T) {
Expand All @@ -78,7 +78,6 @@ func TestTask_NewTask(t *testing.T) {
assert.Equal(task.Application, mockTaskApplication)
assert.EqualValues(task.FilteredQueryParams, mockTaskFilteredQueryParams)
assert.EqualValues(task.Header, mockTaskHeader)
assert.Equal(task.PieceLength, int32(0))
assert.Empty(task.DirectPiece)
assert.Equal(task.ContentLength.Load(), int64(-1))
assert.Equal(task.TotalPieceCount.Load(), int32(0))
Expand All @@ -94,7 +93,7 @@ func TestTask_NewTask(t *testing.T) {
},
{
name: "new task with piece length",
options: []TaskOption{WithPieceLength(mockTaskPieceLength)},
options: []TaskOption{},
expect: func(t *testing.T, task *Task) {
assert := assert.New(t)
assert.Equal(task.ID, mockTaskID)
Expand All @@ -105,7 +104,6 @@ func TestTask_NewTask(t *testing.T) {
assert.Equal(task.Application, mockTaskApplication)
assert.EqualValues(task.FilteredQueryParams, mockTaskFilteredQueryParams)
assert.EqualValues(task.Header, mockTaskHeader)
assert.Equal(task.PieceLength, mockTaskPieceLength)
assert.Empty(task.DirectPiece)
assert.Equal(task.ContentLength.Load(), int64(-1))
assert.Equal(task.TotalPieceCount.Load(), int32(0))
Expand All @@ -132,7 +130,6 @@ func TestTask_NewTask(t *testing.T) {
assert.Equal(task.Application, mockTaskApplication)
assert.EqualValues(task.FilteredQueryParams, mockTaskFilteredQueryParams)
assert.EqualValues(task.Header, mockTaskHeader)
assert.Equal(task.PieceLength, int32(0))
assert.Empty(task.DirectPiece)
assert.Equal(task.ContentLength.Load(), int64(-1))
assert.Equal(task.TotalPieceCount.Load(), int32(0))
Expand Down
Loading

0 comments on commit 3232f7d

Please sign in to comment.