diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml index b548b18..646242c 100644 --- a/.github/workflows/build.yaml +++ b/.github/workflows/build.yaml @@ -24,4 +24,6 @@ jobs: echo Finished successful build with Python $pythonversion - name: Test with pytest run: | - pytest -v tests + pytest -v tests -m "not postgres_db and not mssql_db" + pytest -v tests -m postgres_db + pytest -v tests -m mssql_db diff --git a/Makefile b/Makefile index fb89f90..f4a197e 100644 --- a/Makefile +++ b/Makefile @@ -16,7 +16,9 @@ test: make .venv/bin/python # test of module .venv/bin/pip install .[test] - .venv/bin/pytest + .venv/bin/pytest -m "not postgres_db and not mssql_db" + .venv/bin/pytest -m postgres_db + .venv/bin/pytest -m mssql_db publish: diff --git a/docs/commands.rst b/docs/commands.rst index 6cd79cf..b2b5dc1 100644 --- a/docs/commands.rst +++ b/docs/commands.rst @@ -24,6 +24,8 @@ Files commands .. autoclass:: ReadScriptOutput +.. autoclass:: WriteFile + Python commands --------------- diff --git a/mara_pipelines/commands/files.py b/mara_pipelines/commands/files.py index 7ae403e..b10ff19 100644 --- a/mara_pipelines/commands/files.py +++ b/mara_pipelines/commands/files.py @@ -4,7 +4,7 @@ import pathlib import shlex import sys -from typing import List, Tuple, Dict +from typing import List, Tuple, Dict, Union, Callable, Optional import enum @@ -83,10 +83,10 @@ def __init__(self, file_name: str, compression: Compression, target_table: str, self.timezone = timezone self.file_format = file_format - def db_alias(self): + def db_alias(self) -> str: return self._db_alias or config.default_db_alias() - def shell_command(self): + def shell_command(self) -> str: copy_from_stdin_command = mara_db.shell.copy_from_stdin_command( self.db_alias(), csv_format=self.csv_format, target_table=self.target_table, skip_header=self.skip_header, @@ -104,7 +104,7 @@ def shell_command(self): # Bigquery loading does not support streaming data through pipes return 
copy_from_stdin_command + f" {pathlib.Path(config.data_dir()) / self.file_name}" - def mapper_file_path(self): + def mapper_file_path(self) -> pathlib.Path: return self.parent.parent.base_path() / self.mapper_script_file_name def html_doc_items(self) -> List[Tuple[str, str]]: @@ -141,10 +141,10 @@ def __init__(self, sqlite_file_name: str, target_table: str, self.timezone = timezone @property - def db_alias(self): + def db_alias(self) -> str: return self._db_alias or config.default_db_alias() - def shell_command(self): + def shell_command(self) -> str: return (sql._SQLCommand.shell_command(self) + ' | ' + mara_db.shell.copy_command( mara_db.dbs.SQLiteDB(file_name=pathlib.Path(config.data_dir()).absolute() / self.sqlite_file_name), @@ -188,10 +188,10 @@ def __init__(self, file_name: str, target_table: str, make_unique: bool = False, self.timezone = timezone self.pipe_format = pipe_format - def db_alias(self): + def db_alias(self) -> str: return self._db_alias or config.default_db_alias() - def shell_command(self): + def shell_command(self) -> str: return f'{shlex.quote(sys.executable)} "{self.file_path()}" \\\n' \ + (' | sort -u \\\n' if self.make_unique else '') \ + ' | ' + mara_db.shell.copy_from_stdin_command( @@ -200,7 +200,7 @@ def shell_command(self): null_value_string=self.null_value_string, timezone=self.timezone, pipe_format=self.pipe_format) - def file_path(self): + def file_path(self) -> pathlib.Path: return self.parent.parent.base_path() / self.file_name def html_doc_items(self) -> List[Tuple[str, str]]: @@ -219,3 +219,51 @@ def html_doc_items(self) -> List[Tuple[str, str]]: _.tt[json.dumps(self.null_value_string) if self.null_value_string is not None else None]), ('time zone', _.tt[self.timezone]), (_.i['shell command'], html.highlight_syntax(self.shell_command(), 'bash'))] + + +class WriteFile(sql._SQLCommand): + """Writes data to a local file. 
The command is executed on the shell.""" + + def __init__(self, dest_file_name: str, sql_statement: Optional[Union[Callable, str]] = None, sql_file_name: Optional[str] = None, + replace: Optional[Dict[str, str]] = None, db_alias: Optional[str] = None, + compression: Compression = Compression.NONE, + format: formats.Format = formats.CsvFormat()) -> None: + """ + Writes the output of a sql query to a file in a specific format. The command is executed on the shell. + + Args: + dest_file_name: destination file name + sql_statement: The statement to run as a string + sql_file_name: The name of the file to run (relative to the directory of the parent pipeline) + replace: A set of replacements to perform against the sql query `{'replace': 'with', ..}` + db_alias: db on which the SQL statement shall run + compression: optional compression for the destination file; currently only Compression.NONE is supported + format: the format in which the file shall be written. Default: CSV according to RFC 4180 + """ + if compression != Compression.NONE: + raise ValueError('Currently WriteFile only supports compression NONE') + super().__init__(sql_statement, sql_file_name, replace) + self.dest_file_name = dest_file_name + self._db_alias = db_alias + self.compression = compression + self.format = format + + @property + def db_alias(self) -> str: + return self._db_alias or config.default_db_alias() + + def shell_command(self) -> str: + command = super().shell_command() \ + + ' | ' + mara_db.shell.copy_to_stdout_command( \ + self.db_alias, header=None, footer=None, delimiter_char=None, \ + csv_format=None, pipe_format=self.format) +' \\\n' + return command \ + + f' > "{pathlib.Path(config.data_dir()) / self.dest_file_name}"' + + def html_doc_items(self) -> List[Tuple[str, str]]: + return [('db', _.tt[self.db_alias]) + ] \ + + sql._SQLCommand.html_doc_items(self, self.db_alias) \ + + [('format', _.tt[self.format]), + ('destination file name', _.tt[self.dest_file_name]), + (_.i['shell command'],
html.highlight_syntax(self.shell_command(), 'bash'))] diff --git a/pyproject.toml b/pyproject.toml index adb4681..dddb2f3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,3 +1,9 @@ [build-system] requires = ["setuptools >= 40.6.0", "wheel >= 0.31"] build-backend = "setuptools.build_meta" + +[tool.pytest.ini_options] +markers = [ + "postgres_db", + "mssql_db", +] diff --git a/setup.cfg b/setup.cfg index 9d6ecc8..f4bbc81 100644 --- a/setup.cfg +++ b/setup.cfg @@ -31,3 +31,4 @@ test = pytest-docker pytest-dependency mara_app>=1.5.2 + mara-db[postgres,mssql] diff --git a/tests/README.md b/tests/README.md new file mode 100644 index 0000000..520753a --- /dev/null +++ b/tests/README.md @@ -0,0 +1,8 @@ +Test notes +========== + +There are several types of tests: +* tests run without docker +* tests run with docker + +The tests running in docker are marked with their execution setup. E.g. mark `postgres_db` is used for a setup where PostgreSQL is used as data warehouse database, `mssql_db` is used for a setup where SQL Server is used as data warehouse database / and so on. Docker tests are executed sequentially, because otherwise they would overwrite their mara configuration.
diff --git a/tests/db_test_helper.py b/tests/db_test_helper.py new file mode 100644 index 0000000..7c63a7e --- /dev/null +++ b/tests/db_test_helper.py @@ -0,0 +1,24 @@ +import sqlalchemy +from mara_db import dbs + + +def db_is_responsive(db: dbs.DB) -> bool: + """Returns True when the DB is available on the given port, otherwise False""" + engine = sqlalchemy.create_engine(db.sqlalchemy_url, pool_pre_ping=True) + + try: + with engine.connect() as conn: + return True + except: + return False + + +def db_replace_placeholders(db: dbs.DB, docker_ip: str, docker_port: int, database: str = None) -> dbs.DB: + """Replaces the internal placeholders with the docker ip and docker port""" + if db.host == 'DOCKER_IP': + db.host = docker_ip + if db.port == -1: + db.port = docker_port + if database: + db.database = database + return db diff --git a/tests/docker-compose.yml b/tests/docker-compose.yml index 618a899..5f41dce 100644 --- a/tests/docker-compose.yml +++ b/tests/docker-compose.yml @@ -10,3 +10,12 @@ services: POSTGRES_HOST_AUTH_METHOD: md5 ports: - "5432" + + mssql: + image: mcr.microsoft.com/mssql/server:2022-latest + environment: + - ACCEPT_EULA=Y + - MSSQL_SA_PASSWORD=YourStrong@Passw0rd + - MSSQL_PID=Developer + ports: + - "1433" diff --git a/tests/local_config.py b/tests/local_config.py new file mode 100644 index 0000000..e75e8b8 --- /dev/null +++ b/tests/local_config.py @@ -0,0 +1,10 @@ +# This file contains secrets used by the tests + +from mara_db import dbs + +# supported placeholders +# host='DOCKER_IP' will be replaced with the ip address given from pytest-docker +# port=-1 will be replaced with the port given from pytest-docker + +POSTGRES_DB = dbs.PostgreSQLDB(host='DOCKER_IP', port=-1, user="mara", password="mara", database="mara") +MSSQL_SQLCMD_DB = dbs.SqlcmdSQLServerDB(host='DOCKER_IP', port=-1, user='sa', password='YourStrong@Passw0rd', database='master', trust_server_certificate=True) diff --git a/tests/mssql/__init__.py b/tests/mssql/__init__.py
new file mode 100644 index 0000000..e69de29 diff --git a/tests/mssql/names.csv b/tests/mssql/names.csv new file mode 100644 index 0000000..6ff8758 --- /dev/null +++ b/tests/mssql/names.csv @@ -0,0 +1,10 @@ +1,Elinor Meklit +2,Triana Mahalah +3,Eugraphios Esmae +4,Agustín Alvilda +5,Behruz Hathor +6,Mathilde Tola +7,Kapel Tupaq +8,Shet Badulf +9,Ruslan Vančo +10,Madhavi Traian diff --git a/tests/mssql/names_dll_create.sql b/tests/mssql/names_dll_create.sql new file mode 100644 index 0000000..8378249 --- /dev/null +++ b/tests/mssql/names_dll_create.sql @@ -0,0 +1,5 @@ +CREATE TABLE names +( + id INT, + name nvarchar(max) +); diff --git a/tests/mssql/names_dll_drop.sql b/tests/mssql/names_dll_drop.sql new file mode 100644 index 0000000..b0418c5 --- /dev/null +++ b/tests/mssql/names_dll_drop.sql @@ -0,0 +1 @@ +DROP TABLE IF EXISTS names; diff --git a/tests/mssql/test_command_ReadFile.py b/tests/mssql/test_command_ReadFile.py new file mode 100644 index 0000000..2f6a079 --- /dev/null +++ b/tests/mssql/test_command_ReadFile.py @@ -0,0 +1,137 @@ +import pathlib +import pytest +from typing import Tuple, Iterator + +from mara_app.monkey_patch import patch +from mara_db import dbs, formats +from mara_pipelines.commands.sql import ExecuteSQL +from mara_pipelines.commands.files import ReadFile, Compression + +from tests.command_helper import run_command +from tests.db_test_helper import db_is_responsive, db_replace_placeholders +from tests.local_config import POSTGRES_DB, MSSQL_SQLCMD_DB + +import mara_pipelines.config +patch(mara_pipelines.config.data_dir)(lambda: pathlib.Path(__file__).parent) + +FILE_PATH = pathlib.Path(__file__).parent + + +if not POSTGRES_DB: + pytest.skip("skipping MSSQL tests: variable POSTGRES_DB not set", allow_module_level=True) +if not MSSQL_SQLCMD_DB: + pytest.skip("skipping MSSQL tests: variable MSSQL_SQLCMD_DB not set", allow_module_level=True) + + +@pytest.fixture(scope="session") +def mssql_db(docker_ip, docker_services) -> Tuple[str, int]: + 
"""Ensures that MSSQL server is running on docker.""" + + postgres_docker_port = docker_services.port_for("postgres", 5432) + _mara_db = db_replace_placeholders(POSTGRES_DB, docker_ip, postgres_docker_port) + + # here we need to wait until the PostgreSQL port is available. + docker_services.wait_until_responsive( + timeout=30.0, pause=0.1, check=lambda: db_is_responsive(_mara_db) + ) + + mssql_docker_port = docker_services.port_for("mssql", 1433) + master_db = db_replace_placeholders(MSSQL_SQLCMD_DB, docker_ip, mssql_docker_port) + + # here we need to wait until the MSSQL port is available. + docker_services.wait_until_responsive( + timeout=30.0, pause=0.1, check=lambda: db_is_responsive(master_db) + ) + + # create the dwh database + conn: dbs.DB = None + try: + conn = dbs.connect(master_db) # dbt.cursor_context cannot be used here because + # CREATE DATABASE cannot run inside a + # transaction block + try: + cur = conn.cursor() + conn.autocommit = True + cur.execute('CREATE DATABASE [dwh]') + finally: + if cur: + cur.close() + finally: + if conn: + conn.close() + + dwh_db = db_replace_placeholders(MSSQL_SQLCMD_DB, docker_ip, mssql_docker_port, database='dwh') + + import mara_db.config + patch(mara_db.config.databases)(lambda: { + 'mara': _mara_db, + 'dwh': dwh_db + }) + patch(mara_pipelines.config.default_db_alias)(lambda: 'dwh') + + return dwh_db + + +@pytest.mark.dependency() +@pytest.fixture +def names_table(mssql_db) -> Iterator[str]: + """ + Provides a 'names' table for tests. 
+ """ + ddl_file_path = str((pathlib.Path(__file__).parent / 'names_dll_create.sql').absolute()) + assert run_command( + ExecuteSQL(sql_file_name=ddl_file_path), + + base_path=FILE_PATH + ) + + yield "names" + + ddl_file_path = str((pathlib.Path(__file__).parent / 'names_dll_drop.sql').absolute()) + assert run_command( + ExecuteSQL(sql_file_name=ddl_file_path), + + base_path=FILE_PATH + ) + + +@pytest.mark.mssql_db +def test_read_file(names_table): + """Tests command ReadFile""" + assert run_command( + ReadFile(file_name='names.csv', + compression=Compression.NONE, + target_table=names_table, + file_format=formats.CsvFormat()), + + base_path=FILE_PATH + ) + + with dbs.cursor_context('dwh') as cur: + try: + result = cur.execute(f'SELECT COUNT(*) FROM "{names_table}";') + assert 10, result.fetchone()[0] + + finally: + cur.execute(f'DELETE FROM "{names_table}";') + + +@pytest.mark.mssql_db +def test_read_file_old_parameters(names_table): + """Tests command ReadFile""" + assert run_command( + ReadFile(file_name='names.csv', + compression=Compression.NONE, + target_table=names_table, + csv_format=True), + + base_path=FILE_PATH + ) + + with dbs.cursor_context('dwh') as cur: + try: + result = cur.execute(f'SELECT COUNT(*) FROM "{names_table}";') + assert 10, result.fetchone()[0] + + finally: + cur.execute(f'DELETE FROM "{names_table}";') diff --git a/tests/mssql/test_mssql.py b/tests/mssql/test_mssql.py new file mode 100644 index 0000000..b062719 --- /dev/null +++ b/tests/mssql/test_mssql.py @@ -0,0 +1,105 @@ +import os +import pathlib +import pytest +import typing as t + +from mara_app.monkey_patch import patch +from mara_db import formats +import mara_pipelines.config +from mara_pipelines.commands.bash import RunBash +from mara_pipelines.commands.files import WriteFile +from mara_pipelines.commands.sql import ExecuteSQL +from mara_pipelines.pipelines import Pipeline, Task +from mara_pipelines.ui.cli import run_pipeline + +from tests.db_test_helper import 
db_is_responsive, db_replace_placeholders +from tests.local_config import MSSQL_SQLCMD_DB + + +if not MSSQL_SQLCMD_DB: + pytest.skip("skipping MSSQL tests: variable MSSQL_SQLCMD_DB not set", allow_module_level=True) + + +@pytest.fixture(scope="session") +def mssql_db(docker_ip, docker_services) -> t.Tuple[str, int]: + """Ensures that MSSQL server is running on docker.""" + + docker_port = docker_services.port_for("mssql", 1433) + db = db_replace_placeholders(MSSQL_SQLCMD_DB, docker_ip, docker_port) + + # here we need to wait until the MSSQL port is available. + docker_services.wait_until_responsive( + timeout=30.0, pause=0.1, check=lambda: db_is_responsive(db) + ) + + import mara_db.config + patch(mara_db.config.databases)(lambda: {'dwh': db}) + patch(mara_pipelines.config.default_db_alias)(lambda: 'dwh') + + return db + + +@pytest.mark.dependency() +@pytest.mark.mssql_db +def test_command_WriteFile(mssql_db): + + # set local temp path + patch(mara_pipelines.config.data_dir)(lambda: str((pathlib.Path(__file__).parent / '.tmp').absolute())) + + pipeline = Pipeline( + id='test_command_write_file', + description="") + + pipeline.add_initial( + Task(id='initial_ddl', + description="", + commands=[ + ExecuteSQL(""" +DROP TABLE IF EXISTS "test_command_WriteFile"; + +CREATE TABLE "test_command_WriteFile" +( + Id INT IDENTITY(1,1), + LongText1 NVARCHAR(MAX), + LongText2 NVARCHAR(MAX) +); + +INSERT INTO "test_command_WriteFile" ( + LongText1, LongText2 +) VALUES +('Hello', 'World!'), +('He lo', ' orld! '), +('Hello\t', ', World! 
'); +"""), + RunBash(f'mkdir -p {mara_pipelines.config.data_dir()}') + ])) + + pipeline.add( + Task(id='write_file_csv', + description="Write content of table to file", + commands=[WriteFile(dest_file_name='write-file.csv', + sql_statement="""SELECT * FROM "test_command_WriteFile";""", + format=formats.CsvFormat(delimiter_char='\t', header=False))])) + + pipeline.add( + Task(id='write_file_tsv', + description="Write content of table to file", + commands=[WriteFile(dest_file_name='write-file.tsv', + sql_statement="""SELECT * FROM "test_command_WriteFile";""", + format=formats.CsvFormat(delimiter_char='\t', header=False))])) + + assert run_pipeline(pipeline) + + files = [ + str((pathlib.Path(mara_pipelines.config.data_dir()) / 'write-file.csv').absolute()), + str((pathlib.Path(mara_pipelines.config.data_dir()) / 'write-file.tsv').absolute()) + ] + + file_not_found = [] + for file in files: + if not os.path.exists(file): + file_not_found.append(file) + else: + os.remove(file) + + assert not file_not_found diff --git a/tests/postgres/test_command_ReadFile.py b/tests/postgres/test_command_ReadFile.py index 152a9bd..12a61db 100644 --- a/tests/postgres/test_command_ReadFile.py +++ b/tests/postgres/test_command_ReadFile.py @@ -1,13 +1,15 @@ import pathlib import pytest -import sqlalchemy from typing import Tuple, Iterator from mara_app.monkey_patch import patch from mara_db import dbs, formats from mara_pipelines.commands.sql import ExecuteSQL from mara_pipelines.commands.files import ReadFile, Compression + from tests.command_helper import run_command +from tests.db_test_helper import db_is_responsive, db_replace_placeholders +from tests.local_config import POSTGRES_DB import mara_pipelines.config patch(mara_pipelines.config.data_dir)(lambda: pathlib.Path(__file__).parent) @@ -15,15 +17,8 @@ FILE_PATH = pathlib.Path(__file__).parent -def db_is_responsive(db: dbs.DB) -> bool: - """Returns True when the DB is available on the given port, otherwise False""" - engine = 
sqlalchemy.create_engine(db.sqlalchemy_url, pool_pre_ping=True) - - try: - with engine.connect(): - return True - except: - return False +if not POSTGRES_DB: + pytest.skip("skipping PostgreSQL tests: variable POSTGRES_DB not set", allow_module_level=True) @pytest.fixture(scope="session") @@ -31,32 +26,29 @@ def postgres_db(docker_ip, docker_services) -> Tuple[str, int]: """Ensures that PostgreSQL server is running on docker.""" docker_port = docker_services.port_for("postgres", 5432) - mara_db = dbs.PostgreSQLDB(host=docker_ip, - port=docker_port, - user="mara", - password="mara", - database="mara") + _mara_db = db_replace_placeholders(POSTGRES_DB, docker_ip, docker_port) # here we need to wait until the PostgreSQL port is available. docker_services.wait_until_responsive( - timeout=30.0, pause=0.1, check=lambda: db_is_responsive(mara_db) + timeout=30.0, pause=0.1, check=lambda: db_is_responsive(_mara_db) ) # create the dwh database + conn: dbs.DB = None try: - conn = dbs.connect(mara_db) # dbt.cursor_context cannot be used here because - # CREATE DATABASE cannot run inside a - # transaction block + conn = dbs.connect(_mara_db) # dbt.cursor_context cannot be used here because + # CREATE DATABASE cannot run inside a + # transaction block try: cur = conn.cursor() conn.autocommit = True cur.execute(''' CREATE DATABASE "dwh" - WITH OWNER "mara" - ENCODING 'UTF8' - TEMPLATE template0 - LC_COLLATE = 'en_US.UTF-8' - LC_CTYPE = 'en_US.UTF-8' + WITH OWNER "mara" + ENCODING 'UTF8' + TEMPLATE template0 + LC_COLLATE = 'en_US.UTF-8' + LC_CTYPE = 'en_US.UTF-8' ''') finally: if cur: @@ -65,15 +57,11 @@ def postgres_db(docker_ip, docker_services) -> Tuple[str, int]: if conn: conn.close() - dwh_db = dbs.PostgreSQLDB(host=docker_ip, - port=docker_port, - user="mara", - password="mara", - database="dwh") + dwh_db = db_replace_placeholders(POSTGRES_DB, docker_ip, docker_port, database='dwh') import mara_db.config patch(mara_db.config.databases)(lambda: { - 'mara': mara_db, + 'mara': 
_mara_db, 'dwh': dwh_db }) patch(mara_pipelines.config.default_db_alias)(lambda: 'dwh') @@ -104,6 +92,7 @@ def names_table(postgres_db) -> Iterator[str]: ) +@pytest.mark.postgres_db def test_read_file(names_table): """Tests command ReadFile""" assert run_command( @@ -118,13 +107,14 @@ def test_read_file(names_table): with dbs.cursor_context('dwh') as cur: try: - result = cur.execute(f'SELECT COUNT(*) FROM "{names_table}";') - assert 10, result.fetchone()[0] + result = cur.execute(f'SELECT COUNT(*) FROM "{names_table}";') + assert 10, result.fetchone()[0] finally: cur.execute(f'DELETE FROM "{names_table}";') +@pytest.mark.postgres_db def test_read_file_old_parameters(names_table): """Tests command ReadFile""" assert run_command( @@ -138,8 +128,8 @@ def test_read_file_old_parameters(names_table): with dbs.cursor_context('dwh') as cur: try: - result = cur.execute(f'SELECT COUNT(*) FROM "{names_table}";') - assert 10, result.fetchone()[0] + result = cur.execute(f'SELECT COUNT(*) FROM "{names_table}";') + assert 10, result.fetchone()[0] finally: cur.execute(f'DELETE FROM "{names_table}";') diff --git a/tests/postgres/test_postgres.py b/tests/postgres/test_postgres.py new file mode 100644 index 0000000..0820326 --- /dev/null +++ b/tests/postgres/test_postgres.py @@ -0,0 +1,105 @@ +import os +import pathlib +import pytest +import typing as t + +from mara_app.monkey_patch import patch +from mara_db import formats +import mara_pipelines.config +from mara_pipelines.commands.bash import RunBash +from mara_pipelines.commands.files import WriteFile +from mara_pipelines.commands.sql import ExecuteSQL +from mara_pipelines.pipelines import Pipeline, Task +from mara_pipelines.ui.cli import run_pipeline + +from tests.db_test_helper import db_is_responsive, db_replace_placeholders +from tests.local_config import POSTGRES_DB + + +if not POSTGRES_DB: + pytest.skip("skipping PostgreSQL tests: variable POSTGRES_DB not set", allow_module_level=True) + + +@pytest.fixture(scope="session") 
+def postgres_db(docker_ip, docker_services) -> t.Tuple[str, int]: + """Ensures that PostgreSQL server is running on docker.""" + + docker_port = docker_services.port_for("postgres", 5432) + db = db_replace_placeholders(POSTGRES_DB, docker_ip, docker_port) + + # here we need to wait until the PostgreSQL port is available. + docker_services.wait_until_responsive( + timeout=30.0, pause=0.1, check=lambda: db_is_responsive(db) + ) + + import mara_db.config + patch(mara_db.config.databases)(lambda: {'dwh': db}) + patch(mara_pipelines.config.default_db_alias)(lambda: 'dwh') + + return db + + +@pytest.mark.dependency() +@pytest.mark.postgres_db +def test_postgres_command_WriteFile(postgres_db): + + # set local temp path + patch(mara_pipelines.config.data_dir)(lambda: str((pathlib.Path(__file__).parent / '.tmp').absolute())) + + pipeline = Pipeline( + id='test_postgres_command_write_file', + description="") + + pipeline.add_initial( + Task(id='initial_ddl', + description="", + commands=[ + ExecuteSQL(""" +DROP TABLE IF EXISTS "test_postgres_command_WriteFile"; + +CREATE TABLE "test_postgres_command_WriteFile" +( + Id INT GENERATED ALWAYS AS IDENTITY, + LongText1 TEXT, + LongText2 TEXT +); + +INSERT INTO "test_postgres_command_WriteFile" ( + LongText1, LongText2 +) VALUES +('Hello', 'World!'), +('He lo', ' orld! '), +('Hello\t', ', World! 
'); +"""), + RunBash(f'mkdir -p {mara_pipelines.config.data_dir()}') + ])) + + pipeline.add( + Task(id='write_file_csv', + description="Write content of table to file", + commands=[WriteFile(dest_file_name='write-file.csv', + sql_statement="""SELECT * FROM "test_postgres_command_WriteFile";""", + format=formats.CsvFormat(delimiter_char='\t', header=False))])) + + pipeline.add( + Task(id='write_file_tsv', + description="Write content of table to file", + commands=[WriteFile(dest_file_name='write-file.tsv', + sql_statement="""SELECT * FROM "test_postgres_command_WriteFile";""", + format=formats.CsvFormat(delimiter_char='\t', header=False))])) + + assert run_pipeline(pipeline) + + files = [ + str((pathlib.Path(mara_pipelines.config.data_dir()) / 'write-file.csv').absolute()), + str((pathlib.Path(mara_pipelines.config.data_dir()) / 'write-file.tsv').absolute()) + ] + + file_not_found = [] + for file in files: + if not os.path.exists(file): + file_not_found.append(file) + else: + os.remove(file) + + assert not file_not_found diff --git a/tests/postgres/test_postgres_ddl.sql b/tests/postgres/test_postgres_ddl.sql new file mode 100644 index 0000000..c90c890 --- /dev/null +++ b/tests/postgres/test_postgres_ddl.sql @@ -0,0 +1,5 @@ +CREATE TABLE IF NOT EXISTS names +( + id INT, + name TEXT +);