Skip to content

Commit

Permalink
test without schema
Browse files Browse the repository at this point in the history
  • Loading branch information
spicy-sauce committed Nov 14, 2023
1 parent c8dc1ac commit 02dbedb
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 30 deletions.
37 changes: 21 additions & 16 deletions integration-tests/common/db_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,15 +91,16 @@ def get_engine(db_container: DbContainer) -> Engine:
return create_engine(db_container.get_connection_url())


def create_schema(engine: Engine, schema_name: str):
inspector = inspect(engine)
if not schema_name in inspector.get_schema_names():
with engine.connect() as connection:
with connection.begin():
connection.execute(text(f"CREATE SCHEMA {schema_name}"))
def create_schema(engine: Engine, schema_name: Optional[str]):
if schema_name:
inspector = inspect(engine)
if not schema_name in inspector.get_schema_names():
with engine.connect() as connection:
with connection.begin():
connection.execute(text(f"CREATE SCHEMA {schema_name}"))


def create_emp_table(engine: Engine, schema_name: str):
def create_emp_table(engine: Engine, schema_name: Optional[str]):
base = declarative_base()

columns = [
Expand All @@ -114,7 +115,7 @@ def create_emp_table(engine: Engine, schema_name: str):
base.metadata.create_all(engine)


def create_address_table(engine: Engine, schema_name: str):
def create_address_table(engine: Engine, schema_name: Optional[str]):
base = declarative_base()

columns = [
Expand All @@ -128,26 +129,30 @@ def create_address_table(engine: Engine, schema_name: str):
base.metadata.create_all(engine)


def insert_to_emp_table(engine: Engine, schema_name: str):
def insert_to_emp_table(engine: Engine, schema_name: Optional[str]):
schema_prefix = f"{schema_name}." if schema_name else ""
emp_table = f"{schema_prefix}emp"
with engine.connect() as connection:
with connection.begin():
connection.execute(text(
f"INSERT INTO {schema_name}.emp (id, full_name, country, gender) VALUES (1, 'John Doe', '972 - ISRAEL', 'M')"))
f"INSERT INTO {emp_table} (id, full_name, country, gender) VALUES (1, 'John Doe', '972 - ISRAEL', 'M')"))
connection.execute(text(
f"INSERT INTO {schema_name}.emp (id, full_name, country, gender) VALUES (10, 'john doe', '972 - ISRAEL', 'M')"))
f"INSERT INTO {emp_table} (id, full_name, country, gender) VALUES (10, 'john doe', '972 - ISRAEL', 'M')"))
connection.execute(text(
f"INSERT INTO {schema_name}.emp (id, full_name, country, gender, address) VALUES (12, 'steve steve', '972 - ISRAEL', 'M', 'main street')"))
f"INSERT INTO {emp_table} (id, full_name, country, gender, address) VALUES (12, 'steve steve', '972 - ISRAEL', 'M', 'main street')"))


def insert_to_address_table(engine: Engine, schema_name: str):
def insert_to_address_table(engine: Engine, schema_name: Optional[str]):
schema_prefix = f"{schema_name}." if schema_name else ""
address_table = f"{schema_prefix}address"
with engine.connect() as connection:
with connection.begin():
connection.execute(text(
f"INSERT INTO {schema_name}.address (id, emp_id, country_code, address) VALUES (1, 1, 'IL', 'my address 1')"))
f"INSERT INTO {address_table} (id, emp_id, country_code, address) VALUES (1, 1, 'IL', 'my address 1')"))
connection.execute(text(
f"INSERT INTO {schema_name}.address (id, emp_id, country_code, address) VALUES (2, 1, 'US', 'my address 2')"))
f"INSERT INTO {address_table} (id, emp_id, country_code, address) VALUES (2, 1, 'US', 'my address 2')"))
connection.execute(text(
f"INSERT INTO {schema_name}.address (id, emp_id, country_code, address) VALUES (5, 12, 'US', 'my address 0')"))
f"INSERT INTO {address_table} (id, emp_id, country_code, address) VALUES (5, 12, 'US', 'my address 0')"))


