diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index fb3c07cb..083ee77c 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -361,3 +361,5 @@ jobs: name: Run `rpk,underscore-wildcards` tests - run: cargo test --features pq-experimental,rpk,underscore-wildcards name: Run `pq-experimental,rpk,underscore-wildcards` tests + - run: cargo test -p hyper-boring --features hyper1 + name: Run hyper 1.0 tests for hyper-boring diff --git a/Cargo.toml b/Cargo.toml index f848893f..a067f74f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,6 +24,7 @@ boring = { version = "4.9.1", path = "./boring" } tokio-boring = { version = "4.9.1", path = "./tokio-boring" } bindgen = { version = "0.70.1", default-features = false, features = ["runtime"] } +bytes = "1" cmake = "0.1.18" fs_extra = "1.3.0" fslock = "0.2" @@ -36,10 +37,15 @@ futures = "0.3" tokio = "1" anyhow = "1" antidote = "1.0.0" -http = "0.2" -hyper = { version = "0.14", default-features = false } +http = "1" +http-body-util = "0.1.2" +http_old = { package = "http", version = "0.2" } +hyper = "1" +hyper-util = "0.1.6" +hyper_old = { package = "hyper", version = "0.14", default-features = false } linked_hash_set = "0.1" once_cell = "1.0" openssl-macros = "0.1.1" tower = "0.4" tower-layer = "0.3" +tower-service = "0.3" diff --git a/boring-sys/Cargo.toml b/boring-sys/Cargo.toml index c1471f20..5b623181 100644 --- a/boring-sys/Cargo.toml +++ b/boring-sys/Cargo.toml @@ -81,3 +81,6 @@ bindgen = { workspace = true } cmake = { workspace = true } fs_extra = { workspace = true } fslock = { workspace = true } + +[lints.rust] +unexpected_cfgs = { level = "allow", check-cfg = ['cfg(const_fn)'] } diff --git a/hyper-boring/Cargo.toml b/hyper-boring/Cargo.toml index b8c5c9d0..1e30d035 100644 --- a/hyper-boring/Cargo.toml +++ b/hyper-boring/Cargo.toml @@ -17,7 +17,7 @@ rustdoc-args = ["--cfg", "docsrs"] [features] default = ["runtime"] -runtime = ["hyper/runtime"] +runtime = ["hyper_old/runtime"] # Use a FIPS-validated version of boringssl. fips = ["tokio-boring/fips"] @@ -28,19 +28,30 @@ fips-link-precompiled = ["tokio-boring/fips-link-precompiled"] # Enables experimental post-quantum crypto (https://blog.cloudflare.com/post-quantum-for-all/) pq-experimental = ["tokio-boring/pq-experimental"] +# Enable Hyper 1 support +hyper1 = ["dep:http", "dep:hyper", "dep:hyper-util", "dep:tower-service"] + [dependencies] antidote = { workspace = true } -http = { workspace = true } -hyper = { workspace = true, features = ["client"] } +http = { workspace = true, optional = true } +http_old = { workspace = true } +hyper = { workspace = true, optional = true } +hyper-util = { workspace = true, optional = true, features = ["client", "client-legacy"] } +hyper_old = { workspace = true, features = ["client"] } linked_hash_set = { workspace = true } once_cell = { workspace = true } boring = { workspace = true } tokio = { workspace = true } tokio-boring = { workspace = true } tower-layer = { workspace = true } +tower-service = { workspace = true, optional = true } [dev-dependencies] -hyper = { workspace = true, features = [ "full" ] } +bytes = { workspace = true } +http-body-util = { workspace = true } +hyper-util = { workspace = true, features = ["http1", "http2", "service", "tokio"] } +hyper = { workspace = true, features = ["server"] } +hyper_old = { workspace = true, features = [ "full" ] } tokio = { workspace = true, features = [ "full" ] } tower = { workspace = true, features = ["util"] } futures = { workspace = true } diff --git a/hyper-boring/src/lib.rs b/hyper-boring/src/lib.rs index 53c12a46..736a3d89 100644 --- a/hyper-boring/src/lib.rs +++ b/hyper-boring/src/lib.rs @@ -2,91 +2,27 @@ #![warn(missing_docs)] #![cfg_attr(docsrs, feature(doc_auto_cfg))] -use crate::cache::{SessionCache, SessionKey}; -use antidote::Mutex; +use crate::cache::SessionKey; use boring::error::ErrorStack; use boring::ex_data::Index; -use boring::ssl::{ - ConnectConfiguration, Ssl, SslConnector, SslConnectorBuilder, SslMethod, SslRef, - SslSessionCacheMode, -}; -use http::uri::Scheme; -use hyper::client::connect::{Connected, Connection}; -#[cfg(feature = "runtime")] -use hyper::client::HttpConnector; -use hyper::service::Service; -use hyper::Uri; +use boring::ssl::Ssl; use once_cell::sync::OnceCell; -use std::fmt::Debug; -use std::future::Future; -use std::io; -use std::net; -use std::pin::Pin; -use std::sync::Arc; -use std::task::{Context, Poll}; -use std::{error::Error, fmt}; -use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; +use std::fmt; use tokio_boring::SslStream; -use tower_layer::Layer; mod cache; -#[cfg(test)] -mod test; +mod v0; +/// Hyper 1 support. +#[cfg(feature = "hyper1")] +pub mod v1; + +pub use self::v0::*; fn key_index() -> Result, ErrorStack> { static IDX: OnceCell> = OnceCell::new(); IDX.get_or_try_init(Ssl::new_ex_index).copied() } -#[derive(Clone)] -struct Inner { - ssl: SslConnector, - cache: Arc>, - callback: Option, - ssl_callback: Option, -} - -type Callback = - Arc Result<(), ErrorStack> + Sync + Send>; -type SslCallback = Arc Result<(), ErrorStack> + Sync + Send>; - -impl Inner { - fn setup_ssl(&self, uri: &Uri, host: &str) -> Result { - let mut conf = self.ssl.configure()?; - - if let Some(ref callback) = self.callback { - callback(&mut conf, uri)?; - } - - let key = SessionKey { - host: host.to_string(), - port: uri.port_u16().unwrap_or(443), - }; - - if let Some(session) = self.cache.lock().get(&key) { - unsafe { - conf.set_session(&session)?; - } - } - - let idx = key_index()?; - conf.set_ex_data(idx, key); - - let mut ssl = conf.into_ssl(host)?; - - if let Some(ref ssl_callback) = self.ssl_callback { - ssl_callback(&mut ssl, uri)?; - } - - Ok(ssl) - } -} - -/// A layer which wraps services in an `HttpsConnector`. -pub struct HttpsLayer { - inner: Inner, -} - /// Settings for [`HttpsLayer`] pub struct HttpsLayerSettings { session_cache_capacity: usize, @@ -123,214 +59,6 @@ impl HttpsLayerSettingsBuilder { } } -impl HttpsLayer { - /// Creates a new `HttpsLayer` with default settings. - /// - /// ALPN is configured to support both HTTP/1 and HTTP/1.1. - pub fn new() -> Result { - let mut ssl = SslConnector::builder(SslMethod::tls())?; - - ssl.set_alpn_protos(b"\x02h2\x08http/1.1")?; - - Self::with_connector(ssl) - } - - /// Creates a new `HttpsLayer`. - /// - /// The session cache configuration of `ssl` will be overwritten. - pub fn with_connector(ssl: SslConnectorBuilder) -> Result { - Self::with_connector_and_settings(ssl, Default::default()) - } - - /// Creates a new `HttpsLayer` with settings - pub fn with_connector_and_settings( - mut ssl: SslConnectorBuilder, - settings: HttpsLayerSettings, - ) -> Result { - let cache = Arc::new(Mutex::new(SessionCache::with_capacity( - settings.session_cache_capacity, - ))); - - ssl.set_session_cache_mode(SslSessionCacheMode::CLIENT); - - ssl.set_new_session_callback({ - let cache = cache.clone(); - move |ssl, session| { - if let Some(key) = key_index().ok().and_then(|idx| ssl.ex_data(idx)) { - cache.lock().insert(key.clone(), session); - } - } - }); - - Ok(HttpsLayer { - inner: Inner { - ssl: ssl.build(), - cache, - callback: None, - ssl_callback: None, - }, - }) - } - - /// Registers a callback which can customize the configuration of each connection. - /// - /// Unsuitable to change verify hostflags (with `config.param_mut().set_hostflags(…)`), - /// as they are reset after the callback is executed. Use [`Self::set_ssl_callback`] - /// instead. - pub fn set_callback(&mut self, callback: F) - where - F: Fn(&mut ConnectConfiguration, &Uri) -> Result<(), ErrorStack> + 'static + Sync + Send, - { - self.inner.callback = Some(Arc::new(callback)); - } - - /// Registers a callback which can customize the `Ssl` of each connection. - pub fn set_ssl_callback(&mut self, callback: F) - where - F: Fn(&mut SslRef, &Uri) -> Result<(), ErrorStack> + 'static + Sync + Send, - { - self.inner.ssl_callback = Some(Arc::new(callback)); - } -} - -impl Layer for HttpsLayer { - type Service = HttpsConnector; - - fn layer(&self, inner: S) -> HttpsConnector { - HttpsConnector { - http: inner, - inner: self.inner.clone(), - } - } -} - -/// A Connector using OpenSSL to support `http` and `https` schemes. -#[derive(Clone)] -pub struct HttpsConnector { - http: T, - inner: Inner, -} - -#[cfg(feature = "runtime")] -impl HttpsConnector { - /// Creates a a new `HttpsConnector` using default settings. - /// - /// The Hyper `HttpConnector` is used to perform the TCP socket connection. ALPN is configured to support both - /// HTTP/2 and HTTP/1.1. - /// - /// Requires the `runtime` Cargo feature. - pub fn new() -> Result, ErrorStack> { - let mut http = HttpConnector::new(); - http.enforce_http(false); - - HttpsLayer::new().map(|l| l.layer(http)) - } -} - -impl HttpsConnector -where - S: Service + Send, - S::Error: Into>, - S::Future: Unpin + Send + 'static, - T: AsyncRead + AsyncWrite + Connection + Unpin + Debug + Sync + Send + 'static, -{ - /// Creates a new `HttpsConnector`. - /// - /// The session cache configuration of `ssl` will be overwritten. - pub fn with_connector( - http: S, - ssl: SslConnectorBuilder, - ) -> Result, ErrorStack> { - HttpsLayer::with_connector(ssl).map(|l| l.layer(http)) - } - - /// Registers a callback which can customize the configuration of each connection. - /// - /// Unsuitable to change verify hostflags (with `config.param_mut().set_hostflags(…)`), - /// as they are reset after the callback is executed. Use [`Self::set_ssl_callback`] - /// instead. - pub fn set_callback(&mut self, callback: F) - where - F: Fn(&mut ConnectConfiguration, &Uri) -> Result<(), ErrorStack> + 'static + Sync + Send, - { - self.inner.callback = Some(Arc::new(callback)); - } - - /// Registers a callback which can customize the `Ssl` of each connection. - pub fn set_ssl_callback(&mut self, callback: F) - where - F: Fn(&mut SslRef, &Uri) -> Result<(), ErrorStack> + 'static + Sync + Send, - { - self.inner.ssl_callback = Some(Arc::new(callback)); - } -} - -impl Service for HttpsConnector -where - S: Service + Send, - S::Error: Into>, - S::Future: Unpin + Send + 'static, - S::Response: AsyncRead + AsyncWrite + Connection + Unpin + Debug + Sync + Send + 'static, -{ - type Response = MaybeHttpsStream; - type Error = Box; - #[allow(clippy::type_complexity)] - type Future = Pin> + Send>>; - - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - self.http.poll_ready(cx).map_err(Into::into) - } - - fn call(&mut self, uri: Uri) -> Self::Future { - let is_tls_scheme = uri - .scheme() - .map(|s| s == &Scheme::HTTPS || s.as_str() == "wss") - .unwrap_or(false); - - let tls_setup = if is_tls_scheme { - Some((self.inner.clone(), uri.clone())) - } else { - None - }; - - let connect = self.http.call(uri); - - let f = async { - let conn = connect.await.map_err(Into::into)?; - - let (inner, uri) = match tls_setup { - Some((inner, uri)) => (inner, uri), - None => return Ok(MaybeHttpsStream::Http(conn)), - }; - - let mut host = uri.host().ok_or("URI missing host")?; - - // If `host` is an IPv6 address, we must strip away the square brackets that surround - // it (otherwise, boring will fail to parse the host as an IP address, eventually - // causing the handshake to fail due a hostname verification error). - if !host.is_empty() { - let last = host.len() - 1; - let mut chars = host.chars(); - - if let (Some('['), Some(']')) = (chars.next(), chars.last()) { - if host[1..last].parse::().is_ok() { - host = &host[1..last]; - } - } - } - - let ssl = inner.setup_ssl(&uri, host)?; - let stream = tokio_boring::SslStreamBuilder::new(ssl, conn) - .connect() - .await?; - - Ok(MaybeHttpsStream::Https(stream)) - }; - - Box::pin(f) - } -} - /// A stream which may be wrapped with TLS. pub enum MaybeHttpsStream { /// A raw HTTP stream. @@ -339,72 +67,6 @@ pub enum MaybeHttpsStream { Https(SslStream), } -impl AsyncRead for MaybeHttpsStream -where - T: AsyncRead + AsyncWrite + Unpin, -{ - fn poll_read( - mut self: Pin<&mut Self>, - ctx: &mut Context<'_>, - buf: &mut ReadBuf, - ) -> Poll> { - match &mut *self { - MaybeHttpsStream::Http(s) => Pin::new(s).poll_read(ctx, buf), - MaybeHttpsStream::Https(s) => Pin::new(s).poll_read(ctx, buf), - } - } -} - -impl AsyncWrite for MaybeHttpsStream -where - T: AsyncRead + AsyncWrite + Unpin, -{ - fn poll_write( - mut self: Pin<&mut Self>, - ctx: &mut Context<'_>, - buf: &[u8], - ) -> Poll> { - match &mut *self { - MaybeHttpsStream::Http(s) => Pin::new(s).poll_write(ctx, buf), - MaybeHttpsStream::Https(s) => Pin::new(s).poll_write(ctx, buf), - } - } - - fn poll_flush(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll> { - match &mut *self { - MaybeHttpsStream::Http(s) => Pin::new(s).poll_flush(ctx), - MaybeHttpsStream::Https(s) => Pin::new(s).poll_flush(ctx), - } - } - - fn poll_shutdown(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll> { - match &mut *self { - MaybeHttpsStream::Http(s) => Pin::new(s).poll_shutdown(ctx), - MaybeHttpsStream::Https(s) => Pin::new(s).poll_shutdown(ctx), - } - } -} - -impl Connection for MaybeHttpsStream -where - T: Connection, -{ - fn connected(&self) -> Connected { - match self { - MaybeHttpsStream::Http(s) => s.connected(), - MaybeHttpsStream::Https(s) => { - let mut connected = s.get_ref().connected(); - - if s.ssl().selected_alpn_protocol() == Some(b"h2") { - connected = connected.negotiated_h2(); - } - - connected - } - } - } -} - impl fmt::Debug for MaybeHttpsStream { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match *self { diff --git a/hyper-boring/src/v0.rs b/hyper-boring/src/v0.rs new file mode 100644 index 00000000..172d1640 --- /dev/null +++ b/hyper-boring/src/v0.rs @@ -0,0 +1,345 @@ +use crate::cache::{SessionCache, SessionKey}; +use crate::{key_index, HttpsLayerSettings, MaybeHttpsStream}; +use antidote::Mutex; +use boring::error::ErrorStack; +use boring::ssl::{ + ConnectConfiguration, Ssl, SslConnector, SslConnectorBuilder, SslMethod, SslRef, + SslSessionCacheMode, +}; +use http_old::uri::Scheme; +use hyper_old::client::connect::{Connected, Connection}; +use hyper_old::client::HttpConnector; +use hyper_old::service::Service; +use hyper_old::Uri; +use std::error::Error; +use std::future::Future; +use std::net; +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll}; +use std::{fmt, io}; +use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; +use tower_layer::Layer; + +/// A Connector using OpenSSL to support `http` and `https` schemes. +#[derive(Clone)] +pub struct HttpsConnector { + http: T, + inner: Inner, +} + +#[cfg(feature = "runtime")] +impl HttpsConnector { + /// Creates a a new `HttpsConnector` using default settings. + /// + /// The Hyper `HttpConnector` is used to perform the TCP socket connection. ALPN is configured to support both + /// HTTP/2 and HTTP/1.1. + /// + /// Requires the `runtime` Cargo feature. + pub fn new() -> Result, ErrorStack> { + let mut http = HttpConnector::new(); + http.enforce_http(false); + + HttpsLayer::new().map(|l| l.layer(http)) + } +} + +impl HttpsConnector +where + S: Service + Send, + S::Error: Into>, + S::Future: Unpin + Send + 'static, + T: AsyncRead + AsyncWrite + Connection + Unpin + fmt::Debug + Sync + Send + 'static, +{ + /// Creates a new `HttpsConnector`. + /// + /// The session cache configuration of `ssl` will be overwritten. + pub fn with_connector( + http: S, + ssl: SslConnectorBuilder, + ) -> Result, ErrorStack> { + HttpsLayer::with_connector(ssl).map(|l| l.layer(http)) + } + + /// Registers a callback which can customize the configuration of each connection. + /// + /// Unsuitable to change verify hostflags (with `config.param_mut().set_hostflags(…)`), + /// as they are reset after the callback is executed. Use [`Self::set_ssl_callback`] + /// instead. + pub fn set_callback(&mut self, callback: F) + where + F: Fn(&mut ConnectConfiguration, &Uri) -> Result<(), ErrorStack> + 'static + Sync + Send, + { + self.inner.callback = Some(Arc::new(callback)); + } + + /// Registers a callback which can customize the `Ssl` of each connection. + pub fn set_ssl_callback(&mut self, callback: F) + where + F: Fn(&mut SslRef, &Uri) -> Result<(), ErrorStack> + 'static + Sync + Send, + { + self.inner.ssl_callback = Some(Arc::new(callback)); + } +} + +/// A layer which wraps services in an `HttpsConnector`. +pub struct HttpsLayer { + inner: Inner, +} + +#[derive(Clone)] +struct Inner { + ssl: SslConnector, + cache: Arc>, + callback: Option, + ssl_callback: Option, +} + +type Callback = + Arc Result<(), ErrorStack> + Sync + Send>; +type SslCallback = Arc Result<(), ErrorStack> + Sync + Send>; + +impl HttpsLayer { + /// Creates a new `HttpsLayer` with default settings. + /// + /// ALPN is configured to support both HTTP/1 and HTTP/1.1. + pub fn new() -> Result { + let mut ssl = SslConnector::builder(SslMethod::tls())?; + + ssl.set_alpn_protos(b"\x02h2\x08http/1.1")?; + + Self::with_connector(ssl) + } + + /// Creates a new `HttpsLayer`. + /// + /// The session cache configuration of `ssl` will be overwritten. + pub fn with_connector(ssl: SslConnectorBuilder) -> Result { + Self::with_connector_and_settings(ssl, Default::default()) + } + + /// Creates a new `HttpsLayer` with settings + pub fn with_connector_and_settings( + mut ssl: SslConnectorBuilder, + settings: HttpsLayerSettings, + ) -> Result { + let cache = Arc::new(Mutex::new(SessionCache::with_capacity( + settings.session_cache_capacity, + ))); + + ssl.set_session_cache_mode(SslSessionCacheMode::CLIENT); + + ssl.set_new_session_callback({ + let cache = cache.clone(); + move |ssl, session| { + if let Some(key) = key_index().ok().and_then(|idx| ssl.ex_data(idx)) { + cache.lock().insert(key.clone(), session); + } + } + }); + + Ok(HttpsLayer { + inner: Inner { + ssl: ssl.build(), + cache, + callback: None, + ssl_callback: None, + }, + }) + } + + /// Registers a callback which can customize the configuration of each connection. + /// + /// Unsuitable to change verify hostflags (with `config.param_mut().set_hostflags(…)`), + /// as they are reset after the callback is executed. Use [`Self::set_ssl_callback`] + /// instead. + pub fn set_callback(&mut self, callback: F) + where + F: Fn(&mut ConnectConfiguration, &Uri) -> Result<(), ErrorStack> + 'static + Sync + Send, + { + self.inner.callback = Some(Arc::new(callback)); + } + + /// Registers a callback which can customize the `Ssl` of each connection. + pub fn set_ssl_callback(&mut self, callback: F) + where + F: Fn(&mut SslRef, &Uri) -> Result<(), ErrorStack> + 'static + Sync + Send, + { + self.inner.ssl_callback = Some(Arc::new(callback)); + } +} + +impl Layer for HttpsLayer { + type Service = HttpsConnector; + + fn layer(&self, inner: S) -> HttpsConnector { + HttpsConnector { + http: inner, + inner: self.inner.clone(), + } + } +} + +impl Inner { + fn setup_ssl(&self, uri: &Uri, host: &str) -> Result { + let mut conf = self.ssl.configure()?; + + if let Some(ref callback) = self.callback { + callback(&mut conf, uri)?; + } + + let key = SessionKey { + host: host.to_string(), + port: uri.port_u16().unwrap_or(443), + }; + + if let Some(session) = self.cache.lock().get(&key) { + unsafe { + conf.set_session(&session)?; + } + } + + let idx = key_index()?; + conf.set_ex_data(idx, key); + + let mut ssl = conf.into_ssl(host)?; + + if let Some(ref ssl_callback) = self.ssl_callback { + ssl_callback(&mut ssl, uri)?; + } + + Ok(ssl) + } +} + +impl Service for HttpsConnector +where + S: Service + Send, + S::Error: Into>, + S::Future: Unpin + Send + 'static, + S::Response: AsyncRead + AsyncWrite + Connection + Unpin + fmt::Debug + Sync + Send + 'static, +{ + type Response = MaybeHttpsStream; + type Error = Box; + #[allow(clippy::type_complexity)] + type Future = Pin> + Send>>; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.http.poll_ready(cx).map_err(Into::into) + } + + fn call(&mut self, uri: Uri) -> Self::Future { + let is_tls_scheme = uri + .scheme() + .map(|s| s == &Scheme::HTTPS || s.as_str() == "wss") + .unwrap_or(false); + + let tls_setup = if is_tls_scheme { + Some((self.inner.clone(), uri.clone())) + } else { + None + }; + + let connect = self.http.call(uri); + + let f = async { + let conn = connect.await.map_err(Into::into)?; + + let (inner, uri) = match tls_setup { + Some((inner, uri)) => (inner, uri), + None => return Ok(MaybeHttpsStream::Http(conn)), + }; + + let mut host = uri.host().ok_or("URI missing host")?; + + // If `host` is an IPv6 address, we must strip away the square brackets that surround + // it (otherwise, boring will fail to parse the host as an IP address, eventually + // causing the handshake to fail due a hostname verification error). + if !host.is_empty() { + let last = host.len() - 1; + let mut chars = host.chars(); + + if let (Some('['), Some(']')) = (chars.next(), chars.last()) { + if host[1..last].parse::().is_ok() { + host = &host[1..last]; + } + } + } + + let ssl = inner.setup_ssl(&uri, host)?; + let stream = tokio_boring::SslStreamBuilder::new(ssl, conn) + .connect() + .await?; + + Ok(MaybeHttpsStream::Https(stream)) + }; + + Box::pin(f) + } +} + +impl Connection for MaybeHttpsStream +where + T: Connection, +{ + fn connected(&self) -> Connected { + match self { + MaybeHttpsStream::Http(s) => s.connected(), + MaybeHttpsStream::Https(s) => { + let mut connected = s.get_ref().connected(); + + if s.ssl().selected_alpn_protocol() == Some(b"h2") { + connected = connected.negotiated_h2(); + } + + connected + } + } + } +} + +impl AsyncRead for MaybeHttpsStream +where + T: AsyncRead + AsyncWrite + Unpin, +{ + fn poll_read( + mut self: Pin<&mut Self>, + ctx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + match &mut *self { + MaybeHttpsStream::Http(s) => Pin::new(s).poll_read(ctx, buf), + MaybeHttpsStream::Https(s) => Pin::new(s).poll_read(ctx, buf), + } + } +} + +impl AsyncWrite for MaybeHttpsStream +where + T: AsyncRead + AsyncWrite + Unpin, +{ + fn poll_write( + mut self: Pin<&mut Self>, + ctx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + match &mut *self { + MaybeHttpsStream::Http(s) => Pin::new(s).poll_write(ctx, buf), + MaybeHttpsStream::Https(s) => Pin::new(s).poll_write(ctx, buf), + } + } + + fn poll_flush(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll> { + match &mut *self { + MaybeHttpsStream::Http(s) => Pin::new(s).poll_flush(ctx), + MaybeHttpsStream::Https(s) => Pin::new(s).poll_flush(ctx), + } + } + + fn poll_shutdown(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll> { + match &mut *self { + MaybeHttpsStream::Http(s) => Pin::new(s).poll_shutdown(ctx), + MaybeHttpsStream::Https(s) => Pin::new(s).poll_shutdown(ctx), + } + } +} diff --git a/hyper-boring/src/v1.rs b/hyper-boring/src/v1.rs new file mode 100644 index 00000000..10d0d598 --- /dev/null +++ b/hyper-boring/src/v1.rs @@ -0,0 +1,350 @@ +use crate::cache::{SessionCache, SessionKey}; +use crate::{key_index, HttpsLayerSettings, MaybeHttpsStream}; +use antidote::Mutex; +use boring::error::ErrorStack; +use boring::ssl::{ + ConnectConfiguration, Ssl, SslConnector, SslConnectorBuilder, SslMethod, SslRef, + SslSessionCacheMode, +}; +use http::uri::Scheme; +use http::Uri; +use hyper::rt::{Read, ReadBufCursor, Write}; +use hyper_util::client::legacy::connect::{Connected, Connection, HttpConnector}; +use hyper_util::rt::TokioIo; +use std::error::Error; +use std::fmt; +use std::future::Future; +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll}; +use std::{io, net}; +use tokio::io::{AsyncRead, AsyncWrite}; +use tower_layer::Layer; +use tower_service::Service; + +/// A Connector using OpenSSL to support `http` and `https` schemes. +#[derive(Clone)] +pub struct HttpsConnector { + http: T, + inner: Inner, +} + +#[cfg(feature = "runtime")] +impl HttpsConnector { + /// Creates a a new `HttpsConnector` using default settings. + /// + /// The Hyper `HttpConnector` is used to perform the TCP socket connection. ALPN is configured to support both + /// HTTP/2 and HTTP/1.1. + /// + /// Requires the `runtime` Cargo feature. + pub fn new() -> Result, ErrorStack> { + let mut http = HttpConnector::new(); + http.enforce_http(false); + + HttpsLayer::new().map(|l| l.layer(http)) + } +} + +impl HttpsConnector +where + S: Service> + Send, + S::Error: Into>, + S::Future: Unpin + Send + 'static, + T: AsyncRead + AsyncWrite + Connection + Unpin + fmt::Debug + Sync + Send + 'static, +{ + /// Creates a new `HttpsConnector`. + /// + /// The session cache configuration of `ssl` will be overwritten. + pub fn with_connector( + http: S, + ssl: SslConnectorBuilder, + ) -> Result, ErrorStack> { + HttpsLayer::with_connector(ssl).map(|l| l.layer(http)) + } + + /// Registers a callback which can customize the configuration of each connection. + /// + /// Unsuitable to change verify hostflags (with `config.param_mut().set_hostflags(…)`), + /// as they are reset after the callback is executed. Use [`Self::set_ssl_callback`] + /// instead. + pub fn set_callback(&mut self, callback: F) + where + F: Fn(&mut ConnectConfiguration, &Uri) -> Result<(), ErrorStack> + 'static + Sync + Send, + { + self.inner.callback = Some(Arc::new(callback)); + } + + /// Registers a callback which can customize the `Ssl` of each connection. + pub fn set_ssl_callback(&mut self, callback: F) + where + F: Fn(&mut SslRef, &Uri) -> Result<(), ErrorStack> + 'static + Sync + Send, + { + self.inner.ssl_callback = Some(Arc::new(callback)); + } +} + +/// A layer which wraps services in an `HttpsConnector`. +pub struct HttpsLayer { + inner: Inner, +} + +#[derive(Clone)] +struct Inner { + ssl: SslConnector, + cache: Arc>, + callback: Option, + ssl_callback: Option, +} + +type Callback = + Arc Result<(), ErrorStack> + Sync + Send>; +type SslCallback = Arc Result<(), ErrorStack> + Sync + Send>; + +impl HttpsLayer { + /// Creates a new `HttpsLayer` with default settings. + /// + /// ALPN is configured to support both HTTP/1 and HTTP/1.1. + pub fn new() -> Result { + let mut ssl = SslConnector::builder(SslMethod::tls())?; + + ssl.set_alpn_protos(b"\x02h2\x08http/1.1")?; + + Self::with_connector(ssl) + } + + /// Creates a new `HttpsLayer`. + /// + /// The session cache configuration of `ssl` will be overwritten. + pub fn with_connector(ssl: SslConnectorBuilder) -> Result { + Self::with_connector_and_settings(ssl, Default::default()) + } + + /// Creates a new `HttpsLayer` with settings + pub fn with_connector_and_settings( + mut ssl: SslConnectorBuilder, + settings: HttpsLayerSettings, + ) -> Result { + let cache = Arc::new(Mutex::new(SessionCache::with_capacity( + settings.session_cache_capacity, + ))); + + ssl.set_session_cache_mode(SslSessionCacheMode::CLIENT); + + ssl.set_new_session_callback({ + let cache = cache.clone(); + move |ssl, session| { + if let Some(key) = key_index().ok().and_then(|idx| ssl.ex_data(idx)) { + cache.lock().insert(key.clone(), session); + } + } + }); + + Ok(HttpsLayer { + inner: Inner { + ssl: ssl.build(), + cache, + callback: None, + ssl_callback: None, + }, + }) + } + + /// Registers a callback which can customize the configuration of each connection. + /// + /// Unsuitable to change verify hostflags (with `config.param_mut().set_hostflags(…)`), + /// as they are reset after the callback is executed. Use [`Self::set_ssl_callback`] + /// instead. + pub fn set_callback(&mut self, callback: F) + where + F: Fn(&mut ConnectConfiguration, &Uri) -> Result<(), ErrorStack> + 'static + Sync + Send, + { + self.inner.callback = Some(Arc::new(callback)); + } + + /// Registers a callback which can customize the `Ssl` of each connection. + pub fn set_ssl_callback(&mut self, callback: F) + where + F: Fn(&mut SslRef, &Uri) -> Result<(), ErrorStack> + 'static + Sync + Send, + { + self.inner.ssl_callback = Some(Arc::new(callback)); + } +} + +impl Layer for HttpsLayer { + type Service = HttpsConnector; + + fn layer(&self, inner: S) -> HttpsConnector { + HttpsConnector { + http: inner, + inner: self.inner.clone(), + } + } +} + +impl Inner { + fn setup_ssl(&self, uri: &Uri, host: &str) -> Result { + let mut conf = self.ssl.configure()?; + + if let Some(ref callback) = self.callback { + callback(&mut conf, uri)?; + } + + let key = SessionKey { + host: host.to_string(), + port: uri.port_u16().unwrap_or(443), + }; + + if let Some(session) = self.cache.lock().get(&key) { + unsafe { + conf.set_session(&session)?; + } + } + + let idx = key_index()?; + conf.set_ex_data(idx, key); + + let mut ssl = conf.into_ssl(host)?; + + if let Some(ref ssl_callback) = self.ssl_callback { + ssl_callback(&mut ssl, uri)?; + } + + Ok(ssl) + } +} + +impl Service for HttpsConnector +where + S: Service> + Send, + S::Error: Into>, + S::Future: Unpin + Send + 'static, + T: AsyncRead + AsyncWrite + Connection + Unpin + fmt::Debug + Sync + Send + 'static, +{ + type Response = MaybeHttpsStream; + type Error = Box; + #[allow(clippy::type_complexity)] + type Future = Pin> + Send>>; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.http.poll_ready(cx).map_err(Into::into) + } + + fn call(&mut self, uri: Uri) -> Self::Future { + let is_tls_scheme = uri + .scheme() + .map(|s| s == &Scheme::HTTPS || s.as_str() == "wss") + .unwrap_or(false); + + let tls_setup = if is_tls_scheme { + Some((self.inner.clone(), uri.clone())) + } else { + None + }; + + let connect = self.http.call(uri); + + let f = async { + let conn = connect.await.map_err(Into::into)?.into_inner(); + + let (inner, uri) = match tls_setup { + Some((inner, uri)) => (inner, uri), + None => return Ok(MaybeHttpsStream::Http(conn)), + }; + + let mut host = uri.host().ok_or("URI missing host")?; + + // If `host` is an IPv6 address, we must strip away the square brackets that surround + // it (otherwise, boring will fail to parse the host as an IP address, eventually + // causing the handshake to fail due a hostname verification error). + if !host.is_empty() { + let last = host.len() - 1; + let mut chars = host.chars(); + + if let (Some('['), Some(']')) = (chars.next(), chars.last()) { + if host[1..last].parse::().is_ok() { + host = &host[1..last]; + } + } + } + + let ssl = inner.setup_ssl(&uri, host)?; + let stream = tokio_boring::SslStreamBuilder::new(ssl, conn) + .connect() + .await?; + + Ok(MaybeHttpsStream::Https(stream)) + }; + + Box::pin(f) + } +} + +impl Connection for MaybeHttpsStream +where + T: Connection, +{ + fn connected(&self) -> Connected { + match self { + MaybeHttpsStream::Http(s) => s.connected(), + MaybeHttpsStream::Https(s) => { + let mut connected = s.get_ref().connected(); + + if s.ssl().selected_alpn_protocol() == Some(b"h2") { + connected = connected.negotiated_h2(); + } + + connected + } + } + } +} + +impl Read for MaybeHttpsStream +where + T: AsyncRead + AsyncWrite + Unpin, +{ + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: ReadBufCursor<'_>, + ) -> Poll> { + match &mut *self { + MaybeHttpsStream::Http(inner) => Pin::new(&mut TokioIo::new(inner)).poll_read(cx, buf), + MaybeHttpsStream::Https(inner) => Pin::new(&mut TokioIo::new(inner)).poll_read(cx, buf), + } + } +} + +impl Write for MaybeHttpsStream +where + T: AsyncRead + AsyncWrite + Unpin, +{ + fn poll_write( + mut self: Pin<&mut Self>, + ctx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + match &mut *self { + MaybeHttpsStream::Http(inner) => { + Pin::new(&mut TokioIo::new(inner)).poll_write(ctx, buf) + } + MaybeHttpsStream::Https(inner) => { + Pin::new(&mut TokioIo::new(inner)).poll_write(ctx, buf) + } + } + } + + fn poll_flush(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll> { + match &mut *self { + MaybeHttpsStream::Http(inner) => Pin::new(&mut TokioIo::new(inner)).poll_flush(ctx), + MaybeHttpsStream::Https(inner) => Pin::new(&mut TokioIo::new(inner)).poll_flush(ctx), + } + } + + fn poll_shutdown(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll> { + match &mut *self { + MaybeHttpsStream::Http(inner) => Pin::new(&mut TokioIo::new(inner)).poll_shutdown(ctx), + MaybeHttpsStream::Https(inner) => Pin::new(&mut TokioIo::new(inner)).poll_shutdown(ctx), + } + } +} diff --git a/hyper-boring/tests/hyper1.rs b/hyper-boring/tests/hyper1.rs new file mode 100644 index 00000000..441caea6 --- /dev/null +++ b/hyper-boring/tests/hyper1.rs @@ -0,0 +1,160 @@ +#![cfg(feature = "hyper1")] + +use boring::ssl::{SslAcceptor, SslConnector, SslFiletype, SslMethod}; +use bytes::Bytes; +use futures::StreamExt; +use http_body_util::{BodyStream, Empty}; +use hyper::{service, Response}; +use hyper_boring::v1::HttpsConnector; +use hyper_util::client::legacy::connect::HttpConnector; +use hyper_util::client::legacy::Client; +use hyper_util::rt::{TokioExecutor, TokioIo}; +use std::convert::Infallible; +use std::{io, iter}; +use tokio::net::TcpListener; + +#[tokio::test] +async fn google() { + let ssl = HttpsConnector::new().unwrap(); + let client = Client::builder(TokioExecutor::new()) + .pool_max_idle_per_host(0) + .build::<_, Empty>(ssl); + + for _ in 0..3 { + let resp = client + .get("https://www.google.com".parse().unwrap()) + .await + .expect("connection should succeed"); + let mut body = BodyStream::new(resp.into_body()); + while body.next().await.transpose().unwrap().is_some() {} + } +} + +#[tokio::test] +async fn localhost() { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + let port = addr.port(); + + let server = async move { + let mut acceptor = SslAcceptor::mozilla_intermediate(SslMethod::tls()).unwrap(); + acceptor.set_session_id_context(b"test").unwrap(); + acceptor + .set_private_key_file("tests/test/key.pem", SslFiletype::PEM) + .unwrap(); + acceptor + .set_certificate_chain_file("tests/test/cert.pem") + .unwrap(); + let acceptor = acceptor.build(); + + for _ in 0..3 { + let stream = listener.accept().await.unwrap().0; + let stream = tokio_boring::accept(&acceptor, stream).await.unwrap(); + + let service = service::service_fn(|_| async { + Ok::<_, io::Error>(Response::new(>::new())) + }); + + hyper::server::conn::http1::Builder::new() + .keep_alive(false) + .serve_connection(TokioIo::new(stream), service) + .await + .unwrap(); + } + }; + tokio::spawn(server); + + let resolver = + tower::service_fn(move |_name| async move { Ok::<_, Infallible>(iter::once(addr)) }); + + let mut connector = HttpConnector::new_with_resolver(resolver); + + connector.enforce_http(false); + + let mut ssl = SslConnector::builder(SslMethod::tls()).unwrap(); + + ssl.set_ca_file("tests/test/root-ca.pem").unwrap(); + + use std::fs::File; + use std::io::Write; + + let file = File::create("../target/keyfile.log").unwrap(); + ssl.set_keylog_callback(move |_, line| { + let _ = writeln!(&file, "{}", line); + }); + + let ssl = HttpsConnector::with_connector(connector, ssl).unwrap(); + let client = Client::builder(TokioExecutor::new()).build::<_, Empty>(ssl); + + for _ in 0..3 { + let resp = client + .get(format!("https://foobar.com:{}", port).parse().unwrap()) + .await + .unwrap(); + assert!(resp.status().is_success(), "{}", resp.status()); + let mut body = BodyStream::new(resp.into_body()); + while body.next().await.transpose().unwrap().is_some() {} + } +} + +#[tokio::test] +async fn alpn_h2() { + use boring::ssl::{self, AlpnError}; + + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + let port = addr.port(); + + let server = async move { + let mut acceptor = SslAcceptor::mozilla_modern(SslMethod::tls()).unwrap(); + acceptor + .set_certificate_chain_file("tests/test/cert.pem") + .unwrap(); + acceptor + .set_private_key_file("tests/test/key.pem", SslFiletype::PEM) + .unwrap(); + acceptor.set_alpn_select_callback(|_, client| { + ssl::select_next_proto(b"\x02h2", client).ok_or(AlpnError::NOACK) + }); + let acceptor = acceptor.build(); + + let stream = listener.accept().await.unwrap().0; + let stream = tokio_boring::accept(&acceptor, stream).await.unwrap(); + assert_eq!(stream.ssl().selected_alpn_protocol().unwrap(), b"h2"); + + let service = service::service_fn(|_| async { + Ok::<_, io::Error>(Response::new(>::new())) + }); + + hyper::server::conn::http2::Builder::new(TokioExecutor::new()) + .serve_connection(TokioIo::new(stream), service) + .await + .unwrap(); + }; + tokio::spawn(server); + + let resolver = + tower::service_fn(move |_name| async move { Ok::<_, Infallible>(iter::once(addr)) }); + + let mut connector = HttpConnector::new_with_resolver(resolver); + + connector.enforce_http(false); + + let mut ssl = SslConnector::builder(SslMethod::tls()).unwrap(); + + ssl.set_ca_file("tests/test/root-ca.pem").unwrap(); + + let mut ssl = HttpsConnector::with_connector(connector, ssl).unwrap(); + + ssl.set_ssl_callback(|ssl, _| ssl.set_alpn_protos(b"\x02h2\x08http/1.1")); + + let client = Client::builder(TokioExecutor::new()).build::<_, Empty>(ssl); + + let resp = client + .get(format!("https://foobar.com:{}", port).parse().unwrap()) + .await + .unwrap(); + assert!(resp.status().is_success(), "{}", resp.status()); + let mut body = BodyStream::new(resp.into_body()); + while body.next().await.transpose().unwrap().is_some() {} +} diff --git a/hyper-boring/src/test.rs b/hyper-boring/tests/old.rs similarity index 87% rename from hyper-boring/src/test.rs rename to hyper-boring/tests/old.rs index 006d5163..08cfce12 100644 --- a/hyper-boring/src/test.rs +++ b/hyper-boring/tests/old.rs @@ -1,12 +1,12 @@ -use super::*; -use boring::ssl::{SslAcceptor, SslFiletype, SslMethod}; +use boring::ssl::{SslAcceptor, SslConnector, SslFiletype, SslMethod}; use futures::StreamExt; -use hyper::client::HttpConnector; -use hyper::server::conn::Http; -use hyper::{service, Response}; -use hyper::{Body, Client}; +use hyper_boring::HttpsConnector; +use hyper_old::client::HttpConnector; +use hyper_old::server::conn::Http; +use hyper_old::{service, Response}; +use hyper_old::{Body, Client}; use std::convert::Infallible; -use std::iter; +use std::{io, iter}; use tokio::net::TcpListener; #[tokio::test] @@ -37,10 +37,10 @@ async fn localhost() { let mut acceptor = SslAcceptor::mozilla_intermediate(SslMethod::tls()).unwrap(); acceptor.set_session_id_context(b"test").unwrap(); acceptor - .set_private_key_file("test/key.pem", SslFiletype::PEM) + .set_private_key_file("tests/test/key.pem", SslFiletype::PEM) .unwrap(); acceptor - .set_certificate_chain_file("test/cert.pem") + .set_certificate_chain_file("tests/test/cert.pem") .unwrap(); let acceptor = acceptor.build(); @@ -69,7 +69,7 @@ async fn localhost() { let mut ssl = SslConnector::builder(SslMethod::tls()).unwrap(); - ssl.set_ca_file("test/root-ca.pem").unwrap(); + ssl.set_ca_file("tests/test/root-ca.pem").unwrap(); use std::fs::File; use std::io::Write; @@ -104,10 +104,10 @@ async fn alpn_h2() { let server = async move { let mut acceptor = SslAcceptor::mozilla_modern(SslMethod::tls()).unwrap(); acceptor - .set_certificate_chain_file("test/cert.pem") + .set_certificate_chain_file("tests/test/cert.pem") .unwrap(); acceptor - .set_private_key_file("test/key.pem", SslFiletype::PEM) + .set_private_key_file("tests/test/key.pem", SslFiletype::PEM) .unwrap(); acceptor.set_alpn_select_callback(|_, client| { ssl::select_next_proto(b"\x02h2", client).ok_or(AlpnError::NOACK) @@ -138,7 +138,7 @@ async fn alpn_h2() { let mut ssl = SslConnector::builder(SslMethod::tls()).unwrap(); - ssl.set_ca_file("test/root-ca.pem").unwrap(); + ssl.set_ca_file("tests/test/root-ca.pem").unwrap(); let mut ssl = HttpsConnector::with_connector(connector, ssl).unwrap(); diff --git a/hyper-boring/test/cert.pem b/hyper-boring/tests/test/cert.pem similarity index 100% rename from hyper-boring/test/cert.pem rename to hyper-boring/tests/test/cert.pem diff --git a/hyper-boring/test/key.pem b/hyper-boring/tests/test/key.pem similarity index 100% rename from hyper-boring/test/key.pem rename to hyper-boring/tests/test/key.pem diff --git a/hyper-boring/test/root-ca.pem b/hyper-boring/tests/test/root-ca.pem similarity index 100% rename from hyper-boring/test/root-ca.pem rename to hyper-boring/tests/test/root-ca.pem