add incremental snapshot support (#82)
* add incremental snapshot support

* disable expiration reloading for snapshots

* add snapshot catalog

* fix test

* fix test

* update oracle

* up version
stanislav-bedunkevich authored Jul 1, 2024
1 parent 81a5609 commit f2d398c
Showing 21 changed files with 165 additions and 41 deletions.
@@ -0,0 +1,2 @@
device,IMEI
IPhone8,125
@@ -0,0 +1,4 @@
device,last_seen
IPhone1,2022-01-05
IPhone2,2022-01-05
IPhone7,2022-01-01
@@ -0,0 +1,3 @@
name
Los Angeles
Santa Monica
@@ -0,0 +1,6 @@
device,last_seen
IPhone1,2022-01-09
IPhone2,2022-01-09
IPhone6,2021-11-16
IPhone7,2022-01-01
IPhone8,2022-01-09
@@ -0,0 +1,4 @@
name
IPhone1
IPhone2
IPhone8
@@ -0,0 +1,4 @@
name
mobile_phone
router
adapter
@@ -0,0 +1,3 @@
name
Store1
Store2
@@ -0,0 +1,4 @@
name
Andrew
Sam
Jordan
16 changes: 9 additions & 7 deletions cli-e2e-test/rel/device_seen.rel
@@ -5,13 +5,15 @@ module import_config:device_seen_snapshot
}

bound syntax:header
end

module device_seen_snapshot
def DEVICE_NAME[idx, row] = source_catalog:device_seen_snapshot[idx, :device, row]
def LAST_SEEN[idx, row] = source_catalog:device_seen_snapshot[idx, :last_seen, row]
@inline
def row_key_map[R](idx, row, key) {
^Device(R[idx, :device, row], key)
}
end

entity type Device = String

/*
* device_seen snapshot is a snapshot of the last time a device was seen, which is
* essentially an aggregation over the snapshot data and the current day's data
@@ -28,9 +30,9 @@ def device_last_seen[d] = max[t:
device_seen_today(d, t)
or
(
device_seen_snapshot:DEVICE_NAME(idx, row, d) and
device_seen_snapshot:LAST_SEEN(idx, row, t)
from idx, row
snapshot_catalog:device_seen_snapshot:device(key, d) and
snapshot_catalog:device_seen_snapshot:last_seen(key, t)
from key
)
]

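
For readers skimming the Rel hunk above: device_last_seen is an aggregation that takes, per device, the maximum of the previous snapshot's last_seen value and any sighting in the current day's data. A minimal Python sketch of that aggregation, with illustrative inputs loosely based on the scenario1 test data (all names and shapes here are assumptions, not part of the commit):

from datetime import date

# previous snapshot (device -> last_seen) and today's sightings, illustrative values only
snapshot_last_seen = {"IPhone1": date(2022, 1, 5), "IPhone2": date(2022, 1, 5), "IPhone7": date(2022, 1, 1)}
seen_today = {"IPhone1": date(2022, 1, 9), "IPhone2": date(2022, 1, 9), "IPhone8": date(2022, 1, 9)}

def device_last_seen(device: str) -> date:
    # max over today's sightings and the previous snapshot, mirroring max[t: ...] above
    candidates = [d for d in (seen_today.get(device), snapshot_last_seen.get(device)) if d is not None]
    return max(candidates)

assert device_last_seen("IPhone1") == date(2022, 1, 9)  # bumped by today's data
assert device_last_seen("IPhone7") == date(2022, 1, 1)  # carried over from the snapshot
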
17 changes: 14 additions & 3 deletions cli-e2e-test/test_e2e.py
@@ -41,15 +41,26 @@ def test_scenario1_model(self):
self.assert_output_dir_files(self.test_scenario1_model.__name__)

def test_scenario1_load_data_jointly(self):
# when
# when loading as of 20220105
test_args = ["--batch-config", "./config/model/scenario1.json",
"--end-date", "20220105",
"--drop-db", "--load-data-jointly"]
"--drop-db", "--load-data-jointly",
"--enable-incremental-snapshots"]
rsp = call(self.cmd_with_common_arguments + test_args)
# then
# then should get the same result as other tests for scenario1
self.assertNotEqual(rsp, 1)
self.assert_output_dir_files(self.test_scenario1_model.__name__)

# when loading as of 20220109
test_args = ["--batch-config", "./config/model/scenario1.json",
"--end-date", "20220109",
"--load-data-jointly",
"--enable-incremental-snapshots"]
rsp = call(self.cmd_with_common_arguments + test_args)
# then should get an updated snapshot
self.assertNotEqual(rsp, 1)
self.assert_output_dir_files(self.test_scenario1_load_data_jointly.__name__)

def test_scenario1_model_yaml(self):
# when
test_args = ["--batch-config", "./config/model/scenario1.yaml",
6 changes: 6 additions & 0 deletions cli/args.py
@@ -153,6 +153,12 @@ def parse() -> Namespace:
action=BooleanOptionalAction,
default=True
)
parser.add_argument(
"--enable-incremental-snapshots",
help="Loading snapshot sources via first doing a diff and applying insert:* and delete:* deltas",
action=BooleanOptionalAction,
default=False
)
parser.add_argument(
"--recover",
help="Recover a batch run starting from a FAILED step",
3 changes: 2 additions & 1 deletion cli/runner.py
@@ -52,7 +52,8 @@ def start(factories: dict[str, workflow.executor.WorkflowStepFactory] = MappingP
workflow.constants.FORCE_REIMPORT: args.force_reimport,
workflow.constants.FORCE_REIMPORT_NOT_CHUNK_PARTITIONED: args.force_reimport_not_chunk_partitioned,
workflow.constants.COLLAPSE_PARTITIONS_ON_LOAD: args.collapse_partitions_on_load,
workflow.constants.LOAD_DATA_JOINTLY: args.load_data_jointly
workflow.constants.LOAD_DATA_JOINTLY: args.load_data_jointly,
workflow.constants.ENABLE_INCREMENTAL_SNAPSHOTS: args.enable_incremental_snapshots
}
config = workflow.executor.WorkflowConfig(env_config, workflow.common.BatchConfig(args.batch_config_name,
batch_config_json),
14 changes: 14 additions & 0 deletions rel/source_configs/config.rel
@@ -4,6 +4,7 @@ bound date_partitioned_source_relation = String
bound source_declares_resource = String, String, String
bound source_has_input_format = String, String
bound source_has_container_type = String, String
bound snapshot_catalog
bound source_catalog
bound simple_source_catalog
bound part_resource_date_pattern = String
@@ -268,6 +269,10 @@ module source
}

def resource_part_index = declares . part_resource:part_index

def has_batch_config(s, cfg) {
source:relname[s] = #(batch_source:relation[cfg])
}
end

/**
@@ -396,6 +401,15 @@ def missing_resources_json(:[], n, :is_date_partitioned, v) {
from s
}

def missing_resources_json(:[], n, :is_snapshot, v) {
source:needs_resource(s) and
source:index[s] = n and
source:has_batch_config(s, cfg) and
batch_source:snapshot_validity_days(cfg, _) and
boolean_true(v)
from s, cfg
}

def missing_resources_json(:[], i, :resources, :[], k, :uri, n) {
source:index[s] = i and
resource:index[s, r] = k and
5 changes: 4 additions & 1 deletion rel/source_configs/data_reload.rel
@@ -123,7 +123,10 @@ def resources_to_delete(rel) {
rel = #(transpose[resource_to_invalidate][_])
}

def resources_data_to_delete_idx = enumerate[resources_data_to_delete]
// the following enumerate is split in order to help the compiler handle arity overloads
def resources_data_to_delete_idx = enumerate[r, ix: resources_data_to_delete(r, ix)]
def resources_data_to_delete_idx = enumerate[r: resources_data_to_delete(r)]

def resources_data_to_delete_json(:[], i, :relation, s) {
resources_data_to_delete_idx(i, r, _) and
s = relname_string[r]
5 changes: 5 additions & 0 deletions rel/util/snapshot_diff.rel
@@ -0,0 +1,5 @@
@outline @nomaintain
module snapshot_diff[{old}, {new}]
def insertions(x...) = new(x...) and not old(x...)
def deletions(x...) = old(x...) and not new(x...)
end
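
For readers less familiar with Rel: snapshot_diff computes the two set differences between the previously loaded snapshot ({old}) and the newly read one ({new}); the incremental load then applies those as insert:* and delete:* deltas instead of reloading the whole source. A minimal Python sketch of the same idea, using the device_seen test data from this commit (the function name and row shapes are illustrative, not part of the change):

def snapshot_diff(old_rows: set, new_rows: set) -> tuple:
    # rows present only in the new snapshot become insert:* deltas
    insertions = new_rows - old_rows
    # rows present only in the old snapshot become delete:* deltas
    deletions = old_rows - new_rows
    return insertions, deletions

old = {("IPhone1", "2022-01-05"), ("IPhone2", "2022-01-05"), ("IPhone7", "2022-01-01")}
new = {("IPhone1", "2022-01-09"), ("IPhone2", "2022-01-09"), ("IPhone6", "2021-11-16"),
       ("IPhone7", "2022-01-01"), ("IPhone8", "2022-01-09")}
insertions, deletions = snapshot_diff(old, new)
# insertions: IPhone1, IPhone2, IPhone8 at 2022-01-09 plus IPhone6; deletions: the stale 2022-01-05 rows
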
3 changes: 2 additions & 1 deletion test/test_cfg_src_step.py
@@ -581,7 +581,8 @@ def _create_cfg_sources_step(sources: List[Source], paths_builders: dict[str, pa
start_date=start_date,
end_date=end_date,
force_reimport=False,
force_reimport_not_chunk_partitioned=False
force_reimport_not_chunk_partitioned=False,
enable_incremental_snapshots=False
)


3 changes: 2 additions & 1 deletion test/test_cfg_src_step_factory.py
@@ -77,7 +77,8 @@ def _create_wf_cfg(env_config: EnvConfig, batch_config: BatchConfig) -> Workflow
constants.FORCE_REIMPORT: False,
constants.FORCE_REIMPORT_NOT_CHUNK_PARTITIONED: False,
constants.COLLAPSE_PARTITIONS_ON_LOAD: False,
constants.LOAD_DATA_JOINTLY: False
constants.LOAD_DATA_JOINTLY: False,
constants.ENABLE_INCREMENTAL_SNAPSHOTS: False,
}
return WorkflowConfig(
env=env_config,
2 changes: 1 addition & 1 deletion workflow/__init__.py
@@ -12,5 +12,5 @@
# See the License for the specific language governing permissions and
# limitations under the License.

__version_info__ = (0, 0, 49)
__version_info__ = (0, 0, 50)
__version__ = ".".join(map(str, __version_info__))
2 changes: 2 additions & 0 deletions workflow/constants.py
@@ -11,6 +11,7 @@
"batch_config/workflow/steps/load_data.rel",
"batch_config/workflow/steps/materialize.rel",
"batch_config/workflow/steps/execute_command.rel",
"util/snapshot_diff.rel",
]

COMMON_MODEL_RELATIVE_PATH = "/../rel"
@@ -81,6 +82,7 @@
FORCE_REIMPORT_NOT_CHUNK_PARTITIONED = "force_reimport_not_chunk_partitioned"
COLLAPSE_PARTITIONS_ON_LOAD = "collapse_partitions_on_load"
LOAD_DATA_JOINTLY = "load_data_jointly"
ENABLE_INCREMENTAL_SNAPSHOTS = "enable_incremental_snapshots"

# Snowflake constants

36 changes: 28 additions & 8 deletions workflow/executor.py
@@ -131,9 +131,11 @@ class ConfigureSourcesWorkflowStep(WorkflowStep):
end_date: str
force_reimport: bool
force_reimport_not_chunk_partitioned: bool
enable_incremental_snapshots: bool

def __init__(self, idt, name, type_value, state, timing, engine_size, config_files, rel_config_dir, sources,
paths_builders, start_date, end_date, force_reimport, force_reimport_not_chunk_partitioned):
paths_builders, start_date, end_date, force_reimport, force_reimport_not_chunk_partitioned,
enable_incremental_snapshots):
super().__init__(idt, name, type_value, state, timing, engine_size)
self.config_files = config_files
self.rel_config_dir = rel_config_dir
@@ -143,6 +145,7 @@ def __init__(self, idt, name, type_value, state, timing, engine_size, config_fil
self.end_date = end_date
self.force_reimport = force_reimport
self.force_reimport_not_chunk_partitioned = force_reimport_not_chunk_partitioned
self.enable_incremental_snapshots = enable_incremental_snapshots

def _execute(self, logger: logging.Logger, env_config: EnvConfig, rai_config: RaiConfig):
rai.install_models(logger, rai_config, env_config, build_models(self.config_files, self.rel_config_dir))
@@ -152,6 +155,15 @@ def _execute(self, logger: logging.Logger, env_config: EnvConfig, rai_config: Ra
declared_sources = {src["source"]: src for src in
rai.execute_relation_json(logger, rai_config, env_config,
constants.DECLARED_DATE_PARTITIONED_SOURCE_REL)}

# if `enable_incremental_snapshots` is set to True, we need to filter out snapshot sources
if self.enable_incremental_snapshots:
# get a set of snapshot source names from the `relation` field
snapshot_sources = {src.relation for src in self.sources if src.snapshot_validity_days and
src.snapshot_validity_days >= 0}
logger.info(f"Snapshot sources skipped, will be reloaded incrementally: {snapshot_sources}")
# filter out snapshot sources from the declared sources
declared_sources = {k: v for k, v in declared_sources.items() if k not in snapshot_sources}
expired_sources = self._calculate_expired_sources(logger, declared_sources)

# mark declared sources for reimport
@@ -284,14 +296,16 @@ def _get_step(self, logger: logging.Logger, config: WorkflowConfig, idt, name, t
force_reimport = config.step_params.get(constants.FORCE_REIMPORT, False)
force_reimport_not_chunk_partitioned = config.step_params.get(constants.FORCE_REIMPORT_NOT_CHUNK_PARTITIONED,
False)
enable_incremental_snapshots = config.step_params[constants.ENABLE_INCREMENTAL_SNAPSHOTS]
paths_builders = {}
for src in sources:
container = src.container
if container.name not in paths_builders:
paths_builders[container.name] = paths.PathsBuilderFactory.get_path_builder(container)
return ConfigureSourcesWorkflowStep(idt, name, type_value, state, timing, engine_size, step["configFiles"],
rel_config_dir, sources, paths_builders, start_date, end_date,
force_reimport, force_reimport_not_chunk_partitioned)
force_reimport, force_reimport_not_chunk_partitioned,
enable_incremental_snapshots)

@staticmethod
def _parse_sources(step: dict, env_config: EnvConfig) -> List[Source]:
@@ -328,11 +342,14 @@ def _parse_sources(step: dict, env_config: EnvConfig) -> List[Source]:
class LoadDataWorkflowStep(WorkflowStep):
collapse_partitions_on_load: bool
load_jointly: bool
enable_incremental_snapshots: bool

def __init__(self, idt, name, type_value, state, timing, engine_size, collapse_partitions_on_load, load_jointly):
def __init__(self, idt, name, type_value, state, timing, engine_size, collapse_partitions_on_load, load_jointly,
enable_incremental_snapshots):
super().__init__(idt, name, type_value, state, timing, engine_size)
self.collapse_partitions_on_load = collapse_partitions_on_load
self.load_jointly = load_jointly
self.enable_incremental_snapshots = enable_incremental_snapshots

def _execute(self, logger: logging.Logger, env_config: EnvConfig, rai_config: RaiConfig):
rai.execute_query(logger, rai_config, env_config, q.DELETE_REFRESHED_SOURCES_DATA, readonly=False)
@@ -353,6 +370,7 @@ def _execute(self, logger: logging.Logger, env_config: EnvConfig, rai_config: Ra

self._load_simple_resources(logger, env_config, rai_config, simple_resources)

# note: async resources do not support snapshot diffing, as CDC should be incremental
if async_resources:
self._load_async_resources(logger, env_config, rai_config, async_resources)

@@ -425,28 +443,29 @@ def _get_date_part_load_query(self, logger: logging.Logger, config, src):
resources = []
for d in srcs:
resources += d["resources"]
return [q.load_resources(logger, config, resources, src)]
return [q.load_resources(logger, config, resources, src, self.enable_incremental_snapshots)]
else:
logger.info(f"Loading '{source_name}' one date partition at a time")
batch = []
for d in src["dates"]:
logger.info(f"Loading partition for date {d['date']}")

for res in d["resources"]:
batch.append(q.load_resources(logger, config, [res], src))
batch.append(
q.load_resources(logger, config, [res], src, self.enable_incremental_snapshots))
return batch

def _get_simple_src_load_query(self, logger: logging.Logger, config, src):
source_name = src["source"]
logger.info(f"Loading source '{source_name}' not partitioned by date")
if self.collapse_partitions_on_load:
logger.info(f"Loading '{source_name}' all chunk partitions simultaneously")
return [q.load_resources(logger, config, src["resources"], src)]
return [q.load_resources(logger, config, src["resources"], src, self.enable_incremental_snapshots)]
else:
logger.info(f"Loading '{source_name}' one chunk partition at a time")
batch = []
for res in src["resources"]:
batch.append(q.load_resources(logger, config, [res], src))
batch.append(q.load_resources(logger, config, [res], src, self.enable_incremental_snapshots))
return batch

@staticmethod
@@ -470,8 +489,9 @@ def _get_step(self, logger: logging.Logger, config: WorkflowConfig, idt, name, t
engine_size, step: dict) -> WorkflowStep:
collapse_partitions_on_load = config.step_params[constants.COLLAPSE_PARTITIONS_ON_LOAD]
load_jointly = config.step_params[constants.LOAD_DATA_JOINTLY]
enable_incremental_snapshots = config.step_params[constants.ENABLE_INCREMENTAL_SNAPSHOTS]
return LoadDataWorkflowStep(idt, name, type_value, state, timing, engine_size, collapse_partitions_on_load,
load_jointly)
load_jointly, enable_incremental_snapshots)


class MaterializeWorkflowStep(WorkflowStep):