diff --git a/.gitignore b/.gitignore index c450cb7f..0ada464d 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,10 @@ .env *__pycache__* *scratch* -*relevance_assessment* \ No newline at end of file +*relevance_assessment* +palimpzest/* +paper_workloads/contracts/full_contract_txt* +paper_workloads/contracts/sample_contract_txt* +*.xlsx +*.csv +paper_workloads/* \ No newline at end of file diff --git a/README.md b/README.md index 30709f75..44f19460 100644 --- a/README.md +++ b/README.md @@ -30,8 +30,23 @@ To install Motion, clone this repository and install the required dependencies: ```bash git clone https://github.com/shreyashankar/motion-v3.git -cd motion -pip install -r requirements.txt +cd motion-v3 +pip install poetry +make install +``` + +Then set up a .env file in your repository with the following: + +```bash +OPENAI_API_KEY=your_openai_api_key +``` + +Alternatively, you can set the OPENAI_API_KEY environment variable directly in your shell. + +Then run the basic test suite to ensure everything is working: + +```bash +make tests-basic ``` ## Usage @@ -57,6 +72,12 @@ The configuration file is a YAML document with the following top-level keys: Motion supports various operation types, each designed for specific data transformation tasks. All prompt templates used in these operations are Jinja2 templates, allowing for the use of loops, conditionals, and other Jinja2 features to create dynamic prompts based on input data. +All operations have the following optional parameters: + +- `optimize`: Boolean flag. If true, the operation will be optimized. Default is true. +- `recursively_optimize`: Boolean flag. If true, the operation will be recursively optimized (e.g., reduce operations synthesized within map operations will also be optimized). Default is false. I recommend not setting this to true unless you are willing to babysit the optimizer. +- `sample_size`: Integer. The number of samples to use for the operation, if you want to run it only on a sample of data. (Only applicable at runtime, not at optimization time.) + Here's an overview of the supported operation types: ### Map @@ -66,7 +87,7 @@ The Map operation applies a transformation to each item in the input data. Required parameters: - `type`: Must be set to `"map"`. -- `prompt`: The prompt template to use for the transformation. +- `prompt`: The prompt template to use for the transformation. Access input variables with `input.keyname`. - `output`: Schema definition for the output from the LLM. - `model` (optional): The language model to use, falls back to `default_model` if not specified. @@ -397,16 +418,18 @@ Required parameters: - `type`: Must be set to `"reduce"`. - `reduce_key`: The key to use for grouping data. This can be a single key (string) or a list of keys. -- `prompt`: The prompt template to use for the reduction operation. This template can access the grouped values using `{{ values }}` (a list of dictionary objects or records) and the reduce key using `{{ reduce_key }}`. +- `prompt`: The prompt template to use for the reduction operation. This template can access the grouped values using `{{ inputs }}` (a list of dictionary objects or records) and the reduce key using `{{ reduce_key }}`. - `output`: Schema definition for the output from the LLM. Optional parameters: +- `synthesize_resolve`: Boolean flag. If false, we will not synthesize a resolve operation in between a map and a reduce operation. Default is true. +- `synthesize_merge`: Boolean flag. If false, we will not synthesize a merge optimization (we will only rely on folding). Default is true. 
- `model`: The language model to use, falls back to `default_model` if not specified. - `input`: Specifies the schema or keys to subselect from each item or value to pass into the prompt. If omitted, all keys from the input items will be used. - `pass_through`: Boolean flag. If true, keys (not specified in `input`) from the first item in the group will be passed through to the output. Default is false. - `commutative`: Boolean flag. If true, the reduce operation is commutative, meaning the order of operations doesn't matter. This can enable further optimizations. Default is true. -- `fold_prompt`: A prompt template for incremental folding. This enables processing of large groups in smaller batches. The template should access the current reduced values using `{{ output.field_name }}` and the new batch of values using `{{ values }}`. +- `fold_prompt`: A prompt template for incremental folding. This enables processing of large groups in smaller batches. The template should access the current reduced values using `{{ output.field_name }}` and the new batch of values using `{{ inputs }}`. - `fold_batch_size`: The number of items to process in each fold operation when using incremental folding. - `merge_prompt`: A prompt template for merging the results of multiple fold operations. This is used when processing large groups in parallel. The template should access the list of intermediate results using `{{ outputs }}`. - `merge_batch_size`: The number of intermediate results to merge in each merge operation. The optimizer uses a default of 2 if it can find a good merge prompt. @@ -430,7 +453,7 @@ reduce_operation: reduce_key: category prompt: | Analyze the following items in the category '{{ reduce_key.category }}': - {% for item in values %} + {% for item in inputs %} - {{ item.name }}: ${{ item.price }} {% endfor %} @@ -465,7 +488,7 @@ reduce_operation: age: integer prompt: | Analyze the following group of values for the group '{{ reduce_key }}': - {% for value in values %} + {% for value in inputs %} - {{ value }} {% endfor %} @@ -487,7 +510,7 @@ reduce_operation: reduce_key: group prompt: | Analyze the following group of values for the group '{{ reduce_key }}': - {% for value in values %} + {% for value in inputs %} - {{ value }} {% endfor %} @@ -500,7 +523,7 @@ reduce_operation: Average: {{ output.avg }} New values to be folded in: - {% for value in values %} + {% for value in inputs %} - {{ value }} {% endfor %} @@ -531,7 +554,7 @@ Required parameters: - `type`: Must be set to `"resolve"`. - `comparison_prompt`: The prompt template to use for comparing potential matches. -- `resolution_prompt`: The prompt template to use for reducing matched entries. The matched entries are accessed via the `matched_entries` variable. +- `resolution_prompt`: The prompt template to use for reducing matched entries. The matched entries are accessed via the `inputs` variable. - `output`: Schema definition for the output from the LLM. This should include the resolved key. 
Optional parameters: @@ -564,7 +587,7 @@ resolve_operation: resolution_prompt: | Merge the following patient records into a single, consolidated record: - {% for entry in matched_entries %} + {% for entry in inputs %} Patient Record {{ loop.index }}: {{ entry | tojson }} @@ -713,55 +736,92 @@ If any of these validation rules fail, the output will be discarded and not pass ## Example Pipeline -Here's an example of a pipeline that performs sentiment analysis and word counting using a parallel map operation, then filters based on word count: +Here's an example of a pipeline that extracts themes from student survey responses, unnests the themes, and then summarizes the responses for each theme: ```yaml default_model: gpt-4o-mini +datasets: + student_submissions: + type: file + path: "data/student_survey_responses.json" # Assuming all items have a "survey_response" attribute + operations: - parallel_map_operation: - type: parallel_map - prompts: - - name: sentiment - prompt: "Analyze the sentiment of the following text: '{{ input.text }}'. Classify it as either positive, negative, or neutral." - output_keys: ["sentiment"] - - name: word_count - prompt: "Count the number of words in the following text: '{{ input.text }}'. Return the count as an integer." - output_keys: ["word_count"] + extract_themes: + type: map + prompt: | + I'm teaching a class on databases. Analyze the following student survey response: + + {{ input.survey_response }} + + Extract 2-3 main themes from this response. Return the themes as a list of strings. output: schema: - sentiment: string - word_count: integer + themes: list[str] + class_id: str validate: - - output["word_count"] > 0 - - output["sentiment"] in ["positive", "negative", "neutral"] + - len(output["themes"]) >= 2 + num_retries_on_validate_failure: 3 - filter_operation: - type: filter - prompt: "Determine if the word count {{ input.word_count }} is greater than 10. Return true if it is, false otherwise." - output: + unnest_themes: + type: unnest + unnest_key: themes + + resolve_themes: + type: resolve + embedding_model: text-embedding-3-small + blocking_threshold: 0.7 + limit_comparisons: 1000 # You can change this or remove it entirely + + comparison_prompt: | + Compare the following two themes extracted from student survey responses about a database class: + + Theme 1: {{ input1.theme }} + Theme 2: {{ input2.theme }} + + Are these themes essentially the same or very closely related? + resolution_prompt: | + You are merging similar themes from student survey responses about a database class. Here are the themes to merge: + + {% for theme in inputs %} + Theme {{ loop.index }}: {{ theme.theme }} + {% endfor %} + + Create a single, concise theme that captures the essence of all these themes. + output: # Merge prompt output. No need to define a schema for the comparison prompt output. schema: - keep: boolean + theme: str + model: gpt-4o-mini -datasets: - sample_dataset: - type: file - path: "data/sample_data.json" + summarize_themes: + type: reduce + reduce_key: theme + prompt: | + I am teaching a class on databases. You are helping me analyze student survey responses. Summarize the responses for the theme: {{ inputs[0].theme }} + + Responses: + {% for item in inputs %} + Survey {{ loop.index }}: + - {{ item.survey_response }} + {% endfor %} + + Summarize the main points expressed in the surveys about this theme. Do not mention any names of students or any other identifying information. 
+ output: + schema: + summary: str pipeline: steps: - - name: analyze_text - input: sample_dataset - operations: - - parallel_map_operation - - name: filter_long_texts - input: analyze_text + - name: extract_response_themes + input: student_submissions operations: - - filter_operation + - extract_themes + - unnest_themes + - resolve_themes + - summarize_themes output: type: file - path: "output/results.json" + path: "output/theme_summaries.json" # Your summaries will be saved to the summary key ``` To run this pipeline, save it as `pipeline.yaml` and execute: ```bash motion pipeline.yaml ``` -This will process the data in `data/sample_data.json`, perform sentiment analysis and word counting in parallel, validate the results, filter out short texts, and save the results in `output/results.json`. +This will process the student submissions data, extract themes from each response, unnest the themes, resolve similar themes, summarize the responses for each theme, and save the theme summaries in `output/theme_summaries.json`. diff --git a/book_pipeline.yaml b/book_pipeline.yaml index 009b3c3c..26e34730 100644 --- a/book_pipeline.yaml +++ b/book_pipeline.yaml @@ -99,7 +99,7 @@ operations: Are these genres likely to be the same or closely related? resolution_prompt: | Given the following matched genre entries: - {% for entry in matched_entries %} + {% for entry in inputs %} Entry {{ loop.index }}: Genre: {{ entry.genre }} Example Book: {{ entry.title }} @@ -179,7 +179,7 @@ operations: prompt: | Summarize the books in the {{ reduce_key }} genre: Books: - {% for book in values %} + {% for book in inputs %} - Title: "{{ book.title }}" Word Count: {{ book.word_count }} Theme: "{{ book.theme }}" diff --git a/motion/builder.py b/motion/builder.py index f44032dd..4da4fdd4 100644 --- a/motion/builder.py +++ b/motion/builder.py @@ -135,6 +135,7 @@ def __init__( self.sample_size_map.update(self.config["optimizer_config"]["sample_sizes"]) self.status = None + self.step_op_to_optimized_ops = {} self.print_optimizer_config() @@ -239,7 +240,10 @@ def compute_sample_size( upstream_ops = [] for step_op in step_ops: if step_op != op_config.get("name"): - upstream_ops.append(step_op) + if step_op in self.step_op_to_optimized_ops: + upstream_ops.extend(self.step_op_to_optimized_ops[step_op]) + else: + upstream_ops.append(step_op) else: break @@ -291,7 +295,9 @@ def _insert_empty_resolve_operations(self): if op_type == "map": has_map = True map_op = op - elif op_type == "reduce": + elif op_type == "reduce" and self.config["operations"][op].get( + "synthesize_resolve", True + ): has_reduce = True reduce_op = op elif op_type == "resolve": @@ -428,7 +434,8 @@ def optimize(self): if step["name"] != step_name ] + [optimized_step] - # Save the result to datasets using the step name + self.step_op_to_optimized_ops[step_name] = optimized_step["operations"] + step_hash = ( hashlib.md5( json.dumps( @@ -458,7 +465,9 @@ ): # Run the entire step input_data = self._run_partial_step( - step, step_operations, float("inf"), optimized_operations + step, + step_operations, + float("inf"), # TODO: FIX THIS ) self.datasets[step_hash] = copy.deepcopy(input_data) else: @@ -772,7 +781,20 @@ def _get_sample_data( data, op_config.get("reduce_key"), sample_size ) - return random.sample(data, min(sample_size, len(data))) + # Take a random sample of up to 500 examples + initial_data = random.sample(data, min(500, len(data))) + + # Calculate character counts for each example + char_counts = 
[len(str(item)) for item in initial_data] + total_counts = sum(char_counts) + + # Calculate weights based on character counts + weights = [count / total_counts for count in char_counts] + + # Perform weighted random sampling + return random.choices( + initial_data, weights=weights, k=min(sample_size, len(initial_data)) + ) def _get_reduce_sample( self, data: List[Dict[str, Any]], reduce_key: str, sample_size: int diff --git a/motion/cli.py b/motion/cli.py index b93c87ae..a67c127e 100644 --- a/motion/cli.py +++ b/motion/cli.py @@ -67,11 +67,7 @@ def clear_cache(): """ Clear the LLM cache stored on disk. """ - try: - cc() - typer.echo("Cache cleared successfully.") - except Exception as e: - typer.echo(f"An error occurred while clearing the cache: {str(e)}") + cc() if __name__ == "__main__": diff --git a/motion/operations/equijoin.py b/motion/operations/equijoin.py index 37990066..1a97dad6 100644 --- a/motion/operations/equijoin.py +++ b/motion/operations/equijoin.py @@ -13,7 +13,8 @@ import numpy as np from jinja2 import Template -from litellm import completion_cost, embedding, model_cost +from litellm import embedding, model_cost +from motion.utils import completion_cost from sklearn.metrics.pairwise import cosine_similarity from motion.operations.base import BaseOperation diff --git a/motion/operations/map.py b/motion/operations/map.py index 438f5d12..5fde29ea 100644 --- a/motion/operations/map.py +++ b/motion/operations/map.py @@ -6,7 +6,7 @@ from typing import Any, Dict, List, Optional, Tuple from jinja2 import Template -from litellm import completion_cost +from motion.utils import completion_cost from motion.operations.base import BaseOperation from motion.operations.utils import ( diff --git a/motion/operations/reduce.py b/motion/operations/reduce.py index 3aa5b189..fc6619ac 100644 --- a/motion/operations/reduce.py +++ b/motion/operations/reduce.py @@ -17,7 +17,8 @@ import jinja2 import numpy as np from jinja2 import Template -from litellm import completion_cost, embedding +from motion.utils import completion_cost +from litellm import embedding from sklearn.cluster import KMeans from sklearn.metrics.pairwise import cosine_similarity @@ -70,12 +71,12 @@ def syntax_check(self) -> None: The method performs the following checks: 1. Verifies the presence of all required keys in the configuration. 2. Validates the structure and content of the 'output' configuration, including its 'schema'. - 3. Checks if the main 'prompt' is a valid Jinja2 template and contains the required 'values' variable. + 3. Checks if the main 'prompt' is a valid Jinja2 template and contains the required 'inputs' variable. 4. If 'merge_prompt' is specified, ensures that 'fold_prompt' is also present. 5. If 'fold_prompt' is present, verifies the existence of 'fold_batch_size'. - 6. Validates the 'fold_prompt' as a Jinja2 template with required variables 'values' and 'output'. + 6. Validates the 'fold_prompt' as a Jinja2 template with required variables 'inputs' and 'output'. 7. If present, checks 'merge_prompt' as a valid Jinja2 template with required 'outputs' variable. 8. Verifies types of various configuration values (e.g., 'fold_batch_size' as int). 9. Checks for the presence and validity of optional configurations like 'model'. 
Raises: @@ -107,8 +108,8 @@ def syntax_check(self) -> None: jinja2.nodes.Name ) template_var_names = {var.name for var in template_vars} - if "values" not in template_var_names: - raise ValueError("Template must include the 'values' variable") + if "inputs" not in template_var_names: + raise ValueError("Template must include the 'inputs' variable") except Exception as e: raise ValueError(f"Invalid Jinja2 template in 'prompt': {str(e)}") @@ -131,7 +132,7 @@ def syntax_check(self) -> None: self.config["fold_prompt"] ).find_all(jinja2.nodes.Name) fold_template_var_names = {var.name for var in fold_template_vars} - required_vars = {"values", "output"} + required_vars = {"inputs", "output"} if not required_vars.issubset(fold_template_var_names): raise ValueError( f"Fold template must include variables: {required_vars}" @@ -352,9 +353,10 @@ def _get_embeddings( ) -> Tuple[List[List[float]], float]: embedding_model = value_sampling["embedding_model"] embedding_keys = value_sampling["embedding_keys"] - + if not embedding_keys: + embedding_keys = list(items[0].keys()) texts = [ - " ".join(str(item[key]) for key in embedding_keys if key in item) + " ".join(str(item[key]) for key in embedding_keys if key in item)[:10000] for item in items ] response = gen_embedding(embedding_model, texts) @@ -605,7 +607,7 @@ def _increment_fold( start_time = time.time() fold_prompt_template = Template(self.config["fold_prompt"]) fold_prompt = fold_prompt_template.render( - values=batch, + inputs=batch, output=current_output, reduce_key=dict(zip(self.config["reduce_key"], key)), ) @@ -734,7 +736,7 @@ def _batch_reduce( """ prompt_template = Template(self.config["prompt"]) prompt = prompt_template.render( - reduce_key=dict(zip(self.config["reduce_key"], key)), values=group_list + reduce_key=dict(zip(self.config["reduce_key"], key)), inputs=group_list ) item_cost = 0 diff --git a/motion/operations/resolve.py b/motion/operations/resolve.py index 673ccff4..de14c6c7 100644 --- a/motion/operations/resolve.py +++ b/motion/operations/resolve.py @@ -4,10 +4,12 @@ from concurrent.futures import ThreadPoolExecutor, as_completed from typing import Any, Dict, List, Tuple +import random import jinja2 from jinja2 import Template -from litellm import completion_cost, embedding +from motion.utils import completion_cost +from litellm import embedding from sklearn.metrics.pairwise import cosine_similarity from motion.operations.base import BaseOperation @@ -22,7 +24,11 @@ def compare_pair( - comparison_prompt: str, model: str, item1: Dict, item2: Dict + comparison_prompt: str, + model: str, + item1: Dict, + item2: Dict, + blocking_keys: List[str] = [], ) -> Tuple[bool, float]: """ Compares two items using an LLM model to determine if they match. @@ -36,6 +42,13 @@ def compare_pair( Returns: Tuple[bool, float]: A tuple containing a boolean indicating whether the items match and the cost of the comparison. """ + if blocking_keys: + if all( + key in item1 and key in item2 and item1[key].lower() == item2[key].lower() + for key in blocking_keys + ): + return True, 0 + prompt_template = Template(comparison_prompt) prompt = prompt_template.render(input1=item1, input2=item2) response = call_llm( @@ -43,7 +56,6 @@ def compare_pair( "compare", [{"role": "user", "content": prompt}], {"is_match": "bool"}, - console=self.console, ) output = parse_llm_response(response)[0] return output["is_match"], completion_cost(response) @@ -59,7 +71,7 @@ def syntax_check(self) -> None: 2. Ensures 'output' contains a 'schema' key. 3. 
Validates that 'schema' in 'output' is a non-empty dictionary. 4. Checks if 'comparison_prompt' is a valid Jinja2 template with 'input1' and 'input2' variables. - 5. If 'resolution_prompt' is present, verifies it as a valid Jinja2 template with 'matched_entries' variable. + 5. If 'resolution_prompt' is present, verifies it as a valid Jinja2 template with 'inputs' variable. 6. Optionally checks if 'model' is a string (if present). 7. Optionally checks 'blocking_keys' (if present, further checks are performed). @@ -106,9 +118,9 @@ def syntax_check(self) -> None: self.config["resolution_prompt"] ).find_all(jinja2.nodes.Name) reduction_var_names = {var.name for var in reduction_vars} - if "matched_entries" not in reduction_var_names: + if "inputs" not in reduction_var_names: raise ValueError( - "'resolution_prompt' must contain 'matched_entries' variable" + "'resolution_prompt' must contain 'inputs' variable" ) except Exception as e: raise ValueError(f"Invalid Jinja2 template: {str(e)}") @@ -149,6 +161,13 @@ def syntax_check(self) -> None: "'schema' in 'input' configuration must be a dictionary" ) + # Check limit_comparisons (optional) + if "limit_comparisons" in self.config: + if not isinstance(self.config["limit_comparisons"], int): + raise TypeError("'limit_comparisons' must be an integer") + if self.config["limit_comparisons"] <= 0: + raise ValueError("'limit_comparisons' must be a positive integer") + def execute(self, input_data: List[Dict]) -> Tuple[List[Dict], float]: """ Executes the resolve operation on the provided dataset. @@ -172,6 +191,7 @@ def execute(self, input_data: List[Dict]) -> Tuple[List[Dict], float]: blocking_threshold = self.config.get("blocking_threshold") blocking_conditions = self.config.get("blocking_conditions", []) input_schema = self.config.get("input", {}).get("schema", {}) + limit_comparisons = self.config.get("limit_comparisons") total_cost = 0 if len(input_data) == 0: @@ -244,23 +264,48 @@ def merge_clusters(item1, item2): for j in range(i + 1, len(input_data)) ] - # Filter pairs based on blocking rules - def should_compare(pair): + # Filter pairs based on blocking conditions + def meets_blocking_conditions(pair): i, j = pair - if find_cluster(i) == find_cluster(j): - return False - if blocking_threshold is not None: - if ( - cosine_similarity([embeddings[i]], [embeddings[j]])[0][0] - < blocking_threshold - ): - return False - if blocking_conditions: - if not is_match(input_data[i], input_data[j]): - return False - return True + return ( + is_match(input_data[i], input_data[j]) if blocking_conditions else True + ) + + blocked_pairs = list(filter(meets_blocking_conditions, all_pairs)) + + # Apply limit_comparisons to blocked pairs + if limit_comparisons is not None and len(blocked_pairs) > limit_comparisons: + self.console.log( + f"Randomly sampling {limit_comparisons} pairs out of {len(blocked_pairs)} blocked pairs." 
+ ) + blocked_pairs = random.sample(blocked_pairs, limit_comparisons) - filtered_pairs = list(filter(should_compare, all_pairs)) + # If there are remaining comparisons, fill with highest cosine similarities + remaining_comparisons = ( + limit_comparisons - len(blocked_pairs) + if limit_comparisons is not None + else float("inf") + ) + if remaining_comparisons > 0 and blocking_threshold is not None: + cosine_pairs = [] + for i, j in all_pairs: + if (i, j) not in blocked_pairs and find_cluster(i) != find_cluster(j): + similarity = cosine_similarity([embeddings[i]], [embeddings[j]])[0][ + 0 + ] + if similarity >= blocking_threshold: + cosine_pairs.append((i, j, similarity)) + + if remaining_comparisons != float("inf"): + cosine_pairs.sort(key=lambda x: x[2], reverse=True) + additional_pairs = [ + (i, j) for i, j, _ in cosine_pairs[: int(remaining_comparisons)] + ] + blocked_pairs.extend(additional_pairs) + else: + blocked_pairs.extend((i, j) for i, j, _ in cosine_pairs) + + filtered_pairs = blocked_pairs # Calculate and print statistics total_possible_comparisons = len(input_data) * (len(input_data) - 1) // 2 @@ -291,6 +336,7 @@ def should_compare(pair): self.config.get("comparison_model", self.default_model), input_data[pair[0]], input_data[pair[1]], + blocking_keys, ): pair for pair in batch } @@ -322,9 +368,7 @@ def process_cluster(cluster): for item in cluster_items ] - resolution_prompt = reduction_template.render( - matched_entries=cluster_items - ) + resolution_prompt = reduction_template.render(inputs=cluster_items) reduction_response = call_llm( self.config.get("resolution_model", self.default_model), "reduce", diff --git a/motion/operations/utils.py b/motion/operations/utils.py index b6a5abe1..bcd6dccf 100644 --- a/motion/operations/utils.py +++ b/motion/operations/utils.py @@ -2,6 +2,7 @@ import os import hashlib import json +import shutil import threading from concurrent.futures import as_completed from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, Union @@ -9,19 +10,22 @@ from dotenv import load_dotenv from frozendict import frozendict from jinja2 import Template -from litellm import completion, completion_cost, embedding, model_cost +from litellm import completion, embedding, model_cost +from motion.utils import completion_cost from rich.console import Console from tqdm import tqdm from diskcache import Cache import tiktoken +from rich import print as rprint from motion.utils import count_tokens load_dotenv() # litellm.set_verbose = True MOTION_HOME_DIR = os.path.expanduser("~/.motion") -CACHE_DIR = os.path.join(MOTION_HOME_DIR, "llm_cache") -cache = Cache(CACHE_DIR) +CACHE_DIR = os.path.join(MOTION_HOME_DIR, "cache") +LLM_CACHE_DIR = os.path.join(MOTION_HOME_DIR, "llm_cache") +cache = Cache(LLM_CACHE_DIR) def freezeargs(func): @@ -119,7 +123,20 @@ def clear_cache(console: Console = Console()): try: cache.clear() cache.close() - console.log("[bold green]LLM cache cleared successfully.[/bold green]") + # Remove all files in the cache directory + cache_dir = CACHE_DIR + for filename in os.listdir(cache_dir): + file_path = os.path.join(cache_dir, filename) + try: + if os.path.isfile(file_path): + os.unlink(file_path) + elif os.path.isdir(file_path): + shutil.rmtree(file_path) + except Exception as e: + console.log( + f"[bold red]Error deleting {file_path}: {str(e)}[/bold red]" + ) + console.log("[bold green]Cache cleared successfully.[/bold green]") except Exception as e: console.log(f"[bold red]Error clearing cache: {str(e)}[/bold red]") @@ -253,6 +270,10 @@ 
def call_llm_with_validation( cost = 0.0 for i in range(num_tries): response = llm_call_fn(messages) + if isinstance(response, tuple): + response, curr_cost = response + cost += curr_cost + cost += completion_cost(response) parsed_output, result = validation_fn(response) @@ -457,10 +478,17 @@ def truncate_messages( mid_point = len(encoded_content) // 2 truncated_encoded = ( encoded_content[: mid_point - tokens_to_remove // 2] - + encoder.encode(" ... [content truncated] ... ") + + encoder.encode(f" ... [{tokens_to_remove} tokens truncated] ... ") + encoded_content[mid_point + tokens_to_remove // 2 :] ) truncated_content = encoder.decode(truncated_encoded) + # Calculate the total number of tokens in the original content + total_tokens = len(encoded_content) + + # Print the warning message using rprint + rprint( + f"[yellow]Warning:[/yellow] Cutting {tokens_to_remove} tokens from a prompt with {total_tokens} tokens..." + ) longest_message["content"] = truncated_content @@ -532,14 +560,37 @@ def call_llm_with_gleaning( validator_response = completion( model=model, messages=messages + [{"role": "user", "content": validator_prompt}], + response_format={ + "type": "json_schema", + "json_schema": { + "name": "response", + "strict": True, + "schema": { + "type": "object", + "properties": { + "should_refine": {"type": "boolean"}, + "improvements": {"type": "string"}, + }, + "required": ["should_refine", "improvements"], + "additionalProperties": False, + }, + }, + }, ) cost += completion_cost(validator_response) + # Parse the validator response + suggestion = json.loads(validator_response.choices[0].message.content) + if not suggestion["should_refine"]: + break + + console.log(f"Improvements: {suggestion['improvements']}") + # Prompt for improvement improvement_prompt = f"""Based on the validation feedback: ``` -{validator_response.choices[0].message.content} +{suggestion['improvements']} ``` Please improve your previous response. Ensure that the output adheres to the required schema and addresses any issues raised in the validation.""" diff --git a/motion/optimizers/join_optimizer.py b/motion/optimizers/join_optimizer.py index ce3e977c..3122c3ed 100644 --- a/motion/optimizers/join_optimizer.py +++ b/motion/optimizers/join_optimizer.py @@ -5,7 +5,8 @@ from typing import Any, Dict, List, Optional, Tuple import numpy as np -from litellm import completion_cost, embedding, model_cost +from motion.utils import completion_cost +from litellm import embedding, model_cost from rich.console import Console from rich.prompt import Confirm from rich.status import Status @@ -300,7 +301,7 @@ def synthesize_resolution_prompt( system_prompt = f"""You are an AI assistant tasked with creating a resolution prompt for LLM-assisted entity resolution. Your task is to create a prompt that will be used to merge multiple duplicate keys into a single, consolidated key. The key(s) being resolved (known as the reduce_key) are {', '.join(reduce_key)}. - The duplicate keys will be provided in a list called 'matched_entries' in a Jinja2 template. + The duplicate keys will be provided in a list called 'inputs' in a Jinja2 template. """ if map_prompt: @@ -312,7 +313,7 @@ "content": f""" Create a resolution prompt for merging duplicate keys into a single key. The prompt should: 1. Be tailored to the specific domain and type of data being merged, based on the context provided. - 2. 
Use a Jinja2 template to iterate over the duplicate keys (accessed as 'matched_entries', where each item is a dictionary containing the reduce_key fields, which you can access as entry.reduce_key for each reduce_key in {reduce_key}). + 2. Use a Jinja2 template to iterate over the duplicate keys (accessed as 'inputs', where each item is a dictionary containing the reduce_key fields, which you can access as entry.reduce_key for each reduce_key in {reduce_key}). 3. Instruct to create a single, consolidated key from the duplicate keys. 4. Include guidelines for resolving conflicts (e.g., choosing the most recent, most complete, or most reliable information). 5. Specify that the output of the resolution prompt should conform to the given output schema: {json.dumps(output_schema, indent=2)} @@ -321,7 +322,7 @@ def synthesize_resolution_prompt( ``` Analyze the following duplicate entries: - {{% for key in matched_entries %}} + {{% for key in inputs %}} Entry {{{{ loop.index }}}}: {{{{ key | tojson }}}} @@ -901,6 +902,13 @@ def _compute_embeddings( if not keys: prompt_template = self.op_config.get("comparison_prompt", "") prompt_vars = extract_jinja_variables(prompt_template) + # Get rid of input, input1, input2 + prompt_vars = [ + var + for var in prompt_vars + if var not in ["input", "input1", "input2"] + ] + # strip all things before . in the prompt_vars keys += list(set([var.split(".")[-1] for var in prompt_vars])) if not keys: diff --git a/motion/optimizers/map_optimizer/config_generators.py b/motion/optimizers/map_optimizer/config_generators.py index 54348794..a0d0298f 100644 --- a/motion/optimizers/map_optimizer/config_generators.py +++ b/motion/optimizers/map_optimizer/config_generators.py @@ -143,6 +143,10 @@ def _get_split_config( f"[yellow]Breaking down operation {op_config['name']}[/yellow]" ) self.console.log(f"[cyan]Subprompt:[/cyan] {result['subprompt']}") + self.console.log( + f"[cyan]Split Output Schema:[/cyan] {result['split_output_schema']}" + ) + return result def _determine_metadata_needs( @@ -234,7 +238,7 @@ def _check_metadata_necessity( There are {num_words_before} words before this chunk and {num_words_after} words after this chunk in the full text. Full input sample: - {json.dumps(random.choice(input_data_sample), indent=2)} + {json.dumps(random.choice(input_data_sample), indent=2)[:1000]} Determine if metadata is needed to perform the subtask. 
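For illustration, here is a minimal sketch of rendering a resolution prompt that follows the `inputs` contract described above; the template wording and the sample entries are hypothetical:

```python
# Hypothetical resolution prompt in the synthesized style shown above; the
# resolve operation binds the matched duplicate entries to `inputs`.
from jinja2 import Template

resolution_prompt = """
Analyze the following duplicate entries:
{% for key in inputs %}
Entry {{ loop.index }}:
{{ key | tojson }}
{% endfor %}

Create a single, consolidated genre that best represents all of these entries.
"""

rendered = Template(resolution_prompt).render(
    inputs=[{"genre": "Sci-Fi"}, {"genre": "Science Fiction"}]
)
print(rendered)
```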
@@ -324,6 +328,9 @@ def _get_metadata_config( max_threads=self.max_threads, console=self.console, ) + if "output_schema" not in result or result["output_schema"] == {}: + result["output_schema"] = {"metadata": "str"} + result["needs_metadata"] = True return result @@ -411,7 +418,7 @@ def _generate_chunk_sizes( split_key: str, input_data_sample: List[Dict[str, Any]], token_limit: int, - num_chunks: int = 6, + num_chunks: int = 8, ) -> List[int]: # Get the average document length avg_doc_length = sum( diff --git a/motion/optimizers/map_optimizer/operation_creators.py b/motion/optimizers/map_optimizer/operation_creators.py index 5412df86..3ab3dafc 100644 --- a/motion/optimizers/map_optimizer/operation_creators.py +++ b/motion/optimizers/map_optimizer/operation_creators.py @@ -191,7 +191,7 @@ def create_reduce_operation( return { "type": "reduce", "name": name, - "reduce_key": doc_id_key, + "reduce_key": [doc_id_key], "input": op_config["output"], # subselect keys "prompt": combine_prompt, "model": ( diff --git a/motion/optimizers/map_optimizer/optimizer.py b/motion/optimizers/map_optimizer/optimizer.py index 70af66b5..26fa206c 100644 --- a/motion/optimizers/map_optimizer/optimizer.py +++ b/motion/optimizers/map_optimizer/optimizer.py @@ -284,11 +284,12 @@ def optimize( print(traceback.format_exc()) # Add no change plan - results["no_change"] = ( - results["no_change"][0], - no_change_runtime, - results["no_change"][2], - ) + if not data_exceeds_limit: + results["no_change"] = ( + results["no_change"][0], + no_change_runtime, + results["no_change"][2], + ) # Create a table of scores sorted in descending order scores = sorted( diff --git a/motion/optimizers/map_optimizer/plan_generators.py b/motion/optimizers/map_optimizer/plan_generators.py index 020b3aa8..36679dbf 100644 --- a/motion/optimizers/map_optimizer/plan_generators.py +++ b/motion/optimizers/map_optimizer/plan_generators.py @@ -293,7 +293,7 @@ def task(): f"[bold]Evaluation score for {plan_name}:[/bold] {plan_evaluation_score}" ) - if plan_evaluation_score >= 0.7: # TODO: make this a parameter + if plan_evaluation_score >= 0.6: # TODO: make this a parameter self.console.log(f"Keeping configuration: {plan_name}") else: self.console.log(f"Pruning configuration: {plan_name}") @@ -307,7 +307,7 @@ def task(): plan = copy.deepcopy(base_operations) if self.is_filter or not op_config.get( - "recursively_optimize", True + "recursively_optimize", False ): plan.extend(smg_ops + [map_op] + unnest_ops + [reduce_op]) return plan_name, plan @@ -327,10 +327,11 @@ def task(): smg_ops + [map_op] + unnest_ops + optimized_reduce_ops ) except Exception as e: - self.console.log( - f"[yellow]Error optimizing reduce operation: {e}. Skipping...[/yellow]" - ) - return plan_name, [] + raise e + # self.console.log( + # f"[yellow]Error optimizing reduce operation: {e}. Skipping...[/yellow]" + # ) + # return plan_name, [] return plan_name, plan @@ -671,6 +672,26 @@ def _generate_parallel_plans( covered_keys.update(subtask["output_keys"]) missing_keys = output_schema_keys - covered_keys + # Attempt to add missing keys to the most appropriate subtask + if missing_keys: + self.console.log( + "[bold yellow]Warning:[/bold yellow] Some output schema keys are not covered by subtasks. Attempting to add them to the most appropriate subtask." 
+ ) + for key in missing_keys: + # Find the subtask with the most similar existing output keys + best_subtask = max( + result["subtasks"], + key=lambda s: len(set(s["output_keys"]) & output_schema_keys), + ) + best_subtask["output_keys"].append(key) + covered_keys.add(key) + self.console.log( + f"[yellow]Added missing key '{key}' to subtask '{best_subtask['name']}'[/yellow]" + ) + + # Check again for any remaining missing keys + missing_keys = output_schema_keys - covered_keys + if missing_keys: raise ValueError( f"Trying to create a parallel map decomposition. The following output schema keys are not covered by any subtask: {missing_keys}" ) @@ -820,9 +841,9 @@ def _generate_chain_plans( for subtask in result["subtasks"]: subtask_output_keys.update(subtask["output_keys"]) - if output_schema_keys != subtask_output_keys: + if len(output_schema_keys - subtask_output_keys) > 0: raise ValueError( - "Not all output schema keys are covered by subtasks after correction attempt." + f"Not all output schema keys are covered by subtasks after correction attempt. Missing keys: {output_schema_keys - subtask_output_keys}" ) chain_plan = [] diff --git a/motion/optimizers/map_optimizer/prompt_generators.py b/motion/optimizers/map_optimizer/prompt_generators.py index 2dc3519e..d8778970 100644 --- a/motion/optimizers/map_optimizer/prompt_generators.py +++ b/motion/optimizers/map_optimizer/prompt_generators.py @@ -297,7 +297,7 @@ def _get_combine_prompt( 6. Determines whether the combine operation is commutative. Note: - The generated combine prompt is constrained to use only the 'values' + The generated combine prompt is constrained to use only the 'inputs' variable, which contains all chunk results. It must be a valid Jinja2 template and avoid using complex logic or filters. @@ -330,10 +330,10 @@ Modify the original prompt to be a prompt that will combine these chunk results to accomplish the original task. Guidelines for your prompt template: - - The only variable you are allowed to use is the values variable, which contains all chunk results. Each value is a dictionary with the keys {', '.join(schema_keys)} + - The only variable you are allowed to use is the inputs variable, which contains all chunk results. Each value is a dictionary with the keys {', '.join(schema_keys)} - Avoid using filters or complex logic, even though Jinja technically supports it - The prompt template must be a valid Jinja2 template - - You must use the {{ values }} variable somehow (you can access specific schema keys if you'ld like) + - You must use the {{ inputs }} variable somehow (you can access specific schema keys if you'd like) Provide your prompt template as a single string. 
""" diff --git a/motion/optimizers/reduce_optimizer.py b/motion/optimizers/reduce_optimizer.py index 5ffae7f5..7f99f585 100644 --- a/motion/optimizers/reduce_optimizer.py +++ b/motion/optimizers/reduce_optimizer.py @@ -9,7 +9,7 @@ from motion.operations.base import BaseOperation from motion.optimizers.join_optimizer import JoinOptimizer -from motion.optimizers.utils import LLMClient +from motion.optimizers.utils import LLMClient, extract_jinja_variables from motion.utils import count_tokens from motion.operations.utils import truncate_messages from litellm import model_cost @@ -67,6 +67,7 @@ def optimize( self, op_config: Dict[str, Any], input_data: List[Dict[str, Any]], + level: int = 1, ) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]], float]: """ Optimize the reduce operation based on the given configuration and input data. @@ -108,7 +109,7 @@ def optimize( prompt_template = Template(op_config["prompt"]) sample_prompt = prompt_template.render( reduce_key=dict(zip(op_config["reduce_key"], sample_key)), - values=[input_data[0]], + inputs=[input_data[0]], ) # Count tokens in the sample prompt @@ -170,11 +171,13 @@ def optimize( ) # Step 3: Evaluate if decomposition is beneficial - decomposition_result = self._evaluate_decomposition(op_config, input_data) + decomposition_result = self._evaluate_decomposition( + op_config, input_data, level + ) if decomposition_result["should_decompose"]: return self._optimize_decomposed_reduce( - decomposition_result, op_config, input_data + decomposition_result, op_config, input_data, level ) return self._optimize_single_reduce(op_config, input_data, validator_prompt) @@ -197,7 +200,7 @@ def _should_use_map( reduce_key=dict( zip(op_config["reduce_key"], sample_input[op_config["reduce_key"]]) ), - values=[sample_input], + inputs=[sample_input], ) # Prepare the message for the LLM @@ -290,7 +293,7 @@ def _optimize_single_reduce( # Step 3: Create and evaluate multiple reduce plans reduce_plans = self._create_reduce_plans(op_config, input_data, is_commutative) best_plan = self._evaluate_reduce_plans( - reduce_plans, input_data, validator_prompt + op_config, reduce_plans, input_data, validator_prompt ) # Step 4: Run the best reduce plan @@ -303,6 +306,7 @@ def _optimize_decomposed_reduce( decomposition_result: Dict[str, Any], op_config: Dict[str, Any], input_data: List[Dict[str, Any]], + level: int, ) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]], float]: """ Optimize a decomposed reduce operation. @@ -318,7 +322,7 @@ def _optimize_decomposed_reduce( decomposition_result (Dict[str, Any]): The result of the decomposition evaluation. op_config (Dict[str, Any]): The original reduce operation configuration. input_data (List[Dict[str, Any]]): The input data for the reduce operation. - + level (int): The current level of decomposition. Returns: Tuple[List[Dict[str, Any]], List[Dict[str, Any]], float]: A tuple containing the list of optimized configurations for both reduce operations and the final output of the second reduce operation, and the cost of the operation due to synthesizing any resolve operations. 
@@ -339,35 +343,41 @@ def _optimize_decomposed_reduce( first_reduce_config["reduce_key"] = [sub_group_key, op_config["reduce_key"]] first_reduce_config["pass_through"] = True - resolve_config = { - "type": "resolve", - "empty": True, - "embedding_model": "text-embedding-3-small", - "resolution_model": self.config.get("default_model", "gpt-4o-mini"), - "comparison_model": self.config.get("default_model", "gpt-4o-mini"), - "_intermediates": { - "map_prompt": op_config.get("_intermediates", {}).get( - "last_map_prompt" - ), - "reduce_key": first_reduce_config["reduce_key"], - }, - } - optimized_resolve_config, resolve_cost = JoinOptimizer( - self.config, resolve_config, self.console, self.llm_client, self.max_threads - ).optimize_resolve(input_data) - all_cost += resolve_cost - - if not optimized_resolve_config.get("empty", False): - # Add this to the pipeline - pipeline += [optimized_resolve_config] - - # Run the resolver - optimized_output = self._run_operation(optimized_resolve_config, input_data) - input_data = optimized_output + if first_reduce_config.get("synthesize_resolve", True): + resolve_config = { + "type": "resolve", + "empty": True, + "embedding_model": "text-embedding-3-small", + "resolution_model": self.config.get("default_model", "gpt-4o-mini"), + "comparison_model": self.config.get("default_model", "gpt-4o-mini"), + "_intermediates": { + "map_prompt": op_config.get("_intermediates", {}).get( + "last_map_prompt" + ), + "reduce_key": first_reduce_config["reduce_key"], + }, + } + optimized_resolve_config, resolve_cost = JoinOptimizer( + self.config, + resolve_config, + self.console, + self.llm_client, + self.max_threads, + ).optimize_resolve(input_data) + all_cost += resolve_cost + + if not optimized_resolve_config.get("empty", False): + # Add this to the pipeline + pipeline += [optimized_resolve_config] + + # Run the resolver + optimized_output = self._run_operation( + optimized_resolve_config, input_data + ) + input_data = optimized_output first_optimized_configs, first_outputs, first_cost = self.optimize( - first_reduce_config, - input_data, + first_reduce_config, input_data, level + 1 ) pipeline += first_optimized_configs all_cost += first_cost @@ -378,7 +388,7 @@ def _optimize_decomposed_reduce( second_reduce_config["pass_through"] = True second_optimized_configs, second_outputs, second_cost = self.optimize( - second_reduce_config, first_outputs + second_reduce_config, first_outputs, level + 1 ) # Combine optimized configs and return with final output @@ -391,6 +401,7 @@ def _evaluate_decomposition( self, op_config: Dict[str, Any], input_data: List[Dict[str, Any]], + level: int = 1, ) -> Dict[str, Any]: """ Evaluate whether decomposing the reduce operation would be beneficial. @@ -401,11 +412,12 @@ def _evaluate_decomposition( Args: op_config (Dict[str, Any]): Configuration for the reduce operation. input_data (List[Dict[str, Any]]): Input data for the reduce operation. + level (int): The current level of decomposition. Returns: Dict[str, Any]: A dictionary containing the decomposition decision and details. """ - should_decompose = self._should_decompose(op_config, input_data) + should_decompose = self._should_decompose(op_config, input_data, level) # Log the decomposition decision if should_decompose["should_decompose"]: @@ -436,6 +448,7 @@ def _should_decompose( self, op_config: Dict[str, Any], input_data: List[Dict[str, Any]], + level: int = 1, ) -> Dict[str, Any]: """ Determine if decomposing the reduce operation would be beneficial. 
@@ -443,10 +456,18 @@ Args: op_config (Dict[str, Any]): Configuration for the reduce operation. input_data (List[Dict[str, Any]]): Input data for the reduce operation. + level (int): The current level of decomposition. Returns: Dict[str, Any]: A dictionary containing the decomposition decision and explanation. """ + # TODO: we have not enabled recursive decomposition yet + if level > 1 and not op_config.get("recursively_optimize", False): + return { + "should_decompose": False, + "explanation": "Recursive decomposition is not enabled.", + } + system_prompt = ( "You are an AI assistant tasked with optimizing data processing pipelines." ) @@ -559,7 +580,7 @@ def _get_decomposition_details( 2. A prompt for the first reduce operation 3. A prompt for the second (final) reduce operation - For the reduce operation prompts, you should only minimally modify the original prompt. The prompts should be Jinja templates, and the only variables they can access are the `reduce_key` and `values` variables. + For the reduce operation prompts, you should only minimally modify the original prompt. The prompts should be Jinja templates, and the only variables they can access are the `reduce_key` and `inputs` variables. Provide your suggestions in the following format: """ @@ -695,12 +716,15 @@ def _determine_value_sampling( # Determine embedding keys prompt = f""" For the {method} sampling method, we need to determine which keys from the input data should be used for generating embeddings. + + Input data keys: + {', '.join(sample_input[0].keys())} Sample Input Data: - {json.dumps(sample_input[0], indent=2)} + {json.dumps(sample_input[0], indent=2)[:1000]}... Based on the reduce operation prompt and the sample input data, which keys should be used for generating embeddings? Use keys that will create meaningful embeddings (i.e., not id-related keys). - Provide your answer as a list of key names. + Provide your answer as a list of key names that is a subset of the input data keys. You should pick only the 1-3 keys that are necessary for generating meaningful embeddings and that have relatively short values. """ parameters = { @@ -718,7 +742,24 @@ parameters, ) result = json.loads(response.choices[0].message.content) - value_sampling_config["embedding_keys"] = result["embedding_keys"] + # Keep only the suggested keys that actually exist in the input data + embedding_keys = [ + key for key in result["embedding_keys"] if key in sample_input[0] + ] + + if not embedding_keys: + # Select the reduce key + self.console.log( + "No embedding keys found, selecting reduce key for embedding key" + ) + embedding_keys = ( + op_config["reduce_key"] + if isinstance(op_config["reduce_key"], list) + else [op_config["reduce_key"]] + ) + + value_sampling_config["embedding_keys"] = embedding_keys if method == "sem_sim": # Determine query text @@ -783,12 +824,12 @@ def _is_commutative( {op_config['prompt']} Sample Input Data: - {json.dumps(sample_input, indent=2)} + {json.dumps(sample_input, indent=2)[:1000]}... A reduce operation is commutative if the order of combining elements doesn't affect the final result. For example, sum and product operations are commutative, while subtraction and division are not. - Based on the reduce operation prompt and the sample input data, determine if this operation is likely to be commutative. + Based on the reduce operation prompt, determine if this operation is likely to be commutative. 
Answer with 'yes' if order matters (non-commutative) or 'no' if order doesn't matter (commutative). Explain your reasoning briefly. @@ -1084,11 +1125,24 @@ def _create_reduce_plans( prompt_tokens = count_tokens(op_config["prompt"], model) sample_input = input_data[:100] sample_output = self._run_operation(op_config, input_data[:100]) + + prompt_vars = extract_jinja_variables(op_config["prompt"]) + prompt_vars = [var.split(".")[-1] for var in prompt_vars] avg_input_tokens = mean( - [count_tokens(json.dumps(item), model) for item in sample_input] + [ + count_tokens( + json.dumps({k: item[k] for k in prompt_vars if k in item}), model + ) + for item in sample_input + ] ) avg_output_tokens = mean( - [count_tokens(json.dumps(item), model) for item in sample_output] + [ + count_tokens( + json.dumps({k: item[k] for k in prompt_vars if k in item}), model + ) + for item in sample_output + ] ) # Calculate max batch size that fits in context window @@ -1096,21 +1150,43 @@ model_input_context_length - prompt_tokens - avg_output_tokens ) // avg_input_tokens - # Generate 5 candidate batch sizes + # Generate 6 candidate batch sizes batch_sizes = [ - max(1, int(max_batch_size * ratio)) for ratio in [0.2, 0.4, 0.6, 0.8, 1.0] + max(1, int(max_batch_size * ratio)) + for ratio in [0.1, 0.2, 0.4, 0.6, 0.8, 1.0] ] batch_sizes = sorted(set(batch_sizes)) # Remove duplicates and sort + # Log the deduplicated batch sizes + self.console.log("[cyan]Generating plans for batch sizes:[/cyan]") + for size in batch_sizes: + self.console.log(f" - {size}") plans = [] # Generate multiple fold prompts - fold_prompts = self._synthesize_fold_prompts( - op_config, - sample_input, - sample_output, - num_prompts=2, - ) + max_retries = 5 + retry_count = 0 + fold_prompts = [] + + while retry_count < max_retries and not fold_prompts: + try: + fold_prompts = self._synthesize_fold_prompts( + op_config, + sample_input, + sample_output, + num_prompts=2, + ) + if not fold_prompts: + raise ValueError("No fold prompts generated") + except Exception as e: + retry_count += 1 + if retry_count == max_retries: + raise RuntimeError( + f"Failed to generate fold prompts after {max_retries} attempts: {str(e)}" + ) + self.console.log( + f"Retry {retry_count}/{max_retries}: Failed to generate fold prompts. Retrying..." + ) for batch_size in batch_sizes: for fold_prompt in fold_prompts: @@ -1345,7 +1421,7 @@ def generate_single_prompt(): The fold prompt should be a Jinja2 template with the following variables available: - {{ output }}: The current reduced value (a dictionary with the current output schema) - - {{ values }}: A list of new values to be folded in + - {{ inputs }}: A list of new values to be folded in - {{ reduce_key }}: The key used for grouping in the reduce operation Provide the fold prompt as a string. 
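To make the fold-prompt contract concrete, here is a minimal sketch of rendering such a template with the three variables listed above; the wording and the sample records are hypothetical:

```python
# Hypothetical fold prompt: folds a new batch (`inputs`) into the current
# reduced output (`output`) for one group (`reduce_key`).
from jinja2 import Template

fold_prompt = """
Current summary for theme '{{ reduce_key.theme }}': {{ output.summary }}

Fold in these new survey responses:
{% for item in inputs %}
- {{ item.survey_response }}
{% endfor %}

Return an updated summary.
"""

rendered = Template(fold_prompt).render(
    output={"summary": "Students want more hands-on SQL practice."},
    reduce_key={"theme": "hands-on exercises"},
    inputs=[{"survey_response": "More lab sessions would help."}],
)
print(rendered)
```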
@@ -1355,7 +1431,21 @@ def generate_single_prompt(): system_prompt, parameters, ) - return json.loads(response.choices[0].message.content)["fold_prompt"] + fold_prompt = json.loads(response.choices[0].message.content)["fold_prompt"] + + # Create a temporary plan with the fold prompt + temp_plan = op_config.copy() + temp_plan["fold_prompt"] = fold_prompt + temp_plan["fold_batch_size"] = min( + len(sample_input), 2 + ) # Use a small batch size for testing + + # Run the operation with the fold prompt + self._run_operation(temp_plan, sample_input[: temp_plan["fold_batch_size"]]) + + # If the operation runs successfully, return the fold prompt + return fold_prompt with ThreadPoolExecutor(max_workers=self.max_threads) as executor: fold_prompts = list( @@ -1366,6 +1456,7 @@ def _evaluate_reduce_plans( self, + op_config: Dict[str, Any], plans: List[Dict[str, Any]], input_data: List[Dict[str, Any]], validator_prompt: str, @@ -1384,6 +1475,7 @@ together. We default to a merge batch size of 2, but one can increase this. Args: + op_config (Dict[str, Any]): The configuration of the reduce operation. plans (List[Dict[str, Any]]): A list of reduce plans to evaluate. input_data (List[Dict[str, Any]]): The input data to use for evaluation. validator_prompt (str): The prompt to use for validating the output of each plan. @@ -1447,53 +1539,56 @@ f"\n[green]Selected best plan with score: {best_score:.2f} and batch size: {best_plan['fold_batch_size']}[/green]" ) - # Create a new plan with merge prompt and updated parameters - merged_plan = best_plan.copy() + if op_config.get("synthesize_merge", True): + # Create a new plan with merge prompt and updated parameters + merged_plan = best_plan.copy() - # Synthesize merge prompt if it doesn't exist - if "merge_prompt" not in merged_plan: - merged_plan["merge_prompt"] = self._synthesize_merge_prompt( - merged_plan, plan_outputs[id(best_plan)] + # Synthesize merge prompt if it doesn't exist + if "merge_prompt" not in merged_plan: + merged_plan["merge_prompt"] = self._synthesize_merge_prompt( + merged_plan, plan_outputs[id(best_plan)] + ) + # Print the synthesized merge prompt + self.console.log("\n[bold]Synthesized Merge Prompt:[/bold]") + self.console.log(merged_plan["merge_prompt"]) + + # Set merge_batch_size to 2 + merged_plan["merge_batch_size"] = 2 + + # Evaluate the merged plan + _, merged_plan_score, _, operation_instance = self._evaluate_single_plan( + merged_plan, + evaluation_sample, + validator_prompt, + validation_inputs, + return_instance=True, ) - # Print the synthesized merge prompt - self.console.log("\n[bold]Synthesized Merge Prompt:[/bold]") - self.console.log(merged_plan["merge_prompt"]) - - # Set merge_batch_size to 2 and num_parallel_folds to 5 - merged_plan["merge_batch_size"] = 2 - - # Evaluate the merged plan - _, merged_plan_score, _, operation_instance = self._evaluate_single_plan( - merged_plan, - evaluation_sample, - validator_prompt, - validation_inputs, - return_instance=True, - ) - # Get the merge and fold times from the operation instance - merge_times = operation_instance.merge_times - fold_times = operation_instance.fold_times - merge_avg_time = mean(merge_times) if merge_times else None - fold_avg_time = mean(fold_times) if fold_times else None + # Get the merge and fold times from the operation instance + merge_times = operation_instance.merge_times + fold_times = 
operation_instance.fold_times
+        merge_avg_time = mean(merge_times) if merge_times else None
+        fold_avg_time = mean(fold_times) if fold_times else None

-        self.console.log("\n[bold]Scores:[/bold]")
-        self.console.log(f"Original plan: {best_score:.2f}")
-        self.console.log(f"Merged plan: {merged_plan_score:.2f}")
+            self.console.log("\n[bold]Scores:[/bold]")
+            self.console.log(f"Original plan: {best_score:.2f}")
+            self.console.log(f"Merged plan: {merged_plan_score:.2f}")

-        # Compare scores and decide which plan to use
-        if merged_plan_score >= best_score * 0.75:
-            self.console.log(
-                f"\n[green]Using merged plan with score: {merged_plan_score:.2f}[/green]"
-            )
-            if merge_avg_time and fold_avg_time:
-                merged_plan["merge_time"] = merge_avg_time
-                merged_plan["fold_time"] = fold_avg_time
-            return merged_plan
+            # Compare scores and decide which plan to use
+            if merged_plan_score >= best_score * 0.75:
+                self.console.log(
+                    f"\n[green]Using merged plan with score: {merged_plan_score:.2f}[/green]"
+                )
+                if merge_avg_time and fold_avg_time:
+                    merged_plan["merge_time"] = merge_avg_time
+                    merged_plan["fold_time"] = fold_avg_time
+                return merged_plan
+            else:
+                self.console.log(
+                    f"\n[yellow]Merged plan quality too low. Using original plan with score: {best_score:.2f}[/yellow]"
+                )
+                return best_plan
         else:
-            self.console.log(
-                f"\n[yellow]Merged plan quality too low. Using original plan with score: {best_score:.2f}[/yellow]"
-            )
             return best_plan

     def _evaluate_single_plan(
diff --git a/motion/optimizers/utils.py b/motion/optimizers/utils.py
index f4ee2dea..d0216e1e 100644
--- a/motion/optimizers/utils.py
+++ b/motion/optimizers/utils.py
@@ -3,6 +3,7 @@

 from jinja2 import Environment, meta
 from litellm import completion, completion_cost
+from motion.operations.utils import truncate_messages


 def extract_jinja_variables(template_string: str) -> List[str]:
@@ -82,6 +83,8 @@ def generate(
     """
     parameters["additionalProperties"] = False

+    messages = truncate_messages(messages, self.model)
+
     response = completion(
         model=self.model,
         messages=[
diff --git a/motion/utils.py b/motion/utils.py
index ef59affb..0108bc05 100644
--- a/motion/utils.py
+++ b/motion/utils.py
@@ -4,6 +4,15 @@

 import tiktoken
 import json

+from litellm import completion_cost as lcc
+
+
+def completion_cost(response) -> float:
+    try:
+        return lcc(response)
+    except Exception:
+        return 0.0
+

 def load_config(config_path: str) -> Dict[str, Any]:
     """
diff --git a/paper_workloads/biodex/compute_rp.py b/paper_workloads/biodex/compute_rp.py
index 4d695756..f521e9a3 100644
--- a/paper_workloads/biodex/compute_rp.py
+++ b/paper_workloads/biodex/compute_rp.py
@@ -27,22 +27,25 @@ def main():
     df = pd.DataFrame(data)
     labels_df = pd.DataFrame(labels)
     df = df.merge(labels_df, on="id")
-    k = 10
+    k_values = [5, 10]

-    # Calculate RP@10 for each group
+    # Calculate RP@5 and RP@10 for each group
     results = []
     for _, row in df.iterrows():
         id_ = row["id"]
         extracted = row["ranked_conditions"]
         ground_truth = row["ground_truth_reactions"]
-        rp_at_k = calculate_rp_at_k(extracted, ground_truth, k)
-        results.append({"id": id_, f"RP@{k}": rp_at_k})
+        result = {"id": id_}
+        for k in k_values:
+            rp_at_k = calculate_rp_at_k(extracted, ground_truth, k)
+            result[f"RP@{k}"] = rp_at_k
+        results.append(result)

-    # Calculate average RP@k
-    avg_rp_at_k = sum(item[f"RP@{k}"] for item in results) / len(results)
-
-    print(f"Average RP@{k}: {avg_rp_at_k}")
+    # Calculate average RP@k for each k
+    for k in k_values:
+        avg_rp_at_k = sum(item[f"RP@{k}"] for item in results) / len(results)
+        print(f"Average RP@{k}: {avg_rp_at_k}")


 if __name__ == "__main__":
diff --git a/paper_workloads/biodex/pipeline_opt.yaml b/paper_workloads/biodex/pipeline_opt.yaml
index 7a47b959..30da468d 100644
--- a/paper_workloads/biodex/pipeline_opt.yaml
+++ b/paper_workloads/biodex/pipeline_opt.yaml
@@ -77,7 +77,6 @@ operations:
      Determine if {{ right.reaction }} is described in the medical article, considering
      the context and meaning beyond just the presence of individual words.
    embedding_model: text-embedding-3-small
-    limit_comparisons: 100000
    type: equijoin
  synthesized_conditions_extraction:
    model: gpt-4o-mini
diff --git a/paper_workloads/corporate_lobbying/compute_metrics.py b/paper_workloads/corporate_lobbying/compute_metrics.py
index 87f15aa0..a7ec9d73 100644
--- a/paper_workloads/corporate_lobbying/compute_metrics.py
+++ b/paper_workloads/corporate_lobbying/compute_metrics.py
@@ -50,6 +50,11 @@ def compute_metrics(ground_truth_path, predictions_path):
     "/Users/shreyashankar/Documents/hacking/motion-v3/paper_workloads/corporate_lobbying/relevance_assessment_tool.json",
 )

+our_metrics = compute_metrics(
+    "/Users/shreyashankar/Documents/hacking/motion-v3/paper_workloads/corporate_lobbying/ground_truth.json",
+    "/Users/shreyashankar/Documents/hacking/motion-v3/paper_workloads/corporate_lobbying/relevance_assessment_ours.json",
+)
+
 # Prepare data for the DataFrame
 data = {
     "Metric": ["Accuracy", "Precision", "Recall", "F1 Score"],
@@ -59,12 +64,18 @@
         f"{baseline_metrics['recall']:.4f}",
         f"{baseline_metrics['f1_score']:.4f}",
     ],
-    "Tool": [
+    "Raw Article Text from Google Search": [
         f"{tool_metrics['accuracy']:.4f}",
         f"{tool_metrics['precision']:.4f}",
         f"{tool_metrics['recall']:.4f}",
         f"{tool_metrics['f1_score']:.4f}",
     ],
+    "Motion": [
+        f"{our_metrics['accuracy']:.4f}",
+        f"{our_metrics['precision']:.4f}",
+        f"{our_metrics['recall']:.4f}",
+        f"{our_metrics['f1_score']:.4f}",
+    ],
 }

 # Create DataFrame
diff --git a/paper_workloads/medicalschema/eval.py b/paper_workloads/medicalschema/eval.py
index 8d4b32b6..4d9fce44 100644
--- a/paper_workloads/medicalschema/eval.py
+++ b/paper_workloads/medicalschema/eval.py
@@ -23,7 +23,7 @@
 ]

 with open(
-    "/Users/shreyashankar/Documents/hacking/motion-v3/paper_workloads/medicalschema/extracted_patient_data.json"
+    "/Users/shreyashankar/Documents/hacking/motion-v3/paper_workloads/medicalschema/extracted_patient_data_with_reduce.json"
 ) as f:
     data = json.load(f)
diff --git a/paper_workloads/medicalschema/pipeline.yaml b/paper_workloads/medicalschema/pipeline.yaml
index db5848ec..988f0340 100644
--- a/paper_workloads/medicalschema/pipeline.yaml
+++ b/paper_workloads/medicalschema/pipeline.yaml
@@ -87,6 +87,76 @@ operations:
      If a field is not present in the data, give an empty string for it. The case_submitter_id is required.

+  combine_patient_data:
+    type: reduce
+    commutative: false
+    reduce_key:
+      - file_name
+    output:
+      schema:
+        case_submitter_id: "list[str]"
+        age_at_diagnosis: "list[int]"
+        race: "list[str]"
+        ethnicity: "list[str]"
+        gender: "list[str]"
+        vital_status: "list[str]"
+        ajcc_pathologic_t: "list[str]"
+        ajcc_pathologic_n: "list[str]"
+        ajcc_pathologic_stage: "list[str]"
+        tumor_grade: "list[str]"
+        tumor_focality: "list[str]"
+        tumor_largest_dimension_diameter: "list[float]"
+        primary_diagnosis: "list[str]"
+        morphology: "list[str]"
+        tissue_or_organ_of_origin: "list[str]"
+        study: "list[str]"
+    prompt: |
+      Combine the patient data from multiple chunks of the same file. Each chunk contains lists of patient data fields. You should concatenate the relevant lists from each chunk.
+
+      Input chunks to merge:
+      {% for chunk in inputs %}
+      Chunk {{ loop.index }}:
+      case_submitter_id: {{ chunk.case_submitter_id }}
+      age_at_diagnosis: {{ chunk.age_at_diagnosis }}
+      race: {{ chunk.race }}
+      ethnicity: {{ chunk.ethnicity }}
+      gender: {{ chunk.gender }}
+      vital_status: {{ chunk.vital_status }}
+      ajcc_pathologic_t: {{ chunk.ajcc_pathologic_t }}
+      ajcc_pathologic_n: {{ chunk.ajcc_pathologic_n }}
+      ajcc_pathologic_stage: {{ chunk.ajcc_pathologic_stage }}
+      tumor_grade: {{ chunk.tumor_grade }}
+      tumor_focality: {{ chunk.tumor_focality }}
+      tumor_largest_dimension_diameter: {{ chunk.tumor_largest_dimension_diameter }}
+      primary_diagnosis: {{ chunk.primary_diagnosis }}
+      morphology: {{ chunk.morphology }}
+      tissue_or_organ_of_origin: {{ chunk.tissue_or_organ_of_origin }}
+      study: {{ chunk.study }}
+
+      {% endfor %}
+
+      Merge the data from all chunks into a single set of lists, maintaining the order of the case_submitter_ids. If there are any conflicts in the data for the same case_submitter_id, use the data from the later chunk.
+
+      Ensure the output maintains the following schema:
+      - case_submitter_id: list[str]
+      - age_at_diagnosis: list[int]
+      - race: list[str]
+      - ethnicity: list[str]
+      - gender: list[str]
+      - vital_status: list[str]
+      - ajcc_pathologic_t: list[str]
+      - ajcc_pathologic_n: list[str]
+      - ajcc_pathologic_stage: list[str]
+      - tumor_grade: list[str]
+      - tumor_focality: list[str]
+      - tumor_largest_dimension_diameter: list[float]
+      - primary_diagnosis: list[str]
+      - morphology: list[str]
+      - tissue_or_organ_of_origin: list[str]
+      - study: list[str]
+
+      Provide the merged data in the same format as the input chunks.
+
 pipeline:
   steps:
     - name: biometric_task
@@ -96,7 +166,8 @@ pipeline:
        - split_text
        - gather_peripherals
        - extract_patient_data
+        - combine_patient_data

   output:
     type: file
-    path: "/Users/shreyashankar/Documents/hacking/motion-v3/paper_workloads/medicalschema/extracted_patient_data.json"
+    path: "/Users/shreyashankar/Documents/hacking/motion-v3/paper_workloads/medicalschema/extracted_patient_data_with_reduce.json"
diff --git a/patient_records.json b/patient_records.json
deleted file mode 100644
index f4f435cc..00000000
--- a/patient_records.json
+++ /dev/null
@@ -1 +0,0 @@
-[{"id": 1, "text": "Patient reports taking aspirin daily."}, {"id": 2, "text": "Patient is on lisinopril for blood pressure."}, {"id": 3, "text": "Patient is prescribed omeprazole for acid reflux."}, {"id": 4, "text": "Patient takes metformin for diabetes."}, {"id": 5, "text": "Patient uses albuterol inhaler as needed."}, {"id": 6, "text": "Patient is prescribed warfarin for blood thinning."}, {"id": 7, "text": "Patient takes levothyroxine for thyroid issues."}, {"id": 8, "text": "Patient uses insulin injections to manage diabetes."}, {"id": 9, "text": "Patient is on amlodipine for blood pressure."}, {"id": 10, "text": "Patient reports taking ibuprofen for pain relief."}, {"id": 11, "text": "Patient uses a fluticasone nasal spray for allergies."}, {"id": 12, "text": "Patient takes metformin for diabetes."}, {"id": 13, "text": "Patient is prescribed atorvastatin for high cholesterol."}, {"id": 14, "text": "Patient is taking sertraline for depression."}, {"id": 15, "text": "Patient uses a budesonide inhaler for asthma control."}]
\ No newline at end of file
diff --git a/tests/test_basic.py b/tests/test_basic.py
index c207bd00..23cce95b 100644
--- a/tests/test_basic.py
+++ b/tests/test_basic.py
@@ -433,7 +433,7 @@ def reduce_config():
     return {
         "type": "reduce",
         "reduce_key": "group",
-        "prompt": "Summarize the following group of values: {{ values }} Provide a total and any other relevant statistics.",
+        "prompt": "Summarize the following group of values: {{ inputs }} Provide a total and any other relevant statistics.",
         "output": {"schema": {"total": "number", "avg": "number"}},
         "model": "gpt-4o-mini",
     }
@@ -514,7 +514,7 @@ def resolve_config():
         "embedding_model": "text-embedding-3-small",
         "comparison_model": "gpt-4o-mini",
         "resolution_model": "gpt-4o-mini",
-        "resolution_prompt": "Given the following list of similar entries, determine one common name and email. {{ matched_entries }}",
+        "resolution_prompt": "Given the following list of similar entries, determine one common name and email. {{ inputs }}",
     }
diff --git a/tests/test_map_decomp.py b/tests/test_map_decomp.py
deleted file mode 100644
index a41d28ec..00000000
--- a/tests/test_map_decomp.py
+++ /dev/null
@@ -1,214 +0,0 @@
-# import re
-# from typing import List, Tuple, Any
-# from motion.old.dataset import Dataset
-# from motion.operators import LLMMapper, build_only
-# from dotenv import load_dotenv
-# import os
-# from litellm import completion
-
-# load_dotenv()
-
-# MODEL = "gpt-4o-mini"  # Adjust based on your available models
-
-# book_titles = [
-#     "The Curious Incident of the Dog in the Night-Time",
-#     "The Guernsey Literary and Potato Peel Pie Society",
-#     "The Hundred-Year-Old Man Who Climbed Out of the Window and Disappeared",
-#     "The Particular Sadness of Lemon Cake: A Novel of Magical Realism",
-#     "The Unbearable Lightness of Being in a World of Heavy Consequences",
-#     "The Hitchhiker's Guide to the Galaxy: The Complete Trilogy of Five",
-#     "The Absolutely True Diary of a Part-Time Indian: A Journey of Self-Discovery",
-#     "The Perks of Being a Wallflower: Coming of Age in the Digital Era",
-#     "The Brief Wondrous Life of Oscar Wao: A Tale of Love and the Dominican Diaspora",
-#     "The Elegance of the Hedgehog: Philosophical Musings in a Parisian Apartment",
-#     "The Curious Case of Benjamin Button and Other Jazz Age Stories",
-#     "The Time Traveler's Wife: A Love Story Across Dimensions and Decades",
-#     "The Immortal Life of Henrietta Lacks: Science, Ethics, and Human Legacy",
-#     "The Thousand Autumns of Jacob de Zoet: A Historical Epic of Old Japan",
-#     "The Mysterious Affair at Styles: Hercule Poirot's First Case",
-#     "The Strange Case of Dr. Jekyll and Mr. Hyde: A Study in Duality",
-#     "The Catcher in the Rye: Holden Caulfield's Journey Through New York City",
-#     "The Adventures of Huckleberry Finn: A Boy's Journey Down the Mississippi",
-#     "The Picture of Dorian Gray: A Portrait of Vanity and Corruption",
-#     "The Wind-Up Bird Chronicle: A Surreal Quest in Modern Tokyo",
-#     "The Unlikely Pilgrimage of Harold Fry: A Journey of a Thousand Miles",
-#     "The Girl with the Dragon Tattoo: A Swedish Thriller of Secrets and Revenge",
-#     "The Curious Incident of the Siamese Cat in the Afternoon",
-#     "The Remarkable Journey of Coyote Sunrise: A Cross-Country Adventure",
-#     "The Inconvenient Indian: A Curious Account of Native People in North America",
-#     "The Subtle Art of Not Giving a F*ck: A Counterintuitive Approach to Living a Good Life",
-#     "The Loneliness of the Long-Distance Runner: Stories of Endurance and Solitude",
-#     "The Hitchhiker's Guide to the Galaxy: Don't Panic Edition",
-#     "The Neverending Story: A Tale of Fantasy and Imagination",
-#     "The Absolutely True Diary of a Part-Time Superhero",
-#     "The Hundred-Year-Old Secret Society of the Whispering Willows",
-#     "The Peculiar Circumstances Surrounding the Disappearance of Miss Finch",
-#     "The Extraordinary Adventures of Alfred Kropp: Keeper of King Arthur's Sword",
-#     "The Unexpected Inheritance of Inspector Chopra: A Baby Elephant Mystery",
-#     "The Curious Case of the Clockwork Man: A Burton & Swinburne Adventure",
-#     "The Sweetness at the Bottom of the Pie: A Flavia de Luce Mystery",
-#     "The Wonderful Story of Henry Sugar and Six More Tales of the Unexpected",
-#     "The Incredible Journey of Edward Tulane: A Toy Rabbit's Tale of Love",
-#     "The Astonishing Color of After: A Story of Grief, Love, and Magic",
-#     "The Inexplicable Logic of My Life: A Coming-of-Age Tale",
-#     "The Mysterious Benedict Society and the Perilous Journey: Book Two",
-#     "The Undomestic Goddess: A Hilarious Tale of Corporate Burnout and Rural Bliss",
-#     "The Absolutely True Story of a Part-Time Indian Princess",
-#     "The Strange and Beautiful Sorrows of Ava Lavender: A Magical Realism Novel",
-#     "The Gigantic Beard That Was Evil: A Graphic Novel About Conformity",
-#     "The Hundred-Year-Old Man Who Climbed Out of the Window and Ran for President",
-#     "The Curious Incident of the Cat in the Daytime: A Feline Mystery",
-#     "The Guernsey Literary and Potato Peel Pie Society: The Lost Chapters",
-#     "The Particular Sadness of Chocolate Cake: A Culinary Mystery",
-#     "The Unbearable Lightness of Being a Wallflower in the Digital Age",
-#     "The Time Traveler's Daughter: Navigating a Legacy Across Centuries",
-#     "The Immortal Life of Henrietta Lacks: The Next Generation",
-#     "The Thousand Winters of Jacob de Zoet: A Tale of Perseverance in Feudal Japan",
-#     "The Mysterious Affair at Sunset: Miss Marple's Final Case",
-#     "The Strange Case of the Alchemist's Daughter: Monsters and Mysteries in Victorian London",
-#     "The Catcher in the Wifi: Holden Caulfield Navigates the Internet Age",
-#     "The Adventures of Huckleberry Finn in Space: A Galactic Journey",
-#     "The Picture of Dorian Gray's Selfie: Vanity in the Age of Social Media",
-#     "The Wind-Up Girl Chronicle: A Tale of Bioengineered Futures",
-#     "The Unlikely Pilgrimage of Harold Fry's Grandson: Retracing Steps",
-#     "The Girl with the Quantum Tattoo: A Sci-Fi Thriller of Alternate Realities",
-#     "The Remarkable Journey of Coyote Sunrise: The Road Trip Continues",
-#     "The Inconvenient Truth About Time Travel: A Scientific and Philosophical Exploration",
-#     "The Subtle Art of Giving Too Many F*cks: An Overachiever's Guide to Burnout",
-#     "The Loneliness of the Long-Distance Spaceship Captain: Tales from the Void",
-#     "The Neverending Story: The Lost Chapters of Fantastica",
-#     "The Extraordinary Adventures of Ordinary Alice: A Wonderland Reimagined",
-#     "The Unexpected Inheritance of Inspector Gadget: A Robotic Mystery",
-#     "The Curious Case of the Quantum Cat: Schrödinger's Detective Story",
-#     "The Sweetness at the Bottom of the Black Hole: An Astrophysicist's Memoir",
-#     "The Wonderful World of Whales: Secrets of the Cetacean Society",
-#     "The Incredible Journey of the Last Dodo: A Tale of Extinction and Hope",
-#     "The Astonishing Color of Invisible Light: Synesthesia in the Quantum Realm",
-#     "The Inexplicable Logic of Cats: A Feline Philosopher's Guide to Life",
-#     "The Mysterious Benedict Society and the Riddle of the Multiverse",
-#     "The Undomestic Android: A Comedy of Errors in the Smart Home Era",
-#     "The Absolutely True Story of a Part-Time Vampire Slayer's Apprentice",
-#     "The Strange and Beautiful Sorrows of the Last Unicorn on Earth",
-#     "The Gigantic Beard That Saved the World: An Eco-Fable",
-#     "The Hundred-Year-Old Algorithm That Escaped the Mainframe",
-#     "The Curious Incident of the Ghost in the Machine: A Digital Haunting",
-#     "The Literary and Logarithmic Society: Math Nerds Save the Library",
-#     "The Particular Joy of Homemade Ice Cream: A Culinary Love Story",
-#     "The Unbearable Lightness of Being a Cloud in a World of Big Data",
-#     "The Time Traveler's Pet: A Dog's Journey Through Human History",
-#     "The Immortal Life of Artificial Intelligence: From Turing to Transcendence",
-#     "The Thousand Moons of Europa: A Space Opera of Jovian Proportions",
-#     "The Mysterious Affair at the Quantum Cafe: Agatha Christie Meets Schrödinger",
-#     "The Strange Case of the Disappearing Bees: An Ecological Mystery",
-#     "The Catcher in the Virtual Rye: Holden Caulfield's Digital Descendants",
-#     "The Adventures of Sherlock Holmes in the 25th Century",
-#     "The Picture of Dorian Gray's Clone: Ethics in the Age of Genetic Engineering",
-#     "The Wind-Up Universe Chronicle: Cosmic Tales of Entropy and Renewal",
-#     "The Unlikely Pilgrimage of the Last Human on Earth",
-#     "The Girl with the Bioengineered Brain: A Neuro-Noir Thriller",
-#     "The Remarkable Journey to the Center of the Atom: A Subatomic Odyssey",
-#     "The Inconvenient Yeti: Cryptozoology in the Age of Climate Change",
-#     "The Subtle Art of Embracing Chaos: A Mathematician's Guide to Life",
-#     "The Loneliness of the Interplanetary Postal Worker: Cosmic Mail Tales",
-#     "The Neverending Story of the Expanding Universe: A Cosmological Epic",
-# ]
-
-# # Create initial dataset
-# dataset = Dataset([(title, 1) for title in book_titles])
-
-
-# class BookTitleAnalyzer(LLMMapper):
-#     def generate_prompt(self, key: str, value: int) -> list:
-#         return [
-#             {
-#                 "role": "system",
-#                 "content": "You are a literary analyst with expertise in linguistics, creative writing, and market trends.",
-#             },
-#             {
-#                 "role": "user",
-#                 "content": f"""Perform an in-depth analysis of the following book title:
-
-# Title: {key}
-
-# Provide the following information:
-# 1. Linguistic Analysis:
-#    a) Number of words
-#    b) Syllable count (estimate)
-#    c) Presence of alliteration or assonance
-#    d) Identify any archaic or unusual words
-
-# 2. Thematic Elements:
-#    a) Primary theme(s) suggested by the title
-#    b) Emotional tone (e.g., mysterious, uplifting, melancholic)
-#    c) Any cultural or historical references implied
-
-# 3. Genre and Market Appeal:
-#    a) Primary and secondary genre predictions
-#    b) Target age group
-#    c) Estimate potential market appeal (1-10 scale)
-
-# 4. Creative Writing Aspects:
-#    a) Suggest a brief (2-3 sentence) plot synopsis based solely on the title
-#    b) Propose a potential opening line for the book
-
-# 5. Comparative Analysis:
-#    a) Suggest two existing famous books with similar titles or themes
-#    b) Explain how this title might stand out or blend in with current market trends
-
-# Provide your analysis in a structured format, using the numbering system above.""",
-#             },
-#         ]
-
-#     def process_response(self, response: Any, **prompt_kwargs) -> Tuple[str, dict]:
-#         content = response.choices[0].message.content.strip()
-#         # We'll keep the raw response as is, given its complex structure
-#         return (prompt_kwargs["key"], content)
-
-#     @build_only
-#     def validate(
-#         self, input_key: str, input_value: int, output_key: str, output_value: str
-#     ) -> None:
-#         # Use LLM for content quality check
-#         validation_prompt = f"""
-# Analyze the following book title analysis for quality and completeness:
-
-# Title: {input_key}
-# Analysis:
-# {output_value}
-
-# Please verify:
-# 1. Is the linguistic analysis accurate and complete?
-# 2. Are the thematic elements well-identified and relevant?
-# 3. Is the genre prediction reasonable, and is the market appeal justified?
-# 4. Is the plot synopsis creative and relevant to the title?
-# 5. Are the comparative analysis and market trend assessment insightful?
-
-# Respond with YES if all criteria are met satisfactorily, or NO if any aspect is lacking or could be significantly improved, with a brief explanation.
-# """
-
-#         response = completion(
-#             model="gpt-4o-mini",
-#             messages=[{"role": "user", "content": validation_prompt}],
-#         )
-
-#         validation_result = response.choices[0].message.content.strip()
-#         if "NO" in validation_result:
-#             raise ValueError(f"LLM Validation Failed: {validation_result}")
-
-
-# # Apply operation
-# if __name__ == "__main__":
-#     pipeline = dataset.map(BookTitleAnalyzer(model=MODEL))
-
-#     pipeline = pipeline.build(sample_size=15)
-#     # result = pipeline.execute()
-
-#     # # Print results
-#     # for title, analysis in result:
-#     #     print(f"\nTitle: {title}")
-#     #     print("Analysis:")
-#     #     print(analysis)
-#     #     print("-" * 70)
-
-#     # # Print the number of titles analyzed
-#     # print(f"Number of titles analyzed: {len(result)}")
diff --git a/tests/test_reduce.py b/tests/test_reduce.py
index 69912ff8..92affe58 100644
--- a/tests/test_reduce.py
+++ b/tests/test_reduce.py
@@ -21,8 +21,8 @@ def reduce_config():
     return {
         "type": "reduce",
         "reduce_key": "category",
-        "prompt": "Categorize and summarize the following items: {{ values }} Provide a brief summary of the category and list the most common themes.",
-        "fold_prompt": "Combine the following category summaries: {{ output }} with new items: {{ values }}. Provide an updated summary and list of common themes.",
+        "prompt": "Categorize and summarize the following items: {{ inputs }} Provide a brief summary of the category and list the most common themes.",
+        "fold_prompt": "Combine the following category summaries: {{ output }} with new items: {{ inputs }}. Provide an updated summary and list of common themes.",
         "merge_prompt": "Merge the following category summaries: {% for output in outputs %}{{ output.summary }}, Themes: {{ output.themes }}{% if not loop.last %} | {% endif %}{% endfor %}. Provide a final summary and list of common themes for each category.",
         "output": {"schema": {"summary": "string", "themes": "list[string]"}},
         "fold_batch_size": 3,
@@ -150,8 +150,8 @@ def test_reduce_operation_non_commutative(default_model, max_threads):
     non_commutative_config = {
         "reduce_key": "sequence",
         "commutative": False,
-        "prompt": "Combine the sentences in '{{ values }}'. Maintain order.",
-        "fold_prompt": "Combine sequences: Previous result '{{ output }}', New value '{{ values[0] }}'. Maintain order.",
+        "prompt": "Combine the sentences in '{{ inputs }}'. Maintain order.",
+        "fold_prompt": "Combine sequences: Previous result '{{ output }}', New value '{{ inputs[0] }}'. Maintain order.",
         "fold_batch_size": 1,
         "output": {"schema": {"combined_result": "string"}},
     }
diff --git a/tests/test_reduce_value_sampling.py b/tests/test_reduce_value_sampling.py
index 7210f584..2ae48e80 100644
--- a/tests/test_reduce_value_sampling.py
+++ b/tests/test_reduce_value_sampling.py
@@ -35,7 +35,7 @@ def test_random_sampling(default_model, max_threads, large_sample_data):
     config = {
         "reduce_key": "group",
         "value_sampling": {"enabled": True, "method": "random", "sample_size": 50},
-        "prompt": "Summarize the following texts: {{ values|map(attribute='text')|join(' | ') }}",
+        "prompt": "Summarize the following texts: {{ inputs|map(attribute='text')|join(' | ') }}",
         "output": {"schema": {"summary": "string"}},
     }
@@ -52,7 +52,7 @@ def test_first_n_sampling(default_model, max_threads, large_sample_data):
     config = {
         "reduce_key": "group",
         "value_sampling": {"enabled": True, "method": "first_n", "sample_size": 100},
-        "prompt": "Summarize the following texts: {{ values|map(attribute='text')|join(' | ') }}",
+        "prompt": "Summarize the following texts: {{ inputs|map(attribute='text')|join(' | ') }}",
         "output": {"schema": {"summary": "string"}},
     }
@@ -75,7 +75,7 @@ def test_cluster_sampling(default_model, max_threads, large_sample_data):
             "embedding_model": "text-embedding-3-small",
             "embedding_keys": ["text"],
         },
-        "prompt": "Summarize the following texts: {{ values|map(attribute='text')|join(' | ') }}",
+        "prompt": "Summarize the following texts: {{ inputs|map(attribute='text')|join(' | ') }}",
         "output": {"schema": {"summary": "string"}},
     }
@@ -99,7 +99,7 @@ def test_semantic_similarity_sampling(default_model, max_threads, large_sample_data):
             "embedding_keys": ["text"],
             "query_text": "technology",
         },
-        "prompt": "Summarize the following texts: {{ values|map(attribute='text')|join(' | ') }}",
+        "prompt": "Summarize the following texts: {{ inputs|map(attribute='text')|join(' | ') }}",
         "output": {"schema": {"summary": "string"}},
     }
@@ -126,7 +126,7 @@ def test_invalid_sampling_method(default_model, max_threads, large_sample_data):
             "method": "invalid_method",
             "sample_size": 50,
         },
-        "prompt": "Summarize the following texts: {{ values|map(attribute='text')|join(' | ') }}",
+        "prompt": "Summarize the following texts: {{ inputs|map(attribute='text')|join(' | ') }}",
         "output": {"schema": {"summary": "string"}},
     }
diff --git a/tests/test_synth_resolve.py b/tests/test_synth_resolve.py
index 24cdba46..4b244588 100644
--- a/tests/test_synth_resolve.py
+++ b/tests/test_synth_resolve.py
@@ -48,7 +48,7 @@ def config_yaml(sample_data):
                 "optimize": False,
                 "reduce_key": "medication",
                 "output": {"schema": {"summary": "string"}},
-                "prompt": "Summarize the usage of the medication '{{ reduce_key }}' based on the following contexts:\n\n{% for item in values %}{{ item.text }}\n{% endfor %}\n\nProvide a brief summary of how this medication is typically used.",
+                "prompt": "Summarize the usage of the medication '{{ reduce_key }}' based on the following contexts:\n\n{% for item in inputs %}{{ item.text }}\n{% endfor %}\n\nProvide a brief summary of how this medication is typically used.",
             },
         },
         "pipeline": {
@@ -97,6 +97,7 @@ def test_synth_resolve(config_yaml):
             assert "reduce_key" in synthesized_op["_intermediates"]
             assert "comparison_prompt" in synthesized_op
             assert "resolution_prompt" in synthesized_op
+            assert "blocking_threshold" in synthesized_op
             break

     if synthesized_resolve_found:
diff --git a/todos.md b/todos.md
index 9e304c17..e4e0b51d 100644
--- a/todos.md
+++ b/todos.md
@@ -65,9 +65,9 @@ TODO:
 - [ ] Support this kind of chunking in the optimizer
 - [x] Extract headers & levels from documents, and add the level hierarchy to the chunk.
 - [x] Support tool use in map operators
-- [ ] Fix DSL to handle inputs like we've done in the Overleaf writeup
-- [ ] Support prompts exceeding context windows; figure out how to throw out data / prioritize elements
+- [x] Support prompts exceeding context windows; figure out how to throw out data / prioritize elements
 - [ ] Support retries in the optimizers
+- [ ] Fix bug in recursively optimizing reduce in the map optimizer
 - [ ] Write tests for optimizers
 - [ ] Filter optimizer
 - [x] Extend map optimizer to support filter
diff --git a/workloads/medical/filter_opt.yaml b/workloads/medical/filter_opt.yaml
index 4ec2ad47..58c2aeed 100644
--- a/workloads/medical/filter_opt.yaml
+++ b/workloads/medical/filter_opt.yaml
@@ -16,11 +16,11 @@ operations:
    count: 1
    split_key: src
    summary_model: gpt-4o-mini
-    summary_prompt: 'Summarize the following chunk: {{ chunk_content }}
+    summary_prompt: "Summarize the following chunk: {{ chunk_content }}

      Identify and highlight any mentions of pain reliever medications in the conversation

-      for clarity in the analysis task.'
+      for clarity in the analysis task."
    type: split
  subfilter_extract_medical_info:
    model: gpt-4o-mini
@@ -28,15 +28,15 @@ operations:
      schema:
        _short_explanation: string
        references_pain_reliever: bool
-    prompt: 'Analyze the provided portion of the conversation:
+    prompt: "Analyze the provided portion of the conversation:

      {{ input.chunk_content }}


      Does this part of the transcript mention a pain reliever medication? Provide
-      the result as a boolean value under the key ''references_pain_reliever''. Only
-      process the main chunk.'
+      the result as a boolean value under the key 'references_pain_reliever'. Only
+      process the main chunk."
    type: filter
  subreduce_extract_medical_info:
    commutative: true
@@ -50,22 +50,23 @@ operations:
        _short_explanation: string
        references_pain_reliever: bool
    pass_through: true
-    prompt: "Combine the following chunk results to determine if the full conversation\
-      \ transcript references a pain reliever medication:\n\n{% for value in values\
+    prompt:
+      "Combine the following chunk results to determine if the full conversation\
+      \ transcript references a pain reliever medication:\n\n{% for value in inputs\
      \ %}\n- {{ value._short_explanation }}\n  (References pain reliever: {{ value.references_pain_reliever\
      \ }})\n{% endfor %}\n\nBased on the information above, does the complete transcript\
      \ mention any pain reliever medication? Provide a final answer and a brief explanation."
    reduce_key:
-    - extract_medical_info_chunk_group_id
+      - extract_medical_info_chunk_group_id
    type: reduce
pipeline:
  output:
    path: /Users/shreyashankar/Documents/hacking/motion-v3/workloads/medical/pain_reliever_references.json
    type: file
  steps:
-  - input: transcripts
-    name: medical_info_extraction
-    operations:
-    - split_extract_medical_info
-    - subfilter_extract_medical_info
-    - subreduce_extract_medical_info
+    - input: transcripts
+      name: medical_info_extraction
+      operations:
+        - split_extract_medical_info
+        - subfilter_extract_medical_info
+        - subreduce_extract_medical_info
diff --git a/workloads/medical/full.yaml b/workloads/medical/full.yaml
index 677539a8..e4b84965 100644
--- a/workloads/medical/full.yaml
+++ b/workloads/medical/full.yaml
@@ -37,7 +37,7 @@ operations:
      Are these medications likely to be the same or closely related?
    resolution_prompt: |
      Given the following matched medication entries:
-      {% for entry in matched_entries %}
+      {% for entry in inputs %}
      Entry {{ loop.index }}: {{ entry.medication }}
      {% endfor %}
diff --git a/workloads/medical/full_opt.yaml b/workloads/medical/full_opt.yaml
index fb35f7e5..13b6859b 100644
--- a/workloads/medical/full_opt.yaml
+++ b/workloads/medical/full_opt.yaml
@@ -45,7 +45,7 @@ operations:
        medication: str
    resolution_prompt: "Given the following matched medication entries:

-      {% for entry in matched_entries %}
+      {% for entry in inputs %}

      Entry {{ loop.index }}: {{ entry.medication }}
diff --git a/workloads/medical/reduce.yaml b/workloads/medical/reduce.yaml
index 69c601b9..43927a67 100644
--- a/workloads/medical/reduce.yaml
+++ b/workloads/medical/reduce.yaml
@@ -19,7 +19,7 @@ operations:
    prompt: |
      Analyze the following doctor-patient transcripts where the medication "{{ reduce_key }}" is mentioned:

-      {% for item in values %}
+      {% for item in inputs %}
      Transcript {{ loop.index }}:
      {{ item.src }}
diff --git a/workloads/medical/reduce_opt.yaml b/workloads/medical/reduce_opt.yaml
index cfba3f97..26ecc50c 100644
--- a/workloads/medical/reduce_opt.yaml
+++ b/workloads/medical/reduce_opt.yaml
@@ -7,7 +7,8 @@ operations:
  summarize_medication_contexts:
    commutative: true
    fold_batch_size: 15
-    fold_prompt: 'Summarize the settings in which patients take the medication "{{
+    fold_prompt:
+      'Summarize the settings in which patients take the medication "{{
      reduce_key }}" by combining the new information in the following doctor-patient
      transcripts with the current summary:
@@ -22,7 +23,7 @@ operations:

      New Transcripts:

-      {% for value in values %}
+      {% for value in inputs %}

      Transcript {{ loop.index + 1 }}:
@@ -44,7 +45,8 @@ operations:
        medication: str
        src: str
    merge_batch_size: 2
-    merge_prompt: 'Combine the summaries produced by the previous fold operations
+    merge_prompt:
+      'Combine the summaries produced by the previous fold operations
      into a comprehensive summary. These fold outputs are based on separate analyses
      of doctor-patient transcripts where the medication "{{ reduce_key }}" is mentioned.
      The goal is to merge these summaries into one cohesive output.
@@ -68,11 +70,12 @@ operations:
    output:
      schema:
        summary: str
-    prompt: 'Analyze the following doctor-patient transcripts where the medication
+    prompt:
+      'Analyze the following doctor-patient transcripts where the medication
      "{{ reduce_key }}" is mentioned:

-      {% for item in values %}
+      {% for item in inputs %}

      Transcript {{ loop.index }}:
@@ -97,14 +100,14 @@ operations:

      '
    reduce_key:
-    - medication
+      - medication
    type: reduce
pipeline:
  output:
    path: /Users/shreyashankar/Documents/hacking/motion-v3/workloads/medical/medication_context_summaries.json
    type: file
  steps:
-  - input: deduped_meds
-    name: medication_context_summarization
-    operations:
-    - summarize_medication_contexts
+    - input: deduped_meds
+      name: medication_context_summarization
+      operations:
+        - summarize_medication_contexts
diff --git a/workloads/medical/resolve.yaml b/workloads/medical/resolve.yaml
index f19248c8..9cafee1e 100644
--- a/workloads/medical/resolve.yaml
+++ b/workloads/medical/resolve.yaml
@@ -38,7 +38,7 @@ operations:
      Are these medications likely to be the same or closely related?
    resolution_prompt: |
      Given the following matched medication entries:
-      {% for entry in matched_entries %}
+      {% for entry in inputs %}
      Entry {{ loop.index }}: {{ entry.medication }}
      {% endfor %}
diff --git a/workloads/medical/resolve_opt.yaml b/workloads/medical/resolve_opt.yaml
index 1ce77855..bb2bf700 100644
--- a/workloads/medical/resolve_opt.yaml
+++ b/workloads/medical/resolve_opt.yaml
@@ -48,7 +48,7 @@ operations:
        medication: str
    resolution_prompt: "Given the following matched medication entries:

-      {% for entry in matched_entries %}
+      {% for entry in inputs %}

      Entry {{ loop.index }}: {{ entry.medication }}
diff --git a/workloads/medical/synth_resolve.yaml b/workloads/medical/synth_resolve.yaml
index 53e858e1..3b0ce981 100644
--- a/workloads/medical/synth_resolve.yaml
+++ b/workloads/medical/synth_resolve.yaml
@@ -34,7 +34,7 @@ operations:
    prompt: |
      Analyze the following transcripts related to the medication "{{ reduce_key }}":

-      {% for item in values %}
+      {% for item in inputs %}
      Transcript {{ loop.index }}:
      {{ item.src }}
      {% endfor %}
diff --git a/workloads/medical/synth_resolve_opt.yaml b/workloads/medical/synth_resolve_opt.yaml
index aea7dc98..2106c96e 100644
--- a/workloads/medical/synth_resolve_opt.yaml
+++ b/workloads/medical/synth_resolve_opt.yaml
@@ -10,7 +10,8 @@ operations:
    output:
      schema:
        medication: list[str]
-    prompt: 'Analyze the following transcript of a conversation between a doctor and
+    prompt:
+      "Analyze the following transcript of a conversation between a doctor and
      a patient:
@@ -21,7 +22,7 @@ operations:

      If no medications are mentioned, return an empty list.

-      '
+      "
    type: map
  summarize_medications:
    name: summarize_medications
@@ -29,11 +30,12 @@ operations:
    output:
      schema:
        symptoms_summary: str
-    prompt: 'Analyze the following transcripts related to the medication "{{ reduce_key
+    prompt:
+      'Analyze the following transcripts related to the medication "{{ reduce_key
      }}":

-      {% for item in values %}
+      {% for item in inputs %}

      Transcript {{ loop.index }}:
@@ -48,11 +50,12 @@ operations:

      '
    reduce_key:
-    - medication
+      - medication
    type: reduce
  synthesized_resolve_0:
    _intermediates:
-      map_prompt: 'Analyze the following transcript of a conversation between a doctor
+      map_prompt:
+        "Analyze the following transcript of a conversation between a doctor
        and a patient:
@@ -63,14 +66,15 @@ operations:

        If no medications are mentioned, return an empty list.

-        '
+        "
      reduce_key:
-      - medication
+        - medication
    blocking_keys:
-    - medication
+      - medication
    blocking_threshold: 0.7778
    comparison_model: gpt-4o-mini
-    comparison_prompt: 'Compare the following two medications extracted from the conversation
+    comparison_prompt:
+      'Compare the following two medications extracted from the conversation
      between a doctor and a patient:
@@ -93,8 +97,9 @@ operations:
      schema:
        medication: string
    resolution_model: gpt-4o-mini
-    resolution_prompt: "Analyze the following duplicate medication entries:\n\n{%\
-      \ for entry in matched_entries %}\nEntry {{ loop.index }}:\n{{ entry | tojson\
+    resolution_prompt:
+      "Analyze the following duplicate medication entries:\n\n{%\
+      \ for entry in inputs %}\nEntry {{ loop.index }}:\n{{ entry | tojson\
      \ }}\n\n{% endfor %}\n\nCreate a single, consolidated medication key that combines\
      \ the information from all duplicate entries. When merging, follow these guidelines:\n\
      1. If medication names appear in different formats or abbreviations, standardize\
@@ -116,10 +121,10 @@ pipeline:
    path: /Users/shreyashankar/Documents/hacking/motion-v3/workloads/medical/summarized_medical_info_synth.json
    type: file
  steps:
-  - input: transcripts
-    name: medical_info_extraction
-    operations:
-    - extract_medications
-    - unnest_medications
-    - synthesized_resolve_0
-    - summarize_medications
+    - input: transcripts
+      name: medical_info_extraction
+      operations:
+        - extract_medications
+        - unnest_medications
+        - synthesized_resolve_0
+        - summarize_medications