diff --git a/client/client-1.0/Javascript/appclient_commands.txt b/client/client-1.0/Javascript/appclient_commands.txt index c6d59f32..4b80a7b3 100644 --- a/client/client-1.0/Javascript/appclient_commands.txt +++ b/client/client-1.0/Javascript/appclient_commands.txt @@ -12,6 +12,9 @@ Get request: {"action":"get","path":"Server.Support.Protocol","requestId":"133"} {"action":"get","path":"Vehicle.Chassis.AxleCount","requestId":"321"} +Get Request with data compression: +{"action":"get","path":"Vehicle.Speed", "dc":"2+1","requestId":"232"} + Get request for historic data: {"action":"get","path":"Vehicle.Acceleration.Longitudinal","filter":{"variant":"history","parameter":"P2DT12H"},"requestId":"234"} @@ -40,6 +43,7 @@ Subscribe request: {"action":"subscribe","path":"Vehicle/Cabin/Door/Row1/PassengerSide/IsOpen","filter":{"variant":"range","parameter":[{"logic-op":"gt","boundary":"500"},{"logic-op":"lt","boundary":"510"}]},"requestId":"265"} {"action":"subscribe","path":"Vehicle.Powertrain.Transmission.Speed","filter":{"variant":"curvelog","parameter":{"maxerr":"2","bufsize":"100"}},"requestId":"275"} {"action":"subscribe","path":"Vehicle","filter":[{"variant":"paths","parameter":["CurrentLocation.Latitude", "CurrentLocation.Longitude"]}, {"variant":"curvelog","parameter":{"maxerr":"0.00001","bufsize":"100"}}],"requestId":"285"} +{"action":"subscribe","path":"Vehicle.CurrentLocation","filter":[{"variant":"paths","parameter":["Latitude", "Longitude"]}, {"variant":"timebased","parameter":{"period":"3000"}}], "dc":"2+1","requestId":"286"} Unsubscribe request: {"action":"unsubscribe","subscriptionId":"1","requestId":"240"} diff --git a/client/client-1.0/compress_client/requests.json b/client/client-1.0/compress_client/requests.json index ba62e3c7..6532d522 100755 --- a/client/client-1.0/compress_client/requests.json +++ b/client/client-1.0/compress_client/requests.json @@ -1,3 +1,6 @@ -{"request":[{"action":"get","path":"Vehicle/Cabin/Door/Row1/DriverSide/IsOpen","requestId":"232"},{"action":"get","path":"Vehicle.Acceleration.Longitudinal","requestId":"233"}, -{"action":"get","path":"Vehicle/ADAS","filter":{"type":"paths","parameter":["ABS/*","CruiseControl/Error"]},"requestId":"237"}, -{"action":"subscribe","path":"Vehicle/Cabin/Door/Row1/PassengerSide/IsOpen","filter":{"type":"timebased","parameter":{"period":"3"}},"requestId":"246"}]} +{"request":[ +{"action":"get","path":"Vehicle/Cabin/Door/Row1/DriverSide/IsOpen","requestId":"232"},{"action":"get","path":"Vehicle.Acceleration.Longitudinal","requestId":"233"}, +{"action":"get","path":"Vehicle/ADAS","filter":{"variant":"paths","parameter":["ABS/*","CruiseControl/Error"]},"requestId":"237"}, +{"action":"subscribe","path":"Vehicle/Cabin/Door/Row1/PassengerSide/IsOpen","filter":{"variant":"timebased","parameter":{"period":"3"}},"requestId":"246"}, +{"action":"subscribe","path":"Vehicle.CurrentLocation","filter":[{"variant":"paths","parameter":["Latitude", "Longitude"]}, {"variant":"timebased","parameter":{"period":"3000"}}], "dc":"2+1","requestId":"286"} +]} diff --git a/client/client-1.0/grpc_client/README.md b/client/client-1.0/grpc_client/README.md index a699498e..5ce5781d 100644 --- a/client/client-1.0/grpc_client/README.md +++ b/client/client-1.0/grpc_client/README.md @@ -11,13 +11,14 @@ To run: ./grpc_client -The gRPC client UI provides a choice of four different request that can be issued: +The gRPC client UI provides a choice of four different request that can be issued, e.g.: ``` {"action":"get","path":"Vehicle/Chassis/Accelerator/PedalPosition","requestId":"232"} -{"action":"subscribe","path":"Vehicle/Speed","filter":{"type":"timebased","parameter":{"period":"100"}},"requestId":"246"} +{"action":"subscribe","path":"Vehicle/Speed","filter":{"varian":"timebased","parameter":{"period":"100"}},"requestId":"246"} {"action":"unsubscribe","subscriptionId":"1","requestId":"240"} {"action":"set", "path":"Vehicle/Body/Lights/IsLeftIndicatorOn", "value":"999", "requestId":"245"} ``` +The commands can be changed in the source code, in the parameter commandList, followed by rebuilding. These can be issued multiple times, but there is a limitation in that the unsubscribe has a static subscriptionID that only applies to the first started subscription. diff --git a/client/client-1.0/grpc_client/grpc_client.go b/client/client-1.0/grpc_client/grpc_client.go index 39a0c9c9..00642a2d 100644 --- a/client/client-1.0/grpc_client/grpc_client.go +++ b/client/client-1.0/grpc_client/grpc_client.go @@ -41,20 +41,20 @@ func initCommandList() { commandList[0] = `{"action":"get","path":"Vehicle/Speed","requestId":"232"}` commandList[1] = `{"action":"set", "path":"Vehicle/Body/Lights/IsLeftIndicatorOn", "value":"true", "requestId":"245"}` - commandList[2] = `{"action":"subscribe","path":"Vehicle.Speed","filter":{"type":"curvelog","parameter":{"maxerr":"2","bufsize":"15"}},"requestId":"285"}` -/* commandList[2] = `{"action":"subscribe","path":"Vehicle","filter":[{"type":"paths","parameter":["Speed", "Chassis.Accelerator.PedalPosition"]},{"type":"timebased","parameter": {"period":"5000"}}],"requestId":"246"}`*/ - commandList[3] = `{"action":"unsubscribe","subscriptionId":"X","requestId":"240"}` // X is replaced according to input + commandList[2] = `{"action":"subscribe","path":"Vehicle.CurrentLocation","filter":[{"variant":"paths","parameter":["Latitude", "Longitude"]}, {"variant":"timebased","parameter":{"period":"3000"}}], "dc":"2+1","requestId":"286"}` + commandList[3] = `{"action":"unsubscribe","subscriptionId":"X","requestId":"240"}` // X is replaced according to input, e.g. 23 sets X=2 /* different variants - commandList[2] = `{"action":"subscribe","path":"Vehicle","filter":[{"type":"paths","parameter":["Speed","CurrentLocation.Latitude", "CurrentLocation.Longitude"]}, {"type":"timebased","parameter":{"period":"100"}}],"requestId":"285"}` - commandList[1] = `{"action":"subscribe","path":"Vehicle/Speed","filter":{"type":"timebased","parameter":{"period":"100"}},"requestId":"246"}` + commandList[2] = `{"action":"subscribe","path":"Vehicle.Speed","filter":{"variant":"curvelog","parameter":{"maxerr":"2","bufsize":"15"}},"requestId":"285"}` + commandList[2] = `{"action":"subscribe","path":"Vehicle","filter":[{"variant":"paths","parameter":["Speed","CurrentLocation.Latitude", "CurrentLocation.Longitude"]}, {"variant":"timebased","parameter":{"period":"100"}}],"requestId":"285"}` + commandList[1] = `{"action":"subscribe","path":"Vehicle/Speed","filter":{"variant":"timebased","parameter":{"period":"100"}},"requestId":"246"}` commandList[0] = `{"action":"get","path":"Vehicle/Cabin/Door/Row1/Right/IsOpen","requestId":"232"}` - commandList[0] = `{"action":"get","path":"Vehicle/Cabin/Door","filter":{"type":"paths","parameter":"*.*.IsOpen"},"requestId":"235"}` - commandList[1] = `{"action":"subscribe","path":"Vehicle/Cabin/Door/Row1/Right/IsOpen","filter":{"type":"timebased","parameter":{"period":"3000"}},"requestId":"246"}` - commandList[1] = `{"action":"subscribe","path":"Vehicle","filter":{"type":"paths","parameter":["Speed", "Chassis.Accelerator.PedalPosition"]},"requestId":"246"}` + commandList[0] = `{"action":"get","path":"Vehicle/Cabin/Door","filter":{"variant":"paths","parameter":"*.*.IsOpen"},"requestId":"235"}` + commandList[1] = `{"action":"subscribe","path":"Vehicle/Cabin/Door/Row1/Right/IsOpen","filter":{"variant":"timebased","parameter":{"period":"3000"}},"requestId":"246"}` + commandList[1] = `{"action":"subscribe","path":"Vehicle","filter":{"variant":"paths","parameter":["Speed", "Chassis.Accelerator.PedalPosition"]},"requestId":"246"}` commandList[1] = `{"action":"subscribe","path":"Vehicle/Speed","requestId":"258"}` - commandList[1] = `{"action":"subscribe","path":"Vehicle","filter":[{"type":"paths","parameter":["Body.Lights.IsLeftIndicatorOn", "Chassis.Accelerator.PedalPosition"]}, {"type":"change","parameter":{"logic-op":"ne", "diff": "0"}}],"requestId":"285"}` - commandList[1] = {"action":"subscribe","path":"Vehicle","filter":{"type":"paths","parameter":["Speed", "Chassis.Accelerator.PedalPosition"]},"requestId":"246"}` + commandList[1] = `{"action":"subscribe","path":"Vehicle","filter":[{"variant":"paths","parameter":["Body.Lights.IsLeftIndicatorOn", "Chassis.Accelerator.PedalPosition"]}, {"variant":"change","parameter":{"logic-op":"ne", "diff": "0"}}],"requestId":"285"}` + commandList[1] = {"action":"subscribe","path":"Vehicle","filter":{"variant":"paths","parameter":["Speed", "Chassis.Accelerator.PedalPosition"]},"requestId":"246"}` */ } diff --git a/server/ServerSignalSpecification.vspec b/resources/ServerSignalSpecification.vspec similarity index 78% rename from server/ServerSignalSpecification.vspec rename to resources/ServerSignalSpecification.vspec index 994aa59a..5eabf9d0 100644 --- a/server/ServerSignalSpecification.vspec +++ b/resources/ServerSignalSpecification.vspec @@ -35,9 +35,27 @@ Server.Support.Security: Server.Support.Filter: type: attribute datatype: string[] - default: ["timebased", "change", "range", "curvelog", "paths"] + default: ["timebased", "change", "range", "curvelog", "paths", "metadata"] description: List of supported filter features. +Server.Support.Encoding: + type: attribute + datatype: string[] + default: ["protobuf"] + description: List of supported payload encoding features. + +Server.Support.Filetransfer: + type: attribute + datatype: string[] + default: ["upload", "download"] + description: List of supported file transfer features. + +Server.Support.DataCompression: + type: attribute + datatype: string[] + default: ["2+1"] + description: List of supported data compression features. + Server.Config: type: branch description: Top branch declaring the configuration of server supported features. @@ -64,6 +82,22 @@ Server.Config.Protocol.Websocket: type: branch description: Top branch for the server supported Websocket protocol. +Server.Config.Protocol.Websocket.FileTransfer: + type: branch + description: Websocket filetransfer. + +Server.Config.Protocol.Websocket.FileTransfer.Mode: + type: attribute + datatype: string[] + default: ["upload", "download"] + description: Websocket protocol port number for the primary payload format. + +Server.Config.Protocol.Websocket.FileTransfer.PortNum: + type: attribute + datatype: uint32 + default: 8002 + description: Websocket protocol port number for the primary payload format. + Server.Config.Protocol.Websocket.Primary: type: branch description: Websocket configuration for the primary payload format. @@ -78,10 +112,11 @@ Server.Config.Protocol.Websocket.Protobuf: type: branch description: Websocket configuration for the protobuf encoded payload format. -Server.Config.Protocol.Websocket.Protobuf.PortNum: +Server.Config.Protocol.Websocket.Protobuf.SubProtocol: type: attribute - datatype: uint32 - description: Websocket protocol port number for the protobuf encoded payload format. + datatype: string + default: "VISS-protoenc" + description: Websocket sub-protocol for the protobuf encoded payload format. Server.Config.Protocol.Mqtt: type: branch @@ -122,6 +157,7 @@ Server.Config.Protocol.Grpc.Protobuf: Server.Config.Protocol.Grpc.Protobuf.PortNum: type: attribute datatype: uint32 + default: 5443 description: gRPC port number for the protobuf encoded payload format. Server.Config.AccessControl: diff --git a/server/vissv2server/forest/server.binary b/server/vissv2server/forest/server.binary index 3a0f75e4..cd987c23 100644 Binary files a/server/vissv2server/forest/server.binary and b/server/vissv2server/forest/server.binary differ diff --git a/server/vissv2server/wsMgr/wsMgr.go b/server/vissv2server/wsMgr/wsMgr.go index da837df3..6688a816 100644 --- a/server/vissv2server/wsMgr/wsMgr.go +++ b/server/vissv2server/wsMgr/wsMgr.go @@ -1,4 +1,5 @@ /** +* (C) 2024 Ford Motor Company * (C) 2022 Geotab Inc * (C) 2019 Volvo Cars * @@ -11,6 +12,9 @@ package wsMgr import ( utils "github.com/covesa/vissr/utils" "strings" + "strconv" + "sort" + "time" ) // the number of channel array elements sets the limit for max number of parallel WS app clients @@ -21,6 +25,27 @@ var clientBackendChan []chan string var wsClientIndex int const isClientLocal = false +/* +* responseHandling values, instructs server about possible path compression: +1: compress path, delete cache entry (get on single path) +2. do not compress path, delete cache entry (get on multiple paths) <- This shall not be saved in cache. +3. compress path, do not delete cache entry (subscribe, single path, but also for multiple paths after first time) +4. do not compress path, do not delete cache entry (subscribe on multiple paths) <- This is changed to 3 after first time +*/ +type CompressionType struct { + Pc uint8 + Tsc uint8 +} + +type DataCompressionElement struct { + PayloadId string + Dc CompressionType + ResponseHandling int //possible values 1..4, see description + SortedList []string +} +var dataCompressionCache []DataCompressionElement +const DCCACHESIZE = 20 + func initChannels() { wsClientChan = make([]chan string, NUMOFWSCLIENTS) clientBackendChan = make([]chan string, NUMOFWSCLIENTS) @@ -38,59 +63,326 @@ func RemoveRoutingForwardResponse(response string, transportMgrChan chan string) } } +func checkCompressionRequest(reqMessage string) { + if strings.Contains(reqMessage, `"dc"`) { + dcValue, payloadId, singleResponse, singlePath := getDcConfig(reqMessage) + if len(dcValue) > 0 { + responseHandling := 1 // singleResponse && singlePath + if singleResponse && !singlePath { + responseHandling = 2 + } else if !singleResponse && singlePath { + responseHandling = 3 + } else if !singleResponse && !singlePath { + responseHandling = 4 + } + dcCacheInsert(payloadId, dcValue, responseHandling) + } + } +} + +func getDcConfig(reqMessage string) (string, string, bool, bool) { + var dcValue, payloadId string + isGet := false + singlePath := false + dcValue = getValueForKey(reqMessage, `"dc"`) + isGet = strings.Contains(reqMessage, `"get"`) + singlePath = !strings.Contains(reqMessage, `"paths"`) + payloadId = getValueForKey(reqMessage, `"requestId"`) + return dcValue, payloadId, isGet, singlePath +} + +func getValueForKey(reqMessage string, key string) string { + var keyValue string + keyIndex := strings.Index(reqMessage, key) + len(key) + hyphenIndex1 := strings.Index(reqMessage[keyIndex:], `"`) + if hyphenIndex1 != -1 { + hyphenIndex2 := strings.Index(reqMessage[keyIndex+hyphenIndex1+1:], `"`) + if hyphenIndex2 != -1 { + keyValue = reqMessage[keyIndex+hyphenIndex1+1:keyIndex+hyphenIndex1+1+hyphenIndex2] + } + } + return keyValue +} + +func initDcCache() { + dataCompressionCache = make([]DataCompressionElement, DCCACHESIZE) + for i := 0; i < DCCACHESIZE; i++ { + dataCompressionCache[i].ResponseHandling = -1 + } +} + +func dcCacheInsert(payloadId string, dcValue string, responseHandling int) { + for i := 0; i < DCCACHESIZE; i++ { + if dataCompressionCache[i].ResponseHandling == -1 { + if setDcValue(dcValue, i) { + dataCompressionCache[i].ResponseHandling = responseHandling + dataCompressionCache[i].PayloadId = payloadId + } + return + } + } +} + +func setDcValue(dcValue string, cacheIndex int) bool { + isCached := false + plusIndex := strings.Index(dcValue, "+") + if plusIndex != -1 { + pc, err := strconv.Atoi(dcValue[:plusIndex]) + if err == nil && (pc == 2 || pc == 0) { // only request local compression is supported + dataCompressionCache[cacheIndex].Dc.Pc = uint8(pc) + tsc, err := strconv.Atoi(dcValue[plusIndex+1:]) + if err == nil && (tsc == 1 || tsc == 0) { // message local ts compression supported + dataCompressionCache[cacheIndex].Dc.Tsc = uint8(tsc) + isCached = true + } + } + } + return isCached +} + +func updatepayloadId(payloadId1 string, payloadId2 string) { + for i := 0; i < DCCACHESIZE; i++ { + if dataCompressionCache[i].PayloadId == payloadId1 { + dataCompressionCache[i].PayloadId = payloadId2 + } + } +} + +func getDcCacheIndex(payloadId string) int { + for i := 0; i < DCCACHESIZE; i++ { + if dataCompressionCache[i].PayloadId == payloadId { + return i + } + } + return -1 +} + +func resetDcCache(cacheIndex int) { + dataCompressionCache[cacheIndex].ResponseHandling = -1 + dataCompressionCache[cacheIndex].SortedList = nil +} + +func checkCompressionResponse(respMessage string) string { + var payloadId string + isUnsubscribe := false + if strings.Contains(respMessage, `"error"`) { + return respMessage + } + switch getValueForKey(respMessage, `"action"`) { + case "unsubscribe": + isUnsubscribe = true + fallthrough + case "get": + payloadId = getValueForKey(respMessage, `"requestId"`) + case "subscribe": + payloadId1 := getValueForKey(respMessage, `"requestId"`) + payloadId2 := getValueForKey(respMessage, `"subscriptionId"`) + updatepayloadId(payloadId1, payloadId2) + + case "subscription": + payloadId = getValueForKey(respMessage, `"subscriptionId"`) + default: return respMessage + } + cacheIndex := getDcCacheIndex(payloadId) + if cacheIndex == -1 { + return respMessage + } + if isUnsubscribe { + resetDcCache(cacheIndex) + return respMessage + } + switch dataCompressionCache[cacheIndex].ResponseHandling { + case 1: + if dataCompressionCache[cacheIndex].Dc.Pc == 2 { + dataCompressionCache[cacheIndex].SortedList = getSortedPaths(respMessage) + respMessage = compressPaths(respMessage, dataCompressionCache[cacheIndex].SortedList) + } + if dataCompressionCache[cacheIndex].Dc.Tsc == 1 { + respMessage = compressTs(respMessage) + } + resetDcCache(cacheIndex) + case 2: return respMessage + case 3: + if dataCompressionCache[cacheIndex].Dc.Pc == 2 { + respMessage = compressPaths(respMessage, dataCompressionCache[cacheIndex].SortedList) + } + if dataCompressionCache[cacheIndex].Dc.Tsc == 1 { + respMessage = compressTs(respMessage) + } + case 4: + dataCompressionCache[cacheIndex].SortedList = getSortedPaths(respMessage) + dataCompressionCache[cacheIndex].ResponseHandling = 3 + if dataCompressionCache[cacheIndex].Dc.Tsc == 1 { + respMessage = compressTs(respMessage) + } + } + return respMessage +} + +func getSortedPaths(respMessage string) []string { + respMap := make(map[string]interface{}) + if utils.MapRequest(respMessage, &respMap) != 0 { + utils.Error.Printf("getSortedPaths():invalid JSON format=%s", respMessage) + return nil + } + var paths []string + dataIf := respMap["data"] + switch data := dataIf.(type) { + case []interface{}: +// utils.Info.Println(data, "is []interface{}") + for i := 0; i < len(data); i++ { + for k, v := range data[i].(map[string]interface{}) { + if k == "path" { + paths = append(paths, v.(string)) + } + } + } + case interface{}: +// utils.Info.Println(data, "is interface{}") + for k, v := range data.(map[string]interface{}) { + if k == "path" { + paths = append(paths, v.(string)) + } + } + default: + utils.Info.Println(data, "is of an unknown type") + } + sort.Strings(paths) + return paths +} + +func compressTs(respMessage string) string { +utils.Info.Printf("compressTs()") + respMap := make(map[string]interface{}) + if utils.MapRequest(respMessage, &respMap) != 0 { + utils.Error.Printf("compressTs():invalid JSON format=%s", respMessage) + return respMessage + } + var tsList []string + messageTs := respMap["ts"].(string) + dataIf := respMap["data"] + switch data := dataIf.(type) { + case []interface{}: +// utils.Info.Println(data, "is []interface{}") + for i := 0; i < len(data); i++ { + for k, v := range data[i].(map[string]interface{}) { + if k == "dp" { + tsList = append(tsList, getDpTsList(v)...) + } + } + } + case interface{}: +// utils.Info.Println(data, "is interface{}") + for k, v := range data.(map[string]interface{}) { + if k == "dp" { + tsList = getDpTsList(v) + } + } + default: + utils.Info.Println(data, "is of an unknown type") + } + respMessage = replaceTs(respMessage, messageTs, tsList) + return respMessage +} + +func getDpTsList(dpMap interface{}) []string { + var tsList []string + switch dp := dpMap.(type) { + case []interface{}: +// utils.Info.Println(dp, "is []interface{}") + for i := 0; i < len(dp); i++ { + for k, v := range dp[i].(map[string]interface{}) { + if k == "ts" { + tsList = append(tsList, v.(string)) + } + } + } + case interface{}: +// utils.Info.Println(dp, "is interface{}") + for k, v := range dp.(map[string]interface{}) { + if k == "ts" { + tsList = append(tsList, v.(string)) + } + } + default: + utils.Info.Println(dp, "is of an unknown type") + } + return tsList +} + +func replaceTs(respMessage string, messageTs string, tsList []string) string { + tsRef, _ := time.Parse(time.RFC3339, messageTs) + refMs := tsRef.UnixMilli() + for i := 0; i < len(tsList); i++ { + tsDp, _ := time.Parse(time.RFC3339, tsList[i]) + dpMs := tsDp.UnixMilli() + diffMs := refMs - dpMs + if diffMs > 999999999 || diffMs < -999999999 { + continue // keep iso time + } + if diffMs == 0 { // replace 2nd instance + firstTsIndex := strings.Index(respMessage, tsList[i]) + len(tsList[i]) + 1 + respMessage = respMessage[:firstTsIndex] + strings.Replace(respMessage[firstTsIndex:], tsList[i], timeDiff(int(diffMs)), 1) + } else { + respMessage = strings.Replace(respMessage, tsList[i], timeDiff(int(diffMs)), 1) + } + } + return respMessage +} + +func timeDiff(diffMs int) string { + if diffMs > 0 { + return "-" + strconv.Itoa(diffMs) + } else { + return "+" + strconv.Itoa(-diffMs) + } +} + +func compressPaths(respMessage string, sortedList []string) string { + for i := 0; i < len(sortedList); i++ { + respMessage = strings.Replace(respMessage, sortedList[i], strconv.Itoa(i), 1) + } + return respMessage +} + func WsMgrInit(mgrId int, transportMgrChan chan string) { + var reqMessage string + var clientId int utils.ReadTransportSecConfig() initChannels() - - go utils.WsServer{ClientBackendChannel: clientBackendChan}.InitClientServer(utils.MuxServer[1], wsClientChan, mgrId, &wsClientIndex) // go routine needed due to listenAndServe call... - + initDcCache() + go utils.WsServer{ClientBackendChannel: clientBackendChan}.InitClientServer(utils.MuxServer[1], wsClientChan, mgrId, &wsClientIndex) utils.Info.Println("WS manager data session initiated.") for { select { case respMessage := <-transportMgrChan: utils.Info.Printf("WS mgr hub: Response from server core:%s", respMessage) + respMessage = checkCompressionResponse(respMessage) RemoveRoutingForwardResponse(respMessage, transportMgrChan) - case reqMessage := <-wsClientChan[0]: - utils.AddRoutingForwardRequest(reqMessage, mgrId, 0, transportMgrChan) - case reqMessage := <-wsClientChan[1]: - utils.AddRoutingForwardRequest(reqMessage, mgrId, 1, transportMgrChan) - case reqMessage := <-wsClientChan[2]: - utils.AddRoutingForwardRequest(reqMessage, mgrId, 2, transportMgrChan) - case reqMessage := <-wsClientChan[3]: - utils.AddRoutingForwardRequest(reqMessage, mgrId, 3, transportMgrChan) - case reqMessage := <-wsClientChan[4]: - utils.AddRoutingForwardRequest(reqMessage, mgrId, 4, transportMgrChan) - case reqMessage := <-wsClientChan[5]: - utils.AddRoutingForwardRequest(reqMessage, mgrId, 5, transportMgrChan) - case reqMessage := <-wsClientChan[6]: - utils.AddRoutingForwardRequest(reqMessage, mgrId, 6, transportMgrChan) - case reqMessage := <-wsClientChan[7]: - utils.AddRoutingForwardRequest(reqMessage, mgrId, 7, transportMgrChan) - case reqMessage := <-wsClientChan[8]: - utils.AddRoutingForwardRequest(reqMessage, mgrId, 8, transportMgrChan) - case reqMessage := <-wsClientChan[9]: - utils.AddRoutingForwardRequest(reqMessage, mgrId, 9, transportMgrChan) - case reqMessage := <-wsClientChan[10]: - utils.AddRoutingForwardRequest(reqMessage, mgrId, 10, transportMgrChan) - case reqMessage := <-wsClientChan[11]: - utils.AddRoutingForwardRequest(reqMessage, mgrId, 11, transportMgrChan) - case reqMessage := <-wsClientChan[12]: - utils.AddRoutingForwardRequest(reqMessage, mgrId, 12, transportMgrChan) - case reqMessage := <-wsClientChan[13]: - utils.AddRoutingForwardRequest(reqMessage, mgrId, 13, transportMgrChan) - case reqMessage := <-wsClientChan[14]: - utils.AddRoutingForwardRequest(reqMessage, mgrId, 14, transportMgrChan) - case reqMessage := <-wsClientChan[15]: - utils.AddRoutingForwardRequest(reqMessage, mgrId, 15, transportMgrChan) - case reqMessage := <-wsClientChan[16]: - utils.AddRoutingForwardRequest(reqMessage, mgrId, 16, transportMgrChan) - case reqMessage := <-wsClientChan[17]: - utils.AddRoutingForwardRequest(reqMessage, mgrId, 17, transportMgrChan) - case reqMessage := <-wsClientChan[18]: - utils.AddRoutingForwardRequest(reqMessage, mgrId, 18, transportMgrChan) - case reqMessage := <-wsClientChan[19]: - utils.AddRoutingForwardRequest(reqMessage, mgrId, 19, transportMgrChan) + continue + case reqMessage = <-wsClientChan[0]: clientId = 0 + case reqMessage = <-wsClientChan[1]: clientId = 1 + case reqMessage = <-wsClientChan[2]: clientId = 2 + case reqMessage = <-wsClientChan[3]: clientId = 3 + case reqMessage = <-wsClientChan[4]: clientId = 4 + case reqMessage = <-wsClientChan[5]: clientId = 5 + case reqMessage = <-wsClientChan[6]: clientId = 6 + case reqMessage = <-wsClientChan[7]: clientId = 7 + case reqMessage = <-wsClientChan[8]: clientId = 8 + case reqMessage = <-wsClientChan[9]: clientId = 9 + case reqMessage = <-wsClientChan[10]: clientId = 10 + case reqMessage = <-wsClientChan[11]: clientId = 11 + case reqMessage = <-wsClientChan[12]: clientId = 12 + case reqMessage = <-wsClientChan[13]: clientId = 13 + case reqMessage = <-wsClientChan[14]: clientId = 14 + case reqMessage = <-wsClientChan[15]: clientId = 15 + case reqMessage = <-wsClientChan[16]: clientId = 16 + case reqMessage = <-wsClientChan[17]: clientId = 17 + case reqMessage = <-wsClientChan[18]: clientId = 18 + case reqMessage = <-wsClientChan[19]: clientId = 19 } + checkCompressionRequest(reqMessage) + utils.AddRoutingForwardRequest(reqMessage, mgrId, clientId, transportMgrChan) } } diff --git a/utils/grcputils.go b/utils/grcputils.go index 682c2e46..561b8a1f 100644 --- a/utils/grcputils.go +++ b/utils/grcputils.go @@ -335,9 +335,9 @@ func createPbFilter(index int, filterExpression map[string]interface{}, filter * filter.FilterExp[index].Value.ValueHistory = &pb.FilterExpressions_FilterExpression_FilterValue_HistoryValue{} filter.FilterExp[index].Value.ValueHistory.TimePeriod = filterExpression["parameter"].(string) case pb.FilterExpressions_FilterExpression_METADATA: - Warning.Printf("Filter type is not supported by protobuf encoding.") + Warning.Printf("Filter variant is not supported by protobuf encoding.") default: - Error.Printf("Filter type is unknown.") + Error.Printf("Filter variant is unknown.") } } @@ -422,7 +422,7 @@ func getFilterVariant(filterVariant string) pb.FilterExpressions_FilterExpressio case "metadata": return pb.FilterExpressions_FilterExpression_METADATA } - return pb.FilterExpressions_FilterExpression_METADATA + 100 //undefined filter type + return pb.FilterExpressions_FilterExpression_METADATA + 100 //undefined filter variant } func createSubscribeRequestPb(protoMessage *pb.SubscribeRequestMessage, messageMap map[string]interface{}) { @@ -694,7 +694,7 @@ func synthesizeFilter(filterExp *pb.FilterExpressions_FilterExpression) string { fType = "metadata" value = getJsonFilterValueMetadata(filterExp) } - return `{"type":"` + fType + `","parameter":` + value + `}` + return `{"variant":"` + fType + `","parameter":` + value + `}` } func getJsonFilterValuePaths(filterExp *pb.FilterExpressions_FilterExpression) string {