From 307ab24f16077fe4e2da126314f4a5fb05db204a Mon Sep 17 00:00:00 2001 From: Tharsanan1 Date: Fri, 13 Sep 2024 16:36:55 +0530 Subject: [PATCH] Move rl to vhost --- adapter/internal/discovery/xds/server.go | 25 +- .../internal/oasparser/config_generator.go | 10 +- .../internal/oasparser/envoyconf/constants.go | 1 + .../oasparser/envoyconf/http_filters.go | 2 + .../internal/oasparser/envoyconf/listener.go | 314 +++++++++++++++++- .../oasparser/envoyconf/listener_test.go | 4 +- .../oasparser/envoyconf/routes_configs.go | 312 +---------------- .../envoyconf/routes_with_clusters.go | 48 ++- .../envoyconf/routes_with_clusters_test.go | 18 + .../oasparser/model/adapter_internal_api.go | 8 +- adapter/internal/oasparser/model/resource.go | 14 +- .../operator/controllers/dp/api_controller.go | 100 +----- .../operator/synchronizer/api_state.go | 1 - .../operator/synchronizer/data_store.go | 11 - .../operator/synchronizer/rest_api.go | 2 +- .../dp/airatelimitpolicy_controller.go | 39 --- .../internal/xds/ratelimiter_cache.go | 5 - .../org.wso2.apk.enforcer/build.gradle | 1 + .../apk/enforcer/grpc/ExtAuthService.java | 3 - .../grpc/ExternalProcessorService.java | 97 ++++-- .../enforcer/grpc/client/RatelimitClient.java | 10 +- .../apk/enforcer/security/KeyValidator.java | 2 - .../security/jwt/APIKeyAuthenticator.java | 4 +- .../wso2/apk/enforcer/server/AuthServer.java | 1 - .../subscription/SubscriptionDto.java | 2 +- .../gateway-runtime-deployment.yaml | 4 +- 26 files changed, 498 insertions(+), 540 deletions(-) diff --git a/adapter/internal/discovery/xds/server.go b/adapter/internal/discovery/xds/server.go index 1f1cd89d26..2c45800c78 100644 --- a/adapter/internal/discovery/xds/server.go +++ b/adapter/internal/discovery/xds/server.go @@ -97,9 +97,10 @@ var ( // todo(amali) there can be multiple vhosts for one EnvoyInternalAPI so handle this apiuuid+sand/prod should be the key - orgAPIMap map[string]map[string]*EnvoyInternalAPI // organizationID -> Vhost:API_UUID -> EnvoyInternalAPI struct map - orgIDLatestAPIVersionMap map[string]map[string]map[string]semantic_version.SemVersion // organizationID -> Vhost:APIName -> VersionRange(vx/vx.x; x is int) -> Latest API Version - + orgAPIMap map[string]map[string]*EnvoyInternalAPI // organizationID -> Vhost:API_UUID -> EnvoyInternalAPI struct map + orgIDLatestAPIVersionMap map[string]map[string]map[string]semantic_version.SemVersion // organizationID -> Vhost:APIName -> VersionRange(vx/vx.x; x is int) -> Latest API Version + vHostToSubscriptionBasedAIRLMap map[string]bool + vHostToSubscriptionBasedRLMap map[string]bool // Envoy Label as map key // TODO(amali) use this without generating all again. gatewayLabelConfigMap map[string]*EnvoyGatewayConfig // GW-Label -> EnvoyGatewayConfig struct map @@ -152,6 +153,8 @@ func init() { gatewayLabelConfigMap = make(map[string]*EnvoyGatewayConfig) orgAPIMap = make(map[string]map[string]*EnvoyInternalAPI) orgIDLatestAPIVersionMap = make(map[string]map[string]map[string]semantic_version.SemVersion) + vHostToSubscriptionBasedAIRLMap = make(map[string]bool) + vHostToSubscriptionBasedRLMap = make(map[string]bool) enforcerLabelMap = make(map[string]*EnforcerInternalAPI) // currently subscriptions, configs, applications, applicationPolicies, subscriptionPolicies, @@ -330,7 +333,7 @@ func GenerateEnvoyResoucesForGateway(gatewayName string) ([]types.Resource, if found { // Prepare the route config name based on the gateway listener section name. routeConfigName := common.GetEnvoyRouteConfigName(listener.Name, string(listenerSection.Name)) - routesConfig := oasParser.GetRouteConfigs(map[string][]*routev3.Route{vhost: routes}, routeConfigName, envoyGatewayConfig.customRateLimitPolicies) + routesConfig := oasParser.GetRouteConfigs(map[string][]*routev3.Route{vhost: routes}, routeConfigName, envoyGatewayConfig.customRateLimitPolicies, vHostToSubscriptionBasedAIRLMap, vHostToSubscriptionBasedRLMap) routeConfigMatched, alreadyExistsInRouteConfigList := routeConfigs[routeConfigName] if alreadyExistsInRouteConfigList { @@ -353,7 +356,7 @@ func GenerateEnvoyResoucesForGateway(gatewayName string) ([]types.Resource, var vhostToRouteArrayFilteredMapForSystemEndpoints = make(map[string][]*routev3.Route) vhostToRouteArrayFilteredMapForSystemEndpoints[systemHost] = vhostToRouteArrayMap[systemHost] routeConfigName := common.GetEnvoyRouteConfigName(common.GetEnvoyListenerName(string(listener.Protocol), uint32(listener.Port)), string(listener.Name)) - systemRoutesConfig := oasParser.GetRouteConfigs(vhostToRouteArrayFilteredMapForSystemEndpoints, routeConfigName, envoyGatewayConfig.customRateLimitPolicies) + systemRoutesConfig := oasParser.GetRouteConfigs(vhostToRouteArrayFilteredMapForSystemEndpoints, routeConfigName, envoyGatewayConfig.customRateLimitPolicies, vHostToSubscriptionBasedAIRLMap, vHostToSubscriptionBasedRLMap) routeConfigs[routeConfigName] = systemRoutesConfig } } @@ -560,6 +563,18 @@ func PopulateInternalMaps(adapterInternalAPI *model.AdapterInternalAPI, labels, } err := UpdateOrgAPIMap(vHosts, labels, listenerName, sectionName, adapterInternalAPI) + for vhost := range vHosts { + if adapterInternalAPI.AIProvider.Enabled && adapterInternalAPI.GetSubscriptionValidation() { + vHostToSubscriptionBasedAIRLMap[vhost] = true + } else { + vHostToSubscriptionBasedAIRLMap[vhost] = false + } + if adapterInternalAPI.GetSubscriptionValidation() { + vHostToSubscriptionBasedRLMap[vhost] = true + } else { + vHostToSubscriptionBasedRLMap[vhost] = false + } + } if err != nil { logger.LoggerXds.ErrorC(logging.PrintError(logging.Error1415, logging.MAJOR, "Error updating the API : %s:%s in vhosts: %s, API_UUID: %v. %v", diff --git a/adapter/internal/oasparser/config_generator.go b/adapter/internal/oasparser/config_generator.go index df33b61ae6..0ba8499577 100644 --- a/adapter/internal/oasparser/config_generator.go +++ b/adapter/internal/oasparser/config_generator.go @@ -85,8 +85,8 @@ func GetProductionListener(gateway *gwapiv1.Gateway, resolvedListenerCerts map[s // // The RouteConfiguration is named as "default" func GetRouteConfigs(vhostToRouteArrayMap map[string][]*routev3.Route, routeConfigName string, - customRateLimitPolicies []*model.CustomRateLimitPolicy) *routev3.RouteConfiguration { - vHosts := envoy.CreateVirtualHosts(vhostToRouteArrayMap, customRateLimitPolicies) + customRateLimitPolicies []*model.CustomRateLimitPolicy, vhostToSubscriptionAIRL map[string]bool, vhostToSubscriptionRL map[string]bool) *routev3.RouteConfiguration { + vHosts := envoy.CreateVirtualHosts(vhostToRouteArrayMap, customRateLimitPolicies, vhostToSubscriptionAIRL, vhostToSubscriptionRL) routeConfig := envoy.CreateRoutesConfigForRds(vHosts, routeConfigName) return routeConfig } @@ -119,12 +119,6 @@ func GetCacheResources(endpoints []*corev3.Address, clusters []*clusterv3.Cluste return listenerRes, clusterRes, routeConfigRes, endpointRes } -// UpdateRoutesConfig updates the existing routes configuration with the provided map of vhost to array of routes. -// All the already existing routes (within the routeConfiguration) will be removed. -func UpdateRoutesConfig(routeConfig *routev3.RouteConfiguration, vhostToRouteArrayMap map[string][]*routev3.Route) { - routeConfig.VirtualHosts = envoy.CreateVirtualHosts(vhostToRouteArrayMap, nil) -} - // GetEnforcerAPI retrieves the ApiDS object model for a given swagger definition // along with the vhost to deploy the API. func GetEnforcerAPI(adapterInternalAPI *model.AdapterInternalAPI, vhost string) *api.Api { diff --git a/adapter/internal/oasparser/envoyconf/constants.go b/adapter/internal/oasparser/envoyconf/constants.go index c92501c054..04a92f34ab 100644 --- a/adapter/internal/oasparser/envoyconf/constants.go +++ b/adapter/internal/oasparser/envoyconf/constants.go @@ -30,6 +30,7 @@ const ( httpConManagerStartPrefix string = "ingress_http" extAuthzPerRouteName string = "type.googleapis.com/envoy.extensions.filters.http.ext_authz.v3.ExtAuthzPerRoute" extProcPerRouteName string = "type.googleapis.com/envoy.extensions.filters.http.ext_proc.v3.ExtProcPerRoute" + ratelimitPerRouteName string = "type.googleapis.com/envoy.extensions.filters.http.ratelimit.v3.RateLimitPerRoute" luaPerRouteName string = "type.googleapis.com/envoy.extensions.filters.http.lua.v3.LuaPerRoute" corsFilterName string = "type.googleapis.com/envoy.extensions.filters.http.cors.v3.Cors" localRateLimitPerRouteName string = "type.googleapis.com/envoy.extensions.filters.http.local_ratelimit.v3.LocalRateLimit" diff --git a/adapter/internal/oasparser/envoyconf/http_filters.go b/adapter/internal/oasparser/envoyconf/http_filters.go index 4017426f87..cedd7784bd 100644 --- a/adapter/internal/oasparser/envoyconf/http_filters.go +++ b/adapter/internal/oasparser/envoyconf/http_filters.go @@ -51,6 +51,8 @@ import ( // HTTPExternalProcessor HTTP filter const HTTPExternalProcessor = "envoy.filters.http.ext_proc" +// RatelimitFilterName Ratelimit filter name +const RatelimitFilterName = "envoy.filters.http.ratelimit" // getHTTPFilters generates httpFilter configuration func getHTTPFilters(globalLuaScript string) []*hcmv3.HttpFilter { diff --git a/adapter/internal/oasparser/envoyconf/listener.go b/adapter/internal/oasparser/envoyconf/listener.go index 7237a13d24..493550539f 100644 --- a/adapter/internal/oasparser/envoyconf/listener.go +++ b/adapter/internal/oasparser/envoyconf/listener.go @@ -334,7 +334,7 @@ func CreateListenerByGateway(gateway *gwapiv1.Gateway, resolvedListenerCerts map // CreateVirtualHosts creates VirtualHost configurations for envoy which serves // request from the vHost domain. The routes array will be included as the routes // for the created virtual host. -func CreateVirtualHosts(vhostToRouteArrayMap map[string][]*routev3.Route, customRateLimitPolicies []*model.CustomRateLimitPolicy) []*routev3.VirtualHost { +func CreateVirtualHosts(vhostToRouteArrayMap map[string][]*routev3.Route, customRateLimitPolicies []*model.CustomRateLimitPolicy, vhostToSubscriptionAIRL map[string]bool, vhostToSubscriptionRL map[string]bool) []*routev3.VirtualHost { virtualHosts := make([]*routev3.VirtualHost, 0, len(vhostToRouteArrayMap)) var rateLimits []*routev3.RateLimit for _, customRateLimitPolicy := range customRateLimitPolicies { @@ -381,6 +381,12 @@ func CreateVirtualHosts(vhostToRouteArrayMap map[string][]*routev3.Route, custom } for vhost, routes := range vhostToRouteArrayMap { + if flag, exists := vhostToSubscriptionAIRL[vhost]; exists && flag { + rateLimits = append(rateLimits, generateSubscriptionBasedAIRatelimits()...) + } + if flag, exists := vhostToSubscriptionRL[vhost]; exists && flag { + rateLimits = append(rateLimits, generateSubscriptionBasedRatelimits()...) + } virtualHost := &routev3.VirtualHost{ Name: vhost, Domains: []string{vhost, fmt.Sprint(vhost, ":*")}, @@ -392,6 +398,312 @@ func CreateVirtualHosts(vhostToRouteArrayMap map[string][]*routev3.Route, custom return virtualHosts } + +func generateSubscriptionBasedRatelimits() []*routev3.RateLimit { + return []*routev3.RateLimit{&routev3.RateLimit{ + Actions: []*routev3.RateLimit_Action{ + { + ActionSpecifier: &routev3.RateLimit_Action_Metadata{ + Metadata: &routev3.RateLimit_Action_MetaData{ + DescriptorKey: DescriptorKeyForOrganization, + MetadataKey: &metadatav3.MetadataKey{ + Key: extAuthzFilterName, + Path: []*metadatav3.MetadataKey_PathSegment{ + { + Segment: &metadatav3.MetadataKey_PathSegment_Key{ + Key: descriptorMetadataKeyForOrganization, + }, + }, + }, + }, + Source: routev3.RateLimit_Action_MetaData_DYNAMIC, + SkipIfAbsent: true, + }, + }, + }, + { + ActionSpecifier: &routev3.RateLimit_Action_Metadata{ + Metadata: &routev3.RateLimit_Action_MetaData{ + DescriptorKey: DescriptorKeyForSubscription, + MetadataKey: &metadatav3.MetadataKey{ + Key: extAuthzFilterName, + Path: []*metadatav3.MetadataKey_PathSegment{ + { + Segment: &metadatav3.MetadataKey_PathSegment_Key{ + Key: descriptorMetadataKeyForSubscription, + }, + }, + }, + }, + Source: routev3.RateLimit_Action_MetaData_DYNAMIC, + SkipIfAbsent: true, + }, + }, + }, + { + ActionSpecifier: &routev3.RateLimit_Action_Metadata{ + Metadata: &routev3.RateLimit_Action_MetaData{ + DescriptorKey: DescriptorKeyForPolicy, + MetadataKey: &metadatav3.MetadataKey{ + Key: extAuthzFilterName, + Path: []*metadatav3.MetadataKey_PathSegment{ + { + Segment: &metadatav3.MetadataKey_PathSegment_Key{ + Key: descriptorMetadataKeyForUsagePolicy, + }, + }, + }, + }, + Source: routev3.RateLimit_Action_MetaData_DYNAMIC, + SkipIfAbsent: true, + }, + }, + }, + }, + }, &routev3.RateLimit{ + Actions: []*routev3.RateLimit_Action{ + { + ActionSpecifier: &routev3.RateLimit_Action_Metadata{ + Metadata: &routev3.RateLimit_Action_MetaData{ + DescriptorKey: DescriptorKeyForOrganization, + MetadataKey: &metadatav3.MetadataKey{ + Key: extAuthzFilterName, + Path: []*metadatav3.MetadataKey_PathSegment{ + { + Segment: &metadatav3.MetadataKey_PathSegment_Key{ + Key: descriptorMetadataKeyForOrganization, + }, + }, + }, + }, + Source: routev3.RateLimit_Action_MetaData_DYNAMIC, + SkipIfAbsent: true, + }, + }, + }, + { + ActionSpecifier: &routev3.RateLimit_Action_Metadata{ + Metadata: &routev3.RateLimit_Action_MetaData{ + DescriptorKey: DescriptorKeyForSubscription, + MetadataKey: &metadatav3.MetadataKey{ + Key: extAuthzFilterName, + Path: []*metadatav3.MetadataKey_PathSegment{ + { + Segment: &metadatav3.MetadataKey_PathSegment_Key{ + Key: descriptorMetadataKeyForSubscription, + }, + }, + }, + }, + Source: routev3.RateLimit_Action_MetaData_DYNAMIC, + SkipIfAbsent: true, + }, + }, + }, + { + ActionSpecifier: &routev3.RateLimit_Action_Metadata{ + Metadata: &routev3.RateLimit_Action_MetaData{ + DescriptorKey: DescriptorKeyForPolicy, + MetadataKey: &metadatav3.MetadataKey{ + Key: extAuthzFilterName, + Path: []*metadatav3.MetadataKey_PathSegment{ + { + Segment: &metadatav3.MetadataKey_PathSegment_Key{ + Key: descriptorMetadataKeyForUsagePolicy, + }, + }, + }, + }, + Source: routev3.RateLimit_Action_MetaData_DYNAMIC, + SkipIfAbsent: true, + }, + }, + }, + { + ActionSpecifier: &routev3.RateLimit_Action_GenericKey_{ + GenericKey: &routev3.RateLimit_Action_GenericKey{ + DescriptorKey: "burst", + DescriptorValue: "enabled", + }, + }, + }, + }, + }, + } +} + +func generateSubscriptionBasedAIRatelimits() []*routev3.RateLimit { + rateLimitForRequestTokenCount := routev3.RateLimit{ + Actions: []*routev3.RateLimit_Action{ + { + ActionSpecifier: &routev3.RateLimit_Action_Metadata{ + Metadata: &routev3.RateLimit_Action_MetaData{ + DescriptorKey: DescriptorKeyForAIRequestTokenCountForSubscriptionBasedAIRL, + MetadataKey: &metadatav3.MetadataKey{ + Key: extAuthzFilterName, + Path: []*metadatav3.MetadataKey_PathSegment{ + &metadatav3.MetadataKey_PathSegment{ + Segment: &metadatav3.MetadataKey_PathSegment_Key{ + Key: DynamicMetadataKeyForOrganizationAndAIRLPolicy, + }, + }, + }, + }, + Source: routev3.RateLimit_Action_MetaData_DYNAMIC, + SkipIfAbsent: true, + }, + }, + }, + { + ActionSpecifier: &routev3.RateLimit_Action_Metadata{ + Metadata: &routev3.RateLimit_Action_MetaData{ + DescriptorKey: DescriptorKeyForAISubscription, + MetadataKey: &metadatav3.MetadataKey{ + Key: extAuthzFilterName, + Path: []*metadatav3.MetadataKey_PathSegment{ + &metadatav3.MetadataKey_PathSegment{ + Segment: &metadatav3.MetadataKey_PathSegment_Key{ + Key: DynamicMetadataKeyForSubscription, + }, + }, + }, + }, + Source: routev3.RateLimit_Action_MetaData_DYNAMIC, + SkipIfAbsent: true, + }, + }, + }, + }, + } + rateLimitForResponseTokenCount := routev3.RateLimit{ + Actions: []*routev3.RateLimit_Action{ + { + ActionSpecifier: &routev3.RateLimit_Action_Metadata{ + Metadata: &routev3.RateLimit_Action_MetaData{ + DescriptorKey: DescriptorKeyForAIResponseTokenCountForSubscriptionBasedAIRL, + MetadataKey: &metadatav3.MetadataKey{ + Key: extAuthzFilterName, + Path: []*metadatav3.MetadataKey_PathSegment{ + &metadatav3.MetadataKey_PathSegment{ + Segment: &metadatav3.MetadataKey_PathSegment_Key{ + Key: DynamicMetadataKeyForOrganizationAndAIRLPolicy, + }, + }, + }, + }, + Source: routev3.RateLimit_Action_MetaData_DYNAMIC, + SkipIfAbsent: true, + }, + }, + }, + { + ActionSpecifier: &routev3.RateLimit_Action_Metadata{ + Metadata: &routev3.RateLimit_Action_MetaData{ + DescriptorKey: DescriptorKeyForAISubscription, + MetadataKey: &metadatav3.MetadataKey{ + Key: extAuthzFilterName, + Path: []*metadatav3.MetadataKey_PathSegment{ + &metadatav3.MetadataKey_PathSegment{ + Segment: &metadatav3.MetadataKey_PathSegment_Key{ + Key: DynamicMetadataKeyForSubscription, + }, + }, + }, + }, + Source: routev3.RateLimit_Action_MetaData_DYNAMIC, + SkipIfAbsent: true, + }, + }, + }, + }, + } + rateLimitForRequestCount := routev3.RateLimit{ + Actions: []*routev3.RateLimit_Action{ + { + ActionSpecifier: &routev3.RateLimit_Action_Metadata{ + Metadata: &routev3.RateLimit_Action_MetaData{ + DescriptorKey: DescriptorKeyForAIRequestCountForSubscriptionBasedAIRL, + MetadataKey: &metadatav3.MetadataKey{ + Key: extAuthzFilterName, + Path: []*metadatav3.MetadataKey_PathSegment{ + &metadatav3.MetadataKey_PathSegment{ + Segment: &metadatav3.MetadataKey_PathSegment_Key{ + Key: DynamicMetadataKeyForOrganizationAndAIRLPolicy, + }, + }, + }, + }, + Source: routev3.RateLimit_Action_MetaData_DYNAMIC, + SkipIfAbsent: true, + }, + }, + }, + { + ActionSpecifier: &routev3.RateLimit_Action_Metadata{ + Metadata: &routev3.RateLimit_Action_MetaData{ + DescriptorKey: DescriptorKeyForAISubscription, + MetadataKey: &metadatav3.MetadataKey{ + Key: extAuthzFilterName, + Path: []*metadatav3.MetadataKey_PathSegment{ + &metadatav3.MetadataKey_PathSegment{ + Segment: &metadatav3.MetadataKey_PathSegment_Key{ + Key: DynamicMetadataKeyForSubscription, + }, + }, + }, + }, + Source: routev3.RateLimit_Action_MetaData_DYNAMIC, + SkipIfAbsent: true, + }, + }, + }, + }, + } + rateLimitForTotalTokenCount := routev3.RateLimit{ + Actions: []*routev3.RateLimit_Action{ + { + ActionSpecifier: &routev3.RateLimit_Action_Metadata{ + Metadata: &routev3.RateLimit_Action_MetaData{ + DescriptorKey: DescriptorKeyForAITotalTokenCountForSubscriptionBasedAIRL, + MetadataKey: &metadatav3.MetadataKey{ + Key: extAuthzFilterName, + Path: []*metadatav3.MetadataKey_PathSegment{ + &metadatav3.MetadataKey_PathSegment{ + Segment: &metadatav3.MetadataKey_PathSegment_Key{ + Key: DynamicMetadataKeyForOrganizationAndAIRLPolicy, + }, + }, + }, + }, + Source: routev3.RateLimit_Action_MetaData_DYNAMIC, + SkipIfAbsent: true, + }, + }, + }, + { + ActionSpecifier: &routev3.RateLimit_Action_Metadata{ + Metadata: &routev3.RateLimit_Action_MetaData{ + DescriptorKey: DescriptorKeyForAISubscription, + MetadataKey: &metadatav3.MetadataKey{ + Key: extAuthzFilterName, + Path: []*metadatav3.MetadataKey_PathSegment{ + &metadatav3.MetadataKey_PathSegment{ + Segment: &metadatav3.MetadataKey_PathSegment_Key{ + Key: DynamicMetadataKeyForSubscription, + }, + }, + }, + }, + Source: routev3.RateLimit_Action_MetaData_DYNAMIC, + SkipIfAbsent: true, + }, + }, + }, + }, + } + return []*routev3.RateLimit{&rateLimitForRequestTokenCount, &rateLimitForResponseTokenCount, &rateLimitForRequestCount, &rateLimitForTotalTokenCount} +} + // TODO: (VirajSalaka) Still the following method is not utilized as Sds is not implement. Keeping the Implementation for future reference // func generateDefaultSdsSecretFromConfigfile(privateKeyPath string, pulicKeyPath string) (*tlsv3.Secret, error) { // var secret tlsv3.Secret diff --git a/adapter/internal/oasparser/envoyconf/listener_test.go b/adapter/internal/oasparser/envoyconf/listener_test.go index 38308b8549..cf17e636e7 100644 --- a/adapter/internal/oasparser/envoyconf/listener_test.go +++ b/adapter/internal/oasparser/envoyconf/listener_test.go @@ -66,7 +66,7 @@ func TestCreateVirtualHost(t *testing.T) { "*": testCreateRoutesForUnitTests(t), "mg.wso2.com": testCreateRoutesForUnitTests(t), } - vHosts := CreateVirtualHosts(vhostToRouteArrayMap, nil) + vHosts := CreateVirtualHosts(vhostToRouteArrayMap, nil, make(map[string]bool), make(map[string]bool)) if len(vHosts) != 2 { t.Error("Virtual Host creation failed") @@ -92,7 +92,7 @@ func TestCreateRoutesConfigForRds(t *testing.T) { "mg.wso2.com": testCreateRoutesForUnitTests(t), } httpListeners := "httpslistener" - vHosts := CreateVirtualHosts(vhostToRouteArrayMap, nil) + vHosts := CreateVirtualHosts(vhostToRouteArrayMap, nil, make(map[string]bool), make(map[string]bool)) rConfig := CreateRoutesConfigForRds(vHosts, httpListeners) assert.NotNil(t, rConfig, "CreateRoutesConfigForRds is failed") diff --git a/adapter/internal/oasparser/envoyconf/routes_configs.go b/adapter/internal/oasparser/envoyconf/routes_configs.go index 4798a48cd5..7b005904bc 100644 --- a/adapter/internal/oasparser/envoyconf/routes_configs.go +++ b/adapter/internal/oasparser/envoyconf/routes_configs.go @@ -29,7 +29,6 @@ import ( extAuthService "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/ext_authz/v3" extProcessorv3 "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/ext_proc/v3" envoy_type_matcherv3 "github.com/envoyproxy/go-control-plane/envoy/type/matcher/v3" - metadatav3 "github.com/envoyproxy/go-control-plane/envoy/type/metadata/v3" "github.com/envoyproxy/go-control-plane/pkg/wellknown" "github.com/golang/protobuf/ptypes/any" logger "github.com/wso2/apk/adapter/internal/loggers" @@ -41,7 +40,6 @@ import ( "google.golang.org/protobuf/types/known/durationpb" "google.golang.org/protobuf/types/known/wrapperspb" gwapiv1 "sigs.k8s.io/gateway-api/apis/v1" - v35 "github.com/envoyproxy/go-control-plane/envoy/type/metadata/v3" ) // Constants for Rate Limiting @@ -118,7 +116,7 @@ func generateRouteMatch(routeRegex string) *routev3.RouteMatch { return match } -func generateRouteAction(apiType string, routeConfig *model.EndpointConfig, ratelimitCriteria *ratelimitCriteria, mirrorClusterNames []string, isSubscriptionBasedAIRatelimitEnabled bool, isBackendBasedAIRatelimitEnabled bool, descriptorValueForBackendBasedAIRatelimit string) (action *routev3.Route_Route) { +func generateRouteAction(apiType string, routeConfig *model.EndpointConfig, ratelimitCriteria *ratelimitCriteria, mirrorClusterNames []string, isBackendBasedAIRatelimitEnabled bool, descriptorValueForBackendBasedAIRatelimit string) (action *routev3.Route_Route) { action = &routev3.Route_Route{ Route: &routev3.RouteAction{ HostRewriteSpecifier: &routev3.RouteAction_AutoHostRewrite{ @@ -152,9 +150,6 @@ func generateRouteAction(apiType string, routeConfig *model.EndpointConfig, rate if isBackendBasedAIRatelimitEnabled { action.Route.RateLimits = append(action.Route.RateLimits, generateBackendBasedAIRatelimit(descriptorValueForBackendBasedAIRatelimit)...) } - if isSubscriptionBasedAIRatelimitEnabled { - action.Route.RateLimits = append(action.Route.RateLimits, generateSubscriptionBasedAIRatelimit()...) - } // Add request mirroring configurations if mirrorClusterNames != nil && len(mirrorClusterNames) > 0 { @@ -263,177 +258,7 @@ func generateBackendBasedAIRatelimit(descValue string) []*routev3.RateLimit { } -func generateSubscriptionBasedAIRatelimit() []*routev3.RateLimit { - rateLimitForRequestTokenCount := routev3.RateLimit{ - Actions: []*routev3.RateLimit_Action{ - { - ActionSpecifier: &routev3.RateLimit_Action_Metadata{ - Metadata: &routev3.RateLimit_Action_MetaData{ - DescriptorKey: DescriptorKeyForAIRequestTokenCountForSubscriptionBasedAIRL, - MetadataKey: &v35.MetadataKey{ - Key: extAuthzFilterName, - Path: []*v35.MetadataKey_PathSegment{ - &v35.MetadataKey_PathSegment{ - Segment: &v35.MetadataKey_PathSegment_Key{ - Key: DynamicMetadataKeyForOrganizationAndAIRLPolicy, - }, - }, - }, - }, - Source: routev3.RateLimit_Action_MetaData_DYNAMIC, - SkipIfAbsent: true, - }, - }, - }, - { - ActionSpecifier: &routev3.RateLimit_Action_Metadata{ - Metadata: &routev3.RateLimit_Action_MetaData{ - DescriptorKey: DescriptorKeyForAISubscription, - MetadataKey: &v35.MetadataKey{ - Key: extAuthzFilterName, - Path: []*v35.MetadataKey_PathSegment{ - &v35.MetadataKey_PathSegment{ - Segment: &v35.MetadataKey_PathSegment_Key{ - Key: DynamicMetadataKeyForSubscription, - }, - }, - }, - }, - Source: routev3.RateLimit_Action_MetaData_DYNAMIC, - SkipIfAbsent: true, - }, - }, - }, - }, - } - rateLimitForResponseTokenCount := routev3.RateLimit{ - Actions: []*routev3.RateLimit_Action{ - { - ActionSpecifier: &routev3.RateLimit_Action_Metadata{ - Metadata: &routev3.RateLimit_Action_MetaData{ - DescriptorKey: DescriptorKeyForAIResponseTokenCountForSubscriptionBasedAIRL, - MetadataKey: &v35.MetadataKey{ - Key: extAuthzFilterName, - Path: []*v35.MetadataKey_PathSegment{ - &v35.MetadataKey_PathSegment{ - Segment: &v35.MetadataKey_PathSegment_Key{ - Key: DynamicMetadataKeyForOrganizationAndAIRLPolicy, - }, - }, - }, - }, - Source: routev3.RateLimit_Action_MetaData_DYNAMIC, - SkipIfAbsent: true, - }, - }, - }, - { - ActionSpecifier: &routev3.RateLimit_Action_Metadata{ - Metadata: &routev3.RateLimit_Action_MetaData{ - DescriptorKey: DescriptorKeyForAISubscription, - MetadataKey: &v35.MetadataKey{ - Key: extAuthzFilterName, - Path: []*v35.MetadataKey_PathSegment{ - &v35.MetadataKey_PathSegment{ - Segment: &v35.MetadataKey_PathSegment_Key{ - Key: DynamicMetadataKeyForSubscription, - }, - }, - }, - }, - Source: routev3.RateLimit_Action_MetaData_DYNAMIC, - SkipIfAbsent: true, - }, - }, - }, - }, - } - rateLimitForRequestCount := routev3.RateLimit{ - Actions: []*routev3.RateLimit_Action{ - { - ActionSpecifier: &routev3.RateLimit_Action_Metadata{ - Metadata: &routev3.RateLimit_Action_MetaData{ - DescriptorKey: DescriptorKeyForAIRequestCountForSubscriptionBasedAIRL, - MetadataKey: &v35.MetadataKey{ - Key: extAuthzFilterName, - Path: []*v35.MetadataKey_PathSegment{ - &v35.MetadataKey_PathSegment{ - Segment: &v35.MetadataKey_PathSegment_Key{ - Key: DynamicMetadataKeyForOrganizationAndAIRLPolicy, - }, - }, - }, - }, - Source: routev3.RateLimit_Action_MetaData_DYNAMIC, - SkipIfAbsent: true, - }, - }, - }, - { - ActionSpecifier: &routev3.RateLimit_Action_Metadata{ - Metadata: &routev3.RateLimit_Action_MetaData{ - DescriptorKey: DescriptorKeyForAISubscription, - MetadataKey: &v35.MetadataKey{ - Key: extAuthzFilterName, - Path: []*v35.MetadataKey_PathSegment{ - &v35.MetadataKey_PathSegment{ - Segment: &v35.MetadataKey_PathSegment_Key{ - Key: DynamicMetadataKeyForSubscription, - }, - }, - }, - }, - Source: routev3.RateLimit_Action_MetaData_DYNAMIC, - SkipIfAbsent: true, - }, - }, - }, - }, - } - rateLimitForTotalTokenCount := routev3.RateLimit{ - Actions: []*routev3.RateLimit_Action{ - { - ActionSpecifier: &routev3.RateLimit_Action_Metadata{ - Metadata: &routev3.RateLimit_Action_MetaData{ - DescriptorKey: DescriptorKeyForAITotalTokenCountForSubscriptionBasedAIRL, - MetadataKey: &v35.MetadataKey{ - Key: extAuthzFilterName, - Path: []*v35.MetadataKey_PathSegment{ - &v35.MetadataKey_PathSegment{ - Segment: &v35.MetadataKey_PathSegment_Key{ - Key: DynamicMetadataKeyForOrganizationAndAIRLPolicy, - }, - }, - }, - }, - Source: routev3.RateLimit_Action_MetaData_DYNAMIC, - SkipIfAbsent: true, - }, - }, - }, - { - ActionSpecifier: &routev3.RateLimit_Action_Metadata{ - Metadata: &routev3.RateLimit_Action_MetaData{ - DescriptorKey: DescriptorKeyForAISubscription, - MetadataKey: &v35.MetadataKey{ - Key: extAuthzFilterName, - Path: []*v35.MetadataKey_PathSegment{ - &v35.MetadataKey_PathSegment{ - Segment: &v35.MetadataKey_PathSegment_Key{ - Key: DynamicMetadataKeyForSubscription, - }, - }, - }, - }, - Source: routev3.RateLimit_Action_MetaData_DYNAMIC, - SkipIfAbsent: true, - }, - }, - }, - }, - } - return []*routev3.RateLimit{&rateLimitForRequestTokenCount, &rateLimitForResponseTokenCount, &rateLimitForRequestCount, &rateLimitForTotalTokenCount} -} + func generateRateLimitPolicy(ratelimitCriteria *ratelimitCriteria) []*routev3.RateLimit { environmentValue := ratelimitCriteria.environment @@ -492,7 +317,6 @@ func generateRateLimitPolicy(ratelimitCriteria *ratelimitCriteria) []*routev3.Ra } ratelimits := []*routev3.RateLimit{&rateLimit} - ratelimits = addSubscriptionRatelimitActions(ratelimits) return ratelimits } @@ -533,138 +357,6 @@ func generateHeaderMatcher(headerName, valueRegex string) *routev3.HeaderMatcher return headerMatcherArray } -func addSubscriptionRatelimitActions(actions []*routev3.RateLimit) []*routev3.RateLimit { - return append(actions, - &routev3.RateLimit{ - Actions: []*routev3.RateLimit_Action{ - { - ActionSpecifier: &routev3.RateLimit_Action_Metadata{ - Metadata: &routev3.RateLimit_Action_MetaData{ - DescriptorKey: DescriptorKeyForOrganization, - MetadataKey: &metadatav3.MetadataKey{ - Key: extAuthzFilterName, - Path: []*metadatav3.MetadataKey_PathSegment{ - { - Segment: &metadatav3.MetadataKey_PathSegment_Key{ - Key: descriptorMetadataKeyForOrganization, - }, - }, - }, - }, - Source: routev3.RateLimit_Action_MetaData_DYNAMIC, - SkipIfAbsent: true, - }, - }, - }, - { - ActionSpecifier: &routev3.RateLimit_Action_Metadata{ - Metadata: &routev3.RateLimit_Action_MetaData{ - DescriptorKey: DescriptorKeyForSubscription, - MetadataKey: &metadatav3.MetadataKey{ - Key: extAuthzFilterName, - Path: []*metadatav3.MetadataKey_PathSegment{ - { - Segment: &metadatav3.MetadataKey_PathSegment_Key{ - Key: descriptorMetadataKeyForSubscription, - }, - }, - }, - }, - Source: routev3.RateLimit_Action_MetaData_DYNAMIC, - SkipIfAbsent: true, - }, - }, - }, - { - ActionSpecifier: &routev3.RateLimit_Action_Metadata{ - Metadata: &routev3.RateLimit_Action_MetaData{ - DescriptorKey: DescriptorKeyForPolicy, - MetadataKey: &metadatav3.MetadataKey{ - Key: extAuthzFilterName, - Path: []*metadatav3.MetadataKey_PathSegment{ - { - Segment: &metadatav3.MetadataKey_PathSegment_Key{ - Key: descriptorMetadataKeyForUsagePolicy, - }, - }, - }, - }, - Source: routev3.RateLimit_Action_MetaData_DYNAMIC, - SkipIfAbsent: true, - }, - }, - }, - }, - }, &routev3.RateLimit{ - Actions: []*routev3.RateLimit_Action{ - { - ActionSpecifier: &routev3.RateLimit_Action_Metadata{ - Metadata: &routev3.RateLimit_Action_MetaData{ - DescriptorKey: DescriptorKeyForOrganization, - MetadataKey: &metadatav3.MetadataKey{ - Key: extAuthzFilterName, - Path: []*metadatav3.MetadataKey_PathSegment{ - { - Segment: &metadatav3.MetadataKey_PathSegment_Key{ - Key: descriptorMetadataKeyForOrganization, - }, - }, - }, - }, - Source: routev3.RateLimit_Action_MetaData_DYNAMIC, - SkipIfAbsent: true, - }, - }, - }, - { - ActionSpecifier: &routev3.RateLimit_Action_Metadata{ - Metadata: &routev3.RateLimit_Action_MetaData{ - DescriptorKey: DescriptorKeyForSubscription, - MetadataKey: &metadatav3.MetadataKey{ - Key: extAuthzFilterName, - Path: []*metadatav3.MetadataKey_PathSegment{ - { - Segment: &metadatav3.MetadataKey_PathSegment_Key{ - Key: descriptorMetadataKeyForSubscription, - }, - }, - }, - }, - Source: routev3.RateLimit_Action_MetaData_DYNAMIC, - SkipIfAbsent: true, - }, - }, - }, - { - ActionSpecifier: &routev3.RateLimit_Action_Metadata{ - Metadata: &routev3.RateLimit_Action_MetaData{ - DescriptorKey: DescriptorKeyForPolicy, - MetadataKey: &metadatav3.MetadataKey{ - Key: extAuthzFilterName, - Path: []*metadatav3.MetadataKey_PathSegment{ - { - Segment: &metadatav3.MetadataKey_PathSegment_Key{ - Key: descriptorMetadataKeyForUsagePolicy, - }, - }, - }, - }, - Source: routev3.RateLimit_Action_MetaData_DYNAMIC, - SkipIfAbsent: true, - }, - }, - }, - { - ActionSpecifier: &routev3.RateLimit_Action_GenericKey_{ - GenericKey: &routev3.RateLimit_Action_GenericKey{ - DescriptorKey: "burst", - DescriptorValue: "enabled", - }, - }, - }, - }, - }) -} func generateRegexMatchAndSubstitute(routePath, endpointResourcePath string, pathMatchType gwapiv1.PathMatchType) *envoy_type_matcherv3.RegexMatchAndSubstitute { diff --git a/adapter/internal/oasparser/envoyconf/routes_with_clusters.go b/adapter/internal/oasparser/envoyconf/routes_with_clusters.go index a4fb91b1c8..772031f383 100644 --- a/adapter/internal/oasparser/envoyconf/routes_with_clusters.go +++ b/adapter/internal/oasparser/envoyconf/routes_with_clusters.go @@ -40,6 +40,7 @@ import ( cors_filter_v3 "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/cors/v3" extAuthService "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/ext_authz/v3" extProcessorv3 "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/ext_proc/v3" + ratelimitv3 "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/ratelimit/v3" lua "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/lua/v3" tlsv3 "github.com/envoyproxy/go-control-plane/envoy/extensions/transport_sockets/tls/v3" upstreams "github.com/envoyproxy/go-control-plane/envoy/extensions/upstreams/http/v3" @@ -865,7 +866,7 @@ func createRoutes(params *routeCreateParams) (routes []*routev3.Route, err error LuaLocal: luaFilter, wellknown.CORS: corsFilter, } - if !params.isAiAPI || (!resource.GetEnableBackendBasedAIRatelimit() && !resource.GetEnableSubscriptionBasedAIRatelimit()) { + if !params.isAiAPI { perFilterConfigExtProc := extProcessorv3.ExtProcPerRoute{ Override: &extProcessorv3.ExtProcPerRoute_Disabled{ Disabled: true, @@ -877,7 +878,36 @@ func createRoutes(params *routeCreateParams) (routes []*routev3.Route, err error Value: dataExtProc, } perRouteFilterConfigs[HTTPExternalProcessor] = filterExtProc + } else { + if strings.ToUpper(resource.GetExtractTokenFromValue()) == "HEADER" { + perFilterConfigExtProc := extProcessorv3.ExtProcPerRoute{ + Override: &extProcessorv3.ExtProcPerRoute_Overrides{ + Overrides: &extProcessorv3.ExtProcOverrides{ + ProcessingMode: &extProcessorv3.ProcessingMode{ + RequestHeaderMode: extProcessorv3.ProcessingMode_SKIP, + ResponseHeaderMode: extProcessorv3.ProcessingMode_SKIP, + ResponseBodyMode: extProcessorv3.ProcessingMode_NONE, + }, + }, + }, + } + dataExtProc, _ := proto.Marshal(&perFilterConfigExtProc) + filterExtProc := &any.Any{ + TypeUrl: extProcPerRouteName, + Value: dataExtProc, + } + perRouteFilterConfigs[HTTPExternalProcessor] = filterExtProc + } + } + perFilterConfigRL := ratelimitv3.RateLimitPerRoute{ + VhRateLimits: ratelimitv3.RateLimitPerRoute_INCLUDE, + } + ratelimitPerRoute, _ := proto.Marshal(&perFilterConfigRL) + filterrl := &any.Any{ + TypeUrl: ratelimitPerRouteName, + Value: ratelimitPerRoute, } + perRouteFilterConfigs[RatelimitFilterName] = filterrl logger.LoggerOasparser.Debugf("adding route : %s for API : %s", resourcePath, title) @@ -930,8 +960,7 @@ func createRoutes(params *routeCreateParams) (routes []*routev3.Route, err error } routeConfig := resource.GetEndpoints().Config metaData := &corev3.Metadata{} - logger.LoggerAPI.Infof("Is backend based rl enabled: %+v, Is subs based rl enabled: %+v", resource.GetEnableBackendBasedAIRatelimit(), resource.GetEnableSubscriptionBasedAIRatelimit()) - if params.isAiAPI && (resource.GetEnableBackendBasedAIRatelimit() || resource.GetEnableSubscriptionBasedAIRatelimit()) { + if params.isAiAPI { metaData = &corev3.Metadata{ FilterMetadata: map[string]*structpb.Struct{ "envoy.filters.http.ext_proc": &structpb.Struct{ @@ -941,11 +970,6 @@ func createRoutes(params *routeCreateParams) (routes []*routev3.Route, err error StringValue: fmt.Sprintf("%t", resource.GetEnableBackendBasedAIRatelimit()), }, }, - "EnableSubscriptionBasedAIRatelimit": &structpb.Value{ - Kind: &structpb.Value_StringValue{ - StringValue: fmt.Sprintf("%t", resource.GetEnableSubscriptionBasedAIRatelimit()), - }, - }, "BackendBasedAIRatelimitDescriptorValue": &structpb.Value{ Kind: &structpb.Value_StringValue{ StringValue: resource.GetBackendBasedAIRatelimitDescriptorValue(), @@ -1092,8 +1116,8 @@ func createRoutes(params *routeCreateParams) (routes []*routev3.Route, err error metadataValue := operation.GetMethod() + "_to_" + newMethod match2.DynamicMetadata = generateMetadataMatcherForInternalRoutes(metadataValue) - action1 := generateRouteAction(apiType, routeConfig, rateLimitPolicyCriteria, mirrorClusterNames[operation.GetID()], resource.GetEnableSubscriptionBasedAIRatelimit() && params.isAiAPI, resource.GetEnableBackendBasedAIRatelimit() && params.isAiAPI, resource.GetBackendBasedAIRatelimitDescriptorValue()) - action2 := generateRouteAction(apiType, routeConfig, rateLimitPolicyCriteria, mirrorClusterNames[operation.GetID()], resource.GetEnableSubscriptionBasedAIRatelimit() && params.isAiAPI, resource.GetEnableBackendBasedAIRatelimit() && params.isAiAPI, resource.GetBackendBasedAIRatelimitDescriptorValue()) + action1 := generateRouteAction(apiType, routeConfig, rateLimitPolicyCriteria, mirrorClusterNames[operation.GetID()], resource.GetEnableBackendBasedAIRatelimit() && params.isAiAPI, resource.GetBackendBasedAIRatelimitDescriptorValue()) + action2 := generateRouteAction(apiType, routeConfig, rateLimitPolicyCriteria, mirrorClusterNames[operation.GetID()], resource.GetEnableBackendBasedAIRatelimit() && params.isAiAPI, resource.GetBackendBasedAIRatelimitDescriptorValue()) requestHeadersToRemove := make([]string,0) if params.isAiAPI { @@ -1125,7 +1149,7 @@ func createRoutes(params *routeCreateParams) (routes []*routev3.Route, err error } else { var action *routev3.Route_Route if requestRedirectAction == nil { - action = generateRouteAction(apiType, routeConfig, rateLimitPolicyCriteria, mirrorClusterNames[operation.GetID()], resource.GetEnableSubscriptionBasedAIRatelimit() && params.isAiAPI, resource.GetEnableBackendBasedAIRatelimit() && params.isAiAPI, resource.GetBackendBasedAIRatelimitDescriptorValue()) + action = generateRouteAction(apiType, routeConfig, rateLimitPolicyCriteria, mirrorClusterNames[operation.GetID()], resource.GetEnableBackendBasedAIRatelimit() && params.isAiAPI, resource.GetBackendBasedAIRatelimitDescriptorValue()) } logger.LoggerOasparser.Debug("Creating routes for resource with policies", resourcePath, operation.GetMethod()) // create route for current method. Add policies to route config. Send via enforcer @@ -1154,7 +1178,7 @@ func createRoutes(params *routeCreateParams) (routes []*routev3.Route, err error } match := generateRouteMatch(routePath) match.Headers = generateHTTPMethodMatcher(methodRegex, clusterName) - action := generateRouteAction(apiType, routeConfig, rateLimitPolicyCriteria, nil, resource.GetEnableSubscriptionBasedAIRatelimit(), resource.GetEnableBackendBasedAIRatelimit(), resource.GetBackendBasedAIRatelimitDescriptorValue()) + action := generateRouteAction(apiType, routeConfig, rateLimitPolicyCriteria, nil, resource.GetEnableBackendBasedAIRatelimit() && params.isAiAPI, resource.GetBackendBasedAIRatelimitDescriptorValue()) rewritePath := generateRoutePathForReWrite(basePath, resourcePath, pathMatchType) action.Route.RegexRewrite = generateRegexMatchAndSubstitute(rewritePath, resourcePath, pathMatchType) requestHeadersToRemove := make([]string,0) diff --git a/adapter/internal/oasparser/envoyconf/routes_with_clusters_test.go b/adapter/internal/oasparser/envoyconf/routes_with_clusters_test.go index f914c68b81..ab26811a40 100644 --- a/adapter/internal/oasparser/envoyconf/routes_with_clusters_test.go +++ b/adapter/internal/oasparser/envoyconf/routes_with_clusters_test.go @@ -31,6 +31,7 @@ import ( operatorutils "github.com/wso2/apk/adapter/internal/operator/utils" "github.com/wso2/apk/common-go-libs/apis/dp/v1alpha1" "github.com/wso2/apk/common-go-libs/apis/dp/v1alpha2" + "github.com/wso2/apk/common-go-libs/apis/dp/v1alpha3" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" k8types "k8s.io/apimachinery/pkg/types" gwapiv1 "sigs.k8s.io/gateway-api/apis/v1" @@ -131,6 +132,8 @@ func TestCreateRoutesWithClustersWithExactAndRegularExpressionRules(t *testing.T httpRouteState.BackendMapping = backendMapping apiState.ProdHTTPRoute = &httpRouteState + apiState.AIProvider = new(v1alpha3.AIProvider) + httpRouteState.RuleIdxToAiRatelimitPolicyMapping = make(map[int]*v1alpha3.AIRateLimitPolicy) adapterInternalAPI, labels, err := synchronizer.UpdateInternalMapsFromHTTPRoute(apiState, &httpRouteState, constants.Production) assert.Equal(t, map[string]struct{}{"default-gateway": {}}, labels, "Labels are incorrect.") assert.Nil(t, err, "Error should not be present when apiState is converted to a AdapterInternalAPI object") @@ -186,6 +189,8 @@ func TestExtractAPIDetailsFromHTTPRouteForDefaultCase(t *testing.T) { apiState := generateSampleAPI("test-api-1", "1.0.0", "/test-api/1.0.0") httpRouteState := synchronizer.HTTPRouteState{} httpRouteState = *apiState.ProdHTTPRoute + apiState.AIProvider = new(v1alpha3.AIProvider) + httpRouteState.RuleIdxToAiRatelimitPolicyMapping = make(map[int]*v1alpha3.AIRateLimitPolicy) xds.SanitizeGateway("default-gateway", true) adapterInternalAPI, labels, err := synchronizer.UpdateInternalMapsFromHTTPRoute(apiState, &httpRouteState, constants.Production) assert.Equal(t, map[string]struct{}{"default-gateway": {}}, labels, "Labels are incorrect.") @@ -201,6 +206,8 @@ func TestExtractAPIDetailsFromHTTPRouteForSpecificEnvironment(t *testing.T) { apiState.APIDefinition.Spec.Environment = "dev" xds.SanitizeGateway("default-gateway", true) + apiState.AIProvider = new(v1alpha3.AIProvider) + httpRouteState.RuleIdxToAiRatelimitPolicyMapping = make(map[int]*v1alpha3.AIRateLimitPolicy) adapterInternalAPI, labels, err := synchronizer.UpdateInternalMapsFromHTTPRoute(apiState, &httpRouteState, constants.Production) assert.Equal(t, map[string]struct{}{"default-gateway": {}}, labels, "Labels are incorrect.") assert.Nil(t, err, "Error should not be present when apiState is converted to a AdapterInternalAPI object") @@ -267,6 +274,9 @@ func generateSampleAPI(apiName string, apiVersion string, basePath string) synch httpRouteState.BackendMapping = backendMapping apiState.ProdHTTPRoute = &httpRouteState + + apiState.AIProvider = new(v1alpha3.AIProvider) + httpRouteState.RuleIdxToAiRatelimitPolicyMapping = make(map[int]*v1alpha3.AIRateLimitPolicy) return apiState } @@ -294,6 +304,8 @@ func TestCreateRoutesWithClustersWithMultiplePathPrefixRules(t *testing.T) { apiState.APIDefinition = &apiDefinition httpRouteState := synchronizer.HTTPRouteState{} + apiState.AIProvider = new(v1alpha3.AIProvider) + httpRouteState.RuleIdxToAiRatelimitPolicyMapping = make(map[int]*v1alpha3.AIRateLimitPolicy) httpRoute := gwapiv1.HTTPRoute{ ObjectMeta: metav1.ObjectMeta{ Namespace: "default", @@ -444,6 +456,8 @@ func TestCreateRoutesWithClustersWithBackendTLSConfigs(t *testing.T) { httpRouteState := synchronizer.HTTPRouteState{} methodTypeGet := gwapiv1.HTTPMethodGet + apiState.AIProvider = new(v1alpha3.AIProvider) + httpRouteState.RuleIdxToAiRatelimitPolicyMapping = make(map[int]*v1alpha3.AIRateLimitPolicy) httpRoute := gwapiv1.HTTPRoute{ ObjectMeta: metav1.ObjectMeta{ Namespace: "default", @@ -567,6 +581,8 @@ func TestCreateRoutesWithClustersDifferentBackendRefs(t *testing.T) { httpRouteState := synchronizer.HTTPRouteState{} methodTypeGet := gwapiv1.HTTPMethodGet + apiState.AIProvider = new(v1alpha3.AIProvider) + httpRouteState.RuleIdxToAiRatelimitPolicyMapping = make(map[int]*v1alpha3.AIRateLimitPolicy) httpRoute := gwapiv1.HTTPRoute{ ObjectMeta: metav1.ObjectMeta{ Namespace: "default", @@ -659,6 +675,8 @@ func TestCreateRoutesWithClustersSameBackendRefs(t *testing.T) { httpRouteState := synchronizer.HTTPRouteState{} methodTypeGet := gwapiv1.HTTPMethodGet + apiState.AIProvider = new(v1alpha3.AIProvider) + httpRouteState.RuleIdxToAiRatelimitPolicyMapping = make(map[int]*v1alpha3.AIRateLimitPolicy) httpRoute := gwapiv1.HTTPRoute{ ObjectMeta: metav1.ObjectMeta{ Namespace: "default", diff --git a/adapter/internal/oasparser/model/adapter_internal_api.go b/adapter/internal/oasparser/model/adapter_internal_api.go index 37bf98da4c..b6be28d390 100644 --- a/adapter/internal/oasparser/model/adapter_internal_api.go +++ b/adapter/internal/oasparser/model/adapter_internal_api.go @@ -497,7 +497,7 @@ func (adapterInternalAPI *AdapterInternalAPI) Validate() error { // SetInfoHTTPRouteCR populates resources and endpoints of adapterInternalAPI. httpRoute.Spec.Rules.Matches // are used to create resources and httpRoute.Spec.Rules.BackendRefs are used to create EndpointClusters. -func (adapterInternalAPI *AdapterInternalAPI) SetInfoHTTPRouteCR(httpRoute *gwapiv1.HTTPRoute, resourceParams ResourceParams, isAiSubscriptionRatelimitEnabled bool, ruleIdxToAiRatelimitPolicyMapping map[int]*dpv1alpha3.AIRateLimitPolicy) error { +func (adapterInternalAPI *AdapterInternalAPI) SetInfoHTTPRouteCR(httpRoute *gwapiv1.HTTPRoute, resourceParams ResourceParams, ruleIdxToAiRatelimitPolicyMapping map[int]*dpv1alpha3.AIRateLimitPolicy, extractTokenFrom string) error { var resources []*Resource outputAuthScheme := utils.TieBreaker(utils.GetPtrSlice(maps.Values(resourceParams.AuthSchemes))) outputAPIPolicy := utils.TieBreaker(utils.GetPtrSlice(maps.Values(resourceParams.APIPolicies))) @@ -544,11 +544,11 @@ func (adapterInternalAPI *AdapterInternalAPI) SetInfoHTTPRouteCR(httpRoute *gwap enableBackendBasedAIRatelimit := false descriptorValue := "" if aiRatelimitPolicy, exists := ruleIdxToAiRatelimitPolicyMapping[ruleID]; exists { - loggers.LoggerAPI.Infof("Found AI ratelimit mapping for ruleId: %d, related api: %s", ruleID, adapterInternalAPI.UUID) + loggers.LoggerAPI.Debugf("Found AI ratelimit mapping for ruleId: %d, related api: %s", ruleID, adapterInternalAPI.UUID) enableBackendBasedAIRatelimit = true descriptorValue = prepareAIRatelimitIdentifier(adapterInternalAPI.OrganizationID, utils.NamespacedName(aiRatelimitPolicy), &aiRatelimitPolicy.Spec) } else { - loggers.LoggerAPI.Infof("Could not find AIratelimit for ruleId: %d, len of map: %d, related api: %s", ruleID, len(ruleIdxToAiRatelimitPolicyMapping), adapterInternalAPI.UUID) + loggers.LoggerAPI.Debugf("Could not find AIratelimit for ruleId: %d, len of map: %d, related api: %s", ruleID, len(ruleIdxToAiRatelimitPolicyMapping), adapterInternalAPI.UUID) } backendBasePath := "" @@ -915,9 +915,9 @@ func (adapterInternalAPI *AdapterInternalAPI) SetInfoHTTPRouteCR(httpRoute *gwap hasPolicies: true, iD: uuid.New().String(), hasRequestRedirectFilter: hasRequestRedirectPolicy, - enableSubscriptionBasedAIRatelimit: isAiSubscriptionRatelimitEnabled, enableBackendBasedAIRatelimit: enableBackendBasedAIRatelimit, backendBasedAIRatelimitDescriptorValue: descriptorValue, + extractTokenFrom: extractTokenFrom, } resource.endpoints = &EndpointCluster{ diff --git a/adapter/internal/oasparser/model/resource.go b/adapter/internal/oasparser/model/resource.go index a953e3a850..f4137b850f 100644 --- a/adapter/internal/oasparser/model/resource.go +++ b/adapter/internal/oasparser/model/resource.go @@ -45,9 +45,9 @@ type Resource struct { vendorExtensions map[string]interface{} hasPolicies bool hasRequestRedirectFilter bool - enableSubscriptionBasedAIRatelimit bool enableBackendBasedAIRatelimit bool backendBasedAIRatelimitDescriptorValue string + extractTokenFrom string } // GetEndpointSecurity returns the endpoint security object of a given resource. @@ -193,11 +193,6 @@ func SortResources(resources []*Resource) []*Resource { return resources } -// GetEnableSubscriptionBasedAIRatelimit returns the value of enableSubscriptionBasedAIRatelimit. -func (resource *Resource) GetEnableSubscriptionBasedAIRatelimit() bool { - return resource.enableSubscriptionBasedAIRatelimit -} - // GetEnableBackendBasedAIRatelimit returns the value of enableBackendBasedAIRatelimit. func (resource *Resource) GetEnableBackendBasedAIRatelimit() bool { return resource.enableBackendBasedAIRatelimit @@ -206,4 +201,9 @@ func (resource *Resource) GetEnableBackendBasedAIRatelimit() bool { // GetBackendBasedAIRatelimitDescriptorValue returns the value of backendBasedAIRatelimitDescriptorValue. func (resource *Resource) GetBackendBasedAIRatelimitDescriptorValue() string { return resource.backendBasedAIRatelimitDescriptorValue -} \ No newline at end of file +} + +// GetExtractTokenFromValue returns the value of extractTokenFrom +func (resource *Resource) GetExtractTokenFromValue() string { + return resource.extractTokenFrom +} diff --git a/adapter/internal/operator/controllers/dp/api_controller.go b/adapter/internal/operator/controllers/dp/api_controller.go index e14d43fcb8..44301838aa 100644 --- a/adapter/internal/operator/controllers/dp/api_controller.go +++ b/adapter/internal/operator/controllers/dp/api_controller.go @@ -58,7 +58,6 @@ import ( ctrl "sigs.k8s.io/controller-runtime" k8client "sigs.k8s.io/controller-runtime/pkg/client" - cpv1alpha3 "github.com/wso2/apk/common-go-libs/apis/cp/v1alpha3" dpv1alpha1 "github.com/wso2/apk/common-go-libs/apis/dp/v1alpha1" dpv1alpha2 "github.com/wso2/apk/common-go-libs/apis/dp/v1alpha2" dpv1alpha3 "github.com/wso2/apk/common-go-libs/apis/dp/v1alpha3" @@ -232,12 +231,6 @@ func NewAPIController(mgr manager.Manager, operatorDataStore *synchronizer.Opera return err } - if err := c.Watch(source.Kind(mgr.GetCache(), &cpv1alpha3.Subscription{}), handler.EnqueueRequestsFromMapFunc(apiReconciler.populateAPIReconcileRequestsForSubscription), - predicates...); err != nil { - loggers.LoggerAPKOperator.ErrorC(logging.PrintError(logging.Error2645, logging.BLOCKER, "Error watching Subscription resources: %v", err)) - return err - } - loggers.LoggerAPKOperator.Info("API Controller successfully started. Watching API Objects....") go apiReconciler.handleStatus() go apiReconciler.handleLabels(ctx) @@ -399,7 +392,6 @@ func (apiReconciler *APIReconciler) resolveAPIRefs(ctx context.Context, api dpv1 apiRef.String(), namespace, string(api.ObjectMeta.UID), "api definition file not found") } } - apiReconciler.resolveAiSubscriptionRatelimitPolicies(ctx, apiState) if len(apiState.Authentications) > 0 { if apiState.MutualSSL, err = apiReconciler.resolveAuthentications(ctx, apiState.Authentications); err != nil { return nil, fmt.Errorf("error while resolving authentication %v in namespace: %s was not found. %s", @@ -854,30 +846,6 @@ func (apiReconciler *APIReconciler) getAPIPolicyChildrenRefs(ctx context.Context return interceptorServices, backendJWTs, subscriptionValidation, aiProvider, nil } -func (apiReconciler *APIReconciler) resolveAiSubscriptionRatelimitPolicies(ctx context.Context, apiState *synchronizer.APIState) { - apiState.IsAiSubscriptionRatelimitEnabled = false - subscriptionList := &cpv1alpha3.SubscriptionList{} - if err := apiReconciler.client.List(ctx, subscriptionList, &k8client.ListOptions{ - FieldSelector: fields.OneTermEqualSelector(subscriptionToAPIIndex, utils.GetSubscriptionToAPIIndexID(apiState.APIDefinition.Spec.APIName, apiState.APIDefinition.Spec.APIVersion)), - }); err != nil { - loggers.LoggerAPKOperator.Infof("No associated subscription found for API: %s", utils.NamespacedName(apiState.APIDefinition)) - return - } - for _, subscription := range subscriptionList.Items { - aiRatelimitPolicy := &dpv1alpha3.AIRateLimitPolicy{} - nn:= types.NamespacedName{ - Name: subscription.Spec.RatelimitRef.Name, - Namespace: subscription.GetNamespace(), - } - if err := apiReconciler.client.Get(ctx, nn, aiRatelimitPolicy, ); err == nil { - loggers.LoggerAPKOperator.Infof("API state set as AI subscription enabled") - apiState.IsAiSubscriptionRatelimitEnabled = true - break - } - loggers.LoggerAPKOperator.Infof("No associated aiRatelimitPolicy found for Subscription: %s", utils.NamespacedName(&subscription)) - } -} - func (apiReconciler *APIReconciler) resolveAuthentications(ctx context.Context, authentications map[string]dpv1alpha2.Authentication) (*dpv1alpha2.MutualSSL, error) { resolvedMutualSSL := dpv1alpha2.MutualSSL{} @@ -909,10 +877,10 @@ func (apiReconciler *APIReconciler) getResolvedBackendsMapping(ctx context.Conte if err := apiReconciler.client.List(ctx, aiRLPolicyList, &k8client.ListOptions{ FieldSelector: fields.OneTermEqualSelector(aiRatelimitPolicyToBackendIndex, backendNamespacedName.String()), }); err != nil { - loggers.LoggerAPKOperator.Infof("No associated AI ratelimit policy found for : %s", backendNamespacedName.String()) + loggers.LoggerAPKOperator.Debugf("No associated AI ratelimit policy found for : %s", backendNamespacedName.String()) } else { for _, aiRLPolicy := range aiRLPolicyList.Items { - loggers.LoggerAPKOperator.Infof("Adding mapping for ruleid: %d to aiRLPolicy: %s", id, utils.NamespacedName(&aiRLPolicy)) + loggers.LoggerAPKOperator.Debugf("Adding mapping for ruleid: %d to aiRLPolicy: %s", id, utils.NamespacedName(&aiRLPolicy)) ruleIdxToAiRatelimitPolicyMapping[id] = &aiRLPolicy } } @@ -1012,14 +980,6 @@ func (apiReconciler *APIReconciler) populateAPIReconcileRequestsForAIRatelimitPo return requests } -func (apiReconciler *APIReconciler) populateAPIReconcileRequestsForSubscription(ctx context.Context, obj k8client.Object) []reconcile.Request { - requests := apiReconciler.getAPIsForSubscription(ctx, obj) - // if len(requests) > 0 { - // apiReconciler.handleOwnerReference(ctx, obj, &requests) - // } - return requests -} - func (apiReconciler *APIReconciler) populateAPIReconcileRequestsForAuthentication(ctx context.Context, obj k8client.Object) []reconcile.Request { requests := apiReconciler.getAPIsForAuthentication(ctx, obj) if len(requests) > 0 { @@ -1486,35 +1446,6 @@ func (apiReconciler *APIReconciler) getAPIsForAIRatelimitPolicy(ctx context.Cont return []reconcile.Request{} } -// getAPIsForAIRatelimitPolicy triggers the API controller reconcile method based on the changes detected -// in subscription resources. -func (apiReconciler *APIReconciler) getAPIsForSubscription(ctx context.Context, obj k8client.Object) []reconcile.Request { - subscription, ok := obj.(*cpv1alpha3.Subscription) - if !ok { - loggers.LoggerAPKOperator.ErrorC(logging.PrintError(logging.Error2622, logging.TRIVIAL, "Unexpected object type, bypassing reconciliation: %v", obj)) - return []reconcile.Request{} - } - apiList := &dpv1alpha2.APIList{} - if err := apiReconciler.client.List(ctx, apiList, &k8client.ListOptions{ - FieldSelector: fields.OneTermEqualSelector(apiToSubscriptionIndex, utils.GetSubscriptionToAPIIndexID(subscription.Spec.API.Name, subscription.Spec.API.Version)), - }); err != nil { - loggers.LoggerAPKOperator.ErrorC(logging.PrintError(logging.Error2649, logging.CRITICAL, "Unable to find associated APIs for subscription: %s, error: %v", utils.NamespacedName(subscription).String(), err.Error())) - return []reconcile.Request{} - } - requests := []reconcile.Request{} - for _, api := range apiList.Items { - req := reconcile.Request{ - NamespacedName: types.NamespacedName{ - Name: api.Name, - Namespace: api.Namespace}, - } - requests = append(requests, req) - loggers.LoggerAPKOperator.Infof("Adding reconcile request for API: %s/%s with API UUID: %v due to change in subscription: %v", api.Namespace, api.Name, - string(api.ObjectMeta.UID), utils.NamespacedName(subscription).String()) - } - return requests -} - // getAPIForAuthentication triggers the API controller reconcile method based on the changes detected // from Authentication objects. If the changes are done for an API stored in the Operator Data store, // a new reconcile event will be created and added to the reconcile event queue. @@ -2086,33 +2017,6 @@ func addIndexes(ctx context.Context, mgr manager.Manager) error { return err } - // // AIRatelimitPolicy to Subscription indexer - // if err := mgr.GetFieldIndexer().IndexField(ctx, &dpv1alpha3.AIRateLimitPolicy{}, aiRatelimitPolicyToSubscriptionIndex, - // func(rawObj k8client.Object) []string { - // aiRatelimitPolicy := rawObj.(*dpv1alpha3.AIRateLimitPolicy) - // var subscriptions []string - // namespace := utils.GetNamespace(aiRatelimitPolicy.Spec.TargetRef.Namespace, aiRatelimitPolicy.GetNamespace()) - // subscriptions = append(subscriptions, types.NamespacedName{ - // Name: string(aiRatelimitPolicy.Spec.TargetRef.Name), - // Namespace: namespace, - // }.String()) - // return subscriptions - // }); err != nil { - // return err - // } - - // Subscription to API indexer - if err := mgr.GetFieldIndexer().IndexField(ctx, &cpv1alpha3.Subscription{}, subscriptionToAPIIndex, - func(rawObj k8client.Object) []string { - subscription := rawObj.(*cpv1alpha3.Subscription) - var subscriptions []string - subscriptionIdentifierForIndex := fmt.Sprintf("%s_%s", subscription.Spec.API.Name, subscription.Spec.API.Version) - subscriptions = append(subscriptions, subscriptionIdentifierForIndex) - return subscriptions - }); err != nil { - return err - } - // API to Subscription indexer if err := mgr.GetFieldIndexer().IndexField(ctx, &dpv1alpha2.API{}, apiToSubscriptionIndex, func(rawObj k8client.Object) []string { diff --git a/adapter/internal/operator/synchronizer/api_state.go b/adapter/internal/operator/synchronizer/api_state.go index 8536cd62af..cacf066181 100644 --- a/adapter/internal/operator/synchronizer/api_state.go +++ b/adapter/internal/operator/synchronizer/api_state.go @@ -45,7 +45,6 @@ type APIState struct { APIDefinitionFile []byte SubscriptionValidation bool MutualSSL *v1alpha2.MutualSSL - IsAiSubscriptionRatelimitEnabled bool } // HTTPRouteState holds the state of the deployed httpRoutes. This state is compared with diff --git a/adapter/internal/operator/synchronizer/data_store.go b/adapter/internal/operator/synchronizer/data_store.go index 8d8f4bb50b..995aff427f 100644 --- a/adapter/internal/operator/synchronizer/data_store.go +++ b/adapter/internal/operator/synchronizer/data_store.go @@ -75,11 +75,6 @@ func (ods *OperatorDataStore) processAPIState(apiNamespacedName types.Namespaced var updated bool events := []string{} cachedAPI := ods.apiStore[apiNamespacedName] - if cachedAPI.IsAiSubscriptionRatelimitEnabled != apiState.IsAiSubscriptionRatelimitEnabled { - cachedAPI.IsAiSubscriptionRatelimitEnabled = apiState.IsAiSubscriptionRatelimitEnabled - updated = true - events = append(events, "Subscription based AI RatelimitPolicy") - } if cachedAPI.AIProvider == nil && apiState.AIProvider != nil { cachedAPI.AIProvider = apiState.AIProvider @@ -114,18 +109,15 @@ func (ods *OperatorDataStore) processAPIState(apiNamespacedName types.Namespaced for key, aiRl := range apiState.ProdHTTPRoute.RuleIdxToAiRatelimitPolicyMapping { if cachedAIRl, exists := cachedAPI.ProdHTTPRoute.RuleIdxToAiRatelimitPolicyMapping[key]; exists { if utils.NamespacedName(cachedAIRl).String() != utils.NamespacedName(aiRl).String() || cachedAIRl.Generation != aiRl.Generation { - loggers.LoggerAPI.Infof("Returning true * %s %s %d %d", utils.NamespacedName(cachedAIRl).String(), utils.NamespacedName(aiRl).String(), cachedAIRl.Generation, aiRl.Generation) updated = true break } } else { - loggers.LoggerAPI.Info("Returning true&&") updated = true break } } if len(cachedAPI.ProdHTTPRoute.RuleIdxToAiRatelimitPolicyMapping) != len(apiState.ProdHTTPRoute.RuleIdxToAiRatelimitPolicyMapping) { - loggers.LoggerAPI.Info("Returning true ***") updated = true } } @@ -169,18 +161,15 @@ func (ods *OperatorDataStore) processAPIState(apiNamespacedName types.Namespaced for key, aiRl := range apiState.SandHTTPRoute.RuleIdxToAiRatelimitPolicyMapping { if cachedAIRl, exists := cachedAPI.SandHTTPRoute.RuleIdxToAiRatelimitPolicyMapping[key]; exists { if utils.NamespacedName(cachedAIRl).String() != utils.NamespacedName(aiRl).String() || cachedAIRl.Generation != aiRl.Generation { - loggers.LoggerAPI.Info("Returning true") updated = true break } } else { - loggers.LoggerAPI.Info("Returning true") updated = true break } } if len(cachedAPI.ProdHTTPRoute.RuleIdxToAiRatelimitPolicyMapping) != len(apiState.ProdHTTPRoute.RuleIdxToAiRatelimitPolicyMapping) { - loggers.LoggerAPI.Info("Returning true") updated = true } } diff --git a/adapter/internal/operator/synchronizer/rest_api.go b/adapter/internal/operator/synchronizer/rest_api.go index 8c82aaa486..cbd8c84e7a 100644 --- a/adapter/internal/operator/synchronizer/rest_api.go +++ b/adapter/internal/operator/synchronizer/rest_api.go @@ -107,7 +107,7 @@ func generateAdapterInternalAPI(apiState APIState, httpRouteState *HTTPRouteStat RateLimitPolicies: apiState.RateLimitPolicies, ResourceRateLimitPolicies: apiState.ResourceRateLimitPolicies, } - if err := adapterInternalAPI.SetInfoHTTPRouteCR(httpRouteState.HTTPRouteCombined, resourceParams, apiState.IsAiSubscriptionRatelimitEnabled, httpRouteState.RuleIdxToAiRatelimitPolicyMapping); err != nil { + if err := adapterInternalAPI.SetInfoHTTPRouteCR(httpRouteState.HTTPRouteCombined, resourceParams, httpRouteState.RuleIdxToAiRatelimitPolicyMapping, apiState.AIProvider.Spec.RateLimitFields.PromptTokens.In); err != nil { loggers.LoggerAPKOperator.ErrorC(logging.PrintError(logging.Error2631, logging.MAJOR, "Error setting HttpRoute CR info to adapterInternalAPI. %v", err)) return nil, err } diff --git a/common-controller/internal/operator/controllers/dp/airatelimitpolicy_controller.go b/common-controller/internal/operator/controllers/dp/airatelimitpolicy_controller.go index 0d618a2277..4791376a32 100644 --- a/common-controller/internal/operator/controllers/dp/airatelimitpolicy_controller.go +++ b/common-controller/internal/operator/controllers/dp/airatelimitpolicy_controller.go @@ -26,27 +26,11 @@ import ( "sigs.k8s.io/controller-runtime/pkg/log" dpv1alpha3 "github.com/wso2/apk/common-go-libs/apis/dp/v1alpha3" - // "context" - // "fmt" - // "time" - - // logger "github.com/sirupsen/logrus" - // k8error "k8s.io/apimachinery/pkg/api/errors" - // "k8s.io/apimachinery/pkg/fields" - // "k8s.io/apimachinery/pkg/runtime" - // "k8s.io/apimachinery/pkg/types" - // ctrl "sigs.k8s.io/controller-runtime" - // "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/handler" - // "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/predicate" - // "sigs.k8s.io/controller-runtime/pkg/reconcile" - - // k8client "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/source" - // gwapiv1 "sigs.k8s.io/gateway-api/apis/v1" "github.com/wso2/apk/adapter/pkg/logging" cache "github.com/wso2/apk/common-controller/internal/cache" @@ -54,8 +38,6 @@ import ( loggers "github.com/wso2/apk/common-controller/internal/loggers" "github.com/wso2/apk/common-controller/internal/utils" xds "github.com/wso2/apk/common-controller/internal/xds" - // dpv1alpha1 "github.com/wso2/apk/common-go-libs/apis/dp/v1alpha1" - // dpv1alpha2 "github.com/wso2/apk/common-go-libs/apis/dp/v1alpha2" "github.com/wso2/apk/common-go-libs/constants" ) @@ -73,12 +55,6 @@ func NewAIRatelimitController(mgr manager.Manager, ratelimitStore *cache.Ratelim ods: ratelimitStore, } - // ctx := context.Background() - // if err := addIndexes(ctx, mgr); err != nil { - // loggers.LoggerAPKOperator.ErrorC(logging.PrintError(logging.Error2612, logging.BLOCKER, "Error adding indexes: %v", err)) - // return err - // } - c, err := controller.New(constants.AIRatelimitController, mgr, controller.Options{Reconciler: aiRateLimitPolicyReconciler}) if err != nil { loggers.LoggerAPKOperator.ErrorC(logging.PrintError(logging.Error2663, logging.BLOCKER, @@ -89,20 +65,6 @@ func NewAIRatelimitController(mgr manager.Manager, ratelimitStore *cache.Ratelim conf := config.ReadConfigs() predicates := []predicate.Predicate{predicate.NewPredicateFuncs(utils.FilterByNamespaces(conf.CommonController.Operator.Namespaces))} - // if err := c.Watch(source.Kind(mgr.GetCache(), &dpv1alpha2.API{}), - // handler.EnqueueRequestsFromMapFunc(aiRateLimitPolicyReconciler.getRatelimitForAPI), predicates...); err != nil { - // loggers.LoggerAPKOperator.ErrorC(logging.PrintError(logging.Error2611, logging.BLOCKER, - // "Error watching API resources: %v", err)) - // return err - // } - - // if err := c.Watch(source.Kind(mgr.GetCache(), &gwapiv1.HTTPRoute{}), - // handler.EnqueueRequestsFromMapFunc(aiRateLimitPolicyReconciler.getRatelimitForHTTPRoute), predicates...); err != nil { - // loggers.LoggerAPKOperator.ErrorC(logging.PrintError(logging.Error2613, logging.BLOCKER, - // "Error watching HTTPRoute resources: %v", err)) - // return err - // } - if err := c.Watch(source.Kind(mgr.GetCache(), &dpv1alpha3.AIRateLimitPolicy{}), &handler.EnqueueRequestForObject{}, predicates...); err != nil { loggers.LoggerAPKOperator.ErrorC(logging.PrintError(logging.Error2639, logging.BLOCKER, "Error watching Ratelimit resources: %v", err.Error())) @@ -153,7 +115,6 @@ func (r *AIRateLimitPolicyReconciler) Reconcile(ctx context.Context, req ctrl.Re xds.UpdateRateLimiterPolicies(conf.CommonController.Server.Label) } } - loggers.LoggerAPKOperator.Infof("AIRatelimit reconcile..*****.") return ctrl.Result{}, nil } diff --git a/common-controller/internal/xds/ratelimiter_cache.go b/common-controller/internal/xds/ratelimiter_cache.go index 53827e7230..035c6c91f4 100644 --- a/common-controller/internal/xds/ratelimiter_cache.go +++ b/common-controller/internal/xds/ratelimiter_cache.go @@ -335,10 +335,8 @@ func (r *rateLimitPolicyCache) AddCustomRateLimitPolicies(customRateLimitPolicy // ProcessSubscriptionBasedAIRatelimitPolicySpecsAndUpdateCache process the specs and update the cache func (r *rateLimitPolicyCache) ProcessSubscriptionBasedAIRatelimitPolicySpecsAndUpdateCache(subscriptionEnabledAIRatelimitPolicies map[types.NamespacedName]struct{}, aiRatelimitPolicySpecs map[types.NamespacedName]*dpv1alpha3.AIRateLimitPolicySpec) { aiRlDescriptors := make([]*rls_config.RateLimitDescriptor, 0) - loggers.LoggerAPKOperator.Infof("222222") for namespacedNameRl := range subscriptionEnabledAIRatelimitPolicies { if airl, exists := aiRatelimitPolicySpecs[namespacedNameRl]; exists { - loggers.LoggerAPKOperator.Infof("----- %s %s %s", DescriptorKeyForSubscriptionBasedAIRequestTokenCount, prepareSubscriptionBasedAIRatelimitIdentifier(airl.Override.Organization, namespacedNameRl), DescriptorKeyForSubscription) // Add descriptor for RequestTokenCount aiRlDescriptors = append(aiRlDescriptors, &rls_config.RateLimitDescriptor{ Key: DescriptorKeyForSubscriptionBasedAIRequestTokenCount, @@ -403,10 +401,7 @@ func (r *rateLimitPolicyCache) ProcessSubscriptionBasedAIRatelimitPolicySpecsAnd // ProcessAIratelimitPolicySpecsAndUpdateCache process the specs and update the cache func (r *rateLimitPolicyCache) ProcessAIRatelimitPolicySpecsAndUpdateCache(aiRateLimitPolicySpecs map[types.NamespacedName]*dpv1alpha3.AIRateLimitPolicySpec) { aiRlDescriptors := make([]*rls_config.RateLimitDescriptor, 0) - loggers.LoggerAPKOperator.Infof("222222") for namespacedName, spec := range aiRateLimitPolicySpecs { - logger.Infof("Adding : %s, %s", DescriptorKeyForAIRequestCount, prepareAIRatelimitIdentifier(spec.Override.Organization, namespacedName, spec)) - logger.Infof("For airl: %s", namespacedName) // Add descriptor for RequestTokenCount aiRlDescriptors = append(aiRlDescriptors, &rls_config.RateLimitDescriptor{ Key: DescriptorKeyForAIRequestTokenCount, diff --git a/gateway/enforcer/org.wso2.apk.enforcer/build.gradle b/gateway/enforcer/org.wso2.apk.enforcer/build.gradle index 21fdfe75ce..8f410f5718 100644 --- a/gateway/enforcer/org.wso2.apk.enforcer/build.gradle +++ b/gateway/enforcer/org.wso2.apk.enforcer/build.gradle @@ -91,6 +91,7 @@ dependencies { implementation libs.gson implementation libs.ua.parser implementation libs.commons.lang3 + implementation libs.commons.compress implementation libs.openfeign.feign.gson implementation libs.openfeign.feign.slf4j diff --git a/gateway/enforcer/org.wso2.apk.enforcer/src/main/java/org/wso2/apk/enforcer/grpc/ExtAuthService.java b/gateway/enforcer/org.wso2.apk.enforcer/src/main/java/org/wso2/apk/enforcer/grpc/ExtAuthService.java index b961743b37..060a441b07 100644 --- a/gateway/enforcer/org.wso2.apk.enforcer/src/main/java/org/wso2/apk/enforcer/grpc/ExtAuthService.java +++ b/gateway/enforcer/org.wso2.apk.enforcer/src/main/java/org/wso2/apk/enforcer/grpc/ExtAuthService.java @@ -270,7 +270,6 @@ private String constructQueryParamString(boolean removeAllQueryParams, String re * @param value */ private void addMetadata(Struct.Builder structBuilder, String key, String value) { - System.out.println("Key: " + key + " value: " + value); structBuilder.putFields(key, Value.newBuilder().setStringValue(value).build()); } @@ -309,6 +308,4 @@ private CheckResponse buildReadyCheckResponse(CheckResponse.Builder responseBuil .setDeniedResponse(deniedResponsePreparer.build()) .build(); } - - } diff --git a/gateway/enforcer/org.wso2.apk.enforcer/src/main/java/org/wso2/apk/enforcer/grpc/ExternalProcessorService.java b/gateway/enforcer/org.wso2.apk.enforcer/src/main/java/org/wso2/apk/enforcer/grpc/ExternalProcessorService.java index 4d776ad9b3..258c655462 100644 --- a/gateway/enforcer/org.wso2.apk.enforcer/src/main/java/org/wso2/apk/enforcer/grpc/ExternalProcessorService.java +++ b/gateway/enforcer/org.wso2.apk.enforcer/src/main/java/org/wso2/apk/enforcer/grpc/ExternalProcessorService.java @@ -22,12 +22,14 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.protobuf.Struct; import com.google.protobuf.Value; +import io.envoyproxy.envoy.config.core.v3.HeaderValue; import io.envoyproxy.envoy.service.ext_proc.v3.BodyMutation; import io.envoyproxy.envoy.service.ext_proc.v3.BodyResponse; import io.envoyproxy.envoy.service.ext_proc.v3.CommonResponse; import io.envoyproxy.envoy.service.ext_proc.v3.ExternalProcessorGrpc; import io.envoyproxy.envoy.service.ext_proc.v3.HeaderMutation; import io.envoyproxy.envoy.service.ext_proc.v3.HeadersResponse; +import io.envoyproxy.envoy.service.ext_proc.v3.HttpHeaders; import io.envoyproxy.envoy.service.ext_proc.v3.ProcessingRequest; import io.envoyproxy.envoy.service.ext_proc.v3.ProcessingResponse; import io.grpc.stub.StreamObserver; @@ -73,26 +75,65 @@ public class ExternalProcessorService extends ExternalProcessorGrpc.ExternalProc public StreamObserver process( final StreamObserver responseObserver) { FilterMetadata filterMetadata = new FilterMetadata(); - System.out.println("process ...."); return new StreamObserver() { @Override public void onNext(ProcessingRequest request) { ProcessingRequest.RequestCase r = request.getRequestCase(); switch (r) { + case RESPONSE_HEADERS: + if (!request.getAttributesMap().isEmpty() && request.getAttributesMap().get("envoy.filters.http.ext_proc") != null && request.getAttributesMap().get("envoy.filters.http.ext_proc").getFieldsMap().get("xds.route_metadata") != null){ + Value value = request.getAttributesMap().get("envoy.filters.http.ext_proc").getFieldsMap().get("xds.route_metadata"); + FilterMetadata metadata = convertStringToFilterMetadata(value.getStringValue()); + filterMetadata.backendBasedAIRatelimitDescriptorValue = metadata.backendBasedAIRatelimitDescriptorValue; + filterMetadata.enableBackendBasedAIRatelimit = metadata.enableBackendBasedAIRatelimit; + } + executorService.submit(() -> { + Struct filterMetadataFromAuthZ = request.getMetadataContext().getFilterMetadataOrDefault("envoy.filters.http.ext_authz", null); + if (filterMetadataFromAuthZ != null) { + String extractTokenFrom = filterMetadataFromAuthZ.getFieldsMap().get(DYNAMIC_METADATA_KEY_FOR_EXTRACT_TOKEN_FROM).getStringValue(); + String promptTokenID = filterMetadataFromAuthZ.getFieldsMap().get(DYNAMIC_METADATA_KEY_FOR_PROMPT_TOKEN_ID).getStringValue(); + String completionTokenID = filterMetadataFromAuthZ.getFieldsMap().get(DYNAMIC_METADATA_KEY_FOR_COMPLETION_TOKEN_ID).getStringValue(); + String totalTokenID = filterMetadataFromAuthZ.getFieldsMap().get(DYNAMIC_METADATA_KEY_FOR_TOTAL_TOKEN_ID).getStringValue(); + + Usage usage = extractUsageFromHeaders(request.getResponseHeaders(), completionTokenID, promptTokenID, totalTokenID); + if (usage == null) { + logger.error("Usage details not found.."); + responseObserver.onCompleted(); + return; + } + List configs = new ArrayList<>(); + if (filterMetadata.enableBackendBasedAIRatelimit) { + configs.add(new RatelimitClient.KeyValueHitsAddend(DESCRIPTOR_KEY_FOR_AI_REQUEST_TOKEN_COUNT, filterMetadata.backendBasedAIRatelimitDescriptorValue, usage.getPrompt_tokens() - 1)); + configs.add(new RatelimitClient.KeyValueHitsAddend(DESCRIPTOR_KEY_FOR_AI_RESPONSE_TOKEN_COUNT, filterMetadata.backendBasedAIRatelimitDescriptorValue, usage.getCompletion_tokens() - 1)); + configs.add(new RatelimitClient.KeyValueHitsAddend(DESCRIPTOR_KEY_FOR_AI_TOTAL_TOKEN_COUNT, filterMetadata.backendBasedAIRatelimitDescriptorValue, usage.getTotal_tokens() - 1)); + } + if (request.hasMetadataContext()) { + if (filterMetadataFromAuthZ != null) { + if (filterMetadataFromAuthZ.getFieldsMap().get(DYNAMIC_METADATA_KEY_FOR_ORGANIZATION_AND_AIRL_POLICY) != null && filterMetadataFromAuthZ.getFieldsMap().get(DYNAMIC_METADATA_KEY_FOR_SUBSCRIPTION) != null) { + String orgAndAIRLPolicyValue = filterMetadataFromAuthZ.getFieldsMap().get(DYNAMIC_METADATA_KEY_FOR_ORGANIZATION_AND_AIRL_POLICY).getStringValue(); + String aiRLSubsValue = filterMetadataFromAuthZ.getFieldsMap().get(DYNAMIC_METADATA_KEY_FOR_SUBSCRIPTION).getStringValue(); + configs.add(new RatelimitClient.KeyValueHitsAddend(DESCRIPTOR_KEY_FOR_SUBSCRIPTION_BASED_AI_REQUEST_TOKEN_COUNT, orgAndAIRLPolicyValue, new RatelimitClient.KeyValueHitsAddend(DESCRIPTOR_KEY_FOR_AI_SUBSCRIPTION, aiRLSubsValue, usage.getPrompt_tokens() - 1))); + configs.add(new RatelimitClient.KeyValueHitsAddend(DESCRIPTOR_KEY_FOR_SUBSCRIPTION_BASED_AI_RESPONSE_TOKEN_COUNT, orgAndAIRLPolicyValue, new RatelimitClient.KeyValueHitsAddend(DESCRIPTOR_KEY_FOR_AI_SUBSCRIPTION, aiRLSubsValue, usage.getCompletion_tokens() - 1))); + configs.add(new RatelimitClient.KeyValueHitsAddend(DESCRIPTOR_KEY_FOR_SUBSCRIPTION_BASED_AI_TOTAL_TOKEN_COUNT, orgAndAIRLPolicyValue, new RatelimitClient.KeyValueHitsAddend(DESCRIPTOR_KEY_FOR_AI_SUBSCRIPTION, aiRLSubsValue, usage.getTotal_tokens() - 1))); + } + } + } + ratelimitClient.shouldRatelimit(configs); + } + }); + responseObserver.onCompleted(); case RESPONSE_BODY: if (!request.getAttributesMap().isEmpty() && request.getAttributesMap().get("envoy.filters.http.ext_proc") != null && request.getAttributesMap().get("envoy.filters.http.ext_proc").getFieldsMap().get("xds.route_metadata") != null){ Value value = request.getAttributesMap().get("envoy.filters.http.ext_proc").getFieldsMap().get("xds.route_metadata"); FilterMetadata metadata = convertStringToFilterMetadata(value.getStringValue()); filterMetadata.backendBasedAIRatelimitDescriptorValue = metadata.backendBasedAIRatelimitDescriptorValue; filterMetadata.enableBackendBasedAIRatelimit = metadata.enableBackendBasedAIRatelimit; - filterMetadata.enableSubscriptionBasedAIRatelimit = metadata.enableSubscriptionBasedAIRatelimit; } - System.out.println("In the response flow metadata descirtor:" + filterMetadata.backendBasedAIRatelimitDescriptorValue); if (request.hasResponseBody()) { final byte[] bodyFromResponse = request.getResponseBody().getBody().toByteArray(); executorService.submit(() -> { - String body = null; + String body; try { body = decompress(bodyFromResponse); } catch (Exception e) { @@ -118,16 +159,14 @@ public void onNext(ProcessingRequest request) { configs.add(new RatelimitClient.KeyValueHitsAddend(DESCRIPTOR_KEY_FOR_AI_RESPONSE_TOKEN_COUNT, filterMetadata.backendBasedAIRatelimitDescriptorValue, usage.getCompletion_tokens() - 1)); configs.add(new RatelimitClient.KeyValueHitsAddend(DESCRIPTOR_KEY_FOR_AI_TOTAL_TOKEN_COUNT, filterMetadata.backendBasedAIRatelimitDescriptorValue, usage.getTotal_tokens() - 1)); } - if (filterMetadata.enableSubscriptionBasedAIRatelimit) { - if (request.hasMetadataContext()) { - if (filterMetadataFromAuthZ != null) { - if (filterMetadataFromAuthZ.getFieldsMap().get(DYNAMIC_METADATA_KEY_FOR_ORGANIZATION_AND_AIRL_POLICY) != null && filterMetadataFromAuthZ.getFieldsMap().get(DYNAMIC_METADATA_KEY_FOR_SUBSCRIPTION) != null) { - String orgAndAIRLPolicyValue = filterMetadataFromAuthZ.getFieldsMap().get(DYNAMIC_METADATA_KEY_FOR_ORGANIZATION_AND_AIRL_POLICY).getStringValue(); - String aiRLSubsValue = filterMetadataFromAuthZ.getFieldsMap().get(DYNAMIC_METADATA_KEY_FOR_SUBSCRIPTION).getStringValue(); - configs.add(new RatelimitClient.KeyValueHitsAddend(DESCRIPTOR_KEY_FOR_SUBSCRIPTION_BASED_AI_REQUEST_TOKEN_COUNT, orgAndAIRLPolicyValue, new RatelimitClient.KeyValueHitsAddend(DESCRIPTOR_KEY_FOR_AI_SUBSCRIPTION, aiRLSubsValue, usage.getPrompt_tokens() - 1))); - configs.add(new RatelimitClient.KeyValueHitsAddend(DESCRIPTOR_KEY_FOR_SUBSCRIPTION_BASED_AI_RESPONSE_TOKEN_COUNT, orgAndAIRLPolicyValue, new RatelimitClient.KeyValueHitsAddend(DESCRIPTOR_KEY_FOR_AI_SUBSCRIPTION, aiRLSubsValue, usage.getCompletion_tokens() - 1))); - configs.add(new RatelimitClient.KeyValueHitsAddend(DESCRIPTOR_KEY_FOR_SUBSCRIPTION_BASED_AI_TOTAL_TOKEN_COUNT, orgAndAIRLPolicyValue, new RatelimitClient.KeyValueHitsAddend(DESCRIPTOR_KEY_FOR_AI_SUBSCRIPTION, aiRLSubsValue, usage.getTotal_tokens() - 1))); - } + if (request.hasMetadataContext()) { + if (filterMetadataFromAuthZ != null) { + if (filterMetadataFromAuthZ.getFieldsMap().get(DYNAMIC_METADATA_KEY_FOR_ORGANIZATION_AND_AIRL_POLICY) != null && filterMetadataFromAuthZ.getFieldsMap().get(DYNAMIC_METADATA_KEY_FOR_SUBSCRIPTION) != null) { + String orgAndAIRLPolicyValue = filterMetadataFromAuthZ.getFieldsMap().get(DYNAMIC_METADATA_KEY_FOR_ORGANIZATION_AND_AIRL_POLICY).getStringValue(); + String aiRLSubsValue = filterMetadataFromAuthZ.getFieldsMap().get(DYNAMIC_METADATA_KEY_FOR_SUBSCRIPTION).getStringValue(); + configs.add(new RatelimitClient.KeyValueHitsAddend(DESCRIPTOR_KEY_FOR_SUBSCRIPTION_BASED_AI_REQUEST_TOKEN_COUNT, orgAndAIRLPolicyValue, new RatelimitClient.KeyValueHitsAddend(DESCRIPTOR_KEY_FOR_AI_SUBSCRIPTION, aiRLSubsValue, usage.getPrompt_tokens() - 1))); + configs.add(new RatelimitClient.KeyValueHitsAddend(DESCRIPTOR_KEY_FOR_SUBSCRIPTION_BASED_AI_RESPONSE_TOKEN_COUNT, orgAndAIRLPolicyValue, new RatelimitClient.KeyValueHitsAddend(DESCRIPTOR_KEY_FOR_AI_SUBSCRIPTION, aiRLSubsValue, usage.getCompletion_tokens() - 1))); + configs.add(new RatelimitClient.KeyValueHitsAddend(DESCRIPTOR_KEY_FOR_SUBSCRIPTION_BASED_AI_TOTAL_TOKEN_COUNT, orgAndAIRLPolicyValue, new RatelimitClient.KeyValueHitsAddend(DESCRIPTOR_KEY_FOR_AI_SUBSCRIPTION, aiRLSubsValue, usage.getTotal_tokens() - 1))); } } } @@ -136,7 +175,6 @@ public void onNext(ProcessingRequest request) { }); responseObserver.onCompleted(); } else { - System.out.println("Request does not have response body"); responseObserver.onCompleted(); } @@ -145,7 +183,7 @@ public void onNext(ProcessingRequest request) { @Override public void onError(Throwable err) { - System.out.println("on error ...."+ err.getLocalizedMessage() + " " + err.getMessage() + " " + err.toString()+ " ****"); + logger.error("Error initiated from envoy in the external processing session. Error: " + err); } @Override @@ -180,13 +218,11 @@ protected HeadersResponse prepareHeadersResponse() { // The FilterMetadata class as per your request private static class FilterMetadata { - boolean enableSubscriptionBasedAIRatelimit; boolean enableBackendBasedAIRatelimit; String backendBasedAIRatelimitDescriptorValue; @Override public String toString() { return "FilterMetadata{" + - "enableSubscriptionBasedAIRatelimit=" + enableSubscriptionBasedAIRatelimit + ", enableBackendBasedAIRatelimit=" + enableBackendBasedAIRatelimit + ", backendBasedAIRatelimitDescriptorValue='" + backendBasedAIRatelimitDescriptorValue + '\'' + '}'; @@ -199,12 +235,10 @@ public static FilterMetadata convertStringToFilterMetadata(String input) { // Regex patterns to extract specific fields String backendValuePattern = "key: \"BackendBasedAIRatelimitDescriptorValue\".*?string_value: \"(.*?)\""; String enableBackendPattern = "key: \"EnableBackendBasedAIRatelimit\".*?string_value: \"(.*?)\""; - String enableSubscriptionPattern = "key: \"EnableSubscriptionBasedAIRatelimit\".*?string_value: \"(.*?)\""; // Extract and assign to the FilterMetadata object metadata.backendBasedAIRatelimitDescriptorValue = extractValue(input, backendValuePattern); metadata.enableBackendBasedAIRatelimit = Boolean.parseBoolean(extractValue(input, enableBackendPattern)); - metadata.enableSubscriptionBasedAIRatelimit = Boolean.parseBoolean(extractValue(input, enableSubscriptionPattern)); return metadata; } @@ -224,6 +258,27 @@ public static String sanitize(String input) { return input.replaceAll("[\\t\\n\\r]+", " ").trim(); } + private static Usage extractUsageFromHeaders(HttpHeaders headers, String completionTokenPath, String promptTokenPath, String totalTokenPath) { + try { + Usage usage = new Usage(); + for (HeaderValue headerValue : headers.getHeaders().getHeadersList()) { + if (headerValue.getKey().equals(completionTokenPath)) { + usage.completion_tokens = Integer.parseInt(headerValue.getValue()); + } + if (headerValue.getKey().equals(promptTokenPath)) { + usage.prompt_tokens = Integer.parseInt(headerValue.getValue()); + } + if (headerValue.getKey().equals(totalTokenPath)) { + usage.total_tokens = Integer.parseInt(headerValue.getValue()); + } + } + return usage; + } catch (Exception e) { + logger.error("Error occured while getting yusage info from headers" + e); + return null; + } + } + private static Usage extractUsageFromBody(String body, String completionTokenPath, String promptTokenPath, String totalTokenPath) { body = sanitize(body); ObjectMapper mapper = new ObjectMapper(); @@ -284,7 +339,7 @@ private static Usage extractUsageFromBody(String body, String completionTokenPat return usage; } catch (Exception e) { - System.out.println(String.format("Unexpected error while extracting usage from the body: %s", body) + " \n" + e); + logger.error(String.format("Unexpected error while extracting usage from the body: %s", body) + " \n" + e); return null; } } diff --git a/gateway/enforcer/org.wso2.apk.enforcer/src/main/java/org/wso2/apk/enforcer/grpc/client/RatelimitClient.java b/gateway/enforcer/org.wso2.apk.enforcer/src/main/java/org/wso2/apk/enforcer/grpc/client/RatelimitClient.java index f62d293e1c..e0b341cd13 100644 --- a/gateway/enforcer/org.wso2.apk.enforcer/src/main/java/org/wso2/apk/enforcer/grpc/client/RatelimitClient.java +++ b/gateway/enforcer/org.wso2.apk.enforcer/src/main/java/org/wso2/apk/enforcer/grpc/client/RatelimitClient.java @@ -8,17 +8,20 @@ import io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts; import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder; import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.wso2.apk.enforcer.config.ConfigHolder; +import org.wso2.apk.enforcer.grpc.ExternalProcessorService; import java.io.File; import java.nio.file.Paths; import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import javax.net.ssl.SSLException; public class RatelimitClient { + private static final Logger logger = LogManager.getLogger(RatelimitClient.class); RateLimitServiceGrpc.RateLimitServiceBlockingStub stub; + public RatelimitClient(){ File certFile = Paths.get(ConfigHolder.getInstance().getEnvVarConfig().getEnforcerPublicKeyPath()).toFile(); File keyFile = Paths.get(ConfigHolder.getInstance().getEnvVarConfig().getEnforcerPrivateKeyPath()).toFile(); @@ -30,7 +33,7 @@ public RatelimitClient(){ .keyManager(certFile, keyFile) .build(); } catch (SSLException e) { - System.out.println("Error while generating SSL Context."+ e); + logger.error("Error while generating SSL Context."+ e); } String rlHost = ConfigHolder.getInstance().getEnvVarConfig().getRatelimiterHost(); int port = ConfigHolder.getInstance().getEnvVarConfig().getRatelimiterPort(); @@ -59,7 +62,6 @@ public void shouldRatelimit(List configs) { .setHitsAddend(hitsAddend) .build(); RateLimitResponse rateLimitResponse = stub.shouldRateLimit(rateLimitRequest); - System.out.println(rateLimitResponse.getOverallCode()); } } diff --git a/gateway/enforcer/org.wso2.apk.enforcer/src/main/java/org/wso2/apk/enforcer/security/KeyValidator.java b/gateway/enforcer/org.wso2.apk.enforcer/src/main/java/org/wso2/apk/enforcer/security/KeyValidator.java index daf2aa091a..c6bfc3fdae 100644 --- a/gateway/enforcer/org.wso2.apk.enforcer/src/main/java/org/wso2/apk/enforcer/security/KeyValidator.java +++ b/gateway/enforcer/org.wso2.apk.enforcer/src/main/java/org/wso2/apk/enforcer/security/KeyValidator.java @@ -263,8 +263,6 @@ public static APIKeyValidationInfoDTO validateSubscription(String apiUuid, Strin return infoDTO; } - - private static void validate(APIKeyValidationInfoDTO infoDTO, Application app, Subscription sub) { // Validate subscription status diff --git a/gateway/enforcer/org.wso2.apk.enforcer/src/main/java/org/wso2/apk/enforcer/security/jwt/APIKeyAuthenticator.java b/gateway/enforcer/org.wso2.apk.enforcer/src/main/java/org/wso2/apk/enforcer/security/jwt/APIKeyAuthenticator.java index 0eb2a90ef1..eba64e2c25 100644 --- a/gateway/enforcer/org.wso2.apk.enforcer/src/main/java/org/wso2/apk/enforcer/security/jwt/APIKeyAuthenticator.java +++ b/gateway/enforcer/org.wso2.apk.enforcer/src/main/java/org/wso2/apk/enforcer/security/jwt/APIKeyAuthenticator.java @@ -222,8 +222,8 @@ private AuthenticationContext processAPIKey(RequestContext requestContext, Strin .getMandateSubscriptionValidation(); if (!requestContext.getMatchedAPI().isSystemAPI() && (isGatewayLevelSubscriptionValidationEnabled || requestContext.getMatchedAPI() .isSubscriptionValidation())) { - validationInfoDto = KeyValidator.validateSubscription(apiUuid, apiContext, - requestContext.getMatchedAPI(), payload); +// validationInfoDto = KeyValidator.validateSubscription(apiUuid, apiContext, +// requestContext.getMatchedAPI(), payload); log.debug("Validating subscription for API Key using JWT claims against invoked API info." + " context: {} version: {}", apiContext, apiVersion); validationInfoDto = getAPIKeyValidationDTO(requestContext, payload); diff --git a/gateway/enforcer/org.wso2.apk.enforcer/src/main/java/org/wso2/apk/enforcer/server/AuthServer.java b/gateway/enforcer/org.wso2.apk.enforcer/src/main/java/org/wso2/apk/enforcer/server/AuthServer.java index d849dc648a..38ac92af25 100644 --- a/gateway/enforcer/org.wso2.apk.enforcer/src/main/java/org/wso2/apk/enforcer/server/AuthServer.java +++ b/gateway/enforcer/org.wso2.apk.enforcer/src/main/java/org/wso2/apk/enforcer/server/AuthServer.java @@ -159,7 +159,6 @@ private static Server initServer() throws SSLException { EnforcerWorkerPool enforcerWorkerPool = new EnforcerWorkerPool(threadPoolConfig.getCoreSize(), threadPoolConfig.getMaxSize(), threadPoolConfig.getKeepAliveTime(), threadPoolConfig.getQueueSize(), Constants.EXTERNAL_AUTHZ_THREAD_GROUP, Constants.EXTERNAL_AUTHZ_THREAD_ID); - System.out.println("test"); return NettyServerBuilder.forPort(authServerConfig.getPort()) .keepAliveTime(authServerConfig.getKeepAliveTime(), TimeUnit.SECONDS).bossEventLoopGroup(bossGroup) .workerEventLoopGroup(workerGroup) diff --git a/gateway/enforcer/org.wso2.apk.enforcer/src/main/java/org/wso2/apk/enforcer/subscription/SubscriptionDto.java b/gateway/enforcer/org.wso2.apk.enforcer/src/main/java/org/wso2/apk/enforcer/subscription/SubscriptionDto.java index b8a31efaad..42cb28a935 100644 --- a/gateway/enforcer/org.wso2.apk.enforcer/src/main/java/org/wso2/apk/enforcer/subscription/SubscriptionDto.java +++ b/gateway/enforcer/org.wso2.apk.enforcer/src/main/java/org/wso2/apk/enforcer/subscription/SubscriptionDto.java @@ -63,4 +63,4 @@ public void setRatelimitTier(String ratelimitTier) { this.ratelimitTier = ratelimitTier; } -} \ No newline at end of file +} diff --git a/helm-charts/templates/data-plane/gateway-components/gateway-runtime/gateway-runtime-deployment.yaml b/helm-charts/templates/data-plane/gateway-components/gateway-runtime/gateway-runtime-deployment.yaml index bf6217a4ff..50d9bc91f8 100644 --- a/helm-charts/templates/data-plane/gateway-components/gateway-runtime/gateway-runtime-deployment.yaml +++ b/helm-charts/templates/data-plane/gateway-components/gateway-runtime/gateway-runtime-deployment.yaml @@ -100,9 +100,9 @@ spec: value: admin - name: JAVA_OPTS {{- if and .Values.wso2.apk.metrics .Values.wso2.apk.metrics.enabled }} - value: -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:5006 -Dhttpclient.hostnameVerifier=AllowAll -Xms512m -Xmx512m -XX:MaxRAMFraction=2 -Dapk.jmx.metrics.enabled=true -javaagent:/home/wso2/lib/jmx_prometheus_javaagent-0.20.0.jar=18006:/tmp/metrics/prometheus-jmx-config-enforcer.yml + value: -Dhttpclient.hostnameVerifier=AllowAll -Xms512m -Xmx512m -XX:MaxRAMFraction=2 -Dapk.jmx.metrics.enabled=true -javaagent:/home/wso2/lib/jmx_prometheus_javaagent-0.20.0.jar=18006:/tmp/metrics/prometheus-jmx-config-enforcer.yml {{- else }} - value: -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:5006 -Dhttpclient.hostnameVerifier=AllowAll -Xms512m -Xmx512m -XX:MaxRAMFraction=2 + value: -Dhttpclient.hostnameVerifier=AllowAll -Xms512m -Xmx512m -XX:MaxRAMFraction=2 {{- end }} {{- if and .Values.wso2.apk.dp.gatewayRuntime.analytics .Values.wso2.apk.dp.gatewayRuntime.analytics.publishers }} {{- $defaultPublisherSecretName := "" }}