Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

relational.write - Improve ConnectionError #370

55 changes: 23 additions & 32 deletions core/src/datayoga_core/blocks/relational/write/block.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from datayoga_core.context import Context
from datayoga_core.opcode import OpCode
from datayoga_core.result import BlockResult, Result, Status
from sqlalchemy import text
from sqlalchemy import select, text
from sqlalchemy.exc import (DatabaseError, OperationalError,
PendingRollbackError)
from sqlalchemy.sql.expression import ColumnCollection
Expand All @@ -27,6 +27,7 @@ def init(self, context: Optional[Context] = None):

self.context = context
self.engine = None
self.connection = None
self.setup_engine()

def setup_engine(self):
Expand Down Expand Up @@ -74,14 +75,8 @@ def setup_engine(self):
self.delete_stmt = self.tbl.delete().where(sa.and_(*conditions))
self.upsert_stmt = self.generate_upsert_stmt()

except OperationalError as e:
self.dispose_engine()
raise ConnectionError(e)
except DatabaseError as e:
# Handling specific OracleDB errors: Network failure and Database restart
if self.db_type == relational_utils.DbType.ORACLE:
self.handle_oracle_database_error(e)
raise
except (OperationalError, PendingRollbackError, DatabaseError) as e:
spicy-sauce marked this conversation as resolved.
Show resolved Hide resolved
self._handle_connection_error(e)

def dispose_engine(self):
with suppress(Exception):
Expand Down Expand Up @@ -197,29 +192,8 @@ def execute(self, statement: Any, records: List[Dict[str, Any]]):
self.connection.execute(statement, records)
if not self.connection._is_autocommit_isolation():
self.connection.commit()

except (OperationalError, PendingRollbackError) as e:
if self.db_type == relational_utils.DbType.SQLSERVER:
self.handle_mssql_operational_error(e)

self.dispose_engine()
raise ConnectionError(e)
except DatabaseError as e:
if self.db_type == relational_utils.DbType.ORACLE:
self.handle_oracle_database_error(e)

raise

def handle_mssql_operational_error(self, e):
"""Handling specific MSSQL cases: Conversion failed (245) and Truncated data (2628)"""
if e.orig.args[0] in (245, 2628):
raise

def handle_oracle_database_error(self, e):
"""Handling specific OracleDB cases: Network failure (DPY-4011) and Database restart (ORA-01089)"""
if "DPY-4011" in f"{e}" or "ORA-01089" in f"{e}":
self.dispose_engine()
raise ConnectionError(e)
except (OperationalError, PendingRollbackError, DatabaseError) as e:
self._handle_connection_error(e)

def execute_upsert(self, records: List[Dict[str, Any]]):
if records:
Expand All @@ -243,3 +217,20 @@ def execute_delete(self, records: List[Dict[str, Any]]):

def stop(self):
self.dispose_engine()

def _is_connection_valid(self) -> bool:
"""Checks if the current database connection is still valid."""
try:
# Execute a simple query to check if the connection is still valid
self.connection.scalar(select(1))
spicy-sauce marked this conversation as resolved.
Show resolved Hide resolved
return True
except (OperationalError, PendingRollbackError, DatabaseError):
return False

def _handle_connection_error(self, error: Exception):
"""Handles connection errors by disposing the engine if necessary and raising ConnectionError."""
if self.connection is not None and not self._is_connection_valid():
self.dispose_engine()
raise ConnectionError(error)
else:
raise
15 changes: 15 additions & 0 deletions integration-tests/common/redis_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,21 @@ def add_to_emp_stream(redis_client: Redis):
],
"__$$opcode": "u"
},
# gender length is too long, should fail (except from Cassandra)
{
"_id": 11,
"fname": "jane",
"lname": "doe",
"country_code": 972,
"country_name": "israel",
"credit_card": "1000-2000-3000-4000",
"gender": "FF",
"addresses": [
{"id": 33, "country_code": "IL", "address": "my address 33"},
{"id": 44, "country_code": "US", "address": "my address 44"}
],
"__$$opcode": "u"
},
{
"_id": 12,
"fname": "john",
Expand Down
2 changes: 1 addition & 1 deletion integration-tests/test_redis_to_cassandra.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def prepare_db():
def test_total_records(prepare_db):
session = cassandra_utils.get_cassandra_session(["localhost"])
total_employees = session.execute(f"select count(*) as total from {TABLE}").one()
assert total_employees.total == 3
assert total_employees.total == 4


def test_filtered_record(prepare_db):
Expand Down
7 changes: 6 additions & 1 deletion integration-tests/test_redis_to_relational.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,17 @@
])
def test_redis_to_relational_db(db_type: str, schema_name: Optional[str]):
"""Reads data from a Redis stream and writes it to a relational database."""
job_name = f"tests.redis_to_{db_type}"

try:
redis_container = redis_utils.get_redis_oss_container(redis_utils.REDIS_PORT)
redis_container.start()

redis_utils.add_to_emp_stream(redis_utils.get_redis_client("localhost", redis_utils.REDIS_PORT))

with pytest.raises(ValueError):
run_job(job_name)

if db_type == "db2":
db_container = db_utils.get_db2_container("hr", "my_user", "my_pass")
elif db_type == "mysql":
Expand All @@ -52,7 +57,7 @@ def test_redis_to_relational_db(db_type: str, schema_name: Optional[str]):
db_utils.insert_to_emp_table(engine, schema_name)
db_utils.insert_to_address_table(engine, schema_name)

run_job(f"tests.redis_to_{db_type}")
run_job(job_name)
check_results(engine, schema_name)
finally:
with suppress(Exception):
Expand Down
Loading
Loading