From 70526e7fc25df9dd63ed0b53660dacdf6eae5559 Mon Sep 17 00:00:00 2001 From: Simon Dalvai Date: Wed, 24 Jan 2024 14:27:59 +0100 Subject: [PATCH] traffic-a22-data-quality: initial commit --- traffic-a22-data-quality/.env.example | 0 traffic-a22-data-quality/README.md | 4 + traffic-a22-data-quality/calls.http | 34 +++ traffic-a22-data-quality/src/bdplib/auth.go | 96 +++++++ traffic-a22-data-quality/src/bdplib/client.go | 239 ++++++++++++++++++ traffic-a22-data-quality/src/bdplib/ninja.go | 239 ++++++++++++++++++ traffic-a22-data-quality/src/dc/auth.go | 102 ++++++++ traffic-a22-data-quality/src/dc/data.go | 105 ++++++++ traffic-a22-data-quality/src/dc/job.go | 154 +++++++++++ traffic-a22-data-quality/src/go.mod | 11 + traffic-a22-data-quality/src/go.sum | 42 +++ traffic-a22-data-quality/src/log/log.go | 43 ++++ traffic-a22-data-quality/src/main.go | 38 +++ 13 files changed, 1107 insertions(+) create mode 100644 traffic-a22-data-quality/.env.example create mode 100644 traffic-a22-data-quality/README.md create mode 100644 traffic-a22-data-quality/calls.http create mode 100644 traffic-a22-data-quality/src/bdplib/auth.go create mode 100644 traffic-a22-data-quality/src/bdplib/client.go create mode 100644 traffic-a22-data-quality/src/bdplib/ninja.go create mode 100644 traffic-a22-data-quality/src/dc/auth.go create mode 100644 traffic-a22-data-quality/src/dc/data.go create mode 100644 traffic-a22-data-quality/src/dc/job.go create mode 100644 traffic-a22-data-quality/src/go.mod create mode 100644 traffic-a22-data-quality/src/go.sum create mode 100644 traffic-a22-data-quality/src/log/log.go create mode 100644 traffic-a22-data-quality/src/main.go diff --git a/traffic-a22-data-quality/.env.example b/traffic-a22-data-quality/.env.example new file mode 100644 index 00000000..e69de29b diff --git a/traffic-a22-data-quality/README.md b/traffic-a22-data-quality/README.md new file mode 100644 index 00000000..c6e9398c --- /dev/null +++ b/traffic-a22-data-quality/README.md @@ -0,0 +1,4 @@ +# Traffic A22 data quality elaboration + +This elaboration extends the traffic data of A22 by enriching the data with daily sums of passing vehicles per station and creating virtual parent stations grouping traffic stations with the same directions e.g. 
km 612 direction south + km 612 direction south fast lane + diff --git a/traffic-a22-data-quality/calls.http b/traffic-a22-data-quality/calls.http new file mode 100644 index 00000000..0d9fd74e --- /dev/null +++ b/traffic-a22-data-quality/calls.http @@ -0,0 +1,34 @@ +##### VSCODE / REST Client +# Create a .env file and set the corresponding variables +# See all $dotenv fields below + +@username={{$dotenv API_OAUTH_USERNAME}} +@password={{$dotenv API_OAUTH_PASSWORD}} +@secret={{$dotenv API_OAUTH_CLIENT_SECRET}} +@clientId={{$dotenv API_OAUTH_CLIENT_ID}} + + + +### Get access token for the writer (LOCAL DEV) +# @name login +GET https://www.onecenter.info/oauth/token +Content-Type: application/x-www-form-urlencoded + +grant_type=password +&username={{username}} +&client_id={{clientId}} +&client_secret={{secret}} +&password={{password}} + +### save token + +@token = {{login.response.body.access_token}} + +### GetFacilities +GET https://www.onecenter.info/api/DAZ/GetFacilities +Authorization: Bearer {{token}} + +### FacilityID +GET https://www.onecenter.info/api/DAZ/FacilityFreePlaces +?FacilityID=608612 +Authorization: Bearer {{login.response.body.access_token}} \ No newline at end of file diff --git a/traffic-a22-data-quality/src/bdplib/auth.go b/traffic-a22-data-quality/src/bdplib/auth.go new file mode 100644 index 00000000..ad973c9d --- /dev/null +++ b/traffic-a22-data-quality/src/bdplib/auth.go @@ -0,0 +1,96 @@ +// SPDX-FileCopyrightText: NOI Techpark + +// SPDX-License-Identifier: AGPL-3.0-or-later + +package bdplib + +import ( + "encoding/json" + "io" + "log/slog" + "net/http" + "net/url" + "os" + "strconv" + "strings" + "time" +) + +type Token struct { + AccessToken string `json:"access_token"` + ExpiresIn int64 `json:"expires_in"` + NotBeforePolicy int64 `json:"not-before-policy"` + RefreshExpiresIn int64 `json:"refresh_expires_in"` + TokenType string `json:"token_type"` + RefreshToken string `json:"refresh_token"` + Scope string +} + +var tokenUri string = os.Getenv("OAUTH_TOKEN_URI") +var clientId string = os.Getenv("OAUTH_CLIENT_ID") +var clientSecret string = os.Getenv("OAUTH_CLIENT_SECRET") + +var token Token + +var tokenExpiry int64 + +func GetToken() string { + ts := time.Now().Unix() + + if len(token.AccessToken) == 0 || ts > tokenExpiry { + // if no token is available or refreshToken is expired, get new token + newToken() + } + + return token.AccessToken +} + +func newToken() { + slog.Info("Getting new token...") + params := url.Values{} + params.Add("client_id", clientId) + params.Add("client_secret", clientSecret) + params.Add("grant_type", "client_credentials") + + authRequest(params) + + slog.Info("Getting new token done.") +} + +func authRequest(params url.Values) { + body := strings.NewReader(params.Encode()) + + req, err := http.NewRequest("POST", tokenUri, body) + if err != nil { + slog.Error("error", err) + return + } + req.Header.Set("Content-Type", "application/x-www-form-urlencoded") + + resp, err := http.DefaultClient.Do(req) + if err != nil { + slog.Error("error", err) + return + } + defer resp.Body.Close() + + slog.Info("Auth response code is: " + strconv.Itoa(resp.StatusCode)) + if resp.StatusCode == http.StatusOK { + bodyBytes, err := io.ReadAll(resp.Body) + if err != nil { + slog.Error("error", err) + return + } + + err = json.Unmarshal(bodyBytes, &token) + if err != nil { + slog.Error("error", err) + return + } + } + + // calculate token expiry timestamp with 600 seconds margin + tokenExpiry = time.Now().Unix() + token.ExpiresIn - 600 + + slog.Debug("auth 
token expires in " + strconv.FormatInt(tokenExpiry, 10)) +} diff --git a/traffic-a22-data-quality/src/bdplib/client.go b/traffic-a22-data-quality/src/bdplib/client.go new file mode 100644 index 00000000..2368740d --- /dev/null +++ b/traffic-a22-data-quality/src/bdplib/client.go @@ -0,0 +1,239 @@ +// SPDX-FileCopyrightText: NOI Techpark + +// SPDX-License-Identifier: AGPL-3.0-or-later + +package bdplib + +import ( + "bufio" + "bytes" + "encoding/json" + "log/slog" + "net/http" + "os" + "strconv" +) + +type Provenance struct { + Lineage string `json:"lineage"` + DataCollector string `json:"dataCollector"` + DataCollectorVersion string `json:"dataCollectorVersion"` +} + +type DataType struct { + Name string `json:"name"` + Unit string `json:"unit"` + Description string `json:"description"` + Rtype string `json:"rType"` + Period uint32 `json:"period"` + MetaData map[string]string `json:"metaData"` +} + +type Station struct { + Id string `json:"id"` + Name string `json:"name"` + StationType string `json:"stationType"` + Latitude float64 `json:"latitude"` + Longitude float64 `json:"longitude"` + Origin string `json:"origin"` + ParentStation string `json:"parentStation"` + MetaData map[string]interface{} `json:"metaData"` +} + +type DataMap struct { + Name string `json:"name"` + Data []Record `json:"data"` + Branch map[string]DataMap `json:"branch"` + Provenance string `json:"provenance"` +} + +type Record struct { + Value interface{} `json:"value"` + Period uint64 `json:"period"` + Timestamp int64 `json:"timestamp"` +} + +const syncDataTypesPath string = "/syncDataTypes" +const syncStationsPath string = "/syncStations" +const pushRecordsPath string = "/pushRecords" +const getDateOfLastRecordPath string = "/getDateOfLastRecord" +const stationsPath string = "/stations" +const provenancePath string = "/provenance" + +var provenanceUuid string + +var baseUri string = os.Getenv("BASE_URI") + +var prv string = os.Getenv("PROVENANCE_VERSION") +var prn string = os.Getenv("PROVENANCE_NAME") + +var origin string = os.Getenv("ORIGIN") + +func SyncDataTypes(stationType string, dataTypes []DataType) { + pushProvenance() + + slog.Debug("Syncing data types...") + + url := baseUri + syncDataTypesPath + "?stationType=" + stationType + "&prn=" + prn + "&prv=" + prv + + postToWriter(dataTypes, url) + + slog.Debug("Syncing data types done.") +} + +func SyncStations(stationType string, stations []Station) { + pushProvenance() + + slog.Info("Syncing " + strconv.Itoa(len(stations)) + " " + stationType + " stations...") + url := baseUri + syncStationsPath + "/" + stationType + "?prn=" + prn + "&prv=" + prv + postToWriter(stations, url) + slog.Info("Syncing stations done.") +} + +func PushData(stationType string, dataMap DataMap) { + pushProvenance() + + slog.Info("Pushing records...") + url := baseUri + pushRecordsPath + "/" + stationType + "?prn=" + prn + "&prv=" + prv + postToWriter(dataMap, url) + slog.Info("Pushing records done.") +} + +func CreateDataType(name string, unit string, description string, rtype string) DataType { + // TODO add some checks + return DataType{ + Name: name, + Unit: unit, + Description: description, + Rtype: rtype, + } +} + +func CreateStation(id string, name string, stationType string, lat float64, lon float64, origin string) Station { + // TODO add some checks + var station = Station{ + Name: name, + StationType: stationType, + Latitude: lat, + Longitude: lon, + Origin: origin, + Id: id, + } + return station +} + +func CreateRecord(ts int64, value interface{}, period uint64) Record { + 
// TODO add some checks + var record = Record{ + Value: value, + Timestamp: ts, + Period: period, + } + return record +} + +func createDataMap() DataMap { + var dataMap = DataMap{ + Name: "(default)", + Provenance: provenanceUuid, + Branch: make(map[string]DataMap), + } + return dataMap +} + +// add an array of record to dataMap +func AddRecords(stationCode string, dataType string, records []Record, dataMap *DataMap) { + for _, record := range records { + AddRecord(stationCode, dataType, record, dataMap) + } +} + +// add one single record to dataMap +func AddRecord(stationCode string, dataType string, record Record, dataMap *DataMap) { + if dataMap.Name == "" { + *dataMap = createDataMap() + } + + if dataMap.Branch[stationCode].Name == "" { + dataMap.Branch[stationCode] = DataMap{ + Name: "(default)", + Branch: make(map[string]DataMap), + } + slog.Debug("new station in branch " + stationCode) + } + + if dataMap.Branch[stationCode].Branch[dataType].Name == "" { + dataMap.Branch[stationCode].Branch[dataType] = DataMap{ + Name: "(default)", + Data: []Record{record}, + } + // to assign a value to a struct in a map, this code part is needed + // https://stackoverflow.com/a/69006398/8794667 + } else if entry, ok := dataMap.Branch[stationCode].Branch[dataType]; ok { + entry.Data = append(entry.Data, record) + dataMap.Branch[stationCode].Branch[dataType] = entry + } +} + +func postToWriter(data interface{}, fullUrl string) (string, error) { + json, err := json.Marshal(data) + if err != nil { + slog.Error("error", err) + } + + client := http.Client{} + req, err := http.NewRequest("POST", fullUrl, bytes.NewBuffer(json)) + if err != nil { + slog.Error("error", err) + } + + req.Header = http.Header{ + "Content-Type": {"application/json"}, + "Authorization": {"Bearer " + GetToken()}, + } + + res, err := client.Do(req) + if err != nil { + slog.Error("error", err) + } + + slog.Info("Writer post response code: " + res.Status) + + scanner := bufio.NewScanner(res.Body) + for i := 0; scanner.Scan() && i < 5; i++ { + return scanner.Text(), nil + } + + err = scanner.Err() + if err != nil { + slog.Error("error", err) + } + return "", err +} + +func pushProvenance() { + if len(provenanceUuid) > 0 { + return + } + + slog.Info("Pushing provenance...") + slog.Info("prv: " + prv + " prn: " + prn) + + var provenance = Provenance{ + DataCollector: prn, + DataCollectorVersion: prv, + Lineage: origin, + } + + url := baseUri + provenancePath + "?&prn=" + prn + "&prv=" + prv + + res, err := postToWriter(provenance, url) + + if err != nil { + slog.Error("error", err) + } + + provenanceUuid = res + + slog.Info("Pushing provenance done.") +} diff --git a/traffic-a22-data-quality/src/bdplib/ninja.go b/traffic-a22-data-quality/src/bdplib/ninja.go new file mode 100644 index 00000000..2368740d --- /dev/null +++ b/traffic-a22-data-quality/src/bdplib/ninja.go @@ -0,0 +1,239 @@ +// SPDX-FileCopyrightText: NOI Techpark + +// SPDX-License-Identifier: AGPL-3.0-or-later + +package bdplib + +import ( + "bufio" + "bytes" + "encoding/json" + "log/slog" + "net/http" + "os" + "strconv" +) + +type Provenance struct { + Lineage string `json:"lineage"` + DataCollector string `json:"dataCollector"` + DataCollectorVersion string `json:"dataCollectorVersion"` +} + +type DataType struct { + Name string `json:"name"` + Unit string `json:"unit"` + Description string `json:"description"` + Rtype string `json:"rType"` + Period uint32 `json:"period"` + MetaData map[string]string `json:"metaData"` +} + +type Station struct { + Id string `json:"id"` + 
Name string `json:"name"` + StationType string `json:"stationType"` + Latitude float64 `json:"latitude"` + Longitude float64 `json:"longitude"` + Origin string `json:"origin"` + ParentStation string `json:"parentStation"` + MetaData map[string]interface{} `json:"metaData"` +} + +type DataMap struct { + Name string `json:"name"` + Data []Record `json:"data"` + Branch map[string]DataMap `json:"branch"` + Provenance string `json:"provenance"` +} + +type Record struct { + Value interface{} `json:"value"` + Period uint64 `json:"period"` + Timestamp int64 `json:"timestamp"` +} + +const syncDataTypesPath string = "/syncDataTypes" +const syncStationsPath string = "/syncStations" +const pushRecordsPath string = "/pushRecords" +const getDateOfLastRecordPath string = "/getDateOfLastRecord" +const stationsPath string = "/stations" +const provenancePath string = "/provenance" + +var provenanceUuid string + +var baseUri string = os.Getenv("BASE_URI") + +var prv string = os.Getenv("PROVENANCE_VERSION") +var prn string = os.Getenv("PROVENANCE_NAME") + +var origin string = os.Getenv("ORIGIN") + +func SyncDataTypes(stationType string, dataTypes []DataType) { + pushProvenance() + + slog.Debug("Syncing data types...") + + url := baseUri + syncDataTypesPath + "?stationType=" + stationType + "&prn=" + prn + "&prv=" + prv + + postToWriter(dataTypes, url) + + slog.Debug("Syncing data types done.") +} + +func SyncStations(stationType string, stations []Station) { + pushProvenance() + + slog.Info("Syncing " + strconv.Itoa(len(stations)) + " " + stationType + " stations...") + url := baseUri + syncStationsPath + "/" + stationType + "?prn=" + prn + "&prv=" + prv + postToWriter(stations, url) + slog.Info("Syncing stations done.") +} + +func PushData(stationType string, dataMap DataMap) { + pushProvenance() + + slog.Info("Pushing records...") + url := baseUri + pushRecordsPath + "/" + stationType + "?prn=" + prn + "&prv=" + prv + postToWriter(dataMap, url) + slog.Info("Pushing records done.") +} + +func CreateDataType(name string, unit string, description string, rtype string) DataType { + // TODO add some checks + return DataType{ + Name: name, + Unit: unit, + Description: description, + Rtype: rtype, + } +} + +func CreateStation(id string, name string, stationType string, lat float64, lon float64, origin string) Station { + // TODO add some checks + var station = Station{ + Name: name, + StationType: stationType, + Latitude: lat, + Longitude: lon, + Origin: origin, + Id: id, + } + return station +} + +func CreateRecord(ts int64, value interface{}, period uint64) Record { + // TODO add some checks + var record = Record{ + Value: value, + Timestamp: ts, + Period: period, + } + return record +} + +func createDataMap() DataMap { + var dataMap = DataMap{ + Name: "(default)", + Provenance: provenanceUuid, + Branch: make(map[string]DataMap), + } + return dataMap +} + +// add an array of record to dataMap +func AddRecords(stationCode string, dataType string, records []Record, dataMap *DataMap) { + for _, record := range records { + AddRecord(stationCode, dataType, record, dataMap) + } +} + +// add one single record to dataMap +func AddRecord(stationCode string, dataType string, record Record, dataMap *DataMap) { + if dataMap.Name == "" { + *dataMap = createDataMap() + } + + if dataMap.Branch[stationCode].Name == "" { + dataMap.Branch[stationCode] = DataMap{ + Name: "(default)", + Branch: make(map[string]DataMap), + } + slog.Debug("new station in branch " + stationCode) + } + + if 
dataMap.Branch[stationCode].Branch[dataType].Name == "" { + dataMap.Branch[stationCode].Branch[dataType] = DataMap{ + Name: "(default)", + Data: []Record{record}, + } + // to assign a value to a struct in a map, this code part is needed + // https://stackoverflow.com/a/69006398/8794667 + } else if entry, ok := dataMap.Branch[stationCode].Branch[dataType]; ok { + entry.Data = append(entry.Data, record) + dataMap.Branch[stationCode].Branch[dataType] = entry + } +} + +func postToWriter(data interface{}, fullUrl string) (string, error) { + json, err := json.Marshal(data) + if err != nil { + slog.Error("error", err) + } + + client := http.Client{} + req, err := http.NewRequest("POST", fullUrl, bytes.NewBuffer(json)) + if err != nil { + slog.Error("error", err) + } + + req.Header = http.Header{ + "Content-Type": {"application/json"}, + "Authorization": {"Bearer " + GetToken()}, + } + + res, err := client.Do(req) + if err != nil { + slog.Error("error", err) + } + + slog.Info("Writer post response code: " + res.Status) + + scanner := bufio.NewScanner(res.Body) + for i := 0; scanner.Scan() && i < 5; i++ { + return scanner.Text(), nil + } + + err = scanner.Err() + if err != nil { + slog.Error("error", err) + } + return "", err +} + +func pushProvenance() { + if len(provenanceUuid) > 0 { + return + } + + slog.Info("Pushing provenance...") + slog.Info("prv: " + prv + " prn: " + prn) + + var provenance = Provenance{ + DataCollector: prn, + DataCollectorVersion: prv, + Lineage: origin, + } + + url := baseUri + provenancePath + "?&prn=" + prn + "&prv=" + prv + + res, err := postToWriter(provenance, url) + + if err != nil { + slog.Error("error", err) + } + + provenanceUuid = res + + slog.Info("Pushing provenance done.") +} diff --git a/traffic-a22-data-quality/src/dc/auth.go b/traffic-a22-data-quality/src/dc/auth.go new file mode 100644 index 00000000..f7025d0e --- /dev/null +++ b/traffic-a22-data-quality/src/dc/auth.go @@ -0,0 +1,102 @@ +// SPDX-FileCopyrightText: NOI Techpark + +// SPDX-License-Identifier: AGPL-3.0-or-later + +package dc + +import ( + "encoding/json" + "io" + "log/slog" + "net/http" + "net/url" + "os" + "strconv" + "strings" + "time" +) + +type Token struct { + AccessToken string `json:"access_token"` + ExpiresIn int64 `json:"expires_in"` + NotBeforePolicy int64 `json:"not-before-policy"` + RefreshExpiresIn int64 `json:"refresh_expires_in"` + TokenType string `json:"token_type"` + RefreshToken string `json:"refresh_token"` + Scope string +} + +var tokenUri string = os.Getenv("API_OAUTH_TOKEN_URI") +var clientId string = os.Getenv("API_OAUTH_CLIENT_ID") +var clientSecret string = os.Getenv("API_OAUTH_CLIENT_SECRET") +var username string = os.Getenv("API_OAUTH_USERNAME") +var password string = os.Getenv("API_OAUTH_PASSWORD") + +var token Token + +var tokenExpiry int64 + +func GetToken() string { + ts := time.Now().Unix() + + if len(token.AccessToken) == 0 || ts > tokenExpiry { + // if no token is available or refreshToken is expired, get new token + newToken() + } + + return token.AccessToken +} + +func newToken() { + slog.Info("Getting new API token...") + + authRequest() + + slog.Info("Getting new API token done.") +} + +func authRequest() { + params := url.Values{} + params.Add("grant_type", "password") + params.Add("client_id", clientId) + params.Add("client_secret", clientSecret) + params.Add("username", username) + params.Add("password", password) + + body := strings.NewReader(params.Encode()) + + req, err := http.NewRequest("GET", tokenUri, body) + if err != nil { + 
slog.Error("error", err) + return + } + + req.Header.Set("Content-Type", "application/x-www-form-urlencoded") + + resp, err := http.DefaultClient.Do(req) + if err != nil { + slog.Error("error", err) + return + } + defer resp.Body.Close() + + slog.Info("Auth response code is: " + strconv.Itoa(resp.StatusCode)) + if resp.StatusCode == http.StatusOK { + bodyBytes, err := io.ReadAll(resp.Body) + if err != nil { + slog.Error("error", err) + return + } + + err = json.Unmarshal(bodyBytes, &token) + if err != nil { + slog.Error("error", err) + return + } + } + + // calculate token expiry timestamp with 600 seconds margin + tokenExpiry = time.Now().Unix() + token.ExpiresIn - 600 + + slog.Info("auth token expires in " + strconv.FormatInt(tokenExpiry, 10)) +} diff --git a/traffic-a22-data-quality/src/dc/data.go b/traffic-a22-data-quality/src/dc/data.go new file mode 100644 index 00000000..2bbdbb6f --- /dev/null +++ b/traffic-a22-data-quality/src/dc/data.go @@ -0,0 +1,105 @@ +// SPDX-FileCopyrightText: NOI Techpark + +// SPDX-License-Identifier: AGPL-3.0-or-later + +package dc + +import ( + "encoding/json" + "io" + "log/slog" + "net/http" + "strconv" +) + +type FacilityResponse struct { + Data FacilityData +} + +type FacilityData struct { + Status string + Facilities []Facility +} + +type Facility struct { + IdCompany int + FacilityId int + Description string + City string + Address string + ZIPCode string + Telephone1 string + Telephone2 string + PostNumber int + ReceiptMerchant string + Web string +} + +type FreePlaceResponse struct { + Data FreePlaceData +} + +type FreePlaceData struct { + Status string + FreePlaces []FreePlace +} + +type FreePlace struct { + FacilityId int + FacilityDescription string + ParkNo int + CountingCategoryNo int + CountingCategory string + FreeLimit int + OccupancyLimit int + CurrentLevel int + Reservation int + Capacity int + FreePlaces int +} + +const facilityUrl = "https://www.onecenter.info/api/DAZ/GetFacilities" +const freePlacesUrl = "https://www.onecenter.info/api/DAZ/FacilityFreePlaces?FacilityID=" + +func GetFacilityData() FacilityResponse { + var response FacilityResponse + getData(facilityUrl, &response) + return response +} + +func GetFreePlacesData(facilityId int) FreePlaceResponse { + var response FreePlaceResponse + getData(freePlacesUrl+strconv.Itoa(facilityId), &response) + return response +} + +func getData(url string, response interface{}) { + + req, err := http.NewRequest("GET", url, nil) + if err != nil { + slog.Error("error", err) + } + req.Header = http.Header{ + "Content-Type": {"application/json"}, + "Authorization": {"Bearer " + GetToken()}, + } + + resp, err := http.DefaultClient.Do(req) + if err != nil { + slog.Error("error", err) + } + defer resp.Body.Close() + + slog.Info("Auth response code is: " + strconv.Itoa(resp.StatusCode)) + if resp.StatusCode == http.StatusOK { + bodyBytes, err := io.ReadAll(resp.Body) + if err != nil { + slog.Error("error", err) + } + + err = json.Unmarshal(bodyBytes, &response) + if err != nil { + slog.Error("error", err) + } + } +} diff --git a/traffic-a22-data-quality/src/dc/job.go b/traffic-a22-data-quality/src/dc/job.go new file mode 100644 index 00000000..a8fe3adb --- /dev/null +++ b/traffic-a22-data-quality/src/dc/job.go @@ -0,0 +1,154 @@ +// SPDX-FileCopyrightText: NOI Techpark + +// SPDX-License-Identifier: AGPL-3.0-or-later + +package dc + +import ( + "log/slog" + "os" + "strconv" + "time" + "traffic-a22-data-quality/bdplib" +) + +const stationTypeParent string = "ParkingFacility" +const stationType string 
= "ParkingStation" + +const shortStay string = "short_stay" +const Subscribers string = "subscribers" + +const dataTypeFreeShort string = "free_" + shortStay +const dataTypeFreeSubs string = "free_" + Subscribers +const dataTypeFreeTotal string = "free" +const dataTypeOccupiedShort string = "occupied_" + shortStay +const dataTypeOccupiedSubs string = "occupied_" + Subscribers +const dataTypeOccupiedTotal string = "occupied" + +var origin string = os.Getenv("ORIGIN") + +const bzLat float64 = 46.49067 +const bzLon float64 = 11.33982 + +// GetFacilityData returns data for multiple companies; this identifier filters out STA +const identifier string = "STA – Strutture Trasporto Alto Adige SpA Via dei Conciapelli, 60 39100 Bolzano UID: 00586190217" + +func Job() { + var parentStations []bdplib.Station + // save stations by stationCode + stations := make(map[string]bdplib.Station) + + var dataMapParent bdplib.DataMap + var dataMap bdplib.DataMap + + facilities := GetFacilityData() + + ts := time.Now().UnixMilli() + + for _, facility := range facilities.Data.Facilities { + + if facility.ReceiptMerchant == identifier { + parentStationCode := strconv.Itoa(facility.FacilityId) + + parentStation := bdplib.CreateStation(parentStationCode, facility.Description, stationTypeParent, bzLat, bzLon, origin) + parentStation.MetaData = map[string]interface{}{ + "IdCompany": facility.FacilityId, + "City": facility.City, + "Address": facility.Address, + "ZIPCode": facility.ZIPCode, + "Telephone1": facility.Telephone1, + "Telephone2": facility.Telephone2, + } + parentStations = append(parentStations, parentStation) + + freePlaces := GetFreePlacesData(facility.FacilityId) + + // total facility measurements + freeTotalSum := 0 + occupiedTotalSum := 0 + capacityTotal := 0 + + // freeplaces is array of a single categories data + // if multiple parkNo exist, multiple entries for every parkNo and its categories exist + // so iterating over freeplaces and checking if the station with the parkNo has already been created is needed + for _, freePlace := range freePlaces.Data.FreePlaces { + // create ParkingStation + stationCode := parentStationCode + "_" + strconv.Itoa(freePlace.ParkNo) + station, ok := stations[stationCode] + if !ok { + station = bdplib.CreateStation(stationCode, facility.Description, stationType, bzLat, bzLon, origin) + station.ParentStation = parentStation.Id + station.MetaData = make(map[string]interface{}) + stations[stationCode] = station + slog.Debug("Create station " + stationCode) + } + + switch freePlace.CountingCategoryNo { + // Short Stay + case 1: + station.MetaData["free_limit_"+shortStay] = freePlace.FreeLimit + station.MetaData["occupancy_limit_"+shortStay] = freePlace.OccupancyLimit + station.MetaData["capacity_"+shortStay] = freePlace.Capacity + bdplib.AddRecord(stationCode, dataTypeFreeShort, bdplib.CreateRecord(ts, freePlace.FreePlaces, 600), &dataMap) + bdplib.AddRecord(stationCode, dataTypeOccupiedShort, bdplib.CreateRecord(ts, freePlace.CurrentLevel, 600), &dataMap) + // Subscribed + case 2: + station.MetaData["free_limit_"+Subscribers] = freePlace.FreeLimit + station.MetaData["occupancy_limit_"+Subscribers] = freePlace.OccupancyLimit + station.MetaData["capacity_"+Subscribers] = freePlace.Capacity + bdplib.AddRecord(stationCode, dataTypeFreeSubs, bdplib.CreateRecord(ts, freePlace.FreePlaces, 600), &dataMap) + bdplib.AddRecord(stationCode, dataTypeOccupiedSubs, bdplib.CreateRecord(ts, freePlace.CurrentLevel, 600), &dataMap) + // Total + default: + station.MetaData["free_limit"] = 
freePlace.FreeLimit + station.MetaData["occupancy_limit"] = freePlace.OccupancyLimit + station.MetaData["Capacity"] = freePlace.Capacity + bdplib.AddRecord(stationCode, dataTypeFreeTotal, bdplib.CreateRecord(ts, freePlace.FreePlaces, 600), &dataMap) + bdplib.AddRecord(stationCode, dataTypeOccupiedTotal, bdplib.CreateRecord(ts, freePlace.CurrentLevel, 600), &dataMap) + // total facility data + freeTotalSum += freePlace.FreePlaces + occupiedTotalSum += freePlace.CurrentLevel + capacityTotal += freePlace.Capacity + } + } + // assign total facility data, if data is not 0 + if freeTotalSum > 0 { + bdplib.AddRecord(parentStationCode, dataTypeFreeTotal, bdplib.CreateRecord(ts, freeTotalSum, 600), &dataMapParent) + } + if occupiedTotalSum > 0 { + bdplib.AddRecord(parentStationCode, dataTypeOccupiedTotal, bdplib.CreateRecord(ts, occupiedTotalSum, 600), &dataMapParent) + } + if capacityTotal > 0 { + parentStation.MetaData["Capacity"] = capacityTotal + } + } + } + bdplib.SyncStations(stationTypeParent, parentStations) + bdplib.SyncStations(stationType, values(stations)) + bdplib.PushData(stationTypeParent, dataMapParent) + bdplib.PushData(stationType, dataMap) +} + +func SyncDataTypes() { + var dataTypes []bdplib.DataType + // free + dataTypes = append(dataTypes, bdplib.CreateDataType(dataTypeFreeShort, "", "Amount of free 'short stay' parking slots", "Instantaneous")) + dataTypes = append(dataTypes, bdplib.CreateDataType(dataTypeFreeSubs, "", "Amount of free 'subscribed' parking slots", "Instantaneous")) + dataTypes = append(dataTypes, bdplib.CreateDataType(dataTypeFreeTotal, "", "Amount of free parking slots", "Instantaneous")) + // occupied + dataTypes = append(dataTypes, bdplib.CreateDataType(dataTypeOccupiedShort, "", "Amount of occupied 'short stay' parking slots", "Instantaneous")) + dataTypes = append(dataTypes, bdplib.CreateDataType(dataTypeOccupiedSubs, "", "Amount of occupied 'subscribed' parking slots", "Instantaneous")) + dataTypes = append(dataTypes, bdplib.CreateDataType(dataTypeOccupiedTotal, "", "Amount of occupied parking slots", "Instantaneous")) + + bdplib.SyncDataTypes(stationType, dataTypes) +} + +// to extract values array from map, without external dependency +// https://stackoverflow.com/questions/13422578/in-go-how-to-get-a-slice-of-values-from-a-map +func values[M ~map[K]V, K comparable, V any](m M) []V { + r := make([]V, 0, len(m)) + for _, v := range m { + r = append(r, v) + } + return r +} diff --git a/traffic-a22-data-quality/src/go.mod b/traffic-a22-data-quality/src/go.mod new file mode 100644 index 00000000..83a957dc --- /dev/null +++ b/traffic-a22-data-quality/src/go.mod @@ -0,0 +1,11 @@ +module traffic-a22-data-quality + +go 1.21.5 + +require github.com/go-co-op/gocron v1.37.0 + +require ( + github.com/google/uuid v1.6.0 // indirect + github.com/robfig/cron/v3 v3.0.1 // indirect + go.uber.org/atomic v1.11.0 // indirect +) diff --git a/traffic-a22-data-quality/src/go.sum b/traffic-a22-data-quality/src/go.sum new file mode 100644 index 00000000..e9e81420 --- /dev/null +++ b/traffic-a22-data-quality/src/go.sum @@ -0,0 +1,42 @@ +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/go-co-op/gocron v1.37.0 h1:ZYDJGtQ4OMhTLKOKMIch+/CY70Brbb1dGdooLEhh7b0= 
+github.com/go-co-op/gocron v1.37.0/go.mod h1:3L/n6BkO7ABj+TrfSVXLRzsP26zmikL4ISkLQ0O8iNY= +github.com/google/uuid v1.4.0 h1:MtMxsa51/r9yyhkyLsVeVt0B+BGQZzpQiTQ4eHZ8bc4= +github.com/google/uuid v1.4.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= +github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= +github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= +github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= +github.com/rogpeppe/go-internal v1.8.1/go.mod h1:JeRgkft04UBgHMgCIwADu4Pn6Mtm5d4nPKWu0nJ5d+o= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8= +github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE= +go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= +go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= +go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/traffic-a22-data-quality/src/log/log.go b/traffic-a22-data-quality/src/log/log.go new file mode 100644 index 00000000..0222bac4 --- /dev/null +++ b/traffic-a22-data-quality/src/log/log.go @@ -0,0 +1,43 @@ +// SPDX-FileCopyrightText: NOI Techpark + +// 
SPDX-License-Identifier: AGPL-3.0-or-later + +package log + +import ( + "log/slog" + "os" + "strings" +) + +// read logger level from env and uses INFO as default +func InitLogger() { + + logLevel := os.Getenv("LOG_LEVEL") + + level := new(slog.LevelVar) + + level.Set(parseLogLevel(logLevel)) + + logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{ + Level: level, + })) + slog.SetDefault(logger) + + slog.Info("Start logger with level: " + logLevel) +} + +func parseLogLevel(level string) slog.Level { + switch strings.ToUpper(level) { + case "DEBUG": + return slog.LevelDebug + case "WARNING": + return slog.LevelWarn + case "ERROR": + return slog.LevelError + case "INFO": + return slog.LevelInfo + default: + return slog.LevelInfo + } +} diff --git a/traffic-a22-data-quality/src/main.go b/traffic-a22-data-quality/src/main.go new file mode 100644 index 00000000..c4bc1d54 --- /dev/null +++ b/traffic-a22-data-quality/src/main.go @@ -0,0 +1,38 @@ +// SPDX-FileCopyrightText: (c) NOI Techpark + +// SPDX-License-Identifier: AGPL-3.0-or-later + +package main + +import ( + "log/slog" + "os" + "time" + + "traffic-a22-data-quality/dc" + "traffic-a22-data-quality/log" + + "github.com/go-co-op/gocron" +) + +func main() { + log.InitLogger() + + dc.SyncDataTypes() + + cron := os.Getenv("SCHEDULER_CRON") + slog.Debug("Cron defined as: " + cron) + + if len(cron) == 0 { + slog.Error("Cron job definition in env missing") + os.Exit(1) + } + + // call job once at startup + dc.Job() + + // start cron job + s := gocron.NewScheduler(time.UTC) + s.CronWithSeconds(cron).Do(dc.Job) + s.StartBlocking() +}
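
A minimal usage sketch of the bdplib helpers introduced by this patch, assuming the writer environment (BASE_URI, OAUTH_TOKEN_URI, OAUTH_CLIENT_ID, OAUTH_CLIENT_SECRET, PROVENANCE_NAME, PROVENANCE_VERSION, ORIGIN) is configured as read by bdplib/client.go and bdplib/auth.go; the station code, station type, data type name and values below are hypothetical placeholders, not taken from the A22 dataset:

package main

import (
	"time"

	"traffic-a22-data-quality/bdplib"
)

func main() {
	// hypothetical station and data type, for illustration only
	station := bdplib.CreateStation("A22:612:S", "km 612 direction south", "TrafficSensor", 46.49067, 11.33982, "A22")
	dailySum := bdplib.CreateDataType("Nr. Vehicles (daily sum)", "vehicles", "Daily sum of passing vehicles", "Sum")

	// sync metadata first, then push measurements (each call also ensures provenance is registered)
	bdplib.SyncDataTypes("TrafficSensor", []bdplib.DataType{dailySum})
	bdplib.SyncStations("TrafficSensor", []bdplib.Station{station})

	var dataMap bdplib.DataMap
	ts := time.Now().UnixMilli()
	// timestamp in milliseconds, period in seconds (86400 marks a daily aggregate); 12345 is a placeholder value
	bdplib.AddRecord(station.Id, dailySum.Name, bdplib.CreateRecord(ts, 12345, 86400), &dataMap)

	bdplib.PushData("TrafficSensor", dataMap)
}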