Skip to content

Commit

Permalink
dfp: allowing the dfp cluster to do dns lookups
Browse files Browse the repository at this point in the history
Signed-off-by: Alyssa Wilk <[email protected]>
  • Loading branch information
alyssawilk committed Jan 29, 2025
1 parent a20dffd commit ce1d3c8
Show file tree
Hide file tree
Showing 18 changed files with 215 additions and 63 deletions.
5 changes: 5 additions & 0 deletions changelogs/current.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@ removed_config_or_runtime:
Removed runtime flag ``envoy.reloadable_features.dns_details`` and legacy code paths.
new_features:
- area: dfp
change: |
The DFP cluster will now use the async lookup path to do DNS resolutions for null hosts. This behavioral change
can be temporarily reverted by setting runtime guard ``envoy.reloadable_features.dfp_cluster_resolves_hosts``
to false.
- area: oauth2
change: |
Add the option to specify SameSite cookie attribute values for oauth2 supported cookies.
Expand Down
6 changes: 5 additions & 1 deletion envoy/upstream/load_balancer.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,10 @@ struct HostSelectionResponse {
HostSelectionResponse(HostConstSharedPtr host,
std::unique_ptr<AsyncHostSelectionHandle> cancelable = nullptr)
: host(host), cancelable(std::move(cancelable)) {}
HostSelectionResponse(HostConstSharedPtr host, std::string details)
: host(host), details(details) {}
HostConstSharedPtr host;
std::string details;
std::unique_ptr<AsyncHostSelectionHandle> cancelable;
};

Expand Down Expand Up @@ -151,8 +154,9 @@ class LoadBalancerContext {

/* Called by the load balancer when asynchronous host selection completes
* @param host supplies the upstream host selected
* @param details gives optional details about the resolution success/failure.
*/
virtual void onAsyncHostSelection(HostConstSharedPtr&& host) PURE;
virtual void onAsyncHostSelection(HostConstSharedPtr&& host, std::string details) PURE;
};

