Skip to content

Commit

Permalink
fix: entire source code is no longer stored in memory
Browse files Browse the repository at this point in the history
  • Loading branch information
art1f1c3R committed Jan 28, 2025
1 parent 1ece531 commit 38cc36b
Show file tree
Hide file tree
Showing 3 changed files with 125 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import yaml

from macaron.config.defaults import defaults
from macaron.errors import ConfigurationError, HeuristicAnalyzerValueError
from macaron.errors import ConfigurationError, HeuristicAnalyzerValueError, SourceCodeError
from macaron.json_tools import JsonType, json_extract
from macaron.malware_analyzer.pypi_heuristics.heuristics import HeuristicResult
from macaron.slsa_analyzer.package_registry.pypi_registry import PyPIPackageJsonAsset
Expand Down Expand Up @@ -231,31 +231,30 @@ def analyze_dataflow(self, pypi_package_json: PyPIPackageJsonAsset) -> tuple[Heu
analysis_result: dict = {}
result: HeuristicResult = HeuristicResult.SKIP

source_code = pypi_package_json.package_sourcecode
if not source_code:
try:
for filename, content in pypi_package_json.iter_sourcecode():
try:
_ = ast.parse(content.decode("utf-8"))
except (SyntaxError, ValueError) as ast_parse_error:
logger.debug("File %s cannot be parsed as a python file: %s", filename, ast_parse_error)
continue

# tracer = DataFlowTracer()
# tracer.generate_symbol_table(content)

# functioncall_analyzer = FunctionCallAnalyzer(self.suspicious_pattern, tracer)
# is_malware, detail_info = functioncall_analyzer.analyze(content)
# if is_malware:
# result = HeuristicResult.FAIL

# # TODO: Currently, the result collector does not handle the situation where
# # multiple files share the same filename. In the future, this will be replaced with the absolute path.
# if detail_info:
# analysis_result[filename] = detail_info
except SourceCodeError as sourcecode_error:
error_msg = "Unable to retrieve PyPI package source code"
logger.debug(error_msg)
raise HeuristicAnalyzerValueError(error_msg)

for filename, content in source_code.items():
try:
_ = ast.parse(content)
except (SyntaxError, ValueError) as ast_parse_error:
logger.debug("File %s cannot be parsed as a python file: %s", filename, ast_parse_error)
continue

# tracer = DataFlowTracer()
# tracer.generate_symbol_table(content)

# functioncall_analyzer = FunctionCallAnalyzer(self.suspicious_pattern, tracer)
# is_malware, detail_info = functioncall_analyzer.analyze(content)
# if is_malware:
# result = HeuristicResult.FAIL

# # TODO: Currently, the result collector does not handle the situation where
# # multiple files share the same filename. In the future, this will be replaced with the absolute path.
# if detail_info:
# analysis_result[filename] = detail_info
raise HeuristicAnalyzerValueError(error_msg) from sourcecode_error

return result, analysis_result

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,6 @@ def run_check(self, ctx: AnalyzeContext) -> CheckResultData:
component=ctx.component,
pypi_registry=pypi_registry,
package_json={},
package_sourcecode={},
package_sourcecode_path="",
)

Expand Down
132 changes: 102 additions & 30 deletions src/macaron/slsa_analyzer/package_registry/pypi_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import tarfile
import tempfile
import urllib.parse
from collections.abc import Callable
from collections.abc import Callable, Iterator
from dataclasses import dataclass
from datetime import datetime

Expand All @@ -31,6 +31,10 @@
logger: logging.Logger = logging.getLogger(__name__)


def _handle_temp_dir_clean(function: Callable, path: str, onerror: tuple) -> None:
    """Turn a failed ``shutil.rmtree`` step into a ``SourceCodeError``.

    This function is passed as the ``onerror`` callback of ``shutil.rmtree`` when
    temporary source code directories are removed. It never returns normally.

    Parameters
    ----------
    function: Callable
        The function that failed during the removal.
    path: str
        The path that was passed to the failing function.
    onerror: tuple
        The exception information supplied for the failure.

    Raises
    ------
    SourceCodeError
        Always, with a message describing the failed function, path and exception information.
    """
    failure_details = f"Error removing with shutil. function={function}, " f"path={path}, excinfo={onerror}"
    raise SourceCodeError(failure_details)


class PyPIRegistry(PackageRegistry):
"""This class implements the pypi package registry."""

Expand Down Expand Up @@ -187,10 +191,7 @@ def download_package_json(self, url: str) -> dict:

return res_obj

def _handle_temp_dir_clean(self, function: Callable, path: str, onerror: tuple) -> None:
raise SourceCodeError(f"Error removing with shutil. function={function}, " f"path={path}, excinfo={onerror}")

def download_package_sourcecode(self, url: str) -> tuple[dict[str, bytes], str]:
def download_package_sourcecode(self, url: str) -> str:
"""Download the package source code from pypi registry.
Parameters
Expand All @@ -200,11 +201,14 @@ def download_package_sourcecode(self, url: str) -> tuple[dict[str, bytes], str]:
Returns
-------
tuple[dict[str, bytes], str]
A dictionary of filenames and file contents, and the temp directory with the source code.
"""
sourcecode: dict = {}
str
The temp directory with the source code.
Raises
------
InvalidHTTPResponseError
If the HTTP request to the registry fails or an unexpected response is returned.
"""
# Get name of file.
_, _, file_name = url.rpartition("/")
package_name = re.sub(r"\.tar\.gz$", "", file_name)
Expand All @@ -216,7 +220,7 @@ def download_package_sourcecode(self, url: str) -> tuple[dict[str, bytes], str]:
error_msg = f"Unable to find package source code using URL: {url}"
logger.debug(error_msg)
try:
shutil.rmtree(temp_dir, onerror=self._handle_temp_dir_clean)
shutil.rmtree(temp_dir, onerror=_handle_temp_dir_clean)
except SourceCodeError as tempdir_exception:
tempdir_exception_msg = (
f"Unable to cleanup temporary directory {temp_dir} for source code: {tempdir_exception}"
Expand All @@ -235,7 +239,7 @@ def download_package_sourcecode(self, url: str) -> tuple[dict[str, bytes], str]:
error_msg = f"Error while streaming source file: {stream_error}"
logger.debug(error_msg)
try:
shutil.rmtree(temp_dir, onerror=self._handle_temp_dir_clean)
shutil.rmtree(temp_dir, onerror=_handle_temp_dir_clean)
except SourceCodeError as tempdir_exception:
tempdir_exception_msg = (
f"Unable to cleanup temporary directory {temp_dir} for source code: {tempdir_exception}"
Expand All @@ -249,15 +253,11 @@ def download_package_sourcecode(self, url: str) -> tuple[dict[str, bytes], str]:
with tarfile.open(source_file.name, "r:gz") as sourcecode_tar:
sourcecode_tar.extractall(temp_dir, filter="data")

for member in sourcecode_tar.getmembers():
if member.isfile() and (file_obj := sourcecode_tar.extractfile(member)):
sourcecode[member.name] = file_obj.read()

except tarfile.ReadError as read_error:
error_msg = f"Error reading source code tar file: {read_error}"
logger.debug(error_msg)
try:
shutil.rmtree(temp_dir, onerror=self._handle_temp_dir_clean)
shutil.rmtree(temp_dir, onerror=_handle_temp_dir_clean)
except SourceCodeError as tempdir_exception:
tempdir_exception_msg = (
f"Unable to cleanup temporary directory {temp_dir} for source code: {tempdir_exception}"
Expand All @@ -266,11 +266,16 @@ def download_package_sourcecode(self, url: str) -> tuple[dict[str, bytes], str]:

raise InvalidHTTPResponseError(error_msg) from read_error

extracted_dir = os.listdir(temp_dir)
if len(extracted_dir) == 1 and re.sub(".tar.gz$", "", file_name) == extracted_dir[0]:
# structure used package name and version as top-level directory
temp_dir = os.path.join(temp_dir, extracted_dir[0])

else:
error_msg = f"Unable to extract source code from file {file_name}"
logger.debug(error_msg)
try:
shutil.rmtree(temp_dir, onerror=self._handle_temp_dir_clean)
shutil.rmtree(temp_dir, onerror=_handle_temp_dir_clean)
except SourceCodeError as tempdir_exception:
tempdir_exception_msg = (
f"Unable to cleanup temporary directory {temp_dir} for source code: {tempdir_exception}"
Expand All @@ -281,7 +286,7 @@ def download_package_sourcecode(self, url: str) -> tuple[dict[str, bytes], str]:
raise InvalidHTTPResponseError(error_msg)

logger.debug("Temporary download and unzip of %s stored in %s", file_name, temp_dir)
return sourcecode, temp_dir
return temp_dir

def get_package_page(self, package_name: str) -> str | None:
"""Implement custom API to get package main page.
Expand Down Expand Up @@ -401,9 +406,6 @@ class PyPIPackageJsonAsset:
#: The asset content.
package_json: dict