def select_one_row(engine: Engine, query: str) -> Optional[Dict[str, Any]]:
Expand Down
2 changes: 1 addition & 1 deletion integration-tests/resources/jobs/tests/redis_to_db2.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ steps:
- uses: relational.write
with:
connection: db2-hr
schema: hr
#schema: hr
table: emp
opcode_field: __$$opcode
keys:
Expand Down
2 changes: 1 addition & 1 deletion integration-tests/resources/jobs/tests/redis_to_mysql.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ steps:
- uses: relational.write
with:
connection: mysql-hr
schema: hr
#schema: hr
table: emp
opcode_field: __$$opcode
keys:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ steps:
- uses: relational.write
with:
connection: oracle-hr
schema: hr
#schema: hr
table: emp
opcode_field: __$$opcode
keys:
Expand Down
2 changes: 1 addition & 1 deletion integration-tests/resources/jobs/tests/redis_to_pg.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ steps:
- uses: relational.write
with:
connection: psql-hr
schema: hr
#schema: hr
table: address
opcode_field: __$$opcode
foreach: "addresses: addresses[]"
Expand Down
25 changes: 15 additions & 10 deletions integration-tests/test_redis_to_relational.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
import logging
from contextlib import suppress
from typing import Optional

import pytest
from common import db_utils, redis_utils
from common.utils import run_job
from sqlalchemy.engine import Engine

logger = logging.getLogger("dy")
SCHEMA_NAME = "hr"
SCHEMA_NAME = None # "hr"


# sqlserver test fails due https://github.com/testcontainers/testcontainers-python/issues/285
Expand Down Expand Up @@ -55,15 +56,19 @@ def test_redis_to_relational_db(db_type: str):
database_container.stop()


def check_results(engine: Engine, schema_name: str):
def check_results(engine: Engine, schema_name: Optional[str]):
schema_prefix = f"{schema_name}." if schema_name else ""
emp_table = f"{schema_prefix}emp"
address_table = f"{schema_prefix}address"

# the first record was supposed to be deleted due to opcode=="d"
total_employees = db_utils.select_one_row(engine, f"select count(*) as total from {schema_name}.emp")
total_employees = db_utils.select_one_row(engine, f"select count(*) as total from {emp_table}")
assert total_employees and total_employees["total"] == 3

first_employee = db_utils.select_one_row(engine, f"select * from {schema_name}.emp where id = 1")
first_employee = db_utils.select_one_row(engine, f"select * from {emp_table} where id = 1")
assert first_employee is None

second_employee = db_utils.select_one_row(engine, f"select * from {schema_name}.emp where id = 2")
second_employee = db_utils.select_one_row(engine, f"select * from {emp_table} where id = 2")
assert second_employee is not None
assert second_employee["full_name"] == "Jane Doe"
assert second_employee["country"] == "972 - ISRAEL"
Expand All @@ -72,29 +77,29 @@ def check_results(engine: Engine, schema_name: str):
assert second_employee["address"] is None

# address is not in the record. verify an upsert operation doesn't remove it
third_employee = db_utils.select_one_row(engine, f"select * from {schema_name}.emp where id = 12")
third_employee = db_utils.select_one_row(engine, f"select * from {emp_table} where id = 12")
assert third_employee is not None
assert third_employee["full_name"] == "John Doe"
assert third_employee["country"] == "972 - ISRAEL"
assert third_employee["gender"] == "M"
assert third_employee["address"] == "main street"

total_addresses = db_utils.select_one_row(engine, f"select count(*) as total from {schema_name}.address")
total_addresses = db_utils.select_one_row(engine, f"select count(*) as total from {address_table}")
assert total_addresses and total_addresses["total"] == 3

updated_address = db_utils.select_one_row(engine, f"select * from {schema_name}.address where id = 5")
updated_address = db_utils.select_one_row(engine, f"select * from {address_table} where id = 5")
assert updated_address is not None
assert updated_address["emp_id"] == 12
assert updated_address["country_code"] == "IL"
assert updated_address["address"] == "my address 5"

new_address_1 = db_utils.select_one_row(engine, f"select * from {schema_name}.address where id = 3")
new_address_1 = db_utils.select_one_row(engine, f"select * from {address_table} where id = 3")
assert new_address_1 is not None
assert new_address_1["emp_id"] == 2
assert new_address_1["country_code"] == "IL"
assert new_address_1["address"] == "my address 3"

new_address_2 = db_utils.select_one_row(engine, f"select * from {schema_name}.address where id = 4")
new_address_2 = db_utils.select_one_row(engine, f"select * from {address_table} where id = 4")
assert new_address_2 is not None
assert new_address_2["emp_id"] == 2
assert new_address_2["country_code"] == "US"
Expand Down

0 comments on commit 02dbedb

Please sign in to comment.