/**
Expand Down
58 changes: 38 additions & 20 deletions source/common/router/router.cc
Original file line number Diff line number Diff line change
Expand Up @@ -658,6 +658,9 @@ Http::FilterHeadersStatus Filter::decodeHeaders(Http::RequestHeaderMap& headers,
Network::Socket::appendOptions(upstream_options_, callbacks_->getUpstreamSocketOptions());
}

callbacks_->streamInfo().downstreamTiming().setValue(
"envoy.router.host_selection_start_ms",
callbacks_->dispatcher().timeSource().monotonicTime());
auto host_selection_response = cluster->chooseHost(this);
if (!host_selection_response.cancelable ||
!Runtime::runtimeFeatureEnabled("envoy.reloadable_features.async_host_selection")) {
Expand All @@ -668,7 +671,8 @@ Http::FilterHeadersStatus Filter::decodeHeaders(Http::RequestHeaderMap& headers,
// well as handling unsupported asynchronous host selection by treating it
// as host selection failure and calling sendNoHealthyUpstreamResponse.
return continueDecodeHeaders(cluster, headers, end_stream, modify_headers, nullptr,
std::move(host_selection_response.host));
std::move(host_selection_response.host),
std::string(host_selection_response.details));
}

ENVOY_STREAM_LOG(debug, "Doing asynchronous host selection\n", *callbacks_);
Expand All @@ -677,13 +681,14 @@ Http::FilterHeadersStatus Filter::decodeHeaders(Http::RequestHeaderMap& headers,
host_selection_cancelable_ = std::move(host_selection_response.cancelable);
// Configure a callback to be called on asynchronous host selection.
on_host_selected_ = ([this, cluster, end_stream,
modify_headers](Upstream::HostConstSharedPtr&& host) -> void {
modify_headers](Upstream::HostConstSharedPtr&& host,
std::string host_selection_details) -> void {
// It should always be safe to call continueDecodeHeaders. In the case the
// stream had a local reply before host selection completed,
// the lookup should be canceled.
bool should_continue_decoding = false;
continueDecodeHeaders(cluster, *downstream_headers_, end_stream, modify_headers,
&should_continue_decoding, std::move(host));
&should_continue_decoding, std::move(host), host_selection_details);
// continueDecodeHeaders can itself send a local reply, in which case should_continue_decoding
// should be false. If this is not the case, we can continue the filter chain due to successful
// asynchronous host selection.
Expand All @@ -699,23 +704,26 @@ Http::FilterHeadersStatus Filter::decodeHeaders(Http::RequestHeaderMap& headers,
}

// When asynchronous host selection is complete, call the pre-configured on_host_selected_function.
void Filter::onAsyncHostSelection(Upstream::HostConstSharedPtr&& host) {
ENVOY_STREAM_LOG(debug, "Completing asynchronous host selection\n", *callbacks_);
void Filter::onAsyncHostSelection(Upstream::HostConstSharedPtr&& host, std::string details) {
ENVOY_STREAM_LOG(debug, "Completing asynchronous host selection [{}]\n", *callbacks_, details);
std::unique_ptr<Upstream::AsyncHostSelectionHandle> local_scope =
std::move(host_selection_cancelable_);
on_host_selected_(std::move(host));
on_host_selected_(std::move(host), details);
}

Http::FilterHeadersStatus Filter::continueDecodeHeaders(
Upstream::ThreadLocalCluster* cluster, Http::RequestHeaderMap& headers, bool end_stream,
std::function<void(Http::ResponseHeaderMap&)> modify_headers, bool* should_continue_decoding,
Upstream::HostConstSharedPtr&& selected_host) {
Upstream::HostConstSharedPtr&& selected_host,
absl::optional<std::string> host_selection_details) {
const StreamInfo::FilterStateSharedPtr& filter_state = callbacks_->streamInfo().filterState();
const DebugConfig* debug_config = filter_state->getDataReadOnly<DebugConfig>(DebugConfig::key());
callbacks_->streamInfo().downstreamTiming().setValue(
"envoy.router.host_selection_end_ms", callbacks_->dispatcher().timeSource().monotonicTime());

std::unique_ptr<GenericConnPool> generic_conn_pool = createConnPool(*cluster, selected_host);
if (!generic_conn_pool) {
sendNoHealthyUpstreamResponse();
sendNoHealthyUpstreamResponse(host_selection_details);
return Http::FilterHeadersStatus::StopIteration;
}
Upstream::HostDescriptionConstSharedPtr host = generic_conn_pool->host();
Expand Down Expand Up @@ -938,12 +946,14 @@ std::unique_ptr<GenericConnPool> Filter::createConnPool(Upstream::ThreadLocalClu
callbacks_->streamInfo().protocol(), this, *message);
}

void Filter::sendNoHealthyUpstreamResponse() {
void Filter::sendNoHealthyUpstreamResponse(absl::optional<std::string> optional_details) {
callbacks_->streamInfo().setResponseFlag(StreamInfo::CoreResponseFlag::NoHealthyUpstream);
chargeUpstreamCode(Http::Code::ServiceUnavailable, {}, false);
absl::string_view details = (optional_details.has_value() && !optional_details->empty())
? absl::string_view(*optional_details)
: StreamInfo::ResponseCodeDetails::get().NoHealthyUpstream;
callbacks_->sendLocalReply(Http::Code::ServiceUnavailable, "no healthy upstream", modify_headers_,
absl::nullopt,
StreamInfo::ResponseCodeDetails::get().NoHealthyUpstream);
absl::nullopt, details);
}

Http::FilterDataStatus Filter::decodeData(Buffer::Instance& data, bool end_stream) {
Expand Down Expand Up @@ -2112,11 +2122,14 @@ void Filter::doRetry(bool can_send_early_data, bool can_use_http3, TimeoutRetry
const auto cluster = config_->cm_.getThreadLocalCluster(route_entry_->clusterName());
std::unique_ptr<GenericConnPool> generic_conn_pool;
if (cluster == nullptr) {
sendNoHealthyUpstreamResponse();
sendNoHealthyUpstreamResponse({});
cleanup();
return;
}

callbacks_->streamInfo().downstreamTiming().setValue(
"envoy.router.host_selection_start_ms",
callbacks_->dispatcher().timeSource().monotonicTime());
auto host_selection_response = cluster->chooseHost(this);
if (!host_selection_response.cancelable ||
!Runtime::runtimeFeatureEnabled("envoy.reloadable_features.async_host_selection")) {
Expand All @@ -2127,26 +2140,31 @@ void Filter::doRetry(bool can_send_early_data, bool can_use_http3, TimeoutRetry
// well as handling unsupported asynchronous host selection (by treating it
// as host selection failure).
continueDoRetry(can_send_early_data, can_use_http3, is_timeout_retry,
std::move(host_selection_response.host), *cluster);
std::move(host_selection_response.host), *cluster,
std::string(host_selection_response.details));
}

ENVOY_STREAM_LOG(debug, "Handling asynchronous host selection for retry\n", *callbacks_);
// Again latch the cancel handle, and set up the callback to be called when host
// selection is complete.
host_selection_cancelable_ = std::move(host_selection_response.cancelable);
on_host_selected_ = ([this, can_send_early_data, can_use_http3, is_timeout_retry,
cluster](Upstream::HostConstSharedPtr&& host) -> void {
continueDoRetry(can_send_early_data, can_use_http3, is_timeout_retry, std::move(host),
*cluster);
});
on_host_selected_ =
([this, can_send_early_data, can_use_http3, is_timeout_retry,
cluster](Upstream::HostConstSharedPtr&& host, std::string host_selection_details) -> void {
continueDoRetry(can_send_early_data, can_use_http3, is_timeout_retry, std::move(host),
*cluster, host_selection_details);
});
}

void Filter::continueDoRetry(bool can_send_early_data, bool can_use_http3,
TimeoutRetry is_timeout_retry, Upstream::HostConstSharedPtr&& host,
Upstream::ThreadLocalCluster& cluster) {
Upstream::ThreadLocalCluster& cluster,
absl::optional<std::string> host_selection_details) {
callbacks_->streamInfo().downstreamTiming().setValue(
"envoy.router.host_selection_end_ms", callbacks_->dispatcher().timeSource().monotonicTime());
std::unique_ptr<GenericConnPool> generic_conn_pool = createConnPool(cluster, host);
if (!generic_conn_pool) {
sendNoHealthyUpstreamResponse();
sendNoHealthyUpstreamResponse(host_selection_details);
cleanup();
return;
}
Expand Down
12 changes: 7 additions & 5 deletions source/common/router/router.h
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,8 @@ class Filter : Logger::Loggable<Logger::Id::router>,
continueDecodeHeaders(Upstream::ThreadLocalCluster* cluster, Http::RequestHeaderMap& headers,
bool end_stream,
std::function<void(Http::ResponseHeaderMap&)> modify_headers,
bool* should_continue_decoding, Upstream::HostConstSharedPtr&& host);
bool* should_continue_decoding, Upstream::HostConstSharedPtr&& host,
absl::optional<std::string> host_selection_detailsi = {});

Http::FilterDataStatus decodeData(Buffer::Instance& data, bool end_stream) override;
Http::FilterTrailersStatus decodeTrailers(Http::RequestTrailerMap& trailers) override;
Expand Down Expand Up @@ -446,7 +447,7 @@ class Filter : Logger::Loggable<Logger::Id::router>,
return callbacks_->upstreamOverrideHost();
}

void onAsyncHostSelection(Upstream::HostConstSharedPtr&& host) override;
void onAsyncHostSelection(Upstream::HostConstSharedPtr&& host, std::string details) override;

/**
* Set a computed cookie to be sent with the downstream headers.
Expand Down Expand Up @@ -569,7 +570,7 @@ class Filter : Logger::Loggable<Logger::Id::router>,
// if a "good" response comes back and we return downstream, so there is no point in waiting
// for the remaining upstream requests to return.
void resetOtherUpstreams(UpstreamRequest& upstream_request);
void sendNoHealthyUpstreamResponse();
void sendNoHealthyUpstreamResponse(absl::optional<std::string> details);
bool setupRedirect(const Http::ResponseHeaderMap& headers);
bool convertRequestHeadersForInternalRedirect(Http::RequestHeaderMap& downstream_headers,
const Http::ResponseHeaderMap& upstream_headers,
Expand All @@ -579,7 +580,8 @@ class Filter : Logger::Loggable<Logger::Id::router>,
absl::optional<uint64_t> code);
void doRetry(bool can_send_early_data, bool can_use_http3, TimeoutRetry is_timeout_retry);
void continueDoRetry(bool can_send_early_data, bool can_use_http3, TimeoutRetry is_timeout_retry,
Upstream::HostConstSharedPtr&& host, Upstream::ThreadLocalCluster& cluster);
Upstream::HostConstSharedPtr&& host, Upstream::ThreadLocalCluster& cluster,
absl::optional<std::string> host_selection_details);

void runRetryOptionsPredicates(UpstreamRequest& retriable_request);
// Called immediately after a non-5xx header is received from upstream, performs stats accounting
Expand All @@ -603,7 +605,7 @@ class Filter : Logger::Loggable<Logger::Id::router>,
std::unique_ptr<Stats::StatNameDynamicStorage> alt_stat_prefix_;
const VirtualCluster* request_vcluster_{};
RouteStatsContextOptRef route_stats_context_;
std::function<void(Upstream::HostConstSharedPtr&& host)> on_host_selected_;
std::function<void(Upstream::HostConstSharedPtr&& host, std::string details)> on_host_selected_;
std::unique_ptr<Upstream::AsyncHostSelectionHandle> host_selection_cancelable_;
Event::TimerPtr response_timeout_;
TimeoutData timeout_;
Expand Down
1 change: 1 addition & 0 deletions source/common/runtime/runtime_features.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ RUNTIME_GUARD(envoy_reloadable_features_async_host_selection);
RUNTIME_GUARD(envoy_reloadable_features_avoid_dfp_cluster_removal_on_cds_update);
RUNTIME_GUARD(envoy_reloadable_features_boolean_to_string_fix);
RUNTIME_GUARD(envoy_reloadable_features_check_switch_protocol_websocket_handshake);
RUNTIME_GUARD(envoy_reloadable_features_dfp_cluster_resolves_hosts);
RUNTIME_GUARD(envoy_reloadable_features_dfp_fail_on_empty_host_header);
RUNTIME_GUARD(envoy_reloadable_features_disallow_quic_client_udp_mmsg);
RUNTIME_GUARD(envoy_reloadable_features_dns_nodata_noname_is_success);
Expand Down
3 changes: 3 additions & 0 deletions source/common/upstream/cluster_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2238,6 +2238,9 @@ HostSelectionResponse ClusterManagerImpl::ThreadLocalClusterManagerImpl::Cluster
if (host_selection.host || host_selection.cancelable) {
return host_selection;
}
cluster_info_->trafficStats()->upstream_cx_none_healthy_.inc();
ENVOY_LOG(debug, "no healthy host");
return host_selection;
}

cluster_info_->trafficStats()->upstream_cx_none_healthy_.inc();
Expand Down
2 changes: 1 addition & 1 deletion source/common/upstream/load_balancer_context_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class LoadBalancerContextBase : public LoadBalancerContext {

absl::optional<OverrideHost> overrideHostToSelect() const override { return {}; }

void onAsyncHostSelection(HostConstSharedPtr&&) override {}
void onAsyncHostSelection(HostConstSharedPtr&&, std::string) override {}
};

} // namespace Upstream
Expand Down
74 changes: 66 additions & 8 deletions source/extensions/clusters/dynamic_forward_proxy/cluster.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include "envoy/stream_info/uint32_accessor.h"

#include "source/common/http/utility.h"
#include "source/common/network/filter_state_proxy_info.h"
#include "source/common/network/transport_socket_options_impl.h"
#include "source/common/router/string_accessor_impl.h"
#include "source/common/stream_info/uint32_accessor_impl.h"
Expand Down Expand Up @@ -46,6 +47,18 @@ class DynamicPortObjectFactory : public StreamInfo::FilterState::ObjectFactory {
}
};

bool isProxying(StreamInfo::StreamInfo* stream_info) {
if (!stream_info || !(Runtime::runtimeFeatureEnabled(
"envoy.reloadable_features.skip_dns_lookup_for_proxied_requests"))) {
return false;
}
const Envoy::StreamInfo::FilterStateSharedPtr& filter_state = stream_info->filterState();
return filter_state && filter_state->hasData<Network::Http11ProxyInfoFilterState>(
Network::Http11ProxyInfoFilterState::key());

return false;
}

} // namespace

REGISTER_FACTORY(DynamicHostObjectFactory, StreamInfo::FilterState::ObjectFactory);
Expand Down Expand Up @@ -394,21 +407,66 @@ Cluster::LoadBalancer::chooseHost(Upstream::LoadBalancerContext* context) {

if (raw_host.empty()) {
ENVOY_LOG(debug, "host empty");
return {nullptr};
return {nullptr, "empty_host_header"};
}
std::string host = Common::DynamicForwardProxy::DnsHostInfo::normalizeHostForDfp(raw_host, port);
std::string hostname =
Common::DynamicForwardProxy::DnsHostInfo::normalizeHostForDfp(raw_host, port);

if (cluster_.enableSubCluster()) {
return cluster_.chooseHost(host, context);
}
return findHostByName(host);
return cluster_.chooseHost(hostname, context);
}
Upstream::HostConstSharedPtr host = findHostByName(hostname);
bool force_refresh =
Runtime::runtimeFeatureEnabled("envoy.reloadable_features.reresolve_if_no_connections") &&
Runtime::runtimeFeatureEnabled("envoy.reloadable_features.dfp_cluster_resolves_hosts") &&
host && !host->used();
if ((host && !force_refresh) ||
!Runtime::runtimeFeatureEnabled("envoy.reloadable_features.dfp_cluster_resolves_hosts")) {
return {host};
}

// If the host is not found, the DFP cluster cluster can now do asynchronous lookup.
Upstream::ResourceAutoIncDecPtr handle = cluster_.dns_cache_->canCreateDnsRequest();

// Return an immediate failure if there's too many requests already.
if (!handle) {
return {nullptr, "dns_cache_pending_requests_overflow"};
}

// Attempt to load the host from cache. Generally this will result in async
// resolution so create a DFPHostSelectionHandle to handle this.
std::unique_ptr<DFPHostSelectionHandle> cancelable =
std::make_unique<DFPHostSelectionHandle>(context, cluster_, hostname);
bool is_proxying = isProxying(context->requestStreamInfo());
auto result = cluster_.dns_cache_->loadDnsCacheEntryWithForceRefresh(raw_host, port, is_proxying,
force_refresh, *cancelable);
switch (result.status_) {
case Common::DynamicForwardProxy::DnsCache::LoadDnsCacheEntryStatus::InCache:
return {nullptr, result.host_info_.has_value() ? result.host_info_.value()->details() : ""};
case Common::DynamicForwardProxy::DnsCache::LoadDnsCacheEntryStatus::Loading:
// Here the DFP kicks off an async lookup. The DFPHostSelectionHandle will
// call onLoadDnsCacheComplete and onAsyncHostSelection unless the
// resolution is canceled by the stream.
cancelable->handle_ = std::move(result.handle_);
cancelable->auto_dec_ = std::move(handle);
return Upstream::HostSelectionResponse{nullptr, std::move(cancelable)};
case Common::DynamicForwardProxy::DnsCache::LoadDnsCacheEntryStatus::Overflow:
// In the csae of overflow, return immediate failure.
ENVOY_LOG(debug, "host {} lookup failed due to overflow", hostname);
return {nullptr, "dns_cache_overflow"};
}
return {nullptr};
}

Upstream::HostConstSharedPtr Cluster::LoadBalancer::findHostByName(const std::string& host) const {
return cluster_.findHostByName(host);
}

Upstream::HostConstSharedPtr Cluster::findHostByName(const std::string& host) const {
{
absl::ReaderMutexLock lock{&cluster_.host_map_lock_};
const auto host_it = cluster_.host_map_.find(host);
if (host_it == cluster_.host_map_.end()) {
absl::ReaderMutexLock lock{&host_map_lock_};
const auto host_it = host_map_.find(host);
if (host_it == host_map_.end()) {
ENVOY_LOG(debug, "host {} not found", host);
return nullptr;
} else {
Expand Down
Loading

0 comments on commit ce1d3c8

Please sign in to comment.