From 170bee02b894108be34fc9fbe7f69f2d2cf39b7d Mon Sep 17 00:00:00 2001 From: discord9 Date: Fri, 7 Feb 2025 11:53:10 +0800 Subject: [PATCH 1/3] fix: unquote flow_name in create flow expr --- src/operator/src/error.rs | 8 +++++ src/operator/src/expr_factory.rs | 51 +++++++++++++++++++++++++--- src/sql/src/parsers/create_parser.rs | 3 +- 3 files changed, 57 insertions(+), 5 deletions(-) diff --git a/src/operator/src/error.rs b/src/operator/src/error.rs index 3a5aae897399..990f6123a3e1 100644 --- a/src/operator/src/error.rs +++ b/src/operator/src/error.rs @@ -704,6 +704,13 @@ pub enum Error { location: Location, }, + #[snafu(display("Invalid flow name: {name}"))] + InvalidFlowName { + name: String, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Empty {} expr", name))] EmptyDdlExpr { name: String, @@ -821,6 +828,7 @@ impl ErrorExt for Error { | Error::UnsupportedRegionRequest { .. } | Error::InvalidTableName { .. } | Error::InvalidViewName { .. } + | Error::InvalidFlowName { .. } | Error::InvalidView { .. } | Error::InvalidExpr { .. } | Error::AdminFunctionNotFound { .. } diff --git a/src/operator/src/expr_factory.rs b/src/operator/src/expr_factory.rs index 07ef56884fd2..85c90c964b41 100644 --- a/src/operator/src/expr_factory.rs +++ b/src/operator/src/expr_factory.rs @@ -38,7 +38,7 @@ use query::sql::{ use session::context::QueryContextRef; use session::table_name::table_idents_to_full_name; use snafu::{ensure, OptionExt, ResultExt}; -use sql::ast::ColumnOption; +use sql::ast::{ColumnOption, ObjectName}; use sql::statements::alter::{ AlterDatabase, AlterDatabaseOperation, AlterTable, AlterTableOperation, }; @@ -55,8 +55,9 @@ use table::table_reference::TableReference; use crate::error::{ BuildCreateExprOnInsertionSnafu, ColumnDataTypeSnafu, ConvertColumnDefaultConstraintSnafu, ConvertIdentifierSnafu, EncodeJsonSnafu, ExternalSnafu, IllegalPrimaryKeysDefSnafu, - InferFileTableSchemaSnafu, InvalidSqlSnafu, NotSupportedSnafu, ParseSqlSnafu, - PrepareFileTableSnafu, Result, SchemaIncompatibleSnafu, UnrecognizedTableOptionSnafu, + InferFileTableSchemaSnafu, InvalidFlowNameSnafu, InvalidSqlSnafu, NotSupportedSnafu, + ParseSqlSnafu, PrepareFileTableSnafu, Result, SchemaIncompatibleSnafu, + UnrecognizedTableOptionSnafu, }; #[derive(Debug, Copy, Clone)] @@ -731,7 +732,7 @@ pub fn to_create_flow_task_expr( Ok(CreateFlowExpr { catalog_name: query_ctx.current_catalog().to_string(), - flow_name: create_flow.flow_name.to_string(), + flow_name: sanitize_flow_name(create_flow.flow_name)?, source_table_names, sink_table_name: Some(sink_table_name), or_replace: create_flow.or_replace, @@ -743,6 +744,14 @@ pub fn to_create_flow_task_expr( }) } +/// sanitize the flow name, remove possible quotes +fn sanitize_flow_name(flow_name: ObjectName) -> Result { + let ident = flow_name.0.first().context(InvalidFlowNameSnafu { + name: flow_name.to_string(), + })?; + Ok(ident.value.clone()) +} + #[cfg(test)] mod tests { use api::v1::{SetDatabaseOptions, UnsetDatabaseOptions}; @@ -755,6 +764,40 @@ mod tests { use super::*; + #[test] + fn test_create_flow_expr() { + let sql = r" +CREATE FLOW `task_2` +SINK TO schema_1.table_1 +AS +SELECT max(c1), min(c2) FROM schema_2.table_2;"; + let stmt = + ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default()) + .unwrap() + .pop() + .unwrap(); + + let Statement::CreateFlow(create_flow) = stmt else { + unreachable!() + }; + let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap(); + + let to_dot_sep = + |c: TableName| format!("{}.{}.{}", c.catalog_name, c.schema_name, c.table_name); + assert_eq!("task_2", expr.flow_name); + assert_eq!("greptime", expr.catalog_name); + assert_eq!( + "greptime.schema_1.table_1", + expr.sink_table_name.map(to_dot_sep).unwrap() + ); + assert_eq!(1, expr.source_table_names.len()); + assert_eq!( + "greptime.schema_2.table_2", + to_dot_sep(expr.source_table_names[0].clone()) + ); + assert_eq!("SELECT max(c1), min(c2) FROM schema_2.table_2", expr.sql); + } + #[test] fn test_create_to_expr() { let sql = "CREATE TABLE monitor (host STRING,ts TIMESTAMP,TIME INDEX (ts),PRIMARY KEY(host)) ENGINE=mito WITH(ttl='3days', write_buffer_size='1024KB');"; diff --git a/src/sql/src/parsers/create_parser.rs b/src/sql/src/parsers/create_parser.rs index f371788a470a..84f491d8e34d 100644 --- a/src/sql/src/parsers/create_parser.rs +++ b/src/sql/src/parsers/create_parser.rs @@ -1306,7 +1306,7 @@ SELECT max(c1), min(c2) FROM schema_2.table_2;"; // create flow without `OR REPLACE`, `IF NOT EXISTS`, `EXPIRE AFTER` and `COMMENT` let sql = r" -CREATE FLOW task_2 +CREATE FLOW `task_2` SINK TO schema_1.table_1 AS SELECT max(c1), min(c2) FROM schema_2.table_2;"; @@ -1322,6 +1322,7 @@ SELECT max(c1), min(c2) FROM schema_2.table_2;"; assert!(!create_task.if_not_exists); assert!(create_task.expire_after.is_none()); assert!(create_task.comment.is_none()); + assert_eq!(create_task.flow_name.to_string(), "`task_2`"); } #[test] From bcc0fc77db80742d82a04763f64e2f158b4f1bc2 Mon Sep 17 00:00:00 2001 From: discord9 Date: Fri, 7 Feb 2025 12:40:46 +0800 Subject: [PATCH 2/3] chore: per review --- src/operator/src/expr_factory.rs | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/src/operator/src/expr_factory.rs b/src/operator/src/expr_factory.rs index 85c90c964b41..f084954b5bd9 100644 --- a/src/operator/src/expr_factory.rs +++ b/src/operator/src/expr_factory.rs @@ -746,6 +746,12 @@ pub fn to_create_flow_task_expr( /// sanitize the flow name, remove possible quotes fn sanitize_flow_name(flow_name: ObjectName) -> Result { + if flow_name.0.len() != 1 { + return InvalidFlowNameSnafu { + name: flow_name.to_string(), + } + .fail(); + } let ident = flow_name.0.first().context(InvalidFlowNameSnafu { name: flow_name.to_string(), })?; @@ -796,6 +802,28 @@ SELECT max(c1), min(c2) FROM schema_2.table_2;"; to_dot_sep(expr.source_table_names[0].clone()) ); assert_eq!("SELECT max(c1), min(c2) FROM schema_2.table_2", expr.sql); + + let sql = r" +CREATE FLOW abc.`task_2` +SINK TO schema_1.table_1 +AS +SELECT max(c1), min(c2) FROM schema_2.table_2;"; + let stmt = + ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default()) + .unwrap() + .pop() + .unwrap(); + + let Statement::CreateFlow(create_flow) = stmt else { + unreachable!() + }; + let res = to_create_flow_task_expr(create_flow, &QueryContext::arc()); + + assert!(res.is_err()); + assert!(res + .unwrap_err() + .to_string() + .contains("Invalid flow name: abc.`task_2`")); } #[test] From 4257013ae1ebca1e6585f86989a7740c8b90bb85 Mon Sep 17 00:00:00 2001 From: discord9 Date: Fri, 7 Feb 2025 15:28:35 +0800 Subject: [PATCH 3/3] fix: compat with older version --- src/common/meta/src/key/flow/flow_name.rs | 31 ++++++++++++++++++----- src/operator/src/expr_factory.rs | 16 +++++------- 2 files changed, 32 insertions(+), 15 deletions(-) diff --git a/src/common/meta/src/key/flow/flow_name.rs b/src/common/meta/src/key/flow/flow_name.rs index cac2e29633f4..4c465c707087 100644 --- a/src/common/meta/src/key/flow/flow_name.rs +++ b/src/common/meta/src/key/flow/flow_name.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use common_telemetry::warn; use futures::stream::BoxStream; use lazy_static::lazy_static; use regex::Regex; @@ -37,6 +38,12 @@ lazy_static! { "^{FLOW_NAME_KEY_PREFIX}/({NAME_PATTERN})/({NAME_PATTERN})$" )) .unwrap(); + + /// for compatibility with older flow name with less strict name pattern + static ref COMPAT_FLOW_NAME_KEY_PATTERN: Regex = Regex::new(&format!( + "^{FLOW_NAME_KEY_PREFIX}/({NAME_PATTERN})/(.*)$" + )) + .unwrap(); } /// The key of mapping {flow_name} to [FlowId]. @@ -114,12 +121,18 @@ impl<'a> MetadataKey<'a, FlowNameKeyInner<'a>> for FlowNameKeyInner<'_> { } .build() })?; - let captures = - FLOW_NAME_KEY_PATTERN - .captures(key) - .context(error::InvalidMetadataSnafu { - err_msg: format!("Invalid FlowNameKeyInner '{key}'"), - })?; + let captures = FLOW_NAME_KEY_PATTERN + .captures(key) + .or_else(|| { + warn!( + "FlowNameKeyInner '{}' is not a valid flow name in newer version.", + key + ); + COMPAT_FLOW_NAME_KEY_PATTERN.captures(key) + }) + .context(error::InvalidMetadataSnafu { + err_msg: format!("Invalid FlowNameKeyInner '{key}'"), + })?; // Safety: pass the regex check above let catalog_name = captures.get(1).unwrap().as_str(); let flow_name = captures.get(2).unwrap().as_str(); @@ -284,6 +297,12 @@ mod tests { let key = FlowNameKey::from_bytes(&bytes).unwrap(); assert_eq!(key.catalog(), "my_catalog"); assert_eq!(key.flow_name(), "my_task"); + + // compatibility with older version + let bytes = b"__flow/name/my_catalog/a/`b`".to_vec(); + let key = FlowNameKey::from_bytes(&bytes).unwrap(); + assert_eq!(key.catalog(), "my_catalog"); + assert_eq!(key.flow_name(), "a/`b`"); } #[test] fn test_key_start_range() { diff --git a/src/operator/src/expr_factory.rs b/src/operator/src/expr_factory.rs index f084954b5bd9..4fa239a5030c 100644 --- a/src/operator/src/expr_factory.rs +++ b/src/operator/src/expr_factory.rs @@ -745,17 +745,15 @@ pub fn to_create_flow_task_expr( } /// sanitize the flow name, remove possible quotes -fn sanitize_flow_name(flow_name: ObjectName) -> Result { - if flow_name.0.len() != 1 { - return InvalidFlowNameSnafu { +fn sanitize_flow_name(mut flow_name: ObjectName) -> Result { + ensure!( + flow_name.0.len() == 1, + InvalidFlowNameSnafu { name: flow_name.to_string(), } - .fail(); - } - let ident = flow_name.0.first().context(InvalidFlowNameSnafu { - name: flow_name.to_string(), - })?; - Ok(ident.value.clone()) + ); + // safety: we've checked flow_name.0 has exactly one element. + Ok(flow_name.0.swap_remove(0).value) } #[cfg(test)]