Skip to content

Commit

Permalink
reverted back to pre-parallel version
Browse files Browse the repository at this point in the history
  • Loading branch information
Dephoh committed Dec 11, 2024
1 parent 5c86e88 commit 2b516d5
Showing 1 changed file with 18 additions and 43 deletions.
61 changes: 18 additions & 43 deletions RAG.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@
from langchain_community.retrievers import BM25Retriever
import requests
from typing import Dict, Any, Optional, List, Tuple
import json
import logging
import concurrent.futures

def retrieve(query: str,vectorstore:PineconeVectorStore, k: int = 100) -> Tuple[List[Document], List[float]]:
def retrieve(query: str,vectorstore:PineconeVectorStore, k: int = 1000) -> Tuple[List[Document], List[float]]:
start = time.time()
# pinecone_api_key = os.getenv("PINECONE_API_KEY")
# pc = Pinecone(api_key=pinecone_api_key)
Expand All @@ -39,6 +39,7 @@ def retrieve(query: str,vectorstore:PineconeVectorStore, k: int = 100) -> Tuple[

def safe_get_json(url: str) -> Optional[Dict]:
"""Safely fetch and parse JSON from a URL."""
print("Fetching JSON")
try:
response = requests.get(url, timeout=10)
response.raise_for_status()
Expand All @@ -63,52 +64,26 @@ def extract_text_from_json(json_data: Dict) -> str:

return " ".join(text_parts) if text_parts else "No content available"

def process_single_document(doc: Document) -> Optional[Document]:
"""Process a single document by fetching and extracting metadata."""
if not doc.metadata.get('source'):
return None

url = f"https://www.digitalcommonwealth.org/search/{doc.metadata['source']}"
json_data = safe_get_json(f"{url}.json")

if json_data:
text_content = extract_text_from_json(json_data)
if text_content:
return Document(
page_content=text_content,
metadata={
"source": doc.metadata['source'],
"field": doc.metadata['field'],
"URL": url
}
)
return None

def rerank(documents: List[Document], query: str, max_workers: int = 1) -> List[Document]:
"""Ingest more metadata and rerank documents using BM25 with parallel processing."""
def rerank(documents: List[Document], query: str) -> List[Document]:
"""Ingest more metadata. Rerank documents using BM25"""
start = time.time()
if not documents:
return []

meta_start = time.time()
full_docs = []

# Process documents in parallel using ThreadPoolExecutor
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
# Submit all document processing tasks
future_to_doc = {
executor.submit(process_single_document, doc): doc
for doc in documents
}
meta_start = time.time()
for doc in documents:
if not doc.metadata.get('source'):
continue

url = f"https://www.digitalcommonwealth.org/search/{doc.metadata['source']}"
json_data = safe_get_json(f"{url}.json")

# Collect results as they complete
for future in concurrent.futures.as_completed(future_to_doc):
processed_doc = future.result()
if processed_doc:extract_text_from_json():
full_docs.append(processed_doc)

if json_data:
text_content = extract_text_from_json(json_data)
if text_content: # Only add documents with actual content
full_docs.append(Document(page_content=text_content, metadata={"source":doc.metadata['source'],"field":doc.metadata['field'],"URL":url}))
logging.info(f"Took {time.time()-meta_start} seconds to retrieve all metadata")

# If no valid documents were processed, return empty list
if not full_docs:
return []
Expand All @@ -117,7 +92,7 @@ def rerank(documents: List[Document], query: str, max_workers: int = 1) -> List[
reranker = BM25Retriever.from_documents(full_docs, k=min(10, len(full_docs)))
reranked_docs = reranker.invoke(query)
logging.info(f"Finished reranking: {time.time()-start}")
return full_docs
return reranked_docs

def parse_xml_and_query(query:str,xml_string:str) -> str:
"""parse xml and return rephrased query"""
Expand Down Expand Up @@ -203,7 +178,7 @@ def RAG(llm: Any, query: str,vectorstore:PineconeVectorStore, top: int = 10, k:
First, reason about the answer between <REASONING></REASONING> headers,
based on the context determine if there is sufficient material for answering the exact question,
return either <VALID>YES</VALID> or <VALID>NO</VALID>
then return a response between <RESPONSE></RESPONSE> headers, your response should be well formatted and an individual summary of each piece of relevant context:
then return a response between <RESPONSE></RESPONSE> headers:
Here is an example
<EXAMPLE>
<QUERY>Are pineapples a good fuel for cars?</QUERY>
Expand Down

0 comments on commit 2b516d5

Please sign in to comment.