-
Notifications
You must be signed in to change notification settings - Fork 590
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(wren-ai-service): Consolidate SQL Pairs Service and Remove Redundant Code #1268
Changes from 13 commits
06cb03e
9930b44
f9b0677
80ef5e9
5005fad
c48d51e
fb4f90e
42a7813
ab54820
640304d
9dd3a43
f93a443
5943227
61f7e13
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -8,21 +8,21 @@ | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
from hamilton import base | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
from hamilton.async_driver import AsyncDriver | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
from haystack import Document, component | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
from haystack.document_stores.types import DuplicatePolicy | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
from haystack.document_stores.types import DocumentStore, DuplicatePolicy | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
from langfuse.decorators import observe | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
from pydantic import BaseModel | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
from src.core.pipeline import BasicPipeline | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
from src.core.provider import DocumentStoreProvider, EmbedderProvider | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
from src.pipelines.indexing import AsyncDocumentWriter, SqlPairsCleaner | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
from src.pipelines.indexing import AsyncDocumentWriter | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
logger = logging.getLogger("wren-ai-service") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
class SqlPair(BaseModel): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
id: str | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
sql: str | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
question: str | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
sql: str = "" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
question: str = "" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
@component | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -49,6 +49,30 @@ def run(self, sql_pairs: List[SqlPair], project_id: Optional[str] = ""): | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
@component | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
class SqlPairsCleaner: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
def __init__(self, sql_pairs_store: DocumentStore) -> None: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
self.store = sql_pairs_store | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
@component.output_types() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
async def run( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
self, sql_pair_ids: List[str], project_id: Optional[str] = None | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
) -> None: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
filter = { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
"operator": "AND", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
"conditions": [ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
{"field": "sql_pair_id", "operator": "in", "value": sql_pair_ids}, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
], | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if project_id: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
filter["conditions"].append( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
{"field": "project_id", "operator": "==", "value": project_id} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return await self.store.delete_documents(filter) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
## Start of Pipeline | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
@observe(capture_input=False) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
def boilerplates( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -155,9 +179,10 @@ def __init__( | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
document_store=store, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
policy=DuplicatePolicy.OVERWRITE, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
"external_pairs": _load_sql_pairs(sql_pairs_path), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
self._external_pairs = _load_sql_pairs(sql_pairs_path) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
super().__init__( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
AsyncDriver({}, sys.modules[__name__], result_builder=base.DictResult()) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -167,18 +192,35 @@ async def run( | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
self, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
mdl_str: str, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
project_id: Optional[str] = "", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
external_pairs: Optional[Dict[str, Any]] = {}, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
) -> Dict[str, Any]: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
logger.info( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
f"Project ID: {project_id} SQL Pairs Indexing pipeline is running..." | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return await self._pipe.execute( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
["write"], | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
inputs={ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
"mdl_str": mdl_str, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
"project_id": project_id, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
**self._components, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
input = { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
"mdl_str": mdl_str, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
"project_id": project_id, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
"external_pairs": { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
**self._external_pairs, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
**external_pairs, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
}, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
**self._components, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return await self._pipe.execute(["write"], inputs=input) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
200
to
+212
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Consider error handling in pipeline execution. The pipeline + try:
+ return await self._pipe.execute(["write"], inputs=input)
+ except Exception as e:
+ logger.error(f"Pipeline execution failed: {e}")
+ raise 📝 Committable suggestion
Suggested change
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
@observe(name="Clean Documents for SQL Pairs") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
async def clean( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
self, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
sql_pairs: List[SqlPair], | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
project_id: Optional[str] = None, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
) -> None: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
await clean( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
sql_pairs=sql_pairs, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
embedding={"documents": []}, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
cleaner=self._components["cleaner"], | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
project_id=project_id, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Verify error handling for dynamic method invocation.
Using
getattr
without checking if the method exists could raiseAttributeError
.📝 Committable suggestion