Skip to content

Commit

Permalink
feat: convert pdf to image concurrently
Browse files Browse the repository at this point in the history
  • Loading branch information
chuang8511 committed Nov 5, 2024
1 parent 8d29ffb commit c26232f
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 25 deletions.
3 changes: 3 additions & 0 deletions pkg/component/operator/document/v0/transformer/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,7 @@ var (

//go:embed execution/pdf_checker.py
pdfChecker string

//go:embed execution/get_page_numbers.py
getPageNumbersExecution string
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
from io import BytesIO
import json
import sys
import base64
import pdfplumber

if __name__ == "__main__":
try:
json_str = sys.stdin.buffer.read().decode('utf-8')
params = json.loads(json_str)
pdf_string = params["PDF"]
decoded_bytes = base64.b64decode(pdf_string)
pdf_file_obj = BytesIO(decoded_bytes)

pdf = pdfplumber.open(pdf_file_obj)

output = {
"page_numbers": len(pdf.pages)
}

print(json.dumps(output))
except Exception as e:
print(json.dumps({"error": str(e)}))
Original file line number Diff line number Diff line change
Expand Up @@ -2,41 +2,39 @@
import json
import base64
import sys
import pdfplumber

# TODO chuang8511:
# Deal with the import error when running the code in the docker container.
# Now, we combine all python code into one file to avoid the import error.
# from pdf_to_markdown import PDFTransformer
# from pdf_to_markdown import PageImageProcessor

if __name__ == "__main__":
json_str = sys.stdin.buffer.read().decode('utf-8')
params = json.loads(json_str)
filename = params["filename"]
pdf_string = params["PDF"]
page_idx = params["page_idx"]
if "resolution" in params and params["resolution"] != 0 and params["resolution"] != None:
resolution = params["resolution"]
else:
resolution = 500

decoded_bytes = base64.b64decode(pdf_string)
pdf_file_obj = BytesIO(decoded_bytes)
pdf = PDFTransformer(x=pdf_file_obj)
pages = pdf.raw_pages
exclude_file_extension = filename.split(".")[0]
filenames = []
images = []
pdf = pdfplumber.open(pdf_file_obj)

page = pdf.pages[page_idx]

for i, page in enumerate(pages):
page_image = page.to_image(resolution=resolution)
encoded_image = PageImageProcessor.encode_image(page_image)
images.append(encoded_image)
filenames.append(f"{exclude_file_extension}_{i}.png")
page_image = page.to_image(resolution=resolution)
encoded_image = PageImageProcessor.encode_image(page_image)

exclude_file_extension = filename.split(".")[0]
filename = f"{exclude_file_extension}_{page_idx}.png"

output = {
"images": images,
"filename": filenames,
"image": encoded_image,
"filename": filename,
}

print(json.dumps(output))
83 changes: 71 additions & 12 deletions pkg/component/operator/document/v0/transformer/images.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"encoding/json"
"fmt"
"strings"
"sync"

"github.com/instill-ai/pipeline-backend/pkg/component/internal/util"
)
Expand All @@ -19,6 +20,16 @@ type ConvertDocumentToImagesTransformerOutput struct {
Filenames []string `json:"filenames"`
}

type pageNumbers struct {
PageNumbers int `json:"page_numbers"`
Error string `json:"error"`
}

type pageImage struct {
Image string `json:"image"`
Filename string `json:"filename"`
}

func ConvertDocumentToImage(inputStruct *ConvertDocumentToImagesTransformerInput) (*ConvertDocumentToImagesTransformerOutput, error) {

contentType, err := util.GetContentTypeFromBase64(inputStruct.Document)
Expand Down Expand Up @@ -53,27 +64,75 @@ func ConvertDocumentToImage(inputStruct *ConvertDocumentToImagesTransformerInput
base64PDFWithoutMime = util.TrimBase64Mime(base64PDF)
}

paramsJSON := map[string]interface{}{
"PDF": base64PDFWithoutMime,
"filename": inputStruct.Filename,
"resolution": inputStruct.Resolution,
getNumberJSON := map[string]interface{}{
"PDF": base64PDFWithoutMime,
}

pythonCode := imageProcessor + pdfTransformer + taskConvertToImagesExecution
pageNumbersBytes, err := util.ExecutePythonCode(getPageNumbersExecution, getNumberJSON)
if err != nil {
return nil, fmt.Errorf("get page numbers: %w", err)
}

outputBytes, err := util.ExecutePythonCode(pythonCode, paramsJSON)
var pageNumbers pageNumbers
err = json.Unmarshal(pageNumbersBytes, &pageNumbers)
if err != nil || pageNumbers.Error != "" {
return nil, fmt.Errorf("unmarshal page numbers: %w, %s", err, pageNumbers.Error)
}

if err != nil {
return nil, fmt.Errorf("failed to run python script: %w", err)
if pageNumbers.PageNumbers == 0 {
return &ConvertDocumentToImagesTransformerOutput{
Images: []string{},
Filenames: []string{},
}, nil
}

output := ConvertDocumentToImagesTransformerOutput{}
pythonCode := imageProcessor + pdfTransformer + taskConvertToImagesExecution

err = json.Unmarshal(outputBytes, &output)
// We will make this number tunable & configurable in the future.
maxWorkers := 5

if err != nil {
return nil, fmt.Errorf("failed to unmarshal output: %w", err)
jobs := make(chan int, pageNumbers.PageNumbers)
output := ConvertDocumentToImagesTransformerOutput{
Images: make([]string, pageNumbers.PageNumbers),
Filenames: make([]string, pageNumbers.PageNumbers),
}

// Create workers
wg := sync.WaitGroup{}
for w := 0; w < maxWorkers; w++ {
wg.Add(1)
go func() {
defer wg.Done()
for pageIdx := range jobs {
paramsJSON := map[string]interface{}{
"PDF": base64PDFWithoutMime,
"filename": inputStruct.Filename,
"resolution": inputStruct.Resolution,
"page_idx": pageIdx,
}
outputBytes, err := util.ExecutePythonCode(pythonCode, paramsJSON)
if err != nil {
continue
}
var pageImage pageImage
err = json.Unmarshal(outputBytes, &pageImage)
if err != nil {
continue
}
output.Images[pageIdx] = pageImage.Image
output.Filenames[pageIdx] = pageImage.Filename
}
}()
}

// Send jobs to workers
for i := 0; i < pageNumbers.PageNumbers; i++ {
jobs <- i
}
close(jobs)

// Wait for all workers to complete
wg.Wait()

if len(output.Filenames) == 0 {
output.Filenames = []string{}
Expand Down

0 comments on commit c26232f

Please sign in to comment.