Skip to content

Commit

Permalink
fix: wip
Browse files Browse the repository at this point in the history
  • Loading branch information
NiwakaDev committed Jan 18, 2025
1 parent c78a492 commit d2cffa8
Show file tree
Hide file tree
Showing 10 changed files with 161 additions and 14 deletions.
22 changes: 11 additions & 11 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ snafu = "0.8"
sysinfo = "0.30"

rustls = { version = "0.23.20", default-features = false } # override by patch, see [patch.crates-io]
sqlparser = { git = "https://github.com/GreptimeTeam/sqlparser-rs.git", rev = "54a267ac89c09b11c0c88934690530807185d3e7", features = [
sqlparser = { git = "https://github.com/NiwakaDev/sqlparser-rs", rev = "b3584823b236db3c244b70945008f0aae65c5e39", features = [
"visitor",
"serde",
] } # on branch v0.44.x
Expand Down
32 changes: 32 additions & 0 deletions src/common/function/src/scalars/math.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ impl MathFunction {
registry.register(Arc::new(RateFunction));
registry.register(Arc::new(RangeFunction));
registry.register(Arc::new(ClampFunction));
registry.register(Arc::new(WithinFilterFunction));
}
}

Expand Down Expand Up @@ -87,3 +88,34 @@ impl Function for RangeFunction {
.context(GeneralDataFusionSnafu)
}
}

/// Unit type implementing the `within_filter` scalar function.
#[derive(Clone, Debug, Default)]
struct WithinFilterFunction;

impl fmt::Display for WithinFilterFunction {
    /// Emits the fixed type name; the value carries no state to render.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.write_str("WithinFilterFunction")
    }
}

impl Function for WithinFilterFunction {
    /// Name the function is looked up by.
    fn name(&self) -> &str {
        "within_filter"
    }

