Skip to content

Commit

Permalink
Added chunk partitioning support for non date partitioned sources (#30)
Browse files Browse the repository at this point in the history
* Added chunk partitioning support for non date partitioned sources
  • Loading branch information
AndrewNepogoda authored Oct 3, 2023
1 parent 89b98ae commit 3114bde
Show file tree
Hide file tree
Showing 21 changed files with 189 additions and 27 deletions.
16 changes: 15 additions & 1 deletion cli-e2e-test/config/model/scenario1.json
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,13 @@
],
"loadsNumberOfDays": 60
},
{
"relation": "product_data",
"isChunkPartitioned": true,
"isDatePartitioned": false,
"relativePath": "product",
"inputFormat": "csv"
},
{
"relation": "device_seen_snapshot",
"isChunkPartitioned": true,
Expand All @@ -52,6 +59,7 @@
"device.rel",
"zip.rel",
"store.rel",
"product.rel",
"json_schema_mapping.rel",
"device_seen.rel"
]
Expand All @@ -66,7 +74,8 @@
"relations": [
"city:name",
"device:name",
"store:name"
"store:name",
"product:name"
],
"materializeJointly": true
},
Expand All @@ -87,6 +96,11 @@
"configRelName": "devices_csv",
"relativePath": "devices"
},
{
"type": "csv",
"configRelName": "products_csv",
"relativePath": "products"
},
{
"type": "csv",
"configRelName": "stores_csv",
Expand Down
7 changes: 7 additions & 0 deletions cli-e2e-test/config/model/scenario2.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,13 @@
"relativePath": "city",
"inputFormat": "csv",
"loadsNumberOfDays": 60
},
{
"relation": "product_data",
"isChunkPartitioned": true,
"isDatePartitioned": false,
"relativePath": "product",
"inputFormat": "csv"
}
]
}
Expand Down
7 changes: 7 additions & 0 deletions cli-e2e-test/config/model/scenario3.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,13 @@
"relativePath": "temp/city",
"inputFormat": "csv",
"loadsNumberOfDays": 60
},
{
"relation": "product_data",
"isChunkPartitioned": true,
"isDatePartitioned": false,
"relativePath": "product",
"inputFormat": "csv"
}
]
}
Expand Down
34 changes: 34 additions & 0 deletions cli-e2e-test/config/model/scenario5.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
{
"workflow": [
{
"type": "ConfigureSources",
"name": "ConfigureSources",
"configFiles": [
"config/config2.rel"
],
"defaultContainer": "input",
"sources": [
{
"relation": "zip_city_state_master_data",
"relativePath": "master_source/zip_city_state",
"inputFormat": "csv"
},
{
"relation": "city_data",
"isChunkPartitioned": true,
"isDatePartitioned": true,
"relativePath": "city",
"inputFormat": "csv",
"loadsNumberOfDays": 60
},
{
"relation": "product_data",
"isChunkPartitioned": true,
"isDatePartitioned": false,
"relativePath": "temp/product",
"inputFormat": "csv"
}
]
}
]
}
26 changes: 26 additions & 0 deletions cli-e2e-test/config/model/scenario6.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
{
"workflow": [
{
"type": "ConfigureSources",
"name": "ConfigureSources",
"configFiles": [
"config/config2.rel"
],
"defaultContainer": "input",
"sources": [
{
"relation": "zip_city_state_master_data",
"relativePath": "master_source/zip_city_state",
"inputFormat": "csv"
},
{
"relation": "product_data",
"isChunkPartitioned": true,
"isDatePartitioned": false,
"relativePath": "temp/product",
"inputFormat": "csv"
}
]
}
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
product
mobile_phone
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
product
router
adapter
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
name
mobile_phone
router
adapter
10 changes: 9 additions & 1 deletion cli-e2e-test/rel/config/config1.rel
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,14 @@ module export_config
}
end

module products_csv
def data = product_info

def syntax:header = {
1, :name
}
end

module device_seen_snapshot_csv
def data = device_seen_snapshot_updated

Expand All @@ -35,5 +43,5 @@ module export_config
end

def part_resource_date_pattern = "^(.+)/data_dt=(?<date>[0-9]+)/(.+).(csv|json|jsonl)$"
def part_resource_index_pattern = "^(.+)/data_dt=(.+)/part-(?<shard>[0-9]).(csv|json|jsonl)$"
def part_resource_index_pattern = "^(.+)/part-(?<shard>[0-9])-(.+).(csv|json|jsonl)$"
def part_resource_index_multiplier = 100000
2 changes: 1 addition & 1 deletion cli-e2e-test/rel/config/config2.rel
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
def part_resource_date_pattern = "^(.+)/data_dt=(?<date>[0-9]+)/(.+).(csv|json|jsonl)$"
def part_resource_index_pattern = "^(.+)/data_dt=(.+)/part-(?<shard>[0-9])-(.+).(csv|json|jsonl)$"
def part_resource_index_pattern = "^(.+)/part-(?<shard>[0-9])-(.+).(csv|json|jsonl)$"
def part_resource_index_multiplier = 100000
2 changes: 1 addition & 1 deletion cli-e2e-test/rel/config/config3.rel
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,5 @@ module export_config
end

def part_resource_date_pattern = "^(.+)/data_dt=(?<date>[0-9]+)/(.+).(csv|json|jsonl)$"
def part_resource_index_pattern = "^(.+)/data_dt=(.+)/part-(?<shard>[0-9])-(.+).(csv|json|jsonl)$"
def part_resource_index_pattern = "^(.+)/part-(?<shard>[0-9])-(.+).(csv|json|jsonl)$"
def part_resource_index_multiplier = 100000
7 changes: 7 additions & 0 deletions cli-e2e-test/rel/product.rel
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
// Maps each raw chunk of the "product_data" source to its :product column:
// PRODUCT_NAME[idx, row] is the product name found at (chunk idx, row) in
// the source catalog.
module product_data
    def PRODUCT_NAME[idx, row] = source_catalog:product_data[idx, :product, row]
end

// Flattens the chunk/row structure into the plain set of product names;
// consumed as `data` by the products_csv export config — see config1.rel.
module product_info
    def name = product_data:PRODUCT_NAME[_, _]
end
50 changes: 48 additions & 2 deletions cli-e2e-test/test_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ def test_scenario2_model_force_reimport(self):
self.assertEqual(rsp_json, [{'partition': 2023090800001, 'relation': 'city_data'},
{'partition': 2023090800002, 'relation': 'city_data'},
{'partition': 2023090900001, 'relation': 'city_data'},
{'partition': 1, 'relation': 'product_data'},
{'partition': 2, 'relation': 'product_data'},
{'relation': 'zip_city_state_master_data'}])

def test_scenario2_model_force_reimport_chunk_partitioned(self):
Expand All @@ -87,7 +89,7 @@ def test_scenario2_model_force_reimport_chunk_partitioned(self):
rsp_json = workflow.rai.execute_relation_json(self.logger, rai_config, RESOURCES_TO_DELETE_REL)
self.assertEqual(rsp_json, [{'relation': 'zip_city_state_master_data'}])

def test_scenario3_model_single_partition_change(self):
def test_scenario3_model_single_partition_change_for_date_partitioned(self):
# when
test_args = ["--batch-config", "./config/model/scenario3.json",
"--start-date", "20230908",
Expand All @@ -109,7 +111,7 @@ def test_scenario3_model_single_partition_change(self):
rsp_json = workflow.rai.execute_relation_json(self.logger, rai_config, RESOURCES_TO_DELETE_REL)
self.assertEqual(rsp_json, [{'partition': 2023090800001, 'relation': 'city_data'}])

def test_scenario3_model_two_partitions_overriden_by_one(self):
def test_scenario3_model_two_partitions_overriden_by_one_for_date_partitioned(self):
# when
test_args = ["--batch-config", "./config/model/scenario3.json",
"--start-date", "20230908",
Expand Down Expand Up @@ -154,6 +156,50 @@ def test_scenario4_model_reimport_2_partitions_data_with_1(self):
self.assertNotEqual(rsp, 1)
self.assert_output_dir_files(self.test_scenario4_model_reimport_2_partitions_data_with_1.__name__)

def test_scenario5_model_single_partition_change(self):
    """Chunk-partitioned but NOT date-partitioned source (scenario5.json):
    renaming one chunk file simulates a refresh of that chunk only, so
    exactly partition 1 of `product_data` must be scheduled for re-import.
    """
    # when
    test_args = ["--batch-config", "./config/model/scenario5.json",
                 "--start-date", "20230908",
                 "--end-date", "20230909"]
    # copy data for scenario 5
    data_folder = "/product"
    shutil.copytree(f"{self.dev_data_dir}{data_folder}", f"{self.temp_folder}{data_folder}")
    rsp = call(self.cmd_with_common_arguments + test_args + ["--drop-db"])
    # then
    # NOTE(review): CLI appears to signal failure with exit code 1 — confirm
    self.assertNotEqual(rsp, 1)
    # and when
    # rename files to simulate data refresh
    os.rename(f"{self.temp_folder}{data_folder}/part-1-3q1ec0b0-ebfd-a773-71d7-f71f42a2f066.csv",
              f"{self.temp_folder}{data_folder}/part-1-{uuid.uuid4()}.csv")
    rsp = call(self.cmd_with_common_arguments + test_args)
    # then
    self.assertNotEqual(rsp, 1)
    rai_config = self.resource_manager.get_rai_config()
    rsp_json = workflow.rai.execute_relation_json(self.logger, rai_config, RESOURCES_TO_DELETE_REL)
    # only chunk 1 changed, so only partition index 1 is marked for deletion/re-import
    self.assertEqual(rsp_json, [{'partition': 1, 'relation': 'product_data'}])

def test_scenario6_model_two_partitions_overriden_by_one(self):
    """Chunk-partitioned but NOT date-partitioned source (scenario6.json):
    refresh chunk 1 (rename) and drop chunk 2 (remove) — both partitions
    of `product_data` must be scheduled for deletion/re-import.
    """
    # when
    test_args = ["--batch-config", "./config/model/scenario6.json"]
    data_folder = "/product"
    # copy data for scenario 6
    shutil.copytree(f"{self.dev_data_dir}{data_folder}", f"{self.temp_folder}{data_folder}")
    rsp = call(self.cmd_with_common_arguments + test_args + ["--drop-db"])
    # then
    self.assertNotEqual(rsp, 1)
    # and when
    # rename files to simulate data refresh
    os.rename(f"{self.temp_folder}{data_folder}/part-1-3q1ec0b0-ebfd-a773-71d7-f71f42a2f066.csv",
              f"{self.temp_folder}{data_folder}/part-1-{uuid.uuid4()}.csv")
    os.remove(f"{self.temp_folder}{data_folder}/part-2-3w1ec0b0-ebfd-a773-71d7-f71f42a2f066.csv")
    rsp = call(self.cmd_with_common_arguments + test_args)
    # then
    self.assertNotEqual(rsp, 1)
    rai_config = self.resource_manager.get_rai_config()
    rsp_json = workflow.rai.execute_relation_json(self.logger, rai_config, RESOURCES_TO_DELETE_REL)
    # chunk 1 was replaced and chunk 2 deleted, so both partitions are affected
    self.assertEqual(rsp_json, [{'partition': 1, 'relation': 'product_data'},
                                {'partition': 2, 'relation': 'product_data'}])

@classmethod
def setUpClass(cls) -> None:
# Make sure the output folder is empty, since the folder is shared across the repository (README.md is kept; other files are removed).
Expand Down
2 changes: 1 addition & 1 deletion cli/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ This Command-Line Interface (CLI) is designed to provide an easy and interactive
1. Create a batch configuration (ex. `poc.json`) file using the syntax and structure outlined in the [RAI Workflow Framework README](../workflow/README.md).
2. Add `rai-workflow-manager` as dependency to your `requirements.txt` file:
```txt
rai-workflow-manager==0.0.17
rai-workflow-manager==0.0.18
```
3. Build the project:
```bash
Expand Down
25 changes: 9 additions & 16 deletions rel/source_configs/config.rel
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,6 @@ def Resource(r) { resource:id(r, _) }
value type PartIndex = Int
def PartResource(r) { part_resource:hashes_to_part_index(r, _) }

ic part_resource_hashes_to_date(p) { PartResource(p) implies part_resource:hashes_to_date(p, _) }

module resource
def id = transpose[uri:identifies]

Expand Down Expand Up @@ -105,6 +103,14 @@ module part_resource
n = ^PartIndex[ d * part_resource_index_multiplier + s ]
from d, s
}

// Part index for resources WITHOUT a date component in their URI: when no
// "date" capture matched, the shard number alone becomes the PartIndex
// (contrast with the date-based overload above, which combines the date
// with part_resource_index_multiplier).
@inline
def parse_part_index[v](n) {
    not uri:parse_value(v, "date", _) and  // guard: only non-date-partitioned URIs
    s = parse_int[ uri:parse_value[v, shard_alias] ] and
    n = ^PartIndex[s]
    from s
}
end

module uri
Expand Down Expand Up @@ -197,12 +203,6 @@ ic source_has_container_type(s) {
implies
source:container_type(s, _)
}
// currently we do not support chunk partitioning for not date partitioned source
ic multi_part_source_is_date_partitioned(s) {
MultiPartSource(s)
implies
source:date_partitioned(s)
}

module source
def populates = transpose[relation:identifies]
Expand Down Expand Up @@ -279,14 +279,7 @@ def source:populated(s) {

def source:needs_resource(s) {
resource:needed(r) and
not PartResource(r) and
resource:part_of(r, s)
from r
}

def source:needs_resource(s) {
resource:needed(r) and
not PartResource(r) and
not part_resource:hashes_to_date(r, _) and
resource:part_of(r, s)
from r
}
Expand Down
15 changes: 14 additions & 1 deletion rel/source_configs/data_reload.rel
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,27 @@ def simple_sources(rel, path) {
not chunk_partitioned_sources(rel, path, _)
from i
}
// TODO: add support for chunk partitioned sources which are not date partitioned

/*
 * All simple sources are affected if they match the declared sources.
*/
def potentially_affected_sources(rel, path) {
source_declares_resource(rel, _, path) and
simple_sources(rel, path)
}
/*
 * All chunk-partitioned sources that are not date-partitioned are affected if they match the declared sources.
*/
def potentially_affected_sources(rel, path, p_idx) {
    chunk_partitioned_sources(rel, _, _) and
    source_declares_resource(rel, _, path) and
    // resolve the source entity behind the relation name
    s = relation:identifies[ rel_name:identifies[ ^RelName[rel] ] ] and
    // restrict to sources that are chunk-partitioned but NOT date-partitioned
    not source:date_partitioned(s) and
    source:declares(s, res) and
    // expose the chunk (part) index of the matching declared resource
    part_resource:hashes_to_part_index(res, p_idx) and
    resource:id(res, ^URI[path])
    from s, res
}
/*
* All chunk partitioned sources for given date are affected in case new sources have at least one partition in this date.
*/
Expand Down
2 changes: 0 additions & 2 deletions workflow/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,6 @@ Steps of this type are used to configure sources which workflow manager will use
]
}
```
#### Limitations
- We do not support chunk partitioning for sources which are not partitioned by date.

### Install Model

Expand Down
2 changes: 1 addition & 1 deletion workflow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@
# See the License for the specific language governing permissions and
# limitations under the License.

__version_info__ = (0, 0, 17)
__version_info__ = (0, 0, 18)
__version__ = ".".join(map(str, __version_info__))

0 comments on commit 3114bde

Please sign in to comment.