Skip to content

Commit

Permalink
Merge pull request #796 from phenobarbital/scylla-connector
Browse files Browse the repository at this point in the history
fixing some timeouts
  • Loading branch information
phenobarbital authored Sep 27, 2023
2 parents 78e01c7 + 3d1f462 commit 8287148
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 23 deletions.
51 changes: 29 additions & 22 deletions asyncdb/drivers/pg.py
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,10 @@ async def wait_close(self, gracefully=True, timeout=5):
)
# # until end, close the pool correctly:
self._pool.terminate()
except asyncio.TimeoutError as e:
self._logger.warning(
f"Close timed out: {e}"
)
except Exception as err:
error = f"Pool Exception: {err.__class__.__name__}: {err}"
print(f"Pool Error: {error}")
Expand Down Expand Up @@ -1016,21 +1020,21 @@ async def forward(self, number):
return await self._cursor.forward(number)
except Exception as err:
error = f"Error forward Cursor: {err}"
raise Exception(error) from err
raise DriverError(error) from err

async def fetch(self, number=1):
try:
return await self._cursor.fetch(number)
except Exception as err:
error = f"Error Fetch Cursor: {err}"
raise Exception(error) from err
raise DriverError(error) from err

async def fetchrow(self):
try:
return await self._cursor.fetchrow()
except Exception as err:
error = f"Error Fetchrow Cursor: {err}"
raise Exception(error) from err
raise DriverError(error) from err

