Skip to content

Commit

Permalink
feat(catalog): use advanced converting pipleine (#122)
Browse files Browse the repository at this point in the history
Because

we need the better pipeline to convert the file to markdown

This commit

uses the advanced one based on file type
  • Loading branch information
Yougigun authored Oct 29, 2024
1 parent 860b539 commit 2332507
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 49 deletions.
12 changes: 6 additions & 6 deletions pkg/handler/knowledgebase.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,10 +215,10 @@ func (ph *PublicHandler) ListCatalogs(ctx context.Context, req *artifactpb.ListC
CreateTime: kb.CreateTime.String(),
UpdateTime: kb.UpdateTime.String(),
OwnerName: kb.Owner,
ConvertingPipelines: []string{service.NamespaceID + "/" + service.ConvertPDFToMDPipelineID},
ConvertingPipelines: []string{service.NamespaceID + "/" + service.ConvertDocToMDPipelineID},
SplittingPipelines: []string{
service.NamespaceID + "/" + service.TextSplitPipelineID,
service.NamespaceID + "/" + service.MdSplitPipelineID},
service.NamespaceID + "/" + service.TextChunkPipelineID,
service.NamespaceID + "/" + service.MdChunkPipelineID},
EmbeddingPipelines: []string{service.NamespaceID + "/" + service.TextEmbedPipelineID},
DownstreamApps: []string{},
TotalFiles: uint32(fileCounts[kb.UID]),
Expand Down Expand Up @@ -304,10 +304,10 @@ func (ph *PublicHandler) UpdateCatalog(ctx context.Context, req *artifactpb.Upda
CreateTime: kb.CreateTime.String(),
UpdateTime: kb.UpdateTime.String(),
OwnerName: kb.Owner,
ConvertingPipelines: []string{service.NamespaceID + "/" + service.ConvertPDFToMDPipelineID},
ConvertingPipelines: []string{service.NamespaceID + "/" + service.ConvertDocToMDPipelineID},
SplittingPipelines: []string{
service.NamespaceID + "/" + service.TextSplitPipelineID,
service.NamespaceID + "/" + service.MdSplitPipelineID},
service.NamespaceID + "/" + service.TextChunkPipelineID,
service.NamespaceID + "/" + service.MdChunkPipelineID},
EmbeddingPipelines: []string{service.NamespaceID + "/" + service.TextEmbedPipelineID},
DownstreamApps: []string{},
TotalFiles: uint32(fileCounts[kb.UID]),
Expand Down
127 changes: 88 additions & 39 deletions pkg/service/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,34 @@ import (
"google.golang.org/protobuf/types/known/structpb"
)

const chunkLength = 800
const chunkLength = 1024
const chunkOverlap = 200
const NamespaceID = "preset"
const PDFToMDVersion = "v1.1.1"


// Note: this pipeline is for the old indexing pipeline
const ConvertDocToMDPipelineID = "indexing-convert-pdf"
const DocToMDVersion = "v1.1.1"

// TODO: the pipeline id is not correct, need to update the pipeline id
const ConvertDocToMDPipelineID2 = "indexing-advanced-convert-doc"
// TODO: the version is not correct, need to update the version
const DocToMDVersion2 = "v1.0.1"

const MdChunkPipelineID = "indexing-split-markdown"
const MdSplitVersion = "v2.0.0"

const TextChunkPipelineID = "indexing-split-text"
const TextSplitVersion = "v2.0.0"

const TextEmbedPipelineID = "indexing-embed"
const TextEmbedVersion = "v1.1.0"

const QAPipelineID = "retrieving-qna"
const QAVersion = "v1.2.0"
const ConvertPDFToMDPipelineID = "indexing-convert-pdf"
const MdSplitPipelineID = "indexing-split-markdown"
const TextSplitPipelineID = "indexing-split-text"
const TextEmbedPipelineID = "indexing-embed"
const RetrievingQnA = "retrieving-qna"

// ConvertToMDPipe using converting pipeline to convert some file type to MD and consume caller's credits
func (s *Service) ConvertToMDPipe(ctx context.Context, caller uuid.UUID, requester uuid.UUID, pdfBase64 string, fileType artifactPb.FileType) (string, error) {
func (s *Service) ConvertToMDPipe(ctx context.Context, caller uuid.UUID, requester uuid.UUID, fileBase64 string, fileType artifactPb.FileType) (string, error) {
logger, _ := logger.GetZapLogger(ctx)
var md metadata.MD
if requester != uuid.Nil {
Expand All @@ -47,46 +59,55 @@ func (s *Service) ConvertToMDPipe(ctx context.Context, caller uuid.UUID, request
})
}
ctx = metadata.NewOutgoingContext(ctx, md)
prefix := ""
if fileType == artifactPb.FileType_FILE_TYPE_PDF {
prefix = "data:application/pdf;base64,"
} else if fileType == artifactPb.FileType_FILE_TYPE_DOCX {
prefix = "data:application/vnd.openxmlformats-officedocument.wordprocessingml.document;base64,"
} else if fileType == artifactPb.FileType_FILE_TYPE_DOC {
prefix = "data:application/msword;base64,"
} else if fileType == artifactPb.FileType_FILE_TYPE_PPT {
prefix = "data:application/vnd.ms-powerpoint;base64,"
} else if fileType == artifactPb.FileType_FILE_TYPE_PPTX {
prefix = "data:application/vnd.openxmlformats-officedocument.presentationml.presentation;base64,"
} else if fileType == artifactPb.FileType_FILE_TYPE_HTML {
prefix = "data:text/html;base64,"
} else if fileType == artifactPb.FileType_FILE_TYPE_TEXT {
prefix = "data:text/plain;base64,"
} else if fileType == artifactPb.FileType_FILE_TYPE_XLSX {
prefix = "data:application/vnd.openxmlformats-officedocument.spreadsheetml.sheet;base64,"
} else if fileType == artifactPb.FileType_FILE_TYPE_XLS {
prefix = "data:application/vnd.ms-excel;base64,"
} else if fileType == artifactPb.FileType_FILE_TYPE_CSV {
prefix = "data:text/csv;base64,"

// Get the appropriate prefix for the file type
prefix := getFileTypePrefix(fileType)

// Determine which pipeline and version to use based on file type
var pipelineID string
var version string

switch fileType {
// Document types use the new pipeline
case artifactPb.FileType_FILE_TYPE_PDF,
artifactPb.FileType_FILE_TYPE_DOCX,
artifactPb.FileType_FILE_TYPE_DOC,
artifactPb.FileType_FILE_TYPE_PPT,
artifactPb.FileType_FILE_TYPE_PPTX,
artifactPb.FileType_FILE_TYPE_HTML:
pipelineID = ConvertDocToMDPipelineID2
version = DocToMDVersion2

// Spreadsheet types and others use the original pipeline
case artifactPb.FileType_FILE_TYPE_XLSX,
artifactPb.FileType_FILE_TYPE_XLS,
artifactPb.FileType_FILE_TYPE_CSV:
pipelineID = ConvertDocToMDPipelineID
version = DocToMDVersion

default:
return "", fmt.Errorf("unsupported file type: %v", fileType)
}

req := &pipelinePb.TriggerNamespacePipelineReleaseRequest{
NamespaceId: NamespaceID,
PipelineId: ConvertPDFToMDPipelineID,
ReleaseId: PDFToMDVersion,
PipelineId: pipelineID,
ReleaseId: version,
Inputs: []*structpb.Struct{
{
Fields: map[string]*structpb.Value{
"document_input": {Kind: &structpb.Value_StringValue{StringValue: prefix + pdfBase64}},
"document_input": {Kind: &structpb.Value_StringValue{StringValue: prefix + fileBase64}},
},
},
},
}

resp, err := s.PipelinePub.TriggerNamespacePipelineRelease(ctx, req)
if err != nil {
logger.Error("failed to trigger pipeline", zap.Error(err))
return "", fmt.Errorf("failed to trigger %s pipeline: %w", ConvertPDFToMDPipelineID, err)
return "", fmt.Errorf("failed to trigger %s pipeline: %w", pipelineID, err)
}

result, err := getConvertResult(resp)
if err != nil {
logger.Error("failed to get convert result", zap.Error(err))
Expand All @@ -95,6 +116,34 @@ func (s *Service) ConvertToMDPipe(ctx context.Context, caller uuid.UUID, request
return result, nil
}

// getFileTypePrefix returns the appropriate prefix for the given file type
func getFileTypePrefix(fileType artifactPb.FileType) string {
switch fileType {
case artifactPb.FileType_FILE_TYPE_PDF:
return "data:application/pdf;base64,"
case artifactPb.FileType_FILE_TYPE_DOCX:
return "data:application/vnd.openxmlformats-officedocument.wordprocessingml.document;base64,"
case artifactPb.FileType_FILE_TYPE_DOC:
return "data:application/msword;base64,"
case artifactPb.FileType_FILE_TYPE_PPT:
return "data:application/vnd.ms-powerpoint;base64,"
case artifactPb.FileType_FILE_TYPE_PPTX:
return "data:application/vnd.openxmlformats-officedocument.presentationml.presentation;base64,"
case artifactPb.FileType_FILE_TYPE_HTML:
return "data:text/html;base64,"
case artifactPb.FileType_FILE_TYPE_TEXT:
return "data:text/plain;base64,"
case artifactPb.FileType_FILE_TYPE_XLSX:
return "data:application/vnd.openxmlformats-officedocument.spreadsheetml.sheet;base64,"
case artifactPb.FileType_FILE_TYPE_XLS:
return "data:application/vnd.ms-excel;base64,"
case artifactPb.FileType_FILE_TYPE_CSV:
return "data:text/csv;base64,"
default:
return ""
}
}

// Helper function to safely extract the "convert_result" from the response.
// It checks if the index and key are available to avoid nil pointer issues.
func getConvertResult(resp *pipelinePb.TriggerNamespacePipelineReleaseResponse) (string, error) {
Expand Down Expand Up @@ -138,7 +187,7 @@ func (s *Service) SplitMarkdownPipe(ctx context.Context, caller uuid.UUID, reque
ctx = metadata.NewOutgoingContext(ctx, md)
req := &pipelinePb.TriggerNamespacePipelineReleaseRequest{
NamespaceId: NamespaceID,
PipelineId: MdSplitPipelineID,
PipelineId: MdChunkPipelineID,
ReleaseId: MdSplitVersion,
Inputs: []*structpb.Struct{
{
Expand All @@ -152,7 +201,7 @@ func (s *Service) SplitMarkdownPipe(ctx context.Context, caller uuid.UUID, reque
}
res, err := s.PipelinePub.TriggerNamespacePipelineRelease(ctx, req)
if err != nil {
return nil, fmt.Errorf("failed to trigger %s pipeline. err:%w", MdSplitPipelineID, err)
return nil, fmt.Errorf("failed to trigger %s pipeline. err:%w", MdChunkPipelineID, err)
}
result, err := GetChunksFromResponse(res)
if err != nil {
Expand Down Expand Up @@ -219,7 +268,7 @@ func (s *Service) SplitTextPipe(ctx context.Context, caller uuid.UUID, requester
ctx = metadata.NewOutgoingContext(ctx, md)
req := &pipelinePb.TriggerNamespacePipelineReleaseRequest{
NamespaceId: NamespaceID,
PipelineId: TextSplitPipelineID,
PipelineId: TextChunkPipelineID,
ReleaseId: TextSplitVersion,

Inputs: []*structpb.Struct{
Expand All @@ -234,7 +283,7 @@ func (s *Service) SplitTextPipe(ctx context.Context, caller uuid.UUID, requester
}
res, err := s.PipelinePub.TriggerNamespacePipelineRelease(ctx, req)
if err != nil {
return nil, fmt.Errorf("failed to trigger %s pipeline. err:%w", TextSplitPipelineID, err)
return nil, fmt.Errorf("failed to trigger %s pipeline. err:%w", TextChunkPipelineID, err)
}
result, err := GetChunksFromResponse(res)
if err != nil {
Expand Down Expand Up @@ -410,7 +459,7 @@ func (s *Service) QuestionAnsweringPipe(ctx context.Context, caller uuid.UUID, r
ctx = metadata.NewOutgoingContext(ctx, md)
req := &pipelinePb.TriggerNamespacePipelineReleaseRequest{
NamespaceId: NamespaceID,
PipelineId: RetrievingQnA,
PipelineId: QAPipelineID,
ReleaseId: QAVersion,
Inputs: []*structpb.Struct{
{
Expand All @@ -423,7 +472,7 @@ func (s *Service) QuestionAnsweringPipe(ctx context.Context, caller uuid.UUID, r
}
res, err := s.PipelinePub.TriggerNamespacePipelineRelease(ctx, req)
if err != nil {
return "", fmt.Errorf("failed to trigger %s pipeline. err:%w", RetrievingQnA, err)
return "", fmt.Errorf("failed to trigger %s pipeline. err:%w", QAPipelineID, err)
}
reply := res.Outputs[0].GetFields()["assistant_reply"].GetStringValue()
return reply, nil
Expand Down
8 changes: 4 additions & 4 deletions pkg/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,7 @@ func (wp *fileToEmbWorkerPool) processConvertingFile(ctx context.Context, file r
base64Data := base64.StdEncoding.EncodeToString(data)

// save the converting pipeline metadata into database
convertingPipelineMetadata := service.NamespaceID + "/" + service.ConvertPDFToMDPipelineID + "@" + service.PDFToMDVersion
convertingPipelineMetadata := service.NamespaceID + "/" + service.ConvertDocToMDPipelineID + "@" + service.DocToMDVersion
err = wp.svc.Repository.UpdateKbFileExtraMetaData(ctx, file.UID, "", convertingPipelineMetadata, "", "", nil, nil, nil, nil)
if err != nil {
logger.Error("Failed to save converting pipeline metadata.", zap.String("File uid:", file.UID.String()))
Expand Down Expand Up @@ -541,7 +541,7 @@ func (wp *fileToEmbWorkerPool) processChunkingFile(ctx context.Context, file rep
return nil, artifactpb.FileProcessStatus_FILE_PROCESS_STATUS_UNSPECIFIED, err
}
// save chunking pipeline metadata into file's extra metadata
chunkingPipelineMetadata := service.NamespaceID + "/" + service.MdSplitPipelineID + "@" + service.MdSplitVersion
chunkingPipelineMetadata := service.NamespaceID + "/" + service.MdChunkPipelineID + "@" + service.MdSplitVersion
err = wp.svc.Repository.UpdateKbFileExtraMetaData(ctx, file.UID, "", "", chunkingPipelineMetadata, "", nil, nil, nil, nil)
if err != nil {
logger.Error("Failed to save chunking pipeline metadata.", zap.String("File uid:", file.UID.String()))
Expand Down Expand Up @@ -583,7 +583,7 @@ func (wp *fileToEmbWorkerPool) processChunkingFile(ctx context.Context, file rep
return nil, artifactpb.FileProcessStatus_FILE_PROCESS_STATUS_UNSPECIFIED, err
}
// save chunking pipeline metadata into file's extra metadata
chunkingPipelineMetadata := service.NamespaceID + "/" + service.TextSplitPipelineID + "@" + service.TextSplitVersion
chunkingPipelineMetadata := service.NamespaceID + "/" + service.TextChunkPipelineID + "@" + service.TextSplitVersion
err = wp.svc.Repository.UpdateKbFileExtraMetaData(ctx, file.UID, "", "", chunkingPipelineMetadata, "", nil, nil, nil, nil)
if err != nil {
logger.Error("Failed to save chunking pipeline metadata.", zap.String("File uid:", file.UID.String()))
Expand All @@ -610,7 +610,7 @@ func (wp *fileToEmbWorkerPool) processChunkingFile(ctx context.Context, file rep
}

// save chunking pipeline metadata into file's extra metadata
chunkingPipelineMetadata := service.NamespaceID + "/" + service.MdSplitPipelineID + "@" + service.MdSplitVersion
chunkingPipelineMetadata := service.NamespaceID + "/" + service.MdChunkPipelineID + "@" + service.MdSplitVersion
err = wp.svc.Repository.UpdateKbFileExtraMetaData(ctx, file.UID, "", "", chunkingPipelineMetadata, "", nil, nil, nil, nil)
if err != nil {
logger.Error("Failed to save chunking pipeline metadata.", zap.String("File uid:", file.UID.String()))
Expand Down

0 comments on commit 2332507

Please sign in to comment.