Skip to content

Commit

Permalink
1.5 workers for parallel. Avoiding rate limits
Browse files Browse the repository at this point in the history
  • Loading branch information
Dephoh committed Dec 11, 2024
1 parent 706b16d commit 0716283
Showing 1 changed file with 21 additions and 4 deletions.
25 changes: 21 additions & 4 deletions RAG.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import logging
import concurrent.futures
import json
from threading import Lock

def retrieve(query: str,vectorstore:PineconeVectorStore, k: int = 100) -> Tuple[List[Document], List[float]]:
start = time.time()
Expand Down Expand Up @@ -85,17 +86,33 @@ def process_single_document(doc: Document) -> Optional[Document]:
)
return None

def rerank(documents: List[Document], query: str, max_workers: int = 3) -> List[Document]:
"""Ingest more metadata and rerank documents using BM25 with parallel processing."""
# Module-level toggle consulted once per rerank() call; it flips on every read,
# so successive calls alternate between 1 and 2 workers (1.5 on average).
_use_two_workers = False
_worker_lock = Lock()


def get_current_worker_count() -> int:
    """Return the worker count for this call and flip the toggle for the next.

    The first call returns 1, the second 2, the third 1, and so on. The read
    and the flip both happen while holding ``_worker_lock`` so that concurrent
    callers each observe a distinct toggle state.

    Returns:
        1 or 2, alternating on every invocation.
    """
    global _use_two_workers
    with _worker_lock:
        # Snapshot the flag, then invert it for whoever calls next.
        two_this_time = _use_two_workers
        _use_two_workers = not two_this_time
    return 2 if two_this_time else 1

def rerank(documents: List[Document], query: str) -> List[Document]:
"""Ingest more metadata and rerank documents using BM25 with alternating worker counts."""
start = time.time()
if not documents:
return []

meta_start = time.time()
full_docs = []

# Get the worker count for this specific call
worker_count = get_current_worker_count()
logging.info(f"Processing with {worker_count} worker{'s' if worker_count > 1 else ''}")

# Process documents in parallel using ThreadPoolExecutor
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
with concurrent.futures.ThreadPoolExecutor(max_workers=worker_count) as executor:
# Submit all document processing tasks
future_to_doc = {
executor.submit(process_single_document, doc): doc
Expand Down Expand Up @@ -205,7 +222,7 @@ def RAG(llm: Any, query: str,vectorstore:PineconeVectorStore, top: int = 10, k:
First, reason about the answer between <REASONING></REASONING> headers,
based on the context determine if there is sufficient material for answering the exact question,
return either <VALID>YES</VALID> or <VALID>NO</VALID>
then return a response between <RESPONSE></RESPONSE> headers, your response should be well formatted and an individual summary of each piece of relevant context:
then return a response between <RESPONSE></RESPONSE> headers:
Here is an example
<EXAMPLE>
<QUERY>Are pineapples a good fuel for cars?</QUERY>
Expand Down

0 comments on commit 0716283

Please sign in to comment.