Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v1.26] Ingress policy enforcement #442

Merged
merged 3 commits into from
Dec 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions cilium/api/bpf_metadata.proto
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,13 @@ message BpfMetadata {
// Config is provided, the source address will be picked by the local IP stack.
string ipv4_source_address = 5;
string ipv6_source_address = 6;

// True if policy should be enforced on l7 LB used. The policy bound to the configured
// ipv[46]_source_addresses, which must be explicitly set, applies. Ingress policy is
// enforced on the security identity of the original (e.g., external) source. Egress
// policy is enforced on the security identity of the backend selected by the load balancer.
//
// Deprecation note: This option will be forced 'true' and deprecated when Cilium 1.15 is
// the oldest supported release.
bool enforce_policy_on_l7lb = 7;
}
160 changes: 106 additions & 54 deletions cilium/bpf_metadata.cc
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ Config::Config(const ::cilium::BpfMetadata& config,
ipv4_source_address_(
Network::Utility::parseInternetAddressNoThrow(config.ipv4_source_address())),
ipv6_source_address_(
Network::Utility::parseInternetAddressNoThrow(config.ipv6_source_address())) {
Network::Utility::parseInternetAddressNoThrow(config.ipv6_source_address())),
enforce_policy_on_l7lb_(config.enforce_policy_on_l7lb()) {
if (is_l7lb_ && is_ingress_) {
throw EnvoyException("cilium.bpf_metadata: is_l7lb may not be set with is_ingress");
}
Expand Down Expand Up @@ -152,7 +153,6 @@ Config::Config(const ::cilium::BpfMetadata& config,
// Get the shared policy provider, or create it if not already created.
// Note that the API config source is assumed to be the same for all filter
// instances!

npmap_ = createPolicyMap(context, ct_maps_);
}

Expand Down Expand Up @@ -180,7 +180,7 @@ const PolicyInstanceConstSharedPtr Config::getPolicy(const std::string& pod_ip)
// Allow all traffic for egress without a policy when 'is_l7lb_' is true.
// This is the case for L7 LB listeners only. This is needed to allow traffic forwarded by k8s
// Ingress (which is implemented as an egress listener!).
if (!is_ingress_ && is_l7lb_) {
if (!enforce_policy_on_l7lb_ && !is_ingress_ && is_l7lb_) {
return npmap_->AllowAllEgressPolicy;
}
}
Expand Down Expand Up @@ -216,20 +216,21 @@ bool Config::getMetadata(Network::ConnectionSocket& socket) {
}

auto policy = getPolicy(pod_ip);
if (policy == nullptr) {
ENVOY_LOG(warn, "cilium.bpf_metadata ({}): No policy found for {}",
is_ingress_ ? "ingress" : "egress", pod_ip);
return false;
}

uint32_t source_identity = 0;
// Resolve the source security ID from conntrack map, or from ip cache
if (ct_maps_ != nullptr) {
auto ct_name = policy->conntrackName();
if (ct_name.length() > 0) {
source_identity = ct_maps_->lookupSrcIdentity(ct_name, sip, dip, is_ingress_);
if (policy) {
const std::string& ct_name = policy->conntrackName();
if (ct_name.length() > 0) {
source_identity = ct_maps_->lookupSrcIdentity(ct_name, sip, dip, is_ingress_);
}
} else if (is_l7lb_) {
// non-local source should be in the global conntrack
source_identity = ct_maps_->lookupSrcIdentity("global", sip, dip, is_ingress_);
}
}
// Fall back to ipcache lookup if conntrack entry can not be located
if (source_identity == 0) {
source_identity = resolvePolicyId(sip);
}
Expand All @@ -240,6 +241,10 @@ bool Config::getMetadata(Network::ConnectionSocket& socket) {
destination_identity = resolvePolicyId(dip);
}

// ingress_source_identity is non-zero when the egress path l7 LB should also enforce
// the ingress path policy using the original source identity.
uint32_t ingress_source_identity = 0;

Network::Address::InstanceConstSharedPtr ipv4_source_address = ipv4_source_address_;
Network::Address::InstanceConstSharedPtr ipv6_source_address = ipv6_source_address_;

Expand All @@ -256,58 +261,97 @@ bool Config::getMetadata(Network::ConnectionSocket& socket) {
//
// NOTE: is_l7lb_ is only used for egress, so the local
// endpoint is the source, and the other node is the destination.
bool east_west_l7_lb = is_l7lb_ && use_original_source_address_ && policy->getEndpointID() != 0;
uint32_t endpoint_id = policy ? policy->getEndpointID() : 0;
bool east_west_l7_lb = false;
if (is_l7lb_) {
if (use_original_source_address_) {
// Use source pod's IP address for east/west l7 LB
if (endpoint_id == 0) {
// Local pod not found. Original source address can only be used for local pods.
ENVOY_LOG(warn, "cilium.bpf_metadata (east/west L7 LB): Non-local pod can not use original source address: {}",
pod_ip);
return false;
}

east_west_l7_lb = true;

if (east_west_l7_lb) {
// Use source pod's IP address for east/west l7 LB
const auto& ips = policy->getEndpointIPs();
if (ips.ipv4_ && ips.ipv6_) {
// Keep the original source address for the matching IP version, create a new source IP for
// the other version (with the same source port number) in case an upstream of a different IP
// version is chosen.
// the other version (with the same source port number) in case an upstream of a different
// IP version is chosen.
const auto& ips = policy->getEndpointIPs();
switch (sip->version()) {
case Network::Address::IpVersion::v4: {
ipv4_source_address = src_address;
sockaddr_in6 sa6 = *reinterpret_cast<const sockaddr_in6*>(ips.ipv6_->sockAddr());
sa6.sin6_port = htons(sip->port());
ipv6_source_address = std::make_shared<Network::Address::Ipv6Instance>(sa6);

ipv4_source_address = src_address;
if (ips.ipv6_) {
sockaddr_in6 sa6 = *reinterpret_cast<const sockaddr_in6*>(ips.ipv6_->sockAddr());
sa6.sin6_port = htons(sip->port());
ipv6_source_address = std::make_shared<Network::Address::Ipv6Instance>(sa6);
} else {
ipv6_source_address = nullptr;
}
} break;
case Network::Address::IpVersion::v6: {
ipv6_source_address = src_address;
sockaddr_in sa4 = *reinterpret_cast<const sockaddr_in*>(ips.ipv4_->sockAddr());
sa4.sin_port = htons(sip->port());
ipv4_source_address = std::make_shared<Network::Address::Ipv4Instance>(&sa4);
ipv6_source_address = src_address;
if (ips.ipv4_) {
sockaddr_in sa4 = *reinterpret_cast<const sockaddr_in*>(ips.ipv4_->sockAddr());
sa4.sin_port = htons(sip->port());
ipv4_source_address = std::make_shared<Network::Address::Ipv4Instance>(&sa4);
} else {
ipv4_source_address = nullptr;
}
} break;
}
// Original source address is now in one of 'ipv[46]_source_address'
src_address = nullptr;
}
} else if (is_l7lb_) {
// North/south L7 LB, assume the source security identity of the configured source addresses, if
// any and policy for this identity exists.
const Network::Address::Ip* ip = nullptr;
if (ipv4_source_address && ipv4_source_address->ip()) {
ip = ipv4_source_address->ip();
} else if (ipv6_source_address && ipv6_source_address->ip()) {
ip = ipv6_source_address->ip();
}
if (ip) {
} else {
// North/south L7 LB, assume the source security identity of the configured source addresses,
// if any and policy for this identity exists.
const Network::Address::Ip* ip = nullptr;

// Pick the local source address of the same family as the incoming connection
switch (sip->version()) {
case Network::Address::IpVersion::v4:
if (ipv4_source_address) {
ip = ipv4_source_address->ip();
}
break;
case Network::Address::IpVersion::v6:
if (ipv6_source_address) {
ip = ipv6_source_address->ip();
}
break;
}
if (!ip) {
// IP family of the connection has no configured local source address
ENVOY_LOG(warn, "cilium.bpf_metadata (north/south L7 LB): No local IP source address configured for the family of {}",
pod_ip);
return false;
}

pod_ip = ip->addressAsString();

auto new_id = resolvePolicyId(ip);
if (new_id != Cilium::ID::WORLD) {
auto new_pod_ip = ip->addressAsString();
// AllowAllEgressPolicy will be returned if no explicit Ingress policy exists
const auto& new_policy = getPolicy(new_pod_ip);
if (new_policy) {
source_identity = new_id;
pod_ip = new_pod_ip;
policy = new_policy;
}
} // The configured IP is used, but the original source identity, pod IP and policy are kept
if (new_id == Cilium::ID::WORLD) {
// No security ID available for the configured source IP
ENVOY_LOG(warn, "cilium.bpf_metadata (north/south L7 LB): Unknown local IP source address configured: {}",
pod_ip);
return false;
}

// Enforce ingress policy on the incoming Ingress traffic?
if (enforce_policy_on_l7lb_)
ingress_source_identity = source_identity;

source_identity = new_id;

// AllowAllEgressPolicy will be returned if no explicit Ingress policy exists
policy = getPolicy(pod_ip);

// Original source address is never used for north/south LB
// This means that a local host IP is used if no IP is configured to be used instead of it
// ('ip' above is null).
src_address = nullptr;
}
// Original source address is never used for north/south LB
// This means that a local host IP is used if no IP is configured to be used instead of it
// ('ip' above is null).
src_address = nullptr;

// Otherwise only use the original source address if permitted, destination identity is not a
// locally allocated identity, is not classified as WORLD, and the destination is not in the
Expand All @@ -325,6 +369,13 @@ bool Config::getMetadata(Network::ConnectionSocket& socket) {
socket.addOptions(Network::SocketOptionFactory::buildReusePortOptions());
}

// policy must exist at this point
if (policy == nullptr) {
ENVOY_LOG(warn, "cilium.bpf_metadata ({}): No policy found for {}",
is_ingress_ ? "ingress" : "egress", pod_ip);
return false;
}

// Add metadata for policy based listener filter chain matching.
// This requires the TLS inspector, if used, to run before us.
// Note: This requires egress policy be known before upstream host selection,
Expand All @@ -351,7 +402,7 @@ bool Config::getMetadata(Network::ConnectionSocket& socket) {
// Mark with source endpoint ID for east/west l7 LB. This causes the upstream packets to be
// processed by the the source endpoint's policy enforcement in the datapath.
if (east_west_l7_lb) {
mark = 0x0900 | policy->getEndpointID() << 16;
mark = 0x0900 | endpoint_id << 16;
} else {
// Mark with source identity
uint32_t cluster_id = (source_identity >> 16) & 0xFF;
Expand All @@ -360,7 +411,8 @@ bool Config::getMetadata(Network::ConnectionSocket& socket) {
}
}
socket.addOption(std::make_shared<Cilium::SocketOption>(
policy, mark, source_identity, is_ingress_, is_l7lb_, dip->port(), std::move(pod_ip),
policy, mark, ingress_source_identity, source_identity, is_ingress_, is_l7lb_, dip->port(),
std::move(pod_ip),
std::move(src_address), std::move(ipv4_source_address), std::move(ipv6_source_address),
shared_from_this()));
return true;
Expand Down
1 change: 1 addition & 0 deletions cilium/bpf_metadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ class Config : public Cilium::PolicyResolver,
bool is_l7lb_;
Network::Address::InstanceConstSharedPtr ipv4_source_address_;
Network::Address::InstanceConstSharedPtr ipv6_source_address_;
bool enforce_policy_on_l7lb_;

std::shared_ptr<const Cilium::NetworkPolicyMap> npmap_{};
Cilium::CtMapSharedPtr ct_maps_{};
Expand Down
25 changes: 18 additions & 7 deletions cilium/l7policy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -134,19 +134,30 @@ Http::FilterHeadersStatus AccessFilter::decodeHeaders(Http::RequestHeaderMap& he
uint32_t destination_port = dip->port();
uint32_t destination_identity = option->resolvePolicyId(dip);

// Policy may have changed since the connection was established, get fresh policy
const auto& policy = option->getPolicy();
if (policy) {
if (!policy) {
ENVOY_LOG(debug, "cilium.l7policy: No policy found for pod {}, defaulting to DENY",
option->pod_ip_);
return false;
}

allowed_ = true;
if (option->ingress_source_identity_ != 0) {
allowed_ = policy->Allowed(true, option->port_, option->ingress_source_identity_,
headers, log_entry_);
ENVOY_LOG(debug, "cilium.l7policy: Ingress from {} policy lookup for endpoint {} for port {}: {}",
option->ingress_source_identity_,
option->pod_ip_, option->port_, allowed_ ? "ALLOW" : "DENY");
}
if (allowed_) {
allowed_ = policy->Allowed(option->ingress_, destination_port,
option->ingress_ ? option->identity_ : destination_identity,
headers, log_entry_);
option->ingress_ ? option->identity_ : destination_identity,
headers, log_entry_);
ENVOY_LOG(debug, "cilium.l7policy: {} ({}->{}) policy lookup for endpoint {} for port {}: {}",
option->ingress_ ? "ingress" : "egress", option->identity_, destination_identity,
option->pod_ip_, destination_port, allowed_ ? "ALLOW" : "DENY");
} else {
ENVOY_LOG(debug, "cilium.l7policy: No {} policy found for pod {}, defaulting to DENY",
option->ingress_ ? "ingress" : "egress", option->pod_ip_);
}

// Update the log entry with the chosen destination address and current headers, as remaining
// filters, upstream, and/or policy may have altered headers.
log_entry_.UpdateFromRequest(destination_identity, dst_address, headers);
Expand Down
15 changes: 13 additions & 2 deletions cilium/network_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,17 @@ Network::FilterStatus Instance::onNewConnection() {
}
destination_port = dip->port();
destination_identity = option->resolvePolicyId(dip);

if (option->ingress_source_identity_ != 0) {
port_policy_ = option->initial_policy_->findPortPolicy(true, destination_port,
option->ingress_source_identity_);
if (port_policy_ == nullptr ||
!port_policy_->Matches(sni, option->ingress_source_identity_)) {
ENVOY_CONN_LOG(debug, "cilium.network: ingress policy drop for source identity: {} port: {}", conn,
option->ingress_source_identity_, destination_port);
return false;
}
}
}

port_policy_ = option->initial_policy_->findPortPolicy(option->ingress_, destination_port,
Expand Down Expand Up @@ -215,7 +226,7 @@ Network::FilterStatus Instance::onData(Buffer::Instance& data, bool end_stream)
if (res != FILTER_OK) {
// Drop the connection due to an error
go_parser_->Close();
conn.close(Network::ConnectionCloseType::NoFlush);
conn.close(Network::ConnectionCloseType::NoFlush, "proxylib error");
return Network::FilterStatus::StopIteration;
}

Expand All @@ -236,7 +247,7 @@ Network::FilterStatus Instance::onData(Buffer::Instance& data, bool end_stream)
bool changed = log_entry_.UpdateFromMetadata(l7proto_, metadata.filter_metadata().at(l7proto_));

if (!port_policy_->allowed(metadata)) {
conn.close(Network::ConnectionCloseType::NoFlush);
conn.close(Network::ConnectionCloseType::NoFlush, "metadata policy drop");
config_->Log(log_entry_, ::cilium::EntryType::Denied);
return Network::FilterStatus::StopIteration;
} else {
Expand Down
2 changes: 1 addition & 1 deletion cilium/proxylib.cc
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ FilterResult GoFilter::Instance::OnIO(bool reply, Buffer::Instance& data, bool e
void GoFilter::Instance::Close() {
(*parent_.go_close_)(connection_id_);
connection_id_ = 0;
conn_.close(Network::ConnectionCloseType::NoFlush);
conn_.close(Network::ConnectionCloseType::FlushWrite);
}

} // namespace Cilium
Expand Down
10 changes: 9 additions & 1 deletion cilium/socket_option.h
Original file line number Diff line number Diff line change
Expand Up @@ -161,14 +161,16 @@ class SocketMarkOption : public Network::Socket::Option,

class SocketOption : public SocketMarkOption {
public:
SocketOption(PolicyInstanceConstSharedPtr policy, uint32_t mark, uint32_t source_identity,
SocketOption(PolicyInstanceConstSharedPtr policy, uint32_t mark,
uint32_t ingress_source_identity, uint32_t source_identity,
bool ingress, bool l7lb, uint16_t port, std::string&& pod_ip,
Network::Address::InstanceConstSharedPtr original_source_address,
Network::Address::InstanceConstSharedPtr ipv4_source_address,
Network::Address::InstanceConstSharedPtr ipv6_source_address,
const std::shared_ptr<PolicyResolver>& policy_id_resolver)
: SocketMarkOption(mark, source_identity, ingress, l7lb, original_source_address,
ipv4_source_address, ipv6_source_address),
ingress_source_identity_(ingress_source_identity),
initial_policy_(policy), port_(port), pod_ip_(std::move(pod_ip)),
policy_id_resolver_(policy_id_resolver) {
ENVOY_LOG(debug,
Expand All @@ -191,6 +193,12 @@ class SocketOption : public SocketMarkOption {
return policy_id_resolver_->getPolicy(pod_ip_);
}

// policyUseUpstreamDestinationAddress returns 'true' if policy enforcement should be done on the
// basis of the upstream destination address.
bool policyUseUpstreamDestinationAddress() const { return is_l7lb_; }

// Additional ingress policy enforcement is performed if ingress_source_identity is non-zero
uint32_t ingress_source_identity_;
const PolicyInstanceConstSharedPtr initial_policy_; // Never NULL
uint16_t port_;
std::string pod_ip_;
Expand Down
3 changes: 2 additions & 1 deletion cilium/websocket_codec.cc
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,8 @@ void Codec::closeOnError(const char* msg) {
ENVOY_LOG(debug, "websocket: Closing connection: {}", msg);
}
// Close downstream, this should result also in the upstream getting closed (if any).
connection_.close(Network::ConnectionCloseType::NoFlush);
connection_.close(Network::ConnectionCloseType::NoFlush,
fmt::format("websocket error: {}", msg));
}

void Codec::closeOnError(Buffer::Instance& data, const char* msg) {
Expand Down
Loading
Loading