From 72ba6d780d0602fc8df1b702d70f345f519dd514 Mon Sep 17 00:00:00 2001 From: Ulf Bjorkengren Date: Thu, 5 Dec 2024 17:16:01 +0100 Subject: [PATCH 1/2] Request local data compression support Signed-off-by: Ulf Bjorkengren --- .../Javascript/appclient_commands.txt | 3 + server/vissv2server/wsMgr/wsMgr.go | 245 +++++++++++++++--- 2 files changed, 205 insertions(+), 43 deletions(-) diff --git a/client/client-1.0/Javascript/appclient_commands.txt b/client/client-1.0/Javascript/appclient_commands.txt index c6d59f32..428e7312 100644 --- a/client/client-1.0/Javascript/appclient_commands.txt +++ b/client/client-1.0/Javascript/appclient_commands.txt @@ -12,6 +12,9 @@ Get request: {"action":"get","path":"Server.Support.Protocol","requestId":"133"} {"action":"get","path":"Vehicle.Chassis.AxleCount","requestId":"321"} +Get Request with request local data compression: +{"action":"get","path":"Vehicle.Speed", "dc":"2+0","requestId":"232"} + Get request for historic data: {"action":"get","path":"Vehicle.Acceleration.Longitudinal","filter":{"variant":"history","parameter":"P2DT12H"},"requestId":"234"} diff --git a/server/vissv2server/wsMgr/wsMgr.go b/server/vissv2server/wsMgr/wsMgr.go index da837df3..38311a96 100644 --- a/server/vissv2server/wsMgr/wsMgr.go +++ b/server/vissv2server/wsMgr/wsMgr.go @@ -11,6 +11,8 @@ package wsMgr import ( utils "github.com/covesa/vissr/utils" "strings" + "strconv" + "sort" ) // the number of channel array elements sets the limit for max number of parallel WS app clients @@ -21,6 +23,22 @@ var clientBackendChan []chan string var wsClientIndex int const isClientLocal = false +/* +* responseHandling values, instructs server about possible path compression: +1: compress path, delete cache entry (get on single path) +2. do not compress path, delete cache entry (get on multiple paths) <- This shall not be saved in cache. +3. compress path, do not delete cache entry (subscribe, single path, but also for multiple paths after first time) +4. do not compress path, do not delete cache entry (subscribe on multiple paths) <- This is changed to 3 after first time +*/ +type DataCompressionElement struct { + PayloadId string + Dc string + ResponseHandling int //possible values 1..4, see description + SortedList []string +} +var dataCompressionCache []DataCompressionElement +const DCCACHESIZE = 20 + func initChannels() { wsClientChan = make([]chan string, NUMOFWSCLIENTS) clientBackendChan = make([]chan string, NUMOFWSCLIENTS) @@ -38,59 +56,200 @@ func RemoveRoutingForwardResponse(response string, transportMgrChan chan string) } } +func checkForCompression(reqMessage string) { + if strings.Contains(reqMessage, `"dc"`) { + dcValue, payloadId, singleResponse, singlePath := getDcConfig(reqMessage) +utils.Info.Printf("checkForCompression:dcValue=%s, payloadId=%s, singleResponse=%d, singlePath=%d", dcValue, payloadId, singleResponse, singlePath) + if len(dcValue) > 0 { + responseHandling := 1 // singleResponse && singlePath + if singleResponse && !singlePath { + responseHandling = 2 + } else if !singleResponse && singlePath { + responseHandling = 3 + } else if !singleResponse && !singlePath { + responseHandling = 4 + } + dcCacheInsert(payloadId, dcValue, responseHandling) + } + } +} + +func getDcConfig(reqMessage string) (string, string, bool, bool) { + var dcValue, payloadId string + isGet := false + singlePath := false + dcValue = getValueForKey(reqMessage, `"dc"`) + isGet = strings.Contains(reqMessage, `"get"`) + singlePath = !strings.Contains(reqMessage, `"paths"`) + if isGet { + payloadId = getValueForKey(reqMessage, `"requestId"`) + } else { + payloadId = getValueForKey(reqMessage, `"subscriptionId"`) + } + return dcValue, payloadId, isGet, singlePath +} + +func getValueForKey(reqMessage string, key string) string { + var keyValue string + keyIndex := strings.Index(reqMessage, key) + len(key) + hyphenIndex1 := strings.Index(reqMessage[keyIndex:], `"`) + if hyphenIndex1 != -1 { + hyphenIndex2 := strings.Index(reqMessage[keyIndex+hyphenIndex1+1:], `"`) + if hyphenIndex2 != -1 { + keyValue = reqMessage[keyIndex+hyphenIndex1+1:keyIndex+hyphenIndex1+1+hyphenIndex2] + } + } + return keyValue +} + +func initDcCache() { + dataCompressionCache = make([]DataCompressionElement, DCCACHESIZE) + for i := 0; i < DCCACHESIZE; i++ { + dataCompressionCache[i].ResponseHandling = -1 + } +} + +func dcCacheInsert(payloadId string, dcValue string, responseHandling int) { + for i := 0; i < DCCACHESIZE; i++ { + if dataCompressionCache[i].ResponseHandling == -1 { + dataCompressionCache[i].ResponseHandling = responseHandling + dataCompressionCache[i].PayloadId = payloadId + dataCompressionCache[i].Dc = dcValue + } + } +} + +func getDcCacheIndex(payloadId string) int { + for i := 0; i < DCCACHESIZE; i++ { + if dataCompressionCache[i].PayloadId == payloadId { + return i + } + } + return -1 +} + +func resetDcCache(cacheIndex int) { + dataCompressionCache[cacheIndex].ResponseHandling = -1 + dataCompressionCache[cacheIndex].SortedList = nil +} + +func checkForDecompression(respMessage string) string { + var payloadId string + isUnsubscribe := false + if strings.Contains(respMessage, `"error"`) { + return respMessage + } + switch getValueForKey(respMessage, `"action"`) { + case "unsubscribe": + isUnsubscribe = true + fallthrough + case "get": + payloadId = getValueForKey(respMessage, `"requestId"`) + case "subscription": + payloadId = getValueForKey(respMessage, `"subscriptionId"`) + default: return respMessage + } + cacheIndex := getDcCacheIndex(payloadId) +utils.Info.Printf("checkForDecompression:getValueForKey(respMessage, `action`)=%s, payloadId=%s, cacheIndex=%d", getValueForKey(respMessage, `"action"`), payloadId, cacheIndex) + if cacheIndex == -1 { + return respMessage + } + if isUnsubscribe { + resetDcCache(cacheIndex) + return respMessage + } + switch dataCompressionCache[cacheIndex].ResponseHandling { + case 1: + dataCompressionCache[cacheIndex].SortedList = getSortedPaths(respMessage) + respMessage = compressPaths(respMessage, dataCompressionCache[cacheIndex].SortedList) + resetDcCache(cacheIndex) + case 2: return respMessage + case 3: + dataCompressionCache[cacheIndex].SortedList = getSortedPaths(respMessage) + respMessage = compressPaths(respMessage, dataCompressionCache[cacheIndex].SortedList) + case 4: + dataCompressionCache[cacheIndex].ResponseHandling = 3 + default: return respMessage + } + return respMessage +} + +func getSortedPaths(respMessage string) []string { + respMap := make(map[string]interface{}) + if utils.MapRequest(respMessage, &respMap) != 0 { + utils.Error.Printf("getSortedPaths():invalid JSON format=%s", respMessage) + return nil + } + var paths []string + switch data := respMap["data"].(type) { + case interface{}: + utils.Info.Println(data, "is interface{}") + for k, v := range data.(map[string]interface{}) { + if k == "path" { + paths = append(paths, v.(string)) + } + } + case []interface{}: + utils.Info.Println(data, "is []interface{}") + for i := 0; i < len(data); i++ { + for k, v := range data[i].(map[string]interface{}) { + if k == "path" { + paths = append(paths, v.(string)) + } + } + } + default: + utils.Info.Println(data, "is of an unknown type") + } + sort.Strings(paths) + return paths +} + +func compressPaths(respMessage string, sortedList []string) string { + for i := 0; i < len(sortedList); i++ { + respMessage = strings.Replace(respMessage, sortedList[i], strconv.Itoa(i), 1) + } + return respMessage +} + func WsMgrInit(mgrId int, transportMgrChan chan string) { + var reqMessage string + var clientId int utils.ReadTransportSecConfig() initChannels() - - go utils.WsServer{ClientBackendChannel: clientBackendChan}.InitClientServer(utils.MuxServer[1], wsClientChan, mgrId, &wsClientIndex) // go routine needed due to listenAndServe call... - + initDcCache() + go utils.WsServer{ClientBackendChannel: clientBackendChan}.InitClientServer(utils.MuxServer[1], wsClientChan, mgrId, &wsClientIndex) utils.Info.Println("WS manager data session initiated.") for { select { case respMessage := <-transportMgrChan: utils.Info.Printf("WS mgr hub: Response from server core:%s", respMessage) + respMessage = checkForDecompression(respMessage) RemoveRoutingForwardResponse(respMessage, transportMgrChan) - case reqMessage := <-wsClientChan[0]: - utils.AddRoutingForwardRequest(reqMessage, mgrId, 0, transportMgrChan) - case reqMessage := <-wsClientChan[1]: - utils.AddRoutingForwardRequest(reqMessage, mgrId, 1, transportMgrChan) - case reqMessage := <-wsClientChan[2]: - utils.AddRoutingForwardRequest(reqMessage, mgrId, 2, transportMgrChan) - case reqMessage := <-wsClientChan[3]: - utils.AddRoutingForwardRequest(reqMessage, mgrId, 3, transportMgrChan) - case reqMessage := <-wsClientChan[4]: - utils.AddRoutingForwardRequest(reqMessage, mgrId, 4, transportMgrChan) - case reqMessage := <-wsClientChan[5]: - utils.AddRoutingForwardRequest(reqMessage, mgrId, 5, transportMgrChan) - case reqMessage := <-wsClientChan[6]: - utils.AddRoutingForwardRequest(reqMessage, mgrId, 6, transportMgrChan) - case reqMessage := <-wsClientChan[7]: - utils.AddRoutingForwardRequest(reqMessage, mgrId, 7, transportMgrChan) - case reqMessage := <-wsClientChan[8]: - utils.AddRoutingForwardRequest(reqMessage, mgrId, 8, transportMgrChan) - case reqMessage := <-wsClientChan[9]: - utils.AddRoutingForwardRequest(reqMessage, mgrId, 9, transportMgrChan) - case reqMessage := <-wsClientChan[10]: - utils.AddRoutingForwardRequest(reqMessage, mgrId, 10, transportMgrChan) - case reqMessage := <-wsClientChan[11]: - utils.AddRoutingForwardRequest(reqMessage, mgrId, 11, transportMgrChan) - case reqMessage := <-wsClientChan[12]: - utils.AddRoutingForwardRequest(reqMessage, mgrId, 12, transportMgrChan) - case reqMessage := <-wsClientChan[13]: - utils.AddRoutingForwardRequest(reqMessage, mgrId, 13, transportMgrChan) - case reqMessage := <-wsClientChan[14]: - utils.AddRoutingForwardRequest(reqMessage, mgrId, 14, transportMgrChan) - case reqMessage := <-wsClientChan[15]: - utils.AddRoutingForwardRequest(reqMessage, mgrId, 15, transportMgrChan) - case reqMessage := <-wsClientChan[16]: - utils.AddRoutingForwardRequest(reqMessage, mgrId, 16, transportMgrChan) - case reqMessage := <-wsClientChan[17]: - utils.AddRoutingForwardRequest(reqMessage, mgrId, 17, transportMgrChan) - case reqMessage := <-wsClientChan[18]: - utils.AddRoutingForwardRequest(reqMessage, mgrId, 18, transportMgrChan) - case reqMessage := <-wsClientChan[19]: - utils.AddRoutingForwardRequest(reqMessage, mgrId, 19, transportMgrChan) + continue + case reqMessage = <-wsClientChan[0]: clientId = 0 + case reqMessage = <-wsClientChan[1]: clientId = 1 + case reqMessage = <-wsClientChan[2]: clientId = 2 + case reqMessage = <-wsClientChan[3]: clientId = 3 + case reqMessage = <-wsClientChan[4]: clientId = 4 + case reqMessage = <-wsClientChan[5]: clientId = 5 + case reqMessage = <-wsClientChan[6]: clientId = 6 + case reqMessage = <-wsClientChan[7]: clientId = 7 + case reqMessage = <-wsClientChan[8]: clientId = 8 + case reqMessage = <-wsClientChan[9]: clientId = 9 + case reqMessage = <-wsClientChan[10]: clientId = 10 + case reqMessage = <-wsClientChan[11]: clientId = 11 + case reqMessage = <-wsClientChan[12]: clientId = 12 + case reqMessage = <-wsClientChan[13]: clientId = 13 + case reqMessage = <-wsClientChan[14]: clientId = 14 + case reqMessage = <-wsClientChan[15]: clientId = 15 + case reqMessage = <-wsClientChan[16]: clientId = 16 + case reqMessage = <-wsClientChan[17]: clientId = 17 + case reqMessage = <-wsClientChan[18]: clientId = 18 + case reqMessage = <-wsClientChan[19]: clientId = 19 } + checkForCompression(reqMessage) + utils.AddRoutingForwardRequest(reqMessage, mgrId, clientId, transportMgrChan) } } From d45f4f497e552d285872e10f579bc66a1e37b470 Mon Sep 17 00:00:00 2001 From: Ulf Bjorkengren Date: Thu, 12 Dec 2024 13:34:49 +0100 Subject: [PATCH 2/2] Websocket data compression support Signed-off-by: Ulf Bjorkengren --- .../Javascript/appclient_commands.txt | 5 +- .../client-1.0/compress_client/requests.json | 9 +- client/client-1.0/grpc_client/README.md | 5 +- client/client-1.0/grpc_client/grpc_client.go | 20 +- .../ServerSignalSpecification.vspec | 44 ++++- server/vissv2server/forest/server.binary | Bin 3666 -> 3262 bytes server/vissv2server/wsMgr/wsMgr.go | 187 +++++++++++++++--- utils/grcputils.go | 8 +- 8 files changed, 226 insertions(+), 52 deletions(-) rename {server => resources}/ServerSignalSpecification.vspec (78%) diff --git a/client/client-1.0/Javascript/appclient_commands.txt b/client/client-1.0/Javascript/appclient_commands.txt index 428e7312..4b80a7b3 100644 --- a/client/client-1.0/Javascript/appclient_commands.txt +++ b/client/client-1.0/Javascript/appclient_commands.txt @@ -12,8 +12,8 @@ Get request: {"action":"get","path":"Server.Support.Protocol","requestId":"133"} {"action":"get","path":"Vehicle.Chassis.AxleCount","requestId":"321"} -Get Request with request local data compression: -{"action":"get","path":"Vehicle.Speed", "dc":"2+0","requestId":"232"} +Get Request with data compression: +{"action":"get","path":"Vehicle.Speed", "dc":"2+1","requestId":"232"} Get request for historic data: @@ -43,6 +43,7 @@ Subscribe request: {"action":"subscribe","path":"Vehicle/Cabin/Door/Row1/PassengerSide/IsOpen","filter":{"variant":"range","parameter":[{"logic-op":"gt","boundary":"500"},{"logic-op":"lt","boundary":"510"}]},"requestId":"265"} {"action":"subscribe","path":"Vehicle.Powertrain.Transmission.Speed","filter":{"variant":"curvelog","parameter":{"maxerr":"2","bufsize":"100"}},"requestId":"275"} {"action":"subscribe","path":"Vehicle","filter":[{"variant":"paths","parameter":["CurrentLocation.Latitude", "CurrentLocation.Longitude"]}, {"variant":"curvelog","parameter":{"maxerr":"0.00001","bufsize":"100"}}],"requestId":"285"} +{"action":"subscribe","path":"Vehicle.CurrentLocation","filter":[{"variant":"paths","parameter":["Latitude", "Longitude"]}, {"variant":"timebased","parameter":{"period":"3000"}}], "dc":"2+1","requestId":"286"} Unsubscribe request: {"action":"unsubscribe","subscriptionId":"1","requestId":"240"} diff --git a/client/client-1.0/compress_client/requests.json b/client/client-1.0/compress_client/requests.json index ba62e3c7..6532d522 100755 --- a/client/client-1.0/compress_client/requests.json +++ b/client/client-1.0/compress_client/requests.json @@ -1,3 +1,6 @@ -{"request":[{"action":"get","path":"Vehicle/Cabin/Door/Row1/DriverSide/IsOpen","requestId":"232"},{"action":"get","path":"Vehicle.Acceleration.Longitudinal","requestId":"233"}, -{"action":"get","path":"Vehicle/ADAS","filter":{"type":"paths","parameter":["ABS/*","CruiseControl/Error"]},"requestId":"237"}, -{"action":"subscribe","path":"Vehicle/Cabin/Door/Row1/PassengerSide/IsOpen","filter":{"type":"timebased","parameter":{"period":"3"}},"requestId":"246"}]} +{"request":[ +{"action":"get","path":"Vehicle/Cabin/Door/Row1/DriverSide/IsOpen","requestId":"232"},{"action":"get","path":"Vehicle.Acceleration.Longitudinal","requestId":"233"}, +{"action":"get","path":"Vehicle/ADAS","filter":{"variant":"paths","parameter":["ABS/*","CruiseControl/Error"]},"requestId":"237"}, +{"action":"subscribe","path":"Vehicle/Cabin/Door/Row1/PassengerSide/IsOpen","filter":{"variant":"timebased","parameter":{"period":"3"}},"requestId":"246"}, +{"action":"subscribe","path":"Vehicle.CurrentLocation","filter":[{"variant":"paths","parameter":["Latitude", "Longitude"]}, {"variant":"timebased","parameter":{"period":"3000"}}], "dc":"2+1","requestId":"286"} +]} diff --git a/client/client-1.0/grpc_client/README.md b/client/client-1.0/grpc_client/README.md index a699498e..5ce5781d 100644 --- a/client/client-1.0/grpc_client/README.md +++ b/client/client-1.0/grpc_client/README.md @@ -11,13 +11,14 @@ To run: ./grpc_client -The gRPC client UI provides a choice of four different request that can be issued: +The gRPC client UI provides a choice of four different request that can be issued, e.g.: ``` {"action":"get","path":"Vehicle/Chassis/Accelerator/PedalPosition","requestId":"232"} -{"action":"subscribe","path":"Vehicle/Speed","filter":{"type":"timebased","parameter":{"period":"100"}},"requestId":"246"} +{"action":"subscribe","path":"Vehicle/Speed","filter":{"varian":"timebased","parameter":{"period":"100"}},"requestId":"246"} {"action":"unsubscribe","subscriptionId":"1","requestId":"240"} {"action":"set", "path":"Vehicle/Body/Lights/IsLeftIndicatorOn", "value":"999", "requestId":"245"} ``` +The commands can be changed in the source code, in the parameter commandList, followed by rebuilding. These can be issued multiple times, but there is a limitation in that the unsubscribe has a static subscriptionID that only applies to the first started subscription. diff --git a/client/client-1.0/grpc_client/grpc_client.go b/client/client-1.0/grpc_client/grpc_client.go index 39a0c9c9..00642a2d 100644 --- a/client/client-1.0/grpc_client/grpc_client.go +++ b/client/client-1.0/grpc_client/grpc_client.go @@ -41,20 +41,20 @@ func initCommandList() { commandList[0] = `{"action":"get","path":"Vehicle/Speed","requestId":"232"}` commandList[1] = `{"action":"set", "path":"Vehicle/Body/Lights/IsLeftIndicatorOn", "value":"true", "requestId":"245"}` - commandList[2] = `{"action":"subscribe","path":"Vehicle.Speed","filter":{"type":"curvelog","parameter":{"maxerr":"2","bufsize":"15"}},"requestId":"285"}` -/* commandList[2] = `{"action":"subscribe","path":"Vehicle","filter":[{"type":"paths","parameter":["Speed", "Chassis.Accelerator.PedalPosition"]},{"type":"timebased","parameter": {"period":"5000"}}],"requestId":"246"}`*/ - commandList[3] = `{"action":"unsubscribe","subscriptionId":"X","requestId":"240"}` // X is replaced according to input + commandList[2] = `{"action":"subscribe","path":"Vehicle.CurrentLocation","filter":[{"variant":"paths","parameter":["Latitude", "Longitude"]}, {"variant":"timebased","parameter":{"period":"3000"}}], "dc":"2+1","requestId":"286"}` + commandList[3] = `{"action":"unsubscribe","subscriptionId":"X","requestId":"240"}` // X is replaced according to input, e.g. 23 sets X=2 /* different variants - commandList[2] = `{"action":"subscribe","path":"Vehicle","filter":[{"type":"paths","parameter":["Speed","CurrentLocation.Latitude", "CurrentLocation.Longitude"]}, {"type":"timebased","parameter":{"period":"100"}}],"requestId":"285"}` - commandList[1] = `{"action":"subscribe","path":"Vehicle/Speed","filter":{"type":"timebased","parameter":{"period":"100"}},"requestId":"246"}` + commandList[2] = `{"action":"subscribe","path":"Vehicle.Speed","filter":{"variant":"curvelog","parameter":{"maxerr":"2","bufsize":"15"}},"requestId":"285"}` + commandList[2] = `{"action":"subscribe","path":"Vehicle","filter":[{"variant":"paths","parameter":["Speed","CurrentLocation.Latitude", "CurrentLocation.Longitude"]}, {"variant":"timebased","parameter":{"period":"100"}}],"requestId":"285"}` + commandList[1] = `{"action":"subscribe","path":"Vehicle/Speed","filter":{"variant":"timebased","parameter":{"period":"100"}},"requestId":"246"}` commandList[0] = `{"action":"get","path":"Vehicle/Cabin/Door/Row1/Right/IsOpen","requestId":"232"}` - commandList[0] = `{"action":"get","path":"Vehicle/Cabin/Door","filter":{"type":"paths","parameter":"*.*.IsOpen"},"requestId":"235"}` - commandList[1] = `{"action":"subscribe","path":"Vehicle/Cabin/Door/Row1/Right/IsOpen","filter":{"type":"timebased","parameter":{"period":"3000"}},"requestId":"246"}` - commandList[1] = `{"action":"subscribe","path":"Vehicle","filter":{"type":"paths","parameter":["Speed", "Chassis.Accelerator.PedalPosition"]},"requestId":"246"}` + commandList[0] = `{"action":"get","path":"Vehicle/Cabin/Door","filter":{"variant":"paths","parameter":"*.*.IsOpen"},"requestId":"235"}` + commandList[1] = `{"action":"subscribe","path":"Vehicle/Cabin/Door/Row1/Right/IsOpen","filter":{"variant":"timebased","parameter":{"period":"3000"}},"requestId":"246"}` + commandList[1] = `{"action":"subscribe","path":"Vehicle","filter":{"variant":"paths","parameter":["Speed", "Chassis.Accelerator.PedalPosition"]},"requestId":"246"}` commandList[1] = `{"action":"subscribe","path":"Vehicle/Speed","requestId":"258"}` - commandList[1] = `{"action":"subscribe","path":"Vehicle","filter":[{"type":"paths","parameter":["Body.Lights.IsLeftIndicatorOn", "Chassis.Accelerator.PedalPosition"]}, {"type":"change","parameter":{"logic-op":"ne", "diff": "0"}}],"requestId":"285"}` - commandList[1] = {"action":"subscribe","path":"Vehicle","filter":{"type":"paths","parameter":["Speed", "Chassis.Accelerator.PedalPosition"]},"requestId":"246"}` + commandList[1] = `{"action":"subscribe","path":"Vehicle","filter":[{"variant":"paths","parameter":["Body.Lights.IsLeftIndicatorOn", "Chassis.Accelerator.PedalPosition"]}, {"variant":"change","parameter":{"logic-op":"ne", "diff": "0"}}],"requestId":"285"}` + commandList[1] = {"action":"subscribe","path":"Vehicle","filter":{"variant":"paths","parameter":["Speed", "Chassis.Accelerator.PedalPosition"]},"requestId":"246"}` */ } diff --git a/server/ServerSignalSpecification.vspec b/resources/ServerSignalSpecification.vspec similarity index 78% rename from server/ServerSignalSpecification.vspec rename to resources/ServerSignalSpecification.vspec index 994aa59a..5eabf9d0 100644 --- a/server/ServerSignalSpecification.vspec +++ b/resources/ServerSignalSpecification.vspec @@ -35,9 +35,27 @@ Server.Support.Security: Server.Support.Filter: type: attribute datatype: string[] - default: ["timebased", "change", "range", "curvelog", "paths"] + default: ["timebased", "change", "range", "curvelog", "paths", "metadata"] description: List of supported filter features. +Server.Support.Encoding: + type: attribute + datatype: string[] + default: ["protobuf"] + description: List of supported payload encoding features. + +Server.Support.Filetransfer: + type: attribute + datatype: string[] + default: ["upload", "download"] + description: List of supported file transfer features. + +Server.Support.DataCompression: + type: attribute + datatype: string[] + default: ["2+1"] + description: List of supported data compression features. + Server.Config: type: branch description: Top branch declaring the configuration of server supported features. @@ -64,6 +82,22 @@ Server.Config.Protocol.Websocket: type: branch description: Top branch for the server supported Websocket protocol. +Server.Config.Protocol.Websocket.FileTransfer: + type: branch + description: Websocket filetransfer. + +Server.Config.Protocol.Websocket.FileTransfer.Mode: + type: attribute + datatype: string[] + default: ["upload", "download"] + description: Websocket protocol port number for the primary payload format. + +Server.Config.Protocol.Websocket.FileTransfer.PortNum: + type: attribute + datatype: uint32 + default: 8002 + description: Websocket protocol port number for the primary payload format. + Server.Config.Protocol.Websocket.Primary: type: branch description: Websocket configuration for the primary payload format. @@ -78,10 +112,11 @@ Server.Config.Protocol.Websocket.Protobuf: type: branch description: Websocket configuration for the protobuf encoded payload format. -Server.Config.Protocol.Websocket.Protobuf.PortNum: +Server.Config.Protocol.Websocket.Protobuf.SubProtocol: type: attribute - datatype: uint32 - description: Websocket protocol port number for the protobuf encoded payload format. + datatype: string + default: "VISS-protoenc" + description: Websocket sub-protocol for the protobuf encoded payload format. Server.Config.Protocol.Mqtt: type: branch @@ -122,6 +157,7 @@ Server.Config.Protocol.Grpc.Protobuf: Server.Config.Protocol.Grpc.Protobuf.PortNum: type: attribute datatype: uint32 + default: 5443 description: gRPC port number for the protobuf encoded payload format. Server.Config.AccessControl: diff --git a/server/vissv2server/forest/server.binary b/server/vissv2server/forest/server.binary index 3a0f75e48934aae2d906c2de39aa6165bcc580d6..cd987c23177316132b5215e51527a9e20ce98c56 100644 GIT binary patch delta 834 zcmZ`%O>YuW6n!%cGYlUsO)a!k48+*6wE|+1rbG=XwW*3?Kuv>2=LXj>Y%tre%^R!1)x zJ)M9$yQ5n^hGh_t2MZd8%7HzrFy+9Sl@G)=|Fvnd7e97HpQJJ0# zpP(DT#ht*&MQh^1V@aahx?*GXfle|VpUWJY9G0tNP4{WNK!5Wq^c^=>3E8j|v)cx3 zaQ^FCXkd$6bNYvHWaqi{LZOI|(6EfYY#n&A%JeCJoYuJli@bhWy3HrS))W3pKGJlQ zL*B+kpBdXxPLFm7&xd{%OX=9Hon~|V33iPTSAR%|*AE7_MBfO@0-8bf8?6@o6#aO28+9QKJxuXjfNtG7A&ExdyTj z6hUznLhls(4#BOsck35W$QN+yMy(;tS@hyiob!MDs(*X-kLL~+AL#0?t`@wix7y9A zHP!}{L8w5)gpyKGN4~fQsQTckOM6pJBAjbM7XUP;>XB+FRk(x}I>zBP&pqgS~p|v77&o$;0z-MFuN~~r4w|HUP z-gLQR~L!c=(jpU?QkmKLQ zyT{+#n}j2ChHRmmij=5pC(Xo3k89C4$4`rw$G4}>k6-7zem0xUuY9pypNJ1poE{h0 zVlV{}eQZtHyjLX-{c+xZpL)q80UJz>5Urf<7=av;GpH+Msx;1RflV2poVi$p2*=V) zRA*O`2BR^jakzD5Ii(Oto1qpZLk|rdSPDU%lB43=_<8GaZ)#N=7w}?@t2aNYd2RnBIEdH_qzSnU_+5kWPde?QG%tK4Megu iy&oT)J}?I%MY6t7?BWv^vURuI)iU(X*;Vu1Y5X4oseF?F diff --git a/server/vissv2server/wsMgr/wsMgr.go b/server/vissv2server/wsMgr/wsMgr.go index 38311a96..6688a816 100644 --- a/server/vissv2server/wsMgr/wsMgr.go +++ b/server/vissv2server/wsMgr/wsMgr.go @@ -1,4 +1,5 @@ /** +* (C) 2024 Ford Motor Company * (C) 2022 Geotab Inc * (C) 2019 Volvo Cars * @@ -13,6 +14,7 @@ import ( "strings" "strconv" "sort" + "time" ) // the number of channel array elements sets the limit for max number of parallel WS app clients @@ -30,9 +32,14 @@ const isClientLocal = false 3. compress path, do not delete cache entry (subscribe, single path, but also for multiple paths after first time) 4. do not compress path, do not delete cache entry (subscribe on multiple paths) <- This is changed to 3 after first time */ +type CompressionType struct { + Pc uint8 + Tsc uint8 +} + type DataCompressionElement struct { PayloadId string - Dc string + Dc CompressionType ResponseHandling int //possible values 1..4, see description SortedList []string } @@ -56,10 +63,9 @@ func RemoveRoutingForwardResponse(response string, transportMgrChan chan string) } } -func checkForCompression(reqMessage string) { +func checkCompressionRequest(reqMessage string) { if strings.Contains(reqMessage, `"dc"`) { dcValue, payloadId, singleResponse, singlePath := getDcConfig(reqMessage) -utils.Info.Printf("checkForCompression:dcValue=%s, payloadId=%s, singleResponse=%d, singlePath=%d", dcValue, payloadId, singleResponse, singlePath) if len(dcValue) > 0 { responseHandling := 1 // singleResponse && singlePath if singleResponse && !singlePath { @@ -81,11 +87,7 @@ func getDcConfig(reqMessage string) (string, string, bool, bool) { dcValue = getValueForKey(reqMessage, `"dc"`) isGet = strings.Contains(reqMessage, `"get"`) singlePath = !strings.Contains(reqMessage, `"paths"`) - if isGet { - payloadId = getValueForKey(reqMessage, `"requestId"`) - } else { - payloadId = getValueForKey(reqMessage, `"subscriptionId"`) - } + payloadId = getValueForKey(reqMessage, `"requestId"`) return dcValue, payloadId, isGet, singlePath } @@ -112,9 +114,36 @@ func initDcCache() { func dcCacheInsert(payloadId string, dcValue string, responseHandling int) { for i := 0; i < DCCACHESIZE; i++ { if dataCompressionCache[i].ResponseHandling == -1 { - dataCompressionCache[i].ResponseHandling = responseHandling - dataCompressionCache[i].PayloadId = payloadId - dataCompressionCache[i].Dc = dcValue + if setDcValue(dcValue, i) { + dataCompressionCache[i].ResponseHandling = responseHandling + dataCompressionCache[i].PayloadId = payloadId + } + return + } + } +} + +func setDcValue(dcValue string, cacheIndex int) bool { + isCached := false + plusIndex := strings.Index(dcValue, "+") + if plusIndex != -1 { + pc, err := strconv.Atoi(dcValue[:plusIndex]) + if err == nil && (pc == 2 || pc == 0) { // only request local compression is supported + dataCompressionCache[cacheIndex].Dc.Pc = uint8(pc) + tsc, err := strconv.Atoi(dcValue[plusIndex+1:]) + if err == nil && (tsc == 1 || tsc == 0) { // message local ts compression supported + dataCompressionCache[cacheIndex].Dc.Tsc = uint8(tsc) + isCached = true + } + } + } + return isCached +} + +func updatepayloadId(payloadId1 string, payloadId2 string) { + for i := 0; i < DCCACHESIZE; i++ { + if dataCompressionCache[i].PayloadId == payloadId1 { + dataCompressionCache[i].PayloadId = payloadId2 } } } @@ -133,7 +162,7 @@ func resetDcCache(cacheIndex int) { dataCompressionCache[cacheIndex].SortedList = nil } -func checkForDecompression(respMessage string) string { +func checkCompressionResponse(respMessage string) string { var payloadId string isUnsubscribe := false if strings.Contains(respMessage, `"error"`) { @@ -145,12 +174,16 @@ func checkForDecompression(respMessage string) string { fallthrough case "get": payloadId = getValueForKey(respMessage, `"requestId"`) + case "subscribe": + payloadId1 := getValueForKey(respMessage, `"requestId"`) + payloadId2 := getValueForKey(respMessage, `"subscriptionId"`) + updatepayloadId(payloadId1, payloadId2) + case "subscription": payloadId = getValueForKey(respMessage, `"subscriptionId"`) default: return respMessage } cacheIndex := getDcCacheIndex(payloadId) -utils.Info.Printf("checkForDecompression:getValueForKey(respMessage, `action`)=%s, payloadId=%s, cacheIndex=%d", getValueForKey(respMessage, `"action"`), payloadId, cacheIndex) if cacheIndex == -1 { return respMessage } @@ -160,16 +193,28 @@ utils.Info.Printf("checkForDecompression:getValueForKey(respMessage, `action`)=% } switch dataCompressionCache[cacheIndex].ResponseHandling { case 1: - dataCompressionCache[cacheIndex].SortedList = getSortedPaths(respMessage) - respMessage = compressPaths(respMessage, dataCompressionCache[cacheIndex].SortedList) + if dataCompressionCache[cacheIndex].Dc.Pc == 2 { + dataCompressionCache[cacheIndex].SortedList = getSortedPaths(respMessage) + respMessage = compressPaths(respMessage, dataCompressionCache[cacheIndex].SortedList) + } + if dataCompressionCache[cacheIndex].Dc.Tsc == 1 { + respMessage = compressTs(respMessage) + } resetDcCache(cacheIndex) case 2: return respMessage case 3: - dataCompressionCache[cacheIndex].SortedList = getSortedPaths(respMessage) - respMessage = compressPaths(respMessage, dataCompressionCache[cacheIndex].SortedList) + if dataCompressionCache[cacheIndex].Dc.Pc == 2 { + respMessage = compressPaths(respMessage, dataCompressionCache[cacheIndex].SortedList) + } + if dataCompressionCache[cacheIndex].Dc.Tsc == 1 { + respMessage = compressTs(respMessage) + } case 4: + dataCompressionCache[cacheIndex].SortedList = getSortedPaths(respMessage) dataCompressionCache[cacheIndex].ResponseHandling = 3 - default: return respMessage + if dataCompressionCache[cacheIndex].Dc.Tsc == 1 { + respMessage = compressTs(respMessage) + } } return respMessage } @@ -181,28 +226,116 @@ func getSortedPaths(respMessage string) []string { return nil } var paths []string - switch data := respMap["data"].(type) { + dataIf := respMap["data"] + switch data := dataIf.(type) { + case []interface{}: +// utils.Info.Println(data, "is []interface{}") + for i := 0; i < len(data); i++ { + for k, v := range data[i].(map[string]interface{}) { + if k == "path" { + paths = append(paths, v.(string)) + } + } + } case interface{}: - utils.Info.Println(data, "is interface{}") +// utils.Info.Println(data, "is interface{}") for k, v := range data.(map[string]interface{}) { if k == "path" { paths = append(paths, v.(string)) } } + default: + utils.Info.Println(data, "is of an unknown type") + } + sort.Strings(paths) + return paths +} + +func compressTs(respMessage string) string { +utils.Info.Printf("compressTs()") + respMap := make(map[string]interface{}) + if utils.MapRequest(respMessage, &respMap) != 0 { + utils.Error.Printf("compressTs():invalid JSON format=%s", respMessage) + return respMessage + } + var tsList []string + messageTs := respMap["ts"].(string) + dataIf := respMap["data"] + switch data := dataIf.(type) { case []interface{}: - utils.Info.Println(data, "is []interface{}") +// utils.Info.Println(data, "is []interface{}") for i := 0; i < len(data); i++ { for k, v := range data[i].(map[string]interface{}) { - if k == "path" { - paths = append(paths, v.(string)) + if k == "dp" { + tsList = append(tsList, getDpTsList(v)...) } } } + case interface{}: +// utils.Info.Println(data, "is interface{}") + for k, v := range data.(map[string]interface{}) { + if k == "dp" { + tsList = getDpTsList(v) + } + } default: utils.Info.Println(data, "is of an unknown type") } - sort.Strings(paths) - return paths + respMessage = replaceTs(respMessage, messageTs, tsList) + return respMessage +} + +func getDpTsList(dpMap interface{}) []string { + var tsList []string + switch dp := dpMap.(type) { + case []interface{}: +// utils.Info.Println(dp, "is []interface{}") + for i := 0; i < len(dp); i++ { + for k, v := range dp[i].(map[string]interface{}) { + if k == "ts" { + tsList = append(tsList, v.(string)) + } + } + } + case interface{}: +// utils.Info.Println(dp, "is interface{}") + for k, v := range dp.(map[string]interface{}) { + if k == "ts" { + tsList = append(tsList, v.(string)) + } + } + default: + utils.Info.Println(dp, "is of an unknown type") + } + return tsList +} + +func replaceTs(respMessage string, messageTs string, tsList []string) string { + tsRef, _ := time.Parse(time.RFC3339, messageTs) + refMs := tsRef.UnixMilli() + for i := 0; i < len(tsList); i++ { + tsDp, _ := time.Parse(time.RFC3339, tsList[i]) + dpMs := tsDp.UnixMilli() + diffMs := refMs - dpMs + if diffMs > 999999999 || diffMs < -999999999 { + continue // keep iso time + } + if diffMs == 0 { // replace 2nd instance + firstTsIndex := strings.Index(respMessage, tsList[i]) + len(tsList[i]) + 1 + respMessage = respMessage[:firstTsIndex] + strings.Replace(respMessage[firstTsIndex:], tsList[i], timeDiff(int(diffMs)), 1) + } else { + respMessage = strings.Replace(respMessage, tsList[i], timeDiff(int(diffMs)), 1) + } + } + return respMessage +} + +func timeDiff(diffMs int) string { + if diffMs > 0 { + return "-" + strconv.Itoa(diffMs) + } else { + return "+" + strconv.Itoa(-diffMs) + } } func compressPaths(respMessage string, sortedList []string) string { @@ -225,7 +358,7 @@ func WsMgrInit(mgrId int, transportMgrChan chan string) { select { case respMessage := <-transportMgrChan: utils.Info.Printf("WS mgr hub: Response from server core:%s", respMessage) - respMessage = checkForDecompression(respMessage) + respMessage = checkCompressionResponse(respMessage) RemoveRoutingForwardResponse(respMessage, transportMgrChan) continue case reqMessage = <-wsClientChan[0]: clientId = 0 @@ -249,7 +382,7 @@ func WsMgrInit(mgrId int, transportMgrChan chan string) { case reqMessage = <-wsClientChan[18]: clientId = 18 case reqMessage = <-wsClientChan[19]: clientId = 19 } - checkForCompression(reqMessage) + checkCompressionRequest(reqMessage) utils.AddRoutingForwardRequest(reqMessage, mgrId, clientId, transportMgrChan) } } diff --git a/utils/grcputils.go b/utils/grcputils.go index 682c2e46..561b8a1f 100644 --- a/utils/grcputils.go +++ b/utils/grcputils.go @@ -335,9 +335,9 @@ func createPbFilter(index int, filterExpression map[string]interface{}, filter * filter.FilterExp[index].Value.ValueHistory = &pb.FilterExpressions_FilterExpression_FilterValue_HistoryValue{} filter.FilterExp[index].Value.ValueHistory.TimePeriod = filterExpression["parameter"].(string) case pb.FilterExpressions_FilterExpression_METADATA: - Warning.Printf("Filter type is not supported by protobuf encoding.") + Warning.Printf("Filter variant is not supported by protobuf encoding.") default: - Error.Printf("Filter type is unknown.") + Error.Printf("Filter variant is unknown.") } } @@ -422,7 +422,7 @@ func getFilterVariant(filterVariant string) pb.FilterExpressions_FilterExpressio case "metadata": return pb.FilterExpressions_FilterExpression_METADATA } - return pb.FilterExpressions_FilterExpression_METADATA + 100 //undefined filter type + return pb.FilterExpressions_FilterExpression_METADATA + 100 //undefined filter variant } func createSubscribeRequestPb(protoMessage *pb.SubscribeRequestMessage, messageMap map[string]interface{}) { @@ -694,7 +694,7 @@ func synthesizeFilter(filterExp *pb.FilterExpressions_FilterExpression) string { fType = "metadata" value = getJsonFilterValueMetadata(filterExp) } - return `{"type":"` + fType + `","parameter":` + value + `}` + return `{"variant":"` + fType + `","parameter":` + value + `}` } func getJsonFilterValuePaths(filterExp *pb.FilterExpressions_FilterExpression) string {