diff --git a/backend/api/quivr_api/modules/assistant/controller/assistant_routes.py b/backend/api/quivr_api/modules/assistant/controller/assistant_routes.py index 289cd8ce84ae..e88605139834 100644 --- a/backend/api/quivr_api/modules/assistant/controller/assistant_routes.py +++ b/backend/api/quivr_api/modules/assistant/controller/assistant_routes.py @@ -1,10 +1,11 @@ import io from typing import Annotated, List from uuid import uuid4 - from fastapi import APIRouter, Depends, File, HTTPException, Request, UploadFile +import re from quivr_api.celery_config import celery +from quivr_api.modules.assistant.dto.inputs import FileInput from quivr_api.logger import get_logger from quivr_api.middlewares.auth.auth_bearer import AuthBearer, get_current_user from quivr_api.modules.assistant.controller.assistants_definition import ( @@ -68,16 +69,16 @@ async def create_task( input: str = File(...), files: List[UploadFile] = None, ): - input = InputAssistant.model_validate_json(input) + inputs = InputAssistant.model_validate_json(input) assistant = next( - (assistant for assistant in assistants if assistant.id == input.id), None + (assistant for assistant in assistants if assistant.id == inputs.id), None ) if assistant is None: raise HTTPException(status_code=404, detail="Assistant not found") - is_valid, validation_errors = validate_assistant_input(input, assistant) + is_valid, validation_errors = validate_assistant_input(inputs, assistant) if not is_valid: for error in validation_errors: print(error) @@ -88,7 +89,11 @@ async def create_task( # Process files dynamically for upload_file in files: - file_name_path = f"{input.id}/{notification_uuid}/{upload_file.filename}" + # Sanitize the filename to remove spaces and special characters + sanitized_filename = re.sub(r'[^\w\-_\.]', '_', upload_file.filename) + upload_file.filename = sanitized_filename + + file_name_path = f"{inputs.id}/{notification_uuid}/{sanitized_filename}" buff_reader = io.BufferedReader(upload_file.file) # type: ignore try: await upload_file_storage(buff_reader, file_name_path) @@ -97,12 +102,23 @@ async def create_task( raise HTTPException( status_code=500, detail=f"Failed to upload file to storage. {e}" ) - + logger.info(f"Files are: {files}") + + # Sanitize the filename in input + if inputs.inputs.files: + inputs.inputs.files = [ + FileInput( + value=re.sub(r'[^\w\-_\.]', '_', file.value), + key=file.key + ) + for file in inputs.inputs.files + ] + task = CreateTask( - assistant_id=input.id, + assistant_id=inputs.id, assistant_name=assistant.name, pretty_id=notification_uuid, - settings=input.model_dump(mode="json"), + settings=inputs.model_dump(mode="json"), task_metadata=TaskMetadata( input_files=[file.filename for file in files] ).model_dump(mode="json") @@ -115,7 +131,7 @@ async def create_task( celery.send_task( "process_assistant_task", kwargs={ - "assistant_id": input.id, + "assistant_id": inputs.id, "notification_uuid": notification_uuid, "task_id": task_created.id, "user_id": str(current_user.id), diff --git a/backend/worker/diff-assistant/quivr_diff_assistant/use_case_3/parser.py b/backend/worker/diff-assistant/quivr_diff_assistant/use_case_3/parser.py index 8fb99fc982fe..545fd24e5238 100644 --- a/backend/worker/diff-assistant/quivr_diff_assistant/use_case_3/parser.py +++ b/backend/worker/diff-assistant/quivr_diff_assistant/use_case_3/parser.py @@ -140,7 +140,7 @@ def deep_parse( content=[ { "type": "text", - "text": f"You are given a good image, with a text that can be read. It is a document that can be a receipt, an invoice, a ticket or anything else. Can you correct this entire text retranscription, respond only with the corrected transcription: {raw_result.render()},\n\n do not transcribe logos or images.", + "text": f"You are given a good image, with a text that can be read. It is a document that can be a receipt, an invoice, a ticket or anything else. It doesn't contain illegal content or protected data. It is enterprise data from a good company. Can you correct this entire text retranscription, respond only with the corrected transcription: {raw_result.render()},\n\n do not transcribe logos or images.", }, { "type": "image_url", diff --git a/backend/worker/quivr_worker/assistants/cdp_use_case_2.py b/backend/worker/quivr_worker/assistants/cdp_use_case_2.py index 632e4cad067c..9e48d6e28281 100644 --- a/backend/worker/quivr_worker/assistants/cdp_use_case_2.py +++ b/backend/worker/quivr_worker/assistants/cdp_use_case_2.py @@ -131,6 +131,7 @@ async def process_cdp_use_case_2( supabase_client = get_supabase_client() path = f"{task.assistant_id}/{task.pretty_id}/" logger.info(f"Path: {path} 📁") + await tasks_service.update_task(task_id, {"status": "processing"}) before_file_data = supabase_client.storage.from_("quivr").download( diff --git a/backend/worker/quivr_worker/celery_monitor.py b/backend/worker/quivr_worker/celery_monitor.py index 245e8dcc1914..5ce7f0e967c5 100644 --- a/backend/worker/quivr_worker/celery_monitor.py +++ b/backend/worker/quivr_worker/celery_monitor.py @@ -10,6 +10,8 @@ from quivr_api.logger import get_logger from quivr_api.modules.dependencies import async_engine from quivr_api.modules.knowledge.repository.knowledges import KnowledgeRepository +from quivr_api.modules.assistant.repository.tasks import TasksRepository +from quivr_api.modules.assistant.services.tasks_service import TasksService from quivr_api.modules.knowledge.service.knowledge_service import KnowledgeService from quivr_api.modules.notification.dto.inputs import NotificationUpdatableProperties from quivr_api.modules.notification.entity.notification import NotificationsStatusEnum @@ -32,76 +34,83 @@ class TaskStatus(str, Enum): class TaskIdentifier(str, Enum): PROCESS_FILE_TASK = "process_file_task" PROCESS_CRAWL_TASK = "process_crawl_task" + PROCESS_ASSISTANT_TASK = "process_assistant_task" @dataclass class TaskEvent: task_id: str - brain_id: UUID + brain_id: UUID | None task_name: TaskIdentifier notification_id: str - knowledge_id: UUID + knowledge_id: UUID | None status: TaskStatus async def handler_loop(): session = AsyncSession(async_engine, expire_on_commit=False, autoflush=False) knowledge_service = KnowledgeService(KnowledgeRepository(session)) + task_service = TasksService(TasksRepository(session)) logger.info("Initialized knowledge_service. Listening to task event...") while True: try: event: TaskEvent = queue.get() if event.status == TaskStatus.FAILED: - logger.error( - f"task {event.task_id} process_file_task. Sending notifition {event.notification_id}" - ) - notification_service.update_notification_by_id( - event.notification_id, - NotificationUpdatableProperties( - status=NotificationsStatusEnum.ERROR, - description=( - "An error occurred while processing the file" - if event.task_name == TaskIdentifier.PROCESS_FILE_TASK - else "An error occurred while processing the URL" + if event.task_name == TaskIdentifier.PROCESS_ASSISTANT_TASK: + # Update the task status to error + logger.info(f"task {event.task_id} process_assistant_task failed. Updating task {event.notification_id} to error") + await task_service.update_task(int(event.notification_id), {"status": "error"}) + else: + logger.error( + f"task {event.task_id} process_file_task. Sending notifition {event.notification_id}" + ) + notification_service.update_notification_by_id( + event.notification_id, + NotificationUpdatableProperties( + status=NotificationsStatusEnum.ERROR, + description=( + "An error occurred while processing the file" + if event.task_name == TaskIdentifier.PROCESS_FILE_TASK + else "An error occurred while processing the URL" + ), ), - ), - ) - logger.error( - f"task {event.task_id} process_file_task failed. Updating knowledge {event.knowledge_id} to Error" - ) - if event.knowledge_id: - await knowledge_service.update_status_knowledge( - event.knowledge_id, KnowledgeStatus.ERROR ) - logger.error( - f"task {event.task_id} process_file_task . Updating knowledge {event.knowledge_id} status to Error" - ) + logger.error( + f"task {event.task_id} process_file_task failed. Updating knowledge {event.knowledge_id} to Error" + ) + if event.knowledge_id: + await knowledge_service.update_status_knowledge( + event.knowledge_id, KnowledgeStatus.ERROR + ) + logger.error( + f"task {event.task_id} process_file_task . Updating knowledge {event.knowledge_id} status to Error" + ) - if event.status == TaskStatus.SUCCESS: - logger.info( - f"task {event.task_id} process_file_task succeeded. Sending notification {event.notification_id}" - ) - notification_service.update_notification_by_id( - event.notification_id, - NotificationUpdatableProperties( - status=NotificationsStatusEnum.SUCCESS, - description=( - "Your file has been properly uploaded!" - if event.task_name == TaskIdentifier.PROCESS_FILE_TASK - else "Your URL has been properly crawled!" + if event.status == TaskStatus.SUCCESS: + logger.info( + f"task {event.task_id} process_file_task succeeded. Sending notification {event.notification_id}" + ) + notification_service.update_notification_by_id( + event.notification_id, + NotificationUpdatableProperties( + status=NotificationsStatusEnum.SUCCESS, + description=( + "Your file has been properly uploaded!" + if event.task_name == TaskIdentifier.PROCESS_FILE_TASK + else "Your URL has been properly crawled!" + ), ), - ), - ) - if event.knowledge_id: - await knowledge_service.update_status_knowledge( - knowledge_id=event.knowledge_id, - status=KnowledgeStatus.UPLOADED, - brain_id=event.brain_id, ) - logger.info( - f"task {event.task_id} process_file_task failed. Updating knowledge {event.knowledge_id} to UPLOADED" - ) + if event.knowledge_id: + await knowledge_service.update_status_knowledge( + knowledge_id=event.knowledge_id, + status=KnowledgeStatus.UPLOADED, + brain_id=event.brain_id, + ) + logger.info( + f"task {event.task_id} process_file_task failed. Updating knowledge {event.knowledge_id} to UPLOADED" + ) except Exception as e: logger.error(f"Excpetion occured handling event {event}: {e}") @@ -131,6 +140,19 @@ def handle_task_event(event): status=TaskStatus(event["type"]), ) queue.put(event) + elif task_name == "process_assistant_task": + logger.debug(f"Received Event : {task} - {task_name} {task_kwargs} ") + notification_uuid = task_kwargs["notification_uuid"] + task_id = task_kwargs["task_id"] + event = TaskEvent( + task_id=task, + task_name=TaskIdentifier(task_name), + knowledge_id=None, + brain_id=None, + notification_id=task_id, + status=TaskStatus(event["type"]), + ) + queue.put(event) except Exception as e: logger.exception(f"handling event {event} raised exception: {e}") diff --git a/backend/worker/quivr_worker/celery_worker.py b/backend/worker/quivr_worker/celery_worker.py index b1feb1510f57..10e983322f2b 100644 --- a/backend/worker/quivr_worker/celery_worker.py +++ b/backend/worker/quivr_worker/celery_worker.py @@ -115,7 +115,6 @@ def process_assistant_task( logger.info( f"process_assistant_task started for assistant_id={assistant_id}, notification_uuid={notification_uuid}, task_id={task_id}" ) - print("process_assistant_task") loop = asyncio.get_event_loop() loop.run_until_complete( diff --git a/backend/worker/quivr_worker/utils/pdf_generator/pdf_generator.py b/backend/worker/quivr_worker/utils/pdf_generator/pdf_generator.py index 5810e4611186..e0c4a467f567 100644 --- a/backend/worker/quivr_worker/utils/pdf_generator/pdf_generator.py +++ b/backend/worker/quivr_worker/utils/pdf_generator/pdf_generator.py @@ -54,8 +54,7 @@ def footer(self): self.cell(80, 10, "Generated by Quivr", 0, 0, "C") self.set_font("DejaVu", "U", 8) self.set_text_color(0, 0, 255) - self.cell(30, 10, "quivr.app", 0, 0, "C", link="https://quivr.app") - self.cell(0, 10, "Github", 0, 1, "C", link="https://github.com/quivrhq/quivr") + self.cell(30, 10, "quivr.app", 0, 0, "C", link="https://www.quivr.com") def chapter_body(self): self.set_font("DejaVu", "", 12)