diff --git a/src/common/function/src/function_registry.rs b/src/common/function/src/function_registry.rs index 0ce3f8abefb3..a728eaf67f96 100644 --- a/src/common/function/src/function_registry.rs +++ b/src/common/function/src/function_registry.rs @@ -25,6 +25,7 @@ use crate::scalars::expression::ExpressionFunction; use crate::scalars::json::JsonFunction; use crate::scalars::matches::MatchesFunction; use crate::scalars::math::MathFunction; +use crate::scalars::sleep::SleepFunction; use crate::scalars::timestamp::TimestampFunction; use crate::scalars::vector::VectorFunction; use crate::system::SystemFunction; @@ -122,6 +123,9 @@ pub static FUNCTION_REGISTRY: Lazy> = Lazy::new(|| { // Vector related functions VectorFunction::register(&function_registry); + // Sleep function + SleepFunction::register(&function_registry); + // Geo functions #[cfg(feature = "geo")] crate::scalars::geo::GeoFunctions::register(&function_registry); diff --git a/src/common/function/src/scalars.rs b/src/common/function/src/scalars.rs index 8a2556d733ce..87f0821aca95 100644 --- a/src/common/function/src/scalars.rs +++ b/src/common/function/src/scalars.rs @@ -20,6 +20,7 @@ pub mod geo; pub mod json; pub mod matches; pub mod math; +pub mod sleep; pub mod vector; #[cfg(test)] diff --git a/src/common/function/src/scalars/sleep.rs b/src/common/function/src/scalars/sleep.rs new file mode 100644 index 000000000000..fe48808ac943 --- /dev/null +++ b/src/common/function/src/scalars/sleep.rs @@ -0,0 +1,170 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; +use std::time::Duration; +use std::{fmt, thread}; + +use common_query::error::{InvalidFuncArgsSnafu, Result}; +use common_query::prelude::{Signature, Volatility}; +use datatypes::prelude::ConcreteDataType; +use datatypes::value::Value; +use datatypes::vectors::{Int64Vector, VectorRef}; +use snafu::ensure; + +use crate::function::{Function, FunctionContext}; +use crate::function_registry::FunctionRegistry; + +/// Sleep function that pauses execution for specified seconds +#[derive(Clone, Debug, Default)] +pub(crate) struct SleepFunction; + +impl SleepFunction { + pub fn register(registry: &FunctionRegistry) { + registry.register(Arc::new(SleepFunction)); + } +} + +const NAME: &str = "sleep"; + +impl Function for SleepFunction { + fn name(&self) -> &str { + NAME + } + + fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result { + Ok(ConcreteDataType::int64_datatype()) + } + + fn signature(&self) -> Signature { + // Accept int32, int64 and float64 types + Signature::uniform( + 1, + vec![ + ConcreteDataType::int32_datatype(), + ConcreteDataType::int64_datatype(), + ConcreteDataType::float64_datatype(), + ], + Volatility::Volatile, + ) + } + + fn eval(&self, _func_ctx: FunctionContext, columns: &[VectorRef]) -> Result { + ensure!( + columns.len() == 1, + InvalidFuncArgsSnafu { + err_msg: format!( + "The length of the args is not correct, expect exactly one, have: {}", + columns.len() + ), + } + ); + + let vector = &columns[0]; + let mut result = Vec::with_capacity(vector.len()); + + for i in 0..vector.len() { + let secs = match vector.get(i) { + Value::Int64(x) => x as f64, + Value::Int32(x) => x as f64, + Value::Float64(x) => x.into_inner(), + _ => { + result.push(None); + continue; + } + }; + // Sleep for the specified seconds TODO: use tokio::time::sleep when the scalars are async + thread::sleep(Duration::from_secs_f64(secs)); + result.push(Some(secs as i64)); + } + + Ok(Arc::new(Int64Vector::from(result))) + } +} + +impl fmt::Display for SleepFunction { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "SLEEP") + } +} + +#[cfg(test)] +mod tests { + use std::time::Instant; + + use datatypes::value::Value; + use datatypes::vectors::{Float64Vector, Int32Vector}; + + use super::*; + + #[test] + fn test_sleep() { + let f = SleepFunction; + assert_eq!("sleep", f.name()); + assert_eq!( + ConcreteDataType::int64_datatype(), + f.return_type(&[]).unwrap() + ); + + let times = vec![Some(1_i64), None, Some(2_i64)]; + let args: Vec = vec![Arc::new(Int64Vector::from(times.clone()))]; + + let start = Instant::now(); + let vector = f.eval(FunctionContext::default(), &args).unwrap(); + let elapsed = start.elapsed(); + + assert_eq!(3, vector.len()); + assert!(elapsed.as_secs() >= 3); // Should sleep for total of 3 seconds + + assert_eq!(vector.get(0), Value::Int64(1)); + assert_eq!(vector.get(1), Value::Null); + assert_eq!(vector.get(2), Value::Int64(2)); + } + + #[test] + fn test_sleep_float64() { + let f = SleepFunction; + let times = vec![Some(0.5_f64), None, Some(1.5_f64)]; + let args: Vec = vec![Arc::new(Float64Vector::from(times))]; + + let start = Instant::now(); + let vector = f.eval(FunctionContext::default(), &args).unwrap(); + let elapsed = start.elapsed(); + + assert_eq!(3, vector.len()); + assert!(elapsed.as_secs_f64() >= 2.0); + + assert_eq!(vector.get(0), Value::Int64(0)); + assert_eq!(vector.get(1), Value::Null); + assert_eq!(vector.get(2), Value::Int64(1)); + } + + #[test] + fn test_sleep_int32() { + let f = SleepFunction; + let times = vec![Some(1_i32), None, Some(2_i32)]; + let args: Vec = vec![Arc::new(Int32Vector::from(times))]; + + let start = Instant::now(); + let vector = f.eval(FunctionContext::default(), &args).unwrap(); + let elapsed = start.elapsed(); + + assert_eq!(3, vector.len()); + assert!(elapsed.as_secs() >= 3); + + assert_eq!(vector.get(0), Value::Int64(1)); + assert_eq!(vector.get(1), Value::Null); + assert_eq!(vector.get(2), Value::Int64(2)); + } +} diff --git a/src/servers/src/postgres/fixtures.rs b/src/servers/src/postgres/fixtures.rs index 34761d5d35ec..35fd56163682 100644 --- a/src/servers/src/postgres/fixtures.rs +++ b/src/servers/src/postgres/fixtures.rs @@ -117,9 +117,17 @@ pub(crate) fn process<'a>(query: &str, query_ctx: QueryContextRef) -> Option = Lazy::new(|| Regex::new("(?i)(LIMIT\\s+\\d+)::bigint").unwrap()); + +static PG_SLEEP_PATTERN: Lazy = + Lazy::new(|| Regex::new("(?i)pg_sleep\\s*\\((.*?)\\)").unwrap()); + pub(crate) fn rewrite_sql(query: &str) -> Cow<'_, str> { //TODO(sunng87): remove this when we upgraded datafusion to 43 or newer let query = LIMIT_CAST_PATTERN.replace_all(query, "$1"); + + // tricky way to support both sleep in mysql and pg_sleep in postgres + let query = PG_SLEEP_PATTERN.replace_all(&query, "sleep($1)"); + // DBeaver tricky replacement for datafusion not support sql // TODO: add more here query diff --git a/tests/cases/standalone/common/function/sleep.result b/tests/cases/standalone/common/function/sleep.result new file mode 100644 index 000000000000..13d21d64f797 --- /dev/null +++ b/tests/cases/standalone/common/function/sleep.result @@ -0,0 +1,40 @@ +select sleep(0.1); + ++---------------------+ +| sleep(Float64(0.1)) | ++---------------------+ +| 0 | ++---------------------+ + +select sleep(1) as a; + ++---+ +| a | ++---+ +| 1 | ++---+ + +-- should fail it is for postgres +select pg_sleep(0.1); + +Error: 3000(PlanQuery), Failed to plan SQL: Error during planning: Invalid function 'pg_sleep'. +Did you mean 'sleep'? + +-- SQLNESS PROTOCOL POSTGRES +select pg_sleep(0.5); + ++---------------------+ +| sleep(Float64(0.5)) | ++---------------------+ +| 0 | ++---------------------+ + +-- SQLNESS PROTOCOL POSTGRES +select pg_sleep(2) as b; + ++---+ +| b | ++---+ +| 2 | ++---+ + diff --git a/tests/cases/standalone/common/function/sleep.sql b/tests/cases/standalone/common/function/sleep.sql new file mode 100644 index 000000000000..f817995b86a5 --- /dev/null +++ b/tests/cases/standalone/common/function/sleep.sql @@ -0,0 +1,12 @@ +select sleep(0.1); + +select sleep(1) as a; + +-- should fail it is for postgres +select pg_sleep(0.1); + +-- SQLNESS PROTOCOL POSTGRES +select pg_sleep(0.5); + +-- SQLNESS PROTOCOL POSTGRES +select pg_sleep(2) as b;