Skip to content

Commit

Permalink
feat(log-query): implement the first part of log query expr
Browse files Browse the repository at this point in the history
Signed-off-by: Ruihang Xia <[email protected]>
  • Loading branch information
waynexia committed Feb 15, 2025
1 parent 7fc935c commit 5c13a43
Show file tree
Hide file tree
Showing 4 changed files with 234 additions and 15 deletions.
6 changes: 3 additions & 3 deletions src/log-query/src/log_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ pub struct LogQuery {
}

/// Expression to calculate on log after filtering.
#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum LogExpr {
NamedIdent(String),
PositionalIdent(usize),
Expand Down Expand Up @@ -289,7 +289,7 @@ pub struct ColumnFilters {
pub filters: Vec<ContentFilter>,
}

#[derive(Debug, Serialize, Deserialize)]
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum ContentFilter {
// Search-based filters
/// Only match the exact content.
Expand Down Expand Up @@ -317,7 +317,7 @@ pub enum ContentFilter {
Compound(Vec<ContentFilter>, BinaryOperator),
}

#[derive(Debug, Serialize, Deserialize)]
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum BinaryOperator {
And,
Or,
Expand Down
26 changes: 26 additions & 0 deletions src/query/src/log_query/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use datafusion::error::DataFusionError;
use log_query::LogExpr;
use snafu::{Location, Snafu};

#[derive(Snafu)]
Expand Down Expand Up @@ -57,6 +58,28 @@ pub enum Error {
location: Location,
feature: String,
},

#[snafu(display("Unknown aggregate function: {name}"))]
UnknownAggregateFunction {
name: String,
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Unknown scalar function: {name}"))]
UnknownScalarFunction {
name: String,
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Unexpected log expression: {expr:?}, expected {expected}"))]
UnexpectedLogExpr {
expr: LogExpr,
expected: String,
#[snafu(implicit)]
location: Location,
},
}

impl ErrorExt for Error {
Expand All @@ -67,6 +90,9 @@ impl ErrorExt for Error {
DataFusionPlanning { .. } => StatusCode::External,
UnknownTable { .. } | TimeIndexNotFound { .. } => StatusCode::Internal,
Unimplemented { .. } => StatusCode::Unsupported,
UnknownAggregateFunction { .. }
| UnknownScalarFunction { .. }
| UnexpectedLogExpr { .. } => StatusCode::InvalidArguments,
}
}

Expand Down
215 changes: 204 additions & 11 deletions src/query/src/log_query/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,29 +15,35 @@
use catalog::table_source::DfTableSourceProvider;
use common_function::utils::escape_like_pattern;
use datafusion::datasource::DefaultTableSource;
use datafusion_common::ScalarValue;
use datafusion::execution::SessionState;
use datafusion_common::{DFSchema, ScalarValue};
use datafusion_expr::utils::conjunction;
use datafusion_expr::{col, lit, Expr, LogicalPlan, LogicalPlanBuilder};
use datafusion_sql::TableReference;
use datatypes::schema::Schema;
use log_query::{ColumnFilters, LogQuery, TimeFilter};
use log_query::{ColumnFilters, LogExpr, LogQuery, TimeFilter};
use snafu::{OptionExt, ResultExt};
use table::table::adapter::DfTableProviderAdapter;

use crate::log_query::error::{
CatalogSnafu, DataFusionPlanningSnafu, Result, TimeIndexNotFoundSnafu, UnimplementedSnafu,
CatalogSnafu, DataFusionPlanningSnafu, Result, TimeIndexNotFoundSnafu, UnexpectedLogExprSnafu,
UnimplementedSnafu, UnknownAggregateFunctionSnafu, UnknownScalarFunctionSnafu,
UnknownTableSnafu,
};

const DEFAULT_LIMIT: usize = 1000;

/// Planner that lowers a [`LogQuery`] into a DataFusion [`LogicalPlan`].
pub struct LogQueryPlanner {
    // Resolves table references from the query into DataFusion table sources.
    table_provider: DfTableSourceProvider,
    // Session state; used to look up registered aggregate and scalar
    // functions by name when planning `LogExpr::AggrFunc` / `ScalarFunc`.
    session_state: SessionState,
}

impl LogQueryPlanner {
pub fn new(table_provider: DfTableSourceProvider) -> Self {
Self { table_provider }
/// Creates a new [`LogQueryPlanner`].
///
/// `session_state` supplies the function registries consulted when planning
/// aggregate and scalar function expressions.
pub fn new(table_provider: DfTableSourceProvider, session_state: SessionState) -> Self {
    Self {
        table_provider,
        session_state,
    }
}

pub async fn query_to_plan(&mut self, query: LogQuery) -> Result<LogicalPlan> {
Expand Down Expand Up @@ -100,6 +106,54 @@ impl LogQueryPlanner {
)
.context(DataFusionPlanningSnafu)?;

// Apply log expressions
for expr in &query.exprs {
match expr {
LogExpr::AggrFunc {
name,
args,
by,
range: _range,
} => {
let schema = plan_builder.schema();
let (group_expr, aggr_exprs) = self.build_aggr_func(schema, name, args, by)?;
plan_builder = plan_builder
.aggregate([group_expr], aggr_exprs)
.context(DataFusionPlanningSnafu)?;
}
LogExpr::Filter { expr, filter } => {
let schema = plan_builder.schema();
let expr = self.log_expr_to_df_expr(expr, &schema)?;

Check failure on line 126 in src/query/src/log_query/planner.rs

View workflow job for this annotation

GitHub Actions / Clippy

this expression creates a reference which is immediately dereferenced by the compiler
let col_name = expr.schema_name().to_string();
let filter = self.build_column_filter(&ColumnFilters {
column_name: col_name,
filters: vec![filter.clone()],
})?;
if let Some(filter) = filter {
plan_builder = plan_builder
.filter(filter)
.context(DataFusionPlanningSnafu)?;
}
}
LogExpr::ScalarFunc { name, args } => {
let schema = plan_builder.schema();
let expr = self.build_scalar_func(schema, name, args)?;
plan_builder = plan_builder
.project([expr])
.context(DataFusionPlanningSnafu)?;
}
LogExpr::NamedIdent(_) | LogExpr::PositionalIdent(_) => {
// nothing to do
}
_ => {
UnimplementedSnafu {
feature: "log expression",
}
.fail()?;
}
}
}

// Build the final plan
let plan = plan_builder.build().context(DataFusionPlanningSnafu)?;

Expand Down Expand Up @@ -182,6 +236,61 @@ impl LogQueryPlanner {

Ok(conjunction(exprs))
}

/// Builds the expressions needed for an aggregation step.
///
/// Resolves `fn_name` against the session's registered aggregate functions,
/// converts `args` into the aggregate call's arguments and `by` into
/// group-by expressions. `schema` is used to resolve positional identifiers.
///
/// Returns `(aggr_expr, group_exprs)`: the aggregate call first, then the
/// group-by expressions.
///
/// NOTE(review): the call site destructures this result as
/// `(group_expr, aggr_exprs)` and passes it to
/// `LogicalPlanBuilder::aggregate([group_expr], aggr_exprs)` — the opposite
/// order of what is built here. The resulting plan groups by the aggregate
/// call and "aggregates" the `by` columns, which looks swapped; confirm the
/// intended plan shape (the aggr-func unit test's expected string currently
/// encodes this same order).
///
/// # Errors
/// Returns `UnknownAggregateFunction` if `fn_name` is not registered, and
/// propagates conversion errors from `log_expr_to_df_expr`.
fn build_aggr_func(
    &self,
    schema: &DFSchema,
    fn_name: &str,
    args: &[LogExpr],
    by: &[LogExpr],
) -> Result<(Expr, Vec<Expr>)> {
    // Look up the aggregate UDF registered under this name.
    let aggr_fn = self
        .session_state
        .aggregate_functions()
        .get(fn_name)
        .context(UnknownAggregateFunctionSnafu {
            name: fn_name.to_string(),
        })?;
    // Convert argument and group-by log expressions into DataFusion exprs.
    let args = args
        .iter()
        .map(|expr| self.log_expr_to_df_expr(expr, schema))
        .try_collect::<Vec<_>>()?;
    let group_exprs = by
        .iter()
        .map(|expr| self.log_expr_to_df_expr(expr, schema))
        .try_collect::<Vec<_>>()?;
    let aggr_expr = aggr_fn.call(args);

    Ok((aggr_expr, group_exprs))
}

/// Converts a leaf [`LogExpr`] into a DataFusion [`Expr`].
///
/// Accepts named identifiers, positional identifiers (resolved to column
/// names through `schema`), and literals (always planned as UTF-8 strings).
///
/// # Errors
/// Returns `UnexpectedLogExpr` for any other expression variant.
fn log_expr_to_df_expr(&self, expr: &LogExpr, schema: &DFSchema) -> Result<Expr> {
    let df_expr = match expr {
        LogExpr::NamedIdent(name) => col(name),
        // Positional references index into the current plan schema.
        LogExpr::PositionalIdent(index) => col(schema.field(*index).name()),
        // Literals carry no type information here, so plan them as UTF-8.
        LogExpr::Literal(text) => lit(ScalarValue::Utf8(Some(text.clone()))),
        other => {
            return UnexpectedLogExprSnafu {
                expr: other.clone(),
                expected: "named identifier, positional identifier, or literal",
            }
            .fail();
        }
    };
    Ok(df_expr)
}

/// Plans a scalar function call over the given log-expression arguments.
///
/// Arguments are converted first (resolving positional identifiers against
/// `schema`), then `name` is looked up in the session's scalar function
/// registry and applied.
///
/// # Errors
/// Propagates argument-conversion errors, and returns `UnknownScalarFunction`
/// when `name` is not a registered scalar function.
fn build_scalar_func(&self, schema: &DFSchema, name: &str, args: &[LogExpr]) -> Result<Expr> {
    // Convert arguments before the registry lookup to preserve error
    // precedence: a bad argument is reported even when the name is unknown.
    let mut converted_args = Vec::with_capacity(args.len());
    for arg in args {
        converted_args.push(self.log_expr_to_df_expr(arg, schema)?);
    }
    let scalar_fn =
        self.session_state
            .scalar_functions()
            .get(name)
            .context(UnknownScalarFunctionSnafu {
                name: name.to_string(),
            })?;
    Ok(scalar_fn.call(converted_args))
}
}

#[cfg(test)]
Expand All @@ -192,6 +301,7 @@ mod tests {
use catalog::RegisterTableRequest;
use common_catalog::consts::DEFAULT_CATALOG_NAME;
use common_query::test_util::DummyDecoder;
use datafusion::execution::SessionStateBuilder;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, SchemaRef};
use log_query::{ContentFilter, Context, Limit};
Expand Down Expand Up @@ -270,7 +380,8 @@ mod tests {
async fn test_query_to_plan() {
let table_provider =
build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
let mut planner = LogQueryPlanner::new(table_provider);
let session_state = SessionStateBuilder::new().with_default_features().build();
let mut planner = LogQueryPlanner::new(table_provider, session_state);

let log_query = LogQuery {
table: TableName::new(DEFAULT_CATALOG_NAME, "public", "test_table"),
Expand Down Expand Up @@ -304,7 +415,8 @@ mod tests {
async fn test_build_time_filter() {
let table_provider =
build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
let planner = LogQueryPlanner::new(table_provider);
let session_state = SessionStateBuilder::new().with_default_features().build();
let planner = LogQueryPlanner::new(table_provider, session_state);

let time_filter = TimeFilter {
start: Some("2021-01-01T00:00:00Z".to_string()),
Expand All @@ -331,7 +443,8 @@ mod tests {
async fn test_build_time_filter_without_end() {
let table_provider =
build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
let planner = LogQueryPlanner::new(table_provider);
let session_state = SessionStateBuilder::new().with_default_features().build();
let planner = LogQueryPlanner::new(table_provider, session_state);

let time_filter = TimeFilter {
start: Some("2021-01-01T00:00:00Z".to_string()),
Expand All @@ -358,7 +471,8 @@ mod tests {
async fn test_build_column_filter() {
let table_provider =
build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
let planner = LogQueryPlanner::new(table_provider);
let session_state = SessionStateBuilder::new().with_default_features().build();
let planner = LogQueryPlanner::new(table_provider, session_state);

let column_filter = ColumnFilters {
column_name: "message".to_string(),
Expand All @@ -384,7 +498,8 @@ mod tests {
async fn test_query_to_plan_with_only_skip() {
let table_provider =
build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
let mut planner = LogQueryPlanner::new(table_provider);
let session_state = SessionStateBuilder::new().with_default_features().build();
let mut planner = LogQueryPlanner::new(table_provider, session_state);

let log_query = LogQuery {
table: TableName::new(DEFAULT_CATALOG_NAME, "public", "test_table"),
Expand Down Expand Up @@ -418,7 +533,8 @@ mod tests {
async fn test_query_to_plan_without_limit() {
let table_provider =
build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
let mut planner = LogQueryPlanner::new(table_provider);
let session_state = SessionStateBuilder::new().with_default_features().build();
let mut planner = LogQueryPlanner::new(table_provider, session_state);

let log_query = LogQuery {
table: TableName::new(DEFAULT_CATALOG_NAME, "public", "test_table"),
Expand Down Expand Up @@ -455,4 +571,81 @@ mod tests {
assert_eq!(escape_like_pattern("te_st"), "te\\_st");
assert_eq!(escape_like_pattern("te\\st"), "te\\\\st");
}

#[tokio::test]
async fn test_query_to_plan_with_aggr_func() {
    // End-to-end: plan a query that counts `message` grouped by `host`.
    let table_provider =
        build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
    // Default features register the built-in aggregate functions (e.g. count).
    let session_state = SessionStateBuilder::new().with_default_features().build();
    let mut planner = LogQueryPlanner::new(table_provider, session_state);

    let log_query = LogQuery {
        table: TableName::new(DEFAULT_CATALOG_NAME, "public", "test_table"),
        time_filter: TimeFilter {
            start: Some("2021-01-01T00:00:00Z".to_string()),
            end: Some("2021-01-02T00:00:00Z".to_string()),
            span: None,
        },
        filters: vec![],
        limit: Limit {
            skip: None,
            fetch: Some(100),
        },
        context: Context::None,
        columns: vec![],
        // count(message) by host
        exprs: vec![LogExpr::AggrFunc {
            name: "count".to_string(),
            args: vec![LogExpr::NamedIdent("message".to_string())],
            by: vec![LogExpr::NamedIdent("host".to_string())],
            range: None,
        }],
    };

    let plan = planner.query_to_plan(log_query).await.unwrap();
    // NOTE(review): this expected plan has `groupBy=[[count(...)]]` and
    // `aggr=[[host]]`, i.e. the aggregate call in the group-by position. It
    // matches what build_aggr_func currently returns, but the orientation
    // looks inverted — confirm this is the intended plan shape.
    let expected = "Aggregate: groupBy=[[count(greptime.public.test_table.message)]], aggr=[[greptime.public.test_table.host]] [count(greptime.public.test_table.message):Int64, host:Utf8;N]\
    \n Limit: skip=0, fetch=100 [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]\
    \n Filter: greptime.public.test_table.timestamp >= Utf8(\"2021-01-01T00:00:00Z\") AND greptime.public.test_table.timestamp <= Utf8(\"2021-01-02T00:00:00Z\") [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]\
    \n TableScan: greptime.public.test_table [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]";

    assert_eq!(plan.display_indent_schema().to_string(), expected);
}

#[tokio::test]
async fn test_query_to_plan_with_scalar_func() {
    // End-to-end: plan a query that projects date_trunc('day', timestamp).
    let table_provider =
        build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
    // Default features register the built-in scalar functions (e.g. date_trunc).
    let session_state = SessionStateBuilder::new().with_default_features().build();
    let mut planner = LogQueryPlanner::new(table_provider, session_state);

    let log_query = LogQuery {
        table: TableName::new(DEFAULT_CATALOG_NAME, "public", "test_table"),
        time_filter: TimeFilter {
            start: Some("2021-01-01T00:00:00Z".to_string()),
            end: Some("2021-01-02T00:00:00Z".to_string()),
            span: None,
        },
        filters: vec![],
        limit: Limit {
            skip: None,
            fetch: Some(100),
        },
        context: Context::None,
        columns: vec![],
        // date_trunc(timestamp, 'day'); the literal arg is planned as UTF-8.
        exprs: vec![LogExpr::ScalarFunc {
            name: "date_trunc".to_string(),
            args: vec![
                LogExpr::NamedIdent("timestamp".to_string()),
                LogExpr::Literal("day".to_string()),
            ],
        }],
    };

    let plan = planner.query_to_plan(log_query).await.unwrap();
    // The scalar function becomes a Projection on top of the filtered scan.
    let expected = "Projection: date_trunc(greptime.public.test_table.timestamp, Utf8(\"day\")) [date_trunc(greptime.public.test_table.timestamp,Utf8(\"day\")):Timestamp(Nanosecond, None);N]\
    \n Limit: skip=0, fetch=100 [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]\
    \n Filter: greptime.public.test_table.timestamp >= Utf8(\"2021-01-01T00:00:00Z\") AND greptime.public.test_table.timestamp <= Utf8(\"2021-01-02T00:00:00Z\") [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]\
    \n TableScan: greptime.public.test_table [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]";

    assert_eq!(plan.display_indent_schema().to_string(), expected);
}
}
2 changes: 1 addition & 1 deletion src/query/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ impl LogicalPlanner for DfLogicalPlanner {
.enable_ident_normalization,
);

let mut planner = LogQueryPlanner::new(table_provider);
let mut planner = LogQueryPlanner::new(table_provider, self.session_state.clone());
planner
.query_to_plan(query)
.await
Expand Down

0 comments on commit 5c13a43

Please sign in to comment.