Update gen.py
Added advanced tokenization methods with a few more controls.
Inserian authored Jan 28, 2025
1 parent f893b91 commit e26f693
Showing 1 changed file with 205 additions and 49 deletions.
254 changes: 205 additions & 49 deletions gen.py
@@ -3,44 +3,162 @@

"""
Script to Generate Vocabulary from a Large Corpus or Dataset and Update a Model Qelm/JSON with a Tkinter GUI.
You can alter this in any way; it is very rudimentary and may not reflect current models, as the base may change constantly.
I have included multiple different tokenization methods; limiting these might work better.
This is still rudimentary, kept just in case you need a new vocab for Qelm.
"""

import json
import os
import tkinter as tk
from tkinter import filedialog, messagebox, scrolledtext, ttk
from collections import Counter
import numpy as np
import nltk
from nltk.tokenize import word_tokenize, RegexpTokenizer
from nltk.stem import WordNetLemmatizer

# Download necessary NLTK data
nltk.download('punkt', quiet=True)
nltk.download('wordnet', quiet=True)


def build_subword_tokenizer(corpus_tokens, vocab_size, special_tokens=None):
    """
    Build a BPE-like subword vocabulary: start from single characters and repeatedly merge
    the most frequent adjacent pair, then assign IDs (special tokens first).
    Returns a token-to-ID mapping.
    """
    # Start with each token as an individual 'symbol'
token_freq = Counter(corpus_tokens)

# If special tokens are provided, reserve them at the start of the vocabulary
if special_tokens is None:
special_tokens = []

# Convert tokens into a list of characters (sub-tokens)
subword_freq = Counter()
for token, freq in token_freq.items():
chars = tuple(list(token)) # store as a tuple to be hashable
subword_freq[chars] += freq

# Merging procedure
def get_most_frequent_pair(freq_dict):
pairs = Counter()
for seq, freq in freq_dict.items():
if len(seq) < 2:
continue
# Count adjacent pairs
for i in range(len(seq) - 1):
pairs[(seq[i], seq[i+1])] += freq
if not pairs:
return None
return pairs.most_common(1)[0][0] # return the pair with the highest freq

# Initialize a merges list which will store merges performed
merges = []
current_vocab_size = len(subword_freq)

    # Merge budget: aim for roughly vocab_size symbols overall, leaving room for the reserved special tokens.
    max_merges = vocab_size - len(special_tokens)

while current_vocab_size < max_merges:
pair_to_merge = get_most_frequent_pair(subword_freq)
if not pair_to_merge:
break
merges.append(pair_to_merge)
# Merge the pair in subword_freq
new_subword_freq = Counter()
for seq, freq in subword_freq.items():
new_seq = []
i = 0
while i < len(seq):
if i < len(seq) - 1 and seq[i] == pair_to_merge[0] and seq[i+1] == pair_to_merge[1]:
# Merge into a single subword
new_seq.append(''.join(pair_to_merge))
i += 2
else:
new_seq.append(seq[i])
i += 1
new_subword_freq[tuple(new_seq)] += freq
subword_freq = new_subword_freq
current_vocab_size += 1

    # Collect the unique symbols left after merging; e.g. a sequence ('qu', 'an', 't', 'um')
    # contributes the subwords 'qu', 'an', 't', and 'um'.
unique_symbols = set()
for seq in subword_freq:
unique_symbols.update(seq)

# Insert special tokens first
token_to_id = {}
idx_counter = 0
for sp_token in special_tokens:
token_to_id[sp_token] = idx_counter
idx_counter += 1

for sym in sorted(unique_symbols):
# Skip if it's already a special token
if sym not in token_to_id:
token_to_id[sym] = idx_counter
idx_counter += 1

return token_to_id
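
# Rough worked example of the merge loop above (hypothetical inputs, not from the repo):
#   build_subword_tokenizer(["low", "low", "lower"], vocab_size=10,
#                           special_tokens=["[PAD]", "[UNK]"])
#   merges ('l','o'), then ('lo','w'), ('low','e'), ('lowe','r') (ties broken by Counter
#   insertion order), leaving the symbols {'low', 'lower'} and the mapping
#   {'[PAD]': 0, '[UNK]': 1, 'low': 2, 'lower': 3}.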


def tokenize_bpe_like(text):
    """
    Naive word-level pre-tokenization for the BPE-like method: lowercase the text and keep
    only alphanumeric word chunks (punctuation is dropped).
    """
    tokenizer = RegexpTokenizer(r'\w+')
tokens = tokenizer.tokenize(text.lower())
return tokens


def generate_vocabulary_from_corpus(
    corpus: str,
    vocab_size: int,
    tokenization_method: str = 'basic',
    use_lemmatization: bool = False
) -> dict:
    """
    Generate a vocabulary and token-to-ID mapping from the given text corpus.

    tokenization_method: 'basic' (NLTK word_tokenize), 'bpe' (subword merges), or 'char'.
    use_lemmatization:   lemmatize alphabetic tokens (applies to 'basic' and 'bpe' only).
    """
if not corpus.strip():
raise ValueError("Input corpus is empty or contains no valid text.")

# Some standard special tokens; can be expanded as needed.
SPECIAL_TOKENS = ["[PAD]", "[UNK]", "[CLS]", "[SEP]", "[MASK]"]

if tokenization_method == 'basic':
# Basic tokenization (NLTK word_tokenize)
tokens = word_tokenize(corpus.lower())
elif tokenization_method == 'bpe':
# BPE-like tokenization (two-step: naive word split + subword merges)
tokens = tokenize_bpe_like(corpus)
elif tokenization_method == 'char':
        # Character-based tokenization; lowercase and replace spaces with underscores to preserve them
        tokens = list(corpus.lower().replace(" ", "_"))
else:
raise ValueError(f"Unsupported tokenization method: {tokenization_method}")

if use_lemmatization:
lemmatizer = WordNetLemmatizer()
# For BPE-like or char-based, lemmatization doesn't always make sense.
if tokenization_method in ['basic', 'bpe']:
            new_tokens = []
            for tok in tokens:
                if tok.isalpha():
                    new_tokens.append(lemmatizer.lemmatize(tok))
                else:
                    new_tokens.append(tok)
            tokens = new_tokens

if tokenization_method == 'bpe':
token_to_id = build_subword_tokenizer(tokens, vocab_size, special_tokens=SPECIAL_TOKENS)
return token_to_id
else:
if not tokens:
raise ValueError("Tokenization produced no tokens. Ensure the input corpus contains valid text.")

token_freq = Counter(tokens)

# Insert special tokens with high frequency to ensure they appear
for sp_tok in SPECIAL_TOKENS:
token_freq[sp_tok] = 999999999

# Take the most common tokens
most_common_tokens = [token for token, _ in token_freq.most_common(vocab_size)]
token_to_id = {token: idx for idx, token in enumerate(most_common_tokens)}
return token_to_id
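
# Usage sketch (hypothetical call; the GUI below passes its options the same way):
#   vocab = generate_vocabulary_from_corpus(
#       "Quantum computing is an area of computing.",
#       vocab_size=50,
#       tokenization_method='basic',   # or 'bpe' / 'char'
#       use_lemmatization=False,
#   )
#   With 'basic'/'char' the special tokens are pushed to the top of the frequency ranking;
#   with 'bpe' they are reserved at the start of the ID space instead.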


def update_model_with_vocabulary(json_file: str, token_to_id: dict):
@@ -72,18 +190,36 @@ def update_model_with_vocabulary(json_file: str, token_to_id: dict):
print(f"Model updated successfully with vocabulary. Saved to: {json_file}")


def validate_input_text(text: str, token_to_id: dict, tokenization_method: str = 'basic'):
    """
    Validate that the input text contains tokens present in the vocabulary,
    tokenizing with the same method that was used to build it.
    """

if "[UNK]" not in token_to_id:
raise ValueError("Vocabulary missing the [UNK] special token. Cannot handle out-of-vocab tokens properly.")

text = text.lower()

if tokenization_method == 'basic':
tokens = word_tokenize(text)
elif tokenization_method == 'bpe':
        # BPE-like pre-tokenization; tokens not present in the vocab are collected as missing below.
tokens = tokenize_bpe_like(text)
elif tokenization_method == 'char':
tokens = list(text.replace(" ", "_"))
else:
tokens = word_tokenize(text)

valid_tokens = []
missing_tokens = []

    for tok in tokens:
        if tok in token_to_id:
            valid_tokens.append(tok)
        else:
            missing_tokens.append(tok)

if not valid_tokens and missing_tokens:
raise ValueError(
f"Input text contains no valid tokens. Missing tokens: {missing_tokens}. "
f"Consider expanding the corpus to include these words."
f"Input text contains tokens that are not in the vocabulary. Missing tokens: {missing_tokens}. "
f"Consider updating or expanding the corpus to include these words."
)

return valid_tokens
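
# Example (hypothetical vocabulary): with token_to_id = {"[UNK]": 0, "hello": 1, "world": 2},
#   validate_input_text("hello world", token_to_id)  -> ['hello', 'world']
#   validate_input_text("hello there", token_to_id)  -> ['hello']   (a partial match still passes)
#   validate_input_text("xyzzy", token_to_id)        -> raises ValueError listing ['xyzzy']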
@@ -92,14 +228,16 @@ def validate_input_text(text: str, token_to_id: dict):
class VocabularyGUI:
def __init__(self, master):
self.master = master
master.title("Vocabulary Generator and Model Updater")
master.title("Advanced Vocabulary Generator and Model Updater")

# Initialize variables
self.json_file = tk.StringVar()
self.vocab_size = tk.IntVar(value=100000)
self.sample_input = tk.StringVar()
self.corpus_mode = tk.StringVar(value="manual")
self.dataset_path = tk.StringVar()
self.tokenization_method = tk.StringVar(value='basic')
self.use_lemmatization = tk.BooleanVar(value=False)

# Layout configuration
self.create_widgets()
@@ -141,20 +279,39 @@ def create_widgets(self):
self.vocab_entry = tk.Entry(vocab_frame, textvariable=self.vocab_size, width=10)
self.vocab_entry.pack(side=tk.LEFT, padx=5)

# Tokenization Method
tok_method_frame = tk.LabelFrame(self.master, text="Tokenization Method")
tok_method_frame.pack(padx=10, pady=5, fill=tk.X)

methods = [
("Basic (NLTK word_tokenize)", "basic"),
("BPE-like (Subword)", "bpe"),
("Character-based", "char")
]
for mtext, mval in methods:
tk.Radiobutton(tok_method_frame, text=mtext, variable=self.tokenization_method, value=mval).pack(anchor=tk.W)

# Lemmatization Toggle
lemma_frame = tk.Frame(self.master)
lemma_frame.pack(padx=10, pady=5, fill=tk.X)

tk.Checkbutton(lemma_frame, text="Use Lemmatization", variable=self.use_lemmatization).pack(anchor=tk.W)

# Text Corpus
self.corpus_frame = tk.Frame(self.master)
self.corpus_frame.pack(padx=10, pady=5, fill=tk.BOTH, expand=True)

tk.Label(self.corpus_frame, text="Text Corpus:").pack(anchor=tk.W)
self.corpus_text = scrolledtext.ScrolledText(self.corpus_frame, height=10)
self.corpus_text.pack(fill=tk.BOTH, expand=True)

        # Change the default corpus any way you want; just ensure it covers a full alphabet.
default_corpus = """
Quantum computing is an area of computing focused on developing computer technology
based on the principles of quantum theory. Language models aim to generate meaningful
text based on input, bridging the gap between technology and human interaction.
Great literature includes works like those of Shakespeare, Dickens, and Austen.
Scientific advancements span physics, biology, and chemistry. Everyday conversations
include phrases like "hello," "how are you?" and "what's the weather like today?"
@@ -235,9 +392,6 @@ def log(self, message):
self.log_text.config(state='disabled')

def load_corpus_from_dataset(self, path):
"""
Load text from a single file or all text files in a directory.
"""
if os.path.isfile(path):
if not path.lower().endswith('.txt'):
raise ValueError("Selected file is not a text file (*.txt).")
@@ -263,20 +417,26 @@ def generate_vocabulary(self):
self.log("Generating vocabulary...")
mode = self.corpus_mode.get()
vocab_size = self.vocab_size.get()
tok_method = self.tokenization_method.get()
lemma_flag = self.use_lemmatization.get()

try:
if mode == "manual":
corpus = self.corpus_text.get("1.0", tk.END)
else:
dataset_path = self.dataset_path.get()
if not dataset_path:
raise ValueError("Dataset path is not specified.")
corpus = self.load_corpus_from_dataset(dataset_path)


# Generate the vocabulary with advanced options
token_to_id = generate_vocabulary_from_corpus(
corpus,
vocab_size,
tokenization_method=tok_method,
use_lemmatization=lemma_flag
)
self.log(f"Generated {len(token_to_id)} tokens using {tok_method} tokenization. Lemmatization={lemma_flag}.")
messagebox.showinfo("Success", f"Vocabulary generated with {len(token_to_id)} tokens.")
self.generated_token_to_id = token_to_id # Store for later use
except ValueError as e:
@@ -286,10 +446,11 @@ def generate_vocabulary(self):
def validate_input(self):
self.log("Validating sample input...")
sample = self.sample_input.get()
tok_method = self.tokenization_method.get()
try:
if not hasattr(self, 'generated_token_to_id'):
raise ValueError("Vocabulary not generated yet. Please generate vocabulary first.")
valid_tokens = validate_input_text(sample, self.generated_token_to_id)
raise ValueError("Vocabulary not generated yet. Please generate the vocabulary first.")
valid_tokens = validate_input_text(sample, self.generated_token_to_id, tokenization_method=tok_method)
self.log(f"Valid tokens: {valid_tokens}")
messagebox.showinfo("Validation Success", f"Valid tokens:\n{valid_tokens}")
except ValueError as e:
@@ -301,7 +462,7 @@ def update_model(self):
json_file = self.json_file.get()
try:
if not hasattr(self, 'generated_token_to_id'):
raise ValueError("Vocabulary not generated yet. Please generate vocabulary first.")
raise ValueError("Vocabulary not generated yet. Please generate the vocabulary first.")
if not json_file:
raise ValueError("JSON file path is not specified.")
update_model_with_vocabulary(json_file, self.generated_token_to_id)
@@ -311,11 +472,6 @@ def update_model(self):
self.log(f"Error while updating model JSON: {e}")
messagebox.showerror("Update Error", f"Failed to update model JSON:\n{e}")


# ============================
# Entry Point
# ============================

def main():
root = tk.Tk()
gui = VocabularyGUI(root)
