diff --git a/backend/config.yaml b/backend/config.yaml
index b5da1f4..c1b0987 100644
--- a/backend/config.yaml
+++ b/backend/config.yaml
@@ -1,35 +1,34 @@
 LLMConfig: &LLMConfig
-  source: AzureChatOpenAI
+  source: ChatVertexAI
   source_config:
-    openai_api_type: azure
-    openai_api_key: {{ OPENAI_API_KEY }}
-    openai_api_base: https://genai-ds.openai.azure.com/
-    openai_api_version: 2023-07-01-preview
-    deployment_name: gpt4
+    project_id: genai-vertex
+    model_name: chat-bison@001
+    location: europe-west1
+    max_output_tokens: 512
     temperature: 0.1
 
 VectorStoreConfig: &VectorStoreConfig
-  source: Chroma
+  source: MatchingEngine
   source_config:
+    project_id: genai-vertex
+    gcs_bucket_name: rag_accelerator_matchin_engine
     persist_directory: vector_database/
     collection_metadata:
       hnsw:space: cosine
-  retriever_search_type: similarity_score_threshold
+  retriever_search_type: similarity
   retriever_config:
-    k: 20
-    score_threshold: 0.5
+    k: 10
   insertion_mode: null
 
 EmbeddingModelConfig: &EmbeddingModelConfig
-  source: OpenAIEmbeddings
+  source: VertexAIEmbeddings
   source_config:
-    openai_api_type: azure
-    openai_api_key: {{ EMBEDDING_API_KEY }}
-    openai_api_base: https://poc-openai-artefact.openai.azure.com/
-    deployment: embeddings
-    chunk_size: 500
+    location: europe-west1
+    project_id: genai-vertex
+    model_name: textembedding-gecko@001
+
 DatabaseConfig: &DatabaseConfig
   database_url: {{ DATABASE_URL }}
diff --git a/backend/rag_components/document_loader.py b/backend/rag_components/document_loader.py
index 87bbfdf..8b06471 100644
--- a/backend/rag_components/document_loader.py
+++ b/backend/rag_components/document_loader.py
@@ -50,4 +50,4 @@ def get_loaders() -> List[str]:
     for _, obj in inspect.getmembers(langchain.document_loaders):
         if inspect.isclass(obj):
             loaders.append(obj.__name__)
-    return loaders
+    return loaders + ["TextLoader"]
diff --git a/backend/rag_components/matching_engine/get_matching_engine.py b/backend/rag_components/matching_engine/get_matching_engine.py
new file mode 100644
index 0000000..81c1f1b
--- /dev/null
+++ b/backend/rag_components/matching_engine/get_matching_engine.py
@@ -0,0 +1,173 @@
+########################################################################################################################################################################
+
+# REFERENCE : https://github.com/GoogleCloudPlatform/generative-ai/blob/main/language/use-cases/document-qa/question_answering_documents_langchain_matching_engine.ipynb
+
+########################################################################################################################################################################
+
+import os
+import time
+import logging
+import vertexai
+
+import json
+import uuid
+import numpy as np
+
+from typing import List, Tuple
+
+from matching_engine.matching_engine_utils import MatchingEngineUtils
+from langchain_community.vectorstores.matching_engine import MatchingEngine
+from langchain.embeddings import VertexAIEmbeddings
+
+from langchain_core.documents import Document
+from langchain.text_splitter import RecursiveCharacterTextSplitter
+
+
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger("VECTORSTORE")
+
+
+vertexai.init(project=os.environ.get("PROJECT_ID"), location=os.environ.get("REGION"))
+
+
+def rate_limit(max_per_minute):
+    """
+    A generator function that ensures a maximum number of operations per minute.
+    This function is used to rate limit operations that are executed in a loop.
+    It calculates the time taken for an operation and sleeps for the remaining time
+    to ensure that the maximum number of operations per minute is not exceeded.
+    Args:
+        max_per_minute (int): The maximum number of operations that can be performed per minute.
+    Yields:
+        None: Yields None and sleeps for the calculated time period if necessary.
+    """
+    period = 60 / max_per_minute
+    logger.info("Waiting")
+    while True:
+        before = time.time()
+        yield
+        after = time.time()
+        elapsed = after - before
+        sleep_time = max(0, period - elapsed)
+        if sleep_time > 0:
+            print(".", end="")
+            time.sleep(sleep_time)
+
+
+class CustomVertexAIEmbeddings(VertexAIEmbeddings):
+    requests_per_minute: int
+    num_instances_per_batch: int
+
+    # Overriding embed_documents method
+    def embed_documents(self, texts: List[str]):
+        limiter = rate_limit(self.requests_per_minute)
+        results = []
+        docs = list(texts)
+
+        while docs:
+            # Work in batches because the API accepts a maximum of 5
+            # documents per request to get embeddings
+            head, docs = (
+                docs[: self.num_instances_per_batch],
+                docs[self.num_instances_per_batch :],
+            )
+            chunk = self.client.get_embeddings(head)
+            results.extend(chunk)
+            next(limiter)
+        return [r.values for r in results]
+
+
+def initialise_index_folder(path: str, embeddings_dimension: int = 768):
+    # dummy embedding
+    init_embedding = {"id": str(uuid.uuid4()), "embedding": list(np.zeros(embeddings_dimension))}
+    # dump embedding to a local file
+    with open("init_embeddings.json", "w") as f:
+        json.dump(init_embedding, f)
+    # write embedding to Cloud Storage
+    os.system(f"set -x && gsutil cp init_embeddings.json gs://{path}/init_embeddings.json")
+
+
+def get_matching_engine_and_deploy_index(
+    index_name: str = "orange_index",
+    embeddings_qpm: int = 100,
+    embedding_num_batch: int = 5,
+    embeddings_dimension: int = 768,
+    embeddings_gcs_dir: str = "init_folder"
+) -> Tuple[MatchingEngine, str, str]:
+    """
+    Creates and deploys a Matching Engine index, then returns the Vertex AI vector store built on top of it.
+    Args:
+        index_name (str, optional): The name of the matching engine index that will be created. Defaults to "orange_index".
+        embeddings_qpm (int, optional): The number of queries per minute for the embeddings. Defaults to 100.
+        embedding_num_batch (int, optional): The number of instances per batch for the embeddings. Defaults to 5.
+        embeddings_dimension (int, optional): The number of dimensions of the embeddings. Defaults to 768.
+        embeddings_gcs_dir (str, optional): The GCS directory where the embeddings are stored. Defaults to "init_folder".
+
+    Returns:
+        Tuple[MatchingEngine, str, str]: The MatchingEngine vector store (usable as a retriever with LangChain Chain objects, mainly for Q&A), together with the index ID and the endpoint ID.
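+
+    Example (illustrative, assuming PROJECT_ID, REGION and BUCKET_NAME are set in the environment):
+        >>> engine, index_id, endpoint_id = get_matching_engine_and_deploy_index(index_name="demo_index")
+        >>> engine.similarity_search("some query", k=10)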
+    """
+
+    embeddings = CustomVertexAIEmbeddings(
+        location=os.environ.get("REGION"),
+        project_id=os.environ.get("PROJECT_ID"),
+        requests_per_minute=embeddings_qpm,
+        num_instances_per_batch=embedding_num_batch,
+    )
+
+    # initialize file for index creation
+    initialise_index_folder(path=f"{os.environ.get('BUCKET_NAME')}/{embeddings_gcs_dir}")
+
+    # Create and deploy a matching engine endpoint
+    index_maker = MatchingEngineUtils(os.environ.get('PROJECT_ID'), os.environ.get('REGION'), index_name)
+    logger.info(f"Creating index from gs://{os.environ.get('BUCKET_NAME')}/{embeddings_gcs_dir}, this step can take a while ...")
+    index_maker.create_index(
+        embedding_gcs_uri=f"gs://{os.environ.get('BUCKET_NAME')}/{embeddings_gcs_dir}",
+        dimensions=embeddings_dimension,
+        index_update_method="batch",
+        index_algorithm="tree-ah",
+    )
+
+    logger.info("Deploying index, this step can take a while ...")
+    index_maker.deploy_index()
+
+    # Expose matching engine to index
+    ME_INDEX_ID, ME_INDEX_ENDPOINT_ID = index_maker.get_index_and_endpoint()
+    mengine = MatchingEngine.from_components(
+        project_id=os.environ.get('PROJECT_ID'),
+        region=os.environ.get('REGION'),
+        gcs_bucket_name=os.environ.get("BUCKET_NAME"),
+        embedding=embeddings,
+        index_id=ME_INDEX_ID,
+        endpoint_id=ME_INDEX_ENDPOINT_ID,
+    )
+    return mengine, ME_INDEX_ID, ME_INDEX_ENDPOINT_ID
+
+
+def add_documents_to_matching_engine(
+    matching_engine: MatchingEngine,
+    documents: List[Document],
+    chunk_size: int = 1000,
+    chunk_overlap: int = 50,
+    separators: List[str] = ["\n\n", "\n", ".", "!", "?", ",", " ", ""]
+) -> None:
+    text_splitter = RecursiveCharacterTextSplitter(
+        chunk_size=chunk_size,
+        chunk_overlap=chunk_overlap,
+        separators=separators
+    )
+    # logger.info(f"Using {RecursiveCharacterTextSplitter} for chunk creation ...")
+    doc_splits = text_splitter.split_documents(documents)
+    for idx, split in enumerate(doc_splits):
+        split.metadata["chunk"] = idx
+    texts = [doc.page_content for doc in doc_splits]
+    metadatas = [
+        [
+            {"namespace": "source", "allow_list": [doc.metadata["source"]]},
+            {"namespace": "chunk", "allow_list": [str(doc.metadata["chunk"])]},
+        ]
+        for doc in doc_splits
+    ]
+    logger.info("Adding documents to vectorstore ...")
+    matching_engine.add_texts(texts=texts, metadatas=metadatas)
diff --git a/backend/rag_components/matching_engine/matching_engine_utils.py b/backend/rag_components/matching_engine/matching_engine_utils.py
new file mode 100644
index 0000000..cfabdba
--- /dev/null
+++ b/backend/rag_components/matching_engine/matching_engine_utils.py
@@ -0,0 +1,323 @@
+# Utility functions to create an index and deploy it to an endpoint
+from datetime import datetime
+import time
+import logging
+
+from google.cloud import aiplatform_v1 as aipv1
+from google.protobuf import struct_pb2
+
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger("MATCHING ENGINE UTILS")
+
+
+class MatchingEngineUtils:
+    def __init__(self, project_id: str, region: str, index_name: str):
+        self.project_id = project_id
+        self.region = region
+        self.index_name = index_name
+        self.index_endpoint_name = f"{self.index_name}-endpoint"
+        self.PARENT = f"projects/{self.project_id}/locations/{self.region}"
+
+        ENDPOINT = f"{self.region}-aiplatform.googleapis.com"
+        # set index client
+        self.index_client = aipv1.IndexServiceClient(
+            client_options=dict(api_endpoint=ENDPOINT)
+        )
+        # set index endpoint client
+        self.index_endpoint_client = aipv1.IndexEndpointServiceClient(
+            client_options=dict(api_endpoint=ENDPOINT)
+        )
+
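+    # Both lookup helpers below list every resource under PARENT and match on
+    # display_name: the Vertex AI clients only address indexes and endpoints by
+    # their full resource name, which callers do not know in advance.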
+    def get_index(self):
+        # Check if index exists
+        request = aipv1.ListIndexesRequest(parent=self.PARENT)
+        page_result = self.index_client.list_indexes(request=request)
+        indexes = [
+            response.name
+            for response in page_result
+            if response.display_name == self.index_name
+        ]
+
+        if len(indexes) == 0:
+            return None
+        else:
+            index_id = indexes[0]
+            request = aipv1.GetIndexRequest(name=index_id)
+            index = self.index_client.get_index(request=request)
+            return index
+
+    def get_index_endpoint(self):
+        # Check if index endpoint exists
+        request = aipv1.ListIndexEndpointsRequest(parent=self.PARENT)
+        page_result = self.index_endpoint_client.list_index_endpoints(request=request)
+        index_endpoints = [
+            response.name
+            for response in page_result
+            if response.display_name == self.index_endpoint_name
+        ]
+
+        if len(index_endpoints) == 0:
+            return None
+        else:
+            index_endpoint_id = index_endpoints[0]
+            request = aipv1.GetIndexEndpointRequest(name=index_endpoint_id)
+            index_endpoint = self.index_endpoint_client.get_index_endpoint(
+                request=request
+            )
+            return index_endpoint
+
+    def create_index(
+        self,
+        embedding_gcs_uri: str,
+        dimensions: int,
+        index_update_method: str = "streaming",
+        index_algorithm: str = "tree-ah",
+    ):
+        # Get index
+        index = self.get_index()
+        # Create index if it does not exist
+        if index:
+            logger.info(f"Index {self.index_name} already exists with id {index.name}")
+        else:
+            logger.info(f"Index {self.index_name} does not exist. Creating index ...")
+
+            if index_update_method == "streaming":
+                index_update_method = aipv1.Index.IndexUpdateMethod.STREAM_UPDATE
+            else:
+                index_update_method = aipv1.Index.IndexUpdateMethod.BATCH_UPDATE
+
+            treeAhConfig = struct_pb2.Struct(
+                fields={
+                    "leafNodeEmbeddingCount": struct_pb2.Value(number_value=500),
+                    "leafNodesToSearchPercent": struct_pb2.Value(number_value=7),
+                }
+            )
+            if index_algorithm == "tree-ah":
+                algorithmConfig = struct_pb2.Struct(
+                    fields={"treeAhConfig": struct_pb2.Value(struct_value=treeAhConfig)}
+                )
+            else:
+                algorithmConfig = struct_pb2.Struct(
+                    fields={
+                        "bruteForceConfig": struct_pb2.Value(
+                            struct_value=struct_pb2.Struct()
+                        )
+                    }
+                )
+            config = struct_pb2.Struct(
+                fields={
+                    "dimensions": struct_pb2.Value(number_value=dimensions),
+                    "approximateNeighborsCount": struct_pb2.Value(number_value=150),
+                    "distanceMeasureType": struct_pb2.Value(
+                        string_value="DOT_PRODUCT_DISTANCE"
+                    ),
+                    "algorithmConfig": struct_pb2.Value(struct_value=algorithmConfig),
+                    "shardSize": struct_pb2.Value(string_value="SHARD_SIZE_SMALL"),
+                }
+            )
+            metadata = struct_pb2.Struct(
+                fields={
+                    "config": struct_pb2.Value(struct_value=config),
+                    "contentsDeltaUri": struct_pb2.Value(
+                        string_value=embedding_gcs_uri
+                    ),
+                }
+            )
+
+            index_request = {
+                "display_name": self.index_name,
+                "description": "Index for LangChain demo",
+                "metadata": struct_pb2.Value(struct_value=metadata),
+                "index_update_method": index_update_method,
+            }
+
+            r = self.index_client.create_index(parent=self.PARENT, index=index_request)
+            logger.info(
+                f"Creating index with long running operation {r._operation.name}"
+            )
+
+            # Poll the operation until it's done successfully.
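+            # create_index returns a long-running operation: r.done() polls its
+            # status without blocking, and r.result() below returns the created
+            # index once the backend finishes building it (this can take a while).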
+            logger.info("Poll the operation to create index ...")
+            while True:
+                if r.done():
+                    break
+                time.sleep(60)
+                print(".", end="")
+
+            index = r.result()
+            logger.info(
+                f"Index {self.index_name} created with resource name as {index.name}"
+            )
+
+        return index
+
+    def deploy_index(
+        self,
+        machine_type: str = "e2-standard-2",
+        min_replica_count: int = 2,
+        max_replica_count: int = 10,
+        network: str = None,
+    ):
+        try:
+            # Get index if it exists
+            index = self.get_index()
+            if not index:
+                raise Exception(
+                    f"Index {self.index_name} does not exist. Please create the index before deploying."
+                )
+
+            # Get index endpoint if it exists
+            index_endpoint = self.get_index_endpoint()
+            # Create index endpoint if it does not exist
+            if index_endpoint:
+                logger.info(
+                    f"Index endpoint {self.index_endpoint_name} already exists with resource "
+                    + f"name as {index_endpoint.name} and endpoint domain name as "
+                    + f"{index_endpoint.public_endpoint_domain_name}"
+                )
+            else:
+                logger.info(
+                    f"Index endpoint {self.index_endpoint_name} does not exist. Creating index endpoint..."
+                )
+                index_endpoint_request = {"display_name": self.index_endpoint_name}
+
+                if network:
+                    index_endpoint_request["network"] = network
+                else:
+                    index_endpoint_request["public_endpoint_enabled"] = True
+
+                r = self.index_endpoint_client.create_index_endpoint(
+                    parent=self.PARENT, index_endpoint=index_endpoint_request
+                )
+                logger.info(
+                    f"Creating index endpoint with long running operation {r._operation.name}"
+                )
+
+                logger.info("Poll the operation to create index endpoint ...")
+                while True:
+                    if r.done():
+                        break
+                    time.sleep(60)
+                    print(".", end="")
+
+                index_endpoint = r.result()
+                logger.info(
+                    f"Index endpoint {self.index_endpoint_name} created with resource "
+                    + f"name as {index_endpoint.name} and endpoint domain name as "
+                    + f"{index_endpoint.public_endpoint_domain_name}"
+                )
+        except Exception as e:
+            logger.error(f"Failed to create index endpoint {self.index_endpoint_name}")
+            raise e
+
+        # Deploy index to endpoint
+        try:
+            # Check if index is already deployed to the endpoint
+            for d_index in index_endpoint.deployed_indexes:
+                if d_index.index == index.name:
+                    logger.info(
+                        f"Skipping deploying index. Index {self.index_name} "
+                        + f"already deployed with id {index.name} to the index endpoint {self.index_endpoint_name}"
+                    )
+                    return index_endpoint
+
+            timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
+            deployed_index_id = f"{self.index_name.replace('-', '_')}_{timestamp}"
+            deploy_index = {
+                "id": deployed_index_id,
+                "display_name": deployed_index_id,
+                "index": index.name,
+                "dedicated_resources": {
+                    "machine_spec": {
+                        "machine_type": machine_type,
+                    },
+                    "min_replica_count": min_replica_count,
+                    "max_replica_count": max_replica_count,
+                },
+            }
+            logger.info(f"Deploying index with request = {deploy_index}")
+            r = self.index_endpoint_client.deploy_index(
+                index_endpoint=index_endpoint.name, deployed_index=deploy_index
+            )
+
+            # Poll the operation until it's done successfully.
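+            # Same polling pattern as index creation; the operation's result is
+            # not read here because the endpoint object is already in hand.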
+            logger.info("Poll the operation to deploy index ...")
+            while True:
+                if r.done():
+                    break
+                time.sleep(60)
+                print(".", end="")
+
+            logger.info(
+                f"Deployed index {self.index_name} to endpoint {self.index_endpoint_name}"
+            )
+
+        except Exception as e:
+            logger.error(
+                f"Failed to deploy index {self.index_name} to the index endpoint {self.index_endpoint_name}"
+            )
+            raise e
+
+        return index_endpoint
+
+    def get_index_and_endpoint(self):
+        # Get index id if it exists
+        index = self.get_index()
+        index_id = index.name if index else ""
+
+        # Get index endpoint id if it exists
+        index_endpoint = self.get_index_endpoint()
+        index_endpoint_id = index_endpoint.name if index_endpoint else ""
+
+        return index_id, index_endpoint_id
+
+    def delete_index(self):
+        # Check if index exists
+        index = self.get_index()
+
+        # Delete index if it exists
+        if index:
+            index_id = index.name
+            logger.info(f"Deleting index {self.index_name} with id {index_id}")
+            self.index_client.delete_index(name=index_id)
+        else:
+            raise Exception(f"Index {self.index_name} does not exist.")
+
+    def delete_index_endpoint(self):
+        # Check if index endpoint exists
+        index_endpoint = self.get_index_endpoint()
+
+        # Undeploy all indexes and delete the endpoint if it exists
+        if index_endpoint:
+            logger.info(
+                f"Index endpoint {self.index_endpoint_name} exists with resource "
+                + f"name as {index_endpoint.name} and endpoint domain name as "
+                + f"{index_endpoint.public_endpoint_domain_name}"
+            )
+
+            index_endpoint_id = index_endpoint.name
+            index_endpoint = self.index_endpoint_client.get_index_endpoint(
+                name=index_endpoint_id
+            )
+            # Undeploy existing indexes
+            for d_index in index_endpoint.deployed_indexes:
+                logger.info(
+                    f"Undeploying index with id {d_index.id} from index endpoint {self.index_endpoint_name}"
+                )
+                request = aipv1.UndeployIndexRequest(
+                    index_endpoint=index_endpoint_id, deployed_index_id=d_index.id
+                )
+                r = self.index_endpoint_client.undeploy_index(request=request)
+                response = r.result()
+                logger.info(response)
+
+            # Delete index endpoint
+            logger.info(
+                f"Deleting index endpoint {self.index_endpoint_name} with id {index_endpoint_id}"
+            )
+            self.index_endpoint_client.delete_index_endpoint(name=index_endpoint_id)
+        else:
+            raise Exception(
+                f"Index endpoint {self.index_endpoint_name} does not exist."
+            )
diff --git a/backend/rag_components/prompts.py b/backend/rag_components/prompts.py
index bbb7775..644cade 100644
--- a/backend/rag_components/prompts.py
+++ b/backend/rag_components/prompts.py
@@ -9,29 +9,35 @@
 Rephrased question :
 """
 
-rag_system_prompt = """
-As a chatbot assistant, your mission is to respond to user inquiries in a precise and concise manner based on \
-    the documents provided as input.
-It is essential to respond in the same language in which the question was asked. Responses must be written in \
-    a professional style and must demonstrate great attention to detail.
-"""
+rag_system_prompt = ""
 
-respond_to_question = """
-As a chatbot assistant, your mission is to respond to user inquiries in a precise and concise manner based on \
-    the documents provided as input.
-It is essential to respond in the same language in which the question was asked. Responses must be written in \
-    a professional style and must demonstrate great attention to detail.
+respond_to_question = """
+You are an expert advisor at Orange, tasked with explaining to the customer why a specific offer is being proposed to them based on their needs.
+It is crucial to provide a detailed answer that clearly shows why the recommended offer is the best one.
+Answer professionally, relying only on the available information. Avoid adding unsupported information.
+The customer's needs will be stated in the question.
 
-Respond to the question taking into account the following context.
+Here is how to proceed to recommend an offer:
+- Rate the customer's need on a scale of 1 to 10.
+- Taking into account the available offers: 'Livebox Fibre', 'Livebox Up fibre' and 'Livebox Max Fibre', choose the offer that matches the rated need.
+- Formulate an offer recommendation based on the two previous points and justify it, following the format below, with details as bullet points:
+    <<< We are delighted to recommend the <<< offer >>> offer because it meets << overall reason >>.
+    In more detail:
+    • Reason 1 ...
+    • Reason 2 ...
+    • ... >>>
 
-{context}
+- If the customer has a large home, a repeater may be needed. If so, recommend the number of additional repeaters to
+add to the offer and justify it. If a repeater is not needed, do not mention it.
 
-Question: {question}
-"""
+Be careful never to mention the price of the offers; be concise and avoid repetition.
 
-document_context = """
-Content: {page_content}
+Question:
+{question}
+
+Context:
+{context}
 
-Source: {source}
+Answer:
 """
diff --git a/backend/rag_components/vector_store.py b/backend/rag_components/vector_store.py
index 9959ef8..220e9b8 100644
--- a/backend/rag_components/vector_store.py
+++ b/backend/rag_components/vector_store.py
@@ -1,7 +1,5 @@
 import inspect
-
 from langchain_community import vectorstores
-
 from backend.config import RagConfig
 
@@ -20,5 +18,15 @@ def get_vector_store(embedding_model, config: RagConfig):
     kwargs = {key: value for key, value in config.vector_store.source_config.items() if key in parameters.keys()}
     kwargs[embedding_param.name] = embedding_model
+
+    if config.vector_store.source == "MatchingEngine":
+        import os
+        from google.cloud import storage
+        from google.cloud import aiplatform
+        aiplatform.init(project=os.environ.get("PROJECT_ID"), location=os.environ.get("REGION"))
+        kwargs["gcs_client"] = storage.Client(project=os.environ.get("PROJECT_ID"))
+        kwargs["index"] = aiplatform.MatchingEngineIndex(index_name=os.environ.get("INDEX_ID"))
+        kwargs["endpoint"] = aiplatform.MatchingEngineIndexEndpoint(index_endpoint_name=os.environ.get("ENDPOINT_ID"))
+
     vector_store = vector_store_spec(**kwargs)
     return vector_store
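For reference, a minimal end-to-end sketch of how the pieces introduced in this diff fit together. This is illustrative only: it assumes PROJECT_ID, REGION and BUCKET_NAME are set in the environment, that backend/rag_components is on PYTHONPATH, and "offers.txt" is a placeholder document.

    from langchain.document_loaders import TextLoader
    from matching_engine.get_matching_engine import (
        add_documents_to_matching_engine,
        get_matching_engine_and_deploy_index,
    )

    # One-off provisioning: build and deploy the Matching Engine index (slow).
    engine, index_id, endpoint_id = get_matching_engine_and_deploy_index(index_name="orange_index")

    # Chunk the documents and push them to the index; each chunk carries
    # "source" and "chunk" restricts usable for filtered retrieval.
    docs = TextLoader("offers.txt").load()
    add_documents_to_matching_engine(engine, docs)

    # Query the vector store directly (k=10 matches the new retriever_config).
    results = engine.similarity_search("Which offer suits a large home?", k=10)

The index_id and endpoint_id returned here are what the INDEX_ID and ENDPOINT_ID environment variables read by get_vector_store are expected to contain.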