Skip to content

Commit

Permalink
update datafusion to 43 (#295)
Browse files — browse the repository at this point in the history
* update datafusion to 43

* fix test
  • Loading branch information…
houqp authored Jan 2, 2025
1 parent 85dfe3a commit 1c0bdda
Show file tree
Hide file tree
Showing 5 changed files with 13 additions and 10 deletions.
2 changes: 1 addition & 1 deletion convergence-arrow/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ repository = "https://github.com/returnString/convergence"
[dependencies]
tokio = { version = "1" }
async-trait = "0.1"
datafusion = "38"
datafusion = "43"
convergence = { path = "../convergence", version = "0.16.0" }
chrono = "0.4"

Expand Down
9 changes: 5 additions & 4 deletions convergence-arrow/examples/datafusion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ use convergence::server::{self, BindOptions};
use convergence_arrow::datafusion::DataFusionEngine;
use convergence_arrow::metadata::Catalog;
use datafusion::arrow::datatypes::DataType;
use datafusion::catalog::schema::MemorySchemaProvider;
use datafusion::catalog::{CatalogProvider, MemoryCatalogProvider};
use datafusion::catalog_common::memory::MemorySchemaProvider;
use datafusion::catalog::CatalogProvider;
use datafusion::catalog_common::MemoryCatalogProvider;
use datafusion::logical_expr::Volatility;
use datafusion::physical_plan::ColumnarValue;
use datafusion::prelude::*;
Expand Down Expand Up @@ -35,15 +36,15 @@ async fn new_engine() -> DataFusionEngine {
ctx.register_udf(create_udf(
"pg_backend_pid",
vec![],
Arc::new(DataType::Int32),
DataType::Int32,
Volatility::Stable,
Arc::new(|_| Ok(ColumnarValue::Scalar(ScalarValue::Int32(Some(0))))),
));

ctx.register_udf(create_udf(
"current_schema",
vec![],
Arc::new(DataType::Utf8),
DataType::Utf8,
Volatility::Stable,
Arc::new(|_| Ok(ColumnarValue::Scalar(ScalarValue::Utf8(Some("public".to_owned()))))),
));
Expand Down
4 changes: 3 additions & 1 deletion convergence-arrow/src/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@
use datafusion::arrow::array::{ArrayRef, Int32Builder, StringBuilder, UInt32Builder};
use datafusion::arrow::datatypes::{Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::catalog::schema::{MemorySchemaProvider, SchemaProvider};
use datafusion::catalog::CatalogProvider;
use datafusion::catalog::SchemaProvider;
use datafusion::catalog_common::memory::MemorySchemaProvider;
use datafusion::datasource::{MemTable, TableProvider};
use datafusion::error::DataFusionError;
use std::convert::TryInto;
Expand Down Expand Up @@ -153,6 +154,7 @@ impl MetadataBuilder {
}

/// Wrapper catalog supporting generation of pg metadata (e.g. pg_catalog schema).
#[derive(Debug)]
pub struct Catalog {
wrapped: Arc<dyn CatalogProvider>,
}
Expand Down
2 changes: 1 addition & 1 deletion convergence/src/protocol_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ impl<'a> DataRowWriter<'a> {
primitive_write!(write_float8, f64);
}

impl<'a> Drop for DataRowWriter<'a> {
impl Drop for DataRowWriter<'_> {
fn drop(&mut self) {
assert_eq!(
self.parent.num_cols, self.current_col,
Expand Down
6 changes: 3 additions & 3 deletions convergence/tests/test_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,16 +79,16 @@ async fn extended_query_flow() {
async fn simple_query_flow() {
let client = setup().await;
let messages = client.simple_query("select 1").await.unwrap();
assert_eq!(messages.len(), 2);
assert_eq!(messages.len(), 3);

let row = match &messages[0] {
let row = match &messages[1] {
SimpleQueryMessage::Row(row) => row,
_ => panic!("expected row"),
};

assert_eq!(row.get(0), Some("1"));

let num_rows = match &messages[1] {
let num_rows = match &messages[2] {
SimpleQueryMessage::CommandComplete(rows) => *rows,
_ => panic!("expected command complete"),
};
Expand Down

0 comments on commit 1c0bdda

Please sign in to comment.