Snowflake as datasource (#24)
* Snowflake as input container

* Handle missing container

* bump version

* update readme
AndrewNepogoda authored Sep 28, 2023
1 parent f3eb75e commit 535bb1f
Showing 20 changed files with 370 additions and 137 deletions.
12 changes: 6 additions & 6 deletions cli-e2e-test/test_e2e.py
@@ -4,9 +4,9 @@
import shutil
import uuid

import test_query as q
import workflow.manager
import workflow.rai
from workflow.constants import RESOURCES_TO_DELETE_REL
from csv_diff import load_csv, compare, human_text
from subprocess import call

@@ -49,7 +49,7 @@ def test_scenario2_model_no_data_changes(self):
# then
self.assertNotEqual(rsp, 1)
rai_config = self.resource_manager.get_rai_config()
rsp_json = workflow.rai.execute_relation_json(self.logger, rai_config, q.RESOURCES_TO_DELETE)
rsp_json = workflow.rai.execute_relation_json(self.logger, rai_config, RESOURCES_TO_DELETE_REL)
self.assertEqual(rsp_json, {})

def test_scenario2_model_force_reimport(self):
@@ -65,7 +65,7 @@ def test_scenario2_model_force_reimport(self):
# then
self.assertNotEqual(rsp, 1)
rai_config = self.resource_manager.get_rai_config()
rsp_json = workflow.rai.execute_relation_json(self.logger, rai_config, q.RESOURCES_TO_DELETE)
rsp_json = workflow.rai.execute_relation_json(self.logger, rai_config, RESOURCES_TO_DELETE_REL)
self.assertEqual(rsp_json, [{'partition': 2023090800001, 'relation': 'city_data'},
{'partition': 2023090800002, 'relation': 'city_data'},
{'partition': 2023090900001, 'relation': 'city_data'},
@@ -84,7 +84,7 @@ def test_scenario2_model_force_reimport_chunk_partitioned(self):
# then
self.assertNotEqual(rsp, 1)
rai_config = self.resource_manager.get_rai_config()
rsp_json = workflow.rai.execute_relation_json(self.logger, rai_config, q.RESOURCES_TO_DELETE)
rsp_json = workflow.rai.execute_relation_json(self.logger, rai_config, RESOURCES_TO_DELETE_REL)
self.assertEqual(rsp_json, [{'relation': 'zip_city_state_master_data'}])

def test_scenario3_model_single_partition_change(self):
@@ -106,7 +106,7 @@ def test_scenario3_model_single_partition_change(self):
# then
self.assertNotEqual(rsp, 1)
rai_config = self.resource_manager.get_rai_config()
rsp_json = workflow.rai.execute_relation_json(self.logger, rai_config, q.RESOURCES_TO_DELETE)
rsp_json = workflow.rai.execute_relation_json(self.logger, rai_config, RESOURCES_TO_DELETE_REL)
self.assertEqual(rsp_json, [{'partition': 2023090800001, 'relation': 'city_data'}])

def test_scenario3_model_two_partitions_overriden_by_one(self):
@@ -129,7 +129,7 @@ def test_scenario3_model_two_partitions_overriden_by_one(self):
# then
self.assertNotEqual(rsp, 1)
rai_config = self.resource_manager.get_rai_config()
rsp_json = workflow.rai.execute_relation_json(self.logger, rai_config, q.RESOURCES_TO_DELETE)
rsp_json = workflow.rai.execute_relation_json(self.logger, rai_config, RESOURCES_TO_DELETE_REL)
self.assertEqual(rsp_json, [{'partition': 2023090800001, 'relation': 'city_data'},
{'partition': 2023090800002, 'relation': 'city_data'}])

1 change: 0 additions & 1 deletion cli-e2e-test/test_query.py

This file was deleted.

44 changes: 31 additions & 13 deletions cli/README.md
@@ -6,7 +6,7 @@ This Command-Line Interface (CLI) is designed to provide an easy and interactive
1. Create a batch configuration (ex. `poc.json`) file using the syntax and structure outlined in the [RAI Workflow Framework README](../workflow/README.md).
2. Add `rai-workflow-manager` as a dependency to your `requirements.txt` file:
```txt
rai-workflow-manager==0.0.13
rai-workflow-manager==0.0.14
```
3. Build the project:
```bash
@@ -50,18 +50,23 @@ where `<engine>`, `<database>` are the names of some RAI resources to use
## `loader.toml` Configuration
The `loader.toml` file is used to specify static properties for the RAI Workflow Framework. It contains the following properties:

| Description | Property |
|:---------------------------------------------------------------------------------|------------------------|
| RAI profile. | `rai_profile` |
| Path to RAI config. | `rai_profile_path` |
| HTTP retries for RAI sdk in case of errors. (Can be overridden by CLI argument) | `rai_sdk_http_retries` |
| A list of containers to use for loading and exporting data. | `container` |
| The name of the container. | `container.name` |
| The type of the container. Supported types: `local`, `azure` | `container.type` |
| The path in the container. | `container.data_path` |
| Remote container account | `container.account` |
| Remote container SAS token. | `container.sas` |
| Container for remote container SAS token. (Ex. Azure Blob container) | `container.container` |
| Description | Property |
|:--------------------------------------------------------------------------------------------|------------------------|
| RAI profile. | `rai_profile` |
| Path to RAI config. | `rai_profile_path` |
| HTTP retries for RAI sdk in case of errors. (Can be overridden by CLI argument) | `rai_sdk_http_retries` |
| A list of containers to use for loading and exporting data. | `container` |
| The name of the container. | `container.name` |
| The type of the container. Supported types: `local`, `azure`, `snowflake` (data import only) | `container.type`       |
| The path in the container. | `container.data_path` |
| Remote container account | `container.account` |
| Remote container SAS token. | `container.sas` |
| User for remote container. | `container.user` |
| Password for remote container. | `container.password` |
| User role for remote container (e.g. Snowflake user role). | `container.role` |
| Database for remote container. | `container.database` |
| Schema for remote container. | `container.schema` |
| Warehouse for Snowflake container. | `container.warehouse` |
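
As a rough illustration of how these properties fit together, the sketch below reads the `[[container]]` entries from a `loader.toml` and collects everything besides `name` and `type` into per-container connection parameters. This is an assumption-laden stand-in, not the framework's actual loader: `tomllib` (Python 3.11+ stdlib) and the local `ContainerType` enum are illustrative only.

```python
# Illustration only: not rai-workflow-manager's real loader code.
# The ContainerType enum below is a hypothetical stand-in for the framework's own type.
import tomllib
from enum import Enum


class ContainerType(str, Enum):
    LOCAL = "local"
    AZURE = "azure"
    SNOWFLAKE = "snowflake"


def load_containers(path: str) -> dict[str, tuple[ContainerType, dict]]:
    """Map container name -> (type, connection parameters) from loader.toml."""
    with open(path, "rb") as f:
        cfg = tomllib.load(f)
    containers = {}
    for entry in cfg.get("container", []):
        # Everything besides name/type (account, user, password, role,
        # warehouse, database, schema, data_path, ...) is treated as a
        # connection parameter for that container.
        params = {k: v for k, v in entry.items() if k not in ("name", "type")}
        containers[entry["name"]] = (ContainerType(entry["type"]), params)
    return containers
```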

### Azure container example
```toml
@@ -73,6 +78,19 @@
account="account_name"
sas="sas_token"
container="container_name"
```
### Snowflake container example
```toml
[[container]]
name="input"
type="snowflake"
account="account"
user="use"
password="password"
role="snowflake role"
warehouse="warehouse"
database="database"
schema="schema"
```
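The field names in this example line up with the keyword arguments of `snowflake.connector.connect` from `snowflake-connector-python`, which this commit adds to `requirements.txt`. The snippet below is only a minimal, hypothetical smoke test of such credentials, not part of the workflow manager itself:

```python
# Minimal sketch: check that the Snowflake container settings above can open
# a connection. Placeholder values mirror the loader.toml example.
import snowflake.connector

conn = snowflake.connector.connect(
    account="account",      # container.account
    user="user",            # container.user
    password="password",    # container.password
    role="snowflake role",  # container.role
    warehouse="warehouse",  # container.warehouse
    database="database",    # container.database
    schema="schema",        # container.schema
)
cur = conn.cursor()
try:
    cur.execute("SELECT CURRENT_VERSION()")
    print(cur.fetchone())
finally:
    cur.close()
    conn.close()
```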
### Local container example
```toml
[[container]]
2 changes: 2 additions & 0 deletions cli/logger.py
@@ -5,6 +5,8 @@ def configure(level=logging.INFO) -> logging.Logger:
# override default logging level for azure
logger = logging.getLogger('azure.core')
logger.setLevel(logging.ERROR)
logger = logging.getLogger('snowflake')
logger.setLevel(logging.ERROR)

logger = logging.getLogger()
logger.setLevel(level)
12 changes: 10 additions & 2 deletions rel/batch_config/workflow/steps/configure_sources.rel
@@ -4,6 +4,8 @@ module batch_workflow_step
module configure_sources
def config_files(st in BatchWorkflowConfigureSourcesStep, f) { batch_workflow_step:configFiles(st, f) }

def default_container(st in BatchWorkflowConfigureSourcesStep, c) { batch_workflow_step:defaultContainer(st, c) }

def sources = transpose[batch_source:step]
end
end
@@ -28,8 +30,10 @@ end
module batch_source
def step(src, st) { batch_source_name:step_source_name_to_source(st, _, src) }
def extensions(src, e) { extract_value:extensions(src, :[], _, e) }
def partitioned(src) { extract_value:partitioned(src, boolean_true) }
def date_partitioned(src) { extract_value:isDatePartitioned(src, boolean_true) }
def chunk_partitioned(src) { extract_value:isChunkPartitioned(src, boolean_true) }
def relation = extract_value:relation
def container = extract_value:container
def relative_path = extract_value:relativePath
def input_format = extract_value:inputFormat
def loads_number_of_days = extract_value:loadsNumberOfDays
@@ -44,4 +48,8 @@ module batch_source
}
end

// TODO: declare ICs
ic configure_sources_step_default_container_is_mandatory(s) {
BatchWorkflowConfigureSourcesStep(s)
implies
batch_workflow_step:configure_sources:default_container(s, _)
}
1 change: 1 addition & 0 deletions rel/batch_config/workflow/workflow.rel
@@ -69,6 +69,7 @@ module batch_workflow_step
end

def batch_workflow_step(part, s, v) { batch_workflow_step:json_data(s, part, :[], _, v) }
def batch_workflow_step(part, s, v) { batch_workflow_step:json_data(s, part, v) }

module batch_workflow_step_order
def step_order_to_workflow(o, s, w) {
82 changes: 42 additions & 40 deletions rel/source_configs/config.rel
@@ -3,6 +3,7 @@ bound multi_part_relation = String
bound date_partitioned_source = String
bound source_declares_resource = String, String, String
bound source_has_input_format = String, String
bound source_has_container_type = String, String
bound source_catalog
bound simple_source_catalog
bound part_resource_date_pattern = String
@@ -36,6 +37,31 @@ end

def input_format_code_to_string = transpose[^InputFormatCode]

/**
* Container types
*/

value type ContainerTypeCode = String
entity type ContainerType = ContainerTypeCode

def ContainerType(t) { container_type:id(t, _) }

module container_type
def id = transpose[container_type_code:identifies]
end

module container_type_code
def value = { "AZURE" ; "LOCAL" ; "SNOWFLAKE" }

def identifies(c, t) {
value(v) and
^ContainerTypeCode(v, c) and
^ContainerType(c, t)
from v
}
end

def container_type_code_to_string = transpose[^ContainerTypeCode]
/**
* Resources
*/
@@ -54,13 +80,6 @@ module resource
def id = transpose[uri:identifies]

def part_of = transpose[source:declares]

def local(r) {
str = uri_to_string[resource:id[r]] and
not regex_match( "^azure://(.+)$", str ) and
not regex_match( "^https://(.+)$", str )
from str
}
end

module part_resource
@@ -167,24 +186,16 @@ ic multi_part_source_declares_part_resources(s) {
forall(r in source:declares[s]: PartResource(r))
}

ic source_is_local_or_remote(s, rel) {
source:declares(s, _) and
source:populates(s, rel)
implies
source:local(s) or source:remote(s)
}

ic not_local_and_remote(s, rel) {
source:populates(s, rel) and
source:local(s)
ic source_has_unique_input_format(s) {
Source(s)
implies
not source:remote(s)
source:format(s, _)
}

ic source_has_unique_input_format(s) {
ic source_has_container_type(s) {
Source(s)
implies
source:format(s, _)
source:container_type(s, _)
}
// currently we do not support chunk partitioning for not date partitioned source
ic multi_part_source_is_date_partitioned(s) {
@@ -205,15 +216,6 @@ module source

def spans[s] = part_resource:hashes_to_date[declares[s]]

def local(s) {
declares(s, _) and
forall(r: declares(s, r) implies resource:local(r) )
}
def remote(s) {
declares(s, _) and
forall(r: declares(s, r) implies not resource:local(r) )
}

def container(s, c) {
source_declares_resource(rel, c, _) and
s = relation:identifies[ rel_name:identifies[ ^RelName[rel] ] ]
@@ -235,6 +237,13 @@
CSVInputFormat(f)
from rel
}

def container_type(s, typ) {
source_has_container_type(rel, raw_container_type) and
s = relation:identifies[ rel_name:identifies[ ^RelName[rel] ] ] and
typ = container_type_code:identifies[^ContainerTypeCode[raw_container_type]]
from raw_container_type, rel
}
end

/**
@@ -344,24 +353,17 @@ def missing_resources_json(:[], n, :is_multi_part, "Y") {
from s
}

def missing_resources_json(:[], n, :is_remote, "Y") {
source:needs_resource(s) and
source:index[s] = n and
source:remote(s)
from s
}

def missing_resources_json(:[], n, :is_local, "Y") {
def missing_resources_json(:[], n, :container, v) {
source:needs_resource(s) and
source:index[s] = n and
source:local(s)
source:container(s, v)
from s
}

def missing_resources_json(:[], n, :container, v) {
def missing_resources_json(:[], n, :container_type, typ) {
source:needs_resource(s) and
source:index[s] = n and
source:container(s, v)
typ = container_type_code_to_string[ container_type:id[ source:container_type[s] ] ]
from s
}

1 change: 1 addition & 0 deletions requirements.txt
@@ -8,4 +8,5 @@ requests-toolbelt==1.0.0
urllib3==1.26.6
more-itertools==10.1.0
azure-storage-blob==12.17.0
snowflake-connector-python==3.2.0
csv-diff==1.1
4 changes: 2 additions & 2 deletions test/test_cfg_src_step.py
@@ -6,7 +6,7 @@
from unittest.mock import Mock

from workflow import paths
from workflow.common import Source
from workflow.common import Source, Container, ContainerType
from workflow.executor import ConfigureSourcesWorkflowStep, WorkflowStepState


@@ -402,7 +402,7 @@ def _create_test_source(is_chunk_partitioned: bool = True, is_date_partitioned:
loads_number_of_days: int = 1, offset_by_number_of_days: int = 0,
snapshot_validity_days=None) -> Source:
return Source(
container="default",
container=Container("default", ContainerType.LOCAL, {}),
relation="test",
relative_path="test",
input_format="test",
15 changes: 9 additions & 6 deletions test/test_cfg_src_step_factory.py
@@ -14,13 +14,16 @@ class TestConfigureSourcesWorkflowStepFactory(unittest.TestCase):
def test_get_step(self):
# Setup factory spy
factory = ConfigureSourcesWorkflowStepFactory()
config = _create_wf_cfg(EnvConfig({"azure": _create_container("default", ContainerType.AZURE),
"local": _create_container("local", ContainerType.LOCAL)}), Mock())
spy = MagicMock(wraps=factory._parse_sources)
sources = [_create_test_source("azure", "src1"), _create_test_source("local", "src2"),
_create_test_source("local", "src3"), _create_test_source("local", "src4")]
sources = [_create_test_source(config.env.get_container("azure"), "src1"),
_create_test_source(config.env.get_container("local"), "src2"),
_create_test_source(config.env.get_container("local"), "src3"),
_create_test_source(config.env.get_container("local"), "src4")]
spy.return_value = sources
factory._parse_sources = spy
config = _create_wf_cfg(EnvConfig({"azure": _create_container("default", ContainerType.AZURE),
"local": _create_container("local", ContainerType.LOCAL)}), Mock())

# When
step = factory._get_step(self.logger, config, "1", "name", WorkflowStepState.INIT, 0, None, {"configFiles": []})
# Then
@@ -34,14 +37,14 @@ def test_get_step(self):
self.assertEqual(sources, step.sources)
self.assertEqual(2, len(step.paths_builders.keys()))
self.assertIsInstance(step.paths_builders.get("local"), paths.LocalPathsBuilder)
self.assertIsInstance(step.paths_builders.get("azure"), paths.AzurePathsBuilder)
self.assertIsInstance(step.paths_builders.get("default"), paths.AzurePathsBuilder)
self.assertEqual("2021-01-01", step.start_date)
self.assertEqual("2021-01-01", step.end_date)
self.assertFalse(step.force_reimport)
self.assertFalse(step.force_reimport_not_chunk_partitioned)


def _create_test_source(container: str, relation: str) -> Source:
def _create_test_source(container: Container, relation: str) -> Source:
return Source(
container=container,
relation=relation,