diff --git a/mara_catalog/export.py b/mara_catalog/export.py
new file mode 100644
index 0000000..571561d
--- /dev/null
+++ b/mara_catalog/export.py
@@ -0,0 +1,75 @@
+"""Functions to export tables from a database to a storage"""
+
+from typing import Iterator, Union, List
+
+from mara_pipelines.pipelines import Command
+from mara_db import formats
+from .catalog import DbCatalog
+from .schema import WriteSchema
+
+## TBD
+from app.pipelines.transfer_to.write_file import WriteFile
+
+
+def export_catalog_mara_commands(catalog: DbCatalog, storage_alias: str, base_path: str,
+                                 format: formats.Format, write_schema_file: bool = False, include: List[str] = None,
+                                 db_alias: str = None) -> Iterator[Union[Command, List[Command]]]:
+    """
+    Returns pipeline commands which export the tables of a catalog to a storage.
+
+    Args:
+        catalog: The catalog to be exported
+        storage_alias: the storage to which the tables shall be exported
+        base_path: the base path on the storage under which the table files are written
+        format: the file format in which the tables shall be exported
+        write_schema_file: whether a SQLAlchemy schema file shall be written into each table directory.
+        include: if you want to export only a predefined list of tables, pass a list of table names here.
+                 This is applied across schemas since schema selection is not yet supported. (TODO: might be changed in the future.)
+    """
+
+    for table in catalog.tables:
+        table_name = table['name']
+        schema_name = table.get('schema', catalog.default_schema)
+        if include:
+            if table_name not in include:
+                # skip tables that are not in the include list
+                continue
+        table_path = f'{base_path}/{schema_name}/{table_name}' if catalog.has_schemas else f'{base_path}/{table_name}'
+        #yield Task(id=table_to_id(schema_name, table_name),
+        #           description=f"Export table {schema_name}.{table_name} to storage {storage_alias}",
+        #           commands=
+        yield [
+            # TBD: when format is parquet, delete the folder content first
+            WriteFile(sql_statement=f'SELECT * FROM "{schema_name}"."{table_name}"',
+                      db_alias=db_alias or catalog.db_alias,
+                      storage_alias=storage_alias,
+                      dest_file_name=clean_hadoop_path(f'{table_path}/part.0.parquet'),  # TODO: a generic file ending derived from `format` would be nice here
+                      compression=('snappy' if isinstance(format, formats.ParquetFormat) else None),
+                      format=format)
+        ] + ([
+            WriteSchema(table_name,
+                        schema=schema_name,
+                        db_alias=db_alias or catalog.db_alias,
+                        storage_alias=storage_alias,
+                        file_name=f'{clean_hadoop_path(table_path)}/_sqlalchemy_metadata.py')
+        ] if write_schema_file else [])
+        #,
+        #     max_retries=2)
+
+
+def table_to_id(schema_name, table_name) -> str:
+    return f'{schema_name}_{table_name}'.lower()
+
+
+def clean_hadoop_path(path) -> str:
+    """
+    Hadoop hides paths starting with '_' or '.'. This function renames such path segments so that the table files remain readable via Hadoop.
+ """ + parts = [] + for part in str(path).split('/'): + if part.startswith('_') or part.startswith('.'): + parts.append('x' + part) + else: + parts.append(part) + + return '/'.join(parts) diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/databricks/__init__.py b/tests/databricks/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/docker-compose.yml b/tests/docker-compose.yml new file mode 100644 index 0000000..1dc49cb --- /dev/null +++ b/tests/docker-compose.yml @@ -0,0 +1,20 @@ +version: '3.1' + +services: + postgres: + image: postgres:15 + environment: + POSTGRES_DB: mara + POSTGRES_USER: mara + POSTGRES_PASSWORD: mara + POSTGRES_HOST_AUTH_METHOD: md5 + ports: + - "5432" + + mssql: + image: mcr.microsoft.com/mssql/server:2022-latest + environment: + - ACCEPT_EULA=Y + - SA_PASSWORD=YourStrong@Passw0rd + ports: + - "1433" diff --git a/tests/example_datalake/README.md b/tests/example_datalake/README.md new file mode 100644 index 0000000..fea958a --- /dev/null +++ b/tests/example_datalake/README.md @@ -0,0 +1 @@ +The sample data is taken from https://www.tablab.app/datasets/sample/parquet diff --git a/tests/example_datalake/mt_cars/mt cars.parquet b/tests/example_datalake/mt_cars/mt cars.parquet new file mode 100644 index 0000000..13085cd Binary files /dev/null and b/tests/example_datalake/mt_cars/mt cars.parquet differ diff --git a/tests/example_datalake/weather/Weather.parquet b/tests/example_datalake/weather/Weather.parquet new file mode 100644 index 0000000..a31a6f5 Binary files /dev/null and b/tests/example_datalake/weather/Weather.parquet differ diff --git a/tests/local_config.py b/tests/local_config.py new file mode 100644 index 0000000..8466200 --- /dev/null +++ b/tests/local_config.py @@ -0,0 +1,11 @@ +# This file contains secrets used by the tests + +from mara_db import dbs + +# supported placeholders +# host='DOCKER_IP' will be replaced with the ip address given from pytest-docker +# port=-1 will be replaced with the ip address given from pytest-docker + +POSTGRES_DB = dbs.PostgreSQLDB(host='DOCKER_IP', port=-1, user="mara", password="mara", database="mara") +MSSQL_DB = dbs.SQLServerDB(host='DOCKER_IP', port=-1, user='sa', password='YourStrong@Passw0rd', database='master') +DATABRICKS_DB = None #dbs.DatabricksDB(host='DBSQLCLI_HOST_NAME', http_path='DBSQLCLI_HTTP_PATH', access_token='DBSQLCLI_ACCESS_TOKEN') diff --git a/tests/local_config.py.example b/tests/local_config.py.example new file mode 100644 index 0000000..a4e9523 --- /dev/null +++ b/tests/local_config.py.example @@ -0,0 +1,11 @@ +# This file contains secrets used by the tests + +from mara_db import dbs + +# supported placeholders +# host='DOCKER_IP' will be replaced with the ip address given from pytest-docker +# port=-1 will be replaced with the ip address given from pytest-docker + +POSTGRES_DB = dbs.PostgreSQLDB(host='DOCKER_IP', port=-1, user="mara", password="mara", database="mara") +MSSQL_DB = None # dbs.SQLServerDB(host='DOCKER_IP', port=-1, user='sa', password='YourStrong@Passw0rd', database='master') +DATABRICKS_DB = None #dbs.DatabricksDB(host='DBSQLCLI_HOST_NAME', http_path='DBSQLCLI_HTTP_PATH', access_token='DBSQLCLI_ACCESS_TOKEN') diff --git a/tests/mssql/__init__.py b/tests/mssql/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/postgres/__init__.py b/tests/postgres/__init__.py new file mode 100644 index 0000000..e69de29