diff --git a/src/borg/archive.py b/src/borg/archive.py index e1cb7f4530..1cadf0cb12 100644 --- a/src/borg/archive.py +++ b/src/borg/archive.py @@ -719,38 +719,82 @@ def extract_helper(self, item, path, hlm, *, dry_run=False): # In this case, we *want* to extract twice, because there is no other way. pass - def compare_and_extract_chunks(self, item, fs_path): + def compare_and_extract_chunks(self, item, fs_path, st=None, *, pi=None, sparse=False): """Compare file chunks and patch if needed. Returns True if patching succeeded.""" - try: - st = os.stat(fs_path, follow_symlinks=False) - if not stat.S_ISREG(st.st_mode): - return False - - with open(fs_path, "rb+") as fs_file: - chunk_offset = 0 - for chunk_entry in item.chunks: - chunkid_A = chunk_entry.id - size = chunk_entry.size + if st is None or not stat.S_ISREG(st.st_mode): + return False - fs_file.seek(chunk_offset) - data_F = fs_file.read(size) + try: + # First pass: Build fs chunks list + fs_chunks = [] + offset = 0 + with backup_io("open"): + fs_file = open(fs_path, "rb") + with fs_file: + for chunk in item.chunks: + with backup_io("seek"): + fs_file.seek(offset) + with backup_io("read"): + data = fs_file.read(chunk.size) + if len(data) != chunk.size: + fs_chunks.append(None) + else: + fs_chunks.append(ChunkListEntry(id=self.key.id_hash(data), size=chunk.size)) + offset += chunk.size - needs_update = True - if len(data_F) == size: - chunkid_F = self.key.id_hash(data_F) - needs_update = chunkid_A != chunkid_F + # Compare chunks and collect needed chunk IDs + needed_chunks = [] + for fs_chunk, item_chunk in zip(fs_chunks, item.chunks): + if fs_chunk is None or fs_chunk.id != item_chunk.id: + needed_chunks.append(item_chunk) - if needs_update: - chunk_data = b"".join(self.pipeline.fetch_many([chunkid_A], ro_type=ROBJ_FILE_STREAM)) - fs_file.seek(chunk_offset) - fs_file.write(chunk_data) + if not needed_chunks: + return True - chunk_offset += size + # Fetch all needed chunks and iterate through ALL of them + chunk_data_iter = self.pipeline.fetch_many( + [chunk.id for chunk in needed_chunks], is_preloaded=True, ro_type=ROBJ_FILE_STREAM + ) - fs_file.truncate(item.size) - return True + # Second pass: Update file and consume EVERY chunk from the iterator + offset = 0 + item_chunk_size = 0 + with backup_io("open"): + fs_file = open(fs_path, "rb+") + with fs_file: + for fs_chunk, item_chunk in zip(fs_chunks, item.chunks): + with backup_io("seek"): + fs_file.seek(offset) + if fs_chunk is not None and fs_chunk.id == item_chunk.id: + offset += item_chunk.size + item_chunk_size += item_chunk.size + else: + chunk_data = next(chunk_data_iter) + if pi: + pi.show(increase=len(chunk_data), info=[remove_surrogates(item.path)]) + with backup_io("write"): + if sparse and not chunk_data.strip(b"\0"): + fs_file.seek(len(chunk_data), 1) # Seek over sparse section + offset += len(chunk_data) + else: + fs_file.write(chunk_data) + offset += len(chunk_data) + item_chunk_size += len(chunk_data) + with backup_io("truncate_and_attrs"): + fs_file.truncate(item.size) + fs_file.flush() + self.restore_attrs(fs_path, item, fd=fs_file.fileno()) + + # Size verification like extract_item + if "size" in item and item.size != item_chunk_size: + raise BackupError(f"Size inconsistency detected: size {item.size}, chunks size {item_chunk_size}") + + # Damaged chunks check like extract_item + if "chunks_healthy" in item and not item.chunks_healthy: + raise BackupError("File has damaged (all-zero) chunks. Try running borg check --repair.") - except (OSError, Exception): + return True + except OSError: return False def extract_item( @@ -855,7 +899,7 @@ def make_parent(path): with self.extract_helper(item, path, hlm) as hardlink_set: if hardlink_set: return - if self.compare_and_extract_chunks(item, path): + if self.compare_and_extract_chunks(item, path, pi=pi, sparse=sparse): return with backup_io("open"): diff --git a/src/borg/testsuite/archive_test.py b/src/borg/testsuite/archive_test.py index 3304141495..483d19d6a9 100644 --- a/src/borg/testsuite/archive_test.py +++ b/src/borg/testsuite/archive_test.py @@ -426,27 +426,31 @@ def __init__(self): extractor.pipeline = cache extractor.key = key extractor.cwd = str(tmpdir) + extractor.restore_attrs = Mock() # Track fetched chunks across tests fetched_chunks = [] - def create_mock_chunks(test_data, chunk_size=512): - """Helper function to create mock chunks from test data""" + def create_mock_chunks(item_data, chunk_size=4): + """Helper function to create mock chunks from archive data""" chunks = [] - for i in range(0, len(test_data), chunk_size): - chunk_data = test_data[i : i + chunk_size] + for i in range(0, len(item_data), chunk_size): + chunk_data = item_data[i : i + chunk_size] chunk_id = key.id_hash(chunk_data) chunks.append(Mock(id=chunk_id, size=len(chunk_data))) cache.objects[chunk_id] = chunk_data - item = Mock(chunks=chunks, size=len(test_data)) - target_path = str(tmpdir.join("test.txt")) - return item, target_path + item = Mock(spec=["chunks", "size", "__contains__", "get"]) + item.chunks = chunks # Use actual list for chunks + item.size = len(item_data) + item.__contains__ = lambda self, item: item == "size" - def mock_fetch_many(chunk_ids, ro_type): + return item, str(tmpdir.join("test.txt")) + + def mock_fetch_many(chunk_ids, is_preloaded=True, ro_type=None): """Helper function to track and mock chunk fetching""" fetched_chunks.extend(chunk_ids) - return [cache.objects[chunk_id] for chunk_id in chunk_ids] + return iter([cache.objects[chunk_id] for chunk_id in chunk_ids]) def clear_fetched_chunks(): """Helper function to clear tracked chunks between tests""" @@ -462,99 +466,85 @@ def get_fetched_chunks(): @pytest.mark.parametrize( - "name, test_data, initial_data, expected_fetched_chunks, expected_success", + "name, item_data, fs_data, expected_fetched_chunks", [ ( "no_changes", - b"A" * 512, # One complete chunk, no changes needed - b"A" * 512, # Identical content + b"1111", # One complete chunk, no changes needed + b"1111", # Identical content 0, # No chunks should be fetched - True, ), ( "single_chunk_change", - b"A" * 512 + b"B" * 512, # Two chunks - b"A" * 512 + b"X" * 512, # Second chunk different + b"11112222", # Two chunks + b"1111XXXX", # Second chunk different 1, # Only second chunk should be fetched - True, ), ( "cross_boundary_change", - b"A" * 512 + b"B" * 512, # Two chunks - b"A" * 500 + b"X" * 24, # Change crosses chunk boundary + b"11112222", # Two chunks + b"111XX22", # Change crosses chunk boundary 2, # Both chunks need update - True, ), ( "exact_multiple_chunks", - b"A" * 512 + b"B" * 512 + b"C" * 512, # Three complete chunks - b"A" * 512 + b"X" * 512 + b"C" * 512, # Middle chunk different + b"11112222333", # Three chunks (last one partial) + b"1111XXXX333", # Middle chunk different 1, # Only middle chunk fetched - True, ), ( "first_chunk_change", - b"A" * 512 + b"B" * 512, # Two chunks - b"X" * 512 + b"B" * 512, # First chunk different + b"11112222", # Two chunks + b"XXXX2222", # First chunk different 1, # Only first chunk should be fetched - True, ), ( "all_chunks_different", - b"A" * 512 + b"B" * 512, # Two chunks - b"X" * 512 + b"Y" * 512, # Both chunks different + b"11112222", # Two chunks + b"XXXXYYYY", # Both chunks different 2, # Both chunks should be fetched - True, ), ( "partial_last_chunk", - b"A" * 512 + b"B" * 100, # One full chunk + partial - b"A" * 512 + b"X" * 100, # Partial chunk different + b"111122", # One full chunk + partial + b"1111XX", # Partial chunk different 1, # Only second chunk should be fetched - True, ), ], ) -def test_compare_and_extract_chunks( - setup_extractor, name, test_data, initial_data, expected_fetched_chunks, expected_success -): +def test_compare_and_extract_chunks(setup_extractor, name, item_data, fs_data, expected_fetched_chunks): """Test chunk comparison and extraction""" extractor, key, cache, tmpdir, create_mock_chunks, get_fetched_chunks, clear_fetched_chunks = setup_extractor clear_fetched_chunks() - item, target_path = create_mock_chunks(test_data, chunk_size=512) + chunk_size = 4 + item, target_path = create_mock_chunks(item_data, chunk_size=chunk_size) original_chunk_ids = [chunk.id for chunk in item.chunks] # Write initial file state with open(target_path, "wb") as f: - f.write(initial_data) - - result = extractor.compare_and_extract_chunks(item, target_path) - assert result == expected_success - - if expected_success: - # Verify only the expected chunks were fetched - fetched_chunks = get_fetched_chunks() - assert ( - len(fetched_chunks) == expected_fetched_chunks - ), f"Expected {expected_fetched_chunks} chunks to be fetched, got {len(fetched_chunks)}" - - # For single chunk changes, verify it's the correct chunk - if expected_fetched_chunks == 1: - # Find which chunk should have changed by comparing initial_data with test_data - for i, (orig_chunk, mod_chunk) in enumerate( - zip( - [test_data[i : i + 512] for i in range(0, len(test_data), 512)], - [initial_data[i : i + 512] for i in range(0, len(initial_data), 512)], - ) - ): - if orig_chunk != mod_chunk: - assert ( - fetched_chunks[0] == original_chunk_ids[i] - ), f"Wrong chunk fetched. Expected chunk at position {i}" - break - - # Verify final content - with open(target_path, "rb") as f: - assert f.read() == test_data + f.write(fs_data) + + st = os.stat(target_path) + result = extractor.compare_and_extract_chunks(item, target_path, st=st) # Pass st parameter + assert result + + # Verify only the expected chunks were fetched + fetched_chunks = get_fetched_chunks() + assert len(fetched_chunks) == expected_fetched_chunks + + # For single chunk changes, verify it's the correct chunk + if expected_fetched_chunks == 1: + item_chunks = [item_data[i : i + chunk_size] for i in range(0, len(item_data), chunk_size)] + fs_chunks = [fs_data[i : i + chunk_size] for i in range(0, len(fs_data), chunk_size)] + + # Find which chunk should have changed by comparing item_data with fs_data + for i, (item_chunk, fs_chunk) in enumerate(zip(item_chunks, fs_chunks)): + if item_chunk != fs_chunk: + assert fetched_chunks[0] == original_chunk_ids[i] + break + + # Verify final content + with open(target_path, "rb") as f: + assert f.read() == item_data