Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(artifact): adopt the advanced converting pipeline #127

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 14 additions & 12 deletions pkg/service/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,13 @@ const chunkLength = 1024
const chunkOverlap = 200
const NamespaceID = "preset"

// Note: this pipeline is for the old indexing pipeline
// Note: this is the old indexing pipeline, whose output contains the convert_result field
const ConvertDocToMDPipelineID = "indexing-convert-pdf"
const DocToMDVersion = "v1.1.1"

// TODO: we revert to the old pipeline. it will change to the new pipeline later.
const ConvertDocToMDPipelineID2 = "indexing-convert-pdf"

// TODO: we need to update the version after the new pipeline is ready
const DocToMDVersion2 = "v1.1.1"
// Note: this is the new indexing pipeline, whose output contains convert_result or convert_result2
const ConvertDocToMDPipelineID2 = "indexing-advanced-convert-doc"
const DocToMDVersion2 = "v1.2.0"

const MdChunkPipelineID = "indexing-split-markdown"
const MdSplitVersion = "v2.0.0"
Expand Down Expand Up @@ -144,8 +142,9 @@ func getFileTypePrefix(fileType artifactPb.FileType) string {
}
}

// Helper function to safely extract the "convert_result" from the response.
// It checks if the index and key are available to avoid nil pointer issues.
// getConvertResult extracts the conversion result from the pipeline response.
// It first checks for a non-empty "convert_result" field, then falls back to "convert_result2".
// Returns an error if neither field contains valid data or if the response structure is invalid.
func getConvertResult(resp *pipelinePb.TriggerNamespacePipelineReleaseResponse) (string, error) {
if resp == nil || len(resp.Outputs) == 0 {
return "", fmt.Errorf("response is nil or has no outputs. resp: %v", resp)
Expand All @@ -155,10 +154,14 @@ func getConvertResult(resp *pipelinePb.TriggerNamespacePipelineReleaseResponse)
return "", fmt.Errorf("fields in the output are nil. resp: %v", resp)
}
convertResult, ok := fields["convert_result"]
if !ok {
return "", fmt.Errorf("convert_result not found in the output fields. resp: %v", resp)
if ok && convertResult.GetStringValue() != "" {
return convertResult.GetStringValue(), nil
}
return convertResult.GetStringValue(), nil
convertResult2, ok2 := fields["convert_result2"]
if ok2 && convertResult2.GetStringValue() != "" {
return convertResult2.GetStringValue(), nil
}
return "", fmt.Errorf("convert_result or convert_result2 not found in the output fields. resp: %v", resp)
}

type Chunk = struct {
Expand Down Expand Up @@ -365,7 +368,6 @@ func (s *Service) EmbeddingTextPipe(ctx context.Context, caller uuid.UUID, reque
batch := texts[i:end]
batchIndex := i / maxBatchSize


// Acquire semaphore before starting goroutine
sem <- struct{}{}
wg.Add(1)
Expand Down
55 changes: 47 additions & 8 deletions pkg/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -490,11 +490,37 @@ func (wp *fileToEmbWorkerPool) processConvertingFile(ctx context.Context, file r
return updatedFile, artifactpb.FileProcessStatus_FILE_PROCESS_STATUS_CHUNKING, nil
}

// Processes a file with the status "chunking".
// If the file is a PDF or other document type, it retrieves the converted file from MinIO and calls the markdown chunking pipeline.
// If the file is a text or markdown file, it retrieves the file from MinIO and calls the respective chunking pipeline.
// The resulting chunks are saved into object storage and metadata is updated in the database.
// Finally, the file status is updated to "embedding" in the database.
// Processes a file with the status "chunking" by splitting it into text chunks.
// The processing varies by file type:
//
// For PDF, DOC, DOCX, PPT, PPTX, HTML, XLSX, XLS, CSV:
// - Retrieves converted file from MinIO
// - For spreadsheet files (XLSX, XLS, CSV): Uses markdown chunking pipeline
// - For other document types: Uses text chunking pipeline
//
// For TEXT files:
// - Retrieves original file from MinIO
// - Uses text chunking pipeline
//
// For MARKDOWN files:
// - Retrieves original file from MinIO
// - Uses markdown chunking pipeline
//
// For all file types:
// - Saves chunks to object storage
// - Updates metadata in database with chunking pipeline info
// - Updates file status to "embedding"
//
// Parameters:
// - ctx: Context for the operation
// - file: KnowledgeBaseFile struct containing file metadata
//
// Returns:
// - updatedFile: Updated KnowledgeBaseFile after processing
// - nextStatus: Next file process status (EMBEDDING if successful)
// - err: Error if any step fails
//
// The function handles errors at each step and returns appropriate status codes.
func (wp *fileToEmbWorkerPool) processChunkingFile(ctx context.Context, file repository.KnowledgeBaseFile) (*repository.KnowledgeBaseFile, artifactpb.FileProcessStatus, error) {
logger, _ := logger.GetZapLogger(ctx)
logger.Info("Processing chunking status file.", zap.String("File uid", file.UID.String()))
Expand Down Expand Up @@ -527,10 +553,23 @@ func (wp *fileToEmbWorkerPool) processChunkingFile(ctx context.Context, file rep
return nil, artifactpb.FileProcessStatus_FILE_PROCESS_STATUS_UNSPECIFIED, err
}

// TODO: some file use splitTextPipe and some use splitMarkdownPipe
// call the markdown chunking pipeline
requesterUID := file.RequesterUID
chunks, err := wp.svc.SplitMarkdownPipe(ctx, file.CreatorUID, requesterUID, string(convertedFileData))
chunks := []service.Chunk{}
switch file.Type {
case artifactpb.FileType_FILE_TYPE_XLSX.String(),
artifactpb.FileType_FILE_TYPE_XLS.String(),
artifactpb.FileType_FILE_TYPE_CSV.String():
requesterUID := file.RequesterUID
chunks, err = wp.svc.SplitMarkdownPipe(ctx, file.CreatorUID, requesterUID, string(convertedFileData))
case artifactpb.FileType_FILE_TYPE_PDF.String(),
artifactpb.FileType_FILE_TYPE_DOCX.String(),
artifactpb.FileType_FILE_TYPE_DOC.String(),
artifactpb.FileType_FILE_TYPE_PPTX.String(),
artifactpb.FileType_FILE_TYPE_PPT.String(),
artifactpb.FileType_FILE_TYPE_HTML.String():
requesterUID := file.RequesterUID
chunks, err = wp.svc.SplitTextPipe(ctx, file.CreatorUID, requesterUID, string(convertedFileData))
}
if err != nil {
logger.Error("Failed to get chunks from converted file using markdown chunking pipeline.", zap.String("Converted file uid", convertedFile.UID.String()))
return nil, artifactpb.FileProcessStatus_FILE_PROCESS_STATUS_UNSPECIFIED, err
Expand Down
Loading