IndexManager.py
import re
import asyncio

from elasticsearch import AsyncElasticsearch
from llama_index.core import (
    StorageContext,
    VectorStoreIndex,
    SimpleDirectoryReader,
    get_response_synthesizer,
)
from llama_index.core.query_engine import RetrieverQueryEngine
from llama_index.core.postprocessor import SimilarityPostprocessor
from llama_index.core.node_parser import SemanticSplitterNodeParser
from llama_index.core.retrievers import VectorIndexRetriever
from llama_index.vector_stores.elasticsearch import ElasticsearchStore
from llama_index.embeddings.openai import OpenAIEmbedding

from config import (
    semantic_buffer_size,
    semantic_breakpoint_percentile_threshold,
    embedding_model,
    top_k_similar,
    cutoff,
)

class IndexManager:
    """Manage an Elasticsearch-backed vector index over a directory of PDFs."""

    def __init__(self, pdfs_dir, elasticsearch_endpoint_url, index_name):
        self.pdfs_dir = pdfs_dir
        self.elasticsearch_endpoint_url = elasticsearch_endpoint_url
        self.index_name = index_name
        self.es_client = AsyncElasticsearch(elasticsearch_endpoint_url)
        self.vector_store = ElasticsearchStore(index_name=self.index_name, es_url=self.elasticsearch_endpoint_url)

    async def check_index_exists(self):
        return await self.es_client.indices.exists(index=self.index_name)
    async def load_data(self):
        def load_data_sync():
            return SimpleDirectoryReader(self.pdfs_dir).load_data()

        # SimpleDirectoryReader.load_data() is synchronous (blocking). Scheduling it with
        # loop.run_in_executor(None, load_data_sync) runs it on the default executor
        # (typically a thread pool) and returns a future, so awaiting it lets the event
        # loop keep serving other tasks while the PDFs are being read.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, load_data_sync)
    async def process_documents(self, documents):
        embed_model = OpenAIEmbedding(model=embedding_model)
        splitter = SemanticSplitterNodeParser(
            buffer_size=semantic_buffer_size,
            breakpoint_percentile_threshold=semantic_breakpoint_percentile_threshold,
            embed_model=embed_model,
        )
        nodes = splitter.get_nodes_from_documents(documents)
        print(f"Extracted {len(nodes)} nodes from {len(documents)} document pages")
        return nodes

    async def initialize_index(self, nodes):
        storage_context = StorageContext.from_defaults(vector_store=self.vector_store)
        index = VectorStoreIndex(nodes=nodes, storage_context=storage_context)
        print("New index initialized and persisted.")
        return index
    async def get_or_create_index(self):
        if await self.check_index_exists():
            print(f"Loading existing index '{self.index_name}' from Elasticsearch...")
            return VectorStoreIndex.from_vector_store(self.vector_store)
        else:
            print(f"No existing index found. Initializing new index with name: {self.index_name}...")
            documents = await self.load_data()
            nodes = await self.process_documents(documents)
            return await self.initialize_index(nodes)

    async def create_query_engine(self):
        index = await self.get_or_create_index()
        retriever = VectorIndexRetriever(index=index, similarity_top_k=top_k_similar)
        response_synthesizer = get_response_synthesizer()
        query_engine = RetrieverQueryEngine(
            retriever=retriever,
            response_synthesizer=response_synthesizer,
            node_postprocessors=[SimilarityPostprocessor(similarity_cutoff=cutoff)],
        )
        return query_engine
    @staticmethod
    def simple_format_response_and_sources(response):
        # Pull the answer text off the response object, defaulting to an empty string.
        primary_response = getattr(response, 'response', '')
        output = {"response": primary_response}

        # Collect file/page/text details for each retrieved source node, if any.
        sources = []
        if hasattr(response, 'source_nodes'):
            for node in response.source_nodes:
                node_data = getattr(node, 'node', None)
                if node_data:
                    metadata = getattr(node_data, 'metadata', {})
                    text = getattr(node_data, 'text', '')
                    # Swap paragraph breaks (\n\n) with the Unicode line separator (\u2028)
                    # and collapse single newlines to spaces.
                    text = re.sub(r'\n\n|\n|\u2028', lambda m: {'\n\n': '\u2028', '\n': ' ', '\u2028': '\n\n'}[m.group()], text)
                    source_info = {
                        "file": metadata.get('file_name', 'N/A'),
                        "page": metadata.get('page_label', 'N/A'),
                        "text": text
                    }
                    sources.append(source_info)
        output['sources'] = sources
        return output
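

# --- Usage sketch (illustrative; not part of the original module) ---
# A minimal example of how this class might be wired together, assuming a local
# Elasticsearch at http://localhost:9200, a "./pdfs" directory, and an index
# named "pdf_index" (all hypothetical values), with OPENAI_API_KEY set in the
# environment for the OpenAI embedding calls.
if __name__ == "__main__":
    async def main():
        manager = IndexManager(
            pdfs_dir="./pdfs",                                   # hypothetical path
            elasticsearch_endpoint_url="http://localhost:9200",  # hypothetical endpoint
            index_name="pdf_index",                              # hypothetical index name
        )
        query_engine = await manager.create_query_engine()
        response = query_engine.query("What topics do these PDFs cover?")
        print(IndexManager.simple_format_response_and_sources(response))

    asyncio.run(main())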