Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature-flagged support for socks5 proxy in Client #1311

Merged
merged 3 commits into from
Oct 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@ license = "Apache-2.0"
release = false

[features]
default = ["rustls-tls", "kubederive", "ws", "latest", "runtime", "refresh"]
default = ["rustls-tls", "kubederive", "ws", "latest", "socks5", "runtime", "refresh"]
kubederive = ["kube/derive"]
openssl-tls = ["kube/client", "kube/openssl-tls"]
rustls-tls = ["kube/client", "kube/rustls-tls"]
runtime = ["kube/runtime", "kube/unstable-runtime"]
socks5 = ["kube/socks5"]
refresh = ["kube/oauth", "kube/oidc"]
ws = ["kube/ws"]
latest = ["k8s-openapi/latest"]
Expand Down
4 changes: 3 additions & 1 deletion kube-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,13 @@ client = ["config", "__non_core", "hyper", "http-body", "tower", "tower-http", "
jsonpatch = ["kube-core/jsonpatch"]
admission = ["kube-core/admission"]
config = ["__non_core", "pem", "home"]
socks5 = ["hyper-socks2"]

# private feature sets; do not use
__non_core = ["tracing", "serde_yaml", "base64"]

[package.metadata.docs.rs]
features = ["client", "rustls-tls", "openssl-tls", "ws", "oauth", "oidc", "jsonpatch", "admission", "k8s-openapi/latest"]
features = ["client", "rustls-tls", "openssl-tls", "ws", "oauth", "oidc", "jsonpatch", "admission", "k8s-openapi/latest", "socks5"]
# Define the configuration attribute `docsrs`. Used to enable `doc_cfg` feature.
rustdoc-args = ["--cfg", "docsrs"]

Expand All @@ -59,6 +60,7 @@ jsonpath_lib = { version = "0.3.0", optional = true }
tokio-util = { version = "0.7.0", optional = true, features = ["io", "codec"] }
hyper = { version = "0.14.13", optional = true, features = ["client", "http1", "stream", "tcp"] }
hyper-rustls = { version = "0.24.0", optional = true }
hyper-socks2 = { version = "0.8.0", optional = true, default-features = false }
tokio-tungstenite = { version = "0.20.0", optional = true }
tower = { version = "0.4.13", optional = true, features = ["buffer", "filter", "util"] }
tower-http = { version = "0.4.0", optional = true, features = ["auth", "map-response-body", "trace"] }
Expand Down
244 changes: 135 additions & 109 deletions kube-client/src/client/builder.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
use bytes::Bytes;
use http::{Request, Response};
use hyper::{self, client::HttpConnector};
use http::{header::HeaderMap, Request, Response};
use hyper::{
self,
client::{connect::Connection, HttpConnector},
};
use hyper_timeout::TimeoutConnector;
pub use kube_core::response::Status;
use std::time::Duration;
use tokio::io::{AsyncRead, AsyncWrite};
use tower::{util::BoxService, BoxError, Layer, Service, ServiceBuilder};
use tower_http::{
classify::ServerErrorsFailureClass, map_response_body::MapResponseBodyLayer, trace::TraceLayer,
};
use tracing::Span;

use crate::{client::ConfigExt, Client, Config, Error, Result};

Expand Down Expand Up @@ -61,117 +67,137 @@
}
}

