From 72ba6d780d0602fc8df1b702d70f345f519dd514 Mon Sep 17 00:00:00 2001 From: Ulf Bjorkengren Date: Thu, 5 Dec 2024 17:16:01 +0100 Subject: [PATCH] Request local data compression support Signed-off-by: Ulf Bjorkengren --- .../Javascript/appclient_commands.txt | 3 + server/vissv2server/wsMgr/wsMgr.go | 245 +++++++++++++++--- 2 files changed, 205 insertions(+), 43 deletions(-) diff --git a/client/client-1.0/Javascript/appclient_commands.txt b/client/client-1.0/Javascript/appclient_commands.txt index c6d59f32..428e7312 100644 --- a/client/client-1.0/Javascript/appclient_commands.txt +++ b/client/client-1.0/Javascript/appclient_commands.txt @@ -12,6 +12,9 @@ Get request: {"action":"get","path":"Server.Support.Protocol","requestId":"133"} {"action":"get","path":"Vehicle.Chassis.AxleCount","requestId":"321"} +Get Request with request local data compression: +{"action":"get","path":"Vehicle.Speed", "dc":"2+0","requestId":"232"} + Get request for historic data: {"action":"get","path":"Vehicle.Acceleration.Longitudinal","filter":{"variant":"history","parameter":"P2DT12H"},"requestId":"234"} diff --git a/server/vissv2server/wsMgr/wsMgr.go b/server/vissv2server/wsMgr/wsMgr.go index da837df3..38311a96 100644 --- a/server/vissv2server/wsMgr/wsMgr.go +++ b/server/vissv2server/wsMgr/wsMgr.go @@ -11,6 +11,8 @@ package wsMgr import ( utils "github.com/covesa/vissr/utils" "strings" + "strconv" + "sort" ) // the number of channel array elements sets the limit for max number of parallel WS app clients @@ -21,6 +23,22 @@ var clientBackendChan []chan string var wsClientIndex int const isClientLocal = false +/* +* responseHandling values, instructs server about possible path compression: +1: compress path, delete cache entry (get on single path) +2. do not compress path, delete cache entry (get on multiple paths) <- This shall not be saved in cache. +3. compress path, do not delete cache entry (subscribe, single path, but also for multiple paths after first time) +4. do not compress path, do not delete cache entry (subscribe on multiple paths) <- This is changed to 3 after first time +*/ +type DataCompressionElement struct { + PayloadId string + Dc string + ResponseHandling int //possible values 1..4, see description + SortedList []string +} +var dataCompressionCache []DataCompressionElement +const DCCACHESIZE = 20 + func initChannels() { wsClientChan = make([]chan string, NUMOFWSCLIENTS) clientBackendChan = make([]chan string, NUMOFWSCLIENTS) @@ -38,59 +56,200 @@ func RemoveRoutingForwardResponse(response string, transportMgrChan chan string) } } +func checkForCompression(reqMessage string) { + if strings.Contains(reqMessage, `"dc"`) { + dcValue, payloadId, singleResponse, singlePath := getDcConfig(reqMessage) +utils.Info.Printf("checkForCompression:dcValue=%s, payloadId=%s, singleResponse=%d, singlePath=%d", dcValue, payloadId, singleResponse, singlePath) + if len(dcValue) > 0 { + responseHandling := 1 // singleResponse && singlePath + if singleResponse && !singlePath { + responseHandling = 2 + } else if !singleResponse && singlePath { + responseHandling = 3 + } else if !singleResponse && !singlePath { + responseHandling = 4 + } + dcCacheInsert(payloadId, dcValue, responseHandling) + } + } +} + +func getDcConfig(reqMessage string) (string, string, bool, bool) { + var dcValue, payloadId string + isGet := false + singlePath := false + dcValue = getValueForKey(reqMessage, `"dc"`) + isGet = strings.Contains(reqMessage, `"get"`) + singlePath = !strings.Contains(reqMessage, `"paths"`) + if isGet { + payloadId = getValueForKey(reqMessage, `"requestId"`) + } else { + payloadId = getValueForKey(reqMessage, `"subscriptionId"`) + } + return dcValue, payloadId, isGet, singlePath +} + +func getValueForKey(reqMessage string, key string) string { + var keyValue string + keyIndex := strings.Index(reqMessage, key) + len(key) + hyphenIndex1 := strings.Index(reqMessage[keyIndex:], `"`) + if hyphenIndex1 != -1 { + hyphenIndex2 := strings.Index(reqMessage[keyIndex+hyphenIndex1+1:], `"`) + if hyphenIndex2 != -1 { + keyValue = reqMessage[keyIndex+hyphenIndex1+1:keyIndex+hyphenIndex1+1+hyphenIndex2] + } + } + return keyValue +} + +func initDcCache() { + dataCompressionCache = make([]DataCompressionElement, DCCACHESIZE) + for i := 0; i < DCCACHESIZE; i++ { + dataCompressionCache[i].ResponseHandling = -1 + } +} + +func dcCacheInsert(payloadId string, dcValue string, responseHandling int) { + for i := 0; i < DCCACHESIZE; i++ { + if dataCompressionCache[i].ResponseHandling == -1 { + dataCompressionCache[i].ResponseHandling = responseHandling + dataCompressionCache[i].PayloadId = payloadId + dataCompressionCache[i].Dc = dcValue + } + } +} + +func getDcCacheIndex(payloadId string) int { + for i := 0; i < DCCACHESIZE; i++ { + if dataCompressionCache[i].PayloadId == payloadId { + return i + } + } + return -1 +} + +func resetDcCache(cacheIndex int) { + dataCompressionCache[cacheIndex].ResponseHandling = -1 + dataCompressionCache[cacheIndex].SortedList = nil +} + +func checkForDecompression(respMessage string) string { + var payloadId string + isUnsubscribe := false + if strings.Contains(respMessage, `"error"`) { + return respMessage + } + switch getValueForKey(respMessage, `"action"`) { + case "unsubscribe": + isUnsubscribe = true + fallthrough + case "get": + payloadId = getValueForKey(respMessage, `"requestId"`) + case "subscription": + payloadId = getValueForKey(respMessage, `"subscriptionId"`) + default: return respMessage + } + cacheIndex := getDcCacheIndex(payloadId) +utils.Info.Printf("checkForDecompression:getValueForKey(respMessage, `action`)=%s, payloadId=%s, cacheIndex=%d", getValueForKey(respMessage, `"action"`), payloadId, cacheIndex) + if cacheIndex == -1 { + return respMessage + } + if isUnsubscribe { + resetDcCache(cacheIndex) + return respMessage + } + switch dataCompressionCache[cacheIndex].ResponseHandling { + case 1: + dataCompressionCache[cacheIndex].SortedList = getSortedPaths(respMessage) + respMessage = compressPaths(respMessage, dataCompressionCache[cacheIndex].SortedList) + resetDcCache(cacheIndex) + case 2: return respMessage + case 3: + dataCompressionCache[cacheIndex].SortedList = getSortedPaths(respMessage) + respMessage = compressPaths(respMessage, dataCompressionCache[cacheIndex].SortedList) + case 4: + dataCompressionCache[cacheIndex].ResponseHandling = 3 + default: return respMessage + } + return respMessage +} + +func getSortedPaths(respMessage string) []string { + respMap := make(map[string]interface{}) + if utils.MapRequest(respMessage, &respMap) != 0 { + utils.Error.Printf("getSortedPaths():invalid JSON format=%s", respMessage) + return nil + } + var paths []string + switch data := respMap["data"].(type) { + case interface{}: + utils.Info.Println(data, "is interface{}") + for k, v := range data.(map[string]interface{}) { + if k == "path" { + paths = append(paths, v.(string)) + } + } + case []interface{}: + utils.Info.Println(data, "is []interface{}") + for i := 0; i < len(data); i++ { + for k, v := range data[i].(map[string]interface{}) { + if k == "path" { + paths = append(paths, v.(string)) + } + } + } + default: + utils.Info.Println(data, "is of an unknown type") + } + sort.Strings(paths) + return paths +} + +func compressPaths(respMessage string, sortedList []string) string { + for i := 0; i < len(sortedList); i++ { + respMessage = strings.Replace(respMessage, sortedList[i], strconv.Itoa(i), 1) + } + return respMessage +} + func WsMgrInit(mgrId int, transportMgrChan chan string) { + var reqMessage string + var clientId int utils.ReadTransportSecConfig() initChannels() - - go utils.WsServer{ClientBackendChannel: clientBackendChan}.InitClientServer(utils.MuxServer[1], wsClientChan, mgrId, &wsClientIndex) // go routine needed due to listenAndServe call... - + initDcCache() + go utils.WsServer{ClientBackendChannel: clientBackendChan}.InitClientServer(utils.MuxServer[1], wsClientChan, mgrId, &wsClientIndex) utils.Info.Println("WS manager data session initiated.") for { select { case respMessage := <-transportMgrChan: utils.Info.Printf("WS mgr hub: Response from server core:%s", respMessage) + respMessage = checkForDecompression(respMessage) RemoveRoutingForwardResponse(respMessage, transportMgrChan) - case reqMessage := <-wsClientChan[0]: - utils.AddRoutingForwardRequest(reqMessage, mgrId, 0, transportMgrChan) - case reqMessage := <-wsClientChan[1]: - utils.AddRoutingForwardRequest(reqMessage, mgrId, 1, transportMgrChan) - case reqMessage := <-wsClientChan[2]: - utils.AddRoutingForwardRequest(reqMessage, mgrId, 2, transportMgrChan) - case reqMessage := <-wsClientChan[3]: - utils.AddRoutingForwardRequest(reqMessage, mgrId, 3, transportMgrChan) - case reqMessage := <-wsClientChan[4]: - utils.AddRoutingForwardRequest(reqMessage, mgrId, 4, transportMgrChan) - case reqMessage := <-wsClientChan[5]: - utils.AddRoutingForwardRequest(reqMessage, mgrId, 5, transportMgrChan) - case reqMessage := <-wsClientChan[6]: - utils.AddRoutingForwardRequest(reqMessage, mgrId, 6, transportMgrChan) - case reqMessage := <-wsClientChan[7]: - utils.AddRoutingForwardRequest(reqMessage, mgrId, 7, transportMgrChan) - case reqMessage := <-wsClientChan[8]: - utils.AddRoutingForwardRequest(reqMessage, mgrId, 8, transportMgrChan) - case reqMessage := <-wsClientChan[9]: - utils.AddRoutingForwardRequest(reqMessage, mgrId, 9, transportMgrChan) - case reqMessage := <-wsClientChan[10]: - utils.AddRoutingForwardRequest(reqMessage, mgrId, 10, transportMgrChan) - case reqMessage := <-wsClientChan[11]: - utils.AddRoutingForwardRequest(reqMessage, mgrId, 11, transportMgrChan) - case reqMessage := <-wsClientChan[12]: - utils.AddRoutingForwardRequest(reqMessage, mgrId, 12, transportMgrChan) - case reqMessage := <-wsClientChan[13]: - utils.AddRoutingForwardRequest(reqMessage, mgrId, 13, transportMgrChan) - case reqMessage := <-wsClientChan[14]: - utils.AddRoutingForwardRequest(reqMessage, mgrId, 14, transportMgrChan) - case reqMessage := <-wsClientChan[15]: - utils.AddRoutingForwardRequest(reqMessage, mgrId, 15, transportMgrChan) - case reqMessage := <-wsClientChan[16]: - utils.AddRoutingForwardRequest(reqMessage, mgrId, 16, transportMgrChan) - case reqMessage := <-wsClientChan[17]: - utils.AddRoutingForwardRequest(reqMessage, mgrId, 17, transportMgrChan) - case reqMessage := <-wsClientChan[18]: - utils.AddRoutingForwardRequest(reqMessage, mgrId, 18, transportMgrChan) - case reqMessage := <-wsClientChan[19]: - utils.AddRoutingForwardRequest(reqMessage, mgrId, 19, transportMgrChan) + continue + case reqMessage = <-wsClientChan[0]: clientId = 0 + case reqMessage = <-wsClientChan[1]: clientId = 1 + case reqMessage = <-wsClientChan[2]: clientId = 2 + case reqMessage = <-wsClientChan[3]: clientId = 3 + case reqMessage = <-wsClientChan[4]: clientId = 4 + case reqMessage = <-wsClientChan[5]: clientId = 5 + case reqMessage = <-wsClientChan[6]: clientId = 6 + case reqMessage = <-wsClientChan[7]: clientId = 7 + case reqMessage = <-wsClientChan[8]: clientId = 8 + case reqMessage = <-wsClientChan[9]: clientId = 9 + case reqMessage = <-wsClientChan[10]: clientId = 10 + case reqMessage = <-wsClientChan[11]: clientId = 11 + case reqMessage = <-wsClientChan[12]: clientId = 12 + case reqMessage = <-wsClientChan[13]: clientId = 13 + case reqMessage = <-wsClientChan[14]: clientId = 14 + case reqMessage = <-wsClientChan[15]: clientId = 15 + case reqMessage = <-wsClientChan[16]: clientId = 16 + case reqMessage = <-wsClientChan[17]: clientId = 17 + case reqMessage = <-wsClientChan[18]: clientId = 18 + case reqMessage = <-wsClientChan[19]: clientId = 19 } + checkForCompression(reqMessage) + utils.AddRoutingForwardRequest(reqMessage, mgrId, clientId, transportMgrChan) } }