Skip to content

Commit

Permalink
traffic-a22-data-quality: initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
dulvui committed Jan 24, 2024
1 parent abe3036 commit 70526e7
Show file tree
Hide file tree
Showing 13 changed files with 1,107 additions and 0 deletions.
Empty file.
4 changes: 4 additions & 0 deletions traffic-a22-data-quality/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Traffic A22 data quality elaboration

This elaboration extends the traffic data of A22 by enriching the data with daily sums of passing vehicles per station and creating virtual parent stations grouping traffic stations with the same directions e.g. km 612 direction south + km 612 direction south fast lane

34 changes: 34 additions & 0 deletions traffic-a22-data-quality/calls.http
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
##### VSCODE / REST Client
# Create a .env file and set the corresponding variables
# See all $dotenv fields below

@username={{$dotenv API_OAUTH_USERNAME}}
@password={{$dotenv API_OAUTH_PASSWORD}}
@secret={{$dotenv API_OAUTH_CLIENT_SECRET}}
@clientId={{$dotenv API_OAUTH_CLIENT_ID}}



### Get access token for the writer (LOCAL DEV)
# @name login
GET https://www.onecenter.info/oauth/token
Content-Type: application/x-www-form-urlencoded

grant_type=password
&username={{username}}
&client_id={{clientId}}
&client_secret={{secret}}
&password={{password}}

### save token

@token = {{login.response.body.access_token}}

### GetFacilities
GET https://www.onecenter.info/api/DAZ/GetFacilities
Authorization: Bearer {{token}}

### FacilityID
GET https://www.onecenter.info/api/DAZ/FacilityFreePlaces
?FacilityID=608612
Authorization: Bearer {{login.response.body.access_token}}
96 changes: 96 additions & 0 deletions traffic-a22-data-quality/src/bdplib/auth.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
// SPDX-FileCopyrightText: NOI Techpark <[email protected]>

// SPDX-License-Identifier: AGPL-3.0-or-later

package bdplib

import (
"encoding/json"
"io"
"log/slog"
"net/http"
"net/url"
"os"
"strconv"
"strings"
"time"
)

type Token struct {
AccessToken string `json:"access_token"`
ExpiresIn int64 `json:"expires_in"`
NotBeforePolicy int64 `json:"not-before-policy"`
RefreshExpiresIn int64 `json:"refresh_expires_in"`
TokenType string `json:"token_type"`
RefreshToken string `json:"refresh_token"`
Scope string
}

var tokenUri string = os.Getenv("OAUTH_TOKEN_URI")
var clientId string = os.Getenv("OAUTH_CLIENT_ID")
var clientSecret string = os.Getenv("OAUTH_CLIENT_SECRET")

var token Token

var tokenExpiry int64

func GetToken() string {
ts := time.Now().Unix()

if len(token.AccessToken) == 0 || ts > tokenExpiry {
// if no token is available or refreshToken is expired, get new token
newToken()
}

return token.AccessToken
}

func newToken() {
slog.Info("Getting new token...")
params := url.Values{}
params.Add("client_id", clientId)
params.Add("client_secret", clientSecret)
params.Add("grant_type", "client_credentials")

authRequest(params)

slog.Info("Getting new token done.")
}

func authRequest(params url.Values) {
body := strings.NewReader(params.Encode())

req, err := http.NewRequest("POST", tokenUri, body)
if err != nil {
slog.Error("error", err)
return
}
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")

resp, err := http.DefaultClient.Do(req)
if err != nil {
slog.Error("error", err)
return
}
defer resp.Body.Close()

slog.Info("Auth response code is: " + strconv.Itoa(resp.StatusCode))
if resp.StatusCode == http.StatusOK {
bodyBytes, err := io.ReadAll(resp.Body)
if err != nil {
slog.Error("error", err)
return
}

err = json.Unmarshal(bodyBytes, &token)
if err != nil {
slog.Error("error", err)
return
}
}

// calculate token expiry timestamp with 600 seconds margin
tokenExpiry = time.Now().Unix() + token.ExpiresIn - 600

slog.Debug("auth token expires in " + strconv.FormatInt(tokenExpiry, 10))
}
239 changes: 239 additions & 0 deletions traffic-a22-data-quality/src/bdplib/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,239 @@
// SPDX-FileCopyrightText: NOI Techpark <[email protected]>

// SPDX-License-Identifier: AGPL-3.0-or-later

package bdplib

import (
"bufio"
"bytes"
"encoding/json"
"log/slog"
"net/http"
"os"
"strconv"
)

type Provenance struct {
Lineage string `json:"lineage"`
DataCollector string `json:"dataCollector"`
DataCollectorVersion string `json:"dataCollectorVersion"`
}

type DataType struct {
Name string `json:"name"`
Unit string `json:"unit"`
Description string `json:"description"`
Rtype string `json:"rType"`
Period uint32 `json:"period"`
MetaData map[string]string `json:"metaData"`
}

type Station struct {
Id string `json:"id"`
Name string `json:"name"`
StationType string `json:"stationType"`
Latitude float64 `json:"latitude"`
Longitude float64 `json:"longitude"`
Origin string `json:"origin"`
ParentStation string `json:"parentStation"`
MetaData map[string]interface{} `json:"metaData"`
}

