Skip to content

Commit

Permalink
Improve TestConnector buf reader
Browse files Browse the repository at this point in the history
  • Loading branch information
algesten committed Jan 7, 2025
1 parent 423aa8f commit 0ee0598
Showing 1 changed file with 44 additions and 4 deletions.
48 changes: 44 additions & 4 deletions src/unversioned/transport/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@

use std::cell::RefCell;
use std::io::Write;
use std::io::{BufRead, BufReader};
use std::sync::mpsc::{self, Receiver, RecvTimeoutError};
use std::sync::{Arc, Mutex};
use std::{fmt, io, thread};

use http::{Method, Request, Uri};
use ureq_proto::parser::try_parse_request;

use crate::http;
use crate::Error;
Expand Down Expand Up @@ -37,6 +37,7 @@ impl<In: Transport> Connector<In> for TestConnector {
let config = details.config;

let uri = details.uri.clone();
debug!("Test uri: {}", uri);

let buffers = LazyBuffers::new(config.input_buffer_size(), config.output_buffer_size());

Expand Down Expand Up @@ -110,14 +111,13 @@ fn test_run(
tx: mpsc::SyncSender<Vec<u8>>,
handlers: Vec<TestHandler>,
) {
let mut reader = BufReader::new(RxRead(rx));
let mut reader = SaneBufReader(Some(RxRead(rx)), vec![]);
let mut writer = TxWrite(tx);
let uri_s = uri.to_string();

let req = loop {
let input = reader.fill_buf().expect("test fill_buf");
let maybe =
ureq_proto::parser::try_parse_request::<100>(input).expect("test parse request");
let maybe = try_parse_request::<100>(input).expect("test parse request");
if let Some((amount, req)) = maybe {
reader.consume(amount);
break req;
Expand Down Expand Up @@ -439,6 +439,46 @@ impl io::Write for TxWrite {
}
}

struct SaneBufReader<R: io::Read>(Option<R>, Vec<u8>);

impl<R: io::Read> io::Read for SaneBufReader<R> {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
if !self.1.is_empty() {
let max = buf.len().min(self.1.len());
buf[..max].copy_from_slice(&self.1[..max]);
self.1.drain(..max);
return Ok(max);
}

let Some(reader) = &mut self.0 else {
return Ok(0);
};
reader.read(buf)
}
}

impl<R: io::Read> SaneBufReader<R> {
pub fn fill_buf(&mut self) -> io::Result<&[u8]> {
let Some(reader) = &mut self.0 else {
return Ok(&self.1);
};

let l = self.1.len();
self.1.resize(l + 1024, 0);
let buf = &mut self.1[l..];
let n = reader.read(buf)?;
if n == 0 {
self.0 = None;
}
self.1.truncate(l + n);
Ok(&self.1)
}

pub fn consume(&mut self, n: usize) {
self.1.drain(..n);
}
}

pub(crate) struct TestTransport {
buffers: LazyBuffers,
tx: mpsc::SyncSender<Vec<u8>>,
Expand Down

0 comments on commit 0ee0598

Please sign in to comment.