Skip to content

Commit

Permalink
Merge pull request #33 from COVESA/wscsv_client
Browse files Browse the repository at this point in the history
Wscsv client
  • Loading branch information
UlfBj authored Apr 29, 2024
2 parents ca3d72c + b243a87 commit bc93099
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 25 deletions.
2 changes: 1 addition & 1 deletion Dockerfile.agtserver
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

ARG GO_VERSION=1.22.1
ARG VSSTREE_NAME="vss_vissv2.binary"
ARG BUILD_IMAGE="golang:1.22.1-bookworm"
ARG BUILD_IMAGE="golang:latest"
ARG RUNTIME_IMAGE="debian:bullseye-slim"

#----------------------Builder-----------------------
Expand Down
4 changes: 2 additions & 2 deletions Dockerfile.rlserver
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

ARG GO_VERSION=1.22.1
ARG VSSTREE_NAME="vss_vissv2.binary"
ARG BUILD_IMAGE="golang:1.22.1-bookworm"
ARG BUILD_IMAGE="golang:latest"
ARG RUNTIME_IMAGE="debian:bullseye-slim"

#----------------------Builder-----------------------
Expand Down Expand Up @@ -77,7 +77,7 @@ COPY --from=builder /build/server/agt_server/agt_public_key.rsa .
ENTRYPOINT ["/app/vissv2server","-s","redis"]


