diff --git a/airbyte-to-flow/src/connector_runner.rs b/airbyte-to-flow/src/connector_runner.rs index 847704b9105b..b090378a5e1b 100644 --- a/airbyte-to-flow/src/connector_runner.rs +++ b/airbyte-to-flow/src/connector_runner.rs @@ -248,8 +248,7 @@ async fn streaming_all( response_stream_writer: Arc>>>, response_finished_sender: oneshot::Sender, ) -> Result<(), Error> { - let mut request_stream_reader = - StreamReader::new(request_stream); + let mut request_stream_reader = StreamReader::new(request_stream); let request_stream_copy = async move { copy(&mut request_stream_reader, &mut request_stream_writer).await?; @@ -263,8 +262,8 @@ async fn streaming_all( while let Some(result) = response_stream.next().await { match result { Ok(bytes) => { - writer.write(&bytes).await?; - }, + writer.write_all(&bytes).await?; + } // This error usually happens because there is an underlying error // in the connector. We don't want this error to obscure the real error // so we just log it as a debug and let the last output error @@ -274,7 +273,7 @@ async fn streaming_all( } Err(e) => Err::<(), std::io::Error>(e.into())?, } - }; + } response_finished_sender .send(true)