From 3ffd5ff0c19b3c8544d3f6ecabafb8ee78280031 Mon Sep 17 00:00:00 2001 From: congqixia Date: Tue, 18 Jun 2024 16:45:52 +0800 Subject: [PATCH] enhance: Use auto oss access detection for download-segment (#274) Make `download-segment` work for gcp as well Signed-off-by: Congqi Xia --- states/download_segment.go | 193 ++++++++++--------------------------- states/instance.go | 5 - states/minio.go | 3 + 3 files changed, 55 insertions(+), 146 deletions(-) diff --git a/states/download_segment.go b/states/download_segment.go index 3bb89f3..ddd05c3 100644 --- a/states/download_segment.go +++ b/states/download_segment.go @@ -8,142 +8,47 @@ import ( "io" "os" "path" - "strconv" "time" "github.com/manifoldco/promptui" - "github.com/milvus-io/birdwatcher/proto/v2.0/datapb" - "github.com/milvus-io/birdwatcher/proto/v2.0/indexpb" + "github.com/milvus-io/birdwatcher/framework" + "github.com/milvus-io/birdwatcher/models" "github.com/milvus-io/birdwatcher/states/etcd/common" + etcdversion "github.com/milvus-io/birdwatcher/states/etcd/version" "github.com/minio/minio-go/v7" - "github.com/minio/minio-go/v7/pkg/credentials" - "github.com/spf13/cobra" - clientv3 "go.etcd.io/etcd/client/v3" ) -func getDownloadSegmentCmd(cli clientv3.KV, basePath string) *cobra.Command { - cmd := &cobra.Command{ - Use: "download-segment", - Short: "download segment file with provided segment id", - Run: func(cmd *cobra.Command, args []string) { - if len(args) == 0 { - fmt.Println("no segment id provided") - return - } - - segSet := make(map[int64]struct{}) - for _, arg := range args { - id, err := strconv.ParseInt(arg, 10, 64) - if err == nil { - //skip bad segment id for now - segSet[id] = struct{}{} - } - } - - segments, err := common.ListSegments(cli, basePath, func(info *datapb.SegmentInfo) bool { - _, ok := segSet[info.ID] - return ok - }) - if err != nil { - fmt.Println("failed to list segment info", err.Error()) - return - } - - minioClient, bucketName, err := getMinioAccess() - if err != nil { - fmt.Println("failed to get minio access", err.Error()) - return - } - - folder := fmt.Sprintf("dlsegment_%s", time.Now().Format("20060102150406")) - for _, segment := range segments { - common.FillFieldsIfV2(cli, basePath, segment) - downloadSegment(minioClient, bucketName, segment, nil, folder) - } - - }, - } - - return cmd +type DownloadSegmentParam struct { + framework.ParamBase `use:"download-segment" desc:"download segment file with provided segment id"` + MinioAddress string `name:"minioAddr" default:"" desc:"the minio address to override, leave empty to use milvus.yaml value"` + SegmentID int64 `name:"segment" default:"0" desc:"segment id to downloads"` } -func getMinioWithInfo(addr string, ak, sk string, bucketName string, secure bool) (*minio.Client, string, error) { - cred := credentials.NewStaticV4(ak, sk, "") - minioClient, err := minio.New(addr, &minio.Options{ - Creds: cred, - Secure: secure, +func (s *InstanceState) DownloadSegmentCommand(ctx context.Context, p *DownloadSegmentParam) error { + segments, err := common.ListSegmentsVersion(ctx, s.client, s.basePath, etcdversion.GetVersion(), func(s *models.Segment) bool { + return s.ID == p.SegmentID }) if err != nil { - return nil, "", err - } - exists, err := minioClient.BucketExists(context.Background(), bucketName) - if !exists { - fmt.Printf("bucket %s not exists\n", bucketName) - return nil, "", err - } - - if !exists { - fmt.Printf("Bucket not exist\n") - return nil, "", errors.New("bucket not exists") - } - - return minioClient, bucketName, nil -} - -func getMinioWithIAM(addr string, bucketName string, secure bool) (*minio.Client, string, error) { - cred := credentials.NewIAM("") - minioClient, err := minio.New(addr, &minio.Options{ - Creds: cred, - Secure: secure, - }) - if err != nil { - return nil, "", err - } - exists, err := minioClient.BucketExists(context.Background(), bucketName) - if !exists { - fmt.Printf("bucket %s not exists\n", bucketName) - return nil, "", err - } - - if !exists { - fmt.Printf("Bucket not exist\n") - return nil, "", errors.New("bucket not exists") - } - - return minioClient, bucketName, nil -} - -func getMinioAccess() (*minio.Client, string, error) { - p := promptui.Prompt{ - Label: "BucketName", - } - bucketName, err := p.Run() - if err != nil { - return nil, "", err + return err } - minioClient, err := getMinioClient() + minioClient, bucketName, _, err := s.GetMinioClientFromCfg(ctx, p.MinioAddress) if err != nil { - fmt.Println("cannot get minio client", err.Error()) - return nil, "", err - - } - exists, err := minioClient.BucketExists(context.Background(), bucketName) - if !exists { - fmt.Printf("bucket %s not exists\n", bucketName) - return nil, "", err + return err } - if !exists { - fmt.Printf("Bucket not exist\n") - return nil, "", errors.New("bucket not exists") + folder := fmt.Sprintf("dlsegment_%s", time.Now().Format("20060102150406")) + for _, segment := range segments { + err := s.downloadSegment(ctx, minioClient, bucketName, segment, folder) + if err != nil { + return err + } } - return minioClient, bucketName, nil + return nil } -func downloadSegment(cli *minio.Client, bucketName string, segment *datapb.SegmentInfo, indexMeta *indexpb.IndexMeta, folderPath string) error { - +func (s *InstanceState) downloadSegment(ctx context.Context, minioClient *minio.Client, bucketName string, segment *models.Segment, folderPath string) error { p := path.Join(folderPath, fmt.Sprintf("%d", segment.ID)) if _, err := os.Stat(p); errors.Is(err, os.ErrNotExist) { err := os.MkdirAll(p, os.ModePerm) @@ -155,7 +60,7 @@ func downloadSegment(cli *minio.Client, bucketName string, segment *datapb.Segme fmt.Printf("Downloading Segment: %d ...\n", segment.ID) - for _, fieldBinlog := range segment.Binlogs { + for _, fieldBinlog := range segment.GetBinlogs() { folder := fmt.Sprintf("%s/%d", p, fieldBinlog.FieldID) err := os.MkdirAll(folder, 0777) if err != nil { @@ -164,13 +69,13 @@ func downloadSegment(cli *minio.Client, bucketName string, segment *datapb.Segme } for _, binlog := range fieldBinlog.Binlogs { - obj, err := cli.GetObject(context.Background(), bucketName, binlog.GetLogPath(), minio.GetObjectOptions{}) + obj, err := minioClient.GetObject(ctx, bucketName, binlog.LogPath, minio.GetObjectOptions{}) if err != nil { - fmt.Printf("failed to download file bucket=\"%s\", filePath = \"%s\", err: %s\n", bucketName, binlog.GetLogPath(), err.Error()) + fmt.Printf("failed to download file bucket=\"%s\", filePath = \"%s\", err: %s\n", bucketName, binlog.LogPath, err.Error()) return err } - name := path.Base(binlog.GetLogPath()) + name := path.Base(binlog.LogPath) f, err := os.Create(path.Join(folder, name)) if err != nil { @@ -182,28 +87,34 @@ func downloadSegment(cli *minio.Client, bucketName string, segment *datapb.Segme io.Copy(w, r) } } + return nil +} - if indexMeta != nil { - fmt.Println("downloading index files ...") - folder := path.Join(p, "index") - for _, indexFile := range indexMeta.GetIndexFilePaths() { - obj, err := cli.GetObject(context.Background(), bucketName, indexFile, minio.GetObjectOptions{}) - if err != nil { - fmt.Println("failed to download file", bucketName, indexFile) - //index not affect segment download result - continue - } +func getMinioAccess() (*minio.Client, string, error) { + p := promptui.Prompt{ + Label: "BucketName", + } + bucketName, err := p.Run() + if err != nil { + return nil, "", err + } + + minioClient, err := getMinioClient() + if err != nil { + fmt.Println("cannot get minio client", err.Error()) + return nil, "", err - name := path.Base(indexFile) - f, err := os.Create(path.Join(folder, name)) - if err != nil { - fmt.Println("failed to create index file") - continue - } - w := bufio.NewWriter(f) - r := bufio.NewReader(obj) - io.Copy(w, r) - } } - return nil + exists, err := minioClient.BucketExists(context.Background(), bucketName) + if !exists { + fmt.Printf("bucket %s not exists\n", bucketName) + return nil, "", err + } + + if !exists { + fmt.Printf("Bucket not exist\n") + return nil, "", errors.New("bucket not exists") + } + + return minioClient, bucketName, nil } diff --git a/states/instance.go b/states/instance.go index 25a322a..52b98e9 100644 --- a/states/instance.go +++ b/states/instance.go @@ -56,8 +56,6 @@ func (s *InstanceState) SetupCommands() { ) cmd.AddCommand( - // download-segment - getDownloadSegmentCmd(cli, basePath), // show [subcommand] options... showCmd, // repair [subcommand] options... @@ -94,9 +92,6 @@ func (s *InstanceState) SetupCommands() { // probe GetProbeCmd(cli, basePath), - // set current-version - SetCurrentVersionCommand(), - // remove-segment-by-id //removeSegmentByID(cli, basePath), // garbage-collect diff --git a/states/minio.go b/states/minio.go index b4a37ac..272f6e1 100644 --- a/states/minio.go +++ b/states/minio.go @@ -94,6 +94,9 @@ func (s *InstanceState) GetMinioClientFromCfg(ctx context.Context, minioAddr str useSSL = config.GetValue() } } + if minioAddr != "" { + addr = minioAddr + } mp := oss.MinioClientParam{ CloudProvider: cloudProvider,