feat(artifact): add retry in MinIO and Milvus (#109)
Because

- we need to increase the stability of MinIO and Milvus operations.
- we need to provide a user-friendly error message when a file name is too long.

This commit

- adds retries around MinIO and Milvus calls (see the retry sketch below)
- adds a file-name length check that returns a clearer error message
Yougigun authored Oct 7, 2024
1 parent a8430aa commit c5bbf5f
Showing 10 changed files with 147 additions and 42 deletions.
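The retry loops added in pkg/minio and pkg/milvus below all follow the same shape: up to three attempts, with a sleep that grows linearly with the attempt number. A minimal, runnable sketch of that pattern for reference; the retry helper and the fake operation here are illustrative only and are not part of this commit:

package main

import (
    "errors"
    "fmt"
    "time"
)

// retry runs op up to maxAttempts times, sleeping attempt seconds between
// failures, and wraps the last error if every attempt fails.
func retry(maxAttempts int, op func() error) error {
    var err error
    for attempt := 1; attempt <= maxAttempts; attempt++ {
        if err = op(); err == nil {
            return nil
        }
        time.Sleep(time.Duration(attempt) * time.Second)
    }
    return fmt.Errorf("operation failed after %d attempts: %w", maxAttempts, err)
}

func main() {
    calls := 0
    err := retry(3, func() error {
        calls++
        if calls < 3 {
            return errors.New("transient failure")
        }
        return nil
    })
    fmt.Println(calls, err) // 3 <nil>
}

The commit inlines this loop at each call site instead of extracting a helper, which keeps each change local at the cost of repeating the attempt bookkeeping.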
2 changes: 1 addition & 1 deletion pkg/acl/acl.go
@@ -369,7 +369,7 @@ func (c *ACLClient) ListPermissions(ctx context.Context, objectType string, role
Relation: role,
Type: objectType,
})
// TODO: handle error when no model is created
// TODO: handle error when no auth model is created
if err != nil {
if statusErr, ok := status.FromError(err); ok {
if statusErr.Code() == codes.Code(openfga.ErrorCode_type_not_found) {
16 changes: 11 additions & 5 deletions pkg/handler/knowledgebasefiles.go
@@ -6,16 +6,18 @@ import (
"strings"

"github.com/gofrs/uuid"
"go.uber.org/zap"
"google.golang.org/protobuf/types/known/timestamppb"
"gorm.io/gorm"

"github.com/instill-ai/artifact-backend/pkg/constant"
"github.com/instill-ai/artifact-backend/pkg/customerror"
"github.com/instill-ai/artifact-backend/pkg/logger" // Add this import
"github.com/instill-ai/artifact-backend/pkg/logger"
"github.com/instill-ai/artifact-backend/pkg/repository"
"github.com/instill-ai/artifact-backend/pkg/resource"
"github.com/instill-ai/artifact-backend/pkg/utils"

artifactpb "github.com/instill-ai/protogen-go/artifact/artifact/v1alpha"
"go.uber.org/zap"
"google.golang.org/protobuf/types/known/timestamppb"
"gorm.io/gorm"
)

func (ph *PublicHandler) UploadCatalogFile(ctx context.Context, req *artifactpb.UploadCatalogFileRequest) (*artifactpb.UploadCatalogFileResponse, error) {
@@ -29,7 +31,11 @@ func (ph *PublicHandler) UploadCatalogFile(ctx context.Context, req *artifactpb.
if err != nil {
return nil, err
}

// check the file name length (len counts bytes, so the 255 limit is effectively a byte limit)
if len(req.File.Name) > 255 {
return nil, fmt.Errorf("file name is too long. max length is 255. name: %s err: %w",
req.File.Name, customerror.ErrInvalidArgument)
}
// determine the file type by its extension
req.File.Type = DetermineFileType(req.File.Name)
if req.File.Type == artifactpb.FileType_FILE_TYPE_UNSPECIFIED {
20 changes: 20 additions & 0 deletions pkg/handler/knowledgebasefiles_test.go
@@ -0,0 +1,20 @@
package handler

import (
"fmt"
"testing"
"unicode/utf8"
)

func TestUploadCatalogFile(t *testing.T) {
// Check rune count
input := "-"

actual := utf8.RuneCountInString(input)
// print actual
fmt.Println(actual)

// check string length
expected := len(input)
fmt.Println(expected)
}
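One note on the check exercised by this test: Go's len returns a string's byte length, while utf8.RuneCountInString returns its character (rune) count, and the two differ for non-ASCII names. The 255 limit in UploadCatalogFile is therefore a byte limit. A small illustrative snippet, not part of the commit:

package main

import (
    "fmt"
    "unicode/utf8"
)

func main() {
    name := "résumé.pdf" // each "é" is 2 bytes in UTF-8

    fmt.Println(len(name))                    // 12 (bytes)
    fmt.Println(utf8.RuneCountInString(name)) // 10 (characters)

    // The handler's byte-based check:
    fmt.Println(len(name) > 255) // false
}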
29 changes: 25 additions & 4 deletions pkg/milvus/milvus.go
@@ -144,14 +144,17 @@ func (m *MilvusClient) CreateKnowledgeBaseCollection(ctx context.Context, kbUID

// InsertVectorsToKnowledgeBaseCollection upserts the given embeddings into the knowledge base's collection, retrying failed Upsert and Flush calls.
func (m *MilvusClient) InsertVectorsToKnowledgeBaseCollection(ctx context.Context, kbUID string, embeddings []Embedding) error {
logger, _ := logger.GetZapLogger(ctx)
collectionName := m.GetKnowledgeBaseCollectionName(kbUID)

// Check if the collection exists
has, err := m.c.HasCollection(ctx, collectionName)
if err != nil {
logger.Error("Failed to check collection existence", zap.Error(err))
return fmt.Errorf("failed to check collection existence: %w", err)
}
if !has {
logger.Error("Collection does not exist", zap.String("collection", collectionName))
return fmt.Errorf("collection %s does not exist", collectionName)
}

@@ -180,18 +183,36 @@ func (m *MilvusClient) InsertVectorsToKnowledgeBaseCollection(ctx context.Contex
entity.NewColumnFloatVector(KbCollectionFiledEmbedding, VectorDim, vectors),
}

// Insert the data
_, err = m.c.Upsert(ctx, collectionName, "", columns...)
// Insert the data with retry
maxRetries := 3
for attempt := 1; attempt <= maxRetries; attempt++ {
_, err = m.c.Upsert(ctx, collectionName, "", columns...)
if err == nil {
break
}
logger.Warn("Failed to insert vectors, retrying", zap.Int("attempt", attempt), zap.Error(err))
time.Sleep(time.Second * time.Duration(attempt))
}
if err != nil {
logger.Error("Failed to insert vectors after retries", zap.Error(err))
return fmt.Errorf("failed to insert vectors: %w", err)
}

// Optionally, you can flush the collection to ensure the data is persisted
err = m.c.Flush(ctx, collectionName, false)
// Flush the collection with retry
for attempt := 1; attempt <= maxRetries; attempt++ {
err = m.c.Flush(ctx, collectionName, false)
if err == nil {
break
}
logger.Warn("Failed to flush collection, retrying", zap.Int("attempt", attempt), zap.Error(err))
time.Sleep(time.Second * time.Duration(attempt))
}
if err != nil {
logger.Error("Failed to flush collection after retries", zap.Error(err))
return fmt.Errorf("failed to flush collection after insertion: %w", err)
}

logger.Info("Successfully inserted and flushed vectors", zap.String("collection", collectionName))
return nil
}

15 changes: 12 additions & 3 deletions pkg/minio/knowledgebase.go
@@ -6,7 +6,9 @@ import (
"fmt"
"sync"

"github.com/instill-ai/artifact-backend/pkg/logger"
"github.com/instill-ai/artifact-backend/pkg/utils"
"go.uber.org/zap"
)

// KnowledgeBaseI is the interface for knowledge base related operations.
@@ -56,8 +58,13 @@ type ChunkContentType []byte

// SaveTextChunks saves batch of chunks(text files) to MinIO.
func (m *Minio) SaveTextChunks(ctx context.Context, kbUID string, chunks map[ChunkUIDType]ChunkContentType) error {
logger, _ := logger.GetZapLogger(ctx)
var wg sync.WaitGroup
errorUIDChan := make(chan string, len(chunks))
type ChunkError struct {
ChunkUID string
ErrorMessage string
}
errorUIDChan := make(chan ChunkError, len(chunks))
for chunkUID, chunkContent := range chunks {
wg.Add(1)
go utils.GoRecover(func() {
@@ -67,19 +74,21 @@ func (m *Minio) SaveTextChunks(ctx context.Context, kbUID string, chunks map[Chu

err := m.UploadBase64File(ctx, filePathName, base64.StdEncoding.EncodeToString(chunkContent), "text/plain")
if err != nil {
errorUIDChan <- string(chunkUID)
logger.Error("Failed to upload chunk after retries", zap.String("chunkUID", string(chunkUID)), zap.Error(err))
errorUIDChan <- ChunkError{ChunkUID: string(chunkUID), ErrorMessage: err.Error()}
return
}
}(chunkUID, chunkContent)
}, fmt.Sprintf("SaveTextChunks %s", chunkUID))
}
wg.Wait()
close(errorUIDChan)
var errStr []string
var errStr []ChunkError
for err := range errorUIDChan {
errStr = append(errStr, err)
}
if len(errStr) > 0 {
logger.Error("Failed to upload chunks", zap.Any("ChunkError", errStr))
return fmt.Errorf("failed to upload chunks: %v", errStr)
}
return nil
95 changes: 73 additions & 22 deletions pkg/minio/minio.go
@@ -9,6 +9,7 @@ import (
"path/filepath"
"strings"
"sync"
"time"

"github.com/instill-ai/artifact-backend/config"
log "github.com/instill-ai/artifact-backend/pkg/logger"
@@ -92,9 +93,16 @@ func (m *Minio) UploadBase64File(ctx context.Context, filePathName string, base6
// Upload the content to MinIO
size := int64(len(decodedContent))
// Create the file path with folder structure
_, err = m.client.PutObjectWithContext(ctx, m.bucket, filePathName, contentReader, size, minio.PutObjectOptions{ContentType: fileMimeType})
for i := 0; i < 3; i++ {
_, err = m.client.PutObjectWithContext(ctx, m.bucket, filePathName, contentReader, size, minio.PutObjectOptions{ContentType: fileMimeType})
if err == nil {
break
}
log.Error("Failed to upload file to MinIO, retrying...", zap.String("attempt", fmt.Sprintf("%d", i+1)), zap.Error(err))
time.Sleep(1 * time.Second)
}
if err != nil {
log.Error("Failed to upload file to MinIO", zap.Error(err))
log.Error("Failed to upload file to MinIO after retries", zap.Error(err))
return err
}
return nil
@@ -107,7 +115,14 @@ func (m *Minio) DeleteFile(ctx context.Context, filePathName string) (err error)
return err
}
// Delete the file from MinIO
err = m.client.RemoveObject(m.bucket, filePathName)
for attempt := 1; attempt <= 3; attempt++ {
err = m.client.RemoveObject(m.bucket, filePathName)
if err == nil {
break
}
log.Error("Failed to delete file from MinIO, retrying...", zap.String("filePathName", filePathName), zap.Int("attempt", attempt), zap.Error(err))
time.Sleep(time.Duration(attempt) * time.Second)
}
if err != nil {
log.Error("Failed to delete file from MinIO", zap.Error(err))
return err
@@ -132,7 +147,15 @@ func (m *Minio) DeleteFiles(ctx context.Context, filePathNames []string) chan er
func() {
func(filePathName string, errCh chan error) {
defer wg.Done()
err := m.client.RemoveObject(m.bucket, filePathName)
var err error
for attempt := 1; attempt <= 3; attempt++ {
err = m.client.RemoveObject(m.bucket, filePathName)
if err == nil {
break
}
log.Error("Failed to delete file from MinIO, retrying...", zap.String("filePathName", filePathName), zap.Int("attempt", attempt), zap.Error(err))
time.Sleep(time.Duration(attempt) * time.Second)
}
if err != nil {
log.Error("Failed to delete file from MinIO", zap.Error(err))
errCh <- err
@@ -151,10 +174,18 @@ func (m *Minio) GetFile(ctx context.Context, filePathName string) ([]byte, error
return nil, err
}

// Get the object using the client
object, err := m.client.GetObject(m.bucket, filePathName, minio.GetObjectOptions{})
// Get the object using the client, retrying up to three times with a linearly increasing delay
var object *minio.Object
for attempt := 1; attempt <= 3; attempt++ {
object, err = m.client.GetObject(m.bucket, filePathName, minio.GetObjectOptions{})
if err == nil {
break
}
log.Error("Failed to get file from MinIO, retrying...", zap.String("filePathName", filePathName), zap.Int("attempt", attempt), zap.Error(err))
time.Sleep(time.Duration(attempt) * time.Second)
}
if err != nil {
log.Error("Failed to get file from MinIO", zap.Error(err))
log.Error("Failed to get file from MinIO after 3 attempts", zap.String("filePathName", filePathName), zap.Error(err))
return nil, err
}
defer object.Close()
@@ -184,21 +215,27 @@ func (m *Minio) GetFilesByPaths(ctx context.Context, filePaths []string) ([]File
}

var wg sync.WaitGroup
files := make([]FileContent, len(filePaths))
errors := make([]error, len(filePaths))
var mu sync.Mutex
fileCh := make(chan FileContent, len(filePaths))
errorCh := make(chan error, len(filePaths))

for i, path := range filePaths {
for _, path := range filePaths {
wg.Add(1)
go utils.GoRecover(func() {
func(index int, filePath string) {
func(filePath string) {
defer wg.Done()
obj, err := m.client.GetObject(m.bucket, filePath, minio.GetObjectOptions{})
var obj *minio.Object
var err error
for attempt := 1; attempt <= 3; attempt++ {
obj, err = m.client.GetObject(m.bucket, filePath, minio.GetObjectOptions{})
if err == nil {
break
}
log.Error("Failed to get object from MinIO, retrying...", zap.String("path", filePath), zap.Int("attempt", attempt), zap.Error(err))
time.Sleep(time.Duration(attempt) * time.Second)
}
if err != nil {
log.Error("Failed to get object from MinIO", zap.String("path", filePath), zap.Error(err))
mu.Lock()
errors[index] = err
mu.Unlock()
errorCh <- err
return
}
defer obj.Close()
@@ -207,22 +244,28 @@ func (m *Minio) GetFilesByPaths(ctx context.Context, filePaths []string) ([]File
_, err = io.Copy(&buffer, obj)
if err != nil {
log.Error("Failed to read object content", zap.String("path", filePath), zap.Error(err))
errors[index] = err
errorCh <- err
return
}

files[index] = FileContent{
fileCh <- FileContent{
Name: filepath.Base(filePath),
Content: buffer.Bytes(),
}
}(i, path)
}(path)
}, fmt.Sprintf("GetFilesByPaths %s", path))
}

wg.Wait()
close(fileCh)
close(errorCh)

var files []FileContent
for file := range fileCh {
files = append(files, file)
}

// Check if any errors occurred
for _, err := range errors {
for err := range errorCh {
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -257,7 +300,15 @@ func (m *Minio) DeleteFilesWithPrefix(ctx context.Context, prefix string) chan e
go utils.GoRecover(func() {
func(objectName string) {
defer wg.Done()
err := m.client.RemoveObject(m.bucket, objectName)
var err error
for attempt := 1; attempt <= 3; attempt++ {
err = m.client.RemoveObject(m.bucket, objectName)
if err == nil {
break
}
log.Error("Failed to delete object from MinIO, retrying...", zap.String("object", objectName), zap.Int("attempt", attempt), zap.Error(err))
time.Sleep(time.Duration(attempt) * time.Second)
}
if err != nil {
log.Error("Failed to delete object from MinIO", zap.String("object", objectName), zap.Error(err))
errCh <- err
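A possible follow-up to the repeated three-attempt loops above, sketched here only as an illustration (not part of this change): a shared helper that also watches the request context, so a cancelled request stops retrying instead of sleeping through the remaining backoff.

package main

import (
    "context"
    "errors"
    "fmt"
    "time"
)

// retryWithContext retries op up to maxAttempts times with a linearly growing
// delay, but returns early if ctx is cancelled while waiting between attempts.
func retryWithContext(ctx context.Context, maxAttempts int, op func() error) error {
    var err error
    for attempt := 1; attempt <= maxAttempts; attempt++ {
        if err = op(); err == nil {
            return nil
        }
        select {
        case <-ctx.Done():
            return errors.Join(err, ctx.Err()) // report both the last failure and the cancellation
        case <-time.After(time.Duration(attempt) * time.Second):
        }
    }
    return err
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
    defer cancel()

    err := retryWithContext(ctx, 3, func() error { return errors.New("transient failure") })
    fmt.Println(err) // the operation error joined with "context deadline exceeded"
}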
2 changes: 0 additions & 2 deletions pkg/service/mgmt.go
@@ -45,7 +45,6 @@ func (s *Service) GetNamespaceByNsID(ctx context.Context, nsID string) (*resourc
return &ns, nil
}

// TODO: GetNamespaceTierByNsID: in the future, this logic should be removed in CE. Because CE does not have subscription
// GetNamespaceTierByNsID returns the tier of the namespace given the namespace ID
func (s *Service) GetNamespaceTierByNsID(ctx context.Context, nsID string) (Tier, error) {
ns, err := s.GetNamespaceByNsID(ctx, nsID)
@@ -55,7 +54,6 @@ func (s *Service) GetNamespaceTierByNsID(ctx context.Context, nsID string) (Tier
return s.GetNamespaceTier(ctx, ns)
}

// TODO: GetNamespaceTier: in the future, this logic should be removed in CE. Because CE does not have subscription
func (s *Service) GetNamespaceTier(ctx context.Context, ns *resource.Namespace) (Tier, error) {
log, _ := logger.GetZapLogger(ctx)
switch ns.NsType {
4 changes: 2 additions & 2 deletions pkg/service/pipeline.go
@@ -359,11 +359,11 @@ func GetVectorsFromResponse(resp *pipelinePb.TriggerNamespacePipelineReleaseResp
for _, output := range resp.Outputs {
embedResult, ok := output.GetFields()["embed_result"]
if !ok {
return nil, fmt.Errorf("embed_result not found in the output fields. resp: %v", resp)
return nil, fmt.Errorf("embed_result not found in the output fields. output: %v", output)
}
listValue := embedResult.GetListValue()
if listValue == nil {
return nil, fmt.Errorf("embed_result is not a list. resp: %v", resp)
return nil, fmt.Errorf("embed_result is not a list. output: %v", output)
}

vector := make([]float32, 0, len(listValue.GetValues()))
4 changes: 2 additions & 2 deletions pkg/usage/usage.go
@@ -90,7 +90,7 @@ func (u *usage) RetrieveArtifactUsageData() interface{} {

// Roll all artifact resources on a user
// for _, user := range userResp.GetUsers() {
//TODO: implement the logic to retrieve the artifact usage data
//TODO: implement the logic to retrieve the app usage data
// }

if userResp.NextPageToken == "" {
@@ -115,7 +115,7 @@ func (u *usage) RetrieveArtifactUsageData() interface{} {

// Roll all artifact resources on an org
// for _, org := range orgResp.GetOrganizations() {
//TODO: implement the logic to retrieve the artifact usage data
//TODO: implement the logic to retrieve the app usage data
// }

if orgResp.NextPageToken == "" {