From bfcd366717d93aa30fa1bd6558d0eeab46ece15b Mon Sep 17 00:00:00 2001 From: Alan Jowett Date: Wed, 23 Oct 2024 12:22:39 -0700 Subject: [PATCH] PR feedback Signed-off-by: Alan Jowett --- libfuzzer/run_corpus.py | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/libfuzzer/run_corpus.py b/libfuzzer/run_corpus.py index fc76cea0..04e06fe7 100644 --- a/libfuzzer/run_corpus.py +++ b/libfuzzer/run_corpus.py @@ -8,6 +8,14 @@ import os import argparse import subprocess +import shlex + +# Maximum size limits in bytes (1MB) +MAX_INSTRUCTIONS_SIZE = 1024 * 1024 +MAX_MEMORY_SIZE = 1024 * 1024 + +# Timeout for plugin execution in seconds +PLUGIN_TIMEOUT = 30 def parse_plugin_options(options_str: str) -> str | list[str]: """Parse plugin options string into either a single string or list of options.""" @@ -20,7 +28,6 @@ def parse_plugin_options(options_str: str) -> str | list[str]: # Remove outer quotes and handle escaped quotes return options_str[1:-1].replace(f"\\{quote}", quote) # Split by spaces, preserving quoted substrings - import shlex return shlex.split(options_str) def parse_corpus_file(corpus_file: str) -> tuple[bytes, bytes]: @@ -32,14 +39,14 @@ def parse_corpus_file(corpus_file: str) -> tuple[bytes, bytes]: print(f'Invalid file format (header too short): {corpus_file}') return None, None instructions_length = int.from_bytes(header, byteorder='little') - if instructions_length <= 0 or instructions_length > 1024*1024: # 1MB limit + if instructions_length <= 0 or instructions_length > MAX_INSTRUCTIONS_SIZE: print(f'Invalid instructions length: {instructions_length}') return None, None instructions = f.read(instructions_length) if len(instructions) != instructions_length: print(f'Truncated instructions in file: {corpus_file}') return None, None - memory = f.read(1024*1024) # Read max 1MB of memory + memory = f.read(MAX_INSTRUCTIONS_SIZE) return instructions, memory except IOError as e: print(f'Error reading file {corpus_file}: {e}') @@ -57,7 +64,7 @@ def run_plugin(plugin_path: str, memory_hex: str, options: str | list[str], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - output, stderr = process.communicate(input=instructions, timeout=30) + output, stderr = process.communicate(input=instructions, timeout=PLUGIN_TIMEOUT) if process.returncode != 0: return None, f'Plugin failed with error: {stderr.decode()}' return output, None