impl TryFrom<Config> for ClientBuilder<BoxService<Request<hyper::Body>, Response<Box<DynBody>>, BoxError>> {
pub type GenericService = BoxService<Request<hyper::Body>, Response<Box<DynBody>>, BoxError>;

impl TryFrom<Config> for ClientBuilder<GenericService> {
type Error = Error;

/// Builds a default [`ClientBuilder`] stack from a given configuration
fn try_from(config: Config) -> Result<Self> {
use std::time::Duration;

use http::header::HeaderMap;
use tracing::Span;

let default_ns = config.default_namespace.clone();
let auth_layer = config.auth_layer()?;

let client: hyper::Client<_, hyper::Body> = {
let mut connector = HttpConnector::new();
connector.enforce_http(false);

// Current TLS feature precedence when more than one are set:
// 1. rustls-tls
// 2. openssl-tls
// Create a custom client to use something else.
// If TLS features are not enabled, http connector will be used.
#[cfg(feature = "rustls-tls")]
let connector = config.rustls_https_connector_with_connector(connector)?;
#[cfg(all(not(feature = "rustls-tls"), feature = "openssl-tls"))]
let connector = config.openssl_https_connector_with_connector(connector)?;
#[cfg(all(not(feature = "rustls-tls"), not(feature = "openssl-tls")))]
if auth_layer.is_none() || config.cluster_url.scheme() == Some(&http::uri::Scheme::HTTPS) {
// no tls stack situation only works on anonymous auth with http scheme
return Err(Error::TlsRequired);
}

let mut connector = TimeoutConnector::new(connector);

// Set the timeouts for the client
connector.set_connect_timeout(config.connect_timeout);
connector.set_read_timeout(config.read_timeout);
connector.set_write_timeout(config.write_timeout);

hyper::Client::builder().build(connector)
};

let stack = ServiceBuilder::new().layer(config.base_uri_layer()).into_inner();
#[cfg(feature = "gzip")]
let stack = ServiceBuilder::new()
.layer(stack)
.layer(tower_http::decompression::DecompressionLayer::new())
.into_inner();

let service = ServiceBuilder::new()
.layer(stack)
.option_layer(auth_layer)
.layer(config.extra_headers_layer()?)
.layer(
// Attribute names follow [Semantic Conventions].
// [Semantic Conventions]: https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/http.md
TraceLayer::new_for_http()
.make_span_with(|req: &Request<hyper::Body>| {
tracing::debug_span!(
"HTTP",
http.method = %req.method(),
http.url = %req.uri(),
http.status_code = tracing::field::Empty,
otel.name = req.extensions().get::<&'static str>().unwrap_or(&"HTTP"),
otel.kind = "client",
otel.status_code = tracing::field::Empty,
)
})
.on_request(|_req: &Request<hyper::Body>, _span: &Span| {
tracing::debug!("requesting");
})
.on_response(|res: &Response<hyper::Body>, _latency: Duration, span: &Span| {
let status = res.status();
span.record("http.status_code", status.as_u16());
if status.is_client_error() || status.is_server_error() {
span.record("otel.status_code", "ERROR");
}
})
// Explicitly disable `on_body_chunk`. The default does nothing.
.on_body_chunk(())
.on_eos(|_: Option<&HeaderMap>, _duration: Duration, _span: &Span| {
tracing::debug!("stream closed");
})
.on_failure(|ec: ServerErrorsFailureClass, _latency: Duration, span: &Span| {
// Called when
// - Calling the inner service errored
// - Polling `Body` errored
// - the response was classified as failure (5xx)
// - End of stream was classified as failure
let mut connector = HttpConnector::new();
connector.enforce_http(false);

#[cfg(feature = "socks5")]
if let Some(proxy_addr) = config.proxy_url.clone() {
let connector = hyper_socks2::SocksConnector {
proxy_addr,
auth: None,
connector,
};

return make_generic_builder(connector, config);

Check warning on line 88 in kube-client/src/client/builder.rs

View check run for this annotation

Codecov / codecov/patch

kube-client/src/client/builder.rs#L88

Added line #L88 was not covered by tests
}

make_generic_builder(connector, config)
}
}

/// Helper function for implementation of [`TryFrom<Config>`] for [`ClientBuilder`].
/// Ignores [`Config::proxy_url`], which at this point is already handled.
fn make_generic_builder<H>(base_connector: H, config: Config) -> Result<ClientBuilder<GenericService>, Error>
where
H: 'static + Clone + Send + Sync + Service<http::Uri>,
H::Response: 'static + Connection + AsyncRead + AsyncWrite + Send + Unpin,
H::Future: 'static + Send,
H::Error: 'static + Send + Sync + std::error::Error,
{
let default_ns = config.default_namespace.clone();
let auth_layer = config.auth_layer()?;

let client: hyper::Client<_, hyper::Body> = {

Check warning on line 107 in kube-client/src/client/builder.rs

View check run for this annotation

Codecov / codecov/patch

kube-client/src/client/builder.rs#L107

Added line #L107 was not covered by tests
// Current TLS feature precedence when more than one are set:
// 1. rustls-tls
// 2. openssl-tls
// Create a custom client to use something else.
// If TLS features are not enabled, http connector will be used.
#[cfg(feature = "rustls-tls")]
let connector = config.rustls_https_connector_with_connector(base_connector)?;
#[cfg(all(not(feature = "rustls-tls"), feature = "openssl-tls"))]
let connector = config.openssl_https_connector_with_connector(base_connector)?;
#[cfg(all(not(feature = "rustls-tls"), not(feature = "openssl-tls")))]
if auth_layer.is_none() || config.cluster_url.scheme() == Some(&http::uri::Scheme::HTTPS) {

Check warning on line 118 in kube-client/src/client/builder.rs

View check run for this annotation

Codecov / codecov/patch

kube-client/src/client/builder.rs#L116-L118

Added lines #L116 - L118 were not covered by tests
// no tls stack situation only works on anonymous auth with http scheme
return Err(Error::TlsRequired);

Check warning on line 120 in kube-client/src/client/builder.rs

View check run for this annotation

Codecov / codecov/patch

kube-client/src/client/builder.rs#L120

Added line #L120 was not covered by tests
}

let mut connector = TimeoutConnector::new(connector);

// Set the timeouts for the client
connector.set_connect_timeout(config.connect_timeout);
connector.set_read_timeout(config.read_timeout);
connector.set_write_timeout(config.write_timeout);

hyper::Client::builder().build(connector)
};

let stack = ServiceBuilder::new().layer(config.base_uri_layer()).into_inner();
#[cfg(feature = "gzip")]
let stack = ServiceBuilder::new()
.layer(stack)
.layer(tower_http::decompression::DecompressionLayer::new())

Check warning on line 137 in kube-client/src/client/builder.rs

View check run for this annotation

Codecov / codecov/patch

kube-client/src/client/builder.rs#L135-L137

Added lines #L135 - L137 were not covered by tests
.into_inner();

let service = ServiceBuilder::new()
.layer(stack)
.option_layer(auth_layer)
.layer(config.extra_headers_layer()?)
.layer(
// Attribute names follow [Semantic Conventions].
// [Semantic Conventions]: https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/http.md
TraceLayer::new_for_http()
.make_span_with(|req: &Request<hyper::Body>| {
tracing::debug_span!(
"HTTP",

Check warning on line 150 in kube-client/src/client/builder.rs

View check run for this annotation

Codecov / codecov/patch

kube-client/src/client/builder.rs#L150

Added line #L150 was not covered by tests
http.method = %req.method(),
http.url = %req.uri(),
http.status_code = tracing::field::Empty,

Check warning on line 153 in kube-client/src/client/builder.rs

View check run for this annotation

Codecov / codecov/patch

kube-client/src/client/builder.rs#L153

Added line #L153 was not covered by tests
otel.name = req.extensions().get::<&'static str>().unwrap_or(&"HTTP"),
otel.kind = "client",
otel.status_code = tracing::field::Empty,

Check warning on line 156 in kube-client/src/client/builder.rs

View check run for this annotation

Codecov / codecov/patch

kube-client/src/client/builder.rs#L155-L156

Added lines #L155 - L156 were not covered by tests
)
})
.on_request(|_req: &Request<hyper::Body>, _span: &Span| {
tracing::debug!("requesting");
})
.on_response(|res: &Response<hyper::Body>, _latency: Duration, span: &Span| {
let status = res.status();
span.record("http.status_code", status.as_u16());
if status.is_client_error() || status.is_server_error() {
span.record("otel.status_code", "ERROR");
match ec {
ServerErrorsFailureClass::StatusCode(status) => {
span.record("http.status_code", status.as_u16());
tracing::error!("failed with status {}", status)
}
ServerErrorsFailureClass::Error(err) => {
tracing::error!("failed with error {}", err)
}
}
}),
)
.service(client);

Ok(Self::new(
BoxService::new(
MapResponseBodyLayer::new(|body| {
Box::new(http_body::Body::map_err(body, BoxError::from)) as Box<DynBody>
}
})
.layer(service),
),
default_ns,
))
}
// Explicitly disable `on_body_chunk`. The default does nothing.
.on_body_chunk(())
.on_eos(|_: Option<&HeaderMap>, _duration: Duration, _span: &Span| {
tracing::debug!("stream closed");

Check warning on line 172 in kube-client/src/client/builder.rs

View check run for this annotation

Codecov / codecov/patch

kube-client/src/client/builder.rs#L170-L172

Added lines #L170 - L172 were not covered by tests
})
.on_failure(|ec: ServerErrorsFailureClass, _latency: Duration, span: &Span| {

Check warning on line 174 in kube-client/src/client/builder.rs

View check run for this annotation

Codecov / codecov/patch

kube-client/src/client/builder.rs#L174

Added line #L174 was not covered by tests
// Called when
// - Calling the inner service errored
// - Polling `Body` errored
// - the response was classified as failure (5xx)
// - End of stream was classified as failure
span.record("otel.status_code", "ERROR");
match ec {
ServerErrorsFailureClass::StatusCode(status) => {
span.record("http.status_code", status.as_u16());
tracing::error!("failed with status {}", status)

Check warning on line 184 in kube-client/src/client/builder.rs

View check run for this annotation

Codecov / codecov/patch

kube-client/src/client/builder.rs#L180-L184

Added lines #L180 - L184 were not covered by tests
}
ServerErrorsFailureClass::Error(err) => {
tracing::error!("failed with error {}", err)

Check warning on line 187 in kube-client/src/client/builder.rs

View check run for this annotation

Codecov / codecov/patch

kube-client/src/client/builder.rs#L186-L187

Added lines #L186 - L187 were not covered by tests
}
}
}),
)
.service(client);

Ok(ClientBuilder::new(
BoxService::new(
MapResponseBodyLayer::new(|body| {
Box::new(http_body::Body::map_err(body, BoxError::from)) as Box<DynBody>
})
.layer(service),
),
default_ns,
))
}
37 changes: 25 additions & 12 deletions kube-client/src/client/config_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,10 @@ pub trait ConfigExt: private::Sealed {
/// ```
#[cfg_attr(docsrs, doc(cfg(feature = "rustls-tls")))]
#[cfg(feature = "rustls-tls")]
fn rustls_https_connector_with_connector(
fn rustls_https_connector_with_connector<H>(
&self,
connector: hyper::client::HttpConnector,
) -> Result<hyper_rustls::HttpsConnector<hyper::client::HttpConnector>>;
connector: H,
) -> Result<hyper_rustls::HttpsConnector<H>>;

/// Create [`rustls::ClientConfig`] based on config.
/// # Example
Expand Down Expand Up @@ -118,10 +118,16 @@ pub trait ConfigExt: private::Sealed {
/// ```
#[cfg_attr(docsrs, doc(cfg(feature = "openssl-tls")))]
#[cfg(feature = "openssl-tls")]
fn openssl_https_connector_with_connector(
fn openssl_https_connector_with_connector<H>(
&self,
connector: hyper::client::HttpConnector,
) -> Result<hyper_openssl::HttpsConnector<hyper::client::HttpConnector>>;
connector: H,
) -> Result<hyper_openssl::HttpsConnector<H>>
where
H: tower::Service<http::Uri> + Send,
H::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
H::Future: Send + 'static,
H::Response:
tokio::io::AsyncRead + tokio::io::AsyncWrite + hyper::client::connect::Connection + Unpin;

/// Create [`openssl::ssl::SslConnectorBuilder`] based on config.
/// # Example
Expand Down Expand Up @@ -215,10 +221,10 @@ impl ConfigExt for Config {
}

#[cfg(feature = "rustls-tls")]
fn rustls_https_connector_with_connector(
fn rustls_https_connector_with_connector<H>(
&self,
connector: hyper::client::HttpConnector,
) -> Result<hyper_rustls::HttpsConnector<hyper::client::HttpConnector>> {
connector: H,
) -> Result<hyper_rustls::HttpsConnector<H>> {
let rustls_config = self.rustls_client_config()?;
let mut builder = hyper_rustls::HttpsConnectorBuilder::new()
.with_tls_config(rustls_config)
Expand All @@ -245,10 +251,17 @@ impl ConfigExt for Config {
}

#[cfg(feature = "openssl-tls")]
fn openssl_https_connector_with_connector(
fn openssl_https_connector_with_connector<H>(
&self,
connector: hyper::client::HttpConnector,
) -> Result<hyper_openssl::HttpsConnector<hyper::client::HttpConnector>> {
connector: H,
) -> Result<hyper_openssl::HttpsConnector<H>>
where
H: tower::Service<http::Uri> + Send,
H::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
H::Future: Send + 'static,
H::Response:
tokio::io::AsyncRead + tokio::io::AsyncWrite + hyper::client::connect::Connection + Unpin,
{
let mut https =
hyper_openssl::HttpsConnector::with_connector(connector, self.openssl_ssl_connector_builder()?)
.map_err(|e| Error::OpensslTls(tls::openssl_tls::Error::CreateHttpsConnector(e)))?;
Expand Down
3 changes: 1 addition & 2 deletions kube-client/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,7 @@ pub struct Config {
pub accept_invalid_certs: bool,
/// Stores information to tell the cluster who you are.
pub auth_info: AuthInfo,
// TODO Actually support proxy or create an example with custom client
/// Optional proxy URL.
/// Optional proxy URL. Proxy support requires the `socks5` feature.
pub proxy_url: Option<http::Uri>,
/// If set, apiserver certificate will be validated to contain this string
///
Expand Down
3 changes: 2 additions & 1 deletion kube/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,10 @@ admission = ["kube-core/admission"]
derive = ["kube-derive", "kube-core/schema"]
runtime = ["kube-runtime"]
unstable-runtime = ["kube-runtime/unstable-runtime"]
socks5 = ["kube-client/socks5"]

[package.metadata.docs.rs]
features = ["client", "rustls-tls", "openssl-tls", "derive", "ws", "oauth", "jsonpatch", "admission", "runtime", "k8s-openapi/latest", "unstable-runtime"]
features = ["client", "rustls-tls", "openssl-tls", "derive", "ws", "oauth", "jsonpatch", "admission", "runtime", "k8s-openapi/latest", "unstable-runtime", "socks5"]
# Define the configuration attribute `docsrs`. Used to enable `doc_cfg` feature.
rustdoc-args = ["--cfg", "docsrs"]

Expand Down
Loading