From 2d50707a4391c69ba1c8b0c2630a9dde25fc2cd0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9C=D0=B0=D1=80=D1=82=D1=8B=D0=BD=D0=BE=D0=B2=20=D0=9C?= =?UTF-8?q?=D0=B0=D0=BA=D1=81=D0=B8=D0=BC=20=D0=A1=D0=B5=D1=80=D0=B3=D0=B5?= =?UTF-8?q?=D0=B5=D0=B2=D0=B8=D1=87?= Date: Sat, 27 Jul 2024 14:16:14 +0000 Subject: [PATCH] Test Python 3.13 --- .github/workflows/data/core/matrix.yml | 2 +- .github/workflows/data/ftp/matrix.yml | 2 +- .github/workflows/data/ftps/matrix.yml | 2 +- .github/workflows/data/hdfs/matrix.yml | 2 +- .github/workflows/data/s3/matrix.yml | 2 +- .github/workflows/data/sftp/matrix.yml | 2 +- .github/workflows/data/webdav/matrix.yml | 2 +- .../db_connection/greenplum/connection.py | 19 ++++++--- .../jdbc_connection/connection.py | 39 +++++++++++++++++-- .../db_connection/jdbc_connection/options.py | 14 ++++++- .../db_connection/jdbc_mixin/connection.py | 3 +- onetl/strategy/incremental_strategy.py | 34 +++++++++------- 12 files changed, 90 insertions(+), 33 deletions(-) diff --git a/.github/workflows/data/core/matrix.yml b/.github/workflows/data/core/matrix.yml index d20f074ab..3f25b079b 100644 --- a/.github/workflows/data/core/matrix.yml +++ b/.github/workflows/data/core/matrix.yml @@ -8,7 +8,7 @@ min: &min max: &max spark-version: 3.5.1 pydantic-version: 2 - python-version: '3.12' + python-version: '3.13.0-beta.4' java-version: 20 os: ubuntu-latest diff --git a/.github/workflows/data/ftp/matrix.yml b/.github/workflows/data/ftp/matrix.yml index d01c39029..878f9b779 100644 --- a/.github/workflows/data/ftp/matrix.yml +++ b/.github/workflows/data/ftp/matrix.yml @@ -5,7 +5,7 @@ min: &min max: &max pydantic-version: 2 - python-version: '3.12' + python-version: '3.13.0-beta.4' os: ubuntu-latest latest: &latest diff --git a/.github/workflows/data/ftps/matrix.yml b/.github/workflows/data/ftps/matrix.yml index efe28e79a..40ec8fc9a 100644 --- a/.github/workflows/data/ftps/matrix.yml +++ b/.github/workflows/data/ftps/matrix.yml @@ -5,7 +5,7 @@ min: &min max: &max pydantic-version: 2 - python-version: '3.12' + python-version: '3.13.0-beta.4' os: ubuntu-latest latest: &latest diff --git a/.github/workflows/data/hdfs/matrix.yml b/.github/workflows/data/hdfs/matrix.yml index 6d8156c50..45cbc1d96 100644 --- a/.github/workflows/data/hdfs/matrix.yml +++ b/.github/workflows/data/hdfs/matrix.yml @@ -10,7 +10,7 @@ max: &max hadoop-version: hadoop3-hdfs spark-version: 3.5.1 pydantic-version: 2 - python-version: '3.12' + python-version: '3.13.0-beta.4' java-version: 20 os: ubuntu-latest diff --git a/.github/workflows/data/s3/matrix.yml b/.github/workflows/data/s3/matrix.yml index d9b9338f8..df7f08b38 100644 --- a/.github/workflows/data/s3/matrix.yml +++ b/.github/workflows/data/s3/matrix.yml @@ -12,7 +12,7 @@ max: &max minio-version: 2024.4.18 spark-version: 3.5.1 pydantic-version: 2 - python-version: '3.12' + python-version: '3.13.0-beta.4' java-version: 20 os: ubuntu-latest diff --git a/.github/workflows/data/sftp/matrix.yml b/.github/workflows/data/sftp/matrix.yml index a32f6f823..a57f6dfe3 100644 --- a/.github/workflows/data/sftp/matrix.yml +++ b/.github/workflows/data/sftp/matrix.yml @@ -5,7 +5,7 @@ min: &min max: &max pydantic-version: 2 - python-version: '3.12' + python-version: '3.13.0-beta.4' os: ubuntu-latest latest: &latest diff --git a/.github/workflows/data/webdav/matrix.yml b/.github/workflows/data/webdav/matrix.yml index fb76e3282..57423e616 100644 --- a/.github/workflows/data/webdav/matrix.yml +++ b/.github/workflows/data/webdav/matrix.yml @@ -5,7 +5,7 @@ min: &min max: &max pydantic-version: 2 - python-version: '3.12' + python-version: '3.13.0-beta.4' os: ubuntu-latest latest: &latest diff --git a/onetl/connection/db_connection/greenplum/connection.py b/onetl/connection/db_connection/greenplum/connection.py index 7ed60539b..125710dff 100644 --- a/onetl/connection/db_connection/greenplum/connection.py +++ b/onetl/connection/db_connection/greenplum/connection.py @@ -5,17 +5,18 @@ import logging import os import textwrap +import threading import warnings -from typing import TYPE_CHECKING, Any, ClassVar +from typing import TYPE_CHECKING, Any, ClassVar, Optional from etl_entities.instance import Host from onetl.connection.db_connection.jdbc_connection.options import JDBCReadOptions try: - from pydantic.v1 import validator + from pydantic.v1 import PrivateAttr, SecretStr, validator except (ImportError, AttributeError): - from pydantic import validator # type: ignore[no-redef, assignment] + from pydantic import validator, SecretStr, PrivateAttr # type: ignore[no-redef, assignment] from onetl._util.classproperty import classproperty from onetl._util.java import try_import_java_class @@ -39,7 +40,9 @@ from onetl.connection.db_connection.jdbc_mixin.options import ( JDBCExecuteOptions, JDBCFetchOptions, - JDBCOptions, +) +from onetl.connection.db_connection.jdbc_mixin.options import ( + JDBCOptions as JDBCMixinOptions, ) from onetl.exception import MISSING_JVM_CLASS_MSG, TooManyParallelJobsError from onetl.hooks import slot, support_hooks @@ -69,7 +72,7 @@ class GreenplumExtra(GenericOptions): class Config: extra = "allow" - prohibited_options = JDBCOptions.Config.prohibited_options + prohibited_options = JDBCMixinOptions.Config.prohibited_options @support_hooks @@ -157,6 +160,8 @@ class Greenplum(JDBCMixin, DBConnection): """ host: Host + user: str + password: SecretStr database: str port: int = 5432 extra: GreenplumExtra = GreenplumExtra() @@ -166,6 +171,7 @@ class Greenplum(JDBCMixin, DBConnection): SQLOptions = GreenplumSQLOptions FetchOptions = GreenplumFetchOptions ExecuteOptions = GreenplumExecuteOptions + JDBCOptions = JDBCMixinOptions Extra = GreenplumExtra Dialect = GreenplumDialect @@ -174,6 +180,9 @@ class Greenplum(JDBCMixin, DBConnection): CONNECTIONS_WARNING_LIMIT: ClassVar[int] = 31 CONNECTIONS_EXCEPTION_LIMIT: ClassVar[int] = 100 + _CHECK_QUERY: ClassVar[str] = "SELECT 1" + _last_connection_and_options: Optional[threading.local] = PrivateAttr(default=None) + @slot @classmethod def get_packages( diff --git a/onetl/connection/db_connection/jdbc_connection/connection.py b/onetl/connection/db_connection/jdbc_connection/connection.py index 5b0aebeb8..f5cf8029a 100644 --- a/onetl/connection/db_connection/jdbc_connection/connection.py +++ b/onetl/connection/db_connection/jdbc_connection/connection.py @@ -4,8 +4,14 @@ import logging import secrets +import threading import warnings -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING, Any, ClassVar, Optional + +try: + from pydantic.v1 import PrivateAttr, SecretStr +except (ImportError, AttributeError): + from pydantic import PrivateAttr, SecretStr # type: ignore[no-redef, assignment] from onetl._util.sql import clear_statement from onetl.connection.db_connection.db_connection import DBConnection @@ -19,7 +25,13 @@ JDBCWriteOptions, ) from onetl.connection.db_connection.jdbc_mixin import JDBCMixin -from onetl.connection.db_connection.jdbc_mixin.options import JDBCFetchOptions +from onetl.connection.db_connection.jdbc_mixin.options import ( + JDBCExecuteOptions, + JDBCFetchOptions, +) +from onetl.connection.db_connection.jdbc_mixin.options import ( + JDBCOptions as JDBCMixinOptions, +) from onetl.hooks import slot, support_hooks from onetl.hwm import Window from onetl.log import log_lines, log_with_indent @@ -45,6 +57,16 @@ @support_hooks class JDBCConnection(JDBCMixin, DBConnection): + user: str + password: SecretStr + + DRIVER: ClassVar[str] + _CHECK_QUERY: ClassVar[str] = "SELECT 1" + _last_connection_and_options: Optional[threading.local] = PrivateAttr(default=None) + + JDBCOptions = JDBCMixinOptions + FetchOptions = JDBCFetchOptions + ExecuteOptions = JDBCExecuteOptions Dialect = JDBCDialect ReadOptions = JDBCReadOptions SQLOptions = JDBCSQLOptions @@ -109,11 +131,16 @@ def read_source_as_df( limit: int | None = None, options: JDBCReadOptions | None = None, ) -> DataFrame: + if isinstance(options, JDBCLegacyOptions): + raw_options = self.ReadOptions.parse(options.dict(exclude_unset=True)) + else: + raw_options = self.ReadOptions.parse(options) + read_options = self._set_lower_upper_bound( table=source, where=where, hint=hint, - options=self.ReadOptions.parse(options), + options=raw_options, ) new_columns = columns or ["*"] @@ -170,7 +197,11 @@ def write_df_to_target( target: str, options: JDBCWriteOptions | None = None, ) -> None: - write_options = self.WriteOptions.parse(options) + if isinstance(options, JDBCLegacyOptions): + write_options = self.WriteOptions.parse(options.dict(exclude_unset=True)) + else: + write_options = self.WriteOptions.parse(options) + jdbc_properties = self._get_jdbc_properties(write_options, exclude={"if_exists"}, exclude_none=True) mode = ( diff --git a/onetl/connection/db_connection/jdbc_connection/options.py b/onetl/connection/db_connection/jdbc_connection/options.py index a2aa39adb..7e189e86f 100644 --- a/onetl/connection/db_connection/jdbc_connection/options.py +++ b/onetl/connection/db_connection/jdbc_connection/options.py @@ -672,7 +672,19 @@ def _check_partition_fields(cls, values): "Deprecated in 0.5.0 and will be removed in 1.0.0. Use 'ReadOptions' or 'WriteOptions' instead", category=UserWarning, ) -class JDBCLegacyOptions(JDBCReadOptions, JDBCWriteOptions): +class JDBCLegacyOptions(GenericOptions): class Config: prohibited_options = GENERIC_PROHIBITED_OPTIONS + known_options = READ_OPTIONS | WRITE_OPTIONS | READ_WRITE_OPTIONS extra = "allow" + + partition_column: Optional[str] = Field(default=None, alias="partitionColumn") + num_partitions: PositiveInt = Field(default=1, alias="numPartitions") + lower_bound: Optional[int] = Field(default=None, alias="lowerBound") + upper_bound: Optional[int] = Field(default=None, alias="upperBound") + session_init_statement: Optional[str] = Field(default=None, alias="sessionInitStatement") + query_timeout: Optional[int] = Field(default=None, alias="queryTimeout") + if_exists: JDBCTableExistBehavior = Field(default=JDBCTableExistBehavior.APPEND, alias="mode") + isolation_level: str = Field(default="READ_UNCOMMITTED", alias="isolationLevel") + fetchsize: int = 100_000 + batchsize: int = 20_000 diff --git a/onetl/connection/db_connection/jdbc_mixin/connection.py b/onetl/connection/db_connection/jdbc_mixin/connection.py index e8c19e38b..0ce2ce52b 100644 --- a/onetl/connection/db_connection/jdbc_mixin/connection.py +++ b/onetl/connection/db_connection/jdbc_mixin/connection.py @@ -29,7 +29,6 @@ ) from onetl.exception import MISSING_JVM_CLASS_MSG from onetl.hooks import slot, support_hooks -from onetl.impl import FrozenModel from onetl.log import log_lines if TYPE_CHECKING: @@ -57,7 +56,7 @@ class JDBCStatementType(Enum): @support_hooks -class JDBCMixin(FrozenModel): +class JDBCMixin: """ Compatibility layer between Python and Java SQL Module. diff --git a/onetl/strategy/incremental_strategy.py b/onetl/strategy/incremental_strategy.py index 0397514b6..613d22c88 100644 --- a/onetl/strategy/incremental_strategy.py +++ b/onetl/strategy/incremental_strategy.py @@ -6,23 +6,11 @@ from etl_entities.hwm import HWM -from onetl.impl import BaseModel from onetl.strategy.batch_hwm_strategy import BatchHWMStrategy from onetl.strategy.hwm_strategy import HWMStrategy -class OffsetMixin(BaseModel): - hwm: Optional[HWM] = None - offset: Any = None - - def fetch_hwm(self) -> None: - super().fetch_hwm() - - if self.hwm and self.hwm.value is not None and self.offset is not None: - self.hwm -= self.offset - - -class IncrementalStrategy(OffsetMixin, HWMStrategy): +class IncrementalStrategy(HWMStrategy): """Incremental strategy for :ref:`db-reader`/:ref:`file-downloader`. Used for fetching only new rows/files from a source @@ -353,8 +341,17 @@ class IncrementalStrategy(OffsetMixin, HWMStrategy): # current run will download only files which were not downloaded in previous runs """ + hwm: Optional[HWM] = None + offset: Any = None + + def fetch_hwm(self) -> None: + super().fetch_hwm() + + if self.hwm and self.hwm.value is not None and self.offset is not None: + self.hwm -= self.offset -class IncrementalBatchStrategy(OffsetMixin, BatchHWMStrategy): + +class IncrementalBatchStrategy(BatchHWMStrategy): """Incremental batch strategy for :ref:`db-reader`. .. note:: @@ -669,6 +666,15 @@ class IncrementalBatchStrategy(OffsetMixin, BatchHWMStrategy): """ + hwm: Optional[HWM] = None + offset: Any = None + + def fetch_hwm(self) -> None: + super().fetch_hwm() + + if self.hwm and self.hwm.value is not None and self.offset is not None: + self.hwm -= self.offset + def __next__(self): self.save_hwm() return super().__next__()