From f3eb75ecd3c2cc81ee1b3186d09418cd8485420d Mon Sep 17 00:00:00 2001 From: Andrew Nepogoda Date: Fri, 22 Sep 2023 19:37:55 +0200 Subject: [PATCH] Allow multiple locations for source import/export (#23) * Allow multiple locations for source import/export * Fix source_declares_resource clean up during data refresh * remove redundant property from AzureConfig * update readme * simplify blob.py * rename methods in query.py --- .github/workflows/run-tests.yml | 3 - cli-e2e-test/config/loader.toml | 12 ++ cli-e2e-test/config/model/scenario1.json | 2 + cli-e2e-test/config/model/scenario2.json | 1 + cli-e2e-test/config/model/scenario3.json | 1 + cli-e2e-test/config/model/scenario4.json | 2 + cli-e2e-test/test_e2e.py | 5 +- cli/README.md | 58 +++++++-- cli/args.py | 22 ---- cli/runner.py | 7 +- rel/source_configs/config.rel | 19 ++- rel/source_configs/data_reload.rel | 2 +- test/test_cfg_src_step.py | 43 +++---- test/test_cfg_src_step_factory.py | 84 +++++++++++++ test/test_export_step.py | 6 +- workflow/README.md | 46 +++++++ workflow/__init__.py | 2 +- workflow/blob.py | 15 +-- workflow/common.py | 113 +++++++++++------- workflow/constants.py | 25 ++-- workflow/executor.py | 146 ++++++++++------------- workflow/paths.py | 37 ++++-- workflow/query.py | 79 ++++++------ workflow/rai.py | 6 +- workflow/utils.py | 7 +- 25 files changed, 463 insertions(+), 280 deletions(-) create mode 100644 cli-e2e-test/config/loader.toml create mode 100644 test/test_cfg_src_step_factory.py diff --git a/.github/workflows/run-tests.yml b/.github/workflows/run-tests.yml index 27ba836..a23535e 100644 --- a/.github/workflows/run-tests.yml +++ b/.github/workflows/run-tests.yml @@ -48,9 +48,6 @@ jobs: echo "port = 443" >> config echo "client_id = ${{ env.RAI_CLIENT_ID }}" >> config echo "client_secret = ${{ env.RAI_CLIENT_SECRET }}" >> config - # Create empty toml - cd $GITHUB_WORKSPACE/cli-e2e-test/config - touch loader.toml env: RAI_CLIENT_ID: ${{ secrets.client_id }} RAI_CLIENT_SECRET: ${{ secrets.client_secret }} diff --git a/cli-e2e-test/config/loader.toml b/cli-e2e-test/config/loader.toml new file mode 100644 index 0000000..1ef6691 --- /dev/null +++ b/cli-e2e-test/config/loader.toml @@ -0,0 +1,12 @@ +rai_sdk_http_retries=0 + +[[container]] +name="input" +type="local" +data_path="./data" + +[[container]] +name="export" +type="local" +data_path="./output" + diff --git a/cli-e2e-test/config/model/scenario1.json b/cli-e2e-test/config/model/scenario1.json index 3e3257d..31078df 100644 --- a/cli-e2e-test/config/model/scenario1.json +++ b/cli-e2e-test/config/model/scenario1.json @@ -6,6 +6,7 @@ "configFiles": [ "config/config1.rel" ], + "defaultContainer": "input", "sources": [ { "relation": "zip_city_state_master_data", @@ -74,6 +75,7 @@ "name": "Export", "exportJointly": false, "dateFormat": "%Y%m%d", + "defaultContainer": "export", "exports": [ { "type": "csv", diff --git a/cli-e2e-test/config/model/scenario2.json b/cli-e2e-test/config/model/scenario2.json index 9f1d0c6..d4bb31f 100644 --- a/cli-e2e-test/config/model/scenario2.json +++ b/cli-e2e-test/config/model/scenario2.json @@ -6,6 +6,7 @@ "configFiles": [ "config/config2.rel" ], + "defaultContainer": "input", "sources": [ { "relation": "zip_city_state_master_data", diff --git a/cli-e2e-test/config/model/scenario3.json b/cli-e2e-test/config/model/scenario3.json index ebc1233..fe7351d 100644 --- a/cli-e2e-test/config/model/scenario3.json +++ b/cli-e2e-test/config/model/scenario3.json @@ -6,6 +6,7 @@ "configFiles": [ "config/config2.rel" ], + "defaultContainer": 
"input", "sources": [ { "relation": "zip_city_state_master_data", diff --git a/cli-e2e-test/config/model/scenario4.json b/cli-e2e-test/config/model/scenario4.json index 714b9c6..7727801 100644 --- a/cli-e2e-test/config/model/scenario4.json +++ b/cli-e2e-test/config/model/scenario4.json @@ -6,6 +6,7 @@ "configFiles": [ "config/config3.rel" ], + "defaultContainer": "input", "sources": [ { "relation": "city_data", @@ -41,6 +42,7 @@ "name": "Export", "exportJointly": false, "dateFormat": "%Y%m%d", + "defaultContainer": "export", "exports": [ { "type": "csv", diff --git a/cli-e2e-test/test_e2e.py b/cli-e2e-test/test_e2e.py index 1ffc795..bcb0929 100644 --- a/cli-e2e-test/test_e2e.py +++ b/cli-e2e-test/test_e2e.py @@ -21,13 +21,10 @@ class CliE2ETest(unittest.TestCase): expected = "./expected_results" resource_name = "wm-cli-e2e-test-" + str(uuid.uuid4()) cmd_with_common_arguments = ["python", "main.py", - "--run-mode", "local", "--env-config", env_config, "--engine", resource_name, "--database", resource_name, - "--rel-config-dir", "./rel", - "--dev-data-dir", dev_data_dir, - "--output-root", output] + "--rel-config-dir", "./rel"] def test_scenario1_model(self): # when diff --git a/cli/README.md b/cli/README.md index 2df37fa..9ba3b65 100644 --- a/cli/README.md +++ b/cli/README.md @@ -6,7 +6,7 @@ This Command-Line Interface (CLI) is designed to provide an easy and interactive 1. Create a batch configuration (ex. `poc.json`) file using the syntax and structure outlined in the [RAI Workflow Framework README](../workflow/README.md). 2. Add `rai-workflow-manager` as dependency to your `requirements.txt` file: ```txt -rai-workflow-manager==0.0.9 +rai-workflow-manager==0.0.13 ``` 3. Build the project: ```bash @@ -25,20 +25,61 @@ import cli.runner if __name__ == "__main__": cli.runner.start() ``` -5. Run the following command to execute the batch configuration: +5. Create `loader.toml` file with the following content: +```toml +[[container]] +name="input" +type="local" +data_path="./data" + +[[container]] +name="export" +type="local" +data_path="./output" +``` +6. Run the following command to execute the batch configuration: ```bash python main.py \ - --run-mode local \ - --dev-data-dir /data \ --rel-config-dir /rel \ --batch-config poc.json \ --env-config loader.toml \ --engine \ - --database \ - --output-root ./output + --database ``` -where ``, `` are the names of some RAI resources to use, `` is the path to the directory containing directory with data and rel sources. +where ``, `` are the names of some RAI resources to use +## `loader.toml` Configuration +The `loader.toml` file is used to specify static properties for the RAI Workflow Framework. It contains the following properties: + +| Description | Property | +|:---------------------------------------------------------------------------------|------------------------| +| RAI profile. | `rai_profile` | +| Path to RAI config. | `rai_profile_path` | +| HTTP retries for RAI sdk in case of errors. (Can be overridden by CLI argument) | `rai_sdk_http_retries` | +| A list of containers to use for loading and exporting data. | `container` | +| The name of the container. | `container.name` | +| The type of the container. Supported types: `local`, `azure` | `container.type` | +| The path in the container. | `container.data_path` | +| Remote container account | `container.account` | +| Remote container SAS token. | `container.sas` | +| Container for remote container SAS token. (Ex. 
Azure Blob container) | `container.container` | +### Azure container example +```toml +[[container]] +name="input" +type="azure" +data_path="input" +account="account_name" +sas="sas_token" +container="container_name" +``` +### Local container example +```toml +[[container]] +name="input" +type="local" +data_path="./data" +``` ## CLI Arguments | Description | CLI argument | Is required | Default value | Parameter Type | Recognized Values | |:----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|------------------------------------------|-------------|-------------------------|--------------------------|-----------------------------------------------------| @@ -48,14 +89,11 @@ where ``, `` are the names of some RAI resources to use, `load all partitions (and shards) in one transaction | `--collapse-partitions-on-load` | `False` | `True` | `Bool` | | -| Output folder path for local run mode | `--output-root` | `False` | `../../output` | `String` | | | Logging level for cli | `--log-level` | `False` | `INFO` | `String` | `['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']` | | Drop database before workflow run, or not | `--drop-db` | `False` | `False` | `BooleanOptionalAction` | `True` in case argument presents | | Remove RAI engine and database after run or not | `--cleanup-resources` | `False` | `False` | `Bool` | | diff --git a/cli/args.py b/cli/args.py index f16bd08..d1ae4dc 100644 --- a/cli/args.py +++ b/cli/args.py @@ -1,7 +1,5 @@ from argparse import ArgumentParser, Namespace, BooleanOptionalAction -import workflow.executor - def parse() -> Namespace: parser = ArgumentParser() @@ -36,13 +34,6 @@ def parse() -> Namespace: required=True, type=str ) - parser.add_argument( - "--run-mode", - help="Type of run mode", - required=True, - type=workflow.executor.WorkflowRunMode, - choices=list(workflow.executor.WorkflowRunMode), - ) parser.add_argument( "--start-date", help="Start date for model data. 
Format: 'YYYYmmdd'", required=False, @@ -69,12 +60,6 @@ def parse() -> Namespace: action="store_true", default=False ) - parser.add_argument( - "--dev-data-dir", help="Directory containing dev data", - required=False, - default="../data", - type=str - ) parser.add_argument( "--rel-config-dir", help="Directory containing rel config files to install", required=False, @@ -95,13 +80,6 @@ def parse() -> Namespace: default=True, type=bool ) - parser.add_argument( - "--output-root", - help="Output folder path for dev mode", - required=False, - default="../../output", - type=str - ) parser.add_argument( "--log-level", help="Set log level", diff --git a/cli/runner.py b/cli/runner.py index 1db50a0..d5ef53b 100644 --- a/cli/runner.py +++ b/cli/runner.py @@ -43,17 +43,14 @@ def start(): # Init workflow executor parameters = { workflow.constants.REL_CONFIG_DIR: args.rel_config_dir, - workflow.constants.OUTPUT_ROOT: args.output_root, - workflow.constants.LOCAL_DATA_DIR: args.dev_data_dir, workflow.constants.START_DATE: args.start_date, workflow.constants.END_DATE: args.end_date, workflow.constants.FORCE_REIMPORT: args.force_reimport, workflow.constants.FORCE_REIMPORT_NOT_CHUNK_PARTITIONED: args.force_reimport_not_chunk_partitioned, workflow.constants.COLLAPSE_PARTITIONS_ON_LOAD: args.collapse_partitions_on_load } - config = workflow.executor.WorkflowConfig(env_config, args.run_mode, - workflow.common.BatchConfig(args.batch_config_name, - batch_config_json), + config = workflow.executor.WorkflowConfig(env_config, workflow.common.BatchConfig(args.batch_config_name, + batch_config_json), args.recover, args.recover_step, args.selected_steps, parameters) executor = workflow.executor.WorkflowExecutor.init(logger, config, resource_manager) end_time = time.time() diff --git a/rel/source_configs/config.rel b/rel/source_configs/config.rel index fc88608..4253024 100644 --- a/rel/source_configs/config.rel +++ b/rel/source_configs/config.rel @@ -1,7 +1,7 @@ bound simple_relation = String bound multi_part_relation = String bound date_partitioned_source = String -bound source_declares_resource = String, String +bound source_declares_resource = String, String, String bound source_has_input_format = String, String bound source_catalog bound simple_source_catalog @@ -97,7 +97,7 @@ module uri from v } - def value(v) { source_declares_resource(_, v) } + def value(v) { source_declares_resource(_, _, v) } def parse(u, i, d) { value(v) and @@ -197,7 +197,7 @@ module source def populates = transpose[relation:identifies] def declares(s, r) { - source_declares_resource(rel, res) and + source_declares_resource(rel, _, res) and r = uri:identifies[ ^URI[res] ] and s = relation:identifies[ rel_name:identifies[ ^RelName[rel] ] ] from rel, res @@ -214,6 +214,12 @@ module source forall(r: declares(s, r) implies not resource:local(r) ) } + def container(s, c) { + source_declares_resource(rel, c, _) and + s = relation:identifies[ rel_name:identifies[ ^RelName[rel] ] ] + from rel + } + def date_partitioned(s) { s = relation:identifies[ rel_name:identifies[ ^RelName[date_partitioned_source] ] ] } def format(s, f) { @@ -352,6 +358,13 @@ def missing_resources_json(:[], n, :is_local, "Y") { from s } +def missing_resources_json(:[], n, :container, v) { + source:needs_resource(s) and + source:index[s] = n and + source:container(s, v) + from s +} + def missing_resources_json(:[], n, :is_date_partitioned, v) { source:needs_resource(s) and source:index[s] = n and diff --git a/rel/source_configs/data_reload.rel b/rel/source_configs/data_reload.rel 
index 10bfa9c..366d729 100644 --- a/rel/source_configs/data_reload.rel +++ b/rel/source_configs/data_reload.rel @@ -29,7 +29,7 @@ def simple_sources(rel, path) { * All simple sources are affected if they match with declared sources. */ def potentially_affected_sources(rel, path) { - source_declares_resource(rel, path) and + source_declares_resource(rel, _, path) and simple_sources(rel, path) } /* diff --git a/test/test_cfg_src_step.py b/test/test_cfg_src_step.py index 5da15a0..85146ba 100644 --- a/test/test_cfg_src_step.py +++ b/test/test_cfg_src_step.py @@ -24,7 +24,7 @@ def test_get_date_range_not_part(self): is_chunk_partitioned=False, ) paths_builder = Mock() - workflow_step = _create_cfg_sources_step([test_src], paths_builder, None, None) + workflow_step = _create_cfg_sources_step([test_src], {"default": paths_builder}, None, None) # When calling _get_date_range days = workflow_step._get_date_range(self.logger, test_src) @@ -41,7 +41,7 @@ def test_inflate_sources_not_part(self): paths_builder = _create_path_builder_mock([ paths.FilePath(path="test/test_non_part.csv"), ]) - workflow_step = _create_cfg_sources_step([test_src], paths_builder, None, None) + workflow_step = _create_cfg_sources_step([test_src], {"default": paths_builder}, None, None) # When calling _inflate_sources workflow_step._inflate_sources(self.logger) # Then @@ -62,7 +62,7 @@ def test_get_date_range_date_part_1day(self): ) end_date = "20220105" paths_builder = Mock() - workflow_step = _create_cfg_sources_step([test_src], paths_builder, None, end_date) + workflow_step = _create_cfg_sources_step([test_src], {"default": paths_builder}, None, end_date) # When calling _get_date_range days = workflow_step._get_date_range(self.logger, test_src) @@ -85,7 +85,7 @@ def test_inflate_sources_date_part_1day_multiple_paths(self): paths.FilePath(path="test/test_20220105_3.csv", as_of_date="20220105"), ]) end_date = "20220105" - workflow_step = _create_cfg_sources_step([test_src], paths_builder, None, end_date) + workflow_step = _create_cfg_sources_step([test_src], {"default": paths_builder}, None, end_date) # When calling _inflate_sources workflow_step._inflate_sources(self.logger) # Then @@ -104,7 +104,7 @@ def test_get_date_range_date_part_10days(self): ) end_date = "20220115" paths_builder = Mock() - workflow_step = _create_cfg_sources_step([test_src], paths_builder, None, end_date) + workflow_step = _create_cfg_sources_step([test_src], {"default": paths_builder}, None, end_date) # When calling _get_date_range days = workflow_step._get_date_range(self.logger, test_src) @@ -122,7 +122,7 @@ def test_inflate_sources_date_part_10days(self): paths.FilePath(path=f"test/test_{day}_1.csv", as_of_date=f"{day}") for day in range(20220101, 20220116) ]) # 20220101, 20220102, ..., 20220115 end_date = "20220115" - workflow_step = _create_cfg_sources_step([test_src], paths_builder, None, end_date) + workflow_step = _create_cfg_sources_step([test_src], {"default": paths_builder}, None, end_date) # When calling _inflate_sources workflow_step._inflate_sources(self.logger) # Then @@ -137,7 +137,7 @@ def test_get_date_range_date_part_10days_offset_2days(self): ) end_date = "20220112" paths_builder = Mock() - workflow_step = _create_cfg_sources_step([test_src], paths_builder, None, end_date) + workflow_step = _create_cfg_sources_step([test_src], {"default": paths_builder}, None, end_date) # When calling _get_date_range days = workflow_step._get_date_range(self.logger, test_src) @@ -156,7 +156,7 @@ def 
test_inflate_sources_date_part_10days_offset_2days(self): for i in range(2) ]) # 20220101, 20220102, ..., 20220110 end_date = "20220112" - workflow_step = _create_cfg_sources_step([test_src], paths_builder, None, end_date) + workflow_step = _create_cfg_sources_step([test_src], {"default": paths_builder}, None, end_date) # When calling _inflate_sources workflow_step._inflate_sources(self.logger) # Then @@ -177,7 +177,7 @@ def test_get_date_range_snapshot_valid_the_same_day(self): ) end_date = "20220105" paths_builder = Mock() - workflow_step = _create_cfg_sources_step([test_src], paths_builder, None, end_date) + workflow_step = _create_cfg_sources_step([test_src], {"default": paths_builder}, None, end_date) # When calling _get_date_range days = workflow_step._get_date_range(self.logger, test_src) @@ -198,7 +198,7 @@ def test_inflate_sources_snapshot_valid_the_same_day(self): paths.FilePath(path="test/snapshot_20220105.csv", as_of_date="20220105"), ]) end_date = "20220105" - workflow_step = _create_cfg_sources_step([test_src], paths_builder, None, end_date) + workflow_step = _create_cfg_sources_step([test_src], {"default": paths_builder}, None, end_date) # When calling _inflate_sources workflow_step._inflate_sources(self.logger) # Then @@ -238,7 +238,7 @@ def test_inflate_sources_snapshot_1day(self): paths.FilePath(path="test/snapshot_20220105.csv", as_of_date="20220105"), ]) end_date = "20220105" - workflow_step = _create_cfg_sources_step([test_src], paths_builder, None, end_date) + workflow_step = _create_cfg_sources_step([test_src], {"default": paths_builder}, None, end_date) # When calling _inflate_sources workflow_step._inflate_sources(self.logger) # Then @@ -256,7 +256,7 @@ def test_get_date_range_snapshot_1day_offset_by_1day(self): ) end_date = "20220105" paths_builder = Mock() - workflow_step = _create_cfg_sources_step([test_src], paths_builder, None, end_date) + workflow_step = _create_cfg_sources_step([test_src], {"default": paths_builder}, None, end_date) # When calling _get_date_range days = workflow_step._get_date_range(self.logger, test_src) @@ -277,7 +277,7 @@ def test_inflate_sources_snapshot_1day_offset_by_1day(self): paths.FilePath(path="test/snapshot_20220104.csv", as_of_date="20220104"), ]) end_date = "20220105" - workflow_step = _create_cfg_sources_step([test_src], paths_builder, None, end_date) + workflow_step = _create_cfg_sources_step([test_src], {"default": paths_builder}, None, end_date) # When calling _inflate_sources workflow_step._inflate_sources(self.logger) # Then @@ -312,7 +312,7 @@ def test_get_date_range_snapshot_30days_before_start(self): ) paths_builder = _create_path_builder_mock([]) end_date = "20220131" - workflow_step = _create_cfg_sources_step([test_src], paths_builder, None, end_date) + workflow_step = _create_cfg_sources_step([test_src], {"default": paths_builder}, None, end_date) # When calling _inflate_sources workflow_step._inflate_sources(self.logger) # Then @@ -330,7 +330,7 @@ def test_get_date_range_snapshot_30days_at_start(self): paths.FilePath(path="test/snapshot_20220101.csv", as_of_date="20220101"), ]) end_date = "20220131" - workflow_step = _create_cfg_sources_step([test_src], paths_builder, None, end_date) + workflow_step = _create_cfg_sources_step([test_src], {"default": paths_builder}, None, end_date) # When calling _inflate_sources workflow_step._inflate_sources(self.logger) # Then @@ -348,7 +348,7 @@ def test_get_date_range_snapshot_30days_in_the_middle(self): paths.FilePath(path="test/snapshot_20220115.csv", as_of_date="20220115"), 
]) end_date = "20220131" - workflow_step = _create_cfg_sources_step([test_src], paths_builder, None, end_date) + workflow_step = _create_cfg_sources_step([test_src], {"default": paths_builder}, None, end_date) # When calling _inflate_sources workflow_step._inflate_sources(self.logger) # Then @@ -366,7 +366,7 @@ def test_get_date_range_snapshot_30days_at_the_end(self): paths.FilePath(path="test/snapshot_20220130.csv", as_of_date="20220130"), ]) end_date = "20220131" - workflow_step = _create_cfg_sources_step([test_src], paths_builder, None, end_date) + workflow_step = _create_cfg_sources_step([test_src], {"default": paths_builder}, None, end_date) # When calling _inflate_sources workflow_step._inflate_sources(self.logger) # Then @@ -387,7 +387,7 @@ def test_inflate_sources_snapshot_1day_multiple_paths(self): paths.FilePath(path="test/test_20220105_2.csv", as_of_date="20220105"), ]) end_date = "20220105" - workflow_step = _create_cfg_sources_step([test_src], paths_builder, None, end_date) + workflow_step = _create_cfg_sources_step([test_src], {"default": paths_builder}, None, end_date) # When calling _inflate_sources workflow_step._inflate_sources(self.logger) # Then @@ -402,6 +402,7 @@ def _create_test_source(is_chunk_partitioned: bool = True, is_date_partitioned: loads_number_of_days: int = 1, offset_by_number_of_days: int = 0, snapshot_validity_days=None) -> Source: return Source( + container="default", relation="test", relative_path="test", input_format="test", @@ -415,8 +416,8 @@ def _create_test_source(is_chunk_partitioned: bool = True, is_date_partitioned: ) -def _create_cfg_sources_step(sources: List[Source], paths_builder: paths.PathsBuilder, start_date, end_date) ->\ - ConfigureSourcesWorkflowStep: +def _create_cfg_sources_step(sources: List[Source], paths_builders: dict[str, paths.PathsBuilder], start_date, + end_date) -> ConfigureSourcesWorkflowStep: return ConfigureSourcesWorkflowStep( idt=uuid.uuid4(), name="test", @@ -426,7 +427,7 @@ def _create_cfg_sources_step(sources: List[Source], paths_builder: paths.PathsBu config_files=[], rel_config_dir="", sources=sources, - paths_builder=paths_builder, + paths_builders=paths_builders, start_date=start_date, end_date=end_date, force_reimport=False, diff --git a/test/test_cfg_src_step_factory.py b/test/test_cfg_src_step_factory.py new file mode 100644 index 0000000..1f51906 --- /dev/null +++ b/test/test_cfg_src_step_factory.py @@ -0,0 +1,84 @@ +import logging +import unittest +from unittest.mock import Mock, MagicMock + +from workflow import paths +from workflow import constants +from workflow.common import Source, BatchConfig, EnvConfig, Container, ContainerType +from workflow.executor import ConfigureSourcesWorkflowStepFactory, WorkflowConfig, WorkflowStepState + + +class TestConfigureSourcesWorkflowStepFactory(unittest.TestCase): + logger: logging.Logger = Mock() + + def test_get_step(self): + # Setup factory spy + factory = ConfigureSourcesWorkflowStepFactory() + spy = MagicMock(wraps=factory._parse_sources) + sources = [_create_test_source("azure", "src1"), _create_test_source("local", "src2"), + _create_test_source("local", "src3"), _create_test_source("local", "src4")] + spy.return_value = sources + factory._parse_sources = spy + config = _create_wf_cfg(EnvConfig({"azure": _create_container("default", ContainerType.AZURE), + "local": _create_container("local", ContainerType.LOCAL)}), Mock()) + # When + step = factory._get_step(self.logger, config, "1", "name", WorkflowStepState.INIT, 0, None, {"configFiles": []}) + # Then + 
self.assertEqual("1", step.idt) + self.assertEqual("name", step.name) + self.assertEqual(WorkflowStepState.INIT, step.state) + self.assertEqual(0, step.timing) + self.assertEqual(None, step.engine_size) + self.assertEqual([], step.config_files) + self.assertEqual("./rel", step.rel_config_dir) + self.assertEqual(sources, step.sources) + self.assertEqual(2, len(step.paths_builders.keys())) + self.assertIsInstance(step.paths_builders.get("local"), paths.LocalPathsBuilder) + self.assertIsInstance(step.paths_builders.get("azure"), paths.AzurePathsBuilder) + self.assertEqual("2021-01-01", step.start_date) + self.assertEqual("2021-01-01", step.end_date) + self.assertFalse(step.force_reimport) + self.assertFalse(step.force_reimport_not_chunk_partitioned) + + +def _create_test_source(container: str, relation: str) -> Source: + return Source( + container=container, + relation=relation, + relative_path="test", + input_format="test", + extensions=["test"], + is_chunk_partitioned=True, + is_date_partitioned=True, + loads_number_of_days=0, + offset_by_number_of_days=0, + snapshot_validity_days=0, + paths=[] + ) + + +def _create_container(name: str, c_type: ContainerType) -> Container: + return Container( + name=name, + type=c_type, + params={} + ) + + +def _create_wf_cfg(env_config: EnvConfig, batch_config: BatchConfig) -> WorkflowConfig: + parameters = { + constants.REL_CONFIG_DIR: "./rel", + constants.START_DATE: "2021-01-01", + constants.END_DATE: "2021-01-01", + constants.FORCE_REIMPORT: False, + constants.FORCE_REIMPORT_NOT_CHUNK_PARTITIONED: False, + constants.COLLAPSE_PARTITIONS_ON_LOAD: False + } + return WorkflowConfig( + env=env_config, + batch_config=batch_config, + recover=False, + recover_step="", + selected_steps=[], + step_params=parameters + ) diff --git a/test/test_export_step.py b/test/test_export_step.py index 8e2de7e..c9967f3 100644 --- a/test/test_export_step.py +++ b/test/test_export_step.py @@ -15,7 +15,7 @@ class TestConfigureSourcesWorkflowStep(unittest.TestCase): @patch('workflow.rai.execute_query_take_single') def test_should_export_should_not_export_valid_snapshot(self, mock_execute_query): # given - export = Export([], "relation", "relative_path", FileType.CSV, "snapshot_binding") + export = Export([], "relation", "relative_path", FileType.CSV, "snapshot_binding", "default") end_date = "20220105" step = _create_export_step([export], end_date) rai_config = RaiConfig(Mock(), "engine", "database") @@ -29,7 +29,7 @@ def test_should_export_should_not_export_valid_snapshot(self, mock_execute_quer @patch('workflow.rai.execute_query_take_single') def test_should_export_should_export_snapshot_expiring_today(self, mock_execute_query): # given - export = Export([], "relation", "relative_path", FileType.CSV, "snapshot_binding") + export = Export([], "relation", "relative_path", FileType.CSV, "snapshot_binding", "default") end_date = "20220105" step = _create_export_step([export], end_date) rai_config = RaiConfig(Mock(), "engine", "database") @@ -43,7 +43,7 @@ def test_should_export_should_export_snapshot_expiring_today(self, mock_execute @patch('workflow.rai.execute_query_take_single') def test_should_export_should_export_expired_snapshot(self, mock_execute_query): # given - export = Export([], "relation", "relative_path", FileType.CSV, "snapshot_binding") + export = Export([], "relation", "relative_path", FileType.CSV, "snapshot_binding", "default") end_date = "20220105" step = _create_export_step([export], end_date) rai_config = RaiConfig(Mock(), "engine", "database") diff --git 
a/workflow/README.md b/workflow/README.md index 4c17fab..3c20c52 100644 --- a/workflow/README.md +++ b/workflow/README.md @@ -44,12 +44,14 @@ The order of the steps in the batch configuration is important, as the RAI Workf Steps of this type are used to configure sources which workflow manager will use during [Load Data](#load-data) step. * `configFiles`(required) is used to specify the configuration files to install. Leave empty if no configuration files are needed. +* `defaultContainer`(required) is used to specify the default container for source input. * `sources`(required) is used to specify the sources to configure. * `relation`(required) is used to specify the name of the relation to which the source should be uploaded. * `isChunkPartitioned`(optional) is used to specify whether the source is chunk partitioned(split on chunks). By default `False`. * `relativePath`(required) is used to specify the relative path of the source on Blob storage or in data on file system. * `inputFormat`(required) is used to specify the input format of the source. The supported input formats are `csv`, `jsonl`. * `isDatePartitioned`(optional) is used to specify is partitioned by date. By default `False`. + * `container`(optional) is used to specify the container for a particular source. If not specified, the `defaultContainer` will be used. * `extensions`(optional) is used to specify the extensions of the source which will be associated with the `inputFormat`. If not specified, the `inputFormat` will be used default extensions are used. * `loadsNumberOfDays`(optional) is used to specify the number of days to load. * `offsetByNumberOfDays`(optional) is used to specify the number of days to offset the current (end) date by. @@ -61,9 +63,11 @@ Steps of this type are used to configure sources which workflow manager will use "configFiles": [ "source_configs/my_config.rel" ], + "defaultContainer": "input", "sources": [ { "relation": "master_data", + "container": "azure_input", "relativePath": "master_source/data", "inputFormat": "csv" }, @@ -128,7 +132,49 @@ Steps of this type are used to materialize relations in the RAI database. ``` ### Export +Steps of this type are used to export data from the RAI database. +* `exportJointly`(required) is used to specify whether the relations should be exported jointly or separately. +* `dateFormat`(required) is used to specify the date format for the export folder. +* `defaultContainer`(required) is used to specify the default container for export. +* `exports`(required) is used to specify the relations to export. + * `type`(required) is used to specify the type of export. The supported types are `csv`. + * `configRelName`(required) is used to specify the name of the relation which configures the export. + * `relativePath`(required) is used to specify the relative path of the export on Blob storage or in data on file system. + * `container`(optional) is used to specify the container for a particular export. If not specified, the `defaultContainer` will be used. + * `snapshotBinding`(optional) is used to specify the name of the source which is bound to the export. If specified, the export will be skipped if the snapshot is still valid. + * `metaKey`(optional) is used to specify the meta-key for the export. If specified, the export will be specialized by the meta-key. 
+```json +{ + "type": "Export", + "name": "Export", + "exportJointly": false, + "dateFormat": "%Y%m%d", + "defaultContainer": "export", + "exports": [ + { + "type": "csv", + "configRelName": "account_journey_csv", + "relativePath": "account_journey" + }, + { + "type": "csv", + "container": "azure_output", + "configRelName": "device_seen_snapshot_csv", + "relativePath": "device_seen_snapshot", + "snapshotBinding": "device_seen_snapshot" + }, + { + "type": "csv", + "configRelName": "meta_exports", + "relativePath": "meta_exports", + "metaKey": [ + "Symbol" + ] + } + ] +} +``` #### Common config options: * `data` relation contains the well-defined (:col, key..., val) data to be exported; diff --git a/workflow/__init__.py b/workflow/__init__.py index 25c88da..250d4ad 100644 --- a/workflow/__init__.py +++ b/workflow/__init__.py @@ -12,5 +12,5 @@ # See the License for the specific language governing permissions and # limitations under the License. -__version_info__ = (0, 0, 12) +__version_info__ = (0, 0, 13) __version__ = ".".join(map(str, __version_info__)) diff --git a/workflow/blob.py b/workflow/blob.py index ccf1dc5..b6dafc5 100644 --- a/workflow/blob.py +++ b/workflow/blob.py @@ -3,17 +3,14 @@ from azure.storage.blob import BlobServiceClient from workflow.common import FileFormat -from workflow.common import EnvConfig +from workflow.common import AzureConfig from workflow.constants import BLOB_PAGE_SIZE -def list_files_in_containers(logger: logging.Logger, env_config: EnvConfig, path_prefix) -> List[str]: - azure_account = env_config.azure_import_account - azure_container = env_config.azure_import_container - account_url = f"https://{azure_account}.blob.core.windows.net" - - blob_service_client = BlobServiceClient(account_url=account_url, credential=env_config.azure_import_sas) - container_client = blob_service_client.get_container_client(azure_container) +def list_files_in_containers(logger: logging.Logger, config: AzureConfig, path_prefix) -> List[str]: + blob_service_client = BlobServiceClient(account_url=f"https://{config.account}.blob.core.windows.net", + credential=config.sas) + container_client = blob_service_client.get_container_client(config.container) # Get a list of blobs in the folder logger.debug(f"Path prefix to list blob files: {path_prefix}") @@ -22,7 +19,7 @@ def list_files_in_containers(logger: logging.Logger, env_config: EnvConfig, path for blob in page: blob_name = blob.name if FileFormat.is_supported(blob_name): - paths.append(f"azure://{azure_account}.blob.core.windows.net/{azure_container}/{blob_name}") + paths.append(f"azure://{config.account}.blob.core.windows.net/{config.container}/{blob_name}") else: logger.debug(f"Skip unsupported file from blob: {blob_name}") return paths diff --git a/workflow/common.py b/workflow/common.py index 991060a..84511ba 100644 --- a/workflow/common.py +++ b/workflow/common.py @@ -4,9 +4,8 @@ from railib import api -from workflow.constants import AZURE_EXPORT_ACCOUNT, AZURE_EXPORT_CONTAINER, AZURE_EXPORT_DATA_PATH, \ - AZURE_EXPORT_NUM_FILES, AZURE_EXPORT_SAS, AZURE_IMPORT_ACCOUNT, AZURE_IMPORT_CONTAINER, AZURE_IMPORT_DATA_PATH, \ - AZURE_IMPORT_SAS +from workflow.constants import AZURE_ACCOUNT, AZURE_CONTAINER, AZURE_DATA_PATH, AZURE_SAS, LOCAL_DATA_PATH, CONTAINER, \ + CONTAINER_TYPE, CONTAINER_NAME class MetaEnum(EnumMeta): @@ -49,22 +48,62 @@ class FileType(str, BaseEnum): JSONL = 'JSONL' -class SourceType(Enum): - LOCAL = 1 - REMOTE = 2 +class ContainerType(str, BaseEnum): + LOCAL = 'local' + AZURE = 'azure' + + def __str__(self): 
+ return self.value @staticmethod def from_source(src): if 'is_local' in src and src['is_local'] == 'Y': - return SourceType.LOCAL + return ContainerType.LOCAL elif 'is_remote' in src and src['is_remote'] == 'Y': - return SourceType.REMOTE + return ContainerType.AZURE else: raise ValueError("Source is neither local nor remote.") +@dataclasses.dataclass +class Container: + name: str + type: ContainerType + params: dict[str, Any] + + +@dataclasses.dataclass +class AzureConfig: + account: str + container: str + data_path: str + sas: str + + +@dataclasses.dataclass +class LocalConfig: + data_path: str + + +class ConfigExtractor: + + @staticmethod + def azure_from_env_vars(env_vars: dict[str, Any]): + return AzureConfig( + account=env_vars.get(AZURE_ACCOUNT, ""), + container=env_vars.get(AZURE_CONTAINER, ""), + data_path=env_vars.get(AZURE_DATA_PATH, ""), + sas=env_vars.get(AZURE_SAS, "") + ) + + @staticmethod + def local_from_env_vars(env_vars: dict[str, Any]): + return LocalConfig(data_path=env_vars.get(LOCAL_DATA_PATH, "")) + + @dataclasses.dataclass class Source: + container: str relation: str relative_path: str input_format: str @@ -77,7 +116,7 @@ class Source: paths: List[str] = dataclasses.field(default_factory=list) def to_paths_csv(self) -> str: - return "\n".join([f"{self.relation},{p}" for p in self.paths]) + return "\n".join([f"{self.relation},{self.container},{p}" for p in self.paths]) def to_chunk_partitioned_paths_csv(self) -> str: return "\n".join([f"{self.relation},{path},{self.is_chunk_partitioned}" for path in self.paths]) @@ -95,45 +134,28 @@ class RaiConfig: @dataclasses.dataclass class EnvConfig: - # Azure EXPORT blob - azure_export_account: str - azure_export_container: str - azure_export_data_path: str - azure_export_num_files: int - azure_export_sas: str - # Azure IMPORT blob - azure_import_account: str - azure_import_container: str - azure_import_data_path: str - azure_import_sas: str + containers: dict[str, Container] + + EXTRACTORS = { + ContainerType.AZURE: lambda env_vars: ConfigExtractor.azure_from_env_vars(env_vars), + ContainerType.LOCAL: lambda env_vars: ConfigExtractor.local_from_env_vars(env_vars) + } + + def container_name_to_type(self) -> dict[str, ContainerType]: + return {container.name: container.type for container in self.containers.values()} + + def get_container(self, name: str) -> Container: + return self.containers[name] @staticmethod def from_env_vars(env_vars: dict[str, Any]): - azure_export_account = env_vars[AZURE_EXPORT_ACCOUNT] if AZURE_EXPORT_ACCOUNT in env_vars else "" - azure_export_container = env_vars[ - AZURE_EXPORT_CONTAINER] if AZURE_EXPORT_CONTAINER in env_vars else "" - azure_export_data_path = env_vars[ - AZURE_EXPORT_DATA_PATH] if AZURE_EXPORT_DATA_PATH in env_vars else "" - azure_export_num_files = env_vars[ - AZURE_EXPORT_NUM_FILES] if AZURE_EXPORT_NUM_FILES in env_vars else 1 - azure_export_sas = env_vars[AZURE_EXPORT_SAS] if AZURE_EXPORT_SAS in env_vars else "" - azure_import_account = env_vars[AZURE_IMPORT_ACCOUNT] if AZURE_IMPORT_ACCOUNT in env_vars else "" - azure_import_container = env_vars[ - AZURE_IMPORT_CONTAINER] if AZURE_IMPORT_CONTAINER in env_vars else "" - azure_import_data_path = env_vars[ - AZURE_IMPORT_DATA_PATH] if AZURE_IMPORT_DATA_PATH in env_vars else "" - azure_import_sas = env_vars[AZURE_IMPORT_SAS] if AZURE_IMPORT_SAS in env_vars else "" - return EnvConfig( - azure_export_account=azure_export_account, - azure_export_container=azure_export_container, - azure_export_data_path=azure_export_data_path, - 
azure_export_num_files=azure_export_num_files, - azure_export_sas=azure_export_sas, - azure_import_account=azure_import_account, - azure_import_container=azure_import_container, - azure_import_data_path=azure_import_data_path, - azure_import_sas=azure_import_sas - ) + containers = {} + for container in env_vars.get(CONTAINER, []): + name = container[CONTAINER_NAME] + containers[name] = Container(name=container[CONTAINER_NAME], + type=ContainerType[container[CONTAINER_TYPE].upper()], + params=container) + return EnvConfig(containers) @dataclasses.dataclass @@ -143,6 +165,7 @@ class Export: relative_path: str file_type: FileType snapshot_binding: str + container: str offset_by_number_of_days: int = 0 diff --git a/workflow/constants.py b/workflow/constants.py index d084741..47ab6bb 100644 --- a/workflow/constants.py +++ b/workflow/constants.py @@ -35,23 +35,20 @@ "JSON": "load_json" # TODO: maybe load_json_general? } # Env config params -RAI_PROFILE = "RAI_PROFILE" -RAI_PROFILE_PATH = "RAI_PROFILE_PATH" -RAI_SDK_HTTP_RETRIES = "RAI_SDK_HTTP_RETRIES" -AZURE_EXPORT_ACCOUNT = "AZURE_EXPORT_ACCOUNT" -AZURE_EXPORT_CONTAINER = "AZURE_EXPORT_CONTAINER" -AZURE_EXPORT_DATA_PATH = "AZURE_EXPORT_DATA_PATH" -AZURE_EXPORT_NUM_FILES = "AZURE_EXPORT_NUM_FILES" -AZURE_EXPORT_SAS = "AZURE_EXPORT_SAS" -AZURE_IMPORT_ACCOUNT = "AZURE_IMPORT_ACCOUNT" -AZURE_IMPORT_CONTAINER = "AZURE_IMPORT_CONTAINER" -AZURE_IMPORT_DATA_PATH = "AZURE_IMPORT_DATA_PATH" -AZURE_IMPORT_SAS = "AZURE_IMPORT_SAS" +CONTAINER = "container" +CONTAINER_TYPE = "type" +CONTAINER_NAME = "name" +RAI_PROFILE = "rai_profile" +RAI_PROFILE_PATH = "rai_profile_path" +RAI_SDK_HTTP_RETRIES = "rai_sdk_http_retries" +AZURE_ACCOUNT = "account" +AZURE_CONTAINER = "container" +AZURE_DATA_PATH = "data_path" +AZURE_SAS = "sas" +LOCAL_DATA_PATH = "data_path" # Step parameters REL_CONFIG_DIR = "rel_config_dir" -OUTPUT_ROOT = "output_root" -LOCAL_DATA_DIR = "local_data_dir" START_DATE = "start_date" END_DATE = "end_date" FORCE_REIMPORT = "force_reimport" diff --git a/workflow/executor.py b/workflow/executor.py index 6a48ce7..c5d4799 100644 --- a/workflow/executor.py +++ b/workflow/executor.py @@ -10,7 +10,7 @@ from more_itertools import peekable from workflow import query as q, paths, rai, constants -from workflow.common import EnvConfig, RaiConfig, Source, BatchConfig, Export, FileType +from workflow.common import EnvConfig, RaiConfig, Source, BatchConfig, Export, FileType, ContainerType from workflow.manager import ResourceManager from workflow.utils import save_csv_output, format_duration, build_models, extract_date_range, build_relation_path, \ get_common_model_relative_path @@ -24,14 +24,6 @@ EXPORT = 'Export' -class WorkflowRunMode(Enum): - LOCAL = 'local' - REMOTE = 'remote' - - def __str__(self): - return self.value - - class WorkflowStepState(str, Enum): INIT = 'INIT' IN_PROGRESS = 'IN_PROGRESS' @@ -42,7 +34,6 @@ class WorkflowStepState(str, Enum): @dataclasses.dataclass class WorkflowConfig: env: EnvConfig - run_mode: WorkflowRunMode batch_config: BatchConfig recover: bool recover_step: str @@ -61,7 +52,7 @@ def __init__(self, idt, name, state, timing, engine_size): self.idt = idt self.name = name self.state = state - self.time = timing + self.timing = timing self.engine_size = engine_size def execute(self, logger: logging.Logger, env_config: EnvConfig, rai_config: RaiConfig): @@ -127,19 +118,19 @@ class ConfigureSourcesWorkflowStep(WorkflowStep): config_files: List[str] rel_config_dir: str sources: List[Source] - paths_builder: paths.PathsBuilder + 
paths_builders: dict[str, paths.PathsBuilder] start_date: str end_date: str force_reimport: bool force_reimport_not_chunk_partitioned: bool - def __init__(self, idt, name, state, timing, engine_size, config_files, rel_config_dir, sources, paths_builder, + def __init__(self, idt, name, state, timing, engine_size, config_files, rel_config_dir, sources, paths_builders, start_date, end_date, force_reimport, force_reimport_not_chunk_partitioned): super().__init__(idt, name, state, timing, engine_size) self.config_files = config_files self.rel_config_dir = rel_config_dir self.sources = sources - self.paths_builder = paths_builder + self.paths_builders = paths_builders self.start_date = start_date self.end_date = end_date self.force_reimport = force_reimport @@ -162,8 +153,8 @@ def _inflate_sources(self, logger: logging.Logger): for src in self.sources: logger.info(f"Inflating source: '{src.relation}'") days = self._get_date_range(logger, src) - inflated_paths = self.paths_builder.build(logger, days, src.relative_path, src.extensions, - src.is_date_partitioned) + inflated_paths = self.paths_builders[src.container].build(logger, days, src.relative_path, src.extensions, + src.is_date_partitioned) if src.is_date_partitioned: # after inflating we take the last `src.loads_number_of_days` days and reduce into an array of paths inflated_paths.sort(key=lambda v: v.as_of_date) @@ -198,40 +189,39 @@ class ConfigureSourcesWorkflowStepFactory(WorkflowStepFactory): def _validate_params(self, config: WorkflowConfig, step: dict) -> None: super()._validate_params(config, step) end_date = config.step_params[constants.END_DATE] - sources = self._parse_sources(step["sources"]) + sources = self._parse_sources(step) if not end_date: for s in sources: if s.is_date_partitioned: raise ValueError(f"End date is required for date partitioned source: {s.relation}") def _required_params(self, config: WorkflowConfig) -> List[str]: - required_params = [constants.REL_CONFIG_DIR, constants.START_DATE, constants.END_DATE] - if config.run_mode == WorkflowRunMode.LOCAL: - required_params.append(constants.LOCAL_DATA_DIR) - return required_params + return [constants.REL_CONFIG_DIR, constants.START_DATE, constants.END_DATE] def _get_step(self, logger: logging.Logger, config: WorkflowConfig, idt, name, state, timing, engine_size, - step: dict) -> WorkflowStep: - if config.run_mode == WorkflowRunMode.LOCAL: - local_data_dir = config.step_params[constants.LOCAL_DATA_DIR] - paths_builder = paths.LocalPathsBuilder(local_data_dir) - elif config.run_mode == WorkflowRunMode.REMOTE: - paths_builder = paths.RemotePathsBuilder(config.env) - else: - raise Exception("unsupported mode") + step: dict) -> ConfigureSourcesWorkflowStep: rel_config_dir = config.step_params[constants.REL_CONFIG_DIR] - sources = self._parse_sources(step["sources"]) + sources = self._parse_sources(step) start_date = config.step_params[constants.START_DATE] end_date = config.step_params[constants.END_DATE] force_reimport = config.step_params.get(constants.FORCE_REIMPORT, False) force_reimport_not_chunk_partitioned = config.step_params.get(constants.FORCE_REIMPORT_NOT_CHUNK_PARTITIONED, False) + env_config = config.env + paths_builders = {} + for src in sources: + container_name = src.container + if container_name not in paths_builders: + container = env_config.get_container(container_name) + paths_builders[container_name] = paths.PathsBuilderFactory.get_path_builder(container) return ConfigureSourcesWorkflowStep(idt, name, state, timing, engine_size, step["configFiles"], 
rel_config_dir, - sources, paths_builder, start_date, end_date, force_reimport, + sources, paths_builders, start_date, end_date, force_reimport, force_reimport_not_chunk_partitioned) @staticmethod - def _parse_sources(sources: List[Dict]) -> List[Source]: + def _parse_sources(step: dict) -> List[Source]: + sources = step["sources"] + default_container = step["defaultContainer"] result = [] for source in sources: if "future" not in source or not source["future"]: @@ -244,7 +234,9 @@ def _parse_sources(sources: List[Dict]) -> List[Source]: loads_number_of_days = source.get("loadsNumberOfDays") offset_by_number_of_days = source.get("offsetByNumberOfDays") snapshot_validity_days = source.get("snapshotValidityDays") + container = source.get("container", default_container) result.append(Source( + container, relation, relative_path, input_format, @@ -307,7 +299,11 @@ def _load_source(self, logger: logging.Logger, env_config: EnvConfig, rai_config @staticmethod def _load_resource(logger: logging.Logger, env_config: EnvConfig, rai_config: RaiConfig, resources, src) -> None: try: - rai.execute_query(logger, rai_config, q.load_resources(logger, env_config, resources, src), readonly=False) + container = env_config.get_container(src["container"]) + rai.execute_query(logger, rai_config, + q.load_resources(logger, EnvConfig.EXTRACTORS[container.type](container.params), + resources, src), readonly=False) + except KeyError as e: logger.error(f"Unsupported file type: {src['file_type']}. Skip the source: {src}", e) except ValueError as e: @@ -358,6 +354,18 @@ class ExportWorkflowStep(WorkflowStep): date_format: str end_date: str + EXPORT_FUNCTION = { + ContainerType.LOCAL: + lambda logger, rai_config, exports, end_date, date_format, container: save_csv_output( + rai.execute_query_csv(logger, rai_config, q.export_relations_local(logger, exports)), + EnvConfig.EXTRACTORS[container.type](container.params)), + ContainerType.AZURE: + lambda logger, rai_config, exports, end_date, date_format, container: rai.execute_query( + logger, rai_config, + q.export_relations_to_azure(logger, EnvConfig.EXTRACTORS[container.type](container.params), exports, + end_date, date_format)) + } + def __init__(self, idt, name, state, timing, engine_size, exports, export_jointly, date_format, end_date): super().__init__(idt, name, state, timing, engine_size) self.exports = exports @@ -369,11 +377,19 @@ def execute(self, logger: logging.Logger, env_config: EnvConfig, rai_config: Rai logger.info("Executing Export step..") exports = list(filter(lambda e: self._should_export(logger, rai_config, e), self.exports)) + name_to_type = env_config.container_name_to_type() if self.export_jointly: - self._export(logger, env_config, rai_config, exports) + exports.sort(key=lambda e: e.container) + container_groups = {container: list(group) for container, group in + groupby(exports, key=lambda e: e.container)} + for container, grouped_exports in container_groups.items(): + self.EXPORT_FUNCTION[name_to_type[container]](logger, rai_config, grouped_exports, self.end_date, + self.date_format, env_config.get_container(container)) else: for export in exports: - self._export(logger, env_config, rai_config, [export]) + container = export.container + self.EXPORT_FUNCTION[name_to_type[container]](logger, rai_config, [export], self.end_date, + self.date_format, env_config.get_container(container)) def _should_export(self, logger: logging.Logger, rai_config: RaiConfig, export: Export) -> bool: if export.snapshot_binding is None: @@ -392,77 +408,39 @@ def 
_should_export(self, logger: logging.Logger, rai_config: RaiConfig, export: f"Skipping export of {export.relation}: defined as a snapshot and the current one is still valid") return should_export - def _export(self, logger: logging.Logger, env_config: EnvConfig, rai_config: RaiConfig, exports): - raise NotImplementedError("This class is abstract") - class ExportWorkflowStepFactory(WorkflowStepFactory): def _required_params(self, config: WorkflowConfig) -> List[str]: - required_params = [] - if config.run_mode == WorkflowRunMode.LOCAL: - required_params.append(constants.OUTPUT_ROOT) - elif config.run_mode == WorkflowRunMode.REMOTE: - required_params.append(constants.END_DATE) - return required_params + return [constants.END_DATE] def _get_step(self, logger: logging.Logger, config: WorkflowConfig, idt, name, state, timing, engine_size, step: dict) -> WorkflowStep: exports = self._load_exports(logger, step) end_date = config.step_params[constants.END_DATE] - if config.run_mode == WorkflowRunMode.LOCAL: - output_root = config.step_params[constants.OUTPUT_ROOT] - return LocalExportWorkflowStep(idt, name, state, timing, engine_size, exports, step["exportJointly"], - step["dateFormat"], end_date, output_root) - elif config.run_mode == WorkflowRunMode.REMOTE: - return RemoteExportWorkflowStep(idt, name, state, timing, engine_size, exports, step["exportJointly"], - step["dateFormat"], end_date) - else: - raise Exception("Unsupported mode") + return ExportWorkflowStep(idt, name, state, timing, engine_size, exports, step["exportJointly"], + step["dateFormat"], end_date) @staticmethod def _load_exports(logger: logging.Logger, src) -> List[Export]: exports_json = src["exports"] + default_container = src["defaultContainer"] exports = [] for e in exports_json: if "future" not in e or not e["future"]: try: - meta_key = e["metaKey"] if "metaKey" in e else [] - file_type_str = e["type"].upper() - snapshot_binding = e["snapshotBinding"] if "snapshotBinding" in e else None - offset_by_number_of_days = e["offsetByNumberOfDays"] if "offsetByNumberOfDays" in e else 0 - cfg = Export(meta_key, e["configRelName"], e["relativePath"], FileType[file_type_str], - snapshot_binding, offset_by_number_of_days) - exports.append(cfg) + exports.append(Export(meta_key=e.get("metaKey", []), + relation=e["configRelName"], + relative_path=e["relativePath"], + file_type=FileType[e["type"].upper()], + snapshot_binding=e.get("snapshotBinding"), + container=e.get("container", default_container), + offset_by_number_of_days=e.get("offsetByNumberOfDays", 0))) except KeyError as ex: logger.warning(f"Unsupported FileType: {ex}. 
Skipping export: {e}.") return exports -class LocalExportWorkflowStep(ExportWorkflowStep): - output_root: str - - def __init__(self, idt, name, state, timing, engine_size, exports, export_jointly, date_format, end_date, - output_root): - super().__init__(idt, name, state, timing, engine_size, exports, export_jointly, date_format, end_date) - self.output_root = output_root - - def _export(self, logger: logging.Logger, env_config: EnvConfig, rai_config: RaiConfig, exports): - save_csv_output( - rai.execute_query_csv(logger, rai_config, q.export_relations_local(logger, exports)), - self.output_root) - - -class RemoteExportWorkflowStep(ExportWorkflowStep): - def __init__(self, idt, name, state, timing, engine_size, exports, export_jointly, date_format, end_date): - super().__init__(idt, name, state, timing, engine_size, exports, export_jointly, date_format, end_date) - - def _export(self, logger: logging.Logger, env_config: EnvConfig, rai_config: RaiConfig, exports): - rai.execute_query( - logger, rai_config, - q.export_relations_remote(logger, env_config, exports, self.end_date, self.date_format)) - - DEFAULT_FACTORIES = MappingProxyType( { CONFIGURE_SOURCES: ConfigureSourcesWorkflowStepFactory(), diff --git a/workflow/paths.py b/workflow/paths.py index f20de5c..5c33c85 100644 --- a/workflow/paths.py +++ b/workflow/paths.py @@ -6,7 +6,7 @@ from typing import List from workflow import blob, constants -from workflow.common import EnvConfig +from workflow.common import EnvConfig, AzureConfig, LocalConfig, Container, ContainerType @dataclasses.dataclass @@ -32,15 +32,15 @@ def _build(self, logger: logging.Logger, days: List[str], relative_path: str, in class LocalPathsBuilder(PathsBuilder): - local_data_dir: str + config: LocalConfig - def __init__(self, local_data_dir): - self.local_data_dir = local_data_dir + def __init__(self, config): + self.config = config def _build(self, logger: logging.Logger, days: List[str], relative_path, extensions: List[str], is_date_partitioned: bool) -> List[FilePath]: paths = [] - files_path = f"{self.local_data_dir}/{relative_path}" + files_path = f"{self.config.data_path}/{relative_path}" if is_date_partitioned: for day in days: folder_path = f"{files_path}/{constants.DATE_PREFIX}{day}" @@ -60,15 +60,15 @@ def _get_folder_paths(folder_path: str, extensions: List[str]): return paths -class RemotePathsBuilder(PathsBuilder): - env_config: EnvConfig +class AzurePathsBuilder(PathsBuilder): + config: AzureConfig - def __init__(self, env_config: EnvConfig): - self.env_config = env_config + def __init__(self, config: AzureConfig): + self.config = config def _build(self, logger: logging.Logger, days: List[str], relative_path, extensions: List[str], is_date_partitioned: bool) -> List[FilePath]: - import_data_path = self.env_config.azure_import_data_path + import_data_path = self.config.data_path files_path = f"{import_data_path}/{relative_path}" logger.info(f"Loading data from blob import path: '{import_data_path}'") @@ -78,9 +78,22 @@ def _build(self, logger: logging.Logger, days: List[str], relative_path, extensi for day in days: logger.debug(f"Day from range: {day}") day_paths = [FilePath(path=path, as_of_date=day) for path in - blob.list_files_in_containers(logger, self.env_config, + blob.list_files_in_containers(logger, self.config, f"{files_path}/{constants.DATE_PREFIX}{day}")] paths += day_paths else: - paths = [FilePath(path=path) for path in blob.list_files_in_containers(logger, self.env_config, files_path)] + paths = [FilePath(path=path) for path in 
blob.list_files_in_containers(logger, self.config, files_path)] return paths + + +class PathsBuilderFactory: + + __CONTAINER_TYPE_TO_BUILDER = { + ContainerType.LOCAL: lambda container: LocalPathsBuilder(EnvConfig.EXTRACTORS[container.type](container.params)), + ContainerType.AZURE: lambda container: AzurePathsBuilder(EnvConfig.EXTRACTORS[container.type](container.params)) + } + + @staticmethod + def get_path_builder(container: Container) -> PathsBuilder: + return PathsBuilderFactory.__CONTAINER_TYPE_TO_BUILDER[container.type](container) + diff --git a/workflow/query.py b/workflow/query.py index 9466ae9..93e0e0b 100644 --- a/workflow/query.py +++ b/workflow/query.py @@ -5,7 +5,7 @@ from typing import List, Iterable from workflow import utils -from workflow.common import EnvConfig, FileType, Export, Source, SourceType +from workflow.common import FileType, Export, Source, ContainerType, AzureConfig from workflow.constants import IMPORT_CONFIG_REL, FILE_LOAD_RELATION # Static queries @@ -60,18 +60,26 @@ def populate_source_configs(sources: List[Source]) -> str: date_partitioned_sources = list(filter(lambda source: source.is_date_partitioned, sources)) return f""" - def delete:source_declares_resource = declared_sources_to_delete + def delete:source_declares_resource(r, c, p) {{ + declared_sources_to_delete(r, p) and + source_declares_resource(r, c, p) + }} def resource_config[:data] = \"\"\"{source_config_csv}\"\"\" def resource_config[:syntax, :header_row] = -1 - def resource_config[:syntax, :header] = (1, :Relation); (2, :Path) + def resource_config[:syntax, :header] = (1, :Relation); (2, :Container); (3, :Path) def resource_config[:schema, :Relation] = "string" + def resource_config[:schema, :Container] = "string" def resource_config[:schema, :Path] = "string" def source_config_csv = load_csv[resource_config] - def insert:source_declares_resource(r, p) = - exists(i : source_config_csv(:Relation, i, r) and source_config_csv(:Path, i, p)) + def insert:source_declares_resource(r, c, p) = + exists(i : + source_config_csv(:Relation, i, r) and + source_config_csv(:Container, i, c) and + source_config_csv(:Path, i, p) + ) def input_format_config[:data] = \"\"\"{data_formats_csv}\"\"\" @@ -110,30 +118,30 @@ def insert:resources_data_to_delete = resources_to_delete """ -def load_resources(logger: logging.Logger, env_config: EnvConfig, resources, src) -> str: +def load_resources(logger: logging.Logger, config: AzureConfig, resources, src) -> str: rel_name = src["source"] file_stype_str = src["file_type"] file_type = FileType[file_stype_str] - src_type = SourceType.from_source(src) + src_type = ContainerType.from_source(src) if 'is_multi_part' in src and src['is_multi_part'] == 'Y': if file_type == FileType.CSV or file_type == FileType.JSONL: - if src_type == SourceType.LOCAL: + if src_type == ContainerType.LOCAL: logger.info(f"Loading {len(resources)} shards from local files") return _local_load_multipart_query(rel_name, file_type, resources) - elif src_type == SourceType.REMOTE: - logger.info(f"Loading {len(resources)} shards from remote files") - return _remote_load_multipart_query(rel_name, file_type, resources, env_config) + elif src_type == ContainerType.AZURE: + logger.info(f"Loading {len(resources)} shards from Azure files") + return _azure_load_multipart_query(rel_name, file_type, resources, config) else: logger.error(f"Unknown file type {file_stype_str}") else: - if src_type == SourceType.LOCAL: + if src_type == ContainerType.LOCAL: logger.info("Loading from local file.") return 
_local_load_simple_query(rel_name, resources[0]["uri"], file_type) - elif src_type == SourceType.REMOTE: - logger.info("Loading from remote file.") - return _remote_load_simple_query(rel_name, resources[0]["uri"], file_type, env_config) + elif src_type == ContainerType.AZURE: + logger.info("Loading from Azure file.") + return _azure_load_simple_query(rel_name, resources[0]["uri"], file_type, config) def get_snapshot_expiration_date(snapshot_binding: str, date_format: str) -> str: @@ -163,18 +171,18 @@ def export_relations_local(logger: logging.Logger, exports: List[Export]) -> str return query -def export_relations_remote(logger: logging.Logger, env_config: EnvConfig, exports: List[Export], end_date: str, - date_format: str) -> str: +def export_relations_to_azure(logger: logging.Logger, config: AzureConfig, exports: List[Export], end_date: str, + date_format: str) -> str: query = f""" def _credentials_config:integration:provider = "azure" - def _credentials_config:integration:credentials:azure_sas_token = raw"{env_config.azure_export_sas}" + def _credentials_config:integration:credentials:azure_sas_token = raw"{config.sas}" """ for export in exports: if export.file_type == FileType.CSV: if export.meta_key: - query += _export_meta_relation_as_csv_remote(env_config, export, end_date, date_format) + query += _export_meta_relation_as_csv_to_azure(config, export, end_date, date_format) else: - query += _export_relation_as_csv_remote(env_config, export, end_date, date_format) + query += _export_relation_as_csv_to_azure(config, export, end_date, date_format) else: logger.warning(f"Unsupported export type: {export.file_type}") return query @@ -242,9 +250,9 @@ def _local_load_simple_query(rel_name: str, uri: str, file_type: FileType) -> st raise e -def _remote_load_simple_query(rel_name: str, uri: str, file_type: FileType, env_config: EnvConfig) -> str: +def _azure_load_simple_query(rel_name: str, uri: str, file_type: FileType, config: AzureConfig) -> str: return f"def {IMPORT_CONFIG_REL}:{rel_name}:integration:provider = \"azure\"\n" \ - f"def {IMPORT_CONFIG_REL}:{rel_name}:integration:credentials:azure_sas_token = raw\"{env_config.azure_import_sas}\"\n" \ + f"def {IMPORT_CONFIG_REL}:{rel_name}:integration:credentials:azure_sas_token = raw\"{config.sas}\"\n" \ f"def {IMPORT_CONFIG_REL}:{rel_name}:path = \"{uri}\"\n" \ f"{_simple_insert_query(rel_name, file_type)}" @@ -273,7 +281,7 @@ def _local_load_multipart_query(rel_name: str, file_type: FileType, parts) -> st f"{insert_text}" -def _remote_load_multipart_query(rel_name: str, file_type: FileType, parts, env_config: EnvConfig) -> str: +def _azure_load_multipart_query(rel_name: str, file_type: FileType, parts, config: AzureConfig) -> str: path_rel_name = f"{rel_name}_path" part_indexes = "" @@ -286,8 +294,7 @@ def _remote_load_multipart_query(rel_name: str, file_type: FileType, parts, env_ insert_text = _multi_part_insert_query(rel_name, file_type) load_config = _multi_part_load_config_query(rel_name, file_type, - _remote_multipart_config_integration(path_rel_name, - env_config)) + _azure_multipart_config_integration(path_rel_name, config)) return f"{_part_index_relation(part_indexes)}\n" \ f"{_path_rel_name_relation(path_rel_name, part_uri_map)}\n" \ @@ -313,9 +320,9 @@ def _local_multipart_config_integration(raw_data_rel_name: str) -> str: return f"def data = {raw_data_rel_name}[i]" -def _remote_multipart_config_integration(path_rel_name: str, env_config: EnvConfig) -> str: +def _azure_multipart_config_integration(path_rel_name: str, config: 
AzureConfig) -> str: return f"def integration:provider = \"azure\"\n" \ - f"def integration:credentials:azure_sas_token = raw\"{env_config.azure_import_sas}\"\n" \ + f"def integration:credentials:azure_sas_token = raw\"{config.sas}\"\n" \ f"def path = {path_rel_name}[i]\n" @@ -382,9 +389,9 @@ def output:{rel_name}[{key_str}] = csv_string[_export_csv_config:{rel_name}[{key """ -def _export_relation_as_csv_remote(env_config: EnvConfig, export: Export, end_date: str, date_format: str) -> str: +def _export_relation_as_csv_to_azure(config: AzureConfig, export: Export, end_date: str, date_format: str) -> str: rel_name = export.relation - export_path = f"{_compose_export_path(env_config, export, end_date, date_format)}/{rel_name}.csv" + export_path = f"{_compose_export_path(config, export, end_date, date_format)}/{rel_name}.csv" return f""" module _export_csv_config def {rel_name} = export_config:{rel_name} @@ -395,10 +402,10 @@ def export:{rel_name} = export_csv[_export_csv_config:{rel_name}] """ -def _export_meta_relation_as_csv_remote(env_config: EnvConfig, export: Export, end_date: str, date_format: str) -> str: +def _export_meta_relation_as_csv_to_azure(config: AzureConfig, export: Export, end_date: str, date_format: str) -> str: rel_name = export.relation postfix = _to_rel_meta_key_as_str(export) - base_path = _compose_export_path(env_config, export, end_date, date_format) + base_path = _compose_export_path(config, export, end_date, date_format) export_path = f"{base_path}/{rel_name}_{postfix}.csv" key_str = _to_rel_meta_key_as_seq(export) return f""" @@ -420,16 +427,14 @@ def export:{rel_name}[{key_str}] = export_csv[_export_csv_config:{rel_name}:conf """ -def _compose_export_path(env_config: EnvConfig, export: Export, end_date: str, date_format: str) -> str: - account_url = f"azure://{env_config.azure_export_account}.blob.core.windows.net" - container_path = env_config.azure_export_container - folder_path = env_config.azure_export_data_path +def _compose_export_path(config: AzureConfig, export: Export, end_date: str, date_format: str) -> str: + account_url = f"azure://{config.account}.blob.core.windows.net" date_path = utils.get_date_path(end_date, date_format, export.offset_by_number_of_days) if export.meta_key: postfix = _to_rel_meta_key_as_str(export) - return f"{account_url}/{container_path}/{folder_path}/{export.relative_path}_{postfix}/{date_path}" + return f"{account_url}/{config.container}/{config.data_path}/{export.relative_path}_{postfix}/{date_path}" else: - return f"{account_url}/{container_path}/{folder_path}/{export.relative_path}/{date_path}" + return f"{account_url}/{config.container}/{config.data_path}/{export.relative_path}/{date_path}" def _to_rel_literal_relation(xs: Iterable[str]) -> str: diff --git a/workflow/rai.py b/workflow/rai.py index e74578d..f333936 100644 --- a/workflow/rai.py +++ b/workflow/rai.py @@ -19,9 +19,9 @@ def get_config(engine: str, database: str, env_vars: dict[str, Any]) -> RaiConfi :param env_vars: Env vars dictionary :return: Env config """ - rai_profile = env_vars[RAI_PROFILE] if RAI_PROFILE in env_vars else "default" - rai_profile_path = env_vars[RAI_PROFILE_PATH] if RAI_PROFILE_PATH in env_vars else "~/.rai/config" - retries = env_vars[RAI_SDK_HTTP_RETRIES] if RAI_SDK_HTTP_RETRIES in env_vars else 3 + rai_profile = env_vars.get(RAI_PROFILE, "default") + rai_profile_path = env_vars.get(RAI_PROFILE_PATH, "~/.rai/config") + retries = env_vars.get(RAI_SDK_HTTP_RETRIES, 3) ctx = api.Context(**config.read(fname=rai_profile_path, 
profile=rai_profile), retries=retries) return RaiConfig(ctx=ctx, engine=engine, database=database) diff --git a/workflow/utils.py b/workflow/utils.py index e87f044..4eb05cb 100644 --- a/workflow/utils.py +++ b/workflow/utils.py @@ -5,6 +5,7 @@ from typing import List, Dict from workflow import constants +from workflow.common import LocalConfig def range_days(start: datetime, end: datetime) -> List[datetime]: @@ -30,17 +31,17 @@ def sansext(fname: str) -> str: return os.path.splitext(os.path.basename(fname))[0] -def save_csv_output(outputs: Dict, output_root: str) -> None: +def save_csv_output(outputs: Dict, config: LocalConfig) -> None: """ Save the content of dictionary as CSV files :param outputs: dictionary with outputs - :param output_root: output root folder + :param config: local config :return: """ for output in outputs.keys(): # for the time being, this handles the specialized relations of meta-exports normalized_file_name = output.replace("/:", "_") - with open(f"{output_root}/{normalized_file_name}.csv", "w") as file: + with open(f"{config.data_path}/{normalized_file_name}.csv", "w") as file: file.write(outputs[output])
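
With this change every source and export resolves its storage through a named container from `loader.toml`, and `PathsBuilderFactory` picks the matching `PathsBuilder` from the container type. Below is a minimal sketch of that dispatch, not part of the patch; it assumes `Container` can be built with `name`, `type`, and `params` (its real definition sits in `workflow/common.py`, outside this hunk) and that local container params carry a `data_path` key as in the `loader.toml` example.

```python
# Illustrative sketch only. Assumes Container exposes name/type/params the way
# PathsBuilderFactory reads them in the hunk above; the constructor call is a guess.
from workflow.common import Container, ContainerType
from workflow.paths import LocalPathsBuilder, PathsBuilderFactory

input_container = Container(name="input", type=ContainerType.LOCAL,
                            params={"data_path": "./data"})

# Local containers map to LocalPathsBuilder, Azure containers to AzurePathsBuilder.
builder = PathsBuilderFactory.get_path_builder(input_container)
assert isinstance(builder, LocalPathsBuilder)
```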
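`populate_source_configs` now emits a `Container` column alongside `Relation` and `Path`, so `source_declares_resource` becomes a three-place relation, and the delete rule joins `declared_sources_to_delete` against the existing facts so only the matching rows are cleaned up during a data refresh. A rough sketch of the CSV shape the query embeds follows; the relation names and paths are hypothetical, and the comma separator assumes the `load_csv` default.

```python
# Illustrative sketch of the resource_config CSV consumed by load_csv above.
# The real rows are rendered from the Source objects passed to
# populate_source_configs; these values are made up.
rows = [
    # (Relation, Container, Path)
    ("city_data", "input", "source/city"),
    ("device_seen_snapshot", "input", "snapshot/device_seen"),
]
source_config_csv = "\n".join(",".join(row) for row in rows)
print(source_config_csv)
# city_data,input,source/city
# device_seen_snapshot,input,snapshot/device_seen
```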
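On the export side, local CSV output is now driven by the export container's `LocalConfig` rather than an `output_root` passed through the workflow step. A small usage sketch, assuming `LocalConfig` can be constructed directly with a `data_path` (its actual fields live in `workflow/common.py` and may differ):

```python
# Illustrative sketch, not part of the patch; LocalConfig's real constructor
# may take more fields than shown here.
import os

from workflow.common import LocalConfig
from workflow.utils import save_csv_output

os.makedirs("./output", exist_ok=True)
export_config = LocalConfig(data_path="./output")

# Keys map to file names: "/:" is replaced with "_" and ".csv" is appended,
# so "city/:meta" would land in ./output/city_meta.csv.
save_csv_output({"city_data": "city,state\nAustin,TX\n"}, export_config)
```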