Skip to content

Commit

Permalink
Move ordering to ScanOption, add Scanner test
Browse files Browse the repository at this point in the history
  • Loading branch information
EnricoMi committed Nov 15, 2024
1 parent 9c52d5d commit 937695d
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 19 deletions.
4 changes: 2 additions & 2 deletions cpp/src/arrow/dataset/scanner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -932,7 +932,7 @@ Status ScannerBuilder::Filter(const compute::Expression& filter) {
}

Status ScannerBuilder::Ordering(const compute::Ordering& ordering) {
ordering_ = ordering;
scan_options_->ordering = ordering;
return Status::OK();
}

Expand Down Expand Up @@ -1005,7 +1005,7 @@ Result<acero::ExecNode*> MakeScanNode(acero::ExecPlan* plan,
auto scan_options = scan_node_options.scan_options;
auto dataset = scan_node_options.dataset;
bool require_sequenced_output = scan_node_options.require_sequenced_output;
Ordering ordering = scan_node_options.ordering;
Ordering ordering = scan_node_options.scan_options->ordering;

RETURN_NOT_OK(NormalizeScanOptions(scan_options, dataset->schema()));

Expand Down
20 changes: 8 additions & 12 deletions cpp/src/arrow/dataset/scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@ struct ARROW_DS_EXPORT ScanOptions {
compute::Expression filter = compute::literal(true);
/// A projection expression (which can add/remove/rename columns).
compute::Expression projection;
/// The ordering that the scanned data is asserted to have. The scan fails on the first
/// row that violates this ordering; reading only part of the data may still succeed if
/// no out-of-order row is read. Setting an ordering does not sort the data; it only
/// asserts, during the scan, that the dataset is already ordered as stated.
compute::Ordering ordering = Ordering::Unordered();

/// Schema with which batches will be read from fragments. This is also known as the
/// "reader schema" it will be used (for example) in constructing CSV file readers to
Expand Down Expand Up @@ -505,7 +510,7 @@ class ARROW_DS_EXPORT ScannerBuilder {
///
/// An explicit ordering the scanned data provide. Scan fails on first un-ordered row.
/// Reading un-ordered data partially might succeed if un-ordered rows are not read.
/// Setting an ordering does not actually order the data but asserts that data in the
/// Setting an ordering does not actually sort the data but asserts that data in the
/// dataset is ordered as stated during scan.
Status Ordering(const compute::Ordering& ordering);

Expand Down Expand Up @@ -572,23 +577,14 @@ class ARROW_DS_EXPORT ScanNodeOptions : public acero::ExecNodeOptions {
public:
explicit ScanNodeOptions(std::shared_ptr<Dataset> dataset,
std::shared_ptr<ScanOptions> scan_options,
bool require_sequenced_output,
Ordering ordering = Ordering::Unordered())
bool require_sequenced_output = false)
: dataset(std::move(dataset)),
scan_options(std::move(scan_options)),
require_sequenced_output(require_sequenced_output),
ordering(std::move(ordering)) {}

explicit ScanNodeOptions(std::shared_ptr<Dataset> dataset,
std::shared_ptr<ScanOptions> scan_options,
Ordering ordering = Ordering::Unordered())
: ScanNodeOptions(std::move(dataset), std::move(scan_options), false,
std::move(ordering)) {}
require_sequenced_output(require_sequenced_output) {}

std::shared_ptr<Dataset> dataset;
std::shared_ptr<ScanOptions> scan_options;
bool require_sequenced_output;
Ordering ordering;
};

/// @}
Expand Down
77 changes: 72 additions & 5 deletions cpp/src/arrow/dataset/scanner_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -925,9 +925,11 @@ std::ostream& operator<<(std::ostream& out, const TestScannerParams& params) {

class TestScanner : public DatasetFixtureMixinWithParam<TestScannerParams> {
protected:
std::shared_ptr<Scanner> MakeScanner(std::shared_ptr<Dataset> dataset) {
std::shared_ptr<Scanner> MakeScanner(std::shared_ptr<Dataset> dataset,
const Ordering& ordering = Ordering::Unordered()) {
ScannerBuilder builder(std::move(dataset), options_);
ARROW_EXPECT_OK(builder.UseThreads(GetParam().use_threads));
ARROW_EXPECT_OK(builder.Ordering(ordering));
EXPECT_OK_AND_ASSIGN(auto scanner, builder.Finish());
return scanner;
}
Expand Down Expand Up @@ -979,6 +981,65 @@ class TestScanner : public DatasetFixtureMixinWithParam<TestScannerParams> {

AssertScanBatchesUnorderedEquals(expected.get(), scanner.get(), 1);
}

/// Generate `batches` record batches of `items_per_batch` rows each for the
/// ordering tests.
///
/// Columns, in order:
/// - first: ascending steps starting at 1 when `ordered` is true; otherwise a much
///   larger step is used so that the last value is meant to break the ascending
///   order (the original note says it is 0 -- presumably through unsigned
///   wrap-around; TODO confirm against gen::Step)
/// - second: steps built from (-1, -1), i.e. descending
/// - third: the constant Int32 value 42
std::vector<std::shared_ptr<RecordBatch>> CreateOrderedBatches(int items_per_batch,
                                                               int batches,
                                                               bool ordered) {
  // Defaults describe the ordered case: 1, 2, 3, ...
  unsigned int increment = 1;
  unsigned int first_value = 1;
  if (!ordered) {
    // Spread the values over (almost) the full unsigned range.
    increment = std::numeric_limits<unsigned int>::max() / items_per_batch / batches;
    first_value = increment + items_per_batch * batches;
  }

  return gen::Gen({gen::Step(first_value, increment, false), gen::Step(-1, -1, true),
                   gen::Constant(std::make_shared<Int32Scalar>(42))})
      ->FailOnError()
      ->RecordBatches(items_per_batch, batches);
}

void AssertScannerOrdering(bool ordered) {
auto items_per_batch = GetParam().items_per_batch;
auto num_batches = GetParam().num_child_datasets * GetParam().num_batches;

SetSchema({field("f0", uint32()), field("f1", int32()), field("f2", int32())});
auto batches = CreateOrderedBatches(items_per_batch, num_batches, ordered);
auto dataset = std::make_shared<InMemoryDataset>(schema_, batches);

// scanning the dataset always works when not asserting the order
auto scanner = MakeScanner(std::move(dataset));
auto expected = CreateOrderedBatches(items_per_batch, num_batches, ordered);
std::shared_ptr<RecordBatchReader> reader =
std::make_shared<BatchIterator>(schema_, expected);
AssertScanBatchesEquals(reader.get(), scanner.get());

auto ordering = Ordering({compute::SortKey("f0", compute::SortOrder::Ascending)},
compute::NullPlacement::AtStart);

if (ordered) {
// when dataset is ordered, scanning it while asserting the order works fine
dataset = std::make_shared<InMemoryDataset>(schema_, batches);
scanner = MakeScanner(std::move(dataset), ordering);
expected = CreateOrderedBatches(items_per_batch, num_batches, ordered);
reader = std::make_shared<BatchIterator>(schema_, expected);
AssertScanBatchesEquals(reader.get(), scanner.get());
} else {
// when dataset is not ordered, scanning it fails on the conflicting row
dataset = std::make_shared<InMemoryDataset>(schema_, batches);
scanner = MakeScanner(std::move(dataset), ordering);
ASSERT_OK_AND_ASSIGN(auto it, scanner->ScanBatches());
auto next = it.Next();
while (next.ok() && !IsIterationEnd<TaggedRecordBatch>(*next)) {
next = it.Next();
}
// expect iteration to stop on failure status
ASSERT_NOT_OK(next);
ASSERT_EQ(next.status().message(), "Data is not ordered");
}
}
};

TEST_P(TestScanner, Scan) {
Expand All @@ -987,6 +1048,10 @@ TEST_P(TestScanner, Scan) {
AssertScanBatchesUnorderedEqualRepetitionsOf(MakeScanner(batch), batch);
}

// Runs the ordering check against data generated as ordered.
TEST_P(TestScanner, ScanOrdering) {
  constexpr bool kOrdered = true;
  AssertScannerOrdering(kOrdered);
}

// Runs the ordering check against data generated as not ordered.
TEST_P(TestScanner, ScanOrderingFail) {
  constexpr bool kOrdered = false;
  AssertScannerOrdering(kOrdered);
}

TEST_P(TestScanner, ScanBatches) {
SetSchema({field("i32", int32()), field("f64", float64())});
auto batch = ConstantArrayGenerator::Zeroes(GetParam().items_per_batch, schema_);
Expand Down Expand Up @@ -3006,8 +3071,9 @@ TEST(ScanNode, AssertOrder) {
// test existing orderings pass
for (const Ordering& ordering :
{asc, desc, asc_desc, asc_desc_rand, desc_asc, desc_asc_rand}) {
declarations = acero::Declaration::Sequence({acero::Declaration(
{"scan", dataset::ScanNodeOptions{dataset, scan_options, ordering}})});
scan_options->ordering = ordering;
declarations = acero::Declaration::Sequence(
{acero::Declaration({"scan", dataset::ScanNodeOptions{dataset, scan_options}})});
ASSERT_OK_AND_ASSIGN(auto actual, acero::DeclarationToTable(declarations));
// Scan node always emits augmented fields so we drop those
ASSERT_OK_AND_ASSIGN(auto actualMinusAugmented, actual->SelectColumns({0, 1, 2}));
Expand All @@ -3017,8 +3083,9 @@ TEST(ScanNode, AssertOrder) {

// test non-existing orderings fail
for (const Ordering& non_ordering : {not_asc, not_asc, unordered}) {
declarations = acero::Declaration::Sequence({acero::Declaration(
{"scan", dataset::ScanNodeOptions{dataset, scan_options, false, non_ordering}})});
scan_options->ordering = non_ordering;
declarations = acero::Declaration::Sequence(
{acero::Declaration({"scan", dataset::ScanNodeOptions{dataset, scan_options}})});
ASSERT_NOT_OK(acero::DeclarationToTable(declarations));
AssertPlanHasAssertOrderNode(declarations, true);
}
Expand Down

0 comments on commit 937695d

Please sign in to comment.