From eeecee225d1dadb0bf11bb4007a3e718ef96c47b Mon Sep 17 00:00:00 2001 From: Dylan Pulver Date: Thu, 4 Jul 2024 19:44:25 -0400 Subject: [PATCH] safety/scan/finder --- safety/scan/finder/file_finder.py | 75 +++++++++++++++++++++++++------ safety/scan/finder/handlers.py | 74 +++++++++++++++++++++++------- 2 files changed, 119 insertions(+), 30 deletions(-) diff --git a/safety/scan/finder/file_finder.py b/safety/scan/finder/file_finder.py index aaacd7ea..1aab8400 100644 --- a/safety/scan/finder/file_finder.py +++ b/safety/scan/finder/file_finder.py @@ -13,6 +13,16 @@ LOG = logging.getLogger(__name__) def should_exclude(excludes: Set[Path], to_analyze: Path) -> bool: + """ + Determines whether a given path should be excluded based on the provided exclusion set. + + Args: + excludes (Set[Path]): Set of paths to exclude. + to_analyze (Path): The path to analyze. + + Returns: + bool: True if the path should be excluded, False otherwise. + """ if not to_analyze.is_absolute(): to_analyze = to_analyze.resolve() @@ -27,7 +37,7 @@ def should_exclude(excludes: Set[Path], to_analyze: Path) -> bool: return True except ValueError: pass - + return False @@ -37,25 +47,46 @@ class FileFinder(): find depending on the language type. """ - def __init__(self, max_level: int, ecosystems: List[Ecosystem], target: Path, - console, live_status=None, - exclude: Optional[List[str]] = None, - include_files: Optional[Dict[FileType, List[Path]]] = None, - handlers: Optional[Set[FileHandler]] = None) -> None: + def __init__( + self, + max_level: int, + ecosystems: List[Ecosystem], + target: Path, + console, + live_status=None, + exclude: Optional[List[str]] = None, + include_files: Optional[Dict[FileType, List[Path]]] = None, + handlers: Optional[Set[FileHandler]] = None + ) -> None: + """ + Initializes the FileFinder with the specified parameters. + + Args: + max_level (int): Maximum directory depth to search. + ecosystems (List[Ecosystem]): List of ecosystems to consider. + target (Path): Target directory to search. + console: Console object for output. + live_status: Live status object for updates. + exclude (Optional[List[str]]): List of patterns to exclude from the search. + include_files (Optional[Dict[FileType, List[Path]]]): Dictionary of files to include in the search. + handlers (Optional[Set[FileHandler]]): Set of file handlers. + """ self.max_level = max_level self.target = target self.include_files = include_files + # If no handlers are provided, initialize them from the ecosystem mapping if not handlers: - handlers = set(ECOSYSTEM_HANDLER_MAPPING[ecosystem]() + handlers = set(ECOSYSTEM_HANDLER_MAPPING[ecosystem]() for ecosystem in ecosystems) - + self.handlers = handlers self.file_count = 0 self.exclude_dirs: Set[Path] = set() self.exclude_files: Set[Path] = set() exclude = [] if not exclude else exclude + # Populate the exclude_dirs and exclude_files sets based on the provided patterns for pattern in exclude: for path in Path(target).glob(pattern): if path.is_dir(): @@ -65,8 +96,18 @@ def __init__(self, max_level: int, ecosystems: List[Ecosystem], target: Path, self.console = console self.live_status = live_status - - def process_directory(self, dir_path, max_deep: Optional[int]=None) -> Tuple[str, Dict[str, Set[Path]]]: + + def process_directory(self, dir_path: str, max_deep: Optional[int] = None) -> Tuple[str, Dict[str, Set[Path]]]: + """ + Processes the specified directory to find files matching the handlers' criteria. + + Args: + dir_path (str): The directory path to process. + max_deep (Optional[int]): Maximum depth to search within the directory. + + Returns: + Tuple[str, Dict[str, Set[Path]]]: The directory path and a dictionary of file types and their corresponding paths. + """ files: Dict[str, Set[Path]] = {} level : int = 0 initial_depth = len(Path(dir_path).parts) - 1 @@ -77,22 +118,24 @@ def process_directory(self, dir_path, max_deep: Optional[int]=None) -> Tuple[str root_path = Path(root) current_depth = len(root_path.parts) - initial_depth + # Filter directories based on exclusion criteria dirs[:] = [d for d in dirs if not should_exclude(excludes=self.exclude_dirs, to_analyze=(root_path / Path(d)))] - if dirs: LOG.info(f"Directories to inspect -> {', '.join(dirs)}") - + LOG.info(f"Current -> {root}") if self.live_status: self.live_status.update(f":mag: Scanning {root}") + # Stop descending into directories if the maximum depth is reached if max_deep is not None and current_depth > max_deep: # Don't go deeper del dirs[:] + # Filter filenames based on exclusion criteria filenames[:] = [f for f in filenames if not should_exclude( - excludes=self.exclude_files, + excludes=self.exclude_files, to_analyze=Path(f))] self.file_count += len(filenames) @@ -111,4 +154,10 @@ def process_directory(self, dir_path, max_deep: Optional[int]=None) -> Tuple[str return dir_path, files def search(self) -> Tuple[str, Dict[str, Set[Path]]]: + """ + Initiates the search for files within the target directory. + + Returns: + Tuple[str, Dict[str, Set[Path]]]: The target directory and a dictionary of file types and their corresponding paths. + """ return self.process_directory(self.target, self.max_level) diff --git a/safety/scan/finder/handlers.py b/safety/scan/finder/handlers.py index 4e2f6966..80a3db6d 100644 --- a/safety/scan/finder/handlers.py +++ b/safety/scan/finder/handlers.py @@ -2,7 +2,7 @@ import os from pathlib import Path from types import MappingProxyType -from typing import Dict, List, Optional, Tuple +from typing import Dict, List, Optional, Optional, Tuple from safety_schemas.models import Ecosystem, FileType @@ -10,11 +10,26 @@ NOT_IMPLEMENTED = "You should implement this." class FileHandler(ABC): - + """ + Abstract base class for file handlers that define how to handle specific types of files + within an ecosystem. + """ + def __init__(self) -> None: self.ecosystem: Optional[Ecosystem] = None def can_handle(self, root: str, file_name: str, include_files: Dict[FileType, List[Path]]) -> Optional[FileType]: + """ + Determines if the handler can handle the given file based on its type and inclusion criteria. + + Args: + root (str): The root directory of the file. + file_name (str): The name of the file. + include_files (Dict[FileType, List[Path]]): Dictionary of file types and their paths to include. + + Returns: + Optional[FileType]: The type of the file if it can be handled, otherwise None. + """ # Keeping it simple for now if not self.ecosystem: @@ -28,54 +43,79 @@ def can_handle(self, root: str, file_name: str, include_files: Dict[FileType, Li return f_type # Let's compare by name only for now - # We can put heavier logic here, but for speed reasons, + # We can put heavier logic here, but for speed reasons, # right now is very basic, we will improve this later. # Custom matching per File Type if file_name.lower().endswith(f_type.value.lower()): return f_type - + return None - + @abstractmethod def download_required_assets(self, session) -> Dict[str, str]: + """ + Abstract method to download required assets for handling files. Should be implemented + by subclasses. + + Args: + session: The session object for making network requests. + + Returns: + Dict[str, str]: A dictionary of downloaded assets. + """ return NotImplementedError(NOT_IMPLEMENTED) class PythonFileHandler(FileHandler): + """ + Handler for Python files within the Python ecosystem. + """ # Example of a Python File Handler - + def __init__(self) -> None: super().__init__() self.ecosystem = Ecosystem.PYTHON - - def download_required_assets(self, session): + + def download_required_assets(self, session) -> None: + """ + Downloads the required assets for handling Python files, specifically the Safety database. + + Args: + session: The session object for making network requests. + """ from safety.safety import fetch_database - + SAFETY_DB_DIR = os.getenv("SAFETY_DB_DIR") db = False if SAFETY_DB_DIR is None else SAFETY_DB_DIR - + # Fetch both the full and partial Safety databases fetch_database(session=session, full=False, db=db, cached=True, - telemetry=True, ecosystem=Ecosystem.PYTHON, + telemetry=True, ecosystem=Ecosystem.PYTHON, from_cache=False) - + fetch_database(session=session, full=True, db=db, cached=True, - telemetry=True, ecosystem=Ecosystem.PYTHON, + telemetry=True, ecosystem=Ecosystem.PYTHON, from_cache=False) class SafetyProjectFileHandler(FileHandler): + """ + Handler for Safety project files within the Safety project ecosystem. + """ # Example of a Python File Handler - + def __init__(self) -> None: super().__init__() self.ecosystem = Ecosystem.SAFETY_PROJECT - - def download_required_assets(self, session): + + def download_required_assets(self, session) -> None: + """ + No required assets to download for Safety project files. + """ pass - +# Mapping of ecosystems to their corresponding file handlers ECOSYSTEM_HANDLER_MAPPING = MappingProxyType({ Ecosystem.PYTHON: PythonFileHandler, Ecosystem.SAFETY_PROJECT: SafetyProjectFileHandler,