## Cursor Iterator Context
def __aiter__(self):
Expand Down Expand Up @@ -1077,7 +1081,7 @@ async def copy_from_table(
f"Error on Copy, Invalid Statement Error: {ex}"
) from ex
except Exception as ex:
raise ProviderError(
raise DriverError(
f"Error on Table Copy: {ex}"
) from ex

Expand Down Expand Up @@ -1116,7 +1120,7 @@ async def copy_to_table(
f"Error on Copy, Invalid Statement Error: {ex}"
) from ex
except Exception as ex:
raise Exception(
raise DriverError(
f"Error on Copy to Table {ex}"
) from ex

Expand All @@ -1135,7 +1139,10 @@ async def copy_into_table(
await self._transaction.commit()
try:
result = await self._connection.copy_records_to_table(
table_name=table, schema_name=schema, columns=columns, records=source
table_name=table,
schema_name=schema,
columns=columns,
records=source
)
return result
except UndefinedTableError as ex:
Expand All @@ -1155,15 +1162,15 @@ async def copy_into_table(
f"Error on Copy, Constraint Violated: {ex}"
) from ex
except InterfaceError as ex:
raise ProviderError(
raise DriverError(
f"Error on Copy into Table Function: {ex}"
) from ex
except (RuntimeError, PostgresError) as ex:
raise ProviderError(
raise DriverError(
f"Postgres Error on Copy into Table: {ex}"
) from ex
except Exception as ex:
raise Exception(
raise DriverError(
f"Error on Copy into Table: {ex}"
) from ex

Expand Down Expand Up @@ -1219,7 +1226,7 @@ async def create(
else:
return False
except Exception as err:
raise ProviderError(
raise DriverError(
f"Error in Object Creation: {err!s}"
) from err
else:
Expand Down Expand Up @@ -1321,7 +1328,7 @@ async def _insert_(self, _model: Model, **kwargs): # pylint: disable=W0613
message=f"Constraint Error: {err!r}",
) from err
except Exception as err:
raise ProviderError(
raise DriverError(
message=f"Error on Insert over table {_model.Meta.name}: {err!s}"
) from err

Expand Down Expand Up @@ -1362,7 +1369,7 @@ async def _delete_(self, _model: Model, **kwargs): # pylint: disable=W0613
result = await self._connection.execute(_delete)
return f'DELETE {result}: {_filter!s}'
except Exception as err:
raise ProviderError(
raise DriverError(
message=f"Error on Insert over table {_model.Meta.name}: {err!s}"
) from err

Expand Down Expand Up @@ -1442,7 +1449,7 @@ async def _update_(self, _model: Model, **kwargs): # pylint: disable=W0613
setattr(_model, f, val)
return _model
except Exception as err:
raise ProviderError(
raise DriverError(
message=f"Error on Insert over table {_model.Meta.name}: {err!s}"
) from err

Expand Down Expand Up @@ -1481,7 +1488,7 @@ async def _fetch_(self, _model: Model, *args, **kwargs):
result = await self._connection.fetchrow(_get)
return result
except Exception as e:
raise ProviderError(
raise DriverError(
f"Error: Model Fetch over {table}: {e}"
) from e

Expand Down Expand Up @@ -1519,7 +1526,7 @@ async def _filter_(self, _model: Model, *args, **kwargs):
result = await self._connection.fetch(_get)
return result
except Exception as e:
raise ProviderError(
raise DriverError(
f"Error: Model GET over {table}: {e}"
) from e

Expand All @@ -1530,7 +1537,7 @@ async def _select_(self, *args, **kwargs):
try:
model = kwargs['_model']
except KeyError as e:
raise ProviderError(
raise DriverError(
f'Missing Model for SELECT {kwargs!s}'
) from e
try:
Expand All @@ -1554,7 +1561,7 @@ async def _select_(self, *args, **kwargs):
result = await self._connection.fetch(_get)
return result
except Exception as e:
raise ProviderError(
raise DriverError(
f"Error: Model SELECT over {table}: {e}"
) from e

Expand Down Expand Up @@ -1592,7 +1599,7 @@ async def _get_(self, _model: Model, *args, **kwargs):
result = await self._connection.fetchrow(_get)
return result
except Exception as e:
raise ProviderError(
raise DriverError(
f"Error: Model GET over {table}: {e}"
) from e

Expand All @@ -1617,7 +1624,7 @@ async def _all_(self, _model: Model, *args, **kwargs): # pylint: disable=W0613
result = await self._connection.fetch(_all)
return result
except Exception as e:
raise ProviderError(
raise DriverError(
f"Error: Model All over {table}: {e}"
) from e

Expand Down Expand Up @@ -1647,7 +1654,7 @@ async def _remove_(self, _model: Model, **kwargs):
result = await self._connection.execute(_delete)
return f'DELETE {result}: {_filter!s}'
except Exception as err:
raise ProviderError(
raise DriverError(
message=f"Error on Insert over table {_model.Meta.name}: {err!s}"
) from err

Expand All @@ -1658,7 +1665,7 @@ async def _updating_(self, *args, _filter: dict = None, **kwargs):
try:
model = kwargs['_model']
except KeyError as e:
raise ProviderError(
raise DriverError(
f'Missing Model for SELECT {kwargs!s}'
) from e
try:
Expand Down Expand Up @@ -1706,6 +1713,6 @@ async def _updating_(self, *args, _filter: dict = None, **kwargs):
if result := await self._connection.fetch(_all):
return [model(**dict(r)) for r in result]
except Exception as err:
raise ProviderError(
raise DriverError(
message=f"Error on Insert over table {model.Meta.name}: {err!s}"
) from err
4 changes: 4 additions & 0 deletions asyncdb/drivers/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ async def close(self, timeout: int = 5) -> None:
await asyncio.wait_for(
self._connection.close(), timeout=timeout
)
except asyncio.TimeoutError as e:
self._logger.warning(
f"Close timed out: {e}"
)
except Exception as err:
raise DriverError(
message=f"{__name__!s}: Closing Error: {err!s}"
Expand Down
4 changes: 4 additions & 0 deletions asyncdb/interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,10 @@ async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
# clean up anything you need to clean up
try:
await asyncio.wait_for(self.close(), timeout=20)
except asyncio.TimeoutError as e:
self._logger.warning(
f"Close timed out: {e}"
)
except RuntimeError as e:
self._logger.error(
str(e)
Expand Down
2 changes: 1 addition & 1 deletion asyncdb/version.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
__title__ = 'asyncdb'
__description__ = ('Library for Asynchronous data source connections '
'Collection of asyncio drivers.')
__version__ = '2.5.0'
__version__ = '2.5.1'
__author__ = 'Jesus Lara'
__author_email__ = '[email protected]'
__license__ = 'BSD'

0 comments on commit 8287148

Please sign in to comment.