Allow multiple locations for source import/export (#23)
* Allow multiple locations for source import/export

* Fix source_declares_resource clean up during data refresh

* remove redundant property from AzureConfig

* update readme

* simplify blob.py

* rename methods in query.py
AndrewNepogoda authored Sep 22, 2023
1 parent 54cca40 commit f3eb75e
Showing 25 changed files with 463 additions and 280 deletions.
3 changes: 0 additions & 3 deletions .github/workflows/run-tests.yml
@@ -48,9 +48,6 @@ jobs:
echo "port = 443" >> config
echo "client_id = ${{ env.RAI_CLIENT_ID }}" >> config
echo "client_secret = ${{ env.RAI_CLIENT_SECRET }}" >> config
# Create empty toml
cd $GITHUB_WORKSPACE/cli-e2e-test/config
touch loader.toml
env:
RAI_CLIENT_ID: ${{ secrets.client_id }}
RAI_CLIENT_SECRET: ${{ secrets.client_secret }}
12 changes: 12 additions & 0 deletions cli-e2e-test/config/loader.toml
@@ -0,0 +1,12 @@
rai_sdk_http_retries=0

[[container]]
name="input"
type="local"
data_path="./data"

[[container]]
name="export"
type="local"
data_path="./output"

2 changes: 2 additions & 0 deletions cli-e2e-test/config/model/scenario1.json
@@ -6,6 +6,7 @@
"configFiles": [
"config/config1.rel"
],
"defaultContainer": "input",
"sources": [
{
"relation": "zip_city_state_master_data",
@@ -74,6 +75,7 @@
"name": "Export",
"exportJointly": false,
"dateFormat": "%Y%m%d",
"defaultContainer": "export",
"exports": [
{
"type": "csv",
1 change: 1 addition & 0 deletions cli-e2e-test/config/model/scenario2.json
@@ -6,6 +6,7 @@
"configFiles": [
"config/config2.rel"
],
"defaultContainer": "input",
"sources": [
{
"relation": "zip_city_state_master_data",
1 change: 1 addition & 0 deletions cli-e2e-test/config/model/scenario3.json
@@ -6,6 +6,7 @@
"configFiles": [
"config/config2.rel"
],
"defaultContainer": "input",
"sources": [
{
"relation": "zip_city_state_master_data",
2 changes: 2 additions & 0 deletions cli-e2e-test/config/model/scenario4.json
@@ -6,6 +6,7 @@
"configFiles": [
"config/config3.rel"
],
"defaultContainer": "input",
"sources": [
{
"relation": "city_data",
@@ -41,6 +42,7 @@
"name": "Export",
"exportJointly": false,
"dateFormat": "%Y%m%d",
"defaultContainer": "export",
"exports": [
{
"type": "csv",
5 changes: 1 addition & 4 deletions cli-e2e-test/test_e2e.py
@@ -21,13 +21,10 @@ class CliE2ETest(unittest.TestCase):
expected = "./expected_results"
resource_name = "wm-cli-e2e-test-" + str(uuid.uuid4())
cmd_with_common_arguments = ["python", "main.py",
"--run-mode", "local",
"--env-config", env_config,
"--engine", resource_name,
"--database", resource_name,
"--rel-config-dir", "./rel",
"--dev-data-dir", dev_data_dir,
"--output-root", output]
"--rel-config-dir", "./rel"]

def test_scenario1_model(self):
# when
58 changes: 48 additions & 10 deletions cli/README.md
@@ -6,7 +6,7 @@ This Command-Line Interface (CLI) is designed to provide an easy and interactive
1. Create a batch configuration (ex. `poc.json`) file using the syntax and structure outlined in the [RAI Workflow Framework README](../workflow/README.md).
2. Add `rai-workflow-manager` as dependency to your `requirements.txt` file:
```txt
rai-workflow-manager==0.0.9
rai-workflow-manager==0.0.13
```
3. Build the project:
```bash
@@ -25,20 +25,61 @@ import cli.runner
if __name__ == "__main__":
cli.runner.start()
```
5. Run the following command to execute the batch configuration:
5. Create `loader.toml` file with the following content:
```toml
[[container]]
name="input"
type="local"
data_path="./data"

[[container]]
name="export"
type="local"
data_path="./output"
```
6. Run the following command to execute the batch configuration:
```bash
python main.py \
--run-mode local \
--dev-data-dir <path>/data \
--rel-config-dir <path>/rel \
--batch-config poc.json \
--env-config loader.toml \
--engine <engine> \
--database <database> \
--output-root ./output
--database <database>
```
where `<engine>`, `<database>` are the names of some RAI resources to use, `<path>` is the path to the directory containing directory with data and rel sources.
where `<engine>`, `<database>` are the names of some RAI resources to use
## `loader.toml` Configuration
The `loader.toml` file is used to specify static properties for the RAI Workflow Framework. It contains the following properties:

| Description | Property |
|:---------------------------------------------------------------------------------|------------------------|
| RAI profile. | `rai_profile` |
| Path to RAI config. | `rai_profile_path` |
| HTTP retries for RAI sdk in case of errors. (Can be overridden by CLI argument) | `rai_sdk_http_retries` |
| A list of containers to use for loading and exporting data. | `container` |
| The name of the container. | `container.name` |
| The type of the container. Supported types: `local`, `azure` | `container.type` |
| The path in the container. | `container.data_path` |
| Remote container account | `container.account` |
| Remote container SAS token. | `container.sas` |
| Container for remote container SAS token. (Ex. Azure Blob container) | `container.container` |

### Azure container example
```toml
[[container]]
name="input"
type="azure"
data_path="input"
account="account_name"
sas="sas_token"
container="container_name"
```
### Local container example
```toml
[[container]]
name="input"
type="local"
data_path="./data"
```
## CLI Arguments
| Description | CLI argument | Is required | Default value | Parameter Type | Recognized Values |
|:----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|------------------------------------------|-------------|-------------------------|--------------------------|-----------------------------------------------------|
@@ -48,14 +89,11 @@ where `<engine>`, `<database>` are the names of some RAI resources to use, `<pat
| RAI Cloud source database name for clone | `--source-database` | `False` | | `String` | |
| RAI Cloud engine name | `--engine` | `True` | | `String` | |
| The size of RAI engine | `--engine-size` | `False` | `XS` | `String` | `['XS', 'S', 'M', 'L', 'XL']` |
| Run mode | `--run-mode` | `True` | | `String` | `['local', 'remote']` |
| Model earliest date to consider | `--start-date` | `False` | | `String` | format `YYYYmmdd` |
| Model latest date to consider | `--end-date` | `False` | | `String` | format `YYYYmmdd` |
| Directory containing dev data | `--dev-data-dir` | `False` | `../data` | `String` | |
| Directory containing rel config files to install | `--rel-config-dir` | `False` | `../rel` | `String` | |
| Path to `loader.toml` | `--env-config` | `False` | `../config/loader.toml` | `String` | |
| When loading each multi-part source, <br/>load all partitions (and shards) in one transaction | `--collapse-partitions-on-load` | `False` | `True` | `Bool` | |
| Output folder path for local run mode | `--output-root` | `False` | `../../output` | `String` | |
| Logging level for cli | `--log-level` | `False` | `INFO` | `String` | `['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']` |
| Drop database before workflow run, or not | `--drop-db` | `False` | `False` | `BooleanOptionalAction` | `True` in case argument presents |
| Remove RAI engine and database after run or not | `--cleanup-resources` | `False` | `False` | `Bool` | |
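
The `--drop-db` row above is marked `BooleanOptionalAction`, the argparse action imported in `cli/args.py`. A minimal standalone sketch (not the CLI's actual parser definition) of how such a flag behaves — absent means the default `False`, and argparse also generates a `--no-drop-db` negation automatically:

```python
# Illustrative only: a stripped-down parser showing BooleanOptionalAction
# semantics for a flag like --drop-db (Python 3.9+).
from argparse import ArgumentParser, BooleanOptionalAction

parser = ArgumentParser()
parser.add_argument("--drop-db", action=BooleanOptionalAction, default=False)

print(parser.parse_args([]).drop_db)                # False (default)
print(parser.parse_args(["--drop-db"]).drop_db)     # True
print(parser.parse_args(["--no-drop-db"]).drop_db)  # False (explicit negation)
```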
22 changes: 0 additions & 22 deletions cli/args.py
@@ -1,7 +1,5 @@
from argparse import ArgumentParser, Namespace, BooleanOptionalAction

import workflow.executor


def parse() -> Namespace:
parser = ArgumentParser()
@@ -36,13 +34,6 @@ def parse() -> Namespace:
required=True,
type=str
)
parser.add_argument(
"--run-mode",
help="Type of run mode",
required=True,
type=workflow.executor.WorkflowRunMode,
choices=list(workflow.executor.WorkflowRunMode),
)
parser.add_argument(
"--start-date", help="Start date for model data. Format: 'YYYYmmdd'",
required=False,
@@ -69,12 +60,6 @@ def parse() -> Namespace:
action="store_true",
default=False
)
parser.add_argument(
"--dev-data-dir", help="Directory containing dev data",
required=False,
default="../data",
type=str
)
parser.add_argument(
"--rel-config-dir", help="Directory containing rel config files to install",
required=False,
@@ -95,13 +80,6 @@ def parse() -> Namespace:
default=True,
type=bool
)
parser.add_argument(
"--output-root",
help="Output folder path for dev mode",
required=False,
default="../../output",
type=str
)
parser.add_argument(
"--log-level",
help="Set log level",
7 changes: 2 additions & 5 deletions cli/runner.py
@@ -43,17 +43,14 @@ def start():
# Init workflow executor
parameters = {
workflow.constants.REL_CONFIG_DIR: args.rel_config_dir,
workflow.constants.OUTPUT_ROOT: args.output_root,
workflow.constants.LOCAL_DATA_DIR: args.dev_data_dir,
workflow.constants.START_DATE: args.start_date,
workflow.constants.END_DATE: args.end_date,
workflow.constants.FORCE_REIMPORT: args.force_reimport,
workflow.constants.FORCE_REIMPORT_NOT_CHUNK_PARTITIONED: args.force_reimport_not_chunk_partitioned,
workflow.constants.COLLAPSE_PARTITIONS_ON_LOAD: args.collapse_partitions_on_load
}
config = workflow.executor.WorkflowConfig(env_config, args.run_mode,
workflow.common.BatchConfig(args.batch_config_name,
batch_config_json),
config = workflow.executor.WorkflowConfig(env_config, workflow.common.BatchConfig(args.batch_config_name,
batch_config_json),
args.recover, args.recover_step, args.selected_steps, parameters)
executor = workflow.executor.WorkflowExecutor.init(logger, config, resource_manager)
end_time = time.time()
19 changes: 16 additions & 3 deletions rel/source_configs/config.rel
@@ -1,7 +1,7 @@
bound simple_relation = String
bound multi_part_relation = String
bound date_partitioned_source = String
bound source_declares_resource = String, String
bound source_declares_resource = String, String, String
bound source_has_input_format = String, String
bound source_catalog
bound simple_source_catalog
@@ -97,7 +97,7 @@ module uri
from v
}

def value(v) { source_declares_resource(_, v) }
def value(v) { source_declares_resource(_, _, v) }

def parse(u, i, d) {
value(v) and
@@ -197,7 +197,7 @@ module source
def populates = transpose[relation:identifies]

def declares(s, r) {
source_declares_resource(rel, res) and
source_declares_resource(rel, _, res) and
r = uri:identifies[ ^URI[res] ] and
s = relation:identifies[ rel_name:identifies[ ^RelName[rel] ] ]
from rel, res
@@ -214,6 +214,12 @@ module source
forall(r: declares(s, r) implies not resource:local(r) )
}

def container(s, c) {
source_declares_resource(rel, c, _) and
s = relation:identifies[ rel_name:identifies[ ^RelName[rel] ] ]
from rel
}

def date_partitioned(s) { s = relation:identifies[ rel_name:identifies[ ^RelName[date_partitioned_source] ] ] }

def format(s, f) {
@@ -352,6 +358,13 @@ def missing_resources_json(:[], n, :is_local, "Y") {
from s
}

def missing_resources_json(:[], n, :container, v) {
source:needs_resource(s) and
source:index[s] = n and
source:container(s, v)
from s
}

def missing_resources_json(:[], n, :is_date_partitioned, v) {
source:needs_resource(s) and
source:index[s] = n and
2 changes: 1 addition & 1 deletion rel/source_configs/data_reload.rel
@@ -29,7 +29,7 @@ def simple_sources(rel, path) {
* All simple sources are affected if they match with declared sources.
*/
def potentially_affected_sources(rel, path) {
source_declares_resource(rel, path) and
source_declares_resource(rel, _, path) and
simple_sources(rel, path)
}
/*