type DataMap struct {
Name string `json:"name"`
Data []Record `json:"data"`
Branch map[string]DataMap `json:"branch"`
Provenance string `json:"provenance"`
}

type Record struct {
Value interface{} `json:"value"`
Period uint64 `json:"period"`
Timestamp int64 `json:"timestamp"`
}

const syncDataTypesPath string = "/syncDataTypes"
const syncStationsPath string = "/syncStations"
const pushRecordsPath string = "/pushRecords"
const getDateOfLastRecordPath string = "/getDateOfLastRecord"
const stationsPath string = "/stations"
const provenancePath string = "/provenance"

var provenanceUuid string

var baseUri string = os.Getenv("BASE_URI")

var prv string = os.Getenv("PROVENANCE_VERSION")
var prn string = os.Getenv("PROVENANCE_NAME")

var origin string = os.Getenv("ORIGIN")

func SyncDataTypes(stationType string, dataTypes []DataType) {
pushProvenance()

slog.Debug("Syncing data types...")

url := baseUri + syncDataTypesPath + "?stationType=" + stationType + "&prn=" + prn + "&prv=" + prv

postToWriter(dataTypes, url)

slog.Debug("Syncing data types done.")
}

func SyncStations(stationType string, stations []Station) {
pushProvenance()

slog.Info("Syncing " + strconv.Itoa(len(stations)) + " " + stationType + " stations...")
url := baseUri + syncStationsPath + "/" + stationType + "?prn=" + prn + "&prv=" + prv
postToWriter(stations, url)
slog.Info("Syncing stations done.")
}

func PushData(stationType string, dataMap DataMap) {
pushProvenance()

slog.Info("Pushing records...")
url := baseUri + pushRecordsPath + "/" + stationType + "?prn=" + prn + "&prv=" + prv
postToWriter(dataMap, url)
slog.Info("Pushing records done.")
}

func CreateDataType(name string, unit string, description string, rtype string) DataType {
// TODO add some checks
return DataType{
Name: name,
Unit: unit,
Description: description,
Rtype: rtype,
}
}

func CreateStation(id string, name string, stationType string, lat float64, lon float64, origin string) Station {
// TODO add some checks
var station = Station{
Name: name,
StationType: stationType,
Latitude: lat,
Longitude: lon,
Origin: origin,
Id: id,
}
return station
}

func CreateRecord(ts int64, value interface{}, period uint64) Record {
// TODO add some checks
var record = Record{
Value: value,
Timestamp: ts,
Period: period,
}
return record
}

func createDataMap() DataMap {
var dataMap = DataMap{
Name: "(default)",
Provenance: provenanceUuid,
Branch: make(map[string]DataMap),
}
return dataMap
}

// add an array of record to dataMap
func AddRecords(stationCode string, dataType string, records []Record, dataMap *DataMap) {
for _, record := range records {
AddRecord(stationCode, dataType, record, dataMap)
}
}

// add one single record to dataMap
func AddRecord(stationCode string, dataType string, record Record, dataMap *DataMap) {
if dataMap.Name == "" {
*dataMap = createDataMap()
}

if dataMap.Branch[stationCode].Name == "" {
dataMap.Branch[stationCode] = DataMap{
Name: "(default)",
Branch: make(map[string]DataMap),
}
slog.Debug("new station in branch " + stationCode)
}

if dataMap.Branch[stationCode].Branch[dataType].Name == "" {
dataMap.Branch[stationCode].Branch[dataType] = DataMap{
Name: "(default)",
Data: []Record{record},
}
// to assign a value to a struct in a map, this code part is needed
// https://stackoverflow.com/a/69006398/8794667
} else if entry, ok := dataMap.Branch[stationCode].Branch[dataType]; ok {
entry.Data = append(entry.Data, record)
dataMap.Branch[stationCode].Branch[dataType] = entry
}
}

func postToWriter(data interface{}, fullUrl string) (string, error) {
json, err := json.Marshal(data)
if err != nil {
slog.Error("error", err)
}

client := http.Client{}
req, err := http.NewRequest("POST", fullUrl, bytes.NewBuffer(json))
if err != nil {
slog.Error("error", err)
}

req.Header = http.Header{
"Content-Type": {"application/json"},
"Authorization": {"Bearer " + GetToken()},
}

res, err := client.Do(req)
if err != nil {
slog.Error("error", err)
}

slog.Info("Writer post response code: " + res.Status)

scanner := bufio.NewScanner(res.Body)
for i := 0; scanner.Scan() && i < 5; i++ {
return scanner.Text(), nil
}

err = scanner.Err()
if err != nil {
slog.Error("error", err)
}
return "", err
}

func pushProvenance() {
if len(provenanceUuid) > 0 {
return
}

slog.Info("Pushing provenance...")
slog.Info("prv: " + prv + " prn: " + prn)

var provenance = Provenance{
DataCollector: prn,
DataCollectorVersion: prv,
Lineage: origin,
}

url := baseUri + provenancePath + "?&prn=" + prn + "&prv=" + prv

res, err := postToWriter(provenance, url)

if err != nil {
slog.Error("error", err)
}

provenanceUuid = res

slog.Info("Pushing provenance done.")
}
Loading

0 comments on commit 70526e7

Please sign in to comment.