-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathopenai_wrapper.py
301 lines (236 loc) · 8.4 KB
/
openai_wrapper.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
import json
import os
import time
from icecream import ic
from pydantic import BaseModel
from tenacity import retry, stop_after_attempt, wait_random_exponential
from pathlib import Path
def setup_secret():
    """Make sure ``OPENAI_API_KEY`` is present in the process environment.

    If the variable already holds a non-empty value nothing happens.
    Otherwise the key is read from the ``"openai"`` entry of
    ``~/gits/igor2/secretBox.json`` and stored in ``os.environ``.
    """
    if not os.environ.get("OPENAI_API_KEY"):
        secret_box = json.loads((Path.home() / "gits/igor2/secretBox.json").read_text())
        os.environ["OPENAI_API_KEY"] = secret_box["openai"]
def setup_gpt():
    """Return an OpenAI client, loading the API key first if it is not set.

    The ``openai`` package is imported inside the function rather than at
    module top level.
    """
    import openai

    setup_secret()
    api_key = os.environ["OPENAI_API_KEY"]
    return openai.OpenAI(api_key=api_key)
# Shared OpenAI client used by ask_gpt_n. It is created when this module is
# imported, so importing the file runs setup_gpt() (and, through it,
# setup_secret()).
client = setup_gpt()
class CompletionModel(BaseModel):
    """Token limits and API name for one chat-completion model."""

    # Input tokens that are not charged against the output budget;
    # get_remaining_output_tokens only subtracts input beyond this amount.
    max_input_only_tokens: int
    # Output-token budget before any input overflow is subtracted.
    max_output_tokens: int
    # Model identifier; ask_gpt_n passes it as the ``model`` request argument.
    name: str
# Profile returned by get_model_type(True).
gpt4 = CompletionModel(
    name="gpt-4o-2024-11-20",
    max_input_only_tokens=100_000,
    max_output_tokens=16_000,
)

# Profile returned by get_model_type(False).
gpt35 = CompletionModel(
    name="gpt-3.5-turbo-0125",
    max_input_only_tokens=12_000,
    max_output_tokens=4_000,
)
def get_model_type(u4: bool) -> CompletionModel:
    """Return the ``gpt4`` profile when *u4* is truthy, otherwise ``gpt35``."""
    return gpt4 if u4 else gpt35
# Plain string/int model settings used by model_to_max_tokens, get_model and
# choose_model. These are separate from the CompletionModel profiles above and
# carry different model names.
text_model_gpt_4 = "gpt-4o-2024-05-13"
gpt_4_tokens = 100_000
# The next two values are not referenced in the visible part of this file.
gpt_4_input_tokens = 100_000
gpt_4_output_tokens = 100_000

text_model_gpt35 = "gpt-3.5-turbo-1106"
gpt_3_5_tokens = 16_000
def model_to_max_tokens(model):
    """Return the token limit registered for the model-name string *model*.

    Only ``text_model_gpt_4`` and ``text_model_gpt35`` are known; any other
    value raises ``KeyError``.
    """
    limits = {
        text_model_gpt_4: gpt_4_tokens,
        text_model_gpt35: gpt_3_5_tokens,
    }
    return limits[model]
def get_model(u4):
    """Return ``text_model_gpt_4`` when *u4* is truthy, else ``text_model_gpt35``."""
    return text_model_gpt_4 if u4 else text_model_gpt35
def get_remaining_output_tokens(model: CompletionModel, prompt: str):
    """Return the output-token budget left for *model* after sending *prompt*.

    The prompt is counted with the ``cl100k_base`` encoding. Input up to
    ``model.max_input_only_tokens`` is free; only the excess is subtracted
    from ``model.max_output_tokens``. The result is not clamped, so it can be
    negative for a sufficiently long prompt.

    For symmetric models, set ``max_input_only_tokens = 0`` and
    ``max_output_tokens`` to the full context window. For asymmetric models,
    ``max_output_tokens`` is the full context window minus
    ``max_input_only_tokens``.
    """
    prompt_tokens = num_tokens_from_string(prompt, "cl100k_base")
    # Input that fits in the input-only allowance costs nothing.
    overflow = prompt_tokens - model.max_input_only_tokens
    if overflow < 0:
        overflow = 0
    return model.max_output_tokens - overflow
