diff --git a/convergence-arrow/Cargo.toml b/convergence-arrow/Cargo.toml
index 3d483b5..3755a14 100644
--- a/convergence-arrow/Cargo.toml
+++ b/convergence-arrow/Cargo.toml
@@ -10,7 +10,7 @@ repository = "https://github.com/returnString/convergence"
 
 [dependencies]
 tokio = { version = "1" }
 async-trait = "0.1"
-datafusion = "38"
+datafusion = "43"
 convergence = { path = "../convergence", version = "0.16.0" }
 chrono = "0.4"
diff --git a/convergence-arrow/examples/datafusion.rs b/convergence-arrow/examples/datafusion.rs
index aa4591e..d1c40f3 100644
--- a/convergence-arrow/examples/datafusion.rs
+++ b/convergence-arrow/examples/datafusion.rs
@@ -2,8 +2,9 @@ use convergence::server::{self, BindOptions};
 use convergence_arrow::datafusion::DataFusionEngine;
 use convergence_arrow::metadata::Catalog;
 use datafusion::arrow::datatypes::DataType;
-use datafusion::catalog::schema::MemorySchemaProvider;
-use datafusion::catalog::{CatalogProvider, MemoryCatalogProvider};
+use datafusion::catalog_common::memory::MemorySchemaProvider;
+use datafusion::catalog::CatalogProvider;
+use datafusion::catalog_common::MemoryCatalogProvider;
 use datafusion::logical_expr::Volatility;
 use datafusion::physical_plan::ColumnarValue;
 use datafusion::prelude::*;
@@ -35,7 +36,7 @@ async fn new_engine() -> DataFusionEngine {
 	ctx.register_udf(create_udf(
 		"pg_backend_pid",
 		vec![],
-		Arc::new(DataType::Int32),
+		DataType::Int32,
 		Volatility::Stable,
 		Arc::new(|_| Ok(ColumnarValue::Scalar(ScalarValue::Int32(Some(0))))),
 	));
@@ -43,7 +44,7 @@ async fn new_engine() -> DataFusionEngine {
 	ctx.register_udf(create_udf(
 		"current_schema",
 		vec![],
-		Arc::new(DataType::Utf8),
+		DataType::Utf8,
 		Volatility::Stable,
 		Arc::new(|_| Ok(ColumnarValue::Scalar(ScalarValue::Utf8(Some("public".to_owned()))))),
 	));
diff --git a/convergence-arrow/src/metadata.rs b/convergence-arrow/src/metadata.rs
index ad63207..6417711 100644
--- a/convergence-arrow/src/metadata.rs
+++ b/convergence-arrow/src/metadata.rs
@@ -3,8 +3,9 @@
 use datafusion::arrow::array::{ArrayRef, Int32Builder, StringBuilder, UInt32Builder};
 use datafusion::arrow::datatypes::{Field, Schema};
 use datafusion::arrow::record_batch::RecordBatch;
-use datafusion::catalog::schema::{MemorySchemaProvider, SchemaProvider};
 use datafusion::catalog::CatalogProvider;
+use datafusion::catalog::SchemaProvider;
+use datafusion::catalog_common::memory::MemorySchemaProvider;
 use datafusion::datasource::{MemTable, TableProvider};
 use datafusion::error::DataFusionError;
 use std::convert::TryInto;
@@ -153,6 +154,7 @@ impl MetadataBuilder {
 }
 
 /// Wrapper catalog supporting generation of pg metadata (e.g. pg_catalog schema).
+#[derive(Debug)]
 pub struct Catalog {
 	wrapped: Arc<dyn CatalogProvider>,
 }
diff --git a/convergence/src/protocol_ext.rs b/convergence/src/protocol_ext.rs
index 575090f..b146775 100644
--- a/convergence/src/protocol_ext.rs
+++ b/convergence/src/protocol_ext.rs
@@ -138,7 +138,7 @@ impl<'a> DataRowWriter<'a> {
 	primitive_write!(write_float8, f64);
 }
 
-impl<'a> Drop for DataRowWriter<'a> {
+impl Drop for DataRowWriter<'_> {
 	fn drop(&mut self) {
 		assert_eq!(
 			self.parent.num_cols, self.current_col,
diff --git a/convergence/tests/test_connection.rs b/convergence/tests/test_connection.rs
index c234a57..1f23bdf 100644
--- a/convergence/tests/test_connection.rs
+++ b/convergence/tests/test_connection.rs
@@ -79,16 +79,16 @@ async fn extended_query_flow() {
 async fn simple_query_flow() {
 	let client = setup().await;
 	let messages = client.simple_query("select 1").await.unwrap();
-	assert_eq!(messages.len(), 2);
+	assert_eq!(messages.len(), 3);
 
-	let row = match &messages[0] {
+	let row = match &messages[1] {
 		SimpleQueryMessage::Row(row) => row,
 		_ => panic!("expected row"),
 	};
 
 	assert_eq!(row.get(0), Some("1"));
 
-	let num_rows = match &messages[1] {
+	let num_rows = match &messages[2] {
 		SimpleQueryMessage::CommandComplete(rows) => *rows,
 		_ => panic!("expected command complete"),
 	};