    /// The function always yields a boolean, whatever the input types are.
    fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
        let boolean = ConcreteDataType::boolean_datatype();
        Ok(boolean)
    }

    /// Declares a uniform two-argument signature over the string data type.
    fn signature(&self) -> Signature {
        let accepted_types = vec![ConcreteDataType::string_datatype()];
        Signature::uniform(2, accepted_types, Volatility::Immutable)
    }

    /// Direct evaluation is not implemented: every call fails with an internal
    /// `todo` error wrapped in the general DataFusion error context.
    fn eval(&self, _func_ctx: FunctionContext, _columns: &[VectorRef]) -> Result<VectorRef> {
        let failure = DataFusionError::Internal(String::from("todo"));
        Err(failure).context(GeneralDataFusionSnafu)
    }
}
2 changes: 1 addition & 1 deletion src/log-query/src/log_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ mod tests {
fn test_canonicalize() {
// with 'start' only
let mut tf = TimeFilter {
start: Some("2023-10-01".to_string()),
start: Some("2023".to_string()),
end: None,
span: None,
};
Expand Down
1 change: 1 addition & 0 deletions src/query/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ pub mod region_query;
pub mod sql;
pub mod stats;
pub(crate) mod window_sort;
mod within_filter;

#[cfg(test)]
pub(crate) mod test_util;
Expand Down
1 change: 1 addition & 0 deletions src/query/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ impl DfLogicalPlanner {
let result = sql_to_rel
.statement_to_plan(df_stmt)
.context(PlanSqlSnafu)?;

let plan = RangePlanRewriter::new(table_provider, query_ctx.clone())
.rewrite(result)
.await?;
Expand Down
7 changes: 6 additions & 1 deletion src/query/src/query_engine/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ use crate::query_engine::options::QueryOptions;
use crate::query_engine::DefaultSerializer;
use crate::range_select::planner::RangeSelectPlanner;
use crate::region_query::RegionQueryHandlerRef;
use crate::within_filter::WithinFilterRule;
use crate::QueryEngineContext;

/// Query engine global state
Expand Down Expand Up @@ -95,10 +96,14 @@ impl QueryEngineState {
let runtime_env = Arc::new(RuntimeEnv::default());
let session_config = SessionConfig::new().with_create_default_catalog_and_schema(false);
// Apply extension rules
let mut extension_rules = Vec::new();
// TODO: remove the explicit `Vec<Arc<(dyn ExtensionAnalyzerRule + std::marker::Send + Sync + 'static)>>` type annotation
let mut extension_rules: Vec<
Arc<(dyn ExtensionAnalyzerRule + std::marker::Send + Sync + 'static)>,
> = Vec::new();

// The [`TypeConversionRule`] must come first
extension_rules.insert(0, Arc::new(TypeConversionRule) as _);
extension_rules.push(Arc::new(WithinFilterRule));

// Apply the datafusion rules
let mut analyzer = Analyzer::new();
Expand Down
96 changes: 96 additions & 0 deletions src/query/src/within_filter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
use chrono::NaiveDate;
use common_time::timestamp::{TimeUnit, Timestamp};
use datafusion::config::ConfigOptions;
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::{DataFusionError, Result, ScalarValue};
use datafusion_expr::{BinaryExpr, Expr, Filter, LogicalPlan, Operator};

use crate::optimizer::ExtensionAnalyzerRule;
use crate::QueryEngineContext;

pub struct WithinFilterRule;

impl ExtensionAnalyzerRule for WithinFilterRule {
fn analyze(
&self,
plan: LogicalPlan,
_ctx: &QueryEngineContext,
_config: &ConfigOptions,
) -> Result<LogicalPlan> {
plan.transform(|plan| match plan.clone() {
LogicalPlan::Filter(filter) => {
if let Expr::ScalarFunction(func) = &filter.predicate
&& func.func.name() == "within_filter"
{
let column_name = func.args[0].clone();
let time_arg = func.args[1].clone();
if let Expr::Literal(literal) = time_arg
&& let ScalarValue::Utf8(Some(s)) = literal
{
if let Ok(year) = s.parse::<i32>() {
let timestamp = NaiveDate::from_ymd_opt(year, 1, 1).unwrap();
let timestamp = Timestamp::from_chrono_date(timestamp).unwrap();
let value = Some(timestamp.value());
let timestamp = match timestamp.unit() {
TimeUnit::Second => ScalarValue::TimestampSecond(value, None),
TimeUnit::Millisecond => {
ScalarValue::TimestampMillisecond(value, None)
}
TimeUnit::Microsecond => {
ScalarValue::TimestampMicrosecond(value, None)
}
TimeUnit::Nanosecond => {
ScalarValue::TimestampNanosecond(value, None)
}
};
let next_timestamp = NaiveDate::from_ymd_opt(year + 1, 1, 1).unwrap();
let next_timestamp =
Timestamp::from_chrono_date(next_timestamp).unwrap();
let value = Some(next_timestamp.value());
let next_timestamp = match next_timestamp.unit() {
TimeUnit::Second => ScalarValue::TimestampSecond(value, None),
TimeUnit::Millisecond => {
ScalarValue::TimestampMillisecond(value, None)
}
TimeUnit::Microsecond => {
ScalarValue::TimestampMicrosecond(value, None)
}
TimeUnit::Nanosecond => {
ScalarValue::TimestampNanosecond(value, None)
}
};
let left = Expr::BinaryExpr(BinaryExpr {
left: Box::new(column_name.clone()),
op: Operator::GtEq,
right: Box::new(Expr::Literal(timestamp)),
});
let right = Expr::BinaryExpr(BinaryExpr {
left: Box::new(column_name),
op: Operator::Lt,
right: Box::new(Expr::Literal(next_timestamp)),
});
let new_expr = Expr::BinaryExpr(BinaryExpr::new(
Box::new(left),
Operator::And,
Box::new(right),
));
let new_plan =
LogicalPlan::Filter(Filter::try_new(new_expr, filter.input)?);
Ok(Transformed::yes(new_plan))
} else {
Err(DataFusionError::NotImplemented(
"add more formats".to_string(),
))
}
} else {
todo!();
}
} else {
Ok(Transformed::no(plan))
}
}
_ => Ok(Transformed::no(plan)),
})
.map(|t| t.data)
}
}
10 changes: 10 additions & 0 deletions tests/cases/standalone/common/basic.result
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,16 @@ SELECT * FROM system_metrics;
| host2 | idc_a | 80.0 | 70.3 | 90.0 | 2022-11-03T03:39:57.450 |
+-------+-------+----------+-------------+-----------+-------------------------+

SELECT * FROM system_metrics where ts within '2022-11-03';

+-------+-------+----------+-------------+-----------+-------------------------+
| host | idc | cpu_util | memory_util | disk_util | ts |
+-------+-------+----------+-------------+-----------+-------------------------+
| host1 | idc_a | 11.8 | 10.3 | 10.3 | 2022-11-03T03:39:57.450 |
| host2 | idc_a | 80.0 | 70.3 | 90.0 | 2022-11-03T03:39:57.450 |
| host1 | idc_b | 50.0 | 66.7 | 40.6 | 2022-11-03T03:39:57.450 |
+-------+-------+----------+-------------+-----------+-------------------------+

SELECT count(*) FROM system_metrics;

+----------+
Expand Down
2 changes: 2 additions & 0 deletions tests/cases/standalone/common/basic.sql
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ VALUES

SELECT * FROM system_metrics;

SELECT * FROM system_metrics where ts within '2022-11-03';

SELECT count(*) FROM system_metrics;

SELECT avg(cpu_util) FROM system_metrics;
Expand Down

0 comments on commit d2cffa8

Please sign in to comment.