Skip to content

Commit

Permalink
enhance: Use auto oss access detection for download-segment (milvus-i…
Browse files Browse the repository at this point in the history
…o#274)

Make `download-segment` work for gcp as well

Signed-off-by: Congqi Xia <[email protected]>
  • Loading branch information
congqixia authored Jun 18, 2024
1 parent 18abfa1 commit 3ffd5ff
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 146 deletions.
193 changes: 52 additions & 141 deletions states/download_segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,142 +8,47 @@ import (
"io"
"os"
"path"
"strconv"
"time"

"github.com/manifoldco/promptui"
"github.com/milvus-io/birdwatcher/proto/v2.0/datapb"
"github.com/milvus-io/birdwatcher/proto/v2.0/indexpb"
"github.com/milvus-io/birdwatcher/framework"
"github.com/milvus-io/birdwatcher/models"
"github.com/milvus-io/birdwatcher/states/etcd/common"
etcdversion "github.com/milvus-io/birdwatcher/states/etcd/version"
"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
"github.com/spf13/cobra"
clientv3 "go.etcd.io/etcd/client/v3"
)

func getDownloadSegmentCmd(cli clientv3.KV, basePath string) *cobra.Command {
cmd := &cobra.Command{
Use: "download-segment",
Short: "download segment file with provided segment id",
Run: func(cmd *cobra.Command, args []string) {
if len(args) == 0 {
fmt.Println("no segment id provided")
return
}

segSet := make(map[int64]struct{})
for _, arg := range args {
id, err := strconv.ParseInt(arg, 10, 64)
if err == nil {
//skip bad segment id for now
segSet[id] = struct{}{}
}
}

segments, err := common.ListSegments(cli, basePath, func(info *datapb.SegmentInfo) bool {
_, ok := segSet[info.ID]
return ok
})
if err != nil {
fmt.Println("failed to list segment info", err.Error())
return
}

minioClient, bucketName, err := getMinioAccess()
if err != nil {
fmt.Println("failed to get minio access", err.Error())
return
}

folder := fmt.Sprintf("dlsegment_%s", time.Now().Format("20060102150406"))
for _, segment := range segments {
common.FillFieldsIfV2(cli, basePath, segment)
downloadSegment(minioClient, bucketName, segment, nil, folder)
}

},
}

return cmd
type DownloadSegmentParam struct {
framework.ParamBase `use:"download-segment" desc:"download segment file with provided segment id"`
MinioAddress string `name:"minioAddr" default:"" desc:"the minio address to override, leave empty to use milvus.yaml value"`
SegmentID int64 `name:"segment" default:"0" desc:"segment id to downloads"`
}

func getMinioWithInfo(addr string, ak, sk string, bucketName string, secure bool) (*minio.Client, string, error) {
cred := credentials.NewStaticV4(ak, sk, "")
minioClient, err := minio.New(addr, &minio.Options{
Creds: cred,
Secure: secure,
func (s *InstanceState) DownloadSegmentCommand(ctx context.Context, p *DownloadSegmentParam) error {
segments, err := common.ListSegmentsVersion(ctx, s.client, s.basePath, etcdversion.GetVersion(), func(s *models.Segment) bool {
return s.ID == p.SegmentID
})
if err != nil {
return nil, "", err
}
exists, err := minioClient.BucketExists(context.Background(), bucketName)
if !exists {
fmt.Printf("bucket %s not exists\n", bucketName)
return nil, "", err
}

if !exists {
fmt.Printf("Bucket not exist\n")
return nil, "", errors.New("bucket not exists")
}

return minioClient, bucketName, nil
}

func getMinioWithIAM(addr string, bucketName string, secure bool) (*minio.Client, string, error) {
cred := credentials.NewIAM("")
minioClient, err := minio.New(addr, &minio.Options{
Creds: cred,
Secure: secure,
})
if err != nil {
return nil, "", err
}
exists, err := minioClient.BucketExists(context.Background(), bucketName)
if !exists {
fmt.Printf("bucket %s not exists\n", bucketName)
return nil, "", err
}

if !exists {
fmt.Printf("Bucket not exist\n")
return nil, "", errors.New("bucket not exists")
}

return minioClient, bucketName, nil
}

func getMinioAccess() (*minio.Client, string, error) {
p := promptui.Prompt{
Label: "BucketName",
}
bucketName, err := p.Run()
if err != nil {
return nil, "", err
return err
}

minioClient, err := getMinioClient()
minioClient, bucketName, _, err := s.GetMinioClientFromCfg(ctx, p.MinioAddress)
if err != nil {
fmt.Println("cannot get minio client", err.Error())
return nil, "", err

}
exists, err := minioClient.BucketExists(context.Background(), bucketName)
if !exists {
fmt.Printf("bucket %s not exists\n", bucketName)
return nil, "", err
return err
}

if !exists {
fmt.Printf("Bucket not exist\n")
return nil, "", errors.New("bucket not exists")
folder := fmt.Sprintf("dlsegment_%s", time.Now().Format("20060102150406"))
for _, segment := range segments {
err := s.downloadSegment(ctx, minioClient, bucketName, segment, folder)
if err != nil {
return err
}
}

return minioClient, bucketName, nil
return nil
}

func downloadSegment(cli *minio.Client, bucketName string, segment *datapb.SegmentInfo, indexMeta *indexpb.IndexMeta, folderPath string) error {

func (s *InstanceState) downloadSegment(ctx context.Context, minioClient *minio.Client, bucketName string, segment *models.Segment, folderPath string) error {
p := path.Join(folderPath, fmt.Sprintf("%d", segment.ID))
if _, err := os.Stat(p); errors.Is(err, os.ErrNotExist) {
err := os.MkdirAll(p, os.ModePerm)
Expand All @@ -155,7 +60,7 @@ func downloadSegment(cli *minio.Client, bucketName string, segment *datapb.Segme

fmt.Printf("Downloading Segment: %d ...\n", segment.ID)

for _, fieldBinlog := range segment.Binlogs {
for _, fieldBinlog := range segment.GetBinlogs() {
folder := fmt.Sprintf("%s/%d", p, fieldBinlog.FieldID)
err := os.MkdirAll(folder, 0777)
if err != nil {
Expand All @@ -164,13 +69,13 @@ func downloadSegment(cli *minio.Client, bucketName string, segment *datapb.Segme
}

for _, binlog := range fieldBinlog.Binlogs {
obj, err := cli.GetObject(context.Background(), bucketName, binlog.GetLogPath(), minio.GetObjectOptions{})
obj, err := minioClient.GetObject(ctx, bucketName, binlog.LogPath, minio.GetObjectOptions{})
if err != nil {
fmt.Printf("failed to download file bucket=\"%s\", filePath = \"%s\", err: %s\n", bucketName, binlog.GetLogPath(), err.Error())
fmt.Printf("failed to download file bucket=\"%s\", filePath = \"%s\", err: %s\n", bucketName, binlog.LogPath, err.Error())
return err
}

name := path.Base(binlog.GetLogPath())
name := path.Base(binlog.LogPath)

f, err := os.Create(path.Join(folder, name))
if err != nil {
Expand All @@ -182,28 +87,34 @@ func downloadSegment(cli *minio.Client, bucketName string, segment *datapb.Segme
io.Copy(w, r)
}
}
return nil
}

if indexMeta != nil {
fmt.Println("downloading index files ...")
folder := path.Join(p, "index")
for _, indexFile := range indexMeta.GetIndexFilePaths() {
obj, err := cli.GetObject(context.Background(), bucketName, indexFile, minio.GetObjectOptions{})
if err != nil {
fmt.Println("failed to download file", bucketName, indexFile)
//index not affect segment download result
continue
}
func getMinioAccess() (*minio.Client, string, error) {
p := promptui.Prompt{
Label: "BucketName",
}
bucketName, err := p.Run()
if err != nil {
return nil, "", err
}

minioClient, err := getMinioClient()
if err != nil {
fmt.Println("cannot get minio client", err.Error())
return nil, "", err

name := path.Base(indexFile)
f, err := os.Create(path.Join(folder, name))
if err != nil {
fmt.Println("failed to create index file")
continue
}
w := bufio.NewWriter(f)
r := bufio.NewReader(obj)
io.Copy(w, r)
}
}
return nil
exists, err := minioClient.BucketExists(context.Background(), bucketName)
if !exists {
fmt.Printf("bucket %s not exists\n", bucketName)
return nil, "", err
}

if !exists {
fmt.Printf("Bucket not exist\n")
return nil, "", errors.New("bucket not exists")
}

return minioClient, bucketName, nil
}
5 changes: 0 additions & 5 deletions states/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,6 @@ func (s *InstanceState) SetupCommands() {
)

cmd.AddCommand(
// download-segment
getDownloadSegmentCmd(cli, basePath),
// show [subcommand] options...
showCmd,
// repair [subcommand] options...
Expand Down Expand Up @@ -94,9 +92,6 @@ func (s *InstanceState) SetupCommands() {
// probe
GetProbeCmd(cli, basePath),

// set current-version
SetCurrentVersionCommand(),

// remove-segment-by-id
//removeSegmentByID(cli, basePath),
// garbage-collect
Expand Down
3 changes: 3 additions & 0 deletions states/minio.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ func (s *InstanceState) GetMinioClientFromCfg(ctx context.Context, minioAddr str
useSSL = config.GetValue()
}
}
if minioAddr != "" {
addr = minioAddr
}

mp := oss.MinioClientParam{
CloudProvider: cloudProvider,
Expand Down

0 comments on commit 3ffd5ff

Please sign in to comment.