FROM golang:latest as feeder
FROM golang:latest as feeder
USER root
WORKDIR /app
COPY --from=builder /build/bin/feeder-rl feeder
Expand Down
43 changes: 25 additions & 18 deletions client/client-1.0/csv_client/csv_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,14 +144,13 @@ func performNoneCommand(commandNumber int, conn *websocket.Conn, optionChannel c
fmt.Printf("Compression is not applied. If subscribe request is issued, response data will be saved in the file data.csv\n")

jsonResponse := string(getResponse(conn, []byte(requestList.Request[commandNumber])))
fmt.Printf("Response: %s\n", jsonResponse)
if strings.Contains(requestList.Request[commandNumber], "subscribe") == true {
subscriptionId := utils.ExtractSubscriptionId(jsonResponse)
unsubReq := `{"action":"unsubscribe", "subscriptionId":"` + subscriptionId + `"}`
unsubChannel <- unsubReq
maxArrayLen := 10000
valArray := make([]string, maxArrayLen)
tsArray := make([]string, maxArrayLen)
maxArrayLen := 10 // no of datapoints to save in csv + buffer size for last notification
var valArray []string //:= make([]string, maxArrayLen)
var tsArray []string //:= make([]string, maxArrayLen)
sessionDone := false
arrayIndex := 0
finalIterations := -1
Expand All @@ -170,12 +169,12 @@ func performNoneCommand(commandNumber int, conn *websocket.Conn, optionChannel c
finalIterations--
fmt.Printf("Notification: %s\n", jsonNotification)
if arrayIndex < maxArrayLen {
arrayIndex = storeinArrays(jsonNotification, valArray, tsArray, arrayIndex)
arrayIndex = storeinArrays(jsonNotification, &valArray, &tsArray, arrayIndex)
} else {
fmt.Printf("Maximum number of datapoints are saved. Recording terminated.\n")
}
}
if sessionDone == true && finalIterations == 0 {
if sessionDone == true && finalIterations <= 0 {
fmt.Printf("Number of datapoints saved: %d\n", arrayIndex)
saveInCsv(valArray, tsArray, arrayIndex)
return
Expand All @@ -189,13 +188,13 @@ func performNoneCommand(commandNumber int, conn *websocket.Conn, optionChannel c
/* TODO: Current impl only supports notifications that do not contain data arrays ([]data).
* To support data[] (multiple signals in the notification), arrayIndex would need to be an array, and tsArray/valArray would have to be 2-dim arrays
*/
func storeinArrays(jsonNotification string, valArray []string, tsArray []string, arrayIndex int) int { // can be any of the four response formats that VISSv2 specifies...
func storeinArrays(jsonNotification string, valArray *[]string, tsArray *[]string, arrayIndex int) int { // can be any of the four response formats that VISSv2 specifies...
var notificationMap = make(map[string]interface{})
utils.MapRequest(jsonNotification, &notificationMap)
return processDataLevel1(notificationMap["data"], valArray, tsArray, arrayIndex)
}

func processDataLevel1(dataObject interface{}, valArray []string, tsArray []string, arrayIndex int) int { // data or []data level
func processDataLevel1(dataObject interface{}, valArray *[]string, tsArray *[]string, arrayIndex int) int { // data or []data level
switch vv := dataObject.(type) {
case []interface{}: // []data
// utils.Info.Println(dataObject, "is an array:, len=", strconv.Itoa(len(vv)))
Expand All @@ -209,7 +208,7 @@ func processDataLevel1(dataObject interface{}, valArray []string, tsArray []stri
return arrayIndex
}

func processDataLevel2(dataArray []interface{}, valArray []string, tsArray []string, arrayIndex int) int { // []data level
func processDataLevel2(dataArray []interface{}, valArray *[]string, tsArray *[]string, arrayIndex int) int { // []data level
for k, v := range dataArray {
switch vv := v.(type) {
case map[string]interface{}:
Expand All @@ -222,7 +221,7 @@ func processDataLevel2(dataArray []interface{}, valArray []string, tsArray []str
return arrayIndex
}

func processDataLevel3(data map[string]interface{}, valArray []string, tsArray []string, arrayIndex int) int { // inside data, dp or []dp level
func processDataLevel3(data map[string]interface{}, valArray *[]string, tsArray *[]string, arrayIndex int) int { // inside data, dp or []dp level
for k, v := range data {
switch vv := v.(type) {
case []interface{}: // []dp
Expand All @@ -240,7 +239,7 @@ func processDataLevel3(data map[string]interface{}, valArray []string, tsArray [
return arrayIndex
}

func processDataLevel4(dpArray []interface{}, valArray []string, tsArray []string, arrayIndex int) int { // []dp level
func processDataLevel4(dpArray []interface{}, valArray *[]string, tsArray *[]string, arrayIndex int) int { // []dp level
for k, v := range dpArray {
switch vv := v.(type) {
case map[string]interface{}:
Expand All @@ -253,15 +252,17 @@ func processDataLevel4(dpArray []interface{}, valArray []string, tsArray []strin
return arrayIndex
}

func processDataLevel5(dp map[string]interface{}, valArray []string, tsArray []string, arrayIndex int) int { // inside dp level
func processDataLevel5(dp map[string]interface{}, valArray *[]string, tsArray *[]string, arrayIndex int) int { // inside dp level
for k, v := range dp {
switch vv := v.(type) {
case string:
// utils.Info.Println(k, "is string", vv)
if k == "value" {
valArray[arrayIndex] = vv
// valArray[arrayIndex] = vv
*valArray = append(*valArray, vv)
} else if k == "ts" {
tsArray[arrayIndex] = vv
// tsArray[arrayIndex] = vv
*tsArray = append(*tsArray, vv)
}
default:
utils.Info.Println(k, "is of an unknown type")
Expand Down Expand Up @@ -383,12 +384,13 @@ func displayOptions() {

func readOption(conn *websocket.Conn, optionChannel chan string, unsubChannel chan string) {
for {
fmt.Scanf("%s", &commandNumber)
fmt.Scanf("%s\n", &commandNumber)
if commandNumber == "q" {
unsubReq := <-unsubChannel
fmt.Printf("Unsubscribe request: %s\n", unsubReq)
getResponse(conn, []byte(unsubReq))

// getResponse(conn, []byte(unsubReq))
// let the main thread handle the unsubscribe request
conn.WriteMessage(websocket.BinaryMessage, []byte(unsubReq))
}
optionChannel <- commandNumber
}
Expand Down Expand Up @@ -442,7 +444,7 @@ func main() {
}

conn := initVissV2WebSocket(compression)

defer conn.Close()
unsubChannel := make(chan string, 1)
optionChannel := make(chan string)
go readOption(conn, optionChannel, unsubChannel)
Expand All @@ -451,10 +453,15 @@ func main() {
displayOptions()
select {
case commandNumber = <-optionChannel:
fmt.Printf("command number is \n", commandNumber)
if commandNumber == "0" {
fmt.Printf("Exiting program\n")
return
}
}
if commandNumber == "q" {
continue
}
cNo, err := strconv.Atoi(commandNumber)
if err != nil {
fmt.Printf("Selected option not supported\n")
Expand Down
2 changes: 1 addition & 1 deletion feeder/feeder-rl/feeder-rl.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func initVSSInterfaceMgr(inputChan chan DomainData, outputChan chan DomainData)
select {
case outData := <-outputChan:
utils.Info.Printf("Data written to statestorage: Name=%s, Value=%s", outData.Name, outData.Value)
status := redisSet(feederClient, outData.Name, outData.Value, utils.GetRfcTime())
status := redisSet(feederClient, outData.Name, outData.Value /*utils.GetRfcTime()*/, utils.GetTimeInMilliSecs())
if status != 0 {
utils.Error.Printf("initVSSInterfaceMgr():Redis write failed")
}
Expand Down
15 changes: 12 additions & 3 deletions server/vissv2server/serviceMgr/curvelog.go
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,7 @@ func clCapture1dim(clChan chan CLPack, subscriptionId int, path string, bufSize
}
mcloseClSubId.Unlock()
dp := getVehicleData(path)
utils.Info.Printf("dp=%s", dp)
utils.Info.Printf("dp=%s", dp)
utils.MapRequest(dp, &dpMap)
if dpMap["value"].(string) == "Data-not-found" {
continue
Expand Down Expand Up @@ -622,8 +622,17 @@ func transformDataPoint(aRingBuffer *RingBuffer, index int, tsBase time.Time) (C
cLBufElement.Value = (float64)(value)
t, err := time.Parse(time.RFC3339, ts)
if err != nil {
utils.Error.Printf("Curve logging failed to convert time to Unix time err=%s", err)
return cLBufElement, false
t2, err := strconv.ParseInt(ts, 10, 64)
if err != nil {
utils.Error.Printf("Curve logging failed to convert time to Unix time err=%s", err)
return cLBufElement, false
}
tstr := time.UnixMilli(t2).Format(time.RFC3339)
t, err = time.Parse(time.RFC3339, tstr)
if err != nil {
utils.Error.Printf("Curve logging failed to convert time to Unix time err=%s", err)
return cLBufElement, false
}
}
cLBufElement.Timestamp = t.Sub(tsBase).Seconds()
return cLBufElement, true
Expand Down
5 changes: 5 additions & 0 deletions utils/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,13 +308,18 @@ func AddKeyValue(message string, key string, value string) string { // to avoid
return message
}

func GetTimeInMilliSecs() string {
return strconv.FormatInt(time.Now().UnixNano()/int64(time.Millisecond), 10)
}

func GetRfcTime() string {
withTimeZone := time.Now().Format(time.RFC3339) // 2020-05-01T15:34:35+02:00
if withTimeZone[len(withTimeZone)-6] == '+' {
return withTimeZone[:len(withTimeZone)-6] + "Z"
} else {
return withTimeZone
}
return withTimeZone
}

func FileExists(filename string) bool {
Expand Down

0 comments on commit bc93099

Please sign in to comment.