Refactor unstructured file parsing to support worker threads.
mxdlzg committed May 4, 2024
1 parent 56caa68 commit 7d066c0
Showing 9 changed files with 65 additions and 46 deletions.
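In outline: pdf/word partitioning now runs inside the read-file worker, which receives the unstructured config through workerData, while the vision-model post-processing of pdf elements happens back on the main thread via initPdfText. A rough sketch of that flow, pieced together from the diffs below (call sites are simplified; the worker name and the wiring between steps are placeholders, not the project's exact code):

// 1) Main thread: getWorker() now injects the unstructured config into workerData.
const worker = getWorker(name); // workerData = { globalConfig: { unstructuredConfigs: global.unstructuredConfigs } }

// 2) Worker thread: readUnFile() only partitions the file and returns the raw
//    elements in metadata["elements"]; it no longer calls the LLM itself.
const result = await readUnFile({ buffer, preview, metadata });

// 3) Main thread: readFileRawContent() turns those elements into text, letting
//    initPdfText() query the vision model for image/table descriptions.
if (extension === 'pdf') {
  result.rawText = await initPdfText({ teamId, metadata, dataset, pageElements: result?.metadata ? result.metadata["elements"] : [] });
}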
11 changes: 11 additions & 0 deletions packages/service/common/file/read/utils.ts
@@ -6,6 +6,7 @@ import { addHours } from 'date-fns';
import { WorkerNameEnum, runWorker } from '../../../worker/utils';
import { ReadFileResponse } from '../../../worker/file/type';
import {DatasetSchemaType} from "@fastgpt/global/core/dataset/type";
import {initPdfText} from "../../../worker/file/extension/unstructured";

export const initMarkdownText = ({
teamId,
@@ -55,6 +56,16 @@ export const readFileRawContent = async ({
dataset
});

// pdf: resolve extracted image/table elements into text
if (['pdf'].includes(extension)) {
result.rawText = await initPdfText({
teamId: teamId,
metadata: metadata,
dataset: dataset,
pageElements: result?.metadata ? result.metadata["elements"] : []
});
}

// markdown data format
if (['md', 'html', 'docx'].includes(extension)) {
result.rawText = await initMarkdownText({
5 changes: 2 additions & 3 deletions packages/service/core/ai/functions/queryImageDescription.ts
@@ -20,11 +20,13 @@ export async function queryImageDescription({
rawTex,
image_base64,
model,
ai,
language="eng"
}: {
rawTex: string;
image_base64: string;
model: string;
ai: any;
language?: string;
}) {
// base64 payload without the "data:image/jpeg;base64," prefix
@@ -49,9 +51,6 @@
}
];

const ai = getAIApi({
timeout: 480000
});
const data = await ai.chat.completions.create({
model: getLLMModel(model).model,
temperature: 0.1,
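The effect of this hunk is that queryImageDescription no longer builds its own client; the caller constructs one with getAIApi and passes it in. A usage sketch mirroring the call that now lives in initPdfText in the worker extension file below (the 480000 ms timeout is the value removed above):

const ai = getAIApi({ timeout: 480000 });
const description = await queryImageDescription({
  rawTex: element.text,
  image_base64: "data:image/jpeg;base64," + element.metadata.image_base64,
  model: dataset?.agentModel || "gemini-pro-vision",
  ai,
  language: element.metadata.languages[0]
});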
25 changes: 13 additions & 12 deletions packages/service/core/dataset/unstructured/config.ts
@@ -16,41 +16,42 @@ export type UnstructuredEnvType = {

let client: UnstructuredClient | null = null;

function initClient(){
function initClient(config?: UnstructuredEnvType){
if (!config){
config = global.unstructuredConfigs;
}
const httpClient = axios.create({
timeout: global.unstructuredConfigs.timeout,
timeout: config.timeout,
})

// httpClient.interceptors.request.use((config) => {
// return config;
// })
client = new UnstructuredClient({
serverURL: global.unstructuredConfigs.baseUrl || 'http://localhost:8000',
serverURL: config.baseUrl || 'http://localhost:8000',
security: {
apiKeyAuth: ""
},
defaultClient: httpClient,
retryConfig: {
logger: addLog,
strategy: global.unstructuredConfigs.retryConfig.strategy,
strategy: config.retryConfig.strategy,
retryConnectionErrors: true,
backoff: {
initialInterval: global.unstructuredConfigs.retryConfig.initialInterval,
maxInterval: global.unstructuredConfigs.retryConfig.maxInterval,
maxElapsedTime: global.unstructuredConfigs.retryConfig.maxElapsedTime,
exponent: global.unstructuredConfigs.retryConfig.exponent,
initialInterval: config.retryConfig.initialInterval,
maxInterval: config.retryConfig.maxInterval,
maxElapsedTime: config.retryConfig.maxElapsedTime,
exponent: config.retryConfig.exponent,
}
}
});
}



export const getClient = ({

}) => {
export const getClient = (config?: UnstructuredEnvType) => {
if (!client) {
initClient();
initClient(config);
}
return client;
}
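getClient now takes an optional config so that code running in a worker thread, where global.unstructuredConfigs is not populated, can still build a client. A minimal usage sketch under that assumption:

// Worker thread: pass the config injected through workerData.
const client = getClient(workerData.globalConfig.unstructuredConfigs);

// Main thread, where global.unstructuredConfigs is set: the argument can be omitted.
const sameClient = getClient();

Note that client is a module-level singleton, so whichever call runs first in a given thread fixes the configuration used by that thread.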
52 changes: 29 additions & 23 deletions packages/service/worker/file/extension/unstructured.ts
@@ -1,23 +1,14 @@
import { ReadFileByBufferParams, ReadFileResponse } from "../../../common/file/read/type";
import { initMarkdownText } from '../../../common/file/read/utils';
import { getDownloadStream, getFileById } from "../../../common/file/gridfs/controller";
import { queryImageDescription } from "../../../core/ai/functions/queryImageDescription";
import { getLLMModel } from "../../../core/ai/model";
import { getClient } from "../../../core/dataset/unstructured/config";
import { addLog } from "../../../common/system/log";
import { PDFDocument } from "pdf-lib"
import {ReadFileResponse} from "../../../common/file/read/type";
import {initMarkdownText} from '../../../common/file/read/utils';
import {queryImageDescription} from "../../../core/ai/functions/queryImageDescription";
import {getClient} from "../../../core/dataset/unstructured/config";
import {addLog} from "../../../common/system/log";
import {PDFDocument} from "pdf-lib"
import pLimit from "p-limit";
import {ReadRawTextByBuffer} from "../type";

type TokenType = {
str: string;
dir: string;
width: number;
height: number;
transform: number[];
fontName: string;
hasEOL: boolean;
};
import {workerData} from "worker_threads"
import {DatasetSchemaType} from "@fastgpt/global/core/dataset/type";
import {getAIApi} from "../../../core/ai/config";

type UnstructuredElementType = {
type: string;
@@ -32,7 +23,7 @@ type UnstructuredElementType = {
const limit = pLimit(3);

// Partition the file; currently accepts pdf and word
export const readUnFile = async ({ buffer, preview, metadata, teamId, dataset }: ReadRawTextByBuffer): Promise<ReadFileResponse> => {
export const readUnFile = async ({ buffer, preview, metadata }: ReadRawTextByBuffer): Promise<ReadFileResponse> => {

if (preview) {
const pdfDoc = await PDFDocument.load(buffer);
@@ -47,7 +38,7 @@ export const readUnFile = async ({ buffer, preview, metadata, teamId, dataset }:

// 1. Request pdf partitioning
addLog.info(`File ${metadata?.relatedId} partition started.`);
const client = getClient({})
const client = getClient(workerData.globalConfig.unstructuredConfigs)
const res = await client?.general.partition({
files: {
content: buffer,
@@ -67,7 +58,23 @@ export const readUnFile = async ({ buffer, preview, metadata, teamId, dataset }:
if (!pageElements || pageElements.length == 0) {
pageElements = []
}
if (metadata) {
metadata["elements"] = pageElements;
}
return {
formatText: "", metadata: metadata, rawText: ""
}
}

export const initPdfText = async ({ metadata, teamId, dataset, pageElements }: {
metadata: any;
teamId: string;
dataset: DatasetSchemaType|undefined;
pageElements: any[];
}): Promise<string> => {
const ai = getAIApi({
timeout: 480000
})
// 3. Ask the vision LLM to describe image and table elements  4. Insert the images/tables into mongodb
const asyncOperation = async (element: UnstructuredElementType) => {
if (["Image", "Table"].includes(element.type) && element.text.length >= 2 && element.metadata.image_base64) {
@@ -77,6 +84,7 @@ export const readUnFile = async ({ buffer, preview, metadata, teamId, dataset }:
rawTex: element.text,
image_base64: "data:image/jpeg;base64," + element.metadata.image_base64,
model: (dataset?.agentModel || "gemini-pro-vision"),
ai: ai,
language: element.metadata.languages[0],
}).catch(error => {
addLog.error(`Llm image ${element.element_id} error:`, error)
@@ -105,7 +113,5 @@ export const readUnFile = async ({ buffer, preview, metadata, teamId, dataset }:
}).join('');
addLog.info(`Join ${metadata?.relatedId} pdf text end.`);

return {
formatText: "", metadata: metadata, rawText: finalText
}
return finalText
}
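Part of initPdfText is collapsed in the diff above; the visible pieces (the module-level pLimit(3), asyncOperation, and the trailing join('')) suggest a bounded-concurrency map over the page elements. A sketch of that pattern only, with asyncOperation's return value and the exact join treated as assumptions rather than the hidden lines:

// Run at most three element descriptions at a time, then stitch the text together.
const limit = pLimit(3);
const described = await Promise.all(
  pageElements.map((element) => limit(() => asyncOperation(element)))
);
const finalText = described
  .map((element) => (element && element.text ? element.text : ""))
  .join("");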
2 changes: 1 addition & 1 deletion packages/service/worker/file/type.d.ts
@@ -8,7 +8,6 @@ export type ReadRawTextProps<T> = {
encoding: string;
preview?: boolean;
teamId: string;
dataset?: DatasetSchemaType;
metadata?: Record<string, any>;
};

@@ -17,4 +16,5 @@ export type ReadRawTextByBuffer = ReadRawTextProps<Buffer>;
export type ReadFileResponse = {
rawText: string;
formatText?: string;
metadata?: Record<string, any>;
};
6 changes: 5 additions & 1 deletion packages/service/worker/utils.ts
@@ -9,7 +9,11 @@ export enum WorkerNameEnum {

export const getWorker = (name: WorkerNameEnum) => {
const workerPath = path.join(process.cwd(), '.next', 'server', 'worker', `${name}.js`);
return new Worker(workerPath);
return new Worker(workerPath, {workerData: {
globalConfig: {
unstructuredConfigs: global.unstructuredConfigs
}
}});
};

export const runWorker = <T = any>(name: WorkerNameEnum, params?: Record<string, any>) => {
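On the worker side the injected object comes back through worker_threads, which is how the unstructured extension above reaches its config. A two-line sketch:

import { workerData } from "worker_threads";

const unstructuredConfigs = workerData?.globalConfig?.unstructuredConfigs;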
2 changes: 2 additions & 0 deletions projects/app/src/global/common/api/systemRes.d.ts
@@ -8,13 +8,15 @@ import type {

import type { FastGPTFeConfigsType } from '@fastgpt/global/common/system/types/index.d';
import { SubPlanType } from '@fastgpt/global/support/wallet/sub/type';
import {UnstructuredEnvType} from "@fastgpt/service/core/dataset/unstructured/config";

export type InitDateResponse = {
llmModels: LLMModelItemType[];
vectorModels: VectorModelItemType[];
audioSpeechModels: AudioSpeechModels[];
reRankModels: ReRankModelItemType[];
whisperModel: WhisperModelType;
unstructuredConfigs: UnstructuredEnvType;
feConfigs: FastGPTFeConfigsType;
subPlans?: SubPlanType;
systemVersion: string;
1 change: 1 addition & 0 deletions projects/app/src/pages/api/common/system/getInitData.ts
@@ -19,6 +19,7 @@ export default async function handler(req: NextApiRequest, res: NextApiResponse)
jsonRes<InitDateResponse>(res, {
data: {
feConfigs: global.feConfigs,
unstructuredConfigs: global.unstructuredConfigs,
subPlans: global.subPlans,
llmModels: global.llmModels,
vectorModels: global.vectorModels,
7 changes: 1 addition & 6 deletions projects/app/src/service/events/generateFileChunk.ts
@@ -1,25 +1,20 @@
import {readFileContentFromMongo} from "@fastgpt/service/common/file/gridfs/controller";
import {BucketNameEnum} from "@fastgpt/global/common/file/constants";
import {splitText2Chunks} from "@fastgpt/global/common/string/textSplitter";
import {DatasetCollectionTypeEnum, TrainingModeEnum} from "@fastgpt/global/core/dataset/constants";
import {TrainingModeEnum} from "@fastgpt/global/core/dataset/constants";
import {checkDatasetLimit} from "@fastgpt/service/support/permission/teamLimit";
import {predictDataLimitLength} from "@fastgpt/global/core/dataset/utils";
import {createTrainingUsage} from "@fastgpt/service/support/wallet/usage/controller";
import {UsageSourceEnum} from "@fastgpt/global/support/wallet/usage/constants";
import {getLLMModel, getVectorModel} from "@fastgpt/service/core/ai/model";
import {pushDataListToTrainingQueue} from "@fastgpt/service/core/dataset/training/controller";
import {MongoImage} from "@fastgpt/service/common/file/image/schema";
import {jsonRes} from "@fastgpt/service/common/response";
import {addLog} from "@fastgpt/service/common/system/log";
import {startTrainingQueue} from "@/service/core/dataset/training/utils";
import {DatasetSchemaType} from "@fastgpt/global/core/dataset/type";
import {ClientSession} from "@fastgpt/service/common/mongo";
import {mongoSessionRun} from "@fastgpt/service/common/mongo/sessionRun";
import {createOneCollection} from "@fastgpt/service/core/dataset/collection/controller";
import {putDatasetCollectionById} from "@/web/core/dataset/api";
import {hashStr} from "@fastgpt/global/common/string/tools";
import {MongoDatasetCollection} from "@fastgpt/service/core/dataset/collection/schema";
import {getCollectionUpdateTime} from "@fastgpt/service/core/dataset/collection/utils";

export const generateFileChunk = async ({
teamId,
