Skip to content
This repository has been archived by the owner on Oct 18, 2023. It is now read-only.

Commit

Permalink
stream exec support describe
Browse files Browse the repository at this point in the history
  • Loading branch information
MarinPostma committed Oct 8, 2023
1 parent df6efbf commit 1d721db
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 5 deletions.
2 changes: 1 addition & 1 deletion sqld/src/connection/libsql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::sync::Arc;

use parking_lot::{Mutex, RwLock};
use rusqlite::{DatabaseName, ErrorCode, OpenFlags, StatementStatus, TransactionState};
use sqld_libsql_bindings::wal_hook::{TransparentMethods, WalMethodsHook, };
use sqld_libsql_bindings::wal_hook::{TransparentMethods, WalMethodsHook};
use tokio::sync::{watch, Notify};
use tokio::time::{Duration, Instant};

Expand Down
2 changes: 1 addition & 1 deletion sqld/src/connection/write_proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,7 @@ pub mod test {

use super::*;
use crate::{
query_result_builder::{test::test_driver, QueryResultBuilderError, Column},
query_result_builder::{test::test_driver, Column, QueryResultBuilderError},
rpc::proxy::rpc::{query_result::RowResult, ExecuteResults},
};

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
---
source: sqld/src/rpc/streaming_exec.rs
expression: stream.next().await.unwrap().unwrap()
---
ExecResp {
request_id: 0,
response: Some(
DescribeResp(
DescribeResp {
params: [
DescribeParam {
name: Some(
"$hello",
),
},
],
cols: [
DescribeCol {
name: "$hello",
decltype: None,
},
],
is_explain: false,
is_readonly: true,
},
),
),
}
62 changes: 59 additions & 3 deletions sqld/src/rpc/streaming_exec.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::sync::Arc;

use futures_core::future::BoxFuture;
use futures_core::Stream;
use futures_option::OptionExt;
use prost::Message;
Expand All @@ -11,6 +12,7 @@ use tonic::{Code, Status};

use crate::auth::Authenticated;
use crate::connection::Connection;
use crate::connection::program::Program;
use crate::error::Error;
use crate::query_analysis::TxnStatus;
use crate::query_result_builder::{
Expand All @@ -19,6 +21,7 @@ use crate::query_result_builder::{
use crate::replication::FrameNo;
use crate::rpc::proxy::rpc::exec_req::Request;
use crate::rpc::proxy::rpc::exec_resp::{self, Response};
use crate::rpc::proxy::rpc::{DescribeResp, StreamDescribeReq, DescribeCol, DescribeParam};

use super::proxy::rpc::resp_step::Step;
use super::proxy::rpc::row_value::Value;
Expand Down Expand Up @@ -52,7 +55,7 @@ where
C: Connection,
{
async_stream::stream! {
let mut current_request_fut = None;
let mut current_request_fut: Option<BoxFuture<'static, (crate::Result<()>, u32)>> = None;
let (snd, mut recv) = mpsc::channel(1);
let conn = Arc::new(conn);

Expand Down Expand Up @@ -90,13 +93,41 @@ where
max_program_resp_size,
};

let ret = conn.execute_program(pgm, auth, builder, None).await;
let ret = conn.execute_program(pgm, auth, builder, None).await.map(|_| ());
(ret, request_id)
};

current_request_fut.replace(Box::pin(fut));
}
Some(Request::Describe(_)) => todo!(),
Some(Request::Describe(StreamDescribeReq { stmt })) => {
let auth = auth.clone();
let sender = snd.clone();
let conn = conn.clone();
let fut = async move {
let do_describe = || async move {
let ret = conn.describe(stmt, auth, None).await??;
Ok(DescribeResp {
cols: ret.cols.into_iter().map(|c| DescribeCol { name: c.name, decltype: c.decltype }).collect(),
params: ret.params.into_iter().map(|p| DescribeParam { name: p.name }).collect(),
is_explain: ret.is_explain,
is_readonly: ret.is_readonly
})
};

let ret: crate::Result<()> = match do_describe().await {
Ok(resp) => {
let _ = sender.send(ExecResp { request_id, response: Some(Response::DescribeResp(resp)) }).await;
Ok(())
}
Err(e) => Err(e),
};

(ret, request_id)
};

current_request_fut.replace(Box::pin(fut));

},
None => {
yield Err(Status::new(Code::InvalidArgument, "invalid request"));
break
Expand Down Expand Up @@ -540,4 +571,29 @@ mod test {
let resp = stream.next().await.unwrap().unwrap();
assert_eq!(resp.request_id, 1);
}

#[tokio::test]
async fn describe() {
let tmp = tempdir().unwrap();
let conn = LibSqlConnection::new_test(tmp.path());
let (snd, rcv) = mpsc::channel(1);
let auth = Authenticated::Authorized(Authorized {
namespace: None,
permission: Permission::FullAccess,
});
// limit the size of the response for force a split
let stream = make_proxy_stream_inner(conn, auth, ReceiverStream::new(rcv), 500);

pin!(stream);

// request 0 should be dropped, and request 1 should be processed instead
let req = ExecReq {
request_id: 0,
request: Some(Request::Describe(StreamDescribeReq { stmt: "select $hello".into() })),
};

snd.send(Ok(req)).await.unwrap();

assert_debug_snapshot!(stream.next().await.unwrap().unwrap());
}
}

0 comments on commit 1d721db

Please sign in to comment.