Skip to content

Commit

Permalink
Improved handling blocked APIs by not invoking /apis endpoint unless …
Browse files Browse the repository at this point in the history
…it is the startup. The JMS/ASB event's details are used to populate the necessary maps(api-metadata)

APIList object is not passed to the enforcer now, instead of that EnforcerAPI's lifecycle state is used.
Any API within the api metadata map should be kept only if the API is a default API or API is in blocked state. If the API becomes unblocked, it is removed from the map.
During the startup also only blocked and default versioned APIs are pulled.
  • Loading branch information
VirajSalaka committed Jul 4, 2022
1 parent 932b1a9 commit aa16e1f
Show file tree
Hide file tree
Showing 28 changed files with 735 additions and 704 deletions.
112 changes: 43 additions & 69 deletions adapter/internal/discovery/xds/marshaller.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package xds
import (
"encoding/json"
"fmt"
"strconv"

"github.com/wso2/product-microgateway/adapter/config"
logger "github.com/wso2/product-microgateway/adapter/internal/loggers"
Expand All @@ -14,8 +13,8 @@ import (
)

var (
// APIListMap has the following mapping label -> apiUUID -> API (Metadata)
APIListMap map[string]map[string]*subscription.APIs
// APIMetadataMap has the following mapping apiUUID -> API (Metadata)
APIMetadataMap map[string]*subscription.APIs
// SubscriptionMap contains the subscriptions recieved from API Manager Control Plane
SubscriptionMap map[int32]*subscription.Subscription
// ApplicationMap contains the applications recieved from API Manager Control Plane
Expand All @@ -40,8 +39,6 @@ const (
DeleteEvent
)

const blockedStatus string = "BLOCKED"

// MarshalConfig will marshal a Config struct - read from the config toml - to
// enfocer's CDS resource representation.
func MarshalConfig(config *config.Config) *enforcer.Config {
Expand Down Expand Up @@ -480,20 +477,15 @@ func MarshalSubscriptionPolicyEventAndReturnList(policy *types.SubscriptionPolic
return marshalSubscriptionPolicyMapToList(SubscriptionPolicyMap)
}

// MarshalAPIMetataAndReturnList updates the internal APIListMap and returns the XDS compatible APIList.
// apiList is the internal APIList object (For single API, this would contain a List with just one API)
// UpdateAPIMetataMapWithMultipleAPIs updates the internal APIMetadataMap and it needs to be called during the startup.
// The purpose here is to store default versioned APIs and blocked APIs.
// initialAPIUUIDListMap is assigned during startup when global adapter is associated. This would be empty otherwise.
// gatewayLabel is the environment.
func MarshalAPIMetataAndReturnList(apiList *types.APIList, initialAPIUUIDListMap map[string]int, gatewayLabel string) *subscription.APIList {
func UpdateAPIMetataMapWithMultipleAPIs(apiList *types.APIList, initialAPIUUIDListMap map[string]int) {

if APIListMap == nil {
APIListMap = make(map[string]map[string]*subscription.APIs)
}
// var resourceMapForLabel map[string]*subscription.APIs
if _, ok := APIListMap[gatewayLabel]; !ok {
APIListMap[gatewayLabel] = make(map[string]*subscription.APIs)
if APIMetadataMap == nil {
APIMetadataMap = make(map[string]*subscription.APIs)
}
resourceMapForLabel := APIListMap[gatewayLabel]

for _, api := range apiList.List {
// initialAPIUUIDListMap is not null if the adapter is running with global adapter enabled, and it is
// the first method invocation.
Expand All @@ -503,44 +495,44 @@ func MarshalAPIMetataAndReturnList(apiList *types.APIList, initialAPIUUIDListMap
}
}
newAPI := marshalAPIMetadata(&api)
resourceMapForLabel[api.UUID] = newAPI
}
return marshalAPIListMapToList(resourceMapForLabel)
}

// DeleteAPIAndReturnList removes the API from internal maps and returns the marshalled API List.
// If the apiUUID is not found in the internal map under the provided environment, then it would return a
// nil value. Hence it is required to check if the return value is nil, prior to updating the XDS cache.
func DeleteAPIAndReturnList(apiUUID, organizationUUID string, gatewayLabel string) *subscription.APIList {
if _, ok := APIListMap[gatewayLabel]; !ok {
logger.LoggerXds.Debugf("No API Metadata is available under gateway Environment : %s", gatewayLabel)
return nil
APIMetadataMap[api.UUID] = newAPI
}
delete(APIListMap[gatewayLabel], apiUUID)
return marshalAPIListMapToList(APIListMap[gatewayLabel])
}

// MarshalAPIForLifeCycleChangeEventAndReturnList updates the internal map's API instances lifecycle state only if
// stored API Instance's or input status event is a blocked event.
// If no change is applied, it would return nil. Hence the XDS cache should not be updated.
func MarshalAPIForLifeCycleChangeEventAndReturnList(apiUUID, status, gatewayLabel string) *subscription.APIList {
if _, ok := APIListMap[gatewayLabel]; !ok {
logger.LoggerXds.Debugf("No API Metadata is available under gateway Environment : %s", gatewayLabel)
return nil
}
if _, ok := APIListMap[gatewayLabel][apiUUID]; !ok {
logger.LoggerXds.Debugf("No API Metadata for API ID: %s is available under gateway Environment : %s",
apiUUID, gatewayLabel)
return nil
}
storedAPILCState := APIListMap[gatewayLabel][apiUUID].LcState

// Because the adapter only required to update the XDS if it is related to blocked state.
if !(storedAPILCState == blockedStatus || status == blockedStatus) {
return nil
}
APIListMap[gatewayLabel][apiUUID].LcState = status
return marshalAPIListMapToList(APIListMap[gatewayLabel])
// UpdateAPIMetataMapWithAPILCEvent updates the internal map's API instances lifecycle state only if
// stored API Instance's or input status event is a blocked event. If the API's state is changed to un-BLOCKED state,
// the record will be removed.
func UpdateAPIMetataMapWithAPILCEvent(apiUUID, status string) {

apiEntry, apiFound := APIMetadataMap[apiUUID]
if !apiFound {
// IF API is not stored within the metadata Map and the lifecycle state is something else other BLOCKED state, those are discarded.
if status != blockedStatus {
logger.LoggerXds.Debugf("API life cycle state change event is discarded for the API : %s", apiUUID)
return
}
// IF API is not available and state is BLOCKED, needs to create a new instance and store within the Map.
logger.LoggerXds.Debugf("No API Metadata for API ID: %s is available. Hence a new record is added.", apiUUID)
APIMetadataMap[apiUUID] = &subscription.APIs{
Uuid: apiUUID,
LcState: status,
}
logger.LoggerXds.Infof("API life cycle state change event with state %q is updated for the API : %s",
status, apiUUID)
return
}
// If the API is available in the metadata map it should be either a BLOCKED API and/or default versioned API.
// If the update for existing API entry is not "BLOCKED" then the API can be removed from the list.
// when the API is unavailable in the api metadata list received in the enforcer, it would be treated as
// an unblocked API.
// But if the API is not the default version, those records won't be stored.
if status != blockedStatus && !apiEntry.IsDefaultVersion {
delete(APIMetadataMap, apiUUID)
return
}
APIMetadataMap[apiUUID].LcState = status
logger.LoggerXds.Infof("API life cycle state change event with state %q is updated for the API : %s",
status, apiUUID)
}

func marshalSubscription(subscriptionInternal *types.Subscription) *subscription.Subscription {
Expand Down Expand Up @@ -599,13 +591,6 @@ func marshalKeyMapping(keyMappingInternal *types.ApplicationKeyMapping) *subscri

func marshalAPIMetadata(api *types.API) *subscription.APIs {
return &subscription.APIs{
ApiId: strconv.Itoa(api.APIID),
Name: api.Name,
Provider: api.Provider,
Version: api.Version,
Context: api.Context,
Policy: api.Policy,
ApiType: api.APIType,
Uuid: api.UUID,
IsDefaultVersion: api.IsDefaultVersion,
LcState: api.APIStatus,
Expand Down Expand Up @@ -642,14 +627,3 @@ func marshalSubscriptionPolicy(policy *types.SubscriptionPolicy) *subscription.S
func GetApplicationKeyMappingReference(keyMapping *types.ApplicationKeyMapping) string {
return keyMapping.ConsumerKey + ":" + keyMapping.KeyManager
}

// CheckIfAPIMetadataIsAlreadyAvailable returns true only if the API Metadata for the given API UUID
// is already available
func CheckIfAPIMetadataIsAlreadyAvailable(apiUUID, label string) bool {
if _, labelAvailable := APIListMap[label]; labelAvailable {
if _, apiAvailale := APIListMap[label][apiUUID]; apiAvailale {
return true
}
}
return false
}
89 changes: 80 additions & 9 deletions adapter/internal/discovery/xds/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
"github.com/wso2/product-microgateway/adapter/internal/oasparser/model"
mgw "github.com/wso2/product-microgateway/adapter/internal/oasparser/model"
"github.com/wso2/product-microgateway/adapter/internal/svcdiscovery"
"github.com/wso2/product-microgateway/adapter/pkg/discovery/api/wso2/discovery/api"
subscription "github.com/wso2/product-microgateway/adapter/pkg/discovery/api/wso2/discovery/subscription"
throttle "github.com/wso2/product-microgateway/adapter/pkg/discovery/api/wso2/discovery/throttle"
wso2_cache "github.com/wso2/product-microgateway/adapter/pkg/discovery/protocol/cache/v3"
Expand Down Expand Up @@ -83,7 +84,7 @@ var (
orgIDOpenAPIRoutesMap map[string]map[string][]*routev3.Route // organizationID -> Vhost:API_UUID -> Envoy Routes map
orgIDOpenAPIClustersMap map[string]map[string][]*clusterv3.Cluster // organizationID -> Vhost:API_UUID -> Envoy Clusters map
orgIDOpenAPIEndpointsMap map[string]map[string][]*corev3.Address // organizationID -> Vhost:API_UUID -> Envoy Endpoints map
orgIDOpenAPIEnforcerApisMap map[string]map[string]types.Resource // organizationID -> Vhost:API_UUID -> API Resource map
orgIDOpenAPIEnforcerApisMap map[string]map[string]*api.Api // organizationID -> Vhost:API_UUID -> API Resource map
orgIDvHostBasepathMap map[string]map[string]string // organizationID -> Vhost:basepath -> Vhost:API_UUID

reverseAPINameVersionMap map[string]string
Expand Down Expand Up @@ -119,6 +120,8 @@ const (
maxRandomInt int = 999999999
prototypedAPI string = "PROTOTYPED"
apiKeyFieldSeparator string = ":"
blockedStatus string = "BLOCKED"
nonBlockedStatus string = "CREATED/PUBLISHED"
)

// IDHash uses ID field as the node hash.
Expand Down Expand Up @@ -161,7 +164,7 @@ func init() {
orgIDOpenAPIRoutesMap = make(map[string]map[string][]*routev3.Route) // organizationID -> Vhost:API_UUID -> Envoy Routes map
orgIDOpenAPIClustersMap = make(map[string]map[string][]*clusterv3.Cluster) // organizationID -> Vhost:API_UUID -> Envoy Clusters map
orgIDOpenAPIEndpointsMap = make(map[string]map[string][]*corev3.Address) // organizationID -> Vhost:API_UUID -> Envoy Endpoints map
orgIDOpenAPIEnforcerApisMap = make(map[string]map[string]types.Resource) // organizationID -> Vhost:API_UUID -> API Resource map
orgIDOpenAPIEnforcerApisMap = make(map[string]map[string]*api.Api) // organizationID -> Vhost:API_UUID -> API Resource map
orgIDvHostBasepathMap = make(map[string]map[string]string)

reverseAPINameVersionMap = make(map[string]string)
Expand Down Expand Up @@ -334,6 +337,8 @@ func UpdateAPI(vHost string, apiProject mgw.ProjectAPI, environments []string) (
apiYaml.Name, apiYaml.Version, organizationID)
return nil, validationErr
}
// Update the LifecycleStatus of the API.
updateLCStateOfMgwSwagger(&mgwSwagger)

// -------- Finished updating mgwSwagger struct

Expand Down Expand Up @@ -434,28 +439,60 @@ func UpdateAPI(vHost string, apiProject mgw.ProjectAPI, environments []string) (
}

if _, ok := orgIDOpenAPIEnforcerApisMap[organizationID]; ok {
orgIDOpenAPIEnforcerApisMap[organizationID][apiIdentifier] = oasParser.GetEnforcerAPI(mgwSwagger,
apiProject.APILifeCycleStatus, vHost)
orgIDOpenAPIEnforcerApisMap[organizationID][apiIdentifier] = oasParser.GetEnforcerAPI(mgwSwagger, vHost)
} else {
enforcerAPIMap := make(map[string]types.Resource)
enforcerAPIMap[apiIdentifier] = oasParser.GetEnforcerAPI(mgwSwagger, apiProject.APILifeCycleStatus,
vHost)
enforcerAPIMap := make(map[string]*api.Api)
enforcerAPIMap[apiIdentifier] = oasParser.GetEnforcerAPI(mgwSwagger, vHost)
orgIDOpenAPIEnforcerApisMap[organizationID] = enforcerAPIMap
}

// TODO: (VirajSalaka) Fault tolerance mechanism implementation
revisionStatus := updateXdsCacheOnAPIAdd(oldLabels, newLabels)
if revisionStatus {
// send updated revision to control plane
deployedRevision = notifier.UpdateDeployedRevisions(apiYaml.ID, apiYaml.RevisionID, environments,
vHost)
deployedRevision = notifier.UpdateDeployedRevisions(apiYaml.ID, apiYaml.RevisionID, environments, vHost)
}
if svcdiscovery.IsServiceDiscoveryEnabled {
startConsulServiceDiscovery(organizationID) //consul service discovery starting point
}
return deployedRevision, nil
}

// UpdateAPIInEnforcerForBlockedAPIUpdate updates the state of APIMetadataMap under apiID with the lifecycle state and then
// XDS cache for enforcerAPIs is updated.
func UpdateAPIInEnforcerForBlockedAPIUpdate(apiID, organizationID, state string) {
mutexForInternalMapUpdate.Lock()
defer mutexForInternalMapUpdate.Unlock()
// First needs to update the API Metadata Map
UpdateAPIMetataMapWithAPILCEvent(apiID, state)
var apiReferenceArray []string
// Iterate through the enforcerAPIsMap and update the lifecycle status for the map. This is the map representing
// runtime-artifact for each API
if openAPIEnforcerAPIsMap, orgFound := orgIDOpenAPIEnforcerApisMap[organizationID]; orgFound {
// The reference is vhost:apiUUID. Hence it is required to iterate through all API entries as there could be multiple deployments of the same API
// under different vhosts.
for apiReference, enforcerAPI := range openAPIEnforcerAPIsMap {
if strings.HasSuffix(apiReference, ":"+apiID) && (state == blockedStatus || enforcerAPI.ApiLifeCycleState == blockedStatus) {
logger.LoggerXds.Infof("API Lifecycle status is updated for the API %s to %s state", apiReference, state)
enforcerAPI.ApiLifeCycleState = state
apiReferenceArray = append(apiReferenceArray, apiReference)
}
}
} else {
logger.LoggerXds.Infof("API Life Cycle event is not applied due to irrelevant tenant domain : %s.", organizationID)
return
}

// For all the gateway labels containing the API, the enforcer XDS cache needs to be updated.
if openAPIEnvoyLabelMap, ok := orgIDOpenAPIEnvoyMap[organizationID]; ok {
for _, apiReference := range apiReferenceArray {
if labels, labelsFound := openAPIEnvoyLabelMap[apiReference]; labelsFound {
updateXdsCacheForEnforcerAPIsOnly(labels)
}
}
}
}

// GetAllEnvironments returns all the environments merging new environments with already deployed environments
// of the given vhost of the API
func GetAllEnvironments(apiUUID, vhost string, newEnvironments []string) []string {
Expand Down Expand Up @@ -782,6 +819,12 @@ func updateXdsCacheOnAPIAdd(oldLabels []string, newLabels []string) bool {
return revisionStatus
}

func updateXdsCacheForEnforcerAPIsOnly(labels []string) {
for _, label := range labels {
UpdateEnforcerApis(label, generateEnforcerAPIsForLabel(label), "")
}
}

// GenerateEnvoyResoucesForLabel generates envoy resources for a given label
// This method will list out all APIs mapped to the label. and generate envoy resources for all of these APIs.
func GenerateEnvoyResoucesForLabel(label string) ([]types.Resource, []types.Resource, []types.Resource,
Expand Down Expand Up @@ -850,6 +893,24 @@ func GenerateEnvoyResoucesForLabel(label string) ([]types.Resource, []types.Reso
return endpoints, clusters, listeners, routeConfigs, apis
}

// generateEnforcerAPIsForLabel generates ebforcerAPIs resource array for a given label.
// This is used when the envoy resources are not required to have changes but the enforcer APIs are required to (Blocked State APIs)
func generateEnforcerAPIsForLabel(label string) []types.Resource {
var apis []types.Resource

for organizationID, entityMap := range orgIDOpenAPIEnvoyMap {
for apiKey, labels := range entityMap {
if arrayContains(labels, label) {
enforcerAPI, ok := orgIDOpenAPIEnforcerApisMap[organizationID][apiKey]
if ok {
apis = append(apis, enforcerAPI)
}
}
}
}
return apis
}

// GenerateGlobalClusters generates the globally available clusters and endpoints.
func GenerateGlobalClusters(label string) {
clusters, endpoints := oasParser.GetGlobalClusters()
Expand Down Expand Up @@ -1239,3 +1300,13 @@ func UpdateEnforcerThrottleData(throttleData *throttle.ThrottleData) {
enforcerThrottleData = t
logger.LoggerXds.Infof("New Throttle Data cache update for the label: " + label + " version: " + fmt.Sprint(version))
}

func updateLCStateOfMgwSwagger(mgwSwagger *model.MgwSwagger) {
// If there are any metadata stored under the APIMetadataMap and Life Cycle state is blocked, update the mgwSwagger
apiEntry, apiFound := APIMetadataMap[mgwSwagger.GetID()]
if apiFound && apiEntry.LcState == blockedStatus {
mgwSwagger.LifeCycleState = blockedStatus
return
}
mgwSwagger.LifeCycleState = nonBlockedStatus
}
Loading

0 comments on commit aa16e1f

Please sign in to comment.