#: The source code of the package hosted on PyPI
package_sourcecode: dict

#: the source code temporary location name
package_sourcecode_path: str

Expand Down Expand Up @@ -537,7 +539,7 @@ def get_latest_release_upload_time(self) -> str | None:
return None

def download_sourcecode(self) -> bool:
"""Get the source code of the package and store it in the package_sourcecode attribute.
"""Get the source code of the package and store it in a temporary directory.
Returns
-------
Expand All @@ -547,26 +549,22 @@ def download_sourcecode(self) -> bool:
url = self.get_sourcecode_url()
if url:
try:
self.package_sourcecode, self.package_sourcecode_path = self.pypi_registry.download_package_sourcecode(
url
)
self.package_sourcecode_path = self.pypi_registry.download_package_sourcecode(url)
return True
except InvalidHTTPResponseError as error:
logger.debug(error)
return False

def _handle_temp_dir_clean(self, function: Callable, path: str, onerror: tuple) -> None:
raise SourceCodeError(f"Error removing with shutil. function={function}, " f"path={path}, excinfo={onerror}")

def cleanup_sourcecode(self) -> None:
"""
Delete the temporary directory created when downloading the source code.
The package source code is no longer accessible after this.
The package source code is no longer accessible after this, and the package_sourcecode_path
attribute is set to an empty string.
"""
if self.package_sourcecode_path:
try:
shutil.rmtree(self.package_sourcecode_path, onerror=self._handle_temp_dir_clean)
shutil.rmtree(self.package_sourcecode_path, onerror=_handle_temp_dir_clean)
self.package_sourcecode_path = ""
except SourceCodeError as tempdir_exception:
tempdir_exception_msg = (
Expand All @@ -575,3 +573,77 @@ def cleanup_sourcecode(self) -> None:
)
logger.debug(tempdir_exception_msg)
raise tempdir_exception

def get_sourcecode_file_contents(self, path: str) -> bytes:
    """
    Read one downloaded source code file and return its raw bytes.

    A relative path is resolved against the package_sourcecode_path attribute; an
    absolute path is used as given.

    Parameters
    ----------
    path: str
        The file to open, either absolute or relative to package_sourcecode_path.

    Returns
    -------
    bytes
        The raw contents of the source code file.

    Raises
    ------
    SourceCodeError
        if the source code has not been downloaded, the file cannot be located,
        or there is an error reading the file.
    """
    source_root = self.package_sourcecode_path
    if not source_root:
        error_msg = "No source code files have been downloaded"
        logger.debug(error_msg)
        raise SourceCodeError(error_msg)

    # Only relative paths are anchored to the download directory.
    path = path if os.path.isabs(path) else os.path.join(source_root, path)

    if os.path.exists(path):
        try:
            with open(path, "rb") as source_file:
                return source_file.read()
        except OSError as read_error:
            error_msg = f"Unable to read file {path}: {read_error}"
            logger.debug(error_msg)
            raise SourceCodeError(error_msg) from read_error

    error_msg = f"Unable to locate file {path}"
    logger.debug(error_msg)
    raise SourceCodeError(error_msg)

def iter_sourcecode(self) -> Iterator[tuple[str, bytes]]:
    """
    Iterate through all source code files.

    Walks the directory tree rooted at the package_sourcecode_path attribute and yields
    each file lazily, so a file is only read when the iteration reaches it. Because this
    is a generator, the errors below surface when it is iterated, not when it is called.

    Yields
    ------
    tuple[str, bytes]
        The source code file path, and the raw contents of the source code file.

    Raises
    ------
    SourceCodeError
        if the source code has not been downloaded.
    OSError
        if a file found during the walk cannot be opened or read.
    """
    if not self.package_sourcecode_path:
        error_msg = "No source code files have been downloaded"
        logger.debug(error_msg)
        raise SourceCodeError(error_msg)

    for root, _directories, files in os.walk(self.package_sourcecode_path):
        # A root of "." is expanded to the current working directory. os.path.join adds
        # the path separator itself; appending os.linesep (a line break, not a separator)
        # here would put a newline inside the path and make every open() fail.
        root_path = os.getcwd() if root == "." else root

        for file in files:
            filepath = os.path.join(root_path, file)

            with open(filepath, "rb") as handle:
                contents = handle.read()

            yield filepath, contents

0 comments on commit 38cc36b

Please sign in to comment.