add WriteFile command #89

Merged — 13 commits, merged on Apr 11, 2023
4 changes: 3 additions & 1 deletion .github/workflows/build.yaml
@@ -24,4 +24,6 @@ jobs:
echo Finished successful build with Python $pythonversion
- name: Test with pytest
run: |
-          pytest -v tests
+          pytest -v tests -m "not postgres_db and not mssql_db"
+          pytest -v tests -m postgres_db
+          pytest -v tests -m mssql_db
4 changes: 3 additions & 1 deletion Makefile
@@ -16,7 +16,9 @@ test:
make .venv/bin/python
# test of module
.venv/bin/pip install .[test]
-	.venv/bin/pytest
+	.venv/bin/pytest -m "not postgres_db and not mssql_db"
+	.venv/bin/pytest -m postgres_db
+	.venv/bin/pytest -m mssql_db


publish:
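The same marker-based split can also be driven from Python instead of the Makefile; a minimal hedged sketch (equivalent to `.venv/bin/pytest -m mssql_db` above, assuming it is run from the repository root):

```python
import sys

import pytest

# run only the tests that need the dockerized SQL Server setup,
# mirroring the `.venv/bin/pytest -m mssql_db` Makefile line
sys.exit(pytest.main(['-v', 'tests', '-m', 'mssql_db']))
```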
2 changes: 2 additions & 0 deletions docs/commands.rst
@@ -24,6 +24,8 @@ Files commands

.. autoclass:: ReadScriptOutput

.. autoclass:: WriteFile


Python commands
---------------
66 changes: 57 additions & 9 deletions mara_pipelines/commands/files.py
@@ -4,7 +4,7 @@
import pathlib
import shlex
import sys
-from typing import List, Tuple, Dict
+from typing import List, Tuple, Dict, Union, Callable, Optional

import enum

@@ -83,10 +83,10 @@ def __init__(self, file_name: str, compression: Compression, target_table: str,
self.timezone = timezone
self.file_format = file_format

-def db_alias(self):
+def db_alias(self) -> str:
return self._db_alias or config.default_db_alias()

-def shell_command(self):
+def shell_command(self) -> str:
copy_from_stdin_command = mara_db.shell.copy_from_stdin_command(
self.db_alias(), csv_format=self.csv_format, target_table=self.target_table,
skip_header=self.skip_header,
@@ -104,7 +104,7 @@ def shell_command(self):
# Bigquery loading does not support streaming data through pipes
return copy_from_stdin_command + f" {pathlib.Path(config.data_dir()) / self.file_name}"

-def mapper_file_path(self):
+def mapper_file_path(self) -> pathlib.Path:
return self.parent.parent.base_path() / self.mapper_script_file_name

def html_doc_items(self) -> List[Tuple[str, str]]:
@@ -141,10 +141,10 @@ def __init__(self, sqlite_file_name: str, target_table: str,
self.timezone = timezone

@property
-def db_alias(self):
+def db_alias(self) -> str:
return self._db_alias or config.default_db_alias()

-def shell_command(self):
+def shell_command(self) -> str:
return (sql._SQLCommand.shell_command(self)
+ ' | ' + mara_db.shell.copy_command(
mara_db.dbs.SQLiteDB(file_name=pathlib.Path(config.data_dir()).absolute() / self.sqlite_file_name),
@@ -188,10 +188,10 @@ def __init__(self, file_name: str, target_table: str, make_unique: bool = False,
self.timezone = timezone
self.pipe_format = pipe_format

-def db_alias(self):
+def db_alias(self) -> str:
return self._db_alias or config.default_db_alias()

-def shell_command(self):
+def shell_command(self) -> str:
return f'{shlex.quote(sys.executable)} "{self.file_path()}" \\\n' \
+ (' | sort -u \\\n' if self.make_unique else '') \
+ ' | ' + mara_db.shell.copy_from_stdin_command(
@@ -200,7 +200,7 @@ def shell_command(self):
null_value_string=self.null_value_string, timezone=self.timezone,
pipe_format=self.pipe_format)

-def file_path(self):
+def file_path(self) -> pathlib.Path:
return self.parent.parent.base_path() / self.file_name

def html_doc_items(self) -> List[Tuple[str, str]]:
@@ -219,3 +219,51 @@ def html_doc_items(self) -> List[Tuple[str, str]]:
_.tt[json.dumps(self.null_value_string) if self.null_value_string is not None else None]),
('time zone', _.tt[self.timezone]),
(_.i['shell command'], html.highlight_syntax(self.shell_command(), 'bash'))]


class WriteFile(sql._SQLCommand):
    """Writes data to a local file. The command is executed on the shell."""

    def __init__(self, dest_file_name: str, sql_statement: Optional[Union[Callable, str]] = None,
                 sql_file_name: Optional[str] = None,
                 replace: Optional[Dict[str, str]] = None, db_alias: Optional[str] = None,
                 compression: Compression = Compression.NONE,
                 format: formats.Format = formats.CsvFormat()) -> None:
        """
        Writes the output of a sql query to a file in a specific format. The command is executed on the shell.
Review comment (Member): "... on the shell on the machine which runs mara (e.g. in the docker container!)."

Reply (Member Author): That is true! I am not happy that everything in Mara runs through a shell command. For example, when I run Mara in a Jupyter Notebook, it is pretty stupid that I have to carry out commands to make sure that the shell tooling is installed. It would be much smarter to use SQLAlchemy or a DB API instead. I already had in mind to add a SqlExecutionContext which doesn't use the shell but the DB API or SQLAlchemy, but ... I never really had a use case where I needed it...


        Args:
            dest_file_name: destination file name
            sql_statement: The statement to run as a string
            sql_file_name: The name of the file to run (relative to the directory of the parent pipeline)
            replace: A set of replacements to perform against the sql query `{'replace': 'with', ...}`
            db_alias: db on which the SQL statement shall run
            compression: The compression to apply to the written file. Currently only Compression.NONE is supported
            format: the format in which the file shall be written. Default: CSV according to RFC 4180
        """
        if compression != Compression.NONE:
            raise ValueError('Currently WriteFile only supports compression NONE')
        super().__init__(sql_statement, sql_file_name, replace)
        self.dest_file_name = dest_file_name
        self._db_alias = db_alias
        self.compression = compression
        self.format = format

    @property
    def db_alias(self) -> str:
        return self._db_alias or config.default_db_alias()

    def shell_command(self) -> str:
        command = super().shell_command() \
                  + ' | ' + mara_db.shell.copy_to_stdout_command(
                      self.db_alias, header=None, footer=None, delimiter_char=None,
                      csv_format=None, pipe_format=self.format) + ' \\\n'
        return command \
               + f' > "{pathlib.Path(config.data_dir()) / self.dest_file_name}"'

    def html_doc_items(self) -> List[Tuple[str, str]]:
        return [('db', _.tt[self.db_alias])] \
               + sql._SQLCommand.html_doc_items(self, self.db_alias) \
               + [('format', _.tt[self.format]),
                  ('destination file name', _.tt[self.dest_file_name]),
                  (_.i['shell command'], html.highlight_syntax(self.shell_command(), 'bash'))]
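A minimal usage sketch of the new WriteFile command inside a pipeline; the pipeline id, the `names` table and the `dwh` db alias are illustrative assumptions, not taken from this PR:

```python
from mara_db import formats
from mara_pipelines.commands.files import WriteFile
from mara_pipelines.pipelines import Pipeline, Task

pipeline = Pipeline(id='write_file_example',
                    description='Exports a query result to a file in the data directory')

pipeline.add(Task(
    id='export_names',
    description='Writes the names table to <data-dir>/names_export.csv',
    commands=[WriteFile(dest_file_name='names_export.csv',
                        sql_statement='SELECT id, name FROM names;',
                        db_alias='dwh',                  # assumed db alias
                        format=formats.CsvFormat())]))   # CSV per RFC 4180 (the default)
```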
6 changes: 6 additions & 0 deletions pyproject.toml
@@ -1,3 +1,9 @@
[build-system]
requires = ["setuptools >= 40.6.0", "wheel >= 0.31"]
build-backend = "setuptools.build_meta"

[tool.pytest.ini_options]
markers = [
"postgres_db",
"mssql_db",
]
1 change: 1 addition & 0 deletions setup.cfg
@@ -31,3 +31,4 @@ test =
pytest-docker
pytest-dependency
mara_app>=1.5.2
mara-db[postgres,mssql]
8 changes: 8 additions & 0 deletions tests/README.md
@@ -0,0 +1,8 @@
Test notes
==========

There are several types of tests:
* tests that run without docker
* tests that run with docker

The tests running in docker are marked with their execution setup: e.g. the mark `postgres_db` is used for a setup where PostgreSQL is the data warehouse database, `mssql_db` for a setup where SQL Server is the data warehouse database, and so on. Docker tests are executed sequentially because otherwise they would override each other's mara configuration.
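As a hedged illustration (a hypothetical test module, not part of this PR), a test opts into one of these setups via the markers registered in `pyproject.toml`:

```python
import pytest


@pytest.mark.postgres_db
def test_something_against_postgres_dwh():
    """Collected only when running `pytest -m postgres_db`;
    deselected by `pytest -m "not postgres_db and not mssql_db"`."""
    ...
```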
24 changes: 24 additions & 0 deletions tests/db_test_helper.py
@@ -0,0 +1,24 @@
import sqlalchemy
from mara_db import dbs


def db_is_responsive(db: dbs.DB) -> bool:
    """Returns True when the DB is reachable on the given host and port, otherwise False"""
    engine = sqlalchemy.create_engine(db.sqlalchemy_url, pool_pre_ping=True)

    try:
        with engine.connect():
            return True
    except Exception:
        return False


def db_replace_placeholders(db: dbs.DB, docker_ip: str, docker_port: int, database: str = None) -> dbs.DB:
    """Replaces the internal placeholders with the docker ip and docker port"""
    if db.host == 'DOCKER_IP':
        db.host = docker_ip
    if db.port == -1:
        db.port = docker_port
    if database:
        db.database = database
    return db
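A minimal sketch of how these helpers could be wired into a pytest-docker session fixture; the `postgres` service name and port 5432 mirror the docker-compose file, while the fixture name and credentials are illustrative assumptions:

```python
import pytest
from mara_db import dbs

from tests.db_test_helper import db_is_responsive, db_replace_placeholders


@pytest.fixture(scope="session")
def postgres_db(docker_ip, docker_services) -> dbs.PostgreSQLDB:
    # resolve the DOCKER_IP / -1 placeholders against the container started by pytest-docker
    port = docker_services.port_for("postgres", 5432)
    db = db_replace_placeholders(
        dbs.PostgreSQLDB(host='DOCKER_IP', port=-1, user='mara', password='mara', database='mara'),
        docker_ip, port)

    # block until the database accepts connections
    docker_services.wait_until_responsive(timeout=30.0, pause=0.1,
                                          check=lambda: db_is_responsive(db))
    return db
```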
9 changes: 9 additions & 0 deletions tests/docker-compose.yml
@@ -10,3 +10,12 @@ services:
      POSTGRES_HOST_AUTH_METHOD: md5
    ports:
      - "5432"

  mssql:
    image: mcr.microsoft.com/mssql/server:2022-latest
    environment:
      - ACCEPT_EULA=Y
      - MSSQL_SA_PASSWORD=YourStrong@Passw0rd
      - MSSQL_PID=Developer
    ports:
      - "1433"
10 changes: 10 additions & 0 deletions tests/local_config.py
@@ -0,0 +1,10 @@
# This file contains secrets used by the tests

from mara_db import dbs

# supported placeholders
# host='DOCKER_IP' will be replaced with the ip address given from pytest-docker
# port=-1 will be replaced with the port given from pytest-docker

POSTGRES_DB = dbs.PostgreSQLDB(host='DOCKER_IP', port=-1, user="mara", password="mara", database="mara")
Review comment (@jankatins, Mar 7, 2023): Why not pick that from an env var and add a fixture to set the env var? Or just create this in the fixture, where you have all the information (and return the DB)?

Reply (Member Author): Yeah... I am not 100% sure about that. This is actually a copy from the test suite of the mara_db project; see local_config.py.example.

The main reason I defined it in a separate config file is that you have the option to simply enable/disable the tests for specific database engines. This is especially handy for tests against cloud services: I don't want to share the credentials for my cloud with everybody, but at the same time I want to share the option with those who want to test their changes against the cloud ;-)

Removing this and integrating it into the fixture makes sense for now, but maybe not in the future...

MSSQL_SQLCMD_DB = dbs.SqlcmdSQLServerDB(host='DOCKER_IP', port=-1, user='sa', password='YourStrong@Passw0rd', database='master', trust_server_certificate=True)
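A hedged sketch of the reviewer's env-var suggestion; the MARA_TEST_* variable names are hypothetical and not part of this PR:

```python
import os

from mara_db import dbs


def postgres_db_from_env() -> dbs.PostgreSQLDB:
    """Builds the test database connection from environment variables,
    falling back to the placeholder values used in local_config.py."""
    return dbs.PostgreSQLDB(
        host=os.environ.get('MARA_TEST_POSTGRES_HOST', 'DOCKER_IP'),
        port=int(os.environ.get('MARA_TEST_POSTGRES_PORT', '-1')),
        user=os.environ.get('MARA_TEST_POSTGRES_USER', 'mara'),
        password=os.environ.get('MARA_TEST_POSTGRES_PASSWORD', 'mara'),
        database=os.environ.get('MARA_TEST_POSTGRES_DATABASE', 'mara'))
```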
Empty file added tests/mssql/__init__.py
10 changes: 10 additions & 0 deletions tests/mssql/names.csv
@@ -0,0 +1,10 @@
1,Elinor Meklit
2,Triana Mahalah
3,Eugraphios Esmae
4,Agustín Alvilda
5,Behruz Hathor
6,Mathilde Tola
7,Kapel Tupaq
8,Shet Badulf
9,Ruslan Vančo
10,Madhavi Traian
5 changes: 5 additions & 0 deletions tests/mssql/names_dll_create.sql
@@ -0,0 +1,5 @@
CREATE TABLE names
(
id INT,
name nvarchar(max)
);
1 change: 1 addition & 0 deletions tests/mssql/names_dll_drop.sql
@@ -0,0 +1 @@
DROP TABLE IF EXISTS names;
137 changes: 137 additions & 0 deletions tests/mssql/test_command_ReadFile.py
@@ -0,0 +1,137 @@
import pathlib
import pytest
from typing import Tuple, Iterator

from mara_app.monkey_patch import patch
from mara_db import dbs, formats
from mara_pipelines.commands.sql import ExecuteSQL
from mara_pipelines.commands.files import ReadFile, Compression

from tests.command_helper import run_command
from tests.db_test_helper import db_is_responsive, db_replace_placeholders
from tests.local_config import POSTGRES_DB, MSSQL_SQLCMD_DB

import mara_pipelines.config
patch(mara_pipelines.config.data_dir)(lambda: pathlib.Path(__file__).parent)

FILE_PATH = pathlib.Path(__file__).parent


if not POSTGRES_DB:
    pytest.skip("skipping MSSQL tests: variable POSTGRES_DB not set", allow_module_level=True)
if not MSSQL_SQLCMD_DB:
    pytest.skip("skipping MSSQL tests: variable MSSQL_SQLCMD_DB not set", allow_module_level=True)


@pytest.fixture(scope="session")
def mssql_db(docker_ip, docker_services) -> dbs.DB:
    """Ensures that the mara (PostgreSQL) and MSSQL servers are running on docker."""

    postgres_docker_port = docker_services.port_for("postgres", 5432)
    _mara_db = db_replace_placeholders(POSTGRES_DB, docker_ip, postgres_docker_port)

    # here we need to wait until the PostgreSQL port is available.
    docker_services.wait_until_responsive(
        timeout=30.0, pause=0.1, check=lambda: db_is_responsive(_mara_db)
    )

    mssql_docker_port = docker_services.port_for("mssql", 1433)
    master_db = db_replace_placeholders(MSSQL_SQLCMD_DB, docker_ip, mssql_docker_port)

    # here we need to wait until the MSSQL port is available.
    docker_services.wait_until_responsive(
        timeout=30.0, pause=0.1, check=lambda: db_is_responsive(master_db)
    )

    # create the dwh database
    conn = None
    cur = None
    try:
        # dbs.cursor_context cannot be used here because CREATE DATABASE
        # cannot run inside a transaction block
        conn = dbs.connect(master_db)
        try:
            cur = conn.cursor()
            conn.autocommit = True
            cur.execute('CREATE DATABASE [dwh]')
        finally:
            if cur:
                cur.close()
    finally:
        if conn:
            conn.close()

    dwh_db = db_replace_placeholders(MSSQL_SQLCMD_DB, docker_ip, mssql_docker_port, database='dwh')

    import mara_db.config
    patch(mara_db.config.databases)(lambda: {
        'mara': _mara_db,
        'dwh': dwh_db
    })
    patch(mara_pipelines.config.default_db_alias)(lambda: 'dwh')

    return dwh_db


@pytest.mark.dependency()
@pytest.fixture
def names_table(mssql_db) -> Iterator[str]:
    """
    Provides a 'names' table for tests.
    """
    ddl_file_path = str((pathlib.Path(__file__).parent / 'names_dll_create.sql').absolute())
    assert run_command(
        ExecuteSQL(sql_file_name=ddl_file_path),

        base_path=FILE_PATH
    )

    yield "names"

    ddl_file_path = str((pathlib.Path(__file__).parent / 'names_dll_drop.sql').absolute())
    assert run_command(
        ExecuteSQL(sql_file_name=ddl_file_path),

        base_path=FILE_PATH
    )


@pytest.mark.mssql_db
def test_read_file(names_table):
    """Tests command ReadFile"""
    assert run_command(
        ReadFile(file_name='names.csv',
                 compression=Compression.NONE,
                 target_table=names_table,
                 file_format=formats.CsvFormat()),

        base_path=FILE_PATH
    )

    with dbs.cursor_context('dwh') as cur:
        try:
            result = cur.execute(f'SELECT COUNT(*) FROM "{names_table}";')
            assert result.fetchone()[0] == 10

        finally:
            cur.execute(f'DELETE FROM "{names_table}";')

@pytest.mark.mssql_db
def test_read_file_old_parameters(names_table):
    """Tests command ReadFile with the deprecated csv_format parameter"""
    assert run_command(
        ReadFile(file_name='names.csv',
                 compression=Compression.NONE,
                 target_table=names_table,
                 csv_format=True),

        base_path=FILE_PATH
    )

    with dbs.cursor_context('dwh') as cur:
        try:
            result = cur.execute(f'SELECT COUNT(*) FROM "{names_table}";')
            assert result.fetchone()[0] == 10

        finally:
            cur.execute(f'DELETE FROM "{names_table}";')