Skip to content

Commit

Permalink
adding snapshot source support (#20)
Browse files Browse the repository at this point in the history
* adding snapshot source support

* resolving merge

* fix issue

* fix

* make sure all paths are valid

* source updated to correct field type

* adding more tests for inflate sources

* fix date range logic

* added export step tests

* added snapshot source to e2e tests

* bump version and upd readme
  • Loading branch information
stanislav-bedunkevich authored Sep 19, 2023
1 parent f2bf2a4 commit 54cca40
Show file tree
Hide file tree
Showing 19 changed files with 771 additions and 53 deletions.
19 changes: 18 additions & 1 deletion cli-e2e-test/config/model/scenario1.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,16 @@
"jsonl"
],
"loadsNumberOfDays": 60
},
{
"relation": "device_seen_snapshot",
"isChunkPartitioned": true,
"isDatePartitioned": true,
"relativePath": "device_seen_snapshot",
"inputFormat": "csv",
"loadsNumberOfDays": 1,
"offsetByNumberOfDays": 1,
"snapshotValidityDays": 3
}
]
},
Expand All @@ -41,7 +51,8 @@
"device.rel",
"zip.rel",
"store.rel",
"json_schema_mapping.rel"
"json_schema_mapping.rel",
"device_seen.rel"
]
},
{
Expand Down Expand Up @@ -78,6 +89,12 @@
"type": "csv",
"configRelName": "stores_csv",
"relativePath": "stores"
},
{
"type": "csv",
"configRelName": "device_seen_snapshot_csv",
"relativePath": "device_seen_snapshot",
"snapshotBinding": "device_seen_snapshot"
}
]
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
device,last_seen
IPhone1,2021-12-31
IPhone2,2021-12-30
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
device,last_seen
IPhone1,2022-01-05
IPhone2,2022-01-05
9 changes: 9 additions & 0 deletions cli-e2e-test/rel/config/config1.rel
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,15 @@ module export_config
1, :name
}
end

module device_seen_snapshot_csv
    // Relation whose contents are written out by this CSV export config.
    def data { device_seen_snapshot_updated }

    // CSV header layout: column position paired with the column name.
    def syntax:header { (1, :device); (2, :last_seen) }
end
end

// Path pattern for date-partitioned resources: `<prefix>/data_dt=<digits>/<name>.<ext>`.
// The named group `date` captures the digits that follow `data_dt=`.
// The extension separator is written as `[.]` so that only a literal dot matches; a bare `.`
// would accept any character in that position (e.g. `part-1xcsv` would pass as a CSV file).
def part_resource_date_pattern = "^(.+)/data_dt=(?<date>[0-9]+)/(.+)[.](csv|json|jsonl)$"
Expand Down
34 changes: 34 additions & 0 deletions cli-e2e-test/rel/device_seen.rel
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
bound import_config:device_seen_snapshot:schema
bound syntax:header

// Column accessors over the loaded `device_seen_snapshot` source. Each relation fixes one
// column symbol of `source_catalog:device_seen_snapshot` and keeps `idx` and `row` as keys.
// NOTE(review): `idx` presumably identifies the loaded file/partition and `row` the CSV row
// within it — confirm against the source loader.
module device_seen_snapshot
// Values found under the `:device` column.
def DEVICE_NAME[idx, row] = source_catalog:device_seen_snapshot[idx, :device, row]
// Values found under the `:last_seen` column.
def LAST_SEEN[idx, row] = source_catalog:device_seen_snapshot[idx, :last_seen, row]
end

/*
* The device_seen snapshot records, for each device, the last date on which it was seen.
* It is computed as an aggregation (max) over the previously loaded snapshot data and the
* current day's data found in the `device` relation.
*/

// The latest value found in `source:spans`, across all of its first-position keys.
def current_date { max[source:spans[_]] }

// Pairs every name in `device:name` with `current_date`.
def device_seen_today(dev, day) {
    day = current_date and
    device:name(dev)
}

// For each device `d`, the greatest `t` among:
//   (a) `current_date`, when `d` is present in `device:name` (via `device_seen_today`), and
//   (b) every `last_seen` value recorded for `d` in the loaded snapshot.
// NOTE(review): assumes the snapshot's `last_seen` values are loaded with the same type as
// `current_date`, so that `max` compares like with like — confirm against the source schema.
def device_last_seen[d] = max[t:
device_seen_today(d, t)
or
(
// `idx` and `row` tie the device name and the date to the same snapshot entry
device_seen_snapshot:DEVICE_NAME(idx, row, d) and
device_seen_snapshot:LAST_SEEN(idx, row, t)
from idx, row
)
]

module device_seen_snapshot_updated
    // Every device that has a last-seen value, keyed by its own name.
    def device(key, dev) { key = dev and device_last_seen(dev, _) }

    // The last-seen value for each device, using the device name as the key.
    def last_seen(dev, day) { device_last_seen(dev, day) }
end
1 change: 0 additions & 1 deletion cli-e2e-test/test_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ class CliE2ETest(unittest.TestCase):
def test_scenario1_model(self):
# when
test_args = ["--batch-config", "./config/model/scenario1.json",
"--start-date", "20220103",
"--end-date", "20220105",
"--drop-db"]
rsp = call(self.cmd_with_common_arguments + test_args)
Expand Down
2 changes: 2 additions & 0 deletions rel/batch_config/workflow/steps/configure_sources.rel
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,11 @@ module batch_source
// Relates a source `src` to the step `st` it belongs to, via the step/source-name lookup
// (the middle argument of the lookup is ignored).
def step(src, st) { batch_source_name:step_source_name_to_source(st, _, src) }
// Each entry `e` of the source's `extensions` value; the `:[]` marker and the position that
// follows it are skipped. NOTE(review): presumably the JSON array marker and element index.
def extensions(src, e) { extract_value:extensions(src, :[], _, e) }
// Holds for sources whose `partitioned` value is `boolean_true`.
def partitioned(src) { extract_value:partitioned(src, boolean_true) }
// The remaining accessors are direct aliases of `extract_value` for one config key each.
// `relation` config key.
def relation = extract_value:relation
// `relativePath` config key.
def relative_path = extract_value:relativePath
// `inputFormat` config key.
def input_format = extract_value:inputFormat
// `loadsNumberOfDays` config key.
def loads_number_of_days = extract_value:loadsNumberOfDays
// `snapshotValidityDays` config key.
def snapshot_validity_days = extract_value:snapshotValidityDays

@inline
def extract_value[A](src, val...) {
Expand Down
1 change: 1 addition & 0 deletions rel/batch_config/workflow/steps/export.rel
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ module batch_export
// Direct aliases of `extract_value` for one export config key each.
// `type` config key.
def type = extract_value:type
// `relativePath` config key.
def relative_path = extract_value:relativePath
// `offsetByNumberOfDays` config key.
def offset_by_number_of_days = extract_value:offsetByNumberOfDays
// `snapshotBinding` config key.
def snapshot_binding = extract_value:snapshotBinding

@inline
def extract_value[A](e, val...) {
Expand Down
Loading

0 comments on commit 54cca40

Please sign in to comment.