Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use urllib3 for retries #182

Merged
merged 11 commits
Aug 9, 2023
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

## 2.8.x (Unreleased)

- Replace retry handling with DatabricksRetryPolicy. This is disabled by default. To enable, set `enable_v3_retries=True` when creating `databricks.sql.client`
- Other: Fix typo in README quick start example
- Other: Add autospec to Client mocks and tidy up `make_request`

Expand Down
410 changes: 410 additions & 0 deletions src/databricks/sql/auth/retry.py

Large diffs are not rendered by default.

24 changes: 24 additions & 0 deletions src/databricks/sql/auth/thrift_http_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

from urllib3 import HTTPConnectionPool, HTTPSConnectionPool, ProxyManager

from databricks.sql.auth.retry import CommandType, DatabricksRetryPolicy


class THttpClient(thrift.transport.THttpClient.THttpClient):
def __init__(
Expand All @@ -28,6 +30,7 @@ def __init__(
key_file=None,
ssl_context=None,
max_connections: int = 1,
retry_policy: Union[DatabricksRetryPolicy, int] = 0,
susodapop marked this conversation as resolved.
Show resolved Hide resolved
):
if port is not None:
warnings.warn(
Expand Down Expand Up @@ -81,6 +84,10 @@ def __init__(

self.max_connections = max_connections

# If retry_policy == 0 then urllib3 will not retry automatically
# this falls back to the pre-v3 behaviour where thrift_backend.py handles retry logic
self.retry_policy = retry_policy

self.__wbuf = BytesIO()
self.__resp: Union[None, HTTPResponse] = None
self.__timeout = None
Expand All @@ -92,6 +99,13 @@ def setCustomHeaders(self, headers: Dict[str, str]):
self._headers = headers
super().setCustomHeaders(headers)

def startRetryTimer(self):
    """Notify the DatabricksRetryPolicy that a request is starting.

    The policy uses the recorded start time to enforce
    retry_stop_after_attempts_duration.

    This is a no-op when ``self.retry_policy`` is falsy (e.g. the integer
    ``0``, which disables urllib3-level retries and falls back to the
    pre-v3 retry handling).
    """
    # Explicit `if` instead of the original `x and x.f()` short-circuit
    # expression-statement: same behavior, clearer intent.
    if self.retry_policy:
        self.retry_policy.start_retry_timer()

def open(self):

# self.__pool replaces the self.__http used by the original THttpClient
Expand Down Expand Up @@ -167,6 +181,7 @@ def flush(self):
headers=headers,
preload_content=False,
timeout=self.__timeout,
retries=self.retry_policy,
)

# Get reply to flush the request
Expand All @@ -188,3 +203,12 @@ def basic_proxy_auth_header(proxy):
)
cr = base64.b64encode(ap.encode()).strip()
return "Basic " + six.ensure_str(cr)

def set_retry_command_type(self, value: CommandType):
    """Forward the given CommandType to the active retry policy.

    Logs a warning instead when retries are handled outside urllib3
    (i.e. the retry policy is not a DatabricksRetryPolicy instance).
    """
    policy = self.retry_policy
    if not isinstance(policy, DatabricksRetryPolicy):
        logger.warning(
            "DatabricksRetryPolicy is currently bypassed. The CommandType cannot be set."
        )
        return
    policy.command_type = value
12 changes: 11 additions & 1 deletion src/databricks/sql/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,11 @@

from databricks.sql import __version__
from databricks.sql import *
from databricks.sql.exc import OperationalError
from databricks.sql.exc import (
OperationalError,
SessionAlreadyClosedError,
CursorAlreadyClosedError,
)
from databricks.sql.thrift_backend import ThriftBackend
from databricks.sql.utils import ExecuteResponse, ParamEscaper, inject_parameters
from databricks.sql.types import Row
Expand Down Expand Up @@ -257,6 +261,9 @@ def _close(self, close_cursors=True) -> None:

try:
self.thrift_backend.close_session(self._session_handle)
except RequestError as e:
if isinstance(e.args[1], SessionAlreadyClosedError):
logger.info("Session was closed by a prior request")
except DatabaseError as e:
if "Invalid SessionHandle" in str(e):
logger.warning(
Expand Down Expand Up @@ -958,6 +965,9 @@ def close(self) -> None:
and self.connection.open
):
self.thrift_backend.close_command(self.command_id)
except RequestError as e:
if isinstance(e.args[1], CursorAlreadyClosedError):
logger.info("Operation was canceled by a prior request")
finally:
self.has_been_closed_server_side = True
self.op_state = self.thrift_backend.CLOSED_OP_STATE
Expand Down
22 changes: 22 additions & 0 deletions src/databricks/sql/exc.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,3 +93,25 @@ class RequestError(OperationalError):
"""

pass


class MaxRetryDurationError(RequestError):
    """Raised when the next HTTP request retry would exceed the configured
    stop_after_attempts_duration limit.
    """


class NonRecoverableNetworkError(RequestError):
    """Raised when an HTTP 501 (Not Implemented) status code is received."""


class UnsafeToRetryError(RequestError):
    """Raised when an ExecuteStatement request receives a status code other
    than 200, 429, or 503, so retrying is not known to be safe.
    """


class SessionAlreadyClosedError(RequestError):
    """Raised when CloseSession receives a 404 status code. ThriftBackend
    should gracefully proceed, as this outcome is expected.
    """


class CursorAlreadyClosedError(RequestError):
    """Raised when CancelOperation receives a 404 status code. ThriftBackend
    should gracefully proceed, as this outcome is expected.
    """
57 changes: 51 additions & 6 deletions src/databricks/sql/thrift_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@
import urllib3.exceptions

import databricks.sql.auth.thrift_http_client
from databricks.sql.auth.thrift_http_client import CommandType
from databricks.sql.auth.authenticators import AuthProvider
from databricks.sql.thrift_api.TCLIService import TCLIService, ttypes
from databricks.sql import *
from databricks.sql.exc import MaxRetryDurationError
from databricks.sql.thrift_api.TCLIService.TCLIService import (
Client as TCLIServiceClient,
)
Expand Down Expand Up @@ -70,6 +72,12 @@ class ThriftBackend:
CLOSED_OP_STATE = ttypes.TOperationState.CLOSED_STATE
ERROR_OP_STATE = ttypes.TOperationState.ERROR_STATE

_retry_delay_min: float
_retry_delay_max: float
_retry_stop_after_attempts_count: int
_retry_stop_after_attempts_duration: float
_retry_delay_default: float

def __init__(
self,
server_hostname: str,
Expand Down Expand Up @@ -113,9 +121,15 @@ def __init__(
#
# _retry_stop_after_attempts_count
# The maximum number of times we should retry retryable requests (defaults to 24)
# _retry_dangerous_codes
# An iterable of integer HTTP status codes. ExecuteStatement commands will be retried if these codes are received.
# (defaults to [])
# _socket_timeout
# The timeout in seconds for socket send, recv and connect operations. Should be a positive float or integer.
# (defaults to 900)
# _enable_v3_retries
# Whether to use the DatabricksRetryPolicy implemented in urllib3
# (defaults to False)
# max_download_threads
# Number of threads for handling cloud fetch downloads. Defaults to 10

Expand Down Expand Up @@ -166,10 +180,28 @@ def __init__(

self._auth_provider = auth_provider

# Connector version 3 retry approach
self.enable_v3_retries = kwargs.get("_enable_v3_retries", False)
self.force_dangerous_codes = kwargs.get("_retry_dangerous_codes", [])

additional_transport_args = {}
if self.enable_v3_retries:
self.retry_policy = databricks.sql.auth.thrift_http_client.DatabricksRetryPolicy(
delay_min=self._retry_delay_min,
delay_max=self._retry_delay_max,
stop_after_attempts_count=self._retry_stop_after_attempts_count,
stop_after_attempts_duration=self._retry_stop_after_attempts_duration,
delay_default=self._retry_delay_default,
force_dangerous_codes=self.force_dangerous_codes,
)

additional_transport_args["retry_policy"] = self.retry_policy

self._transport = databricks.sql.auth.thrift_http_client.THttpClient(
auth_provider=self._auth_provider,
uri_or_host=uri,
ssl_context=ssl_context,
**additional_transport_args, # type: ignore
)

timeout = kwargs.get("_socket_timeout", DEFAULT_SOCKET_TIMEOUT)
Expand All @@ -188,6 +220,7 @@ def __init__(

self._request_lock = threading.RLock()

# TODO: Move this bounding logic into DatabricksRetryPolicy for v3 (PECO-918)
def _initialize_retry_args(self, kwargs):
# Configure retries & timing: use user-settings or defaults, and bound
# by policy. Log.warn when given param gets restricted.
Expand Down Expand Up @@ -335,12 +368,17 @@ def attempt_request(attempt):

error, error_message, retry_delay = None, None, None
try:
logger.debug(
"Sending request: {}(<REDACTED>)".format(
getattr(method, "__name__")
)
)

this_method_name = getattr(method, "__name__")

logger.debug("Sending request: {}(<REDACTED>)".format(this_method_name))
unsafe_logger.debug("Sending request: {}".format(request))

# These three lines are no-ops if the v3 retry policy is not in use
this_command_type = CommandType.get(this_method_name)
self._transport.set_retry_command_type(this_command_type)
self._transport.startRetryTimer()

response = method(request)

# Calling `close()` here releases the active HTTP connection back to the pool
Expand All @@ -356,9 +394,16 @@ def attempt_request(attempt):
except urllib3.exceptions.HTTPError as err:
# retry on timeout. Happens a lot in Azure and it is safe as data has not been sent to server yet

# TODO: don't use exception handling for GOS polling...

gos_name = TCLIServiceClient.GetOperationStatus.__name__
if method.__name__ == gos_name:
retry_delay = bound_retry_delay(attempt, self._retry_delay_default)
delay_default = (
self.enable_v3_retries
and self.retry_policy.delay_default
or self._retry_delay_default
)
retry_delay = bound_retry_delay(attempt, delay_default)
logger.info(
f"GetOperationStatus failed with HTTP error and will be retried: {str(err)}"
)
Expand Down
2 changes: 1 addition & 1 deletion src/databricks/sqlalchemy/dialect/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ class DatabricksDialect(default.DefaultDialect):
driver: str = "databricks-sql-python"
default_schema_name: str = "default"

preparer = DatabricksIdentifierPreparer
preparer = DatabricksIdentifierPreparer # type: ignore
susodapop marked this conversation as resolved.
Show resolved Hide resolved
type_compiler = DatabricksTypeCompiler
ddl_compiler = DatabricksDDLCompiler
supports_statement_cache: bool = True
Expand Down
Loading
Loading