From 3114bde0b6de0047001aabf681999d3ad4d113e1 Mon Sep 17 00:00:00 2001 From: Andrew Nepogoda Date: Tue, 3 Oct 2023 16:02:50 +0200 Subject: [PATCH] Added chunk partitioning support for non date partitioned sources (#30) * Added chunk partitioning support for non date partitioned sources --- cli-e2e-test/config/model/scenario1.json | 16 +++++- cli-e2e-test/config/model/scenario2.json | 7 +++ cli-e2e-test/config/model/scenario3.json | 7 +++ cli-e2e-test/config/model/scenario5.json | 34 +++++++++++++ cli-e2e-test/config/model/scenario6.json | 26 ++++++++++ ...-3l1ec0b0-ebfd-a773-71d7-f71f42a2f066.csv} | 0 ...-3t1ec0b0-ebfd-a773-71d7-f71f42a2f066.csv} | 0 ...1-3q1ec0b0-ebfd-a773-71d7-f71f42a2f066.csv | 2 + ...2-3w1ec0b0-ebfd-a773-71d7-f71f42a2f066.csv | 3 ++ ...u1ec0b0-ebfd-a773-71d7-f71f42a2f066.jsonl} | 0 .../test_scenario1_model/products_csv.csv | 4 ++ cli-e2e-test/rel/config/config1.rel | 10 +++- cli-e2e-test/rel/config/config2.rel | 2 +- cli-e2e-test/rel/config/config3.rel | 2 +- cli-e2e-test/rel/product.rel | 7 +++ cli-e2e-test/test_e2e.py | 50 ++++++++++++++++++- cli/README.md | 2 +- rel/source_configs/config.rel | 25 ++++------ rel/source_configs/data_reload.rel | 15 +++++- workflow/README.md | 2 - workflow/__init__.py | 2 +- 21 files changed, 189 insertions(+), 27 deletions(-) create mode 100644 cli-e2e-test/config/model/scenario5.json create mode 100644 cli-e2e-test/config/model/scenario6.json rename cli-e2e-test/data/device/data_dt=20220105/{part-1.csv => part-1-3l1ec0b0-ebfd-a773-71d7-f71f42a2f066.csv} (100%) rename cli-e2e-test/data/device_seen_snapshot/data_dt=20220102/{part-1.csv => part-1-3t1ec0b0-ebfd-a773-71d7-f71f42a2f066.csv} (100%) create mode 100644 cli-e2e-test/data/product/part-1-3q1ec0b0-ebfd-a773-71d7-f71f42a2f066.csv create mode 100644 cli-e2e-test/data/product/part-2-3w1ec0b0-ebfd-a773-71d7-f71f42a2f066.csv rename cli-e2e-test/data/store/data_dt=20220105/{part-1.jsonl => part-1-3u1ec0b0-ebfd-a773-71d7-f71f42a2f066.jsonl} (100%) create mode 
100644 cli-e2e-test/expected_results/test_scenario1_model/products_csv.csv create mode 100644 cli-e2e-test/rel/product.rel diff --git a/cli-e2e-test/config/model/scenario1.json b/cli-e2e-test/config/model/scenario1.json index 31078df..ec4c4ae 100644 --- a/cli-e2e-test/config/model/scenario1.json +++ b/cli-e2e-test/config/model/scenario1.json @@ -33,6 +33,13 @@ ], "loadsNumberOfDays": 60 }, + { + "relation": "product_data", + "isChunkPartitioned": true, + "isDatePartitioned": false, + "relativePath": "product", + "inputFormat": "csv" + }, { "relation": "device_seen_snapshot", "isChunkPartitioned": true, @@ -52,6 +59,7 @@ "device.rel", "zip.rel", "store.rel", + "product.rel", "json_schema_mapping.rel", "device_seen.rel" ] @@ -66,7 +74,8 @@ "relations": [ "city:name", "device:name", - "store:name" + "store:name", + "product:name" ], "materializeJointly": true }, @@ -87,6 +96,11 @@ "configRelName": "devices_csv", "relativePath": "devices" }, + { + "type": "csv", + "configRelName": "products_csv", + "relativePath": "products" + }, { "type": "csv", "configRelName": "stores_csv", diff --git a/cli-e2e-test/config/model/scenario2.json b/cli-e2e-test/config/model/scenario2.json index d4bb31f..89c4f7e 100644 --- a/cli-e2e-test/config/model/scenario2.json +++ b/cli-e2e-test/config/model/scenario2.json @@ -20,6 +20,13 @@ "relativePath": "city", "inputFormat": "csv", "loadsNumberOfDays": 60 + }, + { + "relation": "product_data", + "isChunkPartitioned": true, + "isDatePartitioned": false, + "relativePath": "product", + "inputFormat": "csv" } ] } diff --git a/cli-e2e-test/config/model/scenario3.json b/cli-e2e-test/config/model/scenario3.json index fe7351d..72543fa 100644 --- a/cli-e2e-test/config/model/scenario3.json +++ b/cli-e2e-test/config/model/scenario3.json @@ -20,6 +20,13 @@ "relativePath": "temp/city", "inputFormat": "csv", "loadsNumberOfDays": 60 + }, + { + "relation": "product_data", + "isChunkPartitioned": true, + "isDatePartitioned": false, + "relativePath": "product", 
+ "inputFormat": "csv" } ] } diff --git a/cli-e2e-test/config/model/scenario5.json b/cli-e2e-test/config/model/scenario5.json new file mode 100644 index 0000000..d47bf0c --- /dev/null +++ b/cli-e2e-test/config/model/scenario5.json @@ -0,0 +1,34 @@ +{ + "workflow": [ + { + "type": "ConfigureSources", + "name": "ConfigureSources", + "configFiles": [ + "config/config2.rel" + ], + "defaultContainer": "input", + "sources": [ + { + "relation": "zip_city_state_master_data", + "relativePath": "master_source/zip_city_state", + "inputFormat": "csv" + }, + { + "relation": "city_data", + "isChunkPartitioned": true, + "isDatePartitioned": true, + "relativePath": "city", + "inputFormat": "csv", + "loadsNumberOfDays": 60 + }, + { + "relation": "product_data", + "isChunkPartitioned": true, + "isDatePartitioned": false, + "relativePath": "temp/product", + "inputFormat": "csv" + } + ] + } + ] +} diff --git a/cli-e2e-test/config/model/scenario6.json b/cli-e2e-test/config/model/scenario6.json new file mode 100644 index 0000000..489bcb7 --- /dev/null +++ b/cli-e2e-test/config/model/scenario6.json @@ -0,0 +1,26 @@ +{ + "workflow": [ + { + "type": "ConfigureSources", + "name": "ConfigureSources", + "configFiles": [ + "config/config2.rel" + ], + "defaultContainer": "input", + "sources": [ + { + "relation": "zip_city_state_master_data", + "relativePath": "master_source/zip_city_state", + "inputFormat": "csv" + }, + { + "relation": "product_data", + "isChunkPartitioned": true, + "isDatePartitioned": false, + "relativePath": "temp/product", + "inputFormat": "csv" + } + ] + } + ] +} diff --git a/cli-e2e-test/data/device/data_dt=20220105/part-1.csv b/cli-e2e-test/data/device/data_dt=20220105/part-1-3l1ec0b0-ebfd-a773-71d7-f71f42a2f066.csv similarity index 100% rename from cli-e2e-test/data/device/data_dt=20220105/part-1.csv rename to cli-e2e-test/data/device/data_dt=20220105/part-1-3l1ec0b0-ebfd-a773-71d7-f71f42a2f066.csv diff --git 
a/cli-e2e-test/data/device_seen_snapshot/data_dt=20220102/part-1.csv b/cli-e2e-test/data/device_seen_snapshot/data_dt=20220102/part-1-3t1ec0b0-ebfd-a773-71d7-f71f42a2f066.csv similarity index 100% rename from cli-e2e-test/data/device_seen_snapshot/data_dt=20220102/part-1.csv rename to cli-e2e-test/data/device_seen_snapshot/data_dt=20220102/part-1-3t1ec0b0-ebfd-a773-71d7-f71f42a2f066.csv diff --git a/cli-e2e-test/data/product/part-1-3q1ec0b0-ebfd-a773-71d7-f71f42a2f066.csv b/cli-e2e-test/data/product/part-1-3q1ec0b0-ebfd-a773-71d7-f71f42a2f066.csv new file mode 100644 index 0000000..2a26ddc --- /dev/null +++ b/cli-e2e-test/data/product/part-1-3q1ec0b0-ebfd-a773-71d7-f71f42a2f066.csv @@ -0,0 +1,2 @@ +product +mobile_phone \ No newline at end of file diff --git a/cli-e2e-test/data/product/part-2-3w1ec0b0-ebfd-a773-71d7-f71f42a2f066.csv b/cli-e2e-test/data/product/part-2-3w1ec0b0-ebfd-a773-71d7-f71f42a2f066.csv new file mode 100644 index 0000000..8ec248f --- /dev/null +++ b/cli-e2e-test/data/product/part-2-3w1ec0b0-ebfd-a773-71d7-f71f42a2f066.csv @@ -0,0 +1,3 @@ +product +router +adapter \ No newline at end of file diff --git a/cli-e2e-test/data/store/data_dt=20220105/part-1.jsonl b/cli-e2e-test/data/store/data_dt=20220105/part-1-3u1ec0b0-ebfd-a773-71d7-f71f42a2f066.jsonl similarity index 100% rename from cli-e2e-test/data/store/data_dt=20220105/part-1.jsonl rename to cli-e2e-test/data/store/data_dt=20220105/part-1-3u1ec0b0-ebfd-a773-71d7-f71f42a2f066.jsonl diff --git a/cli-e2e-test/expected_results/test_scenario1_model/products_csv.csv b/cli-e2e-test/expected_results/test_scenario1_model/products_csv.csv new file mode 100644 index 0000000..ff3300b --- /dev/null +++ b/cli-e2e-test/expected_results/test_scenario1_model/products_csv.csv @@ -0,0 +1,4 @@ +name +mobile_phone +router +adapter \ No newline at end of file diff --git a/cli-e2e-test/rel/config/config1.rel b/cli-e2e-test/rel/config/config1.rel index 78e139f..2a0d26b 100644 --- 
a/cli-e2e-test/rel/config/config1.rel +++ b/cli-e2e-test/rel/config/config1.rel @@ -24,6 +24,14 @@ module export_config } end + module products_csv + def data = product_info + + def syntax:header = { + 1, :name + } + end + module device_seen_snapshot_csv def data = device_seen_snapshot_updated @@ -35,5 +43,5 @@ module export_config end def part_resource_date_pattern = "^(.+)/data_dt=(?[0-9]+)/(.+).(csv|json|jsonl)$" -def part_resource_index_pattern = "^(.+)/data_dt=(.+)/part-(?[0-9]).(csv|json|jsonl)$" +def part_resource_index_pattern = "^(.+)/part-(?[0-9])-(.+).(csv|json|jsonl)$" def part_resource_index_multiplier = 100000 \ No newline at end of file diff --git a/cli-e2e-test/rel/config/config2.rel b/cli-e2e-test/rel/config/config2.rel index d3084ee..0b8b09b 100644 --- a/cli-e2e-test/rel/config/config2.rel +++ b/cli-e2e-test/rel/config/config2.rel @@ -1,3 +1,3 @@ def part_resource_date_pattern = "^(.+)/data_dt=(?[0-9]+)/(.+).(csv|json|jsonl)$" -def part_resource_index_pattern = "^(.+)/data_dt=(.+)/part-(?[0-9])-(.+).(csv|json|jsonl)$" +def part_resource_index_pattern = "^(.+)/part-(?[0-9])-(.+).(csv|json|jsonl)$" def part_resource_index_multiplier = 100000 \ No newline at end of file diff --git a/cli-e2e-test/rel/config/config3.rel b/cli-e2e-test/rel/config/config3.rel index ff27cd7..1e91fd1 100644 --- a/cli-e2e-test/rel/config/config3.rel +++ b/cli-e2e-test/rel/config/config3.rel @@ -10,5 +10,5 @@ module export_config end def part_resource_date_pattern = "^(.+)/data_dt=(?[0-9]+)/(.+).(csv|json|jsonl)$" -def part_resource_index_pattern = "^(.+)/data_dt=(.+)/part-(?[0-9])-(.+).(csv|json|jsonl)$" +def part_resource_index_pattern = "^(.+)/part-(?[0-9])-(.+).(csv|json|jsonl)$" def part_resource_index_multiplier = 100000 \ No newline at end of file diff --git a/cli-e2e-test/rel/product.rel b/cli-e2e-test/rel/product.rel new file mode 100644 index 0000000..3090b8f --- /dev/null +++ b/cli-e2e-test/rel/product.rel @@ -0,0 +1,7 @@ +module product_data + def 
PRODUCT_NAME[idx, row] = source_catalog:product_data[idx, :product, row] +end + +module product_info + def name = product_data:PRODUCT_NAME[_, _] +end diff --git a/cli-e2e-test/test_e2e.py b/cli-e2e-test/test_e2e.py index 05c7d1c..5e6308f 100644 --- a/cli-e2e-test/test_e2e.py +++ b/cli-e2e-test/test_e2e.py @@ -69,6 +69,8 @@ def test_scenario2_model_force_reimport(self): self.assertEqual(rsp_json, [{'partition': 2023090800001, 'relation': 'city_data'}, {'partition': 2023090800002, 'relation': 'city_data'}, {'partition': 2023090900001, 'relation': 'city_data'}, + {'partition': 1, 'relation': 'product_data'}, + {'partition': 2, 'relation': 'product_data'}, {'relation': 'zip_city_state_master_data'}]) def test_scenario2_model_force_reimport_chunk_partitioned(self): @@ -87,7 +89,7 @@ def test_scenario2_model_force_reimport_chunk_partitioned(self): rsp_json = workflow.rai.execute_relation_json(self.logger, rai_config, RESOURCES_TO_DELETE_REL) self.assertEqual(rsp_json, [{'relation': 'zip_city_state_master_data'}]) - def test_scenario3_model_single_partition_change(self): + def test_scenario3_model_single_partition_change_for_date_partitioned(self): # when test_args = ["--batch-config", "./config/model/scenario3.json", "--start-date", "20230908", @@ -109,7 +111,7 @@ def test_scenario3_model_single_partition_change(self): rsp_json = workflow.rai.execute_relation_json(self.logger, rai_config, RESOURCES_TO_DELETE_REL) self.assertEqual(rsp_json, [{'partition': 2023090800001, 'relation': 'city_data'}]) - def test_scenario3_model_two_partitions_overriden_by_one(self): + def test_scenario3_model_two_partitions_overriden_by_one_for_date_partitioned(self): # when test_args = ["--batch-config", "./config/model/scenario3.json", "--start-date", "20230908", @@ -154,6 +156,50 @@ def test_scenario4_model_reimport_2_partitions_data_with_1(self): self.assertNotEqual(rsp, 1) self.assert_output_dir_files(self.test_scenario4_model_reimport_2_partitions_data_with_1.__name__) + def 
test_scenario5_model_single_partition_change(self): + # when + test_args = ["--batch-config", "./config/model/scenario5.json", + "--start-date", "20230908", + "--end-date", "20230909"] + # copy data for scenario 5 + data_folder = "/product" + shutil.copytree(f"{self.dev_data_dir}{data_folder}", f"{self.temp_folder}{data_folder}") + rsp = call(self.cmd_with_common_arguments + test_args + ["--drop-db"]) + # then + self.assertNotEqual(rsp, 1) + # and when + # rename files to simulate data refresh + os.rename(f"{self.temp_folder}{data_folder}/part-1-3q1ec0b0-ebfd-a773-71d7-f71f42a2f066.csv", + f"{self.temp_folder}{data_folder}/part-1-{uuid.uuid4()}.csv") + rsp = call(self.cmd_with_common_arguments + test_args) + # then + self.assertNotEqual(rsp, 1) + rai_config = self.resource_manager.get_rai_config() + rsp_json = workflow.rai.execute_relation_json(self.logger, rai_config, RESOURCES_TO_DELETE_REL) + self.assertEqual(rsp_json, [{'partition': 1, 'relation': 'product_data'}]) + + def test_scenario6_model_two_partitions_overriden_by_one(self): + # when + test_args = ["--batch-config", "./config/model/scenario6.json"] + data_folder = "/product" + # copy data for scenario 6 + shutil.copytree(f"{self.dev_data_dir}{data_folder}", f"{self.temp_folder}{data_folder}") + rsp = call(self.cmd_with_common_arguments + test_args + ["--drop-db"]) + # then + self.assertNotEqual(rsp, 1) + # and when + # rename files to simulate data refresh + os.rename(f"{self.temp_folder}{data_folder}/part-1-3q1ec0b0-ebfd-a773-71d7-f71f42a2f066.csv", + f"{self.temp_folder}{data_folder}/part-1-{uuid.uuid4()}.csv") + os.remove(f"{self.temp_folder}{data_folder}/part-2-3w1ec0b0-ebfd-a773-71d7-f71f42a2f066.csv") + rsp = call(self.cmd_with_common_arguments + test_args) + # then + self.assertNotEqual(rsp, 1) + rai_config = self.resource_manager.get_rai_config() + rsp_json = workflow.rai.execute_relation_json(self.logger, rai_config, RESOURCES_TO_DELETE_REL) + self.assertEqual(rsp_json, [{'partition': 1, 
'relation': 'product_data'}, + {'partition': 2, 'relation': 'product_data'}]) + @classmethod def setUpClass(cls) -> None: # Make sure output folder is empty since the folder share across repository. Remove README.md, other files left. diff --git a/cli/README.md b/cli/README.md index 54ebf5b..4284bff 100644 --- a/cli/README.md +++ b/cli/README.md @@ -6,7 +6,7 @@ This Command-Line Interface (CLI) is designed to provide an easy and interactive 1. Create a batch configuration (ex. `poc.json`) file using the syntax and structure outlined in the [RAI Workflow Framework README](../workflow/README.md). 2. Add `rai-workflow-manager` as dependency to your `requirements.txt` file: ```txt -rai-workflow-manager==0.0.17 +rai-workflow-manager==0.0.18 ``` 3. Build the project: ```bash diff --git a/rel/source_configs/config.rel b/rel/source_configs/config.rel index 4f1a857..0c178da 100644 --- a/rel/source_configs/config.rel +++ b/rel/source_configs/config.rel @@ -74,8 +74,6 @@ def Resource(r) { resource:id(r, _) } value type PartIndex = Int def PartResource(r) { part_resource:hashes_to_part_index(r, _) } -ic part_resource_hashes_to_date(p) { PartResource(p) implies part_resource:hashes_to_date(p, _) } - module resource def id = transpose[uri:identifies] @@ -105,6 +103,14 @@ module part_resource n = ^PartIndex[ d * part_resource_index_multiplier + s ] from d, s } + + @inline + def parse_part_index[v](n) { + not uri:parse_value(v, "date", _) and + s = parse_int[ uri:parse_value[v, shard_alias] ] and + n = ^PartIndex[s] + from s + } end module uri @@ -197,12 +203,6 @@ ic source_has_container_type(s) { implies source:container_type(s, _) } -// currently we do not support chunk partitioning for not date partitioned source -ic multi_part_source_is_date_partitioned(s) { - MultiPartSource(s) - implies - source:date_partitioned(s) -} module source def populates = transpose[relation:identifies] @@ -279,14 +279,7 @@ def source:populated(s) { def source:needs_resource(s) { resource:needed(r) 
and - not PartResource(r) and - resource:part_of(r, s) - from r -} - -def source:needs_resource(s) { - resource:needed(r) and - not PartResource(r) and + not part_resource:hashes_to_date(r, _) and resource:part_of(r, s) from r } diff --git a/rel/source_configs/data_reload.rel b/rel/source_configs/data_reload.rel index 366d729..d02f822 100644 --- a/rel/source_configs/data_reload.rel +++ b/rel/source_configs/data_reload.rel @@ -24,7 +24,7 @@ def simple_sources(rel, path) { not chunk_partitioned_sources(rel, path, _) from i } -// TODO: add support for chunk partitioned sources which are not date partitioned + /* * All simple sources are affected if they match with declared sources. */ @@ -32,6 +32,19 @@ def potentially_affected_sources(rel, path) { source_declares_resource(rel, _, path) and simple_sources(rel, path) } +/* + * All chunk partitioned and not partitioned by date sources are affected if they match with declared sources. + */ +def potentially_affected_sources(rel, path, p_idx) { + chunk_partitioned_sources(rel, _, _) and + source_declares_resource(rel, _, path) and + s = relation:identifies[ rel_name:identifies[ ^RelName[rel] ] ] and + not source:date_partitioned(s) and + source:declares(s, res) and + part_resource:hashes_to_part_index(res, p_idx) and + resource:id(res, ^URI[path]) + from s, res +} /* * All chunk partitioned sources for given date are affected in case new sources have at least one partition in this date. */ diff --git a/workflow/README.md b/workflow/README.md index d009cf3..47e260e 100644 --- a/workflow/README.md +++ b/workflow/README.md @@ -87,8 +87,6 @@ Steps of this type are used to configure sources which workflow manager will use ] } ``` -#### Limitations -- We do not support chunk partitioning for sources which are not partitioned by date. 
### Install Model diff --git a/workflow/__init__.py b/workflow/__init__.py index 9a98cd7..7977cb5 100644 --- a/workflow/__init__.py +++ b/workflow/__init__.py @@ -12,5 +12,5 @@ # See the License for the specific language governing permissions and # limitations under the License. -__version_info__ = (0, 0, 17) +__version_info__ = (0, 0, 18) __version__ = ".".join(map(str, __version_info__))