Skip to content

Commit

Permalink
wip traffic data quality
Browse files Browse the repository at this point in the history
  • Loading branch information
clezag committed Feb 2, 2024
1 parent cd3ee59 commit a96cfbd
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 20 deletions.
4 changes: 2 additions & 2 deletions traffic-a22-data-quality/calls.http
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,14 @@ Authorization: Bearer {{token}}
### Get candidate stations
GET {{ninja_url}}/v2/tree/TrafficSensor/Nr. Light Vehicles,Nr. Heavy Vehicles,Nr. Buses/latest
?where=mperiod.in.(600,86400)
&select=mperiod,mvalidtime
&select=mperiod,mvalidtime,mtransactiontime
&limit=-1
Authorization: Bearer {{token}}


### Get single station measurement history
GET {{ninja_url}}/v2/flat/TrafficSensor/Nr. Light Vehicles,Nr. Heavy Vehicles,Nr. Buses/2019-12-01/2020-03-01
?where=and(mperiod.eq.600,scode.eq."A22:1871:1")
&select=mvalue,mvalidtime,tname
&select=scode,mvalue,tname
&limit=20
Authorization: Bearer {{token}}
49 changes: 31 additions & 18 deletions traffic-a22-data-quality/src/dc/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,22 @@ import (
"fmt"
"log/slog"
"os"
"strconv"
"time"
"traffic-a22-data-quality/ninjalib"
)

var origin string = os.Getenv("ORIGIN")

var queryLatest = os.Getenv("NINJA_QUERY_LATEST")
var queryLatest string = os.Getenv("NINJA_QUERY_LATEST")
var ninjaLimit, _ = strconv.Atoi(os.Getenv("NINJA_QUERY_LIMIT"))

const periodBase = 600
const periodAggregate = 86400

type NinjaMeasurement struct {
Period int64 `json:"mperiod"`
Timestamp ninjalib.NinjaTime `json:"mvalidtime"`
Period int64 `json:"mperiod"`
Time ninjalib.NinjaTime `json:"mvalidtime"`
Since ninjalib.NinjaTime `json:"mtransactiontime"`
}

type NinjaTreeData = map[string]struct { // key = stationtype
Expand All @@ -33,10 +35,11 @@ type NinjaTreeData = map[string]struct { // key = stationtype
} `json:"stations"`
}

type NinjaFlatData = []struct {
type NinjaFlatData = struct {
StationCode string `json:"scode"`
TypeName string `json:"tname"`
Timestamp ninjalib.NinjaTime `json:"_timestamp"`
Value uint64 `json:"mvalue"`
}

func Job() {
Expand All @@ -60,6 +63,7 @@ func sumJob() {
}

type todoStation struct {
firstBase time.Time
lastBase time.Time
lastAggregate time.Time
}
Expand All @@ -70,13 +74,15 @@ func sumJob() {
for stationCode, station := range stations.Stations {
for typeName, dataType := range station.Datatypes {
for _, m := range dataType.Measurements {
var firstBase time.Time
var lastBase time.Time
var lastAggregate time.Time
if m.Period == periodBase {
lastBase = m.Timestamp.Time
lastBase = m.Time.Time
firstBase = m.Since.Time
}
if m.Period == periodAggregate {
lastAggregate = m.Timestamp.Time
lastAggregate = m.Time.Time
}

// Determine which stations we have to make history requests for
Expand All @@ -85,6 +91,7 @@ func sumJob() {
todos[stationCode] = make(map[string]todoStation)
}
todos[stationCode][typeName] = todoStation{
firstBase: firstBase,
lastBase: lastBase,
lastAggregate: lastAggregate,
}
Expand All @@ -101,21 +108,27 @@ func sumJob() {
// get data history from starting point until last EOD
for stationCode, typeMap := range todos {
for typeName, todo := range typeMap {
// for every n months
// start with day lastAggregate = +1 until EOD lastBase
start := stripToDay(todo.lastAggregate).AddDate(0, 0, 1)
end := stripToDay(todo.lastBase)
var rest []NinjaFlatData
thereMightBeMore := true
for thereMightBeMore {
start := stripToDay(todo.lastAggregate).AddDate(0, 0, 1)
end := stripToDay(todo.lastBase)

for start.Compare(end) < 0 {
// request end = start + 3 months
res, err := getNinjaData(stationCode, typeName, start, end)
if err != nil {
slog.Error("Error requesting data from Ninja", err)
return
}
debugLogJson(res)

// debugLogJson(res)
if len(rest) == 0 {
rest := res.Data
} else {
rest := append(rest, res.Data...)
}
thereMightBeMore = res.Limit == int64(len(res.Data))
}

// do the things
}
}
// for each calendar day:
Expand All @@ -124,16 +137,16 @@ func sumJob() {
// create combined sum measurement
}

func getNinjaData(stationCode string, typeName string, from time.Time, to time.Time) (*ninjalib.NinjaResponse[NinjaFlatData], error) {
func getNinjaData(stationCode string, typeName string, from time.Time, to time.Time) (*ninjalib.NinjaResponse[[]NinjaFlatData], error) {
req := ninjalib.DefaultNinjaRequest()
req.AddDataType(typeName)
req.From = from
req.To = to
req.Select = "scode,mvalue,tname"
req.Where = fmt.Sprintf("and(mperiod.eq.%d,scode.eq.\"%s\")", periodBase, stationCode)
req.Limit = -1
req.Limit = 10 //int64(ninjaLimit)

var res *ninjalib.NinjaResponse[NinjaFlatData]
res := &ninjalib.NinjaResponse[[]NinjaFlatData]{}

err := ninjalib.HistoryRequest(req, res)
return res, err
Expand Down

0 comments on commit a96cfbd

Please sign in to comment.