diff --git a/Cargo.toml b/Cargo.toml index 17b4e2ba..4ee1444a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,9 +29,9 @@ byteorder = "1.0.0" futures-core = "0.3.0" futures-sink = "0.3.0" serde = "1.0.8" -tokio = { version = "0.2.0", features = ["tcp"] } -bytes = "0.5.0" +tokio = { version = "1.0", features = ["net"] } +bytes = "1.0" [dev-dependencies] futures = "0.3.0" -tokio = { version = "0.2.0", features = ["full"] } +tokio = { version = "1.0", features = ["full"] } diff --git a/src/lib.rs b/src/lib.rs index 7022ddfb..b25f9f68 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -25,10 +25,11 @@ pub use crate::writer::{AsyncDestination, BincodeWriterFor, SyncDestination}; mod tests { use super::*; use futures::prelude::*; + use tokio::io::AsyncWriteExt; #[tokio::test] async fn it_works() { - let mut echo = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); + let echo = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); let addr = echo.local_addr().unwrap(); tokio::spawn(async move { @@ -51,7 +52,7 @@ mod tests { #[tokio::test] async fn lots() { - let mut echo = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); + let echo = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); let addr = echo.local_addr().unwrap(); tokio::spawn(async move { @@ -71,7 +72,7 @@ mod tests { .await .unwrap(); - tokio::net::TcpStream::shutdown(c.get_mut(), std::net::Shutdown::Write).unwrap(); + c.get_mut().shutdown().await.unwrap(); let mut at = 0; while let Some(got) = c.next().await.transpose().unwrap() { diff --git a/src/reader.rs b/src/reader.rs index 26283190..b1eb1ea1 100644 --- a/src/reader.rs +++ b/src/reader.rs @@ -7,7 +7,7 @@ use std::io; use std::marker::PhantomData; use std::pin::Pin; use std::task::{Context, Poll}; -use tokio::io::AsyncRead; +use tokio::io::{AsyncRead, ReadBuf}; /// A wrapper around an asynchronous reader that produces an asynchronous stream of /// bincode-decoded values. @@ -139,7 +139,9 @@ where unsafe { rest.set_len(max) }; while self.buffer.len() < target_size { - let n = ready!(Pin::new(&mut self.reader).poll_read(cx, &mut rest[..]))?; + let mut buf = ReadBuf::new(&mut rest[..]); + ready!(Pin::new(&mut self.reader).poll_read(cx, &mut buf))?; + let n = buf.filled().len(); if n == 0 { if self.buffer.len() == 0 { return Poll::Ready(Ok(FillResult::EOF)); diff --git a/src/stream.rs b/src/stream.rs index 976f4bb8..aaa660d9 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -6,7 +6,7 @@ use std::ops::{Deref, DerefMut}; use std::pin::Pin; use std::task::{Context, Poll}; use std::{fmt, io}; -use tokio::io::AsyncRead; +use tokio::io::{AsyncRead, ReadBuf}; /// A wrapper around an asynchronous stream that receives and sends bincode-encoded values. /// @@ -135,8 +135,8 @@ where fn poll_read( self: Pin<&mut Self>, cx: &mut Context, - buf: &mut [u8], - ) -> Poll> { + buf: &mut ReadBuf, + ) -> Poll> { Pin::new(self.get_mut().get_mut()).poll_read(cx, buf) } }