main.py
from korvus import Collection, Pipeline
from firecrawl import FirecrawlApp
import os
import time
import asyncio
from rich import print
from rich.pretty import pprint
from dotenv import load_dotenv
import argparse

# Load variables from our .env file
load_dotenv()
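
# The .env file is expected to provide the environment variables used below.
# Illustrative placeholder values only (adjust to your own setup):
#
#   FIRECRAWL_API_KEY=fc-...
#   KORVUS_DATABASE_URL=postgres://user:password@host:5432/dbname
#   CRAWL_URL=https://example.com
#   CRAWL_LIMIT=10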

# Configure our program args
parser = argparse.ArgumentParser(description="Example Korvus x Firecrawl")
parser.add_argument(
    "action", choices=["crawl", "search", "rag"], help="Action to perform"
)

# Initialize the FirecrawlApp with your API key
firecrawl = FirecrawlApp(api_key=os.environ["FIRECRAWL_API_KEY"])

# Define our Pipeline and Collection
pipeline = Pipeline(
    "v0",
    {
        "markdown": {
            "splitter": {"model": "markdown"},
            "semantic_search": {
                "model": "mixedbread-ai/mxbai-embed-large-v1",
            },
        },
    },
)
collection = Collection(
    "korvus-firecrawl-example-0", database_url=os.environ["KORVUS_DATABASE_URL"]
)
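
# Note: the "markdown" key configured in the Pipeline above is the document field
# that gets split and embedded; it corresponds to the "markdown" key used when
# upserting documents in main() and when querying in do_rag() / do_search().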

# Add our Pipeline to our Collection
async def add_pipeline():
    await collection.add_pipeline(pipeline)

# Crawl with Firecrawl
def crawl():
    print("Crawling...")
    job = firecrawl.crawl_url(
        os.environ["CRAWL_URL"],
        params={
            "limit": int(os.environ["CRAWL_LIMIT"]),
            "scrapeOptions": {"formats": ["markdown"]},
        },
        poll_interval=30,
    )
    return job
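
# The crawl job result is consumed in main() below: each entry of job["data"] is
# expected to carry the scraped page as data["markdown"] and its source URL as
# data["metadata"]["sourceURL"].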

# Do RAG
async def do_rag(user_query):
    results = await collection.rag(
        {
            "CONTEXT": {
                "vector_search": {
                    "query": {
                        "fields": {
                            "markdown": {
                                "query": user_query,
                                "parameters": {
                                    "prompt": "Represent this sentence for searching relevant passages: "
                                },
                            }
                        },
                    },
                    "document": {"keys": ["id"]},
                    "rerank": {
                        "model": "mixedbread-ai/mxbai-rerank-base-v1",
                        "query": user_query,
                        "num_documents_to_rerank": 100,
                    },
                    "limit": 5,
                },
                "aggregate": {"join": "\n\n\n"},
            },
            "chat": {
                "model": "meta-llama/Meta-Llama-3.1-405B-Instruct",
                "messages": [
                    {
                        "role": "system",
                        "content": "You are a question and answering bot. Answer the users question given the context succinctly.",
                    },
                    {
                        "role": "user",
                        "content": f"Given the context\n\n:{{CONTEXT}}\n\nAnswer the question: {user_query}",
                    },
                ],
                "max_tokens": 256,
            },
        },
        pipeline,
    )
    return results
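
# The "CONTEXT" key above acts as a variable name: Korvus runs the vector search,
# joins the top results with the "aggregate" separator, and substitutes them for
# the {CONTEXT} placeholder in the user message before calling the chat model.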

# Do search
async def do_search(user_query):
    results = await collection.search(
        {
            "query": {
                "semantic_search": {
                    "markdown": {
                        "query": user_query,
                    },
                },
            },
            "limit": 5,
        },
        pipeline,
    )
    return results

# Get user input and call our callback
async def input_loop(callback):
    while True:
        query = input("Enter your query (or 'q' to quit): ")
        if query.lower() == "q":
            break
        results = await callback(query)
        print("\n[bold]Results:[/bold]\n")
        pprint(results, max_length=2, max_string=100)

# Our main function
async def main():
    args = parser.parse_args()
    if args.action == "crawl":
        # Add our Pipeline to our Collection
        # We only ever need to do this once
        # Calling it more than once does nothing
        await add_pipeline()
        # Crawl the website
        results = crawl()
        # Construct our documents to upsert
        documents = [
            {"id": data["metadata"]["sourceURL"], "markdown": data["markdown"]}
            for data in results["data"]
        ]
        # Upsert our documents
        await collection.upsert_documents(documents)
    elif args.action == "rag":
        await input_loop(do_rag)
    elif args.action == "search":
        await input_loop(do_search)

asyncio.run(main())
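
# Example usage (assuming the environment variables above are set):
#   python main.py crawl    # crawl CRAWL_URL and upsert the pages
#   python main.py search   # interactive semantic search loop
#   python main.py rag      # interactive RAG loop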