From f2d398cecdc655a9eb30b4156cdbb566111d9140 Mon Sep 17 00:00:00 2001
From: Stanislav Bedunkevich <63121780+stanislav-bedunkevich@users.noreply.github.com>
Date: Mon, 1 Jul 2024 23:03:46 +0200
Subject: [PATCH] add incremental snapshot support (#82)

* add incremental snapshot support
* disable expiration reloading for snapshots
* add snapshot catalog
* fix test
* fix test
* update oracle
* up version
---
 ...1-3l1ec0b0-ebfd-a773-71d7-f71f42a2f066.csv |  2 +
 ...1-3p1ec0b0-ebfd-a773-71d7-f71f42a2f066.csv |  4 ++
 .../cities_csv.csv                            |  3 +
 .../device_seen_snapshot_csv.csv              |  6 ++
 .../devices_csv.csv                           |  4 ++
 .../products_csv.csv                          |  4 ++
 .../stores_csv.csv                            |  3 +
 .../users_csv.csv                             |  4 ++
 cli-e2e-test/rel/device_seen.rel              | 16 +++--
 cli-e2e-test/test_e2e.py                      | 17 ++++-
 cli/args.py                                   |  6 ++
 cli/runner.py                                 |  3 +-
 rel/source_configs/config.rel                 | 14 ++++
 rel/source_configs/data_reload.rel            |  5 +-
 rel/util/snapshot_diff.rel                    |  5 ++
 test/test_cfg_src_step.py                     |  3 +-
 test/test_cfg_src_step_factory.py             |  3 +-
 workflow/__init__.py                          |  2 +-
 workflow/constants.py                         |  2 +
 workflow/executor.py                          | 36 ++++++++---
 workflow/query.py                             | 64 +++++++++++++------
 21 files changed, 165 insertions(+), 41 deletions(-)
 create mode 100644 cli-e2e-test/data/device/data_dt=20220109/part-1-3l1ec0b0-ebfd-a773-71d7-f71f42a2f066.csv
 create mode 100644 cli-e2e-test/data/device_seen_snapshot/data_dt=20220108/part-1-3p1ec0b0-ebfd-a773-71d7-f71f42a2f066.csv
 create mode 100644 cli-e2e-test/expected_results/test_scenario1_load_data_jointly/cities_csv.csv
 create mode 100644 cli-e2e-test/expected_results/test_scenario1_load_data_jointly/device_seen_snapshot_csv.csv
 create mode 100644 cli-e2e-test/expected_results/test_scenario1_load_data_jointly/devices_csv.csv
 create mode 100644 cli-e2e-test/expected_results/test_scenario1_load_data_jointly/products_csv.csv
 create mode 100644 cli-e2e-test/expected_results/test_scenario1_load_data_jointly/stores_csv.csv
 create mode 100644 cli-e2e-test/expected_results/test_scenario1_load_data_jointly/users_csv.csv
 create mode 100644 rel/util/snapshot_diff.rel

diff --git a/cli-e2e-test/data/device/data_dt=20220109/part-1-3l1ec0b0-ebfd-a773-71d7-f71f42a2f066.csv b/cli-e2e-test/data/device/data_dt=20220109/part-1-3l1ec0b0-ebfd-a773-71d7-f71f42a2f066.csv
new file mode 100644
index 0000000..4738c38
--- /dev/null
+++ b/cli-e2e-test/data/device/data_dt=20220109/part-1-3l1ec0b0-ebfd-a773-71d7-f71f42a2f066.csv
@@ -0,0 +1,2 @@
+device,IMEI
+IPhone8,125
diff --git a/cli-e2e-test/data/device_seen_snapshot/data_dt=20220108/part-1-3p1ec0b0-ebfd-a773-71d7-f71f42a2f066.csv b/cli-e2e-test/data/device_seen_snapshot/data_dt=20220108/part-1-3p1ec0b0-ebfd-a773-71d7-f71f42a2f066.csv
new file mode 100644
index 0000000..39b95f6
--- /dev/null
+++ b/cli-e2e-test/data/device_seen_snapshot/data_dt=20220108/part-1-3p1ec0b0-ebfd-a773-71d7-f71f42a2f066.csv
@@ -0,0 +1,4 @@
+device,last_seen
+IPhone1,2022-01-05
+IPhone2,2022-01-05
+IPhone7,2022-01-01
diff --git a/cli-e2e-test/expected_results/test_scenario1_load_data_jointly/cities_csv.csv b/cli-e2e-test/expected_results/test_scenario1_load_data_jointly/cities_csv.csv
new file mode 100644
index 0000000..e7b6238
--- /dev/null
+++ b/cli-e2e-test/expected_results/test_scenario1_load_data_jointly/cities_csv.csv
@@ -0,0 +1,3 @@
+name
+Los Angeles
+Santa Monica
\ No newline at end of file
diff --git a/cli-e2e-test/expected_results/test_scenario1_load_data_jointly/device_seen_snapshot_csv.csv b/cli-e2e-test/expected_results/test_scenario1_load_data_jointly/device_seen_snapshot_csv.csv
new file mode 100644
index 0000000..fe9d0c4
--- /dev/null
+++ b/cli-e2e-test/expected_results/test_scenario1_load_data_jointly/device_seen_snapshot_csv.csv
@@ -0,0 +1,6 @@
+device,last_seen
+IPhone1,2022-01-09
+IPhone2,2022-01-09
+IPhone6,2021-11-16
+IPhone7,2022-01-01
+IPhone8,2022-01-09
diff --git a/cli-e2e-test/expected_results/test_scenario1_load_data_jointly/devices_csv.csv b/cli-e2e-test/expected_results/test_scenario1_load_data_jointly/devices_csv.csv
new file mode 100644
index 0000000..8e773b2
--- /dev/null
+++ b/cli-e2e-test/expected_results/test_scenario1_load_data_jointly/devices_csv.csv
@@ -0,0 +1,4 @@
+name
+IPhone1
+IPhone2
+IPhone8
diff --git a/cli-e2e-test/expected_results/test_scenario1_load_data_jointly/products_csv.csv b/cli-e2e-test/expected_results/test_scenario1_load_data_jointly/products_csv.csv
new file mode 100644
index 0000000..ff3300b
--- /dev/null
+++ b/cli-e2e-test/expected_results/test_scenario1_load_data_jointly/products_csv.csv
@@ -0,0 +1,4 @@
+name
+mobile_phone
+router
+adapter
\ No newline at end of file
diff --git a/cli-e2e-test/expected_results/test_scenario1_load_data_jointly/stores_csv.csv b/cli-e2e-test/expected_results/test_scenario1_load_data_jointly/stores_csv.csv
new file mode 100644
index 0000000..f01a8f5
--- /dev/null
+++ b/cli-e2e-test/expected_results/test_scenario1_load_data_jointly/stores_csv.csv
@@ -0,0 +1,3 @@
+name
+Store1
+Store2
\ No newline at end of file
diff --git a/cli-e2e-test/expected_results/test_scenario1_load_data_jointly/users_csv.csv b/cli-e2e-test/expected_results/test_scenario1_load_data_jointly/users_csv.csv
new file mode 100644
index 0000000..0134cee
--- /dev/null
+++ b/cli-e2e-test/expected_results/test_scenario1_load_data_jointly/users_csv.csv
@@ -0,0 +1,4 @@
+name
+Andrew
+Sam
+Jordan
\ No newline at end of file
diff --git a/cli-e2e-test/rel/device_seen.rel b/cli-e2e-test/rel/device_seen.rel
index b3fc2d9..577e71f 100644
--- a/cli-e2e-test/rel/device_seen.rel
+++ b/cli-e2e-test/rel/device_seen.rel
@@ -5,13 +5,15 @@ module import_config:device_seen_snapshot
     }
     bound syntax:header
-end
 
-module device_seen_snapshot
-    def DEVICE_NAME[idx, row] = source_catalog:device_seen_snapshot[idx, :device, row]
-    def LAST_SEEN[idx, row] = source_catalog:device_seen_snapshot[idx, :last_seen, row]
+    @inline
+    def row_key_map[R](idx, row, key) {
+        ^Device(R[idx, :device, row], key)
+    }
 end
 
+entity type Device = String
+
 /*
  * device_seen snapshot is a snapshot of the last time a device was seen, which is
  * essentially an aggregation over the snapshot data and the current day's data
@@ -28,9 +30,9 @@ def device_last_seen[d] =
     max[t:
         device_seen_today(d, t)
         or
         (
-            device_seen_snapshot:DEVICE_NAME(idx, row, d) and
-            device_seen_snapshot:LAST_SEEN(idx, row, t)
-            from idx, row
+            snapshot_catalog:device_seen_snapshot:device(key, d) and
+            snapshot_catalog:device_seen_snapshot:last_seen(key, t)
+            from key
        )
    ]
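The device_seen.rel change above stops reading the snapshot positionally (DEVICE_NAME/LAST_SEEN over source_catalog row indices) and instead reads snapshot_catalog keyed by a Device entity, with row_key_map deriving that key from the :device column. A minimal Python sketch of the same last-seen aggregation, using made-up in-memory dictionaries in place of the Rel relations:

    from datetime import date
    from typing import Optional

    # stand-in for snapshot_catalog:device_seen_snapshot, keyed by device
    snapshot_last_seen = {"IPhone1": date(2022, 1, 5), "IPhone7": date(2022, 1, 1)}
    # stand-in for device_seen_today, built from the current day's partition
    seen_today = {"IPhone1": date(2022, 1, 9), "IPhone8": date(2022, 1, 9)}

    def device_last_seen(device: str) -> Optional[date]:
        # max over today's sighting and the snapshot value, when either exists
        candidates = [d for d in (seen_today.get(device), snapshot_last_seen.get(device)) if d]
        return max(candidates) if candidates else None

    assert device_last_seen("IPhone1") == date(2022, 1, 9)  # today wins
    assert device_last_seen("IPhone7") == date(2022, 1, 1)  # only in the snapshot
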
diff --git a/cli-e2e-test/test_e2e.py b/cli-e2e-test/test_e2e.py
index 1b165bb..44070ed 100644
--- a/cli-e2e-test/test_e2e.py
+++ b/cli-e2e-test/test_e2e.py
@@ -41,15 +41,26 @@ def test_scenario1_model(self):
         self.assert_output_dir_files(self.test_scenario1_model.__name__)
 
     def test_scenario1_load_data_jointly(self):
-        # when
+        # when loading as of 20220105
         test_args = ["--batch-config", "./config/model/scenario1.json",
                      "--end-date", "20220105",
-                     "--drop-db", "--load-data-jointly"]
+                     "--drop-db", "--load-data-jointly",
+                     "--enable-incremental-snapshots"]
         rsp = call(self.cmd_with_common_arguments + test_args)
-        # then
+        # then should get the same result as other tests for scenario1
         self.assertNotEqual(rsp, 1)
         self.assert_output_dir_files(self.test_scenario1_model.__name__)
+        # when loading as of 20220109
+        test_args = ["--batch-config", "./config/model/scenario1.json",
+                     "--end-date", "20220109",
+                     "--load-data-jointly",
+                     "--enable-incremental-snapshots"]
+        rsp = call(self.cmd_with_common_arguments + test_args)
+        # then should get an updated snapshot
+        self.assertNotEqual(rsp, 1)
+        self.assert_output_dir_files(self.test_scenario1_load_data_jointly.__name__)
+
     def test_scenario1_model_yaml(self):
         # when
         test_args = ["--batch-config", "./config/model/scenario1.yaml",
diff --git a/cli/args.py b/cli/args.py
index 81e80a9..61d1880 100644
--- a/cli/args.py
+++ b/cli/args.py
@@ -153,6 +153,12 @@ def parse() -> Namespace:
         action=BooleanOptionalAction,
         default=True
     )
+    parser.add_argument(
+        "--enable-incremental-snapshots",
+        help="Load snapshot sources by first computing a diff and applying insert:* and delete:* deltas",
+        action=BooleanOptionalAction,
+        default=False
+    )
     parser.add_argument(
         "--recover",
         help="Recover a batch run starting from a FAILED step",
diff --git a/cli/runner.py b/cli/runner.py
index 1f7d3e7..50dcf3c 100644
--- a/cli/runner.py
+++ b/cli/runner.py
@@ -52,7 +52,8 @@ def start(factories: dict[str, workflow.executor.WorkflowStepFactory] = MappingP
         workflow.constants.FORCE_REIMPORT: args.force_reimport,
         workflow.constants.FORCE_REIMPORT_NOT_CHUNK_PARTITIONED: args.force_reimport_not_chunk_partitioned,
         workflow.constants.COLLAPSE_PARTITIONS_ON_LOAD: args.collapse_partitions_on_load,
-        workflow.constants.LOAD_DATA_JOINTLY: args.load_data_jointly
+        workflow.constants.LOAD_DATA_JOINTLY: args.load_data_jointly,
+        workflow.constants.ENABLE_INCREMENTAL_SNAPSHOTS: args.enable_incremental_snapshots
     }
     config = workflow.executor.WorkflowConfig(env_config,
                                               workflow.common.BatchConfig(args.batch_config_name, batch_config_json),
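The new --enable-incremental-snapshots option is passed through runner.py into step_params unchanged. Because it is declared with argparse's BooleanOptionalAction, the parser also accepts a generated --no-enable-incremental-snapshots negation and keeps False as the default. A standalone sketch of that behaviour (the parser below is illustrative, not the project's full CLI):

    from argparse import ArgumentParser, BooleanOptionalAction

    parser = ArgumentParser(prog="demo")
    parser.add_argument("--enable-incremental-snapshots",
                        action=BooleanOptionalAction, default=False)

    assert parser.parse_args([]).enable_incremental_snapshots is False
    assert parser.parse_args(["--enable-incremental-snapshots"]).enable_incremental_snapshots is True
    assert parser.parse_args(["--no-enable-incremental-snapshots"]).enable_incremental_snapshots is False
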
diff --git a/rel/source_configs/config.rel b/rel/source_configs/config.rel
index 806fa9b..5fd65de 100644
--- a/rel/source_configs/config.rel
+++ b/rel/source_configs/config.rel
@@ -4,6 +4,7 @@ bound date_partitioned_source_relation = String
 bound source_declares_resource = String, String, String
 bound source_has_input_format = String, String
 bound source_has_container_type = String, String
+bound snapshot_catalog
 bound source_catalog
 bound simple_source_catalog
 bound part_resource_date_pattern = String
@@ -268,6 +269,10 @@ module source
     }
 
     def resource_part_index = declares . part_resource:part_index
+
+    def has_batch_config(s, cfg) {
+        source:relname[s] = #(batch_source:relation[cfg])
+    }
 end
 
 /**
@@ -396,6 +401,15 @@ def missing_resources_json(:[], n, :is_date_partitioned, v) {
     from s
 }
 
+def missing_resources_json(:[], n, :is_snapshot, v) {
+    source:needs_resource(s) and
+    source:index[s] = n and
+    source:has_batch_config(s, cfg) and
+    batch_source:snapshot_validity_days(cfg, _) and
+    boolean_true(v)
+    from s, cfg
+}
+
 def missing_resources_json(:[], i, :resources, :[], k, :uri, n) {
     source:index[s] = i and
     resource:index[s, r] = k and
diff --git a/rel/source_configs/data_reload.rel b/rel/source_configs/data_reload.rel
index ef22fce..3751598 100644
--- a/rel/source_configs/data_reload.rel
+++ b/rel/source_configs/data_reload.rel
@@ -123,7 +123,10 @@ def resources_to_delete(rel) {
     rel = #(transpose[resource_to_invalidate][_])
 }
 
-def resources_data_to_delete_idx = enumerate[resources_data_to_delete]
+// the following enumerate is split in order to help the compiler handle arity overloads
+def resources_data_to_delete_idx = enumerate[r, ix: resources_data_to_delete(r, ix)]
+def resources_data_to_delete_idx = enumerate[r: resources_data_to_delete(r)]
+
 def resources_data_to_delete_json(:[], i, :relation, s) {
     resources_data_to_delete_idx(i, r, _) and
     s = relname_string[r]
diff --git a/rel/util/snapshot_diff.rel b/rel/util/snapshot_diff.rel
new file mode 100644
index 0000000..9d07d56
--- /dev/null
+++ b/rel/util/snapshot_diff.rel
@@ -0,0 +1,5 @@
+@outline @nomaintain
+module snapshot_diff[{old}, {new}]
+    def insertions(x...) = new(x...) and not old(x...)
+    def deletions(x...) = old(x...) and not new(x...)
+end
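The snapshot_diff module above is what makes the reload incremental: insertions are tuples present only in the newly loaded snapshot, deletions are tuples that vanished from it, and the generated load query (see the query.py changes below) applies them as insert:snapshot_catalog and delete:snapshot_catalog deltas. A minimal Python analogue of that contract, with plain sets standing in for the Rel relations:

    # old: snapshot_catalog contents from the previous load
    old = {("IPhone1", "2022-01-05"), ("IPhone2", "2022-01-05"), ("IPhone7", "2022-01-01")}
    # new: freshly loaded snapshot data for the current end date
    new = {("IPhone1", "2022-01-09"), ("IPhone2", "2022-01-09"),
           ("IPhone7", "2022-01-01"), ("IPhone8", "2022-01-09")}

    insertions = new - old  # tuples only present in the new snapshot
    deletions = old - new   # tuples that disappeared from the new snapshot

    # applying the delta to the old state reproduces the new snapshot
    assert (old - deletions) | insertions == new
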
diff --git a/test/test_cfg_src_step.py b/test/test_cfg_src_step.py
index d333eef..bbe34d1 100644
--- a/test/test_cfg_src_step.py
+++ b/test/test_cfg_src_step.py
@@ -581,7 +581,8 @@ def _create_cfg_sources_step(sources: List[Source], paths_builders: dict[str, pa
         start_date=start_date,
         end_date=end_date,
         force_reimport=False,
-        force_reimport_not_chunk_partitioned=False
+        force_reimport_not_chunk_partitioned=False,
+        enable_incremental_snapshots=False
     )
diff --git a/test/test_cfg_src_step_factory.py b/test/test_cfg_src_step_factory.py
index 10cf6f6..7e687c9 100644
--- a/test/test_cfg_src_step_factory.py
+++ b/test/test_cfg_src_step_factory.py
@@ -77,7 +77,8 @@ def _create_wf_cfg(env_config: EnvConfig, batch_config: BatchConfig) -> Workflow
         constants.FORCE_REIMPORT: False,
         constants.FORCE_REIMPORT_NOT_CHUNK_PARTITIONED: False,
         constants.COLLAPSE_PARTITIONS_ON_LOAD: False,
-        constants.LOAD_DATA_JOINTLY: False
+        constants.LOAD_DATA_JOINTLY: False,
+        constants.ENABLE_INCREMENTAL_SNAPSHOTS: False,
     }
     return WorkflowConfig(
         env=env_config,
diff --git a/workflow/__init__.py b/workflow/__init__.py
index ebd44f4..69ada38 100644
--- a/workflow/__init__.py
+++ b/workflow/__init__.py
@@ -12,5 +12,5 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-__version_info__ = (0, 0, 49)
+__version_info__ = (0, 0, 50)
 __version__ = ".".join(map(str, __version_info__))
diff --git a/workflow/constants.py b/workflow/constants.py
index 061e067..0e5afa7 100644
--- a/workflow/constants.py
+++ b/workflow/constants.py
@@ -11,6 +11,7 @@
     "batch_config/workflow/steps/load_data.rel",
     "batch_config/workflow/steps/materialize.rel",
     "batch_config/workflow/steps/execute_command.rel",
+    "util/snapshot_diff.rel",
 ]
 
 COMMON_MODEL_RELATIVE_PATH = "/../rel"
@@ -81,6 +82,7 @@ FORCE_REIMPORT_NOT_CHUNK_PARTITIONED = "force_reimport_not_chunk_partitioned"
 COLLAPSE_PARTITIONS_ON_LOAD = "collapse_partitions_on_load"
 LOAD_DATA_JOINTLY = "load_data_jointly"
+ENABLE_INCREMENTAL_SNAPSHOTS = "enable_incremental_snapshots"
 
 # Snowflake constants
diff --git a/workflow/executor.py b/workflow/executor.py
index 4c33c48..52b0eab 100644
--- a/workflow/executor.py
+++ b/workflow/executor.py
@@ -131,9 +131,11 @@ class ConfigureSourcesWorkflowStep(WorkflowStep):
     end_date: str
     force_reimport: bool
     force_reimport_not_chunk_partitioned: bool
+    enable_incremental_snapshots: bool
 
     def __init__(self, idt, name, type_value, state, timing, engine_size, config_files, rel_config_dir, sources,
-                 paths_builders, start_date, end_date, force_reimport, force_reimport_not_chunk_partitioned):
+                 paths_builders, start_date, end_date, force_reimport, force_reimport_not_chunk_partitioned,
+                 enable_incremental_snapshots):
         super().__init__(idt, name, type_value, state, timing, engine_size)
         self.config_files = config_files
         self.rel_config_dir = rel_config_dir
@@ -143,6 +145,7 @@ def __init__(self, idt, name, type_value, state, timing, engine_size, config_fil
         self.end_date = end_date
         self.force_reimport = force_reimport
         self.force_reimport_not_chunk_partitioned = force_reimport_not_chunk_partitioned
+        self.enable_incremental_snapshots = enable_incremental_snapshots
 
     def _execute(self, logger: logging.Logger, env_config: EnvConfig, rai_config: RaiConfig):
         rai.install_models(logger, rai_config, env_config, build_models(self.config_files, self.rel_config_dir))
@@ -152,6 +155,15 @@ def _execute(self, logger: logging.Logger, env_config: EnvConfig, rai_config: Ra
         declared_sources = {src["source"]: src for src in
                             rai.execute_relation_json(logger, rai_config, env_config,
                                                       constants.DECLARED_DATE_PARTITIONED_SOURCE_REL)}
+
+        # if `enable_incremental_snapshots` is set to True, we need to filter out snapshot sources
+        if self.enable_incremental_snapshots:
+            # get a set of snapshot source names from the `relation` field
+            snapshot_sources = {src.relation for src in self.sources if src.snapshot_validity_days and
+                                src.snapshot_validity_days >= 0}
+            logger.info(f"Snapshot sources skipped, will be reloaded incrementally: {snapshot_sources}")
+            # filter out snapshot sources from the declared sources
+            declared_sources = {k: v for k, v in declared_sources.items() if k not in snapshot_sources}
         expired_sources = self._calculate_expired_sources(logger, declared_sources)
 
         # mark declared sources for reimport
@@ -284,6 +296,7 @@ def _get_step(self, logger: logging.Logger, config: WorkflowConfig, idt, name, t
         force_reimport = config.step_params.get(constants.FORCE_REIMPORT, False)
         force_reimport_not_chunk_partitioned = config.step_params.get(constants.FORCE_REIMPORT_NOT_CHUNK_PARTITIONED,
                                                                       False)
+        enable_incremental_snapshots = config.step_params[constants.ENABLE_INCREMENTAL_SNAPSHOTS]
         paths_builders = {}
         for src in sources:
             container = src.container
@@ -291,7 +304,8 @@ def _get_step(self, logger: logging.Logger, config: WorkflowConfig, idt, name, t
                 paths_builders[container.name] = paths.PathsBuilderFactory.get_path_builder(container)
         return ConfigureSourcesWorkflowStep(idt, name, type_value, state, timing, engine_size, step["configFiles"],
                                             rel_config_dir, sources, paths_builders, start_date, end_date,
-                                            force_reimport, force_reimport_not_chunk_partitioned)
+                                            force_reimport, force_reimport_not_chunk_partitioned,
+                                            enable_incremental_snapshots)
 
     @staticmethod
     def _parse_sources(step: dict, env_config: EnvConfig) -> List[Source]:
@@ -328,11 +342,14 @@ def _parse_sources(step: dict, env_config: EnvConfig) -> List[Source]:
 class LoadDataWorkflowStep(WorkflowStep):
     collapse_partitions_on_load: bool
     load_jointly: bool
+    enable_incremental_snapshots: bool
 
-    def __init__(self, idt, name, type_value, state, timing, engine_size, collapse_partitions_on_load, load_jointly):
+    def __init__(self, idt, name, type_value, state, timing, engine_size, collapse_partitions_on_load, load_jointly,
+                 enable_incremental_snapshots):
         super().__init__(idt, name, type_value, state, timing, engine_size)
         self.collapse_partitions_on_load = collapse_partitions_on_load
         self.load_jointly = load_jointly
+        self.enable_incremental_snapshots = enable_incremental_snapshots
 
     def _execute(self, logger: logging.Logger, env_config: EnvConfig, rai_config: RaiConfig):
         rai.execute_query(logger, rai_config, env_config, q.DELETE_REFRESHED_SOURCES_DATA, readonly=False)
@@ -353,6 +370,7 @@ def _execute(self, logger: logging.Logger, env_config: EnvConfig, rai_config: Ra
 
         self._load_simple_resources(logger, env_config, rai_config, simple_resources)
 
+        # note: async resources do not support snapshot diffing, as CDC should be incremental
         if async_resources:
             self._load_async_resources(logger, env_config, rai_config, async_resources)
 
@@ -425,7 +443,7 @@ def _get_date_part_load_query(self, logger: logging.Logger, config, src):
             resources = []
             for d in srcs:
                 resources += d["resources"]
-            return [q.load_resources(logger, config, resources, src)]
+            return [q.load_resources(logger, config, resources, src, self.enable_incremental_snapshots)]
         else:
             logger.info(f"Loading '{source_name}' one date partition at a time")
             batch = []
@@ -433,7 +451,8 @@ def _get_date_part_load_query(self, logger: logging.Logger, config, src):
             for d in srcs:
                 logger.info(f"Loading partition for date {d['date']}")
 
                 for res in d["resources"]:
-                    batch.append(q.load_resources(logger, config, [res], src))
+                    batch.append(
+                        q.load_resources(logger, config, [res], src, self.enable_incremental_snapshots))
             return batch
 
     def _get_simple_src_load_query(self, logger: logging.Logger, config, src):
@@ -441,12 +460,12 @@ def _get_simple_src_load_query(self, logger: logging.Logger, config, src):
         logger.info(f"Loading source '{source_name}' not partitioned by date")
         if self.collapse_partitions_on_load:
             logger.info(f"Loading '{source_name}' all chunk partitions simultaneously")
-            return [q.load_resources(logger, config, src["resources"], src)]
+            return [q.load_resources(logger, config, src["resources"], src, self.enable_incremental_snapshots)]
         else:
             logger.info(f"Loading '{source_name}' one chunk partition at a time")
             batch = []
             for res in src["resources"]:
-                batch.append(q.load_resources(logger, config, [res], src))
+                batch.append(q.load_resources(logger, config, [res], src, self.enable_incremental_snapshots))
             return batch
 
     @staticmethod
@@ -470,8 +489,9 @@ def _get_step(self, logger: logging.Logger, config: WorkflowConfig, idt, name, t
                   engine_size, step: dict) -> WorkflowStep:
         collapse_partitions_on_load = config.step_params[constants.COLLAPSE_PARTITIONS_ON_LOAD]
         load_jointly = config.step_params[constants.LOAD_DATA_JOINTLY]
+        enable_incremental_snapshots = config.step_params[constants.ENABLE_INCREMENTAL_SNAPSHOTS]
         return LoadDataWorkflowStep(idt, name, type_value, state, timing, engine_size, collapse_partitions_on_load,
-                                    load_jointly)
+                                    load_jointly, enable_incremental_snapshots)
 
 
 class MaterializeWorkflowStep(WorkflowStep):
diff --git a/workflow/query.py b/workflow/query.py
index e894f1d..095538f 100644
--- a/workflow/query.py
+++ b/workflow/query.py
@@ -131,30 +131,34 @@ def insert:resources_data_to_delete = resources_to_delete
 """
 
 
-def load_resources(logger: logging.Logger, config: AzureConfig, resources, src) -> QueryWithInputs:
+def load_resources(logger: logging.Logger, config: AzureConfig, resources, src, snapshot_diff_enabled: bool = False) ->\
+        QueryWithInputs:
     rel_name = src["source"]
     file_stype_str = src["file_type"]
     file_type = FileType[file_stype_str]
     src_type = ContainerType.from_source(src)
+    reload_as_snapshot = snapshot_diff_enabled and src.get("is_snapshot", False)
 
     if 'is_multi_part' in src and src['is_multi_part'] == 'Y':
         if file_type == FileType.CSV or file_type == FileType.JSONL:
             if src_type == ContainerType.LOCAL:
                 logger.info(f"Loading {len(resources)} shards from local files")
-                return _local_load_multipart_query(rel_name, file_type, resources)
+                return _local_load_multipart_query(rel_name, file_type, resources, reload_as_snapshot)
             elif src_type == ContainerType.AZURE:
                 logger.info(f"Loading {len(resources)} shards from Azure files")
-                return QueryWithInputs(_azure_load_multipart_query(rel_name, file_type, resources, config), {})
+                return QueryWithInputs(
+                    _azure_load_multipart_query(rel_name, file_type, resources, config, reload_as_snapshot), {})
         else:
             logger.error(f"Unknown file type {file_stype_str}")
     else:
         if src_type == ContainerType.LOCAL:
             logger.info("Loading from local file")
-            return _local_load_simple_query(rel_name, resources[0]["uri"], file_type)
+            return _local_load_simple_query(rel_name, resources[0]["uri"], file_type, reload_as_snapshot)
         elif src_type == ContainerType.AZURE:
             logger.info("Loading from Azure file")
-            return QueryWithInputs(_azure_load_simple_query(rel_name, resources[0]["uri"], file_type, config), {})
+            return QueryWithInputs(
+                _azure_load_simple_query(rel_name, resources[0]["uri"], file_type, config, reload_as_snapshot), {})
 
 
 def get_snapshot_expiration_date(snapshot_binding: str, date_format: str) -> str:
@@ -269,25 +273,26 @@ def output_relation(relation: str) -> str:
     return f"def output = {relation}"
 
 
-def _local_load_simple_query(rel_name: str, uri: str, file_type: FileType) -> QueryWithInputs:
+def _local_load_simple_query(rel_name: str, uri: str, file_type: FileType, reload_as_snapshot: bool) -> QueryWithInputs:
     try:
         raw_data_rel_name = f"{rel_name}_data"
         data = utils.read(uri)
         query = f"def {IMPORT_CONFIG_REL}:{rel_name}:data = {raw_data_rel_name}\n" \
-                f"{_simple_insert_query(rel_name, file_type)}\n"
+                f"{_simple_insert_query(rel_name, file_type, reload_as_snapshot)}\n"
         return QueryWithInputs(query, {raw_data_rel_name: data})
     except OSError as e:
         raise e
 
 
-def _azure_load_simple_query(rel_name: str, uri: str, file_type: FileType, config: AzureConfig) -> str:
+def _azure_load_simple_query(rel_name: str, uri: str, file_type: FileType, config: AzureConfig,
+                             reload_as_snapshot: bool) -> str:
     return f"def {IMPORT_CONFIG_REL}:{rel_name}:integration:provider = \"azure\"\n" \
            f"def {IMPORT_CONFIG_REL}:{rel_name}:integration:credentials:azure_sas_token = raw\"{config.sas}\"\n" \
            f"def {IMPORT_CONFIG_REL}:{rel_name}:path = \"{uri}\"\n" \
-           f"{_simple_insert_query(rel_name, file_type)}"
+           f"{_simple_insert_query(rel_name, file_type, reload_as_snapshot)}"
 
 
-def _local_load_multipart_query(rel_name: str, file_type: FileType, parts) -> QueryWithInputs:
+def _local_load_multipart_query(rel_name: str, file_type: FileType, parts, reload_as_snapshot: bool) -> QueryWithInputs:
     raw_data_rel_name = f"{rel_name}_data"
 
     raw_text = ""
@@ -303,7 +308,7 @@ def _local_load_multipart_query(rel_name: str, file_type: FileType, parts) -> Qu
     except OSError as e:
         raise e
 
-    insert_text = _multi_part_insert_query(rel_name, file_type)
+    insert_text = _multi_part_insert_query(rel_name, file_type, reload_as_snapshot)
 
     load_config = _multi_part_load_config_query(rel_name, file_type,
                                                 _local_multipart_config_integration(raw_data_rel_name))
@@ -315,7 +320,8 @@ def _local_load_multipart_query(rel_name: str, file_type: FileType, parts) -> Qu
     return QueryWithInputs(query, inputs)
 
 
-def _azure_load_multipart_query(rel_name: str, file_type: FileType, parts, config: AzureConfig) -> str:
+def _azure_load_multipart_query(rel_name: str, file_type: FileType, parts, config: AzureConfig,
+                                reload_as_snapshot: bool) -> str:
     path_rel_name = f"{rel_name}_path"
 
     part_indexes = ""
@@ -326,7 +332,7 @@ def _azure_load_multipart_query(rel_name: str, file_type: FileType, parts, confi
         part_indexes += f"{part_idx}\n"
         part_uri_map += f"{part_idx},\"{part_uri}\"\n"
 
-    insert_text = _multi_part_insert_query(rel_name, file_type)
+    insert_text = _multi_part_insert_query(rel_name, file_type, reload_as_snapshot)
 
     load_config = _multi_part_load_config_query(rel_name, file_type,
                                                 _azure_multipart_config_integration(path_rel_name, config))
@@ -360,14 +366,36 @@ def _azure_multipart_config_integration(path_rel_name: str, config: AzureConfig)
            f"def path = {path_rel_name}[i]\n"
 
 
-def _multi_part_insert_query(rel_name: str, file_type: FileType) -> str:
+def _multi_part_insert_query(rel_name: str, file_type: FileType, reload_as_snapshot: bool) -> str:
     conf_rel_name = _config_rel_name(rel_name)
-
-    return f"def insert:source_catalog:{rel_name}[i] = {FILE_LOAD_RELATION[file_type]}[{conf_rel_name}[i]]"
+    insert_body = f"{FILE_LOAD_RELATION[file_type]}[{conf_rel_name}[i]]"
+    if reload_as_snapshot:
+        return _snapshot_delta_query(rel_name, insert_body, True)
+    else:
+        return f"def insert:source_catalog:{rel_name}[i] = {insert_body}"
 
 
-def _simple_insert_query(rel_name: str, file_type: FileType) -> str:
-    return f"def insert:simple_source_catalog:{rel_name} = {FILE_LOAD_RELATION[file_type]}[{IMPORT_CONFIG_REL}:{rel_name}]"
+def _simple_insert_query(rel_name: str, file_type: FileType, reload_as_snapshot: bool) -> str:
+    insert_body = f"{FILE_LOAD_RELATION[file_type]}[{IMPORT_CONFIG_REL}:{rel_name}]"
+    if reload_as_snapshot:
+        return _snapshot_delta_query(rel_name, insert_body, False)
+    else:
+        return f"def insert:simple_source_catalog:{rel_name} = {insert_body}"
+
+
+def _snapshot_delta_query(rel_name: str, data_body: str, is_partitioned: bool) -> str:
+    part_index_col = "[i]" if is_partitioned else ""
+    part_index_var = "i, " if is_partitioned else ""
+    return f"def _snapshot_data_raw:{rel_name}{part_index_col} = {data_body}\n" \
+           f"def _snapshot_data_key:{rel_name} = import_config:{rel_name}:row_key_map[_snapshot_data_raw:{rel_name}]\n" \
+           f"def _snapshot_data:{rel_name}(col, key, val) {{\n" \
+           f"    _snapshot_data_key:{rel_name}({part_index_var}row, key) and\n" \
+           f"    _snapshot_data_raw:{rel_name}({part_index_var}col, row, val)\n" \
+           f"    from {part_index_var}row\n" \
+           f"}}\n" \
+           f"def _snapshot_delta:{rel_name} = snapshot_diff[snapshot_catalog:{rel_name}, _snapshot_data:{rel_name}]\n" \
+           f"def insert:snapshot_catalog:{rel_name} = _snapshot_delta:{rel_name}:insertions\n" \
+           f"def delete:snapshot_catalog:{rel_name} = _snapshot_delta:{rel_name}:deletions\n"
 
 
 def _load_from_indexed_literal(raw_data_rel_name: str, index: int) -> str:
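_snapshot_delta_query above re-keys the freshly loaded rows before diffing: _snapshot_data_raw holds positional tuples (column, row index, value; plus a part index for multi-part sources), row_key_map maps each row to a stable entity key, and _snapshot_data joins the two so that snapshot_diff compares (column, key, value) tuples rather than row numbers that change between loads. A rough Python sketch of that re-keying for a non-partitioned source, with toy data standing in for the Rel relations:

    # stand-in for _snapshot_data_raw: column -> row index -> value
    raw = {
        "device":    {0: "IPhone1", 1: "IPhone8"},
        "last_seen": {0: "2022-01-09", 1: "2022-01-09"},
    }

    # stand-in for row_key_map: derive a stable key from the :device column
    row_key = {row: device for row, device in raw["device"].items()}

    # stand-in for _snapshot_data: (column, key) -> value, the shape snapshot_diff compares
    keyed = {(col, row_key[row]): val
             for col, by_row in raw.items()
             for row, val in by_row.items()}

    assert keyed[("last_seen", "IPhone8")] == "2022-01-09"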