From 2332507073cb37323f52ba7a2496c748b1167d85 Mon Sep 17 00:00:00 2001 From: Gary Date: Tue, 29 Oct 2024 20:00:36 +0800 Subject: [PATCH] feat(catalog): use advanced converting pipeline (#122) Because we need the better pipeline to convert the file to markdown. This commit uses the advanced one based on file type --- pkg/handler/knowledgebase.go | 12 ++-- pkg/service/pipeline.go | 127 ++++++++++++++++++++++++----------- pkg/worker/worker.go | 8 +-- 3 files changed, 98 insertions(+), 49 deletions(-) diff --git a/pkg/handler/knowledgebase.go b/pkg/handler/knowledgebase.go index 3770d4a..64195e4 100644 --- a/pkg/handler/knowledgebase.go +++ b/pkg/handler/knowledgebase.go @@ -215,10 +215,10 @@ func (ph *PublicHandler) ListCatalogs(ctx context.Context, req *artifactpb.ListC CreateTime: kb.CreateTime.String(), UpdateTime: kb.UpdateTime.String(), OwnerName: kb.Owner, - ConvertingPipelines: []string{service.NamespaceID + "/" + service.ConvertPDFToMDPipelineID}, + ConvertingPipelines: []string{service.NamespaceID + "/" + service.ConvertDocToMDPipelineID}, SplittingPipelines: []string{ - service.NamespaceID + "/" + service.TextSplitPipelineID, - service.NamespaceID + "/" + service.MdSplitPipelineID}, + service.NamespaceID + "/" + service.TextChunkPipelineID, + service.NamespaceID + "/" + service.MdChunkPipelineID}, EmbeddingPipelines: []string{service.NamespaceID + "/" + service.TextEmbedPipelineID}, DownstreamApps: []string{}, TotalFiles: uint32(fileCounts[kb.UID]), @@ -304,10 +304,10 @@ func (ph *PublicHandler) UpdateCatalog(ctx context.Context, req *artifactpb.Upda CreateTime: kb.CreateTime.String(), UpdateTime: kb.UpdateTime.String(), OwnerName: kb.Owner, - ConvertingPipelines: []string{service.NamespaceID + "/" + service.ConvertPDFToMDPipelineID}, + ConvertingPipelines: []string{service.NamespaceID + "/" + service.ConvertDocToMDPipelineID}, SplittingPipelines: []string{ - service.NamespaceID + "/" + service.TextSplitPipelineID, - service.NamespaceID + "/" + 
service.MdSplitPipelineID}, + service.NamespaceID + "/" + service.TextChunkPipelineID, + service.NamespaceID + "/" + service.MdChunkPipelineID}, EmbeddingPipelines: []string{service.NamespaceID + "/" + service.TextEmbedPipelineID}, DownstreamApps: []string{}, TotalFiles: uint32(fileCounts[kb.UID]), diff --git a/pkg/service/pipeline.go b/pkg/service/pipeline.go index 7593f13..08c1f2d 100644 --- a/pkg/service/pipeline.go +++ b/pkg/service/pipeline.go @@ -16,22 +16,34 @@ import ( "google.golang.org/protobuf/types/known/structpb" ) -const chunkLength = 800 +const chunkLength = 1024 const chunkOverlap = 200 const NamespaceID = "preset" -const PDFToMDVersion = "v1.1.1" + + +// Note: this pipeline is for the old indexing pipeline +const ConvertDocToMDPipelineID = "indexing-convert-pdf" +const DocToMDVersion = "v1.1.1" + +// TODO: the pipeline id is not correct, need to update the pipeline id +const ConvertDocToMDPipelineID2 = "indexing-advanced-convert-doc" +// TODO: the version is not correct, need to update the version +const DocToMDVersion2 = "v1.0.1" + +const MdChunkPipelineID = "indexing-split-markdown" const MdSplitVersion = "v2.0.0" + +const TextChunkPipelineID = "indexing-split-text" const TextSplitVersion = "v2.0.0" + +const TextEmbedPipelineID = "indexing-embed" const TextEmbedVersion = "v1.1.0" + +const QAPipelineID = "retrieving-qna" const QAVersion = "v1.2.0" -const ConvertPDFToMDPipelineID = "indexing-convert-pdf" -const MdSplitPipelineID = "indexing-split-markdown" -const TextSplitPipelineID = "indexing-split-text" -const TextEmbedPipelineID = "indexing-embed" -const RetrievingQnA = "retrieving-qna" // ConvertToMDPipe using converting pipeline to convert some file type to MD and consume caller's credits -func (s *Service) ConvertToMDPipe(ctx context.Context, caller uuid.UUID, requester uuid.UUID, pdfBase64 string, fileType artifactPb.FileType) (string, error) { +func (s *Service) ConvertToMDPipe(ctx context.Context, caller uuid.UUID, requester uuid.UUID, 
fileBase64 string, fileType artifactPb.FileType) (string, error) { logger, _ := logger.GetZapLogger(ctx) var md metadata.MD if requester != uuid.Nil { @@ -47,46 +59,55 @@ func (s *Service) ConvertToMDPipe(ctx context.Context, caller uuid.UUID, request }) } ctx = metadata.NewOutgoingContext(ctx, md) - prefix := "" - if fileType == artifactPb.FileType_FILE_TYPE_PDF { - prefix = "data:application/pdf;base64," - } else if fileType == artifactPb.FileType_FILE_TYPE_DOCX { - prefix = "data:application/vnd.openxmlformats-officedocument.wordprocessingml.document;base64," - } else if fileType == artifactPb.FileType_FILE_TYPE_DOC { - prefix = "data:application/msword;base64," - } else if fileType == artifactPb.FileType_FILE_TYPE_PPT { - prefix = "data:application/vnd.ms-powerpoint;base64," - } else if fileType == artifactPb.FileType_FILE_TYPE_PPTX { - prefix = "data:application/vnd.openxmlformats-officedocument.presentationml.presentation;base64," - } else if fileType == artifactPb.FileType_FILE_TYPE_HTML { - prefix = "data:text/html;base64," - } else if fileType == artifactPb.FileType_FILE_TYPE_TEXT { - prefix = "data:text/plain;base64," - } else if fileType == artifactPb.FileType_FILE_TYPE_XLSX { - prefix = "data:application/vnd.openxmlformats-officedocument.spreadsheetml.sheet;base64," - } else if fileType == artifactPb.FileType_FILE_TYPE_XLS { - prefix = "data:application/vnd.ms-excel;base64," - } else if fileType == artifactPb.FileType_FILE_TYPE_CSV { - prefix = "data:text/csv;base64," + + // Get the appropriate prefix for the file type + prefix := getFileTypePrefix(fileType) + + // Determine which pipeline and version to use based on file type + var pipelineID string + var version string + + switch fileType { + // Document types use the new pipeline + case artifactPb.FileType_FILE_TYPE_PDF, + artifactPb.FileType_FILE_TYPE_DOCX, + artifactPb.FileType_FILE_TYPE_DOC, + artifactPb.FileType_FILE_TYPE_PPT, + artifactPb.FileType_FILE_TYPE_PPTX, + 
artifactPb.FileType_FILE_TYPE_HTML: + pipelineID = ConvertDocToMDPipelineID2 + version = DocToMDVersion2 + + // Spreadsheet types and others use the original pipeline + case artifactPb.FileType_FILE_TYPE_XLSX, + artifactPb.FileType_FILE_TYPE_XLS, + artifactPb.FileType_FILE_TYPE_CSV: + pipelineID = ConvertDocToMDPipelineID + version = DocToMDVersion + + default: + return "", fmt.Errorf("unsupported file type: %v", fileType) } req := &pipelinePb.TriggerNamespacePipelineReleaseRequest{ NamespaceId: NamespaceID, - PipelineId: ConvertPDFToMDPipelineID, - ReleaseId: PDFToMDVersion, + PipelineId: pipelineID, + ReleaseId: version, Inputs: []*structpb.Struct{ { Fields: map[string]*structpb.Value{ - "document_input": {Kind: &structpb.Value_StringValue{StringValue: prefix + pdfBase64}}, + "document_input": {Kind: &structpb.Value_StringValue{StringValue: prefix + fileBase64}}, }, }, }, } + resp, err := s.PipelinePub.TriggerNamespacePipelineRelease(ctx, req) if err != nil { logger.Error("failed to trigger pipeline", zap.Error(err)) - return "", fmt.Errorf("failed to trigger %s pipeline: %w", ConvertPDFToMDPipelineID, err) + return "", fmt.Errorf("failed to trigger %s pipeline: %w", pipelineID, err) } + result, err := getConvertResult(resp) if err != nil { logger.Error("failed to get convert result", zap.Error(err)) @@ -95,6 +116,34 @@ func (s *Service) ConvertToMDPipe(ctx context.Context, caller uuid.UUID, request return result, nil } +// getFileTypePrefix returns the appropriate prefix for the given file type +func getFileTypePrefix(fileType artifactPb.FileType) string { + switch fileType { + case artifactPb.FileType_FILE_TYPE_PDF: + return "data:application/pdf;base64," + case artifactPb.FileType_FILE_TYPE_DOCX: + return "data:application/vnd.openxmlformats-officedocument.wordprocessingml.document;base64," + case artifactPb.FileType_FILE_TYPE_DOC: + return "data:application/msword;base64," + case artifactPb.FileType_FILE_TYPE_PPT: + return 
"data:application/vnd.ms-powerpoint;base64," + case artifactPb.FileType_FILE_TYPE_PPTX: + return "data:application/vnd.openxmlformats-officedocument.presentationml.presentation;base64," + case artifactPb.FileType_FILE_TYPE_HTML: + return "data:text/html;base64," + case artifactPb.FileType_FILE_TYPE_TEXT: + return "data:text/plain;base64," + case artifactPb.FileType_FILE_TYPE_XLSX: + return "data:application/vnd.openxmlformats-officedocument.spreadsheetml.sheet;base64," + case artifactPb.FileType_FILE_TYPE_XLS: + return "data:application/vnd.ms-excel;base64," + case artifactPb.FileType_FILE_TYPE_CSV: + return "data:text/csv;base64," + default: + return "" + } +} + // Helper function to safely extract the "convert_result" from the response. // It checks if the index and key are available to avoid nil pointer issues. func getConvertResult(resp *pipelinePb.TriggerNamespacePipelineReleaseResponse) (string, error) { @@ -138,7 +187,7 @@ func (s *Service) SplitMarkdownPipe(ctx context.Context, caller uuid.UUID, reque ctx = metadata.NewOutgoingContext(ctx, md) req := &pipelinePb.TriggerNamespacePipelineReleaseRequest{ NamespaceId: NamespaceID, - PipelineId: MdSplitPipelineID, + PipelineId: MdChunkPipelineID, ReleaseId: MdSplitVersion, Inputs: []*structpb.Struct{ { @@ -152,7 +201,7 @@ func (s *Service) SplitMarkdownPipe(ctx context.Context, caller uuid.UUID, reque } res, err := s.PipelinePub.TriggerNamespacePipelineRelease(ctx, req) if err != nil { - return nil, fmt.Errorf("failed to trigger %s pipeline. err:%w", MdSplitPipelineID, err) + return nil, fmt.Errorf("failed to trigger %s pipeline. 
err:%w", MdChunkPipelineID, err) } result, err := GetChunksFromResponse(res) if err != nil { @@ -219,7 +268,7 @@ func (s *Service) SplitTextPipe(ctx context.Context, caller uuid.UUID, requester ctx = metadata.NewOutgoingContext(ctx, md) req := &pipelinePb.TriggerNamespacePipelineReleaseRequest{ NamespaceId: NamespaceID, - PipelineId: TextSplitPipelineID, + PipelineId: TextChunkPipelineID, ReleaseId: TextSplitVersion, Inputs: []*structpb.Struct{ @@ -234,7 +283,7 @@ func (s *Service) SplitTextPipe(ctx context.Context, caller uuid.UUID, requester } res, err := s.PipelinePub.TriggerNamespacePipelineRelease(ctx, req) if err != nil { - return nil, fmt.Errorf("failed to trigger %s pipeline. err:%w", TextSplitPipelineID, err) + return nil, fmt.Errorf("failed to trigger %s pipeline. err:%w", TextChunkPipelineID, err) } result, err := GetChunksFromResponse(res) if err != nil { @@ -410,7 +459,7 @@ func (s *Service) QuestionAnsweringPipe(ctx context.Context, caller uuid.UUID, r ctx = metadata.NewOutgoingContext(ctx, md) req := &pipelinePb.TriggerNamespacePipelineReleaseRequest{ NamespaceId: NamespaceID, - PipelineId: RetrievingQnA, + PipelineId: QAPipelineID, ReleaseId: QAVersion, Inputs: []*structpb.Struct{ { @@ -423,7 +472,7 @@ func (s *Service) QuestionAnsweringPipe(ctx context.Context, caller uuid.UUID, r } res, err := s.PipelinePub.TriggerNamespacePipelineRelease(ctx, req) if err != nil { - return "", fmt.Errorf("failed to trigger %s pipeline. err:%w", RetrievingQnA, err) + return "", fmt.Errorf("failed to trigger %s pipeline. 
err:%w", QAPipelineID, err) } reply := res.Outputs[0].GetFields()["assistant_reply"].GetStringValue() return reply, nil diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go index 4422d5b..e630398 100644 --- a/pkg/worker/worker.go +++ b/pkg/worker/worker.go @@ -454,7 +454,7 @@ func (wp *fileToEmbWorkerPool) processConvertingFile(ctx context.Context, file r base64Data := base64.StdEncoding.EncodeToString(data) // save the converting pipeline metadata into database - convertingPipelineMetadata := service.NamespaceID + "/" + service.ConvertPDFToMDPipelineID + "@" + service.PDFToMDVersion + convertingPipelineMetadata := service.NamespaceID + "/" + service.ConvertDocToMDPipelineID + "@" + service.DocToMDVersion err = wp.svc.Repository.UpdateKbFileExtraMetaData(ctx, file.UID, "", convertingPipelineMetadata, "", "", nil, nil, nil, nil) if err != nil { logger.Error("Failed to save converting pipeline metadata.", zap.String("File uid:", file.UID.String())) @@ -541,7 +541,7 @@ func (wp *fileToEmbWorkerPool) processChunkingFile(ctx context.Context, file rep return nil, artifactpb.FileProcessStatus_FILE_PROCESS_STATUS_UNSPECIFIED, err } // save chunking pipeline metadata into file's extra metadata - chunkingPipelineMetadata := service.NamespaceID + "/" + service.MdSplitPipelineID + "@" + service.MdSplitVersion + chunkingPipelineMetadata := service.NamespaceID + "/" + service.MdChunkPipelineID + "@" + service.MdSplitVersion err = wp.svc.Repository.UpdateKbFileExtraMetaData(ctx, file.UID, "", "", chunkingPipelineMetadata, "", nil, nil, nil, nil) if err != nil { logger.Error("Failed to save chunking pipeline metadata.", zap.String("File uid:", file.UID.String())) @@ -583,7 +583,7 @@ func (wp *fileToEmbWorkerPool) processChunkingFile(ctx context.Context, file rep return nil, artifactpb.FileProcessStatus_FILE_PROCESS_STATUS_UNSPECIFIED, err } // save chunking pipeline metadata into file's extra metadata - chunkingPipelineMetadata := service.NamespaceID + "/" + 
service.TextSplitPipelineID + "@" + service.TextSplitVersion + chunkingPipelineMetadata := service.NamespaceID + "/" + service.TextChunkPipelineID + "@" + service.TextSplitVersion err = wp.svc.Repository.UpdateKbFileExtraMetaData(ctx, file.UID, "", "", chunkingPipelineMetadata, "", nil, nil, nil, nil) if err != nil { logger.Error("Failed to save chunking pipeline metadata.", zap.String("File uid:", file.UID.String())) @@ -610,7 +610,7 @@ func (wp *fileToEmbWorkerPool) processChunkingFile(ctx context.Context, file rep } // save chunking pipeline metadata into file's extra metadata - chunkingPipelineMetadata := service.NamespaceID + "/" + service.MdSplitPipelineID + "@" + service.MdSplitVersion + chunkingPipelineMetadata := service.NamespaceID + "/" + service.MdChunkPipelineID + "@" + service.MdSplitVersion err = wp.svc.Repository.UpdateKbFileExtraMetaData(ctx, file.UID, "", "", chunkingPipelineMetadata, "", nil, nil, nil, nil) if err != nil { logger.Error("Failed to save chunking pipeline metadata.", zap.String("File uid:", file.UID.String()))