def choose_model(u4, tokens=0):
    """Return ``(model_name, tokens)`` for the requested model tier.

    *u4* truthy selects ``text_model_gpt_4``, otherwise ``text_model_gpt35``.
    A *tokens* value of 0 means "use the default" and is replaced by the
    selected model's limit from model_to_max_tokens; any other value is
    returned unchanged.
    """
    model = text_model_gpt_4 if u4 else text_model_gpt35
    if tokens == 0:  # TBD if we can do it without hardcoding.
        return model, model_to_max_tokens(model)
    return model, tokens
def remaining_response_tokens(model, system_prompt, user_prompt):
    """Return the tokens left for a response once both prompts are counted.

    *model* is a model-name string understood by model_to_max_tokens. The
    prompts are counted together with ``cl100k_base``, plus a flat 100-token
    allowance used instead of counting the chat-message overhead exactly.
    """
    message_overhead = 100
    budget = model_to_max_tokens(model)
    prompt_tokens = num_tokens_from_string(user_prompt + system_prompt, "cl100k_base")
    return budget - (prompt_tokens + message_overhead)
def num_tokens_from_string(string: str, encoding_name: str = "cl100k_base") -> int:
    """Return the token count of *string* under a tiktoken encoding, plus one.

    The extra token is an allowance for a trailing newline.
    """
    from tiktoken import get_encoding

    token_ids = get_encoding(encoding_name).encode(string)
    return len(token_ids) + 1  # +1 for newline
def ask_gpt(
    prompt_to_gpt="Make a rhyme about Dr. Seuss forgetting to pass a default paramater",
    tokens: int = 0,
    u4=False,
    debug=False,
):
    """Request a single completion via ask_gpt_n and return its text."""
    replies = ask_gpt_n(prompt_to_gpt, tokens=tokens, u4=u4, debug=debug, n=1)
    return replies[0]
@retry(
    wait=wait_random_exponential(min=1, max=60),
    stop=stop_after_attempt(3),
)
def ask_gpt_n(
    prompt_to_gpt="Make a rhyme about Dr. Seuss forgetting to pass a default paramater",
    tokens: int = 0,
    u4=False,
    debug=False,
    n=1,
):
    """Stream *n* chat completions for one prompt and return their texts.

    Args:
        prompt_to_gpt: User message, sent after a fixed system prompt.
        tokens: Accepted for interface compatibility. It does not limit the
            request; the output budget comes from get_remaining_output_tokens.
        u4: Truthy selects the ``gpt4`` profile, otherwise ``gpt35``.
        debug: When true, print the model, token figures and elapsed time
            through ``ic``.
        n: Number of completions to request.

    Returns:
        A list of *n* strings; position ``i`` holds the text of the choice
        whose index is ``i``.

    The call is attempted up to three times with random exponential backoff
    (see the ``@retry`` decorator).
    """
    # Only the token figure is kept, for the debug print below; the model
    # actually used comes from get_model_type.
    _, tokens = choose_model(u4)

    messages = [
        {"role": "system", "content": "You are a really good improv coach."},
        {"role": "user", "content": prompt_to_gpt},
    ]

    model = get_model_type(u4)
    output_tokens = get_remaining_output_tokens(model, prompt_to_gpt)
    text_model_best = model.name

    if debug:
        ic(text_model_best)
        ic(tokens)
        ic(output_tokens)

    start = time.time()
    response_contents = [""] * n

    stream = client.chat.completions.create(
        model=text_model_best,
        messages=messages,  # type: ignore
        max_tokens=output_tokens,
        n=n,
        temperature=0.7,
        stream=True,
    )
    for chunk in stream:
        # Streamed chunks are response objects, not dicts, so their fields
        # are read as attributes. Some chunks carry no choices at all.
        for choice in chunk.choices or []:
            delta = choice.delta
            # content is None on chunks without text (e.g. role-only or
            # finish chunks); skip those instead of concatenating None.
            if delta is not None and delta.content:
                response_contents[choice.index] += delta.content

    if debug:
        out = f"All chunks took: {int((time.time() - start)*1000)} ms"
        ic(out)

    return response_contents
def get_ell_model(
    openai: bool = False,
    openai_cheap: bool = False,
    google: bool = False,
    claude: bool = False,
    llama: bool = False,
) -> str:
    """Return the model name for the single provider flag that is set.

    At most one flag may be true. With no flag set the result is the same as
    ``openai=True``: ``gpt4.name``.

    Returns:
        The model-name string for the selected provider.

    Raises:
        NotImplementedError: if ``google`` is selected.
        SystemExit: with code 1, after printing a message, if more than one
            flag is set.
    """
    import sys

    selected = sum([openai, google, claude, llama, openai_cheap])
    if selected > 1:
        print("Only one model can be selected")
        # sys.exit rather than the builtin exit(): the latter is injected by
        # the ``site`` module for interactive use and is not always defined.
        sys.exit(1)

    if google:
        raise NotImplementedError("google")
    if claude:
        return "claude-3-5-sonnet-20241022"
    if llama:
        return "llama-3.2-90b-vision-preview"
    if openai_cheap:
        return "gpt-4o-mini"
    # Either ``openai`` was requested or nothing was: default to OpenAI.
    return gpt4.name
