chore: Make query stage / shuffle code easier to understand (#54)
andygrove authored Dec 19, 2024
1 parent 151a0e2 commit 2783613
Showing 23 changed files with 72 additions and 77 deletions.
7 changes: 4 additions & 3 deletions datafusion_ray/context.py
@@ -50,7 +50,7 @@ def execute_query_stage(

# if the query stage has a single output partition then we need to execute for the output
# partition, otherwise we need to execute in parallel for each input partition
concurrency = stage.get_input_partition_count()
concurrency = stage.get_execution_partition_count()
output_partitions_count = stage.get_output_partition_count()
if output_partitions_count == 1:
# reduce stage
@@ -159,5 +159,6 @@ def plan(self, execution_plan: Any) -> List[pa.RecordBatch]:
)
_, partitions = ray.get(future)
# assert len(partitions) == 1, len(partitions)
result_set = ray.get(partitions[0])
return result_set
record_batches = ray.get(partitions[0])
# filter out empty batches
return [batch for batch in record_batches if batch.num_rows > 0]
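For readers following the Python side of this change: the driver now sizes a stage's task pool from the execution partition count, and plan() drops empty batches before returning results. A minimal sketch of those two ideas, assuming only that stage is the PyQueryStage handle exposed to Python (the helper names below are invented for illustration):

import pyarrow as pa

def stage_concurrency(stage) -> int:
    # After this commit, concurrency follows the partitions that are actually
    # executed; for a stage rooted at a ShuffleWriterExec that is the writer's
    # input partitioning rather than the repartitioned output.
    return stage.get_execution_partition_count()

def drop_empty_batches(batches: list[pa.RecordBatch]) -> list[pa.RecordBatch]:
    # Hash-partitioned shuffles can legitimately produce empty batches for some
    # partitions; plan() now filters them out before handing results back.
    return [batch for batch in batches if batch.num_rows > 0]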
2 changes: 1 addition & 1 deletion src/planner.rs
@@ -399,7 +399,7 @@ mod test {
let query_stage = graph.query_stages.get(&id).unwrap();
output.push_str(&format!(
"Query Stage #{id} ({} -> {}):\n{}\n",
query_stage.get_input_partition_count(),
query_stage.get_execution_partition_count(),
query_stage.get_output_partition_count(),
displayable(query_stage.plan.as_ref()).indent(false)
));
42 changes: 19 additions & 23 deletions src/query_stage.rs
@@ -16,7 +16,7 @@
// under the License.

use crate::context::serialize_execution_plan;
use crate::shuffle::{ShuffleCodec, ShuffleReaderExec};
use crate::shuffle::{ShuffleCodec, ShuffleReaderExec, ShuffleWriterExec};
use datafusion::error::Result;
use datafusion::physical_plan::{ExecutionPlan, ExecutionPlanProperties, Partitioning};
use datafusion::prelude::SessionContext;
@@ -60,8 +60,8 @@ impl PyQueryStage {
self.stage.get_child_stage_ids()
}

pub fn get_input_partition_count(&self) -> usize {
self.stage.get_input_partition_count()
pub fn get_execution_partition_count(&self) -> usize {
self.stage.get_execution_partition_count()
}

pub fn get_output_partition_count(&self) -> usize {
@@ -75,16 +75,6 @@ pub struct QueryStage {
pub plan: Arc<dyn ExecutionPlan>,
}

fn _get_output_partition_count(plan: &dyn ExecutionPlan) -> usize {
// UnknownPartitioning and HashPartitioning with empty expressions will
// both return 1 partition.
match plan.properties().output_partitioning() {
Partitioning::UnknownPartitioning(_) => 1,
Partitioning::Hash(expr, _) if expr.is_empty() => 1,
p => p.partition_count(),
}
}

impl QueryStage {
pub fn new(id: usize, plan: Arc<dyn ExecutionPlan>) -> Self {
Self { id, plan }
@@ -96,21 +86,27 @@ impl QueryStage {
ids
}

/// Get the input partition count. This is the same as the number of concurrent tasks
/// when we schedule this query stage for execution
pub fn get_input_partition_count(&self) -> usize {
if self.plan.children().is_empty() {
// leaf node (file scan)
self.plan.output_partitioning().partition_count()
/// Get the number of partitions that can be executed in parallel
pub fn get_execution_partition_count(&self) -> usize {
if let Some(shuffle) = self.plan.as_any().downcast_ref::<ShuffleWriterExec>() {
// use the partitioning of the input to the shuffle write because we are
// really executing that and then using the shuffle writer to repartition
// the output
shuffle.input_plan.output_partitioning().partition_count()
} else {
self.plan.children()[0]
.output_partitioning()
.partition_count()
// for any other plan, use its output partitioning
self.plan.output_partitioning().partition_count()
}
}

pub fn get_output_partition_count(&self) -> usize {
_get_output_partition_count(self.plan.as_ref())
// UnknownPartitioning and HashPartitioning with empty expressions will
// both return 1 partition.
match self.plan.properties().output_partitioning() {
Partitioning::UnknownPartitioning(_) => 1,
Partitioning::Hash(expr, _) if expr.is_empty() => 1,
p => p.partition_count(),
}
}
}

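The net effect of the two methods above is easiest to see side by side. The following is a rough Python paraphrase of the Rust logic, for illustration only: Plan and Partitioning here are tiny stand-ins for DataFusion's ExecutionPlan and Partitioning types, and is_shuffle_writer / input_plan stand in for the downcast to ShuffleWriterExec and its input_plan field.

from dataclasses import dataclass, field

@dataclass
class Partitioning:
    kind: str                      # "unknown", "hash", or "round_robin"
    partition_count: int = 1
    hash_exprs: list = field(default_factory=list)

@dataclass
class Plan:
    partitioning: Partitioning
    is_shuffle_writer: bool = False
    input_plan: "Plan | None" = None

def execution_partition_count(plan: Plan) -> int:
    # A shuffle-writer stage really executes its input plan and only uses the
    # writer to repartition the output, so parallelism follows the input.
    if plan.is_shuffle_writer and plan.input_plan is not None:
        return plan.input_plan.partitioning.partition_count
    return plan.partitioning.partition_count

def output_partition_count(plan: Plan) -> int:
    # UnknownPartitioning and Hash with no expressions both collapse to one.
    p = plan.partitioning
    if p.kind == "unknown" or (p.kind == "hash" and not p.hash_exprs):
        return 1
    return p.partition_count

This is also why the final stages in the expected plans below change from (2 -> 1) to (1 -> 1): a stage rooted at SortPreservingMergeExec produces a single output partition, and that partitioning, rather than the two-partition shuffle read beneath it, now determines how many tasks are launched.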
2 changes: 1 addition & 1 deletion src/shuffle/codec.rs
@@ -102,7 +102,7 @@ impl PhysicalExtensionCodec for ShuffleCodec {
};
PlanType::ShuffleReader(reader)
} else if let Some(writer) = node.as_any().downcast_ref::<ShuffleWriterExec>() {
let plan = PhysicalPlanNode::try_from_physical_plan(writer.plan.clone(), self)?;
let plan = PhysicalPlanNode::try_from_physical_plan(writer.input_plan.clone(), self)?;
let partitioning =
encode_partitioning_scheme(writer.properties().output_partitioning())?;
let writer = ShuffleWriterExecNode {
10 changes: 5 additions & 5 deletions src/shuffle/writer.rs
@@ -47,7 +47,7 @@ use std::sync::Arc;
#[derive(Debug)]
pub struct ShuffleWriterExec {
pub stage_id: usize,
pub(crate) plan: Arc<dyn ExecutionPlan>,
pub(crate) input_plan: Arc<dyn ExecutionPlan>,
/// Output partitioning
properties: PlanProperties,
/// Directory to write shuffle files from
@@ -84,7 +84,7 @@ impl ShuffleWriterExec {

Self {
stage_id,
plan,
input_plan: plan,
properties,
shuffle_dir: shuffle_dir.to_string(),
metrics: ExecutionPlanMetricsSet::new(),
@@ -98,11 +98,11 @@ impl ExecutionPlan for ShuffleWriterExec {
}

fn schema(&self) -> SchemaRef {
self.plan.schema()
self.input_plan.schema()
}

fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
vec![&self.plan]
vec![&self.input_plan]
}

fn with_new_children(
@@ -122,7 +122,7 @@ impl ExecutionPlan for ShuffleWriterExec {
self.stage_id
);

let mut stream = self.plan.execute(input_partition, context)?;
let mut stream = self.input_plan.execute(input_partition, context)?;
let write_time =
MetricBuilder::new(&self.metrics).subset_time("write_time", input_partition);
let repart_time =
2 changes: 1 addition & 1 deletion testdata/expected-plans/q1.txt
@@ -42,7 +42,7 @@ ShuffleWriterExec(stage_id=1, output_partitioning=Hash([Column { name: "l_return
CoalesceBatchesExec: target_batch_size=8192
ShuffleReaderExec(stage_id=0, input_partitioning=Hash([Column { name: "l_returnflag", index: 0 }, Column { name: "l_linestatus", index: 1 }], 2))

Query Stage #2 (2 -> 1):
Query Stage #2 (1 -> 1):
SortPreservingMergeExec: [l_returnflag@0 ASC NULLS LAST, l_linestatus@1 ASC NULLS LAST]
ShuffleReaderExec(stage_id=1, input_partitioning=Hash([Column { name: "l_returnflag", index: 0 }, Column { name: "l_linestatus", index: 1 }], 2))

2 changes: 1 addition & 1 deletion testdata/expected-plans/q10.txt
@@ -117,7 +117,7 @@ ShuffleWriterExec(stage_id=7, output_partitioning=Hash([Column { name: "c_custke
CoalesceBatchesExec: target_batch_size=8192
ShuffleReaderExec(stage_id=6, input_partitioning=Hash([Column { name: "c_custkey", index: 0 }, Column { name: "c_name", index: 1 }, Column { name: "c_acctbal", index: 2 }, Column { name: "c_phone", index: 3 }, Column { name: "n_name", index: 4 }, Column { name: "c_address", index: 5 }, Column { name: "c_comment", index: 6 }], 2))

Query Stage #8 (2 -> 1):
Query Stage #8 (1 -> 1):
SortPreservingMergeExec: [revenue@2 DESC], fetch=20
ShuffleReaderExec(stage_id=7, input_partitioning=Hash([Column { name: "c_custkey", index: 0 }, Column { name: "c_name", index: 1 }, Column { name: "c_acctbal", index: 3 }, Column { name: "c_phone", index: 6 }, Column { name: "n_name", index: 4 }, Column { name: "c_address", index: 5 }, Column { name: "c_comment", index: 7 }], 2))

2 changes: 1 addition & 1 deletion testdata/expected-plans/q11.txt
@@ -167,7 +167,7 @@ ShuffleWriterExec(stage_id=10, output_partitioning=Hash([Column { name: "ps_part
CoalesceBatchesExec: target_batch_size=8192
ShuffleReaderExec(stage_id=9, input_partitioning=Hash([Column { name: "ps_partkey", index: 0 }], 2))

Query Stage #11 (2 -> 1):
Query Stage #11 (1 -> 1):
SortPreservingMergeExec: [value@1 DESC]
ShuffleReaderExec(stage_id=10, input_partitioning=Hash([Column { name: "ps_partkey", index: 0 }], 2))

2 changes: 1 addition & 1 deletion testdata/expected-plans/q12.txt
@@ -65,7 +65,7 @@ ShuffleWriterExec(stage_id=3, output_partitioning=Hash([Column { name: "l_shipmo
CoalesceBatchesExec: target_batch_size=8192
ShuffleReaderExec(stage_id=2, input_partitioning=Hash([Column { name: "l_shipmode", index: 0 }], 2))

Query Stage #4 (2 -> 1):
Query Stage #4 (1 -> 1):
SortPreservingMergeExec: [l_shipmode@0 ASC NULLS LAST]
ShuffleReaderExec(stage_id=3, input_partitioning=Hash([Column { name: "l_shipmode", index: 0 }], 2))

2 changes: 1 addition & 1 deletion testdata/expected-plans/q13.txt
@@ -70,7 +70,7 @@ ShuffleWriterExec(stage_id=3, output_partitioning=Hash([Column { name: "c_count"
CoalesceBatchesExec: target_batch_size=8192
ShuffleReaderExec(stage_id=2, input_partitioning=Hash([Column { name: "c_count", index: 0 }], 2))

Query Stage #4 (2 -> 1):
Query Stage #4 (1 -> 1):
SortPreservingMergeExec: [custdist@1 DESC, c_count@0 DESC]
ShuffleReaderExec(stage_id=3, input_partitioning=Hash([Column { name: "c_count", index: 0 }], 2))

2 changes: 1 addition & 1 deletion testdata/expected-plans/q16.txt
@@ -107,7 +107,7 @@ ShuffleWriterExec(stage_id=6, output_partitioning=Hash([Column { name: "p_brand"
CoalesceBatchesExec: target_batch_size=8192
ShuffleReaderExec(stage_id=5, input_partitioning=Hash([Column { name: "p_brand", index: 0 }, Column { name: "p_type", index: 1 }, Column { name: "p_size", index: 2 }], 2))

Query Stage #7 (2 -> 1):
Query Stage #7 (1 -> 1):
SortPreservingMergeExec: [supplier_cnt@3 DESC, p_brand@0 ASC NULLS LAST, p_type@1 ASC NULLS LAST, p_size@2 ASC NULLS LAST]
ShuffleReaderExec(stage_id=6, input_partitioning=Hash([Column { name: "p_brand", index: 0 }, Column { name: "p_type", index: 1 }, Column { name: "p_size", index: 2 }], 2))

2 changes: 1 addition & 1 deletion testdata/expected-plans/q18.txt
@@ -104,7 +104,7 @@ ShuffleWriterExec(stage_id=6, output_partitioning=Hash([Column { name: "c_name",
CoalesceBatchesExec: target_batch_size=8192
ShuffleReaderExec(stage_id=5, input_partitioning=Hash([Column { name: "c_name", index: 0 }, Column { name: "c_custkey", index: 1 }, Column { name: "o_orderkey", index: 2 }, Column { name: "o_orderdate", index: 3 }, Column { name: "o_totalprice", index: 4 }], 2))

Query Stage #7 (2 -> 1):
Query Stage #7 (1 -> 1):
SortPreservingMergeExec: [o_totalprice@4 DESC, o_orderdate@3 ASC NULLS LAST], fetch=100
ShuffleReaderExec(stage_id=6, input_partitioning=Hash([Column { name: "c_name", index: 0 }, Column { name: "c_custkey", index: 1 }, Column { name: "o_orderkey", index: 2 }, Column { name: "o_orderdate", index: 3 }, Column { name: "o_totalprice", index: 4 }], 2))

2 changes: 1 addition & 1 deletion testdata/expected-plans/q2.txt
@@ -252,7 +252,7 @@ ShuffleWriterExec(stage_id=17, output_partitioning=Hash([Column { name: "p_partk
CoalesceBatchesExec: target_batch_size=8192
ShuffleReaderExec(stage_id=16, input_partitioning=Hash([Column { name: "ps_partkey", index: 1 }, Column { name: "min(partsupp.ps_supplycost)", index: 0 }], 2))

Query Stage #18 (2 -> 1):
Query Stage #18 (1 -> 1):
SortPreservingMergeExec: [s_acctbal@0 DESC, n_name@2 ASC NULLS LAST, s_name@1 ASC NULLS LAST, p_partkey@3 ASC NULLS LAST], fetch=100
ShuffleReaderExec(stage_id=17, input_partitioning=Hash([Column { name: "p_partkey", index: 3 }], 2))

2 changes: 1 addition & 1 deletion testdata/expected-plans/q20.txt
@@ -142,7 +142,7 @@ ShuffleWriterExec(stage_id=8, output_partitioning=Hash([], 2))
CoalesceBatchesExec: target_batch_size=8192
ShuffleReaderExec(stage_id=7, input_partitioning=Hash([Column { name: "ps_suppkey", index: 0 }], 2))

Query Stage #9 (2 -> 1):
Query Stage #9 (1 -> 1):
SortPreservingMergeExec: [s_name@0 ASC NULLS LAST]
ShuffleReaderExec(stage_id=8, input_partitioning=Hash([], 2))

2 changes: 1 addition & 1 deletion testdata/expected-plans/q21.txt
@@ -172,7 +172,7 @@ ShuffleWriterExec(stage_id=10, output_partitioning=Hash([Column { name: "s_name"
CoalesceBatchesExec: target_batch_size=8192
ShuffleReaderExec(stage_id=9, input_partitioning=Hash([Column { name: "s_name", index: 0 }], 2))

Query Stage #11 (2 -> 1):
Query Stage #11 (1 -> 1):
SortPreservingMergeExec: [numwait@1 DESC, s_name@0 ASC NULLS LAST], fetch=100
ShuffleReaderExec(stage_id=10, input_partitioning=Hash([Column { name: "s_name", index: 0 }], 2))

2 changes: 1 addition & 1 deletion testdata/expected-plans/q22.txt
@@ -91,7 +91,7 @@ ShuffleWriterExec(stage_id=4, output_partitioning=Hash([Column { name: "cntrycod
CoalesceBatchesExec: target_batch_size=8192
ShuffleReaderExec(stage_id=3, input_partitioning=Hash([Column { name: "cntrycode", index: 0 }], 2))

Query Stage #5 (2 -> 1):
Query Stage #5 (1 -> 1):
SortPreservingMergeExec: [cntrycode@0 ASC NULLS LAST]
ShuffleReaderExec(stage_id=4, input_partitioning=Hash([Column { name: "cntrycode", index: 0 }], 2))

2 changes: 1 addition & 1 deletion testdata/expected-plans/q3.txt
@@ -97,7 +97,7 @@ ShuffleWriterExec(stage_id=5, output_partitioning=Hash([Column { name: "l_orderk
CoalesceBatchesExec: target_batch_size=8192
ShuffleReaderExec(stage_id=4, input_partitioning=Hash([Column { name: "l_orderkey", index: 0 }, Column { name: "o_orderdate", index: 1 }, Column { name: "o_shippriority", index: 2 }], 2))

Query Stage #6 (2 -> 1):
Query Stage #6 (1 -> 1):
SortPreservingMergeExec: [revenue@1 DESC, o_orderdate@2 ASC NULLS LAST], fetch=10
ShuffleReaderExec(stage_id=5, input_partitioning=Hash([Column { name: "l_orderkey", index: 0 }, Column { name: "o_orderdate", index: 2 }, Column { name: "o_shippriority", index: 3 }], 2))

2 changes: 1 addition & 1 deletion testdata/expected-plans/q4.txt
@@ -70,7 +70,7 @@ ShuffleWriterExec(stage_id=3, output_partitioning=Hash([Column { name: "o_orderp
CoalesceBatchesExec: target_batch_size=8192
ShuffleReaderExec(stage_id=2, input_partitioning=Hash([Column { name: "o_orderpriority", index: 0 }], 2))

Query Stage #4 (2 -> 1):
Query Stage #4 (1 -> 1):
SortPreservingMergeExec: [o_orderpriority@0 ASC NULLS LAST]
ShuffleReaderExec(stage_id=3, input_partitioning=Hash([Column { name: "o_orderpriority", index: 0 }], 2))

2 changes: 1 addition & 1 deletion testdata/expected-plans/q5.txt
@@ -167,7 +167,7 @@ ShuffleWriterExec(stage_id=11, output_partitioning=Hash([Column { name: "n_name"
CoalesceBatchesExec: target_batch_size=8192
ShuffleReaderExec(stage_id=10, input_partitioning=Hash([Column { name: "n_name", index: 0 }], 2))

Query Stage #12 (2 -> 1):
Query Stage #12 (1 -> 1):
SortPreservingMergeExec: [revenue@1 DESC]
ShuffleReaderExec(stage_id=11, input_partitioning=Hash([Column { name: "n_name", index: 0 }], 2))

2 changes: 1 addition & 1 deletion testdata/expected-plans/q7.txt
@@ -176,7 +176,7 @@ ShuffleWriterExec(stage_id=11, output_partitioning=Hash([Column { name: "supp_na
CoalesceBatchesExec: target_batch_size=8192
ShuffleReaderExec(stage_id=10, input_partitioning=Hash([Column { name: "supp_nation", index: 0 }, Column { name: "cust_nation", index: 1 }, Column { name: "l_year", index: 2 }], 2))

Query Stage #12 (2 -> 1):
Query Stage #12 (1 -> 1):
SortPreservingMergeExec: [supp_nation@0 ASC NULLS LAST, cust_nation@1 ASC NULLS LAST, l_year@2 ASC NULLS LAST]
ShuffleReaderExec(stage_id=11, input_partitioning=Hash([Column { name: "supp_nation", index: 0 }, Column { name: "cust_nation", index: 1 }, Column { name: "l_year", index: 2 }], 2))

2 changes: 1 addition & 1 deletion testdata/expected-plans/q8.txt
@@ -230,7 +230,7 @@ ShuffleWriterExec(stage_id=15, output_partitioning=Hash([Column { name: "o_year"
CoalesceBatchesExec: target_batch_size=8192
ShuffleReaderExec(stage_id=14, input_partitioning=Hash([Column { name: "o_year", index: 0 }], 2))

Query Stage #16 (2 -> 1):
Query Stage #16 (1 -> 1):
SortPreservingMergeExec: [o_year@0 ASC NULLS LAST]
ShuffleReaderExec(stage_id=15, input_partitioning=Hash([Column { name: "o_year", index: 0 }], 2))

2 changes: 1 addition & 1 deletion testdata/expected-plans/q9.txt
@@ -166,7 +166,7 @@ ShuffleWriterExec(stage_id=11, output_partitioning=Hash([Column { name: "nation"
CoalesceBatchesExec: target_batch_size=8192
ShuffleReaderExec(stage_id=10, input_partitioning=Hash([Column { name: "nation", index: 0 }, Column { name: "o_year", index: 1 }], 2))

Query Stage #12 (2 -> 1):
Query Stage #12 (1 -> 1):
SortPreservingMergeExec: [nation@0 ASC NULLS LAST, o_year@1 DESC]
ShuffleReaderExec(stage_id=11, input_partitioning=Hash([Column { name: "nation", index: 0 }, Column { name: "o_year", index: 1 }], 2))

52 changes: 25 additions & 27 deletions tests/test_context.py
@@ -17,42 +17,42 @@

from datafusion_ray.context import DatafusionRayContext
from datafusion import SessionContext, SessionConfig, RuntimeConfig, col, lit, functions as F
import pytest

@pytest.fixture
def df_ctx():
"""Fixture to create a DataFusion context."""
# used fixed partition count so that tests are deterministic on different environments
config = SessionConfig().with_target_partitions(4)
return SessionContext(config=config)

def test_basic_query_succeed():
df_ctx = SessionContext()
ctx = DatafusionRayContext(df_ctx)
@pytest.fixture
def ctx(df_ctx):
"""Fixture to create a Datafusion Ray context."""
return DatafusionRayContext(df_ctx)

def test_basic_query_succeed(df_ctx, ctx):
df_ctx.register_csv("tips", "examples/tips.csv", has_header=True)
# TODO why does this return a single batch and not a list of batches?
record_batches = ctx.sql("SELECT * FROM tips")
assert record_batches[0].num_rows == 244
assert len(record_batches) <= 4
num_rows = sum(batch.num_rows for batch in record_batches)
assert num_rows == 244

def test_aggregate_csv():
df_ctx = SessionContext()
ctx = DatafusionRayContext(df_ctx)
def test_aggregate_csv(df_ctx, ctx):
df_ctx.register_csv("tips", "examples/tips.csv", has_header=True)
record_batches = ctx.sql("select sex, smoker, avg(tip/total_bill) as tip_pct from tips group by sex, smoker")
assert isinstance(record_batches, list)
# TODO why does this return many empty batches?
num_rows = 0
for record_batch in record_batches:
num_rows += record_batch.num_rows
assert len(record_batches) <= 4
num_rows = sum(batch.num_rows for batch in record_batches)
assert num_rows == 4

def test_aggregate_parquet():
df_ctx = SessionContext()
ctx = DatafusionRayContext(df_ctx)
def test_aggregate_parquet(df_ctx, ctx):
df_ctx.register_parquet("tips", "examples/tips.parquet")
record_batches = ctx.sql("select sex, smoker, avg(tip/total_bill) as tip_pct from tips group by sex, smoker")
# TODO why does this return many empty batches?
num_rows = 0
for record_batch in record_batches:
num_rows += record_batch.num_rows
assert len(record_batches) <= 4
num_rows = sum(batch.num_rows for batch in record_batches)
assert num_rows == 4

def test_aggregate_parquet_dataframe():
df_ctx = SessionContext()
ray_ctx = DatafusionRayContext(df_ctx)
def test_aggregate_parquet_dataframe(df_ctx, ctx):
df = df_ctx.read_parquet(f"examples/tips.parquet")
df = (
df.aggregate(
@@ -62,12 +62,10 @@ def test_aggregate_parquet_dataframe():
.filter(col("day") != lit("Dinner"))
.aggregate([col("sex"), col("smoker")], [F.avg(col("tip_pct")).alias("avg_pct")])
)
ray_results = ray_ctx.plan(df.execution_plan())
ray_results = ctx.plan(df.execution_plan())
df_ctx.create_dataframe([ray_results]).show()


def test_no_result_query():
df_ctx = SessionContext()
ctx = DatafusionRayContext(df_ctx)
def test_no_result_query(df_ctx, ctx):
df_ctx.register_csv("tips", "examples/tips.csv", has_header=True)
ctx.sql("CREATE VIEW tips_view AS SELECT * FROM tips")
