Skip to content

Commit

Permalink
PR feedback
Browse files Browse the repository at this point in the history
Signed-off-by: Alan Jowett <[email protected]>
  • Loading branch information
Alan Jowett committed Oct 23, 2024
1 parent ab0aad6 commit bfcd366
Showing 1 changed file with 11 additions and 4 deletions.
15 changes: 11 additions & 4 deletions libfuzzer/run_corpus.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,14 @@
import os
import argparse
import subprocess
import shlex

# Maximum size limits in bytes (1MB)
MAX_INSTRUCTIONS_SIZE = 1024 * 1024
MAX_MEMORY_SIZE = 1024 * 1024

# Timeout for plugin execution in seconds
PLUGIN_TIMEOUT = 30

def parse_plugin_options(options_str: str) -> str | list[str]:
"""Parse plugin options string into either a single string or list of options."""
Expand All @@ -20,7 +28,6 @@ def parse_plugin_options(options_str: str) -> str | list[str]:
# Remove outer quotes and handle escaped quotes
return options_str[1:-1].replace(f"\\{quote}", quote)
# Split by spaces, preserving quoted substrings
import shlex
return shlex.split(options_str)

def parse_corpus_file(corpus_file: str) -> tuple[bytes, bytes]:
Expand All @@ -32,14 +39,14 @@ def parse_corpus_file(corpus_file: str) -> tuple[bytes, bytes]:
print(f'Invalid file format (header too short): {corpus_file}')
return None, None
instructions_length = int.from_bytes(header, byteorder='little')
if instructions_length <= 0 or instructions_length > 1024*1024: # 1MB limit
if instructions_length <= 0 or instructions_length > MAX_INSTRUCTIONS_SIZE:
print(f'Invalid instructions length: {instructions_length}')
return None, None
instructions = f.read(instructions_length)
if len(instructions) != instructions_length:
print(f'Truncated instructions in file: {corpus_file}')
return None, None
memory = f.read(1024*1024) # Read max 1MB of memory
memory = f.read(MAX_INSTRUCTIONS_SIZE)
return instructions, memory
except IOError as e:
print(f'Error reading file {corpus_file}: {e}')
Expand All @@ -57,7 +64,7 @@ def run_plugin(plugin_path: str, memory_hex: str, options: str | list[str],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
output, stderr = process.communicate(input=instructions, timeout=30)
output, stderr = process.communicate(input=instructions, timeout=PLUGIN_TIMEOUT)
if process.returncode != 0:
return None, f'Plugin failed with error: {stderr.decode()}'
return output, None
Expand Down

0 comments on commit bfcd366

Please sign in to comment.