def openai_func(cls):
    """Describe *cls* as an OpenAI function-tool entry.

    The tool name is ``cls.__name__`` and its parameters are whatever
    ``cls.model_json_schema()`` returns.
    """
    function_spec = {}
    function_spec["name"] = cls.__name__
    function_spec["parameters"] = cls.model_json_schema()
    return {"type": "function", "function": function_spec}
def tool_choice(fn):
    """Build a ``tool_choice`` payload that names the function in *fn*.

    *fn* must provide ``fn["function"]["name"]``, as the dict returned by
    openai_func does. The payload is echoed through ``ic`` before it is
    returned.
    """
    function_name = fn["function"]["name"]
    # The variable stays named ``r`` because ic() prints the expression text.
    r = {"type": "function", "function": {"name": function_name}}
    ic(r)
    return r
def get_youtube_transcript(url):
    """Download the English captions of a video and return them as text.

    Manual subtitles or automatic captions are fetched with yt-dlp in WebVTT
    format. The file content is returned with surrounding whitespace
    stripped; the WEBVTT header and cue timestamps are not removed.

    Args:
        url: Video URL handed to yt-dlp.

    Returns:
        The caption file content as a string.

    Raises:
        ValueError: if the video lists neither English subtitles nor English
            automatic captions.
    """
    import tempfile

    from yt_dlp import YoutubeDL

    ic("Downloading captions from youtube")

    # Work in a private temporary directory: concurrent calls cannot collide
    # on the file name, nothing is written to the current directory, and the
    # file is removed even when the download or the read fails.
    with tempfile.TemporaryDirectory(prefix="transcript_") as work_dir:
        transcript_file_base = Path(work_dir) / "transcript"
        ydl_opts = {
            "skip_download": True,  # We only want the transcript, not the video
            "writesubtitles": True,  # Download subtitles
            "subtitleslangs": ["en"],  # Language of the subtitles
            "subtitlesformat": "vtt",  # Format of the subtitles
            "writeautomaticsub": True,
            "outtmpl": f"{transcript_file_base}.%(ext)s",  # Output file name
            "quiet": True,  # Suppress output to stdout
            "no_warnings": True,  # Suppress warnings
        }

        with YoutubeDL(ydl_opts) as ydl:
            # Inspect the metadata first so a missing caption track is
            # reported before anything is downloaded.
            info = ydl.extract_info(url, download=False)
            subtitles = info.get("subtitles", {})
            automatic_captions = info.get("automatic_captions", {})
            if "en" not in subtitles and "en" not in automatic_captions:
                raise ValueError("No English subtitles found.")
            ydl.download([url])  # Download the transcript

        # The code expects the captions at "<base>.en.vtt".
        subtitle_file = Path(f"{transcript_file_base}.en.vtt")
        transcript = subtitle_file.read_text(encoding="utf-8")

    return transcript.strip()
def get_text_from_path_or_stdin(path):
    """Return text from stdin, a YouTube video, a web page, or a local file.

    Args:
        path: Selects the source.
            * Falsy (``None`` or ``""``): read all of standard input.
            * A ``https://`` YouTube URL (``youtu.be``, ``youtube.com``,
              ``www.youtube.com``): return the video transcript.
            * Any other ``http://`` or ``https://`` URL: fetch the page and
              convert its HTML to text with ``html2text``.
            * Anything else: treat it as a local file path and read it.

    Returns:
        The text as a string.
    """
    import sys
    from pathlib import Path

    if not path:  # read from stdin
        return sys.stdin.read()

    youtube_prefixes = (
        "https://youtu.be",
        "https://youtube.com",
        "https://www.youtube.com",
    )
    if path.startswith(youtube_prefixes):
        return get_youtube_transcript(path)

    # Match the URL scheme exactly so that a local file whose name merely
    # begins with "http" is still read from disk.
    if path.startswith(("http://", "https://")):
        import requests
        import html2text

        headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3"
        }
        # A timeout keeps a stalled server from blocking the caller forever.
        response = requests.get(path, headers=headers, timeout=30)
        return html2text.html2text(response.text)

    return Path(path).read_text()