Change logic for chunk comparison and extraction
alighazi288 committed Jan 14, 2025
1 parent 6ea3ea8 commit e81ef60
Showing 3 changed files with 154 additions and 189 deletions.
103 changes: 28 additions & 75 deletions src/borg/archive.py
@@ -720,76 +720,38 @@ def extract_helper(self, item, path, hlm, *, dry_run=False):
             pass
 
     def compare_and_extract_chunks(self, item, fs_path):
-        print(f"Initial fs_path: {fs_path}")
-        print(f"self.cwd: {self.cwd}")
-        if fs_path.startswith(self.cwd):
-            fs_path = fs_path[len(self.cwd) :].lstrip(os.sep)
-            print(f"Relative fs_path: {fs_path}")
-
-        # Construct the final path
-        fs_path = os.path.normpath(os.path.join(self.cwd, fs_path))
-        print(f"Final fs_path: {fs_path}")
-        print(f"File exists at final path: {os.path.isfile(fs_path)}")
-
-        os.makedirs(os.path.dirname(fs_path), exist_ok=True)
+        """Compare file chunks and patch if needed. Returns True if patching succeeded."""
         try:
-            if os.path.isfile(fs_path):
-                with open(fs_path, "rb+") as fs_file:
-                    chunk_offset = 0
-                    for chunk_entry in item.chunks:
-                        chunkid_A = chunk_entry.id
-                        size = chunk_entry.size
-                        print(f"Processing chunk at offset {chunk_offset}")
-
-                        fs_file.seek(chunk_offset)
-                        data_F = fs_file.read(size)
-                        print(f"Read {len(data_F)} bytes at offset {chunk_offset}")
-                        print(f"File content: {data_F[:20]}...")  # Show first 20 bytes
-
-                        if len(data_F) == size:
-                            chunkid_F = self.key.id_hash(data_F)
-                            print("Comparing hashes:")  # Debug
-                            print(f"Archive hash: {chunkid_A.hex()}")  # Debug
-                            print(f"File hash: {chunkid_F.hex()}")  # Debug
-                            print(f"Hashes match? {chunkid_A == chunkid_F}")
-                            if chunkid_A != chunkid_F:
-                                print("Hashes don't match, fetching new chunk")  # Debug
-                                fs_file.seek(chunk_offset)  # Go back to the start of the chunk
-                                chunk_data = b"".join(self.pipeline.fetch_many([chunkid_A], ro_type=ROBJ_FILE_STREAM))
-                                print(f"Fetched content: {chunk_data[:20]}...")
-                                fs_file.write(chunk_data)
-                                fs_file.flush()
-                                print("Wrote and flushed new chunk data")
-                        else:
-                            print(f"Chunk size mismatch at offset {chunk_offset}")
-                            fs_file.seek(chunk_offset)
-                            chunk_data = b"".join(self.pipeline.fetch_many([chunkid_A], ro_type=ROBJ_FILE_STREAM))
-                            fs_file.write(chunk_data)
-
-                        chunk_offset += size
-
-                    fs_file.truncate(item.size)
-                    print(f"\nFinal file size: {os.path.getsize(fs_path)}")
-                    with open(fs_path, "rb") as f:
-                        print(f"Final content: {f.read()[:20]}...")
-            else:
-                with open(fs_path, "wb") as fs_file:
-                    for chunk_entry in item.chunks:
-                        chunk_data = b"".join(self.pipeline.fetch_many([chunk_entry.id], ro_type=ROBJ_FILE_STREAM))
-                        fs_file.write(chunk_data)
-                    fs_file.truncate(item.size)
-
-                with open(fs_path, "rb") as fs_file:
-                    preview = fs_file.read(50)
-                    print(f"Final file size: {os.path.getsize(fs_path)}, Expected: {item.size}")
-                    print(f"Content preview (text): {preview.decode('utf-8', errors='replace')}")
-
-        except OSError as e:
-            print(f"IO error processing {fs_path}: {e}")
-            raise
-        except Exception as e:
-            print(f"Error processing {fs_path}: {str(e)}")
-            raise
+            st = os.stat(fs_path, follow_symlinks=False)
+            if not stat.S_ISREG(st.st_mode):
+                return False
+
+            with open(fs_path, "rb+") as fs_file:
+                chunk_offset = 0
+                for chunk_entry in item.chunks:
+                    chunkid_A = chunk_entry.id
+                    size = chunk_entry.size
+
+                    fs_file.seek(chunk_offset)
+                    data_F = fs_file.read(size)
+
+                    needs_update = True
+                    if len(data_F) == size:
+                        chunkid_F = self.key.id_hash(data_F)
+                        needs_update = chunkid_A != chunkid_F
+
+                    if needs_update:
+                        chunk_data = b"".join(self.pipeline.fetch_many([chunkid_A], ro_type=ROBJ_FILE_STREAM))
+                        fs_file.seek(chunk_offset)
+                        fs_file.write(chunk_data)
+
+                    chunk_offset += size
+
+                fs_file.truncate(item.size)
+                return True
+
+        except (OSError, Exception):
+            return False
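The reworked method walks the archived chunk list in order: for each chunk it reads the corresponding byte range of the existing file, hashes it, and fetches the chunk from the repository only when the hash differs (or the read came up short), truncating to the archived size at the end. A minimal standalone sketch of the same idea outside borg — plain SHA-256 stands in for borg's keyed id_hash, and fetch_chunk plus the (chunk_id, size) pairs are hypothetical stand-ins for pipeline.fetch_many() and item.chunks:

import hashlib
import os
import stat

def patch_file_in_place(path, chunks, fetch_chunk, final_size):
    """chunks: iterable of (chunk_id, size); chunk_id is assumed to be the SHA-256 of the chunk's plaintext."""
    st = os.stat(path, follow_symlinks=False)
    if not stat.S_ISREG(st.st_mode):
        return False  # in-place patching only makes sense for regular files
    with open(path, "rb+") as f:
        offset = 0
        for chunk_id, size in chunks:
            f.seek(offset)
            on_disk = f.read(size)
            # Rewrite this range if the file is short here or its content hash differs.
            if len(on_disk) != size or hashlib.sha256(on_disk).digest() != chunk_id:
                f.seek(offset)
                f.write(fetch_chunk(chunk_id))
            offset += size
        f.truncate(final_size)  # drop stale trailing bytes if the old file was longer
    return True

The final truncate matters: if the on-disk file was longer than the archived one, bytes past the last chunk would otherwise survive the patch.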

@@ -802,7 +764,6 @@ def extract_item(
         hlm=None,
         pi=None,
         continue_extraction=False,
-        check_existing=False,
     ):
         """
         Extract archive item.
@@ -815,7 +776,6 @@ def same_item(item, st):
         :param hlm: maps hlid to link_target for extracting subtrees with hardlinks correctly
         :param pi: ProgressIndicatorPercent (or similar) for file extraction progress (in bytes)
         :param continue_extraction: continue a previously interrupted extraction of same archive
-        :param check_existing: check against existing file/block device and only retrieve changed data
         """
 
         def same_item(item, st):
@@ -836,16 +796,6 @@ def same_item(item, st):
             # if a previous extraction was interrupted between setting the mtime and setting non-default flags.
             return True
 
-        if check_existing:
-            dest = os.path.normpath(self.cwd)
-            fs_path = os.path.join(dest, item.path)
-
-            if not os.path.normpath(fs_path).startswith(dest):
-                raise Exception(f"Path {fs_path} is outside of extraction directory {dest}")
-
-            self.compare_and_extract_chunks(item, fs_path)
-            return
-
         has_damaged_chunks = "chunks_healthy" in item
         if dry_run or stdout:
             with self.extract_helper(item, "", hlm, dry_run=dry_run or stdout) as hardlink_set:
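The deleted check_existing branch also carried a containment check so a crafted item.path could not escape the destination directory (the normpath/startswith test above). A standalone version of just that guard — safe_join is a hypothetical name, and the trailing-separator test is a tightening over the original startswith, which would also have accepted a sibling like dest + "-evil":

import os

def safe_join(dest, relpath):
    """Join relpath onto dest, refusing results that escape dest."""
    dest = os.path.normpath(dest)
    full = os.path.normpath(os.path.join(dest, relpath))
    # Requiring the separator rejects siblings such as "/dest-evil" for dest "/dest".
    if full != dest and not full.startswith(dest + os.sep):
        raise ValueError(f"Path {full} is outside of extraction directory {dest}")
    return full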
@@ -905,6 +855,9 @@ def make_parent(path):
             with self.extract_helper(item, path, hlm) as hardlink_set:
                 if hardlink_set:
                     return
+                if self.compare_and_extract_chunks(item, path):
+                    return
+
                 with backup_io("open"):
                     fd = open(path, "wb")
                 with fd:

Codecov / codecov/patch — check warning on line 859 in src/borg/archive.py: Added line #L859 was not covered by tests.
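With the call added above, extract_item's regular-file path now tries the cheapest strategy first: reuse via hardlink, then in-place patching of an existing file, then a full write of every chunk. A toy sketch of that fall-through shape — all three strategy functions are hypothetical stand-ins, not borg APIs:

def try_hardlink(path: str) -> bool:
    return False  # analogue of hardlink_set: the file was materialized as a hardlink

def try_inplace_patch(path: str) -> bool:
    return False  # analogue of compare_and_extract_chunks()

def full_write(path: str) -> None:
    print(f"full extraction of {path}")  # last resort: fetch and write every chunk

def extract_regular_file(path: str) -> None:
    # Each strategy returns True when it fully handled the file.
    for strategy in (try_hardlink, try_inplace_patch):
        if strategy(path):
            return
    full_write(path)

extract_regular_file("demo.txt")  # prints: full extraction of demo.txt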
8 changes: 0 additions & 8 deletions src/borg/archiver/extract_cmd.py
@@ -44,7 +44,6 @@ def do_extract(self, args, repository, manifest, archive):
         sparse = args.sparse
         strip_components = args.strip_components
         continue_extraction = args.continue_extraction
-        check_existing = args.check_existing
         dirs = []
         hlm = HardLinkManager(id_type=bytes, info_type=str)  # hlid -> path
 
@@ -97,7 +96,6 @@ def do_extract(self, args, repository, manifest, archive):
                 hlm=hlm,
                 pi=pi,
                 continue_extraction=continue_extraction,
-                check_existing=check_existing,
             )
         except BackupError as e:
             self.print_warning_instance(BackupWarning(remove_surrogates(orig_path), e))
@@ -194,12 +192,6 @@ def build_parser_extract(self, subparsers, common_parser, mid_common_parser):
             action="store_true",
             help="continue a previously interrupted extraction of same archive",
         )
-        subparser.add_argument(
-            "--check-existing",
-            dest="check_existing",
-            action="store_true",
-            help="check against existing file/block device and only retrieve changed data",
-        )
         subparser.add_argument("name", metavar="NAME", type=archivename_validator, help="specify the archive name")
         subparser.add_argument(
             "paths", metavar="PATH", nargs="*", type=PathSpec, help="paths to extract; patterns are supported"
