Skip to content
This repository has been archived by the owner on Oct 18, 2023. It is now read-only.

Commit

Permalink
flush when response exceed limit size
Browse files Browse the repository at this point in the history
  • Loading branch information
MarinPostma committed Oct 6, 2023
1 parent e7b848a commit f969447
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 42 deletions.
4 changes: 2 additions & 2 deletions sqld/src/connection/write_proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,6 @@ impl MakeConnection for MakeWriteProxyConn {
}
}

#[derive(Debug)]
pub struct WriteProxyConnection {
/// Lazily initialized read connection
read_conn: LibSqlConnection<TransparentMethods>,
Expand Down Expand Up @@ -196,8 +195,9 @@ impl WriteProxyConnection {
let (builder, new_status, new_frame_no) = match res {
Ok(res) => res,
Err(e @ (Error::PrimaryStreamDisconnect | Error::PrimaryStreamMisuse)) => {
// drop the connection
// drop the connection, and reset the state.
self.remote_conn.lock().await.take();
*status = TxnStatus::Init;
return Err(e);
}
Err(e) => return Err(e),
Expand Down
13 changes: 10 additions & 3 deletions sqld/src/rpc/streaming_exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::sync::Arc;
use std::task::{ready, Context, Poll};

use futures_core::Stream;
use prost::Message;
use rusqlite::types::ValueRef;
use tokio::sync::mpsc;
use tonic::{Code, Status};
Expand Down Expand Up @@ -50,6 +51,7 @@ struct StreamResponseBuilder {
request_id: u32,
sender: mpsc::Sender<ExecResp>,
current: Option<ProgramResp>,
current_size: usize,
}

impl StreamResponseBuilder {
Expand All @@ -59,12 +61,15 @@ impl StreamResponseBuilder {
}

fn push(&mut self, step: Step) -> Result<(), QueryResultBuilderError> {
const MAX_RESPONSE_STEPS: usize = 10;
const MAX_RESPONSE_SIZE: usize = bytesize::ByteSize::mb(1).as_u64() as usize;

let current = self.current();
current.steps.push(RespStep { step: Some(step) });
let step = RespStep { step: Some(step) };
let size = step.encoded_len();
current.steps.push(step);
self.current_size += size;

if current.steps.len() > MAX_RESPONSE_STEPS {
if self.current_size >= MAX_RESPONSE_SIZE {
self.flush()?;
}

Expand All @@ -77,6 +82,7 @@ impl StreamResponseBuilder {
request_id: self.request_id,
response: Some(exec_resp::Response::ProgramResp(current)),
};
self.current_size = 0;
self.sender
.blocking_send(resp)
.map_err(|_| QueryResultBuilderError::Internal(anyhow::anyhow!("stream closed")))?;
Expand Down Expand Up @@ -235,6 +241,7 @@ where
request_id,
sender,
current: None,
current_size: 0,
};
let mut fut = conn.execute_program(pgm, authenticated, builder, None);
loop {
Expand Down
37 changes: 0 additions & 37 deletions sqld/tests/cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,40 +205,3 @@ fn sync_many_replica() {

sim.run().unwrap();
}

#[test]
fn create_namespace() {
let mut sim = Builder::new().build();
make_cluster(&mut sim, 0, false);

sim.client("client", async {
let db =
Database::open_remote_with_connector("http://foo.primary:8080", "", TurmoilConnector)?;
let conn = db.connect()?;

let Err(e) = conn.execute("create table test (x)", ()).await else {
panic!()
};
assert_snapshot!(e.to_string());

let client = Client::new();
let resp = client
.post(
"http://foo.primary:9090/v1/namespaces/foo/create",
json!({}),
)
.await?;
assert_eq!(resp.status(), 200);

conn.execute("create table test (x)", ()).await.unwrap();
let mut rows = conn.query("select count(*) from test", ()).await.unwrap();
assert!(matches!(
rows.next().unwrap().unwrap().get_value(0).unwrap(),
Value::Integer(0)
));

Ok(())
});

sim.run().unwrap();
}
37 changes: 37 additions & 0 deletions sqld/tests/namespaces/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,43 @@ fn make_primary(sim: &mut Sim, path: PathBuf) {
});
}

#[test]
fn create_namespace() {
let mut sim = Builder::new().build();
make_cluster(&mut sim, 0, false);

sim.client("client", async {
let db =
Database::open_remote_with_connector("http://foo.primary:8080", "", TurmoilConnector)?;
let conn = db.connect()?;

let Err(e) = conn.execute("create table test (x)", ()).await else {
panic!()
};
assert_snapshot!(e.to_string());

let client = Client::new();
let resp = client
.post(
"http://foo.primary:9090/v1/namespaces/foo/create",
json!({}),
)
.await?;
assert_eq!(resp.status(), 200);

conn.execute("create table test (x)", ()).await.unwrap();
let mut rows = conn.query("select count(*) from test", ()).await.unwrap();
assert!(matches!(
rows.next().unwrap().unwrap().get_value(0).unwrap(),
Value::Integer(0)
));

Ok(())
});

sim.run().unwrap();
}

#[test]
fn fork_namespace() {
let mut sim = Builder::new().build();
Expand Down

0 comments on commit f969447

Please sign in to comment.