From a1ccd21e2dac6bf19c06456d392ce5292f0d9c27 Mon Sep 17 00:00:00 2001
From: Balaji Veeramani
Date: Tue, 11 Jun 2024 18:08:48 -0700
Subject: [PATCH] [Data] Make `ExecutionPlan.execute` return `RefBundle` instead of `BlockList` (#45852)

This PR is part of a larger effort to remove `LazyBlockList`.

Currently, `ExecutionPlan.execute()` doesn't always perform execution.
Sometimes, it skips execution and returns a `LazyBlockList`. This
`LazyBlockList` has a code path, separate from the standard streaming
execution code path, for loading data.

This PR updates the `execute()` implementation so that it always performs
execution and returns a `RefBundle` rather than a `BlockList`.

Signed-off-by: Balaji Veeramani
Signed-off-by: Balaji Veeramani
Co-authored-by: Scott Lee
---
 .../logical/interfaces/logical_operator.py | 4 +
 .../logical/operators/read_operator.py | 5 +-
 python/ray/data/_internal/plan.py | 116 ++++------------
 python/ray/data/dataset.py | 79 +++++------
 python/ray/data/datasource/datasource.py | 4 +
 .../data/datasource/file_based_datasource.py | 3 +
 .../ray/data/datasource/parquet_datasource.py | 3 +
 python/ray/data/tests/conftest.py | 12 +-
 python/ray/data/tests/test_binary.py | 1 -
 python/ray/data/tests/test_consumption.py | 22 +---
 python/ray/data/tests/test_csv.py | 9 +-
 python/ray/data/tests/test_exceptions.py | 5 +-
 python/ray/data/tests/test_formats.py | 2 +-
 python/ray/data/tests/test_json.py | 8 +-
 python/ray/data/tests/test_parquet.py | 47 +++----
 python/ray/data/tests/test_sort.py | 4 +-
 python/ray/data/tests/test_split.py | 18 +--
 python/ray/data/tests/test_splitblocks.py | 4 +-
 python/ray/data/tests/test_sql.py | 58 ++++----
 python/ray/data/tests/test_stats.py | 124 ++++++++++++++++--
 python/ray/data/tests/test_tensor.py | 4 +-
 python/ray/data/tests/test_text.py | 1 -
 22 files changed, 270 insertions(+), 263 deletions(-)

diff --git a/python/ray/data/_internal/logical/interfaces/logical_operator.py b/python/ray/data/_internal/logical/interfaces/logical_operator.py
index 20ca42c0c3743..6c0cbfee098b3 100644
--- a/python/ray/data/_internal/logical/interfaces/logical_operator.py
+++ b/python/ray/data/_internal/logical/interfaces/logical_operator.py
@@ -70,3 +70,7 @@ def num_rows(self) -> Optional[int]:
         actual computation.
""" return None + + def input_files(self) -> Optional[List[str]]: + """The input files of this operator, or ``None`` if not known.""" + return None diff --git a/python/ray/data/_internal/logical/operators/read_operator.py b/python/ray/data/_internal/logical/operators/read_operator.py index bb5e4f0dd9609..1cecb9540f353 100644 --- a/python/ray/data/_internal/logical/operators/read_operator.py +++ b/python/ray/data/_internal/logical/operators/read_operator.py @@ -1,4 +1,4 @@ -from typing import Any, Dict, Optional, Union +from typing import Any, Dict, List, Optional, Union from ray.data._internal.logical.operators.map_operator import AbstractMap from ray.data.datasource.datasource import Datasource, Reader @@ -48,3 +48,6 @@ def schema(self): def num_rows(self): return self._datasource.num_rows() + + def input_files(self) -> Optional[List[str]]: + return self._datasource.input_files() diff --git a/python/ray/data/_internal/plan.py b/python/ray/data/_internal/plan.py index bd3300e43014f..02543a556de92 100644 --- a/python/ray/data/_internal/plan.py +++ b/python/ray/data/_internal/plan.py @@ -1,7 +1,7 @@ import copy import itertools import logging -from typing import TYPE_CHECKING, Iterator, Optional, Tuple, Type, Union +from typing import TYPE_CHECKING, Iterator, List, Optional, Tuple, Type, Union import pyarrow @@ -90,6 +90,8 @@ def __init__( self._run_by_consumer = run_by_consumer self._dataset_name = None + self._has_started_execution = False + if data_context is None: # Snapshot the current context, so that the config of Datasets is always # determined by the config at the time it was created. @@ -124,10 +126,7 @@ def get_plan_as_string(self, dataset_cls: Type["Dataset"]) -> str: plan_str = "" plan_max_depth = 0 dataset_blocks = None - if ( - self._snapshot_bundle is None - or self._snapshot_operator != self._logical_plan.dag - ): + if not self.has_computed_output(): def generate_logical_plan_string( op: LogicalOperator, @@ -159,21 +158,12 @@ def generate_logical_plan_string( self._logical_plan.dag ) - # Get schema of initial blocks. - if self.needs_eager_execution(): - # In the case where the plan contains only a Read/From operator, - # it is cheap to execute it. - # This allows us to get the most accurate estimates related - # to the dataset, after applying execution plan optimizer rules - # (e.g. number of blocks may change based on parallelism). 
- self.execute() if self._snapshot_blocks is not None: schema = self._get_unified_blocks_schema( self._snapshot_blocks, fetch_if_missing=False ) dataset_blocks = self._snapshot_blocks else: - assert self._in_blocks is not None schema = self._get_unified_blocks_schema( self._in_blocks, fetch_if_missing=False ) @@ -359,10 +349,7 @@ def schema( return self._schema schema = None - if ( - self._snapshot_bundle is not None - and not self._snapshot_operator.output_dependencies - ): + if self.has_computed_output(): schema = unify_block_metadata_schema(self._snapshot_bundle.metadata) elif self._logical_plan.dag.schema() is not None: schema = self._logical_plan.dag.schema() @@ -390,6 +377,10 @@ def schema( def cache_schema(self, schema: Union[type, "pyarrow.lib.Schema"]): self._schema = schema + def input_files(self) -> Optional[List[str]]: + """Get the input files of the dataset, if available.""" + return self._logical_plan.dag.input_files() + def _get_unified_blocks_schema( self, blocks: BlockList, fetch_if_missing: bool = False ) -> Union[type, "pyarrow.lib.Schema"]: @@ -413,14 +404,6 @@ def _get_unified_blocks_schema( return unified_schema if not fetch_if_missing: return None - # Synchronously fetch the schema. - # For lazy block lists, this launches read tasks and fetches block metadata - # until we find the first valid block schema. This is to minimize new - # computations when fetching the schema. - for _, m in blocks.iter_blocks_with_metadata(): - if m.schema is not None and (m.num_rows is None or m.num_rows > 0): - return m.schema - return None def meta_count(self) -> Optional[int]: """Get the number of rows after applying all plan optimizations, if possible. @@ -430,10 +413,7 @@ def meta_count(self) -> Optional[int]: Returns: The number of records of the result Dataset, or None. """ - if ( - self._snapshot_bundle is not None - and not self._snapshot_operator.output_dependencies - ): + if self.has_computed_output(): num_rows = sum(m.num_rows for m in self._snapshot_bundle.metadata) elif self._logical_plan.dag.num_rows() is not None: num_rows = self._logical_plan.dag.num_rows() @@ -468,18 +448,14 @@ def execute_to_iterator( Returns: Tuple of iterator over output blocks and the executor. """ + self._has_started_execution = True # Always used the saved context for execution. ctx = self._context if self.has_computed_output(): - return ( - self.execute( - allow_clear_input_blocks, force_read=False - ).iter_blocks_with_metadata(), - self._snapshot_stats, - None, - ) + bundle = self.execute(allow_clear_input_blocks) + return iter(bundle.blocks), self._snapshot_stats, None from ray.data._internal.execution.legacy_compat import ( execute_to_legacy_block_iterator, @@ -508,20 +484,19 @@ def execute_to_iterator( def execute( self, allow_clear_input_blocks: bool = True, - force_read: bool = False, preserve_order: bool = False, - ) -> BlockList: + ) -> RefBundle: """Execute this plan. Args: allow_clear_input_blocks: Whether we should try to clear the input blocks for each operator. - force_read: Whether to force the read operator to fully execute. preserve_order: Whether to preserve order in execution. Returns: The blocks of the output dataset. """ + self._has_started_execution = True # Always used the saved context for execution. 
context = self._context @@ -540,7 +515,6 @@ def execute( from ray.data._internal.execution.legacy_compat import ( _get_initial_stats_from_plan, execute_to_legacy_block_list, - get_legacy_lazy_block_list_read_only, ) if self.is_from_in_memory_only(): @@ -549,12 +523,6 @@ def execute( # recording unnecessary metrics for an empty plan execution. blocks = self._in_blocks stats = _get_initial_stats_from_plan(self) - elif self.is_read_only(): - # If the Dataset is read-only, get the LazyBlockList without - # executing the plan by only fetching metadata available from - # the input Datasource or Reader without executing its ReadTasks. - blocks = get_legacy_lazy_block_list_read_only(self) - stats = _get_initial_stats_from_plan(self) else: from ray.data._internal.execution.streaming_executor import ( StreamingExecutor, @@ -618,36 +586,19 @@ def collect_stats(cur_stats): # Set the snapshot to the output of the final operator. self._snapshot_blocks = blocks - if not isinstance(blocks, LazyBlockList): - self._snapshot_bundle = RefBundle( - tuple(blocks.iter_blocks_with_metadata()), - owns_blocks=blocks._owned_by_consumer, - ) + self._snapshot_bundle = RefBundle( + tuple(blocks.iter_blocks_with_metadata()), + owns_blocks=blocks._owned_by_consumer, + ) self._snapshot_operator = self._logical_plan.dag self._snapshot_stats = stats self._snapshot_stats.dataset_uuid = self._dataset_uuid + return self._snapshot_bundle - # In the case of a read-only dataset, we replace the - # input LazyBlockList with a copy that includes the - # calculated metadata from initializing the InputDataBuffer. - if self.is_read_only(): - self._in_blocks = blocks - if isinstance(self._snapshot_blocks, LazyBlockList) and force_read: - executed_blocks = self._snapshot_blocks.compute_to_blocklist() - # After executing the snapshot blocks, get its updated stats. - # The snapshot blocks after execution will contain the execution stats. - self._snapshot_stats = self._snapshot_blocks.stats() - self._snapshot_blocks = executed_blocks - assert not isinstance(executed_blocks, LazyBlockList), type(executed_blocks) - self._snapshot_bundle = RefBundle( - tuple(executed_blocks.iter_blocks_with_metadata()), - owns_blocks=executed_blocks._owned_by_consumer, - ) - self._snapshot_operator = self._logical_plan.dag - # When force-read is enabled, we similarly update self._in_blocks. - if self.is_read_only(): - self._in_blocks = self._snapshot_blocks - return self._snapshot_blocks + @property + def has_started_execution(self) -> bool: + """Return ``True`` if this plan has been partially or fully executed.""" + return self._has_started_execution def clear_block_refs(self) -> None: """Clear all cached block references of this plan, including input blocks. @@ -679,22 +630,6 @@ def has_lazy_input(self) -> bool: """Return whether this plan has lazy input blocks.""" return isinstance(self._in_blocks, LazyBlockList) - def needs_eager_execution(self, root_op: Optional[LogicalOperator] = None) -> bool: - """Return whether the LogicalPlan corresponding to `root_op` - should be eagerly executed. By default, the last operator of - the LogicalPlan is used. - - This is often useful for input/read-only plans, - where eager execution fetches accurate metadata for the dataset - without executing the underlying read tasks.""" - if root_op is None: - root_op = self._logical_plan.dag - # Since read tasks will not be scheduled until data is consumed or materialized, - # it is cheap to execute the plan (i.e. run the plan optimizer). 
- # In the case where the data is already in-memory (InputData, - # FromXXX operator), it is similarly also cheap to execute it. - return self.is_from_in_memory_only(root_op) or self.is_read_only(root_op) - def is_read_only(self, root_op: Optional[LogicalOperator] = None) -> bool: """Return whether the LogicalPlan corresponding to `root_op` contains only a Read op. By default, the last operator of @@ -721,8 +656,7 @@ def has_computed_output(self) -> bool: output of this plan. """ return ( - self._snapshot_blocks is not None - and not self._snapshot_blocks.is_cleared() + self._snapshot_bundle is not None and self._snapshot_operator == self._logical_plan.dag ) diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py index 1736f26734ec7..53d0c63dc0cc3 100644 --- a/python/ray/data/dataset.py +++ b/python/ray/data/dataset.py @@ -1392,10 +1392,10 @@ def train(self, data_iterator): "locality_hints must not contain duplicate actor handles" ) - blocks = self._plan.execute() - owned_by_consumer = blocks._owned_by_consumer + bundle = self._plan.execute() + owned_by_consumer = bundle.owns_blocks stats = self._plan.stats() - block_refs, metadata = zip(*blocks.get_blocks_with_metadata()) + block_refs, metadata = zip(*bundle.blocks) if locality_hints is None: blocks = np.array_split(block_refs, n) @@ -1591,11 +1591,11 @@ def split_at_indices(self, indices: List[int]) -> List["MaterializedDataset"]: if indices[0] < 0: raise ValueError("indices must be positive") start_time = time.perf_counter() - block_list = self._plan.execute() + bundle = self._plan.execute() blocks, metadata = _split_at_indices( - block_list.get_blocks_with_metadata(), + bundle.blocks, indices, - block_list._owned_by_consumer, + bundle.owns_blocks, ) split_duration = time.perf_counter() - start_time parent_stats = self._plan.stats() @@ -1605,12 +1605,8 @@ def split_at_indices(self, indices: List[int]) -> List["MaterializedDataset"]: stats = DatasetStats(metadata={"Split": ms}, parent=parent_stats) stats.time_total_s = split_duration - split_block_list = BlockList( - bs, ms, owned_by_consumer=block_list._owned_by_consumer - ) - ref_bundles = _block_list_to_bundles( - split_block_list, block_list._owned_by_consumer - ) + split_block_list = BlockList(bs, ms, owned_by_consumer=bundle.owns_blocks) + ref_bundles = _block_list_to_bundles(split_block_list, bundle.owns_blocks) logical_plan = LogicalPlan(InputData(input_data=ref_bundles)) splits.append( @@ -1618,7 +1614,7 @@ def split_at_indices(self, indices: List[int]) -> List["MaterializedDataset"]: ExecutionPlan( split_block_list, stats, - run_by_consumer=block_list._owned_by_consumer, + run_by_consumer=bundle.owns_blocks, ), logical_plan, ) @@ -1793,32 +1789,25 @@ def union(self, *other: List["Dataset"]) -> "Dataset": Returns: A new dataset holding the rows of the input datasets. 
""" + from ray.data._internal.execution.legacy_compat import ( + get_legacy_lazy_block_list_read_only, + ) start_time = time.perf_counter() - owned_by_consumer = self._plan.execute()._owned_by_consumer + owned_by_consumer = False datasets = [self] + list(other) - bls: List[BlockList] = [] - has_nonlazy = False - for ds in datasets: - bl = ds._plan.execute() - if not isinstance(bl, LazyBlockList): - has_nonlazy = True - bls.append(bl) + has_nonlazy = any(not ds._plan.is_read_only() for ds in datasets) if has_nonlazy: + ops_to_union = [] blocks = [] metadata = [] - ops_to_union = [] - for idx, bl in enumerate(bls): - if isinstance(bl, LazyBlockList): - bs, ms = bl._get_blocks_with_metadata() - else: - assert isinstance(bl, BlockList), type(bl) - bs, ms = bl._blocks, bl._metadata - op_logical_plan = datasets[idx]._plan._logical_plan + for ds in datasets: + bundle = ds._plan.execute() + op_logical_plan = ds._plan._logical_plan ops_to_union.append(op_logical_plan.dag) - blocks.extend(bs) - metadata.extend(ms) + blocks.extend(bundle.block_refs) + metadata.extend(bundle.metadata) blocklist = BlockList(blocks, metadata, owned_by_consumer=owned_by_consumer) logical_plan = LogicalPlan(UnionLogicalOperator(*ops_to_union)) @@ -1837,7 +1826,8 @@ def union(self, *other: List["Dataset"]) -> "Dataset": ] read_task_names.extend(other_read_names) - for bl in bls: + for ds in datasets: + bl = get_legacy_lazy_block_list_read_only(ds._plan) tasks.extend(bl._tasks) block_partition_refs.extend(bl._block_partition_refs) block_partition_meta_refs.extend(bl._block_partition_meta_refs) @@ -2668,7 +2658,7 @@ def num_blocks(self) -> int: "Call `ds.materialize()` to get a `MaterializedDataset`." ) - @ConsumptionAPI(if_more_than_read=True, pattern="Time complexity:") + @ConsumptionAPI def size_bytes(self) -> int: """Return the in-memory size of the dataset. @@ -2678,18 +2668,16 @@ def size_bytes(self) -> int: >>> ds.size_bytes() 80 - Time complexity: O(1) - Returns: The in-memory size of the dataset in bytes, or None if the in-memory size is not known. """ - metadata = self._plan.execute().get_metadata() + metadata = self._plan.execute().metadata if not metadata or metadata[0].size_bytes is None: return None return sum(m.size_bytes for m in metadata) - @ConsumptionAPI(if_more_than_read=True, pattern="Time complexity:") + @ConsumptionAPI def input_files(self) -> List[str]: """Return the list of input files for the dataset. @@ -2699,18 +2687,11 @@ def input_files(self) -> List[str]: >>> ds.input_files() ['ray-example-data/iris.csv'] - Time complexity: O(num input files) - Returns: The list of input files used to create the dataset, or an empty list if the input files is not known. """ - metadata = self._plan.execute().get_metadata() - files = set() - for m in metadata: - for f in m.input_files: - files.add(f) - return list(files) + return list(set(self._plan.input_files())) @ConsumptionAPI def write_parquet( @@ -3629,7 +3610,7 @@ def write_datasink( datasink.on_write_start() self._write_ds = Dataset(plan, logical_plan).materialize() - blocks = ray.get(self._write_ds._plan.execute().get_blocks()) + blocks = ray.get(self._write_ds._plan.execute().block_refs) assert all( isinstance(block, pd.DataFrame) and len(block) == 1 for block in blocks ) @@ -4586,7 +4567,7 @@ def materialize(self) -> "MaterializedDataset": A MaterializedDataset holding the materialized data blocks. 
""" copy = Dataset.copy(self, _deep_copy=True, _as=MaterializedDataset) - copy._plan.execute(force_read=True) + copy._plan.execute() blocks = copy._plan._snapshot_blocks blocks_with_metadata = blocks.get_blocks_with_metadata() if blocks else [] @@ -4675,9 +4656,9 @@ def get_internal_block_refs(self) -> List[ObjectRef[Block]]: Returns: A list of references to this dataset's blocks. """ - blocks = self._plan.execute().get_blocks() + block_refs = self._plan.execute().block_refs self._synchronize_progress_bar() - return blocks + return block_refs @DeveloperAPI def has_serializable_lineage(self) -> bool: diff --git a/python/ray/data/datasource/datasource.py b/python/ray/data/datasource/datasource.py index 3eae6acf7845a..5718624ef85f8 100644 --- a/python/ray/data/datasource/datasource.py +++ b/python/ray/data/datasource/datasource.py @@ -109,6 +109,10 @@ def schema(self) -> Optional[Union[type, "pyarrow.lib.Schema"]]: metadata = [read_task.get_metadata() for read_task in read_tasks] return unify_block_metadata_schema(metadata) + def input_files(self) -> Optional[List[str]]: + """Return a list of input files, or ``None`` if unknown.""" + return None + @Deprecated class Reader: diff --git a/python/ray/data/datasource/file_based_datasource.py b/python/ray/data/datasource/file_based_datasource.py index 12d65600f2203..a24d2700a57d3 100644 --- a/python/ray/data/datasource/file_based_datasource.py +++ b/python/ray/data/datasource/file_based_datasource.py @@ -358,6 +358,9 @@ def _read_stream(self, f: "pyarrow.NativeFile", path: str) -> Iterator[Block]: def supports_distributed_reads(self) -> bool: return self._supports_distributed_reads + def input_files(self) -> Optional[List[str]]: + return self._paths() + def _add_partitions( data: Union["pyarrow.Table", "pd.DataFrame"], partitions: Dict[str, Any] diff --git a/python/ray/data/datasource/parquet_datasource.py b/python/ray/data/datasource/parquet_datasource.py index 703ee63659024..0b2fad2712385 100644 --- a/python/ray/data/datasource/parquet_datasource.py +++ b/python/ray/data/datasource/parquet_datasource.py @@ -468,6 +468,9 @@ def num_rows(self) -> Optional[int]: def schema(self) -> "pyarrow.Schema": return self._inferred_schema + def input_files(self) -> Optional[List[str]]: + return self._pq_paths + def _read_fragments( block_udf, diff --git a/python/ray/data/tests/conftest.py b/python/ray/data/tests/conftest.py index 3a4fcac7e78f4..ad1eb2ef65b17 100644 --- a/python/ray/data/tests/conftest.py +++ b/python/ray/data/tests/conftest.py @@ -218,7 +218,6 @@ def _assert_base_partitioned_ds( num_input_files=2, num_rows=6, schema="{one: int64, two: string}", - num_computed=2, sorted_values=None, ds_take_transform_fn=lambda taken: [[s["one"], s["two"]] for s in taken], sorted_values_transform_fn=lambda sorted_values: sorted_values, @@ -226,7 +225,7 @@ def _assert_base_partitioned_ds( if sorted_values is None: sorted_values = [[1, "a"], [1, "b"], [1, "c"], [3, "e"], [3, "f"], [3, "g"]] # Test metadata ops. - assert ds._plan.execute()._num_computed() == 0 + assert not ds._plan.has_started_execution assert ds.count() == count, f"{ds.count()} != {count}" assert ds.size_bytes() > 0, f"{ds.size_bytes()} <= 0" assert ds.schema() is not None @@ -252,17 +251,8 @@ def _remove_whitespace(ds_str): _remove_whitespace(schema), ) == _remove_whitespace(repr(ds)), ds - if num_computed is not None: - assert ( - ds._plan.execute()._num_computed() == num_computed - ), f"{ds._plan.execute()._num_computed()} != {num_computed}" - # Force a data read. 
values = ds_take_transform_fn(ds.take_all()) - if num_computed is not None: - assert ( - ds._plan.execute()._num_computed() == num_computed - ), f"{ds._plan.execute()._num_computed()} != {num_computed}" actual_sorted_values = sorted_values_transform_fn(sorted(values)) assert ( actual_sorted_values == sorted_values diff --git a/python/ray/data/tests/test_binary.py b/python/ray/data/tests/test_binary.py index 4f5a3e788ffe6..9b9c0543bcb09 100644 --- a/python/ray/data/tests/test_binary.py +++ b/python/ray/data/tests/test_binary.py @@ -193,7 +193,6 @@ def skip_unpartitioned(kv_dict): count=2, num_rows=2, schema="{bytes: binary}", - num_computed=None, sorted_values=[b"1 a\n1 b\n1 c", b"3 e\n3 f\n3 g"], ds_take_transform_fn=lambda t: extract_values("bytes", t), ) diff --git a/python/ray/data/tests/test_consumption.py b/python/ray/data/tests/test_consumption.py index 5151d9b65e4b2..6d878c6d6b664 100644 --- a/python/ray/data/tests/test_consumption.py +++ b/python/ray/data/tests/test_consumption.py @@ -90,7 +90,7 @@ def test_schema_no_execution(ray_start_regular): last_snapshot, ) # We do not kick off the read task by default. - assert ds._plan._in_blocks._num_computed() == 0 + assert not ds._plan.has_started_execution schema = ds.schema() assert schema.names == ["id"] @@ -99,9 +99,8 @@ def test_schema_no_execution(ray_start_regular): last_snapshot = assert_core_execution_metrics_equals( CoreExecutionMetrics(task_count={}), last_snapshot ) - assert ds._plan._in_blocks._num_computed() == 0 # Fetching the schema should not trigger execution of extra read tasks. - assert ds._plan.execute()._num_computed() == 0 + assert not ds._plan.has_started_execution def test_schema_cached(ray_start_regular): @@ -138,11 +137,11 @@ def check_schema_cached(ds, expected_task_count, last_snapshot): def test_count(ray_start_regular): ds = ray.data.range(100, override_num_blocks=10) # We do not kick off the read task by default. - assert ds._plan._in_blocks._num_computed() == 0 + assert not ds._plan.has_started_execution assert ds.count() == 100 # Getting number of rows should not trigger execution of any read tasks # for ray.data.range(), as the number of rows is known beforehand. - assert ds._plan._in_blocks._num_computed() == 0 + assert not ds._plan.has_started_execution assert_core_execution_metrics_equals( CoreExecutionMetrics(task_count={"_get_datasource_or_legacy_reader": 1}) @@ -200,7 +199,7 @@ def delay(row): last_snapshot = assert_core_execution_metrics_equals( CoreExecutionMetrics( task_count={ - "_execute_read_task_split": 20, + "ReadRange": 20, "_get_datasource_or_legacy_reader": 1, } ), @@ -481,7 +480,7 @@ def test_schema_repr(ray_start_regular_shared): def _check_none_computed(ds): # In streaming executor, ds.take() will not invoke partial execution # in LazyBlocklist. - assert ds._plan.execute()._num_computed() == 0 + assert not ds._plan.has_started_execution def test_lazy_loading_exponential_rampup(ray_start_regular_shared): @@ -1238,15 +1237,6 @@ def test_iter_batches_grid(ray_start_regular_shared): assert len(batches[-1]) == num_rows % batch_size -def test_lazy_loading_iter_batches_exponential_rampup(ray_start_regular_shared): - ds = ray.data.range(32, override_num_blocks=8) - expected_num_blocks = [1, 2, 4, 4, 8, 8, 8, 8] - for _, expected in zip(ds.iter_batches(batch_size=None), expected_num_blocks): - # In streaming execution of ds.iter_batches(), there is no partial - # execution so _num_computed() in LazyBlocklist is 0. 
- _check_none_computed(ds) - - def test_union(ray_start_regular_shared): ds = ray.data.range(20, override_num_blocks=10) diff --git a/python/ray/data/tests/test_csv.py b/python/ray/data/tests/test_csv.py index be8bfac48312d..8d042fffc8e69 100644 --- a/python/ray/data/tests/test_csv.py +++ b/python/ray/data/tests/test_csv.py @@ -94,7 +94,7 @@ def test_csv_read(ray_start_regular_shared, fs, data_path, endpoint_url): df = pd.concat([df1, df2], ignore_index=True) assert df.equals(dsdf) # Test metadata ops. - for block, meta in ds._plan.execute().get_blocks_with_metadata(): + for block, meta in ds._plan.execute().blocks: BlockAccessor.for_block(ray.get(block)).size_bytes() == meta.size_bytes # Three files, override_num_blocks=2. @@ -376,7 +376,6 @@ def test_csv_read_many_files_partitioned( num_input_files=num_files, num_rows=num_rows, schema="{one: int64, two: int64}", - num_computed=num_files, sorted_values=sorted( itertools.chain.from_iterable( list( @@ -661,7 +660,7 @@ def keep_expected_partitions(kv_dict): filesystem=fs, override_num_blocks=6, ) - assert_base_partitioned_ds(ds, num_input_files=6, num_computed=6) + assert_base_partitioned_ds(ds, num_input_files=6) assert ray.get(kept_file_counter.get.remote()) == 6 if i == 0: # expect to skip 1 unpartitioned files in the parent of the base directory @@ -730,7 +729,7 @@ def test_csv_roundtrip(ray_start_regular_shared, fs, data_path): ds2df = ds2.to_pandas() assert ds2df.equals(df) # Test metadata ops. - for block, meta in ds2._plan.execute().get_blocks_with_metadata(): + for block, meta in ds2._plan.execute().blocks: BlockAccessor.for_block(ray.get(block)).size_bytes() == meta.size_bytes # Two blocks. @@ -742,7 +741,7 @@ def test_csv_roundtrip(ray_start_regular_shared, fs, data_path): ds2df = ds2.to_pandas() assert pd.concat([df, df2], ignore_index=True).equals(ds2df) # Test metadata ops. - for block, meta in ds2._plan.execute().get_blocks_with_metadata(): + for block, meta in ds2._plan.execute().blocks: BlockAccessor.for_block(ray.get(block)).size_bytes() == meta.size_bytes diff --git a/python/ray/data/tests/test_exceptions.py b/python/ray/data/tests/test_exceptions.py index 9fe8e04195a09..e3db020c19aee 100644 --- a/python/ray/data/tests/test_exceptions.py +++ b/python/ray/data/tests/test_exceptions.py @@ -41,10 +41,7 @@ class FakeException(Exception): with pytest.raises(FakeException) as exc_info: with patch( - ( - "ray.data._internal.execution.legacy_compat." - "get_legacy_lazy_block_list_read_only" - ), + "ray.data._internal.plan.ExecutionPlan.has_computed_output", side_effect=FakeException(), ): ray.data.range(1).materialize() diff --git a/python/ray/data/tests/test_formats.py b/python/ray/data/tests/test_formats.py index b7fb0e488cc3a..900c249d5030c 100644 --- a/python/ray/data/tests/test_formats.py +++ b/python/ray/data/tests/test_formats.py @@ -106,7 +106,7 @@ def test_fsspec_filesystem(ray_start_regular_shared, tmp_path): ds = ray.data.read_parquet([path1, path2], filesystem=fs) # Test metadata-only parquet ops. 
- assert ds._plan.execute()._num_computed() == 0 + assert not ds._plan.has_started_execution assert ds.count() == 6 out_path = os.path.join(tmp_path, "out") diff --git a/python/ray/data/tests/test_json.py b/python/ray/data/tests/test_json.py index 789fb9e3b89cf..2b51657f9a20f 100644 --- a/python/ray/data/tests/test_json.py +++ b/python/ray/data/tests/test_json.py @@ -74,7 +74,7 @@ def test_json_read(ray_start_regular_shared, fs, data_path, endpoint_url): df = pd.concat([df1, df2], ignore_index=True) assert df.equals(dsdf) # Test metadata ops. - for block, meta in ds._plan.execute().get_blocks_with_metadata(): + for block, meta in ds._plan.execute().blocks: BlockAccessor.for_block(ray.get(block)).size_bytes() == meta.size_bytes # Three files, override_num_blocks=2. @@ -237,7 +237,7 @@ def test_zipped_json_read(ray_start_regular_shared, tmp_path): dsdf = ds.to_pandas() assert pd.concat([df1, df2], ignore_index=True).equals(dsdf) # Test metadata ops. - for block, meta in ds._plan.execute().get_blocks_with_metadata(): + for block, meta in ds._plan.execute().blocks: BlockAccessor.for_block(ray.get(block)).size_bytes() # Directory and file, two files. @@ -544,7 +544,7 @@ def test_json_roundtrip(ray_start_regular_shared, fs, data_path): ds2df = ds2.to_pandas() assert ds2df.equals(df) # Test metadata ops. - for block, meta in ds2._plan.execute().get_blocks_with_metadata(): + for block, meta in ds2._plan.execute().blocks: BlockAccessor.for_block(ray.get(block)).size_bytes() == meta.size_bytes if fs is None: @@ -571,7 +571,7 @@ def test_json_roundtrip(ray_start_regular_shared, fs, data_path): ds2df = ds2.to_pandas() assert pd.concat([df, df2], ignore_index=True).equals(ds2df) # Test metadata ops. - for block, meta in ds2._plan.execute().get_blocks_with_metadata(): + for block, meta in ds2._plan.execute().blocks: BlockAccessor.for_block(ray.get(block)).size_bytes() == meta.size_bytes diff --git a/python/ray/data/tests/test_parquet.py b/python/ray/data/tests/test_parquet.py index eba0547a0d13e..ccdb6c96db1f7 100644 --- a/python/ray/data/tests/test_parquet.py +++ b/python/ray/data/tests/test_parquet.py @@ -29,14 +29,6 @@ from ray.tests.conftest import * # noqa -def check_num_computed(ds, streaming_expected) -> None: - # When streaming executor is on, the _num_computed() is affected only - # by the ds.schema() which will still partial read the blocks, but will - # not affected by operations like take() as it's executed via streaming - # executor. - assert ds._plan.execute()._num_computed() == streaming_expected - - def test_include_paths(ray_start_regular_shared, tmp_path): path = os.path.join(tmp_path, "test.txt") table = pa.Table.from_pydict({"animals": ["cat", "dog"]}) @@ -327,29 +319,25 @@ def test_parquet_read_bulk(ray_start_regular_shared, fs, data_path): ds = ray.data.read_parquet_bulk(data_path, filesystem=fs, file_extensions=None) ds.schema() - # Expect individual file paths to be processed successfully. paths = [path1, path2] ds = ray.data.read_parquet_bulk(paths, filesystem=fs) - # Expect precomputed row counts to be missing. - assert ds._meta_count() is None - # Expect to lazily compute all metadata correctly. 
- check_num_computed(ds, 0) - assert ds.count() == 6 - assert ds.size_bytes() > 0 - assert ds.schema() is not None input_files = ds.input_files() assert len(input_files) == 2, input_files assert "test1.parquet" in str(input_files) assert "test2.parquet" in str(input_files) - assert str(ds) == "Dataset(num_rows=6, schema={one: int64, two: string})", ds - assert repr(ds) == "Dataset(num_rows=6, schema={one: int64, two: string})", ds - check_num_computed(ds, 2) + assert not ds._plan.has_started_execution + + # Schema isn't available, so we do a partial read. + assert ds.schema() is not None + assert str(ds) == "Dataset(num_rows=?, schema={one: int64, two: string})", ds + assert repr(ds) == "Dataset(num_rows=?, schema={one: int64, two: string})", ds + assert ds._plan.has_started_execution + assert not ds._plan.has_computed_output() # Forces a data read. values = [[s["one"], s["two"]] for s in ds.take()] - check_num_computed(ds, 2) assert sorted(values) == [ [1, "a"], [2, "b"], @@ -367,10 +355,10 @@ def test_parquet_read_bulk(ray_start_regular_shared, fs, data_path): ds = ray.data.read_parquet_bulk(paths + [txt_path], filesystem=fs) assert ds._plan.initial_num_blocks() == 2 + assert not ds._plan.has_started_execution # Forces a data read. values = [[s["one"], s["two"]] for s in ds.take()] - check_num_computed(ds, 0) assert sorted(values) == [ [1, "a"], [2, "b"], @@ -415,25 +403,22 @@ def test_parquet_read_bulk_meta_provider(ray_start_regular_shared, fs, data_path meta_provider=DefaultFileMetadataProvider(), ) - # Expect precomputed row counts to be missing. - assert ds._meta_count() is None - # Expect to lazily compute all metadata correctly. - check_num_computed(ds, 0) - assert ds.count() == 6 - assert ds.size_bytes() > 0 - assert ds.schema() is not None input_files = ds.input_files() assert len(input_files) == 2, input_files assert "test1.parquet" in str(input_files) assert "test2.parquet" in str(input_files) + assert not ds._plan.has_started_execution + + assert ds.count() == 6 + assert ds.size_bytes() > 0 + assert ds.schema() is not None assert str(ds) == "Dataset(num_rows=6, schema={one: int64, two: string})", ds assert repr(ds) == "Dataset(num_rows=6, schema={one: int64, two: string})", ds - check_num_computed(ds, 2) + assert ds._plan.has_started_execution # Forces a data read. values = [[s["one"], s["two"]] for s in ds.take()] - check_num_computed(ds, 2) assert sorted(values) == [ [1, "a"], [2, "b"], @@ -1045,7 +1030,7 @@ def test_parquet_roundtrip(ray_start_regular_shared, fs, data_path): ds2df = ds2.to_pandas() assert pd.concat([df1, df2], ignore_index=True).equals(ds2df) # Test metadata ops. 
- for block, meta in ds2._plan.execute().get_blocks_with_metadata(): + for block, meta in ds2._plan.execute().blocks: BlockAccessor.for_block(ray.get(block)).size_bytes() == meta.size_bytes if fs is None: shutil.rmtree(path) diff --git a/python/ray/data/tests/test_sort.py b/python/ray/data/tests/test_sort.py index d66bf8f0d7a0d..2246723553ab0 100644 --- a/python/ray/data/tests/test_sort.py +++ b/python/ray/data/tests/test_sort.py @@ -181,7 +181,7 @@ def test_sort_arrow_with_empty_blocks( assert ( len( SortTaskSpec.sample_boundaries( - ds._plan.execute().get_blocks(), SortKey("id"), 3 + ds._plan.execute().block_refs, SortKey("id"), 3 ) ) == 2 @@ -282,7 +282,7 @@ def test_sort_pandas_with_empty_blocks(ray_start_regular, use_push_based_shuffle assert ( len( SortTaskSpec.sample_boundaries( - ds._plan.execute().get_blocks(), SortKey("id"), 3 + ds._plan.execute().block_refs, SortKey("id"), 3 ) ) == 2 diff --git a/python/ray/data/tests/test_split.py b/python/ray/data/tests/test_split.py index c9cf6f7f537b8..4fcebe634c285 100644 --- a/python/ray/data/tests/test_split.py +++ b/python/ray/data/tests/test_split.py @@ -344,32 +344,24 @@ def test_split(ray_start_regular_shared): assert ds._block_num_rows() == [2] * 10 datasets = ds.split(5) - assert [2] * 5 == [ - dataset._plan.execute().initial_num_blocks() for dataset in datasets - ] + assert [2] * 5 == [len(dataset._plan.execute().blocks) for dataset in datasets] assert 190 == sum([dataset.sum("id") for dataset in datasets]) datasets = ds.split(3) - assert [4, 3, 3] == [ - dataset._plan.execute().initial_num_blocks() for dataset in datasets - ] + assert [4, 3, 3] == [len(dataset._plan.execute().blocks) for dataset in datasets] assert 190 == sum([dataset.sum("id") for dataset in datasets]) datasets = ds.split(1) - assert [10] == [ - dataset._plan.execute().initial_num_blocks() for dataset in datasets - ] + assert [10] == [len(dataset._plan.execute().blocks) for dataset in datasets] assert 190 == sum([dataset.sum("id") for dataset in datasets]) datasets = ds.split(10) - assert [1] * 10 == [ - dataset._plan.execute().initial_num_blocks() for dataset in datasets - ] + assert [1] * 10 == [len(dataset._plan.execute().blocks) for dataset in datasets] assert 190 == sum([dataset.sum("id") for dataset in datasets]) datasets = ds.split(11) assert [1] * 10 + [0] == [ - dataset._plan.execute().initial_num_blocks() for dataset in datasets + len(dataset._plan.execute().blocks) for dataset in datasets ] assert 190 == sum([dataset.sum("id") or 0 for dataset in datasets]) diff --git a/python/ray/data/tests/test_splitblocks.py b/python/ray/data/tests/test_splitblocks.py index a4006a9fb67c1..5e6c958006fc0 100644 --- a/python/ray/data/tests/test_splitblocks.py +++ b/python/ray/data/tests/test_splitblocks.py @@ -37,7 +37,7 @@ def test_small_file_split(ray_start_10_cpus_shared, restore_data_context): last_snapshot = assert_core_execution_metrics_equals( CoreExecutionMetrics( task_count={ - "_execute_read_task_split": 1, + "ReadCSV": 1, "_get_datasource_or_legacy_reader": 1, }, ), @@ -76,7 +76,7 @@ def test_small_file_split(ray_start_10_cpus_shared, restore_data_context): last_snapshot = assert_core_execution_metrics_equals( CoreExecutionMetrics( task_count={ - "_execute_read_task_split": 1, + "ReadCSV->SplitBlocks(10)": 1, }, ), last_snapshot, diff --git a/python/ray/data/tests/test_sql.py b/python/ray/data/tests/test_sql.py index 4b42efc3dc368..0c0bc04adc637 100644 --- a/python/ray/data/tests/test_sql.py +++ b/python/ray/data/tests/test_sql.py @@ -265,35 +265,49 @@ def 
request_get_mock(url, params=None, **kwargs): ray.init() # test query with a table name - result = ray.data.read_databricks_tables( - warehouse_id=warehouse_id, - table="table1", - catalog="catalog1", - schema="db1", - override_num_blocks=5, - ).to_pandas() - + result = ( + ray.data.read_databricks_tables( + warehouse_id=warehouse_id, + table="table1", + catalog="catalog1", + schema="db1", + override_num_blocks=5, + ) + .to_pandas() + .sort_values("c1") + .reset_index(drop=True) + ) pd.testing.assert_frame_equal(result, expected_result_df) # test query with SQL - result = ray.data.read_databricks_tables( - warehouse_id=warehouse_id, - query="select * from table1", - catalog="catalog1", - schema="db1", - override_num_blocks=5, - ).to_pandas() + result = ( + ray.data.read_databricks_tables( + warehouse_id=warehouse_id, + query="select * from table1", + catalog="catalog1", + schema="db1", + override_num_blocks=5, + ) + .to_pandas() + .sort_values("c1") + .reset_index(drop=True) + ) pd.testing.assert_frame_equal(result, expected_result_df) # test larger parallelism - result = ray.data.read_databricks_tables( - warehouse_id=warehouse_id, - query="select * from table1", - catalog="catalog1", - schema="db1", - override_num_blocks=100, - ).to_pandas() + result = ( + ray.data.read_databricks_tables( + warehouse_id=warehouse_id, + query="select * from table1", + catalog="catalog1", + schema="db1", + override_num_blocks=100, + ) + .to_pandas() + .sort_values("c1") + .reset_index(drop=True) + ) pd.testing.assert_frame_equal(result, expected_result_df) diff --git a/python/ray/data/tests/test_stats.py b/python/ray/data/tests/test_stats.py index 0c91964c625b7..325f0b43cb804 100644 --- a/python/ray/data/tests/test_stats.py +++ b/python/ray/data/tests/test_stats.py @@ -528,12 +528,43 @@ def test_dataset__repr__(ray_start_regular_shared, restore_data_context): expected_stats = ( "DatasetStatsSummary(\n" " dataset_uuid=N,\n" - " base_name=None,\n" + " base_name=ReadRange,\n" " number=N,\n" - " extra_metrics={},\n" + " extra_metrics={\n" + " num_inputs_received: N,\n" + " bytes_inputs_received: N,\n" + " num_task_inputs_processed: N,\n" + " bytes_task_inputs_processed: N,\n" + " bytes_inputs_of_submitted_tasks: N,\n" + " num_task_outputs_generated: N,\n" + " bytes_task_outputs_generated: N,\n" + " rows_task_outputs_generated: N,\n" + " num_outputs_taken: N,\n" + " bytes_outputs_taken: N,\n" + " num_outputs_of_finished_tasks: N,\n" + " bytes_outputs_of_finished_tasks: N,\n" + " num_tasks_submitted: N,\n" + " num_tasks_running: Z,\n" + " num_tasks_have_outputs: N,\n" + " num_tasks_finished: N,\n" + " num_tasks_failed: Z,\n" + " block_generation_time: N,\n" + " task_submission_backpressure_time: N,\n" + " obj_store_mem_internal_inqueue_blocks: Z,\n" + " obj_store_mem_internal_inqueue: Z,\n" + " obj_store_mem_internal_outqueue_blocks: Z,\n" + " obj_store_mem_internal_outqueue: Z,\n" + " obj_store_mem_pending_task_inputs: Z,\n" + " obj_store_mem_freed: N,\n" + " obj_store_mem_spilled: Z,\n" + " obj_store_mem_used: A,\n" + " cpu_usage: Z,\n" + " gpu_usage: Z,\n" + " ray_remote_args: {'num_cpus': N, 'scheduling_strategy': 'SPREAD'},\n" + " },\n" " operators_stats=[\n" " OperatorStatsSummary(\n" - " operator_name='Read',\n" + " operator_name='ReadRange',\n" " is_suboperator=False,\n" " time_total_s=T,\n" f" block_execution_summary_str={EXECUTION_STRING}\n" @@ -559,7 +590,30 @@ def test_dataset__repr__(ray_start_regular_shared, restore_data_context): " global_bytes_spilled=M,\n" " global_bytes_restored=M,\n" " 
dataset_bytes_spilled=M,\n" - " parents=[],\n" + " parents=[\n" + " DatasetStatsSummary(\n" + " dataset_uuid=unknown_uuid,\n" + " base_name=None,\n" + " number=N,\n" + " extra_metrics={},\n" + " operators_stats=[],\n" + " iter_stats=IterStatsSummary(\n" + " wait_time=T,\n" + " get_time=T,\n" + " iter_blocks_local=None,\n" + " iter_blocks_remote=None,\n" + " iter_unknown_location=None,\n" + " next_time=T,\n" + " format_time=T,\n" + " user_time=T,\n" + " total_time=T,\n" + " ),\n" + " global_bytes_spilled=M,\n" + " global_bytes_restored=M,\n" + " dataset_bytes_spilled=M,\n" + " parents=[],\n" + " ),\n" + " ],\n" ")" ) @@ -649,12 +703,43 @@ def check_stats(): " parents=[\n" " DatasetStatsSummary(\n" " dataset_uuid=N,\n" - " base_name=None,\n" + " base_name=ReadRange,\n" " number=N,\n" - " extra_metrics={},\n" + " extra_metrics={\n" + " num_inputs_received: N,\n" + " bytes_inputs_received: N,\n" + " num_task_inputs_processed: N,\n" + " bytes_task_inputs_processed: N,\n" + " bytes_inputs_of_submitted_tasks: N,\n" + " num_task_outputs_generated: N,\n" + " bytes_task_outputs_generated: N,\n" + " rows_task_outputs_generated: N,\n" + " num_outputs_taken: N,\n" + " bytes_outputs_taken: N,\n" + " num_outputs_of_finished_tasks: N,\n" + " bytes_outputs_of_finished_tasks: N,\n" + " num_tasks_submitted: N,\n" + " num_tasks_running: Z,\n" + " num_tasks_have_outputs: N,\n" + " num_tasks_finished: N,\n" + " num_tasks_failed: Z,\n" + " block_generation_time: N,\n" + " task_submission_backpressure_time: N,\n" + " obj_store_mem_internal_inqueue_blocks: Z,\n" + " obj_store_mem_internal_inqueue: Z,\n" + " obj_store_mem_internal_outqueue_blocks: Z,\n" + " obj_store_mem_internal_outqueue: Z,\n" + " obj_store_mem_pending_task_inputs: Z,\n" + " obj_store_mem_freed: N,\n" + " obj_store_mem_spilled: Z,\n" + " obj_store_mem_used: A,\n" + " cpu_usage: Z,\n" + " gpu_usage: Z,\n" + " ray_remote_args: {'num_cpus': N, 'scheduling_strategy': 'SPREAD'},\n" # noqa: E501 + " },\n" " operators_stats=[\n" " OperatorStatsSummary(\n" - " operator_name='Read',\n" + " operator_name='ReadRange',\n" " is_suboperator=False,\n" " time_total_s=T,\n" f" block_execution_summary_str={EXECUTION_STRING}\n" @@ -680,7 +765,30 @@ def check_stats(): " global_bytes_spilled=M,\n" " global_bytes_restored=M,\n" " dataset_bytes_spilled=M,\n" - " parents=[],\n" + " parents=[\n" + " DatasetStatsSummary(\n" + " dataset_uuid=unknown_uuid,\n" + " base_name=None,\n" + " number=N,\n" + " extra_metrics={},\n" + " operators_stats=[],\n" + " iter_stats=IterStatsSummary(\n" + " wait_time=T,\n" + " get_time=T,\n" + " iter_blocks_local=None,\n" + " iter_blocks_remote=None,\n" + " iter_unknown_location=None,\n" + " next_time=T,\n" + " format_time=T,\n" + " user_time=T,\n" + " total_time=T,\n" + " ),\n" + " global_bytes_spilled=M,\n" + " global_bytes_restored=M,\n" + " dataset_bytes_spilled=M,\n" + " parents=[],\n" + " ),\n" + " ],\n" " ),\n" " ],\n" ")" diff --git a/python/ray/data/tests/test_tensor.py b/python/ray/data/tests/test_tensor.py index 71ed971e6e300..852142bf41500 100644 --- a/python/ray/data/tests/test_tensor.py +++ b/python/ray/data/tests/test_tensor.py @@ -1,3 +1,4 @@ +import math import time import numpy as np @@ -40,7 +41,8 @@ def test_tensors_basic(ray_start_regular_shared): assert str(ds) == ( "Dataset(num_rows=6, schema={data: numpy.ndarray(shape=(3, 5), dtype=int64)})" ) - assert ds.size_bytes() == 5 * 3 * 6 * 8 + # The actual size is slightly larger due to metadata. 
+ assert math.isclose(ds.size_bytes(), 5 * 3 * 6 * 8, rel_tol=0.1) # Test row iterator yields tensors. for tensor in ds.iter_rows(): diff --git a/python/ray/data/tests/test_text.py b/python/ray/data/tests/test_text.py index 47dd736fe3bef..5a3239750eb93 100644 --- a/python/ray/data/tests/test_text.py +++ b/python/ray/data/tests/test_text.py @@ -156,7 +156,6 @@ def skip_unpartitioned(kv_dict): assert_base_partitioned_ds( ds, schema="{text: string}", - num_computed=None, sorted_values=["1 a", "1 b", "1 c", "3 e", "3 f", "3 g"], ds_take_transform_fn=_to_lines, )
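Usage sketch (editor's addition, not part of the patch): a minimal illustration of the behavior change described in the commit message. It uses only names that appear in this diff (`ExecutionPlan.execute`, `RefBundle.blocks`, `block_refs`, `metadata`, `has_started_execution`, `override_num_blocks`); these are Ray Data internals, so treat this as an assumed consumption pattern rather than documented public API.

    import ray

    # Build a small dataset; no read tasks have run yet.
    ds = ray.data.range(8, override_num_blocks=4)
    assert not ds._plan.has_started_execution

    # After this change, execute() always runs the plan (no lazy skip) and
    # returns a RefBundle instead of a (possibly lazy) BlockList.
    bundle = ds._plan.execute()
    assert ds._plan.has_started_execution

    # A RefBundle pairs block object refs with their metadata.
    for block_ref, meta in bundle.blocks:
        block = ray.get(block_ref)  # typically a pyarrow.Table for range()
        print(meta.num_rows, type(block))

    # Convenience accessors used throughout the updated tests.
    total_rows = sum(m.num_rows for m in bundle.metadata)
    refs = bundle.block_refs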