Skip to content

Commit

Permalink
Merge pull request #2438 from sgayangi/2416-sgayang-grpc
Browse files Browse the repository at this point in the history
Add gRPC API support
  • Loading branch information
CrowleyRajapakse authored Sep 20, 2024
2 parents 14a8b44 + 18576c5 commit 95c0ffb
Show file tree
Hide file tree
Showing 158 changed files with 11,417 additions and 2,413 deletions.
1 change: 1 addition & 0 deletions adapter/internal/oasparser/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ const (
SOAP string = "SOAP"
WS string = "WS"
GRAPHQL string = "GraphQL"
GRPC string = "GRPC"
WEBHOOK string = "WEBHOOK"
SSE string = "SSE"
Prototyped string = "prototyped"
Expand Down
32 changes: 28 additions & 4 deletions adapter/internal/oasparser/envoyconf/http_filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
cors_filter_v3 "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/cors/v3"
ext_authv3 "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/ext_authz/v3"
ext_process "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/ext_proc/v3"
grpc_stats_filter_v3 "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/grpc_stats/v3"
luav3 "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/lua/v3"
ratelimit "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/ratelimit/v3"
routerv3 "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/router/v3"
Expand All @@ -48,9 +49,9 @@ import (
"github.com/golang/protobuf/ptypes/any"
)


// HTTPExternalProcessor HTTP filter
const HTTPExternalProcessor = "envoy.filters.http.ext_proc"

// RatelimitFilterName Ratelimit filter name
const RatelimitFilterName = "envoy.filters.http.ratelimit"

Expand Down Expand Up @@ -114,6 +115,27 @@ func getRouterHTTPFilter() *hcmv3.HttpFilter {
return &filter
}

// getGRPCStatsHTTPFilter gets grpc_stats http filter.
func getGRPCStatsHTTPFilter() *hcmv3.HttpFilter {

gprcStatsFilterConf := grpc_stats_filter_v3.FilterConfig{
EnableUpstreamStats: true,
EmitFilterState: true,
}
gprcStatsFilterTypedConf, err := anypb.New(&gprcStatsFilterConf)

if err != nil {
logger.LoggerOasparser.Error("Error marshaling grpc stats filter configs. ", err)
}

filter := hcmv3.HttpFilter{
Name: "grpc_stats",
ConfigType: &hcmv3.HttpFilter_TypedConfig{TypedConfig: gprcStatsFilterTypedConf},
}

return &filter
}

// getCorsHTTPFilter gets cors http filter.
func getCorsHTTPFilter() *hcmv3.HttpFilter {

Expand All @@ -136,11 +158,13 @@ func getCorsHTTPFilter() *hcmv3.HttpFilter {
func getUpgradeFilters() []*hcmv3.HttpFilter {

cors := getCorsHTTPFilter()
grpcStats := getGRPCStatsHTTPFilter()
extAauth := getExtAuthzHTTPFilter()
apkWebSocketWASM := getAPKWebSocketWASMFilter()
router := getRouterHTTPFilter()
upgradeFilters := []*hcmv3.HttpFilter{
cors,
grpcStats,
extAauth,
apkWebSocketWASM,
router,
Expand Down Expand Up @@ -215,16 +239,16 @@ func getExtProcessHTTPFilter() *hcmv3.HttpFilter {
},
},
ProcessingMode: &ext_process.ProcessingMode{
ResponseBodyMode: ext_process.ProcessingMode_BUFFERED,
RequestHeaderMode: ext_process.ProcessingMode_SKIP,
ResponseBodyMode: ext_process.ProcessingMode_BUFFERED,
RequestHeaderMode: ext_process.ProcessingMode_SKIP,
ResponseHeaderMode: ext_process.ProcessingMode_SKIP,
},
MetadataOptions: &ext_process.MetadataOptions{
ForwardingNamespaces: &ext_process.MetadataOptions_MetadataNamespaces{
Untyped: []string{"envoy.filters.http.ext_authz", "envoy.filters.http.ext_proc"},
},
},
RequestAttributes: []string{"xds.route_metadata"},
RequestAttributes: []string{"xds.route_metadata"},
ResponseAttributes: []string{"xds.route_metadata"},
}
ext, err2 := anypb.New(externalProcessor)
Expand Down
4 changes: 2 additions & 2 deletions adapter/internal/oasparser/envoyconf/internal_dtos.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package envoyconf

import (
"github.com/wso2/apk/adapter/internal/oasparser/model"
dpv1alpha2 "github.com/wso2/apk/common-go-libs/apis/dp/v1alpha2"
"github.com/wso2/apk/common-go-libs/apis/dp/v1alpha3"
)

// routeCreateParams is the DTO used to provide information to the envoy route create function
Expand All @@ -41,7 +41,7 @@ type routeCreateParams struct {
isDefaultVersion bool
createDefaultPath bool
apiLevelRateLimitPolicy *model.RateLimitPolicy
apiProperties []dpv1alpha2.Property
apiProperties []v1alpha3.Property
environment string
envType string
mirrorClusterNames map[string][]string
Expand Down
126 changes: 112 additions & 14 deletions adapter/internal/oasparser/envoyconf/routes_with_clusters.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ import (
cors_filter_v3 "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/cors/v3"
extAuthService "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/ext_authz/v3"
extProcessorv3 "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/ext_proc/v3"
ratelimitv3 "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/ratelimit/v3"
lua "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/lua/v3"
ratelimitv3 "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/ratelimit/v3"
tlsv3 "github.com/envoyproxy/go-control-plane/envoy/extensions/transport_sockets/tls/v3"
upstreams "github.com/envoyproxy/go-control-plane/envoy/extensions/upstreams/http/v3"
envoy_type_matcherv3 "github.com/envoyproxy/go-control-plane/envoy/type/matcher/v3"
Expand All @@ -54,7 +54,7 @@ import (
logging "github.com/wso2/apk/adapter/internal/logging"
"github.com/wso2/apk/adapter/internal/oasparser/constants"
"github.com/wso2/apk/adapter/internal/oasparser/model"
dpv1alpha2 "github.com/wso2/apk/common-go-libs/apis/dp/v1alpha2"
"github.com/wso2/apk/common-go-libs/apis/dp/v1alpha3"
"google.golang.org/protobuf/proto"
gwapiv1 "sigs.k8s.io/gateway-api/apis/v1"
)
Expand Down Expand Up @@ -92,6 +92,7 @@ const (
// to the api level clusters.
func CreateRoutesWithClusters(adapterInternalAPI *model.AdapterInternalAPI, interceptorCerts map[string][]byte, vHost string, organizationID string) (routesP []*routev3.Route,
clustersP []*clusterv3.Cluster, addressesP []*corev3.Address, err error) {

var (
routes []*routev3.Route
clusters []*clusterv3.Cluster
Expand Down Expand Up @@ -199,6 +200,77 @@ func CreateRoutesWithClusters(adapterInternalAPI *model.AdapterInternalAPI, inte
}
return routes, clusters, endpoints, nil
}

if adapterInternalAPI.GetAPIType() == constants.GRPC {
basePath := strings.TrimSuffix(adapterInternalAPI.Endpoints.Endpoints[0].Basepath, "/")

clusterName := getClusterName(adapterInternalAPI.Endpoints.EndpointPrefix, organizationID, vHost,
adapterInternalAPI.GetTitle(), apiVersion, "")
adapterInternalAPI.Endpoints.HTTP2BackendEnabled = true
cluster, address, err := processEndpoints(clusterName, adapterInternalAPI.Endpoints, timeout, basePath)
if err != nil {
logger.LoggerOasparser.ErrorC(logging.PrintError(logging.Error2239, logging.MAJOR,
"Error while adding grpc endpoints for %s:%v. %v", apiTitle, apiVersion, err.Error()))
return nil, nil, nil, fmt.Errorf("error while adding grpc endpoints for %s:%v. %v", apiTitle, apiVersion,
err.Error())
}
clusters = append(clusters, cluster)
endpoints = append(endpoints, address...)

for _, resource := range adapterInternalAPI.GetResources() {
var clusterName string
resourcePath := resource.GetPath()
endpoint := resource.GetEndpoints()
endpoint.HTTP2BackendEnabled = true
basePath := strings.TrimSuffix(endpoint.Endpoints[0].Basepath, "/")
existingClusterName := getExistingClusterName(*endpoint, processedEndpoints)

if existingClusterName == "" {
clusterName = getClusterName(endpoint.EndpointPrefix, organizationID, vHost, adapterInternalAPI.GetTitle(), apiVersion, resource.GetID())
cluster, address, err := processEndpoints(clusterName, endpoint, timeout, basePath)
if err != nil {
logger.LoggerOasparser.ErrorC(logging.PrintError(logging.Error2239, logging.MAJOR, "Error while adding resource level endpoints for %s:%v-%v. %v", apiTitle, apiVersion, resourcePath, err.Error()))
} else {
clusters = append(clusters, cluster)
endpoints = append(endpoints, address...)
processedEndpoints[clusterName] = *endpoint
}
} else {
clusterName = existingClusterName
}

// Create resource level interceptor clusters if required
clustersI, endpointsI, operationalReqInterceptors, operationalRespInterceptorVal := createInterceptorResourceClusters(adapterInternalAPI,
interceptorCerts, vHost, organizationID, apiRequestInterceptor, apiResponseInterceptor, resource)
clusters = append(clusters, clustersI...)
endpoints = append(endpoints, endpointsI...)
routeParams := genRouteCreateParams(adapterInternalAPI, resource, vHost, basePath, clusterName, *operationalReqInterceptors, *operationalRespInterceptorVal, organizationID,
false, false, nil)

routeP, err := createRoutes(routeParams)
if err != nil {
logger.LoggerXds.ErrorC(logging.PrintError(logging.Error2231, logging.MAJOR,
"Error while creating routes for GRPC API %s %s for path: %s Error: %s", adapterInternalAPI.GetTitle(),
adapterInternalAPI.GetVersion(), resource.GetPath(), err.Error()))
return nil, nil, nil, fmt.Errorf("error while creating routes. %v", err)
}
routes = append(routes, routeP...)
if adapterInternalAPI.IsDefaultVersion {
defaultRoutes, errDefaultPath := createRoutes(genRouteCreateParams(adapterInternalAPI, resource, vHost, basePath, clusterName,
*operationalReqInterceptors, *operationalRespInterceptorVal, organizationID,
false, true, nil))
if errDefaultPath != nil {
logger.LoggerXds.ErrorC(logging.PrintError(logging.Error2231, logging.MAJOR, "Error while creating routes for GRPC API %s %s for path: %s Error: %s",
adapterInternalAPI.GetTitle(), adapterInternalAPI.GetVersion(), removeFirstOccurrence(resource.GetPath(), adapterInternalAPI.GetVersion()), errDefaultPath.Error()))
return nil, nil, nil, fmt.Errorf("error while creating routes. %v", errDefaultPath)
}
routes = append(routes, defaultRoutes...)
}
}

return routes, clusters, endpoints, nil
}

for _, resource := range adapterInternalAPI.GetResources() {
var clusterName string
mirrorClusterNames := map[string][]string{}
Expand Down Expand Up @@ -884,9 +956,9 @@ func createRoutes(params *routeCreateParams) (routes []*routev3.Route, err error
Override: &extProcessorv3.ExtProcPerRoute_Overrides{
Overrides: &extProcessorv3.ExtProcOverrides{
ProcessingMode: &extProcessorv3.ProcessingMode{
RequestHeaderMode: extProcessorv3.ProcessingMode_SKIP,
RequestHeaderMode: extProcessorv3.ProcessingMode_SKIP,
ResponseHeaderMode: extProcessorv3.ProcessingMode_SEND,
ResponseBodyMode: extProcessorv3.ProcessingMode_NONE,
ResponseBodyMode: extProcessorv3.ProcessingMode_NONE,
},
},
},
Expand Down Expand Up @@ -940,8 +1012,14 @@ func createRoutes(params *routeCreateParams) (routes []*routev3.Route, err error
decorator *routev3.Decorator
)
if params.createDefaultPath {
xWso2Basepath = removeFirstOccurrence(xWso2Basepath, "/"+version)
resourcePath = removeFirstOccurrence(resource.GetPath(), "/"+version)
// check if basepath is separated from version by a . or /
if strings.Contains(basePath, "."+version) {
xWso2Basepath = removeFirstOccurrence(basePath, "."+version)
resourcePath = removeFirstOccurrence(resource.GetPath(), "."+version)
} else {
xWso2Basepath = removeFirstOccurrence(xWso2Basepath, "/"+version)
resourcePath = removeFirstOccurrence(resource.GetPath(), "/"+version)
}
}

if pathMatchType != gwapiv1.PathMatchExact {
Expand Down Expand Up @@ -1099,6 +1177,11 @@ func createRoutes(params *routeCreateParams) (routes []*routev3.Route, err error
}
}

var mirrorClusterNameList []string
if mirrorClusterNames != nil && mirrorClusterNames[operation.GetID()] != nil {
mirrorClusterNameList = mirrorClusterNames[operation.GetID()]
}

// TODO: (suksw) preserve header key case?
if hasMethodRewritePolicy {
logger.LoggerOasparser.Debugf("Creating two routes to support method rewrite for %s %s. New method: %s",
Expand All @@ -1116,10 +1199,10 @@ func createRoutes(params *routeCreateParams) (routes []*routev3.Route, err error
metadataValue := operation.GetMethod() + "_to_" + newMethod
match2.DynamicMetadata = generateMetadataMatcherForInternalRoutes(metadataValue)

action1 := generateRouteAction(apiType, routeConfig, rateLimitPolicyCriteria, mirrorClusterNames[operation.GetID()], resource.GetEnableBackendBasedAIRatelimit() && params.isAiAPI, resource.GetBackendBasedAIRatelimitDescriptorValue())
action2 := generateRouteAction(apiType, routeConfig, rateLimitPolicyCriteria, mirrorClusterNames[operation.GetID()], resource.GetEnableBackendBasedAIRatelimit() && params.isAiAPI, resource.GetBackendBasedAIRatelimitDescriptorValue())
action1 := generateRouteAction(apiType, routeConfig, rateLimitPolicyCriteria, mirrorClusterNameList, resource.GetEnableBackendBasedAIRatelimit() && params.isAiAPI, resource.GetBackendBasedAIRatelimitDescriptorValue())
action2 := generateRouteAction(apiType, routeConfig, rateLimitPolicyCriteria, mirrorClusterNameList, resource.GetEnableBackendBasedAIRatelimit() && params.isAiAPI, resource.GetBackendBasedAIRatelimitDescriptorValue())

requestHeadersToRemove := make([]string,0)
requestHeadersToRemove := make([]string, 0)
// Create route1 for current method.
// Do not add policies to route config. Send via enforcer
route1 := generateRouteConfig(xWso2Basepath+operation.GetMethod(), match1, action1, requestRedirectAction, metaData, decorator, perRouteFilterConfigs,
Expand All @@ -1141,7 +1224,7 @@ func createRoutes(params *routeCreateParams) (routes []*routev3.Route, err error
} else {
var action *routev3.Route_Route
if requestRedirectAction == nil {
action = generateRouteAction(apiType, routeConfig, rateLimitPolicyCriteria, mirrorClusterNames[operation.GetID()], resource.GetEnableBackendBasedAIRatelimit() && params.isAiAPI, resource.GetBackendBasedAIRatelimitDescriptorValue())
action = generateRouteAction(apiType, routeConfig, rateLimitPolicyCriteria, mirrorClusterNameList, resource.GetEnableBackendBasedAIRatelimit() && params.isAiAPI, resource.GetBackendBasedAIRatelimitDescriptorValue())
}
logger.LoggerOasparser.Debug("Creating routes for resource with policies", resourcePath, operation.GetMethod())
// create route for current method. Add policies to route config. Send via enforcer
Expand Down Expand Up @@ -1170,7 +1253,17 @@ func createRoutes(params *routeCreateParams) (routes []*routev3.Route, err error
action := generateRouteAction(apiType, routeConfig, rateLimitPolicyCriteria, nil, resource.GetEnableBackendBasedAIRatelimit() && params.isAiAPI, resource.GetBackendBasedAIRatelimitDescriptorValue())
rewritePath := generateRoutePathForReWrite(basePath, resourcePath, pathMatchType)
action.Route.RegexRewrite = generateRegexMatchAndSubstitute(rewritePath, resourcePath, pathMatchType)
requestHeadersToRemove := make([]string,0)
requestHeadersToRemove := make([]string, 0)

if apiType == constants.GRPC {
match.Headers = nil
newRoutePath := "/" + strings.TrimPrefix(resourcePath, basePath+".")
if newRoutePath == "/"+resourcePath {
temp := removeFirstOccurrence(basePath, "."+version)
newRoutePath = "/" + strings.TrimPrefix(resourcePath, temp+".")
}
action.Route.RegexRewrite = generateRegexMatchAndSubstitute(rewritePath, newRoutePath, pathMatchType)
}
route := generateRouteConfig(xWso2Basepath, match, action, nil, metaData, decorator, perRouteFilterConfigs,
nil, requestHeadersToRemove, nil, nil) // general headers to add and remove are included in this methods
routes = append(routes, route)
Expand Down Expand Up @@ -1335,8 +1428,13 @@ func CreateAPIDefinitionEndpoint(adapterInternalAPI *model.AdapterInternalAPI, v

matchPath := basePath + endpoint
if isDefaultversion {
basePathWithoutVersion := removeLastOccurrence(basePath, "/"+version)
matchPath = basePathWithoutVersion + endpoint
if adapterInternalAPI.GetAPIType() == constants.GRPC {
basePathWithoutVersion := removeLastOccurrence(basePath, "."+version)
matchPath = basePathWithoutVersion + "/" + vHost + endpoint
} else {
basePathWithoutVersion := removeLastOccurrence(basePath, "/"+version)
matchPath = basePathWithoutVersion + endpoint
}
}

matchPath = strings.Replace(matchPath, basePath, regexp.QuoteMeta(basePath), 1)
Expand Down Expand Up @@ -1632,7 +1730,7 @@ func getUpgradeConfig(apiType string) []*routev3.RouteAction_UpgradeConfig {
return upgradeConfig
}

func getAPIProperties(apiPropertiesConfig []dpv1alpha2.Property) string {
func getAPIProperties(apiPropertiesConfig []v1alpha3.Property) string {
var apiProperties = make(map[string]string)
for _, val := range apiPropertiesConfig {
apiProperties[val.Name] = val.Value
Expand Down
Loading

0 comments on commit 95c0ffb

Please sign in to comment.