Commit

More kdvh updates
Lun4m committed Nov 8, 2024
1 parent f13150d commit 8caf453
Showing 12 changed files with 590 additions and 611 deletions.
1 change: 1 addition & 0 deletions migrations/.gitignore
@@ -4,3 +4,4 @@ migrate
 tables*/
 test_*/
 .env
+dumps/
69 changes: 44 additions & 25 deletions migrations/kdvh/cache.go
@@ -13,11 +13,11 @@ import (
 	"github.com/rickb777/period"
 )

-// Caches all the metadate needed for import.
-// If any error occurs here the program will exit.
-func (config *ImportConfig) CacheMetadata(kdvh *KDVH) {
-	config.CacheStinfo(kdvh)
-	config.cacheKDVH(kdvh)
+// Caches all the metadata needed for import.
+// If any error occurs inside here the program will exit.
+func (config *ImportConfig) CacheMetadata() {
+	config.cacheStinfo()
+	config.cacheKDVH()
 	config.cacheParamOffsets()
 }

@@ -27,26 +27,45 @@ type StinfoKey struct {
 	TableName string
 }

-func NewStinfoKey(elem, table string) StinfoKey {
-	return StinfoKey{ElemCode: elem, TableName: table}
+// Subset of StinfoQuery with only param info
+type StinfoParam struct {
+	TypeID   int32
+	ParamID  int32
+	Hlevel   *int32
+	Sensor   int32
+	Fromtime time.Time
+	IsScalar bool
 }

-// Query from Stinfosys elem_map_cfnames_param
-type Metadata struct {
+// Struct holding query from Stinfosys elem_map_cfnames_param
+type StinfoQuery struct {
 	ElemCode  string    `db:"elem_code"`
 	TableName string    `db:"table_name"`
 	TypeID    int32     `db:"typeid"`
 	ParamID   int32     `db:"paramid"`
 	Hlevel    *int32    `db:"hlevel"`
 	Sensor    int32     `db:"sensor"`
 	Fromtime  time.Time `db:"fromtime"`
-	Scalar    bool      `db:"scalar"`
-	// totime *time.Time
+	IsScalar  bool      `db:"scalar"`
 }

+func (q *StinfoQuery) toParam() StinfoParam {
+	return StinfoParam{
+		TypeID:   q.TypeID,
+		ParamID:  q.ParamID,
+		Hlevel:   q.Hlevel,
+		Sensor:   q.Sensor,
+		Fromtime: q.Fromtime,
+		IsScalar: q.IsScalar,
+	}
+}
+func (q *StinfoQuery) toKey() StinfoKey {
+	return StinfoKey{q.ElemCode, q.TableName}
+}

 // Save metadata for later use by quering Stinfosys
-func (config *ImportConfig) CacheStinfo(db *KDVH) {
-	cache := make(map[StinfoKey]Metadata)
+func (config *ImportConfig) cacheStinfo() {
+	cache := make(map[StinfoKey]StinfoParam)

 	fmt.Println("Connecting to Stinfosys to cache metadata")
 	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
@@ -59,7 +78,7 @@ func (config *ImportConfig) CacheStinfo(db *KDVH) {
 	}
 	defer conn.Close(context.TODO())

-	for _, table := range db.Tables {
+	for _, table := range KDVH {
 		if config.Tables != nil && !slices.Contains(config.Tables, table.TableName) {
 			continue
 		}
@@ -76,14 +95,14 @@ func (config *ImportConfig) CacheStinfo(db *KDVH) {
 			os.Exit(1)
 		}

-		metas, err := pgx.CollectRows(rows, pgx.RowToStructByName[Metadata])
+		metas, err := pgx.CollectRows(rows, pgx.RowToStructByName[StinfoQuery])
 		if err != nil {
 			slog.Error(err.Error())
 			os.Exit(1)
 		}

 		for _, meta := range metas {
-			cache[StinfoKey{meta.ElemCode, meta.TableName}] = meta
+			cache[meta.toKey()] = meta.toParam()
 		}
 	}
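The toKey/toParam pair splits what used to be one Metadata struct: the map key carries only the (element, table) identity, the value only the parameter info. A minimal sketch of a lookup against this cache; the StinfoMap field name and the example key are assumptions, since only KDVHMap is visible in this diff:

    // Hypothetical cache lookup (StinfoMap is an assumed field name).
    param, ok := config.StinfoMap[StinfoKey{ElemCode: "TA", TableName: "T_MDATA"}]
    if !ok {
        return // (elem_code, table_name) pair not mapped in Stinfosys, skip import
    }
    _ = param.IsScalar // e.g. decide how to parse the observation value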
@@ -100,6 +119,12 @@ func newKDVHKey(elem, table string, stnr int32) KDVHKey {
 	return KDVHKey{StinfoKey{ElemCode: elem, TableName: table}, stnr}
 }

+// Timespan stored in KDVH for a given (table, station, element) triplet
+type Timespan struct {
+	FromTime *time.Time `db:"fdato"`
+	ToTime   *time.Time `db:"tdato"`
+}
+
 // Struct used to deserialize KDVH query in cacheKDVH
 type MetaKDVH struct {
 	ElemCode string `db:"elem_code"`
@@ -117,13 +142,7 @@ func (m *MetaKDVH) toKey() KDVHKey {
 	return KDVHKey{StinfoKey{ElemCode: m.ElemCode, TableName: m.TableName}, m.Station}
 }

-// Timespan stored in KDVH for a given (table, station, element) triplet
-type Timespan struct {
-	FromTime *time.Time `db:"fdato"`
-	ToTime   *time.Time `db:"tdato"`
-}
-
-func (config *ImportConfig) cacheKDVH(db *KDVH) {
+func (config *ImportConfig) cacheKDVH() {
 	cache := make(map[KDVHKey]Timespan)

 	fmt.Println("Connecting to KDVH proxy to cache metadata")
@@ -137,7 +156,7 @@ func (config *ImportConfig) cacheKDVH(db *KDVH) {
 	}
 	defer conn.Close(context.TODO())

-	for _, t := range db.Tables {
+	for _, t := range KDVH {
 		if config.Tables != nil && !slices.Contains(config.Tables, t.TableName) {
 			continue
 		}
@@ -170,7 +189,7 @@ func (config *ImportConfig) cacheKDVH(db *KDVH) {
 	config.KDVHMap = cache
 }

-// how to modify the obstime (in KDVH) for certain paramid
+// Caches how to modify the obstime (in KDVH) for certain paramids
 func (config *ImportConfig) cacheParamOffsets() {
 	cache := make(map[StinfoKey]period.Period)
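The offsets are stored as rickb777/period values (imported at the top of cache.go), so applying one to an obstime looks roughly like this; the "P1M" literal and the variable names are illustrative only:

    // Sketch: shift an observation time by a cached offset.
    offset := period.MustParse("P1M") // e.g. one month
    if shifted, ok := offset.AddTo(obstime); ok {
        obstime = shifted // ok reports whether the addition was lossless
    }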
98 changes: 35 additions & 63 deletions migrations/kdvh/dump.go
@@ -9,7 +9,7 @@ import (
 	"slices"
 	"strings"

-	"github.com/schollz/progressbar/v3"
+	_ "github.com/jackc/pgx/v5/stdlib"

 	"migrate/utils"
 )
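The progressbar dependency presumably moves behind utils.NewBar (see below), while the new blank import exists only for its side effect: the pgx stdlib adapter's init() registers the "pgx" driver name that database/sql resolves in sql.Open. Without it, the sql.Open call in Execute would return an unknown-driver error:

    // Sketch: this succeeds only because the blank import registered "pgx";
    // otherwise: sql: unknown driver "pgx" (forgotten import?)
    conn, err := sql.Open("pgx", os.Getenv("KDVH_PROXY_CONN"))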
@@ -45,15 +45,16 @@ func (config *DumpConfig) setup() {
 	}
 }

-func (config *DumpConfig) Execute(_ []string) error {
+func (config *DumpConfig) Execute([]string) error {
 	config.setup()

 	conn, err := sql.Open("pgx", os.Getenv("KDVH_PROXY_CONN"))
 	if err != nil {
 		slog.Error(err.Error())
 		return nil
 	}

-	kdvh := InitDB()
-	for _, table := range kdvh.Tables {
+	for _, table := range KDVH {
 		if config.Tables != nil && !slices.Contains(config.Tables, table.TableName) {
 			continue
 		}
@@ -63,22 +64,11 @@ func (config *DumpConfig) Execute(_ []string) error {
 	return nil
 }

-func (db *KDVH) Dump(conn *sql.DB, config *DumpConfig) {
-	for _, table := range db.Tables {
-		if config.Tables != nil && !slices.Contains(config.Tables, table.TableName) {
-			continue
-		}
-		table.Dump(conn, config)
-	}
-}
-
-func (table *KDVHTable) Dump(conn *sql.DB, config *DumpConfig) {
+func (table *Table) Dump(conn *sql.DB, config *DumpConfig) {
 	defer utils.SendEmailOnPanic(fmt.Sprintf("%s dump", table.TableName), config.Email)

-	// TODO: should probably do it at the station/element level?
-	// NOTE: '_combined' kept for backward compatibility with original scripts
-	outdir := filepath.Join(config.BaseDir, table.TableName+"_combined")
-	if _, err := os.ReadDir(outdir); err == nil && !config.Overwrite {
+	table.Path = filepath.Join(config.BaseDir, table.Path)
+	if _, err := os.ReadDir(table.Path); err == nil && !config.Overwrite {
 		slog.Info(fmt.Sprint("Skipping data dump of ", table.TableName, " because dumped folder already exists"))
 		return
 	}
@@ -89,56 +79,42 @@ func (table *KDVHTable) Dump(conn *sql.DB, config *DumpConfig) {
 		return
 	}

-	bar := progressbar.NewOptions(len(elements),
-		progressbar.OptionOnCompletion(func() { fmt.Println() }),
-		progressbar.OptionSetDescription(table.TableName),
-		progressbar.OptionShowCount(),
-		progressbar.OptionSetPredictTime(false),
-		progressbar.OptionShowElapsedTimeOnFinish(),
-		progressbar.OptionSetTheme(progressbar.Theme{
-			Saucer:        "=",
-			SaucerHead:    ">",
-			SaucerPadding: " ",
-			BarStart:      "[",
-			BarEnd:        "]",
-		}),
-	)
+	bar := utils.NewBar(len(elements), table.TableName)

-	// TODO: should be safe to spawn goroutines/waitgroup here with connection pool?
+	bar.RenderBlank()
 	for _, element := range elements {
-		table.dumpElement(element, outdir, conn, config)
+		table.dumpElement(element, conn, config)
 		bar.Add(1)
 	}
 }
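On the removed TODO: *sql.DB is a concurrency-safe connection pool, so the per-element dumps could in principle run in parallel. A sketch with a bounded number of workers; the limit of 4 and the assumption that the bar's Add method tolerates concurrent calls are both guesses:

    var wg sync.WaitGroup         // would need "sync" in the import block
    sem := make(chan struct{}, 4) // bound the number of concurrent dumps
    for _, element := range elements {
        wg.Add(1)
        sem <- struct{}{}
        go func(element string) {
            defer wg.Done()
            defer func() { <-sem }()
            table.dumpElement(element, conn, config)
            bar.Add(1)
        }(element)
    }
    wg.Wait()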
-func (table *KDVHTable) dumpElement(element, outdir string, conn *sql.DB, config *DumpConfig) {
-	// TODO: maybe we don't do this? Or can we use pgdump/copy?
-	// The problem is that there are no indices on the tables, that's why the queries are super slow
-	//
-	// Dumping the whole table might be a lot faster (for T_MDATA it's ~10 times faster!):
-	//
-	// copyQuery := fmt.SPrintf("\\copy t_mdata TO '%s/%s.csv' WITH CSV HEADER", config.BaseDir, table.TableName)
-	// cmd := exec.Command("psql", CONN_STRING, "-c", copyQuery)
-	// cmd.Stderr = &bytes.Buffer{}
-	// err = cmd.Run()
-	//
-	// but it might be more difficult to recover if something goes wrong?
+// TODO: maybe we don't do this? Or can we use pgdump/copy?
+// The problem is that there are no indices on the tables, that's why the queries are super slow
+// Dumping the whole table might be a lot faster (for T_MDATA it's ~10 times faster!),
+// but it might be more difficult to recover if something goes wrong?
+// =>
+// copyQuery := fmt.SPrintf("\\copy t_mdata TO '%s/%s.csv' WITH CSV HEADER", config.BaseDir, table.TableName)
+// cmd := exec.Command("psql", CONN_STRING, "-c", copyQuery)
+// cmd.Stderr = &bytes.Buffer{}
+// err = cmd.Run()
+func (table *Table) dumpElement(element string, conn *sql.DB, config *DumpConfig) {
 	stations, err := table.getStationsWithElement(element, conn, config)
 	if err != nil {
 		slog.Error(fmt.Sprintf("Could not fetch stations for table %s: %v", table.TableName, err))
 		return
 	}

 	for _, station := range stations {
-		path := filepath.Join(outdir, string(station))
+		path := filepath.Join(table.Path, string(station))
 		if err := os.MkdirAll(path, os.ModePerm); err != nil {
 			slog.Error(err.Error())
 			return
 		}

 		err := table.dumpFunc(
-			dumpFuncArgs{
-				path:      path,
+			path,
+			DumpMeta{
 				element:   element,
 				station:   station,
 				dataTable: table.TableName,
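For the record, the commented-out \copy experiment above would need two fixes to compile: fmt.SPrintf should be fmt.Sprintf, and err needs a declaration. A corrected sketch, where CONN_STRING is the comment's own placeholder and "bytes" plus "os/exec" would have to be imported:

    copyQuery := fmt.Sprintf(`\copy t_mdata TO '%s/%s.csv' WITH CSV HEADER`, config.BaseDir, table.TableName)
    cmd := exec.Command("psql", CONN_STRING, "-c", copyQuery)
    var stderr bytes.Buffer
    cmd.Stderr = &stderr
    err := cmd.Run() // on failure, stderr.String() holds psql's error output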
@@ -154,8 +130,8 @@ func (table *KDVHTable) dumpElement(element, outdir string, conn *sql.DB, config *DumpConfig) {
 	}
 }

-// Fetches the element and filters the user input
-func (table *KDVHTable) getElements(conn *sql.DB, config *DumpConfig) ([]string, error) {
+// Fetches elements and filters them based on user input
+func (table *Table) getElements(conn *sql.DB, config *DumpConfig) ([]string, error) {
 	elements, err := table.fetchElements(conn)
 	if err != nil {
 		return nil, err
@@ -166,12 +142,12 @@ func (table *KDVHTable) getElements(conn *sql.DB, config *DumpConfig) ([]string, error) {
 	}
 }

 // List of columns that we do not need to select when extracting the element codes from a KDVH table
-// TODO: should typeid be here?
 var INVALID_COLUMNS = []string{"dato", "stnr", "typeid", "season", "xxx"}

 // Fetch column names for a given table
 // We skip the columns defined in INVALID_COLUMNS and all columns that contain the 'kopi' string
-func (table *KDVHTable) fetchElements(conn *sql.DB) ([]string, error) {
+// TODO: should we dump these invalid/kopi elements even if we are not importing them?
+func (table *Table) fetchElements(conn *sql.DB) (elements []string, err error) {
 	slog.Info(fmt.Sprintf("Fetching elements for %s...", table.TableName))

 	// TODO: not sure why we only dump these two for this table
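The body of fetchElements sits outside the visible hunks, but the two comments above pin down the filtering it must do. A sketch of what that plausibly looks like; the information_schema query is an assumption about how the proxy exposes column names, not code from this repo:

    // Assumed query; the real one is hidden in the collapsed part of the diff.
    rows, err := conn.Query(
        "SELECT column_name FROM information_schema.columns WHERE table_name = $1",
        strings.ToLower(table.TableName),
    )
    if err != nil {
        return nil, err
    }
    defer rows.Close()
    for rows.Next() {
        var name string
        if err = rows.Scan(&name); err != nil {
            return nil, err
        }
        // Drop INVALID_COLUMNS and 'kopi' copy columns, per the comments above.
        if slices.Contains(INVALID_COLUMNS, name) || strings.Contains(name, "kopi") {
            continue
        }
        elements = append(elements, name)
    }
    return elements, rows.Err()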
@@ -194,7 +170,6 @@ func (table *KDVHTable) fetchElements(conn *sql.DB) ([]string, error) {
 	}
 	defer rows.Close()

-	var elements []string
 	for rows.Next() {
 		var name string
 		if err = rows.Scan(&name); err != nil {
@@ -206,8 +181,8 @@ func (table *KDVHTable) fetchElements(conn *sql.DB) ([]string, error) {
 	return elements, rows.Err()
 }

-// Fetches the stations and filters the user input
-func (table *KDVHTable) getStationsWithElement(element string, conn *sql.DB, config *DumpConfig) ([]string, error) {
+// Fetches station numbers and filters them based on user input
+func (table *Table) getStationsWithElement(element string, conn *sql.DB, config *DumpConfig) ([]string, error) {
 	stations, err := table.fetchStationsWithElement(element, conn)
 	if err != nil {
 		return nil, err
@@ -218,9 +193,9 @@ func (table *KDVHTable) getStationsWithElement(element string, conn *sql.DB, config *DumpConfig) ([]string, error) {
 	return stations, nil
 }

-// Fetches the unique station numbers in the table
+// Fetches all unique station numbers in the table
 // FIXME: the DISTINCT query can be extremely slow
-func (table *KDVHTable) fetchStationNumbers(conn *sql.DB) ([]string, error) {
+func (table *Table) fetchStationNumbers(conn *sql.DB) (stations []string, err error) {
 	slog.Info(fmt.Sprint("Fetching station numbers (this can take a while)..."))

 	query := fmt.Sprintf(
@@ -234,7 +209,6 @@ func (table *KDVHTable) fetchStationNumbers(conn *sql.DB) ([]string, error) {
 	}
 	defer rows.Close()

-	stations := make([]string, 0)
 	for rows.Next() {
 		var stnr string
 		if err := rows.Scan(&stnr); err != nil {
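Dropping stations := make([]string, 0) here (and var elements []string above) in favor of named returns is safe: a named return value starts at its zero value, and append allocates on first use. The only observable change is that an empty result is now nil rather than an empty slice:

    var stations []string                // nil, the named return's starting value
    stations = append(stations, "18700") // append allocates the backing array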
@@ -246,10 +220,9 @@ func (table *KDVHTable) fetchStationNumbers(conn *sql.DB) ([]string, error) {
 	return stations, rows.Err()
 }

-// Fetches the unique station numbers in the table for a given element (when that element is not null)
-// NOTE: splitting by element does make it a bit better,
-// because we avoid quering for tables that have no data or flag for that element
-func (table *KDVHTable) fetchStationsWithElement(element string, conn *sql.DB) ([]string, error) {
+// Fetches the unique station numbers in the table for a given element (and when that element is not null)
+// NOTE: splitting by element does make it a bit better, because we avoid quering for stations that have no data or flag for that element?
+func (table *Table) fetchStationsWithElement(element string, conn *sql.DB) (stations []string, err error) {
 	slog.Info(fmt.Sprintf("Fetching station numbers for %s (this can take a while)...", element))

 	query := fmt.Sprintf(
@@ -264,7 +237,6 @@ func (table *KDVHTable) fetchStationsWithElement(element string, conn *sql.DB) ([]string, error) {
 	}
 	defer rows.Close()

-	stations := make([]string, 0)
 	for rows.Next() {
 		var stnr string
 		if err := rows.Scan(&stnr); err != nil {
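A closing note on the FIXME above fetchStationNumbers: SELECT DISTINCT stnr on these tables forces a full scan because, as the comment in dumpElement says, they carry no indices. If indexing the proxy tables were ever an option, a one-off statement like this (purely hypothetical, not part of the migration) would make both station queries cheap:

    _, err := conn.Exec(fmt.Sprintf("CREATE INDEX IF NOT EXISTS idx_%s_stnr ON %s (stnr)", table.TableName, table.TableName))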