Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AT server changed to act as server towards ECF #9

Merged
merged 2 commits into from
Mar 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 31 additions & 40 deletions server/vissv2server/atServer/atServer.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,11 @@ import (
"crypto/rsa"
"crypto/tls"
"encoding/json"
"flag"
"fmt"
"github.com/gorilla/websocket"
"io"
"math/rand"
"net/http"
"net/url"
"os"
"strconv"
"strings"
Expand Down Expand Up @@ -201,61 +199,54 @@ func initClientComm(atsChannel chan string, muxServer *http.ServeMux) {
}

func initEcfComm(ecfReceiveChan chan string, ecfSendChan chan string, muxServer *http.ServeMux) {
scheme := "ws"
portNum := "8445"
var addr = flag.String("addr", "localhost:"+portNum, "http service address")
dataSessionUrl := url.URL{Scheme: scheme, Host: *addr, Path: ""}
dialer := websocket.Dialer{
HandshakeTimeout: time.Second,
ReadBufferSize: 1024,
WriteBufferSize: 1024,
}
conn := reDialer(dialer, dataSessionUrl)
if conn != nil {
go ecfClient(conn, ecfSendChan)
ecfReceiveChan <- "internal-ecfAvailable"
go ecfReceiver(conn, ecfReceiveChan)
}
}

func reDialer(dialer websocket.Dialer, sessionUrl url.URL) *websocket.Conn {
for i := 0; i < 15; i++ {
conn, _, err := dialer.Dial(sessionUrl.String(), nil)
if err != nil {
utils.Error.Printf("Data session dial error:%s\n", err)
time.Sleep(2 * time.Second)
ecfHandler := makeEcfHandler(ecfReceiveChan, ecfSendChan)
muxServer.HandleFunc("/", ecfHandler)
utils.Info.Print(http.ListenAndServe(":8445", muxServer))
}

func makeEcfHandler(receiveChan chan string, sendChan chan string) func(http.ResponseWriter, *http.Request) {
return func(w http.ResponseWriter, req *http.Request) {
if req.Header.Get("Upgrade") == "websocket" {
utils.Info.Printf("Received websocket request: we are upgrading to a websocket connection.\n")
Upgrader.CheckOrigin = func(r *http.Request) bool { return true }
h := http.Header{}
conn, err := Upgrader.Upgrade(w, req, h)
if err != nil {
utils.Error.Print("upgrade error:", err)
return
}
go ecfReceiver(conn, receiveChan)
go ecfSender(conn, sendChan)
receiveChan <- "internal-ecfAvailable"
} else {
utils.Info.Printf("ECF dial success.\n")
return conn
utils.Info.Printf("Client must set up a Websocket session.\n")
}
}
utils.Error.Printf("ECF dial failure.\n")
return nil
}

func ecfClient(conn *websocket.Conn, ecfSendChan chan string) {
func ecfReceiver(conn *websocket.Conn, receiveChan chan string) {
defer conn.Close()
for {
ecfRequest := <-ecfSendChan
err := conn.WriteMessage(websocket.TextMessage, []byte(ecfRequest))
_, msg, err := conn.ReadMessage()
if err != nil {
utils.Error.Printf("ecfClient:Request write error:%s\n", err)
return
utils.Error.Printf("ECF server read error: %s\n", err)
break
}
request := string(msg)
// utils.Info.Printf("ecfReceiver: request: %s\n", request)
receiveChan <- request
}
}

func ecfReceiver(conn *websocket.Conn, ecfReceiveChan chan string) {
func ecfSender(conn *websocket.Conn, sendChan chan string) {
defer conn.Close()
for {
_, msg, err := conn.ReadMessage()
response := <- sendChan
err := conn.WriteMessage(websocket.TextMessage, []byte(response))
if err != nil {
utils.Error.Printf("ecfReceiver read error: %s", err)
utils.Error.Printf("ecfSender: write error: %s\n", err)
break
}
message := string(msg)
utils.Info.Printf("ECF message: %s", message)
ecfReceiveChan <- message
}
}

Expand Down
66 changes: 38 additions & 28 deletions server/vissv2server/atServer/ecfSim/ecfSimulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"time"
"strings"
"net/http"
"net/url"
"flag"
"github.com/gorilla/websocket"
"encoding/json"
)
Expand All @@ -34,57 +36,65 @@ var cancelTicker *time.Ticker
var cancelRequest string

func initEcfComm(receiveChan chan string, sendChan chan string, muxServer *http.ServeMux) {
ecfHandler := makeEcfHandler(receiveChan, sendChan)
muxServer.HandleFunc("/", ecfHandler)
fmt.Print(http.ListenAndServe(":8445", muxServer))
scheme := "ws"
portNum := "8445"
var addr = flag.String("addr", "localhost:"+portNum, "http service address")
dataSessionUrl := url.URL{Scheme: scheme, Host: *addr, Path: ""}
dialer := websocket.Dialer{
HandshakeTimeout: time.Second,
ReadBufferSize: 1024,
WriteBufferSize: 1024,
}
conn := reDialer(dialer, dataSessionUrl)
if conn != nil {
go ecfClient(conn, sendChan)
go ecfReceiver(conn, receiveChan)
}
}

func makeEcfHandler(receiveChan chan string, sendChan chan string) func(http.ResponseWriter, *http.Request) {
return func(w http.ResponseWriter, req *http.Request) {
if req.Header.Get("Upgrade") == "websocket" {
fmt.Printf("Received websocket request: we are upgrading to a websocket connection.\n")
Upgrader.CheckOrigin = func(r *http.Request) bool { return true }
h := http.Header{}
conn, err := Upgrader.Upgrade(w, req, h)
if err != nil {
fmt.Print("upgrade error:", err)
return
}
go ecfReceiver(conn, receiveChan)
go ecfSender(conn, sendChan)
func reDialer(dialer websocket.Dialer, sessionUrl url.URL) *websocket.Conn {
for i := 0; i < 15; i++ {
conn, _, err := dialer.Dial(sessionUrl.String(), nil)
if err != nil {
fmt.Printf("Data session dial error:%s\n", err)
time.Sleep(2 * time.Second)
} else {
fmt.Printf("Client must set up a Websocket session.\n")
fmt.Printf("ECF dial success.\n")
return conn
}
}
fmt.Printf("ECF dial failure.\n")
return nil
}

func ecfReceiver(conn *websocket.Conn, receiveChan chan string) {
func ecfClient(conn *websocket.Conn, sendChan chan string) {
defer conn.Close()
for {
_, msg, err := conn.ReadMessage()
ecfRequest := <-sendChan
err := conn.WriteMessage(websocket.TextMessage, []byte(ecfRequest))
if err != nil {
fmt.Printf("ECF server read error: %s\n", err)
break
fmt.Printf("ecfClient:Request write error:%s\n", err)
return
}
request := string(msg)
// fmt.Printf("ecfReceiver: request: %s\n", request)
receiveChan <- request
}
}

func ecfSender(conn *websocket.Conn, sendChan chan string) {
func ecfReceiver(conn *websocket.Conn, ecfReceiveChan chan string) {
defer conn.Close()
for {
response := <- sendChan
err := conn.WriteMessage(websocket.TextMessage, []byte(response))
_, msg, err := conn.ReadMessage()
if err != nil {
fmt.Printf("ecfSender: write error: %s\n", err)
fmt.Printf("ecfReceiver read error: %s\n", err)
break
}
message := string(msg)
fmt.Printf("ECF message: %s\n", message)
ecfReceiveChan <- message
}
}

func dispatchResponse(request string, sendChan chan string) {
fmt.Printf("dispatchResponse: request=%s\n", request)
var requestMap map[string]interface{}
errorIndex := statusIndex
err := json.Unmarshal([]byte(request), &requestMap)
Expand Down
18 changes: 14 additions & 4 deletions server/vissv2server/atServer/purposelist.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,20 @@
],
"device": "Cloud"
},
"signal_access": {
"path": "Vehicle.Powertrain.Transmission.TravelledDistance",
"access_mode": "read-only"
}
"signal_access": [
{
"path": "Vehicle.TraveledDistance",
"access_mode": "read-only"
},
{
"path": "Vehicle.CurrentLocation.Longitude",
"access_mode": "read-only"
},
{
"path": "Vehicle.CurrentLocation.Latitude",
"access_mode": "read-only"
}
]
},
{
"short": "pay-how-you-drive",
Expand Down